[PATCH v3 03/11] Introduce IPROTO_REQUEST_STATUS command
Vladimir Davydov
vdavydov.dev at gmail.com
Sat Jul 14 23:49:18 MSK 2018
The new command is supposed to supersede IPROTO_REQUEST_VOTE, which is
difficult to extend, because it uses the global iproto key namespace.
The new command returns a map (IPROTO_STATUS), to which we can add
various information without polluting the global namespace. Currently,
the map contains IPROTO_STATUS_IS_RO and IPROTO_STATUS_VCLOCK keys,
but soon it will be extended with the information needed for the replica rebootstrap feature.
Needed for #461
---
src/box/applier.cc | 6 +--
src/box/applier.h | 8 ++--
src/box/box.cc | 7 ++++
src/box/box.h | 3 ++
src/box/iproto.cc | 7 ++++
src/box/iproto_constants.c | 3 +-
src/box/iproto_constants.h | 13 +++++-
src/box/replication.cc | 9 +++--
src/box/xrow.c | 98 +++++++++++++++++++++++++++++++++++++++++++++-
src/box/xrow.h | 80 +++++++++++++++++++++++--------------
10 files changed, 188 insertions(+), 46 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 556502bf..ad2710a3 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -218,14 +218,12 @@ applier_connect(struct applier *applier)
* It will be used for leader election on bootstrap.
*/
if (applier->version_id >= version_id(1, 7, 7)) {
- xrow_encode_request_vote(&row);
+ xrow_encode_status_request(&row);
coio_write_xrow(coio, &row);
coio_read_xrow(coio, ibuf, &row);
if (row.type != IPROTO_OK)
xrow_decode_error_xc(&row);
- vclock_create(&applier->vclock);
- xrow_decode_request_vote_xc(&row, &applier->vclock,
- &applier->remote_is_ro);
+ xrow_decode_status_xc(&row, &applier->remote_status);
}
applier_set_state(applier, APPLIER_CONNECTED);
diff --git a/src/box/applier.h b/src/box/applier.h
index c33562cc..29b4e5af 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -43,7 +43,7 @@
#include "tt_uuid.h"
#include "uri.h"
-#include "vclock.h"
+#include "xrow.h"
struct xstream;
@@ -94,10 +94,8 @@ struct applier {
struct uri uri;
/** Remote version encoded as a number, see version_id() macro */
uint32_t version_id;
- /** Remote vclock at time of connect. */
- struct vclock vclock;
- /** Remote peer mode, true if read-only, default: false */
- bool remote_is_ro;
+ /** Remote status at time of connect. */
+ struct status remote_status;
/** Remote address */
union {
struct sockaddr addr;
diff --git a/src/box/box.cc b/src/box/box.cc
index 7fc15f33..200e49a1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1564,6 +1564,13 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
replica_version_id);
}
+/**
+ * Report the status of this instance: whether it is
+ * configured read-only and a copy of the replicaset vclock.
+ */
+void
+box_process_status_request(struct status *status)
+{
+	bool read_only = cfg_geti("read_only") != 0;
+	status->is_ro = read_only;
+	vclock_copy(&status->vclock, &replicaset.vclock);
+}
+
/** Insert a new cluster into _schema */
static void
box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
diff --git a/src/box/box.h b/src/box/box.h
index 182e1b72..8c38b416 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -163,6 +163,9 @@ box_process_join(struct ev_io *io, struct xrow_header *header);
void
box_process_subscribe(struct ev_io *io, struct xrow_header *header);
+/** Defined in xrow.h. */
+struct status;
+
+/**
+ * Fill @a status with the current state of this instance
+ * (read-only flag and vclock) for a reply to
+ * IPROTO_REQUEST_STATUS.
+ * @param[out] status Status to fill in.
+ */
+void
+box_process_status_request(struct status *status);
+
/**
* Check Lua configuration before initialization or
* in case of a configuration change.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index cba81a22..17f161a3 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1159,6 +1159,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
*stop_input = true;
break;
case IPROTO_REQUEST_VOTE:
+ case IPROTO_REQUEST_STATUS:
cmsg_init(&msg->base, misc_route);
break;
case IPROTO_AUTH:
@@ -1526,6 +1527,7 @@ tx_process_misc(struct cmsg *m)
goto error;
try {
+ struct status status;
switch (msg->header.type) {
case IPROTO_AUTH:
box_process_auth(&msg->auth, con->salt);
@@ -1542,6 +1544,11 @@ tx_process_misc(struct cmsg *m)
&replicaset.vclock,
cfg_geti("read_only"));
break;
+ case IPROTO_REQUEST_STATUS:
+ box_process_status_request(&status);
+ iproto_reply_status_xc(out, &status, msg->header.sync,
+ ::schema_version);
+ break;
default:
unreachable();
}
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 3bc965bd..bc7dfd7d 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -87,6 +87,7 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
/* 0x27 */ MP_STR, /* IPROTO_EXPR */
/* 0x28 */ MP_ARRAY, /* IPROTO_OPS */
/* 0x29 */ MP_BOOL, /* IPROTO_SERVER_IS_RO */
+ /* 0x2a */ MP_MAP, /* IPROTO_STATUS */
/* }}} */
};
@@ -168,7 +169,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
"expression", /* 0x27 */
"operations", /* 0x28 */
"server is ro", /* 0x29 */
- NULL, /* 0x2a */
+ "status", /* 0x2a */
NULL, /* 0x2b */
NULL, /* 0x2c */
NULL, /* 0x2d */
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index ccbf2da5..ce4366ec 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -78,12 +78,18 @@ enum iproto_key {
IPROTO_EXPR = 0x27, /* EVAL */
IPROTO_OPS = 0x28, /* UPSERT but not UPDATE ops, because of legacy */
IPROTO_SERVER_IS_RO = 0x29,
+ IPROTO_STATUS = 0x2a,
/* Leave a gap between request keys and response keys */
IPROTO_DATA = 0x30,
IPROTO_ERROR = 0x31,
IPROTO_KEY_MAX
};
+/**
+ * Keys of the IPROTO_STATUS map sent in reply to
+ * IPROTO_REQUEST_STATUS.
+ */
+enum iproto_status_key {
+ /** MP_BOOL: set if the instance is in read-only mode. */
+ IPROTO_STATUS_IS_RO = 0x01,
+ /** Instance vclock, encoded with mp_encode_vclock(). */
+ IPROTO_STATUS_VCLOCK = 0x02,
+};
+
#define bit(c) (1ULL<<IPROTO_##c)
#define IPROTO_HEAD_BMAP (bit(REQUEST_TYPE) | bit(SYNC) | bit(REPLICA_ID) |\
@@ -155,8 +161,13 @@ enum iproto_type {
IPROTO_JOIN = 65,
/** Replication SUBSCRIBE command */
IPROTO_SUBSCRIBE = 66,
- /** Vote request command for master election */
+ /**
+ * Vote request command for master election
+ * DEPRECATED: use IPROTO_REQUEST_STATUS instead
+ */
IPROTO_REQUEST_VOTE = 67,
+ /** Instance status request command */
+ IPROTO_REQUEST_STATUS = 68,
/** Vinyl run info stored in .index file */
VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/replication.cc b/src/box/replication.cc
index c1e17698..f12244c9 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -732,7 +732,8 @@ replicaset_round(bool skip_ro)
{
struct replica *leader = NULL;
replicaset_foreach(replica) {
- if (replica->applier == NULL)
+ struct applier *applier = replica->applier;
+ if (applier == NULL)
continue;
/**
* While bootstrapping a new cluster, read-only
@@ -741,7 +742,7 @@ replicaset_round(bool skip_ro)
* replicas since there is still a possibility
* that all replicas exist in cluster table.
*/
- if (skip_ro && replica->applier->remote_is_ro)
+ if (skip_ro && applier->remote_status.is_ro)
continue;
if (leader == NULL) {
leader = replica;
@@ -753,8 +754,8 @@ replicaset_round(bool skip_ro)
* with the same vclock, prefer the one with
* the lowest uuid.
*/
- int cmp = vclock_compare(&replica->applier->vclock,
- &leader->applier->vclock);
+ int cmp = vclock_compare(&applier->remote_status.vclock,
+ &leader->applier->remote_status.vclock);
if (cmp < 0)
continue;
if (cmp == 0 && tt_uuid_compare(&replica->uuid,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 56197d0e..4bc1f81e 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -344,6 +344,41 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync,
}
int
+iproto_reply_status(struct obuf *out, const struct status *status,
+		    uint64_t sync, uint32_t schema_version)
+{
+	/*
+	 * Upper bound of the packet size: the fixed-size header,
+	 * a one-entry body map keyed by IPROTO_STATUS, and the
+	 * nested two-entry STATUS map. Every key is estimated
+	 * with the widest uint32 encoding.
+	 */
+	size_t key_size = mp_sizeof_uint(UINT32_MAX);
+	size_t status_size = mp_sizeof_map(2) +
+			     key_size + mp_sizeof_bool(status->is_ro) +
+			     key_size + mp_sizeof_vclock(&status->vclock);
+	size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) +
+			  key_size + status_size;
+
+	char *buf = obuf_reserve(out, max_size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, max_size, "obuf_alloc", "buf");
+		return -1;
+	}
+
+	/* Encode the body first: the header needs its length. */
+	char *pos = buf + IPROTO_HEADER_LEN;
+	pos = mp_encode_map(pos, 1);
+	pos = mp_encode_uint(pos, IPROTO_STATUS);
+	pos = mp_encode_map(pos, 2);
+	pos = mp_encode_uint(pos, IPROTO_STATUS_IS_RO);
+	pos = mp_encode_bool(pos, status->is_ro);
+	pos = mp_encode_uint(pos, IPROTO_STATUS_VCLOCK);
+	pos = mp_encode_vclock(pos, &status->vclock);
+	size_t packet_size = pos - buf;
+	assert(packet_size <= max_size);
+
+	iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
+			     packet_size - IPROTO_HEADER_LEN);
+
+	/* Allocate the space reserved above; it is expected to start at buf. */
+	char *committed = obuf_alloc(out, packet_size);
+	assert(committed == buf);
+	(void) committed;
+	return 0;
+}
+
+int
iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
uint32_t schema_version)
{
@@ -847,10 +882,69 @@ error:
}
+/**
+ * Encode an IPROTO_REQUEST_STATUS request: the row is zeroed
+ * (so bodycnt is 0) and only the request type is set.
+ */
void
-xrow_encode_request_vote(struct xrow_header *row)
+xrow_encode_status_request(struct xrow_header *row)
{
memset(row, 0, sizeof(*row));
- row->type = IPROTO_REQUEST_VOTE;
+ row->type = IPROTO_REQUEST_STATUS;
+}
+
+int
+xrow_decode_status(struct xrow_header *row, struct status *status)
+{
+	/* Defaults for fields the peer does not report. */
+	status->is_ro = false;
+	vclock_create(&status->vclock);
+
+	if (row->bodycnt == 0)
+		goto err;
+	assert(row->bodycnt == 1);
+
+	const char *data = (const char *) row->body[0].iov_base;
+	const char *end = data + row->body[0].iov_len;
+	const char *tmp = data;
+	/*
+	 * Validate the whole body up front so that mp_next()
+	 * and mp_decode_*() below never run past the buffer.
+	 */
+	if (mp_check(&tmp, end) != 0 || mp_typeof(*data) != MP_MAP)
+		goto err;
+
+	/*
+	 * Find the STATUS key. The value of every other key
+	 * must be skipped, otherwise the next iteration would
+	 * parse a value as if it were a key.
+	 */
+	bool found = false;
+	uint32_t map_size = mp_decode_map(&data);
+	for (uint32_t i = 0; i < map_size; i++) {
+		if (mp_typeof(*data) != MP_UINT) {
+			mp_next(&data); /* key */
+			mp_next(&data); /* value */
+			continue;
+		}
+		if (mp_decode_uint(&data) == IPROTO_STATUS) {
+			found = true;
+			break;
+		}
+		mp_next(&data); /* value */
+	}
+	/* No STATUS in the reply: keep the defaults. */
+	if (!found)
+		return 0;
+	if (mp_typeof(*data) != MP_MAP)
+		goto err;
+
+	/* Decode the STATUS map, ignoring unknown keys. */
+	map_size = mp_decode_map(&data);
+	for (uint32_t i = 0; i < map_size; i++) {
+		if (mp_typeof(*data) != MP_UINT) {
+			mp_next(&data); /* key */
+			mp_next(&data); /* value */
+			continue;
+		}
+		/* Keep the full width so huge keys can't alias known ones. */
+		uint64_t key = mp_decode_uint(&data);
+		switch (key) {
+		case IPROTO_STATUS_IS_RO:
+			if (mp_typeof(*data) != MP_BOOL)
+				goto err;
+			status->is_ro = mp_decode_bool(&data);
+			break;
+		case IPROTO_STATUS_VCLOCK:
+			if (mp_decode_vclock(&data, &status->vclock) != 0)
+				goto err;
+			break;
+		default:
+			mp_next(&data); /* value */
+			break;
+		}
+	}
+	return 0;
+err:
+	diag_set(ClientError, ER_INVALID_MSGPACK, "packet body");
+	return -1;
}
int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 92ea3c97..67910d52 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -30,19 +30,19 @@
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include <sys/uio.h> /* struct iovec */
#include "tt_uuid.h"
#include "diag.h"
+#include "vclock.h"
#if defined(__cplusplus)
extern "C" {
#endif
-struct vclock;
-
enum {
XROW_HEADER_IOVMAX = 1,
XROW_BODY_IOVMAX = 2,
@@ -223,12 +223,28 @@ xrow_encode_auth(struct xrow_header *row, const char *salt, size_t salt_len,
const char *login, size_t login_len, const char *password,
size_t password_len);
+/** Instance status, as reported in reply to IPROTO_REQUEST_STATUS. */
+struct status {
+ /** Set if the instance is running in read-only mode. */
+ bool is_ro;
+ /** Current instance vclock. */
+ struct vclock vclock;
+};
+
/**
- * Encode a vote request for master election.
+ * Decode an instance status from a reply to IPROTO_REQUEST_STATUS.
+ * Fields absent from the reply keep their default values
+ * (is_ro = false, vclock initialized with vclock_create());
+ * unknown keys are ignored.
+ * @param row Row to decode.
+ * @param[out] status Where to store the decoded status.
+ *
+ * @retval 0 Success.
+ * @retval -1 Format error; the error is set in diag.
+ */
+int
+xrow_decode_status(struct xrow_header *row, struct status *status);
+
+/**
+ * Encode an instance status request.
* @param row[out] Row to encode into.
*/
void
-xrow_encode_request_vote(struct xrow_header *row);
+xrow_encode_status_request(struct xrow_header *row);
/**
* Encode SUBSCRIBE command.
@@ -315,22 +331,6 @@ xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
}
/**
- * Decode peer vclock and access rights (a response to VOTE command).
- * @param row Row to decode.
- * @param[out] vclock.
- * @param[out] read_only.
- *
- * @retval 0 Success.
- * @retval -1 Memory or format error.
- */
-static inline int
-xrow_decode_request_vote(struct xrow_header *row, struct vclock *vclock,
- bool *read_only)
-{
- return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, read_only);
-}
-
-/**
* Encode a heartbeat message.
* @param row[out] Row to encode into.
* @param replica_id Instance id.
@@ -405,6 +405,20 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync,
bool read_only);
/**
+ * Encode a reply to an instance status request.
+ * @param out Buffer to write to.
+ * @param status Instance status to encode.
+ * @param sync Request sync.
+ * @param schema_version Actual schema version.
+ *
+ * @retval 0 Success.
+ * @retval -1 Memory error.
+ */
+int
+iproto_reply_status(struct obuf *out, const struct status *status,
+ uint64_t sync, uint32_t schema_version);
+
+/**
* Write an error packet int output buffer. Doesn't throw if out
* of memory
*/
@@ -585,6 +599,14 @@ xrow_encode_auth_xc(struct xrow_header *row, const char *salt, size_t salt_len,
diag_raise();
}
+/**
+ * @copydoc xrow_decode_status.
+ * On failure, raises the error stored in diag.
+ */
+static inline void
+xrow_decode_status_xc(struct xrow_header *row, struct status *status)
+{
+ if (xrow_decode_status(row, status) != 0)
+ diag_raise();
+}
+
/** @copydoc xrow_encode_subscribe. */
static inline void
xrow_encode_subscribe_xc(struct xrow_header *row,
@@ -642,15 +664,6 @@ xrow_decode_vclock_xc(struct xrow_header *row, struct vclock *vclock)
diag_raise();
}
-/** @copydoc xrow_decode_request_vote. */
-static inline void
-xrow_decode_request_vote_xc(struct xrow_header *row, struct vclock *vclock,
- bool *read_only)
-{
- if (xrow_decode_request_vote(row, vclock, read_only) != 0)
- diag_raise();
-}
-
/** @copydoc iproto_reply_ok. */
static inline void
iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version)
@@ -670,6 +683,15 @@ iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync,
diag_raise();
}
+/**
+ * @copydoc iproto_reply_status.
+ * On failure, raises the error stored in diag.
+ */
+static inline void
+iproto_reply_status_xc(struct obuf *out, const struct status *status,
+ uint64_t sync, uint32_t schema_version)
+{
+ if (iproto_reply_status(out, status, sync, schema_version) != 0)
+ diag_raise();
+}
+
#endif
#endif /* TARANTOOL_XROW_H_INCLUDED */
--
2.11.0
More information about the Tarantool-patches
mailing list