[Tarantool-patches] [PATCH v2 08/11] raft: relay status updates to followers

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Sep 10 02:16:59 MSK 2020


From: sergepetrenko <sergepetrenko at tarantool.org>

The patch introduces a new type of system message used to notify the
followers of the instance's raft status updates.
It is the relay's responsibility to deliver the new system rows to its peers.
The notification system reuses and extends the same row type used to
persist Raft state in the WAL and snapshots.

Part of #1146
Part of #5204
---
 src/box/applier.cc         | 31 ++++++++++++--
 src/box/box.cc             | 21 +++++++++-
 src/box/iproto_constants.h |  2 +
 src/box/memtx_engine.c     |  3 +-
 src/box/raft.c             | 72 ++++++++++++++++++++++++++++++++-
 src/box/raft.h             | 35 +++++++++++++++-
 src/box/relay.cc           | 62 +++++++++++++++++++++++++++-
 src/box/relay.h            |  7 ++++
 src/box/xrow.c             | 83 ++++++++++++++++++++++++++++++++------
 src/box/xrow.h             |  5 ++-
 10 files changed, 297 insertions(+), 24 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index c1d07ca54..ed76bf2ca 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -55,6 +55,7 @@
 #include "scoped_guard.h"
 #include "txn_limbo.h"
 #include "journal.h"
+#include "raft.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -298,6 +299,8 @@ apply_final_join_row(struct xrow_header *row)
 	 */
 	if (iproto_type_is_synchro_request(row->type))
 		return 0;
+	if (iproto_type_is_raft_request(row->type))
+		return 0;
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
@@ -876,6 +879,19 @@ err:
 	return -1;
 }
 
+static int
+applier_handle_raft(struct applier *applier, struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+
+	struct raft_request req;
+	struct vclock candidate_clock;
+	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
+		return -1;
+	raft_process_msg(&req, applier->instance_id);
+	return 0;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -1219,11 +1235,20 @@ applier_subscribe(struct applier *applier)
 		 * In case of an heartbeat message wake a writer up
 		 * and check applier state.
 		 */
-		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0)
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		if (first_row->lsn == 0) {
+			if (unlikely(iproto_type_is_raft_request(
+							first_row->type))) {
+				if (applier_handle_raft(applier,
+							first_row) != 0)
+					diag_raise();
+			}
 			applier_signal_ack(applier);
-		else if (applier_apply_tx(&rows) != 0)
+		} else if (applier_apply_tx(&rows) != 0) {
 			diag_raise();
+		}
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
diff --git a/src/box/box.cc b/src/box/box.cc
index 7c3c895d2..980754d1d 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 	}
 	if (iproto_type_is_raft_request(row->type)) {
 		struct raft_request raft_req;
-		if (xrow_decode_raft(row, &raft_req) != 0)
+		/* Vclock is never persisted in WAL by Raft. */
+		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
 			diag_raise();
 		raft_process_recovery(&raft_req);
 		return;
@@ -2132,7 +2133,23 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 		 tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
 	say_info("remote vclock %s local vclock %s",
 		 vclock_to_string(&replica_clock), vclock_to_string(&vclock));
-
+	if (raft_is_enabled()) {
+		/*
+		 * Send out the current raft state of the instance. Don't do
+		 * that if Raft is disabled. It can be that a part of the
+		 * cluster still contains old versions, which can't handle Raft
+		 * messages. So when it is disabled, its network footprint
+		 * should be 0.
+		 */
+		struct raft_request req;
+		/*
+		 * Omit the candidate vclock, since we've just sent it in
+		 * subscribe response.
+		 */
+		raft_serialize_for_network(&req, NULL);
+		xrow_encode_raft(&row, &fiber()->gc, &req);
+		coio_write_xrow(io, &row);
+	}
 	/*
 	 * Replica clock is used in gc state and recovery
 	 * initialization, so we need to replace the remote 0-th
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 8a11626b3..3ec397d3c 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -263,6 +263,8 @@ extern const char *iproto_type_strs[];
 enum iproto_raft_keys {
 	IPROTO_RAFT_TERM = 0,
 	IPROTO_RAFT_VOTE = 1,
+	IPROTO_RAFT_STATE = 2,
+	IPROTO_RAFT_VCLOCK = 3,
 };
 
 /**
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 5ab2cf266..166fe3136 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row)
 {
 	assert(row->type == IPROTO_RAFT);
 	struct raft_request req;
-	if (xrow_decode_raft(row, &req) != 0)
+	/* Vclock is never persisted in WAL by Raft. */
+	if (xrow_decode_raft(row, &req, NULL) != 0)
 		return -1;
 	raft_process_recovery(&req);
 	return 0;
diff --git a/src/box/raft.c b/src/box/raft.c
index ee54d02b7..4d3d07c48 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -34,9 +34,20 @@
 #include "journal.h"
 #include "xrow.h"
 #include "small/region.h"
+#include "replication.h"
+#include "relay.h"
+
+const char *raft_state_strs[] = {
+	NULL,
+	"follower",
+	"candidate",
+	"leader",
+};
 
 /** Raft state of this instance. */
 struct raft raft = {
+	.leader = 0,
+	.state = RAFT_STATE_FOLLOWER,
 	.is_enabled = false,
 	.is_candidate = false,
 	.term = 1,
@@ -50,18 +61,66 @@ raft_process_recovery(const struct raft_request *req)
 		raft.term = req->term;
 	if (req->vote != 0)
 		raft.vote = req->vote;
+	/*
+	 * Role is never persisted. If recovery is happening, the
+	 * node was restarted, and the former role may no longer be
+	 * valid anyway.
+	 */
+	assert(req->state == 0);
+	/*
+	 * Vclock is always persisted by some other subsystem - WAL, snapshot.
+	 * It is used only to decide to whom to give the vote during election,
+	 * as a part of the volatile state.
+	 */
+	assert(req->vclock == NULL);
+	/* Raft is not enabled until recovery is finished. */
+	assert(!raft_is_enabled());
 }
 
 void
-raft_serialize_for_network(struct raft_request *req)
+raft_process_msg(const struct raft_request *req, uint32_t source)
 {
+	(void)source;
+	if (req->term > raft.term) {
+		// Update term.
+		// The logic will be similar, but the code
+		// below is for testing purposes.
+		raft.term = req->term;
+	}
+	if (req->vote > 0) {
+		// Check whether the vote's for us.
+	}
+	switch (req->state) {
+	case RAFT_STATE_FOLLOWER:
+	    break;
+	case RAFT_STATE_CANDIDATE:
+	    // Perform voting logic.
+	    break;
+	case RAFT_STATE_LEADER:
+	    // Switch to a new leader.
+	    break;
+	default:
+	    break;
+	}
+}
+
+void
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
+{
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
+	req->state = raft.state;
+	/*
+	 * Raft does not own vclock, so it always expects it passed externally.
+	 */
+	req->vclock = vclock;
 }
 
 void
 raft_serialize_for_disk(struct raft_request *req)
 {
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
 }
@@ -93,3 +152,14 @@ void
 raft_cfg_death_timeout(void)
 {
 }
+
+void
+raft_broadcast(const struct raft_request *req)
+{
+	replicaset_foreach(replica) {
+		if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
+		    relay_get_state(replica->relay) == RELAY_FOLLOW) {
+			relay_push_raft(replica->relay, req);
+		}
+	}
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index f27222752..db64cf933 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -37,8 +37,19 @@ extern "C" {
 #endif
 
 struct raft_request;
+struct vclock;
+
+enum raft_state {
+	RAFT_STATE_FOLLOWER = 1,
+	RAFT_STATE_CANDIDATE = 2,
+	RAFT_STATE_LEADER = 3,
+};
+
+extern const char *raft_state_strs[];
 
 struct raft {
+	uint32_t leader;
+	enum raft_state state;
 	bool is_enabled;
 	bool is_candidate;
 	uint64_t term;
@@ -48,10 +59,25 @@ struct raft {
 
 extern struct raft raft;
 
+/** Check if Raft is enabled. */
+static inline bool
+raft_is_enabled(void)
+{
+	return raft.is_enabled;
+}
+
 /** Process a raft entry stored in WAL/snapshot. */
 void
 raft_process_recovery(const struct raft_request *req);
 
+/**
+ * Process a raft status message coming from the network.
+ * @param req Raft request.
+ * @param source Instance ID of the message sender.
+ */
+void
+raft_process_msg(const struct raft_request *req, uint32_t source);
+
 /** Configure whether Raft is enabled. */
 void
 raft_cfg_is_enabled(bool is_enabled);
@@ -88,7 +114,7 @@ raft_cfg_death_timeout(void);
  * cluster. It is allowed to save anything here, not only persistent state.
  */
 void
-raft_serialize_for_network(struct raft_request *req);
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
 
 /**
  * Save complete Raft state into a request to be persisted on disk. Only term
@@ -97,6 +123,13 @@ raft_serialize_for_network(struct raft_request *req);
 void
 raft_serialize_for_disk(struct raft_request *req);
 
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
+void
+raft_broadcast(const struct raft_request *req);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 124b0f52f..74581db9c 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -53,6 +53,7 @@
 #include "xstream.h"
 #include "wal.h"
 #include "txn_limbo.h"
+#include "raft.h"
 
 /**
  * Cbus message to send status updates from relay to tx thread.
@@ -770,13 +771,68 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
+struct relay_raft_msg {
+	struct cmsg base;
+	struct cmsg_hop route;
+	struct raft_request req;
+	struct vclock vclock;
+	struct relay *relay;
+};
+
+static void
+relay_raft_msg_push(struct cmsg *base)
+{
+	struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+	struct xrow_header row;
+	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
+	try {
+		relay_send(msg->relay, &row);
+	} catch (Exception *e) {
+		relay_set_error(msg->relay, e);
+		fiber_cancel(fiber());
+	}
+	free(msg);
+}
+
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req)
+{
+	/*
+	 * XXX: the message should be preallocated. It should
+	 * work like Kharon in IProto. Relay should have 2 raft
+	 * messages rotating. When one is sent, the other can be
+	 * updated and a flag is set. When the first message is
+	 * sent, the control returns to TX thread, sees the set
+	 * flag, rotates the buffers, and sends it again. And so
+	 * on. This is how it can work in the future, with 0 heap
+	 * allocations. The current solution with alloc-per-update
+	 * is good enough as a start. Another option is to wait until
+	 * everything is moved to the WAL thread, where this will all
+	 * happen in one thread and will be much simpler.
+	 */
+	struct relay_raft_msg *msg =
+		(struct relay_raft_msg *)malloc(sizeof(*msg));
+	if (msg == NULL) {
+		panic("Couldn't allocate raft message");
+		return;
+	}
+	msg->req = *req;
+	if (req->vclock != NULL) {
+		msg->req.vclock = &msg->vclock;
+		vclock_copy(&msg->vclock, req->vclock);
+	}
+	msg->route.f = relay_raft_msg_push;
+	msg->route.pipe = NULL;
+	cmsg_init(&msg->base, &msg->route);
+	msg->relay = relay;
+	cpipe_push(&relay->relay_pipe, &msg->base);
+}
+
 /** Send a single row to the client. */
 static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type) ||
-	       iproto_type_is_synchro_request(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
@@ -793,6 +849,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+	assert(iproto_type_is_dml(packet->type) ||
+	       iproto_type_is_synchro_request(packet->type));
 	/* Check if the rows from the instance are filtered. */
 	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..b32e2ea2a 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay);
 double
 relay_last_row_time(const struct relay *relay);
 
+/**
+ * Send a Raft update request to the relay channel. It is not
+ * guaranteed that it will be delivered. The connection may break.
+ */
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 1923bacfc..11fdacc0d 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -958,11 +958,30 @@ int
 xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r)
 {
-	size_t size = mp_sizeof_map(2) +
-		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
-		      mp_sizeof_uint(r->term) +
-		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
-		      mp_sizeof_uint(r->vote);
+	/*
+	 * Term is always encoded. Sometimes the rest can even be ignored if
+	 * the term is too old.
+	 */
+	int map_size = 1;
+	size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
+		      mp_sizeof_uint(r->term);
+	if (r->vote != 0) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+			mp_sizeof_uint(r->vote);
+	}
+	if (r->state != 0) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
+			mp_sizeof_uint(r->state);
+	}
+	if (r->vclock != NULL) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
+			mp_sizeof_vclock_ignore0(r->vclock);
+	}
+	size += mp_sizeof_map(map_size);
+
 	char *buf = region_alloc(region, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -974,40 +993,78 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
 	row->body[0].iov_len = size;
 	row->group_id = GROUP_LOCAL;
 	row->bodycnt = 1;
-	buf = mp_encode_map(buf, 2);
+	buf = mp_encode_map(buf, map_size);
 	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
 	buf = mp_encode_uint(buf, r->term);
-	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
-	buf = mp_encode_uint(buf, r->vote);
+	if (r->vote != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+		buf = mp_encode_uint(buf, r->vote);
+	}
+	if (r->state != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
+		buf = mp_encode_uint(buf, r->state);
+	}
+	if (r->vclock != NULL) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
+		buf = mp_encode_vclock_ignore0(buf, r->vclock);
+	}
 	return 0;
 }
 
 int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+		 struct vclock *vclock)
 {
-	/* TODO: handle bad format. */
 	assert(row->type == IPROTO_RAFT);
-	assert(row->bodycnt == 1);
-	assert(row->group_id == GROUP_LOCAL);
+	if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "malformed raft request");
+		return -1;
+	}
 	memset(r, 0, sizeof(*r));
-	const char *pos = row->body[0].iov_base;
+	r->vclock = vclock;
+
+	const char *begin = row->body[0].iov_base;
+	const char *end = begin + row->body[0].iov_len;
+	const char *pos = begin;
 	uint32_t map_size = mp_decode_map(&pos);
 	for (uint32_t i = 0; i < map_size; ++i)
 	{
+		if (mp_typeof(*pos) != MP_UINT)
+			goto bad_msgpack;
 		uint64_t key = mp_decode_uint(&pos);
 		switch (key) {
 		case IPROTO_RAFT_TERM:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
 			r->term = mp_decode_uint(&pos);
 			break;
 		case IPROTO_RAFT_VOTE:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
 			r->vote = mp_decode_uint(&pos);
 			break;
+		case IPROTO_RAFT_STATE:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
+			r->state = mp_decode_uint(&pos);
+			break;
+		case IPROTO_RAFT_VCLOCK:
+			if (r->vclock == NULL)
+				mp_next(&pos);
+			else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0)
+				goto bad_msgpack;
+			break;
 		default:
 			mp_next(&pos);
 			break;
 		}
 	}
 	return 0;
+
+bad_msgpack:
+	xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
+	return -1;
 }
 
 int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index c234f6f88..c627102dd 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 struct raft_request {
 	uint64_t term;
 	uint32_t vote;
+	uint32_t state;
+	struct vclock *vclock;
 };
 
 int
@@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r);
 
 int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+		 struct vclock *vclock);
 
 /**
  * CALL/EVAL request.
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list