From: Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH v2 2/2] swim: introduce incarnation.generation
Date: Sat, 22 Jun 2019 23:18:21 +0200
Message-Id: <5efc47df21e3ebf8ab62047ee41db400977c33b0.1561238125.git.v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org

SWIM uses incarnation to refute old information, but that is not
enough when restarts are possible. If an instance restarts, its
incarnation is reset to 0. After several quick local updates it
reaches N. However, other instances may still know the incarnation
of this instance as N from its previous life, with different
information attached. They will never accept the new version of the
data, because the version they already have is considered just as
actual.

So incarnation alone is not enough: it needs a persistent part. This
patch introduces it and calls it 'generation'.
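
A minimal sketch of the intended ordering, assuming a hypothetical
standalone `struct incarnation` that mirrors the patch's
`struct swim_incarnation` (it is not the library code): generation is
compared first and version second, so an update from a restarted
instance wins even when its version equals a stale one.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical mirror of struct swim_incarnation from this patch:
 * 'generation' plays the role of the upper 64 bits and 'version'
 * the lower 64 bits of one 128-bit number.
 */
struct incarnation {
	uint64_t generation;
	uint64_t version;
};

/* Lexicographic comparison: returns -1, 0 or 1. */
static int
incarnation_cmp(const struct incarnation *l, const struct incarnation *r)
{
	if (l->generation != r->generation)
		return l->generation < r->generation ? -1 : 1;
	if (l->version != r->version)
		return l->version < r->version ? -1 : 1;
	return 0;
}

/* A receiver accepts gossip only if it is strictly newer. */
static bool
incarnation_is_newer(const struct incarnation *known,
		     const struct incarnation *received)
{
	return incarnation_cmp(known, received) < 0;
}
```

With version alone, {0, 2} remembered from the previous life and
{1, 2} sent after a restart would look equal, and the new data would
be ignored. With generation compared first, the restarted instance's
information is strictly newer.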

As an additional benefit, generation makes it possible to react to
instance restarts in user-defined triggers.

Closes #4280

@TarantoolBot document
Title: SWIM generation

Incarnation is now a two-part value {generation, version}.

Version is exactly what was called 'incarnation' in the original
SWIM paper and before this patch. It is a volatile, automatically
managed number used to refute false gossip and to update information
on remote nodes.

Generation is a new, persistent part of incarnation that allows users
to refute old pieces of information left over from previous lives of
an instance. It is a static attribute, set when a SWIM instance is
created, and it can't be changed without restarting the instance.

One can think of incarnation as a 128-bit unsigned integer, where the
upper 64 bits are static and persistent, while the lower 64 bits are
volatile.

Generation not only helps to override old information, but can also
be used to detect restarts in user-defined triggers, because it can
change only when a SWIM instance is recreated.

How to set generation:
```Lua
swim = require('swim')
s = swim.new({generation = })
```
Generation can't be set in `swim:cfg`. If it is omitted, 0 is used by
default. But be careful: if the instance is not being started for the
first time, it is safer to use a new generation. Ideally, it should be
persisted somehow: in a file, in a space, or in a global service.

How incarnation updates have changed:
```Lua
swim = require('swim')
s = swim.new()
s:on_member_event(function(m, e)
    if e:is_new_incarnation() then
        if e:is_new_generation() then
            -- Process restart.
        end
        if e:is_new_version() then
            -- Process version update. It means
            -- the member is somehow changed.
        end
    end
end)
```
Note that `is_new_incarnation` is now a shortcut for checking an
update of generation, or version, or both.

Method `member:incarnation()` is changed. Now it returns a cdata
object with attributes `version` and `generation`.
Usage:
```Lua
incarnation = member:incarnation()
tarantool> incarnation.version
---
- 15
...
tarantool> incarnation.generation
---
- 2
...
```
These objects can be compared using comparison operators:
```Lua
member1:incarnation() < member2:incarnation()
member1:incarnation() >= member2:incarnation()
-- Any operator works: ==, <, >, <=, >=, ~=.
```
When printed, an incarnation shows a string with both generation and
version.

The binary protocol is updated. The Protocol logic section now looks
like this:
```
+-------------------Protocol logic section--------------------+
| map {                                                       |
|     0 = SWIM_SRC_UUID: 16 byte UUID,                        |
|                                                             |
|                             AND                             |
|                                                             |
|     2 = SWIM_FAILURE_DETECTION: map {                       |
|         0 = SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,  |
|         1 = SWIM_FD_GENERATION: uint,                       |
|         2 = SWIM_FD_VERSION: uint                           |
|     },                                                      |
|                                                             |
|                           OR/AND                            |
|                                                             |
|     3 = SWIM_DISSEMINATION: array [                         |
|         map {                                               |
|             0 = SWIM_MEMBER_STATUS: uint,                   |
|                                     enum member_status,     |
|             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
|             2 = SWIM_MEMBER_PORT: uint, port,               |
|             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
|             4 = SWIM_MEMBER_GENERATION: uint,               |
|             5 = SWIM_MEMBER_VERSION: uint,                  |
|             6 = SWIM_MEMBER_PAYLOAD: bin                    |
|         },                                                  |
|         ...                                                 |
|     ],                                                      |
|                                                             |
|                           OR/AND                            |
|                                                             |
|     1 = SWIM_ANTI_ENTROPY: array [                          |
|         map {                                               |
|             0 = SWIM_MEMBER_STATUS: uint,                   |
|                                     enum member_status,     |
|             1 = SWIM_MEMBER_ADDRESS: uint, ip,              |
|             2 = SWIM_MEMBER_PORT: uint, port,               |
|             3 = SWIM_MEMBER_UUID: 16 byte UUID,             |
|             4 = SWIM_MEMBER_GENERATION: uint,               |
|             5 = SWIM_MEMBER_VERSION: uint,                  |
|             6 = SWIM_MEMBER_PAYLOAD: bin                    |
|         },                                                  |
|         ...                                                 |
|     ],                                                      |
|                                                             |
|                           OR/AND                            |
|                                                             |
|     4 = SWIM_QUIT: map {                                    |
|         0 = SWIM_QUIT_GENERATION: uint,                     |
|         1 = SWIM_QUIT_VERSION: uint                         |
|     }                                                       |
| }                                                           |
+-------------------------------------------------------------+
```
Note: SWIM_FD_INCARNATION, SWIM_MEMBER_INCARNATION, and
SWIM_QUIT_INCARNATION are gone. Incarnation is now sent in two parts:
version and generation.

SWIM_MEMBER_PAYLOAD got a new key value (6 instead of 5).
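
For illustration only, the sketch below hand-encodes the SWIM_QUIT
section from the diagram above in plain C. It assumes the layout used
by `struct swim_quit_bin` in this patch: a one-byte key, a fixmap
header for two entries, and each incarnation part stored as a
MessagePack uint64 (marker 0xcf followed by 8 big-endian bytes). The
function names are hypothetical; the real code fills packed structs
and uses `mp_bswap_u64`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Key values taken from the protocol diagram above. */
enum { QUIT_KEY = 4, QUIT_GENERATION_KEY = 0, QUIT_VERSION_KEY = 1 };

/* Write a MessagePack uint64: 0xcf marker + 8 big-endian bytes. */
static uint8_t *
encode_u64(uint8_t *pos, uint64_t value)
{
	*pos++ = 0xcf;
	for (int shift = 56; shift >= 0; shift -= 8)
		*pos++ = (uint8_t)(value >> shift);
	return pos;
}

/*
 * Encode SWIM_QUIT: map {0 = generation, 1 = version}.
 * 'buf' must hold at least 22 bytes. Returns the encoded size.
 */
static size_t
encode_quit(uint8_t *buf, uint64_t generation, uint64_t version)
{
	uint8_t *pos = buf;
	*pos++ = QUIT_KEY;		/* Small keys are positive fixints. */
	*pos++ = 0x80 | 2;		/* fixmap with 2 entries. */
	*pos++ = QUIT_GENERATION_KEY;
	pos = encode_u64(pos, generation);
	*pos++ = QUIT_VERSION_KEY;
	pos = encode_u64(pos, version);
	return (size_t)(pos - buf);
}
```

The encoding always takes 22 bytes, because both parts use the fixed
9-byte uint64 form regardless of their value. That fixed width is what
lets the patch keep the incarnation in a fixed-size packed struct.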

These protocol changes are legal because 1) SWIM is not released yet,
so its protocol is still mutable, and 2) I wanted to emphasize that
'generation' is the first/upper part of incarnation, and 'version' is
the second/lower part.
---
 src/lib/swim/swim.c           |  36 +++++++--
 src/lib/swim/swim.h           |  13 ++--
 src/lib/swim/swim_constants.h |  10 ++-
 src/lib/swim/swim_proto.c     |  23 +++++-
 src/lib/swim/swim_proto.h     |  29 +++++--
 src/lua/swim.c                |   3 +-
 src/lua/swim.lua              |  40 +++++++---
 test/swim/swim.result         | 143 +++++++++++++++++++-------
 test/swim/swim.test.lua       |  36 +++++++++
 test/unit/swim.c              |  68 ++++++++++++----
 test/unit/swim.result         |  17 +++-
 test/unit/swim_test_utils.c   |  21 ++---
 test/unit/swim_test_utils.h   |   4 +-
 13 files changed, 352 insertions(+), 91 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 2c3cfa9bc..bb9e9f519 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -222,11 +222,21 @@ swim_incarnation_diff(const struct swim_incarnation *l,
 		      const struct swim_incarnation *r,
 		      enum swim_ev_mask *diff)
 {
-	if (l->version == r->version) {
+	if (l->generation == r->generation &&
+	    l->version == r->version) {
 		*diff = 0;
 		return 0;
 	}
 	*diff = SWIM_EV_NEW_VERSION;
+	if (l->generation < r->generation) {
+		*diff |= SWIM_EV_NEW_GENERATION;
+		return -1;
+	}
+	if (l->generation > r->generation) {
+		*diff |= SWIM_EV_NEW_GENERATION;
+		return 1;
+	}
+	assert(l->version != r->version);
 	return l->version < r->version ? -1 : 1;
 }
 
@@ -483,6 +493,15 @@ struct swim {
 	struct ev_timer wait_ack_tick;
 	/** GC state saying how to remove dead members. */
 	enum swim_gc_mode gc_mode;
+	/**
+	 * Generation of that instance is set when the latter is
+	 * created. It is actual only until the instance is
+	 * configured. After that the instance can learn a bigger
+	 * own generation from other members. Despite meaning
+	 * in fact a wrong usage of SWIM generations, it is still
+	 * possible.
+	 */
+	uint64_t initial_generation;
 	/**
 	 *
 	 *                 Dissemination component
@@ -1683,12 +1702,17 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
 		return -1;
 	}
 	struct swim_incarnation incarnation;
-	swim_incarnation_create(&incarnation, 0);
+	swim_incarnation_create(&incarnation, 0, 0);
 	for (uint32_t i = 0; i < size; ++i) {
 		uint64_t tmp;
 		if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0)
 			return -1;
 		switch (tmp) {
+		case SWIM_QUIT_GENERATION:
+			if (swim_decode_uint(pos, end, &incarnation.generation,
+					     prefix, "generation") != 0)
+				return -1;
+			break;
 		case SWIM_QUIT_VERSION:
 			if (swim_decode_uint(pos, end, &incarnation.version,
 					     prefix, "version") != 0)
@@ -1829,13 +1853,14 @@ swim_event_handler_f(va_list va)
 
 
 struct swim *
-swim_new(void)
+swim_new(uint64_t generation)
 {
 	struct swim *swim = (struct swim *) calloc(1, sizeof(*swim));
 	if (swim == NULL) {
 		diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim");
 		return NULL;
 	}
+	swim->initial_generation = generation;
 	swim->members = mh_swim_table_new();
 	if (swim->members == NULL) {
 		free(swim);
@@ -1941,7 +1966,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			return -1;
 		}
 		struct swim_incarnation incarnation;
-		swim_incarnation_create(&incarnation, 0);
+		swim_incarnation_create(&incarnation, swim->initial_generation,
+					0);
 		swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
 					     &incarnation, NULL, 0);
 		if (swim->self == NULL)
@@ -2062,7 +2088,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 	struct swim_member *member = swim_find_member(swim, uuid);
 	if (member == NULL) {
 		struct swim_incarnation inc;
-		swim_incarnation_create(&inc, 0);
+		swim_incarnation_create(&inc, 0, 0);
 		member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, &inc,
 					 NULL, -1);
 		return member == NULL ? -1 : 0;
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index b8e44515e..4565eb976 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -68,7 +68,7 @@ enum swim_gc_mode {
  * yields.
  */
 struct swim *
-swim_new(void);
+swim_new(uint64_t generation);
 
 /** Check if a swim instance is configured. */
 bool
@@ -279,16 +279,17 @@ enum swim_ev_mask {
 	SWIM_EV_NEW             = 0b00000001,
 	SWIM_EV_NEW_STATUS      = 0b00000010,
 	SWIM_EV_NEW_URI         = 0b00000100,
-	SWIM_EV_NEW_VERSION     = 0b00001000,
+	SWIM_EV_NEW_GENERATION  = 0b00001000,
+	SWIM_EV_NEW_VERSION     = 0b00010000,
 	/*
 	 * Shortcut to check for update of any part of
 	 * incarnation.
 	 */
-	SWIM_EV_NEW_INCARNATION = 0b00001000,
-	SWIM_EV_NEW_PAYLOAD     = 0b00010000,
+	SWIM_EV_NEW_INCARNATION = 0b00011000,
+	SWIM_EV_NEW_PAYLOAD     = 0b00100000,
 	/* Shortcut to check for any update. */
-	SWIM_EV_UPDATE          = 0b00011110,
-	SWIM_EV_DROP            = 0b00100000,
+	SWIM_EV_UPDATE          = 0b00111110,
+	SWIM_EV_DROP            = 0b01000000,
 };
 
 /** On member event trigger context. */
diff --git a/src/lib/swim/swim_constants.h b/src/lib/swim/swim_constants.h
index f105040c6..cee109c8c 100644
--- a/src/lib/swim/swim_constants.h
+++ b/src/lib/swim/swim_constants.h
@@ -63,6 +63,12 @@ extern const char *swim_member_status_strs[];
  * actual.
  */
 struct swim_incarnation {
+	/**
+	 * Generation is a persistent part of incarnation. It is
+	 * set by a user on SWIM start, and normally is not
+	 * changed during instance lifetime.
+	 */
+	uint64_t generation;
 	/**
 	 * Version is a volatile part of incarnation. It is
 	 * managed by SWIM fully internally.
@@ -72,8 +78,10 @@ struct swim_incarnation {
 
 /** Create a new incarnation value. */
 static inline void
-swim_incarnation_create(struct swim_incarnation *i, uint64_t version)
+swim_incarnation_create(struct swim_incarnation *i, uint64_t generation,
+			uint64_t version)
 {
+	i->generation = generation;
 	i->version = version;
 }
 
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 31c931b98..615bbf685 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -161,8 +161,10 @@ swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
  */
 static inline void
 swim_incarnation_bin_create(struct swim_incarnation_bin *bin,
-			    uint8_t version_key)
+			    uint8_t generation_key, uint8_t version_key)
 {
+	bin->k_generation = generation_key;
+	bin->m_generation = 0xcf;
 	bin->k_version = version_key;
 	bin->m_version = 0xcf;
 }
@@ -175,6 +177,7 @@ static inline void
 swim_incarnation_bin_fill(struct swim_incarnation_bin *bin,
 			  const struct swim_incarnation *incarnation)
 {
+	bin->v_generation = mp_bswap_u64(incarnation->generation);
 	bin->v_version = mp_bswap_u64(incarnation->version);
 }
 
@@ -270,6 +273,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 				     "member uuid") != 0)
 			return -1;
 		break;
+	case SWIM_MEMBER_GENERATION:
+		if (swim_decode_uint(pos, end, &def->incarnation.generation,
+				     prefix, "member generation") != 0)
+			return -1;
+		break;
 	case SWIM_MEMBER_VERSION:
 		if (swim_decode_uint(pos, end, &def->incarnation.version,
 				     prefix, "member version") != 0)
@@ -342,7 +350,8 @@ swim_fd_header_bin_create(struct swim_fd_header_bin *header,
 	header->k_type = SWIM_FD_MSG_TYPE;
 	header->v_type = type;
 
-	swim_incarnation_bin_create(&header->incarnation, SWIM_FD_VERSION);
+	swim_incarnation_bin_create(&header->incarnation, SWIM_FD_GENERATION,
+				    SWIM_FD_VERSION);
 	swim_incarnation_bin_fill(&header->incarnation, incarnation);
 }
 
@@ -378,6 +387,12 @@ swim_failure_detection_def_decode(struct swim_failure_detection_def *def,
 			}
 			def->type = key;
 			break;
+		case SWIM_FD_GENERATION:
+			if (swim_decode_uint(pos, end,
+					     &def->incarnation.generation,
+					     prefix, "generation") != 0)
+				return -1;
+			break;
 		case SWIM_FD_VERSION:
 			if (swim_decode_uint(pos, end,
 					     &def->incarnation.version, prefix,
@@ -429,6 +444,7 @@ swim_passport_bin_create(struct swim_passport_bin *passport)
 	passport->m_uuid = 0xc4;
 	passport->m_uuid_len = UUID_LEN;
 	swim_incarnation_bin_create(&passport->incarnation,
+				    SWIM_MEMBER_GENERATION,
 				    SWIM_MEMBER_VERSION);
 }
 
@@ -591,7 +607,8 @@ swim_quit_bin_create(struct swim_quit_bin *header,
 	header->k_quit = SWIM_QUIT;
 	assert(mp_sizeof_map(SWIM_INCARNATION_BIN_SIZE) == 1);
 	header->m_quit = 0x80 | SWIM_INCARNATION_BIN_SIZE;
-	swim_incarnation_bin_create(&header->incarnation, SWIM_QUIT_VERSION);
+	swim_incarnation_bin_create(&header->incarnation, SWIM_QUIT_GENERATION,
+				    SWIM_QUIT_VERSION);
 	swim_incarnation_bin_fill(&header->incarnation, incarnation);
 }
 
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index fee6e078d..d7c2ffd0a 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -72,6 +72,7 @@ enum {
  * |                                                             |
  * |     SWIM_FAILURE_DETECTION: {                               |
  * |         SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,      |
+ * |         SWIM_FD_GENERATION: uint,                           |
  * |         SWIM_FD_VERSION: uint                               |
  * |     },                                                      |
  * |                                                             |
@@ -83,6 +84,7 @@ enum {
  * |             SWIM_MEMBER_ADDRESS: uint, ip,                  |
  * |             SWIM_MEMBER_PORT: uint, port,                   |
  * |             SWIM_MEMBER_UUID: 16 byte UUID,                 |
+ * |             SWIM_MEMBER_GENERATION: uint,                   |
  * |             SWIM_MEMBER_VERSION: uint,                      |
  * |             SWIM_MEMBER_PAYLOAD: bin                        |
  * |         },                                                  |
@@ -97,6 +99,7 @@ enum {
  * |             SWIM_MEMBER_ADDRESS: uint, ip,                  |
  * |             SWIM_MEMBER_PORT: uint, port,                   |
  * |             SWIM_MEMBER_UUID: 16 byte UUID,                 |
+ * |             SWIM_MEMBER_GENERATION: uint,                   |
  * |             SWIM_MEMBER_VERSION: uint,                      |
  * |             SWIM_MEMBER_PAYLOAD: bin                        |
  * |         },                                                  |
@@ -106,6 +109,7 @@ enum {
  * |                           OR/AND                            |
  * |                                                             |
  * |     SWIM_QUIT: {                                            |
+ * |         SWIM_QUIT_GENERATION: uint,                         |
  * |         SWIM_QUIT_VERSION: uint                             |
  * |     }                                                       |
  * | }                                                           |
@@ -118,10 +122,16 @@ enum {
 	 * Structures storing an incarnation should use this size
 	 * so as to correctly encode MessagePack map header.
 	 */
-	SWIM_INCARNATION_BIN_SIZE = 1,
+	SWIM_INCARNATION_BIN_SIZE = 2,
 };
 
 struct PACKED swim_incarnation_bin {
+	/** mp_encode_uint(generation key) */
+	uint8_t k_generation;
+	/** mp_encode_uint(64bit generation) */
+	uint8_t m_generation;
+	uint64_t v_generation;
+
 	/** mp_encode_uint(version key) */
 	uint8_t k_version;
 	/** mp_encode_uint(64bit version) */
@@ -205,6 +215,7 @@ enum swim_fd_key {
 	 * it was considered dead, but ping/ack with greater
 	 * incarnation was received from it.
 	 */
+	SWIM_FD_GENERATION,
 	SWIM_FD_VERSION,
 };
 
@@ -221,7 +232,7 @@ extern const char *swim_fd_msg_type_strs[];
 struct PACKED swim_fd_header_bin {
 	/** mp_encode_uint(SWIM_FAILURE_DETECTION) */
 	uint8_t k_header;
-	/** mp_encode_map(2) */
+	/** mp_encode_map(3) */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_FD_MSG_TYPE) */
@@ -229,7 +240,7 @@ struct PACKED swim_fd_header_bin {
 	/** mp_encode_uint(enum swim_fd_msg_type) */
 	uint8_t v_type;
 
-	/** SWIM_FD_VERSION */
+	/** SWIM_FD_GENERATION, SWIM_FD_VERSION */
 	struct swim_incarnation_bin incarnation;
 };
 
@@ -305,6 +316,7 @@ enum swim_member_key {
 	SWIM_MEMBER_ADDRESS,
 	SWIM_MEMBER_PORT,
 	SWIM_MEMBER_UUID,
+	SWIM_MEMBER_GENERATION,
 	SWIM_MEMBER_VERSION,
 	SWIM_MEMBER_PAYLOAD,
 	swim_member_key_MAX,
@@ -336,7 +348,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
  * date and TTD is > 0.
  */
 struct PACKED swim_passport_bin {
-	/** mp_encode_map(5 or 6) */
+	/** mp_encode_map(6 or 7) */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -354,7 +366,7 @@ struct PACKED swim_passport_bin {
 	uint8_t m_uuid_len;
 	uint8_t v_uuid[UUID_LEN];
 
-	/** SWIM_MEMBER_VERSION */
+	/** SWIM_MEMBER_GENERATION, SWIM_MEMBER_VERSION */
 	struct swim_incarnation_bin incarnation;
 };
 
@@ -575,17 +587,18 @@ swim_route_bin_create(struct swim_route_bin *route,
 
 enum swim_quit_key {
 	/** Incarnation to ignore old quit messages. */
-	SWIM_QUIT_VERSION = 0,
+	SWIM_QUIT_GENERATION = 0,
+	SWIM_QUIT_VERSION,
 };
 
 /** Quit section. Describes voluntary quit from the cluster. */
 struct PACKED swim_quit_bin {
 	/** mp_encode_uint(SWIM_QUIT) */
 	uint8_t k_quit;
-	/** mp_encode_map(1) */
+	/** mp_encode_map(2) */
 	uint8_t m_quit;
 
-	/** SWIM_QUIT_VERSION */
+	/** SWIM_QUIT_GENERATION, SWIM_QUIT_VERSION */
 	struct swim_incarnation_bin incarnation;
 };
 
diff --git a/src/lua/swim.c b/src/lua/swim.c
index c3a0a9911..26646f41f 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -67,7 +67,8 @@ lua_swim_on_member_event(struct lua_State *L)
 static int
 lua_swim_new(struct lua_State *L)
 {
-	struct swim *s = swim_new();
+	uint64_t generation = luaL_checkuint64(L, 1);
+	struct swim *s = swim_new(generation);
 	*(struct swim **) luaL_pushcdata(L, ctid_swim_ptr) = s;
 	if (s != NULL)
 		return 1;
diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index 0686590cb..b6d826ca0 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -28,14 +28,16 @@ ffi.cdef[[
         SWIM_EV_NEW             = 0b00000001,
         SWIM_EV_NEW_STATUS      = 0b00000010,
         SWIM_EV_NEW_URI         = 0b00000100,
-        SWIM_EV_NEW_VERSION     = 0b00001000,
-        SWIM_EV_NEW_INCARNATION = 0b00001000,
-        SWIM_EV_NEW_PAYLOAD     = 0b00010000,
-        SWIM_EV_UPDATE          = 0b00011110,
-        SWIM_EV_DROP            = 0b00100000,
+        SWIM_EV_NEW_GENERATION  = 0b00001000,
+        SWIM_EV_NEW_VERSION     = 0b00010000,
+        SWIM_EV_NEW_INCARNATION = 0b00011000,
+        SWIM_EV_NEW_PAYLOAD     = 0b00100000,
+        SWIM_EV_UPDATE          = 0b00111110,
+        SWIM_EV_DROP            = 0b01000000,
     };
 
     struct swim_incarnation {
+        uint64_t generation;
         uint64_t version;
     };
 
@@ -131,16 +133,19 @@ local swim_member_status_strs = {
 
 local swim_incarnation_mt = {
     __eq = function(l, r)
-        return l.version == r.version
+        return l.version == r.version and l.generation == r.generation
     end,
     __lt = function(l, r)
-        return l.version < r.version
+        return l.generation < r.generation or
+               l.generation == r.generation and l.version < r.version
     end,
     __le = function(l, r)
-        return l.version <= r.version
+        return l.generation < r.generation or
+               l.generation == r.generation and l.version <= r.version
     end,
     __tostring = function(i)
-        return string.format('cdata {version = %s}', i.version)
+        return string.format('cdata {generation = %s, version = %s}',
+                             i.generation, i.version)
     end,
 }
 ffi.metatype(ffi.typeof('struct swim_incarnation'), swim_incarnation_mt)
@@ -747,6 +752,9 @@ local swim_member_event_index = {
     is_new_incarnation = function(self)
         return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0
     end,
+    is_new_generation = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_GENERATION) ~= 0
+    end,
     is_new_version = function(self)
         return bit.band(self[1], capi.SWIM_EV_NEW_VERSION) ~= 0
     end,
@@ -938,7 +946,19 @@ local cache_table_mt = { __mode = 'v' }
 -- provided.
 --
 local function swim_new(cfg)
-    local ptr = internal.swim_new()
+    local generation = 0
+    if cfg and type(cfg) == 'table' and cfg.generation then
+        generation = cfg.generation
+        if type(generation) ~= 'number' or generation < 0 or
+           math.floor(generation) ~= generation then
+            return error('swim.new: generation should be non-negative integer')
+        end
+        cfg = table.copy(cfg)
+        -- Nullify in order to do not raise errors in the
+        -- following swim:cfg() about unknown parameters.
+        cfg.generation = nil
+    end
+    local ptr = internal.swim_new(generation)
     if ptr == nil then
         return nil, box.error.last()
     end
diff --git a/test/swim/swim.result b/test/swim/swim.result
index e3b89f809..05a108614 100644
--- a/test/swim/swim.result
+++ b/test/swim/swim.result
@@ -307,7 +307,7 @@ old_self
 ---
 - uri: 127.0.0.1:
   status: left
-  incarnation: cdata {version = 1ULL}
+  incarnation: cdata {generation = 0ULL, version = 1ULL}
   uuid: 00000000-0000-1000-8000-000000000001
   payload_size: 0
 ...
@@ -391,7 +391,7 @@ s
 ---
 - uri: 127.0.0.1:
   status: alive
-  incarnation: cdata {version = 1ULL}
+  incarnation: cdata {generation = 0ULL, version = 1ULL}
   uuid: 00000000-0000-1000-8000-000000000001
   payload_size: 0
 ...
@@ -413,7 +413,7 @@ s:uri()
 ...
 s:incarnation()
 ---
-- cdata {version = 1ULL}
+- cdata {generation = 0ULL, version = 1ULL}
 ...
 s:payload_cdata()
 ---
@@ -490,7 +490,7 @@ s1:member_by_uuid(s:uuid())
 ---
 - uri: 127.0.0.1:
   status: alive
-  incarnation: cdata {version = 1ULL}
+  incarnation: cdata {generation = 0ULL, version = 1ULL}
   uuid: 00000000-0000-1000-8000-000000000001
   payload_size: 0
 ...
@@ -721,7 +721,7 @@ s1_view:payload()
 ...
 s1_view:incarnation()
 ---
-- cdata {version = 1ULL}
+- cdata {generation = 0ULL, version = 1ULL}
 ...
 s1:set_payload('payload')
 ---
@@ -732,7 +732,7 @@ while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end
 ...
 s1_view:incarnation()
 ---
-- cdata {version = 2ULL}
+- cdata {generation = 0ULL, version = 2ULL}
 ...
 s1:set_payload('payload2')
 ---
@@ -743,7 +743,7 @@ while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end
 ...
 s1_view:incarnation()
 ---
-- cdata {version = 3ULL}
+- cdata {generation = 0ULL, version = 3ULL}
 ...
 s1:delete()
 ---
@@ -778,7 +778,7 @@ iterate()
 - - - 00000000-0000-1000-8000-000000000001
     - uri: 127.0.0.1:
       status: alive
-      incarnation: cdata {version = 1ULL}
+      incarnation: cdata {generation = 0ULL, version = 1ULL}
       uuid: 00000000-0000-1000-8000-000000000001
       payload_size: 0
 ...
@@ -791,13 +791,13 @@ iterate()
 - - - 00000000-0000-1000-8000-000000000002
     - uri: 127.0.0.1:
       status: alive
-      incarnation: cdata {version = 0ULL}
+      incarnation: cdata {generation = 0ULL, version = 0ULL}
       uuid: 00000000-0000-1000-8000-000000000002
       payload_size: 0
   - - 00000000-0000-1000-8000-000000000001
     - uri: 127.0.0.1:
       status: alive
-      incarnation: cdata {version = 1ULL}
+      incarnation: cdata {generation = 0ULL, version = 1ULL}
       uuid: 00000000-0000-1000-8000-000000000001
       payload_size: 0
 ...
@@ -810,19 +810,19 @@ iterate()
 - - - 00000000-0000-1000-8000-000000000001
     - uri: 127.0.0.1:
       status: alive
-      incarnation: cdata {version = 1ULL}
+      incarnation: cdata {generation = 0ULL, version = 1ULL}
       uuid: 00000000-0000-1000-8000-000000000001
       payload_size: 0
   - - 00000000-0000-1000-8000-000000000003
     - uri: 127.0.0.1:
       status: alive
-      incarnation: cdata {version = 0ULL}
+      incarnation: cdata {generation = 0ULL, version = 0ULL}
       uuid: 00000000-0000-1000-8000-000000000003
       payload_size: 0
   - - 00000000-0000-1000-8000-000000000002
     - uri: 127.0.0.1:
       status: alive
-      incarnation: cdata {version = 0ULL}
+      incarnation: cdata {generation = 0ULL, version = 0ULL}
       uuid: 00000000-0000-1000-8000-000000000002
       payload_size: 0
 ...
@@ -906,7 +906,7 @@ s1_view:payload()
 ...
 s1_view:incarnation()
 ---
-- cdata {version = 3ULL}
+- cdata {generation = 0ULL, version = 3ULL}
 ...
 s1:cfg({heartbeat_rate = 0.01})
 ---
@@ -932,7 +932,7 @@ p
 ...
 s1_view:incarnation()
 ---
-- cdata {version = 3ULL}
+- cdata {generation = 0ULL, version = 3ULL}
 ...
 s1:delete()
 ---
@@ -964,7 +964,7 @@ s2
 ---
 - uri: 127.0.0.1:
   status: alive
-  incarnation: cdata {version = 0ULL}
+  incarnation: cdata {generation = 0ULL, version = 0ULL}
   uuid: 00000000-0000-1000-8000-000000000002
   payload_size: 0
 ...
@@ -1210,7 +1210,7 @@ s2:member_by_uuid(s1:self():uuid())
 ---
 - uri: 127.0.0.1:
   status: alive
-  incarnation: cdata {version = 1ULL}
+  incarnation: cdata {generation = 0ULL, version = 1ULL}
   uuid: 00000000-0000-1000-8000-000000000001
   payload_size: 0
 ...
@@ -1287,7 +1287,7 @@ m_list
 ---
 - - uri: 127.0.0.1:
     status: alive
-    incarnation: cdata {version = 1ULL}
+    incarnation: cdata {generation = 0ULL, version = 1ULL}
     uuid: 00000000-0000-1000-8000-000000000001
     payload_size: 0
 ...
@@ -1329,7 +1329,7 @@ m_list
 ---
 - - uri: 127.0.0.1:
     status: alive
-    incarnation: cdata {version = 1ULL}
+    incarnation: cdata {generation = 0ULL, version = 1ULL}
     uuid: 00000000-0000-1000-8000-000000000002
     payload_size: 0
 ...
@@ -1371,17 +1371,17 @@ m_list
 ---
 - - uri: 127.0.0.1:
     status: alive
-    incarnation: cdata {version = 1ULL}
+    incarnation: cdata {generation = 0ULL, version = 1ULL}
     uuid: 00000000-0000-1000-8000-000000000002
     payload_size: 0
   - uri: 127.0.0.1:
     status: alive
-    incarnation: cdata {version = 2ULL}
+    incarnation: cdata {generation = 0ULL, version = 2ULL}
     uuid: 00000000-0000-1000-8000-000000000001
     payload_size: 8
   - uri: 127.0.0.1:
     status: alive
-    incarnation: cdata {version = 2ULL}
+    incarnation: cdata {generation = 0ULL, version = 2ULL}
     uuid: 00000000-0000-1000-8000-000000000001
     payload_size: 8
 ...
@@ -1390,12 +1390,12 @@ e_list
 - - is_new_payload: true
     is_new: true
     is_update: true
-  - is_new_version: true
-    is_new_payload: true
+  - is_new_payload: true
+    is_new_version: true
     is_new_incarnation: true
     is_update: true
-  - is_new_version: true
-    is_new_payload: true
+  - is_new_payload: true
+    is_new_version: true
     is_new_incarnation: true
     is_update: true
 ...
@@ -1430,12 +1430,12 @@ m_list
 ---
 - - uri: 127.0.0.1:
     status: left
-    incarnation: cdata {version = 1ULL}
+    incarnation: cdata {generation = 0ULL, version = 1ULL}
     uuid: 00000000-0000-1000-8000-000000000002
     payload_size: 0
   - uri: 127.0.0.1:
     status: left
-    incarnation: cdata {version = 1ULL}
+    incarnation: cdata {generation = 0ULL, version = 1ULL}
     uuid: 00000000-0000-1000-8000-000000000002
     payload_size: 0
 ...
@@ -1487,6 +1487,93 @@ ctx_list
 s1:delete()
 ---
 ...
+--
+-- gh-4280: 'generation' counter to detect restarts and refute
+-- information left from previous lifes of a SWIM instance.
+--
+s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0})
+---
+...
+s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01})
+---
+...
+s2:add_member({uuid = uuid(1), uri = s1:self():uri()})
+---
+- true
+...
+s1_view = s2:member_by_uuid(uuid(1))
+---
+...
+s1:set_payload('payload 1')
+---
+- true
+...
+while not s1_view:payload() do fiber.sleep(0.1) end
+---
+...
+s1_view:payload()
+---
+- payload 1
+...
+s1:self():incarnation()
+---
+- cdata {generation = 0ULL, version = 2ULL}
+...
+-- Now S2 knows S1's payload as 'payload 1'.
+new_gen_ev = nil
+---
+...
+_ = s2:on_member_event(function(m, e) if e:is_new_generation() then new_gen_ev = e end end)
+---
+...
+s1:delete()
+---
+...
+s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1})
+---
+...
+s1:add_member({uuid = uuid(2), uri = s2:self():uri()})
+---
+- true
+...
+s1:set_payload('payload 2')
+---
+- true
+...
+-- Without the new generation S2 would believe that S1's payload
+-- is still 'payload 1'. Because 'payload 1' and 'payload 2' were
+-- disseminated with the same version.
+s1:self():incarnation()
+---
+- cdata {generation = 1ULL, version = 2ULL}
+...
+while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end
+---
+...
+s1_view:payload()
+---
+- payload 2
+...
+new_gen_ev
+---
+- is_new_version: true
+  is_new_generation: true
+  is_update: true
+  is_new_payload: true
+  is_new_uri: true
+  is_new_incarnation: true
+...
+-- Generation is static parameter.
+s1:cfg({generation = 5})
+---
+- error: 'swim:cfg: unknown option generation'
+...
+s1:delete()
+---
+...
+s2:delete()
+---
+...
 test_run:cmd("clear filter")
 ---
 - true
diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
index 576219b4d..b4e4adafc 100644
--- a/test/swim/swim.test.lua
+++ b/test/swim/swim.test.lua
@@ -499,4 +499,40 @@ ctx_list
 
 s1:delete()
 
+--
+-- gh-4280: 'generation' counter to detect restarts and refute
+-- information left from previous lifes of a SWIM instance.
+--
+s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 0})
+s2 = swim.new({uuid = uuid(2), uri = 0, heartbeat_rate = 0.01})
+s2:add_member({uuid = uuid(1), uri = s1:self():uri()})
+s1_view = s2:member_by_uuid(uuid(1))
+s1:set_payload('payload 1')
+while not s1_view:payload() do fiber.sleep(0.1) end
+s1_view:payload()
+s1:self():incarnation()
+
+-- Now S2 knows S1's payload as 'payload 1'.
+
+new_gen_ev = nil
+_ = s2:on_member_event(function(m, e) if e:is_new_generation() then new_gen_ev = e end end)
+s1:delete()
+s1 = swim.new({uuid = uuid(1), uri = 0, heartbeat_rate = 0.01, generation = 1})
+s1:add_member({uuid = uuid(2), uri = s2:self():uri()})
+s1:set_payload('payload 2')
+-- Without the new generation S2 would believe that S1's payload
+-- is still 'payload 1'. Because 'payload 1' and 'payload 2' were
+-- disseminated with the same version.
+s1:self():incarnation()
+
+while s1_view:payload() ~= 'payload 2' do fiber.sleep(0.1) end
+s1_view:payload()
+new_gen_ev
+
+-- Generation is static parameter.
+s1:cfg({generation = 5})
+
+s1:delete()
+s2:delete()
+
 test_run:cmd("clear filter")
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 63f816d4a..3486d3f73 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -133,7 +133,7 @@ swim_test_cfg(void)
 {
 	swim_start_test(16);
 
-	struct swim *s = swim_new();
+	struct swim *s = swim_new(0);
 	assert(s != NULL);
 	is(swim_cfg(s, NULL, -1, -1, -1, NULL), -1, "first cfg failed - no URI");
 	ok(swim_error_check_match("mandatory"), "diag says 'mandatory'");
@@ -149,7 +149,7 @@ swim_test_cfg(void)
 	is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\
 	   "URI");
 
-	struct swim *s2 = swim_new();
+	struct swim *s2 = swim_new(0);
 	assert(s2 != NULL);
 	const char *bad_uri1 = "127.1.1.1.1.1.1:1";
 	const char *bad_uri2 = "google.com:1";
@@ -379,7 +379,7 @@ swim_test_probe(void)
 static void
 swim_test_refute(void)
 {
-	swim_start_test(4);
+	swim_start_test(6);
 	struct swim_cluster *cluster = swim_cluster_new(2);
 	swim_cluster_set_ack_timeout(cluster, 2);
 
@@ -391,16 +391,21 @@ swim_test_refute(void)
 	fail_if(swim_cluster_wait_status(cluster, 0, 1,
 					 MEMBER_SUSPECTED, 4) != 0);
 	swim_cluster_set_drop(cluster, 1, 0);
-	is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
+	is(swim_cluster_wait_incarnation(cluster, 1, 1, 0, 1, 1), 0,
 	   "S2 increments its own incarnation to refute its suspicion");
-	is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0,
+	is(swim_cluster_wait_incarnation(cluster, 0, 1, 0, 1, 1), 0,
 	   "new incarnation has reached S1 with a next round message");
 
 	swim_cluster_restart_node(cluster, 1);
-	is(swim_cluster_member_incarnation(cluster, 1, 1).version, 0,
-	   "after restart S2's incarnation is default again");
-	is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
-	   "S2 learned its old bigger incarnation 1 from S0");
+	struct swim_incarnation inc =
+		swim_cluster_member_incarnation(cluster, 1, 1);
+	is(inc.version, 0, "after restart S2's version is 0 again");
+	is(inc.generation, 1, "but generation is new");
+
+	is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 0, 1), 0,
+	   "S2 disseminates new incarnation, S1 learns it");
+	is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
+	   "and considers S2 alive");
 
 	swim_cluster_delete(cluster);
 	swim_finish_test();
@@ -527,7 +532,7 @@ swim_test_quit(void)
 	 * old LEFT status.
 	 */
 	swim_cluster_restart_node(cluster, 0);
-	is(swim_cluster_wait_incarnation(cluster, 0, 0, 1, 2), 0,
+	is(swim_cluster_wait_incarnation(cluster, 0, 0, 1, 0, 2), 0,
 	   "quited member S1 has returned and refuted the old status");
 	fail_if(swim_cluster_wait_fullmesh(cluster, 2) != 0);
 	/*
@@ -555,9 +560,14 @@ swim_test_quit(void)
 	is(swim_cluster_member_status(cluster, 2, 0), swim_member_status_MAX,
 	   "S3 did not add S1 back when received its 'quit'");
 
-	/* Now allow S2 to get the 'self-quit' message. */
+	/*
+	 * Now allow S2 to get the 'self-quit' message. Note,
+	 * together with 'quit' it receives new generation, which
+	 * belonged to S1 before. Of course, it is a bug, but in
+	 * a user application - UUIDs are messed.
+ */ swim_cluster_unblock_io(cluster, 1); - is(swim_cluster_wait_incarnation(cluster, 1, 1, 2, 0), 0, + is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1, 0), 0, "S2 finally got 'quit' message from S1, but with its 'own' UUID - "\ "refute it") swim_cluster_delete(cluster); @@ -1090,7 +1100,7 @@ swim_test_triggers(void) swim_member_unref(tctx.ctx.member); /* Check that recfg fires version update trigger. */ - s1 = swim_new(); + s1 = swim_new(0); struct tt_uuid uuid = uuid_nil; uuid.time_low = 1; fail_if(swim_cfg(s1, "127.0.0.1:1", -1, -1, -1, &uuid) != 0); @@ -1113,10 +1123,39 @@ swim_test_triggers(void) swim_finish_test(); } +static void +swim_test_generation(void) +{ + swim_start_test(3); + + struct swim_cluster *cluster = swim_cluster_new(2); + swim_cluster_interconnect(cluster, 0, 1); + + const char *p1 = "payload 1"; + int p1_size = strlen(p1); + swim_cluster_member_set_payload(cluster, 0, p1, p1_size); + is(swim_cluster_wait_payload_everywhere(cluster, 0, p1, p1_size, 1), 0, + "S1 disseminated its payload to S2"); + + swim_cluster_restart_node(cluster, 0); + const char *p2 = "payload 2"; + int p2_size = strlen(p2); + swim_cluster_member_set_payload(cluster, 0, p2, p2_size); + is(swim_cluster_wait_payload_everywhere(cluster, 0, p2, p2_size, 2), 0, + "S1 restarted and set another payload. 
Without generation the new "\ + "payload might never be disseminated."); + is(swim_cluster_member_incarnation(cluster, 1, 0).generation, 1, + "S2 sees new generation of S1"); + + swim_cluster_delete(cluster); + + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(21); + swim_start_test(22); (void) ap; swim_test_ev_init(); @@ -1143,6 +1182,7 @@ main_f(va_list ap) swim_test_encryption(); swim_test_slow_net(); swim_test_triggers(); + swim_test_generation(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index 8d653477b..04a2778e6 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..21 +1..22 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -85,11 +85,13 @@ ok 6 - subtests ok 7 - subtests *** swim_test_probe: done *** *** swim_test_refute *** - 1..4 + 1..6 ok 1 - S2 increments its own incarnation to refute its suspicion ok 2 - new incarnation has reached S1 with a next round message - ok 3 - after restart S2's incarnation is default again - ok 4 - S2 learned its old bigger incarnation 1 from S0 + ok 3 - after restart S2's version is 0 again + ok 4 - but generation is new + ok 5 - S2 disseminates new incarnation, S1 learns it + ok 6 - and considers S2 alive ok 8 - subtests *** swim_test_refute: done *** *** swim_test_basic_gossip *** @@ -228,4 +230,11 @@ ok 20 - subtests ok 23 - version is a part of incarnation, so the latter is updated too ok 21 - subtests *** swim_test_triggers: done *** + *** swim_test_generation *** + 1..3 + ok 1 - S1 disseminated its payload to S2 + ok 2 - S1 restarted and set another payload. Without generation the new payload might never be disseminated.
+ ok 3 - S2 sees new generation of S1 +ok 22 - subtests + *** swim_test_generation: done *** *** main_f: done *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 72149b353..c56af2233 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -157,6 +157,8 @@ struct swim_node { * that instance. */ struct tt_uuid uuid; + /** Generation to increment on restart. */ + uint64_t generation; /** * Filter to drop packets with a certain probability * from/to a specified direction. @@ -212,7 +214,8 @@ swim_test_event_cb(struct trigger *trigger, void *event) static inline void swim_node_create(struct swim_node *n, int id) { - n->swim = swim_new(); + n->generation = 0; + n->swim = swim_new(0); assert(n->swim != NULL); struct trigger *t = (struct trigger *) malloc(sizeof(*t)); trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free); @@ -356,7 +359,7 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, swim_cluster_member_view(cluster, node_id, member_id); if (m == NULL) { struct swim_incarnation inc; - swim_incarnation_create(&inc, UINT64_MAX); + swim_incarnation_create(&inc, UINT64_MAX, UINT64_MAX); return inc; } return swim_member_incarnation(m); @@ -405,7 +408,7 @@ swim_cluster_restart_node(struct swim_cluster *cluster, int i) &n->uuid)); swim_delete(s); } - s = swim_new(); + s = swim_new(++n->generation); assert(s != NULL); int rc = swim_cfg(s, uri, -1, cluster->ack_timeout, cluster->gc_mode, &n->uuid); @@ -754,10 +757,10 @@ swim_member_template_set_status(struct swim_member_template *t, */ static inline void swim_member_template_set_incarnation(struct swim_member_template *t, - uint64_t version) + uint64_t generation, uint64_t version) { t->need_check_incarnation = true; - swim_incarnation_create(&t->incarnation, version); + swim_incarnation_create(&t->incarnation, generation, version); } /** @@ -790,7 +793,7 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data) payload = 
swim_member_payload(m, &payload_size); } else { status = swim_member_status_MAX; - swim_incarnation_create(&incarnation, 0); + swim_incarnation_create(&incarnation, 0, 0); payload = NULL; payload_size = 0; } @@ -852,12 +855,12 @@ swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, int swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t version, - double timeout) + int member_id, uint64_t generation, + uint64_t version, double timeout) { struct swim_member_template t; swim_member_template_create(&t, node_id, member_id); - swim_member_template_set_incarnation(&t, version); + swim_member_template_set_incarnation(&t, generation, version); return swim_wait_timeout(timeout, cluster, swim_loop_check_member, &t); } diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 587118c60..7064aa67d 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -224,8 +224,8 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id, */ int swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, - int member_id, uint64_t version, - double timeout); + int member_id, uint64_t generation, + uint64_t version, double timeout); /** * Wait until a member with id @a member_id is seen with -- 2.20.1 (Apple Git-117)