[Tarantool-patches] [PATCH v2 08/19] replication: add support of qsync to the snapshot machinery
Serge Petrenko
sergepetrenko at tarantool.org
Thu Jul 2 11:52:14 MSK 2020
30.06.2020 02:15, Vladislav Shpilevoy пишет:
> From: Leonid Vasiliev <lvasiliev at tarantool.org>
>
> To support qsync replication, the snapshot machinery now waits,
> for up to a timeout, until the current "sync" transactions are
> confirmed. If a transaction is rolled back or the timeout
> expires, the snapshot is cancelled.
>
> Closes #4850
> ---
> src/box/gc.c | 12 ++
> src/box/txn_limbo.c | 81 ++++++++++
> src/box/txn_limbo.h | 29 ++++
> test/unit/CMakeLists.txt | 3 +
> test/unit/snap_quorum_delay.cc | 250 +++++++++++++++++++++++++++++
> test/unit/snap_quorum_delay.result | 12 ++
> 6 files changed, 387 insertions(+)
> create mode 100644 test/unit/snap_quorum_delay.cc
> create mode 100644 test/unit/snap_quorum_delay.result
>
> diff --git a/src/box/gc.c b/src/box/gc.c
> index 8e8ffea75..170c0a97f 100644
> --- a/src/box/gc.c
> +++ b/src/box/gc.c
> @@ -57,6 +57,9 @@
> #include "engine.h" /* engine_collect_garbage() */
> #include "wal.h" /* wal_collect_garbage() */
> #include "checkpoint_schedule.h"
> +#include "trigger.h"
> +#include "txn.h"
> +#include "txn_limbo.h"
>
> struct gc_state gc;
>
> @@ -395,6 +398,15 @@ gc_do_checkpoint(bool is_scheduled)
> rc = wal_begin_checkpoint(&checkpoint);
> if (rc != 0)
> goto out;
> +
> + /*
> + * Wait the confirms on all "sync" transactions before
> + * create a snapshot.
> + */
> + rc = txn_limbo_wait_confirm(&txn_limbo);
> + if (rc != 0)
> + goto out;
> +
> rc = engine_commit_checkpoint(&checkpoint.vclock);
> if (rc != 0)
> goto out;
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index b38d82e4f..bee8e8155 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -231,6 +231,87 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
> }
> }
>
> +/**
> + * Return the timeout used by txn_limbo_wait_confirm().
> + * The limbo itself is not consulted: the value is the global
> + * replication_synchro_timeout setting.
> + */
> +double
> +txn_limbo_confirm_timeout(struct txn_limbo *limbo)
> +{
> +	(void)limbo;
> +	double timeout = replication_synchro_timeout;
> +	return timeout;
> +}
> +
> +/**
> + * Waitpoint stores information about the progress of confirmation
> + * of the last limbo transaction, filled by its on_commit and
> + * on_rollback triggers.
> + * In the case of multimaster support, it will store a bitset
> + * or array instead of the booleans.
> + */
> +struct confirm_waitpoint {
> +	/**
> +	 * Condition used to wake up the fiber that is waiting for
> +	 * the end of confirmation.
> +	 */
> +	struct fiber_cond confirm_cond;
> +	/** Set to true by the on_commit trigger. */
> +	bool is_confirm;
> +	/** Set to true by the on_rollback trigger. */
> +	bool is_rollback;
> +};
> +
> +/** on_commit trigger: mark the waitpoint confirmed, wake the waiter. */
> +static int
> +txn_commit_cb(struct trigger *trigger, void *event)
> +{
> +	(void)event;
> +	struct confirm_waitpoint *cwp =
> +		(struct confirm_waitpoint *)trigger->data;
> +	cwp->is_confirm = true;
> +	fiber_cond_signal(&cwp->confirm_cond);
> +	return 0;
> +}
> +
> +/** on_rollback trigger: mark the waitpoint rolled back, wake the waiter. */
> +static int
> +txn_rollback_cb(struct trigger *trigger, void *event)
> +{
> +	(void)event;
> +	struct confirm_waitpoint *cwp =
> +		(struct confirm_waitpoint *)trigger->data;
> +	cwp->is_rollback = true;
> +	fiber_cond_signal(&cwp->confirm_cond);
> +	return 0;
> +}
> +
> +/**
> + * Wait until the last transaction in the limbo is committed.
> + * Return 0 if the limbo is empty or the transaction has been
> + * committed; -1 if it has been rolled back or if the wait failed
> + * (confirm timeout expired or the fiber was cancelled).
> + */
> +int
> +txn_limbo_wait_confirm(struct txn_limbo *limbo)
> +{
> +	if (txn_limbo_is_empty(limbo))
> +		return 0;
> +
> +	/* Initialization of a waitpoint. */
> +	struct confirm_waitpoint cwp;
> +	fiber_cond_create(&cwp.confirm_cond);
> +	cwp.is_confirm = false;
> +	cwp.is_rollback = false;
> +
> +	/* Set triggers for the last limbo transaction. */
> +	struct trigger on_complete;
> +	trigger_create(&on_complete, txn_commit_cb, &cwp, NULL);
> +	struct trigger on_rollback;
> +	trigger_create(&on_rollback, txn_rollback_cb, &cwp, NULL);
> +	struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo);
> +	txn_on_commit(tle->txn, &on_complete);
> +	txn_on_rollback(tle->txn, &on_rollback);
> +
> +	/*
> +	 * A wait may return 0 without any trigger having fired (a
> +	 * spurious fiber_wakeup()), so keep waiting against a fixed
> +	 * deadline until one of the triggers reports the outcome.
> +	 */
> +	double deadline = fiber_clock() + txn_limbo_confirm_timeout(limbo);
> +	while (!cwp.is_confirm && !cwp.is_rollback) {
> +		if (fiber_cond_wait_timeout(&cwp.confirm_cond,
> +					    deadline - fiber_clock()) != 0)
> +			break;
> +	}
> +	fiber_cond_destroy(&cwp.confirm_cond);
> +
> +	/*
> +	 * Decide by the flags, not by the wait result: a trigger may
> +	 * fire in the same event loop iteration in which the timer
> +	 * expires, and then the transaction is already completed.
> +	 */
> +	if (!cwp.is_confirm && !cwp.is_rollback) {
> +		/*
> +		 * Timeout or cancellation: neither trigger has run, so
> +		 * both are still linked into the transaction and must
> +		 * be unlinked before this stack frame goes away.
> +		 */
> +		trigger_clear(&on_complete);
> +		trigger_clear(&on_rollback);
> +		return -1;
> +	}
> +	if (cwp.is_rollback) {
> +		/*
> +		 * The transaction has been rolled back.
> +		 * NOTE(review): no diag is set on this path; callers
> +		 * that report the last diag error may need a dedicated
> +		 * error code here.
> +		 */
> +		return -1;
> +	}
> +	return 0;
> +}
> +
> void
> txn_limbo_init(void)
> {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index de415cd97..94f224131 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -166,6 +166,35 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
> void
> txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
>
> +/**
> + * Check whether the limbo queue holds no transactions.
> + * Return true when limbo->queue is empty.
> + */
> +static inline bool
> +txn_limbo_is_empty(struct txn_limbo *limbo)
> +{
> +	bool is_empty = rlist_empty(&limbo->queue);
> +	return is_empty;
> +}
> +
> +/**
> + * Return the entry at the tail of limbo->queue.
> + * The queue must not be empty: check txn_limbo_is_empty() first.
> + */
> +static inline struct txn_limbo_entry *
> +txn_limbo_last_entry(struct txn_limbo *limbo)
> +{
> +	struct rlist *queue = &limbo->queue;
> +	return rlist_last_entry(queue, struct txn_limbo_entry, in_queue);
> +}
> +
> +/**
> + * Return the timeout txn_limbo_wait_confirm() uses while waiting
> + * for the last limbo transaction to be completed.
> + */
> +double
> +txn_limbo_confirm_timeout(struct txn_limbo *limbo);
> +
> +/**
> + * Wait for the last "sync" transaction in the limbo to be
> + * committed, for at most txn_limbo_confirm_timeout().
> + * @retval 0 the limbo is empty or the transaction is committed.
> + * @retval -1 otherwise (e.g. rollback or timeout).
> + */
> +int
> +txn_limbo_wait_confirm(struct txn_limbo *limbo);
> +
> void
> txn_limbo_init();
>
> diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
> index 672122118..419477748 100644
> --- a/test/unit/CMakeLists.txt
> +++ b/test/unit/CMakeLists.txt
> @@ -257,6 +257,9 @@ target_link_libraries(swim_errinj.test unit swim)
> add_executable(merger.test merger.test.c)
> target_link_libraries(merger.test unit core box)
>
> +add_executable(snap_quorum_delay.test snap_quorum_delay.cc)
> +target_link_libraries(snap_quorum_delay.test box core unit)
> +
> #
> # Client for popen.test
> add_executable(popen-child popen-child.c)
> diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc
> new file mode 100644
> index 000000000..7a200673a
> --- /dev/null
> +++ b/test/unit/snap_quorum_delay.cc
> @@ -0,0 +1,250 @@
> +/*
> + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and iproto forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + * copyright notice, this list of conditions and the
> + * following disclaimer.
> + *
> + * 2. Redistributions in iproto form must reproduce the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer in the documentation and/or other materials
> + * provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include "unit.h"
> +#include "gc.h"
> +#include "memory.h"
> +#include "txn.h"
> +#include "txn_limbo.h"
> +
> +/**
> + * This test only checks the delay in the snapshot machinery
> + * (needed for qsync replication). It doesn't test the snapshot
> + * machinery, txn_limbo or anything else, and uses some tricks
> + * around txn_limbo.
> + * The logic of the test is as follows:
> + * In fiber_1 ("txn_fiber"):
> + * - start a transaction.
> + * - push the transaction to the limbo.
> + * - start waiting for confirmation (yield).
> + * In fiber_2 ("main"):
> + * - make a snapshot.
> + * - wait until the last transaction in the limbo
> + *   is completed.
> + * In fiber_3 ("confirm_fiber"):
> + * - confirm the transaction (remove the transaction from
> + *   the limbo and wake up fiber_1).
> + * In fiber_1 ("txn_fiber"):
> + * - commit/roll back/hang the transaction.
> + * In fiber_2 ("main"):
> + * - check the results.
> + */
> +
> +extern int replication_synchro_quorum;
> +extern double replication_synchro_timeout;
> +
> +namespace /* local symbols */ {
> +
> +/** Exit code: set from check_plan() in test_snap_delay(), returned by main(). */
> +int test_result;
> +
> +/**
> + * Variations of a transaction completion.
> + * TXN_PROCESS_COMMIT: txn_fiber commits the transaction, so
> + * gc_checkpoint() is expected to succeed.
> + * TXN_PROCESS_ROLLBACK: txn_fiber rolls the transaction back, so
> + * gc_checkpoint() is expected to fail.
> + * TXN_PROCESS_TIMEOUT: txn_fiber never completes the transaction
> + * (it yields until cancelled), so gc_checkpoint() is expected to
> + * fail on the confirm timeout.
> + */
> +enum process_type {
> + TXN_PROCESS_COMMIT,
> + TXN_PROCESS_ROLLBACK,
> + TXN_PROCESS_TIMEOUT
> +};
> +
> +/**
> + * Some fake values needed for work with the limbo
> + * (to push a transaction to the limbo and simulate confirm).
> + * fake_lsn is assigned to the limbo entry and then acked;
> + * instace_id (sic, "instance") is passed to txn_limbo_append();
> + * relay_id is the replica id passed to txn_limbo_ack().
> + */
> +const int fake_lsn = 1;
> +const int instace_id = 1;
> +const int relay_id = 2;
> +
> +/** Trigger callback: set the bool pointed to by trigger->data to true. */
> +int
> +trg_cb(struct trigger *trigger, void *event)
> +{
> + (void)event;
> + bool *check_trg = (bool *)trigger->data;
> + *check_trg = true;
> + return 0;
> +}
> +
> +/**
> + * Body of "txn_fiber".
> + * Variadic arguments: a bool * flag that trg_cb() sets when the
> + * expected on_commit/on_rollback trigger fires, and the
> + * enum process_type (read back as int).
> + */
> +int
> +txn_process_func(va_list ap)
> +{
> + bool *check_trg = va_arg(ap, bool *);
> + enum process_type process_type = (enum process_type)va_arg(ap, int);
> + struct txn *txn = txn_begin();
> + txn->fiber = fiber();
> + /* Set the TXN_WAIT_ACK flag to simulate a sync transaction.*/
> + txn_set_flag(txn, TXN_WAIT_ACK);
> + /*
> + * The true way to push the transaction to limbo is to call
> + * txn_commit() for sync transaction. But, if txn_commit()
> + * will be called now, the transaction will not be pushed to
> + * the limbo because this is the case txn_commit_nop().
> + * Instead, we push the transaction to the limbo manually
> + * and call txn_commit (or another) later.
> + */
> + struct txn_limbo_entry *entry = txn_limbo_append(&txn_limbo,
> + instace_id, txn);
> + /*
> + * The trigger is used to verify that the transaction has been
> + * completed.
> + */
> + struct trigger trg;
> + trigger_create(&trg, trg_cb, check_trg, NULL);
> +
> + /* Attach the checking trigger matching the expected outcome. */
> + switch (process_type) {
> + case TXN_PROCESS_COMMIT:
> + txn_on_commit(txn, &trg);
> + break;
> + case TXN_PROCESS_ROLLBACK:
> + txn_on_rollback(txn, &trg);
> + break;
> + case TXN_PROCESS_TIMEOUT:
> + break;
> + default:
> + unreachable();
> + }
> +
> + /* Yields until confirm_fiber acks fake_lsn. */
> + txn_limbo_assign_lsn(&txn_limbo, entry, fake_lsn);
> + txn_limbo_wait_complete(&txn_limbo, entry);
> +
> + switch (process_type) {
> + case TXN_PROCESS_COMMIT:
> + txn_commit(txn);
> + break;
> + case TXN_PROCESS_ROLLBACK:
> + txn_rollback(txn);
> + break;
> + case TXN_PROCESS_TIMEOUT:
> + /*
> + * Never complete the transaction: the triggers set by
> + * txn_limbo_wait_confirm() cannot fire, so the snapshot
> + * times out. test_snap_delay_common() cancels this fiber.
> + * NOTE(review): the transaction is left unfinished; this
> + * is tolerated only because this variation runs last.
> + */
> + fiber_yield();
> + break;
> + default:
> + unreachable();
> + }
> + return 0;
> +}
> +
> +/** Body of "confirm_fiber": ack fake_lsn on behalf of relay_id. */
> +int
> +txn_confirm_func(va_list ap)
> +{
> + /*
> + * We shouldn't react on gc_wait_cleanup() yield
> + * inside gc_checkpoint().
> + */
> + fiber_sleep(0);
> + txn_limbo_ack(&txn_limbo, relay_id, fake_lsn);
> + return 0;
> +}
> +
> +
> +/**
> + * Run one subtest: start txn_fiber and confirm_fiber, then call
> + * gc_checkpoint() from the current fiber and check its result
> + * (and, for commit/rollback, that the matching trigger fired).
> + */
> +void
> +test_snap_delay_common(enum process_type process_type)
> +{
> + plan(1);
> +
> + /*
> + * We need to clear the limbo vclock before the new test
> + * variation because the same fake lsn will be used.
> + * NOTE(review): vclock_create() may already reset the
> + * vclock on its own, making vclock_clear() redundant; confirm.
> + */
> + vclock_clear(&txn_limbo.vclock);
> + vclock_create(&txn_limbo.vclock);
> +
> + bool check_trg = false;
> + struct fiber *txn_fiber = fiber_new("txn_fiber", txn_process_func);
> + fiber_start(txn_fiber, &check_trg, process_type);
> +
> + struct fiber *confirm_entry = fiber_new("confirm_fiber",
> + txn_confirm_func);
> + fiber_wakeup(confirm_entry);
> +
> + switch (process_type) {
> + case TXN_PROCESS_COMMIT:
> + ok(gc_checkpoint() == 0 && check_trg,
> + "check snapshot delay confirm");
> + break;
> + case TXN_PROCESS_ROLLBACK:
> + ok(gc_checkpoint() == -1 && check_trg,
> + "check snapshot delay rollback");
> + break;
> + case TXN_PROCESS_TIMEOUT:
> + ok(gc_checkpoint() == -1, "check snapshot delay timeout");
> + /* join the "hung" fiber */
> + fiber_set_joinable(txn_fiber, true);
> + fiber_cancel(txn_fiber);
> + fiber_join(txn_fiber);
> + break;
> + default:
> + unreachable();
> + }
> + check_plan();
> +}
> +
> +/**
> + * Timeout variation with a short confirm timeout.
> + * NOTE(review): replication_synchro_timeout is not restored
> + * afterwards; harmless only while this runs last.
> + */
> +void
> +test_snap_delay_timeout()
> +{
> + /* Set the timeout to a small value for the test. */
> + replication_synchro_timeout = 0.01;
> + test_snap_delay_common(TXN_PROCESS_TIMEOUT);
> +}
> +
> +/**
> + * Body of the "main" fiber: run all subtests with a quorum of 2,
> + * then break the event loop started in main().
> + */
> +int
> +test_snap_delay(va_list ap)
> +{
> + header();
> + plan(3);
> + (void)ap;
> + replication_synchro_quorum = 2;
> +
> + test_snap_delay_common(TXN_PROCESS_COMMIT);
> + test_snap_delay_common(TXN_PROCESS_ROLLBACK);
> + test_snap_delay_timeout();
> +
> + ev_break(loop(), EVBREAK_ALL);
> + footer();
> + test_result = check_plan();
> + return 0;
> +}
> +} /* end of anonymous namespace */
> +
> +int
> +main(void)
> +{
> +	/* Subsystems used by gc_checkpoint() and the limbo. */
> +	memory_init();
> +	fiber_init(fiber_c_invoke);
> +	gc_init();
> +	txn_limbo_init();
> +
> +	/*
> +	 * The test body yields, so it must run inside a fiber; the
> +	 * event loop returns once test_snap_delay() breaks it.
> +	 */
> +	struct fiber *test_fiber = fiber_new("main", test_snap_delay);
> +	assert(test_fiber != NULL);
> +	fiber_wakeup(test_fiber);
> +	ev_run(loop(), 0);
> +
> +	/* Release what was initialized above. */
> +	gc_free();
> +	fiber_free();
> +	memory_free();
> +	return test_result;
> +}
> diff --git a/test/unit/snap_quorum_delay.result b/test/unit/snap_quorum_delay.result
> new file mode 100644
> index 000000000..6ca213391
> --- /dev/null
> +++ b/test/unit/snap_quorum_delay.result
> @@ -0,0 +1,12 @@
> + *** test_snap_delay ***
> +1..3
> + 1..1
> + ok 1 - check snapshot delay confirm
> +ok 1 - subtests
> + 1..1
> + ok 1 - check snapshot delay rollback
> +ok 2 - subtests
> + 1..1
> + ok 1 - check snapshot delay timeout
> +ok 3 - subtests
> + *** test_snap_delay: done ***
Thanks for the patch! LGTM.
--
Serge Petrenko
More information about the Tarantool-patches
mailing list