Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: vdavydov.dev@gmail.com
Subject: [PATCH 3/5] swim: introduce a dissemination component
Date: Mon, 17 Dec 2018 15:53:21 +0300	[thread overview]
Message-ID: <8c392a1f3796487465b29936483cee575ea339b0.1545047950.git.v.shpilevoy@tarantool.org> (raw)
In-Reply-To: <cover.1545047950.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1545047950.git.v.shpilevoy@tarantool.org>

The dissemination component broadcasts events about member status
updates.

Public API:

swim.cfg({server = <uri>, members = <array of uris>,
          heartbeat = <seconds>})

    Configures the SWIM module.

    @server - URI of UDP server to which other cluster
        members will send SWIM messages. It should
        have the format "ip:port".

    @members - array of URIs explicitly defined by a
        user. These members are never deleted from
        members table until they are removed from the
        configuration explicitly. SWIM downloads their
        members tables, merges them with its own and
        repeats.

    @heartbeat - how often to send a part of the
        members table to another member. Note that it
        is neither how often the whole table is sent,
        nor how often the table is sent to all members.
        It is only one step of the protocol.

swim.stop()

    Stops the SWIM module: shuts down the server,
    closes socket, destroys queues, frees memory.
    Note that after it swim.cfg can be called again.

swim.info()

    Shows info about each known member in the format:
    {
        ["ip:port"] = {
            status = <alive/dead>,
            incarnation = <growing unsigned number>
        }
    }

Closes #3234
---
 src/lib/swim/swim.c | 237 ++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 231 insertions(+), 6 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 6b2f1ca0c..bbf6b7fd5 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -48,7 +48,6 @@
  * - indirect ping.
  * - increment own incarnation on each round.
  * - attach dst incarnation to ping.
- * - fix swim_member_bin mp_encode_map(2) to 3 in the first patch.
  */
 
 /**
@@ -224,6 +223,26 @@ struct swim_member {
 	struct swim_io_task ping_task;
 	/** Position in a queue of members waiting for an ack. */
 	struct rlist in_queue_wait_ack;
+	/**
+	 *
+	 *                 Dissemination component
+	 *
+	 * The dissemination component sends events. An event is a
+	 * notification about a member's status update, so this
+	 * structure already holds all the event's attributes.
+	 * But SWIM requires each event to be sent to all members
+	 * at least once, so a TTL is needed, decremented on each
+	 * send. A member can not be removed from the global table
+	 * until it is dead and its dissemination TTL is 0, so that
+	 * other members have a chance to learn about its death.
+	 * swim_check_acks() checks this TTL before deleting.
+	 */
+	int dissemination_ttl;
+	/**
+	 * Link in the queue of events sorted by occurrence time.
+	 * Empty while the member has no event pending.
+	 */
+	struct rlist in_queue_events;
 };
 
 /**
@@ -240,6 +259,7 @@ static struct swim_member *self = NULL;
 enum swim_component_type {
 	SWIM_ANTI_ENTROPY = 0,
 	SWIM_FAILURE_DETECTION,
+	SWIM_DISSEMINATION, /* Section of member update events. */
 };
 
 /** {{{                Failure detection component              */
@@ -438,6 +458,127 @@ static struct swim_io_task round_step_task = {
 
 /** }}}                  Anti-entropy component                 */
 
+/** {{{                 Dissemination component                 */
+
+/**
+ * SWIM dissemination section MsgPack template. The encoded
+ * events, a swim_event_bin each, follow it in the packet.
+ */
+struct PACKED swim_diss_header_bin {
+	/** mp_encode_uint(SWIM_DISSEMINATION) */
+	uint8_t k_header;
+	/** mp_encode_array() with a 32 bit big-endian length. */
+	uint8_t m_header;
+	uint32_t v_header;
+};
+
+/**
+ * Fill a dissemination section header announcing
+ * @a batch_size events.
+ */
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+	/* SWIM_DISSEMINATION is small enough to be a fixint. */
+	header->k_header = SWIM_DISSEMINATION;
+	/* MsgPack array 32 marker. */
+	header->m_header = 0xdd;
+	header->v_header = mp_bswap_u32(batch_size);
+}
+
+/**
+ * SWIM event MsgPack template. The member-independent bytes
+ * are set once by swim_event_bin_create(), the per-member
+ * values by swim_event_bin_reset().
+ */
+struct PACKED swim_event_bin {
+	/** mp_encode_map(4) */
+	uint8_t m_header;
+
+	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
+	uint8_t k_status;
+	/** mp_encode_uint(enum member_status) */
+	uint8_t v_status;
+
+	/** mp_encode_uint(SWIM_MEMBER_ADDR) */
+	uint8_t k_addr;
+	/** mp_encode_uint(addr.sin_addr.s_addr) */
+	uint8_t m_addr;
+	uint32_t v_addr;
+
+	/** mp_encode_uint(SWIM_MEMBER_PORT) */
+	uint8_t k_port;
+	/** mp_encode_uint(addr.sin_port) */
+	uint8_t m_port;
+	uint16_t v_port;
+
+	/** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
+};
+
+/** Fill the member-independent bytes of an event template. */
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	/* MsgPack fixmap with 4 pairs. */
+	header->m_header = 0x84;
+	header->k_status = SWIM_MEMBER_STATUS;
+	header->k_addr = SWIM_MEMBER_ADDR;
+	/* MsgPack uint 32 marker. */
+	header->m_addr = 0xce;
+	header->k_port = SWIM_MEMBER_PORT;
+	/* MsgPack uint 16 marker. */
+	header->m_port = 0xcd;
+	header->k_incarnation = SWIM_MEMBER_INCARNATION;
+	/* MsgPack uint 64 marker. */
+	header->m_incarnation = 0xcf;
+}
+
+/**
+ * Copy @a member's status, address, port and incarnation into
+ * a template filled by swim_event_bin_create().
+ */
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header,
+		     const struct swim_member *member)
+{
+	header->v_status = member->status;
+	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+	header->v_port = mp_bswap_u16(member->addr.sin_port);
+	header->v_incarnation = mp_bswap_u64(member->incarnation);
+}
+
+/**
+ * Queue of events, in the order they were first scheduled.
+ * Members are linked via swim_member.in_queue_events.
+ */
+static RLIST_HEAD(queue_events);
+/** Number of members linked into queue_events. */
+static int event_count = 0;
+
+/**
+ * Put an event about @a member's update into the queue. If the
+ * member is already queued, it keeps its position and only its
+ * TTL is refreshed. The TTL is the current members table size,
+ * but never less than 1, so an event is never queued with an
+ * already exhausted TTL.
+ */
+static inline void
+swim_schedule_event(struct swim_member *member)
+{
+	if (rlist_empty(&member->in_queue_events)) {
+		rlist_add_tail_entry(&queue_events, member, in_queue_events);
+		event_count++;
+	}
+	int ttl = mh_size(members);
+	member->dissemination_ttl = ttl > 0 ? ttl : 1;
+}
+
+/** }}}                 Dissemination component                 */
+
 /**
  * SWIM message structure:
  * {
@@ -448,6 +554,18 @@ static struct swim_io_task round_step_task = {
  *
  *                 OR/AND
  *
+ *     SWIM_DISSEMINATION: [
+ *         {
+ *             SWIM_MEMBER_STATUS: uint, enum member_status,
+ *             SWIM_MEMBER_ADDR: uint, ip,
+ *             SWIM_MEMBER_PORT: uint, port,
+ *             SWIM_MEMBER_INCARNATION: uint
+ *         },
+ *         ...
+ *     ],
+ *
+ *                 OR/AND
+ *
  *     SWIM_ANTI_ENTROPY: [
  *         {
  *             SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -531,6 +649,16 @@ swim_send_ack(struct swim_io_task *task);
 static void
 swim_send_ping(struct swim_io_task *task);
 
+/**
+ * React to an update of a member: a change of its status, its
+ * incarnation, or both. Currently it only schedules an event.
+ */
+static void
+swim_member_is_updated(struct swim_member *member)
+{
+	swim_schedule_event(member);
+}
+
 /**
  * Update status of the member if needed. Statuses are compared as
  * a compound key: {incarnation, status}. So @a new_status can
@@ -548,11 +676,14 @@ swim_member_update_status(struct swim_member *member,
 {
 	assert(member != self);
 	if (member->incarnation == incarnation) {
-		if (member->status < new_status)
+		if (member->status < new_status) {
 			member->status = new_status;
+			swim_member_is_updated(member);
+		}
 	} else if (member->incarnation < incarnation) {
 		member->status = new_status;
 		member->incarnation = incarnation;
+		swim_member_is_updated(member);
 	}
 }
 
@@ -589,6 +720,8 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status,
 	swim_io_task_create(&member->ping_task, swim_send_ping);
 	rlist_add_entry(&queue_round, member, in_queue_round);
 	rlist_create(&member->in_queue_wait_ack);
+	rlist_create(&member->in_queue_events);
+	swim_schedule_event(member);
 	return member;
 }
 
@@ -617,6 +750,7 @@ swim_member_delete(struct swim_member *member)
 	swim_io_task_destroy(&member->ping_task);
 	rlist_del_entry(member, in_queue_round);
 	rlist_del_entry(member, in_queue_wait_ack);
+	assert(rlist_empty(&member->in_queue_events));
 	free(member);
 }
 
@@ -696,19 +830,76 @@ swim_encode_round_msg(char *buffer, int size)
 	assert((uint)size > sizeof(struct swim_fd_header_bin) + 1);
 	size -= sizeof(struct swim_fd_header_bin) + 1;
 
+	int diss_batch_size = calculate_bin_batch_size(
+		sizeof(struct swim_diss_header_bin),
+		sizeof(struct swim_event_bin), size);
+	if (diss_batch_size > event_count)
+		diss_batch_size = event_count;
+	if (diss_batch_size > 0) {
+		/*
+		 * The section header and all the events consume
+		 * the remaining budget, so their sizes add up.
+		 */
+		size -= sizeof(struct swim_diss_header_bin) +
+			diss_batch_size * sizeof(struct swim_event_bin);
+	}
+
 	int ae_batch_size = calculate_bin_batch_size(
 		sizeof(struct swim_anti_entropy_header_bin),
 		sizeof(struct swim_member_bin), size);
 	if (ae_batch_size > shuffled_members_size)
 		ae_batch_size = shuffled_members_size;
 
-	buffer = mp_encode_map(buffer, 2);
+	buffer = mp_encode_map(buffer, 1 + (diss_batch_size > 0) +
+			       (ae_batch_size > 0));
 
 	struct swim_fd_header_bin fd_header_bin;
 	swim_fd_header_bin_create(&fd_header_bin, SWIM_FD_MSG_PING);
 	memcpy(buffer, &fd_header_bin, sizeof(fd_header_bin));
 	buffer += sizeof(fd_header_bin);
 
+	if (diss_batch_size > 0) {
+		/*
+		 * The array header is written last, when the number
+		 * of really encoded events is known: it can not be
+		 * greater than diss_batch_size, but can be less if
+		 * event_count is out of sync with the queue.
+		 */
+		char *diss_header_pos = buffer;
+		buffer += sizeof(struct swim_diss_header_bin);
+
+		int encoded = 0;
+		struct swim_member *member, *tmp;
+		struct swim_event_bin event_bin;
+		swim_event_bin_create(&event_bin);
+		rlist_foreach_entry_safe(member, &queue_events, in_queue_events,
+					 tmp) {
+			if (encoded >= diss_batch_size)
+				break;
+			swim_event_bin_reset(&event_bin, member);
+			memcpy(buffer, &event_bin, sizeof(event_bin));
+			buffer += sizeof(event_bin);
+			++encoded;
+			/*
+			 * An event stays queued until it is sent
+			 * dissemination_ttl times. The TTL is clamped
+			 * to 0, because swim_check_acks() deletes
+			 * only members with exactly zero TTL.
+			 */
+			if (--member->dissemination_ttl <= 0) {
+				member->dissemination_ttl = 0;
+				rlist_del_entry(member, in_queue_events);
+				--event_count;
+			}
+		}
+		struct swim_diss_header_bin diss_header_bin;
+		swim_diss_header_bin_create(&diss_header_bin, encoded);
+		memcpy(diss_header_pos, &diss_header_bin,
+		       sizeof(diss_header_bin));
+	}
+
+	if (ae_batch_size == 0)
+		return buffer - start;
 	struct swim_anti_entropy_header_bin ae_header_bin;
 	swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
 	memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin));
@@ -840,12 +1008,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
 			break;
 		++m->failed_pings;
 		if (m->failed_pings >= NO_ACKS_TO_GC) {
-			if (!m->is_pinned)
+			if (!m->is_pinned && m->dissemination_ttl == 0)
 				swim_member_delete(m);
 			continue;
 		}
-		if (m->failed_pings >= NO_ACKS_TO_DEAD)
+		if (m->failed_pings >= NO_ACKS_TO_DEAD) {
 			m->status = MEMBER_DEAD;
+			swim_member_is_updated(m);
+		}
 		swim_io_task_push(&m->ping_task);
 		rlist_del_entry(m, in_queue_wait_ack);
 	}
@@ -1094,6 +1264,68 @@ swim_process_failure_detection(const char **pos, const char *end,
 	return 0;
 }
 
+/**
+ * Decode a SWIM_DISSEMINATION section - an array of events,
+ * each being a map with a member's status, address, port and
+ * incarnation - and apply every event as a member update.
+ * @param[in][out] pos MessagePack cursor, advanced while the
+ *        section is decoded.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success.
+ * @retval -1 Malformed section.
+ */
+static int
+swim_process_dissemination(const char **pos, const char *end)
+{
+	const char *msg_pref = "Invalid SWIM dissemination message:";
+	/*
+	 * An array or map header only announces its items, they
+	 * can be missing in a truncated packet. So the cursor is
+	 * checked against the end before each mp_typeof(**pos).
+	 */
+	if (*pos >= end || mp_typeof(**pos) != MP_ARRAY ||
+	    mp_check_array(*pos, end) > 0) {
+		say_error("%s message should be an array", msg_pref);
+		return -1;
+	}
+	uint64_t size = mp_decode_array(pos);
+	for (uint64_t i = 0; i < size; ++i) {
+		if (*pos >= end || mp_typeof(**pos) != MP_MAP ||
+		    mp_check_map(*pos, end) > 0) {
+			say_error("%s event should be map", msg_pref);
+			return -1;
+		}
+		uint64_t map_size = mp_decode_map(pos);
+		struct swim_member_def def;
+		swim_member_def_create(&def);
+		for (uint64_t j = 0; j < map_size; ++j) {
+			if (*pos >= end || mp_typeof(**pos) != MP_UINT ||
+			    mp_check_uint(*pos, end) > 0) {
+				say_error("%s event key should be uint",
+					  msg_pref);
+				return -1;
+			}
+			uint64_t key = mp_decode_uint(pos);
+			if (key >= swim_member_key_MAX) {
+				say_error("%s unknown event key", msg_pref);
+				return -1;
+			}
+			if (swim_process_member_key(key, pos, end, msg_pref,
+						    &def) != 0)
+				return -1;
+		}
+		/* An event is useless without the member's address. */
+		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+			say_error("%s member address should be specified",
+				  msg_pref);
+			return -1;
+		}
+		swim_process_member_update(&def);
+	}
+	return 0;
+}
+
 /** Receive and process a new message. */
 static void
 swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -1140,6 +1354,11 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
 							   &addr) != 0)
 				return;
 			break;
+		case SWIM_DISSEMINATION:
+			say_verbose("SWIM: process dissemination");
+			if (swim_process_dissemination(&pos, end) != 0)
+				return;
+			break;
 		default:
 			say_error("%s unknown component type component is "\
 				  "supported", msg_pref);
@@ -1299,6 +1518,11 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
 error:
 	for (int i = 0; i < new_cfg_size; ++i) {
 		if (new_cfg[i]->status == new_status) {
+			/* Keep event_count in sync with queue_events. */
+			if (!rlist_empty(&new_cfg[i]->in_queue_events)) {
+				rlist_del_entry(new_cfg[i], in_queue_events);
+				--event_count;
+			}
 			swim_member_delete(new_cfg[i]);
 			if (new_self == new_cfg[i])
 				new_self = NULL;
@@ -1306,8 +1530,14 @@ error:
 	}
 	if (member_uri_count > 0)
 		free(new_cfg);
-	if (new_self != NULL && new_self->status == new_status)
+	if (new_self != NULL && new_self->status == new_status) {
+		/* Keep event_count in sync with queue_events. */
+		if (!rlist_empty(&new_self->in_queue_events)) {
+			rlist_del_entry(new_self, in_queue_events);
+			--event_count;
+		}
 		swim_member_delete(new_self);
+	}
 	return -1;
 }
 
@@ -1346,6 +1568,7 @@ swim_stop(void)
 	while (node != mh_end(members)) {
 		struct swim_member *m = (struct swim_member *)
 			mh_i64ptr_node(members, node)->val;
+		rlist_del_entry(m, in_queue_events);
 		swim_member_delete(m);
 		node = mh_first(members);
 	}
@@ -1358,9 +1581,11 @@ swim_stop(void)
 	cfg_size = 0;
 	shuffled_members = NULL;
 	shuffled_members_size = 0;
+	event_count = 0;
 	rlist_create(&queue_wait_ack);
 	rlist_create(&queue_output);
 	rlist_create(&queue_round);
+	rlist_create(&queue_events);
 }
 
 #ifndef NDEBUG
-- 
2.17.2 (Apple Git-113)

  parent reply	other threads:[~2018-12-17 12:53 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-12-17 12:53 [PATCH 0/5] SWIM Vladislav Shpilevoy
2018-12-17 12:53 ` [PATCH 1/5] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
2018-12-17 12:53 ` [PATCH 2/5] swim: introduce failure detection component Vladislav Shpilevoy
2018-12-17 12:53 ` Vladislav Shpilevoy [this message]
2018-12-17 12:53 ` [PATCH 4/5] swim: introduce "suspected" status Vladislav Shpilevoy
2018-12-17 12:53 ` [PATCH 5/5] swim: keep encoded round message cached Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=8c392a1f3796487465b29936483cee575ea339b0.1545047950.git.v.shpilevoy@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=vdavydov.dev@gmail.com \
    --subject='Re: [PATCH 3/5] swim: introduce a dissemination component' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox