[PATCH 3/5] swim: introduce a dissemination component

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Mon Dec 17 15:53:21 MSK 2018


The dissemination component broadcasts events about member status
updates.

Public API:

swim.cfg({server = <uri>, members = <array of uris>,
          heartbeat = <seconds>})

    Configures the SWIM module.

    @server - URI of UDP server to which other cluster
        members will send SWIM messages. It should
        have the format "ip:port".

    @members - array of URIs explicitly defined by a
        user. These members are never deleted from
        the members table until they are removed from
        the configuration explicitly. SWIM downloads
        their members tables from them, merges them
        with its own, and repeats.

    @heartbeat - how often to send a part of the members
        table to another member. Note that it is not
        how often to send the whole table, nor how often
        to send the table to all members. It is only
        one step of the protocol.

swim.stop()

    Stops the SWIM module: shuts down the server,
    closes socket, destroys queues, frees memory.
    Note that after it swim.cfg can be called again.

swim.info()

    Shows info about each known member in the format:
    {
        ["ip:port"] = {
            status = <alive/dead>,
            incarnation = <growing unsigned number>
        }
    }

Closes #3234
---
 src/lib/swim/swim.c | 237 ++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 231 insertions(+), 6 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 6b2f1ca0c..bbf6b7fd5 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -48,7 +48,6 @@
  * - indirect ping.
  * - increment own incarnation on each round.
  * - attach dst incarnation to ping.
- * - fix swim_member_bin mp_encode_map(2) to 3 in the first patch.
  */
 
 /**
@@ -224,6 +223,26 @@ struct swim_member {
 	struct swim_io_task ping_task;
 	/** Position in a queue of members waiting for an ack. */
 	struct rlist in_queue_wait_ack;
+	/**
+	 *
+	 *                 Dissemination component
+	 *
+	 * Dissemination component sends events. Event is a
+	 * notification about member status update. So formally,
+	 * this structure already has all the needed attributes.
+	 * But also an event somehow should be sent to all members
+	 * at least once according to SWIM, so it requires
+	 * something like TTL, which decrements on each send. And
+	 * a member can not be removed from the global table until
+	 * it gets dead and its dissemination TTL is 0, so as to
+	 * allow other members to learn its dead status.
+	 */
+	int dissemination_ttl;
+	/**
+	 * Events are put into a queue sorted by event occurrence
+	 * time.
+	 */
+	struct rlist in_queue_events;
 };
 
 /**
@@ -240,6 +259,7 @@ static struct swim_member *self = NULL;
 enum swim_component_type {
 	SWIM_ANTI_ENTROPY = 0,
 	SWIM_FAILURE_DETECTION,
+	SWIM_DISSEMINATION,
 };
 
 /** {{{                Failure detection component              */
@@ -438,6 +458,92 @@ static struct swim_io_task round_step_task = {
 
 /** }}}                  Anti-entropy component                 */
 
+/** {{{                 Dissemination component                 */
+
+/** SWIM dissemination MsgPack template. */
+struct PACKED swim_diss_header_bin {
+	/** mp_encode_uint(SWIM_DISSEMINATION) */
+	uint8_t k_header;
+	/** mp_encode_array() */
+	uint8_t m_header;
+	uint32_t v_header;
+};
+
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+	header->k_header = SWIM_DISSEMINATION;
+	header->m_header = 0xdd;
+	header->v_header = mp_bswap_u32(batch_size);
+}
+
+/** SWIM event MsgPack template. */
+struct PACKED swim_event_bin {
+	/** mp_encode_map(4) */
+	uint8_t m_header;
+
+	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
+	uint8_t k_status;
+	/** mp_encode_uint(enum member_status) */
+	uint8_t v_status;
+
+	/** mp_encode_uint(SWIM_MEMBER_ADDR) */
+	uint8_t k_addr;
+	/** mp_encode_uint(addr.sin_addr.s_addr) */
+	uint8_t m_addr;
+	uint32_t v_addr;
+
+	/** mp_encode_uint(SWIM_MEMBER_PORT) */
+	uint8_t k_port;
+	/** mp_encode_uint(addr.sin_port) */
+	uint8_t m_port;
+	uint16_t v_port;
+
+	/** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
+};
+
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	header->m_header = 0x84;
+	header->k_status = SWIM_MEMBER_STATUS;
+	header->k_addr = SWIM_MEMBER_ADDR;
+	header->m_addr = 0xce;
+	header->k_port = SWIM_MEMBER_PORT;
+	header->m_port = 0xcd;
+	header->k_incarnation = SWIM_MEMBER_INCARNATION;
+	header->m_incarnation = 0xcf;
+}
+
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
+{
+	header->v_status = member->status;
+	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+	header->v_port = mp_bswap_u16(member->addr.sin_port);
+	header->v_incarnation = mp_bswap_u64(member->incarnation);
+}
+
+/** Queue of events sorted by occurrence time. */
+static RLIST_HEAD(queue_events);
+static int event_count = 0;
+
+static inline void
+swim_schedule_event(struct swim_member *member)
+{
+	if (rlist_empty(&member->in_queue_events)) {
+		rlist_add_tail_entry(&queue_events, member, in_queue_events);
+		event_count++;
+	}
+	member->dissemination_ttl = mh_size(members);
+}
+
+/** }}}                 Dissemination component                 */
+
 /**
  * SWIM message structure:
  * {
@@ -448,6 +554,18 @@ static struct swim_io_task round_step_task = {
  *
  *                 OR/AND
  *
+ *     SWIM_DISSEMINATION: [
+ *         {
+ *             SWIM_MEMBER_STATUS: uint, enum member_status,
+ *             SWIM_MEMBER_ADDR: uint, ip,
+ *             SWIM_MEMBER_PORT: uint, port,
+ *             SWIM_MEMBER_INCARNATION: uint
+ *         },
+ *         ...
+ *     ],
+ *
+ *                 OR/AND
+ *
  *     SWIM_ANTI_ENTROPY: [
  *         {
  *             SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -531,6 +649,16 @@ swim_send_ack(struct swim_io_task *task);
 static void
 swim_send_ping(struct swim_io_task *task);
 
+/**
+ * Perform all actions needed to process a member's update, such
+ * as a change of its status, or incarnation, or both.
+ */
+static void
+swim_member_is_updated(struct swim_member *member)
+{
+	swim_schedule_event(member);
+}
+
 /**
  * Update status of the member if needed. Statuses are compared as
  * a compound key: {incarnation, status}. So @a new_status can
@@ -548,11 +676,14 @@ swim_member_update_status(struct swim_member *member,
 {
 	assert(member != self);
 	if (member->incarnation == incarnation) {
-		if (member->status < new_status)
+		if (member->status < new_status) {
 			member->status = new_status;
+			swim_member_is_updated(member);
+		}
 	} else if (member->incarnation < incarnation) {
 		member->status = new_status;
 		member->incarnation = incarnation;
+		swim_member_is_updated(member);
 	}
 }
 
@@ -589,6 +720,8 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status,
 	swim_io_task_create(&member->ping_task, swim_send_ping);
 	rlist_add_entry(&queue_round, member, in_queue_round);
 	rlist_create(&member->in_queue_wait_ack);
+	rlist_create(&member->in_queue_events);
+	swim_schedule_event(member);
 	return member;
 }
 
@@ -617,6 +750,7 @@ swim_member_delete(struct swim_member *member)
 	swim_io_task_destroy(&member->ping_task);
 	rlist_del_entry(member, in_queue_round);
 	rlist_del_entry(member, in_queue_wait_ack);
+	assert(rlist_empty(&member->in_queue_events));
 	free(member);
 }
 
@@ -696,19 +830,53 @@ swim_encode_round_msg(char *buffer, int size)
 	assert((uint)size > sizeof(struct swim_fd_header_bin) + 1);
 	size -= sizeof(struct swim_fd_header_bin) + 1;
 
+	int diss_batch_size = calculate_bin_batch_size(
+		sizeof(struct swim_diss_header_bin),
+		sizeof(struct swim_event_bin), size);
+	if (diss_batch_size > event_count)
+		diss_batch_size = event_count;
+	size -= sizeof(struct swim_diss_header_bin) -
+		diss_batch_size * sizeof(struct swim_event_bin);
+
 	int ae_batch_size = calculate_bin_batch_size(
 		sizeof(struct swim_anti_entropy_header_bin),
 		sizeof(struct swim_member_bin), size);
 	if (ae_batch_size > shuffled_members_size)
 		ae_batch_size = shuffled_members_size;
 
-	buffer = mp_encode_map(buffer, 2);
+	buffer = mp_encode_map(buffer, 1 + (diss_batch_size > 0) +
+			       (ae_batch_size > 0));
 
 	struct swim_fd_header_bin fd_header_bin;
 	swim_fd_header_bin_create(&fd_header_bin, SWIM_FD_MSG_PING);
 	memcpy(buffer, &fd_header_bin, sizeof(fd_header_bin));
 	buffer += sizeof(fd_header_bin);
 
+	if (diss_batch_size > 0) {
+		struct swim_diss_header_bin diss_header_bin;
+		swim_diss_header_bin_create(&diss_header_bin, diss_batch_size);
+		memcpy(buffer, &diss_header_bin, sizeof(diss_header_bin));
+		buffer += sizeof(diss_header_bin);
+
+		int i = 0;
+		struct swim_member *member, *tmp;
+		struct swim_event_bin event_bin;
+		swim_event_bin_create(&event_bin);
+		rlist_foreach_entry_safe(member, &queue_events, in_queue_events,
+					 tmp) {
+			swim_event_bin_reset(&event_bin, member);
+			memcpy(buffer, &event_bin, sizeof(event_bin));
+			buffer += sizeof(event_bin);
+			rlist_del_entry(member, in_queue_events);
+			--member->dissemination_ttl;
+			if (++i >= diss_batch_size)
+				break;
+		}
+		event_count -= diss_batch_size;
+	}
+
+	if (ae_batch_size == 0)
+		return buffer - start;
 	struct swim_anti_entropy_header_bin ae_header_bin;
 	swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
 	memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin));
@@ -840,12 +1008,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
 			break;
 		++m->failed_pings;
 		if (m->failed_pings >= NO_ACKS_TO_GC) {
-			if (!m->is_pinned)
+			if (!m->is_pinned && m->dissemination_ttl == 0)
 				swim_member_delete(m);
 			continue;
 		}
-		if (m->failed_pings >= NO_ACKS_TO_DEAD)
+		if (m->failed_pings >= NO_ACKS_TO_DEAD) {
 			m->status = MEMBER_DEAD;
+			swim_member_is_updated(m);
+		}
 		swim_io_task_push(&m->ping_task);
 		rlist_del_entry(m, in_queue_wait_ack);
 	}
@@ -1094,6 +1264,50 @@ swim_process_failure_detection(const char **pos, const char *end,
 	return 0;
 }
 
+static int
+swim_process_dissemination(const char **pos, const char *end)
+{
+	const char *msg_pref = "Invald SWIM dissemination message:";
+	if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+		say_error("%s message should be an array", msg_pref);
+		return -1;
+	}
+	uint64_t size = mp_decode_array(pos);
+	for (uint64_t i = 0; i < size; ++i) {
+		if (mp_typeof(**pos) != MP_MAP ||
+		    mp_check_map(*pos, end) > 0) {
+			say_error("%s event should be map", msg_pref);
+			return -1;
+		}
+		uint64_t map_size = mp_decode_map(pos);
+		struct swim_member_def def;
+		swim_member_def_create(&def);
+		for (uint64_t j = 0; j < map_size; ++j) {
+			if (mp_typeof(**pos) != MP_UINT ||
+			    mp_check_uint(*pos, end) > 0) {
+				say_error("%s event key should be uint",
+					  msg_pref);
+				return -1;
+			}
+			uint64_t key = mp_decode_uint(pos);
+			if (key >= swim_member_key_MAX) {
+				say_error("%s unknown event key", msg_pref);
+				return -1;
+			}
+			if (swim_process_member_key(key, pos, end, msg_pref,
+						    &def) != 0)
+				return -1;
+		}
+		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+			say_error("%s member address should be specified",
+				  msg_pref);
+			return -1;
+		}
+		swim_process_member_update(&def);
+	}
+	return 0;
+}
+
 /** Receive and process a new message. */
 static void
 swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -1140,6 +1354,11 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
 							   &addr) != 0)
 				return;
 			break;
+		case SWIM_DISSEMINATION:
+			say_verbose("SWIM: process dissemination");
+			if (swim_process_dissemination(&pos, end) != 0)
+				return;
+			break;
 		default:
 			say_error("%s unknown component type component is "\
 				  "supported", msg_pref);
@@ -1299,6 +1518,7 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
 error:
 	for (int i = 0; i < new_cfg_size; ++i) {
 		if (new_cfg[i]->status == new_status) {
+			rlist_del_entry(new_cfg[i], in_queue_events);
 			swim_member_delete(new_cfg[i]);
 			if (new_self == new_cfg[i])
 				new_self = NULL;
@@ -1306,8 +1526,10 @@ error:
 	}
 	if (member_uri_count > 0)
 		free(new_cfg);
-	if (new_self != NULL && new_self->status == new_status)
+	if (new_self != NULL && new_self->status == new_status) {
+		rlist_del_entry(new_self, in_queue_events);
 		swim_member_delete(new_self);
+	}
 	return -1;
 }
 
@@ -1346,6 +1568,7 @@ swim_stop(void)
 	while (node != mh_end(members)) {
 		struct swim_member *m = (struct swim_member *)
 			mh_i64ptr_node(members, node)->val;
+		rlist_del_entry(m, in_queue_events);
 		swim_member_delete(m);
 		node = mh_first(members);
 	}
@@ -1358,9 +1581,11 @@ swim_stop(void)
 	cfg_size = 0;
 	shuffled_members = NULL;
 	shuffled_members_size = 0;
+	event_count = 0;
 	rlist_create(&queue_wait_ack);
 	rlist_create(&queue_output);
 	rlist_create(&queue_round);
+	rlist_create(&queue_events);
 }
 
 #ifndef NDEBUG
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list