From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp61.i.mail.ru (smtp61.i.mail.ru [217.69.128.41]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 80A0D469719 for ; Mon, 21 Sep 2020 13:50:41 +0300 (MSK) References: <17979db071a67d8e5f299fd405095dc14a507b3c.1599693319.git.v.shpilevoy@tarantool.org> From: Serge Petrenko Message-ID: <6ca694f2-2e84-fcf3-ca46-15e188bfdf10@tarantool.org> Date: Mon, 21 Sep 2020 13:50:39 +0300 MIME-Version: 1.0 In-Reply-To: <17979db071a67d8e5f299fd405095dc14a507b3c.1599693319.git.v.shpilevoy@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-GB Subject: Re: [Tarantool-patches] [PATCH v2 08/11] raft: relay status updates to followers List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Vladislav Shpilevoy , tarantool-patches@dev.tarantool.org, gorcunov@gmail.com 10.09.2020 02:16, Vladislav Shpilevoy пишет: > From: sergepetrenko > > The patch introduces a new type of system message used to notify the > followers of the instance's raft status updates. > It's relay's responsibility to deliver the new system rows to its peers. > The notification system reuses and extends the same row type used to > persist raft state in WAL and snapshot. 
> > Part of #1146 > Part of #5204 > --- > src/box/applier.cc | 31 ++++++++++++-- > src/box/box.cc | 21 +++++++++- > src/box/iproto_constants.h | 2 + > src/box/memtx_engine.c | 3 +- > src/box/raft.c | 72 ++++++++++++++++++++++++++++++++- > src/box/raft.h | 35 +++++++++++++++- > src/box/relay.cc | 62 +++++++++++++++++++++++++++- > src/box/relay.h | 7 ++++ > src/box/xrow.c | 83 ++++++++++++++++++++++++++++++++------ > src/box/xrow.h | 5 ++- > 10 files changed, 297 insertions(+), 24 deletions(-) > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index c1d07ca54..ed76bf2ca 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -55,6 +55,7 @@ > #include "scoped_guard.h" > #include "txn_limbo.h" > #include "journal.h" > +#include "raft.h" > > STRS(applier_state, applier_STATE); > > @@ -298,6 +299,8 @@ apply_final_join_row(struct xrow_header *row) > */ > if (iproto_type_is_synchro_request(row->type)) > return 0; > + if (iproto_type_is_raft_request(row->type)) > + return 0; > struct txn *txn = txn_begin(); > if (txn == NULL) > return -1; > @@ -876,6 +879,19 @@ err: > return -1; > } Hi! 
Consider these fixes on top of this commit: diff --git a/src/box/xrow.c b/src/box/xrow.c index 928bf1497..250794a3e 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -994,24 +994,28 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,         memset(row, 0, sizeof(*row));         row->type = IPROTO_RAFT;         row->body[0].iov_base = buf; -       row->body[0].iov_len = size;         row->group_id = GROUP_LOCAL;         row->bodycnt = 1; -       buf = mp_encode_map(buf, map_size); -       buf = mp_encode_uint(buf, IPROTO_RAFT_TERM); -       buf = mp_encode_uint(buf, r->term); +       char *data = buf; + +       data = mp_encode_map(data, map_size); +       data = mp_encode_uint(data, IPROTO_RAFT_TERM); +       data = mp_encode_uint(data, r->term);         if (r->vote != 0) { -               buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); -               buf = mp_encode_uint(buf, r->vote); +               data = mp_encode_uint(data, IPROTO_RAFT_VOTE); +               data = mp_encode_uint(data, r->vote);         }         if (r->state != 0) { -               buf = mp_encode_uint(buf, IPROTO_RAFT_STATE); -               buf = mp_encode_uint(buf, r->state); +               data = mp_encode_uint(data, IPROTO_RAFT_STATE); +               data = mp_encode_uint(data, r->state);         }         if (r->vclock != NULL) { -               buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK); -               buf = mp_encode_vclock_ignore0(buf, r->vclock); +               data = mp_encode_uint(data, IPROTO_RAFT_VCLOCK); +               data = mp_encode_vclock_ignore0(data, r->vclock);         } + +       row->body[0].iov_len = data - buf ; +         return 0;  } > > +static int > +applier_handle_raft(struct applier *applier, struct xrow_header *row) > +{ > + assert(iproto_type_is_raft_request(row->type)); > + > + struct raft_request req; > + struct vclock candidate_clock; > + if (xrow_decode_raft(row, &req, &candidate_clock) != 0) > + return -1; > + 
raft_process_msg(&req, applier->instance_id); > + return 0; > +} > + > /** > * Apply all rows in the rows queue as a single transaction. > * > @@ -1219,11 +1235,20 @@ applier_subscribe(struct applier *applier) > * In case of an heartbeat message wake a writer up > * and check applier state. > */ > - if (stailq_first_entry(&rows, struct applier_tx_row, > - next)->row.lsn == 0) > + struct xrow_header *first_row = > + &stailq_first_entry(&rows, struct applier_tx_row, > + next)->row; > + if (first_row->lsn == 0) { > + if (unlikely(iproto_type_is_raft_request( > + first_row->type))) { > + if (applier_handle_raft(applier, > + first_row) != 0) > + diag_raise(); > + } > applier_signal_ack(applier); > - else if (applier_apply_tx(&rows) != 0) > + } else if (applier_apply_tx(&rows) != 0) { > diag_raise(); > + } > > if (ibuf_used(ibuf) == 0) > ibuf_reset(ibuf); > diff --git a/src/box/box.cc b/src/box/box.cc > index 7c3c895d2..980754d1d 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) > } > if (iproto_type_is_raft_request(row->type)) { > struct raft_request raft_req; > - if (xrow_decode_raft(row, &raft_req) != 0) > + /* Vclock is never persisted in WAL by Raft. */ > + if (xrow_decode_raft(row, &raft_req, NULL) != 0) > diag_raise(); > raft_process_recovery(&raft_req); > return; > @@ -2132,7 +2133,23 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) > tt_uuid_str(&replica_uuid), sio_socketname(io->fd)); > say_info("remote vclock %s local vclock %s", > vclock_to_string(&replica_clock), vclock_to_string(&vclock)); > - > + if (raft_is_enabled()) { > + /* > + * Send out the current raft state of the instance. Don't do > + * that if Raft is disabled. It can be that a part of the > + * cluster still contains old versions, which can't handle Raft > + * messages. So when it is disabled, its network footprint > + * should be 0. 
> + */ > + struct raft_request req; > + /* > + * Omit the candidate vclock, since we've just sent it in > + * subscribe response. > + */ > + raft_serialize_for_network(&req, NULL); > + xrow_encode_raft(&row, &fiber()->gc, &req); > + coio_write_xrow(io, &row); > + } > /* > * Replica clock is used in gc state and recovery > * initialization, so we need to replace the remote 0-th > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h > index 8a11626b3..3ec397d3c 100644 > --- a/src/box/iproto_constants.h > +++ b/src/box/iproto_constants.h > @@ -263,6 +263,8 @@ extern const char *iproto_type_strs[]; > enum iproto_raft_keys { > IPROTO_RAFT_TERM = 0, > IPROTO_RAFT_VOTE = 1, > + IPROTO_RAFT_STATE = 2, > + IPROTO_RAFT_VCLOCK = 3, > }; > > /** > diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c > index 5ab2cf266..166fe3136 100644 > --- a/src/box/memtx_engine.c > +++ b/src/box/memtx_engine.c > @@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row) > { > assert(row->type == IPROTO_RAFT); > struct raft_request req; > - if (xrow_decode_raft(row, &req) != 0) > + /* Vclock is never persisted in WAL by Raft. */ > + if (xrow_decode_raft(row, &req, NULL) != 0) > return -1; > raft_process_recovery(&req); > return 0; > diff --git a/src/box/raft.c b/src/box/raft.c > index ee54d02b7..4d3d07c48 100644 > --- a/src/box/raft.c > +++ b/src/box/raft.c > @@ -34,9 +34,20 @@ > #include "journal.h" > #include "xrow.h" > #include "small/region.h" > +#include "replication.h" > +#include "relay.h" > + > +const char *raft_state_strs[] = { > + NULL, > + "follower", > + "candidate", > + "leader", > +}; > > /** Raft state of this instance. 
*/ > struct raft raft = { > + .leader = 0, > + .state = RAFT_STATE_FOLLOWER, > .is_enabled = false, > .is_candidate = false, > .term = 1, > @@ -50,18 +61,66 @@ raft_process_recovery(const struct raft_request *req) > raft.term = req->term; > if (req->vote != 0) > raft.vote = req->vote; > + /* > + * Role is never persisted. If recovery is happening, the > + * node was restarted, and the former role can be false > + * anyway. > + */ > + assert(req->state == 0); > + /* > + * Vclock is always persisted by some other subsystem - WAL, snapshot. > + * It is used only to decide to whom to give the vote during election, > + * as a part of the volatile state. > + */ > + assert(req->vclock == NULL); > + /* Raft is not enabled until recovery is finished. */ > + assert(!raft_is_enabled()); > } > > void > -raft_serialize_for_network(struct raft_request *req) > +raft_process_msg(const struct raft_request *req, uint32_t source) > { > + (void)source; > + if (req->term > raft.term) { > + // Update term. > + // The logic will be similar, but the code > + // below is for testing purposes. > + raft.term = req->term; > + } > + if (req->vote > 0) { > + // Check whether the vote's for us. > + } > + switch (req->state) { > + case RAFT_STATE_FOLLOWER: > + break; > + case RAFT_STATE_CANDIDATE: > + // Perform voting logic. > + break; > + case RAFT_STATE_LEADER: > + // Switch to a new leader. > + break; > + default: > + break; > + } > +} > + > +void > +raft_serialize_for_network(struct raft_request *req, struct vclock *vclock) > +{ > + memset(req, 0, sizeof(*req)); > req->term = raft.term; > req->vote = raft.vote; > + req->state = raft.state; > + /* > + * Raft does not own vclock, so it always expects it passed externally. 
> + */ > + req->vclock = vclock; > } > > void > raft_serialize_for_disk(struct raft_request *req) > { > + memset(req, 0, sizeof(*req)); > req->term = raft.term; > req->vote = raft.vote; > } > @@ -93,3 +152,14 @@ void > raft_cfg_death_timeout(void) > { > } > + > +void > +raft_broadcast(const struct raft_request *req) > +{ > + replicaset_foreach(replica) { > + if (replica->relay != NULL && replica->id != REPLICA_ID_NIL && > + relay_get_state(replica->relay) == RELAY_FOLLOW) { > + relay_push_raft(replica->relay, req); > + } > + } > +} > diff --git a/src/box/raft.h b/src/box/raft.h > index f27222752..db64cf933 100644 > --- a/src/box/raft.h > +++ b/src/box/raft.h > @@ -37,8 +37,19 @@ extern "C" { > #endif > > struct raft_request; > +struct vclock; > + > +enum raft_state { > + RAFT_STATE_FOLLOWER = 1, > + RAFT_STATE_CANDIDATE = 2, > + RAFT_STATE_LEADER = 3, > +}; > + > +extern const char *raft_state_strs[]; > > struct raft { > + uint32_t leader; > + enum raft_state state; > bool is_enabled; > bool is_candidate; > uint64_t term; > @@ -48,10 +59,25 @@ struct raft { > > extern struct raft raft; > > +/** Check if Raft is enabled. */ > +static inline bool > +raft_is_enabled(void) > +{ > + return raft.is_enabled; > +} > + > /** Process a raft entry stored in WAL/snapshot. */ > void > raft_process_recovery(const struct raft_request *req); > > +/** > + * Process a raft status message coming from the network. > + * @param req Raft request. > + * @param source Instance ID of the message sender. > + */ > +void > +raft_process_msg(const struct raft_request *req, uint32_t source); > + > /** Configure whether Raft is enabled. */ > void > raft_cfg_is_enabled(bool is_enabled); > @@ -88,7 +114,7 @@ raft_cfg_death_timeout(void); > * cluster. It is allowed to save anything here, not only persistent state. 
> */ > void > -raft_serialize_for_network(struct raft_request *req); > +raft_serialize_for_network(struct raft_request *req, struct vclock *vclock); > > /** > * Save complete Raft state into a request to be persisted on disk. Only term > @@ -97,6 +123,13 @@ raft_serialize_for_network(struct raft_request *req); > void > raft_serialize_for_disk(struct raft_request *req); > > +/** > + * Broadcast the changes in this instance's raft status to all > + * the followers. > + */ > +void > +raft_broadcast(const struct raft_request *req); > + > #if defined(__cplusplus) > } > #endif > diff --git a/src/box/relay.cc b/src/box/relay.cc > index 124b0f52f..74581db9c 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -53,6 +53,7 @@ > #include "xstream.h" > #include "wal.h" > #include "txn_limbo.h" > +#include "raft.h" > > /** > * Cbus message to send status updates from relay to tx thread. > @@ -770,13 +771,68 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) > relay_send(relay, row); > } > > +struct relay_raft_msg { > + struct cmsg base; > + struct cmsg_hop route; > + struct raft_request req; > + struct vclock vclock; > + struct relay *relay; > +}; > + > +static void > +relay_raft_msg_push(struct cmsg *base) > +{ > + struct relay_raft_msg *msg = (struct relay_raft_msg *)base; > + struct xrow_header row; > + xrow_encode_raft(&row, &fiber()->gc, &msg->req); > + try { > + relay_send(msg->relay, &row); > + } catch (Exception *e) { > + relay_set_error(msg->relay, e); > + fiber_cancel(fiber()); > + } > + free(msg); > +} > + > +void > +relay_push_raft(struct relay *relay, const struct raft_request *req) > +{ > + /* > + * XXX: the message should be preallocated. It should > + * work like Kharon in IProto. Relay should have 2 raft > + * messages rotating. When one is sent, the other can be > + * updated and a flag is set. 
When the first message is > + * sent, the control returns to TX thread, sees the set > + * flag, rotates the buffers, and sends it again. And so > + * on. This is how it can work in future, with 0 heap > + * allocations. Current solution with alloc-per-update is > + * good enough as a start. Another option - wait until all > + * is moved to WAL thread, where this will all happen > + * in one thread and will be much simpler. > + */ > + struct relay_raft_msg *msg = > + (struct relay_raft_msg *)malloc(sizeof(*msg)); > + if (msg == NULL) { > + panic("Couldn't allocate raft message"); > + return; > + } > + msg->req = *req; > + if (req->vclock != NULL) { > + msg->req.vclock = &msg->vclock; > + vclock_copy(&msg->vclock, req->vclock); > + } > + msg->route.f = relay_raft_msg_push; > + msg->route.pipe = NULL; > + cmsg_init(&msg->base, &msg->route); > + msg->relay = relay; > + cpipe_push(&relay->relay_pipe, &msg->base); > +} > + > /** Send a single row to the client. */ > static void > relay_send_row(struct xstream *stream, struct xrow_header *packet) > { > struct relay *relay = container_of(stream, struct relay, stream); > - assert(iproto_type_is_dml(packet->type) || > - iproto_type_is_synchro_request(packet->type)); > if (packet->group_id == GROUP_LOCAL) { > /* > * We do not relay replica-local rows to other > @@ -793,6 +849,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) > packet->group_id = GROUP_DEFAULT; > packet->bodycnt = 0; > } > + assert(iproto_type_is_dml(packet->type) || > + iproto_type_is_synchro_request(packet->type)); > /* Check if the rows from the instance are filtered. 
*/ > if ((1 << packet->replica_id & relay->id_filter) != 0) > return; > diff --git a/src/box/relay.h b/src/box/relay.h > index 0632fa912..b32e2ea2a 100644 > --- a/src/box/relay.h > +++ b/src/box/relay.h > @@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay); > double > relay_last_row_time(const struct relay *relay); > > +/** > + * Send a Raft update request to the relay channel. It is not > + * guaranteed that it will be delivered. The connection may break. > + */ > +void > +relay_push_raft(struct relay *relay, const struct raft_request *req); > + > #if defined(__cplusplus) > } /* extern "C" */ > #endif /* defined(__cplusplus) */ > diff --git a/src/box/xrow.c b/src/box/xrow.c > index 1923bacfc..11fdacc0d 100644 > --- a/src/box/xrow.c > +++ b/src/box/xrow.c > @@ -958,11 +958,30 @@ int > xrow_encode_raft(struct xrow_header *row, struct region *region, > const struct raft_request *r) > { > - size_t size = mp_sizeof_map(2) + > - mp_sizeof_uint(IPROTO_RAFT_TERM) + > - mp_sizeof_uint(r->term) + > - mp_sizeof_uint(IPROTO_RAFT_VOTE) + > - mp_sizeof_uint(r->vote); > + /* > + * Term is always encoded. Sometimes the rest can even be ignored if > + * the term is too old. 
> + */ > + int map_size = 1; > + size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) + > + mp_sizeof_uint(r->term); > + if (r->vote != 0) { > + ++map_size; > + size += mp_sizeof_uint(IPROTO_RAFT_VOTE) + > + mp_sizeof_uint(r->vote); > + } > + if (r->state != 0) { > + ++map_size; > + size += mp_sizeof_uint(IPROTO_RAFT_STATE) + > + mp_sizeof_uint(r->state); > + } > + if (r->vclock != NULL) { > + ++map_size; > + size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) + > + mp_sizeof_vclock_ignore0(r->vclock); > + } > + size += mp_sizeof_map(map_size); > + > char *buf = region_alloc(region, size); > if (buf == NULL) { > diag_set(OutOfMemory, size, "region_alloc", "buf"); > @@ -974,40 +993,78 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, > row->body[0].iov_len = size; > row->group_id = GROUP_LOCAL; > row->bodycnt = 1; > - buf = mp_encode_map(buf, 2); > + buf = mp_encode_map(buf, map_size); > buf = mp_encode_uint(buf, IPROTO_RAFT_TERM); > buf = mp_encode_uint(buf, r->term); > - buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); > - buf = mp_encode_uint(buf, r->vote); > + if (r->vote != 0) { > + buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); > + buf = mp_encode_uint(buf, r->vote); > + } > + if (r->state != 0) { > + buf = mp_encode_uint(buf, IPROTO_RAFT_STATE); > + buf = mp_encode_uint(buf, r->state); > + } > + if (r->vclock != NULL) { > + buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK); > + buf = mp_encode_vclock_ignore0(buf, r->vclock); > + } > return 0; > } > > int > -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r) > +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, > + struct vclock *vclock) > { > - /* TODO: handle bad format. 
*/ > assert(row->type == IPROTO_RAFT); > - assert(row->bodycnt == 1); > - assert(row->group_id == GROUP_LOCAL); > + if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) { > + diag_set(ClientError, ER_INVALID_MSGPACK, > + "malformed raft request"); > + return -1; > + } > memset(r, 0, sizeof(*r)); > - const char *pos = row->body[0].iov_base; > + r->vclock = vclock; > + > + const char *begin = row->body[0].iov_base; > + const char *end = begin + row->body[0].iov_len; > + const char *pos = begin; > uint32_t map_size = mp_decode_map(&pos); > for (uint32_t i = 0; i < map_size; ++i) > { > + if (mp_typeof(*pos) != MP_UINT) > + goto bad_msgpack; > uint64_t key = mp_decode_uint(&pos); > switch (key) { > case IPROTO_RAFT_TERM: > + if (mp_typeof(*pos) != MP_UINT) > + goto bad_msgpack; > r->term = mp_decode_uint(&pos); > break; > case IPROTO_RAFT_VOTE: > + if (mp_typeof(*pos) != MP_UINT) > + goto bad_msgpack; > r->vote = mp_decode_uint(&pos); > break; > + case IPROTO_RAFT_STATE: > + if (mp_typeof(*pos) != MP_UINT) > + goto bad_msgpack; > + r->state = mp_decode_uint(&pos); > + break; > + case IPROTO_RAFT_VCLOCK: > + if (r->vclock == NULL) > + mp_next(&pos); > + else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0) > + goto bad_msgpack; > + break; > default: > mp_next(&pos); > break; > } > } > return 0; > + > +bad_msgpack: > + xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body"); > + return -1; > } > > int > diff --git a/src/box/xrow.h b/src/box/xrow.h > index c234f6f88..c627102dd 100644 > --- a/src/box/xrow.h > +++ b/src/box/xrow.h > @@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); > struct raft_request { > uint64_t term; > uint32_t vote; > + uint32_t state; > + struct vclock *vclock; > }; > > int > @@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, > const struct raft_request *r); > > int > -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r); > 
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, > + struct vclock *vclock); > > /** > * CALL/EVAL request. -- Serge Petrenko