[PATCH v4 08/12] [RAW] swim: introduce payload

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Jan 31 00:28:37 MSK 2019


Payload is arbitrary user data, disseminated just like other
member attributes.

Part of #3234
---
 src/lib/swim/swim.c       | 102 +++++++++++++++++++++++++++++++++++---
 src/lib/swim/swim.h       |   4 ++
 src/lib/swim/swim_proto.c |  50 +++++++++++++++----
 src/lib/swim/swim_proto.h |  27 ++++++++--
 4 files changed, 163 insertions(+), 20 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 40faa296e..78dbc6092 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -259,6 +259,16 @@ struct swim_member {
 	 * learn its dead status.
 	 */
 	int status_ttl;
+	/** Arbitrary user data, disseminated on each change. */
+	char *payload;
+	/** Payload size, in bytes. */
+	int payload_size;
+	/**
+	 * TTL of payload. At most this number of times payload is
+	 * sent as a part of dissemination component. Reset to the
+	 * member count on each update.
+	 */
+	int payload_ttl;
 	/**
 	 * Events are put into a queue sorted by event occurrence
 	 * time.
@@ -415,6 +425,14 @@ swim_member_uuid_is_updated(struct swim_member *member, struct swim *swim)
 	swim_schedule_event(swim, member);
 }
 
+/** Schedule dissemination of the member's updated payload. */
+static void
+swim_member_payload_is_updated(struct swim_member *member, struct swim *swim)
+{
+	member->payload_ttl = mh_size(swim->members);
+	swim_schedule_event(swim, member);
+}
+
 /**
  * Update status and incarnation of the member if needed. Statuses
  * are compared as a compound key: {incarnation, status}. So @a
@@ -458,6 +476,55 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
 	return container_of(scheduler, struct swim, scheduler);
 }
 
+/**
+ * Update member's payload if necessary. If a payload is the
+ * same - nothing happens. Fortunately, memcmp here is not
+ * expensive, because 1) payload change is usually an extra rare
+ * event, 2) max payload size is very limited.
+ * @param member Member to update.
+ * @param payload New payload. Can be NULL if @a payload_size
+ *        is 0.
+ * @param payload_size Size of @a payload, in bytes, >= 0.
+ * @param swim SWIM instance to schedule dissemination in.
+ *
+ * @retval 0 Success, including the case of an unchanged payload.
+ * @retval -1 Memory error, diag is set. Old payload is kept.
+ */
+static inline int
+swim_member_update_payload(struct swim_member *member, const char *payload,
+			   int payload_size, struct swim *swim)
+{
+	/*
+	 * Size 0 is checked before memcmp() - passing NULL to it
+	 * is UB even when the size is 0.
+	 */
+	if (payload_size == member->payload_size &&
+	    (payload_size == 0 ||
+	     memcmp(payload, member->payload, payload_size) == 0))
+		return 0;
+	char *new_payload = NULL;
+	if (payload_size == 0) {
+		/*
+		 * Do not realloc() to 0 bytes - it may free the
+		 * old buffer and return NULL, which looks exactly
+		 * like OOM and leaves member->payload dangling.
+		 */
+		free(member->payload);
+	} else {
+		new_payload = (char *) realloc(member->payload, payload_size);
+		if (new_payload == NULL) {
+			diag_set(OutOfMemory, payload_size, "realloc",
+				 "new_payload");
+			return -1;
+		}
+		memcpy(new_payload, payload, payload_size);
+	}
+	member->payload = new_payload;
+	member->payload_size = payload_size;
+	swim_member_payload_is_updated(member, swim);
+	return 0;
+}
+
 /**
  * Remove the member from all queues, hashes, destroy it and free
  * the memory.
@@ -480,6 +523,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
 
 	/* Dissemination component. */
 	rlist_del_entry(member, in_queue_events);
+	free(member->payload);
 
 	free(member);
 }
@@ -522,7 +566,7 @@ swim_ping_task_complete(struct swim_task *task,
 static struct swim_member *
 swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 		const struct tt_uuid *uuid, enum swim_member_status status,
-		uint64_t incarnation)
+		uint64_t incarnation, const char *payload, int payload_size)
 {
 	struct swim_member *member =
 		(struct swim_member *) calloc(1, sizeof(*member));
@@ -553,6 +597,11 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 	/* Dissemination component. */
 	rlist_create(&member->in_queue_events);
 	swim_member_status_is_updated(member, swim);
+	if (swim_member_update_payload(member, payload, payload_size,
+				       swim) != 0) {
+		swim_member_delete(swim, member);
+		return NULL;
+	}
 
 	say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
 	return member;
@@ -632,14 +681,17 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
 	for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
 	     i < member_count; ++i) {
 		struct swim_member *m = *mh_swim_table_node(t, rc);
-		int new_size = size + sizeof(member_bin);
+		int new_size = size + sizeof(member_bin) + m->payload_size;
 		char *pos = swim_packet_reserve(packet, new_size);
 		if (pos == NULL)
 			break;
 		size = new_size;
 		swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
-				     m->status, m->incarnation);
+				     m->status, m->incarnation,
+				     m->payload_size);
 		memcpy(pos, &member_bin, sizeof(member_bin));
+		pos += sizeof(member_bin);
+		memcpy(pos, m->payload, m->payload_size);
 		/*
 		 * First random member could be choosen too close
 		 * to the hash end. Here the cycle is wrapped, if
@@ -718,17 +770,27 @@ swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
 		int new_size = size + sizeof(event_bin);
 		if (m->old_uuid_ttl > 0)
 			new_size += sizeof(old_uuid_bin);
+		if (m->payload_ttl > 0) {
+			new_size += mp_sizeof_uint(SWIM_MEMBER_PAYLOAD) +
+				    mp_sizeof_bin(m->payload_size);
+		}
 		char *pos = swim_packet_reserve(packet, new_size);
 		if (pos == NULL)
 			break;
 		size = new_size;
 		swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
-				    m->incarnation, m->old_uuid_ttl);
+				    m->incarnation, m->old_uuid_ttl,
+				    m->payload_ttl);
 		memcpy(pos, &event_bin, sizeof(event_bin));
+		pos += sizeof(event_bin);
 		if (m->old_uuid_ttl > 0) {
-			pos += sizeof(event_bin);
 			swim_old_uuid_bin_fill(&old_uuid_bin, &m->old_uuid);
 			memcpy(pos, &old_uuid_bin, sizeof(old_uuid_bin));
+			pos += sizeof(old_uuid_bin);
+		}
+		if (m->payload_ttl > 0) {
+			pos = mp_encode_uint(pos, SWIM_MEMBER_PAYLOAD);
+			mp_encode_bin(pos, m->payload, m->payload_size);
 		}
 		++i;
 	}
@@ -778,6 +840,8 @@ swim_decrease_events_ttl(struct swim *swim)
 				 tmp) {
 		if (member->old_uuid_ttl > 0)
 			--member->old_uuid_ttl;
+		if (member->payload_ttl > 0)
+			--member->payload_ttl;
 		if (--member->status_ttl == 0) {
 			rlist_del_entry(member, in_queue_events);
 			cached_round_msg_invalidate(swim);
@@ -989,7 +1053,9 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
 		}
 		if (old_member == NULL) {
 			member = swim_member_new(swim, &def->addr, &def->uuid,
-						 def->status, def->incarnation);
+						 def->status, def->incarnation,
+						 def->payload,
+						 def->payload_size);
 		} else if (swim_member_update_uuid(old_member, &def->uuid,
 						   swim) == 0) {
 			member = old_member;
@@ -1002,6 +1068,13 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
 			swim_member_update_addr(member, &def->addr, swim);
 			swim_member_update_status(member, def->status,
 						  def->incarnation, swim);
+			if (def->is_payload_specified &&
+			    swim_member_update_payload(member, def->payload,
+						       def->payload_size,
+						       swim) != 0) {
+				/* Not critical - other updates are kept. */
+				diag_log();
+			}
 			if (old_member != NULL) {
 				assert(member != old_member);
 				swim_member_delete(swim, old_member);
@@ -1256,7 +1329,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			return -1;
 		}
 		swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE,
-					     0);
+					     0, NULL, 0);
 		if (swim->self == NULL)
 			return -1;
 	} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -1323,6 +1396,21 @@ swim_check_is_configured(const struct swim *swim, const char *msg_pref)
 	return -1;
 }
 
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size)
+{
+	/* swim->self is created in swim_cfg() - check it exists. */
+	if (swim_check_is_configured(swim, "swim.set_payload:") != 0)
+		return -1;
+	if (payload_size > MAX_PAYLOAD_SIZE || payload_size < 0) {
+		diag_set(IllegalParams, "Payload should be <= %d and >= 0",
+			 MAX_PAYLOAD_SIZE);
+		return -1;
+	}
+	return swim_member_update_payload(swim->self, payload, payload_size,
+					  swim);
+}
+
 int
 swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 {
@@ -1334,7 +1419,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, uuid);
 	if (member == NULL) {
-		member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0);
+		member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0,
+					 NULL, 0);
 		return member == NULL ? -1 : 0;
 	}
 	diag_set(SwimError, "%s a member with such UUID already exists",
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 9d21a739d..dced172c0 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -67,6 +67,10 @@ int
 swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 	 double ack_timeout, const struct tt_uuid *uuid);
 
+/** Set own payload to disseminate. Returns 0, or -1 with diag set. */
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size);
+
 /**
  * Stop listening and broadcasting messages, cleanup all internal
  * structures, free memory.
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index e31c67682..284d35695 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -115,21 +115,54 @@ swim_decode_port(struct sockaddr_in *address, const char **pos, const char *end,
 	return 0;
 }
 
-int
-swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
-		 const char *msg_pref, const char *param_name)
+/**
+ * Decode a MessagePack binary string.
+ * @param[out] bin Start of the binary data inside the buffer.
+ * @param[out] size Size of @a bin, in bytes.
+ * @param[in][out] pos Buffer cursor, moved past the bin.
+ * @param end End of the buffer.
+ * @param msg_pref Prefix of an error message.
+ * @param param_name Name of the decoded parameter, for errors.
+ * @retval 0 Success.
+ * @retval -1 Error, diag is set.
+ */
+static inline int
+swim_decode_bin(const char **bin, uint32_t *size, const char **pos,
+		const char *end, const char *msg_pref, const char *param_name)
 {
 	if (mp_typeof(**pos) != MP_BIN || mp_check_binl(*pos, end) > 0) {
 		diag_set(SwimError, "%s %s should be bin", msg_pref,
 			 param_name);
 		return -1;
 	}
-	if (mp_decode_binl(pos) != UUID_LEN || *pos + UUID_LEN > end) {
+	*size = mp_decode_binl(pos);
+	/*
+	 * Compare the size with the remaining space instead of
+	 * advancing *pos first: the size is untrusted packet data
+	 * and may point far beyond the buffer, which is UB.
+	 */
+	if (*size > (uint32_t) (end - *pos)) {
+		diag_set(SwimError, "%s %s is invalid", msg_pref, param_name);
+		return -1;
+	}
+	*bin = *pos;
+	*pos += *size;
+	return 0;
+}
+
+int
+swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
+		 const char *msg_pref, const char *param_name)
+{
+	uint32_t size;
+	const char *bin;
+	if (swim_decode_bin(&bin, &size, pos, end, msg_pref, param_name) != 0)
+		return -1;
+	if (size != UUID_LEN) {
 		diag_set(SwimError, "%s %s is invalid", msg_pref, param_name);
 		return -1;
 	}
-	memcpy(uuid, *pos, UUID_LEN);
-	*pos += UUID_LEN;
+	memcpy(uuid, bin, UUID_LEN);
 	return 0;
 }
 
@@ -157,6 +172,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 		       struct swim_member_def *def)
 {
 	uint64_t tmp;
+	uint32_t len;
 	switch (key) {
 	case SWIM_MEMBER_STATUS:
 		if (swim_decode_uint(pos, end, &tmp, msg_pref,
@@ -194,6 +210,22 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 				     "member old uuid") != 0)
 			return -1;
 		break;
+	case SWIM_MEMBER_PAYLOAD:
+		if (swim_decode_bin(&def->payload, &len, pos, end, msg_pref,
+				    "member payload") != 0)
+			return -1;
+		/*
+		 * The limit is enforced on receipt too, so a peer
+		 * can't make this instance store a bigger payload.
+		 */
+		if (len > MAX_PAYLOAD_SIZE) {
+			diag_set(SwimError, "%s member payload size should be "
+				 "<= %d", msg_pref, MAX_PAYLOAD_SIZE);
+			return -1;
+		}
+		def->payload_size = (int) len;
+		def->is_payload_specified = true;
+		break;
 	default:
 		unreachable();
 	}
@@ -317,13 +345,15 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
 void
 swim_member_bin_fill(struct swim_member_bin *header,
 		     const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		     enum swim_member_status status, uint64_t incarnation)
+		     enum swim_member_status status, uint64_t incarnation,
+		     uint16_t payload_size)
 {
 	header->v_status = status;
 	header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
 	header->v_port = mp_bswap_u16(addr->sin_port);
 	memcpy(header->v_uuid, uuid, UUID_LEN);
 	header->v_incarnation = mp_bswap_u64(incarnation);
+	header->v_payload_size = mp_bswap_u16(payload_size);
 }
 
 void
@@ -340,6 +370,8 @@ swim_member_bin_create(struct swim_member_bin *header)
 	header->m_uuid_len = UUID_LEN;
 	header->k_incarnation = SWIM_MEMBER_INCARNATION;
 	header->m_incarnation = 0xcf;
+	header->k_payload = SWIM_MEMBER_PAYLOAD;
+	header->m_payload_size = 0xc5; /* MessagePack bin 16. */
 }
 
 void
@@ -370,9 +402,9 @@ void
 swim_event_bin_fill(struct swim_event_bin *header,
 		    enum swim_member_status status,
 		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		    uint64_t incarnation, int old_uuid_ttl)
+		    uint64_t incarnation, int old_uuid_ttl, int payload_ttl)
 {
-	header->m_header = 0x85 + (old_uuid_ttl > 0);
+	header->m_header = 0x85 + (old_uuid_ttl > 0) + (payload_ttl > 0);
 	header->v_status = status;
 	header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
 	header->v_port = mp_bswap_u16(addr->sin_port);
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index a3dc1164e..353605c35 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -35,6 +35,11 @@
 #include <arpa/inet.h>
 #include <stdbool.h>
 
+enum {
+	/** Max payload size, in bytes. 272 more are for headers. */
+	MAX_PAYLOAD_SIZE = 1200,
+};
+
 /**
  * SWIM binary protocol structures and helpers. Below is a picture
  * of a SWIM message template:
@@ -108,6 +113,13 @@ struct swim_member_def {
 	struct sockaddr_in addr;
 	uint64_t incarnation;
 	enum swim_member_status status;
+	const char *payload;
+	int payload_size;
+	/**
+	 * Payload can be specified and empty at the same time,
+	 * so a zero payload_size can't tell if it is specified.
+	 */
+	bool is_payload_specified;
 };
 
 /** Initialize the definition with default values. */
@@ -247,6 +259,7 @@ enum swim_member_key {
 	SWIM_MEMBER_UUID,
 	SWIM_MEMBER_INCARNATION,
 	SWIM_MEMBER_OLD_UUID,
+	SWIM_MEMBER_PAYLOAD,
 	swim_member_key_MAX,
 };
 
@@ -301,6 +314,13 @@ struct PACKED swim_member_bin {
 	/** mp_encode_uint(64bit incarnation) */
 	uint8_t m_incarnation;
 	uint64_t v_incarnation;
+
+	/** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+	uint8_t k_payload;
+	/** MP_BIN16 header: 0xc5 and 16bit payload size. */
+	uint8_t m_payload_size;
+	uint16_t v_payload_size;
+	/** Followed by payload_size bytes of payload. */
 };
 
 /** Initialize antri-entropy record. */
@@ -316,7 +336,8 @@ swim_member_bin_create(struct swim_member_bin *header);
 void
 swim_member_bin_fill(struct swim_member_bin *header,
 		     const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		     enum swim_member_status status, uint64_t incarnation);
+		     enum swim_member_status status, uint64_t incarnation,
+		     uint16_t payload_size);
 
 /** }}}                  Anti-entropy component                 */
 
@@ -338,7 +359,7 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
 
 /** SWIM event MessagePack template. */
 struct PACKED swim_event_bin {
-	/** mp_encode_map(5 or 6) */
+	/** mp_encode_map(5, or 6, or 7) */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -386,7 +407,7 @@ void
 swim_event_bin_fill(struct swim_event_bin *header,
 		    enum swim_member_status status,
 		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
-		    uint64_t incarnation, int old_uuid_ttl);
+		    uint64_t incarnation, int old_uuid_ttl, int payload_ttl);
 
 /** Optional attribute of an event - old UUID of a member. */
 struct swim_old_uuid_bin {
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list