[Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything

Serge Petrenko sergepetrenko at tarantool.org
Thu Dec 10 23:55:14 MSK 2020


It is possible that a new leader (elected either via Raft, manually,
or via some user-written election algorithm) loses data that the old
leader has successfully committed and confirmed.

Imagine the following situation: there are N nodes in a replicaset.
The old leader, denoted A, tries to apply some synchronous
transaction. It is written on the leader itself and on N/2 other
nodes, one of which is B. The transaction has thus gathered a quorum
of N/2 + 1 acks.

Now A writes CONFIRM and commits the transaction, but dies before the
confirmation reaches any of its followers. B is elected the new leader,
and it sees that A's last transaction is present on only N/2 nodes, so
it doesn't have a quorum (A was one of the N/2 + 1).

The current `clear_synchro_queue()` implementation makes B roll back
the transaction, leading to a rollback after a commit, which is
unacceptable.

To fix the problem, make `clear_synchro_queue()` wait until all the rows from
the previous leader gather `replication_synchro_quorum` acks.

In case any new rows come via replication while waiting for acks,
wait for their confirmation as well.
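
For illustration, here is a rough, standalone Lua sketch of that waiting
logic (it is not the code from this patch; every name and value in it is
made up): the new leader counts how many instances already have the last
limbo LSN left by the old leader and may write CONFIRM only once
`replication_synchro_quorum` of them report it.

    -- Count acks for the last LSN left in the limbo by the old leader.
    -- All names and values below are illustrative.
    local quorum = 2                -- replication_synchro_quorum
    local former_leader_id = 1      -- id of the dead leader A
    local wait_lsn = 10             -- lsn of the last entry in the limbo
    -- vclocks reported by the relays of the connected replicas
    local relay_vclocks = {[2] = {[1] = 10}, [3] = {[1] = 8}}

    local function ack_count()
        local count = 1             -- the new leader itself has the row
        for _, vclock in pairs(relay_vclocks) do
            if (vclock[former_leader_id] or 0) >= wait_lsn then
                count = count + 1
            end
        end
        return count
    end

    print(ack_count() >= quorum)    -- true: CONFIRM may be written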

Closes #5435
---
 src/box/box.cc                                | 126 +++++++++++++---
 test/replication/election_replica.lua         |   5 +-
 ...5435-clear-synchro-queue-commit-all.result | 137 ++++++++++++++++++
 ...35-clear-synchro-queue-commit-all.test.lua |  60 ++++++++
 test/replication/suite.cfg                    |   1 +
 5 files changed, 306 insertions(+), 23 deletions(-)
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
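
Note for reviewers: on the Lua level the new behaviour is reached through
the usual queue-clearing entry point on the node being promoted. A minimal
manual scenario might look as follows (the quorum value is only an example,
and the snippet assumes the instance has just been chosen as the new
leader, e.g. by a user-written election algorithm):

    -- On the node that is becoming the new leader:
    box.cfg{replication_synchro_quorum = 2}
    -- With this patch the call below waits until every pending synchronous
    -- row left by the old leader gathers the quorum instead of rolling the
    -- rows back.
    box.ctl.clear_synchro_queue()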

diff --git a/src/box/box.cc b/src/box/box.cc
index 8e0c9a160..fb9167977 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1001,6 +1001,25 @@ box_set_replication_anon(void)
 
 }
 
+struct lsn_watcher_data {
+	int *ack_count;
+	int *watcher_count;
+};
+
+static void
+count_confirm_f(void *data)
+{
+	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+	(*d->ack_count)++;
+}
+
+static void
+watcher_destroy_f(void *data)
+{
+	struct lsn_watcher_data *d = (struct lsn_watcher_data *)data;
+	(*d->watcher_count)--;
+}
+
 int
 box_clear_synchro_queue(bool try_wait)
 {
@@ -1011,6 +1030,10 @@ box_clear_synchro_queue(bool try_wait)
 			 "simultaneous invocations");
 		return -1;
 	}
+	/*
+	 * XXX: we may want to write confirm + rollback even when the limbo is
+	 * empty for the sake of limbo ownership transition.
+	 */
 	if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
 		return 0;
 	uint32_t former_leader_id = txn_limbo.owner_id;
@@ -1032,34 +1055,93 @@ box_clear_synchro_queue(bool try_wait)
 		}
 	}
 
-	if (!txn_limbo_is_empty(&txn_limbo)) {
-		int64_t lsns[VCLOCK_MAX];
-		int len = 0;
-		const struct vclock  *vclock;
+	if (txn_limbo_is_empty(&txn_limbo))
+		return 0;
+
+	/*
+	 * Allocate the watchers statically to not bother with alloc/free.
+	 * This is fine since we have a single execution guard.
+	 */
+	static struct relay_lsn_watcher watchers[VCLOCK_MAX];
+	for (int i = 1; i < VCLOCK_MAX; i++)
+		rlist_create(&watchers[i].in_list);
+
+	int64_t wait_lsn = 0;
+	bool restart = false;
+	do {
+		wait_lsn = txn_limbo_last_entry(&txn_limbo)->lsn;
+		/*
+		 * Take this node into account immediately.
+		 * clear_synchro_queue() is a no-op on the limbo owner for now,
+		 * so all the rows in the limbo must've come through the applier
+		 * and so they already have an lsn assigned, even if their wal
+		 * write isn't finished yet.
+		 */
+		assert(wait_lsn > 0);
+		int count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
+		int watcher_count = 0;
+		struct lsn_watcher_data data = {
+			.ack_count = &count,
+			.watcher_count = &watcher_count,
+		};
+
 		replicaset_foreach(replica) {
-			if (replica->relay != NULL &&
-			    relay_get_state(replica->relay) != RELAY_OFF &&
-			    !replica->anon) {
-				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
-							 &replica->uuid));
-				vclock = relay_vclock(replica->relay);
-				int64_t lsn = vclock_get(vclock,
-							 former_leader_id);
-				lsns[len++] = lsn;
+			if (replica->anon || replica->relay == NULL ||
+			    relay_get_state(replica->relay) != RELAY_FOLLOW)
+				continue;
+			assert(replica->id != 0);
+			assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
+
+			if (vclock_get(relay_vclock(replica->relay),
+				       former_leader_id) >= wait_lsn) {
+				count++;
+				continue;
 			}
+
+			relay_lsn_watcher_create(&watchers[replica->id],
+						 former_leader_id, wait_lsn,
+						 count_confirm_f,
+						 watcher_destroy_f, &data);
+			relay_set_lsn_watcher(replica->relay,
+					      &watchers[replica->id]);
+			watcher_count++;
 		}
-		lsns[len++] = vclock_get(box_vclock, former_leader_id);
-		assert(len < VCLOCK_MAX);
 
-		int64_t confirm_lsn = 0;
-		if (len >= replication_synchro_quorum) {
-			qsort(lsns, len, sizeof(int64_t), cmp_i64);
-			confirm_lsn = lsns[len - replication_synchro_quorum];
+
+		while (count < replication_synchro_quorum &&
+		       count + watcher_count >= replication_synchro_quorum) {
+			fiber_yield();
 		}
 
-		txn_limbo_force_empty(&txn_limbo, confirm_lsn);
-		assert(txn_limbo_is_empty(&txn_limbo));
-	}
+		/*
+		 * In case some new limbo entries arrived, confirm them as well.
+		 */
+		restart = wait_lsn < txn_limbo_last_entry(&txn_limbo)->lsn;
+
+		/*
+		 * Not enough replicas connected. Give user some time to
+		 * reconfigure quorum and replicas some time to reconnect, then
+		 * restart the watchers.
+		 */
+		if (count + watcher_count < replication_synchro_quorum) {
+			say_info("clear_synchro_queue cannot collect quorum: "
+				 "number of connected replicas (%d) is less "
+				 "than replication_synchro_quorum (%d). "
+				 "Will retry in %.2f seconds",
+				 count + watcher_count,
+				 replication_synchro_quorum,
+				 replication_timeout);
+			fiber_sleep(replication_timeout);
+			restart = true;
+		}
+
+		/* Detach the watchers that haven't fired. */
+		for (int i = 1; i < VCLOCK_MAX; i++)
+			rlist_del_entry(&watchers[i], in_list);
+	} while (restart);
+
+	txn_limbo_force_empty(&txn_limbo, wait_lsn);
+	assert(txn_limbo_is_empty(&txn_limbo));
 	return 0;
 }
 
diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua
index db037ed67..3b4d9a123 100644
--- a/test/replication/election_replica.lua
+++ b/test/replication/election_replica.lua
@@ -4,6 +4,8 @@ local INSTANCE_ID = string.match(arg[0], "%d")
 local SOCKET_DIR = require('fio').cwd()
 local SYNCHRO_QUORUM = arg[1] and tonumber(arg[1]) or 3
 local ELECTION_TIMEOUT = arg[2] and tonumber(arg[2]) or 0.1
+local ELECTION_MODE = arg[3] or 'candidate'
+local CONNECT_QUORUM = arg[4] and tonumber(arg[4]) or 3
 
 local function instance_uri(instance_id)
     return SOCKET_DIR..'/election_replica'..instance_id..'.sock';
@@ -19,7 +21,8 @@ box.cfg({
         instance_uri(3),
     },
     replication_timeout = 0.1,
-    election_mode = 'candidate',
+    replication_connect_quorum = CONNECT_QUORUM,
+    election_mode = ELECTION_MODE,
     election_timeout = ELECTION_TIMEOUT,
     replication_synchro_quorum = SYNCHRO_QUORUM,
     replication_synchro_timeout = 0.1,
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
new file mode 100644
index 000000000..e633f9e60
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
@@ -0,0 +1,137 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+ | ---
+ | ...
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+test_run:switch('election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+
+test_run:switch('election_replica1')
+ | ---
+ | - true
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+_ = box.schema.space.create('test', {is_sync=true})
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum=3}
+ | ---
+ | ...
+
+lsn = box.info.lsn
+ | ---
+ | ...
+
+for i=1,10 do\
+    require('fiber').create(function() box.space.test:insert{i} end)\
+end
+ | ---
+ | ...
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('election_replica2', 'election_replica1')
+ | ---
+ | ...
+
+test_run:cmd('switch election_replica2')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('stop server election_replica1')
+ | ---
+ | - true
+ | ...
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+ | ---
+ | - true
+ | ...
+
+box.cfg{election_mode='candidate'}
+ | ---
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ |   - [6]
+ |   - [7]
+ |   - [8]
+ |   - [9]
+ |   - [10]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+ | ---
+ | ...
+test_run:cmd('delete server election_replica1')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
new file mode 100644
index 000000000..6cf616671
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
@@ -0,0 +1,60 @@
+test_run = require('test_run').new()
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+test_run:wait_fullmesh(SERVERS)
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+box.cfg{election_mode='voter'}
+test_run:switch('election_replica3')
+box.cfg{election_mode='voter'}
+
+test_run:switch('election_replica1')
+box.ctl.wait_rw()
+
+_ = box.schema.space.create('test', {is_sync=true})
+_ = box.space.test:create_index('pk')
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+box.cfg{replication_synchro_quorum=3}
+
+lsn = box.info.lsn
+
+for i=1,10 do\
+    require('fiber').create(function() box.space.test:insert{i} end)\
+end
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+test_run:wait_lsn('election_replica2', 'election_replica1')
+
+test_run:cmd('switch election_replica2')
+
+test_run:cmd('stop server election_replica1')
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+
+box.cfg{election_mode='candidate'}
+box.ctl.wait_rw()
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+
+test_run:cmd('switch default')
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+test_run:cmd('delete server election_replica1')
+test_run:drop_cluster(SERVERS)
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5670acc4d..8fe3930db 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -36,6 +36,7 @@
     "gh-4730-applier-rollback.test.lua": {},
     "gh-4928-tx-boundaries.test.lua": {},
     "gh-5440-qsync-ro.test.lua": {},
+    "gh-5435-clear-synchro-queue-commit-all.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.24.3 (Apple Git-128)
