Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH 0/3] Transaction boundaries for applier
@ 2019-03-03 20:26 Georgy Kirichenko
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream Georgy Kirichenko
                   ` (2 more replies)
  0 siblings, 3 replies; 15+ messages in thread
From: Georgy Kirichenko @ 2019-03-03 20:26 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset consists of three patches. The first two are refactorings
that remove the xstream dependency from the applier. The last one changes
txn so that local transaction effects are moved to the transaction tail,
which allows these effects to be replicated back, and then makes the
applier follow transaction boundaries.

Issue: https://github.com/tarantool/tarantool/issues/2798
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries

Georgy Kirichenko (3):
  Applier gets rid of a xstream
  Merge apply row and apply_initial_join_row
  Transaction support for applier

 src/box/applier.cc                    | 259 +++++++++++++++++++-------
 src/box/applier.h                     |   9 +-
 src/box/box.cc                        |  77 +++-----
 src/box/box.h                         |   9 +
 src/box/txn.c                         |  21 ++-
 src/box/txn.h                         |   4 +
 test/replication/transaction.result   | 240 ++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 8 files changed, 577 insertions(+), 128 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

-- 
2.21.0

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream
  2019-03-03 20:26 [tarantool-patches] [PATCH 0/3] Transaction boundaries for applier Georgy Kirichenko
@ 2019-03-03 20:26 ` Georgy Kirichenko
  2019-03-05  8:52   ` [tarantool-patches] " Konstantin Osipov
  2019-03-05 10:19   ` [tarantool-patches] " Vladimir Davydov
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 2/3] Merge apply row and apply_initial_join_row Georgy Kirichenko
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 3/3] Transaction support for applier Georgy Kirichenko
  2 siblings, 2 replies; 15+ messages in thread
From: Georgy Kirichenko @ 2019-03-03 20:26 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Remove the xstream dependency and use the direct box interface to apply
all replication rows. This is a refactoring in preparation for
transactional replication.

Needed for: #2798
---
 src/box/applier.cc | 20 ++++++++------------
 src/box/applier.h  |  9 +--------
 src/box/box.cc     | 32 ++++++++++++--------------------
 src/box/box.h      | 18 ++++++++++++++++++
 4 files changed, 39 insertions(+), 40 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index e9addcb3e..fd98b733d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -37,7 +37,6 @@
 #include "fiber_cond.h"
 #include "coio.h"
 #include "coio_buf.h"
-#include "xstream.h"
 #include "wal.h"
 #include "xrow.h"
 #include "replication.h"
@@ -48,6 +47,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "box.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -309,13 +309,13 @@ applier_join(struct applier *applier)
 	/*
 	 * Receive initial data.
 	 */
-	assert(applier->join_stream != NULL);
 	uint64_t row_count = 0;
 	while (true) {
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			xstream_write_xc(applier->join_stream, &row);
+			if (apply_initial_join_row(&row) != 0)
+				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -357,7 +357,8 @@ applier_join(struct applier *applier)
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
-			xstream_write_xc(applier->subscribe_stream, &row);
+			if (apply_row(&row) != 0)
+				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
 		} else if (row.type == IPROTO_OK) {
@@ -385,8 +386,6 @@ applier_join(struct applier *applier)
 static void
 applier_subscribe(struct applier *applier)
 {
-	assert(applier->subscribe_stream != NULL);
-
 	/* Send SUBSCRIBE request */
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
@@ -550,7 +549,7 @@ applier_subscribe(struct applier *applier)
 		 */
 		latch_lock(latch);
 		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = xstream_write(applier->subscribe_stream, &row);
+			int res = apply_row(&row);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/*
@@ -569,7 +568,7 @@ applier_subscribe(struct applier *applier)
 					nop.bodycnt = 0;
 					nop.replica_id = row.replica_id;
 					nop.lsn = row.lsn;
-					res = xstream_write(applier->subscribe_stream, &nop);
+					res = apply_row(&nop);
 				}
 			}
 			if (res != 0) {
@@ -731,8 +730,7 @@ applier_stop(struct applier *applier)
 }
 
 struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
-	    struct xstream *subscribe_stream)
+applier_new(const char *uri)
 {
 	struct applier *applier = (struct applier *)
 		calloc(1, sizeof(struct applier));
@@ -751,8 +749,6 @@ applier_new(const char *uri, struct xstream *join_stream,
 	assert(rc == 0 && applier->uri.service != NULL);
 	(void) rc;
 
-	applier->join_stream = join_stream;
-	applier->subscribe_stream = subscribe_stream;
 	applier->last_row_time = ev_monotonic_now(loop());
 	rlist_create(&applier->on_state);
 	fiber_cond_create(&applier->resume_cond);
diff --git a/src/box/applier.h b/src/box/applier.h
index d942b6fbb..5bff90031 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -45,8 +45,6 @@
 
 #include "xrow.h"
 
-struct xstream;
-
 enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
 
 #define applier_STATE(_)                                             \
@@ -116,10 +114,6 @@ struct applier {
 	bool is_paused;
 	/** Condition variable signaled to resume the applier. */
 	struct fiber_cond resume_cond;
-	/** xstream to process rows during initial JOIN */
-	struct xstream *join_stream;
-	/** xstream to process rows during final JOIN and SUBSCRIBE */
-	struct xstream *subscribe_stream;
 };
 
 /**
@@ -152,8 +146,7 @@ applier_stop(struct applier *applier);
  * @error   throws OutOfMemory exception if out of memory.
  */
 struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
-	    struct xstream *subscribe_stream);
+applier_new(const char *uri);
 
 /**
  * Destroy and delete a applier.
diff --git a/src/box/box.cc b/src/box/box.cc
index a62595421..22cd52b04 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -122,10 +122,6 @@ static fiber_cond ro_cond;
  */
 static bool is_orphan;
 
-/* Use the shared instance of xstream for all appliers */
-static struct xstream join_stream;
-static struct xstream subscribe_stream;
-
 /**
  * The pool of fibers in the transaction processor thread
  * working on incoming messages from net, wal and other
@@ -305,28 +301,29 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 	journal->vclock = v;
 }
 
-static inline void
-apply_row(struct xstream *stream, struct xrow_header *row)
+int
+apply_row(struct xrow_header *row)
 {
-	(void) stream;
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type == IPROTO_NOP) {
 		if (process_nop(&request) != 0)
-			diag_raise();
-		return;
+			return -1;
+		return 0;
 	}
 	struct space *space = space_cache_find_xc(request.space_id);
 	if (box_process_rw(&request, space, NULL) != 0) {
 		say_error("error applying row: %s", request_str(&request));
-		diag_raise();
+		return -1;
 	}
+	return 0;
 }
 
 static void
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
-	apply_row(stream, row);
+	if (apply_row(row) != 0)
+		diag_raise();
 
 	struct wal_stream *xstream =
 		container_of(stream, struct wal_stream, base);
@@ -352,15 +349,14 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
 	ctx->yield = (wal_max_rows >> 4)  + 1;
 }
 
-static void
-apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
+int
+apply_initial_join_row(struct xrow_header *row)
 {
-	(void) stream;
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	struct space *space = space_cache_find_xc(request.space_id);
 	/* no access checks here - applier always works with admin privs */
-	space_apply_initial_join_row_xc(space, &request);
+	return space_apply_initial_join_row(space, &request);
 }
 
 /* {{{ configuration bindings */
@@ -656,9 +652,7 @@ cfg_get_replication(int *p_count)
 
 	for (int i = 0; i < count; i++) {
 		const char *source = cfg_getarr_elem("replication", i);
-		struct applier *applier = applier_new(source,
-						      &join_stream,
-						      &subscribe_stream);
+		struct applier *applier = applier_new(source);
 		if (applier == NULL) {
 			/* Delete created appliers */
 			while (--i >= 0)
@@ -2131,8 +2125,6 @@ box_cfg_xc(void)
 	box_set_replication_sync_lag();
 	box_set_replication_sync_timeout();
 	box_set_replication_skip_conflict();
-	xstream_create(&join_stream, apply_initial_join_row);
-	xstream_create(&subscribe_stream, apply_row);
 
 	struct gc_checkpoint *checkpoint = gc_last_checkpoint();
 
diff --git a/src/box/box.h b/src/box/box.h
index 53d88ab71..8d76b723d 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -218,6 +218,24 @@ extern "C" {
 
 typedef struct tuple box_tuple_t;
 
+/*
+ * Apply row while bootstraping.
+ * Return codes:
+ * 0 for Ok
+ * -1 in case of an error.
+ */
+int
+apply_initial_join_row(struct xrow_header *row);
+
+/*
+ * Apply row after bootstrap is done (e.g. final join, subscribe.
+ * Return codes:
+ * 0 for Ok
+ * -1 in case of an error.
+ */
+int
+apply_row(struct xrow_header *row);
+
 /* box_select is private and used only by FFI */
 API_EXPORT int
 box_select(uint32_t space_id, uint32_t index_id,
-- 
2.21.0

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [tarantool-patches] [PATCH 2/3] Merge apply row and apply_initial_join_row
  2019-03-03 20:26 [tarantool-patches] [PATCH 0/3] Transaction boundaries for applier Georgy Kirichenko
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream Georgy Kirichenko
@ 2019-03-03 20:26 ` Georgy Kirichenko
  2019-03-05  9:06   ` [tarantool-patches] " Konstantin Osipov
  2019-03-05 10:26   ` [tarantool-patches] " Vladimir Davydov
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 3/3] Transaction support for applier Georgy Kirichenko
  2 siblings, 2 replies; 15+ messages in thread
From: Georgy Kirichenko @ 2019-03-03 20:26 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Use apply_row for both initial join and subscribe.
Refactoring: add static variables for the memtx and vinyl engines and
get rid of engine_by_name in box.cc

Needed for: #2798
---
 src/box/applier.cc |  2 +-
 src/box/box.cc     | 51 ++++++++++++++++------------------------------
 src/box/box.h      |  9 --------
 3 files changed, 19 insertions(+), 43 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index fd98b733d..3222b041d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -314,7 +314,7 @@ applier_join(struct applier *applier)
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			if (apply_initial_join_row(&row) != 0)
+			if (apply_row(&row) != 0)
 				diag_raise();
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
diff --git a/src/box/box.cc b/src/box/box.cc
index 22cd52b04..e62f2ea12 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -137,6 +137,15 @@ static struct fiber_pool tx_fiber_pool;
  */
 static struct cbus_endpoint tx_prio_endpoint;
 
+/**
+ * Memtx engine instance
+ */
+static struct memtx_engine *memtx = NULL;
+/**
+ * Vinyl engine instance
+ */
+static struct vinyl_engine *vinyl = NULL;
+
 static int
 box_check_writable(void)
 {
@@ -306,6 +315,15 @@ apply_row(struct xrow_header *row)
 {
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+
+	assert(memtx != NULL);
+	if (memtx->state == MEMTX_INITIAL_RECOVERY) {
+		/* Snapshot recovery or initial join */
+		struct space *space = space_cache_find_xc(request.space_id);
+		/* no access checks here - applier always works with admin privs */
+		return space_apply_initial_join_row(space, &request);
+	}
+
 	if (request.type == IPROTO_NOP) {
 		if (process_nop(&request) != 0)
 			return -1;
@@ -349,16 +367,6 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
 	ctx->yield = (wal_max_rows >> 4)  + 1;
 }
 
-int
-apply_initial_join_row(struct xrow_header *row)
-{
-	struct request request;
-	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
-	struct space *space = space_cache_find_xc(request.space_id);
-	/* no access checks here - applier always works with admin privs */
-	return space_apply_initial_join_row(space, &request);
-}
-
 /* {{{ configuration bindings */
 
 static void
@@ -784,13 +792,9 @@ box_set_io_collect_interval(void)
 void
 box_set_snap_io_rate_limit(void)
 {
-	struct memtx_engine *memtx;
-	memtx = (struct memtx_engine *)engine_by_name("memtx");
 	assert(memtx != NULL);
 	memtx_engine_set_snap_io_rate_limit(memtx,
 			cfg_getd("snap_io_rate_limit"));
-	struct vinyl_engine *vinyl;
-	vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
 	assert(vinyl != NULL);
 	vinyl_engine_set_snap_io_rate_limit(vinyl,
 			cfg_getd("snap_io_rate_limit"));
@@ -799,8 +803,6 @@ box_set_snap_io_rate_limit(void)
 void
 box_set_memtx_memory(void)
 {
-	struct memtx_engine *memtx;
-	memtx = (struct memtx_engine *)engine_by_name("memtx");
 	assert(memtx != NULL);
 	memtx_engine_set_memory_xc(memtx,
 		box_check_memtx_memory(cfg_geti64("memtx_memory")));
@@ -809,8 +811,6 @@ box_set_memtx_memory(void)
 void
 box_set_memtx_max_tuple_size(void)
 {
-	struct memtx_engine *memtx;
-	memtx = (struct memtx_engine *)engine_by_name("memtx");
 	assert(memtx != NULL);
 	memtx_engine_set_max_tuple_size(memtx,
 			cfg_geti("memtx_max_tuple_size"));
@@ -821,8 +821,6 @@ box_set_too_long_threshold(void)
 {
 	too_long_threshold = cfg_getd("too_long_threshold");
 
-	struct vinyl_engine *vinyl;
-	vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
 	assert(vinyl != NULL);
 	vinyl_engine_set_too_long_threshold(vinyl, too_long_threshold);
 }
@@ -860,8 +858,6 @@ box_set_checkpoint_wal_threshold(void)
 void
 box_set_vinyl_memory(void)
 {
-	struct vinyl_engine *vinyl;
-	vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
 	assert(vinyl != NULL);
 	vinyl_engine_set_memory_xc(vinyl,
 		box_check_vinyl_memory(cfg_geti64("vinyl_memory")));
@@ -870,8 +866,6 @@ box_set_vinyl_memory(void)
 void
 box_set_vinyl_max_tuple_size(void)
 {
-	struct vinyl_engine *vinyl;
-	vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
 	assert(vinyl != NULL);
 	vinyl_engine_set_max_tuple_size(vinyl,
 			cfg_geti("vinyl_max_tuple_size"));
@@ -880,8 +874,6 @@ box_set_vinyl_max_tuple_size(void)
 void
 box_set_vinyl_cache(void)
 {
-	struct vinyl_engine *vinyl;
-	vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
 	assert(vinyl != NULL);
 	vinyl_engine_set_cache(vinyl, cfg_geti64("vinyl_cache"));
 }
@@ -889,8 +881,6 @@ box_set_vinyl_cache(void)
 void
 box_set_vinyl_timeout(void)
 {
-	struct vinyl_engine *vinyl;
-	vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
 	assert(vinyl != NULL);
 	vinyl_engine_set_timeout(vinyl,	cfg_getd("vinyl_timeout"));
 }
@@ -1703,7 +1693,6 @@ engine_init()
 	 * in checkpoints (in enigne_foreach order),
 	 * so it must be registered first.
 	 */
-	struct memtx_engine *memtx;
 	memtx = memtx_engine_new_xc(cfg_gets("memtx_dir"),
 				    cfg_geti("force_recovery"),
 				    cfg_getd("memtx_memory"),
@@ -1718,7 +1707,6 @@ engine_init()
 	struct engine *blackhole = blackhole_engine_new_xc();
 	engine_register(blackhole);
 
-	struct vinyl_engine *vinyl;
 	vinyl = vinyl_engine_new_xc(cfg_gets("vinyl_dir"),
 				    cfg_geti64("vinyl_memory"),
 				    cfg_geti("vinyl_read_threads"),
@@ -1963,9 +1951,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 */
 	engine_begin_initial_recovery_xc(&recovery->vclock);
 
-	struct memtx_engine *memtx;
-	memtx = (struct memtx_engine *)engine_by_name("memtx");
-	assert(memtx != NULL);
 
 	struct recovery_journal journal;
 	recovery_journal_create(&journal, &recovery->vclock);
diff --git a/src/box/box.h b/src/box/box.h
index 8d76b723d..4f9b27264 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -218,15 +218,6 @@ extern "C" {
 
 typedef struct tuple box_tuple_t;
 
-/*
- * Apply row while bootstraping.
- * Return codes:
- * 0 for Ok
- * -1 in case of an error.
- */
-int
-apply_initial_join_row(struct xrow_header *row);
-
 /*
  * Apply row after bootstrap is done (e.g. final join, subscribe.
  * Return codes:
-- 
2.21.0

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [tarantool-patches] [PATCH 3/3] Transaction support for applier
  2019-03-03 20:26 [tarantool-patches] [PATCH 0/3] Transaction boundaries for applier Georgy Kirichenko
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream Georgy Kirichenko
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 2/3] Merge apply row and apply_initial_join_row Georgy Kirichenko
@ 2019-03-03 20:26 ` Georgy Kirichenko
  2019-03-05  9:11   ` [tarantool-patches] " Konstantin Osipov
                     ` (4 more replies)
  2 siblings, 5 replies; 15+ messages in thread
From: Georgy Kirichenko @ 2019-03-03 20:26 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

The applier fetches incoming rows to form a transaction and then applies
it. In case of replication, all local changes are moved to the tail of a
journal entry to form a separate transaction (like an autonomous
transaction) so that they can be replicated back; the applier therefore
assumes that transactions cannot be mixed in a replication stream.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc                    | 243 ++++++++++++++++++++------
 src/box/txn.c                         |  21 ++-
 src/box/txn.h                         |   4 +
 test/replication/transaction.result   | 240 +++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 5 files changed, 534 insertions(+), 60 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 3222b041d..dfabbe5ab 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,12 @@
 #include "session.h"
 #include "cfg.h"
 #include "box.h"
+#include "txn.h"
+
+enum {
+	/* Initial capacity of rows array. */
+	APPLIER_TX_INITIAL_ROW_COUNT = 16,
+};
 
 STRS(applier_state, applier_STATE);
 
@@ -380,6 +386,176 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network using applier's input buffer.
+ * Transaction rows are placed onto fiber gc region.
+ * We could not use applier input buffer for that because rpos is adjusted
+ * after each xrow decoding and corresponding network input space is going
+ * to be reused.
+ *
+ * Return count of transaction rows and put row's header pointers into rows
+ * array.
+ */
+static int
+applier_read_tx(struct applier *applier, struct xrow_header **rows)
+{
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t tsn = 0;
+	int row_capacity = APPLIER_TX_INITIAL_ROW_COUNT;
+	struct xrow_header *first_row, *row;
+	first_row = (struct xrow_header *)region_alloc(&fiber()->gc,
+						       row_capacity *
+						       sizeof(struct xrow_header));
+	if (first_row == NULL) {
+		diag_set(OutOfMemory, sizeof(struct xrow_header) * row_capacity,
+			 "region", "struct xrow_header");
+		goto error;
+	}
+	row = first_row;
+
+	do {
+		if (row == first_row + row_capacity) {
+			/* Realloc rows array. */
+			row = (struct xrow_header *)region_alloc(&fiber()->gc,
+								 row_capacity *
+								 sizeof(struct xrow_header) << 1);
+			if (row == NULL) {
+				diag_set(OutOfMemory,
+					 sizeof(struct xrow_header) *
+					 row_capacity << 1,
+					 "region", "struct xrow_header");
+				goto error;
+			}
+			memcpy(row, first_row, row_capacity *
+					       sizeof(struct xrow_header) << 1);
+			first_row = row;
+			row = first_row + row_capacity;
+			row_capacity <<= 1;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		/*
+		 * Unfortunately we do not have C-version of coio read xrow
+		 * functions yet so use try-catch guard as workaround.
+		 */
+		try {
+			/*
+			 * Tarantool < 1.7.7 does not send periodic heartbeat
+			 * messages so we can't assume that if we haven't heard
+			 * from the master for quite a while the connection is
+			 * broken - the master might just be idle.
+			 */
+			if (applier->version_id < version_id(1, 7, 7))
+				coio_read_xrow(coio, ibuf, row);
+			else
+				coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (row == first_row) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			tsn = row->tsn;
+			if (row->lsn != tsn) {
+				/* There is not a first row in the transactions. */
+				diag_set(ClientError, ER_PROTOCOL,
+					 "Not a first row in a transaction");
+				goto error;
+			}
+		}
+		if (tsn != row->tsn) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			/* Save row bodies to gc region. */
+			void *new_base = region_alloc(&fiber()->gc,
+						      row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			/* Adjust row body pointers. */
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->is_commit == 0 && ++row);
+
+	*rows = first_row;
+	return row - first_row + 1;
+error:
+	return -1;
+}
+
+static int
+applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)
+{
+	int res = 0;
+	struct txn *txn = NULL;
+	struct xrow_header *row = first_row;
+	if (first_row != last_row)
+		txn = txn_begin(false);
+	while (row <= last_row && res == 0) {
+		res = apply_row(row);
+		struct error *e;
+		if (res != 0 &&
+		    (e = diag_last_error(diag_get()))->type ==
+			    &type_ClientError &&
+		    box_error_code(e) == ER_TUPLE_FOUND &&
+		    replication_skip_conflict) {
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			diag_clear(diag_get());
+			(row)->type = IPROTO_NOP;
+			(row)->bodycnt = 0;
+			res = apply_row(row);
+		}
+		++row;
+	}
+	if (res == 0 && txn != NULL)
+		res = txn_commit(txn);
+	if (res != 0)
+		txn_rollback();
+	return res;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -509,36 +685,14 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		struct xrow_header *tx_rows;
+		int row_count = applier_read_tx(applier, &tx_rows);
+		if (row_count < 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
-
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - (tx_rows + row_count - 1)->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(tx_rows->replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
 		/*
@@ -548,33 +702,12 @@ applier_subscribe(struct applier *applier)
 		 * that belong to the same server id.
 		 */
 		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = apply_row(&row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
-				/*
-				 * In case of ER_TUPLE_FOUND error and enabled
-				 * replication_skip_conflict configuration
-				 * option, skip applying the foreign row and
-				 * replace it with NOP in the local write ahead
-				 * log.
-				 */
-				if (e->type == &type_ClientError &&
-				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict) {
-					diag_clear(diag_get());
-					struct xrow_header nop;
-					nop.type = IPROTO_NOP;
-					nop.bodycnt = 0;
-					nop.replica_id = row.replica_id;
-					nop.lsn = row.lsn;
-					res = apply_row(&nop);
-				}
-			}
-			if (res != 0) {
-				latch_unlock(latch);
-				diag_raise();
-			}
+		if (vclock_get(&replicaset.vclock,
+			       tx_rows->replica_id) < tx_rows->lsn &&
+		    applier_apply_tx(tx_rows, tx_rows + row_count - 1) != 0) {
+			latch_unlock(latch);
+			fiber_gc();
+			diag_raise();
 		}
 		latch_unlock(latch);
 
diff --git a/src/box/txn.c b/src/box/txn.c
index 7900fb3ab..f6bf72d0c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -34,6 +34,7 @@
 #include "journal.h"
 #include <fiber.h>
 #include "xrow.h"
+#include "replication.h"
 
 double too_long_threshold;
 
@@ -141,6 +142,7 @@ txn_begin(bool is_autocommit)
 	/* Initialize members explicitly to save time on memset() */
 	stailq_create(&txn->stmts);
 	txn->n_rows = 0;
+	txn->n_remote_rows = 0;
 	txn->is_autocommit = is_autocommit;
 	txn->has_triggers  = false;
 	txn->is_aborted = false;
@@ -233,6 +235,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
 		if (txn_add_redo(stmt, request) != 0)
 			goto fail;
+		if (stmt->row->replica_id != 0 &&
+		    stmt->row->replica_id != instance_id)
+			++txn->n_remote_rows;
 		++txn->n_rows;
 	}
 	/*
@@ -271,14 +276,20 @@ txn_write_to_wal(struct txn *txn)
 		return -1;
 
 	struct txn_stmt *stmt;
-	struct xrow_header **row = req->rows;
+	struct xrow_header **remote_row = req->rows;
+	struct xrow_header **local_row = req->rows + txn->n_remote_rows;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
-		*row++ = stmt->row;
+		if (stmt->row->replica_id != 0 &&
+		    stmt->row->replica_id != instance_id)
+			*remote_row++ = stmt->row;
+		else
+			*local_row++ = stmt->row;
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(row == req->rows + req->n_rows);
+	assert(remote_row == req->rows + txn->n_remote_rows);
+	assert(local_row == req->rows + req->n_rows);
 
 	ev_tstamp start = ev_monotonic_now(loop());
 	int64_t res = journal_write(req);
@@ -399,8 +410,6 @@ txn_rollback()
 		txn_stmt_unref_tuples(stmt);
 
 	TRASH(txn);
-	/** Free volatile txn memory. */
-	fiber_gc();
 	fiber_set_txn(fiber(), NULL);
 }
 
@@ -480,6 +489,8 @@ box_txn_rollback()
 		return -1;
 	}
 	txn_rollback(); /* doesn't throw */
+	/** Free volatile txn memory. */
+	fiber_gc();
 	return 0;
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..2791fdf73 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -142,6 +142,10 @@ struct txn {
 	struct stailq stmts;
 	/** Total number of WAL rows in this txn. */
 	int n_rows;
+	/**
+	 * Count of rows generated on a remote replica.
+	 */
+	int n_remote_rows;
 	/**
 	 * True if this transaction is running in autocommit mode
 	 * (statement end causes an automatic transaction commit).
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..009f84430
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,240 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+box.space.test:delete({3})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- false
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..47003c644
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+box.space.test:delete({3})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
-- 
2.21.0

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [tarantool-patches] Re: [PATCH 1/3] Applier gets rid of a xstream
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream Georgy Kirichenko
@ 2019-03-05  8:52   ` Konstantin Osipov
  2019-03-05 10:19   ` [tarantool-patches] " Vladimir Davydov
  1 sibling, 0 replies; 15+ messages in thread
From: Konstantin Osipov @ 2019-03-05  8:52 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/03 23:30]:
> Remove xstream dependency and use direct box interface to apply all
> replication rows. This is refactoring before transactional replication.

Looks good to me. I would look into moving apply_row +
apply_initial_join_row into applier.cc, although I see that apply_row is
re-used by apply_wal_row.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [tarantool-patches] Re: [PATCH 2/3] Merge apply row and apply_initial_join_row
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 2/3] Merge apply row and apply_initial_join_row Georgy Kirichenko
@ 2019-03-05  9:06   ` Konstantin Osipov
  2019-03-05 10:26   ` [tarantool-patches] " Vladimir Davydov
  1 sibling, 0 replies; 15+ messages in thread
From: Konstantin Osipov @ 2019-03-05  9:06 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/03 23:30]:
> +/**
> + * Memtx engine instance
> + */
> +static struct memtx_engine *memtx = NULL;
> +/**
> + * Vinyl engine instance
> + */
> +static struct vinyl_engine *vinyl = NULL;

Would you have an instance for each engine? 

The idea with find_by_name() was that sometime in the future we
will have storage engines entirely pluggable (no, this will not
happen really).

So I'm OK with ditching engine_by_name, but then the declaration
should be in memtx_engine.h and vinyl_engine.h, respectively, or
at least in engine.h, not in box.[hc]

Re the patch itself, it's OK to push (I assume it helps moving
apply_row into applier).

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [tarantool-patches] Re: [PATCH 3/3] Transaction support for applier
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 3/3] Transaction support for applier Georgy Kirichenko
@ 2019-03-05  9:11   ` Konstantin Osipov
  2019-03-05 10:28     ` Vladimir Davydov
  2019-03-05  9:13   ` Konstantin Osipov
                     ` (3 subsequent siblings)
  4 siblings, 1 reply; 15+ messages in thread
From: Konstantin Osipov @ 2019-03-05  9:11 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/03 23:30]:
> Applier fetch incoming rows to form a transaction and then apply it.
> In case of replication all local changes moved to an journal entry
> tail to form a separate transaction (like autonomous transaction)
> to be able to replicate changes back so applier assumes that transactions
> could not be mixed in a replication stream.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 243 ++++++++++++++++++++------
>  src/box/txn.c                         |  21 ++-
>  src/box/txn.h                         |   4 +
>  test/replication/transaction.result   | 240 +++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  5 files changed, 534 insertions(+), 60 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 3222b041d..dfabbe5ab 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,12 @@
>  #include "session.h"
>  #include "cfg.h"
>  #include "box.h"
> +#include "txn.h"

I thought we agreed to use box API, not txn API? 
> +static int
> +applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)
> +{
> +	int res = 0;
> +	struct txn *txn = NULL;
> +	struct xrow_header *row = first_row;
> +	if (first_row != last_row)
> +		txn = txn_begin(false);

Shouldn't it be box_txn_begin()?


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [tarantool-patches] Re: [PATCH 3/3] Transaction support for applier
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 3/3] Transaction support for applier Georgy Kirichenko
  2019-03-05  9:11   ` [tarantool-patches] " Konstantin Osipov
@ 2019-03-05  9:13   ` Konstantin Osipov
  2019-03-05  9:25   ` Konstantin Osipov
                     ` (2 subsequent siblings)
  4 siblings, 0 replies; 15+ messages in thread
From: Konstantin Osipov @ 2019-03-05  9:13 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/03 23:30]:
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7900fb3ab..f6bf72d0c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
>  #include "journal.h"
>  #include <fiber.h>
>  #include "xrow.h"
> +#include "replication.h"

What do you need this header for? instance_id? Is there a way to
keep these two modules independent of each other?

> +	txn->n_remote_rows = 0;

Just the same as in the previous patch, this name is very general.
Could you please come up with something more specific?

> +	/**
> +	 * Count of rows generated on a remote replica.
> +	 */
> +	int n_remote_rows;

OK, now I know what this is, although I don't know what "generated"
means here. But what is it for? Please write a better comment.


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [tarantool-patches] Re: [PATCH 3/3] Transaction support for applier
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 3/3] Transaction support for applier Georgy Kirichenko
  2019-03-05  9:11   ` [tarantool-patches] " Konstantin Osipov
  2019-03-05  9:13   ` Konstantin Osipov
@ 2019-03-05  9:25   ` Konstantin Osipov
  2019-03-05  9:28   ` Konstantin Osipov
  2019-03-05 11:59   ` [tarantool-patches] " Vladimir Davydov
  4 siblings, 0 replies; 15+ messages in thread
From: Konstantin Osipov @ 2019-03-05  9:25 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/03 23:30]:
> Applier fetch incoming rows to form a transaction and then apply it.
> In case of replication all local changes moved to an journal entry
> tail to form a separate transaction (like autonomous transaction)
> to be able to replicate changes back so applier assumes that transactions
> could not be mixed in a replication stream.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 243 ++++++++++++++++++++------
>  src/box/txn.c                         |  21 ++-
>  src/box/txn.h                         |   4 +
>  test/replication/transaction.result   | 240 +++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  5 files changed, 534 insertions(+), 60 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 3222b041d..dfabbe5ab 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,12 @@
>  #include "session.h"
>  #include "cfg.h"
>  #include "box.h"
> +#include "txn.h"
> +
> +enum {
> +	/* Initial capacity of rows array. */
> +	APPLIER_TX_INITIAL_ROW_COUNT = 16,
> +};
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -380,6 +386,176 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Read one transaction from network using applier's input buffer.
> + * Transaction rows are placed onto fiber gc region.
> + * We could not use applier input buffer for that because rpos is adjusted
> + * after each xrow decoding and corresponding network input space is going
> + * to be reused.
> + *
> + * Return count of transaction rows and put row's header pointers into rows
> + * array.
> + */
> +static int
> +applier_read_tx(struct applier *applier, struct xrow_header **rows)
> +{
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t tsn = 0;
> +	int row_capacity = APPLIER_TX_INITIAL_ROW_COUNT;
> +	struct xrow_header *first_row, *row;
> +	first_row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +						       row_capacity *
> +						       sizeof(struct xrow_header));
> +	if (first_row == NULL) {
> +		diag_set(OutOfMemory, sizeof(struct xrow_header) * row_capacity,
> +			 "region", "struct xrow_header");
> +		goto error;
> +	}
> +	row = first_row;
> +
> +	do {
> +		if (row == first_row + row_capacity) {
> +			/* Realloc rows array. */
> +			row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +								 row_capacity *
> +								 sizeof(struct xrow_header) << 1);
> +			if (row == NULL) {
> +				diag_set(OutOfMemory,
> +					 sizeof(struct xrow_header) *
> +					 row_capacity << 1,
> +					 "region", "struct xrow_header");
> +				goto error;
> +			}
> +			memcpy(row, first_row, row_capacity *
> +					       sizeof(struct xrow_header) << 1);
> +			first_row = row;
> +			row = first_row + row_capacity;
> +			row_capacity <<= 1;
> +		}

This looks like reinventing the wheel; let's move the resize part to
region_realloc().

> +		if (row == first_row) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			tsn = row->tsn;
> +			if (row->lsn != tsn) {
> +				/* There is not a first row in the transactions. */

/* Transaction id must be derived from the log sequence number of
 * the first row in the transaction.
 */

> +				diag_set(ClientError, ER_PROTOCOL,
> +					 "Not a first row in a transaction");

This message would be confusing when it pops up, please use a
message from the suggested comment.

> +				goto error;
> +			}
> +		}
> +		if (tsn != row->tsn) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");

"replication"

> +		if (row->body->iov_base != NULL) {
> +			/* Save row bodies to gc region. */

As a courtesy to performance you could only do this for multi-row
transactions. You can see it's a multi-row transaction from xrow
header.

> +	*rows = first_row;
> +	return row - first_row + 1;

As an alternative to region_realloc(), you could add stailq in_txn
to struct xrow_header.  Being able to add xrow to a linked list
won't hurt in other places either.

> +
> +static int
> +applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)

Please add a comment.

> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7900fb3ab..f6bf72d0c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
>  #include "journal.h"
>  #include <fiber.h>
>  #include "xrow.h"
> +#include "replication.h"

Could this entire thing with remote rows be moved to a separate
patch?


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 15+ messages in thread

* [tarantool-patches] Re: [PATCH 3/3] Transaction support for applier
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 3/3] Transaction support for applier Georgy Kirichenko
                     ` (2 preceding siblings ...)
  2019-03-05  9:25   ` Konstantin Osipov
@ 2019-03-05  9:28   ` Konstantin Osipov
  2019-03-05 11:59   ` [tarantool-patches] " Vladimir Davydov
  4 siblings, 0 replies; 15+ messages in thread
From: Konstantin Osipov @ 2019-03-05  9:28 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/03/03 23:30]:
> @@ -271,14 +276,20 @@ txn_write_to_wal(struct txn *txn)
>  		return -1;
>  
>  	struct txn_stmt *stmt;
> -	struct xrow_header **row = req->rows;
> +	struct xrow_header **remote_row = req->rows;
> +	struct xrow_header **local_row = req->rows + txn->n_remote_rows;
>  	stailq_foreach_entry(stmt, &txn->stmts, next) {
>  		if (stmt->row == NULL)
>  			continue; /* A read (e.g. select) request */
> -		*row++ = stmt->row;
> +		if (stmt->row->replica_id != 0 &&
> +		    stmt->row->replica_id != instance_id)
> +			*remote_row++ = stmt->row;
> +		else
> +			*local_row++ = stmt->row;
>  		req->approx_len += xrow_approx_len(stmt->row);
>  	}
> -	assert(row == req->rows + req->n_rows);
> +	assert(remote_row == req->rows + txn->n_remote_rows);
> +	assert(local_row == req->rows + req->n_rows);

AFAIU this patch tests local rows by means of
replication_skip_conflict option. Could you please also test it
with local on_replace/before_replace triggers? Besides, now that
applier issues multi-statement transactions, we could add a test
for on_commit/on_rollback triggers fired from applier as well. We
could set such triggers from statement-level triggers.


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream Georgy Kirichenko
  2019-03-05  8:52   ` [tarantool-patches] " Konstantin Osipov
@ 2019-03-05 10:19   ` Vladimir Davydov
  1 sibling, 0 replies; 15+ messages in thread
From: Vladimir Davydov @ 2019-03-05 10:19 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Mar 03, 2019 at 11:26:16PM +0300, Georgy Kirichenko wrote:
> diff --git a/src/box/applier.h b/src/box/applier.h
> index d942b6fbb..5bff90031 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -305,28 +301,29 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
>  	journal->vclock = v;
>  }
>  
> -static inline void
> -apply_row(struct xstream *stream, struct xrow_header *row)
> +int
> +apply_row(struct xrow_header *row)
>  {
> -	(void) stream;
>  	struct request request;
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));

This function may throw an exception while apply_row protocol forbids
that.

>  	if (request.type == IPROTO_NOP) {
>  		if (process_nop(&request) != 0)
> -			diag_raise();
> -		return;
> +			return -1;
> +		return 0;
>  	}
>  	struct space *space = space_cache_find_xc(request.space_id);

Ditto.

>  	if (box_process_rw(&request, space, NULL) != 0) {
>  		say_error("error applying row: %s", request_str(&request));
> -		diag_raise();
> +		return -1;
>  	}
> +	return 0;
>  }
>  
>  static void
>  apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  {
> -	apply_row(stream, row);
> +	if (apply_row(row) != 0)
> +		diag_raise();
>  
>  	struct wal_stream *xstream =
>  		container_of(stream, struct wal_stream, base);
> @@ -352,15 +349,14 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
>  	ctx->yield = (wal_max_rows >> 4)  + 1;
>  }
>  
> -static void
> -apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
> +int
> +apply_initial_join_row(struct xrow_header *row)
>  {
> -	(void) stream;
>  	struct request request;
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>  	struct space *space = space_cache_find_xc(request.space_id);

Ditto.

Anyway, I don't think it's worth removing exceptions in the scope of
this patch. Since applier is written in C++, let's let them be for now -
we'll remove them later when we convert applier to pure C.

>  	/* no access checks here - applier always works with admin privs */
> -	space_apply_initial_join_row_xc(space, &request);
> +	return space_apply_initial_join_row(space, &request);

I agree with Kostja that it's worth moving apply_initial_join_row and
apply_row to applier.cc rather than exporting them, because they are
only used by appliers (names match, too).

As for apply_wal_row, I think we should leave it in box.cc and implement
it without the help of apply_row, because those two functions are
essentially different and the difference will only grow over time. E.g.
we don't need to handle NOP requests when recovering from WAL, we can
simply skip those. And when transaction boundaries are introduced, we
won't need to handle them on local recovery, either.

Let's please do that in the scope of this patch.

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [tarantool-patches] [PATCH 2/3] Merge apply row and apply_initial_join_row
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 2/3] Merge apply row and apply_initial_join_row Georgy Kirichenko
  2019-03-05  9:06   ` [tarantool-patches] " Konstantin Osipov
@ 2019-03-05 10:26   ` Vladimir Davydov
  1 sibling, 0 replies; 15+ messages in thread
From: Vladimir Davydov @ 2019-03-05 10:26 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Mar 03, 2019 at 11:26:17PM +0300, Georgy Kirichenko wrote:
> Use apply_row for bot initial join and subscribe.

s/bot/both

> Refactoring: add memtx and vinyl engine static variable, get rid of
> engine_by_name at box.cc

Why do you need that? Looking at the next patch, I see no point at all
in such a refactoring.

> 
> Needed for: 2798
> ---
>  src/box/applier.cc |  2 +-
>  src/box/box.cc     | 51 ++++++++++++++++------------------------------
>  src/box/box.h      |  9 --------
>  3 files changed, 19 insertions(+), 43 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index fd98b733d..3222b041d 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -314,7 +314,7 @@ applier_join(struct applier *applier)
>  		coio_read_xrow(coio, ibuf, &row);
>  		applier->last_row_time = ev_monotonic_now(loop());
>  		if (iproto_type_is_dml(row.type)) {
> -			if (apply_initial_join_row(&row) != 0)
> +			if (apply_row(&row) != 0)
>  				diag_raise();
>  			if (++row_count % 100000 == 0)
>  				say_info("%.1fM rows received", row_count / 1e6);
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 22cd52b04..e62f2ea12 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -137,6 +137,15 @@ static struct fiber_pool tx_fiber_pool;
>   */
>  static struct cbus_endpoint tx_prio_endpoint;
>  
> +/**
> + * Memtx engine instance
> + */
> +static struct memtx_engine *memtx = NULL;
> +/**
> + * Vinyl engine instance
> + */
> +static struct vinyl_engine *vinyl = NULL;
> +
>  static int
>  box_check_writable(void)
>  {
> @@ -306,6 +315,15 @@ apply_row(struct xrow_header *row)
>  {
>  	struct request request;
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> +
> +	assert(memtx != NULL);
> +	if (memtx->state == MEMTX_INITIAL_RECOVERY) {
> +		/* Snapshot recovery or initial join */
> +		struct space *space = space_cache_find_xc(request.space_id);
> +		/* no access checks here - applier always works with admin privs */
> +		return space_apply_initial_join_row(space, &request);
> +	}
> +

Mixing two essentially different cases (initial join and subscribe) in
one function, using 'if' to switch between them, accessing memtx->state
here - all of this looks convoluted and confusing. Let's please let
apply_initial_join_row be.

>  	if (request.type == IPROTO_NOP) {
>  		if (process_nop(&request) != 0)
>  			return -1;
> @@ -349,16 +367,6 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
>  	ctx->yield = (wal_max_rows >> 4)  + 1;
>  }
>  
> -int
> -apply_initial_join_row(struct xrow_header *row)
> -{
> -	struct request request;
> -	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> -	struct space *space = space_cache_find_xc(request.space_id);
> -	/* no access checks here - applier always works with admin privs */
> -	return space_apply_initial_join_row(space, &request);
> -}
> -
>  /* {{{ configuration bindings */
>  
>  static void
> @@ -784,13 +792,9 @@ box_set_io_collect_interval(void)
>  void
>  box_set_snap_io_rate_limit(void)
>  {
> -	struct memtx_engine *memtx;
> -	memtx = (struct memtx_engine *)engine_by_name("memtx");

Getting rid of engine_by_name may be worthwhile, but it's definitely out
of the scope of your task. Besides, this looks incomplete, as we still
use engine_by_name in other places (lua/stat.c, lua/info.c). If you
think we need to drop engine_by_name and access engines directly, then
please submit it in a separate patch with a proper justification. For
now, let's please drop this patch from this series.

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 3/3] Transaction support for applier
  2019-03-05  9:11   ` [tarantool-patches] " Konstantin Osipov
@ 2019-03-05 10:28     ` Vladimir Davydov
       [not found]       ` <20190305112333.GA30697@chai>
  0 siblings, 1 reply; 15+ messages in thread
From: Vladimir Davydov @ 2019-03-05 10:28 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches, Georgy Kirichenko

On Tue, Mar 05, 2019 at 12:11:35PM +0300, Konstantin Osipov wrote:
> * Georgy Kirichenko <georgy@tarantool.org> [19/03/03 23:30]:
> > Applier fetch incoming rows to form a transaction and then apply it.
> > In case of replication all local changes moved to an journal entry
> > tail to form a separate transaction (like autonomous transaction)
> > to be able to replicate changes back so applier assumes that transactions
> > could not be mixed in a replication stream.
> > 
> > Closes: #2798
> > Needed for: #980
> > ---
> >  src/box/applier.cc                    | 243 ++++++++++++++++++++------
> >  src/box/txn.c                         |  21 ++-
> >  src/box/txn.h                         |   4 +
> >  test/replication/transaction.result   | 240 +++++++++++++++++++++++++
> >  test/replication/transaction.test.lua |  86 +++++++++
> >  5 files changed, 534 insertions(+), 60 deletions(-)
> >  create mode 100644 test/replication/transaction.result
> >  create mode 100644 test/replication/transaction.test.lua
> > 
> > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > index 3222b041d..dfabbe5ab 100644
> > --- a/src/box/applier.cc
> > +++ b/src/box/applier.cc
> > @@ -48,6 +48,12 @@
> >  #include "session.h"
> >  #include "cfg.h"
> >  #include "box.h"
> > +#include "txn.h"
> 
> I thought we agreed to use box API, not txn API? 

I don't think it's a good idea to use any public box API functions, such
as box_txn_begin and box_txn_commit, in internals, because they might
have some fool-proof checks we don't need.

> > +static int
> > +applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)
> > +{
> > +	int res = 0;
> > +	struct txn *txn = NULL;
> > +	struct xrow_header *row = first_row;
> > +	if (first_row != last_row)
> > +		txn = txn_begin(false);
> 
> Shouldn't it be box_txn_begin()?

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [tarantool-patches] Re: [PATCH 3/3] Transaction support for applier
       [not found]       ` <20190305112333.GA30697@chai>
@ 2019-03-05 11:27         ` Vladimir Davydov
  0 siblings, 0 replies; 15+ messages in thread
From: Vladimir Davydov @ 2019-03-05 11:27 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

On Tue, Mar 05, 2019 at 02:23:33PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [19/03/05 13:32]:
> > > I thought we agreed to use box API, not txn API? 
> > 
> > I don't think it's a good idea to use any public box API functions, such
> > as box_txn_begin and box_txn_commit, in internals, because they might
> > have some fool-proof checks we don't need.
> 
> Umh, okay

Forwarding to the list for the record.

^ permalink raw reply	[flat|nested] 15+ messages in thread

* Re: [tarantool-patches] [PATCH 3/3] Transaction support for applier
  2019-03-03 20:26 ` [tarantool-patches] [PATCH 3/3] Transaction support for applier Georgy Kirichenko
                     ` (3 preceding siblings ...)
  2019-03-05  9:28   ` Konstantin Osipov
@ 2019-03-05 11:59   ` Vladimir Davydov
  4 siblings, 0 replies; 15+ messages in thread
From: Vladimir Davydov @ 2019-03-05 11:59 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Sun, Mar 03, 2019 at 11:26:18PM +0300, Georgy Kirichenko wrote:
> Applier fetch incoming rows to form a transaction and then apply it.
> In case of replication all local changes moved to an journal entry
> tail to form a separate transaction (like autonomous transaction)
> to be able to replicate changes back so applier assumes that transactions
> could not be mixed in a replication stream.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 243 ++++++++++++++++++++------
>  src/box/txn.c                         |  21 ++-
>  src/box/txn.h                         |   4 +
>  test/replication/transaction.result   | 240 +++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  5 files changed, 534 insertions(+), 60 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 3222b041d..dfabbe5ab 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,12 @@
>  #include "session.h"
>  #include "cfg.h"
>  #include "box.h"
> +#include "txn.h"
> +
> +enum {
> +	/* Initial capacity of rows array. */
> +	APPLIER_TX_INITIAL_ROW_COUNT = 16,
> +};

I don't think it makes sense to make this a file-wide constant. After
all, it's only used in applier_read_tx so let's please move it there or,
even better, use 16 directly in region_alloc - it should be clear what
it means.

>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -380,6 +386,176 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Read one transaction from network using applier's input buffer.
> + * Transaction rows are placed onto fiber gc region.
> + * We could not use applier input buffer for that because rpos is adjusted
> + * after each xrow decoding and corresponding network input space is going
> + * to be reused.
> + *
> + * Return count of transaction rows and put row's header pointers into rows
> + * array.
> + */
> +static int
> +applier_read_tx(struct applier *applier, struct xrow_header **rows)
> +{
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t tsn = 0;
> +	int row_capacity = APPLIER_TX_INITIAL_ROW_COUNT;
> +	struct xrow_header *first_row, *row;
> +	first_row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +						       row_capacity *
> +						       sizeof(struct xrow_header));
> +	if (first_row == NULL) {
> +		diag_set(OutOfMemory, sizeof(struct xrow_header) * row_capacity,
> +			 "region", "struct xrow_header");
> +		goto error;
> +	}
> +	row = first_row;
> +
> +	do {
> +		if (row == first_row + row_capacity) {
> +			/* Realloc rows array. */
> +			row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +								 row_capacity *
> +								 sizeof(struct xrow_header) << 1);

This line is too long. Please make sure your code fits in ~80
characters.

> +			if (row == NULL) {
> +				diag_set(OutOfMemory,
> +					 sizeof(struct xrow_header) *
> +					 row_capacity << 1,
> +					 "region", "struct xrow_header");
> +				goto error;
> +			}
> +			memcpy(row, first_row, row_capacity *
> +					       sizeof(struct xrow_header) << 1);

I agree with Kostja - it's worth introducing region_realloc for this.
Please do in a separate patch. I bet there are other places in the code
that do exactly the same thing. It would be nice if you patched those as
well.

> +			first_row = row;
> +			row = first_row + row_capacity;
> +			row_capacity <<= 1;
> +		}

Please move the code below this point to a new helper function,
applier_read_row. It will make the code easier to follow and also
reduce the indentation level.

> +
> +		double timeout = replication_disconnect_timeout();
> +		/*
> +		 * Unfortunately we do not have C-version of coio read xrow
> +		 * functions yet so use try-catch guard as workaround.
> +		 */

Why return -1 at all in this function when you can simply pass the
exception to the caller? Let's please use exceptions in this code for
now, otherwise it looks inconsistent - sometimes we return -1, sometimes
we use exceptions for error propagation. We'll remove all exceptions at
once when we convert applier.cc to C.

> +		try {
> +			/*
> +			 * Tarantool < 1.7.7 does not send periodic heartbeat
> +			 * messages so we can't assume that if we haven't heard
> +			 * from the master for quite a while the connection is
> +			 * broken - the master might just be idle.
> +			 */
> +			if (applier->version_id < version_id(1, 7, 7))
> +				coio_read_xrow(coio, ibuf, row);
> +			else
> +				coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
> +		} catch (...) {
> +			goto error;
> +		}
> +
> +		if (iproto_type_is_error(row->type)) {
> +			xrow_decode_error(row);
> +			goto error;
> +		}
> +
> +		/* Replication request. */
> +		if (row->replica_id == REPLICA_ID_NIL ||
> +		    row->replica_id >= VCLOCK_MAX) {
> +			/*
> +			 * A safety net, this can only occur
> +			 * if we're fed a strangely broken xlog.
> +			 */
> +			diag_set(ClientError, ER_UNKNOWN_REPLICA,
> +				 int2str(row->replica_id),
> +				 tt_uuid_str(&REPLICASET_UUID));
> +			goto error;
> +		}
> +		if (row == first_row) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			tsn = row->tsn;
> +			if (row->lsn != tsn) {
> +				/* There is not a first row in the transactions. */

This comment is useless as it simply reiterates the error message.

> +				diag_set(ClientError, ER_PROTOCOL,
> +					 "Not a first row in a transaction");

I don't much like the error message. It's too specific and, at the
same time, doesn't say much about what happened. Maybe we'd better
rewrite it as

	"Inconsistent transaction stream"

?

> +				goto error;
> +			}
> +		}
> +		if (tsn != row->tsn) {
> +			/* We are not able to handle interleaving transactions. */

Again, a useless comment.

> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",

s/replications/replication

> +				 "interleaving transactions");
> +			goto error;
> +		}
> +
> +
> +		applier->lag = ev_now(loop()) - row->tm;
> +		applier->last_row_time = ev_monotonic_now(loop());
> +
> +		if (row->body->iov_base != NULL) {
> +			/* Save row bodies to gc region. */
> +			void *new_base = region_alloc(&fiber()->gc,
> +						      row->body->iov_len);
> +			if (new_base == NULL) {
> +				diag_set(OutOfMemory, row->body->iov_len,
> +					 "slab", "xrow_data");
> +				goto error;
> +			}
> +			memcpy(new_base, row->body->iov_base, row->body->iov_len);

I assume there can't be more than one iov in the body in this case, but
still, assuming this implicitly in the code looks fragile. Let's please
either add an assertion or loop over all iovs.

Also, I think it's worth factoring out this function to xrow.c
(xrow_dup_body or something).

> +			/* Adjust row body pointers. */
> +			row->body->iov_base = new_base;
> +		}
> +
> +	} while (row->is_commit == 0 && ++row);

Nit: is_commit is a bool so s/row->is_commit == 0/!row->is_commit/

> +
> +	*rows = first_row;
> +	return row - first_row + 1;
> +error:
> +	return -1;
> +}
> +
> +static int
> +applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)
> +{
> +	int res = 0;
> +	struct txn *txn = NULL;
> +	struct xrow_header *row = first_row;
> +	if (first_row != last_row)
> +		txn = txn_begin(false);

Shouldn't we return immediately with an error if txn_begin returned
NULL?

> +	while (row <= last_row && res == 0) {
> +		res = apply_row(row);
> +		struct error *e;
> +		if (res != 0 &&
> +		    (e = diag_last_error(diag_get()))->type ==
> +			    &type_ClientError &&

Ouch, this hurts my eyes. Please rewrite it as it originally was:

	if (res != 0) {
		struct error *e = ...
		if (e->type == ...) {
		}
	}

> +		    box_error_code(e) == ER_TUPLE_FOUND &&
> +		    replication_skip_conflict) {
> +			/*
> +			 * In case of ER_TUPLE_FOUND error and enabled
> +			 * replication_skip_conflict configuration
> +			 * option, skip applying the foreign row and
> +			 * replace it with NOP in the local write ahead
> +			 * log.
> +			 */

I wonder if we should skip the whole conflicting transaction rather than
just one row in this case...

> +			diag_clear(diag_get());
> +			(row)->type = IPROTO_NOP;
> +			(row)->bodycnt = 0;

Nit: pointless parentheses around 'row'.

What about row->tsn? Shouldn't we set it, too?

> +			res = apply_row(row);
> +		}
> +		++row;
> +	}
> +	if (res == 0 && txn != NULL)
> +		res = txn_commit(txn);
> +	if (res != 0)
> +		txn_rollback();
> +	return res;
> +}
> +
>  /**
>   * Execute and process SUBSCRIBE request (follow updates from a master).
>   */
> @@ -509,36 +685,14 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Tarantool < 1.7.7 does not send periodic heartbeat
> -		 * messages so we can't assume that if we haven't heard
> -		 * from the master for quite a while the connection is
> -		 * broken - the master might just be idle.
> -		 */
> -		if (applier->version_id < version_id(1, 7, 7)) {
> -			coio_read_xrow(coio, ibuf, &row);
> -		} else {
> -			double timeout = replication_disconnect_timeout();
> -			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> -		}
> +		struct xrow_header *tx_rows;
> +		int row_count = applier_read_tx(applier, &tx_rows);
> +		if (row_count < 0)
> +			diag_raise();
>  
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row);  /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> -
> -		applier->lag = ev_now(loop()) - row.tm;
> +		applier->lag = ev_now(loop()) - (tx_rows + row_count - 1)->tm;
>  		applier->last_row_time = ev_monotonic_now(loop());

Hmm, why? Isn't it enough to update those in applier_read_tx?

> -		struct replica *replica = replica_by_id(row.replica_id);
> +		struct replica *replica = replica_by_id(tx_rows->replica_id);
>  		struct latch *latch = (replica ? &replica->order_latch :
>  				       &replicaset.applier.order_latch);
>  		/*
> @@ -548,33 +702,12 @@ applier_subscribe(struct applier *applier)
>  		 * that belong to the same server id.
>  		 */
>  		latch_lock(latch);
> -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			int res = apply_row(&row);
> -			if (res != 0) {
> -				struct error *e = diag_last_error(diag_get());
> -				/*
> -				 * In case of ER_TUPLE_FOUND error and enabled
> -				 * replication_skip_conflict configuration
> -				 * option, skip applying the foreign row and
> -				 * replace it with NOP in the local write ahead
> -				 * log.
> -				 */
> -				if (e->type == &type_ClientError &&
> -				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict) {
> -					diag_clear(diag_get());
> -					struct xrow_header nop;
> -					nop.type = IPROTO_NOP;
> -					nop.bodycnt = 0;
> -					nop.replica_id = row.replica_id;
> -					nop.lsn = row.lsn;
> -					res = apply_row(&nop);
> -				}
> -			}
> -			if (res != 0) {
> -				latch_unlock(latch);
> -				diag_raise();
> -			}
> +		if (vclock_get(&replicaset.vclock,
> +			       tx_rows->replica_id) < tx_rows->lsn &&
> +		    applier_apply_tx(tx_rows, tx_rows + row_count - 1) != 0) {
> +			latch_unlock(latch);
> +			fiber_gc();
> +			diag_raise();

Why do you call fiber_gc() on this particular error, but don't in other
cases? Let's please instead add a guard to the loop body that would call
fiber_gc both on exception and when an iteration is complete.

>  		}
>  		latch_unlock(latch);
>  
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7900fb3ab..f6bf72d0c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
>  #include "journal.h"
>  #include <fiber.h>
>  #include "xrow.h"
> +#include "replication.h"
>  
>  double too_long_threshold;
>  
> @@ -141,6 +142,7 @@ txn_begin(bool is_autocommit)
>  	/* Initialize members explicitly to save time on memset() */
>  	stailq_create(&txn->stmts);
>  	txn->n_rows = 0;
> +	txn->n_remote_rows = 0;
>  	txn->is_autocommit = is_autocommit;
>  	txn->has_triggers  = false;
>  	txn->is_aborted = false;
> @@ -233,6 +235,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
>  	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
>  		if (txn_add_redo(stmt, request) != 0)
>  			goto fail;
> +		if (stmt->row->replica_id != 0 &&
> +		    stmt->row->replica_id != instance_id)
> +			++txn->n_remote_rows;
>  		++txn->n_rows;
>  	}
>  	/*
> @@ -271,14 +276,20 @@ txn_write_to_wal(struct txn *txn)
>  		return -1;
>  
>  	struct txn_stmt *stmt;
> -	struct xrow_header **row = req->rows;
> +	struct xrow_header **remote_row = req->rows;
> +	struct xrow_header **local_row = req->rows + txn->n_remote_rows;
>  	stailq_foreach_entry(stmt, &txn->stmts, next) {
>  		if (stmt->row == NULL)
>  			continue; /* A read (e.g. select) request */
> -		*row++ = stmt->row;
> +		if (stmt->row->replica_id != 0 &&
> +		    stmt->row->replica_id != instance_id)
> +			*remote_row++ = stmt->row;
> +		else
> +			*local_row++ = stmt->row;

Please write a comment explaining what you are doing here and why it's
necessary.

>  		req->approx_len += xrow_approx_len(stmt->row);
>  	}
> -	assert(row == req->rows + req->n_rows);
> +	assert(remote_row == req->rows + txn->n_remote_rows);
> +	assert(local_row == req->rows + req->n_rows);
>  
>  	ev_tstamp start = ev_monotonic_now(loop());
>  	int64_t res = journal_write(req);
> @@ -399,8 +410,6 @@ txn_rollback()
>  		txn_stmt_unref_tuples(stmt);
>  
>  	TRASH(txn);
> -	/** Free volatile txn memory. */
> -	fiber_gc();
>  	fiber_set_txn(fiber(), NULL);
>  }
>  
> @@ -480,6 +489,8 @@ box_txn_rollback()
>  		return -1;
>  	}
>  	txn_rollback(); /* doesn't throw */
> +	/** Free volatile txn memory. */
> +	fiber_gc();

I assume this is a follow-up for 77fa1736dbb9 ("box: factor fiber_gc
out of txn_commit"). Please do this in a separate patch with a proper
justification.

>  	return 0;
>  }
>  
> diff --git a/src/box/txn.h b/src/box/txn.h
> index de5cb0de4..2791fdf73 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -142,6 +142,10 @@ struct txn {
>  	struct stailq stmts;
>  	/** Total number of WAL rows in this txn. */
>  	int n_rows;
> +	/**
> +	 * Count of rows generated on a remote replica.
> +	 */
> +	int n_remote_rows;
>  	/**
>  	 * True if this transaction is running in autocommit mode
>  	 * (statement end causes an automatic transaction commit).
> diff --git a/test/replication/transaction.result b/test/replication/transaction.result
> new file mode 100644
> index 000000000..009f84430
> --- /dev/null
> +++ b/test/replication/transaction.result
> @@ -0,0 +1,240 @@
> +env = require('test_run')
> +---
> +...
> +test_run = env.new()
> +---
> +...
> +box.schema.user.grant('guest', 'replication')
> +---
> +...
> +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> +---
> +...
> +_ = s:create_index('pk')
> +---
> +...
> +-- transaction w/o conflict
> +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
> +---
> +...
> +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
> +---
> +- true
> +...
> +test_run:cmd("start server replica")
> +---
> +- true
> +...
> +test_run:cmd("switch replica")
> +---
> +- true
> +...
> +-- insert a conflicting row
> +box.space.test:replace({4, 'r'})
> +---
> +- [4, 'r']
> +...
> +v1 = box.info.vclock
> +---
> +...
> +test_run:cmd("switch default")
> +---
> +- true
> +...
> +-- create a two-row transaction with conflicting second
> +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
> +---
> +...
> +-- create a third transaction
> +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
> +---
> +...
> +test_run:cmd("switch replica")
> +---
> +- true
> +...
> +-- nothing was applied
> +v1[1] == box.info.vclock[1]
> +---
> +- true
> +...
> +box.space.test:select()
> +---
> +- - [1, 'm']
> +  - [2, 'm']
> +  - [4, 'r']
> +...
> +-- check replication status
> +box.info.replication[1].upstream.status
> +---
> +- stopped
> +...
> +box.info.replication[1].upstream.message
> +---
> +- Duplicate key exists in unique index 'pk' in space 'test'
> +...
> +-- set conflict to third transaction
> +box.space.test:delete({3})
> +---
> +...
> +box.space.test:replace({6, 'r'})
> +---
> +- [6, 'r']
> +...
> +-- restart replication
> +replication = box.cfg.replication
> +---
> +...
> +box.cfg{replication = {}}
> +---
> +...
> +box.cfg{replication = replication}
> +---
> +...
> +-- replication stopped of third transaction
> +v1[1] + 2 == box.info.vclock[1]
> +---
> +- false

false?

> diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
> new file mode 100644
> index 000000000..47003c644
> --- /dev/null
> +++ b/test/replication/transaction.test.lua
> @@ -0,0 +1,86 @@
> +env = require('test_run')
> +test_run = env.new()
> +box.schema.user.grant('guest', 'replication')
> +
> +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> +_ = s:create_index('pk')
> +
> +-- transaction w/o conflict
> +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
> +
> +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
> +test_run:cmd("start server replica")
> +test_run:cmd("switch replica")
> +
> +-- insert a conflicting row
> +box.space.test:replace({4, 'r'})
> +v1 = box.info.vclock
> +
> +test_run:cmd("switch default")
> +-- create a two-row transaction with conflicting second
> +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
> +-- create a third transaction
> +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
> +
> +test_run:cmd("switch replica")
> +-- nothing was applied
> +v1[1] == box.info.vclock[1]
> +box.space.test:select()
> +-- check replication status
> +box.info.replication[1].upstream.status
> +box.info.replication[1].upstream.message
> +-- set conflict to third transaction
> +box.space.test:delete({3})
> +box.space.test:replace({6, 'r'})
> +-- restart replication
> +replication = box.cfg.replication
> +box.cfg{replication = {}}
> +box.cfg{replication = replication}
> +-- replication stopped of third transaction
> +v1[1] + 2 == box.info.vclock[1]
> +box.space.test:select()
> +-- check replication status
> +box.info.replication[1].upstream.status
> +box.info.replication[1].upstream.message
> +
> +-- check restart does not help
> +test_run:cmd("switch default")
> +test_run:cmd("restart server replica")
> +test_run:cmd("switch replica")
> +
> +box.space.test:select()
> +-- set skip conflict rows and check that non-conflicting were applied
> +replication = box.cfg.replication
> +box.cfg{replication = {}, replication_skip_conflict = true}
> +box.cfg{replication = replication}
> +
> +-- check last transaction applied without conflicting row
> +box.space.test:select()
> +box.info.replication[1].upstream.status
> +
> +-- make some new conflicting rows with skip-conflicts
> +box.space.test:replace({8, 'r'})
> +box.space.test:replace({9, 'r'})
> +
> +-- issue a conflicting tx
> +test_run:cmd("switch default")
> +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
> +
> +test_run:cmd("switch replica")
> +-- vclock should be increased but rows skipped
> +box.space.test:select()
> +
> +-- check restart does not change something
> +test_run:cmd("switch default")
> +test_run:cmd("restart server replica")
> +test_run:cmd("switch replica")
> +
> +box.space.test:select()
> +box.info.replication[1].upstream.status
> +
> +test_run:cmd("switch default")
> +test_run:cmd("stop server replica")
> +test_run:cmd("cleanup server replica")

You need to delete the replica here and call cleanup_cluster on the
default instance. Please take a look at how other replication tests do
cleanup.

> +
> +box.schema.user.revoke('guest', 'replication')
> +s:drop()

^ permalink raw reply	[flat|nested] 15+ messages in thread

end of thread, other threads:[~2019-03-05 11:59 UTC | newest]

Thread overview: 15+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-03-03 20:26 [tarantool-patches] [PATCH 0/3] Transaction boundaries for applier Georgy Kirichenko
2019-03-03 20:26 ` [tarantool-patches] [PATCH 1/3] Applier gets rid of a xstream Georgy Kirichenko
2019-03-05  8:52   ` [tarantool-patches] " Konstantin Osipov
2019-03-05 10:19   ` [tarantool-patches] " Vladimir Davydov
2019-03-03 20:26 ` [tarantool-patches] [PATCH 2/3] Merge apply row and apply_initial_join_row Georgy Kirichenko
2019-03-05  9:06   ` [tarantool-patches] " Konstantin Osipov
2019-03-05 10:26   ` [tarantool-patches] " Vladimir Davydov
2019-03-03 20:26 ` [tarantool-patches] [PATCH 3/3] Transaction support for applier Georgy Kirichenko
2019-03-05  9:11   ` [tarantool-patches] " Konstantin Osipov
2019-03-05 10:28     ` Vladimir Davydov
     [not found]       ` <20190305112333.GA30697@chai>
2019-03-05 11:27         ` Vladimir Davydov
2019-03-05  9:13   ` Konstantin Osipov
2019-03-05  9:25   ` Konstantin Osipov
2019-03-05  9:28   ` Konstantin Osipov
2019-03-05 11:59   ` [tarantool-patches] " Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox