From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp49.i.mail.ru (smtp49.i.mail.ru [94.100.177.109]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 9A2FA42EF5C for ; Thu, 2 Jul 2020 11:52:15 +0300 (MSK) References: <678db6688752c63074d87eb987aee60114e038ae.1593472477.git.v.shpilevoy@tarantool.org> From: Serge Petrenko Message-ID: <80940829-f967-edd9-b8c0-1245564ffbc1@tarantool.org> Date: Thu, 2 Jul 2020 11:52:14 +0300 MIME-Version: 1.0 In-Reply-To: <678db6688752c63074d87eb987aee60114e038ae.1593472477.git.v.shpilevoy@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-GB Subject: Re: [Tarantool-patches] [PATCH v2 08/19] replication: add support of qsync to the snapshot machinery List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Vladislav Shpilevoy , tarantool-patches@dev.tarantool.org 30.06.2020 02:15, Vladislav Shpilevoy пишет: > From: Leonid Vasiliev > > To support qsync replication, the waiting for confirmation of > current "sync" transactions during a timeout has been added to > the snapshot machinery. In the case of rollback or the timeout > expiration, the snapshot will be cancelled. 
> > Closes #4850 > --- > src/box/gc.c | 12 ++ > src/box/txn_limbo.c | 81 ++++++++++ > src/box/txn_limbo.h | 29 ++++ > test/unit/CMakeLists.txt | 3 + > test/unit/snap_quorum_delay.cc | 250 +++++++++++++++++++++++++++++ > test/unit/snap_quorum_delay.result | 12 ++ > 6 files changed, 387 insertions(+) > create mode 100644 test/unit/snap_quorum_delay.cc > create mode 100644 test/unit/snap_quorum_delay.result > > diff --git a/src/box/gc.c b/src/box/gc.c > index 8e8ffea75..170c0a97f 100644 > --- a/src/box/gc.c > +++ b/src/box/gc.c > @@ -57,6 +57,9 @@ > #include "engine.h" /* engine_collect_garbage() */ > #include "wal.h" /* wal_collect_garbage() */ > #include "checkpoint_schedule.h" > +#include "trigger.h" > +#include "txn.h" > +#include "txn_limbo.h" > > struct gc_state gc; > > @@ -395,6 +398,15 @@ gc_do_checkpoint(bool is_scheduled) > rc = wal_begin_checkpoint(&checkpoint); > if (rc != 0) > goto out; > + > + /* > + * Wait for confirmation of all "sync" transactions before > + * creating a snapshot. > + */ > + rc = txn_limbo_wait_confirm(&txn_limbo); > + if (rc != 0) > + goto out; > + > rc = engine_commit_checkpoint(&checkpoint.vclock); > if (rc != 0) > goto out; > diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c > index b38d82e4f..bee8e8155 100644 > --- a/src/box/txn_limbo.c > +++ b/src/box/txn_limbo.c > @@ -231,6 +231,87 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) > } > } > > +double > +txn_limbo_confirm_timeout(struct txn_limbo *limbo) > +{ > + (void)limbo; > + return replication_synchro_timeout; > +} > + > +/** > + * Waitpoint stores information about the progress of confirmation. > + * In the case of multimaster support, it will store a bitset > + * or array instead of the boolean. > + */ > +struct confirm_waitpoint { > + /** > + * Variable for waking up the fiber that is waiting for > + * the end of confirmation. > + */ > + struct fiber_cond confirm_cond; > + /** > + * Result flag. 
> + */ > + bool is_confirm; > +}; > + > +static int > +txn_commit_cb(struct trigger *trigger, void *event) > +{ > + (void)event; > + struct confirm_waitpoint *cwp = > + (struct confirm_waitpoint *)trigger->data; > + cwp->is_confirm = true; > + fiber_cond_signal(&cwp->confirm_cond); > + return 0; > +} > + > +static int > +txn_rollback_cb(struct trigger *trigger, void *event) > +{ > + (void)event; > + struct confirm_waitpoint *cwp = > + (struct confirm_waitpoint *)trigger->data; > + fiber_cond_signal(&cwp->confirm_cond); > + return 0; > +} > + > +int > +txn_limbo_wait_confirm(struct txn_limbo *limbo) > +{ > + if (txn_limbo_is_empty(limbo)) > + return 0; > + > + /* initialization of a waitpoint. */ > + struct confirm_waitpoint cwp; > + fiber_cond_create(&cwp.confirm_cond); > + cwp.is_confirm = false; > + > + /* Set triggers for the last limbo transaction. */ > + struct trigger on_complete; > + trigger_create(&on_complete, txn_commit_cb, &cwp, NULL); > + struct trigger on_rollback; > + trigger_create(&on_rollback, txn_rollback_cb, &cwp, NULL); > + struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo); > + txn_on_commit(tle->txn, &on_complete); > + txn_on_rollback(tle->txn, &on_rollback); > + > + int rc = fiber_cond_wait_timeout(&cwp.confirm_cond, > + txn_limbo_confirm_timeout(limbo)); > + fiber_cond_destroy(&cwp.confirm_cond); > + if (rc != 0) { > + /* Clear the triggers if the timeout has been reached. */ > + trigger_clear(&on_complete); > + trigger_clear(&on_rollback); > + return -1; > + } > + if (!cwp.is_confirm) { > + /* The transaction has been rolled back. 
*/ > + return -1; > + } > + return 0; > +} > + > void > txn_limbo_init(void) > { > diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h > index de415cd97..94f224131 100644 > --- a/src/box/txn_limbo.h > +++ b/src/box/txn_limbo.h > @@ -166,6 +166,35 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); > void > txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); > > +/** > + * Return TRUE if limbo is empty. > + */ > +static inline bool > +txn_limbo_is_empty(struct txn_limbo *limbo) > +{ > + return rlist_empty(&limbo->queue); > +} > + > +/** > + * Return a pointer to the last txn_limbo_entry of limbo. > + */ > +static inline struct txn_limbo_entry * > +txn_limbo_last_entry(struct txn_limbo *limbo) > +{ > + return rlist_last_entry(&limbo->queue, struct txn_limbo_entry, > + in_queue); > +} > + > +double > +txn_limbo_confirm_timeout(struct txn_limbo *limbo); > + > +/** > + * Waiting for confirmation of all "sync" transactions > + * during confirm timeout or fail. > + */ > +int > +txn_limbo_wait_confirm(struct txn_limbo *limbo); > + > void > txn_limbo_init(); > > diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt > index 672122118..419477748 100644 > --- a/test/unit/CMakeLists.txt > +++ b/test/unit/CMakeLists.txt > @@ -257,6 +257,9 @@ target_link_libraries(swim_errinj.test unit swim) > add_executable(merger.test merger.test.c) > target_link_libraries(merger.test unit core box) > > +add_executable(snap_quorum_delay.test snap_quorum_delay.cc) > +target_link_libraries(snap_quorum_delay.test box core unit) > + > # > # Client for popen.test > add_executable(popen-child popen-child.c) > diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc > new file mode 100644 > index 000000000..7a200673a > --- /dev/null > +++ b/test/unit/snap_quorum_delay.cc > @@ -0,0 +1,250 @@ > +/* > + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. 
> + * > + * Redistribution and use in source and iproto forms, with or > + * without modification, are permitted provided that the following > + * conditions are met: > + * > + * 1. Redistributions of source code must retain the above > + * copyright notice, this list of conditions and the > + * following disclaimer. > + * > + * 2. Redistributions in iproto form must reproduce the above > + * copyright notice, this list of conditions and the following > + * disclaimer in the documentation and/or other materials > + * provided with the distribution. > + * > + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL > + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > + * SUCH DAMAGE. > + */ > +#include "unit.h" > +#include "gc.h" > +#include "memory.h" > +#include "txn.h" > +#include "txn_limbo.h" > + > +/** > + * This test is only about delay in snapshot machinery (needed > + * for qsync replication). It doesn't test the snapshot > + * machinery, txn_limbo or something else and uses some tricks > + * around txn_limbo. > + * The logic of the test is as follows: > + * In fiber_1 ("txn_fiber"): > + * - start a transaction. > + * - push the transaction to the limbo. > + * - start wait confirm (yield). > + * In fiber_2 ("main"): > + * - do a snapshot. > + * - start wait while the last transaction > + * from the limbo will be completed. 
> + * In fiber_3 ("confirm_fiber"): > + * - confirm the transaction (remove the transaction from > + * the limbo and wakeup fiber_1). > + * In fiber_1 ("txn_fiber"): > + * - confirm/rollback/hung the transaction. > + * In fiber_2 ("main"): > + * - check_results > + */ > + > +extern int replication_synchro_quorum; > +extern double replication_synchro_timeout; > + > +namespace /* local symbols */ { > + > +int test_result; > + > +/** > + * Variations of a transaction completion. > + */ > +enum process_type { > + TXN_PROCESS_COMMIT, > + TXN_PROCESS_ROLLBACK, > + TXN_PROCESS_TIMEOUT > +}; > + > +/** > + * Some fake values needed for work with the limbo > + * (to push a transaction to the limbo and simulate confirm). > + */ > +const int fake_lsn = 1; > +const int instace_id = 1; > +const int relay_id = 2; > + > +int > +trg_cb(struct trigger *trigger, void *event) > +{ > + (void)event; > + bool *check_trg = (bool *)trigger->data; > + *check_trg = true; > + return 0; > +} > + > +int > +txn_process_func(va_list ap) > +{ > + bool *check_trg = va_arg(ap, bool *); > + enum process_type process_type = (enum process_type)va_arg(ap, int); > + struct txn *txn = txn_begin(); > + txn->fiber = fiber(); > + /* Set the TXN_WAIT_ACK flag to simulate a sync transaction.*/ > + txn_set_flag(txn, TXN_WAIT_ACK); > + /* > + * The true way to push the transaction to limbo is to call > + * txn_commit() for sync transaction. But, if txn_commit() > + * will be called now, the transaction will not be pushed to > + * the limbo because this is the case txn_commit_nop(). > + * Instead, we push the transaction to the limbo manually > + * and call txn_commit (or another) later. > + */ > + struct txn_limbo_entry *entry = txn_limbo_append(&txn_limbo, > + instace_id, txn); > + /* > + * The trigger is used to verify that the transaction has been > + * completed. 
> + */ > + struct trigger trg; > + trigger_create(&trg, trg_cb, check_trg, NULL); > + > + switch (process_type) { > + case TXN_PROCESS_COMMIT: > + txn_on_commit(txn, &trg); > + break; > + case TXN_PROCESS_ROLLBACK: > + txn_on_rollback(txn, &trg); > + break; > + case TXN_PROCESS_TIMEOUT: > + break; > + default: > + unreachable(); > + } > + > + txn_limbo_assign_lsn(&txn_limbo, entry, fake_lsn); > + txn_limbo_wait_complete(&txn_limbo, entry); > + > + switch (process_type) { > + case TXN_PROCESS_COMMIT: > + txn_commit(txn); > + break; > + case TXN_PROCESS_ROLLBACK: > + txn_rollback(txn); > + break; > + case TXN_PROCESS_TIMEOUT: > + fiber_yield(); > + break; > + default: > + unreachable(); > + } > + return 0; > +} > + > +int > +txn_confirm_func(va_list ap) > +{ > + /* > + * We shouldn't react on gc_wait_cleanup() yield > + * inside gc_checkpoint(). > + */ > + fiber_sleep(0); > + txn_limbo_ack(&txn_limbo, relay_id, fake_lsn); > + return 0; > +} > + > + > +void > +test_snap_delay_common(enum process_type process_type) > +{ > + plan(1); > + > + /* > + * We need to clear the limbo vclock before the new test > + * variation because the same fake lsn will be used. 
> + */ > + vclock_clear(&txn_limbo.vclock); > + vclock_create(&txn_limbo.vclock); > + > + bool check_trg = false; > + struct fiber *txn_fiber = fiber_new("txn_fiber", txn_process_func); > + fiber_start(txn_fiber, &check_trg, process_type); > + > + struct fiber *confirm_entry = fiber_new("confirm_fiber", > + txn_confirm_func); > + fiber_wakeup(confirm_entry); > + > + switch (process_type) { > + case TXN_PROCESS_COMMIT: > + ok(gc_checkpoint() == 0 && check_trg, > + "check snapshot delay confirm"); > + break; > + case TXN_PROCESS_ROLLBACK: > + ok(gc_checkpoint() == -1 && check_trg, > + "check snapshot delay rollback"); > + break; > + case TXN_PROCESS_TIMEOUT: > + ok(gc_checkpoint() == -1, "check snapshot delay timeout"); > + /* join the "hung" fiber */ > + fiber_set_joinable(txn_fiber, true); > + fiber_cancel(txn_fiber); > + fiber_join(txn_fiber); > + break; > + default: > + unreachable(); > + } > + check_plan(); > +} > + > +void > +test_snap_delay_timeout() > +{ > + /* Set the timeout to a small value for the test. 
*/ > + replication_synchro_timeout = 0.01; > + test_snap_delay_common(TXN_PROCESS_TIMEOUT); > +} > + > +int > +test_snap_delay(va_list ap) > +{ > + header(); > + plan(3); > + (void)ap; > + replication_synchro_quorum = 2; > + > + test_snap_delay_common(TXN_PROCESS_COMMIT); > + test_snap_delay_common(TXN_PROCESS_ROLLBACK); > + test_snap_delay_timeout(); > + > + ev_break(loop(), EVBREAK_ALL); > + footer(); > + test_result = check_plan(); > + return 0; > +} > +} /* end of anonymous namespace */ > + > +int > +main(void) > +{ > + memory_init(); > + fiber_init(fiber_c_invoke); > + gc_init(); > + txn_limbo_init(); > + > + struct fiber *main_fiber = fiber_new("main", test_snap_delay); > + assert(main_fiber != NULL); > + fiber_wakeup(main_fiber); > + ev_run(loop(), 0); > + > + gc_free(); > + fiber_free(); > + memory_free(); > + return test_result; > +} > diff --git a/test/unit/snap_quorum_delay.result b/test/unit/snap_quorum_delay.result > new file mode 100644 > index 000000000..6ca213391 > --- /dev/null > +++ b/test/unit/snap_quorum_delay.result > @@ -0,0 +1,12 @@ > + *** test_snap_delay *** > +1..3 > + 1..1 > + ok 1 - check snapshot delay confirm > +ok 1 - subtests > + 1..1 > + ok 1 - check snapshot delay rollback > +ok 2 - subtests > + 1..1 > + ok 1 - check snapshot delay timeout > +ok 3 - subtests > + *** test_snap_delay: done *** Thanks for the patch! LGTM. -- Serge Petrenko