On 27 Feb 2020, at 02:54, Vladislav Shpilevoy <v.shpilevoy@tarantool.org> wrote:

Thanks for the patch!

See 8 comments below.

Hi! Thanks for the review!
Please find my comments inline and the diff below.


   replication: implement an instance id filter for relay

   Add a filter for relay to skip rows coming from unwanted instances.
   A list of instance ids whose rows the replica doesn't want to fetch is encoded
   together with the SUBSCRIBE request after a freshly introduced key IPROTO_ID_FILTER.

   Filtering rows is needed to prevent an instance from fetching its own
   rows from a remote master, which is useful on initial configuration and
   harmful on resubscribe.

   Prerequisite #4739, #3294

   @TarantoolBot document

   Title: document new binary protocol key and subscribe request changes

   Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
   This is an optional key used in the SUBSCRIBE request. It is followed by an
   array of ids of instances whose rows won't be relayed to the replica.

   SUBSCRIBE request is supplemented with an optional field of the
   following structure:
   ```
   +====================+
   |      ID_FILTER     |
   |   0x51 : ID LIST   |
   | MP_INT : MP_ARRAY  |
   |                    |
   +====================+
   ```
   The field is encoded only when the id list is not empty.
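
For illustration, here is a minimal self-contained sketch of how this optional
field could be laid out on the wire. It hand-rolls the few MessagePack
encodings involved instead of using msgpuck, and the helper name
`encode_id_filter` and the constant `ID_FILTER_MAX_IDS` are hypothetical, not
part of the patch. It assumes ids are below 32, so each id fits into a
single-byte positive fixint.

```c
#include <stddef.h>
#include <stdint.h>

/* 0x51 is the key value from the commit message; the id limit is assumed. */
enum { IPROTO_ID_FILTER = 0x51, ID_FILTER_MAX_IDS = 32 };

/*
 * Hypothetical helper: encode the ID_FILTER key followed by an array of
 * the ids whose bits are set in id_filter. Returns the number of bytes
 * written, or 0 when the filter is empty (the field is then omitted).
 * buf must have room for at least 36 bytes.
 */
static size_t
encode_id_filter(uint8_t *buf, uint32_t id_filter)
{
	/* Count set bits by clearing the lowest one each iteration. */
	uint32_t count = 0;
	for (uint32_t m = id_filter; m != 0; m &= m - 1)
		count++;
	if (count == 0)
		return 0;
	uint8_t *p = buf;
	*p++ = IPROTO_ID_FILTER;		/* positive fixint key */
	if (count <= 15) {
		*p++ = (uint8_t)(0x90 | count);	/* fixarray header */
	} else {
		*p++ = 0xdc;			/* array 16 header */
		*p++ = (uint8_t)(count >> 8);	/* big-endian length */
		*p++ = (uint8_t)(count & 0xff);
	}
	for (uint32_t id = 0; id < ID_FILTER_MAX_IDS; id++) {
		if ((id_filter & (UINT32_C(1) << id)) != 0)
			*p++ = (uint8_t)id;	/* positive fixint value */
	}
	return (size_t)(p - buf);
}
```

With ids 1 and 3 set, this sketch produces the four bytes 0x51 0x92 0x01 0x03.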

diff --git a/src/box/relay.cc b/src/box/relay.cc
index b89632273..dc89b90e2 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,6 +109,7 @@ struct relay {
struct vclock recv_vclock;
/** Replicatoin slave version. */
uint32_t version_id;
+ unsigned int id_filter;

1. Please add a comment here explaining what it is
and why it is needed.

Ok.


/**
* Local vclock at the moment of subscribe, used to check
* dataset on the other side and send missing data rows if any.
@@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
packet->group_id = GROUP_DEFAULT;
packet->bodycnt = 0;
}
+ /* Check if the rows from the instance are filtered. */
+ if (1 << packet->replica_id & relay->id_filter)

2. Please use explicit != 0 here and below when checking the 'filter_size'
variable.

No problem.


+ return;
/*
* We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
* request. If this is FINAL JOIN (i.e. relay->replica is NULL),
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 968c3a202..10edbf6a8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1194,17 +1194,23 @@ int
xrow_encode_subscribe(struct xrow_header *row,
     const struct tt_uuid *replicaset_uuid,
     const struct tt_uuid *instance_uuid,
-      const struct vclock *vclock, bool anon)
+      const struct vclock *vclock, bool anon,
+      unsigned int id_filter)

3. Nit - it would be better to have it as uint32_t explicitly,
because the max id count is 32. Unsigned int does not have size
guarantees, formally speaking. It is at least 16 bits, no more
info.

Done, and I’ll send a followup to vclock.h later, as Kostja suggests.


{
memset(row, 0, sizeof(*row));
size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
+ unsigned int filter_size = __builtin_popcount(id_filter);
+ if (filter_size) {
+ size += mp_sizeof_array(filter_size) + filter_size *
+ mp_sizeof_uint(VCLOCK_MAX);

4. You didn't add mp_sizeof_uint(IPROTO_ID_FILTER) here, but you
didn't update XROW_BODY_LEN_MAX either. Does it fit in the current
value? Also seems like it can't be called XROW_BODY_LEN_MAX anymore.
Because clearly it is not enough to fit the filter array, if you
needed to patch the allocation here.

Let’s just double XROW_BODY_LEN_MAX then.


You could also update XROW_BODY_LEN_MAX so that it includes the biggest
possible filter. Max filter size is just 34 bytes. This is the
simplest option, I think.

+ }
char *buf = (char *) region_alloc(&fiber()->gc, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "region_alloc", "buf");
return -1;
}
char *data = buf;
- data = mp_encode_map(data, 5);
+ data = mp_encode_map(data, filter_size ? 6 : 5);
data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
data = xrow_encode_uuid(data, replicaset_uuid);
data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
data = mp_encode_uint(data, tarantool_version_id());
data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
data = mp_encode_bool(data, anon);
+ if (filter_size) {

5. I wouldn't make it optional in encoding, since this is sent
really rarely, and always encoding it could simplify the code a bit.
However, up to you.

I don’t like the idea of passing zero-length arrays when there is no filter.
Let’s just not encode this part. The filter is initially empty on the decoding
side anyway.


Also, won't it break replication from an old version? You
will send subscribe with this new key, and the old instance
should ignore it. Does it? I don't remember.


Yes, the old instance ignores all unknown keys.

If the old instance ignores it, it means that the bug
can still explode when replicating from an old version, right?
I don't know how to fix that, but if it is true, we need to
document that, at least.

Yes, the bug is still there when replicating from an old instance.
To trigger it, you have to set up master-master replication between
the old and the new instances and meet some additional conditions.
I don’t think it’s usual to set up master-master when upgrading, as
Kostja has pointed out.


+ data = mp_encode_uint(data, IPROTO_ID_FILTER);
+ data = mp_encode_array(data, filter_size);
+ struct bit_iterator it;
+ bit_iterator_init(&it, &id_filter, sizeof(id_filter),
+  true);
+ for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
+     id = bit_iterator_next(&it)) {
+ data = mp_encode_uint(data, id);
+ }
+ }
assert(data <= buf + size);
row->body[0].iov_base = buf;
row->body[0].iov_len = (data - buf);
@@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
}
*anon = mp_decode_bool(&d);
break;
+ case IPROTO_ID_FILTER:
+ if (id_filter == NULL)
+ goto skip;
+ if (mp_typeof(*d) != MP_ARRAY) {
+decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+   "invalid id_filter");

6. Let's say it in caps: ID_FILTER, just like with other decode
error messages. Also I would name this label 'id_filter_decode_err',
because other keys can't jump here.

Okay.


+ return -1;
+ }
+ uint32_t len = mp_decode_array(&d);
+ for(uint32_t i = 0; i < len; ++i) {
+ if (mp_typeof(*d) != MP_UINT)
+ goto decode_err;
+ *id_filter |= 1 << mp_decode_uint(&d);

7. If someone sends a big ID (a program pretending to be a Tarantool
instance), it would cause unsigned bit shift overflow, which is undefined
behaviour. Let's check that it is not bigger than 31.

However, this won't really help much. From what I see, this code will crash
even if I just send a truncated packet.

I’m not sure I understand what you’re talking about. This piece of code is
similar to the one we have in mp_decode_vclock. The situation didn’t get worse,
at least.


Up to you whether you want to fix the bit shift.

Fixed.
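
To make the range check concrete, here is a minimal standalone sketch of
setting and testing filter bits. The helper names `id_filter_add` and
`id_filter_test` and the constant `ID_FILTER_BITS` are hypothetical and only
assumed to mirror VCLOCK_MAX; they are not taken from the patch. The sketch
shifts an unsigned 1, so that id 31 does not shift into the sign bit of a
signed int.

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed to mirror VCLOCK_MAX: ids 0..31 fit into a 32-bit mask. */
enum { ID_FILTER_BITS = 32 };

/*
 * Hypothetical helper: mark a decoded id as filtered.
 * Returns 0 on success, -1 if the id does not fit into the mask,
 * in which case the mask is left untouched.
 */
static int
id_filter_add(uint32_t *id_filter, uint64_t id)
{
	if (id >= ID_FILTER_BITS)
		return -1;
	*id_filter |= UINT32_C(1) << id;
	return 0;
}

/* Hypothetical helper: check whether rows of replica_id are filtered. */
static bool
id_filter_test(uint32_t id_filter, uint32_t replica_id)
{
	if (replica_id >= ID_FILTER_BITS)
		return false;
	return (id_filter & (UINT32_C(1) << replica_id)) != 0;
}
```

An out-of-range id is rejected before any shift happens, so a peer sending,
say, id 32 only gets an error and cannot trigger undefined behaviour.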


+ }
+ break;
default: skip:
mp_next(&d); /* value */
}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 0973c497d..8e5716b30 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
 * @param instance_uuid Instance uuid.
 * @param vclock Replication clock.
 * @param anon Whether it is an anonymous subscribe request or not.
+ * @param id_filter A List of replica ids to skip rows from
+ *                      when feeding a replica.

8. Looks like a huge indentation. Keep it if you want. I'm pointing it out
just in case it was an accident. In the next @param description
you aligned it differently.

Thanks! I fixed both pieces.


 *
 * @retval  0 Success.
 * @retval -1 Memory error.
@@ -330,7 +332,8 @@ int
@@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
 * @param[out] vclock.
 * @param[out] version_id.
 * @param[out] anon Whether it is an anonymous subscribe.
+ * @param[out] id_filter A list of ids to skip rows from when
+ *             feeding replica.
 *
 * @retval  0 Success.
 * @retval -1 Memory or format error.


diff --git a/src/box/box.cc b/src/box/box.cc
index 94267c74e..09dd67ab4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,7 +1787,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
  uint32_t replica_version_id;
  vclock_create(&replica_clock);
  bool anon;
- unsigned int id_filter;
+ uint32_t id_filter;
  xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
   &replica_version_id, &anon, &id_filter);
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index dc89b90e2..95245a3cf 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,13 @@ struct relay {
  struct vclock recv_vclock;
  /** Replicatoin slave version. */
  uint32_t version_id;
- unsigned int id_filter;
+ /**
+  * A filter of replica ids whose rows should be ignored.
+  * Each set filter bit corresponds to a replica id whose
+  * rows shouldn't be relayed. The list of ids to ignore
+  * is passed by the replica on subscribe.
+  */
+ uint32_t id_filter;
  /**
   * Local vclock at the moment of subscribe, used to check
   * dataset on the other side and send missing data rows if any.
@@ -678,7 +684,7 @@ relay_subscribe_f(va_list ap)
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
  struct vclock *replica_clock, uint32_t replica_version_id,
- unsigned int replica_id_filter)
+ uint32_t replica_id_filter)
 {
  assert(replica->anon || replica->id != REPLICA_ID_NIL);
  struct relay *relay = replica->relay;
@@ -768,7 +774,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
  packet->bodycnt = 0;
  }
  /* Check if the rows from the instance are filtered. */
- if (1 << packet->replica_id & relay->id_filter)
+ if ((1 << packet->replica_id & relay->id_filter) != 0)
  return;
  /*
   * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 6e7eebab1..0632fa912 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
  struct vclock *replica_vclock, uint32_t replica_version_id,
- unsigned int replica_id_filter);
+ uint32_t replica_id_filter);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 10edbf6a8..602049004 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,22 +1195,18 @@ xrow_encode_subscribe(struct xrow_header *row,
        const struct tt_uuid *replicaset_uuid,
        const struct tt_uuid *instance_uuid,
        const struct vclock *vclock, bool anon,
-       unsigned int id_filter)
+       uint32_t id_filter)
 {
  memset(row, 0, sizeof(*row));
  size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
- unsigned int filter_size = __builtin_popcount(id_filter);
- if (filter_size) {
- size += mp_sizeof_array(filter_size) + filter_size *
- mp_sizeof_uint(VCLOCK_MAX);
- }
  char *buf = (char *) region_alloc(&fiber()->gc, size);
  if (buf == NULL) {
  diag_set(OutOfMemory, size, "region_alloc", "buf");
  return -1;
  }
  char *data = buf;
- data = mp_encode_map(data, filter_size ? 6 : 5);
+ int filter_size = __builtin_popcount(id_filter);
+ data = mp_encode_map(data, filter_size != 0 ? 6 : 5);
  data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
  data = xrow_encode_uuid(data, replicaset_uuid);
  data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,7 +1217,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  data = mp_encode_uint(data, tarantool_version_id());
  data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
  data = mp_encode_bool(data, anon);
- if (filter_size) {
+ if (filter_size != 0) {
  data = mp_encode_uint(data, IPROTO_ID_FILTER);
  data = mp_encode_array(data, filter_size);
  struct bit_iterator it;
@@ -1244,7 +1240,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
        struct tt_uuid *instance_uuid, struct vclock *vclock,
        uint32_t *version_id, bool *anon,
-       unsigned int *id_filter)
+       uint32_t *id_filter)
 {
  if (row->bodycnt == 0) {
  diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1325,15 +1321,18 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
  if (id_filter == NULL)
  goto skip;
  if (mp_typeof(*d) != MP_ARRAY) {
-decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
-    "invalid id_filter");
+id_filter_decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+    "invalid ID_FILTER");
  return -1;
  }
  uint32_t len = mp_decode_array(&d);
- for(uint32_t i = 0; i < len; ++i) {
+ for (uint32_t i = 0; i < len; ++i) {
  if (mp_typeof(*d) != MP_UINT)
- goto decode_err;
- *id_filter |= 1 << mp_decode_uint(&d);
+ goto id_filter_decode_err;
+ uint64_t val = mp_decode_uint(&d);
+ if (val >= VCLOCK_MAX)
+ goto id_filter_decode_err;
+ *id_filter |= 1 << val;
  }
  break;
  default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 8e5716b30..2a0a9c852 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -48,7 +48,7 @@ enum {
  XROW_BODY_IOVMAX = 2,
  XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
  XROW_HEADER_LEN_MAX = 52,
- XROW_BODY_LEN_MAX = 128,
+ XROW_BODY_LEN_MAX = 256,
  IPROTO_HEADER_LEN = 28,
  /** 7 = sizeof(iproto_body_bin). */
  IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
@@ -323,7 +323,7 @@ xrow_encode_register(struct xrow_header *row,
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
  * @param id_filter A List of replica ids to skip rows from
- *                      when feeding a replica.
+ *     when feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
        const struct tt_uuid *replicaset_uuid,
        const struct tt_uuid *instance_uuid,
        const struct vclock *vclock, bool anon,
-       unsigned int id_filter);
+       uint32_t id_filter);
 
 /**
  * Decode SUBSCRIBE command.
@@ -344,7 +344,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
  * @param[out] id_filter A list of ids to skip rows from when
- *             feeding replica.
+ *  feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory or format error.
@@ -353,7 +353,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
        struct tt_uuid *instance_uuid, struct vclock *vclock,
        uint32_t *version_id, bool *anon,
-       unsigned int *id_filter);
+       uint32_t *id_filter);
 
 /**
  * Encode JOIN command.
@@ -827,7 +827,7 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
   const struct tt_uuid *replicaset_uuid,
   const struct tt_uuid *instance_uuid,
   const struct vclock *vclock, bool anon,
-  unsigned int id_filter)
+  uint32_t id_filter)
 {
  if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
    vclock, anon, id_filter) != 0)
@@ -840,7 +840,7 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
   struct tt_uuid *replicaset_uuid,
   struct tt_uuid *instance_uuid, struct vclock *vclock,
   uint32_t *replica_version_id, bool *anon,
-  unsigned int *id_filter)
+  uint32_t *id_filter)
 {
  if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
    vclock, replica_version_id, anon,

--
Serge Petrenko
sergepetrenko@tarantool.org