[Tarantool-patches] [PATCH 2/3] recovery: make it transactional

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Apr 3 16:18:15 MSK 2021


Hi! Thanks for the review!

> I ran the tests locally and replication suite fails occasionally with
> ```
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:539 E> error at request: {type: 'INSERT', replica_id: 0, lsn: 7, space_id: 517, index_id: 0, tuple: [21]}
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:481 E> XlogError: found a first row in a transaction with LSN/TSN mismatch
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
> ```

Thanks for noticing! There was a bug with transactions starting
from a local row - its LSN is from vclock[0] while for global
rows it is from vclock[instance_id > 0].

I fixed it by checking LSN == TSN only for the first global row.
If all rows are local, I check LSN matches the first row's LSN.

Here is the diff.

====================
diff --git a/src/box/box.cc b/src/box/box.cc
index f70a2bd0e..67b44c053 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -338,12 +338,22 @@ struct wal_stream {
 	struct xstream base;
 	/** Current transaction ID. 0 when no transaction. */
 	int64_t tsn;
+	/**
+	 * LSN of the first row saved to check TSN and LSN match in case all
+	 * rows of the tx appeared to be local.
+	 */
+	int64_t first_row_lsn;
 	/**
 	 * Flag whether there is a pending yield to do when the current
 	 * transaction is finished. It can't always be done right away because
 	 * would abort the current transaction if it is memtx.
 	 */
 	bool has_yield;
+	/**
+	 * True if any row in the transaction was global. Saved to check if TSN
+	 * matches LSN of a first global row.
+	 */
+	bool has_global_row;
 	/** How many rows have been recovered so far. */
 	size_t rows;
 };
@@ -484,12 +494,9 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 			diag_set(XlogError, "found a row without TSN");
 			goto end_diag_request;
 		}
-		if (row->tsn != row->lsn) {
-			diag_set(XlogError, "found a first row in a "
-				 "transaction with LSN/TSN mismatch");
-			goto end_diag_request;
-		}
 		stream->tsn = row->tsn;
+		stream->first_row_lsn = row->lsn;
+		stream->has_global_row = false;
 		/*
 		 * Rows are not stacked into a list like during replication,
 		 * because recovery does not yield while reading the rows. All
@@ -510,6 +517,15 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 	} else {
 		txn = in_txn();
 	}
+	/* Ensure TSN is equal to LSN of the first global row. */
+	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
+		if (row->tsn != row->lsn) {
+			diag_set(XlogError, "found a first global row in a "
+				 "transaction with LSN/TSN mismatch");
+			goto end_diag_request;
+		}
+		stream->has_global_row = true;
+	}
 	assert(wal_stream_has_tx(stream));
 	/* Nops might appear at least after before_replace skipping rows. */
 	if (request.type != IPROTO_NOP) {
@@ -526,8 +542,26 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 	assert(txn != NULL);
 	if (!row->is_commit)
 		return 0;
-
+	/*
+	 * For fully local transactions the TSN check won't work like for global
+	 * transactions, because it is not known if there are global rows until
+	 * commit arrives.
+	 */
+	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
+		diag_set(XlogError, "fully local transaction's TSN does not "
+			 "match LSN of the first row");
+		return -1;
+	}
 	stream->tsn = 0;
+	/*
+	 * During local recovery the commit procedure should be async, otherwise
+	 * the only fiber processing recovery will get stuck on the first
+	 * synchronous tx it meets until confirm timeout is reached and the tx
+	 * is rolled back, yielding an error.
+	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
+	 * recovery, since journal_write is faked at this stage and returns
+	 * immediately.
+	 */
 	if (txn_commit_try_async(txn) != 0) {
 		/* Commit fail automatically leads to rollback. */
 		assert(in_txn() == NULL);
@@ -555,20 +589,15 @@ end_diag_request:
 static void
 wal_stream_try_yield(struct wal_stream *stream)
 {
-	bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);
-	if (wal_stream_has_tx(stream)) {
-		/*
-		 * Save the yield. Otherwise it would happen only on rows which
-		 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
-		 * transaction, which is probably a very rare coincidence.
-		 */
-		stream->has_yield = true;
-		return;
-	}
-	if (stream->has_yield)
-		stream->has_yield = false;
-	else if (!needs_yield)
+	/*
+	 * Save the yield. Otherwise it would happen only on rows which
+	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
+	 * transaction, which is probably a very rare coincidence.
+	 */
+	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
+	if (wal_stream_has_tx(stream) || !stream->has_yield)
 		return;
+	stream->has_yield = false;
 	fiber_sleep(0);
 }
 
@@ -605,7 +634,9 @@ wal_stream_create(struct wal_stream *ctx)
 {
 	xstream_create(&ctx->base, wal_stream_apply_row);
 	ctx->tsn = 0;
+	ctx->first_row_lsn = 0;
 	ctx->has_yield = false;
+	ctx->has_global_row = false;
 	ctx->rows = 0;
 }
 
diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
index a64eadd9c..73f903ca7 100644
--- a/test/replication/gh-5874-qsync-txn-recovery.result
+++ b/test/replication/gh-5874-qsync-txn-recovery.result
@@ -95,6 +95,31 @@ box.commit()
  | ---
  | ...
 
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+ | ---
+ | ...
+_ = loc:create_index('pk')
+ | ---
+ | ...
+box.begin()                                                                     \
+loc:replace{1}                                                                  \
+async:replace{9}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- All local.
+box.begin()                                                                     \
+loc:replace{2}                                                                  \
+loc:replace{3}                                                                  \
+box.commit()
+ | ---
+ | ...
+
 test_run:cmd('restart server default')
  | 
 async = box.space.async
@@ -103,12 +128,16 @@ async = box.space.async
 sync = box.space.sync
  | ---
  | ...
+loc = box.space.loc
+ | ---
+ | ...
 async:select()
  | ---
  | - - [1]
  |   - [3]
  |   - [4]
  |   - [6]
+ |   - [9]
  | ...
 sync:select()
  | ---
@@ -116,9 +145,18 @@ sync:select()
  |   - [2]
  |   - [3]
  | ...
+loc:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
 async:drop()
  | ---
  | ...
 sync:drop()
  | ---
  | ...
+loc:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
index efcf727cc..f35eb68de 100644
--- a/test/replication/gh-5874-qsync-txn-recovery.test.lua
+++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
@@ -55,10 +55,30 @@ async:replace{8}
 do_skip = false                                                                 \
 box.commit()
 
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+_ = loc:create_index('pk')
+box.begin()                                                                     \
+loc:replace{1}                                                                  \
+async:replace{9}                                                                \
+box.commit()
+
+-- All local.
+box.begin()                                                                     \
+loc:replace{2}                                                                  \
+loc:replace{3}                                                                  \
+box.commit()
+
 test_run:cmd('restart server default')
 async = box.space.async
 sync = box.space.sync
+loc = box.space.loc
 async:select()
 sync:select()
+loc:select()
 async:drop()
 sync:drop()
+loc:drop()

====================

> Consider this diff. It looks simpler IMO, but feel free to ignore.

Applied your diff, see above.

The full patch:

====================
diff --git a/changelogs/unreleased/qsync-multi-statement-recovery b/changelogs/unreleased/qsync-multi-statement-recovery
new file mode 100644
index 000000000..c902cbe24
--- /dev/null
+++ b/changelogs/unreleased/qsync-multi-statement-recovery
@@ -0,0 +1,5 @@
+## bugfix/replication
+
+* Fix recovery of a rolled back multi-statement synchronous transaction which
+  could lead to the transaction being applied partially, and to recovery errors.
+  It happened in case the transaction worked with non-sync spaces (gh-5874).
diff --git a/src/box/box.cc b/src/box/box.cc
index 4da274976..67b44c053 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -334,7 +334,26 @@ box_set_orphan(bool orphan)
 }
 
 struct wal_stream {
+	/** Base class. */
 	struct xstream base;
+	/** Current transaction ID. 0 when no transaction. */
+	int64_t tsn;
+	/**
+	 * LSN of the first row saved to check TSN and LSN match in case all
+	 * rows of the tx appeared to be local.
+	 */
+	int64_t first_row_lsn;
+	/**
+	 * Flag whether there is a pending yield to do when the current
+	 * transaction is finished. It can't always be done right away because
+	 * would abort the current transaction if it is memtx.
+	 */
+	bool has_yield;
+	/**
+	 * True if any row in the transaction was global. Saved to check if TSN
+	 * matches LSN of a first global row.
+	 */
+	bool has_global_row;
 	/** How many rows have been recovered so far. */
 	size_t rows;
 };
@@ -379,47 +398,245 @@ recovery_journal_create(struct vclock *v)
 	journal_set(&journal.base);
 }
 
+/**
+ * Drop the stream to the initial state. It is supposed to be done when an error
+ * happens. Because in case of force recovery the stream will continue getting
+ * tuples. For that it must stay in a valid state and must handle them somehow.
+ *
+ * Now the stream simply drops the current transaction like it never happened,
+ * even if its commit-row wasn't met yet. Should be good enough for
+ * force-recovery when the consistency is already out of the game.
+ */
 static void
-apply_wal_row(struct xstream *stream, struct xrow_header *row)
+wal_stream_abort(struct wal_stream *stream)
+{
+	struct txn *tx = in_txn();
+	if (tx != NULL)
+		txn_rollback(tx);
+	stream->tsn = 0;
+	fiber_gc();
+}
+
+/**
+ * The wrapper exists only for the debug purposes, to ensure tsn being non-0 is
+ * in sync with the fiber's txn being non-NULL. It has nothing to do with the
+ * journal content, and therefore can use assertions instead of rigorous error
+ * checking even in release.
+ */
+static bool
+wal_stream_has_tx(const struct wal_stream *stream)
+{
+	bool has = stream->tsn != 0;
+	assert(has == (in_txn() != NULL));
+	return has;
+}
+
+static int
+wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+	if (wal_stream_has_tx(stream)) {
+		diag_set(XlogError, "found synchro request in a transaction");
+		return -1;
+	}
+	struct synchro_request syn_req;
+	if (xrow_decode_synchro(row, &syn_req) != 0) {
+		say_error("couldn't decode a synchro request");
+		return -1;
+	}
+	txn_limbo_process(&txn_limbo, &syn_req);
+	return 0;
+}
+
+static int
+wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+	if (wal_stream_has_tx(stream)) {
+		diag_set(XlogError, "found raft request in a transaction");
+		return -1;
+	}
+	struct raft_request raft_req;
+	/* Vclock is never persisted in WAL by Raft. */
+	if (xrow_decode_raft(row, &raft_req, NULL) != 0) {
+		say_error("couldn't decode a raft request");
+		return -1;
+	}
+	box_raft_recover(&raft_req);
+	return 0;
+}
+
+/**
+ * Rows of the same transaction are wrapped into begin/commit. Mostly for the
+ * sake of synchronous replication, when the log can contain rolled back
+ * transactions, which must be entirely reverted during recovery when ROLLBACK
+ * records are met. Row-by-row recovery wouldn't work for multi-statement
+ * synchronous transactions.
+ */
+static int
+wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 {
 	struct request request;
-	if (iproto_type_is_synchro_request(row->type)) {
-		struct synchro_request syn_req;
-		if (xrow_decode_synchro(row, &syn_req) != 0)
-			diag_raise();
-		txn_limbo_process(&txn_limbo, &syn_req);
-		return;
+	uint64_t req_type = dml_request_key_map(row->type);
+	if (xrow_decode_dml(row, &request, req_type) != 0) {
+		say_error("couldn't decode a DML request");
+		return -1;
 	}
-	if (iproto_type_is_raft_request(row->type)) {
-		struct raft_request raft_req;
-		/* Vclock is never persisted in WAL by Raft. */
-		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
-			diag_raise();
-		box_raft_recover(&raft_req);
-		return;
+	/*
+	 * Note that all the information which came from the log is validated
+	 * and the errors are handled. Not asserted or paniced. That is for the
+	 * sake of force recovery, which must be able to recover just everything
+	 * what possible instead of terminating the instance.
+	 */
+	struct txn *txn;
+	if (stream->tsn == 0) {
+		if (row->tsn == 0) {
+			diag_set(XlogError, "found a row without TSN");
+			goto end_diag_request;
+		}
+		stream->tsn = row->tsn;
+		stream->first_row_lsn = row->lsn;
+		stream->has_global_row = false;
+		/*
+		 * Rows are not stacked into a list like during replication,
+		 * because recovery does not yield while reading the rows. All
+		 * the yields are controlled by the stream, and therefore no
+		 * need to wait for all the rows to start a transaction. Can
+		 * start now, apply the rows, and make a yield after commit if
+		 * necessary. Helps to avoid a lot of copying.
+		 */
+		txn = txn_begin();
+		if (txn == NULL) {
+			say_error("couldn't begin a recovery transaction");
+			return -1;
+		}
+	} else if (row->tsn != stream->tsn) {
+		diag_set(XlogError, "found a next transaction with the "
+			 "previous one not yet committed");
+		goto end_diag_request;
+	} else {
+		txn = in_txn();
+	}
+	/* Ensure TSN is equal to LSN of the first global row. */
+	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
+		if (row->tsn != row->lsn) {
+			diag_set(XlogError, "found a first global row in a "
+				 "transaction with LSN/TSN mismatch");
+			goto end_diag_request;
+		}
+		stream->has_global_row = true;
 	}
-	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+	assert(wal_stream_has_tx(stream));
+	/* Nops might appear at least after before_replace skipping rows. */
 	if (request.type != IPROTO_NOP) {
-		struct space *space = space_cache_find_xc(request.space_id);
+		struct space *space = space_cache_find(request.space_id);
+		if (space == NULL) {
+			say_error("couldn't find space by ID");
+			goto end_diag_request;
+		}
 		if (box_process_rw(&request, space, NULL) != 0) {
-			say_error("error applying row: %s", request_str(&request));
-			diag_raise();
+			say_error("couldn't apply the request");
+			goto end_diag_request;
 		}
 	}
-	struct wal_stream *xstream =
-		container_of(stream, struct wal_stream, base);
-	/**
-	 * Yield once in a while, but not too often,
-	 * mostly to allow signal handling to take place.
+	assert(txn != NULL);
+	if (!row->is_commit)
+		return 0;
+	/*
+	 * For fully local transactions the TSN check won't work like for global
+	 * transactions, because it is not known if there are global rows until
+	 * commit arrives.
 	 */
-	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
-		fiber_sleep(0);
+	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
+		diag_set(XlogError, "fully local transaction's TSN does not "
+			 "match LSN of the first row");
+		return -1;
+	}
+	stream->tsn = 0;
+	/*
+	 * During local recovery the commit procedure should be async, otherwise
+	 * the only fiber processing recovery will get stuck on the first
+	 * synchronous tx it meets until confirm timeout is reached and the tx
+	 * is rolled back, yielding an error.
+	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
+	 * recovery, since journal_write is faked at this stage and returns
+	 * immediately.
+	 */
+	if (txn_commit_try_async(txn) != 0) {
+		/* Commit fail automatically leads to rollback. */
+		assert(in_txn() == NULL);
+		say_error("couldn't commit a recovery transaction");
+		return -1;
+	}
+	assert(in_txn() == NULL);
+	fiber_gc();
+	return 0;
+
+end_diag_request:
+	/*
+	 * The label must be used only for the errors related directly to the
+	 * request. Errors like txn_begin() fail has nothing to do with it, and
+	 * therefore don't log the request as the fault reason.
+	 */
+	say_error("error at request: %s", request_str(&request));
+	return -1;
+}
+
+/**
+ * Yield once in a while, but not too often, mostly to allow signal handling to
+ * take place.
+ */
+static void
+wal_stream_try_yield(struct wal_stream *stream)
+{
+	/*
+	 * Save the yield. Otherwise it would happen only on rows which
+	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
+	 * transaction, which is probably a very rare coincidence.
+	 */
+	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
+	if (wal_stream_has_tx(stream) || !stream->has_yield)
+		return;
+	stream->has_yield = false;
+	fiber_sleep(0);
+}
+
+static void
+wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
+{
+	struct wal_stream *stream =
+		container_of(base, struct wal_stream, base);
+	/*
+	 * Account all rows, even non-DML, and even leading to an error. Because
+	 * still need to yield sometimes.
+	 */
+	++stream->rows;
+	if (iproto_type_is_synchro_request(row->type)) {
+		if (wal_stream_apply_synchro_row(stream, row) != 0)
+			goto end_error;
+	} else if (iproto_type_is_raft_request(row->type)) {
+		if (wal_stream_apply_raft_row(stream, row) != 0)
+			goto end_error;
+	} else if (wal_stream_apply_dml_row(stream, row) != 0) {
+		goto end_error;
+	}
+	wal_stream_try_yield(stream);
+	return;
+
+end_error:
+	wal_stream_abort(stream);
+	wal_stream_try_yield(stream);
+	diag_raise();
 }
 
 static void
 wal_stream_create(struct wal_stream *ctx)
 {
-	xstream_create(&ctx->base, apply_wal_row);
+	xstream_create(&ctx->base, wal_stream_apply_row);
+	ctx->tsn = 0;
+	ctx->first_row_lsn = 0;
+	ctx->has_yield = false;
+	ctx->has_global_row = false;
 	ctx->rows = 0;
 }
 
@@ -2797,9 +3014,13 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
+	auto stream_guard = make_scoped_guard([&]{
+		wal_stream_abort(&wal_stream);
+	});
 
 	struct recovery *recovery;
-	recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
+	bool is_force_recovery = cfg_geti("force_recovery");
+	recovery = recovery_new(wal_dir(), is_force_recovery,
 				checkpoint_vclock);
 
 	/*
@@ -2861,6 +3082,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	engine_begin_final_recovery_xc();
 	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
+	if (wal_stream_has_tx(&wal_stream)) {
+		wal_stream_abort(&wal_stream);
+		diag_set(XlogError, "found a not finished transaction "
+			 "in the log");
+		if (!is_force_recovery)
+			diag_raise();
+		diag_log();
+	}
 	engine_end_recovery_xc();
 	/*
 	 * Leave hot standby mode, if any, only after
@@ -2880,6 +3109,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		}
 		recovery_stop_local(recovery);
 		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+		if (wal_stream_has_tx(&wal_stream)) {
+			wal_stream_abort(&wal_stream);
+			diag_set(XlogError, "found a not finished transaction "
+				 "in the log in hot standby mode");
+			if (!is_force_recovery)
+				diag_raise();
+			diag_log();
+		}
 		/*
 		 * Advance replica set vclock to reflect records
 		 * applied in hot standby mode.
@@ -2888,6 +3125,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		box_listen();
 		box_sync_replication(false);
 	}
+	stream_guard.is_active = false;
 	recovery_finalize(recovery);
 	is_local_recovery = false;
 
diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
new file mode 100644
index 000000000..73f903ca7
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.result
@@ -0,0 +1,162 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+ | ---
+ | ...
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+async = box.schema.create_space('async', {engine = engine})
+ | ---
+ | ...
+_ = async:create_index('pk')
+ | ---
+ | ...
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+ | ---
+ | ...
+_ = sync:create_index('pk')
+ | ---
+ | ...
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+ | ---
+ | - error: Quorum collection for a synchronous transaction is timed out
+ | ...
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+_ = sync:insert{1}
+ | ---
+ | ...
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+ | ---
+ | ...
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+ | ---
+ | ...
+_ = async:before_replace(function()                                             \
+    if do_skip then                                                             \
+        return nil                                                              \
+    end                                                                         \
+end)
+ | ---
+ | ...
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{2}                                                                \
+do_skip = false                                                                 \
+async:replace{3}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- NOP in the middle.
+box.begin()                                                                     \
+async:replace{4}                                                                \
+do_skip = true                                                                  \
+async:replace{5}                                                                \
+do_skip = false                                                                 \
+async:replace{6}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- All NOP.
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{7}                                                                \
+async:replace{8}                                                                \
+do_skip = false                                                                 \
+box.commit()
+ | ---
+ | ...
+
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+ | ---
+ | ...
+_ = loc:create_index('pk')
+ | ---
+ | ...
+box.begin()                                                                     \
+loc:replace{1}                                                                  \
+async:replace{9}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- All local.
+box.begin()                                                                     \
+loc:replace{2}                                                                  \
+loc:replace{3}                                                                  \
+box.commit()
+ | ---
+ | ...
+
+test_run:cmd('restart server default')
+ | 
+async = box.space.async
+ | ---
+ | ...
+sync = box.space.sync
+ | ---
+ | ...
+loc = box.space.loc
+ | ---
+ | ...
+async:select()
+ | ---
+ | - - [1]
+ |   - [3]
+ |   - [4]
+ |   - [6]
+ |   - [9]
+ | ...
+sync:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+loc:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+async:drop()
+ | ---
+ | ...
+sync:drop()
+ | ---
+ | ...
+loc:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
new file mode 100644
index 000000000..f35eb68de
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
@@ -0,0 +1,84 @@
+test_run = require('test_run').new()
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+engine = test_run:get_cfg('engine')
+async = box.schema.create_space('async', {engine = engine})
+_ = async:create_index('pk')
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+_ = sync:create_index('pk')
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+_ = sync:insert{1}
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+_ = async:before_replace(function()                                             \
+    if do_skip then                                                             \
+        return nil                                                              \
+    end                                                                         \
+end)
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{2}                                                                \
+do_skip = false                                                                 \
+async:replace{3}                                                                \
+box.commit()
+
+-- NOP in the middle.
+box.begin()                                                                     \
+async:replace{4}                                                                \
+do_skip = true                                                                  \
+async:replace{5}                                                                \
+do_skip = false                                                                 \
+async:replace{6}                                                                \
+box.commit()
+
+-- All NOP.
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{7}                                                                \
+async:replace{8}                                                                \
+do_skip = false                                                                 \
+box.commit()
+
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+_ = loc:create_index('pk')
+box.begin()                                                                     \
+loc:replace{1}                                                                  \
+async:replace{9}                                                                \
+box.commit()
+
+-- All local.
+box.begin()                                                                     \
+loc:replace{2}                                                                  \
+loc:replace{3}                                                                  \
+box.commit()
+
+test_run:cmd('restart server default')
+async = box.space.async
+sync = box.space.sync
+loc = box.space.loc
+async:select()
+sync:select()
+loc:select()
+async:drop()
+sync:drop()
+loc:drop()


More information about the Tarantool-patches mailing list