[Tarantool-patches] [PATCH 2/3] recovery: make it transactional

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Apr 2 01:23:43 MSK 2021


Recovery used to be performed row by row. That was fine, because all
the persisted rows were supposed to be committed and were not
expected to fail during recovery, so applying a transaction
partially did no harm.

That stopped being true with the introduction of synchronous
replication. Synchronous transactions might be in the log, but can
be followed by a ROLLBACK record which is supposed to revert them.

During row-by-row recovery, each row on a sync space was turned into
a separate sync transaction, which is probably fine. But the rows on
non-sync spaces which were part of a sync transaction could be
applied right away, bypassing the limbo. That led to all kinds of
errors, such as duplicate keys or an inconsistent, partially applied
transaction.

The patch makes the recovery transactional. Either an entire
transaction is recovered, or it is rolled back, which normally
happens only for synchro transactions followed by ROLLBACK.

Consistency is still not guaranteed when force recovery is used on
a broken log.

Closes #5874
---
 .../unreleased/qsync-multi-statement-recovery |   5 +
 src/box/box.cc                                | 261 ++++++++++++++++--
 .../gh-5874-qsync-txn-recovery.result         | 124 +++++++++
 .../gh-5874-qsync-txn-recovery.test.lua       |  64 +++++
 4 files changed, 427 insertions(+), 27 deletions(-)
 create mode 100644 changelogs/unreleased/qsync-multi-statement-recovery
 create mode 100644 test/replication/gh-5874-qsync-txn-recovery.result
 create mode 100644 test/replication/gh-5874-qsync-txn-recovery.test.lua

diff --git a/changelogs/unreleased/qsync-multi-statement-recovery b/changelogs/unreleased/qsync-multi-statement-recovery
new file mode 100644
index 000000000..c902cbe24
--- /dev/null
+++ b/changelogs/unreleased/qsync-multi-statement-recovery
@@ -0,0 +1,5 @@
+## bugfix/replication
+
+* Fix recovery of a rolled back multi-statement synchronous transaction, which
+  could lead to the transaction being applied partially and to recovery errors.
+  It happened when the transaction worked with non-sync spaces (gh-5874).
diff --git a/src/box/box.cc b/src/box/box.cc
index 4da274976..f70a2bd0e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -334,7 +334,16 @@ box_set_orphan(bool orphan)
 }
 
 struct wal_stream {
+	/** Base class. */
 	struct xstream base;
+	/** TSN of the transaction being recovered. 0 when there is none. */
+	int64_t tsn;
+	/**
+	 * Whether a yield is due but postponed until the current transaction
+	 * is finished. It can't always be done right away, because a yield in
+	 * the middle of a memtx transaction would abort it.
+	 */
+	bool has_yield;
 	/** How many rows have been recovered so far. */
 	size_t rows;
 };
@@ -379,47 +388,224 @@ recovery_journal_create(struct vclock *v)
 	journal_set(&journal.base);
 }
 
+/**
+ * Drop the current transaction and reset the stream's transaction state. It
+ * is supposed to be called when an error happens, because in force recovery
+ * the stream keeps getting rows afterwards and must stay able to handle them.
+ *
+ * The current transaction, if any, is simply rolled back as if it never
+ * happened, even if its commit row wasn't met yet. That is good enough for
+ * force recovery, where consistency is not guaranteed anyway.
+ */
 static void
-apply_wal_row(struct xstream *stream, struct xrow_header *row)
+wal_stream_abort(struct wal_stream *stream)
+{
+	struct txn *tx = in_txn();
+	if (tx != NULL)
+		txn_rollback(tx);
+	stream->tsn = 0;
+	fiber_gc();
+}
+
+/**
+ * Check whether the stream is inside a transaction. The wrapper exists mostly
+ * for debugging: it asserts that a non-zero tsn goes together with a non-NULL
+ * fiber transaction. That is an internal invariant unrelated to the journal
+ * content, so an assertion is used instead of rigorous error checking.
+ */
+static bool
+wal_stream_has_tx(const struct wal_stream *stream)
+{
+	bool is_in_tx = stream->tsn != 0;
+	assert(is_in_tx == (in_txn() != NULL));
+	return is_in_tx;
+}
+
+static int
+wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+	struct synchro_request req;
+	if (wal_stream_has_tx(stream)) {
+		diag_set(XlogError, "found synchro request in a transaction");
+		return -1;
+	}
+	if (xrow_decode_synchro(row, &req) != 0) {
+		say_error("couldn't decode a synchro request");
+		return -1;
+	}
+	txn_limbo_process(&txn_limbo, &req);
+	return 0;
+}
+
+static int
+wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+	struct raft_request req;
+	if (wal_stream_has_tx(stream)) {
+		diag_set(XlogError, "found raft request in a transaction");
+		return -1;
+	}
+	/* Raft never persists a vclock in the WAL, hence NULL. */
+	if (xrow_decode_raft(row, &req, NULL) != 0) {
+		say_error("couldn't decode a raft request");
+		return -1;
+	}
+	box_raft_recover(&req);
+	return 0;
+}
+
+/**
+ * Apply a DML row. Rows of the same transaction are wrapped into begin/commit,
+ * mostly for the sake of synchronous replication: the log can contain rolled
+ * back transactions, which must be entirely reverted when their ROLLBACK
+ * record is met. Row-by-row recovery can't do that for multi-statement ones.
+ * Returns 0 on success, -1 on error (the error is left in the diag).
+ */
+static int
+wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 {
 	struct request request;
-	if (iproto_type_is_synchro_request(row->type)) {
-		struct synchro_request syn_req;
-		if (xrow_decode_synchro(row, &syn_req) != 0)
-			diag_raise();
-		txn_limbo_process(&txn_limbo, &syn_req);
-		return;
+	uint64_t req_type = dml_request_key_map(row->type);
+	if (xrow_decode_dml(row, &request, req_type) != 0) {
+		say_error("couldn't decode a DML request");
+		return -1;
 	}
-	if (iproto_type_is_raft_request(row->type)) {
-		struct raft_request raft_req;
-		/* Vclock is never persisted in WAL by Raft. */
-		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
-			diag_raise();
-		box_raft_recover(&raft_req);
-		return;
+	/*
+	 * Everything read from the log is validated and errors are handled,
+	 * not asserted or panicked on, so that force recovery can recover as
+	 * much as possible instead of terminating the instance.
+	 */
+	struct txn *txn;
+	if (stream->tsn == 0) {
+		if (row->tsn == 0) {
+			diag_set(XlogError, "found a row without TSN");
+			goto end_diag_request;
+		}
+		/*
+		 * WAL uses the LSN of the first global row as TSN, so a
+		 * leading local row legitimately has TSN != LSN.
+		 */
+		if (row->group_id != GROUP_LOCAL && row->tsn != row->lsn) {
+			diag_set(XlogError, "found a first row in a "
+				 "transaction with LSN/TSN mismatch");
+			goto end_diag_request;
+		}
+		/*
+		 * Unlike replication, rows are not stacked: the stream defers
+		 * all yields until commit, so begin now and avoid copying.
+		 */
+		txn = txn_begin();
+		if (txn == NULL) {
+			say_error("couldn't begin a recovery transaction");
+			return -1;
+		}
+		/* Set after txn_begin() so tsn != 0 implies a txn. */
+		stream->tsn = row->tsn;
+	} else if (row->tsn != stream->tsn) {
+		diag_set(XlogError, "found a next transaction with the "
+			 "previous one not yet committed");
+		goto end_diag_request;
+	} else {
+		txn = in_txn();
 	}
-	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+	assert(wal_stream_has_tx(stream));
+	/* NOPs may appear, e.g. after before_replace triggers skipped rows. */
 	if (request.type != IPROTO_NOP) {
-		struct space *space = space_cache_find_xc(request.space_id);
+		struct space *space = space_cache_find(request.space_id);
+		if (space == NULL) {
+			say_error("couldn't find space by ID");
+			goto end_diag_request;
+		}
 		if (box_process_rw(&request, space, NULL) != 0) {
-			say_error("error applying row: %s", request_str(&request));
-			diag_raise();
+			say_error("couldn't apply the request");
+			goto end_diag_request;
 		}
 	}
-	struct wal_stream *xstream =
-		container_of(stream, struct wal_stream, base);
-	/**
-	 * Yield once in a while, but not too often,
-	 * mostly to allow signal handling to take place.
+	assert(txn != NULL);
+	if (!row->is_commit)
+		return 0;
+
+	stream->tsn = 0;
+	if (txn_commit_try_async(txn) != 0) {
+		/* A failed commit is rolled back automatically. */
+		assert(in_txn() == NULL);
+		say_error("couldn't commit a recovery transaction");
+		return -1;
+	}
+	assert(in_txn() == NULL);
+	fiber_gc();
+	return 0;
+
+end_diag_request:
+	/*
+	 * The label must be used only for errors directly related to the
+	 * request. A txn_begin() failure, for example, has nothing to do with
+	 * it, so the request is not logged as the fault reason there.
+	 */
+	say_error("error at request: %s", request_str(&request));
+	return -1;
+}
+
+/**
+ * Yield once in a while, but not too often, mostly to allow signal handling to
+ * take place. Inside a transaction the yield is postponed until the
+ * transaction is finished, because yielding in the middle of a memtx
+ * transaction would abort it.
+ */
+static void
+wal_stream_try_yield(struct wal_stream *stream)
+{
+	/*
+	 * Remember the yield when it is due, even if it can't be done right
+	 * now. Otherwise it would happen only on rows which are a multiple of
+	 * WAL_ROWS_PER_YIELD and are last in their transaction, which is
+	 * probably a very rare coincidence. Rows off the period must not set
+	 * the flag, or every multi-statement transaction would yield.
+	 */
+	if (stream->rows % WAL_ROWS_PER_YIELD == 0)
+		stream->has_yield = true;
+	if (wal_stream_has_tx(stream) || !stream->has_yield)
+		return;
+	stream->has_yield = false;
+	fiber_sleep(0);
+}
+
+static void
+wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
+{
+	struct wal_stream *stream =
+		container_of(base, struct wal_stream, base);
+	/*
+	 * Count all rows, including non-DML ones and the ones which fail to
+	 * apply, because the stream still needs to yield periodically.
 	 */
-	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
-		fiber_sleep(0);
+	++stream->rows;
+	if (iproto_type_is_synchro_request(row->type)) {
+		if (wal_stream_apply_synchro_row(stream, row) != 0)
+			goto end_error;
+	} else if (iproto_type_is_raft_request(row->type)) {
+		if (wal_stream_apply_raft_row(stream, row) != 0)
+			goto end_error;
+	} else if (wal_stream_apply_dml_row(stream, row) != 0) {
+		goto end_error;
+	}
+	wal_stream_try_yield(stream);
+	return;
+
+end_error:
+	wal_stream_abort(stream);
+	wal_stream_try_yield(stream);
+	diag_raise();
 }
 
 static void
 wal_stream_create(struct wal_stream *ctx)
 {
-	xstream_create(&ctx->base, apply_wal_row);
+	xstream_create(&ctx->base, wal_stream_apply_row);
+	ctx->tsn = 0; /* No transaction is being recovered yet. */
+	ctx->has_yield = false; /* No yield is postponed yet. */
 	ctx->rows = 0;
 }
 
@@ -2797,9 +2983,13 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
+	auto stream_guard = make_scoped_guard([&]{
+		wal_stream_abort(&wal_stream);
+	});
 
 	struct recovery *recovery;
-	recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
+	bool is_force_recovery = cfg_geti("force_recovery");
+	recovery = recovery_new(wal_dir(), is_force_recovery,
 				checkpoint_vclock);
 
 	/*
@@ -2861,6 +3051,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	engine_begin_final_recovery_xc();
 	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
+	if (wal_stream_has_tx(&wal_stream)) {
+		wal_stream_abort(&wal_stream);
+		diag_set(XlogError, "found a not finished transaction "
+			 "in the log");
+		if (!is_force_recovery)
+			diag_raise();
+		diag_log();
+	}
 	engine_end_recovery_xc();
 	/*
 	 * Leave hot standby mode, if any, only after
@@ -2880,6 +3078,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		}
 		recovery_stop_local(recovery);
 		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+		if (wal_stream_has_tx(&wal_stream)) {
+			wal_stream_abort(&wal_stream);
+			diag_set(XlogError, "found a not finished transaction "
+				 "in the log in hot standby mode");
+			if (!is_force_recovery)
+				diag_raise();
+			diag_log();
+		}
 		/*
 		 * Advance replica set vclock to reflect records
 		 * applied in hot standby mode.
@@ -2888,6 +3094,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		box_listen();
 		box_sync_replication(false);
 	}
+	stream_guard.is_active = false;
 	recovery_finalize(recovery);
 	is_local_recovery = false;
 
diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
new file mode 100644
index 000000000..a64eadd9c
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.result
@@ -0,0 +1,124 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+ | ---
+ | ...
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+async = box.schema.create_space('async', {engine = engine})
+ | ---
+ | ...
+_ = async:create_index('pk')
+ | ---
+ | ...
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+ | ---
+ | ...
+_ = sync:create_index('pk')
+ | ---
+ | ...
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+ | ---
+ | - error: Quorum collection for a synchronous transaction is timed out
+ | ...
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+_ = sync:insert{1}
+ | ---
+ | ...
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+ | ---
+ | ...
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+ | ---
+ | ...
+_ = async:before_replace(function()                                             \
+    if do_skip then                                                             \
+        return nil                                                              \
+    end                                                                         \
+end)
+ | ---
+ | ...
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{2}                                                                \
+do_skip = false                                                                 \
+async:replace{3}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- NOP in the middle.
+box.begin()                                                                     \
+async:replace{4}                                                                \
+do_skip = true                                                                  \
+async:replace{5}                                                                \
+do_skip = false                                                                 \
+async:replace{6}                                                                \
+box.commit()
+ | ---
+ | ...
+
+-- All NOP.
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{7}                                                                \
+async:replace{8}                                                                \
+do_skip = false                                                                 \
+box.commit()
+ | ---
+ | ...
+
+test_run:cmd('restart server default')
+ | 
+async = box.space.async
+ | ---
+ | ...
+sync = box.space.sync
+ | ---
+ | ...
+async:select()
+ | ---
+ | - - [1]
+ |   - [3]
+ |   - [4]
+ |   - [6]
+ | ...
+sync:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+async:drop()
+ | ---
+ | ...
+sync:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
new file mode 100644
index 000000000..efcf727cc
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
@@ -0,0 +1,64 @@
+test_run = require('test_run').new()
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+engine = test_run:get_cfg('engine')
+async = box.schema.create_space('async', {engine = engine})
+_ = async:create_index('pk')
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+_ = sync:create_index('pk')
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+_ = sync:insert{1}
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+_ = async:before_replace(function()                                             \
+    if do_skip then                                                             \
+        return nil                                                              \
+    end                                                                         \
+end)
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{2}                                                                \
+do_skip = false                                                                 \
+async:replace{3}                                                                \
+box.commit()
+
+-- NOP in the middle.
+box.begin()                                                                     \
+async:replace{4}                                                                \
+do_skip = true                                                                  \
+async:replace{5}                                                                \
+do_skip = false                                                                 \
+async:replace{6}                                                                \
+box.commit()
+
+-- All NOP.
+box.begin()                                                                     \
+do_skip = true                                                                  \
+async:replace{7}                                                                \
+async:replace{8}                                                                \
+do_skip = false                                                                 \
+box.commit()
+
+test_run:cmd('restart server default')
+async = box.space.async
+sync = box.space.sync
+async:select()
+sync:select()
+async:drop()
+sync:drop()
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list