[tarantool-patches] Re: [PATCH 6/6] swim: introduce payload

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Apr 18 20:43:40 MSK 2019



On 18/04/2019 18:12, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [19/04/12 01:25]:
>> Payload is arbitrary user data disseminated over the cluster
>> along with other member attributes.
>> Part of #3234
>> ---
>>  src/lib/swim/swim.c         | 155 ++++++++++++++++++++++++++--
>>  src/lib/swim/swim.h         |   8 ++
>>  src/lib/swim/swim_proto.c   |  31 +++++-
>>  src/lib/swim/swim_proto.h   |  41 +++++++-
>>  test/unit/swim.c            | 195 +++++++++++++++++++++++++++++++++++-
>>  test/unit/swim.result       |  32 +++++-
>>  test/unit/swim_test_utils.c |  62 ++++++++++++
>>  test/unit/swim_test_utils.h |  18 ++++
>>  8 files changed, 525 insertions(+), 17 deletions(-)
>> +	 * whether the member has updated payload, or another
>> +	 * attribute. The only way here is to wait until the most
>> +	 * actual payload will be received from another instance.
>> +	 * Note, that such an instance always exists - the payload
>> +	 * originator instance.
>> +	 */
>> +	bool is_payload_up_to_date;
>> +	/**
>> +	 * TTL of payload. At most this number of times payload is
>> +	 * sent as a part of dissemination component. Reset on
>> +	 * each payload update.
>> +	 */
>> +	int payload_ttl;
> 
> As agreed, let's rename ttl to ttd across the board and update the
> comments to say that ttd is time to disseminate.

Cool, done in a separate commit.
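To make the rename concrete, here is a minimal sketch of the payload TTD as a countdown. It assumes, as swim_update_member_payload() does below, that the TTD is reset to the cluster size on each update, and it assumes one decrement per dissemination round. struct ttd_member, ttd_reset() and ttd_round() are hypothetical names for illustration, not part of the SWIM API.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical member reduced to the payload TTD only. */
struct ttd_member {
	/* How many more round messages may carry the payload. */
	int payload_ttd;
};

/* Called on every payload update: restart dissemination. */
static void
ttd_reset(struct ttd_member *m, int cluster_size)
{
	assert(cluster_size > 0);
	m->payload_ttd = cluster_size;
}

/*
 * One dissemination round. Returns true if the payload is put
 * into this round's message, and consumes one unit of TTD.
 */
static bool
ttd_round(struct ttd_member *m)
{
	if (m->payload_ttd <= 0)
		return false;
	--m->payload_ttd;
	return true;
}
```

Once the TTD reaches 0, the payload stops riding on dissemination messages, and only anti-entropy can still deliver it.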

> 
>> +/** Update member's payload, register a corresponding event. */
>> +static inline int
>> +swim_update_member_payload(struct swim *swim, struct swim_member *member,
>> +			   const char *payload, uint16_t payload_size,
>> +			   int incarnation_increment)
> 
> Passing incarnation_increment raises a lot of questions when
> reading the code.

We already use it for the address update. It simplified a lot of
things and, most importantly, encapsulated the incarnation increment
and the TTD reset in one place. I tried to avoid this argument for the
address, but it looked much worse: TTD was reset in too many places,
and it is easy to miss one. The main goal I pursued was to encapsulate
the incarnation update and the event registration on small attribute
changes. Now these functions hide that task inside:

    swim_update_member_inc_status
    swim_update_member_payload
    swim_update_member_addr

With your approach, the incarnation and member attribute update logic
would spread across literally every function touching members.
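A stripped-down sketch of the pattern these helpers follow, assuming a toy member with a single payload attribute. One function copies the value, resets its TTD, applies the incarnation increment chosen by the caller (1 for self, 0 for a member learned from the network) and registers an update event. struct toy_member and toy_update_payload() are hypothetical; the event queue is reduced to a counter.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical member with one attribute, its TTD and incarnation. */
struct toy_member {
	uint64_t incarnation;
	int payload_ttd;
	char *payload;
	uint16_t payload_size;
	/* Stands in for the dissemination event queue. */
	int update_events;
};

/*
 * The single place that knows all side effects of a payload
 * change. On allocation failure the old payload is kept intact.
 */
static int
toy_update_payload(struct toy_member *m, const char *payload,
		   uint16_t size, int incarnation_increment,
		   int cluster_size)
{
	assert(size == 0 || payload != NULL);
	char *copy = NULL;
	if (size > 0) {
		copy = realloc(m->payload, size);
		if (copy == NULL)
			return -1;
		memcpy(copy, payload, size);
	} else {
		free(m->payload);
	}
	m->payload = copy;
	m->payload_size = size;
	m->payload_ttd = cluster_size;
	m->incarnation += incarnation_increment;
	++m->update_events;
	return 0;
}
```

Callers never touch the incarnation or the TTD directly, so a new call site cannot forget one of them.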

> I would not use this function from the
> constructor - I don't see any issue in copy-pasting 3 lines of
> code into the constructor to make both branches simpler.

It is not 3 lines; generally it is 6-7 lines per swim_new_member()
call, to invoke update_payload() afterwards. I have 4 invocations of
swim_new_member(), and two of them pass a payload and its size. Without
encapsulated processing of the payload update I would be forced to add
more checks into these two places, with more code duplication than you
expect, which I would rather avoid. These places are swim_upsert_member()
and swim_cfg(), and both functions are very sensitive to code
duplication in terms of readability. During SWIM development I tried to
keep them as clean and short as possible.

I tried your way again just now, but it just looks much worse. Dropped.

> 
> Or I would ban passing payload to the constructor and make this
> function public.
> 
>>  static int
>>  swim_encode_member(struct swim_packet *packet, struct swim_member *m,
>> -		   struct swim_passport_bin *passport)
>> +		   struct swim_passport_bin *passport,
>> +		   struct swim_member_payload_bin *payload_header,
>> +		   bool is_payload_needed)
> 
> Please rename is_payload_needed  -> encode_payload.

I followed our naming rules for flags: is_<...>. But OK, done.

> There is no
> comment for encode_payload and how it is used.

Added a comment.

> I don't see why you can't move all the decision making about
> whether to encode the payload or not into this function.
> 
> The payload should be encoded if:
> - its not null
> - payload ttl is greater than zero
> - it's not trustworthy. (is_payload_up_to_date). I would btw
>   rename is_payload_up_to_date to is_payload_trustworthy - this
>   name would be closer to truth.
> 
> Why can't you look at these conditions once in swim_encode_member
> rather than evaluate them outside this function?

Because anti-entropy and dissemination have different conditions for
encoding the payload. For anti-entropy it is enough for the payload to
be up to date. Dissemination additionally requires TTD > 0.

We agreed verbally to always check is_payload_up_to_date inside the
function, and to use the encode_payload argument only to pass the
dissemination-specific TTD condition.
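The resulting decision, sketched as a standalone predicate under the assumption that only the component type, the up-to-date flag and the payload TTD matter. toy_should_encode_payload() and enum toy_component are hypothetical names. In the patch the same logic is split between the caller (the TTD check) and swim_encode_member() (the up-to-date check).

```c
#include <assert.h>
#include <stdbool.h>

/* Which component is building the packet. */
enum toy_component { TOY_ANTI_ENTROPY, TOY_DISSEMINATION };

/*
 * A payload that is not known to be up to date is never sent.
 * Dissemination additionally requires a positive TTD.
 */
static bool
toy_should_encode_payload(enum toy_component c, bool is_up_to_date,
			  int payload_ttd)
{
	assert(payload_ttd >= 0);
	if (!is_up_to_date)
		return false;
	if (c == TOY_DISSEMINATION)
		return payload_ttd > 0;
	return true;
}
```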

> 
> BTW, there is no harm in encoding an empty payload all the time
> (e.g. mp_bin of size 0). This would make the code simpler - you
> would only need to look at payload_size to see if payload exists.

This is what I do. I encode the payload even when it is empty, as
long as it is up to date.
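For reference, the payload record is a MessagePack positive fixint key followed by a bin 16 value (0xc5 plus a big-endian 16-bit length), which is the layout struct swim_member_payload_bin describes, so an empty payload costs just 4 bytes. A sketch of that layout, assuming the key fits in a fixint; toy_encode_payload() is a hypothetical helper, not the swim_proto.c code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Encode key + payload: a positive fixint key, then bin 16
 * (0xc5, big-endian 16-bit length, data). @a buf must have room
 * for 4 + @a size bytes. Returns the number of bytes written.
 */
static size_t
toy_encode_payload(char *buf, uint8_t key, const char *payload,
		   uint16_t size)
{
	assert(key <= 0x7f); /* Must fit a positive fixint. */
	char *pos = buf;
	*pos++ = (char) key;
	*pos++ = (char) 0xc5;
	*pos++ = (char) (size >> 8);
	*pos++ = (char) (size & 0xff);
	/* Skip memcpy for an empty payload: the source may be NULL. */
	if (size > 0)
		memcpy(pos, payload, size);
	return 4 + (size_t) size;
}
```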

> 
>> +int
>> +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
>> +{
>> +	if (payload_size > MAX_PAYLOAD_SIZE) {
>> +		diag_set(IllegalParams, "Payload should be <= %d",
>> +			 MAX_PAYLOAD_SIZE);
>> +		return -1;
>> +	}
>> +	return swim_update_member_payload(swim, swim->self, payload,
>> +					  payload_size, 1);
>> +}
> 
> Like I said above, if there is swim_set_paylaod, there is no need
> to supply payload in swim_new_member(). 

swim_set_payload() does not work on foreign members; it takes 'self'
only. When decoding and updating members, I need to reset the payloads
of other members.
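The rules applied to foreign members in swim_update_member() can be summarized as a pure decision function. This sketch assumes that packets with an older incarnation are already filtered out, and that a received size of -1 means the packet carried no payload. enum toy_payload_action and toy_on_receive() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum toy_payload_action {
	/* Leave the local copy as is. */
	TOY_KEEP,
	/* Replace the local payload and mark it up to date. */
	TOY_APPLY,
	/* Keep the bytes, but stop trusting and spreading them. */
	TOY_MARK_STALE,
};

static enum toy_payload_action
toy_on_receive(uint64_t local_inc, bool local_up_to_date,
	       uint64_t received_inc, int received_size)
{
	/* Older incarnations are assumed to be filtered out earlier. */
	assert(received_inc >= local_inc);
	if (received_inc > local_inc) {
		if (received_size >= 0)
			return TOY_APPLY;
		/* New incarnation, no payload: local copy may be old. */
		return local_up_to_date ? TOY_MARK_STALE : TOY_KEEP;
	}
	/* Same incarnation: only a stale local copy is replaced. */
	if (!local_up_to_date && received_size >= 0)
		return TOY_APPLY;
	return TOY_KEEP;
}
```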

> 
>> +	def->payload_size = -1;
> 
> Ugh.

I need a way to detect whether the payload was found in a packet and
decoded. It is not always sent, because 1) dissemination does not send
it when TTD == 0 or when it is outdated, and 2) anti-entropy does not
send outdated payloads either.
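An empty payload is a legal value that can be sent, so 0 cannot double as "absent"; hence -1. A minimal decoding sketch, assuming the value is a MessagePack bin 16 (0xc5 followed by a big-endian length). struct toy_member_def and toy_decode_payload() are hypothetical, not the swim_proto.c functions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical decoded member: -1 means "payload key absent". */
struct toy_member_def {
	const char *payload;
	int payload_size;
};

static void
toy_member_def_create(struct toy_member_def *def)
{
	def->payload = NULL;
	/* 0 is a legal, empty payload, so -1 marks absence. */
	def->payload_size = -1;
}

/*
 * Decode a bin 16 value (0xc5, big-endian length, data) into
 * @a def. Returns 0 on success, -1 on malformed or truncated
 * input. The payload points into the packet, it is not copied.
 */
static int
toy_decode_payload(struct toy_member_def *def,
		   const unsigned char *pos, const unsigned char *end)
{
	assert(pos <= end);
	if (end - pos < 3 || pos[0] != 0xc5)
		return -1;
	uint16_t len = (uint16_t) ((pos[1] << 8) | pos[2]);
	if (end - pos - 3 < len)
		return -1;
	def->payload = (const char *) (pos + 3);
	def->payload_size = len;
	return 0;
}
```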

> 
> 
> -- 
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
> 

A new version:

===================================================================
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 3e64e4c91..22760cdd7 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -309,6 +309,44 @@ struct swim_member {
 	 * allow other members to learn the dead status.
 	 */
 	int status_ttd;
+	/** Arbitrary user data, disseminated on each change. */
+	char *payload;
+	/** Payload size, in bytes. */
+	uint16_t payload_size;
+	/**
+	 * True, if the payload is believed to be the latest
+	 * version. In such a case it can be disseminated
+	 * further. Otherwise @a payload is suspected to be
+	 * outdated and can be updated in two cases only:
+	 *
+	 * 1) when it is received with a bigger incarnation from
+	 *    anywhere;
+	 *
+	 * 2) when it is received with the same incarnation, but
+	 *    local payload is outdated.
+	 *
+	 * A payload can become outdated, if anyhow a new
+	 * incarnation of the member has been learned, but not a
+	 * new payload. For example, a message with new payload
+	 * could be lost, and at the same time this instance
+	 * responds to a ping with newly incarnated ack. The ack
+	 * receiver will learn the new incarnation, but not the
+	 * new payload.
+	 *
+	 * In this case it is impossible to tell whether the
+	 * member has updated its payload or another attribute.
+	 * The only way out is to wait until the latest payload
+	 * is received from another instance. Note that such an
+	 * instance always exists: the payload originator
+	 * instance.
+	 */
+	bool is_payload_up_to_date;
+	/**
+	 * Payload TTD (time to disseminate): the payload is sent
+	 * at most this many times as a part of the dissemination
+	 * component. Reset on each payload update.
+	 */
+	int payload_ttd;
 	/**
 	 * All created events are put into a queue sorted by event
 	 * time.
@@ -524,6 +562,34 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
 	return container_of(scheduler, struct swim, scheduler);
 }
 
+/** Update member's payload, register a corresponding event. */
+static inline int
+swim_update_member_payload(struct swim *swim, struct swim_member *member,
+			   const char *payload, uint16_t payload_size,
+			   int incarnation_increment)
+{
+	assert(payload_size <= MAX_PAYLOAD_SIZE);
+	char *new_payload;
+	if (payload_size > 0) {
+		new_payload = (char *) realloc(member->payload, payload_size);
+		if (new_payload == NULL) {
+			diag_set(OutOfMemory, payload_size, "realloc", "new_payload");
+			return -1;
+		}
+		memcpy(new_payload, payload, payload_size);
+	} else {
+		free(member->payload);
+		new_payload = NULL;
+	}
+	member->payload = new_payload;
+	member->payload_size = payload_size;
+	member->payload_ttd = mh_size(swim->members);
+	member->incarnation += incarnation_increment;
+	member->is_payload_up_to_date = true;
+	swim_on_member_update(swim, member);
+	return 0;
+}
+
 /**
  * Once a ping is sent, the member should start waiting for an
  * ACK.
@@ -557,6 +623,7 @@ swim_member_delete(struct swim_member *member)
 
 	/* Dissemination component. */
 	assert(rlist_empty(&member->in_dissemination_queue));
+	free(member->payload);
 
 	free(member);
 }
@@ -638,7 +705,7 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
 static struct swim_member *
 swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		const struct tt_uuid *uuid, enum swim_member_status status,
-		uint64_t incarnation)
+		uint64_t incarnation, const char *payload, int payload_size)
 {
 	int new_bsize = sizeof(swim->shuffled[0]) *
 			(mh_size(swim->members) + 1);
@@ -676,6 +743,12 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 
 	/* Dissemination component. */
 	swim_on_member_update(swim, member);
+	if (payload_size >= 0 &&
+	    swim_update_member_payload(swim, member, payload,
+				       payload_size, 0) != 0) {
+		swim_delete_member(swim, member);
+		return NULL;
+	}
 
 	say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
 		    swim_uuid_str(&member->uuid), mh_size(swim->members));
@@ -735,22 +808,40 @@ swim_new_round(struct swim *swim)
 
 /**
  * Encode one member into @a packet using @a passport structure.
+ * Note that this function does not make a decision whether
+ * payload should be encoded, because its callers have different
+ * conditions for that. The anti-entropy needs the payload be
+ * up-to-date. The dissemination component additionally needs
+ * TTD > 0.
  * @retval 0 Success, encoded.
  * @retval -1 Not enough memory in the packet.
  */
 static int
 swim_encode_member(struct swim_packet *packet, struct swim_member *m,
-		   struct swim_passport_bin *passport)
+		   struct swim_passport_bin *passport,
+		   struct swim_member_payload_bin *payload_header,
+		   bool encode_payload)
 {
 	/* The headers should be initialized. */
 	assert(passport->k_status == SWIM_MEMBER_STATUS);
+	assert(payload_header->k_payload == SWIM_MEMBER_PAYLOAD);
 	int size = sizeof(*passport);
+	encode_payload = encode_payload && m->is_payload_up_to_date;
+	if (encode_payload)
+		size += sizeof(*payload_header) + m->payload_size;
 	char *pos = swim_packet_alloc(packet, size);
 	if (pos == NULL)
 		return -1;
 	swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
-			       m->incarnation);
+			       m->incarnation, encode_payload);
 	memcpy(pos, passport, sizeof(*passport));
+	if (encode_payload) {
+		pos += sizeof(*passport);
+		swim_member_payload_bin_fill(payload_header, m->payload_size);
+		memcpy(pos, payload_header, sizeof(*payload_header));
+		pos += sizeof(*payload_header);
+		memcpy(pos, m->payload, m->payload_size);
+	}
 	return 0;
 }
 
@@ -764,17 +855,20 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
 {
 	struct swim_anti_entropy_header_bin ae_header_bin;
 	struct swim_passport_bin passport_bin;
+	struct swim_member_payload_bin payload_header;
 	char *header = swim_packet_alloc(packet, sizeof(ae_header_bin));
 	if (header == NULL)
 		return 0;
 	swim_passport_bin_create(&passport_bin);
+	swim_member_payload_bin_create(&payload_header);
 	struct mh_swim_table_t *t = swim->members;
 	int i = 0, member_count = mh_size(t);
 	int rnd = swim_scaled_rand(0, member_count - 1);
 	for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
 	     i < member_count; ++i) {
 		struct swim_member *m = *mh_swim_table_node(t, rc);
-		if (swim_encode_member(packet, m, &passport_bin) != 0)
+		if (swim_encode_member(packet, m, &passport_bin,
+				       &payload_header, true) != 0)
 			break;
 		/*
 		 * First random member could be chosen too close
@@ -834,16 +928,20 @@ static int
 swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
 {
 	struct swim_diss_header_bin diss_header_bin;
+	struct swim_member_payload_bin payload_header;
 	struct swim_passport_bin passport_bin;
 	char *header = swim_packet_alloc(packet, sizeof(diss_header_bin));
 	if (header == NULL)
 		return 0;
 	swim_passport_bin_create(&passport_bin);
+	swim_member_payload_bin_create(&payload_header);
 	int i = 0;
 	struct swim_member *m;
 	rlist_foreach_entry(m, &swim->dissemination_queue,
 			    in_dissemination_queue) {
-		if (swim_encode_member(packet, m, &passport_bin) != 0)
+		if (swim_encode_member(packet, m, &passport_bin,
+				       &payload_header,
+				       m->payload_ttd > 0) != 0)
 			break;
 		++i;
 	}
@@ -891,6 +989,10 @@ swim_decrease_event_ttd(struct swim *swim)
 	rlist_foreach_entry_safe(member, &swim->dissemination_queue,
 				 in_dissemination_queue,
 				 tmp) {
+		if (member->payload_ttd > 0) {
+			if (--member->payload_ttd == 0)
+				swim_cached_round_msg_invalidate(swim);
+		}
 		if (--member->status_ttd == 0) {
 			rlist_del_entry(member, in_dissemination_queue);
 			swim_cached_round_msg_invalidate(swim);
@@ -1066,8 +1168,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
 {
 	assert(member != swim->self);
 	assert(def->incarnation >= member->incarnation);
-	if (def->incarnation > member->incarnation)
+	/*
+	 * Payload update rules are simple: it can be updated
+	 * either if the new payload has a bigger incarnation, or
+	 * the same incarnation, but local payload is outdated.
+	 */
+	bool update_payload = false;
+	if (def->incarnation > member->incarnation) {
 		swim_update_member_addr(swim, member, &def->addr, 0);
+		if (def->payload_size >= 0) {
+			update_payload = true;
+		} else if (member->is_payload_up_to_date) {
+			member->is_payload_up_to_date = false;
+			swim_on_member_update(swim, member);
+		}
+	} else if (! member->is_payload_up_to_date && def->payload_size >= 0) {
+		update_payload = true;
+	}
+	if (update_payload &&
+	    swim_update_member_payload(swim, member, def->payload,
+				       def->payload_size, 0) != 0) {
+		/* Not such a critical error. */
+		diag_log();
+	}
 	swim_update_member_inc_status(swim, member, def->status,
 				      def->incarnation);
 }
@@ -1108,7 +1231,8 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 			goto skip;
 		}
 		*result = swim_new_member(swim, &def->addr, &def->uuid,
-					  def->status, def->incarnation);
+					  def->status, def->incarnation,
+					  def->payload, def->payload_size);
 		return *result != NULL ? 0 : -1;
 	}
 	*result = member;
@@ -1437,7 +1561,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			return -1;
 		}
 		swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
-					     0);
+					     0, NULL, 0);
 		if (swim->self == NULL)
 			return -1;
 	} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -1449,7 +1573,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			return -1;
 		}
 		new_self = swim_new_member(swim, &swim->self->addr, uuid,
-					   MEMBER_ALIVE, 0);
+					   MEMBER_ALIVE, 0, swim->self->payload,
+					   swim->self->payload_size);
 		if (new_self == NULL)
 			return -1;
 	}
@@ -1505,6 +1630,18 @@ swim_is_configured(const struct swim *swim)
 	return swim->self != NULL;
 }
 
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
+{
+	if (payload_size > MAX_PAYLOAD_SIZE) {
+		diag_set(IllegalParams, "Payload should be <= %d",
+			 MAX_PAYLOAD_SIZE);
+		return -1;
+	}
+	return swim_update_member_payload(swim, swim->self, payload,
+					  payload_size, 1);
+}
+
 int
 swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 {
@@ -1519,7 +1656,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, uuid);
 	if (member == NULL) {
-		member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0);
+		member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0,
+					 NULL, -1);
 		return member == NULL ? -1 : 0;
 	}
 	diag_set(SwimError, "%s a member with such UUID already exists",
@@ -1755,3 +1893,10 @@ swim_member_incarnation(const struct swim_member *member)
 {
 	return member->incarnation;
 }
+
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size)
+{
+	*size = member->payload_size;
+	return member->payload;
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 09d933b83..6a219d131 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -104,6 +104,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 double
 swim_ack_timeout(const struct swim *swim);
 
+/** Set payload to disseminate over the cluster. */
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size);
+
 /**
  * Stop listening and broadcasting messages, cleanup all internal
  * structures, free memory.
@@ -220,6 +224,10 @@ swim_member_uuid(const struct swim_member *member);
 uint64_t
 swim_member_incarnation(const struct swim_member *member);
 
+/** Member's payload. */
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index d84550663..58c9ec119 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -160,6 +160,7 @@ swim_member_def_create(struct swim_member_def *def)
 	memset(def, 0, sizeof(*def));
 	def->addr.sin_family = AF_INET;
 	def->status = MEMBER_ALIVE;
+	def->payload_size = -1;
 }
 
 /**
@@ -179,6 +180,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 		       struct swim_member_def *def)
 {
 	uint64_t tmp;
+	uint32_t len;
 	switch (key) {
 	case SWIM_MEMBER_STATUS:
 		if (swim_decode_uint(pos, end, &tmp, prefix,
@@ -210,6 +212,17 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 				     "member incarnation") != 0)
 			return -1;
 		break;
+	case SWIM_MEMBER_PAYLOAD:
+		if (swim_decode_bin(&def->payload, &len, pos, end, prefix,
+				    "member payload") != 0)
+			return -1;
+		if (len > MAX_PAYLOAD_SIZE) {
+			diag_set(SwimError, "%s member payload size should be "
+				 "<= %d", prefix, MAX_PAYLOAD_SIZE);
+			return -1;
+		}
+		def->payload_size = (int) len;
+		break;
 	default:
 		unreachable();
 	}
@@ -330,10 +343,22 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
 	header->v_anti_entropy = mp_bswap_u16(batch_size);
 }
 
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin)
+{
+	bin->k_payload = SWIM_MEMBER_PAYLOAD;
+	bin->m_payload_size = 0xc5;
+}
+
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, uint16_t size)
+{
+	bin->v_payload_size = mp_bswap_u16(size);
+}
+
 void
 swim_passport_bin_create(struct swim_passport_bin *passport)
 {
-	passport->m_header = 0x85;
 	passport->k_status = SWIM_MEMBER_STATUS;
 	passport->k_addr = SWIM_MEMBER_ADDRESS;
 	passport->m_addr = 0xce;
@@ -350,8 +375,10 @@ void
 swim_passport_bin_fill(struct swim_passport_bin *passport,
 		       const struct sockaddr_in *addr,
 		       const struct tt_uuid *uuid,
-		       enum swim_member_status status, uint64_t incarnation)
+		       enum swim_member_status status, uint64_t incarnation,
+		       bool encode_payload)
 {
+	passport->m_header = 0x85 + encode_payload;
 	passport->v_status = status;
 	passport->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr));
 	passport->v_port = mp_bswap_u16(ntohs(addr->sin_port));
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index e1c70db43..23d339e80 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -38,6 +38,11 @@
 #include <stdbool.h>
 #include "swim_constants.h"
 
+enum {
+	/** Reserve 272 bytes for headers. */
+	MAX_PAYLOAD_SIZE = 1200,
+};
+
 /**
  * SWIM binary protocol structures and helpers. Below is a picture
  * of a SWIM message template:
@@ -67,7 +72,8 @@
  * |             SWIM_MEMBER_ADDRESS: uint, ip,                  |
  * |             SWIM_MEMBER_PORT: uint, port,                   |
  * |             SWIM_MEMBER_UUID: 16 byte UUID,                 |
- * |             SWIM_MEMBER_INCARNATION: uint                   |
+ * |             SWIM_MEMBER_INCARNATION: uint,                  |
+ * |             SWIM_MEMBER_PAYLOAD: bin                        |
  * |         },                                                  |
  * |         ...                                                 |
  * |     ],                                                      |
@@ -80,7 +86,8 @@
  * |             SWIM_MEMBER_ADDRESS: uint, ip,                  |
  * |             SWIM_MEMBER_PORT: uint, port,                   |
  * |             SWIM_MEMBER_UUID: 16 byte UUID,                 |
- * |             SWIM_MEMBER_INCARNATION: uint                   |
+ * |             SWIM_MEMBER_INCARNATION: uint,                  |
+ * |             SWIM_MEMBER_PAYLOAD: bin                        |
  * |         },                                                  |
  * |         ...                                                 |
  * |     ],                                                      |
@@ -103,6 +110,8 @@ struct swim_member_def {
 	struct sockaddr_in addr;
 	uint64_t incarnation;
 	enum swim_member_status status;
+	const char *payload;
+	int payload_size;
 };
 
 /** Initialize the definition with default values. */
@@ -242,6 +251,7 @@ enum swim_member_key {
 	SWIM_MEMBER_PORT,
 	SWIM_MEMBER_UUID,
 	SWIM_MEMBER_INCARNATION,
+	SWIM_MEMBER_PAYLOAD,
 	swim_member_key_MAX,
 };
 
@@ -305,6 +315,30 @@ struct PACKED swim_passport_bin {
 	uint64_t v_incarnation;
 };
 
+/**
+ * SWIM member's payload header. Payload data should be encoded
+ * right after it.
+ */
+struct PACKED swim_member_payload_bin {
+	/** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+	uint8_t k_payload;
+	/** mp_encode_bin(16bit bin header) */
+	uint8_t m_payload_size;
+	uint16_t v_payload_size;
+	/** Payload data ... */
+};
+
+/** Initialize payload record. */
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin);
+
+/**
+ * Fill a previously created payload record with an actual size.
+ */
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin,
+			     uint16_t size);
+
 /** Initialize a member's binary passport. */
 void
 swim_passport_bin_create(struct swim_passport_bin *passport);
@@ -319,7 +353,8 @@ void
 swim_passport_bin_fill(struct swim_passport_bin *passport,
 		       const struct sockaddr_in *addr,
 		       const struct tt_uuid *uuid,
-		       enum swim_member_status status, uint64_t incarnation);
+		       enum swim_member_status status, uint64_t incarnation,
+		       bool encode_payload);
 
 /** }}}                  Anti-entropy component                 */
 
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 6f3871606..0b8058f0b 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -36,6 +36,7 @@
 #include "uri/uri.h"
 #include "swim/swim.h"
 #include "swim/swim_ev.h"
+#include "swim/swim_proto.h"
 #include "swim_test_transport.h"
 #include "swim_test_ev.h"
 #include "swim_test_utils.h"
@@ -642,10 +643,200 @@ swim_test_broadcast(void)
 	swim_finish_test();
 }
 
+static void
+swim_test_payload_basic(void)
+{
+	swim_start_test(11);
+	uint16_t size, cluster_size = 3;
+	struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+	for (int i = 0; i < cluster_size; ++i) {
+		for (int j = i + 1; j < cluster_size; ++j)
+			swim_cluster_interconnect(cluster, i, j);
+	}
+	ok(swim_cluster_member_payload(cluster, 0, 0, &size) == NULL &&
+	   size == 0, "no payload by default");
+	is(swim_cluster_member_set_payload(cluster, 0, NULL, 1300), -1,
+	   "can not set too big payload");
+	ok(swim_error_check_match("Payload should be <="), "diag says too big");
+
+	const char *s0_payload = "S1 payload";
+	uint16_t s0_payload_size = strlen(s0_payload) + 1;
+	is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+					   s0_payload_size), 0,
+	   "payload is set");
+	is(swim_cluster_member_incarnation(cluster, 0, 0), 1,
+	   "incarnation is incremeted on each payload update");
+	const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size);
+	ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0,
+	   "payload is successfully obtained back");
+
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+						s0_payload_size, cluster_size),
+	   0, "payload is disseminated");
+	s0_payload = "S1 second version of payload";
+	s0_payload_size = strlen(s0_payload) + 1;
+	is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+					   s0_payload_size), 0,
+	   "payload is changed");
+	is(swim_cluster_member_incarnation(cluster, 0, 0), 2,
+	   "incarnation is incremeted on each payload update");
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+						s0_payload_size, cluster_size),
+	   0, "second payload is disseminated");
+	/*
+	 * Test that new incarnations help to rewrite the old
+	 * payload from anti-entropy.
+	 */
+	swim_cluster_set_drop(cluster, 0, 100);
+	s0_payload = "S1 third version of payload";
+	s0_payload_size = strlen(s0_payload) + 1;
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+						s0_payload_size) != 0);
+	/* Wait at least one round until payload TTD gets 0. */
+	swim_run_for(3);
+	swim_cluster_set_drop(cluster, 0, 0);
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+						s0_payload_size, cluster_size),
+	   0, "third payload is disseminated via anti-entropy");
+
+	swim_cluster_delete(cluster);
+	swim_finish_test();
+}
+
+static void
+swim_test_payload_refutation(void)
+{
+	swim_start_test(11);
+	uint16_t size, cluster_size = 3;
+	struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+	swim_cluster_set_ack_timeout(cluster, 1);
+	for (int i = 0; i < cluster_size; ++i) {
+		for (int j = i + 1; j < cluster_size; ++j)
+			swim_cluster_interconnect(cluster, i, j);
+	}
+	const char *s0_old_payload = "s0 payload";
+	uint16_t s0_old_payload_size = strlen(s0_old_payload) + 1;
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_old_payload,
+						s0_old_payload_size) != 0);
+	fail_if(swim_cluster_wait_payload_everywhere(cluster, 0, s0_old_payload,
+						     s0_old_payload_size,
+						     3) != 0);
+	/*
+	 * The test checks the following case. Assume there are 3
+	 * nodes: S1, S2, S3. They all know each other. S1 sets
+	 * new payload, S2 and S3 knows that. They all see that S1
+	 * has incarnation 1 and payload P1.
+	 *
+	 * Now S1 changes payload to P2. Its incarnation becomes
+	 * 2. During next entire round its round messages are
+	 * lost, however ACKs work ok.
+	 */
+	const char *s0_new_payload = "s0 second payload";
+	uint16_t s0_new_payload_size = strlen(s0_new_payload);
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_new_payload,
+						s0_new_payload_size) != 0);
+	int components[2] = {SWIM_DISSEMINATION, SWIM_ANTI_ENTROPY};
+	swim_cluster_drop_components(cluster, 0, components, 2);
+	swim_run_for(3);
+	swim_cluster_drop_components(cluster, 0, NULL, 0);
+
+	is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+	   "S2 sees new incarnation of S1");
+	is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+	   "S3 does the same");
+
+	const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "but S2 does not known the new payload");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "as well as S3");
+
+	/* Restore normal ACK timeout. */
+	swim_cluster_set_ack_timeout(cluster, 30);
+
+	/*
+	 * Now S1's payload TTD is 0, but via ACKs S1 has sent its
+	 * new incarnation to S2 and S3. Despite that, they should
+	 * apply S1's new payload via anti-entropy. The next lines
+	 * test that:
+	 *
+	 * 1) S2 can apply S1's new payload from S1's
+	 *    anti-entropy;
+	 *
+	 * 2) S2 will not receive S1's old payload from S3.
+	 *    S3 knows that its copy is outdated, and should
+	 *    not send it;
+	 *
+	 * 3) S3 can apply S1's new payload from S2's
+	 *    anti-entropy. Note that here S3 applies the payload
+	 *    not directly from the originator. It is the most
+	 *    complex case.
+	 *
+	 * The next lines test case (1).
+	 */
+
+	/* S3 does not participate in the test (1). */
+	swim_cluster_set_drop(cluster, 2, 100);
+	swim_run_for(3);
+
+	tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_new_payload_size &&
+	   memcmp(tmp, s0_new_payload, size) == 0,
+	   "S2 learned S1's payload via anti-entropy");
+	is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+	   "incarnation still is the same");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "S3 was blocked and does not know anything");
+	is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+	   "incarnation still is the same");
+
+	/* S1 will not participate in the tests further. */
+	swim_cluster_set_drop(cluster, 0, 100);
+
+	/*
+	 * Now check the case (2) - S3 will not send outdated
+	 * version of S1's payload. To maintain the experimental
+	 * integrity S1 and S2 are silent. Only S3 sends packets.
+	 */
+	swim_cluster_set_drop(cluster, 2, 0);
+	swim_cluster_set_drop_out(cluster, 1, 100);
+	swim_run_for(3);
+
+	tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_new_payload_size &&
+	   memcmp(tmp, s0_new_payload, size) == 0,
+	   "S2 keeps the same new S1's payload, S3 did not rewrite it");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "S3 still does not know anything");
+
+	/*
+	 * Now check case (3): S3 accepts S1's new payload from
+	 * S2, even though it already knows this S1 incarnation.
+	 */
+	swim_cluster_set_drop(cluster, 1, 0);
+	swim_cluster_set_drop_out(cluster, 2, 100);
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_new_payload,
+						s0_new_payload_size, 3), 0,
+	   "S3 learns S1's payload from S2");
+
+	swim_cluster_delete(cluster);
+	swim_finish_test();
+}
+
 static int
 main_f(va_list ap)
 {
-	swim_start_test(15);
+	swim_start_test(17);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -666,6 +857,8 @@ main_f(va_list ap)
 	swim_test_quit();
 	swim_test_uri_update();
 	swim_test_broadcast();
+	swim_test_payload_basic();
+	swim_test_payload_refutation();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index a90a86dd0..4b1407db3 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..15
+1..17
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -147,4 +147,34 @@ ok 14 - subtests
     ok 6 - fullmesh is reached, and no one link was added explicitly
 ok 15 - subtests
 	*** swim_test_broadcast: done ***
+	*** swim_test_payload_basic ***
+    1..11
+    ok 1 - no payload by default
+    ok 2 - can not set too big payload
+    ok 3 - diag says too big
+    ok 4 - payload is set
+    ok 5 - incarnation is incremeted on each payload update
+    ok 6 - payload is successfully obtained back
+    ok 7 - payload is disseminated
+    ok 8 - payload is changed
+    ok 9 - incarnation is incremeted on each payload update
+    ok 10 - second payload is disseminated
+    ok 11 - third payload is disseminated via anti-entropy
+ok 16 - subtests
+	*** swim_test_payload_basic: done ***
+	*** swim_test_payload_refutation ***
+    1..11
+    ok 1 - S2 sees new incarnation of S1
+    ok 2 - S3 does the same
+    ok 3 - but S2 does not known the new payload
+    ok 4 - as well as S3
+    ok 5 - S2 learned S1's payload via anti-entropy
+    ok 6 - incarnation still is the same
+    ok 7 - S3 was blocked and does not know anything
+    ok 8 - incarnation still is the same
+    ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it
+    ok 10 - S3 still does not know anything
+    ok 11 - S3 learns S1's payload from S2
+ok 17 - subtests
+	*** swim_test_payload_refutation: done ***
 	*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index fd528d166..45570cce5 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -191,6 +191,27 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
 	return swim_member_incarnation(m);
 }
 
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+			    int member_id, uint16_t *size)
+{
+	const struct swim_member *m =
+		swim_cluster_member_view(cluster, node_id, member_id);
+	if (m == NULL) {
+		*size = 0;
+		return NULL;
+	}
+	return swim_member_payload(m, size);
+}
+
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+				const char *payload, uint16_t size)
+{
+	struct swim *s = swim_cluster_node(cluster, i);
+	return swim_set_payload(s, payload, size);
+}
+
 struct swim *
 swim_cluster_node(struct swim_cluster *cluster, int i)
 {
@@ -506,6 +527,13 @@ struct swim_member_template {
 	 */
 	bool need_check_incarnation;
 	uint64_t incarnation;
+	/**
+	 * True, if the payload should be checked to be equal to
+	 * @a payload of size @a payload_size.
+	 */
+	bool need_check_payload;
+	const char *payload;
+	uint16_t payload_size;
 };
 
 /** Build member template. No checks are set. */
@@ -542,6 +570,19 @@ swim_member_template_set_incarnation(struct swim_member_template *t,
 	t->incarnation = incarnation;
 }
 
+/**
+ * Make the member template also check that the member's
+ * payload equals @a payload of size @a payload_size.
+ */
+static inline void
+swim_member_template_set_payload(struct swim_member_template *t,
+				 const char *payload, uint16_t payload_size)
+{
+	t->need_check_payload = true;
+	t->payload = payload;
+	t->payload_size = payload_size;
+}
+
 /** Callback to check that a member matches a template. */
 static bool
 swim_loop_check_member(struct swim_cluster *cluster, void *data)
@@ -551,17 +592,26 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data)
 		swim_cluster_member_view(cluster, t->node_id, t->member_id);
 	enum swim_member_status status;
 	uint64_t incarnation;
+	const char *payload;
+	uint16_t payload_size;
 	if (m != NULL) {
 		status = swim_member_status(m);
 		incarnation = swim_member_incarnation(m);
+		payload = swim_member_payload(m, &payload_size);
 	} else {
 		status = swim_member_status_MAX;
 		incarnation = 0;
+		payload = NULL;
+		payload_size = 0;
 	}
 	if (t->need_check_status && status != t->status)
 		return false;
 	if (t->need_check_incarnation && incarnation != t->incarnation)
 		return false;
+	if (t->need_check_payload &&
+	    (payload_size != t->payload_size ||
+	     memcmp(payload, t->payload, payload_size) != 0))
+		return false;
 	return true;
 }
 
@@ -644,6 +694,18 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id,
 				 swim_loop_check_member_everywhere, &t);
 }
 
+int
+swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
+				     int member_id, const char *payload,
+				     uint16_t payload_size, double timeout)
+{
+	struct swim_member_template t;
+	swim_member_template_create(&t, -1, member_id);
+	swim_member_template_set_payload(&t, payload, payload_size);
+	return swim_wait_timeout(timeout, cluster,
+				 swim_loop_check_member_everywhere, &t);
+}
+
 bool
 swim_error_check_match(const char *msg)
 {
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 6ea136e36..100a67e0c 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -141,6 +141,14 @@ uint64_t
 swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
 				int member_id);
 
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+			    int member_id, uint16_t *size);
+
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+				const char *payload, uint16_t size);
+
 /**
  * Check if in the cluster every instance knowns the about other
  * instances.
@@ -192,6 +200,16 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id,
 			      int member_id, uint64_t incarnation,
 			      double timeout);
 
+/**
+ * Wait until a member with id @a member_id is seen with
+ * @a payload of size @a payload_size in the membership table of
+ * every instance in @a cluster. At most @a timeout seconds.
+ */
+int
+swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
+				     int member_id, const char *payload,
+				     uint16_t payload_size, double timeout);
+
 /** Process SWIM events for @a duration fake seconds. */
 void
 swim_run_for(double duration);
