[Tarantool-patches] [PATCH v2 08/11] raft: relay status updates to followers
Serge Petrenko
sergepetrenko at tarantool.org
Mon Sep 21 13:50:39 MSK 2020
On 10.09.2020 02:16, Vladislav Shpilevoy wrote:
> From: sergepetrenko <sergepetrenko at tarantool.org>
>
> This patch introduces a new type of system message used to notify
> followers about updates to the instance's Raft status.
> The relay is responsible for delivering the new system rows to its peers.
> The notification system reuses and extends the row type already used to
> persist Raft state in the WAL and snapshot.
>
> Part of #1146
> Part of #5204
> ---
> src/box/applier.cc | 31 ++++++++++++--
> src/box/box.cc | 21 +++++++++-
> src/box/iproto_constants.h | 2 +
> src/box/memtx_engine.c | 3 +-
> src/box/raft.c | 72 ++++++++++++++++++++++++++++++++-
> src/box/raft.h | 35 +++++++++++++++-
> src/box/relay.cc | 62 +++++++++++++++++++++++++++-
> src/box/relay.h | 7 ++++
> src/box/xrow.c | 83 ++++++++++++++++++++++++++++++++------
> src/box/xrow.h | 5 ++-
> 10 files changed, 297 insertions(+), 24 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index c1d07ca54..ed76bf2ca 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -55,6 +55,7 @@
> #include "scoped_guard.h"
> #include "txn_limbo.h"
> #include "journal.h"
> +#include "raft.h"
>
> STRS(applier_state, applier_STATE);
>
> @@ -298,6 +299,8 @@ apply_final_join_row(struct xrow_header *row)
> */
> if (iproto_type_is_synchro_request(row->type))
> return 0;
> + if (iproto_type_is_raft_request(row->type))
> + return 0;
> struct txn *txn = txn_begin();
> if (txn == NULL)
> return -1;
> @@ -876,6 +879,19 @@ err:
> return -1;
> }
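Hi! Please consider these fixes on top of this commit. They keep `buf` pointing at the start of the encoded body and set row->body[0].iov_len from the number of bytes actually encoded (data - buf) instead of the precomputed size: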
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 928bf1497..250794a3e 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -994,24 +994,28 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
memset(row, 0, sizeof(*row));
row->type = IPROTO_RAFT;
row->body[0].iov_base = buf;
- row->body[0].iov_len = size;
row->group_id = GROUP_LOCAL;
row->bodycnt = 1;
- buf = mp_encode_map(buf, map_size);
- buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
- buf = mp_encode_uint(buf, r->term);
+ char *data = buf;
+
+ data = mp_encode_map(data, map_size);
+ data = mp_encode_uint(data, IPROTO_RAFT_TERM);
+ data = mp_encode_uint(data, r->term);
if (r->vote != 0) {
- buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
- buf = mp_encode_uint(buf, r->vote);
+ data = mp_encode_uint(data, IPROTO_RAFT_VOTE);
+ data = mp_encode_uint(data, r->vote);
}
if (r->state != 0) {
- buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
- buf = mp_encode_uint(buf, r->state);
+ data = mp_encode_uint(data, IPROTO_RAFT_STATE);
+ data = mp_encode_uint(data, r->state);
}
if (r->vclock != NULL) {
- buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
- buf = mp_encode_vclock_ignore0(buf, r->vclock);
+ data = mp_encode_uint(data, IPROTO_RAFT_VCLOCK);
+ data = mp_encode_vclock_ignore0(data, r->vclock);
}
+
+ row->body[0].iov_len = data - buf ;
+
return 0;
}
>
> +static int
> +applier_handle_raft(struct applier *applier, struct xrow_header *row)
> +{
> + assert(iproto_type_is_raft_request(row->type));
> +
> + struct raft_request req;
> + struct vclock candidate_clock;
> + if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
> + return -1;
> + raft_process_msg(&req, applier->instance_id);
> + return 0;
> +}
> +
> /**
> * Apply all rows in the rows queue as a single transaction.
> *
> @@ -1219,11 +1235,20 @@ applier_subscribe(struct applier *applier)
> * In case of an heartbeat message wake a writer up
> * and check applier state.
> */
> - if (stailq_first_entry(&rows, struct applier_tx_row,
> - next)->row.lsn == 0)
> + struct xrow_header *first_row =
> + &stailq_first_entry(&rows, struct applier_tx_row,
> + next)->row;
> + if (first_row->lsn == 0) {
> + if (unlikely(iproto_type_is_raft_request(
> + first_row->type))) {
> + if (applier_handle_raft(applier,
> + first_row) != 0)
> + diag_raise();
> + }
> applier_signal_ack(applier);
> - else if (applier_apply_tx(&rows) != 0)
> + } else if (applier_apply_tx(&rows) != 0) {
> diag_raise();
> + }
>
> if (ibuf_used(ibuf) == 0)
> ibuf_reset(ibuf);
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 7c3c895d2..980754d1d 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
> }
> if (iproto_type_is_raft_request(row->type)) {
> struct raft_request raft_req;
> - if (xrow_decode_raft(row, &raft_req) != 0)
> + /* Vclock is never persisted in WAL by Raft. */
> + if (xrow_decode_raft(row, &raft_req, NULL) != 0)
> diag_raise();
> raft_process_recovery(&raft_req);
> return;
> @@ -2132,7 +2133,23 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
> tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
> say_info("remote vclock %s local vclock %s",
> vclock_to_string(&replica_clock), vclock_to_string(&vclock));
> -
> + if (raft_is_enabled()) {
> + /*
> + * Send out the current raft state of the instance. Don't do
> + * that if Raft is disabled. It can be that a part of the
> + * cluster still contains old versions, which can't handle Raft
> + * messages. So when it is disabled, its network footprint
> + * should be 0.
> + */
> + struct raft_request req;
> + /*
> + * Omit the candidate vclock, since we've just sent it in
> + * subscribe response.
> + */
> + raft_serialize_for_network(&req, NULL);
> + xrow_encode_raft(&row, &fiber()->gc, &req);
> + coio_write_xrow(io, &row);
> + }
> /*
> * Replica clock is used in gc state and recovery
> * initialization, so we need to replace the remote 0-th
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 8a11626b3..3ec397d3c 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -263,6 +263,8 @@ extern const char *iproto_type_strs[];
> enum iproto_raft_keys {
> IPROTO_RAFT_TERM = 0,
> IPROTO_RAFT_VOTE = 1,
> + IPROTO_RAFT_STATE = 2,
> + IPROTO_RAFT_VCLOCK = 3,
> };
>
> /**
> diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> index 5ab2cf266..166fe3136 100644
> --- a/src/box/memtx_engine.c
> +++ b/src/box/memtx_engine.c
> @@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row)
> {
> assert(row->type == IPROTO_RAFT);
> struct raft_request req;
> - if (xrow_decode_raft(row, &req) != 0)
> + /* Vclock is never persisted by Raft, neither in WAL nor in snapshot. */
> + if (xrow_decode_raft(row, &req, NULL) != 0)
> return -1;
> raft_process_recovery(&req);
> return 0;
> diff --git a/src/box/raft.c b/src/box/raft.c
> index ee54d02b7..4d3d07c48 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -34,9 +34,20 @@
> #include "journal.h"
> #include "xrow.h"
> #include "small/region.h"
> +#include "replication.h"
> +#include "relay.h"
> +
> +const char *raft_state_strs[] = {
> + NULL,
> + "follower",
> + "candidate",
> + "leader",
> +};
>
> /** Raft state of this instance. */
> struct raft raft = {
> + .leader = 0,
> + .state = RAFT_STATE_FOLLOWER,
> .is_enabled = false,
> .is_candidate = false,
> .term = 1,
> @@ -50,18 +61,66 @@ raft_process_recovery(const struct raft_request *req)
> raft.term = req->term;
> if (req->vote != 0)
> raft.vote = req->vote;
> + /*
> + * Role is never persisted. If recovery is happening, the
> + * node was restarted, and the former role may no longer be
> + * valid anyway.
> + */
> + assert(req->state == 0);
> + /*
> + * Vclock is always persisted by some other subsystem - WAL, snapshot.
> + * It is used only to decide to whom to give the vote during election,
> + * as a part of the volatile state.
> + */
> + assert(req->vclock == NULL);
> + /* Raft is not enabled until recovery is finished. */
> + assert(!raft_is_enabled());
> }
>
> void
> -raft_serialize_for_network(struct raft_request *req)
> +raft_process_msg(const struct raft_request *req, uint32_t source)
> {
> + (void)source;
> + if (req->term > raft.term) {
> + // Update term.
> + // The logic will be similar, but the code
> + // below is for testing purposes.
> + raft.term = req->term;
> + }
> + if (req->vote > 0) {
> + // Check whether the vote's for us.
> + }
> + switch (req->state) {
> + case RAFT_STATE_FOLLOWER:
> + break;
> + case RAFT_STATE_CANDIDATE:
> + // Perform voting logic.
> + break;
> + case RAFT_STATE_LEADER:
> + // Switch to a new leader.
> + break;
> + default:
> + break;
> + }
> +}
> +
> +void
> +raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
> +{
> + memset(req, 0, sizeof(*req));
> req->term = raft.term;
> req->vote = raft.vote;
> + req->state = raft.state;
> + /*
> + * Raft does not own vclock, so it always expects it passed externally.
> + */
> + req->vclock = vclock;
> }
>
> void
> raft_serialize_for_disk(struct raft_request *req)
> {
> + memset(req, 0, sizeof(*req));
> req->term = raft.term;
> req->vote = raft.vote;
> }
> @@ -93,3 +152,14 @@ void
> raft_cfg_death_timeout(void)
> {
> }
> +
> +void
> +raft_broadcast(const struct raft_request *req)
> +{
> + replicaset_foreach(replica) {
> + if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
> + relay_get_state(replica->relay) == RELAY_FOLLOW) {
> + relay_push_raft(replica->relay, req);
> + }
> + }
> +}
> diff --git a/src/box/raft.h b/src/box/raft.h
> index f27222752..db64cf933 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -37,8 +37,19 @@ extern "C" {
> #endif
>
> struct raft_request;
> +struct vclock;
> +
> +enum raft_state {
> + RAFT_STATE_FOLLOWER = 1,
> + RAFT_STATE_CANDIDATE = 2,
> + RAFT_STATE_LEADER = 3,
> +};
> +
> +extern const char *raft_state_strs[];
>
> struct raft {
> + uint32_t leader;
> + enum raft_state state;
> bool is_enabled;
> bool is_candidate;
> uint64_t term;
> @@ -48,10 +59,25 @@ struct raft {
>
> extern struct raft raft;
>
> +/** Check if Raft is enabled. */
> +static inline bool
> +raft_is_enabled(void)
> +{
> + return raft.is_enabled;
> +}
> +
> /** Process a raft entry stored in WAL/snapshot. */
> void
> raft_process_recovery(const struct raft_request *req);
>
> +/**
> + * Process a raft status message coming from the network.
> + * @param req Raft request.
> + * @param source Instance ID of the message sender.
> + */
> +void
> +raft_process_msg(const struct raft_request *req, uint32_t source);
> +
> /** Configure whether Raft is enabled. */
> void
> raft_cfg_is_enabled(bool is_enabled);
> @@ -88,7 +114,7 @@ raft_cfg_death_timeout(void);
> * cluster. It is allowed to save anything here, not only persistent state.
> */
> void
> -raft_serialize_for_network(struct raft_request *req);
> +raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
>
> /**
> * Save complete Raft state into a request to be persisted on disk. Only term
> @@ -97,6 +123,13 @@ raft_serialize_for_network(struct raft_request *req);
> void
> raft_serialize_for_disk(struct raft_request *req);
>
> +/**
> + * Broadcast the changes in this instance's raft status to all
> + * the followers.
> + */
> +void
> +raft_broadcast(const struct raft_request *req);
> +
> #if defined(__cplusplus)
> }
> #endif
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 124b0f52f..74581db9c 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -53,6 +53,7 @@
> #include "xstream.h"
> #include "wal.h"
> #include "txn_limbo.h"
> +#include "raft.h"
>
> /**
> * Cbus message to send status updates from relay to tx thread.
> @@ -770,13 +771,68 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
> relay_send(relay, row);
> }
>
> +struct relay_raft_msg {
> + struct cmsg base;
> + struct cmsg_hop route;
> + struct raft_request req;
> + struct vclock vclock;
> + struct relay *relay;
> +};
> +
> +static void
> +relay_raft_msg_push(struct cmsg *base)
> +{
> + struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
> + struct xrow_header row;
> + xrow_encode_raft(&row, &fiber()->gc, &msg->req);
> + try {
> + relay_send(msg->relay, &row);
> + } catch (Exception *e) {
> + relay_set_error(msg->relay, e);
> + fiber_cancel(fiber());
> + }
> + free(msg);
> +}
> +
> +void
> +relay_push_raft(struct relay *relay, const struct raft_request *req)
> +{
> + /*
> + * XXX: the message should be preallocated. It should
> + * work like Kharon in IProto. Relay should have 2 raft
> + * messages rotating. When one is sent, the other can be
> + * updated and a flag is set. When the first message is
> + * sent, the control returns to TX thread, sees the set
> + * flag, rotates the buffers, and sends it again. And so
> + * on. This is how it can work in future, with 0 heap
> + * allocations. Current solution with alloc-per-update is
> + * good enough as a start. Another option - wait until all
> + * is moved to WAL thread, where this will all happen
> + * in one thread and will be much simpler.
> + */
> + struct relay_raft_msg *msg =
> + (struct relay_raft_msg *)malloc(sizeof(*msg));
> + if (msg == NULL) {
> + panic("Couldn't allocate raft message");
> + return;
> + }
> + msg->req = *req;
> + if (req->vclock != NULL) {
> + msg->req.vclock = &msg->vclock;
> + vclock_copy(&msg->vclock, req->vclock);
> + }
> + msg->route.f = relay_raft_msg_push;
> + msg->route.pipe = NULL;
> + cmsg_init(&msg->base, &msg->route);
> + msg->relay = relay;
> + cpipe_push(&relay->relay_pipe, &msg->base);
> +}
> +
> /** Send a single row to the client. */
> static void
> relay_send_row(struct xstream *stream, struct xrow_header *packet)
> {
> struct relay *relay = container_of(stream, struct relay, stream);
> - assert(iproto_type_is_dml(packet->type) ||
> - iproto_type_is_synchro_request(packet->type));
> if (packet->group_id == GROUP_LOCAL) {
> /*
> * We do not relay replica-local rows to other
> @@ -793,6 +849,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
> packet->group_id = GROUP_DEFAULT;
> packet->bodycnt = 0;
> }
> + assert(iproto_type_is_dml(packet->type) ||
> + iproto_type_is_synchro_request(packet->type));
> /* Check if the rows from the instance are filtered. */
> if ((1 << packet->replica_id & relay->id_filter) != 0)
> return;
> diff --git a/src/box/relay.h b/src/box/relay.h
> index 0632fa912..b32e2ea2a 100644
> --- a/src/box/relay.h
> +++ b/src/box/relay.h
> @@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay);
> double
> relay_last_row_time(const struct relay *relay);
>
> +/**
> + * Send a Raft update request to the relay channel. It is not
> + * guaranteed that it will be delivered. The connection may break.
> + */
> +void
> +relay_push_raft(struct relay *relay, const struct raft_request *req);
> +
> #if defined(__cplusplus)
> } /* extern "C" */
> #endif /* defined(__cplusplus) */
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index 1923bacfc..11fdacc0d 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -958,11 +958,30 @@ int
> xrow_encode_raft(struct xrow_header *row, struct region *region,
> const struct raft_request *r)
> {
> - size_t size = mp_sizeof_map(2) +
> - mp_sizeof_uint(IPROTO_RAFT_TERM) +
> - mp_sizeof_uint(r->term) +
> - mp_sizeof_uint(IPROTO_RAFT_VOTE) +
> - mp_sizeof_uint(r->vote);
> + /*
> + * Term is always encoded. The rest can sometimes be ignored
> + * entirely if the term is too old.
> + */
> + int map_size = 1;
> + size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
> + mp_sizeof_uint(r->term);
> + if (r->vote != 0) {
> + ++map_size;
> + size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
> + mp_sizeof_uint(r->vote);
> + }
> + if (r->state != 0) {
> + ++map_size;
> + size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
> + mp_sizeof_uint(r->state);
> + }
> + if (r->vclock != NULL) {
> + ++map_size;
> + size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
> + mp_sizeof_vclock_ignore0(r->vclock);
> + }
> + size += mp_sizeof_map(map_size);
> +
> char *buf = region_alloc(region, size);
> if (buf == NULL) {
> diag_set(OutOfMemory, size, "region_alloc", "buf");
> @@ -974,40 +993,78 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
> row->body[0].iov_len = size;
> row->group_id = GROUP_LOCAL;
> row->bodycnt = 1;
> - buf = mp_encode_map(buf, 2);
> + buf = mp_encode_map(buf, map_size);
> buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
> buf = mp_encode_uint(buf, r->term);
> - buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
> - buf = mp_encode_uint(buf, r->vote);
> + if (r->vote != 0) {
> + buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
> + buf = mp_encode_uint(buf, r->vote);
> + }
> + if (r->state != 0) {
> + buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
> + buf = mp_encode_uint(buf, r->state);
> + }
> + if (r->vclock != NULL) {
> + buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
> + buf = mp_encode_vclock_ignore0(buf, r->vclock);
> + }
> return 0;
> }
>
> int
> -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
> +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
> + struct vclock *vclock)
> {
> - /* TODO: handle bad format. */
> assert(row->type == IPROTO_RAFT);
> - assert(row->bodycnt == 1);
> - assert(row->group_id == GROUP_LOCAL);
> + if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
> + diag_set(ClientError, ER_INVALID_MSGPACK,
> + "malformed raft request");
> + return -1;
> + }
> memset(r, 0, sizeof(*r));
> - const char *pos = row->body[0].iov_base;
> + r->vclock = vclock;
> +
> + const char *begin = row->body[0].iov_base;
> + const char *end = begin + row->body[0].iov_len;
> + const char *pos = begin;
> uint32_t map_size = mp_decode_map(&pos);
> for (uint32_t i = 0; i < map_size; ++i)
> {
> + if (mp_typeof(*pos) != MP_UINT)
> + goto bad_msgpack;
> uint64_t key = mp_decode_uint(&pos);
> switch (key) {
> case IPROTO_RAFT_TERM:
> + if (mp_typeof(*pos) != MP_UINT)
> + goto bad_msgpack;
> r->term = mp_decode_uint(&pos);
> break;
> case IPROTO_RAFT_VOTE:
> + if (mp_typeof(*pos) != MP_UINT)
> + goto bad_msgpack;
> r->vote = mp_decode_uint(&pos);
> break;
> + case IPROTO_RAFT_STATE:
> + if (mp_typeof(*pos) != MP_UINT)
> + goto bad_msgpack;
> + r->state = mp_decode_uint(&pos);
> + break;
> + case IPROTO_RAFT_VCLOCK:
> + if (r->vclock == NULL)
> + mp_next(&pos);
> + else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0)
> + goto bad_msgpack;
> + break;
> default:
> mp_next(&pos);
> break;
> }
> }
> return 0;
> +
> +bad_msgpack:
> + xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
> + return -1;
> }
>
> int
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index c234f6f88..c627102dd 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
> struct raft_request {
> uint64_t term;
> uint32_t vote;
> + uint32_t state;
> + struct vclock *vclock;
> };
>
> int
> @@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
> const struct raft_request *r);
>
> int
> -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
> +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
> + struct vclock *vclock);
>
> /**
> * CALL/EVAL request.
--
Serge Petrenko
More information about the Tarantool-patches mailing list