From: Serge Petrenko
To: Vladislav Shpilevoy
Cc: kirichenkoga@gmail.com, tarantool-patches@dev.tarantool.org
Date: Thu, 27 Feb 2020 16:15:54 +0300
Subject: Re: [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
In-Reply-To: <172faa01-c31c-76e6-bb45-066f44ffc73d@tarantool.org>

> On 27 Feb 2020, at 02:54, Vladislav Shpilevoy <v.shpilevoy@tarantool.org> wrote:
>
> Thanks for the patch!
>
> See 8 comments below.

Hi! Thanks for the review!
Please find my comments inline and the diff below.

>
>>     replication: implement an instance id filter for relay
>>
>>     Add a filter for relay to skip rows coming from unwanted instances.
>>     A list of instance ids whose rows the replica doesn't want to fetch is encoded
>>     together with the SUBSCRIBE request after a newly introduced key, IPROTO_ID_FILTER.
>>
>>     Filtering rows is needed to prevent an instance from fetching its own
>>     rows from a remote master: fetching them is useful on initial configuration
>>     but harmful on resubscribe.
>>
>>     Prerequisite #4739, #3294
>>
>>     @TarantoolBot document
>>
>>     Title: document new binary protocol key and subscribe request changes
>>
>>     Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
>>     This is an optional key used in the SUBSCRIBE request, followed by an array
>>     of ids of instances whose rows won't be relayed to the replica.
>>
>>     The SUBSCRIBE request is supplemented with an optional field of the
>>     following structure:
>>     ```
>>     +====================+
>>     |      ID_FILTER     |
>>     |   0x51 : ID LIST   |
>>     | MP_INT : MP_ARRAY  |
>>     |                    |
>>     +====================+
>>     ```
>>     The field is encoded only when the id list is not empty.
>>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index b89632273..dc89b90e2 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -109,6 +109,7 @@ struct relay {
>>  	struct vclock recv_vclock;
>>  	/** Replicatoin slave version. */
>>  	uint32_t version_id;
>> +	unsigned int id_filter;
>
> 1. Please add a comment here explaining what it is
> and why it is needed.

Ok.

>
>>  	/**
>>  	 * Local vclock at the moment of subscribe, used to check
>>  	 * dataset on the other side and send missing data rows if any.
>> @@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>>  		packet->group_id = GROUP_DEFAULT;
>>  		packet->bodycnt = 0;
>>  	}
>> +	/* Check if the rows from the instance are filtered. */
>> +	if (1 << packet->replica_id & relay->id_filter)
>
> 2. Please use an explicit != 0 here and below when checking the
> 'filter_size' variable.

No problem.

>
>> +		return;
>>  	/*
>>  	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>>  	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
>> diff --git a/src/box/xrow.c b/src/box/xrow.c
>> index 968c3a202..10edbf6a8 100644
>> --- a/src/box/xrow.c
>> +++ b/src/box/xrow.c
>> @@ -1194,17 +1194,23 @@ int
>>  xrow_encode_subscribe(struct xrow_header *row,
>>  		      const struct tt_uuid *replicaset_uuid,
>>  		      const struct tt_uuid *instance_uuid,
>> -		      const struct vclock *vclock, bool anon)
>> +		      const struct vclock *vclock, bool anon,
>> +		      unsigned int id_filter)
>
> 3. Nit: it would be better to have it as uint32_t explicitly,
> because the max id count is 32. Unsigned int does not have size
> guarantees, formally speaking. It is at least 16 bits, no more
> info.

Done. I'll send a follow-up to vclock.h later, as Kostja suggests.

>
>>  {
>>  	memset(row, 0, sizeof(*row));
>>  	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
>> +	unsigned int filter_size = __builtin_popcount(id_filter);
>> +	if (filter_size) {
>> +		size += mp_sizeof_array(filter_size) + filter_size *
>> +			mp_sizeof_uint(VCLOCK_MAX);
>
> 4. You didn't add mp_sizeof_uint(IPROTO_ID_FILTER) here, but you
> didn't update XROW_BODY_LEN_MAX either. Does it fit in the current
> value? Also, it seems it can't be called XROW_BODY_LEN_MAX anymore,
> because clearly it is not enough to fit the filter array if you
> needed to patch the allocation here.

Then let's just double XROW_BODY_LEN_MAX.

>
> You could also update XROW_BODY_LEN_MAX so that it includes the biggest
> possible filter. The max filter size is just 34 bytes. This is the
> simplest option, I think.
>
>> +	}
>>  	char *buf = (char *) region_alloc(&fiber()->gc, size);
>>  	if (buf == NULL) {
>>  		diag_set(OutOfMemory, size, "region_alloc", "buf");
>>  		return -1;
>>  	}
>>  	char *data = buf;
>> -	data = mp_encode_map(data, 5);
>> +	data = mp_encode_map(data, filter_size ? 6 : 5);
>>  	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
>>  	data = xrow_encode_uuid(data, replicaset_uuid);
>>  	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
>> @@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
>>  	data = mp_encode_uint(data, tarantool_version_id());
>>  	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
>>  	data = mp_encode_bool(data, anon);
>> +	if (filter_size) {
>
> 5. I wouldn't make it optional in encoding: this is sent
> really rarely, and encoding it unconditionally could simplify
> the code a bit. However, up to you.

I don't like the idea of passing zero-length arrays when there is no filter.
Let's just not encode this part. The filter is initially empty on the decoding
side anyway.

>
> Also, won't it break replication from an old version? You
> will send subscribe with this new key, and the old instance
> should ignore it. Does it? I don't remember.
>

Yes, the old instance ignores all unknown keys.

> If the old instance ignores it, it means that the bug
> can still explode when replicating from an old version, right?
> I don't know how to fix that, but if it is true, we need to
> document it, at least.

Yes, the bug is still there when replicating from an old instance.
To trigger it, you have to set up master-master replication between
the old and the new instances and meet some additional conditions.
I don't think it's usual to set up master-master when upgrading, as
Kostja has pointed out.
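To make the sizing discussion in comments 4 and 5 concrete, here is a minimal, self-contained sketch in plain C (it does not use msgpuck) that encodes only the ID_FILTER key/value pair from a uint32_t mask, walking the set bits in ascending order the way the patch's bit_iterator loop does. The names `filter_count`, `encode_id_filter` and the `SKETCH_*` constants are hypothetical; only the key value 0x51 and the limit of 32 ids come from the patch. It assumes every id is below 32, so the key and each id fit into a one-byte MsgPack positive fixint, while the array header is a one-byte fixarray for up to 15 elements or a three-byte array16 above that.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Values from the patch: IPROTO_ID_FILTER = 0x51, at most 32 ids. */
enum { SKETCH_IPROTO_ID_FILTER = 0x51, SKETCH_VCLOCK_MAX = 32 };

/* Number of set bits, i.e. how many ids the filter holds. */
static unsigned
filter_count(uint32_t mask)
{
	unsigned n = 0;
	for (; mask != 0; mask &= mask - 1)
		n++;
	return n;
}

/*
 * Hypothetical helper: write "0x51: [id, ...]" as MsgPack into buf
 * (capacity cap) and return the number of bytes written. An empty
 * mask writes nothing, mirroring "encode only when not empty".
 */
static size_t
encode_id_filter(uint32_t mask, unsigned char *buf, size_t cap)
{
	unsigned n = filter_count(mask);
	unsigned char *p = buf;
	if (n == 0)
		return 0;
	/* Conservative bound: key + largest array header + 1 byte per id. */
	assert(cap >= 1 + 3 + (size_t)n);
	*p++ = SKETCH_IPROTO_ID_FILTER;		/* key: positive fixint */
	if (n <= 15) {
		*p++ = (unsigned char)(0x90 | n);	/* fixarray */
	} else {
		*p++ = 0xdc;			/* array16, big-endian length */
		*p++ = (unsigned char)(n >> 8);
		*p++ = (unsigned char)(n & 0xff);
	}
	for (unsigned id = 0; id < SKETCH_VCLOCK_MAX; id++) {
		if ((mask & (UINT32_C(1) << id)) != 0)
			*p++ = (unsigned char)id;	/* id < 32: positive fixint */
	}
	return (size_t)(p - buf);
}
```

For a mask with bits 1 and 3 set this yields the four bytes 51 92 01 03; with all 32 bits set it yields 36 bytes (1-byte key, 3-byte array16 header, 32 one-byte ids), which is small next to the 128 bytes gained by doubling XROW_BODY_LEN_MAX from 128 to 256. The map header grows from 5 to 6 entries only when this field is actually written.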
>
>> +		data = mp_encode_uint(data, IPROTO_ID_FILTER);
>> +		data = mp_encode_array(data, filter_size);
>> +		struct bit_iterator it;
>> +		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
>> +				  true);
>> +		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
>> +		     id = bit_iterator_next(&it)) {
>> +			data = mp_encode_uint(data, id);
>> +		}
>> +	}
>>  	assert(data <= buf + size);
>>  	row->body[0].iov_base = buf;
>>  	row->body[0].iov_len = (data - buf);
>> @@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>>  			}
>>  			*anon = mp_decode_bool(&d);
>>  			break;
>> +		case IPROTO_ID_FILTER:
>> +			if (id_filter == NULL)
>> +				goto skip;
>> +			if (mp_typeof(*d) != MP_ARRAY) {
>> +decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
>> +						   "invalid id_filter");
>
> 6. Let's say it in caps: ID_FILTER, just like in other decode
> error messages. Also, I would name this label 'id_filter_decode_err',
> because other keys can't jump here.

Okay.

>
>> +				return -1;
>> +			}
>> +			uint32_t len = mp_decode_array(&d);
>> +			for(uint32_t i = 0; i < len; ++i) {
>> +				if (mp_typeof(*d) != MP_UINT)
>> +					goto decode_err;
>> +				*id_filter |= 1 << mp_decode_uint(&d);
>
> 7. If someone sent a big ID (a program pretending to be a Tarantool
> instance), it would cause unsigned bit shift overflow, which is undefined
> behaviour. Let's check that it is not bigger than 31.
>
> However, this won't really help much. This code will crash even if I
> just send a truncated packet, from what I see.

I'm not sure I understand what you're talking about. This piece of code is
similar to the one we have in mp_decode_vclock. The situation didn't get worse,
at least.

>
> Up to you whether you want to fix the bit shift.

Fixed.
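The decoding side of the fix for comment 7 can be sketched the same way. The snippet below is a hypothetical, library-free stand-in for the ID_FILTER branch of xrow_decode_subscribe(): it takes ids that have already been read from the array (the MsgPack parsing itself is left out), rejects any id that does not fit into a 32-bit mask, and adds the relay-side membership check performed in relay_send_row(). The helper names are assumptions made for illustration. One detail deliberately differs from the patch: the sketch shifts an unsigned one (`UINT32_C(1) << id`), because in C shifting a signed `int` 1 into bit 31, as `1 << val` does for id 31, is undefined behaviour.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { SKETCH_VCLOCK_MAX = 32 };

/*
 * Fold already-decoded ids into a bitmask. Returns 0 on success or
 * -1 if some id is too big for a 32-bit mask; *mask is written only
 * on success.
 */
static int
id_filter_from_ids(const uint64_t *ids, size_t count, uint32_t *mask)
{
	uint32_t result = 0;
	for (size_t i = 0; i < count; i++) {
		if (ids[i] >= SKETCH_VCLOCK_MAX)
			return -1;
		/* Unsigned 1: 1 << 31 on a signed int is undefined in C. */
		result |= UINT32_C(1) << ids[i];
	}
	*mask = result;
	return 0;
}

/* Relay side: true if rows originating from replica_id are filtered out. */
static bool
id_filter_skips(uint32_t mask, uint32_t replica_id)
{
	assert(replica_id < SKETCH_VCLOCK_MAX);
	return (mask & (UINT32_C(1) << replica_id)) != 0;
}
```

The patch ORs each id into *id_filter as it goes; the sketch instead builds the mask locally and stores it only after the whole list validates, so a rejected request leaves the caller's value untouched.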
>
>> +			}
>> +			break;
>>  		default: skip:
>>  			mp_next(&d); /* value */
>>  		}
>> diff --git a/src/box/xrow.h b/src/box/xrow.h
>> index 0973c497d..8e5716b30 100644
>> --- a/src/box/xrow.h
>> +++ b/src/box/xrow.h
>> @@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
>>   * @param instance_uuid Instance uuid.
>>   * @param vclock Replication clock.
>>   * @param anon Whether it is an anonymous subscribe request or not.
>> + * @param id_filter A List of replica ids to skip rows from
>> + *                      when feeding a replica.
>
> 8. Looks like a huge indentation. Keep it if you want. I noticed
> it just in case it was an accident. In the next @param description
> you aligned it differently.

Thanks! I fixed both pieces.

>
>>   *
>>   * @retval  0 Success.
>>   * @retval -1 Memory error.
>> @@ -330,7 +332,8 @@ int
>> @@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
>>   * @param[out] vclock.
>>   * @param[out] version_id.
>>   * @param[out] anon Whether it is an anonymous subscribe.
>> + * @param[out] id_filter A list of ids to skip rows from when
>> + *             feeding replica.
>>   *
>>   * @retval  0 Success.
>>   * @retval -1 Memory or format error.

diff --git a/src/box/box.cc b/src/box/box.cc
index 94267c74e..09dd67ab4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,7 +1787,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	uint32_t replica_version_id;
 	vclock_create(&replica_clock);
 	bool anon;
-	unsigned int id_filter;
+	uint32_t id_filter;
 	xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
 				 &replica_version_id, &anon, &id_filter);
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index dc89b90e2..95245a3cf 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,13 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
-	unsigned int id_filter;
+	/**
+	 * A filter of replica ids whose rows should be ignored.
+	 * Each set filter bit corresponds to a replica id whose
+	 * rows shouldn't be relayed. The list of ids to ignore
+	 * is passed by the replica on subscribe.
+	 */
+	uint32_t id_filter;
 	/**
 	 * Local vclock at the moment of subscribe, used to check
 	 * dataset on the other side and send missing data rows if any.
@@ -678,7 +684,7 @@ relay_subscribe_f(va_list ap)
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_clock, uint32_t replica_version_id,
-		unsigned int replica_id_filter)
+		uint32_t replica_id_filter)
 {
 	assert(replica->anon || replica->id != REPLICA_ID_NIL);
 	struct relay *relay = replica->relay;
@@ -768,7 +774,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->bodycnt = 0;
 	}
 	/* Check if the rows from the instance are filtered. */
-	if (1 << packet->replica_id & relay->id_filter)
+	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 6e7eebab1..0632fa912 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
-		unsigned int replica_id_filter);
+		uint32_t replica_id_filter);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 10edbf6a8..602049004 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,22 +1195,18 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_filter)
+		      uint32_t id_filter)
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
-	unsigned int filter_size = __builtin_popcount(id_filter);
-	if (filter_size) {
-		size += mp_sizeof_array(filter_size) + filter_size *
-			mp_sizeof_uint(VCLOCK_MAX);
-	}
 	char *buf = (char *) region_alloc(&fiber()->gc, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
 		return -1;
 	}
 	char *data = buf;
-	data = mp_encode_map(data, filter_size ? 6 : 5);
+	int filter_size = __builtin_popcount(id_filter);
+	data = mp_encode_map(data, filter_size != 0 ? 6 : 5);
 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
 	data = xrow_encode_uuid(data, replicaset_uuid);
 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,7 +1217,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 	data = mp_encode_uint(data, tarantool_version_id());
 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
 	data = mp_encode_bool(data, anon);
-	if (filter_size) {
+	if (filter_size != 0) {
 		data = mp_encode_uint(data, IPROTO_ID_FILTER);
 		data = mp_encode_array(data, filter_size);
 		struct bit_iterator it;
@@ -1244,7 +1240,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_filter)
+		      uint32_t *id_filter)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1325,15 +1321,18 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 			if (id_filter == NULL)
 				goto skip;
 			if (mp_typeof(*d) != MP_ARRAY) {
-decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
-						   "invalid id_filter");
+id_filter_decode_err:		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+						   "invalid ID_FILTER");
 				return -1;
 			}
 			uint32_t len = mp_decode_array(&d);
-			for(uint32_t i = 0; i < len; ++i) {
+			for (uint32_t i = 0; i < len; ++i) {
 				if (mp_typeof(*d) != MP_UINT)
-					goto decode_err;
-				*id_filter |= 1 << mp_decode_uint(&d);
+					goto id_filter_decode_err;
+				uint64_t val = mp_decode_uint(&d);
+				if (val >= VCLOCK_MAX)
+					goto id_filter_decode_err;
+				*id_filter |= 1 << val;
 			}
 			break;
 		default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 8e5716b30..2a0a9c852 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -48,7 +48,7 @@ enum {
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
 	XROW_HEADER_LEN_MAX = 52,
-	XROW_BODY_LEN_MAX = 128,
+	XROW_BODY_LEN_MAX = 256,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
 	IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
@@ -323,7 +323,7 @@ xrow_encode_register(struct xrow_header *row,
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
  * @param id_filter A List of replica ids to skip rows from
- *                      when feeding a replica.
+ *		    when feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_filter);
+		      uint32_t id_filter);
 
 /**
  * Decode SUBSCRIBE command.
@@ -344,7 +344,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
  * @param[out] id_filter A list of ids to skip rows from when
- *             feeding replica.
+ *		   feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory or format error.
@@ -353,7 +353,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_filter);
+		      uint32_t *id_filter);
 
 /**
  * Encode JOIN command.
@@ -827,7 +827,7 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
 			 const struct tt_uuid *replicaset_uuid,
 			 const struct tt_uuid *instance_uuid,
 			 const struct vclock *vclock, bool anon,
-			 unsigned int id_filter)
+			 uint32_t id_filter)
 {
 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, anon, id_filter) != 0)
@@ -840,7 +840,7 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
 			 struct tt_uuid *replicaset_uuid,
 			 struct tt_uuid *instance_uuid, struct vclock *vclock,
 			 uint32_t *replica_version_id, bool *anon,
-			 unsigned int *id_filter)
+			 uint32_t *id_filter)
 {
 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, replica_version_id, anon,

--
Serge Petrenko
sergepetrenko@tarantool.org
