Tarantool development patches archive
* [Tarantool-patches] [PATCH 0/2] fix replication tx boundaries after local space rework
@ 2020-05-18 12:24 Serge Petrenko
  2020-05-18 12:24 ` [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries Serge Petrenko
  2020-05-18 12:24 ` [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches Serge Petrenko
  0 siblings, 2 replies; 11+ messages in thread
From: Serge Petrenko @ 2020-05-18 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

https://github.com/tarantool/tarantool/issues/4928
https://github.com/tarantool/tarantool/tree/sp/gh-4928-tx-boundary-fix

The patchset fixes 2 errors in replication resulting from trying to replicate
transactions mixing local and global space requests.

The first patch fixes an error when a local row is the first in tx and the
second patch fixes an error when a local row is the last one in tx.
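
For orientation, here is a rough sketch (not part of the patchset) of which
rows the regression test added in patch 2/2 is expected to replicate. It
mirrors the bit pattern of gen_mixed_tx(i) from the test: bit k of i set means
statement k + 1 goes to the global space with key 10 * i + k + 1. The function
name sketch_global_keys is hypothetical. By my reading, i = 2, 4, 6 start with
a local row followed by a global one (patch 1), and i = 1, 2, 3 end with a
local row preceded by a global one (patch 2).

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper: list the keys gen_mixed_tx(0..7) inserts
 * into the global space, in insertion order. Returns the number
 * of keys written to keys[].
 */
static size_t
sketch_global_keys(int *keys, size_t capacity)
{
	size_t n = 0;
	for (int i = 0; i <= 7; i++) {
		for (int bit = 0; bit < 3; bit++) {
			/* A cleared bit means a local space insert. */
			if ((i & (1 << bit)) == 0)
				continue;
			assert(n < capacity);
			keys[n++] = 10 * i + bit + 1;
		}
	}
	return n;
}
```

The keys produced this way are the ones the replica should return from
box.space.glob:select{} once both fixes are applied.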

Serge Petrenko (2):
  wal: fix tx boundaries
  replication: make relay send txs in batches

 src/box/relay.cc                              |  98 ++++++++++---
 src/box/wal.c                                 |  26 +++-
 test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++
 .../gh-4928-tx-boundaries.test.lua            |  61 ++++++++
 test/replication/suite.cfg                    |   1 +
 5 files changed, 297 insertions(+), 21 deletions(-)
 create mode 100644 test/replication/gh-4928-tx-boundaries.result
 create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua

-- 
2.24.2 (Apple Git-127)


* [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries
  2020-05-18 12:24 [Tarantool-patches] [PATCH 0/2] fix replication tx boundaries after local space rework Serge Petrenko
@ 2020-05-18 12:24 ` Serge Petrenko
  2020-05-19  9:08   ` Cyrill Gorcunov
  2020-05-18 12:24 ` [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches Serge Petrenko
  1 sibling, 1 reply; 11+ messages in thread
From: Serge Petrenko @ 2020-05-18 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

In order to preserve transaction boundaries in replication protocol, wal
assigns each tx row a transaction sequence number (tsn). Tsn is equal to
the lsn of the first transaction row.

Starting with commit 7eb4650eecf1ac382119d0038076c19b2708f4a1, local
space requests are assigned a special replica id, 0, and have their own
lsns. These operations are not replicated.

If a transaction starting with a local space operation ends up in the
WAL, it gets a tsn equal to the lsn of the local space request. Then,
when such a transaction is replicated, the local space request is
omitted, and the replica receives the global part of the transaction
with a seemingly random tsn, yielding an ER_PROTOCOL error:
"Transaction id must be equal to LSN of the first row in the transaction".

Assign tsn as equal to the lsn of the first global row in the
transaction to fix the problem, and assign tsn as before for fully local
transactions.

Follow-up #4114
Part-of #4928
---
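
A minimal standalone sketch of the tsn rule described above, for illustration
only. It is not the wal.c code: struct sketch_row and sketch_assign_tsn are
hypothetical stand-ins for struct xrow_header and wal_assign_lsn(), and lsns
are assumed to be assigned already.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for struct xrow_header. */
struct sketch_row {
	bool is_local;   /* true for a GROUP_LOCAL row */
	int64_t lsn;     /* assumed to be assigned by the caller */
	int64_t tsn;     /* filled in by sketch_assign_tsn() */
	bool is_commit;  /* filled in by sketch_assign_tsn() */
};

/*
 * Assign a tsn to every row of one transaction: the lsn of the
 * first global row, or the lsn of the first row when the
 * transaction is fully local.
 */
static void
sketch_assign_tsn(struct sketch_row *rows, size_t count)
{
	if (count == 0)
		return;
	/* Fallback for fully local transactions. */
	int64_t tsn = rows[0].lsn;
	for (size_t i = 0; i < count; i++) {
		if (!rows[i].is_local) {
			tsn = rows[i].lsn;
			break;
		}
	}
	for (size_t i = 0; i < count; i++) {
		rows[i].tsn = tsn;
		rows[i].is_commit = (i == count - 1);
	}
}
```

With rows {local, lsn 5}, {global, lsn 10}, {global, lsn 11} every row gets
tsn 10, so the first row the replica sees has lsn equal to tsn.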
 src/box/wal.c | 26 ++++++++++++++++++++++----
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index b979244e3..d36d4259e 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -956,24 +956,33 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	       struct xrow_header **end)
 {
 	int64_t tsn = 0;
+	struct xrow_header **start = row;
+	struct xrow_header **first_glob_row = end;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
 			/*
 			 * All rows representing local space data
-			 * manipulations are signed wth a zero
+			 * manipulations are signed with a zero
 			 * instance id. This is also true for
 			 * anonymous replicas, since they are
 			 * only capable of writing to local and
 			 * temporary spaces.
 			 */
-			if ((*row)->group_id != GROUP_LOCAL)
+			if ((*row)->group_id != GROUP_LOCAL) {
 				(*row)->replica_id = instance_id;
+			}
 
 			(*row)->lsn = vclock_inc(vclock_diff, (*row)->replica_id) +
 				      vclock_get(base, (*row)->replica_id);
-			/* Use lsn of the first local row as transaction id. */
-			tsn = tsn == 0 ? (*row)->lsn : tsn;
+			/*
+			 * Use lsn of the first global row as
+			 * transaction id.
+			 */
+			if ((*row)->group_id != GROUP_LOCAL && tsn == 0) {
+				tsn = (*row)->lsn;
+				first_glob_row = row;
+			}
 			(*row)->tsn = tsn;
 			(*row)->is_commit = row == end - 1;
 		} else {
@@ -993,6 +1002,15 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 			}
 		}
 	}
+	if (tsn == 0)
+		tsn = (*start)->lsn;
+	/*
+	 * Fill transaction id for all the local rows preceding
+	 * the first global row. tsn was yet unknown when those
+	 * rows were processed.
+	 */
+	for (row = start; row < first_glob_row; row++)
+		(*row)->tsn = tsn;
 }
 
 static void
-- 
2.24.2 (Apple Git-127)


* [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
  2020-05-18 12:24 [Tarantool-patches] [PATCH 0/2] fix replication tx boundaries after local space rework Serge Petrenko
  2020-05-18 12:24 ` [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries Serge Petrenko
@ 2020-05-18 12:24 ` Serge Petrenko
  2020-05-18 12:28   ` Serge Petrenko
  1 sibling, 1 reply; 11+ messages in thread
From: Serge Petrenko @ 2020-05-18 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

Make relay accumulate all the tx rows coming from recovery into a linked
list and send them as a batch.

This enables relay to make transaction-wide manipulations before sending
rows to replica.

Add one such manipulation right away: when a transaction ends with a
local row, the local row has is_commit flag set to true. However, local
rows are not replicated, so the replica never gets an is_commit marker
and yields an error `ER_UNSUPPORTED: replication does not support
interleaving transactions`, breaking replication.

Now relay assigns is_commit = true to the last global row it's
about to send.

Closes #4928
---
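
A simplified sketch of the batching idea, for illustration only. The real
patch keeps rows in an rlist allocated on the fiber region; here a fixed-size
array is assumed instead, NOP substitution for local rows is left out, and all
names (sketch_row, sketch_batch, sketch_collect, SKETCH_TX_MAX) are
hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { SKETCH_TX_MAX = 16 };

/* Hypothetical, simplified stand-in for struct xrow_header. */
struct sketch_row {
	bool is_local;   /* true for a GROUP_LOCAL row */
	int64_t lsn;
	bool is_commit;
};

/* Rows of the current transaction that will be sent. */
struct sketch_batch {
	struct sketch_row rows[SKETCH_TX_MAX];
	size_t count;
};

/*
 * Feed one row of a transaction. Returns true when the batch is
 * complete and the caller should send rows[0 .. count) and then
 * reset count to 0. Local rows are never queued; a committing
 * local row moves the is_commit flag to the last queued row.
 */
static bool
sketch_collect(struct sketch_batch *batch, const struct sketch_row *row)
{
	if (row->is_local) {
		if (row->is_commit && batch->count > 0) {
			batch->rows[batch->count - 1].is_commit = true;
			return true;
		}
		/* Nothing to send yet, or the tx is fully local. */
		return false;
	}
	assert(batch->count < SKETCH_TX_MAX);
	batch->rows[batch->count++] = *row;
	return row->is_commit;
}
```

For a transaction made of a global row followed by a committing local row, the
batch ends up holding one row with is_commit set, which is what the replica
needs to close the transaction.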
 src/box/relay.cc                              |  98 ++++++++++---
 test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++
 .../gh-4928-tx-boundaries.test.lua            |  61 ++++++++
 test/replication/suite.cfg                    |   1 +
 4 files changed, 275 insertions(+), 17 deletions(-)
 create mode 100644 test/replication/gh-4928-tx-boundaries.result
 create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..17b7bc667 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -177,7 +177,7 @@ relay_send(struct relay *relay, struct xrow_header *packet);
 static void
 relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
 static void
-relay_send_row(struct xstream *stream, struct xrow_header *row);
+relay_collect_tx(struct xstream *stream, struct xrow_header *row);
 
 struct relay *
 relay_new(struct replica *replica)
@@ -355,7 +355,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 	if (relay == NULL)
 		diag_raise();
 
-	relay_start(relay, fd, sync, relay_send_row);
+	relay_start(relay, fd, sync, relay_collect_tx);
 	auto relay_guard = make_scoped_guard([=] {
 		relay_stop(relay);
 		relay_delete(relay);
@@ -705,7 +705,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 			diag_raise();
 	}
 
-	relay_start(relay, fd, sync, relay_send_row);
+	relay_start(relay, fd, sync, relay_collect_tx);
 	auto relay_guard = make_scoped_guard([=] {
 		relay_stop(relay);
 		replica_on_relay_stop(replica);
@@ -754,11 +754,39 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
-/** Send a single row to the client. */
+struct relay_tx_row {
+	struct rlist link;
+	struct xrow_header row;
+};
+
 static void
-relay_send_row(struct xstream *stream, struct xrow_header *packet)
+relay_send_tx(struct relay *relay, struct rlist *tx_rows)
+{
+	ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
+
+	struct relay_tx_row *tx_row;
+	rlist_foreach_entry(tx_row, tx_rows, link) {
+		tx_row->row.sync = relay->sync;
+		coio_write_xrow(&relay->io, &tx_row->row);
+	}
+	relay->last_row_time = ev_monotonic_now(loop());
+
+	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+	if (inj != NULL && inj->dparam > 0)
+		fiber_sleep(inj->dparam);
+}
+
+/**
+ * Collect all the rows belonging to a single transaction and
+ * send them at once.
+ */
+static void
+relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
+	static RLIST_HEAD(tx_rows);
+	struct errinj *inj;
+	struct relay_tx_row *tx_row;
 	assert(iproto_type_is_dml(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
@@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		 * order to correctly promote the vclock on the
 		 * replica.
 		 */
-		if (packet->replica_id == REPLICA_ID_NIL)
+		if (packet->replica_id == REPLICA_ID_NIL) {
+			/*
+			 * Make sure is_commit flag from the
+			 * local row makes it to the replica,
+			 * in case the transaction is not fully
+			 * local.
+			 */
+			if (packet->is_commit && !rlist_empty(&tx_rows)) {
+				rlist_last_entry(&tx_rows, struct relay_tx_row,
+						 link)->row.is_commit = true;
+				goto write;
+			}
 			return;
+		}
 		packet->type = IPROTO_NOP;
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
@@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 	 * it). In the latter case packet's LSN is less than or equal to
 	 * local master's LSN at the moment it received 'SUBSCRIBE' request.
 	 */
-	if (relay->replica == NULL ||
-	    packet->replica_id != relay->replica->id ||
-	    packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
-				      packet->replica_id)) {
-		struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
-					    ERRINJ_INT);
-		if (inj != NULL && packet->lsn == inj->iparam) {
-			packet->lsn = inj->iparam - 1;
-			say_warn("injected broken lsn: %lld",
-				 (long long) packet->lsn);
-		}
+	if (relay->replica != NULL && packet->replica_id == relay->replica->id &&
+	    packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
+				     packet->replica_id)) {
+		return;
+	}
+
+	inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
+	if (inj != NULL && packet->lsn == inj->iparam) {
+		packet->lsn = inj->iparam - 1;
+		say_warn("injected broken lsn: %lld",
+			 (long long) packet->lsn);
+	}
+
+	/* A short path for single-statement transactions. */
+	if (packet->is_commit && rlist_empty(&tx_rows)) {
 		relay_send(relay, packet);
+		return;
+	}
+
+	tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc,
+						     sizeof(*tx_row));
+	if (tx_row == NULL) {
+		tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
+			  "struct relay_tx_row");
+	}
+	tx_row->row = *packet;
+	rlist_add_tail_entry(&tx_rows, tx_row, link);
+
+	if (packet->is_commit) {
+write:		relay_send_tx(relay, &tx_rows);
+		tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
+		/*
+		 * Free all the relay_tx_rows allocated on the
+		 * fiber region.
+		 */
+		fiber_gc();
 	}
 }
diff --git a/test/replication/gh-4928-tx-boundaries.result b/test/replication/gh-4928-tx-boundaries.result
new file mode 100644
index 000000000..89b569e65
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.result
@@ -0,0 +1,132 @@
+-- test-run result file version 2
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+bit = require('bit')
+ | ---
+ | ...
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+box.schema.space.create('glob')
+ | ---
+ | - engine: memtx
+ |   before_replace: 'function: 0x010392fcb0'
+ |   on_replace: 'function: 0x010392fc78'
+ |   ck_constraint: []
+ |   field_count: 0
+ |   temporary: false
+ |   index: []
+ |   is_local: false
+ |   enabled: false
+ |   name: glob
+ |   id: 512
+ | - created
+ | ...
+box.schema.space.create('loc', {is_local=true})
+ | ---
+ | - engine: memtx
+ |   before_replace: 'function: 0x010383f3b0'
+ |   on_replace: 'function: 0x01033dcad8'
+ |   ck_constraint: []
+ |   field_count: 0
+ |   temporary: false
+ |   index: []
+ |   is_local: true
+ |   enabled: false
+ |   name: loc
+ |   id: 513
+ | - created
+ | ...
+box.space.glob:create_index('pk')
+ | ---
+ | - unique: true
+ |   parts:
+ |   - type: unsigned
+ |     is_nullable: false
+ |     fieldno: 1
+ |   id: 0
+ |   space_id: 512
+ |   type: TREE
+ |   name: pk
+ | ...
+box.space.loc:create_index('pk')
+ | ---
+ | - unique: true
+ |   parts:
+ |   - type: unsigned
+ |     is_nullable: false
+ |     fieldno: 1
+ |   id: 0
+ |   space_id: 513
+ |   type: TREE
+ |   name: pk
+ | ...
+
+function gen_mixed_tx(i)\
+    box.begin()\
+    if bit.band(i, 1) ~= 0 then\
+        box.space.glob:insert{10 * i + 1}\
+    else\
+        box.space.loc:insert{10 * i + 1}\
+    end\
+    if bit.band(i, 2) ~= 0 then\
+        box.space.glob:insert{10 * i + 2}\
+    else\
+        box.space.loc:insert{10 * i + 2}\
+    end\
+    if bit.band(i, 4) ~= 0 then\
+        box.space.glob:insert{10 * i + 3}\
+    else\
+        box.space.loc:insert{10 * i + 3}\
+    end\
+    box.commit()\
+end
+ | ---
+ | ...
+
+test_run:cmd("create server replica with rpl_master=default,\
+             script='replication/replica.lua'")
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_downstream(2, {status='follow'})
+ | ---
+ | - true
+ | ...
+
+for i = 0, 7 do gen_mixed_tx(i) end
+ | ---
+ | ...
+
+box.info.replication[2].status
+ | ---
+ | - null
+ | ...
+
+vclock = box.info.vclock
+ | ---
+ | ...
+vclock[0] = nil
+ | ---
+ | ...
+test_run:wait_vclock("replica", vclock)
+ | 
+
+test_run:cmd('switch replica')
+ | [Lost current connection]
+
+box.info.status
+ | [Lost current connection]
+box.info.replication[1].upstream.status
diff --git a/test/replication/gh-4928-tx-boundaries.test.lua b/test/replication/gh-4928-tx-boundaries.test.lua
new file mode 100644
index 000000000..07599c27f
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.test.lua
@@ -0,0 +1,61 @@
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+test_run = env.new()
+bit = require('bit')
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('glob')
+_ = box.schema.space.create('loc', {is_local=true})
+_ = box.space.glob:create_index('pk')
+_ = box.space.loc:create_index('pk')
+
+function gen_mixed_tx(i)\
+    box.begin()\
+    if bit.band(i, 1) ~= 0 then\
+        box.space.glob:insert{10 * i + 1}\
+    else\
+        box.space.loc:insert{10 * i + 1}\
+    end\
+    if bit.band(i, 2) ~= 0 then\
+        box.space.glob:insert{10 * i + 2}\
+    else\
+        box.space.loc:insert{10 * i + 2}\
+    end\
+    if bit.band(i, 4) ~= 0 then\
+        box.space.glob:insert{10 * i + 3}\
+    else\
+        box.space.loc:insert{10 * i + 3}\
+    end\
+    box.commit()\
+end
+
+test_run:cmd("create server replica with rpl_master=default,\
+             script='replication/replica.lua'")
+test_run:cmd('start server replica')
+test_run:wait_downstream(2, {status='follow'})
+
+for i = 0, 7 do gen_mixed_tx(i) end
+
+box.info.replication[2].status
+
+vclock = box.info.vclock
+vclock[0] = nil
+test_run:wait_vclock("replica", vclock)
+
+test_run:cmd('switch replica')
+
+box.info.status
+box.info.replication[1].upstream.status
+
+box.space.test:select{}
+
+test_run:cmd('switch default')
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.schema.user.revoke('guest', 'replication')
+box.space.loc:drop()
+box.space.glob:drop()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index d2743b5ed..f357b07da 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -18,6 +18,7 @@
     "gh-4606-admin-creds.test.lua": {},
     "gh-4739-vclock-assert.test.lua": {},
     "gh-4730-applier-rollback.test.lua": {},
+    "gh-4928-tx-boundaries.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.24.2 (Apple Git-127)


* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
  2020-05-18 12:24 ` [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches Serge Petrenko
@ 2020-05-18 12:28   ` Serge Petrenko
  2020-05-19 10:23     ` Cyrill Gorcunov
  0 siblings, 1 reply; 11+ messages in thread
From: Serge Petrenko @ 2020-05-18 12:28 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches


On 18.05.2020 15:24, Serge Petrenko wrote:
> Make relay accumulate all the tx rows coming from recovery into a linked
> list and send them as a batch.
>
> This enables relay to make transaction-wide manipulations before sending
> rows to replica.
>
> Add one such manipulation right away: when a transaction ends with a
> local row, the local row has is_commit flag set to true. However, local
> rows are not replicated, so the replica never gets an is_commit marker
> and yields an error `ER_UNSUPPORTED: replication does not support
> interleaving transactions`, breaking replication.
>
> Now relay is assigns is_commit = true to the last global row it's
> about to send.
>
> Closes #4928
> ---
>   src/box/relay.cc                              |  98 ++++++++++---
>   test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++
>   .../gh-4928-tx-boundaries.test.lua            |  61 ++++++++
>   test/replication/suite.cfg                    |   1 +
>   4 files changed, 275 insertions(+), 17 deletions(-)
>   create mode 100644 test/replication/gh-4928-tx-boundaries.result
>   create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua

Sorry for the mess, the correct patch is below:


  src/box/relay.cc                              |  98 ++++++++++---
  test/replication/gh-4928-tx-boundaries.result | 138 ++++++++++++++++++
  .../gh-4928-tx-boundaries.test.lua            |  61 ++++++++
  test/replication/suite.cfg                    |   1 +
  4 files changed, 281 insertions(+), 17 deletions(-)
  create mode 100644 test/replication/gh-4928-tx-boundaries.result
  create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..17b7bc667 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -177,7 +177,7 @@ relay_send(struct relay *relay, struct xrow_header *packet);
  static void
  relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
  static void
-relay_send_row(struct xstream *stream, struct xrow_header *row);
+relay_collect_tx(struct xstream *stream, struct xrow_header *row);

  struct relay *
  relay_new(struct replica *replica)
@@ -355,7 +355,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
      if (relay == NULL)
          diag_raise();

-    relay_start(relay, fd, sync, relay_send_row);
+    relay_start(relay, fd, sync, relay_collect_tx);
      auto relay_guard = make_scoped_guard([=] {
          relay_stop(relay);
          relay_delete(relay);
@@ -705,7 +705,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
              diag_raise();
      }

-    relay_start(relay, fd, sync, relay_send_row);
+    relay_start(relay, fd, sync, relay_collect_tx);
      auto relay_guard = make_scoped_guard([=] {
          relay_stop(relay);
          replica_on_relay_stop(replica);
@@ -754,11 +754,39 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
          relay_send(relay, row);
  }

-/** Send a single row to the client. */
+struct relay_tx_row {
+    struct rlist link;
+    struct xrow_header row;
+};
+
  static void
-relay_send_row(struct xstream *stream, struct xrow_header *packet)
+relay_send_tx(struct relay *relay, struct rlist *tx_rows)
+{
+    ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
+
+    struct relay_tx_row *tx_row;
+    rlist_foreach_entry(tx_row, tx_rows, link) {
+        tx_row->row.sync = relay->sync;
+        coio_write_xrow(&relay->io, &tx_row->row);
+    }
+    relay->last_row_time = ev_monotonic_now(loop());
+
+    struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+    if (inj != NULL && inj->dparam > 0)
+        fiber_sleep(inj->dparam);
+}
+
+/**
+ * Collect all the rows belonging to a single transaction and
+ * send them at once.
+ */
+static void
+relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
  {
      struct relay *relay = container_of(stream, struct relay, stream);
+    static RLIST_HEAD(tx_rows);
+    struct errinj *inj;
+    struct relay_tx_row *tx_row;
      assert(iproto_type_is_dml(packet->type));
      if (packet->group_id == GROUP_LOCAL) {
          /*
@@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
           * order to correctly promote the vclock on the
           * replica.
           */
-        if (packet->replica_id == REPLICA_ID_NIL)
+        if (packet->replica_id == REPLICA_ID_NIL) {
+            /*
+             * Make sure is_commit flag from the
+             * local row makes it to the replica,
+             * in case the transaction is not fully
+             * local.
+             */
+            if (packet->is_commit && !rlist_empty(&tx_rows)) {
+                rlist_last_entry(&tx_rows, struct relay_tx_row,
+                         link)->row.is_commit = true;
+                goto write;
+            }
              return;
+        }
          packet->type = IPROTO_NOP;
          packet->group_id = GROUP_DEFAULT;
          packet->bodycnt = 0;
@@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
       * it). In the latter case packet's LSN is less than or equal to
       * local master's LSN at the moment it received 'SUBSCRIBE' request.
       */
-    if (relay->replica == NULL ||
-        packet->replica_id != relay->replica->id ||
-        packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
-                      packet->replica_id)) {
-        struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
-                        ERRINJ_INT);
-        if (inj != NULL && packet->lsn == inj->iparam) {
-            packet->lsn = inj->iparam - 1;
-            say_warn("injected broken lsn: %lld",
-                 (long long) packet->lsn);
-        }
+    if (relay->replica != NULL && packet->replica_id == relay->replica->id &&
+        packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
+                     packet->replica_id)) {
+        return;
+    }
+
+    inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
+    if (inj != NULL && packet->lsn == inj->iparam) {
+        packet->lsn = inj->iparam - 1;
+        say_warn("injected broken lsn: %lld",
+             (long long) packet->lsn);
+    }
+
+    /* A short path for single-statement transactions. */
+    if (packet->is_commit && rlist_empty(&tx_rows)) {
          relay_send(relay, packet);
+        return;
+    }
+
+    tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc,
+                             sizeof(*tx_row));
+    if (tx_row == NULL) {
+        tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
+              "struct relay_tx_row");
+    }
+    tx_row->row = *packet;
+    rlist_add_tail_entry(&tx_rows, tx_row, link);
+
+    if (packet->is_commit) {
+write:        relay_send_tx(relay, &tx_rows);
+        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
+        /*
+         * Free all the relay_tx_rows allocated on the
+         * fiber region.
+         */
+        fiber_gc();
      }
  }
diff --git a/test/replication/gh-4928-tx-boundaries.result b/test/replication/gh-4928-tx-boundaries.result
new file mode 100644
index 000000000..969bd8438
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.result
@@ -0,0 +1,138 @@
+-- test-run result file version 2
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+bit = require('bit')
+ | ---
+ | ...
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('glob')
+ | ---
+ | ...
+_ = box.schema.space.create('loc', {is_local=true})
+ | ---
+ | ...
+_ = box.space.glob:create_index('pk')
+ | ---
+ | ...
+_ = box.space.loc:create_index('pk')
+ | ---
+ | ...
+
+function gen_mixed_tx(i)\
+    box.begin()\
+    if bit.band(i, 1) ~= 0 then\
+        box.space.glob:insert{10 * i + 1}\
+    else\
+        box.space.loc:insert{10 * i + 1}\
+    end\
+    if bit.band(i, 2) ~= 0 then\
+        box.space.glob:insert{10 * i + 2}\
+    else\
+        box.space.loc:insert{10 * i + 2}\
+    end\
+    if bit.band(i, 4) ~= 0 then\
+        box.space.glob:insert{10 * i + 3}\
+    else\
+        box.space.loc:insert{10 * i + 3}\
+    end\
+    box.commit()\
+end
+ | ---
+ | ...
+
+test_run:cmd("create server replica with rpl_master=default,\
+             script='replication/replica.lua'")
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_downstream(2, {status='follow'})
+ | ---
+ | - true
+ | ...
+
+for i = 0, 7 do gen_mixed_tx(i) end
+ | ---
+ | ...
+
+box.info.replication[2].status
+ | ---
+ | - null
+ | ...
+
+vclock = box.info.vclock
+ | ---
+ | ...
+vclock[0] = nil
+ | ---
+ | ...
+test_run:wait_vclock("replica", vclock)
+ | ---
+ | ...
+
+test_run:cmd('switch replica')
+ | ---
+ | - true
+ | ...
+
+box.info.status
+ | ---
+ | - running
+ | ...
+box.info.replication[1].upstream.status
+ | ---
+ | - follow
+ | ...
+
+box.space.glob:select{}
+ | ---
+ | - - [11]
+ |   - [22]
+ |   - [31]
+ |   - [32]
+ |   - [43]
+ |   - [51]
+ |   - [53]
+ |   - [62]
+ |   - [63]
+ |   - [71]
+ |   - [72]
+ |   - [73]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+box.space.loc:drop()
+ | ---
+ | ...
+box.space.glob:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-4928-tx-boundaries.test.lua b/test/replication/gh-4928-tx-boundaries.test.lua
new file mode 100644
index 000000000..92526fc51
--- /dev/null
+++ b/test/replication/gh-4928-tx-boundaries.test.lua
@@ -0,0 +1,61 @@
+-- gh-4928. Test that transactions mixing local and global
+-- space operations are replicated correctly.
+env = require('test_run')
+test_run = env.new()
+bit = require('bit')
+
+-- Init.
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('glob')
+_ = box.schema.space.create('loc', {is_local=true})
+_ = box.space.glob:create_index('pk')
+_ = box.space.loc:create_index('pk')
+
+function gen_mixed_tx(i)\
+    box.begin()\
+    if bit.band(i, 1) ~= 0 then\
+        box.space.glob:insert{10 * i + 1}\
+    else\
+        box.space.loc:insert{10 * i + 1}\
+    end\
+    if bit.band(i, 2) ~= 0 then\
+        box.space.glob:insert{10 * i + 2}\
+    else\
+        box.space.loc:insert{10 * i + 2}\
+    end\
+    if bit.band(i, 4) ~= 0 then\
+        box.space.glob:insert{10 * i + 3}\
+    else\
+        box.space.loc:insert{10 * i + 3}\
+    end\
+    box.commit()\
+end
+
+test_run:cmd("create server replica with rpl_master=default,\
+             script='replication/replica.lua'")
+test_run:cmd('start server replica')
+test_run:wait_downstream(2, {status='follow'})
+
+for i = 0, 7 do gen_mixed_tx(i) end
+
+box.info.replication[2].status
+
+vclock = box.info.vclock
+vclock[0] = nil
+test_run:wait_vclock("replica", vclock)
+
+test_run:cmd('switch replica')
+
+box.info.status
+box.info.replication[1].upstream.status
+
+box.space.glob:select{}
+
+test_run:cmd('switch default')
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.schema.user.revoke('guest', 'replication')
+box.space.loc:drop()
+box.space.glob:drop()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index d2743b5ed..f357b07da 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -18,6 +18,7 @@
      "gh-4606-admin-creds.test.lua": {},
      "gh-4739-vclock-assert.test.lua": {},
      "gh-4730-applier-rollback.test.lua": {},
+    "gh-4928-tx-boundaries.test.lua": {},
      "*": {
          "memtx": {"engine": "memtx"},
          "vinyl": {"engine": "vinyl"}
-- 
2.24.2 (Apple Git-127)

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries
  2020-05-18 12:24 ` [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries Serge Petrenko
@ 2020-05-19  9:08   ` Cyrill Gorcunov
  2020-05-19 10:30     ` Serge Petrenko
  0 siblings, 1 reply; 11+ messages in thread
From: Cyrill Gorcunov @ 2020-05-19  9:08 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy

On Mon, May 18, 2020 at 03:24:04PM +0300, Serge Petrenko wrote:
...
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -956,24 +956,33 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>  	       struct xrow_header **end)
>  {
>  	int64_t tsn = 0;
> +	struct xrow_header **start = row;
> +	struct xrow_header **first_glob_row = end;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
>  		if ((*row)->replica_id == 0) {
>  			/*
>  			 * All rows representing local space data
> -			 * manipulations are signed wth a zero
> +			 * manipulations are signed with a zero
>  			 * instance id. This is also true for
>  			 * anonymous replicas, since they are
>  			 * only capable of writing to local and
>  			 * temporary spaces.
>  			 */
> -			if ((*row)->group_id != GROUP_LOCAL)
> +			if ((*row)->group_id != GROUP_LOCAL) {
>  				(*row)->replica_id = instance_id;
> +			}
>  
>  			(*row)->lsn = vclock_inc(vclock_diff, (*row)->replica_id) +
>  				      vclock_get(base, (*row)->replica_id);
> -			/* Use lsn of the first local row as transaction id. */
> -			tsn = tsn == 0 ? (*row)->lsn : tsn;
> +			/*
> +			 * Use lsn of the first global row as
> +			 * transaction id.
> +			 */
> +			if ((*row)->group_id != GROUP_LOCAL && tsn == 0) {
> +				tsn = (*row)->lsn;
> +				first_glob_row = row;
> +			}
>  			(*row)->tsn = tsn;

1) ^^^

>  			(*row)->is_commit = row == end - 1;
>  		} else {
> @@ -993,6 +1002,15 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>  			}
>  		}
>  	}
> +	if (tsn == 0)
> +		tsn = (*start)->lsn;
> +	/*
> +	 * Fill transaction id for all the local rows preceding
> +	 * the first global row. tsn was yet unknown when those
> +	 * rows were processed.
> +	 */
> +	for (row = start; row < first_glob_row; row++)
> +		(*row)->tsn = tsn;
>  }

Wait, the chunk above: let's assume all the rows are in GROUP_LOCAL. Then
in (1) we have already set tsn to 0 for each of them, and now we walk over
the entries again to set tsn to the first row's lsn. This is ugly as hell,
unless I missed something obvious.

Can't we do something like the below to eliminate the needless second walk
over all the rows? Or am I making a mistake?
---
diff --git a/src/box/wal.c b/src/box/wal.c
index d36d4259e..ef4d84920 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -957,7 +957,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 {
 	int64_t tsn = 0;
 	struct xrow_header **start = row;
-	struct xrow_header **first_glob_row = end;
+	struct xrow_header **first_glob_row = row;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
@@ -981,9 +981,12 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 			 */
 			if ((*row)->group_id != GROUP_LOCAL && tsn == 0) {
 				tsn = (*row)->lsn;
+				/*
+				 * Remember the tail being processed.
+				 */
 				first_glob_row = row;
 			}
-			(*row)->tsn = tsn;
+			(*row)->tsn = tsn == 0 ? (*start)->lsn : tsn;
 			(*row)->is_commit = row == end - 1;
 		} else {
 			int64_t diff = (*row)->lsn - vclock_get(base, (*row)->replica_id);
@@ -1002,8 +1005,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 			}
 		}
 	}
-	if (tsn == 0)
-		tsn = (*start)->lsn;
+
 	/*
 	 * Fill transaction id for all the local rows preceding
 	 * the first global row. tsn was yet unknown when those


* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
  2020-05-18 12:28   ` Serge Petrenko
@ 2020-05-19 10:23     ` Cyrill Gorcunov
  2020-05-19 10:49       ` Serge Petrenko
  0 siblings, 1 reply; 11+ messages in thread
From: Cyrill Gorcunov @ 2020-05-19 10:23 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy

On Mon, May 18, 2020 at 03:28:46PM +0300, Serge Petrenko wrote:
> 
> Sorry for the mess, the correct patch is below:

Spaces/tabs are screwed up (but since we usually fetch from
the branch, this is not critical)

...
> 
> -/** Send a single row to the client. */
> +struct relay_tx_row {
> +    struct rlist link;
> +    struct xrow_header row;
> +};
> +
>  static void
> -relay_send_row(struct xstream *stream, struct xrow_header *packet)
> +relay_send_tx(struct relay *relay, struct rlist *tx_rows)
> +{
> +    ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
> +
> +    struct relay_tx_row *tx_row;
> +    rlist_foreach_entry(tx_row, tx_rows, link) {
> +        tx_row->row.sync = relay->sync;
> +        coio_write_xrow(&relay->io, &tx_row->row);
> +    }
> +    relay->last_row_time = ev_monotonic_now(loop());
> +
> +    struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
> +    if (inj != NULL && inj->dparam > 0)
> +        fiber_sleep(inj->dparam);
> +}
> +
> +/**
> + * Collect all the rows belonging to a single transaction and
> + * send them at once.
> + */
> +static void
> +relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>  {
>      struct relay *relay = container_of(stream, struct relay, stream);
> +    static RLIST_HEAD(tx_rows);
> +    struct errinj *inj;
> +    struct relay_tx_row *tx_row;
>      assert(iproto_type_is_dml(packet->type));
>      if (packet->group_id == GROUP_LOCAL) {
>          /*
> @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>           * order to correctly promote the vclock on the
>           * replica.
>           */
> -        if (packet->replica_id == REPLICA_ID_NIL)
> +        if (packet->replica_id == REPLICA_ID_NIL) {
> +            /*
> +             * Make sure is_commit flag from the
> +             * local row makes it to the replica,
> +             * in case the transaction is not fully
> +             * local.
> +             */
> +            if (packet->is_commit && !rlist_empty(&tx_rows)) {
> +                rlist_last_entry(&tx_rows, struct relay_tx_row,
> +                         link)->row.is_commit = true;
> +                goto write;
> +            }

I guess we can make this simpler

		if (!packet->is_commit || rlist_empty(&tx_rows))
			return;

		rlist_last_entry(&tx_rows, struct relay_tx_row,
			link)->row.is_commit = true;
		goto write;
?

>              return;
> +        }
>          packet->type = IPROTO_NOP;
>          packet->group_id = GROUP_DEFAULT;
>          packet->bodycnt = 0;
> @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>       * it). In the latter case packet's LSN is less than or equal to
>       * local master's LSN at the moment it received 'SUBSCRIBE' request.
>       */
> -    if (relay->replica == NULL ||
> -        packet->replica_id != relay->replica->id ||
> -        packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
> -                      packet->replica_id)) {
> -        struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
> -                        ERRINJ_INT);
> -        if (inj != NULL && packet->lsn == inj->iparam) {
> -            packet->lsn = inj->iparam - 1;
> -            say_warn("injected broken lsn: %lld",
> -                 (long long) packet->lsn);
> -        }
> +    if (relay->replica != NULL && packet->replica_id == relay->replica->id &&
> +        packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
> +                     packet->replica_id)) {
> +        return;
> +    }
> +
> +    inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
> +    if (inj != NULL && packet->lsn == inj->iparam) {
> +        packet->lsn = inj->iparam - 1;
> +        say_warn("injected broken lsn: %lld",
> +             (long long) packet->lsn);
> +    }
> +
> +    /* A short path for single-statement transactions. */
> +    if (packet->is_commit && rlist_empty(&tx_rows)) {
>          relay_send(relay, packet);
> +        return;
> +    }
> +
> +    tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc,
> +                             sizeof(*tx_row));
> +    if (tx_row == NULL) {
> +        tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
> +              "struct relay_tx_row");
> +    }
> +    tx_row->row = *packet;
> +    rlist_add_tail_entry(&tx_rows, tx_row, link);
> +
> +    if (packet->is_commit) {
> +write:        relay_send_tx(relay, &tx_rows);
> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);

Why?! This is a stack-local variable, no?

> +        /*
> +         * Free all the relay_tx_rows allocated on the
> +         * fiber region.
> +         */
> +        fiber_gc();
>      }
>  }


* Re: [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries
  2020-05-19  9:08   ` Cyrill Gorcunov
@ 2020-05-19 10:30     ` Serge Petrenko
  0 siblings, 0 replies; 11+ messages in thread
From: Serge Petrenko @ 2020-05-19 10:30 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tarantool-patches, v.shpilevoy


On 19.05.2020 12:08, Cyrill Gorcunov wrote:
> On Mon, May 18, 2020 at 03:24:04PM +0300, Serge Petrenko wrote:
> ...
>> --- a/src/box/wal.c
>> +++ b/src/box/wal.c
>> @@ -956,24 +956,33 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>>   	       struct xrow_header **end)
>>   {
>>   	int64_t tsn = 0;
>> +	struct xrow_header **start = row;
>> +	struct xrow_header **first_glob_row = end;
>>   	/** Assign LSN to all local rows. */
>>   	for ( ; row < end; row++) {
>>   		if ((*row)->replica_id == 0) {
>>   			/*
>>   			 * All rows representing local space data
>> -			 * manipulations are signed wth a zero
>> +			 * manipulations are signed with a zero
>>   			 * instance id. This is also true for
>>   			 * anonymous replicas, since they are
>>   			 * only capable of writing to local and
>>   			 * temporary spaces.
>>   			 */
>> -			if ((*row)->group_id != GROUP_LOCAL)
>> +			if ((*row)->group_id != GROUP_LOCAL) {
>>   				(*row)->replica_id = instance_id;
>> +			}
>>   
>>   			(*row)->lsn = vclock_inc(vclock_diff, (*row)->replica_id) +
>>   				      vclock_get(base, (*row)->replica_id);
>> -			/* Use lsn of the first local row as transaction id. */
>> -			tsn = tsn == 0 ? (*row)->lsn : tsn;
>> +			/*
>> +			 * Use lsn of the first global row as
>> +			 * transaction id.
>> +			 */
>> +			if ((*row)->group_id != GROUP_LOCAL && tsn == 0) {
>> +				tsn = (*row)->lsn;
>> +				first_glob_row = row;
>> +			}
>>   			(*row)->tsn = tsn;
> 1) ^^^
>
>>   			(*row)->is_commit = row == end - 1;
>>   		} else {
>> @@ -993,6 +1002,15 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>>   			}
>>   		}
>>   	}
>> +	if (tsn == 0)
>> +		tsn = (*start)->lsn;
>> +	/*
>> +	 * Fill transaction id for all the local rows preceding
>> +	 * the first global row. tsn was yet unknown when those
>> +	 * rows were processed.
>> +	 */
>> +	for (row = start; row < first_glob_row; row++)
>> +		(*row)->tsn = tsn;
>>   }
> Wait, about the chunk above: let's assume all rows are in GROUP_LOCAL. Then
> at (1) we have already set tsn to 0 for every row, and now we re-walk the
> entries and set tsn to the first row's lsn. This is ugly as hell, unless
> I've missed something obvious.
>
> Can't we do something like below to eliminate the needless re-walk over
> all rows? Am I making a mistake?
> ---
> diff --git a/src/box/wal.c b/src/box/wal.c
> index d36d4259e..ef4d84920 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -957,7 +957,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>   {
>   	int64_t tsn = 0;
>   	struct xrow_header **start = row;
> -	struct xrow_header **first_glob_row = end;
> +	struct xrow_header **first_glob_row = row;
>   	/** Assign LSN to all local rows. */
>   	for ( ; row < end; row++) {
>   		if ((*row)->replica_id == 0) {
> @@ -981,9 +981,12 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>   			 */
>   			if ((*row)->group_id != GROUP_LOCAL && tsn == 0) {
>   				tsn = (*row)->lsn;
> +				/*
> +				 * Remember the tail being processed.
> +				 */
>   				first_glob_row = row;
>   			}
> -			(*row)->tsn = tsn;
> +			(*row)->tsn = tsn == 0 ? (*start)->lsn : tsn;
>   			(*row)->is_commit = row == end - 1;
>   		} else {
>   			int64_t diff = (*row)->lsn - vclock_get(base, (*row)->replica_id);
> @@ -1002,8 +1005,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>   			}
>   		}
>   	}
> -	if (tsn == 0)
> -		tsn = (*start)->lsn;
> +
>   	/*
>   	 * Fill transaction id for all the local rows preceding
>   	 * the first global row. tsn was yet unknown when those


Hi! Thanks for the review!

You're right, I applied your diff.
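
For readers following the thread, here is a minimal standalone sketch of
the tsn assignment agreed on above. It is not the actual wal_assign_lsn():
struct row, is_local and assign_tsn() are simplified stand-ins introduced
only for illustration, and the sketch assumes lsn values are positive so
that tsn == 0 can mean "not known yet".

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for struct xrow_header (hypothetical fields). */
struct row {
	int64_t lsn;    /* assumed to be assigned already and > 0 */
	int64_t tsn;    /* transaction id to fill in */
	bool is_local;  /* stands for group_id == GROUP_LOCAL */
	bool is_commit;
};

/*
 * Give every row of one transaction the same tsn: the lsn of the
 * first global row, or the lsn of the first row when all rows are
 * local.
 */
static void
assign_tsn(struct row *rows, size_t count)
{
	assert(count > 0);
	int64_t tsn = 0;
	size_t first_glob = 0;
	for (size_t i = 0; i < count; i++) {
		if (!rows[i].is_local && tsn == 0) {
			tsn = rows[i].lsn;
			first_glob = i;
		}
		/* Fall back to the first row's lsn until tsn is known. */
		rows[i].tsn = tsn == 0 ? rows[0].lsn : tsn;
		rows[i].is_commit = i == count - 1;
	}
	/* Re-stamp only the local rows preceding the first global row. */
	for (size_t i = 0; i < first_glob; i++)
		rows[i].tsn = tsn;
}
```

With an all-local transaction the second loop does not run at all, which
is the re-walk the review wanted to avoid; with a mixed transaction it
touches only the leading local rows.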

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
  2020-05-19 10:23     ` Cyrill Gorcunov
@ 2020-05-19 10:49       ` Serge Petrenko
  2020-05-19 11:18         ` Cyrill Gorcunov
  0 siblings, 1 reply; 11+ messages in thread
From: Serge Petrenko @ 2020-05-19 10:49 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tarantool-patches, v.shpilevoy


On 19.05.2020 13:23, Cyrill Gorcunov wrote:
> On Mon, May 18, 2020 at 03:28:46PM +0300, Serge Petrenko wrote:


Hi! Thanks for the review!

>> Sorry for the mess, the correct patch is below:
> Spaces/tabs are screwed up (but since we usually fetch from
> the branch, this is not critical)
>
> ...
Yep, that's because I pasted it from the console, sorry.
>> -/** Send a single row to the client. */
>> +struct relay_tx_row {
>> +    struct rlist link;
>> +    struct xrow_header row;
>> +};
>> +
>>   static void
>> -relay_send_row(struct xstream *stream, struct xrow_header *packet)
>> +relay_send_tx(struct relay *relay, struct rlist *tx_rows)
>> +{
>> +    ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>> +
>> +    struct relay_tx_row *tx_row;
>> +    rlist_foreach_entry(tx_row, tx_rows, link) {
>> +        tx_row->row.sync = relay->sync;
>> +        coio_write_xrow(&relay->io, &tx_row->row);
>> +    }
>> +    relay->last_row_time = ev_monotonic_now(loop());
>> +
>> +    struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
>> +    if (inj != NULL && inj->dparam > 0)
>> +        fiber_sleep(inj->dparam);
>> +}
>> +
>> +/**
>> + * Collect all the rows belonging to a single transaction and
>> + * send them at once.
>> + */
>> +static void
>> +relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>   {
>>       struct relay *relay = container_of(stream, struct relay, stream);
>> +    static RLIST_HEAD(tx_rows);
>> +    struct errinj *inj;
>> +    struct relay_tx_row *tx_row;
>>       assert(iproto_type_is_dml(packet->type));
>>       if (packet->group_id == GROUP_LOCAL) {
>>           /*
>> @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>>            * order to correctly promote the vclock on the
>>            * replica.
>>            */
>> -        if (packet->replica_id == REPLICA_ID_NIL)
>> +        if (packet->replica_id == REPLICA_ID_NIL) {
>> +            /*
>> +             * Make sure is_commit flag from the
>> +             * local row makes it to the replica,
>> +             * in case the transaction is not fully
>> +             * local.
>> +             */
>> +            if (packet->is_commit && !rlist_empty(&tx_rows)) {
>> +                rlist_last_entry(&tx_rows, struct relay_tx_row,
>> +                         link)->row.is_commit = true;
>> +                goto write;
>> +            }
> I guess we can make this simpler
>
> 		if (!packet->is_commit || rlist_empty(&tx_rows))
> 			return;
>
> 		rlist_last_entry(&tx_rows, struct relay_tx_row,
> 			link)->row.is_commit = true;
> 		goto write;
> ?
Yes, we can, thanks. I applied your diff.
>>               return;
>> +        }
>>           packet->type = IPROTO_NOP;
>>           packet->group_id = GROUP_DEFAULT;
>>           packet->bodycnt = 0;
>> @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>>        * it). In the latter case packet's LSN is less than or equal to
>>        * local master's LSN at the moment it received 'SUBSCRIBE' request.
>>        */
>> -    if (relay->replica == NULL ||
>> -        packet->replica_id != relay->replica->id ||
>> -        packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
>> -                      packet->replica_id)) {
>> -        struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
>> -                        ERRINJ_INT);
>> -        if (inj != NULL && packet->lsn == inj->iparam) {
>> -            packet->lsn = inj->iparam - 1;
>> -            say_warn("injected broken lsn: %lld",
>> -                 (long long) packet->lsn);
>> -        }
>> +    if (relay->replica != NULL && packet->replica_id == relay->replica->id &&
>> +        packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
>> +                     packet->replica_id)) {
>> +        return;
>> +    }
>> +
>> +    inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
>> +    if (inj != NULL && packet->lsn == inj->iparam) {
>> +        packet->lsn = inj->iparam - 1;
>> +        say_warn("injected broken lsn: %lld",
>> +             (long long) packet->lsn);
>> +    }
>> +
>> +    /* A short path for single-statement transactions. */
>> +    if (packet->is_commit && rlist_empty(&tx_rows)) {
>>           relay_send(relay, packet);
>> +        return;
>> +    }
>> +
>> +    tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc,
>> +                             sizeof(*tx_row));
>> +    if (tx_row == NULL) {
>> +        tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
>> +              "struct relay_tx_row");
>> +    }
>> +    tx_row->row = *packet;
>> +    rlist_add_tail_entry(&tx_rows, tx_row, link);
>> +
>> +    if (packet->is_commit) {
>> +write:        relay_send_tx(relay, &tx_rows);
>> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
> Why?! This is a stack-local variable, no?
Why? It's static.
>
>> +        /*
>> +         * Free all the relay_tx_rows allocated on the
>> +         * fiber region.
>> +         */
>> +        fiber_gc();
>>       }
>>   }
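
To illustrate why the explicit reset is needed: a function-local static
keeps its contents between calls, so the pending rows have to be cleared
by hand after each flush. Below is a minimal sketch of that pattern,
assuming a fixed-size array instead of an rlist and a hypothetical
send_batch() sink. None of these names come from relay.cc.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum { MAX_TX_ROWS = 16 };

/* Simplified row (hypothetical): an id and the commit flag. */
struct row {
	int id;
	bool is_commit;
};

/* Hypothetical sink: counts flushed batches and remembers the last size. */
static int batches_sent;
static size_t last_batch_size;

static void
send_batch(const struct row *rows, size_t count)
{
	(void)rows;
	batches_sent++;
	last_batch_size = count;
}

/*
 * Called once per row. The pending buffer is static, so it survives
 * between calls and must be reset explicitly after each flush.
 */
static void
collect_row(const struct row *r)
{
	static struct row pending[MAX_TX_ROWS];
	static size_t pending_count;

	/* Short path: a single-row transaction needs no buffering. */
	if (r->is_commit && pending_count == 0) {
		send_batch(r, 1);
		return;
	}
	assert(pending_count < MAX_TX_ROWS);
	pending[pending_count++] = *r;
	if (r->is_commit) {
		send_batch(pending, pending_count);
		/* Plays the role of re-initializing the list head. */
		pending_count = 0;
	}
}
```

Note that such static state is shared by every caller of the function,
which is worth keeping in mind if the function can run in more than one
context.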

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
  2020-05-19 10:49       ` Serge Petrenko
@ 2020-05-19 11:18         ` Cyrill Gorcunov
  2020-05-19 12:31           ` Serge Petrenko
  0 siblings, 1 reply; 11+ messages in thread
From: Cyrill Gorcunov @ 2020-05-19 11:18 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy

On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
...
> > > +    if (packet->is_commit) {
> > > +write:        relay_send_tx(relay, &tx_rows);
> > > +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
> > > Why?! This is a stack-local variable, no?
> > Why? It's static.

Yup, just found out ;) It is so well hidden among the other variable
declarations that it was not obvious (at least for me :-). I'm
appending a clearer approach.

But Serge, this is my personal opinion, I'm fine with your
current code as well, so

Reviewed-by: Cyrill Gorcunov <gorcunov@gmail.com>

it is up to you to pick or drop the diff below
---
 src/box/relay.cc | 37 ++++++++++++++++++++++---------------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 17b7bc667..27424e0ca 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -760,12 +760,12 @@ struct relay_tx_row {
 };
 
 static void
-relay_send_tx(struct relay *relay, struct rlist *tx_rows)
+relay_send_tx(struct relay *relay, struct rlist *head)
 {
 	ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
 
 	struct relay_tx_row *tx_row;
-	rlist_foreach_entry(tx_row, tx_rows, link) {
+	rlist_foreach_entry(tx_row, head, link) {
 		tx_row->row.sync = relay->sync;
 		coio_write_xrow(&relay->io, &tx_row->row);
 	}
@@ -784,10 +784,13 @@ static void
 relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	static RLIST_HEAD(tx_rows);
-	struct errinj *inj;
 	struct relay_tx_row *tx_row;
+	struct errinj *inj;
+
+	static RLIST_HEAD(tx_rows_list);
+
 	assert(iproto_type_is_dml(packet->type));
+
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
@@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 			 * in case the transaction is not fully
 			 * local.
 			 */
-			if (packet->is_commit && !rlist_empty(&tx_rows)) {
-				rlist_last_entry(&tx_rows, struct relay_tx_row,
-						 link)->row.is_commit = true;
-				goto write;
-			}
-			return;
+			if (!packet->is_commit || rlist_empty(&tx_rows_list))
+				return;
+
+			rlist_last_entry(&tx_rows_list, struct relay_tx_row,
+					 link)->row.is_commit = true;
+			goto write;
 		}
 		packet->type = IPROTO_NOP;
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+
 	/* Check if the rows from the instance are filtered. */
-	if ((1 << packet->replica_id & relay->id_filter) != 0)
+	if (((1u << packet->replica_id) & relay->id_filter) != 0)
 		return;
+
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
 	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
@@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 	}
 
 	/* A short path for single-statement transactions. */
-	if (packet->is_commit && rlist_empty(&tx_rows)) {
+	if (packet->is_commit && rlist_empty(&tx_rows_list)) {
 		relay_send(relay, packet);
 		return;
 	}
@@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
 			  "struct relay_tx_row");
 	}
 	tx_row->row = *packet;
-	rlist_add_tail_entry(&tx_rows, tx_row, link);
+	rlist_add_tail_entry(&tx_rows_list, tx_row, link);
 
 	if (packet->is_commit) {
-write:		relay_send_tx(relay, &tx_rows);
-		tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
+write:
+		relay_send_tx(relay, &tx_rows_list);
+		tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
+
 		/*
 		 * Free all the relay_tx_rows allocated on the
 		 * fiber region.
-- 
2.26.2
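
A side note on the id_filter hunk in the diff above: the extra
parentheses do not change the result, since << already binds tighter
than &, but the unsigned literal matters once the shift count can reach
31. A minimal sketch, assuming a 32-bit filter mask; the helper name
replica_is_filtered() is hypothetical and not part of relay.cc:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Return true when the bit for replica_id is set in id_filter.
 * Assumes replica_id < 32 so the shift stays within uint32_t.
 */
static bool
replica_is_filtered(uint32_t replica_id, uint32_t id_filter)
{
	assert(replica_id < 32);
	/*
	 * UINT32_C(1) keeps the shift unsigned: shifting a signed
	 * 32-bit 1 left by 31 is undefined behavior in C.
	 */
	return ((UINT32_C(1) << replica_id) & id_filter) != 0;
}
```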


* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
  2020-05-19 11:18         ` Cyrill Gorcunov
@ 2020-05-19 12:31           ` Serge Petrenko
  2020-05-19 16:24             ` Serge Petrenko
  0 siblings, 1 reply; 11+ messages in thread
From: Serge Petrenko @ 2020-05-19 12:31 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tarantool-patches, v.shpilevoy


On 19.05.2020 14:18, Cyrill Gorcunov wrote:
> On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
> ...
>>>> +    if (packet->is_commit) {
>>>> +write:        relay_send_tx(relay, &tx_rows);
>>>> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>>> Why?! This is a stack-local variable, no?
>> Why? It's static.
> Yup, just found out ;) It is so well hidden among the other variable
> declarations that it was not obvious (at least for me :-). I'm
> appending a clearer approach.
>
> But Serge, this is my personal opinion, I'm fine with your
> current code as well, so
>
> Reviewed-by: Cyrill Gorcunov <gorcunov@gmail.com>
>
> it is up to you to pick or drop the diff below

Thanks! Looks good, applied.

> ---
>   src/box/relay.cc | 37 ++++++++++++++++++++++---------------
>   1 file changed, 22 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 17b7bc667..27424e0ca 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -760,12 +760,12 @@ struct relay_tx_row {
>   };
>   
>   static void
> -relay_send_tx(struct relay *relay, struct rlist *tx_rows)
> +relay_send_tx(struct relay *relay, struct rlist *head)
>   {
>   	ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>   
>   	struct relay_tx_row *tx_row;
> -	rlist_foreach_entry(tx_row, tx_rows, link) {
> +	rlist_foreach_entry(tx_row, head, link) {
>   		tx_row->row.sync = relay->sync;
>   		coio_write_xrow(&relay->io, &tx_row->row);
>   	}
> @@ -784,10 +784,13 @@ static void
>   relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>   {
>   	struct relay *relay = container_of(stream, struct relay, stream);
> -	static RLIST_HEAD(tx_rows);
> -	struct errinj *inj;
>   	struct relay_tx_row *tx_row;
> +	struct errinj *inj;
> +
> +	static RLIST_HEAD(tx_rows_list);
> +
>   	assert(iproto_type_is_dml(packet->type));
> +
>   	if (packet->group_id == GROUP_LOCAL) {
>   		/*
>   		 * We do not relay replica-local rows to other
> @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>   			 * in case the transaction is not fully
>   			 * local.
>   			 */
> -			if (packet->is_commit && !rlist_empty(&tx_rows)) {
> -				rlist_last_entry(&tx_rows, struct relay_tx_row,
> -						 link)->row.is_commit = true;
> -				goto write;
> -			}
> -			return;
> +			if (!packet->is_commit || rlist_empty(&tx_rows_list))
> +				return;
> +
> +			rlist_last_entry(&tx_rows_list, struct relay_tx_row,
> +					 link)->row.is_commit = true;
> +			goto write;
>   		}
>   		packet->type = IPROTO_NOP;
>   		packet->group_id = GROUP_DEFAULT;
>   		packet->bodycnt = 0;
>   	}
> +
>   	/* Check if the rows from the instance are filtered. */
> -	if ((1 << packet->replica_id & relay->id_filter) != 0)
> +	if (((1u << packet->replica_id) & relay->id_filter) != 0)
>   		return;
> +
>   	/*
>   	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>   	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
> @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>   	}
>   
>   	/* A short path for single-statement transactions. */
> -	if (packet->is_commit && rlist_empty(&tx_rows)) {
> +	if (packet->is_commit && rlist_empty(&tx_rows_list)) {
>   		relay_send(relay, packet);
>   		return;
>   	}
> @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>   			  "struct relay_tx_row");
>   	}
>   	tx_row->row = *packet;
> -	rlist_add_tail_entry(&tx_rows, tx_row, link);
> +	rlist_add_tail_entry(&tx_rows_list, tx_row, link);
>   
>   	if (packet->is_commit) {
> -write:		relay_send_tx(relay, &tx_rows);
> -		tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
> +write:
> +		relay_send_tx(relay, &tx_rows_list);
> +		tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
> +
>   		/*
>   		 * Free all the relay_tx_rows allocated on the
>   		 * fiber region.

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
  2020-05-19 12:31           ` Serge Petrenko
@ 2020-05-19 16:24             ` Serge Petrenko
  0 siblings, 0 replies; 11+ messages in thread
From: Serge Petrenko @ 2020-05-19 16:24 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tarantool-patches, v.shpilevoy


On 19.05.2020 15:31, Serge Petrenko wrote:
>
> On 19.05.2020 14:18, Cyrill Gorcunov wrote:
>> On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
>> ...
>>>>> +    if (packet->is_commit) {
>>>>> +write:        relay_send_tx(relay, &tx_rows);
>>>>> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>>>> Why?! This is a stack-local variable, no?
>>> Why? It's static.
>> Yup, just found out ;) It is so well hidden among the other variable
>> declarations that it was not obvious (at least for me :-). I'm
>> appending a clearer approach.
>>
>> But Serge, this is my personal opinion, I'm fine with your
>> current code as well, so
>>
>> Reviewed-by: Cyrill Gorcunov <gorcunov@gmail.com>
>>
>> it is up to you to pick or drop the diff below
>
> Thanks! Looks good, applied.
>
>> ---
>>   src/box/relay.cc | 37 ++++++++++++++++++++++---------------
>>   1 file changed, 22 insertions(+), 15 deletions(-)
>>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 17b7bc667..27424e0ca 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -760,12 +760,12 @@ struct relay_tx_row {
>>   };
>>
>>   static void
>> -relay_send_tx(struct relay *relay, struct rlist *tx_rows)
>> +relay_send_tx(struct relay *relay, struct rlist *head)
>>   {
>>       ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>>
>>       struct relay_tx_row *tx_row;
>> -    rlist_foreach_entry(tx_row, tx_rows, link) {
>> +    rlist_foreach_entry(tx_row, head, link) {
>>           tx_row->row.sync = relay->sync;
>>           coio_write_xrow(&relay->io, &tx_row->row);
>>       }
>> @@ -784,10 +784,13 @@ static void
>>   relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>   {
>>       struct relay *relay = container_of(stream, struct relay, stream);
>> -    static RLIST_HEAD(tx_rows);
>> -    struct errinj *inj;
>>       struct relay_tx_row *tx_row;
>> +    struct errinj *inj;
>> +
>> +    static RLIST_HEAD(tx_rows_list);
>> +
>>       assert(iproto_type_is_dml(packet->type));
>> +
>>       if (packet->group_id == GROUP_LOCAL) {
>>           /*
>>            * We do not relay replica-local rows to other
>> @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>                * in case the transaction is not fully
>>                * local.
>>                */
>> -            if (packet->is_commit && !rlist_empty(&tx_rows)) {
>> -                rlist_last_entry(&tx_rows, struct relay_tx_row,
>> -                         link)->row.is_commit = true;
>> -                goto write;
>> -            }
>> -            return;
>> +            if (!packet->is_commit || rlist_empty(&tx_rows_list))
>> +                return;
>> +
>> +            rlist_last_entry(&tx_rows_list, struct relay_tx_row,
>> +                     link)->row.is_commit = true;
>> +            goto write;
>>           }
>>           packet->type = IPROTO_NOP;
>>           packet->group_id = GROUP_DEFAULT;
>>           packet->bodycnt = 0;
>>       }
>> +
>>       /* Check if the rows from the instance are filtered. */
>> -    if ((1 << packet->replica_id & relay->id_filter) != 0)
>> +    if (((1u << packet->replica_id) & relay->id_filter) != 0)
>>           return;
>> +
>>       /*
>>        * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>>        * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
>> @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>       }
>>
>>       /* A short path for single-statement transactions. */
>> -    if (packet->is_commit && rlist_empty(&tx_rows)) {
>> +    if (packet->is_commit && rlist_empty(&tx_rows_list)) {
>>           relay_send(relay, packet);
>>           return;
>>       }
>> @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>>                 "struct relay_tx_row");
>>       }
>>       tx_row->row = *packet;
>> -    rlist_add_tail_entry(&tx_rows, tx_row, link);
>> +    rlist_add_tail_entry(&tx_rows_list, tx_row, link);
>>
>>       if (packet->is_commit) {
>> -write:        relay_send_tx(relay, &tx_rows);
>> -        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>> +write:
>> +        relay_send_tx(relay, &tx_rows_list);
>> +        tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
>> +
>>           /*
>>            * Free all the relay_tx_rows allocated on the
>>            * fiber region.
>
This patch needs some more thought. I noticed test failures on GitLab:

https://gitlab.com/tarantool/tarantool/-/jobs/559102582

It looks like the problem is with row bodies, which are stored in an ibuf
owned by the xlog_cursor. The cursor was designed to work in a row-by-row
manner, just like the recovery system, which calls stream_write() on each
recovered row. So the ibuf may be invalidated after recovery writes a single
row (i.e. after a single call to relay_collect_tx()).

Even though I copy the xrow_header itself, the header contains pointers to
its body, which is stored in the xlog_cursor's ibuf, and the body gets
overwritten as soon as ibuf_reserve_slow() is called.
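
The failure mode can be reproduced in isolation. The sketch below is not
Tarantool code: struct header, struct reader, reader_next() and
header_deep_copy() are hypothetical stand-ins for a header that points
into a buffer owned by a reader which reuses that buffer for every row.
It only shows why copying the header struct alone is not enough, and one
possible remedy (copying the body as well), under those assumptions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Simplified header: the body lives in memory the header does not own. */
struct header {
	const char *body;
	size_t body_len;
};

/* Hypothetical reader that reuses one buffer for every row it decodes. */
struct reader {
	char buf[32];
};

static struct header
reader_next(struct reader *rd, const char *payload)
{
	size_t len = strlen(payload);
	assert(len < sizeof(rd->buf));
	/* Overwrites whatever the previous row left in the buffer. */
	memcpy(rd->buf, payload, len + 1);
	struct header h = { rd->buf, len };
	return h;
}

/* Copy the header together with its body so it outlives the buffer. */
static struct header
header_deep_copy(const struct header *src)
{
	char *body = malloc(src->body_len + 1);
	if (body == NULL)
		abort();
	memcpy(body, src->body, src->body_len);
	body[src->body_len] = '\0';
	struct header h = { body, src->body_len };
	return h;
}
```

A plain struct assignment of the first header keeps pointing at the
reader's buffer, so after the next reader_next() call it silently shows
the second row's body, while the deep copy still holds the first one.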

-- 
Serge Petrenko


Thread overview: 11+ messages
2020-05-18 12:24 [Tarantool-patches] [PATCH 0/2] fix replication tx boundaries after local space rework Serge Petrenko
2020-05-18 12:24 ` [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries Serge Petrenko
2020-05-19  9:08   ` Cyrill Gorcunov
2020-05-19 10:30     ` Serge Petrenko
2020-05-18 12:24 ` [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches Serge Petrenko
2020-05-18 12:28   ` Serge Petrenko
2020-05-19 10:23     ` Cyrill Gorcunov
2020-05-19 10:49       ` Serge Petrenko
2020-05-19 11:18         ` Cyrill Gorcunov
2020-05-19 12:31           ` Serge Petrenko
2020-05-19 16:24             ` Serge Petrenko
