[Tarantool-patches] [PATCH v4 09/16] box: split promote() into reasonable parts
Serge Petrenko
sergepetrenko at tarantool.org
Wed Jul 14 21:25:37 MSK 2021
box_promote() is a monster. It does a lot of different things depending on
two flags, try_wait and run_elections. The flags themselves depend on the
node's Raft state and the lunar calendar.
Moreover, there are multiple cancellation points and places where
external state may have changed and needs a re-check.
Things are going to get even worse with the introduction of box.ctl.demote().
So it's time to split up box_promote() into reasonable parts, each doing
exactly one thing.
This commit mostly addresses the multiple-cancellation-points issue,
so that promote() no longer looks like a huge pile of if (something_changed)
blocks. Some of the new helper functions look like that instead.
Part of #6034
---
src/box/box.cc | 269 ++++++++++++++++++++++++++++---------------------
1 file changed, 155 insertions(+), 114 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 86370514a..445875f8f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1527,6 +1527,147 @@ box_wait_quorum(uint32_t lead_id, int64_t target_lsn, int quorum,
return 0;
}
+/**
+ * A helper to start new Raft election round and wait until the election results
+ * are known.
+ *
+ * Makes this instance a candidate, starts a new Raft term and waits until some
+ * leader (not necessarily this instance) is found. In ELECTION_MODE_MANUAL the
+ * candidacy is withdrawn again once the wait is over.
+ *
+ * Returns 0 in case this instance has won the elections, -1 otherwise. On -1
+ * the diagnostics area describes the reason:
+ * - whatever box_raft_wait_leader_found() reported when it failed
+ *   (NOTE(review): presumably timeout/cancellation — confirm);
+ * - ER_RAFT_DISABLED if Raft got disabled while we were waiting;
+ * - ER_INTERFERING_PROMOTE with the winner's id if another instance won.
+ */
+static int
+box_run_elections(void)
+{
+ /* Only callable when this node is allowed to become a candidate. */
+ assert(box_raft()->is_enabled);
+ assert(box_election_mode != ELECTION_MODE_VOTER);
+ /*
+ * Make this instance a candidate and run until some leader, not
+ * necessarily this instance, emerges.
+ */
+ raft_start_candidate(box_raft());
+ /*
+ * Trigger new elections without waiting for an old leader to
+ * disappear.
+ */
+ raft_new_term(box_raft());
+ int rc = box_raft_wait_leader_found();
+
+ /*
+ * Do not reset raft mode if it was changed while running the
+ * elections.
+ */
+ if (box_election_mode == ELECTION_MODE_MANUAL)
+ raft_stop_candidate(box_raft(), false);
+ if (rc != 0)
+ return -1;
+ /*
+ * Raft configuration and state may have changed while we were waiting
+ * for the leader, so re-check them here.
+ */
+ if (!box_raft()->is_enabled) {
+ diag_set(ClientError, ER_RAFT_DISABLED);
+ return -1;
+ }
+ if (box_raft()->state != RAFT_STATE_LEADER) {
+ diag_set(ClientError, ER_INTERFERING_PROMOTE,
+ box_raft()->leader);
+ return -1;
+ }
+
+ return 0;
+}
+
+/**
+ * Detect a PROMOTE from another instance that arrived while we were sleeping.
+ *
+ * @param promote_term value of txn_limbo.promote_greatest_term remembered
+ *        before the fiber started waiting.
+ * @retval 0 the greatest promote term is still the same.
+ * @retval -1 the term moved on; ER_INTERFERING_PROMOTE is set with the id of
+ *         the current limbo owner.
+ */
+static int
+box_check_promote_term_changed(uint64_t promote_term)
+{
+ if (txn_limbo.promote_greatest_term == promote_term)
+ return 0;
+ diag_set(ClientError, ER_INTERFERING_PROMOTE, txn_limbo.owner_id);
+ return -1;
+}
+
+/**
+ * Try waiting until limbo is emptied up to given timeout.
+ *
+ * Running out of time is not an error here: the result of
+ * txn_limbo_wait_empty() is not checked, and the caller is expected to look
+ * at the limbo state itself afterwards.
+ * Returns -1 (with ER_INTERFERING_PROMOTE set) only if the greatest PROMOTE
+ * term changed during the wait, i.e. someone else promoted meanwhile;
+ * returns 0 otherwise.
+ */
+static int
+box_try_wait_confirm(double timeout)
+{
+ /* Snapshot the term before sleeping to detect a foreign PROMOTE. */
+ uint64_t promote_term = txn_limbo.promote_greatest_term;
+ txn_limbo_wait_empty(&txn_limbo, timeout);
+ return box_check_promote_term_changed(promote_term);
+}
+
+/**
+ * A helper to wait until all limbo entries are ready to be confirmed, i.e.
+ * written to WAL and have gathered a quorum of ACKs from replicas.
+ *
+ * Returns the lsn a subsequent PROMOTE should use:
+ * - txn_limbo.confirmed_lsn if the limbo is empty on entry or becomes empty
+ *   while waiting;
+ * - otherwise the lsn of the last synchronous limbo entry, once it is written
+ *   and acknowledged by the quorum.
+ * Returns -1 with diag set on error: WAL sync failure, a foreign PROMOTE
+ * (ER_INTERFERING_PROMOTE), failure of box_wait_quorum(), or ER_QUORUM_WAIT
+ * when new synchronous transactions appeared or replication_synchro_quorum
+ * was increased while waiting.
+ */
+static int64_t
+box_wait_limbo_acked(void)
+{
+ if (txn_limbo_is_empty(&txn_limbo))
+ return txn_limbo.confirmed_lsn;
+
+ /*
+ * Snapshot the promote term and the quorum: both are re-checked after
+ * every wait below to detect changes made while we were sleeping.
+ */
+ uint64_t promote_term = txn_limbo.promote_greatest_term;
+ int quorum = replication_synchro_quorum;
+ struct txn_limbo_entry *last_entry;
+ last_entry = txn_limbo_last_synchro_entry(&txn_limbo);
+ /* The last entry has no lsn yet: wait for its WAL write. */
+ if (last_entry->lsn < 0) {
+ /*
+ * Remember the txn id to recognize the entry after the wait.
+ * NOTE(review): presumably an id rather than the pointer is
+ * compared because the entry may be freed meanwhile — confirm.
+ */
+ int64_t tid = last_entry->txn->id;
+
+ if (wal_sync(NULL) < 0)
+ return -1;
+
+ /* The limbo may have changed while waiting for the WAL. */
+ if (box_check_promote_term_changed(promote_term) < 0)
+ return -1;
+ if (txn_limbo_is_empty(&txn_limbo))
+ return txn_limbo.confirmed_lsn;
+ if (tid != txn_limbo_last_synchro_entry(&txn_limbo)->txn->id) {
+ diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+ "new synchronous transactions appeared");
+ return -1;
+ }
+ }
+ /*
+ * last_entry is dereferenced again here; this relies on the txn id
+ * check above meaning it is still the live last entry
+ * (NOTE(review): assumes one limbo entry per txn — confirm).
+ */
+ assert(last_entry->lsn > 0);
+ int64_t wait_lsn = last_entry->lsn;
+
+ if (box_wait_quorum(txn_limbo.owner_id, wait_lsn, quorum,
+ replication_synchro_timeout) < 0)
+ return -1;
+
+ /* Re-validate everything that could change during the quorum wait. */
+ if (box_check_promote_term_changed(promote_term) < 0)
+ return -1;
+
+ if (txn_limbo_is_empty(&txn_limbo))
+ return txn_limbo.confirmed_lsn;
+
+ if (quorum < replication_synchro_quorum) {
+ diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+ "quorum was increased while waiting");
+ return -1;
+ }
+ /*
+ * NOTE(review): an entry appended during the wait but not yet written
+ * to WAL would carry a negative lsn and pass this comparison
+ * unnoticed — confirm that cannot happen for a limbo owned by another
+ * instance.
+ */
+ if (wait_lsn < txn_limbo_last_synchro_entry(&txn_limbo)->lsn) {
+ diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+ "new synchronous transactions appeared");
+ return -1;
+ }
+
+ return wait_lsn;
+}
+
+/**
+ * Write and process a PROMOTE request.
+ *
+ * The request is written via txn_limbo_write_promote() and then applied
+ * locally via txn_limbo_process(), both stamped with the current Raft term.
+ *
+ * @param prev_leader_id id stored in the request's replica_id; box_promote()
+ *        passes the current limbo owner.
+ * @param promote_lsn lsn carried by the request; must be non-negative.
+ *        box_promote() passes the result of box_wait_limbo_acked().
+ *
+ * After processing, the limbo is asserted to be empty.
+ */
+static void
+box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
+{
+ /* We cannot possibly get here in a volatile state. */
+ assert(box_raft()->volatile_term == box_raft()->term);
+ assert(promote_lsn >= 0);
+ txn_limbo_write_promote(&txn_limbo, promote_lsn,
+ box_raft()->term);
+ struct synchro_request req = {
+ .type = IPROTO_PROMOTE,
+ .replica_id = prev_leader_id,
+ .origin_id = instance_id,
+ .lsn = promote_lsn,
+ .term = box_raft()->term,
+ };
+ txn_limbo_process(&txn_limbo, &req);
+ assert(txn_limbo_is_empty(&txn_limbo));
+}
+
int
box_promote(void)
{
@@ -1537,6 +1678,10 @@ box_promote(void)
"simultaneous invocations");
return -1;
}
+ in_promote = true;
+ auto promote_guard = make_scoped_guard([&] {
+ in_promote = false;
+ });
/*
* Do nothing when box isn't configured and when PROMOTE was already
@@ -1582,122 +1727,18 @@ box_promote(void)
unreachable();
}
- uint32_t former_leader_id = txn_limbo.owner_id;
- int64_t wait_lsn = txn_limbo.confirmed_lsn;
- int rc = 0;
- int quorum = replication_synchro_quorum;
- in_promote = true;
- auto promote_guard = make_scoped_guard([&] {
- in_promote = false;
- });
-
- if (run_elections) {
- /*
- * Make this instance a candidate and run until some leader, not
- * necessarily this instance, emerges.
- */
- raft_start_candidate(box_raft());
- /*
- * Trigger new elections without waiting for an old leader to
- * disappear.
- */
- raft_new_term(box_raft());
- rc = box_raft_wait_leader_found();
- /*
- * Do not reset raft mode if it was changed while running the
- * elections.
- */
- if (box_election_mode == ELECTION_MODE_MANUAL)
- raft_stop_candidate(box_raft(), false);
- if (rc != 0)
- return -1;
- if (!box_raft()->is_enabled) {
- diag_set(ClientError, ER_RAFT_DISABLED);
- return -1;
- }
- if (box_raft()->state != RAFT_STATE_LEADER) {
- diag_set(ClientError, ER_INTERFERING_PROMOTE,
- box_raft()->leader);
- return -1;
- }
- }
-
- if (txn_limbo_is_empty(&txn_limbo))
- goto promote;
+ int64_t wait_lsn = -1;
- if (try_wait) {
- /* Wait until pending confirmations/rollbacks reach us. */
- double timeout = 2 * replication_synchro_timeout;
- txn_limbo_wait_empty(&txn_limbo, timeout);
- /*
- * Our mission was to clear the limbo from former leader's
- * transactions. Exit in case someone did that for us.
- */
- if (former_leader_id != txn_limbo.owner_id) {
- diag_set(ClientError, ER_INTERFERING_PROMOTE,
- txn_limbo.owner_id);
- return -1;
- }
- if (txn_limbo_is_empty(&txn_limbo)) {
- wait_lsn = txn_limbo.confirmed_lsn;
- goto promote;
- }
- }
-
- struct txn_limbo_entry *last_entry;
- last_entry = txn_limbo_last_synchro_entry(&txn_limbo);
- /* Wait for the last entries WAL write. */
- if (last_entry->lsn < 0) {
- int64_t tid = last_entry->txn->id;
- if (wal_sync(NULL) < 0)
- return -1;
- if (former_leader_id != txn_limbo.owner_id) {
- diag_set(ClientError, ER_INTERFERING_PROMOTE,
- txn_limbo.owner_id);
- return -1;
- }
- if (txn_limbo_is_empty(&txn_limbo)) {
- wait_lsn = txn_limbo.confirmed_lsn;
- goto promote;
- }
- if (tid != txn_limbo_last_synchro_entry(&txn_limbo)->txn->id) {
- diag_set(ClientError, ER_QUORUM_WAIT, quorum,
- "new synchronous transactions appeared");
- return -1;
- }
- }
- wait_lsn = last_entry->lsn;
- assert(wait_lsn > 0);
+ if (run_elections && box_run_elections() < 0)
+ return -1;
+ if (try_wait &&
+ box_try_wait_confirm(2 * replication_synchro_timeout) < 0)
+ return -1;
+ if ((wait_lsn = box_wait_limbo_acked()) < 0)
+ return -1;
- rc = box_wait_quorum(former_leader_id, wait_lsn, quorum,
- replication_synchro_timeout);
- if (rc == 0) {
- if (quorum < replication_synchro_quorum) {
- diag_set(ClientError, ER_QUORUM_WAIT, quorum,
- "quorum was increased while waiting");
- rc = -1;
- } else if (wait_lsn < txn_limbo_last_synchro_entry(&txn_limbo)->lsn) {
- diag_set(ClientError, ER_QUORUM_WAIT, quorum,
- "new synchronous transactions appeared");
- rc = -1;
- } else {
-promote:
- /* We cannot possibly get here in a volatile state. */
- assert(box_raft()->volatile_term == box_raft()->term);
- txn_limbo_write_promote(&txn_limbo, wait_lsn,
- box_raft()->term);
- struct synchro_request req = {
- .type = IPROTO_PROMOTE,
- .replica_id = former_leader_id,
- .origin_id = instance_id,
- .lsn = wait_lsn,
- .term = box_raft()->term,
- };
- txn_limbo_process(&txn_limbo, &req);
- assert(txn_limbo_is_empty(&txn_limbo));
- }
- }
- return rc;
+ box_issue_promote(txn_limbo.owner_id, wait_lsn);
+ return 0;
}
int
--
2.30.1 (Apple Git-130)
More information about the Tarantool-patches
mailing list