From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp33.i.mail.ru (smtp33.i.mail.ru [94.100.177.93]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id A591743040D for ; Wed, 26 Aug 2020 10:53:40 +0300 (MSK) From: Serge Petrenko Date: Wed, 26 Aug 2020 10:53:29 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [RAFT 10/10] [tosquash] relay: move raft broadcast details into relay List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, gorcunov@gmail.com, sergos@tarantool.org Cc: tarantool-patches@dev.tarantool.org From: Vladislav Shpilevoy Raft did some allocations, as well as cbus and cmsg initializations, in order to broadcast its state update. Also, it didn't copy the vclock value, so the vclock pointer in the broadcast request could point at invalid memory. The patch moves all the details about pushing a raft update into the relay.cc file, and fixes the vclock copying. --- src/box/raft.c | 15 +---------- src/box/raft.h | 8 ------ src/box/relay.cc | 68 ++++++++++++++++++++++++++++++++++-------------- src/box/relay.h | 11 +++----- 4 files changed, 52 insertions(+), 50 deletions(-) diff --git a/src/box/raft.c b/src/box/raft.c index 34c1cf7aa..e40d778af 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -169,26 +169,13 @@ raft_vote(uint32_t vote_for) raft_write_request(&req); } -void -raft_free_msg(struct cmsg *msg) -{ - free((void *)msg->route); - free(msg); -} - void raft_broadcast(const struct raft_request *req) { replicaset_foreach(replica) { if (replica->relay != NULL && replica->id != REPLICA_ID_NIL && relay_get_state(replica->relay) == RELAY_FOLLOW) { - // TODO: think of a proper allocator. 
- struct raft_broadcast_msg *raft_msg = - calloc(1, sizeof(*raft_msg)); - raft_msg->req = *req; - struct cmsg_hop *route = calloc(2, sizeof(*route)); - relay_push_raft_msg(replica->relay, &raft_msg->base, - route); + relay_push_raft(replica->relay, req); } } } diff --git a/src/box/raft.h b/src/box/raft.h index be071c215..e14173057 100644 --- a/src/box/raft.h +++ b/src/box/raft.h @@ -52,11 +52,6 @@ struct raft { extern struct raft raft; -struct raft_broadcast_msg { - struct cmsg base; - struct raft_request req; -}; - void raft_new_term(uint64_t min_new_term); @@ -74,9 +69,6 @@ raft_process_msg(const struct raft_request *req); void raft_serialize(struct raft_request *req, struct vclock *vclock); -void -raft_free_msg(struct cmsg *msg); - /** * Broadcast the changes in this instance's raft status to all * the followers. diff --git a/src/box/relay.cc b/src/box/relay.cc index be252cad1..53a90f826 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -774,33 +774,61 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) relay_send(relay, row); } -static void -relay_send_raft(struct relay *relay, struct raft_request *req) -{ - struct xrow_header packet; - xrow_encode_raft(&packet, &fiber()->gc, req); - relay_send(relay, &packet); -} +struct relay_raft_msg { + struct cmsg base; + struct cmsg_hop route; + struct raft_request req; + struct vclock vclock; + struct relay *relay; +}; static void -relay_send_raft_msg(struct cmsg *msg) +relay_raft_msg_send(struct cmsg *base) { - struct raft_broadcast_msg *raft_msg = (struct raft_broadcast_msg *)msg; - struct relay *relay = container_of(msg->route[0].pipe, struct relay, - tx_pipe); - relay_send_raft(relay, &raft_msg->req); + struct relay_raft_msg *msg = (struct relay_raft_msg *)base; + struct xrow_header row; + xrow_encode_raft(&row, &fiber()->gc, &msg->req); + try { + relay_send(msg->relay, &row); + } catch (Exception *e) { + relay_set_error(msg->relay, e); + fiber_cancel(fiber()); + } + free(msg); } 
void -relay_push_raft_msg(struct relay *relay, struct cmsg *msg, - struct cmsg_hop *route) +relay_push_raft(struct relay *relay, const struct raft_request *req) { - route[0].f = relay_send_raft_msg; - route[0].pipe = &relay->tx_pipe; - route[1].f = raft_free_msg; - route[1].pipe = NULL; - cmsg_init(msg, route); - cpipe_push(&relay->relay_pipe, msg); + /* + * XXX: the message should be preallocated. It should + * work like Kharon in IProto. Relay should have 2 raft + * messages rotating. When one is sent, the other can be + * updated and a flag is set. When the first message is + * sent, the control returns to TX thread, sees the set + * flag, rotates the buffers, and sends it again. And so + * on. This is how it can work in future, with 0 heap + * allocations. Current solution with alloc-per-update is + * good enough as a start. Another option - wait until all + * is moved to WAL thread, where this will all happen + * in one thread and will be much simpler. + */ + struct relay_raft_msg *msg = + (struct relay_raft_msg *)malloc(sizeof(*msg)); + if (msg == NULL) { + panic("Couldn't allocate raft message"); + return; + } + msg->req = *req; + if (req->vclock != NULL) { + msg->req.vclock = &msg->vclock; + vclock_copy(&msg->vclock, req->vclock); + } + msg->route.f = relay_raft_msg_send; + msg->route.pipe = NULL; + cmsg_init(&msg->base, &msg->route); + msg->relay = relay; + cpipe_push(&relay->relay_pipe, &msg->base); } /** Send a single row to the client. */ diff --git a/src/box/relay.h b/src/box/relay.h index c2c30cd11..4d291698d 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -96,16 +96,11 @@ double relay_last_row_time(const struct relay *relay); /** - * Initialize a raft status message with the route to relay and - * back and push the message to relay. - * - * @param relay relay. - * @param msg a preallocated status message. - * @param route a preallocated message route. + * Send a Raft update request to the relay channel. 
It is not + * guaranteed that it will be delivered. The connection may break. */ void -relay_push_raft_msg(struct relay *relay, struct cmsg *msg, - struct cmsg_hop *route); +relay_push_raft(struct relay *relay, const struct raft_request *req); #if defined(__cplusplus) } /* extern "C" */ -- 2.20.1 (Apple Git-117)