* [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything @ 2020-12-23 11:59 Serge Petrenko
  2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 1/6] box: add a single execution guard to clear_synchro_queue Serge Petrenko
                     ` (6 more replies)
  0 siblings, 7 replies; 18+ messages in thread
From: Serge Petrenko @ 2020-12-23 11:59 UTC (permalink / raw)
To: v.shpilevoy, gorcunov; +Cc: tarantool-patches

Changes in v2:
  - implement something similar to relay_lsn_watcher with the help of
    triggers.
  - fix the election_qsync_stress test hang. One issue with the test itself
    and one issue with the txn_limbo.
  - various minor fixes.
  - change clear_synchro_queue behaviour: wait for the quorum for
    replication_synchro_timeout. If the quorum isn't reached, roll back
    nothing and warn the user.

@ChangeLog
  - Change `box.ctl.clear_synchro_queue` behaviour. Now it tries to commit
    everything that is present on the node. In order to do so it waits for
    other instances to replicate the data until a quorum of
    `replication_synchro_quorum` instances is reached, for up to
    `replication_synchro_timeout` seconds. In case the timeout passes and
    the quorum wasn't reached, nothing is rolled back (gh-5435).

Serge Petrenko (6):
  box: add a single execution guard to clear_synchro_queue
  relay: introduce on_status_update trigger
  txn_limbo: introduce txn_limbo_last_synchro_entry method
  box: rework clear_synchro_queue to commit everything
  test: fix replication/election_qsync_stress test
  txn_limbo: ignore CONFIRM/ROLLBACK for a foreign master

 src/box/applier.cc                            |   3 +-
 src/box/box.cc                                | 170 +++++++++++++++---
 src/box/box.h                                 |   2 +-
 src/box/lua/ctl.c                             |   4 +-
 src/box/raft.c                                |   9 +-
 src/box/relay.cc                              |  12 ++
 src/box/relay.h                               |   4 +
 src/box/txn_limbo.c                           |  26 ++-
 src/box/txn_limbo.h                           |   9 +-
 test/replication/election_qsync_stress.result |   7 +-
 .../election_qsync_stress.test.lua            |   7 +-
 test/replication/election_replica.lua         |   5 +-
 ...5435-clear-synchro-queue-commit-all.result | 144 +++++++++++++++
 ...35-clear-synchro-queue-commit-all.test.lua |  65 +++++++
 test/replication/suite.cfg                    |   1 +
 15 files changed, 423 insertions(+), 45 deletions(-)
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua

-- 
2.24.3 (Apple Git-128)

^ permalink raw reply	[flat|nested] 18+ messages in thread
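A condensed sketch of the behaviour described in the changelog entry, for illustration only -- this is not code from the series. txn_limbo_last_synchro_entry() and txn_limbo_force_empty() are introduced by the patches below; wait_for_quorum_acks() is a hypothetical stand-in for the ack-collection machinery of patch 4.

static int
clear_synchro_queue_sketch(void)
{
        /* LSN of the last synchronous row left by the old leader. */
        int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
        /*
         * Wait until replication_synchro_quorum instances have the row,
         * but no longer than replication_synchro_timeout seconds.
         */
        if (wait_for_quorum_acks(wait_lsn, replication_synchro_quorum,
                                 replication_synchro_timeout) != 0)
                return -1;      /* Timed out: nothing is rolled back. */
        /* Quorum collected: confirm everything up to wait_lsn. */
        txn_limbo_force_empty(&txn_limbo, wait_lsn);
        return 0;
}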
* [Tarantool-patches] [PATCH v2 1/6] box: add a single execution guard to clear_synchro_queue 2020-12-23 11:59 [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Serge Petrenko @ 2020-12-23 11:59 ` Serge Petrenko 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 2/6] relay: introduce on_status_update trigger Serge Petrenko ` (5 subsequent siblings) 6 siblings, 0 replies; 18+ messages in thread From: Serge Petrenko @ 2020-12-23 11:59 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches Clear_synchro_queue isn't meant to be called multiple times on a single instance. Multiple simultaneous invocations of clear_synhcro_queue() shouldn't hurt now, since clear_synchro_queue simply exits on an empty limbo, but may be harmful in future, when clear_synchro_queue is reworked. Prohibit such misuse by introducing an execution guard and raising an error once duplicate invocation is detected. Prerequisite #5435 --- src/box/box.cc | 18 +++++++++++++++--- src/box/box.h | 2 +- src/box/lua/ctl.c | 4 ++-- src/box/raft.c | 9 +++++++-- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 306db7c37..2d403fc9a 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1002,15 +1002,24 @@ box_set_replication_anon(void) } -void +int box_clear_synchro_queue(bool try_wait) { + /* A guard to block multiple simultaneous function invocations. */ + static bool in_clear_synchro_queue = false; + if (in_clear_synchro_queue) { + diag_set(ClientError, ER_UNSUPPORTED, "clear_synchro_queue", + "simultaneous invocations"); + return -1; + } if (!is_box_configured || txn_limbo_is_empty(&txn_limbo)) - return; + return 0; uint32_t former_leader_id = txn_limbo.owner_id; assert(former_leader_id != REPLICA_ID_NIL); if (former_leader_id == instance_id) - return; + return 0; + + in_clear_synchro_queue = true; if (try_wait) { /* Wait until pending confirmations/rollbacks reach us. */ @@ -1051,6 +1060,9 @@ box_clear_synchro_queue(bool try_wait) txn_limbo_force_empty(&txn_limbo, confirm_lsn); assert(txn_limbo_is_empty(&txn_limbo)); } + + in_clear_synchro_queue = false; + return 0; } void diff --git a/src/box/box.h b/src/box/box.h index b47a220b7..95336688b 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -266,7 +266,7 @@ extern "C" { typedef struct tuple box_tuple_t; -void +int box_clear_synchro_queue(bool try_wait); /* box_select is private and used only by FFI */ diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c index bb4841994..b44474b12 100644 --- a/src/box/lua/ctl.c +++ b/src/box/lua/ctl.c @@ -83,8 +83,8 @@ lbox_ctl_on_schema_init(struct lua_State *L) static int lbox_ctl_clear_synchro_queue(struct lua_State *L) { - (void) L; - box_clear_synchro_queue(true); + if (box_clear_synchro_queue(true) != 0) + return luaT_error(L); return 0; } diff --git a/src/box/raft.c b/src/box/raft.c index 8f32a9662..1942df952 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -89,9 +89,14 @@ box_raft_update_synchro_queue(struct raft *raft) * If the node became a leader, it means it will ignore all records from * all the other nodes, and won't get late CONFIRM messages anyway. Can * clear the queue without waiting for confirmations. + * It's alright that the user may have called clear_synchro_queue + * manually. In this case the call below will exit immediately and we'll + * simply log a warning. 
*/ - if (raft->state == RAFT_STATE_LEADER) - box_clear_synchro_queue(false); + if (raft->state == RAFT_STATE_LEADER) { + if (box_clear_synchro_queue(false) != 0) + diag_log(); + } } static int -- 2.24.3 (Apple Git-128) ^ permalink raw reply [flat|nested] 18+ messages in thread
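For illustration, a sketch of what the guard gives two concurrent callers. The fiber functions are hypothetical; box_clear_synchro_queue() and diag_log() are the ones from the patch. Whether the second call actually hits the guard depends on the first one yielding inside its wait loop.

static int
first_caller_f(va_list ap)
{
        (void)ap;
        /*
         * Sets in_clear_synchro_queue = true and may yield in
         * fiber_sleep() while waiting for the limbo to empty.
         */
        return box_clear_synchro_queue(true);
}

static int
second_caller_f(va_list ap)
{
        (void)ap;
        /*
         * Runs while the first caller is still inside the function:
         * the guard is set, so this gets -1 and ER_UNSUPPORTED
         * ("simultaneous invocations") in the diag.
         */
        if (box_clear_synchro_queue(true) != 0)
                diag_log();
        return 0;
}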
* [Tarantool-patches] [PATCH v2 2/6] relay: introduce on_status_update trigger 2020-12-23 11:59 [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Serge Petrenko 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 1/6] box: add a single execution guard to clear_synchro_queue Serge Petrenko @ 2020-12-23 11:59 ` Serge Petrenko 2020-12-23 17:25 ` Vladislav Shpilevoy 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 3/6] txn_limbo: introduce txn_limbo_last_synchro_entry method Serge Petrenko ` (4 subsequent siblings) 6 siblings, 1 reply; 18+ messages in thread From: Serge Petrenko @ 2020-12-23 11:59 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches The trigger is fired from tx_status_update(), which is the notification for tx that relay's vclock is updated. The trigger will be used to collect synchronous transactions quorum for old leader's transactions. Part of #5435 --- src/box/relay.cc | 12 ++++++++++++ src/box/relay.h | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/src/box/relay.cc b/src/box/relay.cc index e16ac5a6b..dbad8a680 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -146,6 +146,8 @@ struct relay { alignas(CACHELINE_SIZE) /** Known relay vclock. */ struct vclock vclock; + /** Triggers fired once tx knows the new relay vclock. */ + struct rlist on_status_update; /** * True if the relay needs Raft updates. It can live fine * without sending Raft updates, if it is a relay to an @@ -173,6 +175,12 @@ relay_vclock(const struct relay *relay) return &relay->tx.vclock; } +void +relay_on_status_update(struct relay *relay, struct trigger *trigger) +{ + trigger_add(&relay->tx.on_status_update, trigger); +} + double relay_last_row_time(const struct relay *relay) { @@ -201,6 +209,7 @@ relay_new(struct replica *replica) diag_create(&relay->diag); stailq_create(&relay->pending_gc); relay->state = RELAY_OFF; + rlist_create(&relay->tx.on_status_update); return relay; } @@ -423,6 +432,9 @@ tx_status_update(struct cmsg *msg) txn_limbo_ack(&txn_limbo, status->relay->replica->id, vclock_get(&status->vclock, instance_id)); } + + trigger_run(&status->relay->tx.on_status_update, status->relay); + static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; diff --git a/src/box/relay.h b/src/box/relay.h index b32e2ea2a..4e6ecaffb 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -93,6 +93,10 @@ relay_vclock(const struct relay *relay); double relay_last_row_time(const struct relay *relay); +/** Set a trigger on relay vclock update as seen by tx. */ +void +relay_on_status_update(struct relay *relay, struct trigger *trigger); + /** * Send a Raft update request to the relay channel. It is not * guaranteed that it will be delivered. The connection may break. -- 2.24.3 (Apple Git-128) ^ permalink raw reply [flat|nested] 18+ messages in thread
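For illustration, a minimal consumer of the new API. The callback and its logging are hypothetical; relay_on_status_update(), relay_vclock() and the trigger calls are existing ones used elsewhere in the series.

static int
on_relay_status_f(struct trigger *trigger, void *event)
{
        (void)trigger;
        struct relay *relay = (struct relay *)event;
        /* Runs in tx once tx_status_update() refreshes the relay vclock. */
        say_info("relay acked up to lsn %lld",
                 (long long)vclock_get(relay_vclock(relay), instance_id));
        return 0;
}

static struct trigger on_status;

static void
watch_relay(struct relay *relay)
{
        trigger_create(&on_status, on_relay_status_f, NULL, NULL);
        relay_on_status_update(relay, &on_status);
}

Patch 4 of the series registers a very similar trigger per relay to count acks for the old leader's transactions.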
* Re: [Tarantool-patches] [PATCH v2 2/6] relay: introduce on_status_update trigger
  2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 2/6] relay: introduce on_status_update trigger Serge Petrenko
@ 2020-12-23 17:25 ` Vladislav Shpilevoy
  2020-12-24 16:11   ` Serge Petrenko
  0 siblings, 1 reply; 18+ messages in thread
From: Vladislav Shpilevoy @ 2020-12-23 17:25 UTC (permalink / raw)
To: Serge Petrenko, gorcunov; +Cc: tarantool-patches

Thanks for the patch!

On 23.12.2020 12:59, Serge Petrenko via Tarantool-patches wrote:
> The trigger is fired from tx_status_update(), which is the notification
> for tx that relay's vclock is updated.
>
> The trigger will be used to collect synchronous transactions quorum for
> old leader's transactions.

I meant the trigger could be global, not in relay. The main point of the
proposal was not to touch the relays at all. It would make things even
simpler. I tried to implement it on your branch. See comments in the next
emails, and 1 comment below, in case my proposal doesn't work for some
reason.

> Part of #5435
> ---
>  src/box/relay.cc | 12 ++++++++++++
>  src/box/relay.h  |  4 ++++
>  2 files changed, 16 insertions(+)
>
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index e16ac5a6b..dbad8a680 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -201,6 +209,7 @@ relay_new(struct replica *replica)
>  	diag_create(&relay->diag);
>  	stailq_create(&relay->pending_gc);
>  	relay->state = RELAY_OFF;
> +	rlist_create(&relay->tx.on_status_update);

You also need to add trigger_destroy() to relay_stop/relay_delete.

>  	return relay;
>  }

^ permalink raw reply	[flat|nested] 18+ messages in thread
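A sketch of the addition suggested above, assuming the cleanup simply sits next to the other per-relay teardown in relay_stop()/relay_delete(); the helper name is made up.

static void
relay_clear_status_triggers(struct relay *relay)
{
        /*
         * Drop everything still attached to the relay before it goes
         * away, so a waiter's trigger can't fire on a dead relay.
         */
        trigger_destroy(&relay->tx.on_status_update);
}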
* Re: [Tarantool-patches] [PATCH v2 2/6] relay: introduce on_status_update trigger 2020-12-23 17:25 ` Vladislav Shpilevoy @ 2020-12-24 16:11 ` Serge Petrenko 0 siblings, 0 replies; 18+ messages in thread From: Serge Petrenko @ 2020-12-24 16:11 UTC (permalink / raw) To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches 23.12.2020 20:25, Vladislav Shpilevoy пишет: > Thanks for the patch! > > On 23.12.2020 12:59, Serge Petrenko via Tarantool-patches wrote: >> The trigger is fired from tx_status_update(), which is the notification >> for tx that relay's vclock is updated. >> >> The trigger will be used to collect synchronous transactions quorum for >> old leader's transactions. > I meant the trigger could be global, not in relay. The main point of the > proposal was not to touch relays almost anyhow. It would make things even > simpler. I tried to implement it on your branch. See comments in the next > emails, and 1 comment below, in case my proposal won't work somewhy. Thanks for the review & for your suggestion! Sorry I didn't understand your proposal at first. It looks good. I've split your commit and amended this patch and the patch "rework clear_synchro_queue". >> Part of #5435 >> --- >> src/box/relay.cc | 12 ++++++++++++ >> src/box/relay.h | 4 ++++ >> 2 files changed, 16 insertions(+) >> >> diff --git a/src/box/relay.cc b/src/box/relay.cc >> index e16ac5a6b..dbad8a680 100644 >> --- a/src/box/relay.cc >> +++ b/src/box/relay.cc >> @@ -201,6 +209,7 @@ relay_new(struct replica *replica) >> diag_create(&relay->diag); >> stailq_create(&relay->pending_gc); >> relay->state = RELAY_OFF; >> + rlist_create(&relay->tx.on_status_update); > You also need to add trigger_destroy() to relay_stop/relay_delete. > >> return relay; >> } -- Serge Petrenko ^ permalink raw reply [flat|nested] 18+ messages in thread
* [Tarantool-patches] [PATCH v2 3/6] txn_limbo: introduce txn_limbo_last_synchro_entry method 2020-12-23 11:59 [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Serge Petrenko 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 1/6] box: add a single execution guard to clear_synchro_queue Serge Petrenko 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 2/6] relay: introduce on_status_update trigger Serge Petrenko @ 2020-12-23 11:59 ` Serge Petrenko 2020-12-23 17:25 ` Vladislav Shpilevoy 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything Serge Petrenko ` (3 subsequent siblings) 6 siblings, 1 reply; 18+ messages in thread From: Serge Petrenko @ 2020-12-23 11:59 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches It'll be useful for box_clear_synchro_queue rework. Prerequisite #5435 --- src/box/txn_limbo.c | 12 ++++++++++++ src/box/txn_limbo.h | 7 +++++++ 2 files changed, 19 insertions(+) diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index c406ed4c8..9272f5227 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -55,6 +55,18 @@ txn_limbo_is_ro(struct txn_limbo *limbo) return limbo->owner_id != instance_id && !txn_limbo_is_empty(limbo); } +struct txn_limbo_entry * +txn_limbo_last_synchro_entry(struct txn_limbo *limbo) +{ + struct txn_limbo_entry *entry = NULL; + rlist_foreach_entry_reverse(entry, &limbo->queue, in_queue) { + if (txn_has_flag(entry->txn, TXN_WAIT_ACK)) + break; + } + assert(entry == NULL || txn_has_flag(entry->txn, TXN_WAIT_ACK)); + return entry; +} + struct txn_limbo_entry * txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn) { diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index f7d67a826..a49356c14 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -189,6 +189,13 @@ txn_limbo_last_entry(struct txn_limbo *limbo) in_queue); } +/** + * Return the last synchronous transaction in the limbo or NULL when it is + * empty. + */ +struct txn_limbo_entry * +txn_limbo_last_synchro_entry(struct txn_limbo *limbo); + /** * Allocate, create, and append a new transaction to the limbo. * The limbo entry is allocated on the transaction's region. -- 2.24.3 (Apple Git-128) ^ permalink raw reply [flat|nested] 18+ messages in thread
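For illustration, how the helper is meant to be used by the clear_synchro_queue() rework later in the series; the wrapper itself is hypothetical.

static int64_t
last_synchro_lsn_sketch(void)
{
        struct txn_limbo_entry *entry =
                txn_limbo_last_synchro_entry(&txn_limbo);
        if (entry == NULL)
                return -1;      /* No synchronous rows waiting for ACKs. */
        /*
         * On a non-owner the rows came from the applier, so the LSN is
         * already assigned even if the local WAL write hasn't finished.
         */
        return entry->lsn;
}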
* Re: [Tarantool-patches] [PATCH v2 3/6] txn_limbo: introduce txn_limbo_last_synchro_entry method 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 3/6] txn_limbo: introduce txn_limbo_last_synchro_entry method Serge Petrenko @ 2020-12-23 17:25 ` Vladislav Shpilevoy 2020-12-24 16:13 ` Serge Petrenko 0 siblings, 1 reply; 18+ messages in thread From: Vladislav Shpilevoy @ 2020-12-23 17:25 UTC (permalink / raw) To: Serge Petrenko, gorcunov; +Cc: tarantool-patches Thanks for the patch! On 23.12.2020 12:59, Serge Petrenko via Tarantool-patches wrote: > It'll be useful for box_clear_synchro_queue rework. > > Prerequisite #5435 > --- > src/box/txn_limbo.c | 12 ++++++++++++ > src/box/txn_limbo.h | 7 +++++++ > 2 files changed, 19 insertions(+) > > diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c > index c406ed4c8..9272f5227 100644 > --- a/src/box/txn_limbo.c > +++ b/src/box/txn_limbo.c > @@ -55,6 +55,18 @@ txn_limbo_is_ro(struct txn_limbo *limbo) > return limbo->owner_id != instance_id && !txn_limbo_is_empty(limbo); > } > > +struct txn_limbo_entry * > +txn_limbo_last_synchro_entry(struct txn_limbo *limbo) > +{ > + struct txn_limbo_entry *entry = NULL; > + rlist_foreach_entry_reverse(entry, &limbo->queue, in_queue) { > + if (txn_has_flag(entry->txn, TXN_WAIT_ACK)) > + break; > + } > + assert(entry == NULL || txn_has_flag(entry->txn, TXN_WAIT_ACK)); > + return entry; > +} This could be a little simpler, but up to you: ==================== @@ -58,13 +58,12 @@ txn_limbo_is_ro(struct txn_limbo *limbo) struct txn_limbo_entry * txn_limbo_last_synchro_entry(struct txn_limbo *limbo) { - struct txn_limbo_entry *entry = NULL; + struct txn_limbo_entry *entry; rlist_foreach_entry_reverse(entry, &limbo->queue, in_queue) { if (txn_has_flag(entry->txn, TXN_WAIT_ACK)) - break; + return entry; } - assert(entry == NULL || txn_has_flag(entry->txn, TXN_WAIT_ACK)); - return entry; + return NULL; } struct txn_limbo_entry * ^ permalink raw reply [flat|nested] 18+ messages in thread
* Re: [Tarantool-patches] [PATCH v2 3/6] txn_limbo: introduce txn_limbo_last_synchro_entry method 2020-12-23 17:25 ` Vladislav Shpilevoy @ 2020-12-24 16:13 ` Serge Petrenko 0 siblings, 0 replies; 18+ messages in thread From: Serge Petrenko @ 2020-12-24 16:13 UTC (permalink / raw) To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches 23.12.2020 20:25, Vladislav Shpilevoy пишет: > Thanks for the patch! > > On 23.12.2020 12:59, Serge Petrenko via Tarantool-patches wrote: >> It'll be useful for box_clear_synchro_queue rework. >> >> Prerequisite #5435 >> --- >> src/box/txn_limbo.c | 12 ++++++++++++ >> src/box/txn_limbo.h | 7 +++++++ >> 2 files changed, 19 insertions(+) >> >> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c >> index c406ed4c8..9272f5227 100644 >> --- a/src/box/txn_limbo.c >> +++ b/src/box/txn_limbo.c >> @@ -55,6 +55,18 @@ txn_limbo_is_ro(struct txn_limbo *limbo) >> return limbo->owner_id != instance_id && !txn_limbo_is_empty(limbo); >> } >> >> +struct txn_limbo_entry * >> +txn_limbo_last_synchro_entry(struct txn_limbo *limbo) >> +{ >> + struct txn_limbo_entry *entry = NULL; >> + rlist_foreach_entry_reverse(entry, &limbo->queue, in_queue) { >> + if (txn_has_flag(entry->txn, TXN_WAIT_ACK)) >> + break; >> + } >> + assert(entry == NULL || txn_has_flag(entry->txn, TXN_WAIT_ACK)); >> + return entry; >> +} > This could be a little simpler, but up to you: > > ==================== > @@ -58,13 +58,12 @@ txn_limbo_is_ro(struct txn_limbo *limbo) > struct txn_limbo_entry * > txn_limbo_last_synchro_entry(struct txn_limbo *limbo) > { > - struct txn_limbo_entry *entry = NULL; > + struct txn_limbo_entry *entry; > rlist_foreach_entry_reverse(entry, &limbo->queue, in_queue) { > if (txn_has_flag(entry->txn, TXN_WAIT_ACK)) > - break; > + return entry; > } > - assert(entry == NULL || txn_has_flag(entry->txn, TXN_WAIT_ACK)); > - return entry; > + return NULL; > } > > struct txn_limbo_entry * Nice. Applied. -- Serge Petrenko ^ permalink raw reply [flat|nested] 18+ messages in thread
* [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything 2020-12-23 11:59 [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Serge Petrenko ` (2 preceding siblings ...) 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 3/6] txn_limbo: introduce txn_limbo_last_synchro_entry method Serge Petrenko @ 2020-12-23 11:59 ` Serge Petrenko 2020-12-23 17:28 ` Vladislav Shpilevoy 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 5/6] test: fix replication/election_qsync_stress test Serge Petrenko ` (2 subsequent siblings) 6 siblings, 1 reply; 18+ messages in thread From: Serge Petrenko @ 2020-12-23 11:59 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches It is possible that a new leader (elected either via raft or manually or via some user-written election algorithm) loses the data that the old leader has successfully committed and confirmed. Imagine such a situation: there are N nodes in a replicaset, the old leader, denoted A, tries to apply some synchronous transaction. It is written on the leader itself and N/2 other nodes, one of which is B. The transaction has thus gathered quorum, N/2 + 1 acks. Now A writes CONFIRM and commits the transaction, but dies before the confirmation reaches any of its followers. B is elected the new leader and it sees that the last A's transaction is present on N/2 nodes, so it doesn't have a quorum (A was one of the N/2 + 1). Current `clear_synchro_queue()` implementation makes B roll the transaction back, leading to rollback after commit, which is unacceptable. To fix the problem, make `clear_synchro_queue()` wait until all the rows from the previous leader gather `replication_synchro_quorum` acks. In case the quorum wasn't achieved during replication_synchro_timeout, rollback nothing and wait for user's intervention. Closes #5435 --- src/box/box.cc | 149 +++++++++++++++--- test/replication/election_replica.lua | 5 +- ...5435-clear-synchro-queue-commit-all.result | 144 +++++++++++++++++ ...35-clear-synchro-queue-commit-all.test.lua | 65 ++++++++ test/replication/suite.cfg | 1 + 5 files changed, 339 insertions(+), 25 deletions(-) create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua diff --git a/src/box/box.cc b/src/box/box.cc index 2d403fc9a..38bf4034e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1002,6 +1002,36 @@ box_set_replication_anon(void) } +struct ack_trigger_data { + bool fired; + int64_t *target_lsn; + uint32_t *replica_id; + int *quorum; + int *ack_count; + struct fiber *waiter; +}; + +struct ack_trigger { + struct ack_trigger_data data; + struct trigger trigger; +}; + +static int ack_trigger_f(struct trigger *trigger, void *event) +{ + struct relay *relay = (struct relay *)event; + struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data; + if (data->fired) + return 0; + if (*data->target_lsn <= vclock_get(relay_vclock(relay), + *data->replica_id)) { + ++*data->ack_count; + data->fired = true; + if (*data->ack_count >= *data->quorum) + fiber_wakeup(data->waiter); + } + return 0; +} + int box_clear_synchro_queue(bool try_wait) { @@ -1012,6 +1042,10 @@ box_clear_synchro_queue(bool try_wait) "simultaneous invocations"); return -1; } + /* + * XXX: we may want to write confirm + rollback even when the limbo is + * empty for the sake of limbo ownership transition. 
+ */ if (!is_box_configured || txn_limbo_is_empty(&txn_limbo)) return 0; uint32_t former_leader_id = txn_limbo.owner_id; @@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait) break; fiber_sleep(0.001); } + /* + * Our mission was to clear the limbo from former leader's + * transactions. Exit in case someone did that for us. + */ + if (txn_limbo_is_empty(&txn_limbo) || + former_leader_id != txn_limbo.owner_id) { + in_clear_synchro_queue = false; + return 0; + } } - if (!txn_limbo_is_empty(&txn_limbo)) { - int64_t lsns[VCLOCK_MAX]; - int len = 0; - const struct vclock *vclock; - replicaset_foreach(replica) { - if (replica->relay != NULL && - relay_get_state(replica->relay) != RELAY_OFF && - !replica->anon) { - assert(!tt_uuid_is_equal(&INSTANCE_UUID, - &replica->uuid)); - vclock = relay_vclock(replica->relay); - int64_t lsn = vclock_get(vclock, - former_leader_id); - lsns[len++] = lsn; - } - } - lsns[len++] = vclock_get(box_vclock, former_leader_id); - assert(len < VCLOCK_MAX); + /* + * clear_synchro_queue() is a no-op on the limbo owner, so all the rows + * in the limbo must've come through the applier meaning they already + * have an lsn assigned, even if their WAL write hasn't finished yet. + */ + int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn; + assert(wait_lsn > 0); + + struct ack_trigger triggers[VCLOCK_MAX]; + + /* Take this node into account immediately. */ + int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn; + int trigger_count = 0; + + replicaset_foreach(replica) { + if (relay_get_state(replica->relay) != RELAY_FOLLOW || + replica->anon) + continue; + + assert(replica->id != REPLICA_ID_NIL); + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)); - int64_t confirm_lsn = 0; - if (len >= replication_synchro_quorum) { - qsort(lsns, len, sizeof(int64_t), cmp_i64); - confirm_lsn = lsns[len - replication_synchro_quorum]; + if (vclock_get(relay_vclock(replica->relay), + former_leader_id) >= wait_lsn) { + ack_count++; + continue; } + int i = trigger_count++; + triggers[i].data = { + .fired = false, + .target_lsn = &wait_lsn, + .replica_id = &former_leader_id, + .quorum = &replication_synchro_quorum, + .ack_count = &ack_count, + .waiter = fiber(), + }; + trigger_create(&triggers[i].trigger, ack_trigger_f, + &triggers[i].data, NULL); + relay_on_status_update(replica->relay, &triggers[i].trigger); + } + + assert(trigger_count <= VCLOCK_MAX); + + if (ack_count + trigger_count < replication_synchro_quorum) { + /* Don't even bother waiting when not enough replicas. */ + say_warn("clear_synchro_queue cannot gather quorum. " + "There're only %d replicas (including this one), while" + "quorum should be %d.", ack_count + trigger_count, + replication_synchro_quorum); + for (int i = 0; i < trigger_count; i++) + trigger_clear(&triggers[i].trigger); + goto end; + } + + if (trigger_count > 0) { + /* Allow to interrupt the function when it takes too long. */ + bool cancellable = fiber_set_cancellable(true); + fiber_sleep(replication_synchro_timeout); + fiber_set_cancellable(cancellable); + } + + for (int i = 0; i < trigger_count; i++) + trigger_clear(&triggers[i].trigger); + + /* + * No point to proceed after cancellation even if got the quorum. + * Emptying the limbo involves a pair of blocking WAL writes, + * making the fiber sleep even longer, which isn't appropriate + * when it's cancelled. 
+ */ + if (fiber_is_cancelled()) { + say_info("clear_synchro_queue interrupted by the fiber " + "cancellation."); + goto end; + } - txn_limbo_force_empty(&txn_limbo, confirm_lsn); - assert(txn_limbo_is_empty(&txn_limbo)); + if (ack_count < replication_synchro_quorum) { + say_warn("clear_synchro_queue timed out after %.2f " + "seconds. Collected %d acks, quorum is %d. ", + replication_synchro_timeout, ack_count, + replication_synchro_quorum); + goto end; } + txn_limbo_force_empty(&txn_limbo, wait_lsn); + assert(txn_limbo_is_empty(&txn_limbo)); +end: in_clear_synchro_queue = false; return 0; } diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua index db037ed67..3b4d9a123 100644 --- a/test/replication/election_replica.lua +++ b/test/replication/election_replica.lua @@ -4,6 +4,8 @@ local INSTANCE_ID = string.match(arg[0], "%d") local SOCKET_DIR = require('fio').cwd() local SYNCHRO_QUORUM = arg[1] and tonumber(arg[1]) or 3 local ELECTION_TIMEOUT = arg[2] and tonumber(arg[2]) or 0.1 +local ELECTION_MODE = arg[3] or 'candidate' +local CONNECT_QUORUM = arg[4] and tonumber(arg[4]) or 3 local function instance_uri(instance_id) return SOCKET_DIR..'/election_replica'..instance_id..'.sock'; @@ -19,7 +21,8 @@ box.cfg({ instance_uri(3), }, replication_timeout = 0.1, - election_mode = 'candidate', + replication_connect_quorum = CONNECT_QUORUM, + election_mode = ELECTION_MODE, election_timeout = ELECTION_TIMEOUT, replication_synchro_quorum = SYNCHRO_QUORUM, replication_synchro_timeout = 0.1, diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result new file mode 100644 index 000000000..e806d9d53 --- /dev/null +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result @@ -0,0 +1,144 @@ +-- test-run result file version 2 +test_run = require('test_run').new() + | --- + | ... + +-- +-- gh-5435: make sure the new limbo owner commits everything there is left in +-- the limbo from an old owner. +-- + +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'} + | --- + | ... + +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'}) + | --- + | ... +test_run:wait_fullmesh(SERVERS) + | --- + | ... + +-- Force election_replica1 to become leader. +test_run:switch('election_replica2') + | --- + | - true + | ... +box.cfg{election_mode='voter'} + | --- + | ... +test_run:switch('election_replica3') + | --- + | - true + | ... +box.cfg{election_mode='voter'} + | --- + | ... + +test_run:switch('election_replica1') + | --- + | - true + | ... +box.ctl.wait_rw() + | --- + | ... + +_ = box.schema.space.create('test', {is_sync=true}) + | --- + | ... +_ = box.space.test:create_index('pk') + | --- + | ... + +-- Fill the limbo with pending entries. 3 mustn't receive them yet. +test_run:cmd('stop server election_replica3') + | --- + | - true + | ... +box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000} + | --- + | ... + +lsn = box.info.lsn + | --- + | ... + +for i=1,10 do\ + require('fiber').create(function() box.space.test:insert{i} end)\ +end + | --- + | ... + +-- Wait for WAL write and replication. +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end) + | --- + | - true + | ... +test_run:wait_lsn('election_replica2', 'election_replica1') + | --- + | ... + +test_run:cmd('switch election_replica2') + | --- + | - true + | ... + +test_run:cmd('stop server election_replica1') + | --- + | - true + | ... 
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it. +-- It will vote for 2, however, since 2 has newer data, and start replication +-- once 2 becomes the leader. +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"') + | --- + | - true + | ... + +-- Set a huge timeout for 2 reasons. +-- First, this guards us from the intance leaving clear_synchro_queue too early +-- and confirming nothing. +-- Second, it lets us test that the instance doesn't wait for the full timeout. +box.cfg{replication_synchro_timeout=1000} + | --- + | ... +box.cfg{election_mode='candidate'} + | --- + | ... +box.ctl.wait_rw() + | --- + | ... + +-- If 2 decided whether to keep the rows or not right on becoming the leader, +-- it would roll them all back. Make sure 2 waits till the rows are replicated +-- to 3. +box.space.test:select{} + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | - [5] + | - [6] + | - [7] + | - [8] + | - [9] + | - [10] + | ... + +test_run:cmd('switch default') + | --- + | - true + | ... +-- To silence the QA warning. The 1st replica is already stopped. +SERVERS[1] = nil + | --- + | ... +test_run:cmd('delete server election_replica1') + | --- + | - true + | ... +test_run:drop_cluster(SERVERS) + | --- + | ... + diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua new file mode 100644 index 000000000..da218624b --- /dev/null +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua @@ -0,0 +1,65 @@ +test_run = require('test_run').new() + +-- +-- gh-5435: make sure the new limbo owner commits everything there is left in +-- the limbo from an old owner. +-- + +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'} + +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'}) +test_run:wait_fullmesh(SERVERS) + +-- Force election_replica1 to become leader. +test_run:switch('election_replica2') +box.cfg{election_mode='voter'} +test_run:switch('election_replica3') +box.cfg{election_mode='voter'} + +test_run:switch('election_replica1') +box.ctl.wait_rw() + +_ = box.schema.space.create('test', {is_sync=true}) +_ = box.space.test:create_index('pk') + +-- Fill the limbo with pending entries. 3 mustn't receive them yet. +test_run:cmd('stop server election_replica3') +box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000} + +lsn = box.info.lsn + +for i=1,10 do\ + require('fiber').create(function() box.space.test:insert{i} end)\ +end + +-- Wait for WAL write and replication. +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end) +test_run:wait_lsn('election_replica2', 'election_replica1') + +test_run:cmd('switch election_replica2') + +test_run:cmd('stop server election_replica1') +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it. +-- It will vote for 2, however, since 2 has newer data, and start replication +-- once 2 becomes the leader. +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"') + +-- Set a huge timeout for 2 reasons. +-- First, this guards us from the intance leaving clear_synchro_queue too early +-- and confirming nothing. +-- Second, it lets us test that the instance doesn't wait for the full timeout. 
+box.cfg{replication_synchro_timeout=1000} +box.cfg{election_mode='candidate'} +box.ctl.wait_rw() + +-- If 2 decided whether to keep the rows or not right on becoming the leader, +-- it would roll them all back. Make sure 2 waits till the rows are replicated +-- to 3. +box.space.test:select{} + +test_run:cmd('switch default') +-- To silence the QA warning. The 1st replica is already stopped. +SERVERS[1] = nil +test_run:cmd('delete server election_replica1') +test_run:drop_cluster(SERVERS) + diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index 5670acc4d..8fe3930db 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -36,6 +36,7 @@ "gh-4730-applier-rollback.test.lua": {}, "gh-4928-tx-boundaries.test.lua": {}, "gh-5440-qsync-ro.test.lua": {}, + "gh-5435-clear-synchro-queue-commit-all.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} -- 2.24.3 (Apple Git-128) ^ permalink raw reply [flat|nested] 18+ messages in thread
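For illustration, the per-node check the patch applies to every vclock, pulled out into a hypothetical helper:

static bool
node_has_row(const struct vclock *node_vclock, uint32_t former_leader_id,
             int64_t wait_lsn)
{
        /*
         * A node has acked the row once its view of the former leader's
         * vclock component reaches the row's LSN.
         */
        return vclock_get(node_vclock, former_leader_id) >= wait_lsn;
}

The patch feeds box_vclock for the local node and relay_vclock(replica->relay) for every follower into this check, and confirms the queue only when the number of passing nodes reaches replication_synchro_quorum within replication_synchro_timeout.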
* Re: [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything Serge Petrenko @ 2020-12-23 17:28 ` Vladislav Shpilevoy 2020-12-24 16:12 ` Serge Petrenko 0 siblings, 1 reply; 18+ messages in thread From: Vladislav Shpilevoy @ 2020-12-23 17:28 UTC (permalink / raw) To: Serge Petrenko, gorcunov; +Cc: tarantool-patches Thanks for the patch! See 9 comments below, my diff in the end of the email, and in the branch as a separate commit. > diff --git a/src/box/box.cc b/src/box/box.cc > index 2d403fc9a..38bf4034e 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -1002,6 +1002,36 @@ box_set_replication_anon(void) > > } > > +struct ack_trigger_data { > + bool fired; 1. This member can be deleted, if you would call trigger_clear(trigger) instead of setting the flag. > + int64_t *target_lsn; > + uint32_t *replica_id; > + int *quorum; > + int *ack_count; 2. Most of these members can be stored by value, see my diff. Even if the idea in my diff will look bad, still some minor details could be taken. > + struct fiber *waiter; > +}; > + > +struct ack_trigger { > + struct ack_trigger_data data; > + struct trigger trigger; 3. You can merge data right into the trigger object. > +}; > + > +static int ack_trigger_f(struct trigger *trigger, void *event) 4. Normally we put return type on a separate line. > +{ > + struct relay *relay = (struct relay *)event; > + struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data; > + if (data->fired) > + return 0; > + if (*data->target_lsn <= vclock_get(relay_vclock(relay), > + *data->replica_id)) { > + ++*data->ack_count; > + data->fired = true; > + if (*data->ack_count >= *data->quorum) > + fiber_wakeup(data->waiter); > + } > + return 0; > +} > @@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait) > break; > fiber_sleep(0.001); > } > + /* > + * Our mission was to clear the limbo from former leader's > + * transactions. Exit in case someone did that for us. > + */ > + if (txn_limbo_is_empty(&txn_limbo) || > + former_leader_id != txn_limbo.owner_id) { > + in_clear_synchro_queue = false; > + return 0; > + } > } > > - if (!txn_limbo_is_empty(&txn_limbo)) { > - int64_t lsns[VCLOCK_MAX]; > - int len = 0; > - const struct vclock *vclock; > - replicaset_foreach(replica) { > - if (replica->relay != NULL && > - relay_get_state(replica->relay) != RELAY_OFF && > - !replica->anon) { > - assert(!tt_uuid_is_equal(&INSTANCE_UUID, > - &replica->uuid)); > - vclock = relay_vclock(replica->relay); > - int64_t lsn = vclock_get(vclock, > - former_leader_id); > - lsns[len++] = lsn; > - } > - } > - lsns[len++] = vclock_get(box_vclock, former_leader_id); > - assert(len < VCLOCK_MAX); > + /* > + * clear_synchro_queue() is a no-op on the limbo owner, so all the rows > + * in the limbo must've come through the applier meaning they already > + * have an lsn assigned, even if their WAL write hasn't finished yet. > + */ > + int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn; > + assert(wait_lsn > 0); > + > + struct ack_trigger triggers[VCLOCK_MAX]; > + > + /* Take this node into account immediately. 
*/ > + int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn; > + int trigger_count = 0; > + > + replicaset_foreach(replica) { > + if (relay_get_state(replica->relay) != RELAY_FOLLOW || > + replica->anon) > + continue; > + > + assert(replica->id != REPLICA_ID_NIL); > + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)); > > - int64_t confirm_lsn = 0; > - if (len >= replication_synchro_quorum) { > - qsort(lsns, len, sizeof(int64_t), cmp_i64); > - confirm_lsn = lsns[len - replication_synchro_quorum]; > + if (vclock_get(relay_vclock(replica->relay), > + former_leader_id) >= wait_lsn) { > + ack_count++; > + continue; > } > + int i = trigger_count++; > + triggers[i].data = { > + .fired = false, > + .target_lsn = &wait_lsn, > + .replica_id = &former_leader_id, > + .quorum = &replication_synchro_quorum, > + .ack_count = &ack_count, > + .waiter = fiber(), > + }; > + trigger_create(&triggers[i].trigger, ack_trigger_f, > + &triggers[i].data, NULL); > + relay_on_status_update(replica->relay, &triggers[i].trigger); > + } > + > + assert(trigger_count <= VCLOCK_MAX); > + > + if (ack_count + trigger_count < replication_synchro_quorum) { > + /* Don't even bother waiting when not enough replicas. */ > + say_warn("clear_synchro_queue cannot gather quorum. " > + "There're only %d replicas (including this one), while" > + "quorum should be %d.", ack_count + trigger_count, > + replication_synchro_quorum); > + for (int i = 0; i < trigger_count; i++) > + trigger_clear(&triggers[i].trigger); > + goto end; 5. Better wait anyway. Because more replicas may appear soon and confirm everything. During the timeout wait. Although it is probably not possible if we bind triggers to individual relays. I tried to fix it with a global trigger in my diff. > + } > + > + if (trigger_count > 0) { > + /* Allow to interrupt the function when it takes too long. */ > + bool cancellable = fiber_set_cancellable(true); > + fiber_sleep(replication_synchro_timeout); > + fiber_set_cancellable(cancellable); 6. I think a sleep is enough. If a user made the fiber non-cancellable, better leave it as is. We only change it to 'false', and only when do some operation when a wakeup is not legal, such as WAL write. > + } > + > + for (int i = 0; i < trigger_count; i++) > + trigger_clear(&triggers[i].trigger); > + > + /* > + * No point to proceed after cancellation even if got the quorum. > + * Emptying the limbo involves a pair of blocking WAL writes, > + * making the fiber sleep even longer, which isn't appropriate > + * when it's cancelled. > + */ > + if (fiber_is_cancelled()) { > + say_info("clear_synchro_queue interrupted by the fiber " > + "cancellation."); > + goto end; > + } > > - txn_limbo_force_empty(&txn_limbo, confirm_lsn); > - assert(txn_limbo_is_empty(&txn_limbo)); > + if (ack_count < replication_synchro_quorum) { > + say_warn("clear_synchro_queue timed out after %.2f " > + "seconds. Collected %d acks, quorum is %d. ", > + replication_synchro_timeout, ack_count, > + replication_synchro_quorum); > + goto end; 7. Why don't you return an error? The queue couldn't be cleared, so it looks like an error, no? I added diag_set() in my diff, but didn't change it to return -1 so far. 
> } > > + txn_limbo_force_empty(&txn_limbo, wait_lsn); > + assert(txn_limbo_is_empty(&txn_limbo)); > +end: > in_clear_synchro_queue = false; > return 0; > } > diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result > new file mode 100644 > index 000000000..e806d9d53 > --- /dev/null > +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result 8. Perhaps better rename to gh-5435-qsync-.... . Because now it is very useful that I can run all qsync tests with a small command `python test-run.py qsync`. > diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua > new file mode 100644 > index 000000000..da218624b > --- /dev/null > +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua > @@ -0,0 +1,65 @@ > +test_run = require('test_run').new() > + > +-- > +-- gh-5435: make sure the new limbo owner commits everything there is left in > +-- the limbo from an old owner. > +-- > + > +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'} > + > +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'}) > +test_run:wait_fullmesh(SERVERS) > + > +-- Force election_replica1 to become leader. > +test_run:switch('election_replica2') > +box.cfg{election_mode='voter'} > +test_run:switch('election_replica3') > +box.cfg{election_mode='voter'} > + > +test_run:switch('election_replica1') > +box.ctl.wait_rw() > + > +_ = box.schema.space.create('test', {is_sync=true}) > +_ = box.space.test:create_index('pk') > + > +-- Fill the limbo with pending entries. 3 mustn't receive them yet. > +test_run:cmd('stop server election_replica3') > +box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000} > + > +lsn = box.info.lsn > + > +for i=1,10 do\ > + require('fiber').create(function() box.space.test:insert{i} end)\ > +end > + > +-- Wait for WAL write and replication. > +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end) > +test_run:wait_lsn('election_replica2', 'election_replica1') > + > +test_run:cmd('switch election_replica2') > + > +test_run:cmd('stop server election_replica1') > +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it. > +-- It will vote for 2, however, since 2 has newer data, and start replication > +-- once 2 becomes the leader. > +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"') > + > +-- Set a huge timeout for 2 reasons. > +-- First, this guards us from the intance leaving clear_synchro_queue too early 9. intance -> instance. See my changes below and on the branch in a separate commit. ==================== diff --git a/src/box/box.cc b/src/box/box.cc index 38bf4034e..90c07c342 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1002,32 +1002,102 @@ box_set_replication_anon(void) } -struct ack_trigger_data { - bool fired; - int64_t *target_lsn; - uint32_t *replica_id; - int *quorum; - int *ack_count; +/** Trigger to catch ACKs from all nodes when need to wait for quorum. */ +struct box_quorum_trigger { + /** Inherit trigger. */ + struct trigger base; + /** Minimal number of nodes who should confirm the target LSN. */ + int quorum; + /** Target LSN to wait for. */ + int64_t target_lsn; + /** Replica ID whose LSN is being waited. */ + uint32_t replica_id; + /** + * All versions of the given replica's LSN as seen by other nodes. The + * same as in the txn limbo. 
+ */ + struct vclock vclock; + /** Number of nodes who confirmed the LSN. */ + int ack_count; + /** Fiber to wakeup when quorum is reached. */ struct fiber *waiter; }; -struct ack_trigger { - struct ack_trigger_data data; - struct trigger trigger; -}; - -static int ack_trigger_f(struct trigger *trigger, void *event) +static int +box_quorum_on_ack_f(struct trigger *trigger, void *event) { - struct relay *relay = (struct relay *)event; - struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data; - if (data->fired) + struct replication_ack *ack = (struct replication_ack *)event; + struct box_quorum_trigger *t = (struct box_quorum_trigger *)trigger; + int64_t new_lsn = vclock_get(ack->vclock, t->replica_id); + int64_t old_lsn = vclock_get(&t->vclock, ack->source); + if (new_lsn < t->target_lsn || old_lsn >= t->target_lsn) return 0; - if (*data->target_lsn <= vclock_get(relay_vclock(relay), - *data->replica_id)) { - ++*data->ack_count; - data->fired = true; - if (*data->ack_count >= *data->quorum) - fiber_wakeup(data->waiter); + + vclock_follow(&t->vclock, ack->source, new_lsn); + ++t->ack_count; + if (t->ack_count >= t->quorum) + fiber_wakeup(t->waiter); + trigger_clear(trigger); + return 0; +} + +/** + * Wait until at least @a quorum of nodes confirm @a target_lsn from the node + * with id @a lead_id. + */ +static int +box_wait_quorum(uint32_t lead_id, int64_t target_lsn, int quorum, + double timeout) +{ + struct box_quorum_trigger t; + memset(&t, 0, sizeof(t)); + vclock_create(&t.vclock); + + /* Take this node into account immediately. */ + int ack_count = vclock_get(box_vclock, lead_id) >= target_lsn; + replicaset_foreach(replica) { + if (relay_get_state(replica->relay) != RELAY_FOLLOW || + replica->anon) + continue; + + assert(replica->id != REPLICA_ID_NIL); + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)); + + int64_t lsn = vclock_get(relay_vclock(replica->relay), lead_id); + vclock_follow(&t.vclock, replica->id, lsn); + if (lsn >= target_lsn) { + ack_count++; + continue; + } + } + if (ack_count < quorum) { + t.quorum = quorum; + t.target_lsn = target_lsn; + t.replica_id = lead_id; + t.ack_count = ack_count; + t.waiter = fiber(); + trigger_create(&t.base, box_quorum_on_ack_f, NULL, NULL); + trigger_add(&replicaset.on_ack, &t.base); + fiber_sleep(timeout); + trigger_clear(&t.base); + ack_count = t.ack_count; + } + /* + * No point to proceed after cancellation even if got the quorum. The + * quorum is waited by limbo clear function. Emptying the limbo involves + * a pair of blocking WAL writes, making the fiber sleep even longer, + * which isn't appropriate when it's canceled. + */ + if (fiber_is_cancelled()) { + diag_set(ClientError, ER_QUORUM_WAIT, quorum, + "fiber is canceled"); + return -1; + } + if (ack_count < quorum) { + diag_set(ClientError, ER_QUORUM_WAIT, quorum, tt_sprintf( + "timeout after %.2lf seconds, collected %d acks with " + "%d quorum", timeout, ack_count, quorum)); + return -1; } return 0; } @@ -1083,85 +1153,14 @@ box_clear_synchro_queue(bool try_wait) int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn; assert(wait_lsn > 0); - struct ack_trigger triggers[VCLOCK_MAX]; - - /* Take this node into account immediately. 
*/ - int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn; - int trigger_count = 0; - - replicaset_foreach(replica) { - if (relay_get_state(replica->relay) != RELAY_FOLLOW || - replica->anon) - continue; - - assert(replica->id != REPLICA_ID_NIL); - assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)); - - if (vclock_get(relay_vclock(replica->relay), - former_leader_id) >= wait_lsn) { - ack_count++; - continue; - } - int i = trigger_count++; - triggers[i].data = { - .fired = false, - .target_lsn = &wait_lsn, - .replica_id = &former_leader_id, - .quorum = &replication_synchro_quorum, - .ack_count = &ack_count, - .waiter = fiber(), - }; - trigger_create(&triggers[i].trigger, ack_trigger_f, - &triggers[i].data, NULL); - relay_on_status_update(replica->relay, &triggers[i].trigger); - } - - assert(trigger_count <= VCLOCK_MAX); - - if (ack_count + trigger_count < replication_synchro_quorum) { - /* Don't even bother waiting when not enough replicas. */ - say_warn("clear_synchro_queue cannot gather quorum. " - "There're only %d replicas (including this one), while" - "quorum should be %d.", ack_count + trigger_count, - replication_synchro_quorum); - for (int i = 0; i < trigger_count; i++) - trigger_clear(&triggers[i].trigger); - goto end; - } - - if (trigger_count > 0) { - /* Allow to interrupt the function when it takes too long. */ - bool cancellable = fiber_set_cancellable(true); - fiber_sleep(replication_synchro_timeout); - fiber_set_cancellable(cancellable); - } - - for (int i = 0; i < trigger_count; i++) - trigger_clear(&triggers[i].trigger); - - /* - * No point to proceed after cancellation even if got the quorum. - * Emptying the limbo involves a pair of blocking WAL writes, - * making the fiber sleep even longer, which isn't appropriate - * when it's cancelled. - */ - if (fiber_is_cancelled()) { - say_info("clear_synchro_queue interrupted by the fiber " - "cancellation."); - goto end; - } - - if (ack_count < replication_synchro_quorum) { - say_warn("clear_synchro_queue timed out after %.2f " - "seconds. Collected %d acks, quorum is %d. ", - replication_synchro_timeout, ack_count, - replication_synchro_quorum); - goto end; + if (box_wait_quorum(former_leader_id, wait_lsn, + replication_synchro_quorum, + replication_synchro_timeout) == 0) { + txn_limbo_force_empty(&txn_limbo, wait_lsn); + assert(txn_limbo_is_empty(&txn_limbo)); + } else { + diag_log(); } - - txn_limbo_force_empty(&txn_limbo, wait_lsn); - assert(txn_limbo_is_empty(&txn_limbo)); -end: in_clear_synchro_queue = false; return 0; } diff --git a/src/box/errcode.h b/src/box/errcode.h index cfea5bcf2..56573688e 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -274,6 +274,7 @@ struct errcode_record { /*219 */_(ER_XLOG_GAP, "%s") \ /*220 */_(ER_TOO_EARLY_SUBSCRIBE, "Can't subscribe non-anonymous replica %s until join is done") \ /*221 */_(ER_SQL_CANT_ADD_AUTOINC, "Can't add AUTOINCREMENT: space %s can't feature more than one AUTOINCREMENT field") \ + /*222 */_(ER_QUORUM_WAIT, "Couldn't wait for quorum %d: %s") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/relay.cc b/src/box/relay.cc index dbad8a680..df04f8198 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -146,8 +146,6 @@ struct relay { alignas(CACHELINE_SIZE) /** Known relay vclock. */ struct vclock vclock; - /** Triggers fired once tx knows the new relay vclock. */ - struct rlist on_status_update; /** * True if the relay needs Raft updates. 
It can live fine * without sending Raft updates, if it is a relay to an @@ -175,12 +173,6 @@ relay_vclock(const struct relay *relay) return &relay->tx.vclock; } -void -relay_on_status_update(struct relay *relay, struct trigger *trigger) -{ - trigger_add(&relay->tx.on_status_update, trigger); -} - double relay_last_row_time(const struct relay *relay) { @@ -209,7 +201,6 @@ relay_new(struct replica *replica) diag_create(&relay->diag); stailq_create(&relay->pending_gc); relay->state = RELAY_OFF; - rlist_create(&relay->tx.on_status_update); return relay; } @@ -421,6 +412,9 @@ tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; vclock_copy(&status->relay->tx.vclock, &status->vclock); + struct replication_ack ack; + ack.source = status->relay->replica->id; + ack.vclock = &status->vclock; /* * Let pending synchronous transactions know, which of * them were successfully sent to the replica. Acks are @@ -429,11 +423,10 @@ tx_status_update(struct cmsg *msg) * for master's CONFIRM message instead. */ if (txn_limbo.owner_id == instance_id) { - txn_limbo_ack(&txn_limbo, status->relay->replica->id, - vclock_get(&status->vclock, instance_id)); + txn_limbo_ack(&txn_limbo, ack.source, + vclock_get(ack.vclock, instance_id)); } - - trigger_run(&status->relay->tx.on_status_update, status->relay); + trigger_run(&replicaset.on_ack, &ack); static const struct cmsg_hop route[] = { {relay_status_update, NULL} diff --git a/src/box/relay.h b/src/box/relay.h index 4e6ecaffb..b32e2ea2a 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -93,10 +93,6 @@ relay_vclock(const struct relay *relay); double relay_last_row_time(const struct relay *relay); -/** Set a trigger on relay vclock update as seen by tx. */ -void -relay_on_status_update(struct relay *relay, struct trigger *trigger); - /** * Send a Raft update request to the relay channel. It is not * guaranteed that it will be delivered. The connection may break. diff --git a/src/box/replication.cc b/src/box/replication.cc index 931c73a37..980109597 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -98,6 +98,8 @@ replication_init(void) rlist_create(&replicaset.applier.on_rollback); rlist_create(&replicaset.applier.on_wal_write); + rlist_create(&replicaset.on_ack); + diag_create(&replicaset.applier.diag); } @@ -113,6 +115,7 @@ replication_free(void) relay_cancel(replica->relay); diag_destroy(&replicaset.applier.diag); + trigger_destroy(&replicaset.on_ack); } int diff --git a/src/box/replication.h b/src/box/replication.h index e57912848..2ad1cbf66 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -190,6 +190,14 @@ extern struct tt_uuid REPLICASET_UUID; typedef rb_tree(struct replica) replica_hash_t; +/** Ack which is passed to on ack triggers. */ +struct replication_ack { + /** Replica ID of the ACK source. */ + uint32_t source; + /** Confirmed vclock. */ + const struct vclock *vclock; +}; + /** * Replica set state. * @@ -271,6 +279,8 @@ struct replicaset { /* Shared applier diagnostic area. */ struct diag diag; } applier; + /** Triggers are invoked on each ACK from each replica. */ + struct rlist on_ack; /** Map of all known replica_id's to correspponding replica's. 
*/ struct replica *replica_by_id[VCLOCK_MAX]; }; diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result index e806d9d53..2699231e5 100644 --- a/test/replication/gh-5435-clear-synchro-queue-commit-all.result +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result @@ -96,7 +96,7 @@ test_run:cmd('start server election_replica3 with wait=False, wait_load=False, a | ... -- Set a huge timeout for 2 reasons. --- First, this guards us from the intance leaving clear_synchro_queue too early +-- First, this guards us from the instance leaving clear_synchro_queue too early -- and confirming nothing. -- Second, it lets us test that the instance doesn't wait for the full timeout. box.cfg{replication_synchro_timeout=1000} diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua index da218624b..03705d96c 100644 --- a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua @@ -45,7 +45,7 @@ test_run:cmd('stop server election_replica1') test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"') -- Set a huge timeout for 2 reasons. --- First, this guards us from the intance leaving clear_synchro_queue too early +-- First, this guards us from the instance leaving clear_synchro_queue too early -- and confirming nothing. -- Second, it lets us test that the instance doesn't wait for the full timeout. box.cfg{replication_synchro_timeout=1000} ^ permalink raw reply [flat|nested] 18+ messages in thread
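For illustration, what another consumer of the proposed global replicaset.on_ack trigger could look like. The callback is hypothetical; struct replication_ack and the trigger list are the ones introduced in the diff above.

static int
log_acks_f(struct trigger *trigger, void *event)
{
        (void)trigger;
        struct replication_ack *ack = (struct replication_ack *)event;
        say_info("replica %u acked lsn %lld of this instance",
                 (unsigned)ack->source,
                 (long long)vclock_get(ack->vclock, instance_id));
        return 0;
}

static struct trigger log_acks;

static void
subscribe_to_acks(void)
{
        trigger_create(&log_acks, log_acks_f, NULL, NULL);
        trigger_add(&replicaset.on_ack, &log_acks);
}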
* Re: [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything 2020-12-23 17:28 ` Vladislav Shpilevoy @ 2020-12-24 16:12 ` Serge Petrenko 2020-12-24 17:35 ` Vladislav Shpilevoy 0 siblings, 1 reply; 18+ messages in thread From: Serge Petrenko @ 2020-12-24 16:12 UTC (permalink / raw) To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches 23.12.2020 20:28, Vladislav Shpilevoy пишет: > Thanks for the patch! > > See 9 comments below, my diff in the end of the email, and in > the branch as a separate commit. Thanks for your review & the patch! I've applied your diff with a couple of changes. Please find my answers and diff below. >> diff --git a/src/box/box.cc b/src/box/box.cc >> index 2d403fc9a..38bf4034e 100644 >> --- a/src/box/box.cc >> +++ b/src/box/box.cc >> @@ -1002,6 +1002,36 @@ box_set_replication_anon(void) >> >> } >> >> +struct ack_trigger_data { >> + bool fired; > 1. This member can be deleted, if you would call > trigger_clear(trigger) instead of setting the flag. True. >> + int64_t *target_lsn; >> + uint32_t *replica_id; >> + int *quorum; >> + int *ack_count; > 2. Most of these members can be stored by value, see my diff. > Even if the idea in my diff will look bad, still some minor > details could be taken. I store quorum by reference so that the trigger gets the new value once quorumis changed. This would work when the quorum is increased, at least. Speaking of other values, I thought it might be useful to change them on the fly one day, even though this isn't used now. Anyway, let's just retry when quorum is changed. >> + struct fiber *waiter; >> +}; >> + >> +struct ack_trigger { >> + struct ack_trigger_data data; >> + struct trigger trigger; > 3. You can merge data right into the trigger object. > >> +}; >> + >> +static int ack_trigger_f(struct trigger *trigger, void *event) > 4. Normally we put return type on a separate line. Sure. My bad. >> +{ >> + struct relay *relay = (struct relay *)event; >> + struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data; >> + if (data->fired) >> + return 0; >> + if (*data->target_lsn <= vclock_get(relay_vclock(relay), >> + *data->replica_id)) { >> + ++*data->ack_count; >> + data->fired = true; >> + if (*data->ack_count >= *data->quorum) >> + fiber_wakeup(data->waiter); >> + } >> + return 0; >> +} >> @@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait) >> break; >> fiber_sleep(0.001); >> } >> + /* >> + * Our mission was to clear the limbo from former leader's >> + * transactions. Exit in case someone did that for us. >> + */ >> + if (txn_limbo_is_empty(&txn_limbo) || >> + former_leader_id != txn_limbo.owner_id) { >> + in_clear_synchro_queue = false; >> + return 0; >> + } >> } >> >> - if (!txn_limbo_is_empty(&txn_limbo)) { >> - int64_t lsns[VCLOCK_MAX]; >> - int len = 0; >> - const struct vclock *vclock; >> - replicaset_foreach(replica) { >> - if (replica->relay != NULL && >> - relay_get_state(replica->relay) != RELAY_OFF && >> - !replica->anon) { >> - assert(!tt_uuid_is_equal(&INSTANCE_UUID, >> - &replica->uuid)); >> - vclock = relay_vclock(replica->relay); >> - int64_t lsn = vclock_get(vclock, >> - former_leader_id); >> - lsns[len++] = lsn; >> - } >> - } >> - lsns[len++] = vclock_get(box_vclock, former_leader_id); >> - assert(len < VCLOCK_MAX); >> + /* >> + * clear_synchro_queue() is a no-op on the limbo owner, so all the rows >> + * in the limbo must've come through the applier meaning they already >> + * have an lsn assigned, even if their WAL write hasn't finished yet. 
>> + */ >> + int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn; >> + assert(wait_lsn > 0); >> + >> + struct ack_trigger triggers[VCLOCK_MAX]; >> + >> + /* Take this node into account immediately. */ >> + int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn; >> + int trigger_count = 0; >> + >> + replicaset_foreach(replica) { >> + if (relay_get_state(replica->relay) != RELAY_FOLLOW || >> + replica->anon) >> + continue; >> + >> + assert(replica->id != REPLICA_ID_NIL); >> + assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)); >> >> - int64_t confirm_lsn = 0; >> - if (len >= replication_synchro_quorum) { >> - qsort(lsns, len, sizeof(int64_t), cmp_i64); >> - confirm_lsn = lsns[len - replication_synchro_quorum]; >> + if (vclock_get(relay_vclock(replica->relay), >> + former_leader_id) >= wait_lsn) { >> + ack_count++; >> + continue; >> } >> + int i = trigger_count++; >> + triggers[i].data = { >> + .fired = false, >> + .target_lsn = &wait_lsn, >> + .replica_id = &former_leader_id, >> + .quorum = &replication_synchro_quorum, >> + .ack_count = &ack_count, >> + .waiter = fiber(), >> + }; >> + trigger_create(&triggers[i].trigger, ack_trigger_f, >> + &triggers[i].data, NULL); >> + relay_on_status_update(replica->relay, &triggers[i].trigger); >> + } >> + >> + assert(trigger_count <= VCLOCK_MAX); >> + >> + if (ack_count + trigger_count < replication_synchro_quorum) { >> + /* Don't even bother waiting when not enough replicas. */ >> + say_warn("clear_synchro_queue cannot gather quorum. " >> + "There're only %d replicas (including this one), while" >> + "quorum should be %d.", ack_count + trigger_count, >> + replication_synchro_quorum); >> + for (int i = 0; i < trigger_count; i++) >> + trigger_clear(&triggers[i].trigger); >> + goto end; > 5. Better wait anyway. Because more replicas may appear soon and confirm > everything. During the timeout wait. Although it is probably not possible > if we bind triggers to individual relays. I tried to fix it with a global > trigger in my diff. Ok, your variant looks good. >> + } >> + >> + if (trigger_count > 0) { >> + /* Allow to interrupt the function when it takes too long. */ >> + bool cancellable = fiber_set_cancellable(true); >> + fiber_sleep(replication_synchro_timeout); >> + fiber_set_cancellable(cancellable); > 6. I think a sleep is enough. If a user made the fiber non-cancellable, better > leave it as is. We only change it to 'false', and only when do some operation > when a wakeup is not legal, such as WAL write. Okay. >> + } >> + >> + for (int i = 0; i < trigger_count; i++) >> + trigger_clear(&triggers[i].trigger); >> + >> + /* >> + * No point to proceed after cancellation even if got the quorum. >> + * Emptying the limbo involves a pair of blocking WAL writes, >> + * making the fiber sleep even longer, which isn't appropriate >> + * when it's cancelled. >> + */ >> + if (fiber_is_cancelled()) { >> + say_info("clear_synchro_queue interrupted by the fiber " >> + "cancellation."); >> + goto end; >> + } >> >> - txn_limbo_force_empty(&txn_limbo, confirm_lsn); >> - assert(txn_limbo_is_empty(&txn_limbo)); >> + if (ack_count < replication_synchro_quorum) { >> + say_warn("clear_synchro_queue timed out after %.2f " >> + "seconds. Collected %d acks, quorum is %d. ", >> + replication_synchro_timeout, ack_count, >> + replication_synchro_quorum); >> + goto end; > 7. Why don't you return an error? The queue couldn't be cleared, so it > looks like an error, no? > > I added diag_set() in my diff, but didn't change it to return -1 so far. 
Ok, let's return an error then. >> } >> >> + txn_limbo_force_empty(&txn_limbo, wait_lsn); >> + assert(txn_limbo_is_empty(&txn_limbo)); >> +end: >> in_clear_synchro_queue = false; >> return 0; >> } >> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result >> new file mode 100644 >> index 000000000..e806d9d53 >> --- /dev/null >> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result > 8. Perhaps better rename to gh-5435-qsync-.... . Because now it is very > useful that I can run all qsync tests with a small command > `python test-run.py qsync`. No problem. done. My diff's below: =============================================== diff --git a/src/box/box.cc b/src/box/box.cc index 90c07c342..22e3057f8 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1035,9 +1035,10 @@ box_quorum_on_ack_f(struct trigger *trigger, void *event) vclock_follow(&t->vclock, ack->source, new_lsn); ++t->ack_count; - if (t->ack_count >= t->quorum) + if (t->ack_count >= t->quorum) { fiber_wakeup(t->waiter); - trigger_clear(trigger); + trigger_clear(trigger); + } return 0; } @@ -1153,16 +1154,25 @@ box_clear_synchro_queue(bool try_wait) int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn; assert(wait_lsn > 0); - if (box_wait_quorum(former_leader_id, wait_lsn, - replication_synchro_quorum, - replication_synchro_timeout) == 0) { - txn_limbo_force_empty(&txn_limbo, wait_lsn); - assert(txn_limbo_is_empty(&txn_limbo)); - } else { - diag_log(); + int quorum = replication_synchro_quorum; + int rc = box_wait_quorum(former_leader_id, wait_lsn, quorum, + replication_synchro_timeout); + if (rc == 0) { + if (quorum < replication_synchro_quorum) { + diag_set(ClientError, ER_QUORUM_WAIT, quorum, + "quorum was increased while waiting"); + rc = -1; + } else if (wait_lsn < txn_limbo_last_synchro_entry(&txn_limbo)->lsn) { + diag_set(ClientError, ER_QUORUM_WAIT, quorum, + "new synchronous transactions appeared"); + rc = -1; + } else { + txn_limbo_force_empty(&txn_limbo, wait_lsn); + assert(txn_limbo_is_empty(&txn_limbo)); + } } in_clear_synchro_queue = false; - return 0; + return rc; } void diff --git a/src/box/raft.c b/src/box/raft.c index 1942df952..634740570 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -94,8 +94,17 @@ box_raft_update_synchro_queue(struct raft *raft) * simply log a warning. */ if (raft->state == RAFT_STATE_LEADER) { - if (box_clear_synchro_queue(false) != 0) - diag_log(); + int rc = 0; + uint32_t errcode = 0; + do { + rc = box_clear_synchro_queue(false); + if (rc) { + struct error *err = diag_last_error(diag_get()); + errcode = box_error_code(err); + diag_log(); + } + } while (rc != 0 && errcode == ER_QUORUM_WAIT && + !fiber_is_cancelled()); } } diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.result similarity index 100% rename from test/replication/gh-5435-clear-synchro-queue-commit-all.result rename to test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.result diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.test.lua similarity index 100% rename from test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua rename to test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.test.lua -- Serge Petrenko ^ permalink raw reply [flat|nested] 18+ messages in thread
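Viewed from the Lua side, the behaviour settled in the exchange above means `box.ctl.clear_synchro_queue()` raises an error instead of silently giving up when the quorum is not collected in time. The following is only a usage sketch, not part of the patch: the configuration values and the error text are illustrative, and the retry policy shown is just one reasonable choice.
====================
-- Assumed setup: this node has just become leader and the limbo still
-- holds the old leader's synchronous transactions.
box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 30}

local log = require('log')
local ok, err = pcall(box.ctl.clear_synchro_queue)
if not ok then
    -- E.g. the quorum wait timed out: nothing has been rolled back and
    -- the limbo is left as is, so the call can simply be retried later.
    log.warn('clear_synchro_queue failed: ' .. tostring(err))
end
====================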
* Re: [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything 2020-12-24 16:12 ` Serge Petrenko @ 2020-12-24 17:35 ` Vladislav Shpilevoy 2020-12-24 21:02 ` Serge Petrenko 0 siblings, 1 reply; 18+ messages in thread From: Vladislav Shpilevoy @ 2020-12-24 17:35 UTC (permalink / raw) To: Serge Petrenko, gorcunov; +Cc: tarantool-patches I've force pushed this diff: ==================== @@ -98,7 +98,7 @@ box_raft_update_synchro_queue(struct raft *raft) uint32_t errcode = 0; do { rc = box_clear_synchro_queue(false); - if (rc) { + if (rc != 0) { struct error *err = diag_last_error(diag_get()); errcode = box_error_code(err); diag_log(); ==================== The patchset looks good, but the test hangs if I run it a lot of times in parallel. I tried 132 times and after some number of runs all the workers hung. Couldn't find the reason right away. ^ permalink raw reply [flat|nested] 18+ messages in thread
* Re: [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything 2020-12-24 17:35 ` Vladislav Shpilevoy @ 2020-12-24 21:02 ` Serge Petrenko 0 siblings, 0 replies; 18+ messages in thread From: Serge Petrenko @ 2020-12-24 21:02 UTC (permalink / raw) To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches 24.12.2020 20:35, Vladislav Shpilevoy пишет: > I've force pushed this diff: > > ==================== > @@ -98,7 +98,7 @@ box_raft_update_synchro_queue(struct raft *raft) > uint32_t errcode = 0; > do { > rc = box_clear_synchro_queue(false); > - if (rc) { > + if (rc != 0) { > struct error *err = diag_last_error(diag_get()); > errcode = box_error_code(err); > diag_log(); > ==================== > > The patchset looks good, but the test hangs if I run it a lot of times > in parallel. I tried 132 times and after some number of runs all the > workers hung. > > Couldn't find the reason right away. I could reproduce the issue like this: `./test-run.py $(yes replication/gh-5435-qsync-clear-synchro-queue-commit-all.test.lua | head -n 512) -j 32` Looks like I've found the issue. Sometimes the replica doesn't receive the is_leader notification from the new leader. So it ignores everything that the leader sends it and never sends out acks. I suppose it happens when the instance is just started and subscribes to the candidate. You see, box_process_subscribe sends out raft state unconditionally, and in our case it sends out 2's vote request to 3. 3 responds immediately, and 2 becomes leader, but relay isn't started yet, or is started but didn't have time to set is_raft_enabled to true, so 3 never gets 2's is_leader notification. In other words it's a race between 2 becoming a leader and trying to broadcast its new state and 2's relay becoming ready to handle raft broadcasts. I couldn't find a way to ameliorate this in the test. Can we push it as is then? Two tiny fixes force pushed (not related to the test hang): =================================================== diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index 8fe3930db..c80430afc 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -36,7 +36,7 @@ "gh-4730-applier-rollback.test.lua": {}, "gh-4928-tx-boundaries.test.lua": {}, "gh-5440-qsync-ro.test.lua": {}, - "gh-5435-clear-synchro-queue-commit-all.test.lua": {}, + "gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} diff --git a/src/box/box.cc b/src/box/box.cc index 28146a747..e1d8305c8 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1228,8 +1228,8 @@ box_wait_quorum(uint32_t lead_id, int64_t target_lsn, int quorum, } if (ack_count < quorum) { diag_set(ClientError, ER_QUORUM_WAIT, quorum, tt_sprintf( - "timeout after %.2lf seconds, collected %d acks with " - "%d quorum", timeout, ack_count, quorum)); + "timeout after %.2lf seconds, collected %d acks", + timeout, ack_count, quorum)); return -1; } return 0; -- Serge Petrenko ^ permalink raw reply [flat|nested] 18+ messages in thread
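To make the race just described easier to observe: on the replica that stops acking, the lost broadcast means box.info.election never starts pointing at the new leader. Below is a sketch of the kind of check that exposes the symptom from the test harness; it assumes Tarantool 2.6's box.info.election and test-run's eval/wait_cond helpers, and it is only a diagnostic aid, not a fix for the race.
====================
-- Run against the stuck voter (election_replica3 in the reproducer
-- above).  In the racy case this wait never succeeds, because the new
-- leader's is_leader notification was lost.
test_run:wait_cond(function()
    local leader = test_run:eval('election_replica3',
                                 'return box.info.election.leader')[1]
    return leader ~= nil and leader ~= 0
end, 10)
====================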
* [Tarantool-patches] [PATCH v2 5/6] test: fix replication/election_qsync_stress test 2020-12-23 11:59 [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Serge Petrenko ` (3 preceding siblings ...) 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything Serge Petrenko @ 2020-12-23 11:59 ` Serge Petrenko 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 6/6] txn_limbo: ignore CONFIRM/ROLLBACK for a foreign master Serge Petrenko 2020-12-25 10:04 ` [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Kirill Yukhin 6 siblings, 0 replies; 18+ messages in thread From: Serge Petrenko @ 2020-12-23 11:59 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches The test involves writing synchronous transactions on one node and making other nodes confirm these transactions after its death. In order for the test to work properly we need to make sure the old node replicates all its transactions to peers before killing it. Otherwise once the node is resurrected it'll have newer data, not present on other nodes, which leads to their vclocks being incompatible and noone becoming the new leader and hanging the test. Follow-up #5435 --- test/replication/election_qsync_stress.result | 7 ++++++- test/replication/election_qsync_stress.test.lua | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/test/replication/election_qsync_stress.result b/test/replication/election_qsync_stress.result index 1380c910b..2646995ee 100644 --- a/test/replication/election_qsync_stress.result +++ b/test/replication/election_qsync_stress.result @@ -88,8 +88,13 @@ test_run:cmd('setopt delimiter ";"') | ... for i = 1,10 do c:eval('box.cfg{replication_synchro_quorum=4, replication_synchro_timeout=1000}') + lsn = c:eval('return box.info.lsn') c.space.test:insert({i}, {is_async=true}) - test_run:wait_cond(function() return c.space.test:get{i} ~= nil end) + test_run:wait_cond(function() return c:eval('return box.info.lsn') > lsn end) + r1_nr = old_leader_nr % 3 + 1 + r2_nr = (old_leader_nr + 1) % 3 + 1 + test_run:wait_lsn('election_replica'..r1_nr, 'election_replica'..old_leader_nr) + test_run:wait_lsn('election_replica'..r2_nr, 'election_replica'..old_leader_nr) test_run:cmd('stop server '..old_leader) nrs[old_leader_nr] = false new_leader_nr = get_leader(nrs) diff --git a/test/replication/election_qsync_stress.test.lua b/test/replication/election_qsync_stress.test.lua index d70601cc5..5e5905f23 100644 --- a/test/replication/election_qsync_stress.test.lua +++ b/test/replication/election_qsync_stress.test.lua @@ -50,8 +50,13 @@ _ = c:eval('box.space.test:create_index("pk")') test_run:cmd('setopt delimiter ";"') for i = 1,10 do c:eval('box.cfg{replication_synchro_quorum=4, replication_synchro_timeout=1000}') + lsn = c:eval('return box.info.lsn') c.space.test:insert({i}, {is_async=true}) - test_run:wait_cond(function() return c.space.test:get{i} ~= nil end) + test_run:wait_cond(function() return c:eval('return box.info.lsn') > lsn end) + r1_nr = old_leader_nr % 3 + 1 + r2_nr = (old_leader_nr + 1) % 3 + 1 + test_run:wait_lsn('election_replica'..r1_nr, 'election_replica'..old_leader_nr) + test_run:wait_lsn('election_replica'..r2_nr, 'election_replica'..old_leader_nr) test_run:cmd('stop server '..old_leader) nrs[old_leader_nr] = false new_leader_nr = get_leader(nrs) -- 2.24.3 (Apple Git-128) ^ permalink raw reply [flat|nested] 18+ messages in thread
* [Tarantool-patches] [PATCH v2 6/6] txn_limbo: ignore CONFIRM/ROLLBACK for a foreign master 2020-12-23 11:59 [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Serge Petrenko ` (4 preceding siblings ...) 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 5/6] test: fix replication/election_qsync_stress test Serge Petrenko @ 2020-12-23 11:59 ` Serge Petrenko 2020-12-23 17:28 ` Vladislav Shpilevoy 2020-12-25 10:04 ` [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Kirill Yukhin 6 siblings, 1 reply; 18+ messages in thread From: Serge Petrenko @ 2020-12-23 11:59 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches We designed limbo so that it errors on receiving a CONFIRM or ROLLBACK for other instance's data. Actually, this error is pointless, and even harmful. Here's why: Imagine you have 3 instances, 1, 2 and 3. First 1 writes some synchronous transactions, but dies before writing CONFIRM. Now 2 has to write CONFIRM instead of 1 to take limbo ownership. From now on 2 is the limbo owner and in case of high enough load it constantly has some data in the limbo. Once 1 restarts, it first recovers its xlogs, and fills its limbo with its own unconfirmed transactions from the previous run. Now replication between 1, 2 and 3 is started and the first thing 1 sees is that 2 and 3 ack its old transactions. So 1 writes CONFIRM for its own transactions even before the same CONFIRM written by 2 reaches it. Once the CONFIRM written by 1 is replicated to 2 and 3 they error and stop replication, since their limbo contains entries from 2, not from 1. Actually, there's no need to error, since it's just a really old CONFIRM which's already processed by both 2 and 3. So, ignore CONFIRM/ROLLBACK when it references a wrong limbo owner. The issue was discovered with test replication/election_qsync_stress. Follow-up #5435 --- src/box/applier.cc | 3 +-- src/box/box.cc | 3 +-- src/box/txn_limbo.c | 14 +++++++++----- src/box/txn_limbo.h | 2 +- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index fb2f5d130..553db76fc 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -861,8 +861,7 @@ apply_synchro_row(struct xrow_header *row) if (xrow_decode_synchro(row, &req) != 0) goto err; - if (txn_limbo_process(&txn_limbo, &req)) - goto err; + txn_limbo_process(&txn_limbo, &req); struct synchro_entry *entry; entry = synchro_entry_new(row, &req); diff --git a/src/box/box.cc b/src/box/box.cc index 38bf4034e..fc4888955 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -383,8 +383,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) struct synchro_request syn_req; if (xrow_decode_synchro(row, &syn_req) != 0) diag_raise(); - if (txn_limbo_process(&txn_limbo, &syn_req) != 0) - diag_raise(); + txn_limbo_process(&txn_limbo, &syn_req); return; } if (iproto_type_is_raft_request(row->type)) { diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index 9272f5227..9498c7a44 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -634,13 +634,17 @@ complete: return 0; } -int +void txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) { if (req->replica_id != limbo->owner_id) { - diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, - req->replica_id, limbo->owner_id); - return -1; + /* + * Ignore CONFIRM/ROLLBACK messages for a foreign master. 
+ * These are most likely outdated messages for already confirmed + * data from an old leader, who has just started and written + * confirm right on synchronous transaction recovery. + */ + return; } switch (req->type) { case IPROTO_CONFIRM: @@ -652,7 +656,7 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) default: unreachable(); } - return 0; + return; } void diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index a49356c14..c28b5666d 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -257,7 +257,7 @@ int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); /** Execute a synchronous replication request. */ -int +void txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req); /** -- 2.24.3 (Apple Git-128) ^ permalink raw reply [flat|nested] 18+ messages in thread
* Re: [Tarantool-patches] [PATCH v2 6/6] txn_limbo: ignore CONFIRM/ROLLBACK for a foreign master 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 6/6] txn_limbo: ignore CONFIRM/ROLLBACK for a foreign master Serge Petrenko @ 2020-12-23 17:28 ` Vladislav Shpilevoy 2020-12-24 16:13 ` Serge Petrenko 0 siblings, 1 reply; 18+ messages in thread From: Vladislav Shpilevoy @ 2020-12-23 17:28 UTC (permalink / raw) To: Serge Petrenko, gorcunov; +Cc: tarantool-patches Thanks for the patch! On 23.12.2020 12:59, Serge Petrenko via Tarantool-patches wrote: > We designed limbo so that it errors on receiving a CONFIRM or ROLLBACK > for other instance's data. Actually, this error is pointless, and even > harmful. Here's why: > > Imagine you have 3 instances, 1, 2 and 3. > First 1 writes some synchronous transactions, but dies before writing CONFIRM. > > Now 2 has to write CONFIRM instead of 1 to take limbo ownership. > From now on 2 is the limbo owner and in case of high enough load it constantly > has some data in the limbo. > > Once 1 restarts, it first recovers its xlogs, and fills its limbo with > its own unconfirmed transactions from the previous run. Now replication > between 1, 2 and 3 is started and the first thing 1 sees is that 2 and 3 > ack its old transactions. So 1 writes CONFIRM for its own transactions > even before the same CONFIRM written by 2 reaches it. > Once the CONFIRM written by 1 is replicated to 2 and 3 they error and > stop replication, since their limbo contains entries from 2, not from 1. > Actually, there's no need to error, since it's just a really old CONFIRM > which's already processed by both 2 and 3.> > So, ignore CONFIRM/ROLLBACK when it references a wrong limbo owner. > > The issue was discovered with test replication/election_qsync_stress. The comment is good. ^ permalink raw reply [flat|nested] 18+ messages in thread
* Re: [Tarantool-patches] [PATCH v2 6/6] txn_limbo: ignore CONFIRM/ROLLBACK for a foreign master 2020-12-23 17:28 ` Vladislav Shpilevoy @ 2020-12-24 16:13 ` Serge Petrenko 0 siblings, 0 replies; 18+ messages in thread From: Serge Petrenko @ 2020-12-24 16:13 UTC (permalink / raw) To: Vladislav Shpilevoy, gorcunov; +Cc: tarantool-patches 23.12.2020 20:28, Vladislav Shpilevoy пишет: > Thanks for the patch! > > On 23.12.2020 12:59, Serge Petrenko via Tarantool-patches wrote: >> We designed limbo so that it errors on receiving a CONFIRM or ROLLBACK >> for other instance's data. Actually, this error is pointless, and even >> harmful. Here's why: >> >> Imagine you have 3 instances, 1, 2 and 3. >> First 1 writes some synchronous transactions, but dies before writing CONFIRM. >> >> Now 2 has to write CONFIRM instead of 1 to take limbo ownership. >> From now on 2 is the limbo owner and in case of high enough load it constantly >> has some data in the limbo. >> >> Once 1 restarts, it first recovers its xlogs, and fills its limbo with >> its own unconfirmed transactions from the previous run. Now replication >> between 1, 2 and 3 is started and the first thing 1 sees is that 2 and 3 >> ack its old transactions. So 1 writes CONFIRM for its own transactions >> even before the same CONFIRM written by 2 reaches it. >> Once the CONFIRM written by 1 is replicated to 2 and 3 they error and >> stop replication, since their limbo contains entries from 2, not from 1. >> Actually, there's no need to error, since it's just a really old CONFIRM >> which's already processed by both 2 and 3.> >> So, ignore CONFIRM/ROLLBACK when it references a wrong limbo owner. >> >> The issue was discovered with test replication/election_qsync_stress. > The comment is good. Thanks! -- Serge Petrenko ^ permalink raw reply [flat|nested] 18+ messages in thread
* Re: [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything 2020-12-23 11:59 [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Serge Petrenko ` (5 preceding siblings ...) 2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 6/6] txn_limbo: ignore CONFIRM/ROLLBACK for a foreign master Serge Petrenko @ 2020-12-25 10:04 ` Kirill Yukhin 6 siblings, 0 replies; 18+ messages in thread From: Kirill Yukhin @ 2020-12-25 10:04 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy Hello, On 23 Dec 14:59, Serge Petrenko via Tarantool-patches wrote: > Changes in v2: > - implement something similar to relay_lsn_watcher with the help of triggers. > - fix election_qsync_stress test hang. One issue with the test itself and one > issue with the txn_limbo. > - various minor fixes. > - change clear_synchro_queue behaviour: wait for quorum for > replication_synchro_timeout. If the quorum isn't reached, rollback nothing, > and warn the user. > > @ChangeLog > - Change `box.ctl.clear_synchro_queue` behaviour. Now it tries to commit > everything that is present on the node. In order to do so it waits for other > instances to replicate the data for `replication_synchro_quorum` seconds. > In case timeout passes and quorum wasn't reached, nothing is rolled back (gh-5435) I've checked your patchset into 2.6 and master. -- Regards, Kirill Yukhin ^ permalink raw reply [flat|nested] 18+ messages in thread
end of thread
Thread overview: 18+ messages
2020-12-23 11:59 [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Serge Petrenko
2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 1/6] box: add a single execution guard to clear_synchro_queue Serge Petrenko
2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 2/6] relay: introduce on_status_update trigger Serge Petrenko
2020-12-23 17:25   ` Vladislav Shpilevoy
2020-12-24 16:11     ` Serge Petrenko
2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 3/6] txn_limbo: introduce txn_limbo_last_synchro_entry method Serge Petrenko
2020-12-23 17:25   ` Vladislav Shpilevoy
2020-12-24 16:13     ` Serge Petrenko
2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything Serge Petrenko
2020-12-23 17:28   ` Vladislav Shpilevoy
2020-12-24 16:12     ` Serge Petrenko
2020-12-24 17:35       ` Vladislav Shpilevoy
2020-12-24 21:02         ` Serge Petrenko
2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 5/6] test: fix replication/election_qsync_stress test Serge Petrenko
2020-12-23 11:59 ` [Tarantool-patches] [PATCH v2 6/6] txn_limbo: ignore CONFIRM/ROLLBACK for a foreign master Serge Petrenko
2020-12-23 17:28   ` Vladislav Shpilevoy
2020-12-24 16:13     ` Serge Petrenko
2020-12-25 10:04 ` [Tarantool-patches] [PATCH v2 0/6] make clear_synchro_queue commit everything Kirill Yukhin