[Tarantool-patches] [PATCH v2 09/11] raft: introduce state machine

Serge Petrenko sergepetrenko at tarantool.org
Mon Sep 21 11:22:15 MSK 2020


19.09.2020 18:49, Vladislav Shpilevoy пишет:
> Here is a new version of the patch after some squashes.
>
> ====================
>      raft: introduce state machine
>      
>      The commit is a core part of Raft implementation. It introduces
>      the Raft state machine implementation and its integration into the
>      instance's life cycle.
>      
>      The implementation follows the protocol to the letter except a few
>      important details.
>      
>      Firstly, the original Raft assumes that all nodes share the same
>      log record numbers. In Tarantool they are called LSNs. But in
>      Tarantool each node has its own LSN in its own component of the
>      vclock. That makes the election messages a bit heavier, because
>      the nodes need to send and compare complete vclocks of each other
>      instead of a single number like in the original Raft. But the
>      logic becomes simpler, because in the original Raft there is a
>      problem of uncertainty about what to do with records of an old
>      leader right after a new leader is elected. They could be rolled
>      back or confirmed depending on circumstances. The issue
>      disappears when vclock is used.
>      
>      Secondly, leader election works differently during cluster
>      bootstrap, until the number of bootstrapped replicas becomes >=
>      the election quorum. That arises from the specifics of replica
>      bootstrap and the order of systems initialization. In short:
>      during bootstrap a leader election may use a smaller election
>      quorum than the configured one. See more details in the code.
>      
>      Part of #1146

Consider these fixes for the applier vclock segfault, pushed on top of the
branch.

  src/box/box.cc |  6 +-----
  src/box/raft.c | 12 +++++++++---
  2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index c5dcbd959..1169f2cd5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2160,11 +2160,7 @@ box_process_subscribe(struct ev_io *io, struct 
xrow_header *header)
           * should be 0.
           */
          struct raft_request req;
-        /*
-         * Omit the candidate vclock, since we've just sent it in
-         * subscribe response.
-         */
-        raft_serialize_for_network(&req, NULL);
+        raft_serialize_for_network(&req, &vclock);
          xrow_encode_raft(&row, &fiber()->gc, &req);
          coio_write_xrow(io, &row);
      }
diff --git a/src/box/raft.c b/src/box/raft.c
index 7b7ef9c1c..45783f0e3 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -130,8 +130,6 @@ raft_new_random_election_shift(void)
  static inline bool
  raft_can_vote_for(const struct vclock *v)
  {
-    if (v == NULL)
-        return false;
      int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
      return cmp == 0 || cmp == 1;
  }
@@ -369,6 +367,12 @@ raft_process_msg(const struct raft_request *req, 
uint32_t source)
                       "candidate");
                  break;
              }
+            /* Can't vote when vclock is unknown. */
+            if (req->vclock == NULL) {
+                say_info("RAFT: vote request is skipped - "
+                     "missing candidate vclock.");
+                break;
+            }
              /* Can't vote for too old or incomparable nodes. */
              if (!raft_can_vote_for(req->vclock)) {
                  say_info("RAFT: vote request is skipped - "
@@ -858,8 +862,10 @@ raft_serialize_for_network(struct raft_request 
*req, struct vclock *vclock)
      req->state = raft.state;
      /*
       * Raft does not own vclock, so it always expects it passed 
externally.
+     * Vclock is sent out only by candidate instances.
       */
-    req->vclock = vclock;
+    if (req->state == RAFT_STATE_CANDIDATE)
+        req->vclock = vclock;
  }

  void
-- 
2.24.3 (Apple Git-128)


>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index fd7cf1c79..c352faf5e 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -900,8 +900,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>    * Return 0 for success or -1 in case of an error.
>    */
>   static int
> -applier_apply_tx(struct stailq *rows)
> +applier_apply_tx(struct applier *applier, struct stailq *rows)
>   {
> +	/*
> +	 * Rows received not directly from a leader are ignored. That is a
> +	 * protection against the case when an old leader keeps sending data
> +	 * around not knowing yet that it is not a leader anymore.
> +	 *
> +	 * XXX: it may be that this can be fine to apply leader transactions by
> +	 * looking at their replica_id field if it is equal to leader id. That
> +	 * can be investigated as an 'optimization'. Even though may not give
> +	 * anything, because won't change total number of rows sent in the
> +	 * network anyway.
> +	 */
> +	if (!raft_is_source_allowed(applier->instance_id))
> +		return 0;
>   	struct xrow_header *first_row = &stailq_first_entry(rows,
>   					struct applier_tx_row, next)->row;
>   	struct xrow_header *last_row;
> @@ -1241,6 +1254,7 @@ applier_subscribe(struct applier *applier)
>   		struct xrow_header *first_row =
>   			&stailq_first_entry(&rows, struct applier_tx_row,
>   					    next)->row;
> +		raft_process_heartbeat(applier->instance_id);
>   		if (first_row->lsn == 0) {
>   			if (unlikely(iproto_type_is_raft_request(
>   							first_row->type))) {
> @@ -1249,7 +1263,7 @@ applier_subscribe(struct applier *applier)
>   					diag_raise();
>   			}
>   			applier_signal_ack(applier);
> -		} else if (applier_apply_tx(&rows) != 0) {
> +		} else if (applier_apply_tx(applier, &rows) != 0) {
>   			diag_raise();
>   		}
>   
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 77ab21dbb..c5dcbd959 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -157,7 +157,7 @@ void
>   box_update_ro_summary(void)
>   {
>   	bool old_is_ro_summary = is_ro_summary;
> -	is_ro_summary = is_ro || is_orphan;
> +	is_ro_summary = is_ro || is_orphan || raft_is_ro();
>   	/* In 99% nothing changes. Filter this out first. */
>   	if (is_ro_summary == old_is_ro_summary)
>   		return;
> @@ -171,6 +171,10 @@ static int
>   box_check_writable(void)
>   {
>   	if (is_ro_summary) {
> +		/*
> +		 * XXX: return a special error when the node is not a leader to
> +		 * reroute to the leader node.
> +		 */
>   		diag_set(ClientError, ER_READONLY);
>   		diag_log();
>   		return -1;
> @@ -2652,6 +2656,7 @@ box_init(void)
>   
>   	txn_limbo_init();
>   	sequence_init();
> +	raft_init();
>   }
>   
>   bool
> diff --git a/src/box/raft.c b/src/box/raft.c
> index 4d3d07c48..07a49351f 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -36,6 +36,13 @@
>   #include "small/region.h"
>   #include "replication.h"
>   #include "relay.h"
> +#include "box.h"
> +#include "tt_static.h"
> +
> +/**
> + * Maximal random deviation of the election timeout. From the configured value.
> + */
> +#define RAFT_RANDOM_ELECTION_FACTOR 0.1
>   
>   const char *raft_state_strs[] = {
>   	NULL,
> @@ -48,19 +55,261 @@ const char *raft_state_strs[] = {
>   struct raft raft = {
>   	.leader = 0,
>   	.state = RAFT_STATE_FOLLOWER,
> +	.volatile_term = 1,
> +	.volatile_vote = 0,
>   	.is_enabled = false,
>   	.is_candidate = false,
> +	.is_cfg_candidate = false,
> +	.is_write_in_progress = false,
>   	.term = 1,
>   	.vote = 0,
> +	.vote_mask = 0,
> +	.vote_count = 0,
> +	.worker = NULL,
> +	.election_timeout = 5,
>   };
>   
> +/**
> + * Check if Raft is completely synced with disk. Meaning all its critical values
> + * are in WAL. Only in that state the node can become a leader or a candidate.
> + * If the node has a not flushed data, it means either the term was bumped, or
> + * a new vote was made.
> + *
> + * In case of term bump it means either there is another node with a newer term,
> + * and this one should be a follower; or this node bumped the term itself along
> + * with making a vote to start a new election - then it is also a follower which
> + * will turn into a candidate when the flush is done.
> + *
> + * In case of a new not flushed vote it means either this node voted for some
> + * other node, and must be a follower; or it voted for self, and also must be a
> + * follower, but will become a candidate when the flush is done.
> + *
> + * In total - when something is not synced with disk, the instance is a follower
> + * in any case.
> + */
> +static bool
> +raft_is_fully_on_disk(void)
> +{
> +	return raft.volatile_term == raft.term &&
> +	       raft.volatile_vote == raft.vote;
> +}
> +
> +/**
> + * Raft protocol says that election timeout should be a bit randomized so as
> + * the nodes wouldn't start election at the same time and end up with not having
> + * a quorum for anybody. This implementation randomizes the election timeout by
> + * adding {election timeout * random factor} value, where max value of the
> + * factor is a constant floating point value > 0.
> + */
> +static inline double
> +raft_new_random_election_shift(void)
> +{
> +	double timeout = raft.election_timeout;
> +	/* Translate to ms. Integer is needed to be able to use mod below. */
> +	uint32_t rand_part =
> +		(uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
> +	if (rand_part == 0)
> +		rand_part = 1;
> +	/*
> +	 * XXX: this is not giving a good distribution, but it is not so trivial
> +	 * to implement a correct random value generator. There is a task to
> +	 * unify all such places. Not critical here.
> +	 */
> +	rand_part = rand() % (rand_part + 1);
> +	return rand_part / 1000.0;
> +}
> +
> +/**
> + * Raft says that during election a node1 can vote for node2, if node2 has a
> + * bigger term, or has the same term but longer log. In case of Tarantool it
> + * means the node2 vclock should be >= node1 vclock, in all components. It is
> + * not enough to compare only one component. At least because there may be not
> + * a previous leader when the election happens first time. Or a node could
> + * restart and forget who the previous leader was.
> + */
> +static inline bool
> +raft_can_vote_for(const struct vclock *v)
> +{
> +	if (v == NULL)
> +		return false;
> +	int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
> +	return cmp == 0 || cmp == 1;
> +}
> +
> +/**
> + * Election quorum is not strictly equal to synchronous replication quorum.
> + * Sometimes it can be lowered. That is about bootstrap.
> + *
> + * The problem with bootstrap is that when the replicaset boots, all the
> + * instances can't write to WAL and can't recover from their initial snapshot.
> + * They need one node which will boot first, and then they will replicate from
> + * it.
> + *
> + * This one node should boot from its zero snapshot, create replicaset UUID,
> + * register self with ID 1 in _cluster space, and then register all the other
> + * instances here. To do that the node must be writable. It should have
> + * read_only = false, connection quorum satisfied, and be a Raft leader if Raft
> + * is enabled.
> + *
> + * To be elected a Raft leader it needs to perform election. But it can't be
> + * done before at least synchronous quorum of the replicas is bootstrapped. And
> + * they can't be bootstrapped because wait for a leader to initialize _cluster.
> + * Cyclic dependency.
> + *
> + * This is resolved by truncation of the election quorum to the number of
> + * registered replicas, if their count is less than synchronous quorum. That
> + * helps to elect a first leader.
> + *
> + * It may seem that the first node could just declare itself a leader and then
> + * strictly follow the protocol from now on, but that won't work, because if the
> + * first node will restart after it is booted, but before quorum of replicas is
> + * booted, the cluster will get stuck again.
> + *
> + * The current solution is totally safe because
> + *
> + * - after all the cluster will have node count >= quorum, if user used a
> + *   correct config (God help him if he didn't);
> + *
> + * - synchronous replication quorum is untouched - it is not truncated. Only
> + *   leader election quorum is affected. So synchronous data won't be lost.
> + */
> +static inline int
> +raft_election_quorum(void)
> +{
> +	return MIN(replication_synchro_quorum, replicaset.registered_count);
> +}
> +
> +/** Broadcast an event about this node changed its state to all relays. */
> +static inline void
> +raft_broadcast_new_state(void)
> +{
> +	struct raft_request req;
> +	memset(&req, 0, sizeof(req));
> +	req.term = raft.term;
> +	req.state = raft.state;
> +	raft_broadcast(&req);
> +}
> +
> +/** Raft state machine methods. 'sm' stands for State Machine. */
> +
> +/**
> + * Start the state machine. When it is stopped, Raft state is updated and
> + * goes to WAL when necessary, but it does not affect the instance operation.
> + * For example, when Raft is stopped, the instance role does not affect whether
> + * it is writable.
> + */
> +static void
> +raft_sm_start(void);
> +
> +/**
> + * Stop the state machine. Now until Raft is re-enabled,
> + * - Raft stops affecting the instance operation;
> + * - this node can't become a leader;
> + * - this node can't vote.
> + */
> +static void
> +raft_sm_stop(void);
> +
> +/**
> + * When the instance is a follower but is allowed to be a leader, it will wait
> + * for death of the current leader to start new election.
> + */
> +static void
> +raft_sm_wait_leader_dead(void);
> +
> +/**
> + * If election is started by this node, or it voted for some other node started
> + * the election, and it can be a leader itself, it will wait until the current
> + * election times out. When it happens, the node will start new election.
> + */
> +static void
> +raft_sm_wait_election_end(void);
> +
> +/** Bump volatile term and schedule its flush to disk. */
> +static void
> +raft_sm_schedule_new_term(uint64_t new_term);
> +
> +/** Bump volatile vote and schedule its flush to disk. */
> +static void
> +raft_sm_schedule_new_vote(uint32_t new_vote);
> +
> +/**
> + * Bump term and vote for self immediately. After that is persisted, the
> + * election timeout will be activated. Unless during that nothing newer happens.
> + */
> +static void
> +raft_sm_schedule_new_election(void);
> +
> +/**
> + * The main trigger of Raft state machine - start new election when the current
> + * leader dies, or when there is no a leader and the previous election failed.
> + */
> +static void
> +raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
> +				 int events);
> +
> +/** Start Raft state flush to disk. */
> +static void
> +raft_sm_pause_and_dump(void);
> +
> +static void
> +raft_sm_become_leader(void);
> +
> +static void
> +raft_sm_follow_leader(uint32_t leader);
> +
> +static void
> +raft_sm_become_candidate(void);
> +
> +static const char *
> +raft_request_to_string(const struct raft_request *req)
> +{
> +	assert(req->term != 0);
> +	int size = 1024;
> +	char buf[1024];
> +	char *pos = buf;
> +	int rc = snprintf(pos, size, "{term: %llu", req->term);
> +	assert(rc >= 0);
> +	pos += rc;
> +	size -= rc;
> +	if (req->vote != 0) {
> +		rc = snprintf(pos, size, ", vote: %u", req->vote);
> +		assert(rc >= 0);
> +		pos += rc;
> +		size -= rc;
> +	}
> +	if (req->state != 0) {
> +		rc = snprintf(pos, size, ", state: %s",
> +			      raft_state_strs[req->state]);
> +		assert(rc >= 0);
> +		pos += rc;
> +		size -= rc;
> +	}
> +	if (req->vclock != NULL) {
> +		rc = snprintf(pos, size, ", vclock: %s",
> +			      vclock_to_string(req->vclock));
> +		assert(rc >= 0);
> +		pos += rc;
> +		size -= rc;
> +	}
> +	rc = snprintf(pos, size, "}");
> +	assert(rc >= 0);
> +	pos += rc;
> +	return tt_cstr(buf, pos - buf);
> +}
> +
>   void
>   raft_process_recovery(const struct raft_request *req)
>   {
> -	if (req->term != 0)
> +	say_verbose("RAFT: recover %s", raft_request_to_string(req));
> +	if (req->term != 0) {
>   		raft.term = req->term;
> -	if (req->vote != 0)
> +		raft.volatile_term = req->term;
> +	}
> +	if (req->vote != 0) {
>   		raft.vote = req->vote;
> +		raft.volatile_vote = req->vote;
> +	}
>   	/*
>   	 * Role is never persisted. If recovery is happening, the
>   	 * node was restarted, and the former role can be false
> @@ -80,34 +329,526 @@ raft_process_recovery(const struct raft_request *req)
>   void
>   raft_process_msg(const struct raft_request *req, uint32_t source)
>   {
> -	(void)source;
> -	if (req->term > raft.term) {
> -		// Update term.
> -		// The logic will be similar, but the code
> -		// below is for testing purposes.
> -		raft.term = req->term;
> +	say_info("RAFT: message %s from %u", raft_request_to_string(req),
> +		 source);
> +	assert(source > 0);
> +	assert(source != instance_id);
> +	/* Outdated request. */
> +	if (req->term < raft.volatile_term)
> +		return;
> +
> +	enum raft_state old_state = raft.state;
> +
> +	/* Term bump. */
> +	if (req->term > raft.volatile_term)
> +		raft_sm_schedule_new_term(req->term);
> +
> +	/* Vote request during the on-going election. */
> +	if (req->vote != 0) {
> +		switch (raft.state) {
> +		case RAFT_STATE_FOLLOWER:
> +			/*
> +			 * Can't respond on vote requests when Raft is disabled.
> +			 */
> +			if (!raft.is_enabled) {
> +				say_info("RAFT: vote request is skipped - RAFT "
> +					 "is disabled");
> +				break;
> +			}
> +			/* Check if already voted in this term. */
> +			if (raft.volatile_vote != 0) {
> +				say_info("RAFT: vote request is skipped - "
> +					 "already voted in this term");
> +				break;
> +			}
> +			/* Not a candidate. Can't accept votes. */
> +			if (req->vote == instance_id) {
> +				say_info("RAFT: vote request is skipped - "
> +					 "can't accept vote for self if not a "
> +					 "candidate");
> +				break;
> +			}
> +			/* Can't vote for too old or incomparable nodes. */
> +			if (!raft_can_vote_for(req->vclock)) {
> +				say_info("RAFT: vote request is skipped - "
> +					 "the vclock is not acceptable = %s",
> +					 vclock_to_string(req->vclock));
> +				break;
> +			}
> +			/*
> +			 * Check if somebody is asking to vote for a third
> +			 * node - nope. Make votes only when asked directly by
> +			 * the new candidate. However that restriction may be
> +			 * relaxed in future, if can be proven to be safe.
> +			 */
> +			if (req->vote != source) {
> +				say_info("RAFT: vote request is skipped - "
> +					 "indirect votes are not allowed");
> +				break;
> +			}
> +			/*
> +			 * Either the term is new, or didn't vote in the current
> +			 * term yet. Anyway can vote now.
> +			 */
> +			raft_sm_schedule_new_vote(req->vote);
> +			break;
> +		case RAFT_STATE_CANDIDATE:
> +			/* Check if this is a vote for a competing candidate. */
> +			if (req->vote != instance_id) {
> +				say_info("RAFT: vote request is skipped - "
> +					 "competing candidate");
> +				break;
> +			}
> +			/*
> +			 * Vote for self was requested earlier in this round,
> +			 * and now was answered by some other instance.
> +			 */
> +			assert(raft.volatile_vote == instance_id);
> +			int quorum = raft_election_quorum();
> +			bool was_set = bit_set(&raft.vote_mask, source);
> +			raft.vote_count += !was_set;
> +			if (raft.vote_count < quorum) {
> +				say_info("RAFT: accepted vote for self, vote "
> +					 "count is %d/%d", raft.vote_count,
> +					 quorum);
> +				break;
> +			}
> +			raft_sm_become_leader();
> +			break;
> +		case RAFT_STATE_LEADER:
> +			/*
> +			 * If the node is still a leader, it ignores all votes.
> +			 * Indeed, if the received vote would be from a new
> +			 * term, the node would bump its own term and would
> +			 * enter the follower state by now. If the vote is from
> +			 * the current term, then the leader can freely ignore
> +			 * it.
> +			 */
> +			break;
> +		default:
> +			unreachable();
> +		}
> +	}
> +	/*
> +	 * If the node does not claim to be a leader, nothing interesting. Terms
> +	 * and votes are already handled.
> +	 */
> +	if (req->state != RAFT_STATE_LEADER)
> +		goto end;
> +	/* The node is a leader, but it is already known. */
> +	if (source == raft.leader)
> +		goto end;
> +	/*
> +	 * XXX: A message from a conflicting leader. Split brain, basically.
> +	 * Need to decide what to do. Current solution is to do nothing. In
> +	 * future either this node should try to become a leader, or should stop
> +	 * all writes and require manual intervention.
> +	 */
> +	if (raft.leader != 0) {
> +		say_warn("RAFT: conflicting leader detected in one term - "
> +			 "known is %u, received %u", raft.leader, source);
> +		goto end;
> +	}
> +
> +	/* New leader was elected. */
> +	raft_sm_follow_leader(source);
> +end:
> +	if (raft.state != old_state) {
> +		/*
> +		 * New term and vote are not broadcasted yet. Firstly their WAL
> +		 * write should be finished. But the state is volatile. It is ok
> +		 * to broadcast it now.
> +		 */
> +		raft_broadcast_new_state();
> +	}
> +}
> +
> +void
> +raft_process_heartbeat(uint32_t source)
> +{
> +	/*
> +	 * When not a candidate - don't wait for anything. Therefore do not care
> +	 * about the leader being dead.
> +	 */
> +	if (!raft.is_candidate)
> +		return;
> +	/* Don't care about heartbeats when this node is a leader itself. */
> +	if (raft.state == RAFT_STATE_LEADER)
> +		return;
> +	/* Not interested in heartbeats from not a leader. */
> +	if (raft.leader != source)
> +		return;
> +	/*
> +	 * The instance currently is busy with writing something on disk. Can't
> +	 * react to heartbeats.
> +	 */
> +	if (raft.is_write_in_progress)
> +		return;
> +	/*
> +	 * XXX: it may be expensive to reset the timer like that. It may be less
> +	 * expensive to let the timer work, and remember last timestamp when
> +	 * anything was heard from the leader. Then in the timer callback check
> +	 * the timestamp, and restart the timer, if it is fine.
> +	 */
> +	assert(ev_is_active(&raft.timer));
> +	ev_timer_stop(loop(), &raft.timer);
> +	raft_sm_wait_leader_dead();
> +}
> +
> +/** Wakeup Raft state writer fiber waiting for WAL write end. */
> +static void
> +raft_write_cb(struct journal_entry *entry)
> +{
> +	fiber_wakeup(entry->complete_data);
> +}
> +
> +/** Synchronously write a Raft request into WAL. */
> +static void
> +raft_write_request(const struct raft_request *req)
> +{
> +	assert(raft.is_write_in_progress);
> +	/*
> +	 * Vclock is never persisted by Raft. It is used only to
> +	 * be sent to network when vote for self.
> +	 */
> +	assert(req->vclock == NULL);
> +	/*
> +	 * State is not persisted. That would be strictly against Raft protocol.
> +	 * The reason is that it does not make much sense - even if the node is
> +	 * a leader now, after the node is restarted, there will be another
> +	 * leader elected by that time likely.
> +	 */
> +	assert(req->state == 0);
> +	struct region *region = &fiber()->gc;
> +	uint32_t svp = region_used(region);
> +	struct xrow_header row;
> +	char buf[sizeof(struct journal_entry) +
> +		 sizeof(struct xrow_header *)];
> +	struct journal_entry *entry = (struct journal_entry *)buf;
> +	entry->rows[0] = &row;
> +
> +	if (xrow_encode_raft(&row, region, req) != 0)
> +		goto fail;
> +	journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
> +			     fiber());
> +
> +	if (journal_write(entry) != 0 || entry->res < 0) {
> +		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();
> +		goto fail;
>   	}
> -	if (req->vote > 0) {
> -		// Check whether the vote's for us.
> +
> +	raft_broadcast(req);
> +
> +	region_truncate(region, svp);
> +	return;
> +fail:
> +	/*
> +	 * XXX: the stub is supposed to be removed once it is defined what to do
> +	 * when a raft request WAL write fails.
> +	 */
> +	panic("Could not write a raft request to WAL\n");
> +}
> +
> +static void
> +raft_worker_handle_io(void)
> +{
> +	assert(raft.is_write_in_progress);
> +	/* During write Raft can't be anything but a follower. */
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	struct raft_request req;
> +	uint64_t old_term = raft.term;
> +	uint32_t old_vote = raft.vote;
> +	enum raft_state old_state = raft.state;
> +
> +	if (raft_is_fully_on_disk()) {
> +end_dump:
> +		raft.is_write_in_progress = false;
> +		/*
> +		 * The state machine is stable. Can see now, to what state to
> +		 * go.
> +		 */
> +		if (!raft.is_candidate) {
> +			/*
> +			 * If not a candidate, can't do anything except vote for
> +			 * somebody (if Raft is enabled). Nothing to do except
> +			 * staying a follower without timeouts.
> +			 */
> +		} else if (raft.leader != 0) {
> +			/* There is a known leader. Wait until it is dead. */
> +			raft_sm_wait_leader_dead();
> +		} else if (raft.vote == instance_id) {
> +			/* Just wrote own vote. */
> +			if (raft_election_quorum() == 1)
> +				raft_sm_become_leader();
> +			else
> +				raft_sm_become_candidate();
> +		} else if (raft.vote != 0) {
> +			/*
> +			 * Voted for some other node. Wait if it manages to
> +			 * become a leader.
> +			 */
> +			raft_sm_wait_election_end();
> +		} else {
> +			/* No leaders, no votes. */
> +			raft_sm_schedule_new_election();
> +		}
> +	} else {
> +		memset(&req, 0, sizeof(req));
> +		assert(raft.volatile_term >= raft.term);
> +		/* Term is written always. */
> +		req.term = raft.volatile_term;
> +		if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
> +			req.vote = raft.volatile_vote;
> +
> +		raft_write_request(&req);
> +		say_verbose("RAFT: persist and apply state %s",
> +			    raft_request_to_string(&req));
> +
> +		assert(req.term >= raft.term);
> +		if (req.term > raft.term) {
> +			raft.term = req.term;
> +			raft.vote = 0;
> +		}
> +		if (req.vote != 0) {
> +			assert(raft.vote == 0);
> +			raft.vote = req.vote;
> +		}
> +		if (raft_is_fully_on_disk())
> +			goto end_dump;
> +	}
> +
> +	memset(&req, 0, sizeof(req));
> +	/* Term is encoded always. */
> +	req.term = raft.term;
> +	bool has_changes = old_term != raft.term;
> +	if (raft.vote != 0 && old_vote != raft.vote) {
> +		req.vote = raft.vote;
> +		/*
> +		 * When vote for self, need to send current vclock too. Two
> +		 * reasons for that:
> +		 *
> +		 * - nodes need to vote for the instance containing the newest
> +		 *   data. So as not to lose it, because some of it may be
> +		 *   confirmed by the synchronous replication;
> +		 *
> +		 * - replication is basically stopped during election. Other
> +		 *   nodes can't learn vclock of this instance through regular
> +		 *   replication.
> +		 */
> +		if (raft.vote == instance_id)
> +			req.vclock = &replicaset.vclock;
> +		has_changes = true;
>   	}
> -	switch (req->state) {
> -	case RAFT_STATE_FOLLOWER:
> -	    break;
> -	case RAFT_STATE_CANDIDATE:
> -	    // Perform voting logic.
> -	    break;
> -	case RAFT_STATE_LEADER:
> -	    // Switch to a new leader.
> -	    break;
> -	default:
> -	    break;
> +	if (raft.state != old_state) {
> +		req.state = raft.state;
> +		has_changes = true;
>   	}
> +	if (has_changes)
> +		raft_broadcast(&req);
> +}
> +
> +static int
> +raft_worker_f(va_list args)
> +{
> +	(void)args;
> +	while (!fiber_is_cancelled()) {
> +		if (!raft.is_write_in_progress)
> +			goto idle;
> +		raft_worker_handle_io();
> +		if (!raft.is_write_in_progress)
> +			goto idle;
> +		fiber_sleep(0);
> +		continue;
> +	idle:
> +		assert(raft_is_fully_on_disk());
> +		fiber_yield();
> +	}
> +	return 0;
> +}
> +
> +static void
> +raft_sm_pause_and_dump(void)
> +{
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	if (raft.is_write_in_progress)
> +		return;
> +	ev_timer_stop(loop(), &raft.timer);
> +	raft.is_write_in_progress = true;
> +	if (raft.worker == NULL)
> +		raft.worker = fiber_new("raft_worker", raft_worker_f);
> +	fiber_wakeup(raft.worker);
> +}
> +
> +static void
> +raft_sm_become_leader(void)
> +{
> +	assert(raft.state != RAFT_STATE_LEADER);
> +	say_info("RAFT: enter leader state with quorum %d",
> +		 raft_election_quorum());
> +	assert(raft.leader == 0);
> +	assert(raft.is_candidate);
> +	assert(!raft.is_write_in_progress);
> +	raft.state = RAFT_STATE_LEADER;
> +	raft.leader = instance_id;
> +	ev_timer_stop(loop(), &raft.timer);
> +	/* Make read-write (if other subsystems allow that. */
> +	box_update_ro_summary();
> +}
> +
> +static void
> +raft_sm_follow_leader(uint32_t leader)
> +{
> +	say_info("RAFT: leader is %u, follow", leader);
> +	assert(raft.state != RAFT_STATE_LEADER);
> +	assert(raft.leader == 0);
> +	raft.state = RAFT_STATE_FOLLOWER;
> +	raft.leader = leader;
> +	if (!raft.is_write_in_progress) {
> +		ev_timer_stop(loop(), &raft.timer);
> +		raft_sm_wait_leader_dead();
> +	}
> +}
> +
> +static void
> +raft_sm_become_candidate(void)
> +{
> +	say_info("RAFT: enter candidate state with 1 self vote");
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	assert(raft.leader == 0);
> +	assert(raft.is_candidate);
> +	assert(!raft.is_write_in_progress);
> +	assert(raft_election_quorum() > 1);
> +	raft.state = RAFT_STATE_CANDIDATE;
> +	raft.vote_count = 1;
> +	raft.vote_mask = 0;
> +	bit_set(&raft.vote_mask, instance_id);
> +	raft_sm_wait_election_end();
> +}
> +
> +static void
> +raft_sm_schedule_new_term(uint64_t new_term)
> +{
> +	say_info("RAFT: bump term to %llu, follow", new_term);
> +	assert(new_term > raft.volatile_term);
> +	assert(raft.volatile_term >= raft.term);
> +	raft.volatile_term = new_term;
> +	/* New terms means completely new Raft state. */
> +	raft.volatile_vote = 0;
> +	raft.leader = 0;
> +	raft.state = RAFT_STATE_FOLLOWER;
> +	box_update_ro_summary();
> +	raft_sm_pause_and_dump();
> +}
> +
> +static void
> +raft_sm_schedule_new_vote(uint32_t new_vote)
> +{
> +	say_info("RAFT: vote for %u, follow", new_vote, raft.volatile_term);
> +	assert(raft.volatile_vote == 0);
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	raft.volatile_vote = new_vote;
> +	raft_sm_pause_and_dump();
> +}
> +
> +static void
> +raft_sm_schedule_new_election(void)
> +{
> +	say_info("RAFT: begin new election round");
> +	assert(raft_is_fully_on_disk());
> +	assert(raft.is_candidate);
> +	/* Everyone is a follower until its vote for self is persisted. */
> +	raft_sm_schedule_new_term(raft.term + 1);
> +	raft_sm_schedule_new_vote(instance_id);
> +	box_update_ro_summary();
> +}
> +
> +static void
> +raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
> +				 int events)
> +{
> +	assert(timer == &raft.timer);
> +	(void)events;
> +	ev_timer_stop(loop, timer);
> +	raft_sm_schedule_new_election();
> +}
> +
> +static void
> +raft_sm_wait_leader_dead(void)
> +{
> +	assert(!ev_is_active(&raft.timer));
> +	assert(!raft.is_write_in_progress);
> +	assert(raft.is_candidate);
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	assert(raft.leader != 0);
> +	double death_timeout = replication_disconnect_timeout();
> +	ev_timer_set(&raft.timer, death_timeout, death_timeout);
> +	ev_timer_start(loop(), &raft.timer);
> +}
> +
> +static void
> +raft_sm_wait_election_end(void)
> +{
> +	assert(!ev_is_active(&raft.timer));
> +	assert(!raft.is_write_in_progress);
> +	assert(raft.is_candidate);
> +	assert(raft.state == RAFT_STATE_FOLLOWER ||
> +	       (raft.state == RAFT_STATE_CANDIDATE &&
> +		raft.volatile_vote == instance_id));
> +	assert(raft.leader == 0);
> +	double election_timeout = raft.election_timeout +
> +				  raft_new_random_election_shift();
> +	ev_timer_set(&raft.timer, election_timeout, election_timeout);
> +	ev_timer_start(loop(), &raft.timer);
> +}
> +
> +static void
> +raft_sm_start(void)
> +{
> +	say_info("RAFT: start state machine");
> +	assert(!ev_is_active(&raft.timer));
> +	assert(!raft.is_write_in_progress);
> +	assert(!raft.is_enabled);
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	raft.is_enabled = true;
> +	raft.is_candidate = raft.is_cfg_candidate;
> +	if (!raft.is_candidate)
> +		/* Nop. */;
> +	else if (raft.leader != 0)
> +		raft_sm_wait_leader_dead();
> +	else
> +		raft_sm_schedule_new_election();
> +	box_update_ro_summary();
> +	/*
> +	 * When Raft is enabled, send the complete state. Because
> +	 * it wasn't sent in disabled state.
> +	 */
> +	struct raft_request req;
> +	raft_serialize_for_network(&req, NULL);
> +	raft_broadcast(&req);
> +}
> +
> +static void
> +raft_sm_stop(void)
> +{
> +	say_info("RAFT: stop state machine");
> +	assert(raft.is_enabled);
> +	raft.is_enabled = false;
> +	raft.is_candidate = false;
> +	if (raft.state == RAFT_STATE_LEADER)
> +		raft.leader = 0;
> +	raft.state = RAFT_STATE_FOLLOWER;
> +	box_update_ro_summary();
>   }
>   
>   void
>   raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
>   {
>   	memset(req, 0, sizeof(*req));
> +	/*
> +	 * Volatile state is never used for any communications.
> +	 * Use only persisted state.
> +	 */
>   	req->term = raft.term;
>   	req->vote = raft.vote;
>   	req->state = raft.state;
> @@ -128,29 +869,90 @@ raft_serialize_for_disk(struct raft_request *req)
>   void
>   raft_cfg_is_enabled(bool is_enabled)
>   {
> -	raft.is_enabled = is_enabled;
> +	if (is_enabled == raft.is_enabled)
> +		return;
> +
> +	if (!is_enabled)
> +		raft_sm_stop();
> +	else
> +		raft_sm_start();
>   }
>   
>   void
>   raft_cfg_is_candidate(bool is_candidate)
>   {
> -	raft.is_candidate = is_candidate;
> +	bool old_is_candidate = raft.is_candidate;
> +	raft.is_cfg_candidate = is_candidate;
> +	raft.is_candidate = is_candidate && raft.is_enabled;
> +	if (raft.is_candidate == old_is_candidate)
> +		return;
> +
> +	if (raft.is_candidate) {
> +		assert(raft.state == RAFT_STATE_FOLLOWER);
> +		/*
> +		 * If there is an on-going WAL write, it means there was some
> +		 * node who sent newer data to this node.
> +		 */
> +		if (raft.leader == 0 && raft_is_fully_on_disk())
> +			raft_sm_schedule_new_election();
> +	} else if (raft.state != RAFT_STATE_FOLLOWER) {
> +		if (raft.state == RAFT_STATE_LEADER)
> +			raft.leader = 0;
> +		raft.state = RAFT_STATE_FOLLOWER;
> +		raft_broadcast_new_state();
> +	}
> +	box_update_ro_summary();
>   }
>   
>   void
>   raft_cfg_election_timeout(double timeout)
>   {
> +	if (timeout == raft.election_timeout)
> +		return;
> +
>   	raft.election_timeout = timeout;
> +	if (raft.vote != 0 && raft.leader == 0 && raft.is_candidate) {
> +		assert(ev_is_active(&raft.timer));
> +		double timeout = ev_timer_remaining(loop(), &raft.timer) -
> +				 raft.timer.at + raft.election_timeout;
> +		ev_timer_stop(loop(), &raft.timer);
> +		ev_timer_set(&raft.timer, timeout, timeout);
> +		ev_timer_start(loop(), &raft.timer);
> +	}
>   }
>   
>   void
>   raft_cfg_election_quorum(void)
>   {
> +	if (raft.state != RAFT_STATE_CANDIDATE ||
> +	    raft.state == RAFT_STATE_LEADER)
> +		return;
> +	if (raft.vote_count < raft_election_quorum())
> +		return;
> +	/*
> +	 * The node is a candidate. It means its state is fully synced with
> +	 * disk. Otherwise it would be a follower.
> +	 */
> +	assert(!raft.is_write_in_progress);
> +	raft.state = RAFT_STATE_LEADER;
> +	raft.leader = instance_id;
> +	raft_broadcast_new_state();
> +	box_update_ro_summary();
>   }
>   
>   void
>   raft_cfg_death_timeout(void)
>   {
> +	if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
> +	    raft.leader != 0) {
> +		assert(ev_is_active(&raft.timer));
> +		double death_timeout = replication_disconnect_timeout();
> +		double timeout = ev_timer_remaining(loop(), &raft.timer) -
> +				 raft.timer.at + death_timeout;
> +		ev_timer_stop(loop(), &raft.timer);
> +		ev_timer_set(&raft.timer, timeout, timeout);
> +		ev_timer_start(loop(), &raft.timer);
> +	}
>   }
>   
>   void
> @@ -163,3 +965,9 @@ raft_broadcast(const struct raft_request *req)
>   		}
>   	}
>   }
> +
> +void
> +raft_init(void)
> +{
> +	ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
> +}
> diff --git a/src/box/raft.h b/src/box/raft.h
> index db64cf933..23aedfe10 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -31,34 +31,140 @@
>    */
>   #include <stdint.h>
>   #include <stdbool.h>
> +#include "tarantool_ev.h"
>   
>   #if defined(__cplusplus)
>   extern "C" {
>   #endif
>   
> +/**
> + * This is an implementation of Raft leader election protocol, separated from
> + * synchronous replication part.
> + *
> + * The protocol describes an algorithm which helps to elect a single leader in
> + * the cluster, which is supposed to handle write requests. And re-elect a new
> + * leader, when the current leader dies.
> + *
> + * The implementation follows the protocol to the letter except a few important
> + * details.
> + *
> + * Firstly, the original Raft assumes, that all nodes share the same log record
> + * numbers. In Tarantool they are called LSNs. But in case of Tarantool each
> + * node has its own LSN in its own component of vclock. That makes the election
> + * messages a bit heavier, because the nodes need to send and compare complete
> + * vclocks of each other instead of a single number like in the original Raft.
> + * But logic becomes simpler. Because in the original Raft there is a problem of
> + * uncertainty about what to do with records of an old leader right after a new
> + * leader is elected. They could be rolled back or confirmed depending on
> + * circumstances. The issue disappears when vclock is used.
> + *
> + * Secondly, leader election works differently during cluster bootstrap, until
> + * number of bootstrapped replicas becomes >= election quorum. That arises from
> + * specifics of replicas bootstrap and order of systems initialization. In
> + * short: during bootstrap a leader election may use a smaller election quorum
> + * than the configured one. See more details in the code.
> + */
> +
> +struct fiber;
>   struct raft_request;
>   struct vclock;
>   
>   enum raft_state {
> +	/**
> +	 * Can't write. Can only accept data from a leader. Node in this state
> +	 * either monitors an existing leader, or there is an on-going election
> +	 * and the node voted for another node, or it can't be a candidate and
> +	 * does not do anything.
> +	 */
>   	RAFT_STATE_FOLLOWER = 1,
> +	/**
> +	 * The node can't write. There is an active election, in which the node
> +	 * voted for self. Now it waits for election outcome.
> +	 */
>   	RAFT_STATE_CANDIDATE = 2,
> +	/** Election was successful. The node accepts write requests. */
>   	RAFT_STATE_LEADER = 3,
>   };
>   
>   extern const char *raft_state_strs[];
>   
>   struct raft {
> +	/** Instance ID of leader of the current term. */
>   	uint32_t leader;
> +	/** State of the instance. */
>   	enum raft_state state;
> +	/**
> +	 * Volatile part of the Raft state, whose WAL write may be still
> +	 * in-progress, and yet the state may be already used. Volatile state is
> +	 * never sent anywhere, but the state machine makes decisions based
> +	 * on it. That is vital.
> +	 * As an example, volatile vote needs to be used to reject votes inside
> +	 * a term, where the instance already voted (even if the vote WAL write
> +	 * is not finished yet). Otherwise the instance would try to write
> +	 * several votes inside one term.
> +	 */
> +	uint64_t volatile_term;
> +	uint32_t volatile_vote;
> +	/**
> +	 * Flag whether Raft is enabled. When disabled, it still persists terms
> +	 * so as to quickly enroll into the cluster when (if) it is enabled. In
> +	 * everything else disabled Raft does not affect instance work.
> +	 */
>   	bool is_enabled;
> +	/**
> +	 * Flag whether the node can become a leader. It is an accumulated value
> +	 * of configuration options Raft enabled and Raft candidate. If at least
> +	 * one is false - the instance is not a candidate.
> +	 */
>   	bool is_candidate;
> +	/** Flag whether the instance is allowed to be a leader. */
> +	bool is_cfg_candidate;
> +	/**
> +	 * Flag whether Raft currently tries to write something into WAL. It
> +	 * happens asynchronously, not right after Raft state is updated.
> +	 */
> +	bool is_write_in_progress;
> +	/**
> +	 * Persisted Raft state. These values are used when the current Raft
> +	 * state needs to be sent to other nodes.
> +	 */
>   	uint64_t term;
>   	uint32_t vote;
> +	/**
> +	 * Bit 1 on position N means that a vote from instance with ID = N was
> +	 * obtained.
> +	 */
> +	uint32_t vote_mask;
> +	/** Number of votes for this instance. Valid only in candidate state. */
> +	int vote_count;
> +	/** State machine timed event trigger. */
> +	struct ev_timer timer;
> +	/** Worker fiber to execute blocking tasks like IO. */
> +	struct fiber *worker;
> +	/** Configured election timeout in seconds. */
>   	double election_timeout;
>   };
>   
>   extern struct raft raft;
>   
> +/**
> + * A flag whether the instance is read-only according to Raft. Even if Raft
> + * allows writes though, it does not mean the instance is writable. It can be
> + * affected by box.cfg.read_only, connection quorum.
> + */
> +static inline bool
> +raft_is_ro(void)
> +{
> +	return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
> +}
> +
> +/** See if the instance can accept rows from an instance with the given ID. */
> +static inline bool
> +raft_is_source_allowed(uint32_t source_id)
> +{
> +	return !raft.is_enabled || raft.leader == source_id;
> +}
> +
>   /** Check if Raft is enabled. */
>   static inline bool
>   raft_is_enabled(void)
> @@ -78,6 +184,13 @@ raft_process_recovery(const struct raft_request *req);
>   void
>   raft_process_msg(const struct raft_request *req, uint32_t source);
>   
> +/**
> + * Process a heartbeat message from an instance with the given ID. It is used to
> + * watch leader's health and start election when necessary.
> + */
> +void
> +raft_process_heartbeat(uint32_t source);
> +
>   /** Configure whether Raft is enabled. */
>   void
>   raft_cfg_is_enabled(bool is_enabled);
> @@ -130,6 +243,10 @@ raft_serialize_for_disk(struct raft_request *req);
>   void
>   raft_broadcast(const struct raft_request *req);
>   
> +/** Initialize Raft global data structures. */
> +void
> +raft_init(void);
> +
>   #if defined(__cplusplus)
>   }
>   #endif
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 74581db9c..d63711600 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
>   		relay_send(relay, row);
>   }
>   
> +/**
> + * Recreate recovery cursor from the last confirmed point. That is
> + * used by Raft, when the node becomes a leader. It may happen,
> + * that it already sent some data to other nodes as a follower,
> + * and they ignored the data. Now when the node is a leader, it
> + * should send the not confirmed data again. Otherwise the cluster
> + * will get stuck, or worse - the newer data would be sent without the
> + * older sent but ignored data.
> + */
> +static void
> +relay_restart_recovery(struct relay *relay)
> +{
> +	recovery_delete(relay->r);
> +	relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
> +	recover_remaining_wals(relay->r, &relay->stream, NULL, true);
> +}
> +
>   struct relay_raft_msg {
>   	struct cmsg base;
>   	struct cmsg_hop route;
> @@ -786,7 +803,14 @@ relay_raft_msg_push(struct cmsg *base)
>   	struct xrow_header row;
>   	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
>   	try {
> +		/*
> +		 * Send the message before restarting the recovery. Otherwise
> +		 * all the rows would be sent from under a non-leader role and
> +		 * would be ignored again.
> +		 */
>   		relay_send(msg->relay, &row);
> +		if (msg->req.state == RAFT_STATE_LEADER)
> +			relay_restart_recovery(msg->relay);
>   	} catch (Exception *e) {
>   		relay_set_error(msg->relay, e);
>   		fiber_cancel(fiber());

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list