[Tarantool-patches] [PATCH v2 09/11] raft: introduce state machine

Serge Petrenko sergepetrenko at tarantool.org
Wed Sep 23 12:59:07 MSK 2020


23.09.2020 01:48, Vladislav Shpilevoy пишет:
> Consider a new pack of fixes on top of the branch.
>
> ====================
>      [tosquash] raft: lots of amendments
>      
>      * Raft request can't be sent during final join, so its skipping is
>        removed from there;
>      
>      * Raft state broadcast became asynchronous (done in the worker
>        fiber). The motivation is to be able to collect multiple
>        updates in one event loop iteration and send them in one batch,
>        without caring whether broadcast is expensive to call multiple
>        times in one function.
>      
>      * Raft messages now contain the complete state of the sender: its
>        role, term, vote (if not 0), vclock (if candidate). That allows
>        simplifying a lot of code at the cost of no longer saving a
>        couple of bytes on the network.
>      
>      * raft_process_msg() does protocol validation.
>      
>      * Log messages about a vote request being skipped are reworded
>        and reordered, so as to make them less scary when the skip is
>        totally fine. Also the comments about skips are removed, since
>        they just duplicated the log messages.
>      
>      * Fixed a bug where a leader could disable Raft but was still
>        considered a leader by the other nodes. Now they notice it and
>        start a new election.
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 10186ab91..7686d6cbc 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -301,8 +301,6 @@ apply_final_join_row(struct xrow_header *row)
>   	 */
>   	if (iproto_type_is_synchro_request(row->type))
>   		return 0;
> -	if (iproto_type_is_raft_request(row->type))
> -		return 0;
>   	struct txn *txn = txn_begin();
>   	if (txn == NULL)
>   		return -1;
> @@ -895,8 +893,7 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>   	struct vclock candidate_clock;
>   	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
>   		return -1;
> -	raft_process_msg(&req, applier->instance_id);
> -	return 0;
> +	return raft_process_msg(&req, applier->instance_id);
>   }
>   
>   /**
> diff --git a/src/box/raft.c b/src/box/raft.c
> index f712887a1..ce90ed533 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -61,6 +61,7 @@ struct raft raft = {
>   	.is_candidate = false,
>   	.is_cfg_candidate = false,
>   	.is_write_in_progress = false,
> +	.is_broadcast_scheduled = false,
>   	.term = 1,
>   	.vote = 0,
>   	.vote_mask = 0,
> @@ -177,16 +178,9 @@ raft_election_quorum(void)
>   	return MIN(replication_synchro_quorum, replicaset.registered_count);
>   }
>   
> -/** Broadcast an event about this node changed its state to all relays. */
> -static inline void
> -raft_broadcast_new_state(void)
> -{
> -	struct raft_request req;
> -	memset(&req, 0, sizeof(req));
> -	req.term = raft.term;
> -	req.state = raft.state;
> -	raft_broadcast(&req);
> -}
> +/** Schedule broadcast of the complete Raft state to all the followers. */
> +static void
> +raft_schedule_broadcast(void);
>   
>   /** Raft state machine methods. 'sm' stands for State Machine. */
>   
> @@ -324,80 +318,79 @@ raft_process_recovery(const struct raft_request *req)
>   	assert(!raft_is_enabled());
>   }
>   
> -void
> +int
>   raft_process_msg(const struct raft_request *req, uint32_t source)
>   {
>   	say_info("RAFT: message %s from %u", raft_request_to_string(req),
>   		 source);
>   	assert(source > 0);
>   	assert(source != instance_id);
> +	if (req->term == 0 || req->state == 0) {
> +		diag_set(ClientError, ER_PROTOCOL, "Raft term and state can't "
> +			 "be zero");
> +		return -1;
> +	}
> +	if (req->state == RAFT_STATE_CANDIDATE &&
> +	    (req->vote != source || req->vclock == NULL)) {
> +		diag_set(ClientError, ER_PROTOCOL, "Candidate should always "
> +			 "vote for self and provide its vclock");
> +		return -1;
> +	}
>   	/* Outdated request. */
>   	if (req->term < raft.volatile_term) {
>   		say_info("RAFT: the message is ignored due to outdated term - "
>   			 "current term is %u", raft.volatile_term);
> -		return;
> +		return 0;
>   	}
>   
> -	enum raft_state old_state = raft.state;
> -
>   	/* Term bump. */
>   	if (req->term > raft.volatile_term)
>   		raft_sm_schedule_new_term(req->term);
> -
> -	/* Vote request during the on-going election. */
> +	/*
> +	 * Either a vote request during an on-going election. Or an old vote
> +	 * persisted long time ago and still broadcasted. Or a vote response.
> +	 */
>   	if (req->vote != 0) {
>   		switch (raft.state) {
>   		case RAFT_STATE_FOLLOWER:
>   		case RAFT_STATE_LEADER:
> -			/*
> -			 * Can't respond on vote requests when Raft is disabled.
> -			 */
>   			if (!raft.is_enabled) {
>   				say_info("RAFT: vote request is skipped - RAFT "
>   					 "is disabled");
>   				break;
>   			}
> -			/* Check if already voted in this term. */
> -			if (raft.volatile_vote != 0) {
> -				say_info("RAFT: vote request is skipped - "
> -					 "already voted in this term");
> +			if (raft.leader != 0) {
> +				say_info("RAFT: vote request is skipped - the "
> +					 "leader is already known - %u",
> +					 raft.leader);
>   				break;
>   			}
> -			/* Not a candidate. Can't accept votes. */
>   			if (req->vote == instance_id) {
> +				/*
> +				 * This is entirely valid. This instance could
> +				 * request a vote, then become a follower or
> +				 * leader, and then get the response.
> +				 */
>   				say_info("RAFT: vote request is skipped - "
>   					 "can't accept vote for self if not a "
>   					 "candidate");
>   				break;
>   			}
> -			/* Can't vote when vclock is unknown. */
> -			if (req->vclock == NULL) {
> -				say_info("RAFT: vote request is skipped - "
> -					 "missing candidate vclock.");
> -				break;
> -			}
> -			/* Can't vote for too old or incomparable nodes. */
> -			if (!raft_can_vote_for(req->vclock)) {
> +			if (req->state != RAFT_STATE_CANDIDATE) {
>   				say_info("RAFT: vote request is skipped - "
> -					 "the vclock is not acceptable = %s",
> -					 vclock_to_string(req->vclock));
> +					 "this is a notification about a vote "
> +					 "for a third node, not a request");
>   				break;
>   			}
> -			/*
> -			 * Check if somebody is asking to vote for a third
> -			 * node - nope. Make votes only when asked directly by
> -			 * the new candidate. However that restriction may be
> -			 * relaxed in future, if can be proven to be safe.
> -			 */
> -			if (req->vote != source) {
> +			if (raft.volatile_vote != 0) {
>   				say_info("RAFT: vote request is skipped - "
> -					 "indirect votes are not allowed");
> +					 "already voted in this term");
>   				break;
>   			}
> -			if (raft.leader != 0) {
> +			/* Vclock is not NULL, validated above. */
> +			if (!raft_can_vote_for(req->vclock)) {
>   				say_info("RAFT: vote request is skipped - the "
> -					 "leader is already known - %u",
> -					 raft.leader);
> +					 "vclock is not acceptable");
>   				break;


Are you sure we want these messages at the `info` level? `info` is the
default log level, and Raft already spams the log with quite a lot of
messages. Maybe the `verbose` level would be better?


>   			}
>   			/*
> @@ -433,15 +426,17 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
>   			unreachable();
>   		}
>   	}
> -	/*
> -	 * If the node does not claim to be a leader, nothing interesting. Terms
> -	 * and votes are already handled.
> -	 */
> -	if (req->state != RAFT_STATE_LEADER)
> -		goto end;
> +	if (req->state != RAFT_STATE_LEADER) {
> +		if (source == raft.leader) {
> +			say_info("RAFT: the node %u has resigned from the "
> +				 "leader role", raft.leader);
> +			raft_sm_schedule_new_election();
> +		}
> +		return 0;
> +	}
>   	/* The node is a leader, but it is already known. */
>   	if (source == raft.leader)
> -		goto end;
> +		return 0;
>   	/*
>   	 * XXX: A message from a conflicting leader. Split brain, basically.
>   	 * Need to decide what to do. Current solution is to do nothing. In
> @@ -451,20 +446,12 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
>   	if (raft.leader != 0) {
>   		say_warn("RAFT: conflicting leader detected in one term - "
>   			 "known is %u, received %u", raft.leader, source);
> -		goto end;
> +		return 0;
>   	}
>   
>   	/* New leader was elected. */
>   	raft_sm_follow_leader(source);
> -end:
> -	if (raft.state != old_state) {
> -		/*
> -		 * New term and vote are not broadcasted yet. Firstly their WAL
> -		 * write should be finished. But the state is volatile. It is ok
> -		 * to broadcast it now.
> -		 */
> -		raft_broadcast_new_state();
> -	}
> +	return 0;
>   }
>   
>   void
> @@ -558,6 +545,7 @@ fail:
>   	panic("Could not write a raft request to WAL\n");
>   }
>   
> +/* Dump Raft state to WAL in a blocking way. */
>   static void
>   raft_worker_handle_io(void)
>   {
> @@ -565,9 +553,6 @@ raft_worker_handle_io(void)
>   	/* During write Raft can't be anything but a follower. */
>   	assert(raft.state == RAFT_STATE_FOLLOWER);
>   	struct raft_request req;
> -	uint64_t old_term = raft.term;
> -	uint32_t old_vote = raft.vote;
> -	enum raft_state old_state = raft.state;
>   
>   	if (raft_is_fully_on_disk()) {
>   end_dump:
> @@ -604,71 +589,61 @@ end_dump:
>   	} else {
>   		memset(&req, 0, sizeof(req));
>   		assert(raft.volatile_term >= raft.term);
> -		/* Term is written always. */
>   		req.term = raft.volatile_term;
> -		if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
> -			req.vote = raft.volatile_vote;
> +		req.vote = raft.volatile_vote;
>   
>   		raft_write_request(&req);
> -		say_verbose("RAFT: persist and apply state %s",
> -			    raft_request_to_string(&req));
> +		say_info("RAFT: persisted state %s",
> +			 raft_request_to_string(&req));
>   
>   		assert(req.term >= raft.term);
> -		if (req.term > raft.term) {
> -			raft.term = req.term;
> -			raft.vote = 0;
> -		}
> -		if (req.vote != 0) {
> -			assert(raft.vote == 0);
> -			raft.vote = req.vote;
> -		}
> +		raft.term = req.term;
> +		raft.vote = req.vote;
> +		/*
> +		 * Persistent state is visible, and it was changed - broadcast.
> +		 */
> +		raft_schedule_broadcast();
>   		if (raft_is_fully_on_disk())
>   			goto end_dump;
>   	}
> +}
>   
> +/* Broadcast Raft complete state to the followers. */
> +static void
> +raft_worker_handle_broadcast(void)
> +{
> +	assert(raft.is_broadcast_scheduled);
> +	struct raft_request req;
>   	memset(&req, 0, sizeof(req));
> -	/* Term is encoded always. */
>   	req.term = raft.term;
> -	bool has_changes = old_term != raft.term;
> -	if (raft.vote != 0 && old_vote != raft.vote) {
> -		req.vote = raft.vote;
> -		/*
> -		 * When vote for self, need to send current vclock too. Two
> -		 * reasons for that:
> -		 *
> -		 * - nodes need to vote for the instance containing the newest
> -		 *   data. So as not to loose it, because some of it may be
> -		 *   confirmed by the synchronous replication;
> -		 *
> -		 * - replication is basically stopped during election. Other
> -		 *   nodes can't learn vclock of this instance through regular
> -		 *   replication.
> -		 */
> -		if (raft.vote == instance_id)
> -			req.vclock = &replicaset.vclock;
> -		has_changes = true;
> -	}
> -	if (raft.state != old_state) {
> -		req.state = raft.state;
> -		has_changes = true;
> +	req.vote = raft.vote;
> +	req.state = raft.state;
> +	if (req.state == RAFT_STATE_CANDIDATE) {
> +		assert(raft.vote == instance_id);
> +		req.vclock = &replicaset.vclock;
>   	}
> -	if (has_changes)
> -		raft_broadcast(&req);
> +	replicaset_foreach(replica)
> +		relay_push_raft(replica->relay, &req);
> +	raft.is_broadcast_scheduled = false;
>   }
>   
>   static int
>   raft_worker_f(va_list args)
>   {
>   	(void)args;
> +	bool is_idle = true;
>   	while (!fiber_is_cancelled()) {
> -		if (!raft.is_write_in_progress)
> -			goto idle;
> -		raft_worker_handle_io();
> -		if (!raft.is_write_in_progress)
> -			goto idle;
> +		if (raft.is_write_in_progress) {
> +			raft_worker_handle_io();
> +			is_idle = false;
> +		}
> +		if (raft.is_broadcast_scheduled) {
> +			raft_worker_handle_broadcast();
> +			is_idle = false;
> +		}
>   		fiber_sleep(0);
> -		continue;
> -	idle:
> +		if (!is_idle)
> +			continue;
>   		assert(raft_is_fully_on_disk());
>   		fiber_yield();
>   	}
> @@ -702,6 +677,8 @@ raft_sm_become_leader(void)
>   	ev_timer_stop(loop(), &raft.timer);
>   	/* Make read-write (if other subsystems allow that. */
>   	box_update_ro_summary();
> +	/* State is visible and it is changed - broadcast. */
> +	raft_schedule_broadcast();
>   }
>   
>   static void
> @@ -712,10 +689,12 @@ raft_sm_follow_leader(uint32_t leader)
>   	assert(raft.leader == 0);
>   	raft.state = RAFT_STATE_FOLLOWER;
>   	raft.leader = leader;
> -	if (!raft.is_write_in_progress) {
> +	if (!raft.is_write_in_progress && raft.is_candidate) {
>   		ev_timer_stop(loop(), &raft.timer);
>   		raft_sm_wait_leader_dead();
>   	}
> +	/* State is visible and it is changed - broadcast. */
> +	raft_schedule_broadcast();
>   }
>   
>   static void
> @@ -724,6 +703,7 @@ raft_sm_become_candidate(void)
>   	say_info("RAFT: enter candidate state with 1 self vote");
>   	assert(raft.state == RAFT_STATE_FOLLOWER);
>   	assert(raft.leader == 0);
> +	assert(raft.vote == instance_id);
>   	assert(raft.is_candidate);
>   	assert(!raft.is_write_in_progress);
>   	assert(raft_election_quorum() > 1);
> @@ -732,6 +712,8 @@ raft_sm_become_candidate(void)
>   	raft.vote_mask = 0;
>   	bit_set(&raft.vote_mask, instance_id);
>   	raft_sm_wait_election_end();
> +	/* State is visible and it is changed - broadcast. */
> +	raft_schedule_broadcast();
>   }
>   
>   static void
> @@ -747,6 +729,12 @@ raft_sm_schedule_new_term(uint64_t new_term)
>   	raft.state = RAFT_STATE_FOLLOWER;
>   	box_update_ro_summary();
>   	raft_sm_pause_and_dump();
> +	/*
> +	 * State is visible and it is changed - broadcast. Term is also visible,
> +	 * but only persistent term. Volatile term is not broadcasted until
> +	 * saved to disk.
> +	 */
> +	raft_schedule_broadcast();
>   }
>   
>   static void
> @@ -758,6 +746,7 @@ raft_sm_schedule_new_vote(uint32_t new_vote)
>   	assert(raft.state == RAFT_STATE_FOLLOWER);
>   	raft.volatile_vote = new_vote;
>   	raft_sm_pause_and_dump();
> +	/* Nothing visible is changed - no broadcast. */
>   }
>   
>   static void
> @@ -828,13 +817,6 @@ raft_sm_start(void)
>   	else
>   		raft_sm_schedule_new_election();
>   	box_update_ro_summary();
> -	/*
> -	 * When Raft is enabled, send the complete state. Because
> -	 * it wasn't sent in disabled state.
> -	 */
> -	struct raft_request req;
> -	raft_serialize_for_network(&req, NULL);
> -	raft_broadcast(&req);
>   }
>   
>   static void
> @@ -848,6 +830,8 @@ raft_sm_stop(void)
>   		raft.leader = 0;
>   	raft.state = RAFT_STATE_FOLLOWER;
>   	box_update_ro_summary();
> +	/* State is visible and changed - broadcast. */
> +	raft_schedule_broadcast();
>   }
>   
>   void
> @@ -912,7 +896,8 @@ raft_cfg_is_candidate(bool is_candidate)
>   		if (raft.state == RAFT_STATE_LEADER)
>   			raft.leader = 0;
>   		raft.state = RAFT_STATE_FOLLOWER;
> -		raft_broadcast_new_state();
> +		/* State is visible and changed - broadcast. */
> +		raft_schedule_broadcast();
>   	}
>   	box_update_ro_summary();
>   }
> @@ -943,7 +928,6 @@ raft_cfg_election_quorum(void)
>   	if (raft.vote_count < raft_election_quorum())
>   		return;
>   	raft_sm_become_leader();
> -	raft_broadcast_new_state();
>   }
>   
>   void
> @@ -961,11 +945,20 @@ raft_cfg_death_timeout(void)
>   	}
>   }
>   
> -void
> -raft_broadcast(const struct raft_request *req)
> +static void
> +raft_schedule_broadcast(void)
>   {
> -	replicaset_foreach(replica)
> -		relay_push_raft(replica->relay, req);
> +	raft.is_broadcast_scheduled = true;
> +	/*
> +	 * Don't wake the fiber if it writes something. Otherwise it would be a
> +	 * spurious wakeup breaking the WAL write not adapted to this.
> +	 */
> +	if (raft.is_write_in_progress)
> +		return;
> +	if (raft.worker == NULL)
> +		raft.worker = fiber_new("raft_worker", raft_worker_f);
> +	if (raft.worker != fiber())
> +		fiber_wakeup(raft.worker);
>   }
>   
>   void
> diff --git a/src/box/raft.h b/src/box/raft.h
> index 23aedfe10..8c1cf467f 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -124,6 +124,13 @@ struct raft {
>   	 * happens asynchronously, not right after Raft state is updated.
>   	 */
>   	bool is_write_in_progress;
> +	/**
> +	 * Flag whether Raft wants to broadcast its state. It is done
> +	 * asynchronously in the worker fiber. That allows to collect multiple
> +	 * updates into one batch if they happen in one event loop iteration.
> +	 * Usually even in one function.
> +	 */
> +	bool is_broadcast_scheduled;
>   	/**
>   	 * Persisted Raft state. These values are used when need to tell current
>   	 * Raft state to other nodes.
> @@ -181,7 +188,7 @@ raft_process_recovery(const struct raft_request *req);
>    * @param req Raft request.
>    * @param source Instance ID of the message sender.
>    */
> -void
> +int
>   raft_process_msg(const struct raft_request *req, uint32_t source);
>   
>   /**
> @@ -236,13 +243,6 @@ raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
>   void
>   raft_serialize_for_disk(struct raft_request *req);
>   
> -/**
> - * Broadcast the changes in this instance's raft status to all
> - * the followers.
> - */
> -void
> -raft_broadcast(const struct raft_request *req);
> -
>   /** Initialize Raft global data structures. */
>   void
>   raft_init(void);

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list