[RFC PATCH 11/12] applier: inquire oldest vclock on connect
Vladimir Davydov
vdavydov.dev at gmail.com
Wed Jun 6 20:45:11 MSK 2018
Introduce a new iproto command, IPROTO_GET_GC_VCLOCK, that returns the
vclock of the oldest checkpoint available at the master. Send this
command when the applier connects and store the result in
applier->gc_vclock. We will need it to check whether a replica has
fallen too far behind its peers in the cluster and therefore needs to
be rebootstrapped.
Needed for #461
---
src/box/applier.cc | 15 +++++++++++++++
src/box/applier.h | 2 ++
src/box/box.cc | 12 ++++++++++++
src/box/box.h | 3 +++
src/box/iproto.cc | 7 +++++++
src/box/iproto_constants.h | 2 ++
src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++
src/box/xrow.h | 31 +++++++++++++++++++++++++++++++
8 files changed, 108 insertions(+)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 556502bf..8d750dc6 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -228,6 +228,21 @@ applier_connect(struct applier *applier)
&applier->remote_is_ro);
}
+ /**
+ * Tarantool >= 1.10.1: send an IPROTO_GET_GC_VCLOCK message
+ * to find out the oldest vclock available at the remote end.
+ * Needed to check if the replica has to be rebootstrapped.
+ */
+ if (applier->version_id >= version_id(1, 10, 1)) {
+ xrow_encode_get_gc_vclock(&row);
+ coio_write_xrow(coio, &row);
+ coio_read_xrow(coio, ibuf, &row);
+ if (row.type != IPROTO_OK)
+ xrow_decode_error_xc(&row);
+ vclock_create(&applier->gc_vclock);
+ xrow_decode_vclock_xc(&row, &applier->gc_vclock);
+ }
+
applier_set_state(applier, APPLIER_CONNECTED);
/* Detect connection to itself */
diff --git a/src/box/applier.h b/src/box/applier.h
index c33562cc..d0ae1ed1 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -96,6 +96,8 @@ struct applier {
uint32_t version_id;
/** Remote vclock at time of connect. */
struct vclock vclock;
+ /** Oldest vclock available at remote at time of connect. */
+ struct vclock gc_vclock;
/** Remote peer mode, true if read-only, default: false */
bool remote_is_ro;
/** Remote address */
diff --git a/src/box/box.cc b/src/box/box.cc
index 9b2c2e2a..c10124ea 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1540,6 +1540,18 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
replica_version_id);
}
+/**
+ * Fill @vclock with the vclock of the first (oldest) checkpoint
+ * returned by the checkpoint iterator, or with an empty vclock
+ * if there are no checkpoints at all.
+ */
+void
+box_get_gc_vclock(struct vclock *vclock)
+{
+	struct checkpoint_iterator iter;
+	checkpoint_iterator_init(&iter);
+	const struct vclock *first = checkpoint_iterator_next(&iter);
+	if (first == NULL) {
+		/* No checkpoints: report an empty vclock. */
+		vclock_create(vclock);
+		return;
+	}
+	vclock_copy(vclock, first);
+}
+
/** Insert a new cluster into _schema */
static void
box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
diff --git a/src/box/box.h b/src/box/box.h
index 182e1b72..10c54102 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -163,6 +163,9 @@ box_process_join(struct ev_io *io, struct xrow_header *header);
void
box_process_subscribe(struct ev_io *io, struct xrow_header *header);
+/**
+ * Get the vclock of the oldest checkpoint available at this
+ * instance. If there are no checkpoints, @vclock is set to an
+ * empty vclock.
+ * @param[out] vclock Where to store the result.
+ */
+void
+box_get_gc_vclock(struct vclock *vclock);
+
/**
* Check Lua configuration before initialization or
* in case of a configuration change.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index c6b13934..fdb286ad 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1062,6 +1062,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
*stop_input = true;
break;
case IPROTO_REQUEST_VOTE:
+ case IPROTO_GET_GC_VCLOCK:
cmsg_init(&msg->base, misc_route);
break;
case IPROTO_AUTH:
@@ -1423,6 +1424,7 @@ tx_process_misc(struct cmsg *m)
struct iproto_msg *msg = tx_accept_msg(m);
struct iproto_connection *con = msg->connection;
struct obuf *out = con->tx.p_obuf;
+ struct vclock vclock;
tx_fiber_init(con->session, msg->header.sync);
@@ -1446,6 +1448,11 @@ tx_process_misc(struct cmsg *m)
&replicaset.vclock,
cfg_geti("read_only"));
break;
+ case IPROTO_GET_GC_VCLOCK:
+ box_get_gc_vclock(&vclock);
+ iproto_reply_vclock_xc(out, msg->header.sync,
+ ::schema_version, &vclock);
+ break;
default:
unreachable();
}
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 46d47719..cb2fdbf1 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -156,6 +156,8 @@ enum iproto_type {
IPROTO_SUBSCRIBE = 66,
/** Vote request command for master election */
IPROTO_REQUEST_VOTE = 67,
+ /** Command to inquire garbage collection state */
+ IPROTO_GET_GC_VCLOCK = 68,
/** Vinyl run info stored in .index file */
VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 532e1296..dc5fa0a2 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -282,6 +282,35 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version)
}
int
+iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version,
+		    const struct vclock *vclock)
+{
+	/* Upper bound for header + {IPROTO_VCLOCK: vclock}. */
+	size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) +
+		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock(vclock);
+
+	char *buf = obuf_reserve(out, max_size);
+	if (buf == NULL) {
+		/* Name the call that actually failed. */
+		diag_set(OutOfMemory, max_size,
+			 "obuf_reserve", "buf");
+		return -1;
+	}
+
+	/*
+	 * Encode the body first: the header, written last, needs
+	 * to know the body length.
+	 */
+	char *data = buf + IPROTO_HEADER_LEN;
+	data = mp_encode_map(data, 1);
+	data = mp_encode_uint(data, IPROTO_VCLOCK);
+	data = mp_encode_vclock(data, vclock);
+	size_t size = data - buf;
+	assert(size <= max_size);
+
+	iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
+			     size - IPROTO_HEADER_LEN);
+
+	/*
+	 * Commit the reserved space. The allocation is expected to
+	 * return the very buffer we reserved above.
+	 */
+	char *ptr = obuf_alloc(out, size);
+	(void) ptr; /* Unused when asserts are compiled out. */
+	assert(ptr == buf);
+	return 0;
+}
+
+int
iproto_reply_request_vote(struct obuf *out, uint64_t sync,
uint32_t schema_version, const struct vclock *vclock,
bool read_only)
@@ -811,6 +840,13 @@ xrow_encode_request_vote(struct xrow_header *row)
row->type = IPROTO_REQUEST_VOTE;
}
+/**
+ * Build an IPROTO_GET_GC_VCLOCK request: every header field is
+ * zeroed and only the request type is set (no body).
+ */
+void
+xrow_encode_get_gc_vclock(struct xrow_header *row)
+{
+	memset(row, 0, sizeof(struct xrow_header));
+	row->type = IPROTO_GET_GC_VCLOCK;
+}
+
int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
diff --git a/src/box/xrow.h b/src/box/xrow.h
index b10bf26d..edf16ec2 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -230,6 +230,13 @@ void
xrow_encode_request_vote(struct xrow_header *row);
/**
+ * Encode an IPROTO_GET_GC_VCLOCK request, which asks the remote
+ * end for the vclock of its oldest available checkpoint.
+ * @param[out] row Row to encode into.
+ */
+void
+xrow_encode_get_gc_vclock(struct xrow_header *row);
+
+/**
* Encode SUBSCRIBE command.
* @param[out] Row.
* @param replicaset_uuid Replica set uuid.
@@ -393,6 +400,21 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version);
* @param sync Request sync.
* @param schema_version.
* @param vclock.
+ *
+ * @retval 0 Success.
+ * @retval -1 Memory error.
+ */
+int
+iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version,
+ const struct vclock *vclock);
+
+/**
+ * Encode iproto header with IPROTO_OK response code
+ * and vclock in the body.
+ * @param out Encode to.
+ * @param sync Request sync.
+ * @param schema_version.
+ * @param vclock.
* @param read_only.
*
* @retval 0 Success.
@@ -646,6 +668,15 @@ iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version)
diag_raise();
}
+/**
+ * @copydoc iproto_reply_vclock.
+ * Raises the diagnostics set by iproto_reply_vclock() on failure.
+ */
+static inline void
+iproto_reply_vclock_xc(struct obuf *out, uint64_t sync, uint32_t schema_version,
+		       const struct vclock *vclock)
+{
+	int rc = iproto_reply_vclock(out, sync, schema_version, vclock);
+	if (rc != 0)
+		diag_raise();
+}
+
/** @copydoc iproto_reply_request_vote_xc. */
static inline void
iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync,
--
2.11.0
More information about the Tarantool-patches
mailing list