[tarantool-patches] [PATCH 6/6] swim: introduce payload

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Apr 12 01:22:30 MSK 2019


Payload is arbitrary user data disseminated over the cluster
along with other member attributes.

Part of #3234
---
 src/lib/swim/swim.c         | 155 ++++++++++++++++++++++++++--
 src/lib/swim/swim.h         |   8 ++
 src/lib/swim/swim_proto.c   |  31 +++++-
 src/lib/swim/swim_proto.h   |  41 +++++++-
 test/unit/swim.c            | 195 +++++++++++++++++++++++++++++++++++-
 test/unit/swim.result       |  32 +++++-
 test/unit/swim_test_utils.c |  62 ++++++++++++
 test/unit/swim_test_utils.h |  18 ++++
 8 files changed, 525 insertions(+), 17 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 2dac6eedd..2be1c846b 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -308,6 +308,38 @@ struct swim_member {
 	 * allow other members to learn the dead status.
 	 */
 	int status_ttl;
+	/** Arbitrary user data, disseminated on each change. */
+	char *payload;
+	/** Payload size, in bytes. */
+	uint16_t payload_size;
+	/**
+	 * True if the payload is believed to be the latest
+	 * version. In such a case it can be disseminated
+	 * further. Otherwise @a payload is suspected to be
+	 * outdated and can be updated in two cases only:
+	 *
+	 * 1) when it is received with a bigger incarnation from
+	 *    anywhere;
+	 *
+	 * 2) when it is received with the same incarnation, but
+	 *    local payload is outdated.
+	 *
+	 * A payload can become outdated if a new incarnation of
+	 * the member has been learned somehow, but not a new
+	 * payload. In such a case it is impossible to tell
+	 * whether the member has updated its payload or another
+	 * attribute. The only option is to wait until the latest
+	 * payload is received from another instance. Note that
+	 * such an instance always exists - the payload
+	 * originator instance.
+	 */
+	bool is_payload_up_to_date;
+	/**
+	 * TTL of payload. At most this number of times payload is
+	 * sent as a part of dissemination component. Reset on
+	 * each payload update.
+	 */
+	int payload_ttl;
 	/**
 	 * All created events are put into a queue sorted by event
 	 * time.
@@ -523,6 +555,34 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
 	return container_of(scheduler, struct swim, scheduler);
 }
 
+/** Update member's payload, register a corresponding event. */
+static inline int
+swim_update_member_payload(struct swim *swim, struct swim_member *member,
+			   const char *payload, uint16_t payload_size,
+			   int incarnation_increment)
+{
+	assert(payload_size <= MAX_PAYLOAD_SIZE);
+	char *new_payload;
+	if (payload_size > 0) {
+		new_payload = (char *) realloc(member->payload, payload_size);
+		if (new_payload == NULL) {
+			diag_set(OutOfMemory, payload_size, "realloc", "new_payload");
+			return -1;
+		}
+		memcpy(new_payload, payload, payload_size);
+	} else {
+		free(member->payload);
+		new_payload = NULL;
+	}
+	member->payload = new_payload;
+	member->payload_size = payload_size;
+	member->payload_ttl = mh_size(swim->members);
+	member->incarnation += incarnation_increment;
+	member->is_payload_up_to_date = true;
+	swim_on_member_update(swim, member);
+	return 0;
+}
+
 /**
  * Once a ping is sent, the member should start waiting for an
  * ACK.
@@ -556,6 +616,7 @@ swim_member_delete(struct swim_member *member)
 
 	/* Dissemination component. */
 	assert(rlist_empty(&member->in_dissemination_queue));
+	free(member->payload);
 
 	free(member);
 }
@@ -637,7 +698,7 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
 static struct swim_member *
 swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		const struct tt_uuid *uuid, enum swim_member_status status,
-		uint64_t incarnation)
+		uint64_t incarnation, const char *payload, int payload_size)
 {
 	int new_bsize = sizeof(swim->shuffled[0]) *
 			(mh_size(swim->members) + 1);
@@ -675,6 +736,12 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 
 	/* Dissemination component. */
 	swim_on_member_update(swim, member);
+	if (payload_size >= 0 &&
+	    swim_update_member_payload(swim, member, payload,
+				       payload_size, 0) != 0) {
+		swim_delete_member(swim, member);
+		return NULL;
+	}
 
 	say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
 		    swim_uuid_str(&member->uuid), mh_size(swim->members));
@@ -739,17 +806,29 @@ swim_new_round(struct swim *swim)
  */
 static int
 swim_encode_member(struct swim_packet *packet, struct swim_member *m,
-		   struct swim_passport_bin *passport)
+		   struct swim_passport_bin *passport,
+		   struct swim_member_payload_bin *payload_header,
+		   bool is_payload_needed)
 {
 	/* The headers should be initialized. */
 	assert(passport->k_status == SWIM_MEMBER_STATUS);
+	assert(payload_header->k_payload == SWIM_MEMBER_PAYLOAD);
 	int size = sizeof(*passport);
+	if (is_payload_needed)
+		size += sizeof(*payload_header) + m->payload_size;
 	char *pos = swim_packet_alloc(packet, size);
 	if (pos == NULL)
 		return -1;
 	swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
-			       m->incarnation);
+			       m->incarnation, is_payload_needed);
 	memcpy(pos, passport, sizeof(*passport));
+	if (is_payload_needed) {
+		pos += sizeof(*passport);
+		swim_member_payload_bin_fill(payload_header, m->payload_size);
+		memcpy(pos, payload_header, sizeof(*payload_header));
+		pos += sizeof(*payload_header);
+		memcpy(pos, m->payload, m->payload_size);
+	}
 	return 0;
 }
 
@@ -763,17 +842,21 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
 {
 	struct swim_anti_entropy_header_bin ae_header_bin;
 	struct swim_passport_bin passport_bin;
+	struct swim_member_payload_bin payload_header;
 	char *header = swim_packet_alloc(packet, sizeof(ae_header_bin));
 	if (header == NULL)
 		return 0;
 	swim_passport_bin_create(&passport_bin);
+	swim_member_payload_bin_create(&payload_header);
 	struct mh_swim_table_t *t = swim->members;
 	int i = 0, member_count = mh_size(t);
 	int rnd = swim_scaled_rand(0, member_count - 1);
 	for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
 	     i < member_count; ++i) {
 		struct swim_member *m = *mh_swim_table_node(t, rc);
-		if (swim_encode_member(packet, m, &passport_bin) != 0)
+		if (swim_encode_member(packet, m, &passport_bin,
+				       &payload_header,
+				       m->is_payload_up_to_date) != 0)
 			break;
 		/*
 		 * First random member could be chosen too close
@@ -833,16 +916,21 @@ static int
 swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
 {
 	struct swim_diss_header_bin diss_header_bin;
+	struct swim_member_payload_bin payload_header;
 	struct swim_passport_bin passport_bin;
 	char *header = swim_packet_alloc(packet, sizeof(diss_header_bin));
 	if (header == NULL)
 		return 0;
 	swim_passport_bin_create(&passport_bin);
+	swim_member_payload_bin_create(&payload_header);
 	int i = 0;
 	struct swim_member *m;
 	rlist_foreach_entry(m, &swim->dissemination_queue,
 			    in_dissemination_queue) {
-		if (swim_encode_member(packet, m, &passport_bin) != 0)
+		bool is_payload_needed = m->payload_ttl > 0 &&
+					 m->is_payload_up_to_date;
+		if (swim_encode_member(packet, m, &passport_bin,
+				       &payload_header, is_payload_needed) != 0)
 			break;
 		++i;
 	}
@@ -890,6 +978,10 @@ swim_decrease_event_ttl(struct swim *swim)
 	rlist_foreach_entry_safe(member, &swim->dissemination_queue,
 				 in_dissemination_queue,
 				 tmp) {
+		if (member->payload_ttl > 0) {
+			if (--member->payload_ttl == 0)
+				swim_cached_round_msg_invalidate(swim);
+		}
 		if (--member->status_ttl == 0) {
 			rlist_del_entry(member, in_dissemination_queue);
 			swim_cached_round_msg_invalidate(swim);
@@ -1065,8 +1157,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
 {
 	assert(member != swim->self);
 	assert(def->incarnation >= member->incarnation);
-	if (def->incarnation > member->incarnation)
+	/*
+	 * Payload update rules are simple: it can be updated
+	 * either if the new payload has a bigger incarnation, or
+	 * the same incarnation, but local payload is outdated.
+	 */
+	bool is_payload_needed = false;
+	if (def->incarnation > member->incarnation) {
 		swim_update_member_addr(swim, member, &def->addr, 0);
+		if (def->payload_size >= 0) {
+			is_payload_needed = true;
+		} else if (member->is_payload_up_to_date) {
+			member->is_payload_up_to_date = false;
+			swim_on_member_update(swim, member);
+		}
+	} else if (! member->is_payload_up_to_date && def->payload_size >= 0) {
+		is_payload_needed = true;
+	}
+	if (is_payload_needed &&
+	    swim_update_member_payload(swim, member, def->payload,
+				       def->payload_size, 0) != 0) {
+		/* Not such a critical error. */
+		diag_log();
+	}
 	swim_update_member_inc_status(swim, member, def->status,
 				      def->incarnation);
 }
@@ -1107,7 +1220,8 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 			goto skip;
 		}
 		*result = swim_new_member(swim, &def->addr, &def->uuid,
-					  def->status, def->incarnation);
+					  def->status, def->incarnation,
+					  def->payload, def->payload_size);
 		return *result != NULL ? 0 : -1;
 	}
 	*result = member;
@@ -1436,7 +1550,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			return -1;
 		}
 		swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
-					     0);
+					     0, NULL, 0);
 		if (swim->self == NULL)
 			return -1;
 	} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -1448,7 +1562,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 			return -1;
 		}
 		new_self = swim_new_member(swim, &swim->self->addr, uuid,
-					   MEMBER_ALIVE, 0);
+					   MEMBER_ALIVE, 0, swim->self->payload,
+					   swim->self->payload_size);
 		if (new_self == NULL)
 			return -1;
 	}
@@ -1504,6 +1619,18 @@ swim_is_configured(const struct swim *swim)
 	return swim->self != NULL;
 }
 
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
+{
+	if (payload_size > MAX_PAYLOAD_SIZE) {
+		diag_set(IllegalParams, "Payload should be <= %d",
+			 MAX_PAYLOAD_SIZE);
+		return -1;
+	}
+	return swim_update_member_payload(swim, swim->self, payload,
+					  payload_size, 1);
+}
+
 int
 swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 {
@@ -1518,7 +1645,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, uuid);
 	if (member == NULL) {
-		member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0);
+		member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0,
+					 NULL, -1);
 		return member == NULL ? -1 : 0;
 	}
 	diag_set(SwimError, "%s a member with such UUID already exists",
@@ -1754,3 +1882,10 @@ swim_member_incarnation(const struct swim_member *member)
 {
 	return member->incarnation;
 }
+
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size)
+{
+	*size = member->payload_size;
+	return member->payload;
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 09d933b83..6a219d131 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -104,6 +104,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 double
 swim_ack_timeout(const struct swim *swim);
 
+/** Set payload to disseminate over the cluster. */
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size);
+
 /**
  * Stop listening and broadcasting messages, cleanup all internal
  * structures, free memory.
@@ -220,6 +224,10 @@ swim_member_uuid(const struct swim_member *member);
 uint64_t
 swim_member_incarnation(const struct swim_member *member);
 
+/** Member's payload. Its size in bytes is stored into @a size. */
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index d84550663..16034a25b 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -160,6 +160,7 @@ swim_member_def_create(struct swim_member_def *def)
 	memset(def, 0, sizeof(*def));
 	def->addr.sin_family = AF_INET;
 	def->status = MEMBER_ALIVE;
+	def->payload_size = -1;
 }
 
 /**
@@ -179,6 +180,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 		       struct swim_member_def *def)
 {
 	uint64_t tmp;
+	uint32_t len;
 	switch (key) {
 	case SWIM_MEMBER_STATUS:
 		if (swim_decode_uint(pos, end, &tmp, prefix,
@@ -210,6 +212,17 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 				     "member incarnation") != 0)
 			return -1;
 		break;
+	case SWIM_MEMBER_PAYLOAD:
+		if (swim_decode_bin(&def->payload, &len, pos, end, prefix,
+				    "member payload") != 0)
+			return -1;
+		if (len > MAX_PAYLOAD_SIZE) {
+			diag_set(SwimError, "%s member payload size should be "\
+				 "<= %d", prefix, MAX_PAYLOAD_SIZE);
+			return -1;
+		}
+		def->payload_size = (int) len;
+		break;
 	default:
 		unreachable();
 	}
@@ -330,10 +343,22 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
 	header->v_anti_entropy = mp_bswap_u16(batch_size);
 }
 
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin)
+{
+	bin->k_payload = SWIM_MEMBER_PAYLOAD;
+	bin->m_payload_size = 0xc5;
+}
+
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, uint16_t size)
+{
+	bin->v_payload_size = mp_bswap_u16(size);
+}
+
 void
 swim_passport_bin_create(struct swim_passport_bin *passport)
 {
-	passport->m_header = 0x85;
 	passport->k_status = SWIM_MEMBER_STATUS;
 	passport->k_addr = SWIM_MEMBER_ADDRESS;
 	passport->m_addr = 0xce;
@@ -350,8 +375,10 @@ void
 swim_passport_bin_fill(struct swim_passport_bin *passport,
 		       const struct sockaddr_in *addr,
 		       const struct tt_uuid *uuid,
-		       enum swim_member_status status, uint64_t incarnation)
+		       enum swim_member_status status, uint64_t incarnation,
+		       bool is_payload_needed)
 {
+	passport->m_header = 0x85 + is_payload_needed;
 	passport->v_status = status;
 	passport->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr));
 	passport->v_port = mp_bswap_u16(ntohs(addr->sin_port));
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index ab4057185..607ac80dd 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -38,6 +38,11 @@
 #include <stdbool.h>
 #include "swim_constants.h"
 
+enum {
+	/** Reserve 272 bytes for headers. */
+	MAX_PAYLOAD_SIZE = 1200,
+};
+
 /**
  * SWIM binary protocol structures and helpers. Below is a picture
  * of a SWIM message template:
@@ -67,7 +72,8 @@
  * |             SWIM_MEMBER_ADDRESS: uint, ip,                  |
  * |             SWIM_MEMBER_PORT: uint, port,                   |
  * |             SWIM_MEMBER_UUID: 16 byte UUID,                 |
- * |             SWIM_MEMBER_INCARNATION: uint                   |
+ * |             SWIM_MEMBER_INCARNATION: uint,                  |
+ * |             SWIM_MEMBER_PAYLOAD: bin                        |
  * |         },                                                  |
  * |         ...                                                 |
  * |     ],                                                      |
@@ -80,7 +86,8 @@
  * |             SWIM_MEMBER_ADDRESS: uint, ip,                  |
  * |             SWIM_MEMBER_PORT: uint, port,                   |
  * |             SWIM_MEMBER_UUID: 16 byte UUID,                 |
- * |             SWIM_MEMBER_INCARNATION: uint                   |
+ * |             SWIM_MEMBER_INCARNATION: uint,                  |
+ * |             SWIM_MEMBER_PAYLOAD: bin                        |
  * |         },                                                  |
  * |         ...                                                 |
  * |     ],                                                      |
@@ -103,6 +110,8 @@ struct swim_member_def {
 	struct sockaddr_in addr;
 	uint64_t incarnation;
 	enum swim_member_status status;
+	const char *payload;
+	int payload_size;
 };
 
 /** Initialize the definition with default values. */
@@ -242,6 +251,7 @@ enum swim_member_key {
 	SWIM_MEMBER_PORT,
 	SWIM_MEMBER_UUID,
 	SWIM_MEMBER_INCARNATION,
+	SWIM_MEMBER_PAYLOAD,
 	swim_member_key_MAX,
 };
 
@@ -305,6 +315,30 @@ struct PACKED swim_passport_bin {
 	uint64_t v_incarnation;
 };
 
+/**
+ * SWIM member's payload header. Payload data should be encoded
+ * right after it.
+ */
+struct PACKED swim_member_payload_bin {
+	/** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+	uint8_t k_payload;
+	/** mp_encode_bin(16bit bin header) */
+	uint8_t m_payload_size;
+	uint16_t v_payload_size;
+	/** Payload data ... */
+};
+
+/** Initialize payload record. */
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin);
+
+/**
+ * Fill a previously created payload record with an actual size.
+ */
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin,
+			     uint16_t size);
+
 /** Initialize a member's binary passport. */
 void
 swim_passport_bin_create(struct swim_passport_bin *passport);
@@ -319,7 +353,8 @@ void
 swim_passport_bin_fill(struct swim_passport_bin *passport,
 		       const struct sockaddr_in *addr,
 		       const struct tt_uuid *uuid,
-		       enum swim_member_status status, uint64_t incarnation);
+		       enum swim_member_status status, uint64_t incarnation,
+		       bool is_payload_needed);
 
 /** }}}                  Anti-entropy component                 */
 
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 48aea2f07..10d2b0bbd 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -36,6 +36,7 @@
 #include "uri/uri.h"
 #include "swim/swim.h"
 #include "swim/swim_ev.h"
+#include "swim/swim_proto.h"
 #include "swim_test_transport.h"
 #include "swim_test_ev.h"
 #include "swim_test_utils.h"
@@ -642,10 +643,200 @@ swim_test_broadcast(void)
 	swim_finish_test();
 }
 
+static void
+swim_test_payload_basic(void)
+{
+	swim_start_test(11);
+	uint16_t size, cluster_size = 3;
+	struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+	for (int i = 0; i < cluster_size; ++i) {
+		for (int j = i + 1; j < cluster_size; ++j)
+			swim_cluster_interconnect(cluster, i, j);
+	}
+	ok(swim_cluster_member_payload(cluster, 0, 0, &size) == NULL &&
+	   size == 0, "no payload by default");
+	is(swim_cluster_member_set_payload(cluster, 0, NULL, 1300), -1,
+	   "can not set too big payload");
+	ok(swim_error_check_match("Payload should be <="), "diag says too big");
+
+	const char *s0_payload = "S1 payload";
+	uint16_t s0_payload_size = strlen(s0_payload) + 1;
+	is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+					   s0_payload_size), 0,
+	   "payload is set");
+	is(swim_cluster_member_incarnation(cluster, 0, 0), 1,
+	   "incarnation is incremeted on each payload update");
+	const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size);
+	ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0,
+	   "payload is successfully obtained back");
+
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+						s0_payload_size, cluster_size),
+	   0, "payload is disseminated");
+	s0_payload = "S1 second version of payload";
+	s0_payload_size = strlen(s0_payload) + 1;
+	is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+					   s0_payload_size), 0,
+	   "payload is changed");
+	is(swim_cluster_member_incarnation(cluster, 0, 0), 2,
+	   "incarnation is incremeted on each payload update");
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+						s0_payload_size, cluster_size),
+	   0, "second payload is disseminated");
+	/*
+	 * Test that new incarnations help to rewrite the old
+	 * payload from anti-entropy.
+	 */
+	swim_cluster_set_drop(cluster, 0, 100);
+	s0_payload = "S1 third version of payload";
+	s0_payload_size = strlen(s0_payload) + 1;
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+						s0_payload_size) != 0);
+	/* Wait at least one round until payload TTL gets 0. */
+	swim_run_for(3);
+	swim_cluster_set_drop(cluster, 0, 0);
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+						s0_payload_size, cluster_size),
+	   0, "third payload is disseminated via anti-entropy");
+
+	swim_cluster_delete(cluster);
+	swim_finish_test();
+}
+
+static void
+swim_test_payload_refutation(void)
+{
+	swim_start_test(11);
+	uint16_t size, cluster_size = 3;
+	struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+	swim_cluster_set_ack_timeout(cluster, 1);
+	for (int i = 0; i < cluster_size; ++i) {
+		for (int j = i + 1; j < cluster_size; ++j)
+			swim_cluster_interconnect(cluster, i, j);
+	}
+	const char *s0_old_payload = "s0 payload";
+	uint16_t s0_old_payload_size = strlen(s0_old_payload) + 1;
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_old_payload,
+						s0_old_payload_size) != 0);
+	fail_if(swim_cluster_wait_payload_everywhere(cluster, 0, s0_old_payload,
+						     s0_old_payload_size,
+						     3) != 0);
+	/*
+	 * The test checks the following case. Assume there are 3
+	 * nodes: S1, S2, S3. They all know each other. S1 sets
+	 * new payload, S2 and S3 know that. They all see that S1
+	 * has incarnation 1 and payload P1.
+	 *
+	 * Now S1 changes payload to P2. Its incarnation becomes
+	 * 2. During next entire round its round messages are
+	 * lost, however ACKs work ok.
+	 */
+	const char *s0_new_payload = "s0 second payload";
+	uint16_t s0_new_payload_size = strlen(s0_new_payload) + 1;
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_new_payload,
+						s0_new_payload_size) != 0);
+	int components[2] = {SWIM_DISSEMINATION, SWIM_ANTI_ENTROPY};
+	swim_cluster_drop_components(cluster, 0, components, 2);
+	swim_run_for(3);
+	swim_cluster_drop_components(cluster, 0, NULL, 0);
+
+	is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+	   "S2 sees new incarnation of S1");
+	is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+	   "S3 does the same");
+
+	const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "but S2 does not known the new payload");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "as well as S3");
+
+	/* Restore normal ACK timeout. */
+	swim_cluster_set_ack_timeout(cluster, 30);
+
+	/*
+	 * Now S1's payload TTL is 0, but via ACKs S1 sent its new
+	 * incarnation to S2 and S3. Despite that they should
+	 * apply new S1's payload via anti-entropy. Next lines
+	 * test that:
+	 *
+	 * 1) S2 can apply new S1's payload from S1's
+	 *    anti-entropy;
+	 *
+	 * 2) S2 will not receive the old S1's payload from S3.
+	 *    S3 knows, that its payload is outdated, and should
+	 *    not send it;
+	 *
+	 * 3) S3 can apply new S1's payload from S2's
+	 *    anti-entropy. Note, that here S3 applies the payload
+	 *    not directly from the originator. It is the most
+	 *    complex case.
+	 *
+	 * Next lines test the case (1).
+	 */
+
+	/* S3 does not participate in the test (1). */
+	swim_cluster_set_drop(cluster, 2, 100);
+	swim_run_for(3);
+
+	tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_new_payload_size &&
+	   memcmp(tmp, s0_new_payload, size) == 0,
+	   "S2 learned S1's payload via anti-entropy");
+	is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+	   "incarnation still is the same");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "S3 was blocked and does not know anything");
+	is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+	   "incarnation still is the same");
+
+	/* S1 will not participate in the tests further. */
+	swim_cluster_set_drop(cluster, 0, 100);
+
+	/*
+	 * Now check the case (2) - S3 will not send outdated
+	 * version of S1's payload. To maintain the experimental
+	 * integrity S1 and S2 are silent. Only S3 sends packets.
+	 */
+	swim_cluster_set_drop(cluster, 2, 0);
+	swim_cluster_set_drop_out(cluster, 1, 100);
+	swim_run_for(3);
+
+	tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_new_payload_size &&
+	   memcmp(tmp, s0_new_payload, size) == 0,
+	   "S2 keeps the same new S1's payload, S3 did not rewrite it");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "S3 still does not know anything");
+
+	/*
+	 * Now check the case (3) - S3 accepts new S1's payload
+	 * from S2. Even knowing the same S1's incarnation.
+	 */
+	swim_cluster_set_drop(cluster, 1, 0);
+	swim_cluster_set_drop_out(cluster, 2, 100);
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_new_payload,
+						s0_new_payload_size, 3), 0,
+	  "S3 learns S1's payload from S2");
+
+	swim_cluster_delete(cluster);
+	swim_finish_test();
+}
+
 static int
 main_f(va_list ap)
 {
-	swim_start_test(15);
+	swim_start_test(17);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -666,6 +857,8 @@ main_f(va_list ap)
 	swim_test_quit();
 	swim_test_uri_update();
 	swim_test_broadcast();
+	swim_test_payload_basic();
+	swim_test_payload_refutation();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index d315f181f..8b0ab54aa 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..15
+1..17
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -147,4 +147,34 @@ ok 14 - subtests
     ok 6 - fullmesh is reached, and no one link was added explicitly
 ok 15 - subtests
 	*** swim_test_broadcast: done ***
+	*** swim_test_payload_basic ***
+    1..11
+    ok 1 - no payload by default
+    ok 2 - can not set too big payload
+    ok 3 - diag says too big
+    ok 4 - payload is set
+    ok 5 - incarnation is incremeted on each payload update
+    ok 6 - payload is successfully obtained back
+    ok 7 - payload is disseminated
+    ok 8 - payload is changed
+    ok 9 - incarnation is incremeted on each payload update
+    ok 10 - second payload is disseminated
+    ok 11 - third payload is disseminated via anti-entropy
+ok 16 - subtests
+	*** swim_test_payload_basic: done ***
+	*** swim_test_payload_refutation ***
+    1..11
+    ok 1 - S2 sees new incarnation of S1
+    ok 2 - S3 does the same
+    ok 3 - but S2 does not known the new payload
+    ok 4 - as well as S3
+    ok 5 - S2 learned S1's payload via anti-entropy
+    ok 6 - incarnation still is the same
+    ok 7 - S3 was blocked and does not know anything
+    ok 8 - incarnation still is the same
+    ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it
+    ok 10 - S3 still does not know anything
+    ok 11 - S3 learns S1's payload from S2
+ok 17 - subtests
+	*** swim_test_payload_refutation: done ***
 	*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index fd528d166..45570cce5 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -191,6 +191,27 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
 	return swim_member_incarnation(m);
 }
 
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+			    int member_id, uint16_t *size)
+{
+	const struct swim_member *m =
+		swim_cluster_member_view(cluster, node_id, member_id);
+	if (m == NULL) {
+		*size = 0;
+		return NULL;
+	}
+	return swim_member_payload(m, size);
+}
+
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+				const char *payload, uint16_t size)
+{
+	struct swim *s = swim_cluster_node(cluster, i);
+	return swim_set_payload(s, payload, size);
+}
+
 struct swim *
 swim_cluster_node(struct swim_cluster *cluster, int i)
 {
@@ -506,6 +527,13 @@ struct swim_member_template {
 	 */
 	bool need_check_incarnation;
 	uint64_t incarnation;
+	/**
+	 * True, if the payload should be checked to be equal to
+	 * @a payload of size @a payload_size.
+	 */
+	bool need_check_payload;
+	const char *payload;
+	uint16_t payload_size;
 };
 
 /** Build member template. No checks are set. */
@@ -542,6 +570,19 @@ swim_member_template_set_incarnation(struct swim_member_template *t,
 	t->incarnation = incarnation;
 }
 
+/**
+ * Set that the member template should be used to check member
+ * payload.
+ */
+static inline void
+swim_member_template_set_payload(struct swim_member_template *t,
+				 const char *payload, uint16_t payload_size)
+{
+	t->need_check_payload = true;
+	t->payload = payload;
+	t->payload_size = payload_size;
+}
+
 /** Callback to check that a member matches a template. */
 static bool
 swim_loop_check_member(struct swim_cluster *cluster, void *data)
@@ -551,17 +592,26 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data)
 		swim_cluster_member_view(cluster, t->node_id, t->member_id);
 	enum swim_member_status status;
 	uint64_t incarnation;
+	const char *payload;
+	uint16_t payload_size;
 	if (m != NULL) {
 		status = swim_member_status(m);
 		incarnation = swim_member_incarnation(m);
+		payload = swim_member_payload(m, &payload_size);
 	} else {
 		status = swim_member_status_MAX;
 		incarnation = 0;
+		payload = NULL;
+		payload_size = 0;
 	}
 	if (t->need_check_status && status != t->status)
 		return false;
 	if (t->need_check_incarnation && incarnation != t->incarnation)
 		return false;
+	if (t->need_check_payload &&
+	    (payload_size != t->payload_size ||
+	     memcmp(payload, t->payload, payload_size) != 0))
+		return false;
 	return true;
 }
 
@@ -644,6 +694,18 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id,
 				 swim_loop_check_member_everywhere, &t);
 }
 
+int
+swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
+				     int member_id, const char *payload,
+				     uint16_t payload_size, double timeout)
+{
+	struct swim_member_template t;
+	swim_member_template_create(&t, -1, member_id);
+	swim_member_template_set_payload(&t, payload, payload_size);
+	return swim_wait_timeout(timeout, cluster,
+				 swim_loop_check_member_everywhere, &t);
+}
+
 bool
 swim_error_check_match(const char *msg)
 {
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 6ea136e36..100a67e0c 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -141,6 +141,14 @@ uint64_t
 swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
 				int member_id);
 
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+			    int member_id, uint16_t *size);
+
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+				const char *payload, uint16_t size);
+
 /**
  * Check if in the cluster every instance knowns the about other
  * instances.
@@ -192,6 +200,16 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id,
 			      int member_id, uint64_t incarnation,
 			      double timeout);
 
+/**
+ * Wait until a member with id @a member_id is seen with
+ * @a payload of size @a payload_size in the membership table of
+ * every instance in @a cluster. At most @a timeout seconds.
+ */
+int
+swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
+				     int member_id, const char *payload,
+				     uint16_t payload_size, double timeout);
+
 /** Process SWIM events for @a duration fake seconds. */
 void
 swim_run_for(double duration);
-- 
2.17.2 (Apple Git-113)





More information about the Tarantool-patches mailing list