[Tarantool-patches] [PATCH v2 09/11] raft: introduce state machine

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Sep 19 18:49:08 MSK 2020


Here is a new version of the patch after some squashes.

====================
    raft: introduce state machine
    
    The commit is a core part of Raft implementation. It introduces
    the Raft state machine implementation and its integration into the
    instance's life cycle.
    
    The implementation follows the protocol to the letter except a few
    important details.
    
    Firstly, the original Raft assumes, that all nodes share the same
    log record numbers. In Tarantool they are called LSNs. But in case
    of Tarantool each node has its own LSN in its own component of
    vclock. That makes the election messages a bit heavier, because
    the nodes need to send and compare complete vclocks of each other
    instead of a single number like in the original Raft. But logic
    becomes simpler. Because in the original Raft there is a problem
    of uncertainty about what to do with records of an old leader
    right after a new leader is elected. They could be rolled back or
    confirmed depending on circumstances. The issue disappears when
    vclock is used.
    
    Secondly, leader election works differently during cluster
    bootstrap, until number of bootstrapped replicas becomes >=
    election quorum. That arises from specifics of replicas bootstrap
    and order of systems initialization. In short: during bootstrap a
    leader election may use a smaller election quorum than the
    configured one. See more details in the code.
    
    Part of #1146

diff --git a/src/box/applier.cc b/src/box/applier.cc
index fd7cf1c79..c352faf5e 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -900,8 +900,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
  * Return 0 for success or -1 in case of an error.
  */
 static int
-applier_apply_tx(struct stailq *rows)
+applier_apply_tx(struct applier *applier, struct stailq *rows)
 {
+	/*
+	 * Rows received not directly from a leader are ignored. That is a
+	 * protection against the case when an old leader keeps sending data
+	 * around not knowing yet that it is not a leader anymore.
+	 *
+	 * XXX: it may be that this can be fine to apply leader transactions by
+	 * looking at their replica_id field if it is equal to leader id. That
+	 * can be investigated as an 'optimization'. Even though may not give
+	 * anything, because won't change total number of rows sent in the
+	 * network anyway.
+	 */
+	if (!raft_is_source_allowed(applier->instance_id))
+		return 0;
 	struct xrow_header *first_row = &stailq_first_entry(rows,
 					struct applier_tx_row, next)->row;
 	struct xrow_header *last_row;
@@ -1241,6 +1254,7 @@ applier_subscribe(struct applier *applier)
 		struct xrow_header *first_row =
 			&stailq_first_entry(&rows, struct applier_tx_row,
 					    next)->row;
+		raft_process_heartbeat(applier->instance_id);
 		if (first_row->lsn == 0) {
 			if (unlikely(iproto_type_is_raft_request(
 							first_row->type))) {
@@ -1249,7 +1263,7 @@ applier_subscribe(struct applier *applier)
 					diag_raise();
 			}
 			applier_signal_ack(applier);
-		} else if (applier_apply_tx(&rows) != 0) {
+		} else if (applier_apply_tx(applier, &rows) != 0) {
 			diag_raise();
 		}
 
diff --git a/src/box/box.cc b/src/box/box.cc
index 77ab21dbb..c5dcbd959 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -157,7 +157,7 @@ void
 box_update_ro_summary(void)
 {
 	bool old_is_ro_summary = is_ro_summary;
-	is_ro_summary = is_ro || is_orphan;
+	is_ro_summary = is_ro || is_orphan || raft_is_ro();
 	/* In 99% nothing changes. Filter this out first. */
 	if (is_ro_summary == old_is_ro_summary)
 		return;
@@ -171,6 +171,10 @@ static int
 box_check_writable(void)
 {
 	if (is_ro_summary) {
+		/*
+		 * XXX: return a special error when the node is not a leader to
+		 * reroute to the leader node.
+		 */
 		diag_set(ClientError, ER_READONLY);
 		diag_log();
 		return -1;
@@ -2652,6 +2656,7 @@ box_init(void)
 
 	txn_limbo_init();
 	sequence_init();
+	raft_init();
 }
 
 bool
diff --git a/src/box/raft.c b/src/box/raft.c
index 4d3d07c48..07a49351f 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -36,6 +36,13 @@
 #include "small/region.h"
 #include "replication.h"
 #include "relay.h"
+#include "box.h"
+#include "tt_static.h"
+
+/**
+ * Maximal random deviation of the election timeout. From the configured value.
+ */
+#define RAFT_RANDOM_ELECTION_FACTOR 0.1
 
 const char *raft_state_strs[] = {
 	NULL,
@@ -48,19 +55,261 @@ const char *raft_state_strs[] = {
 struct raft raft = {
 	.leader = 0,
 	.state = RAFT_STATE_FOLLOWER,
+	.volatile_term = 1,
+	.volatile_vote = 0,
 	.is_enabled = false,
 	.is_candidate = false,
+	.is_cfg_candidate = false,
+	.is_write_in_progress = false,
 	.term = 1,
 	.vote = 0,
+	.vote_mask = 0,
+	.vote_count = 0,
+	.worker = NULL,
+	.election_timeout = 5,
 };
 
+/**
+ * Check if Raft is completely synced with disk. Meaning all its critical values
+ * are in WAL. Only in that state the node can become a leader or a candidate.
+ * If the node has a not flushed data, it means either the term was bumped, or
+ * a new vote was made.
+ *
+ * In case of term bump it means either there is another node with a newer term,
+ * and this one should be a follower; or this node bumped the term itself along
+ * with making a vote to start a new election - then it is also a follower which
+ * will turn into a candidate when the flush is done.
+ *
+ * In case of a new not flushed vote it means either this node voted for some
+ * other node, and must be a follower; or it voted for self, and also must be a
+ * follower, but will become a candidate when the flush is done.
+ *
+ * In total - when something is not synced with disk, the instance is a follower
+ * in any case.
+ */
+static bool
+raft_is_fully_on_disk(void)
+{
+	return raft.volatile_term == raft.term &&
+	       raft.volatile_vote == raft.vote;
+}
+
+/**
+ * Raft protocol says that election timeout should be a bit randomized so as
+ * the nodes wouldn't start election at the same time and end up with not having
+ * a quorum for anybody. This implementation randomizes the election timeout by
+ * adding {election timeout * random factor} value, where max value of the
+ * factor is a constant floating point value > 0.
+ */
+static inline double
+raft_new_random_election_shift(void)
+{
+	double timeout = raft.election_timeout;
+	/* Translate to ms. Integer is needed to be able to use mod below. */
+	uint32_t rand_part =
+		(uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
+	if (rand_part == 0)
+		rand_part = 1;
+	/*
+	 * XXX: this is not giving a good distribution, but it is not so trivial
+	 * to implement a correct random value generator. There is a task to
+	 * unify all such places. Not critical here.
+	 */
+	rand_part = rand() % (rand_part + 1);
+	return rand_part / 1000.0;
+}
+
+/**
+ * Raft says that during election a node1 can vote for node2, if node2 has a
+ * bigger term, or has the same term but longer log. In case of Tarantool it
+ * means the node2 vclock should be >= node1 vclock, in all components. It is
+ * not enough to compare only one component. At least because there may be not
+ * a previous leader when the election happens first time. Or a node could
+ * restart and forget who the previous leader was.
+ */
+static inline bool
+raft_can_vote_for(const struct vclock *v)
+{
+	if (v == NULL)
+		return false;
+	int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
+	return cmp == 0 || cmp == 1;
+}
+
+/**
+ * Election quorum is not strictly equal to synchronous replication quorum.
+ * Sometimes it can be lowered. That is about bootstrap.
+ *
+ * The problem with bootstrap is that when the replicaset boots, all the
+ * instances can't write to WAL and can't recover from their initial snapshot.
+ * They need one node which will boot first, and then they will replicate from
+ * it.
+ *
+ * This one node should boot from its zero snapshot, create replicaset UUID,
+ * register self with ID 1 in _cluster space, and then register all the other
+ * instances here. To do that the node must be writable. It should have
+ * read_only = false, connection quorum satisfied, and be a Raft leader if Raft
+ * is enabled.
+ *
+ * To be elected a Raft leader it needs to perform election. But it can't be
+ * done before at least synchronous quorum of the replicas is bootstrapped. And
+ * they can't be bootstrapped because wait for a leader to initialize _cluster.
+ * Cyclic dependency.
+ *
+ * This is resolved by truncation of the election quorum to the number of
+ * registered replicas, if their count is less than synchronous quorum. That
+ * helps to elect a first leader.
+ *
+ * It may seem that the first node could just declare itself a leader and then
+ * strictly follow the protocol from now on, but that won't work, because if the
+ * first node will restart after it is booted, but before quorum of replicas is
+ * booted, the cluster will stuck again.
+ *
+ * The current solution is totally safe because
+ *
+ * - after all the cluster will have node count >= quorum, if user used a
+ *   correct config (God help him if he didn't);
+ *
+ * - synchronous replication quorum is untouched - it is not truncated. Only
+ *   leader election quorum is affected. So synchronous data won't be lost.
+ */
+static inline int
+raft_election_quorum(void)
+{
+	return MIN(replication_synchro_quorum, replicaset.registered_count);
+}
+
+/** Broadcast an event about this node changed its state to all relays. */
+static inline void
+raft_broadcast_new_state(void)
+{
+	struct raft_request req;
+	memset(&req, 0, sizeof(req));
+	req.term = raft.term;
+	req.state = raft.state;
+	raft_broadcast(&req);
+}
+
+/** Raft state machine methods. 'sm' stands for State Machine. */
+
+/**
+ * Start the state machine. When it is stopped, Raft state is updated and
+ * goes to WAL when necessary, but it does not affect the instance operation.
+ * For example, when Raft is stopped, the instance role does not affect whether
+ * it is writable.
+ */
+static void
+raft_sm_start(void);
+
+/**
+ * Stop the state machine. Now until Raft is re-enabled,
+ * - Raft stops affecting the instance operation;
+ * - this node can't become a leader;
+ * - this node can't vote.
+ */
+static void
+raft_sm_stop(void);
+
+/**
+ * When the instance is a follower but is allowed to be a leader, it will wait
+ * for death of the current leader to start new election.
+ */
+static void
+raft_sm_wait_leader_dead(void);
+
+/**
+ * If election is started by this node, or it voted for some other node started
+ * the election, and it can be a leader itself, it will wait until the current
+ * election times out. When it happens, the node will start new election.
+ */
+static void
+raft_sm_wait_election_end(void);
+
+/** Bump volatile term and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term);
+
+/** Bump volatile vote and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote);
+
+/**
+ * Bump term and vote for self immediately. After that is persisted, the
+ * election timeout will be activated. Unless during that nothing newer happens.
+ */
+static void
+raft_sm_schedule_new_election(void);
+
+/**
+ * The main trigger of Raft state machine - start new election when the current
+ * leader dies, or when there is no a leader and the previous election failed.
+ */
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+				 int events);
+
+/** Start Raft state flush to disk. */
+static void
+raft_sm_pause_and_dump(void);
+
+static void
+raft_sm_become_leader(void);
+
+static void
+raft_sm_follow_leader(uint32_t leader);
+
+static void
+raft_sm_become_candidate(void);
+
+static const char *
+raft_request_to_string(const struct raft_request *req)
+{
+	assert(req->term != 0);
+	int size = 1024;
+	char buf[1024];
+	char *pos = buf;
+	int rc = snprintf(pos, size, "{term: %llu", req->term);
+	assert(rc >= 0);
+	pos += rc;
+	size -= rc;
+	if (req->vote != 0) {
+		rc = snprintf(pos, size, ", vote: %u", req->vote);
+		assert(rc >= 0);
+		pos += rc;
+		size -= rc;
+	}
+	if (req->state != 0) {
+		rc = snprintf(pos, size, ", state: %s",
+			      raft_state_strs[req->state]);
+		assert(rc >= 0);
+		pos += rc;
+		size -= rc;
+	}
+	if (req->vclock != NULL) {
+		rc = snprintf(pos, size, ", vclock: %s",
+			      vclock_to_string(req->vclock));
+		assert(rc >= 0);
+		pos += rc;
+		size -= rc;
+	}
+	rc = snprintf(pos, size, "}");
+	assert(rc >= 0);
+	pos += rc;
+	return tt_cstr(buf, pos - buf);
+}
+
 void
 raft_process_recovery(const struct raft_request *req)
 {
-	if (req->term != 0)
+	say_verbose("RAFT: recover %s", raft_request_to_string(req));
+	if (req->term != 0) {
 		raft.term = req->term;
-	if (req->vote != 0)
+		raft.volatile_term = req->term;
+	}
+	if (req->vote != 0) {
 		raft.vote = req->vote;
+		raft.volatile_vote = req->vote;
+	}
 	/*
 	 * Role is never persisted. If recovery is happening, the
 	 * node was restarted, and the former role can be false
@@ -80,34 +329,526 @@ raft_process_recovery(const struct raft_request *req)
 void
 raft_process_msg(const struct raft_request *req, uint32_t source)
 {
-	(void)source;
-	if (req->term > raft.term) {
-		// Update term.
-		// The logic will be similar, but the code
-		// below is for testing purposes.
-		raft.term = req->term;
+	say_info("RAFT: message %s from %u", raft_request_to_string(req),
+		 source);
+	assert(source > 0);
+	assert(source != instance_id);
+	/* Outdated request. */
+	if (req->term < raft.volatile_term)
+		return;
+
+	enum raft_state old_state = raft.state;
+
+	/* Term bump. */
+	if (req->term > raft.volatile_term)
+		raft_sm_schedule_new_term(req->term);
+
+	/* Vote request during the on-going election. */
+	if (req->vote != 0) {
+		switch (raft.state) {
+		case RAFT_STATE_FOLLOWER:
+			/*
+			 * Can't respond on vote requests when Raft is disabled.
+			 */
+			if (!raft.is_enabled) {
+				say_info("RAFT: vote request is skipped - RAFT "
+					 "is disabled");
+				break;
+			}
+			/* Check if already voted in this term. */
+			if (raft.volatile_vote != 0) {
+				say_info("RAFT: vote request is skipped - "
+					 "already voted in this term");
+				break;
+			}
+			/* Not a candidate. Can't accept votes. */
+			if (req->vote == instance_id) {
+				say_info("RAFT: vote request is skipped - "
+					 "can't accept vote for self if not a "
+					 "candidate");
+				break;
+			}
+			/* Can't vote for too old or incomparable nodes. */
+			if (!raft_can_vote_for(req->vclock)) {
+				say_info("RAFT: vote request is skipped - "
+					 "the vclock is not acceptable = %s",
+					 vclock_to_string(req->vclock));
+				break;
+			}
+			/*
+			 * Check if somebody is asking to vote for a third
+			 * node - nope. Make votes only when asked directly by
+			 * the new candidate. However that restriction may be
+			 * relaxed in future, if can be proven to be safe.
+			 */
+			if (req->vote != source) {
+				say_info("RAFT: vote request is skipped - "
+					 "indirect votes are not allowed");
+				break;
+			}
+			/*
+			 * Either the term is new, or didn't vote in the current
+			 * term yet. Anyway can vote now.
+			 */
+			raft_sm_schedule_new_vote(req->vote);
+			break;
+		case RAFT_STATE_CANDIDATE:
+			/* Check if this is a vote for a competing candidate. */
+			if (req->vote != instance_id) {
+				say_info("RAFT: vote request is skipped - "
+					 "competing candidate");
+				break;
+			}
+			/*
+			 * Vote for self was requested earlier in this round,
+			 * and now was answered by some other instance.
+			 */
+			assert(raft.volatile_vote == instance_id);
+			int quorum = raft_election_quorum();
+			bool was_set = bit_set(&raft.vote_mask, source);
+			raft.vote_count += !was_set;
+			if (raft.vote_count < quorum) {
+				say_info("RAFT: accepted vote for self, vote "
+					 "count is %d/%d", raft.vote_count,
+					 quorum);
+				break;
+			}
+			raft_sm_become_leader();
+			break;
+		case RAFT_STATE_LEADER:
+			/*
+			 * If the node is still a leader, it ignores all votes.
+			 * Indeed, if the received vote would be from a new
+			 * term, the node would bump its own term and would
+			 * enter the follower state by now. If the vote is from
+			 * the current term, then the leader can freely ignore
+			 * it.
+			 */
+			break;
+		default:
+			unreachable();
+		}
+	}
+	/*
+	 * If the node does not claim to be a leader, nothing interesting. Terms
+	 * and votes are already handled.
+	 */
+	if (req->state != RAFT_STATE_LEADER)
+		goto end;
+	/* The node is a leader, but it is already known. */
+	if (source == raft.leader)
+		goto end;
+	/*
+	 * XXX: A message from a conflicting leader. Split brain, basically.
+	 * Need to decide what to do. Current solution is to do nothing. In
+	 * future either this node should try to become a leader, or should stop
+	 * all writes and require manual intervention.
+	 */
+	if (raft.leader != 0) {
+		say_warn("RAFT: conflicting leader detected in one term - "
+			 "known is %u, received %u", raft.leader, source);
+		goto end;
+	}
+
+	/* New leader was elected. */
+	raft_sm_follow_leader(source);
+end:
+	if (raft.state != old_state) {
+		/*
+		 * New term and vote are not broadcasted yet. Firstly their WAL
+		 * write should be finished. But the state is volatile. It is ok
+		 * to broadcast it now.
+		 */
+		raft_broadcast_new_state();
+	}
+}
+
+void
+raft_process_heartbeat(uint32_t source)
+{
+	/*
+	 * When not a candidate - don't wait for anything. Therefore do not care
+	 * about the leader being dead.
+	 */
+	if (!raft.is_candidate)
+		return;
+	/* Don't care about heartbeats when this node is a leader itself. */
+	if (raft.state == RAFT_STATE_LEADER)
+		return;
+	/* Not interested in heartbeats from not a leader. */
+	if (raft.leader != source)
+		return;
+	/*
+	 * The instance currently is busy with writing something on disk. Can't
+	 * react to heartbeats.
+	 */
+	if (raft.is_write_in_progress)
+		return;
+	/*
+	 * XXX: it may be expensive to reset the timer like that. It may be less
+	 * expensive to let the timer work, and remember last timestamp when
+	 * anything was heard from the leader. Then in the timer callback check
+	 * the timestamp, and restart the timer, if it is fine.
+	 */
+	assert(ev_is_active(&raft.timer));
+	ev_timer_stop(loop(), &raft.timer);
+	raft_sm_wait_leader_dead();
+}
+
+/** Wakeup Raft state writer fiber waiting for WAL write end. */
+static void
+raft_write_cb(struct journal_entry *entry)
+{
+	fiber_wakeup(entry->complete_data);
+}
+
+/** Synchronously write a Raft request into WAL. */
+static void
+raft_write_request(const struct raft_request *req)
+{
+	assert(raft.is_write_in_progress);
+	/*
+	 * Vclock is never persisted by Raft. It is used only to
+	 * be sent to network when vote for self.
+	 */
+	assert(req->vclock == NULL);
+	/*
+	 * State is not persisted. That would be strictly against Raft protocol.
+	 * The reason is that it does not make much sense - even if the node is
+	 * a leader now, after the node is restarted, there will be another
+	 * leader elected by that time likely.
+	 */
+	assert(req->state == 0);
+	struct region *region = &fiber()->gc;
+	uint32_t svp = region_used(region);
+	struct xrow_header row;
+	char buf[sizeof(struct journal_entry) +
+		 sizeof(struct xrow_header *)];
+	struct journal_entry *entry = (struct journal_entry *)buf;
+	entry->rows[0] = &row;
+
+	if (xrow_encode_raft(&row, region, req) != 0)
+		goto fail;
+	journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
+			     fiber());
+
+	if (journal_write(entry) != 0 || entry->res < 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		goto fail;
 	}
-	if (req->vote > 0) {
-		// Check whether the vote's for us.
+
+	raft_broadcast(req);
+
+	region_truncate(region, svp);
+	return;
+fail:
+	/*
+	 * XXX: the stub is supposed to be removed once it is defined what to do
+	 * when a raft request WAL write fails.
+	 */
+	panic("Could not write a raft request to WAL\n");
+}
+
+static void
+raft_worker_handle_io(void)
+{
+	assert(raft.is_write_in_progress);
+	/* During write Raft can't be anything but a follower. */
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	struct raft_request req;
+	uint64_t old_term = raft.term;
+	uint32_t old_vote = raft.vote;
+	enum raft_state old_state = raft.state;
+
+	if (raft_is_fully_on_disk()) {
+end_dump:
+		raft.is_write_in_progress = false;
+		/*
+		 * The state machine is stable. Can see now, to what state to
+		 * go.
+		 */
+		if (!raft.is_candidate) {
+			/*
+			 * If not a candidate, can't do anything except vote for
+			 * somebody (if Raft is enabled). Nothing to do except
+			 * staying a follower without timeouts.
+			 */
+		} else if (raft.leader != 0) {
+			/* There is a known leader. Wait until it is dead. */
+			raft_sm_wait_leader_dead();
+		} else if (raft.vote == instance_id) {
+			/* Just wrote own vote. */
+			if (raft_election_quorum() == 1)
+				raft_sm_become_leader();
+			else
+				raft_sm_become_candidate();
+		} else if (raft.vote != 0) {
+			/*
+			 * Voted for some other node. Wait if it manages to
+			 * become a leader.
+			 */
+			raft_sm_wait_election_end();
+		} else {
+			/* No leaders, no votes. */
+			raft_sm_schedule_new_election();
+		}
+	} else {
+		memset(&req, 0, sizeof(req));
+		assert(raft.volatile_term >= raft.term);
+		/* Term is written always. */
+		req.term = raft.volatile_term;
+		if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
+			req.vote = raft.volatile_vote;
+
+		raft_write_request(&req);
+		say_verbose("RAFT: persist and apply state %s",
+			    raft_request_to_string(&req));
+
+		assert(req.term >= raft.term);
+		if (req.term > raft.term) {
+			raft.term = req.term;
+			raft.vote = 0;
+		}
+		if (req.vote != 0) {
+			assert(raft.vote == 0);
+			raft.vote = req.vote;
+		}
+		if (raft_is_fully_on_disk())
+			goto end_dump;
+	}
+
+	memset(&req, 0, sizeof(req));
+	/* Term is encoded always. */
+	req.term = raft.term;
+	bool has_changes = old_term != raft.term;
+	if (raft.vote != 0 && old_vote != raft.vote) {
+		req.vote = raft.vote;
+		/*
+		 * When vote for self, need to send current vclock too. Two
+		 * reasons for that:
+		 *
+		 * - nodes need to vote for the instance containing the newest
+		 *   data. So as not to loose it, because some of it may be
+		 *   confirmed by the synchronous replication;
+		 *
+		 * - replication is basically stopped during election. Other
+		 *   nodes can't learn vclock of this instance through regular
+		 *   replication.
+		 */
+		if (raft.vote == instance_id)
+			req.vclock = &replicaset.vclock;
+		has_changes = true;
 	}
-	switch (req->state) {
-	case RAFT_STATE_FOLLOWER:
-	    break;
-	case RAFT_STATE_CANDIDATE:
-	    // Perform voting logic.
-	    break;
-	case RAFT_STATE_LEADER:
-	    // Switch to a new leader.
-	    break;
-	default:
-	    break;
+	if (raft.state != old_state) {
+		req.state = raft.state;
+		has_changes = true;
 	}
+	if (has_changes)
+		raft_broadcast(&req);
+}
+
+static int
+raft_worker_f(va_list args)
+{
+	(void)args;
+	while (!fiber_is_cancelled()) {
+		if (!raft.is_write_in_progress)
+			goto idle;
+		raft_worker_handle_io();
+		if (!raft.is_write_in_progress)
+			goto idle;
+		fiber_sleep(0);
+		continue;
+	idle:
+		assert(raft_is_fully_on_disk());
+		fiber_yield();
+	}
+	return 0;
+}
+
+static void
+raft_sm_pause_and_dump(void)
+{
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	if (raft.is_write_in_progress)
+		return;
+	ev_timer_stop(loop(), &raft.timer);
+	raft.is_write_in_progress = true;
+	if (raft.worker == NULL)
+		raft.worker = fiber_new("raft_worker", raft_worker_f);
+	fiber_wakeup(raft.worker);
+}
+
+static void
+raft_sm_become_leader(void)
+{
+	assert(raft.state != RAFT_STATE_LEADER);
+	say_info("RAFT: enter leader state with quorum %d",
+		 raft_election_quorum());
+	assert(raft.leader == 0);
+	assert(raft.is_candidate);
+	assert(!raft.is_write_in_progress);
+	raft.state = RAFT_STATE_LEADER;
+	raft.leader = instance_id;
+	ev_timer_stop(loop(), &raft.timer);
+	/* Make read-write (if other subsystems allow that. */
+	box_update_ro_summary();
+}
+
+static void
+raft_sm_follow_leader(uint32_t leader)
+{
+	say_info("RAFT: leader is %u, follow", leader);
+	assert(raft.state != RAFT_STATE_LEADER);
+	assert(raft.leader == 0);
+	raft.state = RAFT_STATE_FOLLOWER;
+	raft.leader = leader;
+	if (!raft.is_write_in_progress) {
+		ev_timer_stop(loop(), &raft.timer);
+		raft_sm_wait_leader_dead();
+	}
+}
+
+static void
+raft_sm_become_candidate(void)
+{
+	say_info("RAFT: enter candidate state with 1 self vote");
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	assert(raft.leader == 0);
+	assert(raft.is_candidate);
+	assert(!raft.is_write_in_progress);
+	assert(raft_election_quorum() > 1);
+	raft.state = RAFT_STATE_CANDIDATE;
+	raft.vote_count = 1;
+	raft.vote_mask = 0;
+	bit_set(&raft.vote_mask, instance_id);
+	raft_sm_wait_election_end();
+}
+
+static void
+raft_sm_schedule_new_term(uint64_t new_term)
+{
+	say_info("RAFT: bump term to %llu, follow", new_term);
+	assert(new_term > raft.volatile_term);
+	assert(raft.volatile_term >= raft.term);
+	raft.volatile_term = new_term;
+	/* New terms means completely new Raft state. */
+	raft.volatile_vote = 0;
+	raft.leader = 0;
+	raft.state = RAFT_STATE_FOLLOWER;
+	box_update_ro_summary();
+	raft_sm_pause_and_dump();
+}
+
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote)
+{
+	say_info("RAFT: vote for %u, follow", new_vote, raft.volatile_term);
+	assert(raft.volatile_vote == 0);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	raft.volatile_vote = new_vote;
+	raft_sm_pause_and_dump();
+}
+
+static void
+raft_sm_schedule_new_election(void)
+{
+	say_info("RAFT: begin new election round");
+	assert(raft_is_fully_on_disk());
+	assert(raft.is_candidate);
+	/* Everyone is a follower until its vote for self is persisted. */
+	raft_sm_schedule_new_term(raft.term + 1);
+	raft_sm_schedule_new_vote(instance_id);
+	box_update_ro_summary();
+}
+
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+				 int events)
+{
+	assert(timer == &raft.timer);
+	(void)events;
+	ev_timer_stop(loop, timer);
+	raft_sm_schedule_new_election();
+}
+
+static void
+raft_sm_wait_leader_dead(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!raft.is_write_in_progress);
+	assert(raft.is_candidate);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	assert(raft.leader != 0);
+	double death_timeout = replication_disconnect_timeout();
+	ev_timer_set(&raft.timer, death_timeout, death_timeout);
+	ev_timer_start(loop(), &raft.timer);
+}
+
+static void
+raft_sm_wait_election_end(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!raft.is_write_in_progress);
+	assert(raft.is_candidate);
+	assert(raft.state == RAFT_STATE_FOLLOWER ||
+	       (raft.state == RAFT_STATE_CANDIDATE &&
+		raft.volatile_vote == instance_id));
+	assert(raft.leader == 0);
+	double election_timeout = raft.election_timeout +
+				  raft_new_random_election_shift();
+	ev_timer_set(&raft.timer, election_timeout, election_timeout);
+	ev_timer_start(loop(), &raft.timer);
+}
+
+static void
+raft_sm_start(void)
+{
+	say_info("RAFT: start state machine");
+	assert(!ev_is_active(&raft.timer));
+	assert(!raft.is_write_in_progress);
+	assert(!raft.is_enabled);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	raft.is_enabled = true;
+	raft.is_candidate = raft.is_cfg_candidate;
+	if (!raft.is_candidate)
+		/* Nop. */;
+	else if (raft.leader != 0)
+		raft_sm_wait_leader_dead();
+	else
+		raft_sm_schedule_new_election();
+	box_update_ro_summary();
+	/*
+	 * When Raft is enabled, send the complete state. Because
+	 * it wasn't sent in disabled state.
+	 */
+	struct raft_request req;
+	raft_serialize_for_network(&req, NULL);
+	raft_broadcast(&req);
+}
+
+static void
+raft_sm_stop(void)
+{
+	say_info("RAFT: stop state machine");
+	assert(raft.is_enabled);
+	raft.is_enabled = false;
+	raft.is_candidate = false;
+	if (raft.state == RAFT_STATE_LEADER)
+		raft.leader = 0;
+	raft.state = RAFT_STATE_FOLLOWER;
+	box_update_ro_summary();
 }
 
 void
 raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
 {
 	memset(req, 0, sizeof(*req));
+	/*
+	 * Volatile state is never used for any communications.
+	 * Use only persisted state.
+	 */
 	req->term = raft.term;
 	req->vote = raft.vote;
 	req->state = raft.state;
@@ -128,29 +869,90 @@ raft_serialize_for_disk(struct raft_request *req)
 void
 raft_cfg_is_enabled(bool is_enabled)
 {
-	raft.is_enabled = is_enabled;
+	if (is_enabled == raft.is_enabled)
+		return;
+
+	if (!is_enabled)
+		raft_sm_stop();
+	else
+		raft_sm_start();
 }
 
 void
 raft_cfg_is_candidate(bool is_candidate)
 {
-	raft.is_candidate = is_candidate;
+	bool old_is_candidate = raft.is_candidate;
+	raft.is_cfg_candidate = is_candidate;
+	raft.is_candidate = is_candidate && raft.is_enabled;
+	if (raft.is_candidate == old_is_candidate)
+		return;
+
+	if (raft.is_candidate) {
+		assert(raft.state == RAFT_STATE_FOLLOWER);
+		/*
+		 * If there is an on-going WAL write, it means there was some
+		 * node who sent newer data to this node.
+		 */
+		if (raft.leader == 0 && raft_is_fully_on_disk())
+			raft_sm_schedule_new_election();
+	} else if (raft.state != RAFT_STATE_FOLLOWER) {
+		if (raft.state == RAFT_STATE_LEADER)
+			raft.leader = 0;
+		raft.state = RAFT_STATE_FOLLOWER;
+		raft_broadcast_new_state();
+	}
+	box_update_ro_summary();
 }
 
 void
 raft_cfg_election_timeout(double timeout)
 {
+	if (timeout == raft.election_timeout)
+		return;
+
 	raft.election_timeout = timeout;
+	if (raft.vote != 0 && raft.leader == 0 && raft.is_candidate) {
+		assert(ev_is_active(&raft.timer));
+		double timeout = ev_timer_remaining(loop(), &raft.timer) -
+				 raft.timer.at + raft.election_timeout;
+		ev_timer_stop(loop(), &raft.timer);
+		ev_timer_set(&raft.timer, timeout, timeout);
+		ev_timer_start(loop(), &raft.timer);
+	}
 }
 
 void
 raft_cfg_election_quorum(void)
 {
+	if (raft.state != RAFT_STATE_CANDIDATE ||
+	    raft.state == RAFT_STATE_LEADER)
+		return;
+	if (raft.vote_count < raft_election_quorum())
+		return;
+	/*
+	 * The node is a candidate. It means its state if fully synced with
+	 * disk. Otherwise it would be a follower.
+	 */
+	assert(!raft.is_write_in_progress);
+	raft.state = RAFT_STATE_LEADER;
+	raft.leader = instance_id;
+	raft_broadcast_new_state();
+	box_update_ro_summary();
 }
 
 void
 raft_cfg_death_timeout(void)
 {
+	if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
+	    raft.leader != 0) {
+		assert(ev_is_active(&raft.timer));
+		double death_timeout = replication_disconnect_timeout();
+		double timeout = ev_timer_remaining(loop(), &raft.timer) -
+				 raft.timer.at + death_timeout;
+		ev_timer_stop(loop(), &raft.timer);
+		ev_timer_set(&raft.timer, timeout, timeout);
+		ev_timer_start(loop(), &raft.timer);
+	}
 }
 
 void
@@ -163,3 +965,9 @@ raft_broadcast(const struct raft_request *req)
 		}
 	}
 }
+
+void
+raft_init(void)
+{
+	ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index db64cf933..23aedfe10 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -31,34 +31,140 @@
  */
 #include <stdint.h>
 #include <stdbool.h>
+#include "tarantool_ev.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif
 
+/**
+ * This is an implementation of Raft leader election protocol, separated from
+ * synchronous replication part.
+ *
+ * The protocol describes an algorithm which helps to elect a single leader in
+ * the cluster, which is supposed to handle write requests. And re-elect a new
+ * leader, when the current leader dies.
+ *
+ * The implementation follows the protocol to the letter except a few important
+ * details.
+ *
+ * Firstly, the original Raft assumes, that all nodes share the same log record
+ * numbers. In Tarantool they are called LSNs. But in case of Tarantool each
+ * node has its own LSN in its own component of vclock. That makes the election
+ * messages a bit heavier, because the nodes need to send and compare complete
+ * vclocks of each other instead of a single number like in the original Raft.
+ * But logic becomes simpler. Because in the original Raft there is a problem of
+ * uncertainty about what to do with records of an old leader right after a new
+ * leader is elected. They could be rolled back or confirmed depending on
+ * circumstances. The issue disappears when vclock is used.
+ *
+ * Secondly, leader election works differently during cluster bootstrap, until
+ * number of bootstrapped replicas becomes >= election quorum. That arises from
+ * specifics of replicas bootstrap and order of systems initialization. In
+ * short: during bootstrap a leader election may use a smaller election quorum
+ * than the configured one. See more details in the code.
+ */
+
+struct fiber;
 struct raft_request;
 struct vclock;
 
 enum raft_state {
+	/**
+	 * Can't write. Can only accept data from a leader. Node in this state
+	 * either monitors an existing leader, or there is an on-going election
+	 * and the node voted for another node, or it can't be a candidate and
+	 * does not do anything.
+	 */
 	RAFT_STATE_FOLLOWER = 1,
+	/**
+	 * The node can't write. There is an active election, in which the node
+	 * voted for self. Now it waits for election outcome.
+	 */
 	RAFT_STATE_CANDIDATE = 2,
+	/** Election was successful. The node accepts write requests. */
 	RAFT_STATE_LEADER = 3,
 };
 
 extern const char *raft_state_strs[];
 
 struct raft {
+	/** Instance ID of leader of the current term. */
 	uint32_t leader;
+	/** State of the instance. */
 	enum raft_state state;
+	/**
+	 * Volatile part of the Raft state, whose WAL write may be still
+	 * in-progress, and yet the state may be already used. Volatile state is
+	 * never sent to anywhere, but the state machine makes decisions based
+	 * on it. That is vital.
+	 * As an example, volatile vote needs to be used to reject votes inside
+	 * a term, where the instance already voted (even if the vote WAL write
+	 * is not finished yet). Otherwise the instance would try to write
+	 * several votes inside one term.
+	 */
+	uint64_t volatile_term;
+	uint32_t volatile_vote;
+	/**
+	 * Flag whether Raft is enabled. When disabled, it still persists terms
+	 * so as to quickly enroll into the cluster when (if) it is enabled. In
+	 * everything else disabled Raft does not affect instance work.
+	 */
 	bool is_enabled;
+	/**
+	 * Flag whether the node can become a leader. It is an accumulated value
+	 * of configuration options Raft enabled and Raft candidate. If at least
+	 * one is false - the instance is not a candidate.
+	 */
 	bool is_candidate;
+	/** Flag whether the instance is allowed to be a leader. */
+	bool is_cfg_candidate;
+	/**
+	 * Flag whether Raft currently tries to write something into WAL. It
+	 * happens asynchronously, not right after Raft state is updated.
+	 */
+	bool is_write_in_progress;
+	/**
+	 * Persisted Raft state. These values are used when need to tell current
+	 * Raft state to other nodes.
+	 */
 	uint64_t term;
 	uint32_t vote;
+	/**
+	 * Bit 1 on position N means that a vote from instance with ID = N was
+	 * obtained.
+	 */
+	uint32_t vote_mask;
+	/** Number of votes for this instance. Valid only in candidate state. */
+	int vote_count;
+	/** State machine timed event trigger. */
+	struct ev_timer timer;
+	/** Worker fiber to execute blocking tasks like IO. */
+	struct fiber *worker;
+	/** Configured election timeout in seconds. */
 	double election_timeout;
 };
 
 extern struct raft raft;
 
+/**
+ * A flag whether the instance is read-only according to Raft. Even if Raft
+ * allows writes though, it does not mean the instance is writable. It can be
+ * affected by box.cfg.read_only, connection quorum.
+ */
+static inline bool
+raft_is_ro(void)
+{
+	return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
+}
+
+/** See if the instance can accept rows from an instance with the given ID. */
+static inline bool
+raft_is_source_allowed(uint32_t source_id)
+{
+	return !raft.is_enabled || raft.leader == source_id;
+}
+
 /** Check if Raft is enabled. */
 static inline bool
 raft_is_enabled(void)
@@ -78,6 +184,13 @@ raft_process_recovery(const struct raft_request *req);
 void
 raft_process_msg(const struct raft_request *req, uint32_t source);
 
+/**
+ * Process a heartbeat message from an instance with the given ID. It is used to
+ * watch leader's health and start election when necessary.
+ */
+void
+raft_process_heartbeat(uint32_t source);
+
 /** Configure whether Raft is enabled. */
 void
 raft_cfg_is_enabled(bool is_enabled);
@@ -130,6 +243,10 @@ raft_serialize_for_disk(struct raft_request *req);
 void
 raft_broadcast(const struct raft_request *req);
 
+/** Initialize Raft global data structures. */
+void
+raft_init(void);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 74581db9c..d63711600 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
+/**
+ * Recreate recovery cursor from the last confirmed point. That is
+ * used by Raft, when the node becomes a leader. It may happen,
+ * that it already sent some data to other nodes as a follower,
+ * and they ignored the data. Now when the node is a leader, it
+ * should send the not confirmed data again. Otherwise the cluster
+ * will stuck, or worse - the newer data would be sent without the
+ * older sent but ignored data.
+ */
+static void
+relay_restart_recovery(struct relay *relay)
+{
+	recovery_delete(relay->r);
+	relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
+	recover_remaining_wals(relay->r, &relay->stream, NULL, true);
+}
+
 struct relay_raft_msg {
 	struct cmsg base;
 	struct cmsg_hop route;
@@ -786,7 +803,14 @@ relay_raft_msg_push(struct cmsg *base)
 	struct xrow_header row;
 	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
 	try {
+		/*
+		 * Send the message before restarting the recovery. Otherwise
+		 * all the rows would be sent from under a non-leader role and
+		 * would be ignored again.
+		 */
 		relay_send(msg->relay, &row);
+		if (msg->req.state == RAFT_STATE_LEADER)
+			relay_restart_recovery(msg->relay);
 	} catch (Exception *e) {
 		relay_set_error(msg->relay, e);
 		fiber_cancel(fiber());


More information about the Tarantool-patches mailing list