From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@freelists.org Cc: kostja@tarantool.org Subject: [tarantool-patches] [PATCH 6/6] swim: introduce payload Date: Fri, 12 Apr 2019 01:22:30 +0300 [thread overview] Message-ID: <04b17677a6e24d46587f718733000f426e1176e7.1555021137.git.v.shpilevoy@tarantool.org> (raw) In-Reply-To: <cover.1555021137.git.v.shpilevoy@tarantool.org> In-Reply-To: <cover.1555021137.git.v.shpilevoy@tarantool.org> Payload is arbitrary user data disseminated over the cluster along with other member attributes. Part of #3234 --- src/lib/swim/swim.c | 155 ++++++++++++++++++++++++++-- src/lib/swim/swim.h | 8 ++ src/lib/swim/swim_proto.c | 31 +++++- src/lib/swim/swim_proto.h | 41 +++++++- test/unit/swim.c | 195 +++++++++++++++++++++++++++++++++++- test/unit/swim.result | 32 +++++- test/unit/swim_test_utils.c | 62 ++++++++++++ test/unit/swim_test_utils.h | 18 ++++ 8 files changed, 525 insertions(+), 17 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 2dac6eedd..2be1c846b 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -308,6 +308,38 @@ struct swim_member { * allow other members to learn the dead status. */ int status_ttl; + /** Arbitrary user data, disseminated on each change. */ + char *payload; + /** Payload size, in bytes. */ + uint16_t payload_size; + /** + * True, if the payload is thought to be of the most + * actual version. In such a case it can be disseminated + * farther. Otherwise @a payload is suspected to be + * outdated and can be updated in two cases only: + * + * 1) when it is received with a bigger incarnation from + * anywhere; + * + * 2) when it is received with the same incarnation, but + * local payload is outdated. + * + * A payload can become outdated, if anyhow a new + * incarnation of the member has been learned, but not a + * new payload. In such a case it can't be said exactly + * whether the member has updated payload, or another + * attribute. The only way here is to wait until the most + * actual payload will be received from another instance. + * Note, that such an instance always exists - the payload + * originator instance. + */ + bool is_payload_up_to_date; + /** + * TTL of payload. At most this number of times payload is + * sent as a part of dissemination component. Reset on + * each payload update. + */ + int payload_ttl; /** * All created events are put into a queue sorted by event * time. @@ -523,6 +555,34 @@ swim_by_scheduler(struct swim_scheduler *scheduler) return container_of(scheduler, struct swim, scheduler); } +/** Update member's payload, register a corresponding event. */ +static inline int +swim_update_member_payload(struct swim *swim, struct swim_member *member, + const char *payload, uint16_t payload_size, + int incarnation_increment) +{ + assert(payload_size <= MAX_PAYLOAD_SIZE); + char *new_payload; + if (payload_size > 0) { + new_payload = (char *) realloc(member->payload, payload_size); + if (new_payload == NULL) { + diag_set(OutOfMemory, payload_size, "realloc", "new_payload"); + return -1; + } + memcpy(new_payload, payload, payload_size); + } else { + free(member->payload); + new_payload = NULL; + } + member->payload = new_payload; + member->payload_size = payload_size; + member->payload_ttl = mh_size(swim->members); + member->incarnation += incarnation_increment; + member->is_payload_up_to_date = true; + swim_on_member_update(swim, member); + return 0; +} + /** * Once a ping is sent, the member should start waiting for an * ACK. @@ -556,6 +616,7 @@ swim_member_delete(struct swim_member *member) /* Dissemination component. */ assert(rlist_empty(&member->in_dissemination_queue)); + free(member->payload); free(member); } @@ -637,7 +698,7 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid) static struct swim_member * swim_new_member(struct swim *swim, const struct sockaddr_in *addr, const struct tt_uuid *uuid, enum swim_member_status status, - uint64_t incarnation) + uint64_t incarnation, const char *payload, int payload_size) { int new_bsize = sizeof(swim->shuffled[0]) * (mh_size(swim->members) + 1); @@ -675,6 +736,12 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr, /* Dissemination component. */ swim_on_member_update(swim, member); + if (payload_size >= 0 && + swim_update_member_payload(swim, member, payload, + payload_size, 0) != 0) { + swim_delete_member(swim, member); + return NULL; + } say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim), swim_uuid_str(&member->uuid), mh_size(swim->members)); @@ -739,17 +806,29 @@ swim_new_round(struct swim *swim) */ static int swim_encode_member(struct swim_packet *packet, struct swim_member *m, - struct swim_passport_bin *passport) + struct swim_passport_bin *passport, + struct swim_member_payload_bin *payload_header, + bool is_payload_needed) { /* The headers should be initialized. */ assert(passport->k_status == SWIM_MEMBER_STATUS); + assert(payload_header->k_payload == SWIM_MEMBER_PAYLOAD); int size = sizeof(*passport); + if (is_payload_needed) + size += sizeof(*payload_header) + m->payload_size; char *pos = swim_packet_alloc(packet, size); if (pos == NULL) return -1; swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status, - m->incarnation); + m->incarnation, is_payload_needed); memcpy(pos, passport, sizeof(*passport)); + if (is_payload_needed) { + pos += sizeof(*passport); + swim_member_payload_bin_fill(payload_header, m->payload_size); + memcpy(pos, payload_header, sizeof(*payload_header)); + pos += sizeof(*payload_header); + memcpy(pos, m->payload, m->payload_size); + } return 0; } @@ -763,17 +842,21 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet) { struct swim_anti_entropy_header_bin ae_header_bin; struct swim_passport_bin passport_bin; + struct swim_member_payload_bin payload_header; char *header = swim_packet_alloc(packet, sizeof(ae_header_bin)); if (header == NULL) return 0; swim_passport_bin_create(&passport_bin); + swim_member_payload_bin_create(&payload_header); struct mh_swim_table_t *t = swim->members; int i = 0, member_count = mh_size(t); int rnd = swim_scaled_rand(0, member_count - 1); for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t); i < member_count; ++i) { struct swim_member *m = *mh_swim_table_node(t, rc); - if (swim_encode_member(packet, m, &passport_bin) != 0) + if (swim_encode_member(packet, m, &passport_bin, + &payload_header, + m->is_payload_up_to_date) != 0) break; /* * First random member could be chosen too close @@ -833,16 +916,21 @@ static int swim_encode_dissemination(struct swim *swim, struct swim_packet *packet) { struct swim_diss_header_bin diss_header_bin; + struct swim_member_payload_bin payload_header; struct swim_passport_bin passport_bin; char *header = swim_packet_alloc(packet, sizeof(diss_header_bin)); if (header == NULL) return 0; swim_passport_bin_create(&passport_bin); + swim_member_payload_bin_create(&payload_header); int i = 0; struct swim_member *m; rlist_foreach_entry(m, &swim->dissemination_queue, in_dissemination_queue) { - if (swim_encode_member(packet, m, &passport_bin) != 0) + bool is_payload_needed = m->payload_ttl > 0 && + m->is_payload_up_to_date; + if (swim_encode_member(packet, m, &passport_bin, + &payload_header, is_payload_needed) != 0) break; ++i; } @@ -890,6 +978,10 @@ swim_decrease_event_ttl(struct swim *swim) rlist_foreach_entry_safe(member, &swim->dissemination_queue, in_dissemination_queue, tmp) { + if (member->payload_ttl > 0) { + if (--member->payload_ttl == 0) + swim_cached_round_msg_invalidate(swim); + } if (--member->status_ttl == 0) { rlist_del_entry(member, in_dissemination_queue); swim_cached_round_msg_invalidate(swim); @@ -1065,8 +1157,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def, { assert(member != swim->self); assert(def->incarnation >= member->incarnation); - if (def->incarnation > member->incarnation) + /* + * Payload update rules are simple: it can be updated + * either if the new payload has a bigger incarnation, or + * the same incarnation, but local payload is outdated. + */ + bool is_payload_needed = false; + if (def->incarnation > member->incarnation) { swim_update_member_addr(swim, member, &def->addr, 0); + if (def->payload_size >= 0) { + is_payload_needed = true; + } else if (member->is_payload_up_to_date) { + member->is_payload_up_to_date = false; + swim_on_member_update(swim, member); + } + } else if (! member->is_payload_up_to_date && def->payload_size >= 0) { + is_payload_needed = true; + } + if (is_payload_needed && + swim_update_member_payload(swim, member, def->payload, + def->payload_size, 0) != 0) { + /* Not such a critical error. */ + diag_log(); + } swim_update_member_inc_status(swim, member, def->status, def->incarnation); } @@ -1107,7 +1220,8 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, goto skip; } *result = swim_new_member(swim, &def->addr, &def->uuid, - def->status, def->incarnation); + def->status, def->incarnation, + def->payload, def->payload_size); return *result != NULL ? 0 : -1; } *result = member; @@ -1436,7 +1550,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, - 0); + 0, NULL, 0); if (swim->self == NULL) return -1; } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { @@ -1448,7 +1562,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } new_self = swim_new_member(swim, &swim->self->addr, uuid, - MEMBER_ALIVE, 0); + MEMBER_ALIVE, 0, swim->self->payload, + swim->self->payload_size); if (new_self == NULL) return -1; } @@ -1504,6 +1619,18 @@ swim_is_configured(const struct swim *swim) return swim->self != NULL; } +int +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size) +{ + if (payload_size > MAX_PAYLOAD_SIZE) { + diag_set(IllegalParams, "Payload should be <= %d", + MAX_PAYLOAD_SIZE); + return -1; + } + return swim_update_member_payload(swim, swim->self, payload, + payload_size, 1); +} + int swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) { @@ -1518,7 +1645,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) return -1; struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { - member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0); + member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0, + NULL, -1); return member == NULL ? -1 : 0; } diag_set(SwimError, "%s a member with such UUID already exists", @@ -1754,3 +1882,10 @@ swim_member_incarnation(const struct swim_member *member) { return member->incarnation; } + +const char * +swim_member_payload(const struct swim_member *member, uint16_t *size) +{ + *size = member->payload_size; + return member->payload; +} diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 09d933b83..6a219d131 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -104,6 +104,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, double swim_ack_timeout(const struct swim *swim); +/** Set payload to disseminate over the cluster. */ +int +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size); + /** * Stop listening and broadcasting messages, cleanup all internal * structures, free memory. @@ -220,6 +224,10 @@ swim_member_uuid(const struct swim_member *member); uint64_t swim_member_incarnation(const struct swim_member *member); +/** Member's payload. */ +const char * +swim_member_payload(const struct swim_member *member, uint16_t *size); + #if defined(__cplusplus) } #endif diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index d84550663..16034a25b 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -160,6 +160,7 @@ swim_member_def_create(struct swim_member_def *def) memset(def, 0, sizeof(*def)); def->addr.sin_family = AF_INET; def->status = MEMBER_ALIVE; + def->payload_size = -1; } /** @@ -179,6 +180,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, struct swim_member_def *def) { uint64_t tmp; + uint32_t len; switch (key) { case SWIM_MEMBER_STATUS: if (swim_decode_uint(pos, end, &tmp, prefix, @@ -210,6 +212,17 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member incarnation") != 0) return -1; break; + case SWIM_MEMBER_PAYLOAD: + if (swim_decode_bin(&def->payload, &len, pos, end, prefix, + "member payload") != 0) + return -1; + if (len > MAX_PAYLOAD_SIZE) { + diag_set(SwimError, "%s member payload size should be "\ + "<= %d", prefix, MAX_PAYLOAD_SIZE); + return -1; + } + def->payload_size = (int) len; + break; default: unreachable(); } @@ -330,10 +343,22 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, header->v_anti_entropy = mp_bswap_u16(batch_size); } +void +swim_member_payload_bin_create(struct swim_member_payload_bin *bin) +{ + bin->k_payload = SWIM_MEMBER_PAYLOAD; + bin->m_payload_size = 0xc5; +} + +void +swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, uint16_t size) +{ + bin->v_payload_size = mp_bswap_u16(size); +} + void swim_passport_bin_create(struct swim_passport_bin *passport) { - passport->m_header = 0x85; passport->k_status = SWIM_MEMBER_STATUS; passport->k_addr = SWIM_MEMBER_ADDRESS; passport->m_addr = 0xce; @@ -350,8 +375,10 @@ void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation) + enum swim_member_status status, uint64_t incarnation, + bool is_payload_needed) { + passport->m_header = 0x85 + is_payload_needed; passport->v_status = status; passport->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr)); passport->v_port = mp_bswap_u16(ntohs(addr->sin_port)); diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index ab4057185..607ac80dd 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -38,6 +38,11 @@ #include <stdbool.h> #include "swim_constants.h" +enum { + /** Reserve 272 bytes for headers. */ + MAX_PAYLOAD_SIZE = 1200, +}; + /** * SWIM binary protocol structures and helpers. Below is a picture * of a SWIM message template: @@ -67,7 +72,8 @@ * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint | + * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | * | ], | @@ -80,7 +86,8 @@ * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint | + * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | * | ], | @@ -103,6 +110,8 @@ struct swim_member_def { struct sockaddr_in addr; uint64_t incarnation; enum swim_member_status status; + const char *payload; + int payload_size; }; /** Initialize the definition with default values. */ @@ -242,6 +251,7 @@ enum swim_member_key { SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, SWIM_MEMBER_INCARNATION, + SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, }; @@ -305,6 +315,30 @@ struct PACKED swim_passport_bin { uint64_t v_incarnation; }; +/** + * SWIM member's payload header. Payload data should be encoded + * right after it. + */ +struct PACKED swim_member_payload_bin { + /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */ + uint8_t k_payload; + /** mp_encode_bin(16bit bin header) */ + uint8_t m_payload_size; + uint16_t v_payload_size; + /** Payload data ... */ +}; + +/** Initialize payload record. */ +void +swim_member_payload_bin_create(struct swim_member_payload_bin *bin); + +/** + * Fill a previously created payload record with an actual size. + */ +void +swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, + uint16_t size); + /** Initialize a member's binary passport. */ void swim_passport_bin_create(struct swim_passport_bin *passport); @@ -319,7 +353,8 @@ void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation); + enum swim_member_status status, uint64_t incarnation, + bool is_payload_needed); /** }}} Anti-entropy component */ diff --git a/test/unit/swim.c b/test/unit/swim.c index 48aea2f07..10d2b0bbd 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -36,6 +36,7 @@ #include "uri/uri.h" #include "swim/swim.h" #include "swim/swim_ev.h" +#include "swim/swim_proto.h" #include "swim_test_transport.h" #include "swim_test_ev.h" #include "swim_test_utils.h" @@ -642,10 +643,200 @@ swim_test_broadcast(void) swim_finish_test(); } +static void +swim_test_payload_basic(void) +{ + swim_start_test(11); + uint16_t size, cluster_size = 3; + struct swim_cluster *cluster = swim_cluster_new(cluster_size); + for (int i = 0; i < cluster_size; ++i) { + for (int j = i + 1; j < cluster_size; ++j) + swim_cluster_interconnect(cluster, i, j); + } + ok(swim_cluster_member_payload(cluster, 0, 0, &size) == NULL && + size == 0, "no payload by default"); + is(swim_cluster_member_set_payload(cluster, 0, NULL, 1300), -1, + "can not set too big payload"); + ok(swim_error_check_match("Payload should be <="), "diag says too big"); + + const char *s0_payload = "S1 payload"; + uint16_t s0_payload_size = strlen(s0_payload) + 1; + is(swim_cluster_member_set_payload(cluster, 0, s0_payload, + s0_payload_size), 0, + "payload is set"); + is(swim_cluster_member_incarnation(cluster, 0, 0), 1, + "incarnation is incremeted on each payload update"); + const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size); + ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0, + "payload is successfully obtained back"); + + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, + s0_payload_size, cluster_size), + 0, "payload is disseminated"); + s0_payload = "S1 second version of payload"; + s0_payload_size = strlen(s0_payload) + 1; + is(swim_cluster_member_set_payload(cluster, 0, s0_payload, + s0_payload_size), 0, + "payload is changed"); + is(swim_cluster_member_incarnation(cluster, 0, 0), 2, + "incarnation is incremeted on each payload update"); + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, + s0_payload_size, cluster_size), + 0, "second payload is disseminated"); + /* + * Test that new incarnations help to rewrite the old + * payload from anti-entropy. + */ + swim_cluster_set_drop(cluster, 0, 100); + s0_payload = "S1 third version of payload"; + s0_payload_size = strlen(s0_payload) + 1; + fail_if(swim_cluster_member_set_payload(cluster, 0, s0_payload, + s0_payload_size) != 0); + /* Wait at least one round until payload TTL gets 0. */ + swim_run_for(3); + swim_cluster_set_drop(cluster, 0, 0); + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, + s0_payload_size, cluster_size), + 0, "third payload is disseminated via anti-entropy"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + +static void +swim_test_payload_refutation(void) +{ + swim_start_test(11); + uint16_t size, cluster_size = 3; + struct swim_cluster *cluster = swim_cluster_new(cluster_size); + swim_cluster_set_ack_timeout(cluster, 1); + for (int i = 0; i < cluster_size; ++i) { + for (int j = i + 1; j < cluster_size; ++j) + swim_cluster_interconnect(cluster, i, j); + } + const char *s0_old_payload = "s0 payload"; + uint16_t s0_old_payload_size = strlen(s0_old_payload) + 1; + fail_if(swim_cluster_member_set_payload(cluster, 0, s0_old_payload, + s0_old_payload_size) != 0); + fail_if(swim_cluster_wait_payload_everywhere(cluster, 0, s0_old_payload, + s0_old_payload_size, + 3) != 0); + /* + * The test checks the following case. Assume there are 3 + * nodes: S1, S2, S3. They all know each other. S1 sets + * new payload, S2 and S3 knows that. They all see that S1 + * has incarnation 1 and payload P1. + * + * Now S1 changes payload to P2. Its incarnation becomes + * 2. During next entire round its round messages are + * lost, however ACKs work ok. + */ + const char *s0_new_payload = "s0 second payload"; + uint16_t s0_new_payload_size = strlen(s0_new_payload); + fail_if(swim_cluster_member_set_payload(cluster, 0, s0_new_payload, + s0_new_payload_size) != 0); + int components[2] = {SWIM_DISSEMINATION, SWIM_ANTI_ENTROPY}; + swim_cluster_drop_components(cluster, 0, components, 2); + swim_run_for(3); + swim_cluster_drop_components(cluster, 0, NULL, 0); + + is(swim_cluster_member_incarnation(cluster, 1, 0), 2, + "S2 sees new incarnation of S1"); + is(swim_cluster_member_incarnation(cluster, 2, 0), 2, + "S3 does the same"); + + const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "but S2 does not known the new payload"); + + tmp = swim_cluster_member_payload(cluster, 2, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "as well as S3"); + + /* Restore normal ACK timeout. */ + swim_cluster_set_ack_timeout(cluster, 30); + + /* + * Now S1's payload TTL is 0, but via ACKs S1 sent its new + * incarnation to S2 and S3. Despite that they should + * apply new S1's payload via anti-entropy. Next lines + * test that: + * + * 1) S2 can apply new S1's payload from S1's + * anti-entropy; + * + * 2) S2 will not receive the old S1's payload from S3. + * S3 knows, that its payload is outdated, and should + * not send it; + * + * 2) S3 can apply new S1's payload from S2's + * anti-entropy. Note, that here S3 applies the payload + * not directly from the originator. It is the most + * complex case. + * + * Next lines test the case (1). + */ + + /* S3 does not participate in the test (1). */ + swim_cluster_set_drop(cluster, 2, 100); + swim_run_for(3); + + tmp = swim_cluster_member_payload(cluster, 1, 0, &size); + ok(size == s0_new_payload_size && + memcmp(tmp, s0_new_payload, size) == 0, + "S2 learned S1's payload via anti-entropy"); + is(swim_cluster_member_incarnation(cluster, 1, 0), 2, + "incarnation still is the same"); + + tmp = swim_cluster_member_payload(cluster, 2, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "S3 was blocked and does not know anything"); + is(swim_cluster_member_incarnation(cluster, 2, 0), 2, + "incarnation still is the same"); + + /* S1 will not participate in the tests further. */ + swim_cluster_set_drop(cluster, 0, 100); + + /* + * Now check the case (2) - S3 will not send outdated + * version of S1's payload. To maintain the experimental + * integrity S1 and S2 are silent. Only S3 sends packets. + */ + swim_cluster_set_drop(cluster, 2, 0); + swim_cluster_set_drop_out(cluster, 1, 100); + swim_run_for(3); + + tmp = swim_cluster_member_payload(cluster, 1, 0, &size); + ok(size == s0_new_payload_size && + memcmp(tmp, s0_new_payload, size) == 0, + "S2 keeps the same new S1's payload, S3 did not rewrite it"); + + tmp = swim_cluster_member_payload(cluster, 2, 0, &size); + ok(size == s0_old_payload_size && + memcmp(tmp, s0_old_payload, size) == 0, + "S3 still does not know anything"); + + /* + * Now check the case (3) - S3 accepts new S1's payload + * from S2. Even knowing the same S1's incarnation. + */ + swim_cluster_set_drop(cluster, 1, 0); + swim_cluster_set_drop_out(cluster, 2, 100); + is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_new_payload, + s0_new_payload_size, 3), 0, + "S3 learns S1's payload from S2") + + swim_cluster_delete(cluster); + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(15); + swim_start_test(17); (void) ap; swim_test_ev_init(); @@ -666,6 +857,8 @@ main_f(va_list ap) swim_test_quit(); swim_test_uri_update(); swim_test_broadcast(); + swim_test_payload_basic(); + swim_test_payload_refutation(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index d315f181f..8b0ab54aa 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..15 +1..17 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -147,4 +147,34 @@ ok 14 - subtests ok 6 - fullmesh is reached, and no one link was added explicitly ok 15 - subtests *** swim_test_broadcast: done *** + *** swim_test_payload_basic *** + 1..11 + ok 1 - no payload by default + ok 2 - can not set too big payload + ok 3 - diag says too big + ok 4 - payload is set + ok 5 - incarnation is incremeted on each payload update + ok 6 - payload is successfully obtained back + ok 7 - payload is disseminated + ok 8 - payload is changed + ok 9 - incarnation is incremeted on each payload update + ok 10 - second payload is disseminated + ok 11 - third payload is disseminated via anti-entropy +ok 16 - subtests + *** swim_test_payload_basic: done *** + *** swim_test_payload_refutation *** + 1..11 + ok 1 - S2 sees new incarnation of S1 + ok 2 - S3 does the same + ok 3 - but S2 does not known the new payload + ok 4 - as well as S3 + ok 5 - S2 learned S1's payload via anti-entropy + ok 6 - incarnation still is the same + ok 7 - S3 was blocked and does not know anything + ok 8 - incarnation still is the same + ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it + ok 10 - S3 still does not know anything + ok 11 - S3 learns S1's payload from S2 +ok 17 - subtests + *** swim_test_payload_refutation: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index fd528d166..45570cce5 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -191,6 +191,27 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, return swim_member_incarnation(m); } +const char * +swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, + int member_id, uint16_t *size) +{ + const struct swim_member *m = + swim_cluster_member_view(cluster, node_id, member_id); + if (m == NULL) { + *size = 0; + return NULL; + } + return swim_member_payload(m, size); +} + +int +swim_cluster_member_set_payload(struct swim_cluster *cluster, int i, + const char *payload, uint16_t size) +{ + struct swim *s = swim_cluster_node(cluster, i); + return swim_set_payload(s, payload, size); +} + struct swim * swim_cluster_node(struct swim_cluster *cluster, int i) { @@ -506,6 +527,13 @@ struct swim_member_template { */ bool need_check_incarnation; uint64_t incarnation; + /** + * True, if the payload should be checked to be equal to + * @a payload of size @a payload_size. + */ + bool need_check_payload; + const char *payload; + uint16_t payload_size; }; /** Build member template. No checks are set. */ @@ -542,6 +570,19 @@ swim_member_template_set_incarnation(struct swim_member_template *t, t->incarnation = incarnation; } +/** + * Set that the member template should be used to check member + * status. + */ +static inline void +swim_member_template_set_payload(struct swim_member_template *t, + const char *payload, uint16_t payload_size) +{ + t->need_check_payload = true; + t->payload = payload; + t->payload_size = payload_size; +} + /** Callback to check that a member matches a template. */ static bool swim_loop_check_member(struct swim_cluster *cluster, void *data) @@ -551,17 +592,26 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) swim_cluster_member_view(cluster, t->node_id, t->member_id); enum swim_member_status status; uint64_t incarnation; + const char *payload; + uint16_t payload_size; if (m != NULL) { status = swim_member_status(m); incarnation = swim_member_incarnation(m); + payload = swim_member_payload(m, &payload_size); } else { status = swim_member_status_MAX; incarnation = 0; + payload = NULL; + payload_size = 0; } if (t->need_check_status && status != t->status) return false; if (t->need_check_incarnation && incarnation != t->incarnation) return false; + if (t->need_check_payload && + (payload_size != t->payload_size || + memcmp(payload, t->payload, payload_size) != 0)) + return false; return true; } @@ -644,6 +694,18 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, swim_loop_check_member_everywhere, &t); } +int +swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, + int member_id, const char *payload, + uint16_t payload_size, double timeout) +{ + struct swim_member_template t; + swim_member_template_create(&t, -1, member_id); + swim_member_template_set_payload(&t, payload, payload_size); + return swim_wait_timeout(timeout, cluster, + swim_loop_check_member_everywhere, &t); +} + bool swim_error_check_match(const char *msg) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 6ea136e36..100a67e0c 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -141,6 +141,14 @@ uint64_t swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, int member_id); +const char * +swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, + int member_id, uint16_t *size); + +int +swim_cluster_member_set_payload(struct swim_cluster *cluster, int i, + const char *payload, uint16_t size); + /** * Check if in the cluster every instance knowns the about other * instances. @@ -192,6 +200,16 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, int member_id, uint64_t incarnation, double timeout); +/** + * Wait until a member with id @a member_id is seen with + * @a payload of size @a payload_size in the membership table of + * every instance in @a cluster. At most @a timeout seconds. + */ +int +swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster, + int member_id, const char *payload, + uint16_t payload_size, double timeout); + /** Process SWIM events for @a duration fake seconds. */ void swim_run_for(double duration); -- 2.17.2 (Apple Git-113)
next prev parent reply other threads:[~2019-04-11 22:22 UTC|newest] Thread overview: 27+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-04-11 22:22 [tarantool-patches] [PATCH 0/6] swim payload Vladislav Shpilevoy 2019-04-11 22:22 ` [tarantool-patches] [PATCH 1/6] swim: factor out MP_BIN decoding from swim_decode_uuid Vladislav Shpilevoy 2019-04-11 23:09 ` [tarantool-patches] " Konstantin Osipov 2019-04-12 19:23 ` Vladislav Shpilevoy 2019-04-11 22:22 ` [tarantool-patches] [PATCH 2/6] swim: replace event_bin and member_bin with the passport Vladislav Shpilevoy 2019-04-11 23:10 ` [tarantool-patches] " Konstantin Osipov 2019-04-12 19:23 ` Vladislav Shpilevoy 2019-04-11 22:22 ` [tarantool-patches] [PATCH 3/6] swim: factor out 'update' part of swim_member_upsert() Vladislav Shpilevoy 2019-04-11 23:11 ` [tarantool-patches] " Konstantin Osipov 2019-04-12 19:23 ` Vladislav Shpilevoy 2019-04-11 22:22 ` [tarantool-patches] [PATCH 4/6] test: generalize SWIM fake descriptor filters Vladislav Shpilevoy 2019-04-11 23:11 ` [tarantool-patches] " Konstantin Osipov 2019-04-12 19:23 ` Vladislav Shpilevoy 2019-04-11 22:22 ` [tarantool-patches] [PATCH 5/6] test: introduce new SWIM packet filter by component names Vladislav Shpilevoy 2019-04-11 23:11 ` [tarantool-patches] " Konstantin Osipov 2019-04-12 19:23 ` Vladislav Shpilevoy 2019-04-11 22:22 ` Vladislav Shpilevoy [this message] 2019-04-18 15:12 ` [tarantool-patches] Re: [PATCH 6/6] swim: introduce payload Konstantin Osipov 2019-04-18 17:43 ` Vladislav Shpilevoy 2019-04-18 18:03 ` Konstantin Osipov 2019-04-18 20:40 ` Vladislav Shpilevoy 2019-04-18 17:43 ` [tarantool-patches] [PATCH 5.5/6] swim: rename TTL to TTD Vladislav Shpilevoy 2019-04-18 17:48 ` [tarantool-patches] " Konstantin Osipov 2019-04-18 20:40 ` Vladislav Shpilevoy 2019-04-18 18:16 ` [tarantool-patches] [PATCH 7/6] swim: drop incarnation_inc parameter from update() routines Vladislav Shpilevoy 2019-04-18 18:20 ` [tarantool-patches] " Konstantin Osipov 2019-04-18 20:40 ` Vladislav Shpilevoy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=04b17677a6e24d46587f718733000f426e1176e7.1555021137.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=kostja@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH 6/6] swim: introduce payload' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox