[Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config
Leonid Vasiliev
lvasiliev at tarantool.org
Fri Jul 10 00:40:55 MSK 2020
Hi! Thank you for the patch.
See some questions below.
On 07.07.2020 01:43, Vladislav Shpilevoy wrote:
> Synchronous replication options - replication_synchro_quorum and
> replication_synchro_timeout - were not updated for the existing
> transactions on change. As a result, there could be weird
> inconsistencies, when a new transaction could have required quorum
> smaller than a previous transaction's, and could implicitly
> confirm it. The same could be said about rollback on timeout - new
> transactions could wake up earlier than older transactions.
>
> This patch makes configuration dynamic. So if the mentioned
> options are updated, they are applied to the existing transactions
> too.
>
> It opens wide administrative capabilities. For example, when
> replica count becomes less than the quorum, an administrator can
> lower the quorum dynamically, and it will be applied to all the
> existing transactions.
>
> Closes #5119
> ---
> Branch: http://github.com/tarantool/tarantool/tree/gh-4842-sync-replication
> Issue 1: https://github.com/tarantool/tarantool/issues/4842
> Issue 2: https://github.com/tarantool/tarantool/issues/5119
>
> src/box/box.cc | 2 +
> src/box/txn_limbo.c | 162 +++++++++++++++++---------
> src/box/txn_limbo.h | 10 ++
> test/replication/qsync_basic.result | 90 ++++++++++++++
> test/replication/qsync_basic.test.lua | 32 +++++
> 5 files changed, 242 insertions(+), 54 deletions(-)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 5e28276f0..e15ae0e44 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -865,6 +865,7 @@ box_set_replication_synchro_quorum(void)
> if (value < 0)
> return -1;
> replication_synchro_quorum = value;
> + txn_limbo_on_parameters_change(&txn_limbo);
> return 0;
> }
>
> @@ -875,6 +876,7 @@ box_set_replication_synchro_timeout(void)
> if (value < 0)
> return -1;
> replication_synchro_timeout = value;
> + txn_limbo_on_parameters_change(&txn_limbo);
> return 0;
> }
>
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index e28e2016f..2575f4c25 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -39,6 +39,7 @@ txn_limbo_create(struct txn_limbo *limbo)
> {
> rlist_create(&limbo->queue);
> limbo->instance_id = REPLICA_ID_NIL;
> + fiber_cond_create(&limbo->wait_cond);
> vclock_create(&limbo->vclock);
> limbo->rollback_count = 0;
> }
> @@ -159,45 +160,56 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>
> assert(!txn_has_flag(txn, TXN_IS_DONE));
> assert(txn_has_flag(txn, TXN_WAIT_SYNC));
> - bool cancellable = fiber_set_cancellable(false);
> - bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
> - fiber_set_cancellable(cancellable);
> - if (timed_out) {
> - assert(!txn_limbo_is_empty(limbo));
> - if (txn_limbo_first_entry(limbo) != entry) {
> - /*
> - * If this is not a first entry in the
> - * limbo, it is definitely not a first
> - * timed out entry. And since it managed
> - * to time out too, it means there is
> - * currently another fiber writing
> - * rollback. Wait when it will finish and
> - * wake us up.
> - */
> - bool cancellable = fiber_set_cancellable(false);
> - fiber_yield();
> - fiber_set_cancellable(cancellable);
> - assert(txn_limbo_entry_is_complete(entry));
> + double start_time = fiber_clock();
> + while (true) {
> + double deadline = start_time + txn_limbo_confirm_timeout(limbo);
> + bool cancellable = fiber_set_cancellable(false);
> + double timeout = deadline - fiber_clock();
Why not just timeout = txn_limbo_confirm_timeout(limbo)?
It looks like
fiber_clock()(old) + txn_limbo_confirm_timeout(limbo) -
fiber_clock()(new) ~= txn_limbo_confirm_timeout(limbo)
> + bool timed_out = fiber_cond_wait_timeout(&limbo->wait_cond,
> + timeout);
> + fiber_set_cancellable(cancellable);
> + if (txn_limbo_entry_is_complete(entry))
> goto complete;
> - }
> + if (timed_out)
> + goto do_rollback;
> + }
>
> - txn_limbo_write_rollback(limbo, entry->lsn);
> - struct txn_limbo_entry *e, *tmp;
> - rlist_foreach_entry_safe_reverse(e, &limbo->queue,
> - in_queue, tmp) {
> - e->is_rollback = true;
> - e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
> - txn_limbo_pop(limbo, e);
> - txn_clear_flag(e->txn, TXN_WAIT_SYNC);
> - txn_clear_flag(e->txn, TXN_WAIT_ACK);
> - txn_complete(e->txn);
> - if (e == entry)
> - break;
> - fiber_wakeup(e->txn->fiber);
> - }
> - diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> - return -1;
> +do_rollback:
> + assert(!txn_limbo_is_empty(limbo));
> + if (txn_limbo_first_entry(limbo) != entry) {
> + /*
> + * If this is not a first entry in the limbo, it
> + * is definitely not a first timed out entry. And
> + * since it managed to time out too, it means
> + * there is currently another fiber writing
> + * rollback. Wait when it will finish and wake us
> + * up.
> + */
> + bool cancellable = fiber_set_cancellable(false);
> + do {
> + fiber_yield();
> + } while (!txn_limbo_entry_is_complete(entry));
> + fiber_set_cancellable(cancellable);
> + goto complete;
> }
> +
> + txn_limbo_write_rollback(limbo, entry->lsn);
> + struct txn_limbo_entry *e, *tmp;
> + rlist_foreach_entry_safe_reverse(e, &limbo->queue,
> + in_queue, tmp) {
> + e->is_rollback = true;
> + e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
> + txn_limbo_pop(limbo, e);
> + txn_clear_flag(e->txn, TXN_WAIT_SYNC);
> + txn_clear_flag(e->txn, TXN_WAIT_ACK);
> + txn_complete(e->txn);
> + if (e == entry)
> + break;
> + fiber_wakeup(e->txn->fiber);
> + }
> + diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> + return -1;
> +
> complete:
> assert(txn_limbo_entry_is_complete(entry));
> /*
> @@ -421,15 +433,13 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo)
> * or array instead of the boolean.
> */
> struct confirm_waitpoint {
> - /**
> - * Variable for wake up the fiber that is waiting for
> - * the end of confirmation.
> - */
> - struct fiber_cond confirm_cond;
> + /** Fiber that is waiting for the end of confirmation. */
> + struct fiber *caller;
> /**
> * Result flag.
> */
> bool is_confirm;
> + bool is_rollback;
> };
>
> static int
> @@ -439,7 +449,7 @@ txn_commit_cb(struct trigger *trigger, void *event)
> struct confirm_waitpoint *cwp =
> (struct confirm_waitpoint *)trigger->data;
> cwp->is_confirm = true;
> - fiber_cond_signal(&cwp->confirm_cond);
> + fiber_wakeup(cwp->caller);
> return 0;
> }
>
> @@ -449,7 +459,8 @@ txn_rollback_cb(struct trigger *trigger, void *event)
> (void)event;
> struct confirm_waitpoint *cwp =
> (struct confirm_waitpoint *)trigger->data;
> - fiber_cond_signal(&cwp->confirm_cond);
> + cwp->is_rollback = true;
> + fiber_wakeup(cwp->caller);
> return 0;
> }
>
> @@ -461,8 +472,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
>
> /* initialization of a waitpoint. */
> struct confirm_waitpoint cwp;
> - fiber_cond_create(&cwp.confirm_cond);
> + cwp.caller = fiber();
> cwp.is_confirm = false;
> + cwp.is_rollback = false;
>
> /* Set triggers for the last limbo transaction. */
> struct trigger on_complete;
> @@ -472,17 +484,26 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
> struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo);
> txn_on_commit(tle->txn, &on_complete);
> txn_on_rollback(tle->txn, &on_rollback);
> -
> - int rc = fiber_cond_wait_timeout(&cwp.confirm_cond,
> - txn_limbo_confirm_timeout(limbo));
> - fiber_cond_destroy(&cwp.confirm_cond);
> - if (rc != 0) {
> - /* Clear the triggers if the timeout has been reached. */
> - trigger_clear(&on_complete);
> - trigger_clear(&on_rollback);
> - diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> - return -1;
> + double start_time = fiber_clock();
> + while (true) {
> + double deadline = start_time + txn_limbo_confirm_timeout(limbo);
> + bool cancellable = fiber_set_cancellable(false);
> + double timeout = deadline - fiber_clock();
Maybe add a comment about the possible reasons for waking up
(reconfiguration, triggers, timeout).
> + int rc = fiber_cond_wait_timeout(&limbo->wait_cond, timeout);
If I understand correctly, you use a trick to wake up from the triggers:
fiber_wakeup() is invoked manually, bypassing the fiber_cond API.
In this context, I have a few questions/comments:
- IMHO, relying on implementation details of fiber_cond while bypassing
the fiber_cond API is not the right way.
- In this case, we don't interact with the waiters list inside the
condition variable. Are you sure this is OK?
> + fiber_set_cancellable(cancellable);
> + if (cwp.is_confirm || cwp.is_rollback)
> + goto complete;
> + if (rc != 0)
> + goto timed_out;
> }
> +timed_out:
> + /* Clear the triggers if the timeout has been reached. */
> + trigger_clear(&on_complete);
> + trigger_clear(&on_rollback);
> + diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> + return -1;
> +
> +complete:
> if (!cwp.is_confirm) {
> /* The transaction has been rolled back. */
> diag_set(ClientError, ER_SYNC_ROLLBACK);
> @@ -517,6 +538,39 @@ txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
> }
> }
>
> +void
> +txn_limbo_on_parameters_change(struct txn_limbo *limbo)
> +{
> + if (rlist_empty(&limbo->queue))
> + return;
> + struct txn_limbo_entry *e;
> + int64_t confirm_lsn = -1;
> + rlist_foreach_entry(e, &limbo->queue, in_queue) {
> + assert(e->ack_count <= VCLOCK_MAX);
> + if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> + assert(e->lsn == -1);
> + if (confirm_lsn == -1)
> + continue;
> + } else if (e->ack_count < replication_synchro_quorum) {
> + continue;
> + } else {
> + confirm_lsn = e->lsn;
> + assert(confirm_lsn > 0);
> + }
> + e->is_commit = true;
> + }
> + if (confirm_lsn > 0 &&
> + txn_limbo_write_confirm(limbo, confirm_lsn) != 0) {
> + panic("Couldn't write CONFIRM to WAL");
> + return;
> + }
> + /*
> + * Wakeup all. Confirmed will be committed. Timed out will
> + * rollback.
> + */
> + fiber_cond_broadcast(&limbo->wait_cond);
> +}
> +
> void
> txn_limbo_init(void)
> {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 0d56d0d69..1ee416231 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -103,6 +103,13 @@ struct txn_limbo {
> * LSNs in their vclock components.
> */
> uint32_t instance_id;
> + /**
> + * Condition to wait for completion. It is supposed to be
> + * signaled when the synchro parameters change. Allowing
> + * the sleeping fibers to reconsider their timeouts when
> + * the parameters are updated.
> + */
> + struct fiber_cond wait_cond;
> /**
> * All components of the vclock are versions of the limbo
> * owner's LSN, how it is visible on other nodes. For
> @@ -219,6 +226,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo);
> void
> txn_limbo_force_empty(struct txn_limbo *limbo, int64_t last_confirm);
>
> +void
> +txn_limbo_on_parameters_change(struct txn_limbo *limbo);
> +
> void
> txn_limbo_init();
>
> diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result
> index 3e28607b0..59f5d9123 100644
> --- a/test/replication/qsync_basic.result
> +++ b/test/replication/qsync_basic.result
> @@ -475,6 +475,96 @@ box.space.sync:select{7}
> | - - [7]
> | ...
>
> +--
> +-- gh-5119: dynamic limbo configuration. Updated parameters should
> +-- be applied even to existing transactions.
> +--
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +ok, err = nil
> + | ---
> + | ...
> +f = fiber.create(function() \
> + ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \
> +end)
> + | ---
> + | ...
> +f:status()
> + | ---
> + | - suspended
> + | ...
> +box.cfg{replication_synchro_timeout = 0.001}
> + | ---
> + | ...
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> + | ---
> + | - true
> + | ...
> +ok, err
> + | ---
> + | - false
> + | - Quorum collection for a synchronous transaction is timed out
> + | ...
> +box.space.sync:select{11}
> + | ---
> + | - []
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{11}
> + | ---
> + | - []
> + | ...
> +
> +-- Test it is possible to early ACK a transaction with a new quorum.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +ok, err = nil
> + | ---
> + | ...
> +f = fiber.create(function() \
> + ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \
> +end)
> + | ---
> + | ...
> +f:status()
> + | ---
> + | - suspended
> + | ...
> +box.cfg{replication_synchro_quorum = 2}
> + | ---
> + | ...
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> + | ---
> + | - true
> + | ...
> +ok, err
> + | ---
> + | - true
> + | - [12]
> + | ...
> +box.space.sync:select{12}
> + | ---
> + | - - [12]
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{12}
> + | ---
> + | - - [12]
> + | ...
> +
> -- Cleanup.
> test_run:cmd('switch default')
> | ---
> diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua
> index 860d6d6c4..1bb3ba87d 100644
> --- a/test/replication/qsync_basic.test.lua
> +++ b/test/replication/qsync_basic.test.lua
> @@ -198,6 +198,38 @@ assert(newlsn >= oldlsn + 2)
> test_run:switch('replica')
> box.space.sync:select{7}
>
> +--
> +-- gh-5119: dynamic limbo configuration. Updated parameters should
> +-- be applied even to existing transactions.
> +--
> +test_run:switch('default')
> +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
> +ok, err = nil
> +f = fiber.create(function() \
> + ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \
> +end)
> +f:status()
> +box.cfg{replication_synchro_timeout = 0.001}
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> +ok, err
> +box.space.sync:select{11}
> +test_run:switch('replica')
> +box.space.sync:select{11}
> +
> +-- Test it is possible to early ACK a transaction with a new quorum.
> +test_run:switch('default')
> +ok, err = nil
> +f = fiber.create(function() \
> + ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \
> +end)
> +f:status()
> +box.cfg{replication_synchro_quorum = 2}
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> +ok, err
> +box.space.sync:select{12}
> +test_run:switch('replica')
> +box.space.sync:select{12}
> +
> -- Cleanup.
> test_run:cmd('switch default')
>
>
More information about the Tarantool-patches
mailing list