Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org, vdavydov.dev@gmail.com
Subject: [PATCH v4 08/12] [RAW] swim: introduce payload
Date: Thu, 31 Jan 2019 00:28:37 +0300	[thread overview]
Message-ID: <2978d7e5238a77fed54c4c61d3a892c586747c35.1548883137.git.v.shpilevoy@tarantool.org> (raw)
In-Reply-To: <cover.1548883137.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1548883137.git.v.shpilevoy@tarantool.org>

Payload is arbitrary user data, disseminated just like other
member attributes.

Part of #3234
---
 src/lib/swim/swim.c       | 102 +++++++++++++++++++++++++++++++++++---
 src/lib/swim/swim.h       |   4 ++
 src/lib/swim/swim_proto.c |  50 +++++++++++++++----
 src/lib/swim/swim_proto.h |  27 ++++++++--
 4 files changed, 163 insertions(+), 20 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 40faa296e..78dbc6092 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -259,6 +259,16 @@ struct swim_member {
 	 * learn its dead status.
 	 */
 	int status_ttl;
+	/** Arbitrary user data, disseminated on each change. */
+	char *payload;
+	/** Payload size, in bytes. */
+	int payload_size;
+	/**
+	 * TTL of payload. The payload is sent at most this many
+	 * times as a part of the dissemination component. Reset
+	 * on each update.
+	 */
+	int payload_ttl;
 	/**
 	 * Events are put into a queue sorted by event occurrence
 	 * time.
@@ -415,6 +425,14 @@ swim_member_uuid_is_updated(struct swim_member *member, struct swim *swim)
 	swim_schedule_event(swim, member);
 }
 
+/** React to a payload change: reset its TTL and schedule an event. */
+static void
+swim_member_payload_is_updated(struct swim_member *member, struct swim *swim)
+{
+	member->payload_ttl = mh_size(swim->members);
+	swim_schedule_event(swim, member);
+}
+
 /**
  * Update status and incarnation of the member if needed. Statuses
  * are compared as a compound key: {incarnation, status}. So @a
@@ -458,6 +476,31 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
 	return container_of(scheduler, struct swim, scheduler);
 }
 
+/**
+ * Store a copy of @a payload in the member unless it is identical
+ * to the current one; memcmp is cheap as payloads are small and
+ * rarely change. NOTE(review): realloc() to size 0 may return
+ * NULL and be reported as OOM - confirm empty payloads are safe.
+ */
+static inline int
+swim_member_update_payload(struct swim_member *member, const char *payload,
+			   int payload_size, struct swim *swim)
+{
+	if (payload_size == member->payload_size &&
+	    memcmp(payload, member->payload, payload_size) == 0)
+		return 0;
+	char *new_payload = (char *) realloc(member->payload, payload_size);
+	if (new_payload == NULL) {
+		diag_set(OutOfMemory, payload_size, "realloc", "new_payload");
+		return -1;
+	}
+	memcpy(new_payload, payload, payload_size);
+	member->payload = new_payload;
+	member->payload_size = payload_size;
+	swim_member_payload_is_updated(member, swim);
+	return 0;
+}
+
 /**
  * Remove the member from all queues, hashes, destroy it and free
  * the memory.
@@ -480,6 +523,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
 
 	/* Dissemination component. */
 	rlist_del_entry(member, in_queue_events);
+	free(member->payload);
 
 	free(member);
 }
@@ -522,7 +566,7 @@ swim_ping_task_complete(struct swim_task *task,
 static struct swim_member *
 swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 		const struct tt_uuid *uuid, enum swim_member_status status,
-		uint64_t incarnation)
+		uint64_t incarnation, const char *payload, int payload_size)
 {
 	struct swim_member *member =
 		(struct swim_member *) calloc(1, sizeof(*member));
@@ -553,6 +597,11 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 	/* Dissemination component. */
 	rlist_create(&member->in_queue_events);
 	swim_member_status_is_updated(member, swim);
+	if (swim_member_update_payload(member, payload, payload_size,
+				       swim) != 0) {
+		swim_member_delete(swim, member);
+		return NULL;
+	}
 
 	say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
 	return member;
@@ -632,14 +681,17 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
 	for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
 	     i < member_count; ++i) {
 		struct swim_member *m = *mh_swim_table_node(t, rc);
-		int new_size = size + sizeof(member_bin);
+		int new_size = size + sizeof(member_bin) + m->payload_size;
 		char *pos = swim_packet_reserve(packet, new_size);
 		if (pos == NULL)
 			break;
 		size = new_size;
 		swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
-				     m->status, m->incarnation);
+				     m->status, m->incarnation,
+				     m->payload_size);
 		memcpy(pos, &member_bin, sizeof(member_bin));
+		pos += sizeof(member_bin);
+		memcpy(pos, m->payload, m->payload_size);
 		/*
 		 * First random member could be choosen too close
 		 * to the hash end. Here the cycle is wrapped, if
@@ -718,17 +770,27 @@ swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
 		int new_size = size + sizeof(event_bin);
 		if (m->old_uuid_ttl > 0)
 			new_size += sizeof(old_uuid_bin);
+		if (m->payload_ttl > 0) {
+			new_size += mp_sizeof_uint(SWIM_MEMBER_PAYLOAD) +
+				    mp_sizeof_bin(m->payload_size);
+		}
 		char *pos = swim_packet_reserve(packet, new_size);
 		if (pos == NULL)
 			break;
 		size = new_size;
 		swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
-				    m->incarnation, m->old_uuid_ttl);
+				    m->incarnation, m->old_uuid_ttl,
+				    m->payload_ttl);
 		memcpy(pos, &event_bin, sizeof(event_bin));
+		pos += sizeof(event_bin);
 		if (m->old_uuid_ttl > 0) {
-			pos += sizeof(event_bin);
 			swim_old_uuid_bin_fill(&old_uuid_bin, &m->old_uuid);
 			memcpy(pos, &old_uuid_bin, sizeof(old_uuid_bin));
+			pos += sizeof(old_uuid_bin);
+		}
+		if (m->payload_ttl > 0) {
+			pos = mp_encode_uint(pos, SWIM_MEMBER_PAYLOAD);
+			mp_encode_bin(pos, m->payload, m->payload_size);
 		}
 		++i;
 	}
@@ -778,6 +840,8 @@ swim_decrease_events_ttl(struct swim *swim)
 				 tmp) {
 		if (member->old_uuid_ttl > 0)
 			--member->old_uuid_ttl;
+		if (member->payload_ttl > 0)
+			--member->payload_ttl;
 		if (--member->status_ttl == 0) {
 			rlist_del_entry(member, in_queue_events);
 			cached_round_msg_invalidate(swim);
@@ -989,7 +1053,9 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
 		}
 		if (old_member == NULL) {
 			member = swim_member_new(swim, &def->addr, &def->uuid,
-						 def->status, def->incarnation);
+						 def->status, def->incarnation,
+						 def->payload,
+						 def->payload_size);
 		} else if (swim_member_update_uuid(old_member, &def->uuid,
 						   swim) == 0) {
 			member = old_member;
@@ -1002,6 +1068,13 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
 			swim_member_update_addr(member, &def->addr, swim);
 			swim_member_update_status(member, def->status,
 						  def->incarnation, swim);
+			if (def->is_payload_specified &&
+			    swim_member_update_payload(member, def->payload,
+						       def->payload_size,
+						       swim) != 0) {
+				/* Not such a critical error. */
+				diag_log();
+			}
 			if (old_member != NULL) {
 				assert(member != old_member);
 				swim_member_delete(swim, old_member);
@@ -1256,7 +1329,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			return -1;
 		}
 		swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE,
-					     0);
+					     0, NULL, 0);
 		if (swim->self == NULL)
 			return -1;
 	} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -1323,6 +1396,18 @@ swim_check_is_configured(const struct swim *swim, const char *msg_pref)
 	return -1;
 }
 
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size)
+{
+	if (payload_size > MAX_PAYLOAD_SIZE) {
+		diag_set(IllegalParams, "Payload should be <= %d",
+			 MAX_PAYLOAD_SIZE);
+		return -1;
+	}
+	return swim_member_update_payload(swim->self, payload, payload_size,
+					  swim);
+}
+
 int
 swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 {
@@ -1334,7 +1419,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, uuid);
 	if (member == NULL) {
-		member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0);
+		member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0,
+					 NULL, 0);
 		return member == NULL ? -1 : 0;
 	}
 	diag_set(SwimError, "%s a member with such UUID already exists",
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 9d21a739d..dced172c0 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -67,6 +67,10 @@ int
 swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 	 double ack_timeout, const struct tt_uuid *uuid);
 
+/** Set this instance's payload to disseminate over the cluster. */
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size);
+
 /**
  * Stop listening and broadcasting messages, cleanup all internal
  * structures, free memory.
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index e31c67682..284d35695 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -115,21 +115,36 @@ swim_decode_port(struct sockaddr_in *address, const char **pos, const char *end,
 	return 0;
 }
 
-int
-swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
-		 const char *msg_pref, const char *param_name)
+static inline int
+swim_decode_bin(const char **bin, uint32_t *size, const char **pos,
+		const char *end, const char *msg_pref, const char *param_name)
 {
 	if (mp_typeof(**pos) != MP_BIN || mp_check_binl(*pos, end) > 0) {
 		diag_set(SwimError, "%s %s should be bin", msg_pref,
 			 param_name);
 		return -1;
 	}
-	if (mp_decode_binl(pos) != UUID_LEN || *pos + UUID_LEN > end) {
+	*bin = mp_decode_bin(pos, size);
+	if (*pos > end) {
+		diag_set(SwimError, "%s %s is invalid", msg_pref, param_name);
+		return -1;
+	}
+	return 0;
+}
+
+int
+swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
+		 const char *msg_pref, const char *param_name)
+{
+	uint32_t size;
+	const char *bin;
+	if (swim_decode_bin(&bin, &size, pos, end, msg_pref, param_name) != 0)
+		return -1;
+	if (size != UUID_LEN) {
 		diag_set(SwimError, "%s %s is invalid", msg_pref, param_name);
 		return -1;
 	}
-	memcpy(uuid, *pos, UUID_LEN);
-	*pos += UUID_LEN;
+	memcpy(uuid, bin, UUID_LEN);
 	return 0;
 }
 
@@ -157,6 +172,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 		       struct swim_member_def *def)
 {
 	uint64_t tmp;
+	uint32_t len;
 	switch (key) {
 	case SWIM_MEMBER_STATUS:
 		if (swim_decode_uint(pos, end, &tmp, msg_pref,
@@ -194,6 +210,18 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 				     "member old uuid") != 0)
 			return -1;
 		break;
+	case SWIM_MEMBER_PAYLOAD:
+		if (swim_decode_bin(&def->payload, &len, pos, end, msg_pref,
+				    "member payload") != 0)
+			return -1;
+		if (len > MAX_PAYLOAD_SIZE) {
+			diag_set(SwimError, "%s member payload size should be "\
+				 "<= %d", msg_pref, MAX_PAYLOAD_SIZE);
+			return -1;
+		}
+		def->payload_size = (int) len;
+		def->is_payload_specified = true;
+		break;
 	default:
 		unreachable();
 	}
@@ -317,13 +345,15 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
 void
 swim_member_bin_fill(struct swim_member_bin *header,
 		     const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		     enum swim_member_status status, uint64_t incarnation)
+		     enum swim_member_status status, uint64_t incarnation,
+		     uint16_t payload_size)
 {
 	header->v_status = status;
 	header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
 	header->v_port = mp_bswap_u16(addr->sin_port);
 	memcpy(header->v_uuid, uuid, UUID_LEN);
 	header->v_incarnation = mp_bswap_u64(incarnation);
+	header->v_payload_size = mp_bswap_u16(payload_size);
 }
 
 void
@@ -340,6 +370,8 @@ swim_member_bin_create(struct swim_member_bin *header)
 	header->m_uuid_len = UUID_LEN;
 	header->k_incarnation = SWIM_MEMBER_INCARNATION;
 	header->m_incarnation = 0xcf;
+	header->k_payload = SWIM_MEMBER_PAYLOAD;
+	header->m_payload_size = 0xc5;
 }
 
 void
@@ -370,9 +402,9 @@ void
 swim_event_bin_fill(struct swim_event_bin *header,
 		    enum swim_member_status status,
 		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		    uint64_t incarnation, int old_uuid_ttl)
+		    uint64_t incarnation, int old_uuid_ttl, int payload_ttl)
 {
-	header->m_header = 0x85 + (old_uuid_ttl > 0);
+	header->m_header = 0x85 + (old_uuid_ttl > 0) + (payload_ttl > 0);
 	header->v_status = status;
 	header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
 	header->v_port = mp_bswap_u16(addr->sin_port);
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index a3dc1164e..353605c35 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -35,6 +35,11 @@
 #include <arpa/inet.h>
 #include <stdbool.h>
 
+enum {
+	/** Max payload size in bytes; 272 more are kept for headers. */
+	MAX_PAYLOAD_SIZE = 1200,
+};
+
 /**
  * SWIM binary protocol structures and helpers. Below is a picture
  * of a SWIM message template:
@@ -108,6 +113,13 @@ struct swim_member_def {
 	struct sockaddr_in addr;
 	uint64_t incarnation;
 	enum swim_member_status status;
+	const char *payload;
+	int payload_size;
+	/**
+	 * Zero payload size does not mean that payload is not
+	 * specified. It can be just empty.
+	 */
+	bool is_payload_specified;
 };
 
 /** Initialize the definition with default values. */
@@ -247,6 +259,7 @@ enum swim_member_key {
 	SWIM_MEMBER_UUID,
 	SWIM_MEMBER_INCARNATION,
 	SWIM_MEMBER_OLD_UUID,
+	SWIM_MEMBER_PAYLOAD,
 	swim_member_key_MAX,
 };
 
@@ -301,6 +314,13 @@ struct PACKED swim_member_bin {
 	/** mp_encode_uint(64bit incarnation) */
 	uint8_t m_incarnation;
 	uint64_t v_incarnation;
+
+	/** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+	uint8_t k_payload;
+	/** mp_encode_bin(16bit bin header) */
+	uint8_t m_payload_size;
+	uint16_t v_payload_size;
+	/** Payload data ... */
 };
 
 /** Initialize antri-entropy record. */
@@ -316,7 +336,8 @@ swim_member_bin_create(struct swim_member_bin *header);
 void
 swim_member_bin_fill(struct swim_member_bin *header,
 		     const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		     enum swim_member_status status, uint64_t incarnation);
+		     enum swim_member_status status, uint64_t incarnation,
+		     uint16_t payload_size);
 
 /** }}}                  Anti-entropy component                 */
 
@@ -338,7 +359,7 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
 
 /** SWIM event MessagePack template. */
 struct PACKED swim_event_bin {
-	/** mp_encode_map(5 or 6) */
+	/** mp_encode_map(5, or 6, or 7) */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -386,7 +407,7 @@ void
 swim_event_bin_fill(struct swim_event_bin *header,
 		    enum swim_member_status status,
 		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		    uint64_t incarnation, int old_uuid_ttl);
+		    uint64_t incarnation, int old_uuid_ttl, int payload_ttl);
 
 /** Optional attribute of an event - old UUID of a member. */
 struct swim_old_uuid_bin {
-- 
2.17.2 (Apple Git-113)

  parent reply	other threads:[~2019-01-30 21:28 UTC|newest]

Thread overview: 23+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 01/12] sio: introduce sio_uri_to_addr Vladislav Shpilevoy
2019-02-15 13:21   ` [tarantool-patches] " Konstantin Osipov
2019-02-15 21:22     ` [tarantool-patches] " Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 10/12] [RAW] swim: introduce 'quit' message Vladislav Shpilevoy
2019-02-21 12:13   ` [tarantool-patches] " Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 11/12] [RAW] swim: introduce broadcast tasks Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 12/12] [RAW] swim: allow to use broadcast tasks to send pings Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 02/12] evio: expose evio_setsockopt_server function Vladislav Shpilevoy
2019-02-15 13:21   ` [tarantool-patches] " Konstantin Osipov
2019-02-15 21:22     ` [tarantool-patches] " Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 03/12] rlist: introduce rlist_add_tail_entry_sorted Vladislav Shpilevoy
2019-02-15 13:26   ` [tarantool-patches] " Konstantin Osipov
2019-02-15 13:34     ` [tarantool-patches] " Vladislav Shpilevoy
2019-02-15 18:07       ` Konstantin Osipov
2019-01-30 21:28 ` [PATCH v4 04/12] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
2019-02-21 18:35   ` [tarantool-patches] " Konstantin Osipov
2019-02-26 18:28     ` [tarantool-patches] " Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 05/12] [RAW] swim: introduce failure detection component Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 06/12] [RAW] swim: introduce dissemination component Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 07/12] [RAW] swim: keep encoded round message cached Vladislav Shpilevoy
2019-01-30 21:28 ` Vladislav Shpilevoy [this message]
2019-01-30 21:28 ` [PATCH v4 09/12] [RAW] swim: introduce routing Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=2978d7e5238a77fed54c4c61d3a892c586747c35.1548883137.git.v.shpilevoy@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=vdavydov.dev@gmail.com \
    --subject='Re: [PATCH v4 08/12] [RAW] swim: introduce payload' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox