Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko <sergepetrenko@tarantool.org>
To: v.shpilevoy@tarantool.org, gorcunov@gmail.com, sergos@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers
Date: Wed, 26 Aug 2020 10:52:34 +0300	[thread overview]
Message-ID: <ed0a2ef9659fddf76f51dd549fb5ab45e0d2df62.1598427905.git.sergepetrenko@tarantool.org> (raw)
In-Reply-To: <cover.1598427905.git.sergepetrenko@tarantool.org>

From: sergepetrenko <sergepetrenko@tarantool.org>

The patch introduces a new type of system message used to notify the
followers about updates of the instance's raft status.
It is the relay's responsibility to deliver the new system rows to its
peers. The notification system reuses and extends the row type used to
persist the raft state in the WAL and snapshot.

Part of #1146
Part of #5204
---
 src/box/applier.cc         | 34 +++++++++++++++++++---
 src/box/box.cc             | 17 ++++++++++-
 src/box/iproto_constants.h |  1 +
 src/box/raft.c             | 58 ++++++++++++++++++++++++++++++++++++++
 src/box/raft.h             | 39 +++++++++++++++++++++++--
 src/box/relay.cc           | 34 ++++++++++++++++++++--
 src/box/relay.h            | 14 +++++++++
 src/box/xrow.c             | 38 +++++++++++++++++++++++--
 src/box/xrow.h             |  5 +---
 9 files changed, 225 insertions(+), 15 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index c1d07ca54..f27436b79 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -55,6 +55,7 @@
 #include "scoped_guard.h"
 #include "txn_limbo.h"
 #include "journal.h"
+#include "raft.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -298,6 +299,8 @@ apply_final_join_row(struct xrow_header *row)
 	 */
 	if (iproto_type_is_synchro_request(row->type))
 		return 0;
+	if (iproto_type_is_raft_request(row->type))
+		return 0;
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
@@ -876,6 +879,23 @@ err:
 	return -1;
 }
 
+/**
+ * Decode a raft status row and pass it to raft_process_msg().
+ * Returns 0 on success, -1 if xrow_decode_raft() fails.
+ */
+static int
+apply_raft_row(struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+	struct vclock candidate_clock;
+	struct raft_request req;
+	req.vclock = &candidate_clock;
+	if (xrow_decode_raft(row, &req) != 0)
+		return -1;
+	raft_process_msg(&req);
+	return 0;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -1219,11 +1239,17 @@ applier_subscribe(struct applier *applier)
 		 * In case of an heartbeat message wake a writer up
 		 * and check applier state.
 		 */
-		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0)
-			applier_signal_ack(applier);
-		else if (applier_apply_tx(&rows) != 0)
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		if (first_row->lsn == 0) {
+			if (unlikely(iproto_type_is_raft_request(first_row->type)))
+				apply_raft_row(first_row);
+			else
+				applier_signal_ack(applier);
+		} else if (applier_apply_tx(&rows) != 0) {
 			diag_raise();
+		}
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
diff --git a/src/box/box.cc b/src/box/box.cc
index c0adccc6a..8323de531 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2050,7 +2050,22 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 		 tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
 	say_info("remote vclock %s local vclock %s",
 		 vclock_to_string(&replica_clock), vclock_to_string(&vclock));
-
+	/*
+	 * Send out the current raft state of the instance.
+	 */
+	if (raft.state != RAFT_STATE_NONE) {
+		struct raft_request req;
+		req.term = raft.term;
+		req.vote = raft.vote;
+		req.state = raft.state;
+		/*
+		 * Omit the candidate vclock, since we've just
+		 * sent it in subscribe response.
+		 */
+		req.vclock = NULL;
+		xrow_encode_raft(&row, &fiber()->gc, &req);
+		coio_write_xrow(io, &row);
+	}
 	/*
 	 * Replica clock is used in gc state and recovery
 	 * initialization, so we need to replace the remote 0-th
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 8a11626b3..4217ce2e0 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -263,6 +263,7 @@ extern const char *iproto_type_strs[];
 enum iproto_raft_keys {
 	IPROTO_RAFT_TERM = 0,
 	IPROTO_RAFT_VOTE = 1,
+	IPROTO_RAFT_STATE = 2,
 };
 
 /**
diff --git a/src/box/raft.c b/src/box/raft.c
index 5465f46b6..839a7dfeb 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -34,11 +34,15 @@
 #include "journal.h"
 #include "xrow.h"
 #include "small/region.h"
+#include "replication.h"
+#include "relay.h"
 
 /** Raft state of this instance. */
 struct raft raft = {
 	.term = 0,
 	.vote = 0,
+	.curr_leader = 0,
+	.state = RAFT_STATE_NONE,
 };
 
 void
@@ -50,9 +54,36 @@ raft_process(const struct raft_request *req)
 		raft.vote = req->vote;
 }
 
+/**
+ * Handle a raft status message received from a peer.
+ * So far only the term is processed, the rest is TODO.
+ */
+void
+raft_process_msg(const struct raft_request *req)
+{
+	/* Adopt a newer term announced by the peer. */
+	if (req->term > raft.term)
+		raft.term = req->term;
+	/* TODO: check whether req->vote is for this instance. */
+	switch (req->state) {
+	case RAFT_STATE_FOLLOWER:
+		break;
+	case RAFT_STATE_CANDIDATE:
+		/* TODO: perform voting logic. */
+		break;
+	case RAFT_STATE_LEADER:
+		/* TODO: switch to the new leader. */
+		break;
+	default:
+		break;
+	}
+}
+
+/** Fill @a req with the raft term and vote, zeroing other fields. */
 void
 raft_serialize(struct raft_request *req)
 {
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
 }
@@ -84,6 +115,9 @@ raft_write_request(const struct raft_request *req)
 		diag_log();
 		goto fail;
 	}
+
+	raft_broadcast(req);
+
 	region_truncate(region, svp);
 	return;
 fail:
@@ -118,3 +152,42 @@ raft_vote(uint32_t vote_for)
 	req.vote = vote_for;
 	raft_write_request(&req);
 }
+
+/** Free a raft broadcast message together with its route. */
+void
+raft_free_msg(struct cmsg *msg)
+{
+	free((void *)msg->route);
+	free(msg);
+}
+
+void
+raft_broadcast(const struct raft_request *req)
+{
+	replicaset_foreach(replica) {
+		if (replica->relay == NULL || replica->id == REPLICA_ID_NIL ||
+		    relay_get_state(replica->relay) != RELAY_FOLLOW)
+			continue;
+		/* TODO: think of a proper allocator. */
+		struct raft_broadcast_msg *raft_msg =
+			calloc(1, sizeof(*raft_msg));
+		struct cmsg_hop *route = calloc(2, sizeof(*route));
+		if (raft_msg == NULL || route == NULL) {
+			free(raft_msg);
+			free(route);
+			diag_set(OutOfMemory,
+				 sizeof(*raft_msg) + 2 * sizeof(*route),
+				 "calloc", "raft_msg");
+			diag_log();
+			continue;
+		}
+		raft_msg->req = *req;
+		/*
+		 * The request is encoded later in the relay thread,
+		 * so it must not point into the caller's memory.
+		 */
+		raft_msg->req.vclock = NULL;
+		relay_push_raft_msg(replica->relay, &raft_msg->base,
+				    route);
+	}
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index 1f392033d..9cb39dd24 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -30,32 +30,67 @@
  * SUCH DAMAGE.
  */
 #include <stdint.h>
-
-struct raft_request;
+#include "cbus.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif
 
+enum raft_state {
+	RAFT_STATE_NONE = 0,
+	RAFT_STATE_FOLLOWER = 1,
+	RAFT_STATE_CANDIDATE = 2,
+	RAFT_STATE_LEADER = 3
+};
+
 struct raft {
 	uint64_t term;
 	uint32_t vote;
+	uint32_t curr_leader;
+	enum raft_state state;
 };
 
 extern struct raft raft;
 
+struct raft_request {
+	uint64_t term;
+	uint32_t vote;
+	enum raft_state state;
+	struct vclock *vclock;
+};
+
+struct raft_broadcast_msg {
+	struct cmsg base;
+	struct raft_request req;
+};
+
 void
 raft_new_term(uint64_t min_new_term);
 
 void
 raft_vote(uint32_t vote_for);
 
+/** Process a raft entry stored in WAL/snapshot. */
 void
 raft_process(const struct raft_request *req);
 
+/** Process a raft status message coming from the network. */
+void
+raft_process_msg(const struct raft_request *req);
+
 void
 raft_serialize(struct raft_request *req);
 
+void
+raft_free_msg(struct cmsg *msg);
+
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
+void
+raft_broadcast(const struct raft_request *req);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a7843a8c2..be252cad1 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -54,6 +54,7 @@
 #include "xstream.h"
 #include "wal.h"
 #include "txn_limbo.h"
+#include "raft.h"
 
 /**
  * Cbus message to send status updates from relay to tx thread.
@@ -773,13 +774,63 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
+/**
+ * Encode a raft status request and send it to the peer.
+ * On encoding failure the error is logged and nothing is sent.
+ */
+static void
+relay_send_raft(struct relay *relay, struct raft_request *req)
+{
+	struct xrow_header packet;
+	if (xrow_encode_raft(&packet, &fiber()->gc, req) != 0) {
+		diag_log();
+		return;
+	}
+	relay_send(relay, &packet);
+}
+
+/**
+ * Relay thread handler of a raft broadcast message: the first
+ * hop of the route built by relay_push_raft_msg().
+ */
+static void
+relay_send_raft_msg(struct cmsg *msg)
+{
+	struct raft_broadcast_msg *raft_msg = (struct raft_broadcast_msg *)msg;
+	/* route[0].pipe is &relay->tx_pipe, see relay_push_raft_msg(). */
+	struct relay *relay = container_of(msg->route[0].pipe, struct relay,
+					   tx_pipe);
+	try {
+		relay_send_raft(relay, &raft_msg->req);
+	} catch (Exception *e) {
+		/*
+		 * An exception must not escape the cbus callback:
+		 * the message would never reach its next hop
+		 * (raft_free_msg()) and would leak.
+		 */
+		e->log();
+	}
+}
+
+void
+relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
+		    struct cmsg_hop *route)
+{
+	/* Send the status to the peer in the relay thread... */
+	route[0].f = relay_send_raft_msg;
+	route[0].pipe = &relay->tx_pipe;
+	/* ...then come back to tx, where raft_free_msg() frees it. */
+	route[1].f = raft_free_msg;
+	route[1].pipe = NULL;
+	cmsg_init(msg, route);
+	cpipe_push(&relay->relay_pipe, msg);
+}
+
 /** Send a single row to the client. */
 static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type) ||
-	       iproto_type_is_synchro_request(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
@@ -796,6 +824,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+	assert(iproto_type_is_dml(packet->type) ||
+	       iproto_type_is_synchro_request(packet->type));
 	/* Check if the rows from the instance are filtered. */
 	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..c2c30cd11 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -41,6 +41,8 @@ struct relay;
 struct replica;
 struct tt_uuid;
 struct vclock;
+struct cmsg;
+struct cmsg_hop;
 
 enum relay_state {
 	/**
@@ -93,6 +95,18 @@ relay_vclock(const struct relay *relay);
 double
 relay_last_row_time(const struct relay *relay);
 
+/**
+ * Initialize a raft status message with the route to relay and
+ * back and push the message to relay.
+ *
+ * @param relay relay.
+ * @param msg a preallocated status message.
+ * @param route a preallocated message route.
+ */
+void
+relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
+		    struct cmsg_hop *route);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 1923bacfc..f60b12cfc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -44,6 +44,7 @@
 #include "scramble.h"
 #include "iproto_constants.h"
 #include "mpstream/mpstream.h"
+#include "raft.h"
 
 static_assert(IPROTO_DATA < 0x7f && IPROTO_METADATA < 0x7f &&
 	      IPROTO_SQL_INFO < 0x7f, "encoded IPROTO_BODY keys must fit into "\
@@ -958,11 +959,26 @@ int
 xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r)
 {
+	assert(mp_sizeof_map(2) == mp_sizeof_map(4));
+	/*
+	 * Term and vote are encoded every time for the sake of
+	 * snapshot, while state and vclock are optional.
+	 */
 	size_t size = mp_sizeof_map(2) +
 		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
 		      mp_sizeof_uint(r->term) +
 		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
 		      mp_sizeof_uint(r->vote);
+
+	size += (r->state != 0) * (mp_sizeof_uint(IPROTO_RAFT_STATE) +
+				   mp_sizeof_uint(r->state));
+	if (r->vclock != NULL) {
+		size += mp_sizeof_uint(IPROTO_VCLOCK) +
+		mp_sizeof_vclock_ignore0(r->vclock);
+	}
+
+	int map_size = 2 + (r->state != 0) + (r->vclock != NULL);
+
 	char *buf = region_alloc(region, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -974,11 +990,20 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
 	row->body[0].iov_len = size;
 	row->group_id = GROUP_LOCAL;
 	row->bodycnt = 1;
-	buf = mp_encode_map(buf, 2);
+	buf = mp_encode_map(buf, map_size);
 	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
 	buf = mp_encode_uint(buf, r->term);
 	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
 	buf = mp_encode_uint(buf, r->vote);
+	if (r->state != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
+		buf = mp_encode_uint(buf, r->state);
+	}
+	if (r->vclock != NULL) {
+		buf = mp_encode_uint(buf, IPROTO_VCLOCK);
+		buf = mp_encode_vclock_ignore0(buf, r->vclock);
+	}
+
 	return 0;
 }
 
@@ -989,7 +1014,7 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
 	assert(row->type == IPROTO_RAFT);
 	assert(row->bodycnt == 1);
 	assert(row->group_id == GROUP_LOCAL);
-	memset(r, 0, sizeof(*r));
+	memset(r, 0, sizeof(*r) - sizeof(struct vclock *));
 	const char *pos = row->body[0].iov_base;
 	uint32_t map_size = mp_decode_map(&pos);
 	for (uint32_t i = 0; i < map_size; ++i)
@@ -1002,6 +1027,15 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
 		case IPROTO_RAFT_VOTE:
 			r->vote = mp_decode_uint(&pos);
 			break;
+		case IPROTO_RAFT_STATE:
+			r->state = mp_decode_uint(&pos);
+			break;
+		case IPROTO_VCLOCK:
+			if (r->vclock != NULL)
+				mp_decode_vclock_ignore0(&pos, r->vclock);
+			else
+				mp_next(&pos);
+			break;
 		default:
 			mp_next(&pos);
 			break;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index c234f6f88..3f37dc18f 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,10 +264,7 @@ xrow_encode_synchro(struct xrow_header *row,
 int
 xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 
-struct raft_request {
-	uint64_t term;
-	uint32_t vote;
-};
+struct raft_request;
 
 int
 xrow_encode_raft(struct xrow_header *row, struct region *region,
-- 
2.20.1 (Apple Git-117)

  parent reply	other threads:[~2020-08-26  7:52 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-08-26  7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 01/10] raft: introduce persistent raft state Serge Petrenko
2020-08-26  7:52 ` Serge Petrenko [this message]
2020-08-27 20:36   ` [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers Vladislav Shpilevoy
2020-08-28 10:10     ` Sergey Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 03/10] [tosquash] raft: return raft_request to xrow Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 04/10] [tosquash] raft: introduce IPROTO_RAFT_VCLOCK Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 05/10] [tosquash] xrow: refactor raft request codec Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 06/10] [tosquash] raft: don't fill raft_request manually Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 07/10] [tosquash] raft: rename curr_leader to leader Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 08/10] [tosquash] raft: rename raft_process to raft_process_recovery Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 09/10] [tosquash] applier: handler error at raft row appliance Serge Petrenko
2020-08-26  7:53 ` [Tarantool-patches] [RAFT 10/10] [tosquash] relay: move raft broadcast details into relay Serge Petrenko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=ed0a2ef9659fddf76f51dd549fb5ab45e0d2df62.1598427905.git.sergepetrenko@tarantool.org \
    --to=sergepetrenko@tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=sergos@tarantool.org \
    --cc=tarantool-patches@dev.tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox