[PATCH v4 10/12] [RAW] swim: introduce 'quit' message
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Jan 31 00:28:28 MSK 2019
The 'quit' message helps an instance leave the cluster gracefully: it
notifies all members that this instance is not dead, but has just
decided to leave.
Part of #3234
---
src/lib/swim/swim.c | 255 +++++++++++++++++++++++++++++++++++---
src/lib/swim/swim.h | 8 ++
src/lib/swim/swim_io.c | 8 +-
src/lib/swim/swim_io.h | 4 +
src/lib/swim/swim_proto.c | 12 ++
src/lib/swim/swim_proto.h | 38 ++++++
src/lua/swim.c | 33 ++++-
7 files changed, 333 insertions(+), 25 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 5cec3789a..b377f154f 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -138,8 +138,14 @@ enum {
*/
ACK_TIMEOUT_DEFAULT = 30,
/**
- * If a member has not been responding to pings this
- * number of times, it is considered to be dead.
+ * If an alive member has not been responding to pings
+ * this number of times, it is suspected to be dead. To
+ * confirm the death it should fail more pings.
+ */
+ NO_ACKS_TO_SUSPECT = 2,
+ /**
+ * If a suspected member has not been responding to pings
+ * this number of times, it is considered to be dead.
* According to the SWIM paper, for a member it is enough
* to do not respond on one direct ping, and on K
* simultanous indirect pings, to be considered as dead.
@@ -156,6 +162,11 @@ enum {
* anti-entropy components.
*/
NO_ACKS_TO_GC = 2,
+ /**
+ * Number of attempts to reach a member, which did not
+ * answer a regular ping, via other members.
+ */
+ INDIRECT_PING_COUNT = 2,
};
/**
@@ -367,6 +378,15 @@ struct swim {
struct rlist queue_events;
};
+/** Get a random member from a members table. */
+static inline struct swim_member *
+swim_random_member(struct swim *swim)
+{
+ int rnd = swim_scaled_rand(0, mh_size(swim->members) - 1);
+ mh_int_t node = mh_swim_table_random(swim->members, rnd);
+ return *mh_swim_table_node(swim->members, node);
+}
+
/** Reset cached round message on any change of any member. */
static inline void
cached_round_msg_invalidate(struct swim *swim)
@@ -374,15 +394,34 @@ cached_round_msg_invalidate(struct swim *swim)
swim_packet_create(&swim->round_step_task.packet);
}
+/** Comparator for a sorted list of ping deadlines. */
+static inline int
+swim_member_ping_deadline_cmp(struct swim_member *a, struct swim_member *b)
+{
+ double res = a->ping_deadline - b->ping_deadline;
+ if (res > 0)
+ return 1;
+ return res < 0 ? -1 : 0;
+}
+
/** Put the member into a list of ACK waiters. */
static void
-swim_member_wait_ack(struct swim *swim, struct swim_member *member)
+swim_member_wait_ack(struct swim *swim, struct swim_member *member,
+ int hop_count)
{
if (rlist_empty(&member->in_queue_wait_ack)) {
- member->ping_deadline = fiber_time() +
- swim->wait_ack_tick.interval;
- rlist_add_tail_entry(&swim->queue_wait_ack, member,
- in_queue_wait_ack);
+ double timeout = swim->wait_ack_tick.interval * hop_count;
+ member->ping_deadline = fiber_time() + timeout;
+ struct swim_member *pos;
+ /*
+ * Indirect ping deadline can be later than
+ * deadlines of some of newer direct pings, so it
+ * is not enough to just append a new member to
+ * the end of this list.
+ */
+ rlist_add_tail_entry_sorted(&swim->queue_wait_ack, pos, member,
+ in_queue_wait_ack,
+ swim_member_ping_deadline_cmp);
}
}
@@ -556,7 +595,7 @@ swim_ping_task_complete(struct swim_task *task,
struct swim *swim = swim_by_scheduler(scheduler);
struct swim_member *m = container_of(task, struct swim_member,
ping_task);
- swim_member_wait_ack(swim, m);
+ swim_member_wait_ack(swim, m, 1);
}
/**
@@ -845,6 +884,8 @@ swim_decrease_events_ttl(struct swim *swim)
if (--member->status_ttl == 0) {
rlist_del_entry(member, in_queue_events);
cached_round_msg_invalidate(swim);
+ if (member->status == MEMBER_LEFT)
+ swim_member_delete(swim, member);
}
}
}
@@ -897,7 +938,7 @@ swim_round_step_complete(struct swim_task *task,
* Each round message contains failure detection
* section with a ping.
*/
- swim_member_wait_ack(swim, m);
+ swim_member_wait_ack(swim, m, 1);
/* As well as dissemination. */
swim_decrease_events_ttl(swim);
}
@@ -906,12 +947,15 @@ swim_round_step_complete(struct swim_task *task,
/** Schedule send of a failure detection message. */
static void
swim_send_fd_request(struct swim *swim, struct swim_task *task,
- const struct sockaddr_in *dst, enum swim_fd_msg_type type)
+ const struct sockaddr_in *dst, enum swim_fd_msg_type type,
+ const struct sockaddr_in *proxy)
{
/*
* Reset packet allocator in case if task is being reused.
*/
swim_packet_create(&task->packet);
+ if (proxy != NULL)
+ swim_task_proxy(task, proxy);
char *header = swim_packet_alloc(&task->packet, 1);
int map_size = swim_encode_src_uuid(swim, &task->packet);
map_size += swim_encode_failure_detection(swim, &task->packet, type);
@@ -925,17 +969,82 @@ swim_send_fd_request(struct swim *swim, struct swim_task *task,
/** Schedule send of an ack. */
static inline void
swim_send_ack(struct swim *swim, struct swim_task *task,
- const struct sockaddr_in *dst)
+ const struct sockaddr_in *dst, const struct sockaddr_in *proxy)
{
- swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_ACK);
+ swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_ACK, proxy);
}
/** Schedule send of a ping. */
static inline void
swim_send_ping(struct swim *swim, struct swim_task *task,
- const struct sockaddr_in *dst)
+ const struct sockaddr_in *dst, const struct sockaddr_in *proxy)
{
- swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_PING);
+ swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_PING, proxy);
+}
+
+/**
+ * Indirect ping task. It is executed multiple times to send a
+ * ping to several random members. The main motivation of this
+ * task is to avoid creating many tasks for a swarm of indirect
+ * pings, and to reuse a single one instead.
+ */
+struct swim_iping_task {
+ /** Base structure. */
+ struct swim_task base;
+ /**
+ * How many times to send. Decremented on each send and on
+ * 0 the task is deleted.
+ */
+ int ttl;
+};
+
+/** Reschedule the task with a different proxy, or delete. */
+static void
+swim_iping_task_complete(struct swim_task *base_task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) rc;
+ struct swim *swim = swim_by_scheduler(scheduler);
+ struct swim_iping_task *task = (struct swim_iping_task *) base_task;
+ if (--task->ttl == 0) {
+ swim_task_destroy(base_task);
+ free(task);
+ return;
+ }
+ swim_task_send(base_task, &swim_random_member(swim)->addr, scheduler);
+}
+
+/**
+ * Schedule a number of indirect pings of a member with the
+ * specified address.
+ */
+static inline int
+swim_send_indirect_pings(struct swim *swim, const struct sockaddr_in *dst)
+{
+ struct swim_iping_task *task =
+ (struct swim_iping_task *) malloc(sizeof(*task));
+ if (task == NULL) {
+ diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+ return -1;
+ }
+ task->ttl = INDIRECT_PING_COUNT;
+ swim_task_create(&task->base, swim_iping_task_complete,
+ swim_task_delete_cb);
+ swim_send_ping(swim, &task->base, dst, &swim_random_member(swim)->addr);
+ return 0;
+}
+
+/** Schedule an indirect ACK. */
+static inline int
+swim_send_indirect_ack(struct swim *swim, const struct sockaddr_in *dst,
+ const struct sockaddr_in *proxy)
+{
+ struct swim_task *task = swim_task_new(swim_task_delete_cb,
+ swim_task_delete_cb);
+ if (task == NULL)
+ return -1;
+ swim_send_ack(swim, task, dst, proxy);
+ return 0;
}
/**
@@ -959,6 +1068,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
++m->unacknowledged_pings;
switch (m->status) {
case MEMBER_ALIVE:
+ if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT)
+ break;
+ m->status = MEMBER_SUSPECTED;
+ swim_member_status_is_updated(m, swim);
+ if (swim_send_indirect_pings(swim, &m->addr) != 0)
+ diag_log();
+ break;
+ case MEMBER_SUSPECTED:
if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
swim_member_status_is_updated(m, swim);
@@ -969,10 +1086,12 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
m->status_ttl == 0)
swim_member_delete(swim, m);
break;
+ case MEMBER_LEFT:
+ break;
default:
unreachable();
}
- swim_send_ping(swim, &m->ping_task, &m->addr);
+ swim_send_ping(swim, &m->ping_task, &m->addr, NULL);
rlist_del_entry(m, in_queue_wait_ack);
}
}
@@ -1136,7 +1255,8 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
static int
swim_process_failure_detection(struct swim *swim, const char **pos,
const char *end, const struct sockaddr_in *src,
- const struct tt_uuid *uuid)
+ const struct tt_uuid *uuid,
+ const struct sockaddr_in *proxy)
{
const char *msg_pref = "invalid failure detection message:";
struct swim_failure_detection_def def;
@@ -1154,7 +1274,13 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
switch (def.type) {
case SWIM_FD_MSG_PING:
- swim_send_ack(swim, &member->ack_task, &member->addr);
+ if (proxy == NULL) {
+ swim_send_ack(swim, &member->ack_task, &member->addr,
+ NULL);
+ } else if (swim_send_indirect_ack(swim, &member->addr,
+ proxy) != 0) {
+ diag_log();
+ }
break;
case SWIM_FD_MSG_ACK:
if (def.incarnation >= member->incarnation) {
@@ -1197,13 +1323,43 @@ swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
return 0;
}
+/**
+ * Decode a quit message. Schedule dissemination, change status.
+ */
+static int
+swim_process_quit(struct swim *swim, const char **pos, const char *end,
+ const struct sockaddr_in *src, const struct tt_uuid *uuid)
+{
+ (void) src;
+ const char *msg_pref = "invald quit message:";
+ uint32_t size;
+ if (swim_decode_map(pos, end, &size, msg_pref, "root") != 0)
+ return -1;
+ if (size != 1) {
+ diag_set(SwimError, "%s map of size 1 is expected", msg_pref);
+ return -1;
+ }
+ uint64_t tmp;
+ if (swim_decode_uint(pos, end, &tmp, msg_pref, "a key") != 0)
+ return -1;
+ if (tmp != SWIM_QUIT_INCARNATION) {
+ diag_set(SwimError, "%s a key should be incarnation", msg_pref);
+ return -1;
+ }
+ if (swim_decode_uint(pos, end, &tmp, msg_pref, "incarnation") != 0)
+ return -1;
+ struct swim_member *m = swim_find_member(swim, uuid);
+ if (m != NULL)
+ swim_member_update_status(m, MEMBER_LEFT, tmp, swim);
+ return 0;
+}
+
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
const char *end, const struct sockaddr_in *src,
const struct sockaddr_in *proxy)
{
- (void) proxy;
const char *msg_pref = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
@@ -1237,7 +1393,8 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
case SWIM_FAILURE_DETECTION:
say_verbose("SWIM: process failure detection");
if (swim_process_failure_detection(swim, &pos, end,
- src, &uuid) != 0)
+ src, &uuid,
+ proxy) != 0)
goto error;
break;
case SWIM_DISSEMINATION:
@@ -1245,6 +1402,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
if (swim_process_dissemination(swim, &pos, end) != 0)
goto error;
break;
+ case SWIM_QUIT:
+ say_verbose("SWIM: process quit");
+ if (swim_process_quit(swim, &pos, end, src, &uuid) != 0)
+ goto error;
+ break;
default:
diag_set(SwimError, "%s unexpected key", msg_pref);
goto error;
@@ -1460,7 +1622,7 @@ swim_probe_member(struct swim *swim, const char *uri)
swim_task_delete_cb);
if (t == NULL)
return -1;
- swim_send_ping(swim, t, &addr);
+ swim_send_ping(swim, t, &addr, NULL);
return 0;
}
@@ -1501,3 +1663,56 @@ swim_delete(struct swim *swim)
}
mh_swim_table_delete(swim->members);
}
+
+/**
+ * A quit message is broadcast in the same way as round messages,
+ * step by step, with the only difference that quit round steps
+ * follow each other without delays.
+ */
+static void
+swim_quit_step_complete(struct swim_task *task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) rc;
+ (void) task;
+ struct swim *swim = swim_by_scheduler(scheduler);
+ if (rlist_empty(&swim->queue_round)) {
+ swim_delete(swim);
+ return;
+ }
+ struct swim_member *m =
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+ swim_task_send(&swim->round_step_task, &m->addr, &swim->scheduler);
+}
+
+void
+swim_quit(struct swim *swim)
+{
+ if (swim->self == NULL) {
+ swim_delete(swim);
+ return;
+ }
+ ev_periodic_stop(loop(), &swim->round_tick);
+ ev_periodic_stop(loop(), &swim->wait_ack_tick);
+ swim_scheduler_stop_input(&swim->scheduler);
+ /* Start the last round - quitting. */
+ if (swim_new_round(swim) != 0 || rlist_empty(&swim->queue_round)) {
+ swim_delete(swim);
+ return;
+ }
+ swim_task_destroy(&swim->round_step_task);
+ swim_task_create(&swim->round_step_task, swim_quit_step_complete,
+ swim_task_delete_cb);
+ struct swim_quit_bin header;
+ swim_quit_bin_create(&header, swim->self->incarnation);
+ int size = mp_sizeof_map(1) + sizeof(header);
+ char *pos = swim_packet_alloc(&swim->round_step_task.packet, size);
+ assert(pos != NULL);
+ pos = mp_encode_map(pos, 1);
+ memcpy(pos, &header, sizeof(header));
+ struct swim_member *m =
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+ swim_task_send(&swim->round_step_task, &m->addr, &swim->scheduler);
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index dced172c0..24f3a4b33 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -97,6 +97,14 @@ swim_probe_member(struct swim *swim, const char *uri);
void
swim_info(struct swim *swim, struct info_handler *info);
+/**
+ * Gracefully leave the cluster, broadcast a notification.
+ * Members that receive it will remove the record about this
+ * instance from their tables, and will not consider it to be dead.
+ */
+void
+swim_quit(struct swim *swim);
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index a8fb1f588..e62b7126f 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -187,6 +187,12 @@ swim_scheduler_bind(struct swim_scheduler *scheduler,
return 0;
}
+void
+swim_scheduler_stop_input(struct swim_scheduler *scheduler)
+{
+ ev_io_stop(loop(), &scheduler->input);
+}
+
void
swim_scheduler_destroy(struct swim_scheduler *scheduler)
{
@@ -202,7 +208,7 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
}
swim_transport_destroy(&scheduler->transport);
ev_io_stop(loop(), &scheduler->output);
- ev_io_stop(loop(), &scheduler->input);
+ swim_scheduler_stop_input(scheduler);
}
static void
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 0ba8972f0..c13b5d14f 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -178,6 +178,10 @@ int
swim_scheduler_bind(struct swim_scheduler *scheduler,
const struct sockaddr_in *addr);
+/** Stop accepting new packets from the network. */
+void
+swim_scheduler_stop_input(struct swim_scheduler *scheduler);
+
/** Destroy scheduler, its queues, close the socket. */
void
swim_scheduler_destroy(struct swim_scheduler *scheduler);
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 8b1ed76a7..aa9edfc9d 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -36,7 +36,9 @@
const char *swim_member_status_strs[] = {
"alive",
+ "suspected",
"dead",
+ "left",
};
const char *swim_fd_msg_type_strs[] = {
@@ -581,3 +583,13 @@ swim_route_bin_create(struct swim_route_bin *route,
route->m_dst_port = 0xcd;
route->v_dst_port = mp_bswap_u16(dst->sin_port);
}
+
+void
+swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation)
+{
+ header->k_quit = SWIM_QUIT;
+ header->m_quit = 0x81;
+ header->k_incarnation = SWIM_QUIT_INCARNATION;
+ header->m_incarnation = 0xcf;
+ header->v_incarnation = mp_bswap_u64(incarnation);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index fe9eb85c5..b743074e5 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -92,6 +92,12 @@ enum {
* | }, |
* | ... |
* | ], |
+ * | |
+ * | OR/AND |
+ * | |
+ * | SWIM_QUIT: { |
+ * | SWIM_QUIT_INCARNATION: uint |
+ * | } |
* | } |
* +-------------------------------------------------------------+
*/
@@ -99,11 +105,19 @@ enum {
enum swim_member_status {
/** The instance is ok, responds to requests. */
MEMBER_ALIVE = 0,
+ /**
+ * If a member has not responded to a ping, it is declared
+ * as suspected to be dead. After more failed pings it
+ * is finally declared dead.
+ */
+ MEMBER_SUSPECTED,
/**
* The member is considered to be dead. It will disappear
* from the membership, if it is not pinned.
*/
MEMBER_DEAD,
+ /** The member has voluntarily left the cluster. */
+ MEMBER_LEFT,
swim_member_status_MAX,
};
@@ -157,6 +171,7 @@ enum swim_body_key {
SWIM_ANTI_ENTROPY,
SWIM_FAILURE_DETECTION,
SWIM_DISSEMINATION,
+ SWIM_QUIT,
};
/**
@@ -589,6 +604,29 @@ swim_route_bin_create(struct swim_route_bin *route,
/** }}} Meta component */
+enum swim_quit_key {
+ /** Incarnation to ignore old quit messages. */
+ SWIM_QUIT_INCARNATION = 0,
+};
+
+/** Quit section. Describes voluntary quit from the cluster. */
+struct PACKED swim_quit_bin {
+ /** mp_encode_uint(SWIM_QUIT) */
+ uint8_t k_quit;
+ /** mp_encode_map(1) */
+ uint8_t m_quit;
+
+ /** mp_encode_uint(SWIM_QUIT_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/** Initialize quit section. */
+void
+swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation);
+
/**
* Helpers to decode some values - map, array, etc with
* appropriate checks. All of them set diagnostics on an error
diff --git a/src/lua/swim.c b/src/lua/swim.c
index a20c2dc0d..7df4e5c85 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -312,6 +312,16 @@ lua_swim_remove_member(struct lua_State *L)
return 1;
}
+/** Remove a SWIM instance pointer from Lua space, nullify. */
+static void
+lua_swim_invalidate(struct lua_State *L)
+{
+ uint32_t ctypeid;
+ struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+ assert(ctypeid == CTID_STRUCT_SWIM_PTR);
+ *cdata = NULL;
+}
+
/**
* Destroy and delete a SWIM instance. All its memory is freed, it
* stops participating in any rounds, the socket is closed. No
@@ -326,10 +336,7 @@ lua_swim_delete(struct lua_State *L)
if (swim == NULL)
return luaL_error(L, "Usage: swim:delete()");
swim_delete(swim);
- uint32_t ctypeid;
- struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid);
- assert(ctypeid == CTID_STRUCT_SWIM_PTR);
- *cdata = NULL;
+ lua_swim_invalidate(L);
return 0;
}
@@ -379,6 +386,23 @@ lua_swim_probe_member(struct lua_State *L)
return 1;
}
+/**
+ * Gracefully leave the cluster. The Lua stack should contain one
+ * value - a SWIM instance. After this method is called, the SWIM
+ * instance is deleted and can not be used.
+ * @param L Lua state.
+ */
+static int
+lua_swim_quit(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:quit()");
+ swim_quit(swim);
+ lua_swim_invalidate(L);
+ return 0;
+}
+
void
tarantool_lua_swim_init(struct lua_State *L)
{
@@ -390,6 +414,7 @@ tarantool_lua_swim_init(struct lua_State *L)
{"delete", lua_swim_delete},
{"info", lua_swim_info},
{"probe_member", lua_swim_probe_member},
+ {"quit", lua_swim_quit},
{NULL, NULL}
};
luaL_register_module(L, "swim", lua_swim_methods);
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list