[Tarantool-patches] [PATCH 09/12] raft: introduce raft_msg, drop xrow dependency

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Nov 17 03:02:26 MSK 2020


Raft used to depend on xrow, because it used raft_request as a
communication and persistence unit. Xrow is a part of src/box
library set, so it blocked Raft extraction into src/lib/raft.

This patch makes Raft not depend on xrow. For that Raft introduces
a new communication and persistence unit - struct raft_msg.
Interestingly, throughout its source code Raft already uses term
'message' to describe requests, so this patch also restores the
consistency. This is because raft_request name was used to be
consistent with other *_request structs in xrow.h. Now Raft does
not depend on this, and can use its own name.

Struct raft_msg repeats raft_request literally, but it actually
makes sense. Because when Raft is extracted to a new library, it
may start evolving independently. Its raft_msg may be populated
with new members, or their behaviour may change depending on how
the algorithm will evolve.

But inside box it will be possible to tweak and extend raft_msg
whenever it is necessary, via struct raft_request, and without
changing the basic library.

For instance, in future we may want to make nodes forward the
messages to each other during voting to speed the process up, and
for that we may want to add an explicit 'source' field to
raft_request, while it won't be necessary on the level of
raft_msg.

There is a new compatibility layer in src/box/raft.h which hides
raft_msg details from other box code, and does the msg <-> request
conversions.

Part of #5303
---
 src/box/applier.cc     |  2 +-
 src/box/box.cc         |  4 +--
 src/box/memtx_engine.c |  4 +--
 src/box/raft.c         | 70 ++++++++++++++++++++++++++++++++++++++----
 src/box/raft.h         | 24 +++++++++++++++
 src/box/raftlib.c      | 24 ++++++---------
 src/box/raftlib.h      | 38 ++++++++++++++++++-----
 src/box/xrow.h         |  4 +++
 8 files changed, 137 insertions(+), 33 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index fbde0eccd..fb2f5d130 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -893,7 +893,7 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
 	struct vclock candidate_clock;
 	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
 		return -1;
-	return raft_process_msg(box_raft(), &req, applier->instance_id);
+	return box_raft_process(&req, applier->instance_id);
 }
 
 /**
diff --git a/src/box/box.cc b/src/box/box.cc
index ff80e45a4..7d23de95c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -394,7 +394,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 		/* Vclock is never persisted in WAL by Raft. */
 		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
 			diag_raise();
-		raft_process_recovery(box_raft(), &raft_req);
+		box_raft_recover(&raft_req);
 		return;
 	}
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
@@ -2142,7 +2142,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 		 * should be 0.
 		 */
 		struct raft_request req;
-		raft_serialize_for_network(box_raft(), &req);
+		box_raft_checkpoint_remote(&req);
 		xrow_encode_raft(&row, &fiber()->gc, &req);
 		coio_write_xrow(io, &row);
 	}
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 39d3ffa15..db2bb2333 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -210,7 +210,7 @@ memtx_engine_recover_raft(const struct xrow_header *row)
 	/* Vclock is never persisted in WAL by Raft. */
 	if (xrow_decode_raft(row, &req, NULL) != 0)
 		return -1;
-	raft_process_recovery(box_raft(), &req);
+	box_raft_recover(&req);
 	return 0;
 }
 
@@ -554,7 +554,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
 	opts.free_cache = true;
 	xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
 	vclock_create(&ckpt->vclock);
-	raft_serialize_for_disk(box_raft(), &ckpt->raft);
+	box_raft_checkpoint_local(&ckpt->raft);
 	ckpt->touch = false;
 	return ckpt;
 }
diff --git a/src/box/raft.c b/src/box/raft.c
index 845525660..f3652bbcb 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -49,6 +49,28 @@ struct raft box_raft_global = {
  */
 static struct trigger box_raft_on_update;
 
+static void
+box_raft_msg_to_request(const struct raft_msg *msg, struct raft_request *req)
+{
+	*req = (struct raft_request) {
+		.term = msg->term,
+		.vote = msg->vote,
+		.state = msg->state,
+		.vclock = msg->vclock,
+	};
+}
+
+static void
+box_raft_request_to_msg(const struct raft_request *req, struct raft_msg *msg)
+{
+	*msg = (struct raft_msg) {
+		.term = req->term,
+		.vote = req->vote,
+		.state = req->state,
+		.vclock = req->vclock,
+	};
+}
+
 static int
 box_raft_on_update_f(struct trigger *trigger, void *event)
 {
@@ -117,13 +139,47 @@ box_raft_reconsider_election_quorum(void)
 	raft_cfg_election_quorum(box_raft(), quorum);
 }
 
+void
+box_raft_recover(const struct raft_request *req)
+{
+	struct raft_msg msg;
+	box_raft_request_to_msg(req, &msg);
+	raft_process_recovery(box_raft(), &msg);
+}
+
+void
+box_raft_checkpoint_local(struct raft_request *req)
+{
+	struct raft_msg msg;
+	raft_checkpoint_local(box_raft(), &msg);
+	box_raft_msg_to_request(&msg, req);
+}
+
+void
+box_raft_checkpoint_remote(struct raft_request *req)
+{
+	struct raft_msg msg;
+	raft_checkpoint_remote(box_raft(), &msg);
+	box_raft_msg_to_request(&msg, req);
+}
+
+int
+box_raft_process(struct raft_request *req, uint32_t source)
+{
+	struct raft_msg msg;
+	box_raft_request_to_msg(req, &msg);
+	return raft_process_msg(box_raft(), &msg, source);
+}
+
 static void
-box_raft_broadcast(struct raft *raft, const struct raft_request *req)
+box_raft_broadcast(struct raft *raft, const struct raft_msg *msg)
 {
 	(void)raft;
 	assert(raft == box_raft());
+	struct raft_request req;
+	box_raft_msg_to_request(msg, &req);
 	replicaset_foreach(replica)
-		relay_push_raft(replica->relay, req);
+		relay_push_raft(replica->relay, &req);
 }
 
 /** Wakeup Raft state writer fiber waiting for WAL write end. */
@@ -134,14 +190,16 @@ box_raft_write_cb(struct journal_entry *entry)
 }
 
 static void
-box_raft_write(struct raft *raft, const struct raft_request *req)
+box_raft_write(struct raft *raft, const struct raft_msg *msg)
 {
 	(void)raft;
 	assert(raft == box_raft());
 	/* See Raft implementation why these fields are never written. */
-	assert(req->vclock == NULL);
-	assert(req->state == 0);
+	assert(msg->vclock == NULL);
+	assert(msg->state == 0);
 
+	struct raft_request req;
+	box_raft_msg_to_request(msg, &req);
 	struct region *region = &fiber()->gc;
 	uint32_t svp = region_used(region);
 	struct xrow_header row;
@@ -150,7 +208,7 @@ box_raft_write(struct raft *raft, const struct raft_request *req)
 	struct journal_entry *entry = (struct journal_entry *)buf;
 	entry->rows[0] = &row;
 
-	if (xrow_encode_raft(&row, region, req) != 0)
+	if (xrow_encode_raft(&row, region, &req) != 0)
 		goto fail;
 	journal_entry_create(entry, 1, xrow_approx_len(&row), box_raft_write_cb,
 			     fiber());
diff --git a/src/box/raft.h b/src/box/raft.h
index 09297273f..4dffce380 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -35,6 +35,8 @@
 extern "C" {
 #endif
 
+struct raft_request;
+
 /** Raft state of this instance. */
 static inline struct raft *
 box_raft(void)
@@ -56,6 +58,28 @@ box_raft(void)
 void
 box_raft_reconsider_election_quorum(void);
 
+/**
+ * Recovery a single Raft request. Raft state machine is not turned on yet, this
+ * works only during instance recovery from the journal.
+ */
+void
+box_raft_recover(const struct raft_request *req);
+
+/** Save complete Raft state into a request to be persisted on disk locally. */
+void
+box_raft_checkpoint_local(struct raft_request *req);
+
+/**
+ * Save complete Raft state into a request to be sent to other instances of the
+ * cluster.
+ */
+void
+box_raft_checkpoint_remote(struct raft_request *req);
+
+/** Handle a single Raft request from a node with instance id @a source. */
+int
+box_raft_process(struct raft_request *req, uint32_t source);
+
 void
 box_raft_init(void);
 
diff --git a/src/box/raftlib.c b/src/box/raftlib.c
index cc9139a5b..512dbd51f 100644
--- a/src/box/raftlib.c
+++ b/src/box/raftlib.c
@@ -32,7 +32,6 @@
 
 #include "error.h"
 #include "fiber.h"
-#include "xrow.h"
 #include "small/region.h"
 #include "box.h"
 #include "tt_static.h"
@@ -221,7 +220,7 @@ static void
 raft_sm_become_candidate(struct raft *raft);
 
 static const char *
-raft_request_to_string(const struct raft_request *req)
+raft_msg_to_string(const struct raft_msg *req)
 {
 	assert(req->term != 0);
 	char buf[1024];
@@ -259,9 +258,9 @@ raft_request_to_string(const struct raft_request *req)
 }
 
 void
-raft_process_recovery(struct raft *raft, const struct raft_request *req)
+raft_process_recovery(struct raft *raft, const struct raft_msg *req)
 {
-	say_verbose("RAFT: recover %s", raft_request_to_string(req));
+	say_verbose("RAFT: recover %s", raft_msg_to_string(req));
 	if (req->term != 0) {
 		raft->term = req->term;
 		raft->volatile_term = req->term;
@@ -287,11 +286,9 @@ raft_process_recovery(struct raft *raft, const struct raft_request *req)
 }
 
 int
-raft_process_msg(struct raft *raft, const struct raft_request *req,
-		 uint32_t source)
+raft_process_msg(struct raft *raft, const struct raft_msg *req, uint32_t source)
 {
-	say_info("RAFT: message %s from %u", raft_request_to_string(req),
-		 source);
+	say_info("RAFT: message %s from %u", raft_msg_to_string(req), source);
 	assert(source > 0);
 	assert(source != raft->self);
 	if (req->term == 0 || req->state == 0) {
@@ -474,7 +471,7 @@ raft_worker_handle_io(struct raft *raft)
 	assert(raft->is_write_in_progress);
 	/* During write Raft can't be anything but a follower. */
 	assert(raft->state == RAFT_STATE_FOLLOWER);
-	struct raft_request req;
+	struct raft_msg req;
 
 	if (raft_is_fully_on_disk(raft)) {
 end_dump:
@@ -524,8 +521,7 @@ end_dump:
 		 * another leader elected by that time likely.
 		 */
 		raft->vtab->write(raft, &req);
-		say_info("RAFT: persisted state %s",
-			 raft_request_to_string(&req));
+		say_info("RAFT: persisted state %s", raft_msg_to_string(&req));
 
 		assert(req.term >= raft->term);
 		raft->term = req.term;
@@ -544,7 +540,7 @@ static void
 raft_worker_handle_broadcast(struct raft *raft)
 {
 	assert(raft->is_broadcast_scheduled);
-	struct raft_request req;
+	struct raft_msg req;
 	memset(&req, 0, sizeof(req));
 	req.term = raft->term;
 	req.vote = raft->vote;
@@ -803,7 +799,7 @@ raft_sm_stop(struct raft *raft)
 }
 
 void
-raft_serialize_for_network(const struct raft *raft, struct raft_request *req)
+raft_checkpoint_remote(const struct raft *raft, struct raft_msg *req)
 {
 	memset(req, 0, sizeof(*req));
 	/*
@@ -822,7 +818,7 @@ raft_serialize_for_network(const struct raft *raft, struct raft_request *req)
 }
 
 void
-raft_serialize_for_disk(const struct raft *raft, struct raft_request *req)
+raft_checkpoint_local(const struct raft *raft, struct raft_msg *req)
 {
 	memset(req, 0, sizeof(*req));
 	req->term = raft->term;
diff --git a/src/box/raftlib.h b/src/box/raftlib.h
index 6181d9d49..4f4d24ca8 100644
--- a/src/box/raftlib.h
+++ b/src/box/raftlib.h
@@ -70,7 +70,6 @@ extern "C" {
 
 struct fiber;
 struct raft;
-struct raft_request;
 
 enum raft_state {
 	/**
@@ -95,9 +94,32 @@ enum raft_state {
 const char *
 raft_state_str(uint32_t state);
 
-typedef void (*raft_broadcast_f)(struct raft *raft,
-				 const struct raft_request *req);
-typedef void (*raft_write_f)(struct raft *raft, const struct raft_request *req);
+/**
+ * Basic Raft communication unit for talking to other nodes, and even to other
+ * subsystems such as disk storage.
+ */
+struct raft_msg {
+	/** Term of the instance. */
+	uint64_t term;
+	/**
+	 * Instance ID of the instance this node voted for in the current term.
+	 * 0 means the node didn't vote in this term.
+	 */
+	uint32_t vote;
+	/**
+	 * State of the instance. Can be 0 if the state does not matter for the
+	 * message. For instance, when the message is sent to disk.
+	 */
+	enum raft_state state;
+	/**
+	 * Vclock of the instance. Can be NULL, if the node is not a candidate.
+	 * Also is omitted when does not matter (when the message is for disk).
+	 */
+	const struct vclock *vclock;
+};
+
+typedef void (*raft_broadcast_f)(struct raft *raft, const struct raft_msg *req);
+typedef void (*raft_write_f)(struct raft *raft, const struct raft_msg *req);
 
 /**
  * Raft connection to the environment, via which it talks to other nodes and
@@ -226,11 +248,11 @@ raft_is_enabled(const struct raft *raft)
 
 /** Process a raft entry stored in WAL/snapshot. */
 void
-raft_process_recovery(struct raft *raft, const struct raft_request *req);
+raft_process_recovery(struct raft *raft, const struct raft_msg *req);
 
 /** Process a raft status message coming from the network. */
 int
-raft_process_msg(struct raft *raft, const struct raft_request *req,
+raft_process_msg(struct raft *raft, const struct raft_msg *req,
 		 uint32_t source);
 
 /**
@@ -297,14 +319,14 @@ raft_new_term(struct raft *raft);
  * cluster. It is allowed to save anything here, not only persistent state.
  */
 void
-raft_serialize_for_network(const struct raft *raft, struct raft_request *req);
+raft_checkpoint_remote(const struct raft *raft, struct raft_msg *req);
 
 /**
  * Save complete Raft state into a request to be persisted on disk. Only term
  * and vote are being persisted.
  */
 void
-raft_serialize_for_disk(const struct raft *raft, struct raft_request *req);
+raft_checkpoint_local(const struct raft *raft, struct raft_msg *req);
 
 /**
  * Add a trigger invoked each time any of the Raft node visible attributes are
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3d68c1268..fde8f9474 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,6 +264,10 @@ xrow_encode_synchro(struct xrow_header *row,
 int
 xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 
+/**
+ * Raft request. It repeats Raft message to the letter, but can be extended in
+ * future not depending on the Raft library.
+ */
 struct raft_request {
 	uint64_t term;
 	uint32_t vote;
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list