From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp29.i.mail.ru (smtp29.i.mail.ru [94.100.177.89]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C10CB42EF5C for ; Sun, 5 Jul 2020 00:56:08 +0300 (MSK) From: Serge Petrenko Date: Sun, 5 Jul 2020 00:55:49 +0300 Message-Id: <9a78892071bb44779f3bc21788b86b8c53a8ace5.1593899478.git.sergepetrenko@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become synchro leader List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, gorcunov@gmail.com, sergos@tarantool.org Cc: tarantool-patches@dev.tarantool.org Introduce replication_synchro_leader option to box.cfg. Once an instance is promoted to leader, it makes sure that txn_limbo is free of the previous leader's transactions. In order to achieve this goal, the instance first waits for twice the replication_synchro_timeout so that confirmations and rollbacks from the former leader can reach it. If the limbo remains non-empty, the new leader starts figuring out which transactions should be confirmed and which should be rolled back. In order to do so the instance scans through the vclocks of all the instances that replicate from it, together with its own vclock, and determines the greatest lsn of the former leader that has reached at least replication_synchro_quorum instances. Then the instance writes the appropriate CONFIRM and ROLLBACK entries. After these actions the limbo must be empty, and the instance may proceed with appending its own entries to the limbo. 
Closes #4849 --- src/box/box.cc | 79 ++++++++++++++++++++++++++++++++++++++++ src/box/box.h | 1 + src/box/lua/cfg.cc | 9 +++++ src/box/lua/load_cfg.lua | 4 ++ src/box/txn_limbo.c | 16 +------- src/box/txn_limbo.h | 15 ++++++++ 6 files changed, 110 insertions(+), 14 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index ca24b98ca..087710383 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -78,6 +78,7 @@ #include "sequence.h" #include "sql_stmt_cache.h" #include "msgpack.h" +#include "trivia/util.h" static char status[64] = "unknown"; @@ -945,6 +946,84 @@ box_set_replication_anon(void) } +void +box_set_replication_synchro_leader(void) +{ + bool is_leader = cfg_geti("replication_synchro_leader"); + /* + * For now no actions required when an instance stops + * being a leader. We should probably wait until txn_limbo + * becomes empty. + */ + if (!is_leader) + return; + uint32_t former_leader_id = txn_limbo.instance_id; + if (former_leader_id == REPLICA_ID_NIL || + former_leader_id == instance_id) { + return; + } + + /* Wait until pending confirmations/rollbacks reach us. 
*/ + double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo); + double start_tm = fiber_time(); + while (!txn_limbo_is_empty(&txn_limbo)) { + if (fiber_time() - start_tm > timeout) + break; + fiber_sleep(0.001); + } + + if (!txn_limbo_is_empty(&txn_limbo)) { + int64_t lsns[VCLOCK_MAX]; + int len = 0; + const struct vclock *vclock; + replicaset_foreach(replica) { + if (replica->relay != NULL && + relay_get_state(replica->relay) != RELAY_OFF && + !replica->anon) { + assert(!tt_uuid_is_equal(&INSTANCE_UUID, + &replica->uuid)); + vclock = relay_vclock(replica->relay); + int64_t lsn = vclock_get(vclock, + former_leader_id); + lsns[len++] = lsn; + } + } + lsns[len++] = vclock_get(box_vclock, former_leader_id); + assert(len < VCLOCK_MAX); + + int64_t confirm_lsn = 0; + if (len >= replication_synchro_quorum) { + qsort(lsns, len, sizeof(int64_t), cmp_i64); + confirm_lsn = lsns[len - replication_synchro_quorum]; + } + + struct txn_limbo_entry *e, *last_quorum = NULL; + struct txn_limbo_entry *rollback = NULL; + rlist_foreach_entry(e, &txn_limbo.queue, in_queue) { + if (txn_has_flag(e->txn, TXN_WAIT_ACK)) { + if (e->lsn <= confirm_lsn) { + last_quorum = e; + } else { + rollback = e; + break; + } + } + } + + if (last_quorum != NULL) { + confirm_lsn = last_quorum->lsn; + txn_limbo_write_confirm(&txn_limbo, confirm_lsn); + txn_limbo_read_confirm(&txn_limbo, confirm_lsn); + } + if (rollback != NULL) { + txn_limbo_write_rollback(&txn_limbo, rollback->lsn); + txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1); + } + + assert(txn_limbo_is_empty(&txn_limbo)); + } +} + void box_listen(void) { diff --git a/src/box/box.h b/src/box/box.h index f9789154e..565b0ebce 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -245,6 +245,7 @@ void box_set_replication_connect_quorum(void); void box_set_replication_sync_lag(void); int box_set_replication_synchro_quorum(void); int box_set_replication_synchro_timeout(void); +void box_set_replication_synchro_leader(void); void 
box_set_replication_sync_timeout(void); void box_set_replication_skip_conflict(void); void box_set_replication_anon(void); diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index d481155cd..adc1fcf3f 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -329,6 +329,14 @@ lbox_cfg_set_replication_synchro_timeout(struct lua_State *L) return 0; } +static int +lbox_cfg_set_replication_synchro_leader(struct lua_State *L) +{ + (void) L; + box_set_replication_synchro_leader(); + return 0; +} + static int lbox_cfg_set_replication_sync_timeout(struct lua_State *L) { @@ -388,6 +396,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag}, {"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum}, {"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout}, + {"cfg_set_replication_synchro_leader", lbox_cfg_set_replication_synchro_leader}, {"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout}, {"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict}, {"cfg_set_replication_anon", lbox_cfg_set_replication_anon}, diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 107bc1582..9a968f30e 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -91,6 +91,7 @@ local default_cfg = { replication_sync_timeout = 300, replication_synchro_quorum = 1, replication_synchro_timeout = 5, + replication_synchro_leader = false, replication_connect_timeout = 30, replication_connect_quorum = nil, -- connect all replication_skip_conflict = false, @@ -168,6 +169,7 @@ local template_cfg = { replication_sync_timeout = 'number', replication_synchro_quorum = 'number', replication_synchro_timeout = 'number', + replication_synchro_leader = 'boolean', replication_connect_timeout = 'number', replication_connect_quorum = 'number', replication_skip_conflict = 'boolean', @@ -286,6 +288,7 @@ local dynamic_cfg = { 
 replication_sync_timeout = private.cfg_set_replication_sync_timeout, replication_synchro_quorum = private.cfg_set_replication_synchro_quorum, replication_synchro_timeout = private.cfg_set_replication_synchro_timeout, + replication_synchro_leader = private.cfg_set_replication_synchro_leader, replication_skip_conflict = private.cfg_set_replication_skip_conflict, replication_anon = private.cfg_set_replication_anon, instance_uuid = check_instance_uuid, @@ -333,6 +336,7 @@ local dynamic_cfg_order = { -- the new one. This should be fixed when box.cfg is able to -- apply some parameters together and atomically. replication_anon = 250, + replication_synchro_leader = 250, } local function sort_cfg_cb(l, r) diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index 44a0c7273..992115ad1 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -148,9 +148,6 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) return entry->is_commit; } -static int -txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn); - int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) { @@ -261,11 +258,7 @@ rollback: return -1; } -/** - * Write a confirmation entry to WAL. After it's written all the - * transactions waiting for confirmation may be finished. - */ -static int +int txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn) { return txn_limbo_write_confirm_rollback(limbo, lsn, true); @@ -303,12 +296,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) } } -/** - * Write a rollback message to WAL. After it's written - * all the transactions following the current one and waiting - * for confirmation must be rolled back. 
- */ -static int +int txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn) { return txn_limbo_write_confirm_rollback(limbo, lsn, false); diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 3abbe9e85..5bf5827ac 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -205,6 +205,21 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); +/** + * Write a confirmation entry to WAL. After it's written all the + * transactions waiting for confirmation may be finished. + */ +int +txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn); + +/** + * Write a rollback message to WAL. After it's written + * all the transactions following the current one and waiting + * for confirmation must be rolled back. + */ +int +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn); + /** * Confirm all the entries up to the given master's LSN. */ -- 2.24.3 (Apple Git-128)