From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v4 05/12] [RAW] swim: introduce failure detection component Date: Thu, 31 Jan 2019 00:28:34 +0300 Message-Id: <615765b043ab1b334d43d1a1f263e95c5e21519e.1548883137.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org, vdavydov.dev@gmail.com List-ID: The failure detection component allows finding out which members are already dead. Part of #3234 --- src/lib/swim/swim.c | 405 ++++++++++++++++++++++++++++++++++++-- src/lib/swim/swim.h | 11 +- src/lib/swim/swim_io.c | 21 ++ src/lib/swim/swim_io.h | 9 + src/lib/swim/swim_proto.c | 82 +++++++- src/lib/swim/swim_proto.h | 101 +++++++++- src/lua/swim.c | 34 +++- 7 files changed, 641 insertions(+), 22 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index adf01cfad..a862f52a4 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -131,6 +131,31 @@ enum { * value. */ HEARTBEAT_RATE_DEFAULT = 1, + /** + * If a ping was sent, it is considered to be lost after + * this time without an ack. Nothing special in this + * value. + */ + ACK_TIMEOUT_DEFAULT = 30, + /** + * If a member has not been responding to pings this + * number of times, it is considered to be dead. + * According to the SWIM paper, it is enough for a member + * not to respond to one direct ping and to K + * simultaneous indirect pings to be considered dead. + * Seems too little, so here it is bigger. + */ + NO_ACKS_TO_DEAD = 3, + /** + * If a member is confirmed to be dead, it is removed from + * the membership after at least this number of + * unacknowledged pings. According to the SWIM paper, a + * dead member is deleted immediately. But here it is held + * for a while to 1) maybe refute its dead status, 2) + * disseminate the status via dissemination and + * anti-entropy components.
+ */ + NO_ACKS_TO_GC = 2, }; /** @@ -193,6 +218,31 @@ struct swim_member { * Position in a queue of members in the current round. */ struct rlist in_queue_round; + /** + * + * Failure detection component + */ + /** Growing number to refute old messages. */ + uint64_t incarnation; + /** + * How many pings in a row did not receive an ack while in + * the current status. After a threshold the instance is + * marked as dead. After more it is removed from the + * table. On each status, incarnation, or address change + * this counter is reset. + */ + int unacknowledged_pings; + /** + * When the latest ping is considered to be + * unacknowledged. + */ + double ping_deadline; + /** Preallocated task reused for sending ACKs. */ + struct swim_task ack_task; + /** Preallocated task reused for sending PINGs. */ + struct swim_task ping_task; + /** Position in a queue of members waiting for an ack. */ + struct rlist in_queue_wait_ack; }; #define mh_name _swim_table @@ -250,8 +300,77 @@ struct swim { * ones. */ struct swim_scheduler scheduler; + /** + * + * Failure detection component + */ + /** + * Members waiting for an ACK. If ACKs are absent for too + * long, a member is considered to be dead and is removed. + * The list is sorted by deadline in ascending order (tail + * is newer, head is older). + */ + struct rlist queue_wait_ack; + /** Generator of ack checking events. */ + struct ev_periodic wait_ack_tick; }; +/** Put the member into a list of ACK waiters. */ +static void +swim_member_wait_ack(struct swim *swim, struct swim_member *member) +{ + if (rlist_empty(&member->in_queue_wait_ack)) { + member->ping_deadline = fiber_time() + + swim->wait_ack_tick.interval; + rlist_add_tail_entry(&swim->queue_wait_ack, member, + in_queue_wait_ack); + } +} + +/** + * Do all the actions needed to process a member's update, like a + * change of its status, or incarnation, or both.
+ */ +static void +swim_member_status_is_updated(struct swim_member *member) +{ + member->unacknowledged_pings = 0; +} + +/** + * Update status and incarnation of the member if needed. Statuses + * are compared as a compound key: {incarnation, status}. So @a + * new_status can override an old one only if its incarnation is + * greater, or the same, but its status is "bigger". Statuses are + * compared by their identifier, so "alive" < "dead". This + * protects from the case when a member is detected as dead on one + * instance, but overridden by another instance with the same + * incarnation "alive" message. + */ +static inline void +swim_member_update_status(struct swim_member *member, + enum swim_member_status new_status, + uint64_t incarnation, struct swim *swim) +{ + (void) swim; + /* + * Source of truth about self is this instance and it is + * never updated from remote. Refutation is handled + * separately. + */ + assert(member != swim->self); + if (member->incarnation == incarnation) { + if (member->status < new_status) { + member->status = new_status; + swim_member_status_is_updated(member); + } + } else if (member->incarnation < incarnation) { + member->status = new_status; + member->incarnation = incarnation; + swim_member_status_is_updated(member); + } +} + /** * A helper to get a pointer to a SWIM instance having only a * pointer to it scheduler. It is used by task complete functions. @@ -276,6 +395,11 @@ swim_member_delete(struct swim *swim, struct swim_member *member) mh_swim_table_del(swim->members, rc, NULL); rlist_del_entry(member, in_queue_round); + /* Failure detection component. */ + rlist_del_entry(member, in_queue_wait_ack); + swim_task_destroy(&member->ack_task); + swim_task_destroy(&member->ping_task); + free(member); } @@ -290,13 +414,34 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid) return *mh_swim_table_node(swim->members, node); } +/** + * Once a ping is sent, the member should start waiting for an + * ACK.
+ */ +static void +swim_ping_task_complete(struct swim_task *task, + struct swim_scheduler *scheduler, int rc) +{ + /* + * If a ping send has failed, it makes no sense to wait for + * an ACK. + */ + if (rc != 0) + return; + struct swim *swim = swim_by_scheduler(scheduler); + struct swim_member *m = container_of(task, struct swim_member, + ping_task); + swim_member_wait_ack(swim, m); +} + /** * Register a new member with a specified status. Here it is * added to the hash, to the 'next' queue. */ static struct swim_member * swim_member_new(struct swim *swim, const struct sockaddr_in *addr, - const struct tt_uuid *uuid, enum swim_member_status status) + const struct tt_uuid *uuid, enum swim_member_status status, + uint64_t incarnation) { struct swim_member *member = (struct swim_member *) calloc(1, sizeof(*member)); @@ -318,6 +463,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr, } rlist_add_entry(&swim->queue_round, member, in_queue_round); + /* Failure detection component. */ + member->incarnation = incarnation; + rlist_create(&member->in_queue_wait_ack); + swim_task_create(&member->ack_task, NULL, NULL); + swim_task_create(&member->ping_task, swim_ping_task_complete, NULL); + say_verbose("SWIM: member %s is added", swim_uuid_str(uuid)); return member; } @@ -402,7 +553,7 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet) break; size = new_size; swim_member_bin_fill(&member_bin, &m->addr, &m->uuid, - m->status); + m->status, m->incarnation); memcpy(pos, &member_bin, sizeof(member_bin)); /* * First random member could be choosen too close @@ -439,6 +590,26 @@ swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet) return 1; } +/** + * Encode failure detection component. + * @retval 0 Not error, but nothing is encoded. + * @retval 1 Something is encoded.
+ */ +static int +swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet, + enum swim_fd_msg_type type) +{ + struct swim_fd_header_bin fd_header_bin; + int size = sizeof(fd_header_bin); + char *pos = swim_packet_alloc(packet, size); + if (pos == NULL) + return 0; + swim_fd_header_bin_create(&fd_header_bin, type, + swim->self->incarnation); + memcpy(pos, &fd_header_bin, size); + return 1; +} + /** Encode SWIM components into a UDP packet. */ static void swim_encode_round_msg(struct swim *swim, struct swim_packet *packet) @@ -447,9 +618,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet) char *header = swim_packet_alloc(packet, 1); int map_size = 0; map_size += swim_encode_src_uuid(swim, packet); + map_size += swim_encode_failure_detection(swim, packet, + SWIM_FD_MSG_PING); map_size += swim_encode_anti_entropy(swim, packet); - assert(mp_sizeof_map(map_size) == 1 && map_size == 2); + assert(mp_sizeof_map(map_size) == 1 && map_size >= 2); mp_encode_map(header, map_size); } @@ -494,8 +667,90 @@ swim_round_step_complete(struct swim_task *task, (void) task; struct swim *swim = swim_by_scheduler(scheduler); ev_periodic_start(loop(), &swim->round_tick); - rlist_shift_entry(&swim->queue_round, struct swim_member, - in_queue_round); + struct swim_member *m = + rlist_shift_entry(&swim->queue_round, struct swim_member, + in_queue_round); + if (rc == 0) { + /* + * Each round message contains a failure detection + * section with a ping. + */ + swim_member_wait_ack(swim, m); + } +} + +/** Schedule send of a failure detection message. */ +static void +swim_send_fd_request(struct swim *swim, struct swim_task *task, + const struct sockaddr_in *dst, enum swim_fd_msg_type type) +{ + /* + * Reset the packet allocator in case the task is being reused.
+ */ + swim_packet_create(&task->packet); + char *header = swim_packet_alloc(&task->packet, 1); + int map_size = swim_encode_src_uuid(swim, &task->packet); + map_size += swim_encode_failure_detection(swim, &task->packet, type); + assert(map_size == 2); + mp_encode_map(header, map_size); + say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type], + sio_strfaddr((struct sockaddr *) dst, sizeof(*dst))); + swim_task_send(task, dst, &swim->scheduler); +} + +/** Schedule send of an ack. */ +static inline void +swim_send_ack(struct swim *swim, struct swim_task *task, + const struct sockaddr_in *dst) +{ + swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_ACK); +} + +/** Schedule send of a ping. */ +static inline void +swim_send_ping(struct swim *swim, struct swim_task *task, + const struct sockaddr_in *dst) +{ + swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_PING); +} + +/** + * Check for unacknowledged pings. A ping is unacknowledged if an + * ack was not received during ack timeout. An unacknowledged ping + * is resent here, unless its member has just been deleted.
+ */ +static void +swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events) +{ + assert((events & EV_PERIODIC) != 0); + (void) loop; + (void) events; + struct swim *swim = (struct swim *) p->data; + struct swim_member *m, *tmp; + double current_time = fiber_time(); + rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack, + tmp) { + if (current_time < m->ping_deadline) + break; + ++m->unacknowledged_pings; + switch (m->status) { + case MEMBER_ALIVE: + if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) { + m->status = MEMBER_DEAD; + swim_member_status_is_updated(m); + } + break; + case MEMBER_DEAD: + if (m->unacknowledged_pings < NO_ACKS_TO_GC) + break; + swim_member_delete(swim, m); + continue; + default: + unreachable(); + } + swim_send_ping(swim, &m->ping_task, &m->addr); + rlist_del_entry(m, in_queue_wait_ack); + } } /** @@ -536,7 +791,11 @@ static inline void swim_member_update_addr(struct swim_member *member, const struct sockaddr_in *addr) { - member->addr = *addr; + if (addr->sin_port != member->addr.sin_port || + addr->sin_addr.s_addr != member->addr.sin_addr.s_addr) { + member->addr = *addr; + swim_member_status_is_updated(member); + } } /** @@ -550,11 +809,50 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def) { struct swim_member *member = swim_find_member(swim, &def->uuid); if (member == NULL) { + if (def->status == MEMBER_DEAD) { + /* + * Do not 'resurrect' dead members to + * prevent 'ghost' members. A ghost member + * is one declared as dead, sent via + * anti-entropy, and removed from local + * members table, but then returned back + * from received anti-entropy, as again + * dead. Such dead members could 'live' + * forever.
+ */ + return NULL; + } member = swim_member_new(swim, &def->addr, &def->uuid, - def->status); + def->status, def->incarnation); return member; } - swim_member_update_addr(member, &def->addr); + struct swim_member *self = swim->self; + if (member != self) { + if (def->incarnation >= member->incarnation) { + swim_member_update_addr(member, &def->addr); + swim_member_update_status(member, def->status, + def->incarnation, swim); + } + return member; + } + /* + * It is possible that other instances know a bigger + * incarnation of this instance - such a thing happens when + * the instance restarts and loses its local incarnation + * number. It will be restored by receiving dissemination + * and anti-entropy messages about self. + */ + if (self->incarnation < def->incarnation) + self->incarnation = def->incarnation; + if (def->status != MEMBER_ALIVE && + def->incarnation == self->incarnation) { + /* + * In the cluster a gossip exists that this + * instance is not alive. Refute this information + * with a bigger incarnation. + */ + self->incarnation++; + } return member; } @@ -581,12 +879,53 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) return 0; } +/** + * Decode a failure detection message. Schedule acks, process + * acks.
+ */ +static int +swim_process_failure_detection(struct swim *swim, const char **pos, + const char *end, const struct sockaddr_in *src, + const struct tt_uuid *uuid) +{ + const char *msg_pref = "invalid failure detection message:"; + struct swim_failure_detection_def def; + struct swim_member_def mdef; + if (swim_failure_detection_def_decode(&def, pos, end, msg_pref) != 0) + return -1; + swim_member_def_create(&mdef); + mdef.addr = *src; + mdef.incarnation = def.incarnation; + mdef.uuid = *uuid; + struct swim_member *member = swim_update_member(swim, &mdef); + if (member == NULL) + return -1; + + switch (def.type) { + case SWIM_FD_MSG_PING: + swim_send_ack(swim, &member->ack_task, &member->addr); + break; + case SWIM_FD_MSG_ACK: + if (def.incarnation >= member->incarnation) { + /* + * The member has answered, so the + * row of missed acks is broken. + */ + member->unacknowledged_pings = 0; + rlist_del_entry(member, in_queue_wait_ack); + } + break; + default: + unreachable(); + } + return 0; +} + /** Process a new message. */ static void swim_on_input(struct swim_scheduler *scheduler, const char *pos, const char *end, const struct sockaddr_in *src) { - (void) src; const char *msg_pref = "invalid message:"; struct swim *swim = swim_by_scheduler(scheduler); struct tt_uuid uuid; @@ -617,6 +956,12 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos, if (swim_process_anti_entropy(swim, &pos, end) != 0) goto error; break; + case SWIM_FAILURE_DETECTION: + say_verbose("SWIM: process failure detection"); + if (swim_process_failure_detection(swim, &pos, end, + src, &uuid) != 0) + goto error; + break; default: diag_set(SwimError, "%s unexpected key", msg_pref); goto error; @@ -649,6 +994,12 @@ swim_new(void) swim_task_create(&swim->round_step_task, swim_round_step_complete, NULL); swim_scheduler_create(&swim->scheduler, swim_on_input); + + /* Failure detection component.
*/ + rlist_create(&swim->queue_wait_ack); + ev_init(&swim->wait_ack_tick, swim_check_acks); + ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT_DEFAULT, NULL); + swim->wait_ack_tick.data = (void *) swim; return swim; } @@ -674,12 +1025,12 @@ swim_uri_to_addr(const char *uri, struct sockaddr_in *addr, int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, - const struct tt_uuid *uuid) + double ack_timeout, const struct tt_uuid *uuid) { const char *msg_pref = "swim.cfg:"; - if (heartbeat_rate < 0) { - diag_set(IllegalParams, "%s heartbeat_rate should be a "\ - "positive number", msg_pref); + if (heartbeat_rate < 0 || ack_timeout < 0) { + diag_set(IllegalParams, "%s heartbeat_rate and ack_timeout "\ + "should be positive numbers", msg_pref); return -1; } struct sockaddr_in addr; @@ -692,7 +1043,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, "a first config", msg_pref); return -1; } - swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE); + swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, + 0); if (swim->self == NULL) return -1; } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { @@ -730,7 +1082,11 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0) ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL); + if (swim->wait_ack_tick.interval != ack_timeout && ack_timeout > 0) + ev_periodic_set(&swim->wait_ack_tick, 0, ack_timeout, NULL); + ev_periodic_start(loop(), &swim->round_tick); + ev_periodic_start(loop(), &swim->wait_ack_tick); if (!
is_first_cfg) { swim_member_update_addr(swim->self, &addr); @@ -766,7 +1122,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) return -1; struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { - member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE); + member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0); return member == NULL ? -1 : 0; } diag_set(SwimError, "%s a member with such UUID already exists", @@ -791,6 +1147,23 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid) return 0; } +int +swim_probe_member(struct swim *swim, const char *uri) +{ + const char *msg_pref = "swim.probe_member:"; + if (swim_check_is_configured(swim, msg_pref) != 0) + return -1; + struct sockaddr_in addr; + if (swim_uri_to_addr(uri, &addr, msg_pref) != 0) + return -1; + struct swim_task *t = swim_task_new(swim_task_delete_cb, + swim_task_delete_cb); + if (t == NULL) + return -1; + swim_send_ping(swim, t, &addr); + return 0; +} + void swim_info(struct swim *swim, struct info_handler *info) { @@ -806,6 +1179,7 @@ swim_info(struct swim *swim, struct info_handler *info) info_append_str(info, "status", swim_member_status_strs[m->status]); info_append_str(info, "uuid", swim_uuid_str(&m->uuid)); + info_append_int(info, "incarnation", (int64_t) m->incarnation); info_table_end(info); } info_end(info); @@ -816,6 +1190,7 @@ swim_delete(struct swim *swim) { swim_scheduler_destroy(&swim->scheduler); ev_periodic_stop(loop(), &swim->round_tick); + ev_periodic_stop(loop(), &swim->wait_ack_tick); swim_task_destroy(&swim->round_step_task); mh_int_t node = mh_first(swim->members); while (node != mh_end(swim->members)) { diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index a98decc86..9d21a739d 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -55,6 +55,8 @@ swim_new(void); * @heartbeat_rate seconds. It is rather the protocol * speed. Protocol period depends on member count and * @heartbeat_rate.
+ * @param ack_timeout Time in seconds after which a ping is + * considered to be unacknowledged. 0 keeps the current value. * @param uuid UUID of this instance. Must be unique over the * cluster. * @@ -63,7 +65,7 @@ swim_new(void); */ int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, - const struct tt_uuid *uuid); + double ack_timeout, const struct tt_uuid *uuid); /** * Stop listening and broadcasting messages, cleanup all internal @@ -80,6 +82,13 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid); int swim_remove_member(struct swim *swim, const struct tt_uuid *uuid); +/** + * Send a ping to this address. If an ACK is received, the member + * will be added. Returns 0 if the ping is scheduled, -1 on error. + */ +int +swim_probe_member(struct swim *swim, const char *uri); + /** Dump member statuses into @a info. */ void swim_info(struct swim *swim, struct info_handler *info); diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index 20973acaf..170d7af77 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -65,6 +65,27 @@ swim_task_create(struct swim_task *task, swim_task_f complete, rlist_create(&task->in_queue_output); } +struct swim_task * +swim_task_new(swim_task_f complete, swim_task_f cancel) +{ + struct swim_task *task = (struct swim_task *) malloc(sizeof(*task)); + if (task == NULL) { + diag_set(OutOfMemory, sizeof(*task), "malloc", "task"); + return NULL; + } + swim_task_create(task, complete, cancel); + return task; +} + +void +swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler, + int rc) +{ + (void) rc; + (void) scheduler; + free(task); +} + /** Put the task into the queue of output tasks.
*/ static inline void swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler) diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index 68fb89818..508d1ef6e 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -215,6 +215,15 @@ void swim_task_create(struct swim_task *task, swim_task_f complete, swim_task_f cancel); +/** Allocate and create a new task. */ +struct swim_task * +swim_task_new(swim_task_f complete, swim_task_f cancel); + +/** Completion/cancel callback freeing a swim_task_new() task. */ +void +swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler, + int rc); + /** Destroy the task, pop from the queue. */ static inline void swim_task_destroy(struct swim_task *task) diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index a273cb815..542b988c1 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -36,6 +36,12 @@ const char *swim_member_status_strs[] = { "alive", + "dead", +}; + +const char *swim_fd_msg_type_strs[] = { + "ping", + "ack", }; int @@ -178,6 +184,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member uuid") != 0) return -1; break; + case SWIM_MEMBER_INCARNATION: + if (swim_decode_uint(pos, end, &def->incarnation, msg_pref, + "member incarnation") != 0) + return -1; + break; default: unreachable(); } @@ -225,6 +236,70 @@ swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, memcpy(header->v_uuid, uuid, UUID_LEN); } +void +swim_fd_header_bin_create(struct swim_fd_header_bin *header, + enum swim_fd_msg_type type, uint64_t incarnation) +{ + header->k_header = SWIM_FAILURE_DETECTION; + header->m_header = 0x82; + + header->k_type = SWIM_FD_MSG_TYPE; + header->v_type = type; + + header->k_incarnation = SWIM_FD_INCARNATION; + header->m_incarnation = 0xcf; + header->v_incarnation = mp_bswap_u64(incarnation); +} + +int +swim_failure_detection_def_decode(struct swim_failure_detection_def *def, + const char **pos, const char *end, +
const char *msg_pref) +{ + uint32_t size; + if (swim_decode_map(pos, end, &size, msg_pref, "root") != 0) + return -1; + memset(def, 0, sizeof(*def)); + def->type = swim_fd_msg_type_MAX; + if (size != 2) { + diag_set(SwimError, "%s root map should have two keys - "\ + "message type and incarnation", msg_pref); + return -1; + } + for (int i = 0; i < (int) size; ++i) { + uint64_t key; + if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0) + return -1; + switch(key) { + case SWIM_FD_MSG_TYPE: + if (swim_decode_uint(pos, end, &key, msg_pref, + "message type") != 0) + return -1; + if (key >= swim_fd_msg_type_MAX) { + diag_set(SwimError, "%s unknown message type", + msg_pref); + return -1; + } + def->type = key; + break; + case SWIM_FD_INCARNATION: + if (swim_decode_uint(pos, end, &def->incarnation, + msg_pref, "incarnation") != 0) + return -1; + break; + default: + diag_set(SwimError, "%s unexpected key", msg_pref); + return -1; + } + } + if (def->type == swim_fd_msg_type_MAX) { + diag_set(SwimError, "%s message type should be specified", + msg_pref); + return -1; + } + return 0; +} + void swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, uint16_t batch_size) @@ -237,18 +312,19 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, void swim_member_bin_fill(struct swim_member_bin *header, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status) + enum swim_member_status status, uint64_t incarnation) { header->v_status = status; header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr); header->v_port = mp_bswap_u16(addr->sin_port); memcpy(header->v_uuid, uuid, UUID_LEN); + header->v_incarnation = mp_bswap_u64(incarnation); } void swim_member_bin_create(struct swim_member_bin *header) { - header->m_header = 0x84; + header->m_header = 0x85; header->k_status = SWIM_MEMBER_STATUS; header->k_addr = SWIM_MEMBER_ADDRESS; header->m_addr = 0xce; @@ -257,6 +333,8 @@
swim_member_bin_create(struct swim_member_bin *header) header->k_uuid = SWIM_MEMBER_UUID; header->m_uuid = 0xc4; header->m_uuid_len = UUID_LEN; + header->k_incarnation = SWIM_MEMBER_INCARNATION; + header->m_incarnation = 0xcf; } void diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index 6e36f0b07..91a0bca9d 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -51,12 +51,20 @@ * | | * | AND | * | | + * | SWIM_FAILURE_DETECTION: { | + * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, | + * | SWIM_FD_INCARNATION: uint | + * | }, | + * | | + * | OR/AND | + * | | * | SWIM_ANTI_ENTROPY: [ | * | { | * | SWIM_MEMBER_STATUS: uint, enum member_status, | * | SWIM_MEMBER_ADDRESS: uint, ip, | * | SWIM_MEMBER_PORT: uint, port, | - * | SWIM_MEMBER_UUID: 16 byte UUID | + * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_INCARNATION: uint | * | }, | * | ... | * | ], | @@ -67,6 +75,11 @@ enum swim_member_status { /** The instance is ok, responds to requests. */ MEMBER_ALIVE = 0, + /** + * The member is considered to be dead. It will disappear + * from the membership, if it is not pinned. + */ + MEMBER_DEAD, swim_member_status_MAX, }; @@ -79,6 +92,7 @@ extern const char *swim_member_status_strs[]; struct swim_member_def { struct tt_uuid uuid; struct sockaddr_in addr; + uint64_t incarnation; enum swim_member_status status; }; @@ -109,6 +123,7 @@ swim_member_def_decode(struct swim_member_def *def, const char **pos, enum swim_body_key { SWIM_SRC_UUID = 0, SWIM_ANTI_ENTROPY, + SWIM_FAILURE_DETECTION, }; /** @@ -131,6 +146,79 @@ void swim_src_uuid_bin_create(struct swim_src_uuid_bin *header, const struct tt_uuid *uuid); +/** {{{ Failure detection component */ + +/** Failure detection component keys. */ +enum swim_fd_key { + /** Type of the failure detection message: ping or ack. */ + SWIM_FD_MSG_TYPE, + /** + * Incarnation of the sender.
It makes the member alive if + * it was considered to be dead, but a ping/ack with a greater + * incarnation was received from it. + */ + SWIM_FD_INCARNATION, +}; + +/** Failure detection message type. */ +enum swim_fd_msg_type { + SWIM_FD_MSG_PING, + SWIM_FD_MSG_ACK, + swim_fd_msg_type_MAX, +}; + +extern const char *swim_fd_msg_type_strs[]; + +/** SWIM failure detection MessagePack header template. */ +struct PACKED swim_fd_header_bin { + /** mp_encode_uint(SWIM_FAILURE_DETECTION) */ + uint8_t k_header; + /** mp_encode_map(2) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_FD_MSG_TYPE) */ + uint8_t k_type; + /** mp_encode_uint(enum swim_fd_msg_type) */ + uint8_t v_type; + + /** mp_encode_uint(SWIM_FD_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +/** Initialize a failure detection section. */ +void +swim_fd_header_bin_create(struct swim_fd_header_bin *header, + enum swim_fd_msg_type type, uint64_t incarnation); + +/** A decoded failure detection message. */ +struct swim_failure_detection_def { + /** Type of the message. */ + enum swim_fd_msg_type type; + /** Incarnation of the sender. */ + uint64_t incarnation; +}; + +/** + * Decode failure detection from a MessagePack buffer. + * @param[out] def Definition to decode into. + * @param[in][out] pos Start of the MessagePack buffer. + * @param end End of the MessagePack buffer. + * @param msg_pref A prefix of an error message to use for + * diag_set, when something is wrong. + * + * @retval 0 Success. + * @retval -1 Error.
+ */ +int +swim_failure_detection_def_decode(struct swim_failure_detection_def *def, + const char **pos, const char *end, + const char *msg_pref); + +/** }}} Failure detection component */ + /** {{{ Anti-entropy component */ /** @@ -142,6 +230,7 @@ enum swim_member_key { SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, + SWIM_MEMBER_INCARNATION, swim_member_key_MAX, }; @@ -164,7 +253,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, * anti-entropy section. */ struct PACKED swim_member_bin { - /** mp_encode_map(4) */ + /** mp_encode_map(5) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -190,6 +279,12 @@ struct PACKED swim_member_bin { uint8_t m_uuid; uint8_t m_uuid_len; uint8_t v_uuid[UUID_LEN]; + + /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; }; /** Initialize antri-entropy record. */ @@ -205,7 +300,7 @@ swim_member_bin_create(struct swim_member_bin *header); void swim_member_bin_fill(struct swim_member_bin *header, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status); + enum swim_member_status status, uint64_t incarnation); /** }}} Anti-entropy component */ diff --git a/src/lua/swim.c b/src/lua/swim.c index 317f53e73..a20c2dc0d 100644 --- a/src/lua/swim.c +++ b/src/lua/swim.c @@ -192,8 +192,10 @@ lua_swim_cfg_impl(struct lua_State *L, int ncfg, struct swim *swim, lua_swim_get_uuid_field(L, ncfg, "uuid", funcname, &uuid); double heartbeat_rate = lua_swim_get_timeout_field(L, ncfg, "heartbeat", funcname); + double ack_timeout = + lua_swim_get_timeout_field(L, ncfg, "ack_timeout", funcname); - return swim_cfg(swim, server_uri, heartbeat_rate, &uuid); + return swim_cfg(swim, server_uri, heartbeat_rate, ack_timeout, &uuid); } /** @@ -348,6 +350,35 @@ lua_swim_info(struct lua_State *L) return 1; } +/** + * Send a ping to a URI assuming that there is
a member, which + * will respond with an ack, and will be added to the local + * members table. The Lua stack should contain two values - a SWIM + * instance to probe by, and a URI of a member. + * @param L Lua state. + * @retval 1 True. + * @retval 2 Nil and an error object. On invalid Lua parameters + * and OOM it throws. + */ +static int +lua_swim_probe_member(struct lua_State *L) +{ + struct swim *swim = lua_swim_ptr(L, 1); + if (lua_gettop(L) != 2 || swim == NULL) + return luaL_error(L, "Usage: swim:probe_member(uri)"); + if (! lua_isstring(L, 2)) { + return luaL_error(L, "swim.probe_member: member URI should "\ + "be a string"); + } + if (swim_probe_member(swim, lua_tostring(L, 2)) != 0) { + lua_pushnil(L); + luaT_pusherror(L, diag_last_error(diag_get())); + return 2; + } + lua_pushboolean(L, true); + return 1; +} + void tarantool_lua_swim_init(struct lua_State *L) { @@ -358,6 +389,7 @@ tarantool_lua_swim_init(struct lua_State *L) {"remove_member", lua_swim_remove_member}, {"delete", lua_swim_delete}, {"info", lua_swim_info}, + {"probe_member", lua_swim_probe_member}, {NULL, NULL} }; luaL_register_module(L, "swim", lua_swim_methods); -- 2.17.2 (Apple Git-113)