From mboxrd@z Thu Jan  1 00:00:00 1970
From: Serge Petrenko
Message-ID: <9c8db026-60e1-5079-e884-7770cdfae0d7@tarantool.org>
Date: Mon, 21 Sep 2020 11:22:15 +0300
MIME-Version: 1.0
In-Reply-To:
References: <3c916b9b7e70c4fbfa2b95c4aeb4146e83c82c58.1599693319.git.v.shpilevoy@tarantool.org>
Content-Type: text/plain; charset="utf-8"; format="flowed"
Content-Transfer-Encoding: 8bit
Content-Language: en-GB
Subject: Re: [Tarantool-patches] [PATCH v2 09/11] raft: introduce state machine
List-Id: Tarantool development patches
To: Vladislav Shpilevoy, tarantool-patches@dev.tarantool.org, gorcunov@gmail.com

19.09.2020 18:49, Vladislav Shpilevoy wrote:
> Here is a new version of the patch after some squashes.
>
> ====================
> raft: introduce state machine
>
> The commit is a core part of the Raft implementation. It introduces
> the Raft state machine implementation and its integration into the
> instance's life cycle.
>
> The implementation follows the protocol to the letter except for a
> few important details.
>
> Firstly, the original Raft assumes that all nodes share the same
> log record numbers. In Tarantool they are called LSNs. But in the
> case of Tarantool each node has its own LSN in its own component of
> vclock. That makes the election messages a bit heavier, because the
> nodes need to send and compare complete vclocks of each other
> instead of a single number like in the original Raft. But the logic
> becomes simpler, because the original Raft has a problem of
> uncertainty about what to do with records of an old leader right
> after a new leader is elected. They could be rolled back or
> confirmed depending on circumstances. The issue disappears when
> vclock is used.
>
> Secondly, leader election works differently during cluster
> bootstrap, until the number of bootstrapped replicas becomes >= the
> election quorum. That arises from the specifics of replica
> bootstrap and the order of subsystem initialization. In short:
> during bootstrap a leader election may use a smaller election
> quorum than the configured one. See more details in the code.
>
> Part of #1146

Consider these fixes for the applier vclock segfault, pushed on top of
the branch.

 src/box/box.cc |  6 +-----
 src/box/raft.c | 12 +++++++++---
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index c5dcbd959..1169f2cd5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2160,11 +2160,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
          * should be 0.
          */
         struct raft_request req;
-        /*
-         * Omit the candidate vclock, since we've just sent it in
-         * subscribe response.
-         */
-        raft_serialize_for_network(&req, NULL);
+        raft_serialize_for_network(&req, &vclock);
         xrow_encode_raft(&row, &fiber()->gc, &req);
         coio_write_xrow(io, &row);
     }
diff --git a/src/box/raft.c b/src/box/raft.c
index 7b7ef9c1c..45783f0e3 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -130,8 +130,6 @@ raft_new_random_election_shift(void)
 static inline bool
 raft_can_vote_for(const struct vclock *v)
 {
-    if (v == NULL)
-        return false;
     int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
     return cmp == 0 || cmp == 1;
 }
@@ -369,6 +367,12 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
                      "candidate");
                 break;
             }
+            /* Can't vote when vclock is unknown. */
+            if (req->vclock == NULL) {
+                say_info("RAFT: vote request is skipped - "
+                     "missing candidate vclock.");
+                break;
+            }
             /* Can't vote for too old or incomparable nodes. */
             if (!raft_can_vote_for(req->vclock)) {
                 say_info("RAFT: vote request is skipped - "
@@ -858,8 +862,10 @@ raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
     req->state = raft.state;
     /*
      * Raft does not own vclock, so it always expects it passed externally.
+     * Vclock is sent out only by candidate instances.
      */
-    req->vclock = vclock;
+    if (req->state == RAFT_STATE_CANDIDATE)
+        req->vclock = vclock;
 }
 
 void
-- 
2.24.3 (Apple Git-128)
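To make the rule the last hunk enforces concrete, here is a minimal
self-contained sketch of it; the struct and enum are toy stand-ins for
the real Tarantool types, not the actual API:

#include <assert.h>
#include <stddef.h>

/* Toy stand-ins for the real types; only the fields used here. */
enum toy_state { TOY_FOLLOWER = 1, TOY_CANDIDATE = 2, TOY_LEADER = 3 };

struct toy_request {
    enum toy_state state;
    const int *vclock; /* Models 'struct vclock *'. */
};

/* The rule from the hunk: attach the vclock only when a candidate. */
static void
toy_serialize(struct toy_request *req, enum toy_state state,
              const int *vclock)
{
    req->state = state;
    req->vclock = NULL;
    if (req->state == TOY_CANDIDATE)
        req->vclock = vclock;
}

int
main(void)
{
    int vclock = 42;
    struct toy_request req;
    /* Followers and leaders never attach a vclock ... */
    toy_serialize(&req, TOY_FOLLOWER, &vclock);
    assert(req.vclock == NULL);
    toy_serialize(&req, TOY_LEADER, &vclock);
    assert(req.vclock == NULL);
    /* ... only candidates do, so receivers must handle NULL. */
    toy_serialize(&req, TOY_CANDIDATE, &vclock);
    assert(req.vclock == &vclock);
    return 0;
}

That is why the NULL check moves out of raft_can_vote_for() and into
the vote-request branch: a missing vclock is now an expected case.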
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index fd7cf1c79..c352faf5e 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -900,8 +900,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>   * Return 0 for success or -1 in case of an error.
>   */
>  static int
> -applier_apply_tx(struct stailq *rows)
> +applier_apply_tx(struct applier *applier, struct stailq *rows)
>  {
> +    /*
> +     * Rows received not directly from a leader are ignored. That is a
> +     * protection against the case when an old leader keeps sending data
> +     * around not knowing yet that it is not a leader anymore.
> +     *
> +     * XXX: it may be fine to apply leader transactions by looking at
> +     * their replica_id field, if it is equal to the leader id. That can
> +     * be investigated as an 'optimization'. Even though it may not give
> +     * anything, because it won't change the total number of rows sent
> +     * over the network anyway.
> +     */
> +    if (!raft_is_source_allowed(applier->instance_id))
> +        return 0;
>      struct xrow_header *first_row = &stailq_first_entry(rows,
>              struct applier_tx_row, next)->row;
>      struct xrow_header *last_row;
> @@ -1241,6 +1254,7 @@ applier_subscribe(struct applier *applier)
>          struct xrow_header *first_row =
>              &stailq_first_entry(&rows, struct applier_tx_row,
>                          next)->row;
> +        raft_process_heartbeat(applier->instance_id);
>          if (first_row->lsn == 0) {
>              if (unlikely(iproto_type_is_raft_request(
>                          first_row->type))) {
> @@ -1249,7 +1263,7 @@
>                  diag_raise();
>              }
>              applier_signal_ack(applier);
> -        } else if (applier_apply_tx(&rows) != 0) {
> +        } else if (applier_apply_tx(applier, &rows) != 0) {
>              diag_raise();
>          }
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 77ab21dbb..c5dcbd959 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -157,7 +157,7 @@ void
>  box_update_ro_summary(void)
>  {
>      bool old_is_ro_summary = is_ro_summary;
> -    is_ro_summary = is_ro || is_orphan;
> +    is_ro_summary = is_ro || is_orphan || raft_is_ro();
>      /* In 99% nothing changes. Filter this out first. */
>      if (is_ro_summary == old_is_ro_summary)
>          return;
> @@ -171,6 +171,10 @@ static int
>  box_check_writable(void)
>  {
>      if (is_ro_summary) {
> +        /*
> +         * XXX: return a special error when the node is not a leader to
> +         * reroute to the leader node.
> +         */
>          diag_set(ClientError, ER_READONLY);
>          diag_log();
>          return -1;
> @@ -2652,6 +2656,7 @@ box_init(void)
>
>      txn_limbo_init();
>      sequence_init();
> +    raft_init();
>  }
>
>  bool
> diff --git a/src/box/raft.c b/src/box/raft.c
> index 4d3d07c48..07a49351f 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -36,6 +36,13 @@
>  #include "small/region.h"
>  #include "replication.h"
>  #include "relay.h"
> +#include "box.h"
> +#include "tt_static.h"
> +
> +/**
> + * Maximal random deviation of the election timeout from the configured
> + * value.
> + */
> +#define RAFT_RANDOM_ELECTION_FACTOR 0.1
>
>  const char *raft_state_strs[] = {
>      NULL,
> @@ -48,19 +55,261 @@ const char *raft_state_strs[] = {
>  struct raft raft = {
>      .leader = 0,
>      .state = RAFT_STATE_FOLLOWER,
> +    .volatile_term = 1,
> +    .volatile_vote = 0,
>      .is_enabled = false,
>      .is_candidate = false,
> +    .is_cfg_candidate = false,
> +    .is_write_in_progress = false,
>      .term = 1,
>      .vote = 0,
> +    .vote_mask = 0,
> +    .vote_count = 0,
> +    .worker = NULL,
> +    .election_timeout = 5,
>  };
>
> +/**
> + * Check if Raft is completely synced with disk, meaning all its critical
> + * values are in WAL. Only in that state can the node become a leader or a
> + * candidate. If the node has unflushed data, it means either the term was
> + * bumped, or a new vote was made.
> + *
> + * In case of a term bump it means either there is another node with a newer
> + * term, and this one should be a follower; or this node bumped the term
> + * itself along with making a vote to start a new election - then it is also
> + * a follower which will turn into a candidate when the flush is done.
> + *
> + * In case of a new unflushed vote it means either this node voted for some
> + * other node, and must be a follower; or it voted for self, and also must be
> + * a follower, but will become a candidate when the flush is done.
> + *
> + * In total, when something is not synced with disk, the instance is a
> + * follower in any case.
> + */
> +static bool
> +raft_is_fully_on_disk(void)
> +{
> +    return raft.volatile_term == raft.term &&
> +           raft.volatile_vote == raft.vote;
> +}
> +
> +/**
> + * The Raft protocol says that the election timeout should be a bit
> + * randomized, so that the nodes don't start an election at the same time and
> + * end up without a quorum for anybody. This implementation randomizes the
> + * election timeout by adding an {election timeout * random factor} value,
> + * where the max value of the factor is a constant floating point value > 0.
> + */
> +static inline double
> +raft_new_random_election_shift(void)
> +{
> +    double timeout = raft.election_timeout;
> +    /* Translate to ms. Integer is needed to be able to use mod below. */
> +    uint32_t rand_part =
> +        (uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
> +    if (rand_part == 0)
> +        rand_part = 1;
> +    /*
> +     * XXX: this is not giving a good distribution, but it is not so trivial
> +     * to implement a correct random value generator. There is a task to
> +     * unify all such places. Not critical here.
> +     */
> +    rand_part = rand() % (rand_part + 1);
> +    return rand_part / 1000.0;
> +}
> +
> +/**
> + * Raft says that during an election node1 can vote for node2 if node2 has a
> + * bigger term, or has the same term but a longer log. In the case of
> + * Tarantool it means that node2's vclock should be >= node1's vclock in all
> + * components. It is not enough to compare only one component, at least
> + * because there may be no previous leader when the election happens for the
> + * first time, or a node could restart and forget who the previous leader
> + * was.
> + */
> +static inline bool
> +raft_can_vote_for(const struct vclock *v)
> +{
> +    if (v == NULL)
> +        return false;
> +    int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
> +    return cmp == 0 || cmp == 1;
> +}
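The comment above compresses a subtle point: a vote requires the
candidate's vclock to be >= ours in every component. A toy model of
that check, with plain arrays instead of the real struct vclock (which
this sketch does not reproduce):

#include <assert.h>
#include <stdbool.h>

#define TOY_NODES 3

/*
 * Toy component-wise comparison: 0 - equal, 1 - a > b, -1 - a < b,
 * 2 - incomparable (each side is ahead in some component).
 */
static int
toy_vclock_compare(const long a[TOY_NODES], const long b[TOY_NODES])
{
    bool gt = false, lt = false;
    for (int i = 0; i < TOY_NODES; i++) {
        gt |= a[i] > b[i];
        lt |= a[i] < b[i];
    }
    if (gt && lt)
        return 2;
    return gt ? 1 : (lt ? -1 : 0);
}

/* The voting rule: the candidate must not be behind in any component. */
static bool
toy_can_vote_for(const long cand[TOY_NODES], const long own[TOY_NODES])
{
    int cmp = toy_vclock_compare(cand, own);
    return cmp == 0 || cmp == 1;
}

int
main(void)
{
    long own[TOY_NODES] =   {5, 7, 0};
    long ahead[TOY_NODES] = {6, 7, 0}; /* >= everywhere - acceptable. */
    long mixed[TOY_NODES] = {9, 6, 0}; /* Ahead in [0], behind in [1]. */
    assert(toy_can_vote_for(ahead, own));
    assert(!toy_can_vote_for(mixed, own));
    return 0;
}

The "incomparable" outcome is exactly the case a single-number log
index could not express, which is why the full vclock travels with the
vote request.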
> +
> +/**
> + * The election quorum is not strictly equal to the synchronous replication
> + * quorum. Sometimes it can be lowered. That is about bootstrap.
> + *
> + * The problem with bootstrap is that when the replicaset boots, all the
> + * instances can't write to WAL and can't recover from their initial
> + * snapshot. They need one node which will boot first, and then they will
> + * replicate from it.
> + *
> + * This one node should boot from its zero snapshot, create the replicaset
> + * UUID, register self with ID 1 in the _cluster space, and then register all
> + * the other instances here. To do that the node must be writable. It should
> + * have read_only = false, the connection quorum satisfied, and be a Raft
> + * leader if Raft is enabled.
> + *
> + * To be elected a Raft leader it needs to perform an election. But that
> + * can't be done before at least a synchronous quorum of the replicas is
> + * bootstrapped. And they can't be bootstrapped, because they wait for a
> + * leader to initialize _cluster. A cyclic dependency.
> + *
> + * This is resolved by truncation of the election quorum to the number of
> + * registered replicas, if their count is less than the synchronous quorum.
> + * That helps to elect the first leader.
> + *
> + * It may seem that the first node could just declare itself a leader and
> + * then strictly follow the protocol from now on, but that won't work,
> + * because if the first node restarts after it is booted, but before a quorum
> + * of replicas is booted, the cluster will get stuck again.
> + *
> + * The current solution is totally safe because
> + *
> + * - after all the cluster will have a node count >= quorum, if the user used
> + *   a correct config (God help him if he didn't);
> + *
> + * - the synchronous replication quorum is untouched - it is not truncated.
> + *   Only the leader election quorum is affected. So synchronous data won't
> + *   be lost.
> + */
> +static inline int
> +raft_election_quorum(void)
> +{
> +    return MIN(replication_synchro_quorum, replicaset.registered_count);
> +}
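A worked example of the truncation, as a self-contained sketch (the
function and variable names are stand-ins for the real ones):

#include <assert.h>

static int
toy_election_quorum(int synchro_quorum, int registered_count)
{
    return synchro_quorum < registered_count ?
           synchro_quorum : registered_count;
}

int
main(void)
{
    /*
     * Bootstrap of a 3-node cluster with synchro quorum 2: while only
     * the first instance is registered in _cluster, it can win an
     * election alone and start registering the others.
     */
    assert(toy_election_quorum(2, 1) == 1);
    assert(toy_election_quorum(2, 2) == 2);
    /* Once everyone is registered, the configured quorum applies. */
    assert(toy_election_quorum(2, 3) == 2);
    return 0;
}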
*/ > +static void > +raft_sm_pause_and_dump(void); > + > +static void > +raft_sm_become_leader(void); > + > +static void > +raft_sm_follow_leader(uint32_t leader); > + > +static void > +raft_sm_become_candidate(void); > + > +static const char * > +raft_request_to_string(const struct raft_request *req) > +{ > + assert(req->term != 0); > + int size = 1024; > + char buf[1024]; > + char *pos = buf; > + int rc = snprintf(pos, size, "{term: %llu", req->term); > + assert(rc >= 0); > + pos += rc; > + size -= rc; > + if (req->vote != 0) { > + rc = snprintf(pos, size, ", vote: %u", req->vote); > + assert(rc >= 0); > + pos += rc; > + size -= rc; > + } > + if (req->state != 0) { > + rc = snprintf(pos, size, ", state: %s", > + raft_state_strs[req->state]); > + assert(rc >= 0); > + pos += rc; > + size -= rc; > + } > + if (req->vclock != NULL) { > + rc = snprintf(pos, size, ", vclock: %s", > + vclock_to_string(req->vclock)); > + assert(rc >= 0); > + pos += rc; > + size -= rc; > + } > + rc = snprintf(pos, size, "}"); > + assert(rc >= 0); > + pos += rc; > + return tt_cstr(buf, pos - buf); > +} > + > void > raft_process_recovery(const struct raft_request *req) > { > - if (req->term != 0) > + say_verbose("RAFT: recover %s", raft_request_to_string(req)); > + if (req->term != 0) { > raft.term = req->term; > - if (req->vote != 0) > + raft.volatile_term = req->term; > + } > + if (req->vote != 0) { > raft.vote = req->vote; > + raft.volatile_vote = req->vote; > + } > /* > * Role is never persisted. If recovery is happening, the > * node was restarted, and the former role can be false > @@ -80,34 +329,526 @@ raft_process_recovery(const struct raft_request *req) > void > raft_process_msg(const struct raft_request *req, uint32_t source) > { > - (void)source; > - if (req->term > raft.term) { > - // Update term. > - // The logic will be similar, but the code > - // below is for testing purposes. > - raft.term = req->term; > + say_info("RAFT: message %s from %u", raft_request_to_string(req), > + source); > + assert(source > 0); > + assert(source != instance_id); > + /* Outdated request. */ > + if (req->term < raft.volatile_term) > + return; > + > + enum raft_state old_state = raft.state; > + > + /* Term bump. */ > + if (req->term > raft.volatile_term) > + raft_sm_schedule_new_term(req->term); > + > + /* Vote request during the on-going election. */ > + if (req->vote != 0) { > + switch (raft.state) { > + case RAFT_STATE_FOLLOWER: > + /* > + * Can't respond on vote requests when Raft is disabled. > + */ > + if (!raft.is_enabled) { > + say_info("RAFT: vote request is skipped - RAFT " > + "is disabled"); > + break; > + } > + /* Check if already voted in this term. */ > + if (raft.volatile_vote != 0) { > + say_info("RAFT: vote request is skipped - " > + "already voted in this term"); > + break; > + } > + /* Not a candidate. Can't accept votes. */ > + if (req->vote == instance_id) { > + say_info("RAFT: vote request is skipped - " > + "can't accept vote for self if not a " > + "candidate"); > + break; > + } > + /* Can't vote for too old or incomparable nodes. */ > + if (!raft_can_vote_for(req->vclock)) { > + say_info("RAFT: vote request is skipped - " > + "the vclock is not acceptable = %s", > + vclock_to_string(req->vclock)); > + break; > + } > + /* > + * Check if somebody is asking to vote for a third > + * node - nope. Make votes only when asked directly by > + * the new candidate. However that restriction may be > + * relaxed in future, if can be proven to be safe. 
> +
>  void
>  raft_process_recovery(const struct raft_request *req)
>  {
> -    if (req->term != 0)
> +    say_verbose("RAFT: recover %s", raft_request_to_string(req));
> +    if (req->term != 0) {
>          raft.term = req->term;
> -    if (req->vote != 0)
> +        raft.volatile_term = req->term;
> +    }
> +    if (req->vote != 0) {
>          raft.vote = req->vote;
> +        raft.volatile_vote = req->vote;
> +    }
>      /*
>       * Role is never persisted. If recovery is happening, the
>       * node was restarted, and the former role can be false
> @@ -80,34 +329,526 @@ raft_process_recovery(const struct raft_request *req)
>  void
>  raft_process_msg(const struct raft_request *req, uint32_t source)
>  {
> -    (void)source;
> -    if (req->term > raft.term) {
> -        // Update term.
> -        // The logic will be similar, but the code
> -        // below is for testing purposes.
> -        raft.term = req->term;
> +    say_info("RAFT: message %s from %u", raft_request_to_string(req),
> +             source);
> +    assert(source > 0);
> +    assert(source != instance_id);
> +    /* Outdated request. */
> +    if (req->term < raft.volatile_term)
> +        return;
> +
> +    enum raft_state old_state = raft.state;
> +
> +    /* Term bump. */
> +    if (req->term > raft.volatile_term)
> +        raft_sm_schedule_new_term(req->term);
> +
> +    /* Vote request during the on-going election. */
> +    if (req->vote != 0) {
> +        switch (raft.state) {
> +        case RAFT_STATE_FOLLOWER:
> +            /*
> +             * Can't respond to vote requests when Raft is disabled.
> +             */
> +            if (!raft.is_enabled) {
> +                say_info("RAFT: vote request is skipped - RAFT "
> +                         "is disabled");
> +                break;
> +            }
> +            /* Check if already voted in this term. */
> +            if (raft.volatile_vote != 0) {
> +                say_info("RAFT: vote request is skipped - "
> +                         "already voted in this term");
> +                break;
> +            }
> +            /* Not a candidate. Can't accept votes. */
> +            if (req->vote == instance_id) {
> +                say_info("RAFT: vote request is skipped - "
> +                         "can't accept vote for self if not a "
> +                         "candidate");
> +                break;
> +            }
> +            /* Can't vote for too old or incomparable nodes. */
> +            if (!raft_can_vote_for(req->vclock)) {
> +                say_info("RAFT: vote request is skipped - "
> +                         "the vclock is not acceptable = %s",
> +                         vclock_to_string(req->vclock));
> +                break;
> +            }
> +            /*
> +             * Check if somebody is asking to vote for a third
> +             * node - nope. Make votes only when asked directly by
> +             * the new candidate. However, that restriction may be
> +             * relaxed in future, if it can be proven to be safe.
> +             */
> +            if (req->vote != source) {
> +                say_info("RAFT: vote request is skipped - "
> +                         "indirect votes are not allowed");
> +                break;
> +            }
> +            /*
> +             * Either the term is new, or this node didn't vote in the
> +             * current term yet. Either way it can vote now.
> +             */
> +            raft_sm_schedule_new_vote(req->vote);
> +            break;
> +        case RAFT_STATE_CANDIDATE:
> +            /* Check if this is a vote for a competing candidate. */
> +            if (req->vote != instance_id) {
> +                say_info("RAFT: vote request is skipped - "
> +                         "competing candidate");
> +                break;
> +            }
> +            /*
> +             * Vote for self was requested earlier in this round,
> +             * and now was answered by some other instance.
> +             */
> +            assert(raft.volatile_vote == instance_id);
> +            int quorum = raft_election_quorum();
> +            bool was_set = bit_set(&raft.vote_mask, source);
> +            raft.vote_count += !was_set;
> +            if (raft.vote_count < quorum) {
> +                say_info("RAFT: accepted vote for self, vote "
> +                         "count is %d/%d", raft.vote_count,
> +                         quorum);
> +                break;
> +            }
> +            raft_sm_become_leader();
> +            break;
> +        case RAFT_STATE_LEADER:
> +            /*
> +             * If the node is still a leader, it ignores all votes.
> +             * Indeed, if the received vote were from a new term, the
> +             * node would have bumped its own term and entered the
> +             * follower state by now. If the vote is from the current
> +             * term, then the leader can freely ignore it.
> +             */
> +            break;
> +        default:
> +            unreachable();
> +        }
> +    }
> +    /*
> +     * If the node does not claim to be a leader, nothing interesting. Terms
> +     * and votes are already handled.
> +     */
> +    if (req->state != RAFT_STATE_LEADER)
> +        goto end;
> +    /* The node is a leader, but it is already known. */
> +    if (source == raft.leader)
> +        goto end;
> +    /*
> +     * XXX: A message from a conflicting leader. Split brain, basically.
> +     * Need to decide what to do. The current solution is to do nothing. In
> +     * future either this node should try to become a leader, or should stop
> +     * all writes and require manual intervention.
> +     */
> +    if (raft.leader != 0) {
> +        say_warn("RAFT: conflicting leader detected in one term - "
> +                 "known is %u, received %u", raft.leader, source);
> +        goto end;
> +    }
> +
> +    /* A new leader was elected. */
> +    raft_sm_follow_leader(source);
> +end:
> +    if (raft.state != old_state) {
> +        /*
> +         * The new term and vote are not broadcast yet. First their WAL
> +         * write should be finished. But the state is volatile. It is ok
> +         * to broadcast it now.
> +        */
> +        raft_broadcast_new_state();
> +    }
> +}
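The vote counting in the candidate branch above relies on bit_set()
reporting whether the bit was already set, so duplicate votes from one
instance are counted once. A minimal self-contained model of that
bookkeeping (toy bit_set, not the real one from Tarantool's bit
library):

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy version: set bit 'pos' and report whether it was already set. */
static bool
toy_bit_set(uint32_t *mask, int pos)
{
    bool was_set = (*mask >> pos) & 1;
    *mask |= (uint32_t)1 << pos;
    return was_set;
}

int
main(void)
{
    uint32_t vote_mask = 0;
    int vote_count = 1; /* The self vote, like in the candidate state. */
    /* A vote from instance 2 arrives twice - counted only once. */
    vote_count += !toy_bit_set(&vote_mask, 2);
    vote_count += !toy_bit_set(&vote_mask, 2);
    assert(vote_count == 2);
    /* A vote from instance 3 makes 3 - enough for quorum 3. */
    vote_count += !toy_bit_set(&vote_mask, 3);
    assert(vote_count == 3);
    return 0;
}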
> +
> +void
> +raft_process_heartbeat(uint32_t source)
> +{
> +    /*
> +     * When not a candidate - don't wait for anything. Therefore do not
> +     * care about the leader being dead.
> +     */
> +    if (!raft.is_candidate)
> +        return;
> +    /* Don't care about heartbeats when this node is a leader itself. */
> +    if (raft.state == RAFT_STATE_LEADER)
> +        return;
> +    /* Not interested in heartbeats from non-leader instances. */
> +    if (raft.leader != source)
> +        return;
> +    /*
> +     * The instance currently is busy with writing something on disk. Can't
> +     * react to heartbeats.
> +     */
> +    if (raft.is_write_in_progress)
> +        return;
> +    /*
> +     * XXX: it may be expensive to reset the timer like that. It may be less
> +     * expensive to let the timer work, and remember the last timestamp when
> +     * anything was heard from the leader. Then in the timer callback check
> +     * the timestamp, and restart the timer, if it is fine.
> +     */
> +    assert(ev_is_active(&raft.timer));
> +    ev_timer_stop(loop(), &raft.timer);
> +    raft_sm_wait_leader_dead();
> +}
> +
> +/** Wake up the Raft state writer fiber waiting for the WAL write to end. */
> +static void
> +raft_write_cb(struct journal_entry *entry)
> +{
> +    fiber_wakeup(entry->complete_data);
> +}
> +
> +/** Synchronously write a Raft request into WAL. */
> +static void
> +raft_write_request(const struct raft_request *req)
> +{
> +    assert(raft.is_write_in_progress);
> +    /*
> +     * Vclock is never persisted by Raft. It is used only to be sent over
> +     * the network when voting for self.
> +     */
> +    assert(req->vclock == NULL);
> +    /*
> +     * State is not persisted. That would be strictly against the Raft
> +     * protocol. The reason is that it does not make much sense - even if
> +     * the node is a leader now, by the time the node is restarted another
> +     * leader will likely have been elected already.
> +     */
> +    assert(req->state == 0);
> +    struct region *region = &fiber()->gc;
> +    uint32_t svp = region_used(region);
> +    struct xrow_header row;
> +    char buf[sizeof(struct journal_entry) +
> +             sizeof(struct xrow_header *)];
> +    struct journal_entry *entry = (struct journal_entry *)buf;
> +    entry->rows[0] = &row;
> +
> +    if (xrow_encode_raft(&row, region, req) != 0)
> +        goto fail;
> +    journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
> +                         fiber());
> +
> +    if (journal_write(entry) != 0 || entry->res < 0) {
> +        diag_set(ClientError, ER_WAL_IO);
> +        diag_log();
> +        goto fail;
>      }
> -    if (req->vote > 0) {
> -        // Check whether the vote's for us.
> +
> +    raft_broadcast(req);
> +
> +    region_truncate(region, svp);
> +    return;
> +fail:
> +    /*
> +     * XXX: the stub is supposed to be removed once it is defined what to
> +     * do when a raft request WAL write fails.
> +     */
> +    panic("Could not write a raft request to WAL\n");
> +}
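The on-stack journal entry above uses a common C idiom: a struct whose
last member is a flexible array, placed into a char buffer sized for
exactly one element. A stripped-down illustration of the idiom itself
(toy types; the real journal_entry has more fields, and the real code
relies on the buffer being suitably aligned):

#include <assert.h>

struct toy_row { int lsn; };

struct toy_entry {
    int n_rows;
    struct toy_row *rows[]; /* Flexible array member. */
};

int
main(void)
{
    struct toy_row row = { .lsn = 1 };
    /* Room for the header plus exactly one row pointer. */
    _Alignas(struct toy_entry)
    char buf[sizeof(struct toy_entry) + sizeof(struct toy_row *)];
    struct toy_entry *entry = (struct toy_entry *)buf;
    entry->n_rows = 1;
    entry->rows[0] = &row;
    assert(entry->rows[0]->lsn == 1);
    return 0;
}

This avoids a heap allocation for a single-row entry on the hot write
path.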
*/ > + req.term = raft.volatile_term; > + if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote) > + req.vote = raft.volatile_vote; > + > + raft_write_request(&req); > + say_verbose("RAFT: persist and apply state %s", > + raft_request_to_string(&req)); > + > + assert(req.term >= raft.term); > + if (req.term > raft.term) { > + raft.term = req.term; > + raft.vote = 0; > + } > + if (req.vote != 0) { > + assert(raft.vote == 0); > + raft.vote = req.vote; > + } > + if (raft_is_fully_on_disk()) > + goto end_dump; > + } > + > + memset(&req, 0, sizeof(req)); > + /* Term is encoded always. */ > + req.term = raft.term; > + bool has_changes = old_term != raft.term; > + if (raft.vote != 0 && old_vote != raft.vote) { > + req.vote = raft.vote; > + /* > + * When vote for self, need to send current vclock too. Two > + * reasons for that: > + * > + * - nodes need to vote for the instance containing the newest > + * data. So as not to loose it, because some of it may be > + * confirmed by the synchronous replication; > + * > + * - replication is basically stopped during election. Other > + * nodes can't learn vclock of this instance through regular > + * replication. > + */ > + if (raft.vote == instance_id) > + req.vclock = &replicaset.vclock; > + has_changes = true; > } > - switch (req->state) { > - case RAFT_STATE_FOLLOWER: > - break; > - case RAFT_STATE_CANDIDATE: > - // Perform voting logic. > - break; > - case RAFT_STATE_LEADER: > - // Switch to a new leader. > - break; > - default: > - break; > + if (raft.state != old_state) { > + req.state = raft.state; > + has_changes = true; > } > + if (has_changes) > + raft_broadcast(&req); > +} > + > +static int > +raft_worker_f(va_list args) > +{ > + (void)args; > + while (!fiber_is_cancelled()) { > + if (!raft.is_write_in_progress) > + goto idle; > + raft_worker_handle_io(); > + if (!raft.is_write_in_progress) > + goto idle; > + fiber_sleep(0); > + continue; > + idle: > + assert(raft_is_fully_on_disk()); > + fiber_yield(); > + } > + return 0; > +} > + > +static void > +raft_sm_pause_and_dump(void) > +{ > + assert(raft.state == RAFT_STATE_FOLLOWER); > + if (raft.is_write_in_progress) > + return; > + ev_timer_stop(loop(), &raft.timer); > + raft.is_write_in_progress = true; > + if (raft.worker == NULL) > + raft.worker = fiber_new("raft_worker", raft_worker_f); > + fiber_wakeup(raft.worker); > +} > + > +static void > +raft_sm_become_leader(void) > +{ > + assert(raft.state != RAFT_STATE_LEADER); > + say_info("RAFT: enter leader state with quorum %d", > + raft_election_quorum()); > + assert(raft.leader == 0); > + assert(raft.is_candidate); > + assert(!raft.is_write_in_progress); > + raft.state = RAFT_STATE_LEADER; > + raft.leader = instance_id; > + ev_timer_stop(loop(), &raft.timer); > + /* Make read-write (if other subsystems allow that. 
*/ > + box_update_ro_summary(); > +} > + > +static void > +raft_sm_follow_leader(uint32_t leader) > +{ > + say_info("RAFT: leader is %u, follow", leader); > + assert(raft.state != RAFT_STATE_LEADER); > + assert(raft.leader == 0); > + raft.state = RAFT_STATE_FOLLOWER; > + raft.leader = leader; > + if (!raft.is_write_in_progress) { > + ev_timer_stop(loop(), &raft.timer); > + raft_sm_wait_leader_dead(); > + } > +} > + > +static void > +raft_sm_become_candidate(void) > +{ > + say_info("RAFT: enter candidate state with 1 self vote"); > + assert(raft.state == RAFT_STATE_FOLLOWER); > + assert(raft.leader == 0); > + assert(raft.is_candidate); > + assert(!raft.is_write_in_progress); > + assert(raft_election_quorum() > 1); > + raft.state = RAFT_STATE_CANDIDATE; > + raft.vote_count = 1; > + raft.vote_mask = 0; > + bit_set(&raft.vote_mask, instance_id); > + raft_sm_wait_election_end(); > +} > + > +static void > +raft_sm_schedule_new_term(uint64_t new_term) > +{ > + say_info("RAFT: bump term to %llu, follow", new_term); > + assert(new_term > raft.volatile_term); > + assert(raft.volatile_term >= raft.term); > + raft.volatile_term = new_term; > + /* New terms means completely new Raft state. */ > + raft.volatile_vote = 0; > + raft.leader = 0; > + raft.state = RAFT_STATE_FOLLOWER; > + box_update_ro_summary(); > + raft_sm_pause_and_dump(); > +} > + > +static void > +raft_sm_schedule_new_vote(uint32_t new_vote) > +{ > + say_info("RAFT: vote for %u, follow", new_vote, raft.volatile_term); > + assert(raft.volatile_vote == 0); > + assert(raft.state == RAFT_STATE_FOLLOWER); > + raft.volatile_vote = new_vote; > + raft_sm_pause_and_dump(); > +} > + > +static void > +raft_sm_schedule_new_election(void) > +{ > + say_info("RAFT: begin new election round"); > + assert(raft_is_fully_on_disk()); > + assert(raft.is_candidate); > + /* Everyone is a follower until its vote for self is persisted. 
*/ > + raft_sm_schedule_new_term(raft.term + 1); > + raft_sm_schedule_new_vote(instance_id); > + box_update_ro_summary(); > +} > + > +static void > +raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer, > + int events) > +{ > + assert(timer == &raft.timer); > + (void)events; > + ev_timer_stop(loop, timer); > + raft_sm_schedule_new_election(); > +} > + > +static void > +raft_sm_wait_leader_dead(void) > +{ > + assert(!ev_is_active(&raft.timer)); > + assert(!raft.is_write_in_progress); > + assert(raft.is_candidate); > + assert(raft.state == RAFT_STATE_FOLLOWER); > + assert(raft.leader != 0); > + double death_timeout = replication_disconnect_timeout(); > + ev_timer_set(&raft.timer, death_timeout, death_timeout); > + ev_timer_start(loop(), &raft.timer); > +} > + > +static void > +raft_sm_wait_election_end(void) > +{ > + assert(!ev_is_active(&raft.timer)); > + assert(!raft.is_write_in_progress); > + assert(raft.is_candidate); > + assert(raft.state == RAFT_STATE_FOLLOWER || > + (raft.state == RAFT_STATE_CANDIDATE && > + raft.volatile_vote == instance_id)); > + assert(raft.leader == 0); > + double election_timeout = raft.election_timeout + > + raft_new_random_election_shift(); > + ev_timer_set(&raft.timer, election_timeout, election_timeout); > + ev_timer_start(loop(), &raft.timer); > +} > + > +static void > +raft_sm_start(void) > +{ > + say_info("RAFT: start state machine"); > + assert(!ev_is_active(&raft.timer)); > + assert(!raft.is_write_in_progress); > + assert(!raft.is_enabled); > + assert(raft.state == RAFT_STATE_FOLLOWER); > + raft.is_enabled = true; > + raft.is_candidate = raft.is_cfg_candidate; > + if (!raft.is_candidate) > + /* Nop. */; > + else if (raft.leader != 0) > + raft_sm_wait_leader_dead(); > + else > + raft_sm_schedule_new_election(); > + box_update_ro_summary(); > + /* > + * When Raft is enabled, send the complete state. Because > + * it wasn't sent in disabled state. > + */ > + struct raft_request req; > + raft_serialize_for_network(&req, NULL); > + raft_broadcast(&req); > +} > + > +static void > +raft_sm_stop(void) > +{ > + say_info("RAFT: stop state machine"); > + assert(raft.is_enabled); > + raft.is_enabled = false; > + raft.is_candidate = false; > + if (raft.state == RAFT_STATE_LEADER) > + raft.leader = 0; > + raft.state = RAFT_STATE_FOLLOWER; > + box_update_ro_summary(); > } > > void > raft_serialize_for_network(struct raft_request *req, struct vclock *vclock) > { > memset(req, 0, sizeof(*req)); > + /* > + * Volatile state is never used for any communications. > + * Use only persisted state. > + */ > req->term = raft.term; > req->vote = raft.vote; > req->state = raft.state; > @@ -128,29 +869,90 @@ raft_serialize_for_disk(struct raft_request *req) > void > raft_cfg_is_enabled(bool is_enabled) > { > - raft.is_enabled = is_enabled; > + if (is_enabled == raft.is_enabled) > + return; > + > + if (!is_enabled) > + raft_sm_stop(); > + else > + raft_sm_start(); > } > > void > raft_cfg_is_candidate(bool is_candidate) > { > - raft.is_candidate = is_candidate; > + bool old_is_candidate = raft.is_candidate; > + raft.is_cfg_candidate = is_candidate; > + raft.is_candidate = is_candidate && raft.is_enabled; > + if (raft.is_candidate == old_is_candidate) > + return; > + > + if (raft.is_candidate) { > + assert(raft.state == RAFT_STATE_FOLLOWER); > + /* > + * If there is an on-going WAL write, it means there was some > + * node who sent newer data to this node. 
> +         */
> +        if (raft.leader == 0 && raft_is_fully_on_disk())
> +            raft_sm_schedule_new_election();
> +    } else if (raft.state != RAFT_STATE_FOLLOWER) {
> +        if (raft.state == RAFT_STATE_LEADER)
> +            raft.leader = 0;
> +        raft.state = RAFT_STATE_FOLLOWER;
> +        raft_broadcast_new_state();
> +    }
> +    box_update_ro_summary();
>  }
>
>  void
>  raft_cfg_election_timeout(double timeout)
>  {
> +    if (timeout == raft.election_timeout)
> +        return;
> +
>      raft.election_timeout = timeout;
> +    if (raft.vote != 0 && raft.leader == 0 && raft.is_candidate) {
> +        assert(ev_is_active(&raft.timer));
> +        double timeout = ev_timer_remaining(loop(), &raft.timer) -
> +                         raft.timer.at + raft.election_timeout;
> +        ev_timer_stop(loop(), &raft.timer);
> +        ev_timer_set(&raft.timer, timeout, timeout);
> +        ev_timer_start(loop(), &raft.timer);
> +    }
>  }
>
>  void
>  raft_cfg_election_quorum(void)
>  {
> +    if (raft.state != RAFT_STATE_CANDIDATE)
> +        return;
> +    if (raft.vote_count < raft_election_quorum())
> +        return;
> +    /*
> +     * The node is a candidate. It means its state is fully synced with
> +     * disk. Otherwise it would be a follower.
> +     */
> +    assert(!raft.is_write_in_progress);
> +    raft.state = RAFT_STATE_LEADER;
> +    raft.leader = instance_id;
> +    raft_broadcast_new_state();
> +    box_update_ro_summary();
>  }
>
>  void
>  raft_cfg_death_timeout(void)
>  {
> +    if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
> +        raft.leader != 0) {
> +        assert(ev_is_active(&raft.timer));
> +        double death_timeout = replication_disconnect_timeout();
> +        double timeout = ev_timer_remaining(loop(), &raft.timer) -
> +                         raft.timer.at + death_timeout;
> +        ev_timer_stop(loop(), &raft.timer);
> +        ev_timer_set(&raft.timer, timeout, timeout);
> +        ev_timer_start(loop(), &raft.timer);
> +    }
>  }
>
>  void
> @@ -163,3 +965,9 @@ raft_broadcast(const struct raft_request *req)
>          }
>      }
>  }
> +
> +void
> +raft_init(void)
> +{
> +    ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
> +}
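Both raft_cfg_election_timeout() and raft_cfg_death_timeout() above
reschedule a running timer with the same arithmetic: the time already
waited is preserved, only the total timeout changes. A self-contained
sketch of that recalculation (plain numbers instead of ev_timer):

#include <assert.h>

/*
 * remaining = time left on the old timer, old_timeout = its full
 * length; returns the value to arm the new timer with so that the
 * time already waited still counts.
 */
static double
toy_reschedule(double remaining, double old_timeout, double new_timeout)
{
    return remaining - old_timeout + new_timeout;
}

int
main(void)
{
    /* A 5s timer with 3s left: 2s have already been waited. */
    double t = toy_reschedule(3, 5, 10);
    /* With the new 10s timeout the timer should fire in 8s more. */
    assert(t == 8);
    return 0;
}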
>
> diff --git a/src/box/raft.h b/src/box/raft.h
> index db64cf933..23aedfe10 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -31,34 +31,140 @@
>   */
>  #include
>  #include
> +#include "tarantool_ev.h"
>
>  #if defined(__cplusplus)
>  extern "C" {
>  #endif
>
> +/**
> + * This is an implementation of the Raft leader election protocol, separated
> + * from the synchronous replication part.
> + *
> + * The protocol describes an algorithm which helps to elect a single leader
> + * in the cluster, which is supposed to handle write requests, and to
> + * re-elect a new leader when the current leader dies.
> + *
> + * The implementation follows the protocol to the letter except for a few
> + * important details.
> + *
> + * Firstly, the original Raft assumes that all nodes share the same log
> + * record numbers. In Tarantool they are called LSNs. But in the case of
> + * Tarantool each node has its own LSN in its own component of vclock. That
> + * makes the election messages a bit heavier, because the nodes need to send
> + * and compare complete vclocks of each other instead of a single number like
> + * in the original Raft. But the logic becomes simpler, because the original
> + * Raft has a problem of uncertainty about what to do with records of an old
> + * leader right after a new leader is elected. They could be rolled back or
> + * confirmed depending on circumstances. The issue disappears when vclock is
> + * used.
> + *
> + * Secondly, leader election works differently during cluster bootstrap,
> + * until the number of bootstrapped replicas becomes >= the election quorum.
> + * That arises from the specifics of replica bootstrap and the order of
> + * subsystem initialization. In short: during bootstrap a leader election may
> + * use a smaller election quorum than the configured one. See more details in
> + * the code.
> + */
> +
> +struct fiber;
>  struct raft_request;
>  struct vclock;
>
>  enum raft_state {
> +    /**
> +     * Can't write. Can only accept data from a leader. A node in this state
> +     * either monitors an existing leader, or there is an on-going election
> +     * and the node voted for another node, or it can't be a candidate and
> +     * does not do anything.
> +     */
>      RAFT_STATE_FOLLOWER = 1,
> +    /**
> +     * The node can't write. There is an active election, in which the node
> +     * voted for self. Now it waits for the election outcome.
> +     */
>      RAFT_STATE_CANDIDATE = 2,
> +    /** The election was successful. The node accepts write requests. */
>      RAFT_STATE_LEADER = 3,
>  };
>
>  extern const char *raft_state_strs[];
>
>  struct raft {
> +    /** Instance ID of the leader of the current term. */
>      uint32_t leader;
> +    /** State of the instance. */
>      enum raft_state state;
> +    /**
> +     * Volatile part of the Raft state, whose WAL write may still be in
> +     * progress, and yet the state may already be used. Volatile state is
> +     * never sent anywhere, but the state machine makes decisions based on
> +     * it. That is vital.
> +     * As an example, the volatile vote needs to be used to reject votes
> +     * inside a term where the instance already voted (even if the vote WAL
> +     * write is not finished yet). Otherwise the instance would try to write
> +     * several votes inside one term.
> +     */
> +    uint64_t volatile_term;
> +    uint32_t volatile_vote;
> +    /**
> +     * Flag whether Raft is enabled. When disabled, it still persists terms
> +     * so as to quickly enroll into the cluster when (if) it is enabled. In
> +     * everything else disabled Raft does not affect the instance's work.
> +     */
>      bool is_enabled;
> +    /**
> +     * Flag whether the node can become a leader. It is an accumulated value
> +     * of the configuration options 'Raft enabled' and 'Raft candidate'. If
> +     * at least one is false, the instance is not a candidate.
> +     */
>      bool is_candidate;
> +    /** Flag whether the instance is allowed to be a leader. */
> +    bool is_cfg_candidate;
> +    /**
> +     * Flag whether Raft currently tries to write something into WAL. It
> +     * happens asynchronously, not right after the Raft state is updated.
> +     */
> +    bool is_write_in_progress;
> +    /**
> +     * Persisted Raft state. These values are used when the current Raft
> +     * state needs to be told to other nodes.
> +     */
>      uint64_t term;
>      uint32_t vote;
> +    /**
> +     * Bit 1 at position N means that a vote from the instance with ID = N
> +     * was obtained.
> +     */
> +    uint32_t vote_mask;
> +    /** Number of votes for this instance. Valid only in candidate state. */
> +    int vote_count;
> +    /** State machine timed event trigger. */
> +    struct ev_timer timer;
> +    /** Worker fiber to execute blocking tasks like IO. */
> +    struct fiber *worker;
> +    /** Configured election timeout in seconds. */
>      double election_timeout;
>  };
>
>  extern struct raft raft;
>
> +/**
> + * A flag whether the instance is read-only according to Raft. Even if Raft
> + * allows writes, it does not mean the instance is writable. It can still be
> + * affected by box.cfg.read_only and the connection quorum.
> + */
> +static inline bool
> +raft_is_ro(void)
> +{
> +    return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
> +}
> +
> +/** See if the instance can accept rows from an instance with the given ID. */
> +static inline bool
> +raft_is_source_allowed(uint32_t source_id)
> +{
> +    return !raft.is_enabled || raft.leader == source_id;
> +}
> +
>  /** Check if Raft is enabled. */
>  static inline bool
>  raft_is_enabled(void)
> @@ -78,6 +184,13 @@ raft_process_recovery(const struct raft_request *req);
>  void
>  raft_process_msg(const struct raft_request *req, uint32_t source);
>
> +/**
> + * Process a heartbeat message from an instance with the given ID. It is used
> + * to watch the leader's health and to start an election when necessary.
> + */
> +void
> +raft_process_heartbeat(uint32_t source);
> +
>  /** Configure whether Raft is enabled. */
>  void
>  raft_cfg_is_enabled(bool is_enabled);
> @@ -130,6 +243,10 @@ raft_serialize_for_disk(struct raft_request *req);
>  void
>  raft_broadcast(const struct raft_request *req);
>
> +/** Initialize Raft global data structures. */
> +void
> +raft_init(void);
> +
>  #if defined(__cplusplus)
>  }
>  #endif
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 74581db9c..d63711600 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
>      relay_send(relay, row);
>  }
>
> +/**
> + * Recreate the recovery cursor from the last confirmed point. That is used
> + * by Raft, when the node becomes a leader. It may happen that it already
> + * sent some data to other nodes as a follower, and they ignored the data.
> + * Now when the node is a leader, it should send the unconfirmed data again.
> + * Otherwise the cluster will get stuck, or worse, the newer data would be
> + * sent without the older data that was sent but ignored.
> + */
> +static void
> +relay_restart_recovery(struct relay *relay)
> +{
> +    recovery_delete(relay->r);
> +    relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
> +    recover_remaining_wals(relay->r, &relay->stream, NULL, true);
> +}
> +
>  struct relay_raft_msg {
>      struct cmsg base;
>      struct cmsg_hop route;
> @@ -786,7 +803,14 @@ relay_raft_msg_push(struct cmsg *base)
>      struct xrow_header row;
>      xrow_encode_raft(&row, &fiber()->gc, &msg->req);
>      try {
> +        /*
> +         * Send the message before restarting the recovery. Otherwise
> +         * all the rows would be sent from under a non-leader role and
> +         * would be ignored again.
> +         */
>          relay_send(msg->relay, &row);
> +        if (msg->req.state == RAFT_STATE_LEADER)
> +            relay_restart_recovery(msg->relay);
>      } catch (Exception *e) {
>          relay_set_error(msg->relay, e);
>          fiber_cancel(fiber());

-- 
Serge Petrenko