[PATCH v3 6/6] [RAW] swim: introduce payload

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Dec 29 13:14:15 MSK 2018


Part of #3234
---
 src/lib/swim/swim.c | 139 ++++++++++++++++++++++++++++++++++++++++----
 src/lib/swim/swim.h |   4 ++
 2 files changed, 131 insertions(+), 12 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 7dff22dd5..fa2ae0273 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -105,6 +105,8 @@ enum {
 	 * of failed pings.
 	 */
 	NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
+	/** Reserve 272 bytes for headers. */
+	MAX_PAYLOAD_SIZE = 1200,
 };
 
 /**
@@ -200,6 +202,16 @@ struct swim_member {
 	 * learn its dead status.
 	 */
 	int status_ttl;
+	/** Arbitrary user data, disseminated on each change. */
+	char *payload;
+	/** Size of the payload in bytes. */
+	int payload_size;
+	/**
+	 * TTL of payload. At most this number of times payload is
+	 * sent as a part of dissemination component. Reset on
+	 * each update.
+	 */
+	int payload_ttl;
 	/**
 	 * Events are put into a queue sorted by event occurrence
 	 * time.
@@ -392,6 +404,7 @@ enum swim_member_key {
 	SWIM_MEMBER_ADDRESS,
 	SWIM_MEMBER_PORT,
 	SWIM_MEMBER_INCARNATION,
+	SWIM_MEMBER_PAYLOAD,
 	swim_member_key_MAX,
 };
 
@@ -415,7 +428,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
 
 /** SWIM member MsgPack template. */
 struct PACKED swim_member_bin {
-	/** mp_encode_map(4) */
+	/** mp_encode_map(5) */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -440,6 +453,13 @@ struct PACKED swim_member_bin {
 	/** mp_encode_uint(64bit incarnation) */
 	uint8_t m_incarnation;
 	uint64_t v_incarnation;
+
+	/** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+	uint8_t k_payload;
+	/** mp_encode_bin(16bit bin header) */
+	uint8_t m_payload_size;
+	uint16_t v_payload_size;
+	/** Payload data ... */
 };
 
 static inline void
@@ -450,12 +470,13 @@ swim_member_bin_reset(struct swim_member_bin *header,
 	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
 	header->v_port = mp_bswap_u16(member->addr.sin_port);
 	header->v_incarnation = mp_bswap_u64(member->incarnation);
+	header->v_payload_size = mp_bswap_u16(member->payload_size);
 }
 
 static inline void
 swim_member_bin_create(struct swim_member_bin *header)
 {
-	header->m_header = 0x84;
+	header->m_header = 0x85;
 	header->k_status = SWIM_MEMBER_STATUS;
 	header->k_addr = SWIM_MEMBER_ADDRESS;
 	header->m_addr = 0xce;
@@ -463,6 +484,8 @@ swim_member_bin_create(struct swim_member_bin *header)
 	header->m_port = 0xcd;
 	header->k_incarnation = SWIM_MEMBER_INCARNATION;
 	header->m_incarnation = 0xcf;
+	header->k_payload = SWIM_MEMBER_PAYLOAD;
+	header->m_payload_size = 0xc5;
 }
 
 /** }}}                  Anti-entropy component                 */
@@ -488,7 +511,7 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
 
 /** SWIM event MsgPack template. */
 struct PACKED swim_event_bin {
-	/** mp_encode_map(4) */
+	/** mp_encode_map(4 or 5) */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -518,7 +541,6 @@ struct PACKED swim_event_bin {
 static inline void
 swim_event_bin_create(struct swim_event_bin *header)
 {
-	header->m_header = 0x84;
 	header->k_status = SWIM_MEMBER_STATUS;
 	header->k_addr = SWIM_MEMBER_ADDRESS;
 	header->m_addr = 0xce;
@@ -531,6 +553,7 @@ swim_event_bin_create(struct swim_event_bin *header)
 static inline void
 swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
 {
+	header->m_header = 0x84 + (member->payload_ttl > 0);
 	header->v_status = member->status;
 	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
 	header->v_port = mp_bswap_u16(member->addr.sin_port);
@@ -594,6 +617,14 @@ swim_member_status_is_updated(struct swim *swim, struct swim_member *member)
 	cached_round_msg_invalidate(swim);
 }
 
+static void
+swim_member_payload_is_updated(struct swim *swim, struct swim_member *member)
+{
+	swim_schedule_event(swim, member);
+	member->payload_ttl = mh_size(swim->members);
+	cached_round_msg_invalidate(swim);
+}
+
 /**
  * Update status and incarnation of the member if needed. Statuses
  * are compared as a compound key: {incarnation, status}. So @a
@@ -622,6 +653,49 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
 	}
 }
 
+/**
+ * Replace the member's payload with a copy of @a payload if it
+ * differs from the stored one. An update carrying an incarnation
+ * older than the member's current one is ignored.
+ *
+ * @retval  0 Success, including the case when nothing changed.
+ * @retval -1 Memory error, diag is set, old payload is intact.
+ */
+static inline int
+swim_member_update_payload(struct swim *swim, struct swim_member *member,
+			   uint64_t incarnation, const char *payload,
+			   int payload_size)
+{
+	if (incarnation < member->incarnation)
+		return 0;
+	if (payload_size == member->payload_size &&
+	    (payload_size == 0 ||
+	     memcmp(payload, member->payload, payload_size) == 0))
+		return 0;
+	if (payload_size == 0) {
+		/*
+		 * realloc(ptr, 0) may free the block and return
+		 * NULL, which is not an OOM and would leave a
+		 * dangling pointer in the member. Free explicitly.
+		 */
+		free(member->payload);
+		member->payload = NULL;
+	} else {
+		char *new_payload =
+			(char *) realloc(member->payload, payload_size);
+		if (new_payload == NULL) {
+			diag_set(OutOfMemory, payload_size, "realloc",
+				 "new_payload");
+			return -1;
+		}
+		memcpy(new_payload, payload, payload_size);
+		member->payload = new_payload;
+	}
+	member->payload_size = payload_size;
+	swim_member_payload_is_updated(swim, member);
+	return 0;
+}
+
 /**
  * Remove the member from all queues, hashes, destroy it and free
  * the memory.
@@ -653,7 +706,8 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
  */
 static struct swim_member *
 swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
-		enum swim_member_status status, uint64_t incarnation)
+		enum swim_member_status status, uint64_t incarnation,
+		const char *payload, int payload_size)
 {
 	struct swim_member *member =
 		(struct swim_member *) calloc(1, sizeof(*member));
@@ -683,6 +737,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 	/* Dissemination component. */
 	rlist_create(&member->in_queue_events);
 	swim_member_status_is_updated(swim, member);
+	if (swim_member_update_payload(swim, member, incarnation, payload,
+				       payload_size) != 0) {
+		rlist_del_entry(member, in_queue_events);
+		swim_member_delete(swim, member);
+		return NULL;
+	}
 
 	return member;
 }
@@ -769,12 +829,15 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
 
 	swim_member_bin_create(&member_bin);
 	for (; i < (int) mh_size(swim->members); ++i) {
-		char *pos = swim_packet_alloc(packet, sizeof(member_bin));
+		struct swim_member *member = swim->shuffled_members[i];
+		char *pos = swim_packet_alloc(packet, sizeof(member_bin) +
+					      member->payload_size);
 		if (pos == NULL)
 			break;
-		struct swim_member *member = swim->shuffled_members[i];
 		swim_member_bin_reset(&member_bin, member);
 		memcpy(pos, &member_bin, sizeof(member_bin));
+		pos += sizeof(member_bin);
+		memcpy(pos, member->payload, member->payload_size);
 	}
 	if (i == 0)
 		return 0;
@@ -828,11 +891,22 @@ swim_encode_dissemination_packet(struct swim_msg *msg, struct rlist **queue_pos)
 	struct swim_event_bin event_bin;
 	swim_event_bin_create(&event_bin);
 	rlist_foreach_entry(member, *queue_pos, in_queue_events) {
-		char *pos = swim_packet_alloc(packet, sizeof(event_bin));
+		int size = sizeof(event_bin);
+		if (member->payload_ttl > 0) {
+			size += mp_sizeof_uint(SWIM_MEMBER_PAYLOAD) +
+				mp_sizeof_bin(member->payload_size);
+		}
+		char *pos = swim_packet_alloc(packet, size);
 		if (pos == NULL)
 			break;
 		swim_event_bin_reset(&event_bin, member);
 		memcpy(pos, &event_bin, sizeof(event_bin));
+		pos += sizeof(event_bin);
+		if (member->payload_ttl > 0) {
+			pos = mp_encode_uint(pos, SWIM_MEMBER_PAYLOAD);
+			mp_encode_bin(pos, member->payload,
+				      member->payload_size);
+		}
 		++i;
 		prev = member;
 	}
@@ -904,6 +978,10 @@ swim_decrease_events_ttl(struct swim *swim)
 	struct swim_member *member, *tmp;
 	rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
 				 tmp) {
+		assert(member->status_ttl > 0);
+		assert(member->status_ttl >= member->payload_ttl);
+		if (member->payload_ttl > 0)
+			--member->payload_ttl;
 		if (--member->status_ttl == 0) {
 			rlist_del_entry(member, in_queue_events);
 			cached_round_msg_invalidate(swim);
@@ -1029,6 +1107,8 @@ struct swim_member_def {
 	struct sockaddr_in addr;
 	uint64_t incarnation;
 	enum swim_member_status status;
+	const char *payload;
+	int payload_size;
 };
 
 static inline void
@@ -1048,7 +1128,8 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
 	 */
 	if (member == NULL) {
 		member = swim_member_new(swim, &def->addr, def->status,
-					 def->incarnation);
+					 def->incarnation, def->payload,
+					 def->payload_size);
 		if (member == NULL)
 			diag_log();
 		return;
@@ -1057,6 +1138,10 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
 	if (member != self) {
 		swim_member_update_status(swim, member, def->status,
 					  def->incarnation);
+		if (swim_member_update_payload(swim, member, def->incarnation,
+					       def->payload,
+					       def->payload_size) != 0)
+			diag_log();
 		return;
 	}
 	uint64_t old_incarnation = self->incarnation;
@@ -1131,6 +1216,32 @@ swim_process_member_key(enum swim_member_key key, const char **pos,
 		}
 		def->incarnation = mp_decode_uint(pos);
 		break;
+	case SWIM_MEMBER_PAYLOAD:
+		if (mp_typeof(**pos) != MP_BIN ||
+		    mp_check_binl(*pos, end) > 0) {
+			say_error("%s member payload should be bin", msg_pref);
+			return -1;
+		}
+		uint32_t len;
+		len = mp_decode_binl(pos);
+		if (len > MAX_PAYLOAD_SIZE) {
+			say_error("%s member payload size should be <= %d",
+				  msg_pref, MAX_PAYLOAD_SIZE);
+			return -1;
+		}
+		/*
+		 * mp_check_binl() validates only the bin header, so
+		 * make sure the data itself fits into the packet
+		 * before the cursor is moved past it.
+		 */
+		if (len > (size_t) (end - *pos)) {
+			say_error("%s member payload is truncated", msg_pref);
+			return -1;
+		}
+		def->payload = *pos;
+		*pos += len;
+		def->payload_size = (int) len;
+		break;
 	default:
 		unreachable();
 	}
@@ -1245,7 +1345,8 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	}
 	struct swim_member *sender = swim_find_member(swim, src);
 	if (sender == NULL) {
-		sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation);
+		sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation,
+					 NULL, 0);
 		if (sender == NULL) {
 			diag_log();
 			return 0;
@@ -1441,7 +1542,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		return -1;
 	struct swim_member *new_self = NULL;
 	if (swim_find_member(swim, &addr) == NULL) {
-		new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
+		new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0, NULL,
+					   0);
 		if (new_self == NULL)
 			return -1;
 	}
@@ -1466,6 +1568,30 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 	return 0;
 }
 
+/**
+ * Set the payload of this instance. The data is copied, so the
+ * caller keeps ownership of @a payload.
+ * @retval  0 Success.
+ * @retval -1 Bad size or memory error, diag is set.
+ */
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size)
+{
+	/* A negative size would become a huge size_t later on. */
+	if (payload_size < 0) {
+		diag_set(IllegalParams, "Payload size should be >= 0");
+		return -1;
+	}
+	if (payload_size > MAX_PAYLOAD_SIZE) {
+		diag_set(IllegalParams, "Payload should be <= %d",
+			 MAX_PAYLOAD_SIZE);
+		return -1;
+	}
+	return swim_member_update_payload(swim, swim->self,
+					  swim->self->incarnation, payload,
+					  payload_size);
+}
+
 int
 swim_add_member(struct swim *swim, const char *uri)
 {
@@ -1474,7 +1589,7 @@ swim_add_member(struct swim *swim, const char *uri)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, &addr);
 	if (member == NULL) {
-		member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
+		member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0, NULL, 0);
 		if (member == NULL)
 			return -1;
 		member->is_pinned = true;
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 350fa0cee..a44fcb977 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -65,6 +65,10 @@ int
 swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 	 const struct swim_transport *new_transport);
 
+/** Set payload to disseminate over the cluster. */
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size);
+
 /**
  * Stop listening and broadcasting messages, cleanup all internal
  * structures, free memory.
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list