From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org, gorcunov@gmail.com Subject: [Tarantool-patches] [PATCH 6/8] raft: relay status updates to followers Date: Thu, 3 Sep 2020 01:33:16 +0200 [thread overview] Message-ID: <42310d7fc9a15c94e14f080c066b8eee3e4991c7.1599089353.git.v.shpilevoy@tarantool.org> (raw) In-Reply-To: <cover.1599089353.git.v.shpilevoy@tarantool.org> From: sergepetrenko <sergepetrenko@tarantool.org> The patch introduces a new type of system message used to notify the followers of the instance's raft status updates. It's relay's responsibility to deliver the new system rows to its peers. The notification system reuses and extends the same row type used to persist raft state in WAL and snapshot. Part of #1146 Part of #5204 --- src/box/applier.cc | 35 ++++++++++++++-- src/box/box.cc | 21 +++++++++- src/box/iproto_constants.h | 2 + src/box/memtx_engine.c | 9 ++++- src/box/raft.c | 73 ++++++++++++++++++++++++++++++++- src/box/raft.h | 35 +++++++++++++++- src/box/relay.cc | 62 +++++++++++++++++++++++++++- src/box/relay.h | 7 ++++ src/box/xrow.c | 83 ++++++++++++++++++++++++++++++++------ src/box/xrow.h | 5 ++- 10 files changed, 306 insertions(+), 26 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 699b5a683..17e3ce1ae 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -55,6 +55,7 @@ #include "scoped_guard.h" #include "txn_limbo.h" #include "journal.h" +#include "raft.h" STRS(applier_state, applier_STATE); @@ -315,6 +316,8 @@ apply_final_join_row(struct xrow_header *row) */ if (iproto_type_is_synchro_request(row->type)) return 0; + if (iproto_type_is_raft_request(row->type)) + return 0; struct txn *txn = txn_begin(); if (txn == NULL) return -1; @@ -894,6 +897,21 @@ err: return -1; } +static int +apply_raft_row(struct xrow_header *row) +{ + assert(iproto_type_is_raft_request(row->type)); + + struct raft_request req; + struct vclock candidate_clock; + if 
(xrow_decode_raft(row, &req, &candidate_clock) != 0) + return -1; + + raft_process_msg(&req); + + return 0; +} + /** * Apply all rows in the rows queue as a single transaction. * @@ -1238,11 +1256,20 @@ applier_subscribe(struct applier *applier) * In case of an heartbeat message wake a writer up * and check applier state. */ - if (stailq_first_entry(&rows, struct applier_tx_row, - next)->row.lsn == 0) - applier_signal_ack(applier); - else if (applier_apply_tx(&rows) != 0) + struct xrow_header *first_row = + &stailq_first_entry(&rows, struct applier_tx_row, + next)->row; + if (first_row->lsn == 0) { + if (unlikely(iproto_type_is_raft_request( + first_row->type))) { + if (apply_raft_row(first_row) != 0) + diag_raise(); + } else { + applier_signal_ack(applier); + } + } else if (applier_apply_tx(&rows) != 0) { diag_raise(); + } if (ibuf_used(ibuf) == 0) ibuf_reset(ibuf); diff --git a/src/box/box.cc b/src/box/box.cc index 5f04a1a78..427b771b3 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) } if (iproto_type_is_raft_request(row->type)) { struct raft_request raft_req; - if (xrow_decode_raft(row, &raft_req) != 0) + /* Vclock is never persisted in WAL by Raft. */ + if (xrow_decode_raft(row, &raft_req, NULL) != 0) diag_raise(); raft_process_recovery(&raft_req); return; @@ -2132,7 +2133,23 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) tt_uuid_str(&replica_uuid), sio_socketname(io->fd)); say_info("remote vclock %s local vclock %s", vclock_to_string(&replica_clock), vclock_to_string(&vclock)); - + if (raft_is_enabled()) { + /* + * Send out the current raft state of the instance. Don't do + * that if Raft is disabled. It can be that a part of the + * cluster still contains old versions, which can't handle Raft + * messages. So when it is disabled, its network footprint + * should be 0. 
+ */ + struct raft_request req; + /* + * Omit the candidate vclock, since we've just + * sent it in subscribe response. + */ + raft_serialize(&req, NULL); + xrow_encode_raft(&row, &fiber()->gc, &req); + coio_write_xrow(io, &row); + } /* * Replica clock is used in gc state and recovery * initialization, so we need to replace the remote 0-th diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 8a11626b3..3ec397d3c 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -263,6 +263,8 @@ extern const char *iproto_type_strs[]; enum iproto_raft_keys { IPROTO_RAFT_TERM = 0, IPROTO_RAFT_VOTE = 1, + IPROTO_RAFT_STATE = 2, + IPROTO_RAFT_VCLOCK = 3, }; /** diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index 31a8c260a..b0b744db8 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row) { assert(row->type == IPROTO_RAFT); struct raft_request req; - if (xrow_decode_raft(row, &req) != 0) + /* Vclock is never persisted in WAL by Raft. */ + if (xrow_decode_raft(row, &req, NULL) != 0) return -1; raft_process_recovery(&req); return 0; @@ -516,7 +517,11 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit) opts.free_cache = true; xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts); vclock_create(&ckpt->vclock); - raft_serialize(&ckpt->raft); + /* + * Don't encode vclock, because it is stored in the snapshot header + * anyway. + */ + raft_serialize(&ckpt->raft, NULL); ckpt->touch = false; return ckpt; } diff --git a/src/box/raft.c b/src/box/raft.c index 6fd5515f4..fd8853d6e 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -34,9 +34,20 @@ #include "journal.h" #include "xrow.h" #include "small/region.h" +#include "replication.h" +#include "relay.h" + +const char *raft_state_strs[] = { + NULL, + "follower", + "candidate", + "leader", +}; /** Raft state of this instance. 
*/ struct raft raft = { + .leader = 0, + .state = RAFT_STATE_FOLLOWER, .is_enabled = false, .is_candidate = false, .term = 1, @@ -50,13 +61,59 @@ raft_process_recovery(const struct raft_request *req) raft.term = req->term; if (req->vote != 0) raft.vote = req->vote; + /* + * Role is never persisted. If recovery is happening, the + * node was restarted, and the former role can be false + * anyway. + */ + assert(req->state == 0); + /* + * Vclock is always persisted by some other subsystem - WAL, snapshot. + * It is used only to decide to whom to give the vote during election, + * as a part of the volatile state. + */ + assert(req->vclock == NULL); + /* Raft is not enabled until recovery is finished. */ + assert(!raft_is_enabled()); +} + +void +raft_process_msg(const struct raft_request *req) +{ + if (req->term > raft.term) { + // Update term. + // The logic will be similar, but the code + // below is for testing purposes. + raft.term = req->term; + } + if (req->vote > 0) { + // Check whether the vote's for us. + } + switch (req->state) { + case RAFT_STATE_FOLLOWER: + break; + case RAFT_STATE_CANDIDATE: + // Perform voting logic. + break; + case RAFT_STATE_LEADER: + // Switch to a new leader. + break; + default: + break; + } } void -raft_serialize(struct raft_request *req) +raft_serialize(struct raft_request *req, struct vclock *vclock) { + memset(req, 0, sizeof(*req)); req->term = raft.term; req->vote = raft.vote; + req->state = raft.state; + /* + * Raft does not own vclock, so it always expects it passed externally. 
+ */ + req->vclock = vclock; } static void @@ -86,6 +143,9 @@ raft_write_request(const struct raft_request *req) diag_log(); goto fail; } + + raft_broadcast(req); + region_truncate(region, svp); return; fail: @@ -148,3 +208,14 @@ raft_vote(uint32_t vote_for) req.vote = vote_for; raft_write_request(&req); } + +void +raft_broadcast(const struct raft_request *req) +{ + replicaset_foreach(replica) { + if (replica->relay != NULL && replica->id != REPLICA_ID_NIL && + relay_get_state(replica->relay) == RELAY_FOLLOW) { + relay_push_raft(replica->relay, req); + } + } +} diff --git a/src/box/raft.h b/src/box/raft.h index 2c4b5036c..0bf87e64b 100644 --- a/src/box/raft.h +++ b/src/box/raft.h @@ -37,8 +37,19 @@ extern "C" { #endif struct raft_request; +struct vclock; + +enum raft_state { + RAFT_STATE_FOLLOWER = 1, + RAFT_STATE_CANDIDATE = 2, + RAFT_STATE_LEADER = 3, +}; + +extern const char *raft_state_strs[]; struct raft { + uint32_t leader; + enum raft_state state; bool is_enabled; bool is_candidate; uint64_t term; @@ -54,9 +65,24 @@ raft_new_term(uint64_t min_new_term); void raft_vote(uint32_t vote_for); +static inline bool +raft_is_enabled(void) +{ + return raft.is_enabled; +} + +/** Process a raft entry stored in WAL/snapshot. */ void raft_process_recovery(const struct raft_request *req); +/** Process a raft status message coming from the network. */ +void +raft_process_msg(const struct raft_request *req); + +/** + * Broadcast the changes in this instance's raft status to all + * the followers. + */ void raft_cfg_is_enabled(bool is_enabled); @@ -74,7 +100,14 @@ raft_cfg_death_timeout(void); /** Save complete Raft state into the request. */ void -raft_serialize(struct raft_request *req); +raft_serialize(struct raft_request *req, struct vclock *vclock); + +/** + * Broadcast the changes in this instance's raft status to all + * the followers. 
+ */ +void +raft_broadcast(const struct raft_request *req); #if defined(__cplusplus) } diff --git a/src/box/relay.cc b/src/box/relay.cc index 124b0f52f..74581db9c 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -53,6 +53,7 @@ #include "xstream.h" #include "wal.h" #include "txn_limbo.h" +#include "raft.h" /** * Cbus message to send status updates from relay to tx thread. @@ -770,13 +771,68 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) relay_send(relay, row); } +struct relay_raft_msg { + struct cmsg base; + struct cmsg_hop route; + struct raft_request req; + struct vclock vclock; + struct relay *relay; +}; + +static void +relay_raft_msg_push(struct cmsg *base) +{ + struct relay_raft_msg *msg = (struct relay_raft_msg *)base; + struct xrow_header row; + xrow_encode_raft(&row, &fiber()->gc, &msg->req); + try { + relay_send(msg->relay, &row); + } catch (Exception *e) { + relay_set_error(msg->relay, e); + fiber_cancel(fiber()); + } + free(msg); +} + +void +relay_push_raft(struct relay *relay, const struct raft_request *req) +{ + /* + * XXX: the message should be preallocated. It should + * work like Kharon in IProto. Relay should have 2 raft + * messages rotating. When one is sent, the other can be + * updated and a flag is set. When the first message is + * sent, the control returns to TX thread, sees the set + * flag, rotates the buffers, and sends it again. And so + * on. This is how it can work in future, with 0 heap + * allocations. Current solution with alloc-per-update is + * good enough as a start. Another option - wait until all + * is moved to WAL thread, where this will all happen + * in one thread and will be much simpler. 
+ */ + struct relay_raft_msg *msg = + (struct relay_raft_msg *)malloc(sizeof(*msg)); + if (msg == NULL) { + panic("Couldn't allocate raft message"); + return; + } + msg->req = *req; + if (req->vclock != NULL) { + msg->req.vclock = &msg->vclock; + vclock_copy(&msg->vclock, req->vclock); + } + msg->route.f = relay_raft_msg_push; + msg->route.pipe = NULL; + cmsg_init(&msg->base, &msg->route); + msg->relay = relay; + cpipe_push(&relay->relay_pipe, &msg->base); +} + /** Send a single row to the client. */ static void relay_send_row(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); - assert(iproto_type_is_dml(packet->type) || - iproto_type_is_synchro_request(packet->type)); if (packet->group_id == GROUP_LOCAL) { /* * We do not relay replica-local rows to other @@ -793,6 +849,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) packet->group_id = GROUP_DEFAULT; packet->bodycnt = 0; } + assert(iproto_type_is_dml(packet->type) || + iproto_type_is_synchro_request(packet->type)); /* Check if the rows from the instance are filtered. */ if ((1 << packet->replica_id & relay->id_filter) != 0) return; diff --git a/src/box/relay.h b/src/box/relay.h index 0632fa912..b32e2ea2a 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay); double relay_last_row_time(const struct relay *relay); +/** + * Send a Raft update request to the relay channel. It is not + * guaranteed that it will be delivered. The connection may break. 
+ */ +void +relay_push_raft(struct relay *relay, const struct raft_request *req); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/xrow.c b/src/box/xrow.c index 1923bacfc..11fdacc0d 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -958,11 +958,30 @@ int xrow_encode_raft(struct xrow_header *row, struct region *region, const struct raft_request *r) { - size_t size = mp_sizeof_map(2) + - mp_sizeof_uint(IPROTO_RAFT_TERM) + - mp_sizeof_uint(r->term) + - mp_sizeof_uint(IPROTO_RAFT_VOTE) + - mp_sizeof_uint(r->vote); + /* + * Terms is encoded always. Sometimes the rest can be even ignored if + * the term is too old. + */ + int map_size = 1; + size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) + + mp_sizeof_uint(r->term); + if (r->vote != 0) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_VOTE) + + mp_sizeof_uint(r->vote); + } + if (r->state != 0) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_STATE) + + mp_sizeof_uint(r->state); + } + if (r->vclock != NULL) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) + + mp_sizeof_vclock_ignore0(r->vclock); + } + size += mp_sizeof_map(map_size); + char *buf = region_alloc(region, size); if (buf == NULL) { diag_set(OutOfMemory, size, "region_alloc", "buf"); @@ -974,40 +993,78 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, row->body[0].iov_len = size; row->group_id = GROUP_LOCAL; row->bodycnt = 1; - buf = mp_encode_map(buf, 2); + buf = mp_encode_map(buf, map_size); buf = mp_encode_uint(buf, IPROTO_RAFT_TERM); buf = mp_encode_uint(buf, r->term); - buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); - buf = mp_encode_uint(buf, r->vote); + if (r->vote != 0) { + buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); + buf = mp_encode_uint(buf, r->vote); + } + if (r->state != 0) { + buf = mp_encode_uint(buf, IPROTO_RAFT_STATE); + buf = mp_encode_uint(buf, r->state); + } + if (r->vclock != NULL) { + buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK); + buf = 
mp_encode_vclock_ignore0(buf, r->vclock); + } return 0; } int -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r) +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, + struct vclock *vclock) { - /* TODO: handle bad format. */ assert(row->type == IPROTO_RAFT); - assert(row->bodycnt == 1); - assert(row->group_id == GROUP_LOCAL); + if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) { + diag_set(ClientError, ER_INVALID_MSGPACK, + "malformed raft request"); + return -1; + } memset(r, 0, sizeof(*r)); - const char *pos = row->body[0].iov_base; + r->vclock = vclock; + + const char *begin = row->body[0].iov_base; + const char *end = begin + row->body[0].iov_len; + const char *pos = begin; uint32_t map_size = mp_decode_map(&pos); for (uint32_t i = 0; i < map_size; ++i) { + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; uint64_t key = mp_decode_uint(&pos); switch (key) { case IPROTO_RAFT_TERM: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; r->term = mp_decode_uint(&pos); break; case IPROTO_RAFT_VOTE: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; r->vote = mp_decode_uint(&pos); break; + case IPROTO_RAFT_STATE: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; + r->state = mp_decode_uint(&pos); + break; + case IPROTO_RAFT_VCLOCK: + if (r->vclock == NULL) + mp_next(&pos); + else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0) + goto bad_msgpack; + break; default: mp_next(&pos); break; } } return 0; + +bad_msgpack: + xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body"); + return -1; } int diff --git a/src/box/xrow.h b/src/box/xrow.h index c234f6f88..c627102dd 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); struct raft_request { uint64_t term; uint32_t vote; + uint32_t state; + struct vclock *vclock; }; int @@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, const 
struct raft_request *r); int -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r); +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, + struct vclock *vclock); /** * CALL/EVAL request. -- 2.21.1 (Apple Git-122.3)
next prev parent reply other threads:[~2020-09-02 23:33 UTC|newest] Thread overview: 10+ messages / expand[flat|nested] mbox.gz Atom feed top 2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy 2020-09-02 23:33 ` [Tarantool-patches] [PATCH 1/8] applier: store instance_id in struct applier Vladislav Shpilevoy 2020-09-02 23:33 ` [Tarantool-patches] [PATCH 2/8] box: introduce summary RO flag Vladislav Shpilevoy 2020-09-02 23:33 ` [Tarantool-patches] [PATCH 3/8] wal: don't touch box.cfg.wal_dir more than once Vladislav Shpilevoy 2020-09-02 23:33 ` [Tarantool-patches] [PATCH 4/8] raft: introduce persistent raft state Vladislav Shpilevoy 2020-09-02 23:33 ` [Tarantool-patches] [PATCH 5/8] raft: introduce box.cfg.raft_* options Vladislav Shpilevoy 2020-09-02 23:33 ` Vladislav Shpilevoy [this message] 2020-09-02 23:33 ` [Tarantool-patches] [PATCH 7/8] [tosquash] raft: pass source instance_id to raft_process_msg() Vladislav Shpilevoy 2020-09-02 23:33 ` [Tarantool-patches] [PATCH 8/8] raft: state machine Vladislav Shpilevoy 2020-09-03 22:51 ` [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=42310d7fc9a15c94e14f080c066b8eee3e4991c7.1599089353.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=gorcunov@gmail.com \ --cc=sergepetrenko@tarantool.org \ --cc=tarantool-patches@dev.tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH 6/8] raft: relay status updates to followers' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.