[PATCH v4 05/12] [RAW] swim: introduce failure detection component
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Jan 31 00:28:34 MSK 2019
The failure detection component allows finding out which members
are already dead.
Part of #3234
---
src/lib/swim/swim.c | 405 ++++++++++++++++++++++++++++++++++++--
src/lib/swim/swim.h | 11 +-
src/lib/swim/swim_io.c | 21 ++
src/lib/swim/swim_io.h | 9 +
src/lib/swim/swim_proto.c | 82 +++++++-
src/lib/swim/swim_proto.h | 101 +++++++++-
src/lua/swim.c | 34 +++-
7 files changed, 641 insertions(+), 22 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index adf01cfad..a862f52a4 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -131,6 +131,31 @@ enum {
* value.
*/
HEARTBEAT_RATE_DEFAULT = 1,
+ /**
+ * If a ping was sent, it is considered to be lost after
+ * this time without an ack. Nothing special in this
+ * value.
+ */
+ ACK_TIMEOUT_DEFAULT = 30,
+ /**
+ * If a member has not been responding to pings this
+ * number of times, it is considered to be dead.
+ * According to the SWIM paper, it is enough for a member
+ * not to respond to one direct ping and to K
+ * simultaneous indirect pings to be considered dead.
+ * That seems too few, so here the threshold is bigger.
+ */
+ NO_ACKS_TO_DEAD = 3,
+ /**
+ * If a member is confirmed to be dead, it is removed from
+ * the membership after at least this number of
+ * unacknowledged pings. According to the SWIM paper, a
+ * dead member is deleted immediately. But here it is held
+ * for a while to 1) maybe refute its dead status, 2)
+ * disseminate the status via dissemination and
+ * anti-entropy components.
+ */
+ NO_ACKS_TO_GC = 2,
};
/**
@@ -193,6 +218,31 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_queue_round;
+ /**
+ *
+ * Failure detection component
+ */
+ /** Growing number to refute old messages. */
+ uint64_t incarnation;
+ /**
+ * How many pings in a row have not received an ack while the
+ * member has been in the current status. After a threshold it is
+ * marked as dead. After more it is removed from the
+ * table. On each status or incarnation change this
+ * counter is reset.
+ */
+ int unacknowledged_pings;
+ /**
+ * When the latest ping is considered to be
+ * unacknowledged.
+ */
+ double ping_deadline;
+ /** Ready at hand regular ACK task. */
+ struct swim_task ack_task;
+ /** Ready at hand regular PING task. */
+ struct swim_task ping_task;
+ /** Position in a queue of members waiting for an ack. */
+ struct rlist in_queue_wait_ack;
};
#define mh_name _swim_table
@@ -250,8 +300,77 @@ struct swim {
* ones.
*/
struct swim_scheduler scheduler;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * Members waiting for an ACK. On too long absence of an
+ * ACK a member is considered to be dead and is removed.
+ * The list is sorted by deadline in ascending order (tail
+ * is newer, head is older).
+ */
+ struct rlist queue_wait_ack;
+ /** Generator of ack checking events. */
+ struct ev_periodic wait_ack_tick;
};
+/**
+ * Put the member into the queue of ACK waiters. A member that is
+ * already waiting keeps its current deadline. Otherwise the
+ * deadline is set to now + the ack timeout, and the member is
+ * appended to the tail of the queue.
+ */
+static void
+swim_member_wait_ack(struct swim *swim, struct swim_member *member)
+{
+	if (! rlist_empty(&member->in_queue_wait_ack))
+		return;
+	double timeout = swim->wait_ack_tick.interval;
+	member->ping_deadline = fiber_time() + timeout;
+	rlist_add_tail_entry(&swim->queue_wait_ack, member, in_queue_wait_ack);
+}
+
+/**
+ * Make all needed actions to process a member's update like a
+ * change of its status, or incarnation, or both. For now the
+ * only action is resetting the counter of unacknowledged pings,
+ * so the NO_ACKS_TO_DEAD and NO_ACKS_TO_GC thresholds are
+ * counted from the latest update.
+ */
+static void
+swim_member_status_is_updated(struct swim_member *member)
+{
+ member->unacknowledged_pings = 0;
+}
+
+/**
+ * Update status and incarnation of @a member if the new pair is
+ * newer. The pair is compared as a compound key
+ * {incarnation, status}: the new value wins if its incarnation
+ * is greater, or if the incarnations are equal and its status is
+ * "bigger". Statuses are ordered by their identifiers, so
+ * "alive" < "dead". This protects from the case when a member is
+ * detected as dead on one instance, but overridden by an "alive"
+ * message with the same incarnation from another instance.
+ */
+static inline void
+swim_member_update_status(struct swim_member *member,
+			  enum swim_member_status new_status,
+			  uint64_t incarnation, struct swim *swim)
+{
+	/* Used in the assertion only. */
+	(void) swim;
+	/*
+	 * This instance is the only source of truth about itself,
+	 * so self is never updated from remote. Refutation is
+	 * handled separately.
+	 */
+	assert(member != swim->self);
+	/* An older incarnation never overrides a newer one. */
+	if (incarnation < member->incarnation)
+		return;
+	/* The same incarnation - only a "bigger" status wins. */
+	if (incarnation == member->incarnation &&
+	    new_status <= member->status)
+		return;
+	member->incarnation = incarnation;
+	member->status = new_status;
+	swim_member_status_is_updated(member);
+}
+
/**
* A helper to get a pointer to a SWIM instance having only a
* pointer to it scheduler. It is used by task complete functions.
@@ -276,6 +395,11 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
mh_swim_table_del(swim->members, rc, NULL);
rlist_del_entry(member, in_queue_round);
+ /* Failure detection component. */
+ rlist_del_entry(member, in_queue_wait_ack);
+ swim_task_destroy(&member->ack_task);
+ swim_task_destroy(&member->ping_task);
+
free(member);
}
@@ -290,13 +414,34 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
return *mh_swim_table_node(swim->members, node);
}
+/**
+ * Completion callback of a member's ping task. Once a ping is
+ * sent, the member starts waiting for an ACK. If the send has
+ * failed, it makes no sense to wait for an ACK.
+ */
+static void
+swim_ping_task_complete(struct swim_task *task,
+			struct swim_scheduler *scheduler, int rc)
+{
+	if (rc == 0) {
+		struct swim_member *member =
+			container_of(task, struct swim_member, ping_task);
+		swim_member_wait_ack(swim_by_scheduler(scheduler), member);
+	}
+}
+
/**
* Register a new member with a specified status. Here it is
* added to the hash, to the 'next' queue.
*/
static struct swim_member *
swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
- const struct tt_uuid *uuid, enum swim_member_status status)
+ const struct tt_uuid *uuid, enum swim_member_status status,
+ uint64_t incarnation)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -318,6 +463,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
}
rlist_add_entry(&swim->queue_round, member, in_queue_round);
+ /* Failure detection component. */
+ member->incarnation = incarnation;
+ rlist_create(&member->in_queue_wait_ack);
+ swim_task_create(&member->ack_task, NULL, NULL);
+ swim_task_create(&member->ping_task, swim_ping_task_complete, NULL);
+
say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
return member;
}
@@ -402,7 +553,7 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
break;
size = new_size;
swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
- m->status);
+ m->status, m->incarnation);
memcpy(pos, &member_bin, sizeof(member_bin));
/*
* First random member could be choosen too close
@@ -439,6 +590,26 @@ swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet)
return 1;
}
+/**
+ * Encode the failure detection section: a message of @a type
+ * carrying the current incarnation of this instance.
+ * @retval 0 Not error, but nothing is encoded -
+ *         swim_packet_alloc() could not provide space.
+ * @retval 1 The section is encoded.
+ */
+static int
+swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
+			      enum swim_fd_msg_type type)
+{
+	struct swim_fd_header_bin bin;
+	char *dst = swim_packet_alloc(packet, sizeof(bin));
+	if (dst == NULL)
+		return 0;
+	swim_fd_header_bin_create(&bin, type, swim->self->incarnation);
+	memcpy(dst, &bin, sizeof(bin));
+	return 1;
+}
+
/** Encode SWIM components into a UDP packet. */
static void
swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
@@ -447,9 +618,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
char *header = swim_packet_alloc(packet, 1);
int map_size = 0;
map_size += swim_encode_src_uuid(swim, packet);
+ map_size += swim_encode_failure_detection(swim, packet,
+ SWIM_FD_MSG_PING);
map_size += swim_encode_anti_entropy(swim, packet);
- assert(mp_sizeof_map(map_size) == 1 && map_size == 2);
+ assert(mp_sizeof_map(map_size) == 1 && map_size >= 2);
mp_encode_map(header, map_size);
}
@@ -494,8 +667,89 @@ swim_round_step_complete(struct swim_task *task,
(void) task;
struct swim *swim = swim_by_scheduler(scheduler);
ev_periodic_start(loop(), &swim->round_tick);
- rlist_shift_entry(&swim->queue_round, struct swim_member,
- in_queue_round);
+ struct swim_member *m =
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+ if (rc == 0) {
+ /*
+ * Each round message contains failure detection
+ * section with a ping.
+ */
+ swim_member_wait_ack(swim, m);
+ }
+}
+
+/**
+ * Build a failure detection message of @a type and schedule its
+ * send to @a dst with @a task. The message is a map of two
+ * sections, encoded in this order: source UUID, then failure
+ * detection.
+ */
+static void
+swim_send_fd_request(struct swim *swim, struct swim_task *task,
+		     const struct sockaddr_in *dst, enum swim_fd_msg_type type)
+{
+	struct swim_packet *packet = &task->packet;
+	/* The task can be reused, so drop the previous content. */
+	swim_packet_create(packet);
+	char *map_header = swim_packet_alloc(packet, 1);
+	/* Sequenced on purpose - sections are appended in order. */
+	int key_count = swim_encode_src_uuid(swim, packet);
+	key_count += swim_encode_failure_detection(swim, packet, type);
+	assert(key_count == 2);
+	mp_encode_map(map_header, key_count);
+	say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type],
+		    sio_strfaddr((struct sockaddr *) dst, sizeof(*dst)));
+	swim_task_send(task, dst, &swim->scheduler);
+}
+
+/**
+ * Schedule send of an ack to @a dst using @a task. A thin
+ * wrapper around swim_send_fd_request().
+ */
+static inline void
+swim_send_ack(struct swim *swim, struct swim_task *task,
+ const struct sockaddr_in *dst)
+{
+ swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_ACK);
+}
+
+/**
+ * Schedule send of a ping to @a dst using @a task. A thin
+ * wrapper around swim_send_fd_request().
+ */
+static inline void
+swim_send_ping(struct swim *swim, struct swim_task *task,
+ const struct sockaddr_in *dst)
+{
+ swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_PING);
+}
+
+/**
+ * Check for unacknowledged pings. A ping is unacknowledged if an
+ * ack was not received during the ack timeout. For each expired
+ * waiter the counter of unacknowledged pings is incremented:
+ * - an alive member becomes dead after NO_ACKS_TO_DEAD of them;
+ * - a dead member is deleted after NO_ACKS_TO_GC of them.
+ * Every member which was not deleted gets its ping resent and
+ * leaves the waiting queue until the new ping is sent.
+ */
+static void
+swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+	assert((events & EV_PERIODIC) != 0);
+	(void) loop;
+	(void) events;
+	struct swim *swim = (struct swim *) p->data;
+	struct swim_member *m, *tmp;
+	double current_time = fiber_time();
+	rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack,
+				 tmp) {
+		/* The queue is sorted by deadline, the rest are not due. */
+		if (current_time < m->ping_deadline)
+			break;
+		++m->unacknowledged_pings;
+		switch (m->status) {
+		case MEMBER_ALIVE:
+			if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
+				m->status = MEMBER_DEAD;
+				swim_member_status_is_updated(m);
+			}
+			break;
+		case MEMBER_DEAD:
+			if (m->unacknowledged_pings >= NO_ACKS_TO_GC) {
+				/*
+				 * The member is freed here together
+				 * with its ping task, and is unlinked
+				 * from the waiting queue by
+				 * swim_member_delete() itself. It must
+				 * not be touched anymore.
+				 */
+				swim_member_delete(swim, m);
+				continue;
+			}
+			break;
+		default:
+			unreachable();
+		}
+		swim_send_ping(swim, &m->ping_task, &m->addr);
+		rlist_del_entry(m, in_queue_wait_ack);
+	}
 }
/**
@@ -536,7 +790,11 @@ static inline void
swim_member_update_addr(struct swim_member *member,
const struct sockaddr_in *addr)
{
- member->addr = *addr;
+ if (addr->sin_port != member->addr.sin_port ||
+ addr->sin_addr.s_addr != member->addr.sin_addr.s_addr) {
+ member->addr = *addr;
+ swim_member_status_is_updated(member);
+ }
}
/**
@@ -550,11 +808,50 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
{
struct swim_member *member = swim_find_member(swim, &def->uuid);
if (member == NULL) {
+ if (def->status == MEMBER_DEAD) {
+ /*
+ * Do not 'resurrect' dead members to
+ * prevent 'ghost' members. Ghost member
+ * is a one declared as dead, sent via
+ * anti-entropy, and removed from local
+ * members table, but then returned back
+ * from received anti-entropy, as again
+ * dead. Such dead members could 'live'
+ * forever.
+ */
+ return NULL;
+ }
member = swim_member_new(swim, &def->addr, &def->uuid,
- def->status);
+ def->status, def->incarnation);
return member;
}
- swim_member_update_addr(member, &def->addr);
+ struct swim_member *self = swim->self;
+ if (member != self) {
+ if (def->incarnation >= member->incarnation) {
+ swim_member_update_addr(member, &def->addr);
+ swim_member_update_status(member, def->status,
+ def->incarnation, swim);
+ }
+ return member;
+ }
+ /*
+ * It is possible that other instances know a bigger
+ * incarnation of this instance - such thing happens when
+ * the instance restarts and loses its local incarnation
+ * number. It will be restored by receiving dissemination
+ * and anti-entropy messages about self.
+ */
+ if (self->incarnation < def->incarnation)
+ self->incarnation = def->incarnation;
+ if (def->status != MEMBER_ALIVE &&
+ def->incarnation == self->incarnation) {
+ /*
+ * In the cluster a gossip exists that this
+ * instance is not alive. Refute this information
+ * with a bigger incarnation.
+ */
+ self->incarnation++;
+ }
return member;
}
@@ -581,12 +878,54 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
return 0;
}
+/**
+ * Decode a failure detection message sent by the member with
+ * @a uuid from @a src, upsert the sender into the member table,
+ * and react to the message: answer a ping with an ack, or stop
+ * waiting for an ack when it arrives.
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+static int
+swim_process_failure_detection(struct swim *swim, const char **pos,
+			       const char *end, const struct sockaddr_in *src,
+			       const struct tt_uuid *uuid)
+{
+	const char *msg_pref = "invalid failure detection message:";
+	struct swim_failure_detection_def def;
+	struct swim_member_def mdef;
+	if (swim_failure_detection_def_decode(&def, pos, end, msg_pref) != 0)
+		return -1;
+	/*
+	 * The sender has just sent a message, so it is alive. All
+	 * fields are set explicitly instead of wiping whatever
+	 * swim_member_def_create() has set with memset().
+	 */
+	swim_member_def_create(&mdef);
+	mdef.addr = *src;
+	mdef.incarnation = def.incarnation;
+	mdef.uuid = *uuid;
+	mdef.status = MEMBER_ALIVE;
+	struct swim_member *member = swim_update_member(swim, &mdef);
+	if (member == NULL)
+		return -1;
+
+	switch (def.type) {
+	case SWIM_FD_MSG_PING:
+		swim_send_ack(swim, &member->ack_task, &member->addr);
+		break;
+	case SWIM_FD_MSG_ACK:
+		if (def.incarnation >= member->incarnation) {
+			/*
+			 * swim_update_member() resets the counter
+			 * only when the status, incarnation, or
+			 * address has changed. An ack from a member
+			 * already known as alive with the same
+			 * incarnation changes nothing there, so the
+			 * counter is reset here - the ping is
+			 * acknowledged.
+			 */
+			member->unacknowledged_pings = 0;
+			rlist_del_entry(member, in_queue_wait_ack);
+		}
+		break;
+	default:
+		unreachable();
+	}
+	return 0;
+}
+
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
const char *end, const struct sockaddr_in *src)
{
- (void) src;
const char *msg_pref = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
@@ -617,6 +956,12 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
if (swim_process_anti_entropy(swim, &pos, end) != 0)
goto error;
break;
+ case SWIM_FAILURE_DETECTION:
+ say_verbose("SWIM: process failure detection");
+ if (swim_process_failure_detection(swim, &pos, end,
+ src, &uuid) != 0)
+ goto error;
+ break;
default:
diag_set(SwimError, "%s unexpected key", msg_pref);
goto error;
@@ -649,6 +994,12 @@ swim_new(void)
swim_task_create(&swim->round_step_task, swim_round_step_complete,
NULL);
swim_scheduler_create(&swim->scheduler, swim_on_input);
+
+ /* Failure detection component. */
+ rlist_create(&swim->queue_wait_ack);
+ ev_init(&swim->wait_ack_tick, swim_check_acks);
+ ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT_DEFAULT, NULL);
+ swim->wait_ack_tick.data = (void *) swim;
return swim;
}
@@ -674,12 +1025,12 @@ swim_uri_to_addr(const char *uri, struct sockaddr_in *addr,
int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
- const struct tt_uuid *uuid)
+ double ack_timeout, const struct tt_uuid *uuid)
{
const char *msg_pref = "swim.cfg:";
- if (heartbeat_rate < 0) {
- diag_set(IllegalParams, "%s heartbeat_rate should be a "\
- "positive number", msg_pref);
+ if (heartbeat_rate < 0 || ack_timeout < 0) {
+ diag_set(IllegalParams, "%s heartbeat_rate and ack_timeout "\
+ "should be positive numbers", msg_pref);
return -1;
}
struct sockaddr_in addr;
@@ -692,7 +1043,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
"a first config", msg_pref);
return -1;
}
- swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE);
+ swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE,
+ 0);
if (swim->self == NULL)
return -1;
} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -730,7 +1082,11 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
+ if (swim->wait_ack_tick.interval != ack_timeout && ack_timeout > 0)
+ ev_periodic_set(&swim->wait_ack_tick, 0, ack_timeout, NULL);
+
ev_periodic_start(loop(), &swim->round_tick);
+ ev_periodic_start(loop(), &swim->wait_ack_tick);
if (! is_first_cfg) {
swim_member_update_addr(swim->self, &addr);
@@ -766,7 +1122,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
return -1;
struct swim_member *member = swim_find_member(swim, uuid);
if (member == NULL) {
- member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE);
+ member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0);
return member == NULL ? -1 : 0;
}
diag_set(SwimError, "%s a member with such UUID already exists",
@@ -791,6 +1147,23 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid)
return 0;
}
+/**
+ * Send a ping to @a uri. The task is heap-allocated and freed by
+ * swim_task_delete_cb() when it completes or is cancelled.
+ */
+int
+swim_probe_member(struct swim *swim, const char *uri)
+{
+	const char *msg_pref = "swim.probe_member:";
+	struct sockaddr_in addr;
+	if (swim_check_is_configured(swim, msg_pref) != 0 ||
+	    swim_uri_to_addr(uri, &addr, msg_pref) != 0)
+		return -1;
+	struct swim_task *task =
+		swim_task_new(swim_task_delete_cb, swim_task_delete_cb);
+	if (task == NULL)
+		return -1;
+	swim_send_ping(swim, task, &addr);
+	return 0;
+}
+
void
swim_info(struct swim *swim, struct info_handler *info)
{
@@ -806,6 +1179,7 @@ swim_info(struct swim *swim, struct info_handler *info)
info_append_str(info, "status",
swim_member_status_strs[m->status]);
info_append_str(info, "uuid", swim_uuid_str(&m->uuid));
+ info_append_int(info, "incarnation", (int64_t) m->incarnation);
info_table_end(info);
}
info_end(info);
@@ -816,6 +1190,7 @@ swim_delete(struct swim *swim)
{
swim_scheduler_destroy(&swim->scheduler);
ev_periodic_stop(loop(), &swim->round_tick);
+ ev_periodic_stop(loop(), &swim->wait_ack_tick);
swim_task_destroy(&swim->round_step_task);
mh_int_t node = mh_first(swim->members);
while (node != mh_end(swim->members)) {
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index a98decc86..9d21a739d 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -55,6 +55,8 @@ swim_new(void);
* @heartbeat_rate seconds. It is rather the protocol
* speed. Protocol period depends on member count and
* @heartbeat_rate.
+ * @param ack_timeout Time in seconds after which a ping is
+ * considered to be unacknowledged.
* @param uuid UUID of this instance. Must be unique over the
* cluster.
*
@@ -63,7 +65,7 @@ swim_new(void);
*/
int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
- const struct tt_uuid *uuid);
+ double ack_timeout, const struct tt_uuid *uuid);
/**
* Stop listening and broadcasting messages, cleanup all internal
@@ -80,6 +82,13 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid);
int
swim_remove_member(struct swim *swim, const struct tt_uuid *uuid);
+/**
+ * Send a ping to this address. If an ACK is received, the member
+ * will be added.
+ * @param swim SWIM instance to send the ping from. It must be
+ *        already configured.
+ * @param uri URI of the member to probe.
+ *
+ * @retval 0 The ping is scheduled. It does not mean that it is
+ *         delivered.
+ * @retval -1 Error.
+ */
+int
+swim_probe_member(struct swim *swim, const char *uri);
+
/** Dump member statuses into @a info. */
void
swim_info(struct swim *swim, struct info_handler *info);
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 20973acaf..170d7af77 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -65,6 +65,27 @@ swim_task_create(struct swim_task *task, swim_task_f complete,
rlist_create(&task->in_queue_output);
}
+/**
+ * Allocate a task with malloc() and initialize it with
+ * @a complete and @a cancel callbacks.
+ * @retval NULL Memory error, diag is set.
+ * @retval not NULL The new task.
+ */
+struct swim_task *
+swim_task_new(swim_task_f complete, swim_task_f cancel)
+{
+	struct swim_task *task = (struct swim_task *) malloc(sizeof(*task));
+	if (task != NULL) {
+		swim_task_create(task, complete, cancel);
+		return task;
+	}
+	diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+	return NULL;
+}
+
+/**
+ * Task callback which frees the task. Intended for tasks made by
+ * swim_task_new(); it can serve as both the complete and the
+ * cancel callback. The send result is ignored.
+ */
+void
+swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
+ int rc)
+{
+ (void) rc;
+ (void) scheduler;
+ free(task);
+}
+
/** Put the task into the queue of output tasks. */
static inline void
swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler)
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 68fb89818..508d1ef6e 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -215,6 +215,15 @@ void
swim_task_create(struct swim_task *task, swim_task_f complete,
swim_task_f cancel);
+/**
+ * Allocate and create a new task. The task is allocated with
+ * malloc(), so it can be freed by swim_task_delete_cb().
+ * @retval NULL Memory error.
+ * @retval not NULL The new task.
+ */
+struct swim_task *
+swim_task_new(swim_task_f complete, swim_task_f cancel);
+
+/**
+ * Callback to delete a task after its completion. It frees a task
+ * created by swim_task_new() and can be used as both the complete
+ * and the cancel callback.
+ */
+void
+swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
+ int rc);
+
/** Destroy the task, pop from the queue. */
static inline void
swim_task_destroy(struct swim_task *task)
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index a273cb815..542b988c1 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -36,6 +36,12 @@
+/** Printable names of enum swim_member_status, indexed by status. */
const char *swim_member_status_strs[] = {
"alive",
+ "dead",
+};
+
+/**
+ * Printable names of enum swim_fd_msg_type, indexed by the
+ * message type. Used in logs.
+ */
+const char *swim_fd_msg_type_strs[] = {
+ "ping",
+ "ack",
};
int
@@ -178,6 +184,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member uuid") != 0)
return -1;
break;
+ case SWIM_MEMBER_INCARNATION:
+ if (swim_decode_uint(pos, end, &def->incarnation, msg_pref,
+ "member incarnation") != 0)
+ return -1;
+ break;
default:
unreachable();
}
@@ -225,6 +236,70 @@ swim_src_uuid_bin_create(struct swim_src_uuid_bin *header,
memcpy(header->v_uuid, uuid, UUID_LEN);
}
+/**
+ * Fill the failure detection section template: the key
+ * SWIM_FAILURE_DETECTION followed by a 2-entry map of message
+ * type and sender incarnation. The incarnation always uses the
+ * 64-bit uint marker, so the section has a fixed size.
+ */
+void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+			  enum swim_fd_msg_type type, uint64_t incarnation)
+{
+	*header = (struct swim_fd_header_bin) {
+		.k_header = SWIM_FAILURE_DETECTION,
+		/* mp_encode_map(2). */
+		.m_header = 0x82,
+		.k_type = SWIM_FD_MSG_TYPE,
+		.v_type = type,
+		.k_incarnation = SWIM_FD_INCARNATION,
+		/* mp_encode_uint() marker of a 64-bit value. */
+		.m_incarnation = 0xcf,
+		.v_incarnation = mp_bswap_u64(incarnation),
+	};
+}
+
+/**
+ * Decode the failure detection section. It is a map with exactly
+ * two keys: message type and sender incarnation. Both keys are
+ * mandatory. A map repeating one key instead of carrying both is
+ * rejected, so a missing incarnation is never silently decoded
+ * as 0.
+ */
+int
+swim_failure_detection_def_decode(struct swim_failure_detection_def *def,
+				  const char **pos, const char *end,
+				  const char *msg_pref)
+{
+	uint32_t size;
+	if (swim_decode_map(pos, end, &size, msg_pref, "root") != 0)
+		return -1;
+	memset(def, 0, sizeof(*def));
+	def->type = swim_fd_msg_type_MAX;
+	bool is_incarnation_set = false;
+	if (size != 2) {
+		diag_set(SwimError, "%s root map should have two keys - "\
+			 "message type and incarnation", msg_pref);
+		return -1;
+	}
+	for (int i = 0; i < (int) size; ++i) {
+		uint64_t key;
+		if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0)
+			return -1;
+		switch(key) {
+		case SWIM_FD_MSG_TYPE:
+			if (swim_decode_uint(pos, end, &key, msg_pref,
+					     "message type") != 0)
+				return -1;
+			if (key >= swim_fd_msg_type_MAX) {
+				diag_set(SwimError, "%s unknown message type",
+					 msg_pref);
+				return -1;
+			}
+			def->type = key;
+			break;
+		case SWIM_FD_INCARNATION:
+			if (swim_decode_uint(pos, end, &def->incarnation,
+					     msg_pref, "incarnation") != 0)
+				return -1;
+			is_incarnation_set = true;
+			break;
+		default:
+			diag_set(SwimError, "%s unexpected key", msg_pref);
+			return -1;
+		}
+	}
+	if (def->type == swim_fd_msg_type_MAX) {
+		diag_set(SwimError, "%s message type should be specified",
+			 msg_pref);
+		return -1;
+	}
+	if (! is_incarnation_set) {
+		diag_set(SwimError, "%s incarnation should be specified",
+			 msg_pref);
+		return -1;
+	}
+	return 0;
+}
+
void
swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
uint16_t batch_size)
@@ -237,18 +312,19 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status)
+ enum swim_member_status status, uint64_t incarnation)
{
header->v_status = status;
header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
header->v_port = mp_bswap_u16(addr->sin_port);
memcpy(header->v_uuid, uuid, UUID_LEN);
+ /* Byte-swapped the same way as the address and port above. */
+ header->v_incarnation = mp_bswap_u64(incarnation);
}
void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x84;
+ header->m_header = 0x85;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDRESS;
header->m_addr = 0xce;
@@ -257,6 +333,8 @@ swim_member_bin_create(struct swim_member_bin *header)
header->k_uuid = SWIM_MEMBER_UUID;
header->m_uuid = 0xc4;
header->m_uuid_len = UUID_LEN;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
}
void
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 6e36f0b07..91a0bca9d 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -51,12 +51,20 @@
* | |
* | AND |
* | |
+ * | SWIM_FAILURE_DETECTION: { |
+ * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, |
+ * | SWIM_FD_INCARNATION: uint |
+ * | }, |
+ * | |
+ * | OR/AND |
+ * | |
* | SWIM_ANTI_ENTROPY: [ |
* | { |
* | SWIM_MEMBER_STATUS: uint, enum member_status, |
* | SWIM_MEMBER_ADDRESS: uint, ip, |
* | SWIM_MEMBER_PORT: uint, port, |
- * | SWIM_MEMBER_UUID: 16 byte UUID |
+ * | SWIM_MEMBER_UUID: 16 byte UUID, |
+ * | SWIM_MEMBER_INCARNATION: uint |
* | }, |
* | ... |
* | ], |
@@ -67,6 +75,11 @@
enum swim_member_status {
/** The instance is ok, responds to requests. */
MEMBER_ALIVE = 0,
+ /**
+ * The member is considered to be dead. It is removed from
+ * the membership after more unacknowledged pings. Its value
+ * is bigger than MEMBER_ALIVE on purpose: with equal
+ * incarnations a status with a bigger identifier wins.
+ */
+ MEMBER_DEAD,
swim_member_status_MAX,
};
@@ -79,6 +92,7 @@ extern const char *swim_member_status_strs[];
struct swim_member_def {
struct tt_uuid uuid;
struct sockaddr_in addr;
+ /** Growing number to refute old information about the member. */
+ uint64_t incarnation;
enum swim_member_status status;
};
@@ -109,6 +123,7 @@ swim_member_def_decode(struct swim_member_def *def, const char **pos,
enum swim_body_key {
SWIM_SRC_UUID = 0,
SWIM_ANTI_ENTROPY,
+ /** Failure detection section: a ping or an ack. */
+ SWIM_FAILURE_DETECTION,
};
/**
@@ -131,6 +146,79 @@ void
swim_src_uuid_bin_create(struct swim_src_uuid_bin *header,
const struct tt_uuid *uuid);
+/** {{{ Failure detection component */
+
+/**
+ * Failure detection component keys. They are the keys of the map
+ * stored under SWIM_FAILURE_DETECTION.
+ */
+enum swim_fd_key {
+ /** Type of the failure detection message: ping or ack. */
+ SWIM_FD_MSG_TYPE,
+ /**
+ * Incarnation of the sender. It makes the member alive if
+ * it was considered to be dead, but a ping/ack with a
+ * greater incarnation was received from it.
+ */
+ SWIM_FD_INCARNATION,
+};
+
+/** Failure detection message type. */
+enum swim_fd_msg_type {
+ /** A request to respond with an ack. */
+ SWIM_FD_MSG_PING,
+ /** A response to a ping. */
+ SWIM_FD_MSG_ACK,
+ /** Number of message types; not a valid type itself. */
+ swim_fd_msg_type_MAX,
+};
+
+/** Printable names of enum swim_fd_msg_type, indexed by type. */
+extern const char *swim_fd_msg_type_strs[];
+
+/**
+ * SWIM failure detection MessagePack header template. It is
+ * filled by swim_fd_header_bin_create() and copied into a packet
+ * with memcpy() as is, so the packed layout below is the wire
+ * format: do not reorder or resize the fields.
+ */
+struct PACKED swim_fd_header_bin {
+ /** mp_encode_uint(SWIM_FAILURE_DETECTION) */
+ uint8_t k_header;
+ /** mp_encode_map(2) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_FD_MSG_TYPE) */
+ uint8_t k_type;
+ /** mp_encode_uint(enum swim_fd_msg_type) */
+ uint8_t v_type;
+
+ /** mp_encode_uint(SWIM_FD_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/**
+ * Initialize failure detection section.
+ * @param[out] header Template to fill.
+ * @param type Message type.
+ * @param incarnation Incarnation of the sender.
+ */
+void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+ enum swim_fd_msg_type type, uint64_t incarnation);
+
+/**
+ * A decoded failure detection message. Filled by
+ * swim_failure_detection_def_decode().
+ */
+struct swim_failure_detection_def {
+ /** Type of the message. */
+ enum swim_fd_msg_type type;
+ /** Incarnation of the sender. */
+ uint64_t incarnation;
+};
+
+/**
+ * Decode failure detection from a MessagePack buffer. The section
+ * is expected to be a map with two keys: message type and
+ * incarnation.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos Start of the MessagePack buffer.
+ * @param end End of the MessagePack buffer.
+ * @param msg_pref A prefix of an error message to use for
+ * diag_set, when something is wrong.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+swim_failure_detection_def_decode(struct swim_failure_detection_def *def,
+ const char **pos, const char *end,
+ const char *msg_pref);
+
+/** }}} Failure detection component */
+
/** {{{ Anti-entropy component */
/**
@@ -142,6 +230,7 @@ enum swim_member_key {
SWIM_MEMBER_ADDRESS,
SWIM_MEMBER_PORT,
SWIM_MEMBER_UUID,
+ SWIM_MEMBER_INCARNATION,
swim_member_key_MAX,
};
@@ -164,7 +253,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
* anti-entropy section.
*/
struct PACKED swim_member_bin {
- /** mp_encode_map(4) */
+ /** mp_encode_map(5) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -190,6 +279,12 @@ struct PACKED swim_member_bin {
uint8_t m_uuid;
uint8_t m_uuid_len;
uint8_t v_uuid[UUID_LEN];
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
};
/** Initialize antri-entropy record. */
@@ -205,7 +300,7 @@ swim_member_bin_create(struct swim_member_bin *header);
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status);
+ enum swim_member_status status, uint64_t incarnation);
/** }}} Anti-entropy component */
diff --git a/src/lua/swim.c b/src/lua/swim.c
index 317f53e73..a20c2dc0d 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -192,8 +192,10 @@ lua_swim_cfg_impl(struct lua_State *L, int ncfg, struct swim *swim,
lua_swim_get_uuid_field(L, ncfg, "uuid", funcname, &uuid);
double heartbeat_rate =
lua_swim_get_timeout_field(L, ncfg, "heartbeat", funcname);
+ double ack_timeout =
+ lua_swim_get_timeout_field(L, ncfg, "ack_timeout", funcname);
- return swim_cfg(swim, server_uri, heartbeat_rate, &uuid);
+ return swim_cfg(swim, server_uri, heartbeat_rate, ack_timeout, &uuid);
}
/**
@@ -348,6 +350,35 @@ lua_swim_info(struct lua_State *L)
return 1;
}
+/**
+ * Send a ping to a URI assuming that there is a member, which
+ * will respond with an ack, and will be added to the local
+ * members table. The Lua stack should contain two values - a SWIM
+ * instance to probe by, and a URI of a member.
+ * @param L Lua state.
+ * @retval 1 True.
+ * @retval 2 Nil and an error object, for example on OOM. On
+ *         invalid Lua parameters it throws.
+ */
+static int
+lua_swim_probe_member(struct lua_State *L)
+{
+	struct swim *swim = lua_swim_ptr(L, 1);
+	if (swim == NULL || lua_gettop(L) != 2)
+		return luaL_error(L, "Usage: swim:probe_member(uri)");
+	if (! lua_isstring(L, 2)) {
+		return luaL_error(L, "swim.probe_member: member URI should "\
+				  "be a string");
+	}
+	const char *uri = lua_tostring(L, 2);
+	if (swim_probe_member(swim, uri) == 0) {
+		lua_pushboolean(L, true);
+		return 1;
+	}
+	lua_pushnil(L);
+	luaT_pusherror(L, diag_last_error(diag_get()));
+	return 2;
+}
+
void
tarantool_lua_swim_init(struct lua_State *L)
{
@@ -358,6 +389,7 @@ tarantool_lua_swim_init(struct lua_State *L)
{"remove_member", lua_swim_remove_member},
{"delete", lua_swim_delete},
{"info", lua_swim_info},
+ {"probe_member", lua_swim_probe_member},
{NULL, NULL}
};
luaL_register_module(L, "swim", lua_swim_methods);
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list