[Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader
Leonid Vasiliev
lvasiliev at tarantool.org
Fri Jul 10 01:03:08 MSK 2020
Hi! Thank you for the patch.
Sergey is in the army now, so my comment is addressed to Vlad.
On 05.07.2020 00:55, Serge Petrenko wrote:
> Introduce replication_synchro_leader option to box.cfg.
>
> Once an instance is promoted to leader, it makes sure that txn_limbo is
> free of previous leader's transactions. In order to achieve this goal,
> the instance first waits for 2 replication_synchro_timeouts so that
> confirmations and rollbacks from the former leader reach it.
>
> If the limbo remains non-empty, the new leader starts figuring out which
> transactions should be confirmed and which should be rolled back. In
> order to do so the instance scans through vclocks of all the instances
> that replicate from it and defines which former leader's lsn is the last
> reached by replication_synchro_quorum of replicas.
>
> Then the instance writes appropriate CONFIRM and ROLLBACK entries.
> After these actions the limbo must be empty, and the instance may
> proceed with appending its own entries to the limbo.
>
> Closes #4849
> ---
> src/box/box.cc | 79 ++++++++++++++++++++++++++++++++++++++++
> src/box/box.h | 1 +
> src/box/lua/cfg.cc | 9 +++++
> src/box/lua/load_cfg.lua | 4 ++
> src/box/txn_limbo.c | 16 +-------
> src/box/txn_limbo.h | 15 ++++++++
> 6 files changed, 110 insertions(+), 14 deletions(-)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index ca24b98ca..087710383 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -78,6 +78,7 @@
> #include "sequence.h"
> #include "sql_stmt_cache.h"
> #include "msgpack.h"
> +#include "trivia/util.h"
>
> static char status[64] = "unknown";
>
> @@ -945,6 +946,84 @@ box_set_replication_anon(void)
>
> }
>
> +void
> +box_set_replication_synchro_leader(void)
> +{
> + bool is_leader = cfg_geti("replication_synchro_leader");
> + /*
> + * For now no actions required when an instance stops
> + * being a leader. We should probably wait until txn_limbo
> + * becomes empty.
> + */
> + if (!is_leader)
> + return;
> + uint32_t former_leader_id = txn_limbo.instance_id;
> + if (former_leader_id == REPLICA_ID_NIL ||
> + former_leader_id == instance_id) {
> + return;
> + }
> +
> + /* Wait until pending confirmations/rollbacks reach us. */
> + double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
> + double start_tm = fiber_time();
> + while (!txn_limbo_is_empty(&txn_limbo)) {
> + if (fiber_time() - start_tm > timeout)
From the d988d7fb92fe1dda4b64218fb06813e93eb56ed1 commit comment:
"
...use fiber_clock() instead of fiber_time() for timeouts
fiber_time() reports real time, which shouldn't be used for calculating
timeouts as it is affected by system time changes. Add fiber_clock()
based on ev_monotonic_now(), export it to Lua, and use it instead.
".
> + break;
> + fiber_sleep(0.001);
> + }
> +
> + if (!txn_limbo_is_empty(&txn_limbo)) {
> + int64_t lsns[VCLOCK_MAX];
> + int len = 0;
> + const struct vclock *vclock;
> + replicaset_foreach(replica) {
> + if (replica->relay != NULL &&
> + relay_get_state(replica->relay) != RELAY_OFF &&
> + !replica->anon) {
> + assert(!tt_uuid_is_equal(&INSTANCE_UUID,
> + &replica->uuid));
> + vclock = relay_vclock(replica->relay);
> + int64_t lsn = vclock_get(vclock,
> + former_leader_id);
> + lsns[len++] = lsn;
> + }
> + }
> + lsns[len++] = vclock_get(box_vclock, former_leader_id);
> + assert(len < VCLOCK_MAX);
> +
> + int64_t confirm_lsn = 0;
> + if (len >= replication_synchro_quorum) {
> + qsort(lsns, len, sizeof(int64_t), cmp_i64);
> + confirm_lsn = lsns[len - replication_synchro_quorum];
> + }
> +
> + struct txn_limbo_entry *e, *last_quorum = NULL;
> + struct txn_limbo_entry *rollback = NULL;
> + rlist_foreach_entry(e, &txn_limbo.queue, in_queue) {
> + if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> + if (e->lsn <= confirm_lsn) {
> + last_quorum = e;
> + } else {
> + rollback = e;
> + break;
> + }
> + }
> + }
> +
> + if (last_quorum != NULL) {
> + confirm_lsn = last_quorum->lsn;
> + txn_limbo_write_confirm(&txn_limbo, confirm_lsn);
> + txn_limbo_read_confirm(&txn_limbo, confirm_lsn);
> + }
> + if (rollback != NULL) {
> + txn_limbo_write_rollback(&txn_limbo, rollback->lsn);
> + txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1);
> + }
> +
> + assert(txn_limbo_is_empty(&txn_limbo));
> + }
> +}
> +
> void
> box_listen(void)
> {
> diff --git a/src/box/box.h b/src/box/box.h
> index f9789154e..565b0ebce 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -245,6 +245,7 @@ void box_set_replication_connect_quorum(void);
> void box_set_replication_sync_lag(void);
> int box_set_replication_synchro_quorum(void);
> int box_set_replication_synchro_timeout(void);
> +void box_set_replication_synchro_leader(void);
> void box_set_replication_sync_timeout(void);
> void box_set_replication_skip_conflict(void);
> void box_set_replication_anon(void);
> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
> index d481155cd..adc1fcf3f 100644
> --- a/src/box/lua/cfg.cc
> +++ b/src/box/lua/cfg.cc
> @@ -329,6 +329,14 @@ lbox_cfg_set_replication_synchro_timeout(struct lua_State *L)
> return 0;
> }
>
> +static int
> +lbox_cfg_set_replication_synchro_leader(struct lua_State *L)
> +{
> + (void) L;
> + box_set_replication_synchro_leader();
> + return 0;
> +}
> +
> static int
> lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
> {
> @@ -388,6 +396,7 @@ box_lua_cfg_init(struct lua_State *L)
> {"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
> {"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum},
> {"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout},
> + {"cfg_set_replication_synchro_leader", lbox_cfg_set_replication_synchro_leader},
> {"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
> {"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
> {"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 107bc1582..9a968f30e 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -91,6 +91,7 @@ local default_cfg = {
> replication_sync_timeout = 300,
> replication_synchro_quorum = 1,
> replication_synchro_timeout = 5,
> + replication_synchro_leader = false,
> replication_connect_timeout = 30,
> replication_connect_quorum = nil, -- connect all
> replication_skip_conflict = false,
> @@ -168,6 +169,7 @@ local template_cfg = {
> replication_sync_timeout = 'number',
> replication_synchro_quorum = 'number',
> replication_synchro_timeout = 'number',
> + replication_synchro_leader = 'boolean',
> replication_connect_timeout = 'number',
> replication_connect_quorum = 'number',
> replication_skip_conflict = 'boolean',
> @@ -286,6 +288,7 @@ local dynamic_cfg = {
> replication_sync_timeout = private.cfg_set_replication_sync_timeout,
> replication_synchro_quorum = private.cfg_set_replication_synchro_quorum,
> replication_synchro_timeout = private.cfg_set_replication_synchro_timeout,
> + replication_synchro_leader = private.cfg_set_replication_synchro_leader,
> replication_skip_conflict = private.cfg_set_replication_skip_conflict,
> replication_anon = private.cfg_set_replication_anon,
> instance_uuid = check_instance_uuid,
> @@ -333,6 +336,7 @@ local dynamic_cfg_order = {
> -- the new one. This should be fixed when box.cfg is able to
> -- apply some parameters together and atomically.
> replication_anon = 250,
> + replication_synchro__leader = 250,
> }
>
> local function sort_cfg_cb(l, r)
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 44a0c7273..992115ad1 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -148,9 +148,6 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> return entry->is_commit;
> }
>
> -static int
> -txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
> -
> int
> txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> {
> @@ -261,11 +258,7 @@ rollback:
> return -1;
> }
>
> -/**
> - * Write a confirmation entry to WAL. After it's written all the
> - * transactions waiting for confirmation may be finished.
> - */
> -static int
> +int
> txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
> {
> return txn_limbo_write_confirm_rollback(limbo, lsn, true);
> @@ -303,12 +296,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
> }
> }
>
> -/**
> - * Write a rollback message to WAL. After it's written
> - * all the transactions following the current one and waiting
> - * for confirmation must be rolled back.
> - */
> -static int
> +int
> txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
> {
> return txn_limbo_write_confirm_rollback(limbo, lsn, false);
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 3abbe9e85..5bf5827ac 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -205,6 +205,21 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
> int
> txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>
> +/**
> + * Write a confirmation entry to WAL. After it's written all the
> + * transactions waiting for confirmation may be finished.
> + */
> +int
> +txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn);
> +
> +/**
> + * Write a rollback message to WAL. After it's written
> + * all the transactions following the current one and waiting
> + * for confirmation must be rolled back.
> + */
> +int
> +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
> +
> /**
> * Confirm all the entries up to the given master's LSN.
> */
>
More information about the Tarantool-patches
mailing list