[Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay

Serge Petrenko sergepetrenko at tarantool.org
Wed Feb 26 14:16:20 MSK 2020




  
>Среда, 26 февраля 2020, 13:18 +03:00 от Konstantin Osipov <kostja.osipov at gmail.com>:
> 
>
>Generally I think you're on track with this series.
>
>I believe it will also be possible to use this filter to avoid
>sending records twice in a full mesh. E.g. you could add a
>follow-up patch which excludes from the subscription the peers to
>which we have a direct connection. This will easily solve the
>full-mesh replication traffic explosion issue: the replica will
>only get the changes directly from the peers.
 
Okay, will do.
 
>
>Please solicit a detailed review.
>
>* sergepetrenko < sergepetrenko at tarantool.org > [20/02/26 13:00]:
> 
>> + unsigned int map_size = __builtin_popcount(id_mask);
>> + if (map_size) {
>> + size += mp_sizeof_array(map_size) + map_size *
>> + mp_sizeof_uint(VCLOCK_MAX);
>> + }
>Better: mask_size.
>
>Besides, this is perhaps not id_mask, but id_filter. The
>difference is that a mask is something inclusive, i.e. if you're
>in the mask, you get the records, while a filter is something
>exclusive, i.e. if you're in the filter, you don't get the
>records.
 
Okay,

s/id_mask/id_filter
s/map_size/filter_size
 
diff --git a/src/box/box.cc b/src/box/box.cc
index 232d7861b..94267c74e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,9 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
  uint32_t replica_version_id;
  vclock_create(&replica_clock);
  bool anon;
- unsigned int id_mask;
+ unsigned int id_filter;
  xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
- &replica_version_id, &anon, &id_mask);
+ &replica_version_id, &anon, &id_filter);
 
  /* Forbid connection to itself */
  if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
@@ -1872,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
  * indefinitely).
  */
  relay_subscribe(replica, io->fd, header->sync, &replica_clock,
- replica_version_id, id_mask);
+ replica_version_id, id_filter);
}
 
void
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 814ada303..f9d413a31 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -125,7 +125,7 @@ enum iproto_key {
  IPROTO_STMT_ID = 0x43,
  /* Leave a gap between SQL keys and additional request keys */
  IPROTO_REPLICA_ANON = 0x50,
- IPROTO_ID_MASK = 0x51,
+ IPROTO_ID_FILTER = 0x51,
  IPROTO_KEY_MAX
};
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 87930a006..dc89b90e2 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,7 @@ struct relay {
  struct vclock recv_vclock;
  /** Replicatoin slave version. */
  uint32_t version_id;
- unsigned int id_mask;
+ unsigned int id_filter;
  /**
  * Local vclock at the moment of subscribe, used to check
  * dataset on the other side and send missing data rows if any.
@@ -678,7 +678,7 @@ relay_subscribe_f(va_list ap)
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
  struct vclock *replica_clock, uint32_t replica_version_id,
- unsigned int replica_id_mask)
+ unsigned int replica_id_filter)
{
  assert(replica->anon || replica->id != REPLICA_ID_NIL);
  struct relay *relay = replica->relay;
@@ -707,7 +707,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
  vclock_copy(&relay->tx.vclock, replica_clock);
  relay->version_id = replica_version_id;
 
- relay->id_mask = replica_id_mask;
+ relay->id_filter = replica_id_filter;
 
  int rc = cord_costart(&relay->cord, "subscribe",
        relay_subscribe_f, relay);
@@ -768,7 +768,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
  packet->bodycnt = 0;
  }
  /* Check if the rows from the instance are filtered. */
- if (1 << packet->replica_id & relay->id_mask)
+ if (1 << packet->replica_id & relay->id_filter)
  return;
  /*
  * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 255d29d90..6e7eebab1 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
  struct vclock *replica_vclock, uint32_t replica_version_id,
- unsigned int replica_id_mask);
+ unsigned int replica_id_filter);
 
#endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ee78ed24d..10edbf6a8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,13 +1195,13 @@ xrow_encode_subscribe(struct xrow_header *row,
        const struct tt_uuid *replicaset_uuid,
        const struct tt_uuid *instance_uuid,
        const struct vclock *vclock, bool anon,
-       unsigned int id_mask)
+       unsigned int id_filter)
{
  memset(row, 0, sizeof(*row));
  size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
- unsigned int map_size = __builtin_popcount(id_mask);
- if (map_size) {
- size += mp_sizeof_array(map_size) + map_size *
+ unsigned int filter_size = __builtin_popcount(id_filter);
+ if (filter_size) {
+ size += mp_sizeof_array(filter_size) + filter_size *
  mp_sizeof_uint(VCLOCK_MAX);
  }
  char *buf = (char *) region_alloc(&fiber()->gc, size);
@@ -1210,7 +1210,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  return -1;
  }
  char *data = buf;
- data = mp_encode_map(data, map_size ? 6 : 5);
+ data = mp_encode_map(data, filter_size ? 6 : 5);
  data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
  data = xrow_encode_uuid(data, replicaset_uuid);
  data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,11 +1221,11 @@ xrow_encode_subscribe(struct xrow_header *row,
  data = mp_encode_uint(data, tarantool_version_id());
  data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
  data = mp_encode_bool(data, anon);
- if (map_size) {
- data = mp_encode_uint(data, IPROTO_ID_MASK);
- data = mp_encode_array(data, map_size);
+ if (filter_size) {
+ data = mp_encode_uint(data, IPROTO_ID_FILTER);
+ data = mp_encode_array(data, filter_size);
  struct bit_iterator it;
- bit_iterator_init(&it, &id_mask, sizeof(id_mask),
+ bit_iterator_init(&it, &id_filter, sizeof(id_filter),
    true);
  for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
      id = bit_iterator_next(&it)) {
@@ -1244,7 +1244,7 @@ int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
        struct tt_uuid *instance_uuid, struct vclock *vclock,
        uint32_t *version_id, bool *anon,
-       unsigned int *id_mask)
+       unsigned int *id_filter)
{
  if (row->bodycnt == 0) {
  diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1262,8 +1262,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 
  if (anon)
  *anon = false;
- if (id_mask)
- *id_mask = 0;
+ if (id_filter)
+ *id_filter = 0;
  d = data;
  uint32_t map_size = mp_decode_map(&d);
  for (uint32_t i = 0; i < map_size; i++) {
@@ -1321,19 +1321,19 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
  }
  *anon = mp_decode_bool(&d);
  break;
- case IPROTO_ID_MASK:
- if (id_mask == NULL)
+ case IPROTO_ID_FILTER:
+ if (id_filter == NULL)
  goto skip;
  if (mp_typeof(*d) != MP_ARRAY) {
decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
-   "invalid id_mask");
+   "invalid id_filter");
  return -1;
  }
  uint32_t len = mp_decode_array(&d);
  for(uint32_t i = 0; i < len; ++i) {
  if (mp_typeof(*d) != MP_UINT)
  goto decode_err;
- *id_mask |= 1 << mp_decode_uint(&d);
+ *id_filter |= 1 << mp_decode_uint(&d);
  }
  break;
  default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index ab5fc944a..8e5716b30 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -322,7 +322,7 @@ xrow_encode_register(struct xrow_header *row,
  * @param instance_uuid Instance uuid.
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
- * @param id_mask A List of replica ids to skip rows from
+ * @param id_filter A List of replica ids to skip rows from
  *                      when feeding a replica.
  *
  * @retval  0 Success.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
        const struct tt_uuid *replicaset_uuid,
        const struct tt_uuid *instance_uuid,
        const struct vclock *vclock, bool anon,
-       unsigned int id_mask);
+       unsigned int id_filter);
 
/**
  * Decode SUBSCRIBE command.
@@ -343,7 +343,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] vclock.
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
- * @param[out] id_mask A list of ids to skip rows from when
+ * @param[out] id_filter A list of ids to skip rows from when
  *             feeding replica.
  *
  * @retval  0 Success.
@@ -353,7 +353,7 @@ int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
        struct tt_uuid *instance_uuid, struct vclock *vclock,
        uint32_t *version_id, bool *anon,
-       unsigned int *id_mask);
+       unsigned int *id_filter);
 
/**
  * Encode JOIN command.
@@ -827,10 +827,10 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
  const struct tt_uuid *replicaset_uuid,
  const struct tt_uuid *instance_uuid,
  const struct vclock *vclock, bool anon,
- unsigned int id_mask)
+ unsigned int id_filter)
{
  if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
-   vclock, anon, id_mask) != 0)
+   vclock, anon, id_filter) != 0)
  diag_raise();
}
 
@@ -840,11 +840,11 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
  struct tt_uuid *replicaset_uuid,
  struct tt_uuid *instance_uuid, struct vclock *vclock,
  uint32_t *replica_version_id, bool *anon,
- unsigned int *id_mask)
+ unsigned int *id_filter)
{
  if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
    vclock, replica_version_id, anon,
-   id_mask) != 0)
+   id_filter) != 0)
  diag_raise();
}
>
>
>--
>Konstantin Osipov, Moscow, Russia 
 
 
--
Serge Petrenko
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20200226/0e5d5c85/attachment.html>


More information about the Tarantool-patches mailing list