From: Vladislav Shpilevoy
Message-ID: <172faa01-c31c-76e6-bb45-066f44ffc73d@tarantool.org>
Date: Thu, 27 Feb 2020 00:54:48 +0100
Subject: Re: [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
To: sergepetrenko, kirichenkoga@gmail.com, kostja.osipov@gmail.com, alexander.turenko@tarantool.org
Cc: tarantool-patches@dev.tarantool.org

Thanks for the patch!

See 8 comments below.

> replication: implement an instance id filter for relay
>
> Add a filter for relay to skip rows coming from unwanted instances.
> A list of instance ids whose rows replica doesn't want to fetch is encoded
> together with SUBSCRIBE request after a freshly introduced flag IPROTO_ID_FILTER.
>
> Filtering rows is needed to prevent an instance from fetching its own
> rows from a remote master, which is useful on initial configuration and
> harmful on resubscribe.
>
> Prerequisite #4739, #3294
>
> @TarantoolBot document
>
> Title: document new binary protocol key and subscribe request changes
>
> Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
> This is an optional key used in SUBSCRIBE request followed by an array
> of ids of instances whose rows won't be relayed to the replica.
>
> SUBSCRIBE request is supplemented with an optional field of the
> following structure:
> ```
> +====================+
> |      ID_FILTER     |
> |   0x51 : ID LIST   |
> | MP_INT : MP_ARRRAY |
> |                    |
> +====================+
> ```
> The field is encoded only when the id list is not empty.
>
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index b89632273..dc89b90e2 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -109,6 +109,7 @@ struct relay {
>  	struct vclock recv_vclock;
>  	/** Replicatoin slave version. */
>  	uint32_t version_id;
> +	unsigned int id_filter;

1. Please add a comment here explaining what it is and why it is needed.

>  	/**
>  	 * Local vclock at the moment of subscribe, used to check
>  	 * dataset on the other side and send missing data rows if any.
> @@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  		packet->group_id = GROUP_DEFAULT;
>  		packet->bodycnt = 0;
>  	}
> +	/* Check if the rows from the instance are filtered. */
> +	if (1 << packet->replica_id & relay->id_filter)

2. Please use an explicit != 0 here and below, where you check the 'filter_size' variable.

> +		return;
>  	/*
>  	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>  	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index 968c3a202..10edbf6a8 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -1194,17 +1194,23 @@ int
>  xrow_encode_subscribe(struct xrow_header *row,
>  		      const struct tt_uuid *replicaset_uuid,
>  		      const struct tt_uuid *instance_uuid,
> -		      const struct vclock *vclock, bool anon)
> +		      const struct vclock *vclock, bool anon,
> +		      unsigned int id_filter)

3. Nit: it would be better to make it uint32_t explicitly, because the max id count is 32. Formally speaking, unsigned int has no size guarantee beyond being at least 16 bits wide.
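
To illustrate comments 2 and 3 together, here is a minimal sketch, not the patch code. The names id_filter_contains and ID_FILTER_BITS are made up for the example. Note that << binds tighter than &, so the quoted condition already parses as (1 << id) & filter; the parentheses below only make that visible.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical constant: how many replica ids the mask can hold. */
enum { ID_FILTER_BITS = 32 };

/*
 * Return true if rows coming from replica_id are filtered out.
 * The mask is uint32_t, so it is exactly 32 bits wide everywhere.
 */
bool
id_filter_contains(uint32_t id_filter, uint32_t replica_id)
{
	/* The caller is expected to pass an id that fits into the mask. */
	assert(replica_id < ID_FILTER_BITS);
	/* Shift an unsigned 32-bit one and compare with 0 explicitly. */
	return (id_filter & (UINT32_C(1) << replica_id)) != 0;
}
```

With a mask of 0x6 (ids 1 and 2 set) the helper reports id 1 as filtered and id 0 as not filtered.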
>  {
>  	memset(row, 0, sizeof(*row));
>  	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
> +	unsigned int filter_size = __builtin_popcount(id_filter);
> +	if (filter_size) {
> +		size += mp_sizeof_array(filter_size) + filter_size *
> +			mp_sizeof_uint(VCLOCK_MAX);

4. You didn't add mp_sizeof_uint(IPROTO_ID_FILTER) here, and you didn't update XROW_BODY_LEN_MAX either. Does it fit in the current value?

Also it seems it can't be called XROW_BODY_LEN_MAX anymore: clearly it is not enough to fit the filter array, since you needed to patch the allocation here.

You could also update XROW_BODY_LEN_MAX so that it includes the biggest possible filter. Max filter size is just 34 bytes. This is the simplest option, I think.

> +	}
>  	char *buf = (char *) region_alloc(&fiber()->gc, size);
>  	if (buf == NULL) {
>  		diag_set(OutOfMemory, size, "region_alloc", "buf");
>  		return -1;
>  	}
>  	char *data = buf;
> -	data = mp_encode_map(data, 5);
> +	data = mp_encode_map(data, filter_size ? 6 : 5);
>  	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
>  	data = xrow_encode_uuid(data, replicaset_uuid);
>  	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
> @@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
>  	data = mp_encode_uint(data, tarantool_version_id());
>  	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
>  	data = mp_encode_bool(data, anon);
> +	if (filter_size) {

5. I wouldn't make it optional in encoding: the request is sent really rarely, and always encoding the key could simplify the code a bit. However, up to you.

Also, won't it break replication from an old version? You will send subscribe with this new key, and the old instance should ignore it. Does it? I don't remember.

If the old instance ignores it, the bug can still explode when replicating from an old version, right? I don't know how to fix that, but if it is true, we need to document it, at least.
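
As a rough check of the numbers in comment 4, here is a sketch of the MessagePack size rules, written from the format description rather than taken from msgpuck. All sketch_* names are made up, and the limit of 31 listed ids is my assumption. Under that assumption the array alone takes 3 + 31 = 34 bytes, and the 0x51 key adds one more byte that the allocation has to account for.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Bytes taken by a MessagePack array header of the given length. */
size_t
sketch_sizeof_array(uint32_t len)
{
	if (len <= 15)
		return 1;	/* fixarray */
	if (len <= UINT16_MAX)
		return 3;	/* array 16 */
	return 5;		/* array 32 */
}

/* Bytes taken by a MessagePack unsigned integer. */
size_t
sketch_sizeof_uint(uint64_t value)
{
	if (value <= 0x7f)
		return 1;	/* positive fixint */
	if (value <= UINT8_MAX)
		return 2;	/* uint 8 */
	if (value <= UINT16_MAX)
		return 3;	/* uint 16 */
	if (value <= UINT32_MAX)
		return 5;	/* uint 32 */
	return 9;		/* uint 64 */
}

/* Worst-case size of the id array alone; every id is below 32. */
size_t
sketch_filter_array_size(uint32_t id_count)
{
	/* A 32-bit mask cannot describe more than 32 ids. */
	assert(id_count <= 32);
	return sketch_sizeof_array(id_count) +
	       id_count * sketch_sizeof_uint(31);
}

/* Size of the whole map entry: the 0x51 key plus the id array. */
size_t
sketch_filter_field_size(uint32_t id_count)
{
	return sketch_sizeof_uint(0x51) + sketch_filter_array_size(id_count);
}
```

So sketch_filter_array_size(31) gives 34 and sketch_filter_field_size(31) gives 35. Whichever way XROW_BODY_LEN_MAX is handled, the key byte should be counted too.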
> +		data = mp_encode_uint(data, IPROTO_ID_FILTER);
> +		data = mp_encode_array(data, filter_size);
> +		struct bit_iterator it;
> +		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
> +				  true);
> +		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
> +		     id = bit_iterator_next(&it)) {
> +			data = mp_encode_uint(data, id);
> +		}
> +	}
>  	assert(data <= buf + size);
>  	row->body[0].iov_base = buf;
>  	row->body[0].iov_len = (data - buf);
> @@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>  			}
>  			*anon = mp_decode_bool(&d);
>  			break;
> +		case IPROTO_ID_FILTER:
> +			if (id_filter == NULL)
> +				goto skip;
> +			if (mp_typeof(*d) != MP_ARRAY) {
> +decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
> +						   "invalid id_filter");

6. Let's say it in caps: ID_FILTER, just like in other decode error messages. Also I would name this label 'id_filter_decode_err', because other keys can't jump here.

> +				return -1;
> +			}
> +			uint32_t len = mp_decode_array(&d);
> +			for(uint32_t i = 0; i < len; ++i) {
> +				if (mp_typeof(*d) != MP_UINT)
> +					goto decode_err;
> +				*id_filter |= 1 << mp_decode_uint(&d);

7. If someone sends a big ID (a program pretending to be a Tarantool instance), it would cause a bit shift overflow, which is undefined behaviour. Let's check that it is not bigger than 31.

However, this won't really help much. From what I see, this code will crash even if I just send a truncated packet. Up to you whether you want to fix the bit shift.

> +			}
> +			break;
>  		default: skip:
>  			mp_next(&d); /* value */
>  		}
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 0973c497d..8e5716b30 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
>   * @param instance_uuid Instance uuid.
>   * @param vclock Replication clock.
>   * @param anon Whether it is an anonymous subscribe request or not.
> + * @param id_filter A List of replica ids to skip rows from
> + *                      when feeding a replica.

8. This looks like a huge indentation. Keep it if you want; I am pointing it out just in case it was an accident. In the next @param description you aligned it differently.

>   *
>   * @retval 0 Success.
>   * @retval -1 Memory error.
> @@ -330,7 +332,8 @@ int
> @@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
>   * @param[out] vclock.
>   * @param[out] version_id.
>   * @param[out] anon Whether it is an anonymous subscribe.
> + * @param[out] id_filter A list of ids to skip rows from when
> + *             feeding replica.
>   *
>   * @retval 0 Success.
>   * @retval -1 Memory or format error.
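
For comment 7, the range check could look roughly like the sketch below. It is not tied to the real decoder: it takes ids that are already decoded, and sketch_build_id_filter and SKETCH_ID_MAX are made-up names. The point is only that the id is validated before the shift and that the shifted one is an unsigned 32-bit value.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical limit: the highest id that fits into a 32-bit mask. */
enum { SKETCH_ID_MAX = 31 };

/*
 * Fold already decoded ids into a 32-bit mask.
 * Return 0 on success, -1 if some id does not fit into the mask.
 * On failure *id_filter is left untouched.
 */
int
sketch_build_id_filter(const uint64_t *ids, size_t count,
		       uint32_t *id_filter)
{
	assert(ids != NULL || count == 0);
	assert(id_filter != NULL);
	uint32_t mask = 0;
	for (size_t i = 0; i < count; i++) {
		/* Reject first: shifting by 32 or more is undefined. */
		if (ids[i] > (uint64_t)SKETCH_ID_MAX)
			return -1;
		mask |= UINT32_C(1) << ids[i];
	}
	*id_filter = mask;
	return 0;
}
```

In the real decoder the -1 branch would go to the same error path as a wrong MsgPack type. The truncated packet problem is separate and is not covered by this sketch.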