From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v3 2/6] [RAW] swim: introduce failure detection component Date: Sat, 29 Dec 2018 13:14:11 +0300 Message-Id: <9b4bdddc30f9554b85c9890c0d6c70645c0c7930.1546077015.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com, kostja@tarantool.org List-ID: The failure detection component allows finding members that are already dead. Part of #3234 --- src/lib/swim/swim.c | 439 +++++++++++++++++++++++++++++++++- src/lib/swim/swim_io.c | 14 ++ src/lib/swim/swim_io.h | 16 ++ src/lib/swim/swim_transport.h | 4 +- 4 files changed, 463 insertions(+), 10 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 08c377374..c7bc11bca 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -89,6 +89,22 @@ enum { /** How often to send membership messages and pings. */ HEARTBEAT_RATE_DEFAULT = 1, + /** + * If a ping was sent, it is considered to be lost after + * this time without an ack. + */ + ACK_TIMEOUT = 1, + /** + * If a member has not been responding to pings this + * number of times, it is considered to be dead. + */ + NO_ACKS_TO_DEAD = 3, + /** + * If a not pinned member confirmed to be dead, it is + * removed from the membership after at least this number + * of failed pings. + */ + NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2, }; /** @@ -109,11 +125,17 @@ enum swim_member_status { * members table. */ MEMBER_ALIVE = 0, + /** + * The member is considered to be dead. It will disappear + * from the membership, if it is not pinned. + */ + MEMBER_DEAD, swim_member_status_MAX, }; static const char *swim_member_status_strs[] = { "alive", + "dead", }; /** @@ -138,6 +160,30 @@ struct swim_member { * Position in a queue of members in the current round.
*/ struct rlist in_queue_round; + /** + * + * Failure detection component + */ + /** + * True, if the member is configured explicitly and can + * not disappear from the membership. + */ + bool is_pinned; + /** Growing number to refute old messages. */ + uint64_t incarnation; + /** + * How many pings did not receive an ack in a row. After + * a threshold the instance is marked as dead. After more + * it is removed from the table (if not pinned). + */ + int failed_pings; + /** When the latest ping was sent to this member. */ + double ping_ts; + /** Ready at hand regular ACK task. */ + struct swim_task ack_task; + struct swim_task ping_task; + /** Position in a queue of members waiting for an ack. */ + struct rlist in_queue_wait_ack; }; /** @@ -189,6 +235,19 @@ struct swim { * anti-entropy message. */ struct swim_member **shuffled_members; + /** + * + * Failure detection component + */ + /** + * Members waiting for an ACK. On too long absence of ACK + * a member is considered to be dead and is removed. The + * list is sorted by time in ascending order (tail is + * newer, head is older). + */ + struct rlist queue_wait_ack; + /** Generator of ack checking events. */ + struct ev_periodic wait_ack_tick; }; static inline uint64_t @@ -204,8 +263,84 @@ sockaddr_in_hash(const struct sockaddr_in *a) */ enum swim_component_type { SWIM_ANTI_ENTROPY = 0, + SWIM_FAILURE_DETECTION, }; +/** {{{ Failure detection component */ + +/** Possible failure detection keys. */ +enum swim_fd_key { + /** Type of the failure detection message: ping or ack. */ + SWIM_FD_MSG_TYPE, + /** + * Incarnation of the sender. To make the member alive if + * it was considered to be dead, but ping/ack with greater + * incarnation was received from it. + */ + SWIM_FD_INCARNATION, +}; + +/** + * Failure detection message now has only two types: ping or ack. + * Indirect ping/ack are todo. 
+ */ +enum swim_fd_msg_type { + SWIM_FD_MSG_PING, + SWIM_FD_MSG_ACK, + swim_fd_msg_type_MAX, +}; + +static const char *swim_fd_msg_type_strs[] = { + "ping", + "ack", +}; + +/** SWIM failure detection MsgPack header template. */ +struct PACKED swim_fd_header_bin { + /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ + uint8_t k_header; + /** mp_encode_map(2) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ + uint8_t k_type; + /** mp_encode_uint(enum swim_fd_msg_type) */ + uint8_t v_type; + + /** mp_encode_uint(SWIM_FD_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +static inline void +swim_fd_header_bin_create(struct swim_fd_header_bin *header, + enum swim_fd_msg_type type, uint64_t incarnation) +{ + header->k_header = SWIM_FAILURE_DETECTION; + header->m_header = 0x82; + + header->k_type = SWIM_FD_MSG_TYPE; + header->v_type = type; + + header->k_incarnation = SWIM_FD_INCARNATION; + header->m_incarnation = 0xcf; + header->v_incarnation = mp_bswap_u64(incarnation); +} + +static void +swim_member_schedule_ack_wait(struct swim *swim, struct swim_member *member) +{ + if (rlist_empty(&member->in_queue_wait_ack)) { + member->ping_ts = fiber_time(); + rlist_add_tail_entry(&swim->queue_wait_ack, member, + in_queue_wait_ack); + } +} + +/** }}} Failure detection component */ + /** {{{ Anti-entropy component */ /** @@ -220,6 +355,7 @@ enum swim_member_key { */ SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, + SWIM_MEMBER_INCARNATION, swim_member_key_MAX, }; @@ -243,7 +379,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, /** SWIM member MsgPack template. 
*/ struct PACKED swim_member_bin { - /** mp_encode_map(3) */ + /** mp_encode_map(4) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -262,6 +398,12 @@ struct PACKED swim_member_bin { /** mp_encode_uint(addr.sin_port) */ uint8_t m_port; uint16_t v_port; + + /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; }; static inline void @@ -271,17 +413,20 @@ swim_member_bin_reset(struct swim_member_bin *header, header->v_status = member->status; header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr); header->v_port = mp_bswap_u16(member->addr.sin_port); + header->v_incarnation = mp_bswap_u64(member->incarnation); } static inline void swim_member_bin_create(struct swim_member_bin *header) { - header->m_header = 0x83; + header->m_header = 0x84; header->k_status = SWIM_MEMBER_STATUS; header->k_addr = SWIM_MEMBER_ADDRESS; header->m_addr = 0xce; header->k_port = SWIM_MEMBER_PORT; header->m_port = 0xcd; + header->k_incarnation = SWIM_MEMBER_INCARNATION; + header->m_incarnation = 0xcf; } /** }}} Anti-entropy component */ @@ -289,17 +434,51 @@ swim_member_bin_create(struct swim_member_bin *header) /** * SWIM message structure: * { + * SWIM_FAILURE_DETECTION: { + * SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, + * SWIM_FD_INCARNATION: uint + * }, + * + * OR/AND + * * SWIM_ANTI_ENTROPY: [ * { * SWIM_MEMBER_STATUS: uint, enum member_status, * SWIM_MEMBER_ADDRESS: uint, ip, - * SWIM_MEMBER_PORT: uint, port + * SWIM_MEMBER_PORT: uint, port, + * SWIM_MEMBER_INCARNATION: uint * }, * ... * ], * } */ +/** + * Update status and incarnation of the member if needed. Statuses + * are compared as a compound key: {incarnation, status}. So @a + * new_status can override an old one only if its incarnation is + * greater, or the same, but its status is "bigger". Statuses are + * compared by their identifier, so "alive" < "dead". 
This + * protects against the case when a member is detected as dead on one + * instance, but overridden by another instance with the same + * incarnation "alive" message. + */ +static inline void +swim_member_update_status(struct swim *swim, struct swim_member *member, + enum swim_member_status new_status, + uint64_t incarnation) +{ + (void) swim; + assert(member != swim->self); + if (member->incarnation == incarnation) { + if (member->status < new_status) + member->status = new_status; + } else if (member->incarnation < incarnation) { + member->status = new_status; + member->incarnation = incarnation; + } +} + /** * Remove the member from all queues, hashes, destroy it and free * the memory. @@ -313,6 +492,11 @@ swim_member_delete(struct swim *swim, struct swim_member *member) mh_i64ptr_del(swim->members, rc, NULL); rlist_del_entry(member, in_queue_round); + /* Failure detection component. */ + rlist_del_entry(member, in_queue_wait_ack); + swim_task_destroy(&member->ack_task); + swim_task_destroy(&member->ping_task); + free(member); } @@ -322,7 +506,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member) */ static struct swim_member * swim_member_new(struct swim *swim, const struct sockaddr_in *addr, - enum swim_member_status status) + enum swim_member_status status, uint64_t incarnation) { struct swim_member *member = (struct swim_member *) calloc(1, sizeof(*member)); @@ -343,6 +527,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr, } rlist_add_entry(&swim->queue_round, member, in_queue_round); + /* Failure detection component. */ + member->incarnation = incarnation; + rlist_create(&member->in_queue_wait_ack); + swim_task_create(&member->ack_task, &swim->scheduler, swim_task_reset); + swim_task_create(&member->ping_task, &swim->scheduler, swim_task_reset); + return member; } @@ -442,6 +632,28 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg) return 1; } +/** + * Encode failure detection component.
+ * @retval -1 Error. + * @retval 1 Success, something is encoded. + */ +static int +swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg, + enum swim_fd_msg_type type) +{ + struct swim_fd_header_bin fd_header_bin; + int size = sizeof(fd_header_bin); + struct swim_packet *packet = swim_msg_reserve(msg, size); + if (packet == NULL) + return -1; + char *pos = swim_packet_alloc(packet, size); + swim_fd_header_bin_create(&fd_header_bin, type, + swim->self->incarnation); + memcpy(pos, &fd_header_bin, size); + swim_packet_flush(packet); + return 1; +} + /** Encode SWIM components into a sequence of UDP packets. */ static int swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) @@ -453,6 +665,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) char *header = swim_packet_alloc(packet, 1); int rc, map_size = 0; + rc = swim_encode_failure_detection(swim, msg, SWIM_FD_MSG_PING); + if (rc < 0) + goto error; + map_size += rc; + rc = swim_encode_anti_entropy(swim, msg); if (rc < 0) goto error; @@ -490,10 +707,11 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events) return; } struct swim_member *m = - rlist_shift_entry(&swim->queue_round, struct swim_member, + rlist_first_entry(&swim->queue_round, struct swim_member, in_queue_round); swim_task_schedule(&swim->round_step_task, swim->transport->send_round_msg, &m->addr); + swim_member_schedule_ack_wait(swim, m); ev_periodic_stop(loop, p); } @@ -501,16 +719,84 @@ static void swim_round_step_complete(struct swim_task *task) { struct swim *swim = container_of(task, struct swim, round_step_task); + rlist_shift_entry(&swim->queue_round, struct swim_member, + in_queue_round); swim_msg_reset(&task->msg); ev_periodic_start(loop(), &swim->round_tick); } +/** Send a failure detection message. 
*/ static void +swim_schedule_fd_request(struct swim *swim, struct swim_task *task, + struct swim_member *m, enum swim_fd_msg_type type, + swim_transport_send_f send) +{ + struct swim_msg *msg = &task->msg; + int rc = swim_encode_failure_detection(swim, msg, type); + if (rc < 0) { + diag_log(); + swim_task_reset(task); + return; + } + assert(rc > 0); + say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type], + sio_strfaddr((struct sockaddr *) &m->addr, + sizeof(m->addr))); + swim_task_schedule(task, send, &m->addr); +} + +static inline void +swim_schedule_ack(struct swim *swim, struct swim_member *member) +{ + swim_schedule_fd_request(swim, &member->ack_task, member, + SWIM_FD_MSG_ACK, swim->transport->send_ack); +} + +static inline void +swim_schedule_ping(struct swim *swim, struct swim_member *member) +{ + swim_schedule_fd_request(swim, &member->ping_task, member, + SWIM_FD_MSG_PING, swim->transport->send_ping); + swim_member_schedule_ack_wait(swim, member); +} + +/** + * Check for failed pings (no ack during ACK_TIMEOUT). A failed ping + * is resent and re-tracked; only unpinned members are deleted. + */ +static void +swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events) +{ + assert((events & EV_PERIODIC) != 0); + (void) loop; + (void) events; + struct swim *swim = (struct swim *) p->data; + struct swim_member *m, *tmp; + double current_time = fiber_time(); + rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack, + tmp) { + if (current_time - m->ping_ts < ACK_TIMEOUT) + break; + ++m->failed_pings; + if (m->failed_pings >= NO_ACKS_TO_GC && + !m->is_pinned) { + swim_member_delete(swim, m); + continue; + } + if (m->failed_pings >= NO_ACKS_TO_DEAD) + m->status = MEMBER_DEAD; + rlist_del_entry(m, in_queue_wait_ack); + swim_schedule_ping(swim, m); + } +} + /** * SWIM member attributes from anti-entropy and dissemination + * messages.
*/ struct swim_member_def { struct sockaddr_in addr; + uint64_t incarnation; enum swim_member_status status; }; @@ -530,9 +816,35 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def) * members table. */ if (member == NULL) { - member = swim_member_new(swim, &def->addr, def->status); + member = swim_member_new(swim, &def->addr, def->status, + def->incarnation); if (member == NULL) diag_log(); + return; + } + struct swim_member *self = swim->self; + if (member != self) { + swim_member_update_status(swim, member, def->status, + def->incarnation); + return; + } + /* + * It is possible that other instances know a bigger + * incarnation of this instance - such thing happens when + * the instance restarts and loses its local incarnation + * number. It will be restored by receiving dissemination + * messages about self. + */ + if (self->incarnation < def->incarnation) + self->incarnation = def->incarnation; + if (def->status != MEMBER_ALIVE && + def->incarnation == self->incarnation) { + /* + * In the cluster a gossip exists that this + * instance is not alive. Refute this information + * with a bigger incarnation. + */ + self->incarnation++; } } @@ -576,6 +888,15 @@ swim_process_member_key(enum swim_member_key key, const char **pos, } def->addr.sin_port = port; break; + case SWIM_MEMBER_INCARNATION: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s member incarnation should be uint", + msg_pref); + return -1; + } + def->incarnation = mp_decode_uint(pos); + break; default: unreachable(); } @@ -627,12 +948,95 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) return 0; } +/** + * Decode a failure detection message. Schedule pings, process + * acks. 
+ */ +static int +swim_process_failure_detection(struct swim *swim, const char **pos, + const char *end, const struct sockaddr_in *src) +{ + const char *msg_pref = "Invalid SWIM failure detection message:"; + if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) { + say_error("%s root should be a map", msg_pref); + return -1; + } + uint64_t size = mp_decode_map(pos); + if (size != 2) { + say_error("%s root map should have two keys - message type "\ + "and incarnation", msg_pref); + return -1; + } + enum swim_fd_msg_type type = swim_fd_msg_type_MAX; + uint64_t incarnation = 0; + for (int i = 0; i < (int) size; ++i) { + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s a key should be uint", msg_pref); + return -1; + } + uint64_t key = mp_decode_uint(pos); + switch(key) { + case SWIM_FD_MSG_TYPE: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s message type should be uint", + msg_pref); + return -1; + } + key = mp_decode_uint(pos); + if (key >= swim_fd_msg_type_MAX) { + say_error("%s unknown message type", msg_pref); + return -1; + } + type = key; + break; + case SWIM_FD_INCARNATION: + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s incarnation should be uint", + msg_pref); + return -1; + } + incarnation = mp_decode_uint(pos); + break; + default: + say_error("%s unknown key", msg_pref); + return -1; + } + } + if (type == swim_fd_msg_type_MAX) { + say_error("%s message type should be specified", msg_pref); + return -1; + } + struct swim_member *sender = swim_find_member(swim, src); + if (sender == NULL) { + sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation); + if (sender == NULL) { + diag_log(); + return 0; + } + } else { + swim_member_update_status(swim, sender, MEMBER_ALIVE, + incarnation); + } + if (type == SWIM_FD_MSG_PING) { + swim_schedule_ack(swim, sender); + } else { + assert(type == SWIM_FD_MSG_ACK); + if (incarnation >= 
sender->incarnation) { + sender->failed_pings = 0; + rlist_del_entry(sender, in_queue_wait_ack); + } + } + return 0; +} + /** Receive and process a new message. */ static void swim_on_input(struct swim_scheduler *scheduler, const struct swim_packet *packet, const struct sockaddr_in *src) { - (void) src; const char *msg_pref = "Invalid SWIM message:"; struct swim *swim = container_of(scheduler, struct swim, scheduler); const char *pos = packet->body; @@ -655,6 +1059,12 @@ swim_on_input(struct swim_scheduler *scheduler, if (swim_process_anti_entropy(swim, &pos, end) != 0) return; break; + case SWIM_FAILURE_DETECTION: + say_verbose("SWIM: process failure detection"); + if (swim_process_failure_detection(swim, &pos, end, + src) != 0) + return; + break; default: say_error("%s unknown component type component is "\ "supported", msg_pref); @@ -726,6 +1136,12 @@ swim_new(void) swim_round_step_complete); swim->transport = &swim_udp_transport; swim_scheduler_create(&swim->scheduler, swim_on_input, swim->transport); + + /* Failure detection component. 
*/ + rlist_create(&swim->queue_wait_ack); + ev_init(&swim->wait_ack_tick, swim_check_acks); + ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL); + swim->wait_ack_tick.data = (void *) swim; return swim; } @@ -738,7 +1154,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; struct swim_member *new_self = NULL; if (swim_find_member(swim, &addr) == NULL) { - new_self = swim_member_new(swim, &addr, MEMBER_ALIVE); + new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0); if (new_self == NULL) return -1; } @@ -747,6 +1163,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } ev_periodic_start(loop(), &swim->round_tick); + ev_periodic_start(loop(), &swim->wait_ack_tick); if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0) ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL); @@ -767,9 +1184,10 @@ swim_add_member(struct swim *swim, const char *uri) return -1; struct swim_member *member = swim_find_member(swim, &addr); if (member == NULL) { - member = swim_member_new(swim, &addr, MEMBER_ALIVE); + member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0); if (member == NULL) return -1; + member->is_pinned = true; } return 0; } @@ -800,6 +1218,8 @@ swim_info(struct swim *swim, struct info_handler *info) sizeof(member->addr))); info_append_str(info, "status", swim_member_status_strs[member->status]); + info_append_int(info, "incarnation", + (int64_t) member->incarnation); info_table_end(info); } info_end(info); @@ -810,6 +1230,7 @@ swim_delete(struct swim *swim) { swim_scheduler_destroy(&swim->scheduler); ev_periodic_stop(loop(), &swim->round_tick); + ev_periodic_stop(loop(), &swim->wait_ack_tick); swim_task_destroy(&swim->round_step_task); mh_int_t node = mh_first(swim->members); while (node != mh_end(swim->members)) { diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index 8a1eca819..00a16a2bb 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -23,6 
+23,8 @@ swim_udp_recv_msg(int fd, void *buffer, size_t size, struct sockaddr *addr, struct swim_transport swim_udp_transport = { /* .send_round_msg = */ swim_udp_send_msg, + /* .send_ping = */ swim_udp_send_msg, + /* .send_ack = */ swim_udp_send_msg, /* .recv_msg = */ swim_udp_recv_msg, }; @@ -39,6 +41,18 @@ swim_packet_new(struct swim_msg *msg) return res; } +struct swim_task * +swim_task_new(struct swim_scheduler *scheduler, swim_task_f complete) +{ + struct swim_task *task = (struct swim_task *) malloc(sizeof(*task)); + if (task == NULL) { + diag_set(OutOfMemory, sizeof(*task), "malloc", "task"); + return NULL; + } + swim_task_create(task, scheduler, complete); + return task; +} + void swim_task_schedule(struct swim_task *task, swim_transport_send_f send, const struct sockaddr_in *dst) diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index fc44fd0a7..f08bd1ef3 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -279,12 +279,21 @@ swim_task_create(struct swim_task *task, struct swim_scheduler *scheduler, task->scheduler = scheduler; } +struct swim_task * +swim_task_new(struct swim_scheduler *scheduler, swim_task_f complete); + static inline bool swim_task_is_active(struct swim_task *task) { return ! rlist_empty(&task->in_queue_output); } +static inline void +swim_task_reset(struct swim_task *task) +{ + swim_msg_reset(&task->msg); +} + static inline void swim_task_destroy(struct swim_task *task) { @@ -292,6 +301,13 @@ swim_task_destroy(struct swim_task *task) swim_msg_destroy(&task->msg); } +static inline void +swim_task_delete(struct swim_task *task) +{ + swim_task_destroy(task); + free(task); +} + #if defined(__cplusplus) } #endif diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h index d629526ac..0bf4aa186 100644 --- a/src/lib/swim/swim_transport.h +++ b/src/lib/swim/swim_transport.h @@ -52,7 +52,9 @@ struct swim_transport { * are like sendto(). 
*/ swim_transport_send_f send_round_msg; - + /** Send failure detection message. */ + swim_transport_send_f send_ping; + swim_transport_send_f send_ack; /** * Receive a message. Not necessary round or failure * detection. Before message is received, its type is -- 2.17.2 (Apple Git-113)