From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp55.i.mail.ru (smtp55.i.mail.ru [217.69.128.35]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 09D79445320 for ; Tue, 7 Jul 2020 01:43:18 +0300 (MSK) Received: by smtp55.i.mail.ru with esmtpa (envelope-from ) id 1jsZpV-0002VI-6H for tarantool-patches@dev.tarantool.org; Tue, 07 Jul 2020 01:43:17 +0300 From: Vladislav Shpilevoy Date: Tue, 7 Jul 2020 00:43:16 +0200 Message-Id: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Synchronous replication options - replication_synchro_quorum and replication_synchro_timeout - were not applied to the existing transactions when they were changed. As a result, there could be weird inconsistencies: a new transaction could require a smaller quorum than a previous transaction, and could implicitly confirm it. The same could be said about rollback on timeout - new transactions could wake up earlier than older transactions. This patch makes the configuration dynamic: if the mentioned options are updated, they are applied to the existing transactions too. This opens up wide administrative capabilities. For example, when the replica count becomes less than the quorum, an administrator can lower the quorum dynamically, and it will be applied to all the existing transactions. 
Closes #5119 --- Branch: http://github.com/tarantool/tarantool/tree/gh-4842-sync-replication Issue 1: https://github.com/tarantool/tarantool/issues/4842 Issue 2: https://github.com/tarantool/tarantool/issues/5119 src/box/box.cc | 2 + src/box/txn_limbo.c | 162 +++++++++++++++++--------- src/box/txn_limbo.h | 10 ++ test/replication/qsync_basic.result | 90 ++++++++++++++ test/replication/qsync_basic.test.lua | 32 +++++ 5 files changed, 242 insertions(+), 54 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 5e28276f0..e15ae0e44 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -865,6 +865,7 @@ box_set_replication_synchro_quorum(void) if (value < 0) return -1; replication_synchro_quorum = value; + txn_limbo_on_parameters_change(&txn_limbo); return 0; } @@ -875,6 +876,7 @@ box_set_replication_synchro_timeout(void) if (value < 0) return -1; replication_synchro_timeout = value; + txn_limbo_on_parameters_change(&txn_limbo); return 0; } diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index e28e2016f..2575f4c25 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -39,6 +39,7 @@ txn_limbo_create(struct txn_limbo *limbo) { rlist_create(&limbo->queue); limbo->instance_id = REPLICA_ID_NIL; + fiber_cond_create(&limbo->wait_cond); vclock_create(&limbo->vclock); limbo->rollback_count = 0; } @@ -159,45 +160,56 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) assert(!txn_has_flag(txn, TXN_IS_DONE)); assert(txn_has_flag(txn, TXN_WAIT_SYNC)); - bool cancellable = fiber_set_cancellable(false); - bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo)); - fiber_set_cancellable(cancellable); - if (timed_out) { - assert(!txn_limbo_is_empty(limbo)); - if (txn_limbo_first_entry(limbo) != entry) { - /* - * If this is not a first entry in the - * limbo, it is definitely not a first - * timed out entry. And since it managed - * to time out too, it means there is - * currently another fiber writing - * rollback. 
Wait when it will finish and - * wake us up. - */ - bool cancellable = fiber_set_cancellable(false); - fiber_yield(); - fiber_set_cancellable(cancellable); - assert(txn_limbo_entry_is_complete(entry)); + double start_time = fiber_clock(); + while (true) { + double deadline = start_time + txn_limbo_confirm_timeout(limbo); + bool cancellable = fiber_set_cancellable(false); + double timeout = deadline - fiber_clock(); + bool timed_out = fiber_cond_wait_timeout(&limbo->wait_cond, + timeout); + fiber_set_cancellable(cancellable); + if (txn_limbo_entry_is_complete(entry)) goto complete; - } + if (timed_out) + goto do_rollback; + } - txn_limbo_write_rollback(limbo, entry->lsn); - struct txn_limbo_entry *e, *tmp; - rlist_foreach_entry_safe_reverse(e, &limbo->queue, - in_queue, tmp) { - e->is_rollback = true; - e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT; - txn_limbo_pop(limbo, e); - txn_clear_flag(e->txn, TXN_WAIT_SYNC); - txn_clear_flag(e->txn, TXN_WAIT_ACK); - txn_complete(e->txn); - if (e == entry) - break; - fiber_wakeup(e->txn->fiber); - } - diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT); - return -1; +do_rollback: + assert(!txn_limbo_is_empty(limbo)); + if (txn_limbo_first_entry(limbo) != entry) { + /* + * If this is not a first entry in the limbo, it + * is definitely not a first timed out entry. And + * since it managed to time out too, it means + * there is currently another fiber writing + * rollback. Wait when it will finish and wake us + * up. 
+ */ + bool cancellable = fiber_set_cancellable(false); + do { + fiber_yield(); + } while (!txn_limbo_entry_is_complete(entry)); + fiber_set_cancellable(cancellable); + goto complete; } + + txn_limbo_write_rollback(limbo, entry->lsn); + struct txn_limbo_entry *e, *tmp; + rlist_foreach_entry_safe_reverse(e, &limbo->queue, + in_queue, tmp) { + e->is_rollback = true; + e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT; + txn_limbo_pop(limbo, e); + txn_clear_flag(e->txn, TXN_WAIT_SYNC); + txn_clear_flag(e->txn, TXN_WAIT_ACK); + txn_complete(e->txn); + if (e == entry) + break; + fiber_wakeup(e->txn->fiber); + } + diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT); + return -1; + complete: assert(txn_limbo_entry_is_complete(entry)); /* @@ -421,15 +433,13 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo) * or array instead of the boolean. */ struct confirm_waitpoint { - /** - * Variable for wake up the fiber that is waiting for - * the end of confirmation. - */ - struct fiber_cond confirm_cond; + /** Fiber that is waiting for the end of confirmation. */ + struct fiber *caller; /** * Result flag. */ bool is_confirm; + bool is_rollback; }; static int @@ -439,7 +449,7 @@ txn_commit_cb(struct trigger *trigger, void *event) struct confirm_waitpoint *cwp = (struct confirm_waitpoint *)trigger->data; cwp->is_confirm = true; - fiber_cond_signal(&cwp->confirm_cond); + fiber_wakeup(cwp->caller); return 0; } @@ -449,7 +459,8 @@ txn_rollback_cb(struct trigger *trigger, void *event) (void)event; struct confirm_waitpoint *cwp = (struct confirm_waitpoint *)trigger->data; - fiber_cond_signal(&cwp->confirm_cond); + cwp->is_rollback = true; + fiber_wakeup(cwp->caller); return 0; } @@ -461,8 +472,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo) /* initialization of a waitpoint. */ struct confirm_waitpoint cwp; - fiber_cond_create(&cwp.confirm_cond); + cwp.caller = fiber(); cwp.is_confirm = false; + cwp.is_rollback = false; /* Set triggers for the last limbo transaction. 
*/ struct trigger on_complete; @@ -472,17 +484,26 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo) struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo); txn_on_commit(tle->txn, &on_complete); txn_on_rollback(tle->txn, &on_rollback); - - int rc = fiber_cond_wait_timeout(&cwp.confirm_cond, - txn_limbo_confirm_timeout(limbo)); - fiber_cond_destroy(&cwp.confirm_cond); - if (rc != 0) { - /* Clear the triggers if the timeout has been reached. */ - trigger_clear(&on_complete); - trigger_clear(&on_rollback); - diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT); - return -1; + double start_time = fiber_clock(); + while (true) { + double deadline = start_time + txn_limbo_confirm_timeout(limbo); + bool cancellable = fiber_set_cancellable(false); + double timeout = deadline - fiber_clock(); + int rc = fiber_cond_wait_timeout(&limbo->wait_cond, timeout); + fiber_set_cancellable(cancellable); + if (cwp.is_confirm || cwp.is_rollback) + goto complete; + if (rc != 0) + goto timed_out; } +timed_out: + /* Clear the triggers if the timeout has been reached. */ + trigger_clear(&on_complete); + trigger_clear(&on_rollback); + diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT); + return -1; + +complete: if (!cwp.is_confirm) { /* The transaction has been rolled back. 
*/ diag_set(ClientError, ER_SYNC_ROLLBACK); @@ -517,6 +538,39 @@ txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn) } } +void +txn_limbo_on_parameters_change(struct txn_limbo *limbo) +{ + if (rlist_empty(&limbo->queue)) + return; + struct txn_limbo_entry *e; + int64_t confirm_lsn = -1; + rlist_foreach_entry(e, &limbo->queue, in_queue) { + assert(e->ack_count <= VCLOCK_MAX); + if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) { + assert(e->lsn == -1); + if (confirm_lsn == -1) + continue; + } else if (e->ack_count < replication_synchro_quorum) { + continue; + } else { + confirm_lsn = e->lsn; + assert(confirm_lsn > 0); + } + e->is_commit = true; + } + if (confirm_lsn > 0 && + txn_limbo_write_confirm(limbo, confirm_lsn) != 0) { + panic("Couldn't write CONFIRM to WAL"); + return; + } + /* + * Wakeup all. Confirmed will be committed. Timed out will + * rollback. + */ + fiber_cond_broadcast(&limbo->wait_cond); +} + void txn_limbo_init(void) { diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 0d56d0d69..1ee416231 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -103,6 +103,13 @@ struct txn_limbo { * LSNs in their vclock components. */ uint32_t instance_id; + /** + * Condition to wait for completion. It is supposed to be + * signaled when the synchro parameters change. Allowing + * the sleeping fibers to reconsider their timeouts when + * the parameters are updated. + */ + struct fiber_cond wait_cond; /** * All components of the vclock are versions of the limbo * owner's LSN, how it is visible on other nodes. 
For @@ -219,6 +226,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo); void txn_limbo_force_empty(struct txn_limbo *limbo, int64_t last_confirm); +void +txn_limbo_on_parameters_change(struct txn_limbo *limbo); + void txn_limbo_init(); diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result index 3e28607b0..59f5d9123 100644 --- a/test/replication/qsync_basic.result +++ b/test/replication/qsync_basic.result @@ -475,6 +475,96 @@ box.space.sync:select{7} | - - [7] | ... +-- +-- gh-5119: dynamic limbo configuration. Updated parameters should +-- be applied even to existing transactions. +-- +test_run:switch('default') + | --- + | - true + | ... +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000} + | --- + | ... +ok, err = nil + | --- + | ... +f = fiber.create(function() \ + ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \ +end) + | --- + | ... +f:status() + | --- + | - suspended + | ... +box.cfg{replication_synchro_timeout = 0.001} + | --- + | ... +test_run:wait_cond(function() return f:status() == 'dead' end) + | --- + | - true + | ... +ok, err + | --- + | - false + | - Quorum collection for a synchronous transaction is timed out + | ... +box.space.sync:select{11} + | --- + | - [] + | ... +test_run:switch('replica') + | --- + | - true + | ... +box.space.sync:select{11} + | --- + | - [] + | ... + +-- Test it is possible to early ACK a transaction with a new quorum. +test_run:switch('default') + | --- + | - true + | ... +ok, err = nil + | --- + | ... +f = fiber.create(function() \ + ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \ +end) + | --- + | ... +f:status() + | --- + | - suspended + | ... +box.cfg{replication_synchro_quorum = 2} + | --- + | ... +test_run:wait_cond(function() return f:status() == 'dead' end) + | --- + | - true + | ... +ok, err + | --- + | - true + | - [12] + | ... +box.space.sync:select{12} + | --- + | - - [12] + | ... 
+test_run:switch('replica') + | --- + | - true + | ... +box.space.sync:select{12} + | --- + | - - [12] + | ... + -- Cleanup. test_run:cmd('switch default') | --- diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua index 860d6d6c4..1bb3ba87d 100644 --- a/test/replication/qsync_basic.test.lua +++ b/test/replication/qsync_basic.test.lua @@ -198,6 +198,38 @@ assert(newlsn >= oldlsn + 2) test_run:switch('replica') box.space.sync:select{7} +-- +-- gh-5119: dynamic limbo configuration. Updated parameters should +-- be applied even to existing transactions. +-- +test_run:switch('default') +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000} +ok, err = nil +f = fiber.create(function() \ + ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \ +end) +f:status() +box.cfg{replication_synchro_timeout = 0.001} +test_run:wait_cond(function() return f:status() == 'dead' end) +ok, err +box.space.sync:select{11} +test_run:switch('replica') +box.space.sync:select{11} + +-- Test it is possible to early ACK a transaction with a new quorum. +test_run:switch('default') +ok, err = nil +f = fiber.create(function() \ + ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \ +end) +f:status() +box.cfg{replication_synchro_quorum = 2} +test_run:wait_cond(function() return f:status() == 'dead' end) +ok, err +box.space.sync:select{12} +test_run:switch('replica') +box.space.sync:select{12} + -- Cleanup. test_run:cmd('switch default') -- 2.21.1 (Apple Git-122.3)