[tarantool-patches] Re: [PATCH 6/6] swim: introduce payload
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Apr 18 20:43:40 MSK 2019
On 18/04/2019 18:12, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [19/04/12 01:25]:
>> Payload is arbitrary user data disseminated over the cluster
>> along with other member attributes.
>> Part of #3234
>> ---
>> src/lib/swim/swim.c | 155 ++++++++++++++++++++++++++--
>> src/lib/swim/swim.h | 8 ++
>> src/lib/swim/swim_proto.c | 31 +++++-
>> src/lib/swim/swim_proto.h | 41 +++++++-
>> test/unit/swim.c | 195 +++++++++++++++++++++++++++++++++++-
>> test/unit/swim.result | 32 +++++-
>> test/unit/swim_test_utils.c | 62 ++++++++++++
>> test/unit/swim_test_utils.h | 18 ++++
>> 8 files changed, 525 insertions(+), 17 deletions(-)
>> + * whether the member has updated payload, or another
>> + * attribute. The only way here is to wait until the most
>> + * actual payload will be received from another instance.
>> + * Note, that such an instance always exists - the payload
>> + * originator instance.
>> + */
>> + bool is_payload_up_to_date;
>> + /**
>> + * TTL of payload. At most this number of times payload is
>> + * sent as a part of dissemination component. Reset on
>> + * each payload update.
>> + */
>> + int payload_ttl;
>
> As agreed, let's rename ttl to ttd across the board and update the
> comments to say that ttd is time to disseminate.
Cool, done in a separate commit.
>
>> +/** Update member's payload, register a corresponding event. */
>> +static inline int
>> +swim_update_member_payload(struct swim *swim, struct swim_member *member,
>> + const char *payload, uint16_t payload_size,
>> + int incarnation_increment)
>
> Passing incarnation_increment raises a lot of questions when
> reading the code.
We already use it for address update, it simplified a lot of things,
and most importantly encapsulated incarnation + TTD set into one place.
I tried to avoid this argument for address, but it looked much worse,
when TTD is reset in too many places - it is easy to miss something.
The main goal I pursued was encapsulation of incarnation update and
event registration on small attribute changes. Now I have these
functions hiding that task inside:
swim_update_member_inc_status
swim_update_member_payload
swim_update_member_addr
With your way incarnation and member attribute update business will
spread among literally every function touching members.
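A minimal sketch of that encapsulation, using simplified hypothetical
names (toy_member, toy_update_payload) rather than the real
struct swim_member. It assumes, as in the patch, that the TTD is reset
to the cluster size and that the increment is 1 for a local change and
0 for a change learned from a remote message:

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified member; not the real struct swim_member. */
struct toy_member {
	uint64_t incarnation;
	char *payload;
	uint16_t payload_size;
	int payload_ttd;
};

/*
 * Replace the payload, reset its TTD and bump incarnation in one
 * place, so that no caller can forget one of the steps.
 * Returns 0 on success, -1 on out of memory.
 */
static int
toy_update_payload(struct toy_member *m, const char *payload,
		   uint16_t size, int cluster_size,
		   int incarnation_increment)
{
	char *copy = NULL;
	if (size > 0) {
		copy = malloc(size);
		if (copy == NULL)
			return -1;
		memcpy(copy, payload, size);
	}
	free(m->payload);
	m->payload = copy;
	m->payload_size = size;
	m->payload_ttd = cluster_size;
	m->incarnation += incarnation_increment;
	return 0;
}
```

Every caller then only chooses the increment, and the TTD reset cannot
be missed.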
> I would not use this function from the
> constructor - I don't see any issue in copy-pasting 3 lines of
> code into the constructor to make both branches simpler.
It is not 3 lines; in general it takes 6-7 lines after each swim_new_member()
to call update_payload() afterward. I have 4 invocations of swim_new_member(),
two of them pass payload and its size. Without encapsulated processing of
payload update I would be forced to add more checks into these two
places, with more code duplication than you thought, which I would not
want to do. These places are swim_upsert_member() and swim_cfg() - the
functions are very sensitive to code duplication in terms of readability.
During SWIM development I tried to keep them as clean and short as possible.
I tried your way right now again, but it just looks much worse. Dropped.
>
> Or I would ban passing payload to the constructor and make this
> function public.
>
>> static int
>> swim_encode_member(struct swim_packet *packet, struct swim_member *m,
>> - struct swim_passport_bin *passport)
>> + struct swim_passport_bin *passport,
>> + struct swim_member_payload_bin *payload_header,
>> + bool is_payload_needed)
>
> Please rename is_payload_needed -> encode_payload.
I followed our rules on flags naming: is_<...>. But ok, done.
> There is no
> comment for encode_payload and how it is used.
Added a comment.
> I don't see why you can't move all the decision making about
> whether to encode the payload or not into this function.
>
> The payload should be encoded if:
> - its not null
> - payload ttl is greater than zero
> - it's not trustworthy. (is_payload_up_to_date). I would btw
> rename is_payload_up_to_date to is_payload_trustworthy - this
> name would be closer to truth.
>
> Why can't you look at these conditions once in swim_encode_member
> rather than evaluate them outside this function?
Because anti-entropy and dissemination have different conditions
for when payload should be encoded. For anti-entropy it is enough for
the payload to be up to date. For dissemination, TTD > 0 is additionally
required.
Verbally decided to always check for is_payload_up_to_date inside
the function, and use the argument encode_payload for the
dissemination component only.
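The resulting rule can be sketched as a single predicate. The names
below (toy_component, toy_should_encode_payload) are hypothetical and
exist only for illustration; in the patch the same logic is split
between swim_encode_member() and its two callers:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical names, mirroring the rule discussed above. */
enum toy_component { TOY_ANTI_ENTROPY, TOY_DISSEMINATION };

static bool
toy_should_encode_payload(enum toy_component c, bool is_up_to_date,
			  int payload_ttd)
{
	/* An outdated payload is never sent by any component. */
	if (!is_up_to_date)
		return false;
	/* Dissemination additionally requires remaining TTD. */
	if (c == TOY_DISSEMINATION)
		return payload_ttd > 0;
	return true;
}
```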
>
> BTW, there is no harm in encoding an empty payload all the time
> (e.g. mp_bin of size 0). This would make the code simpler - you
> would only need to look at payload_size to see if payload exists.
This is what I do. I encode payload even when it is empty and
up-to-date.
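For illustration, here is a sketch of how a payload value looks on the
wire as a MessagePack bin16: marker 0xc5, a 16-bit big-endian length,
then the raw bytes. The helper name toy_encode_bin16 is hypothetical,
and the sketch covers only the value, not the SWIM_MEMBER_PAYLOAD key
that precedes it in the patch:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Write a MessagePack bin16 value into @a out and return the
 * number of bytes written. @a out must have room for 3 + size
 * bytes. An empty payload is still a valid 3-byte value.
 */
static size_t
toy_encode_bin16(uint8_t *out, const void *data, uint16_t size)
{
	out[0] = 0xc5;
	out[1] = (uint8_t)(size >> 8);
	out[2] = (uint8_t)(size & 0xff);
	if (size > 0)
		memcpy(out + 3, data, size);
	return 3 + (size_t)size;
}
```

An empty payload therefore costs 3 bytes plus the key, and the receiver
can tell "empty" apart from "not sent at all".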
>
>> +int
>> +swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
>> +{
>> + if (payload_size > MAX_PAYLOAD_SIZE) {
>> + diag_set(IllegalParams, "Payload should be <= %d",
>> + MAX_PAYLOAD_SIZE);
>> + return -1;
>> + }
>> + return swim_update_member_payload(swim, swim->self, payload,
>> + payload_size, 1);
>> +}
>
> Like I said above, if there is swim_set_payload, there is no need
> to supply payload in swim_new_member().
swim_set_payload() does not work on foreign members, it takes 'self' only.
During members decoding and update I need to reset payloads of other
members.
>
>> + def->payload_size = -1;
>
> Ugh.
I need a way to detect whether payload was found in a packet
and decoded. It is not always sent, because 1) dissemination
does not send it when TTD == 0 or it is outdated,
2) anti-entropy does not send outdated payloads either.
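A sketch of how the -1 sentinel is used on the receiving side, with
hypothetical simplified names (toy_member_def,
toy_should_apply_payload). It assumes the decoded incarnation is never
smaller than the local one, which swim_update_member() asserts in the
patch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified decoded member definition. */
struct toy_member_def {
	uint64_t incarnation;
	/* -1: no payload key in the packet; >= 0: payload size. */
	int payload_size;
};

/*
 * Decide whether a decoded payload should replace the local one.
 * Assumes def->incarnation >= local_incarnation.
 */
static bool
toy_should_apply_payload(const struct toy_member_def *def,
			 uint64_t local_incarnation,
			 bool is_local_up_to_date)
{
	/* Nothing was decoded, so there is nothing to apply. */
	if (def->payload_size < 0)
		return false;
	/* A bigger incarnation always wins. */
	if (def->incarnation > local_incarnation)
		return true;
	/* Same incarnation: accept only if the local copy is stale. */
	return !is_local_up_to_date;
}
```

A size of 0 is a legal empty payload, which is why an unsigned size
alone could not express "not present".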
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
A new version:
===================================================================
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 3e64e4c91..22760cdd7 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -309,6 +309,44 @@ struct swim_member {
* allow other members to learn the dead status.
*/
int status_ttd;
+ /** Arbitrary user data, disseminated on each change. */
+ char *payload;
+ /** Payload size, in bytes. */
+ uint16_t payload_size;
+ /**
+ * True, if the payload is thought to be of the most
+ * actual version. In such a case it can be disseminated
+ * further. Otherwise @a payload is suspected to be
+ * outdated and can be updated in two cases only:
+ *
+ * 1) when it is received with a bigger incarnation from
+ * anywhere;
+ *
+ * 2) when it is received with the same incarnation, but
+ * local payload is outdated.
+ *
+ * A payload can become outdated, if anyhow a new
+ * incarnation of the member has been learned, but not a
+ * new payload. For example, a message with new payload
+ * could be lost, and at the same time this instance
+ * responds to a ping with newly incarnated ack. The ack
+ * receiver will learn the new incarnation, but not the
+ * new payload.
+ *
+ * In this case it can't be said exactly whether the
+ * member has updated payload, or another attribute. The
+ * only way here is to wait until the most actual payload
+ * will be received from another instance. Note, that such
+ * an instance always exists - the payload originator
+ * instance.
+ */
+ bool is_payload_up_to_date;
+ /**
+ * TTD of payload. At most this number of times payload is
+ * sent as a part of dissemination component. Reset on
+ * each payload update.
+ */
+ int payload_ttd;
/**
* All created events are put into a queue sorted by event
* time.
@@ -524,6 +562,34 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
return container_of(scheduler, struct swim, scheduler);
}
+/** Update member's payload, register a corresponding event. */
+static inline int
+swim_update_member_payload(struct swim *swim, struct swim_member *member,
+ const char *payload, uint16_t payload_size,
+ int incarnation_increment)
+{
+ assert(payload_size <= MAX_PAYLOAD_SIZE);
+ char *new_payload;
+ if (payload_size > 0) {
+ new_payload = (char *) realloc(member->payload, payload_size);
+ if (new_payload == NULL) {
+ diag_set(OutOfMemory, payload_size, "realloc", "new_payload");
+ return -1;
+ }
+ memcpy(new_payload, payload, payload_size);
+ } else {
+ free(member->payload);
+ new_payload = NULL;
+ }
+ member->payload = new_payload;
+ member->payload_size = payload_size;
+ member->payload_ttd = mh_size(swim->members);
+ member->incarnation += incarnation_increment;
+ member->is_payload_up_to_date = true;
+ swim_on_member_update(swim, member);
+ return 0;
+}
+
/**
* Once a ping is sent, the member should start waiting for an
* ACK.
@@ -557,6 +623,7 @@ swim_member_delete(struct swim_member *member)
/* Dissemination component. */
assert(rlist_empty(&member->in_dissemination_queue));
+ free(member->payload);
free(member);
}
@@ -638,7 +705,7 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
static struct swim_member *
swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
const struct tt_uuid *uuid, enum swim_member_status status,
- uint64_t incarnation)
+ uint64_t incarnation, const char *payload, int payload_size)
{
int new_bsize = sizeof(swim->shuffled[0]) *
(mh_size(swim->members) + 1);
@@ -676,6 +743,12 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
/* Dissemination component. */
swim_on_member_update(swim, member);
+ if (payload_size >= 0 &&
+ swim_update_member_payload(swim, member, payload,
+ payload_size, 0) != 0) {
+ swim_delete_member(swim, member);
+ return NULL;
+ }
say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
swim_uuid_str(&member->uuid), mh_size(swim->members));
@@ -735,22 +808,40 @@ swim_new_round(struct swim *swim)
/**
* Encode one member into @a packet using @a passport structure.
+ * Note that the up-to-date check is done here for all callers,
+ * but the rest of the decision is left to them via
+ * @a encode_payload, because they have different conditions.
+ * The anti-entropy always passes true. The dissemination
+ * component passes true only while payload TTD > 0.
* @retval 0 Success, encoded.
* @retval -1 Not enough memory in the packet.
*/
static int
swim_encode_member(struct swim_packet *packet, struct swim_member *m,
- struct swim_passport_bin *passport)
+ struct swim_passport_bin *passport,
+ struct swim_member_payload_bin *payload_header,
+ bool encode_payload)
{
/* The headers should be initialized. */
assert(passport->k_status == SWIM_MEMBER_STATUS);
+ assert(payload_header->k_payload == SWIM_MEMBER_PAYLOAD);
int size = sizeof(*passport);
+ encode_payload = encode_payload && m->is_payload_up_to_date;
+ if (encode_payload)
+ size += sizeof(*payload_header) + m->payload_size;
char *pos = swim_packet_alloc(packet, size);
if (pos == NULL)
return -1;
swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
- m->incarnation);
+ m->incarnation, encode_payload);
memcpy(pos, passport, sizeof(*passport));
+ if (encode_payload) {
+ pos += sizeof(*passport);
+ swim_member_payload_bin_fill(payload_header, m->payload_size);
+ memcpy(pos, payload_header, sizeof(*payload_header));
+ pos += sizeof(*payload_header);
+ memcpy(pos, m->payload, m->payload_size);
+ }
return 0;
}
@@ -764,17 +855,20 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
{
struct swim_anti_entropy_header_bin ae_header_bin;
struct swim_passport_bin passport_bin;
+ struct swim_member_payload_bin payload_header;
char *header = swim_packet_alloc(packet, sizeof(ae_header_bin));
if (header == NULL)
return 0;
swim_passport_bin_create(&passport_bin);
+ swim_member_payload_bin_create(&payload_header);
struct mh_swim_table_t *t = swim->members;
int i = 0, member_count = mh_size(t);
int rnd = swim_scaled_rand(0, member_count - 1);
for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
i < member_count; ++i) {
struct swim_member *m = *mh_swim_table_node(t, rc);
- if (swim_encode_member(packet, m, &passport_bin) != 0)
+ if (swim_encode_member(packet, m, &passport_bin,
+ &payload_header, true) != 0)
break;
/*
* First random member could be chosen too close
@@ -834,16 +928,20 @@ static int
swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
{
struct swim_diss_header_bin diss_header_bin;
+ struct swim_member_payload_bin payload_header;
struct swim_passport_bin passport_bin;
char *header = swim_packet_alloc(packet, sizeof(diss_header_bin));
if (header == NULL)
return 0;
swim_passport_bin_create(&passport_bin);
+ swim_member_payload_bin_create(&payload_header);
int i = 0;
struct swim_member *m;
rlist_foreach_entry(m, &swim->dissemination_queue,
in_dissemination_queue) {
- if (swim_encode_member(packet, m, &passport_bin) != 0)
+ if (swim_encode_member(packet, m, &passport_bin,
+ &payload_header,
+ m->payload_ttd > 0) != 0)
break;
++i;
}
@@ -891,6 +989,10 @@ swim_decrease_event_ttd(struct swim *swim)
rlist_foreach_entry_safe(member, &swim->dissemination_queue,
in_dissemination_queue,
tmp) {
+ if (member->payload_ttd > 0) {
+ if (--member->payload_ttd == 0)
+ swim_cached_round_msg_invalidate(swim);
+ }
if (--member->status_ttd == 0) {
rlist_del_entry(member, in_dissemination_queue);
swim_cached_round_msg_invalidate(swim);
@@ -1066,8 +1168,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
{
assert(member != swim->self);
assert(def->incarnation >= member->incarnation);
- if (def->incarnation > member->incarnation)
+ /*
+ * Payload update rules are simple: it can be updated
+ * either if the new payload has a bigger incarnation, or
+ * the same incarnation, but local payload is outdated.
+ */
+ bool encode_payload = false;
+ if (def->incarnation > member->incarnation) {
swim_update_member_addr(swim, member, &def->addr, 0);
+ if (def->payload_size >= 0) {
+ encode_payload = true;
+ } else if (member->is_payload_up_to_date) {
+ member->is_payload_up_to_date = false;
+ swim_on_member_update(swim, member);
+ }
+ } else if (! member->is_payload_up_to_date && def->payload_size >= 0) {
+ encode_payload = true;
+ }
+ if (encode_payload &&
+ swim_update_member_payload(swim, member, def->payload,
+ def->payload_size, 0) != 0) {
+ /* Not such a critical error. */
+ diag_log();
+ }
swim_update_member_inc_status(swim, member, def->status,
def->incarnation);
}
@@ -1108,7 +1231,8 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
goto skip;
}
*result = swim_new_member(swim, &def->addr, &def->uuid,
- def->status, def->incarnation);
+ def->status, def->incarnation,
+ def->payload, def->payload_size);
return *result != NULL ? 0 : -1;
}
*result = member;
@@ -1437,7 +1561,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
swim->self = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE,
- 0);
+ 0, NULL, 0);
if (swim->self == NULL)
return -1;
} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -1449,7 +1573,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
new_self = swim_new_member(swim, &swim->self->addr, uuid,
- MEMBER_ALIVE, 0);
+ MEMBER_ALIVE, 0, swim->self->payload,
+ swim->self->payload_size);
if (new_self == NULL)
return -1;
}
@@ -1505,6 +1630,18 @@ swim_is_configured(const struct swim *swim)
return swim->self != NULL;
}
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size)
+{
+ if (payload_size > MAX_PAYLOAD_SIZE) {
+ diag_set(IllegalParams, "Payload should be <= %d",
+ MAX_PAYLOAD_SIZE);
+ return -1;
+ }
+ return swim_update_member_payload(swim, swim->self, payload,
+ payload_size, 1);
+}
+
int
swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
{
@@ -1519,7 +1656,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
return -1;
struct swim_member *member = swim_find_member(swim, uuid);
if (member == NULL) {
- member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0);
+ member = swim_new_member(swim, &addr, uuid, MEMBER_ALIVE, 0,
+ NULL, -1);
return member == NULL ? -1 : 0;
}
diag_set(SwimError, "%s a member with such UUID already exists",
@@ -1755,3 +1893,10 @@ swim_member_incarnation(const struct swim_member *member)
{
return member->incarnation;
}
+
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size)
+{
+ *size = member->payload_size;
+ return member->payload;
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 09d933b83..6a219d131 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -104,6 +104,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
double
swim_ack_timeout(const struct swim *swim);
+/** Set payload to disseminate over the cluster. */
+int
+swim_set_payload(struct swim *swim, const char *payload, uint16_t payload_size);
+
/**
* Stop listening and broadcasting messages, cleanup all internal
* structures, free memory.
@@ -220,6 +224,10 @@ swim_member_uuid(const struct swim_member *member);
uint64_t
swim_member_incarnation(const struct swim_member *member);
+/** Member's payload. */
+const char *
+swim_member_payload(const struct swim_member *member, uint16_t *size);
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index d84550663..58c9ec119 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -160,6 +160,7 @@ swim_member_def_create(struct swim_member_def *def)
memset(def, 0, sizeof(*def));
def->addr.sin_family = AF_INET;
def->status = MEMBER_ALIVE;
+ def->payload_size = -1;
}
/**
@@ -179,6 +180,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
struct swim_member_def *def)
{
uint64_t tmp;
+ uint32_t len;
switch (key) {
case SWIM_MEMBER_STATUS:
if (swim_decode_uint(pos, end, &tmp, prefix,
@@ -210,6 +212,17 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member incarnation") != 0)
return -1;
break;
+ case SWIM_MEMBER_PAYLOAD:
+ if (swim_decode_bin(&def->payload, &len, pos, end, prefix,
+ "member payload") != 0)
+ return -1;
+ if (len > MAX_PAYLOAD_SIZE) {
+ diag_set(SwimError, "%s member payload size should be "\
+ "<= %d", prefix, MAX_PAYLOAD_SIZE);
+ return -1;
+ }
+ def->payload_size = (int) len;
+ break;
default:
unreachable();
}
@@ -330,10 +343,22 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
header->v_anti_entropy = mp_bswap_u16(batch_size);
}
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin)
+{
+ bin->k_payload = SWIM_MEMBER_PAYLOAD;
+ bin->m_payload_size = 0xc5;
+}
+
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin, uint16_t size)
+{
+ bin->v_payload_size = mp_bswap_u16(size);
+}
+
void
swim_passport_bin_create(struct swim_passport_bin *passport)
{
- passport->m_header = 0x85;
passport->k_status = SWIM_MEMBER_STATUS;
passport->k_addr = SWIM_MEMBER_ADDRESS;
passport->m_addr = 0xce;
@@ -350,8 +375,10 @@ void
swim_passport_bin_fill(struct swim_passport_bin *passport,
const struct sockaddr_in *addr,
const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation)
+ enum swim_member_status status, uint64_t incarnation,
+ bool encode_payload)
{
+ passport->m_header = 0x85 + encode_payload;
passport->v_status = status;
passport->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr));
passport->v_port = mp_bswap_u16(ntohs(addr->sin_port));
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index e1c70db43..23d339e80 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -38,6 +38,11 @@
#include <stdbool.h>
#include "swim_constants.h"
+enum {
+ /** Reserve 272 bytes for headers. */
+ MAX_PAYLOAD_SIZE = 1200,
+};
+
/**
* SWIM binary protocol structures and helpers. Below is a picture
* of a SWIM message template:
@@ -67,7 +72,8 @@
* | SWIM_MEMBER_ADDRESS: uint, ip, |
* | SWIM_MEMBER_PORT: uint, port, |
* | SWIM_MEMBER_UUID: 16 byte UUID, |
- * | SWIM_MEMBER_INCARNATION: uint |
+ * | SWIM_MEMBER_INCARNATION: uint, |
+ * | SWIM_MEMBER_PAYLOAD: bin |
* | }, |
* | ... |
* | ], |
@@ -80,7 +86,8 @@
* | SWIM_MEMBER_ADDRESS: uint, ip, |
* | SWIM_MEMBER_PORT: uint, port, |
* | SWIM_MEMBER_UUID: 16 byte UUID, |
- * | SWIM_MEMBER_INCARNATION: uint |
+ * | SWIM_MEMBER_INCARNATION: uint, |
+ * | SWIM_MEMBER_PAYLOAD: bin |
* | }, |
* | ... |
* | ], |
@@ -103,6 +110,8 @@ struct swim_member_def {
struct sockaddr_in addr;
uint64_t incarnation;
enum swim_member_status status;
+ const char *payload;
+ int payload_size;
};
/** Initialize the definition with default values. */
@@ -242,6 +251,7 @@ enum swim_member_key {
SWIM_MEMBER_PORT,
SWIM_MEMBER_UUID,
SWIM_MEMBER_INCARNATION,
+ SWIM_MEMBER_PAYLOAD,
swim_member_key_MAX,
};
@@ -305,6 +315,30 @@ struct PACKED swim_passport_bin {
uint64_t v_incarnation;
};
+/**
+ * SWIM member's payload header. Payload data should be encoded
+ * right after it.
+ */
+struct PACKED swim_member_payload_bin {
+ /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+ uint8_t k_payload;
+ /** mp_encode_bin(16bit bin header) */
+ uint8_t m_payload_size;
+ uint16_t v_payload_size;
+ /** Payload data ... */
+};
+
+/** Initialize payload record. */
+void
+swim_member_payload_bin_create(struct swim_member_payload_bin *bin);
+
+/**
+ * Fill a previously created payload record with an actual size.
+ */
+void
+swim_member_payload_bin_fill(struct swim_member_payload_bin *bin,
+ uint16_t size);
+
/** Initialize a member's binary passport. */
void
swim_passport_bin_create(struct swim_passport_bin *passport);
@@ -319,7 +353,8 @@ void
swim_passport_bin_fill(struct swim_passport_bin *passport,
const struct sockaddr_in *addr,
const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation);
+ enum swim_member_status status, uint64_t incarnation,
+ bool encode_payload);
/** }}} Anti-entropy component */
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 6f3871606..0b8058f0b 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -36,6 +36,7 @@
#include "uri/uri.h"
#include "swim/swim.h"
#include "swim/swim_ev.h"
+#include "swim/swim_proto.h"
#include "swim_test_transport.h"
#include "swim_test_ev.h"
#include "swim_test_utils.h"
@@ -642,10 +643,200 @@ swim_test_broadcast(void)
swim_finish_test();
}
+static void
+swim_test_payload_basic(void)
+{
+ swim_start_test(11);
+ uint16_t size, cluster_size = 3;
+ struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+ for (int i = 0; i < cluster_size; ++i) {
+ for (int j = i + 1; j < cluster_size; ++j)
+ swim_cluster_interconnect(cluster, i, j);
+ }
+ ok(swim_cluster_member_payload(cluster, 0, 0, &size) == NULL &&
+ size == 0, "no payload by default");
+ is(swim_cluster_member_set_payload(cluster, 0, NULL, 1300), -1,
+ "can not set too big payload");
+ ok(swim_error_check_match("Payload should be <="), "diag says too big");
+
+ const char *s0_payload = "S1 payload";
+ uint16_t s0_payload_size = strlen(s0_payload) + 1;
+ is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+ s0_payload_size), 0,
+ "payload is set");
+ is(swim_cluster_member_incarnation(cluster, 0, 0), 1,
+ "incarnation is incremeted on each payload update");
+ const char *tmp = swim_cluster_member_payload(cluster, 0, 0, &size);
+ ok(size == s0_payload_size && memcmp(s0_payload, tmp, size) == 0,
+ "payload is successfully obtained back");
+
+ is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+ s0_payload_size, cluster_size),
+ 0, "payload is disseminated");
+ s0_payload = "S1 second version of payload";
+ s0_payload_size = strlen(s0_payload) + 1;
+ is(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+ s0_payload_size), 0,
+ "payload is changed");
+ is(swim_cluster_member_incarnation(cluster, 0, 0), 2,
+ "incarnation is incremeted on each payload update");
+ is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+ s0_payload_size, cluster_size),
+ 0, "second payload is disseminated");
+ /*
+ * Test that new incarnations help to rewrite the old
+ * payload from anti-entropy.
+ */
+ swim_cluster_set_drop(cluster, 0, 100);
+ s0_payload = "S1 third version of payload";
+ s0_payload_size = strlen(s0_payload) + 1;
+ fail_if(swim_cluster_member_set_payload(cluster, 0, s0_payload,
+ s0_payload_size) != 0);
+ /* Wait at least one round until payload TTD gets 0. */
+ swim_run_for(3);
+ swim_cluster_set_drop(cluster, 0, 0);
+ is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_payload,
+ s0_payload_size, cluster_size),
+ 0, "third payload is disseminated via anti-entropy");
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
+static void
+swim_test_payload_refutation(void)
+{
+ swim_start_test(11);
+ uint16_t size, cluster_size = 3;
+ struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+ swim_cluster_set_ack_timeout(cluster, 1);
+ for (int i = 0; i < cluster_size; ++i) {
+ for (int j = i + 1; j < cluster_size; ++j)
+ swim_cluster_interconnect(cluster, i, j);
+ }
+ const char *s0_old_payload = "s0 payload";
+ uint16_t s0_old_payload_size = strlen(s0_old_payload) + 1;
+ fail_if(swim_cluster_member_set_payload(cluster, 0, s0_old_payload,
+ s0_old_payload_size) != 0);
+ fail_if(swim_cluster_wait_payload_everywhere(cluster, 0, s0_old_payload,
+ s0_old_payload_size,
+ 3) != 0);
+ /*
+ * The test checks the following case. Assume there are 3
+ * nodes: S1, S2, S3. They all know each other. S1 sets
+ * new payload, S2 and S3 know that. They all see that S1
+ * has incarnation 1 and payload P1.
+ *
+ * Now S1 changes payload to P2. Its incarnation becomes
+ * 2. During next entire round its round messages are
+ * lost, however ACKs work ok.
+ */
+ const char *s0_new_payload = "s0 second payload";
+ uint16_t s0_new_payload_size = strlen(s0_new_payload);
+ fail_if(swim_cluster_member_set_payload(cluster, 0, s0_new_payload,
+ s0_new_payload_size) != 0);
+ int components[2] = {SWIM_DISSEMINATION, SWIM_ANTI_ENTROPY};
+ swim_cluster_drop_components(cluster, 0, components, 2);
+ swim_run_for(3);
+ swim_cluster_drop_components(cluster, 0, NULL, 0);
+
+ is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+ "S2 sees new incarnation of S1");
+ is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+ "S3 does the same");
+
+ const char *tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+ ok(size == s0_old_payload_size &&
+ memcmp(tmp, s0_old_payload, size) == 0,
+ "but S2 does not known the new payload");
+
+ tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+ ok(size == s0_old_payload_size &&
+ memcmp(tmp, s0_old_payload, size) == 0,
+ "as well as S3");
+
+ /* Restore normal ACK timeout. */
+ swim_cluster_set_ack_timeout(cluster, 30);
+
+ /*
+ * Now S1's payload TTD is 0, but via ACKs S1 sent its new
+ * incarnation to S2 and S3. Despite that they should
+ * apply new S1's payload via anti-entropy. Next lines
+ * test that:
+ *
+ * 1) S2 can apply new S1's payload from S1's
+ * anti-entropy;
+ *
+ * 2) S2 will not receive the old S1's payload from S3.
+ * S3 knows, that its payload is outdated, and should
+ * not send it;
+ *
+ * 3) S3 can apply new S1's payload from S2's
+ * anti-entropy. Note, that here S3 applies the payload
+ * not directly from the originator. It is the most
+ * complex case.
+ *
+ * Next lines test the case (1).
+ */
+
+ /* S3 does not participate in the test (1). */
+ swim_cluster_set_drop(cluster, 2, 100);
+ swim_run_for(3);
+
+ tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+ ok(size == s0_new_payload_size &&
+ memcmp(tmp, s0_new_payload, size) == 0,
+ "S2 learned S1's payload via anti-entropy");
+ is(swim_cluster_member_incarnation(cluster, 1, 0), 2,
+ "incarnation still is the same");
+
+ tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+ ok(size == s0_old_payload_size &&
+ memcmp(tmp, s0_old_payload, size) == 0,
+ "S3 was blocked and does not know anything");
+ is(swim_cluster_member_incarnation(cluster, 2, 0), 2,
+ "incarnation still is the same");
+
+ /* S1 will not participate in the tests further. */
+ swim_cluster_set_drop(cluster, 0, 100);
+
+ /*
+ * Now check the case (2) - S3 will not send outdated
+ * version of S1's payload. To maintain the experimental
+ * integrity S1 and S2 are silent. Only S3 sends packets.
+ */
+ swim_cluster_set_drop(cluster, 2, 0);
+ swim_cluster_set_drop_out(cluster, 1, 100);
+ swim_run_for(3);
+
+ tmp = swim_cluster_member_payload(cluster, 1, 0, &size);
+ ok(size == s0_new_payload_size &&
+ memcmp(tmp, s0_new_payload, size) == 0,
+ "S2 keeps the same new S1's payload, S3 did not rewrite it");
+
+ tmp = swim_cluster_member_payload(cluster, 2, 0, &size);
+ ok(size == s0_old_payload_size &&
+ memcmp(tmp, s0_old_payload, size) == 0,
+ "S3 still does not know anything");
+
+ /*
+ * Now check the case (3) - S3 accepts new S1's payload
+ * from S2. Even knowing the same S1's incarnation.
+ */
+ swim_cluster_set_drop(cluster, 1, 0);
+ swim_cluster_set_drop_out(cluster, 2, 100);
+ is(swim_cluster_wait_payload_everywhere(cluster, 0, s0_new_payload,
+ s0_new_payload_size, 3), 0,
+ "S3 learns S1's payload from S2");
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
static int
main_f(va_list ap)
{
- swim_start_test(15);
+ swim_start_test(17);
(void) ap;
swim_test_ev_init();
@@ -666,6 +857,8 @@ main_f(va_list ap)
swim_test_quit();
swim_test_uri_update();
swim_test_broadcast();
+ swim_test_payload_basic();
+ swim_test_payload_refutation();
swim_test_transport_free();
swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index a90a86dd0..4b1407db3 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
*** main_f ***
-1..15
+1..17
*** swim_test_one_link ***
1..6
ok 1 - no rounds - no fullmesh
@@ -147,4 +147,34 @@ ok 14 - subtests
ok 6 - fullmesh is reached, and no one link was added explicitly
ok 15 - subtests
*** swim_test_broadcast: done ***
+ *** swim_test_payload_basic ***
+ 1..11
+ ok 1 - no payload by default
+ ok 2 - can not set too big payload
+ ok 3 - diag says too big
+ ok 4 - payload is set
+ ok 5 - incarnation is incremeted on each payload update
+ ok 6 - payload is successfully obtained back
+ ok 7 - payload is disseminated
+ ok 8 - payload is changed
+ ok 9 - incarnation is incremeted on each payload update
+ ok 10 - second payload is disseminated
+ ok 11 - third payload is disseminated via anti-entropy
+ok 16 - subtests
+ *** swim_test_payload_basic: done ***
+ *** swim_test_payload_refutation ***
+ 1..11
+ ok 1 - S2 sees new incarnation of S1
+ ok 2 - S3 does the same
+ ok 3 - but S2 does not known the new payload
+ ok 4 - as well as S3
+ ok 5 - S2 learned S1's payload via anti-entropy
+ ok 6 - incarnation still is the same
+ ok 7 - S3 was blocked and does not know anything
+ ok 8 - incarnation still is the same
+ ok 9 - S2 keeps the same new S1's payload, S3 did not rewrite it
+ ok 10 - S3 still does not know anything
+ ok 11 - S3 learns S1's payload from S2
+ok 17 - subtests
+ *** swim_test_payload_refutation: done ***
*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index fd528d166..45570cce5 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -191,6 +191,27 @@ swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
return swim_member_incarnation(m);
}
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+ int member_id, uint16_t *size)
+{
+ const struct swim_member *m =
+ swim_cluster_member_view(cluster, node_id, member_id);
+ if (m == NULL) {
+ *size = 0;
+ return NULL;
+ }
+ return swim_member_payload(m, size);
+}
+
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+ const char *payload, uint16_t size)
+{
+ struct swim *s = swim_cluster_node(cluster, i);
+ return swim_set_payload(s, payload, size);
+}
+
struct swim *
swim_cluster_node(struct swim_cluster *cluster, int i)
{
@@ -506,6 +527,13 @@ struct swim_member_template {
*/
bool need_check_incarnation;
uint64_t incarnation;
+ /**
+ * True, if the payload should be checked to be equal to
+ * @a payload of size @a payload_size.
+ */
+ bool need_check_payload;
+ const char *payload;
+ uint16_t payload_size;
};
/** Build member template. No checks are set. */
@@ -542,6 +570,19 @@ swim_member_template_set_incarnation(struct swim_member_template *t,
t->incarnation = incarnation;
}
+/**
+ * Set that the member template should be used to check member
+ * payload.
+ */
+static inline void
+swim_member_template_set_payload(struct swim_member_template *t,
+ const char *payload, uint16_t payload_size)
+{
+ t->need_check_payload = true;
+ t->payload = payload;
+ t->payload_size = payload_size;
+}
+
/** Callback to check that a member matches a template. */
static bool
swim_loop_check_member(struct swim_cluster *cluster, void *data)
@@ -551,17 +592,26 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data)
swim_cluster_member_view(cluster, t->node_id, t->member_id);
enum swim_member_status status;
uint64_t incarnation;
+ const char *payload;
+ uint16_t payload_size;
if (m != NULL) {
status = swim_member_status(m);
incarnation = swim_member_incarnation(m);
+ payload = swim_member_payload(m, &payload_size);
} else {
status = swim_member_status_MAX;
incarnation = 0;
+ payload = NULL;
+ payload_size = 0;
}
if (t->need_check_status && status != t->status)
return false;
if (t->need_check_incarnation && incarnation != t->incarnation)
return false;
+ if (t->need_check_payload &&
+ (payload_size != t->payload_size ||
+ memcmp(payload, t->payload, payload_size) != 0))
+ return false;
return true;
}
@@ -644,6 +694,18 @@ swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id,
swim_loop_check_member_everywhere, &t);
}
+int
+swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
+ int member_id, const char *payload,
+ uint16_t payload_size, double timeout)
+{
+ struct swim_member_template t;
+ swim_member_template_create(&t, -1, member_id);
+ swim_member_template_set_payload(&t, payload, payload_size);
+ return swim_wait_timeout(timeout, cluster,
+ swim_loop_check_member_everywhere, &t);
+}
+
bool
swim_error_check_match(const char *msg)
{
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 6ea136e36..100a67e0c 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -141,6 +141,14 @@ uint64_t
swim_cluster_member_incarnation(struct swim_cluster *cluster, int node_id,
int member_id);
+const char *
+swim_cluster_member_payload(struct swim_cluster *cluster, int node_id,
+ int member_id, uint16_t *size);
+
+int
+swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
+ const char *payload, uint16_t size);
+
/**
* Check if in the cluster every instance knowns the about other
* instances.
@@ -192,6 +200,16 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id,
int member_id, uint64_t incarnation,
double timeout);
+/**
+ * Wait until a member with id @a member_id is seen with
+ * @a payload of size @a payload_size in the membership table of
+ * every instance in @a cluster. At most @a timeout seconds.
+ */
+int
+swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
+ int member_id, const char *payload,
+ uint16_t payload_size, double timeout);
+
/** Process SWIM events for @a duration fake seconds. */
void
swim_run_for(double duration);