[Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything
Serge Petrenko
sergepetrenko at tarantool.org
Thu Dec 10 23:55:14 MSK 2020
It is possible that a new leader (elected via Raft, manually, or via some
user-written election algorithm) loses data that the old leader has
successfully committed and confirmed.
Imagine the following situation: there are N nodes in a replicaset, and the old
leader, denoted A, tries to apply some synchronous transaction. It is
written on the leader itself and on N/2 other nodes, one of which is B.
The transaction has thus gathered a quorum of N/2 + 1 acks.

Now A writes CONFIRM and commits the transaction, but dies before the
confirmation reaches any of its followers. B is elected the new leader and
sees that A's last transaction is present on only N/2 nodes, so it has no
quorum (A was one of the N/2 + 1). For example, with N = 5 the transaction is
acked by A and two followers, reaching the quorum of 3, but after A dies only
two of the surviving nodes have it.
The current `clear_synchro_queue()` implementation makes B roll back the
transaction, leading to a rollback after a commit, which is unacceptable.
To fix the problem, make `clear_synchro_queue()` wait until all the rows from
the previous leader gather `replication_synchro_quorum` acks.
If any new rows arrive via replication while waiting for acks,
wait for their confirmation as well.
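For illustration, a minimal sketch of the manual promotion flow this change
targets, assuming the existing box.ctl.clear_synchro_queue() Lua entry point
that calls box_clear_synchro_queue(); the quorum value is only an example:

    -- On the instance that has just become the leader.
    -- Lower the quorum first if some replicas are gone for good; otherwise
    -- clear_synchro_queue() keeps retrying and logs a warning every
    -- replication_timeout seconds until enough replicas connect.
    box.cfg{replication_synchro_quorum = 2}
    -- Wait for the old leader's pending rows to gather quorum, write
    -- CONFIRM for them and take ownership of the limbo.
    box.ctl.clear_synchro_queue()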
Closes #5435
---
src/box/box.cc | 126 +++++++++++++---
test/replication/election_replica.lua | 5 +-
...5435-clear-synchro-queue-commit-all.result | 137 ++++++++++++++++++
...35-clear-synchro-queue-commit-all.test.lua | 60 ++++++++
test/replication/suite.cfg | 1 +
5 files changed, 306 insertions(+), 23 deletions(-)
create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index 8e0c9a160..fb9167977 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
}
+struct lsn_watcher_data {
+ int *ack_count;
+ int *watcher_count;
+};
+
+static void
+count_confirm_f(void *data)
+{
+ struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+ (*d->ack_count)++;
+}
+
+static void
+watcher_destroy_f(void *data)
+{
+ struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+ (*d->watcher_count)--;
+}
+
int
box_clear_synchro_queue(bool try_wait)
{
@@ -1011,6 +1030,10 @@ box_clear_synchro_queue(bool try_wait)
"simultaneous invocations");
return -1;
}
+ /*
+ * XXX: we may want to write confirm + rollback even when the limbo is
+ * empty for the sake of limbo ownership transition.
+ */
if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
return 0;
uint32_t former_leader_id = txn_limbo.owner_id;
@@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
}
}
- if (!txn_limbo_is_empty(&txn_limbo)) {
- int64_t lsns[VCLOCK_MAX];
- int len = 0;
- const struct vclock *vclock;
+ if (txn_limbo_is_empty(&txn_limbo))
+ return 0;
+
+ /*
+ * Allocate the watchers statically to not bother with alloc/free.
+ * This is fine since we have a single execution guard.
+ */
+ static struct relay_lsn_watcher watchers[VCLOCK_MAX];
+ for (int i = 1; i < VCLOCK_MAX; i++)
+ rlist_create(&watchers[i].in_list);
+
+ int64_t wait_lsn = 0;
+ bool restart = false;
+ do {
+ wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;
+ /*
+ * Take this node into account immediately.
+ * clear_synchro_queue() is a no-op on the limbo owner for now,
+ * so all the rows in the limbo must've come through the applier
+ * and so they already have an lsn assigned, even if their wal
+ * write isn't finished yet.
+ */
+ assert(wait_lsn > 0);
+ int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
+ int watcher_count = 0;
+ struct lsn_watcher_data data = {
+ .ack_count = &count,
+ .watcher_count = &watcher_count,
+ };
+
replicaset_foreach(replica) {
- if (replica->relay != NULL &&
- relay_get_state(replica->relay) != RELAY_OFF &&
- !replica->anon) {
- assert(!tt_uuid_is_equal(&INSTANCE_UUID,
- &replica->uuid));
- vclock = relay_vclock(replica->relay);
- int64_t lsn = vclock_get(vclock,
- former_leader_id);
- lsns[len++] = lsn;
+ if (replica->anon || replica->relay == NULL ||
+ relay_get_state(replica->relay) != RELAY_FOLLOW)
+ continue;
+ assert(replica->id != 0);
+ assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
+
+ if (vclock_get(relay_vclock(replica->relay),
+ former_leader_id) >= wait_lsn) {
+ count++;
+ continue;
}
+
+ relay_lsn_watcher_create(&watchers[replica->id],
+ former_leader_id, wait_lsn,
+ count_confirm_f,
+ watcher_destroy_f, &data);
+ relay_set_lsn_watcher(replica->relay,
+ &watchers[replica->id]);
+ watcher_count++;
}
- lsns[len++] = vclock_get(box_vclock, former_leader_id);
- assert(len < VCLOCK_MAX);
- int64_t confirm_lsn = 0;
- if (len >= replication_synchro_quorum) {
- qsort(lsns, len, sizeof(int64_t), cmp_i64);
- confirm_lsn = lsns[len - replication_synchro_quorum];
+
+ while (count < replication_synchro_quorum &&
+ count + watcher_count >= replication_synchro_quorum) {
+ fiber_yield();
}
- txn_limbo_force_empty(&txn_limbo, confirm_lsn);
- assert(txn_limbo_is_empty(&txn_limbo));
- }
+ /*
+ * In case some new limbo entries arrived, confirm them as well.
+ */
+ restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
+
+ /*
+ * Not enough replicas connected. Give user some time to
+ * reconfigure quorum and replicas some time to reconnect, then
+ * restart the watchers.
+ */
+ if (count + watcher_count < replication_synchro_quorum) {
+ say_info("clear_syncrho_queue cannot collect quorum: "
+ "number of connected replicas (%d) is less "
+ "than replication_synchro_quorum (%d). "
+ "Will retry in %.2f seconds",
+ count + watcher_count,
+ replication_synchro_quorum,
+ replication_timeout);
+ fiber_sleep(replication_timeout);
+ restart = true;
+ }
+
+ /* Detach the watchers that haven't fired. */
+ for (int i = 1; i < VCLOCK_MAX; i++)
+ rlist_del_entry(&watchers[i], in_list);
+ } while (restart);
+
+ txn_limbo_force_empty(&txn_limbo, wait_lsn);
+ assert(txn_limbo_is_empty(&txn_limbo));
return 0;
}
diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua
index db037ed67..3b4d9a123 100644
--- a/test/replication/election_replica.lua
+++ b/test/replication/election_replica.lua
@@ -4,6 +4,8 @@ local INSTANCE_ID = string.match(arg[0], "%d")
local SOCKET_DIR = require('fio').cwd()
local SYNCHRO_QUORUM = arg[1] and tonumber(arg[1]) or 3
local ELECTION_TIMEOUT = arg[2] and tonumber(arg[2]) or 0.1
+local ELECTION_MODE = arg[3] or 'candidate'
+local CONNECT_QUORUM = arg[4] and tonumber(arg[4]) or 3
local function instance_uri(instance_id)
return SOCKET_DIR..'/election_replica'..instance_id..'.sock';
@@ -19,7 +21,8 @@ box.cfg({
instance_uri(3),
},
replication_timeout = 0.1,
- election_mode = 'candidate',
+ replication_connect_quorum = CONNECT_QUORUM,
+ election_mode = ELECTION_MODE,
election_timeout = ELECTION_TIMEOUT,
replication_synchro_quorum = SYNCHRO_QUORUM,
replication_synchro_timeout = 0.1,
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
new file mode 100644
index 000000000..e633f9e60
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
@@ -0,0 +1,137 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+ | ---
+ | ...
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+test_run:switch('election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+
+test_run:switch('election_replica1')
+ | ---
+ | - true
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+_ = box.schema.space.create('test', {is_sync=true})
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum=3}
+ | ---
+ | ...
+
+lsn = box.info.lsn
+ | ---
+ | ...
+
+for i=1,10 do\
+ require('fiber').create(function() box.space.test:insert{i} end)\
+end
+ | ---
+ | ...
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('election_replica2', 'election_replica1')
+ | ---
+ | ...
+
+test_run:cmd('switch election_replica2')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('stop server election_replica1')
+ | ---
+ | - true
+ | ...
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+ | ---
+ | - true
+ | ...
+
+box.cfg{election_mode='candidate'}
+ | ---
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | - [6]
+ | - [7]
+ | - [8]
+ | - [9]
+ | - [10]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+ | ---
+ | ...
+test_run:cmd('delete server election_replica1')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
new file mode 100644
index 000000000..6cf616671
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
@@ -0,0 +1,60 @@
+test_run = require('test_run').new()
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+test_run:wait_fullmesh(SERVERS)
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+box.cfg{election_mode='voter'}
+test_run:switch('election_replica3')
+box.cfg{election_mode='voter'}
+
+test_run:switch('election_replica1')
+box.ctl.wait_rw()
+
+_ = box.schema.space.create('test', {is_sync=true})
+_ = box.space.test:create_index('pk')
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+box.cfg{replication_synchro_quorum=3}
+
+lsn = box.info.lsn
+
+for i=1,10 do\
+ require('fiber').create(function() box.space.test:insert{i} end)\
+end
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+test_run:wait_lsn('election_replica2', 'election_replica1')
+
+test_run:cmd('switch election_replica2')
+
+test_run:cmd('stop server election_replica1')
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+
+box.cfg{election_mode='candidate'}
+box.ctl.wait_rw()
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+
+test_run:cmd('switch default')
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+test_run:cmd('delete server election_replica1')
+test_run:drop_cluster(SERVERS)
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5670acc4d..8fe3930db 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -36,6 +36,7 @@
"gh-4730-applier-rollback.test.lua": {},
"gh-4928-tx-boundaries.test.lua": {},
"gh-5440-qsync-ro.test.lua": {},
+ "gh-5435-clear-synchro-queue-commit-all.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.24.3 (Apple Git-128)