[Tarantool-patches] [PATCH 2/3] recovery: make it transactional

Serge Petrenko sergepetrenko at tarantool.org
Mon Apr 5 11:36:39 MSK 2021



On 03.04.2021 16:18, Vladislav Shpilevoy wrote:
> Hi! Thanks for the review!
>
>> I ran the tests locally and replication suite fails occasionally with
>> ```
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:539 E> error at request: {type: 'INSERT', replica_id: 0, lsn: 7, space_id: 517, index_id: 0, tuple: [21]}
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:481 E> XlogError: found a first row in a transaction with LSN/TSN mismatch
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
>> ```
> Thanks for noticing! There was a bug with transactions starting
> from a local row: its LSN comes from vclock[0], while for global
> rows the LSN comes from vclock[instance_id], where instance_id > 0.

Oh, I see.

>
> I fixed it by checking LSN == TSN only for the first global row.
> If all rows are local, I check that the TSN matches the first row's LSN.
>
> Here is the diff.

Thanks! LGTM.

>
> ====================
> diff --git a/src/box/box.cc b/src/box/box.cc
> index f70a2bd0e..67b44c053 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -338,12 +338,22 @@ struct wal_stream {
>   	struct xstream base;
>   	/** Current transaction ID. 0 when no transaction. */
>   	int64_t tsn;
> +	/**
> +	 * LSN of the first row saved to check TSN and LSN match in case all
> +	 * rows of the tx appeared to be local.
> +	 */
> +	int64_t first_row_lsn;
>   	/**
>   	 * Flag whether there is a pending yield to do when the current
>   	 * transaction is finished. It can't always be done right away because
>   	 * would abort the current transaction if it is memtx.
>   	 */
>   	bool has_yield;
> +	/**
> +	 * True if any row in the transaction was global. Saved to check if TSN
> +	 * matches LSN of a first global row.
> +	 */
> +	bool has_global_row;
>   	/** How many rows have been recovered so far. */
>   	size_t rows;
>   };
> @@ -484,12 +494,9 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>   			diag_set(XlogError, "found a row without TSN");
>   			goto end_diag_request;
>   		}
> -		if (row->tsn != row->lsn) {
> -			diag_set(XlogError, "found a first row in a "
> -				 "transaction with LSN/TSN mismatch");
> -			goto end_diag_request;
> -		}
>   		stream->tsn = row->tsn;
> +		stream->first_row_lsn = row->lsn;
> +		stream->has_global_row = false;
>   		/*
>   		 * Rows are not stacked into a list like during replication,
>   		 * because recovery does not yield while reading the rows. All
> @@ -510,6 +517,15 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>   	} else {
>   		txn = in_txn();
>   	}
> +	/* Ensure TSN is equal to LSN of the first global row. */
> +	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
> +		if (row->tsn != row->lsn) {
> +			diag_set(XlogError, "found a first global row in a "
> +				 "transaction with LSN/TSN mismatch");
> +			goto end_diag_request;
> +		}
> +		stream->has_global_row = true;
> +	}
>   	assert(wal_stream_has_tx(stream));
>   	/* Nops might appear at least after before_replace skipping rows. */
>   	if (request.type != IPROTO_NOP) {
> @@ -526,8 +542,26 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>   	assert(txn != NULL);
>   	if (!row->is_commit)
>   		return 0;
> -
> +	/*
> +	 * For fully local transactions the TSN check won't work like for global
> +	 * transactions, because it is not known if there are global rows until
> +	 * commit arrives.
> +	 */
> +	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
> +		diag_set(XlogError, "fully local transaction's TSN does not "
> +			 "match LSN of the first row");
> +		return -1;
> +	}
>   	stream->tsn = 0;
> +	/*
> +	 * During local recovery the commit procedure should be async, otherwise
> +	 * the only fiber processing recovery will get stuck on the first
> +	 * synchronous tx it meets until confirm timeout is reached and the tx
> +	 * is rolled back, yielding an error.
> +	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
> +	 * recovery, since journal_write is faked at this stage and returns
> +	 * immediately.
> +	 */
>   	if (txn_commit_try_async(txn) != 0) {
>   		/* Commit fail automatically leads to rollback. */
>   		assert(in_txn() == NULL);
> @@ -555,20 +589,15 @@ end_diag_request:
>   static void
>   wal_stream_try_yield(struct wal_stream *stream)
>   {
> -	bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);
> -	if (wal_stream_has_tx(stream)) {
> -		/*
> -		 * Save the yield. Otherwise it would happen only on rows which
> -		 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> -		 * transaction, which is probably a very rare coincidence.
> -		 */
> -		stream->has_yield = true;
> -		return;
> -	}
> -	if (stream->has_yield)
> -		stream->has_yield = false;
> -	else if (!needs_yield)
> +	/*
> +	 * Save the yield. Otherwise it would happen only on rows which
> +	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> +	 * transaction, which is probably a very rare coincidence.
> +	 */
> +	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
> +	if (wal_stream_has_tx(stream) || !stream->has_yield)
>   		return;
> +	stream->has_yield = false;
>   	fiber_sleep(0);
>   }
>   
> @@ -605,7 +634,9 @@ wal_stream_create(struct wal_stream *ctx)
>   {
>   	xstream_create(&ctx->base, wal_stream_apply_row);
>   	ctx->tsn = 0;
> +	ctx->first_row_lsn = 0;
>   	ctx->has_yield = false;
> +	ctx->has_global_row = false;
>   	ctx->rows = 0;
>   }
>   
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
> index a64eadd9c..73f903ca7 100644
> --- a/test/replication/gh-5874-qsync-txn-recovery.result
> +++ b/test/replication/gh-5874-qsync-txn-recovery.result
> @@ -95,6 +95,31 @@ box.commit()
>    | ---
>    | ...
>   
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> + | ---
> + | ...
> +_ = loc:create_index('pk')
> + | ---
> + | ...
> +box.begin()                                                                     \
> +loc:replace{1}                                                                  \
> +async:replace{9}                                                                \
> +box.commit()
> + | ---
> + | ...
> +
> +-- All local.
> +box.begin()                                                                     \
> +loc:replace{2}                                                                  \
> +loc:replace{3}                                                                  \
> +box.commit()
> + | ---
> + | ...
> +
>   test_run:cmd('restart server default')
>    |
>   async = box.space.async
> @@ -103,12 +128,16 @@ async = box.space.async
>   sync = box.space.sync
>    | ---
>    | ...
> +loc = box.space.loc
> + | ---
> + | ...
>   async:select()
>    | ---
>    | - - [1]
>    |   - [3]
>    |   - [4]
>    |   - [6]
> + |   - [9]
>    | ...
>   sync:select()
>    | ---
> @@ -116,9 +145,18 @@ sync:select()
>    |   - [2]
>    |   - [3]
>    | ...
> +loc:select()
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + | ...
>   async:drop()
>    | ---
>    | ...
>   sync:drop()
>    | ---
>    | ...
> +loc:drop()
> + | ---
> + | ...
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> index efcf727cc..f35eb68de 100644
> --- a/test/replication/gh-5874-qsync-txn-recovery.test.lua
> +++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> @@ -55,10 +55,30 @@ async:replace{8}
>   do_skip = false                                                                 \
>   box.commit()
>   
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> +_ = loc:create_index('pk')
> +box.begin()                                                                     \
> +loc:replace{1}                                                                  \
> +async:replace{9}                                                                \
> +box.commit()
> +
> +-- All local.
> +box.begin()                                                                     \
> +loc:replace{2}                                                                  \
> +loc:replace{3}                                                                  \
> +box.commit()
> +
>   test_run:cmd('restart server default')
>   async = box.space.async
>   sync = box.space.sync
> +loc = box.space.loc
>   async:select()
>   sync:select()
> +loc:select()
>   async:drop()
>   sync:drop()
> +loc:drop()
>
> ====================
>
>> Consider this diff. It looks simpler IMO, but feel free to ignore.
> Applied your diff, see above.
>
> The full patch:
>
> ====================
> diff --git a/changelogs/unreleased/qsync-multi-statement-recovery b/changelogs/unreleased/qsync-multi-statement-recovery
> new file mode 100644
> index 000000000..c902cbe24
> --- /dev/null
> +++ b/changelogs/unreleased/qsync-multi-statement-recovery
> @@ -0,0 +1,5 @@
> +## bugfix/replication
> +
> +* Fix recovery of a rolled back multi-statement synchronous transaction which
> +  could lead to the transaction being applied partially, and to recovery errors.
> +  It happened in case the transaction worked with non-sync spaces (gh-5874).
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 4da274976..67b44c053 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -334,7 +334,26 @@ box_set_orphan(bool orphan)
>   }
>   
>   struct wal_stream {
> +	/** Base class. */
>   	struct xstream base;
> +	/** Current transaction ID. 0 when no transaction. */
> +	int64_t tsn;
> +	/**
> +	 * LSN of the first row saved to check TSN and LSN match in case all
> +	 * rows of the tx appeared to be local.
> +	 */
> +	int64_t first_row_lsn;
> +	/**
> +	 * Flag whether there is a pending yield to do when the current
> +	 * transaction is finished. It can't always be done right away because
> +	 * would abort the current transaction if it is memtx.
> +	 */
> +	bool has_yield;
> +	/**
> +	 * True if any row in the transaction was global. Saved to check if TSN
> +	 * matches LSN of a first global row.
> +	 */
> +	bool has_global_row;
>   	/** How many rows have been recovered so far. */
>   	size_t rows;
>   };
> @@ -379,47 +398,245 @@ recovery_journal_create(struct vclock *v)
>   	journal_set(&journal.base);
>   }
>   
> +/**
> + * Drop the stream to the initial state. It is supposed to be done when an error
> + * happens. Because in case of force recovery the stream will continue getting
> + * tuples. For that it must stay in a valid state and must handle them somehow.
> + *
> + * Now the stream simply drops the current transaction like it never happened,
> + * even if its commit-row wasn't met yet. Should be good enough for
> + * force-recovery when the consistency is already out of the game.
> + */
>   static void
> -apply_wal_row(struct xstream *stream, struct xrow_header *row)
> +wal_stream_abort(struct wal_stream *stream)
> +{
> +	struct txn *tx = in_txn();
> +	if (tx != NULL)
> +		txn_rollback(tx);
> +	stream->tsn = 0;
> +	fiber_gc();
> +}
> +
> +/**
> + * The wrapper exists only for the debug purposes, to ensure tsn being non-0 is
> + * in sync with the fiber's txn being non-NULL. It has nothing to do with the
> + * journal content, and therefore can use assertions instead of rigorous error
> + * checking even in release.
> + */
> +static bool
> +wal_stream_has_tx(const struct wal_stream *stream)
> +{
> +	bool has = stream->tsn != 0;
> +	assert(has == (in_txn() != NULL));
> +	return has;
> +}
> +
> +static int
> +wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
> +{
> +	assert(iproto_type_is_synchro_request(row->type));
> +	if (wal_stream_has_tx(stream)) {
> +		diag_set(XlogError, "found synchro request in a transaction");
> +		return -1;
> +	}
> +	struct synchro_request syn_req;
> +	if (xrow_decode_synchro(row, &syn_req) != 0) {
> +		say_error("couldn't decode a synchro request");
> +		return -1;
> +	}
> +	txn_limbo_process(&txn_limbo, &syn_req);
> +	return 0;
> +}
> +
> +static int
> +wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
> +{
> +	assert(iproto_type_is_raft_request(row->type));
> +	if (wal_stream_has_tx(stream)) {
> +		diag_set(XlogError, "found raft request in a transaction");
> +		return -1;
> +	}
> +	struct raft_request raft_req;
> +	/* Vclock is never persisted in WAL by Raft. */
> +	if (xrow_decode_raft(row, &raft_req, NULL) != 0) {
> +		say_error("couldn't decode a raft request");
> +		return -1;
> +	}
> +	box_raft_recover(&raft_req);
> +	return 0;
> +}
> +
> +/**
> + * Rows of the same transaction are wrapped into begin/commit. Mostly for the
> + * sake of synchronous replication, when the log can contain rolled back
> + * transactions, which must be entirely reverted during recovery when ROLLBACK
> + * records are met. Row-by-row recovery wouldn't work for multi-statement
> + * synchronous transactions.
> + */
> +static int
> +wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>   {
>   	struct request request;
> -	if (iproto_type_is_synchro_request(row->type)) {
> -		struct synchro_request syn_req;
> -		if (xrow_decode_synchro(row, &syn_req) != 0)
> -			diag_raise();
> -		txn_limbo_process(&txn_limbo, &syn_req);
> -		return;
> +	uint64_t req_type = dml_request_key_map(row->type);
> +	if (xrow_decode_dml(row, &request, req_type) != 0) {
> +		say_error("couldn't decode a DML request");
> +		return -1;
>   	}
> -	if (iproto_type_is_raft_request(row->type)) {
> -		struct raft_request raft_req;
> -		/* Vclock is never persisted in WAL by Raft. */
> -		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
> -			diag_raise();
> -		box_raft_recover(&raft_req);
> -		return;
> +	/*
> +	 * Note that all the information which came from the log is validated
> +	 * and the errors are handled, not asserted or panicked on. That is for
> +	 * the sake of force recovery, which must be able to recover everything
> +	 * that is possible instead of terminating the instance.
> +	 */
> +	struct txn *txn;
> +	if (stream->tsn == 0) {
> +		if (row->tsn == 0) {
> +			diag_set(XlogError, "found a row without TSN");
> +			goto end_diag_request;
> +		}
> +		stream->tsn = row->tsn;
> +		stream->first_row_lsn = row->lsn;
> +		stream->has_global_row = false;
> +		/*
> +		 * Rows are not stacked into a list like during replication,
> +		 * because recovery does not yield while reading the rows. All
> +		 * the yields are controlled by the stream, and therefore no
> +		 * need to wait for all the rows to start a transaction. Can
> +		 * start now, apply the rows, and make a yield after commit if
> +		 * necessary. Helps to avoid a lot of copying.
> +		 */
> +		txn = txn_begin();
> +		if (txn == NULL) {
> +			say_error("couldn't begin a recovery transaction");
> +			return -1;
> +		}
> +	} else if (row->tsn != stream->tsn) {
> +		diag_set(XlogError, "found a next transaction with the "
> +			 "previous one not yet committed");
> +		goto end_diag_request;
> +	} else {
> +		txn = in_txn();
> +	}
> +	/* Ensure TSN is equal to LSN of the first global row. */
> +	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
> +		if (row->tsn != row->lsn) {
> +			diag_set(XlogError, "found a first global row in a "
> +				 "transaction with LSN/TSN mismatch");
> +			goto end_diag_request;
> +		}
> +		stream->has_global_row = true;
>   	}
> -	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> +	assert(wal_stream_has_tx(stream));
> +	/* Nops might appear at least after before_replace skipping rows. */
>   	if (request.type != IPROTO_NOP) {
> -		struct space *space = space_cache_find_xc(request.space_id);
> +		struct space *space = space_cache_find(request.space_id);
> +		if (space == NULL) {
> +			say_error("couldn't find space by ID");
> +			goto end_diag_request;
> +		}
>   		if (box_process_rw(&request, space, NULL) != 0) {
> -			say_error("error applying row: %s", request_str(&request));
> -			diag_raise();
> +			say_error("couldn't apply the request");
> +			goto end_diag_request;
>   		}
>   	}
> -	struct wal_stream *xstream =
> -		container_of(stream, struct wal_stream, base);
> -	/**
> -	 * Yield once in a while, but not too often,
> -	 * mostly to allow signal handling to take place.
> +	assert(txn != NULL);
> +	if (!row->is_commit)
> +		return 0;
> +	/*
> +	 * For fully local transactions the TSN check won't work like for global
> +	 * transactions, because it is not known if there are global rows until
> +	 * commit arrives.
>   	 */
> -	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
> -		fiber_sleep(0);
> +	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
> +		diag_set(XlogError, "fully local transaction's TSN does not "
> +			 "match LSN of the first row");
> +		return -1;
> +	}
> +	stream->tsn = 0;
> +	/*
> +	 * During local recovery the commit procedure should be async, otherwise
> +	 * the only fiber processing recovery will get stuck on the first
> +	 * synchronous tx it meets until confirm timeout is reached and the tx
> +	 * is rolled back, yielding an error.
> +	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
> +	 * recovery, since journal_write is faked at this stage and returns
> +	 * immediately.
> +	 */
> +	if (txn_commit_try_async(txn) != 0) {
> +		/* Commit fail automatically leads to rollback. */
> +		assert(in_txn() == NULL);
> +		say_error("couldn't commit a recovery transaction");
> +		return -1;
> +	}
> +	assert(in_txn() == NULL);
> +	fiber_gc();
> +	return 0;
> +
> +end_diag_request:
> +	/*
> +	 * The label must be used only for the errors related directly to the
> +	 * request. Errors like a txn_begin() failure have nothing to do with
> +	 * it, and therefore must not log the request as the fault reason.
> +	 */
> +	say_error("error at request: %s", request_str(&request));
> +	return -1;
> +}
> +
> +/**
> + * Yield once in a while, but not too often, mostly to allow signal handling to
> + * take place.
> + */
> +static void
> +wal_stream_try_yield(struct wal_stream *stream)
> +{
> +	/*
> +	 * Save the yield. Otherwise it would happen only on rows which
> +	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> +	 * transaction, which is probably a very rare coincidence.
> +	 */
> +	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
> +	if (wal_stream_has_tx(stream) || !stream->has_yield)
> +		return;
> +	stream->has_yield = false;
> +	fiber_sleep(0);
> +}
> +
> +static void
> +wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
> +{
> +	struct wal_stream *stream =
> +		container_of(base, struct wal_stream, base);
> +	/*
> +	 * Account for all rows, even non-DML ones and even those leading to an
> +	 * error, because the stream still needs to yield sometimes.
> +	 */
> +	++stream->rows;
> +	if (iproto_type_is_synchro_request(row->type)) {
> +		if (wal_stream_apply_synchro_row(stream, row) != 0)
> +			goto end_error;
> +	} else if (iproto_type_is_raft_request(row->type)) {
> +		if (wal_stream_apply_raft_row(stream, row) != 0)
> +			goto end_error;
> +	} else if (wal_stream_apply_dml_row(stream, row) != 0) {
> +		goto end_error;
> +	}
> +	wal_stream_try_yield(stream);
> +	return;
> +
> +end_error:
> +	wal_stream_abort(stream);
> +	wal_stream_try_yield(stream);
> +	diag_raise();
>   }
>   
>   static void
>   wal_stream_create(struct wal_stream *ctx)
>   {
> -	xstream_create(&ctx->base, apply_wal_row);
> +	xstream_create(&ctx->base, wal_stream_apply_row);
> +	ctx->tsn = 0;
> +	ctx->first_row_lsn = 0;
> +	ctx->has_yield = false;
> +	ctx->has_global_row = false;
>   	ctx->rows = 0;
>   }
>   
> @@ -2797,9 +3014,13 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   
>   	struct wal_stream wal_stream;
>   	wal_stream_create(&wal_stream);
> +	auto stream_guard = make_scoped_guard([&]{
> +		wal_stream_abort(&wal_stream);
> +	});
>   
>   	struct recovery *recovery;
> -	recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
> +	bool is_force_recovery = cfg_geti("force_recovery");
> +	recovery = recovery_new(wal_dir(), is_force_recovery,
>   				checkpoint_vclock);
>   
>   	/*
> @@ -2861,6 +3082,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   
>   	engine_begin_final_recovery_xc();
>   	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
> +	if (wal_stream_has_tx(&wal_stream)) {
> +		wal_stream_abort(&wal_stream);
> +		diag_set(XlogError, "found a not finished transaction "
> +			 "in the log");
> +		if (!is_force_recovery)
> +			diag_raise();
> +		diag_log();
> +	}
>   	engine_end_recovery_xc();
>   	/*
>   	 * Leave hot standby mode, if any, only after
> @@ -2880,6 +3109,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   		}
>   		recovery_stop_local(recovery);
>   		recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
> +		if (wal_stream_has_tx(&wal_stream)) {
> +			wal_stream_abort(&wal_stream);
> +			diag_set(XlogError, "found a not finished transaction "
> +				 "in the log in hot standby mode");
> +			if (!is_force_recovery)
> +				diag_raise();
> +			diag_log();
> +		}
>   		/*
>   		 * Advance replica set vclock to reflect records
>   		 * applied in hot standby mode.
> @@ -2888,6 +3125,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   		box_listen();
>   		box_sync_replication(false);
>   	}
> +	stream_guard.is_active = false;
>   	recovery_finalize(recovery);
>   	is_local_recovery = false;
>   
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
> new file mode 100644
> index 000000000..73f903ca7
> --- /dev/null
> +++ b/test/replication/gh-5874-qsync-txn-recovery.result
> @@ -0,0 +1,162 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +--
> +-- gh-5874: synchronous transactions should be recovered as whole units, not row
> +-- by row. So as to be able to roll them back when ROLLBACK is recovered
> +-- afterwards.
> +--
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
> + | ---
> + | ...
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
> + | ---
> + | ...
> +box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
> + | ---
> + | ...
> +engine = test_run:get_cfg('engine')
> + | ---
> + | ...
> +async = box.schema.create_space('async', {engine = engine})
> + | ---
> + | ...
> +_ = async:create_index('pk')
> + | ---
> + | ...
> +sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
> + | ---
> + | ...
> +_ = sync:create_index('pk')
> + | ---
> + | ...
> +
> +-- The transaction fails, but is written to the log anyway.
> +box.begin() async:insert{1} sync:insert{1} box.commit()
> + | ---
> + | - error: Quorum collection for a synchronous transaction is timed out
> + | ...
> +-- Ok, the previous txn is rolled back.
> +_ = async:insert{1}
> + | ---
> + | ...
> +box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +_ = sync:insert{1}
> + | ---
> + | ...
> +-- Try multi-statement sync txn to see how it recovers.
> +box.begin() sync:insert{2} sync:insert{3} box.commit()
> + | ---
> + | ...
> +
> +-- See if NOP multi-statement recovery works fine.
> +--
> +-- Start with NOP.
> +do_skip = false
> + | ---
> + | ...
> +_ = async:before_replace(function()                                             \
> +    if do_skip then                                                             \
> +        return nil                                                              \
> +    end                                                                         \
> +end)
> + | ---
> + | ...
> +box.begin()                                                                     \
> +do_skip = true                                                                  \
> +async:replace{2}                                                                \
> +do_skip = false                                                                 \
> +async:replace{3}                                                                \
> +box.commit()
> + | ---
> + | ...
> +
> +-- NOP in the middle.
> +box.begin()                                                                     \
> +async:replace{4}                                                                \
> +do_skip = true                                                                  \
> +async:replace{5}                                                                \
> +do_skip = false                                                                 \
> +async:replace{6}                                                                \
> +box.commit()
> + | ---
> + | ...
> +
> +-- All NOP.
> +box.begin()                                                                     \
> +do_skip = true                                                                  \
> +async:replace{7}                                                                \
> +async:replace{8}                                                                \
> +do_skip = false                                                                 \
> +box.commit()
> + | ---
> + | ...
> +
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> + | ---
> + | ...
> +_ = loc:create_index('pk')
> + | ---
> + | ...
> +box.begin()                                                                     \
> +loc:replace{1}                                                                  \
> +async:replace{9}                                                                \
> +box.commit()
> + | ---
> + | ...
> +
> +-- All local.
> +box.begin()                                                                     \
> +loc:replace{2}                                                                  \
> +loc:replace{3}                                                                  \
> +box.commit()
> + | ---
> + | ...
> +
> +test_run:cmd('restart server default')
> + |
> +async = box.space.async
> + | ---
> + | ...
> +sync = box.space.sync
> + | ---
> + | ...
> +loc = box.space.loc
> + | ---
> + | ...
> +async:select()
> + | ---
> + | - - [1]
> + |   - [3]
> + |   - [4]
> + |   - [6]
> + |   - [9]
> + | ...
> +sync:select()
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + | ...
> +loc:select()
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + | ...
> +async:drop()
> + | ---
> + | ...
> +sync:drop()
> + | ---
> + | ...
> +loc:drop()
> + | ---
> + | ...
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> new file mode 100644
> index 000000000..f35eb68de
> --- /dev/null
> +++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> @@ -0,0 +1,84 @@
> +test_run = require('test_run').new()
> +--
> +-- gh-5874: synchronous transactions should be recovered as whole units, not row
> +-- by row. So as to be able to roll them back when ROLLBACK is recovered
> +-- afterwards.
> +--
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
> +box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
> +engine = test_run:get_cfg('engine')
> +async = box.schema.create_space('async', {engine = engine})
> +_ = async:create_index('pk')
> +sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
> +_ = sync:create_index('pk')
> +
> +-- The transaction fails, but is written to the log anyway.
> +box.begin() async:insert{1} sync:insert{1} box.commit()
> +-- Ok, the previous txn is rolled back.
> +_ = async:insert{1}
> +box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
> +_ = sync:insert{1}
> +-- Try multi-statement sync txn to see how it recovers.
> +box.begin() sync:insert{2} sync:insert{3} box.commit()
> +
> +-- See if NOP multi-statement recovery works fine.
> +--
> +-- Start with NOP.
> +do_skip = false
> +_ = async:before_replace(function()                                             \
> +    if do_skip then                                                             \
> +        return nil                                                              \
> +    end                                                                         \
> +end)
> +box.begin()                                                                     \
> +do_skip = true                                                                  \
> +async:replace{2}                                                                \
> +do_skip = false                                                                 \
> +async:replace{3}                                                                \
> +box.commit()
> +
> +-- NOP in the middle.
> +box.begin()                                                                     \
> +async:replace{4}                                                                \
> +do_skip = true                                                                  \
> +async:replace{5}                                                                \
> +do_skip = false                                                                 \
> +async:replace{6}                                                                \
> +box.commit()
> +
> +-- All NOP.
> +box.begin()                                                                     \
> +do_skip = true                                                                  \
> +async:replace{7}                                                                \
> +async:replace{8}                                                                \
> +do_skip = false                                                                 \
> +box.commit()
> +
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> +_ = loc:create_index('pk')
> +box.begin()                                                                     \
> +loc:replace{1}                                                                  \
> +async:replace{9}                                                                \
> +box.commit()
> +
> +-- All local.
> +box.begin()                                                                     \
> +loc:replace{2}                                                                  \
> +loc:replace{3}                                                                  \
> +box.commit()
> +
> +test_run:cmd('restart server default')
> +async = box.space.async
> +sync = box.space.sync
> +loc = box.space.loc
> +async:select()
> +sync:select()
> +loc:select()
> +async:drop()
> +sync:drop()
> +loc:drop()

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list