[Tarantool-patches] [PATCH v2 08/19] replication: add support of qsync to the snapshot machinery

Serge Petrenko sergepetrenko at tarantool.org
Thu Jul 2 11:52:14 MSK 2020


30.06.2020 02:15, Vladislav Shpilevoy пишет:
> From: Leonid Vasiliev <lvasiliev at tarantool.org>
>
> To support qsync replication, the snapshot machinery now waits,
> up to a timeout, for confirmation of the pending "sync"
> transactions. If they are rolled back or the timeout expires,
> the snapshot is cancelled.
>
> Closes #4850
> ---
>   src/box/gc.c                       |  12 ++
>   src/box/txn_limbo.c                |  81 ++++++++++
>   src/box/txn_limbo.h                |  29 ++++
>   test/unit/CMakeLists.txt           |   3 +
>   test/unit/snap_quorum_delay.cc     | 250 +++++++++++++++++++++++++++++
>   test/unit/snap_quorum_delay.result |  12 ++
>   6 files changed, 387 insertions(+)
>   create mode 100644 test/unit/snap_quorum_delay.cc
>   create mode 100644 test/unit/snap_quorum_delay.result
>
> diff --git a/src/box/gc.c b/src/box/gc.c
> index 8e8ffea75..170c0a97f 100644
> --- a/src/box/gc.c
> +++ b/src/box/gc.c
> @@ -57,6 +57,9 @@
>   #include "engine.h"		/* engine_collect_garbage() */
>   #include "wal.h"		/* wal_collect_garbage() */
>   #include "checkpoint_schedule.h"
> +#include "trigger.h"
> +#include "txn.h"
> +#include "txn_limbo.h"
>   
>   struct gc_state gc;
>   
> @@ -395,6 +398,15 @@ gc_do_checkpoint(bool is_scheduled)
>   	rc = wal_begin_checkpoint(&checkpoint);
>   	if (rc != 0)
>   		goto out;
> +
> +	/*
> +	 * Wait for all "sync" transactions to be confirmed
> +	 * before creating a snapshot.
> +	 */
> +	rc = txn_limbo_wait_confirm(&txn_limbo);
> +	if (rc != 0)
> +		goto out;
> +
>   	rc = engine_commit_checkpoint(&checkpoint.vclock);
>   	if (rc != 0)
>   		goto out;
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index b38d82e4f..bee8e8155 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -231,6 +231,87 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>   	}
>   }
>   
> +double
> +txn_limbo_confirm_timeout(struct txn_limbo *limbo)
> +{
> +	(void)limbo;
> +	return replication_synchro_timeout;
> +}
> +
> +/**
> + * Waitpoint stores information about the progress of confirmation.
> + * In the case of multimaster support, it will store a bitset
> + * or array instead of the boolean.
> + */
> +struct confirm_waitpoint {
> +	/**
> +	 * Condition variable used to wake up the fiber that is
> +	 * waiting for the end of confirmation.
> +	 */
> +	struct fiber_cond confirm_cond;
> +	/**
> +	 * Result flag.
> +	 */
> +	bool is_confirm;
> +};
> +
> +static int
> +txn_commit_cb(struct trigger *trigger, void *event)
> +{
> +	(void)event;
> +	struct confirm_waitpoint *cwp =
> +		(struct confirm_waitpoint *)trigger->data;
> +	cwp->is_confirm = true;
> +	fiber_cond_signal(&cwp->confirm_cond);
> +	return 0;
> +}
> +
> +static int
> +txn_rollback_cb(struct trigger *trigger, void *event)
> +{
> +	(void)event;
> +	struct confirm_waitpoint *cwp =
> +		(struct confirm_waitpoint *)trigger->data;
> +	fiber_cond_signal(&cwp->confirm_cond);
> +	return 0;
> +}
> +
> +int
> +txn_limbo_wait_confirm(struct txn_limbo *limbo)
> +{
> +	if (txn_limbo_is_empty(limbo))
> +		return 0;
> +
> +	/* initialization of a waitpoint. */
> +	struct confirm_waitpoint cwp;
> +	fiber_cond_create(&cwp.confirm_cond);
> +	cwp.is_confirm = false;
> +
> +	/* Set triggers for the last limbo transaction. */
> +	struct trigger on_complete;
> +	trigger_create(&on_complete, txn_commit_cb, &cwp, NULL);
> +	struct trigger on_rollback;
> +	trigger_create(&on_rollback, txn_rollback_cb, &cwp, NULL);
> +	struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo);
> +	txn_on_commit(tle->txn, &on_complete);
> +	txn_on_rollback(tle->txn, &on_rollback);
> +
> +	int rc = fiber_cond_wait_timeout(&cwp.confirm_cond,
> +					 txn_limbo_confirm_timeout(limbo));
> +	fiber_cond_destroy(&cwp.confirm_cond);
> +	if (rc != 0) {
> +		/* Clear the triggers if the timeout has been reached. */
> +		trigger_clear(&on_complete);
> +		trigger_clear(&on_rollback);
> +		return -1;
> +	}
> +	if (!cwp.is_confirm) {
> +		/* The transaction has been rolled back. */
> +		return -1;
> +	}
> +	return 0;
> +}
> +
>   void
>   txn_limbo_init(void)
>   {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index de415cd97..94f224131 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -166,6 +166,35 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>   void
>   txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
>   
> +/**
> + * Return TRUE if limbo is empty.
> + */
> +static inline bool
> +txn_limbo_is_empty(struct txn_limbo *limbo)
> +{
> +	return rlist_empty(&limbo->queue);
> +}
> +
> +/**
> + * Return a pointer to the last txn_limbo_entry of limbo.
> + */
> +static inline struct txn_limbo_entry *
> +txn_limbo_last_entry(struct txn_limbo *limbo)
> +{
> +	return rlist_last_entry(&limbo->queue, struct txn_limbo_entry,
> +				in_queue);
> +}
> +
> +double
> +txn_limbo_confirm_timeout(struct txn_limbo *limbo);
> +
> +/**
> + * Wait for confirmation of all "sync" transactions; fail on
> + * rollback or when the confirm timeout expires.
> + */
> +int
> +txn_limbo_wait_confirm(struct txn_limbo *limbo);
> +
>   void
>   txn_limbo_init();
>   
> diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
> index 672122118..419477748 100644
> --- a/test/unit/CMakeLists.txt
> +++ b/test/unit/CMakeLists.txt
> @@ -257,6 +257,9 @@ target_link_libraries(swim_errinj.test unit swim)
>   add_executable(merger.test merger.test.c)
>   target_link_libraries(merger.test unit core box)
>   
> +add_executable(snap_quorum_delay.test snap_quorum_delay.cc)
> +target_link_libraries(snap_quorum_delay.test box core unit)
> +
>   #
>   # Client for popen.test
>   add_executable(popen-child popen-child.c)
> diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc
> new file mode 100644
> index 000000000..7a200673a
> --- /dev/null
> +++ b/test/unit/snap_quorum_delay.cc
> @@ -0,0 +1,250 @@
> +/*
> + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and iproto forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in iproto form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include "unit.h"
> +#include "gc.h"
> +#include "memory.h"
> +#include "txn.h"
> +#include "txn_limbo.h"
> +
> +/**
> + * This test is only about delay in snapshot machinery (needed
> + * for qsync replication). It doesn't test the snapshot
> + * machinery, txn_limbo or something else and uses some tricks
> + * around txn_limbo.
> + * The logic of the test is as follows:
> + * In fiber_1 ("txn_fiber"):
> + *	- start a transaction.
> + *	- push the transaction to the limbo.
> + *	- start waiting for confirmation (yield).
> + * In fiber_2 ("main"):
> + *	- do a snapshot.
> + *	- start waiting until the last transaction
> + *	  in the limbo is completed.
> + * In fiber_3 ("confirm_fiber"):
> + *	- confirm the transaction (remove the transaction from
> + *				   the limbo and wakeup fiber_1).
> + * In fiber_1 ("txn_fiber"):
> + *	- commit/roll back/hang the transaction.
> + * In fiber_2 ("main"):
> + *	- check_results
> + */
> +
> +extern int replication_synchro_quorum;
> +extern double replication_synchro_timeout;
> +
> +namespace /* local symbols */ {
> +
> +int test_result;
> +
> +/**
> + * Variations of a transaction completion.
> + */
> +enum process_type {
> +	TXN_PROCESS_COMMIT,
> +	TXN_PROCESS_ROLLBACK,
> +	TXN_PROCESS_TIMEOUT
> +};
> +
> +/**
> + * Some fake values needed for work with the limbo
> + * (to push a transaction to the limbo and simulate confirm).
> + */
> +const int fake_lsn = 1;
> +const int instace_id = 1;
> +const int relay_id = 2;
> +
> +int
> +trg_cb(struct trigger *trigger, void *event)
> +{
> +	(void)event;
> +	bool *check_trg = (bool *)trigger->data;
> +	*check_trg = true;
> +	return 0;
> +}
> +
> +int
> +txn_process_func(va_list ap)
> +{
> +	bool *check_trg = va_arg(ap, bool *);
> +	enum process_type process_type = (enum process_type)va_arg(ap, int);
> +	struct txn *txn = txn_begin();
> +	txn->fiber = fiber();
> +	/* Set the TXN_WAIT_ACK flag to simulate a sync transaction.*/
> +	txn_set_flag(txn, TXN_WAIT_ACK);
> +	/*
> +	 * The proper way to push the transaction to the limbo is to
> +	 * call txn_commit() for a sync transaction. But if txn_commit()
> +	 * is called now, the transaction will not be pushed to the
> +	 * limbo, because this is the txn_commit_nop() case.
> +	 * Instead, we push the transaction to the limbo manually
> +	 * and call txn_commit() (or another function) later.
> +	 */
> +	struct txn_limbo_entry *entry = txn_limbo_append(&txn_limbo,
> +							 instace_id, txn);
> +	/*
> +	 * The trigger is used to verify that the transaction has been
> +	 * completed.
> +	 */
> +	struct trigger trg;
> +	trigger_create(&trg, trg_cb, check_trg, NULL);
> +
> +	switch (process_type) {
> +	case TXN_PROCESS_COMMIT:
> +		txn_on_commit(txn, &trg);
> +		break;
> +	case TXN_PROCESS_ROLLBACK:
> +		txn_on_rollback(txn, &trg);
> +		break;
> +	case TXN_PROCESS_TIMEOUT:
> +		break;
> +	default:
> +		unreachable();
> +	}
> +
> +	txn_limbo_assign_lsn(&txn_limbo, entry, fake_lsn);
> +	txn_limbo_wait_complete(&txn_limbo, entry);
> +
> +	switch (process_type) {
> +	case TXN_PROCESS_COMMIT:
> +		txn_commit(txn);
> +		break;
> +	case TXN_PROCESS_ROLLBACK:
> +		txn_rollback(txn);
> +		break;
> +	case TXN_PROCESS_TIMEOUT:
> +		fiber_yield();
> +		break;
> +	default:
> +		unreachable();
> +	}
> +	return 0;
> +}
> +
> +int
> +txn_confirm_func(va_list ap)
> +{
> +	/*
> +	 * We shouldn't react to the gc_wait_cleanup() yield
> +	 * inside gc_checkpoint().
> +	 */
> +	fiber_sleep(0);
> +	txn_limbo_ack(&txn_limbo, relay_id, fake_lsn);
> +	return 0;
> +}
> +
> +
> +void
> +test_snap_delay_common(enum process_type process_type)
> +{
> +	plan(1);
> +
> +	/*
> +	 * We need to clear the limbo vclock before the new test
> +	 * variation because the same fake lsn will be used.
> +	 */
> +	vclock_clear(&txn_limbo.vclock);
> +	vclock_create(&txn_limbo.vclock);
> +
> +	bool check_trg = false;
> +	struct fiber *txn_fiber = fiber_new("txn_fiber", txn_process_func);
> +	fiber_start(txn_fiber, &check_trg, process_type);
> +
> +	struct fiber *confirm_entry = fiber_new("confirm_fiber",
> +						txn_confirm_func);
> +	fiber_wakeup(confirm_entry);
> +
> +	switch (process_type) {
> +	case TXN_PROCESS_COMMIT:
> +		ok(gc_checkpoint() == 0 && check_trg,
> +		   "check snapshot delay confirm");
> +		break;
> +	case TXN_PROCESS_ROLLBACK:
> +		ok(gc_checkpoint() == -1 && check_trg,
> +		   "check snapshot delay rollback");
> +		break;
> +	case TXN_PROCESS_TIMEOUT:
> +		ok(gc_checkpoint() == -1, "check snapshot delay timeout");
> +		/* join the "hung" fiber */
> +		fiber_set_joinable(txn_fiber, true);
> +		fiber_cancel(txn_fiber);
> +		fiber_join(txn_fiber);
> +		break;
> +	default:
> +		unreachable();
> +	}
> +	check_plan();
> +}
> +
> +void
> +test_snap_delay_timeout()
> +{
> +	/* Set the timeout to a small value for the test. */
> +	replication_synchro_timeout = 0.01;
> +	test_snap_delay_common(TXN_PROCESS_TIMEOUT);
> +}
> +
> +int
> +test_snap_delay(va_list ap)
> +{
> +	header();
> +	plan(3);
> +	(void)ap;
> +	replication_synchro_quorum = 2;
> +
> +	test_snap_delay_common(TXN_PROCESS_COMMIT);
> +	test_snap_delay_common(TXN_PROCESS_ROLLBACK);
> +	test_snap_delay_timeout();
> +
> +	ev_break(loop(), EVBREAK_ALL);
> +	footer();
> +	test_result = check_plan();
> +	return 0;
> +}
> +} /* end of anonymous namespace */
> +
> +int
> +main(void)
> +{
> +	memory_init();
> +	fiber_init(fiber_c_invoke);
> +	gc_init();
> +	txn_limbo_init();
> +
> +	struct fiber *main_fiber = fiber_new("main", test_snap_delay);
> +	assert(main_fiber != NULL);
> +	fiber_wakeup(main_fiber);
> +	ev_run(loop(), 0);
> +
> +	gc_free();
> +	fiber_free();
> +	memory_free();
> +	return test_result;
> +}
> diff --git a/test/unit/snap_quorum_delay.result b/test/unit/snap_quorum_delay.result
> new file mode 100644
> index 000000000..6ca213391
> --- /dev/null
> +++ b/test/unit/snap_quorum_delay.result
> @@ -0,0 +1,12 @@
> +	*** test_snap_delay ***
> +1..3
> +    1..1
> +    ok 1 - check snapshot delay confirm
> +ok 1 - subtests
> +    1..1
> +    ok 1 - check snapshot delay rollback
> +ok 2 - subtests
> +    1..1
> +    ok 1 - check snapshot delay timeout
> +ok 3 - subtests
> +	*** test_snap_delay: done ***
Thanks for the patch! LGTM.

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list