From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 5BAEE311BE for ; Thu, 20 Jun 2019 17:23:06 -0400 (EDT)
Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id Vui3bYgybuT6 for ; Thu, 20 Jun 2019 17:23:06 -0400 (EDT)
Received: from smtp62.i.mail.ru (smtp62.i.mail.ru [217.69.128.42]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id C48EB311AC for ; Thu, 20 Jun 2019 17:23:05 -0400 (EDT)
From: Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH 2/2] swim: introduce generation
Date: Thu, 20 Jun 2019 23:23:25 +0200
Message-Id: <9852675764d25bdc720023ed6763d97cbeab8bee.1561065646.git.v.shpilevoy@tarantool.org>
In-Reply-To:
References:
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Sender: tarantool-patches-bounce@freelists.org
Errors-to: tarantool-patches-bounce@freelists.org
Reply-To: tarantool-patches@freelists.org
List-Help:
List-Unsubscribe:
List-software: Ecartis version 1.0.0
List-Id: tarantool-patches
List-Subscribe:
List-Owner:
List-post:
List-Archive:
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org

SWIM uses incarnation to refute old information, but that is not
enough when restarts are possible. If an instance restarts, its
incarnation is reset to 0. After several fast local updates it
reaches N. But other instances may also know the incarnation of
this instance as N from its previous life, with different
information attached. They will never accept the new version of
the data, because their current version is also considered up to
date.

As a result, incarnation alone is not enough. A persistent part
of incarnation is needed. This patch introduces it and calls it
'generation'.
As an additional benefit, generation makes it possible to react
to an instance restart in user-defined triggers.

Closes #4280

@TarantoolBot document
Title: SWIM generation

Generation is a persistent part of incarnation that allows users
to refute old pieces of information left from previous lives of
an instance. It is a static attribute set when a SWIM instance is
created, and it can't be changed without restarting the instance.

Generation not only helps with overriding old information, but
can also be used to detect restarts in user-defined triggers.

How to set generation:
```Lua
swim = require('swim')
s = swim.new({generation = })
```
Generation can't be set in `swim:cfg`. If it is omitted, 0 is
used by default. But be careful: if the instance is not being
started for the first time, it is safer to use a new generation.
Ideally it should be persisted somehow: in a file, in a space,
in a global service.

How to detect restarts:
```Lua
swim = require('swim')
s = swim.new()
s:on_member_event(function(m, e)
    if e:is_new_generation() then
        ... -- Process restart.
    end
end)
```
`is_new_generation` is a new method of the event object passed
into triggers.

How to learn generation: use the new `swim_member:generation()`
method.

The binary protocol is updated. Now the Protocol Logic section
looks like this:

+-------------------Protocol logic section--------------------+
| map {                                                       |
|     0 = SWIM_SRC_UUID: 16 byte UUID,                        |
|                                                             |
|                 AND                                         |
|                                                             |
|     2 = SWIM_FAILURE_DETECTION: map {                       |
|         0 = SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,  |
|         1 = SWIM_FD_GENERATION: uint,                       |
|         2 = SWIM_FD_INCARNATION: uint                       |
|     },                                                      |
|                                                             |
|               OR/AND                                        |
|                                                             |
|     3 = SWIM_DISSEMINATION: array [                         |
|         map {                                               |
|             0 = SWIM_MEMBER_STATUS: uint,                   |
|                                     enum member_status,     |
|             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
|             2 = SWIM_MEMBER_PORT: uint, port,               |
|             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
|             4 = SWIM_MEMBER_GENERATION: uint,               |
|             5 = SWIM_MEMBER_INCARNATION: uint,              |
|             6 = SWIM_MEMBER_PAYLOAD: bin                    |
|         },                                                  |
|         ...                                                 |
|     ],                                                      |
|                                                             |
|               OR/AND                                        |
|                                                             |
|     1 = SWIM_ANTI_ENTROPY: array [                          |
|         map {                                               |
|             0 = SWIM_MEMBER_STATUS: uint,                   |
|                                     enum member_status,     |
|             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
|             2 = SWIM_MEMBER_PORT: uint, port,               |
|             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
|             4 = SWIM_MEMBER_GENERATION: uint,               |
|             5 = SWIM_MEMBER_INCARNATION: uint,              |
|             6 = SWIM_MEMBER_PAYLOAD: bin                    |
|         },                                                  |
|         ...                                                 |
|     ],                                                      |
|                                                             |
|               OR/AND                                        |
|                                                             |
|     4 = SWIM_QUIT: map {                                    |
|         0 = SWIM_QUIT_GENERATION: uint,                     |
|         1 = SWIM_QUIT_INCARNATION: uint                     |
|     }                                                       |
| }                                                           |
+-------------------------------------------------------------+

Note: SWIM_FD_INCARNATION, SWIM_MEMBER_INCARNATION,
SWIM_MEMBER_PAYLOAD, and SWIM_QUIT_INCARNATION got new values.
This is because 1) SWIM is not released yet, so it is legal to
change the values, and 2) I wanted to emphasize that 'generation'
is the first/upper part of member age, and 'incarnation' is the
second/lower part.
---
 extra/exports               |   1 +
 src/lib/swim/swim.c         |  46 ++++++++--
 src/lib/swim/swim.h         |  18 +++-
 src/lib/swim/swim_proto.c   |  31 +++++--
 src/lib/swim/swim_proto.h   |  38 ++++++--
 src/lua/swim.c              |   3 +-
 src/lua/swim.lua            |  43 +++++++--
 test/swim/swim.result       | 176 +++++++++++++++++++++++++++++-------
 test/swim/swim.test.lua     |  38 ++++++++
 test/unit/swim.c            |  51 +++++++++--
 test/unit/swim.result       |  11 ++-
 test/unit/swim_test_utils.c |  39 ++++++--
 test/unit/swim_test_utils.h |   7 +-
 13 files changed, 414 insertions(+), 88 deletions(-)

diff --git a/extra/exports b/extra/exports
index b8c42c0df..a4b79099d 100644
--- a/extra/exports
+++ b/extra/exports
@@ -108,6 +108,7 @@ swim_iterator_close
 swim_member_uri
 swim_member_uuid
 swim_member_incarnation
+swim_member_generation
 swim_member_payload
 swim_member_ref
 swim_member_unref
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index b55945a34..778d1b291 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -223,11 +223,24 @@ static inline int
 swim_age_cmp(const struct swim_age *l, const struct swim_age *r,
 	     enum swim_ev_mask *diff)
 {
-	if (l->incarnation ==
r->incarnation) { + /* + * The most likely path, checked first and foremost + * explicitly. + */ + if (l->generation == r->generation && + l->incarnation == r->incarnation) { *diff = 0; return 0; } *diff = SWIM_EV_NEW_INCARNATION; + if (l->generation < r->generation) { + *diff |= SWIM_EV_NEW_GENERATION; + return -1; + } + if (l->generation > r->generation) { + *diff |= SWIM_EV_NEW_GENERATION; + return 1; + } return l->incarnation < r->incarnation ? -1 : 1; } @@ -441,6 +454,15 @@ struct swim { * status. */ struct swim_member *self; + /** + * Generation of that instance is set when the latter is + * created. It is actual only until the instance is + * configured. After that the instance can learn a bigger + * own generation from other members. Despite meaning + * in fact a wrong usage of SWIM generations, it is still + * possible. + */ + uint64_t initial_generation; /** * Scheduler of output requests, receiver of incoming * ones. @@ -1663,8 +1685,8 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, uint32_t size; if (swim_decode_map(pos, end, &size, prefix, "root") != 0) return -1; - if (size != 1) { - diag_set(SwimError, "%s map of size 1 is expected", prefix); + if (size != 2) { + diag_set(SwimError, "%s map of size 2 is expected", prefix); return -1; } struct swim_age age; @@ -1673,6 +1695,11 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0) return -1; switch (key) { + case SWIM_QUIT_GENERATION: + if (swim_decode_uint(pos, end, &age.generation, prefix, + "generation") != 0) + return -1; + break; case SWIM_QUIT_INCARNATION: if (swim_decode_uint(pos, end, &age.incarnation, prefix, "incarnation") != 0) @@ -1811,13 +1838,14 @@ swim_event_handler_f(va_list va) struct swim * -swim_new(void) +swim_new(uint64_t generation) { struct swim *swim = (struct swim *) calloc(1, sizeof(*swim)); if (swim == NULL) { diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim"); 
return NULL; } + swim->initial_generation = generation; swim->members = mh_swim_table_new(); if (swim->members == NULL) { free(swim); @@ -1924,7 +1952,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } struct swim_age age; - swim_age_create(&age, 0); + swim_age_create(&age, swim->initial_generation, 0); new_self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &age, NULL, 0); if (new_self == NULL) @@ -2042,7 +2070,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { struct swim_age age; - swim_age_create(&age, 0); + swim_age_create(&age, 0, 0); member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &age, NULL, -1); return member == NULL ? -1 : 0; @@ -2301,6 +2329,12 @@ swim_member_incarnation(const struct swim_member *member) return member->age.incarnation; } +uint64_t +swim_member_generation(const struct swim_member *member) +{ + return member->age.generation; +} + const char * swim_member_payload(const struct swim_member *member, int *size) { diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index a42ace7c6..e5e6ec658 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -66,9 +66,16 @@ enum swim_gc_mode { * Create a new SWIM instance. Do not bind to a port or set any * parameters. Allocation and initialization only. The function * yields. + * @param generation A user-defined upper part of the instance + * age. Age consists of volatile incarnation and this + * number. It is assumed that user persists generation + * value, after restart increments it, and therefore other + * instances can detect the restart. At the same time that + * value is used to refute old attributes left from + * previous lifes of that instance. */ struct swim * -swim_new(void); +swim_new(uint64_t generation); /** Check if a swim instance is configured. 
*/ bool @@ -237,6 +244,10 @@ swim_member_uuid(const struct swim_member *member); uint64_t swim_member_incarnation(const struct swim_member *member); +/** Member's generation. */ +uint64_t +swim_member_generation(const struct swim_member *member); + /** Member's payload. */ const char * swim_member_payload(const struct swim_member *member, int *size); @@ -281,9 +292,10 @@ enum swim_ev_mask { SWIM_EV_NEW_URI = 0b00000100, SWIM_EV_NEW_INCARNATION = 0b00001000, SWIM_EV_NEW_PAYLOAD = 0b00010000, + SWIM_EV_NEW_GENERATION = 0b00100000, /* Shortcut to check for any update. */ - SWIM_EV_UPDATE = 0b00011110, - SWIM_EV_DROP = 0b00100000, + SWIM_EV_UPDATE = 0b00111110, + SWIM_EV_DROP = 0b01000000, }; /** On member event trigger context. */ diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index c42d67c0a..610c41068 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -174,8 +174,11 @@ swim_check_inaddr_not_empty(const struct sockaddr_in *addr, const char *prefix, * the age fields should be marked. 
*/ static inline void -swim_age_bin_create(struct swim_age_bin *bin, uint8_t incarnation_key) +swim_age_bin_create(struct swim_age_bin *bin, uint8_t generation_key, + uint8_t incarnation_key) { + bin->k_generation = generation_key; + bin->m_generation = 0xcf; bin->k_incarnation = incarnation_key; bin->m_incarnation = 0xcf; } @@ -184,6 +187,7 @@ swim_age_bin_create(struct swim_age_bin *bin, uint8_t incarnation_key) static inline void swim_age_bin_fill(struct swim_age_bin *bin, const struct swim_age *age) { + bin->v_generation = mp_bswap_u64(age->generation); bin->v_incarnation = mp_bswap_u64(age->incarnation); } @@ -265,6 +269,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member uuid") != 0) return -1; break; + case SWIM_MEMBER_GENERATION: + if (swim_decode_uint(pos, end, &def->age.generation, prefix, + "member generation") != 0) + return -1; + break; case SWIM_MEMBER_INCARNATION: if (swim_decode_uint(pos, end, &def->age.incarnation, prefix, "member incarnation") != 0) @@ -335,7 +344,8 @@ swim_fd_header_bin_create(struct swim_fd_header_bin *header, header->k_type = SWIM_FD_MSG_TYPE; header->v_type = type; - swim_age_bin_create(&header->age, SWIM_FD_INCARNATION); + swim_age_bin_create(&header->age, SWIM_FD_GENERATION, + SWIM_FD_INCARNATION); swim_age_bin_fill(&header->age, age); } @@ -349,9 +359,9 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, return -1; memset(def, 0, sizeof(*def)); def->type = swim_fd_msg_type_MAX; - if (size != 2) { - diag_set(SwimError, "%s root map should have two keys - "\ - "message type and incarnation", prefix); + if (size != 3) { + diag_set(SwimError, "%s root map should have 3 keys - "\ + "message type, generation, incarnation", prefix); return -1; } for (int i = 0; i < (int) size; ++i) { @@ -370,6 +380,11 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def, } def->type = key; break; + case SWIM_FD_GENERATION: + if (swim_decode_uint(pos, end, 
&def->age.generation, + prefix, "generation") != 0) + return -1; + break; case SWIM_FD_INCARNATION: if (swim_decode_uint(pos, end, &def->age.incarnation, prefix, "incarnation") != 0) @@ -419,7 +434,8 @@ swim_passport_bin_create(struct swim_passport_bin *passport) passport->k_uuid = SWIM_MEMBER_UUID; passport->m_uuid = 0xc4; passport->m_uuid_len = UUID_LEN; - swim_age_bin_create(&passport->age, SWIM_MEMBER_INCARNATION); + swim_age_bin_create(&passport->age, SWIM_MEMBER_GENERATION, + SWIM_MEMBER_INCARNATION); } void @@ -579,7 +595,8 @@ swim_quit_bin_create(struct swim_quit_bin *header, const struct swim_age *age) header->k_quit = SWIM_QUIT; assert(mp_sizeof_map(SWIM_AGE_BIN_SIZE) == 1); header->m_quit = 0x80 | SWIM_AGE_BIN_SIZE; - swim_age_bin_create(&header->age, SWIM_QUIT_INCARNATION); + swim_age_bin_create(&header->age, SWIM_QUIT_GENERATION, + SWIM_QUIT_INCARNATION); swim_age_bin_fill(&header->age, age); } diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index acd266db4..c65b29696 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -72,6 +72,7 @@ enum { * | | * | SWIM_FAILURE_DETECTION: { | * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, | + * | SWIM_FD_GENERATION: uint, | * | SWIM_FD_INCARNATION: uint | * | }, | * | | @@ -83,6 +84,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_GENERATION: uint, | * | SWIM_MEMBER_INCARNATION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | @@ -97,6 +99,7 @@ enum { * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_GENERATION: uint, | * | SWIM_MEMBER_INCARNATION: uint, | * | SWIM_MEMBER_PAYLOAD: bin | * | }, | @@ -106,6 +109,7 @@ enum { * | OR/AND | * | | * | SWIM_QUIT: { | + * | SWIM_QUIT_GENERATION: uint, | * | SWIM_QUIT_INCARNATION: uint | * | } | * | } | @@ -117,6 +121,12 @@ enum { * to reject/rewrite old data. 
*/ struct swim_age { + /** + * Generation is a persistent part of age. Ages are + * compared firstly by this value, and only after by + * incarnation. + */ + uint64_t generation; /** * Incarnation is a volatile fully automatic part, which * is used to refute incorrect and rewrite old information @@ -127,8 +137,9 @@ struct swim_age { /** Initialize age. */ static inline void -swim_age_create(struct swim_age *age, uint64_t incarnation) +swim_age_create(struct swim_age *age, uint64_t generation, uint64_t incarnation) { + age->generation = generation; age->incarnation = incarnation; } @@ -138,7 +149,7 @@ enum { * storing an age should use this size so as to correctly * encode MessagePack map header. */ - SWIM_AGE_BIN_SIZE = 1, + SWIM_AGE_BIN_SIZE = 2, }; /** @@ -146,6 +157,12 @@ enum { * map. */ struct PACKED swim_age_bin { + /** mp_encode_uint(generation key) */ + uint8_t k_generation; + /** mp_encode_uint(64bit generation) */ + uint8_t m_generation; + uint64_t v_generation; + /** mp_encode_uint(incarnation key) */ uint8_t k_incarnation; /** mp_encode_uint(64bit incarnation) */ @@ -229,6 +246,7 @@ enum swim_fd_key { * considered dead, but a newer ping/ack was received from * it. 
*/ + SWIM_FD_GENERATION, SWIM_FD_INCARNATION, }; @@ -245,7 +263,7 @@ extern const char *swim_fd_msg_type_strs[]; struct PACKED swim_fd_header_bin { /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ uint8_t k_header; - /** mp_encode_map(2) */ + /** mp_encode_map(3) */ uint8_t m_header; /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ @@ -253,7 +271,7 @@ struct PACKED swim_fd_header_bin { /** mp_encode_uint(enum swim_fd_msg_type) */ uint8_t v_type; - /** SWIM_FD_INCARNATION */ + /** SWIM_FD_GENERATION, SWIM_FD_INCARNATION */ struct swim_age_bin age; }; @@ -329,6 +347,7 @@ enum swim_member_key { SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, + SWIM_MEMBER_GENERATION, SWIM_MEMBER_INCARNATION, SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, @@ -360,7 +379,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, * date and TTD is > 0. */ struct PACKED swim_passport_bin { - /** mp_encode_map(5 or 6) */ + /** mp_encode_map(6 or 7) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -378,7 +397,7 @@ struct PACKED swim_passport_bin { uint8_t m_uuid_len; uint8_t v_uuid[UUID_LEN]; - /** SWIM_MEMBER_INCARNATION */ + /** SWIM_MEMBER_GENERATION, SWIM_MEMBER_INCARNATION */ struct swim_age_bin age; }; @@ -598,17 +617,18 @@ swim_route_bin_create(struct swim_route_bin *route, enum swim_quit_key { /** Age to ignore old quit messages. */ - SWIM_QUIT_INCARNATION = 0, + SWIM_QUIT_GENERATION = 0, + SWIM_QUIT_INCARNATION }; /** Quit section. Describes voluntary quit from the cluster. 
*/ struct PACKED swim_quit_bin { /** mp_encode_uint(SWIM_QUIT) */ uint8_t k_quit; - /** mp_encode_map(1) */ + /** mp_encode_map(2) */ uint8_t m_quit; - /** SWIM_QUIT_INCARNATION */ + /** SWIM_QUIT_GENERATION, SWIM_QUIT_INCARNATION */ struct swim_age_bin age; }; diff --git a/src/lua/swim.c b/src/lua/swim.c index c3a0a9911..26646f41f 100644 --- a/src/lua/swim.c +++ b/src/lua/swim.c @@ -67,7 +67,8 @@ lua_swim_on_member_event(struct lua_State *L) static int lua_swim_new(struct lua_State *L) { - struct swim *s = swim_new(); + uint64_t generation = luaL_checkuint64(L, 1); + struct swim *s = swim_new(generation); *(struct swim **) luaL_pushcdata(L, ctid_swim_ptr) = s; if (s != NULL) return 1; diff --git a/src/lua/swim.lua b/src/lua/swim.lua index e411a397d..c6e843dbd 100644 --- a/src/lua/swim.lua +++ b/src/lua/swim.lua @@ -30,8 +30,9 @@ ffi.cdef[[ SWIM_EV_NEW_URI = 0b00000100, SWIM_EV_NEW_INCARNATION = 0b00001000, SWIM_EV_NEW_PAYLOAD = 0b00010000, - SWIM_EV_UPDATE = 0b00011110, - SWIM_EV_DROP = 0b00100000, + SWIM_EV_NEW_GENERATION = 0b00100000, + SWIM_EV_UPDATE = 0b00111110, + SWIM_EV_DROP = 0b01000000, }; bool @@ -95,6 +96,9 @@ ffi.cdef[[ uint64_t swim_member_incarnation(const struct swim_member *member); + uint64_t + swim_member_generation(const struct swim_member *member); + const char * swim_member_payload(const struct swim_member *member, int *size); @@ -315,6 +319,11 @@ local function swim_member_incarnation(m) return capi.swim_member_incarnation(ptr) end +local function swim_member_generation(m) + local ptr = swim_check_member(m, 'member:generation()') + return capi.swim_member_generation(ptr) +end + local function swim_member_is_dropped(m) local ptr = swim_check_member(m, 'member:is_dropped()') return capi.swim_member_is_dropped(ptr) @@ -367,9 +376,10 @@ local function swim_member_payload(m) -- Both the age and the flag are needed. Age is not enough, -- because a new age can be disseminated earlier than a new -- payload. For example, via ACK messages. 
- local key1 = capi.swim_member_incarnation(ptr) - local key2 = capi.swim_member_is_payload_up_to_date(ptr) - if key1 == m.p_key1 and key2 == m.p_key2 then + local key1 = capi.swim_member_generation(ptr) + local key2 = capi.swim_member_incarnation(ptr) + local key3 = capi.swim_member_is_payload_up_to_date(ptr) + if key1 == m.p_key1 and key2 == m.p_key2 and key3 == m.p_key3 then return m.p end local cdata, size = swim_member_payload_raw(ptr) @@ -386,6 +396,7 @@ local function swim_member_payload(m) rawset(m, 'p', result) rawset(m, 'p_key1', key1) rawset(m, 'p_key2', key2) + rawset(m, 'p_key3', key3) return result end @@ -412,6 +423,7 @@ local function swim_member_serialize(m) status = swim_member_status(m), uuid = swim_member_uuid(m), uri = swim_member_uri(m), + generation = swim_member_generation(m), incarnation = swim_member_incarnation(m), -- There are many ways to interpret a payload, and it is -- not a job of a serialization method. Only binary size @@ -427,6 +439,7 @@ local swim_member_mt = { uuid = swim_member_uuid, uri = swim_member_uri, incarnation = swim_member_incarnation, + generation = swim_member_generation, payload_cdata = swim_member_payload_cdata, payload_str = swim_member_payload_str, payload = swim_member_payload, @@ -722,6 +735,9 @@ local swim_member_event_index = { is_new_uri = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0 end, + is_new_generation = function(self) + return bit.band(self[1], capi.SWIM_EV_NEW_GENERATION) ~= 0 + end, is_new_incarnation = function(self) return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0 end, @@ -910,10 +926,23 @@ local cache_table_mt = { __mode = 'v' } -- -- Create a new SWIM instance, and configure if @a cfg is --- provided. +-- provided. @a cfg can contain one non-dynamic parameter - +-- generation. It can't be changed later with swim:cfg(). 
-- local function swim_new(cfg) - local ptr = internal.swim_new() + local generation = 0 + if cfg and type(cfg) == 'table' and cfg.generation ~= nil then + generation = cfg.generation + if type(generation) ~= 'number' or generation < 0 or + generation ~= math.floor(generation) then + return error('swim.new: expected non-negative integer generation') + end + cfg = table.copy(cfg) + -- swim:cfg() should not see that parameter. It takes only + -- dynamic ones. + cfg.generation = nil + end + local ptr = internal.swim_new(generation) if ptr == nil then return nil, box.error.last() end diff --git a/test/swim/swim.result b/test/swim/swim.result index cceee2595..27acb0983 100644 --- a/test/swim/swim.result +++ b/test/swim/swim.result @@ -305,11 +305,12 @@ s1:self():uuid() ... old_self --- -- uri: 127.0.0.1: +- generation: 0 + payload_size: 0 status: left incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1: ... -- Can't remove self. s1:remove_member(uuid(3)) @@ -389,11 +390,12 @@ s = s1:self() ... s --- -- uri: 127.0.0.1: +- generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1: ... s:status() --- @@ -488,11 +490,12 @@ s1:member_by_uuid(uuid(2)) -- UUID can be cdata. s1:member_by_uuid(s:uuid()) --- -- uri: 127.0.0.1: +- generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1: ... s1:quit() --- @@ -776,11 +779,12 @@ s.pairs() iterate() --- - - - 00000000-0000-1000-8000-000000000001 - - uri: 127.0.0.1: + - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1: ... 
s:add_member({uuid = uuid(2), uri = uri()}) --- @@ -789,17 +793,19 @@ s:add_member({uuid = uuid(2), uri = uri()}) iterate() --- - - - 00000000-0000-1000-8000-000000000002 - - uri: 127.0.0.1: + - generation: 0 + payload_size: 0 status: alive incarnation: 0 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1: - - 00000000-0000-1000-8000-000000000001 - - uri: 127.0.0.1: + - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1: ... s:add_member({uuid = uuid(3), uri = uri()}) --- @@ -808,23 +814,26 @@ s:add_member({uuid = uuid(3), uri = uri()}) iterate() --- - - - 00000000-0000-1000-8000-000000000001 - - uri: 127.0.0.1: + - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1: - - 00000000-0000-1000-8000-000000000003 - - uri: 127.0.0.1: + - generation: 0 + payload_size: 0 status: alive incarnation: 0 uuid: 00000000-0000-1000-8000-000000000003 - payload_size: 0 + uri: 127.0.0.1: - - 00000000-0000-1000-8000-000000000002 - - uri: 127.0.0.1: + - generation: 0 + payload_size: 0 status: alive incarnation: 0 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1: ... s:delete() --- @@ -962,11 +971,12 @@ s2 = s:member_by_uuid(uuid(2)) ... s2 --- -- uri: 127.0.0.1: +- generation: 0 + payload_size: 0 status: alive incarnation: 0 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1: ... -- Next lookups return the same member table. s2_old_uri = s2:uri() @@ -1208,11 +1218,12 @@ while s1:member_by_uuid(s2:self():uuid()) == nil do fiber.sleep(0.01) end ... s2:member_by_uuid(s1:self():uuid()) --- -- uri: 127.0.0.1: +- generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1: ... s1:delete() --- @@ -1285,11 +1296,12 @@ while #m_list < 1 do fiber.sleep(0) end ... 
m_list --- -- - uri: 127.0.0.1: +- - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 0 + uri: 127.0.0.1: ... e_list --- @@ -1326,11 +1338,12 @@ while s1:size() ~= 2 do fiber.sleep(0.01) end -- sleeps. m_list --- -- - uri: 127.0.0.1: +- - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1: ... e_list --- @@ -1368,21 +1381,24 @@ while #m_list ~= 3 do fiber.sleep(0.01) end ... m_list --- -- - uri: 127.0.0.1: +- - generation: 0 + payload_size: 0 status: alive incarnation: 1 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 - - uri: 127.0.0.1: + uri: 127.0.0.1: + - generation: 0 + payload_size: 8 status: alive incarnation: 2 uuid: 00000000-0000-1000-8000-000000000001 + uri: 127.0.0.1: + - generation: 0 payload_size: 8 - - uri: 127.0.0.1: status: alive incarnation: 2 uuid: 00000000-0000-1000-8000-000000000001 - payload_size: 8 + uri: 127.0.0.1: ... e_list --- @@ -1425,16 +1441,18 @@ fiber.sleep(0) -- Two events - status update to 'left', and 'drop'. m_list --- -- - uri: 127.0.0.1: +- - generation: 0 + payload_size: 0 status: left incarnation: 1 uuid: 00000000-0000-1000-8000-000000000002 + uri: 127.0.0.1: + - generation: 0 payload_size: 0 - - uri: 127.0.0.1: status: left incarnation: 1 uuid: 00000000-0000-1000-8000-000000000002 - payload_size: 0 + uri: 127.0.0.1: ... e_list --- @@ -1484,6 +1502,96 @@ ctx_list s1:delete() --- ... +-- +-- gh-4280: 'generation' counter to detect restarts and refute +-- information left from previous lifes of a SWIM instance. +-- +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0}) +--- +... +s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01}) +--- +... +s2:add_member({uuid = uuid(1), uri = s1:self():uri()}) +--- +- true +... +s1_view = s2:member_by_uuid(uuid(1)) +--- +... +s1:set_payload('payload 1') +--- +- true +... 
+while not s1_view:payload() do fiber.sleep(0.1) end +--- +... +s1_view:payload() +--- +- payload 1 +... +s1:self():incarnation() +--- +- 2 +... +s1:self():generation() +--- +- 0 +... +-- Now S2 knows S1's payload as 'payload 1'. +is_new_generation = false +--- +... +_ = s2:on_member_event(function(m, e) is_new_generation = is_new_generation or e:is_new_generation() end) +--- +... +s1:delete() +--- +... +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1}) +--- +... +s1:add_member({uuid = uuid(2), uri = s2:self():uri()}) +--- +- true +... +s1:set_payload('payload 2') +--- +- true +... +s1:self():incarnation() +--- +- 2 +... +-- Without the new generation S2 would believe that S1's payload +-- is still 'payload 1'. Because 'payload 1' and 'payload 2' were +-- disseminated with the same incarnation. +s1:self():generation() +--- +- 1 +... +while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end +--- +... +s1_view:payload() +--- +- payload 2 +... +is_new_generation +--- +- true +... +-- Generation is static parameter. +s1:cfg({generation = 5}) +--- +- error: 'swim:cfg: unknown option generation' +... +s1:delete() +--- +... +s2:delete() +--- +... test_run:cmd("clear filter") --- - true diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua index 576219b4d..886c00af5 100644 --- a/test/swim/swim.test.lua +++ b/test/swim/swim.test.lua @@ -499,4 +499,42 @@ ctx_list s1:delete() +-- +-- gh-4280: 'generation' counter to detect restarts and refute +-- information left from previous lifes of a SWIM instance. 
+-- +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0}) +s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01}) +s2:add_member({uuid = uuid(1), uri = s1:self():uri()}) +s1_view = s2:member_by_uuid(uuid(1)) +s1:set_payload('payload 1') +while not s1_view:payload() do fiber.sleep(0.1) end +s1_view:payload() +s1:self():incarnation() +s1:self():generation() + +-- Now S2 knows S1's payload as 'payload 1'. + +is_new_generation = false +_ = s2:on_member_event(function(m, e) is_new_generation = is_new_generation or e:is_new_generation() end) +s1:delete() +s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1}) +s1:add_member({uuid = uuid(2), uri = s2:self():uri()}) +s1:set_payload('payload 2') +s1:self():incarnation() +-- Without the new generation S2 would believe that S1's payload +-- is still 'payload 1'. Because 'payload 1' and 'payload 2' were +-- disseminated with the same incarnation. +s1:self():generation() + +while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end +s1_view:payload() +is_new_generation + +-- Generation is static parameter. 
+s1:cfg({generation = 5})
+
+s1:delete()
+s2:delete()
+
 test_run:cmd("clear filter")
diff --git a/test/unit/swim.c b/test/unit/swim.c
index bffc0985d..4371b56f0 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -133,7 +133,7 @@ swim_test_cfg(void)
 {
 	swim_start_test(16);
 
-	struct swim *s = swim_new();
+	struct swim *s = swim_new(0);
 	assert(s != NULL);
 	is(swim_cfg(s, NULL, -1, -1, -1, NULL), -1, "first cfg failed - no URI");
 	ok(swim_error_check_match("mandatory"), "diag says 'mandatory'");
@@ -149,7 +149,7 @@ swim_test_cfg(void)
 	is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\
 	   "URI");
 
-	struct swim *s2 = swim_new();
+	struct swim *s2 = swim_new(0);
 	assert(s2 != NULL);
 	const char *bad_uri1 = "127.1.1.1.1.1.1:1";
 	const char *bad_uri2 = "google.com:1";
@@ -391,16 +391,17 @@ swim_test_refute(void)
 	fail_if(swim_cluster_wait_status(cluster, 0, 1,
 					 MEMBER_SUSPECTED, 4) != 0);
 	swim_cluster_set_drop(cluster, 1, 0);
-	is(swim_cluster_wait_age(cluster, 1, 1, 1, 1), 0,
+	is(swim_cluster_wait_age(cluster, 1, 1, 0, 1, 1), 0,
 	   "S2 increments its own incarnation to refute its suspicion");
-	is(swim_cluster_wait_age(cluster, 0, 1, 1, 1), 0,
+	is(swim_cluster_wait_age(cluster, 0, 1, 0, 1, 1), 0,
 	   "new incarnation has reached S1 with a next round message");
 
+	fail_if(swim_cluster_member_generation(cluster, 1, 1) != 0);
 	swim_cluster_restart_node(cluster, 1);
 	is(swim_cluster_member_incarnation(cluster, 1, 1), 0,
 	   "after restart S2's incarnation is 0 again");
-	is(swim_cluster_wait_age(cluster, 1, 1, 1, 1), 0,
-	   "S2 learned its old bigger incarnation 1 from S0");
+	is(swim_cluster_member_generation(cluster, 1, 1), 1,
+	   "but generation is new");
 
 	swim_cluster_delete(cluster);
 	swim_finish_test();
@@ -527,7 +528,7 @@ swim_test_quit(void)
 	 * old LEFT status.
 	 */
 	swim_cluster_restart_node(cluster, 0);
-	is(swim_cluster_wait_age(cluster, 0, 0, 1, 2), 0,
+	is(swim_cluster_wait_age(cluster, 0, 0, 1, 0, 2), 0,
 	   "quited member S1 has returned and refuted the old status");
 	fail_if(swim_cluster_wait_fullmesh(cluster, 2) != 0);
 	/*
@@ -557,7 +558,7 @@ swim_test_quit(void)
 
 	/* Now allow S2 to get the 'self-quit' message. */
 	swim_cluster_unblock_io(cluster, 1);
-	is(swim_cluster_wait_age(cluster, 1, 1, 2, 0), 0, "S2 finally got "\
+	is(swim_cluster_wait_age(cluster, 1, 1, 1, 1, 0), 0, "S2 finally got "\
 	   "'quit' message from S1, but with its 'own' UUID - refute it")
 
 	swim_cluster_delete(cluster);
@@ -1089,7 +1090,7 @@ swim_test_triggers(void)
 	swim_member_unref(tctx.ctx.member);
 
 	/* Check that recfg fires incarnation update trigger. */
-	s1 = swim_new();
+	s1 = swim_new(0);
 	struct tt_uuid uuid = uuid_nil;
 	uuid.time_low = 1;
 	fail_if(swim_cfg(s1, "127.0.0.1:1", -1, -1, -1, &uuid) != 0);
@@ -1110,10 +1111,39 @@ swim_test_triggers(void)
 	swim_finish_test();
 }
 
+static void
+swim_test_generation(void)
+{
+	swim_start_test(3);
+
+	struct swim_cluster *cluster = swim_cluster_new(2);
+	swim_cluster_interconnect(cluster, 0, 1);
+
+	const char *p1 = "payload 1";
+	int p1_size = strlen(p1);
+	swim_cluster_member_set_payload(cluster, 0, p1, p1_size);
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, p1, p1_size, 1), 0,
+	   "S1 disseminated its payload to S2");
+
+	swim_cluster_restart_node(cluster, 0);
+	const char *p2 = "payload 2";
+	int p2_size = strlen(p2);
+	swim_cluster_member_set_payload(cluster, 0, p2, p2_size);
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, p2, p2_size, 2), 0,
+	   "S1 restarted and set another payload. Without generation it could "\
+	   "lead to never disseminated new payload.");
+	is(swim_cluster_member_generation(cluster, 1, 0), 1,
+	   "S2 sees new generation of S1");
+
+	swim_cluster_delete(cluster);
+
+	swim_finish_test();
+}
+
 static int
 main_f(va_list ap)
 {
-	swim_start_test(21);
+	swim_start_test(22);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -1140,6 +1170,7 @@ main_f(va_list ap)
 	swim_test_encryption();
 	swim_test_slow_net();
 	swim_test_triggers();
+	swim_test_generation();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 2968a2da7..bad3c30d0 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..21
+1..22
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -89,7 +89,7 @@ ok 7 - subtests
     ok 1 - S2 increments its own incarnation to refute its suspicion
     ok 2 - new incarnation has reached S1 with a next round message
     ok 3 - after restart S2's incarnation is 0 again
-    ok 4 - S2 learned its old bigger incarnation 1 from S0
+    ok 4 - but generation is new
 ok 8 - subtests
 	*** swim_test_refute: done ***
 	*** swim_test_basic_gossip ***
@@ -227,4 +227,11 @@ ok 20 - subtests
     ok 22 - local URI update warns about incarnation update
 ok 21 - subtests
 	*** swim_test_triggers: done ***
+	*** swim_test_generation ***
+    1..3
+    ok 1 - S1 disseminated its payload to S2
+    ok 2 - S1 restarted and set another payload. Without generation it could lead to never disseminated new payload.
+    ok 3 - S2 sees new generation of S1
+ok 22 - subtests
+	*** swim_test_generation: done ***
 	*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index e646c3a47..228b2f752 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -157,6 +157,11 @@ struct swim_node {
 	 * that instance.
 	 */
 	struct tt_uuid uuid;
+	/**
+	 * Generation counter. Persisted for restarts, when SWIM
+	 * is explicitly deleted before restart.
+	 */
+	uint64_t generation;
 	/**
 	 * Filter to drop packets with a certain probability
 	 * from/to a specified direction.
@@ -212,7 +217,8 @@ swim_test_event_cb(struct trigger *trigger, void *event)
 static inline void
 swim_node_create(struct swim_node *n, int id)
 {
-	n->swim = swim_new();
+	n->generation = 0;
+	n->swim = swim_new(n->generation);
 	assert(n->swim != NULL);
 	struct trigger *t = (struct trigger *) malloc(sizeof(*t));
 	trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free);
@@ -259,7 +265,8 @@ swim_cluster_new(int size)
 void
 swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout)
 {
-	swim_cluster_set_cfg(cluster, swim_cfg, NULL, -1, ack_timeout, -1, NULL);
+	swim_cluster_set_cfg(cluster, swim_cfg, NULL, -1, ack_timeout, -1,
+			     NULL);
 	cluster->ack_timeout = ack_timeout;
 }
 
@@ -359,6 +366,17 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
 	return swim_member_incarnation(m);
 }
 
+uint64_t
+swim_cluster_member_generation(struct swim_cluster *cluster, int node_id,
+			       int member_id)
+{
+	const struct swim_member *m =
+		swim_cluster_member_view(cluster, node_id, member_id);
+	if (m == NULL)
+		return UINT64_MAX;
+	return swim_member_generation(m);
+}
+
 const char *
 swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
 			    int member_id, int *size)
@@ -402,7 +420,7 @@ swim_cluster_restart_node(struct swim_cluster *cluster, int i)
 					&n->uuid));
 		swim_delete(s);
 	}
-	s = swim_new();
+	s = swim_new(++n->generation);
 	assert(s != NULL);
 	int rc = swim_cfg(s, uri, -1, cluster->ack_timeout, cluster->gc_mode,
 			  &n->uuid);
@@ -713,6 +731,7 @@ struct swim_member_template {
 	 * target.
 	 */
 	bool need_check_age;
+	uint64_t generation;
 	uint64_t incarnation;
 	/**
 	 * True, if the payload should be checked to be equal to
@@ -751,9 +770,10 @@ swim_member_template_set_status(struct swim_member_template *t,
  */
 static inline void
 swim_member_template_set_age(struct swim_member_template *t,
-			     uint64_t incarnation)
+			     uint64_t generation, uint64_t incarnation)
 {
 	t->need_check_age = true;
+	t->generation = generation;
 	t->incarnation = incarnation;
 }
 
@@ -778,22 +798,25 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data)
 	const struct swim_member *m =
 		swim_cluster_member_view(cluster, t->node_id, t->member_id);
 	enum swim_member_status status;
-	uint64_t incarnation;
+	uint64_t incarnation, generation;
 	const char *payload;
 	int payload_size;
 	if (m != NULL) {
 		status = swim_member_status(m);
+		generation = swim_member_generation(m);
 		incarnation = swim_member_incarnation(m);
 		payload = swim_member_payload(m, &payload_size);
 	} else {
 		status = swim_member_status_MAX;
+		generation = 0;
 		incarnation = 0;
 		payload = NULL;
 		payload_size = 0;
 	}
 	if (t->need_check_status && status != t->status)
 		return false;
-	if (t->need_check_age && incarnation != t->incarnation)
+	if (t->need_check_age && (generation != t->generation ||
+				  incarnation != t->incarnation))
 		return false;
 	if (t->need_check_payload &&
 	    (payload_size != t->payload_size ||
@@ -848,11 +871,11 @@ swim_cluster_wait_status(struct swim_cluster *cluster, int node_id,
 
 int
 swim_cluster_wait_age(struct swim_cluster *cluster, int node_id, int member_id,
-		      uint64_t incarnation, double timeout)
+		      uint64_t generation, uint64_t incarnation, double timeout)
 {
 	struct swim_member_template t;
 	swim_member_template_create(&t, node_id, member_id);
-	swim_member_template_set_age(&t, incarnation);
+	swim_member_template_set_age(&t, generation, incarnation);
 	return swim_wait_timeout(timeout, cluster, swim_loop_check_member, &t);
 }
 
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 309636b87..11adc017f 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -160,6 +160,10 @@ uint64_t
 swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
 				int member_id);
 
+uint64_t
+swim_cluster_member_generation(struct swim_cluster *cluster, int node_id,
+			       int member_id);
+
 const char *
 swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
 			    int member_id, int *size);
@@ -224,7 +228,8 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id,
  */
 int
 swim_cluster_wait_age(struct swim_cluster *cluster, int node_id, int member_id,
-		      uint64_t incarnation, double timeout);
+		      uint64_t generation, uint64_t incarnation,
+		      double timeout);
 
 /**
  * Wait until a member with id @a member_id is seen with
-- 
2.20.1 (Apple Git-117)
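
A note on the "age" the test helpers now wait for: it is a pair of generation and incarnation rather than a bare incarnation. The sketch below shows one way such a pair could be ordered so that a restarted instance overrides data from its previous life. It assumes a lexicographic comparison with generation first. `struct age`, `age_cmp()` and `age_example()` are hypothetical names used for illustration only and are not taken from the patch.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical pair, not the patch's API: illustrative names only. */
struct age {
	uint64_t generation;
	uint64_t incarnation;
};

/*
 * Assumed ordering: generation is compared first, incarnation
 * only breaks ties. Returns <0, 0 or >0 like memcmp().
 */
static inline int
age_cmp(const struct age *l, const struct age *r)
{
	if (l->generation != r->generation)
		return l->generation < r->generation ? -1 : 1;
	if (l->incarnation != r->incarnation)
		return l->incarnation < r->incarnation ? -1 : 1;
	return 0;
}

/*
 * The situation covered by swim_test_generation(), reduced to
 * numbers: the incarnation is the same before and after the
 * restart, so only the generation tells the two lives apart.
 */
static inline void
age_example(void)
{
	struct age before_restart = {0, 1};
	struct age after_restart = {1, 1};
	assert(age_cmp(&after_restart, &before_restart) > 0);
}
```

With incarnation alone the two values in `age_example()` would compare equal, which is the case where the new payload would never be accepted by other members.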