[PATCH v2 07/11] applier: inquire oldest vclock on connect

Vladimir Davydov vdavydov.dev at gmail.com
Fri Jun 8 20:34:25 MSK 2018


Introduce a new iproto command, IPROTO_GET_GC_VCLOCK, that returns the
vclock of the oldest checkpoint available at the master. The applier
sends this command on connect and stores the result in
applier->gc_vclock. We will need it to check whether a replica has
fallen too far behind its peers in the cluster and so needs to be
rebootstrapped.

Needed for #461
---
 src/box/applier.cc         | 15 +++++++++++++++
 src/box/applier.h          |  2 ++
 src/box/box.cc             | 12 ++++++++++++
 src/box/box.h              |  3 +++
 src/box/iproto.cc          |  7 +++++++
 src/box/iproto_constants.h |  2 ++
 src/box/xrow.c             | 36 ++++++++++++++++++++++++++++++++++++
 src/box/xrow.h             | 31 +++++++++++++++++++++++++++++++
 8 files changed, 108 insertions(+)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 556502bf..8d750dc6 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -228,6 +228,21 @@ applier_connect(struct applier *applier)
 					    &applier->remote_is_ro);
 	}
 
+	/**
+	 * Tarantool >= 1.10.1: send IPROTO_GET_GC_VCLOCK to find out
+	 * the oldest vclock available at the remote end (left empty for
+	 * older peers). Needed to decide whether to rebootstrap the replica.
+	 */
+	vclock_create(&applier->gc_vclock);
+	if (applier->version_id >= version_id(1, 10, 1)) {
+		xrow_encode_get_gc_vclock(&row);
+		coio_write_xrow(coio, &row);
+		coio_read_xrow(coio, ibuf, &row);
+		if (row.type != IPROTO_OK)
+			xrow_decode_error_xc(&row);
+		xrow_decode_vclock_xc(&row, &applier->gc_vclock);
+	}
+
 	applier_set_state(applier, APPLIER_CONNECTED);
 
 	/* Detect connection to itself */
diff --git a/src/box/applier.h b/src/box/applier.h
index c33562cc..d0ae1ed1 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -96,6 +96,8 @@ struct applier {
 	uint32_t version_id;
 	/** Remote vclock at time of connect. */
 	struct vclock vclock;
+	/** Vclock of the oldest checkpoint at remote at time of connect. */
+	struct vclock gc_vclock;
 	/** Remote peer mode, true if read-only, default: false */
 	bool remote_is_ro;
 	/** Remote address */
diff --git a/src/box/box.cc b/src/box/box.cc
index 922e8604..0aaed562 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1540,6 +1540,19 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 			replica_version_id);
 }
 
+/** Copy the oldest checkpoint vclock to @a vclock (empty if none). */
+void
+box_get_gc_vclock(struct vclock *vclock)
+{
+	struct checkpoint_iterator it;
+	checkpoint_iterator_init(&it);
+	const struct vclock *oldest = checkpoint_iterator_next(&it);
+	if (oldest != NULL)
+		vclock_copy(vclock, oldest);
+	else
+		vclock_create(vclock);
+}
+
 /** Insert a new cluster into _schema */
 static void
 box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
diff --git a/src/box/box.h b/src/box/box.h
index 182e1b72..10c54102 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -163,6 +163,13 @@ box_process_join(struct ev_io *io, struct xrow_header *header);
 void
 box_process_subscribe(struct ev_io *io, struct xrow_header *header);
 
+/**
+ * Get the vclock of the oldest checkpoint available on this
+ * instance, or an empty vclock if there are no checkpoints.
+ */
+void
+box_get_gc_vclock(struct vclock *vclock);
+
 /**
  * Check Lua configuration before initialization or
  * in case of a configuration change.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index c6b13934..fdb286ad 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1062,6 +1062,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		*stop_input = true;
 		break;
 	case IPROTO_REQUEST_VOTE:
+	case IPROTO_GET_GC_VCLOCK:
 		cmsg_init(&msg->base, misc_route);
 		break;
 	case IPROTO_AUTH:
@@ -1446,6 +1447,14 @@ tx_process_misc(struct cmsg *m)
 						     &replicaset.vclock,
 						     cfg_geti("read_only"));
 			break;
+		case IPROTO_GET_GC_VCLOCK: {
+			/* Reply with the vclock of the oldest checkpoint. */
+			struct vclock vclock;
+			box_get_gc_vclock(&vclock);
+			iproto_reply_vclock_xc(out, msg->header.sync,
+					       ::schema_version, &vclock);
+			break;
+		}
 		default:
 			unreachable();
 		}
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 46d47719..cb2fdbf1 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -156,6 +156,8 @@ enum iproto_type {
 	IPROTO_SUBSCRIBE = 66,
 	/** Vote request command for master election */
 	IPROTO_REQUEST_VOTE = 67,
+	/** Command to get the vclock of the oldest checkpoint */
+	IPROTO_GET_GC_VCLOCK = 68,
 
 	/** Vinyl run info stored in .index file */
 	VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 532e1296..dc5fa0a2 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -282,6 +282,36 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version)
 }
 
 int
+iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version,
+		    const struct vclock *vclock)
+{
+	size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) +
+		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock(vclock);
+
+	char *buf = obuf_reserve(out, max_size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, max_size,
+			 "obuf_alloc", "buf");
+		return -1;
+	}
+
+	char *data = buf + IPROTO_HEADER_LEN;
+	data = mp_encode_map(data, 1);
+	data = mp_encode_uint(data, IPROTO_VCLOCK);
+	data = mp_encode_vclock(data, vclock);
+	size_t size = data - buf;
+	assert(size <= max_size);
+
+	iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
+			     size - IPROTO_HEADER_LEN);
+
+	char *ptr = obuf_alloc(out, size);
+	(void) ptr; /* only checked by assert(); avoid NDEBUG warning */
+	assert(ptr == buf);
+	return 0;
+}
+
+int
 iproto_reply_request_vote(struct obuf *out, uint64_t sync,
 			  uint32_t schema_version, const struct vclock *vclock,
 			  bool read_only)
@@ -811,6 +841,13 @@ xrow_encode_request_vote(struct xrow_header *row)
 	row->type = IPROTO_REQUEST_VOTE;
 }
 
+void
+xrow_encode_get_gc_vclock(struct xrow_header *row)
+{
+	memset(row, 0, sizeof(*row));
+	row->type = IPROTO_GET_GC_VCLOCK;
+}
+
 int
 xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
diff --git a/src/box/xrow.h b/src/box/xrow.h
index b10bf26d..edf16ec2 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -230,6 +230,13 @@ void
 xrow_encode_request_vote(struct xrow_header *row);
 
 /**
+ * Encode an IPROTO_GET_GC_VCLOCK request (gc state inquiry).
+ * @param[out] row Row to encode into.
+ */
+void
+xrow_encode_get_gc_vclock(struct xrow_header *row);
+
+/**
  * Encode SUBSCRIBE command.
  * @param[out] Row.
  * @param replicaset_uuid Replica set uuid.
@@ -393,6 +400,21 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version);
  * @param sync Request sync.
  * @param schema_version.
  * @param vclock.
+ *
+ * @retval  0 Success.
+ * @retval -1 Memory error.
+ */
+int
+iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version,
+		    const struct vclock *vclock);
+
+/**
+ * Encode iproto header with IPROTO_OK response code
+ * and vclock and read_only flag in the body.
+ * @param out Encode to.
+ * @param sync Request sync.
+ * @param schema_version.
+ * @param vclock.
  * @param read_only.
  *
  * @retval  0 Success.
@@ -646,6 +668,15 @@ iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version)
 		diag_raise();
 }
 
+/** @copydoc iproto_reply_vclock. */
+static inline void
+iproto_reply_vclock_xc(struct obuf *out, uint64_t sync, uint32_t schema_version,
+		       const struct vclock *vclock)
+{
+	if (iproto_reply_vclock(out, sync, schema_version, vclock) != 0)
+		diag_raise();
+}
+
 /** @copydoc iproto_reply_request_vote_xc. */
 static inline void
 iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync,
-- 
2.11.0




More information about the Tarantool-patches mailing list