[Tarantool-patches] [PATCH v4 09/16] box: split promote() into reasonable parts

Serge Petrenko sergepetrenko at tarantool.org
Wed Jul 14 21:25:37 MSK 2021


box_promote() is a monster. It does a lot of different things based on
flags: try_wait and run_elections. The flags themselves depend on the
node's Raft state and the lunar calendar.

Moreover, there are multiple cancellation points and places where
external state may have changed and needs a re-check.

Things are going to get even worse with the introduction of box.ctl.demote().

So it's time to split up box_promote() into reasonable parts, each doing
exactly one thing.

This commit mostly addresses the multiple cancellation points issue,
so that promote() doesn't look like a huge pile of if(something_changed)
blocks. Some other functions will look like that instead.

Part of #6034
---
 src/box/box.cc | 269 ++++++++++++++++++++++++++++---------------------
 1 file changed, 155 insertions(+), 114 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 86370514a..445875f8f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1527,6 +1527,147 @@ box_wait_quorum(uint32_t lead_id, int64_t target_lsn, int quorum,
 	return 0;
 }
 
+/**
+ * A helper to start a new Raft election round and wait until the election
+ * results are known.
+ * Returns 0 in case this instance has won the elections, -1 otherwise.
+ */
+static int
+box_run_elections(void)
+{
+	assert(box_raft()->is_enabled);
+	assert(box_election_mode != ELECTION_MODE_VOTER);
+	/*
+	 * Make this instance a candidate and run until some leader, not
+	 * necessarily this instance, emerges.
+	 */
+	raft_start_candidate(box_raft());
+	/*
+	 * Trigger new elections without waiting for an old leader to
+	 * disappear.
+	 */
+	raft_new_term(box_raft());
+	int rc = box_raft_wait_leader_found();
+
+	if (box_election_mode == ELECTION_MODE_MANUAL)
+		raft_stop_candidate(box_raft(), false);
+	if (rc != 0)
+		return -1;
+	if (!box_raft()->is_enabled) {
+		diag_set(ClientError, ER_RAFT_DISABLED);
+		return -1;
+	}
+	if (box_raft()->state != RAFT_STATE_LEADER) {
+		diag_set(ClientError, ER_INTERFERING_PROMOTE,
+			 box_raft()->leader);
+		return -1;
+	}
+
+	return 0;
+}
+
+/**
+ * Check whether the greatest promote term has changed since it was last read,
+ * i.e. whether a foreign PROMOTE arrived while we were sleeping.
+ */
+static int
+box_check_promote_term_changed(uint64_t promote_term)
+{
+	if (txn_limbo.promote_greatest_term != promote_term) {
+		diag_set(ClientError, ER_INTERFERING_PROMOTE,
+			 txn_limbo.owner_id);
+		return -1;
+	}
+	return 0;
+}
+
+/** Try waiting until the limbo is emptied, up to the given timeout. */
+static int
+box_try_wait_confirm(double timeout)
+{
+	uint64_t promote_term = txn_limbo.promote_greatest_term;
+	txn_limbo_wait_empty(&txn_limbo, timeout);
+	return box_check_promote_term_changed(promote_term);
+}
+
+/**
+ * A helper to wait until all limbo entries are ready to be confirmed, i.e.
+ * written to WAL and have gathered a quorum of ACKs from replicas.
+ * Return lsn of the last limbo entry on success, -1 on error.
+ */
+static int64_t
+box_wait_limbo_acked(void)
+{
+	if (txn_limbo_is_empty(&txn_limbo))
+		return txn_limbo.confirmed_lsn;
+
+	uint64_t promote_term = txn_limbo.promote_greatest_term;
+	int quorum = replication_synchro_quorum;
+	struct txn_limbo_entry *last_entry;
+	last_entry = txn_limbo_last_synchro_entry(&txn_limbo);
+	/* Wait for the last entry's WAL write. */
+	if (last_entry->lsn < 0) {
+		int64_t tid = last_entry->txn->id;
+
+		if (wal_sync(NULL) < 0)
+			return -1;
+
+		if (box_check_promote_term_changed(promote_term) < 0)
+			return -1;
+		if (txn_limbo_is_empty(&txn_limbo))
+			return txn_limbo.confirmed_lsn;
+		if (tid != txn_limbo_last_synchro_entry(&txn_limbo)->txn->id) {
+			diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+				 "new synchronous transactions appeared");
+			return -1;
+		}
+	}
+	assert(last_entry->lsn > 0);
+	int64_t wait_lsn = last_entry->lsn;
+
+	if (box_wait_quorum(txn_limbo.owner_id, wait_lsn, quorum,
+			    replication_synchro_timeout) < 0)
+		return -1;
+
+	if (box_check_promote_term_changed(promote_term) < 0)
+		return -1;
+
+	if (txn_limbo_is_empty(&txn_limbo))
+		return txn_limbo.confirmed_lsn;
+
+	if (quorum < replication_synchro_quorum) {
+		diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+			 "quorum was increased while waiting");
+		return -1;
+	}
+	if (wait_lsn < txn_limbo_last_synchro_entry(&txn_limbo)->lsn) {
+		diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+			 "new synchronous transactions appeared");
+		return -1;
+	}
+
+	return wait_lsn;
+}
+
+/** Write and process a PROMOTE request. */
+static void
+box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
+{
+	assert(box_raft()->volatile_term == box_raft()->term);
+	assert(promote_lsn >= 0);
+	txn_limbo_write_promote(&txn_limbo, promote_lsn,
+				box_raft()->term);
+	struct synchro_request req = {
+		.type = IPROTO_PROMOTE,
+		.replica_id = prev_leader_id,
+		.origin_id = instance_id,
+		.lsn = promote_lsn,
+		.term = box_raft()->term,
+	};
+	txn_limbo_process(&txn_limbo, &req);
+	assert(txn_limbo_is_empty(&txn_limbo));
+}
+
 int
 box_promote(void)
 {
@@ -1537,6 +1678,10 @@ box_promote(void)
 			 "simultaneous invocations");
 		return -1;
 	}
+	in_promote = true;
+	auto promote_guard = make_scoped_guard([&] {
+		in_promote = false;
+	});
 
 	/*
 	 * Do nothing when box isn't configured and when PROMOTE was already
@@ -1582,122 +1727,18 @@ box_promote(void)
 		unreachable();
 	}
 
-	uint32_t former_leader_id = txn_limbo.owner_id;
-	int64_t wait_lsn = txn_limbo.confirmed_lsn;
-	int rc = 0;
-	int quorum = replication_synchro_quorum;
-	in_promote = true;
-	auto promote_guard = make_scoped_guard([&] {
-		in_promote = false;
-	});
-
-	if (run_elections) {
-		/*
-		 * Make this instance a candidate and run until some leader, not
-		 * necessarily this instance, emerges.
-		 */
-		raft_start_candidate(box_raft());
-		/*
-		 * Trigger new elections without waiting for an old leader to
-		 * disappear.
-		 */
-		raft_new_term(box_raft());
-		rc = box_raft_wait_leader_found();
-		/*
-		 * Do not reset raft mode if it was changed while running the
-		 * elections.
-		 */
-		if (box_election_mode == ELECTION_MODE_MANUAL)
-			raft_stop_candidate(box_raft(), false);
-		if (rc != 0)
-			return -1;
-		if (!box_raft()->is_enabled) {
-			diag_set(ClientError, ER_RAFT_DISABLED);
-			return -1;
-		}
-		if (box_raft()->state != RAFT_STATE_LEADER) {
-			diag_set(ClientError, ER_INTERFERING_PROMOTE,
-				 box_raft()->leader);
-			return -1;
-		}
-	}
-
-	if (txn_limbo_is_empty(&txn_limbo))
-		goto promote;
+	int64_t wait_lsn = -1;
 
-	if (try_wait) {
-		/* Wait until pending confirmations/rollbacks reach us. */
-		double timeout = 2 * replication_synchro_timeout;
-		txn_limbo_wait_empty(&txn_limbo, timeout);
-		/*
-		 * Our mission was to clear the limbo from former leader's
-		 * transactions. Exit in case someone did that for us.
-		 */
-		if (former_leader_id != txn_limbo.owner_id) {
-			diag_set(ClientError, ER_INTERFERING_PROMOTE,
-				 txn_limbo.owner_id);
-			return -1;
-		}
-		if (txn_limbo_is_empty(&txn_limbo)) {
-			wait_lsn = txn_limbo.confirmed_lsn;
-			goto promote;
-		}
-	}
-
-	struct txn_limbo_entry *last_entry;
-	last_entry = txn_limbo_last_synchro_entry(&txn_limbo);
-	/* Wait for the last entries WAL write. */
-	if (last_entry->lsn < 0) {
-		int64_t tid = last_entry->txn->id;
-		if (wal_sync(NULL) < 0)
-			return -1;
-		if (former_leader_id != txn_limbo.owner_id) {
-			diag_set(ClientError, ER_INTERFERING_PROMOTE,
-				 txn_limbo.owner_id);
-			return -1;
-		}
-		if (txn_limbo_is_empty(&txn_limbo)) {
-			wait_lsn = txn_limbo.confirmed_lsn;
-			goto promote;
-		}
-		if (tid != txn_limbo_last_synchro_entry(&txn_limbo)->txn->id) {
-			diag_set(ClientError, ER_QUORUM_WAIT, quorum,
-				 "new synchronous transactions appeared");
-			return -1;
-		}
-	}
-	wait_lsn = last_entry->lsn;
-	assert(wait_lsn > 0);
+	if (run_elections && box_run_elections() < 0)
+		return -1;
+	if (try_wait &&
+	    box_try_wait_confirm(2 * replication_synchro_timeout) < 0)
+		return -1;
+	if ((wait_lsn = box_wait_limbo_acked()) < 0)
+		return -1;
 
-	rc = box_wait_quorum(former_leader_id, wait_lsn, quorum,
-			     replication_synchro_timeout);
-	if (rc == 0) {
-		if (quorum < replication_synchro_quorum) {
-			diag_set(ClientError, ER_QUORUM_WAIT, quorum,
-				 "quorum was increased while waiting");
-			rc = -1;
-		} else if (wait_lsn < txn_limbo_last_synchro_entry(&txn_limbo)->lsn) {
-			diag_set(ClientError, ER_QUORUM_WAIT, quorum,
-				 "new synchronous transactions appeared");
-			rc = -1;
-		} else {
-promote:
-			/* We cannot possibly get here in a volatile state. */
-			assert(box_raft()->volatile_term == box_raft()->term);
-			txn_limbo_write_promote(&txn_limbo, wait_lsn,
-						box_raft()->term);
-			struct synchro_request req = {
-				.type = IPROTO_PROMOTE,
-				.replica_id = former_leader_id,
-				.origin_id = instance_id,
-				.lsn = wait_lsn,
-				.term = box_raft()->term,
-			};
-			txn_limbo_process(&txn_limbo, &req);
-			assert(txn_limbo_is_empty(&txn_limbo));
-		}
-	}
-	return rc;
+	box_issue_promote(txn_limbo.owner_id, wait_lsn);
+	return 0;
 }
 
 int
-- 
2.30.1 (Apple Git-130)



More information about the Tarantool-patches mailing list