[Tarantool-patches] [PATCH 8/8] raft: state machine
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Sep 3 02:33:18 MSK 2020
First look at Raft state machine implementation.
The commit is a draft. It does not contain any tests, and
unlikely passes the existing tests. But gives a picture of where
Raft is going.
---
src/box/applier.cc | 18 +-
src/box/box.cc | 5 +-
src/box/lua/misc.cc | 25 +-
src/box/raft.c | 645 +++++++++++++++++++++++++++++++++++++++++---
src/box/raft.h | 89 ++++++
src/box/relay.cc | 19 ++
6 files changed, 746 insertions(+), 55 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 8de2f799b..7486d9929 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -916,8 +916,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
* Return 0 for success or -1 in case of an error.
*/
static int
-applier_apply_tx(struct stailq *rows)
+applier_apply_tx(struct applier *applier, struct stailq *rows)
{
+ /*
+ * Rows received not directly from a leader are ignored. That is a
+ * protection against the case when an old leader keeps sending data
+ * around not knowing yet that it is not a leader anymore.
+ *
+ * XXX: it may be that this can be fine to apply leader transactions by
+ * looking at their replica_id field if it is equal to leader id. That
+ * can be investigated as an 'optimization'. Even though may not give
+ * anything, because won't change total number of rows sent in the
+ * network anyway.
+ */
+ if (!raft_is_source_allowed(applier->instance_id))
+ return 0;
struct xrow_header *first_row = &stailq_first_entry(rows,
struct applier_tx_row, next)->row;
struct xrow_header *last_row;
@@ -1257,6 +1270,7 @@ applier_subscribe(struct applier *applier)
struct xrow_header *first_row =
&stailq_first_entry(&rows, struct applier_tx_row,
next)->row;
+ raft_process_heartbeat(applier->instance_id);
if (first_row->lsn == 0) {
if (unlikely(iproto_type_is_raft_request(
first_row->type))) {
@@ -1266,7 +1280,7 @@ applier_subscribe(struct applier *applier)
} else {
applier_signal_ack(applier);
}
- } else if (applier_apply_tx(&rows) != 0) {
+ } else if (applier_apply_tx(applier, &rows) != 0) {
diag_raise();
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 427b771b3..2e9c90310 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -157,7 +157,7 @@ void
box_update_ro_summary(void)
{
bool old_is_ro_summary = is_ro_summary;
- is_ro_summary = is_ro || is_orphan;
+ is_ro_summary = is_ro || is_orphan || raft_is_ro();
/* In 99% nothing changes. Filter this out first. */
if (is_ro_summary == old_is_ro_summary)
return;
@@ -2646,6 +2646,7 @@ box_init(void)
txn_limbo_init();
sequence_init();
+ raft_init();
}
bool
@@ -2794,6 +2795,8 @@ box_cfg_xc(void)
if (!is_bootstrap_leader)
replicaset_sync();
+ else if (raft_is_enabled())
+ raft_bootstrap_leader();
/* box.cfg.read_only is not read yet. */
assert(box_is_ro());
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 98e98abe2..efbbcfd1f 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -256,24 +256,26 @@ lbox_raft_new_term(struct lua_State *L)
return 0;
}
-static int
-lbox_raft_vote(struct lua_State *L)
-{
- uint64_t vote_for = luaL_checkuint64(L, 1);
- if (vote_for > UINT32_MAX)
- return luaL_error(L, "Invalid vote");
- raft_vote(vote_for);
- return 0;
-}
-
static int
lbox_raft_get(struct lua_State *L)
{
- lua_createtable(L, 0, 2);
+ lua_createtable(L, 0, 8);
luaL_pushuint64(L, raft.term);
lua_setfield(L, -2, "term");
+ luaL_pushuint64(L, raft.volatile_term);
+ lua_setfield(L, -2, "volatile_term");
luaL_pushuint64(L, raft.vote);
lua_setfield(L, -2, "vote");
+ luaL_pushuint64(L, raft.volatile_vote);
+ lua_setfield(L, -2, "volatile_vote");
+ lua_pushstring(L, raft_state_strs[raft.state]);
+ lua_setfield(L, -2, "state");
+ lua_pushinteger(L, raft.vote_count);
+ lua_setfield(L, -2, "vote_count");
+ lua_pushboolean(L, raft.is_write_in_progress);
+ lua_setfield(L, -2, "is_write_in_progress");
+ lua_pushboolean(L, raft.is_candidate);
+ lua_setfield(L, -2, "is_candidate");
return 1;
}
@@ -285,7 +287,6 @@ box_lua_misc_init(struct lua_State *L)
{"new_tuple_format", lbox_tuple_format_new},
/* Temporary helpers to sanity test raft persistency. */
{"raft_new_term", lbox_raft_new_term},
- {"raft_vote", lbox_raft_vote},
{"raft_get", lbox_raft_get},
{NULL, NULL}
};
diff --git a/src/box/raft.c b/src/box/raft.c
index 1acffb677..6f2891291 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -36,6 +36,12 @@
#include "small/region.h"
#include "replication.h"
#include "relay.h"
+#include "box.h"
+
+/**
+ * Maximal random deviation of the election timeout. From the configured value.
+ */
+#define RAFT_RANDOM_ELECTION_FACTOR 0.1
const char *raft_state_strs[] = {
NULL,
@@ -48,19 +54,163 @@ const char *raft_state_strs[] = {
struct raft raft = {
.leader = 0,
.state = RAFT_STATE_FOLLOWER,
+ .volatile_term = 1,
+ .volatile_vote = 0,
.is_enabled = false,
.is_candidate = false,
+ .is_cfg_candidate = false,
+ .is_write_in_progress = false,
.term = 1,
.vote = 0,
+ .vote_mask = 0,
+ .vote_count = 0,
+ .election_timeout = 5,
};
+/**
+ * Check if Raft is completely synced with disk. Meaning all its critical values
+ * are in WAL. Only in that state the node can become a leader or a candidate.
+ * If the node has a not flushed data, it means either the term was bumped, or
+ * a new vote was made.
+ *
+ * In case of term bump it means either there is another node with a newer term,
+ * and this one should be a follower; or this node bumped the term itself along
+ * with making a vote to start a new election - then it is also a follower which
+ * will turn into a candidate when the flush is done.
+ *
+ * In case of a new not flushed vote it means either this node voted for some
+ * other node, and must be a follower; or it voted or self, and also must be a
+ * follower, but will become a candidate when the flush is done.
+ *
+ * In total - when something is not synced with disk, the instance is a follower
+ * in any case.
+ */
+static bool
+raft_is_fully_on_disk(void)
+{
+ return raft.volatile_term == raft.term &&
+ raft.volatile_vote == raft.vote;
+}
+
+/**
+ * Raft protocol says that election timeout should be a bit randomized so as
+ * the nodes wouldn't start election at the same time and end up with not having
+ * a quorum for anybody. This implementation randomizes the election timeout by
+ * adding {election timeout * random factor} value, where the factor is a
+ * constant floating point value > 0.
+ */
+static inline double
+raft_new_random_election_shift(void)
+{
+ double timeout = raft.election_timeout;
+ /* Translate to ms. */
+ uint32_t rand_part =
+ (uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
+ if (rand_part == 0)
+ rand_part = 1;
+ /*
+ * XXX: this is not giving a good distribution, but it is not so trivial
+ * to implement a correct random value generator. There is a task to
+ * unify all such places. Not critical here.
+ */
+ rand_part = rand() % (rand_part + 1);
+ return rand_part / 1000.0;
+}
+
+/**
+ * Raft says that during election a node1 can vote for node2, if node2 has a
+ * bigger term, or has the same term but longer log. In case of Tarantool it
+ * means the node2 vclock should be >= node1 vclock, in all components. It is
+ * not enough to compare only one component. At least because there may be not
+ * a previous leader when the election happens first time. Or a node could
+ * restart and forget who the previous leader was.
+ */
+static inline bool
+raft_can_vote_for(const struct vclock *v)
+{
+ if (v == NULL)
+ return false;
+ int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
+ return cmp == 0 || cmp == 1;
+}
+
+/** Broadcast an event about this node changed its state to all relays. */
+static inline void
+raft_broadcast_new_state(void)
+{
+ struct raft_request req;
+ memset(&req, 0, sizeof(req));
+ req.term = raft.term;
+ req.state = raft.state;
+ raft_broadcast(&req);
+}
+
+/** Raft state machine methods. 'sm' stands for State Machine. */
+
+/**
+ * Start the state machine. When it is stopped, Raft state is updated and
+ * goes to WAL when necessary, but it does not affect the instance operation.
+ * For example, when Raft is stopped, the instance role does not affect whether
+ * it is writable.
+ */
+static void
+raft_sm_start(void);
+
+/**
+ * Stop the state machine.
+ * - Raft stops affecting the instance operation;
+ * - this node can't become a leader anymore;
+ * - this node can't vote.
+ */
+static void
+raft_sm_stop(void);
+
+/**
+ * When the instance is a follower but is allowed to be a leader, it will wait
+ * for death of the current leader to start new election.
+ */
+static void
+raft_sm_wait_leader_dead(void);
+
+/**
+ * If election is started by this node, or it voted for some other node started
+ * the election, and it can be a leader itself, it will wait until the current
+ * election times out. When it happens, the node will start new election.
+ */
+static void
+raft_sm_wait_election_end(void);
+
+/** Bump volatile term and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term);
+
+/** Bump volatile vote and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote);
+
+/** Bump volatile term, vote for self, and schedule their flush to disk. */
+static void
+raft_sm_schedule_new_election(void);
+
+/**
+ * The main trigger of Raft state machine - start new election when the current
+ * leader dies, or when there is no a leader and the previous election failed.
+ */
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+ int events);
+
void
raft_process_recovery(const struct raft_request *req)
{
- if (req->term != 0)
+ if (req->term != 0) {
raft.term = req->term;
- if (req->vote != 0)
+ raft.volatile_term = req->term;
+ }
+ if (req->vote != 0) {
raft.vote = req->vote;
+ raft.volatile_vote = req->vote;
+ }
/*
* Role is never persisted. If recovery is happening, the
* node was restarted, and the former role can be false
@@ -80,34 +230,136 @@ raft_process_recovery(const struct raft_request *req)
void
raft_process_msg(const struct raft_request *req, uint32_t source)
{
- (void)source;
- if (req->term > raft.term) {
- // Update term.
- // The logic will be similar, but the code
- // below is for testing purposes.
- raft.term = req->term;
- }
- if (req->vote > 0) {
- // Check whether the vote's for us.
+ assert(source > 0);
+ assert(source != instance_id);
+ /* Outdated request. */
+ if (req->term < raft.volatile_term)
+ return;
+
+ enum raft_state old_state = raft.state;
+
+ /* Term bump. */
+ if (req->term > raft.volatile_term)
+ raft_sm_schedule_new_term(req->term);
+
+ /* Vote request during the on-going election. */
+ if (req->vote != 0) {
+ switch (raft.state) {
+ case RAFT_STATE_FOLLOWER:
+ case RAFT_STATE_LEADER:
+ /*
+ * Can't respond on vote requests when Raft is disabled.
+ */
+ if (!raft.is_enabled)
+ break;
+ /* Check if already voted in this term. */
+ if (raft.volatile_vote != 0)
+ break;
+ /* Not a candidate. Can't accept votes. */
+ if (req->vote == instance_id)
+ break;
+ /* Can't vote for too old or incomparable nodes. */
+ if (!raft_can_vote_for(req->vclock))
+ break;
+ /*
+ * Either the term is new, or didn't vote in the current
+ * term yet. Anyway can vote now.
+ */
+ raft.state = RAFT_STATE_FOLLOWER;
+ raft_sm_schedule_new_vote(req->vote);
+ break;
+ case RAFT_STATE_CANDIDATE:
+ /* Check if this is a vote for a competing candidate. */
+ if (req->vote != instance_id)
+ break;
+ /*
+ * Vote for self was requested earlier in this round,
+ * and now was answered by some other instance.
+ */
+ assert(raft.volatile_vote == instance_id);
+ bool was_set = bit_set(&raft.vote_mask, source);
+ raft.vote_count += !was_set;
+ if (raft.vote_count < replication_synchro_quorum)
+ break;
+ raft.state = RAFT_STATE_LEADER;
+ raft.leader = instance_id;
+ break;
+ default:
+ unreachable();
+ }
}
- switch (req->state) {
- case RAFT_STATE_FOLLOWER:
- break;
- case RAFT_STATE_CANDIDATE:
- // Perform voting logic.
- break;
- case RAFT_STATE_LEADER:
- // Switch to a new leader.
- break;
- default:
- break;
+ /*
+ * If the node does not claim to be a leader, nothing interesting. Terms
+ * and votes are already handled.
+ */
+ if (req->state != RAFT_STATE_LEADER)
+ goto end;
+ /* The node is a leader, but it is already known. */
+ if (source == raft.leader)
+ goto end;
+ /*
+ * XXX: A message from a conflicting leader. Split brain, basically.
+ * Need to decide what to do. Current solution is to do nothing. In
+ * future either this node should try to become a leader, or should stop
+ * all writes and require manual intervention.
+ */
+ if (raft.leader != 0)
+ goto end;
+
+ /* New leader was elected. */
+ raft.state = RAFT_STATE_FOLLOWER;
+ raft.leader = source;
+end:
+ if (raft.state != old_state) {
+ /*
+ * If the node stopped being a leader - should become read-only.
+ * If became a leader - should become read-write (if other
+ * subsystems also allow read-write).
+ */
+ box_update_ro_summary();
+ /*
+ * New term and vote are not broadcasted yet. Firstly their WAL
+ * write should be finished. But the state is volatile. It is ok
+ * to broadcast it now.
+ */
+ raft_broadcast_new_state();
}
}
+void
+raft_process_heartbeat(uint32_t source)
+{
+ /*
+ * When not a candidate - don't wait for anything. Therefore does not
+ * care about the leader being dead.
+ */
+ if (!raft.is_candidate)
+ return;
+ /* Don't care about heartbeats when this node is a leader itself. */
+ if (raft.state == RAFT_STATE_LEADER)
+ return;
+ /* Not interested in heartbeats from not a leader. */
+ if (raft.leader != source)
+ return;
+ /*
+ * XXX: it may be expensive to reset the timer like that. It may be less
+ * expensive to let the timer work, and remember last timestamp when
+ * anything was heard from the leader. Then in the timer callback check
+ * the timestamp, and restart the timer, if it is fine.
+ */
+ assert(ev_is_active(&raft.timer));
+ ev_timer_stop(loop(), &raft.timer);
+ raft_sm_wait_leader_dead();
+}
+
void
raft_serialize(struct raft_request *req, struct vclock *vclock)
{
memset(req, 0, sizeof(*req));
+ /*
+ * Volatile state is never used for any communications.
+ * Use only persisted state.
+ */
req->term = raft.term;
req->vote = raft.vote;
req->state = raft.state;
@@ -117,15 +369,23 @@ raft_serialize(struct raft_request *req, struct vclock *vclock)
req->vclock = vclock;
}
+/** Wakeup Raft state writer fiber waiting for WAL write end. */
static void
raft_write_cb(struct journal_entry *entry)
{
fiber_wakeup(entry->complete_data);
}
+/** Synchronously write a Raft request into WAL. */
static void
raft_write_request(const struct raft_request *req)
{
+ assert(raft.is_write_in_progress);
+ /*
+ * Vclock is never persisted by Raft. It is used only to
+ * be sent to network when vote for self.
+ */
+ assert(req->vclock == NULL);
struct region *region = &fiber()->gc;
uint32_t svp = region_used(region);
struct xrow_header row;
@@ -157,57 +417,343 @@ fail:
panic("Could not write a raft request to WAL\n");
}
+/**
+ * Flush Raft state changes to WAL. The callback resets itself, if during the
+ * write more changes appear.
+ */
+static void
+raft_sm_dump_step(ev_loop *loop, ev_check *watcher, int events)
+{
+ assert(watcher == &raft.io);
+ (void) events;
+ assert(raft.is_write_in_progress);
+ /* During write Raft can't be anything but a follower. */
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ struct raft_request req;
+ uint64_t old_term = raft.term;
+ uint32_t old_vote = raft.vote;
+ enum raft_state old_state = raft.state;
+
+ if (raft_is_fully_on_disk()) {
+end_dump:
+ raft.is_write_in_progress = false;
+ ev_check_stop(loop, watcher);
+ /*
+ * The state machine is stable. Can see now, to what state to
+ * go.
+ */
+ if (!raft.is_candidate) {
+ /*
+ * If not a candidate, can't do anything except vote for
+ * somebody (if Raft is enabled). Nothing to do except
+ * staying a follower without timeouts.
+ */
+ } else if (raft.leader != 0) {
+ /* There is a known leader. Wait until it is dead. */
+ raft_sm_wait_leader_dead();
+ } else if (raft.vote == instance_id) {
+ /* Just wrote own vote. */
+ if (replication_synchro_quorum == 1) {
+ raft.state = RAFT_STATE_LEADER;
+ raft.leader = instance_id;
+ /*
+ * Make read-write (if other subsystems allow
+ * that).
+ */
+ box_update_ro_summary();
+ } else {
+ raft.state = RAFT_STATE_CANDIDATE;
+ raft.vote_count = 1;
+ raft.vote_mask = 0;
+ raft_sm_wait_election_end();
+ }
+ } else if (raft.vote != 0) {
+ /*
+ * Voted for some other node. Wait if it manages to
+ * become a leader.
+ */
+ raft_sm_wait_election_end();
+ } else {
+ /* No leaders, no votes. */
+ raft_sm_schedule_new_election();
+ }
+ } else {
+ memset(&req, 0, sizeof(req));
+ assert(raft.volatile_term >= raft.term);
+ /* Term is written always. */
+ req.term = raft.volatile_term;
+ if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
+ req.vote = raft.volatile_vote;
+
+ raft_write_request(&req);
+
+ assert(req.term >= raft.term);
+ if (req.term > raft.term) {
+ raft.term = req.term;
+ raft.vote = 0;
+ }
+ if (req.vote != 0) {
+ assert(raft.vote == 0);
+ raft.vote = req.vote;
+ }
+ if (raft_is_fully_on_disk())
+ goto end_dump;
+ }
+
+ memset(&req, 0, sizeof(req));
+ /* Term is encoded always. */
+ req.term = raft.term;
+ bool has_changes = old_term != raft.term;
+ if (raft.vote != 0 && old_vote != raft.vote) {
+ req.vote = raft.vote;
+ /*
+ * When vote for self, need to send current vclock too. Two
+ * reasons for that:
+ *
+ * - nodes need to vote for the instance containing the newest
+ * data. So as not to loose it, because some of it may be
+ * confirmed by the synchronous replication;
+ *
+ * - replication is basically stopped during election. Other
+ * nodes can't learn vclock of this instance through regular
+ * replication.
+ */
+ if (raft.vote == instance_id)
+ req.vclock = &replicaset.vclock;
+ has_changes = true;
+ }
+ if (raft.state != old_state) {
+ req.state = raft.state;
+ has_changes = true;
+ }
+ if (has_changes)
+ raft_broadcast(&req);
+}
+
+/** Start Raft state flush to disk. */
+static void
+raft_sm_pause_and_dump(void)
+{
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ if (raft.is_write_in_progress)
+ return;
+ ev_timer_stop(loop(), &raft.timer);
+ ev_check_start(loop(), &raft.io);
+ raft.is_write_in_progress = true;
+}
+
+/** Bump term, reset Raft state, and persist that fact. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term)
+{
+ assert(new_term > raft.volatile_term);
+ assert(raft.volatile_term >= raft.term);
+ raft.volatile_term = new_term;
+ /* New terms means completely new Raft state. */
+ raft.volatile_vote = 0;
+ raft.leader = 0;
+ raft.state = RAFT_STATE_FOLLOWER;
+ raft_sm_pause_and_dump();
+}
+
+/** Vote in the current term, and persist that fact. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote)
+{
+ assert(raft.volatile_vote == 0);
+ raft.volatile_vote = new_vote;
+ raft_sm_pause_and_dump();
+}
+
+/**
+ * Bump term and vote for self immediately. After that is persisted, the
+ * election timeout will be activated. Unless during that nothing newer happens.
+ */
+static void
+raft_sm_schedule_new_election(void)
+{
+ assert(raft_is_fully_on_disk());
+ assert(raft.is_candidate);
+ assert(raft.leader == 0);
+ /* Everyone is a follower until its vote for self is persisted. */
+ raft_sm_schedule_new_term(raft.term + 1);
+ raft_sm_schedule_new_vote(instance_id);
+ box_update_ro_summary();
+}
+
void
raft_new_term(uint64_t min_new_term)
{
+ uint64_t new_term;
if (raft.term < min_new_term)
- raft.term = min_new_term + 1;
+ new_term = min_new_term + 1;
else
- ++raft.term;
+ new_term = raft.term + 1;
+ enum raft_state old_state = raft.state;
+ raft_sm_schedule_new_term(new_term);
+ if (raft.state != old_state)
+ raft_broadcast_new_state();
+ box_update_ro_summary();
+}
+
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+ int events)
+{
+ assert(timer == &raft.timer);
+ (void)events;
+ ev_timer_stop(loop, timer);
+ raft_sm_schedule_new_election();
+}
+
+static void
+raft_sm_wait_leader_dead(void)
+{
+ assert(!ev_is_active(&raft.timer));
+ assert(!ev_is_active(&raft.io));
+ assert(!raft.is_write_in_progress);
+ assert(raft.is_candidate);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ assert(raft.leader != 0);
+ double death_timeout = replication_disconnect_timeout();
+ ev_timer_set(&raft.timer, death_timeout, death_timeout);
+}
+static void
+raft_sm_wait_election_end(void)
+{
+ assert(!ev_is_active(&raft.timer));
+ assert(!ev_is_active(&raft.io));
+ assert(!raft.is_write_in_progress);
+ assert(raft.is_candidate);
+ assert(raft.state == RAFT_STATE_FOLLOWER ||
+ (raft.state == RAFT_STATE_CANDIDATE &&
+ raft.volatile_vote == instance_id));
+ assert(raft.leader == 0);
+ double election_timeout = raft.election_timeout +
+ raft_new_random_election_shift();
+ ev_timer_set(&raft.timer, election_timeout, election_timeout);
+}
+
+static void
+raft_sm_start(void)
+{
+ assert(!ev_is_active(&raft.timer));
+ assert(!ev_is_active(&raft.io));
+ assert(!raft.is_write_in_progress);
+ assert(!raft.is_enabled);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ raft.is_enabled = true;
+ raft.is_candidate = raft.is_cfg_candidate;
+ if (!raft.is_candidate)
+ /* Nop. */;
+ else if (raft.leader != 0)
+ raft_sm_wait_leader_dead();
+ else
+ raft_sm_schedule_new_election();
+ box_update_ro_summary();
+ /*
+ * When Raft is enabled, send the complete state. Because
+ * it wasn't sent in disabled state.
+ */
struct raft_request req;
- memset(&req, 0, sizeof(req));
- req.term = raft.term;
- raft_write_request(&req);
+ raft_serialize(&req, NULL);
+ raft_broadcast(&req);
+}
+
+static void
+raft_sm_stop(void)
+{
+ assert(raft.is_enabled);
+ raft.is_enabled = false;
+ raft.is_candidate = false;
+ box_update_ro_summary();
}
void
raft_cfg_is_enabled(bool is_enabled)
{
- raft.is_enabled = is_enabled;
+ if (is_enabled == raft.is_enabled)
+ return;
+
+ if (!is_enabled)
+ raft_sm_stop();
+ else
+ raft_sm_start();
}
void
raft_cfg_is_candidate(bool is_candidate)
{
- raft.is_candidate = is_candidate;
+ bool old_is_candidate = raft.is_candidate;
+ raft.is_cfg_candidate = is_candidate;
+ raft.is_candidate = is_candidate && raft.is_enabled;
+ if (raft.is_candidate == old_is_candidate)
+ return;
+
+ if (raft.is_candidate) {
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ /*
+ * If there is an on-going WAL write, it means
+ * there was some node who sent newer data to this
+ * node.
+ */
+ if (raft.leader == 0 && raft_is_fully_on_disk())
+ raft_sm_schedule_new_election();
+ } else if (raft.state != RAFT_STATE_FOLLOWER) {
+ raft.state = RAFT_STATE_FOLLOWER;
+ raft_broadcast_new_state();
+ }
+ box_update_ro_summary();
}
void
raft_cfg_election_timeout(double timeout)
{
+ if (timeout == raft.election_timeout)
+ return;
+
raft.election_timeout = timeout;
+ if (raft.vote != 0 && raft.leader == 0) {
+ assert(ev_is_active(&raft.timer));
+ double timeout = ev_timer_remaining(loop(), &raft.timer) -
+ raft.timer.at + raft.election_timeout;
+ ev_timer_stop(loop(), &raft.timer);
+ ev_timer_set(&raft.timer, timeout, timeout);
+ }
}
void
raft_cfg_election_quorum(void)
{
+ if (raft.state != RAFT_STATE_CANDIDATE ||
+ raft.state == RAFT_STATE_LEADER)
+ return;
+ if (raft.vote_count < replication_synchro_quorum)
+ return;
+ /*
+ * The node is a candidate. It means its state if fully synced with
+ * disk. Otherwise it would be a follower.
+ */
+ assert(!raft.is_write_in_progress);
+ raft.state = RAFT_STATE_LEADER;
+ raft.leader = instance_id;
+ raft_broadcast_new_state();
+ box_update_ro_summary();
}
void
raft_cfg_death_timeout(void)
{
-}
-
-void
-raft_vote(uint32_t vote_for)
-{
- raft.vote = vote_for;
-
- struct raft_request req;
- memset(&req, 0, sizeof(req));
- req.vote = vote_for;
- raft_write_request(&req);
+ if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
+ raft.leader != 0) {
+ assert(ev_is_active(&raft.timer));
+ double death_timeout = replication_disconnect_timeout();
+ double timeout = ev_timer_remaining(loop(), &raft.timer) -
+ raft.timer.at + death_timeout;
+ ev_timer_stop(loop(), &raft.timer);
+ ev_timer_set(&raft.timer, timeout, timeout);
+ }
}
void
@@ -220,3 +766,22 @@ raft_broadcast(const struct raft_request *req)
}
}
}
+
+void
+raft_bootstrap_leader(void)
+{
+ assert(raft.is_enabled);
+ assert(raft.volatile_term == 0);
+ assert(raft.volatile_vote == 0);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ raft.state = RAFT_STATE_LEADER;
+ raft_broadcast_new_state();
+ box_update_ro_summary();
+}
+
+void
+raft_init(void)
+{
+ ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
+ ev_check_init(&raft.io, raft_sm_dump_step);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index d875707de..57584bc1b 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -31,6 +31,7 @@
*/
#include <stdint.h>
#include <stdbool.h>
+#include "tarantool_ev.h"
#if defined(__cplusplus)
extern "C" {
@@ -48,12 +49,62 @@ enum raft_state {
extern const char *raft_state_strs[];
struct raft {
+ /** Instance ID of leader of the current term. */
uint32_t leader;
+ /** State of the instance. */
enum raft_state state;
+ /**
+ * Volatile part of the Raft state, whose WAL write may be
+ * still in-progress, and yet the state may be already
+ * used. Volatile state is never sent to anywhere, but the
+ * state machine makes decisions based on it. That is
+ * vital.
+ * As an example, volatile vote needs to be used to reject
+ * votes inside a term, where the instance already voted
+ * (even if the vote WAL write is not finished yet).
+ * Otherwise the instance would try to write several votes
+ * inside one term.
+ */
+ uint64_t volatile_term;
+ uint32_t volatile_vote;
+ /**
+ * Flag whether Raft is enabled. When disabled, it still
+ * persists terms so as to quickly enroll into the cluster
+ * when (if) it is enabled. In everything else disabled
+ * Raft does not affect instance work.
+ */
bool is_enabled;
+ /**
+ * Flag whether the node can become a leader. It is an
+ * accumulated value of configuration options Raft enabled
+ * Raft candidate. If at least one is false - the instance
+ * is not a candidate.
+ */
bool is_candidate;
+ /** Flag whether the instance is allowed to be a leader. */
+ bool is_cfg_candidate;
+ /**
+ * Flag whether Raft currently tries to write something into WAL. It
+ * happens asynchronously, not right after Raft state is updated.
+ */
+ bool is_write_in_progress;
+ /**
+ * Persisted Raft state. These values are used when need to tell current
+ * Raft state to other nodes.
+ */
uint64_t term;
uint32_t vote;
+ /** Bit 1 means that a vote from that instance was obtained. */
+ uint32_t vote_mask;
+ /** Number of votes for this instance. Valid only in candidate state. */
+ int vote_count;
+ /** State machine timed event trigger. */
+ struct ev_timer timer;
+ /**
+ * Dump of Raft state in the end of event loop, when it is changed.
+ */
+ struct ev_check io;
+ /** Configured election timeout in seconds. */
double election_timeout;
};
@@ -65,6 +116,18 @@ raft_new_term(uint64_t min_new_term);
void
raft_vote(uint32_t vote_for);
+static inline bool
+raft_is_ro(void)
+{
+ return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
+}
+
+static inline bool
+raft_is_source_allowed(uint32_t source_id)
+{
+ return !raft.is_enabled || raft.leader == source_id;
+}
+
static inline bool
raft_is_enabled(void)
{
@@ -83,6 +146,9 @@ raft_process_recovery(const struct raft_request *req);
void
raft_process_msg(const struct raft_request *req, uint32_t source);
+void
+raft_process_heartbeat(uint32_t source);
+
/**
* Broadcast the changes in this instance's raft status to all
* the followers.
@@ -113,6 +179,29 @@ raft_serialize(struct raft_request *req, struct vclock *vclock);
void
raft_broadcast(const struct raft_request *req);
+/**
+ * Bootstrap the current instance as the first leader of the cluster. That is
+ * done bypassing the Raft election protocol, by just assigning this node a
+ * leader role. That is needed, because when the cluster is not bootstrapped, it
+ * is necessary to find a node, which will generate a replicaset UUID, write it
+ * into _cluster space, and register all the other nodes in _cluster.
+ * Until it is done, all nodes but one won't boot. Their WALs won't work. And
+ * therefore they won't be able to participate in leader election. That
+ * effectively makes the cluster dead from the beginning unless the first
+ * bootstrapped node won't declare itself a leader without elections.
+ *
+ * XXX: That does not solve the problem, when the first node boots, creates a
+ * snapshot, and then immediately dies. After recovery it won't declare itself a
+ * leader. Therefore if quorum > 1, the cluster won't proceed to registering
+ * any replicas and becomes completely dead. Perhaps that must be solved by
+ * truncating quorum down to number of records in _cluster.
+ */
+void
+raft_bootstrap_leader(void);
+
+void
+raft_init(void);
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 74581db9c..4f9bbc0de 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
relay_send(relay, row);
}
+/**
+ * Recreate recovery cursor from the last confirmed point. That is
+ * used by Raft, when the node becomes a leader. It may happen,
+ * that it already sent some data to other nodes as a follower,
+ * and they ignored the data. Now when the node is a leader, it
+ * should send the not confirmed data again. Otherwise the cluster
+ * will stuck, or worse - the newer data would be sent without the
+ * older sent but ignored data.
+ */
+static void
+relay_restart_recovery(struct relay *relay)
+{
+ recovery_delete(relay->r);
+ relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
+ recover_remaining_wals(relay->r, &relay->stream, NULL, true);
+}
+
struct relay_raft_msg {
struct cmsg base;
struct cmsg_hop route;
@@ -786,6 +803,8 @@ relay_raft_msg_push(struct cmsg *base)
struct xrow_header row;
xrow_encode_raft(&row, &fiber()->gc, &msg->req);
try {
+ if (msg->req.state == RAFT_STATE_LEADER)
+ relay_restart_recovery(msg->relay);
relay_send(msg->relay, &row);
} catch (Exception *e) {
relay_set_error(msg->relay, e);
--
2.21.1 (Apple Git-122.3)
More information about the Tarantool-patches
mailing list