From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp36.i.mail.ru (smtp36.i.mail.ru [94.100.177.96]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id AAB0542EF6A for ; Tue, 30 Jun 2020 02:16:00 +0300 (MSK) From: Vladislav Shpilevoy Date: Tue, 30 Jun 2020 01:15:27 +0200 Message-Id: <678db6688752c63074d87eb987aee60114e038ae.1593472477.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v2 08/19] replication: add support of qsync to the snapshot machinery List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org From: Leonid Vasiliev To support qsync replication, the waiting for confirmation of current "sync" transactions during a timeout has been added to the snapshot machinery. In the case of rollback or the timeout expiration, the snapshot will be cancelled. 
Closes #4850 --- src/box/gc.c | 12 ++ src/box/txn_limbo.c | 81 ++++++++++ src/box/txn_limbo.h | 29 ++++ test/unit/CMakeLists.txt | 3 + test/unit/snap_quorum_delay.cc | 250 +++++++++++++++++++++++++++++ test/unit/snap_quorum_delay.result | 12 ++ 6 files changed, 387 insertions(+) create mode 100644 test/unit/snap_quorum_delay.cc create mode 100644 test/unit/snap_quorum_delay.result diff --git a/src/box/gc.c b/src/box/gc.c index 8e8ffea75..170c0a97f 100644 --- a/src/box/gc.c +++ b/src/box/gc.c @@ -57,6 +57,9 @@ #include "engine.h" /* engine_collect_garbage() */ #include "wal.h" /* wal_collect_garbage() */ #include "checkpoint_schedule.h" +#include "trigger.h" +#include "txn.h" +#include "txn_limbo.h" struct gc_state gc; @@ -395,6 +398,15 @@ gc_do_checkpoint(bool is_scheduled) rc = wal_begin_checkpoint(&checkpoint); if (rc != 0) goto out; + + /* + * Wait for all "sync" transactions to be confirmed before + * creating a snapshot. + */ + rc = txn_limbo_wait_confirm(&txn_limbo); + if (rc != 0) + goto out; + rc = engine_commit_checkpoint(&checkpoint.vclock); if (rc != 0) goto out; diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index b38d82e4f..bee8e8155 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -231,6 +231,87 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) } } +double +txn_limbo_confirm_timeout(struct txn_limbo *limbo) +{ + (void)limbo; + return replication_synchro_timeout; +} + +/** + * Waitpoint stores information about the progress of confirmation. + * In the case of multimaster support, it will store a bitset + * or array instead of the boolean. + */ +struct confirm_waitpoint { + /** + * Condition variable used to wake up the fiber waiting for + * the end of confirmation. + */ + struct fiber_cond confirm_cond; + /** + * Result flag. 
+ */ + bool is_confirm; +}; + +static int +txn_commit_cb(struct trigger *trigger, void *event) +{ + (void)event; + struct confirm_waitpoint *cwp = + (struct confirm_waitpoint *)trigger->data; + cwp->is_confirm = true; + fiber_cond_signal(&cwp->confirm_cond); + return 0; +} + +static int +txn_rollback_cb(struct trigger *trigger, void *event) +{ + (void)event; + struct confirm_waitpoint *cwp = + (struct confirm_waitpoint *)trigger->data; + fiber_cond_signal(&cwp->confirm_cond); + return 0; +} + +int +txn_limbo_wait_confirm(struct txn_limbo *limbo) +{ + if (txn_limbo_is_empty(limbo)) + return 0; + + /* initialization of a waitpoint. */ + struct confirm_waitpoint cwp; + fiber_cond_create(&cwp.confirm_cond); + cwp.is_confirm = false; + + /* Set triggers for the last limbo transaction. */ + struct trigger on_complete; + trigger_create(&on_complete, txn_commit_cb, &cwp, NULL); + struct trigger on_rollback; + trigger_create(&on_rollback, txn_rollback_cb, &cwp, NULL); + struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo); + txn_on_commit(tle->txn, &on_complete); + txn_on_rollback(tle->txn, &on_rollback); + + int rc = fiber_cond_wait_timeout(&cwp.confirm_cond, + txn_limbo_confirm_timeout(limbo)); + fiber_cond_destroy(&cwp.confirm_cond); + if (rc != 0) { + /* Clear the triggers if the timeout has been reached. */ + trigger_clear(&on_complete); + trigger_clear(&on_rollback); + return -1; + } + if (!cwp.is_confirm) { + /* The transaction has been rolled back. */ + return -1; + } + return 0; +} + void txn_limbo_init(void) { diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index de415cd97..94f224131 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -166,6 +166,35 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); void txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); +/** + * Return TRUE if limbo is empty. 
+ */ +static inline bool +txn_limbo_is_empty(struct txn_limbo *limbo) +{ + return rlist_empty(&limbo->queue); +} + +/** + * Return a pointer to the last txn_limbo_entry of limbo. + */ +static inline struct txn_limbo_entry * +txn_limbo_last_entry(struct txn_limbo *limbo) +{ + return rlist_last_entry(&limbo->queue, struct txn_limbo_entry, + in_queue); +} + +double +txn_limbo_confirm_timeout(struct txn_limbo *limbo); + +/** + * Waiting for confirmation of all "sync" transactions + * during confirm timeout or fail. + */ +int +txn_limbo_wait_confirm(struct txn_limbo *limbo); + void txn_limbo_init(); diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 672122118..419477748 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -257,6 +257,9 @@ target_link_libraries(swim_errinj.test unit swim) add_executable(merger.test merger.test.c) target_link_libraries(merger.test unit core box) +add_executable(snap_quorum_delay.test snap_quorum_delay.cc) +target_link_libraries(snap_quorum_delay.test box core unit) + # # Client for popen.test add_executable(popen-child popen-child.c) diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc new file mode 100644 index 000000000..7a200673a --- /dev/null +++ b/test/unit/snap_quorum_delay.cc @@ -0,0 +1,250 @@ +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and iproto forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in iproto form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "unit.h" +#include "gc.h" +#include "memory.h" +#include "txn.h" +#include "txn_limbo.h" + +/** + * This test is only about delay in snapshot machinery (needed + * for qsync replication). It doesn't test the snapshot + * machinery, txn_limbo or something else and uses some tricks + * around txn_limbo. + * The logic of the test is as follows: + * In fiber_1 ("txn_fiber"): + * - start a transaction. + * - push the transaction to the limbo. + * - start wait confirm (yield). + * In fiber_2 ("main"): + * - do a snapshot. + * - start wait while the last transaction + * from the limbo will be completed. + * In fiber_3 ("confirm_fiber"): + * - confirm the transaction (remove the transaction from + * the limbo and wakeup fiber_1). + * In fiber_1 ("txn_fiber"): + * - confirm/rollback/hang the transaction. + * In fiber_2 ("main"): + * - check_results + */ + +extern int replication_synchro_quorum; +extern double replication_synchro_timeout; + +namespace /* local symbols */ { + +int test_result; + +/** + * Variations of a transaction completion. 
+ */ +enum process_type { + TXN_PROCESS_COMMIT, + TXN_PROCESS_ROLLBACK, + TXN_PROCESS_TIMEOUT +}; + +/** + * Some fake values needed for work with the limbo + * (to push a transaction to the limbo and simulate confirm). + */ +const int fake_lsn = 1; +const int instace_id = 1; +const int relay_id = 2; + +int +trg_cb(struct trigger *trigger, void *event) +{ + (void)event; + bool *check_trg = (bool *)trigger->data; + *check_trg = true; + return 0; +} + +int +txn_process_func(va_list ap) +{ + bool *check_trg = va_arg(ap, bool *); + enum process_type process_type = (enum process_type)va_arg(ap, int); + struct txn *txn = txn_begin(); + txn->fiber = fiber(); + /* Set the TXN_WAIT_ACK flag to simulate a sync transaction.*/ + txn_set_flag(txn, TXN_WAIT_ACK); + /* + * The true way to push the transaction to limbo is to call + * txn_commit() for sync transaction. But, if txn_commit() + * will be called now, the transaction will not be pushed to + * the limbo because this is the case txn_commit_nop(). + * Instead, we push the transaction to the limbo manually + * and call txn_commit (or another) later. + */ + struct txn_limbo_entry *entry = txn_limbo_append(&txn_limbo, + instace_id, txn); + /* + * The trigger is used to verify that the transaction has been + * completed. 
+ */ + struct trigger trg; + trigger_create(&trg, trg_cb, check_trg, NULL); + + switch (process_type) { + case TXN_PROCESS_COMMIT: + txn_on_commit(txn, &trg); + break; + case TXN_PROCESS_ROLLBACK: + txn_on_rollback(txn, &trg); + break; + case TXN_PROCESS_TIMEOUT: + break; + default: + unreachable(); + } + + txn_limbo_assign_lsn(&txn_limbo, entry, fake_lsn); + txn_limbo_wait_complete(&txn_limbo, entry); + + switch (process_type) { + case TXN_PROCESS_COMMIT: + txn_commit(txn); + break; + case TXN_PROCESS_ROLLBACK: + txn_rollback(txn); + break; + case TXN_PROCESS_TIMEOUT: + fiber_yield(); + break; + default: + unreachable(); + } + return 0; +} + +int +txn_confirm_func(va_list ap) +{ + /* + * We shouldn't react on gc_wait_cleanup() yield + * inside gc_checkpoint(). + */ + fiber_sleep(0); + txn_limbo_ack(&txn_limbo, relay_id, fake_lsn); + return 0; +} + + +void +test_snap_delay_common(enum process_type process_type) +{ + plan(1); + + /* + * We need to clear the limbo vclock before the new test + * variation because the same fake lsn will be used. 
+ */ + vclock_clear(&txn_limbo.vclock); + vclock_create(&txn_limbo.vclock); + + bool check_trg = false; + struct fiber *txn_fiber = fiber_new("txn_fiber", txn_process_func); + fiber_start(txn_fiber, &check_trg, process_type); + + struct fiber *confirm_entry = fiber_new("confirm_fiber", + txn_confirm_func); + fiber_wakeup(confirm_entry); + + switch (process_type) { + case TXN_PROCESS_COMMIT: + ok(gc_checkpoint() == 0 && check_trg, + "check snapshot delay confirm"); + break; + case TXN_PROCESS_ROLLBACK: + ok(gc_checkpoint() == -1 && check_trg, + "check snapshot delay rollback"); + break; + case TXN_PROCESS_TIMEOUT: + ok(gc_checkpoint() == -1, "check snapshot delay timeout"); + /* join the "hung" fiber */ + fiber_set_joinable(txn_fiber, true); + fiber_cancel(txn_fiber); + fiber_join(txn_fiber); + break; + default: + unreachable(); + } + check_plan(); +} + +void +test_snap_delay_timeout() +{ + /* Set the timeout to a small value for the test. */ + replication_synchro_timeout = 0.01; + test_snap_delay_common(TXN_PROCESS_TIMEOUT); +} + +int +test_snap_delay(va_list ap) +{ + header(); + plan(3); + (void)ap; + replication_synchro_quorum = 2; + + test_snap_delay_common(TXN_PROCESS_COMMIT); + test_snap_delay_common(TXN_PROCESS_ROLLBACK); + test_snap_delay_timeout(); + + ev_break(loop(), EVBREAK_ALL); + footer(); + test_result = check_plan(); + return 0; +} +} /* end of anonymous namespace */ + +int +main(void) +{ + memory_init(); + fiber_init(fiber_c_invoke); + gc_init(); + txn_limbo_init(); + + struct fiber *main_fiber = fiber_new("main", test_snap_delay); + assert(main_fiber != NULL); + fiber_wakeup(main_fiber); + ev_run(loop(), 0); + + gc_free(); + fiber_free(); + memory_free(); + return test_result; +} diff --git a/test/unit/snap_quorum_delay.result b/test/unit/snap_quorum_delay.result new file mode 100644 index 000000000..6ca213391 --- /dev/null +++ b/test/unit/snap_quorum_delay.result @@ -0,0 +1,12 @@ + *** test_snap_delay *** +1..3 + 1..1 + ok 1 - check snapshot delay 
confirm +ok 1 - subtests + 1..1 + ok 1 - check snapshot delay rollback +ok 2 - subtests + 1..1 + ok 1 - check snapshot delay timeout +ok 3 - subtests + *** test_snap_delay: done *** -- 2.21.1 (Apple Git-122.3)