* [tarantool-patches] [PATCH 0/6] swim payload
From: Vladislav Shpilevoy @ 2019-04-11 22:22 UTC
To: tarantool-patches; +Cc: kostja

The patchset introduces the SWIM payload feature, with a long tail of
preliminary patches. The first 3 of them are just refactoring. The next
two patches substantially rework the filtering of SWIM tests' fake file
descriptors in order to make payload tests much simpler and faster than
they could be without the new filtering system. The last patch is the
key one, and introduces the payload.

It is worth noting, though, that I consider the patchset rather raw. So
it should not be pushed, in any part. Only reviewed.

Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-3234-swim-payload
Issue: https://github.com/tarantool/tarantool/issues/3234

Vladislav Shpilevoy (6):
  swim: factor out MP_BIN decoding from swim_decode_uuid
  swim: replace event_bin and member_bin with the passport
  swim: factor out 'update' part of swim_member_upsert()
  test: generalize SWIM fake descriptor filters
  test: introduce new SWIM packet filter by component names
  swim: introduce payload

 src/lib/swim/swim.c             | 225 ++++++++++++++++++++++++++------
 src/lib/swim/swim.h             |   8 ++
 src/lib/swim/swim_proto.c       |  94 +++++++------
 src/lib/swim/swim_proto.h       |  87 ++++++------
 test/unit/swim.c                | 216 +++++++++++++++++++++++++++---
 test/unit/swim.result           |  32 ++++-
 test/unit/swim_test_transport.c | 133 +++++++++++++++----
 test/unit/swim_test_transport.h |  41 +++++-
 test/unit/swim_test_utils.c     | 184 +++++++++++++++++++++++++-
 test/unit/swim_test_utils.h     |  45 +++++++
 10 files changed, 897 insertions(+), 168 deletions(-)

-- 
2.17.2 (Apple Git-113)
* [tarantool-patches] [PATCH 1/6] swim: factor out MP_BIN decoding from swim_decode_uuid
From: Vladislav Shpilevoy @ 2019-04-11 22:22 UTC
To: tarantool-patches; +Cc: kostja

The new function is swim_decode_bin(), and is going to be used
to safely decode payloads - arbitrary binary data disseminated
alongside with all the other SWIM member attributes.

Part of #3234
---
 src/lib/swim/swim_proto.c | 27 +++++++++++++++++++++------
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index fa02b61c4..700eff431 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -120,9 +120,9 @@ swim_decode_port(struct sockaddr_in *address, const char **pos, const char *end,
 	return 0;
 }
 
-int
-swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
-		 const char *prefix, const char *param_name)
+static inline int
+swim_decode_bin(const char **bin, uint32_t *size, const char **pos,
+		const char *end, const char *prefix, const char *param_name)
 {
 	if (mp_typeof(**pos) != MP_BIN || *pos == end ||
 	    mp_check_binl(*pos, end) > 0) {
@@ -130,12 +130,27 @@ swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
 			 param_name);
 		return -1;
 	}
-	if (mp_decode_binl(pos) != UUID_LEN || *pos + UUID_LEN > end) {
+	*bin = mp_decode_bin(pos, size);
+	if (*pos > end) {
+		diag_set(SwimError, "%s %s is invalid", prefix, param_name);
+		return -1;
+	}
+	return 0;
+}
+
+int
+swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
+		 const char *prefix, const char *param_name)
+{
+	uint32_t size;
+	const char *bin;
+	if (swim_decode_bin(&bin, &size, pos, end, prefix, param_name) != 0)
+		return -1;
+	if (size != UUID_LEN) {
 		diag_set(SwimError, "%s %s is invalid", prefix, param_name);
 		return -1;
 	}
-	memcpy(uuid, *pos, UUID_LEN);
-	*pos += UUID_LEN;
+	memcpy(uuid, bin, UUID_LEN);
 	return 0;
 }
 
-- 
2.17.2 (Apple Git-113)
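The idea behind a bounds-checked MP_BIN decoder, as factored out in the patch above, can be sketched without the msgpuck library. The helper below is hypothetical (sketch_decode_bin() is not part of Tarantool or msgpuck) and only assumes the MessagePack wire format for bin8/bin16/bin32: a type byte 0xc4, 0xc5 or 0xc6 followed by a 1, 2 or 4 byte big-endian length and then the data. Unlike the patch, it reports errors via the return code only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical stand-alone helper, not the msgpuck API: decode a
 * MessagePack bin8/bin16/bin32 value located at *pos without
 * reading past 'end'. Returns 0 and moves *pos behind the value
 * on success. Returns -1 on a wrong type or truncated input, and
 * leaves *pos untouched in that case.
 */
static int
sketch_decode_bin(const char **bin, uint32_t *size, const char **pos,
		  const char *end)
{
	const unsigned char *p = (const unsigned char *) *pos;
	const unsigned char *e = (const unsigned char *) end;
	/* Check for emptiness before looking at the type byte. */
	if (p >= e)
		return -1;
	size_t len_bytes;
	switch (*p) {
	case 0xc4: len_bytes = 1; break; /* bin8 */
	case 0xc5: len_bytes = 2; break; /* bin16 */
	case 0xc6: len_bytes = 4; break; /* bin32 */
	default: return -1;
	}
	++p;
	if ((size_t) (e - p) < len_bytes)
		return -1;
	/* The length is stored in big-endian byte order. */
	uint32_t len = 0;
	for (size_t i = 0; i < len_bytes; ++i)
		len = (len << 8) | *p++;
	/* The data itself must fit into the buffer too. */
	if ((size_t) (e - p) < len)
		return -1;
	*bin = (const char *) p;
	*size = len;
	*pos = (const char *) (p + len);
	return 0;
}
```

A caller that expects a fixed-size value, like a UUID, would call the helper first and compare the returned size with the expected length afterwards, which is the split the patch makes between swim_decode_bin() and swim_decode_uuid().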
* [tarantool-patches] Re: [PATCH 1/6] swim: factor out MP_BIN decoding from swim_decode_uuid
From: Konstantin Osipov @ 2019-04-11 23:09 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
> The new function is swim_decode_bin(), and is going to be used
> to safely decode payloads - arbitrary binary data disseminated
> alongside with all the other SWIM member attributes.

ok to push.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH 1/6] swim: factor out MP_BIN decoding from swim_decode_uuid
From: Vladislav Shpilevoy @ 2019-04-12 19:23 UTC
To: tarantool-patches, Konstantin Osipov

Pushed to the master.

On 12/04/2019 02:09, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
>> The new function is swim_decode_bin(), and is going to be used
>> to safely decode payloads - arbitrary binary data disseminated
>> alongside with all the other SWIM member attributes.
>
> ok to push.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
* [tarantool-patches] [PATCH 2/6] swim: replace event_bin and member_bin with the passport
From: Vladislav Shpilevoy @ 2019-04-11 22:22 UTC
To: tarantool-patches; +Cc: kostja

Event_bin and member_bin binary packet structures were designed
separately for different purposes. Initially the event_bin was
thought having the same fields as passport + optional old UUID +
optional payload. On the other hand, member_bin was supposed to
store the passport + mandatory payload.

But old UUID was cut off in favour of another way of UUID update.
And payload appeared to be optional in both anti-entropy and
dissemination. It means, that member_bin and event_bin are not
needed anymore as separate structures. This commit replaces them
with the passport completely.

Part of #3234
---
 src/lib/swim/swim.c       | 60 +++++++++++++++++++--------------------
 src/lib/swim/swim_proto.c | 38 ++-----------------------
 src/lib/swim/swim_proto.h | 56 +++++++++---------------------------
 3 files changed, 46 insertions(+), 108 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 4582e3205..0e7f51adf 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -732,6 +732,27 @@ swim_new_round(struct swim *swim)
 	return 0;
 }
 
+/**
+ * Encode one member into @a packet using @a passport structure.
+ * @retval 0 Success, encoded.
+ * @retval -1 Not enough memory in the packet.
+ */
+static int
+swim_encode_member(struct swim_packet *packet, struct swim_member *m,
+		   struct swim_passport_bin *passport)
+{
+	/* The headers should be initialized. */
+	assert(passport->k_status == SWIM_MEMBER_STATUS);
+	int size = sizeof(*passport);
+	char *pos = swim_packet_alloc(packet, size);
+	if (pos == NULL)
+		return -1;
+	swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
+			       m->incarnation);
+	memcpy(pos, passport, sizeof(*passport));
+	return 0;
+}
+
 /**
  * Encode anti-entropy header and random members data as many as
  * possible to the end of the packet.
@@ -741,28 +762,21 @@ static int
 swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
 {
 	struct swim_anti_entropy_header_bin ae_header_bin;
-	struct swim_member_bin member_bin;
-	int size = sizeof(ae_header_bin);
-	char *header = swim_packet_reserve(packet, size);
+	struct swim_passport_bin passport_bin;
+	char *header = swim_packet_alloc(packet, sizeof(ae_header_bin));
 	if (header == NULL)
 		return 0;
-	char *pos = header;
-	swim_member_bin_create(&member_bin);
+	swim_passport_bin_create(&passport_bin);
 	struct mh_swim_table_t *t = swim->members;
 	int i = 0, member_count = mh_size(t);
 	int rnd = swim_scaled_rand(0, member_count - 1);
 	for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
 	     i < member_count; ++i) {
 		struct swim_member *m = *mh_swim_table_node(t, rc);
-		int new_size = size + sizeof(member_bin);
-		if (swim_packet_reserve(packet, new_size) == NULL)
+		if (swim_encode_member(packet, m, &passport_bin) != 0)
 			break;
-		swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
-				     m->status, m->incarnation);
-		memcpy(pos + size, &member_bin, sizeof(member_bin));
-		size = new_size;
 		/*
-		 * First random member could be choosen too close
+		 * First random member could be chosen too close
 		 * to the hash end. Here the cycle is wrapped, if
 		 * a packet still has free memory, but the
 		 * iterator has already reached the hash end.
@@ -771,9 +785,6 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
 		if (rc == end)
 			rc = mh_first(t);
 	}
-	if (i == 0)
-		return 0;
-	swim_packet_advance(packet, size);
 	swim_anti_entropy_header_bin_create(&ae_header_bin, i);
 	memcpy(header, &ae_header_bin, sizeof(ae_header_bin));
 	return 1;
@@ -822,32 +833,21 @@ static int
 swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
 {
 	struct swim_diss_header_bin diss_header_bin;
-	int size = sizeof(diss_header_bin);
-	char *header = swim_packet_reserve(packet, size);
+	struct swim_passport_bin passport_bin;
+	char *header = swim_packet_alloc(packet, sizeof(diss_header_bin));
 	if (header == NULL)
 		return 0;
+	swim_passport_bin_create(&passport_bin);
 	int i = 0;
-	char *pos = header + size;
 	struct swim_member *m;
-	struct swim_event_bin event_bin;
-	swim_event_bin_create(&event_bin);
 	rlist_foreach_entry(m, &swim->dissemination_queue,
 			    in_dissemination_queue) {
-		int new_size = size + sizeof(event_bin);
-		if (swim_packet_reserve(packet, new_size) == NULL)
+		if (swim_encode_member(packet, m, &passport_bin) != 0)
 			break;
-		swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
-				    m->incarnation);
-		memcpy(pos, &event_bin, sizeof(event_bin));
-		pos += sizeof(event_bin);
-		size = new_size;
 		++i;
 	}
-	if (i == 0)
-		return 0;
 	swim_diss_header_bin_create(&diss_header_bin, i);
 	memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
-	swim_packet_advance(packet, size);
 	return 1;
 }
 
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 700eff431..d84550663 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -330,9 +330,10 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
 	header->v_anti_entropy = mp_bswap_u16(batch_size);
 }
 
-static inline void
+void
 swim_passport_bin_create(struct swim_passport_bin *passport)
 {
+	passport->m_header = 0x85;
 	passport->k_status = SWIM_MEMBER_STATUS;
 	passport->k_addr = SWIM_MEMBER_ADDRESS;
 	passport->m_addr = 0xce;
@@ -345,7 +346,7 @@ swim_passport_bin_create(struct swim_passport_bin *passport)
 	passport->m_incarnation = 0xcf;
 }
 
-static inline void
+void
 swim_passport_bin_fill(struct swim_passport_bin *passport,
 		       const struct sockaddr_in *addr,
 		       const struct tt_uuid *uuid,
@@ -358,22 +359,6 @@ swim_passport_bin_fill(struct swim_passport_bin *passport,
 	passport->v_incarnation = mp_bswap_u64(incarnation);
 }
 
-void
-swim_member_bin_fill(struct swim_member_bin *header,
-		     const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		     enum swim_member_status status, uint64_t incarnation)
-{
-	swim_passport_bin_fill(&header->passport, addr, uuid, status,
-			       incarnation);
-}
-
-void
-swim_member_bin_create(struct swim_member_bin *header)
-{
-	header->m_header = 0x85;
-	swim_passport_bin_create(&header->passport);
-}
-
 void
 swim_diss_header_bin_create(struct swim_diss_header_bin *header,
 			    uint16_t batch_size)
@@ -383,23 +368,6 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
 	header->v_header = mp_bswap_u16(batch_size);
 }
 
-void
-swim_event_bin_create(struct swim_event_bin *header)
-{
-	swim_passport_bin_create(&header->passport);
-}
-
-void
-swim_event_bin_fill(struct swim_event_bin *header,
-		    enum swim_member_status status,
-		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		    uint64_t incarnation)
-{
-	header->m_header = 0x85;
-	swim_passport_bin_fill(&header->passport, addr, uuid, status,
-			       incarnation);
-}
-
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
 			    const struct sockaddr_in *src)
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 6ae4475c0..ab4057185 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -265,11 +265,15 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
  * state, exact address. The whole passport is necessary for each
  * info related to a member: for anti-entropy records, for
  * dissemination events. The components can inherit that structure
- * and add more attributes. For example, anti-entropy can add a
- * mandatory payload; dissemination adds optional old UUID and
- * payload.
+ * and add more attributes. Or just encode new attributes after
+ * the passport. For example, anti-entropy can add a payload when
+ * it is up to date; dissemination adds a payload when it is up to
+ * date and TTL is > 0.
  */
 struct PACKED swim_passport_bin {
+	/** mp_encode_map(5) */
+	uint8_t m_header;
+
 	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
 	uint8_t k_status;
 	/** mp_encode_uint(enum member_status) */
@@ -301,20 +305,9 @@ struct PACKED swim_passport_bin {
 	uint64_t v_incarnation;
 };
 
-/**
- * SWIM member MessagePack template. Represents one record in
- * anti-entropy section.
- */
-struct PACKED swim_member_bin {
-	/** mp_encode_map(5) */
-	uint8_t m_header;
-	/** Basic member info like status, address. */
-	struct swim_passport_bin passport;
-};
-
-/** Initialize antri-entropy record. */
+/** Initialize a member's binary passport. */
 void
-swim_member_bin_create(struct swim_member_bin *header);
+swim_passport_bin_create(struct swim_passport_bin *passport);
 
 /**
  * Since usually there are many members, it is faster to reset a
@@ -323,9 +316,10 @@ swim_member_bin_create(struct swim_member_bin *header);
  * fill() ... .
  */
 void
-swim_member_bin_fill(struct swim_member_bin *header,
-		     const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		     enum swim_member_status status, uint64_t incarnation);
+swim_passport_bin_fill(struct swim_passport_bin *passport,
+		       const struct sockaddr_in *addr,
+		       const struct tt_uuid *uuid,
+		       enum swim_member_status status, uint64_t incarnation);
 
 /** }}} Anti-entropy component */
 
@@ -345,30 +339,6 @@ void
 swim_diss_header_bin_create(struct swim_diss_header_bin *header,
 			    uint16_t batch_size);
 
-/** SWIM event MessagePack template. */
-struct PACKED swim_event_bin {
-	/** mp_encode_map(5 or 6) */
-	uint8_t m_header;
-	/** Basic member info like status, address. */
-	struct swim_passport_bin passport;
-};
-
-/** Initialize dissemination record. */
-void
-swim_event_bin_create(struct swim_event_bin *header);
-
-/**
- * Since usually there are many evnets, it is faster to reset a
- * few fields in an existing template, then each time create a
- * new template. So the usage pattern is create(), fill(),
- * fill() ... .
- */
-void
-swim_event_bin_fill(struct swim_event_bin *header,
-		    enum swim_member_status status,
-		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		    uint64_t incarnation);
-
 /** }}} Dissemination component */
 
 /** {{{ Meta component */
-- 
2.17.2 (Apple Git-113)
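The create(), fill(), fill() ... pattern mentioned in the header comments can be illustrated on a reduced template. The sketch below is not the real swim_passport_bin: the struct, the key codes and the function names are hypothetical, and it keeps only two of the five map entries. It assumes just the MessagePack encoding of a fixmap (0x80 | size), a positive fixint (a single byte below 0x80) and a uint64 (0xcf followed by 8 big-endian bytes). Every field is a byte or a byte array, so no PACKED attribute is needed here.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical key codes, chosen for this sketch only. */
enum { SKETCH_KEY_STATUS = 0, SKETCH_KEY_INCARNATION = 1 };

/*
 * Reduced binary template: a MessagePack map with 2 pairs. All
 * the fields are bytes, so the struct has no padding and can be
 * copied into a packet with one memcpy().
 */
struct sketch_passport_bin {
	uint8_t m_header;		/* 0x82: fixmap of 2 pairs. */
	uint8_t k_status;		/* Key, positive fixint. */
	uint8_t v_status;		/* Value, positive fixint. */
	uint8_t k_incarnation;		/* Key, positive fixint. */
	uint8_t m_incarnation;		/* 0xcf: uint64 marker. */
	uint8_t v_incarnation[8];	/* Value, big-endian. */
};

/* Set the constant bytes once. */
static void
sketch_passport_bin_create(struct sketch_passport_bin *p)
{
	memset(p, 0, sizeof(*p));
	p->m_header = 0x82;
	p->k_status = SKETCH_KEY_STATUS;
	p->k_incarnation = SKETCH_KEY_INCARNATION;
	p->m_incarnation = 0xcf;
}

/* Overwrite only the per-member values, reusing the template. */
static void
sketch_passport_bin_fill(struct sketch_passport_bin *p, uint8_t status,
			 uint64_t incarnation)
{
	/* A positive fixint holds values 0..127 only. */
	assert(status < 0x80);
	p->v_status = status;
	for (int i = 7; i >= 0; --i) {
		p->v_incarnation[i] = (uint8_t) (incarnation & 0xff);
		incarnation >>= 8;
	}
}
```

With such a template, encoding many members costs one create() call, and then one fill() plus one memcpy() per member, which is the usage the comments in swim_proto.h describe.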
* [tarantool-patches] Re: [PATCH 2/6] swim: replace event_bin and member_bin with the passport
From: Konstantin Osipov @ 2019-04-11 23:10 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
> Event_bin and member_bin binary packet structures were designed
> separately for different purposes. Initially the event_bin was
> thought having the same fields as passport + optional old UUID +
> optional payload. On the other hand, member_bin was supposed to
> store the passport + mandatory payload.
>
> But old UUID was cut off in favour of another way of UUID update.
> And payload appeared to be optional in both anti-entropy and
> dissemination. It means, that member_bin and event_bin are not
> needed anymore as separate structures. This commit replaces them
> with the passport completely.

ok to push

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH 2/6] swim: replace event_bin and member_bin with the passport
From: Vladislav Shpilevoy @ 2019-04-12 19:23 UTC
To: tarantool-patches, Konstantin Osipov

Pushed to the master.

On 12/04/2019 02:10, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
>> Event_bin and member_bin binary packet structures were designed
>> separately for different purposes. Initially the event_bin was
>> thought having the same fields as passport + optional old UUID +
>> optional payload. On the other hand, member_bin was supposed to
>> store the passport + mandatory payload.
>>
>> But old UUID was cut off in favour of another way of UUID update.
>> And payload appeared to be optional in both anti-entropy and
>> dissemination. It means, that member_bin and event_bin are not
>> needed anymore as separate structures. This commit replaces them
>> with the passport completely.
>
> ok to push
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
* [tarantool-patches] [PATCH 3/6] swim: factor out 'update' part of swim_member_upsert()
From: Vladislav Shpilevoy @ 2019-04-11 22:22 UTC
To: tarantool-patches; +Cc: kostja

Move 'update' logic into a separate function, because in the next
commits it is going to become more complicated due to payload
introduction, and it would be undesirable to clog the upsert()
function with payload-specific code.

Part of #3234
---
 src/lib/swim/swim.c | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 0e7f51adf..2dac6eedd 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1055,6 +1055,22 @@ swim_update_member_addr(struct swim *swim, struct swim_member *member,
 	}
 }
 
+/**
+ * Update an existing member with a new definition. It is expected
+ * that @a def has an incarnation not older that @a member has.
+ */
+static inline void
+swim_update_member(struct swim *swim, const struct swim_member_def *def,
+		   struct swim_member *member)
+{
+	assert(member != swim->self);
+	assert(def->incarnation >= member->incarnation);
+	if (def->incarnation > member->incarnation)
+		swim_update_member_addr(swim, member, &def->addr, 0);
+	swim_update_member_inc_status(swim, member, def->status,
+				      def->incarnation);
+}
+
 /**
  * Update or create a member by its definition, received from a
  * remote instance.
@@ -1099,9 +1115,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 	if (member != self) {
 		if (def->incarnation < member->incarnation)
 			goto skip;
-		swim_update_member_addr(swim, member, &def->addr, 0);
-		swim_update_member_inc_status(swim, member, def->status,
-					      def->incarnation);
+		swim_update_member(swim, def, member);
 		return 0;
 	}
 	/*
-- 
2.17.2 (Apple Git-113)
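The incarnation rule applied by the patch above can be shown on a toy model: a stale definition is skipped, and the address is taken only from a strictly newer incarnation. The struct and the function below are hypothetical and reduced. In particular, the body of swim_update_member_inc_status() is not shown in the patch, so the plain copy of status and incarnation here is an assumption of the sketch, not a description of the real function.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, reduced member record used only in this sketch. */
struct sketch_member {
	uint32_t addr;
	int status;
	uint64_t incarnation;
};

/*
 * Apply a remote definition 'def' to an already known 'member'.
 * Returns 0 if 'def' is stale and was skipped, 1 if applied.
 */
static int
sketch_apply_def(struct sketch_member *member,
		 const struct sketch_member *def)
{
	/* An older incarnation carries outdated information. */
	if (def->incarnation < member->incarnation)
		return 0;
	/* The address is taken only from a newer incarnation. */
	if (def->incarnation > member->incarnation)
		member->addr = def->addr;
	/*
	 * Assumption: status and incarnation are simply copied.
	 * The real swim_update_member_inc_status() may apply
	 * additional rules.
	 */
	member->status = def->status;
	member->incarnation = def->incarnation;
	return 1;
}
```

So a definition with an equal incarnation may still change the status in this model, but never the address.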
* [tarantool-patches] Re: [PATCH 3/6] swim: factor out 'update' part of swim_member_upsert()
From: Konstantin Osipov @ 2019-04-11 23:11 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
> Move 'update' logic into a separate function, because in the next
> commits it is going to become more complicated due to payload
> introduction, and it would be undesirable to clog the upsert()
> function with payload-specific code.

ok to push.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH 3/6] swim: factor out 'update' part of swim_member_upsert()
From: Vladislav Shpilevoy @ 2019-04-12 19:23 UTC
To: tarantool-patches, Konstantin Osipov

Pushed to the master.

On 12/04/2019 02:11, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
>> Move 'update' logic into a separate function, because in the next
>> commits it is going to become more complicated due to payload
>> introduction, and it would be undesirable to clog the upsert()
>> function with payload-specific code.
>
> ok to push.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
* [tarantool-patches] [PATCH 4/6] test: generalize SWIM fake descriptor filters
From: Vladislav Shpilevoy @ 2019-04-11 22:22 UTC
To: tarantool-patches; +Cc: kostja

At the moment the SWIM test harness implements its own fake file
descriptor table, which the real SWIM code uses unawares. Each
fake fd has send and recv queues, and can delay and drop packets
with a certain probability.

But that is not enough for new tests. They need to be able to drop
packets with a specified content, going from or to a specified
direction. For that the patch implements a filtering mechanism.
Each fake fd now has a list of filters, applied one by one to each
packet. If at least one filter wants to drop a packet, then it is
dropped. The filters know the packet content and direction:
outgoing or incoming.

For now only one filter exists - drop rate. It existed even before
the patch, but now it is ported to the new API.
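The filtering scheme described above can be sketched in a few lines of C. This is a minimal model and not the test harness code: it uses a hand-written singly linked list instead of rlist, has no per-filter destructor, and all the sketch_ names are hypothetical. Only the shape of the check callback (data, size, udata, direction with 0 for incoming and 1 for outgoing) follows the description in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Direction values, like standard in/out descriptors. */
enum { SKETCH_DIR_IN = 0, SKETCH_DIR_OUT = 1 };

/* Returns true if the packet should be dropped. */
typedef bool (*sketch_check_f)(const char *data, int size, void *udata,
			       int dir);

/* One filter in a singly linked list (hypothetical layout). */
struct sketch_filter {
	sketch_check_f check;
	void *udata;
	struct sketch_filter *next;
};

/* Prepend a new filter to the list and return the new head. */
static struct sketch_filter *
sketch_filter_add(struct sketch_filter *head, sketch_check_f check,
		  void *udata)
{
	struct sketch_filter *f = malloc(sizeof(*f));
	assert(f != NULL);
	f->check = check;
	f->udata = udata;
	f->next = head;
	return f;
}

/* Free all the filters. User data is owned by the caller here. */
static void
sketch_filter_free_all(struct sketch_filter *head)
{
	while (head != NULL) {
		struct sketch_filter *next = head->next;
		free(head);
		head = next;
	}
}

/* A packet is dropped if at least one filter says so. */
static bool
sketch_is_dropped(const struct sketch_filter *head, const char *data,
		  int size, int dir)
{
	for (const struct sketch_filter *f = head; f != NULL; f = f->next) {
		if (f->check(data, size, f->udata, dir))
			return true;
	}
	return false;
}

/* Example filter: drop every outgoing packet. */
static bool
sketch_drop_out(const char *data, int size, void *udata, int dir)
{
	(void) data;
	(void) size;
	(void) udata;
	return dir == SKETCH_DIR_OUT;
}

/* Example filter: drop packets starting with the byte in udata. */
static bool
sketch_drop_by_first_byte(const char *data, int size, void *udata, int dir)
{
	(void) dir;
	return size > 0 && data[0] == *(const char *) udata;
}
```

Because each filter sees both the content and the direction, a test can combine a direction-only rule with a content-only rule on the same descriptor, and the drop decision is the logical OR of all of them.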
Part of #3234
---
 test/unit/swim.c                |  21 ++---
 test/unit/swim_test_transport.c | 133 ++++++++++++++++++++++++++------
 test/unit/swim_test_transport.h |  41 ++++++++--
 test/unit/swim_test_utils.c     |  68 +++++++++++++++-
 test/unit/swim_test_utils.h     |  18 +++++
 5 files changed, 236 insertions(+), 45 deletions(-)

diff --git a/test/unit/swim.c b/test/unit/swim.c
index 03f6b412c..48aea2f07 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -590,29 +590,22 @@ swim_test_uri_update(void)
 		swim_member_by_uuid(s1, swim_member_uuid(s0_self));
 	is(strcmp(new_s0_uri, swim_member_uri(s0_view)), 0,
 	   "S1 updated its URI and S2 sees that");
-	/*
-	 * S2 should not manage to send the new address to S3, but
-	 * should accept S3 packets later - therefore block is
-	 * needed.
-	 */
-	swim_cluster_block_io(cluster, 1);
 	/*
 	 * S1 should not send the new address to S3 - drop its
 	 * packets.
 	 */
 	swim_cluster_set_drop(cluster, 0, 100);
 	/*
-	 * Main part of the test - S3 sends the old address to S1.
+	 * S2 should not manage to send the new address to S3, but
+	 * should accept S3 packets with the old address and
+	 * ignore it.
 	 */
-	swim_cluster_set_drop(cluster, 2, 0);
-	swim_run_for(3);
-	swim_cluster_set_drop(cluster, 2, 100);
+	swim_cluster_set_drop_out(cluster, 1, 100);
 	/*
-	 * S2 absorbs the packets, but should ignore the old
-	 * address.
+	 * Main part of the test - S3 sends the old address to S2.
 	 */
-	swim_cluster_unblock_io(cluster, 1);
-	swim_run_for(2);
+	swim_cluster_set_drop(cluster, 2, 0);
+	swim_run_for(3);
 	is(strcmp(new_s0_uri, swim_member_uri(s0_view)), 0,
 	   "S2 still keeps new S1's URI, even received the old one from S3");
 
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index 8ad434340..5f84a7c95 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -100,6 +100,51 @@ swim_test_packet_dup(struct swim_test_packet *p)
 	return res;
 }
 
+/**
+ * Packet filter. Each fake file descriptor has a list of filters.
+ * For each incoming and outgoing packet it checks all the
+ * filters in the list. If anyone wants to filter the packet out,
+ * then the packet is dropped.
+ */
+struct swim_fd_filter {
+	/** A function to decide whether to drop a packet. */
+	swim_test_filter_check_f check;
+	/**
+	 * A function called when the filter is deleted to free
+	 * @a udata if necessary.
+	 */
+	swim_test_filter_delete_f delete;
+	/**
+	 * Arbitrary user data. Passed to each call of @a check.
+	 */
+	void *udata;
+	/** Link in the list of filters in the descriptor. */
+	struct rlist in_filters;
+};
+
+/** Create a new filter. */
+static inline struct swim_fd_filter *
+swim_fd_filter_new(swim_test_filter_check_f check,
+		   swim_test_filter_delete_f delete, void *udata)
+{
+	struct swim_fd_filter *f = (struct swim_fd_filter *) malloc(sizeof(*f));
+	assert(f != NULL);
+	f->udata = udata;
+	f->check = check;
+	f->delete = delete;
+	rlist_create(&f->in_filters);
+	return f;
+}
+
+/** Delete @a filter and its data. */
+static inline void
+swim_fd_filter_delete(struct swim_fd_filter *filter)
+{
+	rlist_del_entry(filter, in_filters);
+	filter->delete(filter->udata);
+	free(filter);
+}
+
 /** Fake file descriptor. */
 struct swim_fd {
 	/** File descriptor number visible to libev. */
@@ -111,10 +156,11 @@ struct swim_fd {
 	 */
 	bool is_opened;
 	/**
-	 * Probability of packet loss. For both sends and
-	 * receipts.
+	 * List of packet filters. All of them are checked for
+	 * each packet, and if at least one decides to drop, then
+	 * the packet is deleted.
 	 */
-	double drop_rate;
+	struct rlist filters;
 	/**
 	 * Link in the list of opened and non-blocked descriptors.
 	 * Used to feed them all EV_WRITE.
@@ -143,12 +189,47 @@ swim_fd_open(struct swim_fd *fd)
 		diag_set(SocketError, "test_socket:1", "bind");
 		return -1;
 	}
+	assert(rlist_empty(&fd->filters));
 	fd->is_opened = true;
-	fd->drop_rate = 0;
 	rlist_add_tail_entry(&swim_fd_active, fd, in_active);
 	return 0;
 }
 
+/**
+ * Remove a filter having @a check function. Works just like the
+ * core triggers library. The found trigger is deleted. If nothing
+ * is found, then it is not an error.
+ */
+void
+swim_test_transport_remove_filter(int fd, swim_test_filter_check_f check)
+{
+	struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
+	assert(sfd->is_opened);
+	struct swim_fd_filter *f;
+	rlist_foreach_entry(f, &sfd->filters, in_filters) {
+		if (check == f->check) {
+			swim_fd_filter_delete(f);
+			return;
+		}
+	}
+}
+
+/**
+ * Add a new filter, or replace an existing one. If a filter
+ * already exists with the same @a check function, then it is
+ * deleted.
+ */
+void
+swim_test_transport_add_filter(int fd, swim_test_filter_check_f check,
+			       swim_test_filter_delete_f delete, void *udata)
+{
+	struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
+	assert(sfd->is_opened);
+	struct swim_fd_filter *f = swim_fd_filter_new(check, delete, udata);
+	swim_test_transport_remove_filter(fd, check);
+	rlist_add_tail_entry(&sfd->filters, f, in_filters);
+}
+
 /** Send one packet to destination's recv queue. */
 static inline void
 swim_fd_send_packet(struct swim_fd *fd);
@@ -159,6 +240,9 @@ swim_fd_close(struct swim_fd *fd)
 {
 	if (! fd->is_opened)
 		return;
+	struct swim_fd_filter *f, *f_tmp;
+	rlist_foreach_entry_safe(f, &fd->filters, in_filters, f_tmp)
+		swim_fd_filter_delete(f);
 	struct swim_test_packet *i, *tmp;
 	rlist_foreach_entry_safe(i, &fd->recv_queue, in_queue, tmp)
 		swim_test_packet_delete(i);
@@ -168,13 +252,30 @@ swim_fd_close(struct swim_fd *fd)
 	fd->is_opened = false;
 }
 
+/**
+ * Check all the packet filters if any wants to drop @a p packet.
+ * @a dir parameter says direction. Values are the same as for
+ * standard in/out descriptors: 0 for input, 1 for output.
+ */
+static inline bool
+swim_fd_test_if_drop(struct swim_fd *fd, const struct swim_test_packet *p,
+		     int dir)
+{
+	struct swim_fd_filter *f;
+	rlist_foreach_entry(f, &fd->filters, in_filters) {
+		if (f->check(p->data, p->size, f->udata, dir))
+			return true;
+	}
+	return false;
+}
+
 void
 swim_test_transport_init(void)
 {
 	for (int i = 0, evfd = FAKE_FD_BASE; i < FAKE_FD_NUMBER; ++i, ++evfd) {
+		rlist_create(&swim_fd[i].filters);
 		swim_fd[i].evfd = evfd;
 		swim_fd[i].is_opened = false;
-		swim_fd[i].drop_rate = 0;
 		rlist_create(&swim_fd[i].in_active);
 		rlist_create(&swim_fd[i].recv_queue);
 		rlist_create(&swim_fd[i].send_queue);
@@ -287,24 +388,6 @@ swim_test_transport_unblock_fd(int fd)
 		rlist_add_tail_entry(&swim_fd_active, sfd, in_active);
 }
 
-void
-swim_test_transport_set_drop(int fd, double value)
-{
-	struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
-	if (sfd->is_opened)
-		sfd->drop_rate = value;
-}
-
-/**
- * Returns true with probability @a rate, and is used to decided
- * wether to drop a packet or not.
- */
-static inline bool
-swim_test_is_drop(double rate)
-{
-	return ((double) rand() / RAND_MAX) * 100 < rate;
-}
-
 /**
  * Move @a p packet, originated from @a src descriptor's send
  * queue, to @a dst descriptor's recv queue. The function checks
@@ -315,8 +398,8 @@ static inline void
 swim_move_packet(struct swim_fd *src, struct swim_fd *dst,
 		 struct swim_test_packet *p)
 {
-	if (dst->is_opened && !swim_test_is_drop(dst->drop_rate) &&
-	    !swim_test_is_drop(src->drop_rate))
+	if (dst->is_opened && !swim_fd_test_if_drop(dst, p, 0) &&
+	    !swim_fd_test_if_drop(src, p, 1))
 		rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
 	else
 		swim_test_packet_delete(p);
diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h
index d751efe83..454de1d8f 100644
--- a/test/unit/swim_test_transport.h
+++ b/test/unit/swim_test_transport.h
@@ -40,6 +40,23 @@ struct ev_loop;
  * capability to set necessary loss level, delay, reorders.
  */
 
+/**
+ * Signature of a packet filter function. It takes packet data,
+ * arbitrary user data, and should return true, if the packet
+ * should be dropped. False otherwise. Direction is said via
+ * @a dir parameter. 0 means incoming packet, 1 means outgoing
+ * packet, just like standard IO descriptors.
+ */
+typedef bool (*swim_test_filter_check_f)(const char *data, int size,
+					 void *udata, int dir);
+
+/**
+ * It is possible that a filter is complex and uses helper data
+ * allocated somewhere. This function is called when the filter
+ * is dropped and allows to free user data.
+ */
+typedef void (*swim_test_filter_delete_f)(void *udata);
+
 /**
  * Until there are no new IO events, feed EV_WRITE event to all
  * opened descriptors; EV_READ to ones, who have not empty recv
@@ -61,13 +78,27 @@ void
 swim_test_transport_unblock_fd(int fd);
 
 /**
- * Drop rate of incoming and outgoing packets. Note, that even if
- * a packet is dropped on send, the node, owning @a fd, still
- * thinks, that the packet is sent. It is not a sender-visible
- * error.
+ * Add a filter to the file descriptor @a fd. If a filter with
+ * the same @a check function exists, then it is deleted and a
+ * new one is created.
+ * @param fd File descriptor to add filter to.
+ * @param check Check function. It is called for each packet and
+ *        should return true, when the packet should be dropped.
+ * @param delete A destructor for @a udata called when the filter
+ *        is dropped.
+ * @param udata Arbitrary user data, passed to each @a check
+ *        invocation.
+ */
+void
+swim_test_transport_add_filter(int fd, swim_test_filter_check_f check,
+			       swim_test_filter_delete_f delete, void *udata);
+
+/**
+ * Drop a filter from @a fd descriptor having @a check filter
+ * function.
  */
 void
-swim_test_transport_set_drop(int fd, double value);
+swim_test_transport_remove_filter(int fd, swim_test_filter_check_f check);
 
 /** Initialize test transport system. */
 void
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index da8dd4386..d933434e9 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -242,10 +242,76 @@ swim_cluster_unblock_io(struct swim_cluster *cluster, int i)
 	swim_test_transport_unblock_fd(swim_fd(cluster->node[i].swim));
 }
 
+/** A structure used by drop rate packet filter. */
+struct swim_drop_rate {
+	/** True if should be applied to incoming packets. */
+	bool is_for_in;
+	/** True if should be applied to outgoing packets. */
+	bool is_for_out;
+	/** Drop rate percentage. */
+	double rate;
+};
+
+/** Create a new drop rate filter helper. */
+static inline struct swim_drop_rate *
+swim_drop_rate_new(double rate, bool is_for_in, bool is_for_out)
+{
+	struct swim_drop_rate *dr =
+		(struct swim_drop_rate *) malloc(sizeof(*dr));
+	assert(dr != NULL);
+	dr->rate = rate;
+	dr->is_for_in = is_for_in;
+	dr->is_for_out = is_for_out;
+	return dr;
+}
+
+/**
+ * A packet filter dropping a packet with a certain probability.
+ */
+static bool
+swim_filter_drop_rate(const char *data, int size, void *udata, int dir)
+{
+	(void) data;
+	(void) size;
+	struct swim_drop_rate *dr = (struct swim_drop_rate *) udata;
+	if ((dir == 0 && !dr->is_for_in) || (dir == 1 && !dr->is_for_out))
+		return false;
+	return ((double) rand() / RAND_MAX) * 100 < dr->rate;
+}
+
+/**
+ * Create a new drop rate filter for the instance with id @a i.
+ */ +static void +swim_cluster_set_drop_generic(struct swim_cluster *cluster, int i, + double value, bool is_for_in, bool is_for_out) +{ + int fd = swim_fd(swim_cluster_node(cluster, i)); + if (value == 0) { + swim_test_transport_remove_filter(fd, swim_filter_drop_rate); + return; + } + struct swim_drop_rate *dr = swim_drop_rate_new(value, is_for_in, + is_for_out); + swim_test_transport_add_filter(fd, swim_filter_drop_rate, free, dr); +} + void swim_cluster_set_drop(struct swim_cluster *cluster, int i, double value) { - swim_test_transport_set_drop(swim_fd(cluster->node[i].swim), value); + swim_cluster_set_drop_generic(cluster, i, value, true, true); +} + +void +swim_cluster_set_drop_out(struct swim_cluster *cluster, int i, double value) +{ + swim_cluster_set_drop_generic(cluster, i, value, false, true); +} + +void +swim_cluster_set_drop_in(struct swim_cluster *cluster, int i, double value) +{ + swim_cluster_set_drop_generic(cluster, i, value, true, false); } /** Check if @a s1 knows every member of @a s2's table. */ diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 5e1c192a1..af428c792 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -88,9 +88,27 @@ swim_cluster_block_io(struct swim_cluster *cluster, int i); void swim_cluster_unblock_io(struct swim_cluster *cluster, int i); +/** + * Set drop rate of incoming and outgoing packets for a node with + * id @a i. Note, that even if a packet is dropped on send, the + * node still thinks, that the packet is sent. It is not a + * sender-visible error. + */ void swim_cluster_set_drop(struct swim_cluster *cluster, int i, double value); +/** + * The same as simple drop, but applied to outgoing packets only. + */ +void +swim_cluster_set_drop_out(struct swim_cluster *cluster, int i, double value); + +/** + * The same as simple drop, but applied to incoming packets only. 
+ */ +void +swim_cluster_set_drop_in(struct swim_cluster *cluster, int i, double value); + /** * Explicitly add a member of id @a from_id to a member of id * @a to_id. -- 2.17.2 (Apple Git-113) ^ permalink raw reply [flat|nested] 27+ messages in thread
* [tarantool-patches] Re: [PATCH 4/6] test: generalize SWIM fake descriptor filters
From: Konstantin Osipov @ 2019-04-11 23:11 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
> At this moment the SWIM test harness implements its own fake file
> descriptor table, which the real SWIM code uses without being aware
> of it. Each fake fd has send and recv queues, and can delay and drop
> packets with a certain probability. But that is not enough for the
> new tests.
>
> The tests need to drop packets with specified content, going in a
> specified direction. For that the patch implements a filtering
> mechanism. Each fake fd now has a list of filters, applied one by
> one to each packet. If at least one filter wants to drop a packet,
> then it is dropped. The filters know the packet content and
> direction: outgoing or incoming.
>
> Now only one filter exists - drop rate. It existed even before
> the patch, but now it is ported to the new API.

ok to push.

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH 4/6] test: generalize SWIM fake descriptor filters
From: Vladislav Shpilevoy @ 2019-04-12 19:23 UTC
To: tarantool-patches, Konstantin Osipov

Pushed to the master.

On 12/04/2019 02:11, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
>> At this moment the SWIM test harness implements its own fake file
>> descriptor table, which the real SWIM code uses without being aware
>> of it. Each fake fd has send and recv queues, and can delay and drop
>> packets with a certain probability. But that is not enough for the
>> new tests.
>>
>> The tests need to drop packets with specified content, going in a
>> specified direction. For that the patch implements a filtering
>> mechanism. Each fake fd now has a list of filters, applied one by
>> one to each packet. If at least one filter wants to drop a packet,
>> then it is dropped. The filters know the packet content and
>> direction: outgoing or incoming.
>>
>> Now only one filter exists - drop rate. It existed even before
>> the patch, but now it is ported to the new API.
>
> ok to push.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
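The filter mechanism discussed in this patch can be tried outside the test harness. What follows is only a minimal standalone sketch, not the code from the patch: it assumes a plain singly linked list in place of rlist, and the names filter_list, filter_list_add, filter_list_remove, filter_list_is_drop and drop_rate are hypothetical. The callback signatures and the drop rate formula are taken from the diff above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Same shape as swim_test_filter_check_f: return true to drop. */
typedef bool (*filter_check_f)(const char *data, int size, void *udata,
			       int dir);
/* Same shape as swim_test_filter_delete_f: frees udata. */
typedef void (*filter_destroy_f)(void *udata);

struct filter {
	filter_check_f check;
	filter_destroy_f destroy;
	void *udata;
	struct filter *next;
};

/* Hypothetical stand-in for the filter list of one fake fd. */
struct filter_list {
	struct filter *head;
};

/* Remove the filter registered with @a check; no-op if absent. */
static void
filter_list_remove(struct filter_list *list, filter_check_f check)
{
	for (struct filter **p = &list->head; *p != NULL; p = &(*p)->next) {
		struct filter *f = *p;
		if (f->check != check)
			continue;
		*p = f->next;
		if (f->destroy != NULL)
			f->destroy(f->udata);
		free(f);
		return;
	}
}

/* Add a filter to the tail, replacing one with the same @a check. */
static void
filter_list_add(struct filter_list *list, filter_check_f check,
		filter_destroy_f destroy, void *udata)
{
	struct filter *f = malloc(sizeof(*f));
	assert(f != NULL);
	f->check = check;
	f->destroy = destroy;
	f->udata = udata;
	f->next = NULL;
	filter_list_remove(list, check);
	struct filter **tail = &list->head;
	while (*tail != NULL)
		tail = &(*tail)->next;
	*tail = f;
}

/* A packet is dropped if at least one filter asks for it. */
static bool
filter_list_is_drop(const struct filter_list *list, const char *data,
		    int size, int dir)
{
	for (const struct filter *f = list->head; f != NULL; f = f->next) {
		if (f->check(data, size, f->udata, dir))
			return true;
	}
	return false;
}

/* Drop rate settings, mirroring struct swim_drop_rate in the patch. */
struct drop_rate {
	bool is_for_in;
	bool is_for_out;
	double rate;
};

/* Drop with probability rate percent; dir is 0 for in, 1 for out. */
static bool
filter_drop_rate(const char *data, int size, void *udata, int dir)
{
	(void) data;
	(void) size;
	struct drop_rate *dr = udata;
	if ((dir == 0 && !dr->is_for_in) || (dir == 1 && !dr->is_for_out))
		return false;
	return ((double) rand() / RAND_MAX) * 100 < dr->rate;
}
```

With this formula a rate of 0 never drops, and any rate above 100 always drops. A rate of exactly 100 lets a packet through in the rare case when rand() returns RAND_MAX. Registering filter_drop_rate a second time with new udata replaces the first registration and frees the old udata through the destroy callback.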
* [tarantool-patches] [PATCH 5/6] test: introduce new SWIM packet filter by component names 2019-04-11 22:22 [tarantool-patches] [PATCH 0/6] swim payload Vladislav Shpilevoy ` (3 preceding siblings ...) 2019-04-11 22:22 ` [tarantool-patches] [PATCH 4/6] test: generalize SWIM fake descriptor filters Vladislav Shpilevoy @ 2019-04-11 22:22 ` Vladislav Shpilevoy 2019-04-11 23:11 ` [tarantool-patches] " Konstantin Osipov 2019-04-11 22:22 ` [tarantool-patches] [PATCH 6/6] swim: introduce payload Vladislav Shpilevoy ` (2 subsequent siblings) 7 siblings, 1 reply; 27+ messages in thread From: Vladislav Shpilevoy @ 2019-04-11 22:22 UTC (permalink / raw) To: tarantool-patches; +Cc: kostja In the next patch on payloads it is wanted to drop only packets containing certain sections such as anti-entropy, dissemination. New SWIM test transport filters allow to implement this with ease. Part of #3234 --- test/unit/swim_test_utils.c | 54 +++++++++++++++++++++++++++++++++++++ test/unit/swim_test_utils.h | 9 +++++++ 2 files changed, 63 insertions(+) diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index d933434e9..fd528d166 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -35,6 +35,7 @@ #include "uuid/tt_uuid.h" #include "trivia/util.h" #include "fiber.h" +#include "msgpuck.h" /** * SWIM cluster node and its UUID. UUID is stored separately @@ -314,6 +315,59 @@ swim_cluster_set_drop_in(struct swim_cluster *cluster, int i, double value) swim_cluster_set_drop_generic(cluster, i, value, true, false); } +/** + * A list of components to drop used by component packet filter. + */ +struct swim_drop_components { + /** List of component body keys. */ + const int *keys; + /** Length of @a keys. */ + int key_count; +}; + +/** + * Check if a packet contains any of the components to filter out. 
+ */ +static bool +swim_filter_drop_component(const char *data, int size, void *udata, int dir) +{ + (void) size; + (void) dir; + struct swim_drop_components *dc = (struct swim_drop_components *) udata; + /* Skip meta. */ + mp_next(&data); + int map_size = mp_decode_map(&data); + for (int i = 0; i < map_size; ++i) { + int key = mp_decode_uint(&data); + for (int j = 0; j < dc->key_count; ++j) { + if (dc->keys[j] == key) + return true; + } + /* Skip value. */ + mp_next(&data); + } + return false; +} + +void +swim_cluster_drop_components(struct swim_cluster *cluster, int i, + const int *keys, int key_count) +{ + int fd = swim_fd(swim_cluster_node(cluster, i)); + if (key_count == 0) { + swim_test_transport_remove_filter(fd, + swim_filter_drop_component); + return; + } + struct swim_drop_components *dc = + (struct swim_drop_components *) malloc(sizeof(*dc)); + assert(dc != NULL); + dc->key_count = key_count; + dc->keys = keys; + swim_test_transport_add_filter(fd, swim_filter_drop_component, free, + dc); +} + /** Check if @a s1 knows every member of @a s2's table. */ static inline bool swim1_contains_swim2(struct swim *s1, struct swim *s2) diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index af428c792..6ea136e36 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -109,6 +109,15 @@ swim_cluster_set_drop_out(struct swim_cluster *cluster, int i, double value); void swim_cluster_set_drop_in(struct swim_cluster *cluster, int i, double value); +/** + * Drop all packets from/to a SWIM instance with id @a i + * containing components specified in @a keys. Components are + * defined by the constants in the packet body. + */ +void +swim_cluster_drop_components(struct swim_cluster *cluster, int i, + const int *keys, int key_count); + /** * Explicitly add a member of id @a from_id to a member of id * @a to_id. -- 2.17.2 (Apple Git-113) ^ permalink raw reply [flat|nested] 27+ messages in thread
* [tarantool-patches] Re: [PATCH 5/6] test: introduce new SWIM packet filter by component names
From: Konstantin Osipov @ 2019-04-11 23:11 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
> The next patch, on payloads, needs to drop only packets
> containing certain sections, such as anti-entropy or dissemination.
> The new SWIM test transport filters make this easy to implement.

ok to push

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH 5/6] test: introduce new SWIM packet filter by component names
From: Vladislav Shpilevoy @ 2019-04-12 19:23 UTC
To: tarantool-patches, Konstantin Osipov

Pushed to the master.

On 12/04/2019 02:11, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
>> The next patch, on payloads, needs to drop only packets
>> containing certain sections, such as anti-entropy or dissemination.
>> The new SWIM test transport filters make this easy to implement.
>
> ok to push
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
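The component filter in this patch walks the packet with msgpuck (mp_next, mp_decode_map, mp_decode_uint). To show that walk in isolation, here is a hedged sketch that swaps msgpuck for a tiny hand-written skipper covering only a subset of MessagePack. The packet layout (a meta map followed by a body map keyed by small unsigned integers) follows the patch. The names mini_mp_skip and packet_has_component are hypothetical, and unlike the code in the patch this sketch checks bounds and gives up on malformed or unsupported input.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Skip one MessagePack value. Only a subset of the format is
 * supported: positive fixint, fixmap, fixarray, fixstr, nil, bool,
 * bin8 and uint8..uint64. Returns the position after the value, or
 * NULL if the data is truncated or the type is not supported.
 */
static const uint8_t *
mini_mp_skip(const uint8_t *pos, const uint8_t *end)
{
	if (pos == NULL || pos >= end)
		return NULL;
	uint8_t type = *pos++;
	size_t len = 0, children = 0;
	if (type <= 0x7f || type == 0xc0 || type == 0xc2 || type == 0xc3) {
		/* Positive fixint, nil, false, true: no body. */
	} else if (type <= 0x8f) {
		/* Fixmap: each pair is a key and a value. */
		children = 2 * (size_t) (type & 0x0f);
	} else if (type <= 0x9f) {
		/* Fixarray. */
		children = type & 0x0f;
	} else if (type <= 0xbf) {
		/* Fixstr: the length is in the low 5 bits. */
		len = type & 0x1f;
	} else if (type == 0xc4) {
		/* Bin8: one length byte follows the type byte. */
		if (pos >= end)
			return NULL;
		len = *pos++;
	} else if (type >= 0xcc && type <= 0xcf) {
		/* Uint8, uint16, uint32, uint64: 1, 2, 4, 8 bytes. */
		len = (size_t) 1 << (type - 0xcc);
	} else {
		return NULL;
	}
	if ((size_t) (end - pos) < len)
		return NULL;
	pos += len;
	for (; children > 0 && pos != NULL; --children)
		pos = mini_mp_skip(pos, end);
	return pos;
}

/*
 * Return true if the packet body map has at least one of @a keys.
 * Assumes the body is a fixmap with positive fixint keys.
 */
static bool
packet_has_component(const uint8_t *data, size_t size, const int *keys,
		     int key_count)
{
	const uint8_t *end = data + size;
	/* Skip meta. */
	const uint8_t *pos = mini_mp_skip(data, end);
	if (pos == NULL || pos >= end || (*pos & 0xf0) != 0x80)
		return false;
	int map_size = *pos++ & 0x0f;
	for (int i = 0; i < map_size; ++i) {
		if (pos >= end || *pos > 0x7f)
			return false;
		int key = *pos++;
		for (int j = 0; j < key_count; ++j) {
			if (keys[j] == key)
				return true;
		}
		/* Skip value. */
		pos = mini_mp_skip(pos, end);
		if (pos == NULL)
			return false;
	}
	return false;
}
```

For a quick check, a packet made of the bytes 0x81 0x00 0x01 (meta) followed by 0x82 0x01 0x90 0x02 0xcc 0x05 (body) has body keys 1 and 2. These key values are made up for the illustration and are not the real SWIM component constants.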
* [tarantool-patches] [PATCH 6/6] swim: introduce payload 2019-04-11 22:22 [tarantool-patches] [PATCH 0/6] swim payload Vladislav Shpilevoy ` (4 preceding siblings ...) 2019-04-11 22:22 ` [tarantool-patches] [PATCH 5/6] test: introduce new SWIM packet filter by component names Vladislav Shpilevoy @ 2019-04-11 22:22 ` Vladislav Shpilevoy 2019-04-18 15:12 ` [tarantool-patches] " Konstantin Osipov 2019-04-18 17:43 ` [tarantool-patches] [PATCH 5.5/6] swim: rename TTL to TTD Vladislav Shpilevoy 2019-04-18 18:16 ` [tarantool-patches] [PATCH 7/6] swim: drop incarnation_inc parameter from update() routines Vladislav Shpilevoy 7 siblings, 1 reply; 27+ messages in thread From: Vladislav Shpilevoy @ 2019-04-11 22:22 UTC (permalink / raw) To: tarantool-patches; +Cc: kostja Payload is arbitrary user data disseminated over the cluster along with other member attributes. Part of #3234 --- src/lib/swim/swim.c | 155 ++++++++++++++++++++++++++-- src/lib/swim/swim.h | 8 ++ src/lib/swim/swim_proto.c | 31 +++++- src/lib/swim/swim_proto.h | 41 +++++++- test/unit/swim.c | 195 +++++++++++++++++++++++++++++++++++- test/unit/swim.result | 32 +++++- test/unit/swim_test_utils.c | 62 ++++++++++++ test/unit/swim_test_utils.h | 18 ++++ 8 files changed, 525 insertions(+), 17 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 2dac6eedd..2be1c846b 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -308,6 +308,38 @@ struct swim_member { * allow other members to learn the dead status. */ int status_ttl; + /** Arbitrary user data, disseminated on each change. */ + char *payload; + /** Payload size, in bytes. */ + uint16_t payload_size; + /** + * True, if the payload is thought to be of the most + * actual version. In such a case it can be disseminated + * farther. 
Otherwise @a payload is suspected to be + * outdated and can be updated in two cases only: + * + * 1) when it is received with a bigger incarnation from + * anywhere; + * + * 2) when it is received with the same incarnation, but + * local payload is outdated. + * + * A payload can become outdated, if anyhow a new + * incarnation of the member has been learned, but not a + * new payload. In such a case it can't be said exactly + * whether the member has updated payload, or another + * attribute. The only way here is to wait until the most + * actual payload will be received from another instance. + * Note, that such an instance always exists - the payload + * originator instance. + */ + bool is_payload_up_to_date; + /** + * TTL of payload. At most this number of times payload is + * sent as a part of dissemination component. Reset on + * each payload update. + */ + int payload_ttl; /** * All created events are put into a queue sorted by event * time. @@ -523,6 +555,34 @@ swim_by_scheduler(struct swim_scheduler *scheduler) return container_of(scheduler, struct swim, scheduler); } +/** Update member's payload, register a corresponding event. 
*/ +static inline int +swim_update_member_payload(struct swim *swim, struct swim_member *member, + const char *payload, uint16_t payload_size, + int incarnation_increment) +{ + assert(payload_size <= MAX_PAYLOAD_SIZE); + char *new_payload; + if (payload_size > 0) { + new_payload = (char *) realloc(member->payload, payload_size); + if (new_payload == NULL) { + diag_set(OutOfMemory, payload_size, "realloc", "new_payload"); + return -1; + } + memcpy(new_payload, payload, payload_size); + } else { + free(member->payload); + new_payload = NULL; + } + member->payload = new_payload; + member->payload_size = payload_size; + member->payload_ttl = mh_size(swim->members); + member->incarnation += incarnation_increment; + member->is_payload_up_to_date = true; + swim_on_member_update(swim, member); + return 0; +} + /** * Once a ping is sent, the member should start waiting for an * ACK. @@ -556,6 +616,7 @@ swim_member_delete(struct swim_member *member) /* Dissemination component. */ assert(rlist_empty(&member->in_dissemination_queue)); + free(member->payload); free(member); } @@ -637,7 +698,7 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid) static struct swim_member * swim_new_member(struct swim *swim, const struct sockaddr_in *addr, const struct tt_uuid *uuid, enum swim_member_status status, - uint64_t incarnation) + uint64_t incarnation, const char *payload, int payload_size) { int new_bsize = sizeof(swim->shuffled[0]) * (mh_size(swim->members) + 1); @@ -675,6 +736,12 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr, /* Dissemination component. 
*/ swim_on_member_update(swim, member); + if (payload_size >= 0 && + swim_update_member_payload(swim, member, payload, + payload_size, 0) != 0) { + swim_delete_member(swim, member); + return NULL; + } say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim), swim_uuid_str(&member->uuid), mh_size(swim->members)); @@ -739,17 +806,29 @@ swim_new_round(struct swim *swim) */ static int swim_encode_member(struct swim_packet *packet, struct swim_member *m, - struct swim_passport_bin *passport) + struct swim_passport_bin *passport, + struct swim_member_payload_bin *payload_header, + bool is_payload_needed) { /* The headers should be initialized. */ assert(passport->k_status == SWIM_MEMBER_STATUS); + assert(payload_header->k_payload == SWIM_MEMBER_PAYLOAD); int size = sizeof(*passport); + if (is_payload_needed) + size += sizeof(*payload_header) + m->payload_size; char *pos = swim_packet_alloc(packet, size); if (pos == NULL) return -1; swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status, - m->incarnation); + m->incarnation, is_payload_needed); memcpy(pos, passport, sizeof(*passport)); + if (is_payload_needed) { + pos += sizeof(*passport); + swim_member_payload_bin_fill(payload_header, m->payload_size); + memcpy(pos, payload_header, sizeof(*payload_header)); + pos += sizeof(*payload_header); + memcpy(pos, m->payload, m->payload_size); + } return 0; } @@ -763,17 +842,21 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet) { struct swim_anti_entropy_header_bin ae_header_bin; struct swim_passport_bin passport_bin; + struct swim_member_payload_bin payload_header; char *header = swim_packet_alloc(packet, sizeof(ae_header_bin)); if (header == NULL) return 0; swim_passport_bin_create(&passport_bin); + swim_member_payload_bin_create(&payload_header); struct mh_swim_table_t *t = swim->members; int i = 0, member_count = mh_size(t); int rnd = swim_scaled_rand(0, member_count - 1); for (mh_int_t rc = mh_swim_table_random(t, rnd), end = 
mh_end(t); i < member_count; ++i) { struct swim_member *m = *mh_swim_table_node(t, rc); - if (swim_encode_member(packet, m, &passport_bin) != 0) + if (swim_encode_member(packet, m, &passport_bin, + &payload_header, + m->is_payload_up_to_date) != 0) break; /* * First random member could be chosen too close @@ -833,16 +916,21 @@ static int swim_encode_dissemination(struct swim *swim, struct swim_packet *packet) { struct swim_diss_header_bin diss_header_bin; + struct swim_member_payload_bin payload_header; struct swim_passport_bin passport_bin; char *header = swim_packet_alloc(packet, sizeof(diss_header_bin)); if (header == NULL) return 0; swim_passport_bin_create(&passport_bin); + swim_member_payload_bin_create(&payload_header); int i = 0; struct swim_member *m; rlist_foreach_entry(m, &swim->dissemination_queue, in_dissemination_queue) { - if (swim_encode_member(packet, m, &passport_bin) != 0) + bool is_payload_needed = m->payload_ttl > 0 && + m->is_payload_up_to_date; + if (swim_encode_member(packet, m, &passport_bin, + &payload_header, is_payload_needed) != 0) break; ++i; } @@ -890,6 +978,10 @@ swim_decrease_event_ttl(struct swim *swim) rlist_foreach_entry_safe(member, &swim->dissemination_queue, in_dissemination_queue, tmp) { + if (member->payload_ttl > 0) { + if (--member->payload_ttl == 0) + swim_cached_round_msg_invalidate(swim); + } if (--member->status_ttl == 0) { rlist_del_entry(member, in_dissemination_queue); swim_cached_round_msg_invalidate(swim); @@ -1065,8 +1157,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def, { assert(member != swim->self); assert(def->incarnation >= member->incarnation); - if (def->incarnation > member->incarnation) + /* + * Payload update rules are simple: it can be updated + * either if the new payload has a bigger incarnation, or + * the same incarnation, but local payload is outdated. 
+ */ + bool is_payload_needed = false; + if (def->incarnation > member->incarnation) { swim_update_member_addr(swim, member, &def->addr, 0); + if (def->payload_size >= 0) { + is_payload_needed = true; + } else if (member->is_payload_up_to_date) { + member->is_payload_up_to_date = false; + swim_on_member_update(swim, member); + } + } else if (! member->is_payload_up_to_date && def->payload_size >= 0) { + is_payload_needed = true; + } + if (is_payload_needed && + swim_update_member_payload(swim, member, def->payload, + def->payload_size, 0) != 0) { + /* Not such a critical error. */ + diag_log(); + } swim_update_member_inc_status(swim, member, def->status, def->incarnation); } @@ -1107,7 +1220,8 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, goto skip; } *result = swim_new_member(swim, &def->addr, &def->uuid, - def->status, def->incarnation); + def->status, def->incarnation, + def->payload, def->payload_size); return *result != NULL ? 0 : -1; } *result = member; @@ -1436,7 +1550,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, - 0); + 0, NULL, 0); if (swim->self == NULL) return -1; } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { @@ -1448,7 +1562,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } new_self = swim_new_member(swim, &swim->self->addr, uuid, - MEMBER_ALIVE, 0); + MEMBER_ALIVE, 0, swim->self->payload, + swim->self->payload_size); if (new_self == NULL) return -1; } @@ -1504,6 +1619,18 @@ swim_is_configured(const struct swim *swim) return swim->self != NULL; } +int +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size) +{ + if (payload_size > MAX_PAYLOAD_SIZE) { + diag_set(IllegalParams, "Payload should be <= %d", + MAX_PAYLOAD_SIZE); + return -1; + } + return swim_update_member_payload(swim, swim->self, payload, + payload_size, 1); +} + int swim_add_member(struct swim 
*swim, const char *uri, const struct tt_uuid *uuid) { @@ -1518,7 +1645,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) return -1; struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { - member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0); + member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0, + NULL, -1); return member == NULL ? -1 : 0; } diag_set(SwimError, "%s a member with such UUID already exists", @@ -1754,3 +1882,10 @@ swim_member_incarnation(const struct swim_member *member) { return member->incarnation; } + +const char * +swim_member_payload(const struct swim_member *member, uint16_t *size) +{ + *size = member->payload_size; + return member->payload; +} diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 09d933b83..6a219d131 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -104,6 +104,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, double swim_ack_timeout(const struct swim *swim); +/** Set payload to disseminate over the cluster. */ +int +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size); + /** * Stop listening and broadcasting messages, cleanup all internal * structures, free memory. @@ -220,6 +224,10 @@ swim_member_uuid(const struct swim_member *member); uint64_t swim_member_incarnation(const struct swim_member *member); +/** Member's payload. 
*/ +const char * +swim_member_payload(const struct swim_member *member, uint16_t *size); + #if defined(__cplusplus) } #endif diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index d84550663..16034a25b 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -160,6 +160,7 @@ swim_member_def_create(struct swim_member_def *def) memset(def, 0, sizeof(*def)); def->addr.sin_family = AF_INET; def->status = MEMBER_ALIVE; + def->payload_size = -1; } /** @@ -179,6 +180,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, struct swim_member_def *def) { uint64_t tmp; + uint32_t len; switch (key) { case SWIM_MEMBER_STATUS: if (swim_decode_uint(pos, end, &tmp, prefix, @@ -210,6 +212,17 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member incarnation") != 0) return -1; break; + case SWIM_MEMBER_PAYLOAD: + if (swim_decode_bin(&def->payload, &len, pos, end, prefix, + "member payload") != 0) + return -1; + if (len > MAX_PAYLOAD_SIZE) { + diag_set(SwimError, "%s member payload size should be "\ + "<= %d", prefix, MAX_PAYLOAD_SIZE); + return -1; + } + def->payload_size = (int) len; + break; default: unreachable(); } @@ -330,10 +343,22 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, header->v_anti_entropy = mp_bswap_u16(batch_size); } +void +swim_member_payload_bin_create(struct swim_member_payload_bin *bin) +{ + bin->k_payload = SWIM_MEMBER_PAYLOAD; + bin->m_payload_size = 0xc5; +} + +void +swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, uint16_t size) +{ + bin->v_payload_size = mp_bswap_u16(size); +} + void swim_passport_bin_create(struct swim_passport_bin *passport) { - passport->m_header = 0x85; passport->k_status = SWIM_MEMBER_STATUS; passport->k_addr = SWIM_MEMBER_ADDRESS; passport->m_addr = 0xce; @@ -350,8 +375,10 @@ void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum 
swim_member_status status, uint64_t incarnation) + enum swim_member_status status, uint64_t incarnation, + bool is_payload_needed) { + passport->m_header = 0x85 + is_payload_needed; passport->v_status = status; passport->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr)); passport->v_port = mp_bswap_u16(ntohs(addr->sin_port)); diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index ab4057185..607ac80dd 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -38,6 +38,11 @@ #include <stdbool.h> #include "swim_constants.h" +enum { + /** Reserve 272 bytes for headers. */ + MAX_PAYLOAD_SIZE = 1200, +}; + /** * SWIM binary protocol structures and helpers. Below is a picture * of a SWIM message template: @@ -67,7 +72,8 @@ * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint | + * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | * | ], | @@ -80,7 +86,8 @@ * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint | + * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | * | ], | @@ -103,6 +110,8 @@ struct swim_member_def { struct sockaddr_in addr; uint64_t incarnation; enum swim_member_status status; + const char *payload; + int payload_size; }; /** Initialize the definition with default values. */ @@ -242,6 +251,7 @@ enum swim_member_key { SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, SWIM_MEMBER_INCARNATION, + SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, }; @@ -305,6 +315,30 @@ struct PACKED swim_passport_bin { uint64_t v_incarnation; }; +/** + * SWIM member's payload header. Payload data should be encoded + * right after it. 
+ */ +struct PACKED swim_member_payload_bin { + /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */ + uint8_t k_payload; + /** mp_encode_bin(16bit bin header) */ + uint8_t m_payload_size; + uint16_t v_payload_size; + /** Payload data ... */ +}; + +/** Initialize payload record. */ +void +swim_member_payload_bin_create(struct swim_member_payload_bin *bin); + +/** + * Fill a previously created payload record with an actual size. + */ +void +swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, + uint16_t size); + /** Initialize a member's binary passport. */ void swim_passport_bin_create(struct swim_passport_bin *passport); @@ -319,7 +353,8 @@ void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation); + enum swim_member_status status, uint64_t incarnation, + bool is_payload_needed); /** }}} Anti-entropy component */ diff --git a/test/unit/swim.c b/test/unit/swim.c index 48aea2f07..10d2b0bbd 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -36,6 +36,7 @@ #include "uri/uri.h" #include "swim/swim.h" #include "swim/swim_ev.h" +#include "swim/swim_proto.h" #include "swim_test_transport.h" #include "swim_test_ev.h" #include "swim_test_utils.h" @@ -642,10 +643,200 @@ swim_test_broadcast(void) swim_finish_test(); } +static void +swim_test_payload_basic(void) +{ + swim_start_test(11); + uint16_t size, cluster_size = 3; + struct swim_cluster *cluster = swim_cluster_new(cluster_size); + for (int i = 0; i < cluster_size; ++i) { + for (int j = i + 1; j < cluster_size; ++j) + swim_cluster_interconnect(cluster, i, j); + } + ok(swim_cluster_member_payload(cluster, 0, 0, &size) == NULL && + size == 0, "no payload by default"); + is(swim_cluster_member_set_payload(cluster, 0, NULL, 1300), -1, + "can not set too big payload"); + ok(swim_error_check_match("Payload should be <="), "diag says too big"); + + const char *s0_payload = "S1 payload"; + uint16_t 
s0_payload_size = strlen(s0_payload) + 1; + is(swim_cluster_member_set_payload(cluster, 0, s0_payload, + s0_payload_size), 0, + "payload is set"); + is(swim_cluster_member_incarnation(cluster, 0, 0), 1, + "incarnation is incremeted on each payload update"); + const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size); + ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0, + "payload is successfully obtained back"); + + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, + s0_payload_size, cluster_size), + 0, "payload is disseminated"); + s0_payload = "S1 second version of payload"; + s0_payload_size = strlen(s0_payload) + 1; + is(swim_cluster_member_set_payload(cluster, 0, s0_payload, + s0_payload_size), 0, + "payload is changed"); + is(swim_cluster_member_incarnation(cluster, 0, 0), 2, + "incarnation is incremeted on each payload update"); + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, + s0_payload_size, cluster_size), + 0, "second payload is disseminated"); + /* + * Test that new incarnations help to rewrite the old + * payload from anti-entropy. + */ + swim_cluster_set_drop(cluster, 0, 100); + s0_payload = "S1 third version of payload"; + s0_payload_size = strlen(s0_payload) + 1; + fail_if(swim_cluster_member_set_payload(cluster, 0, s0_payload, + s0_payload_size) != 0); + /* Wait at least one round until payload TTL gets 0. 
*/ + swim_run_for(3); + swim_cluster_set_drop(cluster, 0, 0); + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, + s0_payload_size, cluster_size), + 0, "third payload is disseminated via anti-entropy"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + +static void +swim_test_payload_refutation(void) +{ + swim_start_test(11); + uint16_t size, cluster_size = 3; + struct swim_cluster *cluster = swim_cluster_new(cluster_size); + swim_cluster_set_ack_timeout(cluster, 1); + for (int i = 0; i < cluster_size; ++i) { + for (int j = i + 1; j < cluster_size; ++j) + swim_cluster_interconnect(cluster, i, j); + } + const char *s0_old_payload = "s0 payload"; + uint16_t s0_old_payload_size = strlen(s0_old_payload) + 1; + fail_if(swim_cluster_member_set_payload(cluster, 0, s0_old_payload, + s0_old_payload_size) != 0); + fail_if(swim_cluster_wait_payload_everywhere(cluster, 0, s0_old_payload, + s0_old_payload_size, + 3) != 0); + /* + * The test checks the following case. Assume there are 3 + * nodes: S1, S2, S3. They all know each other. S1 sets + * new payload, S2 and S3 knows that. They all see that S1 + * has incarnation 1 and payload P1. + * + * Now S1 changes payload to P2. Its incarnation becomes + * 2. During next entire round its round messages are + * lost, however ACKs work ok. 
+ */ + const char *s0_new_payload = "s0 second payload"; + uint16_t s0_new_payload_size = strlen(s0_new_payload); + fail_if(swim_cluster_member_set_payload(cluster, 0, s0_new_payload, + s0_new_payload_size) != 0); + int components[2] = {SWIM_DISSEMINATION, SWIM_ANTI_ENTROPY}; + swim_cluster_drop_components(cluster, 0, components, 2); + swim_run_for(3); + swim_cluster_drop_components(cluster, 0, NULL, 0); + + is(swim_cluster_member_incarnation(cluster, 1, 0), 2, + "S2 sees new incarnation of S1"); + is(swim_cluster_member_incarnation(cluster, 2, 0), 2, + "S3 does the same"); + + const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "but S2 does not known the new payload"); + + tmp = swim_cluster_member_payload(cluster, 2, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "as well as S3"); + + /* Restore normal ACK timeout. */ + swim_cluster_set_ack_timeout(cluster, 30); + + /* + * Now S1's payload TTL is 0, but via ACKs S1 sent its new + * incarnation to S2 and S3. Despite that they should + * apply new S1's payload via anti-entropy. Next lines + * test that: + * + * 1) S2 can apply new S1's payload from S1's + * anti-entropy; + * + * 2) S2 will not receive the old S1's payload from S3. + * S3 knows, that its payload is outdated, and should + * not send it; + * + * 3) S3 can apply new S1's payload from S2's + * anti-entropy. Note, that here S3 applies the payload + * not directly from the originator. It is the most + * complex case. + * + * Next lines test the case (1). + */ + + /* S3 does not participate in the test (1).
*/ + swim_cluster_set_drop(cluster, 2, 100); + swim_run_for(3); + + tmp = swim_cluster_member_payload(cluster, 1, 0, &size); + ok(size == s0_new_payload_size && + memcmp(tmp, s0_new_payload, size) == 0, + "S2 learned S1's payload via anti-entropy"); + is(swim_cluster_member_incarnation(cluster, 1, 0), 2, + "incarnation still is the same"); + + tmp = swim_cluster_member_payload(cluster, 2, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "S3 was blocked and does not know anything"); + is(swim_cluster_member_incarnation(cluster, 2, 0), 2, + "incarnation still is the same"); + + /* S1 will not participate in the tests further. */ + swim_cluster_set_drop(cluster, 0, 100); + + /* + * Now check the case (2) - S3 will not send outdated + * version of S1's payload. To maintain the experimental + * integrity S1 and S2 are silent. Only S3 sends packets. + */ + swim_cluster_set_drop(cluster, 2, 0); + swim_cluster_set_drop_out(cluster, 1, 100); + swim_run_for(3); + + tmp = swim_cluster_member_payload(cluster, 1, 0, &size); + ok(size == s0_new_payload_size && + memcmp(tmp, s0_new_payload, size) == 0, + "S2 keeps the same new S1's payload, S3 did not rewrite it"); + + tmp = swim_cluster_member_payload(cluster, 2, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "S3 still does not know anything"); + + /* + * Now check the case (3) - S3 accepts new S1's payload + * from S2. Even knowing the same S1's incarnation. 
+ */ + swim_cluster_set_drop(cluster, 1, 0); + swim_cluster_set_drop_out(cluster, 2, 100); + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_new_payload, + s0_new_payload_size, 3), 0, + "S3 learns S1's payload from S2"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(15); + swim_start_test(17); (void) ap; swim_test_ev_init(); @@ -666,6 +857,8 @@ main_f(va_list ap) swim_test_quit(); swim_test_uri_update(); swim_test_broadcast(); + swim_test_payload_basic(); + swim_test_payload_refutation(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index d315f181f..8b0ab54aa 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..15 +1..17 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -147,4 +147,34 @@ ok 14 - subtests ok 6 - fullmesh is reached, and no one link was added explicitly ok 15 - subtests *** swim_test_broadcast: done *** + *** swim_test_payload_basic *** + 1..11 + ok 1 - no payload by default + ok 2 - can not set too big payload + ok 3 - diag says too big + ok 4 - payload is set + ok 5 - incarnation is incremeted on each payload update + ok 6 - payload is successfully obtained back + ok 7 - payload is disseminated + ok 8 - payload is changed + ok 9 - incarnation is incremeted on each payload update + ok 10 - second payload is disseminated + ok 11 - third payload is disseminated via anti-entropy +ok 16 - subtests + *** swim_test_payload_basic: done *** + *** swim_test_payload_refutation *** + 1..11 + ok 1 - S2 sees new incarnation of S1 + ok 2 - S3 does the same + ok 3 - but S2 does not known the new payload + ok 4 - as well as S3 + ok 5 - S2 learned S1's payload via anti-entropy + ok 6 - incarnation still is the same + ok 7 - S3 was blocked and does not know anything + ok 8 - incarnation still is the same + ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it + ok
10 - S3 still does not know anything + ok 11 - S3 learns S1's payload from S2 +ok 17 - subtests + *** swim_test_payload_refutation: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index fd528d166..45570cce5 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -191,6 +191,27 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, return swim_member_incarnation(m); } +const char * +swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, + int member_id, uint16_t *size) +{ + const struct swim_member *m = + swim_cluster_member_view(cluster, node_id, member_id); + if (m == NULL) { + *size = 0; + return NULL; + } + return swim_member_payload(m, size); +} + +int +swim_cluster_member_set_payload(struct swim_cluster *cluster, int i, + const char *payload, uint16_t size) +{ + struct swim *s = swim_cluster_node(cluster, i); + return swim_set_payload(s, payload, size); +} + struct swim * swim_cluster_node(struct swim_cluster *cluster, int i) { @@ -506,6 +527,13 @@ struct swim_member_template { */ bool need_check_incarnation; uint64_t incarnation; + /** + * True, if the payload should be checked to be equal to + * @a payload of size @a payload_size. + */ + bool need_check_payload; + const char *payload; + uint16_t payload_size; }; /** Build member template. No checks are set. */ @@ -542,6 +570,19 @@ swim_member_template_set_incarnation(struct swim_member_template *t, t->incarnation = incarnation; } +/** + * Set that the member template should be used to check member + * payload. + */ +static inline void +swim_member_template_set_payload(struct swim_member_template *t, + const char *payload, uint16_t payload_size) +{ + t->need_check_payload = true; + t->payload = payload; + t->payload_size = payload_size; +} + /** Callback to check that a member matches a template.
*/ static bool swim_loop_check_member(struct swim_cluster *cluster, void *data) @@ -551,17 +592,26 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) swim_cluster_member_view(cluster, t->node_id, t->member_id); enum swim_member_status status; uint64_t incarnation; + const char *payload; + uint16_t payload_size; if (m != NULL) { status = swim_member_status(m); incarnation = swim_member_incarnation(m); + payload = swim_member_payload(m, &payload_size); } else { status = swim_member_status_MAX; incarnation = 0; + payload = NULL; + payload_size = 0; } if (t->need_check_status && status != t->status) return false; if (t->need_check_incarnation && incarnation != t->incarnation) return false; + if (t->need_check_payload && + (payload_size != t->payload_size || + memcmp(payload, t->payload, payload_size) != 0)) + return false; return true; } @@ -644,6 +694,18 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, swim_loop_check_member_everywhere, &t); } +int +swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, + int member_id, const char *payload, + uint16_t payload_size, double timeout) +{ + struct swim_member_template t; + swim_member_template_create(&t, -1, member_id); + swim_member_template_set_payload(&t, payload, payload_size); + return swim_wait_timeout(timeout, cluster, + swim_loop_check_member_everywhere, &t); +} + bool swim_error_check_match(const char *msg) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 6ea136e36..100a67e0c 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -141,6 +141,14 @@ uint64_t swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, int member_id); +const char * +swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, + int member_id, uint16_t *size); + +int +swim_cluster_member_set_payload(struct swim_cluster *cluster, int i, + const char *payload, uint16_t size); + /** * Check if in 
the cluster every instance knowns the about other * instances. @@ -192,6 +200,16 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, int member_id, uint64_t incarnation, double timeout); +/** + * Wait until a member with id @a member_id is seen with + * @a payload of size @a payload_size in the membership table of + * every instance in @a cluster. At most @a timeout seconds. + */ +int +swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, + int member_id, const char *payload, + uint16_t payload_size, double timeout); + /** Process SWIM events for @a duration fake seconds. */ void swim_run_for(double duration); -- 2.17.2 (Apple Git-113)
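The payload record header declared in swim_proto.h in this patch (k_payload, m_payload_size, v_payload_size) can be illustrated with a minimal, self-contained sketch. It assumes the header is a one-byte key, the MessagePack bin16 marker 0xc5, and a big-endian 16-bit length. The key value and every example_* name below are hypothetical and are not taken from the SWIM sources.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical key value; the real SWIM_MEMBER_PAYLOAD is defined in swim_proto.h. */
enum { EXAMPLE_KEY_PAYLOAD = 5 };
/* MessagePack marker of a bin whose length is stored in 16 bits. */
enum { EXAMPLE_MP_BIN16 = 0xc5 };
/* Key byte + marker byte + two length bytes. */
enum { EXAMPLE_PAYLOAD_HEADER_SIZE = 4 };

/*
 * Write the payload record header into buf, which must hold at
 * least EXAMPLE_PAYLOAD_HEADER_SIZE bytes. The payload bytes
 * themselves would follow the header. Returns the header size.
 */
static size_t
example_encode_payload_header(uint8_t *buf, uint16_t payload_size)
{
	buf[0] = EXAMPLE_KEY_PAYLOAD;
	buf[1] = EXAMPLE_MP_BIN16;
	/* MessagePack stores multi-byte lengths in big-endian order. */
	buf[2] = (uint8_t)(payload_size >> 8);
	buf[3] = (uint8_t)(payload_size & 0xff);
	return EXAMPLE_PAYLOAD_HEADER_SIZE;
}

/* Read the payload size back from a header written by the encoder above. */
static uint16_t
example_decode_payload_size(const uint8_t *buf)
{
	assert(buf[1] == EXAMPLE_MP_BIN16);
	return (uint16_t)((buf[2] << 8) | buf[3]);
}
```

A fixed bin16 header costs one or two extra bytes for small payloads compared with bin8, but it keeps the record header a constant size, so it can be prepared once and only the length patched per member.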
* [tarantool-patches] Re: [PATCH 6/6] swim: introduce payload
  2019-04-11 22:22 ` [tarantool-patches] [PATCH 6/6] swim: introduce payload Vladislav Shpilevoy
@ 2019-04-18 15:12   ` Konstantin Osipov
  2019-04-18 17:43     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 27+ messages in thread
From: Konstantin Osipov @ 2019-04-18 15:12 UTC (permalink / raw)
  To: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]:
> Payload is arbitrary user data disseminated over the cluster
> along with other member attributes.

Please see my comments inline.

> Part of #3234
> ---
>  src/lib/swim/swim.c         | 155 ++++++++++++++++++++++++++--
>  src/lib/swim/swim.h         |   8 ++
>  src/lib/swim/swim_proto.c   |  31 +++++-
>  src/lib/swim/swim_proto.h   |  41 +++++++-
>  test/unit/swim.c            | 195 +++++++++++++++++++++++++++++++++++-
>  test/unit/swim.result       |  32 +++++-
>  test/unit/swim_test_utils.c |  62 ++++++++++++
>  test/unit/swim_test_utils.h |  18 ++++
>  8 files changed, 525 insertions(+), 17 deletions(-)
>
> diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
> index 2dac6eedd..2be1c846b 100644
> --- a/src/lib/swim/swim.c
> +++ b/src/lib/swim/swim.c
> @@ -308,6 +308,38 @@ struct swim_member {
>  	 * allow other members to learn the dead status.
>  	 */
>  	int status_ttl;
> +	/** Arbitrary user data, disseminated on each change. */
> +	char *payload;
> +	/** Payload size, in bytes. */
> +	uint16_t payload_size;
> +	/**
> +	 * True, if the payload is thought to be of the most
> +	 * actual version. In such a case it can be disseminated
> +	 * farther. Otherwise @a payload is suspected to be

farther -> further.

> +	 * outdated and can be updated in two cases only:
> +	 *
> +	 * 1) when it is received with a bigger incarnation from
> +	 *    anywhere;
> +	 *
> +	 * 2) when it is received with the same incarnation, but
> +	 *    local payload is outdated.
> +	 *
> +	 * A payload can become outdated, if anyhow a new
> +	 * incarnation of the member has been learned, but not a
> +	 * new payload.

Please describe how this can happen: the payload message is lost,
but the server with the new payload responded to ping request, and
a new incarnation arrived along with the ack.

> In such a case it can't be said exactly

Nit: in such a case is very rarely used. Why not simply: in this
case?

> +	 * whether the member has updated payload, or another
> +	 * attribute. The only way here is to wait until the most
> +	 * actual payload will be received from another instance.
> +	 * Note, that such an instance always exists - the payload
> +	 * originator instance.
> +	 */
> +	bool is_payload_up_to_date;
> +	/**
> +	 * TTL of payload. At most this number of times payload is
> +	 * sent as a part of dissemination component. Reset on
> +	 * each payload update.
> +	 */
> +	int payload_ttl;

As agreed, let's rename ttl to ttd across the board and update the
comments to say that ttd is time to disseminate.

> +/** Update member's payload, register a corresponding event. */
> +static inline int
> +swim_update_member_payload(struct swim *swim, struct swim_member *member,
> +			   const char *payload, uint16_t payload_size,
> +			   int incarnation_increment)

Passing incarnation_increment raises a lot of questions when
reading the code. I would not use this function from the
constructor - I don't see any issue in copy-pasting 3 lines of
code into the constructor to make both branches simpler.

Or I would ban passing payload to the constructor and make this
function public.

>  static int
>  swim_encode_member(struct swim_packet *packet, struct swim_member *m,
> -		   struct swim_passport_bin *passport)
> +		   struct swim_passport_bin *passport,
> +		   struct swim_member_payload_bin *payload_header,
> +		   bool is_payload_needed)

Please rename is_payload_needed -> encode_payload. There is no
comment for encode_payload and how it is used.

I don't see why you can't move all the decision making about
whether to encode the payload or not into this function.

The payload should be encoded if:
- it's not null
- payload ttl is greater than zero
- it's not trustworthy. (is_payload_up_to_date). I would btw
  rename is_payload_up_to_date to is_payload_trustworthy - this
  name would be closer to truth.

Why can't you look at these conditions once in swim_encode_member
rather than evaluate them outside this function?

BTW, there is no harm in encoding an empty payload all the time
(e.g. mp_bin of size 0). This would make the code simpler - you
would only need to look at payload_size to see if payload exists.

> +int
> +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
> +{
> +	if (payload_size > MAX_PAYLOAD_SIZE) {
> +		diag_set(IllegalParams, "Payload should be <= %d",
> +			 MAX_PAYLOAD_SIZE);
> +		return -1;
> +	}
> +	return swim_update_member_payload(swim, swim->self, payload,
> +					  payload_size, 1);
> +}

Like I said above, if there is swim_set_payload, there is no need
to supply payload in swim_new_member().

> +	def->payload_size = -1;

Ugh.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
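
The payload update rules from the struct comment quoted in this review (accept a payload that arrives with a bigger incarnation, or with the same incarnation when the local copy is known to be outdated) can be sketched as a small pure function. This is only an illustration under those stated rules. All example_* names are hypothetical, and the "mark outdated" outcome models the case the review asks to document: a new incarnation learned from an ack that carried no payload.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum example_payload_action {
	/* Leave the locally stored payload and its flag untouched. */
	EXAMPLE_KEEP,
	/* Replace the local payload with the received one. */
	EXAMPLE_APPLY,
	/* A newer incarnation was seen without a payload: stop trusting the local copy. */
	EXAMPLE_MARK_OUTDATED,
};

/*
 * Decide what to do with the local copy of a member's payload
 * when a message about that member is received.
 */
static enum example_payload_action
example_decide_payload(uint64_t local_incarnation, bool local_up_to_date,
		       uint64_t recv_incarnation, bool recv_has_payload)
{
	/* Messages with an older incarnation are assumed to be filtered out earlier. */
	assert(recv_incarnation >= local_incarnation);
	if (recv_incarnation > local_incarnation) {
		if (recv_has_payload)
			return EXAMPLE_APPLY;
		return local_up_to_date ? EXAMPLE_MARK_OUTDATED : EXAMPLE_KEEP;
	}
	/* Same incarnation: only an outdated local copy may be overwritten. */
	if (!local_up_to_date && recv_has_payload)
		return EXAMPLE_APPLY;
	return EXAMPLE_KEEP;
}
```

With these rules a node that learned incarnation 2 from an ack keeps the old payload flagged as outdated, and later accepts the new payload from any peer even though the incarnation it arrives with is the same.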
* [tarantool-patches] Re: [PATCH 6/6] swim: introduce payload 2019-04-18 15:12 ` [tarantool-patches] " Konstantin Osipov @ 2019-04-18 17:43 ` Vladislav Shpilevoy 2019-04-18 18:03 ` Konstantin Osipov 0 siblings, 1 reply; 27+ messages in thread From: Vladislav Shpilevoy @ 2019-04-18 17:43 UTC (permalink / raw) To: tarantool-patches, Konstantin Osipov On 18/04/2019 18:12, Konstantin Osipov wrote: > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/12 01:25]: >> Payload is arbitrary user data disseminated over the cluster >> along with other member attributes. >> Part of #3234 >> --- >> src/lib/swim/swim.c | 155 ++++++++++++++++++++++++++-- >> src/lib/swim/swim.h | 8 ++ >> src/lib/swim/swim_proto.c | 31 +++++- >> src/lib/swim/swim_proto.h | 41 +++++++- >> test/unit/swim.c | 195 +++++++++++++++++++++++++++++++++++- >> test/unit/swim.result | 32 +++++- >> test/unit/swim_test_utils.c | 62 ++++++++++++ >> test/unit/swim_test_utils.h | 18 ++++ >> 8 files changed, 525 insertions(+), 17 deletions(-)>> + * whether the member has updated payload, or another >> + * attribute. The only way here is to wait until the most >> + * actual payload will be received from another instance. >> + * Note, that such an instance always exists - the payload >> + * originator instance. >> + */ >> + bool is_payload_up_to_date; >> + /** >> + * TTL of payload. At most this number of times payload is >> + * sent as a part of dissemination component. Reset on >> + * each payload update. >> + */ >> + int payload_ttl; > > As agreed, let's rename ttl to ttd across the board and update the > comments to say that ttd is time to disseminate. Cool, done in a separate commit. > >> +/** Update member's payload, register a corresponding event. 
*/ >> +static inline int >> +swim_update_member_payload(struct swim *swim, struct swim_member *member, >> + const char *payload, uint16_t payload_size, >> + int incarnation_increment) > > Passing incarnation_increment raises a lot of questions when > reading the code. We already use it for address update, it simplified a lot of things, and most importantly encapsulated incarnation + TTD set into one place. I tried to avoid this argument for address, but it looked much worse, when TTD is reset in too many places - it is easy to miss something. The main goal I pursued was encapsulation of incarnation update and event registration on small attribute changes. Now I have these functions hiding that task inside: swim_update_member_inc_status swim_update_member_payload swim_update_member_addr With your way incarnation and member attribute update business will spread among literally every function touching members. > I would not use this function from the > constructor - I don't see any issue in copy-pasting 3 lines of > code into the constructor to make both branches simpler. It is not 3 lines, generally it is 6-7 lines per each swim_new_member() to call update_payload() afterward. I have 4 invocations of swim_new_member(), two of them pass payload and its size. Without encapsulated processing of payload update I would be forced to add more checks into these two places with larger code duplication than you thought, which I would not want to do. These places are swim_upsert_member() and swim_cfg() - the functions are very sensitive to code duplication in terms of readability. During SWIM development I tried to keep them as clean and short as possible. I tried your way right now again, but it just looks much worse. Dropped. > > Or I would ban passing payload to the constructor and make this > function public.
> >> static int >> swim_encode_member(struct swim_packet *packet, struct swim_member *m, >> - struct swim_passport_bin *passport) >> + struct swim_passport_bin *passport, >> + struct swim_member_payload_bin *payload_header, >> + bool is_payload_needed) > > Please rename is_payload_needed -> encode_payload. I followed our rules on flags naming: is_<...>. But ok, done. > There is no > comment for encode_payload and how it is used. Added a comment. > I don't see why you can't move all the decision making about > whether to encode the payload or not into this function. > > The payload should be encoded if: > - it's not null > - payload ttl is greater than zero > - it's not trustworthy. (is_payload_up_to_date). I would btw > rename is_payload_up_to_date to is_payload_trustworthy - this > name would be closer to truth. > > Why can't you look at these conditions once in swim_encode_member > rather than evaluate them outside this function? Because anti-entropy and dissemination have different conditions when payload should be encoded. For anti-entropy it is enough for the payload to be up to date. For dissemination, TTD > 0 is additionally required. We verbally decided to always check for is_payload_up_to_date inside the function, and use the argument encode_payload for the dissemination component only. > > BTW, there is no harm in encoding an empty payload all the time > (e.g. mp_bin of size 0). This would make the code simpler - you > would only need to look at payload_size to see if payload exists. This is what I do. I encode payload even when it is empty and up-to-date.
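
The difference between the two components described in the reply above can be written down as a single predicate. This is a minimal sketch, assuming exactly the two conditions stated there: anti-entropy sends a payload whenever the local copy is up to date, and dissemination additionally requires a positive TTD. The enum and function names are hypothetical and do not come from swim.c.

```c
#include <assert.h>
#include <stdbool.h>

/* The two SWIM components that may carry a member's payload. */
enum example_component {
	EXAMPLE_ANTI_ENTROPY,
	EXAMPLE_DISSEMINATION,
};

/*
 * Return true if the payload of a member should be put into a
 * packet section built by the given component.
 */
static bool
example_should_encode_payload(enum example_component component,
			      bool is_payload_up_to_date, int payload_ttd)
{
	assert(payload_ttd >= 0);
	/* An outdated payload is never forwarded by any component. */
	if (!is_payload_up_to_date)
		return false;
	/* Dissemination stops sending once the TTD counter is exhausted. */
	if (component == EXAMPLE_DISSEMINATION)
		return payload_ttd > 0;
	return true;
}
```

Checking the up-to-date flag first matches the agreement in the thread that this check lives inside the encoding function, while the TTD condition is supplied by the dissemination caller only.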
> >> +int >> +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size) >> +{ >> + if (payload_size > MAX_PAYLOAD_SIZE) { >> + diag_set(IllegalParams, "Payload should be <= %d", >> + MAX_PAYLOAD_SIZE); >> + return -1; >> + } >> + return swim_update_member_payload(swim, swim->self, payload, >> + payload_size, 1); >> +} > > Like I said above, if there is swim_set_paylaod, there is no need > to supply payload in swim_new_member(). Swim_set_payload does not work on foreign members, it takes 'self' only. During members decoding and update I need to reset payloads of other members. > >> + def->payload_size = -1; > > Ugh. I need a way to detect whether payload was found in a packet and decoded. It is not sent always, because 1) dissemination does not send it when TTD == 0 or it is outdated, 2) anti-entropy does not send outdated payloads as well. > > > -- > Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 > http://tarantool.io - www.twitter.com/kostja_osipov > A new version: =================================================================== diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 3e64e4c91..22760cdd7 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -309,6 +309,44 @@ struct swim_member { * allow other members to learn the dead status. */ int status_ttd; + /** Arbitrary user data, disseminated on each change. */ + char *payload; + /** Payload size, in bytes. */ + uint16_t payload_size; + /** + * True, if the payload is thought to be of the most + * actual version. In such a case it can be disseminated + * further. Otherwise @a payload is suspected to be + * outdated and can be updated in two cases only: + * + * 1) when it is received with a bigger incarnation from + * anywhere; + * + * 2) when it is received with the same incarnation, but + * local payload is outdated. + * + * A payload can become outdated, if anyhow a new + * incarnation of the member has been learned, but not a + * new payload. 
For example, a message with new payload + * could be lost, and at the same time this instance + * responds to a ping with newly incarnated ack. The ack + * receiver will learn the new incarnation, but not the + * new payload. + * + * In this case it can't be said exactly whether the + * member has updated payload, or another attribute. The + * only way here is to wait until the most actual payload + * will be received from another instance. Note, that such + * an instance always exists - the payload originator + * instance. + */ + bool is_payload_up_to_date; + /** + * TTD of payload. At most this number of times payload is + * sent as a part of dissemination component. Reset on + * each payload update. + */ + int payload_ttd; /** * All created events are put into a queue sorted by event * time. @@ -524,6 +562,34 @@ swim_by_scheduler(struct swim_scheduler *scheduler) return container_of(scheduler, struct swim, scheduler); } +/** Update member's payload, register a corresponding event. */ +static inline int +swim_update_member_payload(struct swim *swim, struct swim_member *member, + const char *payload, uint16_t payload_size, + int incarnation_increment) +{ + assert(payload_size <= MAX_PAYLOAD_SIZE); + char *new_payload; + if (payload_size > 0) { + new_payload = (char *) realloc(member->payload, payload_size); + if (new_payload == NULL) { + diag_set(OutOfMemory, payload_size, "realloc", "new_payload"); + return -1; + } + memcpy(new_payload, payload, payload_size); + } else { + free(member->payload); + new_payload = NULL; + } + member->payload = new_payload; + member->payload_size = payload_size; + member->payload_ttd = mh_size(swim->members); + member->incarnation += incarnation_increment; + member->is_payload_up_to_date = true; + swim_on_member_update(swim, member); + return 0; +} + /** * Once a ping is sent, the member should start waiting for an * ACK. @@ -557,6 +623,7 @@ swim_member_delete(struct swim_member *member) /* Dissemination component. 
*/ assert(rlist_empty(&member->in_dissemination_queue)); + free(member->payload); free(member); } @@ -638,7 +705,7 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid) static struct swim_member * swim_new_member(struct swim *swim, const struct sockaddr_in *addr, const struct tt_uuid *uuid, enum swim_member_status status, - uint64_t incarnation) + uint64_t incarnation, const char *payload, int payload_size) { int new_bsize = sizeof(swim->shuffled[0]) * (mh_size(swim->members) + 1); @@ -676,6 +743,12 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr, /* Dissemination component. */ swim_on_member_update(swim, member); + if (payload_size >= 0 && + swim_update_member_payload(swim, member, payload, + payload_size, 0) != 0) { + swim_delete_member(swim, member); + return NULL; + } say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim), swim_uuid_str(&member->uuid), mh_size(swim->members)); @@ -735,22 +808,40 @@ swim_new_round(struct swim *swim) /** * Encode one member into @a packet using @a passport structure. + * Note that this function does not make a decision whether + * payload should be encoded, because its callers have different + * conditions for that. The anti-entropy needs the payload be + * up-to-date. The dissemination component additionally needs + * TTD > 0. * @retval 0 Success, encoded. * @retval -1 Not enough memory in the packet. */ static int swim_encode_member(struct swim_packet *packet, struct swim_member *m, - struct swim_passport_bin *passport) + struct swim_passport_bin *passport, + struct swim_member_payload_bin *payload_header, + bool encode_payload) { /* The headers should be initialized. 
*/ assert(passport->k_status == SWIM_MEMBER_STATUS); + assert(payload_header->k_payload == SWIM_MEMBER_PAYLOAD); int size = sizeof(*passport); + encode_payload = encode_payload && m->is_payload_up_to_date; + if (encode_payload) + size += sizeof(*payload_header) + m->payload_size; char *pos = swim_packet_alloc(packet, size); if (pos == NULL) return -1; swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status, - m->incarnation); + m->incarnation, encode_payload); memcpy(pos, passport, sizeof(*passport)); + if (encode_payload) { + pos += sizeof(*passport); + swim_member_payload_bin_fill(payload_header, m->payload_size); + memcpy(pos, payload_header, sizeof(*payload_header)); + pos += sizeof(*payload_header); + memcpy(pos, m->payload, m->payload_size); + } return 0; } @@ -764,17 +855,20 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet) { struct swim_anti_entropy_header_bin ae_header_bin; struct swim_passport_bin passport_bin; + struct swim_member_payload_bin payload_header; char *header = swim_packet_alloc(packet, sizeof(ae_header_bin)); if (header == NULL) return 0; swim_passport_bin_create(&passport_bin); + swim_member_payload_bin_create(&payload_header); struct mh_swim_table_t *t = swim->members; int i = 0, member_count = mh_size(t); int rnd = swim_scaled_rand(0, member_count - 1); for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t); i < member_count; ++i) { struct swim_member *m = *mh_swim_table_node(t, rc); - if (swim_encode_member(packet, m, &passport_bin) != 0) + if (swim_encode_member(packet, m, &passport_bin, + &payload_header, true) != 0) break; /* * First random member could be chosen too close @@ -834,16 +928,20 @@ static int swim_encode_dissemination(struct swim *swim, struct swim_packet *packet) { struct swim_diss_header_bin diss_header_bin; + struct swim_member_payload_bin payload_header; struct swim_passport_bin passport_bin; char *header = swim_packet_alloc(packet, sizeof(diss_header_bin)); if (header == 
NULL) return 0; swim_passport_bin_create(&passport_bin); + swim_member_payload_bin_create(&payload_header); int i = 0; struct swim_member *m; rlist_foreach_entry(m, &swim->dissemination_queue, in_dissemination_queue) { - if (swim_encode_member(packet, m, &passport_bin) != 0) + if (swim_encode_member(packet, m, &passport_bin, + &payload_header, + m->payload_ttd > 0) != 0) break; ++i; } @@ -891,6 +989,10 @@ swim_decrease_event_ttd(struct swim *swim) rlist_foreach_entry_safe(member, &swim->dissemination_queue, in_dissemination_queue, tmp) { + if (member->payload_ttd > 0) { + if (--member->payload_ttd == 0) + swim_cached_round_msg_invalidate(swim); + } if (--member->status_ttd == 0) { rlist_del_entry(member, in_dissemination_queue); swim_cached_round_msg_invalidate(swim); @@ -1066,8 +1168,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def, { assert(member != swim->self); assert(def->incarnation >= member->incarnation); - if (def->incarnation > member->incarnation) + /* + * Payload update rules are simple: it can be updated + * either if the new payload has a bigger incarnation, or + * the same incarnation, but local payload is outdated. + */ + bool encode_payload = false; + if (def->incarnation > member->incarnation) { swim_update_member_addr(swim, member, &def->addr, 0); + if (def->payload_size >= 0) { + encode_payload = true; + } else if (member->is_payload_up_to_date) { + member->is_payload_up_to_date = false; + swim_on_member_update(swim, member); + } + } else if (! member->is_payload_up_to_date && def->payload_size >= 0) { + encode_payload = true; + } + if (encode_payload && + swim_update_member_payload(swim, member, def->payload, + def->payload_size, 0) != 0) { + /* Not such a critical error. 
*/ + diag_log(); + } swim_update_member_inc_status(swim, member, def->status, def->incarnation); } @@ -1108,7 +1231,8 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, goto skip; } *result = swim_new_member(swim, &def->addr, &def->uuid, - def->status, def->incarnation); + def->status, def->incarnation, + def->payload, def->payload_size); return *result != NULL ? 0 : -1; } *result = member; @@ -1437,7 +1561,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, - 0); + 0, NULL, 0); if (swim->self == NULL) return -1; } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { @@ -1449,7 +1573,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } new_self = swim_new_member(swim, &swim->self->addr, uuid, - MEMBER_ALIVE, 0); + MEMBER_ALIVE, 0, swim->self->payload, + swim->self->payload_size); if (new_self == NULL) return -1; } @@ -1505,6 +1630,18 @@ swim_is_configured(const struct swim *swim) return swim->self != NULL; } +int +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size) +{ + if (payload_size > MAX_PAYLOAD_SIZE) { + diag_set(IllegalParams, "Payload should be <= %d", + MAX_PAYLOAD_SIZE); + return -1; + } + return swim_update_member_payload(swim, swim->self, payload, + payload_size, 1); +} + int swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) { @@ -1519,7 +1656,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) return -1; struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { - member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0); + member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0, + NULL, -1); return member == NULL ? 
-1 : 0; } diag_set(SwimError, "%s a member with such UUID already exists", @@ -1755,3 +1893,10 @@ swim_member_incarnation(const struct swim_member *member) { return member->incarnation; } + +const char * +swim_member_payload(const struct swim_member *member, uint16_t *size) +{ + *size = member->payload_size; + return member->payload; +} diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 09d933b83..6a219d131 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -104,6 +104,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, double swim_ack_timeout(const struct swim *swim); +/** Set payload to disseminate over the cluster. */ +int +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size); + /** * Stop listening and broadcasting messages, cleanup all internal * structures, free memory. @@ -220,6 +224,10 @@ swim_member_uuid(const struct swim_member *member); uint64_t swim_member_incarnation(const struct swim_member *member); +/** Member's payload. 
*/ +const char * +swim_member_payload(const struct swim_member *member, uint16_t *size); + #if defined(__cplusplus) } #endif diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index d84550663..58c9ec119 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -160,6 +160,7 @@ swim_member_def_create(struct swim_member_def *def) memset(def, 0, sizeof(*def)); def->addr.sin_family = AF_INET; def->status = MEMBER_ALIVE; + def->payload_size = -1; } /** @@ -179,6 +180,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, struct swim_member_def *def) { uint64_t tmp; + uint32_t len; switch (key) { case SWIM_MEMBER_STATUS: if (swim_decode_uint(pos, end, &tmp, prefix, @@ -210,6 +212,17 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member incarnation") != 0) return -1; break; + case SWIM_MEMBER_PAYLOAD: + if (swim_decode_bin(&def->payload, &len, pos, end, prefix, + "member payload") != 0) + return -1; + if (len > MAX_PAYLOAD_SIZE) { + diag_set(SwimError, "%s member payload size should be "\ + "<= %d", prefix, MAX_PAYLOAD_SIZE); + return -1; + } + def->payload_size = (int) len; + break; default: unreachable(); } @@ -330,10 +343,22 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, header->v_anti_entropy = mp_bswap_u16(batch_size); } +void +swim_member_payload_bin_create(struct swim_member_payload_bin *bin) +{ + bin->k_payload = SWIM_MEMBER_PAYLOAD; + bin->m_payload_size = 0xc5; +} + +void +swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, uint16_t size) +{ + bin->v_payload_size = mp_bswap_u16(size); +} + void swim_passport_bin_create(struct swim_passport_bin *passport) { - passport->m_header = 0x85; passport->k_status = SWIM_MEMBER_STATUS; passport->k_addr = SWIM_MEMBER_ADDRESS; passport->m_addr = 0xce; @@ -350,8 +375,10 @@ void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum 
swim_member_status status, uint64_t incarnation) + enum swim_member_status status, uint64_t incarnation, + bool encode_payload) { + passport->m_header = 0x85 + encode_payload; passport->v_status = status; passport->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr)); passport->v_port = mp_bswap_u16(ntohs(addr->sin_port)); diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index e1c70db43..23d339e80 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -38,6 +38,11 @@ #include <stdbool.h> #include "swim_constants.h" +enum { + /** Reserve 272 bytes for headers. */ + MAX_PAYLOAD_SIZE = 1200, +}; + /** * SWIM binary protocol structures and helpers. Below is a picture * of a SWIM message template: @@ -67,7 +72,8 @@ * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint | + * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | * | ], | @@ -80,7 +86,8 @@ * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint | + * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | * | ], | @@ -103,6 +110,8 @@ struct swim_member_def { struct sockaddr_in addr; uint64_t incarnation; enum swim_member_status status; + const char *payload; + int payload_size; }; /** Initialize the definition with default values. */ @@ -242,6 +251,7 @@ enum swim_member_key { SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, SWIM_MEMBER_INCARNATION, + SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, }; @@ -305,6 +315,30 @@ struct PACKED swim_passport_bin { uint64_t v_incarnation; }; +/** + * SWIM member's payload header. Payload data should be encoded + * right after it. 
+ */ +struct PACKED swim_member_payload_bin { + /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */ + uint8_t k_payload; + /** mp_encode_bin(16bit bin header) */ + uint8_t m_payload_size; + uint16_t v_payload_size; + /** Payload data ... */ +}; + +/** Initialize payload record. */ +void +swim_member_payload_bin_create(struct swim_member_payload_bin *bin); + +/** + * Fill a previously created payload record with an actual size. + */ +void +swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, + uint16_t size); + /** Initialize a member's binary passport. */ void swim_passport_bin_create(struct swim_passport_bin *passport); @@ -319,7 +353,8 @@ void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation); + enum swim_member_status status, uint64_t incarnation, + bool encode_payload); /** }}} Anti-entropy component */ diff --git a/test/unit/swim.c b/test/unit/swim.c index 6f3871606..0b8058f0b 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -36,6 +36,7 @@ #include "uri/uri.h" #include "swim/swim.h" #include "swim/swim_ev.h" +#include "swim/swim_proto.h" #include "swim_test_transport.h" #include "swim_test_ev.h" #include "swim_test_utils.h" @@ -642,10 +643,200 @@ swim_test_broadcast(void) swim_finish_test(); } +static void +swim_test_payload_basic(void) +{ + swim_start_test(11); + uint16_t size, cluster_size = 3; + struct swim_cluster *cluster = swim_cluster_new(cluster_size); + for (int i = 0; i < cluster_size; ++i) { + for (int j = i + 1; j < cluster_size; ++j) + swim_cluster_interconnect(cluster, i, j); + } + ok(swim_cluster_member_payload(cluster, 0, 0, &size) == NULL && + size == 0, "no payload by default"); + is(swim_cluster_member_set_payload(cluster, 0, NULL, 1300), -1, + "can not set too big payload"); + ok(swim_error_check_match("Payload should be <="), "diag says too big"); + + const char *s0_payload = "S1 payload"; + uint16_t 
s0_payload_size = strlen(s0_payload) + 1; + is(swim_cluster_member_set_payload(cluster, 0, s0_payload, + s0_payload_size), 0, + "payload is set"); + is(swim_cluster_member_incarnation(cluster, 0, 0), 1, + "incarnation is incremeted on each payload update"); + const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size); + ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0, + "payload is successfully obtained back"); + + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, + s0_payload_size, cluster_size), + 0, "payload is disseminated"); + s0_payload = "S1 second version of payload"; + s0_payload_size = strlen(s0_payload) + 1; + is(swim_cluster_member_set_payload(cluster, 0, s0_payload, + s0_payload_size), 0, + "payload is changed"); + is(swim_cluster_member_incarnation(cluster, 0, 0), 2, + "incarnation is incremeted on each payload update"); + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, + s0_payload_size, cluster_size), + 0, "second payload is disseminated"); + /* + * Test that new incarnations help to rewrite the old + * payload from anti-entropy. + */ + swim_cluster_set_drop(cluster, 0, 100); + s0_payload = "S1 third version of payload"; + s0_payload_size = strlen(s0_payload) + 1; + fail_if(swim_cluster_member_set_payload(cluster, 0, s0_payload, + s0_payload_size) != 0); + /* Wait at least one round until payload TTD gets 0. 
*/ + swim_run_for(3); + swim_cluster_set_drop(cluster, 0, 0); + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, + s0_payload_size, cluster_size), + 0, "third payload is disseminated via anti-entropy"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + +static void +swim_test_payload_refutation(void) +{ + swim_start_test(11); + uint16_t size, cluster_size = 3; + struct swim_cluster *cluster = swim_cluster_new(cluster_size); + swim_cluster_set_ack_timeout(cluster, 1); + for (int i = 0; i < cluster_size; ++i) { + for (int j = i + 1; j < cluster_size; ++j) + swim_cluster_interconnect(cluster, i, j); + } + const char *s0_old_payload = "s0 payload"; + uint16_t s0_old_payload_size = strlen(s0_old_payload) + 1; + fail_if(swim_cluster_member_set_payload(cluster, 0, s0_old_payload, + s0_old_payload_size) != 0); + fail_if(swim_cluster_wait_payload_everywhere(cluster, 0, s0_old_payload, + s0_old_payload_size, + 3) != 0); + /* + * The test checks the following case. Assume there are 3 + * nodes: S1, S2, S3. They all know each other. S1 sets + * new payload, S2 and S3 knows that. They all see that S1 + * has incarnation 1 and payload P1. + * + * Now S1 changes payload to P2. Its incarnation becomes + * 2. During next entire round its round messages are + * lost, however ACKs work ok. 
+ */ + const char *s0_new_payload = "s0 second payload"; + uint16_t s0_new_payload_size = strlen(s0_new_payload); + fail_if(swim_cluster_member_set_payload(cluster, 0, s0_new_payload, + s0_new_payload_size) != 0); + int components[2] = {SWIM_DISSEMINATION, SWIM_ANTI_ENTROPY}; + swim_cluster_drop_components(cluster, 0, components, 2); + swim_run_for(3); + swim_cluster_drop_components(cluster, 0, NULL, 0); + + is(swim_cluster_member_incarnation(cluster, 1, 0), 2, + "S2 sees new incarnation of S1"); + is(swim_cluster_member_incarnation(cluster, 2, 0), 2, + "S3 does the same"); + + const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "but S2 does not known the new payload"); + + tmp = swim_cluster_member_payload(cluster, 2, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "as well as S3"); + + /* Restore normal ACK timeout. */ + swim_cluster_set_ack_timeout(cluster, 30); + + /* + * Now S1's payload TTD is 0, but via ACKs S1 sent its new + * incarnation to S2 and S3. Despite that they should + * apply new S1's payload via anti-entropy. Next lines + * test that: + * + * 1) S2 can apply new S1's payload from S1's + * anti-entropy; + * + * 2) S2 will not receive the old S1's payload from S3. + * S3 knows, that its payload is outdated, and should + * not send it; + * + * 2) S3 can apply new S1's payload from S2's + * anti-entropy. Note, that here S3 applies the payload + * not directly from the originator. It is the most + * complex case. + * + * Next lines test the case (1). + */ + + /* S3 does not participate in the test (1). 
*/ + swim_cluster_set_drop(cluster, 2, 100); + swim_run_for(3); + + tmp = swim_cluster_member_payload(cluster, 1, 0, &size); + ok(size == s0_new_payload_size && + memcmp(tmp, s0_new_payload, size) == 0, + "S2 learned S1's payload via anti-entropy"); + is(swim_cluster_member_incarnation(cluster, 1, 0), 2, + "incarnation still is the same"); + + tmp = swim_cluster_member_payload(cluster, 2, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "S3 was blocked and does not know anything"); + is(swim_cluster_member_incarnation(cluster, 2, 0), 2, + "incarnation still is the same"); + + /* S1 will not participate in the tests further. */ + swim_cluster_set_drop(cluster, 0, 100); + + /* + * Now check the case (2) - S3 will not send outdated + * version of S1's payload. To maintain the experimental + * integrity S1 and S2 are silent. Only S3 sends packets. + */ + swim_cluster_set_drop(cluster, 2, 0); + swim_cluster_set_drop_out(cluster, 1, 100); + swim_run_for(3); + + tmp = swim_cluster_member_payload(cluster, 1, 0, &size); + ok(size == s0_new_payload_size && + memcmp(tmp, s0_new_payload, size) == 0, + "S2 keeps the same new S1's payload, S3 did not rewrite it"); + + tmp = swim_cluster_member_payload(cluster, 2, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "S3 still does not know anything"); + + /* + * Now check the case (3) - S3 accepts new S1's payload + * from S2. Even knowing the same S1's incarnation. 
+ */ + swim_cluster_set_drop(cluster, 1, 0); + swim_cluster_set_drop_out(cluster, 2, 100); + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_new_payload, + s0_new_payload_size, 3), 0, + "S3 learns S1's payload from S2") + + swim_cluster_delete(cluster); + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(15); + swim_start_test(17); (void) ap; swim_test_ev_init(); @@ -666,6 +857,8 @@ main_f(va_list ap) swim_test_quit(); swim_test_uri_update(); swim_test_broadcast(); + swim_test_payload_basic(); + swim_test_payload_refutation(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index a90a86dd0..4b1407db3 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..15 +1..17 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -147,4 +147,34 @@ ok 14 - subtests ok 6 - fullmesh is reached, and no one link was added explicitly ok 15 - subtests *** swim_test_broadcast: done *** + *** swim_test_payload_basic *** + 1..11 + ok 1 - no payload by default + ok 2 - can not set too big payload + ok 3 - diag says too big + ok 4 - payload is set + ok 5 - incarnation is incremeted on each payload update + ok 6 - payload is successfully obtained back + ok 7 - payload is disseminated + ok 8 - payload is changed + ok 9 - incarnation is incremeted on each payload update + ok 10 - second payload is disseminated + ok 11 - third payload is disseminated via anti-entropy +ok 16 - subtests + *** swim_test_payload_basic: done *** + *** swim_test_payload_refutation *** + 1..11 + ok 1 - S2 sees new incarnation of S1 + ok 2 - S3 does the same + ok 3 - but S2 does not known the new payload + ok 4 - as well as S3 + ok 5 - S2 learned S1's payload via anti-entropy + ok 6 - incarnation still is the same + ok 7 - S3 was blocked and does not know anything + ok 8 - incarnation still is the same + ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it + ok 
10 - S3 still does not know anything + ok 11 - S3 learns S1's payload from S2 +ok 17 - subtests + *** swim_test_payload_refutation: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index fd528d166..45570cce5 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -191,6 +191,27 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, return swim_member_incarnation(m); } +const char * +swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, + int member_id, uint16_t *size) +{ + const struct swim_member *m = + swim_cluster_member_view(cluster, node_id, member_id); + if (m == NULL) { + *size = 0; + return NULL; + } + return swim_member_payload(m, size); +} + +int +swim_cluster_member_set_payload(struct swim_cluster *cluster, int i, + const char *payload, uint16_t size) +{ + struct swim *s = swim_cluster_node(cluster, i); + return swim_set_payload(s, payload, size); +} + struct swim * swim_cluster_node(struct swim_cluster *cluster, int i) { @@ -506,6 +527,13 @@ struct swim_member_template { */ bool need_check_incarnation; uint64_t incarnation; + /** + * True, if the payload should be checked to be equal to + * @a payload of size @a payload_size. + */ + bool need_check_payload; + const char *payload; + uint16_t payload_size; }; /** Build member template. No checks are set. */ @@ -542,6 +570,19 @@ swim_member_template_set_incarnation(struct swim_member_template *t, t->incarnation = incarnation; } +/** + * Set that the member template should be used to check member + * status. + */ +static inline void +swim_member_template_set_payload(struct swim_member_template *t, + const char *payload, uint16_t payload_size) +{ + t->need_check_payload = true; + t->payload = payload; + t->payload_size = payload_size; +} + /** Callback to check that a member matches a template. 
*/ static bool swim_loop_check_member(struct swim_cluster *cluster, void *data) @@ -551,17 +592,26 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) swim_cluster_member_view(cluster, t->node_id, t->member_id); enum swim_member_status status; uint64_t incarnation; + const char *payload; + uint16_t payload_size; if (m != NULL) { status = swim_member_status(m); incarnation = swim_member_incarnation(m); + payload = swim_member_payload(m, &payload_size); } else { status = swim_member_status_MAX; incarnation = 0; + payload = NULL; + payload_size = 0; } if (t->need_check_status && status != t->status) return false; if (t->need_check_incarnation && incarnation != t->incarnation) return false; + if (t->need_check_payload && + (payload_size != t->payload_size || + memcmp(payload, t->payload, payload_size) != 0)) + return false; return true; } @@ -644,6 +694,18 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, swim_loop_check_member_everywhere, &t); } +int +swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, + int member_id, const char *payload, + uint16_t payload_size, double timeout) +{ + struct swim_member_template t; + swim_member_template_create(&t, -1, member_id); + swim_member_template_set_payload(&t, payload, payload_size); + return swim_wait_timeout(timeout, cluster, + swim_loop_check_member_everywhere, &t); +} + bool swim_error_check_match(const char *msg) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 6ea136e36..100a67e0c 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -141,6 +141,14 @@ uint64_t swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, int member_id); +const char * +swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, + int member_id, uint16_t *size); + +int +swim_cluster_member_set_payload(struct swim_cluster *cluster, int i, + const char *payload, uint16_t size); + /** * Check if in 
the cluster every instance knowns the about other * instances. @@ -192,6 +200,16 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, int member_id, uint64_t incarnation, double timeout); +/** + * Wait until a member with id @a member_id is seen with + * @a payload of size @a payload_size in the membership table of + * every instance in @a cluster. At most @a timeout seconds. + */ +int +swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, + int member_id, const char *payload, + uint16_t payload_size, double timeout); + /** Process SWIM events for @a duration fake seconds. */ void swim_run_for(double duration); ^ permalink raw reply [flat|nested] 27+ messages in thread
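Editorial sketch of the payload update rule from swim_update_member() in the patch above. It is not part of the patch. The struct and function names (member_view, def_view, payload_decide) are hypothetical and keep only the fields the rule reads. The rule itself follows the patch: a remote payload is applied when the definition carries a bigger incarnation, or the same incarnation while the local copy is marked outdated. A bigger incarnation that arrives without a payload only marks the local copy as outdated.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical reduced view of a local member record. */
struct member_view {
	uint64_t incarnation;
	bool is_payload_up_to_date;
};

/* Hypothetical reduced view of a decoded member definition. */
struct def_view {
	uint64_t incarnation;
	/* -1 means the packet carried no payload section. */
	int payload_size;
};

enum payload_action {
	/* Leave the local payload and its flag untouched. */
	PAYLOAD_KEEP,
	/* Copy the payload from the definition. */
	PAYLOAD_APPLY,
	/* Keep the old bytes, but remember they are stale. */
	PAYLOAD_MARK_OUTDATED,
};

/* Decide what to do with the payload of a non-self member. */
static enum payload_action
payload_decide(const struct member_view *m, const struct def_view *def)
{
	/* Older definitions are filtered out before this point. */
	assert(def->incarnation >= m->incarnation);
	if (def->incarnation > m->incarnation) {
		if (def->payload_size >= 0)
			return PAYLOAD_APPLY;
		if (m->is_payload_up_to_date)
			return PAYLOAD_MARK_OUTDATED;
		return PAYLOAD_KEEP;
	}
	/* Same incarnation: accept only if the local copy is stale. */
	if (!m->is_payload_up_to_date && def->payload_size >= 0)
		return PAYLOAD_APPLY;
	return PAYLOAD_KEEP;
}
```

The PAYLOAD_MARK_OUTDATED branch corresponds to the refutation test: S2 and S3 learn the new incarnation of S1 through ACKs, which carry no payload, and later accept the payload from anti-entropy even though the incarnation no longer grows.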
* [tarantool-patches] Re: [PATCH 6/6] swim: introduce payload
From: Konstantin Osipov @ 2019-04-18 18:03 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/18 20:45]:

Ok to push.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH 6/6] swim: introduce payload
From: Vladislav Shpilevoy @ 2019-04-18 20:40 UTC
To: Konstantin Osipov; +Cc: tarantool-patches

Pushed into the master.

On 18/04/2019 21:03, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/18 20:45]:
>
> Ok to push.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
* [tarantool-patches] [PATCH 5.5/6] swim: rename TTL to TTD
From: Vladislav Shpilevoy @ 2019-04-18 17:43 UTC
To: tarantool-patches; +Cc: kostja

TTL is time-to-live and it slightly confuses when is said about a
member's attribute. Status_ttl looks like after this value gets
0 the status is deleted or is no longer valid.

TTD is more precise definition for these counters and is expanded as
time-to-disseminate.
---
 src/lib/swim/swim.c       | 41 ++++++++++++++++++++-------------------
 src/lib/swim/swim_proto.h |  2 +-
 test/unit/swim.c          |  4 ++--
 test/unit/swim.result     |  4 ++--
 4 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 2dac6eedd..3e64e4c91 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -284,30 +284,31 @@ struct swim_member {
 	 * member state each time any member attribute changes.
 	 *
 	 * According to SWIM, an event should be sent to all
-	 * members at least once - for that a TTL (time-to-live)
-	 * counter is maintained for each independent event type.
+	 * members at least once - for that a TTD
+	 * (time-to-disseminate) counter is maintained for each
+	 * independent event type.
 	 *
-	 * When a member state changes, the TTL is reset to the
+	 * When a member state changes, the TTD is reset to the
 	 * cluster size. It is then decremented after each send.
 	 * This guarantees that each member state change is sent
 	 * to each SWIM member at least once. If a new event of
 	 * the same type is generated before a round is finished,
 	 * the current event object is updated in place with reset
-	 * of the TTL.
+	 * of the TTD.
 	 *
-	 * To conclude, TTL works in two ways: to see which
+	 * To conclude, TTD works in two ways: to see which
 	 * specific member attribute needs dissemination and to
 	 * track how many cluster members still need to learn
 	 * about the change from this instance.
 	 */
 	/**
-	 * General TTL reset each time when any visible member
+	 * General TTD reset each time when any visible member
 	 * attribute is updated. It is always bigger or equal than
-	 * any other TTLs. In addition it helps to keep a dead
-	 * member not dropped until the TTL gets zero so as to
+	 * any other TTDs. In addition it helps to keep a dead
+	 * member not dropped until the TTD gets zero so as to
 	 * allow other members to learn the dead status.
 	 */
-	int status_ttl;
+	int status_ttd;
 	/**
 	 * All created events are put into a queue sorted by event
 	 * time.
@@ -419,7 +420,7 @@ struct swim {
 	 * Queue of all members which have dissemination
 	 * information. A member is added to the queue whenever
 	 * any of its attributes changes, and stays in the queue
-	 * as long as the event TTL is non-zero.
+	 * as long as the event TTD is non-zero.
 	 */
 	struct rlist dissemination_queue;
 };
@@ -444,9 +445,9 @@ swim_wait_ack(struct swim *swim, struct swim_member *member)
 
 /**
  * On literally any update of a member it is added to a queue of
- * members to disseminate updates. Regardless of other TTLs, each
- * update also resets status TTL. Status TTL is always greater
- * than any other event-related TTL, so it's sufficient to look at
+ * members to disseminate updates. Regardless of other TTDs, each
+ * update also resets status TTD. Status TTD is always greater
+ * than any other event-related TTD, so it's sufficient to look at
  * it alone to see that a member needs information dissemination.
  * The status change itself occupies only 2 bytes in a packet, so
  * it is cheap to send it on any update, while does reduce
@@ -459,7 +460,7 @@ swim_register_event(struct swim *swim, struct swim_member *member)
 		rlist_add_tail_entry(&swim->dissemination_queue, member,
 				     in_dissemination_queue);
 	}
-	member->status_ttl = mh_size(swim->members);
+	member->status_ttd = mh_size(swim->members);
 	swim_cached_round_msg_invalidate(swim);
 }
 
@@ -872,8 +873,8 @@ swim_encode_round_msg(struct swim *swim)
 }
 
 /**
- * Decrement TTLs of all events. It is done after each round step.
- * Note, since we decrement TTL of all events, even those which
+ * Decrement TTDs of all events. It is done after each round step.
+ * Note, since we decrement TTD of all events, even those which
  * have not been actually encoded and sent, if there are more
  * events than can fit into a packet, the tail of the queue begins
  * reeking and rotting. The most recently added members could even
@@ -884,13 +885,13 @@ swim_encode_round_msg(struct swim *swim)
  * deal with.
  */
 static void
-swim_decrease_event_ttl(struct swim *swim)
+swim_decrease_event_ttd(struct swim *swim)
 {
 	struct swim_member *member, *tmp;
 	rlist_foreach_entry_safe(member, &swim->dissemination_queue,
 				 in_dissemination_queue,
 				 tmp) {
-		if (--member->status_ttl == 0) {
+		if (--member->status_ttd == 0) {
 			rlist_del_entry(member, in_dissemination_queue);
 			swim_cached_round_msg_invalidate(swim);
 			if (member->status == MEMBER_LEFT)
@@ -959,7 +960,7 @@ swim_complete_step(struct swim_task *task,
 			 * sections.
 			 */
 			swim_wait_ack(swim, m);
-			swim_decrease_event_ttl(swim);
+			swim_decrease_event_ttd(swim);
 		}
 	}
 }
@@ -1029,7 +1030,7 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 			break;
 		case MEMBER_DEAD:
 			if (m->unacknowledged_pings >= NO_ACKS_TO_GC &&
-			    swim->gc_mode == SWIM_GC_ON && m->status_ttl == 0) {
+			    swim->gc_mode == SWIM_GC_ON && m->status_ttd == 0) {
 				swim_delete_member(swim, m);
 				continue;
 			}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index ab4057185..e1c70db43 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -268,7 +268,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
  * and add more attributes. Or just encode new attributes after
  * the passport. For example, anti-entropy can add a payload when
  * it is up to date; dissemination adds a payload when it is up to
- * date and TTL is > 0.
+ * date and TTD is > 0.
  */
 struct PACKED swim_passport_bin {
 	/** mp_encode_map(5) */
diff --git a/test/unit/swim.c b/test/unit/swim.c
index f3cf6cd06..6f3871606 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -254,10 +254,10 @@ swim_test_basic_failure_detection(void)
 	swim_run_for(1);
 
 	is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD, "after 2 "\
-	   "more unacks the member still is not deleted - dissemination TTL "\
+	   "more unacks the member still is not deleted - dissemination TTD "\
 	   "keeps it");
 	is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX, 2),
-	   0, "but it is dropped after 2 rounds when TTL gets 0");
+	   0, "but it is dropped after 2 rounds when TTD gets 0");
 
 	/*
 	 * After IO unblock pending messages will be processed all
diff --git a/test/unit/swim.result b/test/unit/swim.result
index d315f181f..a90a86dd0 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -68,8 +68,8 @@ ok 5 - subtests
     ok 1 - node is added as alive
     ok 2 - member still is not dead after 2 noacks
     ok 3 - but it is dead after one more
-    ok 4 - after 2 more unacks the member still is not deleted - dissemination TTL keeps it
-    ok 5 - but it is dropped after 2 rounds when TTL gets 0
+    ok 4 - after 2 more unacks the member still is not deleted - dissemination TTD keeps it
+    ok 5 - but it is dropped after 2 rounds when TTD gets 0
     ok 6 - fullmesh is restored
     ok 7 - a member is added back on an ACK
 ok 6 - subtests
-- 
2.17.2 (Apple Git-113)
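Editorial sketch of the TTD (time-to-disseminate) counter described in the comments of the patch above. It is not part of the patch. The names ttd_member, ttd_register_event and ttd_round_step are hypothetical, and the rlist-based dissemination queue is reduced to a boolean flag. The behaviour follows the comments: an event resets the counter to the cluster size, each round step decrements it, and the member leaves the queue when the counter reaches zero.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical reduced member: only the dissemination state. */
struct ttd_member {
	/* How many round steps still have to carry the event. */
	int status_ttd;
	/* Stand-in for membership in the dissemination queue. */
	bool in_queue;
};

/* Any attribute update resets the TTD to the cluster size. */
static void
ttd_register_event(struct ttd_member *m, int cluster_size)
{
	assert(cluster_size > 0);
	m->in_queue = true;
	m->status_ttd = cluster_size;
}

/*
 * One round step. Returns true when the member has just been
 * removed from the queue because its TTD reached zero.
 */
static bool
ttd_round_step(struct ttd_member *m)
{
	if (!m->in_queue)
		return false;
	assert(m->status_ttd > 0);
	if (--m->status_ttd == 0) {
		m->in_queue = false;
		return true;
	}
	return false;
}
```

Reaching zero here means only that this instance stops disseminating the change. The attribute itself stays valid, which is the reason the commit message gives for preferring the name TTD over TTL.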
* [tarantool-patches] Re: [PATCH 5.5/6] swim: rename TTL to TTD
From: Konstantin Osipov @ 2019-04-18 17:48 UTC
To: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/18 20:45]:
> TTL is time-to-live and it slightly confuses when is said about a
> member's attribute. Status_ttl looks like after this value gets
> 0 the status is deleted or is no longer valid.

OK to push.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH 5.5/6] swim: rename TTL to TTD
From: Vladislav Shpilevoy @ 2019-04-18 20:40 UTC
To: tarantool-patches, Konstantin Osipov

Pushed into the master.

On 18/04/2019 20:48, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/18 20:45]:
>> TTL is time-to-live and it slightly confuses when is said about a
>> member's attribute. Status_ttl looks like after this value gets
>> 0 the status is deleted or is no longer valid.
>
> OK to push.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
* [tarantool-patches] [PATCH 7/6] swim: drop incarnation_inc parameter from update() routines
From: Vladislav Shpilevoy @ 2019-04-18 18:16 UTC
To: tarantool-patches; +Cc: kostja

Update_addr and update_payload need to increment member's
incarnation when it is self. For that they used a special
parameter incarnation_inc set in 1 for self and in 0 for others.

It was used to encapsulate incarnation update + event scheduling
on member attribute updates, but on the other hand it broke
another encapsulation level - there should not be exceptions for
'self' in these functions.

This patch makes incarnation increment explicit in the places
where 'self' is updated.
---
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 22760cdd7..86b45a2da 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -565,8 +565,7 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
 /** Update member's payload, register a corresponding event. */
 static inline int
 swim_update_member_payload(struct swim *swim, struct swim_member *member,
-			   const char *payload, uint16_t payload_size,
-			   int incarnation_increment)
+			   const char *payload, uint16_t payload_size)
 {
 	assert(payload_size <= MAX_PAYLOAD_SIZE);
 	char *new_payload;
@@ -584,7 +583,6 @@ swim_update_member_payload(struct swim *swim, struct swim_member *member,
 	member->payload = new_payload;
 	member->payload_size = payload_size;
 	member->payload_ttd = mh_size(swim->members);
-	member->incarnation += incarnation_increment;
 	member->is_payload_up_to_date = true;
 	swim_on_member_update(swim, member);
 	return 0;
@@ -745,7 +743,7 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 	swim_on_member_update(swim, member);
 	if (payload_size >= 0 &&
 	    swim_update_member_payload(swim, member, payload,
-				       payload_size, 0) != 0) {
+				       payload_size) != 0) {
 		swim_delete_member(swim, member);
 		return NULL;
 	}
@@ -1149,13 +1147,10 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 /** Update member's address.*/
 static inline void
 swim_update_member_addr(struct swim *swim, struct swim_member *member,
-			const struct sockaddr_in *addr, int incarnation_inc)
+			const struct sockaddr_in *addr)
 {
-	if (! swim_sockaddr_in_eq(addr, &member->addr)) {
-		member->incarnation += incarnation_inc;
-		member->addr = *addr;
-		swim_on_member_update(swim, member);
-	}
+	member->addr = *addr;
+	swim_on_member_update(swim, member);
 }
 
 /**
@@ -1175,7 +1170,8 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
 	 */
 	bool encode_payload = false;
 	if (def->incarnation > member->incarnation) {
-		swim_update_member_addr(swim, member, &def->addr, 0);
+		if (! swim_sockaddr_in_eq(&def->addr, &member->addr))
+			swim_update_member_addr(swim, member, &def->addr);
 		if (def->payload_size >= 0) {
 			encode_payload = true;
 		} else if (member->is_payload_up_to_date) {
@@ -1187,7 +1183,7 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
 	}
 	if (encode_payload &&
 	    swim_update_member_payload(swim, member, def->payload,
-				       def->payload_size, 0) != 0) {
+				       def->payload_size) != 0) {
 		/* Not such a critical error. */
 		diag_log();
 	}
@@ -1612,7 +1608,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		swim_on_member_update(swim, swim->self);
 		swim->self = new_self;
 	}
-	swim_update_member_addr(swim, swim->self, &addr, 1);
+	if (! swim_sockaddr_in_eq(&addr, &swim->self->addr)) {
+		swim->self->incarnation++;
+		swim_update_member_addr(swim, swim->self, &addr);
+	}
 	if (gc_mode != SWIM_GC_DEFAULT)
 		swim->gc_mode = gc_mode;
 	return 0;
@@ -1638,8 +1637,12 @@ swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
 			 MAX_PAYLOAD_SIZE);
 		return -1;
 	}
-	return swim_update_member_payload(swim, swim->self, payload,
-					  payload_size, 1);
+	struct swim_member *self = swim->self;
+	if (swim_update_member_payload(swim, self, payload, payload_size) != 0)
+		return -1;
+	self->incarnation++;
+	swim_on_member_update(swim, self);
+	return 0;
 }
 
 int
* [tarantool-patches] Re: [PATCH 7/6] swim: drop incarnation_inc parameter from update() routines
From: Konstantin Osipov @ 2019-04-18 18:20 UTC
To: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/18 21:18]:
> Update_addr and update_payload need to increment member's
> incarnation when it is self. For that they used a special
> parameter incarnation_inc set to 1 for self and to 0 for others.
>
> It was used to encapsulate incarnation update + event scheduling
> on member attribute updates, but on the other hand it broke
> another encapsulation level - there should not be exceptions for
> 'self' in these functions.
>
> This patch makes incarnation increment explicit in the places
> where 'self' is updated.

ok to push

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
* [tarantool-patches] Re: [PATCH 7/6] swim: drop incarnation_inc parameter from update() routines
From: Vladislav Shpilevoy @ 2019-04-18 20:40 UTC
To: tarantool-patches, Konstantin Osipov

Pushed into the master.

On 18/04/2019 21:20, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/18 21:18]:
>> Update_addr and update_payload need to increment member's
>> incarnation when it is self. For that they used a special
>> parameter incarnation_inc set to 1 for self and to 0 for others.
>>
>> It was used to encapsulate incarnation update + event scheduling
>> on member attribute updates, but on the other hand it broke
>> another encapsulation level - there should not be exceptions for
>> 'self' in these functions.
>>
>> This patch makes incarnation increment explicit in the places
>> where 'self' is updated.
>
> ok to push
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
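
For readers who want the idea of patch 7/6 without the surrounding
SWIM code, the following is a minimal, self-contained sketch of the
pattern its commit message describes: the generic update routines
make no exception for 'self', and the caller that changes 'self'
bumps the incarnation explicitly. It is not code from the patch. All
toy_* names, the integer address and the event counter are
hypothetical stand-ins chosen for illustration only.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, heavily simplified stand-in for struct swim_member. */
struct toy_member {
	uint32_t addr;		/* Stands in for struct sockaddr_in. */
	uint64_t incarnation;
	char *payload;
	uint16_t payload_size;
	int update_events;	/* Counts scheduled "member updated" events. */
};

/* Hypothetical stand-in for struct swim: only 'self' is modelled. */
struct toy_swim {
	struct toy_member *self;
};

/* Register an update event for the member. */
static void
toy_on_member_update(struct toy_member *member)
{
	member->update_events++;
}

/* Generic address update: no incarnation change, no 'self' case. */
static void
toy_update_member_addr(struct toy_member *member, uint32_t addr)
{
	member->addr = addr;
	toy_on_member_update(member);
}

/* Generic payload update: copies the data, registers an event. */
static int
toy_update_member_payload(struct toy_member *member, const char *payload,
			  uint16_t size)
{
	char *new_payload = NULL;
	if (size > 0) {
		new_payload = malloc(size);
		if (new_payload == NULL)
			return -1;
		memcpy(new_payload, payload, size);
	}
	free(member->payload);
	member->payload = new_payload;
	member->payload_size = size;
	toy_on_member_update(member);
	return 0;
}

/* Caller updating 'self': the incarnation bump is explicit here. */
static void
toy_cfg_addr(struct toy_swim *swim, uint32_t addr)
{
	struct toy_member *self = swim->self;
	if (addr != self->addr) {
		self->incarnation++;
		toy_update_member_addr(self, addr);
	}
}

/* Same idea for the payload of 'self'. */
static int
toy_set_payload(struct toy_swim *swim, const char *payload, uint16_t size)
{
	struct toy_member *self = swim->self;
	if (toy_update_member_payload(self, payload, size) != 0)
		return -1;
	self->incarnation++;
	toy_on_member_update(self);
	return 0;
}
```

In this sketch, calling toy_cfg_addr() twice with the same address
increments the incarnation only once, while toy_update_member_addr()
applied to any other member never touches its incarnation. That
mirrors the split the patch makes between swim_cfg() /
swim_set_payload() and the swim_update_member_*() helpers.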