From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v2 2/6] [RAW] swim: introduce failure detection component Date: Tue, 25 Dec 2018 22:19:25 +0300 Message-Id: <4c60b22dd4354b56d719d66dd4b928075dea7cba.1545765055.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com, kostja@tarantool.org List-ID: The failure detection component allows finding members that are already dead. Part of #3234 --- src/lib/swim/swim.c | 457 +++++++++++++++++++++++++++++++++++++++++++- src/lib/swim/swim.h | 9 + 2 files changed, 459 insertions(+), 7 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 989d83a22..22bc06a60 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -99,6 +99,22 @@ enum { * networks by their admins. */ UDP_PACKET_SIZE = 1472, + /** + * If a ping was sent, it is considered to be lost after + * this time without an ack. + */ + ACK_TIMEOUT = 1, + /** + * If a member has not been responding to pings this + * number of times, it is considered to be dead. + */ + NO_ACKS_TO_DEAD = 3, + /** + * If a not pinned member confirmed to be dead, it is + * removed from the membership after at least this number + * of failed pings. + */ + NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2, }; static ssize_t @@ -124,6 +140,7 @@ swim_udp_recv_msg(int fd, void *buffer, size_t size, struct sockaddr *addr, /** UDP sendto/recvfrom implementation of swim_transport. */ static struct swim_transport swim_udp_transport = { /* .send_round_msg = */ swim_udp_send_msg, + /* .send_failure_detection_msg = */ swim_udp_send_msg, /* .recv_msg = */ swim_udp_recv_msg, }; @@ -156,6 +173,12 @@ swim_io_task_create(struct swim_io_task *task, swim_io_task_f cb, rlist_create(&task->in_queue_output); } +static inline void +swim_io_task_destroy(struct swim_io_task *task) +{ + rlist_del_entry(task, in_queue_output); +} + /** * UDP body size is limited by definition.
To be able to send a * big message it should be split into multiple packets. Each @@ -287,11 +310,17 @@ enum swim_member_status { * members table. */ MEMBER_ALIVE = 0, + /** + * The member is considered to be dead. It will disappear + * from the membership, if it is not pinned. + */ + MEMBER_DEAD, swim_member_status_MAX, }; static const char *swim_member_status_strs[] = { "alive", + "dead", }; /** @@ -316,6 +345,38 @@ struct swim_member { * Position in a queue of members in the current round. */ struct rlist in_queue_round; + /** + * + * Failure detection component + */ + /** + * True, if the member is configured explicitly and can + * not disappear from the membership. + */ + bool is_pinned; + /** Growing number to refute old messages. */ + uint64_t incarnation; + /** + * How many pings did not receive an ack in a row. After + * a threshold the instance is marked as dead. After more + * it is removed from the table (if not pinned). + */ + int failed_pings; + /** When the last ping was sent. */ + double ping_ts; + /** + * Ready at hand ack task to send when this member has + * sent ping to us. + */ + struct swim_io_task ack_task; + /** + * Ready at hand ping task to send when this member too + * long does not respond to an initial ping, piggybacked + * with members table. + */ + struct swim_io_task ping_task; + /** Position in a queue of members waiting for an ack. */ + struct rlist in_queue_wait_ack; }; /** @@ -381,6 +442,15 @@ struct swim { int shuffled_members_size; /** Queue of output tasks ready to write now. */ struct rlist queue_output; + /** + * Members waiting for an ACK. On too long absence of ACK + * a member is considered to be dead and is removed. The + * list is sorted by time in ascending order (tail is + * newer, head is older). + */ + struct rlist queue_wait_ack; + /** Generator of ack checking events. 
*/ + struct ev_periodic wait_ack_tick; }; static inline uint64_t @@ -396,8 +466,84 @@ sockaddr_in_hash(const struct sockaddr_in *a) */ enum swim_component_type { SWIM_ANTI_ENTROPY = 0, + SWIM_FAILURE_DETECTION, }; +/** {{{ Failure detection component */ + +/** Possible failure detection keys. */ +enum swim_fd_key { + /** Type of the failure detection message: ping or ack. */ + SWIM_FD_MSG_TYPE, + /** + * Incarnation of the sender. To make the member alive if + * it was considered to be dead, but ping/ack with greater + * incarnation was received from it. + */ + SWIM_FD_INCARNATION, +}; + +/** + * Failure detection message now has only two types: ping or ack. + * Indirect ping/ack are todo. + */ +enum swim_fd_msg_type { + SWIM_FD_MSG_PING, + SWIM_FD_MSG_ACK, + swim_fd_msg_type_MAX, +}; + +static const char *swim_fd_msg_type_strs[] = { + "ping", + "ack", +}; + +/** SWIM failure detection MsgPack header template. */ +struct PACKED swim_fd_header_bin { + /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ + uint8_t k_header; + /** mp_encode_map(2) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ + uint8_t k_type; + /** mp_encode_uint(enum swim_fd_msg_type) */ + uint8_t v_type; + + /** mp_encode_uint(SWIM_FD_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +static inline void +swim_fd_header_bin_create(struct swim_fd_header_bin *header, + enum swim_fd_msg_type type, uint64_t incarnation) +{ + header->k_header = SWIM_FAILURE_DETECTION; + header->m_header = 0x82; + + header->k_type = SWIM_FD_MSG_TYPE; + header->v_type = type; + + header->k_incarnation = SWIM_FD_INCARNATION; + header->m_incarnation = 0xcf; + header->v_incarnation = mp_bswap_u64(incarnation); +} + +static void +swim_member_schedule_ack_wait(struct swim *swim, struct swim_member *member) +{ + if (rlist_empty(&member->in_queue_wait_ack)) { + member->ping_ts = fiber_time(); + 
rlist_add_tail_entry(&swim->queue_wait_ack, member, + in_queue_wait_ack); + } +} + +/** }}} Failure detection component */ + /** {{{ Anti-entropy component */ /** @@ -412,6 +558,7 @@ enum swim_member_key { */ SWIM_MEMBER_ADDR, SWIM_MEMBER_PORT, + SWIM_MEMBER_INCARNATION, swim_member_key_MAX, }; @@ -435,7 +582,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, /** SWIM member MsgPack template. */ struct PACKED swim_member_bin { - /** mp_encode_map(3) */ + /** mp_encode_map(4) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -454,6 +601,12 @@ struct PACKED swim_member_bin { /** mp_encode_uint(addr.sin_port) */ uint8_t m_port; uint16_t v_port; + + /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; }; static inline void @@ -463,17 +616,20 @@ swim_member_bin_reset(struct swim_member_bin *header, header->v_status = member->status; header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr); header->v_port = mp_bswap_u16(member->addr.sin_port); + header->v_incarnation = mp_bswap_u64(member->incarnation); } static inline void swim_member_bin_create(struct swim_member_bin *header) { - header->m_header = 0x83; + header->m_header = 0x84; header->k_status = SWIM_MEMBER_STATUS; header->k_addr = SWIM_MEMBER_ADDR; header->m_addr = 0xce; header->k_port = SWIM_MEMBER_PORT; header->m_port = 0xcd; + header->k_incarnation = SWIM_MEMBER_INCARNATION; + header->m_incarnation = 0xcf; } /** }}} Anti-entropy component */ @@ -481,11 +637,19 @@ swim_member_bin_create(struct swim_member_bin *header) /** * SWIM message structure: * { + * SWIM_FAILURE_DETECTION: { + * SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, + * SWIM_FD_INCARNATION: uint + * }, + * + * OR/AND + * * SWIM_ANTI_ENTROPY: [ * { * SWIM_MEMBER_STATUS: uint, enum member_status, * SWIM_MEMBER_ADDR: uint, ip, - * SWIM_MEMBER_PORT: uint, port + * SWIM_MEMBER_PORT: uint, 
port, + * SWIM_MEMBER_INCARNATION: uint * }, * ... * ], @@ -499,13 +663,45 @@ swim_io_task_push(struct swim_io_task *task) ev_io_start(loop(), &task->swim->output); } +/** + * Update status of the member if needed. Statuses are compared as + * a compound key: {incarnation, status}. So @a new_status can + * override an old one only if its incarnation is greater, or the + * same, but its status is "bigger". Statuses are compared by + * their identifier, so "alive" < "dead". This protects from the + * case when a member is detected as dead on one instance, but + * overriden by another instance with the same incarnation "alive" + * message. + */ +static inline void +swim_member_update_status(struct swim *swim, struct swim_member *member, + enum swim_member_status new_status, + uint64_t incarnation) +{ + (void) swim; + assert(member != swim->self); + if (member->incarnation == incarnation) { + if (member->status < new_status) + member->status = new_status; + } else if (member->incarnation < incarnation) { + member->status = new_status; + member->incarnation = incarnation; + } +} + +static void +swim_send_ack(struct swim_io_task *task); + +static void +swim_send_ping(struct swim_io_task *task); + /** * Register a new member with a specified status. Here it is * added to the hash, to the 'next' queue. 
*/ static struct swim_member * swim_member_new(struct swim *swim, const struct sockaddr_in *addr, - enum swim_member_status status) + enum swim_member_status status, uint64_t incarnation) { struct swim_member *member = (struct swim_member *) calloc(1, sizeof(*member)); @@ -515,6 +711,7 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr, } member->status = status; member->addr = *addr; + member->incarnation = incarnation; struct mh_i64ptr_node_t node; node.key = sockaddr_in_hash(addr); node.val = member; @@ -524,7 +721,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr, diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node"); return NULL; } + swim_io_task_create(&member->ack_task, swim_send_ack, swim); + swim_io_task_create(&member->ping_task, swim_send_ping, swim); rlist_add_entry(&swim->queue_round, member, in_queue_round); + rlist_create(&member->in_queue_wait_ack); return member; } @@ -549,7 +749,10 @@ swim_member_delete(struct swim *swim, struct swim_member *member) mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL); assert(rc != mh_end(swim->members)); mh_i64ptr_del(swim->members, rc, NULL); + swim_io_task_destroy(&member->ack_task); + swim_io_task_destroy(&member->ping_task); rlist_del_entry(member, in_queue_round); + rlist_del_entry(member, in_queue_wait_ack); free(member); } @@ -648,6 +851,27 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg) return i; } +/** + * Encode failure detection component. + * @retval Number of encoded messages. 
+ */ +static int +swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg, + enum swim_fd_msg_type type) +{ + struct swim_fd_header_bin fd_header_bin; + int size = sizeof(fd_header_bin); + struct swim_msg_part *part = swim_msg_reserve(msg, size); + if (part == NULL) + return -1; + char *pos = swim_msg_part_pos(part); + swim_fd_header_bin_create(&fd_header_bin, type, + swim->self->incarnation); + memcpy(pos, &fd_header_bin, size); + swim_msg_part_advance(part, size); + return 1; +} + /** Encode SWIM components into a sequence of UDP packets. */ static int swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) @@ -660,6 +884,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) int rc, map_size = 0; swim_msg_part_advance(part, 1); + rc = swim_encode_failure_detection(swim, msg, SWIM_FD_MSG_PING); + if (rc < 0) + goto error; + map_size += rc > 0; + rc = swim_encode_anti_entropy(swim, msg); if (rc < 0) goto error; @@ -713,11 +942,58 @@ swim_send_round_msg(struct swim_io_task *task) diag_log(); } swim_msg_destroy(&msg); + swim_member_schedule_ack_wait(swim, m); rlist_del_entry(m, in_queue_round); next_round_step: ev_periodic_start(loop(), &swim->round_tick); } +/** Send a failure detection message. 
*/ +static void +swim_send_fd_message(struct swim *swim, struct swim_member *m, + enum swim_fd_msg_type type) +{ + struct swim_msg msg; + swim_msg_create(&msg); + int rc = swim_encode_failure_detection(swim, &msg, type); + if (rc < 0) { + diag_log(); + swim_msg_destroy(&msg); + return; + } + assert(rc > 0); + struct swim_msg_part *part = swim_msg_first_part(&msg); + struct sockaddr *addr = (struct sockaddr *) &m->addr; + assert(swim_msg_part_is_last(part)); + say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type], + sio_strfaddr(addr, sizeof(m->addr))); + if (swim->transport.send_failure_detection_msg(swim->output.fd, + part->body, part->size, + addr, + sizeof(m->addr)) == -1) + diag_log(); + swim_msg_destroy(&msg); +} + +static void +swim_send_ack(struct swim_io_task *task) +{ + assert(task->cb == swim_send_ack); + struct swim_member *m = container_of(task, struct swim_member, + ack_task); + swim_send_fd_message(task->swim, m, SWIM_FD_MSG_ACK); +} + +static void +swim_send_ping(struct swim_io_task *task) +{ + assert(task->cb == swim_send_ping); + struct swim_member *m = container_of(task, struct swim_member, + ping_task); + swim_send_fd_message(task->swim, m, SWIM_FD_MSG_PING); + swim_member_schedule_ack_wait(task->swim, m); +} + static void swim_on_output(struct ev_loop *loop, struct ev_io *io, int events) { @@ -745,12 +1021,43 @@ swim_trigger_round_step(struct ev_loop *loop, struct ev_periodic *p, int events) ev_periodic_stop(loop, p); } +/** + * Check for failed pings. A ping is failed if an ack was not + * received during ACK_TIMEOUT. A failed ping is resent here. 
+ */ +static void +swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events) +{ + assert((events & EV_PERIODIC) != 0); + (void) loop; + (void) events; + struct swim *swim = (struct swim *) p->data; + struct swim_member *m, *tmp; + double current_time = fiber_time(); + rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack, + tmp) { + if (current_time - m->ping_ts < ACK_TIMEOUT) + break; + ++m->failed_pings; + /* Pinned members are never removed, keep pinging them. */ + if (m->failed_pings >= NO_ACKS_TO_GC && !m->is_pinned) { + swim_member_delete(swim, m); + continue; + } + if (m->failed_pings >= NO_ACKS_TO_DEAD) + m->status = MEMBER_DEAD; + swim_io_task_push(&m->ping_task); + rlist_del_entry(m, in_queue_wait_ack); + } +} + /** * SWIM member attributes from anti-entropy and dissemination * messages. */ struct swim_member_def { struct sockaddr_in addr; + uint64_t incarnation; enum swim_member_status status; }; @@ -760,6 +1067,7 @@ swim_member_def_create(struct swim_member_def *def) def->addr.sin_port = 0; def->addr.sin_addr.s_addr = 0; def->status = MEMBER_ALIVE; + def->incarnation = 0; } static void @@ -771,9 +1079,35 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def) * members table. */ if (member == NULL) { - member = swim_member_new(swim, &def->addr, def->status); + member = swim_member_new(swim, &def->addr, def->status, + def->incarnation); if (member == NULL) diag_log(); + return; + } + struct swim_member *self = swim->self; + if (member != self) { + swim_member_update_status(swim, member, def->status, + def->incarnation); + return; + } + /* + * It is possible that other instances know a bigger + * incarnation of this instance - such thing happens when + * the instance restarts and loses its local incarnation + * number. It will be restored by receiving dissemination + * messages about self.
+ */ + if (self->incarnation < def->incarnation) + self->incarnation = def->incarnation; + if (def->status != MEMBER_ALIVE && + def->incarnation == self->incarnation) { + /* + * In the cluster a gossip exists that this + * instance is not alive. Refute this information + * with a bigger incarnation. + */ + self->incarnation++; } } @@ -817,6 +1151,15 @@ swim_process_member_key(enum swim_member_key key, const char **pos, } def->addr.sin_port = port; break; + case SWIM_MEMBER_INCARNATION: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s member incarnation should be uint", + msg_pref); + return -1; + } + def->incarnation = mp_decode_uint(pos); + break; default: unreachable(); } @@ -868,6 +1211,91 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) return 0; } +/** + * Decode a failure detection message. Schedule pings, process + * acks. + */ +static int +swim_process_failure_detection(struct swim *swim, const char **pos, + const char *end, const struct sockaddr_in *src) +{ + const char *msg_pref = "Invalid SWIM failure detection message:"; + if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) { + say_error("%s root should be a map", msg_pref); + return -1; + } + uint64_t size = mp_decode_map(pos); + if (size != 2) { + say_error("%s root map should have two keys - message type "\ + "and incarnation", msg_pref); + return -1; + } + enum swim_fd_msg_type type = swim_fd_msg_type_MAX; + uint64_t incarnation = 0; + for (int i = 0; i < (int) size; ++i) { + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s a key should be uint", msg_pref); + return -1; + } + uint64_t key = mp_decode_uint(pos); + switch(key) { + case SWIM_FD_MSG_TYPE: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s message type should be uint", + msg_pref); + return -1; + } + key = mp_decode_uint(pos); + if (key >= swim_fd_msg_type_MAX) { + say_error("%s 
unknown message type", msg_pref); + return -1; + } + type = key; + break; + case SWIM_FD_INCARNATION: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s incarnation should be uint", + msg_pref); + return -1; + } + incarnation = mp_decode_uint(pos); + break; + default: + say_error("%s unknown key", msg_pref); + return -1; + } + } + if (type == swim_fd_msg_type_MAX) { + say_error("%s message type should be specified", msg_pref); + return -1; + } + struct swim_member *sender = swim_find_member(swim, src); + if (sender == NULL) { + sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation); + if (sender == NULL) { + diag_log(); + return 0; + } + } else { + swim_member_update_status(swim, sender, MEMBER_ALIVE, + incarnation); + } + if (type == SWIM_FD_MSG_PING) { + swim_io_task_push(&sender->ack_task); + } else { + assert(type == SWIM_FD_MSG_ACK); + if (incarnation >= sender->incarnation) { + sender->failed_pings = 0; + rlist_del_entry(&sender->ping_task, in_queue_output); + rlist_del_entry(sender, in_queue_wait_ack); + } + } + return 0; +} + /** Receive and process a new message. 
*/ static void swim_on_input(struct ev_loop *loop, struct ev_io *io, int events) @@ -910,6 +1338,12 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events) if (swim_process_anti_entropy(swim, &pos, end) != 0) return; break; + case SWIM_FAILURE_DETECTION: + say_verbose("SWIM: process failure detection"); + if (swim_process_failure_detection(swim, &pos, end, + &addr) != 0) + return; + break; default: say_error("%s unknown component type component is "\ "supported", msg_pref); @@ -984,6 +1418,10 @@ swim_new(void) swim->output.data = (void *) swim; swim->transport = swim_udp_transport; rlist_create(&swim->queue_output); + rlist_create(&swim->queue_wait_ack); + ev_init(&swim->wait_ack_tick, swim_check_acks); + ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL); + swim->wait_ack_tick.data = (void *) swim; return swim; } @@ -1024,7 +1462,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, new_self = swim_find_member(swim, &addr); if (new_self == NULL) { new_self = swim_member_new(swim, &addr, - MEMBER_ALIVE); + MEMBER_ALIVE, 0); if (new_self == NULL) { close(fd); return -1; @@ -1034,6 +1472,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, ev_io_set(&swim->input, fd, EV_READ); ev_io_set(&swim->output, fd, EV_WRITE); ev_periodic_start(loop(), &swim->round_tick); + ev_periodic_start(loop(), &swim->wait_ack_tick); } } @@ -1054,9 +1493,10 @@ swim_add_member(struct swim *swim, const char *uri) return -1; struct swim_member *member = swim_find_member(swim, &addr); if (member == NULL) { - member = swim_member_new(swim, &addr, MEMBER_ALIVE); + member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0); if (member == NULL) return -1; + member->is_pinned = true; } return 0; } @@ -1087,6 +1527,8 @@ swim_info(struct swim *swim, struct info_handler *info) sizeof(member->addr))); info_append_str(info, "status", swim_member_status_strs[member->status]); + info_append_int(info, "incarnation", + (int64_t) member->incarnation); 
info_table_end(info); } info_end(info); @@ -1099,6 +1541,7 @@ swim_delete(struct swim *swim) ev_io_stop(loop(), &swim->output); ev_io_stop(loop(), &swim->input); ev_periodic_stop(loop(), &swim->round_tick); + ev_periodic_stop(loop(), &swim->wait_ack_tick); mh_int_t node = mh_first(swim->members); while (node != mh_end(swim->members)) { struct swim_member *m = (struct swim_member *) diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 77e16ed53..51f0c144d 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -54,6 +54,15 @@ struct swim_transport { (*send_round_msg)(int fd, const void *data, size_t size, const struct sockaddr *addr, socklen_t addr_size); + /** + * Send failure detection message. Contains ping, ack. + * Parameters are like sendto(). + */ + ssize_t + (*send_failure_detection_msg)(int fd, const void *data, size_t size, + const struct sockaddr *addr, + socklen_t addr_size); + /** * Receive a message. Not necessary round or failure * detection. Before message is received, its type is -- 2.17.2 (Apple Git-113)