[Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay

sergepetrenko sergepetrenko at tarantool.org
Wed Feb 26 13:00:08 MSK 2020


From: Serge Petrenko <sergepetrenko at tarantool.org>

Add a filter for relay to skip rows coming from unwanted instances.
A list of ids of the instances whose rows the replica doesn't want to fetch
is encoded together with the SUBSCRIBE request under a freshly introduced
key, IPROTO_ID_MASK.

Filtering rows is needed to prevent an instance from fetching its own
rows from a remote master. Fetching them is useful on initial configuration
but harmful on resubscribe.

Prerequisite #4739, #3294

@TarantoolBot document

Title: document new binary protocol key and subscribe request changes

Add key `IPROTO_ID_MASK = 0x51` to the internals reference.
This is an optional key used in the SUBSCRIBE request. It is followed by an
array of ids of the instances whose rows won't be relayed to the replica.

SUBSCRIBE request is supplemented with an optional field of the
following structure:
```
+====================+
|      ID_MASK       |
|   0x51 : ID LIST   |
| MP_INT : MP_ARRAY  |
|                    |
+====================+
```
The field is encoded only when the id list is not empty.
---
 src/box/applier.cc         |  2 +-
 src/box/box.cc             |  7 ++++---
 src/box/iproto_constants.h |  1 +
 src/box/relay.cc           |  9 ++++++++-
 src/box/relay.h            |  3 ++-
 src/box/xrow.c             | 41 +++++++++++++++++++++++++++++++++++---
 src/box/xrow.h             | 34 +++++++++++++++++++++----------
 7 files changed, 77 insertions(+), 20 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ae3d281a5..911353425 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -867,7 +867,7 @@ applier_subscribe(struct applier *applier)
 	vclock_create(&vclock);
 	vclock_copy(&vclock, &replicaset.vclock);
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
-				 &vclock, replication_anon);
+				 &vclock, replication_anon, 0);
 	coio_write_xrow(coio, &row);
 
 	/* Read SUBSCRIBE response */
diff --git a/src/box/box.cc b/src/box/box.cc
index 5850894de..232d7861b 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,8 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	uint32_t replica_version_id;
 	vclock_create(&replica_clock);
 	bool anon;
-	xrow_decode_subscribe_xc(header, NULL, &replica_uuid,
-				 &replica_clock, &replica_version_id, &anon);
+	unsigned int id_mask;
+	xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
+				 &replica_version_id, &anon, &id_mask);
 
 	/* Forbid connection to itself */
 	if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
@@ -1871,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	 * indefinitely).
 	 */
 	relay_subscribe(replica, io->fd, header->sync, &replica_clock,
-			replica_version_id);
+			replica_version_id, id_mask);
 }
 
 void
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index b66c05c06..814ada303 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -125,6 +125,7 @@ enum iproto_key {
 	IPROTO_STMT_ID = 0x43,
 	/* Leave a gap between SQL keys and additional request keys */
 	IPROTO_REPLICA_ANON = 0x50,
+	IPROTO_ID_MASK = 0x51,
 	IPROTO_KEY_MAX
 };
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index b89632273..87930a006 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,6 +109,7 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
+	unsigned int id_mask;
 	/**
 	 * Local vclock at the moment of subscribe, used to check
 	 * dataset on the other side and send missing data rows if any.
@@ -676,7 +677,8 @@ relay_subscribe_f(va_list ap)
 /** Replication acceptor fiber handler. */
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
-		struct vclock *replica_clock, uint32_t replica_version_id)
+		struct vclock *replica_clock, uint32_t replica_version_id,
+		unsigned int replica_id_mask)
 {
 	assert(replica->anon || replica->id != REPLICA_ID_NIL);
 	struct relay *relay = replica->relay;
@@ -705,6 +707,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
+	relay->id_mask = replica_id_mask;
+
 	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
 	if (rc == 0)
@@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+	/* Check if the rows from the instance are filtered. */
+	if (1 << packet->replica_id & relay->id_mask)
+		return;
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
 	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
diff --git a/src/box/relay.h b/src/box/relay.h
index e1782d78f..255d29d90 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -124,6 +124,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
  */
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
-		struct vclock *replica_vclock, uint32_t replica_version_id);
+		struct vclock *replica_vclock, uint32_t replica_version_id,
+		unsigned int replica_id_mask);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 968c3a202..ee78ed24d 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1194,17 +1194,23 @@ int
 xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
-		      const struct vclock *vclock, bool anon)
+		      const struct vclock *vclock, bool anon,
+		      unsigned int id_mask)
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
+	unsigned int map_size = __builtin_popcount(id_mask);
+	if (map_size) {
+		size += mp_sizeof_array(map_size) + map_size *
+			mp_sizeof_uint(VCLOCK_MAX);
+	}
 	char *buf = (char *) region_alloc(&fiber()->gc, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
 		return -1;
 	}
 	char *data = buf;
-	data = mp_encode_map(data, 5);
+	data = mp_encode_map(data, map_size ? 6 : 5);
 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
 	data = xrow_encode_uuid(data, replicaset_uuid);
 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
 	data = mp_encode_uint(data, tarantool_version_id());
 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
 	data = mp_encode_bool(data, anon);
+	if (map_size) {
+		data = mp_encode_uint(data, IPROTO_ID_MASK);
+		data = mp_encode_array(data, map_size);
+		struct bit_iterator it;
+		bit_iterator_init(&it, &id_mask, sizeof(id_mask),
+				  true);
+		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
+		     id = bit_iterator_next(&it)) {
+			data = mp_encode_uint(data, id);
+		}
+	}
 	assert(data <= buf + size);
 	row->body[0].iov_base = buf;
 	row->body[0].iov_len = (data - buf);
@@ -1226,7 +1243,8 @@ xrow_encode_subscribe(struct xrow_header *row,
 int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
-		      uint32_t *version_id, bool *anon)
+		      uint32_t *version_id, bool *anon,
+		      unsigned int *id_mask)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1244,6 +1262,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 
 	if (anon)
 		*anon = false;
+	if (id_mask)
+		*id_mask = 0;
 	d = data;
 	uint32_t map_size = mp_decode_map(&d);
 	for (uint32_t i = 0; i < map_size; i++) {
@@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 			}
 			*anon = mp_decode_bool(&d);
 			break;
+		case IPROTO_ID_MASK:
+			if (id_mask == NULL)
+				goto skip;
+			if (mp_typeof(*d) != MP_ARRAY) {
+decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+						   "invalid id_mask");
+				return -1;
+			}
+			uint32_t len = mp_decode_array(&d);
+			for(uint32_t i = 0; i < len; ++i) {
+				if (mp_typeof(*d) != MP_UINT)
+					goto decode_err;
+				*id_mask |= 1 << mp_decode_uint(&d);
+			}
+			break;
 		default: skip:
 			mp_next(&d); /* value */
 		}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 0973c497d..ab5fc944a 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
  * @param instance_uuid Instance uuid.
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
+ * @param id_mask A bitmask of replica ids to skip rows from
+ *                      when feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
@@ -330,7 +332,8 @@ int
 xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
-		      const struct vclock *vclock, bool anon);
+		      const struct vclock *vclock, bool anon,
+		      unsigned int id_mask);
 
 /**
  * Decode SUBSCRIBE command.
@@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] vclock.
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
+ * @param[out] id_mask A list of ids to skip rows from when
+ *             feeding replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory or format error.
@@ -347,7 +352,8 @@ xrow_encode_subscribe(struct xrow_header *row,
 int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
-		      uint32_t *version_id, bool *anon);
+		      uint32_t *version_id, bool *anon,
+		      unsigned int *id_mask);
 
 /**
  * Encode JOIN command.
@@ -371,7 +377,8 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid);
 static inline int
 xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
 {
-	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL);
+	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL,
+				     NULL);
 }
 
 /**
@@ -386,7 +393,8 @@ static inline int
 xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid,
 		     struct vclock *vclock)
 {
-	return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL);
+	return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL,
+				     NULL, NULL);
 }
 
 /**
@@ -411,7 +419,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
 static inline int
 xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
 {
-	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
+	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL, NULL);
 }
 
 /**
@@ -442,7 +450,8 @@ xrow_decode_subscribe_response(struct xrow_header *row,
 			       struct tt_uuid *replicaset_uuid,
 			       struct vclock *vclock)
 {
-	return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL);
+	return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL,
+				     NULL, NULL);
 }
 
 /**
@@ -817,10 +826,11 @@ static inline void
 xrow_encode_subscribe_xc(struct xrow_header *row,
 			 const struct tt_uuid *replicaset_uuid,
 			 const struct tt_uuid *instance_uuid,
-			 const struct vclock *vclock, bool anon)
+			 const struct vclock *vclock, bool anon,
+			 unsigned int id_mask)
 {
 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
-				  vclock, anon) != 0)
+				  vclock, anon, id_mask) != 0)
 		diag_raise();
 }
 
@@ -828,11 +838,13 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
 static inline void
 xrow_decode_subscribe_xc(struct xrow_header *row,
 			 struct tt_uuid *replicaset_uuid,
-		         struct tt_uuid *instance_uuid, struct vclock *vclock,
-			 uint32_t *replica_version_id, bool *anon)
+			 struct tt_uuid *instance_uuid, struct vclock *vclock,
+			 uint32_t *replica_version_id, bool *anon,
+			 unsigned int *id_mask)
 {
 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
-				  vclock, replica_version_id, anon) != 0)
+				  vclock, replica_version_id, anon,
+				  id_mask) != 0)
 		diag_raise();
 }
 
-- 
2.20.1 (Apple Git-117)



More information about the Tarantool-patches mailing list