From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v4 10/12] [RAW] swim: introduce 'quit' message Date: Thu, 31 Jan 2019 00:28:28 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org, vdavydov.dev@gmail.com List-ID: The 'quit' message lets an instance gracefully leave the cluster, notifying all members that it is not dead, but has decided to leave. The patch also introduces the 'suspected' member status and indirect pings: a member that does not answer direct pings is first suspected and pinged via other random members, and is declared dead only after more failed pings. Part of #3234 --- src/lib/swim/swim.c | 255 +++++++++++++++++++++++++++++++++++--- src/lib/swim/swim.h | 8 ++ src/lib/swim/swim_io.c | 8 +- src/lib/swim/swim_io.h | 4 + src/lib/swim/swim_proto.c | 12 ++ src/lib/swim/swim_proto.h | 38 ++++++ src/lua/swim.c | 33 ++++- 7 files changed, 333 insertions(+), 25 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 5cec3789a..b377f154f 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -138,8 +138,14 @@ enum { */ ACK_TIMEOUT_DEFAULT = 30, /** - * If a member has not been responding to pings - * number of times, it is considered to be dead. + * If an alive member has not been responding to pings + * this number of times, it is suspected to be dead. To + * confirm the death it should fail more pings. + */ + NO_ACKS_TO_SUSPECT = 2, + /** + * If a suspected member has not been responding to pings + * this number of times, it is considered to be dead. * According to the SWIM paper, for a member it is enough * to do not respond on one direct ping, and on K * simultanous indirect pings, to be considered as dead. @@ -156,6 +162,11 @@ enum { * anti-entropy components. */ NO_ACKS_TO_GC = 2, + /** + * Number of attempts to reach a member, which did not + * answer a regular ping, via other members. + */ + INDIRECT_PING_COUNT = 2, }; /** @@ -367,6 +378,15 @@ struct swim { struct rlist queue_events; }; +/** Get a random member from a members table.
*/ +static inline struct swim_member * +swim_random_member(struct swim *swim) +{ + int rnd = swim_scaled_rand(0, mh_size(swim->members) - 1); + mh_int_t node = mh_swim_table_random(swim->members, rnd); + return *mh_swim_table_node(swim->members, node); +} + /** Reset cached round message on any change of any member. */ static inline void cached_round_msg_invalidate(struct swim *swim) @@ -374,15 +394,34 @@ cached_round_msg_invalidate(struct swim *swim) swim_packet_create(&swim->round_step_task.packet); } +/** Compare members by ping deadline, to keep ACK waiters sorted. */ +static inline int +swim_member_ping_deadline_cmp(struct swim_member *a, struct swim_member *b) +{ + double res = a->ping_deadline - b->ping_deadline; + if (res > 0) + return 1; + return res < 0 ? -1 : 0; +} + /** Put the member into a list of ACK waiters. */ static void -swim_member_wait_ack(struct swim *swim, struct swim_member *member) +swim_member_wait_ack(struct swim *swim, struct swim_member *member, + int hop_count) { if (rlist_empty(&member->in_queue_wait_ack)) { - member->ping_deadline = fiber_time() + - swim->wait_ack_tick.interval; - rlist_add_tail_entry(&swim->queue_wait_ack, member, - in_queue_wait_ack); + double timeout = swim->wait_ack_tick.interval * hop_count; + member->ping_deadline = fiber_time() + timeout; + struct swim_member *pos; + /* + * Indirect ping deadline can be later than + * deadlines of some newer direct pings, so it + * is not enough to just append a new member to + * the end of this list.
*/ + rlist_add_tail_entry_sorted(&swim->queue_wait_ack, pos, member, + in_queue_wait_ack, + swim_member_ping_deadline_cmp); } } @@ -556,7 +595,7 @@ swim_ping_task_complete(struct swim_task *task, struct swim *swim = swim_by_scheduler(scheduler); struct swim_member *m = container_of(task, struct swim_member, ping_task); - swim_member_wait_ack(swim, m); + swim_member_wait_ack(swim, m, 1); } /** @@ -845,6 +884,8 @@ swim_decrease_events_ttl(struct swim *swim) if (--member->status_ttl == 0) { rlist_del_entry(member, in_queue_events); cached_round_msg_invalidate(swim); + if (member->status == MEMBER_LEFT) + swim_member_delete(swim, member); } } } @@ -897,7 +938,7 @@ swim_round_step_complete(struct swim_task *task, * Each round message contains failure detection * section with a ping. */ - swim_member_wait_ack(swim, m); + swim_member_wait_ack(swim, m, 1); /* As well as dissemination. */ swim_decrease_events_ttl(swim); } @@ -906,12 +947,15 @@ swim_round_step_complete(struct swim_task *task, /** Schedule send of a failure detection message. */ static void swim_send_fd_request(struct swim *swim, struct swim_task *task, - const struct sockaddr_in *dst, enum swim_fd_msg_type type) + const struct sockaddr_in *dst, enum swim_fd_msg_type type, + const struct sockaddr_in *proxy) { /* * Reset packet allocator in case if task is being reused. */ swim_packet_create(&task->packet); + if (proxy != NULL) + swim_task_proxy(task, proxy); char *header = swim_packet_alloc(&task->packet, 1); int map_size = swim_encode_src_uuid(swim, &task->packet); map_size += swim_encode_failure_detection(swim, &task->packet, type); @@ -925,17 +969,82 @@ swim_send_fd_request(struct swim *swim, struct swim_task *task, /** Schedule send of an ack.
*/ static inline void swim_send_ack(struct swim *swim, struct swim_task *task, - const struct sockaddr_in *dst) + const struct sockaddr_in *dst, const struct sockaddr_in *proxy) { - swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_ACK); + swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_ACK, proxy); } /** Schedule send of a ping. */ static inline void swim_send_ping(struct swim *swim, struct swim_task *task, - const struct sockaddr_in *dst) + const struct sockaddr_in *dst, const struct sockaddr_in *proxy) { - swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_PING); + swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_PING, proxy); +} + +/** + * Indirect ping task. It is executed multiple times to send a + * ping to one member via several random proxy members. Main + * motivation of this task is to avoid creating a separate task + * for each indirect ping, and to reuse one instead. + */ +struct swim_iping_task { + /** Base structure. */ + struct swim_task base; + /** + * How many times to send. Decremented on each send and on + * 0 the task is deleted. + */ + int ttl; + /** + * Address of the pinged member. Stored by value, because + * the member can be deleted while the task is in progress. + */ + struct sockaddr_in dst; +}; + +/** Resend the ping to the same member via another proxy, or delete. */ +static void +swim_iping_task_complete(struct swim_task *base_task, + struct swim_scheduler *scheduler, int rc) +{ + (void) rc; + struct swim *swim = swim_by_scheduler(scheduler); + struct swim_iping_task *task = (struct swim_iping_task *) base_task; + if (--task->ttl == 0) { + swim_task_destroy(base_task); + free(task); + return; + } + /* + * swim_task_send() takes the final receiver, not the + * proxy. The receiver stays the same - only the proxy + * is changed. + */ + swim_task_proxy(base_task, &swim_random_member(swim)->addr); + swim_task_send(base_task, &task->dst, scheduler); +} + +/** + * Schedule a number of indirect pings of a member with the + * specified address.
*/ +static inline int +swim_send_indirect_pings(struct swim *swim, const struct sockaddr_in *dst) +{ + struct swim_iping_task *task = + (struct swim_iping_task *) malloc(sizeof(*task)); + if (task == NULL) { + diag_set(OutOfMemory, sizeof(*task), "malloc", "task"); + return -1; + } + task->ttl = INDIRECT_PING_COUNT; + task->dst = *dst; + swim_task_create(&task->base, swim_iping_task_complete, + swim_task_delete_cb); + swim_send_ping(swim, &task->base, dst, &swim_random_member(swim)->addr); + return 0; +} + +/** Schedule an indirect ACK. */ +static inline int +swim_send_indirect_ack(struct swim *swim, const struct sockaddr_in *dst, + const struct sockaddr_in *proxy) +{ + struct swim_task *task = swim_task_new(swim_task_delete_cb, + swim_task_delete_cb); + if (task == NULL) + return -1; + swim_send_ack(swim, task, dst, proxy); + return 0; } /** @@ -959,6 +1068,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events) ++m->unacknowledged_pings; switch (m->status) { case MEMBER_ALIVE: + if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT) + break; + m->status = MEMBER_SUSPECTED; + swim_member_status_is_updated(m, swim); + if (swim_send_indirect_pings(swim, &m->addr) != 0) + diag_log(); + break; + case MEMBER_SUSPECTED: if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) { m->status = MEMBER_DEAD; swim_member_status_is_updated(m, swim); @@ -969,10 +1086,12 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events) m->status_ttl == 0) swim_member_delete(swim, m); break; + case MEMBER_LEFT: + break; default: unreachable(); } - swim_send_ping(swim, &m->ping_task, &m->addr); + swim_send_ping(swim, &m->ping_task, &m->addr, NULL); rlist_del_entry(m, in_queue_wait_ack); } } @@ -1136,7 +1255,8 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) static int swim_process_failure_detection(struct swim *swim, const char **pos, const char *end, const struct sockaddr_in *src, - const struct tt_uuid *uuid) + const struct tt_uuid *uuid, +
const struct sockaddr_in *proxy) { const char *msg_pref = "invalid failure detection message:"; struct swim_failure_detection_def def; @@ -1154,7 +1274,13 @@ swim_process_failure_detection(struct swim *swim, const char **pos, switch (def.type) { case SWIM_FD_MSG_PING: - swim_send_ack(swim, &member->ack_task, &member->addr); + if (proxy == NULL) { + swim_send_ack(swim, &member->ack_task, &member->addr, + NULL); + } else if (swim_send_indirect_ack(swim, &member->addr, + proxy) != 0) { + diag_log(); + } break; case SWIM_FD_MSG_ACK: if (def.incarnation >= member->incarnation) { @@ -1197,13 +1323,43 @@ swim_process_dissemination(struct swim *swim, const char **pos, const char *end) return 0; } +/** + * Decode a quit message. Schedule dissemination, change status. + */ +static int +swim_process_quit(struct swim *swim, const char **pos, const char *end, + const struct sockaddr_in *src, const struct tt_uuid *uuid) +{ + (void) src; + const char *msg_pref = "invalid quit message:"; + uint32_t size; + if (swim_decode_map(pos, end, &size, msg_pref, "root") != 0) + return -1; + if (size != 1) { + diag_set(SwimError, "%s map of size 1 is expected", msg_pref); + return -1; + } + uint64_t tmp; + if (swim_decode_uint(pos, end, &tmp, msg_pref, "a key") != 0) + return -1; + if (tmp != SWIM_QUIT_INCARNATION) { + diag_set(SwimError, "%s a key should be incarnation", msg_pref); + return -1; + } + if (swim_decode_uint(pos, end, &tmp, msg_pref, "incarnation") != 0) + return -1; + struct swim_member *m = swim_find_member(swim, uuid); + if (m != NULL) + swim_member_update_status(m, MEMBER_LEFT, tmp, swim); + return 0; +} + /** Process a new message.
*/ static void swim_on_input(struct swim_scheduler *scheduler, const char *pos, const char *end, const struct sockaddr_in *src, const struct sockaddr_in *proxy) { - (void) proxy; const char *msg_pref = "invalid message:"; struct swim *swim = swim_by_scheduler(scheduler); struct tt_uuid uuid; @@ -1237,7 +1393,8 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos, case SWIM_FAILURE_DETECTION: say_verbose("SWIM: process failure detection"); if (swim_process_failure_detection(swim, &pos, end, - src, &uuid) != 0) + src, &uuid, + proxy) != 0) goto error; break; case SWIM_DISSEMINATION: @@ -1245,6 +1402,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos, if (swim_process_dissemination(swim, &pos, end) != 0) goto error; break; + case SWIM_QUIT: + say_verbose("SWIM: process quit"); + if (swim_process_quit(swim, &pos, end, src, &uuid) != 0) + goto error; + break; default: diag_set(SwimError, "%s unexpected key", msg_pref); goto error; @@ -1460,7 +1622,7 @@ swim_probe_member(struct swim *swim, const char *uri) swim_task_delete_cb); if (t == NULL) return -1; - swim_send_ping(swim, t, &addr); + swim_send_ping(swim, t, &addr, NULL); return 0; } @@ -1501,3 +1663,56 @@ swim_delete(struct swim *swim) } mh_swim_table_delete(swim->members); } + +/** + * The quit message is broadcast in the same way as round + * messages, step by step, with the only difference that quit + * round steps follow each other without delays. When the round + * queue is exhausted, the instance is deleted.
*/ +static void +swim_quit_step_complete(struct swim_task *task, + struct swim_scheduler *scheduler, int rc) +{ + (void) rc; + (void) task; + struct swim *swim = swim_by_scheduler(scheduler); + if (rlist_empty(&swim->queue_round)) { + swim_delete(swim); + return; + } + struct swim_member *m = + rlist_shift_entry(&swim->queue_round, struct swim_member, + in_queue_round); + swim_task_send(&swim->round_step_task, &m->addr, &swim->scheduler); +} + +void +swim_quit(struct swim *swim) +{ + if (swim->self == NULL) { + swim_delete(swim); + return; + } + ev_periodic_stop(loop(), &swim->round_tick); + ev_periodic_stop(loop(), &swim->wait_ack_tick); + swim_scheduler_stop_input(&swim->scheduler); + /* Start the last round - quitting. */ + if (swim_new_round(swim) != 0 || rlist_empty(&swim->queue_round)) { + swim_delete(swim); + return; + } + swim_task_destroy(&swim->round_step_task); + swim_task_create(&swim->round_step_task, swim_quit_step_complete, + swim_task_delete_cb); + struct swim_task *task = &swim->round_step_task; + /* + * Like any other message, the quit one starts with the + * source UUID - receivers use it to find the leaving + * member. + */ + char *header = swim_packet_alloc(&task->packet, 1); + int map_size = swim_encode_src_uuid(swim, &task->packet); + struct swim_quit_bin quit_bin; + swim_quit_bin_create(&quit_bin, swim->self->incarnation); + char *pos = swim_packet_alloc(&task->packet, sizeof(quit_bin)); + assert(pos != NULL); + memcpy(pos, &quit_bin, sizeof(quit_bin)); + mp_encode_map(header, map_size + 1); + struct swim_member *m = + rlist_shift_entry(&swim->queue_round, struct swim_member, + in_queue_round); + swim_task_send(task, &m->addr, &swim->scheduler); +} diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index dced172c0..24f3a4b33 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -97,6 +97,14 @@ swim_probe_member(struct swim *swim, const char *uri); void swim_info(struct swim *swim, struct info_handler *info); +/** + * Gracefully leave the cluster, broadcast a notification. + * Members that receive it remove this instance from their + * tables and do not consider it dead. @a swim is invalid afterwards.
*/ +void +swim_quit(struct swim *swim); + #if defined(__cplusplus) } #endif diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index a8fb1f588..e62b7126f 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -187,6 +187,12 @@ swim_scheduler_bind(struct swim_scheduler *scheduler, return 0; } +void +swim_scheduler_stop_input(struct swim_scheduler *scheduler) +{ + ev_io_stop(loop(), &scheduler->input); +} + void swim_scheduler_destroy(struct swim_scheduler *scheduler) { @@ -202,7 +208,7 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler) } swim_transport_destroy(&scheduler->transport); ev_io_stop(loop(), &scheduler->output); - ev_io_stop(loop(), &scheduler->input); + swim_scheduler_stop_input(scheduler); } static void diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index 0ba8972f0..c13b5d14f 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -178,6 +178,10 @@ int swim_scheduler_bind(struct swim_scheduler *scheduler, const struct sockaddr_in *addr); +/** Stop accepting new packets. Sending keeps working. */ +void +swim_scheduler_stop_input(struct swim_scheduler *scheduler); + /** Destroy scheduler, its queues, close the socket.
*/ void swim_scheduler_destroy(struct swim_scheduler *scheduler); diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index 8b1ed76a7..aa9edfc9d 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -36,7 +36,9 @@ const char *swim_member_status_strs[] = { "alive", + "suspected", "dead", + "left", }; const char *swim_fd_msg_type_strs[] = { @@ -581,3 +583,13 @@ swim_route_bin_create(struct swim_route_bin *route, route->m_dst_port = 0xcd; route->v_dst_port = mp_bswap_u16(dst->sin_port); } + +void +swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation) +{ + header->k_quit = SWIM_QUIT; + header->m_quit = 0x81; + header->k_incarnation = SWIM_QUIT_INCARNATION; + header->m_incarnation = 0xcf; + header->v_incarnation = mp_bswap_u64(incarnation); +} diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index fe9eb85c5..b743074e5 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -92,6 +92,12 @@ enum { * | }, | * | ... | * | ], | + * | | + * | OR/AND | + * | | + * | SWIM_QUIT: { | + * | SWIM_QUIT_INCARNATION: uint | + * | } | * | } | * +-------------------------------------------------------------+ */ @@ -99,11 +105,19 @@ enum { enum swim_member_status { /** The instance is ok, responds to requests. */ MEMBER_ALIVE = 0, + /** + * If a member has not responded to a ping, it is declared + * as suspected to be dead. After more failed pings it + * is finally declared dead. + */ + MEMBER_SUSPECTED, /** * The member is considered to be dead. It will disappear * from the membership, if it is not pinned. */ MEMBER_DEAD, + /** The member has voluntarily left the cluster. */ + MEMBER_LEFT, swim_member_status_MAX, }; @@ -157,6 +171,7 @@ enum swim_body_key { SWIM_ANTI_ENTROPY, SWIM_FAILURE_DETECTION, SWIM_DISSEMINATION, + SWIM_QUIT, }; /** @@ -589,6 +604,29 @@ swim_route_bin_create(struct swim_route_bin *route, /** }}} Meta component */ +enum swim_quit_key { + /** Incarnation, used to ignore outdated quit messages.
*/ + SWIM_QUIT_INCARNATION = 0, +}; + +/** Quit section. Describes voluntary quit from the cluster. */ +struct PACKED swim_quit_bin { + /** mp_encode_uint(SWIM_QUIT) */ + uint8_t k_quit; + /** mp_encode_map(1) */ + uint8_t m_quit; + + /** mp_encode_uint(SWIM_QUIT_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +/** Initialize quit section with the sender's @a incarnation. */ +void +swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation); + /** * Helpers to decode some values - map, array, etc with * appropriate checks. All of them set diagnostics on an error diff --git a/src/lua/swim.c b/src/lua/swim.c index a20c2dc0d..7df4e5c85 100644 --- a/src/lua/swim.c +++ b/src/lua/swim.c @@ -312,6 +312,16 @@ lua_swim_remove_member(struct lua_State *L) return 1; } +/** Nullify the SWIM instance pointer in the cdata at stack index 1. */ +static void +lua_swim_invalidate(struct lua_State *L) +{ + uint32_t ctypeid; + struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid); + assert(ctypeid == CTID_STRUCT_SWIM_PTR); + *cdata = NULL; +} + /** * Destroy and delete a SWIM instance. All its memory is freed, it * stops participating in any rounds, the socket is closed. No @@ -326,10 +336,7 @@ lua_swim_delete(struct lua_State *L) if (swim == NULL) return luaL_error(L, "Usage: swim:delete()"); swim_delete(swim); - uint32_t ctypeid; - struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid); - assert(ctypeid == CTID_STRUCT_SWIM_PTR); - *cdata = NULL; + lua_swim_invalidate(L); return 0; } @@ -379,6 +386,23 @@ lua_swim_probe_member(struct lua_State *L) return 1; } +/** + * Gracefully leave the cluster. The Lua stack should contain one + * value - a SWIM instance. After this method is called, the SWIM + * instance can not be used - it is deleted, maybe asynchronously. + * @param L Lua state.
*/ +static int +lua_swim_quit(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (swim == NULL) + return luaL_error(L, "Usage: swim:quit()"); + swim_quit(swim); + lua_swim_invalidate(L); + return 0; +} + void tarantool_lua_swim_init(struct lua_State *L) { @@ -390,6 +414,7 @@ tarantool_lua_swim_init(struct lua_State *L) {"delete", lua_swim_delete}, {"info", lua_swim_info}, {"probe_member", lua_swim_probe_member}, + {"quit", lua_swim_quit}, {NULL, NULL} }; luaL_register_module(L, "swim", lua_swim_methods); -- 2.17.2 (Apple Git-113)