[Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches

Serge Petrenko sergepetrenko at tarantool.org
Mon May 18 15:24:05 MSK 2020


Make relay accumulate all the tx rows coming from recovery into a linked
list and send them as a batch.

This enables relay to perform transaction-wide manipulations before
sending rows to the replica.

Add one such manipulation right away: when a transaction ends with a
local row, that local row carries the is_commit flag. However, local
rows are not replicated, so the replica never receives the is_commit
marker and raises `ER_UNSUPPORTED: replication does not support
interleaving transactions`, which breaks replication.

Now relay sets is_commit = true on the last global row of the
transaction it is about to send.

Closes #4928
---
 src/box/relay.cc                              | 116 ++++++++++++---
 test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++
 .../gh-4928-tx-boundaries.test.lua            |  61 ++++++++
 test/replication/suite.cfg                    |   1 +
 4 files changed, 293 insertions(+), 17 deletions(-)
 create mode 100644 test/replication/gh-4928-tx-boundaries.result
 create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..17b7bc667 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -177,7 +177,7 @@ relay_send(struct relay *relay, struct xrow_header *packet);
 static void
 relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
 static void
-relay_send_row(struct xstream *stream, struct xrow_header *row);
+relay_collect_tx(struct xstream *stream, struct xrow_header *row);
 
 struct relay *
 relay_new(struct replica *replica)
@@ -355,7 +355,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 	if (relay == NULL)
 		diag_raise();
 
-	relay_start(relay, fd, sync, relay_send_row);
+	relay_start(relay, fd, sync, relay_collect_tx);
 	auto relay_guard = make_scoped_guard([=] {
 		relay_stop(relay);
 		relay_delete(relay);
@@ -705,7 +705,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 			diag_raise();
 	}
 
-	relay_start(relay, fd, sync, relay_send_row);
+	relay_start(relay, fd, sync, relay_collect_tx);
 	auto relay_guard = make_scoped_guard([=] {
 		relay_stop(relay);
 		replica_on_relay_stop(replica);
@@ -754,11 +754,59 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
-/** Send a single row to the client. */
+/** A row of the transaction that is being collected. */
+struct relay_tx_row {
+	struct rlist link;
+	struct xrow_header row;
+};
+
+/** Write all the collected rows of a transaction to the replica. */
 static void
-relay_send_row(struct xstream *stream, struct xrow_header *packet)
+relay_send_tx(struct relay *relay, struct rlist *tx_rows)
+{
+	ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
+
+	struct relay_tx_row *tx_row;
+	rlist_foreach_entry(tx_row, tx_rows, link) {
+		tx_row->row.sync = relay->sync;
+		coio_write_xrow(&relay->io, &tx_row->row);
+	}
+	relay->last_row_time = ev_monotonic_now(loop());
+
+	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+	if (inj != NULL && inj->dparam > 0)
+		fiber_sleep(inj->dparam);
+}
+
+/**
+ * Send the collected transaction and reset @a tx_rows. The list
+ * is reset even if sending throws, so it never keeps pointers to
+ * rows whose region memory is going to be freed.
+ */
+static void
+relay_flush_tx(struct relay *relay, struct rlist *tx_rows)
+{
+	auto reset_guard = make_scoped_guard([=] {
+		rlist_create(tx_rows);
+	});
+	relay_send_tx(relay, tx_rows);
+	/* Free all the relay_tx_rows allocated on the fiber region. */
+	fiber_gc();
+}
+
+/**
+ * Collect all the rows belonging to a single transaction and
+ * send them at once.
+ */
+static void
+relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
+	/*
+	 * Relays run in separate cords (threads), so a plain static
+	 * list would be shared by all of them at once.
+	 */
+	static thread_local RLIST_HEAD(tx_rows);
 	assert(iproto_type_is_dml(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
@@ -770,8 +818,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		 * order to correctly promote the vclock on the
 		 * replica.
 		 */
-		if (packet->replica_id == REPLICA_ID_NIL)
+		if (packet->replica_id == REPLICA_ID_NIL) {
+			/*
+			 * Make sure is_commit flag from the
+			 * local row makes it to the replica,
+			 * in case the transaction is not fully
+			 * local.
+			 */
+			if (packet->is_commit && !rlist_empty(&tx_rows)) {
+				rlist_last_entry(&tx_rows, struct relay_tx_row,
+						 link)->row.is_commit = true;
+				relay_flush_tx(relay, &tx_rows);
+			}
 			return;
+		}
 		packet->type = IPROTO_NOP;
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
@@ -791,17 +851,39 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 	 * it). In the latter case packet's LSN is less than or equal to
 	 * local master's LSN at the moment it received 'SUBSCRIBE' request.
 	 */
-	if (relay->replica == NULL ||
-	    packet->replica_id != relay->replica->id ||
-	    packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
-				      packet->replica_id)) {
-		struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
-					    ERRINJ_INT);
-		if (inj != NULL && packet->lsn == inj->iparam) {
-			packet->lsn = inj->iparam - 1;
-			say_warn("injected broken lsn: %lld",
-				 (long long) packet->lsn);
-		}
+	if (relay->replica != NULL && packet->replica_id == relay->replica->id &&
+	    packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
+				     packet->replica_id)) {
+		return;
+	}
+
+	struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
+	if (inj != NULL && packet->lsn == inj->iparam) {
+		packet->lsn = inj->iparam - 1;
+		say_warn("injected broken lsn: %lld",
+			 (long long) packet->lsn);
+	}
+
+	/* A short path for single-statement transactions. */
+	if (packet->is_commit && rlist_empty(&tx_rows)) {
 		relay_send(relay, packet);
+		return;
+	}
+
+	struct relay_tx_row *tx_row = (struct relay_tx_row *)
+		region_alloc(&fiber()->gc, sizeof(*tx_row));
+	if (tx_row == NULL) {
+		tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
+			  "struct relay_tx_row");
+	}
+	/*
+	 * Only the header is copied. NOTE(review): this relies on
+	 * the row body staying valid until the transaction is sent.
+	 */
+	tx_row->row = *packet;
+	rlist_add_tail_entry(&tx_rows, tx_row, link);
+
+	if (packet->is_commit) {
+		relay_flush_tx(relay, &tx_rows);
 	}
 }
diff --git a/test/replication/gh-4928-tx-boundaries.result b/test/replication/gh-4928-tx-boundaries.result
new file mode 100644
index 000000000..89b569e65
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.result
@@ -0,0 +1,132 @@
+-- test-run result file version 2
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+bit = require('bit')
+ | ---
+ | ...
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+box.schema.space.create('glob')
+ | ---
+ | - engine: memtx
+ |   before_replace: 'function: 0x010392fcb0'
+ |   on_replace: 'function: 0x010392fc78'
+ |   ck_constraint: []
+ |   field_count: 0
+ |   temporary: false
+ |   index: []
+ |   is_local: false
+ |   enabled: false
+ |   name: glob
+ |   id: 512
+ | - created
+ | ...
+box.schema.space.create('loc', {is_local=true})
+ | ---
+ | - engine: memtx
+ |   before_replace: 'function: 0x010383f3b0'
+ |   on_replace: 'function: 0x01033dcad8'
+ |   ck_constraint: []
+ |   field_count: 0
+ |   temporary: false
+ |   index: []
+ |   is_local: true
+ |   enabled: false
+ |   name: loc
+ |   id: 513
+ | - created
+ | ...
+box.space.glob:create_index('pk')
+ | ---
+ | - unique: true
+ |   parts:
+ |   - type: unsigned
+ |     is_nullable: false
+ |     fieldno: 1
+ |   id: 0
+ |   space_id: 512
+ |   type: TREE
+ |   name: pk
+ | ...
+box.space.loc:create_index('pk')
+ | ---
+ | - unique: true
+ |   parts:
+ |   - type: unsigned
+ |     is_nullable: false
+ |     fieldno: 1
+ |   id: 0
+ |   space_id: 513
+ |   type: TREE
+ |   name: pk
+ | ...
+
+function gen_mixed_tx(i)\
+    box.begin()\
+    if bit.band(i, 1) ~= 0 then\
+        box.space.glob:insert{10 * i + 1}\
+    else\
+        box.space.loc:insert{10 * i + 1}\
+    end\
+    if bit.band(i, 2) ~= 0 then\
+        box.space.glob:insert{10 * i + 2}\
+    else\
+        box.space.loc:insert{10 * i + 2}\
+    end\
+    if bit.band(i, 4) ~= 0 then\
+        box.space.glob:insert{10 * i + 3}\
+    else\
+        box.space.loc:insert{10 * i + 3}\
+    end\
+    box.commit()\
+end
+ | ---
+ | ...
+
+test_run:cmd("create server replica with rpl_master=default,\
+             script='replication/replica.lua'")
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_downstream(2, {status='follow'})
+ | ---
+ | - true
+ | ...
+
+for i = 0, 7 do gen_mixed_tx(i) end
+ | ---
+ | ...
+
+box.info.replication[2].status
+ | ---
+ | - null
+ | ...
+
+vclock = box.info.vclock
+ | ---
+ | ...
+vclock[0] = nil
+ | ---
+ | ...
+test_run:wait_vclock("replica", vclock)
+ | 
+
+test_run:cmd('switch replica')
+ | [Lost current connection]
+
+box.info.status
+ | [Lost current connection]
+box.info.replication[1].upstream.status
diff --git a/test/replication/gh-4928-tx-boundaries.test.lua b/test/replication/gh-4928-tx-boundaries.test.lua
new file mode 100644
index 000000000..07599c27f
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.test.lua
@@ -0,0 +1,61 @@
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+test_run = env.new()
+bit = require('bit')
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('glob')
+_ = box.schema.space.create('loc', {is_local=true})
+_ = box.space.glob:create_index('pk')
+_ = box.space.loc:create_index('pk')
+
+function gen_mixed_tx(i)\
+    box.begin()\
+    if bit.band(i, 1) ~= 0 then\
+        box.space.glob:insert{10 * i + 1}\
+    else\
+        box.space.loc:insert{10 * i + 1}\
+    end\
+    if bit.band(i, 2) ~= 0 then\
+        box.space.glob:insert{10 * i + 2}\
+    else\
+        box.space.loc:insert{10 * i + 2}\
+    end\
+    if bit.band(i, 4) ~= 0 then\
+        box.space.glob:insert{10 * i + 3}\
+    else\
+        box.space.loc:insert{10 * i + 3}\
+    end\
+    box.commit()\
+end
+
+test_run:cmd("create server replica with rpl_master=default,\
+             script='replication/replica.lua'")
+test_run:cmd('start server replica')
+test_run:wait_downstream(2, {status='follow'})
+
+for i = 0, 7 do gen_mixed_tx(i) end
+
+box.info.replication[2].downstream.status
+
+vclock = box.info.vclock
+vclock[0] = nil
+test_run:wait_vclock("replica", vclock)
+
+test_run:cmd('switch replica')
+
+box.info.status
+box.info.replication[1].upstream.status
+
+box.space.glob:select{}
+
+test_run:cmd('switch default')
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.schema.user.revoke('guest', 'replication')
+box.space.loc:drop()
+box.space.glob:drop()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index d2743b5ed..f357b07da 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -18,6 +18,7 @@
     "gh-4606-admin-creds.test.lua": {},
     "gh-4739-vclock-assert.test.lua": {},
     "gh-4730-applier-rollback.test.lua": {},
+    "gh-4928-tx-boundaries.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.24.2 (Apple Git-127)



More information about the Tarantool-patches mailing list