[Tarantool-patches] [RAFT 10/10] [tosquash] relay: move raft broadcast details into relay

Serge Petrenko sergepetrenko at tarantool.org
Wed Aug 26 10:53:29 MSK 2020


From: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>

Raft did its own allocations and cbus/cmsg initialization in
order to broadcast a state update. It also didn't copy the vclock
value, so the request could point at invalid memory.

The patch moves all the details of pushing a raft update into
relay.cc and fixes the vclock copying.
---
 src/box/raft.c   | 15 +----------
 src/box/raft.h   |  8 ------
 src/box/relay.cc | 68 ++++++++++++++++++++++++++++++++++--------------
 src/box/relay.h  | 11 +++-----
 4 files changed, 52 insertions(+), 50 deletions(-)

diff --git a/src/box/raft.c b/src/box/raft.c
index 34c1cf7aa..e40d778af 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -169,26 +169,13 @@ raft_vote(uint32_t vote_for)
 	raft_write_request(&req);
 }
 
-void
-raft_free_msg(struct cmsg *msg)
-{
-	free((void *)msg->route);
-	free(msg);
-}
-
 void
 raft_broadcast(const struct raft_request *req)
 {
 	replicaset_foreach(replica) {
 		if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
 		    relay_get_state(replica->relay) == RELAY_FOLLOW) {
-			// TODO: think of a proper allocator.
-			struct raft_broadcast_msg *raft_msg =
-				calloc(1, sizeof(*raft_msg));
-			raft_msg->req = *req;
-			struct cmsg_hop *route = calloc(2, sizeof(*route));
-			relay_push_raft_msg(replica->relay, &raft_msg->base,
-					    route);
+			relay_push_raft(replica->relay, req);
 		}
 	}
 }
diff --git a/src/box/raft.h b/src/box/raft.h
index be071c215..e14173057 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -52,11 +52,6 @@ struct raft {
 
 extern struct raft raft;
 
-struct raft_broadcast_msg {
-	struct cmsg base;
-	struct raft_request req;
-};
-
 void
 raft_new_term(uint64_t min_new_term);
 
@@ -74,9 +69,6 @@ raft_process_msg(const struct raft_request *req);
 void
 raft_serialize(struct raft_request *req, struct vclock *vclock);
 
-void
-raft_free_msg(struct cmsg *msg);
-
 /**
  * Broadcast the changes in this instance's raft status to all
  * the followers.
diff --git a/src/box/relay.cc b/src/box/relay.cc
index be252cad1..53a90f826 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -774,33 +774,61 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
-static void
-relay_send_raft(struct relay *relay, struct raft_request *req)
-{
-	struct xrow_header packet;
-	xrow_encode_raft(&packet, &fiber()->gc, req);
-	relay_send(relay, &packet);
-}
+struct relay_raft_msg {
+	struct cmsg base;
+	struct cmsg_hop route;
+	struct raft_request req;
+	struct vclock vclock;
+	struct relay *relay;
+};
 
 static void
-relay_send_raft_msg(struct cmsg *msg)
+relay_raft_msg_send(struct cmsg *base)
 {
-	struct raft_broadcast_msg *raft_msg = (struct raft_broadcast_msg *)msg;
-	struct relay *relay = container_of(msg->route[0].pipe, struct relay,
-					   tx_pipe);
-	relay_send_raft(relay, &raft_msg->req);
+	struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+	struct xrow_header row;
+	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
+	try {
+		relay_send(msg->relay, &row);
+	} catch (Exception *e) {
+		relay_set_error(msg->relay, e);
+		fiber_cancel(fiber());
+	}
+	free(msg);
 }
 
 void
-relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
-		    struct cmsg_hop *route)
+relay_push_raft(struct relay *relay, const struct raft_request *req)
 {
-	route[0].f = relay_send_raft_msg;
-	route[0].pipe = &relay->tx_pipe;
-	route[1].f = raft_free_msg;
-	route[1].pipe = NULL;
-	cmsg_init(msg, route);
-	cpipe_push(&relay->relay_pipe, msg);
+	/*
+	 * XXX: the message should be preallocated. It should
+	 * work like Kharon in IProto. Relay should have 2 raft
+	 * messages rotating. When one is sent, the other can be
+	 * updated and a flag is set. When the first message is
+	 * sent, the control returns to TX thread, sees the set
+	 * flag, rotates the buffers, and sends it again. And so
+	 * on. This is how it can work in future, with 0 heap
+	 * allocations. Current solution with alloc-per-update is
+	 * good enough as a start. Another option - wait until all
+	 * is moved to WAL thread, where this will all happen
+	 * in one thread and will be much simpler.
+	 */
+	struct relay_raft_msg *msg =
+		(struct relay_raft_msg *)malloc(sizeof(*msg));
+	if (msg == NULL) {
+		panic("Couldn't allocate raft message");
+		return;
+	}
+	msg->req = *req;
+	if (req->vclock != NULL) {
+		msg->req.vclock = &msg->vclock;
+		vclock_copy(&msg->vclock, req->vclock);
+	}
+	msg->route.f = relay_raft_msg_send;
+	msg->route.pipe = NULL;
+	cmsg_init(&msg->base, &msg->route);
+	msg->relay = relay;
+	cpipe_push(&relay->relay_pipe, &msg->base);
 }
 
 /** Send a single row to the client. */
diff --git a/src/box/relay.h b/src/box/relay.h
index c2c30cd11..4d291698d 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -96,16 +96,11 @@ double
 relay_last_row_time(const struct relay *relay);
 
 /**
- * Initialize a raft status message with the route to relay and
- * back and push the message to relay.
- *
- * @param relay relay.
- * @param msg a preallocated status message.
- * @param route a preallocated message route.
+ * Send a Raft update request to the relay channel. It is not
+ * guaranteed that it will be delivered. The connection may break.
  */
 void
-relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
-		    struct cmsg_hop *route);
+relay_push_raft(struct relay *relay, const struct raft_request *req);
 
 #if defined(__cplusplus)
 } /* extern "C" */
-- 
2.20.1 (Apple Git-117)



More information about the Tarantool-patches mailing list