[Tarantool-patches] [PATCH 8/8] raft: state machine

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Sep 3 02:33:18 MSK 2020


First look at Raft state machine implementation.

The commit is a draft. It does not contain any tests, and is
unlikely to pass the existing ones. But it gives a picture of where
Raft is going.
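
For manual poking, the temporary Lua helpers are reworked: raft_vote
is dropped (voting is now driven by the state machine), and raft_get
now also exposes the volatile state. An illustrative session (the
field names follow lbox_raft_get() from this patch; the concrete
values, the YAML key order, and the box.internal path are assumptions
of this sketch):

    tarantool> box.internal.raft_get()
    ---
    - term: 2
      volatile_term: 2
      vote: 1
      volatile_vote: 1
      state: follower
      vote_count: 0
      is_write_in_progress: false
      is_candidate: true
    ...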
---
 src/box/applier.cc  |  18 +-
 src/box/box.cc      |   5 +-
 src/box/lua/misc.cc |  25 +-
 src/box/raft.c      | 645 +++++++++++++++++++++++++++++++++++++++++---
 src/box/raft.h      |  89 ++++++
 src/box/relay.cc    |  19 ++
 6 files changed, 746 insertions(+), 55 deletions(-)
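
A couple of notes for the review:

- Election timeout randomization: raft_new_random_election_shift()
  adds a roughly uniform random shift in [0, election_timeout * 0.1]
  seconds with millisecond granularity. For example, with the default
  election_timeout = 5 the effective timeout ends up somewhere in
  [5.0, 5.5] seconds.

- Once the volatile state is fully flushed to WAL, raft_sm_dump_step()
  decides what to do next: a non-candidate just stays a follower with
  no timers; if a leader is known, the node waits for the leader death
  timeout; if the node voted for itself, it becomes a leader right
  away when the quorum is 1, otherwise it becomes a candidate and
  waits for the election timeout; if it voted for another node, it
  waits for the election timeout; with no votes and no leader it
  starts a new election (bump the term, vote for self, persist both).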

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 8de2f799b..7486d9929 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -916,8 +916,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
  * Return 0 for success or -1 in case of an error.
  */
 static int
-applier_apply_tx(struct stailq *rows)
+applier_apply_tx(struct applier *applier, struct stailq *rows)
 {
+	/*
+	 * Rows received not directly from a leader are ignored. That is a
+	 * protection against the case when an old leader keeps sending data
+	 * around not knowing yet that it is not a leader anymore.
+	 *
+	 * XXX: it may be fine to apply leader transactions by looking at their
+	 * replica_id field and checking if it is equal to the leader id. That
+	 * can be investigated as an 'optimization'. Even though it may not
+	 * give anything, because it won't change the total number of rows
+	 * sent over the network anyway.
+	 */
+	if (!raft_is_source_allowed(applier->instance_id))
+		return 0;
 	struct xrow_header *first_row = &stailq_first_entry(rows,
 					struct applier_tx_row, next)->row;
 	struct xrow_header *last_row;
@@ -1257,6 +1270,7 @@ applier_subscribe(struct applier *applier)
 		struct xrow_header *first_row =
 			&stailq_first_entry(&rows, struct applier_tx_row,
 					    next)->row;
+		raft_process_heartbeat(applier->instance_id);
 		if (first_row->lsn == 0) {
 			if (unlikely(iproto_type_is_raft_request(
 							first_row->type))) {
@@ -1266,7 +1280,7 @@ applier_subscribe(struct applier *applier)
 			} else {
 				applier_signal_ack(applier);
 			}
-		} else if (applier_apply_tx(&rows) != 0) {
+		} else if (applier_apply_tx(applier, &rows) != 0) {
 			diag_raise();
 		}
 
diff --git a/src/box/box.cc b/src/box/box.cc
index 427b771b3..2e9c90310 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -157,7 +157,7 @@ void
 box_update_ro_summary(void)
 {
 	bool old_is_ro_summary = is_ro_summary;
-	is_ro_summary = is_ro || is_orphan;
+	is_ro_summary = is_ro || is_orphan || raft_is_ro();
 	/* In 99% nothing changes. Filter this out first. */
 	if (is_ro_summary == old_is_ro_summary)
 		return;
@@ -2646,6 +2646,7 @@ box_init(void)
 
 	txn_limbo_init();
 	sequence_init();
+	raft_init();
 }
 
 bool
@@ -2794,6 +2795,8 @@ box_cfg_xc(void)
 
 	if (!is_bootstrap_leader)
 		replicaset_sync();
+	else if (raft_is_enabled())
+		raft_bootstrap_leader();
 
 	/* box.cfg.read_only is not read yet. */
 	assert(box_is_ro());
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 98e98abe2..efbbcfd1f 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -256,24 +256,26 @@ lbox_raft_new_term(struct lua_State *L)
 	return 0;
 }
 
-static int
-lbox_raft_vote(struct lua_State *L)
-{
-	uint64_t vote_for = luaL_checkuint64(L, 1);
-	if (vote_for > UINT32_MAX)
-		return luaL_error(L, "Invalid vote");
-	raft_vote(vote_for);
-	return 0;
-}
-
 static int
 lbox_raft_get(struct lua_State *L)
 {
-	lua_createtable(L, 0, 2);
+	lua_createtable(L, 0, 8);
 	luaL_pushuint64(L, raft.term);
 	lua_setfield(L, -2, "term");
+	luaL_pushuint64(L, raft.volatile_term);
+	lua_setfield(L, -2, "volatile_term");
 	luaL_pushuint64(L, raft.vote);
 	lua_setfield(L, -2, "vote");
+	luaL_pushuint64(L, raft.volatile_vote);
+	lua_setfield(L, -2, "volatile_vote");
+	lua_pushstring(L, raft_state_strs[raft.state]);
+	lua_setfield(L, -2, "state");
+	lua_pushinteger(L, raft.vote_count);
+	lua_setfield(L, -2, "vote_count");
+	lua_pushboolean(L, raft.is_write_in_progress);
+	lua_setfield(L, -2, "is_write_in_progress");
+	lua_pushboolean(L, raft.is_candidate);
+	lua_setfield(L, -2, "is_candidate");
 	return 1;
 }
 
@@ -285,7 +287,6 @@ box_lua_misc_init(struct lua_State *L)
 		{"new_tuple_format", lbox_tuple_format_new},
 		/* Temporary helpers to sanity test raft persistency. */
 		{"raft_new_term", lbox_raft_new_term},
-		{"raft_vote", lbox_raft_vote},
 		{"raft_get", lbox_raft_get},
 		{NULL, NULL}
 	};
diff --git a/src/box/raft.c b/src/box/raft.c
index 1acffb677..6f2891291 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -36,6 +36,12 @@
 #include "small/region.h"
 #include "replication.h"
 #include "relay.h"
+#include "box.h"
+
+/**
+ * Maximal random deviation of the election timeout. From the configured value.
+ */
+#define RAFT_RANDOM_ELECTION_FACTOR 0.1
 
 const char *raft_state_strs[] = {
 	NULL,
@@ -48,19 +54,163 @@ const char *raft_state_strs[] = {
 struct raft raft = {
 	.leader = 0,
 	.state = RAFT_STATE_FOLLOWER,
+	.volatile_term = 1,
+	.volatile_vote = 0,
 	.is_enabled = false,
 	.is_candidate = false,
+	.is_cfg_candidate = false,
+	.is_write_in_progress = false,
 	.term = 1,
 	.vote = 0,
+	.vote_mask = 0,
+	.vote_count = 0,
+	.election_timeout = 5,
 };
 
+/**
+ * Check if Raft is completely synced with disk. Meaning all its critical values
+ * are in WAL. Only in that state the node can become a leader or a candidate.
+ * If the node has a not flushed data, it means either the term was bumped, or
+ * a new vote was made.
+ *
+ * In case of term bump it means either there is another node with a newer term,
+ * and this one should be a follower; or this node bumped the term itself along
+ * with making a vote to start a new election - then it is also a follower which
+ * will turn into a candidate when the flush is done.
+ *
+ * In case of a new not flushed vote it means either this node voted for some
+ * other node, and must be a follower; or it voted or self, and also must be a
+ * follower, but will become a candidate when the flush is done.
+ *
+ * In total - when something is not synced with disk, the instance is a follower
+ * in any case.
+ */
+static bool
+raft_is_fully_on_disk(void)
+{
+	return raft.volatile_term == raft.term &&
+	       raft.volatile_vote == raft.vote;
+}
+
+/**
+ * The Raft protocol says that the election timeout should be a bit randomized
+ * so that the nodes don't start an election at the same time and end up with
+ * nobody having a quorum. This implementation randomizes the election timeout
+ * by adding {election timeout * random factor} value, where the factor is a
+ * constant floating point value > 0.
+ */
+static inline double
+raft_new_random_election_shift(void)
+{
+	double timeout = raft.election_timeout;
+	/* Translate to ms. */
+	uint32_t rand_part =
+		(uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
+	if (rand_part == 0)
+		rand_part = 1;
+	/*
+	 * XXX: this is not giving a good distribution, but it is not so trivial
+	 * to implement a correct random value generator. There is a task to
+	 * unify all such places. Not critical here.
+	 */
+	rand_part = rand() % (rand_part + 1);
+	return rand_part / 1000.0;
+}
+
+/**
+ * Raft says that during an election node1 can vote for node2, if node2 has a
+ * bigger term, or has the same term but a longer log. In case of Tarantool it
+ * means the node2 vclock should be >= the node1 vclock, in all components. It
+ * is not enough to compare only one component. At least because there may be
+ * no previous leader when the election happens for the first time. Or a node
+ * could restart and forget who the previous leader was.
+ */
+static inline bool
+raft_can_vote_for(const struct vclock *v)
+{
+	if (v == NULL)
+		return false;
+	int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
+	return cmp == 0 || cmp == 1;
+}
+
+/** Broadcast an event about this node's state change to all relays. */
+static inline void
+raft_broadcast_new_state(void)
+{
+	struct raft_request req;
+	memset(&req, 0, sizeof(req));
+	req.term = raft.term;
+	req.state = raft.state;
+	raft_broadcast(&req);
+}
+
+/** Raft state machine methods. 'sm' stands for State Machine. */
+
+/**
+ * Start the state machine. When it is stopped, Raft state is updated and
+ * goes to WAL when necessary, but it does not affect the instance operation.
+ * For example, when Raft is stopped, the instance role does not affect whether
+ * it is writable.
+ */
+static void
+raft_sm_start(void);
+
+/**
+ * Stop the state machine.
+ * - Raft stops affecting the instance operation;
+ * - this node can't become a leader anymore;
+ * - this node can't vote.
+ */
+static void
+raft_sm_stop(void);
+
+/**
+ * When the instance is a follower but is allowed to be a leader, it will wait
+ * for the current leader to die in order to start a new election.
+ */
+static void
+raft_sm_wait_leader_dead(void);
+
+/**
+ * If an election is started by this node, or it voted for some other node that
+ * started the election, and it can be a leader itself, it will wait until the
+ * current election times out and then start a new election.
+ */
+static void
+raft_sm_wait_election_end(void);
+
+/** Bump volatile term and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term);
+
+/** Bump volatile vote and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote);
+
+/** Bump volatile term, vote for self, and schedule their flush to disk. */
+static void
+raft_sm_schedule_new_election(void);
+
+/**
+ * The main trigger of the Raft state machine - start a new election when the
+ * leader dies, or when there is no leader and the previous election failed.
+ */
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+				 int events);
+
 void
 raft_process_recovery(const struct raft_request *req)
 {
-	if (req->term != 0)
+	if (req->term != 0) {
 		raft.term = req->term;
-	if (req->vote != 0)
+		raft.volatile_term = req->term;
+	}
+	if (req->vote != 0) {
 		raft.vote = req->vote;
+		raft.volatile_vote = req->vote;
+	}
 	/*
 	 * Role is never persisted. If recovery is happening, the
 	 * node was restarted, and the former role can be false
@@ -80,34 +230,136 @@ raft_process_recovery(const struct raft_request *req)
 void
 raft_process_msg(const struct raft_request *req, uint32_t source)
 {
-	(void)source;
-	if (req->term > raft.term) {
-		// Update term.
-		// The logic will be similar, but the code
-		// below is for testing purposes.
-		raft.term = req->term;
-	}
-	if (req->vote > 0) {
-		// Check whether the vote's for us.
+	assert(source > 0);
+	assert(source != instance_id);
+	/* Outdated request. */
+	if (req->term < raft.volatile_term)
+		return;
+
+	enum raft_state old_state = raft.state;
+
+	/* Term bump. */
+	if (req->term > raft.volatile_term)
+		raft_sm_schedule_new_term(req->term);
+
+	/* Vote request during the on-going election. */
+	if (req->vote != 0) {
+		switch (raft.state) {
+		case RAFT_STATE_FOLLOWER:
+		case RAFT_STATE_LEADER:
+			/*
+			 * Can't respond to vote requests when Raft is disabled.
+			 */
+			if (!raft.is_enabled)
+				break;
+			/* Check if already voted in this term. */
+			if (raft.volatile_vote != 0)
+				break;
+			/* Not a candidate. Can't accept votes. */
+			if (req->vote == instance_id)
+				break;
+			/* Can't vote for too old or incomparable nodes. */
+			if (!raft_can_vote_for(req->vclock))
+				break;
+			/*
+			 * Either the term is new, or this node didn't vote in
+			 * the current term yet. Either way it can vote now.
+			 */
+			raft.state = RAFT_STATE_FOLLOWER;
+			raft_sm_schedule_new_vote(req->vote);
+			break;
+		case RAFT_STATE_CANDIDATE:
+			/* Check if this is a vote for a competing candidate. */
+			if (req->vote != instance_id)
+				break;
+			/*
+			 * Vote for self was requested earlier in this round,
+			 * and now was answered by some other instance.
+			 */
+			assert(raft.volatile_vote == instance_id);
+			bool was_set = bit_set(&raft.vote_mask, source);
+			raft.vote_count += !was_set;
+			if (raft.vote_count < replication_synchro_quorum)
+				break;
+			raft.state = RAFT_STATE_LEADER;
+			raft.leader = instance_id;
+			break;
+		default:
+			unreachable();
+		}
 	}
-	switch (req->state) {
-	case RAFT_STATE_FOLLOWER:
-	    break;
-	case RAFT_STATE_CANDIDATE:
-	    // Perform voting logic.
-	    break;
-	case RAFT_STATE_LEADER:
-	    // Switch to a new leader.
-	    break;
-	default:
-	    break;
+	/*
+	 * If the node does not claim to be a leader, nothing interesting. Terms
+	 * and votes are already handled.
+	 */
+	if (req->state != RAFT_STATE_LEADER)
+		goto end;
+	/* The node is a leader, but it is already known. */
+	if (source == raft.leader)
+		goto end;
+	/*
+	 * XXX: A message from a conflicting leader. Split brain, basically.
+	 * Need to decide what to do. Current solution is to do nothing. In
+	 * future either this node should try to become a leader, or should stop
+	 * all writes and require manual intervention.
+	 */
+	if (raft.leader != 0)
+		goto end;
+
+	/* New leader was elected. */
+	raft.state = RAFT_STATE_FOLLOWER;
+	raft.leader = source;
+end:
+	if (raft.state != old_state) {
+		/*
+		 * If the node stopped being a leader, it should become
+		 * read-only. If it became a leader, it should become
+		 * read-write (if other subsystems also allow read-write).
+		 */
+		box_update_ro_summary();
+		/*
+		 * The new term and vote are not broadcast yet. First their WAL
+		 * write should be finished. But the state is volatile, so it
+		 * is ok to broadcast it now.
+		 */
+		raft_broadcast_new_state();
 	}
 }
 
+void
+raft_process_heartbeat(uint32_t source)
+{
+	/*
+	 * When not a candidate, there is nothing to wait for, so the node does
+	 * not care whether the leader is dead.
+	 */
+	if (!raft.is_candidate)
+		return;
+	/* Don't care about heartbeats when this node is a leader itself. */
+	if (raft.state == RAFT_STATE_LEADER)
+		return;
+	/* Not interested in heartbeats from non-leader nodes. */
+	if (raft.leader != source)
+		return;
+	/*
+	 * XXX: it may be expensive to reset the timer like that. It may be less
+	 * expensive to let the timer run, and remember the last timestamp when
+	 * anything was heard from the leader. Then in the timer callback check
+	 * the timestamp, and restart the timer, if it is fine.
+	 */
+	assert(ev_is_active(&raft.timer));
+	ev_timer_stop(loop(), &raft.timer);
+	raft_sm_wait_leader_dead();
+}
+
 void
 raft_serialize(struct raft_request *req, struct vclock *vclock)
 {
 	memset(req, 0, sizeof(*req));
+	/*
+	 * Volatile state is never used for any communications.
+	 * Use only persisted state.
+	 */
 	req->term = raft.term;
 	req->vote = raft.vote;
 	req->state = raft.state;
@@ -117,15 +369,23 @@ raft_serialize(struct raft_request *req, struct vclock *vclock)
 	req->vclock = vclock;
 }
 
+/** Wake up the Raft state writer fiber waiting for the WAL write to end. */
 static void
 raft_write_cb(struct journal_entry *entry)
 {
 	fiber_wakeup(entry->complete_data);
 }
 
+/** Synchronously write a Raft request into WAL. */
 static void
 raft_write_request(const struct raft_request *req)
 {
+	assert(raft.is_write_in_progress);
+	/*
+	 * Vclock is never persisted by Raft. It is only sent over
+	 * the network when voting for self.
+	 */
+	assert(req->vclock == NULL);
 	struct region *region = &fiber()->gc;
 	uint32_t svp = region_used(region);
 	struct xrow_header row;
@@ -157,57 +417,343 @@ fail:
 	panic("Could not write a raft request to WAL\n");
 }
 
+/**
+ * Flush Raft state changes to WAL. The callback reschedules itself if more
+ * changes appear during the write.
+ */
+static void
+raft_sm_dump_step(ev_loop *loop, ev_check *watcher, int events)
+{
+	assert(watcher == &raft.io);
+	(void) events;
+	assert(raft.is_write_in_progress);
+	/* During write Raft can't be anything but a follower. */
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	struct raft_request req;
+	uint64_t old_term = raft.term;
+	uint32_t old_vote = raft.vote;
+	enum raft_state old_state = raft.state;
+
+	if (raft_is_fully_on_disk()) {
+end_dump:
+		raft.is_write_in_progress = false;
+		ev_check_stop(loop, watcher);
+		/*
+		 * The state machine is stable. Now it is possible to decide
+		 * which state to go to.
+		 */
+		if (!raft.is_candidate) {
+			/*
+			 * If not a candidate, this node can't do anything
+			 * except vote for somebody (if Raft is enabled). So
+			 * it just stays a follower without any timeouts.
+			 */
+		} else if (raft.leader != 0) {
+			/* There is a known leader. Wait until it is dead. */
+			raft_sm_wait_leader_dead();
+		} else if (raft.vote == instance_id) {
+			/* Just wrote own vote. */
+			if (replication_synchro_quorum == 1) {
+				raft.state = RAFT_STATE_LEADER;
+				raft.leader = instance_id;
+				/*
+				 * Make read-write (if other subsystems allow
+				 * that).
+				 */
+				box_update_ro_summary();
+			} else {
+				raft.state = RAFT_STATE_CANDIDATE;
+				raft.vote_count = 1;
+				raft.vote_mask = 0;
+				raft_sm_wait_election_end();
+			}
+		} else if (raft.vote != 0) {
+			/*
+			 * Voted for some other node. Wait to see if it manages
+			 * to become a leader.
+			 */
+			raft_sm_wait_election_end();
+		} else {
+			/* No leaders, no votes. */
+			raft_sm_schedule_new_election();
+		}
+	} else {
+		memset(&req, 0, sizeof(req));
+		assert(raft.volatile_term >= raft.term);
+		/* Term is written always. */
+		req.term = raft.volatile_term;
+		if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
+			req.vote = raft.volatile_vote;
+
+		raft_write_request(&req);
+
+		assert(req.term >= raft.term);
+		if (req.term > raft.term) {
+			raft.term = req.term;
+			raft.vote = 0;
+		}
+		if (req.vote != 0) {
+			assert(raft.vote == 0);
+			raft.vote = req.vote;
+		}
+		if (raft_is_fully_on_disk())
+			goto end_dump;
+	}
+
+	memset(&req, 0, sizeof(req));
+	/* Term is encoded always. */
+	req.term = raft.term;
+	bool has_changes = old_term != raft.term;
+	if (raft.vote != 0 && old_vote != raft.vote) {
+		req.vote = raft.vote;
+		/*
+		 * When voting for self, the current vclock is sent too. Two
+		 * reasons for that:
+		 *
+		 * - nodes need to vote for the instance containing the newest
+		 *   data. So as not to lose it, because some of it may be
+		 *   confirmed by the synchronous replication;
+		 *
+		 * - replication is basically stopped during election. Other
+		 *   nodes can't learn vclock of this instance through regular
+		 *   replication.
+		 */
+		if (raft.vote == instance_id)
+			req.vclock = &replicaset.vclock;
+		has_changes = true;
+	}
+	if (raft.state != old_state) {
+		req.state = raft.state;
+		has_changes = true;
+	}
+	if (has_changes)
+		raft_broadcast(&req);
+}
+
+/** Start Raft state flush to disk. */
+static void
+raft_sm_pause_and_dump(void)
+{
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	if (raft.is_write_in_progress)
+		return;
+	ev_timer_stop(loop(), &raft.timer);
+	ev_check_start(loop(), &raft.io);
+	raft.is_write_in_progress = true;
+}
+
+/** Bump term, reset Raft state, and persist that fact. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term)
+{
+	assert(new_term > raft.volatile_term);
+	assert(raft.volatile_term >= raft.term);
+	raft.volatile_term = new_term;
+	/* A new term means a completely new Raft state. */
+	raft.volatile_vote = 0;
+	raft.leader = 0;
+	raft.state = RAFT_STATE_FOLLOWER;
+	raft_sm_pause_and_dump();
+}
+
+/** Vote in the current term, and persist that fact. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote)
+{
+	assert(raft.volatile_vote == 0);
+	raft.volatile_vote = new_vote;
+	raft_sm_pause_and_dump();
+}
+
+/**
+ * Bump the term and vote for self immediately. After that is persisted, the
+ * election timeout will be activated, unless something newer happens first.
+ */
+static void
+raft_sm_schedule_new_election(void)
+{
+	assert(raft_is_fully_on_disk());
+	assert(raft.is_candidate);
+	assert(raft.leader == 0);
+	/* A node stays a follower until its vote for self is persisted. */
+	raft_sm_schedule_new_term(raft.term + 1);
+	raft_sm_schedule_new_vote(instance_id);
+	box_update_ro_summary();
+}
+
 void
 raft_new_term(uint64_t min_new_term)
 {
+	uint64_t new_term;
 	if (raft.term < min_new_term)
-		raft.term = min_new_term + 1;
+		new_term = min_new_term + 1;
 	else
-		++raft.term;
+		new_term = raft.term + 1;
+	enum raft_state old_state = raft.state;
+	raft_sm_schedule_new_term(new_term);
+	if (raft.state != old_state)
+		raft_broadcast_new_state();
+	box_update_ro_summary();
+}
+
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+				 int events)
+{
+	assert(timer == &raft.timer);
+	(void)events;
+	ev_timer_stop(loop, timer);
+	raft_sm_schedule_new_election();
+}
+
+static void
+raft_sm_wait_leader_dead(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!ev_is_active(&raft.io));
+	assert(!raft.is_write_in_progress);
+	assert(raft.is_candidate);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	assert(raft.leader != 0);
+	double death_timeout = replication_disconnect_timeout();
+	ev_timer_set(&raft.timer, death_timeout, death_timeout);
+	ev_timer_start(loop(), &raft.timer);
+}
 
+static void
+raft_sm_wait_election_end(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!ev_is_active(&raft.io));
+	assert(!raft.is_write_in_progress);
+	assert(raft.is_candidate);
+	assert(raft.state == RAFT_STATE_FOLLOWER ||
+	       (raft.state == RAFT_STATE_CANDIDATE &&
+		raft.volatile_vote == instance_id));
+	assert(raft.leader == 0);
+	double election_timeout = raft.election_timeout +
+				  raft_new_random_election_shift();
+	ev_timer_set(&raft.timer, election_timeout, election_timeout);
+	ev_timer_start(loop(), &raft.timer);
+}
+
+static void
+raft_sm_start(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!ev_is_active(&raft.io));
+	assert(!raft.is_write_in_progress);
+	assert(!raft.is_enabled);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	raft.is_enabled = true;
+	raft.is_candidate = raft.is_cfg_candidate;
+	if (!raft.is_candidate)
+		/* Nop. */;
+	else if (raft.leader != 0)
+		raft_sm_wait_leader_dead();
+	else
+		raft_sm_schedule_new_election();
+	box_update_ro_summary();
+	/*
+	 * When Raft is enabled, send the complete state, because
+	 * it wasn't sent while Raft was disabled.
+	 */
 	struct raft_request req;
-	memset(&req, 0, sizeof(req));
-	req.term = raft.term;
-	raft_write_request(&req);
+	raft_serialize(&req, NULL);
+	raft_broadcast(&req);
+}
+
+static void
+raft_sm_stop(void)
+{
+	assert(raft.is_enabled);
+	raft.is_enabled = false;
+	raft.is_candidate = false;
+	box_update_ro_summary();
 }
 
 void
 raft_cfg_is_enabled(bool is_enabled)
 {
-	raft.is_enabled = is_enabled;
+	if (is_enabled == raft.is_enabled)
+		return;
+
+	if (!is_enabled)
+		raft_sm_stop();
+	else
+		raft_sm_start();
 }
 
 void
 raft_cfg_is_candidate(bool is_candidate)
 {
-	raft.is_candidate = is_candidate;
+	bool old_is_candidate = raft.is_candidate;
+	raft.is_cfg_candidate = is_candidate;
+	raft.is_candidate = is_candidate && raft.is_enabled;
+	if (raft.is_candidate == old_is_candidate)
+		return;
+
+	if (raft.is_candidate) {
+		assert(raft.state == RAFT_STATE_FOLLOWER);
+		/*
+		 * If there is an on-going WAL write, it means
+		 * there was some node which sent newer data to
+		 * this node.
+		 */
+		if (raft.leader == 0 && raft_is_fully_on_disk())
+			raft_sm_schedule_new_election();
+	} else if (raft.state != RAFT_STATE_FOLLOWER) {
+		raft.state = RAFT_STATE_FOLLOWER;
+		raft_broadcast_new_state();
+	}
+	box_update_ro_summary();
 }
 
 void
 raft_cfg_election_timeout(double timeout)
 {
+	if (timeout == raft.election_timeout)
+		return;
+
 	raft.election_timeout = timeout;
+	if (raft.vote != 0 && raft.leader == 0) {
+		assert(ev_is_active(&raft.timer));
+		double new_timeout = ev_timer_remaining(loop(), &raft.timer) -
+				     raft.timer.at + raft.election_timeout;
+		ev_timer_stop(loop(), &raft.timer);
+		ev_timer_set(&raft.timer, new_timeout, new_timeout);
+		ev_timer_start(loop(), &raft.timer);
+	}
 }
 
 void
 raft_cfg_election_quorum(void)
 {
+	if (raft.state != RAFT_STATE_CANDIDATE)
+		return;
+	if (raft.vote_count < replication_synchro_quorum)
+		return;
+	/*
+	 * The node is a candidate. It means its state is fully synced with
+	 * disk. Otherwise it would be a follower.
+	 */
+	assert(!raft.is_write_in_progress);
+	raft.state = RAFT_STATE_LEADER;
+	raft.leader = instance_id;
+	raft_broadcast_new_state();
+	box_update_ro_summary();
 }
 
 void
 raft_cfg_death_timeout(void)
 {
-}
-
-void
-raft_vote(uint32_t vote_for)
-{
-	raft.vote = vote_for;
-
-	struct raft_request req;
-	memset(&req, 0, sizeof(req));
-	req.vote = vote_for;
-	raft_write_request(&req);
+	if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
+	    raft.leader != 0) {
+		assert(ev_is_active(&raft.timer));
+		double death_timeout = replication_disconnect_timeout();
+		double timeout = ev_timer_remaining(loop(), &raft.timer) -
+				 raft.timer.at + death_timeout;
+		ev_timer_stop(loop(), &raft.timer);
+		ev_timer_set(&raft.timer, timeout, timeout);
+		ev_timer_start(loop(), &raft.timer);
+	}
 }
 
 void
@@ -220,3 +766,22 @@ raft_broadcast(const struct raft_request *req)
 		}
 	}
 }
+
+void
+raft_bootstrap_leader(void)
+{
+	assert(raft.is_enabled);
+	assert(raft.volatile_term == 1);
+	assert(raft.volatile_vote == 0);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	raft.state = RAFT_STATE_LEADER;
+	raft_broadcast_new_state();
+	box_update_ro_summary();
+}
+
+void
+raft_init(void)
+{
+	ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
+	ev_check_init(&raft.io, raft_sm_dump_step);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index d875707de..57584bc1b 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -31,6 +31,7 @@
  */
 #include <stdint.h>
 #include <stdbool.h>
+#include "tarantool_ev.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -48,12 +49,62 @@ enum raft_state {
 extern const char *raft_state_strs[];
 
 struct raft {
+	/** Instance ID of leader of the current term. */
 	uint32_t leader;
+	/** State of the instance. */
 	enum raft_state state;
+	/**
+	 * Volatile part of the Raft state, whose WAL write may be
+	 * Volatile part of the Raft state, whose WAL write may
+	 * still be in progress, and yet the state may already be
+	 * used. The volatile state is never sent anywhere, but the
+	 * state machine makes decisions based on it. That is
+	 * vital.
+	 * votes inside a term, where the instance already voted
+	 * (even if the vote WAL write is not finished yet).
+	 * Otherwise the instance would try to write several votes
+	 * inside one term.
+	 */
+	uint64_t volatile_term;
+	uint32_t volatile_vote;
+	/**
+	 * Flag whether Raft is enabled. When disabled, it still
+	 * persists terms so as to quickly enroll into the cluster
+	 * when (if) it is enabled. In all other respects disabled
+	 * Raft does not affect the instance's operation.
+	 */
 	bool is_enabled;
+	/**
+	 * Flag whether the node can become a leader. It is an
+	 * accumulated value of the 'Raft is enabled' and 'Raft is
+	 * a candidate' configuration options. If at least one is
+	 * false, the instance is not a candidate.
+	 */
 	bool is_candidate;
+	/** Flag whether the instance is allowed to be a leader. */
+	bool is_cfg_candidate;
+	/**
+	 * Flag whether Raft is currently trying to write something into WAL.
+	 * It happens asynchronously, not right after the Raft state is
+	 * updated.
+	 */
+	bool is_write_in_progress;
+	/**
+	 * Persisted Raft state. These values are used when the current Raft
+	 * state needs to be reported to other nodes.
+	 */
 	uint64_t term;
 	uint32_t vote;
+	/** A set bit means a vote from that instance was obtained. */
+	uint32_t vote_mask;
+	/** Number of votes for this instance. Valid only in candidate state. */
+	int vote_count;
+	/** State machine timed event trigger. */
+	struct ev_timer timer;
+	/**
+	 * Dump of the Raft state at the end of the event loop, when it changes.
+	 */
+	struct ev_check io;
+	/** Configured election timeout in seconds. */
 	double election_timeout;
 };
 
@@ -65,6 +116,18 @@ raft_new_term(uint64_t min_new_term);
 void
 raft_vote(uint32_t vote_for);
 
+static inline bool
+raft_is_ro(void)
+{
+	return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
+}
+
+static inline bool
+raft_is_source_allowed(uint32_t source_id)
+{
+	return !raft.is_enabled || raft.leader == source_id;
+}
+
 static inline bool
 raft_is_enabled(void)
 {
@@ -83,6 +146,9 @@ raft_process_recovery(const struct raft_request *req);
 void
 raft_process_msg(const struct raft_request *req, uint32_t source);
 
+void
+raft_process_heartbeat(uint32_t source);
+
 /**
  * Broadcast the changes in this instance's raft status to all
  * the followers.
@@ -113,6 +179,29 @@ raft_serialize(struct raft_request *req, struct vclock *vclock);
 void
 raft_broadcast(const struct raft_request *req);
 
+/**
+ * Bootstrap the current instance as the first leader of the cluster. That is
+ * done bypassing the Raft election protocol, by just assigning this node a
+ * leader role. That is needed, because when the cluster is not bootstrapped, it
+ * is necessary to find a node, which will generate a replicaset UUID, write it
+ * into _cluster space, and register all the other nodes in _cluster.
+ * Until it is done, all nodes but one won't boot. Their WALs won't work. And
+ * therefore they won't be able to participate in leader election. That
+ * effectively makes the cluster dead from the beginning unless the first
+ * bootstrapped node declares itself a leader without elections.
+ *
+ * XXX: That does not solve the problem, when the first node boots, creates a
+ * snapshot, and then immediately dies. After recovery it won't declare itself a
+ * leader. Therefore if quorum > 1, the cluster won't proceed to registering
+ * any replicas and becomes completely dead. Perhaps that must be solved by
+ * truncating the quorum down to the number of records in _cluster.
+ */
+void
+raft_bootstrap_leader(void);
+
+void
+raft_init(void);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 74581db9c..4f9bbc0de 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
+/**
+ * Recreate the recovery cursor from the last confirmed point. That
+ * is used by Raft, when the node becomes a leader. It may happen
+ * that it already sent some data to other nodes as a follower, and
+ * they ignored the data. Now that the node is a leader, it should
+ * send the unconfirmed data again. Otherwise the cluster would get
+ * stuck, or worse - the newer data would be sent without the older
+ * data that was sent but ignored.
+ */
+static void
+relay_restart_recovery(struct relay *relay)
+{
+	recovery_delete(relay->r);
+	relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
+	recover_remaining_wals(relay->r, &relay->stream, NULL, true);
+}
+
 struct relay_raft_msg {
 	struct cmsg base;
 	struct cmsg_hop route;
@@ -786,6 +803,8 @@ relay_raft_msg_push(struct cmsg *base)
 	struct xrow_header row;
 	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
 	try {
+		if (msg->req.state == RAFT_STATE_LEADER)
+			relay_restart_recovery(msg->relay);
 		relay_send(msg->relay, &row);
 	} catch (Exception *e) {
 		relay_set_error(msg->relay, e);
-- 
2.21.1 (Apple Git-122.3)


