[tarantool-patches] [PATCH 6/6] swim: introduce payload
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri Apr 12 01:22:30 MSK 2019
Payload is arbitrary user data disseminated over the cluster
along with other member attributes.
Part of #3234
---
src/lib/swim/swim.c | 155 ++++++++++++++++++++++++++--
src/lib/swim/swim.h | 8 ++
src/lib/swim/swim_proto.c | 31 +++++-
src/lib/swim/swim_proto.h | 41 +++++++-
test/unit/swim.c | 195 +++++++++++++++++++++++++++++++++++-
test/unit/swim.result | 32 +++++-
test/unit/swim_test_utils.c | 62 ++++++++++++
test/unit/swim_test_utils.h | 18 ++++
8 files changed, 525 insertions(+), 17 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 2dac6eedd..2be1c846b 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -308,6 +308,38 @@ struct swim_member {
* allow other members to learn the dead status.
*/
int status_ttl;
+ /** Arbitrary user data, disseminated on each change. */
+ char *payload;
+ /** Payload size, in bytes. */
+ uint16_t payload_size;
+ /**
+ * True if the payload is believed to be the most recent
+ * version. In that case it can be disseminated further.
+ * Otherwise @a payload is suspected to be outdated and
+ * can be updated in two cases only:
+ *
+ * 1) when it is received with a bigger incarnation from
+ * anywhere;
+ *
+ * 2) when it is received with the same incarnation, but
+ * local payload is outdated.
+ *
+ * A payload becomes outdated when a new incarnation of
+ * the member has been learned, but a new payload has not
+ * arrived with it. In that case it is impossible to tell
+ * whether the member has updated its payload or some
+ * other attribute. The only option is to wait until the
+ * up-to-date payload is received from another instance.
+ * Note that such an instance always exists - it is the
+ * originator of the payload.
+ */
+ bool is_payload_up_to_date;
+ /**
+ * TTL of payload. At most this number of times payload is
+ * sent as a part of dissemination component. Reset on
+ * each payload update.
+ */
+ int payload_ttl;
/**
* All created events are put into a queue sorted by event
* time.
@@ -523,6 +555,34 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
return container_of(scheduler, struct swim, scheduler);
}
+/**
+ * Replace @a member's payload with a private copy of the first
+ * @a payload_size bytes of @a payload, and register the change
+ * for dissemination.
+ *
+ * The copy is made into a freshly allocated buffer before the
+ * old one is released, so @a payload may safely point into the
+ * member's current payload (for example, a pointer obtained from
+ * swim_member_payload()). Resizing the old buffer in place would
+ * leave such a source pointer dangling or overlapping.
+ *
+ * @param swim Instance owning @a member.
+ * @param member Member whose payload is replaced.
+ * @param payload Source bytes. Not read when @a payload_size
+ *        is 0.
+ * @param payload_size Number of bytes, at most MAX_PAYLOAD_SIZE.
+ * @param incarnation_increment Value added to the member's
+ *        incarnation on success.
+ *
+ * @retval 0 Success.
+ * @retval -1 Out of memory, diag is set. The member is left
+ *         unchanged.
+ */
+static inline int
+swim_update_member_payload(struct swim *swim, struct swim_member *member,
+			   const char *payload, uint16_t payload_size,
+			   int incarnation_increment)
+{
+	assert(payload_size <= MAX_PAYLOAD_SIZE);
+	char *new_payload = NULL;
+	if (payload_size > 0) {
+		new_payload = (char *) malloc(payload_size);
+		if (new_payload == NULL) {
+			diag_set(OutOfMemory, payload_size, "malloc",
+				 "new_payload");
+			return -1;
+		}
+		memcpy(new_payload, payload, payload_size);
+	}
+	/* Release the old buffer only after the copy is complete. */
+	free(member->payload);
+	member->payload = new_payload;
+	member->payload_size = payload_size;
+	/* Payload TTL is reset to the current number of members. */
+	member->payload_ttl = mh_size(swim->members);
+	member->incarnation += incarnation_increment;
+	member->is_payload_up_to_date = true;
+	swim_on_member_update(swim, member);
+	return 0;
+}
+
/**
* Once a ping is sent, the member should start waiting for an
* ACK.
@@ -556,6 +616,7 @@ swim_member_delete(struct swim_member *member)
/* Dissemination component. */
assert(rlist_empty(&member->in_dissemination_queue));
+ free(member->payload);
free(member);
}
@@ -637,7 +698,7 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
static struct swim_member *
swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
const struct tt_uuid *uuid, enum swim_member_status status,
- uint64_t incarnation)
+ uint64_t incarnation, const char *payload, int payload_size)
{
int new_bsize = sizeof(swim->shuffled[0]) *
(mh_size(swim->members) + 1);
@@ -675,6 +736,12 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
/* Dissemination component. */
swim_on_member_update(swim, member);
+ if (payload_size >= 0 &&
+ swim_update_member_payload(swim, member, payload,
+ payload_size, 0) != 0) {
+ swim_delete_member(swim, member);
+ return NULL;
+ }
say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
swim_uuid_str(&member->uuid), mh_size(swim->members));
@@ -739,17 +806,29 @@ swim_new_round(struct swim *swim)
*/
static int
swim_encode_member(struct swim_packet *packet, struct swim_member *m,
- struct swim_passport_bin *passport)
+ struct swim_passport_bin *passport,
+ struct swim_member_payload_bin *payload_header,
+ bool is_payload_needed)
{
/* The headers should be initialized. */
assert(passport->k_status == SWIM_MEMBER_STATUS);
+ assert(payload_header->k_payload == SWIM_MEMBER_PAYLOAD);
int size = sizeof(*passport);
+ if (is_payload_needed)
+ size += sizeof(*payload_header) + m->payload_size;
char *pos = swim_packet_alloc(packet, size);
if (pos == NULL)
return -1;
swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
- m->incarnation);
+ m->incarnation, is_payload_needed);
memcpy(pos, passport, sizeof(*passport));
+ if (is_payload_needed) {
+ pos += sizeof(*passport);
+ swim_member_payload_bin_fill(payload_header, m->payload_size);
+ memcpy(pos, payload_header, sizeof(*payload_header));
+ pos += sizeof(*payload_header);
+ memcpy(pos, m->payload, m->payload_size);
+ }
return 0;
}
@@ -763,17 +842,21 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
{
struct swim_anti_entropy_header_bin ae_header_bin;
struct swim_passport_bin passport_bin;
+ struct swim_member_payload_bin payload_header;
char *header = swim_packet_alloc(packet, sizeof(ae_header_bin));
if (header == NULL)
return 0;
swim_passport_bin_create(&passport_bin);
+ swim_member_payload_bin_create(&payload_header);
struct mh_swim_table_t *t = swim->members;
int i = 0, member_count = mh_size(t);
int rnd = swim_scaled_rand(0, member_count - 1);
for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
i < member_count; ++i) {
struct swim_member *m = *mh_swim_table_node(t, rc);
- if (swim_encode_member(packet, m, &passport_bin) != 0)
+ if (swim_encode_member(packet, m, &passport_bin,
+ &payload_header,
+ m->is_payload_up_to_date) != 0)
break;
/*
* First random member could be chosen too close
@@ -833,16 +916,21 @@ static int
swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
{
struct swim_diss_header_bin diss_header_bin;
+ struct swim_member_payload_bin payload_header;
struct swim_passport_bin passport_bin;
char *header = swim_packet_alloc(packet, sizeof(diss_header_bin));
if (header == NULL)
return 0;
swim_passport_bin_create(&passport_bin);
+ swim_member_payload_bin_create(&payload_header);
int i = 0;
struct swim_member *m;
rlist_foreach_entry(m, &swim->dissemination_queue,
in_dissemination_queue) {
- if (swim_encode_member(packet, m, &passport_bin) != 0)
+ bool is_payload_needed = m->payload_ttl > 0 &&
+ m->is_payload_up_to_date;
+ if (swim_encode_member(packet, m, &passport_bin,
+ &payload_header, is_payload_needed) != 0)
break;
++i;
}
@@ -890,6 +978,10 @@ swim_decrease_event_ttl(struct swim *swim)
rlist_foreach_entry_safe(member, &swim->dissemination_queue,
in_dissemination_queue,
tmp) {
+ if (member->payload_ttl > 0) {
+ if (--member->payload_ttl == 0)
+ swim_cached_round_msg_invalidate(swim);
+ }
if (--member->status_ttl == 0) {
rlist_del_entry(member, in_dissemination_queue);
swim_cached_round_msg_invalidate(swim);
@@ -1065,8 +1157,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
{
assert(member != swim->self);
assert(def->incarnation >= member->incarnation);
- if (def->incarnation > member->incarnation)
+ /*
+ * Payload update rules are simple: it can be updated
+ * either if the new payload has a bigger incarnation, or
+ * the same incarnation, but local payload is outdated.
+ */
+ bool is_payload_needed = false;
+ if (def->incarnation > member->incarnation) {
swim_update_member_addr(swim, member, &def->addr, 0);
+ if (def->payload_size >= 0) {
+ is_payload_needed = true;
+ } else if (member->is_payload_up_to_date) {
+ member->is_payload_up_to_date = false;
+ swim_on_member_update(swim, member);
+ }
+ } else if (! member->is_payload_up_to_date && def->payload_size >= 0) {
+ is_payload_needed = true;
+ }
+ if (is_payload_needed &&
+ swim_update_member_payload(swim, member, def->payload,
+ def->payload_size, 0) != 0) {
+ /* Not such a critical error. */
+ diag_log();
+ }
swim_update_member_inc_status(swim, member, def->status,
def->incarnation);
}
@@ -1107,7 +1220,8 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
goto skip;
}
*result = swim_new_member(swim, &def->addr, &def->uuid,
- def->status, def->incarnation);
+ def->status, def->incarnation,
+ def->payload, def->payload_size);
return *result != NULL ? 0 : -1;
}
*result = member;
@@ -1436,7 +1550,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
- 0);
+ 0, NULL, 0);
if (swim->self == NULL)
return -1;
} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -1448,7 +1562,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
new_self = swim_new_member(swim, &swim->self->addr, uuid,
- MEMBER_ALIVE, 0);
+ MEMBER_ALIVE, 0, swim->self->payload,
+ swim->self->payload_size);
if (new_self == NULL)
return -1;
}
@@ -1504,6 +1619,18 @@ swim_is_configured(const struct swim *swim)
return swim->self != NULL;
}
+/**
+ * Public entry point for changing the payload of the local
+ * member. A too big size is rejected with a diag error,
+ * otherwise the payload is stored in the self member and its
+ * incarnation is bumped by one.
+ */
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
+{
+	if (payload_size <= MAX_PAYLOAD_SIZE) {
+		struct swim_member *self = swim->self;
+		return swim_update_member_payload(swim, self, payload,
+						  payload_size, 1);
+	}
+	diag_set(IllegalParams, "Payload should be <= %d", MAX_PAYLOAD_SIZE);
+	return -1;
+}
+
int
swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
{
@@ -1518,7 +1645,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
return -1;
struct swim_member *member = swim_find_member(swim, uuid);
if (member == NULL) {
- member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0);
+ member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0,
+ NULL, -1);
return member == NULL ? -1 : 0;
}
diag_set(SwimError, "%s a member with such UUID already exists",
@@ -1754,3 +1882,10 @@ swim_member_incarnation(const struct swim_member *member)
{
return member->incarnation;
}
+
+/**
+ * Get a member's payload. The returned pointer refers to the
+ * buffer stored in the member, no copy is made.
+ *
+ * @param member Member to read.
+ * @param[out] size Payload size in bytes.
+ *
+ * @retval Payload data, or NULL when the member has no payload.
+ */
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size)
+{
+	const char *data = member->payload;
+	*size = member->payload_size;
+	return data;
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 09d933b83..6a219d131 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -104,6 +104,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
double
swim_ack_timeout(const struct swim *swim);
+/**
+ * Set payload to disseminate over the cluster. The bytes are
+ * copied, and the incarnation of the local member is
+ * incremented.
+ *
+ * @param swim SWIM instance.
+ * @param payload Payload bytes.
+ * @param payload_size Payload size in bytes, at most
+ *        MAX_PAYLOAD_SIZE.
+ *
+ * @retval 0 Success.
+ * @retval -1 The payload is too big, or memory is exhausted.
+ *         Diag is set.
+ */
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size);
+
/**
* Stop listening and broadcasting messages, cleanup all internal
* structures, free memory.
@@ -220,6 +224,10 @@ swim_member_uuid(const struct swim_member *member);
uint64_t
swim_member_incarnation(const struct swim_member *member);
+/**
+ * Member's payload.
+ *
+ * @param member Member to get the payload of.
+ * @param[out] size Payload size in bytes.
+ *
+ * @retval Pointer to the payload stored in @a member, not a
+ *         copy. NULL when the payload is empty.
+ */
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size);
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index d84550663..16034a25b 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -160,6 +160,7 @@ swim_member_def_create(struct swim_member_def *def)
memset(def, 0, sizeof(*def));
def->addr.sin_family = AF_INET;
def->status = MEMBER_ALIVE;
+ def->payload_size = -1;
}
/**
@@ -179,6 +180,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
struct swim_member_def *def)
{
uint64_t tmp;
+ uint32_t len;
switch (key) {
case SWIM_MEMBER_STATUS:
if (swim_decode_uint(pos, end, &tmp, prefix,
@@ -210,6 +212,17 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member incarnation") != 0)
return -1;
break;
+ case SWIM_MEMBER_PAYLOAD:
+ if (swim_decode_bin(&def->payload, &len, pos, end, prefix,
+ "member payload") != 0)
+ return -1;
+ if (len > MAX_PAYLOAD_SIZE) {
+ diag_set(SwimError, "%s member payload size should be "\
+ "<= %d", prefix, MAX_PAYLOAD_SIZE);
+ return -1;
+ }
+ def->payload_size = (int) len;
+ break;
default:
unreachable();
}
@@ -330,10 +343,22 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
header->v_anti_entropy = mp_bswap_u16(batch_size);
}
+/**
+ * Prepare the constant part of a payload record: the map key and
+ * the MessagePack marker of a binary string with a 16-bit
+ * length. The length itself is written later by
+ * swim_member_payload_bin_fill().
+ */
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin)
+{
+	/* 0xc5 is the MessagePack 'bin 16' type marker. */
+	bin->m_payload_size = 0xc5;
+	bin->k_payload = SWIM_MEMBER_PAYLOAD;
+}
+
+/**
+ * Write the payload size into a record prepared by
+ * swim_member_payload_bin_create(). The value is stored as
+ * returned by mp_bswap_u16().
+ */
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, uint16_t size)
+{
+	uint16_t encoded_size = mp_bswap_u16(size);
+	bin->v_payload_size = encoded_size;
+}
+
void
swim_passport_bin_create(struct swim_passport_bin *passport)
{
- passport->m_header = 0x85;
passport->k_status = SWIM_MEMBER_STATUS;
passport->k_addr = SWIM_MEMBER_ADDRESS;
passport->m_addr = 0xce;
@@ -350,8 +375,10 @@ void
swim_passport_bin_fill(struct swim_passport_bin *passport,
const struct sockaddr_in *addr,
const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation)
+ enum swim_member_status status, uint64_t incarnation,
+ bool is_payload_needed)
{
+ passport->m_header = 0x85 + is_payload_needed;
passport->v_status = status;
passport->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr));
passport->v_port = mp_bswap_u16(ntohs(addr->sin_port));
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index ab4057185..607ac80dd 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -38,6 +38,11 @@
#include <stdbool.h>
#include "swim_constants.h"
+enum {
+ /**
+ * Upper bound of a member's payload size, in bytes. It is
+ * enforced both when a payload is set locally and when it
+ * is decoded from a packet. Reserve 272 bytes for headers
+ * (presumably out of a 1472 byte UDP packet - TODO
+ * confirm against the packet size constant).
+ */
+ MAX_PAYLOAD_SIZE = 1200,
+};
+
/**
* SWIM binary protocol structures and helpers. Below is a picture
* of a SWIM message template:
@@ -67,7 +72,8 @@
* | SWIM_MEMBER_ADDRESS: uint, ip, |
* | SWIM_MEMBER_PORT: uint, port, |
* | SWIM_MEMBER_UUID: 16 byte UUID, |
- * | SWIM_MEMBER_INCARNATION: uint |
+ * | SWIM_MEMBER_INCARNATION: uint, |
+ * | SWIM_MEMBER_PAYLOAD: bin |
* | }, |
* | ... |
* | ], |
@@ -80,7 +86,8 @@
* | SWIM_MEMBER_ADDRESS: uint, ip, |
* | SWIM_MEMBER_PORT: uint, port, |
* | SWIM_MEMBER_UUID: 16 byte UUID, |
- * | SWIM_MEMBER_INCARNATION: uint |
+ * | SWIM_MEMBER_INCARNATION: uint, |
+ * | SWIM_MEMBER_PAYLOAD: bin |
* | }, |
* | ... |
* | ], |
@@ -103,6 +110,8 @@ struct swim_member_def {
struct sockaddr_in addr;
uint64_t incarnation;
enum swim_member_status status;
+ const char *payload;
+ int payload_size;
};
/** Initialize the definition with default values. */
@@ -242,6 +251,7 @@ enum swim_member_key {
SWIM_MEMBER_PORT,
SWIM_MEMBER_UUID,
SWIM_MEMBER_INCARNATION,
+ SWIM_MEMBER_PAYLOAD,
swim_member_key_MAX,
};
@@ -305,6 +315,30 @@ struct PACKED swim_passport_bin {
uint64_t v_incarnation;
};
+/**
+ * SWIM member's payload header. Payload data should be encoded
+ * right after it. The constant fields are set by
+ * swim_member_payload_bin_create(), the size is set by
+ * swim_member_payload_bin_fill().
+ */
+struct PACKED swim_member_payload_bin {
+ /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+ uint8_t k_payload;
+ /** mp_encode_bin(16bit bin header) */
+ uint8_t m_payload_size;
+ /** Payload size in bytes, stored as returned by mp_bswap_u16(). */
+ uint16_t v_payload_size;
+ /** Payload data ... */
+};
+
+/** Initialize payload record. */
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin);
+
+/**
+ * Fill a previously created payload record with an actual size.
+ */
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin,
+ uint16_t size);
+
/** Initialize a member's binary passport. */
void
swim_passport_bin_create(struct swim_passport_bin *passport);
@@ -319,7 +353,8 @@ void
swim_passport_bin_fill(struct swim_passport_bin *passport,
const struct sockaddr_in *addr,
const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation);
+ enum swim_member_status status, uint64_t incarnation,
+ bool is_payload_needed);
/** }}} Anti-entropy component */
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 48aea2f07..10d2b0bbd 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -36,6 +36,7 @@
#include "uri/uri.h"
#include "swim/swim.h"
#include "swim/swim_ev.h"
+#include "swim/swim_proto.h"
#include "swim_test_transport.h"
#include "swim_test_ev.h"
#include "swim_test_utils.h"
@@ -642,10 +643,200 @@ swim_test_broadcast(void)
swim_finish_test();
}
+/**
+ * Basic payload checks on a 3-node fullmesh: there is no payload
+ * by default, a too big payload is rejected, a set payload can
+ * be read back and increments the incarnation, each new payload
+ * reaches every node, and a payload update made while node 0
+ * was under swim_cluster_set_drop() is still delivered later.
+ */
+static void
+swim_test_payload_basic(void)
+{
+ swim_start_test(11);
+ uint16_t size, cluster_size = 3;
+ struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+ /* Connect each pair of nodes. */
+ for (int i = 0; i < cluster_size; ++i) {
+ for (int j = i + 1; j < cluster_size; ++j)
+ swim_cluster_interconnect(cluster, i, j);
+ }
+ ok(swim_cluster_member_payload(cluster, 0, 0, &size) == NULL &&
+ size == 0, "no payload by default");
+ /*
+ * 1300 is bigger than MAX_PAYLOAD_SIZE. NULL data is fine
+ * here: the size is rejected before the data is touched.
+ */
+ is(swim_cluster_member_set_payload(cluster, 0, NULL, 1300), -1,
+ "can not set too big payload");
+ ok(swim_error_check_match("Payload should be <="), "diag says too big");
+
+ /* The terminating zero is a part of the payload. */
+ const char *s0_payload = "S1 payload";
+ uint16_t s0_payload_size = strlen(s0_payload) + 1;
+ is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+ s0_payload_size), 0,
+ "payload is set");
+ is(swim_cluster_member_incarnation(cluster, 0, 0), 1,
+ "incarnation is incremeted on each payload update");
+ const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size);
+ ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0,
+ "payload is successfully obtained back");
+
+ /* The last argument is a timeout, cluster size is reused for it. */
+ is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+ s0_payload_size, cluster_size),
+ 0, "payload is disseminated");
+ s0_payload = "S1 second version of payload";
+ s0_payload_size = strlen(s0_payload) + 1;
+ is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+ s0_payload_size), 0,
+ "payload is changed");
+ is(swim_cluster_member_incarnation(cluster, 0, 0), 2,
+ "incarnation is incremeted on each payload update");
+ is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+ s0_payload_size, cluster_size),
+ 0, "second payload is disseminated");
+ /*
+ * Test that new incarnations help to rewrite the old
+ * payload from anti-entropy.
+ */
+ swim_cluster_set_drop(cluster, 0, 100);
+ s0_payload = "S1 third version of payload";
+ s0_payload_size = strlen(s0_payload) + 1;
+ fail_if(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+ s0_payload_size) != 0);
+ /* Wait at least one round until payload TTL gets 0. */
+ swim_run_for(3);
+ swim_cluster_set_drop(cluster, 0, 0);
+ is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+ s0_payload_size, cluster_size),
+ 0, "third payload is disseminated via anti-entropy");
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
+/**
+ * Check how a payload is restored when a node has learned a new
+ * incarnation of a member, but has missed the new payload: the
+ * outdated payload must not be sent farther, and the new one
+ * must be accepted with the same incarnation both from the
+ * originator and from a third node.
+ */
+static void
+swim_test_payload_refutation(void)
+{
+	swim_start_test(11);
+	uint16_t size, cluster_size = 3;
+	struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+	swim_cluster_set_ack_timeout(cluster, 1);
+	for (int i = 0; i < cluster_size; ++i) {
+		for (int j = i + 1; j < cluster_size; ++j)
+			swim_cluster_interconnect(cluster, i, j);
+	}
+	const char *s0_old_payload = "s0 payload";
+	uint16_t s0_old_payload_size = strlen(s0_old_payload) + 1;
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_old_payload,
+						s0_old_payload_size) != 0);
+	fail_if(swim_cluster_wait_payload_everywhere(cluster, 0, s0_old_payload,
+						     s0_old_payload_size,
+						     3) != 0);
+	/*
+	 * The test checks the following case. Assume there are 3
+	 * nodes: S1, S2, S3. They all know each other. S1 sets
+	 * new payload, S2 and S3 know that. They all see that S1
+	 * has incarnation 1 and payload P1.
+	 *
+	 * Now S1 changes payload to P2. Its incarnation becomes
+	 * 2. During next entire round its round messages are
+	 * lost, however ACKs work ok.
+	 */
+	const char *s0_new_payload = "s0 second payload";
+	/*
+	 * Include the terminating zero, like all the other
+	 * payloads in the payload tests do.
+	 */
+	uint16_t s0_new_payload_size = strlen(s0_new_payload) + 1;
+	fail_if(swim_cluster_member_set_payload(cluster, 0, s0_new_payload,
+						s0_new_payload_size) != 0);
+	int components[2] = {SWIM_DISSEMINATION, SWIM_ANTI_ENTROPY};
+	swim_cluster_drop_components(cluster, 0, components, 2);
+	swim_run_for(3);
+	swim_cluster_drop_components(cluster, 0, NULL, 0);
+
+	is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+	   "S2 sees new incarnation of S1");
+	is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+	   "S3 does the same");
+
+	const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "but S2 does not known the new payload");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "as well as S3");
+
+	/* Restore normal ACK timeout. */
+	swim_cluster_set_ack_timeout(cluster, 30);
+
+	/*
+	 * Now S1's payload TTL is 0, but via ACKs S1 sent its new
+	 * incarnation to S2 and S3. Despite that they should
+	 * apply new S1's payload via anti-entropy. Next lines
+	 * test that:
+	 *
+	 * 1) S2 can apply new S1's payload from S1's
+	 *    anti-entropy;
+	 *
+	 * 2) S2 will not receive the old S1's payload from S3.
+	 *    S3 knows, that its payload is outdated, and should
+	 *    not send it;
+	 *
+	 * 3) S3 can apply new S1's payload from S2's
+	 *    anti-entropy. Note, that here S3 applies the payload
+	 *    not directly from the originator. It is the most
+	 *    complex case.
+	 *
+	 * Next lines test the case (1).
+	 */
+
+	/* S3 does not participate in the test (1). */
+	swim_cluster_set_drop(cluster, 2, 100);
+	swim_run_for(3);
+
+	tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_new_payload_size &&
+	   memcmp(tmp, s0_new_payload, size) == 0,
+	   "S2 learned S1's payload via anti-entropy");
+	is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+	   "incarnation still is the same");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "S3 was blocked and does not know anything");
+	is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+	   "incarnation still is the same");
+
+	/* S1 will not participate in the tests further. */
+	swim_cluster_set_drop(cluster, 0, 100);
+
+	/*
+	 * Now check the case (2) - S3 will not send outdated
+	 * version of S1's payload. To maintain the experimental
+	 * integrity S1 and S2 are silent. Only S3 sends packets.
+	 */
+	swim_cluster_set_drop(cluster, 2, 0);
+	swim_cluster_set_drop_out(cluster, 1, 100);
+	swim_run_for(3);
+
+	tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+	ok(size == s0_new_payload_size &&
+	   memcmp(tmp, s0_new_payload, size) == 0,
+	   "S2 keeps the same new S1's payload, S3 did not rewrite it");
+
+	tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+	ok(size == s0_old_payload_size &&
+	   memcmp(tmp, s0_old_payload, size) == 0,
+	   "S3 still does not know anything");
+
+	/*
+	 * Now check the case (3) - S3 accepts new S1's payload
+	 * from S2. Even knowing the same S1's incarnation.
+	 */
+	swim_cluster_set_drop(cluster, 1, 0);
+	swim_cluster_set_drop_out(cluster, 2, 100);
+	is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_new_payload,
+						s0_new_payload_size, 3), 0,
+	   "S3 learns S1's payload from S2");
+
+	swim_cluster_delete(cluster);
+	swim_finish_test();
+}
+
static int
main_f(va_list ap)
{
- swim_start_test(15);
+ swim_start_test(17);
(void) ap;
swim_test_ev_init();
@@ -666,6 +857,8 @@ main_f(va_list ap)
swim_test_quit();
swim_test_uri_update();
swim_test_broadcast();
+ swim_test_payload_basic();
+ swim_test_payload_refutation();
swim_test_transport_free();
swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index d315f181f..8b0ab54aa 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
*** main_f ***
-1..15
+1..17
*** swim_test_one_link ***
1..6
ok 1 - no rounds - no fullmesh
@@ -147,4 +147,34 @@ ok 14 - subtests
ok 6 - fullmesh is reached, and no one link was added explicitly
ok 15 - subtests
*** swim_test_broadcast: done ***
+ *** swim_test_payload_basic ***
+ 1..11
+ ok 1 - no payload by default
+ ok 2 - can not set too big payload
+ ok 3 - diag says too big
+ ok 4 - payload is set
+ ok 5 - incarnation is incremeted on each payload update
+ ok 6 - payload is successfully obtained back
+ ok 7 - payload is disseminated
+ ok 8 - payload is changed
+ ok 9 - incarnation is incremeted on each payload update
+ ok 10 - second payload is disseminated
+ ok 11 - third payload is disseminated via anti-entropy
+ok 16 - subtests
+ *** swim_test_payload_basic: done ***
+ *** swim_test_payload_refutation ***
+ 1..11
+ ok 1 - S2 sees new incarnation of S1
+ ok 2 - S3 does the same
+ ok 3 - but S2 does not known the new payload
+ ok 4 - as well as S3
+ ok 5 - S2 learned S1's payload via anti-entropy
+ ok 6 - incarnation still is the same
+ ok 7 - S3 was blocked and does not know anything
+ ok 8 - incarnation still is the same
+ ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it
+ ok 10 - S3 still does not know anything
+ ok 11 - S3 learns S1's payload from S2
+ok 17 - subtests
+ *** swim_test_payload_refutation: done ***
*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index fd528d166..45570cce5 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -191,6 +191,27 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
return swim_member_incarnation(m);
}
+/**
+ * Payload of the member found by
+ * swim_cluster_member_view(cluster, node_id, member_id). When
+ * there is no such member, NULL is returned and @a size is 0.
+ */
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+			    int member_id, uint16_t *size)
+{
+	const struct swim_member *view =
+		swim_cluster_member_view(cluster, node_id, member_id);
+	if (view != NULL)
+		return swim_member_payload(view, size);
+	*size = 0;
+	return NULL;
+}
+
+/**
+ * Set payload of the cluster node @a i. The result of
+ * swim_set_payload() is returned as is.
+ */
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+				const char *payload, uint16_t size)
+{
+	return swim_set_payload(swim_cluster_node(cluster, i), payload, size);
+}
+
struct swim *
swim_cluster_node(struct swim_cluster *cluster, int i)
{
@@ -506,6 +527,13 @@ struct swim_member_template {
*/
bool need_check_incarnation;
uint64_t incarnation;
+ /**
+ * True, if the payload should be checked to be equal to
+ * @a payload of size @a payload_size.
+ */
+ bool need_check_payload;
+ const char *payload;
+ uint16_t payload_size;
};
/** Build member template. No checks are set. */
@@ -542,6 +570,19 @@ swim_member_template_set_incarnation(struct swim_member_template *t,
t->incarnation = incarnation;
}
+/**
+ * Set that the member template should be used to check member
+ * payload: it must be equal to @a payload of size
+ * @a payload_size. The pointer is stored as is, the data is not
+ * copied.
+ */
+static inline void
+swim_member_template_set_payload(struct swim_member_template *t,
+ const char *payload, uint16_t payload_size)
+{
+ t->need_check_payload = true;
+ t->payload = payload;
+ t->payload_size = payload_size;
+}
+
/** Callback to check that a member matches a template. */
static bool
swim_loop_check_member(struct swim_cluster *cluster, void *data)
@@ -551,17 +592,26 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data)
swim_cluster_member_view(cluster, t->node_id, t->member_id);
enum swim_member_status status;
uint64_t incarnation;
+ const char *payload;
+ uint16_t payload_size;
if (m != NULL) {
status = swim_member_status(m);
incarnation = swim_member_incarnation(m);
+ payload = swim_member_payload(m, &payload_size);
} else {
status = swim_member_status_MAX;
incarnation = 0;
+ payload = NULL;
+ payload_size = 0;
}
if (t->need_check_status && status != t->status)
return false;
if (t->need_check_incarnation && incarnation != t->incarnation)
return false;
+ if (t->need_check_payload &&
+ (payload_size != t->payload_size ||
+ memcmp(payload, t->payload, payload_size) != 0))
+ return false;
return true;
}
@@ -644,6 +694,18 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id,
swim_loop_check_member_everywhere, &t);
}
+/**
+ * Run the cluster until swim_loop_check_member_everywhere()
+ * accepts a template requiring member @a member_id to have
+ * @a payload of @a payload_size bytes, or until @a timeout
+ * passes. The result of swim_wait_timeout() is returned.
+ */
+int
+swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
+				     int member_id, const char *payload,
+				     uint16_t payload_size, double timeout)
+{
+	struct swim_member_template tmpl;
+	swim_member_template_create(&tmpl, -1, member_id);
+	swim_member_template_set_payload(&tmpl, payload, payload_size);
+	int rc = swim_wait_timeout(timeout, cluster,
+				   swim_loop_check_member_everywhere, &tmpl);
+	return rc;
+}
+
bool
swim_error_check_match(const char *msg)
{
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 6ea136e36..100a67e0c 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -141,6 +141,14 @@ uint64_t
swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
int member_id);
+/**
+ * Get payload of the member @a member_id from the view of the
+ * node @a node_id. If the node has no such member, NULL is
+ * returned and @a size is set to 0.
+ */
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+ int member_id, uint16_t *size);
+
+/**
+ * Set payload of the node @a i. Returns the result of
+ * swim_set_payload().
+ */
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+ const char *payload, uint16_t size);
+
/**
* Check if in the cluster every instance knowns the about other
* instances.
@@ -192,6 +200,16 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id,
int member_id, uint64_t incarnation,
double timeout);
+/**
+ * Wait until a member with id @a member_id is seen with
+ * @a payload of size @a payload_size in the membership table of
+ * every instance in @a cluster. At most @a timeout seconds.
+ */
+int
+swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
+ int member_id, const char *payload,
+ uint16_t payload_size, double timeout);
+
/** Process SWIM events for @a duration fake seconds. */
void
swim_run_for(double duration);
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list