From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [RFC PATCH 11/12] applier: inquire oldest vclock on connect Date: Wed, 6 Jun 2018 20:45:11 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: Introduce a new iproto command IPROTO_GET_GC_VCLOCK that returns the vclock of the oldest checkpoint available at the master. Use this command when the applier connects to set applier->gc_vclock. We will need it to check whether a replica has fallen too far behind its peers in the cluster and so needs to be rebootstrapped. Needed for #461 --- src/box/applier.cc | 15 +++++++++++++++ src/box/applier.h | 2 ++ src/box/box.cc | 12 ++++++++++++ src/box/box.h | 3 +++ src/box/iproto.cc | 7 +++++++ src/box/iproto_constants.h | 2 ++ src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++ src/box/xrow.h | 31 +++++++++++++++++++++++++++++++ 8 files changed, 108 insertions(+) diff --git a/src/box/applier.cc b/src/box/applier.cc index 556502bf..8d750dc6 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -228,6 +228,21 @@ applier_connect(struct applier *applier) &applier->remote_is_ro); } + /** + * Tarantool >= 1.10.1: send an IPROTO_GET_GC_VCLOCK message + * to find out the oldest vclock available at the remote end. + * Needed to check if the replica has to be rebootstrapped. 
+ */ + if (applier->version_id >= version_id(1, 10, 1)) { + xrow_encode_get_gc_vclock(&row); + coio_write_xrow(coio, &row); + coio_read_xrow(coio, ibuf, &row); + if (row.type != IPROTO_OK) + xrow_decode_error_xc(&row); + vclock_create(&applier->gc_vclock); + xrow_decode_vclock_xc(&row, &applier->gc_vclock); + } + applier_set_state(applier, APPLIER_CONNECTED); /* Detect connection to itself */ diff --git a/src/box/applier.h b/src/box/applier.h index c33562cc..d0ae1ed1 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -96,6 +96,8 @@ struct applier { uint32_t version_id; /** Remote vclock at time of connect. */ struct vclock vclock; + /** Oldest vclock available at remote at time of connect. */ + struct vclock gc_vclock; /** Remote peer mode, true if read-only, default: false */ bool remote_is_ro; /** Remote address */ diff --git a/src/box/box.cc b/src/box/box.cc index 9b2c2e2a..c10124ea 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1540,6 +1540,18 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) replica_version_id); } +void +box_get_gc_vclock(struct vclock *vclock) +{ + struct checkpoint_iterator it; + checkpoint_iterator_init(&it); + const struct vclock *oldest = checkpoint_iterator_next(&it); + if (oldest != NULL) + vclock_copy(vclock, oldest); + else + vclock_create(vclock); +} + /** Insert a new cluster into _schema */ static void box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid) diff --git a/src/box/box.h b/src/box/box.h index 182e1b72..10c54102 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -163,6 +163,9 @@ box_process_join(struct ev_io *io, struct xrow_header *header); void box_process_subscribe(struct ev_io *io, struct xrow_header *header); +void +box_get_gc_vclock(struct vclock *vclock); + /** * Check Lua configuration before initialization or * in case of a configuration change. 
diff --git a/src/box/iproto.cc b/src/box/iproto.cc index c6b13934..fdb286ad 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -1062,6 +1062,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, *stop_input = true; break; case IPROTO_REQUEST_VOTE: + case IPROTO_GET_GC_VCLOCK: cmsg_init(&msg->base, misc_route); break; case IPROTO_AUTH: @@ -1423,6 +1424,7 @@ tx_process_misc(struct cmsg *m) struct iproto_msg *msg = tx_accept_msg(m); struct iproto_connection *con = msg->connection; struct obuf *out = con->tx.p_obuf; + struct vclock vclock; tx_fiber_init(con->session, msg->header.sync); @@ -1446,6 +1448,11 @@ tx_process_misc(struct cmsg *m) &replicaset.vclock, cfg_geti("read_only")); break; + case IPROTO_GET_GC_VCLOCK: + box_get_gc_vclock(&vclock); + iproto_reply_vclock_xc(out, msg->header.sync, + ::schema_version, &vclock); + break; default: unreachable(); } diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 46d47719..cb2fdbf1 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -156,6 +156,8 @@ enum iproto_type { IPROTO_SUBSCRIBE = 66, /** Vote request command for master election */ IPROTO_REQUEST_VOTE = 67, + /** Command to inquire garbage collection state */ + IPROTO_GET_GC_VCLOCK = 68, /** Vinyl run info stored in .index file */ VY_INDEX_RUN_INFO = 100, diff --git a/src/box/xrow.c b/src/box/xrow.c index 532e1296..dc5fa0a2 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -282,6 +282,35 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version) } int +iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version, + const struct vclock *vclock) +{ + size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) + + mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock(vclock); + + char *buf = obuf_reserve(out, max_size); + if (buf == NULL) { + diag_set(OutOfMemory, max_size, + "obuf_alloc", "buf"); + return -1; + } + + char *data = buf + IPROTO_HEADER_LEN; + data = 
mp_encode_map(data, 1); + data = mp_encode_uint(data, IPROTO_VCLOCK); + data = mp_encode_vclock(data, vclock); + size_t size = data - buf; + assert(size <= max_size); + + iproto_header_encode(buf, IPROTO_OK, sync, schema_version, + size - IPROTO_HEADER_LEN); + + char *ptr = obuf_alloc(out, size); + assert(ptr == buf); + return 0; +} + +int iproto_reply_request_vote(struct obuf *out, uint64_t sync, uint32_t schema_version, const struct vclock *vclock, bool read_only) @@ -811,6 +840,13 @@ xrow_encode_request_vote(struct xrow_header *row) row->type = IPROTO_REQUEST_VOTE; } +void +xrow_encode_get_gc_vclock(struct xrow_header *row) +{ + memset(row, 0, sizeof(*row)); + row->type = IPROTO_GET_GC_VCLOCK; +} + int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, diff --git a/src/box/xrow.h b/src/box/xrow.h index b10bf26d..edf16ec2 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -230,6 +230,13 @@ void xrow_encode_request_vote(struct xrow_header *row); /** + * Encode an IPROTO_GET_GC_VCLOCK request (oldest checkpoint vclock). + * @param[out] row Row to encode into. + */ +void +xrow_encode_get_gc_vclock(struct xrow_header *row); + +/** * Encode SUBSCRIBE command. * @param[out] Row. * @param replicaset_uuid Replica set uuid. @@ -393,6 +400,21 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version); * @param sync Request sync. * @param schema_version. * @param vclock. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int +iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version, + const struct vclock *vclock); + +/** + * Encode iproto header with IPROTO_OK response code + * and vclock in the body. + * @param out Encode to. + * @param sync Request sync. + * @param schema_version. + * @param vclock. * @param read_only. * * @retval 0 Success. @@ -646,6 +668,15 @@ iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version) diag_raise(); } +/** @copydoc iproto_reply_vclock.
*/ +static inline void +iproto_reply_vclock_xc(struct obuf *out, uint64_t sync, uint32_t schema_version, + const struct vclock *vclock) +{ + if (iproto_reply_vclock(out, sync, schema_version, vclock) != 0) + diag_raise(); +} + /** @copydoc iproto_reply_request_vote_xc. */ static inline void iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync, -- 2.11.0