Wednesday, February 26, 2020, 13:18 +03:00 from Konstantin Osipov <kostja.osipov@gmail.com>:
 

Generally I think you're on track with this series.

I believe it will also be possible to use this filter to avoid sending
records twice in a full mesh. E.g. you could add a follow-up patch
which excludes from the subscription the peers to which we have a
direct connection. This would easily solve the full-mesh
replication traffic explosion issue: the replica will only get the
changes directly from the peers.
 
Okay, will do.
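
For reference, a rough sketch of how such a follow-up might build the
filter on the replica side. Everything below is illustrative only:
"struct peer" and "is_connected" are hypothetical stand-ins for the
real applier/replica state, not existing code.

#include <stdbool.h>
#include <stdint.h>

/*
 * Before subscribing to one peer, collect the ids of all peers we
 * already talk to directly, so that this peer does not relay their
 * rows to us a second time.
 */
struct peer {
	uint32_t id;		/* replica id, 1 .. VCLOCK_MAX - 1 */
	bool is_connected;	/* we have a direct working connection */
};

static unsigned int
build_id_filter(const struct peer *peers, int n_peers)
{
	unsigned int id_filter = 0;
	for (int i = 0; i < n_peers; i++) {
		if (peers[i].is_connected)
			id_filter |= 1U << peers[i].id;
	}
	return id_filter;
}

The resulting bitmask would then be passed to xrow_encode_subscribe()
as the id_filter argument.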
 

Please solicit a detailed review.

* sergepetrenko <sergepetrenko@tarantool.org> [20/02/26 13:00]:
 
> +	unsigned int map_size = __builtin_popcount(id_mask);
> +	if (map_size) {
> +		size += mp_sizeof_array(map_size) + map_size *
> +			mp_sizeof_uint(VCLOCK_MAX);
> +	}

Better: mask_size.

Besides, this is perhaps not id_mask, but id_filter. The
difference is that a mask is something inclusive, i.e. if you're in
the mask, you get the records, while a filter is
something exclusive: if you're in the filter, you don't get the
records.
 
Okay,

s/id_mask/id_filter
s/map_size/filter_size
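
To spell out the semantics the rename is meant to capture, here is a
tiny illustration (the helper names are made up; the bit test is the
same one relay_send_row() performs):

#include <stdbool.h>
#include <stdint.h>

/* Inclusive mask: a row is sent only if its origin id is in the set. */
static bool
mask_allows(unsigned int id_mask, uint32_t replica_id)
{
	return (1U << replica_id & id_mask) != 0;
}

/* Exclusive filter: a row is dropped if its origin id is in the set. */
static bool
filter_drops(unsigned int id_filter, uint32_t replica_id)
{
	return (1U << replica_id & id_filter) != 0;
}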
 

diff --git a/src/box/box.cc b/src/box/box.cc
index 232d7861b..94267c74e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,9 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	uint32_t replica_version_id;
 	vclock_create(&replica_clock);
 	bool anon;
-	unsigned int id_mask;
+	unsigned int id_filter;
 	xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
-				 &replica_version_id, &anon, &id_mask);
+				 &replica_version_id, &anon, &id_filter);
 
 	/* Forbid connection to itself */
 	if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
@@ -1872,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	 * indefinitely).
 	 */
 	relay_subscribe(replica, io->fd, header->sync, &replica_clock,
-			replica_version_id, id_mask);
+			replica_version_id, id_filter);
 }
 
 void
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 814ada303..f9d413a31 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -125,7 +125,7 @@ enum iproto_key {
 	IPROTO_STMT_ID = 0x43,
 	/* Leave a gap between SQL keys and additional request keys */
 	IPROTO_REPLICA_ANON = 0x50,
-	IPROTO_ID_MASK = 0x51,
+	IPROTO_ID_FILTER = 0x51,
 	IPROTO_KEY_MAX
 };
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 87930a006..dc89b90e2 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,7 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
-	unsigned int id_mask;
+	unsigned int id_filter;
 	/**
 	 * Local vclock at the moment of subscribe, used to check
 	 * dataset on the other side and send missing data rows if any.
@@ -678,7 +678,7 @@ relay_subscribe_f(va_list ap)
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_clock, uint32_t replica_version_id,
-		unsigned int replica_id_mask)
+		unsigned int replica_id_filter)
 {
 	assert(replica->anon || replica->id != REPLICA_ID_NIL);
 	struct relay *relay = replica->relay;
@@ -707,7 +707,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-	relay->id_mask = replica_id_mask;
+	relay->id_filter = replica_id_filter;
 
 	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
@@ -768,7 +768,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->bodycnt = 0;
 	}
 	/* Check if the rows from the instance are filtered. */
-	if (1 << packet->replica_id & relay->id_mask)
+	if (1 << packet->replica_id & relay->id_filter)
 		return;
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 255d29d90..6e7eebab1 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
-		unsigned int replica_id_mask);
+		unsigned int replica_id_filter);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ee78ed24d..10edbf6a8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,13 +1195,13 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_mask)
+		      unsigned int id_filter)
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
-	unsigned int map_size = __builtin_popcount(id_mask);
-	if (map_size) {
-		size += mp_sizeof_array(map_size) + map_size *
+	unsigned int filter_size = __builtin_popcount(id_filter);
+	if (filter_size) {
+		size += mp_sizeof_array(filter_size) + filter_size *
 			mp_sizeof_uint(VCLOCK_MAX);
 	}
 	char *buf = (char *) region_alloc(&fiber()->gc, size);
@@ -1210,7 +1210,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 		return -1;
 	}
 	char *data = buf;
-	data = mp_encode_map(data, map_size ? 6 : 5);
+	data = mp_encode_map(data, filter_size ? 6 : 5);
 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
 	data = xrow_encode_uuid(data, replicaset_uuid);
 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,11 +1221,11 @@ xrow_encode_subscribe(struct xrow_header *row,
 	data = mp_encode_uint(data, tarantool_version_id());
 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
 	data = mp_encode_bool(data, anon);
-	if (map_size) {
-		data = mp_encode_uint(data, IPROTO_ID_MASK);
-		data = mp_encode_array(data, map_size);
+	if (filter_size) {
+		data = mp_encode_uint(data, IPROTO_ID_FILTER);
+		data = mp_encode_array(data, filter_size);
 		struct bit_iterator it;
-		bit_iterator_init(&it, &id_mask, sizeof(id_mask),
+		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
 				  true);
 		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
 		     id = bit_iterator_next(&it)) {
@@ -1244,7 +1244,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_mask)
+		      unsigned int *id_filter)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1262,8 +1262,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 
 	if (anon)
 		*anon = false;
-	if (id_mask)
-		*id_mask = 0;
+	if (id_filter)
+		*id_filter = 0;
 	d = data;
 	uint32_t map_size = mp_decode_map(&d);
 	for (uint32_t i = 0; i < map_size; i++) {
@@ -1321,19 +1321,19 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		}
 		*anon = mp_decode_bool(&d);
 		break;
-	case IPROTO_ID_MASK:
-		if (id_mask == NULL)
+	case IPROTO_ID_FILTER:
+		if (id_filter == NULL)
 			goto skip;
 		if (mp_typeof(*d) != MP_ARRAY) {
 decode_err:		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
-				   "invalid id_mask");
+				   "invalid id_filter");
 			return -1;
 		}
 		uint32_t len = mp_decode_array(&d);
 		for(uint32_t i = 0; i < len; ++i) {
 			if (mp_typeof(*d) != MP_UINT)
 				goto decode_err;
-			*id_mask |= 1 << mp_decode_uint(&d);
+			*id_filter |= 1 << mp_decode_uint(&d);
 		}
 		break;
 	default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index ab5fc944a..8e5716b30 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -322,7 +322,7 @@ xrow_encode_register(struct xrow_header *row,
  * @param instance_uuid Instance uuid.
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
- * @param id_mask A List of replica ids to skip rows from
+ * @param id_filter A List of replica ids to skip rows from
  *                      when feeding a replica.
  *
  * @retval  0 Success.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_mask);
+		      unsigned int id_filter);
 
 /**
  * Decode SUBSCRIBE command.
@@ -343,7 +343,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] vclock.
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
- * @param[out] id_mask A list of ids to skip rows from when
+ * @param[out] id_filter A list of ids to skip rows from when
  *             feeding replica.
  *
  * @retval  0 Success.
@@ -353,7 +353,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_mask);
+		      unsigned int *id_filter);
 
 /**
  * Encode JOIN command.
@@ -827,10 +827,10 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
 			 const struct tt_uuid *replicaset_uuid,
 			 const struct tt_uuid *instance_uuid,
 			 const struct vclock *vclock, bool anon,
-			 unsigned int id_mask)
+			 unsigned int id_filter)
 {
 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
-				  vclock, anon, id_mask) != 0)
+				  vclock, anon, id_filter) != 0)
 		diag_raise();
 }
 
@@ -840,11 +840,11 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
 			 struct tt_uuid *replicaset_uuid,
 			 struct tt_uuid *instance_uuid, struct vclock *vclock,
 			 uint32_t *replica_version_id, bool *anon,
-			 unsigned int *id_mask)
+			 unsigned int *id_filter)
 {
 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, replica_version_id, anon,
-				  id_mask) != 0)
+				  id_filter) != 0)
 		diag_raise();
 }
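
For completeness, a tiny standalone illustration of the round trip
implemented above (the ids are made-up example values): on the wire
IPROTO_ID_FILTER is an MP_ARRAY of replica ids, and the decoder folds
them back into a bitmask, so the popcount of the bitmask matches the
encoded array length.

#include <stdio.h>

int
main(void)
{
	unsigned int ids[] = {2, 5};	/* contents of the MP_ARRAY */
	unsigned int id_filter = 0;
	for (unsigned int i = 0; i < sizeof(ids) / sizeof(ids[0]); i++)
		id_filter |= 1U << ids[i];	/* as in xrow_decode_subscribe() */
	/* Prints: id_filter = 0x24, popcount = 2 */
	printf("id_filter = 0x%x, popcount = %d\n",
	       id_filter, __builtin_popcount(id_filter));
	return 0;
}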



--
Konstantin Osipov, Moscow, Russia
 
 
--
Serge Petrenko