[Tarantool-patches] [PATCH v2 09/11] raft: introduce state machine

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Sep 23 01:48:50 MSK 2020


Consider a new pack of fixes on top of the branch.

====================
    [tosquash] raft: lots of amendments
    
    * A Raft request can't be sent during final join, so the code
      skipping it is removed from there;
    
    * Raft state broadcast became asynchronous (done in the worker
      fiber). The motivation is to be able to collect multiple
      updates in one event loop iteration and send them in one batch,
      without caring whether broadcast is expensive to call multiple
      times in one function.
    
    * Raft messages now contain the complete state of the sender: its
      role, term, vote (if not 0), vclock (if candidate). That allows
      simplifying a lot of code at the cost of a couple of extra
      bytes on the network.
    
    * raft_process_msg() does protocol validation.
    
    * Log messages about vote requests being skipped are reworded and
      reordered, so as to make them less scary when the skip is
      totally fine. Also the comments about skips are removed, since
      they just duplicated the log messages.
    
    * Fixed a bug where a leader could disable Raft but was still
      considered a leader by the other nodes. Now they see the
      resignation and start a new election.

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 10186ab91..7686d6cbc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -301,8 +301,6 @@ apply_final_join_row(struct xrow_header *row)
 	 */
 	if (iproto_type_is_synchro_request(row->type))
 		return 0;
-	if (iproto_type_is_raft_request(row->type))
-		return 0;
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
@@ -895,8 +893,7 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
 	struct vclock candidate_clock;
 	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
 		return -1;
-	raft_process_msg(&req, applier->instance_id);
-	return 0;
+	return raft_process_msg(&req, applier->instance_id);
 }
 
 /**
diff --git a/src/box/raft.c b/src/box/raft.c
index f712887a1..ce90ed533 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -61,6 +61,7 @@ struct raft raft = {
 	.is_candidate = false,
 	.is_cfg_candidate = false,
 	.is_write_in_progress = false,
+	.is_broadcast_scheduled = false,
 	.term = 1,
 	.vote = 0,
 	.vote_mask = 0,
@@ -177,16 +178,9 @@ raft_election_quorum(void)
 	return MIN(replication_synchro_quorum, replicaset.registered_count);
 }
 
-/** Broadcast an event about this node changed its state to all relays. */
-static inline void
-raft_broadcast_new_state(void)
-{
-	struct raft_request req;
-	memset(&req, 0, sizeof(req));
-	req.term = raft.term;
-	req.state = raft.state;
-	raft_broadcast(&req);
-}
+/** Schedule broadcast of the complete Raft state to all the followers. */
+static void
+raft_schedule_broadcast(void);
 
 /** Raft state machine methods. 'sm' stands for State Machine. */
 
@@ -324,80 +318,79 @@ raft_process_recovery(const struct raft_request *req)
 	assert(!raft_is_enabled());
 }
 
-void
+int
 raft_process_msg(const struct raft_request *req, uint32_t source)
 {
 	say_info("RAFT: message %s from %u", raft_request_to_string(req),
 		 source);
 	assert(source > 0);
 	assert(source != instance_id);
+	if (req->term == 0 || req->state == 0) {
+		diag_set(ClientError, ER_PROTOCOL, "Raft term and state can't "
+			 "be zero");
+		return -1;
+	}
+	if (req->state == RAFT_STATE_CANDIDATE &&
+	    (req->vote != source || req->vclock == NULL)) {
+		diag_set(ClientError, ER_PROTOCOL, "Candidate should always "
+			 "vote for self and provide its vclock");
+		return -1;
+	}
 	/* Outdated request. */
 	if (req->term < raft.volatile_term) {
 		say_info("RAFT: the message is ignored due to outdated term - "
 			 "current term is %u", raft.volatile_term);
-		return;
+		return 0;
 	}
 
-	enum raft_state old_state = raft.state;
-
 	/* Term bump. */
 	if (req->term > raft.volatile_term)
 		raft_sm_schedule_new_term(req->term);
-
-	/* Vote request during the on-going election. */
+	/*
+	 * Either a vote request during an on-going election. Or an old vote
+	 * persisted long time ago and still broadcasted. Or a vote response.
+	 */
 	if (req->vote != 0) {
 		switch (raft.state) {
 		case RAFT_STATE_FOLLOWER:
 		case RAFT_STATE_LEADER:
-			/*
-			 * Can't respond on vote requests when Raft is disabled.
-			 */
 			if (!raft.is_enabled) {
 				say_info("RAFT: vote request is skipped - RAFT "
 					 "is disabled");
 				break;
 			}
-			/* Check if already voted in this term. */
-			if (raft.volatile_vote != 0) {
-				say_info("RAFT: vote request is skipped - "
-					 "already voted in this term");
+			if (raft.leader != 0) {
+				say_info("RAFT: vote request is skipped - the "
+					 "leader is already known - %u",
+					 raft.leader);
 				break;
 			}
-			/* Not a candidate. Can't accept votes. */
 			if (req->vote == instance_id) {
+				/*
+				 * This is entirely valid. This instance could
+				 * request a vote, then become a follower or
+				 * leader, and then get the response.
+				 */
 				say_info("RAFT: vote request is skipped - "
 					 "can't accept vote for self if not a "
 					 "candidate");
 				break;
 			}
-			/* Can't vote when vclock is unknown. */
-			if (req->vclock == NULL) {
-				say_info("RAFT: vote request is skipped - "
-					 "missing candidate vclock.");
-				break;
-			}
-			/* Can't vote for too old or incomparable nodes. */
-			if (!raft_can_vote_for(req->vclock)) {
+			if (req->state != RAFT_STATE_CANDIDATE) {
 				say_info("RAFT: vote request is skipped - "
-					 "the vclock is not acceptable = %s",
-					 vclock_to_string(req->vclock));
+					 "this is a notification about a vote "
+					 "for a third node, not a request");
 				break;
 			}
-			/*
-			 * Check if somebody is asking to vote for a third
-			 * node - nope. Make votes only when asked directly by
-			 * the new candidate. However that restriction may be
-			 * relaxed in future, if can be proven to be safe.
-			 */
-			if (req->vote != source) {
+			if (raft.volatile_vote != 0) {
 				say_info("RAFT: vote request is skipped - "
-					 "indirect votes are not allowed");
+					 "already voted in this term");
 				break;
 			}
-			if (raft.leader != 0) {
+			/* Vclock is not NULL, validated above. */
+			if (!raft_can_vote_for(req->vclock)) {
 				say_info("RAFT: vote request is skipped - the "
-					 "leader is already known - %u",
-					 raft.leader);
+					 "vclock is not acceptable");
 				break;
 			}
 			/*
@@ -433,15 +426,17 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
 			unreachable();
 		}
 	}
-	/*
-	 * If the node does not claim to be a leader, nothing interesting. Terms
-	 * and votes are already handled.
-	 */
-	if (req->state != RAFT_STATE_LEADER)
-		goto end;
+	if (req->state != RAFT_STATE_LEADER) {
+		if (source == raft.leader) {
+			say_info("RAFT: the node %u has resigned from the "
+				 "leader role", raft.leader);
+			raft_sm_schedule_new_election();
+		}
+		return 0;
+	}
 	/* The node is a leader, but it is already known. */
 	if (source == raft.leader)
-		goto end;
+		return 0;
 	/*
 	 * XXX: A message from a conflicting leader. Split brain, basically.
 	 * Need to decide what to do. Current solution is to do nothing. In
@@ -451,20 +446,12 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
 	if (raft.leader != 0) {
 		say_warn("RAFT: conflicting leader detected in one term - "
 			 "known is %u, received %u", raft.leader, source);
-		goto end;
+		return 0;
 	}
 
 	/* New leader was elected. */
 	raft_sm_follow_leader(source);
-end:
-	if (raft.state != old_state) {
-		/*
-		 * New term and vote are not broadcasted yet. Firstly their WAL
-		 * write should be finished. But the state is volatile. It is ok
-		 * to broadcast it now.
-		 */
-		raft_broadcast_new_state();
-	}
+	return 0;
 }
 
 void
@@ -558,6 +545,7 @@ fail:
 	panic("Could not write a raft request to WAL\n");
 }
 
+/* Dump Raft state to WAL in a blocking way. */
 static void
 raft_worker_handle_io(void)
 {
@@ -565,9 +553,6 @@ raft_worker_handle_io(void)
 	/* During write Raft can't be anything but a follower. */
 	assert(raft.state == RAFT_STATE_FOLLOWER);
 	struct raft_request req;
-	uint64_t old_term = raft.term;
-	uint32_t old_vote = raft.vote;
-	enum raft_state old_state = raft.state;
 
 	if (raft_is_fully_on_disk()) {
 end_dump:
@@ -604,71 +589,61 @@ end_dump:
 	} else {
 		memset(&req, 0, sizeof(req));
 		assert(raft.volatile_term >= raft.term);
-		/* Term is written always. */
 		req.term = raft.volatile_term;
-		if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
-			req.vote = raft.volatile_vote;
+		req.vote = raft.volatile_vote;
 
 		raft_write_request(&req);
-		say_verbose("RAFT: persist and apply state %s",
-			    raft_request_to_string(&req));
+		say_info("RAFT: persisted state %s",
+			 raft_request_to_string(&req));
 
 		assert(req.term >= raft.term);
-		if (req.term > raft.term) {
-			raft.term = req.term;
-			raft.vote = 0;
-		}
-		if (req.vote != 0) {
-			assert(raft.vote == 0);
-			raft.vote = req.vote;
-		}
+		raft.term = req.term;
+		raft.vote = req.vote;
+		/*
+		 * Persistent state is visible, and it was changed - broadcast.
+		 */
+		raft_schedule_broadcast();
 		if (raft_is_fully_on_disk())
 			goto end_dump;
 	}
+}
 
+/* Broadcast Raft complete state to the followers. */
+static void
+raft_worker_handle_broadcast(void)
+{
+	assert(raft.is_broadcast_scheduled);
+	struct raft_request req;
 	memset(&req, 0, sizeof(req));
-	/* Term is encoded always. */
 	req.term = raft.term;
-	bool has_changes = old_term != raft.term;
-	if (raft.vote != 0 && old_vote != raft.vote) {
-		req.vote = raft.vote;
-		/*
-		 * When vote for self, need to send current vclock too. Two
-		 * reasons for that:
-		 *
-		 * - nodes need to vote for the instance containing the newest
-		 *   data. So as not to loose it, because some of it may be
-		 *   confirmed by the synchronous replication;
-		 *
-		 * - replication is basically stopped during election. Other
-		 *   nodes can't learn vclock of this instance through regular
-		 *   replication.
-		 */
-		if (raft.vote == instance_id)
-			req.vclock = &replicaset.vclock;
-		has_changes = true;
-	}
-	if (raft.state != old_state) {
-		req.state = raft.state;
-		has_changes = true;
+	req.vote = raft.vote;
+	req.state = raft.state;
+	if (req.state == RAFT_STATE_CANDIDATE) {
+		assert(raft.vote == instance_id);
+		req.vclock = &replicaset.vclock;
 	}
-	if (has_changes)
-		raft_broadcast(&req);
+	replicaset_foreach(replica)
+		relay_push_raft(replica->relay, &req);
+	raft.is_broadcast_scheduled = false;
 }
 
 static int
 raft_worker_f(va_list args)
 {
 	(void)args;
+	bool is_idle = true;
 	while (!fiber_is_cancelled()) {
-		if (!raft.is_write_in_progress)
-			goto idle;
-		raft_worker_handle_io();
-		if (!raft.is_write_in_progress)
-			goto idle;
+		if (raft.is_write_in_progress) {
+			raft_worker_handle_io();
+			is_idle = false;
+		}
+		if (raft.is_broadcast_scheduled) {
+			raft_worker_handle_broadcast();
+			is_idle = false;
+		}
 		fiber_sleep(0);
-		continue;
-	idle:
+		if (!is_idle)
+			continue;
 		assert(raft_is_fully_on_disk());
 		fiber_yield();
 	}
@@ -702,6 +677,8 @@ raft_sm_become_leader(void)
 	ev_timer_stop(loop(), &raft.timer);
 	/* Make read-write (if other subsystems allow that. */
 	box_update_ro_summary();
+	/* State is visible and it is changed - broadcast. */
+	raft_schedule_broadcast();
 }
 
 static void
@@ -712,10 +689,12 @@ raft_sm_follow_leader(uint32_t leader)
 	assert(raft.leader == 0);
 	raft.state = RAFT_STATE_FOLLOWER;
 	raft.leader = leader;
-	if (!raft.is_write_in_progress) {
+	if (!raft.is_write_in_progress && raft.is_candidate) {
 		ev_timer_stop(loop(), &raft.timer);
 		raft_sm_wait_leader_dead();
 	}
+	/* State is visible and it is changed - broadcast. */
+	raft_schedule_broadcast();
 }
 
 static void
@@ -724,6 +703,7 @@ raft_sm_become_candidate(void)
 	say_info("RAFT: enter candidate state with 1 self vote");
 	assert(raft.state == RAFT_STATE_FOLLOWER);
 	assert(raft.leader == 0);
+	assert(raft.vote == instance_id);
 	assert(raft.is_candidate);
 	assert(!raft.is_write_in_progress);
 	assert(raft_election_quorum() > 1);
@@ -732,6 +712,8 @@ raft_sm_become_candidate(void)
 	raft.vote_mask = 0;
 	bit_set(&raft.vote_mask, instance_id);
 	raft_sm_wait_election_end();
+	/* State is visible and it is changed - broadcast. */
+	raft_schedule_broadcast();
 }
 
 static void
@@ -747,6 +729,12 @@ raft_sm_schedule_new_term(uint64_t new_term)
 	raft.state = RAFT_STATE_FOLLOWER;
 	box_update_ro_summary();
 	raft_sm_pause_and_dump();
+	/*
+	 * State is visible and it is changed - broadcast. Term is also visible,
+	 * but only persistent term. Volatile term is not broadcasted until
+	 * saved to disk.
+	 */
+	raft_schedule_broadcast();
 }
 
 static void
@@ -758,6 +746,7 @@ raft_sm_schedule_new_vote(uint32_t new_vote)
 	assert(raft.state == RAFT_STATE_FOLLOWER);
 	raft.volatile_vote = new_vote;
 	raft_sm_pause_and_dump();
+	/* Nothing visible is changed - no broadcast. */
 }
 
 static void
@@ -828,13 +817,6 @@ raft_sm_start(void)
 	else
 		raft_sm_schedule_new_election();
 	box_update_ro_summary();
-	/*
-	 * When Raft is enabled, send the complete state. Because
-	 * it wasn't sent in disabled state.
-	 */
-	struct raft_request req;
-	raft_serialize_for_network(&req, NULL);
-	raft_broadcast(&req);
 }
 
 static void
@@ -848,6 +830,8 @@ raft_sm_stop(void)
 		raft.leader = 0;
 	raft.state = RAFT_STATE_FOLLOWER;
 	box_update_ro_summary();
+	/* State is visible and changed - broadcast. */
+	raft_schedule_broadcast();
 }
 
 void
@@ -912,7 +896,8 @@ raft_cfg_is_candidate(bool is_candidate)
 		if (raft.state == RAFT_STATE_LEADER)
 			raft.leader = 0;
 		raft.state = RAFT_STATE_FOLLOWER;
-		raft_broadcast_new_state();
+		/* State is visible and changed - broadcast. */
+		raft_schedule_broadcast();
 	}
 	box_update_ro_summary();
 }
@@ -943,7 +928,6 @@ raft_cfg_election_quorum(void)
 	if (raft.vote_count < raft_election_quorum())
 		return;
 	raft_sm_become_leader();
-	raft_broadcast_new_state();
 }
 
 void
@@ -961,11 +945,20 @@ raft_cfg_death_timeout(void)
 	}
 }
 
-void
-raft_broadcast(const struct raft_request *req)
+static void
+raft_schedule_broadcast(void)
 {
-	replicaset_foreach(replica)
-		relay_push_raft(replica->relay, req);
+	raft.is_broadcast_scheduled = true;
+	/*
+	 * Don't wake the fiber if it writes something. Otherwise it would be a
+	 * spurious wakeup breaking the WAL write not adapted to this.
+	 */
+	if (raft.is_write_in_progress)
+		return;
+	if (raft.worker == NULL)
+		raft.worker = fiber_new("raft_worker", raft_worker_f);
+	if (raft.worker != fiber())
+		fiber_wakeup(raft.worker);
 }
 
 void
diff --git a/src/box/raft.h b/src/box/raft.h
index 23aedfe10..8c1cf467f 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -124,6 +124,13 @@ struct raft {
 	 * happens asynchronously, not right after Raft state is updated.
 	 */
 	bool is_write_in_progress;
+	/**
+	 * Flag whether Raft wants to broadcast its state. It is done
+	 * asynchronously in the worker fiber. That allows to collect multiple
+	 * updates into one batch if they happen in one event loop iteration.
+	 * Usually even in one function.
+	 */
+	bool is_broadcast_scheduled;
 	/**
 	 * Persisted Raft state. These values are used when need to tell current
 	 * Raft state to other nodes.
@@ -181,7 +188,7 @@ raft_process_recovery(const struct raft_request *req);
  * @param req Raft request.
  * @param source Instance ID of the message sender.
  */
-void
+int
 raft_process_msg(const struct raft_request *req, uint32_t source);
 
 /**
@@ -236,13 +243,6 @@ raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
 void
 raft_serialize_for_disk(struct raft_request *req);
 
-/**
- * Broadcast the changes in this instance's raft status to all
- * the followers.
- */
-void
-raft_broadcast(const struct raft_request *req);
-
 /** Initialize Raft global data structures. */
 void
 raft_init(void);


More information about the Tarantool-patches mailing list