[Tarantool-patches] [PATCH v2 09/11] raft: introduce state machine
Serge Petrenko
sergepetrenko at tarantool.org
Wed Sep 23 12:59:07 MSK 2020
23.09.2020 01:48, Vladislav Shpilevoy пишет:
> Consider a new pack of fixes on top of the branch.
>
> ====================
> [tosquash] raft: lots of amendments
>
> * Raft request can't be sent during final join, so its skipping is
> removed from there;
>
> * Raft state broadcast became asynchronous (done in the worker
> fiber). The motivation is to be able to collect multiple updates
> in one event loop iteration and send them in one batch, without
> having to care whether broadcast is expensive to call multiple
> times in one function.
>
> * Raft messages now contain the complete state of the sender: its
> role, term, vote (if not 0), vclock (if candidate). That allows
> simplifying a lot of code at the cost of no longer saving a
> couple of bytes on the network.
>
> * raft_process_msg() does protocol validation.
>
> * Log messages about a vote request being skipped are reworded and
> reordered, so as to make them less scary when the skip is
> totally fine. Also the comments about skips are removed, since
> they just duplicated the log messages.
>
> * Fixed a bug where a leader could disable Raft but was still
> considered a leader by the other nodes. Now they notice it and
> start a new election.
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 10186ab91..7686d6cbc 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -301,8 +301,6 @@ apply_final_join_row(struct xrow_header *row)
> */
> if (iproto_type_is_synchro_request(row->type))
> return 0;
> - if (iproto_type_is_raft_request(row->type))
> - return 0;
> struct txn *txn = txn_begin();
> if (txn == NULL)
> return -1;
> @@ -895,8 +893,7 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
> struct vclock candidate_clock;
> if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
> return -1;
> - raft_process_msg(&req, applier->instance_id);
> - return 0;
> + return raft_process_msg(&req, applier->instance_id);
> }
>
> /**
> diff --git a/src/box/raft.c b/src/box/raft.c
> index f712887a1..ce90ed533 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -61,6 +61,7 @@ struct raft raft = {
> .is_candidate = false,
> .is_cfg_candidate = false,
> .is_write_in_progress = false,
> + .is_broadcast_scheduled = false,
> .term = 1,
> .vote = 0,
> .vote_mask = 0,
> @@ -177,16 +178,9 @@ raft_election_quorum(void)
> return MIN(replication_synchro_quorum, replicaset.registered_count);
> }
>
> -/** Broadcast an event about this node changed its state to all relays. */
> -static inline void
> -raft_broadcast_new_state(void)
> -{
> - struct raft_request req;
> - memset(&req, 0, sizeof(req));
> - req.term = raft.term;
> - req.state = raft.state;
> - raft_broadcast(&req);
> -}
> +/** Schedule broadcast of the complete Raft state to all the followers. */
> +static void
> +raft_schedule_broadcast(void);
>
> /** Raft state machine methods. 'sm' stands for State Machine. */
>
> @@ -324,80 +318,79 @@ raft_process_recovery(const struct raft_request *req)
> assert(!raft_is_enabled());
> }
>
> -void
> +int
> raft_process_msg(const struct raft_request *req, uint32_t source)
> {
> say_info("RAFT: message %s from %u", raft_request_to_string(req),
> source);
> assert(source > 0);
> assert(source != instance_id);
> + if (req->term == 0 || req->state == 0) {
> + diag_set(ClientError, ER_PROTOCOL, "Raft term and state can't "
> + "be zero");
> + return -1;
> + }
> + if (req->state == RAFT_STATE_CANDIDATE &&
> + (req->vote != source || req->vclock == NULL)) {
> + diag_set(ClientError, ER_PROTOCOL, "Candidate should always "
> + "vote for self and provide its vclock");
> + return -1;
> + }
> /* Outdated request. */
> if (req->term < raft.volatile_term) {
> say_info("RAFT: the message is ignored due to outdated term - "
> "current term is %u", raft.volatile_term);
> - return;
> + return 0;
> }
>
> - enum raft_state old_state = raft.state;
> -
> /* Term bump. */
> if (req->term > raft.volatile_term)
> raft_sm_schedule_new_term(req->term);
> -
> - /* Vote request during the on-going election. */
> + /*
> + * Either a vote request during an on-going election. Or an old vote
> + * persisted long time ago and still broadcasted. Or a vote response.
> + */
> if (req->vote != 0) {
> switch (raft.state) {
> case RAFT_STATE_FOLLOWER:
> case RAFT_STATE_LEADER:
> - /*
> - * Can't respond on vote requests when Raft is disabled.
> - */
> if (!raft.is_enabled) {
> say_info("RAFT: vote request is skipped - RAFT "
> "is disabled");
> break;
> }
> - /* Check if already voted in this term. */
> - if (raft.volatile_vote != 0) {
> - say_info("RAFT: vote request is skipped - "
> - "already voted in this term");
> + if (raft.leader != 0) {
> + say_info("RAFT: vote request is skipped - the "
> + "leader is already known - %u",
> + raft.leader);
> break;
> }
> - /* Not a candidate. Can't accept votes. */
> if (req->vote == instance_id) {
> + /*
> + * This is entirely valid. This instance could
> + * request a vote, then become a follower or
> + * leader, and then get the response.
> + */
> say_info("RAFT: vote request is skipped - "
> "can't accept vote for self if not a "
> "candidate");
> break;
> }
> - /* Can't vote when vclock is unknown. */
> - if (req->vclock == NULL) {
> - say_info("RAFT: vote request is skipped - "
> - "missing candidate vclock.");
> - break;
> - }
> - /* Can't vote for too old or incomparable nodes. */
> - if (!raft_can_vote_for(req->vclock)) {
> + if (req->state != RAFT_STATE_CANDIDATE) {
> say_info("RAFT: vote request is skipped - "
> - "the vclock is not acceptable = %s",
> - vclock_to_string(req->vclock));
> + "this is a notification about a vote "
> + "for a third node, not a request");
> break;
> }
> - /*
> - * Check if somebody is asking to vote for a third
> - * node - nope. Make votes only when asked directly by
> - * the new candidate. However that restriction may be
> - * relaxed in future, if can be proven to be safe.
> - */
> - if (req->vote != source) {
> + if (raft.volatile_vote != 0) {
> say_info("RAFT: vote request is skipped - "
> - "indirect votes are not allowed");
> + "already voted in this term");
> break;
> }
> - if (raft.leader != 0) {
> + /* Vclock is not NULL, validated above. */
> + if (!raft_can_vote_for(req->vclock)) {
> say_info("RAFT: vote request is skipped - the "
> - "leader is already known - %u",
> - raft.leader);
> + "vclock is not acceptable");
> break;
Are you sure we want these messages on `info` level? Info is the default
log level, and RAFT spams the log with quite a lot of messages. Maybe
`verbose` level would be better?
> }
> /*
> @@ -433,15 +426,17 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
> unreachable();
> }
> }
> - /*
> - * If the node does not claim to be a leader, nothing interesting. Terms
> - * and votes are already handled.
> - */
> - if (req->state != RAFT_STATE_LEADER)
> - goto end;
> + if (req->state != RAFT_STATE_LEADER) {
> + if (source == raft.leader) {
> + say_info("RAFT: the node %u has resigned from the "
> + "leader role", raft.leader);
> + raft_sm_schedule_new_election();
> + }
> + return 0;
> + }
> /* The node is a leader, but it is already known. */
> if (source == raft.leader)
> - goto end;
> + return 0;
> /*
> * XXX: A message from a conflicting leader. Split brain, basically.
> * Need to decide what to do. Current solution is to do nothing. In
> @@ -451,20 +446,12 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
> if (raft.leader != 0) {
> say_warn("RAFT: conflicting leader detected in one term - "
> "known is %u, received %u", raft.leader, source);
> - goto end;
> + return 0;
> }
>
> /* New leader was elected. */
> raft_sm_follow_leader(source);
> -end:
> - if (raft.state != old_state) {
> - /*
> - * New term and vote are not broadcasted yet. Firstly their WAL
> - * write should be finished. But the state is volatile. It is ok
> - * to broadcast it now.
> - */
> - raft_broadcast_new_state();
> - }
> + return 0;
> }
>
> void
> @@ -558,6 +545,7 @@ fail:
> panic("Could not write a raft request to WAL\n");
> }
>
> +/* Dump Raft state to WAL in a blocking way. */
> static void
> raft_worker_handle_io(void)
> {
> @@ -565,9 +553,6 @@ raft_worker_handle_io(void)
> /* During write Raft can't be anything but a follower. */
> assert(raft.state == RAFT_STATE_FOLLOWER);
> struct raft_request req;
> - uint64_t old_term = raft.term;
> - uint32_t old_vote = raft.vote;
> - enum raft_state old_state = raft.state;
>
> if (raft_is_fully_on_disk()) {
> end_dump:
> @@ -604,71 +589,61 @@ end_dump:
> } else {
> memset(&req, 0, sizeof(req));
> assert(raft.volatile_term >= raft.term);
> - /* Term is written always. */
> req.term = raft.volatile_term;
> - if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
> - req.vote = raft.volatile_vote;
> + req.vote = raft.volatile_vote;
>
> raft_write_request(&req);
> - say_verbose("RAFT: persist and apply state %s",
> - raft_request_to_string(&req));
> + say_info("RAFT: persisted state %s",
> + raft_request_to_string(&req));
>
> assert(req.term >= raft.term);
> - if (req.term > raft.term) {
> - raft.term = req.term;
> - raft.vote = 0;
> - }
> - if (req.vote != 0) {
> - assert(raft.vote == 0);
> - raft.vote = req.vote;
> - }
> + raft.term = req.term;
> + raft.vote = req.vote;
> + /*
> + * Persistent state is visible, and it was changed - broadcast.
> + */
> + raft_schedule_broadcast();
> if (raft_is_fully_on_disk())
> goto end_dump;
> }
> +}
>
> +/* Broadcast Raft complete state to the followers. */
> +static void
> +raft_worker_handle_broadcast(void)
> +{
> + assert(raft.is_broadcast_scheduled);
> + struct raft_request req;
> memset(&req, 0, sizeof(req));
> - /* Term is encoded always. */
> req.term = raft.term;
> - bool has_changes = old_term != raft.term;
> - if (raft.vote != 0 && old_vote != raft.vote) {
> - req.vote = raft.vote;
> - /*
> - * When vote for self, need to send current vclock too. Two
> - * reasons for that:
> - *
> - * - nodes need to vote for the instance containing the newest
> - * data. So as not to loose it, because some of it may be
> - * confirmed by the synchronous replication;
> - *
> - * - replication is basically stopped during election. Other
> - * nodes can't learn vclock of this instance through regular
> - * replication.
> - */
> - if (raft.vote == instance_id)
> - req.vclock = &replicaset.vclock;
> - has_changes = true;
> - }
> - if (raft.state != old_state) {
> - req.state = raft.state;
> - has_changes = true;
> + req.vote = raft.vote;
> + req.state = raft.state;
> + if (req.state == RAFT_STATE_CANDIDATE) {
> + assert(raft.vote == instance_id);
> + req.vclock = &replicaset.vclock;
> }
> - if (has_changes)
> - raft_broadcast(&req);
> + replicaset_foreach(replica)
> + relay_push_raft(replica->relay, &req);
> + raft.is_broadcast_scheduled = false;
> }
>
> static int
> raft_worker_f(va_list args)
> {
> (void)args;
> + bool is_idle = true;
> while (!fiber_is_cancelled()) {
> - if (!raft.is_write_in_progress)
> - goto idle;
> - raft_worker_handle_io();
> - if (!raft.is_write_in_progress)
> - goto idle;
> + if (raft.is_write_in_progress) {
> + raft_worker_handle_io();
> + is_idle = false;
> + }
> + if (raft.is_broadcast_scheduled) {
> + raft_worker_handle_broadcast();
> + is_idle = false;
> + }
> fiber_sleep(0);
> - continue;
> - idle:
> + if (!is_idle)
> + continue;
> assert(raft_is_fully_on_disk());
> fiber_yield();
> }
> @@ -702,6 +677,8 @@ raft_sm_become_leader(void)
> ev_timer_stop(loop(), &raft.timer);
> /* Make read-write (if other subsystems allow that. */
> box_update_ro_summary();
> + /* State is visible and it is changed - broadcast. */
> + raft_schedule_broadcast();
> }
>
> static void
> @@ -712,10 +689,12 @@ raft_sm_follow_leader(uint32_t leader)
> assert(raft.leader == 0);
> raft.state = RAFT_STATE_FOLLOWER;
> raft.leader = leader;
> - if (!raft.is_write_in_progress) {
> + if (!raft.is_write_in_progress && raft.is_candidate) {
> ev_timer_stop(loop(), &raft.timer);
> raft_sm_wait_leader_dead();
> }
> + /* State is visible and it is changed - broadcast. */
> + raft_schedule_broadcast();
> }
>
> static void
> @@ -724,6 +703,7 @@ raft_sm_become_candidate(void)
> say_info("RAFT: enter candidate state with 1 self vote");
> assert(raft.state == RAFT_STATE_FOLLOWER);
> assert(raft.leader == 0);
> + assert(raft.vote == instance_id);
> assert(raft.is_candidate);
> assert(!raft.is_write_in_progress);
> assert(raft_election_quorum() > 1);
> @@ -732,6 +712,8 @@ raft_sm_become_candidate(void)
> raft.vote_mask = 0;
> bit_set(&raft.vote_mask, instance_id);
> raft_sm_wait_election_end();
> + /* State is visible and it is changed - broadcast. */
> + raft_schedule_broadcast();
> }
>
> static void
> @@ -747,6 +729,12 @@ raft_sm_schedule_new_term(uint64_t new_term)
> raft.state = RAFT_STATE_FOLLOWER;
> box_update_ro_summary();
> raft_sm_pause_and_dump();
> + /*
> + * State is visible and it is changed - broadcast. Term is also visible,
> + * but only persistent term. Volatile term is not broadcasted until
> + * saved to disk.
> + */
> + raft_schedule_broadcast();
> }
>
> static void
> @@ -758,6 +746,7 @@ raft_sm_schedule_new_vote(uint32_t new_vote)
> assert(raft.state == RAFT_STATE_FOLLOWER);
> raft.volatile_vote = new_vote;
> raft_sm_pause_and_dump();
> + /* Nothing visible is changed - no broadcast. */
> }
>
> static void
> @@ -828,13 +817,6 @@ raft_sm_start(void)
> else
> raft_sm_schedule_new_election();
> box_update_ro_summary();
> - /*
> - * When Raft is enabled, send the complete state. Because
> - * it wasn't sent in disabled state.
> - */
> - struct raft_request req;
> - raft_serialize_for_network(&req, NULL);
> - raft_broadcast(&req);
> }
>
> static void
> @@ -848,6 +830,8 @@ raft_sm_stop(void)
> raft.leader = 0;
> raft.state = RAFT_STATE_FOLLOWER;
> box_update_ro_summary();
> + /* State is visible and changed - broadcast. */
> + raft_schedule_broadcast();
> }
>
> void
> @@ -912,7 +896,8 @@ raft_cfg_is_candidate(bool is_candidate)
> if (raft.state == RAFT_STATE_LEADER)
> raft.leader = 0;
> raft.state = RAFT_STATE_FOLLOWER;
> - raft_broadcast_new_state();
> + /* State is visible and changed - broadcast. */
> + raft_schedule_broadcast();
> }
> box_update_ro_summary();
> }
> @@ -943,7 +928,6 @@ raft_cfg_election_quorum(void)
> if (raft.vote_count < raft_election_quorum())
> return;
> raft_sm_become_leader();
> - raft_broadcast_new_state();
> }
>
> void
> @@ -961,11 +945,20 @@ raft_cfg_death_timeout(void)
> }
> }
>
> -void
> -raft_broadcast(const struct raft_request *req)
> +static void
> +raft_schedule_broadcast(void)
> {
> - replicaset_foreach(replica)
> - relay_push_raft(replica->relay, req);
> + raft.is_broadcast_scheduled = true;
> + /*
> + * Don't wake the fiber if it writes something. Otherwise it would be a
> + * spurious wakeup breaking the WAL write not adapted to this.
> + */
> + if (raft.is_write_in_progress)
> + return;
> + if (raft.worker == NULL)
> + raft.worker = fiber_new("raft_worker", raft_worker_f);
> + if (raft.worker != fiber())
> + fiber_wakeup(raft.worker);
> }
>
> void
> diff --git a/src/box/raft.h b/src/box/raft.h
> index 23aedfe10..8c1cf467f 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -124,6 +124,13 @@ struct raft {
> * happens asynchronously, not right after Raft state is updated.
> */
> bool is_write_in_progress;
> + /**
> + * Flag whether Raft wants to broadcast its state. It is done
> + * asynchronously in the worker fiber. That allows to collect multiple
> + * updates into one batch if they happen in one event loop iteration.
> + * Usually even in one function.
> + */
> + bool is_broadcast_scheduled;
> /**
> * Persisted Raft state. These values are used when need to tell current
> * Raft state to other nodes.
> @@ -181,7 +188,7 @@ raft_process_recovery(const struct raft_request *req);
> * @param req Raft request.
> * @param source Instance ID of the message sender.
> */
> -void
> +int
> raft_process_msg(const struct raft_request *req, uint32_t source);
>
> /**
> @@ -236,13 +243,6 @@ raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
> void
> raft_serialize_for_disk(struct raft_request *req);
>
> -/**
> - * Broadcast the changes in this instance's raft status to all
> - * the followers.
> - */
> -void
> -raft_broadcast(const struct raft_request *req);
> -
> /** Initialize Raft global data structures. */
> void
> raft_init(void);
--
Serge Petrenko
More information about the Tarantool-patches
mailing list