From: Serge Petrenko
Date: Wed, 23 Sep 2020 12:59:07 +0300
Subject: Re: [Tarantool-patches] [PATCH v2 09/11] raft: introduce state machine
To: Vladislav Shpilevoy, tarantool-patches@dev.tarantool.org, gorcunov@gmail.com

On 23.09.2020 01:48, Vladislav Shpilevoy wrote:
> Consider a new pack of fixes on top of the branch.
>
> ====================
> [tosquash] raft: lots of amendments
>
> * Raft request can't be sent during final join, so its skipping is
>   removed from there;
>
> * Raft state broadcast became asynchronous (done in the worker
>   fiber). The motivation here is to be able to collect multiple
>   updates in one event loop iteration and send them in one batch.
>   Not caring if broadcast is expensive to call multiple times in
>   one function or not.
>
> * Raft messages now contain complete state of the sender: its
>   role, term, vote (if not 0), vclock (if candidate). That allows
>   to simplify a lot of code sacrificing saving a couple of bytes
>   in the network.
>
> * raft_process_msg() does protocol validation.
>
> * Log messages about vote request being skipped are reworded and
>   reordered. So as to make them less scary when their skip is
>   totally fine.
>   Also the comments about skips are removed, since
>   they just duplicated the log messages.
>
> * Fixed a bug when leader could disable Raft, but it still was
>   considered a leader by the other nodes. Now they see it and
>   start a new election.
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 10186ab91..7686d6cbc 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -301,8 +301,6 @@ apply_final_join_row(struct xrow_header *row)
>  	 */
>  	if (iproto_type_is_synchro_request(row->type))
>  		return 0;
> -	if (iproto_type_is_raft_request(row->type))
> -		return 0;
>  	struct txn *txn = txn_begin();
>  	if (txn == NULL)
>  		return -1;
> @@ -895,8 +893,7 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>  	struct vclock candidate_clock;
>  	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
>  		return -1;
> -	raft_process_msg(&req, applier->instance_id);
> -	return 0;
> +	return raft_process_msg(&req, applier->instance_id);
>  }
>
>  /**
> diff --git a/src/box/raft.c b/src/box/raft.c
> index f712887a1..ce90ed533 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -61,6 +61,7 @@ struct raft raft = {
>  	.is_candidate = false,
>  	.is_cfg_candidate = false,
>  	.is_write_in_progress = false,
> +	.is_broadcast_scheduled = false,
>  	.term = 1,
>  	.vote = 0,
>  	.vote_mask = 0,
> @@ -177,16 +178,9 @@ raft_election_quorum(void)
>  	return MIN(replication_synchro_quorum, replicaset.registered_count);
>  }
>
> -/** Broadcast an event about this node changed its state to all relays. */
> -static inline void
> -raft_broadcast_new_state(void)
> -{
> -	struct raft_request req;
> -	memset(&req, 0, sizeof(req));
> -	req.term = raft.term;
> -	req.state = raft.state;
> -	raft_broadcast(&req);
> -}
> +/** Schedule broadcast of the complete Raft state to all the followers. */
> +static void
> +raft_schedule_broadcast(void);
>
>  /** Raft state machine methods. 'sm' stands for State Machine. */
>
> @@ -324,80 +318,79 @@ raft_process_recovery(const struct raft_request *req)
>  	assert(!raft_is_enabled());
>  }
>
> -void
> +int
>  raft_process_msg(const struct raft_request *req, uint32_t source)
>  {
>  	say_info("RAFT: message %s from %u", raft_request_to_string(req),
>  		 source);
>  	assert(source > 0);
>  	assert(source != instance_id);
> +	if (req->term == 0 || req->state == 0) {
> +		diag_set(ClientError, ER_PROTOCOL, "Raft term and state can't "
> +			 "be zero");
> +		return -1;
> +	}
> +	if (req->state == RAFT_STATE_CANDIDATE &&
> +	    (req->vote != source || req->vclock == NULL)) {
> +		diag_set(ClientError, ER_PROTOCOL, "Candidate should always "
> +			 "vote for self and provide its vclock");
> +		return -1;
> +	}
>  	/* Outdated request. */
>  	if (req->term < raft.volatile_term) {
>  		say_info("RAFT: the message is ignored due to outdated term - "
>  			 "current term is %u", raft.volatile_term);
> -		return;
> +		return 0;
>  	}
>
> -	enum raft_state old_state = raft.state;
> -
>  	/* Term bump. */
>  	if (req->term > raft.volatile_term)
>  		raft_sm_schedule_new_term(req->term);
> -
> -	/* Vote request during the on-going election. */
> +	/*
> +	 * Either a vote request during an on-going election. Or an old vote
> +	 * persisted long time ago and still broadcasted. Or a vote response.
> +	 */
>  	if (req->vote != 0) {
>  		switch (raft.state) {
>  		case RAFT_STATE_FOLLOWER:
>  		case RAFT_STATE_LEADER:
> -			/*
> -			 * Can't respond on vote requests when Raft is disabled.
> -			 */
>  			if (!raft.is_enabled) {
>  				say_info("RAFT: vote request is skipped - RAFT "
>  					 "is disabled");
>  				break;
>  			}
> -			/* Check if already voted in this term. */
> -			if (raft.volatile_vote != 0) {
> -				say_info("RAFT: vote request is skipped - "
> -					 "already voted in this term");
> +			if (raft.leader != 0) {
> +				say_info("RAFT: vote request is skipped - the "
> +					 "leader is already known - %u",
> +					 raft.leader);
>  				break;
>  			}
> -			/* Not a candidate. Can't accept votes. */
>  			if (req->vote == instance_id) {
> +				/*
> +				 * This is entirely valid. This instance could
> +				 * request a vote, then become a follower or
> +				 * leader, and then get the response.
> +				 */
>  				say_info("RAFT: vote request is skipped - "
>  					 "can't accept vote for self if not a "
>  					 "candidate");
>  				break;
>  			}
> -			/* Can't vote when vclock is unknown. */
> -			if (req->vclock == NULL) {
> -				say_info("RAFT: vote request is skipped - "
> -					 "missing candidate vclock.");
> -				break;
> -			}
> -			/* Can't vote for too old or incomparable nodes. */
> -			if (!raft_can_vote_for(req->vclock)) {
> +			if (req->state != RAFT_STATE_CANDIDATE) {
>  				say_info("RAFT: vote request is skipped - "
> -					 "the vclock is not acceptable = %s",
> -					 vclock_to_string(req->vclock));
> +					 "this is a notification about a vote "
> +					 "for a third node, not a request");
>  				break;
>  			}
> -			/*
> -			 * Check if somebody is asking to vote for a third
> -			 * node - nope. Make votes only when asked directly by
> -			 * the new candidate. However that restriction may be
> -			 * relaxed in future, if can be proven to be safe.
> -			 */
> -			if (req->vote != source) {
> +			if (raft.volatile_vote != 0) {
>  				say_info("RAFT: vote request is skipped - "
> -					 "indirect votes are not allowed");
> +					 "already voted in this term");
>  				break;
>  			}
> -			if (raft.leader != 0) {
> +			/* Vclock is not NULL, validated above. */
> +			if (!raft_can_vote_for(req->vclock)) {
>  				say_info("RAFT: vote request is skipped - the "
> -					 "leader is already known - %u",
> -					 raft.leader);
> +					 "vclock is not acceptable");
>  				break;

Are you sure we want these messages at the `info` level? Info is the
default log level, and RAFT spams the log with quite a lot of messages.
Maybe the `verbose` level would be better?

>  			}
>  			/*
> @@ -433,15 +426,17 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
>  			unreachable();
>  		}
>  	}
> -	/*
> -	 * If the node does not claim to be a leader, nothing interesting. Terms
> -	 * and votes are already handled.
> -	 */
> -	if (req->state != RAFT_STATE_LEADER)
> -		goto end;
> +	if (req->state != RAFT_STATE_LEADER) {
> +		if (source == raft.leader) {
> +			say_info("RAFT: the node %u has resigned from the "
> +				 "leader role", raft.leader);
> +			raft_sm_schedule_new_election();
> +		}
> +		return 0;
> +	}
>  	/* The node is a leader, but it is already known. */
>  	if (source == raft.leader)
> -		goto end;
> +		return 0;
>  	/*
>  	 * XXX: A message from a conflicting leader. Split brain, basically.
>  	 * Need to decide what to do. Current solution is to do nothing. In
> @@ -451,20 +446,12 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
>  	if (raft.leader != 0) {
>  		say_warn("RAFT: conflicting leader detected in one term - "
>  			 "known is %u, received %u", raft.leader, source);
> -		goto end;
> +		return 0;
>  	}
>
>  	/* New leader was elected. */
>  	raft_sm_follow_leader(source);
> -end:
> -	if (raft.state != old_state) {
> -		/*
> -		 * New term and vote are not broadcasted yet. Firstly their WAL
> -		 * write should be finished. But the state is volatile. It is ok
> -		 * to broadcast it now.
> -		 */
> -		raft_broadcast_new_state();
> -	}
> +	return 0;
>  }
>
>  void
> @@ -558,6 +545,7 @@ fail:
>  	panic("Could not write a raft request to WAL\n");
>  }
>
> +/* Dump Raft state to WAL in a blocking way. */
>  static void
>  raft_worker_handle_io(void)
>  {
> @@ -565,9 +553,6 @@ raft_worker_handle_io(void)
>  	/* During write Raft can't be anything but a follower. */
>  	assert(raft.state == RAFT_STATE_FOLLOWER);
>  	struct raft_request req;
> -	uint64_t old_term = raft.term;
> -	uint32_t old_vote = raft.vote;
> -	enum raft_state old_state = raft.state;
>
>  	if (raft_is_fully_on_disk()) {
>  end_dump:
> @@ -604,71 +589,61 @@ end_dump:
>  	} else {
>  		memset(&req, 0, sizeof(req));
>  		assert(raft.volatile_term >= raft.term);
> -		/* Term is written always. */
>  		req.term = raft.volatile_term;
> -		if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
> -			req.vote = raft.volatile_vote;
> +		req.vote = raft.volatile_vote;
>
>  		raft_write_request(&req);
> -		say_verbose("RAFT: persist and apply state %s",
> -			    raft_request_to_string(&req));
> +		say_info("RAFT: persisted state %s",
> +			 raft_request_to_string(&req));
>
>  		assert(req.term >= raft.term);
> -		if (req.term > raft.term) {
> -			raft.term = req.term;
> -			raft.vote = 0;
> -		}
> -		if (req.vote != 0) {
> -			assert(raft.vote == 0);
> -			raft.vote = req.vote;
> -		}
> +		raft.term = req.term;
> +		raft.vote = req.vote;
> +		/*
> +		 * Persistent state is visible, and it was changed - broadcast.
> +		 */
> +		raft_schedule_broadcast();
>  		if (raft_is_fully_on_disk())
>  			goto end_dump;
>  	}
> +}
>
> +/* Broadcast Raft complete state to the followers. */
> +static void
> +raft_worker_handle_broadcast(void)
> +{
> +	assert(raft.is_broadcast_scheduled);
> +	struct raft_request req;
>  	memset(&req, 0, sizeof(req));
> -	/* Term is encoded always. */
>  	req.term = raft.term;
> -	bool has_changes = old_term != raft.term;
> -	if (raft.vote != 0 && old_vote != raft.vote) {
> -		req.vote = raft.vote;
> -		/*
> -		 * When vote for self, need to send current vclock too. Two
> -		 * reasons for that:
> -		 *
> -		 * - nodes need to vote for the instance containing the newest
> -		 *   data. So as not to loose it, because some of it may be
> -		 *   confirmed by the synchronous replication;
> -		 *
> -		 * - replication is basically stopped during election. Other
> -		 *   nodes can't learn vclock of this instance through regular
> -		 *   replication.
> -		 */
> -		if (raft.vote == instance_id)
> -			req.vclock = &replicaset.vclock;
> -		has_changes = true;
> -	}
> -	if (raft.state != old_state) {
> -		req.state = raft.state;
> -		has_changes = true;
> +	req.vote = raft.vote;
> +	req.state = raft.state;
> +	if (req.state == RAFT_STATE_CANDIDATE) {
> +		assert(raft.vote == instance_id);
> +		req.vclock = &replicaset.vclock;
>  	}
> -	if (has_changes)
> -		raft_broadcast(&req);
> +	replicaset_foreach(replica)
> +		relay_push_raft(replica->relay, &req);
> +	raft.is_broadcast_scheduled = false;
>  }
>
>  static int
>  raft_worker_f(va_list args)
>  {
>  	(void)args;
> +	bool is_idle = true;
>  	while (!fiber_is_cancelled()) {
> -		if (!raft.is_write_in_progress)
> -			goto idle;
> -		raft_worker_handle_io();
> -		if (!raft.is_write_in_progress)
> -			goto idle;
> +		if (raft.is_write_in_progress) {
> +			raft_worker_handle_io();
> +			is_idle = false;
> +		}
> +		if (raft.is_broadcast_scheduled) {
> +			raft_worker_handle_broadcast();
> +			is_idle = false;
> +		}
>  		fiber_sleep(0);
> -		continue;
> -	idle:
> +		if (!is_idle)
> +			continue;
>  		assert(raft_is_fully_on_disk());
>  		fiber_yield();
>  	}
> @@ -702,6 +677,8 @@ raft_sm_become_leader(void)
>  	ev_timer_stop(loop(), &raft.timer);
>  	/* Make read-write (if other subsystems allow that. */
>  	box_update_ro_summary();
> +	/* State is visible and it is changed - broadcast. */
> +	raft_schedule_broadcast();
>  }
>
>  static void
> @@ -712,10 +689,12 @@ raft_sm_follow_leader(uint32_t leader)
>  	assert(raft.leader == 0);
>  	raft.state = RAFT_STATE_FOLLOWER;
>  	raft.leader = leader;
> -	if (!raft.is_write_in_progress) {
> +	if (!raft.is_write_in_progress && raft.is_candidate) {
>  		ev_timer_stop(loop(), &raft.timer);
>  		raft_sm_wait_leader_dead();
>  	}
> +	/* State is visible and it is changed - broadcast. */
> +	raft_schedule_broadcast();
>  }
>
>  static void
> @@ -724,6 +703,7 @@ raft_sm_become_candidate(void)
>  	say_info("RAFT: enter candidate state with 1 self vote");
>  	assert(raft.state == RAFT_STATE_FOLLOWER);
>  	assert(raft.leader == 0);
> +	assert(raft.vote == instance_id);
>  	assert(raft.is_candidate);
>  	assert(!raft.is_write_in_progress);
>  	assert(raft_election_quorum() > 1);
> @@ -732,6 +712,8 @@ raft_sm_become_candidate(void)
>  	raft.vote_mask = 0;
>  	bit_set(&raft.vote_mask, instance_id);
>  	raft_sm_wait_election_end();
> +	/* State is visible and it is changed - broadcast. */
> +	raft_schedule_broadcast();
>  }
>
>  static void
> @@ -747,6 +729,12 @@ raft_sm_schedule_new_term(uint64_t new_term)
>  	raft.state = RAFT_STATE_FOLLOWER;
>  	box_update_ro_summary();
>  	raft_sm_pause_and_dump();
> +	/*
> +	 * State is visible and it is changed - broadcast. Term is also visible,
> +	 * but only persistent term. Volatile term is not broadcasted until
> +	 * saved to disk.
> +	 */
> +	raft_schedule_broadcast();
>  }
>
>  static void
> @@ -758,6 +746,7 @@ raft_sm_schedule_new_vote(uint32_t new_vote)
>  	assert(raft.state == RAFT_STATE_FOLLOWER);
>  	raft.volatile_vote = new_vote;
>  	raft_sm_pause_and_dump();
> +	/* Nothing visible is changed - no broadcast. */
>  }
>
>  static void
> @@ -828,13 +817,6 @@ raft_sm_start(void)
>  	else
>  		raft_sm_schedule_new_election();
>  	box_update_ro_summary();
> -	/*
> -	 * When Raft is enabled, send the complete state. Because
> -	 * it wasn't sent in disabled state.
> -	 */
> -	struct raft_request req;
> -	raft_serialize_for_network(&req, NULL);
> -	raft_broadcast(&req);
>  }
>
>  static void
> @@ -848,6 +830,8 @@ raft_sm_stop(void)
>  	raft.leader = 0;
>  	raft.state = RAFT_STATE_FOLLOWER;
>  	box_update_ro_summary();
> +	/* State is visible and changed - broadcast. */
> +	raft_schedule_broadcast();
>  }
>
>  void
> @@ -912,7 +896,8 @@ raft_cfg_is_candidate(bool is_candidate)
>  		if (raft.state == RAFT_STATE_LEADER)
>  			raft.leader = 0;
>  		raft.state = RAFT_STATE_FOLLOWER;
> -		raft_broadcast_new_state();
> +		/* State is visible and changed - broadcast. */
> +		raft_schedule_broadcast();
>  	}
>  	box_update_ro_summary();
>  }
> @@ -943,7 +928,6 @@ raft_cfg_election_quorum(void)
>  	if (raft.vote_count < raft_election_quorum())
>  		return;
>  	raft_sm_become_leader();
> -	raft_broadcast_new_state();
>  }
>
>  void
> @@ -961,11 +945,20 @@ raft_cfg_death_timeout(void)
>  	}
>  }
>
> -void
> -raft_broadcast(const struct raft_request *req)
> +static void
> +raft_schedule_broadcast(void)
>  {
> -	replicaset_foreach(replica)
> -		relay_push_raft(replica->relay, req);
> +	raft.is_broadcast_scheduled = true;
> +	/*
> +	 * Don't wake the fiber if it writes something. Otherwise it would be a
> +	 * spurious wakeup breaking the WAL write not adapted to this.
> +	 */
> +	if (raft.is_write_in_progress)
> +		return;
> +	if (raft.worker == NULL)
> +		raft.worker = fiber_new("raft_worker", raft_worker_f);
> +	if (raft.worker != fiber())
> +		fiber_wakeup(raft.worker);
>  }
>
>  void
> diff --git a/src/box/raft.h b/src/box/raft.h
> index 23aedfe10..8c1cf467f 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -124,6 +124,13 @@ struct raft {
>  	 * happens asynchronously, not right after Raft state is updated.
>  	 */
>  	bool is_write_in_progress;
> +	/**
> +	 * Flag whether Raft wants to broadcast its state. It is done
> +	 * asynchronously in the worker fiber. That allows to collect multiple
> +	 * updates into one batch if they happen in one event loop iteration.
> +	 * Usually even in one function.
> +	 */
> +	bool is_broadcast_scheduled;
>  	/**
>  	 * Persisted Raft state. These values are used when need to tell current
>  	 * Raft state to other nodes.
> @@ -181,7 +188,7 @@ raft_process_recovery(const struct raft_request *req);
>   * @param req Raft request.
>   * @param source Instance ID of the message sender.
>   */
> -void
> +int
>  raft_process_msg(const struct raft_request *req, uint32_t source);
>
>  /**
> @@ -236,13 +243,6 @@ raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
>  void
>  raft_serialize_for_disk(struct raft_request *req);
>
> -/**
> - * Broadcast the changes in this instance's raft status to all
> - * the followers.
> - */
> -void
> -raft_broadcast(const struct raft_request *req);
> -
>  /** Initialize Raft global data structures. */
>  void
>  raft_init(void);

--
Serge Petrenko
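
The batching idea behind raft_schedule_broadcast() in the quoted patch can be sketched in isolation. This is only a minimal illustration, not the patch's code: `struct raft_sketch`, `sketch_schedule_broadcast()` and `sketch_worker_step()` are hypothetical names, fibers and relays are left out, and the `sent_count` counter stands in for pushing the request to every relay.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for struct raft; only what the sketch needs. */
struct raft_sketch {
	uint64_t term;
	uint32_t vote;
	bool is_broadcast_scheduled;
	/* Assumption: counts batches instead of pushing to real relays. */
	int sent_count;
};

/* Mark the state as changed; the actual send is left to the worker. */
static void
sketch_schedule_broadcast(struct raft_sketch *r)
{
	r->is_broadcast_scheduled = true;
}

/*
 * One worker iteration: send the complete current state once if any
 * broadcast was scheduled since the previous iteration. Returns true
 * when something was sent.
 */
static bool
sketch_worker_step(struct raft_sketch *r)
{
	if (!r->is_broadcast_scheduled)
		return false;
	r->sent_count++;
	r->is_broadcast_scheduled = false;
	return true;
}
```

Under these assumptions, several state changes made before the worker runs collapse into a single send of the latest state, which is the motivation given in the commit message.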