From mboxrd@z Thu Jan 1 00:00:00 1970
Received: from smtp16.mail.ru (smtp16.mail.ru [94.100.176.153])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by dev.tarantool.org (Postfix) with ESMTPS id 39E2E45C308
	for ; Thu, 10 Dec 2020 23:55:35 +0300 (MSK)
From: Serge Petrenko
Date: Thu, 10 Dec 2020 23:55:14 +0300
Message-Id: <99accb77474e59e62203f9028196620a07635a7d.1607633488.git.sergepetrenko@tarantool.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything
List-Id: Tarantool development patches
To: v.shpilevoy@tarantool.org, cyrillos@gmail.com
Cc: tarantool-patches@dev.tarantool.org

It is possible that a new leader (elected either via Raft, or manually, or
via some user-written election algorithm) loses the data that the old
leader has successfully committed and confirmed.

Imagine such a situation: there are N nodes in a replicaset. The old
leader, denoted A, tries to apply some synchronous transaction. It is
written on the leader itself and on N/2 other nodes, one of which is B.
The transaction has thus gathered a quorum of N/2 + 1 acks.

Now A writes CONFIRM and commits the transaction, but dies before the
confirmation reaches any of its followers. B is elected the new leader,
and it sees that A's last transaction is present on only N/2 nodes, so it
doesn't have a quorum (A was one of the N/2 + 1). The current
`clear_synchro_queue()` implementation makes B roll the transaction back,
leading to a rollback after commit, which is unacceptable.

To fix the problem, make `clear_synchro_queue()` wait until all the rows
from the previous leader gather `replication_synchro_quorum` acks. In case
any new rows come via replication while waiting for acks, wait for their
confirmation as well.

Closes #5435
---
 src/box/box.cc                                | 126 +++++++++++++---
 test/replication/election_replica.lua         |   5 +-
 ...5435-clear-synchro-queue-commit-all.result | 137 ++++++++++++++++++
 ...35-clear-synchro-queue-commit-all.test.lua |  60 ++++++++
 test/replication/suite.cfg                    |   1 +
 5 files changed, 306 insertions(+), 23 deletions(-)
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
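[Editor's note, not part of the patch: a minimal usage sketch of the behaviour
described above, assuming a Tarantool build with this series applied. It only
uses the public box.cfg()/box.ctl API; the quorum value and the space name are
illustrative, not taken from the patch.]

  -- Executed on the instance that has just become the new leader (B in the
  -- commit message above). With this patch box.ctl.clear_synchro_queue()
  -- waits until every row left in the limbo by the old leader gathers
  -- replication_synchro_quorum acks instead of rolling the tail back.
  box.cfg{replication_synchro_quorum = 2} -- make the quorum reachable again
  box.ctl.clear_synchro_queue()           -- blocks until the old rows are confirmed
  box.space.test:select{}                 -- the old leader's committed writes survive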
diff --git a/src/box/box.cc b/src/box/box.cc
index 8e0c9a160..fb9167977 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
 }
 
+struct lsn_watcher_data {
+	int *ack_count;
+	int *watcher_count;
+};
+
+static void
+count_confirm_f(void *data)
+{
+	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+	(*d->ack_count)++;
+}
+
+static void
+watcher_destroy_f(void *data)
+{
+	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+	(*d->watcher_count)--;
+}
+
 int
 box_clear_synchro_queue(bool try_wait)
 {
@@ -1011,6 +1030,10 @@ box_clear_synchro_queue(bool try_wait)
 			 "simultaneous invocations");
 		return -1;
 	}
+	/*
+	 * XXX: we may want to write confirm + rollback even when the limbo is
+	 * empty for the sake of limbo ownership transition.
+	 */
 	if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
 		return 0;
 	uint32_t former_leader_id = txn_limbo.owner_id;
@@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
 		}
 	}
 
-	if (!txn_limbo_is_empty(&txn_limbo)) {
-		int64_t lsns[VCLOCK_MAX];
-		int len = 0;
-		const struct vclock *vclock;
+	if (txn_limbo_is_empty(&txn_limbo))
+		return 0;
+
+	/*
+	 * Allocate the watchers statically to not bother with alloc/free.
+	 * This is fine since we have a single execution guard.
+	 */
+	static struct relay_lsn_watcher watchers[VCLOCK_MAX];
+	for (int i = 1; i < VCLOCK_MAX; i++)
+		rlist_create(&watchers[i].in_list);
+
+	int64_t wait_lsn = 0;
+	bool restart = false;
+	do {
+		wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;
+		/*
+		 * Take this node into account immediately.
+		 * clear_synchro_queue() is a no-op on the limbo owner for now,
+		 * so all the rows in the limbo must've come through the applier
+		 * and so they already have an lsn assigned, even if their wal
+		 * write isn't finished yet.
+		 */
+		assert(wait_lsn > 0);
+		int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
+		int watcher_count = 0;
+		struct lsn_watcher_data data = {
+			.ack_count = &count,
+			.watcher_count = &watcher_count,
+		};
+
 		replicaset_foreach(replica) {
-			if (replica->relay != NULL &&
-			    relay_get_state(replica->relay) != RELAY_OFF &&
-			    !replica->anon) {
-				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
-							 &replica->uuid));
-				vclock = relay_vclock(replica->relay);
-				int64_t lsn = vclock_get(vclock,
-							 former_leader_id);
-				lsns[len++] = lsn;
+			if (replica->anon || replica->relay == NULL ||
+			    relay_get_state(replica->relay) != RELAY_FOLLOW)
+				continue;
+			assert(replica->id != 0);
+			assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
+
+			if (vclock_get(relay_vclock(replica->relay),
+				       former_leader_id) >= wait_lsn) {
+				count++;
+				continue;
 			}
+
+			relay_lsn_watcher_create(&watchers[replica->id],
+						 former_leader_id, wait_lsn,
+						 count_confirm_f,
+						 watcher_destroy_f, &data);
+			relay_set_lsn_watcher(replica->relay,
+					      &watchers[replica->id]);
+			watcher_count++;
 		}
-		lsns[len++] = vclock_get(box_vclock, former_leader_id);
-		assert(len < VCLOCK_MAX);
-		int64_t confirm_lsn = 0;
-		if (len >= replication_synchro_quorum) {
-			qsort(lsns, len, sizeof(int64_t), cmp_i64);
-			confirm_lsn = lsns[len - replication_synchro_quorum];
+
+		while (count < replication_synchro_quorum &&
+		       count + watcher_count >= replication_synchro_quorum) {
+			fiber_yield();
 		}
-		txn_limbo_force_empty(&txn_limbo, confirm_lsn);
-		assert(txn_limbo_is_empty(&txn_limbo));
-	}
+		/*
+		 * In case some new limbo entries arrived, confirm them as well.
+		 */
+		restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
+
+		/*
+		 * Not enough replicas connected. Give user some time to
+		 * reconfigure quorum and replicas some time to reconnect, then
+		 * restart the watchers.
+		 */
+		if (count + watcher_count < replication_synchro_quorum) {
+			say_info("clear_synchro_queue cannot collect quorum: "
+				 "number of connected replicas (%d) is less "
+				 "than replication_synchro_quorum (%d). "
+				 "Will retry in %.2f seconds",
+				 count + watcher_count,
+				 replication_synchro_quorum,
+				 replication_timeout);
+			fiber_sleep(replication_timeout);
+			restart = true;
+		}
+
+		/* Detach the watchers that haven't fired. */
+		for (int i = 1; i < VCLOCK_MAX; i++)
+			rlist_del_entry(&watchers[i], in_list);
+	} while (restart);
+
+	txn_limbo_force_empty(&txn_limbo, wait_lsn);
+	assert(txn_limbo_is_empty(&txn_limbo));
 	return 0;
 }
 
diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua
index db037ed67..3b4d9a123 100644
--- a/test/replication/election_replica.lua
+++ b/test/replication/election_replica.lua
@@ -4,6 +4,8 @@ local INSTANCE_ID = string.match(arg[0], "%d")
 local SOCKET_DIR = require('fio').cwd()
 local SYNCHRO_QUORUM = arg[1] and tonumber(arg[1]) or 3
 local ELECTION_TIMEOUT = arg[2] and tonumber(arg[2]) or 0.1
+local ELECTION_MODE = arg[3] or 'candidate'
+local CONNECT_QUORUM = arg[4] and tonumber(arg[4]) or 3
 
 local function instance_uri(instance_id)
     return SOCKET_DIR..'/election_replica'..instance_id..'.sock';
@@ -19,7 +21,8 @@ box.cfg({
         instance_uri(3),
     },
     replication_timeout = 0.1,
-    election_mode = 'candidate',
+    replication_connect_quorum = CONNECT_QUORUM,
+    election_mode = ELECTION_MODE,
     election_timeout = ELECTION_TIMEOUT,
     replication_synchro_quorum = SYNCHRO_QUORUM,
     replication_synchro_timeout = 0.1,
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
new file mode 100644
index 000000000..e633f9e60
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
@@ -0,0 +1,137 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+ | ---
+ | ...
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+test_run:switch('election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+
+test_run:switch('election_replica1')
+ | ---
+ | - true
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+_ = box.schema.space.create('test', {is_sync=true})
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum=3}
+ | ---
+ | ...
+
+lsn = box.info.lsn
+ | ---
+ | ...
+
+for i=1,10 do\
+    require('fiber').create(function() box.space.test:insert{i} end)\
+end
+ | ---
+ | ...
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('election_replica2', 'election_replica1')
+ | ---
+ | ...
+
+test_run:cmd('switch election_replica2')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('stop server election_replica1')
+ | ---
+ | - true
+ | ...
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+ | ---
+ | - true
+ | ...
+
+box.cfg{election_mode='candidate'}
+ | ---
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ |   - [6]
+ |   - [7]
+ |   - [8]
+ |   - [9]
+ |   - [10]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+ | ---
+ | ...
+test_run:cmd('delete server election_replica1')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
new file mode 100644
index 000000000..6cf616671
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
@@ -0,0 +1,60 @@
+test_run = require('test_run').new()
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+test_run:wait_fullmesh(SERVERS)
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+box.cfg{election_mode='voter'}
+test_run:switch('election_replica3')
+box.cfg{election_mode='voter'}
+
+test_run:switch('election_replica1')
+box.ctl.wait_rw()
+
+_ = box.schema.space.create('test', {is_sync=true})
+_ = box.space.test:create_index('pk')
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+box.cfg{replication_synchro_quorum=3}
+
+lsn = box.info.lsn
+
+for i=1,10 do\
+    require('fiber').create(function() box.space.test:insert{i} end)\
+end
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+test_run:wait_lsn('election_replica2', 'election_replica1')
+
+test_run:cmd('switch election_replica2')
+
+test_run:cmd('stop server election_replica1')
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+
+box.cfg{election_mode='candidate'}
+box.ctl.wait_rw()
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+
+test_run:cmd('switch default')
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+test_run:cmd('delete server election_replica1')
+test_run:drop_cluster(SERVERS)
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5670acc4d..8fe3930db 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -36,6 +36,7 @@
     "gh-4730-applier-rollback.test.lua": {},
     "gh-4928-tx-boundaries.test.lua": {},
     "gh-5440-qsync-ro.test.lua": {},
+    "gh-5435-clear-synchro-queue-commit-all.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.24.3 (Apple Git-128)