[Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Feb 27 02:54:48 MSK 2020

Thanks for the patch!
See 8 comments below.
>     replication: implement an instance id filter for relay
>     
>     Add a filter for relay to skip rows coming from unwanted instances.
>     A list of instance ids whose rows replica doesn't want to fetch is encoded
>     together with SUBSCRIBE request after a freshly introduced flag IPROTO_ID_FILTER.
>     
>     Filtering rows is needed to prevent an instance from fetching its own
>     rows from a remote master, which is useful on initial configuration and
>     harmful on resubscribe.
>     
>     Prerequisite #4739, #3294
>     
>     @TarantoolBot document
>     
>     Title: document new binary protocol key and subscribe request changes
>     
>     Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
>     This is an optional key used in SUBSCRIBE request followed by an array
>     of ids of instances whose rows won't be relayed to the replica.
>     
>     SUBSCRIBE request is supplemented with an optional field of the
>     following structure:
>     ```
>     +====================+
>     |      ID_FILTER     |
>     |   0x51 : ID LIST   |
>     | MP_INT : MP_ARRRAY |
>     |                    |
>     +====================+
>     ```
>     The field is encoded only when the id list is not empty.
> 
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index b89632273..dc89b90e2 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -109,6 +109,7 @@ struct relay {
>  	struct vclock recv_vclock;
>  	/** Replicatoin slave version. */
>  	uint32_t version_id;
> +	unsigned int id_filter;
1. Please add a comment here explaining what it is
and why it is needed.
>  	/**
>  	 * Local vclock at the moment of subscribe, used to check
>  	 * dataset on the other side and send missing data rows if any.
> @@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  		packet->group_id = GROUP_DEFAULT;
>  		packet->bodycnt = 0;
>  	}
> +	/* Check if the rows from the instance are filtered. */
> +	if (1 << packet->replica_id & relay->id_filter)
2. Please use an explicit != 0 here, and below when checking the
'filter_size' variable.
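Roughly what I mean, as a sketch only. The helper name id_filter_has
is made up for illustration and is not in the patch, and I assume the
filter is a 32-bit mask. The original expression parses correctly,
since << binds tighter than &, but the parentheses and the != 0 make
that obvious to the reader:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper, not part of the patch: returns true when rows
 * originating from replica_id must be skipped by the relay.
 */
static inline bool
id_filter_has(uint32_t id_filter, uint32_t replica_id)
{
	/* A shift by 32 or more bits would be undefined behaviour. */
	assert(replica_id < 32);
	return (id_filter & (UINT32_C(1) << replica_id)) != 0;
}
```

In relay_send_row() the check would then read as
'if (id_filter_has(relay->id_filter, packet->replica_id)) return;'.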
> +		return;
>  	/*
>  	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>  	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index 968c3a202..10edbf6a8 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -1194,17 +1194,23 @@ int
>  xrow_encode_subscribe(struct xrow_header *row,
>  		      const struct tt_uuid *replicaset_uuid,
>  		      const struct tt_uuid *instance_uuid,
> -		      const struct vclock *vclock, bool anon)
> +		      const struct vclock *vclock, bool anon,
> +		      unsigned int id_filter)
3. Nit - it would be better to have it as uint32_t explicitly,
because the max id count is 32. Formally speaking, unsigned int
has no exact size guarantee: the standard only promises at least
16 bits.
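A possible way to pin this down, as a sketch under my own assumptions:
VCLOCK_MAX is redefined locally as 32 only to keep the snippet
self-contained, id_filter_count is a made-up name, and the portable
loop stands in for __builtin_popcount, which takes an unsigned int:

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Assumed value, defined here only so that the sketch compiles alone. */
enum { VCLOCK_MAX = 32 };

/* Fail the build if the mask type cannot hold one bit per replica id. */
static_assert(sizeof(uint32_t) * CHAR_BIT >= VCLOCK_MAX,
	      "id filter must have a bit for every replica id");

/* Hypothetical helper: number of ids set in the filter. */
static inline unsigned
id_filter_count(uint32_t id_filter)
{
	unsigned count = 0;
	/* Each iteration clears the lowest set bit. */
	for (; id_filter != 0; id_filter &= id_filter - 1)
		count++;
	return count;
}
```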
>  {
>  	memset(row, 0, sizeof(*row));
>  	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
> +	unsigned int filter_size = __builtin_popcount(id_filter);
> +	if (filter_size) {
> +		size += mp_sizeof_array(filter_size) + filter_size *
> +			mp_sizeof_uint(VCLOCK_MAX);
4. You didn't add mp_sizeof_uint(IPROTO_ID_FILTER) here, and you
didn't update XROW_BODY_LEN_MAX either. Does the key fit in the
current value? It also seems that the constant can't be called
XROW_BODY_LEN_MAX anymore, because it is clearly not enough to fit
the filter array if you needed to patch the allocation here.
You could instead update XROW_BODY_LEN_MAX so that it includes the
biggest possible filter. Max filter size is just 34 bytes. This is
the simplest option, I think.
> +	}
>  	char *buf = (char *) region_alloc(&fiber()->gc, size);
>  	if (buf == NULL) {
>  		diag_set(OutOfMemory, size, "region_alloc", "buf");
>  		return -1;
>  	}
>  	char *data = buf;
> -	data = mp_encode_map(data, 5);
> +	data = mp_encode_map(data, filter_size ? 6 : 5);
>  	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
>  	data = xrow_encode_uuid(data, replicaset_uuid);
>  	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
> @@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
>  	data = mp_encode_uint(data, tarantool_version_id());
>  	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
>  	data = mp_encode_bool(data, anon);
> +	if (filter_size) {
5. I wouldn't make it optional in encoding: this request is sent
really rarely, and always encoding the key could simplify the code
a bit. However, up to you.
Also, won't it break replication from an old version? You
will send subscribe with this new key, and the old instance
should ignore it. Does it? I don't remember.
If the old instance ignores it, then the bug can still show up
when replicating from an old version, right?
I don't know how to fix that, but if it is true, we need to
document that, at least.
> +		data = mp_encode_uint(data, IPROTO_ID_FILTER);
> +		data = mp_encode_array(data, filter_size);
> +		struct bit_iterator it;
> +		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
> +				  true);
> +		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
> +		     id = bit_iterator_next(&it)) {
> +			data = mp_encode_uint(data, id);
> +		}
> +	}
>  	assert(data <= buf + size);
>  	row->body[0].iov_base = buf;
>  	row->body[0].iov_len = (data - buf);
> @@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>  			}
>  			*anon = mp_decode_bool(&d);
>  			break;
> +		case IPROTO_ID_FILTER:
> +			if (id_filter == NULL)
> +				goto skip;
> +			if (mp_typeof(*d) != MP_ARRAY) {
> +decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
> +						   "invalid id_filter");
6. Let's say it in caps: ID_FILTER, just like in other decode
error messages. Also I would name this label 'id_filter_decode_err',
because other keys can't jump here.
> +				return -1;
> +			}
> +			uint32_t len = mp_decode_array(&d);
> +			for(uint32_t i = 0; i < len; ++i) {
> +				if (mp_typeof(*d) != MP_UINT)
> +					goto decode_err;
> +				*id_filter |= 1 << mp_decode_uint(&d);
7. If someone sends a big ID (a program pretending to be a Tarantool
instance), it will cause a bit shift by more than the type width, which
is undefined behaviour. Let's check that it is not bigger than 31.
However this won't really help much. From what I see, this code will
crash even if I just send a truncated packet.
Up to you whether you want to fix the bit shift.
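A sketch of the range check, under assumptions of mine: the ids are
taken from an already decoded array instead of msgpack so that the
snippet stands alone, and the names id_filter_from_ids and
ID_FILTER_BITS are hypothetical:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Number of bits in the filter mask, assumed equal to the max id count. */
enum { ID_FILTER_BITS = 32 };

/*
 * Hypothetical helper: build a filter mask from decoded ids.
 * Returns 0 on success, or -1 if any id does not fit into the mask.
 * On error *id_filter is left untouched.
 */
static int
id_filter_from_ids(const uint64_t *ids, size_t count, uint32_t *id_filter)
{
	assert(id_filter != NULL);
	uint32_t filter = 0;
	for (size_t i = 0; i < count; i++) {
		/* Reject the id before shifting, never after. */
		if (ids[i] >= ID_FILTER_BITS)
			return -1;
		filter |= UINT32_C(1) << ids[i];
	}
	*id_filter = filter;
	return 0;
}
```

In the real decoder the -1 branch would correspond to the jump to the
decode error label.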
> +			}
> +			break;
>  		default: skip:
>  			mp_next(&d); /* value */
>  		}
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 0973c497d..8e5716b30 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
>   * @param instance_uuid Instance uuid.
>   * @param vclock Replication clock.
>   * @param anon Whether it is an anonymous subscribe request or not.
> + * @param id_filter A List of replica ids to skip rows from
> + *                      when feeding a replica.
8. Looks like a huge indentation. Keep it if you want. I am pointing
it out just in case it was an accident. In the next @param description
you aligned it differently.
>   *
>   * @retval  0 Success.
>   * @retval -1 Memory error.
> @@ -330,7 +332,8 @@ int
> @@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
>   * @param[out] vclock.
>   * @param[out] version_id.
>   * @param[out] anon Whether it is an anonymous subscribe.
> + * @param[out] id_filter A list of ids to skip rows from when
> + *             feeding replica.
>   *
>   * @retval  0 Success.
>   * @retval -1 Memory or format error.
    
    