Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH v2 0/3] Transaction boundaries for applier
@ 2019-03-06 20:16 Georgy Kirichenko
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream Georgy Kirichenko
                   ` (2 more replies)
  0 siblings, 3 replies; 11+ messages in thread
From: Georgy Kirichenko @ 2019-03-06 20:16 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset consists of three patches.
* The first one removes xstream dependency from applier.
* The second one creates a separate journal transaction for all local
  effects in case of replication what is needed to be able to replicate
  such effects back.
* The third one turns applier into transaction mode - an applier first
  fetches the whole transaction and then applies all rows within
  transaction boundaries.

Changes in v2:
 * Get rid of apply_initial_journal_row and apply_row from box.cc and
   purge box dependency from applier.
 * txn.cc and txn.h changes moved into a separate commit.
 * applier_read_tx uses stailq to form a list of rows in a transaction.
 * use exceptions for applier routines.
 * slight changes according to review.

Georgy Kirichenko (3):
  Applier gets rid of a xstream
  Put all new rows to the end of journal request
  Transaction support for applier

 src/box/applier.cc                    | 276 ++++++++++++++++++++------
 src/box/applier.h                     |   9 +-
 src/box/box.cc                        |  68 +------
 src/box/txn.c                         |  36 +++-
 src/box/txn.h                         |   4 +
 test/replication/transaction.result   | 240 ++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 ++++++++
 7 files changed, 581 insertions(+), 138 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

-- 
2.21.0

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream
  2019-03-06 20:16 [tarantool-patches] [PATCH v2 0/3] Transaction boundaries for applier Georgy Kirichenko
@ 2019-03-06 20:16 ` Georgy Kirichenko
  2019-03-07  9:31   ` Vladimir Davydov
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request Georgy Kirichenko
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 3/3] Transaction support for applier Georgy Kirichenko
  2 siblings, 1 reply; 11+ messages in thread
From: Georgy Kirichenko @ 2019-03-06 20:16 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Remove xstream dependency and use direct box interface to apply all
replication rows. This is refactoring before transactional replication.

Needed for: #2798
---
 src/box/applier.cc | 69 ++++++++++++++++++++++++++++++++++++++--------
 src/box/applier.h  |  9 +-----
 src/box/box.cc     | 68 ++++++++-------------------------------------
 3 files changed, 69 insertions(+), 77 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index e9addcb3e..a687d2bea 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -37,7 +37,6 @@
 #include "fiber_cond.h"
 #include "coio.h"
 #include "coio_buf.h"
-#include "xstream.h"
 #include "wal.h"
 #include "xrow.h"
 #include "replication.h"
@@ -48,6 +47,9 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "schema.h"
+#include "txn.h"
+#include "box.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -167,6 +169,53 @@ applier_writer_f(va_list ap)
 	return 0;
 }
 
+static int
+apply_initial_join_row(struct xrow_header *row)
+{
+	struct request request;
+	xrow_decode_dml(row, &request, dml_request_key_map(row->type));
+	struct space *space = space_cache_find_xc(request.space_id);
+	/* no access checks here - applier always works with admin privs */
+	return space_apply_initial_join_row(space, &request);
+}
+
+/**
+ * Process a no-op request.
+ *
+ * A no-op request does not affect any space, but it
+ * promotes vclock and is written to WAL.
+ */
+static int
+process_nop(struct request *request)
+{
+	assert(request->type == IPROTO_NOP);
+	struct txn *txn = txn_begin_stmt(NULL);
+	if (txn == NULL)
+		return -1;
+	return txn_commit_stmt(txn, request);
+}
+
+static int
+apply_row(struct xrow_header *row)
+{
+	struct request request;
+	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+		return -1;
+	if (request.type == IPROTO_NOP) {
+		if (process_nop(&request) != 0)
+			return -1;
+		return 0;
+	}
+	struct space *space = space_cache_find(request.space_id);
+	if (space == NULL)
+		return -1;
+	if (box_process_rw(&request, space, NULL) != 0) {
+		say_error("error applying row: %s", request_str(&request));
+		return -1;
+	}
+	return 0;
+}
+
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -309,13 +358,13 @@ applier_join(struct applier *applier)
 	/*
 	 * Receive initial data.
 	 */
-	assert(applier->join_stream != NULL);
 	uint64_t row_count = 0;
 	while (true) {
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			xstream_write_xc(applier->join_stream, &row);
+			if (apply_initial_join_row(&row) != 0)
+				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -357,7 +406,8 @@ applier_join(struct applier *applier)
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
-			xstream_write_xc(applier->subscribe_stream, &row);
+			if (apply_row(&row) != 0)
+				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -385,8 +435,6 @@ applier_join(struct applier *applier)
 static void
 applier_subscribe(struct applier *applier)
 {
-	assert(applier->subscribe_stream != NULL);
-
 	/* Send SUBSCRIBE request */
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
@@ -550,7 +598,7 @@ applier_subscribe(struct applier *applier)
 		 */
 		latch_lock(latch);
 		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = xstream_write(applier->subscribe_stream, &row);
+			int res = apply_row(&row);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/*
@@ -569,7 +617,7 @@ applier_subscribe(struct applier *applier)
 					nop.bodycnt = 0;
 					nop.replica_id = row.replica_id;
 					nop.lsn = row.lsn;
-					res = xstream_write(applier->subscribe_stream, &nop);
+					res = apply_row(&nop);
 				}
 			}
 			if (res != 0) {
@@ -731,8 +779,7 @@ applier_stop(struct applier *applier)
 }
 
 struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
-	    struct xstream *subscribe_stream)
+applier_new(const char *uri)
 {
 	struct applier *applier = (struct applier *)
 		calloc(1, sizeof(struct applier));
@@ -751,8 +798,6 @@ applier_new(const char *uri, struct xstream *join_stream,
 	assert(rc == 0 && applier->uri.service != NULL);
 	(void) rc;
 
-	applier->join_stream = join_stream;
-	applier->subscribe_stream = subscribe_stream;
 	applier->last_row_time = ev_monotonic_now(loop());
 	rlist_create(&applier->on_state);
 	fiber_cond_create(&applier->resume_cond);
diff --git a/src/box/applier.h b/src/box/applier.h
index d942b6fbb..5bff90031 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -45,8 +45,6 @@
 
 #include "xrow.h"
 
-struct xstream;
-
 enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
 
 #define applier_STATE(_)                                             \
@@ -116,10 +114,6 @@ struct applier {
 	bool is_paused;
 	/** Condition variable signaled to resume the applier. */
 	struct fiber_cond resume_cond;
-	/** xstream to process rows during initial JOIN */
-	struct xstream *join_stream;
-	/** xstream to process rows during final JOIN and SUBSCRIBE */
-	struct xstream *subscribe_stream;
 };
 
 /**
@@ -152,8 +146,7 @@ applier_stop(struct applier *applier);
  * @error   throws OutOfMemory exception if out of memory.
  */
 struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
-	    struct xstream *subscribe_stream);
+applier_new(const char *uri);
 
 /**
  * Destroy and delete a applier.
diff --git a/src/box/box.cc b/src/box/box.cc
index a62595421..190e5eae5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -122,10 +122,6 @@ static fiber_cond ro_cond;
  */
 static bool is_orphan;
 
-/* Use the shared instance of xstream for all appliers */
-static struct xstream join_stream;
-static struct xstream subscribe_stream;
-
 /**
  * The pool of fibers in the transaction processor thread
  * working on incoming messages from net, wal and other
@@ -203,22 +199,6 @@ box_process_rw(struct request *request, struct space *space,
 	return rc;
 }
 
-/**
- * Process a no-op request.
- *
- * A no-op request does not affect any space, but it
- * promotes vclock and is written to WAL.
- */
-static int
-process_nop(struct request *request)
-{
-	assert(request->type == IPROTO_NOP);
-	struct txn *txn = txn_begin_stmt(NULL);
-	if (txn == NULL)
-		return -1;
-	return txn_commit_stmt(txn, request);
-}
-
 void
 box_set_ro(bool ro)
 {
@@ -305,35 +285,24 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 	journal->vclock = v;
 }
 
-static inline void
-apply_row(struct xstream *stream, struct xrow_header *row)
+static void
+apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
-	(void) stream;
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
-	if (request.type == IPROTO_NOP) {
-		if (process_nop(&request) != 0)
+	if (request.type != IPROTO_NOP) {
+		struct space *space = space_cache_find_xc(request.space_id);
+		if (box_process_rw(&request, space, NULL) != 0) {
+			say_error("error applying row: %s", request_str(&request));
 			diag_raise();
-		return;
-	}
-	struct space *space = space_cache_find_xc(request.space_id);
-	if (box_process_rw(&request, space, NULL) != 0) {
-		say_error("error applying row: %s", request_str(&request));
-		diag_raise();
+		}
 	}
-}
-
-static void
-apply_wal_row(struct xstream *stream, struct xrow_header *row)
-{
-	apply_row(stream, row);
-
 	struct wal_stream *xstream =
 		container_of(stream, struct wal_stream, base);
 	/**
-	 * Yield once in a while, but not too often,
-	 * mostly to allow signal handling to take place.
-	 */
+	* Yield once in a while, but not too often,
+	* mostly to allow signal handling to take place.
+	*/
 	if (++xstream->rows % xstream->yield == 0)
 		fiber_sleep(0);
 }
@@ -352,17 +321,6 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
 	ctx->yield = (wal_max_rows >> 4)  + 1;
 }
 
-static void
-apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
-{
-	(void) stream;
-	struct request request;
-	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
-	struct space *space = space_cache_find_xc(request.space_id);
-	/* no access checks here - applier always works with admin privs */
-	space_apply_initial_join_row_xc(space, &request);
-}
-
 /* {{{ configuration bindings */
 
 static void
@@ -656,9 +614,7 @@ cfg_get_replication(int *p_count)
 
 	for (int i = 0; i < count; i++) {
 		const char *source = cfg_getarr_elem("replication", i);
-		struct applier *applier = applier_new(source,
-						      &join_stream,
-						      &subscribe_stream);
+		struct applier *applier = applier_new(source);
 		if (applier == NULL) {
 			/* Delete created appliers */
 			while (--i >= 0)
@@ -2131,8 +2087,6 @@ box_cfg_xc(void)
 	box_set_replication_sync_lag();
 	box_set_replication_sync_timeout();
 	box_set_replication_skip_conflict();
-	xstream_create(&join_stream, apply_initial_join_row);
-	xstream_create(&subscribe_stream, apply_row);
 
 	struct gc_checkpoint *checkpoint = gc_last_checkpoint();
 
-- 
2.21.0

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request
  2019-03-06 20:16 [tarantool-patches] [PATCH v2 0/3] Transaction boundaries for applier Georgy Kirichenko
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream Georgy Kirichenko
@ 2019-03-06 20:16 ` Georgy Kirichenko
  2019-03-07  9:46   ` Vladimir Davydov
  2019-03-07 10:38   ` [tarantool-patches] " Konstantin Osipov
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 3/3] Transaction support for applier Georgy Kirichenko
  2 siblings, 2 replies; 11+ messages in thread
From: Georgy Kirichenko @ 2019-03-06 20:16 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Form a separate transaction with all local changes in case of replication.
This is important because we should be able to replicate such changes
(e.g. made within an on_replace triggers) back. In the opposite case
local changes will be incorporated into originating transaction and
wold be skipped by originator replica.

Needed for: #2798
---
 src/box/txn.c | 36 ++++++++++++++++++++++++++++--------
 src/box/txn.h |  4 ++++
 2 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 7900fb3ab..187e1c085 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -141,6 +141,8 @@ txn_begin(bool is_autocommit)
 	/* Initialize members explicitly to save time on memset() */
 	stailq_create(&txn->stmts);
 	txn->n_rows = 0;
+	txn->has_rows_with_lsn = false;
+	txn->has_rows_without_lsn = false;
 	txn->is_autocommit = is_autocommit;
 	txn->has_triggers  = false;
 	txn->is_aborted = false;
@@ -233,6 +235,8 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
 		if (txn_add_redo(stmt, request) != 0)
 			goto fail;
+		txn->has_rows_with_lsn |= stmt->row->replica_id != 0;
+		txn->has_rows_without_lsn |= stmt->row->replica_id == 0;
 		++txn->n_rows;
 	}
 	/*
@@ -272,13 +276,29 @@ txn_write_to_wal(struct txn *txn)
 
 	struct txn_stmt *stmt;
 	struct xrow_header **row = req->rows;
-	stailq_foreach_entry(stmt, &txn->stmts, next) {
-		if (stmt->row == NULL)
-			continue; /* A read (e.g. select) request */
-		*row++ = stmt->row;
-		req->approx_len += xrow_approx_len(stmt->row);
+	if (txn->has_rows_with_lsn) {
+		/* Write out rows that already have an assigned lsn. */
+		stailq_foreach_entry(stmt, &txn->stmts, next) {
+			if (stmt->row == NULL)
+				continue; /* A read (e.g. select) request */
+			if (stmt->row->replica_id == 0)
+				continue; /* A row without lsn. */
+			*row++ = stmt->row;
+			req->approx_len += xrow_approx_len(stmt->row);
+		}
+	}
+	if (txn->has_rows_without_lsn) {
+		/* Write out rows to assign a lsn. */
+		stailq_foreach_entry(stmt, &txn->stmts, next) {
+			if (stmt->row == NULL)
+				continue; /* A read (e.g. select) request */
+			if (stmt->row->replica_id != 0)
+				continue; /* A row with lsn. */
+			*row++ = stmt->row;
+			req->approx_len += xrow_approx_len(stmt->row);
+		}
 	}
-	assert(row == req->rows + req->n_rows);
+	assert(row == req->rows + txn->n_rows);
 
 	ev_tstamp start = ev_monotonic_now(loop());
 	int64_t res = journal_write(req);
@@ -399,8 +419,6 @@ txn_rollback()
 		txn_stmt_unref_tuples(stmt);
 
 	TRASH(txn);
-	/** Free volatile txn memory. */
-	fiber_gc();
 	fiber_set_txn(fiber(), NULL);
 }
 
@@ -480,6 +498,8 @@ box_txn_rollback()
 		return -1;
 	}
 	txn_rollback(); /* doesn't throw */
+	/** Free volatile txn memory. */
+	fiber_gc();
 	return 0;
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..cb82ff25b 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -142,6 +142,10 @@ struct txn {
 	struct stailq stmts;
 	/** Total number of WAL rows in this txn. */
 	int n_rows;
+	/** True if the txn has rows with an assigned lsn. */
+	bool has_rows_with_lsn;
+	/** True if the txn has rows without a lsn. */
+	bool has_rows_without_lsn;
 	/**
 	 * True if this transaction is running in autocommit mode
 	 * (statement end causes an automatic transaction commit).
-- 
2.21.0

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [tarantool-patches] [PATCH v2 3/3] Transaction support for applier
  2019-03-06 20:16 [tarantool-patches] [PATCH v2 0/3] Transaction boundaries for applier Georgy Kirichenko
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream Georgy Kirichenko
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request Georgy Kirichenko
@ 2019-03-06 20:16 ` Georgy Kirichenko
  2019-03-07 10:38   ` Vladimir Davydov
  2019-03-07 10:40   ` [tarantool-patches] " Konstantin Osipov
  2 siblings, 2 replies; 11+ messages in thread
From: Georgy Kirichenko @ 2019-03-06 20:16 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Applier fetch incoming rows to form a transaction and then apply it.
Rows are fetched and stored on fiber gc region until last transaction row
with is_commit was fetched. After fetch a multi row transaction is going to be
applied into txn_begin/txn_commit/txn_rolback boundaries. At this time
we could not apply single row transaction in such boundaries because of
ddl which does not support non auto commit transactions.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc                    | 211 ++++++++++++++++------
 test/replication/transaction.result   | 240 ++++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 3 files changed, 482 insertions(+), 55 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a687d2bea..f0a779aa7 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -429,6 +429,146 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Helper struct to bind rows in a list.
+ */
+struct xrow_header_item {
+	struct stailq_entry next;
+	struct xrow_header row;
+};
+
+/**
+ * Read one transaction from network using applier's input buffer.
+ * Transaction rows are placed onto fiber gc region.
+ * We could not use applier input buffer for that because rpos is adjusted
+ * after each xrow decoding and corresponding network input space is going
+ * to be reused.
+ */
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows)
+{
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t tsn = 0;
+
+	stailq_create(rows);
+	do {
+		struct xrow_header_item *item = (struct xrow_header_item *)
+			region_alloc(&fiber()->gc,
+				     sizeof(struct xrow_header_item));
+		if (item == NULL)
+			tnt_raise(OutOfMemory, sizeof(struct xrow_header_item),
+				  "region", "struct xrow_header_item");
+		stailq_add_tail(rows, &item->next);
+		struct xrow_header *row = &item->row;
+
+		double timeout = replication_disconnect_timeout();
+		/*
+		 * Tarantool < 1.7.7 does not send periodic heartbeat
+		 * messages so we can't assume that if we haven't heard
+		 * from the master for quite a while the connection is
+		 * broken - the master might just be idle.
+		 */
+		if (applier->version_id < version_id(1, 7, 7))
+			coio_read_xrow(coio, ibuf, row);
+		else
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+
+		if (iproto_type_is_error(row->type))
+			xrow_decode_error_xc(row);
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+				  int2str(row->replica_id),
+				  tt_uuid_str(&REPLICASET_UUID));
+		}
+		if (tsn == 0) {
+			/*
+			 * Transaction id must be derived from the log sequence
+			 * number of the first row in the transaction.
+			 */
+			tsn = row->tsn;
+			if (row->lsn != tsn)
+				tnt_raise(ClientError, ER_PROTOCOL,
+					  "Transaction id must be derived from "
+					  "the lsn of the first row in the "
+					  "transaction.");
+		}
+		if (tsn != row->tsn)
+			tnt_raise(ClientError, ER_UNSUPPORTED,
+				  "replication",
+				  "interleaving transactions");
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL && row->is_commit == false) {
+			/* Save row bodies to gc region. */
+			void *new_base = region_alloc(&fiber()->gc,
+						      row->body->iov_len);
+			if (new_base == NULL)
+				tnt_raise(OutOfMemory, row->body->iov_len,
+					  "region", "xrow body");
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			/* Adjust row body pointers. */
+			row->body->iov_base = new_base;
+		}
+
+	} while (stailq_last_entry(rows, struct xrow_header_item,
+				   next)->row.is_commit == 0);
+}
+
+/**
+ * Apply all rows in the rows queue as a single transaction.
+ *
+ * Return 0 for success or -1 in case of an error.
+ */
+static int
+applier_apply_tx(struct stailq *rows)
+{
+	int res = 0;
+	struct txn *txn = NULL;
+	struct xrow_header_item *item;
+	if (stailq_first(rows) != stailq_last(rows))
+		txn = txn_begin(false);
+	stailq_foreach_entry(item, rows, next) {
+		struct xrow_header *row = &item->row;
+		res = apply_row(row);
+		struct error *e;
+		if (res != 0 &&
+		    (e = diag_last_error(diag_get()))->type ==
+			    &type_ClientError &&
+		    box_error_code(e) == ER_TUPLE_FOUND &&
+		    replication_skip_conflict) {
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			diag_clear(diag_get());
+			(row)->type = IPROTO_NOP;
+			(row)->bodycnt = 0;
+			res = apply_row(row);
+		}
+		++row;
+		if (res != 0)
+			break;
+	}
+	if (res == 0 && txn != NULL)
+		res = txn_commit(txn);
+	if (res != 0)
+		txn_rollback();
+	return res;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -558,36 +698,18 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
-
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct stailq rows;
+		applier_read_tx(applier, &rows);
 
-		applier->lag = ev_now(loop()) - row.tm;
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct xrow_header_item,
+					    next)->row;
+		applier->lag = ev_now(loop()) -
+			       stailq_last_entry(&rows,
+						 struct xrow_header_item,
+						 next)->row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
 		/*
@@ -597,33 +719,12 @@ applier_subscribe(struct applier *applier)
 		 * that belong to the same server id.
 		 */
 		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = apply_row(&row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
-				/*
-				 * In case of ER_TUPLE_FOUND error and enabled
-				 * replication_skip_conflict configuration
-				 * option, skip applying the foreign row and
-				 * replace it with NOP in the local write ahead
-				 * log.
-				 */
-				if (e->type == &type_ClientError &&
-				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict) {
-					diag_clear(diag_get());
-					struct xrow_header nop;
-					nop.type = IPROTO_NOP;
-					nop.bodycnt = 0;
-					nop.replica_id = row.replica_id;
-					nop.lsn = row.lsn;
-					res = apply_row(&nop);
-				}
-			}
-			if (res != 0) {
-				latch_unlock(latch);
-				diag_raise();
-			}
+		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
+		    first_row->lsn &&
+		    applier_apply_tx(&rows) != 0) {
+			latch_unlock(latch);
+			fiber_gc();
+			diag_raise();
 		}
 		latch_unlock(latch);
 
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..009f84430
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,240 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+box.space.test:delete({3})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- false
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..47003c644
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+box.space.test:delete({3})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
-- 
2.21.0

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream Georgy Kirichenko
@ 2019-03-07  9:31   ` Vladimir Davydov
  0 siblings, 0 replies; 11+ messages in thread
From: Vladimir Davydov @ 2019-03-07  9:31 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Wed, Mar 06, 2019 at 11:16:15PM +0300, Georgy Kirichenko wrote:
> Remove xstream dependency and use direct box interface to apply all
> replication rows. This is refactoring before transactional replication.
> 
> Needed for: #2798
> ---
>  src/box/applier.cc | 69 ++++++++++++++++++++++++++++++++++++++--------
>  src/box/applier.h  |  9 +-----
>  src/box/box.cc     | 68 ++++++++-------------------------------------
>  3 files changed, 69 insertions(+), 77 deletions(-)

Pushed to 2.1 with the following minor changes:

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a687d2be..94c07aac 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -201,11 +201,8 @@ apply_row(struct xrow_header *row)
 	struct request request;
 	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
 		return -1;
-	if (request.type == IPROTO_NOP) {
-		if (process_nop(&request) != 0)
-			return -1;
-		return 0;
-	}
+	if (request.type == IPROTO_NOP)
+		return process_nop(&request);
 	struct space *space = space_cache_find(request.space_id);
 	if (space == NULL)
 		return -1;
diff --git a/src/box/box.cc b/src/box/box.cc
index 45beefb9..f7ce33aa 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -307,9 +307,9 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 	struct wal_stream *xstream =
 		container_of(stream, struct wal_stream, base);
 	/**
-	* Yield once in a while, but not too often,
-	* mostly to allow signal handling to take place.
-	*/
+	 * Yield once in a while, but not too often,
+	 * mostly to allow signal handling to take place.
+	 */
 	if (++xstream->rows % xstream->yield == 0)
 		fiber_sleep(0);
 }

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request Georgy Kirichenko
@ 2019-03-07  9:46   ` Vladimir Davydov
  2019-03-07 10:38   ` [tarantool-patches] " Konstantin Osipov
  1 sibling, 0 replies; 11+ messages in thread
From: Vladimir Davydov @ 2019-03-07  9:46 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Wed, Mar 06, 2019 at 11:16:17PM +0300, Georgy Kirichenko wrote:
> Form a separate transaction with all local changes in case of replication.
> This is important because we should be able to replicate such changes
> (e.g. made within an on_replace triggers) back. In the opposite case
> local changes will be incorporated into originating transaction and
> wold be skipped by originator replica.
> 
> Needed for: #2798
> ---
>  src/box/txn.c | 36 ++++++++++++++++++++++++++++--------
>  src/box/txn.h |  4 ++++
>  2 files changed, 32 insertions(+), 8 deletions(-)
> 
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7900fb3ab..187e1c085 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -141,6 +141,8 @@ txn_begin(bool is_autocommit)
>  	/* Initialize members explicitly to save time on memset() */
>  	stailq_create(&txn->stmts);
>  	txn->n_rows = 0;
> +	txn->has_rows_with_lsn = false;
> +	txn->has_rows_without_lsn = false;
>  	txn->is_autocommit = is_autocommit;
>  	txn->has_triggers  = false;
>  	txn->is_aborted = false;
> @@ -233,6 +235,8 @@ txn_commit_stmt(struct txn *txn, struct request *request)
>  	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
>  		if (txn_add_redo(stmt, request) != 0)
>  			goto fail;
> +		txn->has_rows_with_lsn |= stmt->row->replica_id != 0;
> +		txn->has_rows_without_lsn |= stmt->row->replica_id == 0;
>  		++txn->n_rows;
>  	}
>  	/*
> @@ -272,13 +276,29 @@ txn_write_to_wal(struct txn *txn)
>  
>  	struct txn_stmt *stmt;
>  	struct xrow_header **row = req->rows;
> -	stailq_foreach_entry(stmt, &txn->stmts, next) {
> -		if (stmt->row == NULL)
> -			continue; /* A read (e.g. select) request */
> -		*row++ = stmt->row;
> -		req->approx_len += xrow_approx_len(stmt->row);
> +	if (txn->has_rows_with_lsn) {
> +		/* Write out rows that already have an assigned lsn. */
> +		stailq_foreach_entry(stmt, &txn->stmts, next) {
> +			if (stmt->row == NULL)
> +				continue; /* A read (e.g. select) request */
> +			if (stmt->row->replica_id == 0)
> +				continue; /* A row without lsn. */
> +			*row++ = stmt->row;
> +			req->approx_len += xrow_approx_len(stmt->row);
> +		}
> +	}
> +	if (txn->has_rows_without_lsn) {
> +		/* Write out rows to assign a lsn. */
> +		stailq_foreach_entry(stmt, &txn->stmts, next) {
> +			if (stmt->row == NULL)
> +				continue; /* A read (e.g. select) request */
> +			if (stmt->row->replica_id != 0)
> +				continue; /* A row with lsn. */
> +			*row++ = stmt->row;
> +			req->approx_len += xrow_approx_len(stmt->row);
> +		}

I liked the previous approach, where you counted remote rows, much more.
For one thing, it didn't require two passes over the statement list.
I don't understand why you ditched it.

>  	}
> -	assert(row == req->rows + req->n_rows);
> +	assert(row == req->rows + txn->n_rows);
>  
>  	ev_tstamp start = ev_monotonic_now(loop());
>  	int64_t res = journal_write(req);
> @@ -399,8 +419,6 @@ txn_rollback()
>  		txn_stmt_unref_tuples(stmt);
>  
>  	TRASH(txn);
> -	/** Free volatile txn memory. */
> -	fiber_gc();
>  	fiber_set_txn(fiber(), NULL);
>  }
>  
> @@ -480,6 +498,8 @@ box_txn_rollback()
>  		return -1;
>  	}
>  	txn_rollback(); /* doesn't throw */
> +	/** Free volatile txn memory. */
> +	fiber_gc();
>  	return 0;
>  }

As I told you during the previous review session, moving fiber_gc from
txn_rollback to box_txn_rollback is a separate change. In fact, it's a
follow-up for 77fa1736dbb9 ("box: factor fiber_gc out of txn_commit").
Please submit it in a separate patch with a proper commentary.

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [tarantool-patches] Re: [PATCH v2 2/3] Put all new rows to the end of journal request
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request Georgy Kirichenko
  2019-03-07  9:46   ` Vladimir Davydov
@ 2019-03-07 10:38   ` Konstantin Osipov
  2019-03-07 10:53     ` Vladimir Davydov
  1 sibling, 1 reply; 11+ messages in thread
From: Konstantin Osipov @ 2019-03-07 10:38 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/07 11:20]:
> Form a separate transaction with all local changes in case of replication.
> This is important because we should be able to replicate such changes
> (e.g. made within an on_replace triggers) back. In the opposite case
> local changes will be incorporated into originating transaction and
> wold be skipped by originator replica.
> 
> Needed for: #2798

OK, this patch makes much more sense than the previous one
-perhaps because this has become an isolated change.

I liked a single pass over the list of transaction rows more,
obviously (sorry Vova). 

Could you please return to the idea of the previous patch? 

/*
  0 - count of local rows, 1 - count of remote rows
  We need two different counters to make sure that local and
  remote transaction is 
*/
int n_rows[2];

txn->n_rows[stmt->replica_id == instance_id] ++;

and then the same loop as in the previous patch?

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [tarantool-patches] [PATCH v2 3/3] Transaction support for applier
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 3/3] Transaction support for applier Georgy Kirichenko
@ 2019-03-07 10:38   ` Vladimir Davydov
  2019-03-07 10:40   ` [tarantool-patches] " Konstantin Osipov
  1 sibling, 0 replies; 11+ messages in thread
From: Vladimir Davydov @ 2019-03-07 10:38 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Wed, Mar 06, 2019 at 11:16:18PM +0300, Georgy Kirichenko wrote:
> Applier fetch incoming rows to form a transaction and then apply it.
> Rows are fetched and stored on fiber gc region until last transaction row
> with is_commit was fetched. After fetch a multi row transaction is going to be
> applied into txn_begin/txn_commit/txn_rolback boundaries. At this time
> we could not apply single row transaction in such boundaries because of
> ddl which does not support non auto commit transactions.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 211 ++++++++++++++++------
>  test/replication/transaction.result   | 240 ++++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  3 files changed, 482 insertions(+), 55 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index a687d2bea..f0a779aa7 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -429,6 +429,146 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Helper struct to bind rows in a list.
> + */
> +struct xrow_header_item {

I like that you now use a list instead of a dynamically growing array
for storing rows. This spares us from the necessity of implementing
region_realloc. Don't like the name though. What about applier_tx_stmt?

> +	struct stailq_entry next;
> +	struct xrow_header row;
> +};

Please add comments to struct members.

Also, it doesn't seem that you have addressed all my previous comments.
Please go through them once again.

^ permalink raw reply	[flat|nested] 11+ messages in thread

* [tarantool-patches] Re: [PATCH v2 3/3] Transaction support for applier
  2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 3/3] Transaction support for applier Georgy Kirichenko
  2019-03-07 10:38   ` Vladimir Davydov
@ 2019-03-07 10:40   ` Konstantin Osipov
  1 sibling, 0 replies; 11+ messages in thread
From: Konstantin Osipov @ 2019-03-07 10:40 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/07 11:20]:
> Applier fetch incoming rows to form a transaction and then apply it.
> Rows are fetched and stored on fiber gc region until last transaction row
> with is_commit was fetched. After fetch a multi row transaction is going to be
> applied into txn_begin/txn_commit/txn_rolback boundaries. At this time
> we could not apply single row transaction in such boundaries because of
> ddl which does not support non auto commit transactions.
This is LGTM.


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [tarantool-patches] Re: [PATCH v2 2/3] Put all new rows to the end of journal request
  2019-03-07 10:38   ` [tarantool-patches] " Konstantin Osipov
@ 2019-03-07 10:53     ` Vladimir Davydov
  2019-03-07 11:22       ` Konstantin Osipov
  0 siblings, 1 reply; 11+ messages in thread
From: Vladimir Davydov @ 2019-03-07 10:53 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches, Georgy Kirichenko

On Thu, Mar 07, 2019 at 01:38:13PM +0300, Konstantin Osipov wrote:
> * Georgy Kirichenko <georgy@tarantool.org> [19/03/07 11:20]:
> > Form a separate transaction with all local changes in case of replication.
> > This is important because we should be able to replicate such changes
> > (e.g. made within an on_replace triggers) back. In the opposite case
> > local changes will be incorporated into originating transaction and
> > wold be skipped by originator replica.
> > 
> > Needed for: #2798
> 
> OK, this patch makes much more sense than the previous one
> -perhaps because this has become an isolated change.
> 
> I liked a single pass over the list of transaction rows more,
> obviously (sorry Vova). 

Sorry for what? I never asked for the two-pass implementation.
In fact, I asked Georgy to return to the previous approach, too.

> 
> Could you please return to the idea of the previous patch? 
> 
> /*
>   0 - count of local rows, 1 - count of remote rows
>   We need two different counters to make sure that local and
>   remote transaction is 
> */
> int n_rows[2];
> 
> txn->n_rows[stmt->replica_id == instance_id] ++;

TBO I find it hackish.

n_remote_rows / n_rows looks much easier for understanding IMO.

> 
> and then the same loop as in the previous patch?

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [tarantool-patches] Re: [PATCH v2 2/3] Put all new rows to the end of journal request
  2019-03-07 10:53     ` Vladimir Davydov
@ 2019-03-07 11:22       ` Konstantin Osipov
  0 siblings, 0 replies; 11+ messages in thread
From: Konstantin Osipov @ 2019-03-07 11:22 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches, Georgy Kirichenko

* Vladimir Davydov <vdavydov.dev@gmail.com> [19/03/07 13:57]:
> > txn->n_rows[stmt->replica_id == instance_id] ++;
> 
> TBO I find it hackish.
> 
> n_remote_rows / n_rows looks much easier for understanding IMO.

It's n_local_rows and n_remote_rows then. Why do we need the total
and the partial sum then?


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2019-03-07 11:22 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-03-06 20:16 [tarantool-patches] [PATCH v2 0/3] Transaction boundaries for applier Georgy Kirichenko
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream Georgy Kirichenko
2019-03-07  9:31   ` Vladimir Davydov
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request Georgy Kirichenko
2019-03-07  9:46   ` Vladimir Davydov
2019-03-07 10:38   ` [tarantool-patches] " Konstantin Osipov
2019-03-07 10:53     ` Vladimir Davydov
2019-03-07 11:22       ` Konstantin Osipov
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 3/3] Transaction support for applier Georgy Kirichenko
2019-03-07 10:38   ` Vladimir Davydov
2019-03-07 10:40   ` [tarantool-patches] " Konstantin Osipov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox