[Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches

Serge Petrenko sergepetrenko at tarantool.org
Mon May 18 15:28:46 MSK 2020


18.05.2020 15:24, Serge Petrenko пишет:
> Make relay accumulate all the tx rows coming from recovery into a linked
> list and send them as a batch.
>
> This enables relay to make transaction-wide manipulations before sending
> rows to replica.
>
> Add one such manipulation right away: when a transaction ends with a
> local row, the local row has is_commit flag set to true. However, local
> rows are not replicated, so the replica never gets an is_commit marker
> and yields an error `ER_UNSUPPORTED: replication does not support
> interleaving transactions`, breaking replication.
>
> Now relay assigns is_commit = true to the last global row it's
> about to send.
>
> Closes #4928
> ---
>   src/box/relay.cc                              |  98 ++++++++++---
>   test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++
>   .../gh-4928-tx-boundaries.test.lua            |  61 ++++++++
>   test/replication/suite.cfg                    |   1 +
>   4 files changed, 275 insertions(+), 17 deletions(-)
>   create mode 100644 test/replication/gh-4928-tx-boundaries.result
>   create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua

Sorry for the mess, the correct patch is below:


  src/box/relay.cc                              |  98 ++++++++++---
  test/replication/gh-4928-tx-boundaries.result | 138 ++++++++++++++++++
  .../gh-4928-tx-boundaries.test.lua            |  61 ++++++++
  test/replication/suite.cfg                    |   1 +
  4 files changed, 281 insertions(+), 17 deletions(-)
  create mode 100644 test/replication/gh-4928-tx-boundaries.result
  create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..17b7bc667 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -177,7 +177,7 @@ relay_send(struct relay *relay, struct xrow_header 
*packet);
  static void
  relay_send_initial_join_row(struct xstream *stream, struct xrow_header 
*row);
  static void
-relay_send_row(struct xstream *stream, struct xrow_header *row);
+relay_collect_tx(struct xstream *stream, struct xrow_header *row);

  struct relay *
  relay_new(struct replica *replica)
@@ -355,7 +355,7 @@ relay_final_join(int fd, uint64_t sync, struct 
vclock *start_vclock,
      if (relay == NULL)
          diag_raise();

-    relay_start(relay, fd, sync, relay_send_row);
+    relay_start(relay, fd, sync, relay_collect_tx);
      auto relay_guard = make_scoped_guard([=] {
          relay_stop(relay);
          relay_delete(relay);
@@ -705,7 +705,7 @@ relay_subscribe(struct replica *replica, int fd, 
uint64_t sync,
              diag_raise();
      }

-    relay_start(relay, fd, sync, relay_send_row);
+    relay_start(relay, fd, sync, relay_collect_tx);
      auto relay_guard = make_scoped_guard([=] {
          relay_stop(relay);
          replica_on_relay_stop(replica);
@@ -754,11 +754,39 @@ relay_send_initial_join_row(struct xstream 
*stream, struct xrow_header *row)
          relay_send(relay, row);
  }

-/** Send a single row to the client. */
+struct relay_tx_row {
+    struct rlist link;
+    struct xrow_header row;
+};
+
  static void
-relay_send_row(struct xstream *stream, struct xrow_header *packet)
+relay_send_tx(struct relay *relay, struct rlist *tx_rows)
+{
+    ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
+
+    struct relay_tx_row *tx_row;
+    rlist_foreach_entry(tx_row, tx_rows, link) {
+        tx_row->row.sync = relay->sync;
+        coio_write_xrow(&relay->io, &tx_row->row);
+    }
+    relay->last_row_time = ev_monotonic_now(loop());
+
+    struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+    if (inj != NULL && inj->dparam > 0)
+        fiber_sleep(inj->dparam);
+}
+
+/**
+ * Collect all the rows belonging to a single transaction and
+ * send them at once.
+ */
+static void
+relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
  {
      struct relay *relay = container_of(stream, struct relay, stream);
+    static RLIST_HEAD(tx_rows);
+    struct errinj *inj;
+    struct relay_tx_row *tx_row;
      assert(iproto_type_is_dml(packet->type));
      if (packet->group_id == GROUP_LOCAL) {
          /*
@@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct 
xrow_header *packet)
           * order to correctly promote the vclock on the
           * replica.
           */
-        if (packet->replica_id == REPLICA_ID_NIL)
+        if (packet->replica_id == REPLICA_ID_NIL) {
+            /*
+             * Make sure is_commit flag from the
+             * local row makes it to the replica,
+             * in case the transaction is not fully
+             * local.
+             */
+            if (packet->is_commit && !rlist_empty(&tx_rows)) {
+                rlist_last_entry(&tx_rows, struct relay_tx_row,
+                         link)->row.is_commit = true;
+                goto write;
+            }
              return;
+        }
          packet->type = IPROTO_NOP;
          packet->group_id = GROUP_DEFAULT;
          packet->bodycnt = 0;
@@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct 
xrow_header *packet)
       * it). In the latter case packet's LSN is less than or equal to
       * local master's LSN at the moment it received 'SUBSCRIBE' request.
       */
-    if (relay->replica == NULL ||
-        packet->replica_id != relay->replica->id ||
-        packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
-                      packet->replica_id)) {
-        struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
-                        ERRINJ_INT);
-        if (inj != NULL && packet->lsn == inj->iparam) {
-            packet->lsn = inj->iparam - 1;
-            say_warn("injected broken lsn: %lld",
-                 (long long) packet->lsn);
-        }
+    if (relay->replica != NULL && packet->replica_id == 
relay->replica->id &&
+        packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
+                     packet->replica_id)) {
+        return;
+    }
+
+    inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
+    if (inj != NULL && packet->lsn == inj->iparam) {
+        packet->lsn = inj->iparam - 1;
+        say_warn("injected broken lsn: %lld",
+             (long long) packet->lsn);
+    }
+
+    /* A short path for single-statement transactions. */
+    if (packet->is_commit && rlist_empty(&tx_rows)) {
          relay_send(relay, packet);
+        return;
+    }
+
+    tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc,
+                             sizeof(*tx_row));
+    if (tx_row == NULL) {
+        tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
+              "struct relay_tx_row");
+    }
+    tx_row->row = *packet;
+    rlist_add_tail_entry(&tx_rows, tx_row, link);
+
+    if (packet->is_commit) {
+write:        relay_send_tx(relay, &tx_rows);
+        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
+        /*
+         * Free all the relay_tx_rows allocated on the
+         * fiber region.
+         */
+        fiber_gc();
      }
  }
diff --git a/test/replication/gh-4928-tx-boundaries.result 
b/test/replication/gh-4928-tx-boundaries.result
new file mode 100644
index 000000000..969bd8438
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.result
@@ -0,0 +1,138 @@
+-- test-run result file version 2
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+bit = require('bit')
+ | ---
+ | ...
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('glob')
+ | ---
+ | ...
+_ = box.schema.space.create('loc', {is_local=true})
+ | ---
+ | ...
+_ = box.space.glob:create_index('pk')
+ | ---
+ | ...
+_ = box.space.loc:create_index('pk')
+ | ---
+ | ...
+
+function gen_mixed_tx(i)\
+    box.begin()\
+    if bit.band(i, 1) ~= 0 then\
+        box.space.glob:insert{10 * i + 1}\
+    else\
+        box.space.loc:insert{10 * i + 1}\
+    end\
+    if bit.band(i, 2) ~= 0 then\
+        box.space.glob:insert{10 * i + 2}\
+    else\
+        box.space.loc:insert{10 * i + 2}\
+    end\
+    if bit.band(i, 4) ~= 0 then\
+        box.space.glob:insert{10 * i + 3}\
+    else\
+        box.space.loc:insert{10 * i + 3}\
+    end\
+    box.commit()\
+end
+ | ---
+ | ...
+
+test_run:cmd("create server replica with rpl_master=default,\
+             script='replication/replica.lua'")
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_downstream(2, {status='follow'})
+ | ---
+ | - true
+ | ...
+
+for i = 0, 7 do gen_mixed_tx(i) end
+ | ---
+ | ...
+
+box.info.replication[2].status
+ | ---
+ | - null
+ | ...
+
+vclock = box.info.vclock
+ | ---
+ | ...
+vclock[0] = nil
+ | ---
+ | ...
+test_run:wait_vclock("replica", vclock)
+ | ---
+ | ...
+
+test_run:cmd('switch replica')
+ | ---
+ | - true
+ | ...
+
+box.info.status
+ | ---
+ | - running
+ | ...
+box.info.replication[1].upstream.status
+ | ---
+ | - follow
+ | ...
+
+box.space.glob:select{}
+ | ---
+ | - - [11]
+ |   - [22]
+ |   - [31]
+ |   - [32]
+ |   - [43]
+ |   - [51]
+ |   - [53]
+ |   - [62]
+ |   - [63]
+ |   - [71]
+ |   - [72]
+ |   - [73]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+box.space.loc:drop()
+ | ---
+ | ...
+box.space.glob:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-4928-tx-boundaries.test.lua 
b/test/replication/gh-4928-tx-boundaries.test.lua
new file mode 100644
index 000000000..92526fc51
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.test.lua
@@ -0,0 +1,61 @@
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+test_run = env.new()
+bit = require('bit')
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('glob')
+_ = box.schema.space.create('loc', {is_local=true})
+_ = box.space.glob:create_index('pk')
+_ = box.space.loc:create_index('pk')
+
+function gen_mixed_tx(i)\
+    box.begin()\
+    if bit.band(i, 1) ~= 0 then\
+        box.space.glob:insert{10 * i + 1}\
+    else\
+        box.space.loc:insert{10 * i + 1}\
+    end\
+    if bit.band(i, 2) ~= 0 then\
+        box.space.glob:insert{10 * i + 2}\
+    else\
+        box.space.loc:insert{10 * i + 2}\
+    end\
+    if bit.band(i, 4) ~= 0 then\
+        box.space.glob:insert{10 * i + 3}\
+    else\
+        box.space.loc:insert{10 * i + 3}\
+    end\
+    box.commit()\
+end
+
+test_run:cmd("create server replica with rpl_master=default,\
+             script='replication/replica.lua'")
+test_run:cmd('start server replica')
+test_run:wait_downstream(2, {status='follow'})
+
+for i = 0, 7 do gen_mixed_tx(i) end
+
+box.info.replication[2].status
+
+vclock = box.info.vclock
+vclock[0] = nil
+test_run:wait_vclock("replica", vclock)
+
+test_run:cmd('switch replica')
+
+box.info.status
+box.info.replication[1].upstream.status
+
+box.space.glob:select{}
+
+test_run:cmd('switch default')
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.schema.user.revoke('guest', 'replication')
+box.space.loc:drop()
+box.space.glob:drop()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index d2743b5ed..f357b07da 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -18,6 +18,7 @@
      "gh-4606-admin-creds.test.lua": {},
      "gh-4739-vclock-assert.test.lua": {},
      "gh-4730-applier-rollback.test.lua": {},
+    "gh-4928-tx-boundaries.test.lua": {},
      "*": {
          "memtx": {"engine": "memtx"},
          "vinyl": {"engine": "vinyl"}
-- 
2.24.2 (Apple Git-127)

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list