[Tarantool-patches] [PATCH v2 09/11] raft: introduce state machine
Serge Petrenko
sergepetrenko at tarantool.org
Mon Sep 21 11:22:15 MSK 2020
19.09.2020 18:49, Vladislav Shpilevoy пишет:
> Here is a new version of the patch after some squashes.
>
> ====================
> raft: introduce state machine
>
> The commit is a core part of Raft implementation. It introduces
> the Raft state machine implementation and its integration into the
> instance's life cycle.
>
> The implementation follows the protocol to the letter except a few
> important details.
>
> Firstly, the original Raft assumes that all nodes share the same
> log record numbers. In Tarantool they are called LSNs. But in case
> of Tarantool each node has its own LSN in its own component of
> vclock. That makes the election messages a bit heavier, because
> the nodes need to send and compare complete vclocks of each other
> instead of a single number like in the original Raft. But the logic
> becomes simpler, because in the original Raft there is a problem
> of uncertainty about what to do with records of an old leader
> right after a new leader is elected. They could be rolled back or
> confirmed depending on circumstances. The issue disappears when
> vclock is used.
>
> Secondly, leader election works differently during cluster
> bootstrap, until the number of bootstrapped replicas becomes >=
> the election quorum. That arises from the specifics of replica
> bootstrap and the order of subsystem initialization. In short:
> during bootstrap a leader election may use a smaller election
> quorum than the configured one. See more details in the code.
>
> Part of #1146
Consider these fixes for the applier vclock segfault, pushed on top of
the branch.
src/box/box.cc | 6 +-----
src/box/raft.c | 12 +++++++++---
2 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index c5dcbd959..1169f2cd5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2160,11 +2160,7 @@ box_process_subscribe(struct ev_io *io, struct
xrow_header *header)
* should be 0.
*/
struct raft_request req;
- /*
- * Omit the candidate vclock, since we've just sent it in
- * subscribe response.
- */
- raft_serialize_for_network(&req, NULL);
+ raft_serialize_for_network(&req, &vclock);
xrow_encode_raft(&row, &fiber()->gc, &req);
coio_write_xrow(io, &row);
}
diff --git a/src/box/raft.c b/src/box/raft.c
index 7b7ef9c1c..45783f0e3 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -130,8 +130,6 @@ raft_new_random_election_shift(void)
static inline bool
raft_can_vote_for(const struct vclock *v)
{
- if (v == NULL)
- return false;
int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
return cmp == 0 || cmp == 1;
}
@@ -369,6 +367,12 @@ raft_process_msg(const struct raft_request *req,
uint32_t source)
"candidate");
break;
}
+ /* Can't vote when vclock is unknown. */
+ if (req->vclock == NULL) {
+ say_info("RAFT: vote request is skipped - "
+ "missing candidate vclock.");
+ break;
+ }
/* Can't vote for too old or incomparable nodes. */
if (!raft_can_vote_for(req->vclock)) {
say_info("RAFT: vote request is skipped - "
@@ -858,8 +862,10 @@ raft_serialize_for_network(struct raft_request
*req, struct vclock *vclock)
req->state = raft.state;
/*
* Raft does not own vclock, so it always expects it passed
externally.
+ * Vclock is sent out only by candidate instances.
*/
- req->vclock = vclock;
+ if (req->state == RAFT_STATE_CANDIDATE)
+ req->vclock = vclock;
}
void
--
2.24.3 (Apple Git-128)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index fd7cf1c79..c352faf5e 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -900,8 +900,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
> * Return 0 for success or -1 in case of an error.
> */
> static int
> -applier_apply_tx(struct stailq *rows)
> +applier_apply_tx(struct applier *applier, struct stailq *rows)
> {
> + /*
> + * Rows received not directly from a leader are ignored. That is a
> + * protection against the case when an old leader keeps sending data
> + * around not knowing yet that it is not a leader anymore.
> + *
> + * XXX: it may be that this can be fine to apply leader transactions by
> + * looking at their replica_id field if it is equal to leader id. That
> + * can be investigated as an 'optimization'. Even though may not give
> + * anything, because won't change total number of rows sent in the
> + * network anyway.
> + */
> + if (!raft_is_source_allowed(applier->instance_id))
> + return 0;
> struct xrow_header *first_row = &stailq_first_entry(rows,
> struct applier_tx_row, next)->row;
> struct xrow_header *last_row;
> @@ -1241,6 +1254,7 @@ applier_subscribe(struct applier *applier)
> struct xrow_header *first_row =
> &stailq_first_entry(&rows, struct applier_tx_row,
> next)->row;
> + raft_process_heartbeat(applier->instance_id);
> if (first_row->lsn == 0) {
> if (unlikely(iproto_type_is_raft_request(
> first_row->type))) {
> @@ -1249,7 +1263,7 @@ applier_subscribe(struct applier *applier)
> diag_raise();
> }
> applier_signal_ack(applier);
> - } else if (applier_apply_tx(&rows) != 0) {
> + } else if (applier_apply_tx(applier, &rows) != 0) {
> diag_raise();
> }
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 77ab21dbb..c5dcbd959 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -157,7 +157,7 @@ void
> box_update_ro_summary(void)
> {
> bool old_is_ro_summary = is_ro_summary;
> - is_ro_summary = is_ro || is_orphan;
> + is_ro_summary = is_ro || is_orphan || raft_is_ro();
> /* In 99% nothing changes. Filter this out first. */
> if (is_ro_summary == old_is_ro_summary)
> return;
> @@ -171,6 +171,10 @@ static int
> box_check_writable(void)
> {
> if (is_ro_summary) {
> + /*
> + * XXX: return a special error when the node is not a leader to
> + * reroute to the leader node.
> + */
> diag_set(ClientError, ER_READONLY);
> diag_log();
> return -1;
> @@ -2652,6 +2656,7 @@ box_init(void)
>
> txn_limbo_init();
> sequence_init();
> + raft_init();
> }
>
> bool
> diff --git a/src/box/raft.c b/src/box/raft.c
> index 4d3d07c48..07a49351f 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -36,6 +36,13 @@
> #include "small/region.h"
> #include "replication.h"
> #include "relay.h"
> +#include "box.h"
> +#include "tt_static.h"
> +
> +/**
> + * Maximal random deviation of the election timeout. From the configured value.
> + */
> +#define RAFT_RANDOM_ELECTION_FACTOR 0.1
>
> const char *raft_state_strs[] = {
> NULL,
> @@ -48,19 +55,261 @@ const char *raft_state_strs[] = {
> struct raft raft = {
> .leader = 0,
> .state = RAFT_STATE_FOLLOWER,
> + .volatile_term = 1,
> + .volatile_vote = 0,
> .is_enabled = false,
> .is_candidate = false,
> + .is_cfg_candidate = false,
> + .is_write_in_progress = false,
> .term = 1,
> .vote = 0,
> + .vote_mask = 0,
> + .vote_count = 0,
> + .worker = NULL,
> + .election_timeout = 5,
> };
>
> +/**
> + * Check if Raft is completely synced with disk. Meaning all its critical values
> + * are in WAL. Only in that state the node can become a leader or a candidate.
> + * If the node has a not flushed data, it means either the term was bumped, or
> + * a new vote was made.
> + *
> + * In case of term bump it means either there is another node with a newer term,
> + * and this one should be a follower; or this node bumped the term itself along
> + * with making a vote to start a new election - then it is also a follower which
> + * will turn into a candidate when the flush is done.
> + *
> + * In case of a new not flushed vote it means either this node voted for some
> + * other node, and must be a follower; or it voted for self, and also must be a
> + * follower, but will become a candidate when the flush is done.
> + *
> + * In total - when something is not synced with disk, the instance is a follower
> + * in any case.
> + */
> +static bool
> +raft_is_fully_on_disk(void)
> +{
> + return raft.volatile_term == raft.term &&
> + raft.volatile_vote == raft.vote;
> +}
> +
> +/**
> + * Raft protocol says that election timeout should be a bit randomized so as
> + * the nodes wouldn't start election at the same time and end up with not having
> + * a quorum for anybody. This implementation randomizes the election timeout by
> + * adding {election timeout * random factor} value, where max value of the
> + * factor is a constant floating point value > 0.
> + */
> +static inline double
> +raft_new_random_election_shift(void)
> +{
> + double timeout = raft.election_timeout;
> + /* Translate to ms. Integer is needed to be able to use mod below. */
> + uint32_t rand_part =
> + (uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
> + if (rand_part == 0)
> + rand_part = 1;
> + /*
> + * XXX: this is not giving a good distribution, but it is not so trivial
> + * to implement a correct random value generator. There is a task to
> + * unify all such places. Not critical here.
> + */
> + rand_part = rand() % (rand_part + 1);
> + return rand_part / 1000.0;
> +}
> +
> +/**
> + * Raft says that during election a node1 can vote for node2, if node2 has a
> + * bigger term, or has the same term but longer log. In case of Tarantool it
> + * means the node2 vclock should be >= node1 vclock, in all components. It is
> + * not enough to compare only one component. At least because there may be not
> + * a previous leader when the election happens first time. Or a node could
> + * restart and forget who the previous leader was.
> + */
> +static inline bool
> +raft_can_vote_for(const struct vclock *v)
> +{
> + if (v == NULL)
> + return false;
> + int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
> + return cmp == 0 || cmp == 1;
> +}
> +
> +/**
> + * Election quorum is not strictly equal to synchronous replication quorum.
> + * Sometimes it can be lowered. That is about bootstrap.
> + *
> + * The problem with bootstrap is that when the replicaset boots, all the
> + * instances can't write to WAL and can't recover from their initial snapshot.
> + * They need one node which will boot first, and then they will replicate from
> + * it.
> + *
> + * This one node should boot from its zero snapshot, create replicaset UUID,
> + * register self with ID 1 in _cluster space, and then register all the other
> + * instances here. To do that the node must be writable. It should have
> + * read_only = false, connection quorum satisfied, and be a Raft leader if Raft
> + * is enabled.
> + *
> + * To be elected a Raft leader it needs to perform election. But it can't be
> + * done before at least synchronous quorum of the replicas is bootstrapped. And
> + * they can't be bootstrapped because wait for a leader to initialize _cluster.
> + * Cyclic dependency.
> + *
> + * This is resolved by truncation of the election quorum to the number of
> + * registered replicas, if their count is less than synchronous quorum. That
> + * helps to elect a first leader.
> + *
> + * It may seem that the first node could just declare itself a leader and then
> + * strictly follow the protocol from now on, but that won't work, because if the
> + * first node restarts after it is booted, but before a quorum of replicas is
> + * booted, the cluster will get stuck again.
> + *
> + * The current solution is totally safe because
> + *
> + * - after all the cluster will have node count >= quorum, if user used a
> + * correct config (God help him if he didn't);
> + *
> + * - synchronous replication quorum is untouched - it is not truncated. Only
> + * leader election quorum is affected. So synchronous data won't be lost.
> + */
> +static inline int
> +raft_election_quorum(void)
> +{
> + return MIN(replication_synchro_quorum, replicaset.registered_count);
> +}
> +
> +/** Broadcast an event about this node changed its state to all relays. */
> +static inline void
> +raft_broadcast_new_state(void)
> +{
> + struct raft_request req;
> + memset(&req, 0, sizeof(req));
> + req.term = raft.term;
> + req.state = raft.state;
> + raft_broadcast(&req);
> +}
> +
> +/** Raft state machine methods. 'sm' stands for State Machine. */
> +
> +/**
> + * Start the state machine. When it is stopped, Raft state is updated and
> + * goes to WAL when necessary, but it does not affect the instance operation.
> + * For example, when Raft is stopped, the instance role does not affect whether
> + * it is writable.
> + */
> +static void
> +raft_sm_start(void);
> +
> +/**
> + * Stop the state machine. Now until Raft is re-enabled,
> + * - Raft stops affecting the instance operation;
> + * - this node can't become a leader;
> + * - this node can't vote.
> + */
> +static void
> +raft_sm_stop(void);
> +
> +/**
> + * When the instance is a follower but is allowed to be a leader, it will wait
> + * for death of the current leader to start new election.
> + */
> +static void
> +raft_sm_wait_leader_dead(void);
> +
> +/**
> + * If election is started by this node, or it voted for some other node started
> + * the election, and it can be a leader itself, it will wait until the current
> + * election times out. When it happens, the node will start new election.
> + */
> +static void
> +raft_sm_wait_election_end(void);
> +
> +/** Bump volatile term and schedule its flush to disk. */
> +static void
> +raft_sm_schedule_new_term(uint64_t new_term);
> +
> +/** Bump volatile vote and schedule its flush to disk. */
> +static void
> +raft_sm_schedule_new_vote(uint32_t new_vote);
> +
> +/**
> + * Bump term and vote for self immediately. After that is persisted, the
> + * election timeout will be activated. Unless during that nothing newer happens.
> + */
> +static void
> +raft_sm_schedule_new_election(void);
> +
> +/**
> + * The main trigger of Raft state machine - start new election when the current
> + * leader dies, or when there is no leader and the previous election failed.
> + */
> +static void
> +raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
> + int events);
> +
> +/** Start Raft state flush to disk. */
> +static void
> +raft_sm_pause_and_dump(void);
> +
> +static void
> +raft_sm_become_leader(void);
> +
> +static void
> +raft_sm_follow_leader(uint32_t leader);
> +
> +static void
> +raft_sm_become_candidate(void);
> +
> +static const char *
> +raft_request_to_string(const struct raft_request *req)
> +{
> + assert(req->term != 0);
> + int size = 1024;
> + char buf[1024];
> + char *pos = buf;
> + int rc = snprintf(pos, size, "{term: %llu", req->term);
> + assert(rc >= 0);
> + pos += rc;
> + size -= rc;
> + if (req->vote != 0) {
> + rc = snprintf(pos, size, ", vote: %u", req->vote);
> + assert(rc >= 0);
> + pos += rc;
> + size -= rc;
> + }
> + if (req->state != 0) {
> + rc = snprintf(pos, size, ", state: %s",
> + raft_state_strs[req->state]);
> + assert(rc >= 0);
> + pos += rc;
> + size -= rc;
> + }
> + if (req->vclock != NULL) {
> + rc = snprintf(pos, size, ", vclock: %s",
> + vclock_to_string(req->vclock));
> + assert(rc >= 0);
> + pos += rc;
> + size -= rc;
> + }
> + rc = snprintf(pos, size, "}");
> + assert(rc >= 0);
> + pos += rc;
> + return tt_cstr(buf, pos - buf);
> +}
> +
> void
> raft_process_recovery(const struct raft_request *req)
> {
> - if (req->term != 0)
> + say_verbose("RAFT: recover %s", raft_request_to_string(req));
> + if (req->term != 0) {
> raft.term = req->term;
> - if (req->vote != 0)
> + raft.volatile_term = req->term;
> + }
> + if (req->vote != 0) {
> raft.vote = req->vote;
> + raft.volatile_vote = req->vote;
> + }
> /*
> * Role is never persisted. If recovery is happening, the
> * node was restarted, and the former role can be false
> @@ -80,34 +329,526 @@ raft_process_recovery(const struct raft_request *req)
> void
> raft_process_msg(const struct raft_request *req, uint32_t source)
> {
> - (void)source;
> - if (req->term > raft.term) {
> - // Update term.
> - // The logic will be similar, but the code
> - // below is for testing purposes.
> - raft.term = req->term;
> + say_info("RAFT: message %s from %u", raft_request_to_string(req),
> + source);
> + assert(source > 0);
> + assert(source != instance_id);
> + /* Outdated request. */
> + if (req->term < raft.volatile_term)
> + return;
> +
> + enum raft_state old_state = raft.state;
> +
> + /* Term bump. */
> + if (req->term > raft.volatile_term)
> + raft_sm_schedule_new_term(req->term);
> +
> + /* Vote request during the on-going election. */
> + if (req->vote != 0) {
> + switch (raft.state) {
> + case RAFT_STATE_FOLLOWER:
> + /*
> + * Can't respond on vote requests when Raft is disabled.
> + */
> + if (!raft.is_enabled) {
> + say_info("RAFT: vote request is skipped - RAFT "
> + "is disabled");
> + break;
> + }
> + /* Check if already voted in this term. */
> + if (raft.volatile_vote != 0) {
> + say_info("RAFT: vote request is skipped - "
> + "already voted in this term");
> + break;
> + }
> + /* Not a candidate. Can't accept votes. */
> + if (req->vote == instance_id) {
> + say_info("RAFT: vote request is skipped - "
> + "can't accept vote for self if not a "
> + "candidate");
> + break;
> + }
> + /* Can't vote for too old or incomparable nodes. */
> + if (!raft_can_vote_for(req->vclock)) {
> + say_info("RAFT: vote request is skipped - "
> + "the vclock is not acceptable = %s",
> + vclock_to_string(req->vclock));
> + break;
> + }
> + /*
> + * Check if somebody is asking to vote for a third
> + * node - nope. Make votes only when asked directly by
> + * the new candidate. However that restriction may be
> + * relaxed in future, if can be proven to be safe.
> + */
> + if (req->vote != source) {
> + say_info("RAFT: vote request is skipped - "
> + "indirect votes are not allowed");
> + break;
> + }
> + /*
> + * Either the term is new, or didn't vote in the current
> + * term yet. Anyway can vote now.
> + */
> + raft_sm_schedule_new_vote(req->vote);
> + break;
> + case RAFT_STATE_CANDIDATE:
> + /* Check if this is a vote for a competing candidate. */
> + if (req->vote != instance_id) {
> + say_info("RAFT: vote request is skipped - "
> + "competing candidate");
> + break;
> + }
> + /*
> + * Vote for self was requested earlier in this round,
> + * and now was answered by some other instance.
> + */
> + assert(raft.volatile_vote == instance_id);
> + int quorum = raft_election_quorum();
> + bool was_set = bit_set(&raft.vote_mask, source);
> + raft.vote_count += !was_set;
> + if (raft.vote_count < quorum) {
> + say_info("RAFT: accepted vote for self, vote "
> + "count is %d/%d", raft.vote_count,
> + quorum);
> + break;
> + }
> + raft_sm_become_leader();
> + break;
> + case RAFT_STATE_LEADER:
> + /*
> + * If the node is still a leader, it ignores all votes.
> + * Indeed, if the received vote would be from a new
> + * term, the node would bump its own term and would
> + * enter the follower state by now. If the vote is from
> + * the current term, then the leader can freely ignore
> + * it.
> + */
> + break;
> + default:
> + unreachable();
> + }
> + }
> + /*
> + * If the node does not claim to be a leader, nothing interesting. Terms
> + * and votes are already handled.
> + */
> + if (req->state != RAFT_STATE_LEADER)
> + goto end;
> + /* The node is a leader, but it is already known. */
> + if (source == raft.leader)
> + goto end;
> + /*
> + * XXX: A message from a conflicting leader. Split brain, basically.
> + * Need to decide what to do. Current solution is to do nothing. In
> + * future either this node should try to become a leader, or should stop
> + * all writes and require manual intervention.
> + */
> + if (raft.leader != 0) {
> + say_warn("RAFT: conflicting leader detected in one term - "
> + "known is %u, received %u", raft.leader, source);
> + goto end;
> + }
> +
> + /* New leader was elected. */
> + raft_sm_follow_leader(source);
> +end:
> + if (raft.state != old_state) {
> + /*
> + * New term and vote are not broadcasted yet. Firstly their WAL
> + * write should be finished. But the state is volatile. It is ok
> + * to broadcast it now.
> + */
> + raft_broadcast_new_state();
> + }
> +}
> +
> +void
> +raft_process_heartbeat(uint32_t source)
> +{
> + /*
> + * When not a candidate - don't wait for anything. Therefore do not care
> + * about the leader being dead.
> + */
> + if (!raft.is_candidate)
> + return;
> + /* Don't care about heartbeats when this node is a leader itself. */
> + if (raft.state == RAFT_STATE_LEADER)
> + return;
> + /* Not interested in heartbeats from not a leader. */
> + if (raft.leader != source)
> + return;
> + /*
> + * The instance currently is busy with writing something on disk. Can't
> + * react to heartbeats.
> + */
> + if (raft.is_write_in_progress)
> + return;
> + /*
> + * XXX: it may be expensive to reset the timer like that. It may be less
> + * expensive to let the timer work, and remember last timestamp when
> + * anything was heard from the leader. Then in the timer callback check
> + * the timestamp, and restart the timer, if it is fine.
> + */
> + assert(ev_is_active(&raft.timer));
> + ev_timer_stop(loop(), &raft.timer);
> + raft_sm_wait_leader_dead();
> +}
> +
> +/** Wakeup Raft state writer fiber waiting for WAL write end. */
> +static void
> +raft_write_cb(struct journal_entry *entry)
> +{
> + fiber_wakeup(entry->complete_data);
> +}
> +
> +/** Synchronously write a Raft request into WAL. */
> +static void
> +raft_write_request(const struct raft_request *req)
> +{
> + assert(raft.is_write_in_progress);
> + /*
> + * Vclock is never persisted by Raft. It is used only to
> + * be sent to network when vote for self.
> + */
> + assert(req->vclock == NULL);
> + /*
> + * State is not persisted. That would be strictly against Raft protocol.
> + * The reason is that it does not make much sense - even if the node is
> + * a leader now, after the node is restarted, there will be another
> + * leader elected by that time likely.
> + */
> + assert(req->state == 0);
> + struct region *region = &fiber()->gc;
> + uint32_t svp = region_used(region);
> + struct xrow_header row;
> + char buf[sizeof(struct journal_entry) +
> + sizeof(struct xrow_header *)];
> + struct journal_entry *entry = (struct journal_entry *)buf;
> + entry->rows[0] = &row;
> +
> + if (xrow_encode_raft(&row, region, req) != 0)
> + goto fail;
> + journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
> + fiber());
> +
> + if (journal_write(entry) != 0 || entry->res < 0) {
> + diag_set(ClientError, ER_WAL_IO);
> + diag_log();
> + goto fail;
> }
> - if (req->vote > 0) {
> - // Check whether the vote's for us.
> +
> + raft_broadcast(req);
> +
> + region_truncate(region, svp);
> + return;
> +fail:
> + /*
> + * XXX: the stub is supposed to be removed once it is defined what to do
> + * when a raft request WAL write fails.
> + */
> + panic("Could not write a raft request to WAL\n");
> +}
> +
> +static void
> +raft_worker_handle_io(void)
> +{
> + assert(raft.is_write_in_progress);
> + /* During write Raft can't be anything but a follower. */
> + assert(raft.state == RAFT_STATE_FOLLOWER);
> + struct raft_request req;
> + uint64_t old_term = raft.term;
> + uint32_t old_vote = raft.vote;
> + enum raft_state old_state = raft.state;
> +
> + if (raft_is_fully_on_disk()) {
> +end_dump:
> + raft.is_write_in_progress = false;
> + /*
> + * The state machine is stable. Can see now, to what state to
> + * go.
> + */
> + if (!raft.is_candidate) {
> + /*
> + * If not a candidate, can't do anything except vote for
> + * somebody (if Raft is enabled). Nothing to do except
> + * staying a follower without timeouts.
> + */
> + } else if (raft.leader != 0) {
> + /* There is a known leader. Wait until it is dead. */
> + raft_sm_wait_leader_dead();
> + } else if (raft.vote == instance_id) {
> + /* Just wrote own vote. */
> + if (raft_election_quorum() == 1)
> + raft_sm_become_leader();
> + else
> + raft_sm_become_candidate();
> + } else if (raft.vote != 0) {
> + /*
> + * Voted for some other node. Wait if it manages to
> + * become a leader.
> + */
> + raft_sm_wait_election_end();
> + } else {
> + /* No leaders, no votes. */
> + raft_sm_schedule_new_election();
> + }
> + } else {
> + memset(&req, 0, sizeof(req));
> + assert(raft.volatile_term >= raft.term);
> + /* Term is written always. */
> + req.term = raft.volatile_term;
> + if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
> + req.vote = raft.volatile_vote;
> +
> + raft_write_request(&req);
> + say_verbose("RAFT: persist and apply state %s",
> + raft_request_to_string(&req));
> +
> + assert(req.term >= raft.term);
> + if (req.term > raft.term) {
> + raft.term = req.term;
> + raft.vote = 0;
> + }
> + if (req.vote != 0) {
> + assert(raft.vote == 0);
> + raft.vote = req.vote;
> + }
> + if (raft_is_fully_on_disk())
> + goto end_dump;
> + }
> +
> + memset(&req, 0, sizeof(req));
> + /* Term is encoded always. */
> + req.term = raft.term;
> + bool has_changes = old_term != raft.term;
> + if (raft.vote != 0 && old_vote != raft.vote) {
> + req.vote = raft.vote;
> + /*
> + * When vote for self, need to send current vclock too. Two
> + * reasons for that:
> + *
> + * - nodes need to vote for the instance containing the newest
> + * data. So as not to loose it, because some of it may be
> + * confirmed by the synchronous replication;
> + *
> + * - replication is basically stopped during election. Other
> + * nodes can't learn vclock of this instance through regular
> + * replication.
> + */
> + if (raft.vote == instance_id)
> + req.vclock = &replicaset.vclock;
> + has_changes = true;
> }
> - switch (req->state) {
> - case RAFT_STATE_FOLLOWER:
> - break;
> - case RAFT_STATE_CANDIDATE:
> - // Perform voting logic.
> - break;
> - case RAFT_STATE_LEADER:
> - // Switch to a new leader.
> - break;
> - default:
> - break;
> + if (raft.state != old_state) {
> + req.state = raft.state;
> + has_changes = true;
> }
> + if (has_changes)
> + raft_broadcast(&req);
> +}
> +
> +static int
> +raft_worker_f(va_list args)
> +{
> + (void)args;
> + while (!fiber_is_cancelled()) {
> + if (!raft.is_write_in_progress)
> + goto idle;
> + raft_worker_handle_io();
> + if (!raft.is_write_in_progress)
> + goto idle;
> + fiber_sleep(0);
> + continue;
> + idle:
> + assert(raft_is_fully_on_disk());
> + fiber_yield();
> + }
> + return 0;
> +}
> +
> +static void
> +raft_sm_pause_and_dump(void)
> +{
> + assert(raft.state == RAFT_STATE_FOLLOWER);
> + if (raft.is_write_in_progress)
> + return;
> + ev_timer_stop(loop(), &raft.timer);
> + raft.is_write_in_progress = true;
> + if (raft.worker == NULL)
> + raft.worker = fiber_new("raft_worker", raft_worker_f);
> + fiber_wakeup(raft.worker);
> +}
> +
> +static void
> +raft_sm_become_leader(void)
> +{
> + assert(raft.state != RAFT_STATE_LEADER);
> + say_info("RAFT: enter leader state with quorum %d",
> + raft_election_quorum());
> + assert(raft.leader == 0);
> + assert(raft.is_candidate);
> + assert(!raft.is_write_in_progress);
> + raft.state = RAFT_STATE_LEADER;
> + raft.leader = instance_id;
> + ev_timer_stop(loop(), &raft.timer);
> + /* Make read-write (if other subsystems allow that. */
> + box_update_ro_summary();
> +}
> +
> +static void
> +raft_sm_follow_leader(uint32_t leader)
> +{
> + say_info("RAFT: leader is %u, follow", leader);
> + assert(raft.state != RAFT_STATE_LEADER);
> + assert(raft.leader == 0);
> + raft.state = RAFT_STATE_FOLLOWER;
> + raft.leader = leader;
> + if (!raft.is_write_in_progress) {
> + ev_timer_stop(loop(), &raft.timer);
> + raft_sm_wait_leader_dead();
> + }
> +}
> +
> +static void
> +raft_sm_become_candidate(void)
> +{
> + say_info("RAFT: enter candidate state with 1 self vote");
> + assert(raft.state == RAFT_STATE_FOLLOWER);
> + assert(raft.leader == 0);
> + assert(raft.is_candidate);
> + assert(!raft.is_write_in_progress);
> + assert(raft_election_quorum() > 1);
> + raft.state = RAFT_STATE_CANDIDATE;
> + raft.vote_count = 1;
> + raft.vote_mask = 0;
> + bit_set(&raft.vote_mask, instance_id);
> + raft_sm_wait_election_end();
> +}
> +
> +static void
> +raft_sm_schedule_new_term(uint64_t new_term)
> +{
> + say_info("RAFT: bump term to %llu, follow", new_term);
> + assert(new_term > raft.volatile_term);
> + assert(raft.volatile_term >= raft.term);
> + raft.volatile_term = new_term;
> + /* New terms means completely new Raft state. */
> + raft.volatile_vote = 0;
> + raft.leader = 0;
> + raft.state = RAFT_STATE_FOLLOWER;
> + box_update_ro_summary();
> + raft_sm_pause_and_dump();
> +}
> +
> +static void
> +raft_sm_schedule_new_vote(uint32_t new_vote)
> +{
> + say_info("RAFT: vote for %u, follow", new_vote, raft.volatile_term);
> + assert(raft.volatile_vote == 0);
> + assert(raft.state == RAFT_STATE_FOLLOWER);
> + raft.volatile_vote = new_vote;
> + raft_sm_pause_and_dump();
> +}
> +
> +static void
> +raft_sm_schedule_new_election(void)
> +{
> + say_info("RAFT: begin new election round");
> + assert(raft_is_fully_on_disk());
> + assert(raft.is_candidate);
> + /* Everyone is a follower until its vote for self is persisted. */
> + raft_sm_schedule_new_term(raft.term + 1);
> + raft_sm_schedule_new_vote(instance_id);
> + box_update_ro_summary();
> +}
> +
> +static void
> +raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
> + int events)
> +{
> + assert(timer == &raft.timer);
> + (void)events;
> + ev_timer_stop(loop, timer);
> + raft_sm_schedule_new_election();
> +}
> +
> +static void
> +raft_sm_wait_leader_dead(void)
> +{
> + assert(!ev_is_active(&raft.timer));
> + assert(!raft.is_write_in_progress);
> + assert(raft.is_candidate);
> + assert(raft.state == RAFT_STATE_FOLLOWER);
> + assert(raft.leader != 0);
> + double death_timeout = replication_disconnect_timeout();
> + ev_timer_set(&raft.timer, death_timeout, death_timeout);
> + ev_timer_start(loop(), &raft.timer);
> +}
> +
> +static void
> +raft_sm_wait_election_end(void)
> +{
> + assert(!ev_is_active(&raft.timer));
> + assert(!raft.is_write_in_progress);
> + assert(raft.is_candidate);
> + assert(raft.state == RAFT_STATE_FOLLOWER ||
> + (raft.state == RAFT_STATE_CANDIDATE &&
> + raft.volatile_vote == instance_id));
> + assert(raft.leader == 0);
> + double election_timeout = raft.election_timeout +
> + raft_new_random_election_shift();
> + ev_timer_set(&raft.timer, election_timeout, election_timeout);
> + ev_timer_start(loop(), &raft.timer);
> +}
> +
> +static void
> +raft_sm_start(void)
> +{
> + say_info("RAFT: start state machine");
> + assert(!ev_is_active(&raft.timer));
> + assert(!raft.is_write_in_progress);
> + assert(!raft.is_enabled);
> + assert(raft.state == RAFT_STATE_FOLLOWER);
> + raft.is_enabled = true;
> + raft.is_candidate = raft.is_cfg_candidate;
> + if (!raft.is_candidate)
> + /* Nop. */;
> + else if (raft.leader != 0)
> + raft_sm_wait_leader_dead();
> + else
> + raft_sm_schedule_new_election();
> + box_update_ro_summary();
> + /*
> + * When Raft is enabled, send the complete state. Because
> + * it wasn't sent in disabled state.
> + */
> + struct raft_request req;
> + raft_serialize_for_network(&req, NULL);
> + raft_broadcast(&req);
> +}
> +
> +static void
> +raft_sm_stop(void)
> +{
> + say_info("RAFT: stop state machine");
> + assert(raft.is_enabled);
> + raft.is_enabled = false;
> + raft.is_candidate = false;
> + if (raft.state == RAFT_STATE_LEADER)
> + raft.leader = 0;
> + raft.state = RAFT_STATE_FOLLOWER;
> + box_update_ro_summary();
> }
>
> void
> raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
> {
> memset(req, 0, sizeof(*req));
> + /*
> + * Volatile state is never used for any communications.
> + * Use only persisted state.
> + */
> req->term = raft.term;
> req->vote = raft.vote;
> req->state = raft.state;
> @@ -128,29 +869,90 @@ raft_serialize_for_disk(struct raft_request *req)
> void
> raft_cfg_is_enabled(bool is_enabled)
> {
> - raft.is_enabled = is_enabled;
> + if (is_enabled == raft.is_enabled)
> + return;
> +
> + if (!is_enabled)
> + raft_sm_stop();
> + else
> + raft_sm_start();
> }
>
> void
> raft_cfg_is_candidate(bool is_candidate)
> {
> - raft.is_candidate = is_candidate;
> + /*
> + * The configured value is stored separately: the effective candidate
> + * flag is additionally gated by Raft being enabled.
> + */
> + bool old_is_candidate = raft.is_candidate;
> + raft.is_cfg_candidate = is_candidate;
> + raft.is_candidate = is_candidate && raft.is_enabled;
> + /* Nothing else to do if the effective flag did not change. */
> + if (raft.is_candidate == old_is_candidate)
> + return;
> +
> + if (raft.is_candidate) {
> + assert(raft.state == RAFT_STATE_FOLLOWER);
> + /*
> + * If there is an on-going WAL write, it means there was some
> + * node who sent newer data to this node. An election is
> + * scheduled only when no leader is known and the state is
> + * fully on disk.
> + */
> + if (raft.leader == 0 && raft_is_fully_on_disk())
> + raft_sm_schedule_new_election();
> + } else if (raft.state != RAFT_STATE_FOLLOWER) {
> + /*
> + * The instance stops being a candidate while it is a candidate
> + * or a leader: step down to follower (forgetting the leader if
> + * it was this instance) and broadcast the new state.
> + */
> + if (raft.state == RAFT_STATE_LEADER)
> + raft.leader = 0;
> + raft.state = RAFT_STATE_FOLLOWER;
> + raft_broadcast_new_state();
> + }
> + box_update_ro_summary();
> }
>
> void
> raft_cfg_election_timeout(double timeout)
> {
> +	if (timeout == raft.election_timeout)
> +		return;
> +
> raft.election_timeout = timeout;
> +	/*
> +	 * Re-arm the timer only when this instance is a candidate that has
> +	 * voted in a term which has no leader yet; the assert below expects
> +	 * the timer to be active in that case.
> +	 */
> +	if (raft.vote != 0 && raft.leader == 0 && raft.is_candidate) {
> +		assert(ev_is_active(&raft.timer));
> +		/*
> +		 * A distinct name is used so that the 'timeout' parameter is
> +		 * not shadowed.
> +		 * NOTE(review): the formula relies on raft.timer.at holding
> +		 * the previously configured timeout while the timer is
> +		 * active - confirm against libev.
> +		 */
> +		double new_timeout = ev_timer_remaining(loop(), &raft.timer) -
> +				     raft.timer.at + raft.election_timeout;
> +		ev_timer_stop(loop(), &raft.timer);
> +		ev_timer_set(&raft.timer, new_timeout, new_timeout);
> +		ev_timer_start(loop(), &raft.timer);
> +	}
> }
>
> void
> raft_cfg_election_quorum(void)
> {
> +	/*
> +	 * Only a candidate can be promoted by a quorum change. Any other
> +	 * state, leader included, is rejected by this single check.
> +	 */
> +	if (raft.state != RAFT_STATE_CANDIDATE)
> +		return;
> +	if (raft.vote_count < raft_election_quorum())
> +		return;
> +	/*
> +	 * The node is a candidate. It means its state is fully synced with
> +	 * disk. Otherwise it would be a follower.
> +	 */
> +	assert(!raft.is_write_in_progress);
> +	raft.state = RAFT_STATE_LEADER;
> +	raft.leader = instance_id;
> +	raft_broadcast_new_state();
> +	box_update_ro_summary();
> }
>
> void
> raft_cfg_death_timeout(void)
> {
> +	/*
> +	 * The timer is re-armed only for a follower that is a candidate and
> +	 * has a known leader; the assert below expects the timer to be
> +	 * active in that case.
> +	 */
> +	bool is_watching_leader = raft.state == RAFT_STATE_FOLLOWER &&
> +				  raft.is_candidate && raft.leader != 0;
> +	if (!is_watching_leader)
> +		return;
> +	assert(ev_is_active(&raft.timer));
> +	double new_death_timeout = replication_disconnect_timeout();
> +	double rearm_after = ev_timer_remaining(loop(), &raft.timer) -
> +			     raft.timer.at + new_death_timeout;
> +	ev_timer_stop(loop(), &raft.timer);
> +	ev_timer_set(&raft.timer, rearm_after, rearm_after);
> +	ev_timer_start(loop(), &raft.timer);
> }
>
> void
> @@ -163,3 +965,9 @@ raft_broadcast(const struct raft_request *req)
> }
> }
> }
> +
> +/**
> + * Initialize the Raft timer with raft_sm_schedule_new_election_cb as its
> + * callback and zero timeout values. The timer is only initialized here, it
> + * is not started.
> + */
> +void
> +raft_init(void)
> +{
> + ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
> +}
> diff --git a/src/box/raft.h b/src/box/raft.h
> index db64cf933..23aedfe10 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -31,34 +31,140 @@
> */
> #include <stdint.h>
> #include <stdbool.h>
> +#include "tarantool_ev.h"
>
> #if defined(__cplusplus)
> extern "C" {
> #endif
>
> +/**
> + * This is an implementation of Raft leader election protocol, separated from
> + * synchronous replication part.
> + *
> + * The protocol describes an algorithm which helps to elect a single leader in
> + * the cluster, which is supposed to handle write requests. And re-elect a new
> + * leader, when the current leader dies.
> + *
> + * The implementation follows the protocol to the letter except a few important
> + * details.
> + *
> + * Firstly, the original Raft assumes, that all nodes share the same log record
> + * numbers. In Tarantool they are called LSNs. But in case of Tarantool each
> + * node has its own LSN in its own component of vclock. That makes the election
> + * messages a bit heavier, because the nodes need to send and compare complete
> + * vclocks of each other instead of a single number like in the original Raft.
> + * But logic becomes simpler. Because in the original Raft there is a problem of
> + * uncertainty about what to do with records of an old leader right after a new
> + * leader is elected. They could be rolled back or confirmed depending on
> + * circumstances. The issue disappears when vclock is used.
> + *
> + * Secondly, leader election works differently during cluster bootstrap, until
> + * number of bootstrapped replicas becomes >= election quorum. That arises from
> + * specifics of replicas bootstrap and order of systems initialization. In
> + * short: during bootstrap a leader election may use a smaller election quorum
> + * than the configured one. See more details in the code.
> + */
> +
> +struct fiber;
> struct raft_request;
> struct vclock;
>
> +/** State of the instance in the Raft leader election protocol. */
> enum raft_state {
> + /**
> + * Can't write. Can only accept data from a leader. Node in this state
> + * either monitors an existing leader, or there is an on-going election
> + * and the node voted for another node, or it can't be a candidate and
> + * does not do anything.
> + */
> RAFT_STATE_FOLLOWER = 1,
> + /**
> + * The node can't write. There is an active election, in which the node
> + * voted for self. Now it waits for election outcome.
> + */
> RAFT_STATE_CANDIDATE = 2,
> + /** Election was successful. The node accepts write requests. */
> RAFT_STATE_LEADER = 3,
> };
>
> extern const char *raft_state_strs[];
>
> +/** Raft leader election state of the instance, both volatile and persisted. */
> struct raft {
> + /**
> + * Instance ID of the leader of the current term. The state machine
> + * treats 0 as "no leader is known".
> + */
> uint32_t leader;
> + /** State of the instance. */
> enum raft_state state;
> + /**
> + * Volatile part of the Raft state, whose WAL write may be still
> + * in-progress, and yet the state may be already used. Volatile state is
> + * never sent anywhere, but the state machine makes decisions based
> + * on it. That is vital.
> + * As an example, volatile vote needs to be used to reject votes inside
> + * a term, where the instance already voted (even if the vote WAL write
> + * is not finished yet). Otherwise the instance would try to write
> + * several votes inside one term.
> + */
> + uint64_t volatile_term;
> + uint32_t volatile_vote;
> + /**
> + * Flag whether Raft is enabled. When disabled, it still persists terms
> + * so as to quickly enroll into the cluster when (if) it is enabled. In
> + * everything else disabled Raft does not affect instance work.
> + */
> bool is_enabled;
> + /**
> + * Flag whether the node can become a leader. It is an accumulated value
> + * of configuration options Raft enabled and Raft candidate. If at least
> + * one is false - the instance is not a candidate.
> + */
> bool is_candidate;
> + /**
> + * Flag whether the instance is allowed to be a leader, as configured,
> + * regardless of whether Raft is enabled.
> + */
> + bool is_cfg_candidate;
> + /**
> + * Flag whether Raft currently tries to write something into WAL. It
> + * happens asynchronously, not right after Raft state is updated.
> + */
> + bool is_write_in_progress;
> + /**
> + * Persisted Raft state. These values are used when the current Raft
> + * state needs to be told to other nodes.
> + */
> uint64_t term;
> uint32_t vote;
> + /**
> + * Bit 1 on position N means that a vote from instance with ID = N was
> + * obtained.
> + */
> + uint32_t vote_mask;
> + /** Number of votes for this instance. Valid only in candidate state. */
> + int vote_count;
> + /** State machine timed event trigger. */
> + struct ev_timer timer;
> + /** Worker fiber to execute blocking tasks like IO. */
> + struct fiber *worker;
> + /** Configured election timeout in seconds. */
> double election_timeout;
> };
>
> extern struct raft raft;
>
> +/**
> + * Tell whether Raft forces the instance to be read-only. A false result
> + * does not make the instance writable on its own: box.cfg.read_only and the
> + * connection quorum can still forbid writes.
> + */
> +static inline bool
> +raft_is_ro(void)
> +{
> +	/* Disabled Raft never restricts writes. */
> +	if (!raft.is_enabled)
> +		return false;
> +	/* With Raft enabled only the leader is not read-only. */
> +	return raft.state != RAFT_STATE_LEADER;
> +}
> +
> +/**
> + * Check whether rows coming from the instance with the given ID can be
> + * accepted: always when Raft is disabled, otherwise only from the leader.
> + */
> +static inline bool
> +raft_is_source_allowed(uint32_t source_id)
> +{
> +	if (raft.is_enabled)
> +		return raft.leader == source_id;
> +	return true;
> +}
> +
> /** Check if Raft is enabled. */
> static inline bool
> raft_is_enabled(void)
> @@ -78,6 +184,13 @@ raft_process_recovery(const struct raft_request *req);
> void
> raft_process_msg(const struct raft_request *req, uint32_t source);
>
> +/**
> + * Process a heartbeat message from an instance with the given ID. It is used to
> + * watch leader's health and start election when necessary.
> + */
> +void
> +raft_process_heartbeat(uint32_t source);
> +
> /** Configure whether Raft is enabled. */
> void
> raft_cfg_is_enabled(bool is_enabled);
> @@ -130,6 +243,10 @@ raft_serialize_for_disk(struct raft_request *req);
> void
> raft_broadcast(const struct raft_request *req);
>
> +/** Initialize Raft global data structures. */
> +void
> +raft_init(void);
> +
> #if defined(__cplusplus)
> }
> #endif
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 74581db9c..d63711600 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
> relay_send(relay, row);
> }
>
> +/**
> + * Recreate the recovery cursor from the last confirmed point. That is
> + * used by Raft when the node becomes a leader. It may happen
> + * that it already sent some data to other nodes as a follower,
> + * and they ignored the data. Now that the node is a leader, it
> + * should send the unconfirmed data again. Otherwise the cluster
> + * will get stuck, or worse - the newer data would be sent without
> + * the older data that was sent but ignored.
> + */
> +static void
> +relay_restart_recovery(struct relay *relay)
> +{
> + /* Drop the old cursor and create a new one from relay->recv_vclock. */
> + recovery_delete(relay->r);
> + relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
> + /*
> + * NOTE(review): presumably feeds the rows following that vclock into
> + * relay->stream again - confirm against recover_remaining_wals().
> + */
> + recover_remaining_wals(relay->r, &relay->stream, NULL, true);
> +}
> +
> struct relay_raft_msg {
> struct cmsg base;
> struct cmsg_hop route;
> @@ -786,7 +803,14 @@ relay_raft_msg_push(struct cmsg *base)
> struct xrow_header row;
> xrow_encode_raft(&row, &fiber()->gc, &msg->req);
> try {
> + /*
> + * Send the message before restarting the recovery. Otherwise
> + * all the rows would be sent from under a non-leader role and
> + * would be ignored again.
> + */
> relay_send(msg->relay, &row);
> + if (msg->req.state == RAFT_STATE_LEADER)
> + relay_restart_recovery(msg->relay);
> } catch (Exception *e) {
> relay_set_error(msg->relay, e);
> fiber_cancel(fiber());
--
Serge Petrenko
More information about the Tarantool-patches
mailing list