From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 5B7462A4F6 for ; Wed, 20 Mar 2019 06:49:23 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id 1KWjvqzMW0Bn for ; Wed, 20 Mar 2019 06:49:23 -0400 (EDT) Received: from smtpng2.m.smailru.net (smtpng2.m.smailru.net [94.100.179.3]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 721A3266DE for ; Wed, 20 Mar 2019 06:49:22 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH 1/6] swim: follow-ups for SWIM anti-entropy Date: Wed, 20 Mar 2019 13:49:14 +0300 Message-Id: <5d4a8709a02fcff1a31f2ae252965c95d5a9fde3.1553078631.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org * fix some obvious errors in swim test utils; * fix a bug with NULL URI garbage on recfg; * fix typos in a comment and in a log message in swim.c; * do not start any timers in swim_cfg. 
Indeed, the round timer should start only when at least one new member is added except self, and it is already done in swim_new_member; * log not only round begin, but each round step - it helps in debugging, but does not affect production anyway because the logs are verbose; * in SWIM's event loop log the new watch value instead of the old one - it turned out that the new one is more useful for debugging; * log 'process component' inside swim_process_() functions. It is needed for failure detection, where a log of kind 'process failure detection' says nothing - much better to say 'process ping from', or 'process ack'; * in swim tests instead of swim_cluster_wait_...(max_steps) use swim_cluster_wait_...(timeout). Step count restriction appeared to be useful for anti-entropy being equal to the number of round steps, but it is not so once failure detection appears. Replies to failure detection requests do not depend on the SWIM heartbeat and affect step count in a non-trivial way - it makes test writing, debugging and supporting much harder. Follow-up for 03b9a6e91baf246ee2bb9841d01ba3824b6768a6 --- src/lib/swim/swim.c | 23 ++++++++++++++--------- src/lib/swim/swim_io.c | 9 +++++---- src/lib/swim/swim_io.h | 6 +++++- src/lib/swim/swim_proto.c | 1 + test/unit/swim.c | 5 ++++- test/unit/swim.result | 19 ++++++++++--------- test/unit/swim_test_ev.c | 2 +- test/unit/swim_test_transport.c | 13 ++++++++++--- test/unit/swim_test_utils.c | 13 ++++++++----- test/unit/swim_test_utils.h | 6 ++---- 10 files changed, 60 insertions(+), 37 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 4b401800a..c2b2132a2 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -206,7 +206,7 @@ struct swim_member { */ struct tt_uuid uuid; /** - * Cached hash of the uuid for the members table lookups. + * Cached hash of the uuid for the member table lookups. 
*/ uint32_t hash; /** @@ -406,7 +406,8 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr, diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node"); return NULL; } - swim_ev_timer_start(loop(), &swim->round_tick); + if (mh_size(swim->members) > 1) + swim_ev_timer_start(loop(), &swim->round_tick); say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim), swim_uuid_str(&member->uuid), mh_size(swim->members)); return member; @@ -448,8 +449,9 @@ swim_new_round(struct swim *swim) swim_fd(swim)); return 0; } + /* -1 for self. */ say_verbose("SWIM %d: start a new round with %d members", swim_fd(swim), - size); + size - 1); swim_shuffle_members(swim); rlist_create(&swim->round_queue); for (int i = 0; i < size; ++i) { @@ -550,7 +552,9 @@ swim_begin_step(struct ev_loop *loop, struct ev_timer *t, int events) (void) events; (void) loop; struct swim *swim = (struct swim *) t->data; - if (rlist_empty(&swim->round_queue) && swim_new_round(swim) != 0) { + if (! rlist_empty(&swim->round_queue)) { + say_verbose("SWIM %d: continue the round", swim_fd(swim)); + } else if (swim_new_round(swim) != 0) { diag_log(); return; } @@ -675,6 +679,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def) static int swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) { + say_verbose("SWIM %d: process anti-entropy", swim_fd(swim)); const char *prefix = "invalid anti-entropy message:"; uint32_t size; if (swim_decode_array(pos, end, &size, prefix, "root") != 0) @@ -726,8 +731,6 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos, goto error; switch(key) { case SWIM_ANTI_ENTROPY: - say_verbose("SWIM %d: process anti-entropy", - swim_fd(swim)); if (swim_process_anti_entropy(swim, &pos, end) != 0) goto error; break; @@ -760,7 +763,8 @@ swim_new(void) swim_ev_timer_init(&swim->round_tick, swim_begin_step, HEARTBEAT_RATE_DEFAULT, 0); swim->round_tick.data = (void *) swim; - 
swim_task_create(&swim->round_step_task, swim_complete_step, NULL); + swim_task_create(&swim->round_step_task, swim_complete_step, NULL, + "round packet"); swim_scheduler_create(&swim->scheduler, swim_on_input); return swim; } @@ -845,11 +849,12 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, * specified. */ addr = swim->scheduler.transport.addr; + } else { + addr = swim->self->addr; } if (swim->round_tick.at != heartbeat_rate && heartbeat_rate > 0) swim_ev_timer_set(&swim->round_tick, heartbeat_rate, 0); - swim_ev_timer_start(loop(), &swim->round_tick); swim_update_member_addr(swim, swim->self, &addr); int rc = swim_update_member_uuid(swim, swim->self, uuid); /* Reserved above. */ @@ -892,7 +897,7 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid) assert(swim_is_configured(swim)); const char *prefix = "swim.remove_member:"; if (uuid == NULL || tt_uuid_is_nil(uuid)) { - diag_set(SwimError, "%s UUiD is mandatory", prefix); + diag_set(SwimError, "%s UUID is mandatory", prefix); return -1; } struct swim_member *member = swim_find_member(swim, uuid); diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index 9c16d1ad3..015968a0d 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -57,11 +57,12 @@ swim_packet_create(struct swim_packet *packet) void swim_task_create(struct swim_task *task, swim_task_f complete, - swim_task_f cancel) + swim_task_f cancel, const char *desc) { memset(task, 0, sizeof(*task)); task->complete = complete; task->cancel = cancel; + task->desc = desc; swim_packet_create(&task->packet); rlist_create(&task->in_queue_output); } @@ -170,9 +171,9 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) struct swim_task *task = rlist_shift_entry(&scheduler->queue_output, struct swim_task, in_queue_output); - say_verbose("SWIM %d: send to %s", swim_scheduler_fd(scheduler), - sio_strfaddr((struct sockaddr *) &task->dst, - sizeof(task->dst))); + say_verbose("SWIM %d: 
send %s to %s", swim_scheduler_fd(scheduler), + task->desc, sio_strfaddr((struct sockaddr *) &task->dst, + sizeof(task->dst))); struct swim_meta_header_bin header; swim_meta_header_bin_create(&header, &scheduler->transport.addr); memcpy(task->packet.meta, &header, sizeof(header)); diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index 68fb89818..bc62a29ce 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -201,6 +201,10 @@ struct swim_task { struct sockaddr_in dst; /** Place in a queue of tasks. */ struct rlist in_queue_output; + /** + * A short description of the packet content. For logging. + */ + const char *desc; }; /** @@ -213,7 +217,7 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst, /** Initialize the task, without scheduling. */ void swim_task_create(struct swim_task *task, swim_task_f complete, - swim_task_f cancel); + swim_task_f cancel, const char *desc); /** Destroy the task, pop from the queue. */ static inline void diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index 9c0d49657..bf4c09b24 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -288,6 +288,7 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos, if (swim_decode_map(pos, end, &size, prefix, "root") != 0) return -1; memset(def, 0, sizeof(*def)); + def->src.sin_family = AF_INET; for (uint32_t i = 0; i < size; ++i) { uint64_t key; if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0) diff --git a/test/unit/swim.c b/test/unit/swim.c index 29e9eb4f4..921fc8f07 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -109,7 +109,7 @@ swim_test_uuid_update(void) static void swim_test_cfg(void) { - swim_start_test(15); + swim_start_test(16); struct swim *s = swim_new(); assert(s != NULL); @@ -123,6 +123,9 @@ swim_test_cfg(void) is(swim_cfg(s, uri, -1, &uuid), 0, "configured first time"); is(swim_cfg(s, NULL, -1, NULL), 0, "second time can omit URI, UUID"); is(swim_cfg(s, NULL, 2, NULL), 
0, "hearbeat is dynamic"); + const char *self_uri = swim_member_uri(swim_self(s)); + is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\ + "URI"); struct swim *s2 = swim_new(); assert(s2 != NULL); diff --git a/test/unit/swim.result b/test/unit/swim.result index e71d6cfc2..e8991d8d8 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -19,7 +19,7 @@ ok 2 - subtests ok 3 - subtests *** swim_test_uuid_update: done *** *** swim_test_cfg *** - 1..15 + 1..16 ok 1 - first cfg failed - no URI ok 2 - diag says 'mandatory' ok 3 - first cfg failed - no UUID @@ -27,14 +27,15 @@ ok 3 - subtests ok 5 - configured first time ok 6 - second time can omit URI, UUID ok 7 - hearbeat is dynamic - ok 8 - can not use invalid URI - ok 9 - diag says 'invalid uri' - ok 10 - can not use domain names - ok 11 - diag says 'invalid uri' - ok 12 - UNIX sockets are not supported - ok 13 - diag says 'only IP' - ok 14 - can not bind to an occupied port - ok 15 - diag says 'bind' + ok 8 - URI is unchanged after recfg with NULL URI + ok 9 - can not use invalid URI + ok 10 - diag says 'invalid uri' + ok 11 - can not use domain names + ok 12 - diag says 'invalid uri' + ok 13 - UNIX sockets are not supported + ok 14 - diag says 'only IP' + ok 15 - can not bind to an occupied port + ok 16 - diag says 'bind' ok 4 - subtests *** swim_test_cfg: done *** *** swim_test_add_remove *** diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c index 950784aec..ee1fcdbb7 100644 --- a/test/unit/swim_test_ev.c +++ b/test/unit/swim_test_ev.c @@ -289,12 +289,12 @@ swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *base) void swim_do_loop_step(struct ev_loop *loop) { - say_verbose("Loop watch %f", watch); struct swim_event *next_e, *e = event_heap_top(&event_heap); if (e != NULL) { assert(e->deadline >= watch); /* Multiple events can have the same deadline. 
*/ watch = e->deadline; + say_verbose("Loop watch %f", watch); do { e->process(e, loop); next_e = event_heap_top(&event_heap); diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c index ee50e3922..d6591e969 100644 --- a/test/unit/swim_test_transport.c +++ b/test/unit/swim_test_transport.c @@ -78,6 +78,13 @@ swim_test_packet_new(const char *data, int size, const struct sockaddr_in *src, return p; } +/** Free packet memory. */ +static inline void +swim_test_packet_delete(struct swim_test_packet *p) +{ + free(p); +} + /** Fake file descriptor. */ struct swim_fd { /** File descriptor number visible to libev. */ @@ -122,9 +129,9 @@ swim_fd_close(struct swim_fd *fd) { struct swim_test_packet *i, *tmp; rlist_foreach_entry_safe(i, &fd->recv_queue, in_queue, tmp) - free(i); + swim_test_packet_delete(i); rlist_foreach_entry_safe(i, &fd->send_queue, in_queue, tmp) - free(i); + swim_test_packet_delete(i); rlist_del_entry(fd, in_opened); } @@ -188,7 +195,7 @@ swim_transport_recv(struct swim_transport *transport, void *buffer, size_t size, *addr_size = sizeof(p->src); ssize_t result = MIN((size_t) p->size, size); memcpy(buffer, p->data, result); - free(p); + swim_test_packet_delete(p); return result; } diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 73f8db40f..a92e55233 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -31,6 +31,7 @@ #include "swim_test_utils.h" #include "swim_test_ev.h" #include "swim/swim.h" +#include "swim/swim_ev.h" #include "uuid/tt_uuid.h" #include "trivia/util.h" #include "fiber.h" @@ -111,7 +112,7 @@ swim1_contains_swim2(struct swim *s1, struct swim *s2) } } swim_iterator_close(it); - return false; + return true; } bool @@ -129,13 +130,15 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster) } int -swim_cluster_wait_fullmesh(struct swim_cluster *cluster, int max_steps) +swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout) { - while (! 
swim_cluster_is_fullmesh(cluster) && max_steps > 0) { + double deadline = swim_time() + timeout; + while (! swim_cluster_is_fullmesh(cluster)) { + if (swim_time() >= deadline) + return -1; swim_do_loop_step(loop()); - --max_steps; } - return max_steps < 0 ? -1 : 0; + return 0; } bool diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 56036422d..90962b658 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -74,11 +74,9 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id); bool swim_cluster_is_fullmesh(struct swim_cluster *cluster); -/** - * Wait for fullmesh at most @a max_steps event loop iterations. - */ +/** Wait for fullmesh at most @a timeout fake seconds. */ int -swim_cluster_wait_fullmesh(struct swim_cluster *cluster, int max_steps); +swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout); #define swim_start_test(n) { \ header(); \ -- 2.17.2 (Apple Git-113)