From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 9E2DF430411 for ; Thu, 3 Sep 2020 02:33:28 +0300 (MSK) From: Vladislav Shpilevoy Date: Thu, 3 Sep 2020 01:33:18 +0200 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 8/8] raft: state machine List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org, gorcunov@gmail.com First look at Raft state machine implementation. The commit is a draft. It does not contain any tests, and unlikely passes the existing tests. But gives a picture of where Raft is going. --- src/box/applier.cc | 18 +- src/box/box.cc | 5 +- src/box/lua/misc.cc | 25 +- src/box/raft.c | 645 +++++++++++++++++++++++++++++++++++++++++--- src/box/raft.h | 89 ++++++ src/box/relay.cc | 19 ++ 6 files changed, 746 insertions(+), 55 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 8de2f799b..7486d9929 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -916,8 +916,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row) * Return 0 for success or -1 in case of an error. */ static int -applier_apply_tx(struct stailq *rows) +applier_apply_tx(struct applier *applier, struct stailq *rows) { + /* + * Rows received not directly from a leader are ignored. That is a + * protection against the case when an old leader keeps sending data + * around not knowing yet that it is not a leader anymore. + * + * XXX: it may be that this can be fine to apply leader transactions by + * looking at their replica_id field if it is equal to leader id. That + * can be investigated as an 'optimization'. 
Even though it may not give + * anything, because it won't change the total number of rows sent over the + * network anyway. + */ + if (!raft_is_source_allowed(applier->instance_id)) + return 0; struct xrow_header *first_row = &stailq_first_entry(rows, struct applier_tx_row, next)->row; struct xrow_header *last_row; @@ -1257,6 +1270,7 @@ applier_subscribe(struct applier *applier) struct xrow_header *first_row = &stailq_first_entry(&rows, struct applier_tx_row, next)->row; + raft_process_heartbeat(applier->instance_id); if (first_row->lsn == 0) { if (unlikely(iproto_type_is_raft_request( first_row->type))) { @@ -1266,7 +1280,7 @@ applier_subscribe(struct applier *applier) } else { applier_signal_ack(applier); } - } else if (applier_apply_tx(&rows) != 0) { + } else if (applier_apply_tx(applier, &rows) != 0) { diag_raise(); } diff --git a/src/box/box.cc b/src/box/box.cc index 427b771b3..2e9c90310 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -157,7 +157,7 @@ void box_update_ro_summary(void) { bool old_is_ro_summary = is_ro_summary; - is_ro_summary = is_ro || is_orphan; + is_ro_summary = is_ro || is_orphan || raft_is_ro(); /* In 99% nothing changes. Filter this out first. */ if (is_ro_summary == old_is_ro_summary) return; @@ -2646,6 +2646,7 @@ box_init(void) txn_limbo_init(); sequence_init(); + raft_init(); } bool @@ -2794,6 +2795,8 @@ box_cfg_xc(void) if (!is_bootstrap_leader) replicaset_sync(); + else if (raft_is_enabled()) + raft_bootstrap_leader(); /* box.cfg.read_only is not read yet.
*/ assert(box_is_ro()); diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc index 98e98abe2..efbbcfd1f 100644 --- a/src/box/lua/misc.cc +++ b/src/box/lua/misc.cc @@ -256,24 +256,26 @@ lbox_raft_new_term(struct lua_State *L) return 0; } -static int -lbox_raft_vote(struct lua_State *L) -{ - uint64_t vote_for = luaL_checkuint64(L, 1); - if (vote_for > UINT32_MAX) - return luaL_error(L, "Invalid vote"); - raft_vote(vote_for); - return 0; -} - static int lbox_raft_get(struct lua_State *L) { - lua_createtable(L, 0, 2); + lua_createtable(L, 0, 8); luaL_pushuint64(L, raft.term); lua_setfield(L, -2, "term"); + luaL_pushuint64(L, raft.volatile_term); + lua_setfield(L, -2, "volatile_term"); luaL_pushuint64(L, raft.vote); lua_setfield(L, -2, "vote"); + luaL_pushuint64(L, raft.volatile_vote); + lua_setfield(L, -2, "volatile_vote"); + lua_pushstring(L, raft_state_strs[raft.state]); + lua_setfield(L, -2, "state"); + lua_pushinteger(L, raft.vote_count); + lua_setfield(L, -2, "vote_count"); + lua_pushboolean(L, raft.is_write_in_progress); + lua_setfield(L, -2, "is_write_in_progress"); + lua_pushboolean(L, raft.is_candidate); + lua_setfield(L, -2, "is_candidate"); return 1; } @@ -285,7 +287,6 @@ box_lua_misc_init(struct lua_State *L) {"new_tuple_format", lbox_tuple_format_new}, /* Temporary helpers to sanity test raft persistency. */ {"raft_new_term", lbox_raft_new_term}, - {"raft_vote", lbox_raft_vote}, {"raft_get", lbox_raft_get}, {NULL, NULL} }; diff --git a/src/box/raft.c b/src/box/raft.c index 1acffb677..6f2891291 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -36,6 +36,12 @@ #include "small/region.h" #include "replication.h" #include "relay.h" +#include "box.h" + +/** + * Maximal random deviation of the election timeout. From the configured value. 
+ */ +#define RAFT_RANDOM_ELECTION_FACTOR 0.1 const char *raft_state_strs[] = { NULL, @@ -48,19 +54,163 @@ const char *raft_state_strs[] = { struct raft raft = { .leader = 0, .state = RAFT_STATE_FOLLOWER, + .volatile_term = 1, + .volatile_vote = 0, .is_enabled = false, .is_candidate = false, + .is_cfg_candidate = false, + .is_write_in_progress = false, .term = 1, .vote = 0, + .vote_mask = 0, + .vote_count = 0, + .election_timeout = 5, }; +/** + * Check if Raft is completely synced with disk. Meaning all its critical values + * are in WAL. Only in that state the node can become a leader or a candidate. + * If the node has a not flushed data, it means either the term was bumped, or + * a new vote was made. + * + * In case of term bump it means either there is another node with a newer term, + * and this one should be a follower; or this node bumped the term itself along + * with making a vote to start a new election - then it is also a follower which + * will turn into a candidate when the flush is done. + * + * In case of a new not flushed vote it means either this node voted for some + * other node, and must be a follower; or it voted or self, and also must be a + * follower, but will become a candidate when the flush is done. + * + * In total - when something is not synced with disk, the instance is a follower + * in any case. + */ +static bool +raft_is_fully_on_disk(void) +{ + return raft.volatile_term == raft.term && + raft.volatile_vote == raft.vote; +} + +/** + * Raft protocol says that election timeout should be a bit randomized so as + * the nodes wouldn't start election at the same time and end up with not having + * a quorum for anybody. This implementation randomizes the election timeout by + * adding {election timeout * random factor} value, where the factor is a + * constant floating point value > 0. + */ +static inline double +raft_new_random_election_shift(void) +{ + double timeout = raft.election_timeout; + /* Translate to ms. 
*/ + uint32_t rand_part = + (uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000); + if (rand_part == 0) + rand_part = 1; + /* + * XXX: this is not giving a good distribution, but it is not so trivial + * to implement a correct random value generator. There is a task to + * unify all such places. Not critical here. + */ + rand_part = rand() % (rand_part + 1); + return rand_part / 1000.0; +} + +/** + * Raft says that during election a node1 can vote for node2, if node2 has a + * bigger term, or has the same term but longer log. In case of Tarantool it + * means the node2 vclock should be >= node1 vclock, in all components. It is + * not enough to compare only one component. At least because there may be not + * a previous leader when the election happens first time. Or a node could + * restart and forget who the previous leader was. + */ +static inline bool +raft_can_vote_for(const struct vclock *v) +{ + if (v == NULL) + return false; + int cmp = vclock_compare_ignore0(v, &replicaset.vclock); + return cmp == 0 || cmp == 1; +} + +/** Broadcast an event about this node changed its state to all relays. */ +static inline void +raft_broadcast_new_state(void) +{ + struct raft_request req; + memset(&req, 0, sizeof(req)); + req.term = raft.term; + req.state = raft.state; + raft_broadcast(&req); +} + +/** Raft state machine methods. 'sm' stands for State Machine. */ + +/** + * Start the state machine. When it is stopped, Raft state is updated and + * goes to WAL when necessary, but it does not affect the instance operation. + * For example, when Raft is stopped, the instance role does not affect whether + * it is writable. + */ +static void +raft_sm_start(void); + +/** + * Stop the state machine. + * - Raft stops affecting the instance operation; + * - this node can't become a leader anymore; + * - this node can't vote. 
+ */ +static void +raft_sm_stop(void); + +/** + * When the instance is a follower but is allowed to be a leader, it will wait + * for death of the current leader to start new election. + */ +static void +raft_sm_wait_leader_dead(void); + +/** + * If election is started by this node, or it voted for some other node started + * the election, and it can be a leader itself, it will wait until the current + * election times out. When it happens, the node will start new election. + */ +static void +raft_sm_wait_election_end(void); + +/** Bump volatile term and schedule its flush to disk. */ +static void +raft_sm_schedule_new_term(uint64_t new_term); + +/** Bump volatile vote and schedule its flush to disk. */ +static void +raft_sm_schedule_new_vote(uint32_t new_vote); + +/** Bump volatile term, vote for self, and schedule their flush to disk. */ +static void +raft_sm_schedule_new_election(void); + +/** + * The main trigger of Raft state machine - start new election when the current + * leader dies, or when there is no a leader and the previous election failed. + */ +static void +raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer, + int events); + void raft_process_recovery(const struct raft_request *req) { - if (req->term != 0) + if (req->term != 0) { raft.term = req->term; - if (req->vote != 0) + raft.volatile_term = req->term; + } + if (req->vote != 0) { raft.vote = req->vote; + raft.volatile_vote = req->vote; + } /* * Role is never persisted. If recovery is happening, the * node was restarted, and the former role can be false @@ -80,34 +230,136 @@ raft_process_recovery(const struct raft_request *req) void raft_process_msg(const struct raft_request *req, uint32_t source) { - (void)source; - if (req->term > raft.term) { - // Update term. - // The logic will be similar, but the code - // below is for testing purposes. - raft.term = req->term; - } - if (req->vote > 0) { - // Check whether the vote's for us. 
+ assert(source > 0); + assert(source != instance_id); + /* Outdated request. */ + if (req->term < raft.volatile_term) + return; + + enum raft_state old_state = raft.state; + + /* Term bump. */ + if (req->term > raft.volatile_term) + raft_sm_schedule_new_term(req->term); + + /* Vote request during the on-going election. */ + if (req->vote != 0) { + switch (raft.state) { + case RAFT_STATE_FOLLOWER: + case RAFT_STATE_LEADER: + /* + * Can't respond on vote requests when Raft is disabled. + */ + if (!raft.is_enabled) + break; + /* Check if already voted in this term. */ + if (raft.volatile_vote != 0) + break; + /* Not a candidate. Can't accept votes. */ + if (req->vote == instance_id) + break; + /* Can't vote for too old or incomparable nodes. */ + if (!raft_can_vote_for(req->vclock)) + break; + /* + * Either the term is new, or didn't vote in the current + * term yet. Anyway can vote now. + */ + raft.state = RAFT_STATE_FOLLOWER; + raft_sm_schedule_new_vote(req->vote); + break; + case RAFT_STATE_CANDIDATE: + /* Check if this is a vote for a competing candidate. */ + if (req->vote != instance_id) + break; + /* + * Vote for self was requested earlier in this round, + * and now was answered by some other instance. + */ + assert(raft.volatile_vote == instance_id); + bool was_set = bit_set(&raft.vote_mask, source); + raft.vote_count += !was_set; + if (raft.vote_count < replication_synchro_quorum) + break; + raft.state = RAFT_STATE_LEADER; + raft.leader = instance_id; + break; + default: + unreachable(); + } } - switch (req->state) { - case RAFT_STATE_FOLLOWER: - break; - case RAFT_STATE_CANDIDATE: - // Perform voting logic. - break; - case RAFT_STATE_LEADER: - // Switch to a new leader. - break; - default: - break; + /* + * If the node does not claim to be a leader, nothing interesting. Terms + * and votes are already handled. + */ + if (req->state != RAFT_STATE_LEADER) + goto end; + /* The node is a leader, but it is already known. 
*/ + if (source == raft.leader) + goto end; + /* + * XXX: A message from a conflicting leader. Split brain, basically. + * Need to decide what to do. Current solution is to do nothing. In + * future either this node should try to become a leader, or should stop + * all writes and require manual intervention. + */ + if (raft.leader != 0) + goto end; + + /* New leader was elected. */ + raft.state = RAFT_STATE_FOLLOWER; + raft.leader = source; +end: + if (raft.state != old_state) { + /* + * If the node stopped being a leader - should become read-only. + * If became a leader - should become read-write (if other + * subsystems also allow read-write). + */ + box_update_ro_summary(); + /* + * New term and vote are not broadcasted yet. Firstly their WAL + * write should be finished. But the state is volatile. It is ok + * to broadcast it now. + */ + raft_broadcast_new_state(); } } +void +raft_process_heartbeat(uint32_t source) +{ + /* + * When not a candidate - don't wait for anything. Therefore does not + * care about the leader being dead. + */ + if (!raft.is_candidate) + return; + /* Don't care about heartbeats when this node is a leader itself. */ + if (raft.state == RAFT_STATE_LEADER) + return; + /* Not interested in heartbeats from not a leader. */ + if (raft.leader != source) + return; + /* + * XXX: it may be expensive to reset the timer like that. It may be less + * expensive to let the timer work, and remember last timestamp when + * anything was heard from the leader. Then in the timer callback check + * the timestamp, and restart the timer, if it is fine. + */ + assert(ev_is_active(&raft.timer)); + ev_timer_stop(loop(), &raft.timer); + raft_sm_wait_leader_dead(); +} + void raft_serialize(struct raft_request *req, struct vclock *vclock) { memset(req, 0, sizeof(*req)); + /* + * Volatile state is never used for any communications. + * Use only persisted state. 
+ */ req->term = raft.term; req->vote = raft.vote; req->state = raft.state; @@ -117,15 +369,23 @@ raft_serialize(struct raft_request *req, struct vclock *vclock) req->vclock = vclock; } +/** Wakeup Raft state writer fiber waiting for WAL write end. */ static void raft_write_cb(struct journal_entry *entry) { fiber_wakeup(entry->complete_data); } +/** Synchronously write a Raft request into WAL. */ static void raft_write_request(const struct raft_request *req) { + assert(raft.is_write_in_progress); + /* + * Vclock is never persisted by Raft. It is used only to + * be sent to network when vote for self. + */ + assert(req->vclock == NULL); struct region *region = &fiber()->gc; uint32_t svp = region_used(region); struct xrow_header row; @@ -157,57 +417,343 @@ fail: panic("Could not write a raft request to WAL\n"); } +/** + * Flush Raft state changes to WAL. The callback resets itself, if during the + * write more changes appear. + */ +static void +raft_sm_dump_step(ev_loop *loop, ev_check *watcher, int events) +{ + assert(watcher == &raft.io); + (void) events; + assert(raft.is_write_in_progress); + /* During write Raft can't be anything but a follower. */ + assert(raft.state == RAFT_STATE_FOLLOWER); + struct raft_request req; + uint64_t old_term = raft.term; + uint32_t old_vote = raft.vote; + enum raft_state old_state = raft.state; + + if (raft_is_fully_on_disk()) { +end_dump: + raft.is_write_in_progress = false; + ev_check_stop(loop, watcher); + /* + * The state machine is stable. Can see now, to what state to + * go. + */ + if (!raft.is_candidate) { + /* + * If not a candidate, can't do anything except vote for + * somebody (if Raft is enabled). Nothing to do except + * staying a follower without timeouts. + */ + } else if (raft.leader != 0) { + /* There is a known leader. Wait until it is dead. */ + raft_sm_wait_leader_dead(); + } else if (raft.vote == instance_id) { + /* Just wrote own vote. 
*/ + if (replication_synchro_quorum == 1) { + raft.state = RAFT_STATE_LEADER; + raft.leader = instance_id; + /* + * Make read-write (if other subsystems allow + * that). + */ + box_update_ro_summary(); + } else { + raft.state = RAFT_STATE_CANDIDATE; + raft.vote_count = 1; + raft.vote_mask = 0; + raft_sm_wait_election_end(); + } + } else if (raft.vote != 0) { + /* + * Voted for some other node. Wait if it manages to + * become a leader. + */ + raft_sm_wait_election_end(); + } else { + /* No leaders, no votes. */ + raft_sm_schedule_new_election(); + } + } else { + memset(&req, 0, sizeof(req)); + assert(raft.volatile_term >= raft.term); + /* Term is written always. */ + req.term = raft.volatile_term; + if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote) + req.vote = raft.volatile_vote; + + raft_write_request(&req); + + assert(req.term >= raft.term); + if (req.term > raft.term) { + raft.term = req.term; + raft.vote = 0; + } + if (req.vote != 0) { + assert(raft.vote == 0); + raft.vote = req.vote; + } + if (raft_is_fully_on_disk()) + goto end_dump; + } + + memset(&req, 0, sizeof(req)); + /* Term is encoded always. */ + req.term = raft.term; + bool has_changes = old_term != raft.term; + if (raft.vote != 0 && old_vote != raft.vote) { + req.vote = raft.vote; + /* + * When vote for self, need to send current vclock too. Two + * reasons for that: + * + * - nodes need to vote for the instance containing the newest + * data. So as not to loose it, because some of it may be + * confirmed by the synchronous replication; + * + * - replication is basically stopped during election. Other + * nodes can't learn vclock of this instance through regular + * replication. + */ + if (raft.vote == instance_id) + req.vclock = &replicaset.vclock; + has_changes = true; + } + if (raft.state != old_state) { + req.state = raft.state; + has_changes = true; + } + if (has_changes) + raft_broadcast(&req); +} + +/** Start Raft state flush to disk. 
*/ +static void +raft_sm_pause_and_dump(void) +{ + assert(raft.state == RAFT_STATE_FOLLOWER); + if (raft.is_write_in_progress) + return; + ev_timer_stop(loop(), &raft.timer); + ev_check_start(loop(), &raft.io); + raft.is_write_in_progress = true; +} + +/** Bump term, reset Raft state, and persist that fact. */ +static void +raft_sm_schedule_new_term(uint64_t new_term) +{ + assert(new_term > raft.volatile_term); + assert(raft.volatile_term >= raft.term); + raft.volatile_term = new_term; + /* New terms means completely new Raft state. */ + raft.volatile_vote = 0; + raft.leader = 0; + raft.state = RAFT_STATE_FOLLOWER; + raft_sm_pause_and_dump(); +} + +/** Vote in the current term, and persist that fact. */ +static void +raft_sm_schedule_new_vote(uint32_t new_vote) +{ + assert(raft.volatile_vote == 0); + raft.volatile_vote = new_vote; + raft_sm_pause_and_dump(); +} + +/** + * Bump term and vote for self immediately. After that is persisted, the + * election timeout will be activated. Unless during that nothing newer happens. + */ +static void +raft_sm_schedule_new_election(void) +{ + assert(raft_is_fully_on_disk()); + assert(raft.is_candidate); + assert(raft.leader == 0); + /* Everyone is a follower until its vote for self is persisted. 
*/ + raft_sm_schedule_new_term(raft.term + 1); + raft_sm_schedule_new_vote(instance_id); + box_update_ro_summary(); +} + void raft_new_term(uint64_t min_new_term) { + uint64_t new_term; if (raft.term < min_new_term) - raft.term = min_new_term + 1; + new_term = min_new_term + 1; else - ++raft.term; + new_term = raft.term + 1; + enum raft_state old_state = raft.state; + raft_sm_schedule_new_term(new_term); + if (raft.state != old_state) + raft_broadcast_new_state(); + box_update_ro_summary(); +} + +static void +raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer, + int events) +{ + assert(timer == &raft.timer); + (void)events; + ev_timer_stop(loop, timer); + raft_sm_schedule_new_election(); +} + +static void +raft_sm_wait_leader_dead(void) +{ + assert(!ev_is_active(&raft.timer)); + assert(!ev_is_active(&raft.io)); + assert(!raft.is_write_in_progress); + assert(raft.is_candidate); + assert(raft.state == RAFT_STATE_FOLLOWER); + assert(raft.leader != 0); + double death_timeout = replication_disconnect_timeout(); + ev_timer_set(&raft.timer, death_timeout, death_timeout); +} +static void +raft_sm_wait_election_end(void) +{ + assert(!ev_is_active(&raft.timer)); + assert(!ev_is_active(&raft.io)); + assert(!raft.is_write_in_progress); + assert(raft.is_candidate); + assert(raft.state == RAFT_STATE_FOLLOWER || + (raft.state == RAFT_STATE_CANDIDATE && + raft.volatile_vote == instance_id)); + assert(raft.leader == 0); + double election_timeout = raft.election_timeout + + raft_new_random_election_shift(); + ev_timer_set(&raft.timer, election_timeout, election_timeout); +} + +static void +raft_sm_start(void) +{ + assert(!ev_is_active(&raft.timer)); + assert(!ev_is_active(&raft.io)); + assert(!raft.is_write_in_progress); + assert(!raft.is_enabled); + assert(raft.state == RAFT_STATE_FOLLOWER); + raft.is_enabled = true; + raft.is_candidate = raft.is_cfg_candidate; + if (!raft.is_candidate) + /* Nop. 
*/; + else if (raft.leader != 0) + raft_sm_wait_leader_dead(); + else + raft_sm_schedule_new_election(); + box_update_ro_summary(); + /* + * When Raft is enabled, send the complete state. Because + * it wasn't sent in disabled state. + */ struct raft_request req; - memset(&req, 0, sizeof(req)); - req.term = raft.term; - raft_write_request(&req); + raft_serialize(&req, NULL); + raft_broadcast(&req); +} + +static void +raft_sm_stop(void) +{ + assert(raft.is_enabled); + raft.is_enabled = false; + raft.is_candidate = false; + box_update_ro_summary(); } void raft_cfg_is_enabled(bool is_enabled) { - raft.is_enabled = is_enabled; + if (is_enabled == raft.is_enabled) + return; + + if (!is_enabled) + raft_sm_stop(); + else + raft_sm_start(); } void raft_cfg_is_candidate(bool is_candidate) { - raft.is_candidate = is_candidate; + bool old_is_candidate = raft.is_candidate; + raft.is_cfg_candidate = is_candidate; + raft.is_candidate = is_candidate && raft.is_enabled; + if (raft.is_candidate == old_is_candidate) + return; + + if (raft.is_candidate) { + assert(raft.state == RAFT_STATE_FOLLOWER); + /* + * If there is an on-going WAL write, it means + * there was some node who sent newer data to this + * node. 
+ */ + if (raft.leader == 0 && raft_is_fully_on_disk()) + raft_sm_schedule_new_election(); + } else if (raft.state != RAFT_STATE_FOLLOWER) { + raft.state = RAFT_STATE_FOLLOWER; + raft_broadcast_new_state(); + } + box_update_ro_summary(); } void raft_cfg_election_timeout(double timeout) { + if (timeout == raft.election_timeout) + return; + raft.election_timeout = timeout; + if (raft.vote != 0 && raft.leader == 0) { + assert(ev_is_active(&raft.timer)); + double timeout = ev_timer_remaining(loop(), &raft.timer) - + raft.timer.at + raft.election_timeout; + ev_timer_stop(loop(), &raft.timer); + ev_timer_set(&raft.timer, timeout, timeout); + } } void raft_cfg_election_quorum(void) { + if (raft.state != RAFT_STATE_CANDIDATE || + raft.state == RAFT_STATE_LEADER) + return; + if (raft.vote_count < replication_synchro_quorum) + return; + /* + * The node is a candidate. It means its state is fully synced with + * disk. Otherwise it would be a follower. + */ + assert(!raft.is_write_in_progress); + raft.state = RAFT_STATE_LEADER; + raft.leader = instance_id; + raft_broadcast_new_state(); + box_update_ro_summary(); } void raft_cfg_death_timeout(void) { -} - -void -raft_vote(uint32_t vote_for) -{ - raft.vote = vote_for; - - struct raft_request req; - memset(&req, 0, sizeof(req)); - req.vote = vote_for; - raft_write_request(&req); + if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate && + raft.leader != 0) { + assert(ev_is_active(&raft.timer)); + double death_timeout = replication_disconnect_timeout(); + double timeout = ev_timer_remaining(loop(), &raft.timer) - + raft.timer.at + death_timeout; + ev_timer_stop(loop(), &raft.timer); + ev_timer_set(&raft.timer, timeout, timeout); + } } void @@ -220,3 +766,22 @@ raft_broadcast(const struct raft_request *req) } } } + +void +raft_bootstrap_leader(void) +{ + assert(raft.is_enabled); + assert(raft.volatile_term == 0); + assert(raft.volatile_vote == 0); + assert(raft.state == RAFT_STATE_FOLLOWER); + raft.state = RAFT_STATE_LEADER; +
raft_broadcast_new_state(); + box_update_ro_summary(); +} + +void +raft_init(void) +{ + ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0); + ev_check_init(&raft.io, raft_sm_dump_step); +} diff --git a/src/box/raft.h b/src/box/raft.h index d875707de..57584bc1b 100644 --- a/src/box/raft.h +++ b/src/box/raft.h @@ -31,6 +31,7 @@ */ #include #include +#include "tarantool_ev.h" #if defined(__cplusplus) extern "C" { @@ -48,12 +49,62 @@ enum raft_state { extern const char *raft_state_strs[]; struct raft { + /** Instance ID of leader of the current term. */ uint32_t leader; + /** State of the instance. */ enum raft_state state; + /** + * Volatile part of the Raft state, whose WAL write may be + * still in-progress, and yet the state may be already + * used. Volatile state is never sent anywhere, but the + * state machine makes decisions based on it. That is + * vital. + * As an example, volatile vote needs to be used to reject + * votes inside a term, where the instance already voted + * (even if the vote WAL write is not finished yet). + * Otherwise the instance would try to write several votes + * inside one term. + */ + uint64_t volatile_term; + uint32_t volatile_vote; + /** + * Flag whether Raft is enabled. When disabled, it still + * persists terms so as to quickly enroll into the cluster + * when (if) it is enabled. In everything else disabled + * Raft does not affect instance work. + */ bool is_enabled; + /** + * Flag whether the node can become a leader. It is an + * accumulated value of configuration options Raft enabled + * and Raft candidate. If at least one is false - the instance + * is not a candidate. + */ bool is_candidate; + /** Flag whether the instance is allowed to be a leader. */ + bool is_cfg_candidate; + /** + * Flag whether Raft currently tries to write something into WAL. It + * happens asynchronously, not right after Raft state is updated. + */ + bool is_write_in_progress; + /** + * Persisted Raft state.
These values are used when need to tell current + * Raft state to other nodes. + */ uint64_t term; uint32_t vote; + /** Bit 1 means that a vote from that instance was obtained. */ + uint32_t vote_mask; + /** Number of votes for this instance. Valid only in candidate state. */ + int vote_count; + /** State machine timed event trigger. */ + struct ev_timer timer; + /** + * Dump of Raft state in the end of event loop, when it is changed. + */ + struct ev_check io; + /** Configured election timeout in seconds. */ double election_timeout; }; @@ -65,6 +116,18 @@ raft_new_term(uint64_t min_new_term); void raft_vote(uint32_t vote_for); +static inline bool +raft_is_ro(void) +{ + return raft.is_enabled && raft.state != RAFT_STATE_LEADER; +} + +static inline bool +raft_is_source_allowed(uint32_t source_id) +{ + return !raft.is_enabled || raft.leader == source_id; +} + static inline bool raft_is_enabled(void) { @@ -83,6 +146,9 @@ raft_process_recovery(const struct raft_request *req); void raft_process_msg(const struct raft_request *req, uint32_t source); +void +raft_process_heartbeat(uint32_t source); + /** * Broadcast the changes in this instance's raft status to all * the followers. @@ -113,6 +179,29 @@ raft_serialize(struct raft_request *req, struct vclock *vclock); void raft_broadcast(const struct raft_request *req); +/** + * Bootstrap the current instance as the first leader of the cluster. That is + * done bypassing the Raft election protocol, by just assigning this node a + * leader role. That is needed, because when the cluster is not bootstrapped, it + * is necessary to find a node, which will generate a replicaset UUID, write it + * into _cluster space, and register all the other nodes in _cluster. + * Until it is done, all nodes but one won't boot. Their WALs won't work. And + * therefore they won't be able to participate in leader election. 
That + * effectively makes the cluster dead from the beginning unless the first + * bootstrapped node declares itself a leader without elections. + * + * XXX: That does not solve the problem, when the first node boots, creates a + * snapshot, and then immediately dies. After recovery it won't declare itself a + * leader. Therefore if quorum > 1, the cluster won't proceed to registering + * any replicas and becomes completely dead. Perhaps that must be solved by + * truncating quorum down to number of records in _cluster. + */ +void +raft_bootstrap_leader(void); + +void +raft_init(void); + #if defined(__cplusplus) } #endif diff --git a/src/box/relay.cc b/src/box/relay.cc index 74581db9c..4f9bbc0de 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) relay_send(relay, row); } +/** + * Recreate recovery cursor from the last confirmed point. That is + * used by Raft, when the node becomes a leader. It may happen, + * that it already sent some data to other nodes as a follower, + * and they ignored the data. Now when the node is a leader, it + * should send the not confirmed data again. Otherwise the cluster + * will get stuck, or worse - the newer data would be sent without the + * older sent but ignored data. + */ +static void +relay_restart_recovery(struct relay *relay) +{ + recovery_delete(relay->r); + relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock); + recover_remaining_wals(relay->r, &relay->stream, NULL, true); +} + struct relay_raft_msg { struct cmsg base; struct cmsg_hop route; @@ -786,6 +803,8 @@ relay_raft_msg_push(struct cmsg *base) struct xrow_header row; xrow_encode_raft(&row, &fiber()->gc, &msg->req); try { + if (msg->req.state == RAFT_STATE_LEADER) + relay_restart_recovery(msg->relay); relay_send(msg->relay, &row); } catch (Exception *e) { relay_set_error(msg->relay, e); -- 2.21.1 (Apple Git-122.3)