[Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything
Serge Petrenko
sergepetrenko at tarantool.org
Wed Dec 23 14:59:22 MSK 2020
It is possible that a new leader (elected either via Raft, or manually, or
via some user-written election algorithm) loses the data that the old
leader has successfully committed and confirmed.
Imagine the following situation: there are N nodes in a replica set, and the
old leader, denoted A, tries to apply some synchronous transaction. It is
written on the leader itself and on N/2 other nodes, one of which is B.
The transaction has thus gathered a quorum of N/2 + 1 acks.
Now A writes CONFIRM and commits the transaction, but dies before the
confirmation reaches any of its followers. B is elected the new leader and
sees that A's last transaction is present on only N/2 nodes, so it lacks a
quorum (A itself was one of the N/2 + 1).
The current `clear_synchro_queue()` implementation makes B roll the
transaction back, leading to a rollback after commit, which is unacceptable.
To fix the problem, make `clear_synchro_queue()` wait until all the rows from
the previous leader gather `replication_synchro_quorum` acks.
In case the quorum isn't reached within `replication_synchro_timeout`, roll
back nothing and wait for the user's intervention.
Closes #5435
---
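Not part of the patch: below is a rough Lua-level sketch of the waiting logic
described above, for illustration only. The actual patch implements the wait in
C inside box_clear_synchro_queue() using per-relay status-update triggers
rather than polling. The sketch assumes the new leader can observe follower
vclocks via box.info.replication[...].downstream.vclock; wait_quorum_on() is a
hypothetical helper, not an existing API.

local fiber = require('fiber')

-- Hypothetical helper (illustration only): poll follower vclocks until the
-- former leader's component reaches `wait_lsn` on `quorum` instances
-- (counting this one), or `timeout` seconds pass. Returns true on success.
local function wait_quorum_on(former_leader_id, wait_lsn, quorum, timeout)
    local deadline = fiber.clock() + timeout
    repeat
        -- Count this instance first.
        local acks = (box.info.vclock[former_leader_id] or 0) >= wait_lsn
                     and 1 or 0
        -- Then count every follower whose downstream vclock, as reported by
        -- the relay, already covers wait_lsn.
        for _, r in pairs(box.info.replication) do
            if r.id ~= box.info.id and r.downstream ~= nil and
               r.downstream.vclock ~= nil and
               (r.downstream.vclock[former_leader_id] or 0) >= wait_lsn then
                acks = acks + 1
            end
        end
        if acks >= quorum then
            return true
        end
        fiber.sleep(0.01)
    until fiber.clock() >= deadline
    return false
end

A caller would pass the former leader's id, the LSN of the last synchronous
entry left in the limbo, box.cfg.replication_synchro_quorum and
box.cfg.replication_synchro_timeout, and only force the limbo empty when the
helper returns true.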
src/box/box.cc | 149 +++++++++++++++---
test/replication/election_replica.lua | 5 +-
...5435-clear-synchro-queue-commit-all.result | 144 +++++++++++++++++
...35-clear-synchro-queue-commit-all.test.lua | 65 ++++++++
test/replication/suite.cfg | 1 +
5 files changed, 339 insertions(+), 25 deletions(-)
create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index 2d403fc9a..38bf4034e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1002,6 +1002,36 @@ box_set_replication_anon(void)
}
+struct ack_trigger_data {
+ bool fired;
+ int64_t *target_lsn;
+ uint32_t *replica_id;
+ int *quorum;
+ int *ack_count;
+ struct fiber *waiter;
+};
+
+struct ack_trigger {
+ struct ack_trigger_data data;
+ struct trigger trigger;
+};
+
+static int ack_trigger_f(struct trigger *trigger, void *event)
+{
+ struct relay *relay = (struct relay *)event;
+ struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data;
+ if (data->fired)
+ return 0;
+ if (*data->target_lsn <= vclock_get(relay_vclock(relay),
+ *data->replica_id)) {
+ ++*data->ack_count;
+ data->fired = true;
+ if (*data->ack_count >= *data->quorum)
+ fiber_wakeup(data->waiter);
+ }
+ return 0;
+}
+
int
box_clear_synchro_queue(bool try_wait)
{
@@ -1012,6 +1042,10 @@ box_clear_synchro_queue(bool try_wait)
"simultaneous invocations");
return -1;
}
+ /*
+ * XXX: we may want to write confirm + rollback even when the limbo is
+ * empty for the sake of limbo ownership transition.
+ */
if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
return 0;
uint32_t former_leader_id = txn_limbo.owner_id;
@@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait)
break;
fiber_sleep(0.001);
}
+ /*
+ * Our mission was to clear the limbo of the former leader's
+ * transactions. Exit in case someone did that for us.
+ */
+ if (txn_limbo_is_empty(&txn_limbo) ||
+ former_leader_id != txn_limbo.owner_id) {
+ in_clear_synchro_queue = false;
+ return 0;
+ }
}
- if (!txn_limbo_is_empty(&txn_limbo)) {
- int64_t lsns[VCLOCK_MAX];
- int len = 0;
- const struct vclock *vclock;
- replicaset_foreach(replica) {
- if (replica->relay != NULL &&
- relay_get_state(replica->relay) != RELAY_OFF &&
- !replica->anon) {
- assert(!tt_uuid_is_equal(&INSTANCE_UUID,
- &replica->uuid));
- vclock = relay_vclock(replica->relay);
- int64_t lsn = vclock_get(vclock,
- former_leader_id);
- lsns[len++] = lsn;
- }
- }
- lsns[len++] = vclock_get(box_vclock, former_leader_id);
- assert(len < VCLOCK_MAX);
+ /*
+ * clear_synchro_queue() is a no-op on the limbo owner, so all the rows
+ * in the limbo must've come through the applier, meaning they already
+ * have an lsn assigned, even if their WAL write hasn't finished yet.
+ */
+ int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
+ assert(wait_lsn > 0);
+
+ struct ack_trigger triggers[VCLOCK_MAX];
+
+ /* Take this node into account immediately. */
+ int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
+ int trigger_count = 0;
+
+ replicaset_foreach(replica) {
+ if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
+ replica->anon)
+ continue;
+
+ assert(replica->id != REPLICA_ID_NIL);
+ assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
- int64_t confirm_lsn = 0;
- if (len >= replication_synchro_quorum) {
- qsort(lsns, len, sizeof(int64_t), cmp_i64);
- confirm_lsn = lsns[len - replication_synchro_quorum];
+ if (vclock_get(relay_vclock(replica->relay),
+ former_leader_id) >= wait_lsn) {
+ ack_count++;
+ continue;
}
+ int i = trigger_count++;
+ triggers[i].data = {
+ .fired = false,
+ .target_lsn = &wait_lsn,
+ .replica_id = &former_leader_id,
+ .quorum = &replication_synchro_quorum,
+ .ack_count = &ack_count,
+ .waiter = fiber(),
+ };
+ trigger_create(&triggers[i].trigger, ack_trigger_f,
+ &triggers[i].data, NULL);
+ relay_on_status_update(replica->relay, &triggers[i].trigger);
+ }
+
+ assert(trigger_count <= VCLOCK_MAX);
+
+ if (ack_count + trigger_count < replication_synchro_quorum) {
+ /* Don't even bother waiting when not enough replicas. */
+ say_warn("clear_synchro_queue cannot gather quorum. "
+ "There're only %d replicas (including this one), while"
+ "quorum should be %d.", ack_count + trigger_count,
+ replication_synchro_quorum);
+ for (int i = 0; i < trigger_count; i++)
+ trigger_clear(&triggers[i].trigger);
+ goto end;
+ }
+
+ if (trigger_count > 0) {
+ /* Allow interrupting the function when it takes too long. */
+ bool cancellable = fiber_set_cancellable(true);
+ fiber_sleep(replication_synchro_timeout);
+ fiber_set_cancellable(cancellable);
+ }
+
+ for (int i = 0; i < trigger_count; i++)
+ trigger_clear(&triggers[i].trigger);
+
+ /*
+ * No point in proceeding after cancellation even if we got the quorum.
+ * Emptying the limbo involves a pair of blocking WAL writes,
+ * making the fiber sleep even longer, which isn't appropriate
+ * when it's cancelled.
+ */
+ if (fiber_is_cancelled()) {
+ say_info("clear_synchro_queue interrupted by the fiber "
+ "cancellation.");
+ goto end;
+ }
- txn_limbo_force_empty(&txn_limbo, confirm_lsn);
- assert(txn_limbo_is_empty(&txn_limbo));
+ if (ack_count < replication_synchro_quorum) {
+ say_warn("clear_synchro_queue timed out after %.2f "
+ "seconds. Collected %d acks, quorum is %d. ",
+ replication_synchro_timeout, ack_count,
+ replication_synchro_quorum);
+ goto end;
}
+ txn_limbo_force_empty(&txn_limbo, wait_lsn);
+ assert(txn_limbo_is_empty(&txn_limbo));
+end:
in_clear_synchro_queue = false;
return 0;
}
diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua
index db037ed67..3b4d9a123 100644
--- a/test/replication/election_replica.lua
+++ b/test/replication/election_replica.lua
@@ -4,6 +4,8 @@ local INSTANCE_ID = string.match(arg[0], "%d")
local SOCKET_DIR = require('fio').cwd()
local SYNCHRO_QUORUM = arg[1] and tonumber(arg[1]) or 3
local ELECTION_TIMEOUT = arg[2] and tonumber(arg[2]) or 0.1
+local ELECTION_MODE = arg[3] or 'candidate'
+local CONNECT_QUORUM = arg[4] and tonumber(arg[4]) or 3
local function instance_uri(instance_id)
return SOCKET_DIR..'/election_replica'..instance_id..'.sock';
@@ -19,7 +21,8 @@ box.cfg({
instance_uri(3),
},
replication_timeout = 0.1,
- election_mode = 'candidate',
+ replication_connect_quorum = CONNECT_QUORUM,
+ election_mode = ELECTION_MODE,
election_timeout = ELECTION_TIMEOUT,
replication_synchro_quorum = SYNCHRO_QUORUM,
replication_synchro_timeout = 0.1,
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
new file mode 100644
index 000000000..e806d9d53
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
@@ -0,0 +1,144 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+ | ---
+ | ...
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+test_run:switch('election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+
+test_run:switch('election_replica1')
+ | ---
+ | - true
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+_ = box.schema.space.create('test', {is_sync=true})
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000}
+ | ---
+ | ...
+
+lsn = box.info.lsn
+ | ---
+ | ...
+
+for i=1,10 do\
+ require('fiber').create(function() box.space.test:insert{i} end)\
+end
+ | ---
+ | ...
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('election_replica2', 'election_replica1')
+ | ---
+ | ...
+
+test_run:cmd('switch election_replica2')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('stop server election_replica1')
+ | ---
+ | - true
+ | ...
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+ | ---
+ | - true
+ | ...
+
+-- Set a huge timeout for 2 reasons.
+-- First, this guards us from the instance leaving clear_synchro_queue too early
+-- and confirming nothing.
+-- Second, it lets us test that the instance doesn't wait for the full timeout.
+box.cfg{replication_synchro_timeout=1000}
+ | ---
+ | ...
+box.cfg{election_mode='candidate'}
+ | ---
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | - [6]
+ | - [7]
+ | - [8]
+ | - [9]
+ | - [10]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+ | ---
+ | ...
+test_run:cmd('delete server election_replica1')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
new file mode 100644
index 000000000..da218624b
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
@@ -0,0 +1,65 @@
+test_run = require('test_run').new()
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+test_run:wait_fullmesh(SERVERS)
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+box.cfg{election_mode='voter'}
+test_run:switch('election_replica3')
+box.cfg{election_mode='voter'}
+
+test_run:switch('election_replica1')
+box.ctl.wait_rw()
+
+_ = box.schema.space.create('test', {is_sync=true})
+_ = box.space.test:create_index('pk')
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000}
+
+lsn = box.info.lsn
+
+for i=1,10 do\
+ require('fiber').create(function() box.space.test:insert{i} end)\
+end
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+test_run:wait_lsn('election_replica2', 'election_replica1')
+
+test_run:cmd('switch election_replica2')
+
+test_run:cmd('stop server election_replica1')
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+
+-- Set a huge timeout for 2 reasons.
+-- First, this guards us from the instance leaving clear_synchro_queue too early
+-- and confirming nothing.
+-- Second, it lets us test that the instance doesn't wait for the full timeout.
+box.cfg{replication_synchro_timeout=1000}
+box.cfg{election_mode='candidate'}
+box.ctl.wait_rw()
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+
+test_run:cmd('switch default')
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+test_run:cmd('delete server election_replica1')
+test_run:drop_cluster(SERVERS)
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5670acc4d..8fe3930db 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -36,6 +36,7 @@
"gh-4730-applier-rollback.test.lua": {},
"gh-4928-tx-boundaries.test.lua": {},
"gh-5440-qsync-ro.test.lua": {},
+ "gh-5435-clear-synchro-queue-commit-all.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.24.3 (Apple Git-128)