[PATCH v3 3/6] [RAW] swim: introduce a dissemination component

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Dec 29 13:14:12 MSK 2018


Dissemination components broadcasts events about member status
updates.

Part of #3234
---
 src/lib/swim/swim.c | 287 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 282 insertions(+), 5 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index c7bc11bca..4e7ffbc54 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -184,6 +184,27 @@ struct swim_member {
 	struct swim_task ping_task;
 	/** Position in a queue of members waiting for an ack. */
 	struct rlist in_queue_wait_ack;
+	/**
+	 *
+	 *                 Dissemination component
+	 *
+	 * Dissemination component sends events. Event is a
+	 * notification about member status update. So formally,
+	 * this structure already has all the needed attributes.
+	 * But also an event somehow should be sent to all members
+	 * at least once according to SWIM, so it requires
+	 * something like TTL for each type of event, which
+	 * decrements on each send. And a member can not be
+	 * removed from the global table until it gets dead and
+	 * its status TTLs is 0, so as to allow other members
+	 * learn its dead status.
+	 */
+	int status_ttl;
+	/**
+	 * Events are put into a queue sorted by event occurrence
+	 * time.
+	 */
+	struct rlist in_queue_events;
 };
 
 /**
@@ -248,6 +269,12 @@ struct swim {
 	struct rlist queue_wait_ack;
 	/** Generator of ack checking events. */
 	struct ev_periodic wait_ack_tick;
+	/**
+	 *
+	 *                 Dissemination component
+	 */
+	/** Queue of events sorted by occurrence time. */
+	struct rlist queue_events;
 };
 
 static inline uint64_t
@@ -264,6 +291,7 @@ sockaddr_in_hash(const struct sockaddr_in *a)
 enum swim_component_type {
 	SWIM_ANTI_ENTROPY = 0,
 	SWIM_FAILURE_DETECTION,
+	SWIM_DISSEMINATION,
 };
 
 /** {{{                Failure detection component              */
@@ -431,6 +459,88 @@ swim_member_bin_create(struct swim_member_bin *header)
 
 /** }}}                  Anti-entropy component                 */
 
+/** {{{                 Dissemination component                 */
+
+/** SWIM dissemination MsgPack template. */
+struct PACKED swim_diss_header_bin {
+	/** mp_encode_uint(SWIM_DISSEMINATION) */
+	uint8_t k_header;
+	/** mp_encode_array() */
+	uint8_t m_header;
+	uint32_t v_header;
+};
+
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+	header->k_header = SWIM_DISSEMINATION;
+	header->m_header = 0xdd;
+	header->v_header = mp_bswap_u32(batch_size);
+}
+
+/** SWIM event MsgPack template. */
+struct PACKED swim_event_bin {
+	/** mp_encode_map(4) */
+	uint8_t m_header;
+
+	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
+	uint8_t k_status;
+	/** mp_encode_uint(enum member_status) */
+	uint8_t v_status;
+
+	/** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
+	uint8_t k_addr;
+	/** mp_encode_uint(addr.sin_addr.s_addr) */
+	uint8_t m_addr;
+	uint32_t v_addr;
+
+	/** mp_encode_uint(SWIM_MEMBER_PORT) */
+	uint8_t k_port;
+	/** mp_encode_uint(addr.sin_port) */
+	uint8_t m_port;
+	uint16_t v_port;
+
+	/** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
+};
+
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	header->m_header = 0x84;
+	header->k_status = SWIM_MEMBER_STATUS;
+	header->k_addr = SWIM_MEMBER_ADDRESS;
+	header->m_addr = 0xce;
+	header->k_port = SWIM_MEMBER_PORT;
+	header->m_port = 0xcd;
+	header->k_incarnation = SWIM_MEMBER_INCARNATION;
+	header->m_incarnation = 0xcf;
+}
+
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
+{
+	header->v_status = member->status;
+	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+	header->v_port = mp_bswap_u16(member->addr.sin_port);
+	header->v_incarnation = mp_bswap_u64(member->incarnation);
+}
+
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+	if (rlist_empty(&member->in_queue_events)) {
+		rlist_add_tail_entry(&swim->queue_events, member,
+				     in_queue_events);
+	}
+	member->status_ttl = mh_size(swim->members);
+}
+
+/** }}}                 Dissemination component                 */
+
 /**
  * SWIM message structure:
  * {
@@ -441,6 +551,18 @@ swim_member_bin_create(struct swim_member_bin *header)
  *
  *                 OR/AND
  *
+ *     SWIM_DISSEMINATION: [
+ *         {
+ *             SWIM_MEMBER_STATUS: uint, enum member_status,
+ *             SWIM_MEMBER_ADDRESS: uint, ip,
+ *             SWIM_MEMBER_PORT: uint, port,
+ *             SWIM_MEMBER_INCARNATION: uint
+ *         },
+ *         ...
+ *     ],
+ *
+ *                 OR/AND
+ *
  *     SWIM_ANTI_ENTROPY: [
  *         {
  *             SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -453,6 +575,16 @@ swim_member_bin_create(struct swim_member_bin *header)
  * }
  */
 
+/**
+ * Make all needed actions to process a member's update like a
+ * change of its status, or incarnation, or both.
+ */
+static void
+swim_member_status_is_updated(struct swim *swim, struct swim_member *member)
+{
+	swim_schedule_event(swim, member);
+}
+
 /**
  * Update status and incarnation of the member if needed. Statuses
  * are compared as a compound key: {incarnation, status}. So @a
@@ -468,14 +600,16 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
 			  enum swim_member_status new_status,
 			  uint64_t incarnation)
 {
-	(void) swim;
 	assert(member != swim->self);
 	if (member->incarnation == incarnation) {
-		if (member->status < new_status)
+		if (member->status < new_status) {
 			member->status = new_status;
+			swim_member_status_is_updated(swim, member);
+		}
 	} else if (member->incarnation < incarnation) {
 		member->status = new_status;
 		member->incarnation = incarnation;
+		swim_member_status_is_updated(swim, member);
 	}
 }
 
@@ -497,6 +631,9 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
 	swim_task_destroy(&member->ack_task);
 	swim_task_destroy(&member->ping_task);
 
+	/* Dissemination component. */
+	assert(rlist_empty(&member->in_queue_events));
+
 	free(member);
 }
 
@@ -533,6 +670,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 	swim_task_create(&member->ack_task, &swim->scheduler, swim_task_reset);
 	swim_task_create(&member->ping_task, &swim->scheduler, swim_task_reset);
 
+	/* Dissemination component. */
+	rlist_create(&member->in_queue_events);
+	swim_member_status_is_updated(swim, member);
+
 	return member;
 }
 
@@ -654,6 +795,60 @@ swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
 	return 1;
 }
 
+/**
+ * Encode a part of the dissemination component into a single SWIM
+ * packet.
+ * @retval -1 Error.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static int
+swim_encode_dissemination_packet(struct swim_msg *msg, struct rlist **queue_pos)
+{
+	struct swim_diss_header_bin diss_header_bin;
+	int size = sizeof(diss_header_bin);
+	struct swim_packet *packet = swim_msg_reserve(msg, size);
+	if (packet == NULL)
+		return -1;
+	char *header = swim_packet_alloc(packet, size);
+
+	int i = 0;
+	struct swim_member *member, *prev = NULL;
+	struct swim_event_bin event_bin;
+	swim_event_bin_create(&event_bin);
+	rlist_foreach_entry(member, *queue_pos, in_queue_events) {
+		char *pos = swim_packet_alloc(packet, sizeof(event_bin));
+		if (pos == NULL)
+			break;
+		swim_event_bin_reset(&event_bin, member);
+		memcpy(pos, &event_bin, sizeof(event_bin));
+		++i;
+		prev = member;
+	}
+	if (i == 0)
+		return 0;
+	swim_diss_header_bin_create(&diss_header_bin, i);
+	memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+	swim_packet_flush(packet);
+	return 1;
+}
+
+/**
+ * Encode failure dissemination component.
+ * @retval -1 Error.
+ * @retval 1 Success, something is encoded.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
+{
+	struct rlist *pos;
+	rlist_foreach(pos, &swim->queue_events) {
+		if (swim_encode_dissemination_packet(msg, &pos) < 0)
+			return -1;
+	}
+	return ! rlist_empty(&swim->queue_events);
+}
+
 /** Encode SWIM components into a sequence of UDP packets. */
 static int
 swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -670,6 +865,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
 		goto error;
 	map_size += rc;
 
+	rc = swim_encode_dissemination(swim, msg);
+	if (rc < 0)
+		goto error;
+	map_size += rc;
+
 	rc = swim_encode_anti_entropy(swim, msg);
 	if (rc < 0)
 		goto error;
@@ -685,6 +885,21 @@ error:
 
 /** Once per specified timeout trigger a next broadcast step. */
 static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+	struct swim_member *member, *tmp;
+	rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+				 tmp) {
+		if (--member->status_ttl == 0)
+			rlist_del_entry(member, in_queue_events);
+	}
+}
+
+/**
+ * Do one round step. Send encoded components to a next member
+ * from the queue.
+ */
+static void
 swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
 {
 	assert((events & EV_PERIODIC) != 0);
@@ -712,6 +927,7 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
 	swim_task_schedule(&swim->round_step_task,
 			   swim->transport->send_round_msg, &m->addr);
 	swim_member_schedule_ack_wait(swim, m);
+	swim_decrease_events_ttl(swim);
 	ev_periodic_stop(loop, p);
 }
 
@@ -779,12 +995,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
 			break;
 		++m->failed_pings;
 		if (m->failed_pings >= NO_ACKS_TO_GC) {
-			if (!m->is_pinned)
+			if (!m->is_pinned && m->status_ttl == 0)
 				swim_member_delete(swim, m);
 			continue;
 		}
-		if (m->failed_pings >= NO_ACKS_TO_DEAD)
+		if (m->failed_pings >= NO_ACKS_TO_DEAD) {
 			m->status = MEMBER_DEAD;
+			swim_member_status_is_updated(swim, m);
+		}
 		swim_schedule_ping(swim, m);
 		rlist_del_entry(m, in_queue_wait_ack);
 	}
@@ -828,6 +1046,7 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
 					  def->incarnation);
 		return;
 	}
+	uint64_t old_incarnation = self->incarnation;
 	/*
 	 * It is possible that other instances know a bigger
 	 * incarnation of this instance - such thing happens when
@@ -846,6 +1065,8 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
 		 */
 		self->incarnation++;
 	}
+	if (old_incarnation != self->incarnation)
+		swim_member_status_is_updated(swim, self);
 }
 
 static int
@@ -1032,6 +1253,50 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	return 0;
 }
 
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+	const char *msg_pref = "Invald SWIM dissemination message:";
+	if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+		say_error("%s message should be an array", msg_pref);
+		return -1;
+	}
+	uint64_t size = mp_decode_array(pos);
+	for (uint64_t i = 0; i < size; ++i) {
+		if (mp_typeof(**pos) != MP_MAP ||
+		    mp_check_map(*pos, end) > 0) {
+			say_error("%s event should be map", msg_pref);
+			return -1;
+		}
+		uint64_t map_size = mp_decode_map(pos);
+		struct swim_member_def def;
+		swim_member_def_create(&def);
+		for (uint64_t j = 0; j < map_size; ++j) {
+			if (mp_typeof(**pos) != MP_UINT ||
+			    mp_check_uint(*pos, end) > 0) {
+				say_error("%s event key should be uint",
+					  msg_pref);
+				return -1;
+			}
+			uint64_t key = mp_decode_uint(pos);
+			if (key >= swim_member_key_MAX) {
+				say_error("%s unknown event key", msg_pref);
+				return -1;
+			}
+			if (swim_process_member_key(key, pos, end, msg_pref,
+						    &def) != 0)
+				return -1;
+		}
+		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+			say_error("%s member address should be specified",
+				  msg_pref);
+			return -1;
+		}
+		swim_process_member_update(swim, &def);
+	}
+	return 0;
+}
+
 /** Receive and process a new message. */
 static void
 swim_on_input(struct swim_scheduler *scheduler,
@@ -1065,6 +1330,11 @@ swim_on_input(struct swim_scheduler *scheduler,
 							   src) != 0)
 				return;
 			break;
+		case SWIM_DISSEMINATION:
+			say_verbose("SWIM: process dissemination");
+			if (swim_process_dissemination(swim, &pos, end) != 0)
+				return;
+			break;
 		default:
 			say_error("%s unknown component type component is "\
 				  "supported", msg_pref);
@@ -1142,6 +1412,10 @@ swim_new(void)
 	ev_init(&swim->wait_ack_tick, swim_check_acks);
 	ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
 	swim->wait_ack_tick.data = (void *) swim;
+
+	/* Dissemination events. */
+	rlist_create(&swim->queue_events);
+
 	return swim;
 }
 
@@ -1199,8 +1473,10 @@ swim_remove_member(struct swim *swim, const char *uri)
 	if (uri_to_addr(uri, &addr) != 0)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, &addr);
-	if (member != NULL)
+	if (member != NULL) {
+		rlist_del_entry(member, in_queue_events);
 		swim_member_delete(swim, member);
+	}
 	return 0;
 }
 
@@ -1236,6 +1512,7 @@ swim_delete(struct swim *swim)
 	while (node != mh_end(swim->members)) {
 		struct swim_member *m = (struct swim_member *)
 			mh_i64ptr_node(swim->members, node)->val;
+		rlist_del_entry(m, in_queue_events);
 		swim_member_delete(swim, m);
 		node = mh_first(swim->members);
 	}
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list