Tarantool development patches archive
* [Tarantool-patches] [PATCH 0/3] Transactional recovery
@ 2021-04-01 22:23 Vladislav Shpilevoy via Tarantool-patches
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 1/3] vinyl: handle multi-statement recovery txns Vladislav Shpilevoy via Tarantool-patches
                   ` (4 more replies)
  0 siblings, 5 replies; 14+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-04-01 22:23 UTC (permalink / raw)
  To: tarantool-patches, gorcunov, sergepetrenko, korablev

The patchset makes the recovery transactional. That is done for
the synchronous transactions, because they might be followed by a
ROLLBACK, and then must be reverted entirely, including their
statements working with non-sync spaces.

Nikita, I need you to check if the first commit is correct. It
touches vinyl.

Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-5874-txn-recovery
Issue: https://github.com/tarantool/tarantool/issues/5874

Vladislav Shpilevoy (3):
  vinyl: handle multi-statement recovery txns
  recovery: make it transactional
  box: remove is_local_recovery variable

 .../unreleased/qsync-multi-statement-recovery |   5 +
 src/box/box.cc                                | 273 +++++++++++++++---
 src/box/vy_tx.c                               |   9 +-
 .../gh-5874-qsync-txn-recovery.result         | 124 ++++++++
 .../gh-5874-qsync-txn-recovery.test.lua       |  64 ++++
 5 files changed, 436 insertions(+), 39 deletions(-)
 create mode 100644 changelogs/unreleased/qsync-multi-statement-recovery
 create mode 100644 test/replication/gh-5874-qsync-txn-recovery.result
 create mode 100644 test/replication/gh-5874-qsync-txn-recovery.test.lua

-- 
2.24.3 (Apple Git-128)



* [Tarantool-patches] [PATCH 1/3] vinyl: handle multi-statement recovery txns
  2021-04-01 22:23 [Tarantool-patches] [PATCH 0/3] Transactional recovery Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-01 22:23 ` Vladislav Shpilevoy via Tarantool-patches
  2021-04-02  9:24   ` Serge Petrenko via Tarantool-patches
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 2/3] recovery: make it transactional Vladislav Shpilevoy via Tarantool-patches
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 14+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-04-01 22:23 UTC (permalink / raw)
  To: tarantool-patches, gorcunov, sergepetrenko, korablev

During recovery and xlog replay vinyl skips the statements already
stored in runs. Indeed, their re-insertion into the mems would
lead to their second dump otherwise.

But as a result the recovery transactions in vinyl don't have a
write set - their tx->log is empty. Nevertheless they are still
added to the writer list (xm->writers), probably so as not to
have too many "skip if in recovery" checks all over the code.

It works fine with single-statement transactions, but would break
on multi-statement transactions, because the decision whether to
add the transaction to the writer list was based on the emptiness
of the tx's log. During such recovery the log stays empty, so the
transaction could be added to the writer list twice and corrupt
its list-link member.

The patch bases the decision on the emptiness of the list-link
member instead of the log, so it works both during recovery and
during normal operation.

Needed for #5874
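
The double-add problem described above can be reproduced outside
of vinyl. Below is a minimal sketch, assuming a simplified circular
doubly linked list in place of rlist. All the names (struct link,
struct toy_tx, the toy_begin_statement_* functions) are hypothetical
and are not part of the Tarantool sources.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for rlist: a circular doubly linked list. */
struct link {
    struct link *prev;
    struct link *next;
};

static void
link_create(struct link *l)
{
    l->prev = l;
    l->next = l;
}

/* A link which points at itself is not a member of any list. */
static bool
link_empty(const struct link *l)
{
    return l->next == l && l->prev == l;
}

/* Insert item right after head. Correct only if item is not linked yet. */
static void
link_add(struct link *head, struct link *item)
{
    item->prev = head;
    item->next = head->next;
    head->next->prev = item;
    head->next = item;
}

/* Count members, giving up after limit steps in case the list is looped. */
static size_t
link_count(const struct link *head, size_t limit)
{
    size_t n = 0;
    for (const struct link *i = head->next; i != head && n < limit;
         i = i->next)
        n++;
    return n;
}

/* Toy transaction: log_size stays 0 when all statements are skipped. */
struct toy_tx {
    struct link in_writers;
    size_t log_size;
};

/* Old rule: register the tx whenever its log is empty. */
static void
toy_begin_statement_by_log(struct link *writers, struct toy_tx *tx)
{
    assert(writers != NULL && tx != NULL);
    if (tx->log_size == 0)
        link_add(writers, &tx->in_writers);
}

/* New rule: register the tx only if its link is not in a list yet. */
static void
toy_begin_statement_by_link(struct link *writers, struct toy_tx *tx)
{
    assert(writers != NULL && tx != NULL);
    if (link_empty(&tx->in_writers))
        link_add(writers, &tx->in_writers);
}
```

With the old rule, two statements of a transaction whose log stays
empty add the same link twice, and the link ends up pointing at
itself, so a walk over the writers never returns to the head. With
the new rule the second call is a no-op.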
---
 src/box/vy_tx.c | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index ff63cd7a1..cd210beb0 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -899,8 +899,15 @@ vy_tx_begin_statement(struct vy_tx *tx, struct space *space, void **savepoint)
 	}
 	assert(tx->state == VINYL_TX_READY);
 	tx->last_stmt_space = space;
-	if (stailq_empty(&tx->log))
+	/*
+	 * The log emptiness can't tell whether the tx is in the writer list.
+	 * During recovery the log is always empty for the data stored both in
+	 * runs and xlogs. Must check the list member explicitly.
+	 */
+	if (rlist_empty(&tx->in_writers)) {
+		assert(stailq_empty(&tx->log));
 		rlist_add_entry(&tx->xm->writers, tx, in_writers);
+	}
 	*savepoint = stailq_last(&tx->log);
 	return 0;
 }
-- 
2.24.3 (Apple Git-128)



* [Tarantool-patches] [PATCH 2/3] recovery: make it transactional
  2021-04-01 22:23 [Tarantool-patches] [PATCH 0/3] Transactional recovery Vladislav Shpilevoy via Tarantool-patches
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 1/3] vinyl: handle multi-statement recovery txns Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-01 22:23 ` Vladislav Shpilevoy via Tarantool-patches
  2021-04-02 11:47   ` Serge Petrenko via Tarantool-patches
  2021-04-02 15:11   ` Cyrill Gorcunov via Tarantool-patches
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 3/3] box: remove is_local_recovery variable Vladislav Shpilevoy via Tarantool-patches
                   ` (2 subsequent siblings)
  4 siblings, 2 replies; 14+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-04-01 22:23 UTC (permalink / raw)
  To: tarantool-patches, gorcunov, sergepetrenko, korablev

Recovery used to be performed row by row. That was fine because
all the persisted rows were supposed to be committed and were not
expected to hit any problems during recovery, so a transaction
could safely be applied partially.

That stopped being true when synchronous replication was
introduced. Synchronous transactions might be in the log, but
can be followed by a ROLLBACK record which is supposed to delete
them.

During row-by-row recovery, firstly, each of the synchro rows
turned into a separate sync transaction, which is probably fine.
But the rows on non-sync spaces which were a part of a sync
transaction could be applied right away, bypassing the limbo.
That led to all kinds of errors like duplicate keys, or to the
inconsistency of a partially applied transaction.

The patch makes the recovery transactional. Either an entire
transaction is recovered, or it is rolled back, which normally
happens only for synchro transactions followed by ROLLBACK.

In force recovery of a broken log the consistency is not
guaranteed though.

Closes #5874
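
The grouping of rows into transactions described above can be
sketched as follows. This is a simplified model, not the code of the
patch: struct toy_row, struct toy_stream and the toy_stream_*
functions are hypothetical, the limbo and ROLLBACK records are not
modeled, and "applying" a row only copies its key into an array. The
checks mirror the ones in the patch: the first row of a transaction
must have a non-zero TSN equal to its LSN, and a row with another TSN
must not appear until the current transaction is committed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { TOY_MAX_KEYS = 16 };

/* Hypothetical, reduced version of a WAL row header. */
struct toy_row {
    int64_t lsn;
    int64_t tsn;
    bool is_commit;
    int key;
};

struct toy_stream {
    /* Current transaction ID. 0 when no transaction. */
    int64_t tsn;
    /* Keys of the current, not yet committed transaction. */
    int staged[TOY_MAX_KEYS];
    size_t staged_count;
    /* Keys of the committed transactions. */
    int applied[TOY_MAX_KEYS];
    size_t applied_count;
};

static void
toy_stream_create(struct toy_stream *s)
{
    s->tsn = 0;
    s->staged_count = 0;
    s->applied_count = 0;
}

/* Drop the current transaction like it never happened. */
static void
toy_stream_abort(struct toy_stream *s)
{
    s->tsn = 0;
    s->staged_count = 0;
}

/* Returns 0 on success, -1 if the row breaks the transaction layout. */
static int
toy_stream_apply_row(struct toy_stream *s, const struct toy_row *row)
{
    assert(s != NULL && row != NULL);
    if (s->tsn == 0) {
        /* First row of a transaction: TSN must be set and equal LSN. */
        if (row->tsn == 0 || row->tsn != row->lsn)
            goto fail;
        s->tsn = row->tsn;
    } else if (row->tsn != s->tsn) {
        /* A new transaction started before the previous one ended. */
        goto fail;
    }
    if (s->staged_count + s->applied_count >= TOY_MAX_KEYS)
        goto fail;
    s->staged[s->staged_count++] = row->key;
    if (!row->is_commit)
        return 0;
    /* Commit: the whole transaction becomes visible at once. */
    for (size_t i = 0; i < s->staged_count; i++)
        s->applied[s->applied_count++] = s->staged[i];
    s->staged_count = 0;
    s->tsn = 0;
    return 0;
fail:
    toy_stream_abort(s);
    return -1;
}
```

In this model an unfinished transaction leaves no trace in the
applied keys, which is the property row-by-row recovery did not
have.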
---
 .../unreleased/qsync-multi-statement-recovery |   5 +
 src/box/box.cc                                | 261 ++++++++++++++++--
 .../gh-5874-qsync-txn-recovery.result         | 124 +++++++++
 .../gh-5874-qsync-txn-recovery.test.lua       |  64 +++++
 4 files changed, 427 insertions(+), 27 deletions(-)
 create mode 100644 changelogs/unreleased/qsync-multi-statement-recovery
 create mode 100644 test/replication/gh-5874-qsync-txn-recovery.result
 create mode 100644 test/replication/gh-5874-qsync-txn-recovery.test.lua

diff --git a/changelogs/unreleased/qsync-multi-statement-recovery b/changelogs/unreleased/qsync-multi-statement-recovery
new file mode 100644
index 000000000..c902cbe24
--- /dev/null
+++ b/changelogs/unreleased/qsync-multi-statement-recovery
@@ -0,0 +1,5 @@
+## bugfix/replication
+
+* Fix recovery of a rolled back multi-statement synchronous transaction which
+  could lead to the transaction being applied partially, and to recovery errors.
+  It happened in case the transaction worked with non-sync spaces (gh-5874).
diff --git a/src/box/box.cc b/src/box/box.cc
index 4da274976..f70a2bd0e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -334,7 +334,16 @@ box_set_orphan(bool orphan)
 }
 
 struct wal_stream {
+	/** Base class. */
 	struct xstream base;
+	/** Current transaction ID. 0 when no transaction. */
+	int64_t tsn;
+	/**
+	 * Flag whether there is a pending yield to do when the current
+	 * transaction is finished. It can't always be done right away because
+	 * it would abort the current transaction if it is memtx.
+	 */
+	bool has_yield;
 	/** How many rows have been recovered so far. */
 	size_t rows;
 };
@@ -379,47 +388,224 @@ recovery_journal_create(struct vclock *v)
 	journal_set(&journal.base);
 }
 
+/**
+ * Drop the stream to the initial state. It is supposed to be done when an error
+ * happens. Because in case of force recovery the stream will continue getting
+ * tuples. For that it must stay in a valid state and must handle them somehow.
+ *
+ * Now the stream simply drops the current transaction like it never happened,
+ * even if its commit-row wasn't met yet. Should be good enough for
+ * force-recovery when the consistency is already out of the game.
+ */
 static void
-apply_wal_row(struct xstream *stream, struct xrow_header *row)
+wal_stream_abort(struct wal_stream *stream)
+{
+	struct txn *tx = in_txn();
+	if (tx != NULL)
+		txn_rollback(tx);
+	stream->tsn = 0;
+	fiber_gc();
+}
+
+/**
+ * The wrapper exists only for the debug purposes, to ensure tsn being non-0 is
+ * in sync with the fiber's txn being non-NULL. It has nothing to do with the
+ * journal content, and therefore can use assertions instead of rigorous error
+ * checking even in release.
+ */
+static bool
+wal_stream_has_tx(const struct wal_stream *stream)
+{
+	bool has = stream->tsn != 0;
+	assert(has == (in_txn() != NULL));
+	return has;
+}
+
+static int
+wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+	if (wal_stream_has_tx(stream)) {
+		diag_set(XlogError, "found synchro request in a transaction");
+		return -1;
+	}
+	struct synchro_request syn_req;
+	if (xrow_decode_synchro(row, &syn_req) != 0) {
+		say_error("couldn't decode a synchro request");
+		return -1;
+	}
+	txn_limbo_process(&txn_limbo, &syn_req);
+	return 0;
+}
+
+static int
+wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+	if (wal_stream_has_tx(stream)) {
+		diag_set(XlogError, "found raft request in a transaction");
+		return -1;
+	}
+	struct raft_request raft_req;
+	/* Vclock is never persisted in WAL by Raft. */
+	if (xrow_decode_raft(row, &raft_req, NULL) != 0) {
+		say_error("couldn't decode a raft request");
+		return -1;
+	}
+	box_raft_recover(&raft_req);
+	return 0;
+}
+
+/**
+ * Rows of the same transaction are wrapped into begin/commit. Mostly for the
+ * sake of synchronous replication, when the log can contain rolled back
+ * transactions, which must be entirely reverted during recovery when ROLLBACK
+ * records are met. Row-by-row recovery wouldn't work for multi-statement
+ * synchronous transactions.
+ */
+static int
+wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 {
 	struct request request;
-	if (iproto_type_is_synchro_request(row->type)) {
-		struct synchro_request syn_req;
-		if (xrow_decode_synchro(row, &syn_req) != 0)
-			diag_raise();
-		txn_limbo_process(&txn_limbo, &syn_req);
-		return;
+	uint64_t req_type = dml_request_key_map(row->type);
+	if (xrow_decode_dml(row, &request, req_type) != 0) {
+		say_error("couldn't decode a DML request");
+		return -1;
 	}
-	if (iproto_type_is_raft_request(row->type)) {
-		struct raft_request raft_req;
-		/* Vclock is never persisted in WAL by Raft. */
-		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
-			diag_raise();
-		box_raft_recover(&raft_req);
-		return;
+	/*
+	 * Note that all the information which came from the log is validated
+	 * and the errors are handled. Not asserted or panicked. That is for
+	 * the sake of force recovery, which must be able to recover everything
+	 * that is possible instead of terminating the instance.
+	 */
+	struct txn *txn;
+	if (stream->tsn == 0) {
+		if (row->tsn == 0) {
+			diag_set(XlogError, "found a row without TSN");
+			goto end_diag_request;
+		}
+		if (row->tsn != row->lsn) {
+			diag_set(XlogError, "found a first row in a "
+				 "transaction with LSN/TSN mismatch");
+			goto end_diag_request;
+		}
+		stream->tsn = row->tsn;
+		/*
+		 * Rows are not stacked into a list like during replication,
+		 * because recovery does not yield while reading the rows. All
+		 * the yields are controlled by the stream, and therefore no
+		 * need to wait for all the rows to start a transaction. Can
+		 * start now, apply the rows, and make a yield after commit if
+		 * necessary. Helps to avoid a lot of copying.
+		 */
+		txn = txn_begin();
+		if (txn == NULL) {
+			say_error("couldn't begin a recovery transaction");
+			return -1;
+		}
+	} else if (row->tsn != stream->tsn) {
+		diag_set(XlogError, "found a next transaction with the "
+			 "previous one not yet committed");
+		goto end_diag_request;
+	} else {
+		txn = in_txn();
 	}
-	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+	assert(wal_stream_has_tx(stream));
+	/* Nops might appear at least after before_replace skipping rows. */
 	if (request.type != IPROTO_NOP) {
-		struct space *space = space_cache_find_xc(request.space_id);
+		struct space *space = space_cache_find(request.space_id);
+		if (space == NULL) {
+			say_error("couldn't find space by ID");
+			goto end_diag_request;
+		}
 		if (box_process_rw(&request, space, NULL) != 0) {
-			say_error("error applying row: %s", request_str(&request));
-			diag_raise();
+			say_error("couldn't apply the request");
+			goto end_diag_request;
 		}
 	}
-	struct wal_stream *xstream =
-		container_of(stream, struct wal_stream, base);
-	/**
-	 * Yield once in a while, but not too often,
-	 * mostly to allow signal handling to take place.
+	assert(txn != NULL);
+	if (!row->is_commit)
+		return 0;
+
+	stream->tsn = 0;
+	if (txn_commit_try_async(txn) != 0) {
+		/* Commit fail automatically leads to rollback. */
+		assert(in_txn() == NULL);
+		say_error("couldn't commit a recovery transaction");
+		return -1;
+	}
+	assert(in_txn() == NULL);
+	fiber_gc();
+	return 0;
+
+end_diag_request:
+	/*
+	 * The label must be used only for the errors related directly to the
+	 * request. Errors like a txn_begin() failure have nothing to do with
+	 * it, and therefore don't log the request as the fault reason.
+	 */
+	say_error("error at request: %s", request_str(&request));
+	return -1;
+}
+
+/**
+ * Yield once in a while, but not too often, mostly to allow signal handling to
+ * take place.
+ */
+static void
+wal_stream_try_yield(struct wal_stream *stream)
+{
+	bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);
+	if (wal_stream_has_tx(stream)) {
+		/*
+		 * Save the yield. Otherwise it would happen only on rows which
+		 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
+		 * transaction, which is probably a very rare coincidence.
+		 */
+		stream->has_yield = true;
+		return;
+	}
+	if (stream->has_yield)
+		stream->has_yield = false;
+	else if (!needs_yield)
+		return;
+	fiber_sleep(0);
+}
+
+static void
+wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
+{
+	struct wal_stream *stream =
+		container_of(base, struct wal_stream, base);
+	/*
+	 * Account all rows, even non-DML ones and even those leading to an
+	 * error, because the stream still needs to yield sometimes.
 	 */
-	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
-		fiber_sleep(0);
+	++stream->rows;
+	if (iproto_type_is_synchro_request(row->type)) {
+		if (wal_stream_apply_synchro_row(stream, row) != 0)
+			goto end_error;
+	} else if (iproto_type_is_raft_request(row->type)) {
+		if (wal_stream_apply_raft_row(stream, row) != 0)
+			goto end_error;
+	} else if (wal_stream_apply_dml_row(stream, row) != 0) {
+		goto end_error;
+	}
+	wal_stream_try_yield(stream);
+	return;
+
+end_error:
+	wal_stream_abort(stream);
+	wal_stream_try_yield(stream);
+	diag_raise();
 }
 
 static void
 wal_stream_create(struct wal_stream *ctx)
 {
-	xstream_create(&ctx->base, apply_wal_row);
+	xstream_create(&ctx->base, wal_stream_apply_row);
+	ctx->tsn = 0;
+	ctx->has_yield = false;
 	ctx->rows = 0;
 }
 
@@ -2797,9 +2983,13 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
+	auto stream_guard = make_scoped_guard([&]{
+		wal_stream_abort(&wal_stream);
+	});
 
 	struct recovery *recovery;
-	recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
+	bool is_force_recovery = cfg_geti("force_recovery");
+	recovery = recovery_new(wal_dir(), is_force_recovery,
 				checkpoint_vclock);
 
 	/*
@@ -2861,6 +3051,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	engine_begin_final_recovery_xc();
 	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
+	if (wal_stream_has_tx(&wal_stream)) {
+		wal_stream_abort(&wal_stream);
+		diag_set(XlogError, "found a not finished transaction "
+			 "in the log");
+		if (!is_force_recovery)
+			diag_raise();
+		diag_log();
+	}
 	engine_end_recovery_xc();
 	/*
 	 * Leave hot standby mode, if any, only after
@@ -2880,6 +3078,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		}
 		recovery_stop_local(recovery);
 		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+		if (wal_stream_has_tx(&wal_stream)) {
+			wal_stream_abort(&wal_stream);
+			diag_set(XlogError, "found a not finished transaction "
+				 "in the log in hot standby mode");
+			if (!is_force_recovery)
+				diag_raise();
+			diag_log();
+		}
 		/*
 		 * Advance replica set vclock to reflect records
 		 * applied in hot standby mode.
@@ -2888,6 +3094,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		box_listen();
 		box_sync_replication(false);
 	}
+	stream_guard.is_active = false;
 	recovery_finalize(recovery);
 	is_local_recovery = false;
 
diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
new file mode 100644
index 000000000..a64eadd9c
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.result
@@ -0,0 +1,124 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+ | ---
+ | ...
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+async = box.schema.create_space('async', {engine = engine})
+ | ---
+ | ...
+_ = async:create_index('pk')
+ | ---
+ | ...
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+ | ---
+ | ...
+_ = sync:create_index('pk')
+ | ---
+ | ...
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+ | ---
+ | - error: Quorum collection for a synchronous transaction is timed out
+ | ...
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+_ = sync:insert{1}
+ | ---
+ | ...
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+ | ---
+ | ...
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+ | ---
+ | ...
+_ = async:before_replace(function()                                             \
+    if do_skip then                                                             \
+        return nil                                                              \
+    end                                                                         \
+end)
+ | ---
+ | ...
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{2}                                                                \
+do_skip = false                                                                 \
+async:replace{3}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- NOP in the middle.
+box.begin()                                                                     \
+async:replace{4}                                                                \
+do_skip = true                                                                  \
+async:replace{5}                                                                \
+do_skip = false                                                                 \
+async:replace{6}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- All NOP.
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{7}                                                                \
+async:replace{8}                                                                \
+do_skip = false                                                                 \
+box.commit()
+ | ---
+ | ...
+
+test_run:cmd('restart server default')
+ | 
+async = box.space.async
+ | ---
+ | ...
+sync = box.space.sync
+ | ---
+ | ...
+async:select()
+ | ---
+ | - - [1]
+ |   - [3]
+ |   - [4]
+ |   - [6]
+ | ...
+sync:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+async:drop()
+ | ---
+ | ...
+sync:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
new file mode 100644
index 000000000..efcf727cc
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
@@ -0,0 +1,64 @@
+test_run = require('test_run').new()
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+engine = test_run:get_cfg('engine')
+async = box.schema.create_space('async', {engine = engine})
+_ = async:create_index('pk')
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+_ = sync:create_index('pk')
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+_ = sync:insert{1}
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+_ = async:before_replace(function()                                             \
+    if do_skip then                                                             \
+        return nil                                                              \
+    end                                                                         \
+end)
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{2}                                                                \
+do_skip = false                                                                 \
+async:replace{3}                                                                \
+box.commit()
+
+-- NOP in the middle.
+box.begin()                                                                     \
+async:replace{4}                                                                \
+do_skip = true                                                                  \
+async:replace{5}                                                                \
+do_skip = false                                                                 \
+async:replace{6}                                                                \
+box.commit()
+
+-- All NOP.
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{7}                                                                \
+async:replace{8}                                                                \
+do_skip = false                                                                 \
+box.commit()
+
+test_run:cmd('restart server default')
+async = box.space.async
+sync = box.space.sync
+async:select()
+sync:select()
+async:drop()
+sync:drop()
-- 
2.24.3 (Apple Git-128)
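
The deferred yield implemented by wal_stream_try_yield() in the patch
above can be sketched in isolation. The names struct toy_yielder,
toy_account_row() and TOY_ROWS_PER_YIELD are hypothetical, and the
function only reports whether a yield should happen instead of
calling fiber_sleep(0). One deliberate difference is an assumption
about the intent of the comment in the patch, not what the posted
hunk does: the sketch remembers a pending yield only when the row
counter actually hits the threshold, while the hunk sets has_yield
for every row seen inside a transaction.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical threshold, much smaller than a real one for readability. */
enum { TOY_ROWS_PER_YIELD = 4 };

struct toy_yielder {
    /* How many rows have been accounted so far. */
    size_t rows;
    /* Whether a yield was postponed until the end of a transaction. */
    bool has_yield;
};

/*
 * Account one row. in_tx tells whether a transaction is still open after
 * the row was processed. Returns true if the caller should yield now.
 */
static bool
toy_account_row(struct toy_yielder *y, bool in_tx)
{
    assert(y != NULL);
    ++y->rows;
    bool needs_yield = (y->rows % TOY_ROWS_PER_YIELD == 0);
    if (in_tx) {
        /* Can't yield inside a transaction, postpone it. */
        if (needs_yield)
            y->has_yield = true;
        return false;
    }
    if (y->has_yield) {
        /* Pay back the yield postponed by the finished transaction. */
        y->has_yield = false;
        return true;
    }
    return needs_yield;
}
```

For example, with a threshold of 4, a transaction of five rows makes
the function return true on its last row, because the fourth row
requested a yield while the transaction was still open.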



* [Tarantool-patches] [PATCH 3/3] box: remove is_local_recovery variable
  2021-04-01 22:23 [Tarantool-patches] [PATCH 0/3] Transactional recovery Vladislav Shpilevoy via Tarantool-patches
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 1/3] vinyl: handle multi-statement recovery txns Vladislav Shpilevoy via Tarantool-patches
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 2/3] recovery: make it transactional Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-01 22:23 ` Vladislav Shpilevoy via Tarantool-patches
  2021-04-02 11:47   ` Serge Petrenko via Tarantool-patches
  2021-04-02  9:42 ` [Tarantool-patches] [PATCH 0/3] Transactional recovery Konstantin Osipov via Tarantool-patches
  2021-04-05 16:14 ` Kirill Yukhin via Tarantool-patches
  4 siblings, 1 reply; 14+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-04-01 22:23 UTC (permalink / raw)
  To: tarantool-patches, gorcunov, sergepetrenko, korablev

It was used to recover synchronous auto-commit transactions in an
async way (not blocking the fiber). It is no longer necessary
since #5874 was fixed, because recovery does not use auto-commit
transactions anymore.

Closes #5194
---
 src/box/box.cc | 12 +-----------
 1 file changed, 1 insertion(+), 11 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index f70a2bd0e..8eacbfebb 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -125,8 +125,6 @@ static struct gc_checkpoint_ref backup_gc;
 static bool is_box_configured = false;
 static bool is_ro = true;
 static fiber_cond ro_cond;
-/** Set to true during recovery from local files. */
-static bool is_local_recovery = false;
 
 /**
  * The following flag is set if the instance failed to
@@ -241,7 +239,6 @@ box_process_rw(struct request *request, struct space *space,
 		goto rollback;
 
 	if (is_autocommit) {
-		int res = 0;
 		/*
 		 * During local recovery the commit procedure
 		 * should be async, otherwise the only fiber
@@ -253,12 +250,7 @@ box_process_rw(struct request *request, struct space *space,
 		 * all during local recovery, since journal_write
 		 * is faked at this stage and returns immediately.
 		 */
-		if (is_local_recovery) {
-			res = txn_commit_try_async(txn);
-		} else {
-			res = txn_commit(txn);
-		}
-		if (res < 0)
+		if (txn_commit(txn) < 0)
 			goto error;
 	        fiber_gc();
 	}
@@ -3037,7 +3029,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	memtx = (struct memtx_engine *)engine_by_name("memtx");
 	assert(memtx != NULL);
 
-	is_local_recovery = true;
 	recovery_journal_create(&recovery->vclock);
 
 	/*
@@ -3096,7 +3087,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	}
 	stream_guard.is_active = false;
 	recovery_finalize(recovery);
-	is_local_recovery = false;
 
 	/*
 	 * We must enable WAL before finalizing engine recovery,
-- 
2.24.3 (Apple Git-128)



* Re: [Tarantool-patches] [PATCH 1/3] vinyl: handle multi-statement recovery txns
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 1/3] vinyl: handle multi-statement recovery txns Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-02  9:24   ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 14+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-04-02  9:24 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov, korablev



02.04.2021 01:23, Vladislav Shpilevoy wrote:
> During recovery and xlog replay vinyl skips the statements already
> stored in runs. Indeed, their re-insertion into the mems would
> lead to their second dump otherwise.
>
> But as a result the recovery transactions in vinyl don't have a
> write set - their tx->log is empty. Nevertheless they are still
> added to the writer list (xm->writers), probably so as not to
> have too many "skip if in recovery" checks all over the code.
>
> It works fine with single-statement transactions, but would break
> on multi-statement transactions, because the decision whether to
> add the transaction to the writer list was based on the emptiness
> of the tx's log. During such recovery the log stays empty, so the
> transaction could be added to the writer list twice and corrupt
> its list-link member.
>
> The patch bases the decision on the emptiness of the list-link
> member instead of the log, so it works both during recovery and
> during normal operation.
>
> Needed for #5874

Hi! Thanks for the patch!
Looks fine at first glance.

> ---
>   src/box/vy_tx.c | 9 ++++++++-
>   1 file changed, 8 insertions(+), 1 deletion(-)
>
> diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
> index ff63cd7a1..cd210beb0 100644
> --- a/src/box/vy_tx.c
> +++ b/src/box/vy_tx.c
> @@ -899,8 +899,15 @@ vy_tx_begin_statement(struct vy_tx *tx, struct space *space, void **savepoint)
>   	}
>   	assert(tx->state == VINYL_TX_READY);
>   	tx->last_stmt_space = space;
> -	if (stailq_empty(&tx->log))
> +	/*
> +	 * The log emptiness can't tell whether the tx is in the writer list.
> +	 * During recovery the log is always empty for the data stored both in
> +	 * runs and xlogs. Must check the list member explicitly.
> +	 */
> +	if (rlist_empty(&tx->in_writers)) {
> +		assert(stailq_empty(&tx->log));
>   		rlist_add_entry(&tx->xm->writers, tx, in_writers);
> +	}
>   	*savepoint = stailq_last(&tx->log);
>   	return 0;
>   }

-- 
Serge Petrenko



* Re: [Tarantool-patches] [PATCH 0/3] Transactional recovery
  2021-04-01 22:23 [Tarantool-patches] [PATCH 0/3] Transactional recovery Vladislav Shpilevoy via Tarantool-patches
                   ` (2 preceding siblings ...)
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 3/3] box: remove is_local_recovery variable Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-02  9:42 ` Konstantin Osipov via Tarantool-patches
  2021-04-05 16:14 ` Kirill Yukhin via Tarantool-patches
  4 siblings, 0 replies; 14+ messages in thread
From: Konstantin Osipov via Tarantool-patches @ 2021-04-02  9:42 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org> [21/04/02 09:27]:

> The patchset makes the recovery transactional. That is done for
> the synchronous transactions, because they might be followed by a
> ROLLBACK, and then must be reverted entirely, including their
> statements working with non-sync spaces.
> 
> Nikita, I need you to check if the first commit is correct. It
> touches vinyl.

This is nice to see, I'm just curious, shouldn't you 
tell the community that sync repl. is currently broken?

I'm seeing the same with interactive transactions also released as
"stable" without gap locks and with corrupt secondary indexes, so it
looks like the issue is systemic.

-- 
Konstantin Osipov, Moscow, Russia


* Re: [Tarantool-patches] [PATCH 2/3] recovery: make it transactional
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 2/3] recovery: make it transactional Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-02 11:47   ` Serge Petrenko via Tarantool-patches
  2021-04-03 13:18     ` Vladislav Shpilevoy via Tarantool-patches
  2021-04-02 15:11   ` Cyrill Gorcunov via Tarantool-patches
  1 sibling, 1 reply; 14+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-04-02 11:47 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov, korablev



02.04.2021 01:23, Vladislav Shpilevoy wrote:
> Recovery used to be performed row by row. It was fine because
> anyway all the persisted rows are supposed to be committed, and
> should not meet any problems during recovery so a transaction
> could be applied partially.
>
> But it became not true after the synchronous replication
> introduction. Synchronous transactions might be in the log, but
> can be followed by a ROLLBACK record which is supposed to delete
> them.
>
> During row-by-row recovery, firstly, the synchro rows each turned
> into a sync transaction. Which is probably fine. But the rows on
> non-sync spaces which were a part of a sync transaction, could be
> applied right away bypassing the limbo leading to all kind of the
> sweet errors like duplicate keys, or inconsistency of a partially
> applied transaction.
>
> The patch makes the recovery transactional. Either an entire
> transaction is recovered, or it is rolled back which normally
> happens only for synchro transactions followed by ROLLBACK.
>
> In force recovery of a broken log the consistency is not
> guaranteed though.
>
> Closes #5874

Thanks for the patch!
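
The failure mode described in the commit message can be modelled with a toy log replayer. This is only a sketch under simplifying assumptions: `toy_entry`, `toy_recover` and the counters are hypothetical names, the "limbo" is reduced to a row counter, and CONFIRM/ROLLBACK affect everything pending instead of carrying an LSN as the real records do.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum toy_kind { TOY_ROW, TOY_CONFIRM, TOY_ROLLBACK };

/* Toy log entry; not the real xrow_header. */
struct toy_entry {
	enum toy_kind kind;
	bool is_sync;   /* row touches a synchronous space */
	bool is_commit; /* last row of its transaction */
};

/*
 * Replay the log and return how many rows end up applied.
 * When transactional is false, every row is treated as its own
 * transaction, like the old row-by-row recovery.
 */
static size_t
toy_recover(const struct toy_entry *log, size_t count, bool transactional)
{
	size_t applied = 0;  /* rows visible after recovery */
	size_t pending = 0;  /* rows waiting in the toy limbo */
	size_t txn_rows = 0; /* rows of the transaction being read */
	bool txn_sync = false;
	for (size_t i = 0; i < count; i++) {
		const struct toy_entry *e = &log[i];
		if (e->kind == TOY_CONFIRM) {
			applied += pending;
			pending = 0;
			continue;
		}
		if (e->kind == TOY_ROLLBACK) {
			pending = 0;
			continue;
		}
		txn_rows++;
		txn_sync = txn_sync || e->is_sync;
		if (transactional && !e->is_commit)
			continue;
		/* Transaction boundary: the whole group shares one fate. */
		if (txn_sync)
			pending += txn_rows;
		else
			applied += txn_rows;
		txn_rows = 0;
		txn_sync = false;
	}
	return applied;
}
```

For a two-row transaction (one async row, one sync row) followed by ROLLBACK, the row-by-row mode leaves the async row applied, while the transactional mode leaves nothing applied.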

I ran the tests locally and replication suite fails occasionally with
```
[009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:539 E> error at request: {type: 'INSERT', replica_id: 0, lsn: 7, space_id: 517, index_id: 0, tuple: [21]}
[009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:481 E> XlogError: found a first row in a transaction with LSN/TSN mismatch
[009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
[009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
```

> ---
>   .../unreleased/qsync-multi-statement-recovery |   5 +
>   src/box/box.cc                                | 261 ++++++++++++++++--
>   .../gh-5874-qsync-txn-recovery.result         | 124 +++++++++
>   .../gh-5874-qsync-txn-recovery.test.lua       |  64 +++++
>   4 files changed, 427 insertions(+), 27 deletions(-)
>   create mode 100644 changelogs/unreleased/qsync-multi-statement-recovery
>   create mode 100644 test/replication/gh-5874-qsync-txn-recovery.result
>   create mode 100644 test/replication/gh-5874-qsync-txn-recovery.test.lua
>
...
> +/**
> + * Rows of the same transaction are wrapped into begin/commit. Mostly for the
> + * sake of synchronous replication, when the log can contain rolled back
> + * transactions, which must be entirely reverted during recovery when ROLLBACK
> + * records are met. Row-by-row recovery wouldn't work for multi-statement
> + * synchronous transactions.
> + */
> +static int
> +wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>   {
>   	struct request request;
> -	if (iproto_type_is_synchro_request(row->type)) {
> -		struct synchro_request syn_req;
> -		if (xrow_decode_synchro(row, &syn_req) != 0)
> -			diag_raise();
> -		txn_limbo_process(&txn_limbo, &syn_req);
> -		return;
> +	uint64_t req_type = dml_request_key_map(row->type);
> +	if (xrow_decode_dml(row, &request, req_type) != 0) {
> +		say_error("couldn't decode a DML request");
> +		return -1;
>   	}
> -	if (iproto_type_is_raft_request(row->type)) {
> -		struct raft_request raft_req;
> -		/* Vclock is never persisted in WAL by Raft. */
> -		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
> -			diag_raise();
> -		box_raft_recover(&raft_req);
> -		return;
> +	/*
> +	 * Note that all the information which came from the log is validated
> +	 * and the errors are handled. Not asserted or paniced. That is for the
> +	 * sake of force recovery, which must be able to recover just everything
> +	 * what possible instead of terminating the instance.
> +	 */
> +	struct txn *txn;
> +	if (stream->tsn == 0) {
> +		if (row->tsn == 0) {
> +			diag_set(XlogError, "found a row without TSN");
> +			goto end_diag_request;
> +		}
> +		if (row->tsn != row->lsn) {
> +			diag_set(XlogError, "found a first row in a "
> +				 "transaction with LSN/TSN mismatch");
> +			goto end_diag_request;
> +		}
> +		stream->tsn = row->tsn;
> +		/*
> +		 * Rows are not stacked into a list like during replication,
> +		 * because recovery does not yield while reading the rows. All
> +		 * the yields are controlled by the stream, and therefore no
> +		 * need to wait for all the rows to start a transaction. Can
> +		 * start now, apply the rows, and make a yield after commit if
> +		 * necessary. Helps to avoid a lot of copying.
> +		 */

Nice solution!

> +		txn = txn_begin();
> +		if (txn == NULL) {
> +			say_error("couldn't begin a recovery transaction");
> +			return -1;
> +		}
> +	} else if (row->tsn != stream->tsn) {
> +		diag_set(XlogError, "found a next transaction with the "
> +			 "previous one not yet committed");
> +		goto end_diag_request;
> +	} else {
> +		txn = in_txn();
>   	}
> -	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> +	assert(wal_stream_has_tx(stream));
> +	/* Nops might appear at least after before_replace skipping rows. */
>   	if (request.type != IPROTO_NOP) {
> -		struct space *space = space_cache_find_xc(request.space_id);
> +		struct space *space = space_cache_find(request.space_id);
> +		if (space == NULL) {
> +			say_error("couldn't find space by ID");
> +			goto end_diag_request;
> +		}
>   		if (box_process_rw(&request, space, NULL) != 0) {
> -			say_error("error applying row: %s", request_str(&request));
> -			diag_raise();
> +			say_error("couldn't apply the request");
> +			goto end_diag_request;
>   		}
>   	}
> -	struct wal_stream *xstream =
> -		container_of(stream, struct wal_stream, base);
> -	/**
> -	 * Yield once in a while, but not too often,
> -	 * mostly to allow signal handling to take place.
> +	assert(txn != NULL);
> +	if (!row->is_commit)
> +		return 0;
> +
> +	stream->tsn = 0;
> +	if (txn_commit_try_async(txn) != 0) {
> +		/* Commit fail automatically leads to rollback. */
> +		assert(in_txn() == NULL);
> +		say_error("couldn't commit a recovery transaction");
> +		return -1;
> +	}
> +	assert(in_txn() == NULL);
> +	fiber_gc();
> +	return 0;
> +
> +end_diag_request:
> +	/*
> +	 * The label must be used only for the errors related directly to the
> +	 * request. Errors like txn_begin() fail has nothing to do with it, and
> +	 * therefore don't log the request as the fault reason.
> +	 */
> +	say_error("error at request: %s", request_str(&request));
> +	return -1;
> +}
> +
> +/**
> + * Yield once in a while, but not too often, mostly to allow signal handling to
> + * take place.
> + */
> +static void
> +wal_stream_try_yield(struct wal_stream *stream)
> +{
> +	bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);
> +	if (wal_stream_has_tx(stream)) {
                                       ^ && needs_yield ?

> +		/*
> +		 * Save the yield. Otherwise it would happen only on rows which
> +		 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> +		 * transaction, which is probably a very rare coincidence.
> +		 */
> +		stream->has_yield = true;
> +		return;
> +	}
> +	if (stream->has_yield)
> +		stream->has_yield = false;
> +	else if (!needs_yield)
> +		return;
> +	fiber_sleep(0);
> +}

Consider this diff. It looks simpler IMO, but feel free to ignore.

=============================
diff --git a/src/box/box.cc b/src/box/box.cc
index 8eacbfebb..f31bc8600 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -547,20 +547,15 @@ end_diag_request:
  static void
  wal_stream_try_yield(struct wal_stream *stream)
  {
-       bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);
-       if (wal_stream_has_tx(stream)) {
-               /*
-                * Save the yield. Otherwise it would happen only on rows which
-                * are a multiple of WAL_ROWS_PER_YIELD and are last in their
-                * transaction, which is probably a very rare coincidence.
-                */
-               stream->has_yield = true;
-               return;
-       }
-       if (stream->has_yield)
-               stream->has_yield = false;
-       else if (!needs_yield)
+       /*
+        * Save the yield. Otherwise it would happen only on rows which
+        * are a multiple of WAL_ROWS_PER_YIELD and are last in their
+        * transaction, which is probably a very rare coincidence.
+        */
+       stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
+       if (wal_stream_has_tx(stream) || !stream->has_yield)
                 return;
+       stream->has_yield = false;
         fiber_sleep(0);
  }

=============================
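
The deferred-yield logic from the diff above can be tried out in isolation. The following is a minimal sketch, not the Tarantool code: `toy_stream`, `toy_try_yield`, `toy_feed_row` and `toy_replay` are hypothetical names, `ROWS_PER_YIELD` is set to 4 only to keep the trace short, and a counter stands in for `fiber_sleep(0)`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical small interval; not the real WAL_ROWS_PER_YIELD value. */
enum { ROWS_PER_YIELD = 4 };

/* Toy stand-in for struct wal_stream with only the fields used here. */
struct toy_stream {
	size_t rows;    /* rows seen so far */
	bool in_tx;     /* stands in for wal_stream_has_tx() */
	bool has_yield; /* a yield is owed */
	size_t yields;  /* counts the places where fiber_sleep(0) would run */
};

/* Remember an owed yield and pay it only when no transaction is open. */
static void
toy_try_yield(struct toy_stream *s)
{
	s->has_yield |= (s->rows % ROWS_PER_YIELD == 0);
	if (s->in_tx || !s->has_yield)
		return;
	s->has_yield = false;
	s->yields++;
}

/* Feed one row; is_commit marks the last row of its transaction. */
static void
toy_feed_row(struct toy_stream *s, bool is_commit)
{
	s->rows++;
	s->in_tx = !is_commit;
	toy_try_yield(s);
}

/* Replay n_txns transactions of txn_len rows each, return the yield count. */
static size_t
toy_replay(size_t n_txns, size_t txn_len)
{
	struct toy_stream s = {0, false, false, 0};
	for (size_t t = 0; t < n_txns; t++) {
		for (size_t r = 0; r < txn_len; r++)
			toy_feed_row(&s, r + 1 == txn_len);
	}
	return s.yields;
}
```

With the toy interval of 4, `toy_replay(4, 3)` yields after rows 6, 9 and 12: the yields owed at rows 4 and 8 fall inside open transactions and are paid at the next commit.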

-- 
Serge Petrenko



* Re: [Tarantool-patches] [PATCH 3/3] box: remove is_local_recovery variable
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 3/3] box: remove is_local_recovery variable Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-02 11:47   ` Serge Petrenko via Tarantool-patches
  2021-04-03 13:18     ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 14+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-04-02 11:47 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov, korablev

Hi! Thanks for the patch!

02.04.2021 01:23, Vladislav Shpilevoy wrote:
> It was used so as to recover synchronous auto-commit transactions
> in an async way (not blocking the fiber). But it became not
> necessary since #5874 was fixed. Because recovery does not use
> auto-commit transactions anymore.
>
> Closes #5194
> ---
>   src/box/box.cc | 12 +-----------
>   1 file changed, 1 insertion(+), 11 deletions(-)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index f70a2bd0e..8eacbfebb 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -125,8 +125,6 @@ static struct gc_checkpoint_ref backup_gc;
>   static bool is_box_configured = false;
>   static bool is_ro = true;
>   static fiber_cond ro_cond;
> -/** Set to true during recovery from local files. */
> -static bool is_local_recovery = false;
>   
>   /**
>    * The following flag is set if the instance failed to
> @@ -241,7 +239,6 @@ box_process_rw(struct request *request, struct space *space,
>   		goto rollback;
>   
>   	if (is_autocommit) {
> -		int res = 0;
>   		/*
>   		 * During local recovery the commit procedure
>   		 * should be async, otherwise the only fiber
> @@ -253,12 +250,7 @@ box_process_rw(struct request *request, struct space *space,
>   		 * all during local recovery, since journal_write
>   		 * is faked at this stage and returns immediately.
>   		 */

The comment above doesn't belong here anymore.
I propose you move it to `wal_stream_apply_dml_row`, where
`txn_commit_try_async()` is called now.

LGTM otherwise.

> -		if (is_local_recovery) {
> -			res = txn_commit_try_async(txn);
> -		} else {
> -			res = txn_commit(txn);
> -		}
> -		if (res < 0)
> +		if (txn_commit(txn) < 0)
>   			goto error;
>   	        fiber_gc();
>   	}
> @@ -3037,7 +3029,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   	memtx = (struct memtx_engine *)engine_by_name("memtx");
>   	assert(memtx != NULL);
>   
> -	is_local_recovery = true;
>   	recovery_journal_create(&recovery->vclock);
>   
>   	/*
> @@ -3096,7 +3087,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   	}
>   	stream_guard.is_active = false;
>   	recovery_finalize(recovery);
> -	is_local_recovery = false;
>   
>   	/*
>   	 * We must enable WAL before finalizing engine recovery,

-- 
Serge Petrenko



* Re: [Tarantool-patches] [PATCH 2/3] recovery: make it transactional
  2021-04-01 22:23 ` [Tarantool-patches] [PATCH 2/3] recovery: make it transactional Vladislav Shpilevoy via Tarantool-patches
  2021-04-02 11:47   ` Serge Petrenko via Tarantool-patches
@ 2021-04-02 15:11   ` Cyrill Gorcunov via Tarantool-patches
  1 sibling, 0 replies; 14+ messages in thread
From: Cyrill Gorcunov via Tarantool-patches @ 2021-04-02 15:11 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

On Fri, Apr 02, 2021 at 12:23:43AM +0200, Vladislav Shpilevoy wrote:
> +
> +/**
> + * Yield once in a while, but not too often, mostly to allow signal handling to
> + * take place.
> + */
> +static void
> +wal_stream_try_yield(struct wal_stream *stream)
> +{
> +	bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);

Maybe it's worth moving this calculation below?

> +	if (wal_stream_has_tx(stream)) {
> +		/*
> +		 * Save the yield. Otherwise it would happen only on rows which
> +		 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> +		 * transaction, which is probably a very rare coincidence.
> +		 */
> +		stream->has_yield = true;
> +		return;
> +	}

Here -->

So it won't take place if not needed. Not a big deal though,
up to you.

> +	if (stream->has_yield)
> +		stream->has_yield = false;
> +	else if (!needs_yield)
> +		return;
> +	fiber_sleep(0);
> +}


* Re: [Tarantool-patches] [PATCH 2/3] recovery: make it transactional
  2021-04-02 11:47   ` Serge Petrenko via Tarantool-patches
@ 2021-04-03 13:18     ` Vladislav Shpilevoy via Tarantool-patches
  2021-04-05  8:36       ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 14+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-04-03 13:18 UTC (permalink / raw)
  To: Serge Petrenko, tarantool-patches, gorcunov, korablev

Hi! Thanks for the review!

> I ran the tests locally and replication suite fails occasionally with
> ```
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:539 E> error at request: {type: 'INSERT', replica_id: 0, lsn: 7, space_id: 517, index_id: 0, tuple: [21]}
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:481 E> XlogError: found a first row in a transaction with LSN/TSN mismatch
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
> ```

Thanks for noticing! There was a bug with transactions starting
from a local row: its LSN is taken from vclock[0], while for global
rows it is taken from vclock[instance_id > 0].

I fixed it by checking LSN == TSN only for the first global row.
If all rows are local, I check that the TSN matches the first row's LSN.

Here is the diff.

====================
diff --git a/src/box/box.cc b/src/box/box.cc
index f70a2bd0e..67b44c053 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -338,12 +338,22 @@ struct wal_stream {
 	struct xstream base;
 	/** Current transaction ID. 0 when no transaction. */
 	int64_t tsn;
+	/**
+	 * LSN of the first row saved to check TSN and LSN match in case all
+	 * rows of the tx appeared to be local.
+	 */
+	int64_t first_row_lsn;
 	/**
 	 * Flag whether there is a pending yield to do when the current
 	 * transaction is finished. It can't always be done right away because
 	 * would abort the current transaction if it is memtx.
 	 */
 	bool has_yield;
+	/**
+	 * True if any row in the transaction was global. Saved to check if TSN
+	 * matches LSN of a first global row.
+	 */
+	bool has_global_row;
 	/** How many rows have been recovered so far. */
 	size_t rows;
 };
@@ -484,12 +494,9 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 			diag_set(XlogError, "found a row without TSN");
 			goto end_diag_request;
 		}
-		if (row->tsn != row->lsn) {
-			diag_set(XlogError, "found a first row in a "
-				 "transaction with LSN/TSN mismatch");
-			goto end_diag_request;
-		}
 		stream->tsn = row->tsn;
+		stream->first_row_lsn = row->lsn;
+		stream->has_global_row = false;
 		/*
 		 * Rows are not stacked into a list like during replication,
 		 * because recovery does not yield while reading the rows. All
@@ -510,6 +517,15 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 	} else {
 		txn = in_txn();
 	}
+	/* Ensure TSN is equal to LSN of the first global row. */
+	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
+		if (row->tsn != row->lsn) {
+			diag_set(XlogError, "found a first global row in a "
+				 "transaction with LSN/TSN mismatch");
+			goto end_diag_request;
+		}
+		stream->has_global_row = true;
+	}
 	assert(wal_stream_has_tx(stream));
 	/* Nops might appear at least after before_replace skipping rows. */
 	if (request.type != IPROTO_NOP) {
@@ -526,8 +542,26 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 	assert(txn != NULL);
 	if (!row->is_commit)
 		return 0;
-
+	/*
+	 * For fully local transactions the TSN check won't work like for global
+	 * transactions, because it is not known if there are global rows until
+	 * commit arrives.
+	 */
+	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
+		diag_set(XlogError, "fully local transaction's TSN does not "
+			 "match LSN of the first row");
+		return -1;
+	}
 	stream->tsn = 0;
+	/*
+	 * During local recovery the commit procedure should be async, otherwise
+	 * the only fiber processing recovery will get stuck on the first
+	 * synchronous tx it meets until confirm timeout is reached and the tx
+	 * is rolled back, yielding an error.
+	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
+	 * recovery, since journal_write is faked at this stage and returns
+	 * immediately.
+	 */
 	if (txn_commit_try_async(txn) != 0) {
 		/* Commit fail automatically leads to rollback. */
 		assert(in_txn() == NULL);
@@ -555,20 +589,15 @@ end_diag_request:
 static void
 wal_stream_try_yield(struct wal_stream *stream)
 {
-	bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);
-	if (wal_stream_has_tx(stream)) {
-		/*
-		 * Save the yield. Otherwise it would happen only on rows which
-		 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
-		 * transaction, which is probably a very rare coincidence.
-		 */
-		stream->has_yield = true;
-		return;
-	}
-	if (stream->has_yield)
-		stream->has_yield = false;
-	else if (!needs_yield)
+	/*
+	 * Save the yield. Otherwise it would happen only on rows which
+	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
+	 * transaction, which is probably a very rare coincidence.
+	 */
+	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
+	if (wal_stream_has_tx(stream) || !stream->has_yield)
 		return;
+	stream->has_yield = false;
 	fiber_sleep(0);
 }
 
@@ -605,7 +634,9 @@ wal_stream_create(struct wal_stream *ctx)
 {
 	xstream_create(&ctx->base, wal_stream_apply_row);
 	ctx->tsn = 0;
+	ctx->first_row_lsn = 0;
 	ctx->has_yield = false;
+	ctx->has_global_row = false;
 	ctx->rows = 0;
 }
 
diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
index a64eadd9c..73f903ca7 100644
--- a/test/replication/gh-5874-qsync-txn-recovery.result
+++ b/test/replication/gh-5874-qsync-txn-recovery.result
@@ -95,6 +95,31 @@ box.commit()
  | ---
  | ...
 
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+ | ---
+ | ...
+_ = loc:create_index('pk')
+ | ---
+ | ...
+box.begin()                                                                     \
+loc:replace{1}                                                                  \
+async:replace{9}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- All local.
+box.begin()                                                                     \
+loc:replace{2}                                                                  \
+loc:replace{3}                                                                  \
+box.commit()
+ | ---
+ | ...
+
 test_run:cmd('restart server default')
  | 
 async = box.space.async
@@ -103,12 +128,16 @@ async = box.space.async
 sync = box.space.sync
  | ---
  | ...
+loc = box.space.loc
+ | ---
+ | ...
 async:select()
  | ---
  | - - [1]
  |   - [3]
  |   - [4]
  |   - [6]
+ |   - [9]
  | ...
 sync:select()
  | ---
@@ -116,9 +145,18 @@ sync:select()
  |   - [2]
  |   - [3]
  | ...
+loc:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
 async:drop()
  | ---
  | ...
 sync:drop()
  | ---
  | ...
+loc:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
index efcf727cc..f35eb68de 100644
--- a/test/replication/gh-5874-qsync-txn-recovery.test.lua
+++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
@@ -55,10 +55,30 @@ async:replace{8}
 do_skip = false                                                                 \
 box.commit()
 
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+_ = loc:create_index('pk')
+box.begin()                                                                     \
+loc:replace{1}                                                                  \
+async:replace{9}                                                                \
+box.commit()
+
+-- All local.
+box.begin()                                                                     \
+loc:replace{2}                                                                  \
+loc:replace{3}                                                                  \
+box.commit()
+
 test_run:cmd('restart server default')
 async = box.space.async
 sync = box.space.sync
+loc = box.space.loc
 async:select()
 sync:select()
+loc:select()
 async:drop()
 sync:drop()
+loc:drop()

====================
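
The TSN/LSN validation from the diff above can also be exercised standalone. This is a sketch under assumptions, not the real code: `toy_row`, `toy_check`, `toy_check_row` and `toy_check_txn` are hypothetical names, `is_local` stands in for `group_id == GROUP_LOCAL`, and errors are reduced to a `-1` return instead of a diag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy row with only the fields the check reads. */
struct toy_row {
	int64_t lsn;
	int64_t tsn;
	bool is_local; /* stands in for group_id == GROUP_LOCAL */
	bool is_commit;
};

/* Toy stand-in for the stream state added in the diff. */
struct toy_check {
	int64_t tsn; /* 0 when no transaction is open */
	int64_t first_row_lsn;
	bool has_global_row;
};

/* Return 0 if the row is acceptable, -1 on a TSN/LSN inconsistency. */
static int
toy_check_row(struct toy_check *c, const struct toy_row *row)
{
	if (c->tsn == 0) {
		if (row->tsn == 0)
			return -1;
		c->tsn = row->tsn;
		c->first_row_lsn = row->lsn;
		c->has_global_row = false;
	} else if (row->tsn != c->tsn) {
		return -1;
	}
	/* TSN must equal the LSN of the first global row. */
	if (!c->has_global_row && !row->is_local) {
		if (row->tsn != row->lsn)
			return -1;
		c->has_global_row = true;
	}
	if (!row->is_commit)
		return 0;
	/* Fully local transaction: TSN must equal the first row's LSN. */
	if (!c->has_global_row && c->tsn != c->first_row_lsn)
		return -1;
	c->tsn = 0;
	return 0;
}

/* Check all rows of one transaction; stop at the first error. */
static int
toy_check_txn(const struct toy_row *rows, int count)
{
	struct toy_check c = {0, 0, false};
	for (int i = 0; i < count; i++) {
		if (toy_check_row(&c, &rows[i]) != 0)
			return -1;
	}
	return 0;
}
```

A transaction whose first row is local with LSN 5 and whose second row is global with LSN 7 passes when both rows carry TSN 7, which is the case the old first-row check rejected.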

> Consider this diff. It looks simpler IMO, but feel free to ignore.

Applied your diff, see above.

The full patch:

====================
diff --git a/changelogs/unreleased/qsync-multi-statement-recovery b/changelogs/unreleased/qsync-multi-statement-recovery
new file mode 100644
index 000000000..c902cbe24
--- /dev/null
+++ b/changelogs/unreleased/qsync-multi-statement-recovery
@@ -0,0 +1,5 @@
+## bugfix/replication
+
+* Fix recovery of a rolled back multi-statement synchronous transaction which
+  could lead to the transaction being applied partially, and to recovery errors.
+  It happened in case the transaction worked with non-sync spaces (gh-5874).
diff --git a/src/box/box.cc b/src/box/box.cc
index 4da274976..67b44c053 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -334,7 +334,26 @@ box_set_orphan(bool orphan)
 }
 
 struct wal_stream {
+	/** Base class. */
 	struct xstream base;
+	/** Current transaction ID. 0 when no transaction. */
+	int64_t tsn;
+	/**
+	 * LSN of the first row saved to check TSN and LSN match in case all
+	 * rows of the tx appeared to be local.
+	 */
+	int64_t first_row_lsn;
+	/**
+	 * Flag whether there is a pending yield to do when the current
+	 * transaction is finished. It can't always be done right away because
+	 * would abort the current transaction if it is memtx.
+	 */
+	bool has_yield;
+	/**
+	 * True if any row in the transaction was global. Saved to check if TSN
+	 * matches LSN of a first global row.
+	 */
+	bool has_global_row;
 	/** How many rows have been recovered so far. */
 	size_t rows;
 };
@@ -379,47 +398,245 @@ recovery_journal_create(struct vclock *v)
 	journal_set(&journal.base);
 }
 
+/**
+ * Drop the stream to the initial state. It is supposed to be done when an error
+ * happens. Because in case of force recovery the stream will continue getting
+ * tuples. For that it must stay in a valid state and must handle them somehow.
+ *
+ * Now the stream simply drops the current transaction like it never happened,
+ * even if its commit-row wasn't met yet. Should be good enough for
+ * force-recovery when the consistency is already out of the game.
+ */
 static void
-apply_wal_row(struct xstream *stream, struct xrow_header *row)
+wal_stream_abort(struct wal_stream *stream)
+{
+	struct txn *tx = in_txn();
+	if (tx != NULL)
+		txn_rollback(tx);
+	stream->tsn = 0;
+	fiber_gc();
+}
+
+/**
+ * The wrapper exists only for the debug purposes, to ensure tsn being non-0 is
+ * in sync with the fiber's txn being non-NULL. It has nothing to do with the
+ * journal content, and therefore can use assertions instead of rigorous error
+ * checking even in release.
+ */
+static bool
+wal_stream_has_tx(const struct wal_stream *stream)
+{
+	bool has = stream->tsn != 0;
+	assert(has == (in_txn() != NULL));
+	return has;
+}
+
+static int
+wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+	if (wal_stream_has_tx(stream)) {
+		diag_set(XlogError, "found synchro request in a transaction");
+		return -1;
+	}
+	struct synchro_request syn_req;
+	if (xrow_decode_synchro(row, &syn_req) != 0) {
+		say_error("couldn't decode a synchro request");
+		return -1;
+	}
+	txn_limbo_process(&txn_limbo, &syn_req);
+	return 0;
+}
+
+static int
+wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+	if (wal_stream_has_tx(stream)) {
+		diag_set(XlogError, "found raft request in a transaction");
+		return -1;
+	}
+	struct raft_request raft_req;
+	/* Vclock is never persisted in WAL by Raft. */
+	if (xrow_decode_raft(row, &raft_req, NULL) != 0) {
+		say_error("couldn't decode a raft request");
+		return -1;
+	}
+	box_raft_recover(&raft_req);
+	return 0;
+}
+
+/**
+ * Rows of the same transaction are wrapped into begin/commit. Mostly for the
+ * sake of synchronous replication, when the log can contain rolled back
+ * transactions, which must be entirely reverted during recovery when ROLLBACK
+ * records are met. Row-by-row recovery wouldn't work for multi-statement
+ * synchronous transactions.
+ */
+static int
+wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 {
 	struct request request;
-	if (iproto_type_is_synchro_request(row->type)) {
-		struct synchro_request syn_req;
-		if (xrow_decode_synchro(row, &syn_req) != 0)
-			diag_raise();
-		txn_limbo_process(&txn_limbo, &syn_req);
-		return;
+	uint64_t req_type = dml_request_key_map(row->type);
+	if (xrow_decode_dml(row, &request, req_type) != 0) {
+		say_error("couldn't decode a DML request");
+		return -1;
 	}
-	if (iproto_type_is_raft_request(row->type)) {
-		struct raft_request raft_req;
-		/* Vclock is never persisted in WAL by Raft. */
-		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
-			diag_raise();
-		box_raft_recover(&raft_req);
-		return;
+	/*
+	 * Note that all the information which came from the log is validated
+	 * and the errors are handled. Not asserted or paniced. That is for the
+	 * sake of force recovery, which must be able to recover just everything
+	 * what possible instead of terminating the instance.
+	 */
+	struct txn *txn;
+	if (stream->tsn == 0) {
+		if (row->tsn == 0) {
+			diag_set(XlogError, "found a row without TSN");
+			goto end_diag_request;
+		}
+		stream->tsn = row->tsn;
+		stream->first_row_lsn = row->lsn;
+		stream->has_global_row = false;
+		/*
+		 * Rows are not stacked into a list like during replication,
+		 * because recovery does not yield while reading the rows. All
+		 * the yields are controlled by the stream, and therefore no
+		 * need to wait for all the rows to start a transaction. Can
+		 * start now, apply the rows, and make a yield after commit if
+		 * necessary. Helps to avoid a lot of copying.
+		 */
+		txn = txn_begin();
+		if (txn == NULL) {
+			say_error("couldn't begin a recovery transaction");
+			return -1;
+		}
+	} else if (row->tsn != stream->tsn) {
+		diag_set(XlogError, "found a next transaction with the "
+			 "previous one not yet committed");
+		goto end_diag_request;
+	} else {
+		txn = in_txn();
+	}
+	/* Ensure TSN is equal to LSN of the first global row. */
+	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
+		if (row->tsn != row->lsn) {
+			diag_set(XlogError, "found a first global row in a "
+				 "transaction with LSN/TSN mismatch");
+			goto end_diag_request;
+		}
+		stream->has_global_row = true;
 	}
-	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+	assert(wal_stream_has_tx(stream));
+	/* Nops might appear at least after before_replace skipping rows. */
 	if (request.type != IPROTO_NOP) {
-		struct space *space = space_cache_find_xc(request.space_id);
+		struct space *space = space_cache_find(request.space_id);
+		if (space == NULL) {
+			say_error("couldn't find space by ID");
+			goto end_diag_request;
+		}
 		if (box_process_rw(&request, space, NULL) != 0) {
-			say_error("error applying row: %s", request_str(&request));
-			diag_raise();
+			say_error("couldn't apply the request");
+			goto end_diag_request;
 		}
 	}
-	struct wal_stream *xstream =
-		container_of(stream, struct wal_stream, base);
-	/**
-	 * Yield once in a while, but not too often,
-	 * mostly to allow signal handling to take place.
+	assert(txn != NULL);
+	if (!row->is_commit)
+		return 0;
+	/*
+	 * For fully local transactions the TSN check won't work like for global
+	 * transactions, because it is not known if there are global rows until
+	 * commit arrives.
 	 */
-	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
-		fiber_sleep(0);
+	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
+		diag_set(XlogError, "fully local transaction's TSN does not "
+			 "match LSN of the first row");
+		return -1;
+	}
+	stream->tsn = 0;
+	/*
+	 * During local recovery the commit procedure should be async, otherwise
+	 * the only fiber processing recovery will get stuck on the first
+	 * synchronous tx it meets until confirm timeout is reached and the tx
+	 * is rolled back, yielding an error.
+	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
+	 * recovery, since journal_write is faked at this stage and returns
+	 * immediately.
+	 */
+	if (txn_commit_try_async(txn) != 0) {
+		/* Commit fail automatically leads to rollback. */
+		assert(in_txn() == NULL);
+		say_error("couldn't commit a recovery transaction");
+		return -1;
+	}
+	assert(in_txn() == NULL);
+	fiber_gc();
+	return 0;
+
+end_diag_request:
+	/*
+	 * The label must be used only for the errors related directly to the
+	 * request. Errors like txn_begin() fail has nothing to do with it, and
+	 * therefore don't log the request as the fault reason.
+	 */
+	say_error("error at request: %s", request_str(&request));
+	return -1;
+}
+
+/**
+ * Yield once in a while, but not too often, mostly to allow signal handling to
+ * take place.
+ */
+static void
+wal_stream_try_yield(struct wal_stream *stream)
+{
+	/*
+	 * Save the yield. Otherwise it would happen only on rows which
+	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
+	 * transaction, which is probably a very rare coincidence.
+	 */
+	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
+	if (wal_stream_has_tx(stream) || !stream->has_yield)
+		return;
+	stream->has_yield = false;
+	fiber_sleep(0);
+}
+
+static void
+wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
+{
+	struct wal_stream *stream =
+		container_of(base, struct wal_stream, base);
+	/*
+	 * Account all rows, even non-DML, and even leading to an error. Because
+	 * still need to yield sometimes.
+	 */
+	++stream->rows;
+	if (iproto_type_is_synchro_request(row->type)) {
+		if (wal_stream_apply_synchro_row(stream, row) != 0)
+			goto end_error;
+	} else if (iproto_type_is_raft_request(row->type)) {
+		if (wal_stream_apply_raft_row(stream, row) != 0)
+			goto end_error;
+	} else if (wal_stream_apply_dml_row(stream, row) != 0) {
+		goto end_error;
+	}
+	wal_stream_try_yield(stream);
+	return;
+
+end_error:
+	wal_stream_abort(stream);
+	wal_stream_try_yield(stream);
+	diag_raise();
 }
 
 static void
 wal_stream_create(struct wal_stream *ctx)
 {
-	xstream_create(&ctx->base, apply_wal_row);
+	xstream_create(&ctx->base, wal_stream_apply_row);
+	ctx->tsn = 0;
+	ctx->first_row_lsn = 0;
+	ctx->has_yield = false;
+	ctx->has_global_row = false;
 	ctx->rows = 0;
 }
 
@@ -2797,9 +3014,13 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
+	auto stream_guard = make_scoped_guard([&]{
+		wal_stream_abort(&wal_stream);
+	});
 
 	struct recovery *recovery;
-	recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
+	bool is_force_recovery = cfg_geti("force_recovery");
+	recovery = recovery_new(wal_dir(), is_force_recovery,
 				checkpoint_vclock);
 
 	/*
@@ -2861,6 +3082,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	engine_begin_final_recovery_xc();
 	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
+	if (wal_stream_has_tx(&wal_stream)) {
+		wal_stream_abort(&wal_stream);
+		diag_set(XlogError, "found a not finished transaction "
+			 "in the log");
+		if (!is_force_recovery)
+			diag_raise();
+		diag_log();
+	}
 	engine_end_recovery_xc();
 	/*
 	 * Leave hot standby mode, if any, only after
@@ -2880,6 +3109,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		}
 		recovery_stop_local(recovery);
 		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+		if (wal_stream_has_tx(&wal_stream)) {
+			wal_stream_abort(&wal_stream);
+			diag_set(XlogError, "found a not finished transaction "
+				 "in the log in hot standby mode");
+			if (!is_force_recovery)
+				diag_raise();
+			diag_log();
+		}
 		/*
 		 * Advance replica set vclock to reflect records
 		 * applied in hot standby mode.
@@ -2888,6 +3125,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		box_listen();
 		box_sync_replication(false);
 	}
+	stream_guard.is_active = false;
 	recovery_finalize(recovery);
 	is_local_recovery = false;
 
diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
new file mode 100644
index 000000000..73f903ca7
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.result
@@ -0,0 +1,162 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+ | ---
+ | ...
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+async = box.schema.create_space('async', {engine = engine})
+ | ---
+ | ...
+_ = async:create_index('pk')
+ | ---
+ | ...
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+ | ---
+ | ...
+_ = sync:create_index('pk')
+ | ---
+ | ...
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+ | ---
+ | - error: Quorum collection for a synchronous transaction is timed out
+ | ...
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+_ = sync:insert{1}
+ | ---
+ | ...
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+ | ---
+ | ...
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+ | ---
+ | ...
+_ = async:before_replace(function()                                             \
+    if do_skip then                                                             \
+        return nil                                                              \
+    end                                                                         \
+end)
+ | ---
+ | ...
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{2}                                                                \
+do_skip = false                                                                 \
+async:replace{3}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- NOP in the middle.
+box.begin()                                                                     \
+async:replace{4}                                                                \
+do_skip = true                                                                  \
+async:replace{5}                                                                \
+do_skip = false                                                                 \
+async:replace{6}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- All NOP.
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{7}                                                                \
+async:replace{8}                                                                \
+do_skip = false                                                                 \
+box.commit()
+ | ---
+ | ...
+
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+ | ---
+ | ...
+_ = loc:create_index('pk')
+ | ---
+ | ...
+box.begin()                                                                     \
+loc:replace{1}                                                                  \
+async:replace{9}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- All local.
+box.begin()                                                                     \
+loc:replace{2}                                                                  \
+loc:replace{3}                                                                  \
+box.commit()
+ | ---
+ | ...
+
+test_run:cmd('restart server default')
+ | 
+async = box.space.async
+ | ---
+ | ...
+sync = box.space.sync
+ | ---
+ | ...
+loc = box.space.loc
+ | ---
+ | ...
+async:select()
+ | ---
+ | - - [1]
+ |   - [3]
+ |   - [4]
+ |   - [6]
+ |   - [9]
+ | ...
+sync:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+loc:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+async:drop()
+ | ---
+ | ...
+sync:drop()
+ | ---
+ | ...
+loc:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
new file mode 100644
index 000000000..f35eb68de
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
@@ -0,0 +1,84 @@
+test_run = require('test_run').new()
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+engine = test_run:get_cfg('engine')
+async = box.schema.create_space('async', {engine = engine})
+_ = async:create_index('pk')
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+_ = sync:create_index('pk')
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+_ = sync:insert{1}
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+_ = async:before_replace(function()                                             \
+    if do_skip then                                                             \
+        return nil                                                              \
+    end                                                                         \
+end)
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{2}                                                                \
+do_skip = false                                                                 \
+async:replace{3}                                                                \
+box.commit()
+
+-- NOP in the middle.
+box.begin()                                                                     \
+async:replace{4}                                                                \
+do_skip = true                                                                  \
+async:replace{5}                                                                \
+do_skip = false                                                                 \
+async:replace{6}                                                                \
+box.commit()
+
+-- All NOP.
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{7}                                                                \
+async:replace{8}                                                                \
+do_skip = false                                                                 \
+box.commit()
+
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+_ = loc:create_index('pk')
+box.begin()                                                                     \
+loc:replace{1}                                                                  \
+async:replace{9}                                                                \
+box.commit()
+
+-- All local.
+box.begin()                                                                     \
+loc:replace{2}                                                                  \
+loc:replace{3}                                                                  \
+box.commit()
+
+test_run:cmd('restart server default')
+async = box.space.async
+sync = box.space.sync
+loc = box.space.loc
+async:select()
+sync:select()
+loc:select()
+async:drop()
+sync:drop()
+loc:drop()

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Tarantool-patches] [PATCH 3/3] box: remove is_local_recovery variable
  2021-04-02 11:47   ` Serge Petrenko via Tarantool-patches
@ 2021-04-03 13:18     ` Vladislav Shpilevoy via Tarantool-patches
  2021-04-05  8:17       ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 14+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-04-03 13:18 UTC (permalink / raw)
  To: Serge Petrenko, tarantool-patches, gorcunov, korablev

Thanks for the review!

>> @@ -253,12 +250,7 @@ box_process_rw(struct request *request, struct space *space,
>>            * all during local recovery, since journal_write
>>            * is faked at this stage and returns immediately.
>>            */
> 
> The comment above doesn't belong here anymore.
> I propose you move it to `wal_stream_apply_dml_row`, where txn_commit_try_async()
> is called now.

Yeah, I totally forgot about this comment. Moved it to the
txn_commit_try_async() call in recovery, in the previous commit.

The full patch:

====================
diff --git a/src/box/box.cc b/src/box/box.cc
index 67b44c053..fa8a254f9 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -125,8 +125,6 @@ static struct gc_checkpoint_ref backup_gc;
 static bool is_box_configured = false;
 static bool is_ro = true;
 static fiber_cond ro_cond;
-/** Set to true during recovery from local files. */
-static bool is_local_recovery = false;
 
 /**
  * The following flag is set if the instance failed to
@@ -241,24 +239,7 @@ box_process_rw(struct request *request, struct space *space,
 		goto rollback;
 
 	if (is_autocommit) {
-		int res = 0;
-		/*
-		 * During local recovery the commit procedure
-		 * should be async, otherwise the only fiber
-		 * processing recovery will get stuck on the first
-		 * synchronous tx it meets until confirm timeout
-		 * is reached and the tx is rolled back, yielding
-		 * an error.
-		 * Moreover, txn_commit_try_async() doesn't hurt at
-		 * all during local recovery, since journal_write
-		 * is faked at this stage and returns immediately.
-		 */
-		if (is_local_recovery) {
-			res = txn_commit_try_async(txn);
-		} else {
-			res = txn_commit(txn);
-		}
-		if (res < 0)
+		if (txn_commit(txn) < 0)
 			goto error;
 	        fiber_gc();
 	}
@@ -3068,7 +3049,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	memtx = (struct memtx_engine *)engine_by_name("memtx");
 	assert(memtx != NULL);
 
-	is_local_recovery = true;
 	recovery_journal_create(&recovery->vclock);
 
 	/*
@@ -3127,7 +3107,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	}
 	stream_guard.is_active = false;
 	recovery_finalize(recovery);
-	is_local_recovery = false;
 
 	/*
 	 * We must enable WAL before finalizing engine recovery,

^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Tarantool-patches] [PATCH 3/3] box: remove is_local_recovery variable
  2021-04-03 13:18     ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-05  8:17       ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 14+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-04-05  8:17 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov, korablev



03.04.2021 16:18, Vladislav Shpilevoy wrote:
> Thanks for the review!
>
>>> @@ -253,12 +250,7 @@ box_process_rw(struct request *request, struct space *space,
>>>             * all during local recovery, since journal_write
>>>             * is faked at this stage and returns immediately.
>>>             */
>> The comment above doesn't belong here anymore.
>> I propose you move it to `wal_stream_apply_dml_row`, where txn_commit_try_async()
>> is called now.
> Yeah, I totally forgot about this comment. Moved it to the
> txn_commit_try_async() call in recovery, in the previous commit.

Thanks for the fixes! LGTM.
> The full patch:
>
> ====================
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 67b44c053..fa8a254f9 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -125,8 +125,6 @@ static struct gc_checkpoint_ref backup_gc;
>   static bool is_box_configured = false;
>   static bool is_ro = true;
>   static fiber_cond ro_cond;
> -/** Set to true during recovery from local files. */
> -static bool is_local_recovery = false;
>   
>   /**
>    * The following flag is set if the instance failed to
> @@ -241,24 +239,7 @@ box_process_rw(struct request *request, struct space *space,
>   		goto rollback;
>   
>   	if (is_autocommit) {
> -		int res = 0;
> -		/*
> -		 * During local recovery the commit procedure
> -		 * should be async, otherwise the only fiber
> -		 * processing recovery will get stuck on the first
> -		 * synchronous tx it meets until confirm timeout
> -		 * is reached and the tx is rolled back, yielding
> -		 * an error.
> -		 * Moreover, txn_commit_try_async() doesn't hurt at
> -		 * all during local recovery, since journal_write
> -		 * is faked at this stage and returns immediately.
> -		 */
> -		if (is_local_recovery) {
> -			res = txn_commit_try_async(txn);
> -		} else {
> -			res = txn_commit(txn);
> -		}
> -		if (res < 0)
> +		if (txn_commit(txn) < 0)
>   			goto error;
>   	        fiber_gc();
>   	}
> @@ -3068,7 +3049,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   	memtx = (struct memtx_engine *)engine_by_name("memtx");
>   	assert(memtx != NULL);
>   
> -	is_local_recovery = true;
>   	recovery_journal_create(&recovery->vclock);
>   
>   	/*
> @@ -3127,7 +3107,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   	}
>   	stream_guard.is_active = false;
>   	recovery_finalize(recovery);
> -	is_local_recovery = false;
>   
>   	/*
>   	 * We must enable WAL before finalizing engine recovery,

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 14+ messages in thread

* Re: [Tarantool-patches] [PATCH 2/3] recovery: make it transactional
  2021-04-03 13:18     ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-04-05  8:36       ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 14+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-04-05  8:36 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov, korablev



03.04.2021 16:18, Vladislav Shpilevoy wrote:
> Hi! Thanks for the review!
>
>> I ran the tests locally and replication suite fails occasionally with
>> ```
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:539 E> error at request: {type: 'INSERT', replica_id: 0, lsn: 7, space_id: 517, index_id: 0, tuple: [21]}
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:481 E> XlogError: found a first row in a transaction with LSN/TSN mismatch
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
>> ```
> Thanks for noticing! There was a bug with transactions starting
> from a local row - its LSN is from vclock[0] while for global
> rows it is from vclock[instance_id > 0].

Oh, I see.

>
> I fixed it by checking LSN == TSN only for the first global row.
> If all rows are local, I check LSN matches the first row's LSN.
>
> Here is the diff.

Thanks! LGTM.
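
For readers following the thread, the rule described above (TSN must equal
the LSN of the first global row, or the LSN of the first row if the whole
transaction is local) can be modeled in isolation. This is only a minimal
sketch under assumptions: `demo_row`, `demo_stream`, `demo_check_row()` and
`demo_check_rows()` are hypothetical names, `is_local` stands in for
`group_id == GROUP_LOCAL`, and nothing is actually applied or rolled back.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for struct xrow_header. */
struct demo_row {
	int64_t tsn;
	int64_t lsn;
	bool is_local;	/* Models group_id == GROUP_LOCAL. */
	bool is_commit;
};

/* Hypothetical, simplified stand-in for struct wal_stream. */
struct demo_stream {
	int64_t tsn;		/* 0 when no transaction is open. */
	int64_t first_row_lsn;
	bool has_global_row;
};

/* Return 0 if the row passes the TSN/LSN checks, -1 otherwise. */
static int
demo_check_row(struct demo_stream *s, const struct demo_row *row)
{
	if (s->tsn == 0) {
		if (row->tsn == 0)
			return -1;
		/* First row of a new transaction. */
		s->tsn = row->tsn;
		s->first_row_lsn = row->lsn;
		s->has_global_row = false;
	} else if (row->tsn != s->tsn) {
		/* A new transaction began before the old one committed. */
		return -1;
	}
	assert(s->tsn != 0);
	/* TSN must match the LSN of the first global row only. */
	if (!s->has_global_row && !row->is_local) {
		if (row->tsn != row->lsn)
			return -1;
		s->has_global_row = true;
	}
	if (!row->is_commit)
		return 0;
	/* Fully local transaction: compare with the first row's LSN. */
	if (!s->has_global_row && s->tsn != s->first_row_lsn)
		return -1;
	s->tsn = 0;
	return 0;
}

/* Check a whole log; an unfinished transaction at the end is an error. */
static int
demo_check_rows(const struct demo_row *rows, int count)
{
	struct demo_stream s = {0, 0, false};
	for (int i = 0; i < count; i++) {
		if (demo_check_row(&s, &rows[i]) != 0)
			return -1;
	}
	return s.tsn == 0 ? 0 : -1;
}
```

In this model a transaction whose first row is local with LSN 3 and whose
second row is global with LSN 5 and TSN 5 passes, which is the case the
original `row->tsn != row->lsn` check on the first row rejected.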

>
> ====================
> diff --git a/src/box/box.cc b/src/box/box.cc
> index f70a2bd0e..67b44c053 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -338,12 +338,22 @@ struct wal_stream {
>   	struct xstream base;
>   	/** Current transaction ID. 0 when no transaction. */
>   	int64_t tsn;
> +	/**
> +	 * LSN of the first row saved to check TSN and LSN match in case all
> +	 * rows of the tx appeared to be local.
> +	 */
> +	int64_t first_row_lsn;
>   	/**
>   	 * Flag whether there is a pending yield to do when the current
>   	 * transaction is finished. It can't always be done right away because
>   	 * would abort the current transaction if it is memtx.
>   	 */
>   	bool has_yield;
> +	/**
> +	 * True if any row in the transaction was global. Saved to check if TSN
> +	 * matches LSN of a first global row.
> +	 */
> +	bool has_global_row;
>   	/** How many rows have been recovered so far. */
>   	size_t rows;
>   };
> @@ -484,12 +494,9 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>   			diag_set(XlogError, "found a row without TSN");
>   			goto end_diag_request;
>   		}
> -		if (row->tsn != row->lsn) {
> -			diag_set(XlogError, "found a first row in a "
> -				 "transaction with LSN/TSN mismatch");
> -			goto end_diag_request;
> -		}
>   		stream->tsn = row->tsn;
> +		stream->first_row_lsn = row->lsn;
> +		stream->has_global_row = false;
>   		/*
>   		 * Rows are not stacked into a list like during replication,
>   		 * because recovery does not yield while reading the rows. All
> @@ -510,6 +517,15 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>   	} else {
>   		txn = in_txn();
>   	}
> +	/* Ensure TSN is equal to LSN of the first global row. */
> +	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
> +		if (row->tsn != row->lsn) {
> +			diag_set(XlogError, "found a first global row in a "
> +				 "transaction with LSN/TSN mismatch");
> +			goto end_diag_request;
> +		}
> +		stream->has_global_row = true;
> +	}
>   	assert(wal_stream_has_tx(stream));
>   	/* Nops might appear at least after before_replace skipping rows. */
>   	if (request.type != IPROTO_NOP) {
> @@ -526,8 +542,26 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>   	assert(txn != NULL);
>   	if (!row->is_commit)
>   		return 0;
> -
> +	/*
> +	 * For fully local transactions the TSN check won't work like for global
> +	 * transactions, because it is not known if there are global rows until
> +	 * commit arrives.
> +	 */
> +	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
> +		diag_set(XlogError, "fully local transaction's TSN does not "
> +			 "match LSN of the first row");
> +		return -1;
> +	}
>   	stream->tsn = 0;
> +	/*
> +	 * During local recovery the commit procedure should be async, otherwise
> +	 * the only fiber processing recovery will get stuck on the first
> +	 * synchronous tx it meets until confirm timeout is reached and the tx
> +	 * is rolled back, yielding an error.
> +	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
> +	 * recovery, since journal_write is faked at this stage and returns
> +	 * immediately.
> +	 */
>   	if (txn_commit_try_async(txn) != 0) {
>   		/* Commit fail automatically leads to rollback. */
>   		assert(in_txn() == NULL);
> @@ -555,20 +589,15 @@ end_diag_request:
>   static void
>   wal_stream_try_yield(struct wal_stream *stream)
>   {
> -	bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);
> -	if (wal_stream_has_tx(stream)) {
> -		/*
> -		 * Save the yield. Otherwise it would happen only on rows which
> -		 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> -		 * transaction, which is probably a very rare coincidence.
> -		 */
> -		stream->has_yield = true;
> -		return;
> -	}
> -	if (stream->has_yield)
> -		stream->has_yield = false;
> -	else if (!needs_yield)
> +	/*
> +	 * Save the yield. Otherwise it would happen only on rows which
> +	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> +	 * transaction, which is probably a very rare coincidence.
> +	 */
> +	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
> +	if (wal_stream_has_tx(stream) || !stream->has_yield)
>   		return;
> +	stream->has_yield = false;
>   	fiber_sleep(0);
>   }
>   
> @@ -605,7 +634,9 @@ wal_stream_create(struct wal_stream *ctx)
>   {
>   	xstream_create(&ctx->base, wal_stream_apply_row);
>   	ctx->tsn = 0;
> +	ctx->first_row_lsn = 0;
>   	ctx->has_yield = false;
> +	ctx->has_global_row = false;
>   	ctx->rows = 0;
>   }
>   
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
> index a64eadd9c..73f903ca7 100644
> --- a/test/replication/gh-5874-qsync-txn-recovery.result
> +++ b/test/replication/gh-5874-qsync-txn-recovery.result
> @@ -95,6 +95,31 @@ box.commit()
>    | ---
>    | ...
>   
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> + | ---
> + | ...
> +_ = loc:create_index('pk')
> + | ---
> + | ...
> +box.begin()                                                                     \
> +loc:replace{1}                                                                  \
> +async:replace{9}                                                                \
> +box.commit()
> + | ---
> + | ...
> +
> +-- All local.
> +box.begin()                                                                     \
> +loc:replace{2}                                                                  \
> +loc:replace{3}                                                                  \
> +box.commit()
> + | ---
> + | ...
> +
>   test_run:cmd('restart server default')
>    |
>   async = box.space.async
> @@ -103,12 +128,16 @@ async = box.space.async
>   sync = box.space.sync
>    | ---
>    | ...
> +loc = box.space.loc
> + | ---
> + | ...
>   async:select()
>    | ---
>    | - - [1]
>    |   - [3]
>    |   - [4]
>    |   - [6]
> + |   - [9]
>    | ...
>   sync:select()
>    | ---
> @@ -116,9 +145,18 @@ sync:select()
>    |   - [2]
>    |   - [3]
>    | ...
> +loc:select()
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + | ...
>   async:drop()
>    | ---
>    | ...
>   sync:drop()
>    | ---
>    | ...
> +loc:drop()
> + | ---
> + | ...
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> index efcf727cc..f35eb68de 100644
> --- a/test/replication/gh-5874-qsync-txn-recovery.test.lua
> +++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> @@ -55,10 +55,30 @@ async:replace{8}
>   do_skip = false                                                                 \
>   box.commit()
>   
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> +_ = loc:create_index('pk')
> +box.begin()                                                                     \
> +loc:replace{1}                                                                  \
> +async:replace{9}                                                                \
> +box.commit()
> +
> +-- All local.
> +box.begin()                                                                     \
> +loc:replace{2}                                                                  \
> +loc:replace{3}                                                                  \
> +box.commit()
> +
>   test_run:cmd('restart server default')
>   async = box.space.async
>   sync = box.space.sync
> +loc = box.space.loc
>   async:select()
>   sync:select()
> +loc:select()
>   async:drop()
>   sync:drop()
> +loc:drop()
>
> ====================
>
>> Consider this diff. It looks simpler IMO, but feel free to ignore.
> Applied your diff, see above.
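
As an illustration of the deferred-yield bookkeeping in the
wal_stream_try_yield() hunk above, here is a minimal standalone model. It is
a sketch under assumptions, not the patch itself: `demo_yield_state`,
`demo_try_yield()`, `demo_on_row()` and `DEMO_ROWS_PER_YIELD` are hypothetical
names, the threshold value is arbitrary, and a counter replaces the real
`fiber_sleep(0)`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical threshold standing in for WAL_ROWS_PER_YIELD. */
enum { DEMO_ROWS_PER_YIELD = 4 };

struct demo_yield_state {
	size_t rows;	/* Rows seen so far. */
	bool has_yield;	/* A yield is pending. */
	bool in_tx;	/* A transaction is currently open. */
	size_t yields;	/* Counts yields instead of sleeping. */
};

static void
demo_try_yield(struct demo_yield_state *st)
{
	/* Remember the yield so it is not lost inside a transaction. */
	st->has_yield |= (st->rows % DEMO_ROWS_PER_YIELD == 0);
	if (st->in_tx || !st->has_yield)
		return;
	/* Yielding is only done between transactions. */
	assert(!st->in_tx);
	st->has_yield = false;
	++st->yields;
}

/* Account one row; a commit row closes the current transaction. */
static void
demo_on_row(struct demo_yield_state *st, bool is_commit)
{
	++st->rows;
	st->in_tx = !is_commit;
	demo_try_yield(st);
}
```

With a threshold of 4, a six-row transaction crosses the threshold on its
fourth row but yields only once, right after its commit row; the next yield
happens when a later commit row lands on a multiple of the threshold.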
>
> The full patch:
>
> ====================
> diff --git a/changelogs/unreleased/qsync-multi-statement-recovery b/changelogs/unreleased/qsync-multi-statement-recovery
> new file mode 100644
> index 000000000..c902cbe24
> --- /dev/null
> +++ b/changelogs/unreleased/qsync-multi-statement-recovery
> @@ -0,0 +1,5 @@
> +## bugfix/replication
> +
> +* Fix recovery of a rolled back multi-statement synchronous transaction which
> +  could lead to the transaction being applied partially, and to recovery errors.
> +  It happened in case the transaction worked with non-sync spaces (gh-5874).
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 4da274976..67b44c053 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -334,7 +334,26 @@ box_set_orphan(bool orphan)
>   }
>   
>   struct wal_stream {
> +	/** Base class. */
>   	struct xstream base;
> +	/** Current transaction ID. 0 when no transaction. */
> +	int64_t tsn;
> +	/**
> +	 * LSN of the first row saved to check TSN and LSN match in case all
> +	 * rows of the tx appeared to be local.
> +	 */
> +	int64_t first_row_lsn;
> +	/**
> +	 * Flag whether there is a pending yield to do when the current
> +	 * transaction is finished. It can't always be done right away because
> +	 * would abort the current transaction if it is memtx.
> +	 */
> +	bool has_yield;
> +	/**
> +	 * True if any row in the transaction was global. Saved to check if TSN
> +	 * matches LSN of a first global row.
> +	 */
> +	bool has_global_row;
>   	/** How many rows have been recovered so far. */
>   	size_t rows;
>   };
> @@ -379,47 +398,245 @@ recovery_journal_create(struct vclock *v)
>   	journal_set(&journal.base);
>   }
>   
> +/**
> + * Drop the stream to the initial state. It is supposed to be done when an error
> + * happens. Because in case of force recovery the stream will continue getting
> + * tuples. For that it must stay in a valid state and must handle them somehow.
> + *
> + * Now the stream simply drops the current transaction like it never happened,
> + * even if its commit-row wasn't met yet. Should be good enough for
> + * force-recovery when the consistency is already out of the game.
> + */
>   static void
> -apply_wal_row(struct xstream *stream, struct xrow_header *row)
> +wal_stream_abort(struct wal_stream *stream)
> +{
> +	struct txn *tx = in_txn();
> +	if (tx != NULL)
> +		txn_rollback(tx);
> +	stream->tsn = 0;
> +	fiber_gc();
> +}
> +
> +/**
> + * The wrapper exists only for the debug purposes, to ensure tsn being non-0 is
> + * in sync with the fiber's txn being non-NULL. It has nothing to do with the
> + * journal content, and therefore can use assertions instead of rigorous error
> + * checking even in release.
> + */
> +static bool
> +wal_stream_has_tx(const struct wal_stream *stream)
> +{
> +	bool has = stream->tsn != 0;
> +	assert(has == (in_txn() != NULL));
> +	return has;
> +}
> +
> +static int
> +wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
> +{
> +	assert(iproto_type_is_synchro_request(row->type));
> +	if (wal_stream_has_tx(stream)) {
> +		diag_set(XlogError, "found synchro request in a transaction");
> +		return -1;
> +	}
> +	struct synchro_request syn_req;
> +	if (xrow_decode_synchro(row, &syn_req) != 0) {
> +		say_error("couldn't decode a synchro request");
> +		return -1;
> +	}
> +	txn_limbo_process(&txn_limbo, &syn_req);
> +	return 0;
> +}
> +
> +static int
> +wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
> +{
> +	assert(iproto_type_is_raft_request(row->type));
> +	if (wal_stream_has_tx(stream)) {
> +		diag_set(XlogError, "found raft request in a transaction");
> +		return -1;
> +	}
> +	struct raft_request raft_req;
> +	/* Vclock is never persisted in WAL by Raft. */
> +	if (xrow_decode_raft(row, &raft_req, NULL) != 0) {
> +		say_error("couldn't decode a raft request");
> +		return -1;
> +	}
> +	box_raft_recover(&raft_req);
> +	return 0;
> +}
> +
> +/**
> + * Rows of the same transaction are wrapped into begin/commit. Mostly for the
> + * sake of synchronous replication, when the log can contain rolled back
> + * transactions, which must be entirely reverted during recovery when ROLLBACK
> + * records are met. Row-by-row recovery wouldn't work for multi-statement
> + * synchronous transactions.
> + */
> +static int
> +wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>   {
>   	struct request request;
> -	if (iproto_type_is_synchro_request(row->type)) {
> -		struct synchro_request syn_req;
> -		if (xrow_decode_synchro(row, &syn_req) != 0)
> -			diag_raise();
> -		txn_limbo_process(&txn_limbo, &syn_req);
> -		return;
> +	uint64_t req_type = dml_request_key_map(row->type);
> +	if (xrow_decode_dml(row, &request, req_type) != 0) {
> +		say_error("couldn't decode a DML request");
> +		return -1;
>   	}
> -	if (iproto_type_is_raft_request(row->type)) {
> -		struct raft_request raft_req;
> -		/* Vclock is never persisted in WAL by Raft. */
> -		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
> -			diag_raise();
> -		box_raft_recover(&raft_req);
> -		return;
> +	/*
> +	 * Note that all the information which came from the log is validated
> +	 * and the errors are handled. Not asserted or paniced. That is for the
> +	 * sake of force recovery, which must be able to recover just everything
> +	 * what possible instead of terminating the instance.
> +	 */
> +	struct txn *txn;
> +	if (stream->tsn == 0) {
> +		if (row->tsn == 0) {
> +			diag_set(XlogError, "found a row without TSN");
> +			goto end_diag_request;
> +		}
> +		stream->tsn = row->tsn;
> +		stream->first_row_lsn = row->lsn;
> +		stream->has_global_row = false;
> +		/*
> +		 * Rows are not stacked into a list like during replication,
> +		 * because recovery does not yield while reading the rows. All
> +		 * the yields are controlled by the stream, and therefore no
> +		 * need to wait for all the rows to start a transaction. Can
> +		 * start now, apply the rows, and make a yield after commit if
> +		 * necessary. Helps to avoid a lot of copying.
> +		 */
> +		txn = txn_begin();
> +		if (txn == NULL) {
> +			say_error("couldn't begin a recovery transaction");
> +			return -1;
> +		}
> +	} else if (row->tsn != stream->tsn) {
> +		diag_set(XlogError, "found a next transaction with the "
> +			 "previous one not yet committed");
> +		goto end_diag_request;
> +	} else {
> +		txn = in_txn();
> +	}
> +	/* Ensure TSN is equal to LSN of the first global row. */
> +	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
> +		if (row->tsn != row->lsn) {
> +			diag_set(XlogError, "found a first global row in a "
> +				 "transaction with LSN/TSN mismatch");
> +			goto end_diag_request;
> +		}
> +		stream->has_global_row = true;
>   	}
> -	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> +	assert(wal_stream_has_tx(stream));
> +	/* Nops might appear at least after before_replace skipping rows. */
>   	if (request.type != IPROTO_NOP) {
> -		struct space *space = space_cache_find_xc(request.space_id);
> +		struct space *space = space_cache_find(request.space_id);
> +		if (space == NULL) {
> +			say_error("couldn't find space by ID");
> +			goto end_diag_request;
> +		}
>   		if (box_process_rw(&request, space, NULL) != 0) {
> -			say_error("error applying row: %s", request_str(&request));
> -			diag_raise();
> +			say_error("couldn't apply the request");
> +			goto end_diag_request;
>   		}
>   	}
> -	struct wal_stream *xstream =
> -		container_of(stream, struct wal_stream, base);
> -	/**
> -	 * Yield once in a while, but not too often,
> -	 * mostly to allow signal handling to take place.
> +	assert(txn != NULL);
> +	if (!row->is_commit)
> +		return 0;
> +	/*
> +	 * For fully local transactions the TSN check won't work like for global
> +	 * transactions, because it is not known if there are global rows until
> +	 * commit arrives.
>   	 */
> -	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
> -		fiber_sleep(0);
> +	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
> +		diag_set(XlogError, "fully local transaction's TSN does not "
> +			 "match LSN of the first row");
> +		return -1;
> +	}
> +	stream->tsn = 0;
> +	/*
> +	 * During local recovery the commit procedure should be async, otherwise
> +	 * the only fiber processing recovery will get stuck on the first
> +	 * synchronous tx it meets until confirm timeout is reached and the tx
> +	 * is rolled back, yielding an error.
> +	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
> +	 * recovery, since journal_write is faked at this stage and returns
> +	 * immediately.
> +	 */
> +	if (txn_commit_try_async(txn) != 0) {
> +		/* Commit fail automatically leads to rollback. */
> +		assert(in_txn() == NULL);
> +		say_error("couldn't commit a recovery transaction");
> +		return -1;
> +	}
> +	assert(in_txn() == NULL);
> +	fiber_gc();
> +	return 0;
> +
> +end_diag_request:
> +	/*
> +	 * The label must be used only for the errors related directly to the
> +	 * request. Errors like a txn_begin() failure have nothing to do with
> +	 * it, and therefore must not log the request as the fault reason.
> +	 */
> +	say_error("error at request: %s", request_str(&request));
> +	return -1;
> +}
> +
> +/**
> + * Yield once in a while, but not too often, mostly to allow signal handling to
> + * take place.
> + */
> +static void
> +wal_stream_try_yield(struct wal_stream *stream)
> +{
> +	/*
> +	 * Save the yield. Otherwise it would happen only on rows which
> +	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> +	 * transaction, which is probably a very rare coincidence.
> +	 */
> +	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
> +	if (wal_stream_has_tx(stream) || !stream->has_yield)
> +		return;
> +	stream->has_yield = false;
> +	fiber_sleep(0);
> +}
> +
> +static void
> +wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
> +{
> +	struct wal_stream *stream =
> +		container_of(base, struct wal_stream, base);
> +	/*
> +	 * Account for all rows, even non-DML ones and even those leading to an
> +	 * error, because the stream still needs to yield sometimes.
> +	 */
> +	++stream->rows;
> +	if (iproto_type_is_synchro_request(row->type)) {
> +		if (wal_stream_apply_synchro_row(stream, row) != 0)
> +			goto end_error;
> +	} else if (iproto_type_is_raft_request(row->type)) {
> +		if (wal_stream_apply_raft_row(stream, row) != 0)
> +			goto end_error;
> +	} else if (wal_stream_apply_dml_row(stream, row) != 0) {
> +		goto end_error;
> +	}
> +	wal_stream_try_yield(stream);
> +	return;
> +
> +end_error:
> +	wal_stream_abort(stream);
> +	wal_stream_try_yield(stream);
> +	diag_raise();
>   }
>   
>   static void
>   wal_stream_create(struct wal_stream *ctx)
>   {
> -	xstream_create(&ctx->base, apply_wal_row);
> +	xstream_create(&ctx->base, wal_stream_apply_row);
> +	ctx->tsn = 0;
> +	ctx->first_row_lsn = 0;
> +	ctx->has_yield = false;
> +	ctx->has_global_row = false;
>   	ctx->rows = 0;
>   }
>   
> @@ -2797,9 +3014,13 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   
>   	struct wal_stream wal_stream;
>   	wal_stream_create(&wal_stream);
> +	auto stream_guard = make_scoped_guard([&]{
> +		wal_stream_abort(&wal_stream);
> +	});
>   
>   	struct recovery *recovery;
> -	recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
> +	bool is_force_recovery = cfg_geti("force_recovery");
> +	recovery = recovery_new(wal_dir(), is_force_recovery,
>   				checkpoint_vclock);
>   
>   	/*
> @@ -2861,6 +3082,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   
>   	engine_begin_final_recovery_xc();
>   	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
> +	if (wal_stream_has_tx(&wal_stream)) {
> +		wal_stream_abort(&wal_stream);
> +		diag_set(XlogError, "found a not finished transaction "
> +			 "in the log");
> +		if (!is_force_recovery)
> +			diag_raise();
> +		diag_log();
> +	}
>   	engine_end_recovery_xc();
>   	/*
>   	 * Leave hot standby mode, if any, only after
> @@ -2880,6 +3109,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   		}
>   		recovery_stop_local(recovery);
>   		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
> +		if (wal_stream_has_tx(&wal_stream)) {
> +			wal_stream_abort(&wal_stream);
> +			diag_set(XlogError, "found a not finished transaction "
> +				 "in the log in hot standby mode");
> +			if (!is_force_recovery)
> +				diag_raise();
> +			diag_log();
> +		}
>   		/*
>   		 * Advance replica set vclock to reflect records
>   		 * applied in hot standby mode.
> @@ -2888,6 +3125,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   		box_listen();
>   		box_sync_replication(false);
>   	}
> +	stream_guard.is_active = false;
>   	recovery_finalize(recovery);
>   	is_local_recovery = false;
>   
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
> new file mode 100644
> index 000000000..73f903ca7
> --- /dev/null
> +++ b/test/replication/gh-5874-qsync-txn-recovery.result
> @@ -0,0 +1,162 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +--
> +-- gh-5874: synchronous transactions should be recovered as whole units, not row
> +-- by row. So as to be able to roll them back when ROLLBACK is recovered
> +-- afterwards.
> +--
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
> + | ---
> + | ...
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
> + | ---
> + | ...
> +box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
> + | ---
> + | ...
> +engine = test_run:get_cfg('engine')
> + | ---
> + | ...
> +async = box.schema.create_space('async', {engine = engine})
> + | ---
> + | ...
> +_ = async:create_index('pk')
> + | ---
> + | ...
> +sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
> + | ---
> + | ...
> +_ = sync:create_index('pk')
> + | ---
> + | ...
> +
> +-- The transaction fails, but is written to the log anyway.
> +box.begin() async:insert{1} sync:insert{1} box.commit()
> + | ---
> + | - error: Quorum collection for a synchronous transaction is timed out
> + | ...
> +-- Ok, the previous txn is rolled back.
> +_ = async:insert{1}
> + | ---
> + | ...
> +box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +_ = sync:insert{1}
> + | ---
> + | ...
> +-- Try multi-statement sync txn to see how it recovers.
> +box.begin() sync:insert{2} sync:insert{3} box.commit()
> + | ---
> + | ...
> +
> +-- See if NOP multi-statement recovery works fine.
> +--
> +-- Start with NOP.
> +do_skip = false
> + | ---
> + | ...
> +_ = async:before_replace(function()                                             \
> +    if do_skip then                                                             \
> +        return nil                                                              \
> +    end                                                                         \
> +end)
> + | ---
> + | ...
> +box.begin()                                                                     \
> +do_skip = true                                                                  \
> +async:replace{2}                                                                \
> +do_skip = false                                                                 \
> +async:replace{3}                                                                \
> +box.commit()
> + | ---
> + | ...
> +
> +-- NOP in the middle.
> +box.begin()                                                                     \
> +async:replace{4}                                                                \
> +do_skip = true                                                                  \
> +async:replace{5}                                                                \
> +do_skip = false                                                                 \
> +async:replace{6}                                                                \
> +box.commit()
> + | ---
> + | ...
> +
> +-- All NOP.
> +box.begin()                                                                     \
> +do_skip = true                                                                  \
> +async:replace{7}                                                                \
> +async:replace{8}                                                                \
> +do_skip = false                                                                 \
> +box.commit()
> + | ---
> + | ...
> +
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> + | ---
> + | ...
> +_ = loc:create_index('pk')
> + | ---
> + | ...
> +box.begin()                                                                     \
> +loc:replace{1}                                                                  \
> +async:replace{9}                                                                \
> +box.commit()
> + | ---
> + | ...
> +
> +-- All local.
> +box.begin()                                                                     \
> +loc:replace{2}                                                                  \
> +loc:replace{3}                                                                  \
> +box.commit()
> + | ---
> + | ...
> +
> +test_run:cmd('restart server default')
> + |
> +async = box.space.async
> + | ---
> + | ...
> +sync = box.space.sync
> + | ---
> + | ...
> +loc = box.space.loc
> + | ---
> + | ...
> +async:select()
> + | ---
> + | - - [1]
> + |   - [3]
> + |   - [4]
> + |   - [6]
> + |   - [9]
> + | ...
> +sync:select()
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + | ...
> +loc:select()
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + | ...
> +async:drop()
> + | ---
> + | ...
> +sync:drop()
> + | ---
> + | ...
> +loc:drop()
> + | ---
> + | ...
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> new file mode 100644
> index 000000000..f35eb68de
> --- /dev/null
> +++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> @@ -0,0 +1,84 @@
> +test_run = require('test_run').new()
> +--
> +-- gh-5874: synchronous transactions should be recovered as whole units, not row
> +-- by row. So as to be able to roll them back when ROLLBACK is recovered
> +-- afterwards.
> +--
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
> +box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
> +engine = test_run:get_cfg('engine')
> +async = box.schema.create_space('async', {engine = engine})
> +_ = async:create_index('pk')
> +sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
> +_ = sync:create_index('pk')
> +
> +-- The transaction fails, but is written to the log anyway.
> +box.begin() async:insert{1} sync:insert{1} box.commit()
> +-- Ok, the previous txn is rolled back.
> +_ = async:insert{1}
> +box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
> +_ = sync:insert{1}
> +-- Try multi-statement sync txn to see how it recovers.
> +box.begin() sync:insert{2} sync:insert{3} box.commit()
> +
> +-- See if NOP multi-statement recovery works fine.
> +--
> +-- Start with NOP.
> +do_skip = false
> +_ = async:before_replace(function()                                             \
> +    if do_skip then                                                             \
> +        return nil                                                              \
> +    end                                                                         \
> +end)
> +box.begin()                                                                     \
> +do_skip = true                                                                  \
> +async:replace{2}                                                                \
> +do_skip = false                                                                 \
> +async:replace{3}                                                                \
> +box.commit()
> +
> +-- NOP in the middle.
> +box.begin()                                                                     \
> +async:replace{4}                                                                \
> +do_skip = true                                                                  \
> +async:replace{5}                                                                \
> +do_skip = false                                                                 \
> +async:replace{6}                                                                \
> +box.commit()
> +
> +-- All NOP.
> +box.begin()                                                                     \
> +do_skip = true                                                                  \
> +async:replace{7}                                                                \
> +async:replace{8}                                                                \
> +do_skip = false                                                                 \
> +box.commit()
> +
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> +_ = loc:create_index('pk')
> +box.begin()                                                                     \
> +loc:replace{1}                                                                  \
> +async:replace{9}                                                                \
> +box.commit()
> +
> +-- All local.
> +box.begin()                                                                     \
> +loc:replace{2}                                                                  \
> +loc:replace{3}                                                                  \
> +box.commit()
> +
> +test_run:cmd('restart server default')
> +async = box.space.async
> +sync = box.space.sync
> +loc = box.space.loc
> +async:select()
> +sync:select()
> +loc:select()
> +async:drop()
> +sync:drop()
> +loc:drop()
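
For readers following the TSN checks in wal_stream_apply_dml_row() quoted above, here is a minimal standalone sketch of the same grouping rules. It is not the patch's code: struct row, struct stream_state and check_row() are hypothetical simplified stand-ins, is_local replaces the group_id != GROUP_LOCAL test, and no real transaction is opened or committed. It only tries to show which row sequences the checks would accept or reject.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for struct xrow_header. */
struct row {
	int64_t lsn;
	int64_t tsn;
	bool is_local;	/* stands in for group_id == GROUP_LOCAL */
	bool is_commit;
};

/* Hypothetical, simplified stand-in for the TSN part of struct wal_stream. */
struct stream_state {
	int64_t tsn;		/* 0 means "no open transaction" */
	int64_t first_row_lsn;
	bool has_global_row;
};

/*
 * Returns 0 if the row fits the grouping rules, -1 otherwise. On error the
 * state is left as is; the caller is expected to abort and reset it, which
 * is an assumption mirroring what wal_stream_abort() appears to do.
 */
static int
check_row(struct stream_state *s, const struct row *r)
{
	assert(s != NULL && r != NULL);
	if (s->tsn == 0) {
		if (r->tsn == 0)
			return -1;	/* a row without TSN */
		s->tsn = r->tsn;
		s->first_row_lsn = r->lsn;
		s->has_global_row = false;
	} else if (r->tsn != s->tsn) {
		return -1;	/* next txn began before the previous commit */
	}
	/* TSN must equal the LSN of the first global row. */
	if (!s->has_global_row && !r->is_local) {
		if (r->tsn != r->lsn)
			return -1;
		s->has_global_row = true;
	}
	if (!r->is_commit)
		return 0;
	/* Fully local txn: TSN must equal the LSN of its first row. */
	if (!s->has_global_row && s->tsn != s->first_row_lsn)
		return -1;
	s->tsn = 0;
	return 0;
}
```

Under these assumptions, a transaction whose first row is local (lsn 5, tsn 10) followed by a committing global row (lsn 10, tsn 10) passes, which corresponds to the "first row might be for a local space" case in the test, while a row with tsn 0 is rejected.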

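The "save the yield" logic of wal_stream_try_yield() quoted above can be illustrated the same way. This is a sketch under stated assumptions, not the patch's code: ROWS_PER_YIELD is a hypothetical small value standing in for WAL_ROWS_PER_YIELD, in_tx stands in for wal_stream_has_tx(), and the yields counter replaces the real fiber_sleep(0) call.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical small value; the patch uses WAL_ROWS_PER_YIELD. */
#define ROWS_PER_YIELD 4

struct yield_state {
	uint64_t rows;
	bool has_yield;		/* a yield is owed but not yet taken */
	bool in_tx;		/* stands in for wal_stream_has_tx() */
	unsigned yields;	/* counts simulated fiber_sleep(0) calls */
};

static void
try_yield(struct yield_state *s)
{
	/* Remember that a yield is due even if it can't happen right now. */
	s->has_yield |= (s->rows % ROWS_PER_YIELD == 0);
	if (s->in_tx || !s->has_yield)
		return;
	s->has_yield = false;
	++s->yields;
}

/* Accounts one row; a committing row closes the open transaction. */
static void
on_row(struct yield_state *s, bool is_commit)
{
	++s->rows;
	s->in_tx = !is_commit;
	try_yield(s);
}
```

With a six-row transaction that commits on the sixth row, the fourth row only marks the yield as owed, and the single simulated yield happens right after the commit. Without the saved flag the yield would be skipped unless the commit row itself landed on a multiple of ROWS_PER_YIELD.
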
-- 
Serge Petrenko



* Re: [Tarantool-patches] [PATCH 0/3] Transactional recovery
  2021-04-01 22:23 [Tarantool-patches] [PATCH 0/3] Transactional recovery Vladislav Shpilevoy via Tarantool-patches
                   ` (3 preceding siblings ...)
  2021-04-02  9:42 ` [Tarantool-patches] [PATCH 0/3] Transactional recovery Konstantin Osipov via Tarantool-patches
@ 2021-04-05 16:14 ` Kirill Yukhin via Tarantool-patches
  4 siblings, 0 replies; 14+ messages in thread
From: Kirill Yukhin via Tarantool-patches @ 2021-04-05 16:14 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

Hello,

On 02 Apr 00:23, Vladislav Shpilevoy via Tarantool-patches wrote:
> The patchset makes the recovery transactional. That is done for
> the synchronous transactions, because they might be followed by a
> ROLLBACK, and then must be reverted entirely, including their
> statements working with non-sync spaces.
> 
> Nikita, I need you to check if the first commit is correct. It
> touches vinyl.
> 
> Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-5874-txn-recovery
> Issue: https://github.com/tarantool/tarantool/issues/5874

I've checked your patchset into 2.6, 2.7 and master.

--
Regards, Kirill Yukhin


