[Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay

Serge Petrenko sergepetrenko at tarantool.org
Thu Feb 27 16:15:54 MSK 2020



> On 27 Feb 2020, at 02:54, Vladislav Shpilevoy <v.shpilevoy at tarantool.org> wrote:
> 
> Thanks for the patch!
> 
> See 8 comments below.

Hi! Thanks for the review!
Please find my comments inline and the diff below.

> 
>>    replication: implement an instance id filter for relay
>> 
>>    Add a filter for relay to skip rows coming from unwanted instances.
>>    A list of instance ids whose rows the replica doesn't want to fetch is encoded
>>    together with the SUBSCRIBE request after a freshly introduced key IPROTO_ID_FILTER.
>> 
>>    Filtering rows is needed to prevent an instance from fetching its own
>>    rows from a remote master, which is useful on initial configuration and
>>    harmful on resubscribe.
>> 
>>    Prerequisite #4739, #3294
>> 
>>    @TarantoolBot document
>> 
>>    Title: document new binary protocol key and subscribe request changes
>> 
>>    Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
>>    This is an optional key used in the SUBSCRIBE request, followed by an array
>>    of ids of instances whose rows won't be relayed to the replica.
>> 
>>    The SUBSCRIBE request is supplemented with an optional field of the
>>    following structure:
>>    ```
>>    +====================+
>>    |      ID_FILTER     |
>>    |   0x51 : ID LIST   |
>>    | MP_INT : MP_ARRAY  |
>>    |                    |
>>    +====================+
>>    ```
>>    The field is encoded only when the id list is not empty.
>> 
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index b89632273..dc89b90e2 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -109,6 +109,7 @@ struct relay {
>> 	struct vclock recv_vclock;
>> 	/** Replicatoin slave version. */
>> 	uint32_t version_id;
>> +	unsigned int id_filter;
> 
> 1. Please add a comment here explaining what it is
> and why it is needed.

Ok.

> 
>> 	/**
>> 	 * Local vclock at the moment of subscribe, used to check
>> 	 * dataset on the other side and send missing data rows if any.
>> @@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>> 		packet->group_id = GROUP_DEFAULT;
>> 		packet->bodycnt = 0;
>> 	}
>> +	/* Check if the rows from the instance are filtered. */
>> +	if (1 << packet->replica_id & relay->id_filter)
> 
> 2. Please use an explicit != 0 here and below when checking
> the 'filter_size' variable.

No problem.
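A minimal sketch of the same check as a standalone helper, assuming 32 replica ids as with `VCLOCK_MAX`. `id_filter_contains()` and `SKETCH_VCLOCK_MAX` are hypothetical names, not part of the patch. Since `<<` binds tighter than `&`, the patch's expression already means `(1 << id) & filter`; the sketch only adds parentheses for readability and shifts `UINT32_C(1)` so that the mask has the filter's unsigned 32-bit type. The shift amount itself must stay below 32: shifting by the type width or more is undefined in both C and C++.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumption: mirrors Tarantool's VCLOCK_MAX (32 replica ids). */
enum { SKETCH_VCLOCK_MAX = 32 };

/*
 * Hypothetical helper: returns true if rows from replica_id must
 * not be relayed. Out-of-range ids are treated as not filtered,
 * so the shift below never reaches the width of uint32_t.
 */
static bool
id_filter_contains(uint32_t id_filter, uint32_t replica_id)
{
	if (replica_id >= SKETCH_VCLOCK_MAX)
		return false;
	return ((UINT32_C(1) << replica_id) & id_filter) != 0;
}
```

In `relay_send_row()` the check would then read `if (id_filter_contains(relay->id_filter, packet->replica_id)) return;` (again, with a hypothetical helper).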

> 
>> +		return;
>> 	/*
>> 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>> 	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
>> diff --git a/src/box/xrow.c b/src/box/xrow.c
>> index 968c3a202..10edbf6a8 100644
>> --- a/src/box/xrow.c
>> +++ b/src/box/xrow.c
>> @@ -1194,17 +1194,23 @@ int
>> xrow_encode_subscribe(struct xrow_header *row,
>> 		      const struct tt_uuid *replicaset_uuid,
>> 		      const struct tt_uuid *instance_uuid,
>> -		      const struct vclock *vclock, bool anon)
>> +		      const struct vclock *vclock, bool anon,
>> +		      unsigned int id_filter)
> 
> 3. Nit: it would be better to make it uint32_t explicitly,
> because the max id count is 32. Formally speaking, unsigned int
> has no size guarantees: it is at least 16 bits, nothing
> more.

Done, and I’ll send a followup to vclock.h later, as Kostja suggests.
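Since the filter is now a `uint32_t` with one bit per replica id, a compile-time check can pin that assumption down, and the number of set bits gives the length of the encoded array. This is an illustration only: `SKETCH_VCLOCK_MAX` stands in for `VCLOCK_MAX`, and `id_filter_count()` is a hypothetical portable alternative to the GCC/Clang builtin `__builtin_popcount()`, which takes an `unsigned int`.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Assumption: stands in for Tarantool's VCLOCK_MAX. */
#define SKETCH_VCLOCK_MAX 32

/* One bit per replica id must fit into the filter type. */
static_assert(SKETCH_VCLOCK_MAX <= sizeof(uint32_t) * CHAR_BIT,
	      "id filter is too narrow for VCLOCK_MAX ids");

/* Counts set bits: each iteration clears the lowest set bit. */
static unsigned
id_filter_count(uint32_t id_filter)
{
	unsigned count = 0;
	while (id_filter != 0) {
		id_filter &= id_filter - 1;
		count++;
	}
	return count;
}
```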

> 
>> {
>> 	memset(row, 0, sizeof(*row));
>> 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
>> +	unsigned int filter_size = __builtin_popcount(id_filter);
>> +	if (filter_size) {
>> +		size += mp_sizeof_array(filter_size) + filter_size *
>> +			mp_sizeof_uint(VCLOCK_MAX);
> 
> 4. You didn't add mp_sizeof_uint(IPROTO_ID_FILTER) here, and you
> didn't update XROW_BODY_LEN_MAX either. Does it fit in the current
> value? Also, it seems it can't be called XROW_BODY_LEN_MAX anymore,
> because it is clearly not enough to fit the filter array if you
> needed to patch the allocation here.

Then let’s just double XROW_BODY_LEN_MAX.
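For reference, the worst case can be counted from the MessagePack format rules. Assuming ids are below `VCLOCK_MAX` = 32, every id is a one-byte positive fixint, but 32 elements no longer fit into a fixarray (at most 15 elements), so the array header takes 3 bytes. Together with the one-byte `0x51` key, this count gives at most 36 bytes, well within the 128 bytes added by doubling. The helper names below are hypothetical; Tarantool itself uses msgpuck's `mp_sizeof_uint()` and `mp_sizeof_array()`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumptions: IPROTO_ID_FILTER = 0x51 as in the patch, VCLOCK_MAX = 32. */
enum { SKETCH_IPROTO_ID_FILTER = 0x51, SKETCH_VCLOCK_MAX = 32 };

/* MessagePack size of an unsigned integer. */
static size_t
sizeof_uint(uint64_t v)
{
	if (v <= 0x7f)
		return 1;	/* positive fixint */
	if (v <= UINT8_MAX)
		return 2;	/* uint 8 */
	if (v <= UINT16_MAX)
		return 3;	/* uint 16 */
	if (v <= UINT32_MAX)
		return 5;	/* uint 32 */
	return 9;		/* uint 64 */
}

/* MessagePack size of an array header with n elements. */
static size_t
sizeof_array(uint32_t n)
{
	if (n <= 15)
		return 1;	/* fixarray */
	if (n <= UINT16_MAX)
		return 3;	/* array 16 */
	return 5;		/* array 32 */
}

/* Upper bound for the ID_FILTER key plus an array of `count` ids. */
static size_t
id_filter_max_size(uint32_t count)
{
	return sizeof_uint(SKETCH_IPROTO_ID_FILTER) + sizeof_array(count) +
	       count * sizeof_uint(SKETCH_VCLOCK_MAX - 1);
}
```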

> 
> You could also update XROW_BODY_LEN_MAX so that it includes the biggest
> possible filter. The max filter size is just 34 bytes. This is the
> simplest option, I think.
> 
>> +	}
>> 	char *buf = (char *) region_alloc(&fiber()->gc, size);
>> 	if (buf == NULL) {
>> 		diag_set(OutOfMemory, size, "region_alloc", "buf");
>> 		return -1;
>> 	}
>> 	char *data = buf;
>> -	data = mp_encode_map(data, 5);
>> +	data = mp_encode_map(data, filter_size ? 6 : 5);
>> 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
>> 	data = xrow_encode_uuid(data, replicaset_uuid);
>> 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
>> @@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
>> 	data = mp_encode_uint(data, tarantool_version_id());
>> 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
>> 	data = mp_encode_bool(data, anon);
>> +	if (filter_size) {
> 
> 5. I wouldn't make it optional in encoding: this request is sent
> really rarely, and always encoding the key could simplify the code
> a bit. However, it's up to you.

I don’t like the idea of passing zero-length arrays when there is no filter.
Let’s just not encode this part. The filter is initially empty on the decoding side
anyway.
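On the wire, skipping an empty filter simply means not emitting the key, so the SUBSCRIBE map keeps its 5 entries. Below is a byte-level sketch of the optional field following the layout from the commit message, written directly against the MessagePack spec rather than msgpuck. `encode_id_filter()` and the `SKETCH_*` constants are hypothetical, and the 32-id limit is an assumption matching `VCLOCK_MAX`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumptions: IPROTO_ID_FILTER = 0x51 as in the patch, VCLOCK_MAX = 32. */
enum { SKETCH_IPROTO_ID_FILTER = 0x51, SKETCH_VCLOCK_MAX = 32 };

/*
 * Writes the optional ID_FILTER key and its id array to `out` and
 * returns the number of bytes written. An empty filter writes
 * nothing. `out` must have room for the worst case (36 bytes).
 */
static size_t
encode_id_filter(uint32_t id_filter, uint8_t *out)
{
	if (id_filter == 0)
		return 0;
	uint8_t *p = out;
	unsigned count = 0;
	for (unsigned id = 0; id < SKETCH_VCLOCK_MAX; id++)
		count += (id_filter >> id) & 1;
	*p++ = SKETCH_IPROTO_ID_FILTER;		/* key as positive fixint */
	if (count <= 15) {
		*p++ = (uint8_t)(0x90 | count);	/* fixarray */
	} else {
		*p++ = 0xdc;			/* array 16, big-endian length */
		*p++ = (uint8_t)(count >> 8);
		*p++ = (uint8_t)count;
	}
	for (unsigned id = 0; id < SKETCH_VCLOCK_MAX; id++) {
		if (((id_filter >> id) & 1) != 0)
			*p++ = (uint8_t)id;	/* ids below 128 are fixints */
	}
	return (size_t)(p - out);
}
```

For ids 1 and 3 it produces `51 92 01 03`: the key, a two-element fixarray header, and the two ids.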

> 
> Also, won't it break replication from an old version? You
> will send subscribe with this new key, and the old instance
> should ignore it. Does it? I don't remember.
> 

Yes, the old instance ignores all unknown keys.

> If the old instance ignores it, it means that the bug
> can still explode when replicating from an old version, right?
> I don't know how to fix that, but if it is true, we need to
> at least document it.

Yes, the bug’s still here when replicating from an old instance.
To trigger it, you have to set up master-master replication between
the old and the new instances and meet some additional conditions.
I don’t think it’s usual to set up master-master replication when upgrading, as
Kostja has pointed out.

> 
>> +		data = mp_encode_uint(data, IPROTO_ID_FILTER);
>> +		data = mp_encode_array(data, filter_size);
>> +		struct bit_iterator it;
>> +		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
>> +				  true);
>> +		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
>> +		     id = bit_iterator_next(&it)) {
>> +			data = mp_encode_uint(data, id);
>> +		}
>> +	}
>> 	assert(data <= buf + size);
>> 	row->body[0].iov_base = buf;
>> 	row->body[0].iov_len = (data - buf);
>> @@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>> 			}
>> 			*anon = mp_decode_bool(&d);
>> 			break;
>> +		case IPROTO_ID_FILTER:
>> +			if (id_filter == NULL)
>> +				goto skip;
>> +			if (mp_typeof(*d) != MP_ARRAY) {
>> +decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
>> +						   "invalid id_filter");
> 
> 6. Let's say it in caps: ID_FILTER, just like in other decode
> error messages. Also, I would name this label 'id_filter_decode_err',
> because other keys can't jump here.

Okay.

> 
>> +				return -1;
>> +			}
>> +			uint32_t len = mp_decode_array(&d);
>> +			for(uint32_t i = 0; i < len; ++i) {
>> +				if (mp_typeof(*d) != MP_UINT)
>> +					goto decode_err;
>> +				*id_filter |= 1 << mp_decode_uint(&d);
> 
> 7. If someone sent a big ID (say, a program pretending to be a Tarantool
> instance), it would cause a bit shift overflow, which is undefined
> behaviour. Let's check that it is not bigger than 31.
>
> However, this won't really help much: from what I see, this code will
> crash even if I just send a truncated packet.

I’m not sure I understand what you mean. This piece of code is
similar to the one we have in mp_decode_vclock. At least, the situation
didn’t get worse.

> 
> Up to you whether you want to fix the bit shift.

Fixed.
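A standalone sketch of a decoder that addresses both points raised in comment 7: every read is checked against `end`, so a truncated packet is reported as an error instead of being read past, and ids of 32 or more are rejected before the shift. The shift is done on `UINT32_C(1)`: with a plain `1`, which is a signed `int`, id 31 (still accepted by a `val >= VCLOCK_MAX` check) would shift into the sign bit, which is undefined behaviour in C on platforms with a 32-bit `int`. This is written against the MessagePack spec, not msgpuck; `decode_id_filter()`, `load_be()` and `SKETCH_VCLOCK_MAX` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumption: stands in for Tarantool's VCLOCK_MAX. */
enum { SKETCH_VCLOCK_MAX = 32 };

/* Reads a big-endian unsigned integer of `len` bytes. */
static uint64_t
load_be(const uint8_t *p, size_t len)
{
	uint64_t v = 0;
	for (size_t i = 0; i < len; i++)
		v = (v << 8) | p[i];
	return v;
}

/*
 * Decodes an ID_FILTER array from [*pos, end) and ORs it into
 * *id_filter. Returns -1 (leaving outputs untouched) on truncation,
 * a non-array value, a non-uint element or an id >= 32.
 */
static int
decode_id_filter(const uint8_t **pos, const uint8_t *end, uint32_t *id_filter)
{
	const uint8_t *p = *pos;
	uint64_t len;
	if (p >= end)
		return -1;
	if ((*p & 0xf0) == 0x90) {			/* fixarray */
		len = *p++ & 0x0f;
	} else if (*p == 0xdc || *p == 0xdd) {		/* array 16 / 32 */
		size_t n = *p == 0xdc ? 2 : 4;
		if ((size_t)(end - p) < 1 + n)
			return -1;
		len = load_be(p + 1, n);
		p += 1 + n;
	} else {
		return -1;
	}
	uint32_t filter = 0;
	for (uint64_t i = 0; i < len; i++) {
		if (p >= end)
			return -1;
		uint64_t id;
		if (*p <= 0x7f) {			/* positive fixint */
			id = *p++;
		} else if (*p >= 0xcc && *p <= 0xcf) {	/* uint 8/16/32/64 */
			size_t n = (size_t)1 << (*p - 0xcc);
			if ((size_t)(end - p) < 1 + n)
				return -1;
			id = load_be(p + 1, n);
			p += 1 + n;
		} else {
			return -1;
		}
		if (id >= SKETCH_VCLOCK_MAX)
			return -1;
		filter |= UINT32_C(1) << id;
	}
	*id_filter |= filter;
	*pos = p;
	return 0;
}
```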

> 
>> +			}
>> +			break;
>> 		default: skip:
>> 			mp_next(&d); /* value */
>> 		}
>> diff --git a/src/box/xrow.h b/src/box/xrow.h
>> index 0973c497d..8e5716b30 100644
>> --- a/src/box/xrow.h
>> +++ b/src/box/xrow.h
>> @@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
>>  * @param instance_uuid Instance uuid.
>>  * @param vclock Replication clock.
>>  * @param anon Whether it is an anonymous subscribe request or not.
>> + * @param id_filter A List of replica ids to skip rows from
>> + *                      when feeding a replica.
> 
> 8. This looks like a huge indentation. Keep it if you want; I'm pointing
> it out just in case it was an accident. In the next @param description
> you aligned it differently.

Thanks! I fixed both pieces.

> 
>>  *
>>  * @retval  0 Success.
>>  * @retval -1 Memory error.
>> @@ -330,7 +332,8 @@ int
>> @@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
>>  * @param[out] vclock.
>>  * @param[out] version_id.
>>  * @param[out] anon Whether it is an anonymous subscribe.
>> + * @param[out] id_filter A list of ids to skip rows from when
>> + *             feeding replica.
>>  *
>>  * @retval  0 Success.
>>  * @retval -1 Memory or format error.


diff --git a/src/box/box.cc b/src/box/box.cc
index 94267c74e..09dd67ab4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,7 +1787,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	uint32_t replica_version_id;
 	vclock_create(&replica_clock);
 	bool anon;
-	unsigned int id_filter;
+	uint32_t id_filter;
 	xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
 				 &replica_version_id, &anon, &id_filter);
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index dc89b90e2..95245a3cf 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,13 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
-	unsigned int id_filter;
+	/**
+	 * A filter of replica ids whose rows should be ignored.
+	 * Each set filter bit corresponds to a replica id whose
+	 * rows shouldn't be relayed. The list of ids to ignore
+	 * is passed by the replica on subscribe.
+	 */
+	uint32_t id_filter;
 	/**
 	 * Local vclock at the moment of subscribe, used to check
 	 * dataset on the other side and send missing data rows if any.
@@ -678,7 +684,7 @@ relay_subscribe_f(va_list ap)
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_clock, uint32_t replica_version_id,
-		unsigned int replica_id_filter)
+		uint32_t replica_id_filter)
 {
 	assert(replica->anon || replica->id != REPLICA_ID_NIL);
 	struct relay *relay = replica->relay;
@@ -768,7 +774,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->bodycnt = 0;
 	}
 	/* Check if the rows from the instance are filtered. */
-	if (1 << packet->replica_id & relay->id_filter)
+	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 6e7eebab1..0632fa912 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
-		unsigned int replica_id_filter);
+		uint32_t replica_id_filter);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 10edbf6a8..602049004 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,22 +1195,18 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_filter)
+		      uint32_t id_filter)
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
-	unsigned int filter_size = __builtin_popcount(id_filter);
-	if (filter_size) {
-		size += mp_sizeof_array(filter_size) + filter_size *
-			mp_sizeof_uint(VCLOCK_MAX);
-	}
 	char *buf = (char *) region_alloc(&fiber()->gc, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
 		return -1;
 	}
 	char *data = buf;
-	data = mp_encode_map(data, filter_size ? 6 : 5);
+	int filter_size = __builtin_popcount(id_filter);
+	data = mp_encode_map(data, filter_size != 0 ? 6 : 5);
 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
 	data = xrow_encode_uuid(data, replicaset_uuid);
 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,7 +1217,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 	data = mp_encode_uint(data, tarantool_version_id());
 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
 	data = mp_encode_bool(data, anon);
-	if (filter_size) {
+	if (filter_size != 0) {
 		data = mp_encode_uint(data, IPROTO_ID_FILTER);
 		data = mp_encode_array(data, filter_size);
 		struct bit_iterator it;
@@ -1244,7 +1240,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_filter)
+		      uint32_t *id_filter)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1325,15 +1321,18 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 			if (id_filter == NULL)
 				goto skip;
 			if (mp_typeof(*d) != MP_ARRAY) {
-decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
-						   "invalid id_filter");
+id_filter_decode_err:		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+						   "invalid ID_FILTER");
 				return -1;
 			}
 			uint32_t len = mp_decode_array(&d);
-			for(uint32_t i = 0; i < len; ++i) {
+			for (uint32_t i = 0; i < len; ++i) {
 				if (mp_typeof(*d) != MP_UINT)
-					goto decode_err;
-				*id_filter |= 1 << mp_decode_uint(&d);
+					goto id_filter_decode_err;
+				uint64_t val = mp_decode_uint(&d);
+				if (val >= VCLOCK_MAX)
+					goto id_filter_decode_err;
+				*id_filter |= 1 << val;
 			}
 			break;
 		default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 8e5716b30..2a0a9c852 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -48,7 +48,7 @@ enum {
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
 	XROW_HEADER_LEN_MAX = 52,
-	XROW_BODY_LEN_MAX = 128,
+	XROW_BODY_LEN_MAX = 256,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
 	IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
@@ -323,7 +323,7 @@ xrow_encode_register(struct xrow_header *row,
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
  * @param id_filter A List of replica ids to skip rows from
- *                      when feeding a replica.
+ *		    when feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_filter);
+		      uint32_t id_filter);
 
 /**
  * Decode SUBSCRIBE command.
@@ -344,7 +344,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
  * @param[out] id_filter A list of ids to skip rows from when
- *             feeding replica.
+ *			 feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory or format error.
@@ -353,7 +353,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_filter);
+		      uint32_t *id_filter);
 
 /**
  * Encode JOIN command.
@@ -827,7 +827,7 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
 			 const struct tt_uuid *replicaset_uuid,
 			 const struct tt_uuid *instance_uuid,
 			 const struct vclock *vclock, bool anon,
-			 unsigned int id_filter)
+			 uint32_t id_filter)
 {
 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, anon, id_filter) != 0)
@@ -840,7 +840,7 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
 			 struct tt_uuid *replicaset_uuid,
 			 struct tt_uuid *instance_uuid, struct vclock *vclock,
 			 uint32_t *replica_version_id, bool *anon,
-			 unsigned int *id_filter)
+			 uint32_t *id_filter)
 {
 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, replica_version_id, anon,

--
Serge Petrenko
sergepetrenko at tarantool.org

