[tarantool-patches] [PATCH 5/5] swim: introduce dissemination component

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Apr 5 14:57:31 MSK 2019


The dissemination component broadcasts events about member status
updates. When any member attribute is updated (incarnation,
status, UUID, address), the member is put into an event queue.
Members from the queue are encoded into each round step message
with a higher priority and before the anti-entropy section.

It means that even if a cluster consists of hundreds of members
and one of them was updated on one of the instances, this update
will be disseminated regardless of whether this member is encoded
into the anti-entropy section or not. It drastically speeds up
events dissemination, according to the SWIM paper, and it is
noticeable in the tests.

Part of #3234
---
 src/lib/swim/swim.c       | 155 +++++++++++++++++++++++++++++++++++++-
 src/lib/swim/swim.h       |   4 +
 src/lib/swim/swim_proto.c |  26 +++++++
 src/lib/swim/swim_proto.h |  56 ++++++++++++++
 test/unit/swim.c          |  48 ++++++++++--
 test/unit/swim.result     |  22 ++++--
 6 files changed, 293 insertions(+), 18 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index a30a83886..8eac102c8 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -269,6 +269,27 @@ struct swim_member {
 	 * message to it.
 	 */
 	struct heap_node in_wait_ack_heap;
+	/**
+	 *
+	 *                 Dissemination component
+	 *
+	 * Dissemination component sends events. Event is a
+	 * notification about member status update. So formally,
+	 * this structure already has all the needed attributes.
+	 * But also an event somehow should be sent to all members
+	 * at least once according to SWIM, so it requires
+	 * something like TTL for each type of event, which gets
+	 * decremented on each send. And a member can not be
+	 * removed from the global table until it gets dead and
+	 * its status TTL is 0, so as to allow other members to
+	 * learn its dead status.
+	 */
+	int status_ttl;
+	/**
+	 * Events are put into a queue sorted by event occurrence
+	 * time.
+	 */
+	struct rlist in_events_queue;
 };
 
 #define mh_name _swim_table
@@ -364,6 +385,12 @@ struct swim {
 	struct ev_timer wait_ack_tick;
 	/** GC state saying how to remove dead members. */
 	enum swim_gc_mode gc_mode;
+	/**
+	 *
+	 *                 Dissemination component
+	 */
+	/** Queue of events sorted by occurrence time. */
+	struct rlist events_queue;
 };
 
 /** Put the member into a list of ACK waiters. */
@@ -377,6 +404,25 @@ swim_wait_ack(struct swim *swim, struct swim_member *member)
 	}
 }
 
+/**
+ * On any update a member is appended to the queue of events,
+ * unless it is already there, and its status TTL is reset to the
+ * current member table size. Status TTL is set on any update,
+ * even of UUID or another attribute, because 1) it simplifies
+ * the code when status TTL is bigger than all other ones, 2)
+ * status occupies only 2 bytes in a packet, so it is never worse
+ * to send it on any update, but it reduces entropy.
+ */
+static inline void
+swim_register_event(struct swim *swim, struct swim_member *member)
+{
+	if (rlist_empty(&member->in_events_queue)) {
+		rlist_add_tail_entry(&swim->events_queue, member,
+				     in_events_queue);
+	}
+	member->status_ttl = mh_size(swim->members);
+}
+
 /**
  * Make all needed actions to process a member's update like a
  * change of its status, or incarnation, or both.
@@ -384,8 +430,8 @@ swim_wait_ack(struct swim *swim, struct swim_member *member)
 static void
 swim_on_member_update(struct swim *swim, struct swim_member *member)
 {
-	(void) swim;
 	member->unacknowledged_pings = 0;
+	swim_register_event(swim, member);
 }
 
 /**
@@ -468,6 +514,9 @@ swim_member_delete(struct swim_member *member)
 	swim_task_destroy(&member->ack_task);
 	swim_task_destroy(&member->ping_task);
 
+	/* Dissemination component. */
+	assert(rlist_empty(&member->in_events_queue));
+
 	free(member);
 }
 
@@ -509,6 +558,10 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
 	swim_task_create(&member->ack_task, NULL, NULL, "ack");
 	swim_task_create(&member->ping_task, swim_ping_task_complete, NULL,
 			 "ping");
+
+	/* Dissemination component. */
+	rlist_create(&member->in_events_queue);
+
 	return member;
 }
 
@@ -531,6 +584,9 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
 	if (! heap_node_is_stray(&member->in_wait_ack_heap))
 		wait_ack_heap_delete(&swim->wait_ack_heap, member);
 
+	/* Dissemination component. */
+	rlist_del_entry(member, in_events_queue);
+
 	swim_member_delete(member);
 }
 
@@ -590,6 +646,10 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 	}
 	if (mh_size(swim->members) > 1)
 		swim_ev_timer_start(loop(), &swim->round_tick);
+
+	/* Dissemination component. */
+	swim_on_member_update(swim, member);
+
 	say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
 		    swim_uuid_str(&member->uuid), mh_size(swim->members));
 	return member;
@@ -727,6 +787,42 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
 	return 1;
 }
 
+/**
+ * Encode queued events into the packet while they fit into it.
+ * @retval Number of key-values added to the root map: 0 or 1.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
+{
+	struct swim_diss_header_bin diss_header_bin;
+	int size = sizeof(diss_header_bin);
+	char *header = swim_packet_reserve(packet, size);
+	if (header == NULL)
+		return 0;
+	int i = 0;
+	char *pos = header + size; /* Events follow the header. */
+	struct swim_member *m;
+	struct swim_event_bin event_bin;
+	swim_event_bin_create(&event_bin);
+	rlist_foreach_entry(m, &swim->events_queue, in_events_queue) {
+		int new_size = size + sizeof(event_bin);
+		if (swim_packet_reserve(packet, new_size) == NULL)
+			break;
+		swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
+				    m->incarnation);
+		memcpy(pos, &event_bin, sizeof(event_bin));
+		pos += sizeof(event_bin);
+		size = new_size;
+		++i;
+	}
+	if (i == 0)
+		return 0; /* No events encoded - omit the section. */
+	swim_diss_header_bin_create(&diss_header_bin, i);
+	memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+	swim_packet_advance(packet, size);
+	return 1;
+}
+
 /** Encode SWIM components into a UDP packet. */
 static void
 swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
@@ -737,12 +833,34 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
 	map_size += swim_encode_src_uuid(swim, packet);
 	map_size += swim_encode_failure_detection(swim, packet,
 						  SWIM_FD_MSG_PING);
+	map_size += swim_encode_dissemination(swim, packet);
 	map_size += swim_encode_anti_entropy(swim, packet);
 
 	assert(mp_sizeof_map(map_size) == 1 && map_size >= 2);
 	mp_encode_map(header, map_size);
 }
 
+/**
+ * Decrement TTLs of all events, and remove from the queue the
+ * ones whose TTL reached 0. It is done after each round step.
+ * Note that when there are too many events to fit into a packet,
+ * the tail of the events list still ages without being sent, and
+ * the farthest events can be dropped without ever being sent.
+ * But this is hardly reachable: even 1000 bytes fit 37 events of
+ * ~27 bytes each, which in fact means a failure of 37 instances.
+ * In such a case a lost event is the mildest problem.
+ */
+static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+	struct swim_member *member, *tmp;
+	rlist_foreach_entry_safe(member, &swim->events_queue, in_events_queue,
+				 tmp) {
+		if (--member->status_ttl == 0)
+			rlist_del_entry(member, in_events_queue);
+	}
+}
+
 /**
  * Once per specified timeout trigger a next round step. In round
  * step a next memeber is taken from the round queue and a round
@@ -799,10 +917,12 @@ swim_complete_step(struct swim_task *task,
 		rlist_shift(&swim->round_queue);
 		if (rc > 0) {
 			/*
-			 * Each round message contains failure
-			 * detection section with a ping.
+			 * Each round message contains
+			 * dissemination and failure detection
+			 * sections.
 			 */
 			swim_wait_ack(swim, m);
+			swim_decrease_events_ttl(swim);
 		}
 	}
 }
@@ -872,7 +992,7 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 			break;
 		case MEMBER_DEAD:
 			if (m->unacknowledged_pings >= NO_ACKS_TO_GC &&
-			    swim->gc_mode == SWIM_GC_ON) {
+			    swim->gc_mode == SWIM_GC_ON && m->status_ttl == 0) {
 				swim_delete_member(swim, m);
 				continue;
 			}
@@ -1121,6 +1241,18 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	return 0;
 }
 
+/**
+ * Decode a dissemination message: schedule new events, update
+ * members. Returns what swim_process_members() returns.
+ */
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+	say_verbose("SWIM %d: process dissemination", swim_fd(swim));
+	const char *prefix = "invalid dissemination message:";
+	return swim_process_members(swim, prefix, pos, end);
+}
+
 /** Process a new message. */
 static void
 swim_on_input(struct swim_scheduler *scheduler, const char *pos,
@@ -1160,6 +1292,10 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
 							   src, &uuid) != 0)
 				goto error;
 			break;
+		case SWIM_DISSEMINATION:
+			if (swim_process_dissemination(swim, &pos, end) != 0)
+				goto error;
+			break;
 		default:
 			diag_set(SwimError, "%s unexpected key", prefix);
 			goto error;
@@ -1199,6 +1335,10 @@ swim_new(void)
 			   ACK_TIMEOUT_DEFAULT, 0);
 	swim->wait_ack_tick.data = (void *) swim;
 	swim->gc_mode = SWIM_GC_ON;
+
+	/* Dissemination component. */
+	rlist_create(&swim->events_queue);
+
 	return swim;
 }
 
@@ -1393,6 +1533,12 @@ swim_info(struct swim *swim, struct info_handler *info)
 	info_end(info);
 }
 
+int
+swim_size(const struct swim *swim)
+{
+	return mh_size(swim->members); /* Size of the member table. */
+}
+
 void
 swim_delete(struct swim *swim)
 {
@@ -1407,6 +1553,7 @@ swim_delete(struct swim *swim)
 		rlist_del_entry(m, in_round_queue);
 		if (! heap_node_is_stray(&m->in_wait_ack_heap))
 			wait_ack_heap_delete(&swim->wait_ack_heap, m);
+		rlist_del_entry(m, in_events_queue);
 		swim_member_delete(m);
 	}
 	wait_ack_heap_destroy(&swim->wait_ack_heap);
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 94ddc5dfa..ec924f36f 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -140,6 +140,10 @@ swim_probe_member(struct swim *swim, const char *uri);
 void
 swim_info(struct swim *swim, struct info_handler *info);
 
+/** Get SWIM member table size. */
+int
+swim_size(const struct swim *swim);
+
 /** Get a SWIM member, describing this instance. */
 const struct swim_member *
 swim_self(struct swim *swim);
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 416e1d99e..6b3197790 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -358,6 +358,32 @@ swim_member_bin_create(struct swim_member_bin *header)
 	swim_passport_bin_create(&header->passport);
 }
 
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+			    uint16_t batch_size)
+{
+	header->k_header = SWIM_DISSEMINATION;
+	header->m_header = 0xdc; /* MessagePack array 16 marker. */
+	header->v_header = mp_bswap_u16(batch_size);
+}
+
+void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	/* Only the passport is prepared; m_header is set in fill(). */
+	swim_passport_bin_create(&header->passport);
+
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+		    enum swim_member_status status,
+		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+		    uint64_t incarnation)
+{
+	header->m_header = 0x85; /* MessagePack fixmap of 5 pairs. */
+	swim_passport_bin_fill(&header->passport, addr, uuid, status,
+			       incarnation);
+}
+
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
 			    const struct sockaddr_in *src)
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 0e73f37fb..826443a3b 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -61,6 +61,19 @@
  * |                                                             |
  * |               OR/AND                                        |
  * |                                                             |
+ * |     SWIM_DISSEMINATION: [                                   |
+ * |         {                                                   |
+ * |             SWIM_MEMBER_STATUS: uint, enum member_status,   |
+ * |             SWIM_MEMBER_ADDRESS: uint, ip,                  |
+ * |             SWIM_MEMBER_PORT: uint, port,                   |
+ * |             SWIM_MEMBER_UUID: 16 byte UUID,                 |
+ * |             SWIM_MEMBER_INCARNATION: uint                   |
+ * |         },                                                  |
+ * |         ...                                                 |
+ * |     ],                                                      |
+ * |                                                             |
+ * |               OR/AND                                        |
+ * |                                                             |
  * |     SWIM_ANTI_ENTROPY: [                                    |
  * |         {                                                   |
  * |             SWIM_MEMBER_STATUS: uint, enum member_status,   |
@@ -114,6 +127,7 @@ enum swim_body_key {
 	SWIM_SRC_UUID = 0,
 	SWIM_ANTI_ENTROPY,
 	SWIM_FAILURE_DETECTION,
+	SWIM_DISSEMINATION,
 };
 
 /**
@@ -308,6 +322,48 @@ swim_member_bin_fill(struct swim_member_bin *header,
 
 /** }}}                  Anti-entropy component                 */
 
+/** {{{                 Dissemination component                 */
+
+/** SWIM dissemination section header MessagePack template. */
+struct PACKED swim_diss_header_bin {
+	/** mp_encode_uint(SWIM_DISSEMINATION) */
+	uint8_t k_header;
+	/** mp_encode_array(): 0xdc marker + 16-bit events count. */
+	uint8_t m_header;
+	uint16_t v_header;
+};
+
+/** Initialize dissemination header for @a batch_size events. */
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+			    uint16_t batch_size);
+
+/** SWIM event MessagePack template. */
+struct PACKED swim_event_bin {
+	/** mp_encode_map(5 or 6) */
+	uint8_t m_header;
+	/** Member status, address, UUID and incarnation. */
+	struct swim_passport_bin passport;
+};
+
+/** Initialize a dissemination record template, see fill(). */
+void
+swim_event_bin_create(struct swim_event_bin *header);
+
+/**
+ * Since usually there are many events, it is faster to reset a
+ * few fields in an existing template, than to create a new
+ * template each time. So the usage pattern is create(), fill(),
+ * fill() ... .
+ */
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+		    enum swim_member_status status,
+		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+		    uint64_t incarnation);
+
+/** }}}                 Dissemination component                 */
+
 /** {{{                     Meta component                      */
 
 /**
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 358230e15..e11f006ae 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -98,7 +98,7 @@ swim_test_sequence(void)
 static void
 swim_test_uuid_update(void)
 {
-	swim_start_test(4);
+	swim_start_test(5);
 
 	struct swim_cluster *cluster = swim_cluster_new(2);
 	swim_cluster_add_link(cluster, 0, 1);
@@ -109,6 +109,7 @@ swim_test_uuid_update(void)
 	is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), 0, "UUID update");
 	is(swim_cluster_wait_fullmesh(cluster, 1), 0,
 	   "old UUID is returned back as a 'ghost' member");
+	is(swim_size(s), 3, "two members in each + ghost third member");
 	new_uuid.time_low = 2;
 	is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), -1,
 	   "can not update to an existing UUID - swim_cfg fails");
@@ -245,11 +246,12 @@ swim_test_basic_failure_detection(void)
 	is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 0.1), 0,
 	   "but it is dead after one more");
 
-	is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX,
-				    0.9), -1,
-	   "after 1 more unack the member still is not deleted");
-	is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX,
-				    0.1), 0, "but it is dropped after 1 more");
+	swim_run_for(1);
+	is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD, "after 2 "\
+	   "more unacks the member still is not deleted - dissemination TTL "\
+	   "keeps it");
+	is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX, 2),
+	   0, "but it is dropped after 2 rounds when TTL gets 0");
 
 	/*
 	 * After IO unblock pending messages will be processed all
@@ -378,7 +380,7 @@ swim_test_too_big_packet(void)
 	int size = 50;
 	double ack_timeout = 1;
 	double first_dead_timeout = 20;
-	double everywhere_dead_timeout = size * 3;
+	double everywhere_dead_timeout = size;
 	int drop_id = size / 2;
 
 	struct swim_cluster *cluster = swim_cluster_new(size);
@@ -417,6 +419,35 @@ swim_test_too_big_packet(void)
 	swim_finish_test();
 }
 
+static void
+swim_test_packet_loss(void)
+{
+	double network_drop_rate[] = {5, 10, 20, 50, 90};
+	swim_start_test(lengthof(network_drop_rate));
+	int size = 20;
+	int drop_id = 0;
+	double ack_timeout = 1;
+
+	for (int i = 0; i < (int) lengthof(network_drop_rate); ++i) {
+		double rate = network_drop_rate[i];
+		struct swim_cluster *cluster = swim_cluster_new(size);
+		for (int j = 0; j < size; ++j) {
+			swim_cluster_set_drop(cluster, j, rate);
+			for (int k = 0; k < size; ++k)
+				swim_cluster_add_link(cluster, j, k);
+		}
+		swim_cluster_set_ack_timeout(cluster, ack_timeout);
+		swim_cluster_set_drop(cluster, drop_id, 100);
+		swim_cluster_set_gc(cluster, SWIM_GC_OFF);
+		double timeout = size * 100.0 / (100 - rate); /* XXX: unused */
+		is(swim_cluster_wait_status_everywhere(cluster, drop_id,
+						       MEMBER_DEAD, 1000), 0,
+		   "drop rate = %.2f, but the failure is disseminated", rate);
+		swim_cluster_delete(cluster);
+	}
+	swim_finish_test();
+}
+
 static void
 swim_test_undead(void)
 {
@@ -439,7 +470,7 @@ swim_test_undead(void)
 static int
 main_f(va_list ap)
 {
-	swim_start_test(11);
+	swim_start_test(12);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -456,6 +487,7 @@ main_f(va_list ap)
 	swim_test_basic_gossip();
 	swim_test_too_big_packet();
 	swim_test_undead();
+	swim_test_packet_loss();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 3393870c2..615327e27 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..11
+1..12
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -16,11 +16,12 @@ ok 1 - subtests
 ok 2 - subtests
 	*** swim_test_sequence: done ***
 	*** swim_test_uuid_update ***
-    1..4
+    1..5
     ok 1 - UUID update
     ok 2 - old UUID is returned back as a 'ghost' member
-    ok 3 - can not update to an existing UUID - swim_cfg fails
-    ok 4 - diag says 'exists'
+    ok 3 - two members in each + ghost third member
+    ok 4 - can not update to an existing UUID - swim_cfg fails
+    ok 5 - diag says 'exists'
 ok 3 - subtests
 	*** swim_test_uuid_update: done ***
 	*** swim_test_cfg ***
@@ -65,8 +66,8 @@ ok 5 - subtests
     ok 1 - node is added as alive
     ok 2 - member still is not dead after 2 noacks
     ok 3 - but it is dead after one more
-    ok 4 - after 1 more unack the member still is not deleted
-    ok 5 - but it is dropped after 1 more
+    ok 4 - after 2 more unacks the member still is not deleted - dissemination TTL keeps it
+    ok 5 - but it is dropped after 2 rounds when TTL gets 0
     ok 6 - fullmesh is restored
     ok 7 - a member is added back on an ACK
 ok 6 - subtests
@@ -106,4 +107,13 @@ ok 10 - subtests
     ok 2 - but it is never deleted due to the cfg option
 ok 11 - subtests
 	*** swim_test_undead: done ***
+	*** swim_test_packet_loss ***
+    1..5
+    ok 1 - drop rate = 5.00, but the failure is disseminated
+    ok 2 - drop rate = 10.00, but the failure is disseminated
+    ok 3 - drop rate = 20.00, but the failure is disseminated
+    ok 4 - drop rate = 50.00, but the failure is disseminated
+    ok 5 - drop rate = 90.00, but the failure is disseminated
+ok 12 - subtests
+	*** swim_test_packet_loss: done ***
 	*** main_f: done ***
-- 
2.17.2 (Apple Git-113)





More information about the Tarantool-patches mailing list