* [tarantool-patches] [PATCH 0/2] SWIM generation
From: Vladislav Shpilevoy @ 2019-06-20 21:23 UTC
To: tarantool-patches; +Cc: kostja

The patchset introduces SWIM generation: a persistent part of
incarnation used to detect restarts and to refute false gossip left
over from previous lives of an instance.

Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-4280-swim-generation
Issue: https://github.com/tarantool/tarantool/issues/4280

Vladislav Shpilevoy (2):
  swim: encapsulate incarnation behind 'age'
  swim: introduce generation

 extra/exports               |   1 +
 src/lib/swim/swim.c         | 300 ++++++++++++++++++++++--------------
 src/lib/swim/swim.h         |  18 ++-
 src/lib/swim/swim_proto.c   |  77 ++++++---
 src/lib/swim/swim_proto.h   | 113 ++++++++++----
 src/lua/swim.c              |   3 +-
 src/lua/swim.lua            |  50 ++++--
 test/swim/swim.result       | 176 +++++++++++++++++----
 test/swim/swim.test.lua     |  38 +++++
 test/unit/swim.c            |  54 +++++--
 test/unit/swim.result       |  11 +-
 test/unit/swim_test_utils.c |  54 +++++--
 test/unit/swim_test_utils.h |  16 +-
 13 files changed, 657 insertions(+), 254 deletions(-)

-- 
2.20.1 (Apple Git-117)
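The cover letter only names the idea. Below is a minimal C sketch of one way a persistent part could be combined with the volatile incarnation. It rests on an illustrative assumption that is not taken from PATCH 2/2: the persistent part (called `generation` here) is incremented on every restart and compared before `incarnation`. The names `struct sketch_gen_age`, `sketch_gen_age_cmp()` and `sketch_gen_age_on_restart()` are hypothetical and are not the API that the series adds.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical two-part age (illustration only). 'generation' is
 * assumed to be persisted and incremented on each restart.
 * 'incarnation' is the classic volatile SWIM counter and starts
 * from 0 again after a restart.
 */
struct sketch_gen_age {
	uint64_t generation;
	uint64_t incarnation;
};

/*
 * Lexicographic comparison: the persistent part decides first,
 * and the volatile part breaks ties. Returns <0, 0 or >0.
 */
static int
sketch_gen_age_cmp(const struct sketch_gen_age *l,
		   const struct sketch_gen_age *r)
{
	if (l->generation != r->generation)
		return l->generation < r->generation ? -1 : 1;
	if (l->incarnation != r->incarnation)
		return l->incarnation < r->incarnation ? -1 : 1;
	return 0;
}

/*
 * Assumed restart behaviour: the volatile counter is lost, and the
 * persisted generation is loaded and incremented.
 */
static void
sketch_gen_age_on_restart(struct sketch_gen_age *age)
{
	assert(age->generation < UINT64_MAX);
	age->generation++;
	age->incarnation = 0;
}
```

Consider an instance that restarts at incarnation 0. Compared by incarnation alone, it looks older than gossip such as `{incarnation = 5}` left over from its previous life. With the assumed generation compared first, `{1, 0}` is greater than `{0, 5}`, so the stale gossip can be refuted right away.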
* [tarantool-patches] [PATCH 1/2] swim: encapsulate incarnation behind 'age'
From: Vladislav Shpilevoy @ 2019-06-20 21:23 UTC
To: tarantool-patches; +Cc: kostja

Traditional SWIM describes member age as an incarnation: a
monotonically growing number used to refute false gossip. But that is
not enough in the real world, where restarts have to be detected.
Incarnations are not persisted, and even persisting them would not
help without adding new incarnation-like attributes.

This patch encapsulates incarnation into an 'age' to simplify further
work in this area.

Part of #4280
---
 src/lib/swim/swim.c         | 260 ++++++++++++++++++++----------------
 src/lib/swim/swim_proto.c   |  54 +++++---
 src/lib/swim/swim_proto.h   |  85 ++++++++----
 src/lua/swim.lua            |   7 +-
 test/unit/swim.c            |  13 +-
 test/unit/swim_test_utils.c |  23 ++--
 test/unit/swim_test_utils.h |  11 +-
 7 files changed, 265 insertions(+), 188 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 2b37d41e0..b55945a34 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -213,6 +213,24 @@ swim_uuid_hash(const struct tt_uuid *uuid)
 	return mh_strn_hash((const char *) uuid, UUID_LEN);
 }
 
+/**
+ * Compare two age values.
+ * @retval <0 l < r.
+ * @retval >0 l > r.
+ * @retval =0 l == r.
+ */
+static inline int
+swim_age_cmp(const struct swim_age *l, const struct swim_age *r,
+	     enum swim_ev_mask *diff)
+{
+	if (l->incarnation == r->incarnation) {
+		*diff = 0;
+		return 0;
+	}
+	*diff = SWIM_EV_NEW_INCARNATION;
+	return l->incarnation < r->incarnation ? -1 : 1;
+}
+
 /**
  * A cluster member description. This structure describes the
  * last known state of an instance. This state is updated
@@ -302,19 +320,17 @@ struct swim_member {
 	 * further. Otherwise @a payload is suspected to be
 	 * outdated and can be updated in two cases only:
 	 *
-	 * 1) when it is received with a bigger incarnation from
-	 *    anywhere;
+	 * 1) when it is received with a newer age from anywhere;
 	 *
-	 * 2) when it is received with the same incarnation, but
-	 *    local payload is outdated.
+	 * 2) when it is received with the same age, but local
+	 *    payload is outdated.
 	 *
-	 * A payload can become outdated, if anyhow a new
-	 * incarnation of the member has been learned, but not a
-	 * new payload. For example, a message with new payload
-	 * could be lost, and at the same time this instance
-	 * responds to a ping with newly incarnated ack. The ack
-	 * receiver will learn the new incarnation, but not the
-	 * new payload.
+	 * A payload can become outdated, if anyhow a new age of
+	 * the member has been learned, but not a new payload. For
+	 * example, a message with the new payload could be lost,
+	 * and at the same time this instance responds to a ping
+	 * with a newly aged ack. The ack receiver will learn the
+	 * new age, but not the new payload.
 	 *
 	 * In this case it can't be said exactly whether the
 	 * member has updated payload, or another attribute. The
@@ -353,18 +369,17 @@ struct swim_member {
 	 * Failure detection component
 	 */
 	/**
-	 * A monotonically growing number to refute old member's
-	 * state, characterized by a triplet
-	 * {incarnation, status, address}.
+	 * A monotonically growing age value to refute old
+	 * member's state such as status, address, payload etc.
 	 */
-	uint64_t incarnation;
+	struct swim_age age;
 	/**
 	 * How many recent pings did not receive an ack while the
 	 * member was in the current status. When this number
 	 * reaches a configured threshold the instance is marked
 	 * as dead. After a few more unacknowledged it is removed
 	 * from the member table. This counter is reset on each
-	 * acknowledged ping, status or incarnation change.
+	 * acknowledged ping, status or age change.
 	 */
 	int unacknowledged_pings;
 	/**
@@ -578,7 +593,7 @@ swim_register_event(struct swim *swim, struct swim_member *member)
 
 /**
  * Make all needed actions to process a member's update like a
- * change of its status, or incarnation, or both.
+ * change of its status, or age, or both.
  */
 static void
 swim_on_member_update(struct swim *swim, struct swim_member *member,
@@ -619,19 +634,20 @@ swim_has_pending_events(struct swim *swim)
 }
 
 /**
- * Update status and incarnation of the member if needed. Statuses
- * are compared as a compound key: {incarnation, status}. So @a
- * new_status can override an old one only if its incarnation is
- * greater, or the same, but its status is "bigger". Statuses are
- * compared by their identifier, so "alive" < "dead". This
- * protects from the case when a member is detected as dead on one
- * instance, but overridden by another instance with the same
- * incarnation's "alive" message.
+ * Update status and age of the member if needed. Statuses are
+ * compared as a compound key: {age, status}. So @a new_status
+ * overrides any older value, or of the same age, but if
+ * @a new_status status is "bigger".
+ *
+ * Statuses are compared by their identifier, so "alive" < "dead".
+ * This protects from the case when a member is detected as dead
+ * on one instance, but overridden by another instance with an
+ * "alive" message of the same age.
  */
 static inline void
-swim_update_member_inc_status(struct swim *swim, struct swim_member *member,
+swim_update_member_age_status(struct swim *swim, struct swim_member *member,
 			      enum swim_member_status new_status,
-			      uint64_t incarnation)
+			      const struct swim_age *age)
 {
 	/*
 	 * Source of truth about self is this instance and it is
@@ -639,16 +655,16 @@ swim_update_member_inc_status(struct swim *swim, struct swim_member *member,
 	 * separately.
 	 */
 	assert(member != swim->self);
-	if (member->incarnation < incarnation) {
-		enum swim_ev_mask events = SWIM_EV_NEW_INCARNATION;
+	enum swim_ev_mask events;
+	int cmp = swim_age_cmp(&member->age, age, &events);
+	if (cmp < 0) {
 		if (new_status != member->status) {
 			events |= SWIM_EV_NEW_STATUS;
 			member->status = new_status;
 		}
-		member->incarnation = incarnation;
+		member->age = *age;
 		swim_on_member_update(swim, member, events);
-	} else if (member->incarnation == incarnation &&
-		   member->status < new_status) {
+	} else if (cmp == 0 && member->status < new_status) {
 		member->status = new_status;
 		swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
 	}
@@ -760,7 +776,7 @@ swim_member_delete(struct swim_member *member)
 /** Create a new member. It is not registered anywhere here. */
 static struct swim_member *
 swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		enum swim_member_status status, uint64_t incarnation)
+		enum swim_member_status status, const struct swim_age *age)
 {
 	struct swim_member *member =
 		(struct swim_member *) calloc(1, sizeof(*member));
@@ -776,7 +792,7 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
 	rlist_create(&member->in_round_queue);
 
 	/* Failure detection component. */
-	member->incarnation = incarnation;
+	member->age = *age;
 	heap_node_create(&member->in_wait_ack_heap);
 	swim_task_create(&member->ack_task, NULL, NULL, "ack");
 	swim_task_create(&member->ping_task, swim_ping_task_complete, NULL,
@@ -835,7 +851,8 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
 static struct swim_member *
 swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		const struct tt_uuid *uuid, enum swim_member_status status,
-		uint64_t incarnation, const char *payload, int payload_size)
+		const struct swim_age *age, const char *payload,
+		int payload_size)
 {
 	int new_bsize = sizeof(swim->shuffled[0]) *
 			(mh_size(swim->members) + 1);
@@ -855,8 +872,7 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 			 "wait_ack_heap");
 		return NULL;
 	}
-	struct swim_member *member =
-		swim_member_new(addr, uuid, status, incarnation);
+	struct swim_member *member = swim_member_new(addr, uuid, status, age);
 	if (member == NULL)
 		return NULL;
 	assert(swim_find_member(swim, uuid) == NULL);
@@ -962,7 +978,7 @@ swim_encode_member(struct swim_packet *packet, struct swim_member *m,
 	if (pos == NULL)
 		return -1;
 	swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
-			       m->incarnation, encode_payload);
+			       &m->age, encode_payload);
 	memcpy(pos, passport, sizeof(*passport));
 	if (encode_payload) {
 		pos += sizeof(*passport);
@@ -1043,8 +1059,7 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
 	char *pos = swim_packet_alloc(packet, size);
 	if (pos == NULL)
 		return 0;
-	swim_fd_header_bin_create(&fd_header_bin, type,
-				  swim->self->incarnation);
+	swim_fd_header_bin_create(&fd_header_bin, type, &swim->self->age);
 	memcpy(pos, &fd_header_bin, size);
 	return 1;
 }
@@ -1405,21 +1420,23 @@ swim_update_member_addr(struct swim *swim, struct swim_member *member,
 
 /**
  * Update an existing member with a new definition. It is expected
- * that @a def has an incarnation not older that @a member has.
+ * that @a def is not older than @a member.
  */
 static inline void
 swim_update_member(struct swim *swim, const struct swim_member_def *def,
 		   struct swim_member *member)
 {
 	assert(member != swim->self);
-	assert(def->incarnation >= member->incarnation);
+	enum swim_ev_mask events;
+	int cmp = swim_age_cmp(&def->age, &member->age, &events);
+	assert(cmp >= 0);
 	/*
 	 * Payload update rules are simple: it can be updated
-	 * either if the new payload has a bigger incarnation, or
-	 * the same incarnation, but local payload is outdated.
+	 * either if the new payload is younger, or of the same
+	 * age, but local payload is outdated.
 	 */
 	bool update_payload = false;
-	if (def->incarnation > member->incarnation) {
+	if (cmp > 0) {
 		if (! swim_inaddr_eq(&def->addr, &member->addr))
 			swim_update_member_addr(swim, member, &def->addr);
 		if (def->payload_size >= 0)
@@ -1435,8 +1452,7 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
 		/* Not such a critical error. */
 		diag_log();
 	}
-	swim_update_member_inc_status(swim, member, def->status,
-				      def->incarnation);
+	swim_update_member_age_status(swim, member, def->status, &def->age);
 }
 
 /**
@@ -1475,37 +1491,38 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 			goto skip;
 		}
 		*result = swim_new_member(swim, &def->addr, &def->uuid,
-					  def->status, def->incarnation,
+					  def->status, &def->age,
 					  def->payload, def->payload_size);
 		return *result != NULL ? 0 : -1;
 	}
 	*result = member;
 	struct swim_member *self = swim->self;
+	enum swim_ev_mask events;
+	int cmp = swim_age_cmp(&def->age, &member->age, &events);
 	if (member != self) {
-		if (def->incarnation < member->incarnation)
+		if (cmp < 0)
 			goto skip;
 		swim_update_member(swim, def, member);
 		return 0;
 	}
 	/*
-	 * It is possible that other instances know a bigger
-	 * incarnation of this instance - such thing happens when
-	 * the instance restarts and loses its local incarnation
-	 * number. It will be restored by receiving dissemination
-	 * and anti-entropy messages about self.
+	 * It is possible that other instances know a bigger age
+	 * of this instance - such thing happens when the instance
+	 * restarts and loses its local age value. It will be
+	 * restored by receiving dissemination and anti-entropy
+	 * messages about self.
 	 */
-	if (self->incarnation < def->incarnation) {
-		self->incarnation = def->incarnation;
-		swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
+	if (cmp > 0) {
+		self->age = def->age;
+		swim_on_member_update(swim, self, events);
 	}
-	if (def->status != MEMBER_ALIVE &&
-	    def->incarnation == self->incarnation) {
+	if (def->status != MEMBER_ALIVE && cmp == 0) {
 		/*
 		 * In the cluster a gossip exists that this
 		 * instance is not alive. Refute this information
 		 * with a bigger incarnation.
 		 */
-		self->incarnation++;
+		self->age.incarnation++;
 		swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	}
 	return 0;
@@ -1569,33 +1586,33 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 			 swim_fd_msg_type_strs[def.type]);
 	swim_member_def_create(&mdef);
 	mdef.addr = *src;
-	mdef.incarnation = def.incarnation;
+	mdef.age = def.age;
 	mdef.uuid = *uuid;
 	struct swim_member *member;
 	if (swim_upsert_member(swim, &mdef, &member) != 0)
 		return -1;
 	/*
-	 * It can be NULL, for example, in case of too old
-	 * incarnation of the failure detection request. For the
-	 * obvious reasons we do not use outdated ACKs. But we
-	 * also ignore outdated pings. It is because 1) we need to
-	 * be consistent in neglect of all old messages; 2) if a
-	 * ping is considered old, then after it was sent, this
-	 * SWIM instance has already interacted with the sender
-	 * and learned its new incarnation.
+	 * It can be NULL, for example, in case of too old failure
+	 * detection request. For the obvious reasons we do not
+	 * use outdated ACKs. But we also ignore outdated pings.
+	 * It is because 1) we need to be consistent in neglect of
+	 * all old messages; 2) if a ping is considered old, then
+	 * after it was sent, this SWIM instance has already
+	 * interacted with the sender and learned its new age.
 	 */
 	if (member == NULL)
 		return 0;
 	/*
-	 * It is well known fact, that SWIM compares statuses as
-	 * compound keys {incarnation, status}. If inc1 == inc2,
-	 * but status1 > status2, nothing should happen. But it
-	 * works for anti-entropy only, when the new status is
-	 * received indirectly, as a gossip. Here is a different
-	 * case - this message was received from the member
-	 * directly, and evidently it is alive.
+	 * It is a well known fact, that SWIM compares statuses as
+	 * compound keys {age, status}. If age1 == age2, but
+	 * status1 > status2, nothing should happen. But it works
+	 * for anti-entropy only, when the new status is received
+	 * indirectly, as a gossip. Here is a different case -
+	 * this message was received from the member directly, and
+	 * evidently it is alive.
 	 */
-	if (def.incarnation == member->incarnation &&
+	enum swim_ev_mask events;
+	if (swim_age_cmp(&def.age, &member->age, &events) == 0 &&
 	    member->status != MEMBER_ALIVE) {
 		member->status = MEMBER_ALIVE;
 		swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
@@ -1650,15 +1667,22 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
 		diag_set(SwimError, "%s map of size 1 is expected", prefix);
 		return -1;
 	}
-	uint64_t tmp;
-	if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0)
-		return -1;
-	if (tmp != SWIM_QUIT_INCARNATION) {
-		diag_set(SwimError, "%s a key should be incarnation", prefix);
-		return -1;
+	struct swim_age age;
+	uint64_t key;
+	for (uint32_t i = 0; i < size; ++i) {
+		if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
+			return -1;
+		switch (key) {
+		case SWIM_QUIT_INCARNATION:
+			if (swim_decode_uint(pos, end, &age.incarnation, prefix,
+					     "incarnation") != 0)
+				return -1;
+			break;
+		default:
+			diag_set(SwimError, "%s unknown key", prefix);
+			return -1;
+		}
 	}
-	if (swim_decode_uint(pos, end, &tmp, prefix, "incarnation") != 0)
-		return -1;
 	struct swim_member *m = swim_find_member(swim, uuid);
 	if (m == NULL)
 		return 0;
@@ -1666,11 +1690,14 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
 	 * Check for 'self' in case this instance took UUID of a
 	 * quited instance.
 	 */
+	enum swim_ev_mask events;
 	if (m != swim->self) {
-		swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp);
-	} else if (tmp >= m->incarnation) {
-		m->incarnation = tmp + 1;
-		swim_on_member_update(swim, m, SWIM_EV_NEW_INCARNATION);
+		swim_update_member_age_status(swim, m, MEMBER_LEFT, &age);
+	} else if (swim_age_cmp(&age, &m->age, &events) >= 0) {
+		m->age = age;
+		++m->age.incarnation;
+		swim_on_member_update(swim, m, events |
+				      SWIM_EV_NEW_INCARNATION);
 	}
 	return 0;
 }
@@ -1887,7 +1914,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 	struct sockaddr_in addr;
 	if (uri != NULL && swim_uri_to_addr(uri, &addr, prefix) != 0)
 		return -1;
-	bool is_first_cfg = swim->self == NULL;
+	struct swim_member *self = swim->self;
+	bool is_first_cfg = self == NULL;
 	struct swim_member *new_self = NULL;
 	if (is_first_cfg) {
 		if (uuid == NULL || tt_uuid_is_nil(uuid) || uri == NULL) {
@@ -1895,21 +1923,23 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 				 "a first config", prefix);
 			return -1;
 		}
-		swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
-					     0, NULL, 0);
-		if (swim->self == NULL)
+		struct swim_age age;
+		swim_age_create(&age, 0);
+		new_self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
+					   &age, NULL, 0);
+		if (new_self == NULL)
 			return -1;
 	} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
-		uuid = &swim->self->uuid;
-	} else if (! tt_uuid_is_equal(uuid, &swim->self->uuid)) {
+		uuid = &self->uuid;
+	} else if (! tt_uuid_is_equal(uuid, &self->uuid)) {
 		if (swim_find_member(swim, uuid) != NULL) {
 			diag_set(SwimError, "%s a member with such UUID "\
 				 "already exists", prefix);
 			return -1;
 		}
-		new_self = swim_new_member(swim, &swim->self->addr, uuid,
-					   MEMBER_ALIVE, 0, swim->self->payload,
-					   swim->self->payload_size);
+		new_self = swim_new_member(swim, &self->addr, uuid,
+					   MEMBER_ALIVE, &self->age,
+					   self->payload, self->payload_size);
 		if (new_self == NULL)
 			return -1;
 	}
@@ -1919,12 +1949,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 	 * was not changed.
 	 */
 	if (swim_scheduler_bind(&swim->scheduler, &addr) != 0) {
-		if (is_first_cfg) {
-			swim_delete_member(swim, swim->self);
-			swim->self = NULL;
-		} else if (new_self != NULL) {
+		if (new_self != NULL)
 			swim_delete_member(swim, new_self);
-		}
 		return -1;
 	}
 	/*
@@ -1937,7 +1963,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 				   tt_sprintf("SWIM event handler %d",
 					      swim_fd(swim)));
 	} else {
-		addr = swim->self->addr;
+		addr = self->addr;
 	}
 	struct ev_timer *t = &swim->round_tick;
 	struct ev_loop *l = swim_loop();
@@ -1954,15 +1980,17 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 	}
 
 	if (new_self != NULL) {
-		swim->self->status = MEMBER_LEFT;
-		swim_on_member_update(swim, swim->self, SWIM_EV_NEW_STATUS);
+		if (self != NULL) {
+			self->status = MEMBER_LEFT;
+			swim_on_member_update(swim, self, SWIM_EV_NEW_STATUS);
+		}
 		swim->self = new_self;
+		self = new_self;
 	}
-	if (! swim_inaddr_eq(&addr, &swim->self->addr)) {
-		swim->self->incarnation++;
-		swim_on_member_update(swim, swim->self,
-				      SWIM_EV_NEW_INCARNATION);
-		swim_update_member_addr(swim, swim->self, &addr);
+	if (! swim_inaddr_eq(&addr, &self->addr)) {
+		self->age.incarnation++;
+		swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
+		swim_update_member_addr(swim, self, &addr);
 	}
 	if (gc_mode != SWIM_GC_DEFAULT)
 		swim->gc_mode = gc_mode;
@@ -1994,7 +2022,7 @@ swim_set_payload(struct swim *swim, const char *payload, int payload_size)
 	struct swim_member *self = swim->self;
 	if (swim_update_member_payload(swim, self, payload, payload_size) != 0)
 		return -1;
-	self->incarnation++;
+	self->age.incarnation++;
 	swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	return 0;
 }
@@ -2013,7 +2041,9 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, uuid);
 	if (member == NULL) {
-		member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0,
+		struct swim_age age;
+		swim_age_create(&age, 0);
+		member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &age,
 					 NULL, -1);
 		return member == NULL ? -1 : 0;
 	}
@@ -2173,7 +2203,7 @@ swim_encode_quit(struct swim *swim, struct swim_packet *packet)
 	char *pos = swim_packet_alloc(packet, sizeof(bin));
 	if (pos == NULL)
 		return 0;
-	swim_quit_bin_create(&bin, swim->self->incarnation);
+	swim_quit_bin_create(&bin, &swim->self->age);
 	memcpy(pos, &bin, sizeof(bin));
 	return 1;
 }
@@ -2268,7 +2298,7 @@ swim_member_uuid(const struct swim_member *member)
 uint64_t
 swim_member_incarnation(const struct swim_member *member)
 {
-	return member->incarnation;
+	return member->age.incarnation;
 }
 
 const char *
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 938631e49..c42d67c0a 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -169,6 +169,24 @@ swim_check_inaddr_not_empty(const struct sockaddr_in *addr, const char *prefix,
 	return -1;
 }
 
+/**
+ * Create a binary age structure. It requires key values by which
+ * the age fields should be marked.
+ */
+static inline void
+swim_age_bin_create(struct swim_age_bin *bin, uint8_t incarnation_key)
+{
+	bin->k_incarnation = incarnation_key;
+	bin->m_incarnation = 0xcf;
+}
+
+/** Fill already created @a bin with a real age value. */
+static inline void
+swim_age_bin_fill(struct swim_age_bin *bin, const struct swim_age *age)
+{
+	bin->v_incarnation = mp_bswap_u64(age->incarnation);
+}
+
 /**
  * Create a binary address structure. It requires explicit IP and
  * port keys specification since the keys depend on the component
@@ -248,7 +266,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 			return -1;
 		break;
 	case SWIM_MEMBER_INCARNATION:
-		if (swim_decode_uint(pos, end, &def->incarnation, prefix,
+		if (swim_decode_uint(pos, end, &def->age.incarnation, prefix,
 				     "member incarnation") != 0)
 			return -1;
 		break;
@@ -308,17 +326,17 @@ swim_src_uuid_bin_create(struct swim_src_uuid_bin *header,
 
 void
 swim_fd_header_bin_create(struct swim_fd_header_bin *header,
-			  enum swim_fd_msg_type type, uint64_t incarnation)
+			  enum swim_fd_msg_type type,
+			  const struct swim_age *age)
 {
 	header->k_header = SWIM_FAILURE_DETECTION;
-	header->m_header = 0x82;
+	header->m_header = 0x81 + SWIM_AGE_BIN_SIZE;
 
 	header->k_type = SWIM_FD_MSG_TYPE;
 	header->v_type = type;
 
-	header->k_incarnation = SWIM_FD_INCARNATION;
-	header->m_incarnation = 0xcf;
-	header->v_incarnation = mp_bswap_u64(incarnation);
+	swim_age_bin_create(&header->age, SWIM_FD_INCARNATION);
+	swim_age_bin_fill(&header->age, age);
 }
 
 int
@@ -353,7 +371,7 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def,
 			def->type = key;
 			break;
 		case SWIM_FD_INCARNATION:
-			if (swim_decode_uint(pos, end, &def->incarnation,
+			if (swim_decode_uint(pos, end, &def->age.incarnation,
 					     prefix, "incarnation") != 0)
 				return -1;
 			break;
@@ -401,24 +419,24 @@ swim_passport_bin_create(struct swim_passport_bin *passport)
 	passport->k_uuid = SWIM_MEMBER_UUID;
 	passport->m_uuid = 0xc4;
 	passport->m_uuid_len = UUID_LEN;
-	passport->k_incarnation = SWIM_MEMBER_INCARNATION;
-	passport->m_incarnation = 0xcf;
+	swim_age_bin_create(&passport->age, SWIM_MEMBER_INCARNATION);
 }
 
 void
 swim_passport_bin_fill(struct swim_passport_bin *passport,
 		       const struct sockaddr_in *addr,
 		       const struct tt_uuid *uuid,
-		       enum swim_member_status status, uint64_t incarnation,
-		       bool encode_payload)
+		       enum swim_member_status status,
+		       const struct swim_age *age, bool encode_payload)
 {
-	int map_size = 3 + SWIM_INADDR_BIN_SIZE + encode_payload;
+	int map_size = 2 + SWIM_AGE_BIN_SIZE + SWIM_INADDR_BIN_SIZE +
+		       encode_payload;
 	assert(mp_sizeof_map(map_size) == 1);
 	passport->m_header = 0x80 | map_size;
 	passport->v_status = status;
 	swim_inaddr_bin_fill(&passport->addr, addr);
 	memcpy(passport->v_uuid, uuid, UUID_LEN);
-	passport->v_incarnation = mp_bswap_u64(incarnation);
+	swim_age_bin_fill(&passport->age, age);
 }
 
 void
@@ -556,13 +574,13 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 }
 
 void
-swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation)
+swim_quit_bin_create(struct swim_quit_bin *header, const struct swim_age *age)
 {
 	header->k_quit = SWIM_QUIT;
-	header->m_quit = 0x81;
-	header->k_incarnation = SWIM_QUIT_INCARNATION;
-	header->m_incarnation = 0xcf;
-	header->v_incarnation = mp_bswap_u64(incarnation);
+	assert(mp_sizeof_map(SWIM_AGE_BIN_SIZE) == 1);
+	header->m_quit = 0x80 | SWIM_AGE_BIN_SIZE;
+	swim_age_bin_create(&header->age, SWIM_QUIT_INCARNATION);
+	swim_age_bin_fill(&header->age, age);
 }
 
 void
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 482d79fb1..acd266db4 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -112,6 +112,47 @@ enum {
  * +-------------------------------------------------------------+
  */
 
+/**
+ * Age is attached to every sent piece of information, and is used
+ * to reject/rewrite old data.
+ */
+struct swim_age {
+	/**
+	 * Incarnation is a volatile fully automatic part, which
+	 * is used to refute incorrect and rewrite old information
+	 * during SWIM instance life cycle.
+	 */
+	uint64_t incarnation;
+};
+
+/** Initialize age. */
+static inline void
+swim_age_create(struct swim_age *age, uint64_t incarnation)
+{
+	age->incarnation = incarnation;
+}
+
+enum {
+	/**
+	 * Number of keys in the age binary structure. Structures
+	 * storing an age should use this size so as to correctly
+	 * encode MessagePack map header.
+	 */
+	SWIM_AGE_BIN_SIZE = 1,
+};
+
+/**
+ * Binary age structure. It is expected that parent structure is a
+ * map.
+ */
+struct PACKED swim_age_bin {
+	/** mp_encode_uint(incarnation key) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
+};
+
 /**
  * SWIM member attributes from anti-entropy and dissemination
  * messages.
@@ -119,7 +160,7 @@ enum {
 struct swim_member_def {
 	struct tt_uuid uuid;
 	struct sockaddr_in addr;
-	uint64_t incarnation;
+	struct swim_age age;
 	enum swim_member_status status;
 	const char *payload;
 	int payload_size;
@@ -184,9 +225,9 @@ enum swim_fd_key {
 	/** Type of the failure detection message: ping or ack. */
 	SWIM_FD_MSG_TYPE,
 	/**
-	 * Incarnation of the sender. To make the member alive if
-	 * it was considered dead, but ping/ack with greater
-	 * incarnation was received from it.
+	 * Age of the sender. To make the member alive if it was
+	 * considered dead, but a newer ping/ack was received from
+	 * it.
 	 */
 	SWIM_FD_INCARNATION,
 };
@@ -212,24 +253,22 @@ struct PACKED swim_fd_header_bin {
 	/** mp_encode_uint(enum swim_fd_msg_type) */
 	uint8_t v_type;
 
-	/** mp_encode_uint(SWIM_FD_INCARNATION) */
-	uint8_t k_incarnation;
-	/** mp_encode_uint(64bit incarnation) */
-	uint8_t m_incarnation;
-	uint64_t v_incarnation;
+	/** SWIM_FD_INCARNATION */
+	struct swim_age_bin age;
 };
 
 /** Initialize failure detection section. */
 void
 swim_fd_header_bin_create(struct swim_fd_header_bin *header,
-			  enum swim_fd_msg_type type, uint64_t incarnation);
+			  enum swim_fd_msg_type type,
+			  const struct swim_age *age);
 
 /** A decoded failure detection message. */
 struct swim_failure_detection_def {
 	/** Type of the message. */
 	enum swim_fd_msg_type type;
-	/** Incarnation of the sender. */
-	uint64_t incarnation;
+	/** Age of the sender. */
+	struct swim_age age;
 };
 
 /**
@@ -339,11 +378,8 @@ struct PACKED swim_passport_bin {
 	uint8_t m_uuid_len;
 	uint8_t v_uuid[UUID_LEN];
 
-	/** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
-	uint8_t k_incarnation;
-	/** mp_encode_uint(64bit incarnation) */
-	uint8_t m_incarnation;
-	uint64_t v_incarnation;
+	/** SWIM_MEMBER_INCARNATION */
+	struct swim_age_bin age;
 };
 
 /**
@@ -384,8 +420,8 @@ void
 swim_passport_bin_fill(struct swim_passport_bin *passport,
 		       const struct sockaddr_in *addr,
 		       const struct tt_uuid *uuid,
-		       enum swim_member_status status, uint64_t incarnation,
-		       bool encode_payload);
+		       enum swim_member_status status,
+		       const struct swim_age *age, bool encode_payload);
 
 /** }}} Anti-entropy component */
 
@@ -561,7 +597,7 @@ swim_route_bin_create(struct swim_route_bin *route,
 /** }}} Meta component */
 
 enum swim_quit_key {
-	/** Incarnation to ignore old quit messages. */
+	/** Age to ignore old quit messages. */
 	SWIM_QUIT_INCARNATION = 0,
 };
 
@@ -572,16 +608,13 @@ struct PACKED swim_quit_bin {
 	/** mp_encode_map(1) */
 	uint8_t m_quit;
 
-	/** mp_encode_uint(SWIM_QUIT_INCARNATION) */
-	uint8_t k_incarnation;
-	/** mp_encode_uint(64bit incarnation) */
-	uint8_t m_incarnation;
-	uint64_t v_incarnation;
+	/** SWIM_QUIT_INCARNATION */
+	struct swim_age_bin age;
 };
 
 /** Initialize quit section. */
 void
-swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation);
+swim_quit_bin_create(struct swim_quit_bin *header, const struct swim_age *age);
 
 /**
  * Helpers to decode some values - map, array, etc with
diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index 4f91ac233..e411a397d 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -360,13 +360,12 @@ end
 --
 -- This function caches its result. It means, that only first call
 -- actually decodes cdata payload. All the next calls return
--- pointer to the same result, until payload is changed with a new
--- incarnation.
+-- pointer to the same result, until a newer payload appears.
 --
 local function swim_member_payload(m)
     local ptr = swim_check_member(m, 'member:payload()')
-    -- Two keys are needed. Incarnation is not enough, because a
-    -- new incarnation can be disseminated earlier than a new
+    -- Both the age and the flag are needed. Age is not enough,
+    -- because a new age can be disseminated earlier than a new
     -- payload. For example, via ACK messages.
     local key1 = capi.swim_member_incarnation(ptr)
     local key2 = capi.swim_member_is_payload_up_to_date(ptr)
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 0977e0969..bffc0985d 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -391,15 +391,15 @@ swim_test_refute(void)
 	fail_if(swim_cluster_wait_status(cluster, 0, 1,
 					 MEMBER_SUSPECTED, 4) != 0);
 	swim_cluster_set_drop(cluster, 1, 0);
-	is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
+	is(swim_cluster_wait_age(cluster, 1, 1, 1, 1), 0,
 	   "S2 increments its own incarnation to refute its suspicion");
-	is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0,
+	is(swim_cluster_wait_age(cluster, 0, 1, 1, 1), 0,
 	   "new incarnation has reached S1 with a next round message");
 
 	swim_cluster_restart_node(cluster, 1);
 	is(swim_cluster_member_incarnation(cluster, 1, 1), 0,
 	   "after restart S2's incarnation is 0 again");
-	is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
+	is(swim_cluster_wait_age(cluster, 1, 1, 1, 1), 0,
 	   "S2 learned its old bigger incarnation 1 from S0");
 
 	swim_cluster_delete(cluster);
@@ -527,7 +527,7 @@ swim_test_quit(void)
 	 * old LEFT status.
 	 */
 	swim_cluster_restart_node(cluster, 0);
-	is(swim_cluster_wait_incarnation(cluster, 0, 0, 1, 2), 0,
+	is(swim_cluster_wait_age(cluster, 0, 0, 1, 2), 0,
 	   "quited member S1 has returned and refuted the old status");
 	fail_if(swim_cluster_wait_fullmesh(cluster, 2) != 0);
 	/*
@@ -557,9 +557,8 @@ swim_test_quit(void)
 
 	/* Now allow S2 to get the 'self-quit' message. */
 	swim_cluster_unblock_io(cluster, 1);
-	is(swim_cluster_wait_incarnation(cluster, 1, 1, 2, 0), 0,
-	   "S2 finally got 'quit' message from S1, but with its 'own' UUID - "\
-	   "refute it")
+	is(swim_cluster_wait_age(cluster, 1, 1, 2, 0), 0, "S2 finally got "\
+	   "'quit' message from S1, but with its 'own' UUID - refute it")
 	swim_cluster_delete(cluster);
 
 	/**
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 463c62390..e646c3a47 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -709,10 +709,10 @@ struct swim_member_template {
 	bool need_check_status;
 	enum swim_member_status status;
 	/**
-	 * True, if the incarnation should be checked to be equal
-	 * to @a incarnation.
+	 * True, if the age should be checked to be equal to a
+	 * target.
 	 */
-	bool need_check_incarnation;
+	bool need_check_age;
 	uint64_t incarnation;
 	/**
 	 * True, if the payload should be checked to be equal to
@@ -747,13 +747,13 @@ swim_member_template_set_status(struct swim_member_template *t,
 
 /**
  * Set that the member template should be used to check member
- * incarnation.
+ * age.
*/ static inline void -swim_member_template_set_incarnation(struct swim_member_template *t, - uint64_t incarnation) +swim_member_template_set_age(struct swim_member_template *t, + uint64_t incarnation) { - t->need_check_incarnation = true; + t->need_check_age = true; t->incarnation = incarnation; } @@ -793,7 +793,7 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) } if (t->need_check_status && status != t->status) return false; - if (t->need_check_incarnation && incarnation != t->incarnation) + if (t->need_check_age && incarnation != t->incarnation) return false; if (t->need_check_payload && (payload_size != t->payload_size || @@ -847,13 +847,12 @@ swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, } int -swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t incarnation, - double timeout) +swim_cluster_wait_age(struct swim_cluster *cluster, int node_id, int member_id, + uint64_t incarnation, double timeout) { struct swim_member_template t; swim_member_template_create(&t, node_id, member_id); - swim_member_template_set_incarnation(&t, incarnation); + swim_member_template_set_age(&t, incarnation); return swim_wait_timeout(timeout, cluster, swim_loop_check_member, &t); } diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index fde84e39b..309636b87 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -218,14 +218,13 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, double timeout); /** - * Wait until a member with id @a member_id is seen with @a - * incarnation in the membership table of a member with id @a - * node_id. At most @a timeout seconds. + * Wait until a member with id @a member_id is seen with a needed + * age in the membership table of a member with id @a node_id. At + * most @a timeout seconds. 
*/ int -swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t incarnation, - double timeout); +swim_cluster_wait_age(struct swim_cluster *cluster, int node_id, int member_id, + uint64_t incarnation, double timeout); /** * Wait until a member with id @a member_id is seen with -- 2.20.1 (Apple Git-117)
* [tarantool-patches] [PATCH 2/2] swim: introduce generation 2019-06-20 21:23 [tarantool-patches] [PATCH 0/2] SWIM generation Vladislav Shpilevoy 2019-06-20 21:23 ` [tarantool-patches] [PATCH 1/2] swim: encapsulate incarnation behind 'age' Vladislav Shpilevoy @ 2019-06-20 21:23 ` Vladislav Shpilevoy 2019-06-21 6:53 ` [tarantool-patches] " Konstantin Osipov 1 sibling, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-20 21:23 UTC
To: tarantool-patches; +Cc: kostja

SWIM uses incarnation to refute old information, but incarnation alone
is not enough when restarts are possible. If an instance restarts, its
incarnation is reset to 0, and after several quick local updates it
reaches some value N. Other instances, however, may already know this
instance's incarnation as N from its previous life, attached to
different information. They will never accept the new version of the
data, because the version they hold has the same incarnation and is
therefore considered up to date.

So a persistent part of incarnation is needed. This patch introduces
it and calls it 'generation'. As an additional benefit, generation
lets user-defined triggers react to instance restarts.

Closes #4280

@TarantoolBot document
Title: SWIM generation

Generation is a persistent part of incarnation that allows users to
refute old pieces of information left from previous lives of an
instance. It is a static attribute, set when a SWIM instance is
created, and it can't be changed without restarting the instance.
Generation not only helps to override old information, but can also
be used to detect restarts in user-defined triggers.

How to set generation:
```Lua
swim = require('swim')
s = swim.new({generation = <value>})
```
Generation can't be set in `swim:cfg`. If it is omitted, 0 is used by
default. But be careful: if the instance has been started before, it
is safer to use a new generation. Ideally the generation should be
persisted somewhere: in a file, in a space, or in a global service.
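To make the restart scenario above concrete, here is a minimal standalone sketch in C of the ordering this patch gives to ages: generations are compared first, and incarnations only break ties. The names `age_sketch`, `age_sketch_cmp`, `age_sketch_is_newer` and `age_sketch_demo` are hypothetical simplifications for illustration, not Tarantool API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical, simplified stand-in for struct swim_age: the
 * persistent 'generation' is the upper part, the volatile
 * 'incarnation' (reset to 0 on restart) is the lower part.
 */
struct age_sketch {
	uint64_t generation;
	uint64_t incarnation;
};

/* Lexicographic order: compare generations, then incarnations. */
static int
age_sketch_cmp(const struct age_sketch *l, const struct age_sketch *r)
{
	if (l->generation != r->generation)
		return l->generation < r->generation ? -1 : 1;
	if (l->incarnation != r->incarnation)
		return l->incarnation < r->incarnation ? -1 : 1;
	return 0;
}

/* A receiver applies gossip only if its age is strictly newer. */
static bool
age_sketch_is_newer(const struct age_sketch *known,
		    const struct age_sketch *received)
{
	return age_sketch_cmp(received, known) > 0;
}

/* The restart scenario from the commit message. */
static void
age_sketch_demo(void)
{
	/* Peers remember incarnation 3 from the previous life. */
	struct age_sketch known = {0, 3};
	/* Restarted without a new generation: incarnation is 3 again. */
	struct age_sketch same_gen = {0, 3};
	/* Restarted with generation 1: even incarnation 0 wins. */
	struct age_sketch new_gen = {1, 0};
	assert(!age_sketch_is_newer(&known, &same_gen));
	assert(age_sketch_is_newer(&known, &new_gen));
}
```

The patch's own `swim_age_cmp()` follows the same order, but checks the common "both parts equal" case first and additionally sets `SWIM_EV_NEW_GENERATION` in the event mask whenever generations differ.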
How to detect restarts:
```Lua
swim = require('swim')
s = swim.new()
s:on_member_event(function(m, e)
    if e:is_new_generation() then
        -- Process the restart here.
    end
end)
```
`is_new_generation` is a new method of the event object passed into
triggers.

How to learn the generation: use the new `swim_member:generation()`
method.

The binary protocol is updated. The Protocol Logic section now looks
like this:

+-------------------Protocol logic section--------------------+
| map {                                                       |
|     0 = SWIM_SRC_UUID: 16 byte UUID,                        |
|                                                             |
|                 AND                                         |
|                                                             |
|     2 = SWIM_FAILURE_DETECTION: map {                       |
|         0 = SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,  |
|         1 = SWIM_FD_GENERATION: uint,                       |
|         2 = SWIM_FD_INCARNATION: uint                       |
|     },                                                      |
|                                                             |
|               OR/AND                                        |
|                                                             |
|     3 = SWIM_DISSEMINATION: array [                         |
|         map {                                               |
|             0 = SWIM_MEMBER_STATUS: uint,                   |
|                 enum member_status,                         |
|             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
|             2 = SWIM_MEMBER_PORT: uint, port,               |
|             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
|             4 = SWIM_MEMBER_GENERATION: uint,               |
|             5 = SWIM_MEMBER_INCARNATION: uint,              |
|             6 = SWIM_MEMBER_PAYLOAD: bin                    |
|         },                                                  |
|         ...                                                 |
|     ],                                                      |
|                                                             |
|               OR/AND                                        |
|                                                             |
|     1 = SWIM_ANTI_ENTROPY: array [                          |
|         map {                                               |
|             0 = SWIM_MEMBER_STATUS: uint,                   |
|                 enum member_status,                         |
|             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
|             2 = SWIM_MEMBER_PORT: uint, port,               |
|             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
|             4 = SWIM_MEMBER_GENERATION: uint,               |
|             5 = SWIM_MEMBER_INCARNATION: uint,              |
|             6 = SWIM_MEMBER_PAYLOAD: bin                    |
|         },                                                  |
|         ...                                                 |
|     ],                                                      |
|                                                             |
|               OR/AND                                        |
|                                                             |
|     4 = SWIM_QUIT: map {                                    |
|         0 = SWIM_QUIT_GENERATION: uint,                     |
|         1 = SWIM_QUIT_INCARNATION: uint                     |
|     }                                                       |
| }                                                           |
+-------------------------------------------------------------+

Note: SWIM_FD_INCARNATION, SWIM_MEMBER_INCARNATION,
SWIM_MEMBER_PAYLOAD, and SWIM_QUIT_INCARNATION got new values. This is
because 1) SWIM is not released yet, so it is legal to change the
values, and 2) I wanted to emphasize that 'generation' is the
first/upper part of the member age, and 'incarnation' is the
second/lower part.
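As an illustration of how an age travels on the wire, the sketch below hand-encodes the SWIM_QUIT section from the table. It assumes the layout of `struct swim_age_bin` from this patch: each age field is a one-byte key, then the MessagePack uint 64 marker `0xcf`, then an 8-byte big-endian value. The `SKETCH_*` constants and the `sketch_encode_*` helpers are hypothetical; the patch itself fills packed structures from `swim_proto.h` instead.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Key values as listed in the protocol table above. */
enum {
	SKETCH_SWIM_QUIT = 4,
	SKETCH_SWIM_QUIT_GENERATION = 0,
	SKETCH_SWIM_QUIT_INCARNATION = 1,
	/* Section key + map header + 2 * (key + 0xcf + 8 bytes). */
	SKETCH_QUIT_SECTION_SIZE = 1 + 1 + 2 * (1 + 1 + 8),
};

/* Encode v as a MessagePack uint 64: marker 0xcf, big-endian. */
static uint8_t *
sketch_encode_u64(uint8_t *p, uint64_t v)
{
	*p++ = 0xcf;
	for (int shift = 56; shift >= 0; shift -= 8)
		*p++ = (uint8_t) (v >> shift);
	return p;
}

/*
 * Encode "4 = SWIM_QUIT: map {0 = generation, 1 = incarnation}".
 * Keys are positive fixints, so each takes one byte. Returns the
 * number of bytes written.
 */
static size_t
sketch_encode_quit(uint8_t *buf, size_t buf_size, uint64_t generation,
		   uint64_t incarnation)
{
	assert(buf_size >= SKETCH_QUIT_SECTION_SIZE);
	(void) buf_size;
	uint8_t *p = buf;
	*p++ = SKETCH_SWIM_QUIT;
	/* fixmap header with 2 entries, i.e. mp_encode_map(2). */
	*p++ = 0x80 | 2;
	*p++ = SKETCH_SWIM_QUIT_GENERATION;
	p = sketch_encode_u64(p, generation);
	*p++ = SKETCH_SWIM_QUIT_INCARNATION;
	p = sketch_encode_u64(p, incarnation);
	return (size_t) (p - buf);
}
```

Because both age parts are always written as full 8-byte integers, the section has a fixed size of 22 bytes whatever the values are, which is what allows the patch to describe it with a packed struct.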
--- extra/exports | 1 + src/lib/swim/swim.c | 46 ++++++++-- src/lib/swim/swim.h | 18 +++- src/lib/swim/swim_proto.c | 31 +++++-- src/lib/swim/swim_proto.h | 38 ++++++-- src/lua/swim.c | 3 +- src/lua/swim.lua | 43 +++++++-- test/swim/swim.result | 176 +++++++++++++++++++++++++++++------- test/swim/swim.test.lua | 38 ++++++++ test/unit/swim.c | 51 +++++++++-- test/unit/swim.result | 11 ++- test/unit/swim_test_utils.c | 39 ++++++-- test/unit/swim_test_utils.h | 7 +- 13 files changed, 414 insertions(+), 88 deletions(-) diff --git a/extra/exports b/extra/exports index b8c42c0df..a4b79099d 100644 --- a/extra/exports +++ b/extra/exports @@ -108,6 +108,7 @@ swim_iterator_close swim_member_uri swim_member_uuid swim_member_incarnation +swim_member_generation swim_member_payload swim_member_ref swim_member_unref diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index b55945a34..778d1b291 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -223,11 +223,24 @@ static inline int swim_age_cmp(const struct swim_age *l, const struct swim_age *r, enum swim_ev_mask *diff) { - if (l->incarnation == r->incarnation) { + /* + * The most likely path, checked first and foremost + * explicitly. + */ + if (l->generation == r->generation && + l->incarnation == r->incarnation) { *diff = 0; return 0; } *diff = SWIM_EV_NEW_INCARNATION; + if (l->generation < r->generation) { + *diff |= SWIM_EV_NEW_GENERATION; + return -1; + } + if (l->generation > r->generation) { + *diff |= SWIM_EV_NEW_GENERATION; + return 1; + } return l->incarnation < r->incarnation ? -1 : 1; } @@ -441,6 +454,15 @@ struct swim { * status. */ struct swim_member *self; + /** + * Generation of that instance is set when the latter is + * created. It is actual only until the instance is + * configured. After that the instance can learn a bigger + * own generation from other members. Despite meaning + * in fact a wrong usage of SWIM generations, it is still + * possible. 
+ */ + uint64_t initial_generation; /** * Scheduler of output requests, receiver of incoming * ones. @@ -1663,8 +1685,8 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, uint32_t size; if (swim_decode_map(pos, end, &size, prefix, "root") != 0) return -1; - if (size != 1) { - diag_set(SwimError, "%s map of size 1 is expected", prefix); + if (size != 2) { + diag_set(SwimError, "%s map of size 2 is expected", prefix); return -1; } struct swim_age age; @@ -1673,6 +1695,11 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0) return -1; switch (key) { + case SWIM_QUIT_GENERATION: + if (swim_decode_uint(pos, end, &age.generation, prefix, + "generation") != 0) + return -1; + break; case SWIM_QUIT_INCARNATION: if (swim_decode_uint(pos, end, &age.incarnation, prefix, "incarnation") != 0) @@ -1811,13 +1838,14 @@ swim_event_handler_f(va_list va) struct swim * -swim_new(void) +swim_new(uint64_t generation) { struct swim *swim = (struct swim *) calloc(1, sizeof(*swim)); if (swim == NULL) { diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim"); return NULL; } + swim->initial_generation = generation; swim->members = mh_swim_table_new(); if (swim->members == NULL) { free(swim); @@ -1924,7 +1952,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } struct swim_age age; - swim_age_create(&age, 0); + swim_age_create(&age, swim->initial_generation, 0); new_self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &age, NULL, 0); if (new_self == NULL) @@ -2042,7 +2070,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { struct swim_age age; - swim_age_create(&age, 0); + swim_age_create(&age, 0, 0); member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &age, NULL, -1); return member == NULL ? 
-1 : 0; @@ -2301,6 +2329,12 @@ swim_member_incarnation(const struct swim_member *member) return member->age.incarnation; } +uint64_t +swim_member_generation(const struct swim_member *member) +{ + return member->age.generation; +} + const char * swim_member_payload(const struct swim_member *member, int *size) { diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index a42ace7c6..e5e6ec658 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -66,9 +66,16 @@ enum swim_gc_mode { * Create a new SWIM instance. Do not bind to a port or set any * parameters. Allocation and initialization only. The function * yields. + * @param generation A user-defined upper part of the instance + * age. Age consists of volatile incarnation and this + * number. It is assumed that user persists generation + * value, after restart increments it, and therefore other + * instances can detect the restart. At the same time that + * value is used to refute old attributes left from + * previous lifes of that instance. */ struct swim * -swim_new(void); +swim_new(uint64_t generation); /** Check if a swim instance is configured. */ bool @@ -237,6 +244,10 @@ swim_member_uuid(const struct swim_member *member); uint64_t swim_member_incarnation(const struct swim_member *member); +/** Member's generation. */ +uint64_t +swim_member_generation(const struct swim_member *member); + /** Member's payload. */ const char * swim_member_payload(const struct swim_member *member, int *size); @@ -281,9 +292,10 @@ enum swim_ev_mask { SWIM_EV_NEW_URI = 0b00000100, SWIM_EV_NEW_INCARNATION = 0b00001000, SWIM_EV_NEW_PAYLOAD = 0b00010000, + SWIM_EV_NEW_GENERATION = 0b00100000, /* Shortcut to check for any update. */ - SWIM_EV_UPDATE = 0b00011110, - SWIM_EV_DROP = 0b00100000, + SWIM_EV_UPDATE = 0b00111110, + SWIM_EV_DROP = 0b01000000, }; /** On member event trigger context. 
*/ diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index c42d67c0a..610c41068 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -174,8 +174,11 @@ swim_check_inaddr_not_empty(const struct sockaddr_in *addr, const char *prefix, * the age fields should be marked. */ static inline void -swim_age_bin_create(struct swim_age_bin *bin, uint8_t incarnation_key) +swim_age_bin_create(struct swim_age_bin *bin, uint8_t generation_key, + uint8_t incarnation_key) { + bin->k_generation = generation_key; + bin->m_generation = 0xcf; bin->k_incarnation = incarnation_key; bin->m_incarnation = 0xcf; } @@ -184,6 +187,7 @@ swim_age_bin_create(struct swim_age_bin *bin, uint8_t incarnation_key) static inline void swim_age_bin_fill(struct swim_age_bin *bin, const struct swim_age *age) { + bin->v_generation = mp_bswap_u64(age->generation); bin->v_incarnation = mp_bswap_u64(age->incarnation); } @@ -265,6 +269,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member uuid") != 0) return -1; break; + case SWIM_MEMBER_GENERATION: + if (swim_decode_uint(pos, end, &def->age.generation, prefix, + "member generation") != 0) + return -1; + break; case SWIM_MEMBER_INCARNATION: if (swim_decode_uint(pos, end, &def->age.incarnation, prefix, "member incarnation") != 0) @@ -335,7 +344,8 @@ swim_fd_header_bin_create(struct swim_fd_header_bin *header, header->k_type = SWIM_FD_MSG_TYPE; header->v_type = type; - swim_age_bin_create(&header->age, SWIM_FD_INCARNATION); + swim_age_bin_create(&header->age, SWIM_FD_GENERATION, + SWIM_FD_INCARNATION); swim_age_bin_fill(&header->age, age); } @@ -349,9 +359,9 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, return -1; memset(def, 0, sizeof(*def)); def->type = swim_fd_msg_type_MAX; - if (size != 2) { - diag_set(SwimError, "%s root map should have two keys - "\ - "message type and incarnation", prefix); + if (size != 3) { + diag_set(SwimError, "%s root map should have 3 keys - 
"\ + "message type, generation, incarnation", prefix); return -1; } for (int i = 0; i < (int) size; ++i) { @@ -370,6 +380,11 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, } def->type = key; break; + case SWIM_FD_GENERATION: + if (swim_decode_uint(pos, end, &def->age.generation, + prefix, "generation") != 0) + return -1; + break; case SWIM_FD_INCARNATION: if (swim_decode_uint(pos, end, &def->age.incarnation, prefix, "incarnation") != 0) @@ -419,7 +434,8 @@ swim_passport_bin_create(struct swim_passport_bin *passport) passport->k_uuid = SWIM_MEMBER_UUID; passport->m_uuid = 0xc4; passport->m_uuid_len = UUID_LEN; - swim_age_bin_create(&passport->age, SWIM_MEMBER_INCARNATION); + swim_age_bin_create(&passport->age, SWIM_MEMBER_GENERATION, + SWIM_MEMBER_INCARNATION); } void @@ -579,7 +595,8 @@ swim_quit_bin_create(struct swim_quit_bin *header, const struct swim_age *age) header->k_quit = SWIM_QUIT; assert(mp_sizeof_map(SWIM_AGE_BIN_SIZE) == 1); header->m_quit = 0x80 | SWIM_AGE_BIN_SIZE; - swim_age_bin_create(&header->age, SWIM_QUIT_INCARNATION); + swim_age_bin_create(&header->age, SWIM_QUIT_GENERATION, + SWIM_QUIT_INCARNATION); swim_age_bin_fill(&header->age, age); } diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index acd266db4..c65b29696 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -72,6 +72,7 @@ enum { * | | * | SWIM_FAILURE_DETECTION: { | * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, | + * | SWIM_FD_GENERATION: uint, | * | SWIM_FD_INCARNATION: uint | * | }, | * | | @@ -83,6 +84,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_GENERATION: uint, | * | SWIM_MEMBER_INCARNATION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | @@ -97,6 +99,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_GENERATION: uint, | * | 
SWIM_MEMBER_INCARNATION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | @@ -106,6 +109,7 @@ enum { * | OR/AND | * | | * | SWIM_QUIT: { | + * | SWIM_QUIT_GENERATION: uint, | * | SWIM_QUIT_INCARNATION: uint | * | } | * | } | @@ -117,6 +121,12 @@ enum { * to reject/rewrite old data. */ struct swim_age { + /** + * Generation is a persistent part of age. Ages are + * compared firstly by this value, and only after by + * incarnation. + */ + uint64_t generation; /** * Incarnation is a volatile fully automatic part, which * is used to refute incorrect and rewrite old information @@ -127,8 +137,9 @@ struct swim_age { /** Initialize age. */ static inline void -swim_age_create(struct swim_age *age, uint64_t incarnation) +swim_age_create(struct swim_age *age, uint64_t generation, uint64_t incarnation) { + age->generation = generation; age->incarnation = incarnation; } @@ -138,7 +149,7 @@ enum { * storing an age should use this size so as to correctly * encode MessagePack map header. */ - SWIM_AGE_BIN_SIZE = 1, + SWIM_AGE_BIN_SIZE = 2, }; /** @@ -146,6 +157,12 @@ enum { * map. */ struct PACKED swim_age_bin { + /** mp_encode_uint(generation key) */ + uint8_t k_generation; + /** mp_encode_uint(64bit generation) */ + uint8_t m_generation; + uint64_t v_generation; + /** mp_encode_uint(incarnation key) */ uint8_t k_incarnation; /** mp_encode_uint(64bit incarnation) */ @@ -229,6 +246,7 @@ enum swim_fd_key { * considered dead, but a newer ping/ack was received from * it. 
*/ + SWIM_FD_GENERATION, SWIM_FD_INCARNATION, }; @@ -245,7 +263,7 @@ extern const char *swim_fd_msg_type_strs[]; struct PACKED swim_fd_header_bin { /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ uint8_t k_header; - /** mp_encode_map(2) */ + /** mp_encode_map(3) */ uint8_t m_header; /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ @@ -253,7 +271,7 @@ struct PACKED swim_fd_header_bin { /** mp_encode_uint(enum swim_fd_msg_type) */ uint8_t v_type; - /** SWIM_FD_INCARNATION */ + /** SWIM_FD_GENERATION, SWIM_FD_INCARNATION */ struct swim_age_bin age; }; @@ -329,6 +347,7 @@ enum swim_member_key { SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, + SWIM_MEMBER_GENERATION, SWIM_MEMBER_INCARNATION, SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, @@ -360,7 +379,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, * date and TTD is > 0. */ struct PACKED swim_passport_bin { - /** mp_encode_map(5 or 6) */ + /** mp_encode_map(6 or 7) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -378,7 +397,7 @@ struct PACKED swim_passport_bin { uint8_t m_uuid_len; uint8_t v_uuid[UUID_LEN]; - /** SWIM_MEMBER_INCARNATION */ + /** SWIM_MEMBER_GENERATION, SWIM_MEMBER_INCARNATION */ struct swim_age_bin age; }; @@ -598,17 +617,18 @@ swim_route_bin_create(struct swim_route_bin *route, enum swim_quit_key { /** Age to ignore old quit messages. */ - SWIM_QUIT_INCARNATION = 0, + SWIM_QUIT_GENERATION = 0, + SWIM_QUIT_INCARNATION }; /** Quit section. Describes voluntary quit from the cluster. 
*/ struct PACKED swim_quit_bin { /** mp_encode_uint(SWIM_QUIT) */ uint8_t k_quit; - /** mp_encode_map(1) */ + /** mp_encode_map(2) */ uint8_t m_quit; - /** SWIM_QUIT_INCARNATION */ + /** SWIM_QUIT_GENERATION, SWIM_QUIT_INCARNATION */ struct swim_age_bin age; }; diff --git a/src/lua/swim.c b/src/lua/swim.c index c3a0a9911..26646f41f 100644 --- a/src/lua/swim.c +++ b/src/lua/swim.c @@ -67,7 +67,8 @@ lua_swim_on_member_event(struct lua_State *L) static int lua_swim_new(struct lua_State *L) { - struct swim *s = swim_new(); + uint64_t generation = luaL_checkuint64(L, 1); + struct swim *s = swim_new(generation); *(struct swim **) luaL_pushcdata(L, ctid_swim_ptr) = s; if (s != NULL) return 1; diff --git a/src/lua/swim.lua b/src/lua/swim.lua index e411a397d..c6e843dbd 100644 --- a/src/lua/swim.lua +++ b/src/lua/swim.lua @@ -30,8 +30,9 @@ ffi.cdef[[ SWIM_EV_NEW_URI = 0b00000100, SWIM_EV_NEW_INCARNATION = 0b00001000, SWIM_EV_NEW_PAYLOAD = 0b00010000, - SWIM_EV_UPDATE = 0b00011110, - SWIM_EV_DROP = 0b00100000, + SWIM_EV_NEW_GENERATION = 0b00100000, + SWIM_EV_UPDATE = 0b00111110, + SWIM_EV_DROP = 0b01000000, }; bool @@ -95,6 +96,9 @@ ffi.cdef[[ uint64_t swim_member_incarnation(const struct swim_member *member); + uint64_t + swim_member_generation(const struct swim_member *member); + const char * swim_member_payload(const struct swim_member *member, int *size); @@ -315,6 +319,11 @@ local function swim_member_incarnation(m) return capi.swim_member_incarnation(ptr) end +local function swim_member_generation(m) + local ptr = swim_check_member(m, 'member:generation()') + return capi.swim_member_generation(ptr) +end + local function swim_member_is_dropped(m) local ptr = swim_check_member(m, 'member:is_dropped()') return capi.swim_member_is_dropped(ptr) @@ -367,9 +376,10 @@ local function swim_member_payload(m) -- Both the age and the flag are needed. Age is not enough, -- because a new age can be disseminated earlier than a new -- payload. For example, via ACK messages. 
- local key1 = capi.swim_member_incarnation(ptr) - local key2 = capi.swim_member_is_payload_up_to_date(ptr) - if key1 == m.p_key1 and key2 == m.p_key2 then + local key1 = capi.swim_member_generation(ptr) + local key2 = capi.swim_member_incarnation(ptr) + local key3 = capi.swim_member_is_payload_up_to_date(ptr) + if key1 == m.p_key1 and key2 == m.p_key2 and key3 == m.p_key3 then return m.p end local cdata, size = swim_member_payload_raw(ptr) @@ -386,6 +396,7 @@ local function swim_member_payload(m) rawset(m, 'p', result) rawset(m, 'p_key1', key1) rawset(m, 'p_key2', key2) + rawset(m, 'p_key3', key3) return result end @@ -412,6 +423,7 @@ local function swim_member_serialize(m) status = swim_member_status(m), uuid = swim_member_uuid(m), uri = swim_member_uri(m), + generation = swim_member_generation(m), incarnation = swim_member_incarnation(m), -- There are many ways to interpret a payload, and it is -- not a job of a serialization method. Only binary size @@ -427,6 +439,7 @@ local swim_member_mt = { uuid = swim_member_uuid, uri = swim_member_uri, incarnation = swim_member_incarnation, + generation = swim_member_generation, payload_cdata = swim_member_payload_cdata, payload_str = swim_member_payload_str, payload = swim_member_payload, @@ -722,6 +735,9 @@ local swim_member_event_index = { is_new_uri = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0 end, + is_new_generation = function(self) + return bit.band(self[1], capi.SWIM_EV_NEW_GENERATION) ~= 0 + end, is_new_incarnation = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0 end, @@ -910,10 +926,23 @@ local cache_table_mt = { __mode = 'v' } -- -- Create a new SWIM instance, and configure if @a cfg is --- provided. +-- provided. @a cfg can contain one non-dynamic parameter - +-- generation. It can't be changed later with swim:cfg(). 
-- local function swim_new(cfg) - local ptr = internal.swim_new() + local generation = 0 + if cfg and type(cfg) == 'table' and cfg.generation ~= nil then + generation = cfg.generation + if type(generation) ~= 'number' or generation < 0 or + generation ~= math.floor(generation) then + return error('swim.new: expected non-negative integer generation') + end + cfg = table.copy(cfg) + -- swim:cfg() should not see that parameter. It takes only + -- dynamic ones. + cfg.generation = nil + end + local ptr = internal.swim_new(generation) if ptr == nil then return nil, box.error.last() end diff --git a/test/swim/swim.result b/test/swim/swim.result index cceee2595..27acb0983 100644 --- a/test/swim/swim.result +++ b/test/swim/swim.result @@ -305,11 +305,12 @@ s1:self():uuid() ... old_self --- -- uri: 127.0.0.1:<port> +- generation: 0 + payload_size: 0 status: left incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1:<port> ... -- Can't remove self. s1:remove_member(uuid(3)) @@ -389,11 +390,12 @@ s = s1:self() ... s --- -- uri: 127.0.0.1:<port> +- generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1:<port> ... s:status() --- @@ -488,11 +490,12 @@ s1:member_by_uuid(uuid(2)) -- UUID can be cdata. s1:member_by_uuid(s:uuid()) --- -- uri: 127.0.0.1:<port> +- generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1:<port> ... s1:quit() --- @@ -776,11 +779,12 @@ s.pairs() iterate() --- - - - 00000000-0000-1000-8000-000000000001 - - uri: 127.0.0.1:<port> + - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1:<port> ... 
s:add_member({uuid = uuid(2), uri = uri()}) --- @@ -789,17 +793,19 @@ s:add_member({uuid = uuid(2), uri = uri()}) iterate() --- - - - 00000000-0000-1000-8000-000000000002 - - uri: 127.0.0.1:<port> + - generation: 0 + payload_size: 0 status: alive incarnation: 0 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1:<port> - - 00000000-0000-1000-8000-000000000001 - - uri: 127.0.0.1:<port> + - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1:<port> ... s:add_member({uuid = uuid(3), uri = uri()}) --- @@ -808,23 +814,26 @@ s:add_member({uuid = uuid(3), uri = uri()}) iterate() --- - - - 00000000-0000-1000-8000-000000000001 - - uri: 127.0.0.1:<port> + - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1:<port> - - 00000000-0000-1000-8000-000000000003 - - uri: 127.0.0.1:<port> + - generation: 0 + payload_size: 0 status: alive incarnation: 0 uuid: 00000000-0000-1000-8000-000000000003 - payload_size: 0 + uri: 127.0.0.1:<port> - - 00000000-0000-1000-8000-000000000002 - - uri: 127.0.0.1:<port> + - generation: 0 + payload_size: 0 status: alive incarnation: 0 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1:<port> ... s:delete() --- @@ -962,11 +971,12 @@ s2 = s:member_by_uuid(uuid(2)) ... s2 --- -- uri: 127.0.0.1:<port> +- generation: 0 + payload_size: 0 status: alive incarnation: 0 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1:<port> ... -- Next lookups return the same member table. s2_old_uri = s2:uri() @@ -1208,11 +1218,12 @@ while s1:member_by_uuid(s2:self():uuid()) == nil do fiber.sleep(0.01) end ... s2:member_by_uuid(s1:self():uuid()) --- -- uri: 127.0.0.1:<port> +- generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1:<port> ... 
s1:delete() --- @@ -1285,11 +1296,12 @@ while #m_list < 1 do fiber.sleep(0) end ... m_list --- -- - uri: 127.0.0.1:<port> +- - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1:<port> ... e_list --- @@ -1326,11 +1338,12 @@ while s1:size() ~= 2 do fiber.sleep(0.01) end -- sleeps. m_list --- -- - uri: 127.0.0.1:<port> +- - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1:<port> ... e_list --- @@ -1368,21 +1381,24 @@ while #m_list ~= 3 do fiber.sleep(0.01) end ... m_list --- -- - uri: 127.0.0.1:<port> +- - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 - - uri: 127.0.0.1:<port> + uri: 127.0.0.1:<port> + - generation: 0 + payload_size: 8 status: alive incarnation: 2 uuid: 00000000-0000-1000-8000-000000000001 + uri: 127.0.0.1:<port> + - generation: 0 payload_size: 8 - - uri: 127.0.0.1:<port> status: alive incarnation: 2 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 8 + uri: 127.0.0.1:<port> ... e_list --- @@ -1425,16 +1441,18 @@ fiber.sleep(0) -- Two events - status update to 'left', and 'drop'. m_list --- -- - uri: 127.0.0.1:<port> +- - generation: 0 + payload_size: 0 status: left incarnation: 1 uuid: 00000000-0000-1000-8000-000000000002 + uri: 127.0.0.1:<port> + - generation: 0 payload_size: 0 - - uri: 127.0.0.1:<port> status: left incarnation: 1 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1:<port> ... e_list --- @@ -1484,6 +1502,96 @@ ctx_list s1:delete() --- ... +-- +-- gh-4280: 'generation' counter to detect restarts and refute +-- information left from previous lifes of a SWIM instance. +-- +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0}) +--- +... +s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01}) +--- +... 
+s2:add_member({uuid = uuid(1), uri = s1:self():uri()}) +--- +- true +... +s1_view = s2:member_by_uuid(uuid(1)) +--- +... +s1:set_payload('payload 1') +--- +- true +... +while not s1_view:payload() do fiber.sleep(0.1) end +--- +... +s1_view:payload() +--- +- payload 1 +... +s1:self():incarnation() +--- +- 2 +... +s1:self():generation() +--- +- 0 +... +-- Now S2 knows S1's payload as 'payload 1'. +is_new_generation = false +--- +... +_ = s2:on_member_event(function(m, e) is_new_generation = is_new_generation or e:is_new_generation() end) +--- +... +s1:delete() +--- +... +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1}) +--- +... +s1:add_member({uuid = uuid(2), uri = s2:self():uri()}) +--- +- true +... +s1:set_payload('payload 2') +--- +- true +... +s1:self():incarnation() +--- +- 2 +... +-- Without the new generation S2 would believe that S1's payload +-- is still 'payload 1'. Because 'payload 1' and 'payload 2' were +-- disseminated with the same incarnation. +s1:self():generation() +--- +- 1 +... +while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end +--- +... +s1_view:payload() +--- +- payload 2 +... +is_new_generation +--- +- true +... +-- Generation is static parameter. +s1:cfg({generation = 5}) +--- +- error: 'swim:cfg: unknown option generation' +... +s1:delete() +--- +... +s2:delete() +--- +... test_run:cmd("clear filter") --- - true diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua index 576219b4d..886c00af5 100644 --- a/test/swim/swim.test.lua +++ b/test/swim/swim.test.lua @@ -499,4 +499,42 @@ ctx_list s1:delete() +-- +-- gh-4280: 'generation' counter to detect restarts and refute +-- information left from previous lifes of a SWIM instance. 
+-- +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0}) +s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01}) +s2:add_member({uuid = uuid(1), uri = s1:self():uri()}) +s1_view = s2:member_by_uuid(uuid(1)) +s1:set_payload('payload 1') +while not s1_view:payload() do fiber.sleep(0.1) end +s1_view:payload() +s1:self():incarnation() +s1:self():generation() + +-- Now S2 knows S1's payload as 'payload 1'. + +is_new_generation = false +_ = s2:on_member_event(function(m, e) is_new_generation = is_new_generation or e:is_new_generation() end) +s1:delete() +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1}) +s1:add_member({uuid = uuid(2), uri = s2:self():uri()}) +s1:set_payload('payload 2') +s1:self():incarnation() +-- Without the new generation S2 would believe that S1's payload +-- is still 'payload 1'. Because 'payload 1' and 'payload 2' were +-- disseminated with the same incarnation. +s1:self():generation() + +while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end +s1_view:payload() +is_new_generation + +-- Generation is static parameter. 
+s1:cfg({generation = 5}) + +s1:delete() +s2:delete() + test_run:cmd("clear filter") diff --git a/test/unit/swim.c b/test/unit/swim.c index bffc0985d..4371b56f0 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -133,7 +133,7 @@ swim_test_cfg(void) { swim_start_test(16); - struct swim *s = swim_new(); + struct swim *s = swim_new(0); assert(s != NULL); is(swim_cfg(s, NULL, -1, -1, -1, NULL), -1, "first cfg failed - no URI"); ok(swim_error_check_match("mandatory"), "diag says 'mandatory'"); @@ -149,7 +149,7 @@ swim_test_cfg(void) is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\ "URI"); - struct swim *s2 = swim_new(); + struct swim *s2 = swim_new(0); assert(s2 != NULL); const char *bad_uri1 = "127.1.1.1.1.1.1:1"; const char *bad_uri2 = "google.com:1"; @@ -391,16 +391,17 @@ swim_test_refute(void) fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_SUSPECTED, 4) != 0); swim_cluster_set_drop(cluster, 1, 0); - is(swim_cluster_wait_age(cluster, 1, 1, 1, 1), 0, + is(swim_cluster_wait_age(cluster, 1, 1, 0, 1, 1), 0, "S2 increments its own incarnation to refute its suspicion"); - is(swim_cluster_wait_age(cluster, 0, 1, 1, 1), 0, + is(swim_cluster_wait_age(cluster, 0, 1, 0, 1, 1), 0, "new incarnation has reached S1 with a next round message"); + fail_if(swim_cluster_member_generation(cluster, 1, 1) != 0); swim_cluster_restart_node(cluster, 1); is(swim_cluster_member_incarnation(cluster, 1, 1), 0, "after restart S2's incarnation is 0 again"); - is(swim_cluster_wait_age(cluster, 1, 1, 1, 1), 0, - "S2 learned its old bigger incarnation 1 from S0"); + is(swim_cluster_member_generation(cluster, 1, 1), 1, + "but generation is new"); swim_cluster_delete(cluster); swim_finish_test(); @@ -527,7 +528,7 @@ swim_test_quit(void) * old LEFT status. 
*/ swim_cluster_restart_node(cluster, 0); - is(swim_cluster_wait_age(cluster, 0, 0, 1, 2), 0, + is(swim_cluster_wait_age(cluster, 0, 0, 1, 0, 2), 0, "quited member S1 has returned and refuted the old status"); fail_if(swim_cluster_wait_fullmesh(cluster, 2) != 0); /* @@ -557,7 +558,7 @@ swim_test_quit(void) /* Now allow S2 to get the 'self-quit' message. */ swim_cluster_unblock_io(cluster, 1); - is(swim_cluster_wait_age(cluster, 1, 1, 2, 0), 0, "S2 finally got "\ + is(swim_cluster_wait_age(cluster, 1, 1, 1, 1, 0), 0, "S2 finally got "\ "'quit' message from S1, but with its 'own' UUID - refute it") swim_cluster_delete(cluster); @@ -1089,7 +1090,7 @@ swim_test_triggers(void) swim_member_unref(tctx.ctx.member); /* Check that recfg fires incarnation update trigger. */ - s1 = swim_new(); + s1 = swim_new(0); struct tt_uuid uuid = uuid_nil; uuid.time_low = 1; fail_if(swim_cfg(s1, "127.0.0.1:1", -1, -1, -1, &uuid) != 0); @@ -1110,10 +1111,39 @@ swim_test_triggers(void) swim_finish_test(); } +static void +swim_test_generation(void) +{ + swim_start_test(3); + + struct swim_cluster *cluster = swim_cluster_new(2); + swim_cluster_interconnect(cluster, 0, 1); + + const char *p1 = "payload 1"; + int p1_size = strlen(p1); + swim_cluster_member_set_payload(cluster, 0, p1, p1_size); + is(swim_cluster_wait_payload_everywhere(cluster, 0, p1, p1_size, 1), 0, + "S1 disseminated its payload to S2"); + + swim_cluster_restart_node(cluster, 0); + const char *p2 = "payload 2"; + int p2_size = strlen(p2); + swim_cluster_member_set_payload(cluster, 0, p2, p2_size); + is(swim_cluster_wait_payload_everywhere(cluster, 0, p2, p2_size, 2), 0, + "S1 restarted and set another payload. 
Without generation it could "\ + "lead to never disseminated new payload."); + is(swim_cluster_member_generation(cluster, 1, 0), 1, + "S2 sees new generation of S1"); + + swim_cluster_delete(cluster); + + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(21); + swim_start_test(22); (void) ap; swim_test_ev_init(); @@ -1140,6 +1170,7 @@ main_f(va_list ap) swim_test_encryption(); swim_test_slow_net(); swim_test_triggers(); + swim_test_generation(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index 2968a2da7..bad3c30d0 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..21 +1..22 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -89,7 +89,7 @@ ok 7 - subtests ok 1 - S2 increments its own incarnation to refute its suspicion ok 2 - new incarnation has reached S1 with a next round message ok 3 - after restart S2's incarnation is 0 again - ok 4 - S2 learned its old bigger incarnation 1 from S0 + ok 4 - but generation is new ok 8 - subtests *** swim_test_refute: done *** *** swim_test_basic_gossip *** @@ -227,4 +227,11 @@ ok 20 - subtests ok 22 - local URI update warns about incarnation update ok 21 - subtests *** swim_test_triggers: done *** + *** swim_test_generation *** + 1..3 + ok 1 - S1 disseminated its payload to S2 + ok 2 - S1 restarted and set another payload. Without generation it could lead to never disseminated new payload. + ok 3 - S2 sees new generation of S1 +ok 22 - subtests + *** swim_test_generation: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index e646c3a47..228b2f752 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -157,6 +157,11 @@ struct swim_node { * that instance. */ struct tt_uuid uuid; + /** + * Generation counter. Persisted for restarts, when SWIM + * is explicitly deleted before restart. 
+ */ + uint64_t generation; /** * Filter to drop packets with a certain probability * from/to a specified direction. @@ -212,7 +217,8 @@ swim_test_event_cb(struct trigger *trigger, void *event) static inline void swim_node_create(struct swim_node *n, int id) { - n->swim = swim_new(); + n->generation = 0; + n->swim = swim_new(n->generation); assert(n->swim != NULL); struct trigger *t = (struct trigger *) malloc(sizeof(*t)); trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free); @@ -259,7 +265,8 @@ swim_cluster_new(int size) void swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout) { - swim_cluster_set_cfg(cluster, swim_cfg, NULL, -1, ack_timeout, -1, NULL); + swim_cluster_set_cfg(cluster, swim_cfg, NULL, -1, ack_timeout, -1, + NULL); cluster->ack_timeout = ack_timeout; } @@ -359,6 +366,17 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, return swim_member_incarnation(m); } +uint64_t +swim_cluster_member_generation(struct swim_cluster *cluster, int node_id, + int member_id) +{ + const struct swim_member *m = + swim_cluster_member_view(cluster, node_id, member_id); + if (m == NULL) + return UINT64_MAX; + return swim_member_generation(m); +} + const char * swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, int member_id, int *size) @@ -402,7 +420,7 @@ swim_cluster_restart_node(struct swim_cluster *cluster, int i) &n->uuid)); swim_delete(s); } - s = swim_new(); + s = swim_new(++n->generation); assert(s != NULL); int rc = swim_cfg(s, uri, -1, cluster->ack_timeout, cluster->gc_mode, &n->uuid); @@ -713,6 +731,7 @@ struct swim_member_template { * target. 
*/ bool need_check_age; + uint64_t generation; uint64_t incarnation; /** * True, if the payload should be checked to be equal to @@ -751,9 +770,10 @@ swim_member_template_set_status(struct swim_member_template *t, */ static inline void swim_member_template_set_age(struct swim_member_template *t, - uint64_t incarnation) + uint64_t generation, uint64_t incarnation) { t->need_check_age = true; + t->generation = generation; t->incarnation = incarnation; } @@ -778,22 +798,25 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) const struct swim_member *m = swim_cluster_member_view(cluster, t->node_id, t->member_id); enum swim_member_status status; - uint64_t incarnation; + uint64_t incarnation, generation; const char *payload; int payload_size; if (m != NULL) { status = swim_member_status(m); + generation = swim_member_generation(m); incarnation = swim_member_incarnation(m); payload = swim_member_payload(m, &payload_size); } else { status = swim_member_status_MAX; + generation = 0; incarnation = 0; payload = NULL; payload_size = 0; } if (t->need_check_status && status != t->status) return false; - if (t->need_check_age && incarnation != t->incarnation) + if (t->need_check_age && (generation != t->generation || + incarnation != t->incarnation)) return false; if (t->need_check_payload && (payload_size != t->payload_size || @@ -848,11 +871,11 @@ swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, int swim_cluster_wait_age(struct swim_cluster *cluster, int node_id, int member_id, - uint64_t incarnation, double timeout) + uint64_t generation, uint64_t incarnation, double timeout) { struct swim_member_template t; swim_member_template_create(&t, node_id, member_id); - swim_member_template_set_age(&t, incarnation); + swim_member_template_set_age(&t, generation, incarnation); return swim_wait_timeout(timeout, cluster, swim_loop_check_member, &t); } diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 309636b87..11adc017f 
100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -160,6 +160,10 @@ uint64_t swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, int member_id); +uint64_t +swim_cluster_member_generation(struct swim_cluster *cluster, int node_id, + int member_id); + const char * swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, int member_id, int *size); @@ -224,7 +228,8 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, */ int swim_cluster_wait_age(struct swim_cluster *cluster, int node_id, int member_id, - uint64_t incarnation, double timeout); + uint64_t generation, uint64_t incarnation, + double timeout); /** * Wait until a member with id @a member_id is seen with -- 2.20.1 (Apple Git-117)
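The updated helpers (swim_cluster_wait_age(), swim_loop_check_member()) now treat a member's age as a (generation, incarnation) pair. Below is a minimal standalone sketch of the ordering that swim_age_cmp() in this series implements: generation is compared first, incarnation only breaks ties, and a generation change also reports the incarnation bit. The names sketch_age, sketch_age_cmp() and SKETCH_EV_* are hypothetical stand-ins, not Tarantool API; only the bit values are copied from SWIM_EV_NEW_INCARNATION and SWIM_EV_NEW_GENERATION in swim.h.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical event bits; values copied from SWIM_EV_NEW_* in swim.h. */
enum {
	SKETCH_EV_NEW_INCARNATION = 0x08, /* 0b00001000 */
	SKETCH_EV_NEW_GENERATION = 0x20,  /* 0b00100000 */
};

/* Hypothetical stand-in for struct swim_age. */
struct sketch_age {
	/* Persistent part: the user bumps it before each restart. */
	uint64_t generation;
	/* Volatile part: starts from 0 after every restart. */
	uint64_t incarnation;
};

/*
 * Compare two ages lexicographically: generation first, then
 * incarnation. Returns <0, 0 or >0 and stores in *diff which
 * parts differ. As in swim_age_cmp(), any difference sets the
 * incarnation bit, and a generation difference sets both bits.
 */
static int
sketch_age_cmp(const struct sketch_age *l, const struct sketch_age *r,
	       unsigned *diff)
{
	assert(l != NULL && r != NULL && diff != NULL);
	if (l->generation == r->generation &&
	    l->incarnation == r->incarnation) {
		*diff = 0;
		return 0;
	}
	*diff = SKETCH_EV_NEW_INCARNATION;
	if (l->generation != r->generation) {
		*diff |= SKETCH_EV_NEW_GENERATION;
		return l->generation < r->generation ? -1 : 1;
	}
	return l->incarnation < r->incarnation ? -1 : 1;
}
```

For example, comparing {0, 5} (what a peer remembers from the previous life) with {1, 0} (the restarted instance) returns -1 and sets both bits. The restarted instance therefore wins even though its incarnation was reset, which is exactly the case swim_test_generation checks.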
* [tarantool-patches] Re: [PATCH 2/2] swim: introduce generation
  2019-06-20 21:23 ` [tarantool-patches] [PATCH 2/2] swim: introduce generation Vladislav Shpilevoy
@ 2019-06-21 6:53   ` Konstantin Osipov
  2019-06-21 19:03     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 9+ messages in thread
From: Konstantin Osipov @ 2019-06-21 6:53 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/21 00:25]:

Generation and incarnation look and sound similar, so it is hard
to tell which is which.

I'd like to suggest renaming 'generation' to 'epoch'.

"Age" would be a wrong name for a pair "epoch" + "incarnation".
Besides, swim's term for the counter is "incarnation", so I would
stick to the swim terminology.

I suggest this:

struct swim_incarnation {
	int64_t term; // Derived from RAFT word "term", persisted between restarts
	int64_t version; // Reflects swim member state version
};

I am open to alternatives too.

> SWIM uses incarnation to refute old information, but it is not
> enough when restarts are possible. If an instance restarts, its
> incarnation is reset to 0. After several quick local updates it
> reaches N. But other instances may still know the incarnation of
> this instance as N from its previous life, with different
> information attached. They will never accept the new version of
> the data, because their current version is also considered up to
> date.
>
> As a result, incarnation alone is not enough: a persistent part
> of incarnation is needed. This patch introduces it and calls it
> 'generation'. As an additional benefit, generation allows
> user-defined triggers to react to instance restarts.
>
> Closes #4280
>
> @TarantoolBot document
> Title: SWIM generation
>
> Generation is a persistent part of incarnation allowing users to
> refute old pieces of information left from previous lives of an
> instance.
> It is a static attribute set when a SWIM instance is created, and
> can't be changed without restarting the instance.
>
> Generation not only helps with overriding old information, but
> can also be used to detect restarts in user-defined triggers.
>
> How to set generation:
> ```Lua
> swim = require('swim')
> s = swim.new({generation = <value>})
> ```
> Generation can't be set in `swim:cfg`. If it is omitted, 0 is
> used by default. But be careful: if the instance is not being
> started for the first time, it is safer to use a new generation.
> Ideally it should be persisted somehow: in a file, in a space, in
> a global service.
>
> How to detect restarts:
> ```Lua
> swim = require('swim')
> s = swim.new()
> s:on_member_event(function(m, e)
>     if e:is_new_generation() then
>         ... -- Process restart.
>     end
> end)
> ```
>
> `is_new_generation` is a new method of the event object passed
> into triggers.
>
> To learn a member's generation, use the new
> `swim_member:generation()` method.
>
> The binary protocol is updated. The Protocol Logic section now
> looks like this:
>
> +-------------------Protocol logic section--------------------+
> | map {                                                       |
> |     0 = SWIM_SRC_UUID: 16 byte UUID,                        |
> |                                                             |
> |     AND                                                     |
> |                                                             |
> |     2 = SWIM_FAILURE_DETECTION: map {                       |
> |         0 = SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,  |
> |         1 = SWIM_FD_GENERATION: uint,                       |
> |         2 = SWIM_FD_INCARNATION: uint                       |
> |     },                                                      |
> |                                                             |
> |     OR/AND                                                  |
> |                                                             |
> |     3 = SWIM_DISSEMINATION: array [                         |
> |         map {                                               |
> |             0 = SWIM_MEMBER_STATUS: uint,                   |
> |                                     enum member_status,     |
> |             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
> |             2 = SWIM_MEMBER_PORT: uint, port,               |
> |             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
> |             4 = SWIM_MEMBER_GENERATION: uint,               |
> |             5 = SWIM_MEMBER_INCARNATION: uint,              |
> |             6 = SWIM_MEMBER_PAYLOAD: bin                    |
> |         },                                                  |
> |         ...                                                 |
> |     ],                                                      |
> |                                                             |
> |     OR/AND                                                  |
> |                                                             |
> |     1 = SWIM_ANTI_ENTROPY: array [                          |
> |         map {                                               |
> |             0 = SWIM_MEMBER_STATUS: uint,                   |
> |                                     enum member_status,     |
> |             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
> |             2 = SWIM_MEMBER_PORT: uint, port,               |
> |             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
> |             4 = SWIM_MEMBER_GENERATION: uint,               |
> |             5 = SWIM_MEMBER_INCARNATION: uint,              |
> |             6 = SWIM_MEMBER_PAYLOAD: bin                    |
> |         },                                                  |
> |         ...                                                 |
> |     ],                                                      |
> |                                                             |
> |     OR/AND                                                  |
> |                                                             |
> |     4 = SWIM_QUIT: map {                                    |
> |         0 = SWIM_QUIT_GENERATION: uint,                     |
> |         1 = SWIM_QUIT_INCARNATION: uint                     |
> |     }                                                       |
> | }                                                           |
> +-------------------------------------------------------------+
>
> Note: SWIM_FD_INCARNATION, SWIM_MEMBER_INCARNATION,
> SWIM_MEMBER_PAYLOAD, and SWIM_QUIT_INCARNATION got new values.
> This is because 1) SWIM is not released yet, so it is legal to
> change the values, and 2) I wanted to emphasize that 'generation'
> is the first/upper part of a member's age, and 'incarnation' is
> the second/lower part.
> ---
>  extra/exports               |   1 +
>  src/lib/swim/swim.c         |  46 ++++++++--
>  src/lib/swim/swim.h         |  18 +++-
>  src/lib/swim/swim_proto.c   |  31 +++++--
>  src/lib/swim/swim_proto.h   |  38 ++++++--
>  src/lua/swim.c              |   3 +-
>  src/lua/swim.lua            |  43 +++++++--
>  test/swim/swim.result       | 176 +++++++++++++++++++++++++++++-------
>  test/swim/swim.test.lua     |  38 ++++++++
>  test/unit/swim.c            |  51 +++++++++--
>  test/unit/swim.result       |  11 ++-
>  test/unit/swim_test_utils.c |  39 ++++++--
>  test/unit/swim_test_utils.h |   7 +-
>  13 files changed, 414 insertions(+), 88 deletions(-)
>
> diff --git a/extra/exports b/extra/exports
> index b8c42c0df..a4b79099d 100644
> --- a/extra/exports
> +++ b/extra/exports
> @@ -108,6 +108,7 @@ swim_iterator_close
>  swim_member_uri
>  swim_member_uuid
>  swim_member_incarnation
> +swim_member_generation
>  swim_member_payload
>  swim_member_ref
>  swim_member_unref
> diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
> index b55945a34..778d1b291 100644
> --- a/src/lib/swim/swim.c
> +++ b/src/lib/swim/swim.c
> @@ -223,11 +223,24 @@ static
inline int > swim_age_cmp(const struct swim_age *l, const struct swim_age *r, > enum swim_ev_mask *diff) > { > - if (l->incarnation == r->incarnation) { > + /* > + * The most likely path, checked first and foremost > + * explicitly. > + */ > + if (l->generation == r->generation && > + l->incarnation == r->incarnation) { > *diff = 0; > return 0; > } > *diff = SWIM_EV_NEW_INCARNATION; > + if (l->generation < r->generation) { > + *diff |= SWIM_EV_NEW_GENERATION; > + return -1; > + } > + if (l->generation > r->generation) { > + *diff |= SWIM_EV_NEW_GENERATION; > + return 1; > + } > return l->incarnation < r->incarnation ? -1 : 1; > } > > @@ -441,6 +454,15 @@ struct swim { > * status. > */ > struct swim_member *self; > + /** > + * Generation of that instance is set when the latter is > + * created. It is actual only until the instance is > + * configured. After that the instance can learn a bigger > + * own generation from other members. Despite meaning > + * in fact a wrong usage of SWIM generations, it is still > + * possible. > + */ > + uint64_t initial_generation; > /** > * Scheduler of output requests, receiver of incoming > * ones. 
> @@ -1663,8 +1685,8 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, > uint32_t size; > if (swim_decode_map(pos, end, &size, prefix, "root") != 0) > return -1; > - if (size != 1) { > - diag_set(SwimError, "%s map of size 1 is expected", prefix); > + if (size != 2) { > + diag_set(SwimError, "%s map of size 2 is expected", prefix); > return -1; > } > struct swim_age age; > @@ -1673,6 +1695,11 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, > if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0) > return -1; > switch (key) { > + case SWIM_QUIT_GENERATION: > + if (swim_decode_uint(pos, end, &age.generation, prefix, > + "generation") != 0) > + return -1; > + break; > case SWIM_QUIT_INCARNATION: > if (swim_decode_uint(pos, end, &age.incarnation, prefix, > "incarnation") != 0) > @@ -1811,13 +1838,14 @@ swim_event_handler_f(va_list va) > > > struct swim * > -swim_new(void) > +swim_new(uint64_t generation) > { > struct swim *swim = (struct swim *) calloc(1, sizeof(*swim)); > if (swim == NULL) { > diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim"); > return NULL; > } > + swim->initial_generation = generation; > swim->members = mh_swim_table_new(); > if (swim->members == NULL) { > free(swim); > @@ -1924,7 +1952,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, > return -1; > } > struct swim_age age; > - swim_age_create(&age, 0); > + swim_age_create(&age, swim->initial_generation, 0); > new_self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, > &age, NULL, 0); > if (new_self == NULL) > @@ -2042,7 +2070,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) > struct swim_member *member = swim_find_member(swim, uuid); > if (member == NULL) { > struct swim_age age; > - swim_age_create(&age, 0); > + swim_age_create(&age, 0, 0); > member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &age, > NULL, -1); > return member == NULL ? 
-1 : 0; > @@ -2301,6 +2329,12 @@ swim_member_incarnation(const struct swim_member *member) > return member->age.incarnation; > } > > +uint64_t > +swim_member_generation(const struct swim_member *member) > +{ > + return member->age.generation; > +} > + > const char * > swim_member_payload(const struct swim_member *member, int *size) > { > diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h > index a42ace7c6..e5e6ec658 100644 > --- a/src/lib/swim/swim.h > +++ b/src/lib/swim/swim.h > @@ -66,9 +66,16 @@ enum swim_gc_mode { > * Create a new SWIM instance. Do not bind to a port or set any > * parameters. Allocation and initialization only. The function > * yields. > + * @param generation A user-defined upper part of the instance > + * age. Age consists of volatile incarnation and this > + * number. It is assumed that user persists generation > + * value, after restart increments it, and therefore other > + * instances can detect the restart. At the same time that > + * value is used to refute old attributes left from > + * previous lives of that instance. > */ > struct swim * > -swim_new(void); > +swim_new(uint64_t generation); > > /** Check if a swim instance is configured. */ > bool > @@ -237,6 +244,10 @@ swim_member_uuid(const struct swim_member *member); > uint64_t > swim_member_incarnation(const struct swim_member *member); > > +/** Member's generation. */ > +uint64_t > +swim_member_generation(const struct swim_member *member); > + > /** Member's payload. */ > const char * > swim_member_payload(const struct swim_member *member, int *size); > @@ -281,9 +292,10 @@ enum swim_ev_mask { > SWIM_EV_NEW_URI = 0b00000100, > SWIM_EV_NEW_INCARNATION = 0b00001000, > SWIM_EV_NEW_PAYLOAD = 0b00010000, > + SWIM_EV_NEW_GENERATION = 0b00100000, > /* Shortcut to check for any update. */ > - SWIM_EV_UPDATE = 0b00011110, > - SWIM_EV_DROP = 0b00100000, > + SWIM_EV_UPDATE = 0b00111110, > + SWIM_EV_DROP = 0b01000000, > }; > > /** On member event trigger context.
*/ > diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c > index c42d67c0a..610c41068 100644 > --- a/src/lib/swim/swim_proto.c > +++ b/src/lib/swim/swim_proto.c > @@ -174,8 +174,11 @@ swim_check_inaddr_not_empty(const struct sockaddr_in *addr, const char *prefix, > * the age fields should be marked. > */ > static inline void > -swim_age_bin_create(struct swim_age_bin *bin, uint8_t incarnation_key) > +swim_age_bin_create(struct swim_age_bin *bin, uint8_t generation_key, > + uint8_t incarnation_key) > { > + bin->k_generation = generation_key; > + bin->m_generation = 0xcf; > bin->k_incarnation = incarnation_key; > bin->m_incarnation = 0xcf; > } > @@ -184,6 +187,7 @@ swim_age_bin_create(struct swim_age_bin *bin, uint8_t incarnation_key) > static inline void > swim_age_bin_fill(struct swim_age_bin *bin, const struct swim_age *age) > { > + bin->v_generation = mp_bswap_u64(age->generation); > bin->v_incarnation = mp_bswap_u64(age->incarnation); > } > > @@ -265,6 +269,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, > "member uuid") != 0) > return -1; > break; > + case SWIM_MEMBER_GENERATION: > + if (swim_decode_uint(pos, end, &def->age.generation, prefix, > + "member generation") != 0) > + return -1; > + break; > case SWIM_MEMBER_INCARNATION: > if (swim_decode_uint(pos, end, &def->age.incarnation, prefix, > "member incarnation") != 0) > @@ -335,7 +344,8 @@ swim_fd_header_bin_create(struct swim_fd_header_bin *header, > header->k_type = SWIM_FD_MSG_TYPE; > header->v_type = type; > > - swim_age_bin_create(&header->age, SWIM_FD_INCARNATION); > + swim_age_bin_create(&header->age, SWIM_FD_GENERATION, > + SWIM_FD_INCARNATION); > swim_age_bin_fill(&header->age, age); > } > > @@ -349,9 +359,9 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, > return -1; > memset(def, 0, sizeof(*def)); > def->type = swim_fd_msg_type_MAX; > - if (size != 2) { > - diag_set(SwimError, "%s root map should have two keys - "\ > - "message 
type and incarnation", prefix); > + if (size != 3) { > + diag_set(SwimError, "%s root map should have 3 keys - "\ > + "message type, generation, incarnation", prefix); > return -1; > } > for (int i = 0; i < (int) size; ++i) { > @@ -370,6 +380,11 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, > } > def->type = key; > break; > + case SWIM_FD_GENERATION: > + if (swim_decode_uint(pos, end, &def->age.generation, > + prefix, "generation") != 0) > + return -1; > + break; > case SWIM_FD_INCARNATION: > if (swim_decode_uint(pos, end, &def->age.incarnation, > prefix, "incarnation") != 0) > @@ -419,7 +434,8 @@ swim_passport_bin_create(struct swim_passport_bin *passport) > passport->k_uuid = SWIM_MEMBER_UUID; > passport->m_uuid = 0xc4; > passport->m_uuid_len = UUID_LEN; > - swim_age_bin_create(&passport->age, SWIM_MEMBER_INCARNATION); > + swim_age_bin_create(&passport->age, SWIM_MEMBER_GENERATION, > + SWIM_MEMBER_INCARNATION); > } > > void > @@ -579,7 +595,8 @@ swim_quit_bin_create(struct swim_quit_bin *header, const struct swim_age *age) > header->k_quit = SWIM_QUIT; > assert(mp_sizeof_map(SWIM_AGE_BIN_SIZE) == 1); > header->m_quit = 0x80 | SWIM_AGE_BIN_SIZE; > - swim_age_bin_create(&header->age, SWIM_QUIT_INCARNATION); > + swim_age_bin_create(&header->age, SWIM_QUIT_GENERATION, > + SWIM_QUIT_INCARNATION); > swim_age_bin_fill(&header->age, age); > } > > diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h > index acd266db4..c65b29696 100644 > --- a/src/lib/swim/swim_proto.h > +++ b/src/lib/swim/swim_proto.h > @@ -72,6 +72,7 @@ enum { > * | | > * | SWIM_FAILURE_DETECTION: { | > * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, | > + * | SWIM_FD_GENERATION: uint, | > * | SWIM_FD_INCARNATION: uint | > * | }, | > * | | > @@ -83,6 +84,7 @@ enum { > * | SWIM_MEMBER_ADDRESS: uint, ip, | > * | SWIM_MEMBER_PORT: uint, port, | > * | SWIM_MEMBER_UUID: 16 byte UUID, | > + * | SWIM_MEMBER_GENERATION: uint, | > * | SWIM_MEMBER_INCARNATION: uint, 
| > * | SWIM_MEMBER_PAYLOAD: bin | > * | }, | > @@ -97,6 +99,7 @@ enum { > * | SWIM_MEMBER_ADDRESS: uint, ip, | > * | SWIM_MEMBER_PORT: uint, port, | > * | SWIM_MEMBER_UUID: 16 byte UUID, | > + * | SWIM_MEMBER_GENERATION: uint, | > * | SWIM_MEMBER_INCARNATION: uint, | > * | SWIM_MEMBER_PAYLOAD: bin | > * | }, | > @@ -106,6 +109,7 @@ enum { > * | OR/AND | > * | | > * | SWIM_QUIT: { | > + * | SWIM_QUIT_GENERATION: uint, | > * | SWIM_QUIT_INCARNATION: uint | > * | } | > * | } | > @@ -117,6 +121,12 @@ enum { > * to reject/rewrite old data. > */ > struct swim_age { > + /** > + * Generation is a persistent part of age. Ages are > + * compared firstly by this value, and only after by > + * incarnation. > + */ > + uint64_t generation; > /** > * Incarnation is a volatile fully automatic part, which > * is used to refute incorrect and rewrite old information > @@ -127,8 +137,9 @@ struct swim_age { > > /** Initialize age. */ > static inline void > -swim_age_create(struct swim_age *age, uint64_t incarnation) > +swim_age_create(struct swim_age *age, uint64_t generation, uint64_t incarnation) > { > + age->generation = generation; > age->incarnation = incarnation; > } > > @@ -138,7 +149,7 @@ enum { > * storing an age should use this size so as to correctly > * encode MessagePack map header. > */ > - SWIM_AGE_BIN_SIZE = 1, > + SWIM_AGE_BIN_SIZE = 2, > }; > > /** > @@ -146,6 +157,12 @@ enum { > * map. > */ > struct PACKED swim_age_bin { > + /** mp_encode_uint(generation key) */ > + uint8_t k_generation; > + /** mp_encode_uint(64bit generation) */ > + uint8_t m_generation; > + uint64_t v_generation; > + > /** mp_encode_uint(incarnation key) */ > uint8_t k_incarnation; > /** mp_encode_uint(64bit incarnation) */ > @@ -229,6 +246,7 @@ enum swim_fd_key { > * considered dead, but a newer ping/ack was received from > * it. 
> */ > + SWIM_FD_GENERATION, > SWIM_FD_INCARNATION, > }; > > @@ -245,7 +263,7 @@ extern const char *swim_fd_msg_type_strs[]; > struct PACKED swim_fd_header_bin { > /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ > uint8_t k_header; > - /** mp_encode_map(2) */ > + /** mp_encode_map(3) */ > uint8_t m_header; > > /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ > @@ -253,7 +271,7 @@ struct PACKED swim_fd_header_bin { > /** mp_encode_uint(enum swim_fd_msg_type) */ > uint8_t v_type; > > - /** SWIM_FD_INCARNATION */ > + /** SWIM_FD_GENERATION, SWIM_FD_INCARNATION */ > struct swim_age_bin age; > }; > > @@ -329,6 +347,7 @@ enum swim_member_key { > SWIM_MEMBER_ADDRESS, > SWIM_MEMBER_PORT, > SWIM_MEMBER_UUID, > + SWIM_MEMBER_GENERATION, > SWIM_MEMBER_INCARNATION, > SWIM_MEMBER_PAYLOAD, > swim_member_key_MAX, > @@ -360,7 +379,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, > * date and TTD is > 0. > */ > struct PACKED swim_passport_bin { > - /** mp_encode_map(5 or 6) */ > + /** mp_encode_map(6 or 7) */ > uint8_t m_header; > > /** mp_encode_uint(SWIM_MEMBER_STATUS) */ > @@ -378,7 +397,7 @@ struct PACKED swim_passport_bin { > uint8_t m_uuid_len; > uint8_t v_uuid[UUID_LEN]; > > - /** SWIM_MEMBER_INCARNATION */ > + /** SWIM_MEMBER_GENERATION, SWIM_MEMBER_INCARNATION */ > struct swim_age_bin age; > }; > > @@ -598,17 +617,18 @@ swim_route_bin_create(struct swim_route_bin *route, > > enum swim_quit_key { > /** Age to ignore old quit messages. */ > - SWIM_QUIT_INCARNATION = 0, > + SWIM_QUIT_GENERATION = 0, > + SWIM_QUIT_INCARNATION > }; > > /** Quit section. Describes voluntary quit from the cluster. 
*/ > struct PACKED swim_quit_bin { > /** mp_encode_uint(SWIM_QUIT) */ > uint8_t k_quit; > - /** mp_encode_map(1) */ > + /** mp_encode_map(2) */ > uint8_t m_quit; > > - /** SWIM_QUIT_INCARNATION */ > + /** SWIM_QUIT_GENERATION, SWIM_QUIT_INCARNATION */ > struct swim_age_bin age; > }; > > diff --git a/src/lua/swim.c b/src/lua/swim.c > index c3a0a9911..26646f41f 100644 > --- a/src/lua/swim.c > +++ b/src/lua/swim.c > @@ -67,7 +67,8 @@ lua_swim_on_member_event(struct lua_State *L) > static int > lua_swim_new(struct lua_State *L) > { > - struct swim *s = swim_new(); > + uint64_t generation = luaL_checkuint64(L, 1); > + struct swim *s = swim_new(generation); > *(struct swim **) luaL_pushcdata(L, ctid_swim_ptr) = s; > if (s != NULL) > return 1; > diff --git a/src/lua/swim.lua b/src/lua/swim.lua > index e411a397d..c6e843dbd 100644 > --- a/src/lua/swim.lua > +++ b/src/lua/swim.lua > @@ -30,8 +30,9 @@ ffi.cdef[[ > SWIM_EV_NEW_URI = 0b00000100, > SWIM_EV_NEW_INCARNATION = 0b00001000, > SWIM_EV_NEW_PAYLOAD = 0b00010000, > - SWIM_EV_UPDATE = 0b00011110, > - SWIM_EV_DROP = 0b00100000, > + SWIM_EV_NEW_GENERATION = 0b00100000, > + SWIM_EV_UPDATE = 0b00111110, > + SWIM_EV_DROP = 0b01000000, > }; > > bool > @@ -95,6 +96,9 @@ ffi.cdef[[ > uint64_t > swim_member_incarnation(const struct swim_member *member); > > + uint64_t > + swim_member_generation(const struct swim_member *member); > + > const char * > swim_member_payload(const struct swim_member *member, int *size); > > @@ -315,6 +319,11 @@ local function swim_member_incarnation(m) > return capi.swim_member_incarnation(ptr) > end > > +local function swim_member_generation(m) > + local ptr = swim_check_member(m, 'member:generation()') > + return capi.swim_member_generation(ptr) > +end > + > local function swim_member_is_dropped(m) > local ptr = swim_check_member(m, 'member:is_dropped()') > return capi.swim_member_is_dropped(ptr) > @@ -367,9 +376,10 @@ local function swim_member_payload(m) > -- Both the age and the flag are needed. 
Age is not enough, > -- because a new age can be disseminated earlier than a new > -- payload. For example, via ACK messages. > - local key1 = capi.swim_member_incarnation(ptr) > - local key2 = capi.swim_member_is_payload_up_to_date(ptr) > - if key1 == m.p_key1 and key2 == m.p_key2 then > + local key1 = capi.swim_member_generation(ptr) > + local key2 = capi.swim_member_incarnation(ptr) > + local key3 = capi.swim_member_is_payload_up_to_date(ptr) > + if key1 == m.p_key1 and key2 == m.p_key2 and key3 == m.p_key3 then > return m.p > end > local cdata, size = swim_member_payload_raw(ptr) > @@ -386,6 +396,7 @@ local function swim_member_payload(m) > rawset(m, 'p', result) > rawset(m, 'p_key1', key1) > rawset(m, 'p_key2', key2) > + rawset(m, 'p_key3', key3) > return result > end > > @@ -412,6 +423,7 @@ local function swim_member_serialize(m) > status = swim_member_status(m), > uuid = swim_member_uuid(m), > uri = swim_member_uri(m), > + generation = swim_member_generation(m), > incarnation = swim_member_incarnation(m), > -- There are many ways to interpret a payload, and it is > -- not a job of a serialization method. Only binary size > @@ -427,6 +439,7 @@ local swim_member_mt = { > uuid = swim_member_uuid, > uri = swim_member_uri, > incarnation = swim_member_incarnation, > + generation = swim_member_generation, > payload_cdata = swim_member_payload_cdata, > payload_str = swim_member_payload_str, > payload = swim_member_payload, > @@ -722,6 +735,9 @@ local swim_member_event_index = { > is_new_uri = function(self) > return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0 > end, > + is_new_generation = function(self) > + return bit.band(self[1], capi.SWIM_EV_NEW_GENERATION) ~= 0 > + end, > is_new_incarnation = function(self) > return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0 > end, > @@ -910,10 +926,23 @@ local cache_table_mt = { __mode = 'v' } > > -- > -- Create a new SWIM instance, and configure if @a cfg is > --- provided. > +-- provided. 
@a cfg can contain one non-dynamic parameter - > +-- generation. It can't be changed later with swim:cfg(). > -- > local function swim_new(cfg) > - local ptr = internal.swim_new() > + local generation = 0 > + if cfg and type(cfg) == 'table' and cfg.generation ~= nil then > + generation = cfg.generation > + if type(generation) ~= 'number' or generation < 0 or > + generation ~= math.floor(generation) then > + return error('swim.new: expected non-negative integer generation') > + end > + cfg = table.copy(cfg) > + -- swim:cfg() should not see that parameter. It takes only > + -- dynamic ones. > + cfg.generation = nil > + end > + local ptr = internal.swim_new(generation) > if ptr == nil then > return nil, box.error.last() > end > diff --git a/test/swim/swim.result b/test/swim/swim.result > index cceee2595..27acb0983 100644 > --- a/test/swim/swim.result > +++ b/test/swim/swim.result > @@ -305,11 +305,12 @@ s1:self():uuid() > ... > old_self > --- > -- uri: 127.0.0.1:<port> > +- generation: 0 > + payload_size: 0 > status: left > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000001 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > -- Can't remove self. > s1:remove_member(uuid(3)) > @@ -389,11 +390,12 @@ s = s1:self() > ... > s > --- > -- uri: 127.0.0.1:<port> > +- generation: 0 > + payload_size: 0 > status: alive > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000001 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > s:status() > --- > @@ -488,11 +490,12 @@ s1:member_by_uuid(uuid(2)) > -- UUID can be cdata. > s1:member_by_uuid(s:uuid()) > --- > -- uri: 127.0.0.1:<port> > +- generation: 0 > + payload_size: 0 > status: alive > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000001 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... 
> s1:quit() > --- > @@ -776,11 +779,12 @@ s.pairs() > iterate() > --- > - - - 00000000-0000-1000-8000-000000000001 > - - uri: 127.0.0.1:<port> > + - generation: 0 > + payload_size: 0 > status: alive > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000001 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > s:add_member({uuid = uuid(2), uri = uri()}) > --- > @@ -789,17 +793,19 @@ s:add_member({uuid = uuid(2), uri = uri()}) > iterate() > --- > - - - 00000000-0000-1000-8000-000000000002 > - - uri: 127.0.0.1:<port> > + - generation: 0 > + payload_size: 0 > status: alive > incarnation: 0 > uuid: 00000000-0000-1000-8000-000000000002 > - payload_size: 0 > + uri: 127.0.0.1:<port> > - - 00000000-0000-1000-8000-000000000001 > - - uri: 127.0.0.1:<port> > + - generation: 0 > + payload_size: 0 > status: alive > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000001 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > s:add_member({uuid = uuid(3), uri = uri()}) > --- > @@ -808,23 +814,26 @@ s:add_member({uuid = uuid(3), uri = uri()}) > iterate() > --- > - - - 00000000-0000-1000-8000-000000000001 > - - uri: 127.0.0.1:<port> > + - generation: 0 > + payload_size: 0 > status: alive > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000001 > - payload_size: 0 > + uri: 127.0.0.1:<port> > - - 00000000-0000-1000-8000-000000000003 > - - uri: 127.0.0.1:<port> > + - generation: 0 > + payload_size: 0 > status: alive > incarnation: 0 > uuid: 00000000-0000-1000-8000-000000000003 > - payload_size: 0 > + uri: 127.0.0.1:<port> > - - 00000000-0000-1000-8000-000000000002 > - - uri: 127.0.0.1:<port> > + - generation: 0 > + payload_size: 0 > status: alive > incarnation: 0 > uuid: 00000000-0000-1000-8000-000000000002 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > s:delete() > --- > @@ -962,11 +971,12 @@ s2 = s:member_by_uuid(uuid(2)) > ... 
> s2 > --- > -- uri: 127.0.0.1:<port> > +- generation: 0 > + payload_size: 0 > status: alive > incarnation: 0 > uuid: 00000000-0000-1000-8000-000000000002 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > -- Next lookups return the same member table. > s2_old_uri = s2:uri() > @@ -1208,11 +1218,12 @@ while s1:member_by_uuid(s2:self():uuid()) == nil do fiber.sleep(0.01) end > ... > s2:member_by_uuid(s1:self():uuid()) > --- > -- uri: 127.0.0.1:<port> > +- generation: 0 > + payload_size: 0 > status: alive > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000001 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > s1:delete() > --- > @@ -1285,11 +1296,12 @@ while #m_list < 1 do fiber.sleep(0) end > ... > m_list > --- > -- - uri: 127.0.0.1:<port> > +- - generation: 0 > + payload_size: 0 > status: alive > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000001 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > e_list > --- > @@ -1326,11 +1338,12 @@ while s1:size() ~= 2 do fiber.sleep(0.01) end > -- sleeps. > m_list > --- > -- - uri: 127.0.0.1:<port> > +- - generation: 0 > + payload_size: 0 > status: alive > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000002 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > e_list > --- > @@ -1368,21 +1381,24 @@ while #m_list ~= 3 do fiber.sleep(0.01) end > ... > m_list > --- > -- - uri: 127.0.0.1:<port> > +- - generation: 0 > + payload_size: 0 > status: alive > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000002 > - payload_size: 0 > - - uri: 127.0.0.1:<port> > + uri: 127.0.0.1:<port> > + - generation: 0 > + payload_size: 8 > status: alive > incarnation: 2 > uuid: 00000000-0000-1000-8000-000000000001 > + uri: 127.0.0.1:<port> > + - generation: 0 > payload_size: 8 > - - uri: 127.0.0.1:<port> > status: alive > incarnation: 2 > uuid: 00000000-0000-1000-8000-000000000001 > - payload_size: 8 > + uri: 127.0.0.1:<port> > ... 
> e_list > --- > @@ -1425,16 +1441,18 @@ fiber.sleep(0) > -- Two events - status update to 'left', and 'drop'. > m_list > --- > -- - uri: 127.0.0.1:<port> > +- - generation: 0 > + payload_size: 0 > status: left > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000002 > + uri: 127.0.0.1:<port> > + - generation: 0 > payload_size: 0 > - - uri: 127.0.0.1:<port> > status: left > incarnation: 1 > uuid: 00000000-0000-1000-8000-000000000002 > - payload_size: 0 > + uri: 127.0.0.1:<port> > ... > e_list > --- > @@ -1484,6 +1502,96 @@ ctx_list > s1:delete() > --- > ... > +-- > +-- gh-4280: 'generation' counter to detect restarts and refute > +-- information left from previous lifes of a SWIM instance. > +-- > +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0}) > +--- > +... > +s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01}) > +--- > +... > +s2:add_member({uuid = uuid(1), uri = s1:self():uri()}) > +--- > +- true > +... > +s1_view = s2:member_by_uuid(uuid(1)) > +--- > +... > +s1:set_payload('payload 1') > +--- > +- true > +... > +while not s1_view:payload() do fiber.sleep(0.1) end > +--- > +... > +s1_view:payload() > +--- > +- payload 1 > +... > +s1:self():incarnation() > +--- > +- 2 > +... > +s1:self():generation() > +--- > +- 0 > +... > +-- Now S2 knows S1's payload as 'payload 1'. > +is_new_generation = false > +--- > +... > +_ = s2:on_member_event(function(m, e) is_new_generation = is_new_generation or e:is_new_generation() end) > +--- > +... > +s1:delete() > +--- > +... > +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1}) > +--- > +... > +s1:add_member({uuid = uuid(2), uri = s2:self():uri()}) > +--- > +- true > +... > +s1:set_payload('payload 2') > +--- > +- true > +... > +s1:self():incarnation() > +--- > +- 2 > +... > +-- Without the new generation S2 would believe that S1's payload > +-- is still 'payload 1'. 
Because 'payload 1' and 'payload 2' were > +-- disseminated with the same incarnation. > +s1:self():generation() > +--- > +- 1 > +... > +while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end > +--- > +... > +s1_view:payload() > +--- > +- payload 2 > +... > +is_new_generation > +--- > +- true > +... > +-- Generation is static parameter. > +s1:cfg({generation = 5}) > +--- > +- error: 'swim:cfg: unknown option generation' > +... > +s1:delete() > +--- > +... > +s2:delete() > +--- > +... > test_run:cmd("clear filter") > --- > - true > diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua > index 576219b4d..886c00af5 100644 > --- a/test/swim/swim.test.lua > +++ b/test/swim/swim.test.lua > @@ -499,4 +499,42 @@ ctx_list > > s1:delete() > > +-- > +-- gh-4280: 'generation' counter to detect restarts and refute > +-- information left from previous lifes of a SWIM instance. > +-- > +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0}) > +s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01}) > +s2:add_member({uuid = uuid(1), uri = s1:self():uri()}) > +s1_view = s2:member_by_uuid(uuid(1)) > +s1:set_payload('payload 1') > +while not s1_view:payload() do fiber.sleep(0.1) end > +s1_view:payload() > +s1:self():incarnation() > +s1:self():generation() > + > +-- Now S2 knows S1's payload as 'payload 1'. > + > +is_new_generation = false > +_ = s2:on_member_event(function(m, e) is_new_generation = is_new_generation or e:is_new_generation() end) > +s1:delete() > +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1}) > +s1:add_member({uuid = uuid(2), uri = s2:self():uri()}) > +s1:set_payload('payload 2') > +s1:self():incarnation() > +-- Without the new generation S2 would believe that S1's payload > +-- is still 'payload 1'. Because 'payload 1' and 'payload 2' were > +-- disseminated with the same incarnation. 
> +s1:self():generation() > + > +while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end > +s1_view:payload() > +is_new_generation > + > +-- Generation is static parameter. > +s1:cfg({generation = 5}) > + > +s1:delete() > +s2:delete() > + > test_run:cmd("clear filter") > diff --git a/test/unit/swim.c b/test/unit/swim.c > index bffc0985d..4371b56f0 100644 > --- a/test/unit/swim.c > +++ b/test/unit/swim.c > @@ -133,7 +133,7 @@ swim_test_cfg(void) > { > swim_start_test(16); > > - struct swim *s = swim_new(); > + struct swim *s = swim_new(0); > assert(s != NULL); > is(swim_cfg(s, NULL, -1, -1, -1, NULL), -1, "first cfg failed - no URI"); > ok(swim_error_check_match("mandatory"), "diag says 'mandatory'"); > @@ -149,7 +149,7 @@ swim_test_cfg(void) > is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\ > "URI"); > > - struct swim *s2 = swim_new(); > + struct swim *s2 = swim_new(0); > assert(s2 != NULL); > const char *bad_uri1 = "127.1.1.1.1.1.1:1"; > const char *bad_uri2 = "google.com:1"; > @@ -391,16 +391,17 @@ swim_test_refute(void) > fail_if(swim_cluster_wait_status(cluster, 0, 1, > MEMBER_SUSPECTED, 4) != 0); > swim_cluster_set_drop(cluster, 1, 0); > - is(swim_cluster_wait_age(cluster, 1, 1, 1, 1), 0, > + is(swim_cluster_wait_age(cluster, 1, 1, 0, 1, 1), 0, > "S2 increments its own incarnation to refute its suspicion"); > - is(swim_cluster_wait_age(cluster, 0, 1, 1, 1), 0, > + is(swim_cluster_wait_age(cluster, 0, 1, 0, 1, 1), 0, > "new incarnation has reached S1 with a next round message"); > > + fail_if(swim_cluster_member_generation(cluster, 1, 1) != 0); > swim_cluster_restart_node(cluster, 1); > is(swim_cluster_member_incarnation(cluster, 1, 1), 0, > "after restart S2's incarnation is 0 again"); > - is(swim_cluster_wait_age(cluster, 1, 1, 1, 1), 0, > - "S2 learned its old bigger incarnation 1 from S0"); > + is(swim_cluster_member_generation(cluster, 1, 1), 1, > + "but generation is new"); > > swim_cluster_delete(cluster); > 
swim_finish_test(); > @@ -527,7 +528,7 @@ swim_test_quit(void) > * old LEFT status. > */ > swim_cluster_restart_node(cluster, 0); > - is(swim_cluster_wait_age(cluster, 0, 0, 1, 2), 0, > + is(swim_cluster_wait_age(cluster, 0, 0, 1, 0, 2), 0, > "quited member S1 has returned and refuted the old status"); > fail_if(swim_cluster_wait_fullmesh(cluster, 2) != 0); > /* > @@ -557,7 +558,7 @@ swim_test_quit(void) > > /* Now allow S2 to get the 'self-quit' message. */ > swim_cluster_unblock_io(cluster, 1); > - is(swim_cluster_wait_age(cluster, 1, 1, 2, 0), 0, "S2 finally got "\ > + is(swim_cluster_wait_age(cluster, 1, 1, 1, 1, 0), 0, "S2 finally got "\ > "'quit' message from S1, but with its 'own' UUID - refute it") > swim_cluster_delete(cluster); > > @@ -1089,7 +1090,7 @@ swim_test_triggers(void) > swim_member_unref(tctx.ctx.member); > > /* Check that recfg fires incarnation update trigger. */ > - s1 = swim_new(); > + s1 = swim_new(0); > struct tt_uuid uuid = uuid_nil; > uuid.time_low = 1; > fail_if(swim_cfg(s1, "127.0.0.1:1", -1, -1, -1, &uuid) != 0); > @@ -1110,10 +1111,39 @@ swim_test_triggers(void) > swim_finish_test(); > } > > +static void > +swim_test_generation(void) > +{ > + swim_start_test(3); > + > + struct swim_cluster *cluster = swim_cluster_new(2); > + swim_cluster_interconnect(cluster, 0, 1); > + > + const char *p1 = "payload 1"; > + int p1_size = strlen(p1); > + swim_cluster_member_set_payload(cluster, 0, p1, p1_size); > + is(swim_cluster_wait_payload_everywhere(cluster, 0, p1, p1_size, 1), 0, > + "S1 disseminated its payload to S2"); > + > + swim_cluster_restart_node(cluster, 0); > + const char *p2 = "payload 2"; > + int p2_size = strlen(p2); > + swim_cluster_member_set_payload(cluster, 0, p2, p2_size); > + is(swim_cluster_wait_payload_everywhere(cluster, 0, p2, p2_size, 2), 0, > + "S1 restarted and set another payload. 
Without generation it could "\ > + "lead to never disseminated new payload."); > + is(swim_cluster_member_generation(cluster, 1, 0), 1, > + "S2 sees new generation of S1"); > + > + swim_cluster_delete(cluster); > + > + swim_finish_test(); > +} > + > static int > main_f(va_list ap) > { > - swim_start_test(21); > + swim_start_test(22); > > (void) ap; > swim_test_ev_init(); > @@ -1140,6 +1170,7 @@ main_f(va_list ap) > swim_test_encryption(); > swim_test_slow_net(); > swim_test_triggers(); > + swim_test_generation(); > > swim_test_transport_free(); > swim_test_ev_free(); > diff --git a/test/unit/swim.result b/test/unit/swim.result > index 2968a2da7..bad3c30d0 100644 > --- a/test/unit/swim.result > +++ b/test/unit/swim.result > @@ -1,5 +1,5 @@ > *** main_f *** > -1..21 > +1..22 > *** swim_test_one_link *** > 1..6 > ok 1 - no rounds - no fullmesh > @@ -89,7 +89,7 @@ ok 7 - subtests > ok 1 - S2 increments its own incarnation to refute its suspicion > ok 2 - new incarnation has reached S1 with a next round message > ok 3 - after restart S2's incarnation is 0 again > - ok 4 - S2 learned its old bigger incarnation 1 from S0 > + ok 4 - but generation is new > ok 8 - subtests > *** swim_test_refute: done *** > *** swim_test_basic_gossip *** > @@ -227,4 +227,11 @@ ok 20 - subtests > ok 22 - local URI update warns about incarnation update > ok 21 - subtests > *** swim_test_triggers: done *** > + *** swim_test_generation *** > + 1..3 > + ok 1 - S1 disseminated its payload to S2 > + ok 2 - S1 restarted and set another payload. Without generation it could lead to never disseminated new payload. > + ok 3 - S2 sees new generation of S1 > +ok 22 - subtests > + *** swim_test_generation: done *** > *** main_f: done *** > diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c > index e646c3a47..228b2f752 100644 > --- a/test/unit/swim_test_utils.c > +++ b/test/unit/swim_test_utils.c > @@ -157,6 +157,11 @@ struct swim_node { > * that instance. 
> */ > struct tt_uuid uuid; > + /** > + * Generation counter. Persisted for restarts, when SWIM > + * is explicitly deleted before restart. > + */ > + uint64_t generation; > /** > * Filter to drop packets with a certain probability > * from/to a specified direction. > @@ -212,7 +217,8 @@ swim_test_event_cb(struct trigger *trigger, void *event) > static inline void > swim_node_create(struct swim_node *n, int id) > { > - n->swim = swim_new(); > + n->generation = 0; > + n->swim = swim_new(n->generation); > assert(n->swim != NULL); > struct trigger *t = (struct trigger *) malloc(sizeof(*t)); > trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free); > @@ -259,7 +265,8 @@ swim_cluster_new(int size) > void > swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout) > { > - swim_cluster_set_cfg(cluster, swim_cfg, NULL, -1, ack_timeout, -1, NULL); > + swim_cluster_set_cfg(cluster, swim_cfg, NULL, -1, ack_timeout, -1, > + NULL); > cluster->ack_timeout = ack_timeout; > } > > @@ -359,6 +366,17 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, > return swim_member_incarnation(m); > } > > +uint64_t > +swim_cluster_member_generation(struct swim_cluster *cluster, int node_id, > + int member_id) > +{ > + const struct swim_member *m = > + swim_cluster_member_view(cluster, node_id, member_id); > + if (m == NULL) > + return UINT64_MAX; > + return swim_member_generation(m); > +} > + > const char * > swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, > int member_id, int *size) > @@ -402,7 +420,7 @@ swim_cluster_restart_node(struct swim_cluster *cluster, int i) > &n->uuid)); > swim_delete(s); > } > - s = swim_new(); > + s = swim_new(++n->generation); > assert(s != NULL); > int rc = swim_cfg(s, uri, -1, cluster->ack_timeout, cluster->gc_mode, > &n->uuid); > @@ -713,6 +731,7 @@ struct swim_member_template { > * target. 
> */ > bool need_check_age; > + uint64_t generation; > uint64_t incarnation; > /** > * True, if the payload should be checked to be equal to > @@ -751,9 +770,10 @@ swim_member_template_set_status(struct swim_member_template *t, > */ > static inline void > swim_member_template_set_age(struct swim_member_template *t, > - uint64_t incarnation) > + uint64_t generation, uint64_t incarnation) > { > t->need_check_age = true; > + t->generation = generation; > t->incarnation = incarnation; > } > > @@ -778,22 +798,25 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) > const struct swim_member *m = > swim_cluster_member_view(cluster, t->node_id, t->member_id); > enum swim_member_status status; > - uint64_t incarnation; > + uint64_t incarnation, generation; > const char *payload; > int payload_size; > if (m != NULL) { > status = swim_member_status(m); > + generation = swim_member_generation(m); > incarnation = swim_member_incarnation(m); > payload = swim_member_payload(m, &payload_size); > } else { > status = swim_member_status_MAX; > + generation = 0; > incarnation = 0; > payload = NULL; > payload_size = 0; > } > if (t->need_check_status && status != t->status) > return false; > - if (t->need_check_age && incarnation != t->incarnation) > + if (t->need_check_age && (generation != t->generation || > + incarnation != t->incarnation)) > return false; > if (t->need_check_payload && > (payload_size != t->payload_size || > @@ -848,11 +871,11 @@ swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, > > int > swim_cluster_wait_age(struct swim_cluster *cluster, int node_id, int member_id, > - uint64_t incarnation, double timeout) > + uint64_t generation, uint64_t incarnation, double timeout) > { > struct swim_member_template t; > swim_member_template_create(&t, node_id, member_id); > - swim_member_template_set_age(&t, incarnation); > + swim_member_template_set_age(&t, generation, incarnation); > return swim_wait_timeout(timeout, cluster, 
swim_loop_check_member, &t); > } > > diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h > index 309636b87..11adc017f 100644 > --- a/test/unit/swim_test_utils.h > +++ b/test/unit/swim_test_utils.h > @@ -160,6 +160,10 @@ uint64_t > swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, > int member_id); > > +uint64_t > +swim_cluster_member_generation(struct swim_cluster *cluster, int node_id, > + int member_id); > + > const char * > swim_cluster_member_payload(struct swim_cluster *cluster, int node_id, > int member_id, int *size); > @@ -224,7 +228,8 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, > */ > int > swim_cluster_wait_age(struct swim_cluster *cluster, int node_id, int member_id, > - uint64_t incarnation, double timeout); > + uint64_t generation, uint64_t incarnation, > + double timeout); > > /** > * Wait until a member with id @a member_id is seen with > -- > 2.20.1 (Apple Git-117) > -- Konstantin Osipov, Moscow, Russia
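The swim_test_generation() unit test and the gh-4280 Lua test in the quoted patch rely on the same receiver-side rule. An update about a member is applied only if it carries a strictly newer age. In the Lua test, s1:self():incarnation() is 2 in both lives of S1, so incarnation alone cannot distinguish the new 'payload 2' from the old 'payload 1'.

The C sketch below models that rule under stated assumptions:

- It is not the patch's swim.c code.
- It assumes generation is compared before incarnation, which is the order proposed later in this thread.
- struct member_view, age_cmp() and member_apply_gossip() are hypothetical names.
- It ignores the payload "up to date" tracking that the real implementation also performs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, simplified view of a remote member. */
struct member_view {
	uint64_t generation;  /* persistent, bumped on every restart */
	uint64_t incarnation; /* volatile, starts from 0 after restart */
	char payload[32];
};

/* Compare two ages: generation first, then incarnation. */
static int
age_cmp(uint64_t gen_l, uint64_t inc_l, uint64_t gen_r, uint64_t inc_r)
{
	if (gen_l != gen_r)
		return gen_l < gen_r ? -1 : 1;
	if (inc_l != inc_r)
		return inc_l < inc_r ? -1 : 1;
	return 0;
}

/*
 * Apply a gossip about a member. The local view is updated only
 * if the gossip carries a strictly newer age. Returns true if the
 * view was updated.
 */
static bool
member_apply_gossip(struct member_view *m, uint64_t generation,
		    uint64_t incarnation, const char *payload)
{
	assert(m != NULL && payload != NULL);
	if (age_cmp(generation, incarnation, m->generation,
		    m->incarnation) <= 0)
		return false; /* Same or older age: ignore it. */
	m->generation = generation;
	m->incarnation = incarnation;
	strncpy(m->payload, payload, sizeof(m->payload) - 1);
	m->payload[sizeof(m->payload) - 1] = '\0';
	return true;
}
```

Replaying the test scenario shows the rule at work:

1. Gossip (0, 2, "payload 1") is applied.
2. A later (0, 2, "payload 2") is ignored because its age is not newer.
3. After the restart, (1, 2, "payload 2") wins even though the incarnation is unchanged.
4. A stale (0, 5, ...) left from the previous life is then refuted.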
* [tarantool-patches] Re: [PATCH 2/2] swim: introduce generation
From: Vladislav Shpilevoy @ 2019-06-21 19:03 UTC
To: Konstantin Osipov; +Cc: tarantool-patches

On 21/06/2019 08:53, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/21 00:25]:
>
> Generation and incarnation look and sound similar, so it is hard
> to tell which is which.
>
> I'd like to suggest renaming 'generation' to 'epoch'.
>
> "Age" would be a wrong name for a pair "epoch" + "incarnation".
>
> Besides, swim's term for the counter is "incarnation", so I would
> stick to the swim terminology.
>
> I suggest this:
>
> struct swim_incarnation {
>         int64_t term; // Derived from RAFT word "term", persisted between restarts
>         int64_t version; // Reflects swim member state version
> };
>
> I am open to alternatives too.
>

Basically, you do not like that I kept generation out of
incarnation. I did it deliberately, to emphasize that SWIM's
incarnation is not the same as our 'age'. Additionally, it lets us
avoid changing the API and keep incarnation a simple number. If we
say that incarnation is a structure, then I am not sure how to
return it in the public API. Split all methods operating on
incarnation into 2 parts?

Assume that we decided to split, and can encapsulate everything
inside the incarnation. 'version' for the volatile part sounds
good. 'term' does not. Raft uses 'term' as a version of a round,
not a version of an instance: it is about cluster state, not
instance state. 'epoch' is the same IMO. Furthermore, both 'epoch'
and 'term' are usually dynamic. 'Generation' was perfect: it means
'поколение' (a generation, as in a generation of people). It is
logical to increment it only when an instance dies and starts
again, and never to change it during its lifetime.

If you still think that the complex incarnation is the best, I
suggest

    incarnation = { generation, version }

It leads to the following changes in the API:

    - member:incarnation()
    + member:generation()
    + member:version()

Method event:is_new_incarnation() will return true if either
generation or version has changed. To check which one changed, a
user will call

    event:is_new_version()
    event:is_new_generation()

But I think that the current 'age' solution is better. It lets us
leave the core SWIM concept of 'incarnation' untouched.

I am waiting for your decision on what to do.
* [tarantool-patches] Re: [PATCH 2/2] swim: introduce generation
From: Konstantin Osipov @ 2019-06-21 19:48 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/21 22:31]:

Well, I don't know, it's a bikeshed. Your argument doesn't make
sense, however: age cannot encompass generation - an age is
something you have within a generation.

Plus, I was thinking that the two fields together are the actual
SWIM incarnation, because only the two together ensure strict
ordering, which is SWIM's requirement for incarnation.

As for the API, you could return a binary or text value for
incarnation and thus preserve the SWIM requirements.

Basically, I thought that we keep the incarnation as before, but
make it a bit more complex than what SWIM does by default - it's
part of your original proposal of keeping both members inside an
8-byte incarnation, but without the 8-byte compromise.

-- 
Konstantin Osipov, Moscow, Russia
* [tarantool-patches] Re: [PATCH 2/2] swim: introduce generation
From: Konstantin Osipov @ 2019-06-21 19:53 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Konstantin Osipov <kostja@tarantool.org> [19/06/21 22:48]:
> As for the API, you could return a binary or text for incarnation and
> thus preserve SWIM requirements.

I mean:

function incarnation()
    return tostring(generation) .. ":" .. tostring(incarnation)
end

>
> Basically, I thought that we keep the incarnation as before, but
> make it a bit complex than what swim does by default - it's part
> of your original proposal of keeping both members part of 8-byte
> incarnation, but without 8-byte compromise.
>
> --
> Konstantin Osipov, Moscow, Russia

-- 
Konstantin Osipov, Moscow, Russia
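A text form like the one above works for equality checks. However, byte-wise comparison of variable-width decimal strings does not preserve numeric order: "10:0" sorts before "9:0".

If a binary incarnation is expected to keep SWIM's strict ordering under plain memcmp(), each 64-bit field can be encoded with a fixed width in big-endian byte order, generation first. The sketch below shows that technique. INCARNATION_BIN_SIZE and the function names are hypothetical, not part of the SWIM API.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical size of an encoded (generation, version) pair. */
enum { INCARNATION_BIN_SIZE = 16 };

/* Store a 64-bit value in big-endian (most significant byte first) order. */
static void
store_u64_be(unsigned char *out, uint64_t v)
{
	for (int i = 7; i >= 0; --i) {
		out[i] = (unsigned char)(v & 0xff);
		v >>= 8;
	}
}

/*
 * Encode (generation, version) so that memcmp() on two encodings
 * orders them exactly like a lexicographic comparison of the pairs:
 * generation decides first, version breaks ties.
 */
static void
incarnation_encode(unsigned char out[INCARNATION_BIN_SIZE],
		   uint64_t generation, uint64_t version)
{
	assert(out != NULL);
	store_u64_be(out, generation);
	store_u64_be(out + 8, version);
}
```

Fixed width is what makes this work. The most significant bytes of the generation are always compared first, so no length or separator rules are involved.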
* [tarantool-patches] Re: [PATCH 2/2] swim: introduce generation
From: Vladislav Shpilevoy @ 2019-06-21 22:00 UTC
To: Konstantin Osipov; +Cc: tarantool-patches

We have decided in a chat that a user should be able to get
'generation', 'version', and the accumulated 'incarnation' (a
binary merge of these two values) separately. In simple cases this
lets a user ignore generation/version and stick to the incarnation
only.

Concerning the API, I suggest the following C API changes:

- uint64_t
- swim_member_incarnation(const struct swim_member *member);

+ struct swim_incarnation {
+         uint64_t generation;
+         uint64_t version;
+ };
+
+ int
+ swim_incarnation_cmp(const struct swim_incarnation *l,
+                      const struct swim_incarnation *r);
+
+ struct swim_incarnation
+ swim_member_incarnation(const struct swim_member *member);

struct swim_incarnation will be a public structure, to simplify
working with the C API. I am trying to avoid returning some
const char * or a similarly abstract thing.

The Lua API changes are different. I know we've talked about
returning a binary string as an incarnation, but then we would
need separate methods to get version and generation. We would also
waste memory on these temporary binary strings, which are useless
for anything but comparison. Worse, we would desync with the C API.

I've found a revolutionary feature of Lua FFI which can help us
very much. It combines your 'merged' incarnation and my 'totally
different numbers' incarnation. In Lua FFI you can define your own
comparison operators for C structs (just learned about that)!

Method member:incarnation() will return a cdata struct
swim_incarnation, with an ffi.metatype containing comparators.
struct swim_incarnation will be initialized in Lua like this:

    ffi = require('ffi')
    ffi.cdef[[
        struct swim_incarnation {
            uint64_t generation;
            uint64_t version;
        };
    ]]
    inc_t = ffi.typeof('struct swim_incarnation')
    inc_mt = {
        __eq = function(l, r)
            return l.generation == r.generation and
                   l.version == r.version
        end,
        __lt = function(l, r)
            return l.generation < r.generation or
                   l.generation == r.generation and l.version < r.version
        end,
        __le = function(l, r)
            return l.generation < r.generation or
                   l.generation == r.generation and l.version <= r.version
        end,
    }
    ffi.metatype(inc_t, inc_mt)
    ia = ffi.new('struct swim_incarnation')
    ib = ffi.new('struct swim_incarnation')

ia and ib can be compared using just the '< > <= >= == ~='
operators, and at the same time you can access the individual
members!!!

What do you think? Both about the C and the Lua API changes.
* [tarantool-patches] Re: [PATCH 2/2] swim: introduce generation
From: Konstantin Osipov @ 2019-06-21 22:31 UTC
To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/22 01:03]:

Yup.

> What do you think? Both about C and Lua API changes.

-- 
Konstantin Osipov, Moscow, Russia