* [tarantool-patches] [PATCH 0/6] SWIM failure detection draft
From: Vladislav Shpilevoy @ 2019-03-20 10:49 UTC
To: tarantool-patches; +Cc: kostja

This is the second series of patches for the SWIM protocol
implementation. It contains some follow-ups for the previous
patchset about the anti-entropy component, preparations for the
failure detection component, and the failure detection component
itself.

Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-3234-swim-failure-detection
Issue: https://github.com/tarantool/tarantool/issues/3234

Vladislav Shpilevoy (6):
  swim: follow-ups for SWIM anti-entropy
  test: introduce breakpoints for swim's event loop
  test: remove swim_unblock_fd event from swim test harness
  swim: expose enum swim_member_status to public API
  test: differentiate blocked and closed swim fake fds
  [RAW] swim: introduce failure detection component

 src/lib/swim/swim.c             | 479 ++++++++++++++++++++++++++++++--
 src/lib/swim/swim.h             |  42 ++-
 src/lib/swim/swim_io.c          |  32 ++-
 src/lib/swim/swim_io.h          |  22 +-
 src/lib/swim/swim_proto.c       |  83 +++++-
 src/lib/swim/swim_proto.h       | 115 +++++++-
 test/unit/swim.c                | 189 +++++++++++--
 test/unit/swim.result           |  62 ++++-
 test/unit/swim_test_ev.c        |  46 ++-
 test/unit/swim_test_ev.h        |   8 +-
 test/unit/swim_test_transport.c |  78 ++++--
 test/unit/swim_test_transport.h |   9 +
 test/unit/swim_test_utils.c     | 139 ++++++++-
 test/unit/swim_test_utils.h     |  48 +++-
 14 files changed, 1219 insertions(+), 133 deletions(-)

-- 
2.17.2 (Apple Git-113)

* [tarantool-patches] [PATCH 1/6] swim: follow-ups for SWIM anti-entropy
From: Vladislav Shpilevoy @ 2019-03-20 10:49 UTC
To: tarantool-patches; +Cc: kostja

* fix some obvious errors in swim test utils;

* fix a bug with NULL URI garbage on recfg;

* fix typos in a comment and in a log message in swim.c;

* do not start any timers in swim_cfg. Indeed, the round timer
  should start only when at least one new member besides self is
  added, and that is already done in swim_new_member;

* log not only the round begin, but each round step - it helps in
  debugging, and does not affect production anyway because the
  logs are verbose;

* in SWIM's event loop log the new watch value instead of the old
  one - it turned out that the new one is more useful for
  debugging;

* log 'process <name> component' inside swim_process_<name>()
  functions. It is needed for failure detection, where a log of
  kind 'process failure detection' says nothing - much better to
  say 'process ping from', or 'process ack';

* in swim tests use swim_cluster_wait_...(timeout) instead of
  swim_cluster_wait_...(max_steps). A step count restriction was
  useful for anti-entropy, where it equals the number of round
  steps, but that no longer holds once failure detection appears.
  Replies to failure detection requests do not depend on the SWIM
  heartbeat and affect the step count in a non-trivial way, which
  makes tests much harder to write, debug and support.

Follow-up for 03b9a6e91baf246ee2bb9841d01ba3824b6768a6
---
 src/lib/swim/swim.c             | 23 ++++++++++++++---------
 src/lib/swim/swim_io.c          |  9 +++++----
 src/lib/swim/swim_io.h          |  6 +++++-
 src/lib/swim/swim_proto.c       |  1 +
 test/unit/swim.c                |  5 ++++-
 test/unit/swim.result           | 19 ++++++++++---------
 test/unit/swim_test_ev.c        |  2 +-
 test/unit/swim_test_transport.c | 13 ++++++++++---
 test/unit/swim_test_utils.c     | 13 ++++++++-----
 test/unit/swim_test_utils.h     |  6 ++----
 10 files changed, 60 insertions(+), 37 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 4b401800a..c2b2132a2 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -206,7 +206,7 @@ struct swim_member {
 	 */
 	struct tt_uuid uuid;
 	/**
-	 * Cached hash of the uuid for the members table lookups.
+	 * Cached hash of the uuid for the member table lookups.
 	 */
 	uint32_t hash;
 	/**
@@ -406,7 +406,8 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
 		return NULL;
 	}
-	swim_ev_timer_start(loop(), &swim->round_tick);
+	if (mh_size(swim->members) > 1)
+		swim_ev_timer_start(loop(), &swim->round_tick);
 	say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
 		    swim_uuid_str(&member->uuid), mh_size(swim->members));
 	return member;
@@ -448,8 +449,9 @@ swim_new_round(struct swim *swim)
 			    swim_fd(swim));
 		return 0;
 	}
+	/* -1 for self. */
 	say_verbose("SWIM %d: start a new round with %d members", swim_fd(swim),
-		    size);
+		    size - 1);
 	swim_shuffle_members(swim);
 	rlist_create(&swim->round_queue);
 	for (int i = 0; i < size; ++i) {
@@ -550,7 +552,9 @@ swim_begin_step(struct ev_loop *loop, struct ev_timer *t, int events)
 	(void) events;
 	(void) loop;
 	struct swim *swim = (struct swim *) t->data;
-	if (rlist_empty(&swim->round_queue) && swim_new_round(swim) != 0) {
+	if (! rlist_empty(&swim->round_queue)) {
+		say_verbose("SWIM %d: continue the round", swim_fd(swim));
+	} else if (swim_new_round(swim) != 0) {
 		diag_log();
 		return;
 	}
@@ -675,6 +679,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def)
 static int
 swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
 {
+	say_verbose("SWIM %d: process anti-entropy", swim_fd(swim));
 	const char *prefix = "invalid anti-entropy message:";
 	uint32_t size;
 	if (swim_decode_array(pos, end, &size, prefix, "root") != 0)
@@ -726,8 +731,6 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
 			goto error;
 		switch(key) {
 		case SWIM_ANTI_ENTROPY:
-			say_verbose("SWIM %d: process anti-entropy",
-				    swim_fd(swim));
 			if (swim_process_anti_entropy(swim, &pos, end) != 0)
 				goto error;
 			break;
@@ -760,7 +763,8 @@ swim_new(void)
 	swim_ev_timer_init(&swim->round_tick, swim_begin_step,
 			   HEARTBEAT_RATE_DEFAULT, 0);
 	swim->round_tick.data = (void *) swim;
-	swim_task_create(&swim->round_step_task, swim_complete_step, NULL);
+	swim_task_create(&swim->round_step_task, swim_complete_step, NULL,
+			 "round packet");
 	swim_scheduler_create(&swim->scheduler, swim_on_input);
 	return swim;
 }
@@ -845,11 +849,12 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		 * specified.
 		 */
 		addr = swim->scheduler.transport.addr;
+	} else {
+		addr = swim->self->addr;
 	}
 	if (swim->round_tick.at != heartbeat_rate && heartbeat_rate > 0)
 		swim_ev_timer_set(&swim->round_tick, heartbeat_rate, 0);
-	swim_ev_timer_start(loop(), &swim->round_tick);
 
 	swim_update_member_addr(swim, swim->self, &addr);
 	int rc = swim_update_member_uuid(swim, swim->self, uuid);
 	/* Reserved above. */
@@ -892,7 +897,7 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid)
 	assert(swim_is_configured(swim));
 	const char *prefix = "swim.remove_member:";
 	if (uuid == NULL || tt_uuid_is_nil(uuid)) {
-		diag_set(SwimError, "%s UUiD is mandatory", prefix);
+		diag_set(SwimError, "%s UUID is mandatory", prefix);
 		return -1;
 	}
 	struct swim_member *member = swim_find_member(swim, uuid);
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 9c16d1ad3..015968a0d 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -57,11 +57,12 @@ swim_packet_create(struct swim_packet *packet)
 
 void
 swim_task_create(struct swim_task *task, swim_task_f complete,
-		 swim_task_f cancel)
+		 swim_task_f cancel, const char *desc)
 {
 	memset(task, 0, sizeof(*task));
 	task->complete = complete;
 	task->cancel = cancel;
+	task->desc = desc;
 	swim_packet_create(&task->packet);
 	rlist_create(&task->in_queue_output);
 }
@@ -170,9 +171,9 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
 	struct swim_task *task =
 		rlist_shift_entry(&scheduler->queue_output, struct swim_task,
 				  in_queue_output);
-	say_verbose("SWIM %d: send to %s", swim_scheduler_fd(scheduler),
-		    sio_strfaddr((struct sockaddr *) &task->dst,
-				 sizeof(task->dst)));
+	say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
+		    task->desc, sio_strfaddr((struct sockaddr *) &task->dst,
+					     sizeof(task->dst)));
 	struct swim_meta_header_bin header;
 	swim_meta_header_bin_create(&header, &scheduler->transport.addr);
 	memcpy(task->packet.meta, &header, sizeof(header));
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 68fb89818..bc62a29ce 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -201,6 +201,10 @@ struct swim_task {
 	struct sockaddr_in dst;
 	/** Place in a queue of tasks. */
 	struct rlist in_queue_output;
+	/**
+	 * A short description of the packet content. For logging.
+	 */
+	const char *desc;
 };
 
 /**
@@ -213,7 +217,7 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
 /** Initialize the task, without scheduling. */
 void
 swim_task_create(struct swim_task *task, swim_task_f complete,
-		 swim_task_f cancel);
+		 swim_task_f cancel, const char *desc);
 
 /** Destroy the task, pop from the queue. */
 static inline void
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 9c0d49657..bf4c09b24 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -288,6 +288,7 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 	if (swim_decode_map(pos, end, &size, prefix, "root") != 0)
 		return -1;
 	memset(def, 0, sizeof(*def));
+	def->src.sin_family = AF_INET;
 	for (uint32_t i = 0; i < size; ++i) {
 		uint64_t key;
 		if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 29e9eb4f4..921fc8f07 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -109,7 +109,7 @@ swim_test_uuid_update(void)
 static void
 swim_test_cfg(void)
 {
-	swim_start_test(15);
+	swim_start_test(16);
 
 	struct swim *s = swim_new();
 	assert(s != NULL);
@@ -123,6 +123,9 @@ swim_test_cfg(void)
 	is(swim_cfg(s, uri, -1, &uuid), 0, "configured first time");
 	is(swim_cfg(s, NULL, -1, NULL), 0, "second time can omit URI, UUID");
 	is(swim_cfg(s, NULL, 2, NULL), 0, "hearbeat is dynamic");
+	const char *self_uri = swim_member_uri(swim_self(s));
+	is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\
+	   "URI");
 
 	struct swim *s2 = swim_new();
 	assert(s2 != NULL);
diff --git a/test/unit/swim.result b/test/unit/swim.result
index e71d6cfc2..e8991d8d8 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -19,7 +19,7 @@ ok 2 - subtests
 ok 3 - subtests
 	*** swim_test_uuid_update: done ***
 	*** swim_test_cfg ***
-    1..15
+    1..16
     ok 1 - first cfg failed - no URI
     ok 2 - diag says 'mandatory'
     ok 3 - first cfg failed - no UUID
@@ -27,14 +27,15 @@ ok 3 - subtests
     ok 5 - configured first time
     ok 6 - second time can omit URI, UUID
     ok 7 - hearbeat is dynamic
-    ok 8 - can not use invalid URI
-    ok 9 - diag says 'invalid uri'
-    ok 10 - can not use domain names
-    ok 11 - diag says 'invalid uri'
-    ok 12 - UNIX sockets are not supported
-    ok 13 - diag says 'only IP'
-    ok 14 - can not bind to an occupied port
-    ok 15 - diag says 'bind'
+    ok 8 - URI is unchanged after recfg with NULL URI
+    ok 9 - can not use invalid URI
+    ok 10 - diag says 'invalid uri'
+    ok 11 - can not use domain names
+    ok 12 - diag says 'invalid uri'
+    ok 13 - UNIX sockets are not supported
+    ok 14 - diag says 'only IP'
+    ok 15 - can not bind to an occupied port
+    ok 16 - diag says 'bind'
 ok 4 - subtests
 	*** swim_test_cfg: done ***
 	*** swim_test_add_remove ***
diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c
index 950784aec..ee1fcdbb7 100644
--- a/test/unit/swim_test_ev.c
+++ b/test/unit/swim_test_ev.c
@@ -289,12 +289,12 @@ swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *base)
 void
 swim_do_loop_step(struct ev_loop *loop)
 {
-	say_verbose("Loop watch %f", watch);
 	struct swim_event *next_e, *e = event_heap_top(&event_heap);
 	if (e != NULL) {
 		assert(e->deadline >= watch);
 		/* Multiple events can have the same deadline. */
 		watch = e->deadline;
+		say_verbose("Loop watch %f", watch);
 		do {
 			e->process(e, loop);
 			next_e = event_heap_top(&event_heap);
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index ee50e3922..d6591e969 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -78,6 +78,13 @@ swim_test_packet_new(const char *data, int size, const struct sockaddr_in *src,
 	return p;
 }
 
+/** Free packet memory. */
+static inline void
+swim_test_packet_delete(struct swim_test_packet *p)
+{
+	free(p);
+}
+
 /** Fake file descriptor. */
 struct swim_fd {
 	/** File descriptor number visible to libev. */
@@ -122,9 +129,9 @@ swim_fd_close(struct swim_fd *fd)
 {
 	struct swim_test_packet *i, *tmp;
 	rlist_foreach_entry_safe(i, &fd->recv_queue, in_queue, tmp)
-		free(i);
+		swim_test_packet_delete(i);
 	rlist_foreach_entry_safe(i, &fd->send_queue, in_queue, tmp)
-		free(i);
+		swim_test_packet_delete(i);
 	rlist_del_entry(fd, in_opened);
 }
 
@@ -188,7 +195,7 @@ swim_transport_recv(struct swim_transport *transport, void *buffer, size_t size,
 	*addr_size = sizeof(p->src);
 	ssize_t result = MIN((size_t) p->size, size);
 	memcpy(buffer, p->data, result);
-	free(p);
+	swim_test_packet_delete(p);
 	return result;
 }
 
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 73f8db40f..a92e55233 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -31,6 +31,7 @@
 #include "swim_test_utils.h"
 #include "swim_test_ev.h"
 #include "swim/swim.h"
+#include "swim/swim_ev.h"
 #include "uuid/tt_uuid.h"
 #include "trivia/util.h"
 #include "fiber.h"
@@ -111,7 +112,7 @@ swim1_contains_swim2(struct swim *s1, struct swim *s2)
 		}
 	}
 	swim_iterator_close(it);
-	return false;
+	return true;
 }
 
 bool
@@ -129,13 +130,15 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster)
 }
 
 int
-swim_cluster_wait_fullmesh(struct swim_cluster *cluster, int max_steps)
+swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout)
 {
-	while (! swim_cluster_is_fullmesh(cluster) && max_steps > 0) {
+	double deadline = swim_time() + timeout;
+	while (! swim_cluster_is_fullmesh(cluster)) {
+		if (swim_time() >= deadline)
+			return -1;
 		swim_do_loop_step(loop());
-		--max_steps;
 	}
-	return max_steps < 0 ? -1 : 0;
+	return 0;
 }
 
 bool
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 56036422d..90962b658 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -74,11 +74,9 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id);
 bool
 swim_cluster_is_fullmesh(struct swim_cluster *cluster);
 
-/**
- * Wait for fullmesh at most @a max_steps event loop iterations.
- */
+/** Wait for fullmesh at most @a timeout fake seconds. */
 int
-swim_cluster_wait_fullmesh(struct swim_cluster *cluster, int max_steps);
+swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout);
 
 #define swim_start_test(n) { \
 	header(); \
-- 
2.17.2 (Apple Git-113)

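The swim_cluster_wait_fullmesh() hunk above replaces a step counter with a deadline. A standalone miniature of both loops is sketched below. It is not the harness code: fake_time, fake_loop_step(), cond_never(), wait_max_steps() and wait_timeout() are hypothetical names, and the assumption that one loop step advances time by one fake second is made only for illustration. The sketch suggests that the removed loop, given a non-negative max_steps, stops decrementing at 0 and so never reaches the 'max_steps < 0' branch, while the deadline-based loop does report a timeout.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the global virtual time of the tests. */
static double fake_time = 0;

/* Hypothetical loop step: assume it advances time by 1 fake second. */
static void
fake_loop_step(void)
{
	fake_time += 1;
}

/* A condition that is never satisfied. */
static bool
cond_never(void)
{
	return false;
}

/* Same shape as the removed step-counting loop. */
static int
wait_max_steps(bool (*cond)(void), int max_steps)
{
	assert(max_steps >= 0);
	while (!cond() && max_steps > 0) {
		fake_loop_step();
		--max_steps;
	}
	/* The counter stops at 0, so this is never -1 here. */
	return max_steps < 0 ? -1 : 0;
}

/* Same shape as the added deadline-based loop. */
static int
wait_timeout(bool (*cond)(void), double timeout)
{
	double deadline = fake_time + timeout;
	while (!cond()) {
		if (fake_time >= deadline)
			return -1;
		fake_loop_step();
	}
	return 0;
}
```

Calling wait_max_steps(cond_never, 3) returns 0 even though the condition never held, whereas wait_timeout(cond_never, 2) returns -1 once the fake clock reaches the deadline.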
* [tarantool-patches] Re: [PATCH 1/6] swim: follow-ups for SWIM anti-entropy
From: Konstantin Osipov @ 2019-03-29 8:27 UTC
To: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]:

OK to push.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

* [tarantool-patches] Re: [PATCH 1/6] swim: follow-ups for SWIM anti-entropy
From: Vladislav Shpilevoy @ 2019-03-29 10:19 UTC
To: tarantool-patches, Konstantin Osipov

Pushed to 2.1 and master.

On 29/03/2019 11:27, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]:
>
> OK to push.
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov

* [tarantool-patches] [PATCH 2/6] test: introduce breakpoints for swim's event loop
From: Vladislav Shpilevoy @ 2019-03-20 10:49 UTC
To: tarantool-patches; +Cc: kostja

Breakpoint as API gives a test writer more control over timing of
condition checks. Breakpoint stops the swim's event loop in a
certain moment of virtual time.

Without breakpoints it is possible, that a condition has failed
its deadline, but it can not be checked properly. For example,
assume that there is a cluster of two members, and after 1 second
they should become fullmesh. It means, that any checks in [0, 1)
time have to fail. But without breakpoints it is not so:

    // event_queue: [round_step, 1 sec]
    // time: 0
    swim_cluster_wait_fullmesh(cluster, 0.5); // Fails.
    // event_queue: []
    // time: 1
    swim_cluster_wait_fullmesh(cluster, 0.1); // Success.

The second test should fail, but it does not, because global time
is already 1 after the first test and the cluster is in fullmesh
already. It looks weird, so such checks should be done not later
than deadline.

Follow-up for 03b9a6e91baf246ee2bb9841d01ba3824b6768a6
---
 test/unit/swim.c            |  6 ++++--
 test/unit/swim.result       |  5 +++--
 test/unit/swim_test_ev.c    | 39 +++++++++++++++++++++++++++++++++++++
 test/unit/swim_test_ev.h    |  8 ++++++++
 test/unit/swim_test_utils.c | 26 ++++++++++++++++++-------
 5 files changed, 73 insertions(+), 11 deletions(-)

diff --git a/test/unit/swim.c b/test/unit/swim.c
index 921fc8f07..3a97aeb18 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -50,7 +50,7 @@ static int test_result;
 static void
 swim_test_one_link(void)
 {
-	swim_start_test(1);
+	swim_start_test(2);
 	/*
 	 * Run a simple cluster of two elements. One of them
 	 * learns about another explicitly. Another should add the
@@ -58,7 +58,9 @@ swim_test_one_link(void)
 	 */
 	struct swim_cluster *cluster = swim_cluster_new(2);
 	fail_if(swim_cluster_add_link(cluster, 0, 1) != 0);
-	is(swim_cluster_wait_fullmesh(cluster, 1), 0, "one link");
+	is(swim_cluster_wait_fullmesh(cluster, 0.9), -1,
+	   "no rounds - no fullmesh");
+	is(swim_cluster_wait_fullmesh(cluster, 0.1), 0, "one link");
 	swim_cluster_delete(cluster);
 
 	swim_finish_test();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index e8991d8d8..b58325b73 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,8 +1,9 @@
 	*** main_f ***
 1..5
 	*** swim_test_one_link ***
-    1..1
-    ok 1 - one link
+    1..2
+    ok 1 - no rounds - no fullmesh
+    ok 2 - one link
 ok 1 - subtests
 	*** swim_test_one_link: done ***
 	*** swim_test_sequence ***
diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c
index ee1fcdbb7..d4a0a4752 100644
--- a/test/unit/swim_test_ev.c
+++ b/test/unit/swim_test_ev.c
@@ -55,6 +55,7 @@ static int event_id = 0;
 enum swim_event_type {
 	SWIM_EVENT_TIMER,
 	SWIM_EVENT_FD_UNBLOCK,
+	SWIM_EVENT_BRK,
 };
 
 struct swim_event;
@@ -250,6 +251,44 @@ swim_test_ev_block_fd(int fd, double delay)
 	e->fd = fd;
 }
 
+/**
+ * Breakpoint event for debug. It does nothing but stops the event
+ * loop after a timeout to allow highlevel API to check some
+ * cases.
+ */
+struct swim_brk_event {
+	struct swim_event base;
+};
+
+/** Delete a breakpoint event. */
+static void
+swim_brk_event_delete(struct swim_event *e)
+{
+	assert(e->type == SWIM_EVENT_BRK);
+	swim_event_destroy(e);
+	free(e);
+}
+
+/**
+ * Breakpoint event processing is nothing but the event deletion.
+ */
+static void
+swim_brk_event_process(struct swim_event *e, struct ev_loop *loop)
+{
+	(void) loop;
+	assert(e->type == SWIM_EVENT_BRK);
+	swim_brk_event_delete(e);
+}
+
+void
+swim_ev_set_brk(double delay)
+{
+	struct swim_brk_event *e = (struct swim_brk_event *) malloc(sizeof(*e));
+	assert(e != NULL);
+	swim_event_create(&e->base, SWIM_EVENT_BRK, delay,
+			  swim_brk_event_process, swim_brk_event_delete);
+}
+
 /** Implementation of global time visible in SWIM. */
 double
 swim_time(void)
diff --git a/test/unit/swim_test_ev.h b/test/unit/swim_test_ev.h
index 808bc510e..01a1b8868 100644
--- a/test/unit/swim_test_ev.h
+++ b/test/unit/swim_test_ev.h
@@ -51,6 +51,14 @@ swim_test_ev_free(void);
 void
 swim_test_ev_block_fd(int fd, double delay);
 
+/**
+ * Stop the event loop after @a delay fake seconds. It does not
+ * affect other events, so the loop can stop earlier multiple
+ * times.
+ */
+void
+swim_ev_set_brk(double delay);
+
 /** Play one step of event loop, process generated events. */
 void
 swim_do_loop_step(struct ev_loop *loop);
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index a92e55233..0b301333b 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -129,16 +129,28 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster)
 	return true;
 }
 
+/**
+ * A common wrapper for some conditions checking after each event
+ * loop step.
+ */
+#define swim_wait_timeout(timeout, target_cond) ({ \
+	swim_ev_set_brk(timeout); \
+	double deadline = swim_time() + timeout; \
+	int rc = 0; \
+	while (! (target_cond)) { \
+		if (swim_time() >= deadline) { \
+			rc = -1; \
+			break; \
+		} \
+		swim_do_loop_step(loop()); \
+	} \
+	rc; \
+})
+
 int
 swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout)
 {
-	double deadline = swim_time() + timeout;
-	while (! swim_cluster_is_fullmesh(cluster)) {
-		if (swim_time() >= deadline)
-			return -1;
-		swim_do_loop_step(loop());
-	}
-	return 0;
+	return swim_wait_timeout(timeout, swim_cluster_is_fullmesh(cluster));
 }
 
 bool
-- 
2.17.2 (Apple Git-113)

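The scenario from the commit message of this patch can be replayed on a toy event loop. The sketch below is a deliberately simplified model, not the harness: every mini_* name is hypothetical, the queue is a plain array instead of a heap, and it assumes that the round step at time 1 only sends a packet whose delivery (and thus fullmesh) is handled on the following loop step at the same virtual time. The timeouts 0.5 and 0.25 are chosen so that the floating point sums are exact.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

enum { MINI_MAX_EVENTS = 16 };

/* Hypothetical event kinds of the toy model. */
enum mini_event_type { MINI_ROUND_STEP, MINI_DELIVERY, MINI_BRK };

struct mini_event {
	enum mini_event_type type;
	double deadline;
};

struct mini_loop {
	/* Virtual global time. */
	double watch;
	bool is_fullmesh;
	int count;
	struct mini_event events[MINI_MAX_EVENTS];
};

static void
mini_loop_add(struct mini_loop *l, enum mini_event_type type, double delay)
{
	assert(l->count < MINI_MAX_EVENTS);
	l->events[l->count].type = type;
	l->events[l->count].deadline = l->watch + delay;
	l->count++;
}

/* A fresh loop at time 0 with one round step scheduled at 1. */
static void
mini_loop_create(struct mini_loop *l)
{
	memset(l, 0, sizeof(*l));
	mini_loop_add(l, MINI_ROUND_STEP, 1);
}

/* Jump to the nearest deadline and process the events set for it. */
static void
mini_loop_step(struct mini_loop *l)
{
	if (l->count == 0)
		return;
	double next = l->events[0].deadline;
	for (int i = 1; i < l->count; ++i) {
		if (l->events[i].deadline < next)
			next = l->events[i].deadline;
	}
	assert(next >= l->watch);
	l->watch = next;
	int old_count = l->count, kept = 0;
	for (int i = 0; i < l->count; ++i) {
		struct mini_event e = l->events[i];
		/* Events added during this step wait for the next one. */
		if (i < old_count && e.deadline == next) {
			if (e.type == MINI_ROUND_STEP)
				mini_loop_add(l, MINI_DELIVERY, 0);
			else if (e.type == MINI_DELIVERY)
				l->is_fullmesh = true;
			/* MINI_BRK does nothing, it is just dropped. */
			continue;
		}
		l->events[kept++] = e;
	}
	l->count = kept;
}

/* Deadline-based wait, optionally guarded by a breakpoint event. */
static int
mini_wait_fullmesh(struct mini_loop *l, double timeout, bool use_brk)
{
	if (use_brk)
		mini_loop_add(l, MINI_BRK, timeout);
	double deadline = l->watch + timeout;
	while (!l->is_fullmesh) {
		/* An empty queue can not make progress in this model. */
		if (l->watch >= deadline || l->count == 0)
			return -1;
		mini_loop_step(l);
	}
	return 0;
}
```

Without breakpoints, a wait of 0.5 fails but leaves the clock at 1, so a following wait of 0.25 succeeds although only 0.75 fake seconds were requested in total. With breakpoints the same two waits stop at 0.5 and 0.75 and both fail, and only a wait that actually crosses time 1 succeeds.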
* [tarantool-patches] Re: [PATCH 2/6] test: introduce breakpoints for swim's event loop
From: Konstantin Osipov @ 2019-03-29 18:20 UTC
To: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]:
> Breakpoint as API gives a test writer more control over timing of
> condition checks. Breakpoint stops the swim's event loop in a
> certain moment of virtual time.
>
> Without breakpoints it is possible, that a condition has failed
> its deadline, but it can not be checked properly. For example,
> assume that there is a cluster of two members, and after 1 second
> they should become fullmesh. It means, that any checks in [0, 1)
> time have to fail. But without breakpoints it is not so:

From the description of the event, I would actually expect it to
stop the event loop, while you're stopping the event loop from the
helper macro.

Why is it a macro btw? Why not make it a function?

Anyway, from this description I would expect that
swim_brk_event_process does something like ev_loop_break(), and
then the loop has to be resumed.

It's fully up to you to do it
differently, but please fix the docs so that there is no
ambiguity.

And, once again, does swim_wait_timeout have to be a
macro?

As a final nit, I would prefix all testing APIs with a name that
would make sure they are never confused with the actual SWIM API.

E.g. swim_wait_timeout() could be easily confused with some public
method by a newbie.

Perhaps choose an entirely different prefix
for the testing harness methods like swim_unit_* swim_ut_ or swum?

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

* [tarantool-patches] Re: [PATCH 2/6] test: introduce breakpoints for swim's event loop
From: Vladislav Shpilevoy @ 2019-04-02 12:25 UTC
To: tarantool-patches, Konstantin Osipov

Hi!

On 29/03/2019 21:20, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]:
>> Breakpoint as API gives a test writer more control over timing of
>> condition checks. Breakpoint stops the swim's event loop in a
>> certain moment of virtual time.
>>
>> Without breakpoints it is possible, that a condition has failed
>> its deadline, but it can not be checked properly. For example,
>> assume that there is a cluster of two members, and after 1 second
>> they should become fullmesh. It means, that any checks in [0, 1)
>> time have to fail. But without breakpoints it is not so:
>
> From the description of the event, I would actually expect it to
> stop the event loop, while you're stopping the event loop from the
> helper macro.

It does stop the event loop. But many other events do the same.
Purpose of that new brk event is to stop the loop in a concrete
point in time, probably between some other, natural, events.

> Why is it a macro btw? Why not make it a function?

There was no any inevitable obstacle on the way of implementing
that as a function. It just looks shorter, especially taking into
account that the next patches introduce new swim_wait_timeout()
usage cases with more complex conditions. I tried to make it a
function firstly, but failed to do it without a pile of wrappers
for each case.

If you want it be a function - ok. I did it.

==============================================================================
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 0b301333b..896b9dcae 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -129,28 +129,49 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster)
 	return true;
 }
 
+typedef bool (*swim_loop_check_f)(struct swim_cluster *cluster, void *data);
+
 /**
- * A common wrapper for some conditions checking after each event
- * loop step.
+ * Run the event loop until timeout happens or a custom
+ * test-defined condition is met.
+ * @param timeout Maximal number of bogus seconds to run the loop
+ *        for.
+ * @param cluster Cluster to test for a condition.
+ * @param check Function condition-checker. It should return true,
+ *        when the condition is met.
+ * @param data Arbitrary test data passed to @a check without
+ *        changes.
+ *
+ * @retval -1 Timeout, condition is not satisfied.
+ * @retval 0 Success, condition is met before timeout.
  */
-#define swim_wait_timeout(timeout, target_cond) ({ \
-	swim_ev_set_brk(timeout); \
-	double deadline = swim_time() + timeout; \
-	int rc = 0; \
-	while (! (target_cond)) { \
-		if (swim_time() >= deadline) { \
-			rc = -1; \
-			break; \
-		} \
-		swim_do_loop_step(loop()); \
-	} \
-	rc; \
-})
+static int
+swim_wait_timeout(double timeout, struct swim_cluster *cluster,
+		  swim_loop_check_f check, void *data)
+{
+	swim_ev_set_brk(timeout);
+	double deadline = swim_time() + timeout;
+	while (! check(cluster, data)) {
+		if (swim_time() >= deadline)
+			return -1;
+		swim_do_loop_step(loop());
+	}
+	return 0;
+}
+
+/** Wrapper to check a cluster for fullmesh for timeout. */
+static bool
+swim_loop_check_fullmesh(struct swim_cluster *cluster, void *data)
+{
+	(void) data;
+	return swim_cluster_is_fullmesh(cluster);
+}
 
 int
 swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout)
 {
-	return swim_wait_timeout(timeout, swim_cluster_is_fullmesh(cluster));
+	return swim_wait_timeout(timeout, cluster, swim_loop_check_fullmesh,
+				 NULL);
 }
==============================================================================

Note, that I can not pass just 'cluster' without 'data' - next
commits need more arguments than just a cluster object.

>
> Anyway, from this description I would expect that
> swim_brk_event_process does something like ev_loop_break(), and
> then the loop has to be resumed.

On the whole, each event here does something like
'ev_loop_break()'. After each loop step the execution is stopped.
This is because I do not have anything like ev_run() - I just
don't need it. The only API exposed by test event loop is
swim_do_loop_step(). It rolls one loop iteration and stops. Above
that method I have wrappers, doing more loop steps and checking
conditions. And *these wrappers* can be stopped or broken in
concrete time points.

Watch the hands: the loop is incremental and it can not be stopped
because it can not be run. But wrappers can be stopped since they
roll many steps in search of a condition satisfaction, or a
timeout.

This new event is needed not because I can't stop event loop
execution, but because I can't add to it arbitrary time points.
For example, *between* some events. Also, without that ability I
could miss some errors. For an example you can look at the commit
message.

> It's fully up to you to do it
> differently, but please fix the docs so that there is no
> ambiguity.

I added a big descriptive comment to the swim_test_ev.h, if you
did not catch the mechanism from my commit messages:

==============================================================================
diff --git a/test/unit/swim_test_ev.h b/test/unit/swim_test_ev.h
index 01a1b8868..355f83f1b 100644
--- a/test/unit/swim_test_ev.h
+++ b/test/unit/swim_test_ev.h
@@ -37,6 +37,47 @@ struct ev_loop;
  * speed up events processing while keeping SWIM unaware that it
  * works in a simulation. Libev is used a little, just to store
  * some IO events.
+ *
+ * The test event loop works as follows. It has a global watch and
+ * a heap of events sorted by deadlines. An event is either a
+ * libev event like EV_TIMER, or an internal test event.
+ *
+ * On each iteration it takes all the next events with the nearest
+ * and equal deadline, and sets the global watch with the deadline
+ * value. It simulates time flow. All the events with that
+ * deadline are processed. An event processing usually means
+ * calling a libev callback set by a SWIM instance beforehand.
+ *
+ * For example, if event deadlines and the watch are:
+ *
+ *     watch = 0
+ *     queue = [1, 1, 1, 5, 5, 6, 7, 7, 7]
+ *
+ * Then the queue is dispatched as follows:
+ *
+ * 1) watch = 1
+ *    process first 3 events
+ *    queue = [5, 5, 6, 7, 7, 7]
+ *
+ * 2) watch = 5
+ *    process next 2 events
+ *    queue = [6, 7, 7, 7]
+ *
+ * 3) watch = 6
+ *    process a next event
+ *    queue = [7, 7, 7]
+ *
+ * 4) watch = 7
+ *    process next 3 events
+ *    queue = []
+ *
+ * The loop provides an API to make one iteration, do one loop
+ * step. For example, the sequence above is played in 4 loop
+ * steps. The unit tests can either do explicitly step by step,
+ * calling that API method. Or use wrappers with 'timeouts', which
+ * in fact do the same, but until the global watch equals a
+ * certain value. Usually after each loop step a test checks some
+ * conditions.
  */
==============================================================================

> And, once again, does swim_wait_timeout have to be a
> macro?
>
> As a final nit, I would prefix all testing APIs with a name that
> would make sure they are never confused with the actual SWIM API.
>
> E.g. swim_wait_timeout() could be easily confused with some public
> method by a newbie.

Doubtful. It is located in test/swim_test_utils.c file. If someone
thinks, that methods in these file and directory are public, then
not sure if different naming would cure them.

> Perhaps choose an entirely different prefix
> for the testing harness methods like swim_unit_* swim_ut_ or swum?

I thought about it and tried as well, but then names looks either
crazy ('swum', seriously? are you mocking?), or too long and the
code starts looking like Java. I strongly disagree that we need
these prefixes just for the sake of themselves here, because it
aggravates readability. If in future something conflicts, we will
just fix the concrete conflict without padding out all the other
code.

At this moment I use suffixes '_test_' and '_cluster_' only in
some really dubious places. For example, for the methods which are
supposed to be mixed with normal SWIM methods along the tests.

Of course, for each of my objectives above and further you can
insist with your right of veto, and I will do it heavy-heartedly
through the nose and memes.

>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov

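The dispatch order described in the swim_test_ev.h comment can be checked with a few lines of standalone C. This is only an illustration of the stepping rule, under the assumption that the queue is already sorted; struct toy_loop, toy_loop_step() and toy_deadlines are hypothetical names, and a sorted array replaces the event heap of the real harness.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical miniature of the loop state from the comment. */
struct toy_loop {
	/* Global virtual time. */
	double watch;
	/* Event deadlines, sorted in ascending order. */
	const double *queue;
	size_t size;
	/* Index of the first not yet processed event. */
	size_t pos;
};

/*
 * One loop step: move the watch to the nearest deadline and
 * consume all the events having exactly that deadline. Returns
 * the number of events processed in this step.
 */
static size_t
toy_loop_step(struct toy_loop *l)
{
	if (l->pos == l->size)
		return 0;
	double deadline = l->queue[l->pos];
	assert(deadline >= l->watch);
	l->watch = deadline;
	size_t processed = 0;
	while (l->pos < l->size && l->queue[l->pos] == deadline) {
		++l->pos;
		++processed;
	}
	return processed;
}

/* The queue from the example in the comment. */
static const double toy_deadlines[] = {1, 1, 1, 5, 5, 6, 7, 7, 7};
```

Starting from watch = 0 over toy_deadlines, four consecutive toy_loop_step() calls process 3, 2, 1 and 3 events and leave the watch at 1, 5, 6 and 7, which matches the four loop steps listed in the comment.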
* [tarantool-patches] Re: [PATCH 2/6] test: introduce breakpoints for swim's event loop 2019-04-02 12:25 ` Vladislav Shpilevoy @ 2019-04-02 19:16 ` Vladislav Shpilevoy 2019-04-02 20:40 ` Konstantin Osipov 0 siblings, 1 reply; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-04-02 19:16 UTC (permalink / raw) To: tarantool-patches, Konstantin Osipov Just in case you somehow missed the previous email - I can not push next 'ok to push' patches before this one. ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 2/6] test: introduce breakpoints for swim's event loop 2019-04-02 19:16 ` Vladislav Shpilevoy @ 2019-04-02 20:40 ` Konstantin Osipov 2019-04-02 21:26 ` Vladislav Shpilevoy 0 siblings, 1 reply; 32+ messages in thread From: Konstantin Osipov @ 2019-04-02 20:40 UTC (permalink / raw) To: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/02 22:17]: > Just in case you somehow missed the previous email - I can not push > next 'ok to push' patches before this one. Please feel free to push this patch. -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 2/6] test: introduce breakpoints for swim's event loop 2019-04-02 20:40 ` Konstantin Osipov @ 2019-04-02 21:26 ` Vladislav Shpilevoy 0 siblings, 0 replies; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-04-02 21:26 UTC (permalink / raw) To: tarantool-patches, Konstantin Osipov Pushed to master. On 02/04/2019 23:40, Konstantin Osipov wrote: > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/02 22:17]: >> Just in case you somehow missed the previous email - I can not push >> next 'ok to push' patches before this one. > > Please feel free to push this patch. > > -- > Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 > http://tarantool.io - www.twitter.com/kostja_osipov > ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] [PATCH 3/6] test: remove swim_unblock_fd event from swim test harness 2019-03-20 10:49 [tarantool-patches] [PATCH 0/6] SWIM failure detection draft Vladislav Shpilevoy 2019-03-20 10:49 ` [tarantool-patches] [PATCH 1/6] swim: follow-ups for SWIM anti-entropy Vladislav Shpilevoy 2019-03-20 10:49 ` [tarantool-patches] [PATCH 2/6] test: introduce breakpoints for swim's event loop Vladislav Shpilevoy @ 2019-03-20 10:49 ` Vladislav Shpilevoy 2019-03-29 18:22 ` [tarantool-patches] " Konstantin Osipov 2019-03-20 10:49 ` [tarantool-patches] [PATCH 4/6] swim: expose enum swim_member_status to public API Vladislav Shpilevoy ` (3 subsequent siblings) 6 siblings, 1 reply; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-03-20 10:49 UTC (permalink / raw) To: tarantool-patches; +Cc: kostja It was designed to control when to unblock a blocked fake file descriptor. Blocked fd accepts all messages, but does not deliver them to SWIM. Also it takes messages to send from SWIM, but does not forward it into the 'network'. But such a complex event is not necessary after swim_brk_event appeared. It allows to block an fd, play event loop for a strictly specified fake seconds number, unblock fd, and play further. Follow-up for 03b9a6e91baf246ee2bb9841d01ba3824b6768a6 --- test/unit/swim.c | 9 +++++-- test/unit/swim_test_ev.c | 47 ------------------------------------- test/unit/swim_test_ev.h | 4 ---- test/unit/swim_test_utils.c | 17 ++++++++++++-- test/unit/swim_test_utils.h | 10 +++++++- 5 files changed, 31 insertions(+), 56 deletions(-) diff --git a/test/unit/swim.c b/test/unit/swim.c index 3a97aeb18..2c04e25b9 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -201,9 +201,14 @@ swim_test_add_remove(void) * removed from s1 after the message is scheduled but * before its completion. 
*/ - swim_cluster_block_io(cluster, 0, 2); - swim_do_loop_step(loop()); + swim_cluster_block_io(cluster, 0); + swim_run_for(1); + /* + * Now the message from s1 is in 'fly', round step is not + * finished. + */ swim_remove_member(s1, swim_member_uuid(s2_self)); + swim_cluster_unblock_io(cluster, 0); is(swim_cluster_wait_fullmesh(cluster, 1), 0, "back in fullmesh after a member removal in the middle of a step"); diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c index d4a0a4752..5b84a68de 100644 --- a/test/unit/swim_test_ev.c +++ b/test/unit/swim_test_ev.c @@ -54,7 +54,6 @@ static int event_id = 0; */ enum swim_event_type { SWIM_EVENT_TIMER, - SWIM_EVENT_FD_UNBLOCK, SWIM_EVENT_BRK, }; @@ -205,52 +204,6 @@ swim_timer_event_new(struct ev_watcher *watcher, double delay) assert(rc != mh_end(events_hash)); } -/** - * SWIM fake transport's event. It is used to block a fake file - * descriptor for a delay. Right after a block that event is - * generated to unblock the descriptor later. - */ -struct swim_fd_unblock_event { - struct swim_event base; - /** A fake file descriptor to unlock. */ - int fd; -}; - -/** Delete a fd unblock event. */ -static void -swim_fd_unblock_event_delete(struct swim_event *e) -{ - assert(e->type == SWIM_EVENT_FD_UNBLOCK); - swim_event_destroy(e); - free(e); -} - -/** Process and delete a fd unblock event. */ -static void -swim_fd_unblock_event_process(struct swim_event *e, struct ev_loop *loop) -{ - (void) loop; - assert(e->type == SWIM_EVENT_FD_UNBLOCK); - struct swim_fd_unblock_event *fe = (struct swim_fd_unblock_event *) e; - swim_test_transport_unblock_fd(fe->fd); - swim_fd_unblock_event_delete(e); -} - -void -swim_test_ev_block_fd(int fd, double delay) -{ - struct swim_fd_unblock_event *e = - (struct swim_fd_unblock_event *) malloc(sizeof(*e)); - assert(e != NULL); - /* Block now. */ - swim_test_transport_block_fd(fd); - /* Unblock after delay. 
*/ - swim_event_create(&e->base, SWIM_EVENT_FD_UNBLOCK, delay, - swim_fd_unblock_event_process, - swim_fd_unblock_event_delete); - e->fd = fd; -} - /** * Breakpoint event for debug. It does nothing but stops the event * loop after a timeout to allow highlevel API to check some diff --git a/test/unit/swim_test_ev.h b/test/unit/swim_test_ev.h index 01a1b8868..82bbfc635 100644 --- a/test/unit/swim_test_ev.h +++ b/test/unit/swim_test_ev.h @@ -47,10 +47,6 @@ swim_test_ev_init(void); void swim_test_ev_free(void); -/** Block a file descriptor @a fd for @a delay fake seconds. */ -void -swim_test_ev_block_fd(int fd, double delay); - /** * Stop the event loop after @a delay fake seconds. It does not * affect other events, so the loop can stop earlier multiple diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 0b301333b..ee24b0320 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -30,6 +30,7 @@ */ #include "swim_test_utils.h" #include "swim_test_ev.h" +#include "swim_test_transport.h" #include "swim/swim.h" #include "swim/swim_ev.h" #include "uuid/tt_uuid.h" @@ -94,9 +95,15 @@ swim_cluster_node(struct swim_cluster *cluster, int i) } void -swim_cluster_block_io(struct swim_cluster *cluster, int i, double delay) +swim_cluster_block_io(struct swim_cluster *cluster, int i) { - swim_test_ev_block_fd(swim_fd(cluster->node[i]), delay); + swim_test_transport_block_fd(swim_fd(cluster->node[i])); +} + +void +swim_cluster_unblock_io(struct swim_cluster *cluster, int i) +{ + swim_test_transport_unblock_fd(swim_fd(cluster->node[i])); } /** Check if @a s1 knows every member of @a s2's table. 
*/ @@ -153,6 +160,12 @@ swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout) return swim_wait_timeout(timeout, swim_cluster_is_fullmesh(cluster)); } +void +swim_run_for(double duration) +{ + swim_wait_timeout(duration, false); +} + bool swim_error_check_match(const char *msg) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 90962b658..56fc2f57d 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -58,7 +58,11 @@ swim_cluster_node(struct swim_cluster *cluster, int i); /** Block IO on a SWIM instance with id @a i. */ void -swim_cluster_block_io(struct swim_cluster *cluster, int i, double delay); +swim_cluster_block_io(struct swim_cluster *cluster, int i); + +/** Unblock IO on a SWIM instance with id @a i. */ +void +swim_cluster_unblock_io(struct swim_cluster *cluster, int i); /** * Explicitly add a member of id @a from_id to a member of id @@ -78,6 +82,10 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster); int swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout); +/** Process SWIM events for @a duration fake seconds. */ +void +swim_run_for(double duration); + #define swim_start_test(n) { \ header(); \ say_verbose("-------- SWIM start test %s --------", __func__); \ -- 2.17.2 (Apple Git-113) ^ permalink raw reply [flat|nested] 32+ messages in thread
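In the patch above swim_run_for(duration) is just swim_wait_timeout(duration, false): a wait whose condition never becomes true, so the loop always plays the whole duration. A minimal sketch of that idea, assuming a toy clock that advances by one fake tick per loop step. All toy_* names are hypothetical, and a function pointer is used here in place of the expression argument of the real swim_wait_timeout macro.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/** Hypothetical condition callback, checked before each loop step. */
typedef bool (*toy_cond_f)(void *arg);

/** Fake clock of the toy loop, in fake ticks. */
static int toy_watch = 0;

/** One loop step of the toy model: the fake time advances by 1. */
static void
toy_do_loop_step(void)
{
	++toy_watch;
}

/**
 * Play the loop until @a cond is true or @a timeout fake ticks
 * have passed. Returns 0 if the condition was met, -1 on timeout.
 */
static int
toy_wait_timeout(int timeout, toy_cond_f cond, void *arg)
{
	int deadline = toy_watch + timeout;
	while (!cond(arg)) {
		if (toy_watch >= deadline)
			return -1;
		toy_do_loop_step();
	}
	return 0;
}

/** A condition which is never true. */
static bool
toy_never(void *arg)
{
	(void) arg;
	return false;
}

/** A condition which is true once the watch reaches *(int *)arg. */
static bool
toy_watch_reached(void *arg)
{
	return toy_watch >= *(int *) arg;
}

/** Play the loop for exactly @a duration fake ticks. */
static void
toy_run_for(int duration)
{
	int rc = toy_wait_timeout(duration, toy_never, NULL);
	/* The condition is never met, so it is always a timeout. */
	assert(rc == -1);
	(void) rc;
}
```

In this model a test that needs "block, play for N ticks, unblock" calls its block helper, then toy_run_for(N), then its unblock helper, which is the same shape as the updated swim_test_add_remove() in the diff.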
* [tarantool-patches] Re: [PATCH 3/6] test: remove swim_unblock_fd event from swim test harness 2019-03-20 10:49 ` [tarantool-patches] [PATCH 3/6] test: remove swim_unblock_fd event from swim test harness Vladislav Shpilevoy @ 2019-03-29 18:22 ` Konstantin Osipov 2019-04-02 21:26 ` Vladislav Shpilevoy 0 siblings, 1 reply; 32+ messages in thread From: Konstantin Osipov @ 2019-03-29 18:22 UTC (permalink / raw) To: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]: > It was designed to control when to unblock a blocked fake file > descriptor. Blocked fd accepts all messages, but does not deliver > them to SWIM. Also it takes messages to send from SWIM, but does > not forward it into the 'network'. OK to push. > -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 3/6] test: remove swim_unblock_fd event from swim test harness 2019-03-29 18:22 ` [tarantool-patches] " Konstantin Osipov @ 2019-04-02 21:26 ` Vladislav Shpilevoy 0 siblings, 0 replies; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-04-02 21:26 UTC (permalink / raw) To: tarantool-patches, Konstantin Osipov Pushed to master. On 29/03/2019 21:22, Konstantin Osipov wrote: > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]: >> It was designed to control when to unblock a blocked fake file >> descriptor. Blocked fd accepts all messages, but does not deliver >> them to SWIM. Also it takes messages to send from SWIM, but does >> not forward it into the 'network'. > > OK to push. > >> > > -- > Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 > http://tarantool.io - www.twitter.com/kostja_osipov > ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] [PATCH 4/6] swim: expose enum swim_member_status to public API 2019-03-20 10:49 [tarantool-patches] [PATCH 0/6] SWIM failure detection draft Vladislav Shpilevoy ` (2 preceding siblings ...) 2019-03-20 10:49 ` [tarantool-patches] [PATCH 3/6] test: remove swim_unblock_fd event from swim test harness Vladislav Shpilevoy @ 2019-03-20 10:49 ` Vladislav Shpilevoy 2019-03-29 18:24 ` [tarantool-patches] " Konstantin Osipov 2019-03-20 10:49 ` [tarantool-patches] [PATCH 5/6] test: differentiate blocked and closed swim fake fds Vladislav Shpilevoy ` (2 subsequent siblings) 6 siblings, 1 reply; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-03-20 10:49 UTC (permalink / raw) To: tarantool-patches; +Cc: kostja At least for testing it is necessary to be able to get status of a member. Now it is always 'alive', but forthcoming failure-detection component would change it. Part of #3234 --- src/lib/swim/swim.c | 6 ++++++ src/lib/swim/swim.h | 6 ++++++ src/lib/swim/swim_proto.h | 14 +++++++++++--- test/unit/swim.c | 11 ++++++++++- test/unit/swim.result | 6 +++++- test/unit/swim_test_utils.c | 22 +++++++++++++++++++++- test/unit/swim_test_utils.h | 6 +++++- 7 files changed, 64 insertions(+), 7 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index c2b2132a2..df34ce247 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -962,6 +962,12 @@ swim_member_by_uuid(struct swim *swim, const struct tt_uuid *uuid) return swim_find_member(swim, uuid); } +enum swim_member_status +swim_member_status(const struct swim_member *member) +{ + return member->status; +} + struct swim_iterator * swim_iterator_open(struct swim *swim) { diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 744214371..ddb759c3d 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -31,6 +31,8 @@ * SUCH DAMAGE. 
*/ #include <stdbool.h> +#define SWIM_PUBLIC_API +#include "swim_proto.h" #if defined(__cplusplus) extern "C" { @@ -112,6 +114,10 @@ swim_self(struct swim *swim); const struct swim_member * swim_member_by_uuid(struct swim *swim, const struct tt_uuid *uuid); +/** Member's current status. */ +enum swim_member_status +swim_member_status(const struct swim_member *member); + /** * Open an iterator to scan the whole member table. The iterator * is not stable. It means, that a caller can not yield between diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index 300a08c1f..4f3cdf03d 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -1,5 +1,3 @@ -#ifndef TARANTOOL_SWIM_PROTO_H_INCLUDED -#define TARANTOOL_SWIM_PROTO_H_INCLUDED /* * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. * @@ -64,6 +62,9 @@ * +-------------------------------------------------------------+ */ +#ifndef TARANTOOL_SWIM_PUBLIC_PROTO_H_INCLUDED +#define TARANTOOL_SWIM_PUBLIC_PROTO_H_INCLUDED + enum swim_member_status { /** The instance is ok, responds to requests. */ MEMBER_ALIVE = 0, @@ -72,6 +73,13 @@ enum swim_member_status { extern const char *swim_member_status_strs[]; +#endif /* TARANTOOL_SWIM_PUBLIC_PROTO_H_INCLUDED */ + +#ifdef SWIM_PUBLIC_API +#undef SWIM_PUBLIC_API +#elif !defined(TARANTOOL_SWIM_PRIVATE_PROTO_H_INCLUDED) +#define TARANTOOL_SWIM_PRIVATE_PROTO_H_INCLUDED + /** * SWIM member attributes from anti-entropy and dissemination * messages. 
@@ -317,4 +325,4 @@ int swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end, const char *prefix, const char *param_name); -#endif /* TARANTOOL_SWIM_PROTO_H_INCLUDED */ +#endif /* TARANTOOL_SWIM_PRIVATE_PROTO_H_INCLUDED */ diff --git a/test/unit/swim.c b/test/unit/swim.c index 2c04e25b9..ea60be4ae 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -50,7 +50,7 @@ static int test_result; static void swim_test_one_link(void) { - swim_start_test(2); + swim_start_test(6); /* * Run a simple cluster of two elements. One of them * learns about another explicitly. Another should add the @@ -61,6 +61,15 @@ swim_test_one_link(void) is(swim_cluster_wait_fullmesh(cluster, 0.9), -1, "no rounds - no fullmesh"); is(swim_cluster_wait_fullmesh(cluster, 0.1), 0, "one link"); + + is(swim_cluster_member_status(cluster, 0, 0), MEMBER_ALIVE, + "self 0 is alive"); + is(swim_cluster_member_status(cluster, 1, 1), MEMBER_ALIVE, + "self 1 is alive"); + is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE, + "0 sees 1 as alive"); + is(swim_cluster_member_status(cluster, 1, 0), MEMBER_ALIVE, + "1 sees 0 as alive"); swim_cluster_delete(cluster); swim_finish_test(); diff --git a/test/unit/swim.result b/test/unit/swim.result index b58325b73..42cef1612 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,9 +1,13 @@ *** main_f *** 1..5 *** swim_test_one_link *** - 1..2 + 1..6 ok 1 - no rounds - no fullmesh ok 2 - one link + ok 3 - self 0 is alive + ok 4 - self 1 is alive + ok 5 - 0 sees 1 as alive + ok 6 - 1 sees 0 as alive ok 1 - subtests *** swim_test_one_link: done *** *** swim_test_sequence *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index ee24b0320..0d62bb26c 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -31,7 +31,6 @@ #include "swim_test_utils.h" #include "swim_test_ev.h" #include "swim_test_transport.h" -#include "swim/swim.h" #include "swim/swim_ev.h" #include "uuid/tt_uuid.h" 
#include "trivia/util.h" @@ -87,6 +86,27 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id) swim_member_uuid(from)); } +static const struct swim_member * +swim_cluster_member_view(struct swim_cluster *cluster, int node_id, + int member_id) +{ + struct swim *node = cluster->node[node_id]; + const struct swim_member *member = swim_self(cluster->node[member_id]); + const struct tt_uuid *member_uuid = swim_member_uuid(member); + return swim_member_by_uuid(node, member_uuid); +} + +enum swim_member_status +swim_cluster_member_status(struct swim_cluster *cluster, int node_id, + int member_id) +{ + const struct swim_member *m = + swim_cluster_member_view(cluster, node_id, member_id); + if (m == NULL) + return swim_member_status_MAX; + return swim_member_status(m); +} + struct swim * swim_cluster_node(struct swim_cluster *cluster, int i) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 56fc2f57d..befb95420 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -31,9 +31,9 @@ * SUCH DAMAGE. */ #include <stdbool.h> +#include "swim/swim.h" struct swim_cluster; -struct swim; /** * Create a new cluster of SWIM instances. Instances are assigned @@ -71,6 +71,10 @@ swim_cluster_unblock_io(struct swim_cluster *cluster, int i); int swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id); +enum swim_member_status +swim_cluster_member_status(struct swim_cluster *cluster, int node_id, + int member_id); + /** * Check if in the cluster every instance knowns the about other * instances. -- 2.17.2 (Apple Git-113) ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 4/6] swim: expose enum swim_member_status to public API 2019-03-20 10:49 ` [tarantool-patches] [PATCH 4/6] swim: expose enum swim_member_status to public API Vladislav Shpilevoy @ 2019-03-29 18:24 ` Konstantin Osipov 2019-04-02 12:25 ` Vladislav Shpilevoy 0 siblings, 1 reply; 32+ messages in thread From: Konstantin Osipov @ 2019-03-29 18:24 UTC (permalink / raw) To: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]: > At least for testing it is necessary to be able to get status of > a member. Now it is always 'alive', but forthcoming > failure-detection component would change it. > +#define SWIM_PUBLIC_API > +#include "swim_proto.h" Please rather than using a define kludge introduce swim_constants.h and move the declaration there. Make swim_constants.h part of the public api. Otherwise OK to push. -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 4/6] swim: expose enum swim_member_status to public API
  2019-03-29 18:24   ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-02 12:25     ` Vladislav Shpilevoy
  2019-04-02 13:17       ` Konstantin Osipov
  0 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-02 12:25 UTC (permalink / raw)
  To: tarantool-patches, Konstantin Osipov

On 29/03/2019 21:24, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]:
>> At least for testing it is necessary to be able to get status of
>> a member. Now it is always 'alive', but forthcoming
>> failure-detection component would change it.
>
>> +#define SWIM_PUBLIC_API
>> +#include "swim_proto.h"
>
> Please rather than using a define kludge introduce
> swim_constants.h and move the declaration there. Make
> swim_constants.h part of the public api.

My main goal was not to lose the git history of enum
swim_member_status, because at this moment it is pedantically
accurate: each line of that enum was added by the commit that
introduced the corresponding SWIM component. But as you wish.

Since the patch has changed too much, there is no incremental diff.
I pasted the new version below.

==============================================================================

commit c775ac3bb2debf7bb14514cad9446c734f97afc2
Author: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date:   Tue Mar 12 20:36:31 2019 +0300

    swim: expose enum swim_member_status to public API

    At least for testing it is necessary to be able to get status of
    a member. Now it is always 'alive', but forthcoming
    failure-detection component would change it.

Part of #3234 diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 6e80b2268..1b623fc27 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -973,6 +973,12 @@ swim_member_by_uuid(struct swim *swim, const struct tt_uuid *uuid) return swim_find_member(swim, uuid); } +enum swim_member_status +swim_member_status(const struct swim_member *member) +{ + return member->status; +} + struct swim_iterator * swim_iterator_open(struct swim *swim) { diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 744214371..3e57076c5 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -31,6 +31,7 @@ * SUCH DAMAGE. */ #include <stdbool.h> +#include "swim_constants.h" #if defined(__cplusplus) extern "C" { @@ -112,6 +113,10 @@ swim_self(struct swim *swim); const struct swim_member * swim_member_by_uuid(struct swim *swim, const struct tt_uuid *uuid); +/** Member's current status. */ +enum swim_member_status +swim_member_status(const struct swim_member *member); + /** * Open an iterator to scan the whole member table. The iterator * is not stable. It means, that a caller can not yield between diff --git a/src/lib/swim/swim_constants.h b/src/lib/swim/swim_constants.h new file mode 100644 index 000000000..c17e5060a --- /dev/null +++ b/src/lib/swim/swim_constants.h @@ -0,0 +1,45 @@ +#ifndef TARANTOOL_SWIM_CONSTANTS_H_INCLUDED +#define TARANTOOL_SWIM_CONSTANTS_H_INCLUDED +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +/** + * Constants for public API. + */ + +enum swim_member_status { + /** The instance is ok, responds to requests. */ + MEMBER_ALIVE = 0, + swim_member_status_MAX, +}; + +extern const char *swim_member_status_strs[]; + +#endif /* TARANTOOL_SWIM_CONSTANTS_H_INCLUDED */ diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index a0eb26209..69b44f14a 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -36,6 +36,7 @@ #include <netinet/in.h> #include <sys/socket.h> #include <stdbool.h> +#include "swim_constants.h" /** * SWIM binary protocol structures and helpers. Below is a picture @@ -66,14 +67,6 @@ * +-------------------------------------------------------------+ */ -enum swim_member_status { - /** The instance is ok, responds to requests. */ - MEMBER_ALIVE = 0, - swim_member_status_MAX, -}; - -extern const char *swim_member_status_strs[]; - /** * SWIM member attributes from anti-entropy and dissemination * messages. 
diff --git a/test/unit/swim.c b/test/unit/swim.c index 2c04e25b9..ea60be4ae 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -50,7 +50,7 @@ static int test_result; static void swim_test_one_link(void) { - swim_start_test(2); + swim_start_test(6); /* * Run a simple cluster of two elements. One of them * learns about another explicitly. Another should add the @@ -61,6 +61,15 @@ swim_test_one_link(void) is(swim_cluster_wait_fullmesh(cluster, 0.9), -1, "no rounds - no fullmesh"); is(swim_cluster_wait_fullmesh(cluster, 0.1), 0, "one link"); + + is(swim_cluster_member_status(cluster, 0, 0), MEMBER_ALIVE, + "self 0 is alive"); + is(swim_cluster_member_status(cluster, 1, 1), MEMBER_ALIVE, + "self 1 is alive"); + is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE, + "0 sees 1 as alive"); + is(swim_cluster_member_status(cluster, 1, 0), MEMBER_ALIVE, + "1 sees 0 as alive"); swim_cluster_delete(cluster); swim_finish_test(); diff --git a/test/unit/swim.result b/test/unit/swim.result index b58325b73..42cef1612 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,9 +1,13 @@ *** main_f *** 1..5 *** swim_test_one_link *** - 1..2 + 1..6 ok 1 - no rounds - no fullmesh ok 2 - one link + ok 3 - self 0 is alive + ok 4 - self 1 is alive + ok 5 - 0 sees 1 as alive + ok 6 - 1 sees 0 as alive ok 1 - subtests *** swim_test_one_link: done *** *** swim_test_sequence *** diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index b4c2adf3b..71ec63f3b 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -31,7 +31,6 @@ #include "swim_test_utils.h" #include "swim_test_ev.h" #include "swim_test_transport.h" -#include "swim/swim.h" #include "swim/swim_ev.h" #include "uuid/tt_uuid.h" #include "trivia/util.h" @@ -87,6 +86,27 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id) swim_member_uuid(from)); } +static const struct swim_member * +swim_cluster_member_view(struct swim_cluster *cluster, int 
node_id, + int member_id) +{ + struct swim *node = cluster->node[node_id]; + const struct swim_member *member = swim_self(cluster->node[member_id]); + const struct tt_uuid *member_uuid = swim_member_uuid(member); + return swim_member_by_uuid(node, member_uuid); +} + +enum swim_member_status +swim_cluster_member_status(struct swim_cluster *cluster, int node_id, + int member_id) +{ + const struct swim_member *m = + swim_cluster_member_view(cluster, node_id, member_id); + if (m == NULL) + return swim_member_status_MAX; + return swim_member_status(m); +} + struct swim * swim_cluster_node(struct swim_cluster *cluster, int i) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 56fc2f57d..befb95420 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -31,9 +31,9 @@ * SUCH DAMAGE. */ #include <stdbool.h> +#include "swim/swim.h" struct swim_cluster; -struct swim; /** * Create a new cluster of SWIM instances. Instances are assigned @@ -71,6 +71,10 @@ swim_cluster_unblock_io(struct swim_cluster *cluster, int i); int swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id); +enum swim_member_status +swim_cluster_member_status(struct swim_cluster *cluster, int node_id, + int member_id); + /** * Check if in the cluster every instance knowns the about other * instances. ^ permalink raw reply [flat|nested] 32+ messages in thread
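The patch above relies on two small conventions: the enum ends with a swim_member_status_MAX sentinel, and the test helper swim_cluster_member_status() returns that sentinel when the node does not know the member. A standalone sketch of the same pattern is below. Everything prefixed with toy_ is hypothetical, and the "alive" string is an assumption made for the example: the contents of swim_member_status_strs[] are not shown in this thread.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy copy of the pattern: the last item is a sentinel. */
enum toy_member_status {
	/** The instance is ok, responds to requests. */
	TOY_MEMBER_ALIVE = 0,
	toy_member_status_MAX,
};

/** One string per status. The value is assumed for the example. */
static const char *toy_member_status_strs[] = {
	"alive",
};

struct toy_member {
	int id;
	enum toy_member_status status;
};

/** Find a member by id in a table, NULL if not found. */
static const struct toy_member *
toy_find_member(const struct toy_member *table, size_t size, int id)
{
	for (size_t i = 0; i < size; ++i) {
		if (table[i].id == id)
			return &table[i];
	}
	return NULL;
}

/** Status of a member, or the _MAX sentinel if it is unknown. */
static enum toy_member_status
toy_member_status_by_id(const struct toy_member *table, size_t size, int id)
{
	const struct toy_member *m = toy_find_member(table, size, id);
	if (m == NULL)
		return toy_member_status_MAX;
	return m->status;
}

/** Printable name of a status, safe for the sentinel too. */
static const char *
toy_member_status_str(enum toy_member_status status)
{
	if ((int) status < 0 || status >= toy_member_status_MAX)
		return "unknown";
	return toy_member_status_strs[status];
}
```

Returning the sentinel keeps an assertion like is(status, MEMBER_ALIVE, ...) failing cleanly for an unknown member instead of dereferencing NULL.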
* [tarantool-patches] Re: [PATCH 4/6] swim: expose enum swim_member_status to public API 2019-04-02 12:25 ` Vladislav Shpilevoy @ 2019-04-02 13:17 ` Konstantin Osipov 2019-04-02 21:26 ` Vladislav Shpilevoy 0 siblings, 1 reply; 32+ messages in thread From: Konstantin Osipov @ 2019-04-02 13:17 UTC (permalink / raw) To: Vladislav Shpilevoy; +Cc: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/02 15:27]: ok to push. -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 4/6] swim: expose enum swim_member_status to public API 2019-04-02 13:17 ` Konstantin Osipov @ 2019-04-02 21:26 ` Vladislav Shpilevoy 0 siblings, 0 replies; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-04-02 21:26 UTC (permalink / raw) To: Konstantin Osipov; +Cc: tarantool-patches Pushed to master. On 02/04/2019 16:17, Konstantin Osipov wrote: > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/02 15:27]: > > ok to push. > > > -- > Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 > http://tarantool.io - www.twitter.com/kostja_osipov > ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] [PATCH 5/6] test: differentiate blocked and closed swim fake fds 2019-03-20 10:49 [tarantool-patches] [PATCH 0/6] SWIM failure detection draft Vladislav Shpilevoy ` (3 preceding siblings ...) 2019-03-20 10:49 ` [tarantool-patches] [PATCH 4/6] swim: expose enum swim_member_status to public API Vladislav Shpilevoy @ 2019-03-20 10:49 ` Vladislav Shpilevoy 2019-03-29 18:25 ` [tarantool-patches] " Konstantin Osipov 2019-03-20 10:49 ` [tarantool-patches] [PATCH 6/6] [RAW] swim: introduce failure detection component Vladislav Shpilevoy 2019-03-27 19:28 ` [tarantool-patches] [PATCH 7/6] swim: make swim_upsert_member returning two values Vladislav Shpilevoy 6 siblings, 1 reply; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-03-20 10:49 UTC (permalink / raw) To: tarantool-patches; +Cc: kostja SWIM's fake file descriptors were implemented to test SWIM with virtual time and fully controlled network with immediate packet delivery. One of their features - API to block a file descriptor and test various failures about it. But blocked fake fd looks the same as closed fd, and it can confuse new test's author. Now if an fd is not unblocked at the end of a test, it leads to a crash. This commit fixes that via adding explicit difference between blocked and closed fd. Part of #3234 --- test/unit/swim_test_transport.c | 53 +++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c index d6591e969..d1c3e97d7 100644 --- a/test/unit/swim_test_transport.c +++ b/test/unit/swim_test_transport.c @@ -90,10 +90,17 @@ struct swim_fd { /** File descriptor number visible to libev. */ int evfd; /** - * Link in the list of opened descriptors. Used to feed - * them all EV_WRITE. + * True, if the descriptor is opened and can receive new + * messages. Regardless of blocked or not. In case of + * blocked, new messages are queued, but not delivered. 
*/ - struct rlist in_opened; + bool is_opened; + + /** + * Link in the list of opened and non-blocked descriptors. + * Used to feed them all EV_WRITE. + */ + struct rlist in_active; /** Queue of received, but not processed packets. */ struct rlist recv_queue; /** Queue of sent, but not received packets. */ @@ -103,23 +110,22 @@ struct swim_fd { /** Table of fake file descriptors. */ static struct swim_fd swim_fd[FAKE_FD_NUMBER]; /** - * List of opened file descriptors. Used to avoid fullscan of the + * List of active file descriptors. Used to avoid fullscan of the * table. */ -static RLIST_HEAD(swim_fd_opened); +static RLIST_HEAD(swim_fd_active); /** Open a fake file descriptor. */ static inline int swim_fd_open(struct swim_fd *fd) { - if (! rlist_empty(&fd->in_opened)) { + if (fd->is_opened) { errno = EADDRINUSE; diag_set(SocketError, "test_socket:1", "bind"); return -1; } - rlist_add_tail_entry(&swim_fd_opened, fd, in_opened); - rlist_create(&fd->recv_queue); - rlist_create(&fd->send_queue); + fd->is_opened = true; + rlist_add_tail_entry(&swim_fd_active, fd, in_active); return 0; } @@ -127,12 +133,15 @@ swim_fd_open(struct swim_fd *fd) static inline void swim_fd_close(struct swim_fd *fd) { + if (! 
fd->is_opened) + return; struct swim_test_packet *i, *tmp; rlist_foreach_entry_safe(i, &fd->recv_queue, in_queue, tmp) swim_test_packet_delete(i); rlist_foreach_entry_safe(i, &fd->send_queue, in_queue, tmp) swim_test_packet_delete(i); - rlist_del_entry(fd, in_opened); + rlist_del_entry(fd, in_active); + fd->is_opened = false; } void @@ -140,7 +149,8 @@ swim_test_transport_init(void) { for (int i = 0, evfd = FAKE_FD_BASE; i < FAKE_FD_NUMBER; ++i, ++evfd) { swim_fd[i].evfd = evfd; - rlist_create(&swim_fd[i].in_opened); + swim_fd[i].is_opened = false; + rlist_create(&swim_fd[i].in_active); rlist_create(&swim_fd[i].recv_queue); rlist_create(&swim_fd[i].send_queue); } @@ -172,6 +182,7 @@ swim_transport_send(struct swim_transport *transport, const void *data, swim_test_packet_new(data, size, &transport->addr, (const struct sockaddr_in *) addr); struct swim_fd *src = &swim_fd[transport->fd - FAKE_FD_BASE]; + assert(src->is_opened); rlist_add_tail_entry(&src->send_queue, p, in_queue); return size; } @@ -188,6 +199,7 @@ swim_transport_recv(struct swim_transport *transport, void *buffer, size_t size, * Pop a packet from a receving queue. */ struct swim_fd *dst = &swim_fd[transport->fd - FAKE_FD_BASE]; + assert(dst->is_opened); struct swim_test_packet *p = rlist_shift_entry(&dst->recv_queue, struct swim_test_packet, in_queue); @@ -236,16 +248,16 @@ void swim_test_transport_block_fd(int fd) { struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE]; - assert(! rlist_empty(&sfd->in_opened)); - rlist_del_entry(sfd, in_opened); + assert(! rlist_empty(&sfd->in_active)); + rlist_del_entry(sfd, in_active); } void swim_test_transport_unblock_fd(int fd) { struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE]; - assert(rlist_empty(&sfd->in_opened)); - rlist_add_tail_entry(&swim_fd_opened, sfd, in_opened); + if (sfd->is_opened && rlist_empty(&sfd->in_active)) + rlist_add_tail_entry(&swim_fd_active, sfd, in_active); } /** Send one packet to destination's recv queue. 
*/ @@ -257,8 +269,11 @@ swim_fd_send_packet(struct swim_fd *fd) struct swim_test_packet *p = rlist_shift_entry(&fd->send_queue, struct swim_test_packet, in_queue); - int dst_i = ntohs(p->dst.sin_port); - rlist_add_tail_entry(&swim_fd[dst_i].recv_queue, p, in_queue); + struct swim_fd *dst = &swim_fd[ntohs(p->dst.sin_port)]; + if (dst->is_opened) + rlist_add_tail_entry(&dst->recv_queue, p, in_queue); + else + swim_test_packet_delete(p); } void @@ -269,11 +284,11 @@ swim_transport_do_loop_step(struct ev_loop *loop) * Reversed because libev invokes events in reversed * order. So this reverse + libev reverse = normal order. */ - rlist_foreach_entry_reverse(fd, &swim_fd_opened, in_opened) { + rlist_foreach_entry_reverse(fd, &swim_fd_active, in_active) { swim_fd_send_packet(fd); ev_feed_fd_event(loop, fd->evfd, EV_WRITE); } - rlist_foreach_entry_reverse(fd, &swim_fd_opened, in_opened) { + rlist_foreach_entry_reverse(fd, &swim_fd_active, in_active) { if (!rlist_empty(&fd->recv_queue)) ev_feed_fd_event(loop, fd->evfd, EV_READ); } -- 2.17.2 (Apple Git-113) ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 5/6] test: differentiate blocked and closed swim fake fds 2019-03-20 10:49 ` [tarantool-patches] [PATCH 5/6] test: differentiate blocked and closed swim fake fds Vladislav Shpilevoy @ 2019-03-29 18:25 ` Konstantin Osipov 2019-04-02 21:26 ` Vladislav Shpilevoy 0 siblings, 1 reply; 32+ messages in thread From: Konstantin Osipov @ 2019-03-29 18:25 UTC (permalink / raw) To: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]: > SWIM's fake file descriptors were implemented to test SWIM with > virtual time and fully controlled network with immediate packet > delivery. One of their features - API to block a file descriptor > and test various failures about it. > > But blocked fake fd looks the same as closed fd, and it can > confuse new test's author. Now if an fd is not unblocked at the > end of a test, it leads to a crash. This commit fixes that via > adding explicit difference between blocked and closed fd. > > Part of #3234 OK to push. -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 5/6] test: differentiate blocked and closed swim fake fds 2019-03-29 18:25 ` [tarantool-patches] " Konstantin Osipov @ 2019-04-02 21:26 ` Vladislav Shpilevoy 0 siblings, 0 replies; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-04-02 21:26 UTC (permalink / raw) To: tarantool-patches, Konstantin Osipov Pushed to master. On 29/03/2019 21:25, Konstantin Osipov wrote: > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]: >> SWIM's fake file descriptors were implemented to test SWIM with >> virtual time and fully controlled network with immediate packet >> delivery. One of their features - API to block a file descriptor >> and test various failures about it. >> >> But blocked fake fd looks the same as closed fd, and it can >> confuse new test's author. Now if an fd is not unblocked at the >> end of a test, it leads to a crash. This commit fixes that via >> adding explicit difference between blocked and closed fd. >> >> Part of #3234 > > OK to push. > > > -- > Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 > http://tarantool.io - www.twitter.com/kostja_osipov > ^ permalink raw reply [flat|nested] 32+ messages in thread
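A note for readers of the 5/6 thread above: the distinction the patch introduces between a blocked and a closed fake fd can be sketched in isolation as follows. This is a simplified model, not the test harness code. The names fake_fd, fake_fd_open, fake_fd_close, fake_fd_block, fake_fd_unblock, fake_fd_deliver and fake_fd_demo are hypothetical, the is_active flag stands in for membership in the swim_fd_active list, and recv_count stands in for the length of recv_queue.

```c
#include <assert.h>
#include <stdbool.h>

enum { FAKE_FD_COUNT = 4 };

/* Hypothetical, simplified stand-in for struct swim_fd. */
struct fake_fd {
	/* Closed vs opened, independent of blocking. */
	bool is_opened;
	/* Opened and not blocked: would be fed with events. */
	bool is_active;
	/* Stand-in for the number of packets in recv_queue. */
	int recv_count;
};

/* Zero-initialized: all descriptors start closed. */
static struct fake_fd fds[FAKE_FD_COUNT];

/* Open a descriptor; a second open of the same one fails. */
static int
fake_fd_open(int i)
{
	if (fds[i].is_opened)
		return -1;
	fds[i].is_opened = true;
	fds[i].is_active = true;
	return 0;
}

/* Close is a no-op on a closed fd and is legal on a blocked one. */
static void
fake_fd_close(int i)
{
	if (!fds[i].is_opened)
		return;
	fds[i].recv_count = 0;
	fds[i].is_active = false;
	fds[i].is_opened = false;
}

/* Block: stop feeding events, but keep the fd opened. */
static void
fake_fd_block(int i)
{
	assert(fds[i].is_active);
	fds[i].is_active = false;
}

/* Unblock only makes sense for an opened, currently blocked fd. */
static void
fake_fd_unblock(int i)
{
	if (fds[i].is_opened && !fds[i].is_active)
		fds[i].is_active = true;
}

/* A packet to an opened fd is queued; to a closed fd it is dropped. */
static bool
fake_fd_deliver(int dst)
{
	if (!fds[dst].is_opened)
		return false;
	fds[dst].recv_count++;
	return true;
}

/* Walk through the cases; returns 0 if the model behaves as described. */
static int
fake_fd_demo(void)
{
	if (fake_fd_open(0) != 0)
		return 1;
	if (fake_fd_open(0) == 0)
		return 2;
	fake_fd_block(0);
	/* Blocked, but still opened: the packet is queued. */
	if (!fake_fd_deliver(0) || fds[0].is_active)
		return 3;
	/* Closing a blocked fd no longer requires an unblock first. */
	fake_fd_close(0);
	fake_fd_unblock(0);
	if (fds[0].is_active)
		return 4;
	/* Closed: the packet is dropped. */
	if (fake_fd_deliver(0))
		return 5;
	return 0;
}
```

In this model a test may leave an fd blocked at the end and still close it safely, which is the crash scenario the commit message describes.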
* [tarantool-patches] [PATCH 6/6] [RAW] swim: introduce failure detection component 2019-03-20 10:49 [tarantool-patches] [PATCH 0/6] SWIM failure detection draft Vladislav Shpilevoy ` (4 preceding siblings ...) 2019-03-20 10:49 ` [tarantool-patches] [PATCH 5/6] test: differentiate blocked and closed swim fake fds Vladislav Shpilevoy @ 2019-03-20 10:49 ` Vladislav Shpilevoy 2019-03-29 18:59 ` [tarantool-patches] " Konstantin Osipov 2019-03-27 19:28 ` [tarantool-patches] [PATCH 7/6] swim: make swim_upsert_member returning two values Vladislav Shpilevoy 6 siblings, 1 reply; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-03-20 10:49 UTC (permalink / raw) To: tarantool-patches; +Cc: kostja The failure detection component finds which members are already dead. Part of #3234 --- src/lib/swim/swim.c | 450 +++++++++++++++++++++++++++++++- src/lib/swim/swim.h | 36 ++- src/lib/swim/swim_io.c | 23 +- src/lib/swim/swim_io.h | 16 ++ src/lib/swim/swim_proto.c | 82 +++++- src/lib/swim/swim_proto.h | 101 ++++++- test/unit/swim.c | 160 +++++++++++- test/unit/swim.result | 34 ++- test/unit/swim_test_transport.c | 16 +- test/unit/swim_test_transport.h | 9 + test/unit/swim_test_utils.c | 71 ++++- test/unit/swim_test_utils.h | 30 +++ 12 files changed, 991 insertions(+), 37 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index df34ce247..f97a2f993 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -38,6 +38,8 @@ #include "info/info.h" #include "assoc.h" #include "sio.h" +#define HEAP_FORWARD_DECLARATION +#include "salad/heap.h" /** * SWIM - Scalable Weakly-consistent Infection-style Process Group @@ -135,6 +137,31 @@ enum { * value. */ HEARTBEAT_RATE_DEFAULT = 1, + /** + * If a ping was sent, it is considered to be lost after + * this time without an ack. Nothing special in this + * value. + */ + ACK_TIMEOUT_DEFAULT = 30, + /** + * If a member has not been responding to pings this + * number of times, it is considered to be dead. 
+ * According to the SWIM paper, it is enough for a member + * not to respond to one direct ping and to K simultaneous + * indirect pings to be considered dead. That seems too + * little, so the threshold here is bigger. + */ + NO_ACKS_TO_DEAD = 3, + /** + * If a member is confirmed to be dead, it is removed from + * the membership after at least this number of + * unacknowledged pings. According to the SWIM paper, a + * dead member is deleted immediately. But here it is held + * for a while to 1) maybe refute its dead status, 2) + * disseminate the status via dissemination and + * anti-entropy components. + */ + NO_ACKS_TO_GC_DEFAULT = 2, }; /** @@ -213,6 +240,31 @@ struct swim_member { * Position in a queue of members in the current round. */ struct rlist in_round_queue; + /** + * + * Failure detection component + */ + /** Growing number to refute old messages. */ + uint64_t incarnation; + /** + * How many pings did not receive an ack in a row being in + * the current status. After a threshold the instance is + * marked as dead. After more it is removed from the + * table. On each status or incarnation change this + * counter is reset. + */ + int unacknowledged_pings; + /** + * When the latest ping is considered to be + * unacknowledged. + */ + double ping_deadline; + /** Ready at hand regular ACK task. */ + struct swim_task ack_task; + /** Ready at hand regular PING task. */ + struct swim_task ping_task; + /** Position in a queue of members waiting for an ack. */ + struct heap_node in_wait_ack_heap; }; #define mh_name _swim_table @@ -230,6 +282,12 @@ struct mh_swim_table_key { #define MH_SOURCE 1 #include "salad/mhash.h" +#define HEAP_NAME wait_ack_heap +#define HEAP_LESS(h, a, b) ((a)->ping_deadline < (b)->ping_deadline) +#define heap_value_t struct swim_member +#define heap_value_attr in_wait_ack_heap +#include "salad/heap.h" + /** * SWIM instance. Stores configuration, manages periodical tasks, * rounds. 
Each member has an object of this type on its host, @@ -285,8 +343,80 @@ struct swim { * starting from this instance. */ mh_int_t iterator; + /** + * + * Failure detection component + */ + /** + * Members waiting for an ACK. On too long absence of an + * ACK a member is considered to be dead and is removed. + * The heap is sorted by deadline in ascending order + * (bottom is newer, top is older). + */ + heap_t wait_ack_heap; + /** Generator of ack checking events. */ + struct ev_timer wait_ack_tick; + /** + * How many pings to a dead member should be + * unacknowledged to delete it from the member table. + */ + int no_acks_to_gc; }; +/** Put the member into a list of ACK waiters. */ +static void +swim_wait_ack(struct swim *swim, struct swim_member *member) +{ + if (heap_node_is_stray(&member->in_wait_ack_heap)) { + member->ping_deadline = swim_time() + swim->wait_ack_tick.at; + wait_ack_heap_insert(&swim->wait_ack_heap, member); + swim_ev_timer_start(loop(), &swim->wait_ack_tick); + } +} + +/** + * Make all needed actions to process a member's update like a + * change of its status, or incarnation, or both. + */ +static void +swim_on_member_update(struct swim *swim, struct swim_member *member) +{ + (void) swim; + member->unacknowledged_pings = 0; +} + +/** + * Update status and incarnation of the member if needed. Statuses + * are compared as a compound key: {incarnation, status}. So @a + * new_status can override an old one only if its incarnation is + * greater, or the same, but its status is "bigger". Statuses are + * compared by their identifier, so "alive" < "dead". This + * protects from the case when a member is detected as dead on one + * instance, but is overridden by an "alive" message with the same + * incarnation from another instance. 
+ */ +static inline void +swim_update_member_inc_status(struct swim *swim, struct swim_member *member, + enum swim_member_status new_status, + uint64_t incarnation) +{ + /* + * Source of truth about self is this instance and it is + * never updated from remote. Refutation is handled + * separately. + */ + assert(member != swim->self); + if (member->incarnation < incarnation) { + member->status = new_status; + member->incarnation = incarnation; + swim_on_member_update(swim, member); + } else if (member->incarnation == incarnation && + member->status < new_status) { + member->status = new_status; + swim_on_member_update(swim, member); + } +} + int swim_fd(const struct swim *swim) { @@ -304,11 +434,37 @@ swim_by_scheduler(struct swim_scheduler *scheduler) return container_of(scheduler, struct swim, scheduler); } +/** + * Once a ping is sent, the member should start waiting for an + * ACK. + */ +static void +swim_ping_task_complete(struct swim_task *task, + struct swim_scheduler *scheduler, int rc) +{ + /* + * If ping send has failed, it makes no sense to wait for + * an ACK. + */ + if (rc < 0) + return; + struct swim *swim = swim_by_scheduler(scheduler); + struct swim_member *m = container_of(task, struct swim_member, + ping_task); + swim_wait_ack(swim, m); +} + /** Free member's resources. */ static inline void swim_member_delete(struct swim_member *member) { assert(rlist_empty(&member->in_round_queue)); + + /* Failure detection component. */ + assert(heap_node_is_stray(&member->in_wait_ack_heap)); + swim_task_destroy(&member->ack_task); + swim_task_destroy(&member->ping_task); + free(member); } @@ -330,7 +486,7 @@ swim_reserve_one_member(struct swim *swim) /** Create a new member. It is not registered anywhere here. 
*/ static struct swim_member * swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status) + enum swim_member_status status, uint64_t incarnation) { struct swim_member *member = (struct swim_member *) calloc(1, sizeof(*member)); @@ -343,6 +499,13 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid, member->uuid = *uuid; member->hash = swim_uuid_hash(uuid); rlist_create(&member->in_round_queue); + + /* Failure detection component. */ + member->incarnation = incarnation; + heap_node_create(&member->in_wait_ack_heap); + swim_task_create(&member->ack_task, NULL, NULL, "ack"); + swim_task_create(&member->ping_task, swim_ping_task_complete, NULL, + "ping"); return member; } @@ -360,6 +523,11 @@ swim_delete_member(struct swim *swim, struct swim_member *member) assert(rc != mh_end(swim->members)); mh_swim_table_del(swim->members, rc, NULL); rlist_del_entry(member, in_round_queue); + + /* Failure detection component. */ + if (! heap_node_is_stray(&member->in_wait_ack_heap)) + wait_ack_heap_delete(&swim->wait_ack_heap, member); + swim_member_delete(member); } @@ -383,7 +551,8 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid) */ static struct swim_member * swim_new_member(struct swim *swim, const struct sockaddr_in *addr, - const struct tt_uuid *uuid, enum swim_member_status status) + const struct tt_uuid *uuid, enum swim_member_status status, + uint64_t incarnation) { int new_bsize = sizeof(swim->shuffled[0]) * (mh_size(swim->members) + 1); @@ -394,7 +563,17 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr, return NULL; } swim->shuffled = new_shuffled; - struct swim_member *member = swim_member_new(addr, uuid, status); + /* + * Reserve one more slot to never fail push into the ack + * waiters heap. 
+ */ + if (wait_ack_heap_reserve(&swim->wait_ack_heap) != 0) { + diag_set(OutOfMemory, sizeof(struct heap_node), "realloc", + "wait_ack_heap"); + return NULL; + } + struct swim_member *member = + swim_member_new(addr, uuid, status, incarnation); if (member == NULL) return NULL; assert(swim_find_member(swim, uuid) == NULL); @@ -489,7 +668,7 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet) if (swim_packet_reserve(packet, new_size) == NULL) break; swim_member_bin_fill(&member_bin, &m->addr, &m->uuid, - m->status); + m->status, m->incarnation); memcpy(pos + size, &member_bin, sizeof(member_bin)); size = new_size; /* @@ -526,6 +705,25 @@ swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet) return 1; } +/** + * Encode failure detection component. + * @retval Number of key-values added to the packet's root map. + */ +static int +swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet, + enum swim_fd_msg_type type) +{ + struct swim_fd_header_bin fd_header_bin; + int size = sizeof(fd_header_bin); + char *pos = swim_packet_alloc(packet, size); + if (pos == NULL) + return 0; + swim_fd_header_bin_create(&fd_header_bin, type, + swim->self->incarnation); + memcpy(pos, &fd_header_bin, size); + return 1; +} + /** Encode SWIM components into a UDP packet. 
*/ static void swim_encode_round_msg(struct swim *swim, struct swim_packet *packet) @@ -534,9 +732,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet) char *header = swim_packet_alloc(packet, 1); int map_size = 0; map_size += swim_encode_src_uuid(swim, packet); + map_size += swim_encode_failure_detection(swim, packet, + SWIM_FD_MSG_PING); map_size += swim_encode_anti_entropy(swim, packet); - assert(mp_sizeof_map(map_size) == 1 && map_size == 2); + assert(mp_sizeof_map(map_size) == 1 && map_size >= 2); mp_encode_map(header, map_size); } @@ -592,8 +792,92 @@ swim_complete_step(struct swim_task *task, struct swim_member *m = rlist_first_entry(&swim->round_queue, struct swim_member, in_round_queue); - if (swim_sockaddr_in_eq(&m->addr, &task->dst)) + if (swim_sockaddr_in_eq(&m->addr, &task->dst)) { rlist_shift(&swim->round_queue); + if (rc > 0) { + /* + * Each round message contains failure + * detection section with a ping. + */ + swim_wait_ack(swim, m); + } + } +} + +/** Schedule send of a failure detection message. */ +static void +swim_send_fd_msg(struct swim *swim, struct swim_task *task, + const struct sockaddr_in *dst, enum swim_fd_msg_type type) +{ + /* + * Reset packet allocator in case if task is being reused. + */ + swim_packet_create(&task->packet); + char *header = swim_packet_alloc(&task->packet, 1); + int map_size = swim_encode_src_uuid(swim, &task->packet); + map_size += swim_encode_failure_detection(swim, &task->packet, type); + assert(map_size == 2); + mp_encode_map(header, map_size); + say_verbose("SWIM %d: schedule %s to %s", swim_fd(swim), + swim_fd_msg_type_strs[type], + sio_strfaddr((struct sockaddr *) dst, sizeof(*dst))); + swim_task_send(task, dst, &swim->scheduler); +} + +/** Schedule send of an ack. */ +static inline void +swim_send_ack(struct swim *swim, struct swim_task *task, + const struct sockaddr_in *dst) +{ + swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK); +} + +/** Schedule send of a ping. 
*/ +static inline void +swim_send_ping(struct swim *swim, struct swim_task *task, + const struct sockaddr_in *dst) +{ + swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_PING); +} + +/** + * Check for unacknowledged pings. A ping is unacknowledged if an + * ack was not received during ack timeout. An unacknowledged ping + * is resent here. + */ +static void +swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events) +{ + assert((events & EV_TIMER) != 0); + (void) events; + struct swim *swim = (struct swim *) t->data; + double current_time = swim_time(); + struct swim_member *m; + while ((m = wait_ack_heap_top(&swim->wait_ack_heap)) != NULL) { + if (current_time < m->ping_deadline) { + swim_ev_timer_start(loop, t); + return; + } + wait_ack_heap_pop(&swim->wait_ack_heap); + ++m->unacknowledged_pings; + switch (m->status) { + case MEMBER_ALIVE: + if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) { + m->status = MEMBER_DEAD; + swim_on_member_update(swim, m); + } + break; + case MEMBER_DEAD: + if (m->unacknowledged_pings >= swim->no_acks_to_gc) { + swim_delete_member(swim, m); + continue; + } + break; + default: + unreachable(); + } + swim_send_ping(swim, &m->ping_task, &m->addr); + } } /** @@ -642,6 +926,7 @@ swim_update_member_uuid(struct swim *swim, struct swim_member *member, say_verbose("SWIM %d: a member has changed its UUID from %s to %s", swim_fd(swim), swim_uuid_str(&old_uuid), swim_uuid_str(new_uuid)); + swim_on_member_update(swim, member); return 0; } @@ -650,8 +935,10 @@ static inline void swim_update_member_addr(struct swim *swim, struct swim_member *member, const struct sockaddr_in *addr) { - (void) swim; - member->addr = *addr; + if (! 
swim_sockaddr_in_eq(addr, &member->addr)) { + member->addr = *addr; + swim_on_member_update(swim, member); + } } /** @@ -665,13 +952,52 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def) { struct swim_member *member = swim_find_member(swim, &def->uuid); if (member == NULL) { + if (def->status == MEMBER_DEAD && + swim->no_acks_to_gc != SWIM_NO_ACKS_IGNORE) { + /* + * Do not 'resurrect' dead members to + * prevent 'ghost' members. Ghost member + * is a one declared as dead, sent via + * anti-entropy, and removed from local + * member table, but then returned back + * from received anti-entropy, as again + * dead. Such dead members could 'live' + * forever. + */ + return NULL; + } member = swim_new_member(swim, &def->addr, &def->uuid, - def->status); + def->status, def->incarnation); return member; } struct swim_member *self = swim->self; - if (member != self) + if (member != self) { + if (def->incarnation < member->incarnation) + return member; swim_update_member_addr(swim, member, &def->addr); + swim_update_member_inc_status(swim, member, def->status, + def->incarnation); + return member; + } + /* + * It is possible that other instances know a bigger + * incarnation of this instance - such thing happens when + * the instance restarts and loses its local incarnation + * number. It will be restored by receiving dissemination + * and anti-entropy messages about self. + */ + if (self->incarnation < def->incarnation) + self->incarnation = def->incarnation; + if (def->status != MEMBER_ALIVE && + def->incarnation == self->incarnation) { + /* + * In the cluster a gossip exists that this + * instance is not alive. Refute this information + * with a bigger incarnation. + */ + self->incarnation++; + swim_on_member_update(swim, member); + } return member; } @@ -699,12 +1025,58 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) return 0; } +/** + * Decode a failure detection message. Schedule acks, process + * acks. 
+ */ +static int +swim_process_failure_detection(struct swim *swim, const char **pos, + const char *end, const struct sockaddr_in *src, + const struct tt_uuid *uuid) +{ + const char *prefix = "invalid failure detection message:"; + struct swim_failure_detection_def def; + struct swim_member_def mdef; + if (swim_failure_detection_def_decode(&def, pos, end, prefix) != 0) + return -1; + say_verbose("SWIM %d: process failure detection's %s", swim_fd(swim), + swim_fd_msg_type_strs[def.type]); + swim_member_def_create(&mdef); + mdef.addr = *src; + mdef.incarnation = def.incarnation; + mdef.uuid = *uuid; + struct swim_member *member = swim_upsert_member(swim, &mdef); + if (member == NULL) + return -1; + if (def.incarnation < member->incarnation) + return 0; + if (def.incarnation == member->incarnation && + member->status != MEMBER_ALIVE) { + member->status = MEMBER_ALIVE; + swim_on_member_update(swim, member); + } + + switch (def.type) { + case SWIM_FD_MSG_PING: + if (! swim_task_is_scheduled(&member->ack_task)) + swim_send_ack(swim, &member->ack_task, &member->addr); + break; + case SWIM_FD_MSG_ACK: + member->unacknowledged_pings = 0; + if (! heap_node_is_stray(&member->in_wait_ack_heap)) + wait_ack_heap_delete(&swim->wait_ack_heap, member); + break; + default: + unreachable(); + } + return 0; +} + /** Process a new message. 
*/ static void swim_on_input(struct swim_scheduler *scheduler, const char *pos, const char *end, const struct sockaddr_in *src) { - (void) src; const char *prefix = "invalid message:"; struct swim *swim = swim_by_scheduler(scheduler); struct tt_uuid uuid; @@ -734,6 +1106,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos, if (swim_process_anti_entropy(swim, &pos, end) != 0) goto error; break; + case SWIM_FAILURE_DETECTION: + if (swim_process_failure_detection(swim, &pos, end, + src, &uuid) != 0) + goto error; + break; default: diag_set(SwimError, "%s unexpected key", prefix); goto error; @@ -766,6 +1143,13 @@ swim_new(void) swim_task_create(&swim->round_step_task, swim_complete_step, NULL, "round packet"); swim_scheduler_create(&swim->scheduler, swim_on_input); + + /* Failure detection component. */ + wait_ack_heap_create(&swim->wait_ack_heap); + swim_ev_timer_init(&swim->wait_ack_tick, swim_check_acks, + ACK_TIMEOUT_DEFAULT, 0); + swim->wait_ack_tick.data = (void *) swim; + swim->no_acks_to_gc = NO_ACKS_TO_GC_DEFAULT; return swim; } @@ -796,7 +1180,7 @@ swim_uri_to_addr(const char *uri, struct sockaddr_in *addr, int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, - const struct tt_uuid *uuid) + double ack_timeout, int no_acks_to_gc, const struct tt_uuid *uuid) { const char *prefix = "swim.cfg:"; struct sockaddr_in addr; @@ -809,7 +1193,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, "a first config", prefix); return -1; } - swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE); + swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, + 0); if (swim->self == NULL) return -1; } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { @@ -855,14 +1240,25 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, if (swim->round_tick.at != heartbeat_rate && heartbeat_rate > 0) swim_ev_timer_set(&swim->round_tick, heartbeat_rate, 0); + if (swim->wait_ack_tick.at != ack_timeout && 
ack_timeout > 0) + swim_ev_timer_set(&swim->wait_ack_tick, ack_timeout, 0); + swim_update_member_addr(swim, swim->self, &addr); int rc = swim_update_member_uuid(swim, swim->self, uuid); /* Reserved above. */ assert(rc == 0); (void) rc; + if (no_acks_to_gc >= 0) + swim->no_acks_to_gc = no_acks_to_gc; return 0; } +double +swim_ack_timeout(const struct swim *swim) +{ + return swim->wait_ack_tick.at; +} + bool swim_is_configured(const struct swim *swim) { @@ -883,7 +1279,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) return -1; struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { - member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE); + member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0); return member == NULL ? -1 : 0; } diag_set(SwimError, "%s a member with such UUID already exists", @@ -911,6 +1307,21 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid) return 0; } +int +swim_probe_member(struct swim *swim, const char *uri) +{ + assert(swim_is_configured(swim)); + struct sockaddr_in addr; + if (swim_uri_to_addr(uri, &addr, "swim.probe_member:") != 0) + return -1; + struct swim_task *t = swim_task_new(swim_task_delete_cb, + swim_task_delete_cb, "probe ping"); + if (t == NULL) + return -1; + swim_send_ping(swim, t, &addr); + return 0; +} + void swim_info(struct swim *swim, struct info_handler *info) { @@ -926,6 +1337,7 @@ swim_info(struct swim *swim, struct info_handler *info) info_append_str(info, "status", swim_member_status_strs[m->status]); info_append_str(info, "uuid", swim_uuid_str(&m->uuid)); + info_append_int(info, "incarnation", (int64_t) m->incarnation); info_table_end(info); } info_end(info); @@ -936,14 +1348,18 @@ swim_delete(struct swim *swim) { swim_scheduler_destroy(&swim->scheduler); swim_ev_timer_stop(loop(), &swim->round_tick); + swim_ev_timer_stop(loop(), &swim->wait_ack_tick); swim_task_destroy(&swim->round_step_task); mh_int_t node; 
mh_foreach(swim->members, node) { struct swim_member *m = *mh_swim_table_node(swim->members, node); rlist_del_entry(m, in_round_queue); + if (! heap_node_is_stray(&m->in_wait_ack_heap)) + wait_ack_heap_delete(&swim->wait_ack_heap, m); swim_member_delete(m); } + wait_ack_heap_destroy(&swim->wait_ack_heap); mh_swim_table_delete(swim->members); free(swim->shuffled); } @@ -1007,3 +1423,9 @@ swim_member_uuid(const struct swim_member *member) { return &member->uuid; } + +uint64_t +swim_member_incarnation(const struct swim_member *member) +{ + return member->incarnation; +} diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index ddb759c3d..23b244d00 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -31,6 +31,7 @@ * SUCH DAMAGE. */ #include <stdbool.h> +#include <limits.h> #define SWIM_PUBLIC_API #include "swim_proto.h" @@ -38,6 +39,16 @@ extern "C" { #endif +enum { + /** + * Value being used to say that unacknowledged pings + * should not affect a certain decision about member. For + * example, regardless of number of unacked pings, never + * drop a member. + */ + SWIM_NO_ACKS_IGNORE = INT_MAX, +}; + struct info_handler; struct swim; struct tt_uuid; @@ -65,6 +76,14 @@ swim_is_configured(const struct swim *swim); * @heartbeat_rate seconds. It is rather the protocol * speed. Protocol period depends on member count and * @heartbeat_rate. + * @param ack_timeout Time in seconds after which a ping is + * considered to be unacknowledged. + * @param no_acks_to_gc How many pings to a dead member should be + * unacknowledged to delete it from the member table. Big + * or even almost infinite (SWIM_NO_ACKS_IGNORE) values + * could be useful, if SWIM is used mainly for monitoring + * of existing nodes with manual removal of dead ones, and + * probably with only single initial discovery. * @param uuid UUID of this instance. Must be unique over the * cluster. 
* @@ -73,7 +92,11 @@ swim_is_configured(const struct swim *swim); */ int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, - const struct tt_uuid *uuid); + double ack_timeout, int no_acks_to_gc, const struct tt_uuid *uuid); + +/** SWIM's ACK timeout, previously set via @sa swim_cfg. */ +double +swim_ack_timeout(const struct swim *swim); /** * Stop listening and broadcasting messages, cleanup all internal @@ -98,6 +121,13 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid); int swim_remove_member(struct swim *swim, const struct tt_uuid *uuid); +/** + * Send a ping to this address. If an ACK is received, the member + * will be added. + */ +int +swim_probe_member(struct swim *swim, const char *uri); + /** Dump member statuses into @a info. */ void swim_info(struct swim *swim, struct info_handler *info); @@ -148,6 +178,10 @@ swim_member_uri(const struct swim_member *member); const struct tt_uuid * swim_member_uuid(const struct swim_member *member); +/** Member's incarnation. */ +uint64_t +swim_member_incarnation(const struct swim_member *member); + #if defined(__cplusplus) } #endif diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index 015968a0d..504f64f32 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -67,11 +67,32 @@ swim_task_create(struct swim_task *task, swim_task_f complete, rlist_create(&task->in_queue_output); } +struct swim_task * +swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc) +{ + struct swim_task *task = (struct swim_task *) malloc(sizeof(*task)); + if (task == NULL) { + diag_set(OutOfMemory, sizeof(*task), "malloc", "task"); + return NULL; + } + swim_task_create(task, complete, cancel, desc); + return task; +} + +void +swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler, + int rc) +{ + (void) rc; + (void) scheduler; + free(task); +} + /** Put the task into the queue of output tasks. 
*/ static inline void swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler) { - assert(rlist_empty(&task->in_queue_output)); + assert(! swim_task_is_scheduled(task)); rlist_add_tail_entry(&scheduler->queue_output, task, in_queue_output); swim_ev_io_start(loop(), &scheduler->output); } diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index bc62a29ce..a6bc28ad3 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -207,6 +207,13 @@ struct swim_task { const char *desc; }; +/** Check if @a task is already scheduled. */ +static inline bool +swim_task_is_scheduled(struct swim_task *task) +{ + return ! rlist_empty(&task->in_queue_output); +} + /** * Put the task into a queue of tasks. Eventually it will be sent. */ @@ -219,6 +226,15 @@ void swim_task_create(struct swim_task *task, swim_task_f complete, swim_task_f cancel, const char *desc); +/** Allocate and create a new task. */ +struct swim_task * +swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc); + +/** Callback to delete a task after its completion. */ +void +swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler, + int rc); + /** Destroy the task, pop from the queue. 
*/ static inline void swim_task_destroy(struct swim_task *task) diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index bf4c09b24..93b7938b6 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -36,6 +36,12 @@ const char *swim_member_status_strs[] = { "alive", + "dead", +}; + +const char *swim_fd_msg_type_strs[] = { + "ping", + "ack", }; int @@ -182,6 +188,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member uuid") != 0) return -1; break; + case SWIM_MEMBER_INCARNATION: + if (swim_decode_uint(pos, end, &def->incarnation, prefix, + "member incarnation") != 0) + return -1; + break; default: unreachable(); } @@ -229,6 +240,70 @@ swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, memcpy(header->v_uuid, uuid, UUID_LEN); } +void +swim_fd_header_bin_create(struct swim_fd_header_bin *header, + enum swim_fd_msg_type type, uint64_t incarnation) +{ + header->k_header = SWIM_FAILURE_DETECTION; + header->m_header = 0x82; + + header->k_type = SWIM_FD_MSG_TYPE; + header->v_type = type; + + header->k_incarnation = SWIM_FD_INCARNATION; + header->m_incarnation = 0xcf; + header->v_incarnation = mp_bswap_u64(incarnation); +} + +int +swim_failure_detection_def_decode(struct swim_failure_detection_def *def, + const char **pos, const char *end, + const char *prefix) +{ + uint32_t size; + if (swim_decode_map(pos, end, &size, prefix, "root") != 0) + return -1; + memset(def, 0, sizeof(*def)); + def->type = swim_fd_msg_type_MAX; + if (size != 2) { + diag_set(SwimError, "%s root map should have two keys - "\ + "message type and incarnation", prefix); + return -1; + } + for (int i = 0; i < (int) size; ++i) { + uint64_t key; + if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0) + return -1; + switch(key) { + case SWIM_FD_MSG_TYPE: + if (swim_decode_uint(pos, end, &key, prefix, + "message type") != 0) + return -1; + if (key >= swim_fd_msg_type_MAX) { + diag_set(SwimError, "%s unknown message type", + prefix); + 
return -1; + } + def->type = key; + break; + case SWIM_FD_INCARNATION: + if (swim_decode_uint(pos, end, &def->incarnation, + prefix, "incarnation") != 0) + return -1; + break; + default: + diag_set(SwimError, "%s unexpected key", prefix); + return -1; + } + } + if (def->type == swim_fd_msg_type_MAX) { + diag_set(SwimError, "%s message type should be specified", + prefix); + return -1; + } + return 0; +} + void swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, uint16_t batch_size) @@ -241,18 +316,19 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, void swim_member_bin_fill(struct swim_member_bin *header, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status) + enum swim_member_status status, uint64_t incarnation) { header->v_status = status; header->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr)); header->v_port = mp_bswap_u16(ntohs(addr->sin_port)); memcpy(header->v_uuid, uuid, UUID_LEN); + header->v_incarnation = mp_bswap_u64(incarnation); } void swim_member_bin_create(struct swim_member_bin *header) { - header->m_header = 0x84; + header->m_header = 0x85; header->k_status = SWIM_MEMBER_STATUS; header->k_addr = SWIM_MEMBER_ADDRESS; header->m_addr = 0xce; @@ -261,6 +337,8 @@ swim_member_bin_create(struct swim_member_bin *header) header->k_uuid = SWIM_MEMBER_UUID; header->m_uuid = 0xc4; header->m_uuid_len = UUID_LEN; + header->k_incarnation = SWIM_MEMBER_INCARNATION; + header->m_incarnation = 0xcf; } void diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index 4f3cdf03d..c120f5733 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -49,12 +49,20 @@ * | | * | AND | * | | + * | SWIM_FAILURE_DETECTION: { | + * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, | + * | SWIM_FD_INCARNATION: uint | + * | }, | + * | | + * | OR/AND | + * | | * | SWIM_ANTI_ENTROPY: [ | * | { | * | SWIM_MEMBER_STATUS: uint, enum member_status, | * | 
SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | - * | SWIM_MEMBER_UUID: 16 byte UUID | + * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_INCARNATION: uint | * | }, | * | ... | * | ], | @@ -68,6 +76,11 @@ enum swim_member_status { /** The instance is ok, responds to requests. */ MEMBER_ALIVE = 0, + /** + * The member is considered to be dead. It will disappear + * from the membership after some unacknowledged pings. + */ + MEMBER_DEAD, swim_member_status_MAX, }; @@ -87,6 +100,7 @@ extern const char *swim_member_status_strs[]; struct swim_member_def { struct tt_uuid uuid; struct sockaddr_in addr; + uint64_t incarnation; enum swim_member_status status; }; @@ -117,6 +131,7 @@ swim_member_def_decode(struct swim_member_def *def, const char **pos, enum swim_body_key { SWIM_SRC_UUID = 0, SWIM_ANTI_ENTROPY, + SWIM_FAILURE_DETECTION, }; /** @@ -139,6 +154,79 @@ void swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, const struct tt_uuid *uuid); +/** {{{ Failure detection component */ + +/** Failure detection component keys. */ +enum swim_fd_key { + /** Type of the failure detection message: ping or ack. */ + SWIM_FD_MSG_TYPE, + /** + * Incarnation of the sender. To make the member alive if + * it was considered to be dead, but ping/ack with greater + * incarnation was received from it. + */ + SWIM_FD_INCARNATION, +}; + +/** Failure detection message type. */ +enum swim_fd_msg_type { + SWIM_FD_MSG_PING, + SWIM_FD_MSG_ACK, + swim_fd_msg_type_MAX, +}; + +extern const char *swim_fd_msg_type_strs[]; + +/** SWIM failure detection MessagePack header template. 
*/ +struct PACKED swim_fd_header_bin { + /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ + uint8_t k_header; + /** mp_encode_map(2) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ + uint8_t k_type; + /** mp_encode_uint(enum swim_fd_msg_type) */ + uint8_t v_type; + + /** mp_encode_uint(SWIM_FD_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +/** Initialize failure detection section. */ +void +swim_fd_header_bin_create(struct swim_fd_header_bin *header, + enum swim_fd_msg_type type, uint64_t incarnation); + +/** A decoded failure detection message. */ +struct swim_failure_detection_def { + /** Type of the message. */ + enum swim_fd_msg_type type; + /** Incarnation of the sender. */ + uint64_t incarnation; +}; + +/** + * Decode failure detection from a MessagePack buffer. + * @param[out] def Definition to decode into. + * @param[in][out] pos Start of the MessagePack buffer. + * @param end End of the MessagePack buffer. + * @param prefix A prefix of an error message to use for diag_set, + * when something is wrong. + * + * @retval 0 Success. + * @retval -1 Error. + */ +int +swim_failure_detection_def_decode(struct swim_failure_detection_def *def, + const char **pos, const char *end, + const char *prefix); + +/** }}} Failure detection component */ + /** {{{ Anti-entropy component */ /** @@ -150,6 +238,7 @@ enum swim_member_key { SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, + SWIM_MEMBER_INCARNATION, swim_member_key_MAX, }; @@ -172,7 +261,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, * anti-entropy section. 
*/ struct PACKED swim_member_bin { - /** mp_encode_map(4) */ + /** mp_encode_map(5) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -198,6 +287,12 @@ struct PACKED swim_member_bin { uint8_t m_uuid; uint8_t m_uuid_len; uint8_t v_uuid[UUID_LEN]; + + /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; }; /** Initialize antri-entropy record. */ @@ -213,7 +308,7 @@ swim_member_bin_create(struct swim_member_bin *header); void swim_member_bin_fill(struct swim_member_bin *header, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status); + enum swim_member_status status, uint64_t incarnation); /** }}} Anti-entropy component */ diff --git a/test/unit/swim.c b/test/unit/swim.c index ea60be4ae..80787a0c7 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -105,11 +105,11 @@ swim_test_uuid_update(void) struct swim *s = swim_cluster_node(cluster, 0); struct tt_uuid new_uuid = uuid_nil; new_uuid.time_low = 1000; - is(swim_cfg(s, NULL, -1, &new_uuid), 0, "UUID update"); + is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), 0, "UUID update"); is(swim_cluster_wait_fullmesh(cluster, 1), 0, "old UUID is returned back as a 'ghost' member"); new_uuid.time_low = 2; - is(swim_cfg(s, NULL, -1, &new_uuid), -1, + is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), -1, "can not update to an existing UUID - swim_cfg fails"); ok(swim_error_check_match("exists"), "diag says 'exists'"); swim_cluster_delete(cluster); @@ -124,16 +124,16 @@ swim_test_cfg(void) struct swim *s = swim_new(); assert(s != NULL); - is(swim_cfg(s, NULL, -1, NULL), -1, "first cfg failed - no URI"); + is(swim_cfg(s, NULL, -1, -1, -1, NULL), -1, "first cfg failed - no URI"); ok(swim_error_check_match("mandatory"), "diag says 'mandatory'"); const char *uri = "127.0.0.1:1"; - is(swim_cfg(s, uri, -1, NULL), -1, "first cfg failed - no UUID"); + is(swim_cfg(s, uri, -1, -1, 
-1, NULL), -1, "first cfg failed - no UUID"); ok(swim_error_check_match("mandatory"), "diag says 'mandatory'"); struct tt_uuid uuid = uuid_nil; uuid.time_low = 1; - is(swim_cfg(s, uri, -1, &uuid), 0, "configured first time"); - is(swim_cfg(s, NULL, -1, NULL), 0, "second time can omit URI, UUID"); - is(swim_cfg(s, NULL, 2, NULL), 0, "hearbeat is dynamic"); + is(swim_cfg(s, uri, -1, -1, -1, &uuid), 0, "configured first time"); + is(swim_cfg(s, NULL, -1, -1, -1, NULL), 0, "second time can omit URI, UUID"); + is(swim_cfg(s, NULL, 2, 2, -1, NULL), 0, "hearbeat is dynamic"); const char *self_uri = swim_member_uri(swim_self(s)); is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\ "URI"); @@ -145,14 +145,16 @@ swim_test_cfg(void) const char *bad_uri3 = "unix/:/home/gerold103/any/dir"; struct tt_uuid uuid2 = uuid_nil; uuid2.time_low = 2; - is(swim_cfg(s2, bad_uri1, -1, &uuid2), -1, "can not use invalid URI"); + is(swim_cfg(s2, bad_uri1, -1, -1, -1, &uuid2), -1, + "can not use invalid URI"); ok(swim_error_check_match("invalid uri"), "diag says 'invalid uri'"); - is(swim_cfg(s2, bad_uri2, -1, &uuid2), -1, "can not use domain names"); + is(swim_cfg(s2, bad_uri2, -1, -1, -1, &uuid2), -1, + "can not use domain names"); ok(swim_error_check_match("invalid uri"), "diag says 'invalid uri'"); - is(swim_cfg(s2, bad_uri3, -1, &uuid2), -1, + is(swim_cfg(s2, bad_uri3, -1, -1, -1, &uuid2), -1, "UNIX sockets are not supported"); ok(swim_error_check_match("only IP"), "diag says 'only IP'"); - is(swim_cfg(s2, uri, -1, &uuid2), -1, + is(swim_cfg(s2, uri, -1, -1, -1, &uuid2), -1, "can not bind to an occupied port"); ok(swim_error_check_match("bind"), "diag says 'bind'"); swim_delete(s2); @@ -226,10 +228,140 @@ swim_test_add_remove(void) swim_finish_test(); } +static void +swim_test_basic_failure_detection(void) +{ + swim_start_test(7); + struct swim_cluster *cluster = swim_cluster_new(2); + swim_cluster_set_ack_timeout(cluster, 0.5); + + swim_cluster_add_link(cluster, 0, 
1); + is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE, + "node is added as alive"); + swim_cluster_block_io(cluster, 1); + is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 2.4), -1, + "member still is not dead after 2 noacks"); + is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 0.1), 0, + "but it is dead after one more"); + + is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX, + 0.9), -1, + "after 1 more unack the member still is not deleted"); + is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX, + 0.1), 0, "but it is dropped after 1 more"); + + /* + * After IO unblock pending messages will be processed all + * at once. S2 will learn about S1. After one more round + * step it should be fullmesh. + */ + swim_cluster_unblock_io(cluster, 1); + is(swim_cluster_wait_fullmesh(cluster, 1), 0, "fullmesh is restored"); + + /* A member can be removed during an ACK wait. */ + swim_cluster_block_io(cluster, 1); + /* Next round after 1 sec + let ping hang for 0.25 sec. */ + swim_run_for(1.25); + struct swim *s1 = swim_cluster_node(cluster, 0); + struct swim *s2 = swim_cluster_node(cluster, 1); + const struct swim_member *s2_self = swim_self(s2); + swim_remove_member(s1, swim_member_uuid(s2_self)); + swim_cluster_unblock_io(cluster, 1); + swim_run_for(0.1); + is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE, + "a member is added back on an ACK"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + +static void +swim_test_basic_gossip(void) +{ + swim_start_test(3); + struct swim_cluster *cluster = swim_cluster_new(3); + swim_cluster_set_ack_timeout(cluster, 10); + /* + * Test basic gossip. S1 and S2 know each other. Then S2 + * starts losing packets. S1 does not receive 2 ACKs from + * S2. Then S3 joins the cluster and explicitly learns + * about S1 and S2. After one more unack S1 declares S2 as + * dead, and via anti-entropy S3 learns the same. 
Even + * earlier than it could discover the same via its own + * pings to S2. + */ + swim_cluster_add_link(cluster, 0, 1); + swim_cluster_add_link(cluster, 1, 0); + swim_cluster_set_drop(cluster, 1, true); + /* + * Wait two no-ACKs on S1 from S2. +1 sec to send a first + * ping. + */ + swim_run_for(20 + 1); + swim_cluster_add_link(cluster, 0, 2); + swim_cluster_add_link(cluster, 2, 1); + is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 10), 0, + "S1 sees S2 as dead"); + is(swim_cluster_member_status(cluster, 2, 1), MEMBER_ALIVE, + "S3 still thinks that S2 is alive"); + /* + * At most after two round steps S1 sends 'S2 is dead' to + * S3. + */ + is(swim_cluster_wait_status(cluster, 2, 1, MEMBER_DEAD, 2), 0, + "S3 learns about dead S2 from S1"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + +static void +swim_test_probe(void) +{ + swim_start_test(2); + struct swim_cluster *cluster = swim_cluster_new(2); + + struct swim *s1 = swim_cluster_node(cluster, 0); + struct swim *s2 = swim_cluster_node(cluster, 1); + const char *s2_uri = swim_member_uri(swim_self(s2)); + is(swim_probe_member(s1, s2_uri), 0, "send probe"); + is(swim_cluster_wait_fullmesh(cluster, 0.1), 0, + "receive ACK on probe and get fullmesh") + + swim_cluster_delete(cluster); + swim_finish_test(); +} + +static void +swim_test_refute(void) +{ + swim_start_test(4); + struct swim_cluster *cluster = swim_cluster_new(2); + swim_cluster_set_ack_timeout(cluster, 2); + + swim_cluster_add_link(cluster, 0, 1); + swim_cluster_set_drop(cluster, 1, true); + fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 7) != 0); + swim_cluster_set_drop(cluster, 1, false); + is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0, + "S2 increments its own incarnation to refute its death"); + is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0, + "new incarnation has reached S1 with a next round message"); + + swim_cluster_restart_node(cluster, 1); + is(swim_cluster_member_incarnation(cluster, 
1, 1), 0, + "after restart S2's incarnation is 0 again"); + is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0, + "S2 learned its old bigger incarnation 1 from S0"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(5); + swim_start_test(9); (void) ap; swim_test_ev_init(); @@ -240,6 +372,10 @@ main_f(va_list ap) swim_test_uuid_update(); swim_test_cfg(); swim_test_add_remove(); + swim_test_basic_failure_detection(); + swim_test_probe(); + swim_test_refute(); + swim_test_basic_gossip(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index 42cef1612..2503cbfcd 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..5 +1..9 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -60,4 +60,36 @@ ok 4 - subtests ok 13 - back in fullmesh after a member removal in the middle of a step ok 5 - subtests *** swim_test_add_remove: done *** + *** swim_test_basic_failure_detection *** + 1..7 + ok 1 - node is added as alive + ok 2 - member still is not dead after 2 noacks + ok 3 - but it is dead after one more + ok 4 - after 1 more unack the member still is not deleted + ok 5 - but it is dropped after 1 more + ok 6 - fullmesh is restored + ok 7 - a member is added back on an ACK +ok 6 - subtests + *** swim_test_basic_failure_detection: done *** + *** swim_test_probe *** + 1..2 + ok 1 - send probe + ok 2 - receive ACK on probe and get fullmesh +ok 7 - subtests + *** swim_test_probe: done *** + *** swim_test_refute *** + 1..4 + ok 1 - S2 increments its own incarnation to refute its death + ok 2 - new incarnation has reached S1 with a next round message + ok 3 - after restart S2's incarnation is 0 again + ok 4 - S2 learned its old bigger incarnation 1 from S0 +ok 8 - subtests + *** swim_test_refute: done *** + *** swim_test_basic_gossip *** + 1..3 + ok 1 - S1 sees S2 as dead + ok 2 - S3 still thinks that S2 
is alive + ok 3 - S3 learns about dead S2 from S1 +ok 9 - subtests + *** swim_test_basic_gossip: done *** *** main_f: done *** diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c index d1c3e97d7..d121292b1 100644 --- a/test/unit/swim_test_transport.c +++ b/test/unit/swim_test_transport.c @@ -95,7 +95,11 @@ struct swim_fd { * blocked, new messages are queued, but not delivered. */ bool is_opened; - + /** + * True if any message sent to that fd should be just + * dropped, not queued. + */ + bool is_dropping; /** * Link in the list of opened and non-blocked descriptors. * Used to feed them all EV_WRITE. @@ -260,6 +264,14 @@ swim_test_transport_unblock_fd(int fd) rlist_add_tail_entry(&swim_fd_active, sfd, in_active); } +void +swim_test_transport_set_drop(int fd, bool value) +{ + struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE]; + if (sfd->is_opened) + sfd->is_dropping = value; +} + /** Send one packet to destination's recv queue. */ static inline void swim_fd_send_packet(struct swim_fd *fd) @@ -270,7 +282,7 @@ swim_fd_send_packet(struct swim_fd *fd) rlist_shift_entry(&fd->send_queue, struct swim_test_packet, in_queue); struct swim_fd *dst = &swim_fd[ntohs(p->dst.sin_port)]; - if (dst->is_opened) + if (dst->is_opened && ! dst->is_dropping && ! fd->is_dropping) rlist_add_tail_entry(&dst->recv_queue, p, in_queue); else swim_test_packet_delete(p); diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h index f8066e636..5a1a92271 100644 --- a/test/unit/swim_test_transport.h +++ b/test/unit/swim_test_transport.h @@ -30,6 +30,7 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include <stdbool.h> struct ev_loop; /** @@ -58,6 +59,14 @@ swim_test_transport_block_fd(int fd); void swim_test_transport_unblock_fd(int fd); +/** + * Set to true, if all incomming and outgoing packets should be + * dropped. Note, that the node, owning @a fd, thinks, that its + * packets are sent. 
+ */ +void +swim_test_transport_set_drop(int fd, bool value); + /** Initialize test transport system. */ void swim_test_transport_init(void); diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 0d62bb26c..0415a3035 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -62,13 +62,24 @@ swim_cluster_new(int size) assert(res->node[i] != NULL); sprintf(uri, "127.0.0.1:%d", i + 1); uuid.time_low = i + 1; - int rc = swim_cfg(res->node[i], uri, -1, &uuid); + int rc = swim_cfg(res->node[i], uri, -1, -1, -1, &uuid); assert(rc == 0); (void) rc; } return res; } +void +swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout) +{ + for (int i = 0; i < cluster->size; ++i) { + int rc = swim_cfg(cluster->node[i], NULL, -1, ack_timeout, -1, + NULL); + assert(rc == 0); + (void) rc; + } +} + void swim_cluster_delete(struct swim_cluster *cluster) { @@ -107,6 +118,17 @@ swim_cluster_member_status(struct swim_cluster *cluster, int node_id, return swim_member_status(m); } +uint64_t +swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, + int member_id) +{ + const struct swim_member *m = + swim_cluster_member_view(cluster, node_id, member_id); + if (m == NULL) + return UINT64_MAX; + return swim_member_incarnation(m); +} + struct swim * swim_cluster_node(struct swim_cluster *cluster, int i) { @@ -114,6 +136,25 @@ swim_cluster_node(struct swim_cluster *cluster, int i) return cluster->node[i]; } +void +swim_cluster_restart_node(struct swim_cluster *cluster, int i) +{ + assert(i >= 0 && i < cluster->size); + struct swim *s = cluster->node[i]; + const struct swim_member *self = swim_self(s); + struct tt_uuid uuid = *swim_member_uuid(self); + char uri[128]; + strcpy(uri, swim_member_uri(self)); + double ack_timeout = swim_ack_timeout(s); + swim_delete(s); + s = swim_new(); + assert(s != NULL); + int rc = swim_cfg(s, uri, -1, ack_timeout, -1, &uuid); + assert(rc == 0); + (void) rc; + cluster->node[i] = s; 
+} + void swim_cluster_block_io(struct swim_cluster *cluster, int i) { @@ -126,6 +167,12 @@ swim_cluster_unblock_io(struct swim_cluster *cluster, int i) swim_test_transport_unblock_fd(swim_fd(cluster->node[i])); } +void +swim_cluster_set_drop(struct swim_cluster *cluster, int i, bool value) +{ + swim_test_transport_set_drop(swim_fd(cluster->node[i]), value); +} + /** Check if @a s1 knows every member of @a s2's table. */ static inline bool swim1_contains_swim2(struct swim *s1, struct swim *s2) @@ -186,6 +233,28 @@ swim_run_for(double duration) swim_wait_timeout(duration, false); } +int +swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, + int member_id, enum swim_member_status status, + double timeout) +{ + return swim_wait_timeout(timeout, + swim_cluster_member_status(cluster, node_id, + member_id) == status + ); +} + +int +swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, + int member_id, uint64_t incarnation, + double timeout) +{ + return swim_wait_timeout(timeout, + swim_cluster_member_incarnation(cluster, node_id, + member_id) == incarnation + ); +} + bool swim_error_check_match(const char *msg) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index befb95420..8fba5c2da 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -44,6 +44,10 @@ struct swim_cluster; struct swim_cluster * swim_cluster_new(int size); +/** Change ACK timeout of all the instances in the cluster. */ +void +swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout); + /** Delete all the SWIM instances, and the cluster itself. */ void swim_cluster_delete(struct swim_cluster *cluster); @@ -56,6 +60,10 @@ swim_error_check_match(const char *msg); struct swim * swim_cluster_node(struct swim_cluster *cluster, int i); +/** Drop and create again a SWIM instance with id @a i. 
*/ +void +swim_cluster_restart_node(struct swim_cluster *cluster, int i); + /** Block IO on a SWIM instance with id @a i. */ void swim_cluster_block_io(struct swim_cluster *cluster, int i); @@ -64,6 +72,9 @@ swim_cluster_block_io(struct swim_cluster *cluster, int i); void swim_cluster_unblock_io(struct swim_cluster *cluster, int i); +void +swim_cluster_set_drop(struct swim_cluster *cluster, int i, bool value); + /** * Explicitly add a member of id @a from_id to a member of id * @a to_id. @@ -75,6 +86,10 @@ enum swim_member_status swim_cluster_member_status(struct swim_cluster *cluster, int node_id, int member_id); +uint64_t +swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, + int member_id); + /** * Check if in the cluster every instance knowns the about other * instances. @@ -86,6 +101,21 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster); int swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout); +/** + * Wait until a member with id @a member_id is seen with @a status + * in the membership table of a member with id @a node_id. At most + * @a timeout seconds. + */ +int +swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, + int member_id, enum swim_member_status status, + double timeout); + +int +swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, + int member_id, uint64_t incarnation, + double timeout); + /** Process SWIM events for @a duration fake seconds. */ void swim_run_for(double duration); -- 2.17.2 (Apple Git-113) ^ permalink raw reply [flat|nested] 32+ messages in thread
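For reference, the failure detection section filled by swim_fd_header_bin_create() in the patch above takes 14 bytes on the wire: a one-byte key, a fixmap marker 0x82, two one-byte key/value pairs' worth of type data, and a 0xcf marker followed by the big-endian 64-bit incarnation. Below is a minimal standalone sketch of that layout. It assumes the key values implied by the enums in this patch (SWIM_FAILURE_DETECTION = 2, SWIM_FD_MSG_TYPE = 0, SWIM_FD_INCARNATION = 1). The names fd_header_encode and FD_HEADER_SIZE are hypothetical and not part of the patch, and the byte swap is written by hand instead of using mp_bswap_u64.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Values mirror the enums in the patch; redefined for this sketch only. */
enum { SWIM_FAILURE_DETECTION = 2 };
enum { SWIM_FD_MSG_TYPE = 0, SWIM_FD_INCARNATION = 1 };
enum { SWIM_FD_MSG_PING = 0, SWIM_FD_MSG_ACK = 1 };

/* 6 one-byte fields plus an 8-byte incarnation (hypothetical constant). */
enum { FD_HEADER_SIZE = 14 };

/*
 * Hypothetical helper: write the failure detection section into
 * @a buf byte by byte and return the number of bytes written.
 */
static size_t
fd_header_encode(uint8_t *buf, uint8_t type, uint64_t incarnation)
{
	assert(buf != NULL);
	uint8_t *pos = buf;
	/* Key of the section in the root map: a positive fixint. */
	*pos++ = SWIM_FAILURE_DETECTION;
	/* MessagePack fixmap with 2 entries. */
	*pos++ = 0x82;
	/* Message type: key and value are both positive fixints. */
	*pos++ = SWIM_FD_MSG_TYPE;
	*pos++ = type;
	/* Incarnation: key, uint64 marker, 8 bytes in big-endian order. */
	*pos++ = SWIM_FD_INCARNATION;
	*pos++ = 0xcf;
	for (int shift = 56; shift >= 0; shift -= 8)
		*pos++ = (uint8_t)(incarnation >> shift);
	return (size_t)(pos - buf);
}
```

Always encoding the incarnation as a full uint64 keeps the section size constant, which is what lets the patch describe it with a fixed PACKED struct template.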
* [tarantool-patches] Re: [PATCH 6/6] [RAW] swim: introduce failure detection component 2019-03-20 10:49 ` [tarantool-patches] [PATCH 6/6] [RAW] swim: introduce failure detection component Vladislav Shpilevoy @ 2019-03-29 18:59 ` Konstantin Osipov 2019-04-02 12:25 ` Vladislav Shpilevoy 0 siblings, 1 reply; 32+ messages in thread From: Konstantin Osipov @ 2019-03-29 18:59 UTC (permalink / raw) To: tarantool-patches * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]: > > > +/** > + * Decode a failure detection message. Schedule acks, process > + * acks. > + */ > +static int > +swim_process_failure_detection(struct swim *swim, const char **pos, > + const char *end, const struct sockaddr_in *src, > + const struct tt_uuid *uuid) > +{ > + const char *prefix = "invalid failure detection message:"; > + struct swim_failure_detection_def def; > + struct swim_member_def mdef; > + if (swim_failure_detection_def_decode(&def, pos, end, prefix) != 0) > + return -1; > + say_verbose("SWIM %d: process failure detection's %s", swim_fd(swim), > + swim_fd_msg_type_strs[def.type]); > + swim_member_def_create(&mdef); > + mdef.addr = *src; > + mdef.incarnation = def.incarnation; > + mdef.uuid = *uuid; > + struct swim_member *member = swim_upsert_member(swim, &mdef); > + if (member == NULL) > + return -1; > + if (def.incarnation < member->incarnation) > + return 0; I think this branch needs a comment. Why do you not respond to pings from older incarnations? > + if (no_acks_to_gc >= 0) > + swim->no_acks_to_gc = no_acks_to_gc; Why do you actually need a custom no_acks_to_gc? Could you please add a comment for this variable declaration? > +enum { > + /** > + * Value being used to say that unacknowledged pings > + * should not affect a certain decision about member. For > + * example, regardless of number of unacked pings, never > + * drop a member. > + */ > + SWIM_NO_ACKS_IGNORE = INT_MAX, > +}; OK, now I get it. You use it to pin members.
I would actually use bool is_pinned for that. It seems you make no_acks_to_gc a variable only to use it for pinning. Let's add a separate variable for pins. > +/** > + * Send a ping to this address. If an ACK is received, the member > + * will be added. Could you also please explain in the comment why this is useful? E.g. why would I want to add a member only after a successful ping/ack? > + */ > +int > +swim_probe_member(struct swim *swim, const char *uri); > + > + SWIM_FD_MSG_TYPE, The patch is generally OK to push after fixing the minor comments above. I also changed a few comments on the branch. -- Konstantin Osipov, Moscow, Russia, +7 903 626 22 32 http://tarantool.io - www.twitter.com/kostja_osipov ^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 6/6] [RAW] swim: introduce failure detection component 2019-03-29 18:59 ` [tarantool-patches] " Konstantin Osipov @ 2019-04-02 12:25 ` Vladislav Shpilevoy 2019-04-04 10:20 ` Vladislav Shpilevoy 2019-04-04 12:45 ` Konstantin Osipov 0 siblings, 2 replies; 32+ messages in thread From: Vladislav Shpilevoy @ 2019-04-02 12:25 UTC (permalink / raw) To: tarantool-patches, Konstantin Osipov On 29/03/2019 21:59, Konstantin Osipov wrote: > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/20 14:11]: >> >> >> +/** >> + * Decode a failure detection message. Schedule acks, process >> + * acks. >> + */ >> +static int >> +swim_process_failure_detection(struct swim *swim, const char **pos, >> + const char *end, const struct sockaddr_in *src, >> + const struct tt_uuid *uuid) >> +{ >> + const char *prefix = "invalid failure detection message:"; >> + struct swim_failure_detection_def def; >> + struct swim_member_def mdef; >> + if (swim_failure_detection_def_decode(&def, pos, end, prefix) != 0) >> + return -1; >> + say_verbose("SWIM %d: process failure detection's %s", swim_fd(swim), >> + swim_fd_msg_type_strs[def.type]); >> + swim_member_def_create(&mdef); >> + mdef.addr = *src; >> + mdef.incarnation = def.incarnation; >> + mdef.uuid = *uuid; >> + struct swim_member *member = swim_upsert_member(swim, &mdef); >> + if (member == NULL) >> + return -1; >> + if (def.incarnation < member->incarnation) >> + return 0; > > I think this branch needs a comment. Why do you not respond to pings > from older incarnations? Firstly, I am trying to be consistent in ignoring old messages: I just pretend that they do not exist, regardless of their type and purpose. Secondly, if we know a bigger incarnation of the member, then we can be sure that we have already interacted with that member after this stray ping was sent, so there is no need to litter the network with an apparently redundant ACK. I added a comment with the things I said above.
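The rule explained above comes down to a single comparison of incarnations. Here is a minimal sketch of it, assuming a reduced member structure. Both struct member_view and fd_msg_is_stale() are hypothetical names used only for illustration. In the patch itself the check is written inline in swim_process_failure_detection(), right after swim_upsert_member().

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, reduced view of an entry in the member table. */
struct member_view {
	/* The biggest incarnation of the member known locally. */
	uint64_t incarnation;
};

/*
 * Hypothetical helper: a ping or an ACK is stale when it carries
 * an incarnation smaller than the one already known. Such a
 * message is dropped silently, and no ACK is sent in response.
 */
static bool
fd_msg_is_stale(const struct member_view *known, uint64_t msg_incarnation)
{
	return msg_incarnation < known->incarnation;
}
```

Note that a message with an equal incarnation is not stale: regular pings and ACKs between up-to-date members carry exactly the known value and must still be processed.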
> >> +/** >> + * Send a ping to this address. If an ACK is received, the member >> + * will be added. > > Could you also please explain in the comment why this is useful? > E.g. why would I want to add a member only after a successful > ping/ack? It is not about successful ping/ack, but rather for a case when you don't know UUID. This method allows you to discover UUID knowing only URI. ============================================================================== diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 3a2a878fc..9659f03b8 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -123,7 +123,9 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid); /** * Send a ping to this address. If an ACK is received, the member - * will be added. + * will be added. The main purpose of the method is to add a new + * member manually but without knowledge of its UUID. The member + * will send it with an ACK. */ int swim_probe_member(struct swim *swim, const char *uri); ============================================================================== > >> + */ >> +int >> +swim_probe_member(struct swim *swim, const char *uri); >> + >> + SWIM_FD_MSG_TYPE, > > The patch is generally OK to push after fixing the minor comments > above. I also changed a few comments on the branch. Thanks, the fixes are applied. Partially though. For example, in one place you changed 'the SWIM paper' to 'SWIM paper'. But there are no the single SWIM paper in the world. And I used 'the' to emphasize one concrete SWIM paper among many. Besides, you left other 'the SWIM paper' as is. Also, I removed empty lines from swim_update_member_inc_status. We do not use the empty lines for such multi-line conditions. In addition, fixed some places out of 66 border. > >> +enum { >> + /** >> + * Value being used to say that unacknowledged pings >> + * should not affect a certain decision about member. 
For >> + * example, regardless of number of unacked pings, never >> + * drop a member. >> + */ >> + SWIM_NO_ACKS_IGNORE = INT_MAX, >> +}; > > OK, now I get it. You use it to pin members. I would actually use > bool is_pinned for that. It seems you make no_acks_to_gc a > variable only to use it for pinning. Let's add a separate variable > for pins. Discussed verbally. Pinning is necessary when member addition and removal are managed by the user and external systems, while SWIM is used for failure detection only. We decided to encapsulate no_acks_to_gc and provide the user with a pinning API only. But there is a problem: when a new member is added implicitly, for example via anti-entropy, it is not pinned. A user would need to track every new member and pin it manually. So for that case we decided to introduce a flag 'auto_pin'. With that flag set, every new member is pinned automatically. But once I started implementing that idea, I ran into an unclear point - what to do with existing members when auto_pin = true? And do we need point pinning now? I decided that - we can postpone point pinning until there is a specific request for it; - instead of auto_pin it is better to add an explicit GC mode, because in a nutshell auto-pin is just GC turned off. Let's call a spade a spade. With these simplifications I managed to make the incremental diff for that comment quite small and simple. See below: ============================================================================== diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 188e5bd9a..472137db3 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -160,7 +160,7 @@ enum { * 2) disseminate the status via dissemination and * anti-entropy components. */ - NO_ACKS_TO_GC_DEFAULT = 2, + NO_ACKS_TO_GC = 2, }; /** @@ -362,12 +362,8 @@ struct swim { heap_t wait_ack_heap; /** Generator of ack checking events.
*/ struct ev_timer wait_ack_tick; - /** - * How many pings to a dead member should be - * unacknowledged for it to be deleted from the member - * table. - */ - int no_acks_to_gc; + /** GC state saying how to remove dead members. */ + enum swim_gc_mode gc_mode; }; /** Put the member into a list of ACK waiters. */ @@ -875,7 +871,8 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events) } break; case MEMBER_DEAD: - if (m->unacknowledged_pings >= swim->no_acks_to_gc) { + if (m->unacknowledged_pings >= NO_ACKS_TO_GC && + swim->gc_mode == SWIM_GC_ON) { swim_delete_member(swim, m); continue; } @@ -968,8 +965,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, { struct swim_member *member = swim_find_member(swim, &def->uuid); if (member == NULL) { - if (def->status == MEMBER_DEAD && - swim->no_acks_to_gc != SWIM_NO_ACKS_IGNORE) { + if (def->status == MEMBER_DEAD && swim->gc_mode == SWIM_GC_ON) { /* * Do not 'resurrect' dead members to * prevent 'ghost' members. Ghost member @@ -1185,7 +1181,7 @@ swim_new(void) swim_ev_timer_init(&swim->wait_ack_tick, swim_check_acks, ACK_TIMEOUT_DEFAULT, 0); swim->wait_ack_tick.data = (void *) swim; - swim->no_acks_to_gc = NO_ACKS_TO_GC_DEFAULT; + swim->gc_mode = SWIM_GC_ON; return swim; } @@ -1216,7 +1212,8 @@ swim_uri_to_addr(const char *uri, struct sockaddr_in *addr, int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, - double ack_timeout, int no_acks_to_gc, const struct tt_uuid *uuid) + double ack_timeout, enum swim_gc_mode gc_mode, + const struct tt_uuid *uuid) { const char *prefix = "swim.cfg:"; struct sockaddr_in addr; @@ -1284,8 +1281,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, /* Reserved above. 
*/ assert(rc == 0); (void) rc; - if (no_acks_to_gc >= 0) - swim->no_acks_to_gc = no_acks_to_gc; + if (gc_mode != SWIM_GC_DEFAULT) + swim->gc_mode = gc_mode; return 0; } diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 9659f03b8..31150a18e 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -31,7 +31,6 @@ * SUCH DAMAGE. */ #include <stdbool.h> -#include <limits.h> #include <stdint.h> #include "swim_constants.h" @@ -39,22 +38,19 @@ extern "C" { #endif -enum { - /** - * Value being used to say that unacknowledged pings - * should not affect a certain decision about member. For - * example, regardless of number of unacked pings, never - * drop a member. - */ - SWIM_NO_ACKS_IGNORE = INT_MAX, -}; - struct info_handler; struct swim; struct tt_uuid; struct swim_iterator; struct swim_member; +/** Types of SWIM dead member deletion policy. */ +enum swim_gc_mode { + /** Just keep the current mode as is. */ + SWIM_GC_DEFAULT = -1, + /** + * Turn GC off. With that mode dead members are never + * deleted automatically. + */ + SWIM_GC_OFF = 0, + /** + * Turn GC on. Normal classical SWIM GC mode. Dead members + * are deleted automatically after a number of + * unacknowledged pings. + */ + SWIM_GC_ON = 1, +}; + /** * Create a new SWIM instance. Do not bind to a port or set any * parameters. Allocation and initialization only. @@ -78,12 +74,11 @@ swim_is_configured(const struct swim *swim); * @heartbeat_rate. * @param ack_timeout Time in seconds after which a ping is * considered to be unacknowledged. - * @param no_acks_to_gc How many pings to a dead member should be - * unacknowledged to delete it from the member table. Big - * or even almost infinite (SWIM_NO_ACKS_IGNORE) values - * could be useful, if SWIM is used mainly for monitoring - * of existing nodes with manual removal of dead ones, and - * probably with only single initial discovery. + * @param gc_mode Says if members should never be deleted due to + * too many unacknowledged pings. 
It could be useful, if + * SWIM is used mainly for monitoring of existing nodes + * with manual removal of dead ones, and probably with only + * a single initial discovery. * @param uuid UUID of this instance. Must be unique over the * cluster. * @@ -92,7 +87,8 @@ swim_is_configured(const struct swim *swim); */ int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, - double ack_timeout, int no_acks_to_gc, const struct tt_uuid *uuid); + double ack_timeout, enum swim_gc_mode gc_mode, + const struct tt_uuid *uuid); /** SWIM's ACK timeout, previously set via @sa swim_cfg. */ double diff --git a/test/unit/swim.c b/test/unit/swim.c index f3a31f992..32d223a7a 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -410,7 +410,7 @@ swim_test_undead(void) { swim_start_test(2); struct swim_cluster *cluster = swim_cluster_new(2); - swim_cluster_set_no_acks_to_gc(cluster, SWIM_NO_ACKS_IGNORE); + swim_cluster_set_gc(cluster, SWIM_GC_OFF); swim_cluster_set_ack_timeout(cluster, 1); swim_cluster_add_link(cluster, 0, 1); swim_cluster_add_link(cluster, 1, 0); diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 57faf803a..bb413372c 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -84,9 +84,9 @@ swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout) } void -swim_cluster_set_no_acks_to_gc(struct swim_cluster *cluster, int no_acks_to_gc) +swim_cluster_set_gc(struct swim_cluster *cluster, enum swim_gc_mode gc_mode) { - swim_cluster_set_cfg(cluster, NULL, -1, -1, no_acks_to_gc, NULL); + swim_cluster_set_cfg(cluster, NULL, -1, -1, gc_mode, NULL); } void diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 494808df0..c0ecf27e0 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -53,7 +53,7 @@ swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout); * of all the instances in the cluster. 
*/ void -swim_cluster_set_no_acks_to_gc(struct swim_cluster *cluster, int no_acks_to_gc); +swim_cluster_set_gc(struct swim_cluster *cluster, enum swim_gc_mode gc_mode); /** Delete all the SWIM instances, and the cluster itself. */ void ============================================================================== ^ permalink raw reply [flat|nested] 32+ messages in thread
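For readers following the thread, here is a minimal C sketch of the GC-mode rule from the incremental diff above. The enum values and NO_ACKS_TO_GC are copied from the patch. The two helpers, sketch_should_delete_dead() and sketch_apply_gc_mode(), are hypothetical names made up for illustration and are not part of the SWIM sources.

```c
#include <assert.h>
#include <stdbool.h>

/* Mirrors enum swim_gc_mode from the diff. */
enum swim_gc_mode {
	SWIM_GC_DEFAULT = -1,
	SWIM_GC_OFF = 0,
	SWIM_GC_ON = 1,
};

/* Same value as NO_ACKS_TO_GC in the diff. */
enum { NO_ACKS_TO_GC = 2 };

/*
 * Hypothetical helper: the MEMBER_DEAD condition from
 * swim_check_acks(). A dead member is deleted only when GC is on
 * and enough pings have stayed unacknowledged.
 */
static bool
sketch_should_delete_dead(int unacknowledged_pings,
			  enum swim_gc_mode gc_mode)
{
	/* A stored mode is never SWIM_GC_DEFAULT, see below. */
	assert(gc_mode != SWIM_GC_DEFAULT);
	return unacknowledged_pings >= NO_ACKS_TO_GC &&
	       gc_mode == SWIM_GC_ON;
}

/*
 * Hypothetical helper: how swim_cfg() treats its gc_mode argument.
 * SWIM_GC_DEFAULT means "keep the current mode as is".
 */
static enum swim_gc_mode
sketch_apply_gc_mode(enum swim_gc_mode current,
		     enum swim_gc_mode requested)
{
	return requested != SWIM_GC_DEFAULT ? requested : current;
}
```

With SWIM_GC_OFF the ping counter is effectively ignored, which is what the old SWIM_NO_ACKS_IGNORE = INT_MAX value expressed indirectly.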
* [tarantool-patches] Re: [PATCH 6/6] [RAW] swim: introduce failure detection component
  2019-04-02 12:25 ` Vladislav Shpilevoy
@ 2019-04-04 10:20 ` Vladislav Shpilevoy
  2019-04-04 12:45 ` Konstantin Osipov
  1 sibling, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-04 10:20 UTC (permalink / raw)
  To: tarantool-patches, Konstantin Osipov

Hi! Just a kind poke to review the new version of the patch.

^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 6/6] [RAW] swim: introduce failure detection component
  2019-04-02 12:25 ` Vladislav Shpilevoy
  2019-04-04 10:20 ` Vladislav Shpilevoy
@ 2019-04-04 12:45 ` Konstantin Osipov
  2019-04-04 13:57 ` Vladislav Shpilevoy
  1 sibling, 1 reply; 32+ messages in thread
From: Konstantin Osipov @ 2019-04-04 12:45 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/02 15:27]:
>

OK to push.

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 6/6] [RAW] swim: introduce failure detection component
  2019-04-04 12:45 ` Konstantin Osipov
@ 2019-04-04 13:57 ` Vladislav Shpilevoy
  2019-04-04 16:14 ` Vladimir Davydov
  0 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-04 13:57 UTC (permalink / raw)
  To: tarantool-patches, Konstantin Osipov

Pushed to master.

On 04/04/2019 15:45, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/02 15:27]:
>>
>
> OK to push.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>

^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 6/6] [RAW] swim: introduce failure detection component
  2019-04-04 13:57 ` Vladislav Shpilevoy
@ 2019-04-04 16:14 ` Vladimir Davydov
  2019-04-04 16:47 ` Vladislav Shpilevoy
  0 siblings, 1 reply; 32+ messages in thread
From: Vladimir Davydov @ 2019-04-04 16:14 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches, Konstantin Osipov

On Thu, Apr 04, 2019 at 04:57:21PM +0300, Vladislav Shpilevoy wrote:
> Pushed to master.

This commit broke tests:

https://travis-ci.org/tarantool/tarantool/jobs/515726901

Please fix.

^ permalink raw reply [flat|nested] 32+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 6/6] [RAW] swim: introduce failure detection component
  2019-04-04 16:14 ` Vladimir Davydov
@ 2019-04-04 16:47 ` Vladislav Shpilevoy
  0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-04 16:47 UTC (permalink / raw)
  To: tarantool-patches, Vladimir Davydov; +Cc: Konstantin Osipov

Fixed, pushed to master.

Travis from the branch with a fix:
https://travis-ci.org/tarantool/tarantool/builds/515797027?utm_source=github_status&utm_medium=notification

On 04/04/2019 19:14, Vladimir Davydov wrote:
> On Thu, Apr 04, 2019 at 04:57:21PM +0300, Vladislav Shpilevoy wrote:
>> Pushed to master.
>
> This commit broke tests:
>
> https://travis-ci.org/tarantool/tarantool/jobs/515726901
>
> Please fix.
>

^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] [PATCH 7/6] swim: make swim_upsert_member returning two values
  2019-03-20 10:49 [tarantool-patches] [PATCH 0/6] SWIM failure detection draft Vladislav Shpilevoy
  ` (5 preceding siblings ...)
  2019-03-20 10:49 ` [tarantool-patches] [PATCH 6/6] [RAW] swim: introduce failure detection component Vladislav Shpilevoy
@ 2019-03-27 19:28 ` Vladislav Shpilevoy
  2019-03-28 8:52 ` [tarantool-patches] " Konstantin Osipov
  6 siblings, 1 reply; 32+ messages in thread
From: Vladislav Shpilevoy @ 2019-03-27 19:28 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

Here is one new patch. It turned out to be useful both
for the failure detection and for the dissemination.

================================================================

swim_upsert_member is a function to add new members or update
existing ones using info received from other cluster
participants. At the moment it is quite simple and
straightforward: either create a new member and return it, or
update an existing member and return it.

But things are going to change in the failure detection and
dissemination components. A couple of examples showing that a
member should be returned separately from the success/error
status:

* To prevent undead members, the failure detection forbids
  adding dead members. Otherwise a dead member would be added
  and removed back and forth by different components
  indefinitely. Upsert for such members should return NULL, but
  it is not an error - it is normal operation of the protocol;

* When the dissemination component receives a UUID update for
  an existing member, but with too old an incarnation, it does
  not apply that update. It also cannot return that member from
  upsert, because its UUID differs from the one in the
  'request'. In such a case it should be OK to return NULL
  without treating it as an error.

Part of #3234
---
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index df34ce247..1b623fc27 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -657,22 +657,32 @@ swim_update_member_addr(struct swim *swim, struct swim_member *member,
 /**
  * Update or create a member by its definition, received from a
  * remote instance.
- * @retval NULL Error.
- * @retval New member, or updated old member.
+ * @param swim SWIM instance to upsert into.
+ * @param def Member definition to build a new member or update an
+ *        existing one.
+ * @param[out] result A result member: a new, or an updated, or
+ *        NULL in case of nothing has changed. For example, @a def
+ *        was too old.
+ *
+ * @retval 0 Success. Member is added, or updated. Or nothing has
+ *         changed but not always it is an error.
+ * @retval -1 Error.
  */
-static struct swim_member *
-swim_upsert_member(struct swim *swim, const struct swim_member_def *def)
+static int
+swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
+		   struct swim_member **result)
 {
 	struct swim_member *member = swim_find_member(swim, &def->uuid);
 	if (member == NULL) {
-		member = swim_new_member(swim, &def->addr, &def->uuid,
-					 def->status);
-		return member;
+		*result = swim_new_member(swim, &def->addr, &def->uuid,
+					  def->status);
+		return *result != NULL ? 0 : -1;
 	}
 	struct swim_member *self = swim->self;
 	if (member != self)
 		swim_update_member_addr(swim, member, &def->addr);
-	return member;
+	*result = member;
+	return 0;
 }
 
 /** Decode an anti-entropy message, update member table. */
@@ -686,9 +696,10 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
 		return -1;
 	for (uint64_t i = 0; i < size; ++i) {
 		struct swim_member_def def;
+		struct swim_member *member;
 		if (swim_member_def_decode(&def, pos, end, prefix) != 0)
 			return -1;
-		if (swim_upsert_member(swim, &def) == NULL) {
+		if (swim_upsert_member(swim, &def, &member) != 0) {
 			/*
 			 * Not a critical error. Other members
 			 * still can be updated.

^ permalink raw reply [flat|nested] 32+ messages in thread
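The "status separately from the member" contract in the commit message above can be illustrated with a small standalone sketch. Everything in it (the sketch_* names, the fixed-size table, integer ids instead of UUIDs) is an assumption made up for illustration, not code from swim.c. It only shows the shape of the API: 0 with a member, 0 with NULL for a legitimately skipped definition, and -1 for a real error.

```c
#include <assert.h>
#include <stddef.h>

enum sketch_status { SKETCH_ALIVE, SKETCH_DEAD };

struct sketch_member {
	int id;
	enum sketch_status status;
};

enum { SKETCH_TABLE_SIZE = 4 };

struct sketch_table {
	struct sketch_member members[SKETCH_TABLE_SIZE];
	int count;
};

static struct sketch_member *
sketch_find(struct sketch_table *t, int id)
{
	for (int i = 0; i < t->count; ++i) {
		if (t->members[i].id == id)
			return &t->members[i];
	}
	return NULL;
}

/*
 * Returns 0 on success and -1 on error (the table is full).
 * *result is the new or updated member, or NULL when the
 * definition was skipped on purpose, which is not an error.
 */
static int
sketch_upsert(struct sketch_table *t, int id, enum sketch_status status,
	      struct sketch_member **result)
{
	assert(result != NULL);
	*result = sketch_find(t, id);
	if (*result != NULL) {
		(*result)->status = status;
		return 0;
	}
	/* Unknown dead member: do not add it, but do not fail. */
	if (status == SKETCH_DEAD)
		return 0;
	if (t->count == SKETCH_TABLE_SIZE)
		return -1;
	*result = &t->members[t->count++];
	(*result)->id = id;
	(*result)->status = status;
	return 0;
}
```

A caller checks the return code for errors and only then looks at the out-parameter, so "nothing was added" and "something went wrong" can no longer be confused.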
* [tarantool-patches] Re: [PATCH 7/6] swim: make swim_upsert_member returning two values
  2019-03-27 19:28 ` [tarantool-patches] [PATCH 7/6] swim: make swim_upsert_member returning two values Vladislav Shpilevoy
@ 2019-03-28 8:52 ` Konstantin Osipov
  2019-03-28 11:52 ` Vladislav Shpilevoy
  0 siblings, 1 reply; 32+ messages in thread
From: Konstantin Osipov @ 2019-03-28 8:52 UTC (permalink / raw)
  To: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/28 09:02]:
> Here is one new patch. It appeared to be useful both
> for the failure detection and for the dissemination.

OK to push.

--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply [flat|nested] 32+ messages in thread
* [tarantool-patches] Re: [PATCH 7/6] swim: make swim_upsert_member returning two values
  2019-03-28 8:52 ` [tarantool-patches] " Konstantin Osipov
@ 2019-03-28 11:52 ` Vladislav Shpilevoy
  0 siblings, 0 replies; 32+ messages in thread
From: Vladislav Shpilevoy @ 2019-03-28 11:52 UTC (permalink / raw)
  To: tarantool-patches, Konstantin Osipov

Pushed to 2.1 and master.

On 28/03/2019 11:52, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/28 09:02]:
>> Here is one new patch. It appeared to be useful both
>> for the failure detection and for the dissemination.
>
> OK to push.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>

^ permalink raw reply [flat|nested] 32+ messages in thread