From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org, gorcunov@gmail.com Subject: [Tarantool-patches] [PATCH v2 07/10] raft: relay status updates to followers Date: Fri, 4 Sep 2020 00:51:22 +0200 [thread overview] Message-ID: <42afaa9ac42dbd911582074a8073a643e52c051f.1599173312.git.v.shpilevoy@tarantool.org> (raw) In-Reply-To: <cover.1599173312.git.v.shpilevoy@tarantool.org> From: sergepetrenko <sergepetrenko@tarantool.org> The patch introduces a new type of system message used to notify the followers of the instance's raft status updates. It's relay's responsibility to deliver the new system rows to its peers. The notification system reuses and extends the same row type used to persist raft state in WAL and snapshot. Part of #1146 Part of #5204 --- src/box/applier.cc | 32 +++++++++++++-- src/box/box.cc | 21 +++++++++- src/box/iproto_constants.h | 2 + src/box/memtx_engine.c | 3 +- src/box/raft.c | 71 +++++++++++++++++++++++++++++++- src/box/raft.h | 31 +++++++++++++- src/box/relay.cc | 62 +++++++++++++++++++++++++++- src/box/relay.h | 7 ++++ src/box/xrow.c | 83 ++++++++++++++++++++++++++++++++------ src/box/xrow.h | 5 ++- 10 files changed, 293 insertions(+), 24 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 699b5a683..53db97d6d 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -55,6 +55,7 @@ #include "scoped_guard.h" #include "txn_limbo.h" #include "journal.h" +#include "raft.h" STRS(applier_state, applier_STATE); @@ -315,6 +316,8 @@ apply_final_join_row(struct xrow_header *row) */ if (iproto_type_is_synchro_request(row->type)) return 0; + if (iproto_type_is_raft_request(row->type)) + return 0; struct txn *txn = txn_begin(); if (txn == NULL) return -1; @@ -894,6 +897,21 @@ err: return -1; } +static int +apply_raft_row(struct xrow_header *row) +{ + assert(iproto_type_is_raft_request(row->type)); + + struct raft_request req; + struct vclock candidate_clock; + if (xrow_decode_raft(row, &req, &candidate_clock) != 0) + return -1; + + raft_process_msg(&req); + + return 0; +} + /** * Apply all rows in the rows queue as a single transaction. * @@ -1238,11 +1256,19 @@ applier_subscribe(struct applier *applier) * In case of an heartbeat message wake a writer up * and check applier state. */ - if (stailq_first_entry(&rows, struct applier_tx_row, - next)->row.lsn == 0) + struct xrow_header *first_row = + &stailq_first_entry(&rows, struct applier_tx_row, + next)->row; + if (first_row->lsn == 0) { + if (unlikely(iproto_type_is_raft_request( + first_row->type))) { + if (apply_raft_row(first_row) != 0) + diag_raise(); + } applier_signal_ack(applier); - else if (applier_apply_tx(&rows) != 0) + } else if (applier_apply_tx(&rows) != 0) { diag_raise(); + } if (ibuf_used(ibuf) == 0) ibuf_reset(ibuf); diff --git a/src/box/box.cc b/src/box/box.cc index 5f04a1a78..6f85e734a 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) } if (iproto_type_is_raft_request(row->type)) { struct raft_request raft_req; - if (xrow_decode_raft(row, &raft_req) != 0) + /* Vclock is never persisted in WAL by Raft. */ + if (xrow_decode_raft(row, &raft_req, NULL) != 0) diag_raise(); raft_process_recovery(&raft_req); return; @@ -2132,7 +2133,23 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) tt_uuid_str(&replica_uuid), sio_socketname(io->fd)); say_info("remote vclock %s local vclock %s", vclock_to_string(&replica_clock), vclock_to_string(&vclock)); - + if (raft_is_enabled()) { + /* + * Send out the current raft state of the instance. Don't do + * that if Raft is disabled. It can be that a part of the + * cluster still contains old versions, which can't handle Raft + * messages. So when it is disabled, its network footprint + * should be 0. + */ + struct raft_request req; + /* + * Omit the candidate vclock, since we've just sent it in + * subscribe response. + */ + raft_serialize_for_network(&req, NULL); + xrow_encode_raft(&row, &fiber()->gc, &req); + coio_write_xrow(io, &row); + } /* * Replica clock is used in gc state and recovery * initialization, so we need to replace the remote 0-th diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 8a11626b3..3ec397d3c 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -263,6 +263,8 @@ extern const char *iproto_type_strs[]; enum iproto_raft_keys { IPROTO_RAFT_TERM = 0, IPROTO_RAFT_VOTE = 1, + IPROTO_RAFT_STATE = 2, + IPROTO_RAFT_VCLOCK = 3, }; /** diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index 7ebed7aa8..fe7ae9f63 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row) { assert(row->type == IPROTO_RAFT); struct raft_request req; - if (xrow_decode_raft(row, &req) != 0) + /* Vclock is never persisted in WAL by Raft. */ + if (xrow_decode_raft(row, &req, NULL) != 0) return -1; raft_process_recovery(&req); return 0; diff --git a/src/box/raft.c b/src/box/raft.c index ee54d02b7..7697809ee 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -34,9 +34,20 @@ #include "journal.h" #include "xrow.h" #include "small/region.h" +#include "replication.h" +#include "relay.h" + +const char *raft_state_strs[] = { + NULL, + "follower", + "candidate", + "leader", +}; /** Raft state of this instance. */ struct raft raft = { + .leader = 0, + .state = RAFT_STATE_FOLLOWER, .is_enabled = false, .is_candidate = false, .term = 1, @@ -50,18 +61,65 @@ raft_process_recovery(const struct raft_request *req) raft.term = req->term; if (req->vote != 0) raft.vote = req->vote; + /* + * Role is never persisted. If recovery is happening, the + * node was restarted, and the former role can be false + * anyway. + */ + assert(req->state == 0); + /* + * Vclock is always persisted by some other subsystem - WAL, snapshot. + * It is used only to decide to whom to give the vote during election, + * as a part of the volatile state. + */ + assert(req->vclock == NULL); + /* Raft is not enabled until recovery is finished. */ + assert(!raft_is_enabled()); } void -raft_serialize_for_network(struct raft_request *req) +raft_process_msg(const struct raft_request *req) { + if (req->term > raft.term) { + // Update term. + // The logic will be similar, but the code + // below is for testing purposes. + raft.term = req->term; + } + if (req->vote > 0) { + // Check whether the vote's for us. + } + switch (req->state) { + case RAFT_STATE_FOLLOWER: + break; + case RAFT_STATE_CANDIDATE: + // Perform voting logic. + break; + case RAFT_STATE_LEADER: + // Switch to a new leader. + break; + default: + break; + } +} + +void +raft_serialize_for_network(struct raft_request *req, struct vclock *vclock) +{ + memset(req, 0, sizeof(*req)); req->term = raft.term; req->vote = raft.vote; + req->state = raft.state; + /* + * Raft does not own vclock, so it always expects it passed externally. + */ + req->vclock = vclock; } void raft_serialize_for_disk(struct raft_request *req) { + memset(req, 0, sizeof(*req)); req->term = raft.term; req->vote = raft.vote; } @@ -93,3 +151,14 @@ void raft_cfg_death_timeout(void) { } + +void +raft_broadcast(const struct raft_request *req) +{ + replicaset_foreach(replica) { + if (replica->relay != NULL && replica->id != REPLICA_ID_NIL && + relay_get_state(replica->relay) == RELAY_FOLLOW) { + relay_push_raft(replica->relay, req); + } + } +} diff --git a/src/box/raft.h b/src/box/raft.h index f27222752..e3261454b 100644 --- a/src/box/raft.h +++ b/src/box/raft.h @@ -37,8 +37,19 @@ extern "C" { #endif struct raft_request; +struct vclock; + +enum raft_state { + RAFT_STATE_FOLLOWER = 1, + RAFT_STATE_CANDIDATE = 2, + RAFT_STATE_LEADER = 3, +}; + +extern const char *raft_state_strs[]; struct raft { + uint32_t leader; + enum raft_state state; bool is_enabled; bool is_candidate; uint64_t term; @@ -48,10 +59,21 @@ struct raft { extern struct raft raft; +/** Check if Raft is enabled. */ +static inline bool +raft_is_enabled(void) +{ + return raft.is_enabled; +} + /** Process a raft entry stored in WAL/snapshot. */ void raft_process_recovery(const struct raft_request *req); +/** Process a raft status message coming from the network. */ +void +raft_process_msg(const struct raft_request *req); + /** Configure whether Raft is enabled. */ void raft_cfg_is_enabled(bool is_enabled); @@ -88,7 +110,7 @@ raft_cfg_death_timeout(void); * cluster. It is allowed to save anything here, not only persistent state. */ void -raft_serialize_for_network(struct raft_request *req); +raft_serialize_for_network(struct raft_request *req, struct vclock *vclock); /** * Save complete Raft state into a request to be persisted on disk. Only term @@ -97,6 +119,13 @@ raft_serialize_for_network(struct raft_request *req); void raft_serialize_for_disk(struct raft_request *req); +/** + * Broadcast the changes in this instance's raft status to all + * the followers. + */ +void +raft_broadcast(const struct raft_request *req); + #if defined(__cplusplus) } #endif diff --git a/src/box/relay.cc b/src/box/relay.cc index 124b0f52f..74581db9c 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -53,6 +53,7 @@ #include "xstream.h" #include "wal.h" #include "txn_limbo.h" +#include "raft.h" /** * Cbus message to send status updates from relay to tx thread. @@ -770,13 +771,68 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) relay_send(relay, row); } +struct relay_raft_msg { + struct cmsg base; + struct cmsg_hop route; + struct raft_request req; + struct vclock vclock; + struct relay *relay; +}; + +static void +relay_raft_msg_push(struct cmsg *base) +{ + struct relay_raft_msg *msg = (struct relay_raft_msg *)base; + struct xrow_header row; + xrow_encode_raft(&row, &fiber()->gc, &msg->req); + try { + relay_send(msg->relay, &row); + } catch (Exception *e) { + relay_set_error(msg->relay, e); + fiber_cancel(fiber()); + } + free(msg); +} + +void +relay_push_raft(struct relay *relay, const struct raft_request *req) +{ + /* + * XXX: the message should be preallocated. It should + * work like Kharon in IProto. Relay should have 2 raft + * messages rotating. When one is sent, the other can be + * updated and a flag is set. When the first message is + * sent, the control returns to TX thread, sees the set + * flag, rotates the buffers, and sends it again. And so + * on. This is how it can work in future, with 0 heap + * allocations. Current solution with alloc-per-update is + * good enough as a start. Another option - wait until all + * is moved to WAL thread, where this will all happen + * in one thread and will be much simpler. + */ + struct relay_raft_msg *msg = + (struct relay_raft_msg *)malloc(sizeof(*msg)); + if (msg == NULL) { + panic("Couldn't allocate raft message"); + return; + } + msg->req = *req; + if (req->vclock != NULL) { + msg->req.vclock = &msg->vclock; + vclock_copy(&msg->vclock, req->vclock); + } + msg->route.f = relay_raft_msg_push; + msg->route.pipe = NULL; + cmsg_init(&msg->base, &msg->route); + msg->relay = relay; + cpipe_push(&relay->relay_pipe, &msg->base); +} + /** Send a single row to the client. */ static void relay_send_row(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); - assert(iproto_type_is_dml(packet->type) || - iproto_type_is_synchro_request(packet->type)); if (packet->group_id == GROUP_LOCAL) { /* * We do not relay replica-local rows to other @@ -793,6 +849,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) packet->group_id = GROUP_DEFAULT; packet->bodycnt = 0; } + assert(iproto_type_is_dml(packet->type) || + iproto_type_is_synchro_request(packet->type)); /* Check if the rows from the instance are filtered. */ if ((1 << packet->replica_id & relay->id_filter) != 0) return; diff --git a/src/box/relay.h b/src/box/relay.h index 0632fa912..b32e2ea2a 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay); double relay_last_row_time(const struct relay *relay); +/** + * Send a Raft update request to the relay channel. It is not + * guaranteed that it will be delivered. The connection may break. + */ +void +relay_push_raft(struct relay *relay, const struct raft_request *req); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/xrow.c b/src/box/xrow.c index 1923bacfc..11fdacc0d 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -958,11 +958,30 @@ int xrow_encode_raft(struct xrow_header *row, struct region *region, const struct raft_request *r) { - size_t size = mp_sizeof_map(2) + - mp_sizeof_uint(IPROTO_RAFT_TERM) + - mp_sizeof_uint(r->term) + - mp_sizeof_uint(IPROTO_RAFT_VOTE) + - mp_sizeof_uint(r->vote); + /* + * Terms is encoded always. Sometimes the rest can be even ignored if + * the term is too old. + */ + int map_size = 1; + size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) + + mp_sizeof_uint(r->term); + if (r->vote != 0) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_VOTE) + + mp_sizeof_uint(r->vote); + } + if (r->state != 0) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_STATE) + + mp_sizeof_uint(r->state); + } + if (r->vclock != NULL) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) + + mp_sizeof_vclock_ignore0(r->vclock); + } + size += mp_sizeof_map(map_size); + char *buf = region_alloc(region, size); if (buf == NULL) { diag_set(OutOfMemory, size, "region_alloc", "buf"); @@ -974,40 +993,78 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, row->body[0].iov_len = size; row->group_id = GROUP_LOCAL; row->bodycnt = 1; - buf = mp_encode_map(buf, 2); + buf = mp_encode_map(buf, map_size); buf = mp_encode_uint(buf, IPROTO_RAFT_TERM); buf = mp_encode_uint(buf, r->term); - buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); - buf = mp_encode_uint(buf, r->vote); + if (r->vote != 0) { + buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); + buf = mp_encode_uint(buf, r->vote); + } + if (r->state != 0) { + buf = mp_encode_uint(buf, IPROTO_RAFT_STATE); + buf = mp_encode_uint(buf, r->state); + } + if (r->vclock != NULL) { + buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK); + buf = mp_encode_vclock_ignore0(buf, r->vclock); + } return 0; } int -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r) +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, + struct vclock *vclock) { - /* TODO: handle bad format. */ assert(row->type == IPROTO_RAFT); - assert(row->bodycnt == 1); - assert(row->group_id == GROUP_LOCAL); + if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) { + diag_set(ClientError, ER_INVALID_MSGPACK, + "malformed raft request"); + return -1; + } memset(r, 0, sizeof(*r)); - const char *pos = row->body[0].iov_base; + r->vclock = vclock; + + const char *begin = row->body[0].iov_base; + const char *end = begin + row->body[0].iov_len; + const char *pos = begin; uint32_t map_size = mp_decode_map(&pos); for (uint32_t i = 0; i < map_size; ++i) { + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; uint64_t key = mp_decode_uint(&pos); switch (key) { case IPROTO_RAFT_TERM: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; r->term = mp_decode_uint(&pos); break; case IPROTO_RAFT_VOTE: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; r->vote = mp_decode_uint(&pos); break; + case IPROTO_RAFT_STATE: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; + r->state = mp_decode_uint(&pos); + break; + case IPROTO_RAFT_VCLOCK: + if (r->vclock == NULL) + mp_next(&pos); + else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0) + goto bad_msgpack; + break; default: mp_next(&pos); break; } } return 0; + +bad_msgpack: + xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body"); + return -1; } int diff --git a/src/box/xrow.h b/src/box/xrow.h index c234f6f88..c627102dd 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); struct raft_request { uint64_t term; uint32_t vote; + uint32_t state; + struct vclock *vclock; }; int @@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, const struct raft_request *r); int -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r); +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, + struct vclock *vclock); /** * CALL/EVAL request. -- 2.21.1 (Apple Git-122.3)
next prev parent reply other threads:[~2020-09-03 22:51 UTC|newest] Thread overview: 27+ messages / expand[flat|nested] mbox.gz Atom feed top 2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy 2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 01/10] applier: store instance_id in struct applier Vladislav Shpilevoy 2020-09-04 8:13 ` Serge Petrenko 2020-09-07 22:54 ` Vladislav Shpilevoy 2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 10/10] raft: introduce box.info.raft Vladislav Shpilevoy 2020-09-04 11:38 ` Serge Petrenko 2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 02/10] box: introduce summary RO flag Vladislav Shpilevoy 2020-09-04 8:17 ` Serge Petrenko 2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 03/10] wal: don't touch box.cfg.wal_dir more than once Vladislav Shpilevoy 2020-09-04 8:20 ` Serge Petrenko 2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count Vladislav Shpilevoy 2020-09-04 8:24 ` Serge Petrenko 2020-09-07 22:54 ` Vladislav Shpilevoy 2020-09-07 15:45 ` Sergey Ostanevich 2020-09-07 22:54 ` Vladislav Shpilevoy 2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 05/10] raft: introduce persistent raft state Vladislav Shpilevoy 2020-09-04 8:59 ` Serge Petrenko 2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 06/10] raft: introduce box.cfg.raft_* options Vladislav Shpilevoy 2020-09-04 9:07 ` Serge Petrenko 2020-09-07 22:55 ` Vladislav Shpilevoy 2020-09-03 22:51 ` Vladislav Shpilevoy [this message] 2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 08/10] [tosquash] raft: pass source instance_id to raft_process_msg() Vladislav Shpilevoy 2020-09-04 9:22 ` Serge Petrenko 2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 09/10] raft: introduce state machine Vladislav Shpilevoy 2020-09-04 11:36 ` Serge Petrenko 2020-09-07 22:57 ` Vladislav Shpilevoy 2020-09-09 8:04 ` Serge Petrenko
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=42afaa9ac42dbd911582074a8073a643e52c051f.1599173312.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=gorcunov@gmail.com \ --cc=sergepetrenko@tarantool.org \ --cc=tarantool-patches@dev.tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH v2 07/10] raft: relay status updates to followers' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox