From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id BA6B530EA6 for ; Sat, 22 Jun 2019 17:17:51 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id 1I4hU7qTob-A for ; Sat, 22 Jun 2019 17:17:51 -0400 (EDT) Received: from smtp21.mail.ru (smtp21.mail.ru [94.100.179.250]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 305F630E96 for ; Sat, 22 Jun 2019 17:17:51 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH v2 1/2] swim: make incarnation struct Date: Sat, 22 Jun 2019 23:18:20 +0200 Message-Id: <51c439eb0ff8bb7ff46306fde45b4d8f166f3fc5.1561238125.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org Traditional SWIM describes member version as incarnation - volatile monotonically growing number to refute false gossips. But it is not enough in the real world because of necessity to detect restarts and refute information from previous lifes of an instance. Incarnation is going to be a two-part value with persistent upper part and volatile lower part. This patch does preparations making incarnation struct instead of a number. Volatile part is called 'version. Part of #4280 --- src/lib/swim/swim.c | 150 ++++++++++++++++++++++------------ src/lib/swim/swim.h | 7 +- src/lib/swim/swim_constants.h | 32 ++++++++ src/lib/swim/swim_proto.c | 78 ++++++++++++------ src/lib/swim/swim_proto.h | 65 +++++++++------ src/lua/swim.lua | 28 ++++++- test/swim/swim.result | 59 ++++++------- test/unit/swim.c | 48 +++++------ test/unit/swim.result | 19 +++-- test/unit/swim_test_utils.c | 26 +++--- test/unit/swim_test_utils.h | 6 +- 11 files changed, 340 insertions(+), 178 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 2b37d41e0..2c3cfa9bc 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -213,6 +213,31 @@ swim_uuid_hash(const struct tt_uuid *uuid) return mh_strn_hash((const char *) uuid, UUID_LEN); } +/** + * Compare two incarnation values and collect their diff into + * @a diff out parameter. The difference is used to fire triggers. + */ +static inline int +swim_incarnation_diff(const struct swim_incarnation *l, + const struct swim_incarnation *r, + enum swim_ev_mask *diff) +{ + if (l->version == r->version) { + *diff = 0; + return 0; + } + *diff = SWIM_EV_NEW_VERSION; + return l->version < r->version ? -1 : 1; +} + +int +swim_incarnation_cmp(const struct swim_incarnation *l, + const struct swim_incarnation *r) +{ + enum swim_ev_mask unused; + return swim_incarnation_diff(l, r, &unused); +} + /** * A cluster member description. This structure describes the * last known state of an instance. This state is updated @@ -353,11 +378,11 @@ struct swim_member { * Failure detection component */ /** - * A monotonically growing number to refute old member's + * A monotonically growing value to refute old member's * state, characterized by a triplet * {incarnation, status, address}. */ - uint64_t incarnation; + struct swim_incarnation incarnation; /** * How many recent pings did not receive an ack while the * member was in the current status. When this number @@ -631,7 +656,7 @@ swim_has_pending_events(struct swim *swim) static inline void swim_update_member_inc_status(struct swim *swim, struct swim_member *member, enum swim_member_status new_status, - uint64_t incarnation) + const struct swim_incarnation *incarnation) { /* * Source of truth about self is this instance and it is @@ -639,16 +664,17 @@ swim_update_member_inc_status(struct swim *swim, struct swim_member *member, * separately. */ assert(member != swim->self); - if (member->incarnation < incarnation) { - enum swim_ev_mask events = SWIM_EV_NEW_INCARNATION; + enum swim_ev_mask events; + int cmp = swim_incarnation_diff(&member->incarnation, incarnation, + &events); + if (cmp < 0) { if (new_status != member->status) { events |= SWIM_EV_NEW_STATUS; member->status = new_status; } - member->incarnation = incarnation; + member->incarnation = *incarnation; swim_on_member_update(swim, member, events); - } else if (member->incarnation == incarnation && - member->status < new_status) { + } else if (cmp == 0 && member->status < new_status) { member->status = new_status; swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS); } @@ -760,7 +786,8 @@ swim_member_delete(struct swim_member *member) /** Create a new member. It is not registered anywhere here. */ static struct swim_member * swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation) + enum swim_member_status status, + const struct swim_incarnation *incarnation) { struct swim_member *member = (struct swim_member *) calloc(1, sizeof(*member)); @@ -776,7 +803,7 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid, rlist_create(&member->in_round_queue); /* Failure detection component. */ - member->incarnation = incarnation; + member->incarnation = *incarnation; heap_node_create(&member->in_wait_ack_heap); swim_task_create(&member->ack_task, NULL, NULL, "ack"); swim_task_create(&member->ping_task, swim_ping_task_complete, NULL, @@ -835,7 +862,8 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid) static struct swim_member * swim_new_member(struct swim *swim, const struct sockaddr_in *addr, const struct tt_uuid *uuid, enum swim_member_status status, - uint64_t incarnation, const char *payload, int payload_size) + const struct swim_incarnation *incarnation, const char *payload, + int payload_size) { int new_bsize = sizeof(swim->shuffled[0]) * (mh_size(swim->members) + 1); @@ -962,7 +990,7 @@ swim_encode_member(struct swim_packet *packet, struct swim_member *m, if (pos == NULL) return -1; swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status, - m->incarnation, encode_payload); + &m->incarnation, encode_payload); memcpy(pos, passport, sizeof(*passport)); if (encode_payload) { pos += sizeof(*passport); @@ -1044,7 +1072,7 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet, if (pos == NULL) return 0; swim_fd_header_bin_create(&fd_header_bin, type, - swim->self->incarnation); + &swim->self->incarnation); memcpy(pos, &fd_header_bin, size); return 1; } @@ -1412,14 +1440,15 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def, struct swim_member *member) { assert(member != swim->self); - assert(def->incarnation >= member->incarnation); + int cmp = swim_incarnation_cmp(&def->incarnation, &member->incarnation); + assert(cmp >= 0); /* * Payload update rules are simple: it can be updated * either if the new payload has a bigger incarnation, or * the same incarnation, but local payload is outdated. */ bool update_payload = false; - if (def->incarnation > member->incarnation) { + if (cmp > 0) { if (! swim_inaddr_eq(&def->addr, &member->addr)) swim_update_member_addr(swim, member, &def->addr); if (def->payload_size >= 0) @@ -1436,7 +1465,7 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def, diag_log(); } swim_update_member_inc_status(swim, member, def->status, - def->incarnation); + &def->incarnation); } /** @@ -1475,14 +1504,17 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, goto skip; } *result = swim_new_member(swim, &def->addr, &def->uuid, - def->status, def->incarnation, + def->status, &def->incarnation, def->payload, def->payload_size); return *result != NULL ? 0 : -1; } *result = member; struct swim_member *self = swim->self; + enum swim_ev_mask diff; + int cmp = swim_incarnation_diff(&def->incarnation, &member->incarnation, + &diff); if (member != self) { - if (def->incarnation < member->incarnation) + if (cmp < 0) goto skip; swim_update_member(swim, def, member); return 0; @@ -1491,22 +1523,21 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, * It is possible that other instances know a bigger * incarnation of this instance - such thing happens when * the instance restarts and loses its local incarnation - * number. It will be restored by receiving dissemination + * value. It will be restored by receiving dissemination * and anti-entropy messages about self. */ - if (self->incarnation < def->incarnation) { + if (cmp > 0) { self->incarnation = def->incarnation; - swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); + swim_on_member_update(swim, self, diff); } - if (def->status != MEMBER_ALIVE && - def->incarnation == self->incarnation) { + if (def->status != MEMBER_ALIVE && cmp == 0) { /* * In the cluster a gossip exists that this * instance is not alive. Refute this information * with a bigger incarnation. */ - self->incarnation++; - swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); + self->incarnation.version++; + swim_on_member_update(swim, self, SWIM_EV_NEW_VERSION); } return 0; skip: @@ -1595,7 +1626,7 @@ swim_process_failure_detection(struct swim *swim, const char **pos, * case - this message was received from the member * directly, and evidently it is alive. */ - if (def.incarnation == member->incarnation && + if (swim_incarnation_cmp(&def.incarnation, &member->incarnation) == 0 && member->status != MEMBER_ALIVE) { member->status = MEMBER_ALIVE; swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS); @@ -1646,19 +1677,28 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, uint32_t size; if (swim_decode_map(pos, end, &size, prefix, "root") != 0) return -1; - if (size != 1) { - diag_set(SwimError, "%s map of size 1 is expected", prefix); + if (size != SWIM_INCARNATION_BIN_SIZE) { + diag_set(SwimError, "%s map of size %d is expected", + prefix, SWIM_INCARNATION_BIN_SIZE); return -1; } - uint64_t tmp; - if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0) - return -1; - if (tmp != SWIM_QUIT_INCARNATION) { - diag_set(SwimError, "%s a key should be incarnation", prefix); - return -1; + struct swim_incarnation incarnation; + swim_incarnation_create(&incarnation, 0); + for (uint32_t i = 0; i < size; ++i) { + uint64_t tmp; + if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0) + return -1; + switch (tmp) { + case SWIM_QUIT_VERSION: + if (swim_decode_uint(pos, end, &incarnation.version, + prefix, "version") != 0) + return -1; + break; + default: + diag_set(SwimError, "%s unknown key", prefix); + return -1; + } } - if (swim_decode_uint(pos, end, &tmp, prefix, "incarnation") != 0) - return -1; struct swim_member *m = swim_find_member(swim, uuid); if (m == NULL) return 0; @@ -1666,11 +1706,16 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, * Check for 'self' in case this instance took UUID of a * quited instance. */ + enum swim_ev_mask diff; if (m != swim->self) { - swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp); - } else if (tmp >= m->incarnation) { - m->incarnation = tmp + 1; - swim_on_member_update(swim, m, SWIM_EV_NEW_INCARNATION); + swim_update_member_inc_status(swim, m, MEMBER_LEFT, + &incarnation); + } else if (swim_incarnation_diff(&incarnation, &m->incarnation, + &diff) >= 0) { + m->incarnation = incarnation; + ++m->incarnation.version; + diff |= SWIM_EV_NEW_VERSION; + swim_on_member_update(swim, m, diff); } return 0; } @@ -1895,8 +1940,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, "a first config", prefix); return -1; } + struct swim_incarnation incarnation; + swim_incarnation_create(&incarnation, 0); swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, - 0, NULL, 0); + &incarnation, NULL, 0); if (swim->self == NULL) return -1; } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { @@ -1908,7 +1955,9 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } new_self = swim_new_member(swim, &swim->self->addr, uuid, - MEMBER_ALIVE, 0, swim->self->payload, + MEMBER_ALIVE, + &swim->self->incarnation, + swim->self->payload, swim->self->payload_size); if (new_self == NULL) return -1; @@ -1959,9 +2008,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, swim->self = new_self; } if (! swim_inaddr_eq(&addr, &swim->self->addr)) { - swim->self->incarnation++; - swim_on_member_update(swim, swim->self, - SWIM_EV_NEW_INCARNATION); + swim->self->incarnation.version++; + swim_on_member_update(swim, swim->self, SWIM_EV_NEW_VERSION); swim_update_member_addr(swim, swim->self, &addr); } if (gc_mode != SWIM_GC_DEFAULT) @@ -1994,8 +2042,8 @@ swim_set_payload(struct swim *swim, const char *payload, int payload_size) struct swim_member *self = swim->self; if (swim_update_member_payload(swim, self, payload, payload_size) != 0) return -1; - self->incarnation++; - swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION); + self->incarnation.version++; + swim_on_member_update(swim, self, SWIM_EV_NEW_VERSION); return 0; } @@ -2013,7 +2061,9 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) return -1; struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { - member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0, + struct swim_incarnation inc; + swim_incarnation_create(&inc, 0); + member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &inc, NULL, -1); return member == NULL ? -1 : 0; } @@ -2173,7 +2223,7 @@ swim_encode_quit(struct swim *swim, struct swim_packet *packet) char *pos = swim_packet_alloc(packet, sizeof(bin)); if (pos == NULL) return 0; - swim_quit_bin_create(&bin, swim->self->incarnation); + swim_quit_bin_create(&bin, &swim->self->incarnation); memcpy(pos, &bin, sizeof(bin)); return 1; } @@ -2265,7 +2315,7 @@ swim_member_uuid(const struct swim_member *member) return &member->uuid; } -uint64_t +struct swim_incarnation swim_member_incarnation(const struct swim_member *member) { return member->incarnation; diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index a42ace7c6..b8e44515e 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -234,7 +234,7 @@ const struct tt_uuid * swim_member_uuid(const struct swim_member *member); /** Member's incarnation. */ -uint64_t +struct swim_incarnation swim_member_incarnation(const struct swim_member *member); /** Member's payload. */ @@ -279,6 +279,11 @@ enum swim_ev_mask { SWIM_EV_NEW = 0b00000001, SWIM_EV_NEW_STATUS = 0b00000010, SWIM_EV_NEW_URI = 0b00000100, + SWIM_EV_NEW_VERSION = 0b00001000, + /* + * Shortcut to check for update of any part of + * incarnation. + */ SWIM_EV_NEW_INCARNATION = 0b00001000, SWIM_EV_NEW_PAYLOAD = 0b00010000, /* Shortcut to check for any update. */ diff --git a/src/lib/swim/swim_constants.h b/src/lib/swim/swim_constants.h index 4f8404ce3..f105040c6 100644 --- a/src/lib/swim/swim_constants.h +++ b/src/lib/swim/swim_constants.h @@ -55,4 +55,36 @@ enum swim_member_status { extern const char *swim_member_status_strs[]; +/** + * A monotonically growing value to refute false gossips and + * update member attributes on remote instances. Any piece of + * information is labeled with an incarnation value. Information + * labeled with a newer (bigger) incarnation is considered more + * actual. + */ +struct swim_incarnation { + /** + * Version is a volatile part of incarnation. It is + * managed by SWIM fully internally. + */ + uint64_t version; +}; + +/** Create a new incarnation value. */ +static inline void +swim_incarnation_create(struct swim_incarnation *i, uint64_t version) +{ + i->version = version; +} + +/** + * Compare two incarnation values. + * @retval =0 l == r. + * @retval <0 l < r. + * @retval >0 l > r. + */ +int +swim_incarnation_cmp(const struct swim_incarnation *l, + const struct swim_incarnation *r); + #endif /* TARANTOOL_SWIM_CONSTANTS_H_INCLUDED */ diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index 938631e49..31c931b98 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -155,6 +155,29 @@ swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end, return 0; } +/** + * Create incarnation binary MessagePack structure. It expects + * parent structure specific keys for incarnation parts. + */ +static inline void +swim_incarnation_bin_create(struct swim_incarnation_bin *bin, + uint8_t version_key) +{ + bin->k_version = version_key; + bin->m_version = 0xcf; +} + +/** + * Fill a created incarnation binary structure with an incarnation + * value. + */ +static inline void +swim_incarnation_bin_fill(struct swim_incarnation_bin *bin, + const struct swim_incarnation *incarnation) +{ + bin->v_version = mp_bswap_u64(incarnation->version); +} + /** * Check if @a addr is not empty, i.e. not nullified. Set an error * in the diagnostics area in case of emptiness. @@ -247,9 +270,9 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member uuid") != 0) return -1; break; - case SWIM_MEMBER_INCARNATION: - if (swim_decode_uint(pos, end, &def->incarnation, prefix, - "member incarnation") != 0) + case SWIM_MEMBER_VERSION: + if (swim_decode_uint(pos, end, &def->incarnation.version, + prefix, "member version") != 0) return -1; break; case SWIM_MEMBER_PAYLOAD: @@ -308,17 +331,19 @@ swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, void swim_fd_header_bin_create(struct swim_fd_header_bin *header, - enum swim_fd_msg_type type, uint64_t incarnation) + enum swim_fd_msg_type type, + const struct swim_incarnation *incarnation) { header->k_header = SWIM_FAILURE_DETECTION; - header->m_header = 0x82; + int map_size = 1 + SWIM_INCARNATION_BIN_SIZE; + assert(mp_sizeof_map(map_size) == 1); + header->m_header = 0x80 | map_size; header->k_type = SWIM_FD_MSG_TYPE; header->v_type = type; - header->k_incarnation = SWIM_FD_INCARNATION; - header->m_incarnation = 0xcf; - header->v_incarnation = mp_bswap_u64(incarnation); + swim_incarnation_bin_create(&header->incarnation, SWIM_FD_VERSION); + swim_incarnation_bin_fill(&header->incarnation, incarnation); } int @@ -331,9 +356,10 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, return -1; memset(def, 0, sizeof(*def)); def->type = swim_fd_msg_type_MAX; - if (size != 2) { - diag_set(SwimError, "%s root map should have two keys - "\ - "message type and incarnation", prefix); + if (size != 1 + SWIM_INCARNATION_BIN_SIZE) { + diag_set(SwimError, "%s root map should have %d keys - "\ + "message type and version", prefix, + 1 + SWIM_INCARNATION_BIN_SIZE); return -1; } for (int i = 0; i < (int) size; ++i) { @@ -352,9 +378,10 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, } def->type = key; break; - case SWIM_FD_INCARNATION: - if (swim_decode_uint(pos, end, &def->incarnation, - prefix, "incarnation") != 0) + case SWIM_FD_VERSION: + if (swim_decode_uint(pos, end, + &def->incarnation.version, prefix, + "version") != 0) return -1; break; default: @@ -401,24 +428,26 @@ swim_passport_bin_create(struct swim_passport_bin *passport) passport->k_uuid = SWIM_MEMBER_UUID; passport->m_uuid = 0xc4; passport->m_uuid_len = UUID_LEN; - passport->k_incarnation = SWIM_MEMBER_INCARNATION; - passport->m_incarnation = 0xcf; + swim_incarnation_bin_create(&passport->incarnation, + SWIM_MEMBER_VERSION); } void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation, + enum swim_member_status status, + const struct swim_incarnation *incarnation, bool encode_payload) { - int map_size = 3 + SWIM_INADDR_BIN_SIZE + encode_payload; + int map_size = 2 + SWIM_INCARNATION_BIN_SIZE + SWIM_INADDR_BIN_SIZE + + encode_payload; assert(mp_sizeof_map(map_size) == 1); passport->m_header = 0x80 | map_size; passport->v_status = status; swim_inaddr_bin_fill(&passport->addr, addr); memcpy(passport->v_uuid, uuid, UUID_LEN); - passport->v_incarnation = mp_bswap_u64(incarnation); + swim_incarnation_bin_fill(&passport->incarnation, incarnation); } void @@ -556,13 +585,14 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos, } void -swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation) +swim_quit_bin_create(struct swim_quit_bin *header, + const struct swim_incarnation *incarnation) { header->k_quit = SWIM_QUIT; - header->m_quit = 0x81; - header->k_incarnation = SWIM_QUIT_INCARNATION; - header->m_incarnation = 0xcf; - header->v_incarnation = mp_bswap_u64(incarnation); + assert(mp_sizeof_map(SWIM_INCARNATION_BIN_SIZE) == 1); + header->m_quit = 0x80 | SWIM_INCARNATION_BIN_SIZE; + swim_incarnation_bin_create(&header->incarnation, SWIM_QUIT_VERSION); + swim_incarnation_bin_fill(&header->incarnation, incarnation); } void diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index 482d79fb1..fee6e078d 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -72,7 +72,7 @@ enum { * | | * | SWIM_FAILURE_DETECTION: { | * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, | - * | SWIM_FD_INCARNATION: uint | + * | SWIM_FD_VERSION: uint | * | }, | * | | * | OR/AND | @@ -83,7 +83,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_VERSION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | @@ -97,7 +97,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | - * | SWIM_MEMBER_INCARNATION: uint, | + * | SWIM_MEMBER_VERSION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | * | ... | @@ -106,12 +106,29 @@ enum { * | OR/AND | * | | * | SWIM_QUIT: { | - * | SWIM_QUIT_INCARNATION: uint | + * | SWIM_QUIT_VERSION: uint | * | } | * | } | * +-------------------------------------------------------------+ */ +enum { + /** + * Number of keys in the incarnation binary structure. + * Structures storing an incarnation should use this size + * so as to correctly encode MessagePack map header. + */ + SWIM_INCARNATION_BIN_SIZE = 1, +}; + +struct PACKED swim_incarnation_bin { + /** mp_encode_uint(version key) */ + uint8_t k_version; + /** mp_encode_uint(64bit version) */ + uint8_t m_version; + uint64_t v_version; +}; + /** * SWIM member attributes from anti-entropy and dissemination * messages. @@ -119,7 +136,7 @@ enum { struct swim_member_def { struct tt_uuid uuid; struct sockaddr_in addr; - uint64_t incarnation; + struct swim_incarnation incarnation; enum swim_member_status status; const char *payload; int payload_size; @@ -188,7 +205,7 @@ enum swim_fd_key { * it was considered dead, but ping/ack with greater * incarnation was received from it. */ - SWIM_FD_INCARNATION, + SWIM_FD_VERSION, }; /** Failure detection message type. */ @@ -212,24 +229,22 @@ struct PACKED swim_fd_header_bin { /** mp_encode_uint(enum swim_fd_msg_type) */ uint8_t v_type; - /** mp_encode_uint(SWIM_FD_INCARNATION) */ - uint8_t k_incarnation; - /** mp_encode_uint(64bit incarnation) */ - uint8_t m_incarnation; - uint64_t v_incarnation; + /** SWIM_FD_VERSION */ + struct swim_incarnation_bin incarnation; }; /** Initialize failure detection section. */ void swim_fd_header_bin_create(struct swim_fd_header_bin *header, - enum swim_fd_msg_type type, uint64_t incarnation); + enum swim_fd_msg_type type, + const struct swim_incarnation *incarnation); /** A decoded failure detection message. */ struct swim_failure_detection_def { /** Type of the message. */ enum swim_fd_msg_type type; /** Incarnation of the sender. */ - uint64_t incarnation; + struct swim_incarnation incarnation; }; /** @@ -290,7 +305,7 @@ enum swim_member_key { SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, - SWIM_MEMBER_INCARNATION, + SWIM_MEMBER_VERSION, SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, }; @@ -339,11 +354,8 @@ struct PACKED swim_passport_bin { uint8_t m_uuid_len; uint8_t v_uuid[UUID_LEN]; - /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ - uint8_t k_incarnation; - /** mp_encode_uint(64bit incarnation) */ - uint8_t m_incarnation; - uint64_t v_incarnation; + /** SWIM_MEMBER_VERSION */ + struct swim_incarnation_bin incarnation; }; /** @@ -384,7 +396,8 @@ void swim_passport_bin_fill(struct swim_passport_bin *passport, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation, + enum swim_member_status status, + const struct swim_incarnation *incarnation, bool encode_payload); /** }}} Anti-entropy component */ @@ -562,7 +575,7 @@ swim_route_bin_create(struct swim_route_bin *route, enum swim_quit_key { /** Incarnation to ignore old quit messages. */ - SWIM_QUIT_INCARNATION = 0, + SWIM_QUIT_VERSION = 0, }; /** Quit section. Describes voluntary quit from the cluster. */ @@ -572,16 +585,14 @@ struct PACKED swim_quit_bin { /** mp_encode_map(1) */ uint8_t m_quit; - /** mp_encode_uint(SWIM_QUIT_INCARNATION) */ - uint8_t k_incarnation; - /** mp_encode_uint(64bit incarnation) */ - uint8_t m_incarnation; - uint64_t v_incarnation; + /** SWIM_QUIT_VERSION */ + struct swim_incarnation_bin incarnation; }; /** Initialize quit section. */ void -swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation); +swim_quit_bin_create(struct swim_quit_bin *header, + const struct swim_incarnation *incarnation); /** * Helpers to decode some values - map, array, etc with diff --git a/src/lua/swim.lua b/src/lua/swim.lua index 4f91ac233..0686590cb 100644 --- a/src/lua/swim.lua +++ b/src/lua/swim.lua @@ -28,12 +28,17 @@ ffi.cdef[[ SWIM_EV_NEW = 0b00000001, SWIM_EV_NEW_STATUS = 0b00000010, SWIM_EV_NEW_URI = 0b00000100, + SWIM_EV_NEW_VERSION = 0b00001000, SWIM_EV_NEW_INCARNATION = 0b00001000, SWIM_EV_NEW_PAYLOAD = 0b00010000, SWIM_EV_UPDATE = 0b00011110, SWIM_EV_DROP = 0b00100000, }; + struct swim_incarnation { + uint64_t version; + }; + bool swim_is_configured(const struct swim *swim); @@ -92,7 +97,7 @@ ffi.cdef[[ const struct tt_uuid * swim_member_uuid(const struct swim_member *member); - uint64_t + struct swim_incarnation swim_member_incarnation(const struct swim_member *member); const char * @@ -124,6 +129,22 @@ local swim_member_status_strs = { [capi.MEMBER_LEFT] = 'left' } +local swim_incarnation_mt = { + __eq = function(l, r) + return l.version == r.version + end, + __lt = function(l, r) + return l.version < r.version + end, + __le = function(l, r) + return l.version <= r.version + end, + __tostring = function(i) + return string.format('cdata {version = %s}', i.version) + end, +} +ffi.metatype(ffi.typeof('struct swim_incarnation'), swim_incarnation_mt) + -- -- Check if @a value is something that can be passed as a -- URI parameter. Note, it does not validate URI, because it is @@ -370,7 +391,7 @@ local function swim_member_payload(m) -- payload. For example, via ACK messages. local key1 = capi.swim_member_incarnation(ptr) local key2 = capi.swim_member_is_payload_up_to_date(ptr) - if key1 == m.p_key1 and key2 == m.p_key2 then + if m.p_key1 and key1 == m.p_key1 and key2 == m.p_key2 then return m.p end local cdata, size = swim_member_payload_raw(ptr) @@ -726,6 +747,9 @@ local swim_member_event_index = { is_new_incarnation = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0 end, + is_new_version = function(self) + return bit.band(self[1], capi.SWIM_EV_NEW_VERSION) ~= 0 + end, is_new_payload = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0 end, diff --git a/test/swim/swim.result b/test/swim/swim.result index cceee2595..e3b89f809 100644 --- a/test/swim/swim.result +++ b/test/swim/swim.result @@ -307,7 +307,7 @@ old_self --- - uri: 127.0.0.1: status: left - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -391,7 +391,7 @@ s --- - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -413,7 +413,7 @@ s:uri() ... s:incarnation() --- -- 1 +- cdata {version = 1ULL} ... s:payload_cdata() --- @@ -490,7 +490,7 @@ s1:member_by_uuid(s:uuid()) --- - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -721,7 +721,7 @@ s1_view:payload() ... s1_view:incarnation() --- -- 1 +- cdata {version = 1ULL} ... s1:set_payload('payload') --- @@ -732,7 +732,7 @@ while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end ... s1_view:incarnation() --- -- 2 +- cdata {version = 2ULL} ... s1:set_payload('payload2') --- @@ -743,7 +743,7 @@ while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end ... s1_view:incarnation() --- -- 3 +- cdata {version = 3ULL} ... s1:delete() --- @@ -778,7 +778,7 @@ iterate() - - - 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -791,13 +791,13 @@ iterate() - - - 00000000-0000-1000-8000-000000000002 - uri: 127.0.0.1: status: alive - incarnation: 0 + incarnation: cdata {version = 0ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 - - 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -810,19 +810,19 @@ iterate() - - - 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 - - 00000000-0000-1000-8000-000000000003 - uri: 127.0.0.1: status: alive - incarnation: 0 + incarnation: cdata {version = 0ULL} uuid: 00000000-0000-1000-8000-000000000003 payload_size: 0 - - 00000000-0000-1000-8000-000000000002 - uri: 127.0.0.1: status: alive - incarnation: 0 + incarnation: cdata {version = 0ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -906,7 +906,7 @@ s1_view:payload() ... s1_view:incarnation() --- -- 3 +- cdata {version = 3ULL} ... s1:cfg({heartbeat_rate = 0.01}) --- @@ -932,7 +932,7 @@ p ... s1_view:incarnation() --- -- 3 +- cdata {version = 3ULL} ... s1:delete() --- @@ -964,7 +964,7 @@ s2 --- - uri: 127.0.0.1: status: alive - incarnation: 0 + incarnation: cdata {version = 0ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -1210,7 +1210,7 @@ s2:member_by_uuid(s1:self():uuid()) --- - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... @@ -1287,13 +1287,14 @@ m_list --- - - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 0 ... e_list --- -- - is_update: true +- - is_new_version: true + is_update: true is_new_payload: true is_new_uri: true is_new: true @@ -1328,7 +1329,7 @@ m_list --- - - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... @@ -1370,17 +1371,17 @@ m_list --- - - uri: 127.0.0.1: status: alive - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 - uri: 127.0.0.1: status: alive - incarnation: 2 + incarnation: cdata {version = 2ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 8 - uri: 127.0.0.1: status: alive - incarnation: 2 + incarnation: cdata {version = 2ULL} uuid: 00000000-0000-1000-8000-000000000001 payload_size: 8 ... @@ -1389,12 +1390,14 @@ e_list - - is_new_payload: true is_new: true is_update: true - - is_new_payload: true - is_update: true + - is_new_version: true + is_new_payload: true is_new_incarnation: true - - is_new_payload: true is_update: true + - is_new_version: true + is_new_payload: true is_new_incarnation: true + is_update: true ... ctx_list --- @@ -1427,12 +1430,12 @@ m_list --- - - uri: 127.0.0.1: status: left - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 - uri: 127.0.0.1: status: left - incarnation: 1 + incarnation: cdata {version = 1ULL} uuid: 00000000-0000-1000-8000-000000000002 payload_size: 0 ... diff --git a/test/unit/swim.c b/test/unit/swim.c index 0977e0969..63f816d4a 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -397,8 +397,8 @@ swim_test_refute(void) "new incarnation has reached S1 with a next round message"); swim_cluster_restart_node(cluster, 1); - is(swim_cluster_member_incarnation(cluster, 1, 1), 0, - "after restart S2's incarnation is 0 again"); + is(swim_cluster_member_incarnation(cluster, 1, 1).version, 0, + "after restart S2's incarnation is default again"); is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0, "S2 learned its old bigger incarnation 1 from S0"); @@ -698,8 +698,8 @@ swim_test_payload_basic(void) is(swim_cluster_member_set_payload(cluster, 0, s0_payload, s0_payload_size), 0, "payload is set"); - is(swim_cluster_member_incarnation(cluster, 0, 0), 1, - "incarnation is incremeted on each payload update"); + is(swim_cluster_member_incarnation(cluster, 0, 0).version, 1, + "version is incremented on each payload update"); const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size); ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0, "payload is successfully obtained back"); @@ -712,8 +712,8 @@ swim_test_payload_basic(void) is(swim_cluster_member_set_payload(cluster, 0, s0_payload, s0_payload_size), 0, "payload is changed"); - is(swim_cluster_member_incarnation(cluster, 0, 0), 2, - "incarnation is incremeted on each payload update"); + is(swim_cluster_member_incarnation(cluster, 0, 0).version, 2, + "version is incremented on each payload update"); is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload, s0_payload_size, cluster_size), 0, "second payload is disseminated"); @@ -759,9 +759,9 @@ swim_test_payload_refutation(void) * The test checks the following case. Assume there are 3 * nodes: S1, S2, S3. They all know each other. S1 sets * new payload, S2 and S3 knows that. They all see that S1 - * has incarnation 1 and payload P1. + * has version 1 and payload P1. * - * Now S1 changes payload to P2. Its incarnation becomes + * Now S1 changes payload to P2. Its version becomes * 2. During next entire round its round messages are * lost, however ACKs work ok. */ @@ -774,9 +774,9 @@ swim_test_payload_refutation(void) swim_run_for(3); swim_cluster_drop_components(cluster, 0, NULL, 0); - is(swim_cluster_member_incarnation(cluster, 1, 0), 2, - "S2 sees new incarnation of S1"); - is(swim_cluster_member_incarnation(cluster, 2, 0), 2, + is(swim_cluster_member_incarnation(cluster, 1, 0).version, 2, + "S2 sees new version of S1"); + is(swim_cluster_member_incarnation(cluster, 2, 0).version, 2, "S3 does the same"); const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size); @@ -794,7 +794,7 @@ swim_test_payload_refutation(void) /* * Now S1's payload TTD is 0, but via ACKs S1 sent its new - * incarnation to S2 and S3. Despite that they should + * version to S2 and S3. Despite that they should * apply new S1's payload via anti-entropy. Next lines * test that: * @@ -821,15 +821,15 @@ swim_test_payload_refutation(void) ok(size == s0_new_payload_size && memcmp(tmp, s0_new_payload, size) == 0, "S2 learned S1's payload via anti-entropy"); - is(swim_cluster_member_incarnation(cluster, 1, 0), 2, - "incarnation still is the same"); + is(swim_cluster_member_incarnation(cluster, 1, 0).version, 2, + "version still is the same"); tmp = swim_cluster_member_payload(cluster, 2, 0, &size); ok(size == s0_old_payload_size && memcmp(tmp, s0_old_payload, size) == 0, "S3 was blocked and does not know anything"); - is(swim_cluster_member_incarnation(cluster, 2, 0), 2, - "incarnation still is the same"); + is(swim_cluster_member_incarnation(cluster, 2, 0).version, 2, + "version still is the same"); /* S1 will not participate in the tests further. */ swim_cluster_set_drop(cluster, 0, 100); @@ -855,7 +855,7 @@ swim_test_payload_refutation(void) /* * Now check the case (3) - S3 accepts new S1's payload - * from S2. Even knowing the same S1's incarnation. + * from S2. Even knowing the same S1's version. */ swim_cluster_set_drop(cluster, 1, 0); swim_cluster_set_drop_out(cluster, 2, 100); @@ -998,7 +998,7 @@ swim_cluster_delete_f(va_list ap) static void swim_test_triggers(void) { - swim_start_test(22); + swim_start_test(23); struct swim_cluster *cluster = swim_cluster_new(2); swim_cluster_set_ack_timeout(cluster, 1); struct trigger_ctx tctx, tctx2; @@ -1034,8 +1034,8 @@ swim_test_triggers(void) swim_cluster_run_triggers(cluster); is(tctx.counter, 3, "self payload is updated"); is(tctx.ctx.member, swim_self(s1), "self is set as a member"); - is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_INCARNATION, - "both incarnation and payload events are presented"); + is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_VERSION, + "both version and payload events are presented"); swim_cluster_set_drop(cluster, 1, 100); fail_if(swim_cluster_wait_status(cluster, 0, 1, @@ -1089,7 +1089,7 @@ swim_test_triggers(void) if (tctx.ctx.member != NULL) swim_member_unref(tctx.ctx.member); - /* Check that recfg fires incarnation update trigger. */ + /* Check that recfg fires version update trigger. */ s1 = swim_new(); struct tt_uuid uuid = uuid_nil; uuid.time_low = 1; @@ -1100,8 +1100,10 @@ swim_test_triggers(void) fail_if(swim_cfg(s1, "127.0.0.1:2", -1, -1, -1, NULL) != 0); while (tctx.ctx.events == 0) fiber_sleep(0); - is(tctx.ctx.events, SWIM_EV_NEW_URI | SWIM_EV_NEW_INCARNATION, - "local URI update warns about incarnation update"); + is(tctx.ctx.events, SWIM_EV_NEW_URI | SWIM_EV_NEW_VERSION, + "local URI update warns about version update"); + ok((tctx.ctx.events & SWIM_EV_NEW_INCARNATION) != 0, + "version is a part of incarnation, so the latter is updated too"); swim_delete(s1); if (tctx.ctx.member != NULL) diff --git a/test/unit/swim.result b/test/unit/swim.result index 2968a2da7..8d653477b 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -88,7 +88,7 @@ ok 7 - subtests 1..4 ok 1 - S2 increments its own incarnation to refute its suspicion ok 2 - new incarnation has reached S1 with a next round message - ok 3 - after restart S2's incarnation is 0 again + ok 3 - after restart S2's incarnation is default again ok 4 - S2 learned its old bigger incarnation 1 from S0 ok 8 - subtests *** swim_test_refute: done *** @@ -158,25 +158,25 @@ ok 15 - subtests ok 2 - can not set too big payload ok 3 - diag says too big ok 4 - payload is set - ok 5 - incarnation is incremeted on each payload update + ok 5 - version is incremented on each payload update ok 6 - payload is successfully obtained back ok 7 - payload is disseminated ok 8 - payload is changed - ok 9 - incarnation is incremeted on each payload update + ok 9 - version is incremented on each payload update ok 10 - second payload is disseminated ok 11 - third payload is disseminated via anti-entropy ok 16 - subtests *** swim_test_payload_basic: done *** *** swim_test_payload_refutation *** 1..11 - ok 1 - S2 sees new incarnation of S1 + ok 1 - S2 sees new version of S1 ok 2 - S3 does the same ok 3 - but S2 does not known the new payload ok 4 - as well as S3 ok 5 - S2 learned S1's payload via anti-entropy - ok 6 - incarnation still is the same + ok 6 - version still is the same ok 7 - S3 was blocked and does not know anything - ok 8 - incarnation still is the same + ok 8 - version still is the same ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it ok 10 - S3 still does not know anything ok 11 - S3 learns S1's payload from S2 @@ -201,7 +201,7 @@ ok 19 - subtests ok 20 - subtests *** swim_test_slow_net: done *** *** swim_test_triggers *** - 1..22 + 1..23 ok 1 - trigger is fired ok 2 - is not deleted ok 3 - ctx.member is set @@ -211,7 +211,7 @@ ok 20 - subtests ok 7 - mask says that ok 8 - self payload is updated ok 9 - self is set as a member - ok 10 - both incarnation and payload events are presented + ok 10 - both version and payload events are presented ok 11 - suspicion fired a trigger ok 12 - status suspected ok 13 - death fired a trigger @@ -224,7 +224,8 @@ ok 20 - subtests ok 20 - non-yielding still is not ok 21 - trigger is not deleted until all currently sleeping triggers are finished # now all the triggers are done and deleted - ok 22 - local URI update warns about incarnation update + ok 22 - local URI update warns about version update + ok 23 - version is a part of incarnation, so the latter is updated too ok 21 - subtests *** swim_test_triggers: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 463c62390..72149b353 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -348,14 +348,17 @@ swim_cluster_member_status(struct swim_cluster *cluster, int node_id, return swim_member_status(m); } -uint64_t +struct swim_incarnation swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, int member_id) { const struct swim_member *m = swim_cluster_member_view(cluster, node_id, member_id); - if (m == NULL) - return UINT64_MAX; + if (m == NULL) { + struct swim_incarnation inc; + swim_incarnation_create(&inc, UINT64_MAX); + return inc; + } return swim_member_incarnation(m); } @@ -713,7 +716,7 @@ struct swim_member_template { * to @a incarnation. */ bool need_check_incarnation; - uint64_t incarnation; + struct swim_incarnation incarnation; /** * True, if the payload should be checked to be equal to * @a payload of size @a payload_size. @@ -751,10 +754,10 @@ swim_member_template_set_status(struct swim_member_template *t, */ static inline void swim_member_template_set_incarnation(struct swim_member_template *t, - uint64_t incarnation) + uint64_t version) { t->need_check_incarnation = true; - t->incarnation = incarnation; + swim_incarnation_create(&t->incarnation, version); } /** @@ -778,7 +781,7 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) const struct swim_member *m = swim_cluster_member_view(cluster, t->node_id, t->member_id); enum swim_member_status status; - uint64_t incarnation; + struct swim_incarnation incarnation; const char *payload; int payload_size; if (m != NULL) { @@ -787,13 +790,14 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) payload = swim_member_payload(m, &payload_size); } else { status = swim_member_status_MAX; - incarnation = 0; + swim_incarnation_create(&incarnation, 0); payload = NULL; payload_size = 0; } if (t->need_check_status && status != t->status) return false; - if (t->need_check_incarnation && incarnation != t->incarnation) + if (t->need_check_incarnation && + swim_incarnation_cmp(&incarnation, &t->incarnation) != 0) return false; if (t->need_check_payload && (payload_size != t->payload_size || @@ -848,12 +852,12 @@ swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, int swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t incarnation, + int member_id, uint64_t version, double timeout) { struct swim_member_template t; swim_member_template_create(&t, node_id, member_id); - swim_member_template_set_incarnation(&t, incarnation); + swim_member_template_set_incarnation(&t, version); return swim_wait_timeout(timeout, cluster, swim_loop_check_member, &t); } diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index fde84e39b..587118c60 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -156,7 +156,7 @@ enum swim_member_status swim_cluster_member_status(struct swim_cluster *cluster, int node_id, int member_id); -uint64_t +struct swim_incarnation swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, int member_id); @@ -218,13 +218,13 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, double timeout); /** - * Wait until a member with id @a member_id is seen with @a + * Wait until a member with id @a member_id is seen with needed * incarnation in the membership table of a member with id @a * node_id. At most @a timeout seconds. */ int swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t incarnation, + int member_id, uint64_t version, double timeout); /** -- 2.20.1 (Apple Git-117)