[Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers

Serge Petrenko sergepetrenko at tarantool.org
Wed Aug 26 10:52:34 MSK 2020


From: sergepetrenko <sergepetrenko at tarantool.org>

The patch introduces a new type of system message used to notify the
followers of the instance's raft status updates.
It's relay's responsibility to deliver the new system rows to its peers.
The notification system reuses and extends the same row type used to
persist raft state in WAL and snapshot.

Part of #1146
Part of #5204
---
 src/box/applier.cc         | 34 +++++++++++++++++++---
 src/box/box.cc             | 17 ++++++++++-
 src/box/iproto_constants.h |  1 +
 src/box/raft.c             | 58 ++++++++++++++++++++++++++++++++++++++
 src/box/raft.h             | 39 +++++++++++++++++++++++--
 src/box/relay.cc           | 34 ++++++++++++++++++++--
 src/box/relay.h            | 14 +++++++++
 src/box/xrow.c             | 38 +++++++++++++++++++++++--
 src/box/xrow.h             |  5 +---
 9 files changed, 225 insertions(+), 15 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index c1d07ca54..f27436b79 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -55,6 +55,7 @@
 #include "scoped_guard.h"
 #include "txn_limbo.h"
 #include "journal.h"
+#include "raft.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -298,6 +299,8 @@ apply_final_join_row(struct xrow_header *row)
 	 */
 	if (iproto_type_is_synchro_request(row->type))
 		return 0;
+	if (iproto_type_is_raft_request(row->type))
+		return 0;
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
@@ -876,6 +879,23 @@ err:
 	return -1;
 }
 
+static int
+apply_raft_row(struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+
+	struct raft_request req;
+	struct vclock candidate_clock;
+	req.vclock = &candidate_clock;
+
+	if (xrow_decode_raft(row, &req) != 0)
+		return -1;
+
+	raft_process_msg(&req);
+
+	return 0;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -1219,11 +1239,17 @@ applier_subscribe(struct applier *applier)
 		 * In case of an heartbeat message wake a writer up
 		 * and check applier state.
 		 */
-		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0)
-			applier_signal_ack(applier);
-		else if (applier_apply_tx(&rows) != 0)
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		if (first_row->lsn == 0) {
+			if (unlikely(iproto_type_is_raft_request(first_row->type)))
+				apply_raft_row(first_row);
+			else
+				applier_signal_ack(applier);
+		} else if (applier_apply_tx(&rows) != 0) {
 			diag_raise();
+		}
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
diff --git a/src/box/box.cc b/src/box/box.cc
index c0adccc6a..8323de531 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2050,7 +2050,22 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 		 tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
 	say_info("remote vclock %s local vclock %s",
 		 vclock_to_string(&replica_clock), vclock_to_string(&vclock));
-
+	/*
+	 * Send out the current raft state of the instance.
+	 */
+	if (raft.state != RAFT_STATE_NONE) {
+		struct raft_request req;
+		req.term = raft.term;
+		req.vote = raft.vote;
+		req.state = raft.state;
+		/*
+		 * Omit the candidate vclock, since we've just
+		 * sent it in subscribe response.
+		 */
+		req.vclock = NULL;
+		xrow_encode_raft(&row, &fiber()->gc, &req);
+		coio_write_xrow(io, &row);
+	}
 	/*
 	 * Replica clock is used in gc state and recovery
 	 * initialization, so we need to replace the remote 0-th
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 8a11626b3..4217ce2e0 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -263,6 +263,7 @@ extern const char *iproto_type_strs[];
 enum iproto_raft_keys {
 	IPROTO_RAFT_TERM = 0,
 	IPROTO_RAFT_VOTE = 1,
+	IPROTO_RAFT_STATE = 2,
 };
 
 /**
diff --git a/src/box/raft.c b/src/box/raft.c
index 5465f46b6..839a7dfeb 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -34,11 +34,15 @@
 #include "journal.h"
 #include "xrow.h"
 #include "small/region.h"
+#include "replication.h"
+#include "relay.h"
 
 /** Raft state of this instance. */
 struct raft raft = {
 	.term = 0,
 	.vote = 0,
+	.curr_leader = 0,
+	.state = RAFT_STATE_NONE,
 };
 
 void
@@ -50,9 +54,36 @@ raft_process(const struct raft_request *req)
 		raft.vote = req->vote;
 }
 
+void
+raft_process_msg(const struct raft_request *req)
+{
+	if (req->term > raft.term) {
+		// Update term.
+		// The logic will be similar, but the code
+		// below is for testing purposes.
+		raft.term = req->term;
+	}
+	if (req->vote > 0) {
+		// Check whether the vote's for us.
+	}
+	switch (req->state) {
+	case RAFT_STATE_FOLLOWER:
+	    break;
+	case RAFT_STATE_CANDIDATE:
+	    // Perform voting logic.
+	    break;
+	case RAFT_STATE_LEADER:
+	    // Switch to a new leader.
+	    break;
+	default:
+	    break;
+	}
+}
+
 void
 raft_serialize(struct raft_request *req)
 {
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
 }
@@ -84,6 +115,9 @@ raft_write_request(const struct raft_request *req)
 		diag_log();
 		goto fail;
 	}
+
+	raft_broadcast(req);
+
 	region_truncate(region, svp);
 	return;
 fail:
@@ -118,3 +152,27 @@ raft_vote(uint32_t vote_for)
 	req.vote = vote_for;
 	raft_write_request(&req);
 }
+
+void
+raft_free_msg(struct cmsg *msg)
+{
+	free((void *)msg->route);
+	free(msg);
+}
+
+void
+raft_broadcast(const struct raft_request *req)
+{
+	replicaset_foreach(replica) {
+		if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
+		    relay_get_state(replica->relay) == RELAY_FOLLOW) {
+			// TODO: think of a proper allocator.
+			struct raft_broadcast_msg *raft_msg =
+				calloc(1, sizeof(*raft_msg));
+			raft_msg->req = *req;
+			struct cmsg_hop *route = calloc(2, sizeof(*route));
+			relay_push_raft_msg(replica->relay, &raft_msg->base,
+					    route);
+		}
+	}
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index 1f392033d..9cb39dd24 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -30,32 +30,67 @@
  * SUCH DAMAGE.
  */
 #include <stdint.h>
-
-struct raft_request;
+#include "cbus.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif
 
+enum raft_state {
+	RAFT_STATE_NONE = 0,
+	RAFT_STATE_FOLLOWER = 1,
+	RAFT_STATE_CANDIDATE = 2,
+	RAFT_STATE_LEADER = 3
+};
+
 struct raft {
 	uint64_t term;
 	uint32_t vote;
+	uint32_t curr_leader;
+	enum raft_state state;
 };
 
 extern struct raft raft;
 
+struct raft_request {
+	uint64_t term;
+	uint32_t vote;
+	enum raft_state state;
+	struct vclock *vclock;
+};
+
+struct raft_broadcast_msg {
+	struct cmsg base;
+	struct raft_request req;
+};
+
 void
 raft_new_term(uint64_t min_new_term);
 
 void
 raft_vote(uint32_t vote_for);
 
+/** Process a raft entry stored in WAL/snapshot. */
 void
 raft_process(const struct raft_request *req);
 
+/** Process a raft status message coming from the network. */
+void
+raft_process_msg(const struct raft_request *req);
+
 void
 raft_serialize(struct raft_request *req);
 
+void
+raft_free_msg(struct cmsg *msg);
+
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
+void
+raft_broadcast(const struct raft_request *req);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a7843a8c2..be252cad1 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -54,6 +54,7 @@
 #include "xstream.h"
 #include "wal.h"
 #include "txn_limbo.h"
+#include "raft.h"
 
 /**
  * Cbus message to send status updates from relay to tx thread.
@@ -773,13 +774,40 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
+static void
+relay_send_raft(struct relay *relay, struct raft_request *req)
+{
+	struct xrow_header packet;
+	xrow_encode_raft(&packet, &fiber()->gc, req);
+	relay_send(relay, &packet);
+}
+
+static void
+relay_send_raft_msg(struct cmsg *msg)
+{
+	struct raft_broadcast_msg *raft_msg = (struct raft_broadcast_msg *)msg;
+	struct relay *relay = container_of(msg->route[0].pipe, struct relay,
+					   tx_pipe);
+	relay_send_raft(relay, &raft_msg->req);
+}
+
+void
+relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
+		    struct cmsg_hop *route)
+{
+	route[0].f = relay_send_raft_msg;
+	route[0].pipe = &relay->tx_pipe;
+	route[1].f = raft_free_msg;
+	route[1].pipe = NULL;
+	cmsg_init(msg, route);
+	cpipe_push(&relay->relay_pipe, msg);
+}
+
 /** Send a single row to the client. */
 static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type) ||
-	       iproto_type_is_synchro_request(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
@@ -796,6 +824,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+	assert(iproto_type_is_dml(packet->type) ||
+	       iproto_type_is_synchro_request(packet->type));
 	/* Check if the rows from the instance are filtered. */
 	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..c2c30cd11 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -41,6 +41,8 @@ struct relay;
 struct replica;
 struct tt_uuid;
 struct vclock;
+struct cmsg;
+struct cmsg_hop;
 
 enum relay_state {
 	/**
@@ -93,6 +95,18 @@ relay_vclock(const struct relay *relay);
 double
 relay_last_row_time(const struct relay *relay);
 
+/**
+ * Initialize a raft status message with the route to relay and
+ * back and push the message to relay.
+ *
+ * @param relay relay.
+ * @param msg a preallocated status message.
+ * @param route a preallocated message route.
+ */
+void
+relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
+		    struct cmsg_hop *route);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 1923bacfc..f60b12cfc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -44,6 +44,7 @@
 #include "scramble.h"
 #include "iproto_constants.h"
 #include "mpstream/mpstream.h"
+#include "raft.h"
 
 static_assert(IPROTO_DATA < 0x7f && IPROTO_METADATA < 0x7f &&
 	      IPROTO_SQL_INFO < 0x7f, "encoded IPROTO_BODY keys must fit into "\
@@ -958,11 +959,26 @@ int
 xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r)
 {
+	assert(mp_sizeof_map(2) == mp_sizeof_map(4));
+	/*
+	 * Term and vote are encoded every time for the sake of
+	 * snapshot, while state and vclock are optional.
+	 */
 	size_t size = mp_sizeof_map(2) +
 		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
 		      mp_sizeof_uint(r->term) +
 		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
 		      mp_sizeof_uint(r->vote);
+
+	size += (r->state != 0) * (mp_sizeof_uint(IPROTO_RAFT_STATE) +
+				   mp_sizeof_uint(r->state));
+	if (r->vclock != NULL) {
+		size += mp_sizeof_uint(IPROTO_VCLOCK) +
+		mp_sizeof_vclock_ignore0(r->vclock);
+	}
+
+	int map_size = 2 + (r->state != 0) + (r->vclock != NULL);
+
 	char *buf = region_alloc(region, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -974,11 +990,20 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
 	row->body[0].iov_len = size;
 	row->group_id = GROUP_LOCAL;
 	row->bodycnt = 1;
-	buf = mp_encode_map(buf, 2);
+	buf = mp_encode_map(buf, map_size);
 	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
 	buf = mp_encode_uint(buf, r->term);
 	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
 	buf = mp_encode_uint(buf, r->vote);
+	if (r->state != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
+		buf = mp_encode_uint(buf, r->state);
+	}
+	if (r->vclock != NULL) {
+		buf = mp_encode_uint(buf, IPROTO_VCLOCK);
+		buf = mp_encode_vclock_ignore0(buf, r->vclock);
+	}
+
 	return 0;
 }
 
@@ -989,7 +1014,7 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
 	assert(row->type == IPROTO_RAFT);
 	assert(row->bodycnt == 1);
 	assert(row->group_id == GROUP_LOCAL);
-	memset(r, 0, sizeof(*r));
+	memset(r, 0, sizeof(*r) - sizeof(struct vclock *));
 	const char *pos = row->body[0].iov_base;
 	uint32_t map_size = mp_decode_map(&pos);
 	for (uint32_t i = 0; i < map_size; ++i)
@@ -1002,6 +1027,15 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
 		case IPROTO_RAFT_VOTE:
 			r->vote = mp_decode_uint(&pos);
 			break;
+		case IPROTO_RAFT_STATE:
+			r->state = mp_decode_uint(&pos);
+			break;
+		case IPROTO_VCLOCK:
+			if (r->vclock != NULL)
+				mp_decode_vclock_ignore0(&pos, r->vclock);
+			else
+				mp_next(&pos);
+			break;
 		default:
 			mp_next(&pos);
 			break;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index c234f6f88..3f37dc18f 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,10 +264,7 @@ xrow_encode_synchro(struct xrow_header *row,
 int
 xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 
-struct raft_request {
-	uint64_t term;
-	uint32_t vote;
-};
+struct raft_request;
 
 int
 xrow_encode_raft(struct xrow_header *row, struct region *region,
-- 
2.20.1 (Apple Git-117)



More information about the Tarantool-patches mailing list