From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp5.mail.ru (smtp5.mail.ru [94.100.179.24]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 1ACDF445320 for ; Fri, 10 Jul 2020 00:40:57 +0300 (MSK) References: From: Leonid Vasiliev Message-ID: Date: Fri, 10 Jul 2020 00:40:55 +0300 MIME-Version: 1.0 In-Reply-To: Content-Type: text/plain; charset=utf-8; format=flowed Content-Language: en-US Content-Transfer-Encoding: 7bit Subject: Re: [Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Vladislav Shpilevoy , tarantool-patches@dev.tarantool.org Hi! Thank you for the patch. See some questions below. On 07.07.2020 01:43, Vladislav Shpilevoy wrote: > Synchronous replication options - replication_synchro_quorum and > replication_synchro_timeout - were not updated for the existing > transactions on change. As a result, there could be weird > inconsistencies, when a new transaction could have required quorum > smaller than a previous transaction's, and could implicitly > confirm it. The same could be told about rollback on timeout - new > transactions could wake up earlier than older transactions. > > This patch makes configuration dynamic. So if the mentioned > options are updated, they are applied to the existing transactions > too. > > It opens wide administrative capabilities. For example, when > replica count becomes less than the quorum, an administrator can > lower the quorum dynamically, and it will be applied to all the > existing transactions. 
> > Closes #5119 > --- > Branch: http://github.com/tarantool/tarantool/tree/gh-4842-sync-replication > Issue 1: https://github.com/tarantool/tarantool/issues/4842 > Issue 2: https://github.com/tarantool/tarantool/issues/5119 > > src/box/box.cc | 2 + > src/box/txn_limbo.c | 162 +++++++++++++++++--------- > src/box/txn_limbo.h | 10 ++ > test/replication/qsync_basic.result | 90 ++++++++++++++ > test/replication/qsync_basic.test.lua | 32 +++++ > 5 files changed, 242 insertions(+), 54 deletions(-) > > diff --git a/src/box/box.cc b/src/box/box.cc > index 5e28276f0..e15ae0e44 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -865,6 +865,7 @@ box_set_replication_synchro_quorum(void) > if (value < 0) > return -1; > replication_synchro_quorum = value; > + txn_limbo_on_parameters_change(&txn_limbo); > return 0; > } > > @@ -875,6 +876,7 @@ box_set_replication_synchro_timeout(void) > if (value < 0) > return -1; > replication_synchro_timeout = value; > + txn_limbo_on_parameters_change(&txn_limbo); > return 0; > } > > diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c > index e28e2016f..2575f4c25 100644 > --- a/src/box/txn_limbo.c > +++ b/src/box/txn_limbo.c > @@ -39,6 +39,7 @@ txn_limbo_create(struct txn_limbo *limbo) > { > rlist_create(&limbo->queue); > limbo->instance_id = REPLICA_ID_NIL; > + fiber_cond_create(&limbo->wait_cond); > vclock_create(&limbo->vclock); > limbo->rollback_count = 0; > } > @@ -159,45 +160,56 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) > > assert(!txn_has_flag(txn, TXN_IS_DONE)); > assert(txn_has_flag(txn, TXN_WAIT_SYNC)); > - bool cancellable = fiber_set_cancellable(false); > - bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo)); > - fiber_set_cancellable(cancellable); > - if (timed_out) { > - assert(!txn_limbo_is_empty(limbo)); > - if (txn_limbo_first_entry(limbo) != entry) { > - /* > - * If this is not a first entry in the > - * limbo, it is definitely not a first > - * timed out 
 entry. And since it managed > - * to time out too, it means there is > - * currently another fiber writing > - * rollback. Wait when it will finish and > - * wake us up. > - */ > - bool cancellable = fiber_set_cancellable(false); > - fiber_yield(); > - fiber_set_cancellable(cancellable); > - assert(txn_limbo_entry_is_complete(entry)); > + double start_time = fiber_clock(); > + while (true) { > + double deadline = start_time + txn_limbo_confirm_timeout(limbo); > + bool cancellable = fiber_set_cancellable(false); > + double timeout = deadline - fiber_clock(); Why not just timeout = txn_limbo_confirm_timeout(limbo)? It looks like fiber_clock()(old) + txn_limbo_confirm_timeout(limbo) - fiber_clock()(new) ~= txn_limbo_confirm_timeout(limbo). > + bool timed_out = fiber_cond_wait_timeout(&limbo->wait_cond, > + timeout); > + fiber_set_cancellable(cancellable); > + if (txn_limbo_entry_is_complete(entry)) > goto complete; > - } > + if (timed_out) > + goto do_rollback; > + } > > - txn_limbo_write_rollback(limbo, entry->lsn); > - struct txn_limbo_entry *e, *tmp; > - rlist_foreach_entry_safe_reverse(e, &limbo->queue, > - in_queue, tmp) { > - e->is_rollback = true; > - e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT; > - txn_limbo_pop(limbo, e); > - txn_clear_flag(e->txn, TXN_WAIT_SYNC); > - txn_clear_flag(e->txn, TXN_WAIT_ACK); > - txn_complete(e->txn); > - if (e == entry) > - break; > - fiber_wakeup(e->txn->fiber); > - } > - diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT); > - return -1; > +do_rollback: > + assert(!txn_limbo_is_empty(limbo)); > + if (txn_limbo_first_entry(limbo) != entry) { > + /* > + * If this is not a first entry in the limbo, it > + * is definitely not a first timed out entry. And > + * since it managed to time out too, it means > + * there is currently another fiber writing > + * rollback. Wait when it will finish and wake us > + * up.
> + */ > + bool cancellable = fiber_set_cancellable(false); > + do { > + fiber_yield(); > + } while (!txn_limbo_entry_is_complete(entry)); > + fiber_set_cancellable(cancellable); > + goto complete; > } > + > + txn_limbo_write_rollback(limbo, entry->lsn); > + struct txn_limbo_entry *e, *tmp; > + rlist_foreach_entry_safe_reverse(e, &limbo->queue, > + in_queue, tmp) { > + e->is_rollback = true; > + e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT; > + txn_limbo_pop(limbo, e); > + txn_clear_flag(e->txn, TXN_WAIT_SYNC); > + txn_clear_flag(e->txn, TXN_WAIT_ACK); > + txn_complete(e->txn); > + if (e == entry) > + break; > + fiber_wakeup(e->txn->fiber); > + } > + diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT); > + return -1; > + > complete: > assert(txn_limbo_entry_is_complete(entry)); > /* > @@ -421,15 +433,13 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo) > * or array instead of the boolean. > */ > struct confirm_waitpoint { > - /** > - * Variable for wake up the fiber that is waiting for > - * the end of confirmation. > - */ > - struct fiber_cond confirm_cond; > + /** Fiber that is waiting for the end of confirmation. */ > + struct fiber *caller; > /** > * Result flag. > */ > bool is_confirm; > + bool is_rollback; > }; > > static int > @@ -439,7 +449,7 @@ txn_commit_cb(struct trigger *trigger, void *event) > struct confirm_waitpoint *cwp = > (struct confirm_waitpoint *)trigger->data; > cwp->is_confirm = true; > - fiber_cond_signal(&cwp->confirm_cond); > + fiber_wakeup(cwp->caller); > return 0; > } > > @@ -449,7 +459,8 @@ txn_rollback_cb(struct trigger *trigger, void *event) > (void)event; > struct confirm_waitpoint *cwp = > (struct confirm_waitpoint *)trigger->data; > - fiber_cond_signal(&cwp->confirm_cond); > + cwp->is_rollback = true; > + fiber_wakeup(cwp->caller); > return 0; > } > > @@ -461,8 +472,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo) > > /* initialization of a waitpoint. 
 */ > struct confirm_waitpoint cwp; > - fiber_cond_create(&cwp.confirm_cond); > + cwp.caller = fiber(); > cwp.is_confirm = false; > + cwp.is_rollback = false; > > /* Set triggers for the last limbo transaction. */ > struct trigger on_complete; > @@ -472,17 +484,26 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo) > struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo); > txn_on_commit(tle->txn, &on_complete); > txn_on_rollback(tle->txn, &on_rollback); > - > - int rc = fiber_cond_wait_timeout(&cwp.confirm_cond, > - txn_limbo_confirm_timeout(limbo)); > - fiber_cond_destroy(&cwp.confirm_cond); > - if (rc != 0) { > - /* Clear the triggers if the timeout has been reached. */ > - trigger_clear(&on_complete); > - trigger_clear(&on_rollback); > - diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT); > - return -1; > + double start_time = fiber_clock(); > + while (true) { > + double deadline = start_time + txn_limbo_confirm_timeout(limbo); > + bool cancellable = fiber_set_cancellable(false); > + double timeout = deadline - fiber_clock(); Maybe add a comment about the possible reasons for a wakeup (reconfiguration, triggers, timeout). > + int rc = fiber_cond_wait_timeout(&limbo->wait_cond, timeout); If I understand correctly, you use a trick to wake up the fiber from the triggers: fiber_wakeup() is invoked manually, bypassing the fiber_cond API. In this context, I have a few questions/comments: - IMHO, this is not the right way: it relies on implementation details of fiber_cond instead of going through the fiber_cond API. - In this case, we don't interact with the waiters list inside the condition variable. Are you sure this is ok? > + fiber_set_cancellable(cancellable); > + if (cwp.is_confirm || cwp.is_rollback) > + goto complete; > + if (rc != 0) > + goto timed_out; > } > +timed_out: > + /* Clear the triggers if the timeout has been reached.
*/ > + trigger_clear(&on_complete); > + trigger_clear(&on_rollback); > + diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT); > + return -1; > + > +complete: > if (!cwp.is_confirm) { > /* The transaction has been rolled back. */ > diag_set(ClientError, ER_SYNC_ROLLBACK); > @@ -517,6 +538,39 @@ txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn) > } > } > > +void > +txn_limbo_on_parameters_change(struct txn_limbo *limbo) > +{ > + if (rlist_empty(&limbo->queue)) > + return; > + struct txn_limbo_entry *e; > + int64_t confirm_lsn = -1; > + rlist_foreach_entry(e, &limbo->queue, in_queue) { > + assert(e->ack_count <= VCLOCK_MAX); > + if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) { > + assert(e->lsn == -1); > + if (confirm_lsn == -1) > + continue; > + } else if (e->ack_count < replication_synchro_quorum) { > + continue; > + } else { > + confirm_lsn = e->lsn; > + assert(confirm_lsn > 0); > + } > + e->is_commit = true; > + } > + if (confirm_lsn > 0 && > + txn_limbo_write_confirm(limbo, confirm_lsn) != 0) { > + panic("Couldn't write CONFIRM to WAL"); > + return; > + } > + /* > + * Wakeup all. Confirmed will be committed. Timed out will > + * rollback. > + */ > + fiber_cond_broadcast(&limbo->wait_cond); > +} > + > void > txn_limbo_init(void) > { > diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h > index 0d56d0d69..1ee416231 100644 > --- a/src/box/txn_limbo.h > +++ b/src/box/txn_limbo.h > @@ -103,6 +103,13 @@ struct txn_limbo { > * LSNs in their vclock components. > */ > uint32_t instance_id; > + /** > + * Condition to wait for completion. It is supposed to be > + * signaled when the synchro parameters change. Allowing > + * the sleeping fibers to reconsider their timeouts when > + * the parameters are updated. > + */ > + struct fiber_cond wait_cond; > /** > * All components of the vclock are versions of the limbo > * owner's LSN, how it is visible on other nodes. 
For > @@ -219,6 +226,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo); > void > txn_limbo_force_empty(struct txn_limbo *limbo, int64_t last_confirm); > > +void > +txn_limbo_on_parameters_change(struct txn_limbo *limbo); > + > void > txn_limbo_init(); > > diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result > index 3e28607b0..59f5d9123 100644 > --- a/test/replication/qsync_basic.result > +++ b/test/replication/qsync_basic.result > @@ -475,6 +475,96 @@ box.space.sync:select{7} > | - - [7] > | ... > > +-- > +-- gh-5119: dynamic limbo configuration. Updated parameters should > +-- be applied even to existing transactions. > +-- > +test_run:switch('default') > + | --- > + | - true > + | ... > +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000} > + | --- > + | ... > +ok, err = nil > + | --- > + | ... > +f = fiber.create(function() \ > + ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \ > +end) > + | --- > + | ... > +f:status() > + | --- > + | - suspended > + | ... > +box.cfg{replication_synchro_timeout = 0.001} > + | --- > + | ... > +test_run:wait_cond(function() return f:status() == 'dead' end) > + | --- > + | - true > + | ... > +ok, err > + | --- > + | - false > + | - Quorum collection for a synchronous transaction is timed out > + | ... > +box.space.sync:select{11} > + | --- > + | - [] > + | ... > +test_run:switch('replica') > + | --- > + | - true > + | ... > +box.space.sync:select{11} > + | --- > + | - [] > + | ... > + > +-- Test it is possible to early ACK a transaction with a new quorum. > +test_run:switch('default') > + | --- > + | - true > + | ... > +ok, err = nil > + | --- > + | ... > +f = fiber.create(function() \ > + ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \ > +end) > + | --- > + | ... > +f:status() > + | --- > + | - suspended > + | ... > +box.cfg{replication_synchro_quorum = 2} > + | --- > + | ... 
> +test_run:wait_cond(function() return f:status() == 'dead' end) > + | --- > + | - true > + | ... > +ok, err > + | --- > + | - true > + | - [12] > + | ... > +box.space.sync:select{12} > + | --- > + | - - [12] > + | ... > +test_run:switch('replica') > + | --- > + | - true > + | ... > +box.space.sync:select{12} > + | --- > + | - - [12] > + | ... > + > -- Cleanup. > test_run:cmd('switch default') > | --- > diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua > index 860d6d6c4..1bb3ba87d 100644 > --- a/test/replication/qsync_basic.test.lua > +++ b/test/replication/qsync_basic.test.lua > @@ -198,6 +198,38 @@ assert(newlsn >= oldlsn + 2) > test_run:switch('replica') > box.space.sync:select{7} > > +-- > +-- gh-5119: dynamic limbo configuration. Updated parameters should > +-- be applied even to existing transactions. > +-- > +test_run:switch('default') > +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000} > +ok, err = nil > +f = fiber.create(function() \ > + ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \ > +end) > +f:status() > +box.cfg{replication_synchro_timeout = 0.001} > +test_run:wait_cond(function() return f:status() == 'dead' end) > +ok, err > +box.space.sync:select{11} > +test_run:switch('replica') > +box.space.sync:select{11} > + > +-- Test it is possible to early ACK a transaction with a new quorum. > +test_run:switch('default') > +ok, err = nil > +f = fiber.create(function() \ > + ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \ > +end) > +f:status() > +box.cfg{replication_synchro_quorum = 2} > +test_run:wait_cond(function() return f:status() == 'dead' end) > +ok, err > +box.space.sync:select{12} > +test_run:switch('replica') > +box.space.sync:select{12} > + > -- Cleanup. > test_run:cmd('switch default') > >