> On 27 Feb 2020, at 02:54, Vladislav Shpilevoy wrote:
>
> Thanks for the patch!
>
> See 8 comments below.

Hi! Thanks for the review!
Please find my comments inline and the diff below.

>
>> replication: implement an instance id filter for relay
>>
>> Add a filter for relay to skip rows coming from unwanted instances.
>> A list of instance ids whose rows replica doesn't want to fetch is encoded
>> together with SUBSCRIBE request after a freshly introduced flag IPROTO_ID_FILTER.
>>
>> Filtering rows is needed to prevent an instance from fetching its own
>> rows from a remote master, which is useful on initial configuration and
>> harmful on resubscribe.
>>
>> Prerequisite #4739, #3294
>>
>> @TarantoolBot document
>>
>> Title: document new binary protocol key and subscribe request changes
>>
>> Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
>> This is an optional key used in SUBSCRIBE request followed by an array
>> of ids of instances whose rows won't be relayed to the replica.
>>
>> SUBSCRIBE request is supplemented with an optional field of the
>> following structure:
>> ```
>> +====================+
>> |      ID_FILTER     |
>> |   0x51 : ID LIST   |
>> | MP_INT : MP_ARRAY  |
>> |                    |
>> +====================+
>> ```
>> The field is encoded only when the id list is not empty.
>>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index b89632273..dc89b90e2 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -109,6 +109,7 @@ struct relay {
>>  	struct vclock recv_vclock;
>>  	/** Replicatoin slave version. */
>>  	uint32_t version_id;
>> +	unsigned int id_filter;
>
> 1. Please, add a comment here explaining what it is,
> and why it is needed.

Ok.

>
>>  	/**
>>  	 * Local vclock at the moment of subscribe, used to check
>>  	 * dataset on the other side and send missing data rows if any.
>> @@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>>  		packet->group_id = GROUP_DEFAULT;
>>  		packet->bodycnt = 0;
>>  	}
>> +	/* Check if the rows from the instance are filtered. */
>> +	if (1 << packet->replica_id & relay->id_filter)
>
> 2. Please use explicit != 0 here and below when checking the
> 'filter_size' variable.

No problem.

>
>> +		return;
>>  	/*
>>  	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>>  	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
>> diff --git a/src/box/xrow.c b/src/box/xrow.c
>> index 968c3a202..10edbf6a8 100644
>> --- a/src/box/xrow.c
>> +++ b/src/box/xrow.c
>> @@ -1194,17 +1194,23 @@ int
>>  xrow_encode_subscribe(struct xrow_header *row,
>>  		      const struct tt_uuid *replicaset_uuid,
>>  		      const struct tt_uuid *instance_uuid,
>> -		      const struct vclock *vclock, bool anon)
>> +		      const struct vclock *vclock, bool anon,
>> +		      unsigned int id_filter)
>
> 3. Nit - it would be better to have it as uint32_t explicitly.
> Because max id count is 32. Unsigned int does not have size
> guarantees, formally speaking. It is at least 16 bits, no more
> info.

Done, and I’ll send a followup to vclock.h later, as Kostja suggests.

>
>>  {
>>  	memset(row, 0, sizeof(*row));
>>  	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
>> +	unsigned int filter_size = __builtin_popcount(id_filter);
>> +	if (filter_size) {
>> +		size += mp_sizeof_array(filter_size) + filter_size *
>> +			mp_sizeof_uint(VCLOCK_MAX);
>
> 4. You didn't add mp_sizeof_uint(IPROTO_ID_FILTER) here, but you
> didn't update XROW_BODY_LEN_MAX either. Does it fit in the current
> value? Also seems like it can't be called XROW_BODY_LEN_MAX anymore.
> Because clearly it is not enough to fit the filter array, if you
> needed to patch the allocation here.

Let’s then just double XROW_BODY_LEN_MAX.

>
> You could also update XROW_BODY_LEN_MAX so that it includes the biggest
> possible filter. Max filter size is just 34 bytes. This is the
> simplest option, I think.
>
>> +	}
>>  	char *buf = (char *) region_alloc(&fiber()->gc, size);
>>  	if (buf == NULL) {
>>  		diag_set(OutOfMemory, size, "region_alloc", "buf");
>>  		return -1;
>>  	}
>>  	char *data = buf;
>> -	data = mp_encode_map(data, 5);
>> +	data = mp_encode_map(data, filter_size ? 6 : 5);
>>  	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
>>  	data = xrow_encode_uuid(data, replicaset_uuid);
>>  	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
>> @@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
>>  	data = mp_encode_uint(data, tarantool_version_id());
>>  	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
>>  	data = mp_encode_bool(data, anon);
>> +	if (filter_size) {
>
> 5. I wouldn't make it optional in encoding, since this is sent
> really rarely, but could simplify the code a bit. However, up to
> you.

I don’t like the idea of passing zero-length arrays when there is no filter.
Let’s just not encode this part. The filter is initially empty on the decoding side anyway.

>
> Also, won't it break replication from an old version? You
> will send subscribe with this new key, and the old instance
> should ignore it. Does it? I don't remember.
>

Yes, the old instance ignores all unknown keys.

> If the old instance would ignore it, it means that the bug
> still can explode when replicating from an old version, right?
> I don't know how to fix that, but if it is true, we need to
> document that, at least.

Yes, the bug is still there when replicating from an old instance. To trigger it, you have to
set up master-master replication between the old and the new instances and meet some
additional conditions.
I don’t think it’s usual to set up master-master when upgrading, as Kostja has pointed out.
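
For reference, the optional ID_FILTER field described in the commit message could be laid out on the wire roughly as follows. This is only a standalone sketch that writes the MsgPack bytes by hand instead of calling msgpuck; `encode_id_filter`, `KEY_ID_FILTER` and `MAX_IDS` are hypothetical names that are not part of the patch, and the limit of 32 ids is taken from comment 3 above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum {
	KEY_ID_FILTER = 0x51,	/* key value quoted in the commit message */
	MAX_IDS = 32,		/* assumed id limit, hypothetical stand-in */
};

/*
 * Encode the filter as a MsgPack key followed by an array of ids,
 * one id per set bit. Nothing is written for an empty filter.
 * `buf` must have room for 1 + 3 + 32 bytes.
 * Returns the number of bytes written.
 */
static size_t
encode_id_filter(uint8_t *buf, uint32_t filter)
{
	assert(buf != NULL);
	size_t count = 0;
	for (uint32_t id = 0; id < MAX_IDS; id++)
		count += (filter >> id) & 1;
	if (count == 0)
		return 0;
	uint8_t *p = buf;
	/* The key is below 0x80, so it is a one-byte positive fixint. */
	*p++ = KEY_ID_FILTER;
	if (count <= 15) {
		/* fixarray: 0x90 plus the element count. */
		*p++ = (uint8_t)(0x90 | count);
	} else {
		/* array 16: marker byte plus big-endian 16-bit length. */
		*p++ = 0xdc;
		*p++ = (uint8_t)(count >> 8);
		*p++ = (uint8_t)(count & 0xff);
	}
	for (uint32_t id = 0; id < MAX_IDS; id++) {
		/* Ids below 32 each fit a one-byte positive fixint. */
		if ((filter >> id) & 1)
			*p++ = (uint8_t)id;
	}
	return (size_t)(p - buf);
}
```

With a filter of 0x0A (ids 1 and 3) this sketch produces the four bytes 0x51 0x92 0x01 0x03, and an empty filter produces nothing, which matches the "encode only when not empty" choice.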
>
>> +		data = mp_encode_uint(data, IPROTO_ID_FILTER);
>> +		data = mp_encode_array(data, filter_size);
>> +		struct bit_iterator it;
>> +		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
>> +				  true);
>> +		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
>> +		     id = bit_iterator_next(&it)) {
>> +			data = mp_encode_uint(data, id);
>> +		}
>> +	}
>>  	assert(data <= buf + size);
>>  	row->body[0].iov_base = buf;
>>  	row->body[0].iov_len = (data - buf);
>> @@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>>  			}
>>  			*anon = mp_decode_bool(&d);
>>  			break;
>> +		case IPROTO_ID_FILTER:
>> +			if (id_filter == NULL)
>> +				goto skip;
>> +			if (mp_typeof(*d) != MP_ARRAY) {
>> +decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
>> +						   "invalid id_filter");
>
> 6. Let's say it in caps: ID_FILTER, just like with other decode
> error messages. Also I would name this label 'id_filter_decode_err',
> because other keys can't jump here.

Okay.

>
>> +				return -1;
>> +			}
>> +			uint32_t len = mp_decode_array(&d);
>> +			for(uint32_t i = 0; i < len; ++i) {
>> +				if (mp_typeof(*d) != MP_UINT)
>> +					goto decode_err;
>> +				*id_filter |= 1 << mp_decode_uint(&d);
>
> 7. If someone would send a big ID (a program, pretending to be a Tarantool
> instance), it would cause unsigned bit shift overflow, which is undefined
> behaviour. Let's check that it is not bigger than 31.
>
> However this won't really help much. This code will crash even if I will
> just send a truncated packet. From what I see.

I’m not sure I understand what you mean. This piece of code is similar
to the one we have in mp_decode_vclock. The situation didn’t get worse, at least.

>
> Up to you whether you want to fix the bit shift.

Fixed.
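
To illustrate the bit shift concern from comment 7, here is a minimal standalone sketch of a range-checked filter update. The helper names and `ID_FILTER_MAX_IDS` are hypothetical stand-ins (the latter for VCLOCK_MAX, assumed to be 32 as mentioned in comment 3). The sketch shifts an unsigned 32-bit one, since shifting a plain `int` 1 into bit 31 is not well defined in C.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for VCLOCK_MAX, assumed to be 32 here. */
enum { ID_FILTER_MAX_IDS = 32 };

/*
 * Set the bit for replica id `val` in `*filter`.
 * Returns false and leaves the filter untouched when the id is
 * out of range, so the caller can report a decode error.
 */
static bool
id_filter_set(uint32_t *filter, uint64_t val)
{
	assert(filter != NULL);
	if (val >= ID_FILTER_MAX_IDS)
		return false;
	/* Unsigned operand: the shift is defined for all of 0..31. */
	*filter |= UINT32_C(1) << val;
	return true;
}

/* Check whether rows coming from replica `id` should be skipped. */
static bool
id_filter_test(uint32_t filter, uint32_t id)
{
	return id < ID_FILTER_MAX_IDS &&
	       (filter & (UINT32_C(1) << id)) != 0;
}
```

The same unsigned shift would apply to the check on the relay side, where the filter is tested against `packet->replica_id`.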
>
>> +			}
>> +			break;
>>  		default: skip:
>>  			mp_next(&d); /* value */
>>  		}
>> diff --git a/src/box/xrow.h b/src/box/xrow.h
>> index 0973c497d..8e5716b30 100644
>> --- a/src/box/xrow.h
>> +++ b/src/box/xrow.h
>> @@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
>>   * @param instance_uuid Instance uuid.
>>   * @param vclock Replication clock.
>>   * @param anon Whether it is an anonymous subscribe request or not.
>> + * @param id_filter A List of replica ids to skip rows from
>> + *				      when feeding a replica.
>
> 8. Looks like a huge indentation. Keep if you want. I noticed
> just in case it was an accident. In the next @param description
> you aligned it differently.

Thanks! I fixed both pieces.

>
>>   *
>>   * @retval 0 Success.
>>   * @retval -1 Memory error.
>> @@ -330,7 +332,8 @@ int
>> @@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
>>   * @param[out] vclock.
>>   * @param[out] version_id.
>>   * @param[out] anon Whether it is an anonymous subscribe.
>> + * @param[out] id_filter A list of ids to skip rows from when
>> + *             feeding replica.
>>   *
>>   * @retval 0 Success.
>>   * @retval -1 Memory or format error.

diff --git a/src/box/box.cc b/src/box/box.cc
index 94267c74e..09dd67ab4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,7 +1787,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	uint32_t replica_version_id;
 	vclock_create(&replica_clock);
 	bool anon;
-	unsigned int id_filter;
+	uint32_t id_filter;
 	xrow_decode_subscribe_xc(header, NULL, &replica_uuid,
 				 &replica_clock, &replica_version_id, &anon,
 				 &id_filter);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index dc89b90e2..95245a3cf 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,13 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
-	unsigned int id_filter;
+	/**
+	 * A filter of replica ids whose rows should be ignored.
+	 * Each set filter bit corresponds to a replica id whose
+	 * rows shouldn't be relayed. The list of ids to ignore
+	 * is passed by the replica on subscribe.
+	 */
+	uint32_t id_filter;
 	/**
 	 * Local vclock at the moment of subscribe, used to check
 	 * dataset on the other side and send missing data rows if any.
@@ -678,7 +684,7 @@ relay_subscribe_f(va_list ap)
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_clock, uint32_t replica_version_id,
-		unsigned int replica_id_filter)
+		uint32_t replica_id_filter)
 {
 	assert(replica->anon || replica->id != REPLICA_ID_NIL);
 	struct relay *relay = replica->relay;
@@ -768,7 +774,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->bodycnt = 0;
 	}
 	/* Check if the rows from the instance are filtered. */
-	if (1 << packet->replica_id & relay->id_filter)
+	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 6e7eebab1..0632fa912 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
-		unsigned int replica_id_filter);
+		uint32_t replica_id_filter);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 10edbf6a8..602049004 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,22 +1195,18 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_filter)
+		      uint32_t id_filter)
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
-	unsigned int filter_size = __builtin_popcount(id_filter);
-	if (filter_size) {
-		size += mp_sizeof_array(filter_size) + filter_size *
-			mp_sizeof_uint(VCLOCK_MAX);
-	}
 	char *buf = (char *) region_alloc(&fiber()->gc, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
 		return -1;
 	}
 	char *data = buf;
-	data = mp_encode_map(data, filter_size ? 6 : 5);
+	int filter_size = __builtin_popcount(id_filter);
+	data = mp_encode_map(data, filter_size != 0 ? 6 : 5);
 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
 	data = xrow_encode_uuid(data, replicaset_uuid);
 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,7 +1217,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 	data = mp_encode_uint(data, tarantool_version_id());
 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
 	data = mp_encode_bool(data, anon);
-	if (filter_size) {
+	if (filter_size != 0) {
 		data = mp_encode_uint(data, IPROTO_ID_FILTER);
 		data = mp_encode_array(data, filter_size);
 		struct bit_iterator it;
@@ -1244,7 +1240,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_filter)
+		      uint32_t *id_filter)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1325,15 +1321,18 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 			if (id_filter == NULL)
 				goto skip;
 			if (mp_typeof(*d) != MP_ARRAY) {
-decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
-						   "invalid id_filter");
+id_filter_decode_err:		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+						   "invalid ID_FILTER");
 				return -1;
 			}
 			uint32_t len = mp_decode_array(&d);
-			for(uint32_t i = 0; i < len; ++i) {
+			for (uint32_t i = 0; i < len; ++i) {
 				if (mp_typeof(*d) != MP_UINT)
-					goto decode_err;
-				*id_filter |= 1 << mp_decode_uint(&d);
+					goto id_filter_decode_err;
+				uint64_t val = mp_decode_uint(&d);
+				if (val >= VCLOCK_MAX)
+					goto id_filter_decode_err;
+				*id_filter |= 1 << val;
 			}
 			break;
 		default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 8e5716b30..2a0a9c852 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -48,7 +48,7 @@ enum {
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
 	XROW_HEADER_LEN_MAX = 52,
-	XROW_BODY_LEN_MAX = 128,
+	XROW_BODY_LEN_MAX = 256,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
 	IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
@@ -323,7 +323,7 @@ xrow_encode_register(struct xrow_header *row,
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
  * @param id_filter A List of replica ids to skip rows from
- *				      when feeding a replica.
+ *		    when feeding a replica.
  *
  * @retval 0 Success.
  * @retval -1 Memory error.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_filter);
+		      uint32_t id_filter);
 
 /**
  * Decode SUBSCRIBE command.
@@ -344,7 +344,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
  * @param[out] id_filter A list of ids to skip rows from when
- *             feeding replica.
+ *		       feeding a replica.
  *
  * @retval 0 Success.
  * @retval -1 Memory or format error.
@@ -353,7 +353,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_filter);
+		      uint32_t *id_filter);
 
 /**
  * Encode JOIN command.
@@ -827,7 +827,7 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
 			 const struct tt_uuid *replicaset_uuid,
 			 const struct tt_uuid *instance_uuid,
 			 const struct vclock *vclock, bool anon,
-			 unsigned int id_filter)
+			 uint32_t id_filter)
 {
 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, anon, id_filter) != 0)
@@ -840,7 +840,7 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
 			 struct tt_uuid *replicaset_uuid,
 			 struct tt_uuid *instance_uuid, struct vclock *vclock,
 			 uint32_t *replica_version_id, bool *anon,
-			 unsigned int *id_filter)
+			 uint32_t *id_filter)
 {
 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, replica_version_id, anon,

--
Serge Petrenko
sergepetrenko@tarantool.org