[tarantool-patches] [PATCH v2 6/6] swim: introduce dissemination component

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Apr 9 14:46:37 MSK 2019


The dissemination component broadcasts events about member
updates. When any member attribute is updated (incarnation,
status, UUID, address), the member is put into an event queue.
Members from the queue are encoded into each round step message
with a higher priority, before the anti-entropy section.

It means that even if a cluster consists of hundreds of members
and one of them was updated on one of the instances, this update
will be disseminated regardless of whether this member is encoded
into the anti-entropy section or not. According to the SWIM
paper, it drastically speeds up event dissemination, which is
also noticeable in the tests.

Part of #3234
---
 src/lib/swim/swim.c       | 183 +++++++++++++++++++++++++++++++++++++-
 src/lib/swim/swim.h       |   4 +
 src/lib/swim/swim_proto.c |  26 ++++++
 src/lib/swim/swim_proto.h |  56 ++++++++++++
 test/unit/swim.c          |  48 ++++++++--
 test/unit/swim.result     |  22 +++--
 6 files changed, 321 insertions(+), 18 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index a30a83886..eec5d7d25 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -269,6 +269,46 @@ struct swim_member {
 	 * message to it.
 	 */
 	struct heap_node in_wait_ack_heap;
+	/**
+	 *
+	 *                 Dissemination component
+	 *
+	 * Dissemination component sends events. Event is a
+	 * notification about some member state update. The member
+	 * maintains a different event type for each significant
+	 * attribute - status, incarnation, etc not to send entire
+	 * member state each time any member attribute changes.
+	 *
+	 * According to SWIM, an event should be sent to all
+	 * members at least once - for that a TTL (time-to-live)
+	 * counter is maintained for each independent event type.
+	 *
+	 * When a member state changes, the TTL is reset to the
+	 * cluster size. It is then decremented after each send.
+	 * This guarantees that each member state change is sent
+	 * to each SWIM member at least once. If a new event of
+	 * the same type is generated before a round is finished,
+	 * the current event object is updated in place with reset
+	 * of the TTL.
+	 *
+	 * To conclude, TTL works in two ways: to see which
+	 * specific member attribute needs dissemination and to
+	 * track how many cluster members still need to learn
+	 * about the change from this instance.
+	 */
+	/**
+	 * General TTL reset each time when any visible member
+	 * attribute is updated. It is always bigger or equal than
+	 * any other TTLs. In addition it helps to keep a dead
+	 * member not dropped until the TTL gets zero so as to
+	 * allow other members to learn the dead status.
+	 */
+	int status_ttl;
+	/**
+	 * All created events are put into a queue sorted by event
+	 * time.
+	 */
+	struct rlist in_dissemination_queue;
 };
 
 #define mh_name _swim_table
@@ -364,6 +404,17 @@ struct swim {
 	struct ev_timer wait_ack_tick;
 	/** GC state saying how to remove dead members. */
 	enum swim_gc_mode gc_mode;
+	/**
+	 *
+	 *                 Dissemination component
+	 */
+	/**
+	 * Queue of all members which have dissemination
+	 * information. A member is added to the queue whenever
+	 * any of its attributes changes, and stays in the queue
+	 * as long as the event TTL is non-zero.
+	 */
+	struct rlist dissemination_queue;
 };
 
 /** Put the member into a list of ACK waiters. */
@@ -377,6 +428,26 @@ swim_wait_ack(struct swim *swim, struct swim_member *member)
 	}
 }
 
+/**
+ * Queue a member for dissemination after any update of it. A
+ * member already in the queue keeps its place; in both cases
+ * the status TTL is reset to the current member table size.
+ * Status TTL is never smaller than any other event-related TTL,
+ * so checking it alone shows whether the member still has
+ * something to disseminate. The status itself takes only 2 bytes
+ * in a packet, so it is cheap to resend it on every update, and
+ * doing so reduces entropy.
+ */
+static inline void
+swim_register_event(struct swim *swim, struct swim_member *member)
+{
+	struct rlist *link = &member->in_dissemination_queue;
+	/* Queued members only get their TTL refreshed. */
+	if (rlist_empty(link))
+		rlist_add_tail(&swim->dissemination_queue, link);
+	member->status_ttl = mh_size(swim->members);
+}
+
 /**
  * Make all needed actions to process a member's update like a
  * change of its status, or incarnation, or both.
@@ -384,8 +455,8 @@ swim_wait_ack(struct swim *swim, struct swim_member *member)
 static void
 swim_on_member_update(struct swim *swim, struct swim_member *member)
 {
-	(void) swim;
 	member->unacknowledged_pings = 0;
+	swim_register_event(swim, member);
 }
 
 /**
@@ -468,6 +539,9 @@ swim_member_delete(struct swim_member *member)
 	swim_task_destroy(&member->ack_task);
 	swim_task_destroy(&member->ping_task);
 
+	/* Dissemination component. */
+	assert(rlist_empty(&member->in_dissemination_queue));
+
 	free(member);
 }
 
@@ -509,6 +583,10 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
 	swim_task_create(&member->ack_task, NULL, NULL, "ack");
 	swim_task_create(&member->ping_task, swim_ping_task_complete, NULL,
 			 "ping");
+
+	/* Dissemination component. */
+	rlist_create(&member->in_dissemination_queue);
+
 	return member;
 }
 
@@ -531,6 +609,9 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
 	if (! heap_node_is_stray(&member->in_wait_ack_heap))
 		wait_ack_heap_delete(&swim->wait_ack_heap, member);
 
+	/* Dissemination component. */
+	rlist_del_entry(member, in_dissemination_queue);
+
 	swim_member_delete(member);
 }
 
@@ -590,6 +671,10 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 	}
 	if (mh_size(swim->members) > 1)
 		swim_ev_timer_start(loop(), &swim->round_tick);
+
+	/* Dissemination component. */
+	swim_on_member_update(swim, member);
+
 	say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
 		    swim_uuid_str(&member->uuid), mh_size(swim->members));
 	return member;
@@ -727,6 +812,43 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
 	return 1;
 }
 
+/**
+ * Encode as many queued events as fit into the packet.
+ * @retval Number of key-values added to the packet's root map.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
+{
+	struct swim_diss_header_bin diss_header_bin;
+	int total = sizeof(diss_header_bin);
+	char *header = swim_packet_reserve(packet, total);
+	if (header == NULL)
+		return 0;
+	struct swim_event_bin event_bin;
+	swim_event_bin_create(&event_bin);
+	char *cursor = header + total;
+	int count = 0;
+	struct swim_member *member;
+	rlist_foreach_entry(member, &swim->dissemination_queue,
+			    in_dissemination_queue) {
+		int next_total = total + sizeof(event_bin);
+		if (swim_packet_reserve(packet, next_total) == NULL)
+			break;
+		swim_event_bin_fill(&event_bin, member->status, &member->addr,
+				    &member->uuid, member->incarnation);
+		memcpy(cursor, &event_bin, sizeof(event_bin));
+		cursor += sizeof(event_bin);
+		total = next_total;
+		count++;
+	}
+	if (count == 0)
+		return 0;
+	swim_diss_header_bin_create(&diss_header_bin, count);
+	memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+	swim_packet_advance(packet, total);
+	return 1;
+}
+
 /** Encode SWIM components into a UDP packet. */
 static void
 swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
@@ -737,12 +859,36 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
 	map_size += swim_encode_src_uuid(swim, packet);
 	map_size += swim_encode_failure_detection(swim, packet,
 						  SWIM_FD_MSG_PING);
+	map_size += swim_encode_dissemination(swim, packet);
 	map_size += swim_encode_anti_entropy(swim, packet);
 
 	assert(mp_sizeof_map(map_size) == 1 && map_size >= 2);
 	mp_encode_map(header, map_size);
 }
 
+/**
+ * Decrement TTLs of all queued events after each sent round
+ * message. Note that TTL is decremented for every queued event,
+ * including those which did not fit into the packet and were not
+ * sent. So if there are more events than fit into one packet, the
+ * tail of the queue can expire, and the most recently added
+ * members can even leave the queue without being sent once. This
+ * is, however, very unlikely: even 1000 bytes can fit 37 events
+ * of ~27 bytes each, so event loss needs dozens of members updated
+ * at the same time. In such a case event loss is the mildest
+ * problem to deal with.
+ */
+static void
+swim_decrease_event_ttl(struct swim *swim)
+{
+	struct rlist *queue = &swim->dissemination_queue;
+	struct swim_member *m, *next;
+	rlist_foreach_entry_safe(m, queue, in_dissemination_queue, next) {
+		if (--m->status_ttl == 0)
+			rlist_del_entry(m, in_dissemination_queue);
+	}
+}
+
 /**
  * Once per specified timeout trigger a next round step. In round
  * step a next memeber is taken from the round queue and a round
@@ -799,10 +945,12 @@ swim_complete_step(struct swim_task *task,
 		rlist_shift(&swim->round_queue);
 		if (rc > 0) {
 			/*
-			 * Each round message contains failure
-			 * detection section with a ping.
+			 * Each round message contains
+			 * dissemination and failure detection
+			 * sections.
 			 */
 			swim_wait_ack(swim, m);
+			swim_decrease_event_ttl(swim);
 		}
 	}
 }
@@ -872,7 +1020,7 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 			break;
 		case MEMBER_DEAD:
 			if (m->unacknowledged_pings >= NO_ACKS_TO_GC &&
-			    swim->gc_mode == SWIM_GC_ON) {
+			    swim->gc_mode == SWIM_GC_ON && m->status_ttl == 0) {
 				swim_delete_member(swim, m);
 				continue;
 			}
@@ -1121,6 +1269,18 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	return 0;
 }
 
+/**
+ * Decode a dissemination section. Its events are processed as
+ * member records: members get updated, new events scheduled.
+ */
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+	say_verbose("SWIM %d: process dissemination", swim_fd(swim));
+	const char *prefix = "invalid dissemination message:";
+	return swim_process_members(swim, prefix, pos, end);
+}
+
 /** Process a new message. */
 static void
 swim_on_input(struct swim_scheduler *scheduler, const char *pos,
@@ -1160,6 +1320,10 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
 							   src, &uuid) != 0)
 				goto error;
 			break;
+		case SWIM_DISSEMINATION:
+			if (swim_process_dissemination(swim, &pos, end) != 0)
+				goto error;
+			break;
 		default:
 			diag_set(SwimError, "%s unexpected key", prefix);
 			goto error;
@@ -1199,6 +1363,10 @@ swim_new(void)
 			   ACK_TIMEOUT_DEFAULT, 0);
 	swim->wait_ack_tick.data = (void *) swim;
 	swim->gc_mode = SWIM_GC_ON;
+
+	/* Dissemination component. */
+	rlist_create(&swim->dissemination_queue);
+
 	return swim;
 }
 
@@ -1393,6 +1561,12 @@ swim_info(struct swim *swim, struct info_handler *info)
 	info_end(info);
 }
 
+int
+swim_size(const struct swim *swim)
+{
+	return (int) mh_size(swim->members);
+}
+
 void
 swim_delete(struct swim *swim)
 {
@@ -1407,6 +1581,7 @@ swim_delete(struct swim *swim)
 		rlist_del_entry(m, in_round_queue);
 		if (! heap_node_is_stray(&m->in_wait_ack_heap))
 			wait_ack_heap_delete(&swim->wait_ack_heap, m);
+		rlist_del_entry(m, in_dissemination_queue);
 		swim_member_delete(m);
 	}
 	wait_ack_heap_destroy(&swim->wait_ack_heap);
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 94ddc5dfa..ec924f36f 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -140,6 +140,10 @@ swim_probe_member(struct swim *swim, const char *uri);
 void
 swim_info(struct swim *swim, struct info_handler *info);
 
+/** Get the number of members in the SWIM table, self included. */
+int
+swim_size(const struct swim *swim);
+
 /** Get a SWIM member, describing this instance. */
 const struct swim_member *
 swim_self(struct swim *swim);
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 416e1d99e..6b3197790 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -358,6 +358,32 @@ swim_member_bin_create(struct swim_member_bin *header)
 	swim_passport_bin_create(&header->passport);
 }
 
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+			    uint16_t batch_size)
+{
+	header->k_header = SWIM_DISSEMINATION;
+	header->m_header = 0xdc; /* MessagePack array 16 marker. */
+	header->v_header = mp_bswap_u16(batch_size);
+}
+
+void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	swim_passport_bin_create(&header->passport); /* fill() sets m_header. */
+}
+
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+		    enum swim_member_status status,
+		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+		    uint64_t incarnation)
+{
+	header->m_header = 0x85; /* MessagePack fixmap of 5 keys. */
+	swim_passport_bin_fill(&header->passport, addr, uuid, status,
+			       incarnation);
+}
+
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
 			    const struct sockaddr_in *src)
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 0e73f37fb..826443a3b 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -61,6 +61,19 @@
  * |                                                             |
  * |               OR/AND                                        |
  * |                                                             |
+ * |     SWIM_DISSEMINATION: [                                   |
+ * |         {                                                   |
+ * |             SWIM_MEMBER_STATUS: uint, enum member_status,   |
+ * |             SWIM_MEMBER_ADDRESS: uint, ip,                  |
+ * |             SWIM_MEMBER_PORT: uint, port,                   |
+ * |             SWIM_MEMBER_UUID: 16 byte UUID,                 |
+ * |             SWIM_MEMBER_INCARNATION: uint                   |
+ * |         },                                                  |
+ * |         ...                                                 |
+ * |     ],                                                      |
+ * |                                                             |
+ * |               OR/AND                                        |
+ * |                                                             |
  * |     SWIM_ANTI_ENTROPY: [                                    |
  * |         {                                                   |
  * |             SWIM_MEMBER_STATUS: uint, enum member_status,   |
@@ -114,6 +127,7 @@ enum swim_body_key {
 	SWIM_SRC_UUID = 0,
 	SWIM_ANTI_ENTROPY,
 	SWIM_FAILURE_DETECTION,
+	SWIM_DISSEMINATION,
 };
 
 /**
@@ -308,6 +322,48 @@ swim_member_bin_fill(struct swim_member_bin *header,
 
 /** }}}                  Anti-entropy component                 */
 
+/** {{{                 Dissemination component                 */
+
+/** SWIM dissemination section header MessagePack template. */
+struct PACKED swim_diss_header_bin {
+	/** mp_encode_uint(SWIM_DISSEMINATION) */
+	uint8_t k_header;
+	/** mp_encode_array() - array 16, its size in v_header. */
+	uint8_t m_header;
+	uint16_t v_header;
+};
+
+/** Initialize dissemination header with the event count. */
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+			    uint16_t batch_size);
+
+/** SWIM event MessagePack template. */
+struct PACKED swim_event_bin {
+	/** mp_encode_map(5) */
+	uint8_t m_header;
+	/** Basic member info like status, address. */
+	struct swim_passport_bin passport;
+};
+
+/** Initialize an event template. */
+void
+swim_event_bin_create(struct swim_event_bin *header);
+
+/**
+ * Since usually there are many events, it is faster to reset a
+ * few fields in an existing template than to create a new
+ * template each time. So the usage pattern is create(), fill(),
+ * fill() ... .
+ */
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+		    enum swim_member_status status,
+		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+		    uint64_t incarnation);
+
+/** }}}                 Dissemination component                 */
+
 /** {{{                     Meta component                      */
 
 /**
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 002ea1a5b..4412e252a 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -98,7 +98,7 @@ swim_test_sequence(void)
 static void
 swim_test_uuid_update(void)
 {
-	swim_test_start(4);
+	swim_test_start(5);
 
 	struct swim_cluster *cluster = swim_cluster_new(2);
 	swim_cluster_add_link(cluster, 0, 1);
@@ -109,6 +109,7 @@ swim_test_uuid_update(void)
 	is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), 0, "UUID update");
 	is(swim_cluster_wait_fullmesh(cluster, 1), 0,
 	   "old UUID is returned back as a 'ghost' member");
+	is(swim_size(s), 3, "two members in each + ghost third member");
 	new_uuid.time_low = 2;
 	is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), -1,
 	   "can not update to an existing UUID - swim_cfg fails");
@@ -248,11 +249,12 @@ swim_test_basic_failure_detection(void)
 	is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 0.1), 0,
 	   "but it is dead after one more");
 
-	is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX,
-				    0.9), -1,
-	   "after 1 more unack the member still is not deleted");
-	is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX,
-				    0.1), 0, "but it is dropped after 1 more");
+	swim_test_run_for(1);
+	is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD, "after 2 "\
+	   "more unacks the member still is not deleted - dissemination TTL "\
+	   "keeps it");
+	is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX, 2),
+	   0, "but it is dropped after 2 rounds when TTL gets 0");
 
 	/*
 	 * After IO unblock pending messages will be processed all
@@ -381,7 +383,7 @@ swim_test_too_big_packet(void)
 	int size = 50;
 	double ack_timeout = 1;
 	double first_dead_timeout = 20;
-	double everywhere_dead_timeout = size * 3;
+	double everywhere_dead_timeout = size;
 	int drop_id = size / 2;
 
 	struct swim_cluster *cluster = swim_cluster_new(size);
@@ -420,6 +422,35 @@ swim_test_too_big_packet(void)
 	swim_test_finish();
 }
 
+static void
+swim_test_packet_loss(void)
+{
+	double network_drop_rate[] = {5, 10, 20, 50, 90};
+	swim_test_start(lengthof(network_drop_rate));
+	int size = 20;
+	int drop_id = 0;
+	double ack_timeout = 1;
+
+	for (int i = 0; i < (int) lengthof(network_drop_rate); ++i) {
+		double rate = network_drop_rate[i];
+		struct swim_cluster *cluster = swim_cluster_new(size);
+		for (int j = 0; j < size; ++j) {
+			swim_cluster_set_drop(cluster, j, rate);
+			for (int k = 0; k < size; ++k)
+				swim_cluster_add_link(cluster, j, k);
+		}
+		swim_cluster_set_ack_timeout(cluster, ack_timeout);
+		swim_cluster_set_drop(cluster, drop_id, 100);
+		swim_cluster_set_gc(cluster, SWIM_GC_OFF);
+		/* One fixed timeout, 1000, is used for all drop rates. */
+		is(swim_cluster_wait_status_everywhere(cluster, drop_id,
+						       MEMBER_DEAD, 1000), 0,
+		   "drop rate = %.2f, but the failure is disseminated", rate);
+		swim_cluster_delete(cluster);
+	}
+	swim_test_finish();
+}
+
 static void
 swim_test_undead(void)
 {
@@ -442,7 +473,7 @@ swim_test_undead(void)
 static int
 main_f(va_list ap)
 {
-	swim_test_start(11);
+	swim_test_start(12);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -459,6 +490,7 @@ main_f(va_list ap)
 	swim_test_basic_gossip();
 	swim_test_too_big_packet();
 	swim_test_undead();
+	swim_test_packet_loss();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 3393870c2..615327e27 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..11
+1..12
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -16,11 +16,12 @@ ok 1 - subtests
 ok 2 - subtests
 	*** swim_test_sequence: done ***
 	*** swim_test_uuid_update ***
-    1..4
+    1..5
     ok 1 - UUID update
     ok 2 - old UUID is returned back as a 'ghost' member
-    ok 3 - can not update to an existing UUID - swim_cfg fails
-    ok 4 - diag says 'exists'
+    ok 3 - two members in each + ghost third member
+    ok 4 - can not update to an existing UUID - swim_cfg fails
+    ok 5 - diag says 'exists'
 ok 3 - subtests
 	*** swim_test_uuid_update: done ***
 	*** swim_test_cfg ***
@@ -65,8 +66,8 @@ ok 5 - subtests
     ok 1 - node is added as alive
     ok 2 - member still is not dead after 2 noacks
     ok 3 - but it is dead after one more
-    ok 4 - after 1 more unack the member still is not deleted
-    ok 5 - but it is dropped after 1 more
+    ok 4 - after 2 more unacks the member still is not deleted - dissemination TTL keeps it
+    ok 5 - but it is dropped after 2 rounds when TTL gets 0
     ok 6 - fullmesh is restored
     ok 7 - a member is added back on an ACK
 ok 6 - subtests
@@ -106,4 +107,13 @@ ok 10 - subtests
     ok 2 - but it is never deleted due to the cfg option
 ok 11 - subtests
 	*** swim_test_undead: done ***
+	*** swim_test_packet_loss ***
+    1..5
+    ok 1 - drop rate = 5.00, but the failure is disseminated
+    ok 2 - drop rate = 10.00, but the failure is disseminated
+    ok 3 - drop rate = 20.00, but the failure is disseminated
+    ok 4 - drop rate = 50.00, but the failure is disseminated
+    ok 5 - drop rate = 90.00, but the failure is disseminated
+ok 12 - subtests
+	*** swim_test_packet_loss: done ***
 	*** main_f: done ***
-- 
2.17.2 (Apple Git-113)





More information about the Tarantool-patches mailing list