* [Tarantool-patches] [PATCH 0/2] introduce actions on leader election @ 2020-07-04 21:55 Serge Petrenko 2020-07-04 21:55 ` [Tarantool-patches] [PATCH 1/2] util: move cmp_i64 from xlog.c to util.h Serge Petrenko 2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko 0 siblings, 2 replies; 8+ messages in thread From: Serge Petrenko @ 2020-07-04 21:55 UTC (permalink / raw) To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches Branch: gh-4842-sync-replication Issue: https://github.com/tarantool/tarantool/issues/4849 Serge Petrenko (2): util: move cmp_i64 from xlog.c to util.h box: introduce a cfg handle to become syncro leader src/box/box.cc | 79 ++++++++++++++++++++++++++++++++++++++++ src/box/box.h | 1 + src/box/lua/cfg.cc | 9 +++++ src/box/lua/load_cfg.lua | 4 ++ src/box/txn_limbo.c | 16 +------- src/box/txn_limbo.h | 15 ++++++++ src/box/xlog.c | 10 +---- src/trivia/util.h | 11 ++++++ 8 files changed, 122 insertions(+), 23 deletions(-) -- 2.24.3 (Apple Git-128) ^ permalink raw reply [flat|nested] 8+ messages in thread
* [Tarantool-patches] [PATCH 1/2] util: move cmp_i64 from xlog.c to util.h 2020-07-04 21:55 [Tarantool-patches] [PATCH 0/2] introduce actions on leader election Serge Petrenko @ 2020-07-04 21:55 ` Serge Petrenko 2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko 1 sibling, 0 replies; 8+ messages in thread From: Serge Petrenko @ 2020-07-04 21:55 UTC (permalink / raw) To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches The comparator will be needed in other files too, e.g. box.cc Prerequisite #4849 --- src/box/xlog.c | 10 +--------- src/trivia/util.h | 11 +++++++++++ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/box/xlog.c b/src/box/xlog.c index b5b082a20..05f8c2e29 100644 --- a/src/box/xlog.c +++ b/src/box/xlog.c @@ -46,6 +46,7 @@ #include "xrow.h" #include "iproto_constants.h" #include "errinj.h" +#include "trivia/util.h" /* * FALLOC_FL_KEEP_SIZE flag has existed since fallocate() was @@ -475,15 +476,6 @@ xdir_open_cursor(struct xdir *dir, int64_t signature, return 0; } -static int -cmp_i64(const void *_a, const void *_b) -{ - const int64_t *a = (const int64_t *) _a, *b = (const int64_t *) _b; - if (*a == *b) - return 0; - return (*a > *b) ? 1 : -1; -} - /** * Scan (or rescan) a directory with snapshot or write ahead logs. * Read all files matching a pattern from the directory - diff --git a/src/trivia/util.h b/src/trivia/util.h index 29c7f0194..b344af303 100644 --- a/src/trivia/util.h +++ b/src/trivia/util.h @@ -534,6 +534,17 @@ double_compare_int64(double lhs, int64_t rhs, int k) return double_compare_nint64(lhs, rhs, k); } +/** + * Compare two operands as int64_t. + * Needed for qsort. + */ +static inline int +cmp_i64(const void *_a, const void *_b) +{ + const int64_t *a = (const int64_t *) _a, *b = (const int64_t *) _b; + return COMPARE_RESULT(*a, *b); +} + /** * Put the current thread in sleep for the given number of * seconds. 
-- 2.24.3 (Apple Git-128)
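For readers following the thread, here is a minimal standalone sketch of how a comparator of this shape is used with qsort(). The macro CMP_RESULT and the helper sort_lsns() are assumptions made for illustration; the patch itself relies on Tarantool's own COMPARE_RESULT, whose definition is not shown in this thread.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Assumption: a local stand-in for the three-way comparison macro
 * used by the patch. It yields -1, 0 or 1.
 */
#define CMP_RESULT(a, b) ((a) < (b) ? -1 : (a) > (b))

/* Same shape as the cmp_i64() that the patch moves into util.h. */
static inline int
cmp_i64(const void *_a, const void *_b)
{
	const int64_t *a = (const int64_t *) _a, *b = (const int64_t *) _b;
	return CMP_RESULT(*a, *b);
}

/* Hypothetical helper: sort an array of int64_t values ascending. */
static void
sort_lsns(int64_t *lsns, size_t count)
{
	qsort(lsns, count, sizeof(lsns[0]), cmp_i64);
}
```

Comparing explicitly rather than returning `*a - *b` avoids signed overflow and truncation to int when the operands are far apart, which matters for 64-bit values.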
* [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader 2020-07-04 21:55 [Tarantool-patches] [PATCH 0/2] introduce actions on leader election Serge Petrenko 2020-07-04 21:55 ` [Tarantool-patches] [PATCH 1/2] util: move cmp_i64 from xlog.c to util.h Serge Petrenko @ 2020-07-04 21:55 ` Serge Petrenko 2020-07-04 23:03 ` Vladislav Shpilevoy ` (2 more replies) 1 sibling, 3 replies; 8+ messages in thread From: Serge Petrenko @ 2020-07-04 21:55 UTC (permalink / raw) To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches Introduce replication_synchro_leader option to box.cfg. Once an instance is promoted to leader, it makes sure that txn_limbo is free of previous leader's transactions. In order to achieve this goal, the instance first waits for 2 replication_synchro_timeouts so that confirmations and rollbacks from the former leader reach it. If the limbo remains non-empty, the new leader starts figuring out which transactions should be confirmed and which should be rolled back. In order to do so the instance scans through vclocks of all the instances that replicate from it and defines which former leader's lsn is the last reached by replication_synchro_quorum of replicas. Then the instance writes appropriate CONFIRM and ROLLBACK entries. After these actions the limbo must be empty, and the instance may proceed with appending its own entries to the limbo. 
Closes #4849 --- src/box/box.cc | 79 ++++++++++++++++++++++++++++++++++++++++ src/box/box.h | 1 + src/box/lua/cfg.cc | 9 +++++ src/box/lua/load_cfg.lua | 4 ++ src/box/txn_limbo.c | 16 +------- src/box/txn_limbo.h | 15 ++++++++ 6 files changed, 110 insertions(+), 14 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index ca24b98ca..087710383 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -78,6 +78,7 @@ #include "sequence.h" #include "sql_stmt_cache.h" #include "msgpack.h" +#include "trivia/util.h" static char status[64] = "unknown"; @@ -945,6 +946,84 @@ box_set_replication_anon(void) } +void +box_set_replication_synchro_leader(void) +{ + bool is_leader = cfg_geti("replication_synchro_leader"); + /* + * For now no actions required when an instance stops + * being a leader. We should probably wait until txn_limbo + * becomes empty. + */ + if (!is_leader) + return; + uint32_t former_leader_id = txn_limbo.instance_id; + if (former_leader_id == REPLICA_ID_NIL || + former_leader_id == instance_id) { + return; + } + + /* Wait until pending confirmations/rollbacks reach us. 
*/ + double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo); + double start_tm = fiber_time(); + while (!txn_limbo_is_empty(&txn_limbo)) { + if (fiber_time() - start_tm > timeout) + break; + fiber_sleep(0.001); + } + + if (!txn_limbo_is_empty(&txn_limbo)) { + int64_t lsns[VCLOCK_MAX]; + int len = 0; + const struct vclock *vclock; + replicaset_foreach(replica) { + if (replica->relay != NULL && + relay_get_state(replica->relay) != RELAY_OFF && + !replica->anon) { + assert(!tt_uuid_is_equal(&INSTANCE_UUID, + &replica->uuid)); + vclock = relay_vclock(replica->relay); + int64_t lsn = vclock_get(vclock, + former_leader_id); + lsns[len++] = lsn; + } + } + lsns[len++] = vclock_get(box_vclock, former_leader_id); + assert(len < VCLOCK_MAX); + + int64_t confirm_lsn = 0; + if (len >= replication_synchro_quorum) { + qsort(lsns, len, sizeof(int64_t), cmp_i64); + confirm_lsn = lsns[len - replication_synchro_quorum]; + } + + struct txn_limbo_entry *e, *last_quorum = NULL; + struct txn_limbo_entry *rollback = NULL; + rlist_foreach_entry(e, &txn_limbo.queue, in_queue) { + if (txn_has_flag(e->txn, TXN_WAIT_ACK)) { + if (e->lsn <= confirm_lsn) { + last_quorum = e; + } else { + rollback = e; + break; + } + } + } + + if (last_quorum != NULL) { + confirm_lsn = last_quorum->lsn; + txn_limbo_write_confirm(&txn_limbo, confirm_lsn); + txn_limbo_read_confirm(&txn_limbo, confirm_lsn); + } + if (rollback != NULL) { + txn_limbo_write_rollback(&txn_limbo, rollback->lsn); + txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1); + } + + assert(txn_limbo_is_empty(&txn_limbo)); + } +} + void box_listen(void) { diff --git a/src/box/box.h b/src/box/box.h index f9789154e..565b0ebce 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -245,6 +245,7 @@ void box_set_replication_connect_quorum(void); void box_set_replication_sync_lag(void); int box_set_replication_synchro_quorum(void); int box_set_replication_synchro_timeout(void); +void box_set_replication_synchro_leader(void); void 
box_set_replication_sync_timeout(void); void box_set_replication_skip_conflict(void); void box_set_replication_anon(void); diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index d481155cd..adc1fcf3f 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -329,6 +329,14 @@ lbox_cfg_set_replication_synchro_timeout(struct lua_State *L) return 0; } +static int +lbox_cfg_set_replication_synchro_leader(struct lua_State *L) +{ + (void) L; + box_set_replication_synchro_leader(); + return 0; +} + static int lbox_cfg_set_replication_sync_timeout(struct lua_State *L) { @@ -388,6 +396,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag}, {"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum}, {"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout}, + {"cfg_set_replication_synchro_leader", lbox_cfg_set_replication_synchro_leader}, {"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout}, {"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict}, {"cfg_set_replication_anon", lbox_cfg_set_replication_anon}, diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 107bc1582..9a968f30e 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -91,6 +91,7 @@ local default_cfg = { replication_sync_timeout = 300, replication_synchro_quorum = 1, replication_synchro_timeout = 5, + replication_synchro_leader = false, replication_connect_timeout = 30, replication_connect_quorum = nil, -- connect all replication_skip_conflict = false, @@ -168,6 +169,7 @@ local template_cfg = { replication_sync_timeout = 'number', replication_synchro_quorum = 'number', replication_synchro_timeout = 'number', + replication_synchro_leader = 'boolean', replication_connect_timeout = 'number', replication_connect_quorum = 'number', replication_skip_conflict = 'boolean', @@ -286,6 +288,7 @@ local dynamic_cfg = { 
replication_sync_timeout = private.cfg_set_replication_sync_timeout, replication_synchro_quorum = private.cfg_set_replication_synchro_quorum, replication_synchro_timeout = private.cfg_set_replication_synchro_timeout, + replication_synchro_leader = private.cfg_set_replication_synchro_leader, replication_skip_conflict = private.cfg_set_replication_skip_conflict, replication_anon = private.cfg_set_replication_anon, instance_uuid = check_instance_uuid, @@ -333,6 +336,7 @@ local dynamic_cfg_order = { -- the new one. This should be fixed when box.cfg is able to -- apply some parameters together and atomically. replication_anon = 250, + replication_synchro__leader = 250, } local function sort_cfg_cb(l, r) diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index 44a0c7273..992115ad1 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -148,9 +148,6 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) return entry->is_commit; } -static int -txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn); - int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) { @@ -261,11 +258,7 @@ rollback: return -1; } -/** - * Write a confirmation entry to WAL. After it's written all the - * transactions waiting for confirmation may be finished. - */ -static int +int txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn) { return txn_limbo_write_confirm_rollback(limbo, lsn, true); @@ -303,12 +296,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) } } -/** - * Write a rollback message to WAL. After it's written - * all the transactions following the current one and waiting - * for confirmation must be rolled back. 
- */ -static int +int txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn) { return txn_limbo_write_confirm_rollback(limbo, lsn, false); diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 3abbe9e85..5bf5827ac 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -205,6 +205,21 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); +/** + * Write a confirmation entry to WAL. After it's written all the + * transactions waiting for confirmation may be finished. + */ +int +txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn); + +/** + * Write a rollback message to WAL. After it's written + * all the transactions following the current one and waiting + * for confirmation must be rolled back. + */ +int +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn); + /** * Confirm all the entries up to the given master's LSN. */ -- 2.24.3 (Apple Git-128) ^ permalink raw reply [flat|nested] 8+ messages in thread
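The quorum step described in the commit message can be sketched in isolation as follows. This is only an illustration under stated assumptions: quorum_lsn() and cmp_lsn() are hypothetical names, the LSNs are passed in as a plain array instead of being collected from relay vclocks, and the guard against a non-positive quorum is an addition that the patch does not have.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Three-way comparator for int64_t values, suitable for qsort(). */
static int
cmp_lsn(const void *a, const void *b)
{
	int64_t x = *(const int64_t *) a, y = *(const int64_t *) b;
	return x < y ? -1 : x > y;
}

/*
 * Hypothetical helper mirroring the selection step in the patch:
 * return the highest LSN that at least `quorum` instances have
 * reached, or 0 when fewer than `quorum` instances reported.
 * The array is sorted in place.
 */
static int64_t
quorum_lsn(int64_t *lsns, int len, int quorum)
{
	/* The quorum <= 0 check is an extra safety guard (assumption). */
	if (quorum <= 0 || len < quorum)
		return 0;
	qsort(lsns, len, sizeof(lsns[0]), cmp_lsn);
	/* The `quorum` largest values are all >= lsns[len - quorum]. */
	return lsns[len - quorum];
}
```

For example, with LSNs {10, 7, 12} and a quorum of 2, the sorted array is {7, 10, 12} and the result is 10: two instances have reached LSN 10, but only one has reached 12.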
* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader 2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko @ 2020-07-04 23:03 ` Vladislav Shpilevoy 2020-07-04 23:18 ` Vladislav Shpilevoy 2020-07-09 22:03 ` Leonid Vasiliev 2 siblings, 0 replies; 8+ messages in thread From: Vladislav Shpilevoy @ 2020-07-04 23:03 UTC (permalink / raw) To: Serge Petrenko, gorcunov, sergos; +Cc: tarantool-patches Hi! Thanks for the patch! I didn't review it properly yet. Just one comment. > diff --git a/src/box/box.cc b/src/box/box.cc > index ca24b98ca..087710383 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -945,6 +946,84 @@ box_set_replication_anon(void) > > } > > +void > +box_set_replication_synchro_leader(void) > +{ > + bool is_leader = cfg_geti("replication_synchro_leader"); > + /* > + * For now no actions required when an instance stops > + * being a leader. We should probably wait until txn_limbo > + * becomes empty. > + */ > + if (!is_leader) > + return; > + uint32_t former_leader_id = txn_limbo.instance_id; > + if (former_leader_id == REPLICA_ID_NIL || > + former_leader_id == instance_id) { > + return; > + } > + > + /* Wait until pending confirmations/rollbacks reach us. 
*/ > + double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo); > + double start_tm = fiber_time(); > + while (!txn_limbo_is_empty(&txn_limbo)) { > + if (fiber_time() - start_tm > timeout) > + break; > + fiber_sleep(0.001); > + } > + > + if (!txn_limbo_is_empty(&txn_limbo)) { > + int64_t lsns[VCLOCK_MAX]; > + int len = 0; > + const struct vclock *vclock; > + replicaset_foreach(replica) { > + if (replica->relay != NULL && > + relay_get_state(replica->relay) != RELAY_OFF && > + !replica->anon) { > + assert(!tt_uuid_is_equal(&INSTANCE_UUID, > + &replica->uuid)); > + vclock = relay_vclock(replica->relay); > + int64_t lsn = vclock_get(vclock, > + former_leader_id); > + lsns[len++] = lsn; > + } > + } > + lsns[len++] = vclock_get(box_vclock, former_leader_id); > + assert(len < VCLOCK_MAX); > + > + int64_t confirm_lsn = 0; > + if (len >= replication_synchro_quorum) { > + qsort(lsns, len, sizeof(int64_t), cmp_i64); > + confirm_lsn = lsns[len - replication_synchro_quorum]; > + } > + Can the code below be moved to txn_limbo.c somehow? Doesn't look right to touch the queue here. > + struct txn_limbo_entry *e, *last_quorum = NULL; > + struct txn_limbo_entry *rollback = NULL; > + rlist_foreach_entry(e, &txn_limbo.queue, in_queue) { > + if (txn_has_flag(e->txn, TXN_WAIT_ACK)) { > + if (e->lsn <= confirm_lsn) { > + last_quorum = e; > + } else { > + rollback = e; > + break; > + } > + } > + } > + > + if (last_quorum != NULL) { > + confirm_lsn = last_quorum->lsn; > + txn_limbo_write_confirm(&txn_limbo, confirm_lsn); > + txn_limbo_read_confirm(&txn_limbo, confirm_lsn); > + } > + if (rollback != NULL) { > + txn_limbo_write_rollback(&txn_limbo, rollback->lsn); > + txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1); > + } > + > + assert(txn_limbo_is_empty(&txn_limbo)); > + } > +} ^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader 2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko 2020-07-04 23:03 ` Vladislav Shpilevoy @ 2020-07-04 23:18 ` Vladislav Shpilevoy 2020-07-05 11:09 ` Serge Petrenko 2020-07-09 22:03 ` Leonid Vasiliev 2 siblings, 1 reply; 8+ messages in thread From: Vladislav Shpilevoy @ 2020-07-04 23:18 UTC (permalink / raw) To: Serge Petrenko, gorcunov, sergos; +Cc: tarantool-patches Here is also a general problem - having this as box.cfg option means, that the selected leader should stay selected regardless of what happens in the cluster. In particular, it should reject any attempts to add an entry into the limbo, not originated from this instance. Currently this is not guaranteed, see comment below. > diff --git a/src/box/box.cc b/src/box/box.cc > index ca24b98ca..087710383 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -78,6 +78,7 @@ > #include "sequence.h" > #include "sql_stmt_cache.h" > #include "msgpack.h" > +#include "trivia/util.h" > > static char status[64] = "unknown"; > > @@ -945,6 +946,84 @@ box_set_replication_anon(void) > > } > > +void > +box_set_replication_synchro_leader(void) > +{ > + bool is_leader = cfg_geti("replication_synchro_leader"); > + /* > + * For now no actions required when an instance stops > + * being a leader. We should probably wait until txn_limbo > + * becomes empty. > + */ > + if (!is_leader) > + return; > + uint32_t former_leader_id = txn_limbo.instance_id; > + if (former_leader_id == REPLICA_ID_NIL || > + former_leader_id == instance_id) { When limbo is empty, it will change its instance id to whatever entry will be added next. So it can happen, that I gave replication_synchro_leader to 2 instances, and if they will create transactions one at a time, this will work. But looks wrong. Perhaps it would be better to add a box.ctl function to do this 'limbo cleanup'? 
Without persisting any leader role in a config. Until we have a better understanding how leader-read_only-master roles coexist.
* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader 2020-07-04 23:18 ` Vladislav Shpilevoy @ 2020-07-05 11:09 ` Serge Petrenko 0 siblings, 0 replies; 8+ messages in thread From: Serge Petrenko @ 2020-07-05 11:09 UTC (permalink / raw) To: Vladislav Shpilevoy, gorcunov, sergos; +Cc: tarantool-patches 05.07.2020 02:18, Vladislav Shpilevoy wrote: > Here is also a general problem - having this as box.cfg option > means, that the selected leader should stay selected regardless > of what happens in the cluster. In particular, it should reject > any attempts to add an entry into the limbo, not originated from > this instance. > > Currently this is not guaranteed, see comment below. Thanks for the answer! > >> diff --git a/src/box/box.cc b/src/box/box.cc >> index ca24b98ca..087710383 100644 >> --- a/src/box/box.cc >> +++ b/src/box/box.cc >> @@ -78,6 +78,7 @@ >> #include "sequence.h" >> #include "sql_stmt_cache.h" >> #include "msgpack.h" >> +#include "trivia/util.h" >> >> static char status[64] = "unknown"; >> >> @@ -945,6 +946,84 @@ box_set_replication_anon(void) >> >> } >> >> +void >> +box_set_replication_synchro_leader(void) >> +{ >> + bool is_leader = cfg_geti("replication_synchro_leader"); >> + /* >> + * For now no actions required when an instance stops >> + * being a leader. We should probably wait until txn_limbo >> + * becomes empty. >> + */ >> + if (!is_leader) >> + return; >> + uint32_t former_leader_id = txn_limbo.instance_id; >> + if (former_leader_id == REPLICA_ID_NIL || >> + former_leader_id == instance_id) { > When limbo is empty, it will change its instance id to whatever > entry will be added next. So it can happen, that I gave replication_synchro_leader > to 2 instances, and if they will create transactions one at a > time, this will work. But looks wrong. Good catch. > > Perhaps it would be better to add a box.ctl function to do this > 'limbo cleanup'? Without persisting any leader role in a config. 
> Until we have a better understanding how leader-read_only-master > roles coexist. Agree. I updated the patch according to your comments. I'm posting it here. Subject: [PATCH] box.ctl: introduce clear_synchro_queue function Introduce a new function to box.ctl API: box.ctl.clear_synchro_queue() The function performs some actions to make sure that after it's executed, the txn_limbo is free of any transactions issued on a remote instance. In order to achieve this goal, the instance first waits for 2 replication_synchro_timeouts so that confirmations and rollbacks from the remote instance reach it. If the limbo remains non-empty, the instance starts figuring out which transactions should be confirmed and which should be rolled back. In order to do so the instance scans through vclocks of all the instances that replicate from it and defines which old leader's lsn is the last reached by replication_synchro_quorum of replicas. Then the instance writes appropriate CONFIRM and ROLLBACK entries. After these actions the limbo must be empty. Closes #4849 --- src/box/box.cc | 50 +++++++++++++++++++++++++++++++++++++++++++++ src/box/box.h | 2 ++ src/box/lua/ctl.c | 9 ++++++++ src/box/txn_limbo.c | 26 +++++++++++++++++++++++ src/box/txn_limbo.h | 10 +++++++++ 5 files changed, 97 insertions(+) diff --git a/src/box/box.cc b/src/box/box.cc index ca24b98ca..749c96ca1 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -78,6 +78,7 @@ #include "sequence.h" #include "sql_stmt_cache.h" #include "msgpack.h" +#include "trivia/util.h" static char status[64] = "unknown"; @@ -945,6 +946,55 @@ box_set_replication_anon(void) } +void +box_clear_synchro_queue(void) +{ + if (!is_box_configured || txn_limbo_is_empty(&txn_limbo)) + return; + uint32_t former_leader_id = txn_limbo.instance_id; + assert(former_leader_id != REPLICA_ID_NIL); + if (former_leader_id == instance_id) + return; + + /* Wait until pending confirmations/rollbacks reach us. 
*/ + double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo); + double start_tm = fiber_time(); + while (!txn_limbo_is_empty(&txn_limbo)) { + if (fiber_time() - start_tm > timeout) + break; + fiber_sleep(0.001); + } + + if (!txn_limbo_is_empty(&txn_limbo)) { + int64_t lsns[VCLOCK_MAX]; + int len = 0; + const struct vclock *vclock; + replicaset_foreach(replica) { + if (replica->relay != NULL && + relay_get_state(replica->relay) != RELAY_OFF && + !replica->anon) { + assert(!tt_uuid_is_equal(&INSTANCE_UUID, + &replica->uuid)); + vclock = relay_vclock(replica->relay); + int64_t lsn = vclock_get(vclock, + former_leader_id); + lsns[len++] = lsn; + } + } + lsns[len++] = vclock_get(box_vclock, former_leader_id); + assert(len < VCLOCK_MAX); + + int64_t confirm_lsn = 0; + if (len >= replication_synchro_quorum) { + qsort(lsns, len, sizeof(int64_t), cmp_i64); + confirm_lsn = lsns[len - replication_synchro_quorum]; + } + + txn_limbo_force_empty(&txn_limbo, confirm_lsn); + assert(txn_limbo_is_empty(&txn_limbo)); + } +} + void box_listen(void) { diff --git a/src/box/box.h b/src/box/box.h index f9789154e..5c4a5ed78 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -258,6 +258,8 @@ extern "C" { typedef struct tuple box_tuple_t; +void box_clear_synchro_queue(void); + /* box_select is private and used only by FFI */ API_EXPORT int box_select(uint32_t space_id, uint32_t index_id, diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c index 85ed30c50..2017ddc18 100644 --- a/src/box/lua/ctl.c +++ b/src/box/lua/ctl.c @@ -78,11 +78,20 @@ lbox_ctl_on_schema_init(struct lua_State *L) return lbox_trigger_reset(L, 2, &on_schema_init, NULL, NULL); } +static int +lbox_ctl_clear_synchro_queue(struct lua_State *L) +{ + (void) L; + box_clear_synchro_queue(); + return 0; +} + static const struct luaL_Reg lbox_ctl_lib[] = { {"wait_ro", lbox_ctl_wait_ro}, {"wait_rw", lbox_ctl_wait_rw}, {"on_shutdown", lbox_ctl_on_shutdown}, {"on_schema_init", lbox_ctl_on_schema_init}, + {"clear_synchro_queue", 
lbox_ctl_clear_synchro_queue}, {NULL, NULL} }; diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index 44a0c7273..9603d3eb3 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -482,6 +482,32 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo) return 0; } +void +txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn) +{ + struct txn_limbo_entry *e, *last_quorum = NULL; + struct txn_limbo_entry *rollback = NULL; + rlist_foreach_entry(e, &limbo->queue, in_queue) { + if (txn_has_flag(e->txn, TXN_WAIT_ACK)) { + if (e->lsn <= confirm_lsn) { + last_quorum = e; + } else { + rollback = e; + break; + } + } + } + + if (last_quorum != NULL) { + txn_limbo_write_confirm(limbo, last_quorum->lsn); + txn_limbo_read_confirm(limbo, last_quorum->lsn); + } + if (rollback != NULL) { + txn_limbo_write_rollback(limbo, rollback->lsn); + txn_limbo_read_rollback(limbo, rollback->lsn - 1); + } +} + void txn_limbo_init(void) { diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 3abbe9e85..1c945f21f 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -237,6 +237,16 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo); int txn_limbo_wait_confirm(struct txn_limbo *limbo); +/** + * Make txn_limbo confirm all the entries with lsn less than or + * equal to the given one, and rollback all the following entries. + * The function makes txn_limbo write CONFIRM and ROLLBACK + * messages for appropriate lsns, and then process the messages + * immediately. + */ +void +txn_limbo_force_empty(struct txn_limbo *limbo, int64_t last_confirm); + void txn_limbo_init(); -- 2.24.3 (Apple Git-128) -- Serge Petrenko ^ permalink raw reply [flat|nested] 8+ messages in thread
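The scan performed by txn_limbo_force_empty() can be illustrated outside of Tarantool with a simplified model. Everything below is an assumption made for the sketch: struct entry stands in for struct txn_limbo_entry, a plain array replaces the rlist queue, and split_queue() only computes the two LSNs instead of writing CONFIRM and ROLLBACK records.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for struct txn_limbo_entry. */
struct entry {
	int64_t lsn;
	/* Models txn_has_flag(e->txn, TXN_WAIT_ACK). */
	bool wait_ack;
};

/* Result of the scan; -1 means "no such entry". */
struct split {
	int64_t confirm_lsn;
	int64_t rollback_lsn;
};

/*
 * Walk the queue in order. The last waiting entry with
 * lsn <= quorum_lsn is the confirmation point; the first waiting
 * entry above quorum_lsn is where the rollback starts.
 */
static struct split
split_queue(const struct entry *queue, size_t count, int64_t quorum_lsn)
{
	struct split res = {-1, -1};
	for (size_t i = 0; i < count; i++) {
		if (!queue[i].wait_ack)
			continue;
		if (queue[i].lsn <= quorum_lsn) {
			res.confirm_lsn = queue[i].lsn;
		} else {
			res.rollback_lsn = queue[i].lsn;
			break;
		}
	}
	return res;
}
```

With entries at LSNs 5, 7, 9 and 11 waiting for acks and a quorum LSN of 8, the sketch reports a confirmation point of 7 and a rollback starting at 9, which matches the order in which the patch writes CONFIRM and then ROLLBACK.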
* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader 2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko 2020-07-04 23:03 ` Vladislav Shpilevoy 2020-07-04 23:18 ` Vladislav Shpilevoy @ 2020-07-09 22:03 ` Leonid Vasiliev 2020-07-20 21:13 ` Vladislav Shpilevoy 2 siblings, 1 reply; 8+ messages in thread From: Leonid Vasiliev @ 2020-07-09 22:03 UTC (permalink / raw) To: Serge Petrenko, v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches Hi! Thank you for the patch. Sergey is in the army now, so my comment is addressed to Vlad. On 05.07.2020 00:55, Serge Petrenko wrote: > Introduce replication_synchro_leader option to box.cfg. > > Once an instance is promoted to leader, it makes sure that txn_limbo is > free of previous leader's transactions. In order to achieve this goal, > the instance first waits for 2 replication_synchro_timeouts so that > confirmations and rollbacks from the former leader reach it. > > If the limbo remains non-empty, the new leader starts figuring out which > transactions should be confirmed and which should be rolled back. In > order to do so the instance scans through vclocks of all the instances > that replicate from it and defines which former leader's lsn is the last > reached by replication_synchro_quorum of replicas. > > Then the instance writes appropriate CONFIRM and ROLLBACK entries. > After these actions the limbo must be empty, and the instance may > proceed with appending its own entries to the limbo. 
> > Closes #4849 > --- > src/box/box.cc | 79 ++++++++++++++++++++++++++++++++++++++++ > src/box/box.h | 1 + > src/box/lua/cfg.cc | 9 +++++ > src/box/lua/load_cfg.lua | 4 ++ > src/box/txn_limbo.c | 16 +------- > src/box/txn_limbo.h | 15 ++++++++ > 6 files changed, 110 insertions(+), 14 deletions(-) > > diff --git a/src/box/box.cc b/src/box/box.cc > index ca24b98ca..087710383 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -78,6 +78,7 @@ > #include "sequence.h" > #include "sql_stmt_cache.h" > #include "msgpack.h" > +#include "trivia/util.h" > > static char status[64] = "unknown"; > > @@ -945,6 +946,84 @@ box_set_replication_anon(void) > > } > > +void > +box_set_replication_synchro_leader(void) > +{ > + bool is_leader = cfg_geti("replication_synchro_leader"); > + /* > + * For now no actions required when an instance stops > + * being a leader. We should probably wait until txn_limbo > + * becomes empty. > + */ > + if (!is_leader) > + return; > + uint32_t former_leader_id = txn_limbo.instance_id; > + if (former_leader_id == REPLICA_ID_NIL || > + former_leader_id == instance_id) { > + return; > + } > + > + /* Wait until pending confirmations/rollbacks reach us. */ > + double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo); > + double start_tm = fiber_time(); > + while (!txn_limbo_is_empty(&txn_limbo)) { > + if (fiber_time() - start_tm > timeout) From the d988d7fb92fe1dda4b64218fb06813e93eb56ed1 commit comment: " ...use fiber_clock() instead of fiber_time() for timeouts fiber_time() reports real time, which shouldn't be used for calculating timeouts as it is affected by system time changes. Add fiber_clock() based on ev_monotonic_now(), export it to Lua, and use it instead. ". 
> + break; > + fiber_sleep(0.001); > + } > + > + if (!txn_limbo_is_empty(&txn_limbo)) { > + int64_t lsns[VCLOCK_MAX]; > + int len = 0; > + const struct vclock *vclock; > + replicaset_foreach(replica) { > + if (replica->relay != NULL && > + relay_get_state(replica->relay) != RELAY_OFF && > + !replica->anon) { > + assert(!tt_uuid_is_equal(&INSTANCE_UUID, > + &replica->uuid)); > + vclock = relay_vclock(replica->relay); > + int64_t lsn = vclock_get(vclock, > + former_leader_id); > + lsns[len++] = lsn; > + } > + } > + lsns[len++] = vclock_get(box_vclock, former_leader_id); > + assert(len < VCLOCK_MAX); > + > + int64_t confirm_lsn = 0; > + if (len >= replication_synchro_quorum) { > + qsort(lsns, len, sizeof(int64_t), cmp_i64); > + confirm_lsn = lsns[len - replication_synchro_quorum]; > + } > + > + struct txn_limbo_entry *e, *last_quorum = NULL; > + struct txn_limbo_entry *rollback = NULL; > + rlist_foreach_entry(e, &txn_limbo.queue, in_queue) { > + if (txn_has_flag(e->txn, TXN_WAIT_ACK)) { > + if (e->lsn <= confirm_lsn) { > + last_quorum = e; > + } else { > + rollback = e; > + break; > + } > + } > + } > + > + if (last_quorum != NULL) { > + confirm_lsn = last_quorum->lsn; > + txn_limbo_write_confirm(&txn_limbo, confirm_lsn); > + txn_limbo_read_confirm(&txn_limbo, confirm_lsn); > + } > + if (rollback != NULL) { > + txn_limbo_write_rollback(&txn_limbo, rollback->lsn); > + txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1); > + } > + > + assert(txn_limbo_is_empty(&txn_limbo)); > + } > +} > + > void > box_listen(void) > { > diff --git a/src/box/box.h b/src/box/box.h > index f9789154e..565b0ebce 100644 > --- a/src/box/box.h > +++ b/src/box/box.h > @@ -245,6 +245,7 @@ void box_set_replication_connect_quorum(void); > void box_set_replication_sync_lag(void); > int box_set_replication_synchro_quorum(void); > int box_set_replication_synchro_timeout(void); > +void box_set_replication_synchro_leader(void); > void box_set_replication_sync_timeout(void); > void 
box_set_replication_skip_conflict(void);
>  void box_set_replication_anon(void);
> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
> index d481155cd..adc1fcf3f 100644
> --- a/src/box/lua/cfg.cc
> +++ b/src/box/lua/cfg.cc
> @@ -329,6 +329,14 @@ lbox_cfg_set_replication_synchro_timeout(struct lua_State *L)
>  	return 0;
>  }
>
> +static int
> +lbox_cfg_set_replication_synchro_leader(struct lua_State *L)
> +{
> +	(void) L;
> +	box_set_replication_synchro_leader();
> +	return 0;
> +}
> +
>  static int
>  lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
>  {
> @@ -388,6 +396,7 @@ box_lua_cfg_init(struct lua_State *L)
>  		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
>  		{"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum},
>  		{"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout},
> +		{"cfg_set_replication_synchro_leader", lbox_cfg_set_replication_synchro_leader},
>  		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
>  		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
>  		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 107bc1582..9a968f30e 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -91,6 +91,7 @@ local default_cfg = {
>      replication_sync_timeout = 300,
>      replication_synchro_quorum = 1,
>      replication_synchro_timeout = 5,
> +    replication_synchro_leader = false,
>      replication_connect_timeout = 30,
>      replication_connect_quorum = nil, -- connect all
>      replication_skip_conflict = false,
> @@ -168,6 +169,7 @@ local template_cfg = {
>      replication_sync_timeout = 'number',
>      replication_synchro_quorum = 'number',
>      replication_synchro_timeout = 'number',
> +    replication_synchro_leader = 'boolean',
>      replication_connect_timeout = 'number',
>      replication_connect_quorum = 'number',
>      replication_skip_conflict = 'boolean',
> @@ -286,6 +288,7 @@ local dynamic_cfg = {
>      replication_sync_timeout = private.cfg_set_replication_sync_timeout,
>      replication_synchro_quorum = private.cfg_set_replication_synchro_quorum,
>      replication_synchro_timeout = private.cfg_set_replication_synchro_timeout,
> +    replication_synchro_leader = private.cfg_set_replication_synchro_leader,
>      replication_skip_conflict = private.cfg_set_replication_skip_conflict,
>      replication_anon = private.cfg_set_replication_anon,
>      instance_uuid = check_instance_uuid,
> @@ -333,6 +336,7 @@ local dynamic_cfg_order = {
>      -- the new one. This should be fixed when box.cfg is able to
>      -- apply some parameters together and atomically.
>      replication_anon = 250,
> +    replication_synchro__leader = 250,
>  }
>
>  local function sort_cfg_cb(l, r)
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 44a0c7273..992115ad1 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -148,9 +148,6 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>  	return entry->is_commit;
>  }
>
> -static int
> -txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
> -
>  int
>  txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>  {
> @@ -261,11 +258,7 @@ rollback:
>  	return -1;
>  }
>
> -/**
> - * Write a confirmation entry to WAL. After it's written all the
> - * transactions waiting for confirmation may be finished.
> - */
> -static int
> +int
>  txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
>  {
>  	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
> @@ -303,12 +296,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
>  	}
>  }
>
> -/**
> - * Write a rollback message to WAL. After it's written
> - * all the transactions following the current one and waiting
> - * for confirmation must be rolled back.
> - */
> -static int
> +int
>  txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
>  {
>  	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 3abbe9e85..5bf5827ac 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -205,6 +205,21 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>  int
>  txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>
> +/**
> + * Write a confirmation entry to WAL. After it's written all the
> + * transactions waiting for confirmation may be finished.
> + */
> +int
> +txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn);
> +
> +/**
> + * Write a rollback message to WAL. After it's written
> + * all the transactions following the current one and waiting
> + * for confirmation must be rolled back.
> + */
> +int
> +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
> +
>  /**
>   * Confirm all the entries up to the given master's LSN.
>   */
>

^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader
  2020-07-09 22:03 ` Leonid Vasiliev
@ 2020-07-20 21:13 ` Vladislav Shpilevoy
  0 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-20 21:13 UTC (permalink / raw)
  To: Leonid Vasiliev, Serge Petrenko, gorcunov, sergos; +Cc: tarantool-patches

Hi! Thanks for the review!

>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index ca24b98ca..087710383 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -78,6 +78,7 @@
>>  #include "sequence.h"
>>  #include "sql_stmt_cache.h"
>>  #include "msgpack.h"
>> +#include "trivia/util.h"
>>  static char status[64] = "unknown";
>> @@ -945,6 +946,84 @@ box_set_replication_anon(void)
>>  }
>> +void
>> +box_set_replication_synchro_leader(void)
>> +{
>> +	bool is_leader = cfg_geti("replication_synchro_leader");
>> +	/*
>> +	 * For now no actions required when an instance stops
>> +	 * being a leader. We should probably wait until txn_limbo
>> +	 * becomes empty.
>> +	 */
>> +	if (!is_leader)
>> +		return;
>> +	uint32_t former_leader_id = txn_limbo.instance_id;
>> +	if (former_leader_id == REPLICA_ID_NIL ||
>> +	    former_leader_id == instance_id) {
>> +		return;
>> +	}
>> +
>> +	/* Wait until pending confirmations/rollbacks reach us. */
>> +	double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
>> +	double start_tm = fiber_time();
>> +	while (!txn_limbo_is_empty(&txn_limbo)) {
>> +		if (fiber_time() - start_tm > timeout)
>
> From the d988d7fb92fe1dda4b64218fb06813e93eb56ed1 commit comment:
> "
> ...use fiber_clock() instead of fiber_time() for timeouts
>
> fiber_time() reports real time, which shouldn't be used for calculating
> timeouts as it is affected by system time changes. Add fiber_clock()
> based on ev_monotonic_now(), export it to Lua, and use it instead.
> ".

Indeed. I've sent a patch fixing it.

^ permalink raw reply [flat|nested] 8+ messages in thread
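To illustrate the timeout remark from the review above: a deadline computed from wall-clock time can fire early or late if the system time is adjusted during the wait, while a monotonic clock is not affected. The following is a minimal standalone sketch in plain C, not Tarantool code. It assumes POSIX clock_gettime(CLOCK_MONOTONIC) as a stand-in for fiber_clock() and nanosleep() as a stand-in for a fiber yield; the names monotonic_now(), wait_until(), countdown_done() and never_true() are hypothetical and exist only for this example.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

/*
 * Seconds from a monotonic clock. Unlike wall-clock time, this
 * value is not affected by system time changes.
 * Hypothetical stand-in for fiber_clock().
 */
static double
monotonic_now(void)
{
	struct timespec ts;
	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (double) ts.tv_sec + (double) ts.tv_nsec / 1e9;
}

/* Hypothetical predicate type: returns true once waiting may stop. */
typedef bool (*cond_f)(void *arg);

/*
 * Poll cond until it returns true or timeout seconds elapse.
 * Returns 0 if the condition was met, -1 on timeout.
 */
static int
wait_until(cond_f cond, void *arg, double timeout)
{
	assert(timeout >= 0);
	double deadline = monotonic_now() + timeout;
	while (!cond(arg)) {
		if (monotonic_now() > deadline)
			return -1;
		/* Sleep 1 ms between polls (a fiber would yield here). */
		struct timespec pause = {0, 1000000};
		nanosleep(&pause, NULL);
	}
	return 0;
}

/* Example predicate: becomes true once the counter reaches zero. */
static bool
countdown_done(void *arg)
{
	int *left = arg;
	if (*left > 0)
		--*left;
	return *left == 0;
}

/* Example predicate that is never satisfied, to force a timeout. */
static bool
never_true(void *arg)
{
	(void) arg;
	return false;
}
```

A caller would write, for instance, `int left = 3; wait_until(countdown_done, &left, 1.0);` and check the return value. If monotonic_now() were replaced by a wall-clock source, a time step during the wait would shift the deadline check, which is the failure mode the quoted commit message describes.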
Thread overview: 8+ messages
2020-07-04 21:55 [Tarantool-patches] [PATCH 0/2] introduce actions on leader election Serge Petrenko
2020-07-04 21:55 ` [Tarantool-patches] [PATCH 1/2] util: move cmp_i64 from xlog.c to util.h Serge Petrenko
2020-07-04 21:55 ` [Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become syncro leader Serge Petrenko
2020-07-04 23:03 ` Vladislav Shpilevoy
2020-07-04 23:18 ` Vladislav Shpilevoy
2020-07-05 11:09 ` Serge Petrenko
2020-07-09 22:03 ` Leonid Vasiliev
2020-07-20 21:13 ` Vladislav Shpilevoy