[tarantool-patches] [PATCH v2 3/3] swim: introduce suspicion

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Apr 24 23:22:00 MSK 2019


The suspicion component is the way SWIM protects against
false-positive failure detections. When the network is slow, or
a SWIM node does not manage to process messages in time because
it is overloaded, other nodes will not receive ACKs in time, even
though it is too soon to declare the member dead.

Instead, the nodes mark the member as suspected and ping it
indirectly, via other members. This 1) gives the suspected member
more time to respond with an ACK, and 2) protects against the case
when the problem is limited to particular network channels.

Part of #3234
---
 src/lib/swim/swim.c           | 146 ++++++++++++++++++++++++++++++----
 src/lib/swim/swim_constants.h |   6 ++
 src/lib/swim/swim_proto.c     |   1 +
 test/unit/swim.c              |  76 +++++++++++++-----
 test/unit/swim.result         |  30 ++++---
 5 files changed, 213 insertions(+), 46 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 1b4a4365d..3917c71f3 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -143,11 +143,17 @@ enum {
 	 */
 	ACK_TIMEOUT_DEFAULT = 30,
 	/**
-	 * If a member has not been responding to pings this
-	 * number of times, it is considered dead. According to
-	 * the SWIM paper, for a member it is sufficient to miss
-	 * one direct ping, and an arbitrary but fixed number of
-	 * simultaneous indirect pings, to be considered dead.
+	 * If an alive member has not been responding to pings
+	 * this number of times, it is suspected to be dead. To
+	 * confirm the death it should fail more pings.
+	 */
+	NO_ACKS_TO_SUSPECT = 2,
+	/**
+	 * If a suspected member has not been responding to pings
+	 * this number of times, it is considered dead. According
+	 * to the SWIM paper, for a member it is sufficient to
+	 * miss one direct ping, and an arbitrary but fixed number
+	 * of simultaneous indirect pings, to be considered dead.
 	 * Seems too little, so here it is bigger.
 	 */
 	NO_ACKS_TO_DEAD = 3,
@@ -161,6 +167,13 @@ enum {
 	 * anti-entropy components.
 	 */
 	NO_ACKS_TO_GC = 2,
+	/**
+	 * Number of pings sent indirectly to a member via other
+	 * members when it did not answer on a regular ping. The
+	 * messages are sent in parallel and via different
+	 * members.
+	 */
+	INDIRECT_PING_COUNT = 2,
 };
 
 /**
@@ -476,10 +489,19 @@ swim_cached_round_msg_invalidate(struct swim *swim)
 
 /** Put the member into a list of ACK waiters. */
 static void
-swim_wait_ack(struct swim *swim, struct swim_member *member)
+swim_wait_ack(struct swim *swim, struct swim_member *member,
+	      bool was_ping_indirect)
 {
 	if (heap_node_is_stray(&member->in_wait_ack_heap)) {
-		member->ping_deadline = swim_time() + swim->wait_ack_tick.at;
+		double timeout = swim->wait_ack_tick.at;
+		/*
+		 * Direct ping is two hops: PING + ACK.
+		 * Indirect ping is four hops: PING, FORWARD PING,
+		 * ACK, FORWARD ACK. This is why x2 for indirects.
+		 */
+		if (was_ping_indirect)
+			timeout *= 2;
+		member->ping_deadline = swim_time() + timeout;
 		wait_ack_heap_insert(&swim->wait_ack_heap, member);
 		swim_ev_timer_start(loop(), &swim->wait_ack_tick);
 	}
@@ -609,7 +631,7 @@ swim_ping_task_complete(struct swim_task *task,
 	struct swim *swim = swim_by_scheduler(scheduler);
 	struct swim_member *m = container_of(task, struct swim_member,
 					     ping_task);
-	swim_wait_ack(swim, m);
+	swim_wait_ack(swim, m, false);
 }
 
 /** Free member's resources. */
@@ -1064,7 +1086,7 @@ swim_complete_step(struct swim_task *task,
 			 * dissemination and failure detection
 			 * sections.
 			 */
-			swim_wait_ack(swim, m);
+			swim_wait_ack(swim, m, false);
 			swim_decrease_event_ttd(swim);
 		}
 	}
@@ -1073,13 +1095,16 @@ swim_complete_step(struct swim_task *task,
 /** Schedule send of a failure detection message. */
 static void
 swim_send_fd_msg(struct swim *swim, struct swim_task *task,
-		 const struct sockaddr_in *dst, enum swim_fd_msg_type type)
+		 const struct sockaddr_in *dst, enum swim_fd_msg_type type,
+		 const struct sockaddr_in *proxy)
 {
 	/*
 	 * Reset packet allocator in case if task is being reused.
 	 */
 	assert(! swim_task_is_scheduled(task));
 	swim_packet_create(&task->packet);
+	if (proxy != NULL)
+		swim_task_proxy(task, proxy);
 	char *header = swim_packet_alloc(&task->packet, 1);
 	int map_size = swim_encode_src_uuid(swim, &task->packet);
 	map_size += swim_encode_failure_detection(swim, &task->packet, type);
@@ -1095,7 +1120,21 @@ static inline void
 swim_send_ack(struct swim *swim, struct swim_task *task,
 	      const struct sockaddr_in *dst)
 {
-	swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK);
+	swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK, NULL);
+}
+
+/** Schedule an indirect ack through @a proxy. */
+static inline int
+swim_send_indirect_ack(struct swim *swim, const struct sockaddr_in *dst,
+		       const struct sockaddr_in *proxy)
+{
+	struct swim_task *task =
+		swim_task_new(swim_task_delete_cb, swim_task_delete_cb,
+			      "indirect ack");
+	if (task == NULL)
+		return -1;
+	swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK, proxy);
+	return 0;
 }
 
 /** Schedule send of a ping. */
@@ -1103,7 +1142,68 @@ static inline void
 swim_send_ping(struct swim *swim, struct swim_task *task,
 	       const struct sockaddr_in *dst)
 {
-	swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_PING);
+	swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_PING, NULL);
+}
+
+/**
+ * Indirect ping task completion callback. Always deletes
+ * @a task; on success it may start waiting for an ACK.
+ */
+static void
+swim_iping_task_complete(struct swim_task *task,
+			 struct swim_scheduler *scheduler, int rc)
+{
+	if (rc >= 0) {
+		struct swim *swim = swim_by_scheduler(scheduler);
+		struct swim_member *m = swim_find_member(swim, &task->uuid);
+		/*
+		 * The member may be already removed, so NULL is
+		 * possible. And if an ACK arrived before this ping
+		 * got EV_WRITE, the member is alive - skip it.
+		 */
+		if (m != NULL && m->status != MEMBER_ALIVE)
+			swim_wait_ack(swim, m, true);
+	}
+	swim_task_delete_cb(task, scheduler, rc);
+}
+
+/** Schedule up to INDIRECT_PING_COUNT indirect pings to @a dst. */
+static inline int
+swim_send_indirect_pings(struct swim *swim, const struct swim_member *dst)
+{
+	struct mh_swim_table_t *t = swim->members;
+	int member_count = mh_size(t);
+	int rnd = swim_scaled_rand(0, member_count - 1);
+	mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
+	for (int member_i = 0, task_i = 0; member_i < member_count &&
+	     task_i < INDIRECT_PING_COUNT; ++member_i) {
+		struct swim_member *m = *mh_swim_table_node(t, rc);
+		/*
+		 * Skip self and the destination: pinging via them
+		 * would be just a direct ping. The proxy route is
+		 * set by swim_send_fd_msg().
+		 */
+		if (m != swim->self && !swim_inaddr_eq(&dst->addr, &m->addr)) {
+			struct swim_task *task =
+				swim_task_new(swim_iping_task_complete,
+					      swim_task_delete_cb,
+					      "indirect ping");
+			if (task == NULL)
+				return -1;
+			task->uuid = dst->uuid;
+			swim_send_fd_msg(swim, task, &dst->addr,
+					 SWIM_FD_MSG_PING, &m->addr);
+			++task_i;
+		}
+		/*
+		 * First random member could be chosen too close
+		 * to the hash end. Here the cycle is wrapped.
+		 */
+		rc = mh_next(t, rc);
+		if (rc == end)
+			rc = mh_first(t);
+	}
+	return 0;
 }
 
 /**
@@ -1128,6 +1228,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 		++m->unacknowledged_pings;
 		switch (m->status) {
 		case MEMBER_ALIVE:
+			if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT)
+				break;
+			m->status = MEMBER_SUSPECTED;
+			swim_on_member_update(swim, m);
+			if (swim_send_indirect_pings(swim, m) != 0)
+				diag_log();
+			break;
+		case MEMBER_SUSPECTED:
 			if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
 				m->status = MEMBER_DEAD;
 				swim_on_member_update(swim, m);
@@ -1315,7 +1423,8 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
 static int
 swim_process_failure_detection(struct swim *swim, const char **pos,
 			       const char *end, const struct sockaddr_in *src,
-			       const struct tt_uuid *uuid)
+			       const struct tt_uuid *uuid,
+			       const struct sockaddr_in *proxy)
 {
 	const char *prefix = "invalid failure detection message:";
 	struct swim_failure_detection_def def;
@@ -1360,8 +1469,13 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 
 	switch (def.type) {
 	case SWIM_FD_MSG_PING:
-		if (! swim_task_is_scheduled(&member->ack_task))
+		if (proxy != NULL) {
+			if (swim_send_indirect_ack(swim, &member->addr,
+						   proxy) != 0)
+				diag_log();
+		} else if (! swim_task_is_scheduled(&member->ack_task)) {
 			swim_send_ack(swim, &member->ack_task, &member->addr);
+		}
 		break;
 	case SWIM_FD_MSG_ACK:
 		member->unacknowledged_pings = 0;
@@ -1433,7 +1547,6 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
 	      const char *end, const struct sockaddr_in *src,
 	      const struct sockaddr_in *proxy)
 {
-	(void) proxy;
 	const char *prefix = "invalid message:";
 	struct swim *swim = swim_by_scheduler(scheduler);
 	struct tt_uuid uuid;
@@ -1465,7 +1578,8 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
 			break;
 		case SWIM_FAILURE_DETECTION:
 			if (swim_process_failure_detection(swim, &pos, end,
-							   src, &uuid) != 0)
+							   src, &uuid,
+							   proxy) != 0)
 				goto error;
 			break;
 		case SWIM_DISSEMINATION:
diff --git a/src/lib/swim/swim_constants.h b/src/lib/swim/swim_constants.h
index 7869ddf3e..4f8404ce3 100644
--- a/src/lib/swim/swim_constants.h
+++ b/src/lib/swim/swim_constants.h
@@ -37,6 +37,12 @@
 enum swim_member_status {
 	/** The instance is ok, responds to requests. */
 	MEMBER_ALIVE = 0,
+	/**
+	 * If a member has not responded to a ping, it is declared
+	 * as suspected to be dead. After more failed pings it
+	 * is finally dead.
+	 */
+	MEMBER_SUSPECTED,
 	/**
 	 * The member is considered dead. It will disappear from
 	 * the membership after some unacknowledged pings.
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 18c20abf3..6502e40a1 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -45,6 +45,7 @@ swim_inaddr_str(const struct sockaddr_in *addr)
 
 const char *swim_member_status_strs[] = {
 	"alive",
+	"suspected",
 	"dead",
 	"left",
 };
diff --git a/test/unit/swim.c b/test/unit/swim.c
index e375e6607..2deaf138a 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -240,7 +240,7 @@ swim_test_add_remove(void)
 static void
 swim_test_basic_failure_detection(void)
 {
-	swim_start_test(7);
+	swim_start_test(9);
 	struct swim_cluster *cluster = swim_cluster_new(2);
 	swim_cluster_set_ack_timeout(cluster, 0.5);
 
@@ -248,8 +248,15 @@ swim_test_basic_failure_detection(void)
 	is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
 	   "node is added as alive");
 	swim_cluster_block_io(cluster, 1);
-	is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 2.4), -1,
-	   "member still is not dead after 2 noacks");
+	/* Roll one round to send a first ping. */
+	swim_run_for(1);
+
+	is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_SUSPECTED, 0.9), -1,
+	   "member still is not suspected after 1 noack");
+	is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_SUSPECTED, 0.1), 0,
+	   "but it is suspected after one more");
+	is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 1.4), -1,
+	   "it is not dead after 2 more noacks");
 	is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 0.1), 0,
 	   "but it is dead after one more");
 
@@ -304,34 +311,35 @@ swim_test_basic_gossip(void)
 	swim_cluster_add_link(cluster, 1, 0);
 	swim_cluster_set_drop(cluster, 1, 100);
 	/*
-	 * Wait two no-ACKs on S1 from S2. +1 sec to send a first
+	 * Wait one no-ACK on S1 from S2. +1 sec to send a first
 	 * ping.
 	 */
-	swim_run_for(20 + 1);
+	swim_run_for(10 + 1);
 	swim_cluster_add_link(cluster, 0, 2);
 	swim_cluster_add_link(cluster, 2, 1);
 	/*
 	 * After 10 seconds (one ack timeout) S1 should see S2 as
-	 * dead. But S3 still should see S2 as alive. To prevent
-	 * S1 from informing S3 about that the S3 IO is blocked
-	 * for a short time.
+	 * suspected. But S3 still should see S2 as alive. To
+	 * prevent S1 from informing S3 about that the S3 IO is
+	 * blocked for a short time.
 	 */
 	swim_run_for(9);
 	is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
 	   "S1 still thinks that S2 is alive");
 	swim_cluster_block_io(cluster, 2);
 	swim_run_for(1);
-	is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD, "but one "\
-	   "more second, and a third ack timed out - S1 sees S2 as dead");
+	is(swim_cluster_member_status(cluster, 0, 1), MEMBER_SUSPECTED,
+	   "but one more second, and a second ack timed out - S1 sees S2 as "\
+	   "suspected");
 	is(swim_cluster_member_status(cluster, 2, 1), MEMBER_ALIVE,
 	   "S3 still thinks that S2 is alive");
 	swim_cluster_unblock_io(cluster, 2);
 	/*
-	 * At most after two round steps S1 sends 'S2 is dead' to
-	 * S3.
+	 * At most after two round steps S1 sends
+	 * 'S2 is suspected' to S3.
 	 */
-	is(swim_cluster_wait_status(cluster, 2, 1, MEMBER_DEAD, 2), 0,
-	   "S3 learns about dead S2 from S1");
+	is(swim_cluster_wait_status(cluster, 2, 1, MEMBER_SUSPECTED, 2), 0,
+	   "S3 learns about suspected S2 from S1");
 
 	swim_cluster_delete(cluster);
 	swim_finish_test();
@@ -363,10 +371,14 @@ swim_test_refute(void)
 
 	swim_cluster_add_link(cluster, 0, 1);
 	swim_cluster_set_drop(cluster, 1, 100);
-	fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 7) != 0);
+	/* Roll one round to send a first ping. */
+	swim_run_for(1);
+
+	fail_if(swim_cluster_wait_status(cluster, 0, 1,
+					 MEMBER_SUSPECTED, 4) != 0);
 	swim_cluster_set_drop(cluster, 1, 0);
 	is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
-	   "S2 increments its own incarnation to refute its death");
+	   "S2 increments its own incarnation to refute its suspicion");
 	is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0,
 	   "new incarnation has reached S1 with a next round message");
 
@@ -386,7 +398,7 @@ swim_test_too_big_packet(void)
 	swim_start_test(3);
 	int size = 50;
 	double ack_timeout = 1;
-	double first_dead_timeout = 20;
+	double first_dead_timeout = 30;
 	double everywhere_dead_timeout = size;
 	int drop_id = size / 2;
 
@@ -465,7 +477,9 @@ swim_test_undead(void)
 	swim_cluster_add_link(cluster, 0, 1);
 	swim_cluster_add_link(cluster, 1, 0);
 	swim_cluster_set_drop(cluster, 1, 100);
-	is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 4), 0,
+	/* Roll one round to send a first ping. */
+	swim_run_for(1);
+	is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 5), 0,
 	   "member S2 is dead");
 	swim_run_for(5);
 	is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD,
@@ -833,10 +847,33 @@ swim_test_payload_refutation(void)
 	swim_finish_test();
 }
 
+static void
+swim_test_indirect_ping(void)
+{
+	swim_start_test(2);
+	uint16_t size = 3;
+	struct swim_cluster *cluster = swim_cluster_new(size);
+	swim_cluster_set_ack_timeout(cluster, 1);
+	for (int a = 0; a < size; ++a) {
+		for (int b = a + 1; b < size; ++b)
+			swim_cluster_interconnect(cluster, a, b);
+	}
+	/* Full mesh, but the direct S1 <-> S2 channels drop. */
+	swim_cluster_set_drop_channel(cluster, 0, 1, true);
+	swim_cluster_set_drop_channel(cluster, 1, 0, true);
+	swim_run_for(10);
+	is(swim_cluster_wait_status_everywhere(cluster, 0, MEMBER_ALIVE, 0),
+	   0, "S1 is still alive everywhere");
+	is(swim_cluster_wait_status_everywhere(cluster, 1, MEMBER_ALIVE, 0),
+	   0, "as well as S2 - they communicated via S3");
+	swim_cluster_delete(cluster);
+	swim_finish_test();
+}
+
 static int
 main_f(va_list ap)
 {
-	swim_start_test(17);
+	swim_start_test(18);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -859,6 +896,7 @@ main_f(va_list ap)
 	swim_test_broadcast();
 	swim_test_payload_basic();
 	swim_test_payload_refutation();
+	swim_test_indirect_ping();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 4b1407db3..a450d8427 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..17
+1..18
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -64,14 +64,16 @@ ok 4 - subtests
 ok 5 - subtests
 	*** swim_test_add_remove: done ***
 	*** swim_test_basic_failure_detection ***
-    1..7
+    1..9
     ok 1 - node is added as alive
-    ok 2 - member still is not dead after 2 noacks
-    ok 3 - but it is dead after one more
-    ok 4 - after 2 more unacks the member still is not deleted - dissemination TTD keeps it
-    ok 5 - but it is dropped after 2 rounds when TTD gets 0
-    ok 6 - fullmesh is restored
-    ok 7 - a member is added back on an ACK
+    ok 2 - member still is not suspected after 1 noack
+    ok 3 - but it is suspected after one more
+    ok 4 - it is not dead after 2 more noacks
+    ok 5 - but it is dead after one more
+    ok 6 - after 2 more unacks the member still is not deleted - dissemination TTD keeps it
+    ok 7 - but it is dropped after 2 rounds when TTD gets 0
+    ok 8 - fullmesh is restored
+    ok 9 - a member is added back on an ACK
 ok 6 - subtests
 	*** swim_test_basic_failure_detection: done ***
 	*** swim_test_probe ***
@@ -82,7 +84,7 @@ ok 7 - subtests
 	*** swim_test_probe: done ***
 	*** swim_test_refute ***
     1..4
-    ok 1 - S2 increments its own incarnation to refute its death
+    ok 1 - S2 increments its own incarnation to refute its suspicion
     ok 2 - new incarnation has reached S1 with a next round message
     ok 3 - after restart S2's incarnation is 0 again
     ok 4 - S2 learned its old bigger incarnation 1 from S0
@@ -91,9 +93,9 @@ ok 8 - subtests
 	*** swim_test_basic_gossip ***
     1..4
     ok 1 - S1 still thinks that S2 is alive
-    ok 2 - but one more second, and a third ack timed out - S1 sees S2 as dead
+    ok 2 - but one more second, and a second ack timed out - S1 sees S2 as suspected
     ok 3 - S3 still thinks that S2 is alive
-    ok 4 - S3 learns about dead S2 from S1
+    ok 4 - S3 learns about suspected S2 from S1
 ok 9 - subtests
 	*** swim_test_basic_gossip: done ***
 	*** swim_test_too_big_packet ***
@@ -177,4 +179,10 @@ ok 16 - subtests
     ok 11 - S3 learns S1's payload from S2
 ok 17 - subtests
 	*** swim_test_payload_refutation: done ***
+	*** swim_test_indirect_ping ***
+    1..2
+    ok 1 - S1 is still alive everywhere
+    ok 2 - as well as S2 - they communicated via S3
+ok 18 - subtests
+	*** swim_test_indirect_ping: done ***
 	*** main_f: done ***
-- 
2.20.1 (Apple Git-117)





More information about the Tarantool-patches mailing list