[Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config

Leonid Vasiliev lvasiliev at tarantool.org
Fri Jul 10 00:40:55 MSK 2020


Hi! Thank you for the patch.
See some questions below.

On 07.07.2020 01:43, Vladislav Shpilevoy wrote:
> Synchronous replication options - replication_synchro_quorum and
> replication_synchro_timeout - were not updated for the existing
> transactions on change. As a result, there could be weird
> inconsistencies, when a new transaction could have required quorum
> smaller than a previous transaction's, and could implicitly
> confirm it. The same could be told about rollback on timeout - new
> transactions could wake up earlier than older transactions.
> 
> This patch makes configuration dynamic. So if the mentioned
> options are updated, they are applied to the existing transactions
> too.
> 
> It opens wide administrative capabilities. For example, when
> replica count becomes less than the quorum, an administrator can
> lower the quorum dynamically, and it will be applied to all the
> existing transactions.
> 
> Closes #5119
> ---
> Branch: http://github.com/tarantool/tarantool/tree/gh-4842-sync-replication
> Issue 1: https://github.com/tarantool/tarantool/issues/4842
> Issue 2: https://github.com/tarantool/tarantool/issues/5119
> 
>   src/box/box.cc                        |   2 +
>   src/box/txn_limbo.c                   | 162 +++++++++++++++++---------
>   src/box/txn_limbo.h                   |  10 ++
>   test/replication/qsync_basic.result   |  90 ++++++++++++++
>   test/replication/qsync_basic.test.lua |  32 +++++
>   5 files changed, 242 insertions(+), 54 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 5e28276f0..e15ae0e44 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -865,6 +865,7 @@ box_set_replication_synchro_quorum(void)
>   	if (value < 0)
>   		return -1;
>   	replication_synchro_quorum = value;
> +	txn_limbo_on_parameters_change(&txn_limbo);
>   	return 0;
>   }
>   
> @@ -875,6 +876,7 @@ box_set_replication_synchro_timeout(void)
>   	if (value < 0)
>   		return -1;
>   	replication_synchro_timeout = value;
> +	txn_limbo_on_parameters_change(&txn_limbo);
>   	return 0;
>   }
>   
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index e28e2016f..2575f4c25 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -39,6 +39,7 @@ txn_limbo_create(struct txn_limbo *limbo)
>   {
>   	rlist_create(&limbo->queue);
>   	limbo->instance_id = REPLICA_ID_NIL;
> +	fiber_cond_create(&limbo->wait_cond);
>   	vclock_create(&limbo->vclock);
>   	limbo->rollback_count = 0;
>   }
> @@ -159,45 +160,56 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   
>   	assert(!txn_has_flag(txn, TXN_IS_DONE));
>   	assert(txn_has_flag(txn, TXN_WAIT_SYNC));
> -	bool cancellable = fiber_set_cancellable(false);
> -	bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
> -	fiber_set_cancellable(cancellable);
> -	if (timed_out) {
> -		assert(!txn_limbo_is_empty(limbo));
> -		if (txn_limbo_first_entry(limbo) != entry) {
> -			/*
> -			 * If this is not a first entry in the
> -			 * limbo, it is definitely not a first
> -			 * timed out entry. And since it managed
> -			 * to time out too, it means there is
> -			 * currently another fiber writing
> -			 * rollback. Wait when it will finish and
> -			 * wake us up.
> -			 */
> -			bool cancellable = fiber_set_cancellable(false);
> -			fiber_yield();
> -			fiber_set_cancellable(cancellable);
> -			assert(txn_limbo_entry_is_complete(entry));
> +	double start_time = fiber_clock();
> +	while (true) {
> +		double deadline = start_time + txn_limbo_confirm_timeout(limbo);
> +		bool cancellable = fiber_set_cancellable(false);
> +		double timeout = deadline - fiber_clock();

Why not just timeout = txn_limbo_confirm_timeout(limbo)?
It looks like
fiber_clock()(old) + txn_limbo_confirm_timeout(limbo) -
fiber_clock()(new) ~= txn_limbo_confirm_timeout(limbo)

> +		bool timed_out = fiber_cond_wait_timeout(&limbo->wait_cond,
> +							 timeout);
> +		fiber_set_cancellable(cancellable);
> +		if (txn_limbo_entry_is_complete(entry))
>   			goto complete;
> -		}
> +		if (timed_out)
> +			goto do_rollback;
> +	}
>   
> -		txn_limbo_write_rollback(limbo, entry->lsn);
> -		struct txn_limbo_entry *e, *tmp;
> -		rlist_foreach_entry_safe_reverse(e, &limbo->queue,
> -						 in_queue, tmp) {
> -			e->is_rollback = true;
> -			e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
> -			txn_limbo_pop(limbo, e);
> -			txn_clear_flag(e->txn, TXN_WAIT_SYNC);
> -			txn_clear_flag(e->txn, TXN_WAIT_ACK);
> -			txn_complete(e->txn);
> -			if (e == entry)
> -				break;
> -			fiber_wakeup(e->txn->fiber);
> -		}
> -		diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> -		return -1;
> +do_rollback:
> +	assert(!txn_limbo_is_empty(limbo));
> +	if (txn_limbo_first_entry(limbo) != entry) {
> +		/*
> +		 * If this is not a first entry in the limbo, it
> +		 * is definitely not a first timed out entry. And
> +		 * since it managed to time out too, it means
> +		 * there is currently another fiber writing
> +		 * rollback. Wait when it will finish and wake us
> +		 * up.
> +		 */
> +		bool cancellable = fiber_set_cancellable(false);
> +		do {
> +			fiber_yield();
> +		} while (!txn_limbo_entry_is_complete(entry));
> +		fiber_set_cancellable(cancellable);
> +		goto complete;
>   	}
> +
> +	txn_limbo_write_rollback(limbo, entry->lsn);
> +	struct txn_limbo_entry *e, *tmp;
> +	rlist_foreach_entry_safe_reverse(e, &limbo->queue,
> +					 in_queue, tmp) {
> +		e->is_rollback = true;
> +		e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
> +		txn_limbo_pop(limbo, e);
> +		txn_clear_flag(e->txn, TXN_WAIT_SYNC);
> +		txn_clear_flag(e->txn, TXN_WAIT_ACK);
> +		txn_complete(e->txn);
> +		if (e == entry)
> +			break;
> +		fiber_wakeup(e->txn->fiber);
> +	}
> +	diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> +	return -1;
> +
>   complete:
>   	assert(txn_limbo_entry_is_complete(entry));
>   	/*
> @@ -421,15 +433,13 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo)
>    * or array instead of the boolean.
>    */
>   struct confirm_waitpoint {
> -	/**
> -	 * Variable for wake up the fiber that is waiting for
> -	 * the end of confirmation.
> -	 */
> -	struct fiber_cond confirm_cond;
> +	/** Fiber that is waiting for the end of confirmation. */
> +	struct fiber *caller;
>   	/**
>   	 * Result flag.
>   	 */
>   	bool is_confirm;
> +	bool is_rollback;
>   };
>   
>   static int
> @@ -439,7 +449,7 @@ txn_commit_cb(struct trigger *trigger, void *event)
>   	struct confirm_waitpoint *cwp =
>   		(struct confirm_waitpoint *)trigger->data;
>   	cwp->is_confirm = true;
> -	fiber_cond_signal(&cwp->confirm_cond);
> +	fiber_wakeup(cwp->caller);
>   	return 0;
>   }
>   
> @@ -449,7 +459,8 @@ txn_rollback_cb(struct trigger *trigger, void *event)
>   	(void)event;
>   	struct confirm_waitpoint *cwp =
>   		(struct confirm_waitpoint *)trigger->data;
> -	fiber_cond_signal(&cwp->confirm_cond);
> +	cwp->is_rollback = true;
> +	fiber_wakeup(cwp->caller);
>   	return 0;
>   }
>   
> @@ -461,8 +472,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
>   
>   	/* initialization of a waitpoint. */
>   	struct confirm_waitpoint cwp;
> -	fiber_cond_create(&cwp.confirm_cond);
> +	cwp.caller = fiber();
>   	cwp.is_confirm = false;
> +	cwp.is_rollback = false;
>   
>   	/* Set triggers for the last limbo transaction. */
>   	struct trigger on_complete;
> @@ -472,17 +484,26 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
>   	struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo);
>   	txn_on_commit(tle->txn, &on_complete);
>   	txn_on_rollback(tle->txn, &on_rollback);
> -
> -	int rc = fiber_cond_wait_timeout(&cwp.confirm_cond,
> -					 txn_limbo_confirm_timeout(limbo));
> -	fiber_cond_destroy(&cwp.confirm_cond);
> -	if (rc != 0) {
> -		/* Clear the triggers if the timeout has been reached. */
> -		trigger_clear(&on_complete);
> -		trigger_clear(&on_rollback);
> -		diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> -		return -1;
> +	double start_time = fiber_clock();
> +	while (true) {
> +		double deadline = start_time + txn_limbo_confirm_timeout(limbo);
> +		bool cancellable = fiber_set_cancellable(false);
> +		double timeout = deadline - fiber_clock();

Maybe add a comment about the possible reasons for a wakeup
(reconfiguration, triggers, timeout).

> +		int rc = fiber_cond_wait_timeout(&limbo->wait_cond, timeout);

If I understand correctly, you use a trick to wake up from the triggers.
In this case fiber_wakeup is invoked manually, without using the fiber_cond
API. In this context, I have a few questions/comments:
- IMHO, it is not right to rely on the implementation details of fiber_cond
while bypassing the fiber_cond API.
- In this case, we don't interact with the waiters list inside the condition
variable. Are you sure this is ok?

> +		fiber_set_cancellable(cancellable);
> +		if (cwp.is_confirm || cwp.is_rollback)
> +			goto complete;
> +		if (rc != 0)
> +			goto timed_out;
>   	}
> +timed_out:
> +	/* Clear the triggers if the timeout has been reached. */
> +	trigger_clear(&on_complete);
> +	trigger_clear(&on_rollback);
> +	diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> +	return -1;
> +
> +complete:
>   	if (!cwp.is_confirm) {
>   		/* The transaction has been rolled back. */
>   		diag_set(ClientError, ER_SYNC_ROLLBACK);
> @@ -517,6 +538,39 @@ txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
>   	}
>   }
>   
> +void
> +txn_limbo_on_parameters_change(struct txn_limbo *limbo)
> +{
> +	if (rlist_empty(&limbo->queue))
> +		return;
> +	struct txn_limbo_entry *e;
> +	int64_t confirm_lsn = -1;
> +	rlist_foreach_entry(e, &limbo->queue, in_queue) {
> +		assert(e->ack_count <= VCLOCK_MAX);
> +		if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> +			assert(e->lsn == -1);
> +			if (confirm_lsn == -1)
> +				continue;
> +		} else if (e->ack_count < replication_synchro_quorum) {
> +			continue;
> +		} else {
> +			confirm_lsn = e->lsn;
> +			assert(confirm_lsn > 0);
> +		}
> +		e->is_commit = true;
> +	}
> +	if (confirm_lsn > 0 &&
> +	    txn_limbo_write_confirm(limbo, confirm_lsn) != 0) {
> +		panic("Couldn't write CONFIRM to WAL");
> +		return;
> +	}
> +	/*
> +	 * Wakeup all. Confirmed will be committed. Timed out will
> +	 * rollback.
> +	 */
> +	fiber_cond_broadcast(&limbo->wait_cond);
> +}
> +
>   void
>   txn_limbo_init(void)
>   {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 0d56d0d69..1ee416231 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -103,6 +103,13 @@ struct txn_limbo {
>   	 * LSNs in their vclock components.
>   	 */
>   	uint32_t instance_id;
> +	/**
> +	 * Condition to wait for completion. It is supposed to be
> +	 * signaled when the synchro parameters change. Allowing
> +	 * the sleeping fibers to reconsider their timeouts when
> +	 * the parameters are updated.
> +	 */
> +	struct fiber_cond wait_cond;
>   	/**
>   	 * All components of the vclock are versions of the limbo
>   	 * owner's LSN, how it is visible on other nodes. For
> @@ -219,6 +226,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo);
>   void
>   txn_limbo_force_empty(struct txn_limbo *limbo, int64_t last_confirm);
>   
> +void
> +txn_limbo_on_parameters_change(struct txn_limbo *limbo);
> +
>   void
>   txn_limbo_init();
>   
> diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result
> index 3e28607b0..59f5d9123 100644
> --- a/test/replication/qsync_basic.result
> +++ b/test/replication/qsync_basic.result
> @@ -475,6 +475,96 @@ box.space.sync:select{7}
>    | - - [7]
>    | ...
>   
> +--
> +-- gh-5119: dynamic limbo configuration. Updated parameters should
> +-- be applied even to existing transactions.
> +--
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +ok, err = nil
> + | ---
> + | ...
> +f = fiber.create(function()                                                     \
> +    ok, err = pcall(box.space.sync.insert, box.space.sync, {11})                \
> +end)
> + | ---
> + | ...
> +f:status()
> + | ---
> + | - suspended
> + | ...
> +box.cfg{replication_synchro_timeout = 0.001}
> + | ---
> + | ...
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> + | ---
> + | - true
> + | ...
> +ok, err
> + | ---
> + | - false
> + | - Quorum collection for a synchronous transaction is timed out
> + | ...
> +box.space.sync:select{11}
> + | ---
> + | - []
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{11}
> + | ---
> + | - []
> + | ...
> +
> +-- Test it is possible to early ACK a transaction with a new quorum.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +ok, err = nil
> + | ---
> + | ...
> +f = fiber.create(function()                                                     \
> +    ok, err = pcall(box.space.sync.insert, box.space.sync, {12})                \
> +end)
> + | ---
> + | ...
> +f:status()
> + | ---
> + | - suspended
> + | ...
> +box.cfg{replication_synchro_quorum = 2}
> + | ---
> + | ...
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> + | ---
> + | - true
> + | ...
> +ok, err
> + | ---
> + | - true
> + | - [12]
> + | ...
> +box.space.sync:select{12}
> + | ---
> + | - - [12]
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{12}
> + | ---
> + | - - [12]
> + | ...
> +
>   -- Cleanup.
>   test_run:cmd('switch default')
>    | ---
> diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua
> index 860d6d6c4..1bb3ba87d 100644
> --- a/test/replication/qsync_basic.test.lua
> +++ b/test/replication/qsync_basic.test.lua
> @@ -198,6 +198,38 @@ assert(newlsn >= oldlsn + 2)
>   test_run:switch('replica')
>   box.space.sync:select{7}
>   
> +--
> +-- gh-5119: dynamic limbo configuration. Updated parameters should
> +-- be applied even to existing transactions.
> +--
> +test_run:switch('default')
> +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
> +ok, err = nil
> +f = fiber.create(function()                                                     \
> +    ok, err = pcall(box.space.sync.insert, box.space.sync, {11})                \
> +end)
> +f:status()
> +box.cfg{replication_synchro_timeout = 0.001}
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> +ok, err
> +box.space.sync:select{11}
> +test_run:switch('replica')
> +box.space.sync:select{11}
> +
> +-- Test it is possible to early ACK a transaction with a new quorum.
> +test_run:switch('default')
> +ok, err = nil
> +f = fiber.create(function()                                                     \
> +    ok, err = pcall(box.space.sync.insert, box.space.sync, {12})                \
> +end)
> +f:status()
> +box.cfg{replication_synchro_quorum = 2}
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> +ok, err
> +box.space.sync:select{12}
> +test_run:switch('replica')
> +box.space.sync:select{12}
> +
>   -- Cleanup.
>   test_run:cmd('switch default')
>   
> 


More information about the Tarantool-patches mailing list