From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH 2/5] swim: introduce failure detection component Date: Mon, 17 Dec 2018 15:53:20 +0300 Message-Id: <16b1bb5a6d1c5e32d2c9a33afcd966e8c98a2ef6.1545047950.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com List-ID: The failure detection component allows finding which members are already dead. Part of #3234 --- src/lib/swim/swim.c | 484 +++++++++++++++++++++++++++++++++++++++-- test/swim/basic.result | 16 ++ 2 files changed, 478 insertions(+), 22 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 7e5d0eb9e..6b2f1ca0c 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -40,17 +40,15 @@ /** * Possible optimizations: - * - track hash table versions and do not resend when a received - * already knows your version. - * - on small updates send to another node only updates since a - * version. On rare updates it can dramatically reduce message - * size and its encoding time. + * + * Optional: * - do not send self. * - cache encoded batch. * - refute immediately. * - indirect ping. * - increment own incarnation on each round. * - attach dst incarnation to ping. + * - fix swim_member_bin mp_encode_map(2) to 3 in the first patch. */ /** @@ -131,24 +129,45 @@ sockaddr_in_hash(const struct sockaddr_in *a) * tables. The intention to send a data is called IO task and is * stored in a queue that is dispatched when output is possible. 
*/ -typedef void (*swim_io_task_f)(void); +struct swim_io_task; + +typedef void (*swim_io_task_f)(struct swim_io_task *); struct swim_io_task { swim_io_task_f cb; struct rlist in_queue_output; }; +static inline void +swim_io_task_create(struct swim_io_task *task, swim_io_task_f cb) +{ + task->cb = cb; + rlist_create(&task->in_queue_output); +} + +static inline void +swim_io_task_destroy(struct swim_io_task *task) +{ + rlist_del_entry(task, in_queue_output); +} + enum swim_member_status { /** * The instance is ok, it responds to requests, sends its * members table. */ MEMBER_ALIVE = 0, + /** + * The member is considered to be dead. It will disappear + * from the membership, if it is not pinned. + */ + MEMBER_DEAD, swim_member_status_MAX, }; static const char *swim_member_status_strs[] = { "alive", + "dead", }; /** @@ -173,6 +192,38 @@ struct swim_member { * Position in a queue of members in the current round. */ struct rlist in_queue_round; + /** + * + * Failure detection component + */ + /** + * True, if the member is configured explicitly and can + * not disappear from the membership. + */ + bool is_pinned; + /** Growing number to refute old messages. */ + uint64_t incarnation; + /** + * How many pings did not receive an ack in a row. After + * a threshold the instance is marked as dead. After more + * it is removed from the table (if not pinned). + */ + int failed_pings; + /** When the last ping was sent. */ + double ping_ts; + /** + * Ready at hand ack task to send when this member has + * sent ping to us. + */ + struct swim_io_task ack_task; + /** + * Ready at hand ping task to send when this member too + * long does not respond to an initial ping, piggybacked + * with members table. + */ + struct swim_io_task ping_task; + /** Position in a queue of members waiting for an ack. 
*/ + struct rlist in_queue_wait_ack; }; /** @@ -188,8 +239,93 @@ static struct swim_member *self = NULL; */ enum swim_component_type { SWIM_ANTI_ENTROPY = 0, + SWIM_FAILURE_DETECTION, +}; + +/** {{{ Failure detection component */ + +/** Possible failure detection keys. */ +enum swim_fd_key { + /** Type of the failure detection message: ping or ack. */ + SWIM_FD_MSG_TYPE, + /** + * Incarnation of the sender. To make the member alive if + * it was considered to be dead, but ping/ack with greater + * incarnation was received from it. + */ + SWIM_FD_INCARNATION, }; +/** + * Failure detection message now has only two types: ping or ack. + * Indirect ping/ack are todo. + */ +enum swim_fd_msg_type { + SWIM_FD_MSG_PING, + SWIM_FD_MSG_ACK, + swim_fd_msg_type_MAX, +}; + +static const char *swim_fd_msg_type_strs[] = { + "ping", + "ack", +}; + +/** SWIM failure detection MsgPack header template. */ +struct PACKED swim_fd_header_bin { + /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ + uint8_t k_header; + /** mp_encode_map(2) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ + uint8_t k_type; + /** mp_encode_uint(enum swim_fd_msg_type) */ + uint8_t v_type; + + /** mp_encode_uint(SWIM_FD_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +static inline void +swim_fd_header_bin_create(struct swim_fd_header_bin *header, + enum swim_fd_msg_type type) +{ + header->k_header = SWIM_FAILURE_DETECTION; + header->m_header = 0x82; + + header->k_type = SWIM_FD_MSG_TYPE; + header->v_type = type; + + header->k_incarnation = SWIM_FD_INCARNATION; + header->m_incarnation = 0xcf; + header->v_incarnation = mp_bswap_u64(self->incarnation); +} + +/** + * Members waiting for an ACK. On too long absence of ACK a member + * is considered to be dead and is removed. The list is sorted by + * time in ascending order (tail is newer, head is older). 
+ */ +static RLIST_HEAD(queue_wait_ack); +/** Generator of ack checking events. */ +static struct ev_periodic wait_ack_tick; + +static void +swim_member_schedule_ack_wait(struct swim_member *member) +{ + if (rlist_empty(&member->in_queue_wait_ack)) { + member->ping_ts = fiber_time(); + rlist_add_tail_entry(&queue_wait_ack, member, + in_queue_wait_ack); + } +} + +/** }}} Failure detection component */ + /** {{{ Anti-entropy component */ /** @@ -204,6 +340,7 @@ enum swim_member_key { */ SWIM_MEMBER_ADDR, SWIM_MEMBER_PORT, + SWIM_MEMBER_INCARNATION, swim_member_key_MAX, }; @@ -227,7 +364,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, /** SWIM member MsgPack template. */ struct PACKED swim_member_bin { - /** mp_encode_map(3) */ + /** mp_encode_map(4) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -246,6 +383,12 @@ struct PACKED swim_member_bin { /** mp_encode_uint(addr.sin_port) */ uint8_t m_port; uint16_t v_port; + + /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; }; static inline void @@ -255,17 +398,20 @@ swim_member_bin_reset(struct swim_member_bin *header, header->v_status = member->status; header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr); header->v_port = mp_bswap_u16(member->addr.sin_port); + header->v_incarnation = mp_bswap_u64(member->incarnation); } static inline void swim_member_bin_create(struct swim_member_bin *header) { - header->m_header = 0x83; + header->m_header = 0x84; header->k_status = SWIM_MEMBER_STATUS; header->k_addr = SWIM_MEMBER_ADDR; header->m_addr = 0xce; header->k_port = SWIM_MEMBER_PORT; header->m_port = 0xcd; + header->k_incarnation = SWIM_MEMBER_INCARNATION; + header->m_incarnation = 0xcf; } /** @@ -283,7 +429,7 @@ static struct ev_periodic round_tick; * objects and related to them only. 
*/ static void -swim_send_round_msg(void); +swim_send_round_msg(struct swim_io_task *task); static struct swim_io_task round_step_task = { /* .cb = */ swim_send_round_msg, @@ -295,11 +441,19 @@ static struct swim_io_task round_step_task = { /** * SWIM message structure: * { + * SWIM_FAILURE_DETECTION: { + * SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, + * SWIM_FD_INCARNATION: uint + * }, + * + * OR/AND + * * SWIM_ANTI_ENTROPY: [ * { * SWIM_MEMBER_STATUS: uint, enum member_status, * SWIM_MEMBER_ADDR: uint, ip, - * SWIM_MEMBER_PORT: uint, port + * SWIM_MEMBER_PORT: uint, port, + * SWIM_MEMBER_INCARNATION: uint * }, * ... * ], @@ -317,6 +471,22 @@ enum { * networks by their admins. */ UDP_PACKET_SIZE = 1472, + /** + * If a ping was sent, it is considered to be lost after + * this time without an ack. + */ + ACK_TIMEOUT = 1, + /** + * If a member has not been responding to pings this + * number of times, it is considered to be dead. + */ + NO_ACKS_TO_DEAD = 3, + /** + * If a non-pinned member is confirmed to be dead, it is + * removed from the membership after at least this number + * of failed pings. + */ + NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2, }; /** @@ -330,6 +500,14 @@ static struct evio_service input; */ static struct ev_io output; +/** + * An array of configured members. Used only to easily roll back a + * failed reconfiguration. + */ +static struct swim_member **cfg = NULL; +/** Number of configured members. */ +static int cfg_size = 0; + /** * An array of members shuffled on each round. Its head it sent * to each member during one round as an anti-entropy message. @@ -347,12 +525,44 @@ swim_io_task_push(struct swim_io_task *task) ev_io_start(loop(), &output); } +static void +swim_send_ack(struct swim_io_task *task); + +static void +swim_send_ping(struct swim_io_task *task); + +/** + * Update status of the member if needed. Statuses are compared as + * a compound key: {incarnation, status}. 
So @a new_status can + * override an old one only if its incarnation is greater, or the + * same, but its status is "bigger". Statuses are compared by + * their identifier, so "alive" < "dead". This protects against the + * case when a member is detected as dead on one instance, but + * overridden by another instance with the same incarnation "alive" + * message. + */ +static inline void +swim_member_update_status(struct swim_member *member, + enum swim_member_status new_status, + uint64_t incarnation) +{ + assert(member != self); + if (member->incarnation == incarnation) { + if (member->status < new_status) + member->status = new_status; + } else if (member->incarnation < incarnation) { + member->status = new_status; + member->incarnation = incarnation; + } +} + /** * Register a new member with a specified status. Here it is * added to the hash, to the 'next' queue. */ static struct swim_member * -swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status) +swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status, + uint64_t incarnation) { struct swim_member *member = (struct swim_member *) malloc(sizeof(*member)); @@ -362,6 +572,10 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status) } member->status = status; member->addr = *addr; + member->incarnation = incarnation; + member->is_pinned = false; + member->failed_pings = 0; + member->ping_ts = 0; struct mh_i64ptr_node_t node; node.key = sockaddr_in_hash(addr); node.val = member; @@ -371,7 +585,10 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status) diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node"); return NULL; } + swim_io_task_create(&member->ack_task, swim_send_ack); + swim_io_task_create(&member->ping_task, swim_send_ping); rlist_add_entry(&queue_round, member, in_queue_round); + rlist_create(&member->in_queue_wait_ack); return member; } @@ -396,7 +613,10 @@ swim_member_delete(struct swim_member *member) 
mh_int_t rc = mh_i64ptr_find(members, key, NULL); assert(rc != mh_end(members)); mh_i64ptr_del(members, rc, NULL); + swim_io_task_destroy(&member->ack_task); + swim_io_task_destroy(&member->ping_task); rlist_del_entry(member, in_queue_round); + rlist_del_entry(member, in_queue_wait_ack); free(member); } @@ -472,16 +692,22 @@ swim_encode_round_msg(char *buffer, int size) if ((shuffled_members == NULL || rlist_empty(&queue_round)) && swim_new_round() != 0) return -1; - /* 1 - for the root map header. */ - assert(size > 1); - --size; + /* -1 - for the root map header. */ + assert((uint)size > sizeof(struct swim_fd_header_bin) + 1); + size -= sizeof(struct swim_fd_header_bin) + 1; + int ae_batch_size = calculate_bin_batch_size( sizeof(struct swim_anti_entropy_header_bin), sizeof(struct swim_member_bin), size); if (ae_batch_size > shuffled_members_size) ae_batch_size = shuffled_members_size; - buffer = mp_encode_map(buffer, 1); + buffer = mp_encode_map(buffer, 2); + + struct swim_fd_header_bin fd_header_bin; + swim_fd_header_bin_create(&fd_header_bin, SWIM_FD_MSG_PING); + memcpy(buffer, &fd_header_bin, sizeof(fd_header_bin)); + buffer += sizeof(fd_header_bin); struct swim_anti_entropy_header_bin ae_header_bin; swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size); @@ -504,8 +730,10 @@ swim_encode_round_msg(char *buffer, int size) * from the queue. */ static void -swim_send_round_msg(void) +swim_send_round_msg(struct swim_io_task *task) { + (void) task; + assert(task->cb == swim_send_round_msg); char buffer[UDP_PACKET_SIZE]; int size = swim_encode_round_msg(buffer, UDP_PACKET_SIZE); if (size < 0) { @@ -524,11 +752,51 @@ swim_send_round_msg(void) if (sio_sendto(output.fd, buffer, size, 0, (struct sockaddr *) &m->addr, sizeof(m->addr)) == -1 && ! sio_wouldblock(errno)) diag_log(); + swim_member_schedule_ack_wait(m); rlist_del_entry(m, in_queue_round); end: ev_periodic_start(loop(), &round_tick); } +/** Send a failure detection message. 
*/ +static void +swim_send_fd_message(struct swim_member *m, enum swim_fd_msg_type type) +{ + char buffer[UDP_PACKET_SIZE]; + char *pos = mp_encode_map(buffer, 1); + struct swim_fd_header_bin header_bin; + swim_fd_header_bin_create(&header_bin, type); + memcpy(pos, &header_bin, sizeof(header_bin)); + pos += sizeof(header_bin); + assert(pos - buffer <= (int)sizeof(buffer)); + say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type], + sio_strfaddr((struct sockaddr *) &m->addr, + sizeof(m->addr))); + if (sio_sendto(output.fd, buffer, pos - buffer, 0, + (struct sockaddr *) &m->addr, sizeof(m->addr)) == -1 && + ! sio_wouldblock(errno)) + diag_log(); +} + +static void +swim_send_ack(struct swim_io_task *task) +{ + assert(task->cb == swim_send_ack); + struct swim_member *m = container_of(task, struct swim_member, + ack_task); + swim_send_fd_message(m, SWIM_FD_MSG_ACK); +} + +static void +swim_send_ping(struct swim_io_task *task) +{ + assert(task->cb == swim_send_ping); + struct swim_member *m = container_of(task, struct swim_member, + ping_task); + swim_send_fd_message(m, SWIM_FD_MSG_PING); + swim_member_schedule_ack_wait(m); +} + static void swim_on_output(struct ev_loop *loop, struct ev_io *io, int events) { @@ -541,7 +809,7 @@ swim_on_output(struct ev_loop *loop, struct ev_io *io, int events) struct swim_io_task *task = rlist_shift_entry(&queue_output, struct swim_io_task, in_queue_output); - task->cb(); + task->cb(task); } /** Once per specified timeout trigger a next broadcast step. */ @@ -554,12 +822,42 @@ swim_trigger_round_step(struct ev_loop *loop, struct ev_periodic *p, int events) ev_periodic_stop(loop, p); } +/** + * Check for failed pings. A ping is failed if an ack was not + * received during ACK_TIMEOUT. A failed ping is resent here. 
+ */ +static void +swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events) +{ + assert((events & EV_PERIODIC) != 0); + (void) loop; + (void) p; + (void) events; + struct swim_member *m, *tmp; + double current_time = fiber_time(); + rlist_foreach_entry_safe(m, &queue_wait_ack, in_queue_wait_ack, tmp) { + if (current_time - m->ping_ts < ACK_TIMEOUT) + break; + ++m->failed_pings; + if (m->failed_pings >= NO_ACKS_TO_GC) { + if (!m->is_pinned) + swim_member_delete(m); + continue; + } + if (m->failed_pings >= NO_ACKS_TO_DEAD) + m->status = MEMBER_DEAD; + swim_io_task_push(&m->ping_task); + rlist_del_entry(m, in_queue_wait_ack); + } +} + /** * SWIM member attributes from anti-entropy and dissemination * messages. */ struct swim_member_def { struct sockaddr_in addr; + uint64_t incarnation; enum swim_member_status status; }; @@ -569,6 +867,7 @@ swim_member_def_create(struct swim_member_def *def) def->addr.sin_port = 0; def->addr.sin_addr.s_addr = 0; def->status = MEMBER_ALIVE; + def->incarnation = 0; } static void @@ -580,9 +879,34 @@ swim_process_member_update(struct swim_member_def *def) * members table. */ if (member == NULL) { - member = swim_member_new(&def->addr, def->status); + member = swim_member_new(&def->addr, def->status, + def->incarnation); if (member == NULL) diag_log(); + return; + } + if (member != self) { + swim_member_update_status(member, def->status, + def->incarnation); + return; + } + /* + * It is possible that other instances know a bigger + * incarnation of this instance - such thing happens when + * the instance restarts and loses its local incarnation + * number. It will be restored by receiving dissemination + * messages about self. + */ + if (self->incarnation < def->incarnation) + self->incarnation = def->incarnation; + if (def->status != MEMBER_ALIVE && + def->incarnation == self->incarnation) { + /* + * In the cluster a gossip exists that this + * instance is not alive. Refute this information + * with a bigger incarnation. 
+ */ + self->incarnation++; } } @@ -626,6 +950,15 @@ swim_process_member_key(enum swim_member_key key, const char **pos, } def->addr.sin_port = port; break; + case SWIM_MEMBER_INCARNATION: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s member incarnation should be uint", + msg_pref); + return -1; + } + def->incarnation = mp_decode_uint(pos); + break; default: unreachable(); } @@ -677,6 +1010,90 @@ swim_process_anti_entropy(const char **pos, const char *end) return 0; } +/** + * Decode a failure detection message. Schedule pings, process + * acks. + */ +static int +swim_process_failure_detection(const char **pos, const char *end, + const struct sockaddr_in *src) +{ + const char *msg_pref = "Invalid SWIM failure detection message:"; + if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) { + say_error("%s root should be a map", msg_pref); + return -1; + } + uint64_t size = mp_decode_map(pos); + if (size != 2) { + say_error("%s root map should have two keys - message type "\ + "and incarnation", msg_pref); + return -1; + } + enum swim_fd_msg_type type = swim_fd_msg_type_MAX; + uint64_t incarnation = 0; + for (int i = 0; i < (int) size; ++i) { + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s a key should be uint", msg_pref); + return -1; + } + uint64_t key = mp_decode_uint(pos); + switch(key) { + case SWIM_FD_MSG_TYPE: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s message type should be uint", + msg_pref); + return -1; + } + key = mp_decode_uint(pos); + if (key >= swim_fd_msg_type_MAX) { + say_error("%s unknown message type", msg_pref); + return -1; + } + type = key; + break; + case SWIM_FD_INCARNATION: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s incarnation should be uint", + msg_pref); + return -1; + } + incarnation = mp_decode_uint(pos); + break; + default: + say_error("%s unknown key", msg_pref); 
+ return -1; + } + } + if (type == swim_fd_msg_type_MAX) { + say_error("%s message type should be specified", msg_pref); + return -1; + } + struct swim_member *sender = swim_find_member(src); + if (sender == NULL) { + sender = swim_member_new(src, MEMBER_ALIVE, incarnation); + if (sender == NULL) { + diag_log(); + return 0; + } + } else { + swim_member_update_status(sender, MEMBER_ALIVE, incarnation); + } + if (type == SWIM_FD_MSG_PING) { + swim_io_task_push(&sender->ack_task); + } else { + assert(type == SWIM_FD_MSG_ACK); + if (incarnation >= sender->incarnation) { + sender->failed_pings = 0; + rlist_del_entry(&sender->ping_task, in_queue_output); + rlist_del_entry(sender, in_queue_wait_ack); + } + } + return 0; +} + /** Receive and process a new message. */ static void swim_on_input(struct ev_loop *loop, struct ev_io *io, int events) @@ -717,6 +1134,12 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events) if (swim_process_anti_entropy(&pos, end) != 0) return; break; + case SWIM_FAILURE_DETECTION: + say_verbose("SWIM: process failure detection"); + if (swim_process_failure_detection(&pos, end, + &addr) != 0) + return; + break; default: say_error("%s unknown component type component is "\ "supported", msg_pref); @@ -783,7 +1206,9 @@ swim_init(void) swim_init_input(&input); ev_init(&output, swim_on_output); ev_init(&round_tick, swim_trigger_round_step); + ev_init(&wait_ack_tick, swim_check_acks); ev_periodic_set(&round_tick, 0, HEARTBEAT_RATE_DEFAULT, NULL); + ev_periodic_set(&wait_ack_tick, 0, ACK_TIMEOUT, NULL); return 0; } @@ -811,7 +1236,7 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri, goto error; struct swim_member *member = swim_find_member(&addr); if (member == NULL) { - member = swim_member_new(&addr, new_status); + member = swim_member_new(&addr, new_status, 0); if (member == NULL) goto error; } @@ -832,7 +1257,8 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri, new_self = 
swim_find_member(&addr); if (new_self == NULL) { - new_self = swim_member_new(&addr, new_status); + new_self = swim_member_new(&addr, new_status, + 0); if (new_self == NULL) goto error; } @@ -847,6 +1273,7 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri, ev_io_set(&output, input.ev.fd, EV_WRITE); evio_service_start(&input); ev_periodic_start(loop(), &round_tick); + ev_periodic_start(loop(), &wait_ack_tick); } } @@ -854,9 +1281,15 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri, ev_periodic_set(&round_tick, 0, heartbeat_rate, NULL); if (member_uri_count > 0) { - for (int i = 0; i < new_cfg_size; ++i) + for (int i = 0; i < cfg_size; ++i) + cfg[i]->is_pinned = false; + free(cfg); + for (int i = 0; i < new_cfg_size; ++i) { + new_cfg[i]->is_pinned = true; new_cfg[i]->status = MEMBER_ALIVE; - free(new_cfg); + } + cfg = new_cfg; + cfg_size = new_cfg_size; } if (new_self != NULL && new_self->status == new_status) new_self->status = MEMBER_ALIVE; @@ -893,6 +1326,8 @@ swim_info(struct info_handler *info) sizeof(member->addr))); info_append_str(info, "status", swim_member_status_strs[member->status]); + info_append_int(info, "incarnation", + (int64_t) member->incarnation); info_table_end(info); } info_end(info); @@ -906,6 +1341,7 @@ swim_stop(void) ev_io_stop(loop(), &output); evio_service_stop(&input); ev_periodic_stop(loop(), &round_tick); + ev_periodic_stop(loop(), &wait_ack_tick); mh_int_t node = mh_first(members); while (node != mh_end(members)) { struct swim_member *m = (struct swim_member *) @@ -915,10 +1351,14 @@ swim_stop(void) } mh_i64ptr_delete(members); free(shuffled_members); + free(cfg); members = NULL; + cfg = NULL; + cfg_size = 0; shuffled_members = NULL; shuffled_members_size = 0; + rlist_create(&queue_wait_ack); rlist_create(&queue_output); rlist_create(&queue_round); } diff --git a/test/swim/basic.result b/test/swim/basic.result index a7ae140d6..7d0131606 100644 --- 
a/test/swim/basic.result +++ b/test/swim/basic.result @@ -43,10 +43,13 @@ swim_info_sorted() --- - - - 192.168.0.1:3333 - status: alive + incarnation: 0 - - 192.168.0.2:3333 - status: alive + incarnation: 0 - - 192.168.0.3:3333 - status: alive + incarnation: 0 ... swim.stop() --- @@ -63,12 +66,16 @@ swim_info_sorted() --- - - - 127.0.0.1:listen_port - status: alive + incarnation: 0 - - 192.168.0.1:3333 - status: alive + incarnation: 0 - - 192.168.0.2:3333 - status: alive + incarnation: 0 - - 192.168.0.3:3333 - status: alive + incarnation: 0 ... swim.debug_round_step() --- @@ -77,12 +84,16 @@ swim_info_sorted() --- - - - 127.0.0.1:listen_port - status: alive + incarnation: 0 - - 192.168.0.1:3333 - status: alive + incarnation: 0 - - 192.168.0.2:3333 - status: alive + incarnation: 0 - - 192.168.0.3:3333 - status: alive + incarnation: 0 ... swim.stop() --- @@ -123,6 +134,7 @@ swim_info_sorted() --- - - - 127.0.0.1:listen_port - status: alive + incarnation: 0 ... swim.stop() --- @@ -163,12 +175,16 @@ swim_info_sorted() --- - - - 127.0.0.1:listen_port - status: alive + incarnation: 0 - - 192.168.0.1:3333 - status: alive + incarnation: 0 - - 192.168.0.2:3333 - status: alive + incarnation: 0 - - 192.168.0.3:3333 - status: alive + incarnation: 0 ... swim.stop() --- -- 2.17.2 (Apple Git-113)