Tarantool development patches archive
* [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final
@ 2021-03-24 12:24 Serge Petrenko via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 1/7] replication: fix a hang on final join retry Serge Petrenko via Tarantool-patches
                   ` (9 more replies)
  0 siblings, 10 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-24 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

Initially, the idea of this patchset was to ignore synchronous rows on the
applier side and to make sure there are no rolled-back transactions in the
final join stream on the master side.

Unfortunately, this easy fix didn't work. The reason is that once the replica
receives the initial data, up to `start_vclock`, the final join stream has to
start right at `start_vclock` so that we do not lose any transactions.

This means that even when the master encounters a synchro rollback and makes
the replica retry the final join to get rid of the rollback, it still has to
send the rollback together with the other data. And the applier must process
this rollback to avoid conflicts.

In order to let the applier process synchro requests (CONFIRM and ROLLBACK),
we need to make the final join transactional. This is what this patchset does.

An alternative would be to retry not only the final join but also the initial
join every time the master receives a rollback during the final join stage.
This would be too heavy-handed, given the possibly huge amounts of data sent
during the initial join.
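
For a quick overview, the applier-side dispatch this series ends up with looks
roughly like the sketch below (condensed from patch 5; the vclock bookkeeping
and fiber_gc() calls are trimmed):

static int
apply_final_join_tx(struct stailq *rows)
{
	struct xrow_header *first_row =
		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
	/* CONFIRM and ROLLBACK arrive as single-row transactions. */
	if (iproto_type_is_synchro_request(first_row->type))
		return apply_synchro_row(first_row);
	/* A plain tx: no conflict skipping, no commit/rollback triggers. */
	return apply_plain_tx(rows, false, false);
}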

Changes in v2:
  - Make applier transactional on final join stage
  - Remove guards for rollback during final join on master side
  - Some refactoring in preparation to #5874

https://github.com/tarantool/tarantool/issues/5566
https://github.com/tarantool/tarantool/tree/sp/gh-5566-final-join-synchro-v2

Serge Petrenko (7):
  replication: fix a hang on final join retry
  applier: extract tx boundary checks from applier_read_tx into a
    separate routine
  applier: extract plain tx application from applier_apply_tx()
  applier: remove excess last_row_time update from subscribe loop
  applier: make final join transactional
  replication: tolerate synchro rollback during final join
  replication: do not ignore replica vclock on register

 changelogs/unreleased/synchro-final-join.md   |   4 +
 src/box/applier.cc                            | 461 +++++++++---------
 src/box/box.cc                                |  38 +-
 src/box/relay.cc                              |   1 +
 .../gh-5566-final-join-synchro.result         | 139 ++++++
 .../gh-5566-final-join-synchro.test.lua       |  61 +++
 test/replication/suite.cfg                    |   1 +
 7 files changed, 449 insertions(+), 256 deletions(-)
 create mode 100644 changelogs/unreleased/synchro-final-join.md
 create mode 100644 test/replication/gh-5566-final-join-synchro.result
 create mode 100644 test/replication/gh-5566-final-join-synchro.test.lua

-- 
2.24.3 (Apple Git-128)



* [Tarantool-patches] [PATCH v2 1/7] replication: fix a hang on final join retry
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
@ 2021-03-24 12:24 ` Serge Petrenko via Tarantool-patches
  2021-03-26 20:44   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 2/7] applier: extract tx boundary checks from applier_read_tx into a separate routine Serge Petrenko via Tarantool-patches
                   ` (8 subsequent siblings)
  9 siblings, 1 reply; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-24 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

Since the introduction of synchronous replication, the final join may fail
on the master side when the master cannot gather acks for some tx around
the _cluster registration.

In this case the replica receives an error: either ER_SYNC_ROLLBACK or
ER_SYNC_QUORUM_TIMEOUT. The errors lead to the applier retrying the final
join, but in a wrong state, APPLIER_REGISTER, which should only be used on
an anonymous replica. This led to a hang in the fiber executing box.cfg,
because it waited for the APPLIER_JOINED state, which was never entered.
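
For reference, after the fix the applier walks one of the two state
sequences below, depending on whether the instance was anonymous:

/*
 * was_anon == true (transition from an anonymous replica):
 *     APPLIER_REGISTER -> APPLIER_REGISTERED -> APPLIER_READY
 * was_anon == false (final join retry):
 *     APPLIER_FINAL_JOIN -> APPLIER_JOINED -> APPLIER_READY
 */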

Part-of #5566
---
 src/box/applier.cc | 29 ++++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5a88a013e..326cf18d2 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -551,7 +551,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 }
 
 static void
-applier_register(struct applier *applier)
+applier_register(struct applier *applier, bool was_anon)
 {
 	/* Send REGISTER request */
 	struct ev_io *coio = &applier->io;
@@ -566,9 +566,16 @@ applier_register(struct applier *applier)
 	row.type = IPROTO_REGISTER;
 	coio_write_xrow(coio, &row);
 
-	applier_set_state(applier, APPLIER_REGISTER);
+	/*
+	 * Register may serve as a retry for final join. Set corresponding
+	 * states to unblock anyone who's waiting for final join to start or
+	 * end.
+	 */
+	applier_set_state(applier, was_anon ? APPLIER_REGISTER :
+					      APPLIER_FINAL_JOIN);
 	applier_wait_register(applier, 0);
-	applier_set_state(applier, APPLIER_REGISTERED);
+	applier_set_state(applier, was_anon ? APPLIER_REGISTERED :
+					      APPLIER_JOINED);
 	applier_set_state(applier, APPLIER_READY);
 }
 
@@ -1303,6 +1310,14 @@ applier_f(va_list ap)
 		return -1;
 	session_set_type(session, SESSION_TYPE_APPLIER);
 
+	/*
+	 * The instance saves replication_anon value on bootstrap.
+	 * If a freshly started instance sees it has received
+	 * REPLICASET_UUID but hasn't yet registered, it must be an
+	 * anonymous replica, hence the default value 'true'.
+	 */
+	bool was_anon = true;
+
 	/* Re-connect loop */
 	while (!fiber_is_cancelled()) {
 		try {
@@ -1316,6 +1331,7 @@ applier_f(va_list ap)
 				 * The join will pause the applier
 				 * until WAL is created.
 				 */
+				was_anon = replication_anon;
 				if (replication_anon)
 					applier_fetch_snapshot(applier);
 				else
@@ -1324,11 +1340,10 @@ applier_f(va_list ap)
 			if (instance_id == REPLICA_ID_NIL &&
 			    !replication_anon) {
 				/*
-				 * The instance transitioned
-				 * from anonymous. Register it
-				 * now.
+				 * The instance transitioned from anonymous or
+				 * is retrying final join.
 				 */
-				applier_register(applier);
+				applier_register(applier, was_anon);
 			}
 			applier_subscribe(applier);
 			/*
-- 
2.24.3 (Apple Git-128)



* [Tarantool-patches] [PATCH v2 2/7] applier: extract tx boundary checks from applier_read_tx into a separate routine
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 1/7] replication: fix a hang on final join retry Serge Petrenko via Tarantool-patches
@ 2021-03-24 12:24 ` Serge Petrenko via Tarantool-patches
  2021-03-26 12:35   ` Cyrill Gorcunov via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx() Serge Petrenko via Tarantool-patches
                   ` (7 subsequent siblings)
  9 siblings, 1 reply; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-24 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

Introduce a new routine, set_next_tx_row(), which checks for tx boundary
violations and appends the new row to the current tx when everything is OK.

set_next_tx_row() is extracted from applier_read_tx() because it's a common
part of transaction assembly for both recovery and the applier.

The only difference for recovery is that the routine responsible for tx
assembly won't read rows itself. It will be a callback run on each new row
read from the WAL.
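
To illustrate the intended reuse, the recovery side could look roughly like
the sketch below. The context struct and the apply function here are
hypothetical, the actual #5874 code may differ:

/* Hypothetical per-row callback invoked by the WAL reader. */
static void
recovery_on_row(struct recovery_tx_ctx *ctx, struct applier_tx_row *tx_row)
{
	/* set_next_tx_row() returns 0 once the tx is complete. */
	ctx->tsn = set_next_tx_row(&ctx->rows, tx_row, ctx->tsn);
	if (ctx->tsn == 0)
		recovery_apply_tx(&ctx->rows);
}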

Prerequisite #5874
Part-of #5566
---
 src/box/applier.cc | 117 +++++++++++++++++++++++----------------------
 1 file changed, 60 insertions(+), 57 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 326cf18d2..65afa5e98 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -657,6 +657,64 @@ applier_read_tx_row(struct applier *applier)
 	return tx_row;
 }
 
+static inline int64_t
+set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
+{
+	struct xrow_header *row = &tx_row->row;
+
+	if (iproto_type_is_error(row->type))
+		xrow_decode_error_xc(row);
+
+	/* Replication request. */
+	if (row->replica_id >= VCLOCK_MAX) {
+		/*
+		 * A safety net, this can only occur if we're fed a strangely
+		 * broken xlog. row->replica_id == 0, when reading heartbeats
+		 * from an anonymous instance.
+		 */
+		tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+			  int2str(row->replica_id),
+			  tt_uuid_str(&REPLICASET_UUID));
+	}
+	if (tsn == 0) {
+		/*
+		 * Transaction id must be derived from the log sequence number
+		 * of the first row in the transaction.
+		 */
+		tsn = row->tsn;
+		if (row->lsn != tsn)
+			tnt_raise(ClientError, ER_PROTOCOL,
+				  "Transaction id must be equal to LSN of the "
+				  "first row in the transaction.");
+	} else if (tsn != row->tsn) {
+		tnt_raise(ClientError, ER_UNSUPPORTED, "replication",
+			  "interleaving transactions");
+	}
+
+	assert(row->bodycnt <= 1);
+	if (row->is_commit) {
+		/* Signal the caller that we've reached the tx end. */
+		tsn = 0;
+	} else if (row->bodycnt == 1) {
+		/*
+		 * Save row body to gc region. Not done for single-statement
+		 * transactions and the last row of multi-statement transactions
+		 * knowing that the input buffer will not be used while the
+		 * transaction is applied.
+		 */
+		void *new_base = region_alloc(&fiber()->gc, row->body->iov_len);
+		if (new_base == NULL)
+			tnt_raise(OutOfMemory, row->body->iov_len, "region",
+				  "xrow body");
+		memcpy(new_base, row->body->iov_base, row->body->iov_len);
+		/* Adjust row body pointers. */
+		row->body->iov_base = new_base;
+	}
+
+	stailq_add_tail(rows, &tx_row->next);
+	return tsn;
+}
+
 /**
  * Read one transaction from network using applier's input buffer.
  * Transaction rows are placed onto fiber gc region.
@@ -672,63 +730,8 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 	stailq_create(rows);
 	do {
 		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
-		struct xrow_header *row = &tx_row->row;
-
-		if (iproto_type_is_error(row->type))
-			xrow_decode_error_xc(row);
-
-		/* Replication request. */
-		if (row->replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 * row->replica_id == 0, when reading
-			 * heartbeats from an anonymous instance.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row->replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
-		if (tsn == 0) {
-			/*
-			 * Transaction id must be derived from the log sequence
-			 * number of the first row in the transaction.
-			 */
-			tsn = row->tsn;
-			if (row->lsn != tsn)
-				tnt_raise(ClientError, ER_PROTOCOL,
-					  "Transaction id must be equal to "
-					  "LSN of the first row in the "
-					  "transaction.");
-		}
-		if (tsn != row->tsn)
-			tnt_raise(ClientError, ER_UNSUPPORTED,
-				  "replication",
-				  "interleaving transactions");
-
-		assert(row->bodycnt <= 1);
-		if (row->bodycnt == 1 && !row->is_commit) {
-			/*
-			 * Save row body to gc region.
-			 * Not done for single-statement
-			 * transactions knowing that the input
-			 * buffer will not be used while the
-			 * transaction is applied.
-			 */
-			void *new_base = region_alloc(&fiber()->gc,
-						      row->body->iov_len);
-			if (new_base == NULL)
-				tnt_raise(OutOfMemory, row->body->iov_len,
-					  "region", "xrow body");
-			memcpy(new_base, row->body->iov_base,
-			       row->body->iov_len);
-			/* Adjust row body pointers. */
-			row->body->iov_base = new_base;
-		}
-		stailq_add_tail(rows, &tx_row->next);
-
-	} while (!stailq_last_entry(rows, struct applier_tx_row,
-				    next)->row.is_commit);
+		tsn = set_next_tx_row(rows, tx_row, tsn);
+	} while (tsn != 0);
 }
 
 static void
-- 
2.24.3 (Apple Git-128)



* [Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx()
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 1/7] replication: fix a hang on final join retry Serge Petrenko via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 2/7] applier: extract tx boundary checks from applier_read_tx into a separate routine Serge Petrenko via Tarantool-patches
@ 2021-03-24 12:24 ` Serge Petrenko via Tarantool-patches
  2021-03-26 20:47   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-27 18:30   ` [Tarantool-patches] [PATCH v2 3.5/7] applier: fix not releasing the latch on apply_synchro_row() fail Serge Petrenko via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 4/7] applier: remove excess last_row_time update from subscribe loop Serge Petrenko via Tarantool-patches
                   ` (6 subsequent siblings)
  9 siblings, 2 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-24 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

The new routine, called apply_plain_tx(), may be used not only by
applier_apply_tx(), but also by final join, once we make it transactional,
and by recovery, once it is also made transactional.

Also, while we're at it, remove the excess fiber_gc() call from the
applier_subscribe loop and instead make sure fiber_gc() is called on every
return from applier_apply_tx().
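
After the extraction the two call sites boil down to the following sketch
(the flags are (skip_conflict, use_triggers)):

/* Subscribe stage (this patch): skip conflicting rows when
 * replication_skip_conflict is set, attach the wal_write/rollback
 * triggers. */
rc = apply_plain_tx(rows, true, true);

/* Final join stage (patch 5 of this series): no conflict skipping and
 * no triggers. */
rc = apply_plain_tx(rows, false, false);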

Prerequisite #5874
Part of #5566
---
 src/box/applier.cc | 188 ++++++++++++++++++++++-----------------------
 1 file changed, 93 insertions(+), 95 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 65afa5e98..07e557a51 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -905,6 +905,90 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
 	return box_raft_process(&req, applier->instance_id);
 }
 
+static inline int
+apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
+{
+	/**
+	 * Explicitly begin the transaction so that we can
+	 * control fiber->gc life cycle and, in case of apply
+	 * conflict safely access failed xrow object and allocate
+	 * IPROTO_NOP on gc.
+	 */
+	struct txn *txn = txn_begin();
+	struct applier_tx_row *item;
+	if (txn == NULL)
+		 return -1;
+
+	stailq_foreach_entry(item, rows, next) {
+		struct xrow_header *row = &item->row;
+		int res = apply_row(row);
+		if (res != 0 && skip_conflict) {
+			struct error *e = diag_last_error(diag_get());
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			if (e->type == &type_ClientError &&
+			    box_error_code(e) == ER_TUPLE_FOUND &&
+			    replication_skip_conflict) {
+				diag_clear(diag_get());
+				row->type = IPROTO_NOP;
+				row->bodycnt = 0;
+				res = apply_row(row);
+			}
+		}
+		if (res != 0)
+			goto fail;
+	}
+
+	/*
+	 * We are going to commit so it's a high time to check if
+	 * the current transaction has non-local effects.
+	 */
+	if (txn_is_distributed(txn)) {
+		/*
+		 * A transaction mixes remote and local rows.
+		 * Local rows must be replicated back, which
+		 * doesn't make sense since the master likely has
+		 * new changes which local rows may overwrite.
+		 * Raise an error.
+		 */
+		diag_set(ClientError, ER_UNSUPPORTED, "Replication",
+			 "distributed transactions");
+		goto fail;
+	}
+
+	if (use_triggers) {
+		/* We are ready to submit txn to wal. */
+		struct trigger *on_rollback, *on_wal_write;
+		size_t size;
+		on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
+						  &size);
+		on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
+						   &size);
+		if (on_rollback == NULL || on_wal_write == NULL) {
+			diag_set(OutOfMemory, size, "region_alloc_object",
+				 "on_rollback/on_wal_write");
+			goto fail;
+		}
+
+		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
+		txn_on_rollback(txn, on_rollback);
+
+		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
+		txn_on_wal_write(txn, on_wal_write);
+	}
+
+	return txn_commit_try_async(txn);
+fail:
+	txn_rollback(txn);
+	return -1;
+}
+
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -931,6 +1015,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 	struct xrow_header *last_row;
 	last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
 	struct replica *replica = replica_by_id(first_row->replica_id);
+	int rc = 0;
 	/*
 	 * In a full mesh topology, the same set of changes
 	 * may arrive via two concurrently running appliers.
@@ -942,8 +1027,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 	latch_lock(latch);
 	if (vclock_get(&replicaset.applier.vclock,
 		       last_row->replica_id) >= last_row->lsn) {
-		latch_unlock(latch);
-		return 0;
+		goto no_write;
 	} else if (vclock_get(&replicaset.applier.vclock,
 			      first_row->replica_id) >= first_row->lsn) {
 		/*
@@ -974,103 +1058,18 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		assert(first_row == last_row);
 		if (apply_synchro_row(first_row) != 0)
 			diag_raise();
-		goto success;
-	}
-
-	/**
-	 * Explicitly begin the transaction so that we can
-	 * control fiber->gc life cycle and, in case of apply
-	 * conflict safely access failed xrow object and allocate
-	 * IPROTO_NOP on gc.
-	 */
-	struct txn *txn;
-	txn = txn_begin();
-	struct applier_tx_row *item;
-	if (txn == NULL) {
-		latch_unlock(latch);
-		return -1;
-	}
-	stailq_foreach_entry(item, rows, next) {
-		struct xrow_header *row = &item->row;
-		int res = apply_row(row);
-		if (res != 0) {
-			struct error *e = diag_last_error(diag_get());
-			/*
-			 * In case of ER_TUPLE_FOUND error and enabled
-			 * replication_skip_conflict configuration
-			 * option, skip applying the foreign row and
-			 * replace it with NOP in the local write ahead
-			 * log.
-			 */
-			if (e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				diag_clear(diag_get());
-				row->type = IPROTO_NOP;
-				row->bodycnt = 0;
-				res = apply_row(row);
-			}
-		}
-		if (res != 0)
-			goto rollback;
-	}
-	/*
-	 * We are going to commit so it's a high time to check if
-	 * the current transaction has non-local effects.
-	 */
-	if (txn_is_distributed(txn)) {
-		/*
-		 * A transaction mixes remote and local rows.
-		 * Local rows must be replicated back, which
-		 * doesn't make sense since the master likely has
-		 * new changes which local rows may overwrite.
-		 * Raise an error.
-		 */
-		diag_set(ClientError, ER_UNSUPPORTED,
-			 "Replication", "distributed transactions");
-		goto rollback;
+		goto written;
 	}
 
-	/* We are ready to submit txn to wal. */
-	struct trigger *on_rollback, *on_wal_write;
-	size_t size;
-	on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
-					  &size);
-	on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
-					   &size);
-	if (on_rollback == NULL || on_wal_write == NULL) {
-		diag_set(OutOfMemory, size, "region_alloc_object",
-			 "on_rollback/on_wal_write");
-		goto rollback;
+	if ((rc = apply_plain_tx(rows, true, true)) == 0) {
+written:
+		vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
+			      last_row->lsn);
 	}
-
-	trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
-	txn_on_rollback(txn, on_rollback);
-
-	trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
-	txn_on_wal_write(txn, on_wal_write);
-
-	if (txn_commit_try_async(txn) < 0)
-		goto fail;
-
-success:
-	/*
-	 * The transaction was sent to journal so promote vclock.
-	 *
-	 * Use the lsn of the last row to guard from 1.10
-	 * instances, which send every single tx row as a separate
-	 * transaction.
-	 */
-	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
-		      last_row->lsn);
-	latch_unlock(latch);
-	return 0;
-rollback:
-	txn_rollback(txn);
-fail:
+no_write:
 	latch_unlock(latch);
 	fiber_gc();
-	return -1;
+	return rc;
 }
 
 /**
@@ -1280,7 +1279,6 @@ applier_subscribe(struct applier *applier)
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
-		fiber_gc();
 	}
 }
 
-- 
2.24.3 (Apple Git-128)



* [Tarantool-patches] [PATCH v2 4/7] applier: remove excess last_row_time update from subscribe loop
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
                   ` (2 preceding siblings ...)
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx() Serge Petrenko via Tarantool-patches
@ 2021-03-24 12:24 ` Serge Petrenko via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional Serge Petrenko via Tarantool-patches
                   ` (5 subsequent siblings)
  9 siblings, 0 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-24 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

applier->last_row_time is updated in applier_read_tx_row(), which is called
at least once per subscribe loop iteration. So there's no need for a
separate last_row_time update inside the loop body itself.

Part of #5566
---
 src/box/applier.cc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 07e557a51..d53f13711 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1256,7 +1256,6 @@ applier_subscribe(struct applier *applier)
 		struct stailq rows;
 		applier_read_tx(applier, &rows);
 
-		applier->last_row_time = ev_monotonic_now(loop());
 		/*
 		 * In case of an heartbeat message wake a writer up
 		 * and check applier state.
-- 
2.24.3 (Apple Git-128)



* [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
                   ` (3 preceding siblings ...)
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 4/7] applier: remove excess last_row_time update from subscribe loop Serge Petrenko via Tarantool-patches
@ 2021-03-24 12:24 ` Serge Petrenko via Tarantool-patches
  2021-03-26 20:49   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 6/7] replication: tolerate synchro rollback during final join Serge Petrenko via Tarantool-patches
                   ` (4 subsequent siblings)
  9 siblings, 1 reply; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-24 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

Now the applier assembles rows into transactions not only at the subscribe
stage, but also during final join / register.

This was necessary for correct handling of rolled-back synchronous
transactions in the final join stream.

Part of #5566
---
 src/box/applier.cc | 126 ++++++++++++++++++++++-----------------------
 1 file changed, 61 insertions(+), 65 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index d53f13711..9a8b0f0fc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -292,34 +292,6 @@ apply_row(struct xrow_header *row)
 	return 0;
 }
 
-static int
-apply_final_join_row(struct xrow_header *row)
-{
-	/*
-	 * Confirms are ignored during join. All the data master
-	 * sends us is valid.
-	 */
-	if (iproto_type_is_synchro_request(row->type))
-		return 0;
-	struct txn *txn = txn_begin();
-	if (txn == NULL)
-		return -1;
-	/*
-	 * Do not wait for confirmation while processing final
-	 * join rows. See apply_snapshot_row().
-	 */
-	txn_set_flags(txn, TXN_FORCE_ASYNC);
-	if (apply_row(row) != 0) {
-		txn_rollback(txn);
-		fiber_gc();
-		return -1;
-	}
-	if (txn_commit(txn) != 0)
-		return -1;
-	fiber_gc();
-	return 0;
-}
-
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -505,13 +477,26 @@ applier_fetch_snapshot(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows,
+		uint64_t *row_count);
+
+static int
+apply_final_join_tx(struct stailq *rows);
+
+/**
+ * A helper struct to link xrow objects in a list.
+ */
+struct applier_tx_row {
+	/* Next transaction row. */
+	struct stailq_entry next;
+	/* xrow_header struct for the current transaction row. */
+	struct xrow_header row;
+};
+
 static uint64_t
 applier_wait_register(struct applier *applier, uint64_t row_count)
 {
-	struct ev_io *coio = &applier->io;
-	struct ibuf *ibuf = &applier->ibuf;
-	struct xrow_header row;
-
 	/*
 	 * Tarantool < 1.7.0: there is no "final join" stage.
 	 * Proceed to "subscribe" and do not finish bootstrap
@@ -524,27 +509,19 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 	 * Receive final data.
 	 */
 	while (true) {
-		coio_read_xrow(coio, ibuf, &row);
-		applier->last_row_time = ev_monotonic_now(loop());
-		if (iproto_type_is_dml(row.type)) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			if (apply_final_join_row(&row) != 0)
-				diag_raise();
-			if (++row_count % 100000 == 0)
-				say_info("%.1fM rows received", row_count / 1e6);
-		} else if (row.type == IPROTO_OK) {
-			/*
-			 * Current vclock. This is not used now,
-			 * ignore.
-			 */
-			++row_count;
-			break; /* end of stream */
-		} else if (iproto_type_is_error(row.type)) {
-			xrow_decode_error_xc(&row);  /* rethrow error */
-		} else {
-			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
-				  (uint32_t) row.type);
+		struct stailq rows;
+		applier_read_tx(applier, &rows, &row_count);
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		if (first_row->type == IPROTO_OK) {
+			assert(first_row ==
+			       &stailq_last_entry(&rows, struct applier_tx_row,
+						  next)->row);
+			break;
 		}
+		if (apply_final_join_tx(&rows) != 0)
+			diag_raise();
 	}
 
 	return row_count;
@@ -616,16 +593,6 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
-	/* Next transaction row. */
-	struct stailq_entry next;
-	/* xrow_header struct for the current transaction row. */
-	struct xrow_header row;
-};
-
 static struct applier_tx_row *
 applier_read_tx_row(struct applier *applier)
 {
@@ -646,8 +613,11 @@ applier_read_tx_row(struct applier *applier)
 	 * messages so we can't assume that if we haven't heard
 	 * from the master for quite a while the connection is
 	 * broken - the master might just be idle.
+	 * Also there are no timeouts during final join and register.
 	 */
-	if (applier->version_id < version_id(1, 7, 7))
+	if (applier->version_id < version_id(1, 7, 7) ||
+	    applier->state == APPLIER_FINAL_JOIN ||
+	    applier->state == APPLIER_REGISTER)
 		coio_read_xrow(coio, ibuf, row);
 	else
 		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
@@ -723,7 +693,8 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
  * network input space is reused for the next xrow.
  */
 static void
-applier_read_tx(struct applier *applier, struct stailq *rows)
+applier_read_tx(struct applier *applier, struct stailq *rows,
+		uint64_t *row_count)
 {
 	int64_t tsn = 0;
 
@@ -731,6 +702,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 	do {
 		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
 		tsn = set_next_tx_row(rows, tx_row, tsn);
+
+		if (row_count != NULL && ++*row_count % 100000 == 0)
+			say_info("%.1fM rows received", *row_count / 1e6);
 	} while (tsn != 0);
 }
 
@@ -988,6 +962,28 @@ fail:
 	return -1;
 }
 
+/** A simpler version of applier_apply_tx() for final join stage. */
+static int
+apply_final_join_tx(struct stailq *rows)
+{
+	struct xrow_header *first_row =
+		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
+	struct xrow_header *last_row =
+		&stailq_last_entry(rows, struct applier_tx_row, next)->row;
+	int rc = 0;
+	/* WAL isn't enabled yet, so follow vclock manually. */
+	vclock_follow_xrow(&replicaset.vclock, last_row);
+	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+		assert(first_row == last_row);
+		rc = apply_synchro_row(first_row);
+		goto end;
+	}
+
+	rc = apply_plain_tx(rows, false, false);
+end:
+	fiber_gc();
+	return rc;
+}
 
 /**
  * Apply all rows in the rows queue as a single transaction.
@@ -1254,7 +1250,7 @@ applier_subscribe(struct applier *applier)
 		}
 
 		struct stailq rows;
-		applier_read_tx(applier, &rows);
+		applier_read_tx(applier, &rows, NULL);
 
 		/*
 		 * In case of an heartbeat message wake a writer up
-- 
2.24.3 (Apple Git-128)



* [Tarantool-patches] [PATCH v2 6/7] replication: tolerate synchro rollback during final join
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
                   ` (4 preceding siblings ...)
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional Serge Petrenko via Tarantool-patches
@ 2021-03-24 12:24 ` Serge Petrenko via Tarantool-patches
  2021-03-24 12:45   ` Serge Petrenko via Tarantool-patches
  2021-03-26 20:49   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register Serge Petrenko via Tarantool-patches
                   ` (3 subsequent siblings)
  9 siblings, 2 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-24 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

Both box_process_register and box_process_join had guards ensuring that
not a single rollback occurred for the transactions residing in the WAL
around the replica's _cluster registration.
Both functions would error on a rollback and make the replica retry the
final join.

The reason for that was that the replica couldn't process synchronous
transactions correctly during the final join, because it applied the final
join stream row by row.

This retry path was a dead end: even if the master managed to receive no
ROLLBACK messages around the N-th retry of box.space._cluster:insert{},
the replica would still have to receive and process all the data dating
back to its first _cluster registration attempt.
In other words, the guard against sending synchronous rows to the
replica didn't work.

Let's remove the guards altogether, since the replica is now capable of
processing synchronous txs in the final join stream and even of retrying
the final join in case the _cluster registration is rolled back.
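
To make the dead end concrete, here is an invented example (all lsn values
are made up):

/*
 * The initial join ends at start_vclock = {1: 50}. A synchro tx at
 * lsn 51 gets rolled back, and the _cluster insert only succeeds at
 * lsn 60 on some retry. The retried final join stream must still start
 * right at {1: 50}, so it inevitably contains the rolled back tx and
 * its ROLLBACK record, and the replica must be able to apply both.
 */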

Closes #5566
---
 changelogs/unreleased/synchro-final-join.md   |   4 +
 src/box/applier.cc                            |   2 +
 src/box/box.cc                                |  24 ---
 src/box/relay.cc                              |   1 +
 .../gh-5566-final-join-synchro.result         | 139 ++++++++++++++++++
 .../gh-5566-final-join-synchro.test.lua       |  61 ++++++++
 test/replication/suite.cfg                    |   1 +
 7 files changed, 208 insertions(+), 24 deletions(-)
 create mode 100644 changelogs/unreleased/synchro-final-join.md
 create mode 100644 test/replication/gh-5566-final-join-synchro.result
 create mode 100644 test/replication/gh-5566-final-join-synchro.test.lua

diff --git a/changelogs/unreleased/synchro-final-join.md b/changelogs/unreleased/synchro-final-join.md
new file mode 100644
index 000000000..cef77df87
--- /dev/null
+++ b/changelogs/unreleased/synchro-final-join.md
@@ -0,0 +1,4 @@
+## bugfix/core
+
+* Fix a bug where the applier errored with `Unknown request type 40` during
+  the final join when the master has synchronous spaces (gh-5566).
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9a8b0f0fc..0d1b4d28d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -109,6 +109,8 @@ applier_log_error(struct applier *applier, struct error *e)
 	case ER_PASSWORD_MISMATCH:
 	case ER_XLOG_GAP:
 	case ER_TOO_EARLY_SUBSCRIBE:
+	case ER_SYNC_QUORUM_TIMEOUT:
+	case ER_SYNC_ROLLBACK:
 		say_info("will retry every %.2lf second",
 			 replication_reconnect_interval());
 		break;
diff --git a/src/box/box.cc b/src/box/box.cc
index cc59564e1..292a54213 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2163,8 +2163,6 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
 	say_info("registering replica %s at %s",
 		 tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
 
-	/* See box_process_join() */
-	int64_t limbo_rollback_count = txn_limbo.rollback_count;
 	struct vclock start_vclock;
 	vclock_copy(&start_vclock, &replicaset.vclock);
 
@@ -2180,12 +2178,6 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
 	struct vclock stop_vclock;
 	vclock_copy(&stop_vclock, &replicaset.vclock);
 
-	if (txn_limbo.rollback_count != limbo_rollback_count)
-		tnt_raise(ClientError, ER_SYNC_ROLLBACK);
-
-	if (txn_limbo_wait_confirm(&txn_limbo) != 0)
-		diag_raise();
-
 	/*
 	 * Feed replica with WALs in range
 	 * (start_vclock, stop_vclock) so that it gets its
@@ -2307,15 +2299,6 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	say_info("joining replica %s at %s",
 		 tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
 
-	/*
-	 * In order to join a replica, master has to make sure it
-	 * doesn't send unconfirmed data. We have to check that
-	 * there are no rolled back transactions between
-	 * start_vclock and stop_vclock, and that the data right
-	 * before stop_vclock is confirmed, before we can proceed
-	 * to final join.
-	 */
-	int64_t limbo_rollback_count = txn_limbo.rollback_count;
 	/*
 	 * Initial stream: feed replica with dirty data from engines.
 	 */
@@ -2336,13 +2319,6 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	/* Remember master's vclock after the last request */
 	struct vclock stop_vclock;
 	vclock_copy(&stop_vclock, &replicaset.vclock);
-
-	if (txn_limbo.rollback_count != limbo_rollback_count)
-		tnt_raise(ClientError, ER_SYNC_ROLLBACK);
-
-	if (txn_limbo_wait_confirm(&txn_limbo) != 0)
-		diag_raise();
-
 	/* Send end of initial stage data marker */
 	struct xrow_header row;
 	xrow_encode_vclock_xc(&row, &stop_vclock);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 41f949e8e..dd7a167e4 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -1035,6 +1035,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 					    ERRINJ_INT);
 		if (inj != NULL && packet->lsn == inj->iparam) {
 			packet->lsn = inj->iparam - 1;
+			packet->tsn = packet->lsn;
 			say_warn("injected broken lsn: %lld",
 				 (long long) packet->lsn);
 		}
diff --git a/test/replication/gh-5566-final-join-synchro.result b/test/replication/gh-5566-final-join-synchro.result
new file mode 100644
index 000000000..32749bf12
--- /dev/null
+++ b/test/replication/gh-5566-final-join-synchro.result
@@ -0,0 +1,139 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-5566 replica tolerates synchronous transactions in final join stream.
+--
+_ = box.schema.space.create('sync', {is_sync=true})
+ | ---
+ | ...
+_ = box.space.sync:create_index('pk')
+ | ---
+ | ...
+
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+box.schema.user.grant('guest', 'write', 'space', 'sync')
+ | ---
+ | ...
+
+-- Part 1. Make sure a joining instance tolerates synchronous rows in final join
+-- stream.
+trig = function()\
+    box.space.sync:replace{1}\
+end
+ | ---
+ | ...
+-- The trigger will generate synchronous rows each time a replica joins.
+_ = box.space._cluster:on_replace(trig)
+ | ---
+ | ...
+
+orig_synchro_quorum = box.cfg.replication_synchro_quorum
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum=1}
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default,\
+                                         script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_upstream(1, {status='follow'})
+ | ---
+ | - true
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+
+-- Part 2. Make sure master aborts final join if insert to _cluster is rolled
+-- back and replica is capable of retrying it.
+orig_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+-- Make the trigger we used above fail with no quorum.
+box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.01}
+ | ---
+ | ...
+-- Try to join the replica once again.
+test_run:cmd('create server replica with rpl_master=default,\
+                                         script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica with wait=False')
+ | ---
+ | - true
+ | ...
+
+test_run:wait_log('replica', 'ER_SYNC_QUORUM_TIMEOUT', nil, 10)
+ | ---
+ | - ER_SYNC_QUORUM_TIMEOUT
+ | ...
+-- Remove the trigger to let the replica connect.
+box.space._cluster:on_replace(nil, trig)
+ | ---
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_upstream(1, {status='follow'})
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.cfg{\
+    replication_synchro_quorum=orig_synchro_quorum,\
+    replication_synchro_timeout=orig_synchro_timeout\
+}
+ | ---
+ | ...
+box.space.sync:drop()
+ | ---
+ | ...
+test_run:cleanup_cluster()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
diff --git a/test/replication/gh-5566-final-join-synchro.test.lua b/test/replication/gh-5566-final-join-synchro.test.lua
new file mode 100644
index 000000000..14302f6e6
--- /dev/null
+++ b/test/replication/gh-5566-final-join-synchro.test.lua
@@ -0,0 +1,61 @@
+test_run = require('test_run').new()
+
+--
+-- gh-5566 replica tolerates synchronous transactions in final join stream.
+--
+_ = box.schema.space.create('sync', {is_sync=true})
+_ = box.space.sync:create_index('pk')
+
+box.schema.user.grant('guest', 'replication')
+box.schema.user.grant('guest', 'write', 'space', 'sync')
+
+-- Part 1. Make sure a joining instance tolerates synchronous rows in final join
+-- stream.
+trig = function()\
+    box.space.sync:replace{1}\
+end
+-- The trigger will generate synchronous rows each time a replica joins.
+_ = box.space._cluster:on_replace(trig)
+
+orig_synchro_quorum = box.cfg.replication_synchro_quorum
+box.cfg{replication_synchro_quorum=1}
+
+test_run:cmd('create server replica with rpl_master=default,\
+                                         script="replication/replica.lua"')
+test_run:cmd('start server replica')
+
+test_run:switch('replica')
+test_run:wait_upstream(1, {status='follow'})
+
+test_run:switch('default')
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+
+-- Part 2. Make sure master aborts final join if insert to _cluster is rolled
+-- back and replica is capable of retrying it.
+orig_synchro_timeout = box.cfg.replication_synchro_timeout
+-- Make the trigger we used above fail with no quorum.
+box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.01}
+-- Try to join the replica once again.
+test_run:cmd('create server replica with rpl_master=default,\
+                                         script="replication/replica.lua"')
+test_run:cmd('start server replica with wait=False')
+
+test_run:wait_log('replica', 'ER_SYNC_QUORUM_TIMEOUT', nil, 10)
+-- Remove the trigger to let the replica connect.
+box.space._cluster:on_replace(nil, trig)
+
+test_run:switch('replica')
+test_run:wait_upstream(1, {status='follow'})
+
+-- Cleanup.
+test_run:switch('default')
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.cfg{\
+    replication_synchro_quorum=orig_synchro_quorum,\
+    replication_synchro_timeout=orig_synchro_timeout\
+}
+box.space.sync:drop()
+test_run:cleanup_cluster()
+box.schema.user.revoke('guest', 'replication')
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 7e7004592..04a3c4bb2 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -38,6 +38,7 @@
     "gh-5440-qsync-ro.test.lua": {},
     "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {},
     "gh-5536-wal-limit.test.lua": {},
+    "gh-5566-final-join-synchro.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.24.3 (Apple Git-128)



* [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
                   ` (5 preceding siblings ...)
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 6/7] replication: tolerate synchro rollback during final join Serge Petrenko via Tarantool-patches
@ 2021-03-24 12:24 ` Serge Petrenko via Tarantool-patches
  2021-03-26 20:50   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-26 13:46 ` [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Cyrill Gorcunov via Tarantool-patches
                   ` (2 subsequent siblings)
  9 siblings, 1 reply; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-24 12:24 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

box_process_register had a bug: it decoded the replica's vclock but never
used it when sending the registration stream. So the replica might lose
the data in the range (replica_vclock, start_vclock).
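
An invented example of the window that could be lost:

/*
 * An anonymous replica registers with replica_vclock = {1: 90} while
 * the master is already at replicaset.vclock = {1: 100}. Relaying from
 * start_vclock = {1: 100} would skip rows 91..100, which the replica
 * has never received. Relaying from replica_vclock closes the gap.
 */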

Follow-up #5566
---
 src/box/box.cc | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 292a54213..0bcb505a8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2127,8 +2127,8 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
 	assert(header->type == IPROTO_REGISTER);
 
 	struct tt_uuid instance_uuid = uuid_nil;
-	struct vclock vclock;
-	xrow_decode_register_xc(header, &instance_uuid, &vclock);
+	struct vclock replica_vclock;
+	xrow_decode_register_xc(header, &instance_uuid, &replica_vclock);
 
 	if (!is_box_configured)
 		tnt_raise(ClientError, ER_LOADING);
@@ -2154,7 +2154,8 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
 			  "wal_mode = 'none'");
 	}
 
-	struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
+	vclock_reset(&replica_vclock, 0, vclock_get(&replicaset.vclock, 0));
+	struct gc_consumer *gc = gc_consumer_register(&replica_vclock,
 				"replica %s", tt_uuid_str(&instance_uuid));
 	if (gc == NULL)
 		diag_raise();
@@ -2163,9 +2164,6 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
 	say_info("registering replica %s at %s",
 		 tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
 
-	struct vclock start_vclock;
-	vclock_copy(&start_vclock, &replicaset.vclock);
-
 	/**
 	 * Call the server-side hook which stores the replica uuid
 	 * in _cluster space.
@@ -2180,10 +2178,10 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
 
 	/*
 	 * Feed replica with WALs in range
-	 * (start_vclock, stop_vclock) so that it gets its
+	 * (replica_vclock, stop_vclock) so that it gets its
 	 * registration.
 	 */
-	relay_final_join(io->fd, header->sync, &start_vclock, &stop_vclock);
+	relay_final_join(io->fd, header->sync, &replica_vclock, &stop_vclock);
 	say_info("final data sent.");
 
 	struct xrow_header row;
-- 
2.24.3 (Apple Git-128)



* Re: [Tarantool-patches] [PATCH v2 6/7] replication: tolerate synchro rollback during final join
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 6/7] replication: tolerate synchro rollback during final join Serge Petrenko via Tarantool-patches
@ 2021-03-24 12:45   ` Serge Petrenko via Tarantool-patches
  2021-03-26 20:49   ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 0 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-24 12:45 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches



24.03.2021 15:24, Serge Petrenko wrote:
>   .../gh-5566-final-join-synchro.result         | 139 ++++++++++++++++++
>   .../gh-5566-final-join-synchro.test.lua       |  61 ++++++++
>
With the testcase from this patch, some replication tests fail with
`ER_TUPLE_FOUND: Duplicate key exists in unique index 'primary' in space
'_cluster'`.

This happens once the default node is restarted, because recovery is not
yet transactional.
See https://github.com/tarantool/tarantool/issues/5874.

I may rewrite the test so that the synchronous transactions in it are
single-statement, or we may leave it as is until #5874 is fixed, which
will presumably happen soon enough.

-- 
Serge Petrenko



* Re: [Tarantool-patches] [PATCH v2 2/7] applier: extract tx boundary checks from applier_read_tx into a separate routine
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 2/7] applier: extract tx boundary checks from applier_read_tx into a separate routine Serge Petrenko via Tarantool-patches
@ 2021-03-26 12:35   ` Cyrill Gorcunov via Tarantool-patches
  2021-03-27 16:54     ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 33+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-03-26 12:35 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: v.shpilevoy, tarantool-patches

On Wed, Mar 24, 2021 at 03:24:12PM +0300, Serge Petrenko wrote:
> Introduce a new routine, set_next_tx_row(), which checks for tx boundary
> violations and appends the new row to the current tx when everything is OK.
> 
> set_next_tx_row() is extracted from applier_read_tx() because it's a common
> part of transaction assembly for both recovery and the applier.
> 
> The only difference for recovery is that the routine responsible for tx
> assembly won't read rows itself. It will be a callback run on each new row
> read from the WAL.
> 
> Prerequisite #5874
> Part-of #5566
> ---
>  src/box/applier.cc | 117 +++++++++++++++++++++++----------------------
>  1 file changed, 60 insertions(+), 57 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 326cf18d2..65afa5e98 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -657,6 +657,64 @@ applier_read_tx_row(struct applier *applier)
>  	return tx_row;
>  }
>  
> +static inline int64_t
> +set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)

Serge, why is it "inline"? I propose not to use inline unless it's really
needed. The compiler may ignore the inline keyword, and at the same time,
if a function is small, the compiler may inline it as part of the
optimization stage.

Not a big deal, feel free to ignore, but I see "inline" is used too much all over the code :(

https://www.kernel.org/doc/html/v4.10/process/coding-style.html#the-inline-disease


* Re: [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
                   ` (6 preceding siblings ...)
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register Serge Petrenko via Tarantool-patches
@ 2021-03-26 13:46 ` Cyrill Gorcunov via Tarantool-patches
  2021-03-30 20:13 ` Vladislav Shpilevoy via Tarantool-patches
  2021-04-05 16:15 ` Kirill Yukhin via Tarantool-patches
  9 siblings, 0 replies; 33+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-03-26 13:46 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: v.shpilevoy, tarantool-patches

On Wed, Mar 24, 2021 at 03:24:10PM +0300, Serge Petrenko wrote:
> Initially, the idea of this patchset was to ignore synchronous rows on the
> applier side and to make sure there are no rolled-back transactions in the
> final join stream on the master side.
> 
> Unfortunately, this easy fix didn't work. The reason is that once the
> replica receives the initial data, up to `start_vclock`, the final join
> stream has to start right at `start_vclock` so that we do not lose any
> transactions.
> 
> This means that even when the master encounters a synchro rollback and
> makes the replica retry the final join to get rid of the rollback, it still
> has to send the rollback together with the other data. And the applier must
> process this rollback to avoid conflicts.
> 
> In order to let the applier process synchro requests (CONFIRM and ROLLBACK),
> we need to make the final join transactional. This is what this patchset
> does.
> 
> An alternative would be to retry not only the final join but also the
> initial join every time the master receives a rollback during the final
> join stage. This would be too heavy-handed, given the possibly huge amounts
> of data sent during the initial join.

I don't see any obvious problems in the series, but I'm far from being a
replication expert. Still, Ack.


* Re: [Tarantool-patches] [PATCH v2 1/7] replication: fix a hang on final join retry
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 1/7] replication: fix a hang on final join retry Serge Petrenko via Tarantool-patches
@ 2021-03-26 20:44   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-27 16:52     ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-26 20:44 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Hi! Thanks for working on this!

> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5a88a013e..326cf18d2 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -566,9 +566,16 @@ applier_register(struct applier *applier)
>  	row.type = IPROTO_REGISTER;
>  	coio_write_xrow(coio, &row);
>  
> -	applier_set_state(applier, APPLIER_REGISTER);
> +	/*
> +	 * Register may serve as a retry for final join. Set corresponding
> +	 * states to unblock anyone who's waiting for final join to start or
> +	 * end.
> +	 */
> +	applier_set_state(applier, was_anon ? APPLIER_REGISTER :
> +					      APPLIER_FINAL_JOIN);
>  	applier_wait_register(applier, 0);
> -	applier_set_state(applier, APPLIER_REGISTERED);
> +	applier_set_state(applier, was_anon ? APPLIER_REGISTERED :
> +					      APPLIER_JOINED);
>  	applier_set_state(applier, APPLIER_READY);

Hm. I don't understand. The transition from anon to non-anon leads to the
re-creation of all appliers: it calls box_sync_replication() and creates
new struct applier objects. How is it possible that during one lifetime of
a reader fiber it manages to see 2 states and is not terminated?

Also, could you please provide a test? It might be easier to see what is
happening then.


* Re: [Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx()
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx() Serge Petrenko via Tarantool-patches
@ 2021-03-26 20:47   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-27 17:34     ` Serge Petrenko via Tarantool-patches
  2021-03-27 18:30   ` [Tarantool-patches] [PATCH v2 3.5/7] applier: fix not releasing the latch on apply_synchro_row() fail Serge Petrenko via Tarantool-patches
  1 sibling, 1 reply; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-26 20:47 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Thanks for the patch!

See 4 comments below.

On 24.03.2021 13:24, Serge Petrenko wrote:
> The new routine, called apply_plain_tx(), may be used not only by
> applier_apply_tx(), but also by final join, once we make it transactional,
> and by recovery, once it is also made transactional.
> 
> Also, while we're at it, remove the excess fiber_gc() call from the
> applier_subscribe loop and instead make sure fiber_gc() is called on every
> return from applier_apply_tx().
> 
> Prerequisite #5874
> Part of #5566
> ---
>  src/box/applier.cc | 188 ++++++++++++++++++++++-----------------------
>  1 file changed, 93 insertions(+), 95 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 65afa5e98..07e557a51 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -905,6 +905,90 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>  	return box_raft_process(&req, applier->instance_id);
>  }
>  
> +static inline int
> +apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
> +{
> +	/**

1. Inside functions, a comment's first line should use /*, not /**.

> +	 * Explicitly begin the transaction so that we can
> +	 * control fiber->gc life cycle and, in case of apply
> +	 * conflict safely access failed xrow object and allocate
> +	 * IPROTO_NOP on gc.
> +	 */
> +	struct txn *txn = txn_begin();
> +	struct applier_tx_row *item;
> +	if (txn == NULL)
> +		 return -1;
> +
> +	stailq_foreach_entry(item, rows, next) {
> +		struct xrow_header *row = &item->row;
> +		int res = apply_row(row);
> +		if (res != 0 && skip_conflict) {
> +			struct error *e = diag_last_error(diag_get());
> +			/*
> +			 * In case of ER_TUPLE_FOUND error and enabled
> +			 * replication_skip_conflict configuration
> +			 * option, skip applying the foreign row and
> +			 * replace it with NOP in the local write ahead
> +			 * log.
> +			 */
> +			if (e->type == &type_ClientError &&
> +			    box_error_code(e) == ER_TUPLE_FOUND &&
> +			    replication_skip_conflict) {

2. That looks kind of confusing: you pass the skip_conflict option but
also use replication_skip_conflict. You could calculate skip_conflict
based on replication_skip_conflict in your patch.

> +				diag_clear(diag_get());
> +				row->type = IPROTO_NOP;
> +				row->bodycnt = 0;
> +				res = apply_row(row);
> +			}
> +		}
> +		if (res != 0)
> +			goto fail;
> +	}
> +
> +	/*
> +	 * We are going to commit so it's a high time to check if
> +	 * the current transaction has non-local effects.
> +	 */
> +	if (txn_is_distributed(txn)) {
> +		/*
> +		 * A transaction mixes remote and local rows.
> +		 * Local rows must be replicated back, which
> +		 * doesn't make sense since the master likely has
> +		 * new changes which local rows may overwrite.
> +		 * Raise an error.
> +		 */
> +		diag_set(ClientError, ER_UNSUPPORTED, "Replication",
> +			 "distributed transactions");
> +		goto fail;
> +	}
> +
> +	if (use_triggers) {
> +		/* We are ready to submit txn to wal. */
> +		struct trigger *on_rollback, *on_wal_write;
> +		size_t size;
> +		on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
> +						  &size);
> +		on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
> +						   &size);
> +		if (on_rollback == NULL || on_wal_write == NULL) {
> +			diag_set(OutOfMemory, size, "region_alloc_object",
> +				 "on_rollback/on_wal_write");
> +			goto fail;
> +		}
> +
> +		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
> +		txn_on_rollback(txn, on_rollback);
> +
> +		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
> +		txn_on_wal_write(txn, on_wal_write);
> +	}
> +
> +	return txn_commit_try_async(txn);
> +fail:
> +	txn_rollback(txn);
> +	return -1;
> +}
> @@ -974,103 +1058,18 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>  		assert(first_row == last_row);
>  		if (apply_synchro_row(first_row) != 0)
>  			diag_raise();

3. Hm. Isn't it a bug that we raise an error here, but don't unlock the
latch and don't call fiber_gc()? Looks like a separate bug. Could you
fix it please, and probably with a test? Can it be related to the
hang you fix in the previous commit?
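
Something along these lines, perhaps (a sketch; it assumes the error path
funnels into a common label that releases the latch and calls fiber_gc()):

	if ((rc = apply_synchro_row(first_row)) != 0)
		goto finish;
	...
finish:
	latch_unlock(latch);
	fiber_gc();
	return rc;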

> -		goto success;
> -	}
> -
> -	/**
> -	 * Explicitly begin the transaction so that we can
> -	 * control fiber->gc life cycle and, in case of apply
> -	 * conflict safely access failed xrow object and allocate
> -	 * IPROTO_NOP on gc.
> -	 */
> -	struct txn *txn;
> -	txn = txn_begin();
> -	struct applier_tx_row *item;
> -	if (txn == NULL) {
> -		latch_unlock(latch);
> -		return -1;
> -	}
> -	stailq_foreach_entry(item, rows, next) {
> -		struct xrow_header *row = &item->row;
> -		int res = apply_row(row);
> -		if (res != 0) {
> -			struct error *e = diag_last_error(diag_get());
> -			/*
> -			 * In case of ER_TUPLE_FOUND error and enabled
> -			 * replication_skip_conflict configuration
> -			 * option, skip applying the foreign row and
> -			 * replace it with NOP in the local write ahead
> -			 * log.
> -			 */
> -			if (e->type == &type_ClientError &&
> -			    box_error_code(e) == ER_TUPLE_FOUND &&
> -			    replication_skip_conflict) {
> -				diag_clear(diag_get());
> -				row->type = IPROTO_NOP;
> -				row->bodycnt = 0;
> -				res = apply_row(row);
> -			}
> -		}
> -		if (res != 0)
> -			goto rollback;
> -	}
> -	/*
> -	 * We are going to commit so it's a high time to check if
> -	 * the current transaction has non-local effects.
> -	 */
> -	if (txn_is_distributed(txn)) {
> -		/*
> -		 * A transaction mixes remote and local rows.
> -		 * Local rows must be replicated back, which
> -		 * doesn't make sense since the master likely has
> -		 * new changes which local rows may overwrite.
> -		 * Raise an error.
> -		 */
> -		diag_set(ClientError, ER_UNSUPPORTED,
> -			 "Replication", "distributed transactions");
> -		goto rollback;
> +		goto written;
>  	}
>  
> -	/* We are ready to submit txn to wal. */
> -	struct trigger *on_rollback, *on_wal_write;
> -	size_t size;
> -	on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
> -					  &size);
> -	on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
> -					   &size);
> -	if (on_rollback == NULL || on_wal_write == NULL) {
> -		diag_set(OutOfMemory, size, "region_alloc_object",
> -			 "on_rollback/on_wal_write");
> -		goto rollback;
> +	if ((rc = apply_plain_tx(rows, true, true)) == 0) {
> +written:
> +		vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
> +			      last_row->lsn);
>  	}
> -
> -	trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
> -	txn_on_rollback(txn, on_rollback);
> -
> -	trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
> -	txn_on_wal_write(txn, on_wal_write);
> -
> -	if (txn_commit_try_async(txn) < 0)
> -		goto fail;
> -
> -success:
> -	/*
> -	 * The transaction was sent to journal so promote vclock.
> -	 *
> -	 * Use the lsn of the last row to guard from 1.10
> -	 * instances, which send every single tx row as a separate
> -	 * transaction.
> -	 */
> -	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
> -		      last_row->lsn);
> -	latch_unlock(latch);
> -	return 0;
> -rollback:
> -	txn_rollback(txn);
> -fail:
> +no_write:

4. You go to this label even when the write was done. Maybe rename it to
'end' or 'finish'?

Consider this diff:

====================
@@ -1027,7 +1027,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 	latch_lock(latch);
 	if (vclock_get(&replicaset.applier.vclock,
 		       last_row->replica_id) >= last_row->lsn) {
-		goto no_write;
+		goto finish;
 	} else if (vclock_get(&replicaset.applier.vclock,
 			      first_row->replica_id) >= first_row->lsn) {
 		/*
@@ -1058,15 +1058,12 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		assert(first_row == last_row);
 		if (apply_synchro_row(first_row) != 0)
 			diag_raise();
-		goto written;
+	} else if ((rc = apply_plain_tx(rows, true, true)) != 0) {
+		goto finish;
 	}
-
-	if ((rc = apply_plain_tx(rows, true, true)) == 0) {
-written:
-		vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
-			      last_row->lsn);
-	}
-no_write:
+	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
+		      last_row->lsn);
+finish:
 	latch_unlock(latch);
 	fiber_gc();
 	return rc;
====================

>  	latch_unlock(latch);
>  	fiber_gc();
> -	return -1;
> +	return rc;
>  }

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional Serge Petrenko via Tarantool-patches
@ 2021-03-26 20:49   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-27 19:05     ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-26 20:49 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

I appreciate the work you did here!

See 3 comments below.

On 24.03.2021 13:24, Serge Petrenko wrote:
> Now applier assembles rows into transactions not only on subscribe
> stage, but also during final join / register.
> 
> This was necessary for correct handling of rolled back synchronous
> transactions in final join stream.
> 
> Part of #5566
> ---
>  src/box/applier.cc | 126 ++++++++++++++++++++++-----------------------
>  1 file changed, 61 insertions(+), 65 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index d53f13711..9a8b0f0fc 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -524,27 +509,19 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>  	 * Receive final data.
>  	 */
>  	while (true) {
> -		coio_read_xrow(coio, ibuf, &row);
> -		applier->last_row_time = ev_monotonic_now(loop());
> -		if (iproto_type_is_dml(row.type)) {
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> -			if (apply_final_join_row(&row) != 0)
> -				diag_raise();
> -			if (++row_count % 100000 == 0)
> -				say_info("%.1fM rows received", row_count / 1e6);
> -		} else if (row.type == IPROTO_OK) {
> -			/*
> -			 * Current vclock. This is not used now,
> -			 * ignore.
> -			 */

1. The comment was helpful, let's keep it.

> -			++row_count;
> -			break; /* end of stream */
> -		} else if (iproto_type_is_error(row.type)) {
> -			xrow_decode_error_xc(&row);  /* rethrow error */
> -		} else {
> -			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
> -				  (uint32_t) row.type);
> +		struct stailq rows;
> +		applier_read_tx(applier, &rows, &row_count);
> +		struct xrow_header *first_row =
> +			&stailq_first_entry(&rows, struct applier_tx_row,
> +					    next)->row;
> +		if (first_row->type == IPROTO_OK) {
> +			assert(first_row ==
> +			       &stailq_last_entry(&rows, struct applier_tx_row,
> +						  next)->row);
> +			break;
>  		}
> +		if (apply_final_join_tx(&rows) != 0)
> +			diag_raise();
>  	}
>  
>  	return row_count;
> @@ -646,8 +613,11 @@ applier_read_tx_row(struct applier *applier)
>  	 * messages so we can't assume that if we haven't heard
>  	 * from the master for quite a while the connection is
>  	 * broken - the master might just be idle.
> +	 * Also there are no timeouts during final join and register.
>  	 */
> -	if (applier->version_id < version_id(1, 7, 7))
> +	if (applier->version_id < version_id(1, 7, 7) ||
> +	    applier->state == APPLIER_FINAL_JOIN ||
> +	    applier->state == APPLIER_REGISTER)

2. Maybe it would be better to pass the timeout from the upper level and
always use coio_read_xrow_timeout_xc(). For the mentioned conditions
it would be infinity. Anyway the non-timed version in the end uses
TIMEOUT_INFINITY too (coio_read_ahead). That way there would be fewer
tricky condition checks in the generic code.
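
For example (just a sketch; the exact signature is up to you):

	static struct applier_tx_row *
	applier_read_tx_row(struct applier *applier, double timeout)
	{
		...
		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
		...
	}

with the callers passing TIMEOUT_INFINITY for register/final join and for
masters older than 1.7.7.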

>  		coio_read_xrow(coio, ibuf, row);
>  	else
>  		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
> @@ -731,6 +702,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
>  	do {
>  		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
>  		tsn = set_next_tx_row(rows, tx_row, tsn);
> +
> +		if (row_count != NULL && ++*row_count % 100000 == 0)
> +			say_info("%.1fM rows received", *row_count / 1e6);

3. Hm. That adds branching and a heavy '%' operation. Maybe you could make it
return the number of rows and do this check + log in the caller code. So it
would affect only the joins.
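
I mean something like this in applier_wait_register() (a sketch; next_log_cnt
is a hypothetical counter holding the next 100k boundary to report):

	row_count += applier_read_tx(applier, &rows);
	if (row_count >= next_log_cnt) {
		say_info("%.1fM rows received", next_log_cnt / 1e6);
		next_log_cnt += 100000;
	}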

> @@ -1254,7 +1250,7 @@ applier_subscribe(struct applier *applier)
>  		}
>  
>  		struct stailq rows;
> -		applier_read_tx(applier, &rows);
> +		applier_read_tx(applier, &rows, NULL);
>  
>  		/*
>  		 * In case of an heartbeat message wake a writer up
> 

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 6/7] replication: tolerate synchro rollback during final join
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 6/7] replication: tolerate synchro rollback during final join Serge Petrenko via Tarantool-patches
  2021-03-24 12:45   ` Serge Petrenko via Tarantool-patches
@ 2021-03-26 20:49   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-27 19:23     ` Serge Petrenko via Tarantool-patches
  1 sibling, 1 reply; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-26 20:49 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Thanks for working on this!

> diff --git a/test/replication/gh-5566-final-join-synchro.result b/test/replication/gh-5566-final-join-synchro.result
> new file mode 100644
> index 000000000..32749bf12
> --- /dev/null
> +++ b/test/replication/gh-5566-final-join-synchro.result
> @@ -0,0 +1,139 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +
> +--
> +-- gh-5566 replica tolerates synchronous transactions in final join stream.
> +--
> +_ = box.schema.space.create('sync', {is_sync=true})
> + | ---
> + | ...
> +_ = box.space.sync:create_index('pk')
> + | ---
> + | ...
> +
> +box.schema.user.grant('guest', 'replication')
> + | ---
> + | ...
> +box.schema.user.grant('guest', 'write', 'space', 'sync')
> + | ---
> + | ...
> +
> +-- Part 1. Make sure a joining instance tolerates synchronous rows in final join
> +-- stream.
> +trig = function()\
> +    box.space.sync:replace{1}\
> +end

You might need to increase the synchro timeout because the default can
be flaky.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register Serge Petrenko via Tarantool-patches
@ 2021-03-26 20:50   ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-27 20:13     ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-26 20:50 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Thanks for the patch!

See 2 comments below.

On 24.03.2021 13:24, Serge Petrenko wrote:
> There was a bug in box_process_register. It decoded the replica's vclock but
> never used it when sending the registration stream. So the replica might
> lose the data in range (replica_vclock, start_vclock).

1. Could you please add a test?

> Follow-up #5566
> ---
>  src/box/box.cc | 14 ++++++--------
>  1 file changed, 6 insertions(+), 8 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 292a54213..0bcb505a8 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -2154,7 +2154,8 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
>  			  "wal_mode = 'none'");
>  	}
>  
> -	struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
> +	vclock_reset(&replica_vclock, 0, vclock_get(&replicaset.vclock, 0));

2. xrow_decode_register_xc() already returns the vclock with empty 0
part. Why do you need this reset?

> +	struct gc_consumer *gc = gc_consumer_register(&replica_vclock,
>  				"replica %s", tt_uuid_str(&instance_uuid));
>  	if (gc == NULL)
>  		diag_raise();

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 1/7] replication: fix a hang on final join retry
  2021-03-26 20:44   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-27 16:52     ` Serge Petrenko via Tarantool-patches
  2021-03-29 21:50       ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-27 16:52 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



26.03.2021 23:44, Vladislav Shpilevoy пишет:
> Hi! Thanks for working on this!
>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 5a88a013e..326cf18d2 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -566,9 +566,16 @@ applier_register(struct applier *applier)
>>   	row.type = IPROTO_REGISTER;
>>   	coio_write_xrow(coio, &row);
>>   
>> -	applier_set_state(applier, APPLIER_REGISTER);
>> +	/*
>> +	 * Register may serve as a retry for final join. Set corresponding
>> +	 * states to unblock anyone who's waiting for final join to start or
>> +	 * end.
>> +	 */
>> +	applier_set_state(applier, was_anon ? APPLIER_REGISTER :
>> +					      APPLIER_FINAL_JOIN);
>>   	applier_wait_register(applier, 0);
>> -	applier_set_state(applier, APPLIER_REGISTERED);
>> +	applier_set_state(applier, was_anon ? APPLIER_REGISTERED :
>> +					      APPLIER_JOINED);
>>   	applier_set_state(applier, APPLIER_READY);
> Hm. I don't understand. Transition from anon to non-anon leads to
> re-creation of all appliers. It calls box_sync_replication() and
> creates new struct applier objects. How is it possible that during the
> lifetime of a single reader fiber it manages to see 2 states without
> being terminated?

You're correct: it isn't possible for an applier to see two states, anon
and not anon.
The flag is still needed, though, for the case when a normal replica
receives some transient error during final join. In this case the applier
reconnects and we get to the next applier loop iteration. First it checks
whether REPLICASET_UUID is nil. It isn't, because initial join succeeded.
Then it checks whether instance_id is 0. It is, because final join failed.
The applier now assumes that the replica was anonymous and tries to
register.
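
In pseudo-code, the decision in the applier loop boils down to roughly this
(a sketch; the real checks are spread across the connect/join logic):

	if (tt_uuid_is_nil(&REPLICASET_UUID)) {
		/* Initial join never happened: do a full join. */
		applier_join(applier);
	} else if (instance_id == 0) {
		/* Initial join done, but final join failed: retry via register. */
		applier_register(applier);
	}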

The hang I'm talking about is in `bootstrap_from_master()`. It waits until
the applier enters the APPLIER_JOINED state, which never happened before
this patch.

So, `was_anon` comes into play only when final join fails and is retried.

>
> Also could you please provide a test? Maybe it would be easier to see
> what is happening then.

OK. I'm not sure this test is needed, because this is implicitly tested
by the gh-5566-final-join-synchro test.

A test would be as follows:
master:
     box.cfg{listen=3301, replication_synchro_quorum=10}
     box.space._cluster:alter{is_sync=true}
     box.schema.user.grant("guest", "replication")
replica:
     box.cfg{replication=3301}
master: wait until replica receives ER_SYNC_QUORUM_TIMEOUT, and then:
     box.cfg{replication_synchro_quorum=1}

This test passes on the branch, meaning the replica's box.cfg completes
successfully, but it would hang indefinitely without this commit.

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 2/7] applier: extract tx boundary checks from applier_read_tx into a separate routine
  2021-03-26 12:35   ` Cyrill Gorcunov via Tarantool-patches
@ 2021-03-27 16:54     ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-27 16:54 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: v.shpilevoy, tarantool-patches



26.03.2021 15:35, Cyrill Gorcunov пишет:
> On Wed, Mar 24, 2021 at 03:24:12PM +0300, Serge Petrenko wrote:
>> Introduce a new routine, set_next_tx_row(), which checks tx boundary
>> violation and appends the new row to the current tx in case everything
>> is ok.
>>
>> set_next_tx_row() is extracted from applier_read_tx() because it's a
>> common part of transaction assembly both for recovery and applier.
>>
>> The only difference for recovery will be that the routine that is
>> responsible for tx assembly won't read rows. It'll be a callback run on
>> each new row being read from the WAL.
>>
>> Prerequisite #5874
>> Part-of #5566
>> ---
>>   src/box/applier.cc | 117 +++++++++++++++++++++++----------------------
>>   1 file changed, 60 insertions(+), 57 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 326cf18d2..65afa5e98 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -657,6 +657,64 @@ applier_read_tx_row(struct applier *applier)
>>   	return tx_row;
>>   }
>>   
>> +static inline int64_t
>> +set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
> Serge, why is it "inline"? I propose not to use inline unless it's really
> needed. The compiler may ignore the inline keyword; at the same time, if a
> function is small, the compiler may inline it as part of the optimization
> stage.
>
> Not a big deal, feel free to ignore, but I see "inline" is used too much all over the code :(
>
> https://www.kernel.org/doc/html/v4.10/process/coding-style.html#the-inline-disease

Thanks for the review!
No problem.

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 65afa5e98..00869d480 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -657,7 +657,7 @@ applier_read_tx_row(struct applier *applier)
      return tx_row;
  }

-static inline int64_t
+static int64_t
  set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
  {
      struct xrow_header *row = &tx_row->row;


-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx()
  2021-03-26 20:47   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-27 17:34     ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-27 17:34 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



26.03.2021 23:47, Vladislav Shpilevoy пишет:
> Thanks for the patch!
>
> See 4 comments below.

Thanks for the review!

>
> On 24.03.2021 13:24, Serge Petrenko wrote:
>> The new routine, called apply_plain_tx(), may be used not only by
>> applier_apply_tx(), but also by final join, once we make it
>> transactional, and recovery, once it's also turned transactional.
>>
>> Also, while we're at it, remove the excess fiber_gc() call from the
>> applier_subscribe loop. Let's instead make sure fiber_gc() is called on
>> any return from applier_apply_tx().
>>
>> Prerequisite #5874
>> Part of #5566
>> ---
>>   src/box/applier.cc | 188 ++++++++++++++++++++++-----------------------
>>   1 file changed, 93 insertions(+), 95 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 65afa5e98..07e557a51 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -905,6 +905,90 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>>   	return box_raft_process(&req, applier->instance_id);
>>   }
>>   
>> +static inline int
>> +apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>> +{
>> +	/**
> 1. Inside functions, for a comment's first line we use /*, not /**.

Sure, fixed.

>
>> +	 * Explicitly begin the transaction so that we can
>> +	 * control fiber->gc life cycle and, in case of apply
>> +	 * conflict safely access failed xrow object and allocate
>> +	 * IPROTO_NOP on gc.
>> +	 */
>> +	struct txn *txn = txn_begin();
>> +	struct applier_tx_row *item;
>> +	if (txn == NULL)
>> +		 return -1;
>> +
>> +	stailq_foreach_entry(item, rows, next) {
>> +		struct xrow_header *row = &item->row;
>> +		int res = apply_row(row);
>> +		if (res != 0 && skip_conflict) {
>> +			struct error *e = diag_last_error(diag_get());
>> +			/*
>> +			 * In case of ER_TUPLE_FOUND error and enabled
>> +			 * replication_skip_conflict configuration
>> +			 * option, skip applying the foreign row and
>> +			 * replace it with NOP in the local write ahead
>> +			 * log.
>> +			 */
>> +			if (e->type == &type_ClientError &&
>> +			    box_error_code(e) == ER_TUPLE_FOUND &&
>> +			    replication_skip_conflict) {
> 2. That looks kind of confusing - you pass the skip_conflict option but
> also use replication_skip_conflict directly. You could calculate skip_conflict
> based on replication_skip_conflict in your patch.

Yes, indeed. Thanks for noticing!

>
>> +				diag_clear(diag_get());
>> +				row->type = IPROTO_NOP;
>> +				row->bodycnt = 0;
>> +				res = apply_row(row);
>> +			}
>> +		}
>> +		if (res != 0)
>> +			goto fail;
>> +	}
>> +
>> +	/*
>> +	 * We are going to commit so it's a high time to check if
>> +	 * the current transaction has non-local effects.
>> +	 */
>> +	if (txn_is_distributed(txn)) {
>> +		/*
>> +		 * A transaction mixes remote and local rows.
>> +		 * Local rows must be replicated back, which
>> +		 * doesn't make sense since the master likely has
>> +		 * new changes which local rows may overwrite.
>> +		 * Raise an error.
>> +		 */
>> +		diag_set(ClientError, ER_UNSUPPORTED, "Replication",
>> +			 "distributed transactions");
>> +		goto fail;
>> +	}
>> +
>> +	if (use_triggers) {
>> +		/* We are ready to submit txn to wal. */
>> +		struct trigger *on_rollback, *on_wal_write;
>> +		size_t size;
>> +		on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
>> +						  &size);
>> +		on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
>> +						   &size);
>> +		if (on_rollback == NULL || on_wal_write == NULL) {
>> +			diag_set(OutOfMemory, size, "region_alloc_object",
>> +				 "on_rollback/on_wal_write");
>> +			goto fail;
>> +		}
>> +
>> +		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>> +		txn_on_rollback(txn, on_rollback);
>> +
>> +		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
>> +		txn_on_wal_write(txn, on_wal_write);
>> +	}
>> +
>> +	return txn_commit_try_async(txn);
>> +fail:
>> +	txn_rollback(txn);
>> +	return -1;
>> +}
>> @@ -974,103 +1058,18 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>>   		assert(first_row == last_row);
>>   		if (apply_synchro_row(first_row) != 0)
>>   			diag_raise();
> 3. Hm. Isn't it a bug that we raise an error here, but don't unlock the
> latch and don't call fiber_gc()? Looks like a separate bug. Could you
> fix it please, and probably with a test? Can it be related to the
> hang you fix in the previous commit?

It is a bug, yes. I will fix it in a commit on top. It's not related to the
hang we spoke of in the previous letter, though.

>
>> -		goto success;
>> -	}
>> -
>> -	/**
>> -	 * Explicitly begin the transaction so that we can
>> -	 * control fiber->gc life cycle and, in case of apply
>> -	 * conflict safely access failed xrow object and allocate
>> -	 * IPROTO_NOP on gc.
>> -	 */
>> -	struct txn *txn;
>> -	txn = txn_begin();
>> -	struct applier_tx_row *item;
>> -	if (txn == NULL) {
>> -		latch_unlock(latch);
>> -		return -1;
>> -	}
>> -	stailq_foreach_entry(item, rows, next) {
>> -		struct xrow_header *row = &item->row;
>> -		int res = apply_row(row);
>> -		if (res != 0) {
>> -			struct error *e = diag_last_error(diag_get());
>> -			/*
>> -			 * In case of ER_TUPLE_FOUND error and enabled
>> -			 * replication_skip_conflict configuration
>> -			 * option, skip applying the foreign row and
>> -			 * replace it with NOP in the local write ahead
>> -			 * log.
>> -			 */
>> -			if (e->type == &type_ClientError &&
>> -			    box_error_code(e) == ER_TUPLE_FOUND &&
>> -			    replication_skip_conflict) {
>> -				diag_clear(diag_get());
>> -				row->type = IPROTO_NOP;
>> -				row->bodycnt = 0;
>> -				res = apply_row(row);
>> -			}
>> -		}
>> -		if (res != 0)
>> -			goto rollback;
>> -	}
>> -	/*
>> -	 * We are going to commit so it's a high time to check if
>> -	 * the current transaction has non-local effects.
>> -	 */
>> -	if (txn_is_distributed(txn)) {
>> -		/*
>> -		 * A transaction mixes remote and local rows.
>> -		 * Local rows must be replicated back, which
>> -		 * doesn't make sense since the master likely has
>> -		 * new changes which local rows may overwrite.
>> -		 * Raise an error.
>> -		 */
>> -		diag_set(ClientError, ER_UNSUPPORTED,
>> -			 "Replication", "distributed transactions");
>> -		goto rollback;
>> +		goto written;
>>   	}
>>   
>> -	/* We are ready to submit txn to wal. */
>> -	struct trigger *on_rollback, *on_wal_write;
>> -	size_t size;
>> -	on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
>> -					  &size);
>> -	on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
>> -					   &size);
>> -	if (on_rollback == NULL || on_wal_write == NULL) {
>> -		diag_set(OutOfMemory, size, "region_alloc_object",
>> -			 "on_rollback/on_wal_write");
>> -		goto rollback;
>> +	if ((rc = apply_plain_tx(rows, true, true)) == 0) {
>> +written:
>> +		vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>> +			      last_row->lsn);
>>   	}
>> -
>> -	trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>> -	txn_on_rollback(txn, on_rollback);
>> -
>> -	trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
>> -	txn_on_wal_write(txn, on_wal_write);
>> -
>> -	if (txn_commit_try_async(txn) < 0)
>> -		goto fail;
>> -
>> -success:
>> -	/*
>> -	 * The transaction was sent to journal so promote vclock.
>> -	 *
>> -	 * Use the lsn of the last row to guard from 1.10
>> -	 * instances, which send every single tx row as a separate
>> -	 * transaction.
>> -	 */
>> -	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>> -		      last_row->lsn);
>> -	latch_unlock(latch);
>> -	return 0;
>> -rollback:
>> -	txn_rollback(txn);
>> -fail:
>> +no_write:
> 4. You go to this label even when the write was done. Maybe rename it to
> 'end' or 'finish'?
>
> Consider this diff:
>
> ====================
> @@ -1027,7 +1027,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   	latch_lock(latch);
>   	if (vclock_get(&replicaset.applier.vclock,
>   		       last_row->replica_id) >= last_row->lsn) {
> -		goto no_write;
> +		goto finish;
>   	} else if (vclock_get(&replicaset.applier.vclock,
>   			      first_row->replica_id) >= first_row->lsn) {
>   		/*
> @@ -1058,15 +1058,12 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>   		assert(first_row == last_row);
>   		if (apply_synchro_row(first_row) != 0)
>   			diag_raise();
> -		goto written;
> +	} else if ((rc = apply_plain_tx(rows, true, true)) != 0) {
> +		goto finish;
>   	}
> -
> -	if ((rc = apply_plain_tx(rows, true, true)) == 0) {
> -written:
> -		vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
> -			      last_row->lsn);
> -	}
> -no_write:
> +	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
> +		      last_row->lsn);
> +finish:
>   	latch_unlock(latch);
>   	fiber_gc();
>   	return rc;
> ====================

Looks good, applied. Incremental diff below.

========================================

diff --git a/src/box/applier.cc b/src/box/applier.cc
index f396e43a8..e6d9673dd 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -908,7 +908,7 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
  static inline int
  apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
  {
-       /**
+       /*
          * Explicitly begin the transaction so that we can
          * control fiber->gc life cycle and, in case of apply
          * conflict safely access failed xrow object and allocate
@@ -932,8 +932,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
                          * log.
                          */
                         if (e->type == &type_ClientError &&
-                           box_error_code(e) == ER_TUPLE_FOUND &&
-                           replication_skip_conflict) {
+                           box_error_code(e) == ER_TUPLE_FOUND) {
                                 diag_clear(diag_get());
                                 row->type = IPROTO_NOP;
                                 row->bodycnt = 0;
@@ -1027,7 +1026,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
        latch_lock(latch);
        if (vclock_get(&replicaset.applier.vclock,
                       last_row->replica_id) >= last_row->lsn) {
-               goto no_write;
+               goto finish;
        } else if (vclock_get(&replicaset.applier.vclock,
                              first_row->replica_id) >= first_row->lsn) {
                /*
@@ -1058,15 +1057,13 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
                assert(first_row == last_row);
                if (apply_synchro_row(first_row) != 0)
                        diag_raise();
-               goto written;
-       }
-
-       if ((rc = apply_plain_tx(rows, true, true)) == 0) {
-written:
-               vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
-                             last_row->lsn);
+       } else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
+                                       true)) != 0) {
+               goto finish;
        }
-no_write:
+       vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
+                     last_row->lsn);
+finish:
        latch_unlock(latch);
        fiber_gc();
        return rc;

========================================
>
>>   	latch_unlock(latch);
>>   	fiber_gc();
>> -	return -1;
>> +	return rc;
>>   }

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* [Tarantool-patches] [PATCH v2 3.5/7] applier: fix not releasing the latch on apply_synchro_row() fail
  2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx() Serge Petrenko via Tarantool-patches
  2021-03-26 20:47   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-27 18:30   ` Serge Petrenko via Tarantool-patches
  2021-03-29 21:50     ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 1 reply; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-27 18:30 UTC (permalink / raw)
  To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

Once apply_synchro_row() failed, applier_apply_tx() would simply raise
an error without unlocking the replica latch. This led to all the appliers
hanging indefinitely while trying to lock the latch for this replica.

In scope of #5566
---
  src/box/applier.cc                            |   4 +-
  test/replication/hang_on_synchro_fail.result  | 130 ++++++++++++++++++
  .../replication/hang_on_synchro_fail.test.lua |  57 ++++++++
  test/replication/suite.cfg                    |   1 +
  test/replication/suite.ini                    |   2 +-
  5 files changed, 191 insertions(+), 3 deletions(-)
  create mode 100644 test/replication/hang_on_synchro_fail.result
  create mode 100644 test/replication/hang_on_synchro_fail.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index e6d9673dd..41abe64f9 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1055,8 +1055,8 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
           * each other.
           */
          assert(first_row == last_row);
-        if (apply_synchro_row(first_row) != 0)
-            diag_raise();
+        if ((rc = apply_synchro_row(first_row)) != 0)
+            goto finish;
      } else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
                      true)) != 0) {
          goto finish;
diff --git a/test/replication/hang_on_synchro_fail.result b/test/replication/hang_on_synchro_fail.result
new file mode 100644
index 000000000..9f6fac00b
--- /dev/null
+++ b/test/replication/hang_on_synchro_fail.result
@@ -0,0 +1,130 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+--
+-- All appliers could hang after failing to apply a synchronous message: either
+-- CONFIRM or ROLLBACK.
+--
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+
+_ = box.schema.space.create('sync', {is_sync=true})
+ | ---
+ | ...
+_ = box.space.sync:create_index('pk')
+ | ---
+ | ...
+
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum=3}
+ | ---
+ | ...
+-- A huge timeout so that we can perform some actions on a replica before
+-- writing ROLLBACK.
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+box.cfg{replication_synchro_timeout=1000}
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+
+_ = fiber.new(box.space.sync.insert, box.space.sync, {1})
+ | ---
+ | ...
+test_run:wait_lsn('replica', 'default')
+ | ---
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+
+box.error.injection.set('ERRINJ_WAL_IO', true)
+ | ---
+ | - ok
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+box.cfg{replication_synchro_timeout=0.01}
+ | ---
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+
+test_run:wait_upstream(1, {status='stopped',\
+                           message_re='Failed to write to disk'})
+ | ---
+ | - true
+ | ...
+box.error.injection.set('ERRINJ_WAL_IO', false)
+ | ---
+ | - ok
+ | ...
+
+-- Applier is killed due to a failed WAL write, so restart replication to
+-- check whether it hangs or not. Actually this single applier would 
fail an
+-- assertion rather than hang, but all the other appliers, if any, 
would hang.
+old_repl = box.cfg.replication
+ | ---
+ | ...
+box.cfg{replication=""}
+ | ---
+ | ...
+box.cfg{replication=old_repl}
+ | ---
+ | ...
+
+test_run:wait_upstream(1, {status='follow'})
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum=old_synchro_quorum,\
+        replication_synchro_timeout=old_synchro_timeout}
+ | ---
+ | ...
+box.space.sync:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+
diff --git a/test/replication/hang_on_synchro_fail.test.lua b/test/replication/hang_on_synchro_fail.test.lua
new file mode 100644
index 000000000..6c3b09fab
--- /dev/null
+++ b/test/replication/hang_on_synchro_fail.test.lua
@@ -0,0 +1,57 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+--
+-- All appliers could hang after failing to apply a synchronous message: either
+-- CONFIRM or ROLLBACK.
+--
+box.schema.user.grant('guest', 'replication')
+
+_ = box.schema.space.create('sync', {is_sync=true})
+_ = box.space.sync:create_index('pk')
+
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+box.cfg{replication_synchro_quorum=3}
+-- A huge timeout so that we can perform some actions on a replica before
+-- writing ROLLBACK.
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+box.cfg{replication_synchro_timeout=1000}
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/replica.lua"')
+test_run:cmd('start server replica')
+
+_ = fiber.new(box.space.sync.insert, box.space.sync, {1})
+test_run:wait_lsn('replica', 'default')
+
+test_run:switch('replica')
+
+box.error.injection.set('ERRINJ_WAL_IO', true)
+
+test_run:switch('default')
+
+box.cfg{replication_synchro_timeout=0.01}
+
+test_run:switch('replica')
+
+test_run:wait_upstream(1, {status='stopped',\
+                           message_re='Failed to write to disk'})
+box.error.injection.set('ERRINJ_WAL_IO', false)
+
+-- Applier is killed due to a failed WAL write, so restart replication to
+-- check whether it hangs or not. Actually this single applier would fail an
+-- assertion rather than hang, but all the other appliers, if any, would hang.
+old_repl = box.cfg.replication
+box.cfg{replication=""}
+box.cfg{replication=old_repl}
+
+test_run:wait_upstream(1, {status='follow'})
+
+-- Cleanup.
+test_run:switch('default')
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.cfg{replication_synchro_quorum=old_synchro_quorum,\
+        replication_synchro_timeout=old_synchro_timeout}
+box.space.sync:drop()
+box.schema.user.revoke('guest', 'replication')
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 7e7004592..c1c329438 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -22,6 +22,7 @@
      "status.test.lua": {},
      "wal_off.test.lua": {},
      "hot_standby.test.lua": {},
+    "hang_on_synchro_fail.test.lua": {},
      "rebootstrap.test.lua": {},
      "wal_rw_stress.test.lua": {},
      "force_recovery.test.lua": {},
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index dcd711a2a..fc161700a 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
  script =  master.lua
  description = tarantool/box, replication
  disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua hang_on_synchro_fail.test.lua
  config = suite.cfg
  lua_libs = lua/fast_replica.lua lua/rlimit.lua
  use_unix_sockets = True
-- 
2.24.3 (Apple Git-128)


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
  2021-03-26 20:49   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-27 19:05     ` Serge Petrenko via Tarantool-patches
  2021-03-29 21:51       ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-27 19:05 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



26.03.2021 23:49, Vladislav Shpilevoy пишет:
> I appreciate the work you did here!
Thanks!

>
> See 3 comments below.
>
> On 24.03.2021 13:24, Serge Petrenko wrote:
>> Now applier assembles rows into transactions not only on subscribe
>> stage, but also during final join / register.
>>
>> This was necessary for correct handling of rolled back synchronous
>> transactions in final join stream.
>>
>> Part of #5566
>> ---
>>   src/box/applier.cc | 126 ++++++++++++++++++++++-----------------------
>>   1 file changed, 61 insertions(+), 65 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index d53f13711..9a8b0f0fc 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -524,27 +509,19 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>>   	 * Receive final data.
>>   	 */
>>   	while (true) {
>> -		coio_read_xrow(coio, ibuf, &row);
>> -		applier->last_row_time = ev_monotonic_now(loop());
>> -		if (iproto_type_is_dml(row.type)) {
>> -			vclock_follow_xrow(&replicaset.vclock, &row);
>> -			if (apply_final_join_row(&row) != 0)
>> -				diag_raise();
>> -			if (++row_count % 100000 == 0)
>> -				say_info("%.1fM rows received", row_count / 1e6);
>> -		} else if (row.type == IPROTO_OK) {
>> -			/*
>> -			 * Current vclock. This is not used now,
>> -			 * ignore.
>> -			 */
> 1. The comment was helpful, let's keep it.

Ok, sure.

>
>> -			++row_count;
>> -			break; /* end of stream */
>> -		} else if (iproto_type_is_error(row.type)) {
>> -			xrow_decode_error_xc(&row);  /* rethrow error */
>> -		} else {
>> -			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
>> -				  (uint32_t) row.type);
>> +		struct stailq rows;
>> +		applier_read_tx(applier, &rows, &row_count);
>> +		struct xrow_header *first_row =
>> +			&stailq_first_entry(&rows, struct applier_tx_row,
>> +					    next)->row;
>> +		if (first_row->type == IPROTO_OK) {
>> +			assert(first_row ==
>> +			       &stailq_last_entry(&rows, struct applier_tx_row,
>> +						  next)->row);
>> +			break;
>>   		}
>> +		if (apply_final_join_tx(&rows) != 0)
>> +			diag_raise();
>>   	}
>>   
>>   	return row_count;
>> @@ -646,8 +613,11 @@ applier_read_tx_row(struct applier *applier)
>>   	 * messages so we can't assume that if we haven't heard
>>   	 * from the master for quite a while the connection is
>>   	 * broken - the master might just be idle.
>> +	 * Also there are no timeouts during final join and register.
>>   	 */
>> -	if (applier->version_id < version_id(1, 7, 7))
>> +	if (applier->version_id < version_id(1, 7, 7) ||
>> +	    applier->state == APPLIER_FINAL_JOIN ||
>> +	    applier->state == APPLIER_REGISTER)
> 2. Maybe it would be better to pass the timeout from the upper level and
> always use coio_read_xrow_timeout_xc(). For the mentioned conditions
> it would be infinity. Anyway the non-timed version in the end uses
> TIMEOUT_INFINITY too (coio_read_ahead). That way there would be fewer
> tricky condition checks in the generic code.

Thanks for the suggestion! Applied.

>
>>   		coio_read_xrow(coio, ibuf, row);
>>   	else
>>   		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
>> @@ -731,6 +702,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
>>   	do {
>>   		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
>>   		tsn = set_next_tx_row(rows, tx_row, tsn);
>> +
>> +		if (row_count != NULL && ++*row_count % 100000 == 0)
>> +			say_info("%.1fM rows received", *row_count / 1e6);
> 3. Hm. That adds branching and a heavy '%' operation. Maybe you could make it
> return the number of rows and do this check + log in the caller code. So it
> would affect only the joins.

Good idea, thanks! Please see an incremental diff below.

>
>> @@ -1254,7 +1250,7 @@ applier_subscribe(struct applier *applier)
>>   		}
>>   
>>   		struct stailq rows;
>> -		applier_read_tx(applier, &rows);
>> +		applier_read_tx(applier, &rows, NULL);
>>   
>>   		/*
>>   		 * In case of an heartbeat message wake a writer up
>>

=================================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index b96eb360b..0f4492fe3 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -477,9 +477,8 @@ applier_fetch_snapshot(struct applier *applier)
      applier_set_state(applier, APPLIER_READY);
  }

-static void
-applier_read_tx(struct applier *applier, struct stailq *rows,
-        uint64_t *row_count);
+static uint64_t
+applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);

  static int
  apply_final_join_tx(struct stailq *rows);
@@ -497,6 +496,7 @@ struct applier_tx_row {
  static uint64_t
  applier_wait_register(struct applier *applier, uint64_t row_count)
  {
+#define ROWS_PER_LOG 100000
      /*
       * Tarantool < 1.7.0: there is no "final join" stage.
       * Proceed to "subscribe" and do not finish bootstrap
@@ -505,16 +505,23 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
      if (applier->version_id < version_id(1, 7, 0))
          return row_count;

+    uint64_t next_log_cnt =
+        row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
      /*
       * Receive final data.
       */
      while (true) {
          struct stailq rows;
-        applier_read_tx(applier, &rows, &row_count);
+        row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
+        if (row_count >= next_log_cnt) {
+            say_info("%.1fM rows received", next_log_cnt / 1e6);
+            next_log_cnt += ROWS_PER_LOG;
+        }
          struct xrow_header *first_row =
              &stailq_first_entry(&rows, struct applier_tx_row,
                          next)->row;
          if (first_row->type == IPROTO_OK) {
+            /* Current vclock. This is not used now, ignore. */
              assert(first_row ==
                     &stailq_last_entry(&rows, struct applier_tx_row,
                            next)->row);
@@ -525,6 +532,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
      }

      return row_count;
+#undef ROWS_PER_LOG
  }

  static void
@@ -594,7 +602,7 @@ applier_join(struct applier *applier)
  }

  static struct applier_tx_row *
-applier_read_tx_row(struct applier *applier)
+applier_read_tx_row(struct applier *applier, double timeout)
  {
      struct ev_io *coio = &applier->io;
      struct ibuf *ibuf = &applier->ibuf;
@@ -607,20 +615,7 @@ applier_read_tx_row(struct applier *applier)

      struct xrow_header *row = &tx_row->row;

-    double timeout = replication_disconnect_timeout();
-    /*
-     * Tarantool < 1.7.7 does not send periodic heartbeat
-     * messages so we can't assume that if we haven't heard
-     * from the master for quite a while the connection is
-     * broken - the master might just be idle.
-     * Also there are no timeouts during final join and register.
-     */
-    if (applier->version_id < version_id(1, 7, 7) ||
-        applier->state == APPLIER_FINAL_JOIN ||
-        applier->state == APPLIER_REGISTER)
-        coio_read_xrow(coio, ibuf, row);
-    else
-        coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+    coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);

      applier->lag = ev_now(loop()) - row->tm;
      applier->last_row_time = ev_monotonic_now(loop());
@@ -692,20 +687,20 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
   * rpos is adjusted as xrow is decoded and the corresponding
   * network input space is reused for the next xrow.
   */
-static void
-applier_read_tx(struct applier *applier, struct stailq *rows,
-        uint64_t *row_count)
+static uint64_t
+applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
  {
      int64_t tsn = 0;
+    uint64_t row_count = 0;

      stailq_create(rows);
      do {
-        struct applier_tx_row *tx_row = applier_read_tx_row(applier);
+        struct applier_tx_row *tx_row = applier_read_tx_row(applier,
+                                    timeout);
          tsn = set_next_tx_row(rows, tx_row, tsn);
-
-        if (row_count != NULL && ++*row_count % 100000 == 0)
-            say_info("%.1fM rows received", *row_count / 1e6);
+        ++row_count;
      } while (tsn != 0);
+    return row_count;
  }

  static void
@@ -1234,6 +1229,15 @@ applier_subscribe(struct applier *applier)
          trigger_clear(&on_rollback);
      });

+    /*
+     * Tarantool < 1.7.7 does not send periodic heartbeat
+     * messages so we can't assume that if we haven't heard
+     * from the master for quite a while the connection is
+     * broken - the master might just be idle.
+     */
+    double timeout = applier->version_id < version_id(1, 7, 7) ?
+             TIMEOUT_INFINITY : replication_disconnect_timeout();
+
      /*
       * Process a stream of rows from the binary log.
       */
@@ -1247,7 +1251,7 @@ applier_subscribe(struct applier *applier)
          }

          struct stailq rows;
-        applier_read_tx(applier, &rows, NULL);
+        applier_read_tx(applier, &rows, timeout);

          /*
           * In case of an heartbeat message wake a writer up

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 6/7] replication: tolerate synchro rollback during final join
  2021-03-26 20:49   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-27 19:23     ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-27 19:23 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



26.03.2021 23:49, Vladislav Shpilevoy пишет:
> Thanks for working on this!
>
>> diff --git a/test/replication/gh-5566-final-join-synchro.result b/test/replication/gh-5566-final-join-synchro.result
>> new file mode 100644
>> index 000000000..32749bf12
>> --- /dev/null
>> +++ b/test/replication/gh-5566-final-join-synchro.result
>> @@ -0,0 +1,139 @@
>> +-- test-run result file version 2
>> +test_run = require('test_run').new()
>> + | ---
>> + | ...
>> +
>> +--
>> +-- gh-5566 replica tolerates synchronous transactions in final join stream.
>> +--
>> +_ = box.schema.space.create('sync', {is_sync=true})
>> + | ---
>> + | ...
>> +_ = box.space.sync:create_index('pk')
>> + | ---
>> + | ...
>> +
>> +box.schema.user.grant('guest', 'replication')
>> + | ---
>> + | ...
>> +box.schema.user.grant('guest', 'write', 'space', 'sync')
>> + | ---
>> + | ...
>> +
>> +-- Part 1. Make sure a joining instance tolerates synchronous rows in final join
>> +-- stream.
>> +trig = function()\
>> +    box.space.sync:replace{1}\
>> +end
> You might need to increase the synchro timeout because the default can
> be flaky.

I think it's fine, since quorum is 1.
Do you think we can wait for a WAL write for more than 5 seconds?

Anyway, let's change it to a bigger value while we're at it.
I've also changed wait_log's timeout to 60 seconds:

==========================

diff --git a/test/replication/gh-5566-final-join-synchro.result b/test/replication/gh-5566-final-join-synchro.result
index 32749bf12..a09882ba6 100644
--- a/test/replication/gh-5566-final-join-synchro.result
+++ b/test/replication/gh-5566-final-join-synchro.result
@@ -35,7 +35,10 @@ _ = box.space._cluster:on_replace(trig)
  orig_synchro_quorum = box.cfg.replication_synchro_quorum
   | ---
   | ...
-box.cfg{replication_synchro_quorum=1}
+orig_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum=1, replication_synchro_timeout=60}
   | ---
   | ...

@@ -73,9 +76,6 @@ test_run:cmd('delete server replica')

  -- Part 2. Make sure master aborts final join if insert to _cluster is rolled
  -- back and replica is capable of retrying it.
-orig_synchro_timeout = box.cfg.replication_synchro_timeout
- | ---
- | ...
  -- Make the trigger we used above fail with no quorum.
  box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.01}
   | ---
@@ -91,7 +91,7 @@ test_run:cmd('start server replica with wait=False')
   | - true
   | ...

-test_run:wait_log('replica', 'ER_SYNC_QUORUM_TIMEOUT', nil, 10)
+test_run:wait_log('replica', 'ER_SYNC_QUORUM_TIMEOUT', nil, 60)
   | ---
   | - ER_SYNC_QUORUM_TIMEOUT
   | ...
diff --git a/test/replication/gh-5566-final-join-synchro.test.lua b/test/replication/gh-5566-final-join-synchro.test.lua
index 14302f6e6..2db2c742f 100644
--- a/test/replication/gh-5566-final-join-synchro.test.lua
+++ b/test/replication/gh-5566-final-join-synchro.test.lua
@@ -18,7 +18,8 @@ end
  _ = box.space._cluster:on_replace(trig)

  orig_synchro_quorum = box.cfg.replication_synchro_quorum
-box.cfg{replication_synchro_quorum=1}
+orig_synchro_timeout = box.cfg.replication_synchro_timeout
+box.cfg{replication_synchro_quorum=1, replication_synchro_timeout=60}

  test_run:cmd('create server replica with rpl_master=default,\
                script="replication/replica.lua"')
@@ -33,7 +34,6 @@ test_run:cmd('delete server replica')

  -- Part 2. Make sure master aborts final join if insert to _cluster is rolled
  -- back and replica is capable of retrying it.
-orig_synchro_timeout = box.cfg.replication_synchro_timeout
  -- Make the trigger we used above fail with no quorum.
  box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.01}
  -- Try to join the replica once again.
@@ -41,7 +41,7 @@ test_run:cmd('create server replica with rpl_master=default,\
                script="replication/replica.lua"')
  test_run:cmd('start server replica with wait=False')

-test_run:wait_log('replica', 'ER_SYNC_QUORUM_TIMEOUT', nil, 10)
+test_run:wait_log('replica', 'ER_SYNC_QUORUM_TIMEOUT', nil, 60)
  -- Remove the trigger to let the replica connect.
  box.space._cluster:on_replace(nil, trig)

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register
  2021-03-26 20:50   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-27 20:13     ` Serge Petrenko via Tarantool-patches
  2021-03-29 21:51       ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-30 12:33       ` Serge Petrenko via Tarantool-patches
  0 siblings, 2 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-27 20:13 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



26.03.2021 23:50, Vladislav Shpilevoy пишет:
> Thanks for the patch!
>
> See 2 comments below.
>
> On 24.03.2021 13:24, Serge Petrenko wrote:
>> There was a bug in box_process_register. It decoded the replica's vclock but
>> never used it when sending the registration stream. So the replica might
>> lose the data in range (replica_vclock, start_vclock).
> 1. Could you please add a test?

Yes, sure.

>
>> Follow-up #5566
>> ---
>>   src/box/box.cc | 14 ++++++--------
>>   1 file changed, 6 insertions(+), 8 deletions(-)
>>
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 292a54213..0bcb505a8 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -2154,7 +2154,8 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
>>   			  "wal_mode = 'none'");
>>   	}
>>   
>> -	struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
>> +	vclock_reset(&replica_vclock, 0, vclock_get(&replicaset.vclock, 0));
> 2. xrow_decode_register_xc() already returns the vclock with empty 0
> part. Why do you need this reset?

It sets vclock[0] to master's vclock[0]. But it's not needed here, indeed.
Thanks for noticing!

I thought having vclock[0] = 0 would harm gc somehow, but that's not true.

>
>> +	struct gc_consumer *gc = gc_consumer_register(&replica_vclock,
>>   				"replica %s", tt_uuid_str(&instance_uuid));
>>   	if (gc == NULL)
>>   		diag_raise();

=================================

diff --git a/src/box/box.cc b/src/box/box.cc
index 0bcb505a8..ecec8df27 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2154,7 +2154,6 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
                "wal_mode = 'none'");
      }

-    vclock_reset(&replica_vclock, 0, vclock_get(&replicaset.vclock, 0));
      struct gc_consumer *gc = gc_consumer_register(&replica_vclock,
                  "replica %s", tt_uuid_str(&instance_uuid));
      if (gc == NULL)
diff --git a/test/replication/anon_register_gap.result b/test/replication/anon_register_gap.result
new file mode 100644
index 000000000..24a3548c8
--- /dev/null
+++ b/test/replication/anon_register_gap.result
@@ -0,0 +1,116 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- When master's registering an anonymous replica, it might ignore the replica's
+-- current vclock, and skip the data in range (replica_clock, master_clock).
+--
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/anon1.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+
+test_run:wait_lsn('replica', 'default')
+ | ---
+ | ...
+box.error.injection.set('ERRINJ_RELAY_SEND_DELAY', true)
+ | ---
+ | - ok
+ | ...
+
+box.space.test:insert{1}
+ | ---
+ | - [1]
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+
+test_run:wait_upstream(1, {status='disconnected'})
+ | ---
+ | - true
+ | ...
+box.space.test:select{}
+ | ---
+ | - []
+ | ...
+
+fiber = require('fiber')
+ | ---
+ | ...
+f = fiber.new(function() box.cfg{replication_anon=false} end)
+ | ---
+ | ...
+test_run:wait_upstream(1, {status='register'})
+ | ---
+ | - true
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.error.injection.set('ERRINJ_RELAY_SEND_DELAY', false)
+ | ---
+ | - ok
+ | ...
+box.space.test:insert{2}
+ | ---
+ | - [2]
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('replica', 'default')
+ | ---
+ | ...
+f:status()
+ | ---
+ | - dead
+ | ...
+box.space.test:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+
+-- Cleanup
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
diff --git a/test/replication/anon_register_gap.test.lua b/test/replication/anon_register_gap.test.lua
new file mode 100644
index 000000000..c71576a23
--- /dev/null
+++ b/test/replication/anon_register_gap.test.lua
@@ -0,0 +1,43 @@
+test_run = require('test_run').new()
+
+--
+-- When the master is registering an anonymous replica, it might ignore the replica's
+-- current vclock and skip the data in range (replica_clock, master_clock).
+--
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+
+test_run:cmd('create server replica with rpl_master=default,\
+              script="replication/anon1.lua"')
+test_run:cmd('start server replica')
+
+test_run:wait_lsn('replica', 'default')
+box.error.injection.set('ERRINJ_RELAY_SEND_DELAY', true)
+
+box.space.test:insert{1}
+
+test_run:switch('replica')
+
+test_run:wait_upstream(1, {status='disconnected'})
+box.space.test:select{}
+
+fiber = require('fiber')
+f = fiber.new(function() box.cfg{replication_anon=false} end)
+test_run:wait_upstream(1, {status='register'})
+
+test_run:switch('default')
+box.error.injection.set('ERRINJ_RELAY_SEND_DELAY', false)
+box.space.test:insert{2}
+
+test_run:switch('replica')
+test_run:wait_lsn('replica', 'default')
+f:status()
+box.space.test:select{}
+
+-- Cleanup
+test_run:switch('default')
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index aea8b6157..aff5fda26 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -1,5 +1,6 @@
  {
      "anon.test.lua": {},
+    "anon_register_gap.test.lua": {},
      "gh-2991-misc-asserts-on-update.test.lua": {},
      "gh-3111-misc-rebootstrap-from-ro-master.test.lua": {},
      "gh-3160-misc-heartbeats-on-master-changes.test.lua": {},
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index fc161700a..a9e44e8cf 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
  script =  master.lua
  description = tarantool/box, replication
  disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua hang_on_synchro_fail.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua hang_on_synchro_fail.test.lua anon_register_gap.test.lua
  config = suite.cfg
  lua_libs = lua/fast_replica.lua lua/rlimit.lua
  use_unix_sockets = True

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 1/7] replication: fix a hang on final join retry
  2021-03-27 16:52     ` Serge Petrenko via Tarantool-patches
@ 2021-03-29 21:50       ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 0 replies; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-29 21:50 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Hi! Nice explanation!

I think I understood everything now, no need for a separate test, thanks.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 3.5/7] applier: fix not releasing the latch on apply_synchro_row() fail
  2021-03-27 18:30   ` [Tarantool-patches] [PATCH v2 3.5/7] applier: fix not releasing the latch on apply_synchro_row() fail Serge Petrenko via Tarantool-patches
@ 2021-03-29 21:50     ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-30  8:15       ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-29 21:50 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Maybe it is worth adding a changelog file for this. But that's up to you.
I still don't consider anything related to qsync as 'released'.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
  2021-03-27 19:05     ` Serge Petrenko via Tarantool-patches
@ 2021-03-29 21:51       ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-30  8:15         ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-29 21:51 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Good job on the fixes!

See 4 comments below.

> =================================
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index b96eb360b..0f4492fe3 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -497,6 +496,7 @@ struct applier_tx_row {
>  static uint64_t
>  applier_wait_register(struct applier *applier, uint64_t row_count)
>  {
> +#define ROWS_PER_LOG 100000

1. Better to avoid an in-function macro. This can be done as a 'const uint64_t'
or as an enum at the beginning of the file.

>      /*
>       * Tarantool < 1.7.0: there is no "final join" stage.
>       * Proceed to "subscribe" and do not finish bootstrap
> @@ -505,16 +505,23 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>      if (applier->version_id < version_id(1, 7, 0))
>          return row_count;
> 
> +    uint64_t next_log_cnt =
> +        row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
>      /*
>       * Receive final data.
>       */
>      while (true) {
>          struct stailq rows;
> -        applier_read_tx(applier, &rows, &row_count);
> +        row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
> +        if (row_count >= next_log_cnt) {
> +            say_info("%.1fM rows received", next_log_cnt / 1e6);
> +            next_log_cnt += ROWS_PER_LOG;

2. What if a single transaction adds more than ROWS_PER_LOG rows? Then the
message would be printed again immediately on the next transaction (although
I don't know if it is possible to have such a big transaction).

> +        }
>          struct xrow_header *first_row =
>              &stailq_first_entry(&rows, struct applier_tx_row,
>                          next)->row;
>          if (first_row->type == IPROTO_OK) {
> +            /* Current vclock. This is not used now, ignore. */
>              assert(first_row ==
>                     &stailq_last_entry(&rows, struct applier_tx_row,
>                            next)->row);
> @@ -1234,6 +1229,15 @@ applier_subscribe(struct applier *applier)
>          trigger_clear(&on_rollback);
>      });
> 
> +    /*
> +     * Tarantool < 1.7.7 does not send periodic heartbeat
> +     * messages so we can't assume that if we haven't heard
> +     * from the master for quite a while the connection is
> +     * broken - the master might just be idle.
> +     */
> +    double timeout = applier->version_id < version_id(1, 7, 7) ?
> +             TIMEOUT_INFINITY : replication_disconnect_timeout();

3. What if replication_timeout is changed after the first box.cfg{}? It
seems it won't affect the running appliers now, will it?

> +
>      /*
>       * Process a stream of rows from the binary log.
>       */
<...>

> +/** A simpler version of applier_apply_tx() for final join stage. */
> +static int
> +apply_final_join_tx(struct stailq *rows)
> +{
> +	struct xrow_header *first_row =
> +		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
> +	struct xrow_header *last_row =
> +		&stailq_last_entry(rows, struct applier_tx_row, next)->row;
> +	int rc = 0;
> +	/* WAL isn't enabled yet, so follow vclock manually. */
> +	vclock_follow_xrow(&replicaset.vclock, last_row);
> +	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
> +		assert(first_row == last_row);
> +		rc = apply_synchro_row(first_row);
> +		goto end;
> +	}

4. You don't really need the 'end' label here:

====================
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -970,11 +970,9 @@ apply_final_join_tx(struct stailq *rows)
 	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
 		assert(first_row == last_row);
 		rc = apply_synchro_row(first_row);
-		goto end;
+	} else {
+		rc = apply_plain_tx(rows, false, false);
 	}
-
-	rc = apply_plain_tx(rows, false, false);
-end:
 	fiber_gc();
 	return rc;
 }

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register
  2021-03-27 20:13     ` Serge Petrenko via Tarantool-patches
@ 2021-03-29 21:51       ` Vladislav Shpilevoy via Tarantool-patches
  2021-03-30  8:16         ` Serge Petrenko via Tarantool-patches
  2021-03-30 12:33       ` Serge Petrenko via Tarantool-patches
  1 sibling, 1 reply; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-29 21:51 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Thanks for the fixes!

Maybe it is worth adding a changelog file for this.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 3.5/7] applier: fix not releasing the latch on apply_synchro_row() fail
  2021-03-29 21:50     ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-30  8:15       ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-30  8:15 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



On 30.03.2021 00:50, Vladislav Shpilevoy wrote:
> Maybe it is worth to add a changelog file for this. But up to you.
> I still don't consider anything related to qsync as 'released'.
No problem:

===============
diff --git a/changelogs/unreleased/applier-hang-synchro.md b/changelogs/unreleased/applier-hang-synchro.md
new file mode 100644
index 000000000..66adbd1f9
--- /dev/null
+++ b/changelogs/unreleased/applier-hang-synchro.md
@@ -0,0 +1,4 @@
+## bugfix/replication
+
+* Fix applier hang on a replica after it fails to process a CONFIRM or ROLLBACK
+  message coming from a master.

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
  2021-03-29 21:51       ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-30  8:15         ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-30  8:15 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



On 30.03.2021 00:51, Vladislav Shpilevoy wrote:
> Good job on the fixes!
>
> See 4 comments below.

Thanks for the review!

>
>> =================================
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index b96eb360b..0f4492fe3 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -497,6 +496,7 @@ struct applier_tx_row {
>>   static uint64_t
>>   applier_wait_register(struct applier *applier, uint64_t row_count)
>>   {
>> +#define ROWS_PER_LOG 100000
> 1. Better to avoid an in-function macro. This can be done as a 'const uint64_t'
> or as an enum at the beginning of the file.

Ok.

>
>>       /*
>>        * Tarantool < 1.7.0: there is no "final join" stage.
>>        * Proceed to "subscribe" and do not finish bootstrap
>> @@ -505,16 +505,23 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>>       if (applier->version_id < version_id(1, 7, 0))
>>           return row_count;
>>
>> +    uint64_t next_log_cnt =
>> +        row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
>>       /*
>>        * Receive final data.
>>        */
>>       while (true) {
>>           struct stailq rows;
>> -        applier_read_tx(applier, &rows, &row_count);
>> +        row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
>> +        if (row_count >= next_log_cnt) {
>> +            say_info("%.1fM rows received", next_log_cnt / 1e6);
>> +            next_log_cnt += ROWS_PER_LOG;
> 2. What if a single transaction adds more than ROWS_PER_LOG rows? Then the
> message would be printed again immediately on the next transaction (although
> I don't know if it is possible to have such a big transaction).

First of all, I don't think anyone will have a 100k-row-long transaction.
Secondly, even if this is the case, yes, the second info message will be
printed almost immediately after the first one. But is it a problem?
Say, we had row_count = 2 599 999, and then we receive a transaction worth
100 001 rows. We'll print "2.6M rows received" first, and then "2.7M rows
received" after the next transaction.

An alternative would be:
```
while (row_count >= next_log_cnt) {
	say_info("%.1fM rows received", next_log_cnt / 1e6);
	next_log_cnt += ROWS_PER_LOG;
}
```
I like this more, actually, so let's change. Thanks for pointing this out!
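
For reference, here's a self-contained sketch of the arithmetic (illustrative
only, not the applier code itself) showing the loop printing twice when a
single transaction crosses two ROWS_PER_LOG boundaries at once:
```
#include <stdint.h>
#include <stdio.h>

enum { ROWS_PER_LOG = 100000 };

int
main(void)
{
	uint64_t row_count = 2599999;
	/* Round up to the next logging boundary: 2600000. */
	uint64_t next_log_cnt =
		row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
	/* A transaction worth 100 001 rows arrives. */
	row_count += 100001;
	/* Prints "2.6M rows received", then "2.7M rows received". */
	while (row_count >= next_log_cnt) {
		printf("%.1fM rows received\n", next_log_cnt / 1e6);
		next_log_cnt += ROWS_PER_LOG;
	}
	return 0;
}
```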

>
>> +        }
>>           struct xrow_header *first_row =
>>               &stailq_first_entry(&rows, struct applier_tx_row,
>>                           next)->row;
>>           if (first_row->type == IPROTO_OK) {
>> +            /* Current vclock. This is not used now, ignore. */
>>               assert(first_row ==
>>                      &stailq_last_entry(&rows, struct applier_tx_row,
>>                             next)->row);
>> @@ -1234,6 +1229,15 @@ applier_subscribe(struct applier *applier)
>>           trigger_clear(&on_rollback);
>>       });
>>
>> +    /*
>> +     * Tarantool < 1.7.7 does not send periodic heartbeat
>> +     * messages so we can't assume that if we haven't heard
>> +     * from the master for quite a while the connection is
>> +     * broken - the master might just be idle.
>> +     */
>> +    double timeout = applier->version_id < version_id(1, 7, 7) ?
>> +             TIMEOUT_INFINITY : replication_disconnect_timeout();
> 3. What if replication_timeout is changed after the first box.cfg{}? It
> seems it won't affect the running appliers now, will it?

Missed that, thanks!
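
To illustrate (a simplified sketch; the accessor below is modeled on the one
in replication.h, and the factor of 4 is an assumption here, not a quote of
the real definition):
```
/* Live configuration value, updated by box.cfg{replication_timeout = ...}. */
static double replication_timeout = 1.0;

/* Derive the disconnect timeout from the live setting on every call. */
static inline double
replication_disconnect_timeout(void)
{
	return replication_timeout * 4;
}
```
Computing `double timeout = replication_disconnect_timeout();` once before the
subscribe loop snapshots the value forever, while calling it on each loop
iteration, as in the diff below, picks up any later reconfiguration.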

>
>> +
>>       /*
>>        * Process a stream of rows from the binary log.
>>        */
> <...>
>
>> +/** A simpler version of applier_apply_tx() for final join stage. */
>> +static int
>> +apply_final_join_tx(struct stailq *rows)
>> +{
>> +	struct xrow_header *first_row =
>> +		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
>> +	struct xrow_header *last_row =
>> +		&stailq_last_entry(rows, struct applier_tx_row, next)->row;
>> +	int rc = 0;
>> +	/* WAL isn't enabled yet, so follow vclock manually. */
>> +	vclock_follow_xrow(&replicaset.vclock, last_row);
>> +	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>> +		assert(first_row == last_row);
>> +		rc = apply_synchro_row(first_row);
>> +		goto end;
>> +	}
> 4. You don't really need the 'end' label here:
>
> ====================
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -970,11 +970,9 @@ apply_final_join_tx(struct stailq *rows)
>   	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>   		assert(first_row == last_row);
>   		rc = apply_synchro_row(first_row);
> -		goto end;
> +	} else {
> +		rc = apply_plain_tx(rows, false, false);
>   	}
> -
> -	rc = apply_plain_tx(rows, false, false);
> -end:
>   	fiber_gc();
>   	return rc;
>   }
Good point, thanks!

==========================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 0f4492fe3..f00ffbd34 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -59,6 +59,13 @@

  STRS(applier_state, applier_STATE);

+enum {
+    /**
+     * How often to log received row count. Used during join and register.
+     */
+    ROWS_PER_LOG = 100000,
+};
+
  static inline void
  applier_set_state(struct applier *applier, enum applier_state state)
  {
@@ -435,7 +442,7 @@ applier_wait_snapshot(struct applier *applier)
          if (iproto_type_is_dml(row.type)) {
              if (apply_snapshot_row(&row) != 0)
                  diag_raise();
-            if (++row_count % 100000 == 0)
+            if (++row_count % ROWS_PER_LOG == 0)
                  say_info("%.1fM rows received", row_count / 1e6);
          } else if (row.type == IPROTO_OK) {
              if (applier->version_id < version_id(1, 7, 0)) {
@@ -496,7 +503,6 @@ struct applier_tx_row {
  static uint64_t
  applier_wait_register(struct applier *applier, uint64_t row_count)
  {
-#define ROWS_PER_LOG 100000
      /*
       * Tarantool < 1.7.0: there is no "final join" stage.
       * Proceed to "subscribe" and do not finish bootstrap
@@ -513,7 +519,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
      while (true) {
          struct stailq rows;
          row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
-        if (row_count >= next_log_cnt) {
+        while (row_count >= next_log_cnt) {
              say_info("%.1fM rows received", next_log_cnt / 1e6);
              next_log_cnt += ROWS_PER_LOG;
          }
@@ -532,7 +538,6 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
      }

      return row_count;
-#undef ROWS_PER_LOG
  }

  static void
@@ -970,11 +975,9 @@ apply_final_join_tx(struct stailq *rows)
      if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
          assert(first_row == last_row);
          rc = apply_synchro_row(first_row);
-        goto end;
+    } else {
+        rc = apply_plain_tx(rows, false, false);
      }
-
-    rc = apply_plain_tx(rows, false, false);
-end:
      fiber_gc();
      return rc;
  }
@@ -1229,15 +1232,6 @@ applier_subscribe(struct applier *applier)
          trigger_clear(&on_rollback);
      });

-    /*
-     * Tarantool < 1.7.7 does not send periodic heartbeat
-     * messages so we can't assume that if we haven't heard
-     * from the master for quite a while the connection is
-     * broken - the master might just be idle.
-     */
-    double timeout = applier->version_id < version_id(1, 7, 7) ?
-             TIMEOUT_INFINITY : replication_disconnect_timeout();
-
      /*
       * Process a stream of rows from the binary log.
       */
@@ -1250,6 +1244,16 @@ applier_subscribe(struct applier *applier)
              applier_set_state(applier, APPLIER_FOLLOW);
          }

+        /*
+         * Tarantool < 1.7.7 does not send periodic heartbeat
+         * messages so we can't assume that if we haven't heard
+         * from the master for quite a while the connection is
+         * broken - the master might just be idle.
+         */
+        double timeout = applier->version_id < version_id(1, 7, 7) ?
+                 TIMEOUT_INFINITY :
+                 replication_disconnect_timeout();
+
          struct stailq rows;
          applier_read_tx(applier, &rows, timeout);


-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register
  2021-03-29 21:51       ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-30  8:16         ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-30  8:16 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



On 30.03.2021 00:51, Vladislav Shpilevoy wrote:
> Thanks for the fixes!
>
> Maybe it is worth adding a changelog file for this.
Sure:
===================
diff --git a/changelogs/unreleased/anon-register-gap.md b/changelogs/unreleased/anon-register-gap.md
new file mode 100644
index 000000000..5866f4b14
--- /dev/null
+++ b/changelogs/unreleased/anon-register-gap.md
@@ -0,0 +1,4 @@
+## bugfix/replication
+
+* Fix master not sending some rows to an anonymous replica when it fell
+  behind and is trying to register.

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register
  2021-03-27 20:13     ` Serge Petrenko via Tarantool-patches
  2021-03-29 21:51       ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-03-30 12:33       ` Serge Petrenko via Tarantool-patches
  1 sibling, 0 replies; 33+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-03-30 12:33 UTC (permalink / raw)
  To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches



On 27.03.2021 23:13, Serge Petrenko via Tarantool-patches wrote:
>
>
> On 26.03.2021 23:50, Vladislav Shpilevoy wrote:
>> Thanks for the patch!
>>
>> See 2 comments below.
>>
>> On 24.03.2021 13:24, Serge Petrenko wrote:
>>> There was a bug in box_process_register. It decoded the replica's vclock but
>>> never used it when sending the registration stream. So the replica might
>>> lose the data in range (replica_vclock, start_vclock).
>> 1. Could you please add a test?
>
> Yes, sure.
>
>>
>>> Follow-up #5566
>>> ---
>>>   src/box/box.cc | 14 ++++++--------
>>>   1 file changed, 6 insertions(+), 8 deletions(-)
>>>
>>> diff --git a/src/box/box.cc b/src/box/box.cc
>>> index 292a54213..0bcb505a8 100644
>>> --- a/src/box/box.cc
>>> +++ b/src/box/box.cc
>>> @@ -2154,7 +2154,8 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
>>>                 "wal_mode = 'none'");
>>>       }
>>>   -    struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
>>> +    vclock_reset(&replica_vclock, 0, vclock_get(&replicaset.vclock, 0));
>> 2. xrow_decode_register_xc() already returns the vclock with empty 0
>> part. Why do you need this reset?
>
> It sets vclock[0] to the master's vclock[0]. But indeed, it's not needed here.
> Thanks for noticing!
>
> I thought leaving vclock[0] = 0 would harm gc somehow, but that's not true.
>

I'm returning this vclock_reset, and here's why.
It doesn't set vclock[0] to 0. It sets it to match the master's vclock[0].

This is needed because recovery tries to find the xlog with a vclock strictly
less than or equal to the vclock provided.
If we leave vclock[0] = 0, recovery will try to open the last xlog which
had vclock[0] = 0. That log may already be deleted, which was the reason
for the test failures in CI. Strangely enough, I couldn't reproduce the
issue locally. Anyway, here you go:

================================================
diff --git a/src/box/box.cc b/src/box/box.cc
index ecec8df27..4da274976 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2154,6 +2154,8 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
                           "wal_mode = 'none'");
         }

+       /* @sa box_process_subscribe(). */
+       vclock_reset(&replica_vclock, 0, vclock_get(&replicaset.vclock, 0));
         struct gc_consumer *gc = gc_consumer_register(&replica_vclock,
                                 "replica %s", tt_uuid_str(&instance_uuid));
         if (gc == NULL)


================================================
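
To make the lookup rule above concrete, here is a minimal sketch; struct
xlog_entry and find_xlog() are hypothetical stand-ins for the recovery
internals, and only vclock_compare() from box/vclock.h is assumed:
```
#include "box/vclock.h"	/* For struct vclock and vclock_compare(). */

/* Hypothetical stand-in for one entry of the xlog directory index. */
struct xlog_entry {
	/* Vclock at which this xlog starts. */
	struct vclock begin;
};

/*
 * Pick the newest xlog whose starting vclock is <= the target; logs
 * are assumed sorted by their begin vclock in ascending order. With a
 * target whose vclock[0] is left at 0, every log written after the
 * master bumped its own component 0 is skipped, so an old, possibly
 * already collected log gets selected, hence the vclock_reset() above.
 */
static struct xlog_entry *
find_xlog(struct xlog_entry *logs, int count, const struct vclock *target)
{
	struct xlog_entry *best = NULL;
	for (int i = 0; i < count; i++) {
		if (vclock_compare(&logs[i].begin, target) <= 0)
			best = &logs[i];
	}
	return best;
}
```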

>>
>>> +    struct gc_consumer *gc = gc_consumer_register(&replica_vclock,
>>>                   "replica %s", tt_uuid_str(&instance_uuid));
>>>       if (gc == NULL)
>>>           diag_raise();
>
> =================================
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 0bcb505a8..ecec8df27 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -2154,7 +2154,6 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
>                "wal_mode = 'none'");
>      }
>
> -    vclock_reset(&replica_vclock, 0, vclock_get(&replicaset.vclock, 0));
>      struct gc_consumer *gc = gc_consumer_register(&replica_vclock,
>                  "replica %s", tt_uuid_str(&instance_uuid));
>      if (gc == NULL)
> diff --git a/test/replication/anon_register_gap.result b/test/replication/anon_register_gap.result
> new file mode 100644
> index 000000000..24a3548c8
> --- /dev/null
> +++ b/test/replication/anon_register_gap.result
> @@ -0,0 +1,116 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +
> +--
> +-- When the master is registering an anonymous replica, it might ignore the replica's
> +-- current vclock and skip the data in range (replica_clock, master_clock).
> +--
> +box.schema.user.grant('guest', 'replication')
> + | ---
> + | ...
> +_ = box.schema.space.create('test')
> + | ---
> + | ...
> +_ = box.space.test:create_index('pk')
> + | ---
> + | ...
> +
> +test_run:cmd('create server replica with rpl_master=default,\
> +              script="replication/anon1.lua"')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('start server replica')
> + | ---
> + | - true
> + | ...
> +
> +test_run:wait_lsn('replica', 'default')
> + | ---
> + | ...
> +box.error.injection.set('ERRINJ_RELAY_SEND_DELAY', true)
> + | ---
> + | - ok
> + | ...
> +
> +box.space.test:insert{1}
> + | ---
> + | - [1]
> + | ...
> +
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +
> +test_run:wait_upstream(1, {status='disconnected'})
> + | ---
> + | - true
> + | ...
> +box.space.test:select{}
> + | ---
> + | - []
> + | ...
> +
> +fiber = require('fiber')
> + | ---
> + | ...
> +f = fiber.new(function() box.cfg{replication_anon=false} end)
> + | ---
> + | ...
> +test_run:wait_upstream(1, {status='register'})
> + | ---
> + | - true
> + | ...
> +
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.error.injection.set('ERRINJ_RELAY_SEND_DELAY', false)
> + | ---
> + | - ok
> + | ...
> +box.space.test:insert{2}
> + | ---
> + | - [2]
> + | ...
> +
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('replica', 'default')
> + | ---
> + | ...
> +f:status()
> + | ---
> + | - dead
> + | ...
> +box.space.test:select{}
> + | ---
> + | - - [1]
> + |   - [2]
> + | ...
> +
> +-- Cleanup
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('stop server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('delete server replica')
> + | ---
> + | - true
> + | ...
> +box.space.test:drop()
> + | ---
> + | ...
> +box.schema.user.revoke('guest', 'replication')
> + | ---
> + | ...
> diff --git a/test/replication/anon_register_gap.test.lua b/test/replication/anon_register_gap.test.lua
> new file mode 100644
> index 000000000..c71576a23
> --- /dev/null
> +++ b/test/replication/anon_register_gap.test.lua
> @@ -0,0 +1,43 @@
> +test_run = require('test_run').new()
> +
> +--
> +-- When the master is registering an anonymous replica, it might ignore the replica's
> +-- current vclock and skip the data in range (replica_clock, master_clock).
> +--
> +box.schema.user.grant('guest', 'replication')
> +_ = box.schema.space.create('test')
> +_ = box.space.test:create_index('pk')
> +
> +test_run:cmd('create server replica with rpl_master=default,\
> +              script="replication/anon1.lua"')
> +test_run:cmd('start server replica')
> +
> +test_run:wait_lsn('replica', 'default')
> +box.error.injection.set('ERRINJ_RELAY_SEND_DELAY', true)
> +
> +box.space.test:insert{1}
> +
> +test_run:switch('replica')
> +
> +test_run:wait_upstream(1, {status='disconnected'})
> +box.space.test:select{}
> +
> +fiber = require('fiber')
> +f = fiber.new(function() box.cfg{replication_anon=false} end)
> +test_run:wait_upstream(1, {status='register'})
> +
> +test_run:switch('default')
> +box.error.injection.set('ERRINJ_RELAY_SEND_DELAY', false)
> +box.space.test:insert{2}
> +
> +test_run:switch('replica')
> +test_run:wait_lsn('replica', 'default')
> +f:status()
> +box.space.test:select{}
> +
> +-- Cleanup
> +test_run:switch('default')
> +test_run:cmd('stop server replica')
> +test_run:cmd('delete server replica')
> +box.space.test:drop()
> +box.schema.user.revoke('guest', 'replication')
> diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
> index aea8b6157..aff5fda26 100644
> --- a/test/replication/suite.cfg
> +++ b/test/replication/suite.cfg
> @@ -1,5 +1,6 @@
>  {
>      "anon.test.lua": {},
> +    "anon_register_gap.test.lua": {},
>      "gh-2991-misc-asserts-on-update.test.lua": {},
>      "gh-3111-misc-rebootstrap-from-ro-master.test.lua": {},
>      "gh-3160-misc-heartbeats-on-master-changes.test.lua": {},
> diff --git a/test/replication/suite.ini b/test/replication/suite.ini
> index fc161700a..a9e44e8cf 100644
> --- a/test/replication/suite.ini
> +++ b/test/replication/suite.ini
> @@ -3,7 +3,7 @@ core = tarantool
>  script =  master.lua
>  description = tarantool/box, replication
>  disabled = consistent.test.lua
> -release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua hang_on_synchro_fail.test.lua
> +release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua qsync_advanced.test.lua qsync_errinj.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua gh-4730-applier-rollback.test.lua gh-5140-qsync-casc-rollback.test.lua gh-5144-qsync-dup-confirm.test.lua gh-5167-qsync-rollback-snap.test.lua gh-5506-election-on-off.test.lua gh-5536-wal-limit.test.lua hang_on_synchro_fail.test.lua anon_register_gap.test.lua
>  config = suite.cfg
>  lua_libs = lua/fast_replica.lua lua/rlimit.lua
>  use_unix_sockets = True
>

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
                   ` (7 preceding siblings ...)
  2021-03-26 13:46 ` [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Cyrill Gorcunov via Tarantool-patches
@ 2021-03-30 20:13 ` Vladislav Shpilevoy via Tarantool-patches
  2021-04-05 16:15 ` Kirill Yukhin via Tarantool-patches
  9 siblings, 0 replies; 33+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-03-30 20:13 UTC (permalink / raw)
  To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Hi! Thanks for the patchset!

LGTM.

Although we probably should not push it now since, as you said,
the tests start failing until recovery is made transactional.

I will implement it on top of this branch, and later we will
push both bugfixes together.

^ permalink raw reply	[flat|nested] 33+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final
  2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
                   ` (8 preceding siblings ...)
  2021-03-30 20:13 ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-05 16:15 ` Kirill Yukhin via Tarantool-patches
  9 siblings, 0 replies; 33+ messages in thread
From: Kirill Yukhin via Tarantool-patches @ 2021-04-05 16:15 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy

Hello,

On 24 Mar 15:24, Serge Petrenko via Tarantool-patches wrote:
> Initially this patches idea was to ignore synchronous rows on applier side and
> make it so that there are no rolled back transactions in final join stream on
> master side.
> 
> Unfortunately, this easy fix didn't work. The reason for it is that once the
> replica receives initial data, up to `start_vclock`, the final join stream has
> to start right at `start_vclock` so that we do not lose any transactions.
> 
> This means that once master encounters a synchro rollback and makes replica
> retry final join to get rid of the rollback, it still has to send it together
> with other data. And this rollback must be processed by applier to avoid
> conflicts.
> 
> In order to let applier process synchro requests (CONFIRM and ROLLBACK) we need
> to make final join transactional, obviously. This is what this patchset does.
> 
> An alternative would be to retry not only final, but also initial join every
> time master receives a rollback during final join stage. This would be too
> violent due to possibly huge data amounts being sent during initial join.
> 
> Changes in v2:
>   - Make applier transactional on final join stage
>   - Remove guards for rollback during final join on master side
>   - Some refactoring in preparation to #5874
> 
> https://github.com/tarantool/tarantool/issues/5566
> https://github.com/tarantool/tarantool/tree/sp/gh-5566-final-join-synchro-v2

I've checked your patchset into 2.6, 2.7 and master.

--
Regards, Kirill Yukhin

^ permalink raw reply	[flat|nested] 33+ messages in thread

end of thread, other threads:[~2021-04-05 16:15 UTC | newest]

Thread overview: 33+ messages
2021-03-24 12:24 [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Serge Petrenko via Tarantool-patches
2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 1/7] replication: fix a hang on final join retry Serge Petrenko via Tarantool-patches
2021-03-26 20:44   ` Vladislav Shpilevoy via Tarantool-patches
2021-03-27 16:52     ` Serge Petrenko via Tarantool-patches
2021-03-29 21:50       ` Vladislav Shpilevoy via Tarantool-patches
2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 2/7] applier: extract tx boundary checks from applier_read_tx into a separate routine Serge Petrenko via Tarantool-patches
2021-03-26 12:35   ` Cyrill Gorcunov via Tarantool-patches
2021-03-27 16:54     ` Serge Petrenko via Tarantool-patches
2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 3/7] applier: extract plain tx application from applier_apply_tx() Serge Petrenko via Tarantool-patches
2021-03-26 20:47   ` Vladislav Shpilevoy via Tarantool-patches
2021-03-27 17:34     ` Serge Petrenko via Tarantool-patches
2021-03-27 18:30   ` [Tarantool-patches] [PATCH v2 3.5/7] applier: fix not releasing the latch on apply_synchro_row() fail Serge Petrenko via Tarantool-patches
2021-03-29 21:50     ` Vladislav Shpilevoy via Tarantool-patches
2021-03-30  8:15       ` Serge Petrenko via Tarantool-patches
2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 4/7] applier: remove excess last_row_time update from subscribe loop Serge Petrenko via Tarantool-patches
2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional Serge Petrenko via Tarantool-patches
2021-03-26 20:49   ` Vladislav Shpilevoy via Tarantool-patches
2021-03-27 19:05     ` Serge Petrenko via Tarantool-patches
2021-03-29 21:51       ` Vladislav Shpilevoy via Tarantool-patches
2021-03-30  8:15         ` Serge Petrenko via Tarantool-patches
2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 6/7] replication: tolerate synchro rollback during final join Serge Petrenko via Tarantool-patches
2021-03-24 12:45   ` Serge Petrenko via Tarantool-patches
2021-03-26 20:49   ` Vladislav Shpilevoy via Tarantool-patches
2021-03-27 19:23     ` Serge Petrenko via Tarantool-patches
2021-03-24 12:24 ` [Tarantool-patches] [PATCH v2 7/7] replication: do not ignore replica vclock on register Serge Petrenko via Tarantool-patches
2021-03-26 20:50   ` Vladislav Shpilevoy via Tarantool-patches
2021-03-27 20:13     ` Serge Petrenko via Tarantool-patches
2021-03-29 21:51       ` Vladislav Shpilevoy via Tarantool-patches
2021-03-30  8:16         ` Serge Petrenko via Tarantool-patches
2021-03-30 12:33       ` Serge Petrenko via Tarantool-patches
2021-03-26 13:46 ` [Tarantool-patches] [PATCH v2 0/7] applier: handle synchronous transactions during final Cyrill Gorcunov via Tarantool-patches
2021-03-30 20:13 ` Vladislav Shpilevoy via Tarantool-patches
2021-04-05 16:15 ` Kirill Yukhin via Tarantool-patches
