[Tarantool-patches] [PATCH v3 08/10] raft: introduce state machine

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Sep 30 01:11:30 MSK 2020


This commit is the core part of the Raft implementation. It
introduces the Raft state machine and its integration into the
instance's life cycle.

The implementation follows the protocol to the letter except for a
few important details.

Firstly, the original Raft assumes that all nodes share the same
log record numbers. In Tarantool they are called LSNs. But in
Tarantool each node has its own LSN in its own component of the
vclock. That makes the election messages a bit heavier, because
the nodes need to send and compare complete vclocks of each other
instead of a single number like in the original Raft. But the
logic becomes simpler, because in the original Raft there is a
problem of uncertainty about what to do with records of an old
leader right after a new leader is elected. They could be rolled
back or confirmed depending on circumstances. The issue disappears
when vclock is used.

Secondly, leader election works differently during cluster
bootstrap, until the number of bootstrapped replicas becomes >=
the election quorum. That arises from the specifics of replica
bootstrap and the order of system initialization. In short:
during bootstrap a leader election may use a smaller election
quorum than the configured one. See more details in the code.

Part of #1146
---
 src/box/applier.cc |  23 +-
 src/box/box.cc     |  19 +-
 src/box/raft.c     | 897 +++++++++++++++++++++++++++++++++++++++++++--
 src/box/raft.h     | 135 ++++++-
 src/box/relay.cc   |  24 ++
 5 files changed, 1062 insertions(+), 36 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9fed3c071..7686d6cbc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -883,6 +883,11 @@ static int
 applier_handle_raft(struct applier *applier, struct xrow_header *row)
 {
 	assert(iproto_type_is_raft_request(row->type));
+	if (applier->instance_id == 0) {
+		diag_set(ClientError, ER_PROTOCOL, "Can't apply a Raft request "
+			 "from an instance without an ID");
+		return -1;
+	}
 
 	struct raft_request req;
 	struct vclock candidate_clock;
@@ -897,8 +902,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
  * Return 0 for success or -1 in case of an error.
  */
 static int
-applier_apply_tx(struct stailq *rows)
+applier_apply_tx(struct applier *applier, struct stailq *rows)
 {
+	/*
+	 * Rows received not directly from a leader are ignored. That is a
+	 * protection against the case when an old leader keeps sending data
+	 * around not knowing yet that it is not a leader anymore.
+	 *
+	 * XXX: it may be that this can be fine to apply leader transactions by
+	 * looking at their replica_id field if it is equal to leader id. That
+	 * can be investigated as an 'optimization'. Even though may not give
+	 * anything, because won't change total number of rows sent in the
+	 * network anyway.
+	 */
+	if (!raft_is_source_allowed(applier->instance_id))
+		return 0;
 	struct xrow_header *first_row = &stailq_first_entry(rows,
 					struct applier_tx_row, next)->row;
 	struct xrow_header *last_row;
@@ -1238,6 +1256,7 @@ applier_subscribe(struct applier *applier)
 		struct xrow_header *first_row =
 			&stailq_first_entry(&rows, struct applier_tx_row,
 					    next)->row;
+		raft_process_heartbeat(applier->instance_id);
 		if (first_row->lsn == 0) {
 			if (unlikely(iproto_type_is_raft_request(
 							first_row->type))) {
@@ -1246,7 +1265,7 @@ applier_subscribe(struct applier *applier)
 					diag_raise();
 			}
 			applier_signal_ack(applier);
-		} else if (applier_apply_tx(&rows) != 0) {
+		} else if (applier_apply_tx(applier, &rows) != 0) {
 			diag_raise();
 		}
 
diff --git a/src/box/box.cc b/src/box/box.cc
index a8542cb38..6ec813c12 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -157,7 +157,7 @@ void
 box_update_ro_summary(void)
 {
 	bool old_is_ro_summary = is_ro_summary;
-	is_ro_summary = is_ro || is_orphan;
+	is_ro_summary = is_ro || is_orphan || raft_is_ro();
 	/* In 99% nothing changes. Filter this out first. */
 	if (is_ro_summary == old_is_ro_summary)
 		return;
@@ -171,6 +171,10 @@ static int
 box_check_writable(void)
 {
 	if (is_ro_summary) {
+		/*
+		 * XXX: return a special error when the node is not a leader to
+		 * reroute to the leader node.
+		 */
 		diag_set(ClientError, ER_READONLY);
 		diag_log();
 		return -1;
@@ -2648,6 +2652,7 @@ box_init(void)
 
 	txn_limbo_init();
 	sequence_init();
+	raft_init();
 }
 
 bool
@@ -2795,8 +2800,18 @@ box_cfg_xc(void)
 	title("running");
 	say_info("ready to accept requests");
 
-	if (!is_bootstrap_leader)
+	if (!is_bootstrap_leader) {
 		replicaset_sync();
+	} else {
+		/*
+		 * When the cluster is just bootstrapped and this instance is a
+		 * leader, it makes no sense to wait for a leader appearance.
+		 * There is no one. Moreover this node *is* a leader, so it
+		 * should take the control over the situation and start a new
+		 * term immediately.
+		 */
+		raft_new_term();
+	}
 
 	/* box.cfg.read_only is not read yet. */
 	assert(box_is_ro());
diff --git a/src/box/raft.c b/src/box/raft.c
index 024433369..e88e5adb6 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -36,6 +36,13 @@
 #include "small/region.h"
 #include "replication.h"
 #include "relay.h"
+#include "box.h"
+#include "tt_static.h"
+
+/**
+ * Maximal random deviation of the election timeout. From the configured value.
+ */
+#define RAFT_RANDOM_ELECTION_FACTOR 0.1
 
 const char *raft_state_strs[] = {
 	NULL,
@@ -48,19 +55,264 @@ const char *raft_state_strs[] = {
 struct raft raft = {
 	.leader = 0,
 	.state = RAFT_STATE_FOLLOWER,
+	.volatile_term = 1,
+	.volatile_vote = 0,
 	.is_enabled = false,
 	.is_candidate = false,
+	.is_cfg_candidate = false,
+	.is_write_in_progress = false,
+	.is_broadcast_scheduled = false,
 	.term = 1,
 	.vote = 0,
+	.vote_mask = 0,
+	.vote_count = 0,
+	.worker = NULL,
+	.election_timeout = 5,
 };
 
+/**
+ * Check if Raft is completely synced with disk. Meaning all its critical values
+ * are in WAL. Only in that state the node can become a leader or a candidate.
+ * If the node has a not flushed data, it means either the term was bumped, or
+ * a new vote was made.
+ *
+ * In case of term bump it means either there is another node with a newer term,
+ * and this one should be a follower; or this node bumped the term itself along
+ * with making a vote to start a new election - then it is also a follower which
+ * will turn into a candidate when the flush is done.
+ *
+ * In case of a new not flushed vote it means either this node voted for some
+ * other node, and must be a follower; or it voted for self, and also must be a
+ * follower, but will become a candidate when the flush is done.
+ *
+ * In total - when something is not synced with disk, the instance is a follower
+ * in any case.
+ */
+static bool
+raft_is_fully_on_disk(void)
+{
+	return raft.volatile_term == raft.term &&
+	       raft.volatile_vote == raft.vote;
+}
+
+/**
+ * Raft protocol says that election timeout should be a bit randomized so as
+ * the nodes wouldn't start election at the same time and end up with not having
+ * a quorum for anybody. This implementation randomizes the election timeout by
+ * adding {election timeout * random factor} value, where max value of the
+ * factor is a constant floating point value > 0.
+ */
+static inline double
+raft_new_random_election_shift(void)
+{
+	double timeout = raft.election_timeout;
+	/* Translate to ms. Integer is needed to be able to use mod below. */
+	uint32_t rand_part =
+		(uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
+	if (rand_part == 0)
+		rand_part = 1;
+	/*
+	 * XXX: this is not giving a good distribution, but it is not so trivial
+	 * to implement a correct random value generator. There is a task to
+	 * unify all such places. Not critical here.
+	 */
+	rand_part = rand() % (rand_part + 1);
+	return rand_part / 1000.0;
+}
+
+/**
+ * Raft says that during election a node1 can vote for node2, if node2 has a
+ * bigger term, or has the same term but longer log. In case of Tarantool it
+ * means the node2 vclock should be >= node1 vclock, in all components. It is
+ * not enough to compare only one component. At least because there may be not
+ * a previous leader when the election happens first time. Or a node could
+ * restart and forget who the previous leader was.
+ */
+static inline bool
+raft_can_vote_for(const struct vclock *v)
+{
+	int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
+	return cmp == 0 || cmp == 1;
+}
+
+/**
+ * Election quorum is not strictly equal to synchronous replication quorum.
+ * Sometimes it can be lowered. That is about bootstrap.
+ *
+ * The problem with bootstrap is that when the replicaset boots, all the
+ * instances can't write to WAL and can't recover from their initial snapshot.
+ * They need one node which will boot first, and then they will replicate from
+ * it.
+ *
+ * This one node should boot from its zero snapshot, create replicaset UUID,
+ * register self with ID 1 in _cluster space, and then register all the other
+ * instances here. To do that the node must be writable. It should have
+ * read_only = false, connection quorum satisfied, and be a Raft leader if Raft
+ * is enabled.
+ *
+ * To be elected a Raft leader it needs to perform election. But it can't be
+ * done before at least synchronous quorum of the replicas is bootstrapped. And
+ * they can't be bootstrapped because wait for a leader to initialize _cluster.
+ * Cyclic dependency.
+ *
+ * This is resolved by truncation of the election quorum to the number of
+ * registered replicas, if their count is less than synchronous quorum. That
+ * helps to elect a first leader.
+ *
+ * It may seem that the first node could just declare itself a leader and then
+ * strictly follow the protocol from now on, but that won't work, because if the
+ * first node restarts after it is booted, but before a quorum of replicas is
+ * booted, the cluster will get stuck again.
+ *
+ * The current solution is totally safe because
+ *
+ * - after all the cluster will have node count >= quorum, if user used a
+ *   correct config (God help him if he didn't);
+ *
+ * - synchronous replication quorum is untouched - it is not truncated. Only
+ *   leader election quorum is affected. So synchronous data won't be lost.
+ */
+static inline int
+raft_election_quorum(void)
+{
+	return MIN(replication_synchro_quorum, replicaset.registered_count);
+}
+
+/** Schedule broadcast of the complete Raft state to all the followers. */
+static void
+raft_schedule_broadcast(void);
+
+/** Raft state machine methods. 'sm' stands for State Machine. */
+
+/**
+ * Start the state machine. When it is stopped, Raft state is updated and
+ * goes to WAL when necessary, but it does not affect the instance operation.
+ * For example, when Raft is stopped, the instance role does not affect whether
+ * it is writable.
+ */
+static void
+raft_sm_start(void);
+
+/**
+ * Stop the state machine. Now until Raft is re-enabled,
+ * - Raft stops affecting the instance operation;
+ * - this node can't become a leader;
+ * - this node can't vote.
+ */
+static void
+raft_sm_stop(void);
+
+/**
+ * When the instance is a follower but is allowed to be a leader, it will wait
+ * for death of the current leader to start new election.
+ */
+static void
+raft_sm_wait_leader_dead(void);
+
+/**
+ * Wait for the leader death timeout until a leader lets the node know he is
+ * alive. Otherwise the node will start a new term. Can be useful when it is not
+ * known whether the leader is alive, but it is undesirable to start a new term
+ * immediately. Because in case the leader is alive, a new term would stun him
+ * and therefore would stun DB write requests. Usually happens when a follower
+ * restarts and may need some time to hear something from the leader.
+ */
+static void
+raft_sm_wait_leader_found(void);
+
+/**
+ * If election is started by this node, or it voted for some other node started
+ * the election, and it can be a leader itself, it will wait until the current
+ * election times out. When it happens, the node will start new election.
+ */
+static void
+raft_sm_wait_election_end(void);
+
+/** Bump volatile term and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term);
+
+/** Bump volatile vote and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote);
+
+/**
+ * Bump term and vote for self immediately. After that is persisted, the
+ * election timeout will be activated. Unless during that nothing newer happens.
+ */
+static void
+raft_sm_schedule_new_election(void);
+
+/**
+ * The main trigger of Raft state machine - start new election when the current
+ * leader dies, or when there is no leader and the previous election failed.
+ */
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+				 int events);
+
+/** Start Raft state flush to disk. */
+static void
+raft_sm_pause_and_dump(void);
+
+static void
+raft_sm_become_leader(void);
+
+static void
+raft_sm_follow_leader(uint32_t leader);
+
+static void
+raft_sm_become_candidate(void);
+
+/**
+ * Render a Raft request as a human-readable string for log messages. The text
+ * is built in a local buffer, truncated if it does not fit, and returned via
+ * tt_cstr(). The 64-bit term is cast explicitly to match the %llu conversion.
+ */
+static const char *
+raft_request_to_string(const struct raft_request *req)
+{
+	assert(req->term != 0);
+	char buf[1024];
+	const int size = sizeof(buf);
+	int len = 0;
+/*
+ * snprintf() returns the length the output would have without truncation, so
+ * the accumulated length is clamped to keep further writes inside the buffer.
+ */
+#define RAFT_REQUEST_APPEND(...) do {						\
+	int rc = snprintf(buf + len, size - len, __VA_ARGS__);			\
+	assert(rc >= 0);							\
+	len = MIN(len + rc, size - 1);						\
+} while (0)
+	RAFT_REQUEST_APPEND("{term: %llu", (unsigned long long)req->term);
+	if (req->vote != 0)
+		RAFT_REQUEST_APPEND(", vote: %u", req->vote);
+	if (req->state != 0) {
+		RAFT_REQUEST_APPEND(", state: %s",
+				    raft_state_strs[req->state]);
+	}
+	if (req->vclock != NULL) {
+		RAFT_REQUEST_APPEND(", vclock: %s",
+				    vclock_to_string(req->vclock));
+	}
+	RAFT_REQUEST_APPEND("}");
+#undef RAFT_REQUEST_APPEND
+	return tt_cstr(buf, len);
+}
+
 void
 raft_process_recovery(const struct raft_request *req)
 {
-	if (req->term != 0)
+	say_verbose("RAFT: recover %s", raft_request_to_string(req));
+	if (req->term != 0) {
 		raft.term = req->term;
-	if (req->vote != 0)
+		raft.volatile_term = req->term;
+	}
+	if (req->vote != 0) {
 		raft.vote = req->vote;
+		raft.volatile_vote = req->vote;
+	}
 	/*
 	 * Role is never persisted. If recovery is happening, the
 	 * node was restarted, and the former role can be false
@@ -80,35 +332,550 @@ raft_process_recovery(const struct raft_request *req)
 int
 raft_process_msg(const struct raft_request *req, uint32_t source)
 {
-	(void)source;
-	if (req->term > raft.term) {
-		// Update term.
-		// The logic will be similar, but the code
-		// below is for testing purposes.
-		raft.term = req->term;
+	say_info("RAFT: message %s from %u", raft_request_to_string(req),
+		 source);
+	assert(source > 0);
+	assert(source != instance_id);
+	if (req->term == 0 || req->state == 0) {
+		diag_set(ClientError, ER_PROTOCOL, "Raft term and state can't "
+			 "be zero");
+		return -1;
+	}
+	if (req->state == RAFT_STATE_CANDIDATE &&
+	    (req->vote != source || req->vclock == NULL)) {
+		diag_set(ClientError, ER_PROTOCOL, "Candidate should always "
+			 "vote for self and provide its vclock");
+		return -1;
+	}
+	/* Outdated request. */
+	if (req->term < raft.volatile_term) {
+		say_info("RAFT: the message is ignored due to outdated term - "
+			 "current term is %llu",
+			 (unsigned long long)raft.volatile_term);
+		return 0;
+	}
+	/* Term bump. */
+	if (req->term > raft.volatile_term)
+		raft_sm_schedule_new_term(req->term);
+	/*
+	 * Either a vote request during an on-going election. Or an old vote
+	 * persisted long time ago and still broadcasted. Or a vote response.
+	 */
+	if (req->vote != 0) {
+		switch (raft.state) {
+		case RAFT_STATE_FOLLOWER:
+		case RAFT_STATE_LEADER:
+			if (!raft.is_enabled) {
+				say_info("RAFT: vote request is skipped - RAFT "
+					 "is disabled");
+				break;
+			}
+			if (raft.leader != 0) {
+				say_info("RAFT: vote request is skipped - the "
+					 "leader is already known - %u",
+					 raft.leader);
+				break;
+			}
+			if (req->vote == instance_id) {
+				/*
+				 * This is entirely valid. This instance could
+				 * request a vote, then become a follower or
+				 * leader, and then get the response.
+				 */
+				say_info("RAFT: vote request is skipped - "
+					 "can't accept vote for self if not a "
+					 "candidate");
+				break;
+			}
+			if (req->state != RAFT_STATE_CANDIDATE) {
+				say_info("RAFT: vote request is skipped - "
+					 "this is a notification about a vote "
+					 "for a third node, not a request");
+				break;
+			}
+			if (raft.volatile_vote != 0) {
+				say_info("RAFT: vote request is skipped - "
+					 "already voted in this term");
+				break;
+			}
+			/* Vclock is not NULL, validated above. */
+			if (!raft_can_vote_for(req->vclock)) {
+				say_info("RAFT: vote request is skipped - the "
+					 "vclock is not acceptable");
+				break;
+			}
+			/*
+			 * Either the term is new, or didn't vote in the current
+			 * term yet. Anyway can vote now.
+			 */
+			raft_sm_schedule_new_vote(req->vote);
+			break;
+		case RAFT_STATE_CANDIDATE:
+			/* Check if this is a vote for a competing candidate. */
+			if (req->vote != instance_id) {
+				say_info("RAFT: vote request is skipped - "
+					 "competing candidate");
+				break;
+			}
+			/*
+			 * Vote for self was requested earlier in this round,
+			 * and now was answered by some other instance.
+			 */
+			assert(raft.volatile_vote == instance_id);
+			int quorum = raft_election_quorum();
+			bool was_set = bit_set(&raft.vote_mask, source);
+			raft.vote_count += !was_set;
+			if (raft.vote_count < quorum) {
+				say_info("RAFT: accepted vote for self, vote "
+					 "count is %d/%d", raft.vote_count,
+					 quorum);
+				break;
+			}
+			raft_sm_become_leader();
+			break;
+		default:
+			unreachable();
+		}
+	}
+	if (req->state != RAFT_STATE_LEADER) {
+		if (source == raft.leader) {
+			say_info("RAFT: the node %u has resigned from the "
+				 "leader role", raft.leader);
+			raft_sm_schedule_new_election();
+		}
+		return 0;
+	}
+	/* The node is a leader, but it is already known. */
+	if (source == raft.leader)
+		return 0;
+	/*
+	 * XXX: A message from a conflicting leader. Split brain, basically.
+	 * Need to decide what to do. Current solution is to do nothing. In
+	 * future either this node should try to become a leader, or should stop
+	 * all writes and require manual intervention.
+	 */
+	if (raft.leader != 0) {
+		say_warn("RAFT: conflicting leader detected in one term - "
+			 "known is %u, received %u", raft.leader, source);
+		return 0;
+	}
+
+	/* New leader was elected. */
+	raft_sm_follow_leader(source);
+	return 0;
+}
+
+void
+raft_process_heartbeat(uint32_t source)
+{
+	/*
+	 * Raft handles heartbeats from all instances, including anon instances
+	 * which don't participate in Raft.
+	 */
+	if (source == 0)
+		return;
+	/*
+	 * When not a candidate - don't wait for anything. Therefore do not care
+	 * about the leader being dead.
+	 */
+	if (!raft.is_candidate)
+		return;
+	/* Don't care about heartbeats when this node is a leader itself. */
+	if (raft.state == RAFT_STATE_LEADER)
+		return;
+	/* Not interested in heartbeats from not a leader. */
+	if (raft.leader != source)
+		return;
+	/*
+	 * The instance currently is busy with writing something on disk. Can't
+	 * react to heartbeats.
+	 */
+	if (raft.is_write_in_progress)
+		return;
+	/*
+	 * XXX: it may be expensive to reset the timer like that. It may be less
+	 * expensive to let the timer work, and remember last timestamp when
+	 * anything was heard from the leader. Then in the timer callback check
+	 * the timestamp, and restart the timer, if it is fine.
+	 */
+	assert(ev_is_active(&raft.timer));
+	ev_timer_stop(loop(), &raft.timer);
+	raft_sm_wait_leader_dead();
+}
+
+/** Wakeup Raft state writer fiber waiting for WAL write end. */
+static void
+raft_write_cb(struct journal_entry *entry)
+{
+	fiber_wakeup(entry->complete_data);
+}
+
+/** Synchronously write a Raft request into WAL. */
+static void
+raft_write_request(const struct raft_request *req)
+{
+	assert(raft.is_write_in_progress);
+	/*
+	 * Vclock is never persisted by Raft. It is used only to
+	 * be sent to network when vote for self.
+	 */
+	assert(req->vclock == NULL);
+	/*
+	 * State is not persisted. That would be strictly against Raft protocol.
+	 * The reason is that it does not make much sense - even if the node is
+	 * a leader now, after the node is restarted, there will be another
+	 * leader elected by that time likely.
+	 */
+	assert(req->state == 0);
+	struct region *region = &fiber()->gc;
+	uint32_t svp = region_used(region);
+	struct xrow_header row;
+	char buf[sizeof(struct journal_entry) +
+		 sizeof(struct xrow_header *)];
+	struct journal_entry *entry = (struct journal_entry *)buf;
+	entry->rows[0] = &row;
+
+	if (xrow_encode_raft(&row, region, req) != 0)
+		goto fail;
+	journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
+			     fiber());
+
+	if (journal_write(entry) != 0 || entry->res < 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		goto fail;
 	}
-	if (req->vote > 0) {
-		// Check whether the vote's for us.
-	}
-	switch (req->state) {
-	case RAFT_STATE_FOLLOWER:
-	    break;
-	case RAFT_STATE_CANDIDATE:
-	    // Perform voting logic.
-	    break;
-	case RAFT_STATE_LEADER:
-	    // Switch to a new leader.
-	    break;
-	default:
-	    break;
+
+	region_truncate(region, svp);
+	return;
+fail:
+	/*
+	 * XXX: the stub is supposed to be removed once it is defined what to do
+	 * when a raft request WAL write fails.
+	 */
+	panic("Could not write a raft request to WAL\n");
+}
+
+/* Dump Raft state to WAL in a blocking way. */
+static void
+raft_worker_handle_io(void)
+{
+	assert(raft.is_write_in_progress);
+	/* During write Raft can't be anything but a follower. */
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	struct raft_request req;
+
+	if (raft_is_fully_on_disk()) {
+end_dump:
+		raft.is_write_in_progress = false;
+		/*
+		 * The state machine is stable. Can see now, to what state to
+		 * go.
+		 */
+		if (!raft.is_candidate) {
+			/*
+			 * If not a candidate, can't do anything except vote for
+			 * somebody (if Raft is enabled). Nothing to do except
+			 * staying a follower without timeouts.
+			 */
+		} else if (raft.leader != 0) {
+			/* There is a known leader. Wait until it is dead. */
+			raft_sm_wait_leader_dead();
+		} else if (raft.vote == instance_id) {
+			/* Just wrote own vote. */
+			if (raft_election_quorum() == 1)
+				raft_sm_become_leader();
+			else
+				raft_sm_become_candidate();
+		} else if (raft.vote != 0) {
+			/*
+			 * Voted for some other node. Wait if it manages to
+			 * become a leader.
+			 */
+			raft_sm_wait_election_end();
+		} else {
+			/* No leaders, no votes. */
+			raft_sm_schedule_new_vote(instance_id);
+		}
+	} else {
+		memset(&req, 0, sizeof(req));
+		assert(raft.volatile_term >= raft.term);
+		req.term = raft.volatile_term;
+		req.vote = raft.volatile_vote;
+
+		raft_write_request(&req);
+		say_info("RAFT: persisted state %s",
+			 raft_request_to_string(&req));
+
+		assert(req.term >= raft.term);
+		raft.term = req.term;
+		raft.vote = req.vote;
+		/*
+		 * Persistent state is visible, and it was changed - broadcast.
+		 */
+		raft_schedule_broadcast();
+		if (raft_is_fully_on_disk())
+			goto end_dump;
+	}
+}
+
+/* Broadcast Raft complete state to the followers. */
+static void
+raft_worker_handle_broadcast(void)
+{
+	assert(raft.is_broadcast_scheduled);
+	struct raft_request req;
+	memset(&req, 0, sizeof(req));
+	req.term = raft.term;
+	req.vote = raft.vote;
+	req.state = raft.state;
+	if (req.state == RAFT_STATE_CANDIDATE) {
+		assert(raft.vote == instance_id);
+		req.vclock = &replicaset.vclock;
+	}
+	replicaset_foreach(replica)
+		relay_push_raft(replica->relay, &req);
+	raft.is_broadcast_scheduled = false;
+}
+
+static int
+raft_worker_f(va_list args)
+{
+	(void)args;
+	bool is_idle;
+	while (!fiber_is_cancelled()) {
+		is_idle = true;
+		if (raft.is_write_in_progress) {
+			raft_worker_handle_io();
+			is_idle = false;
+		}
+		if (raft.is_broadcast_scheduled) {
+			raft_worker_handle_broadcast();
+			is_idle = false;
+		}
+		fiber_sleep(0);
+		if (!is_idle)
+			continue;
+		assert(raft_is_fully_on_disk());
+		fiber_yield();
 	}
 	return 0;
 }
 
+static void
+raft_sm_pause_and_dump(void)
+{
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	if (raft.is_write_in_progress)
+		return;
+	ev_timer_stop(loop(), &raft.timer);
+	raft.is_write_in_progress = true;
+	if (raft.worker == NULL)
+		raft.worker = fiber_new("raft_worker", raft_worker_f);
+	fiber_wakeup(raft.worker);
+}
+
+static void
+raft_sm_become_leader(void)
+{
+	assert(raft.state != RAFT_STATE_LEADER);
+	say_info("RAFT: enter leader state with quorum %d",
+		 raft_election_quorum());
+	assert(raft.leader == 0);
+	assert(raft.is_candidate);
+	assert(!raft.is_write_in_progress);
+	raft.state = RAFT_STATE_LEADER;
+	raft.leader = instance_id;
+	ev_timer_stop(loop(), &raft.timer);
+	/* Make read-write (if other subsystems allow that). */
+	box_update_ro_summary();
+	/* State is visible and it is changed - broadcast. */
+	raft_schedule_broadcast();
+}
+
+static void
+raft_sm_follow_leader(uint32_t leader)
+{
+	say_info("RAFT: leader is %u, follow", leader);
+	assert(raft.state != RAFT_STATE_LEADER);
+	assert(raft.leader == 0);
+	raft.state = RAFT_STATE_FOLLOWER;
+	raft.leader = leader;
+	if (!raft.is_write_in_progress && raft.is_candidate) {
+		ev_timer_stop(loop(), &raft.timer);
+		raft_sm_wait_leader_dead();
+	}
+	/* State is visible and it is changed - broadcast. */
+	raft_schedule_broadcast();
+}
+
+static void
+raft_sm_become_candidate(void)
+{
+	say_info("RAFT: enter candidate state with 1 self vote");
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	assert(raft.leader == 0);
+	assert(raft.vote == instance_id);
+	assert(raft.is_candidate);
+	assert(!raft.is_write_in_progress);
+	assert(raft_election_quorum() > 1);
+	raft.state = RAFT_STATE_CANDIDATE;
+	raft.vote_count = 1;
+	raft.vote_mask = 0;
+	bit_set(&raft.vote_mask, instance_id);
+	raft_sm_wait_election_end();
+	/* State is visible and it is changed - broadcast. */
+	raft_schedule_broadcast();
+}
+
+static void
+raft_sm_schedule_new_term(uint64_t new_term)
+{
+	say_info("RAFT: bump term to %llu, follow",
+		 (unsigned long long)new_term);
+	assert(new_term > raft.volatile_term);
+	assert(raft.volatile_term >= raft.term);
+	raft.volatile_term = new_term;
+	/* New terms means completely new Raft state. */
+	raft.volatile_vote = 0;
+	raft.leader = 0;
+	raft.state = RAFT_STATE_FOLLOWER;
+	box_update_ro_summary();
+	raft_sm_pause_and_dump();
+	/*
+	 * State is visible and it is changed - broadcast. Only the persistent
+	 * term is visible; the volatile one is not broadcasted until on disk.
+	 */
+	raft_schedule_broadcast();
+}
+
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote)
+{
+	say_info("RAFT: vote for %u, follow", new_vote);
+	assert(raft.volatile_vote == 0);
+	assert(raft.leader == 0);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	raft.volatile_vote = new_vote;
+	raft_sm_pause_and_dump();
+	/* Nothing visible is changed - no broadcast. */
+}
+
+static void
+raft_sm_schedule_new_election(void)
+{
+	say_info("RAFT: begin new election round");
+	assert(raft_is_fully_on_disk());
+	assert(raft.is_candidate);
+	/* Everyone is a follower until its vote for self is persisted. */
+	raft_sm_schedule_new_term(raft.term + 1);
+	raft_sm_schedule_new_vote(instance_id);
+	box_update_ro_summary();
+}
+
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+				 int events)
+{
+	assert(timer == &raft.timer);
+	(void)events;
+	ev_timer_stop(loop, timer);
+	raft_sm_schedule_new_election();
+}
+
+static void
+raft_sm_wait_leader_dead(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!raft.is_write_in_progress);
+	assert(raft.is_candidate);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	assert(raft.leader != 0);
+	double death_timeout = replication_disconnect_timeout();
+	ev_timer_set(&raft.timer, death_timeout, death_timeout);
+	ev_timer_start(loop(), &raft.timer);
+}
+
+static void
+raft_sm_wait_leader_found(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!raft.is_write_in_progress);
+	assert(raft.is_candidate);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	assert(raft.leader == 0);
+	double death_timeout = replication_disconnect_timeout();
+	ev_timer_set(&raft.timer, death_timeout, death_timeout);
+	ev_timer_start(loop(), &raft.timer);
+}
+
+static void
+raft_sm_wait_election_end(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!raft.is_write_in_progress);
+	assert(raft.is_candidate);
+	assert(raft.state == RAFT_STATE_FOLLOWER ||
+	       (raft.state == RAFT_STATE_CANDIDATE &&
+		raft.volatile_vote == instance_id));
+	assert(raft.leader == 0);
+	double election_timeout = raft.election_timeout +
+				  raft_new_random_election_shift();
+	ev_timer_set(&raft.timer, election_timeout, election_timeout);
+	ev_timer_start(loop(), &raft.timer);
+}
+
+static void
+raft_sm_start(void)
+{
+	say_info("RAFT: start state machine");
+	assert(!ev_is_active(&raft.timer));
+	assert(!raft.is_write_in_progress);
+	assert(!raft.is_enabled);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	raft.is_enabled = true;
+	raft.is_candidate = raft.is_cfg_candidate;
+	if (!raft.is_candidate) {
+		/* Nop. */;
+	} else if (raft.leader != 0) {
+		raft_sm_wait_leader_dead();
+	} else {
+		/*
+		 * Don't start new election. The situation is most likely
+		 * happened because this node was restarted. Instance restarts
+		 * may happen in the cluster, and each restart shouldn't
+		 * disturb the current leader. Give it time to notify this node
+		 * that there is a leader.
+		 */
+		raft_sm_wait_leader_found();
+	}
+	box_update_ro_summary();
+}
+
+static void
+raft_sm_stop(void)
+{
+	say_info("RAFT: stop state machine");
+	assert(raft.is_enabled);
+	raft.is_enabled = false;
+	raft.is_candidate = false;
+	if (raft.state == RAFT_STATE_LEADER)
+		raft.leader = 0;
+	raft.state = RAFT_STATE_FOLLOWER;
+	ev_timer_stop(loop(), &raft.timer);
+	box_update_ro_summary();
+	/* State is visible and changed - broadcast. */
+	raft_schedule_broadcast();
+}
+
 void
 raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
 {
 	memset(req, 0, sizeof(*req));
+	/*
+	 * Volatile state is never used for any communications.
+	 * Use only persisted state.
+	 */
 	req->term = raft.term;
 	req->vote = raft.vote;
 	req->state = raft.state;
@@ -133,34 +900,110 @@ raft_serialize_for_disk(struct raft_request *req)
 void
 raft_cfg_is_enabled(bool is_enabled)
 {
-	raft.is_enabled = is_enabled;
+	if (is_enabled == raft.is_enabled)
+		return;
+
+	if (!is_enabled)
+		raft_sm_stop();
+	else
+		raft_sm_start();
 }
 
 void
 raft_cfg_is_candidate(bool is_candidate)
 {
-	raft.is_candidate = is_candidate;
+	bool old_is_candidate = raft.is_candidate;
+	raft.is_cfg_candidate = is_candidate;
+	raft.is_candidate = is_candidate && raft.is_enabled;
+	if (raft.is_candidate == old_is_candidate)
+		return;
+
+	if (raft.is_candidate) {
+		assert(raft.state == RAFT_STATE_FOLLOWER);
+		/*
+		 * If there is an on-going WAL write, it means there was some
+		 * node who sent newer data to this node.
+		 */
+		if (raft.leader == 0 && raft_is_fully_on_disk())
+			raft_sm_wait_leader_found();
+	} else if (raft.state != RAFT_STATE_FOLLOWER) {
+		if (raft.state == RAFT_STATE_LEADER)
+			raft.leader = 0;
+		raft.state = RAFT_STATE_FOLLOWER;
+		/* State is visible and changed - broadcast. */
+		raft_schedule_broadcast();
+	}
+	box_update_ro_summary();
 }
 
 void
 raft_cfg_election_timeout(double timeout)
 {
+	if (timeout == raft.election_timeout)
+		return;
+	double delta = timeout - raft.election_timeout;
 	raft.election_timeout = timeout;
+	if (raft.vote == 0 || raft.leader != 0 || !raft.is_candidate)
+		return;
+	assert(ev_is_active(&raft.timer));
+	/* Shift the deadline by delta; libev allows a negative 'after'. */
+	double left = ev_timer_remaining(loop(), &raft.timer) + delta;
+	ev_timer_stop(loop(), &raft.timer);
+	ev_timer_set(&raft.timer, left, timeout);
+	ev_timer_start(loop(), &raft.timer);
 }
 
 void
 raft_cfg_election_quorum(void)
 {
+	/* A quorum change can finish an election only for a candidate. */
+	if (raft.state != RAFT_STATE_CANDIDATE)
+		return;
+	if (raft.vote_count < raft_election_quorum())
+		return;
+	raft_sm_become_leader();
 }
 
 void
 raft_cfg_death_timeout(void)
 {
+	if (raft.state != RAFT_STATE_FOLLOWER || !raft.is_candidate ||
+	    raft.leader == 0)
+		return;
+	assert(ev_is_active(&raft.timer));
+	double period = replication_disconnect_timeout();
+	/* Assumes timer.repeat holds the previous period - TODO confirm. */
+	double left = ev_timer_remaining(loop(), &raft.timer);
+	ev_timer_stop(loop(), &raft.timer);
+	ev_timer_set(&raft.timer, left - raft.timer.repeat + period, period);
+	ev_timer_start(loop(), &raft.timer);
 }
 
 void
-raft_broadcast(const struct raft_request *req)
+raft_new_term(void)
 {
-	replicaset_foreach(replica)
-		relay_push_raft(replica->relay, req);
+	if (raft.is_enabled)
+		raft_sm_schedule_new_term(raft.volatile_term + 1);
+}
+
+static void
+raft_schedule_broadcast(void)
+{
+	raft.is_broadcast_scheduled = true;
+	/*
+	 * Don't wake the fiber while it is writing. Otherwise it would be a
+	 * spurious wakeup, which the WAL write is not prepared to handle.
+	 */
+	if (raft.is_write_in_progress)
+		return;
+	if (raft.worker == NULL)
+		raft.worker = fiber_new("raft_worker", raft_worker_f);
+	if (raft.worker != fiber())
+		fiber_wakeup(raft.worker);
+}
+
+void
+raft_init(void)
+{
+	ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
 }
diff --git a/src/box/raft.h b/src/box/raft.h
index 8abde4f4c..be77a5473 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -31,34 +31,147 @@
  */
 #include <stdint.h>
 #include <stdbool.h>
+#include "tarantool_ev.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif
 
+/**
+ * This is an implementation of Raft leader election protocol, separated from
+ * synchronous replication part.
+ *
+ * The protocol describes an algorithm which helps to elect a single leader in
+ * the cluster, which is supposed to handle write requests. And re-elect a new
+ * leader, when the current leader dies.
+ *
+ * The implementation follows the protocol to the letter except a few important
+ * details.
+ *
+ * Firstly, the original Raft assumes, that all nodes share the same log record
+ * numbers. In Tarantool they are called LSNs. But in case of Tarantool each
+ * node has its own LSN in its own component of vclock. That makes the election
+ * messages a bit heavier, because the nodes need to send and compare complete
+ * vclocks of each other instead of a single number like in the original Raft.
+ * But logic becomes simpler. Because in the original Raft there is a problem of
+ * uncertainty about what to do with records of an old leader right after a new
+ * leader is elected. They could be rolled back or confirmed depending on
+ * circumstances. The issue disappears when vclock is used.
+ *
+ * Secondly, leader election works differently during cluster bootstrap, until
+ * number of bootstrapped replicas becomes >= election quorum. That arises from
+ * specifics of replicas bootstrap and order of systems initialization. In
+ * short: during bootstrap a leader election may use a smaller election quorum
+ * than the configured one. See more details in the code.
+ */
+
+struct fiber;
 struct raft_request;
 struct vclock;
 
 enum raft_state {
+	/**
+	 * Can't write. Can only accept data from a leader. Node in this state
+	 * either monitors an existing leader, or there is an on-going election
+	 * and the node voted for another node, or it can't be a candidate and
+	 * does not do anything.
+	 */
 	RAFT_STATE_FOLLOWER = 1,
+	/**
+	 * The node can't write. There is an active election, in which the node
+	 * voted for self. Now it waits for election outcome.
+	 */
 	RAFT_STATE_CANDIDATE = 2,
+	/** Election was successful. The node accepts write requests. */
 	RAFT_STATE_LEADER = 3,
 };
 
 extern const char *raft_state_strs[];
 
 struct raft {
+	/** Instance ID of leader of the current term. */
 	uint32_t leader;
+	/** State of the instance. */
 	enum raft_state state;
+	/**
+	 * Volatile part of the Raft state, whose WAL write may be still
+	 * in-progress, and yet the state may be already used. Volatile state is
+	 * never sent to anywhere, but the state machine makes decisions based
+	 * on it. That is vital.
+	 * As an example, volatile vote needs to be used to reject votes inside
+	 * a term, where the instance already voted (even if the vote WAL write
+	 * is not finished yet). Otherwise the instance would try to write
+	 * several votes inside one term.
+	 */
+	uint64_t volatile_term;
+	uint32_t volatile_vote;
+	/**
+	 * Flag whether Raft is enabled. When disabled, it still persists terms
+	 * so as to quickly enroll into the cluster when (if) it is enabled. In
+	 * everything else disabled Raft does not affect instance work.
+	 */
 	bool is_enabled;
+	/**
+	 * Flag whether the node can become a leader. It is an accumulated value
+	 * of configuration options Raft enabled and Raft candidate. If at least
+	 * one is false - the instance is not a candidate.
+	 */
 	bool is_candidate;
+	/** Flag whether the instance is allowed to be a leader. */
+	bool is_cfg_candidate;
+	/**
+	 * Flag whether Raft currently tries to write something into WAL. It
+	 * happens asynchronously, not right after Raft state is updated.
+	 */
+	bool is_write_in_progress;
+	/**
+	 * Flag whether Raft wants to broadcast its state. It is done
+	 * asynchronously in the worker fiber. That allows to collect multiple
+	 * updates into one batch if they happen in one event loop iteration.
+	 * Usually even in one function.
+	 */
+	bool is_broadcast_scheduled;
+	/**
+	 * Persisted Raft state. These values are used when the current Raft
+	 * state needs to be reported to other nodes.
+	 */
 	uint64_t term;
 	uint32_t vote;
+	/**
+	 * Bit 1 on position N means that a vote from instance with ID = N was
+	 * obtained.
+	 */
+	uint32_t vote_mask;
+	/** Number of votes for this instance. Valid only in candidate state. */
+	int vote_count;
+	/** State machine timed event trigger. */
+	struct ev_timer timer;
+	/** Worker fiber to execute blocking tasks like IO. */
+	struct fiber *worker;
+	/** Configured election timeout in seconds. */
 	double election_timeout;
 };
 
 extern struct raft raft;
 
+/**
+ * A flag whether the instance is read-only according to Raft. Even if Raft
+ * allows writes though, it does not mean the instance is writable. It can be
+ * affected by box.cfg.read_only, connection quorum.
+ */
+static inline bool
+raft_is_ro(void)
+{
+	return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
+}
+
+/** See if the instance can accept rows from an instance with the given ID. */
+static inline bool
+raft_is_source_allowed(uint32_t source_id)
+{
+	return !raft.is_enabled || raft.leader == source_id;
+}
+
 /** Check if Raft is enabled. */
 static inline bool
 raft_is_enabled(void)
@@ -78,6 +191,13 @@ raft_process_recovery(const struct raft_request *req);
 int
 raft_process_msg(const struct raft_request *req, uint32_t source);
 
+/**
+ * Process a heartbeat message from an instance with the given ID. It is used to
+ * watch leader's health and start election when necessary.
+ */
+void
+raft_process_heartbeat(uint32_t source);
+
 /** Configure whether Raft is enabled. */
 void
 raft_cfg_is_enabled(bool is_enabled);
@@ -109,6 +229,14 @@ raft_cfg_election_quorum(void);
 void
 raft_cfg_death_timeout(void);
 
+/**
+ * Bump the term. When it is persisted, the node checks if there is a leader,
+ * and if there is not, a new election is started. So this function can be used
+ * as a tool to forcefully start a new election, or restart an existing one.
+ */
+void
+raft_new_term(void);
+
 /**
  * Save complete Raft state into a request to be sent to other instances of the
  * cluster. It is allowed to save anything here, not only persistent state.
@@ -123,12 +251,9 @@ raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
 void
 raft_serialize_for_disk(struct raft_request *req);
 
-/**
- * Broadcast the changes in this instance's raft status to all
- * the followers.
- */
+/** Initialize Raft global data structures. */
 void
-raft_broadcast(const struct raft_request *req);
+raft_init(void);
 
 #if defined(__cplusplus)
 }
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 76430caa6..096f455a1 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -852,6 +852,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
+/**
+ * Recreate recovery cursor from the last confirmed point. That is
+ * used by Raft, when the node becomes a leader. It may happen,
+ * that it already sent some data to other nodes as a follower,
+ * and they ignored the data. Now when the node is a leader, it
+ * should send the unconfirmed data again. Otherwise the cluster
+ * will get stuck, or worse - the newer data would be sent without
+ * the older data, which was sent but ignored.
+ */
+static void
+relay_restart_recovery(struct relay *relay)
+{
+	recovery_delete(relay->r);
+	relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
+	recover_remaining_wals(relay->r, &relay->stream, NULL, true);
+}
+
 struct relay_raft_msg {
 	struct cmsg base;
 	struct cmsg_hop route;
@@ -867,7 +884,14 @@ relay_raft_msg_push(struct cmsg *base)
 	struct xrow_header row;
 	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
 	try {
+		/*
+		 * Send the message before restarting the recovery. Otherwise
+		 * all the rows would be sent from under a non-leader role and
+		 * would be ignored again.
+		 */
 		relay_send(msg->relay, &row);
+		if (msg->req.state == RAFT_STATE_LEADER)
+			relay_restart_recovery(msg->relay);
 	} catch (Exception *e) {
 		relay_set_error(msg->relay, e);
 		fiber_cancel(fiber());
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list