[Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
Serge Petrenko
sergepetrenko at tarantool.org
Thu Feb 27 16:15:54 MSK 2020
> On 27 Feb 2020, at 02:54, Vladislav Shpilevoy <v.shpilevoy at tarantool.org> wrote:
>
> Thanks for the patch!
>
> See 8 comments below.
Hi! Thanks for the review!
Please find my comments inline and the diff below.
>
>> replication: implement an instance id filter for relay
>>
>> Add a filter for relay to skip rows coming from unwanted instances.
>> A list of ids of instances whose rows the replica doesn't want to fetch is encoded
>> together with the SUBSCRIBE request, after a newly introduced key IPROTO_ID_FILTER.
>>
>> Filtering rows is needed to prevent an instance from fetching its own
>> rows from a remote master, which is useful on initial configuration and
>> harmful on resubscribe.
>>
>> Prerequisite #4739, #3294
>>
>> @TarantoolBot document
>>
>> Title: document new binary protocol key and subscribe request changes
>>
>> Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
>> This is an optional key used in the SUBSCRIBE request, followed by an array
>> of ids of instances whose rows won't be relayed to the replica.
>>
>> SUBSCRIBE request is supplemented with an optional field of the
>> following structure:
>> ```
>> +====================+
>> |     ID_FILTER      |
>> |   0x51 : ID LIST   |
>> | MP_INT : MP_ARRAY  |
>> |                    |
>> +====================+
>> ```
>> The field is encoded only when the id list is not empty.
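A minimal sketch of how the bytes of this field could be produced from a 32-bit bitmask, assuming bit N set means "skip rows of replica id N". The helpers `encode_fixint()`, `encode_array_header()` and `encode_id_filter()` are hypothetical stand-ins, not the msgpuck API the patch uses; they rely on the GCC/Clang builtins `__builtin_popcount()` and `__builtin_ctz()` and walk the set bits with `mask &= mask - 1` instead of Tarantool's `bit_iterator`.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: positive fixint, one byte for 0..0x7f. */
static uint8_t *
encode_fixint(uint8_t *data, uint8_t value)
{
	assert(value <= 0x7f);
	*data++ = value;
	return data;
}

/* Hypothetical helper: fixarray or array 16 header. */
static uint8_t *
encode_array_header(uint8_t *data, uint32_t size)
{
	assert(size <= UINT16_MAX);
	if (size <= 15) {
		*data++ = (uint8_t)(0x90 | size);
	} else {
		*data++ = 0xdc;
		/* array 16 stores its length as a big-endian uint16. */
		*data++ = (uint8_t)(size >> 8);
		*data++ = (uint8_t)(size & 0xff);
	}
	return data;
}

/*
 * Append the ID_FILTER key (0x51) and the array of filtered ids.
 * Nothing is written for an empty filter.
 */
static uint8_t *
encode_id_filter(uint8_t *data, uint32_t id_filter)
{
	if (id_filter == 0)
		return data;
	data = encode_fixint(data, 0x51);
	data = encode_array_header(data,
				   (uint32_t)__builtin_popcount(id_filter));
	/* Visit set bits from the lowest id up, clearing each one. */
	for (uint32_t mask = id_filter; mask != 0; mask &= mask - 1)
		data = encode_fixint(data, (uint8_t)__builtin_ctz(mask));
	return data;
}
```

For ids {1, 3} the mask is 0x0a and the output is `51 92 01 03`: the key, a two-element fixarray and the two ids. An empty mask writes nothing, matching the rule that the key is omitted when the list is empty.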
>>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index b89632273..dc89b90e2 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -109,6 +109,7 @@ struct relay {
>> struct vclock recv_vclock;
>> /** Replicatoin slave version. */
>> uint32_t version_id;
>> + unsigned int id_filter;
>
> 1. Please add a comment here explaining what it is
> and why it is needed.
Ok.
>
>> /**
>> * Local vclock at the moment of subscribe, used to check
>> * dataset on the other side and send missing data rows if any.
>> @@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>> packet->group_id = GROUP_DEFAULT;
>> packet->bodycnt = 0;
>> }
>> + /* Check if the rows from the instance are filtered. */
>> + if (1 << packet->replica_id & relay->id_filter)
>
> 2. Please use an explicit != 0 here and below, when checking the 'filter_size'
> variable.
No problem.
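Since `<<` binds tighter than `&` in C and C++, `1 << packet->replica_id & relay->id_filter` already parses as `(1 << id) & filter`, so the requested `!= 0` only makes the test explicit. A sketch of the same check as a standalone predicate; `id_is_filtered()` and `SKETCH_VCLOCK_MAX` are hypothetical names, and the constant is assumed to equal `VCLOCK_MAX` (32).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed to equal VCLOCK_MAX from the patch. */
enum { SKETCH_VCLOCK_MAX = 32 };

/*
 * True if rows of replica_id must not be relayed. The parentheses
 * spell out the precedence; UINT32_C(1) keeps the shift unsigned,
 * so id 31 does not shift into the sign bit of a signed int.
 */
static bool
id_is_filtered(uint32_t id_filter, uint32_t replica_id)
{
	assert(replica_id < SKETCH_VCLOCK_MAX);
	return ((UINT32_C(1) << replica_id) & id_filter) != 0;
}
```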
>
>> + return;
>> /*
>> * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>> * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
>> diff --git a/src/box/xrow.c b/src/box/xrow.c
>> index 968c3a202..10edbf6a8 100644
>> --- a/src/box/xrow.c
>> +++ b/src/box/xrow.c
>> @@ -1194,17 +1194,23 @@ int
>> xrow_encode_subscribe(struct xrow_header *row,
>> const struct tt_uuid *replicaset_uuid,
>> const struct tt_uuid *instance_uuid,
>> - const struct vclock *vclock, bool anon)
>> + const struct vclock *vclock, bool anon,
>> + unsigned int id_filter)
>
> 3. Nit: it would be better to make it uint32_t explicitly,
> because the max id count is 32. Formally speaking, unsigned int
> has no size guarantees: it is at least 16 bits, nothing
> more.
Done, and I’ll send a followup to vclock.h later, as Kostja suggests.
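A sketch of how the width requirement behind this nit could be pinned down at compile time, assuming `VCLOCK_MAX` is 32 as stated in the review. `id_filter_t`, `id_filter_add()` and `SKETCH_VCLOCK_MAX` are hypothetical; `static_assert` is the C11 macro from `<assert.h>`.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Assumed to equal VCLOCK_MAX (32) from vclock.h. */
enum { SKETCH_VCLOCK_MAX = 32 };

/* Hypothetical name for the filter type: one bit per replica id. */
typedef uint32_t id_filter_t;

/* Fails to compile if the mask cannot hold a bit for every id. */
static_assert(sizeof(id_filter_t) * CHAR_BIT >= SKETCH_VCLOCK_MAX,
	      "id filter needs a bit for every replica id");

/* Mark rows of replica `id` as filtered. */
static id_filter_t
id_filter_add(id_filter_t filter, uint32_t id)
{
	assert(id < SKETCH_VCLOCK_MAX);
	return filter | ((id_filter_t)1 << id);
}
```

With `unsigned int` in place of `uint32_t`, the same assertion could fail on a platform where `int` is 16 bits, which is the point of the nit.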
>
>> {
>> memset(row, 0, sizeof(*row));
>> size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
>> + unsigned int filter_size = __builtin_popcount(id_filter);
>> + if (filter_size) {
>> + size += mp_sizeof_array(filter_size) + filter_size *
>> + mp_sizeof_uint(VCLOCK_MAX);
>
> 4. You didn't add mp_sizeof_uint(IPROTO_ID_FILTER) here, and you
> didn't update XROW_BODY_LEN_MAX either. Does it fit into the current
> value? Also, it seems it can't be called XROW_BODY_LEN_MAX anymore,
> because it is clearly not enough to fit the filter array, if you
> needed to patch the allocation here.
Then let’s just double XROW_BODY_LEN_MAX.
>
> You could also update XROW_BODY_LEN_MAX so that it includes the biggest
> possible filter. Max filter size is just 34 bytes. This is the
> simplest option, I think.
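Working the numbers out from the MsgPack size rules: the key 0x51 fits in a one-byte positive fixint, a 32-element array needs a 3-byte array 16 header (fixarray stops at 15 elements), and every id below 32 is a one-byte fixint. This sketch mirrors the rules that `mp_sizeof_uint()` and `mp_sizeof_array()` follow; `sizeof_uint()`, `sizeof_array()` and `id_filter_size_max()` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Size of a MsgPack unsigned integer, by the format's rules. */
static size_t
sizeof_uint(uint64_t v)
{
	if (v <= 0x7f)
		return 1;	/* positive fixint */
	if (v <= UINT8_MAX)
		return 2;	/* uint 8 */
	if (v <= UINT16_MAX)
		return 3;	/* uint 16 */
	if (v <= UINT32_MAX)
		return 5;	/* uint 32 */
	return 9;		/* uint 64 */
}

/* Size of a MsgPack array header. */
static size_t
sizeof_array(uint32_t n)
{
	if (n <= 15)
		return 1;	/* fixarray */
	if (n <= UINT16_MAX)
		return 3;	/* array 16 */
	return 5;		/* array 32 */
}

/* Worst case for the ID_FILTER key plus `count` ids, each below 32. */
static size_t
id_filter_size_max(uint32_t count)
{
	assert(count <= 32);
	return sizeof_uint(0x51) + sizeof_array(count) +
	       count * sizeof_uint(31);
}
```

By this count a full filter costs 36 bytes including the key (35 without it), slightly above the 34-byte estimate, but well within the extra 128 bytes that doubling XROW_BODY_LEN_MAX from 128 to 256 provides.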
>
>> + }
>> char *buf = (char *) region_alloc(&fiber()->gc, size);
>> if (buf == NULL) {
>> diag_set(OutOfMemory, size, "region_alloc", "buf");
>> return -1;
>> }
>> char *data = buf;
>> - data = mp_encode_map(data, 5);
>> + data = mp_encode_map(data, filter_size ? 6 : 5);
>> data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
>> data = xrow_encode_uuid(data, replicaset_uuid);
>> data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
>> @@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
>> data = mp_encode_uint(data, tarantool_version_id());
>> data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
>> data = mp_encode_bool(data, anon);
>> + if (filter_size) {
>
> 5. I wouldn't make it optional in encoding: this request is sent
> really rarely, and always encoding the key could simplify the code
> a bit. However, it's up to you.
I don’t like the idea of passing zero-length arrays when there is no filter.
Let’s just not encode this part. The filter is initially empty on the decoding
side anyway.
>
> Also, won't it break replication from an old version? You
> will send SUBSCRIBE with this new key, and the old instance
> should ignore it. Does it? I don't remember.
>
Yes, the old instance ignores all unknown keys.
> If the old instance ignores it, it means that the bug
> can still explode when replicating from an old version, right?
> I don't know how to fix that, but if it is true, we need to
> document it, at least.
Yes, the bug’s still here when replicating from an old instance.
To trigger it, you have to set up master-master replication between
the old and the new instances and meet some additional conditions.
As Kostja has pointed out, setting up master-master replication during
an upgrade is unusual.
>
>> + data = mp_encode_uint(data, IPROTO_ID_FILTER);
>> + data = mp_encode_array(data, filter_size);
>> + struct bit_iterator it;
>> + bit_iterator_init(&it, &id_filter, sizeof(id_filter),
>> + true);
>> + for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
>> + id = bit_iterator_next(&it)) {
>> + data = mp_encode_uint(data, id);
>> + }
>> + }
>> assert(data <= buf + size);
>> row->body[0].iov_base = buf;
>> row->body[0].iov_len = (data - buf);
>> @@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>> }
>> *anon = mp_decode_bool(&d);
>> break;
>> + case IPROTO_ID_FILTER:
>> + if (id_filter == NULL)
>> + goto skip;
>> + if (mp_typeof(*d) != MP_ARRAY) {
>> +decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
>> + "invalid id_filter");
>
> 6. Let's put it in caps: ID_FILTER, just like in other decode
> error messages. Also, I would name this label 'id_filter_decode_err',
> because other keys can't jump here.
Okay.
>
>> + return -1;
>> + }
>> + uint32_t len = mp_decode_array(&d);
>> + for(uint32_t i = 0; i < len; ++i) {
>> + if (mp_typeof(*d) != MP_UINT)
>> + goto decode_err;
>> + *id_filter |= 1 << mp_decode_uint(&d);
>
> 7. If someone sent a big ID (a program pretending to be a Tarantool
> instance), it would cause an unsigned bit shift overflow, which is undefined
> behaviour. Let's check that it is not bigger than 31.
>
> However, this won't really help much: from what I see, this code will
> crash even if I just send a truncated packet.
I’m not sure I understand what you mean. This piece of code is
similar to the one we have in mp_decode_vclock. At least, the situation
didn’t get worse.
>
> Up to you whether you want to fix the bit shift.
Fixed.
>
>> + }
>> + break;
>> default: skip:
>> mp_next(&d); /* value */
>> }
>> diff --git a/src/box/xrow.h b/src/box/xrow.h
>> index 0973c497d..8e5716b30 100644
>> --- a/src/box/xrow.h
>> +++ b/src/box/xrow.h
>> @@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
>> * @param instance_uuid Instance uuid.
>> * @param vclock Replication clock.
>> * @param anon Whether it is an anonymous subscribe request or not.
>> + * @param id_filter A List of replica ids to skip rows from
>> + * when feeding a replica.
>
> 8. This looks like a huge indentation. Keep it if you want; I pointed
> it out just in case it was an accident. In the next @param description
> you aligned it differently.
Thanks! I fixed both pieces.
>
>> *
>> * @retval 0 Success.
>> * @retval -1 Memory error.
>> @@ -330,7 +332,8 @@ int
>> @@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
>> * @param[out] vclock.
>> * @param[out] version_id.
>> * @param[out] anon Whether it is an anonymous subscribe.
>> + * @param[out] id_filter A list of ids to skip rows from when
>> + * feeding replica.
>> *
>> * @retval 0 Success.
>> * @retval -1 Memory or format error.
diff --git a/src/box/box.cc b/src/box/box.cc
index 94267c74e..09dd67ab4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,7 +1787,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
uint32_t replica_version_id;
vclock_create(&replica_clock);
bool anon;
- unsigned int id_filter;
+ uint32_t id_filter;
xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
&replica_version_id, &anon, &id_filter);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index dc89b90e2..95245a3cf 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,13 @@ struct relay {
struct vclock recv_vclock;
/** Replicatoin slave version. */
uint32_t version_id;
- unsigned int id_filter;
+ /**
+ * A filter of replica ids whose rows should be ignored.
+ * Each set filter bit corresponds to a replica id whose
+ * rows shouldn't be relayed. The list of ids to ignore
+ * is passed by the replica on subscribe.
+ */
+ uint32_t id_filter;
/**
* Local vclock at the moment of subscribe, used to check
* dataset on the other side and send missing data rows if any.
@@ -678,7 +684,7 @@ relay_subscribe_f(va_list ap)
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
struct vclock *replica_clock, uint32_t replica_version_id,
- unsigned int replica_id_filter)
+ uint32_t replica_id_filter)
{
assert(replica->anon || replica->id != REPLICA_ID_NIL);
struct relay *relay = replica->relay;
@@ -768,7 +774,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
packet->bodycnt = 0;
}
/* Check if the rows from the instance are filtered. */
- if (1 << packet->replica_id & relay->id_filter)
+ if ((1 << packet->replica_id & relay->id_filter) != 0)
return;
/*
* We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 6e7eebab1..0632fa912 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
struct vclock *replica_vclock, uint32_t replica_version_id,
- unsigned int replica_id_filter);
+ uint32_t replica_id_filter);
#endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 10edbf6a8..602049004 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,22 +1195,18 @@ xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
const struct vclock *vclock, bool anon,
- unsigned int id_filter)
+ uint32_t id_filter)
{
memset(row, 0, sizeof(*row));
size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
- unsigned int filter_size = __builtin_popcount(id_filter);
- if (filter_size) {
- size += mp_sizeof_array(filter_size) + filter_size *
- mp_sizeof_uint(VCLOCK_MAX);
- }
char *buf = (char *) region_alloc(&fiber()->gc, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "region_alloc", "buf");
return -1;
}
char *data = buf;
- data = mp_encode_map(data, filter_size ? 6 : 5);
+ int filter_size = __builtin_popcount(id_filter);
+ data = mp_encode_map(data, filter_size != 0 ? 6 : 5);
data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
data = xrow_encode_uuid(data, replicaset_uuid);
data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,7 +1217,7 @@ xrow_encode_subscribe(struct xrow_header *row,
data = mp_encode_uint(data, tarantool_version_id());
data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
data = mp_encode_bool(data, anon);
- if (filter_size) {
+ if (filter_size != 0) {
data = mp_encode_uint(data, IPROTO_ID_FILTER);
data = mp_encode_array(data, filter_size);
struct bit_iterator it;
@@ -1244,7 +1240,7 @@ int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *version_id, bool *anon,
- unsigned int *id_filter)
+ uint32_t *id_filter)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1325,15 +1321,18 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
if (id_filter == NULL)
goto skip;
if (mp_typeof(*d) != MP_ARRAY) {
-decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
- "invalid id_filter");
+id_filter_decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+ "invalid ID_FILTER");
return -1;
}
uint32_t len = mp_decode_array(&d);
- for(uint32_t i = 0; i < len; ++i) {
+ for (uint32_t i = 0; i < len; ++i) {
if (mp_typeof(*d) != MP_UINT)
- goto decode_err;
- *id_filter |= 1 << mp_decode_uint(&d);
+ goto id_filter_decode_err;
+ uint64_t val = mp_decode_uint(&d);
+ if (val >= VCLOCK_MAX)
+ goto id_filter_decode_err;
+ *id_filter |= 1 << val;
}
break;
default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 8e5716b30..2a0a9c852 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -48,7 +48,7 @@ enum {
XROW_BODY_IOVMAX = 2,
XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
XROW_HEADER_LEN_MAX = 52,
- XROW_BODY_LEN_MAX = 128,
+ XROW_BODY_LEN_MAX = 256,
IPROTO_HEADER_LEN = 28,
/** 7 = sizeof(iproto_body_bin). */
IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
@@ -323,7 +323,7 @@ xrow_encode_register(struct xrow_header *row,
* @param vclock Replication clock.
* @param anon Whether it is an anonymous subscribe request or not.
* @param id_filter A List of replica ids to skip rows from
- * when feeding a replica.
+ * when feeding a replica.
*
* @retval 0 Success.
* @retval -1 Memory error.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
const struct vclock *vclock, bool anon,
- unsigned int id_filter);
+ uint32_t id_filter);
/**
* Decode SUBSCRIBE command.
@@ -344,7 +344,7 @@ xrow_encode_subscribe(struct xrow_header *row,
* @param[out] version_id.
* @param[out] anon Whether it is an anonymous subscribe.
* @param[out] id_filter A list of ids to skip rows from when
- * feeding replica.
+ * feeding a replica.
*
* @retval 0 Success.
* @retval -1 Memory or format error.
@@ -353,7 +353,7 @@ int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *version_id, bool *anon,
- unsigned int *id_filter);
+ uint32_t *id_filter);
/**
* Encode JOIN command.
@@ -827,7 +827,7 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
const struct vclock *vclock, bool anon,
- unsigned int id_filter)
+ uint32_t id_filter)
{
if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
vclock, anon, id_filter) != 0)
@@ -840,7 +840,7 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
uint32_t *replica_version_id, bool *anon,
- unsigned int *id_filter)
+ uint32_t *id_filter)
{
if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
vclock, replica_version_id, anon,
--
Serge Petrenko
sergepetrenko at tarantool.org