[PATCH v2 3/6] [RAW] swim: introduce a dissemination component

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Dec 25 22:19:26 MSK 2018


The dissemination component broadcasts events about member status
updates.

Part of #3234
---
 src/lib/swim/swim.c | 270 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 265 insertions(+), 5 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 22bc06a60..15e079f11 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -377,6 +377,26 @@ struct swim_member {
 	struct swim_io_task ping_task;
 	/** Position in a queue of members waiting for an ack. */
 	struct rlist in_queue_wait_ack;
+	/**
+	 *
+	 *                 Dissemination component
+	 *
+	 * Dissemination component sends events. An event is a
+	 * notification about a member status update. So formally,
+	 * this structure already has all the needed attributes.
+	 * But according to SWIM an event should also be sent to
+	 * all members at least once, so it requires something
+	 * like a TTL, which is decremented on each send. And a
+	 * member can not be removed from the global table until
+	 * it is dead and its dissemination TTL is 0, so as to
+	 * allow other members to learn its dead status.
+	 */
+	int dissemination_ttl;
+	/**
+	 * Events are put into a queue sorted by event occurrence
+	 * time.
+	 */
+	struct rlist in_queue_events;
 };
 
 /**
@@ -451,6 +471,8 @@ struct swim {
 	struct rlist queue_wait_ack;
 	/** Generator of ack checking events. */
 	struct ev_periodic wait_ack_tick;
+	/** Queue of events sorted by occurrence time. */
+	struct rlist queue_events;
 };
 
 static inline uint64_t
@@ -467,6 +489,7 @@ sockaddr_in_hash(const struct sockaddr_in *a)
 enum swim_component_type {
 	SWIM_ANTI_ENTROPY = 0,
 	SWIM_FAILURE_DETECTION,
+	SWIM_DISSEMINATION,
 };
 
 /** {{{                Failure detection component              */
@@ -634,6 +657,88 @@ swim_member_bin_create(struct swim_member_bin *header)
 
 /** }}}                  Anti-entropy component                 */
 
+/** {{{                 Dissemination component                 */
+
+/** MsgPack template of a SWIM dissemination section header. */
+struct PACKED swim_diss_header_bin {
+	/** mp_encode_uint(SWIM_DISSEMINATION) */
+	uint8_t k_header;
+	/** mp_encode_array(): 0xdd marker, then a 32-bit event count. */
+	uint8_t m_header;
+	uint32_t v_header;
+};
+
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+	header->k_header = SWIM_DISSEMINATION;
+	header->m_header = 0xdd; /* MsgPack array32 marker. */
+	header->v_header = mp_bswap_u32(batch_size); /* Number of events. */
+}
+
+/** SWIM event MsgPack template: a map of 4 key-value pairs. */
+struct PACKED swim_event_bin {
+	/** mp_encode_map(4) */
+	uint8_t m_header;
+
+	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
+	uint8_t k_status;
+	/** mp_encode_uint(enum member_status) */
+	uint8_t v_status;
+
+	/** mp_encode_uint(SWIM_MEMBER_ADDR) */
+	uint8_t k_addr;
+	/** mp_encode_uint(addr.sin_addr.s_addr) */
+	uint8_t m_addr;
+	uint32_t v_addr;
+
+	/** mp_encode_uint(SWIM_MEMBER_PORT) */
+	uint8_t k_port;
+	/** mp_encode_uint(addr.sin_port) */
+	uint8_t m_port;
+	uint16_t v_port;
+
+	/** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
+};
+
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	header->m_header = 0x84; /* MsgPack fixmap of 4 pairs. */
+	header->k_status = SWIM_MEMBER_STATUS;
+	header->k_addr = SWIM_MEMBER_ADDR;
+	header->m_addr = 0xce; /* MsgPack uint32 marker. */
+	header->k_port = SWIM_MEMBER_PORT;
+	header->m_port = 0xcd; /* MsgPack uint16 marker. */
+	header->k_incarnation = SWIM_MEMBER_INCARNATION;
+	header->m_incarnation = 0xcf; /* MsgPack uint64 marker. */
+}
+
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
+{
+	header->v_status = member->status; /* One byte, no swap. */
+	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+	header->v_port = mp_bswap_u16(member->addr.sin_port);
+	header->v_incarnation = mp_bswap_u64(member->incarnation);
+}
+
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+	if (rlist_empty(&member->in_queue_events)) { /* Not queued yet. */
+		rlist_add_tail_entry(&swim->queue_events, member,
+				     in_queue_events);
+	}
+	member->dissemination_ttl = mh_size(swim->members); /* Restart TTL. */
+}
+
+/** }}}                 Dissemination component                 */
+
 /**
  * SWIM message structure:
  * {
@@ -644,6 +749,18 @@ swim_member_bin_create(struct swim_member_bin *header)
  *
  *                 OR/AND
  *
+ *     SWIM_DISSEMINATION: [
+ *         {
+ *             SWIM_MEMBER_STATUS: uint, enum member_status,
+ *             SWIM_MEMBER_ADDR: uint, ip,
+ *             SWIM_MEMBER_PORT: uint, port,
+ *             SWIM_MEMBER_INCARNATION: uint
+ *         },
+ *         ...
+ *     ],
+ *
+ *                 OR/AND
+ *
  *     SWIM_ANTI_ENTROPY: [
  *         {
  *             SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -663,6 +780,16 @@ swim_io_task_push(struct swim_io_task *task)
 	ev_io_start(loop(), &task->swim->output);
 }
 
+/**
+ * Take all the actions needed to handle a member's update such
+ * as a change of its status, or incarnation, or both.
+ */
+static void
+swim_member_is_updated(struct swim *swim, struct swim_member *member)
+{
+	swim_schedule_event(swim, member);
+}
+
 /**
  * Update status of the member if needed. Statuses are compared as
  * a compound key: {incarnation, status}. So @a new_status can
@@ -678,14 +805,16 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
 			  enum swim_member_status new_status,
 			  uint64_t incarnation)
 {
-	(void) swim;
 	assert(member != swim->self);
 	if (member->incarnation == incarnation) {
-		if (member->status < new_status)
+		if (member->status < new_status) {
 			member->status = new_status;
+			swim_member_is_updated(swim, member);
+		}
 	} else if (member->incarnation < incarnation) {
 		member->status = new_status;
 		member->incarnation = incarnation;
+		swim_member_is_updated(swim, member);
 	}
 }
 
@@ -725,6 +854,8 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 	swim_io_task_create(&member->ping_task, swim_send_ping, swim);
 	rlist_add_entry(&swim->queue_round, member, in_queue_round);
 	rlist_create(&member->in_queue_wait_ack);
+	rlist_create(&member->in_queue_events);
+	swim_schedule_event(swim, member);
 	return member;
 }
 
@@ -753,6 +884,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
 	swim_io_task_destroy(&member->ping_task);
 	rlist_del_entry(member, in_queue_round);
 	rlist_del_entry(member, in_queue_wait_ack);
+	assert(rlist_empty(&member->in_queue_events));
 	free(member);
 }
 
@@ -872,6 +1004,62 @@ swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
 	return 1;
 }
 
+/** Encode events from @a *queue_pos up to @a head into one part. */
+static int
+swim_encode_dissemination_part(struct swim_msg *msg, struct rlist *head,
+			       struct rlist **queue_pos)
+{
+	struct swim_diss_header_bin diss_header_bin;
+	struct swim_event_bin event_bin;
+	/* Ask for one event too: a header alone makes no progress. */
+	struct swim_msg_part *part = swim_msg_reserve(msg,
+		sizeof(diss_header_bin) + sizeof(event_bin));
+	if (part == NULL)
+		return -1;
+	char *header = swim_msg_part_pos(part);
+	char *end = swim_msg_part_end(part);
+	char *pos = header + sizeof(diss_header_bin);
+	int i = 0;
+	struct rlist *item = *queue_pos;
+	swim_event_bin_create(&event_bin);
+	for (; item != head; item = rlist_next(item), ++i) {
+		if (pos + sizeof(event_bin) > end)
+			break;
+		struct swim_member *member = rlist_entry(item,
+			struct swim_member, in_queue_events);
+		swim_event_bin_reset(&event_bin, member);
+		memcpy(pos, &event_bin, sizeof(event_bin));
+		pos += sizeof(event_bin);
+	}
+	/* Let the caller resume from the first not encoded event. */
+	*queue_pos = item;
+	if (i == 0)
+		return 0;
+	swim_diss_header_bin_create(&diss_header_bin, i);
+	memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+	swim_msg_part_advance(part, pos - header);
+	return i;
+}
+
+/**
+ * Encode dissemination component. Stops early if no event fits.
+ * @retval Number of encoded events, -1 on error.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
+{
+	int count = 0;
+	struct rlist *head = &swim->queue_events;
+	struct rlist *pos = rlist_first(head);
+	while (pos != head) {
+		int rc = swim_encode_dissemination_part(msg, head, &pos);
+		if (rc <= 0)
+			return rc < 0 ? -1 : count;
+		count += rc;
+	}
+	return count;
+}
+
 /** Encode SWIM components into a sequence of UDP packets. */
 static int
 swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -889,6 +1077,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
 		goto error;
 	map_size += rc > 0;
 
+	rc = swim_encode_dissemination(swim, msg);
+	if (rc < 0)
+		goto error;
+	map_size += rc > 0;
+
 	rc = swim_encode_anti_entropy(swim, msg);
 	if (rc < 0)
 		goto error;
@@ -902,6 +1095,17 @@ error:
 	return -1;
 }
 
+static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+	struct swim_member *member, *tmp;
+	rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+				 tmp) {
+		if (--member->dissemination_ttl == 0) /* TTL expired. */
+			rlist_del_entry(member, in_queue_events);
+	}
+}
+
 /**
  * Do one round step. Send encoded components to a next member
  * from the queue.
@@ -943,6 +1147,7 @@ swim_send_round_msg(struct swim_io_task *task)
 	}
 	swim_msg_destroy(&msg);
 	swim_member_schedule_ack_wait(swim, m);
+	swim_decrease_events_ttl(swim);
 	rlist_del_entry(m, in_queue_round);
 next_round_step:
 	ev_periodic_start(loop(), &swim->round_tick);
@@ -1040,12 +1245,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
 			break;
 		++m->failed_pings;
 		if (m->failed_pings >= NO_ACKS_TO_GC) {
-			if (!m->is_pinned)
+			if (!m->is_pinned && m->dissemination_ttl == 0)
 				swim_member_delete(swim, m);
 			continue;
 		}
-		if (m->failed_pings >= NO_ACKS_TO_DEAD)
+		if (m->failed_pings >= NO_ACKS_TO_DEAD) {
 			m->status = MEMBER_DEAD;
+			swim_member_is_updated(swim, m);
+		}
 		swim_io_task_push(&m->ping_task);
 		rlist_del_entry(m, in_queue_wait_ack);
 	}
@@ -1296,6 +1503,50 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	return 0;
 }
 
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+	const char *msg_pref = "Invald SWIM dissemination message:";
+	if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+		say_error("%s message should be an array", msg_pref);
+		return -1;
+	}
+	uint64_t size = mp_decode_array(pos);
+	for (uint64_t i = 0; i < size; ++i) {
+		if (mp_typeof(**pos) != MP_MAP ||
+		    mp_check_map(*pos, end) > 0) {
+			say_error("%s event should be map", msg_pref);
+			return -1;
+		}
+		uint64_t map_size = mp_decode_map(pos);
+		struct swim_member_def def;
+		swim_member_def_create(&def);
+		for (uint64_t j = 0; j < map_size; ++j) {
+			if (mp_typeof(**pos) != MP_UINT ||
+			    mp_check_uint(*pos, end) > 0) {
+				say_error("%s event key should be uint",
+					  msg_pref);
+				return -1;
+			}
+			uint64_t key = mp_decode_uint(pos);
+			if (key >= swim_member_key_MAX) {
+				say_error("%s unknown event key", msg_pref);
+				return -1;
+			}
+			if (swim_process_member_key(key, pos, end, msg_pref,
+						    &def) != 0)
+				return -1;
+		}
+		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+			say_error("%s member address should be specified",
+				  msg_pref);
+			return -1;
+		}
+		swim_process_member_update(swim, &def);
+	}
+	return 0;
+}
+
 /** Receive and process a new message. */
 static void
 swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -1344,6 +1595,11 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
 							   &addr) != 0)
 				return;
 			break;
+		case SWIM_DISSEMINATION:
+			say_verbose("SWIM: process dissemination");
+			if (swim_process_dissemination(swim, &pos, end) != 0)
+				return;
+			break;
 		default:
 			say_error("%s unknown component type component is "\
 				  "supported", msg_pref);
@@ -1422,6 +1678,7 @@ swim_new(void)
 	ev_init(&swim->wait_ack_tick, swim_check_acks);
 	ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
 	swim->wait_ack_tick.data = (void *) swim;
+	rlist_create(&swim->queue_events);
 	return swim;
 }
 
@@ -1508,8 +1765,10 @@ swim_remove_member(struct swim *swim, const char *uri)
 	if (uri_to_addr(uri, &addr) != 0)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, &addr);
-	if (member != NULL)
+	if (member != NULL) {
+		rlist_del_entry(member, in_queue_events);
 		swim_member_delete(swim, member);
+	}
 	return 0;
 }
 
@@ -1546,6 +1805,7 @@ swim_delete(struct swim *swim)
 	while (node != mh_end(swim->members)) {
 		struct swim_member *m = (struct swim_member *)
 			mh_i64ptr_node(swim->members, node)->val;
+		rlist_del_entry(m, in_queue_events);
 		swim_member_delete(swim, m);
 		node = mh_first(swim->members);
 	}
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list