>Среда, 26 февраля 2020, 13:18 +03:00 от Konstantin Osipov : >  > >Generally I think you're on track with this series. > >I believe it will also be possible to use this filter to not send >records twice in a full mesh. E.g. you could add a follow-up patch >which excludes from the subscription peers to which we have a >direct connection, this will easily solve the full-mesh >replication traffic explosion issue, the replica will only get the >changes directly from the peers.   Okay, will do.   > >Please solicit a detailed review. > >* sergepetrenko < sergepetrenko@tarantool.org > [20/02/26 13:00]: >  >> + unsigned int map_size = __builtin_popcount(id_mask); >> + if (map_size) { >> + size += mp_sizeof_array(map_size) + map_size * >> + mp_sizeof_uint(VCLOCK_MAX); >> + } >Better: mask_size. > >Besides, this is perhaps not id_mask, but id_filter. The >difference is that mask is something inclusive, i.e. if you're in >the mask, you get the records, while filter is >something exclusive, i.e. if you're in the filter, you don't get the >records.   Okay, s/id_mask/id_filter s/map_size/filter_size   diff --git a/src/box/box.cc b/src/box/box.cc index 232d7861b..94267c74e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1787,9 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)   uint32_t replica_version_id;   vclock_create(&replica_clock);   bool anon; - unsigned int id_mask; + unsigned int id_filter;   xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock, - &replica_version_id, &anon, &id_mask); + &replica_version_id, &anon, &id_filter);     /* Forbid connection to itself */   if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID)) @@ -1872,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)   * indefinitely).   
*/   relay_subscribe(replica, io->fd, header->sync, &replica_clock, - replica_version_id, id_mask); + replica_version_id, id_filter); }   void diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 814ada303..f9d413a31 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -125,7 +125,7 @@ enum iproto_key {   IPROTO_STMT_ID = 0x43,   /* Leave a gap between SQL keys and additional request keys */   IPROTO_REPLICA_ANON = 0x50, - IPROTO_ID_MASK = 0x51, + IPROTO_ID_FILTER = 0x51,   IPROTO_KEY_MAX };   diff --git a/src/box/relay.cc b/src/box/relay.cc index 87930a006..dc89b90e2 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -109,7 +109,7 @@ struct relay {   struct vclock recv_vclock;   /** Replicatoin slave version. */   uint32_t version_id; - unsigned int id_mask; + unsigned int id_filter;   /**   * Local vclock at the moment of subscribe, used to check   * dataset on the other side and send missing data rows if any. @@ -678,7 +678,7 @@ relay_subscribe_f(va_list ap) void relay_subscribe(struct replica *replica, int fd, uint64_t sync,   struct vclock *replica_clock, uint32_t replica_version_id, - unsigned int replica_id_mask) + unsigned int replica_id_filter) {   assert(replica->anon || replica->id != REPLICA_ID_NIL);   struct relay *relay = replica->relay; @@ -707,7 +707,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,   vclock_copy(&relay->tx.vclock, replica_clock);   relay->version_id = replica_version_id;   - relay->id_mask = replica_id_mask; + relay->id_filter = replica_id_filter;     int rc = cord_costart(&relay->cord, "subscribe",         relay_subscribe_f, relay); @@ -768,7 +768,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)   packet->bodycnt = 0;   }   /* Check if the rows from the instance are filtered. 
*/ - if (1 << packet->replica_id & relay->id_mask) + if (1 << packet->replica_id & relay->id_filter)   return;   /*   * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE diff --git a/src/box/relay.h b/src/box/relay.h index 255d29d90..6e7eebab1 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, void relay_subscribe(struct replica *replica, int fd, uint64_t sync,   struct vclock *replica_vclock, uint32_t replica_version_id, - unsigned int replica_id_mask); + unsigned int replica_id_filter);   #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */ diff --git a/src/box/xrow.c b/src/box/xrow.c index ee78ed24d..10edbf6a8 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -1195,13 +1195,13 @@ xrow_encode_subscribe(struct xrow_header *row,         const struct tt_uuid *replicaset_uuid,         const struct tt_uuid *instance_uuid,         const struct vclock *vclock, bool anon, -       unsigned int id_mask) +       unsigned int id_filter) {   memset(row, 0, sizeof(*row));   size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock); - unsigned int map_size = __builtin_popcount(id_mask); - if (map_size) { - size += mp_sizeof_array(map_size) + map_size * + unsigned int filter_size = __builtin_popcount(id_filter); + if (filter_size) { + size += mp_sizeof_array(filter_size) + filter_size *   mp_sizeof_uint(VCLOCK_MAX);   }   char *buf = (char *) region_alloc(&fiber()->gc, size); @@ -1210,7 +1210,7 @@ xrow_encode_subscribe(struct xrow_header *row,   return -1;   }   char *data = buf; - data = mp_encode_map(data, map_size ? 6 : 5); + data = mp_encode_map(data, filter_size ? 
6 : 5);   data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);   data = xrow_encode_uuid(data, replicaset_uuid);   data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); @@ -1221,11 +1221,11 @@ xrow_encode_subscribe(struct xrow_header *row,   data = mp_encode_uint(data, tarantool_version_id());   data = mp_encode_uint(data, IPROTO_REPLICA_ANON);   data = mp_encode_bool(data, anon); - if (map_size) { - data = mp_encode_uint(data, IPROTO_ID_MASK); - data = mp_encode_array(data, map_size); + if (filter_size) { + data = mp_encode_uint(data, IPROTO_ID_FILTER); + data = mp_encode_array(data, filter_size);   struct bit_iterator it; - bit_iterator_init(&it, &id_mask, sizeof(id_mask), + bit_iterator_init(&it, &id_filter, sizeof(id_filter),     true);   for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;       id = bit_iterator_next(&it)) { @@ -1244,7 +1244,7 @@ int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,         struct tt_uuid *instance_uuid, struct vclock *vclock,         uint32_t *version_id, bool *anon, -       unsigned int *id_mask) +       unsigned int *id_filter) {   if (row->bodycnt == 0) {   diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -1262,8 +1262,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,     if (anon)   *anon = false; - if (id_mask) - *id_mask = 0; + if (id_filter) + *id_filter = 0;   d = data;   uint32_t map_size = mp_decode_map(&d);   for (uint32_t i = 0; i < map_size; i++) { @@ -1321,19 +1321,19 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,   }   *anon = mp_decode_bool(&d);   break; - case IPROTO_ID_MASK: - if (id_mask == NULL) + case IPROTO_ID_FILTER: + if (id_filter == NULL)   goto skip;   if (mp_typeof(*d) != MP_ARRAY) { decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK, -   "invalid id_mask"); +   "invalid id_filter");   return -1;   }   uint32_t len = mp_decode_array(&d);   for(uint32_t i = 0; i < len; ++i) {   
if (mp_typeof(*d) != MP_UINT)   goto decode_err; - *id_mask |= 1 << mp_decode_uint(&d); + *id_filter |= 1 << mp_decode_uint(&d);   }   break;   default: skip: diff --git a/src/box/xrow.h b/src/box/xrow.h index ab5fc944a..8e5716b30 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -322,7 +322,7 @@ xrow_encode_register(struct xrow_header *row,   * @param instance_uuid Instance uuid.   * @param vclock Replication clock.   * @param anon Whether it is an anonymous subscribe request or not. - * @param id_mask A List of replica ids to skip rows from + * @param id_filter A List of replica ids to skip rows from   *                      when feeding a replica.   *   * @retval  0 Success. @@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,         const struct tt_uuid *replicaset_uuid,         const struct tt_uuid *instance_uuid,         const struct vclock *vclock, bool anon, -       unsigned int id_mask); +       unsigned int id_filter);   /**   * Decode SUBSCRIBE command. @@ -343,7 +343,7 @@ xrow_encode_subscribe(struct xrow_header *row,   * @param[out] vclock.   * @param[out] version_id.   * @param[out] anon Whether it is an anonymous subscribe. - * @param[out] id_mask A list of ids to skip rows from when + * @param[out] id_filter A list of ids to skip rows from when   *             feeding replica.   *   * @retval  0 Success. @@ -353,7 +353,7 @@ int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,         struct tt_uuid *instance_uuid, struct vclock *vclock,         uint32_t *version_id, bool *anon, -       unsigned int *id_mask); +       unsigned int *id_filter);   /**   * Encode JOIN command. 
@@ -827,10 +827,10 @@ xrow_encode_subscribe_xc(struct xrow_header *row,   const struct tt_uuid *replicaset_uuid,   const struct tt_uuid *instance_uuid,   const struct vclock *vclock, bool anon, - unsigned int id_mask) + unsigned int id_filter) {   if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid, -   vclock, anon, id_mask) != 0) +   vclock, anon, id_filter) != 0)   diag_raise(); }   @@ -840,11 +840,11 @@ xrow_decode_subscribe_xc(struct xrow_header *row,   struct tt_uuid *replicaset_uuid,   struct tt_uuid *instance_uuid, struct vclock *vclock,   uint32_t *replica_version_id, bool *anon, - unsigned int *id_mask) + unsigned int *id_filter) {   if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,     vclock, replica_version_id, anon, -   id_mask) != 0) +   id_filter) != 0)   diag_raise(); } > > >-- >Konstantin Osipov, Moscow, Russia     -- Serge Petrenko