[Tarantool-patches] [PATCH 4/5] [tosquash] replication: rework how local transactions wait sync

Serge Petrenko sergepetrenko at tarantool.org
Sun Jul 5 12:04:05 MSK 2020


03.07.2020 02:40, Vladislav Shpilevoy wrote:
> There was a bug in how async transactions were blocked by a
> non-empty limbo. It affected fully local transactions: the
> problem is that they don't have an LSN in the needed vclock
> component - it is always 0. This means their limbo entry
> can't get a valid LSN by any means. Even a copy of the previous
> sync transaction's LSN won't work, because the latter may not
> be written to WAL yet.
>
> This patch makes async transactions always have lsn -1 in their
> limbo entry, because it does not matter anyway: the lsn is
> needed neither to collect ACKs nor to propagate the limbo's
> vclock.
>
> Now even a fully local transaction can be blocked by a pending
> sync transaction.
>
> Note: this does not cover the case when the transaction is not
> fully local but its last row is local.
Hi! Thanks for the patch! Looks good, with one comment below.
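
As an illustration of the note at the end of the commit message (a
transaction that is not fully local, but whose last row is), here is a
hypothetical sketch. It reuses the spaces created in the test below;
the tuple values are arbitrary:

    box.begin()
    box.space.sync:replace{100}        -- replicated row, needs ACKs
    box.space.locallocal:replace{100}  -- fully local row: lsn stays 0
                                       -- in the needed vclock component
    box.commit()

Here the last row is the local one, so rows[n_rows - 1]->lsn can't be
used as the limbo entry's LSN.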
> ---
>   src/box/txn.c                         |  18 ++--
>   src/box/txn_limbo.c                   |  60 +++++++------
>   test/replication/qsync_basic.result   | 120 ++++++++++++++++++++++++++
>   test/replication/qsync_basic.test.lua |  45 ++++++++++
>   4 files changed, 208 insertions(+), 35 deletions(-)
>
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 6c333cbed..fe0591197 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -688,15 +688,15 @@ txn_commit_async(struct txn *txn)
>   
>   		/* See txn_commit(). */
>   		uint32_t origin_id = req->rows[0]->replica_id;
> -		int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
>   		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
>   		if (limbo_entry == NULL) {
>   			txn_rollback(txn);
>   			return -1;
>   		}
> -		assert(lsn > 0);
> -		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
> -
> +		if (txn_has_flag(txn, TXN_WAIT_ACK)) {
> +			int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
> +			txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
> +		}
>   		/*
>   		 * Set a trigger to abort waiting for confirm on
>   		 * WAL write failure.
> @@ -779,10 +779,12 @@ txn_commit(struct txn *txn)
>   		return -1;
>   	}
>   	if (is_sync) {
> -		int64_t lsn = req->rows[req->n_rows - 1]->lsn;
> -		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
> -		/* Local WAL write is a first 'ACK'. */
> -		txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn);
> +		if (txn_has_flag(txn, TXN_WAIT_ACK)) {
> +			int64_t lsn = req->rows[req->n_rows - 1]->lsn;
> +			txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
> +			/* Local WAL write is a first 'ACK'. */
> +			txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn);
> +		}
>   		if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) {
>   			txn_free(txn);
>   			return -1;
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 387cfd337..44a0c7273 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -120,6 +120,7 @@ txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
>   	assert(limbo->instance_id != REPLICA_ID_NIL);
>   	assert(entry->lsn == -1);
>   	assert(lsn > 0);
> +	assert(txn_has_flag(entry->txn, TXN_WAIT_ACK));
>   	(void) limbo;
>   	entry->lsn = lsn;
>   }
> @@ -129,6 +130,12 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   {
>   	if (txn_limbo_entry_is_complete(entry))
>   		return true;
> +	/*
> +	 * Async transaction can't complete itself. It is always
> +	 * completed by a previous sync transaction.
> +	 */
> +	if (!txn_has_flag(entry->txn, TXN_WAIT_ACK))
> +		return false;
>   	struct vclock_iterator iter;
>   	vclock_iterator_init(&iter, &limbo->vclock);
>   	int ack_count = 0;
> @@ -142,14 +149,13 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   }
>   
>   static int
> -txn_limbo_write_rollback(struct txn_limbo *limbo,
> -			 struct txn_limbo_entry *entry);
> +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
>   
>   int
>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   {
>   	struct txn *txn = entry->txn;
> -	assert(entry->lsn > 0);
> +	assert(entry->lsn > 0 || !txn_has_flag(entry->txn, TXN_WAIT_ACK));
>   	assert(!txn_has_flag(txn, TXN_IS_DONE));
>   	assert(txn_has_flag(txn, TXN_WAIT_SYNC));
>   	if (txn_limbo_check_complete(limbo, entry))
> @@ -176,7 +182,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   			goto complete;
>   		}
>   
> -		txn_limbo_write_rollback(limbo, entry);
> +		txn_limbo_write_rollback(limbo, entry->lsn);
>   		struct txn_limbo_entry *e, *tmp;
>   		rlist_foreach_entry_safe_reverse(e, &limbo->queue,
>   						 in_queue, tmp) {
> @@ -210,10 +216,11 @@ complete:
>   }
>   
>   static int
> -txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
> -				 struct txn_limbo_entry *entry,
> +txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
>   				 bool is_confirm)
>   {
> +	assert(lsn > 0);
> +
>   	struct xrow_header row;
>   	struct request request = {
>   		.header = &row,
> @@ -221,14 +228,13 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
>   
>   	int res = 0;
>   	if (is_confirm) {
> -		res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
> +		res = xrow_encode_confirm(&row, limbo->instance_id, lsn);
>   	} else {
>   		/*
>   		 * This entry is the first to be rolled back, so
> -		 * the last "safe" lsn is entry->lsn - 1.
> +		 * the last "safe" lsn is lsn - 1.
>   		 */
> -		res = xrow_encode_rollback(&row, limbo->instance_id,
> -					   entry->lsn - 1);
> +		res = xrow_encode_rollback(&row, limbo->instance_id, lsn - 1);
>   	}
>   	if (res == -1)
>   		return -1;
> @@ -260,10 +266,9 @@ rollback:
>    * transactions waiting for confirmation may be finished.
>    */
>   static int
> -txn_limbo_write_confirm(struct txn_limbo *limbo,
> -			struct txn_limbo_entry *entry)
> +txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
>   {
> -	return txn_limbo_write_confirm_rollback(limbo, entry, true);
> +	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
>   }
>   
>   void
> @@ -300,14 +305,13 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
>   
>   /**
>    * Write a rollback message to WAL. After it's written
> - * all the tarnsactions following the current one and waiting
> + * all the transactions following the current one and waiting
>    * for confirmation must be rolled back.
>    */
>   static int
> -txn_limbo_write_rollback(struct txn_limbo *limbo,
> -			 struct txn_limbo_entry *entry)
> +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
>   {
> -	return txn_limbo_write_confirm_rollback(limbo, entry, false);
> +	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
>   }
>   
>   void
> @@ -316,7 +320,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
>   	assert(limbo->instance_id != REPLICA_ID_NIL);
>   	struct txn_limbo_entry *e, *tmp;
>   	rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) {
> -		if (e->lsn <= lsn)
> +		if (e->lsn <= lsn && txn_has_flag(e->txn, TXN_WAIT_ACK))
>   			break;

Are you rolling back the async transactions that precede the last sync
transaction to be rolled back? Why?
Shouldn't this condition stay the same?
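
To make the question concrete, consider a hypothetical queue
[s1 (sync, lsn = 5), a1 (async, lsn = -1), s2 (sync, lsn = 6)] and a
ROLLBACK for lsn = 5, i.e. s2 is the first entry to be rolled back.
The reverse iteration pops s2 (6 <= 5 is false), then reaches a1: the
old condition (e->lsn <= lsn) would break there and keep a1, while the
new one skips the break because a1 lacks TXN_WAIT_ACK. So a1 is rolled
back too, although it only follows the already safe s1.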

>   		e->is_rollback = true;
>   		txn_limbo_pop(limbo, e);
> @@ -350,31 +354,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>   	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
>   	vclock_follow(&limbo->vclock, replica_id, lsn);
>   	struct txn_limbo_entry *e, *last_quorum = NULL;
> +	int64_t confirm_lsn = -1;
>   	rlist_foreach_entry(e, &limbo->queue, in_queue) {
> +		assert(e->ack_count <= VCLOCK_MAX);
>   		if (e->lsn > lsn)
>   			break;
> -		if (e->lsn <= prev_lsn)
> -			continue;
> -		assert(e->ack_count <= VCLOCK_MAX);
>   		/*
>   		 * Sync transactions need to collect acks. Async
>   		 * transactions are automatically committed right
>   		 * after all the previous sync transactions are.
>   		 */
> -		if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> -			if (++e->ack_count < replication_synchro_quorum)
> -				continue;
> -		} else {
> -			assert(txn_has_flag(e->txn, TXN_WAIT_SYNC));
> +		if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> +			assert(e->lsn == -1);
>   			if (last_quorum == NULL)
>   				continue;
> +		} else if (e->lsn <= prev_lsn) {
> +			continue;
> +		} else if (++e->ack_count < replication_synchro_quorum) {
> +			continue;
> +		} else {
> +			confirm_lsn = e->lsn;
>   		}
>   		e->is_commit = true;
>   		last_quorum = e;
>   	}
>   	if (last_quorum == NULL)
>   		return;
> -	if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
> +	if (txn_limbo_write_confirm(limbo, confirm_lsn) != 0) {
>   		// TODO: what to do here?.
>   		// We already failed writing the CONFIRM
>   		// message. What are the chances we'll be
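
For the record, the new confirm_lsn variable is what makes the final
CONFIRM row possible when the tail of the committed range is async. A
hypothetical trace: with the queue [s1 (sync, lsn = 5), a1 (async,
lsn = -1)], quorum = 2 and an incoming ack for lsn 5, the loop
confirms s1 (confirm_lsn = 5), then commits a1 as well and leaves
last_quorum = a1. The CONFIRM must carry lsn 5, which
last_quorum->lsn (now -1) could no longer provide.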
> diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result
> index 32deb2ac3..339fc0e33 100644
> --- a/test/replication/qsync_basic.result
> +++ b/test/replication/qsync_basic.result
> @@ -299,6 +299,126 @@ box.space.sync:select{6}
>    | - []
>    | ...
>   
> +--
> +-- Fully local async transaction also waits for existing sync txn.
> +--
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_timeout = 1000, replication_synchro_quorum = 2}
> + | ---
> + | ...
> +_ = box.schema.create_space('locallocal', {is_local = true})
> + | ---
> + | ...
> +_ = _:create_index('pk')
> + | ---
> + | ...
> +-- Propagate local vclock to some insane value to ensure it won't
> +-- affect anything.
> +box.begin() for i = 1, 2000 do box.space.locallocal:replace{1} end box.commit()
> + | ---
> + | ...
> +do                                                                              \
> +    f1 = fiber.create(box.space.sync.replace, box.space.sync, {8})              \
> +    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {8})  \
> +    box.space.test:replace{8}                                                   \
> +end
> + | ---
> + | ...
> +f1:status()
> + | ---
> + | - dead
> + | ...
> +f2:status()
> + | ---
> + | - dead
> + | ...
> +box.space.sync:select{8}
> + | ---
> + | - - [8]
> + | ...
> +box.space.locallocal:select{8}
> + | ---
> + | - - [8]
> + | ...
> +box.space.test:select{8}
> + | ---
> + | - - [8]
> + | ...
> +
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{8}
> + | ---
> + | - - [8]
> + | ...
> +box.space.locallocal:select{8}
> + | ---
> + | - []
> + | ...
> +box.space.test:select{8}
> + | ---
> + | - - [8]
> + | ...
> +
> +-- Ensure sync rollback will affect all pending fully local async
> +-- transactions too.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
> + | ---
> + | ...
> +do                                                                              \
> +    f1 = fiber.create(box.space.sync.replace, box.space.sync, {9})              \
> +    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {9})  \
> +    box.space.test:replace{9}                                                   \
> +end
> + | ---
> + | - error: A rollback for a synchronous transaction is received
> + | ...
> +f1:status()
> + | ---
> + | - dead
> + | ...
> +f2:status()
> + | ---
> + | - dead
> + | ...
> +box.space.sync:select{9}
> + | ---
> + | - []
> + | ...
> +box.space.locallocal:select{9}
> + | ---
> + | - []
> + | ...
> +box.space.test:select{9}
> + | ---
> + | - []
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{9}
> + | ---
> + | - []
> + | ...
> +box.space.locallocal:select{9}
> + | ---
> + | - []
> + | ...
> +box.space.test:select{9}
> + | ---
> + | - []
> + | ...
> +
>   --
>   -- gh-5123: quorum 1 still should write CONFIRM.
>   --
> diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua
> index 361f22bc3..6e40131bf 100644
> --- a/test/replication/qsync_basic.test.lua
> +++ b/test/replication/qsync_basic.test.lua
> @@ -118,6 +118,51 @@ test_run:switch('replica')
>   box.space.test:select{6}
>   box.space.sync:select{6}
>   
> +--
> +-- Fully local async transaction also waits for existing sync txn.
> +--
> +test_run:switch('default')
> +box.cfg{replication_synchro_timeout = 1000, replication_synchro_quorum = 2}
> +_ = box.schema.create_space('locallocal', {is_local = true})
> +_ = _:create_index('pk')
> +-- Propagate local vclock to some insane value to ensure it won't
> +-- affect anything.
> +box.begin() for i = 1, 2000 do box.space.locallocal:replace{1} end box.commit()
> +do                                                                              \
> +    f1 = fiber.create(box.space.sync.replace, box.space.sync, {8})              \
> +    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {8})  \
> +    box.space.test:replace{8}                                                   \
> +end
> +f1:status()
> +f2:status()
> +box.space.sync:select{8}
> +box.space.locallocal:select{8}
> +box.space.test:select{8}
> +
> +test_run:switch('replica')
> +box.space.sync:select{8}
> +box.space.locallocal:select{8}
> +box.space.test:select{8}
> +
> +-- Ensure sync rollback will affect all pending fully local async
> +-- transactions too.
> +test_run:switch('default')
> +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
> +do                                                                              \
> +    f1 = fiber.create(box.space.sync.replace, box.space.sync, {9})              \
> +    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {9})  \
> +    box.space.test:replace{9}                                                   \
> +end
> +f1:status()
> +f2:status()
> +box.space.sync:select{9}
> +box.space.locallocal:select{9}
> +box.space.test:select{9}
> +test_run:switch('replica')
> +box.space.sync:select{9}
> +box.space.locallocal:select{9}
> +box.space.test:select{9}
> +
>   --
>   -- gh-5123: quorum 1 still should write CONFIRM.
>   --

-- 
Serge Petrenko


