From: Serge Petrenko <sergepetrenko@tarantool.org>
To: v.shpilevoy@tarantool.org, cyrillos@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything
Date: Thu, 10 Dec 2020 23:55:14 +0300
Message-ID: <99accb77474e59e62203f9028196620a07635a7d.1607633488.git.sergepetrenko@tarantool.org>
In-Reply-To: <cover.1607633488.git.sergepetrenko@tarantool.org>

It is possible that a new leader (elected either via raft or manually or
via some user-written election algorithm) loses the data that the old
leader has successfully committed and confirmed.

Imagine such a situation: there are N nodes in a replicaset, the old
leader, denoted A, tries to apply some synchronous transaction. It is
written on the leader itself and N/2 other nodes, one of which is B.
The transaction has thus gathered quorum, N/2 + 1 acks.

Now A writes CONFIRM and commits the transaction, but dies before the
confirmation reaches any of its followers. B is elected the new leader and it
sees that A's last transaction is present on N/2 nodes, so it doesn't have a
quorum (A was one of the N/2 + 1).

Current `clear_synchro_queue()` implementation makes B roll back the
transaction, leading to rollback after commit, which is unacceptable.

To fix the problem, make `clear_synchro_queue()` wait until all the rows from
the previous leader gather `replication_synchro_quorum` acks.

In case any new rows come via replication while waiting for acks,
wait for their confirmation as well.

Closes #5435
---
 src/box/box.cc                                | 126 +++++++++++++---
 test/replication/election_replica.lua         |   5 +-
 ...5435-clear-synchro-queue-commit-all.result | 137 ++++++++++++++++++
 ...35-clear-synchro-queue-commit-all.test.lua |  60 ++++++++
 test/replication/suite.cfg                    |   1 +
 5 files changed, 306 insertions(+), 23 deletions(-)
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index 8e0c9a160..fb9167977 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
 
 }
 
+struct lsn_watcher_data {
+	int *ack_count;
+	int *watcher_count;
+};
+
+static void
+count_confirm_f(void *data)
+{
+	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+	(*d->ack_count)++;
+}
+
+static void
+watcher_destroy_f(void *data)
+{
+	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+	(*d->watcher_count)--;
+}
+
 int
 box_clear_synchro_queue(bool try_wait)
 {
@@ -1011,6 +1030,10 @@ box_clear_synchro_queue(bool try_wait)
 			 "simultaneous invocations");
 		return -1;
 	}
+	/*
+	 * XXX: we may want to write confirm + rollback even when the limbo is
+	 * empty for the sake of limbo ownership transition.
+	 */
 	if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
 		return 0;
 	uint32_t former_leader_id = txn_limbo.owner_id;
@@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
 		}
 	}
 
-	if (!txn_limbo_is_empty(&txn_limbo)) {
-		int64_t lsns[VCLOCK_MAX];
-		int len = 0;
-		const struct vclock *vclock;
+	if (txn_limbo_is_empty(&txn_limbo))
+		return 0;
+
+	/*
+	 * Allocate the watchers statically to not bother with alloc/free.
+	 * This is fine since we have a single execution guard.
+	 */
+	static struct relay_lsn_watcher watchers[VCLOCK_MAX];
+	for (int i = 1; i < VCLOCK_MAX; i++)
+		rlist_create(&watchers[i].in_list);
+
+	int64_t wait_lsn = 0;
+	bool restart = false;
+	do {
+		wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;
+		/*
+		 * Take this node into account immediately.
+		 * clear_synchro_queue() is a no-op on the limbo owner for now,
+		 * so all the rows in the limbo must've come through the applier
+		 * and so they already have an lsn assigned, even if their wal
+		 * write isn't finished yet.
+		 */
+		assert(wait_lsn > 0);
+		int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
+		int watcher_count = 0;
+		struct lsn_watcher_data data = {
+			.ack_count = &count,
+			.watcher_count = &watcher_count,
+		};
+
 		replicaset_foreach(replica) {
-			if (replica->relay != NULL &&
-			    relay_get_state(replica->relay) != RELAY_OFF &&
-			    !replica->anon) {
-				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
-							 &replica->uuid));
-				vclock = relay_vclock(replica->relay);
-				int64_t lsn = vclock_get(vclock,
-							 former_leader_id);
-				lsns[len++] = lsn;
+			if (replica->anon || replica->relay == NULL ||
+			    relay_get_state(replica->relay) != RELAY_FOLLOW)
+				continue;
+			assert(replica->id != 0);
+			assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
+
+			if (vclock_get(relay_vclock(replica->relay),
+				       former_leader_id) >= wait_lsn) {
+				count++;
+				continue;
 			}
+
+			relay_lsn_watcher_create(&watchers[replica->id],
+						 former_leader_id, wait_lsn,
+						 count_confirm_f,
+						 watcher_destroy_f, &data);
+			relay_set_lsn_watcher(replica->relay,
+					      &watchers[replica->id]);
+			watcher_count++;
 		}
-		lsns[len++] = vclock_get(box_vclock, former_leader_id);
-		assert(len < VCLOCK_MAX);
-		int64_t confirm_lsn = 0;
-		if (len >= replication_synchro_quorum) {
-			qsort(lsns, len, sizeof(int64_t), cmp_i64);
-			confirm_lsn = lsns[len - replication_synchro_quorum];
+
+		while (count < replication_synchro_quorum &&
+		       count + watcher_count >= replication_synchro_quorum) {
+			fiber_yield();
 		}
 
-		txn_limbo_force_empty(&txn_limbo, confirm_lsn);
-		assert(txn_limbo_is_empty(&txn_limbo));
-	}
+		/*
+		 * In case some new limbo entries arrived, confirm them as well.
+		 */
+		restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
+
+		/*
+		 * Not enough replicas connected. Give user some time to
+		 * reconfigure quorum and replicas some time to reconnect, then
+		 * restart the watchers.
+		 */
+		if (count + watcher_count < replication_synchro_quorum) {
+			say_info("clear_synchro_queue cannot collect quorum: "
+				 "number of connected replicas (%d) is less "
+				 "than replication_synchro_quorum (%d). "
+				 "Will retry in %.2f seconds",
+				 count + watcher_count,
+				 replication_synchro_quorum,
+				 replication_timeout);
+			fiber_sleep(replication_timeout);
+			restart = true;
+		}
+
+		/* Detach the watchers that haven't fired. */
+		for (int i = 1; i < VCLOCK_MAX; i++)
+			rlist_del_entry(&watchers[i], in_list);
+	} while (restart);
+
+	txn_limbo_force_empty(&txn_limbo, wait_lsn);
+	assert(txn_limbo_is_empty(&txn_limbo));
 	return 0;
 }
 
diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua
index db037ed67..3b4d9a123 100644
--- a/test/replication/election_replica.lua
+++ b/test/replication/election_replica.lua
@@ -4,6 +4,8 @@ local INSTANCE_ID = string.match(arg[0], "%d")
 local SOCKET_DIR = require('fio').cwd()
 local SYNCHRO_QUORUM = arg[1] and tonumber(arg[1]) or 3
 local ELECTION_TIMEOUT = arg[2] and tonumber(arg[2]) or 0.1
+local ELECTION_MODE = arg[3] or 'candidate'
+local CONNECT_QUORUM = arg[4] and tonumber(arg[4]) or 3
 
 local function instance_uri(instance_id)
     return SOCKET_DIR..'/election_replica'..instance_id..'.sock';
@@ -19,7 +21,8 @@ box.cfg({
         instance_uri(3),
     },
     replication_timeout = 0.1,
-    election_mode = 'candidate',
+    replication_connect_quorum = CONNECT_QUORUM,
+    election_mode = ELECTION_MODE,
     election_timeout = ELECTION_TIMEOUT,
     replication_synchro_quorum = SYNCHRO_QUORUM,
     replication_synchro_timeout = 0.1,
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
new file mode 100644
index 000000000..e633f9e60
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
@@ -0,0 +1,137 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+ | ---
+ | ...
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+test_run:switch('election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+
+test_run:switch('election_replica1')
+ | ---
+ | - true
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+_ = box.schema.space.create('test', {is_sync=true})
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum=3}
+ | ---
+ | ...
+
+lsn = box.info.lsn
+ | ---
+ | ...
+
+for i=1,10 do\
+    require('fiber').create(function() box.space.test:insert{i} end)\
+end
+ | ---
+ | ...
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('election_replica2', 'election_replica1')
+ | ---
+ | ...
+
+test_run:cmd('switch election_replica2')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('stop server election_replica1')
+ | ---
+ | - true
+ | ...
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+ | ---
+ | - true
+ | ...
+
+box.cfg{election_mode='candidate'}
+ | ---
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ |   - [6]
+ |   - [7]
+ |   - [8]
+ |   - [9]
+ |   - [10]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+ | ---
+ | ...
+test_run:cmd('delete server election_replica1')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
new file mode 100644
index 000000000..6cf616671
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
@@ -0,0 +1,60 @@
+test_run = require('test_run').new()
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+test_run:wait_fullmesh(SERVERS)
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+box.cfg{election_mode='voter'}
+test_run:switch('election_replica3')
+box.cfg{election_mode='voter'}
+
+test_run:switch('election_replica1')
+box.ctl.wait_rw()
+
+_ = box.schema.space.create('test', {is_sync=true})
+_ = box.space.test:create_index('pk')
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+box.cfg{replication_synchro_quorum=3}
+
+lsn = box.info.lsn
+
+for i=1,10 do\
+    require('fiber').create(function() box.space.test:insert{i} end)\
+end
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+test_run:wait_lsn('election_replica2', 'election_replica1')
+
+test_run:cmd('switch election_replica2')
+
+test_run:cmd('stop server election_replica1')
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+
+box.cfg{election_mode='candidate'}
+box.ctl.wait_rw()
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+
+test_run:cmd('switch default')
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+test_run:cmd('delete server election_replica1')
+test_run:drop_cluster(SERVERS)
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5670acc4d..8fe3930db 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -36,6 +36,7 @@
     "gh-4730-applier-rollback.test.lua": {},
     "gh-4928-tx-boundaries.test.lua": {},
     "gh-5440-qsync-ro.test.lua": {},
+    "gh-5435-clear-synchro-queue-commit-all.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.24.3 (Apple Git-128)
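
For illustration only, not part of the patch: a minimal standalone sketch of
the quorum arithmetic from the commit message, assuming a 3-node replicaset
with replication_synchro_quorum = 2. old_confirm_lsn() mirrors the
sort-and-pick logic that the patch removes from box.cc; count_acks() is a
hypothetical helper standing in for the per-relay vclock checks and LSN
watchers of the new code. The local cmp_i64() is a stand-in comparator, and
all LSN values are made up.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Stand-in comparator for qsort(); ascending order of int64_t values. */
static int
cmp_i64(const void *a, const void *b)
{
	int64_t x = *(const int64_t *)a;
	int64_t y = *(const int64_t *)b;
	return (x > y) - (x < y);
}

/*
 * Old rule (removed by the patch): sort the former leader's LSNs as seen on
 * this node and on the connected replicas, then confirm up to the
 * quorum-th largest one. Everything above that LSN is rolled back.
 * Note: sorts the caller's array in place.
 */
static int64_t
old_confirm_lsn(int64_t *lsns, int len, int quorum)
{
	assert(quorum > 0);
	if (len < quorum)
		return 0;
	qsort(lsns, len, sizeof(int64_t), cmp_i64);
	return lsns[len - quorum];
}

/*
 * New rule, reduced to its arithmetic (hypothetical helper): count how many
 * nodes have already reached wait_lsn. The caller keeps waiting while this
 * count is below the quorum instead of rolling anything back.
 */
static int
count_acks(const int64_t *lsns, int len, int64_t wait_lsn)
{
	int count = 0;
	for (int i = 0; i < len; i++)
		count += lsns[i] >= wait_lsn;
	return count;
}
```

With A dead and only B (LSN 10) and C (LSN 0) visible, old_confirm_lsn()
returns 0, so the already-committed rows would be rolled back, while
count_acks() for LSN 10 returns 1, which is below the quorum, so the new code
keeps waiting. Once C also reaches LSN 10 the count becomes 2 and the rows can
be confirmed.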