[PATCH v3 03/11] Introduce IPROTO_REQUEST_STATUS command

Vladimir Davydov vdavydov.dev at gmail.com
Sat Jul 14 23:49:18 MSK 2018


The new command is supposed to supersede IPROTO_REQUEST_VOTE, which is
difficult to extend, because it uses the global iproto key namespace.
The new command returns a map (IPROTO_STATUS), to which we can add
various information without polluting the global namespace. Currently,
the map contains the IPROTO_STATUS_IS_RO and IPROTO_STATUS_VCLOCK keys;
soon, it will also carry the information needed by the replica
rebootstrap feature.

Needed for #461
---
 src/box/applier.cc         |  6 +--
 src/box/applier.h          |  8 ++--
 src/box/box.cc             |  7 ++++
 src/box/box.h              |  3 ++
 src/box/iproto.cc          |  7 ++++
 src/box/iproto_constants.c |  3 +-
 src/box/iproto_constants.h | 13 +++++-
 src/box/replication.cc     |  9 +++--
 src/box/xrow.c             | 98 +++++++++++++++++++++++++++++++++++++++++++++-
 src/box/xrow.h             | 80 +++++++++++++++++++++++--------------
 10 files changed, 188 insertions(+), 46 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 556502bf..ad2710a3 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -218,14 +218,25 @@ applier_connect(struct applier *applier)
 	 * It will be used for leader election on bootstrap.
 	 */
 	if (applier->version_id >= version_id(1, 7, 7)) {
-		xrow_encode_request_vote(&row);
+		xrow_encode_status_request(&row);
 		coio_write_xrow(coio, &row);
 		coio_read_xrow(coio, ibuf, &row);
-		if (row.type != IPROTO_OK)
-			xrow_decode_error_xc(&row);
-		vclock_create(&applier->vclock);
-		xrow_decode_request_vote_xc(&row, &applier->vclock,
-					    &applier->remote_is_ro);
+		if (row.type == IPROTO_OK) {
+			xrow_decode_status_xc(&row, &applier->remote_status);
+		} else try {
+			xrow_decode_error_xc(&row);
+		} catch (ClientError *e) {
+			if (e->errcode() != ER_UNKNOWN_REQUEST_TYPE)
+				e->raise();
+			/*
+			 * The master predates IPROTO_REQUEST_STATUS.
+			 * Do not fail the connection: proceed with
+			 * a default status (not read-only, vclock
+			 * reset with vclock_create()).
+			 */
+			applier->remote_status.is_ro = false;
+			vclock_create(&applier->remote_status.vclock);
+		}
 	}
 
 	applier_set_state(applier, APPLIER_CONNECTED);
diff --git a/src/box/applier.h b/src/box/applier.h
index c33562cc..29b4e5af 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -43,7 +43,7 @@
 #include "tt_uuid.h"
 #include "uri.h"
 
-#include "vclock.h"
+#include "xrow.h" /* struct status */
 
 struct xstream;
 
@@ -94,10 +94,8 @@ struct applier {
 	struct uri uri;
 	/** Remote version encoded as a number, see version_id() macro */
 	uint32_t version_id;
-	/** Remote vclock at time of connect. */
-	struct vclock vclock;
-	/** Remote peer mode, true if read-only, default: false */
-	bool remote_is_ro;
+	/** Remote status (read-only flag, vclock) at time of connect. */
+	struct status remote_status;
 	/** Remote address */
 	union {
 		struct sockaddr addr;
diff --git a/src/box/box.cc b/src/box/box.cc
index 7fc15f33..200e49a1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1564,6 +1564,13 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 			replica_version_id);
 }
 
+void
+box_process_status_request(struct status *status)
+{
+	status->is_ro = cfg_geti("read_only") != 0;
+	vclock_copy(&status->vclock, &replicaset.vclock);
+}
+
 /** Insert a new cluster into _schema */
 static void
 box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
diff --git a/src/box/box.h b/src/box/box.h
index 182e1b72..8c38b416 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -163,6 +163,14 @@ box_process_join(struct ev_io *io, struct xrow_header *header);
 void
 box_process_subscribe(struct ev_io *io, struct xrow_header *header);
 
+/**
+ * Fill in the status of this instance, which is sent in reply
+ * to IPROTO_REQUEST_STATUS.
+ * @param[out] status Instance status.
+ */
+void
+box_process_status_request(struct status *status);
+
 /**
  * Check Lua configuration before initialization or
  * in case of a configuration change.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index cba81a22..17f161a3 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1159,6 +1159,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		*stop_input = true;
 		break;
 	case IPROTO_REQUEST_VOTE:
+	case IPROTO_REQUEST_STATUS:
 		cmsg_init(&msg->base, misc_route);
 		break;
 	case IPROTO_AUTH:
@@ -1526,6 +1527,7 @@ tx_process_misc(struct cmsg *m)
 		goto error;
 
 	try {
+		struct status status; /* used by IPROTO_REQUEST_STATUS */
 		switch (msg->header.type) {
 		case IPROTO_AUTH:
 			box_process_auth(&msg->auth, con->salt);
@@ -1542,6 +1544,11 @@ tx_process_misc(struct cmsg *m)
 						     &replicaset.vclock,
 						     cfg_geti("read_only"));
 			break;
+		case IPROTO_REQUEST_STATUS:
+			box_process_status_request(&status);
+			iproto_reply_status_xc(out, &status, msg->header.sync,
+					       ::schema_version);
+			break;
 		default:
 			unreachable();
 		}
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 3bc965bd..bc7dfd7d 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -87,6 +87,7 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
 	/* 0x27 */	MP_STR, /* IPROTO_EXPR */
 	/* 0x28 */	MP_ARRAY, /* IPROTO_OPS */
 	/* 0x29 */	MP_BOOL, /* IPROTO_SERVER_IS_RO */
+	/* 0x2a */	MP_MAP, /* IPROTO_STATUS */
 	/* }}} */
 };
 
@@ -168,7 +169,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
 	"expression",       /* 0x27 */
 	"operations",       /* 0x28 */
 	"server is ro",     /* 0x29 */
-	NULL,               /* 0x2a */
+	"status",           /* 0x2a */
 	NULL,               /* 0x2b */
 	NULL,               /* 0x2c */
 	NULL,               /* 0x2d */
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index ccbf2da5..ce4366ec 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -78,12 +78,18 @@ enum iproto_key {
 	IPROTO_EXPR = 0x27, /* EVAL */
 	IPROTO_OPS = 0x28, /* UPSERT but not UPDATE ops, because of legacy */
 	IPROTO_SERVER_IS_RO = 0x29,
+	IPROTO_STATUS = 0x2a, /* REQUEST_STATUS reply, see iproto_status_key */
 	/* Leave a gap between request keys and response keys */
 	IPROTO_DATA = 0x30,
 	IPROTO_ERROR = 0x31,
 	IPROTO_KEY_MAX
 };
 
+enum iproto_status_key {
+	IPROTO_STATUS_IS_RO = 0x01, /* MP_BOOL */
+	IPROTO_STATUS_VCLOCK = 0x02, /* encoded vclock */
+};
+
 #define bit(c) (1ULL<<IPROTO_##c)
 
 #define IPROTO_HEAD_BMAP (bit(REQUEST_TYPE) | bit(SYNC) | bit(REPLICA_ID) |\
@@ -155,8 +161,13 @@ enum iproto_type {
 	IPROTO_JOIN = 65,
 	/** Replication SUBSCRIBE command */
 	IPROTO_SUBSCRIBE = 66,
-	/** Vote request command for master election */
+	/**
+	 * Vote request command for master election.
+	 * DEPRECATED: use IPROTO_REQUEST_STATUS instead.
+	 */
 	IPROTO_REQUEST_VOTE = 67,
+	/** Instance status request, replied with an IPROTO_STATUS map */
+	IPROTO_REQUEST_STATUS = 68,
 
 	/** Vinyl run info stored in .index file */
 	VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/replication.cc b/src/box/replication.cc
index c1e17698..f12244c9 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -732,7 +732,8 @@ replicaset_round(bool skip_ro)
 {
 	struct replica *leader = NULL;
 	replicaset_foreach(replica) {
-		if (replica->applier == NULL)
+		struct applier *applier = replica->applier;
+		if (applier == NULL)
 			continue;
 		/**
 		 * While bootstrapping a new cluster, read-only
@@ -741,7 +742,7 @@ replicaset_round(bool skip_ro)
 		 * replicas since there is still a possibility
 		 * that all replicas exist in cluster table.
 		 */
-		if (skip_ro && replica->applier->remote_is_ro)
+		if (skip_ro && applier->remote_status.is_ro)
 			continue;
 		if (leader == NULL) {
 			leader = replica;
@@ -753,8 +754,8 @@ replicaset_round(bool skip_ro)
 		 * with the same vclock, prefer the one with
 		 * the lowest uuid.
 		 */
-		int cmp = vclock_compare(&replica->applier->vclock,
-					 &leader->applier->vclock);
+		int cmp = vclock_compare(&applier->remote_status.vclock,
+				&leader->applier->remote_status.vclock);
 		if (cmp < 0)
 			continue;
 		if (cmp == 0 && tt_uuid_compare(&replica->uuid,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 56197d0e..4bc1f81e 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -344,6 +344,41 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync,
 }
 
 int
+iproto_reply_status(struct obuf *out, const struct status *status,
+		    uint64_t sync, uint32_t schema_version)
+{
+	size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) +
+		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_map(2) +
+		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_bool(status->is_ro) +
+		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock(&status->vclock);
+
+	char *buf = obuf_reserve(out, max_size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, max_size,
+			 "obuf_alloc", "buf");
+		return -1;
+	}
+
+	char *data = buf + IPROTO_HEADER_LEN;
+	data = mp_encode_map(data, 1);
+	data = mp_encode_uint(data, IPROTO_STATUS);
+	data = mp_encode_map(data, 2);
+	data = mp_encode_uint(data, IPROTO_STATUS_IS_RO);
+	data = mp_encode_bool(data, status->is_ro);
+	data = mp_encode_uint(data, IPROTO_STATUS_VCLOCK);
+	data = mp_encode_vclock(data, &status->vclock);
+	size_t size = data - buf;
+	assert(size <= max_size);
+
+	iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
+			     size - IPROTO_HEADER_LEN);
+
+	char *ptr = obuf_alloc(out, size);
+	assert(ptr == buf);
+	return 0;
+}
+
+int
 iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
 		   uint32_t schema_version)
 {
@@ -847,10 +882,81 @@ error:
 }
 
 void
-xrow_encode_request_vote(struct xrow_header *row)
+xrow_encode_status_request(struct xrow_header *row)
 {
 	memset(row, 0, sizeof(*row));
-	row->type = IPROTO_REQUEST_VOTE;
+	row->type = IPROTO_REQUEST_STATUS;
+}
+
+/**
+ * Decode a reply to IPROTO_REQUEST_STATUS. Keys that are not
+ * recognized are skipped, so that more fields can be added to
+ * the STATUS map later. Fields absent from the reply keep the
+ * defaults set at the top of this function.
+ */
+int
+xrow_decode_status(struct xrow_header *row, struct status *status)
+{
+	status->is_ro = false;
+	vclock_create(&status->vclock);
+
+	if (row->bodycnt == 0)
+		goto err;
+	assert(row->bodycnt == 1);
+
+	const char *data = (const char *) row->body[0].iov_base;
+	const char *end = data + row->body[0].iov_len;
+	const char *tmp = data;
+	if (mp_check(&tmp, end) != 0 || mp_typeof(*data) != MP_MAP)
+		goto err;
+
+	/* Find STATUS key. */
+	bool found = false;
+	uint32_t map_size = mp_decode_map(&data);
+	for (uint32_t i = 0; i < map_size; i++) {
+		if (mp_typeof(*data) != MP_UINT) {
+			mp_next(&data); /* key */
+			mp_next(&data); /* value */
+			continue;
+		}
+		if (mp_decode_uint(&data) == IPROTO_STATUS) {
+			found = true;
+			break;
+		}
+		mp_next(&data); /* value of an unrelated key */
+	}
+	if (!found)
+		return 0;
+	if (mp_typeof(*data) != MP_MAP)
+		goto err;
+
+	/* Decode STATUS map. */
+	map_size = mp_decode_map(&data);
+	for (uint32_t i = 0; i < map_size; i++) {
+		if (mp_typeof(*data) != MP_UINT) {
+			mp_next(&data); /* key */
+			mp_next(&data); /* value */
+			continue;
+		}
+		uint64_t key = mp_decode_uint(&data);
+		switch (key) {
+		case IPROTO_STATUS_IS_RO:
+			if (mp_typeof(*data) != MP_BOOL)
+				goto err;
+			status->is_ro = mp_decode_bool(&data);
+			break;
+		case IPROTO_STATUS_VCLOCK:
+			if (mp_decode_vclock(&data, &status->vclock) != 0)
+				goto err;
+			break;
+		default:
+			mp_next(&data); /* value */
+		}
+	}
+	return 0;
+err:
+	diag_set(ClientError, ER_INVALID_MSGPACK, "packet body");
+	return -1;
 }
 
 int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 92ea3c97..67910d52 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -30,19 +30,19 @@
  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+#include <stdbool.h>
 #include <stdint.h>
 #include <stddef.h>
 #include <sys/uio.h> /* struct iovec */
 
 #include "tt_uuid.h"
 #include "diag.h"
+#include "vclock.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif
 
-struct vclock;
-
 enum {
 	XROW_HEADER_IOVMAX = 1,
 	XROW_BODY_IOVMAX = 2,
@@ -223,12 +223,33 @@ xrow_encode_auth(struct xrow_header *row, const char *salt, size_t salt_len,
 		 const char *login, size_t login_len, const char *password,
 		 size_t password_len);
 
+/** Instance status. */
+struct status {
+	/** Set if the instance is running in read-only mode. */
+	bool is_ro;
+	/** Current instance vclock. */
+	struct vclock vclock;
+};
+
 /**
- * Encode a vote request for master election.
+ * Decode an instance status (a reply to IPROTO_REQUEST_STATUS).
+ * Fields missing from the reply keep their default values:
+ * is_ro is false and vclock is reset with vclock_create().
+ * @param row Row to decode.
+ * @param[out] status Decoded instance status.
+ *
+ * @retval  0 Success.
+ * @retval -1 Format error, diag is set.
+ */
+int
+xrow_decode_status(struct xrow_header *row, struct status *status);
+
+/**
+ * Encode an instance status request.
  * @param row[out] Row to encode into.
  */
 void
-xrow_encode_request_vote(struct xrow_header *row);
+xrow_encode_status_request(struct xrow_header *row);
 
 /**
  * Encode SUBSCRIBE command.
@@ -315,22 +336,6 @@ xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
 }
 
 /**
- * Decode peer vclock and access rights (a response to VOTE command).
- * @param row Row to decode.
- * @param[out] vclock.
- * @param[out] read_only.
- *
- * @retval  0 Success.
- * @retval -1 Memory or format error.
- */
-static inline int
-xrow_decode_request_vote(struct xrow_header *row, struct vclock *vclock,
-			 bool *read_only)
-{
-	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, read_only);
-}
-
-/**
  * Encode a heartbeat message.
  * @param row[out] Row to encode into.
  * @param replica_id Instance id.
@@ -405,6 +410,20 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync,
 			 bool read_only);
 
 /**
+ * Encode a reply to an IPROTO_REQUEST_STATUS request.
+ * @param out Buffer to write to.
+ * @param status Instance status to encode.
+ * @param sync Request sync.
+ * @param schema_version Actual schema version.
+ *
+ * @retval  0 Success.
+ * @retval -1 Memory error, diag is set.
+ */
+int
+iproto_reply_status(struct obuf *out, const struct status *status,
+		    uint64_t sync, uint32_t schema_version);
+
+/**
  * Write an error packet int output buffer. Doesn't throw if out
  * of memory
  */
@@ -585,6 +604,14 @@ xrow_encode_auth_xc(struct xrow_header *row, const char *salt, size_t salt_len,
 		diag_raise();
 }
 
+/** @copydoc xrow_decode_status. */
+static inline void
+xrow_decode_status_xc(struct xrow_header *row, struct status *status)
+{
+	if (xrow_decode_status(row, status) != 0)
+		diag_raise();
+}
+
 /** @copydoc xrow_encode_subscribe. */
 static inline void
 xrow_encode_subscribe_xc(struct xrow_header *row,
@@ -642,15 +669,6 @@ xrow_decode_vclock_xc(struct xrow_header *row, struct vclock *vclock)
 		diag_raise();
 }
 
-/** @copydoc xrow_decode_request_vote. */
-static inline void
-xrow_decode_request_vote_xc(struct xrow_header *row, struct vclock *vclock,
-			    bool *read_only)
-{
-	if (xrow_decode_request_vote(row, vclock, read_only) != 0)
-		diag_raise();
-}
-
 /** @copydoc iproto_reply_ok. */
 static inline void
 iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version)
@@ -670,6 +688,15 @@ iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync,
 		diag_raise();
 }
 
+/** @copydoc iproto_reply_status. */
+static inline void
+iproto_reply_status_xc(struct obuf *out, const struct status *status,
+		       uint64_t sync, uint32_t schema_version)
+{
+	if (iproto_reply_status(out, status, sync, schema_version) != 0)
+		diag_raise();
+}
+
 #endif
 
 #endif /* TARANTOOL_XROW_H_INCLUDED */
-- 
2.11.0




More information about the Tarantool-patches mailing list