From mboxrd@z Thu Jan 1 00:00:00 1970
From: Serge Petrenko <sergepetrenko@tarantool.org>
Date: Wed, 23 Dec 2020 14:59:22 +0300
Message-Id: <3136023eb90fd3c6a10cb288466a9f3c8f9d2c01.1608724239.git.sergepetrenko@tarantool.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything
List-Id: Tarantool development patches
To: v.shpilevoy@tarantool.org, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org

It is possible that a new leader (elected either via Raft, or manually,
or via some user-written election algorithm) loses data that the old
leader has successfully committed and confirmed.

Imagine such a situation: there are N nodes in a replicaset. The old
leader, denoted A, tries to apply some synchronous transaction. It is
written on the leader itself and on N/2 other nodes, one of which is B.
The transaction has thus gathered a quorum of N/2 + 1 acks.

Now A writes CONFIRM and commits the transaction, but dies before the
confirmation reaches any of its followers. B is elected the new leader,
and it sees that A's last transaction is present on only N/2 nodes, so
it has no quorum (A was one of the N/2 + 1). The current
`clear_synchro_queue()` implementation makes B roll the transaction
back, leading to a rollback after a commit, which is unacceptable.

To fix the problem, make `clear_synchro_queue()` wait until all the
rows from the previous leader gather `replication_synchro_quorum` acks.
In case the quorum isn't reached within `replication_synchro_timeout`,
roll back nothing and wait for the user's intervention.
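To illustrate the resulting behaviour, here is a rough sketch of how an
admin would drive it on the new leader (a hypothetical console session,
not part of this patch; `box.ctl.clear_synchro_queue()` is the Lua entry
point wrapping the function changed below):

    -- On the instance that has just become the leader:
    box.cfg{
        replication_synchro_quorum = 2,   -- acks required for the old rows
        replication_synchro_timeout = 30, -- how long to wait for them
    }
    -- Waits until every pending row of the former leader gathers
    -- replication_synchro_quorum acks, then confirms them all at once.
    -- If the timeout expires first, nothing is rolled back and the
    -- queue is left as is for the user to handle.
    box.ctl.clear_synchro_queue()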
Closes #5435
---
 src/box/box.cc                                | 149 +++++++++++++++---
 test/replication/election_replica.lua         |   5 +-
 ...5435-clear-synchro-queue-commit-all.result | 144 +++++++++++++++++
 ...35-clear-synchro-queue-commit-all.test.lua |  65 ++++++++
 test/replication/suite.cfg                    |   1 +
 5 files changed, 339 insertions(+), 25 deletions(-)
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index 2d403fc9a..38bf4034e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1002,6 +1002,36 @@ box_set_replication_anon(void)
 }
 
+struct ack_trigger_data {
+	bool fired;
+	int64_t *target_lsn;
+	uint32_t *replica_id;
+	int *quorum;
+	int *ack_count;
+	struct fiber *waiter;
+};
+
+struct ack_trigger {
+	struct ack_trigger_data data;
+	struct trigger trigger;
+};
+
+static int ack_trigger_f(struct trigger *trigger, void *event)
+{
+	struct relay *relay = (struct relay *)event;
+	struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data;
+	if (data->fired)
+		return 0;
+	if (*data->target_lsn <= vclock_get(relay_vclock(relay),
+					    *data->replica_id)) {
+		++*data->ack_count;
+		data->fired = true;
+		if (*data->ack_count >= *data->quorum)
+			fiber_wakeup(data->waiter);
+	}
+	return 0;
+}
+
 int
 box_clear_synchro_queue(bool try_wait)
 {
@@ -1012,6 +1042,10 @@ box_clear_synchro_queue(bool try_wait)
 			 "simultaneous invocations");
 		return -1;
 	}
+	/*
+	 * XXX: we may want to write confirm + rollback even when the limbo is
+	 * empty for the sake of limbo ownership transition.
+	 */
 	if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
 		return 0;
 	uint32_t former_leader_id = txn_limbo.owner_id;
@@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait)
 				break;
 			fiber_sleep(0.001);
 		}
+		/*
+		 * Our mission was to clear the limbo from the former leader's
+		 * transactions. Exit in case someone did that for us.
+		 */
+		if (txn_limbo_is_empty(&txn_limbo) ||
+		    former_leader_id != txn_limbo.owner_id) {
+			in_clear_synchro_queue = false;
+			return 0;
+		}
 	}
 
-	if (!txn_limbo_is_empty(&txn_limbo)) {
-		int64_t lsns[VCLOCK_MAX];
-		int len = 0;
-		const struct vclock *vclock;
-		replicaset_foreach(replica) {
-			if (replica->relay != NULL &&
-			    relay_get_state(replica->relay) != RELAY_OFF &&
-			    !replica->anon) {
-				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
-							 &replica->uuid));
-				vclock = relay_vclock(replica->relay);
-				int64_t lsn = vclock_get(vclock,
-							 former_leader_id);
-				lsns[len++] = lsn;
-			}
-		}
-		lsns[len++] = vclock_get(box_vclock, former_leader_id);
-		assert(len < VCLOCK_MAX);
+	/*
+	 * clear_synchro_queue() is a no-op on the limbo owner, so all the rows
+	 * in the limbo must've come through the applier, meaning they already
+	 * have an lsn assigned, even if their WAL write hasn't finished yet.
+	 */
+	int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
+	assert(wait_lsn > 0);
+
+	struct ack_trigger triggers[VCLOCK_MAX];
+
+	/* Take this node into account immediately. */
+	int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
+	int trigger_count = 0;
+
+	replicaset_foreach(replica) {
+		if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
+		    replica->anon)
+			continue;
+
+		assert(replica->id != REPLICA_ID_NIL);
+		assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
 
-		int64_t confirm_lsn = 0;
-		if (len >= replication_synchro_quorum) {
-			qsort(lsns, len, sizeof(int64_t), cmp_i64);
-			confirm_lsn = lsns[len - replication_synchro_quorum];
+		if (vclock_get(relay_vclock(replica->relay),
+			       former_leader_id) >= wait_lsn) {
+			ack_count++;
+			continue;
 		}
+		int i = trigger_count++;
+		triggers[i].data = {
+			.fired = false,
+			.target_lsn = &wait_lsn,
+			.replica_id = &former_leader_id,
+			.quorum = &replication_synchro_quorum,
+			.ack_count = &ack_count,
+			.waiter = fiber(),
+		};
+		trigger_create(&triggers[i].trigger, ack_trigger_f,
+			       &triggers[i].data, NULL);
+		relay_on_status_update(replica->relay, &triggers[i].trigger);
+	}
+
+	assert(trigger_count <= VCLOCK_MAX);
+
+	if (ack_count + trigger_count < replication_synchro_quorum) {
+		/* Don't even bother waiting when not enough replicas. */
+		say_warn("clear_synchro_queue cannot gather quorum. "
+			 "There are only %d replicas (including this one), "
+			 "while the quorum should be %d.",
+			 ack_count + trigger_count,
+			 replication_synchro_quorum);
+		for (int i = 0; i < trigger_count; i++)
+			trigger_clear(&triggers[i].trigger);
+		goto end;
+	}
+
+	if (trigger_count > 0) {
+		/* Allow interrupting the function when it takes too long. */
+		bool cancellable = fiber_set_cancellable(true);
+		fiber_sleep(replication_synchro_timeout);
+		fiber_set_cancellable(cancellable);
+	}
+
+	for (int i = 0; i < trigger_count; i++)
+		trigger_clear(&triggers[i].trigger);
+
+	/*
+	 * No point in proceeding after cancellation, even if the quorum was
+	 * gathered. Emptying the limbo involves a pair of blocking WAL writes,
+	 * making the fiber sleep even longer, which isn't appropriate
+	 * when it's cancelled.
+	 */
+	if (fiber_is_cancelled()) {
+		say_info("clear_synchro_queue interrupted by the fiber "
+			 "cancellation.");
+		goto end;
+	}
 
-	txn_limbo_force_empty(&txn_limbo, confirm_lsn);
-	assert(txn_limbo_is_empty(&txn_limbo));
+	if (ack_count < replication_synchro_quorum) {
+		say_warn("clear_synchro_queue timed out after %.2f "
+			 "seconds. Collected %d acks, quorum is %d.",
", + replication_synchro_timeout, ack_count, + replication_synchro_quorum); + goto end; } + txn_limbo_force_empty(&txn_limbo, wait_lsn); + assert(txn_limbo_is_empty(&txn_limbo)); +end: in_clear_synchro_queue = false; return 0; } diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua index db037ed67..3b4d9a123 100644 --- a/test/replication/election_replica.lua +++ b/test/replication/election_replica.lua @@ -4,6 +4,8 @@ local INSTANCE_ID = string.match(arg[0], "%d") local SOCKET_DIR = require('fio').cwd() local SYNCHRO_QUORUM = arg[1] and tonumber(arg[1]) or 3 local ELECTION_TIMEOUT = arg[2] and tonumber(arg[2]) or 0.1 +local ELECTION_MODE = arg[3] or 'candidate' +local CONNECT_QUORUM = arg[4] and tonumber(arg[4]) or 3 local function instance_uri(instance_id) return SOCKET_DIR..'/election_replica'..instance_id..'.sock'; @@ -19,7 +21,8 @@ box.cfg({ instance_uri(3), }, replication_timeout = 0.1, - election_mode = 'candidate', + replication_connect_quorum = CONNECT_QUORUM, + election_mode = ELECTION_MODE, election_timeout = ELECTION_TIMEOUT, replication_synchro_quorum = SYNCHRO_QUORUM, replication_synchro_timeout = 0.1, diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result new file mode 100644 index 000000000..e806d9d53 --- /dev/null +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result @@ -0,0 +1,144 @@ +-- test-run result file version 2 +test_run = require('test_run').new() + | --- + | ... + +-- +-- gh-5435: make sure the new limbo owner commits everything there is left in +-- the limbo from an old owner. +-- + +SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'} + | --- + | ... + +test_run:create_cluster(SERVERS, "replication", {args='2 0.4'}) + | --- + | ... +test_run:wait_fullmesh(SERVERS) + | --- + | ... + +-- Force election_replica1 to become leader. +test_run:switch('election_replica2') + | --- + | - true + | ... +box.cfg{election_mode='voter'} + | --- + | ... +test_run:switch('election_replica3') + | --- + | - true + | ... +box.cfg{election_mode='voter'} + | --- + | ... + +test_run:switch('election_replica1') + | --- + | - true + | ... +box.ctl.wait_rw() + | --- + | ... + +_ = box.schema.space.create('test', {is_sync=true}) + | --- + | ... +_ = box.space.test:create_index('pk') + | --- + | ... + +-- Fill the limbo with pending entries. 3 mustn't receive them yet. +test_run:cmd('stop server election_replica3') + | --- + | - true + | ... +box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000} + | --- + | ... + +lsn = box.info.lsn + | --- + | ... + +for i=1,10 do\ + require('fiber').create(function() box.space.test:insert{i} end)\ +end + | --- + | ... + +-- Wait for WAL write and replication. +test_run:wait_cond(function() return box.info.lsn == lsn + 10 end) + | --- + | - true + | ... +test_run:wait_lsn('election_replica2', 'election_replica1') + | --- + | ... + +test_run:cmd('switch election_replica2') + | --- + | - true + | ... + +test_run:cmd('stop server election_replica1') + | --- + | - true + | ... +-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it. +-- It will vote for 2, however, since 2 has newer data, and start replication +-- once 2 becomes the leader. +test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"') + | --- + | - true + | ... + +-- Set a huge timeout for 2 reasons. 
+-- First, this guards us from the instance leaving clear_synchro_queue too early
+-- and confirming nothing.
+-- Second, it lets us test that the instance doesn't wait for the full timeout.
+box.cfg{replication_synchro_timeout=1000}
+ | ---
+ | ...
+box.cfg{election_mode='candidate'}
+ | ---
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ |   - [6]
+ |   - [7]
+ |   - [8]
+ |   - [9]
+ |   - [10]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+ | ---
+ | ...
+test_run:cmd('delete server election_replica1')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
new file mode 100644
index 000000000..da218624b
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
@@ -0,0 +1,65 @@
+test_run = require('test_run').new()
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+test_run:wait_fullmesh(SERVERS)
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+box.cfg{election_mode='voter'}
+test_run:switch('election_replica3')
+box.cfg{election_mode='voter'}
+
+test_run:switch('election_replica1')
+box.ctl.wait_rw()
+
+_ = box.schema.space.create('test', {is_sync=true})
+_ = box.space.test:create_index('pk')
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000}
+
+lsn = box.info.lsn
+
+for i=1,10 do\
+    require('fiber').create(function() box.space.test:insert{i} end)\
+end
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+test_run:wait_lsn('election_replica2', 'election_replica1')
+
+test_run:cmd('switch election_replica2')
+
+test_run:cmd('stop server election_replica1')
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+
+-- Set a huge timeout for 2 reasons.
+-- First, this guards us from the instance leaving clear_synchro_queue too early
+-- and confirming nothing.
+-- Second, it lets us test that the instance doesn't wait for the full timeout.
+box.cfg{replication_synchro_timeout=1000}
+box.cfg{election_mode='candidate'}
+box.ctl.wait_rw()
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+
+test_run:cmd('switch default')
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+test_run:cmd('delete server election_replica1')
+test_run:drop_cluster(SERVERS)
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5670acc4d..8fe3930db 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -36,6 +36,7 @@
     "gh-4730-applier-rollback.test.lua": {},
     "gh-4928-tx-boundaries.test.lua": {},
     "gh-5440-qsync-ro.test.lua": {},
+    "gh-5435-clear-synchro-queue-commit-all.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.24.3 (Apple Git-128)