[Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything

Serge Petrenko sergepetrenko at tarantool.org
Wed Dec 23 14:59:22 MSK 2020


It is possible that a new leader (elected either via Raft, manually, or via
some user-written election algorithm) loses data that the old leader has
successfully committed and confirmed.

Imagine the following situation: there are N nodes in a replicaset. The old
leader, denoted A, tries to apply a synchronous transaction. It is written on
the leader itself and on N/2 other nodes, one of which is B. The transaction
has thus gathered a quorum of N/2 + 1 acks (with N = 5, for example, that's A
plus two replicas, i.e. 3 acks).

Now A writes CONFIRM and commits the transaction, but dies before the
confirmation reaches any of its followers. B is elected the new leader and
sees that A's last transaction is present on only N/2 nodes, so it doesn't
have a quorum (A itself was one of the N/2 + 1).

The current `clear_synchro_queue()` implementation makes B roll the
transaction back, leading to a rollback after a commit, which is unacceptable.

To fix the problem, make `clear_synchro_queue()` wait until all the rows from
the previous leader have gathered `replication_synchro_quorum` acks.

In case the quorum isn't reached within `replication_synchro_timeout`, roll
back nothing and wait for the user's intervention.
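
For illustration, here's a rough sketch of the operator-facing flow with this
change; the configuration values below are only examples and are not part of
the patch:

    -- On every node: a synchronous quorum of 2 out of 3 and a generous
    -- timeout for the new leader to collect acks.
    box.cfg{
        replication_synchro_quorum = 2,
        replication_synchro_timeout = 30,
    }

    -- On the instance promoted after the old leader's death. With this
    -- patch the call waits until the old leader's pending synchronous
    -- rows gather the quorum and confirms them, instead of rolling them
    -- back. If the quorum isn't reached within the timeout, the limbo
    -- is left untouched.
    box.ctl.clear_synchro_queue()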

Closes #5435
---
 src/box/box.cc                                | 149 +++++++++++++++---
 test/replication/election_replica.lua         |   5 +-
 ...5435-clear-synchro-queue-commit-all.result | 144 +++++++++++++++++
 ...35-clear-synchro-queue-commit-all.test.lua |  65 ++++++++
 test/replication/suite.cfg                    |   1 +
 5 files changed, 339 insertions(+), 25 deletions(-)
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.result
 create mode 100644 test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index 2d403fc9a..38bf4034e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1002,6 +1002,36 @@ box_set_replication_anon(void)
 
 }
 
+struct ack_trigger_data {
+	bool fired;
+	int64_t *target_lsn;
+	uint32_t *replica_id;
+	int *quorum;
+	int *ack_count;
+	struct fiber *waiter;
+};
+
+struct ack_trigger {
+	struct ack_trigger_data data;
+	struct trigger trigger;
+};
+
+static int ack_trigger_f(struct trigger *trigger, void *event)
+{
+	struct relay *relay = (struct relay *)event;
+	struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data;
+	if (data->fired)
+		return 0;
+	if (*data->target_lsn <= vclock_get(relay_vclock(relay),
+					    *data->replica_id)) {
+		++*data->ack_count;
+		data->fired = true;
+		if (*data->ack_count >= *data->quorum)
+			fiber_wakeup(data->waiter);
+	}
+	return 0;
+}
+
 int
 box_clear_synchro_queue(bool try_wait)
 {
@@ -1012,6 +1042,10 @@ box_clear_synchro_queue(bool try_wait)
 			 "simultaneous invocations");
 		return -1;
 	}
+	/*
+	 * XXX: we may want to write confirm + rollback even when the limbo is
+	 * empty for the sake of limbo ownership transition.
+	 */
 	if (!is_box_configured || txn_limbo_is_empty(&txn_limbo))
 		return 0;
 	uint32_t former_leader_id = txn_limbo.owner_id;
@@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait)
 				break;
 			fiber_sleep(0.001);
 		}
+		/*
+		 * Our mission was to clear the limbo of the former leader's
+		 * transactions. Exit in case someone did that for us.
+		 */
+		if (txn_limbo_is_empty(&txn_limbo) ||
+		    former_leader_id != txn_limbo.owner_id) {
+			in_clear_synchro_queue = false;
+			return 0;
+		}
 	}
 
-	if (!txn_limbo_is_empty(&txn_limbo)) {
-		int64_t lsns[VCLOCK_MAX];
-		int len = 0;
-		const struct vclock  *vclock;
-		replicaset_foreach(replica) {
-			if (replica->relay != NULL &&
-			    relay_get_state(replica->relay) != RELAY_OFF &&
-			    !replica->anon) {
-				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
-							 &replica->uuid));
-				vclock = relay_vclock(replica->relay);
-				int64_t lsn = vclock_get(vclock,
-							 former_leader_id);
-				lsns[len++] = lsn;
-			}
-		}
-		lsns[len++] = vclock_get(box_vclock, former_leader_id);
-		assert(len < VCLOCK_MAX);
+	/*
+	 * clear_synchro_queue() is a no-op on the limbo owner, so all the rows
+	 * in the limbo must've come through the applier, meaning they already
+	 * have an lsn assigned, even if their WAL write hasn't finished yet.
+	 */
+	int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
+	assert(wait_lsn > 0);
+
+	struct ack_trigger triggers[VCLOCK_MAX];
+
+	/* Take this node into account immediately. */
+	int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
+	int trigger_count = 0;
+
+	replicaset_foreach(replica) {
+		if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
+		    replica->anon)
+			continue;
+
+		assert(replica->id != REPLICA_ID_NIL);
+		assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
 
-		int64_t confirm_lsn = 0;
-		if (len >= replication_synchro_quorum) {
-			qsort(lsns, len, sizeof(int64_t), cmp_i64);
-			confirm_lsn = lsns[len - replication_synchro_quorum];
+		if (vclock_get(relay_vclock(replica->relay),
+			       former_leader_id) >= wait_lsn) {
+			ack_count++;
+			continue;
 		}
+		int i = trigger_count++;
+		triggers[i].data = {
+			.fired = false,
+			.target_lsn = &wait_lsn,
+			.replica_id = &former_leader_id,
+			.quorum = &replication_synchro_quorum,
+			.ack_count = &ack_count,
+			.waiter = fiber(),
+		};
+		trigger_create(&triggers[i].trigger, ack_trigger_f,
+			       &triggers[i].data, NULL);
+		relay_on_status_update(replica->relay, &triggers[i].trigger);
+	}
+
+	assert(trigger_count <= VCLOCK_MAX);
+
+	if (ack_count + trigger_count < replication_synchro_quorum) {
+		/* Don't even bother waiting when not enough replicas. */
+		say_warn("clear_synchro_queue cannot gather quorum. "
+			 "There're only %d replicas (including this one), while "
+			 "quorum should be %d.", ack_count + trigger_count,
+			 replication_synchro_quorum);
+		for (int i = 0; i < trigger_count; i++)
+			trigger_clear(&triggers[i].trigger);
+		goto end;
+	}
+
+	if (trigger_count > 0) {
+		/* Allow interrupting the function when it takes too long. */
+		bool cancellable = fiber_set_cancellable(true);
+		fiber_sleep(replication_synchro_timeout);
+		fiber_set_cancellable(cancellable);
+	}
+
+	for (int i = 0; i < trigger_count; i++)
+		trigger_clear(&triggers[i].trigger);
+
+	/*
+	 * No point in proceeding after cancellation even with the quorum.
+	 * Emptying the limbo involves a pair of blocking WAL writes,
+	 * making the fiber sleep even longer, which isn't appropriate
+	 * when it's cancelled.
+	 */
+	if (fiber_is_cancelled()) {
+		say_info("clear_synchro_queue interrupted by the fiber "
+			 "cancellation.");
+		goto end;
+	}
 
-		txn_limbo_force_empty(&txn_limbo, confirm_lsn);
-		assert(txn_limbo_is_empty(&txn_limbo));
+	if (ack_count < replication_synchro_quorum) {
+		say_warn("clear_synchro_queue timed out after %.2f "
+			 "seconds. Collected %d acks, quorum is %d.",
+			 replication_synchro_timeout, ack_count,
+			 replication_synchro_quorum);
+		goto end;
 	}
 
+	txn_limbo_force_empty(&txn_limbo, wait_lsn);
+	assert(txn_limbo_is_empty(&txn_limbo));
+end:
 	in_clear_synchro_queue = false;
 	return 0;
 }
diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua
index db037ed67..3b4d9a123 100644
--- a/test/replication/election_replica.lua
+++ b/test/replication/election_replica.lua
@@ -4,6 +4,8 @@ local INSTANCE_ID = string.match(arg[0], "%d")
 local SOCKET_DIR = require('fio').cwd()
 local SYNCHRO_QUORUM = arg[1] and tonumber(arg[1]) or 3
 local ELECTION_TIMEOUT = arg[2] and tonumber(arg[2]) or 0.1
+local ELECTION_MODE = arg[3] or 'candidate'
+local CONNECT_QUORUM = arg[4] and tonumber(arg[4]) or 3
 
 local function instance_uri(instance_id)
     return SOCKET_DIR..'/election_replica'..instance_id..'.sock';
@@ -19,7 +21,8 @@ box.cfg({
         instance_uri(3),
     },
     replication_timeout = 0.1,
-    election_mode = 'candidate',
+    replication_connect_quorum = CONNECT_QUORUM,
+    election_mode = ELECTION_MODE,
     election_timeout = ELECTION_TIMEOUT,
     replication_synchro_quorum = SYNCHRO_QUORUM,
     replication_synchro_timeout = 0.1,
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
new file mode 100644
index 000000000..e806d9d53
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
@@ -0,0 +1,144 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+ | ---
+ | ...
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+test_run:switch('election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{election_mode='voter'}
+ | ---
+ | ...
+
+test_run:switch('election_replica1')
+ | ---
+ | - true
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+_ = box.schema.space.create('test', {is_sync=true})
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000}
+ | ---
+ | ...
+
+lsn = box.info.lsn
+ | ---
+ | ...
+
+for i=1,10 do\
+    require('fiber').create(function() box.space.test:insert{i} end)\
+end
+ | ---
+ | ...
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+ | ---
+ | - true
+ | ...
+test_run:wait_lsn('election_replica2', 'election_replica1')
+ | ---
+ | ...
+
+test_run:cmd('switch election_replica2')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('stop server election_replica1')
+ | ---
+ | - true
+ | ...
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+ | ---
+ | - true
+ | ...
+
+-- Set a huge timeout for 2 reasons.
+-- First, it guards against the instance leaving clear_synchro_queue too early
+-- and confirming nothing.
+-- Second, it lets us test that the instance doesn't wait for the full timeout.
+box.cfg{replication_synchro_timeout=1000}
+ | ---
+ | ...
+box.cfg{election_mode='candidate'}
+ | ---
+ | ...
+box.ctl.wait_rw()
+ | ---
+ | ...
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ |   - [6]
+ |   - [7]
+ |   - [8]
+ |   - [9]
+ |   - [10]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+ | ---
+ | ...
+test_run:cmd('delete server election_replica1')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
+
diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
new file mode 100644
index 000000000..da218624b
--- /dev/null
+++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
@@ -0,0 +1,65 @@
+test_run = require('test_run').new()
+
+--
+-- gh-5435: make sure the new limbo owner commits everything there is left in
+-- the limbo from an old owner.
+--
+
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+
+test_run:create_cluster(SERVERS, "replication", {args='2 0.4'})
+test_run:wait_fullmesh(SERVERS)
+
+-- Force election_replica1 to become leader.
+test_run:switch('election_replica2')
+box.cfg{election_mode='voter'}
+test_run:switch('election_replica3')
+box.cfg{election_mode='voter'}
+
+test_run:switch('election_replica1')
+box.ctl.wait_rw()
+
+_ = box.schema.space.create('test', {is_sync=true})
+_ = box.space.test:create_index('pk')
+
+-- Fill the limbo with pending entries. 3 mustn't receive them yet.
+test_run:cmd('stop server election_replica3')
+box.cfg{replication_synchro_quorum=3, replication_synchro_timeout=1000}
+
+lsn = box.info.lsn
+
+for i=1,10 do\
+    require('fiber').create(function() box.space.test:insert{i} end)\
+end
+
+-- Wait for WAL write and replication.
+test_run:wait_cond(function() return box.info.lsn == lsn + 10 end)
+test_run:wait_lsn('election_replica2', 'election_replica1')
+
+test_run:cmd('switch election_replica2')
+
+test_run:cmd('stop server election_replica1')
+-- Since 2 is not the leader yet, 3 doesn't replicate the rows from it.
+-- It will vote for 2, however, since 2 has newer data, and start replication
+-- once 2 becomes the leader.
+test_run:cmd('start server election_replica3 with wait=False, wait_load=False, args="2 0.4 voter 2"')
+
+-- Set a huge timeout for 2 reasons.
+-- First, it guards against the instance leaving clear_synchro_queue too early
+-- and confirming nothing.
+-- Second, it lets us test that the instance doesn't wait for the full timeout.
+box.cfg{replication_synchro_timeout=1000}
+box.cfg{election_mode='candidate'}
+box.ctl.wait_rw()
+
+-- If 2 decided whether to keep the rows or not right on becoming the leader,
+-- it would roll them all back. Make sure 2 waits till the rows are replicated
+-- to 3.
+box.space.test:select{}
+
+test_run:cmd('switch default')
+-- To silence the QA warning. The 1st replica is already stopped.
+SERVERS[1] = nil
+test_run:cmd('delete server election_replica1')
+test_run:drop_cluster(SERVERS)
+
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5670acc4d..8fe3930db 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -36,6 +36,7 @@
     "gh-4730-applier-rollback.test.lua": {},
     "gh-4928-tx-boundaries.test.lua": {},
     "gh-5440-qsync-ro.test.lua": {},
+    "gh-5435-clear-synchro-queue-commit-all.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.24.3 (Apple Git-128)


