[tarantool-patches] [PATCH 6/6] [RAW] swim: introduce failure detection component
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Mar 20 13:49:19 MSK 2019
Failure detection components allows to find which members are
already dead.
Part of #3234
---
src/lib/swim/swim.c | 450 +++++++++++++++++++++++++++++++-
src/lib/swim/swim.h | 36 ++-
src/lib/swim/swim_io.c | 23 +-
src/lib/swim/swim_io.h | 16 ++
src/lib/swim/swim_proto.c | 82 +++++-
src/lib/swim/swim_proto.h | 101 ++++++-
test/unit/swim.c | 160 +++++++++++-
test/unit/swim.result | 34 ++-
test/unit/swim_test_transport.c | 16 +-
test/unit/swim_test_transport.h | 9 +
test/unit/swim_test_utils.c | 71 ++++-
test/unit/swim_test_utils.h | 30 +++
12 files changed, 991 insertions(+), 37 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index df34ce247..f97a2f993 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -38,6 +38,8 @@
#include "info/info.h"
#include "assoc.h"
#include "sio.h"
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
/**
* SWIM - Scalable Weakly-consistent Infection-style Process Group
@@ -135,6 +137,31 @@ enum {
* value.
*/
HEARTBEAT_RATE_DEFAULT = 1,
+ /**
+ * If a ping was sent, it is considered to be lost after
+ * this time without an ack. Nothing special in this
+ * value.
+ */
+ ACK_TIMEOUT_DEFAULT = 30,
+ /**
+ * If a member has not been responding to pings this
+ * number of times, it is considered to be dead.
+ * According to the SWIM paper, for a member it is enough
+ * to do not respond on one direct ping, and on K
+ * simultanous indirect pings, to be considered as dead.
+ * Seems too little, so here it is bigger.
+ */
+ NO_ACKS_TO_DEAD = 3,
+ /**
+ * If a member confirmed to be dead, it is removed from
+ * the membership after at least this number of
+ * unacknowledged pings. According to the SWIM paper, a
+ * dead member is deleted immediately. But here it is held
+ * for a while to 1) maybe refute its dead status, 2)
+ * disseminate the status via dissemination and
+ * anti-entropy components.
+ */
+ NO_ACKS_TO_GC_DEFAULT = 2,
};
/**
@@ -213,6 +240,31 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_round_queue;
+ /**
+ *
+ * Failure detection component
+ */
+ /** Growing number to refute old messages. */
+ uint64_t incarnation;
+ /**
+ * How many pings did not receive an ack in a row being in
+ * the current status. After a threshold the instance is
+ * marked as dead. After more it is removed from the
+ * table. On each status or incarnation change this
+ * counter is reset.
+ */
+ int unacknowledged_pings;
+ /**
+ * When the latest ping is considered to be
+ * unacknowledged.
+ */
+ double ping_deadline;
+ /** Ready at hand regular ACK task. */
+ struct swim_task ack_task;
+ /** Ready at hand regular PING task. */
+ struct swim_task ping_task;
+ /** Position in a queue of members waiting for an ack. */
+ struct heap_node in_wait_ack_heap;
};
#define mh_name _swim_table
@@ -230,6 +282,12 @@ struct mh_swim_table_key {
#define MH_SOURCE 1
#include "salad/mhash.h"
+#define HEAP_NAME wait_ack_heap
+#define HEAP_LESS(h, a, b) ((a)->ping_deadline < (b)->ping_deadline)
+#define heap_value_t struct swim_member
+#define heap_value_attr in_wait_ack_heap
+#include "salad/heap.h"
+
/**
* SWIM instance. Stores configuration, manages periodical tasks,
* rounds. Each member has an object of this type on its host,
@@ -285,8 +343,80 @@ struct swim {
* starting from this instance.
*/
mh_int_t iterator;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * Members waiting for an ACK. On too long absence of an
+ * ACK a member is considered to be dead and is removed.
+ * The heap is sorted by deadline in ascending order
+ * (bottom is newer, top is older).
+ */
+ heap_t wait_ack_heap;
+ /** Generator of ack checking events. */
+ struct ev_timer wait_ack_tick;
+ /**
+ * How many pings to a dead member should be
+ * unacknowledged to delete it from the member table.
+ */
+ int no_acks_to_gc;
};
+/** Put the member into a list of ACK waiters. */
+static void
+swim_wait_ack(struct swim *swim, struct swim_member *member)
+{
+ if (heap_node_is_stray(&member->in_wait_ack_heap)) {
+ member->ping_deadline = swim_time() + swim->wait_ack_tick.at;
+ wait_ack_heap_insert(&swim->wait_ack_heap, member);
+ swim_ev_timer_start(loop(), &swim->wait_ack_tick);
+ }
+}
+
+/**
+ * Make all needed actions to process a member's update like a
+ * change of its status, or incarnation, or both.
+ */
+static void
+swim_on_member_update(struct swim *swim, struct swim_member *member)
+{
+ (void) swim;
+ member->unacknowledged_pings = 0;
+}
+
+/**
+ * Update status and incarnation of the member if needed. Statuses
+ * are compared as a compound key: {incarnation, status}. So @a
+ * new_status can override an old one only if its incarnation is
+ * greater, or the same, but its status is "bigger". Statuses are
+ * compared by their identifier, so "alive" < "dead". This
+ * protects from the case when a member is detected as dead on one
+ * instance, but overriden by another instance with the same
+ * incarnation "alive" message.
+ */
+static inline void
+swim_update_member_inc_status(struct swim *swim, struct swim_member *member,
+ enum swim_member_status new_status,
+ uint64_t incarnation)
+{
+ /*
+ * Source of truth about self is this instance and it is
+ * never updated from remote. Refutation is handled
+ * separately.
+ */
+ assert(member != swim->self);
+ if (member->incarnation < incarnation) {
+ member->status = new_status;
+ member->incarnation = incarnation;
+ swim_on_member_update(swim, member);
+ } else if (member->incarnation == incarnation &&
+ member->status < new_status) {
+ member->status = new_status;
+ swim_on_member_update(swim, member);
+ }
+}
+
int
swim_fd(const struct swim *swim)
{
@@ -304,11 +434,37 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
return container_of(scheduler, struct swim, scheduler);
}
+/**
+ * Once a ping is sent, the member should start waiting for an
+ * ACK.
+ */
+static void
+swim_ping_task_complete(struct swim_task *task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ /*
+ * If ping send has failed, it makes to sense to wait for
+ * an ACK.
+ */
+ if (rc < 0)
+ return;
+ struct swim *swim = swim_by_scheduler(scheduler);
+ struct swim_member *m = container_of(task, struct swim_member,
+ ping_task);
+ swim_wait_ack(swim, m);
+}
+
/** Free member's resources. */
static inline void
swim_member_delete(struct swim_member *member)
{
assert(rlist_empty(&member->in_round_queue));
+
+ /* Failure detection component. */
+ assert(heap_node_is_stray(&member->in_wait_ack_heap));
+ swim_task_destroy(&member->ack_task);
+ swim_task_destroy(&member->ping_task);
+
free(member);
}
@@ -330,7 +486,7 @@ swim_reserve_one_member(struct swim *swim)
/** Create a new member. It is not registered anywhere here. */
static struct swim_member *
swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status)
+ enum swim_member_status status, uint64_t incarnation)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -343,6 +499,13 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
member->uuid = *uuid;
member->hash = swim_uuid_hash(uuid);
rlist_create(&member->in_round_queue);
+
+ /* Failure detection component. */
+ member->incarnation = incarnation;
+ heap_node_create(&member->in_wait_ack_heap);
+ swim_task_create(&member->ack_task, NULL, NULL, "ack");
+ swim_task_create(&member->ping_task, swim_ping_task_complete, NULL,
+ "ping");
return member;
}
@@ -360,6 +523,11 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
assert(rc != mh_end(swim->members));
mh_swim_table_del(swim->members, rc, NULL);
rlist_del_entry(member, in_round_queue);
+
+ /* Failure detection component. */
+ if (! heap_node_is_stray(&member->in_wait_ack_heap))
+ wait_ack_heap_delete(&swim->wait_ack_heap, member);
+
swim_member_delete(member);
}
@@ -383,7 +551,8 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
*/
static struct swim_member *
swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
- const struct tt_uuid *uuid, enum swim_member_status status)
+ const struct tt_uuid *uuid, enum swim_member_status status,
+ uint64_t incarnation)
{
int new_bsize = sizeof(swim->shuffled[0]) *
(mh_size(swim->members) + 1);
@@ -394,7 +563,17 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
return NULL;
}
swim->shuffled = new_shuffled;
- struct swim_member *member = swim_member_new(addr, uuid, status);
+ /*
+ * Reserve one more slot to never fail push into the ack
+ * waiters heap.
+ */
+ if (wait_ack_heap_reserve(&swim->wait_ack_heap) != 0) {
+ diag_set(OutOfMemory, sizeof(struct heap_node), "realloc",
+ "wait_ack_heap");
+ return NULL;
+ }
+ struct swim_member *member =
+ swim_member_new(addr, uuid, status, incarnation);
if (member == NULL)
return NULL;
assert(swim_find_member(swim, uuid) == NULL);
@@ -489,7 +668,7 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
if (swim_packet_reserve(packet, new_size) == NULL)
break;
swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
- m->status);
+ m->status, m->incarnation);
memcpy(pos + size, &member_bin, sizeof(member_bin));
size = new_size;
/*
@@ -526,6 +705,25 @@ swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet)
return 1;
}
+/**
+ * Encode failure detection component.
+ * @retval Number of key-values added to the packet's root map.
+ */
+static int
+swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
+ enum swim_fd_msg_type type)
+{
+ struct swim_fd_header_bin fd_header_bin;
+ int size = sizeof(fd_header_bin);
+ char *pos = swim_packet_alloc(packet, size);
+ if (pos == NULL)
+ return 0;
+ swim_fd_header_bin_create(&fd_header_bin, type,
+ swim->self->incarnation);
+ memcpy(pos, &fd_header_bin, size);
+ return 1;
+}
+
/** Encode SWIM components into a UDP packet. */
static void
swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
@@ -534,9 +732,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
char *header = swim_packet_alloc(packet, 1);
int map_size = 0;
map_size += swim_encode_src_uuid(swim, packet);
+ map_size += swim_encode_failure_detection(swim, packet,
+ SWIM_FD_MSG_PING);
map_size += swim_encode_anti_entropy(swim, packet);
- assert(mp_sizeof_map(map_size) == 1 && map_size == 2);
+ assert(mp_sizeof_map(map_size) == 1 && map_size >= 2);
mp_encode_map(header, map_size);
}
@@ -592,8 +792,92 @@ swim_complete_step(struct swim_task *task,
struct swim_member *m =
rlist_first_entry(&swim->round_queue, struct swim_member,
in_round_queue);
- if (swim_sockaddr_in_eq(&m->addr, &task->dst))
+ if (swim_sockaddr_in_eq(&m->addr, &task->dst)) {
rlist_shift(&swim->round_queue);
+ if (rc > 0) {
+ /*
+ * Each round message contains failure
+ * detection section with a ping.
+ */
+ swim_wait_ack(swim, m);
+ }
+ }
+}
+
+/** Schedule send of a failure detection message. */
+static void
+swim_send_fd_msg(struct swim *swim, struct swim_task *task,
+ const struct sockaddr_in *dst, enum swim_fd_msg_type type)
+{
+ /*
+ * Reset packet allocator in case if task is being reused.
+ */
+ swim_packet_create(&task->packet);
+ char *header = swim_packet_alloc(&task->packet, 1);
+ int map_size = swim_encode_src_uuid(swim, &task->packet);
+ map_size += swim_encode_failure_detection(swim, &task->packet, type);
+ assert(map_size == 2);
+ mp_encode_map(header, map_size);
+ say_verbose("SWIM %d: schedule %s to %s", swim_fd(swim),
+ swim_fd_msg_type_strs[type],
+ sio_strfaddr((struct sockaddr *) dst, sizeof(*dst)));
+ swim_task_send(task, dst, &swim->scheduler);
+}
+
+/** Schedule send of an ack. */
+static inline void
+swim_send_ack(struct swim *swim, struct swim_task *task,
+ const struct sockaddr_in *dst)
+{
+ swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK);
+}
+
+/** Schedule send of a ping. */
+static inline void
+swim_send_ping(struct swim *swim, struct swim_task *task,
+ const struct sockaddr_in *dst)
+{
+ swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_PING);
+}
+
+/**
+ * Check for unacknowledged pings. A ping is unacknowledged if an
+ * ack was not received during ack timeout. An unacknowledged ping
+ * is resent here.
+ */
+static void
+swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
+{
+ assert((events & EV_TIMER) != 0);
+ (void) events;
+ struct swim *swim = (struct swim *) t->data;
+ double current_time = swim_time();
+ struct swim_member *m;
+ while ((m = wait_ack_heap_top(&swim->wait_ack_heap)) != NULL) {
+ if (current_time < m->ping_deadline) {
+ swim_ev_timer_start(loop, t);
+ return;
+ }
+ wait_ack_heap_pop(&swim->wait_ack_heap);
+ ++m->unacknowledged_pings;
+ switch (m->status) {
+ case MEMBER_ALIVE:
+ if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
+ m->status = MEMBER_DEAD;
+ swim_on_member_update(swim, m);
+ }
+ break;
+ case MEMBER_DEAD:
+ if (m->unacknowledged_pings >= swim->no_acks_to_gc) {
+ swim_delete_member(swim, m);
+ continue;
+ }
+ break;
+ default:
+ unreachable();
+ }
+ swim_send_ping(swim, &m->ping_task, &m->addr);
+ }
}
/**
@@ -642,6 +926,7 @@ swim_update_member_uuid(struct swim *swim, struct swim_member *member,
say_verbose("SWIM %d: a member has changed its UUID from %s to %s",
swim_fd(swim), swim_uuid_str(&old_uuid),
swim_uuid_str(new_uuid));
+ swim_on_member_update(swim, member);
return 0;
}
@@ -650,8 +935,10 @@ static inline void
swim_update_member_addr(struct swim *swim, struct swim_member *member,
const struct sockaddr_in *addr)
{
- (void) swim;
- member->addr = *addr;
+ if (! swim_sockaddr_in_eq(addr, &member->addr)) {
+ member->addr = *addr;
+ swim_on_member_update(swim, member);
+ }
}
/**
@@ -665,13 +952,52 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def)
{
struct swim_member *member = swim_find_member(swim, &def->uuid);
if (member == NULL) {
+ if (def->status == MEMBER_DEAD &&
+ swim->no_acks_to_gc != SWIM_NO_ACKS_IGNORE) {
+ /*
+ * Do not 'resurrect' dead members to
+ * prevent 'ghost' members. Ghost member
+ * is a one declared as dead, sent via
+ * anti-entropy, and removed from local
+ * member table, but then returned back
+ * from received anti-entropy, as again
+ * dead. Such dead members could 'live'
+ * forever.
+ */
+ return NULL;
+ }
member = swim_new_member(swim, &def->addr, &def->uuid,
- def->status);
+ def->status, def->incarnation);
return member;
}
struct swim_member *self = swim->self;
- if (member != self)
+ if (member != self) {
+ if (def->incarnation < member->incarnation)
+ return member;
swim_update_member_addr(swim, member, &def->addr);
+ swim_update_member_inc_status(swim, member, def->status,
+ def->incarnation);
+ return member;
+ }
+ /*
+ * It is possible that other instances know a bigger
+ * incarnation of this instance - such thing happens when
+ * the instance restarts and loses its local incarnation
+ * number. It will be restored by receiving dissemination
+ * and anti-entropy messages about self.
+ */
+ if (self->incarnation < def->incarnation)
+ self->incarnation = def->incarnation;
+ if (def->status != MEMBER_ALIVE &&
+ def->incarnation == self->incarnation) {
+ /*
+ * In the cluster a gossip exists that this
+ * instance is not alive. Refute this information
+ * with a bigger incarnation.
+ */
+ self->incarnation++;
+ swim_on_member_update(swim, member);
+ }
return member;
}
@@ -699,12 +1025,58 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
return 0;
}
+/**
+ * Decode a failure detection message. Schedule acks, process
+ * acks.
+ */
+static int
+swim_process_failure_detection(struct swim *swim, const char **pos,
+ const char *end, const struct sockaddr_in *src,
+ const struct tt_uuid *uuid)
+{
+ const char *prefix = "invalid failure detection message:";
+ struct swim_failure_detection_def def;
+ struct swim_member_def mdef;
+ if (swim_failure_detection_def_decode(&def, pos, end, prefix) != 0)
+ return -1;
+ say_verbose("SWIM %d: process failure detection's %s", swim_fd(swim),
+ swim_fd_msg_type_strs[def.type]);
+ swim_member_def_create(&mdef);
+ mdef.addr = *src;
+ mdef.incarnation = def.incarnation;
+ mdef.uuid = *uuid;
+ struct swim_member *member = swim_upsert_member(swim, &mdef);
+ if (member == NULL)
+ return -1;
+ if (def.incarnation < member->incarnation)
+ return 0;
+ if (def.incarnation == member->incarnation &&
+ member->status != MEMBER_ALIVE) {
+ member->status = MEMBER_ALIVE;
+ swim_on_member_update(swim, member);
+ }
+
+ switch (def.type) {
+ case SWIM_FD_MSG_PING:
+ if (! swim_task_is_scheduled(&member->ack_task))
+ swim_send_ack(swim, &member->ack_task, &member->addr);
+ break;
+ case SWIM_FD_MSG_ACK:
+ member->unacknowledged_pings = 0;
+ if (! heap_node_is_stray(&member->in_wait_ack_heap))
+ wait_ack_heap_delete(&swim->wait_ack_heap, member);
+ break;
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
const char *end, const struct sockaddr_in *src)
{
- (void) src;
const char *prefix = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
@@ -734,6 +1106,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
if (swim_process_anti_entropy(swim, &pos, end) != 0)
goto error;
break;
+ case SWIM_FAILURE_DETECTION:
+ if (swim_process_failure_detection(swim, &pos, end,
+ src, &uuid) != 0)
+ goto error;
+ break;
default:
diag_set(SwimError, "%s unexpected key", prefix);
goto error;
@@ -766,6 +1143,13 @@ swim_new(void)
swim_task_create(&swim->round_step_task, swim_complete_step, NULL,
"round packet");
swim_scheduler_create(&swim->scheduler, swim_on_input);
+
+ /* Failure detection component. */
+ wait_ack_heap_create(&swim->wait_ack_heap);
+ swim_ev_timer_init(&swim->wait_ack_tick, swim_check_acks,
+ ACK_TIMEOUT_DEFAULT, 0);
+ swim->wait_ack_tick.data = (void *) swim;
+ swim->no_acks_to_gc = NO_ACKS_TO_GC_DEFAULT;
return swim;
}
@@ -796,7 +1180,7 @@ swim_uri_to_addr(const char *uri, struct sockaddr_in *addr,
int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
- const struct tt_uuid *uuid)
+ double ack_timeout, int no_acks_to_gc, const struct tt_uuid *uuid)
{
const char *prefix = "swim.cfg:";
struct sockaddr_in addr;
@@ -809,7 +1193,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
"a first config", prefix);
return -1;
}
- swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE);
+ swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
+ 0);
if (swim->self == NULL)
return -1;
} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -855,14 +1240,25 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
if (swim->round_tick.at != heartbeat_rate && heartbeat_rate > 0)
swim_ev_timer_set(&swim->round_tick, heartbeat_rate, 0);
+ if (swim->wait_ack_tick.at != ack_timeout && ack_timeout > 0)
+ swim_ev_timer_set(&swim->wait_ack_tick, ack_timeout, 0);
+
swim_update_member_addr(swim, swim->self, &addr);
int rc = swim_update_member_uuid(swim, swim->self, uuid);
/* Reserved above. */
assert(rc == 0);
(void) rc;
+ if (no_acks_to_gc >= 0)
+ swim->no_acks_to_gc = no_acks_to_gc;
return 0;
}
+double
+swim_ack_timeout(const struct swim *swim)
+{
+ return swim->wait_ack_tick.at;
+}
+
bool
swim_is_configured(const struct swim *swim)
{
@@ -883,7 +1279,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
return -1;
struct swim_member *member = swim_find_member(swim, uuid);
if (member == NULL) {
- member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE);
+ member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0);
return member == NULL ? -1 : 0;
}
diag_set(SwimError, "%s a member with such UUID already exists",
@@ -911,6 +1307,21 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid)
return 0;
}
+int
+swim_probe_member(struct swim *swim, const char *uri)
+{
+ assert(swim_is_configured(swim));
+ struct sockaddr_in addr;
+ if (swim_uri_to_addr(uri, &addr, "swim.probe_member:") != 0)
+ return -1;
+ struct swim_task *t = swim_task_new(swim_task_delete_cb,
+ swim_task_delete_cb, "probe ping");
+ if (t == NULL)
+ return -1;
+ swim_send_ping(swim, t, &addr);
+ return 0;
+}
+
void
swim_info(struct swim *swim, struct info_handler *info)
{
@@ -926,6 +1337,7 @@ swim_info(struct swim *swim, struct info_handler *info)
info_append_str(info, "status",
swim_member_status_strs[m->status]);
info_append_str(info, "uuid", swim_uuid_str(&m->uuid));
+ info_append_int(info, "incarnation", (int64_t) m->incarnation);
info_table_end(info);
}
info_end(info);
@@ -936,14 +1348,18 @@ swim_delete(struct swim *swim)
{
swim_scheduler_destroy(&swim->scheduler);
swim_ev_timer_stop(loop(), &swim->round_tick);
+ swim_ev_timer_stop(loop(), &swim->wait_ack_tick);
swim_task_destroy(&swim->round_step_task);
mh_int_t node;
mh_foreach(swim->members, node) {
struct swim_member *m =
*mh_swim_table_node(swim->members, node);
rlist_del_entry(m, in_round_queue);
+ if (! heap_node_is_stray(&m->in_wait_ack_heap))
+ wait_ack_heap_delete(&swim->wait_ack_heap, m);
swim_member_delete(m);
}
+ wait_ack_heap_destroy(&swim->wait_ack_heap);
mh_swim_table_delete(swim->members);
free(swim->shuffled);
}
@@ -1007,3 +1423,9 @@ swim_member_uuid(const struct swim_member *member)
{
return &member->uuid;
}
+
+uint64_t
+swim_member_incarnation(const struct swim_member *member)
+{
+ return member->incarnation;
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index ddb759c3d..23b244d00 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -31,6 +31,7 @@
* SUCH DAMAGE.
*/
#include <stdbool.h>
+#include <limits.h>
#define SWIM_PUBLIC_API
#include "swim_proto.h"
@@ -38,6 +39,16 @@
extern "C" {
#endif
+enum {
+ /**
+ * Value being used to say that unacknowledged pings
+ * should not affect a certain decision about member. For
+ * example, regardless of number of unacked pings, never
+ * drop a member.
+ */
+ SWIM_NO_ACKS_IGNORE = INT_MAX,
+};
+
struct info_handler;
struct swim;
struct tt_uuid;
@@ -65,6 +76,14 @@ swim_is_configured(const struct swim *swim);
* @heartbeat_rate seconds. It is rather the protocol
* speed. Protocol period depends on member count and
* @heartbeat_rate.
+ * @param ack_timeout Time in seconds after which a ping is
+ * considered to be unacknowledged.
+ * @param no_acks_to_gc How many pings to a dead member should be
+ * unacknowledged to delete it from the member table. Big
+ * or even almost infinite (SWIM_NO_ACKS_IGNORE) values
+ * could be useful, if SWIM is used mainly for monitoring
+ * of existing nodes with manual removal of dead ones, and
+ * probably with only single initial discovery.
* @param uuid UUID of this instance. Must be unique over the
* cluster.
*
@@ -73,7 +92,11 @@ swim_is_configured(const struct swim *swim);
*/
int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
- const struct tt_uuid *uuid);
+ double ack_timeout, int no_acks_to_gc, const struct tt_uuid *uuid);
+
+/** SWIM's ACK timeout, previously set via @sa swim_cfg. */
+double
+swim_ack_timeout(const struct swim *swim);
/**
* Stop listening and broadcasting messages, cleanup all internal
@@ -98,6 +121,13 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid);
int
swim_remove_member(struct swim *swim, const struct tt_uuid *uuid);
+/**
+ * Send a ping to this address. If an ACK is received, the member
+ * will be added.
+ */
+int
+swim_probe_member(struct swim *swim, const char *uri);
+
/** Dump member statuses into @a info. */
void
swim_info(struct swim *swim, struct info_handler *info);
@@ -148,6 +178,10 @@ swim_member_uri(const struct swim_member *member);
const struct tt_uuid *
swim_member_uuid(const struct swim_member *member);
+/** Member's incarnation. */
+uint64_t
+swim_member_incarnation(const struct swim_member *member);
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 015968a0d..504f64f32 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -67,11 +67,32 @@ swim_task_create(struct swim_task *task, swim_task_f complete,
rlist_create(&task->in_queue_output);
}
+struct swim_task *
+swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc)
+{
+ struct swim_task *task = (struct swim_task *) malloc(sizeof(*task));
+ if (task == NULL) {
+ diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+ return NULL;
+ }
+ swim_task_create(task, complete, cancel, desc);
+ return task;
+}
+
+void
+swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
+ int rc)
+{
+ (void) rc;
+ (void) scheduler;
+ free(task);
+}
+
/** Put the task into the queue of output tasks. */
static inline void
swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler)
{
- assert(rlist_empty(&task->in_queue_output));
+ assert(! swim_task_is_scheduled(task));
rlist_add_tail_entry(&scheduler->queue_output, task, in_queue_output);
swim_ev_io_start(loop(), &scheduler->output);
}
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index bc62a29ce..a6bc28ad3 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -207,6 +207,13 @@ struct swim_task {
const char *desc;
};
+/** Check if @a task is already scheduled. */
+static inline bool
+swim_task_is_scheduled(struct swim_task *task)
+{
+ return ! rlist_empty(&task->in_queue_output);
+}
+
/**
* Put the task into a queue of tasks. Eventually it will be sent.
*/
@@ -219,6 +226,15 @@ void
swim_task_create(struct swim_task *task, swim_task_f complete,
swim_task_f cancel, const char *desc);
+/** Allocate and create a new task. */
+struct swim_task *
+swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc);
+
+/** Callback to delete a task after its completion. */
+void
+swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
+ int rc);
+
/** Destroy the task, pop from the queue. */
static inline void
swim_task_destroy(struct swim_task *task)
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index bf4c09b24..93b7938b6 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -36,6 +36,12 @@
const char *swim_member_status_strs[] = {
"alive",
+ "dead",
+};
+
+const char *swim_fd_msg_type_strs[] = {
+ "ping",
+ "ack",
};
int
@@ -182,6 +188,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member uuid") != 0)
return -1;
break;
+ case SWIM_MEMBER_INCARNATION:
+ if (swim_decode_uint(pos, end, &def->incarnation, prefix,
+ "member incarnation") != 0)
+ return -1;
+ break;
default:
unreachable();
}
@@ -229,6 +240,70 @@ swim_src_uuid_bin_create(struct swim_src_uuid_bin *header,
memcpy(header->v_uuid, uuid, UUID_LEN);
}
+void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+ enum swim_fd_msg_type type, uint64_t incarnation)
+{
+ header->k_header = SWIM_FAILURE_DETECTION;
+ header->m_header = 0x82;
+
+ header->k_type = SWIM_FD_MSG_TYPE;
+ header->v_type = type;
+
+ header->k_incarnation = SWIM_FD_INCARNATION;
+ header->m_incarnation = 0xcf;
+ header->v_incarnation = mp_bswap_u64(incarnation);
+}
+
+int
+swim_failure_detection_def_decode(struct swim_failure_detection_def *def,
+ const char **pos, const char *end,
+ const char *prefix)
+{
+ uint32_t size;
+ if (swim_decode_map(pos, end, &size, prefix, "root") != 0)
+ return -1;
+ memset(def, 0, sizeof(*def));
+ def->type = swim_fd_msg_type_MAX;
+ if (size != 2) {
+ diag_set(SwimError, "%s root map should have two keys - "\
+ "message type and incarnation", prefix);
+ return -1;
+ }
+ for (int i = 0; i < (int) size; ++i) {
+ uint64_t key;
+ if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
+ return -1;
+ switch(key) {
+ case SWIM_FD_MSG_TYPE:
+ if (swim_decode_uint(pos, end, &key, prefix,
+ "message type") != 0)
+ return -1;
+ if (key >= swim_fd_msg_type_MAX) {
+ diag_set(SwimError, "%s unknown message type",
+ prefix);
+ return -1;
+ }
+ def->type = key;
+ break;
+ case SWIM_FD_INCARNATION:
+ if (swim_decode_uint(pos, end, &def->incarnation,
+ prefix, "incarnation") != 0)
+ return -1;
+ break;
+ default:
+ diag_set(SwimError, "%s unexpected key", prefix);
+ return -1;
+ }
+ }
+ if (def->type == swim_fd_msg_type_MAX) {
+ diag_set(SwimError, "%s message type should be specified",
+ prefix);
+ return -1;
+ }
+ return 0;
+}
+
void
swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
uint16_t batch_size)
@@ -241,18 +316,19 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status)
+ enum swim_member_status status, uint64_t incarnation)
{
header->v_status = status;
header->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr));
header->v_port = mp_bswap_u16(ntohs(addr->sin_port));
memcpy(header->v_uuid, uuid, UUID_LEN);
+ header->v_incarnation = mp_bswap_u64(incarnation);
}
void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x84;
+ header->m_header = 0x85;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDRESS;
header->m_addr = 0xce;
@@ -261,6 +337,8 @@ swim_member_bin_create(struct swim_member_bin *header)
header->k_uuid = SWIM_MEMBER_UUID;
header->m_uuid = 0xc4;
header->m_uuid_len = UUID_LEN;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
}
void
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 4f3cdf03d..c120f5733 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -49,12 +49,20 @@
* | |
* | AND |
* | |
+ * | SWIM_FAILURE_DETECTION: { |
+ * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, |
+ * | SWIM_FD_INCARNATION: uint |
+ * | }, |
+ * | |
+ * | OR/AND |
+ * | |
* | SWIM_ANTI_ENTROPY: [ |
* | { |
* | SWIM_MEMBER_STATUS: uint, enum member_status, |
* | SWIM_MEMBER_ADDRESS: uint, ip, |
* | SWIM_MEMBER_PORT: uint, port, |
- * | SWIM_MEMBER_UUID: 16 byte UUID |
+ * | SWIM_MEMBER_UUID: 16 byte UUID, |
+ * | SWIM_MEMBER_INCARNATION: uint |
* | }, |
* | ... |
* | ], |
@@ -68,6 +76,11 @@
enum swim_member_status {
/** The instance is ok, responds to requests. */
MEMBER_ALIVE = 0,
+ /**
+ * The member is considered to be dead. It will disappear
+ * from the membership after some unacknowledged pings.
+ */
+ MEMBER_DEAD,
swim_member_status_MAX,
};
@@ -87,6 +100,7 @@ extern const char *swim_member_status_strs[];
struct swim_member_def {
struct tt_uuid uuid;
struct sockaddr_in addr;
+ uint64_t incarnation;
enum swim_member_status status;
};
@@ -117,6 +131,7 @@ swim_member_def_decode(struct swim_member_def *def, const char **pos,
enum swim_body_key {
SWIM_SRC_UUID = 0,
SWIM_ANTI_ENTROPY,
+ SWIM_FAILURE_DETECTION,
};
/**
@@ -139,6 +154,79 @@ void
swim_src_uuid_bin_create(struct swim_src_uuid_bin *header,
const struct tt_uuid *uuid);
+/** {{{ Failure detection component */
+
+/** Failure detection component keys. */
+enum swim_fd_key {
+ /** Type of the failure detection message: ping or ack. */
+ SWIM_FD_MSG_TYPE,
+ /**
+ * Incarnation of the sender. To make the member alive if
+ * it was considered to be dead, but ping/ack with greater
+ * incarnation was received from it.
+ */
+ SWIM_FD_INCARNATION,
+};
+
+/** Failure detection message type. */
+enum swim_fd_msg_type {
+ SWIM_FD_MSG_PING,
+ SWIM_FD_MSG_ACK,
+ swim_fd_msg_type_MAX,
+};
+
+extern const char *swim_fd_msg_type_strs[];
+
+/** SWIM failure detection MessagePack header template. */
+struct PACKED swim_fd_header_bin {
+ /** mp_encode_uint(SWIM_FAILURE_DETECTION) */
+ uint8_t k_header;
+ /** mp_encode_map(2) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_FD_MSG_TYPE) */
+ uint8_t k_type;
+ /** mp_encode_uint(enum swim_fd_msg_type) */
+ uint8_t v_type;
+
+ /** mp_encode_uint(SWIM_FD_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/** Initialize failure detection section. */
+void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+ enum swim_fd_msg_type type, uint64_t incarnation);
+
+/** A decoded failure detection message. */
+struct swim_failure_detection_def {
+ /** Type of the message. */
+ enum swim_fd_msg_type type;
+ /** Incarnation of the sender. */
+ uint64_t incarnation;
+};
+
+/**
+ * Decode failure detection from a MessagePack buffer.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos Start of the MessagePack buffer.
+ * @param end End of the MessagePack buffer.
+ * @param prefix A prefix of an error message to use for diag_set,
+ * when something is wrong.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+swim_failure_detection_def_decode(struct swim_failure_detection_def *def,
+ const char **pos, const char *end,
+ const char *prefix);
+
+/** }}} Failure detection component */
+
/** {{{ Anti-entropy component */
/**
@@ -150,6 +238,7 @@ enum swim_member_key {
SWIM_MEMBER_ADDRESS,
SWIM_MEMBER_PORT,
SWIM_MEMBER_UUID,
+ SWIM_MEMBER_INCARNATION,
swim_member_key_MAX,
};
@@ -172,7 +261,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
* anti-entropy section.
*/
struct PACKED swim_member_bin {
- /** mp_encode_map(4) */
+ /** mp_encode_map(5) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -198,6 +287,12 @@ struct PACKED swim_member_bin {
uint8_t m_uuid;
uint8_t m_uuid_len;
uint8_t v_uuid[UUID_LEN];
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
};
/** Initialize antri-entropy record. */
@@ -213,7 +308,7 @@ swim_member_bin_create(struct swim_member_bin *header);
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status);
+ enum swim_member_status status, uint64_t incarnation);
/** }}} Anti-entropy component */
diff --git a/test/unit/swim.c b/test/unit/swim.c
index ea60be4ae..80787a0c7 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -105,11 +105,11 @@ swim_test_uuid_update(void)
struct swim *s = swim_cluster_node(cluster, 0);
struct tt_uuid new_uuid = uuid_nil;
new_uuid.time_low = 1000;
- is(swim_cfg(s, NULL, -1, &new_uuid), 0, "UUID update");
+ is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), 0, "UUID update");
is(swim_cluster_wait_fullmesh(cluster, 1), 0,
"old UUID is returned back as a 'ghost' member");
new_uuid.time_low = 2;
- is(swim_cfg(s, NULL, -1, &new_uuid), -1,
+ is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), -1,
"can not update to an existing UUID - swim_cfg fails");
ok(swim_error_check_match("exists"), "diag says 'exists'");
swim_cluster_delete(cluster);
@@ -124,16 +124,16 @@ swim_test_cfg(void)
struct swim *s = swim_new();
assert(s != NULL);
- is(swim_cfg(s, NULL, -1, NULL), -1, "first cfg failed - no URI");
+ is(swim_cfg(s, NULL, -1, -1, -1, NULL), -1, "first cfg failed - no URI");
ok(swim_error_check_match("mandatory"), "diag says 'mandatory'");
const char *uri = "127.0.0.1:1";
- is(swim_cfg(s, uri, -1, NULL), -1, "first cfg failed - no UUID");
+ is(swim_cfg(s, uri, -1, -1, -1, NULL), -1, "first cfg failed - no UUID");
ok(swim_error_check_match("mandatory"), "diag says 'mandatory'");
struct tt_uuid uuid = uuid_nil;
uuid.time_low = 1;
- is(swim_cfg(s, uri, -1, &uuid), 0, "configured first time");
- is(swim_cfg(s, NULL, -1, NULL), 0, "second time can omit URI, UUID");
- is(swim_cfg(s, NULL, 2, NULL), 0, "hearbeat is dynamic");
+ is(swim_cfg(s, uri, -1, -1, -1, &uuid), 0, "configured first time");
+ is(swim_cfg(s, NULL, -1, -1, -1, NULL), 0, "second time can omit URI, UUID");
+ is(swim_cfg(s, NULL, 2, 2, -1, NULL), 0, "hearbeat is dynamic");
const char *self_uri = swim_member_uri(swim_self(s));
is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\
"URI");
@@ -145,14 +145,16 @@ swim_test_cfg(void)
const char *bad_uri3 = "unix/:/home/gerold103/any/dir";
struct tt_uuid uuid2 = uuid_nil;
uuid2.time_low = 2;
- is(swim_cfg(s2, bad_uri1, -1, &uuid2), -1, "can not use invalid URI");
+ is(swim_cfg(s2, bad_uri1, -1, -1, -1, &uuid2), -1,
+ "can not use invalid URI");
ok(swim_error_check_match("invalid uri"), "diag says 'invalid uri'");
- is(swim_cfg(s2, bad_uri2, -1, &uuid2), -1, "can not use domain names");
+ is(swim_cfg(s2, bad_uri2, -1, -1, -1, &uuid2), -1,
+ "can not use domain names");
ok(swim_error_check_match("invalid uri"), "diag says 'invalid uri'");
- is(swim_cfg(s2, bad_uri3, -1, &uuid2), -1,
+ is(swim_cfg(s2, bad_uri3, -1, -1, -1, &uuid2), -1,
"UNIX sockets are not supported");
ok(swim_error_check_match("only IP"), "diag says 'only IP'");
- is(swim_cfg(s2, uri, -1, &uuid2), -1,
+ is(swim_cfg(s2, uri, -1, -1, -1, &uuid2), -1,
"can not bind to an occupied port");
ok(swim_error_check_match("bind"), "diag says 'bind'");
swim_delete(s2);
@@ -226,10 +228,140 @@ swim_test_add_remove(void)
swim_finish_test();
}
+static void
+swim_test_basic_failure_detection(void)
+{
+ swim_start_test(7);
+ struct swim_cluster *cluster = swim_cluster_new(2);
+ swim_cluster_set_ack_timeout(cluster, 0.5);
+
+ swim_cluster_add_link(cluster, 0, 1);
+ is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
+ "node is added as alive");
+ swim_cluster_block_io(cluster, 1);
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 2.4), -1,
+ "member still is not dead after 2 noacks");
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 0.1), 0,
+ "but it is dead after one more");
+
+ is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX,
+ 0.9), -1,
+ "after 1 more unack the member still is not deleted");
+ is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX,
+ 0.1), 0, "but it is dropped after 1 more");
+
+ /*
+ * After IO unblock pending messages will be processed all
+ * at once. S2 will learn about S1. After one more round
+ * step it should be fullmesh.
+ */
+ swim_cluster_unblock_io(cluster, 1);
+ is(swim_cluster_wait_fullmesh(cluster, 1), 0, "fullmesh is restored");
+
+ /* A member can be removed during an ACK wait. */
+ swim_cluster_block_io(cluster, 1);
+ /* Next round after 1 sec + let ping hang for 0.25 sec. */
+ swim_run_for(1.25);
+ struct swim *s1 = swim_cluster_node(cluster, 0);
+ struct swim *s2 = swim_cluster_node(cluster, 1);
+ const struct swim_member *s2_self = swim_self(s2);
+ swim_remove_member(s1, swim_member_uuid(s2_self));
+ swim_cluster_unblock_io(cluster, 1);
+ swim_run_for(0.1);
+ is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
+ "a member is added back on an ACK");
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
+static void
+swim_test_basic_gossip(void)
+{
+ swim_start_test(3);
+ struct swim_cluster *cluster = swim_cluster_new(3);
+ swim_cluster_set_ack_timeout(cluster, 10);
+ /*
+ * Test basic gossip. S1 and S2 know each other. Then S2
+ * starts losing packets. S1 does not receive 2 ACKs from
+ * S2. Then S3 joins the cluster and explicitly learns
+ * about S1 and S2. After one more unack S1 declares S2 as
+ * dead, and via anti-entropy S3 learns the same. Even
+ * earlier than it could discover the same via its own
+ * pings to S2.
+ */
+ swim_cluster_add_link(cluster, 0, 1);
+ swim_cluster_add_link(cluster, 1, 0);
+ swim_cluster_set_drop(cluster, 1, true);
+ /*
+ * Wait two no-ACKs on S1 from S2. +1 sec to send a first
+ * ping.
+ */
+ swim_run_for(20 + 1);
+ swim_cluster_add_link(cluster, 0, 2);
+ swim_cluster_add_link(cluster, 2, 1);
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 10), 0,
+ "S1 sees S2 as dead");
+ is(swim_cluster_member_status(cluster, 2, 1), MEMBER_ALIVE,
+ "S3 still thinks that S2 is alive");
+ /*
+ * At most after two round steps S1 sends 'S2 is dead' to
+ * S3.
+ */
+ is(swim_cluster_wait_status(cluster, 2, 1, MEMBER_DEAD, 2), 0,
+ "S3 learns about dead S2 from S1");
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
+static void
+swim_test_probe(void)
+{
+ swim_start_test(2);
+ struct swim_cluster *cluster = swim_cluster_new(2);
+
+ struct swim *s1 = swim_cluster_node(cluster, 0);
+ struct swim *s2 = swim_cluster_node(cluster, 1);
+ const char *s2_uri = swim_member_uri(swim_self(s2));
+ is(swim_probe_member(s1, s2_uri), 0, "send probe");
+ is(swim_cluster_wait_fullmesh(cluster, 0.1), 0,
+ "receive ACK on probe and get fullmesh")
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
+static void
+swim_test_refute(void)
+{
+ swim_start_test(4);
+ struct swim_cluster *cluster = swim_cluster_new(2);
+ swim_cluster_set_ack_timeout(cluster, 2);
+
+ swim_cluster_add_link(cluster, 0, 1);
+ swim_cluster_set_drop(cluster, 1, true);
+ fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 7) != 0);
+ swim_cluster_set_drop(cluster, 1, false);
+ is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
+ "S2 increments its own incarnation to refute its death");
+ is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0,
+ "new incarnation has reached S1 with a next round message");
+
+ swim_cluster_restart_node(cluster, 1);
+ is(swim_cluster_member_incarnation(cluster, 1, 1), 0,
+ "after restart S2's incarnation is 0 again");
+ is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
+ "S2 learned its old bigger incarnation 1 from S0");
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
static int
main_f(va_list ap)
{
- swim_start_test(5);
+ swim_start_test(9);
(void) ap;
swim_test_ev_init();
@@ -240,6 +372,10 @@ main_f(va_list ap)
swim_test_uuid_update();
swim_test_cfg();
swim_test_add_remove();
+ swim_test_basic_failure_detection();
+ swim_test_probe();
+ swim_test_refute();
+ swim_test_basic_gossip();
swim_test_transport_free();
swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 42cef1612..2503cbfcd 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
*** main_f ***
-1..5
+1..9
*** swim_test_one_link ***
1..6
ok 1 - no rounds - no fullmesh
@@ -60,4 +60,36 @@ ok 4 - subtests
ok 13 - back in fullmesh after a member removal in the middle of a step
ok 5 - subtests
*** swim_test_add_remove: done ***
+ *** swim_test_basic_failure_detection ***
+ 1..7
+ ok 1 - node is added as alive
+ ok 2 - member still is not dead after 2 noacks
+ ok 3 - but it is dead after one more
+ ok 4 - after 1 more unack the member still is not deleted
+ ok 5 - but it is dropped after 1 more
+ ok 6 - fullmesh is restored
+ ok 7 - a member is added back on an ACK
+ok 6 - subtests
+ *** swim_test_basic_failure_detection: done ***
+ *** swim_test_probe ***
+ 1..2
+ ok 1 - send probe
+ ok 2 - receive ACK on probe and get fullmesh
+ok 7 - subtests
+ *** swim_test_probe: done ***
+ *** swim_test_refute ***
+ 1..4
+ ok 1 - S2 increments its own incarnation to refute its death
+ ok 2 - new incarnation has reached S1 with a next round message
+ ok 3 - after restart S2's incarnation is 0 again
+ ok 4 - S2 learned its old bigger incarnation 1 from S0
+ok 8 - subtests
+ *** swim_test_refute: done ***
+ *** swim_test_basic_gossip ***
+ 1..3
+ ok 1 - S1 sees S2 as dead
+ ok 2 - S3 still thinks that S2 is alive
+ ok 3 - S3 learns about dead S2 from S1
+ok 9 - subtests
+ *** swim_test_basic_gossip: done ***
*** main_f: done ***
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index d1c3e97d7..d121292b1 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -95,7 +95,11 @@ struct swim_fd {
* blocked, new messages are queued, but not delivered.
*/
bool is_opened;
-
+ /**
+ * True if any message sent to that fd should be just
+ * dropped, not queued.
+ */
+ bool is_dropping;
/**
* Link in the list of opened and non-blocked descriptors.
* Used to feed them all EV_WRITE.
@@ -260,6 +264,14 @@ swim_test_transport_unblock_fd(int fd)
rlist_add_tail_entry(&swim_fd_active, sfd, in_active);
}
+void
+swim_test_transport_set_drop(int fd, bool value)
+{
+ struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
+ if (sfd->is_opened)
+ sfd->is_dropping = value;
+}
+
/** Send one packet to destination's recv queue. */
static inline void
swim_fd_send_packet(struct swim_fd *fd)
@@ -270,7 +282,7 @@ swim_fd_send_packet(struct swim_fd *fd)
rlist_shift_entry(&fd->send_queue, struct swim_test_packet,
in_queue);
struct swim_fd *dst = &swim_fd[ntohs(p->dst.sin_port)];
- if (dst->is_opened)
+ if (dst->is_opened && ! dst->is_dropping && ! fd->is_dropping)
rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
else
swim_test_packet_delete(p);
diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h
index f8066e636..5a1a92271 100644
--- a/test/unit/swim_test_transport.h
+++ b/test/unit/swim_test_transport.h
@@ -30,6 +30,7 @@
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+#include <stdbool.h>
struct ev_loop;
/**
@@ -58,6 +59,14 @@ swim_test_transport_block_fd(int fd);
void
swim_test_transport_unblock_fd(int fd);
+/**
+ * Set to true, if all incomming and outgoing packets should be
+ * dropped. Note, that the node, owning @a fd, thinks, that its
+ * packets are sent.
+ */
+void
+swim_test_transport_set_drop(int fd, bool value);
+
/** Initialize test transport system. */
void
swim_test_transport_init(void);
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 0d62bb26c..0415a3035 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -62,13 +62,24 @@ swim_cluster_new(int size)
assert(res->node[i] != NULL);
sprintf(uri, "127.0.0.1:%d", i + 1);
uuid.time_low = i + 1;
- int rc = swim_cfg(res->node[i], uri, -1, &uuid);
+ int rc = swim_cfg(res->node[i], uri, -1, -1, -1, &uuid);
assert(rc == 0);
(void) rc;
}
return res;
}
+void
+swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout)
+{
+ for (int i = 0; i < cluster->size; ++i) {
+ int rc = swim_cfg(cluster->node[i], NULL, -1, ack_timeout, -1,
+ NULL);
+ assert(rc == 0);
+ (void) rc;
+ }
+}
+
void
swim_cluster_delete(struct swim_cluster *cluster)
{
@@ -107,6 +118,17 @@ swim_cluster_member_status(struct swim_cluster *cluster, int node_id,
return swim_member_status(m);
}
+uint64_t
+swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
+ int member_id)
+{
+ const struct swim_member *m =
+ swim_cluster_member_view(cluster, node_id, member_id);
+ if (m == NULL)
+ return UINT64_MAX;
+ return swim_member_incarnation(m);
+}
+
struct swim *
swim_cluster_node(struct swim_cluster *cluster, int i)
{
@@ -114,6 +136,25 @@ swim_cluster_node(struct swim_cluster *cluster, int i)
return cluster->node[i];
}
+void
+swim_cluster_restart_node(struct swim_cluster *cluster, int i)
+{
+ assert(i >= 0 && i < cluster->size);
+ struct swim *s = cluster->node[i];
+ const struct swim_member *self = swim_self(s);
+ struct tt_uuid uuid = *swim_member_uuid(self);
+ char uri[128];
+ strcpy(uri, swim_member_uri(self));
+ double ack_timeout = swim_ack_timeout(s);
+ swim_delete(s);
+ s = swim_new();
+ assert(s != NULL);
+ int rc = swim_cfg(s, uri, -1, ack_timeout, -1, &uuid);
+ assert(rc == 0);
+ (void) rc;
+ cluster->node[i] = s;
+}
+
void
swim_cluster_block_io(struct swim_cluster *cluster, int i)
{
@@ -126,6 +167,12 @@ swim_cluster_unblock_io(struct swim_cluster *cluster, int i)
swim_test_transport_unblock_fd(swim_fd(cluster->node[i]));
}
+void
+swim_cluster_set_drop(struct swim_cluster *cluster, int i, bool value)
+{
+ swim_test_transport_set_drop(swim_fd(cluster->node[i]), value);
+}
+
/** Check if @a s1 knows every member of @a s2's table. */
static inline bool
swim1_contains_swim2(struct swim *s1, struct swim *s2)
@@ -186,6 +233,28 @@ swim_run_for(double duration)
swim_wait_timeout(duration, false);
}
+int
+swim_cluster_wait_status(struct swim_cluster *cluster, int node_id,
+ int member_id, enum swim_member_status status,
+ double timeout)
+{
+ return swim_wait_timeout(timeout,
+ swim_cluster_member_status(cluster, node_id,
+ member_id) == status
+ );
+}
+
+int
+swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id,
+ int member_id, uint64_t incarnation,
+ double timeout)
+{
+ return swim_wait_timeout(timeout,
+ swim_cluster_member_incarnation(cluster, node_id,
+ member_id) == incarnation
+ );
+}
+
bool
swim_error_check_match(const char *msg)
{
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index befb95420..8fba5c2da 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -44,6 +44,10 @@ struct swim_cluster;
struct swim_cluster *
swim_cluster_new(int size);
+/** Change ACK timeout of all the instances in the cluster. */
+void
+swim_cluster_set_ack_timeout(struct swim_cluster *cluster, double ack_timeout);
+
/** Delete all the SWIM instances, and the cluster itself. */
void
swim_cluster_delete(struct swim_cluster *cluster);
@@ -56,6 +60,10 @@ swim_error_check_match(const char *msg);
struct swim *
swim_cluster_node(struct swim_cluster *cluster, int i);
+/** Drop and create again a SWIM instance with id @a i. */
+void
+swim_cluster_restart_node(struct swim_cluster *cluster, int i);
+
/** Block IO on a SWIM instance with id @a i. */
void
swim_cluster_block_io(struct swim_cluster *cluster, int i);
@@ -64,6 +72,9 @@ swim_cluster_block_io(struct swim_cluster *cluster, int i);
void
swim_cluster_unblock_io(struct swim_cluster *cluster, int i);
+void
+swim_cluster_set_drop(struct swim_cluster *cluster, int i, bool value);
+
/**
* Explicitly add a member of id @a from_id to a member of id
* @a to_id.
@@ -75,6 +86,10 @@ enum swim_member_status
swim_cluster_member_status(struct swim_cluster *cluster, int node_id,
int member_id);
+uint64_t
+swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
+ int member_id);
+
/**
* Check if in the cluster every instance knowns the about other
* instances.
@@ -86,6 +101,21 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster);
int
swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout);
+/**
+ * Wait until a member with id @a member_id is seen with @a status
+ * in the membership table of a member with id @a node_id. At most
+ * @a timeout seconds.
+ */
+int
+swim_cluster_wait_status(struct swim_cluster *cluster, int node_id,
+ int member_id, enum swim_member_status status,
+ double timeout);
+
+int
+swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id,
+ int member_id, uint64_t incarnation,
+ double timeout);
+
/** Process SWIM events for @a duration fake seconds. */
void
swim_run_for(double duration);
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list