[Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader

Leonid Vasiliev lvasiliev at tarantool.org
Fri Jul 10 01:03:08 MSK 2020


Hi! Thank you for the patch.
Sergey is in the army now, so my comment is addressed to Vlad.

On 05.07.2020 00:55, Serge Petrenko wrote:
> Introduce replication_synchro_leader option to box.cfg.
> 
> Once an instance is promoted to leader, it makes sure that txn_limbo is
> free of previous leader's transactions. In order to achieve this goal,
> the instance first waits for 2 replication_synchro_timeouts so that
> confirmations and rollbacks from the former leader reach it.
> 
> If the limbo remains non-empty, the new leader starts figuring out which
> transactions should be confirmed and which should be rolled back. In
> order to do so the instance scans through vclocks of all the instances
> that replicate from it and defines which former leader's lsn is the last
> reached by replication_synchro_quorum of replicas.
> 
> Then the instance writes appropriate CONFIRM and ROLLBACK entries.
> After these actions the limbo must be empty, and the instance may
> proceed with appending its own entries to the limbo.
> 
> Closes #4849
> ---
>   src/box/box.cc           | 79 ++++++++++++++++++++++++++++++++++++++++
>   src/box/box.h            |  1 +
>   src/box/lua/cfg.cc       |  9 +++++
>   src/box/lua/load_cfg.lua |  4 ++
>   src/box/txn_limbo.c      | 16 +-------
>   src/box/txn_limbo.h      | 15 ++++++++
>   6 files changed, 110 insertions(+), 14 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index ca24b98ca..087710383 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -78,6 +78,7 @@
>   #include "sequence.h"
>   #include "sql_stmt_cache.h"
>   #include "msgpack.h"
> +#include "trivia/util.h"
>   
>   static char status[64] = "unknown";
>   
> @@ -945,6 +946,84 @@ box_set_replication_anon(void)
>   
>   }
>   
> +void
> +box_set_replication_synchro_leader(void)
> +{
> +	bool is_leader = cfg_geti("replication_synchro_leader");
> +	/*
> +	 * For now no actions required when an instance stops
> +	 * being a leader. We should probably wait until txn_limbo
> +	 * becomes empty.
> +	 */
> +	if (!is_leader)
> +		return;
> +	uint32_t former_leader_id = txn_limbo.instance_id;
> +	if (former_leader_id == REPLICA_ID_NIL ||
> +	    former_leader_id == instance_id) {
> +		return;
> +	}
> +
> +	/* Wait until pending confirmations/rollbacks reach us. */
> +	double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
> +	double start_tm = fiber_time();
> +	while (!txn_limbo_is_empty(&txn_limbo)) {
> +		if (fiber_time() - start_tm > timeout)

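Please use fiber_clock() instead of fiber_time() for this timeout (both here
and for start_tm above). From the commit message of
d988d7fb92fe1dda4b64218fb06813e93eb56ed1: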
"
...use fiber_clock() instead of fiber_time() for timeouts

fiber_time() reports real time, which shouldn't be used for calculating
timeouts as it is affected by system time changes. Add fiber_clock()
based on ev_monotonic_now(), export it to Lua, and use it instead.
".

> +			break;
> +		fiber_sleep(0.001);
> +	}
> +
> +	if (!txn_limbo_is_empty(&txn_limbo)) {
> +		int64_t lsns[VCLOCK_MAX];
> +		int len = 0;
> +		const struct vclock  *vclock;
> +		replicaset_foreach(replica) {
> +			if (replica->relay != NULL &&
> +			    relay_get_state(replica->relay) != RELAY_OFF &&
> +			    !replica->anon) {
> +				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
> +							 &replica->uuid));
> +				vclock = relay_vclock(replica->relay);
> +				int64_t lsn = vclock_get(vclock,
> +							 former_leader_id);
> +				lsns[len++] = lsn;
> +			}
> +		}
> +		lsns[len++] = vclock_get(box_vclock, former_leader_id);
> +		assert(len < VCLOCK_MAX);
> +
> +		int64_t confirm_lsn = 0;
> +		if (len >= replication_synchro_quorum) {
> +			qsort(lsns, len, sizeof(int64_t), cmp_i64);
> +			confirm_lsn = lsns[len - replication_synchro_quorum];
> +		}
> +
> +		struct txn_limbo_entry *e, *last_quorum = NULL;
> +		struct txn_limbo_entry *rollback = NULL;
> +		rlist_foreach_entry(e, &txn_limbo.queue, in_queue) {
> +			if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> +				if (e->lsn <= confirm_lsn) {
> +					last_quorum = e;
> +				} else {
> +					rollback = e;
> +					break;
> +				}
> +			}
> +		}
> +
> +		if (last_quorum != NULL) {
> +			confirm_lsn = last_quorum->lsn;
> +			txn_limbo_write_confirm(&txn_limbo, confirm_lsn);
> +			txn_limbo_read_confirm(&txn_limbo, confirm_lsn);
> +		}
> +		if (rollback != NULL) {
> +			txn_limbo_write_rollback(&txn_limbo, rollback->lsn);
> +			txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1);
> +		}
> +
> +		assert(txn_limbo_is_empty(&txn_limbo));
> +	}
> +}
> +
>   void
>   box_listen(void)
>   {
> diff --git a/src/box/box.h b/src/box/box.h
> index f9789154e..565b0ebce 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -245,6 +245,7 @@ void box_set_replication_connect_quorum(void);
>   void box_set_replication_sync_lag(void);
>   int box_set_replication_synchro_quorum(void);
>   int box_set_replication_synchro_timeout(void);
> +void box_set_replication_synchro_leader(void);
>   void box_set_replication_sync_timeout(void);
>   void box_set_replication_skip_conflict(void);
>   void box_set_replication_anon(void);
> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
> index d481155cd..adc1fcf3f 100644
> --- a/src/box/lua/cfg.cc
> +++ b/src/box/lua/cfg.cc
> @@ -329,6 +329,14 @@ lbox_cfg_set_replication_synchro_timeout(struct lua_State *L)
>   	return 0;
>   }
>   
> +static int
> +lbox_cfg_set_replication_synchro_leader(struct lua_State *L)
> +{
> +	(void) L;
> +	box_set_replication_synchro_leader();
> +	return 0;
> +}
> +
>   static int
>   lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
>   {
> @@ -388,6 +396,7 @@ box_lua_cfg_init(struct lua_State *L)
>   		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
>   		{"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum},
>   		{"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout},
> +		{"cfg_set_replication_synchro_leader", lbox_cfg_set_replication_synchro_leader},
>   		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
>   		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
>   		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 107bc1582..9a968f30e 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -91,6 +91,7 @@ local default_cfg = {
>       replication_sync_timeout = 300,
>       replication_synchro_quorum = 1,
>       replication_synchro_timeout = 5,
> +    replication_synchro_leader = false,
>       replication_connect_timeout = 30,
>       replication_connect_quorum = nil, -- connect all
>       replication_skip_conflict = false,
> @@ -168,6 +169,7 @@ local template_cfg = {
>       replication_sync_timeout = 'number',
>       replication_synchro_quorum = 'number',
>       replication_synchro_timeout = 'number',
> +    replication_synchro_leader = 'boolean',
>       replication_connect_timeout = 'number',
>       replication_connect_quorum = 'number',
>       replication_skip_conflict = 'boolean',
> @@ -286,6 +288,7 @@ local dynamic_cfg = {
>       replication_sync_timeout = private.cfg_set_replication_sync_timeout,
>       replication_synchro_quorum = private.cfg_set_replication_synchro_quorum,
>       replication_synchro_timeout = private.cfg_set_replication_synchro_timeout,
> +    replication_synchro_leader = private.cfg_set_replication_synchro_leader,
>       replication_skip_conflict = private.cfg_set_replication_skip_conflict,
>       replication_anon        = private.cfg_set_replication_anon,
>       instance_uuid           = check_instance_uuid,
> @@ -333,6 +336,7 @@ local dynamic_cfg_order = {
>       -- the new one. This should be fixed when box.cfg is able to
>       -- apply some parameters together and atomically.
>       replication_anon        = 250,
> +    replication_synchro__leader = 250,
>   }
>   
>   local function sort_cfg_cb(l, r)
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 44a0c7273..992115ad1 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -148,9 +148,6 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   	return entry->is_commit;
>   }
>   
> -static int
> -txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
> -
>   int
>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   {
> @@ -261,11 +258,7 @@ rollback:
>   	return -1;
>   }
>   
> -/**
> - * Write a confirmation entry to WAL. After it's written all the
> - * transactions waiting for confirmation may be finished.
> - */
> -static int
> +int
>   txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
>   {
>   	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
> @@ -303,12 +296,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
>   	}
>   }
>   
> -/**
> - * Write a rollback message to WAL. After it's written
> - * all the transactions following the current one and waiting
> - * for confirmation must be rolled back.
> - */
> -static int
> +int
>   txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
>   {
>   	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 3abbe9e85..5bf5827ac 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -205,6 +205,21 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>   int
>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>   
> +/**
> + * Write a confirmation entry to WAL. After it's written all the
> + * transactions waiting for confirmation may be finished.
> + */
> +int
> +txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn);
> +
> +/**
> + * Write a rollback message to WAL. After it's written
> + * all the transactions following the current one and waiting
> + * for confirmation must be rolled back.
> + */
> +int
> +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
> +
>   /**
>    * Confirm all the entries up to the given master's LSN.
>    */
> 


More information about the Tarantool-patches mailing list