[PATCH v3 2/6] [RAW] swim: introduce failure detection component
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Sat Dec 29 13:14:11 MSK 2018
The failure detection component makes it possible to find out
which members are already dead.
Part of #3234
---
src/lib/swim/swim.c | 439 +++++++++++++++++++++++++++++++++-
src/lib/swim/swim_io.c | 14 ++
src/lib/swim/swim_io.h | 16 ++
src/lib/swim/swim_transport.h | 4 +-
4 files changed, 463 insertions(+), 10 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 08c377374..c7bc11bca 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -89,6 +89,22 @@
enum {
/** How often to send membership messages and pings. */
HEARTBEAT_RATE_DEFAULT = 1,
+ /**
+ * If a ping was sent, it is considered to be lost after
+ * this time without an ack.
+ */
+ ACK_TIMEOUT = 1,
+ /**
+ * If a member has not been responding to pings this
+ * number of times, it is considered to be dead.
+ */
+ NO_ACKS_TO_DEAD = 3,
+ /**
+ * If a member that is not pinned is confirmed to be dead,
+ * it is removed from the membership after at least this
+ * number of failed pings.
+ */
+ NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
};
/**
@@ -109,11 +125,17 @@ enum swim_member_status {
* members table.
*/
MEMBER_ALIVE = 0,
+ /**
+ * The member is considered to be dead. It will disappear
+ * from the membership, if it is not pinned.
+ */
+ MEMBER_DEAD,
swim_member_status_MAX,
};
static const char *swim_member_status_strs[] = {
"alive",
+ "dead",
};
/**
@@ -138,6 +160,30 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_queue_round;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * True, if the member is configured explicitly and can
+ * not disappear from the membership.
+ */
+ bool is_pinned;
+ /** Growing number to refute old messages. */
+ uint64_t incarnation;
+ /**
+ * How many pings did not receive an ack in a row. After
+ * a threshold the instance is marked as dead. After more
+ * it is removed from the table (if not pinned).
+ */
+ int failed_pings;
+ /** When the latest ping was sent to this member. */
+ double ping_ts;
+ /** Ready at hand regular ACK task. */
+ struct swim_task ack_task;
+ struct swim_task ping_task;
+ /** Position in a queue of members waiting for an ack. */
+ struct rlist in_queue_wait_ack;
};
/**
@@ -189,6 +235,19 @@ struct swim {
* anti-entropy message.
*/
struct swim_member **shuffled_members;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * Members waiting for an ACK. On too long absence of ACK
+ * a member is considered to be dead and is removed. The
+ * list is sorted by time in ascending order (tail is
+ * newer, head is older).
+ */
+ struct rlist queue_wait_ack;
+ /** Generator of ack checking events. */
+ struct ev_periodic wait_ack_tick;
};
static inline uint64_t
@@ -204,8 +263,84 @@ sockaddr_in_hash(const struct sockaddr_in *a)
*/
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
+ SWIM_FAILURE_DETECTION,
};
+/** {{{ Failure detection component */
+
+/**
+ * Keys of the map that forms the body of a failure detection
+ * message.
+ */
+enum swim_fd_key {
+	/** Message type, see enum swim_fd_msg_type: ping or ack. */
+	SWIM_FD_MSG_TYPE = 0,
+	/**
+	 * Incarnation of the sender. A ping/ack carrying a
+	 * greater incarnation than the known one makes the
+	 * member alive again even if it was considered dead.
+	 */
+	SWIM_FD_INCARNATION = 1,
+};
+
+/**
+ * Types of a failure detection message. For now there are only
+ * two of them: a ping and an ack. Indirect ping/ack are not
+ * implemented yet.
+ */
+enum swim_fd_msg_type {
+	SWIM_FD_MSG_PING = 0,
+	SWIM_FD_MSG_ACK = 1,
+	/** Number of message types, not a valid type itself. */
+	swim_fd_msg_type_MAX = 2,
+};
+
+/** Printable names of enum swim_fd_msg_type values, used in logs. */
+static const char *swim_fd_msg_type_strs[] = {
+	[SWIM_FD_MSG_PING] = "ping",
+	[SWIM_FD_MSG_ACK] = "ack",
+};
+
+/**
+ * SWIM failure detection MsgPack header template. The structure
+ * is packed and is copied into an outgoing packet as is, so the
+ * order and the size of the fields define the encoded layout:
+ * the component key followed by a map of two pairs - message
+ * type and incarnation.
+ */
+struct PACKED swim_fd_header_bin {
+ /** mp_encode_uint(SWIM_FAILURE_DETECTION) */
+ uint8_t k_header;
+ /** mp_encode_map(2) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_FD_MSG_TYPE) */
+ uint8_t k_type;
+ /** mp_encode_uint(enum swim_fd_msg_type) */
+ uint8_t v_type;
+
+ /** mp_encode_uint(SWIM_FD_INCARNATION) */
+ uint8_t k_incarnation;
+ /**
+ * mp_encode_uint(64bit incarnation): a one byte type marker
+ * followed by the 8 byte value. The value is stored
+ * byte-swapped by swim_fd_header_bin_create().
+ */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/**
+ * Fill all fields of a failure detection header template.
+ * @param header Template to initialize.
+ * @param type Message type to encode: ping or ack.
+ * @param incarnation Incarnation of the sender, stored
+ *        byte-swapped via mp_bswap_u64().
+ */
+static inline void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+			  enum swim_fd_msg_type type, uint64_t incarnation)
+{
+	*header = (struct swim_fd_header_bin) {
+		.k_header = SWIM_FAILURE_DETECTION,
+		/* Marker of a map with two pairs. */
+		.m_header = 0x82,
+		.k_type = SWIM_FD_MSG_TYPE,
+		.v_type = type,
+		.k_incarnation = SWIM_FD_INCARNATION,
+		/* Marker of a 64bit unsigned integer. */
+		.m_incarnation = 0xcf,
+		.v_incarnation = mp_bswap_u64(incarnation),
+	};
+}
+
+/**
+ * Start waiting for an ack from the member: remember the current
+ * time as the ping timestamp and append the member to the tail
+ * of the ack wait queue. Does nothing if the member is already
+ * in the queue, so the timestamp of the earlier ping is kept.
+ */
+static void
+swim_member_schedule_ack_wait(struct swim *swim, struct swim_member *member)
+{
+	if (!rlist_empty(&member->in_queue_wait_ack))
+		return;
+	member->ping_ts = fiber_time();
+	rlist_add_tail_entry(&swim->queue_wait_ack, member,
+			     in_queue_wait_ack);
+}
+
+/** }}} Failure detection component */
+
/** {{{ Anti-entropy component */
/**
@@ -220,6 +355,7 @@ enum swim_member_key {
*/
SWIM_MEMBER_ADDRESS,
SWIM_MEMBER_PORT,
+ SWIM_MEMBER_INCARNATION,
swim_member_key_MAX,
};
@@ -243,7 +379,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
/** SWIM member MsgPack template. */
struct PACKED swim_member_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -262,6 +398,12 @@ struct PACKED swim_member_bin {
/** mp_encode_uint(addr.sin_port) */
uint8_t m_port;
uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
};
static inline void
@@ -271,17 +413,20 @@ swim_member_bin_reset(struct swim_member_bin *header,
header->v_status = member->status;
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
+ header->v_incarnation = mp_bswap_u64(member->incarnation);
}
static inline void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x83;
+ header->m_header = 0x84;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDRESS;
header->m_addr = 0xce;
header->k_port = SWIM_MEMBER_PORT;
header->m_port = 0xcd;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
}
/** }}} Anti-entropy component */
@@ -289,17 +434,51 @@ swim_member_bin_create(struct swim_member_bin *header)
/**
* SWIM message structure:
* {
+ * SWIM_FAILURE_DETECTION: {
+ * SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,
+ * SWIM_FD_INCARNATION: uint
+ * },
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
* SWIM_MEMBER_ADDRESS: uint, ip,
- * SWIM_MEMBER_PORT: uint, port
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
* },
* ...
* ],
* }
*/
+/**
+ * Update status and incarnation of the member if needed. The
+ * pair {incarnation, status} is compared as a compound key: the
+ * new status replaces the old one only if it arrives with a
+ * greater incarnation, or with the same incarnation but a
+ * "bigger" status. Statuses are ordered by their identifiers, so
+ * "alive" < "dead". This protects from the case when a member is
+ * detected as dead on one instance, but is then overridden by an
+ * "alive" message with the same incarnation from another
+ * instance.
+ */
+static inline void
+swim_member_update_status(struct swim *swim, struct swim_member *member,
+			  enum swim_member_status new_status,
+			  uint64_t incarnation)
+{
+	(void) swim;
+	assert(member != swim->self);
+	if (member->incarnation < incarnation) {
+		/* Newer incarnation always wins. */
+		member->incarnation = incarnation;
+		member->status = new_status;
+		return;
+	}
+	/* Same incarnation - the status can only grow. */
+	if (member->incarnation == incarnation &&
+	    member->status < new_status)
+		member->status = new_status;
+}
+
/**
* Remove the member from all queues, hashes, destroy it and free
* the memory.
@@ -313,6 +492,11 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
mh_i64ptr_del(swim->members, rc, NULL);
rlist_del_entry(member, in_queue_round);
+ /* Failure detection component. */
+ rlist_del_entry(member, in_queue_wait_ack);
+ swim_task_destroy(&member->ack_task);
+ swim_task_destroy(&member->ping_task);
+
free(member);
}
@@ -322,7 +506,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
*/
static struct swim_member *
swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
- enum swim_member_status status)
+ enum swim_member_status status, uint64_t incarnation)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -343,6 +527,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
}
rlist_add_entry(&swim->queue_round, member, in_queue_round);
+ /* Failure detection component. */
+ member->incarnation = incarnation;
+ rlist_create(&member->in_queue_wait_ack);
+ swim_task_create(&member->ack_task, &swim->scheduler, swim_task_reset);
+ swim_task_create(&member->ping_task, &swim->scheduler, swim_task_reset);
+
return member;
}
@@ -442,6 +632,28 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
return 1;
}
+/**
+ * Encode the failure detection component with this instance's
+ * incarnation into the message.
+ * @param swim SWIM instance, its self member gives incarnation.
+ * @param msg Message to append the component to.
+ * @param type Type of the message: ping or ack.
+ * @retval -1 Error, no space could be reserved in the message.
+ * @retval 1 Success, something is encoded.
+ */
+static int
+swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
+			      enum swim_fd_msg_type type)
+{
+	struct swim_fd_header_bin header;
+	int header_size = sizeof(header);
+	struct swim_packet *packet = swim_msg_reserve(msg, header_size);
+	if (packet == NULL)
+		return -1;
+	char *dst = swim_packet_alloc(packet, header_size);
+	swim_fd_header_bin_create(&header, type, swim->self->incarnation);
+	memcpy(dst, &header, header_size);
+	swim_packet_flush(packet);
+	return 1;
+}
+
/** Encode SWIM components into a sequence of UDP packets. */
static int
swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -453,6 +665,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
char *header = swim_packet_alloc(packet, 1);
int rc, map_size = 0;
+ rc = swim_encode_failure_detection(swim, msg, SWIM_FD_MSG_PING);
+ if (rc < 0)
+ goto error;
+ map_size += rc;
+
rc = swim_encode_anti_entropy(swim, msg);
if (rc < 0)
goto error;
@@ -490,10 +707,11 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
return;
}
struct swim_member *m =
- rlist_shift_entry(&swim->queue_round, struct swim_member,
+ rlist_first_entry(&swim->queue_round, struct swim_member,
in_queue_round);
swim_task_schedule(&swim->round_step_task,
swim->transport->send_round_msg, &m->addr);
+ swim_member_schedule_ack_wait(swim, m);
ev_periodic_stop(loop, p);
}
@@ -501,16 +719,84 @@ static void
swim_round_step_complete(struct swim_task *task)
{
struct swim *swim = container_of(task, struct swim, round_step_task);
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
swim_msg_reset(&task->msg);
ev_periodic_start(loop(), &swim->round_tick);
}
+/**
+ * Encode a failure detection message of the given type into the
+ * task and schedule the task for sending to the member.
+ * @param swim SWIM instance on whose behalf the message is sent.
+ * @param task Task to carry the message.
+ * @param m Destination member.
+ * @param type Type of the message: ping or ack.
+ * @param send Transport function used to send the task.
+ *
+ * On an encoding error the error is logged and nothing is
+ * scheduled.
+ */
+static void
+swim_schedule_fd_request(struct swim *swim, struct swim_task *task,
+			 struct swim_member *m, enum swim_fd_msg_type type,
+			 swim_transport_send_f send)
+{
+	struct swim_msg *msg = &task->msg;
+	int rc = swim_encode_failure_detection(swim, msg, type);
+	if (rc < 0) {
+		diag_log();
+		/*
+		 * The callers pass tasks embedded into struct
+		 * swim_member (ack_task, ping_task), which are
+		 * not separate heap allocations. Such a task
+		 * must not be passed to free(), so it is only
+		 * reset here and stays usable for next attempts.
+		 */
+		swim_task_reset(task);
+		return;
+	}
+	assert(rc > 0);
+	say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type],
+		    sio_strfaddr((struct sockaddr *) &m->addr,
+				 sizeof(m->addr)));
+	swim_task_schedule(task, send, &m->addr);
+}
+
+/** Send an ack to the member using its own ack task. */
+static inline void
+swim_schedule_ack(struct swim *swim, struct swim_member *member)
+{
+	swim_transport_send_f send_ack = swim->transport->send_ack;
+	swim_schedule_fd_request(swim, &member->ack_task, member,
+				 SWIM_FD_MSG_ACK, send_ack);
+}
+
+/**
+ * Send a ping to the member using its own ping task and start
+ * waiting for an ack, if the member is not waited for already.
+ */
+static inline void
+swim_schedule_ping(struct swim *swim, struct swim_member *member)
+{
+	swim_transport_send_f send_ping = swim->transport->send_ping;
+	swim_schedule_fd_request(swim, &member->ping_task, member,
+				 SWIM_FD_MSG_PING, send_ping);
+	swim_member_schedule_ack_wait(swim, member);
+}
+
+/**
+ * Check for failed pings. A ping is failed if an ack was not
+ * received during ACK_TIMEOUT. A failed ping is resent here and
+ * the member starts waiting for an ack again.
+ *
+ * After NO_ACKS_TO_DEAD failed pings in a row the member is
+ * marked as dead. After NO_ACKS_TO_GC it is deleted, unless it
+ * is pinned.
+ *
+ * @param loop Event loop, unused.
+ * @param p Periodic watcher, its data field stores the SWIM
+ *        instance.
+ * @param events Fired events, expected to contain EV_PERIODIC.
+ */
+static void
+swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+	assert((events & EV_PERIODIC) != 0);
+	(void) loop;
+	(void) events;
+	struct swim *swim = (struct swim *) p->data;
+	struct swim_member *m, *tmp;
+	double current_time = fiber_time();
+	rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack,
+				 tmp) {
+		/*
+		 * The queue is filled in the order of ping time,
+		 * so the first not expired member ends the scan.
+		 */
+		if (current_time - m->ping_ts < ACK_TIMEOUT)
+			break;
+		++m->failed_pings;
+		if (m->failed_pings >= NO_ACKS_TO_GC) {
+			/*
+			 * NOTE(review): a pinned member stays in
+			 * the queue here and its counter keeps
+			 * growing on each tick until an ack
+			 * arrives - confirm this is intended.
+			 */
+			if (!m->is_pinned)
+				swim_member_delete(swim, m);
+			continue;
+		}
+		if (m->failed_pings >= NO_ACKS_TO_DEAD)
+			m->status = MEMBER_DEAD;
+		/*
+		 * Unlink the member before the ping. Otherwise
+		 * swim_schedule_ping() sees the member as still
+		 * waiting, does not refresh its ping timestamp,
+		 * and the member would then leave the queue with
+		 * the resent ping not tracked at all.
+		 */
+		rlist_del_entry(m, in_queue_wait_ack);
+		swim_schedule_ping(swim, m);
+	}
+}
+
/**
* SWIM member attributes from anti-entropy and dissemination
* messages.
*/
struct swim_member_def {
struct sockaddr_in addr;
+ uint64_t incarnation;
enum swim_member_status status;
};
@@ -530,9 +816,35 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
* members table.
*/
if (member == NULL) {
- member = swim_member_new(swim, &def->addr, def->status);
+ member = swim_member_new(swim, &def->addr, def->status,
+ def->incarnation);
if (member == NULL)
diag_log();
+ return;
+ }
+ struct swim_member *self = swim->self;
+ if (member != self) {
+ swim_member_update_status(swim, member, def->status,
+ def->incarnation);
+ return;
+ }
+ /*
+ * It is possible that other instances know a bigger
+ * incarnation of this instance - such thing happens when
+ * the instance restarts and loses its local incarnation
+ * number. It will be restored by receiving dissemination
+ * messages about self.
+ */
+ if (self->incarnation < def->incarnation)
+ self->incarnation = def->incarnation;
+ if (def->status != MEMBER_ALIVE &&
+ def->incarnation == self->incarnation) {
+ /*
+ * In the cluster a gossip exists that this
+ * instance is not alive. Refute this information
+ * with a bigger incarnation.
+ */
+ self->incarnation++;
}
}
@@ -576,6 +888,15 @@ swim_process_member_key(enum swim_member_key key, const char **pos,
}
def->addr.sin_port = port;
break;
+ case SWIM_MEMBER_INCARNATION:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member incarnation should be uint",
+ msg_pref);
+ return -1;
+ }
+ def->incarnation = mp_decode_uint(pos);
+ break;
default:
unreachable();
}
@@ -627,12 +948,95 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
return 0;
}
+/**
+ * Decode one MsgPack unsigned integer, checking that it is fully
+ * located before @a end.
+ * @param pos Decoding position, advanced on success.
+ * @param end End of the input data.
+ * @param msg_pref Prefix of the error message.
+ * @param what Name of the decoded value for the error message.
+ * @param[out] value Decoded value.
+ * @retval 0 Success.
+ * @retval -1 Truncated data or not an unsigned integer, the
+ *         error is logged.
+ */
+static inline int
+swim_fd_decode_uint(const char **pos, const char *end, const char *msg_pref,
+		    const char *what, uint64_t *value)
+{
+	if (*pos >= end || mp_typeof(**pos) != MP_UINT ||
+	    mp_check_uint(*pos, end) > 0) {
+		say_error("%s %s should be uint", msg_pref, what);
+		return -1;
+	}
+	*value = mp_decode_uint(pos);
+	return 0;
+}
+
+/**
+ * Decode a failure detection message. Schedule an ack in
+ * response to a ping, process an ack.
+ * @param swim SWIM instance.
+ * @param pos Decoding position, advanced while decoding.
+ * @param end End of the input data.
+ * @param src Address of the sender.
+ * @retval 0 The message is decoded. This is also returned when
+ *         a new sender could not be registered, the error is
+ *         logged then.
+ * @retval -1 The message is malformed, the error is logged.
+ */
+static int
+swim_process_failure_detection(struct swim *swim, const char **pos,
+			       const char *end, const struct sockaddr_in *src)
+{
+	const char *msg_pref = "Invalid SWIM failure detection message:";
+	/*
+	 * The data comes from the network and can be truncated
+	 * at any byte, so the position is checked against the
+	 * end before each read of a type byte.
+	 */
+	if (*pos >= end || mp_typeof(**pos) != MP_MAP ||
+	    mp_check_map(*pos, end) > 0) {
+		say_error("%s root should be a map", msg_pref);
+		return -1;
+	}
+	uint64_t size = mp_decode_map(pos);
+	if (size != 2) {
+		say_error("%s root map should have two keys - message type "\
+			  "and incarnation", msg_pref);
+		return -1;
+	}
+	enum swim_fd_msg_type type = swim_fd_msg_type_MAX;
+	uint64_t incarnation = 0;
+	for (int i = 0; i < (int) size; ++i) {
+		uint64_t key, value;
+		if (swim_fd_decode_uint(pos, end, msg_pref, "a key",
+					&key) != 0)
+			return -1;
+		switch(key) {
+		case SWIM_FD_MSG_TYPE:
+			if (swim_fd_decode_uint(pos, end, msg_pref,
+						"message type", &value) != 0)
+				return -1;
+			if (value >= swim_fd_msg_type_MAX) {
+				say_error("%s unknown message type", msg_pref);
+				return -1;
+			}
+			type = (enum swim_fd_msg_type) value;
+			break;
+		case SWIM_FD_INCARNATION:
+			if (swim_fd_decode_uint(pos, end, msg_pref,
+						"incarnation",
+						&incarnation) != 0)
+				return -1;
+			break;
+		default:
+			say_error("%s unknown key", msg_pref);
+			return -1;
+		}
+	}
+	/* Both keys could be incarnation - the type is still unset. */
+	if (type == swim_fd_msg_type_MAX) {
+		say_error("%s message type should be specified", msg_pref);
+		return -1;
+	}
+	struct swim_member *sender = swim_find_member(swim, src);
+	if (sender == NULL) {
+		sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation);
+		if (sender == NULL) {
+			diag_log();
+			return 0;
+		}
+	} else {
+		/*
+		 * NOTE(review): swim_member_update_status()
+		 * asserts that the member is not self, so a
+		 * message with the own address as a source would
+		 * trigger the assertion - confirm it is filtered
+		 * out earlier.
+		 */
+		swim_member_update_status(swim, sender, MEMBER_ALIVE,
+					  incarnation);
+	}
+	if (type == SWIM_FD_MSG_PING) {
+		swim_schedule_ack(swim, sender);
+	} else {
+		assert(type == SWIM_FD_MSG_ACK);
+		/* An ack with an outdated incarnation is ignored. */
+		if (incarnation >= sender->incarnation) {
+			sender->failed_pings = 0;
+			rlist_del_entry(sender, in_queue_wait_ack);
+		}
+	}
+	return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler,
const struct swim_packet *packet, const struct sockaddr_in *src)
{
- (void) src;
const char *msg_pref = "Invalid SWIM message:";
struct swim *swim = container_of(scheduler, struct swim, scheduler);
const char *pos = packet->body;
@@ -655,6 +1059,12 @@ swim_on_input(struct swim_scheduler *scheduler,
if (swim_process_anti_entropy(swim, &pos, end) != 0)
return;
break;
+ case SWIM_FAILURE_DETECTION:
+ say_verbose("SWIM: process failure detection");
+ if (swim_process_failure_detection(swim, &pos, end,
+ src) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -726,6 +1136,12 @@ swim_new(void)
swim_round_step_complete);
swim->transport = &swim_udp_transport;
swim_scheduler_create(&swim->scheduler, swim_on_input, swim->transport);
+
+ /* Failure detection component. */
+ rlist_create(&swim->queue_wait_ack);
+ ev_init(&swim->wait_ack_tick, swim_check_acks);
+ ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
+ swim->wait_ack_tick.data = (void *) swim;
return swim;
}
@@ -738,7 +1154,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
struct swim_member *new_self = NULL;
if (swim_find_member(swim, &addr) == NULL) {
- new_self = swim_member_new(swim, &addr, MEMBER_ALIVE);
+ new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
if (new_self == NULL)
return -1;
}
@@ -747,6 +1163,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
ev_periodic_start(loop(), &swim->round_tick);
+ ev_periodic_start(loop(), &swim->wait_ack_tick);
if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
@@ -767,9 +1184,10 @@ swim_add_member(struct swim *swim, const char *uri)
return -1;
struct swim_member *member = swim_find_member(swim, &addr);
if (member == NULL) {
- member = swim_member_new(swim, &addr, MEMBER_ALIVE);
+ member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
if (member == NULL)
return -1;
+ member->is_pinned = true;
}
return 0;
}
@@ -800,6 +1218,8 @@ swim_info(struct swim *swim, struct info_handler *info)
sizeof(member->addr)));
info_append_str(info, "status",
swim_member_status_strs[member->status]);
+ info_append_int(info, "incarnation",
+ (int64_t) member->incarnation);
info_table_end(info);
}
info_end(info);
@@ -810,6 +1230,7 @@ swim_delete(struct swim *swim)
{
swim_scheduler_destroy(&swim->scheduler);
ev_periodic_stop(loop(), &swim->round_tick);
+ ev_periodic_stop(loop(), &swim->wait_ack_tick);
swim_task_destroy(&swim->round_step_task);
mh_int_t node = mh_first(swim->members);
while (node != mh_end(swim->members)) {
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 8a1eca819..00a16a2bb 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -23,6 +23,8 @@ swim_udp_recv_msg(int fd, void *buffer, size_t size, struct sockaddr *addr,
struct swim_transport swim_udp_transport = {
/* .send_round_msg = */ swim_udp_send_msg,
+ /* .send_ping = */ swim_udp_send_msg,
+ /* .send_ack = */ swim_udp_send_msg,
/* .recv_msg = */ swim_udp_recv_msg,
};
@@ -39,6 +41,18 @@ swim_packet_new(struct swim_msg *msg)
return res;
}
+/**
+ * Allocate a task with malloc() and initialize it with
+ * swim_task_create().
+ * @param scheduler Scheduler to bind the task to.
+ * @param complete Completion callback of the task.
+ * @retval NULL Out of memory, the diagnostics area is set.
+ * @retval not NULL A new task.
+ */
+struct swim_task *
+swim_task_new(struct swim_scheduler *scheduler, swim_task_f complete)
+{
+	struct swim_task *task = (struct swim_task *) malloc(sizeof(*task));
+	if (task != NULL) {
+		swim_task_create(task, scheduler, complete);
+		return task;
+	}
+	diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+	return NULL;
+}
+
void
swim_task_schedule(struct swim_task *task, swim_transport_send_f send,
const struct sockaddr_in *dst)
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index fc44fd0a7..f08bd1ef3 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -279,12 +279,21 @@ swim_task_create(struct swim_task *task, struct swim_scheduler *scheduler,
task->scheduler = scheduler;
}
+struct swim_task *
+swim_task_new(struct swim_scheduler *scheduler, swim_task_f complete);
+
static inline bool
swim_task_is_active(struct swim_task *task)
{
return ! rlist_empty(&task->in_queue_output);
}
+/**
+ * Reset the message of the task. The signature allows to use
+ * the function as a completion callback of a task.
+ */
+static inline void
+swim_task_reset(struct swim_task *task)
+{
+	struct swim_msg *msg = &task->msg;
+	swim_msg_reset(msg);
+}
+
static inline void
swim_task_destroy(struct swim_task *task)
{
@@ -292,6 +301,13 @@ swim_task_destroy(struct swim_task *task)
swim_msg_destroy(&task->msg);
}
+/**
+ * Destroy the task and free its memory. The task is passed to
+ * free(), so it must be a separate heap allocation, such as one
+ * made by swim_task_new(). A task embedded into another
+ * structure must not be deleted with this function.
+ */
+static inline void
+swim_task_delete(struct swim_task *task)
+{
+	struct swim_task *heap_task = task;
+	swim_task_destroy(heap_task);
+	free(heap_task);
+}
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h
index d629526ac..0bf4aa186 100644
--- a/src/lib/swim/swim_transport.h
+++ b/src/lib/swim/swim_transport.h
@@ -52,7 +52,9 @@ struct swim_transport {
* are like sendto().
*/
swim_transport_send_f send_round_msg;
-
+ /** Send failure detection message. */
+ swim_transport_send_f send_ping;
+ swim_transport_send_f send_ack;
/**
* Receive a message. Not necessary round or failure
* detection. Before message is received, its type is
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list