From: "Serge Petrenko" <sergepetrenko@tarantool.org>
To: "Konstantin Osipov" <kostja.osipov@gmail.com>
Cc: kirichenkoga@gmail.com, tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
Date: Wed, 26 Feb 2020 14:16:20 +0300
Message-ID: <1582715780.115016077@f179.i.mail.ru>
In-Reply-To: <20200226101819.GA15433@atlas>

>Wednesday, 26 February 2020, 13:18 +03:00 from Konstantin Osipov <kostja.osipov@gmail.com>:
>
>
>Generally I think you're on track with this series.
>
>I believe it will also be possible to use this filter to not send
>records twice in a full mesh. E.g. you could add a follow-up patch
>which excludes from the subscription peers to which we have a
>direct connection, this will easily solve the full-mesh
>replication traffic explosion issue, the replica will only get the
>changes directly from the peers.

Okay, will do.

>
>Please solicit a detailed review.
>
>* sergepetrenko <sergepetrenko@tarantool.org> [20/02/26 13:00]:
>
>> +	unsigned int map_size = __builtin_popcount(id_mask);
>> +	if (map_size) {
>> +		size += mp_sizeof_array(map_size) + map_size *
>> +			mp_sizeof_uint(VCLOCK_MAX);
>> +	}
>Better: mask_size.
>
>Besides, this is perhaps not id_mask, but id_filter. The
>difference is that mask is something inclusive, i.e. if you're in
>the mask, you get the records, while filter is
>something exclusive, if you're not in the mask, you don't get the
>records.

Okay,
s/id_mask/id_filter
s/map_size/mask_size

diff --git a/src/box/box.cc b/src/box/box.cc
index 232d7861b..94267c74e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,9 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	uint32_t replica_version_id;
 	vclock_create(&replica_clock);
 	bool anon;
-	unsigned int id_mask;
+	unsigned int id_filter;
 	xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
-				 &replica_version_id, &anon, &id_mask);
+				 &replica_version_id, &anon, &id_filter);
 
 	/* Forbid connection to itself */
 	if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
@@ -1872,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	 * indefinitely).
 	 */
 	relay_subscribe(replica, io->fd, header->sync, &replica_clock,
-			replica_version_id, id_mask);
+			replica_version_id, id_filter);
 }
 
 void
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 814ada303..f9d413a31 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -125,7 +125,7 @@ enum iproto_key {
 	IPROTO_STMT_ID = 0x43,
 	/* Leave a gap between SQL keys and additional request keys */
 	IPROTO_REPLICA_ANON = 0x50,
-	IPROTO_ID_MASK = 0x51,
+	IPROTO_ID_FILTER = 0x51,
 	IPROTO_KEY_MAX
 };
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 87930a006..dc89b90e2 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,7 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
-	unsigned int id_mask;
+	unsigned int id_filter;
 	/**
 	 * Local vclock at the moment of subscribe, used to check
 	 * dataset on the other side and send missing data rows if any.
@@ -678,7 +678,7 @@ relay_subscribe_f(va_list ap)
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_clock, uint32_t replica_version_id,
-		unsigned int replica_id_mask)
+		unsigned int replica_id_filter)
 {
 	assert(replica->anon || replica->id != REPLICA_ID_NIL);
 	struct relay *relay = replica->relay;
@@ -707,7 +707,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-	relay->id_mask = replica_id_mask;
+	relay->id_filter = replica_id_filter;
 
 	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
@@ -768,7 +768,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->bodycnt = 0;
 	}
 	/* Check if the rows from the instance are filtered. */
-	if (1 << packet->replica_id & relay->id_mask)
+	if (1 << packet->replica_id & relay->id_filter)
 		return;
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 255d29d90..6e7eebab1 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
-		unsigned int replica_id_mask);
+		unsigned int replica_id_filter);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ee78ed24d..10edbf6a8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,13 +1195,13 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_mask)
+		      unsigned int id_filter)
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
-	unsigned int map_size = __builtin_popcount(id_mask);
-	if (map_size) {
-		size += mp_sizeof_array(map_size) + map_size *
+	unsigned int filter_size = __builtin_popcount(id_filter);
+	if (filter_size) {
+		size += mp_sizeof_array(filter_size) + filter_size *
 			mp_sizeof_uint(VCLOCK_MAX);
 	}
 	char *buf = (char *) region_alloc(&fiber()->gc, size);
@@ -1210,7 +1210,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 		return -1;
 	}
 	char *data = buf;
-	data = mp_encode_map(data, map_size ? 6 : 5);
+	data = mp_encode_map(data, filter_size ? 6 : 5);
 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
 	data = xrow_encode_uuid(data, replicaset_uuid);
 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,11 +1221,11 @@ xrow_encode_subscribe(struct xrow_header *row,
 	data = mp_encode_uint(data, tarantool_version_id());
 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
 	data = mp_encode_bool(data, anon);
-	if (map_size) {
-		data = mp_encode_uint(data, IPROTO_ID_MASK);
-		data = mp_encode_array(data, map_size);
+	if (filter_size) {
+		data = mp_encode_uint(data, IPROTO_ID_FILTER);
+		data = mp_encode_array(data, filter_size);
 		struct bit_iterator it;
-		bit_iterator_init(&it, &id_mask, sizeof(id_mask),
+		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
 				  true);
 		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
 		     id = bit_iterator_next(&it)) {
@@ -1244,7 +1244,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_mask)
+		      unsigned int *id_filter)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1262,8 +1262,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 
 	if (anon)
 		*anon = false;
-	if (id_mask)
-		*id_mask = 0;
+	if (id_filter)
+		*id_filter = 0;
 	d = data;
 	uint32_t map_size = mp_decode_map(&d);
 	for (uint32_t i = 0; i < map_size; i++) {
@@ -1321,19 +1321,19 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 			}
 			*anon = mp_decode_bool(&d);
 			break;
-		case IPROTO_ID_MASK:
-			if (id_mask == NULL)
+		case IPROTO_ID_FILTER:
+			if (id_filter == NULL)
 				goto skip;
 			if (mp_typeof(*d) != MP_ARRAY) {
 decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
-						   "invalid id_mask");
+						   "invalid id_filter");
 				return -1;
 			}
 			uint32_t len = mp_decode_array(&d);
 			for(uint32_t i = 0; i < len; ++i) {
 				if (mp_typeof(*d) != MP_UINT)
 					goto decode_err;
-				*id_mask |= 1 << mp_decode_uint(&d);
+				*id_filter |= 1 << mp_decode_uint(&d);
 			}
 			break;
 		default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index ab5fc944a..8e5716b30 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -322,7 +322,7 @@ xrow_encode_register(struct xrow_header *row,
  * @param instance_uuid Instance uuid.
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
- * @param id_mask A List of replica ids to skip rows from
+ * @param id_filter A List of replica ids to skip rows from
  *                  when feeding a replica.
  *
  * @retval 0 Success.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_mask);
+		      unsigned int id_filter);
 
 /**
  * Decode SUBSCRIBE command.
@@ -343,7 +343,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] vclock.
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
- * @param[out] id_mask A list of ids to skip rows from when
+ * @param[out] id_filter A list of ids to skip rows from when
  *                       feeding replica.
  *
  * @retval 0 Success.
@@ -353,7 +353,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_mask);
+		      unsigned int *id_filter);
 
 /**
  * Encode JOIN command.
@@ -827,10 +827,10 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
 			 const struct tt_uuid *replicaset_uuid,
 			 const struct tt_uuid *instance_uuid,
 			 const struct vclock *vclock, bool anon,
-			 unsigned int id_mask)
+			 unsigned int id_filter)
 {
 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
-				  vclock, anon, id_mask) != 0)
+				  vclock, anon, id_filter) != 0)
 		diag_raise();
 }
 
@@ -840,11 +840,11 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
 			 struct tt_uuid *replicaset_uuid,
 			 struct tt_uuid *instance_uuid, struct vclock *vclock,
 			 uint32_t *replica_version_id, bool *anon,
-			 unsigned int *id_mask)
+			 unsigned int *id_filter)
 {
 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, replica_version_id, anon,
-				  id_mask) != 0)
+				  id_filter) != 0)
 		diag_raise();
 }
 
>
>
>--
>Konstantin Osipov, Moscow, Russia

--
Serge Petrenko
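
For readers following the thread, here is a minimal standalone sketch of the exclusive "filter" semantics discussed above: a set bit means rows originating from that replica id are skipped. It is not code from the patch. Every `sketch_*` name is hypothetical, `SKETCH_ID_MAX = 32` is only an assumed stand-in for `VCLOCK_MAX`, and the range check before shifting is an addition of this sketch, not something the diff does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for VCLOCK_MAX; the value 32 is an assumption. */
enum { SKETCH_ID_MAX = 32 };

/*
 * Build a filter from a list of replica ids, the way a decoder could
 * turn the array from the SUBSCRIBE body into a bitmask.
 * Returns 0 on success, -1 if an id does not fit into the filter
 * (in which case *filter is left untouched).
 */
int
sketch_filter_from_ids(const uint32_t *ids, size_t count, uint32_t *filter)
{
	uint32_t result = 0;
	for (size_t i = 0; i < count; i++) {
		/* Shifting by >= 32 is undefined, so reject such ids. */
		if (ids[i] >= SKETCH_ID_MAX)
			return -1;
		result |= UINT32_C(1) << ids[i];
	}
	*filter = result;
	return 0;
}

/* True if rows originating from replica_id must NOT be sent. */
bool
sketch_filter_skips(uint32_t filter, uint32_t replica_id)
{
	assert(replica_id < SKETCH_ID_MAX);
	return (filter & (UINT32_C(1) << replica_id)) != 0;
}

/*
 * Expand the filter back into an ascending list of ids, as an encoder
 * would before writing the array. ids must have room for
 * SKETCH_ID_MAX entries. Returns the number of ids written.
 */
size_t
sketch_filter_to_ids(uint32_t filter, uint32_t *ids)
{
	size_t count = 0;
	for (uint32_t id = 0; id < SKETCH_ID_MAX; id++) {
		if (filter & (UINT32_C(1) << id))
			ids[count++] = id;
	}
	return count;
}
```

With ids {1, 3} the filter becomes 0x0a, so rows from replica 3 are skipped while rows from replica 2 still go through. The explicit parentheses around the shift and the unsigned one are a stylistic choice of the sketch; the expression in relay_send_row relies on operator precedence instead.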