From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH v3 03/11] Introduce IPROTO_REQUEST_STATUS command Date: Sat, 14 Jul 2018 23:49:18 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: The new command is supposed to supersede IPROTO_REQUEST_VOTE, which is difficult to extend, because it uses the global iproto key namespace. The new command returns a map (IPROTO_STATUS), to which we can add various information without polluting the global namespace. Currently, the map contains IPROTO_STATUS_IS_RO and IPROTO_STATUS_VCLOCK keys, but soon information needed for the replica rebootstrap feature will be added to it. Needed for #461 --- src/box/applier.cc | 6 +-- src/box/applier.h | 8 ++-- src/box/box.cc | 7 ++++ src/box/box.h | 3 ++ src/box/iproto.cc | 7 ++++ src/box/iproto_constants.c | 3 +- src/box/iproto_constants.h | 13 +++++- src/box/replication.cc | 9 +++-- src/box/xrow.c | 98 +++++++++++++++++++++++++++++++++++++++++++++- src/box/xrow.h | 80 +++++++++++++++++++++++-------------- 10 files changed, 188 insertions(+), 46 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 556502bf..ad2710a3 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -218,14 +218,12 @@ applier_connect(struct applier *applier) * It will be used for leader election on bootstrap. 
*/ if (applier->version_id >= version_id(1, 7, 7)) { - xrow_encode_request_vote(&row); + xrow_encode_status_request(&row); coio_write_xrow(coio, &row); coio_read_xrow(coio, ibuf, &row); if (row.type != IPROTO_OK) xrow_decode_error_xc(&row); - vclock_create(&applier->vclock); - xrow_decode_request_vote_xc(&row, &applier->vclock, - &applier->remote_is_ro); + xrow_decode_status_xc(&row, &applier->remote_status); } applier_set_state(applier, APPLIER_CONNECTED); diff --git a/src/box/applier.h b/src/box/applier.h index c33562cc..29b4e5af 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -43,7 +43,7 @@ #include "tt_uuid.h" #include "uri.h" -#include "vclock.h" +#include "xrow.h" struct xstream; @@ -94,10 +94,8 @@ struct applier { struct uri uri; /** Remote version encoded as a number, see version_id() macro */ uint32_t version_id; - /** Remote vclock at time of connect. */ - struct vclock vclock; - /** Remote peer mode, true if read-only, default: false */ - bool remote_is_ro; + /** Remote status at time of connect. 
*/ + struct status remote_status; /** Remote address */ union { struct sockaddr addr; diff --git a/src/box/box.cc b/src/box/box.cc index 7fc15f33..200e49a1 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1564,6 +1564,13 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) replica_version_id); } +void +box_process_status_request(struct status *status) +{ + status->is_ro = cfg_geti("read_only") != 0; + vclock_copy(&status->vclock, &replicaset.vclock); +} + /** Insert a new cluster into _schema */ static void box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid) diff --git a/src/box/box.h b/src/box/box.h index 182e1b72..8c38b416 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -163,6 +163,9 @@ box_process_join(struct ev_io *io, struct xrow_header *header); void box_process_subscribe(struct ev_io *io, struct xrow_header *header); +void +box_process_status_request(struct status *status); + /** * Check Lua configuration before initialization or * in case of a configuration change. 
diff --git a/src/box/iproto.cc b/src/box/iproto.cc index cba81a22..17f161a3 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -1159,6 +1159,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, *stop_input = true; break; case IPROTO_REQUEST_VOTE: + case IPROTO_REQUEST_STATUS: cmsg_init(&msg->base, misc_route); break; case IPROTO_AUTH: @@ -1526,6 +1527,7 @@ tx_process_misc(struct cmsg *m) goto error; try { + struct status status; switch (msg->header.type) { case IPROTO_AUTH: box_process_auth(&msg->auth, con->salt); @@ -1542,6 +1544,11 @@ tx_process_misc(struct cmsg *m) &replicaset.vclock, cfg_geti("read_only")); break; + case IPROTO_REQUEST_STATUS: + box_process_status_request(&status); + iproto_reply_status_xc(out, &status, msg->header.sync, + ::schema_version); + break; default: unreachable(); } diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 3bc965bd..bc7dfd7d 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -87,6 +87,7 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x27 */ MP_STR, /* IPROTO_EXPR */ /* 0x28 */ MP_ARRAY, /* IPROTO_OPS */ /* 0x29 */ MP_BOOL, /* IPROTO_SERVER_IS_RO */ + /* 0x2a */ MP_MAP, /* IPROTO_STATUS */ /* }}} */ }; @@ -168,7 +169,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = { "expression", /* 0x27 */ "operations", /* 0x28 */ "server is ro", /* 0x29 */ - NULL, /* 0x2a */ + "status", /* 0x2a */ NULL, /* 0x2b */ NULL, /* 0x2c */ NULL, /* 0x2d */ diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index ccbf2da5..ce4366ec 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -78,12 +78,18 @@ enum iproto_key { IPROTO_EXPR = 0x27, /* EVAL */ IPROTO_OPS = 0x28, /* UPSERT but not UPDATE ops, because of legacy */ IPROTO_SERVER_IS_RO = 0x29, + IPROTO_STATUS = 0x2a, /* Leave a gap between request keys and response keys */ IPROTO_DATA = 0x30, IPROTO_ERROR = 0x31, IPROTO_KEY_MAX }; +enum iproto_status_key { 
+ IPROTO_STATUS_IS_RO = 0x01, + IPROTO_STATUS_VCLOCK = 0x02, +}; + #define bit(c) (1ULL<applier == NULL) + struct applier *applier = replica->applier; + if (applier == NULL) continue; /** * While bootstrapping a new cluster, read-only @@ -741,7 +742,7 @@ replicaset_round(bool skip_ro) * replicas since there is still a possibility * that all replicas exist in cluster table. */ - if (skip_ro && replica->applier->remote_is_ro) + if (skip_ro && applier->remote_status.is_ro) continue; if (leader == NULL) { leader = replica; @@ -753,8 +754,8 @@ replicaset_round(bool skip_ro) * with the same vclock, prefer the one with * the lowest uuid. */ - int cmp = vclock_compare(&replica->applier->vclock, - &leader->applier->vclock); + int cmp = vclock_compare(&applier->remote_status.vclock, + &leader->applier->remote_status.vclock); if (cmp < 0) continue; if (cmp == 0 && tt_uuid_compare(&replica->uuid, diff --git a/src/box/xrow.c b/src/box/xrow.c index 56197d0e..4bc1f81e 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -344,6 +344,41 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync, } int +iproto_reply_status(struct obuf *out, const struct status *status, + uint64_t sync, uint32_t schema_version) +{ + size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) + + mp_sizeof_uint(UINT32_MAX) + mp_sizeof_map(2) + + mp_sizeof_uint(UINT32_MAX) + mp_sizeof_bool(status->is_ro) + + mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock(&status->vclock); + + char *buf = obuf_reserve(out, max_size); + if (buf == NULL) { + diag_set(OutOfMemory, max_size, + "obuf_alloc", "buf"); + return -1; + } + + char *data = buf + IPROTO_HEADER_LEN; + data = mp_encode_map(data, 1); + data = mp_encode_uint(data, IPROTO_STATUS); + data = mp_encode_map(data, 2); + data = mp_encode_uint(data, IPROTO_STATUS_IS_RO); + data = mp_encode_bool(data, status->is_ro); + data = mp_encode_uint(data, IPROTO_STATUS_VCLOCK); + data = mp_encode_vclock(data, &status->vclock); + size_t size = data - buf; + assert(size <= 
max_size); + + iproto_header_encode(buf, IPROTO_OK, sync, schema_version, + size - IPROTO_HEADER_LEN); + + char *ptr = obuf_alloc(out, size); + assert(ptr == buf); + return 0; +} + +int iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync, uint32_t schema_version) { @@ -847,10 +882,69 @@ error: } void -xrow_encode_request_vote(struct xrow_header *row) +xrow_encode_status_request(struct xrow_header *row) { memset(row, 0, sizeof(*row)); - row->type = IPROTO_REQUEST_VOTE; + row->type = IPROTO_REQUEST_STATUS; +} + +int +xrow_decode_status(struct xrow_header *row, struct status *status) +{ + status->is_ro = false; + vclock_create(&status->vclock); + + if (row->bodycnt == 0) + goto err; + assert(row->bodycnt == 1); + + const char *data = (const char *) row->body[0].iov_base; + const char *end = data + row->body[0].iov_len; + const char *tmp = data; + if (mp_check(&tmp, end) != 0 || mp_typeof(*data) != MP_MAP) + goto err; + + /* Find STATUS key. */ + uint32_t map_size = mp_decode_map(&data); + for (uint32_t i = 0; i < map_size; i++) { + if (mp_typeof(*data) != MP_UINT) { + mp_next(&data); /* key */ + mp_next(&data); /* value */ + continue; + } + if (mp_decode_uint(&data) == IPROTO_STATUS) + break; + } + if (data == end) + return 0; + + /* Decode STATUS map. 
*/ + map_size = mp_decode_map(&data); + for (uint32_t i = 0; i < map_size; i++) { + if (mp_typeof(*data) != MP_UINT) { + mp_next(&data); /* key */ + mp_next(&data); /* value */ + continue; + } + uint32_t key = mp_decode_uint(&data); + switch (key) { + case IPROTO_STATUS_IS_RO: + if (mp_typeof(*data) != MP_BOOL) + goto err; + status->is_ro = mp_decode_bool(&data); + break; + case IPROTO_STATUS_VCLOCK: + if (mp_decode_vclock(&data, &status->vclock) != 0) + goto err; + break; + default: + mp_next(&data); + } + } + return 0; +err: + diag_set(ClientError, ER_INVALID_MSGPACK, "packet body"); + return -1; } int diff --git a/src/box/xrow.h b/src/box/xrow.h index 92ea3c97..67910d52 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -30,19 +30,19 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include #include #include #include /* struct iovec */ #include "tt_uuid.h" #include "diag.h" +#include "vclock.h" #if defined(__cplusplus) extern "C" { #endif -struct vclock; - enum { XROW_HEADER_IOVMAX = 1, XROW_BODY_IOVMAX = 2, @@ -223,12 +223,28 @@ xrow_encode_auth(struct xrow_header *row, const char *salt, size_t salt_len, const char *login, size_t login_len, const char *password, size_t password_len); +/** Instance status. */ +struct status { + /** Set if the instance is running in read-only mode. */ + bool is_ro; + /** Current instance vclock. */ + struct vclock vclock; +}; + /** - * Encode a vote request for master election. + * Decode STATUS from MessagePack. + * @param row Row to decode. + * @param[out] status + */ +int +xrow_decode_status(struct xrow_header *row, struct status *status); + +/** + * Encode an instance status request. * @param row[out] Row to encode into. */ void -xrow_encode_request_vote(struct xrow_header *row); +xrow_encode_status_request(struct xrow_header *row); /** * Encode SUBSCRIBE command. 
@@ -315,22 +331,6 @@ xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) } /** - * Decode peer vclock and access rights (a response to VOTE command). - * @param row Row to decode. - * @param[out] vclock. - * @param[out] read_only. - * - * @retval 0 Success. - * @retval -1 Memory or format error. - */ -static inline int -xrow_decode_request_vote(struct xrow_header *row, struct vclock *vclock, - bool *read_only) -{ - return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, read_only); -} - -/** * Encode a heartbeat message. * @param row[out] Row to encode into. * @param replica_id Instance id. @@ -405,6 +405,20 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync, bool read_only); /** + * Encode a reply to an instance status request. + * @param out Buffer to write to. + * @param status Instance status to encode. + * @param sync Request sync. + * @param schema_version Actual schema version. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int +iproto_reply_status(struct obuf *out, const struct status *status, + uint64_t sync, uint32_t schema_version); + +/** * Write an error packet int output buffer. Doesn't throw if out * of memory */ @@ -585,6 +599,14 @@ xrow_encode_auth_xc(struct xrow_header *row, const char *salt, size_t salt_len, diag_raise(); } +/** @copydoc xrow_decode_status. */ +static inline void +xrow_decode_status_xc(struct xrow_header *row, struct status *status) +{ + if (xrow_decode_status(row, status) != 0) + diag_raise(); +} + /** @copydoc xrow_encode_subscribe. */ static inline void xrow_encode_subscribe_xc(struct xrow_header *row, @@ -642,15 +664,6 @@ xrow_decode_vclock_xc(struct xrow_header *row, struct vclock *vclock) diag_raise(); } -/** @copydoc xrow_decode_request_vote. */ -static inline void -xrow_decode_request_vote_xc(struct xrow_header *row, struct vclock *vclock, - bool *read_only) -{ - if (xrow_decode_request_vote(row, vclock, read_only) != 0) - diag_raise(); -} - /** @copydoc iproto_reply_ok. 
*/ static inline void iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version) @@ -670,6 +683,15 @@ iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync, diag_raise(); } +/** @copydoc iproto_reply_status. */ +static inline void +iproto_reply_status_xc(struct obuf *out, const struct status *status, + uint64_t sync, uint32_t schema_version) +{ + if (iproto_reply_status(out, status, sync, schema_version) != 0) + diag_raise(); +} + #endif #endif /* TARANTOOL_XROW_H_INCLUDED */ -- 2.11.0