From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id BC4FA2A4F9 for ; Wed, 20 Mar 2019 06:49:26 -0400 (EDT)
Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id iO19rX4_WP_S for ; Wed, 20 Mar 2019 06:49:26 -0400 (EDT)
Received: from smtpng2.m.smailru.net (smtpng2.m.smailru.net [94.100.179.3]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 5F2A42A4FB for ; Wed, 20 Mar 2019 06:49:25 -0400 (EDT)
From: Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH 6/6] [RAW] swim: introduce failure detection component
Date: Wed, 20 Mar 2019 13:49:19 +0300
Message-Id:
In-Reply-To:
References:
Sender: tarantool-patches-bounce@freelists.org
Errors-to: tarantool-patches-bounce@freelists.org
Reply-To: tarantool-patches@freelists.org
List-Help:
List-Unsubscribe:
List-software: Ecartis version 1.0.0
List-Id: tarantool-patches
List-Subscribe:
List-Owner:
List-post:
List-Archive:
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org

The failure detection component allows finding out which members are already dead.
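A minimal, self-contained C sketch of the failure detection rules in this patch may help when reviewing it. It is a simplified model, not code from the patch. The names `fd_member`, `fd_update_inc_status`, `fd_on_ack_timeout`, `fd_on_ack` and `fd_self_on_gossip` are hypothetical, and the thresholds are passed in as parameters (the patch uses `NO_ACKS_TO_DEAD = 3` and a configurable `no_acks_to_gc`, default 2). The functions loosely mirror `swim_update_member_inc_status`, the status switch in `swim_check_acks`, the ACK branch of `swim_process_failure_detection`, and the self-refutation branch of `swim_upsert_member`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical simplified statuses; "alive" < "dead" as in the patch. */
enum fd_status {
	FD_ALIVE = 0,
	FD_DEAD,
};

/* What the caller should do after an ack timeout. */
enum fd_action {
	FD_KEEP,   /* keep the member and send another ping */
	FD_REMOVE, /* drop the member from the member table */
};

/* Hypothetical model of a member's failure detection state. */
struct fd_member {
	uint64_t incarnation;
	enum fd_status status;
	int unacknowledged_pings;
};

/*
 * Apply a remote {incarnation, status} pair. It is compared as a
 * compound key: a bigger incarnation always wins; with an equal
 * incarnation only a "bigger" status wins. Returns true if the
 * member changed, in which case the unacked ping counter is reset.
 */
static bool
fd_update_inc_status(struct fd_member *m, enum fd_status status,
		     uint64_t incarnation)
{
	if (m->incarnation < incarnation) {
		m->incarnation = incarnation;
		m->status = status;
	} else if (m->incarnation == incarnation && m->status < status) {
		m->status = status;
	} else {
		return false;
	}
	m->unacknowledged_pings = 0;
	return true;
}

/* Called when a ping was not acknowledged before its deadline. */
static enum fd_action
fd_on_ack_timeout(struct fd_member *m, int no_acks_to_dead,
		  int no_acks_to_gc)
{
	++m->unacknowledged_pings;
	if (m->status == FD_ALIVE) {
		if (m->unacknowledged_pings >= no_acks_to_dead) {
			/* A status change resets the counter. */
			m->status = FD_DEAD;
			m->unacknowledged_pings = 0;
		}
	} else if (m->unacknowledged_pings >= no_acks_to_gc) {
		return FD_REMOVE;
	}
	return FD_KEEP;
}

/* An ack from the member resets its unacked ping counter. */
static void
fd_on_ack(struct fd_member *m)
{
	m->unacknowledged_pings = 0;
}

/*
 * Self refutation: adopt a bigger incarnation known by the cluster,
 * and if the cluster claims this instance is not alive at its
 * current incarnation, bump the incarnation so "alive" wins.
 */
static void
fd_self_on_gossip(uint64_t *self_incarnation, enum fd_status status,
		  uint64_t incarnation)
{
	if (*self_incarnation < incarnation)
		*self_incarnation = incarnation;
	if (status != FD_ALIVE && incarnation == *self_incarnation)
		++*self_incarnation;
}
```

In this model, with thresholds 3 and 2, an alive member that misses three acks in a row becomes dead (its counter is reset on the status change), and two more missed acks remove it: five ack timeouts in total. Any ack received in between resets the counter.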
Part of #3234 --- src/lib/swim/swim.c | 450 +++++++++++++++++++++++++++++++- src/lib/swim/swim.h | 36 ++- src/lib/swim/swim_io.c | 23 +- src/lib/swim/swim_io.h | 16 ++ src/lib/swim/swim_proto.c | 82 +++++- src/lib/swim/swim_proto.h | 101 ++++++- test/unit/swim.c | 160 +++++++++++- test/unit/swim.result | 34 ++- test/unit/swim_test_transport.c | 16 +- test/unit/swim_test_transport.h | 9 + test/unit/swim_test_utils.c | 71 ++++- test/unit/swim_test_utils.h | 30 +++ 12 files changed, 991 insertions(+), 37 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index df34ce247..f97a2f993 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -38,6 +38,8 @@ #include "info/info.h" #include "assoc.h" #include "sio.h" +#define HEAP_FORWARD_DECLARATION +#include "salad/heap.h" /** * SWIM - Scalable Weakly-consistent Infection-style Process Group @@ -135,6 +137,31 @@ enum { * value. */ HEARTBEAT_RATE_DEFAULT = 1, + /** + * If a ping was sent, it is considered to be lost after + * this time without an ack. Nothing special in this + * value. + */ + ACK_TIMEOUT_DEFAULT = 30, + /** + * If a member has not been responding to pings this + * number of times, it is considered to be dead. + * According to the SWIM paper, it is enough for a member + * to not respond to one direct ping and to K + * simultaneous indirect pings to be considered dead. + * That seems too few, so the threshold here is bigger. + */ + NO_ACKS_TO_DEAD = 3, + /** + * If a member is confirmed to be dead, it is removed from + * the membership after at least this number of + * unacknowledged pings. According to the SWIM paper, a + * dead member is deleted immediately. But here it is held + * for a while to 1) maybe refute its dead status, 2) + * disseminate the status via dissemination and + * anti-entropy components. + */ + NO_ACKS_TO_GC_DEFAULT = 2, }; /** @@ -213,6 +240,31 @@ struct swim_member { * Position in a queue of members in the current round.
*/ struct rlist in_round_queue; + /** + * + * Failure detection component + */ + /** Growing number to refute old messages. */ + uint64_t incarnation; + /** + * How many pings in a row have not received an ack while + * the member is in its current status. After a threshold + * the member is marked as dead. After more, it is removed + * from the table. On each status or incarnation change + * this counter is reset. + */ + int unacknowledged_pings; + /** + * When the latest ping is considered to be + * unacknowledged. + */ + double ping_deadline; + /** Reusable task for sending regular ACKs. */ + struct swim_task ack_task; + /** Reusable task for sending regular PINGs. */ + struct swim_task ping_task; + /** Position in a queue of members waiting for an ack. */ + struct heap_node in_wait_ack_heap; }; #define mh_name _swim_table @@ -230,6 +282,12 @@ struct mh_swim_table_key { #define MH_SOURCE 1 #include "salad/mhash.h" +#define HEAP_NAME wait_ack_heap +#define HEAP_LESS(h, a, b) ((a)->ping_deadline < (b)->ping_deadline) +#define heap_value_t struct swim_member +#define heap_value_attr in_wait_ack_heap +#include "salad/heap.h" + /** * SWIM instance. Stores configuration, manages periodical tasks, * rounds. Each member has an object of this type on its host, @@ -285,8 +343,80 @@ struct swim { * starting from this instance. */ mh_int_t iterator; + /** + * + * Failure detection component + */ + /** + * Members waiting for an ACK. If an ACK is absent for too + * long, a member is considered to be dead and is removed. + * The heap is sorted by deadline in ascending order + * (bottom is newer, top is older). + */ + heap_t wait_ack_heap; + /** Generator of ack checking events. */ + struct ev_timer wait_ack_tick; + /** + * How many pings to a dead member should be + * unacknowledged to delete it from the member table. + */ + int no_acks_to_gc; }; +/** Put the member into a list of ACK waiters.
*/ +static void +swim_wait_ack(struct swim *swim, struct swim_member *member) +{ + if (heap_node_is_stray(&member->in_wait_ack_heap)) { + member->ping_deadline = swim_time() + swim->wait_ack_tick.at; + wait_ack_heap_insert(&swim->wait_ack_heap, member); + swim_ev_timer_start(loop(), &swim->wait_ack_tick); + } +} + +/** + * Perform all actions needed to process a member's update, such + * as a change of its status, or incarnation, or both. + */ +static void +swim_on_member_update(struct swim *swim, struct swim_member *member) +{ + (void) swim; + member->unacknowledged_pings = 0; +} + +/** + * Update status and incarnation of the member if needed. Statuses + * are compared as a compound key: {incarnation, status}. So @a + * new_status can override an old one only if its incarnation is + * greater, or the same, but its status is "bigger". Statuses are + * compared by their identifier, so "alive" < "dead". This + * protects against the case when a member is detected as dead on + * one instance, but overridden by another instance with the same + * incarnation "alive" message. + */ +static inline void +swim_update_member_inc_status(struct swim *swim, struct swim_member *member, + enum swim_member_status new_status, + uint64_t incarnation) +{ + /* + * Source of truth about self is this instance and it is + * never updated from remote. Refutation is handled + * separately. + */ + assert(member != swim->self); + if (member->incarnation < incarnation) { + member->status = new_status; + member->incarnation = incarnation; + swim_on_member_update(swim, member); + } else if (member->incarnation == incarnation && + member->status < new_status) { + member->status = new_status; + swim_on_member_update(swim, member); + } +} + int swim_fd(const struct swim *swim) { @@ -304,11 +434,37 @@ swim_by_scheduler(struct swim_scheduler *scheduler) return container_of(scheduler, struct swim, scheduler); } +/** + * Once a ping is sent, the member should start waiting for an + * ACK.
+ */ +static void +swim_ping_task_complete(struct swim_task *task, + struct swim_scheduler *scheduler, int rc) +{ + /* + * If the ping could not be sent, it makes no sense to + * wait for an ACK. + */ + if (rc < 0) + return; + struct swim *swim = swim_by_scheduler(scheduler); + struct swim_member *m = container_of(task, struct swim_member, + ping_task); + swim_wait_ack(swim, m); +} + /** Free member's resources. */ static inline void swim_member_delete(struct swim_member *member) { assert(rlist_empty(&member->in_round_queue)); + + /* Failure detection component. */ + assert(heap_node_is_stray(&member->in_wait_ack_heap)); + swim_task_destroy(&member->ack_task); + swim_task_destroy(&member->ping_task); + free(member); } @@ -330,7 +486,7 @@ swim_reserve_one_member(struct swim *swim) /** Create a new member. It is not registered anywhere here. */ static struct swim_member * swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status) + enum swim_member_status status, uint64_t incarnation) { struct swim_member *member = (struct swim_member *) calloc(1, sizeof(*member)); @@ -343,6 +499,13 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid, member->uuid = *uuid; member->hash = swim_uuid_hash(uuid); rlist_create(&member->in_round_queue); + + /* Failure detection component. */ + member->incarnation = incarnation; + heap_node_create(&member->in_wait_ack_heap); + swim_task_create(&member->ack_task, NULL, NULL, "ack"); + swim_task_create(&member->ping_task, swim_ping_task_complete, NULL, + "ping"); return member; } @@ -360,6 +523,11 @@ swim_delete_member(struct swim *swim, struct swim_member *member) assert(rc != mh_end(swim->members)); mh_swim_table_del(swim->members, rc, NULL); rlist_del_entry(member, in_round_queue); + + /* Failure detection component. */ + if (!
heap_node_is_stray(&member->in_wait_ack_heap)) + wait_ack_heap_delete(&swim->wait_ack_heap, member); + swim_member_delete(member); } @@ -383,7 +551,8 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid) */ static struct swim_member * swim_new_member(struct swim *swim, const struct sockaddr_in *addr, - const struct tt_uuid *uuid, enum swim_member_status status) + const struct tt_uuid *uuid, enum swim_member_status status, + uint64_t incarnation) { int new_bsize = sizeof(swim->shuffled[0]) * (mh_size(swim->members) + 1); @@ -394,7 +563,17 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr, return NULL; } swim->shuffled = new_shuffled; - struct swim_member *member = swim_member_new(addr, uuid, status); + /* + * Reserve one more slot to never fail push into the ack + * waiters heap. + */ + if (wait_ack_heap_reserve(&swim->wait_ack_heap) != 0) { + diag_set(OutOfMemory, sizeof(struct heap_node), "realloc", + "wait_ack_heap"); + return NULL; + } + struct swim_member *member = + swim_member_new(addr, uuid, status, incarnation); if (member == NULL) return NULL; assert(swim_find_member(swim, uuid) == NULL); @@ -489,7 +668,7 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet) if (swim_packet_reserve(packet, new_size) == NULL) break; swim_member_bin_fill(&member_bin, &m->addr, &m->uuid, - m->status); + m->status, m->incarnation); memcpy(pos + size, &member_bin, sizeof(member_bin)); size = new_size; /* @@ -526,6 +705,25 @@ swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet) return 1; } +/** + * Encode failure detection component. + * @retval Number of key-values added to the packet's root map. 
*/ +static int +swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet, + enum swim_fd_msg_type type) +{ + struct swim_fd_header_bin fd_header_bin; + int size = sizeof(fd_header_bin); + char *pos = swim_packet_alloc(packet, size); + if (pos == NULL) + return 0; + swim_fd_header_bin_create(&fd_header_bin, type, + swim->self->incarnation); + memcpy(pos, &fd_header_bin, size); + return 1; +} + /** Encode SWIM components into a UDP packet. */ static void swim_encode_round_msg(struct swim *swim, struct swim_packet *packet) @@ -534,9 +732,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet) char *header = swim_packet_alloc(packet, 1); int map_size = 0; map_size += swim_encode_src_uuid(swim, packet); + map_size += swim_encode_failure_detection(swim, packet, + SWIM_FD_MSG_PING); map_size += swim_encode_anti_entropy(swim, packet); - assert(mp_sizeof_map(map_size) == 1 && map_size == 2); + assert(mp_sizeof_map(map_size) == 1 && map_size >= 2); mp_encode_map(header, map_size); } @@ -592,8 +792,92 @@ swim_complete_step(struct swim_task *task, struct swim_member *m = rlist_first_entry(&swim->round_queue, struct swim_member, in_round_queue); - if (swim_sockaddr_in_eq(&m->addr, &task->dst)) + if (swim_sockaddr_in_eq(&m->addr, &task->dst)) { rlist_shift(&swim->round_queue); + if (rc > 0) { + /* + * Each round message contains a failure + * detection section with a ping. + */ + swim_wait_ack(swim, m); + } + } +} + +/** Schedule sending of a failure detection message. */ +static void +swim_send_fd_msg(struct swim *swim, struct swim_task *task, + const struct sockaddr_in *dst, enum swim_fd_msg_type type) +{ + /* + * Reset the packet allocator in case the task is being reused.
+ */ + swim_packet_create(&task->packet); + char *header = swim_packet_alloc(&task->packet, 1); + int map_size = swim_encode_src_uuid(swim, &task->packet); + map_size += swim_encode_failure_detection(swim, &task->packet, type); + assert(map_size == 2); + mp_encode_map(header, map_size); + say_verbose("SWIM %d: schedule %s to %s", swim_fd(swim), + swim_fd_msg_type_strs[type], + sio_strfaddr((struct sockaddr *) dst, sizeof(*dst))); + swim_task_send(task, dst, &swim->scheduler); +} + +/** Schedule send of an ack. */ +static inline void +swim_send_ack(struct swim *swim, struct swim_task *task, + const struct sockaddr_in *dst) +{ + swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK); +} + +/** Schedule send of a ping. */ +static inline void +swim_send_ping(struct swim *swim, struct swim_task *task, + const struct sockaddr_in *dst) +{ + swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_PING); +} + +/** + * Check for unacknowledged pings. A ping is unacknowledged if an + * ack was not received during ack timeout. An unacknowledged ping + * is resent here. 
*/ +static void +swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events) +{ + assert((events & EV_TIMER) != 0); + (void) events; + struct swim *swim = (struct swim *) t->data; + double current_time = swim_time(); + struct swim_member *m; + while ((m = wait_ack_heap_top(&swim->wait_ack_heap)) != NULL) { + if (current_time < m->ping_deadline) { + swim_ev_timer_start(loop, t); + return; + } + wait_ack_heap_pop(&swim->wait_ack_heap); + ++m->unacknowledged_pings; + switch (m->status) { + case MEMBER_ALIVE: + if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) { + m->status = MEMBER_DEAD; + swim_on_member_update(swim, m); + } + break; + case MEMBER_DEAD: + if (m->unacknowledged_pings >= swim->no_acks_to_gc) { + swim_delete_member(swim, m); + continue; + } + break; + default: + unreachable(); + } + swim_send_ping(swim, &m->ping_task, &m->addr); + } } /** @@ -642,6 +926,7 @@ swim_update_member_uuid(struct swim *swim, struct swim_member *member, say_verbose("SWIM %d: a member has changed its UUID from %s to %s", swim_fd(swim), swim_uuid_str(&old_uuid), swim_uuid_str(new_uuid)); + swim_on_member_update(swim, member); return 0; } @@ -650,8 +935,10 @@ static inline void swim_update_member_addr(struct swim *swim, struct swim_member *member, const struct sockaddr_in *addr) { - (void) swim; - member->addr = *addr; + if (! swim_sockaddr_in_eq(addr, &member->addr)) { + member->addr = *addr; + swim_on_member_update(swim, member); + } } /** @@ -665,13 +952,52 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def) { struct swim_member *member = swim_find_member(swim, &def->uuid); if (member == NULL) { + if (def->status == MEMBER_DEAD && + swim->no_acks_to_gc != SWIM_NO_ACKS_IGNORE) { + /* + * Do not 'resurrect' dead members to + * prevent 'ghost' members. A ghost member + * is one that was declared dead, sent via + * anti-entropy, and removed from the local + * member table, but then returned back + * by received anti-entropy, again as + * dead.
Such dead members could 'live' + * forever. + */ + return NULL; + } member = swim_new_member(swim, &def->addr, &def->uuid, - def->status); + def->status, def->incarnation); return member; } struct swim_member *self = swim->self; - if (member != self) + if (member != self) { + if (def->incarnation < member->incarnation) + return member; swim_update_member_addr(swim, member, &def->addr); + swim_update_member_inc_status(swim, member, def->status, + def->incarnation); + return member; + } + /* + * It is possible that other instances know a bigger + * incarnation of this instance - such thing happens when + * the instance restarts and loses its local incarnation + * number. It will be restored by receiving dissemination + * and anti-entropy messages about self. + */ + if (self->incarnation < def->incarnation) + self->incarnation = def->incarnation; + if (def->status != MEMBER_ALIVE && + def->incarnation == self->incarnation) { + /* + * In the cluster a gossip exists that this + * instance is not alive. Refute this information + * with a bigger incarnation. + */ + self->incarnation++; + swim_on_member_update(swim, member); + } return member; } @@ -699,12 +1025,58 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) return 0; } +/** + * Decode a failure detection message. Schedule acks, process + * acks. 
+ */ +static int +swim_process_failure_detection(struct swim *swim, const char **pos, + const char *end, const struct sockaddr_in *src, + const struct tt_uuid *uuid) +{ + const char *prefix = "invalid failure detection message:"; + struct swim_failure_detection_def def; + struct swim_member_def mdef; + if (swim_failure_detection_def_decode(&def, pos, end, prefix) != 0) + return -1; + say_verbose("SWIM %d: process failure detection's %s", swim_fd(swim), + swim_fd_msg_type_strs[def.type]); + swim_member_def_create(&mdef); + mdef.addr = *src; + mdef.incarnation = def.incarnation; + mdef.uuid = *uuid; + struct swim_member *member = swim_upsert_member(swim, &mdef); + if (member == NULL) + return -1; + if (def.incarnation < member->incarnation) + return 0; + if (def.incarnation == member->incarnation && + member->status != MEMBER_ALIVE) { + member->status = MEMBER_ALIVE; + swim_on_member_update(swim, member); + } + + switch (def.type) { + case SWIM_FD_MSG_PING: + if (! swim_task_is_scheduled(&member->ack_task)) + swim_send_ack(swim, &member->ack_task, &member->addr); + break; + case SWIM_FD_MSG_ACK: + member->unacknowledged_pings = 0; + if (! heap_node_is_stray(&member->in_wait_ack_heap)) + wait_ack_heap_delete(&swim->wait_ack_heap, member); + break; + default: + unreachable(); + } + return 0; +} + /** Process a new message. 
*/ static void swim_on_input(struct swim_scheduler *scheduler, const char *pos, const char *end, const struct sockaddr_in *src) { - (void) src; const char *prefix = "invalid message:"; struct swim *swim = swim_by_scheduler(scheduler); struct tt_uuid uuid; @@ -734,6 +1106,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos, if (swim_process_anti_entropy(swim, &pos, end) != 0) goto error; break; + case SWIM_FAILURE_DETECTION: + if (swim_process_failure_detection(swim, &pos, end, + src, &uuid) != 0) + goto error; + break; default: diag_set(SwimError, "%s unexpected key", prefix); goto error; @@ -766,6 +1143,13 @@ swim_new(void) swim_task_create(&swim->round_step_task, swim_complete_step, NULL, "round packet"); swim_scheduler_create(&swim->scheduler, swim_on_input); + + /* Failure detection component. */ + wait_ack_heap_create(&swim->wait_ack_heap); + swim_ev_timer_init(&swim->wait_ack_tick, swim_check_acks, + ACK_TIMEOUT_DEFAULT, 0); + swim->wait_ack_tick.data = (void *) swim; + swim->no_acks_to_gc = NO_ACKS_TO_GC_DEFAULT; return swim; } @@ -796,7 +1180,7 @@ swim_uri_to_addr(const char *uri, struct sockaddr_in *addr, int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, - const struct tt_uuid *uuid) + double ack_timeout, int no_acks_to_gc, const struct tt_uuid *uuid) { const char *prefix = "swim.cfg:"; struct sockaddr_in addr; @@ -809,7 +1193,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, "a first config", prefix); return -1; } - swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE); + swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, + 0); if (swim->self == NULL) return -1; } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { @@ -855,14 +1240,25 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, if (swim->round_tick.at != heartbeat_rate && heartbeat_rate > 0) swim_ev_timer_set(&swim->round_tick, heartbeat_rate, 0); + if (swim->wait_ack_tick.at != ack_timeout && 
ack_timeout > 0) + swim_ev_timer_set(&swim->wait_ack_tick, ack_timeout, 0); + swim_update_member_addr(swim, swim->self, &addr); int rc = swim_update_member_uuid(swim, swim->self, uuid); /* Reserved above. */ assert(rc == 0); (void) rc; + if (no_acks_to_gc >= 0) + swim->no_acks_to_gc = no_acks_to_gc; return 0; } +double +swim_ack_timeout(const struct swim *swim) +{ + return swim->wait_ack_tick.at; +} + bool swim_is_configured(const struct swim *swim) { @@ -883,7 +1279,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) return -1; struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { - member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE); + member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0); return member == NULL ? -1 : 0; } diag_set(SwimError, "%s a member with such UUID already exists", @@ -911,6 +1307,21 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid) return 0; } +int +swim_probe_member(struct swim *swim, const char *uri) +{ + assert(swim_is_configured(swim)); + struct sockaddr_in addr; + if (swim_uri_to_addr(uri, &addr, "swim.probe_member:") != 0) + return -1; + struct swim_task *t = swim_task_new(swim_task_delete_cb, + swim_task_delete_cb, "probe ping"); + if (t == NULL) + return -1; + swim_send_ping(swim, t, &addr); + return 0; +} + void swim_info(struct swim *swim, struct info_handler *info) { @@ -926,6 +1337,7 @@ swim_info(struct swim *swim, struct info_handler *info) info_append_str(info, "status", swim_member_status_strs[m->status]); info_append_str(info, "uuid", swim_uuid_str(&m->uuid)); + info_append_int(info, "incarnation", (int64_t) m->incarnation); info_table_end(info); } info_end(info); @@ -936,14 +1348,18 @@ swim_delete(struct swim *swim) { swim_scheduler_destroy(&swim->scheduler); swim_ev_timer_stop(loop(), &swim->round_tick); + swim_ev_timer_stop(loop(), &swim->wait_ack_tick); swim_task_destroy(&swim->round_step_task); mh_int_t node; 
mh_foreach(swim->members, node) { struct swim_member *m = *mh_swim_table_node(swim->members, node); rlist_del_entry(m, in_round_queue); + if (! heap_node_is_stray(&m->in_wait_ack_heap)) + wait_ack_heap_delete(&swim->wait_ack_heap, m); swim_member_delete(m); } + wait_ack_heap_destroy(&swim->wait_ack_heap); mh_swim_table_delete(swim->members); free(swim->shuffled); } @@ -1007,3 +1423,9 @@ swim_member_uuid(const struct swim_member *member) { return &member->uuid; } + +uint64_t +swim_member_incarnation(const struct swim_member *member) +{ + return member->incarnation; +} diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index ddb759c3d..23b244d00 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -31,6 +31,7 @@ * SUCH DAMAGE. */ #include +#include #define SWIM_PUBLIC_API #include "swim_proto.h" @@ -38,6 +39,16 @@ extern "C" { #endif +enum { + /** + * Value being used to say that unacknowledged pings + * should not affect a certain decision about member. For + * example, regardless of number of unacked pings, never + * drop a member. + */ + SWIM_NO_ACKS_IGNORE = INT_MAX, +}; + struct info_handler; struct swim; struct tt_uuid; @@ -65,6 +76,14 @@ swim_is_configured(const struct swim *swim); * @heartbeat_rate seconds. It is rather the protocol * speed. Protocol period depends on member count and * @heartbeat_rate. + * @param ack_timeout Time in seconds after which a ping is + * considered to be unacknowledged. + * @param no_acks_to_gc How many pings to a dead member should be + * unacknowledged to delete it from the member table. Big + * or even almost infinite (SWIM_NO_ACKS_IGNORE) values + * could be useful, if SWIM is used mainly for monitoring + * of existing nodes with manual removal of dead ones, and + * probably with only single initial discovery. * @param uuid UUID of this instance. Must be unique over the * cluster. 
* @@ -73,7 +92,11 @@ swim_is_configured(const struct swim *swim); */ int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, - const struct tt_uuid *uuid); + double ack_timeout, int no_acks_to_gc, const struct tt_uuid *uuid); + +/** SWIM's ACK timeout, previously set via @sa swim_cfg. */ +double +swim_ack_timeout(const struct swim *swim); /** * Stop listening and broadcasting messages, cleanup all internal @@ -98,6 +121,13 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid); int swim_remove_member(struct swim *swim, const struct tt_uuid *uuid); +/** + * Send a ping to this address. If an ACK is received, the member + * will be added. + */ +int +swim_probe_member(struct swim *swim, const char *uri); + /** Dump member statuses into @a info. */ void swim_info(struct swim *swim, struct info_handler *info); @@ -148,6 +178,10 @@ swim_member_uri(const struct swim_member *member); const struct tt_uuid * swim_member_uuid(const struct swim_member *member); +/** Member's incarnation. */ +uint64_t +swim_member_incarnation(const struct swim_member *member); + #if defined(__cplusplus) } #endif diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index 015968a0d..504f64f32 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -67,11 +67,32 @@ swim_task_create(struct swim_task *task, swim_task_f complete, rlist_create(&task->in_queue_output); } +struct swim_task * +swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc) +{ + struct swim_task *task = (struct swim_task *) malloc(sizeof(*task)); + if (task == NULL) { + diag_set(OutOfMemory, sizeof(*task), "malloc", "task"); + return NULL; + } + swim_task_create(task, complete, cancel, desc); + return task; +} + +void +swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler, + int rc) +{ + (void) rc; + (void) scheduler; + free(task); +} + /** Put the task into the queue of output tasks. 
*/ static inline void swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler) { - assert(rlist_empty(&task->in_queue_output)); + assert(! swim_task_is_scheduled(task)); rlist_add_tail_entry(&scheduler->queue_output, task, in_queue_output); swim_ev_io_start(loop(), &scheduler->output); } diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index bc62a29ce..a6bc28ad3 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -207,6 +207,13 @@ struct swim_task { const char *desc; }; +/** Check if @a task is already scheduled. */ +static inline bool +swim_task_is_scheduled(struct swim_task *task) +{ + return ! rlist_empty(&task->in_queue_output); +} + /** * Put the task into a queue of tasks. Eventually it will be sent. */ @@ -219,6 +226,15 @@ void swim_task_create(struct swim_task *task, swim_task_f complete, swim_task_f cancel, const char *desc); +/** Allocate and create a new task. */ +struct swim_task * +swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc); + +/** Callback to delete a task after its completion. */ +void +swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler, + int rc); + /** Destroy the task, pop from the queue. 
*/ static inline void swim_task_destroy(struct swim_task *task) diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index bf4c09b24..93b7938b6 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -36,6 +36,12 @@ const char *swim_member_status_strs[] = { "alive", + "dead", +}; + +const char *swim_fd_msg_type_strs[] = { + "ping", + "ack", }; int @@ -182,6 +188,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member uuid") != 0) return -1; break; + case SWIM_MEMBER_INCARNATION: + if (swim_decode_uint(pos, end, &def->incarnation, prefix, + "member incarnation") != 0) + return -1; + break; default: unreachable(); } @@ -229,6 +240,70 @@ swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, memcpy(header->v_uuid, uuid, UUID_LEN); } +void +swim_fd_header_bin_create(struct swim_fd_header_bin *header, + enum swim_fd_msg_type type, uint64_t incarnation) +{ + header->k_header = SWIM_FAILURE_DETECTION; + header->m_header = 0x82; + + header->k_type = SWIM_FD_MSG_TYPE; + header->v_type = type; + + header->k_incarnation = SWIM_FD_INCARNATION; + header->m_incarnation = 0xcf; + header->v_incarnation = mp_bswap_u64(incarnation); +} + +int +swim_failure_detection_def_decode(struct swim_failure_detection_def *def, + const char **pos, const char *end, + const char *prefix) +{ + uint32_t size; + if (swim_decode_map(pos, end, &size, prefix, "root") != 0) + return -1; + memset(def, 0, sizeof(*def)); + def->type = swim_fd_msg_type_MAX; + if (size != 2) { + diag_set(SwimError, "%s root map should have two keys - "\ + "message type and incarnation", prefix); + return -1; + } + for (int i = 0; i < (int) size; ++i) { + uint64_t key; + if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0) + return -1; + switch(key) { + case SWIM_FD_MSG_TYPE: + if (swim_decode_uint(pos, end, &key, prefix, + "message type") != 0) + return -1; + if (key >= swim_fd_msg_type_MAX) { + diag_set(SwimError, "%s unknown message type", + prefix); + 
return -1; + } + def->type = key; + break; + case SWIM_FD_INCARNATION: + if (swim_decode_uint(pos, end, &def->incarnation, + prefix, "incarnation") != 0) + return -1; + break; + default: + diag_set(SwimError, "%s unexpected key", prefix); + return -1; + } + } + if (def->type == swim_fd_msg_type_MAX) { + diag_set(SwimError, "%s message type should be specified", + prefix); + return -1; + } + return 0; +} + void swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, uint16_t batch_size) @@ -241,18 +316,19 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, void swim_member_bin_fill(struct swim_member_bin *header, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status) + enum swim_member_status status, uint64_t incarnation) { header->v_status = status; header->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr)); header->v_port = mp_bswap_u16(ntohs(addr->sin_port)); memcpy(header->v_uuid, uuid, UUID_LEN); + header->v_incarnation = mp_bswap_u64(incarnation); } void swim_member_bin_create(struct swim_member_bin *header) { - header->m_header = 0x84; + header->m_header = 0x85; header->k_status = SWIM_MEMBER_STATUS; header->k_addr = SWIM_MEMBER_ADDRESS; header->m_addr = 0xce; @@ -261,6 +337,8 @@ swim_member_bin_create(struct swim_member_bin *header) header->k_uuid = SWIM_MEMBER_UUID; header->m_uuid = 0xc4; header->m_uuid_len = UUID_LEN; + header->k_incarnation = SWIM_MEMBER_INCARNATION; + header->m_incarnation = 0xcf; } void diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index 4f3cdf03d..c120f5733 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -49,12 +49,20 @@ * | | * | AND | * | | + * | SWIM_FAILURE_DETECTION: { | + * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, | + * | SWIM_FD_INCARNATION: uint | + * | }, | + * | | + * | OR/AND | + * | | * | SWIM_ANTI_ENTROPY: [ | * | { | * | SWIM_MEMBER_STATUS: uint, enum member_status, | * | 
SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | - * | SWIM_MEMBER_UUID: 16 byte UUID | + * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_INCARNATION: uint | * | }, | * | ... | * | ], | @@ -68,6 +76,11 @@ enum swim_member_status { /** The instance is ok, responds to requests. */ MEMBER_ALIVE = 0, + /** + * The member is considered to be dead. It will disappear + * from the membership after some unacknowledged pings. + */ + MEMBER_DEAD, swim_member_status_MAX, }; @@ -87,6 +100,7 @@ extern const char *swim_member_status_strs[]; struct swim_member_def { struct tt_uuid uuid; struct sockaddr_in addr; + uint64_t incarnation; enum swim_member_status status; }; @@ -117,6 +131,7 @@ swim_member_def_decode(struct swim_member_def *def, const char **pos, enum swim_body_key { SWIM_SRC_UUID = 0, SWIM_ANTI_ENTROPY, + SWIM_FAILURE_DETECTION, }; /** @@ -139,6 +154,79 @@ void swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, const struct tt_uuid *uuid); +/** {{{ Failure detection component */ + +/** Failure detection component keys. */ +enum swim_fd_key { + /** Type of the failure detection message: ping or ack. */ + SWIM_FD_MSG_TYPE, + /** + * Incarnation of the sender. To make the member alive if + * it was considered to be dead, but ping/ack with greater + * incarnation was received from it. + */ + SWIM_FD_INCARNATION, +}; + +/** Failure detection message type. */ +enum swim_fd_msg_type { + SWIM_FD_MSG_PING, + SWIM_FD_MSG_ACK, + swim_fd_msg_type_MAX, +}; + +extern const char *swim_fd_msg_type_strs[]; + +/** SWIM failure detection MessagePack header template. 
*/ +struct PACKED swim_fd_header_bin { + /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ + uint8_t k_header; + /** mp_encode_map(2) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ + uint8_t k_type; + /** mp_encode_uint(enum swim_fd_msg_type) */ + uint8_t v_type; + + /** mp_encode_uint(SWIM_FD_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +/** Initialize failure detection section. */ +void +swim_fd_header_bin_create(struct swim_fd_header_bin *header, + enum swim_fd_msg_type type, uint64_t incarnation); + +/** A decoded failure detection message. */ +struct swim_failure_detection_def { + /** Type of the message. */ + enum swim_fd_msg_type type; + /** Incarnation of the sender. */ + uint64_t incarnation; +}; + +/** + * Decode failure detection from a MessagePack buffer. + * @param[out] def Definition to decode into. + * @param[in][out] pos Start of the MessagePack buffer. + * @param end End of the MessagePack buffer. + * @param prefix A prefix of an error message to use for diag_set, + * when something is wrong. + * + * @retval 0 Success. + * @retval -1 Error. + */ +int +swim_failure_detection_def_decode(struct swim_failure_detection_def *def, + const char **pos, const char *end, + const char *prefix); + +/** }}} Failure detection component */ + /** {{{ Anti-entropy component */ /** @@ -150,6 +238,7 @@ enum swim_member_key { SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, + SWIM_MEMBER_INCARNATION, swim_member_key_MAX, }; @@ -172,7 +261,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, * anti-entropy section. 
*/ struct PACKED swim_member_bin { - /** mp_encode_map(4) */ + /** mp_encode_map(5) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -198,6 +287,12 @@ struct PACKED swim_member_bin { uint8_t m_uuid; uint8_t m_uuid_len; uint8_t v_uuid[UUID_LEN]; + + /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; }; /** Initialize antri-entropy record. */ @@ -213,7 +308,7 @@ swim_member_bin_create(struct swim_member_bin *header); void swim_member_bin_fill(struct swim_member_bin *header, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status); + enum swim_member_status status, uint64_t incarnation); /** }}} Anti-entropy component */ diff --git a/test/unit/swim.c b/test/unit/swim.c index ea60be4ae..80787a0c7 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -105,11 +105,11 @@ swim_test_uuid_update(void) struct swim *s = swim_cluster_node(cluster, 0); struct tt_uuid new_uuid = uuid_nil; new_uuid.time_low = 1000; - is(swim_cfg(s, NULL, -1, &new_uuid), 0, "UUID update"); + is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), 0, "UUID update"); is(swim_cluster_wait_fullmesh(cluster, 1), 0, "old UUID is returned back as a 'ghost' member"); new_uuid.time_low = 2; - is(swim_cfg(s, NULL, -1, &new_uuid), -1, + is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), -1, "can not update to an existing UUID - swim_cfg fails"); ok(swim_error_check_match("exists"), "diag says 'exists'"); swim_cluster_delete(cluster); @@ -124,16 +124,16 @@ swim_test_cfg(void) struct swim *s = swim_new(); assert(s != NULL); - is(swim_cfg(s, NULL, -1, NULL), -1, "first cfg failed - no URI"); + is(swim_cfg(s, NULL, -1, -1, -1, NULL), -1, "first cfg failed - no URI"); ok(swim_error_check_match("mandatory"), "diag says 'mandatory'"); const char *uri = "127.0.0.1:1"; - is(swim_cfg(s, uri, -1, NULL), -1, "first cfg failed - no UUID"); + is(swim_cfg(s, uri, -1, -1, 
-1, NULL), -1, "first cfg failed - no UUID"); ok(swim_error_check_match("mandatory"), "diag says 'mandatory'"); struct tt_uuid uuid = uuid_nil; uuid.time_low = 1; - is(swim_cfg(s, uri, -1, &uuid), 0, "configured first time"); - is(swim_cfg(s, NULL, -1, NULL), 0, "second time can omit URI, UUID"); - is(swim_cfg(s, NULL, 2, NULL), 0, "hearbeat is dynamic"); + is(swim_cfg(s, uri, -1, -1, -1, &uuid), 0, "configured first time"); + is(swim_cfg(s, NULL, -1, -1, -1, NULL), 0, "second time can omit URI, UUID"); + is(swim_cfg(s, NULL, 2, 2, -1, NULL), 0, "hearbeat is dynamic"); const char *self_uri = swim_member_uri(swim_self(s)); is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\ "URI"); @@ -145,14 +145,16 @@ swim_test_cfg(void) const char *bad_uri3 = "unix/:/home/gerold103/any/dir"; struct tt_uuid uuid2 = uuid_nil; uuid2.time_low = 2; - is(swim_cfg(s2, bad_uri1, -1, &uuid2), -1, "can not use invalid URI"); + is(swim_cfg(s2, bad_uri1, -1, -1, -1, &uuid2), -1, + "can not use invalid URI"); ok(swim_error_check_match("invalid uri"), "diag says 'invalid uri'"); - is(swim_cfg(s2, bad_uri2, -1, &uuid2), -1, "can not use domain names"); + is(swim_cfg(s2, bad_uri2, -1, -1, -1, &uuid2), -1, + "can not use domain names"); ok(swim_error_check_match("invalid uri"), "diag says 'invalid uri'"); - is(swim_cfg(s2, bad_uri3, -1, &uuid2), -1, + is(swim_cfg(s2, bad_uri3, -1, -1, -1, &uuid2), -1, "UNIX sockets are not supported"); ok(swim_error_check_match("only IP"), "diag says 'only IP'"); - is(swim_cfg(s2, uri, -1, &uuid2), -1, + is(swim_cfg(s2, uri, -1, -1, -1, &uuid2), -1, "can not bind to an occupied port"); ok(swim_error_check_match("bind"), "diag says 'bind'"); swim_delete(s2); @@ -226,10 +228,140 @@ swim_test_add_remove(void) swim_finish_test(); } +static void +swim_test_basic_failure_detection(void) +{ + swim_start_test(7); + struct swim_cluster *cluster = swim_cluster_new(2); + swim_cluster_set_ack_timeout(cluster, 0.5); + + swim_cluster_add_link(cluster, 0, 
1); + is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE, + "node is added as alive"); + swim_cluster_block_io(cluster, 1); + is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 2.4), -1, + "member still is not dead after 2 noacks"); + is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 0.1), 0, + "but it is dead after one more"); + + is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX, + 0.9), -1, + "after 1 more unack the member still is not deleted"); + is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX, + 0.1), 0, "but it is dropped after 1 more"); + + /* + * After IO is unblocked, pending messages are processed + * all at once, and S2 learns about S1. After one more + * round step the cluster should be fullmesh. + */ + swim_cluster_unblock_io(cluster, 1); + is(swim_cluster_wait_fullmesh(cluster, 1), 0, "fullmesh is restored"); + + /* A member can be removed during an ACK wait. */ + swim_cluster_block_io(cluster, 1); + /* Next round after 1 sec + let ping hang for 0.25 sec. */ + swim_run_for(1.25); + struct swim *s1 = swim_cluster_node(cluster, 0); + struct swim *s2 = swim_cluster_node(cluster, 1); + const struct swim_member *s2_self = swim_self(s2); + swim_remove_member(s1, swim_member_uuid(s2_self)); + swim_cluster_unblock_io(cluster, 1); + swim_run_for(0.1); + is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE, + "a member is added back on an ACK"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + +static void +swim_test_basic_gossip(void) +{ + swim_start_test(3); + struct swim_cluster *cluster = swim_cluster_new(3); + swim_cluster_set_ack_timeout(cluster, 10); + /* + * Test basic gossip. S1 and S2 know each other. Then S2 + * starts losing packets. S1 does not receive 2 ACKs from + * S2. Then S3 joins the cluster and explicitly learns + * about S1 and S2. After one more missing ACK S1 declares + * S2 dead, and S3 learns the same via anti-entropy.
This is even + * earlier than S3 could discover it via its own + * pings to S2. + */ + swim_cluster_add_link(cluster, 0, 1); + swim_cluster_add_link(cluster, 1, 0); + swim_cluster_set_drop(cluster, 1, true); + /* + * Wait for two missing ACKs from S2 on S1, plus 1 sec + * to send the first ping. + */ + swim_run_for(20 + 1); + swim_cluster_add_link(cluster, 0, 2); + swim_cluster_add_link(cluster, 2, 1); + is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 10), 0, + "S1 sees S2 as dead"); + is(swim_cluster_member_status(cluster, 2, 1), MEMBER_ALIVE, + "S3 still thinks that S2 is alive"); + /* + * After at most two round steps S1 sends 'S2 is dead' to + * S3. + */ + is(swim_cluster_wait_status(cluster, 2, 1, MEMBER_DEAD, 2), 0, + "S3 learns about dead S2 from S1"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + +static void +swim_test_probe(void) +{ + swim_start_test(2); + struct swim_cluster *cluster = swim_cluster_new(2); + + struct swim *s1 = swim_cluster_node(cluster, 0); + struct swim *s2 = swim_cluster_node(cluster, 1); + const char *s2_uri = swim_member_uri(swim_self(s2)); + is(swim_probe_member(s1, s2_uri), 0, "send probe"); + is(swim_cluster_wait_fullmesh(cluster, 0.1), 0, + "receive ACK on probe and get fullmesh"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + +static void +swim_test_refute(void) +{ + swim_start_test(4); + struct swim_cluster *cluster = swim_cluster_new(2); + swim_cluster_set_ack_timeout(cluster, 2); + + swim_cluster_add_link(cluster, 0, 1); + swim_cluster_set_drop(cluster, 1, true); + fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 7) != 0); + swim_cluster_set_drop(cluster, 1, false); + is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0, + "S2 increments its own incarnation to refute its death"); + is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0, + "new incarnation has reached S1 with a next round message"); + + swim_cluster_restart_node(cluster, 1); + is(swim_cluster_member_incarnation(cluster,
1, 1), 0, + "after restart S2's incarnation is 0 again"); + is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0, + "S2 learned its old bigger incarnation 1 from S1"); + + swim_cluster_delete(cluster); + swim_finish_test(); +} + static int main_f(va_list ap) { - swim_start_test(5); + swim_start_test(9); (void) ap; swim_test_ev_init(); @@ -240,6 +372,10 @@ main_f(va_list ap) swim_test_uuid_update(); swim_test_cfg(); swim_test_add_remove(); + swim_test_basic_failure_detection(); + swim_test_probe(); + swim_test_refute(); + swim_test_basic_gossip(); swim_test_transport_free(); swim_test_ev_free(); diff --git a/test/unit/swim.result b/test/unit/swim.result index 42cef1612..2503cbfcd 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -1,5 +1,5 @@ *** main_f *** -1..5 +1..9 *** swim_test_one_link *** 1..6 ok 1 - no rounds - no fullmesh @@ -60,4 +60,36 @@ ok 4 - subtests ok 13 - back in fullmesh after a member removal in the middle of a step ok 5 - subtests *** swim_test_add_remove: done *** + *** swim_test_basic_failure_detection *** + 1..7 + ok 1 - node is added as alive + ok 2 - member still is not dead after 2 noacks + ok 3 - but it is dead after one more + ok 4 - after 1 more unack the member still is not deleted + ok 5 - but it is dropped after 1 more + ok 6 - fullmesh is restored + ok 7 - a member is added back on an ACK +ok 6 - subtests + *** swim_test_basic_failure_detection: done *** + *** swim_test_probe *** + 1..2 + ok 1 - send probe + ok 2 - receive ACK on probe and get fullmesh +ok 7 - subtests + *** swim_test_probe: done *** + *** swim_test_refute *** + 1..4 + ok 1 - S2 increments its own incarnation to refute its death + ok 2 - new incarnation has reached S1 with a next round message + ok 3 - after restart S2's incarnation is 0 again + ok 4 - S2 learned its old bigger incarnation 1 from S1 +ok 8 - subtests + *** swim_test_refute: done *** + *** swim_test_basic_gossip *** + 1..3 + ok 1 - S1 sees S2 as dead + ok 2 - S3 still thinks that S2
is alive + ok 3 - S3 learns about dead S2 from S1 +ok 9 - subtests + *** swim_test_basic_gossip: done *** *** main_f: done *** diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c index d1c3e97d7..d121292b1 100644 --- a/test/unit/swim_test_transport.c +++ b/test/unit/swim_test_transport.c @@ -95,7 +95,11 @@ struct swim_fd { * blocked, new messages are queued, but not delivered. */ bool is_opened; - + /** + * True if any message sent to or from that fd should be + * dropped instead of queued. + */ + bool is_dropping; /** * Link in the list of opened and non-blocked descriptors. * Used to feed them all EV_WRITE. @@ -260,6 +264,14 @@ swim_test_transport_unblock_fd(int fd) rlist_add_tail_entry(&swim_fd_active, sfd, in_active); } +void +swim_test_transport_set_drop(int fd, bool value) +{ + struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE]; + if (sfd->is_opened) + sfd->is_dropping = value; +} + /** Send one packet to destination's recv queue. */ static inline void swim_fd_send_packet(struct swim_fd *fd) @@ -270,7 +282,7 @@ swim_fd_send_packet(struct swim_fd *fd) rlist_shift_entry(&fd->send_queue, struct swim_test_packet, in_queue); struct swim_fd *dst = &swim_fd[ntohs(p->dst.sin_port)]; - if (dst->is_opened) + if (dst->is_opened && !dst->is_dropping && !fd->is_dropping) rlist_add_tail_entry(&dst->recv_queue, p, in_queue); else swim_test_packet_delete(p); diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h index f8066e636..5a1a92271 100644 --- a/test/unit/swim_test_transport.h +++ b/test/unit/swim_test_transport.h @@ -30,6 +30,7 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include struct ev_loop; /** @@ -58,6 +59,14 @@ swim_test_transport_block_fd(int fd); void swim_test_transport_unblock_fd(int fd); +/** + * Set to true if all incoming and outgoing packets should be + * dropped. Note that the node owning @a fd still believes + * that its packets are sent.
+ */ +void +swim_test_transport_set_drop(int fd, bool value); + /** Initialize test transport system. */ void swim_test_transport_init(void); diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 0d62bb26c..0415a3035 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -62,13 +62,24 @@ swim_cluster_new(int size) assert(res->node[i] != NULL); sprintf(uri, "127.0.0.1:%d", i + 1); uuid.time_low = i + 1; - int rc = swim_cfg(res->node[i], uri, -1, &uuid); + int rc = swim_cfg(res->node[i], uri, -1, -1, -1, &uuid); assert(rc == 0); (void) rc; } return res; } +void +swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout) +{ + for (int i = 0; i < cluster->size; ++i) { + int rc = swim_cfg(cluster->node[i], NULL, -1, ack_timeout, -1, + NULL); + assert(rc == 0); + (void) rc; + } +} + void swim_cluster_delete(struct swim_cluster *cluster) { @@ -107,6 +118,17 @@ swim_cluster_member_status(struct swim_cluster *cluster, int node_id, return swim_member_status(m); } +uint64_t +swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, + int member_id) +{ + const struct swim_member *m = + swim_cluster_member_view(cluster, node_id, member_id); + if (m == NULL) + return UINT64_MAX; + return swim_member_incarnation(m); +} + struct swim * swim_cluster_node(struct swim_cluster *cluster, int i) { @@ -114,6 +136,25 @@ swim_cluster_node(struct swim_cluster *cluster, int i) return cluster->node[i]; } +void +swim_cluster_restart_node(struct swim_cluster *cluster, int i) +{ + assert(i >= 0 && i < cluster->size); + struct swim *s = cluster->node[i]; + const struct swim_member *self = swim_self(s); + struct tt_uuid uuid = *swim_member_uuid(self); + char uri[128]; + strcpy(uri, swim_member_uri(self)); + double ack_timeout = swim_ack_timeout(s); + swim_delete(s); + s = swim_new(); + assert(s != NULL); + int rc = swim_cfg(s, uri, -1, ack_timeout, -1, &uuid); + assert(rc == 0); + (void) rc; + cluster->node[i] = s; 
+} + void swim_cluster_block_io(struct swim_cluster *cluster, int i) { @@ -126,6 +167,12 @@ swim_cluster_unblock_io(struct swim_cluster *cluster, int i) swim_test_transport_unblock_fd(swim_fd(cluster->node[i])); } +void +swim_cluster_set_drop(struct swim_cluster *cluster, int i, bool value) +{ + swim_test_transport_set_drop(swim_fd(cluster->node[i]), value); +} + /** Check if @a s1 knows every member of @a s2's table. */ static inline bool swim1_contains_swim2(struct swim *s1, struct swim *s2) @@ -186,6 +233,28 @@ swim_run_for(double duration) swim_wait_timeout(duration, false); } +int +swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, + int member_id, enum swim_member_status status, + double timeout) +{ + return swim_wait_timeout(timeout, + swim_cluster_member_status(cluster, node_id, + member_id) == status + ); +} + +int +swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, + int member_id, uint64_t incarnation, + double timeout) +{ + return swim_wait_timeout(timeout, + swim_cluster_member_incarnation(cluster, node_id, + member_id) == incarnation + ); +} + bool swim_error_check_match(const char *msg) { diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index befb95420..8fba5c2da 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -44,6 +44,10 @@ struct swim_cluster; struct swim_cluster * swim_cluster_new(int size); +/** Change ACK timeout of all the instances in the cluster. */ +void +swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout); + /** Delete all the SWIM instances, and the cluster itself. */ void swim_cluster_delete(struct swim_cluster *cluster); @@ -56,6 +60,10 @@ swim_error_check_match(const char *msg); struct swim * swim_cluster_node(struct swim_cluster *cluster, int i); +/** Drop and create again a SWIM instance with id @a i. 
*/ +void +swim_cluster_restart_node(struct swim_cluster *cluster, int i); + /** Block IO on a SWIM instance with id @a i. */ void swim_cluster_block_io(struct swim_cluster *cluster, int i); @@ -64,6 +72,9 @@ swim_cluster_block_io(struct swim_cluster *cluster, int i); void swim_cluster_unblock_io(struct swim_cluster *cluster, int i); +void +swim_cluster_set_drop(struct swim_cluster *cluster, int i, bool value); + /** * Explicitly add a member of id @a from_id to a member of id * @a to_id. @@ -75,6 +86,10 @@ enum swim_member_status swim_cluster_member_status(struct swim_cluster *cluster, int node_id, int member_id); +uint64_t +swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id, + int member_id); + /** * Check if in the cluster every instance knowns the about other * instances. @@ -86,6 +101,21 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster); int swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout); +/** + * Wait until a member with id @a member_id is seen with @a status + * in the membership table of a member with id @a node_id. At most + * @a timeout seconds. + */ +int +swim_cluster_wait_status(struct swim_cluster *cluster, int node_id, + int member_id, enum swim_member_status status, + double timeout); + +int +swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id, + int member_id, uint64_t incarnation, + double timeout); + /** Process SWIM events for @a duration fake seconds. */ void swim_run_for(double duration); -- 2.17.2 (Apple Git-113)