[Tarantool-patches] [PATCH v3 07/10] raft: relay status updates to followers

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Sep 30 01:11:29 MSK 2020


From: sergepetrenko <sergepetrenko at tarantool.org>

The patch introduces a new type of system message used to notify the
followers of the instance's Raft status updates.
It is the relay's responsibility to deliver the new system rows to its peers.
The notification system reuses and extends the same row type that is used to
persist Raft state in the WAL and snapshots.

Part of #1146
Part of #5204
---
 src/box/applier.cc         |  28 ++++++-
 src/box/box.cc             |  17 ++++-
 src/box/iproto_constants.h |   2 +
 src/box/memtx_engine.c     |   3 +-
 src/box/raft.c             |  73 +++++++++++++++++-
 src/box/raft.h             |  35 ++++++++-
 src/box/relay.cc           | 150 ++++++++++++++++++++++++++++++++++++-
 src/box/relay.h            |   7 ++
 src/box/xrow.c             |  87 +++++++++++++++++----
 src/box/xrow.h             |   5 +-
 10 files changed, 382 insertions(+), 25 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1a0b55640..9fed3c071 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -55,6 +55,7 @@
 #include "scoped_guard.h"
 #include "txn_limbo.h"
 #include "journal.h"
+#include "raft.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -878,6 +879,18 @@ err:
 	return -1;
 }
 
+static int
+applier_handle_raft(struct applier *applier, struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+
+	struct raft_request req;
+	struct vclock candidate_clock;
+	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
+		return -1;
+	return raft_process_msg(&req, applier->instance_id);
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -1222,11 +1235,20 @@ applier_subscribe(struct applier *applier)
 		 * In case of an heartbeat message wake a writer up
 		 * and check applier state.
 		 */
-		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0)
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		if (first_row->lsn == 0) {
+			if (unlikely(iproto_type_is_raft_request(
+							first_row->type))) {
+				if (applier_handle_raft(applier,
+							first_row) != 0)
+					diag_raise();
+			}
 			applier_signal_ack(applier);
-		else if (applier_apply_tx(&rows) != 0)
+		} else if (applier_apply_tx(&rows) != 0) {
 			diag_raise();
+		}
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
diff --git a/src/box/box.cc b/src/box/box.cc
index 99a15bfd0..a8542cb38 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 	}
 	if (iproto_type_is_raft_request(row->type)) {
 		struct raft_request raft_req;
-		if (xrow_decode_raft(row, &raft_req) != 0)
+		/* Vclock is never persisted in WAL by Raft. */
+		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
 			diag_raise();
 		raft_process_recovery(&raft_req);
 		return;
@@ -2146,7 +2147,19 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 		 tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
 	say_info("remote vclock %s local vclock %s",
 		 vclock_to_string(&replica_clock), vclock_to_string(&vclock));
-
+	if (raft_is_enabled()) {
+		/*
+		 * Send out the current raft state of the instance. Don't do
+		 * that if Raft is disabled. It can be that a part of the
+		 * cluster still contains old versions, which can't handle Raft
+		 * messages. So when it is disabled, its network footprint
+		 * should be 0.
+		 */
+		struct raft_request req;
+		raft_serialize_for_network(&req, &vclock);
+		xrow_encode_raft(&row, &fiber()->gc, &req);
+		coio_write_xrow(io, &row);
+	}
 	/*
 	 * Replica clock is used in gc state and recovery
 	 * initialization, so we need to replace the remote 0-th
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index f9347f555..d3738c705 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -264,6 +264,8 @@ extern const char *iproto_type_strs[];
 enum iproto_raft_keys {
 	IPROTO_RAFT_TERM = 0,
 	IPROTO_RAFT_VOTE = 1,
+	IPROTO_RAFT_STATE = 2,
+	IPROTO_RAFT_VCLOCK = 3,
 };
 
 /**
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 2f38c2647..8147557f6 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -207,7 +207,8 @@ memtx_engine_recover_raft(const struct xrow_header *row)
 {
 	assert(row->type == IPROTO_RAFT);
 	struct raft_request req;
-	if (xrow_decode_raft(row, &req) != 0)
+	/* Vclock is never persisted in WAL by Raft. */
+	if (xrow_decode_raft(row, &req, NULL) != 0)
 		return -1;
 	raft_process_recovery(&req);
 	return 0;
diff --git a/src/box/raft.c b/src/box/raft.c
index ee54d02b7..024433369 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -34,9 +34,20 @@
 #include "journal.h"
 #include "xrow.h"
 #include "small/region.h"
+#include "replication.h"
+#include "relay.h"
+
+const char *raft_state_strs[] = {
+	NULL,
+	"follower",
+	"candidate",
+	"leader",
+};
 
 /** Raft state of this instance. */
 struct raft raft = {
+	.leader = 0,
+	.state = RAFT_STATE_FOLLOWER,
 	.is_enabled = false,
 	.is_candidate = false,
 	.term = 1,
@@ -50,18 +61,71 @@ raft_process_recovery(const struct raft_request *req)
 		raft.term = req->term;
 	if (req->vote != 0)
 		raft.vote = req->vote;
+	/*
+	 * Role is never persisted. If recovery is happening, the
+	 * node was restarted, and the former role may no longer be
+	 * valid anyway.
+	 */
+	assert(req->state == 0);
+	/*
+	 * Vclock is always persisted by some other subsystem - WAL, snapshot.
+	 * It is used only to decide to whom to give the vote during election,
+	 * as a part of the volatile state.
+	 */
+	assert(req->vclock == NULL);
+	/* Raft is not enabled until recovery is finished. */
+	assert(!raft_is_enabled());
+}
+
+int
+raft_process_msg(const struct raft_request *req, uint32_t source)
+{
+	(void)source;
+	if (req->term > raft.term) {
+		/*
+		 * Update term. A stub for tests; the final logic is similar.
+		 */
+		raft.term = req->term;
+	}
+	if (req->vote > 0) {
+		/* Check whether the vote's for us. */
+	}
+	switch (req->state) {
+	case RAFT_STATE_FOLLOWER:
+		break;
+	case RAFT_STATE_CANDIDATE:
+		/* Perform voting logic. */
+		break;
+	case RAFT_STATE_LEADER:
+		/* Switch to a new leader. */
+		break;
+	default:
+		break;
+	}
+	return 0;
+}
 
 void
-raft_serialize_for_network(struct raft_request *req)
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
 {
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
+	req->state = raft.state;
+	/*
+	 * Raft does not own vclock, so it always expects it passed externally.
+	 * Vclock is sent out only by candidate instances.
+	 */
+	if (req->state == RAFT_STATE_CANDIDATE) {
+		req->vclock = vclock;
+		vclock_copy(vclock, &replicaset.vclock);
+	}
 }
 
 void
 raft_serialize_for_disk(struct raft_request *req)
 {
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
 }
@@ -93,3 +157,10 @@ void
 raft_cfg_death_timeout(void)
 {
 }
+
+void
+raft_broadcast(const struct raft_request *req)
+{
+	replicaset_foreach(replica)
+		relay_push_raft(replica->relay, req);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index f27222752..8abde4f4c 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -37,8 +37,19 @@ extern "C" {
 #endif
 
 struct raft_request;
+struct vclock;
+
+enum raft_state {
+	RAFT_STATE_FOLLOWER = 1,
+	RAFT_STATE_CANDIDATE = 2,
+	RAFT_STATE_LEADER = 3,
+};
+
+extern const char *raft_state_strs[];
 
 struct raft {
+	uint32_t leader;
+	enum raft_state state;
 	bool is_enabled;
 	bool is_candidate;
 	uint64_t term;
@@ -48,10 +59,25 @@ struct raft {
 
 extern struct raft raft;
 
+/** Check if Raft is enabled. */
+static inline bool
+raft_is_enabled(void)
+{
+	return raft.is_enabled;
+}
+
 /** Process a raft entry stored in WAL/snapshot. */
 void
 raft_process_recovery(const struct raft_request *req);
 
+/**
+ * Process a raft status message coming from the network.
+ * @param req Raft request.
+ * @param source Instance ID of the message sender.
+ */
+int
+raft_process_msg(const struct raft_request *req, uint32_t source);
+
 /** Configure whether Raft is enabled. */
 void
 raft_cfg_is_enabled(bool is_enabled);
@@ -88,7 +114,7 @@ raft_cfg_death_timeout(void);
  * cluster. It is allowed to save anything here, not only persistent state.
  */
 void
-raft_serialize_for_network(struct raft_request *req);
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
 
 /**
  * Save complete Raft state into a request to be persisted on disk. Only term
@@ -97,6 +123,13 @@ raft_serialize_for_network(struct raft_request *req);
 void
 raft_serialize_for_disk(struct raft_request *req);
 
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
+void
+raft_broadcast(const struct raft_request *req);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 124b0f52f..76430caa6 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -53,6 +53,7 @@
 #include "xstream.h"
 #include "wal.h"
 #include "txn_limbo.h"
+#include "raft.h"
 
 /**
  * Cbus message to send status updates from relay to tx thread.
@@ -145,6 +146,12 @@ struct relay {
 		alignas(CACHELINE_SIZE)
 		/** Known relay vclock. */
 		struct vclock vclock;
+		/**
+		 * True if the relay needs Raft updates. It can live fine
+		 * without sending Raft updates, if it is a relay to an
+		 * anonymous replica, for example.
+		 */
+		bool is_raft_enabled;
 	} tx;
 };
 
@@ -572,6 +579,74 @@ relay_send_heartbeat(struct relay *relay)
 	}
 }
 
+/** A message to set Raft enabled flag in TX thread from a relay thread. */
+struct relay_is_raft_enabled_msg {
+	/** Base cbus message. */
+	struct cmsg base;
+	/**
+	 * First hop - TX thread, second hop - a relay thread, to notify about
+	 * the flag being set.
+	 */
+	struct cmsg_hop route[2];
+	/** Relay pointer to set the flag in. */
+	struct relay *relay;
+	/** New flag value. */
+	bool value;
+	/** Set to true by the second hop; the sender loops until it is set. */
+	bool is_finished;
+};
+
+/** TX thread part of the Raft flag setting, first hop. */
+static void
+tx_set_is_raft_enabled(struct cmsg *base)
+{
+	struct relay_is_raft_enabled_msg *msg =
+		(struct relay_is_raft_enabled_msg *)base;
+	msg->relay->tx.is_raft_enabled = msg->value;
+}
+
+/** Relay thread part of the Raft flag setting, second hop. */
+static void
+relay_set_is_raft_enabled(struct cmsg *base)
+{
+	struct relay_is_raft_enabled_msg *msg =
+		(struct relay_is_raft_enabled_msg *)base;
+	msg->is_finished = true;
+}
+
+/**
+ * Set relay Raft enabled flag from a relay thread to be accessed by the TX
+ * thread.
+ */
+static void
+relay_send_is_raft_enabled(struct relay *relay,
+			   struct relay_is_raft_enabled_msg *msg, bool value)
+{
+	msg->route[0].f = tx_set_is_raft_enabled;
+	msg->route[0].pipe = &relay->relay_pipe;
+	msg->route[1].f = relay_set_is_raft_enabled;
+	msg->route[1].pipe = NULL;
+	msg->relay = relay;
+	msg->value = value;
+	msg->is_finished = false;
+	cmsg_init(&msg->base, msg->route);
+	cpipe_push(&relay->tx_pipe, &msg->base);
+	/*
+	 * cbus_call() can't be used, because it works only if the sender thread
+	 * is a simple cbus_process() loop. But the relay thread is not -
+	 * instead it calls cbus_process() manually when ready. And the thread
+	 * loop consists of the main fiber wakeup. So cbus_call() would just
+	 * hang, because cbus_process() wouldn't be called by the scheduler
+	 * fiber.
+	 */
+	while (!msg->is_finished) {
+		cbus_process(&relay->endpoint);
+		if (msg->is_finished)
+			break;
+		fiber_yield();
+	}
+}
+
 /**
  * A libev callback invoked when a relay client socket is ready
  * for read. This currently only happens when the client closes
@@ -592,6 +667,10 @@ relay_subscribe_f(va_list ap)
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
+	struct relay_is_raft_enabled_msg raft_enabler;
+	if (!relay->replica->anon)
+		relay_send_is_raft_enabled(relay, &raft_enabler, true);
+
 	/*
 	 * Setup garbage collection trigger.
 	 * Not needed for anonymous replicas, since they
@@ -671,6 +750,9 @@ relay_subscribe_f(va_list ap)
 		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 	}
 
+	if (!relay->replica->anon)
+		relay_send_is_raft_enabled(relay, &raft_enabler, false);
+
 	/*
 	 * Log the error that caused the relay to break the loop.
 	 * Don't clear the error for status reporting.
@@ -770,13 +852,75 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
+struct relay_raft_msg {
+	struct cmsg base;
+	struct cmsg_hop route;
+	struct raft_request req;
+	struct vclock vclock;
+	struct relay *relay;
+};
+
+/** Deliver a Raft update pushed via relay_pipe: encode, send, free the msg. */
+static void
+relay_raft_msg_push(struct cmsg *base)
+{
+	struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+	struct xrow_header row;
+	try {
+		if (xrow_encode_raft(&row, &fiber()->gc, &msg->req) != 0)
+			diag_raise();
+		relay_send(msg->relay, &row);
+	} catch (Exception *e) {
+		relay_set_error(msg->relay, e);
+		fiber_cancel(fiber());
+	}
+	free(msg);
+}
+
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req)
+{
+	/*
+	 * Raft updates don't stack. They are thrown away if they can't be
+	 * pushed now. This is fine, as long as relays live much longer than
+	 * the timeouts set in Raft.
+	 */
+	if (!relay->tx.is_raft_enabled)
+		return;
+	/*
+	 * XXX: the message should be preallocated. It should work like Kharon
+	 * in IProto. Relay should have 2 raft messages rotating. When one is
+	 * sent, the other can be updated and a flag is set. When the first
+	 * message is sent, the control returns to TX thread, sees the set
+	 * flag, rotates the buffers, and sends it again. And so on. This is
+	 * how it can work in future, with 0 heap allocations. Current solution
+	 * with alloc-per-update is good enough as a start. Another option -
+	 * wait until all is moved to WAL thread, where this will all happen in
+	 * one thread and will be much simpler.
+	 */
+	struct relay_raft_msg *msg =
+		(struct relay_raft_msg *)malloc(sizeof(*msg));
+	if (msg == NULL) {
+		panic("Couldn't allocate raft message");
+		return;
+	}
+	msg->req = *req;
+	if (req->vclock != NULL) {
+		msg->req.vclock = &msg->vclock;
+		vclock_copy(&msg->vclock, req->vclock);
+	}
+	msg->route.f = relay_raft_msg_push;
+	msg->route.pipe = NULL;
+	cmsg_init(&msg->base, &msg->route);
+	msg->relay = relay;
+	cpipe_push(&relay->relay_pipe, &msg->base);
+}
+
 /** Send a single row to the client. */
 static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type) ||
-	       iproto_type_is_synchro_request(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
@@ -793,6 +937,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+	assert(iproto_type_is_dml(packet->type) ||
+	       iproto_type_is_synchro_request(packet->type));
 	/* Check if the rows from the instance are filtered. */
 	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..b32e2ea2a 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay);
 double
 relay_last_row_time(const struct relay *relay);
 
+/**
+ * Send a Raft update request to the relay channel. It is not
+ * guaranteed that it will be delivered. The connection may break.
+ */
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index b9bbb19a0..da5c6ffae 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -962,11 +962,30 @@ int
 xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r)
 {
-	size_t size = mp_sizeof_map(2) +
-		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
-		      mp_sizeof_uint(r->term) +
-		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
-		      mp_sizeof_uint(r->vote);
+	/*
+	 * Term is always encoded. Sometimes the rest can even be ignored if
+	 * the term is too old.
+	 */
+	int map_size = 1;
+	size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
+		      mp_sizeof_uint(r->term);
+	if (r->vote != 0) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+			mp_sizeof_uint(r->vote);
+	}
+	if (r->state != 0) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
+			mp_sizeof_uint(r->state);
+	}
+	if (r->vclock != NULL) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
+			mp_sizeof_vclock_ignore0(r->vclock);
+	}
+	size += mp_sizeof_map(map_size);
+
 	char *buf = region_alloc(region, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -975,43 +994,83 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
 	memset(row, 0, sizeof(*row));
 	row->type = IPROTO_RAFT;
 	row->body[0].iov_base = buf;
-	row->body[0].iov_len = size;
 	row->group_id = GROUP_LOCAL;
 	row->bodycnt = 1;
-	buf = mp_encode_map(buf, 2);
+	const char *begin = buf;
+
+	buf = mp_encode_map(buf, map_size);
 	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
 	buf = mp_encode_uint(buf, r->term);
-	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
-	buf = mp_encode_uint(buf, r->vote);
+	if (r->vote != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+		buf = mp_encode_uint(buf, r->vote);
+	}
+	if (r->state != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
+		buf = mp_encode_uint(buf, r->state);
+	}
+	if (r->vclock != NULL) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
+		buf = mp_encode_vclock_ignore0(buf, r->vclock);
+	}
+	row->body[0].iov_len = buf - begin;
 	return 0;
 }
 
 int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+		 struct vclock *vclock)
 {
-	/* TODO: handle bad format. */
 	assert(row->type == IPROTO_RAFT);
-	assert(row->bodycnt == 1);
-	assert(row->group_id == GROUP_LOCAL);
+	if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "malformed raft request");
+		return -1;
+	}
 	memset(r, 0, sizeof(*r));
-	const char *pos = row->body[0].iov_base;
+
+	const char *begin = row->body[0].iov_base;
+	const char *end = begin + row->body[0].iov_len;
+	const char *pos = begin;
 	uint32_t map_size = mp_decode_map(&pos);
 	for (uint32_t i = 0; i < map_size; ++i)
 	{
+		if (mp_typeof(*pos) != MP_UINT)
+			goto bad_msgpack;
 		uint64_t key = mp_decode_uint(&pos);
 		switch (key) {
 		case IPROTO_RAFT_TERM:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
 			r->term = mp_decode_uint(&pos);
 			break;
 		case IPROTO_RAFT_VOTE:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
 			r->vote = mp_decode_uint(&pos);
 			break;
+		case IPROTO_RAFT_STATE:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
+			r->state = mp_decode_uint(&pos);
+			break;
+		case IPROTO_RAFT_VCLOCK:
+			r->vclock = vclock;
+			if (r->vclock == NULL)
+				mp_next(&pos);
+			else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0)
+				goto bad_msgpack;
+			break;
 		default:
 			mp_next(&pos);
 			break;
 		}
 	}
 	return 0;
+
+bad_msgpack:
+	xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
+	return -1;
 }
 
 int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 1740df614..25985ad7f 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 struct raft_request {
 	uint64_t term;
 	uint32_t vote;
+	uint32_t state;
+	struct vclock *vclock;
 };
 
 int
@@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r);
 
 int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+		 struct vclock *vclock);
 
 /**
  * CALL/EVAL request.
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list