Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: vdavydov.dev@gmail.com, kostja@tarantool.org
Subject: [PATCH v2 2/6] [RAW] swim: introduce failure detection component
Date: Tue, 25 Dec 2018 22:19:25 +0300	[thread overview]
Message-ID: <4c60b22dd4354b56d719d66dd4b928075dea7cba.1545765055.git.v.shpilevoy@tarantool.org> (raw)
In-Reply-To: <cover.1545765055.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1545765055.git.v.shpilevoy@tarantool.org>

The failure detection component makes it possible to find out
which members are already dead.

Part of #3234
---
 src/lib/swim/swim.c | 457 +++++++++++++++++++++++++++++++++++++++++++-
 src/lib/swim/swim.h |   9 +
 2 files changed, 459 insertions(+), 7 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 989d83a22..22bc06a60 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -99,6 +99,22 @@ enum {
 	 * networks by their admins.
 	 */
 	UDP_PACKET_SIZE = 1472,
+	/**
+	 * If a ping was sent, it is considered to be lost after
+	 * this time without an ack.
+	 */
+	ACK_TIMEOUT = 1,
+	/**
+	 * If a member has not responded to this number of pings
+	 * in a row, it is considered to be dead.
+	 */
+	NO_ACKS_TO_DEAD = 3,
+	/**
+	 * If an unpinned member is confirmed to be dead, it is
+	 * removed from the membership after at least this number
+	 * of failed pings.
+	 */
+	NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
 };
 
 static ssize_t
@@ -124,6 +140,7 @@ swim_udp_recv_msg(int fd, void *buffer, size_t size, struct sockaddr *addr,
 /** UDP sendto/recvfrom implementation of swim_transport. */
 static struct swim_transport swim_udp_transport = {
 	/* .send_round_msg = */ swim_udp_send_msg,
+	/* .send_failure_detection_msg = */ swim_udp_send_msg,
 	/* .recv_msg = */ swim_udp_recv_msg,
 };
 
@@ -156,6 +173,12 @@ swim_io_task_create(struct swim_io_task *task, swim_io_task_f cb,
 	rlist_create(&task->in_queue_output);
 }
 
+static inline void
+swim_io_task_destroy(struct swim_io_task *task)
+{
+	rlist_del_entry(task, in_queue_output);
+}
+
 /**
  * UDP body size is limited by definition. To be able to send a
  * big message it should be split into multiple packets. Each
@@ -287,11 +310,17 @@ enum swim_member_status {
 	 * members table.
 	 */
 	MEMBER_ALIVE = 0,
+	/**
+	 * The member is considered to be dead. It will disappear
+	 * from the membership, if it is not pinned.
+	 */
+	MEMBER_DEAD,
 	swim_member_status_MAX,
 };
 
 static const char *swim_member_status_strs[] = {
 	"alive",
+	"dead",
 };
 
 /**
@@ -316,6 +345,38 @@ struct swim_member {
 	 * Position in a queue of members in the current round.
 	 */
 	struct rlist in_queue_round;
+	/**
+	 *
+	 *               Failure detection component
+	 */
+	/**
+	 * True, if the member is configured explicitly and can
+	 * not disappear from the membership.
+	 */
+	bool is_pinned;
+	/** Growing number to refute old messages. */
+	uint64_t incarnation;
+	/**
+	 * How many pings did not receive an ack in a row. After
+	 * a threshold the instance is marked as dead. After more
+	 * it is removed from the table (if not pinned).
+	 */
+	int failed_pings;
+	/** When the last ping was sent. */
+	double ping_ts;
+	/**
+	 * Ready at hand ack task to send when this member has
+	 * sent ping to us.
+	 */
+	struct swim_io_task ack_task;
+	/**
+	 * Ready at hand ping task to send when this member has
+	 * not responded for too long to an initial ping, which
+	 * was piggybacked with the members table.
+	 */
+	struct swim_io_task ping_task;
+	/** Position in a queue of members waiting for an ack. */
+	struct rlist in_queue_wait_ack;
 };
 
 /**
@@ -381,6 +442,15 @@ struct swim {
 	int shuffled_members_size;
 	/** Queue of output tasks ready to write now. */
 	struct rlist queue_output;
+	/**
+	 * Members waiting for an ACK. If an ACK is absent for too
+	 * long, a member is considered to be dead and is removed.
+	 * The list is sorted by time in ascending order (tail is
+	 * newer, head is older).
+	 */
+	struct rlist queue_wait_ack;
+	/** Generator of ack checking events. */
+	struct ev_periodic wait_ack_tick;
 };
 
 static inline uint64_t
@@ -396,8 +466,84 @@ sockaddr_in_hash(const struct sockaddr_in *a)
  */
 enum swim_component_type {
 	SWIM_ANTI_ENTROPY = 0,
+	SWIM_FAILURE_DETECTION,
 };
 
+/** {{{                Failure detection component              */
+
+/** Possible failure detection keys. */
+enum swim_fd_key {
+	/** Type of the failure detection message: ping or ack. */
+	SWIM_FD_MSG_TYPE,
+	/**
+	 * Incarnation of the sender. To make the member alive if
+	 * it was considered to be dead, but ping/ack with greater
+	 * incarnation was received from it.
+	 */
+	SWIM_FD_INCARNATION,
+};
+
+/**
+ * Failure detection message now has only two types: ping or ack.
+ * Indirect ping/ack are todo.
+ */
+enum swim_fd_msg_type {
+	SWIM_FD_MSG_PING,
+	SWIM_FD_MSG_ACK,
+	swim_fd_msg_type_MAX,
+};
+
+static const char *swim_fd_msg_type_strs[] = {
+	"ping",
+	"ack",
+};
+
+/** SWIM failure detection MsgPack header template. */
+struct PACKED swim_fd_header_bin {
+	/** mp_encode_uint(SWIM_FAILURE_DETECTION) */
+	uint8_t k_header;
+	/** mp_encode_map(2) */
+	uint8_t m_header;
+
+	/** mp_encode_uint(SWIM_FD_MSG_TYPE) */
+	uint8_t k_type;
+	/** mp_encode_uint(enum swim_fd_msg_type) */
+	uint8_t v_type;
+
+	/** mp_encode_uint(SWIM_FD_INCARNATION) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
+};
+
+static inline void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+			  enum swim_fd_msg_type type, uint64_t incarnation)
+{
+	header->k_header = SWIM_FAILURE_DETECTION;
+	header->m_header = 0x82;
+
+	header->k_type = SWIM_FD_MSG_TYPE;
+	header->v_type = type;
+
+	header->k_incarnation = SWIM_FD_INCARNATION;
+	header->m_incarnation = 0xcf;
+	header->v_incarnation = mp_bswap_u64(incarnation);
+}
+
+static void
+swim_member_schedule_ack_wait(struct swim *swim, struct swim_member *member)
+{
+	if (rlist_empty(&member->in_queue_wait_ack)) {
+		member->ping_ts = fiber_time();
+		rlist_add_tail_entry(&swim->queue_wait_ack, member,
+				     in_queue_wait_ack);
+	}
+}
+
+/** }}}               Failure detection component               */
+
 /** {{{                  Anti-entropy component                 */
 
 /**
@@ -412,6 +558,7 @@ enum swim_member_key {
 	 */
 	SWIM_MEMBER_ADDR,
 	SWIM_MEMBER_PORT,
+	SWIM_MEMBER_INCARNATION,
 	swim_member_key_MAX,
 };
 
@@ -435,7 +582,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
 
 /** SWIM member MsgPack template. */
 struct PACKED swim_member_bin {
-	/** mp_encode_map(3) */
+	/** mp_encode_map(4) */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -454,6 +601,12 @@ struct PACKED swim_member_bin {
 	/** mp_encode_uint(addr.sin_port) */
 	uint8_t m_port;
 	uint16_t v_port;
+
+	/** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
 };
 
 static inline void
@@ -463,17 +616,20 @@ swim_member_bin_reset(struct swim_member_bin *header,
 	header->v_status = member->status;
 	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
 	header->v_port = mp_bswap_u16(member->addr.sin_port);
+	header->v_incarnation = mp_bswap_u64(member->incarnation);
 }
 
 static inline void
 swim_member_bin_create(struct swim_member_bin *header)
 {
-	header->m_header = 0x83;
+	header->m_header = 0x84;
 	header->k_status = SWIM_MEMBER_STATUS;
 	header->k_addr = SWIM_MEMBER_ADDR;
 	header->m_addr = 0xce;
 	header->k_port = SWIM_MEMBER_PORT;
 	header->m_port = 0xcd;
+	header->k_incarnation = SWIM_MEMBER_INCARNATION;
+	header->m_incarnation = 0xcf;
 }
 
 /** }}}                  Anti-entropy component                 */
@@ -481,11 +637,19 @@ swim_member_bin_create(struct swim_member_bin *header)
 /**
  * SWIM message structure:
  * {
+ *     SWIM_FAILURE_DETECTION: {
+ *         SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,
+ *         SWIM_FD_INCARNATION: uint
+ *     },
+ *
+ *                 OR/AND
+ *
  *     SWIM_ANTI_ENTROPY: [
  *         {
  *             SWIM_MEMBER_STATUS: uint, enum member_status,
  *             SWIM_MEMBER_ADDR: uint, ip,
- *             SWIM_MEMBER_PORT: uint, port
+ *             SWIM_MEMBER_PORT: uint, port,
+ *             SWIM_MEMBER_INCARNATION: uint
  *         },
  *         ...
  *     ],
@@ -499,13 +663,45 @@ swim_io_task_push(struct swim_io_task *task)
 	ev_io_start(loop(), &task->swim->output);
 }
 
+/**
+ * Update status of the member if needed. Statuses are compared as
+ * a compound key: {incarnation, status}. So @a new_status can
+ * override an old one only if its incarnation is greater, or the
+ * same, but its status is "bigger". Statuses are compared by
+ * their identifier, so "alive" < "dead". This protects from the
+ * case when a member is detected as dead on one instance, but
+ * overridden by another instance with the same incarnation "alive"
+ * message.
+ */
+static inline void
+swim_member_update_status(struct swim *swim, struct swim_member *member,
+			  enum swim_member_status new_status,
+			  uint64_t incarnation)
+{
+	(void) swim;
+	assert(member != swim->self);
+	if (member->incarnation == incarnation) {
+		if (member->status < new_status)
+			member->status = new_status;
+	} else if (member->incarnation < incarnation) {
+		member->status = new_status;
+		member->incarnation = incarnation;
+	}
+}
+
+static void
+swim_send_ack(struct swim_io_task *task);
+
+static void
+swim_send_ping(struct swim_io_task *task);
+
 /**
  * Register a new member with a specified status. Here it is
  * added to the hash, to the 'next' queue.
  */
 static struct swim_member *
 swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
-		enum swim_member_status status)
+		enum swim_member_status status, uint64_t incarnation)
 {
 	struct swim_member *member =
 		(struct swim_member *) calloc(1, sizeof(*member));
@@ -515,6 +711,7 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 	}
 	member->status = status;
 	member->addr = *addr;
+	member->incarnation = incarnation;
 	struct mh_i64ptr_node_t node;
 	node.key = sockaddr_in_hash(addr);
 	node.val = member;
@@ -524,7 +721,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 		diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
 		return NULL;
 	}
+	swim_io_task_create(&member->ack_task, swim_send_ack, swim);
+	swim_io_task_create(&member->ping_task, swim_send_ping, swim);
 	rlist_add_entry(&swim->queue_round, member, in_queue_round);
+	rlist_create(&member->in_queue_wait_ack);
 	return member;
 }
 
@@ -549,7 +749,10 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
 	mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL);
 	assert(rc != mh_end(swim->members));
 	mh_i64ptr_del(swim->members, rc, NULL);
+	swim_io_task_destroy(&member->ack_task);
+	swim_io_task_destroy(&member->ping_task);
 	rlist_del_entry(member, in_queue_round);
+	rlist_del_entry(member, in_queue_wait_ack);
 	free(member);
 }
 
@@ -648,6 +851,27 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
 	return i;
 }
 
+/**
+ * Encode failure detection component.
+ * @retval Number of encoded messages, or -1 on error.
+ */
+static int
+swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
+			      enum swim_fd_msg_type type)
+{
+	struct swim_fd_header_bin fd_header_bin;
+	int size = sizeof(fd_header_bin);
+	struct swim_msg_part *part = swim_msg_reserve(msg, size);
+	if (part == NULL)
+		return -1;
+	char *pos = swim_msg_part_pos(part);
+	swim_fd_header_bin_create(&fd_header_bin, type,
+				  swim->self->incarnation);
+	memcpy(pos, &fd_header_bin, size);
+	swim_msg_part_advance(part, size);
+	return 1;
+}
+
 /** Encode SWIM components into a sequence of UDP packets. */
 static int
 swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -660,6 +884,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
 	int rc, map_size = 0;
 	swim_msg_part_advance(part, 1);
 
+	rc = swim_encode_failure_detection(swim, msg, SWIM_FD_MSG_PING);
+	if (rc < 0)
+		goto error;
+	map_size += rc > 0;
+
 	rc = swim_encode_anti_entropy(swim, msg);
 	if (rc < 0)
 		goto error;
@@ -713,11 +942,58 @@ swim_send_round_msg(struct swim_io_task *task)
 			diag_log();
 	}
 	swim_msg_destroy(&msg);
+	swim_member_schedule_ack_wait(swim, m);
 	rlist_del_entry(m, in_queue_round);
 next_round_step:
 	ev_periodic_start(loop(), &swim->round_tick);
 }
 
+/** Send a failure detection message. */
+static void
+swim_send_fd_message(struct swim *swim, struct swim_member *m,
+		     enum swim_fd_msg_type type)
+{
+	struct swim_msg msg;
+	swim_msg_create(&msg);
+	int rc = swim_encode_failure_detection(swim, &msg, type);
+	if (rc < 0) {
+		diag_log();
+		swim_msg_destroy(&msg);
+		return;
+	}
+	assert(rc > 0);
+	struct swim_msg_part *part = swim_msg_first_part(&msg);
+	struct sockaddr *addr = (struct sockaddr *) &m->addr;
+	assert(swim_msg_part_is_last(part));
+	say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type],
+		    sio_strfaddr(addr, sizeof(m->addr)));
+	if (swim->transport.send_failure_detection_msg(swim->output.fd,
+						       part->body, part->size,
+						       addr,
+						       sizeof(m->addr)) == -1)
+		diag_log();
+	swim_msg_destroy(&msg);
+}
+
+static void
+swim_send_ack(struct swim_io_task *task)
+{
+	assert(task->cb == swim_send_ack);
+	struct swim_member *m = container_of(task, struct swim_member,
+					     ack_task);
+	swim_send_fd_message(task->swim, m, SWIM_FD_MSG_ACK);
+}
+
+static void
+swim_send_ping(struct swim_io_task *task)
+{
+	assert(task->cb == swim_send_ping);
+	struct swim_member *m = container_of(task, struct swim_member,
+					     ping_task);
+	swim_send_fd_message(task->swim, m, SWIM_FD_MSG_PING);
+	swim_member_schedule_ack_wait(task->swim, m);
+}
+
 static void
 swim_on_output(struct ev_loop *loop, struct ev_io *io, int events)
 {
@@ -745,12 +1021,43 @@ swim_trigger_round_step(struct ev_loop *loop, struct ev_periodic *p, int events)
 	ev_periodic_stop(loop, p);
 }
 
+/**
+ * Check for failed pings. A ping is failed if an ack was not
+ * received during ACK_TIMEOUT. A failed ping is resent here.
+ */
+static void
+swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+	assert((events & EV_PERIODIC) != 0);
+	(void) loop;
+	(void) events;
+	struct swim *swim = (struct swim *) p->data;
+	struct swim_member *m, *tmp;
+	double current_time = fiber_time();
+	rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack,
+				 tmp) {
+		if (current_time - m->ping_ts < ACK_TIMEOUT)
+			break;
+		++m->failed_pings;
+		if (m->failed_pings >= NO_ACKS_TO_GC) {
+			if (!m->is_pinned)
+				swim_member_delete(swim, m);
+			continue;
+		}
+		if (m->failed_pings >= NO_ACKS_TO_DEAD)
+			m->status = MEMBER_DEAD;
+		swim_io_task_push(&m->ping_task);
+		rlist_del_entry(m, in_queue_wait_ack);
+	}
+}
+
 /**
  * SWIM member attributes from anti-entropy and dissemination
  * messages.
  */
 struct swim_member_def {
 	struct sockaddr_in addr;
+	uint64_t incarnation;
 	enum swim_member_status status;
 };
 
@@ -760,6 +1067,7 @@ swim_member_def_create(struct swim_member_def *def)
 	def->addr.sin_port = 0;
 	def->addr.sin_addr.s_addr = 0;
 	def->status = MEMBER_ALIVE;
+	def->incarnation = 0;
 }
 
 static void
@@ -771,9 +1079,35 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
 	 * members table.
 	 */
 	if (member == NULL) {
-		member = swim_member_new(swim, &def->addr, def->status);
+		member = swim_member_new(swim, &def->addr, def->status,
+					 def->incarnation);
 		if (member == NULL)
 			diag_log();
+		return;
+	}
+	struct swim_member *self = swim->self;
+	if (member != self) {
+		swim_member_update_status(swim, member, def->status,
+					  def->incarnation);
+		return;
+	}
+	/*
+	 * It is possible that other instances know a bigger
+	 * incarnation of this instance - such thing happens when
+	 * the instance restarts and loses its local incarnation
+	 * number. It will be restored by receiving dissemination
+	 * messages about self.
+	 */
+	if (self->incarnation < def->incarnation)
+		self->incarnation = def->incarnation;
+	if (def->status != MEMBER_ALIVE &&
+	    def->incarnation == self->incarnation) {
+		/*
+		 * In the cluster a gossip exists that this
+		 * instance is not alive. Refute this information
+		 * with a bigger incarnation.
+		 */
+		self->incarnation++;
 	}
 }
 
@@ -817,6 +1151,15 @@ swim_process_member_key(enum swim_member_key key, const char **pos,
 		}
 		def->addr.sin_port = port;
 		break;
+	case SWIM_MEMBER_INCARNATION:
+		if (mp_typeof(**pos) != MP_UINT ||
+		    mp_check_uint(*pos, end) > 0) {
+			say_error("%s member incarnation should be uint",
+				  msg_pref);
+			return -1;
+		}
+		def->incarnation = mp_decode_uint(pos);
+		break;
 	default:
 		unreachable();
 	}
@@ -868,6 +1211,91 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
 	return 0;
 }
 
+/**
+ * Decode a failure detection message. Schedule pings, process
+ * acks.
+ */
+static int
+swim_process_failure_detection(struct swim *swim, const char **pos,
+			       const char *end, const struct sockaddr_in *src)
+{
+	const char *msg_pref = "Invalid SWIM failure detection message:";
+	if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) {
+		say_error("%s root should be a map", msg_pref);
+		return -1;
+	}
+	uint64_t size = mp_decode_map(pos);
+	if (size != 2) {
+		say_error("%s root map should have two keys - message type "\
+			  "and incarnation", msg_pref);
+		return -1;
+	}
+	enum swim_fd_msg_type type = swim_fd_msg_type_MAX;
+	uint64_t incarnation = 0;
+	for (int i = 0; i < (int) size; ++i) {
+		if (mp_typeof(**pos) != MP_UINT ||
+		    mp_check_uint(*pos, end) > 0) {
+			say_error("%s a key should be uint", msg_pref);
+			return -1;
+		}
+		uint64_t key = mp_decode_uint(pos);
+		switch(key) {
+		case SWIM_FD_MSG_TYPE:
+			if (mp_typeof(**pos) != MP_UINT ||
+			    mp_check_uint(*pos, end) > 0) {
+				say_error("%s message type should be uint",
+					  msg_pref);
+				return -1;
+			}
+			key = mp_decode_uint(pos);
+			if (key >= swim_fd_msg_type_MAX) {
+				say_error("%s unknown message type", msg_pref);
+				return -1;
+			}
+			type = key;
+			break;
+		case SWIM_FD_INCARNATION:
+			if (mp_typeof(**pos) != MP_UINT ||
+			    mp_check_uint(*pos, end) > 0) {
+				say_error("%s incarnation should be uint",
+					  msg_pref);
+				return -1;
+			}
+			incarnation = mp_decode_uint(pos);
+			break;
+		default:
+			say_error("%s unknown key", msg_pref);
+			return -1;
+		}
+	}
+	if (type == swim_fd_msg_type_MAX) {
+		say_error("%s message type should be specified", msg_pref);
+		return -1;
+	}
+	struct swim_member *sender = swim_find_member(swim, src);
+	if (sender == NULL) {
+		sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation);
+		if (sender == NULL) {
+			diag_log();
+			return 0;
+		}
+	} else {
+		swim_member_update_status(swim, sender, MEMBER_ALIVE,
+					  incarnation);
+	}
+	if (type == SWIM_FD_MSG_PING) {
+		swim_io_task_push(&sender->ack_task);
+	} else {
+		assert(type == SWIM_FD_MSG_ACK);
+		if (incarnation >= sender->incarnation) {
+			sender->failed_pings = 0;
+			rlist_del_entry(&sender->ping_task, in_queue_output);
+			rlist_del_entry(sender, in_queue_wait_ack);
+		}
+	}
+	return 0;
+}
+
 /** Receive and process a new message. */
 static void
 swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -910,6 +1338,12 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
 			if (swim_process_anti_entropy(swim, &pos, end) != 0)
 				return;
 			break;
+		case SWIM_FAILURE_DETECTION:
+			say_verbose("SWIM: process failure detection");
+			if (swim_process_failure_detection(swim, &pos, end,
+							   &addr) != 0)
+				return;
+			break;
 		default:
 			say_error("%s unknown component type component is "\
 				  "supported", msg_pref);
@@ -984,6 +1418,10 @@ swim_new(void)
 	swim->output.data = (void *) swim;
 	swim->transport = swim_udp_transport;
 	rlist_create(&swim->queue_output);
+	rlist_create(&swim->queue_wait_ack);
+	ev_init(&swim->wait_ack_tick, swim_check_acks);
+	ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
+	swim->wait_ack_tick.data = (void *) swim;
 	return swim;
 }
 
@@ -1024,7 +1462,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			new_self = swim_find_member(swim, &addr);
 			if (new_self == NULL) {
 				new_self = swim_member_new(swim, &addr,
-							   MEMBER_ALIVE);
+							   MEMBER_ALIVE, 0);
 				if (new_self == NULL) {
 					close(fd);
 					return -1;
@@ -1034,6 +1472,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			ev_io_set(&swim->input, fd, EV_READ);
 			ev_io_set(&swim->output, fd, EV_WRITE);
 			ev_periodic_start(loop(), &swim->round_tick);
+			ev_periodic_start(loop(), &swim->wait_ack_tick);
 		}
 	}
 
@@ -1054,9 +1493,10 @@ swim_add_member(struct swim *swim, const char *uri)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, &addr);
 	if (member == NULL) {
-		member = swim_member_new(swim, &addr, MEMBER_ALIVE);
+		member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
 		if (member == NULL)
 			return -1;
+		member->is_pinned = true;
 	}
 	return 0;
 }
@@ -1087,6 +1527,8 @@ swim_info(struct swim *swim, struct info_handler *info)
 					      sizeof(member->addr)));
 		info_append_str(info, "status",
 				swim_member_status_strs[member->status]);
+		info_append_int(info, "incarnation",
+				(int64_t) member->incarnation);
 		info_table_end(info);
 	}
 	info_end(info);
@@ -1099,6 +1541,7 @@ swim_delete(struct swim *swim)
 	ev_io_stop(loop(), &swim->output);
 	ev_io_stop(loop(), &swim->input);
 	ev_periodic_stop(loop(), &swim->round_tick);
+	ev_periodic_stop(loop(), &swim->wait_ack_tick);
 	mh_int_t node = mh_first(swim->members);
 	while (node != mh_end(swim->members)) {
 		struct swim_member *m = (struct swim_member *)
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 77e16ed53..51f0c144d 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -54,6 +54,15 @@ struct swim_transport {
 	(*send_round_msg)(int fd, const void *data, size_t size,
 			  const struct sockaddr *addr, socklen_t addr_size);
 
+	/**
+	 * Send a failure detection message: a ping or an ack.
+	 * Parameters are like sendto().
+	 */
+	ssize_t
+	(*send_failure_detection_msg)(int fd, const void *data, size_t size,
+				      const struct sockaddr *addr,
+				      socklen_t addr_size);
+
 	/**
 	 * Receive a message. Not necessary round or failure
 	 * detection. Before message is received, its type is
-- 
2.17.2 (Apple Git-113)

  parent reply	other threads:[~2018-12-25 19:19 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-12-25 19:19 [PATCH v2 0/6] SWIM Vladislav Shpilevoy
2018-12-25 19:19 ` [PATCH v2 1/6] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
2018-12-25 19:19 ` Vladislav Shpilevoy [this message]
2018-12-25 19:19 ` [PATCH v2 3/6] [RAW] swim: introduce a dissemination component Vladislav Shpilevoy
2018-12-25 19:19 ` [PATCH v2 4/6] [RAW] swim: keep encoded round message cached Vladislav Shpilevoy
2018-12-25 19:19 ` [PATCH v2 5/6] [RAW] swim: send one UDP packet per EV_WRITE event Vladislav Shpilevoy
2018-12-26 21:01 ` [tarantool-patches] [PATCH v2 0/6] SWIM Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=4c60b22dd4354b56d719d66dd4b928075dea7cba.1545765055.git.v.shpilevoy@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=vdavydov.dev@gmail.com \
    --subject='Re: [PATCH v2 2/6] [RAW] swim: introduce failure detection component' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox