[Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
Serge Petrenko
sergepetrenko at tarantool.org
Mon May 18 15:28:46 MSK 2020
On 18.05.2020 15:24, Serge Petrenko wrote:
> Make relay accumulate all the tx rows coming from recovery into a linked
> list and send them as a batch.
>
> This enables relay to make transaction-wide manipulations before sending
> rows to replica.
>
> Add one such manipulation right away: when a transaction ends with a
> local row, it is that local row which carries the is_commit flag. However,
> local rows are not replicated, so the replica never gets an is_commit
> marker and raises `ER_UNSUPPORTED: replication does not support
> interleaving transactions`, breaking replication.
>
> Now the relay assigns is_commit = true to the last global row it is
> about to send.
>
> Closes #4928
> ---
> src/box/relay.cc | 98 ++++++++++---
> test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++
> .../gh-4928-tx-boundaries.test.lua | 61 ++++++++
> test/replication/suite.cfg | 1 +
> 4 files changed, 275 insertions(+), 17 deletions(-)
> create mode 100644 test/replication/gh-4928-tx-boundaries.result
> create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua
Sorry for the mess; the correct patch is below:
src/box/relay.cc | 98 ++++++++++---
test/replication/gh-4928-tx-boundaries.result | 138 ++++++++++++++++++
.../gh-4928-tx-boundaries.test.lua | 61 ++++++++
test/replication/suite.cfg | 1 +
4 files changed, 281 insertions(+), 17 deletions(-)
create mode 100644 test/replication/gh-4928-tx-boundaries.result
create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..17b7bc667 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -177,7 +177,7 @@ relay_send(struct relay *relay, struct xrow_header *packet);
static void
relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
static void
-relay_send_row(struct xstream *stream, struct xrow_header *row);
+relay_collect_tx(struct xstream *stream, struct xrow_header *packet);
struct relay *
relay_new(struct replica *replica)
@@ -355,7 +355,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
if (relay == NULL)
diag_raise();
- relay_start(relay, fd, sync, relay_send_row);
+ relay_start(relay, fd, sync, relay_collect_tx);
auto relay_guard = make_scoped_guard([=] {
relay_stop(relay);
relay_delete(relay);
@@ -705,7 +705,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
diag_raise();
}
- relay_start(relay, fd, sync, relay_send_row);
+ relay_start(relay, fd, sync, relay_collect_tx);
auto relay_guard = make_scoped_guard([=] {
relay_stop(relay);
replica_on_relay_stop(replica);
@@ -754,11 +754,45 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
relay_send(relay, row);
}
-/** Send a single row to the client. */
+/** A transaction row waiting to be sent by relay_send_tx(). */
+struct relay_tx_row {
+ struct rlist link;
+ struct xrow_header row;
+};
+
static void
-relay_send_row(struct xstream *stream, struct xrow_header *packet)
+relay_send_tx(struct relay *relay, struct rlist *tx_rows)
+{
+ ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
+
+ /* The rows live on the fiber region: drop them even on error. */
+ auto tx_guard = make_scoped_guard([=] {
+ rlist_create(tx_rows);
+ fiber_gc();
+ });
+ struct relay_tx_row *tx_row;
+ rlist_foreach_entry(tx_row, tx_rows, link) {
+ tx_row->row.sync = relay->sync;
+ coio_write_xrow(&relay->io, &tx_row->row);
+ }
+ relay->last_row_time = ev_monotonic_now(loop());
+
+ struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+ if (inj != NULL && inj->dparam > 0)
+ fiber_sleep(inj->dparam);
+}
+
+/**
+ * Collect the rows of one transaction and send them at once.
+ */
+static void
+relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
+ /*
+ * Per-thread list: a plain static one is shared by all relays.
+ */
+ static thread_local RLIST_HEAD(tx_rows);
assert(iproto_type_is_dml(packet->type));
if (packet->group_id == GROUP_LOCAL) {
/*
@@ -770,8 +804,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
* order to correctly promote the vclock on the
* replica.
*/
- if (packet->replica_id == REPLICA_ID_NIL)
+ if (packet->replica_id == REPLICA_ID_NIL) {
+ /*
+ * Make sure is_commit flag from the
+ * local row makes it to the replica,
+ * in case the transaction is not fully
+ * local.
+ */
+ if (packet->is_commit && !rlist_empty(&tx_rows)) {
+ rlist_last_entry(&tx_rows, struct relay_tx_row,
+ link)->row.is_commit = true;
+ relay_send_tx(relay, &tx_rows);
+ }
return;
+ }
packet->type = IPROTO_NOP;
packet->group_id = GROUP_DEFAULT;
packet->bodycnt = 0;
@@ -791,17 +837,35 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
* it). In the latter case packet's LSN is less than or equal to
* local master's LSN at the moment it received 'SUBSCRIBE' request.
*/
- if (relay->replica == NULL ||
- packet->replica_id != relay->replica->id ||
- packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
- packet->replica_id)) {
- struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
- ERRINJ_INT);
- if (inj != NULL && packet->lsn == inj->iparam) {
- packet->lsn = inj->iparam - 1;
- say_warn("injected broken lsn: %lld",
- (long long) packet->lsn);
- }
+ if (relay->replica != NULL &&
+ packet->replica_id == relay->replica->id &&
+ packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
+ packet->replica_id))
+ return;
+
+ struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
+ if (inj != NULL && packet->lsn == inj->iparam) {
+ packet->lsn = inj->iparam - 1;
+ say_warn("injected broken lsn: %lld",
+ (long long) packet->lsn);
+ }
+
+ /* A short path for single-statement transactions. */
+ if (packet->is_commit && rlist_empty(&tx_rows)) {
relay_send(relay, packet);
+ return;
+ }
+
+ struct relay_tx_row *tx_row = (struct relay_tx_row *)
+ region_alloc(&fiber()->gc, sizeof(*tx_row));
+ if (tx_row == NULL) {
+ tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
+ "struct relay_tx_row");
+ }
+ tx_row->row = *packet;
+ rlist_add_tail_entry(&tx_rows, tx_row, link);
+
+ if (packet->is_commit) {
+ relay_send_tx(relay, &tx_rows);
}
}
diff --git a/test/replication/gh-4928-tx-boundaries.result b/test/replication/gh-4928-tx-boundaries.result
new file mode 100644
index 000000000..969bd8438
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.result
@@ -0,0 +1,138 @@
+-- test-run result file version 2
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+bit = require('bit')
+ | ---
+ | ...
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('glob')
+ | ---
+ | ...
+_ = box.schema.space.create('loc', {is_local=true})
+ | ---
+ | ...
+_ = box.space.glob:create_index('pk')
+ | ---
+ | ...
+_ = box.space.loc:create_index('pk')
+ | ---
+ | ...
+
+function gen_mixed_tx(i)\
+ box.begin()\
+ if bit.band(i, 1) ~= 0 then\
+ box.space.glob:insert{10 * i + 1}\
+ else\
+ box.space.loc:insert{10 * i + 1}\
+ end\
+ if bit.band(i, 2) ~= 0 then\
+ box.space.glob:insert{10 * i + 2}\
+ else\
+ box.space.loc:insert{10 * i + 2}\
+ end\
+ if bit.band(i, 4) ~= 0 then\
+ box.space.glob:insert{10 * i + 3}\
+ else\
+ box.space.loc:insert{10 * i + 3}\
+ end\
+ box.commit()\
+end
+ | ---
+ | ...
+
+test_run:cmd("create server replica with rpl_master=default,\
+ script='replication/replica.lua'")
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_downstream(2, {status='follow'})
+ | ---
+ | - true
+ | ...
+
+for i = 0, 7 do gen_mixed_tx(i) end
+ | ---
+ | ...
+
+vclock = box.info.vclock
+ | ---
+ | ...
+vclock[0] = nil
+ | ---
+ | ...
+test_run:wait_vclock("replica", vclock)
+ | ---
+ | ...
+
+box.info.replication[2].downstream.status
+ | ---
+ | - follow
+ | ...
+
+test_run:cmd('switch replica')
+ | ---
+ | - true
+ | ...
+
+box.info.status
+ | ---
+ | - running
+ | ...
+box.info.replication[1].upstream.status
+ | ---
+ | - follow
+ | ...
+
+box.space.glob:select{}
+ | ---
+ | - - [11]
+ | - [22]
+ | - [31]
+ | - [32]
+ | - [43]
+ | - [51]
+ | - [53]
+ | - [62]
+ | - [63]
+ | - [71]
+ | - [72]
+ | - [73]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+box.space.loc:drop()
+ | ---
+ | ...
+box.space.glob:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-4928-tx-boundaries.test.lua b/test/replication/gh-4928-tx-boundaries.test.lua
new file mode 100644
index 000000000..92526fc51
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.test.lua
@@ -0,0 +1,61 @@
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+test_run = env.new()
+bit = require('bit')
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('glob')
+_ = box.schema.space.create('loc', {is_local=true})
+_ = box.space.glob:create_index('pk')
+_ = box.space.loc:create_index('pk')
+
+function gen_mixed_tx(i)\
+ box.begin()\
+ if bit.band(i, 1) ~= 0 then\
+ box.space.glob:insert{10 * i + 1}\
+ else\
+ box.space.loc:insert{10 * i + 1}\
+ end\
+ if bit.band(i, 2) ~= 0 then\
+ box.space.glob:insert{10 * i + 2}\
+ else\
+ box.space.loc:insert{10 * i + 2}\
+ end\
+ if bit.band(i, 4) ~= 0 then\
+ box.space.glob:insert{10 * i + 3}\
+ else\
+ box.space.loc:insert{10 * i + 3}\
+ end\
+ box.commit()\
+end
+
+test_run:cmd("create server replica with rpl_master=default,\
+ script='replication/replica.lua'")
+test_run:cmd('start server replica')
+test_run:wait_downstream(2, {status='follow'})
+
+for i = 0, 7 do gen_mixed_tx(i) end
+
+vclock = box.info.vclock
+vclock[0] = nil
+test_run:wait_vclock("replica", vclock)
+
+box.info.replication[2].downstream.status
+
+test_run:cmd('switch replica')
+
+box.info.status
+box.info.replication[1].upstream.status
+
+box.space.glob:select{}
+
+test_run:cmd('switch default')
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.schema.user.revoke('guest', 'replication')
+box.space.loc:drop()
+box.space.glob:drop()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index d2743b5ed..f357b07da 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -18,6 +18,7 @@
"gh-4606-admin-creds.test.lua": {},
"gh-4739-vclock-assert.test.lua": {},
"gh-4730-applier-rollback.test.lua": {},
+ "gh-4928-tx-boundaries.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.24.2 (Apple Git-127)
--
Serge Petrenko
More information about the Tarantool-patches
mailing list