Subject: [tarantool-patches] Re: [PATCH 6/6] swim: introduce payload
References: <04b17677a6e24d46587f718733000f426e1176e7.1555021137.git.v.shpilevoy@tarantool.org> <20190418151225.GA13022@chai>
From: Vladislav Shpilevoy
Message-ID: <1dedcec0-97cc-e955-91d1-4690b01b6d70@tarantool.org>
Date: Thu, 18 Apr 2019 20:43:40 +0300
In-Reply-To: <20190418151225.GA13022@chai>
To: tarantool-patches@freelists.org, Konstantin Osipov

On 18/04/2019 18:12, Konstantin Osipov wrote:
> * Vladislav Shpilevoy [19/04/12 01:25]:
>> Payload is arbitrary user data disseminated over the cluster
>> along with other member attributes.
>>
>> Part of #3234
>> ---
>>  src/lib/swim/swim.c         | 155 ++++++++++++++++++++++++++--
>>  src/lib/swim/swim.h         |   8 ++
>>  src/lib/swim/swim_proto.c   |  31 +++++-
>>  src/lib/swim/swim_proto.h   |  41 +++++++-
>>  test/unit/swim.c            | 195 +++++++++++++++++++++++++++++++++++-
>>  test/unit/swim.result       |  32 +++++-
>>  test/unit/swim_test_utils.c |  62 ++++++++++++
>>  test/unit/swim_test_utils.h |  18 ++++
>>  8 files changed, 525 insertions(+), 17 deletions(-)
>>
>> +	 * whether the member has updated payload, or another
>> +	 * attribute. The only way here is to wait until the most
>> +	 * actual payload will be received from another instance.
>> +	 * Note, that such an instance always exists - the payload
>> +	 * originator instance.
>> +	 */
>> +	bool is_payload_up_to_date;
>> +	/**
>> +	 * TTL of payload. At most this number of times payload is
>> +	 * sent as a part of dissemination component. Reset on
>> +	 * each payload update.
>> +	 */
>> +	int payload_ttl;
>
> As agreed, let's rename ttl to ttd across the board and update the
> comments to say that ttd is time to disseminate.

Cool, done in a separate commit.

>
>> +/** Update member's payload, register a corresponding event. */
>> +static inline int
>> +swim_update_member_payload(struct swim *swim, struct swim_member *member,
>> +			   const char *payload, uint16_t payload_size,
>> +			   int incarnation_increment)
>
> Passing incarnation_increment raises a lot of questions when
> reading the code.

We already use it for the address update. It simplified a lot of
things and, most importantly, encapsulated the incarnation and TTD
updates in one place. I tried to avoid this argument for the address,
but it looked much worse: TTD was reset in too many places, and it was
easy to miss one. The main goal I pursued was to encapsulate the
incarnation update and event registration for small attribute
changes. Now these functions hide that task inside:

    swim_update_member_inc_status
    swim_update_member_payload
    swim_update_member_addr

With your approach, the incarnation and attribute update logic would
spread over literally every function that touches members.

> I would not use this function from the
> constructor - I don't see any issue in copy-pasting 3 lines of
> code into the constructor to make both branches simpler.

It is not 3 lines: generally it is 6-7 lines per swim_new_member()
call, to invoke update_payload() afterwards. There are 4 invocations
of swim_new_member(), and two of them pass a payload and its size.
Without encapsulated payload update processing I would have to add
more checks into these two places, with more code duplication than you
expect, which I would rather avoid. These places are
swim_upsert_member() and swim_cfg(), and their readability is very
sensitive to code duplication. During SWIM development I tried to keep
them as clean and short as possible. I tried your way again just now,
but it looks much worse. Dropped.

>
> Or I would ban passing payload to the constructor and make this
> function public.
>
>>  static int
>>  swim_encode_member(struct swim_packet *packet, struct swim_member *m,
>> -		   struct swim_passport_bin *passport)
>> +		   struct swim_passport_bin *passport,
>> +		   struct swim_member_payload_bin *payload_header,
>> +		   bool is_payload_needed)
>
> Please rename is_payload_needed -> encode_payload.

I followed our flag naming rules (is_<...>). But OK, done.

> There is no
> comment for encode_payload and how it is used.

Added a comment.

> I don't see why you can't move all the decision making about
> whether to encode the payload or not into this function.
>
> The payload should be encoded if:
> - it's not null
> - payload ttl is greater than zero
> - it's not trustworthy. (is_payload_up_to_date). I would btw
> rename is_payload_up_to_date to is_payload_trustworthy - this
> name would be closer to truth.
>
> Why can't you look at these conditions once in swim_encode_member
> rather than evaluate them outside this function?

Because anti-entropy and dissemination have different conditions for
encoding the payload. For anti-entropy it is enough that the payload
is up to date. Dissemination additionally requires TTD > 0. We agreed
verbally to always check is_payload_up_to_date inside the function and
to use the encode_payload argument for the dissemination component
only.

>
> BTW, there is no harm in encoding an empty payload all the time
> (e.g. mp_bin of size 0). This would make the code simpler - you
> would only need to look at payload_size to see if payload exists.

This is what I do. I encode the payload even when it is empty, as long
as it is up to date.

>
>> +int
>> +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
>> +{
>> +	if (payload_size > MAX_PAYLOAD_SIZE) {
>> +		diag_set(IllegalParams, "Payload should be <= %d",
>> +			 MAX_PAYLOAD_SIZE);
>> +		return -1;
>> +	}
>> +	return swim_update_member_payload(swim, swim->self, payload,
>> +					  payload_size, 1);
>> +}
>
> Like I said above, if there is swim_set_payload, there is no need
> to supply payload in swim_new_member().

swim_set_payload() does not work on foreign members: it takes 'self'
only. During member decoding and update I need to update the payloads
of other members too.

>
>> +	def->payload_size = -1;
>
> Ugh.

I need a way to detect whether a payload was found in a packet and
decoded. It is not always sent, because:
1) dissemination does not send it when TTD == 0 or when it is outdated;
2) anti-entropy does not send outdated payloads either.
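
To make the agreed rules easier to check, below is a simplified,
self-contained C model of them. It is only a sketch: struct
payload_state, payload_apply_update() and payload_should_encode() are
hypothetical names that do not exist in the patch, and the real side
effects (payload copying, swim_on_member_update(), diag handling) are
left out. It assumes, as swim_update_member() asserts, that a received
incarnation is never smaller than the local one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical, simplified model of the payload-related fields of
 * struct swim_member. Not the real structure.
 */
struct payload_state {
	uint64_t incarnation;
	bool is_payload_up_to_date;
	int payload_ttd;
};

/*
 * Model of the rules in swim_update_member(). @a has_payload is
 * false when the packet carried no payload (the patch signals it
 * with def->payload_size == -1). @a member_count stands for
 * mh_size(swim->members), which is used to reset the TTD.
 * Returns true if the received payload replaces the local one.
 */
static bool
payload_apply_update(struct payload_state *s, uint64_t new_incarnation,
		     bool has_payload, int member_count)
{
	assert(new_incarnation >= s->incarnation);
	bool apply = false;
	if (new_incarnation > s->incarnation) {
		if (has_payload) {
			apply = true;
		} else {
			/* Newer incarnation, no payload: local copy may be stale. */
			s->is_payload_up_to_date = false;
		}
	} else if (!s->is_payload_up_to_date && has_payload) {
		/* Same incarnation, but the local copy is known to be stale. */
		apply = true;
	}
	if (apply) {
		/* The real code also copies the payload bytes here. */
		s->is_payload_up_to_date = true;
		s->payload_ttd = member_count;
	}
	/* In the patch this is done by swim_update_member_inc_status(). */
	s->incarnation = new_incarnation;
	return apply;
}

/*
 * Model of the encoding decision: anti-entropy sends any up-to-date
 * payload, dissemination additionally requires TTD > 0.
 */
static bool
payload_should_encode(const struct payload_state *s, bool is_dissemination)
{
	bool encode = is_dissemination ? s->payload_ttd > 0 : true;
	return encode && s->is_payload_up_to_date;
}
```

For example, starting from an up-to-date payload at incarnation 1, an
ACK carrying incarnation 2 without a payload makes
payload_should_encode() return false for both components. A later
packet with incarnation 2 and a payload is then accepted and the TTD
is reset, which is the refutation scenario covered by the tests.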
>
>
> -- 
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>

A new version:

===================================================================

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 3e64e4c91..22760cdd7 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -309,6 +309,44 @@ struct swim_member {
 	 * allow other members to learn the dead status.
 	 */
 	int status_ttd;
+	/** Arbitrary user data, disseminated on each change. */
+	char *payload;
+	/** Payload size, in bytes. */
+	uint16_t payload_size;
+	/**
+	 * True, if the payload is thought to be of the most
+	 * actual version. In such a case it can be disseminated
+	 * further. Otherwise @a payload is suspected to be
+	 * outdated and can be updated in two cases only:
+	 *
+	 * 1) when it is received with a bigger incarnation from
+	 *    anywhere;
+	 *
+	 * 2) when it is received with the same incarnation, but
+	 *    local payload is outdated.
+	 *
+	 * A payload can become outdated, if anyhow a new
+	 * incarnation of the member has been learned, but not a
+	 * new payload. For example, a message with new payload
+	 * could be lost, and at the same time this instance
+	 * responds to a ping with newly incarnated ack. The ack
+	 * receiver will learn the new incarnation, but not the
+	 * new payload.
+	 *
+	 * In this case it can't be said exactly whether the
+	 * member has updated payload, or another attribute. The
+	 * only way here is to wait until the most actual payload
+	 * will be received from another instance. Note, that such
+	 * an instance always exists - the payload originator
+	 * instance.
+	 */
+	bool is_payload_up_to_date;
+	/**
+	 * TTD of payload. At most this number of times payload is
+	 * sent as a part of dissemination component. Reset on
+	 * each payload update.
+	 */
+	int payload_ttd;
 	/**
 	 * All created events are put into a queue sorted by event
 	 * time.
@@ -524,6 +562,34 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
 	return container_of(scheduler, struct swim, scheduler);
 }
 
+/** Update member's payload, register a corresponding event. */
+static inline int
+swim_update_member_payload(struct swim *swim, struct swim_member *member,
+			   const char *payload, uint16_t payload_size,
+			   int incarnation_increment)
+{
+	assert(payload_size <= MAX_PAYLOAD_SIZE);
+	char *new_payload;
+	if (payload_size > 0) {
+		new_payload = (char *) realloc(member->payload, payload_size);
+		if (new_payload == NULL) {
+			diag_set(OutOfMemory, payload_size, "realloc",
+				 "new_payload");
+			return -1;
+		}
+		memcpy(new_payload, payload, payload_size);
+	} else {
+		free(member->payload);
+		new_payload = NULL;
+	}
+	member->payload = new_payload;
+	member->payload_size = payload_size;
+	member->payload_ttd = mh_size(swim->members);
+	member->incarnation += incarnation_increment;
+	member->is_payload_up_to_date = true;
+	swim_on_member_update(swim, member);
+	return 0;
+}
+
 /**
  * Once a ping is sent, the member should start waiting for an
  * ACK.
@@ -557,6 +623,7 @@ swim_member_delete(struct swim_member *member)
 
 	/* Dissemination component. */
 	assert(rlist_empty(&member->in_dissemination_queue));
+	free(member->payload);
 	free(member);
 }
 
@@ -638,7 +705,7 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
 static struct swim_member *
 swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		const struct tt_uuid *uuid, enum swim_member_status status,
-		uint64_t incarnation)
+		uint64_t incarnation, const char *payload, int payload_size)
 {
 	int new_bsize = sizeof(swim->shuffled[0]) *
 			(mh_size(swim->members) + 1);
@@ -676,6 +743,12 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 
 	/* Dissemination component. */
 	swim_on_member_update(swim, member);
+	if (payload_size >= 0 &&
+	    swim_update_member_payload(swim, member, payload,
+				       payload_size, 0) != 0) {
+		swim_delete_member(swim, member);
+		return NULL;
+	}
 
 	say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
 		    swim_uuid_str(&member->uuid), mh_size(swim->members));
@@ -735,22 +808,40 @@ swim_new_round(struct swim *swim)
 
 /**
  * Encode one member into @a packet using @a passport structure.
+ * Note that this function does not make a decision whether
+ * payload should be encoded, because its callers have different
+ * conditions for that. The anti-entropy needs the payload be
+ * up-to-date. The dissemination component additionally needs
+ * TTD > 0.
  * @retval 0 Success, encoded.
  * @retval -1 Not enough memory in the packet.
  */
 static int
 swim_encode_member(struct swim_packet *packet, struct swim_member *m,
-		   struct swim_passport_bin *passport)
+		   struct swim_passport_bin *passport,
+		   struct swim_member_payload_bin *payload_header,
+		   bool encode_payload)
 {
 	/* The headers should be initialized. */
 	assert(passport->k_status == SWIM_MEMBER_STATUS);
+	assert(payload_header->k_payload == SWIM_MEMBER_PAYLOAD);
 	int size = sizeof(*passport);
+	encode_payload = encode_payload && m->is_payload_up_to_date;
+	if (encode_payload)
+		size += sizeof(*payload_header) + m->payload_size;
 	char *pos = swim_packet_alloc(packet, size);
 	if (pos == NULL)
 		return -1;
 	swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
-			       m->incarnation);
+			       m->incarnation, encode_payload);
 	memcpy(pos, passport, sizeof(*passport));
+	if (encode_payload) {
+		pos += sizeof(*passport);
+		swim_member_payload_bin_fill(payload_header, m->payload_size);
+		memcpy(pos, payload_header, sizeof(*payload_header));
+		pos += sizeof(*payload_header);
+		memcpy(pos, m->payload, m->payload_size);
+	}
 	return 0;
 }
 
@@ -764,17 +855,20 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
 {
 	struct swim_anti_entropy_header_bin ae_header_bin;
 	struct swim_passport_bin passport_bin;
+	struct swim_member_payload_bin payload_header;
 	char *header = swim_packet_alloc(packet, sizeof(ae_header_bin));
 	if (header == NULL)
 		return 0;
 	swim_passport_bin_create(&passport_bin);
+	swim_member_payload_bin_create(&payload_header);
 	struct mh_swim_table_t *t = swim->members;
 	int i = 0, member_count = mh_size(t);
 	int rnd = swim_scaled_rand(0, member_count - 1);
 	for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
 	     i < member_count; ++i) {
 		struct swim_member *m = *mh_swim_table_node(t, rc);
-		if (swim_encode_member(packet, m, &passport_bin) != 0)
+		if (swim_encode_member(packet, m, &passport_bin,
+				       &payload_header, true) != 0)
 			break;
 		/*
 		 * First random member could be chosen too close
@@ -834,16 +928,20 @@ static int
 swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
 {
 	struct swim_diss_header_bin diss_header_bin;
+	struct swim_member_payload_bin payload_header;
 	struct swim_passport_bin passport_bin;
 	char *header = swim_packet_alloc(packet, sizeof(diss_header_bin));
 	if (header == NULL)
 		return 0;
 	swim_passport_bin_create(&passport_bin);
+	swim_member_payload_bin_create(&payload_header);
 	int i = 0;
 	struct swim_member *m;
 	rlist_foreach_entry(m, &swim->dissemination_queue,
 			    in_dissemination_queue) {
-		if (swim_encode_member(packet, m, &passport_bin) != 0)
+		if (swim_encode_member(packet, m, &passport_bin,
+				       &payload_header,
+				       m->payload_ttd > 0) != 0)
 			break;
 		++i;
 	}
@@ -891,6 +989,10 @@ swim_decrease_event_ttd(struct swim *swim)
 	rlist_foreach_entry_safe(member, &swim->dissemination_queue,
 				 in_dissemination_queue,
 				 tmp) {
+		if (member->payload_ttd > 0) {
+			if (--member->payload_ttd == 0)
+				swim_cached_round_msg_invalidate(swim);
+		}
 		if (--member->status_ttd == 0) {
 			rlist_del_entry(member, in_dissemination_queue);
 			swim_cached_round_msg_invalidate(swim);
@@ -1066,8 +1168,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
 {
 	assert(member != swim->self);
 	assert(def->incarnation >= member->incarnation);
-	if (def->incarnation > member->incarnation)
+	/*
+	 * Payload update rules are simple: it can be updated
+	 * either if the new payload has a bigger incarnation, or
+	 * the same incarnation, but local payload is outdated.
+	 */
+	bool encode_payload = false;
+	if (def->incarnation > member->incarnation) {
 		swim_update_member_addr(swim, member, &def->addr, 0);
+		if (def->payload_size >= 0) {
+			encode_payload = true;
+		} else if (member->is_payload_up_to_date) {
+			member->is_payload_up_to_date = false;
+			swim_on_member_update(swim, member);
+		}
+	} else if (! member->is_payload_up_to_date && def->payload_size >= 0) {
+		encode_payload = true;
+	}
+	if (encode_payload &&
+	    swim_update_member_payload(swim, member, def->payload,
+				       def->payload_size, 0) != 0) {
+		/* Not such a critical error. */
+		diag_log();
+	}
 	swim_update_member_inc_status(swim, member, def->status,
 				      def->incarnation);
 }
@@ -1108,7 +1231,8 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 			goto skip;
 		}
 		*result = swim_new_member(swim, &def->addr, &def->uuid,
-					  def->status, def->incarnation);
+					  def->status, def->incarnation,
+					  def->payload, def->payload_size);
 		return *result != NULL ? 0 : -1;
 	}
 	*result = member;
@@ -1437,7 +1561,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			return -1;
 		}
 		swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
-					     0);
+					     0, NULL, 0);
 		if (swim->self == NULL)
 			return -1;
 	} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -1449,7 +1573,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			return -1;
 		}
 		new_self = swim_new_member(swim, &swim->self->addr, uuid,
-					   MEMBER_ALIVE, 0);
+					   MEMBER_ALIVE, 0, swim->self->payload,
+					   swim->self->payload_size);
 		if (new_self == NULL)
 			return -1;
 	}
@@ -1505,6 +1630,18 @@ swim_is_configured(const struct swim *swim)
 	return swim->self != NULL;
 }
 
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
+{
+	if (payload_size > MAX_PAYLOAD_SIZE) {
+		diag_set(IllegalParams, "Payload should be <= %d",
+			 MAX_PAYLOAD_SIZE);
+		return -1;
+	}
+	return swim_update_member_payload(swim, swim->self, payload,
+					  payload_size, 1);
+}
+
 int
 swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 {
@@ -1519,7 +1656,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, uuid);
 	if (member == NULL) {
-		member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0);
+		member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0,
+					 NULL, -1);
 		return member == NULL ? -1 : 0;
 	}
 	diag_set(SwimError, "%s a member with such UUID already exists",
@@ -1755,3 +1893,10 @@ swim_member_incarnation(const struct swim_member *member)
 {
 	return member->incarnation;
 }
+
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size)
+{
+	*size = member->payload_size;
+	return member->payload;
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 09d933b83..6a219d131 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -104,6 +104,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 double
 swim_ack_timeout(const struct swim *swim);
 
+/** Set payload to disseminate over the cluster. */
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size);
+
 /**
  * Stop listening and broadcasting messages, cleanup all internal
  * structures, free memory.
@@ -220,6 +224,10 @@ swim_member_uuid(const struct swim_member *member);
 uint64_t
 swim_member_incarnation(const struct swim_member *member);
 
+/** Member's payload. */
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index d84550663..58c9ec119 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -160,6 +160,7 @@ swim_member_def_create(struct swim_member_def *def)
 	memset(def, 0, sizeof(*def));
 	def->addr.sin_family = AF_INET;
 	def->status = MEMBER_ALIVE;
+	def->payload_size = -1;
 }
 
 /**
@@ -179,6 +180,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 		       struct swim_member_def *def)
 {
 	uint64_t tmp;
+	uint32_t len;
 	switch (key) {
 	case SWIM_MEMBER_STATUS:
 		if (swim_decode_uint(pos, end, &tmp, prefix,
@@ -210,6 +212,17 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 				     "member incarnation") != 0)
 			return -1;
 		break;
+	case SWIM_MEMBER_PAYLOAD:
+		if (swim_decode_bin(&def->payload, &len, pos, end, prefix,
+				    "member payload") != 0)
+			return -1;
+		if (len > MAX_PAYLOAD_SIZE) {
+			diag_set(SwimError, "%s member payload size should be "\
+				 "<= %d", prefix, MAX_PAYLOAD_SIZE);
+			return -1;
+		}
+		def->payload_size = (int) len;
+		break;
 	default:
 		unreachable();
 	}
@@ -330,10 +343,22 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
 	header->v_anti_entropy = mp_bswap_u16(batch_size);
 }
 
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin)
+{
+	bin->k_payload = SWIM_MEMBER_PAYLOAD;
+	bin->m_payload_size = 0xc5;
+}
+
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, uint16_t size)
+{
+	bin->v_payload_size = mp_bswap_u16(size);
+}
+
 void
 swim_passport_bin_create(struct swim_passport_bin *passport)
 {
-	passport->m_header = 0x85;
 	passport->k_status = SWIM_MEMBER_STATUS;
 	passport->k_addr = SWIM_MEMBER_ADDRESS;
 	passport->m_addr = 0xce;
@@ -350,8 +375,10 @@ void
 swim_passport_bin_fill(struct swim_passport_bin *passport,
 		       const struct sockaddr_in *addr,
 		       const struct tt_uuid *uuid,
-		       enum swim_member_status status, uint64_t incarnation)
+		       enum swim_member_status status, uint64_t incarnation,
+		       bool encode_payload)
 {
+	passport->m_header = 0x85 + encode_payload;
 	passport->v_status = status;
 	passport->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr));
 	passport->v_port = mp_bswap_u16(ntohs(addr->sin_port));
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index e1c70db43..23d339e80 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -38,6 +38,11 @@
 #include
 #include "swim_constants.h"
 
+enum {
+	/** Reserve 272 bytes for headers. */
+	MAX_PAYLOAD_SIZE = 1200,
+};
+
 /**
  * SWIM binary protocol structures and helpers. Below is a picture
  * of a SWIM message template:
@@ -67,7 +72,8 @@
  * |             SWIM_MEMBER_ADDRESS: uint, ip,       |
  * |             SWIM_MEMBER_PORT: uint, port,        |
  * |             SWIM_MEMBER_UUID: 16 byte UUID,      |
- * |             SWIM_MEMBER_INCARNATION: uint        |
+ * |             SWIM_MEMBER_INCARNATION: uint,       |
+ * |             SWIM_MEMBER_PAYLOAD: bin             |
  * |         },                                       |
  * |         ...                                      |
  * |     ],                                           |
@@ -80,7 +86,8 @@
  * |             SWIM_MEMBER_ADDRESS: uint, ip,       |
  * |             SWIM_MEMBER_PORT: uint, port,        |
  * |             SWIM_MEMBER_UUID: 16 byte UUID,      |
- * |             SWIM_MEMBER_INCARNATION: uint        |
+ * |             SWIM_MEMBER_INCARNATION: uint,       |
+ * |             SWIM_MEMBER_PAYLOAD: bin             |
  * |         },                                       |
  * |         ...                                      |
  * |     ],                                           |
@@ -103,6 +110,8 @@ struct swim_member_def {
 	struct sockaddr_in addr;
 	uint64_t incarnation;
 	enum swim_member_status status;
+	const char *payload;
+	int payload_size;
 };
 
 /** Initialize the definition with default values. */
@@ -242,6 +251,7 @@ enum swim_member_key {
 	SWIM_MEMBER_PORT,
 	SWIM_MEMBER_UUID,
 	SWIM_MEMBER_INCARNATION,
+	SWIM_MEMBER_PAYLOAD,
 	swim_member_key_MAX,
 };
 
@@ -305,6 +315,30 @@ struct PACKED swim_passport_bin {
 	uint64_t v_incarnation;
 };
 
+/**
+ * SWIM member's payload header. Payload data should be encoded
+ * right after it.
+ */
+struct PACKED swim_member_payload_bin {
+	/** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+	uint8_t k_payload;
+	/** mp_encode_bin(16bit bin header) */
+	uint8_t m_payload_size;
+	uint16_t v_payload_size;
+	/** Payload data ... */
+};
+
+/** Initialize payload record. */
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin);
+
+/**
+ * Fill a previously created payload record with an actual size.
+ */
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin,
+			     uint16_t size);
+
 /** Initialize a member's binary passport. */
 void
 swim_passport_bin_create(struct swim_passport_bin *passport);
@@ -319,7 +353,8 @@ void
 swim_passport_bin_fill(struct swim_passport_bin *passport,
 		       const struct sockaddr_in *addr,
 		       const struct tt_uuid *uuid,
-		       enum swim_member_status status, uint64_t incarnation);
+		       enum swim_member_status status, uint64_t incarnation,
+		       bool encode_payload);
 
 /** }}} Anti-entropy component */
 
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 6f3871606..0b8058f0b 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -36,6 +36,7 @@
 #include "uri/uri.h"
 #include "swim/swim.h"
 #include "swim/swim_ev.h"
+#include "swim/swim_proto.h"
 #include "swim_test_transport.h"
 #include "swim_test_ev.h"
 #include "swim_test_utils.h"
@@ -642,10 +643,200 @@ swim_test_broadcast(void)
 	swim_finish_test();
 }
 
+static void
+swim_test_payload_basic(void)
+{
+	swim_start_test(11);
+	uint16_t size, cluster_size = 3;
+	struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+	for (int i = 0; i < cluster_size; ++i) {
+		for (int j = i + 1; j < cluster_size; ++j)
+			swim_cluster_interconnect(cluster, i, j);
+	}
+	ok(swim_cluster_member_payload(cluster, 0, 0, &size) == NULL &&
+	   size == 0, "no payload by default");
+	is(swim_cluster_member_set_payload(cluster, 0, NULL, 1300), -1,
+	   "can not set too big payload");
+	ok(swim_error_check_match("Payload should be <="), "diag says too big");
+
+	const char *s0_payload = "S1 payload";
+	uint16_t s0_payload_size = strlen(s0_payload) + 1;
+	is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+					   s0_payload_size), 0,
+	   "payload is set");
+	is(swim_cluster_member_incarnation(cluster, 0, 0), 1,
+	   "incarnation is incremented on each payload update");
+	const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size);
+	ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0,
+	   "payload is successfully obtained back");
+
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+						s0_payload_size, cluster_size),
+	   0, "payload is disseminated");
+	s0_payload = "S1 second version of payload";
+	s0_payload_size = strlen(s0_payload) + 1;
+	is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+					   s0_payload_size), 0,
+	   "payload is changed");
+	is(swim_cluster_member_incarnation(cluster, 0, 0), 2,
+	   "incarnation is incremented on each payload update");
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+						s0_payload_size, cluster_size),
+	   0, "second payload is disseminated");
+	/*
+	 * Test that new incarnations help to rewrite the old
+	 * payload from anti-entropy.
+	 */
+	swim_cluster_set_drop(cluster, 0, 100);
+	s0_payload = "S1 third version of payload";
+	s0_payload_size = strlen(s0_payload) + 1;
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+						s0_payload_size) != 0);
+	/* Wait at least one round until payload TTD gets 0. */
+	swim_run_for(3);
+	swim_cluster_set_drop(cluster, 0, 0);
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+						s0_payload_size, cluster_size),
+	   0, "third payload is disseminated via anti-entropy");
+
+	swim_cluster_delete(cluster);
+	swim_finish_test();
+}
+
+static void
+swim_test_payload_refutation(void)
+{
+	swim_start_test(11);
+	uint16_t size, cluster_size = 3;
+	struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+	swim_cluster_set_ack_timeout(cluster, 1);
+	for (int i = 0; i < cluster_size; ++i) {
+		for (int j = i + 1; j < cluster_size; ++j)
+			swim_cluster_interconnect(cluster, i, j);
+	}
+	const char *s0_old_payload = "s0 payload";
+	uint16_t s0_old_payload_size = strlen(s0_old_payload) + 1;
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_old_payload,
+						s0_old_payload_size) != 0);
+	fail_if(swim_cluster_wait_payload_everywhere(cluster, 0, s0_old_payload,
+						     s0_old_payload_size,
+						     3) != 0);
+	/*
+	 * The test checks the following case. Assume there are 3
+	 * nodes: S1, S2, S3. They all know each other. S1 sets
+	 * new payload, S2 and S3 knows that. They all see that S1
+	 * has incarnation 1 and payload P1.
+	 *
+	 * Now S1 changes payload to P2. Its incarnation becomes
+	 * 2. During next entire round its round messages are
+	 * lost, however ACKs work ok.
+	 */
+	const char *s0_new_payload = "s0 second payload";
+	uint16_t s0_new_payload_size = strlen(s0_new_payload);
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_new_payload,
+						s0_new_payload_size) != 0);
+	int components[2] = {SWIM_DISSEMINATION, SWIM_ANTI_ENTROPY};
+	swim_cluster_drop_components(cluster, 0, components, 2);
+	swim_run_for(3);
+	swim_cluster_drop_components(cluster, 0, NULL, 0);
+
+	is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+	   "S2 sees new incarnation of S1");
+	is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+	   "S3 does the same");
+
+	const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "but S2 does not know the new payload");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "as well as S3");
+
+	/* Restore normal ACK timeout. */
+	swim_cluster_set_ack_timeout(cluster, 30);
+
+	/*
+	 * Now S1's payload TTD is 0, but via ACKs S1 sent its new
+	 * incarnation to S2 and S3. Despite that they should
+	 * apply new S1's payload via anti-entropy. Next lines
+	 * test that:
+	 *
+	 * 1) S2 can apply new S1's payload from S1's
+	 *    anti-entropy;
+	 *
+	 * 2) S2 will not receive the old S1's payload from S3.
+	 *    S3 knows, that its payload is outdated, and should
+	 *    not send it;
+	 *
+	 * 3) S3 can apply new S1's payload from S2's
+	 *    anti-entropy. Note, that here S3 applies the payload
+	 *    not directly from the originator. It is the most
+	 *    complex case.
+	 *
+	 * Next lines test the case (1).
+	 */
+
+	/* S3 does not participate in the test (1). */
+	swim_cluster_set_drop(cluster, 2, 100);
+	swim_run_for(3);
+
+	tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_new_payload_size &&
+	   memcmp(tmp, s0_new_payload, size) == 0,
+	   "S2 learned S1's payload via anti-entropy");
+	is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+	   "incarnation still is the same");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "S3 was blocked and does not know anything");
+	is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+	   "incarnation still is the same");
+
+	/* S1 will not participate in the tests further. */
+	swim_cluster_set_drop(cluster, 0, 100);
+
+	/*
+	 * Now check the case (2) - S3 will not send outdated
+	 * version of S1's payload. To maintain the experimental
+	 * integrity S1 and S2 are silent. Only S3 sends packets.
+	 */
+	swim_cluster_set_drop(cluster, 2, 0);
+	swim_cluster_set_drop_out(cluster, 1, 100);
+	swim_run_for(3);
+
+	tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_new_payload_size &&
+	   memcmp(tmp, s0_new_payload, size) == 0,
+	   "S2 keeps the same new S1's payload, S3 did not rewrite it");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "S3 still does not know anything");
+
+	/*
+	 * Now check the case (3) - S3 accepts new S1's payload
+	 * from S2. Even knowing the same S1's incarnation.
+	 */
+	swim_cluster_set_drop(cluster, 1, 0);
+	swim_cluster_set_drop_out(cluster, 2, 100);
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_new_payload,
+						s0_new_payload_size, 3), 0,
+	   "S3 learns S1's payload from S2");
+
+	swim_cluster_delete(cluster);
+	swim_finish_test();
+}
+
 static int
 main_f(va_list ap)
 {
-	swim_start_test(15);
+	swim_start_test(17);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -666,6 +857,8 @@ main_f(va_list ap)
 	swim_test_quit();
 	swim_test_uri_update();
 	swim_test_broadcast();
+	swim_test_payload_basic();
+	swim_test_payload_refutation();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index a90a86dd0..4b1407db3 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..15
+1..17
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -147,4 +147,34 @@ ok 14 - subtests
     ok 6 - fullmesh is reached, and no one link was added explicitly
 ok 15 - subtests
 	*** swim_test_broadcast: done ***
+	*** swim_test_payload_basic ***
+    1..11
+    ok 1 - no payload by default
+    ok 2 - can not set too big payload
+    ok 3 - diag says too big
+    ok 4 - payload is set
+    ok 5 - incarnation is incremented on each payload update
+    ok 6 - payload is successfully obtained back
+    ok 7 - payload is disseminated
+    ok 8 - payload is changed
+    ok 9 - incarnation is incremented on each payload update
+    ok 10 - second payload is disseminated
+    ok 11 - third payload is disseminated via anti-entropy
+ok 16 - subtests
+	*** swim_test_payload_basic: done ***
+	*** swim_test_payload_refutation ***
+    1..11
+    ok 1 - S2 sees new incarnation of S1
+    ok 2 - S3 does the same
+    ok 3 - but S2 does not know the new payload
+    ok 4 - as well as S3
+    ok 5 - S2 learned S1's payload via anti-entropy
+    ok 6 - incarnation still is the same
+    ok 7 - S3 was blocked and does not know anything
+    ok 8 - incarnation still is the same
+    ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it
+    ok 10 - S3 still does not know anything
+    ok 11 - S3 learns S1's payload from S2
+ok 17 - subtests
+	*** swim_test_payload_refutation: done ***
 	*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index fd528d166..45570cce5 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -191,6 +191,27 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
 	return swim_member_incarnation(m);
 }
 
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+			    int member_id, uint16_t *size)
+{
+	const struct swim_member *m =
+		swim_cluster_member_view(cluster, node_id, member_id);
+	if (m == NULL) {
+		*size = 0;
+		return NULL;
+	}
+	return swim_member_payload(m, size);
+}
+
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+				const char *payload, uint16_t size)
+{
+	struct swim *s = swim_cluster_node(cluster, i);
+	return swim_set_payload(s, payload, size);
+}
+
 struct swim *
 swim_cluster_node(struct swim_cluster *cluster, int i)
 {
@@ -506,6 +527,13 @@ struct swim_member_template {
 	 */
 	bool need_check_incarnation;
 	uint64_t incarnation;
+	/**
+	 * True, if the payload should be checked to be equal to
+	 * @a payload of size @a payload_size.
+	 */
+	bool need_check_payload;
+	const char *payload;
+	uint16_t payload_size;
 };
 
 /** Build member template. No checks are set. */
@@ -542,6 +570,19 @@ swim_member_template_set_incarnation(struct swim_member_template *t,
 	t->incarnation = incarnation;
 }
 
+/**
+ * Set that the member template should be used to check member
+ * payload.
+ */
+static inline void
+swim_member_template_set_payload(struct swim_member_template *t,
+				 const char *payload, uint16_t payload_size)
+{
+	t->need_check_payload = true;
+	t->payload = payload;
+	t->payload_size = payload_size;
+}
+
 /** Callback to check that a member matches a template. */
 static bool
 swim_loop_check_member(struct swim_cluster *cluster, void *data)
@@ -551,17 +592,26 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data)
 		swim_cluster_member_view(cluster, t->node_id, t->member_id);
 	enum swim_member_status status;
 	uint64_t incarnation;
+	const char *payload;
+	uint16_t payload_size;
 	if (m != NULL) {
 		status = swim_member_status(m);
 		incarnation = swim_member_incarnation(m);
+		payload = swim_member_payload(m, &payload_size);
 	} else {
 		status = swim_member_status_MAX;
 		incarnation = 0;
+		payload = NULL;
+		payload_size = 0;
 	}
 	if (t->need_check_status && status != t->status)
 		return false;
 	if (t->need_check_incarnation && incarnation != t->incarnation)
 		return false;
+	if (t->need_check_payload &&
+	    (payload_size != t->payload_size ||
+	     memcmp(payload, t->payload, payload_size) != 0))
+		return false;
 	return true;
 }
 
@@ -644,6 +694,18 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id,
 				   swim_loop_check_member_everywhere, &t);
 }
 
+int
+swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
+				     int member_id, const char *payload,
+				     uint16_t payload_size, double timeout)
+{
+	struct swim_member_template t;
+	swim_member_template_create(&t, -1, member_id);
+	swim_member_template_set_payload(&t, payload, payload_size);
+	return swim_wait_timeout(timeout, cluster,
+				 swim_loop_check_member_everywhere, &t);
+}
+
 bool
 swim_error_check_match(const char *msg)
 {
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 6ea136e36..100a67e0c 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -141,6 +141,14 @@ uint64_t
 swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
 				int member_id);
 
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+			    int member_id, uint16_t *size);
+
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+				const char *payload, uint16_t size);
+
 /**
  * Check if in
the cluster every instance knowns the about other * instances. @@ -192,6 +200,16 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, int member_id, uint64_t incarnation, double timeout); +/** + * Wait until a member with id @a member_id is seen with + * @a payload of size @a payload_size in the membership table of + * every instance in @a cluster. At most @a timeout seconds. + */ +int +swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, + int member_id, const char *payload, + uint16_t payload_size, double timeout); + /** Process SWIM events for @a duration fake seconds. */ void swim_run_for(double duration);
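One detail in the new payload check in swim_loop_check_member(): when the member is not found, payload is set to NULL and payload_size to 0, so waiting for an empty expected payload would end up calling memcmp(NULL, t->payload, 0). The C standard requires valid pointer arguments to memcmp() even when the length is zero, so that call is technically undefined behavior, although it is harmless in practice. Below is a minimal standalone sketch (C99) of the same size-then-bytes comparison with that case guarded, plus an "every node agrees" predicate of the kind swim_wait_timeout() keeps re-evaluating. struct member_view, payload_equals() and payload_is_everywhere() are hypothetical names for illustration only, not part of the SWIM test utilities.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/*
 * Hypothetical stand-in for one node's view of a member; this is
 * not the real struct swim_member.
 */
struct member_view {
	/* NULL when the node knows no payload for the member. */
	const char *payload;
	uint16_t payload_size;
};

/*
 * Compare two payloads byte by byte. Sizes are compared first, and
 * memcmp() is skipped for empty payloads, so a NULL pointer is
 * never passed to it.
 */
static bool
payload_equals(const char *a, uint16_t a_size, const char *b,
	       uint16_t b_size)
{
	if (a_size != b_size)
		return false;
	return a_size == 0 || memcmp(a, b, a_size) == 0;
}

/*
 * Return true if every entry of @a views carries the expected
 * payload. A NULL entry stands for a node that does not know the
 * member at all and is treated as having an empty payload. A real
 * waiter would re-evaluate this after each event-loop step until
 * it holds or a timeout expires.
 */
static bool
payload_is_everywhere(const struct member_view *const *views, int count,
		      const char *payload, uint16_t payload_size)
{
	assert(count == 0 || views != NULL);
	for (int i = 0; i < count; ++i) {
		const char *p = NULL;
		uint16_t size = 0;
		if (views[i] != NULL) {
			p = views[i]->payload;
			size = views[i]->payload_size;
		}
		if (!payload_equals(p, size, payload, payload_size))
			return false;
	}
	return true;
}
```

For example, with three views holding "new", "new" and "old", payload_is_everywhere(views, 3, "new", 3) stays false until the stale view is replaced, which is the condition a test such as "S3 learns S1's payload from S2" waits for.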