<HTML><BODY><div><br><br> <blockquote style="border-left:1px solid #0857A6; margin:10px; padding:0 0 0 10px;">Среда, 26 февраля 2020, 13:18 +03:00 от Konstantin Osipov <kostja.osipov@gmail.com>:<br> <div id=""><div class="js-helper js-readmsg-msg"><style type="text/css"></style><div><div id="style_15827123031191740028_BODY"><br>Generally I think you're on track with these series.<br><br>I believe it will also be possible to use this filter to not send<br>records twice in a full mesh. E.g. you could add a follow up patch<br>which excludes from the subscription peers to which we have a<br>direct connection, this will easily solve the full-mesh<br>replication traffic explosion issue, the replica will only get the<br>changes directly from the peers.</div></div></div></div></blockquote></div><div> </div><div>Okay, will do.</div><div> </div><div><blockquote style="border-left:1px solid #0857A6; margin:10px; padding:0 0 0 10px;"><div><div class="js-helper js-readmsg-msg"><div><div><br>Please solicit a detailed review.<br><br>* sergepetrenko <<a href="/compose?To=sergepetrenko@tarantool.org">sergepetrenko@tarantool.org</a>> [20/02/26 13:00]:<br> <div class="mail-quote-collapse">> + unsigned int map_size = __builtin_popcount(id_mask);<br>> + if (map_size) {<br>> + size += mp_sizeof_array(map_size) + map_size *<br>> + mp_sizeof_uint(VCLOCK_MAX);<br>> + }</div><br>Better:mask_size.<br><br>Besides, this is perhaps not id_mask, but id_filter. The<br>difference is that mask is something inclusive, i.e. 
if you're in<br>the mask, you get the records, while filter is<br>something exclusive, if you're not in the mask, you don't get the<br>records.</div></div></div></div></blockquote></div><div> </div><div>Okay,<br><br>s/id_mask/id_filter<br>s/map_size/filter_size<br> <p>diff --git a/src/box/box.cc b/src/box/box.cc</p><p>index 232d7861b..94267c74e 100644</p><p>--- a/src/box/box.cc</p><p>+++ b/src/box/box.cc</p><p>@@ -1787,9 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)</p><p> uint32_t replica_version_id;</p><p> vclock_create(&replica_clock);</p><p> bool anon;</p><p>- unsigned int id_mask;</p><p>+ unsigned int id_filter;</p><p> xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,</p><p>- &replica_version_id, &anon, &id_mask);</p><p>+ &replica_version_id, &anon, &id_filter);</p><p> </p><p> /* Forbid connection to itself */</p><p> if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))</p><p>@@ -1872,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)</p><p> * indefinitely).</p><p> */</p><p> relay_subscribe(replica, io->fd, header->sync, &replica_clock,</p><p>- replica_version_id, id_mask);</p><p>+ replica_version_id, id_filter);</p><p>}</p><p> </p><p>void</p><p>diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h</p><p>index 814ada303..f9d413a31 100644</p><p>--- a/src/box/iproto_constants.h</p><p>+++ b/src/box/iproto_constants.h</p><p>@@ -125,7 +125,7 @@ enum iproto_key {</p><p> IPROTO_STMT_ID = 0x43,</p><p> /* Leave a gap between SQL keys and additional request keys */</p><p> IPROTO_REPLICA_ANON = 0x50,</p><p>- IPROTO_ID_MASK = 0x51,</p><p>+ IPROTO_ID_FILTER = 0x51,</p><p> IPROTO_KEY_MAX</p><p>};</p><p> </p><p>diff --git a/src/box/relay.cc b/src/box/relay.cc</p><p>index 87930a006..dc89b90e2 100644</p><p>--- a/src/box/relay.cc</p><p>+++ b/src/box/relay.cc</p><p>@@ -109,7 +109,7 @@ struct relay {</p><p> struct vclock recv_vclock;</p><p> /** Replicatoin slave version. 
*/</p><p> uint32_t version_id;</p><p>- unsigned int id_mask;</p><p>+ unsigned int id_filter;</p><p> /**</p><p> * Local vclock at the moment of subscribe, used to check</p><p> * dataset on the other side and send missing data rows if any.</p><p>@@ -678,7 +678,7 @@ relay_subscribe_f(va_list ap)</p><p>void</p><p>relay_subscribe(struct replica *replica, int fd, uint64_t sync,</p><p> struct vclock *replica_clock, uint32_t replica_version_id,</p><p>- unsigned int replica_id_mask)</p><p>+ unsigned int replica_id_filter)</p><p>{</p><p> assert(replica->anon || replica->id != REPLICA_ID_NIL);</p><p> struct relay *relay = replica->relay;</p><p>@@ -707,7 +707,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,</p><p> vclock_copy(&relay->tx.vclock, replica_clock);</p><p> relay->version_id = replica_version_id;</p><p> </p><p>- relay->id_mask = replica_id_mask;</p><p>+ relay->id_filter = replica_id_filter;</p><p> </p><p> int rc = cord_costart(&relay->cord, "subscribe",</p><p> relay_subscribe_f, relay);</p><p>@@ -768,7 +768,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)</p><p> packet->bodycnt = 0;</p><p> }</p><p> /* Check if the rows from the instance are filtered. 
*/</p><p>- if (1 << packet->replica_id & relay->id_mask)</p><p>+ if (1 << packet->replica_id & relay->id_filter)</p><p> return;</p><p> /*</p><p> * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE</p><p>diff --git a/src/box/relay.h b/src/box/relay.h</p><p>index 255d29d90..6e7eebab1 100644</p><p>--- a/src/box/relay.h</p><p>+++ b/src/box/relay.h</p><p>@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,</p><p>void</p><p>relay_subscribe(struct replica *replica, int fd, uint64_t sync,</p><p> struct vclock *replica_vclock, uint32_t replica_version_id,</p><p>- unsigned int replica_id_mask);</p><p>+ unsigned int replica_id_filter);</p><p> </p><p>#endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */</p><p>diff --git a/src/box/xrow.c b/src/box/xrow.c</p><p>index ee78ed24d..10edbf6a8 100644</p><p>--- a/src/box/xrow.c</p><p>+++ b/src/box/xrow.c</p><p>@@ -1195,13 +1195,13 @@ xrow_encode_subscribe(struct xrow_header *row,</p><p> const struct tt_uuid *replicaset_uuid,</p><p> const struct tt_uuid *instance_uuid,</p><p> const struct vclock *vclock, bool anon,</p><p>- unsigned int id_mask)</p><p>+ unsigned int id_filter)</p><p>{</p><p> memset(row, 0, sizeof(*row));</p><p> size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);</p><p>- unsigned int map_size = __builtin_popcount(id_mask);</p><p>- if (map_size) {</p><p>- size += mp_sizeof_array(map_size) + map_size *</p><p>+ unsigned int filter_size = __builtin_popcount(id_filter);</p><p>+ if (filter_size) {</p><p>+ size += mp_sizeof_array(filter_size) + filter_size *</p><p> mp_sizeof_uint(VCLOCK_MAX);</p><p> }</p><p> char *buf = (char *) region_alloc(&fiber()->gc, size);</p><p>@@ -1210,7 +1210,7 @@ xrow_encode_subscribe(struct xrow_header *row,</p><p> return -1;</p><p> }</p><p> char *data = buf;</p><p>- data = mp_encode_map(data, map_size ? 6 : 5);</p><p>+ data = mp_encode_map(data, filter_size ? 
6 : 5);</p><p> data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);</p><p> data = xrow_encode_uuid(data, replicaset_uuid);</p><p> data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);</p><p>@@ -1221,11 +1221,11 @@ xrow_encode_subscribe(struct xrow_header *row,</p><p> data = mp_encode_uint(data, tarantool_version_id());</p><p> data = mp_encode_uint(data, IPROTO_REPLICA_ANON);</p><p> data = mp_encode_bool(data, anon);</p><p>- if (map_size) {</p><p>- data = mp_encode_uint(data, IPROTO_ID_MASK);</p><p>- data = mp_encode_array(data, map_size);</p><p>+ if (filter_size) {</p><p>+ data = mp_encode_uint(data, IPROTO_ID_FILTER);</p><p>+ data = mp_encode_array(data, filter_size);</p><p> struct bit_iterator it;</p><p>- bit_iterator_init(&it, &id_mask, sizeof(id_mask),</p><p>+ bit_iterator_init(&it, &id_filter, sizeof(id_filter),</p><p> true);</p><p> for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;</p><p> id = bit_iterator_next(&it)) {</p><p>@@ -1244,7 +1244,7 @@ int</p><p>xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,</p><p> struct tt_uuid *instance_uuid, struct vclock *vclock,</p><p> uint32_t *version_id, bool *anon,</p><p>- unsigned int *id_mask)</p><p>+ unsigned int *id_filter)</p><p>{</p><p> if (row->bodycnt == 0) {</p><p> diag_set(ClientError, ER_INVALID_MSGPACK, "request body");</p><p>@@ -1262,8 +1262,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,</p><p> </p><p> if (anon)</p><p> *anon = false;</p><p>- if (id_mask)</p><p>- *id_mask = 0;</p><p>+ if (id_filter)</p><p>+ *id_filter = 0;</p><p> d = data;</p><p> uint32_t map_size = mp_decode_map(&d);</p><p> for (uint32_t i = 0; i < map_size; i++) {</p><p>@@ -1321,19 +1321,19 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,</p><p> }</p><p> *anon = mp_decode_bool(&d);</p><p> break;</p><p>- case IPROTO_ID_MASK:</p><p>- if (id_mask == NULL)</p><p>+ case IPROTO_ID_FILTER:</p><p>+ if (id_filter == NULL)</p><p> goto skip;</p><p> 
if (mp_typeof(*d) != MP_ARRAY) {</p><p>decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,</p><p>- "invalid id_mask");</p><p>+ "invalid id_filter");</p><p> return -1;</p><p> }</p><p> uint32_t len = mp_decode_array(&d);</p><p> for(uint32_t i = 0; i < len; ++i) {</p><p> if (mp_typeof(*d) != MP_UINT)</p><p> goto decode_err;</p><p>- *id_mask |= 1 << mp_decode_uint(&d);</p><p>+ *id_filter |= 1 << mp_decode_uint(&d);</p><p> }</p><p> break;</p><p> default: skip:</p><p>diff --git a/src/box/xrow.h b/src/box/xrow.h</p><p>index ab5fc944a..8e5716b30 100644</p><p>--- a/src/box/xrow.h</p><p>+++ b/src/box/xrow.h</p><p>@@ -322,7 +322,7 @@ xrow_encode_register(struct xrow_header *row,</p><p> * @param instance_uuid Instance uuid.</p><p> * @param vclock Replication clock.</p><p> * @param anon Whether it is an anonymous subscribe request or not.</p><p>- * @param id_mask A List of replica ids to skip rows from</p><p>+ * @param id_filter A List of replica ids to skip rows from</p><p> * when feeding a replica.</p><p> *</p><p> * @retval 0 Success.</p><p>@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,</p><p> const struct tt_uuid *replicaset_uuid,</p><p> const struct tt_uuid *instance_uuid,</p><p> const struct vclock *vclock, bool anon,</p><p>- unsigned int id_mask);</p><p>+ unsigned int id_filter);</p><p> </p><p>/**</p><p> * Decode SUBSCRIBE command.</p><p>@@ -343,7 +343,7 @@ xrow_encode_subscribe(struct xrow_header *row,</p><p> * @param[out] vclock.</p><p> * @param[out] version_id.</p><p> * @param[out] anon Whether it is an anonymous subscribe.</p><p>- * @param[out] id_mask A list of ids to skip rows from when</p><p>+ * @param[out] id_filter A list of ids to skip rows from when</p><p> * feeding replica.</p><p> *</p><p> * @retval 0 Success.</p><p>@@ -353,7 +353,7 @@ int</p><p>xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,</p><p> struct tt_uuid *instance_uuid, struct vclock *vclock,</p><p> uint32_t *version_id, bool 
*anon,</p><p>- unsigned int *id_mask);</p><p>+ unsigned int *id_filter);</p><p> </p><p>/**</p><p> * Encode JOIN command.</p><p>@@ -827,10 +827,10 @@ xrow_encode_subscribe_xc(struct xrow_header *row,</p><p> const struct tt_uuid *replicaset_uuid,</p><p> const struct tt_uuid *instance_uuid,</p><p> const struct vclock *vclock, bool anon,</p><p>- unsigned int id_mask)</p><p>+ unsigned int id_filter)</p><p>{</p><p> if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,</p><p>- vclock, anon, id_mask) != 0)</p><p>+ vclock, anon, id_filter) != 0)</p><p> diag_raise();</p><p>}</p><p> </p><p>@@ -840,11 +840,11 @@ xrow_decode_subscribe_xc(struct xrow_header *row,</p><p> struct tt_uuid *replicaset_uuid,</p><p> struct tt_uuid *instance_uuid, struct vclock *vclock,</p><p> uint32_t *replica_version_id, bool *anon,</p><p>- unsigned int *id_mask)</p><p>+ unsigned int *id_filter)</p><p>{</p><p> if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,</p><p> vclock, replica_version_id, anon,</p><p>- id_mask) != 0)</p><p>+ id_filter) != 0)</p><p> diag_raise();</p><p>}</p></div><div><blockquote style="border-left:1px solid #0857A6; margin:10px; padding:0 0 0 10px;"><div><div class="js-helper js-readmsg-msg"><div><div><br><br>--<br>Konstantin Osipov, Moscow, Russia</div></div></div></div></blockquote> <div> </div><div data-signature-widget="container"><div data-signature-widget="content"><div>--<br>Serge Petrenko</div></div></div><div> </div></div></BODY></HTML>