* [tarantool-patches] [PATCH v2 1/6] swim: encapsulate member bin info into a 'passport'
2019-04-09 11:46 [tarantool-patches] [PATCH v2 0/6] swim dissemination Vladislav Shpilevoy
@ 2019-04-09 11:46 ` Vladislav Shpilevoy
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 2/6] swim: make members array decoder be a separate function Vladislav Shpilevoy
` (4 subsequent siblings)
5 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-09 11:46 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
Each member stored in components dissemination and anti-entropy
should carry a unique identifier, a status, and an address. Those
attributes are UUID, IP, Port, enum swim_member_status,
incarnation.
Now they are sent only in scope of anti-entropy, but forthcoming
dissemination component also would like to use these attributes
for each event.
This commit makes the vital attributes and their code more
reusable by encapsulation of them into a binary passport
structure.
Part of #3234
---
src/lib/swim/swim_proto.c | 46 ++++++++++++++++++++++++++-------------
src/lib/swim/swim_proto.h | 26 +++++++++++++++++-----
2 files changed, 51 insertions(+), 21 deletions(-)
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 4e607e796..416e1d99e 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -314,32 +314,48 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
header->v_anti_entropy = mp_bswap_u16(batch_size);
}
+static inline void
+swim_passport_bin_create(struct swim_passport_bin *passport)
+{
+ passport->k_status = SWIM_MEMBER_STATUS;
+ passport->k_addr = SWIM_MEMBER_ADDRESS;
+ passport->m_addr = 0xce;
+ passport->k_port = SWIM_MEMBER_PORT;
+ passport->m_port = 0xcd;
+ passport->k_uuid = SWIM_MEMBER_UUID;
+ passport->m_uuid = 0xc4;
+ passport->m_uuid_len = UUID_LEN;
+ passport->k_incarnation = SWIM_MEMBER_INCARNATION;
+ passport->m_incarnation = 0xcf;
+}
+
+static inline void
+swim_passport_bin_fill(struct swim_passport_bin *passport,
+ const struct sockaddr_in *addr,
+ const struct tt_uuid *uuid,
+ enum swim_member_status status, uint64_t incarnation)
+{
+ passport->v_status = status;
+ passport->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr));
+ passport->v_port = mp_bswap_u16(ntohs(addr->sin_port));
+ memcpy(passport->v_uuid, uuid, UUID_LEN);
+ passport->v_incarnation = mp_bswap_u64(incarnation);
+}
+
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
enum swim_member_status status, uint64_t incarnation)
{
- header->v_status = status;
- header->v_addr = mp_bswap_u32(ntohl(addr->sin_addr.s_addr));
- header->v_port = mp_bswap_u16(ntohs(addr->sin_port));
- memcpy(header->v_uuid, uuid, UUID_LEN);
- header->v_incarnation = mp_bswap_u64(incarnation);
+ swim_passport_bin_fill(&header->passport, addr, uuid, status,
+ incarnation);
}
void
swim_member_bin_create(struct swim_member_bin *header)
{
header->m_header = 0x85;
- header->k_status = SWIM_MEMBER_STATUS;
- header->k_addr = SWIM_MEMBER_ADDRESS;
- header->m_addr = 0xce;
- header->k_port = SWIM_MEMBER_PORT;
- header->m_port = 0xcd;
- header->k_uuid = SWIM_MEMBER_UUID;
- header->m_uuid = 0xc4;
- header->m_uuid_len = UUID_LEN;
- header->k_incarnation = SWIM_MEMBER_INCARNATION;
- header->m_incarnation = 0xcf;
+ swim_passport_bin_create(&header->passport);
}
void
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 07905df31..0e73f37fb 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -239,13 +239,16 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
uint16_t batch_size);
/**
- * SWIM member MessagePack template. Represents one record in
- * anti-entropy section.
+ * The structure represents a passport of a member. It consists of
+ * some vital necessary member attributes, allowing to detect its
+ * state, exact address. The whole passport is necessary for each
+ * info related to a member: for anti-entropy records, for
+ * dissemination events. The components can inherit that structure
+ * and add more attributes. For example, anti-entropy can add a
+ * mandatory payload; dissemination adds optional old UUID and
+ * payload.
*/
-struct PACKED swim_member_bin {
- /** mp_encode_map(5) */
- uint8_t m_header;
-
+struct PACKED swim_passport_bin {
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
uint8_t k_status;
/** mp_encode_uint(enum member_status) */
@@ -277,6 +280,17 @@ struct PACKED swim_member_bin {
uint64_t v_incarnation;
};
+/**
+ * SWIM member MessagePack template. Represents one record in
+ * anti-entropy section.
+ */
+struct PACKED swim_member_bin {
+ /** mp_encode_map(5) */
+ uint8_t m_header;
+ /** Basic member info like status, address. */
+ struct swim_passport_bin passport;
+};
+
/** Initialize antri-entropy record. */
void
swim_member_bin_create(struct swim_member_bin *header);
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [tarantool-patches] [PATCH v2 2/6] swim: make members array decoder be a separate function
2019-04-09 11:46 [tarantool-patches] [PATCH v2 0/6] swim dissemination Vladislav Shpilevoy
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 1/6] swim: encapsulate member bin info into a 'passport' Vladislav Shpilevoy
@ 2019-04-09 11:46 ` Vladislav Shpilevoy
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 3/6] test: rename some swim test methods and macros Vladislav Shpilevoy
` (3 subsequent siblings)
5 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-09 11:46 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
At this moment SWIM protocol stores array of members only in one
place: inside the anti-entropy component. Its decoding is a
simple loop taking the member definitions one by one and
upserting them into the member table.
But the dissemination also has something kinda like members
array: an array of events. The trick is that an event is
basically the same as a member +/- a couple of optional fields.
Events are also decoded into the member definition structure. It
means that anti-entropy decoder can be easily reused.
Part of #3234
---
src/lib/swim/swim.c | 19 +++++++++++++++----
1 file changed, 15 insertions(+), 4 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index f65fb60a3..a30a83886 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1019,12 +1019,14 @@ skip:
return 0;
}
-/** Decode an anti-entropy message, update member table. */
+/**
+ * Decode a bunch of members encoded as a MessagePack array. Each
+ * correctly decoded member is upserted into the member table.
+ */
static int
-swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
+swim_process_members(struct swim *swim, const char *prefix,
+ const char **pos, const char *end)
{
- say_verbose("SWIM %d: process anti-entropy", swim_fd(swim));
- const char *prefix = "invalid anti-entropy message:";
uint32_t size;
if (swim_decode_array(pos, end, &size, prefix, "root") != 0)
return -1;
@@ -1044,6 +1046,15 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
return 0;
}
+/** Decode an anti-entropy message, update member table. */
+static int
+swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
+{
+ say_verbose("SWIM %d: process anti-entropy", swim_fd(swim));
+ const char *prefix = "invalid anti-entropy message:";
+ return swim_process_members(swim, prefix, pos, end);
+}
+
/**
* Decode a failure detection message. Schedule acks, process
* acks.
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [tarantool-patches] [PATCH v2 3/6] test: rename some swim test methods and macros
2019-04-09 11:46 [tarantool-patches] [PATCH v2 0/6] swim dissemination Vladislav Shpilevoy
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 1/6] swim: encapsulate member bin info into a 'passport' Vladislav Shpilevoy
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 2/6] swim: make members array decoder be a separate function Vladislav Shpilevoy
@ 2019-04-09 11:46 ` Vladislav Shpilevoy
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 4/6] test: speed up swim big cluster failure detection Vladislav Shpilevoy
` (2 subsequent siblings)
5 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-09 11:46 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
@Kostja thinks it improves redability.
---
test/unit/swim.c | 83 +++++++++++++++++++------------------
test/unit/swim_test_utils.c | 4 +-
test/unit/swim_test_utils.h | 8 ++--
3 files changed, 49 insertions(+), 46 deletions(-)
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 860d3211e..5b7b08ae1 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -51,7 +51,7 @@ static int test_result;
static void
swim_test_one_link(void)
{
- swim_start_test(6);
+ swim_test_start(6);
/*
* Run a simple cluster of two elements. One of them
* learns about another explicitly. Another should add the
@@ -73,13 +73,13 @@ swim_test_one_link(void)
"1 sees 0 as alive");
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_sequence(void)
{
- swim_start_test(1);
+ swim_test_start(1);
/*
* Run a simple cluster of several elements. Build a
* 'forward list' from them. It should turn into fullmesh
@@ -92,13 +92,13 @@ swim_test_sequence(void)
is(swim_cluster_wait_fullmesh(cluster, 10), 0, "sequence");
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_uuid_update(void)
{
- swim_start_test(4);
+ swim_test_start(4);
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_add_link(cluster, 0, 1);
@@ -112,24 +112,24 @@ swim_test_uuid_update(void)
new_uuid.time_low = 2;
is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), -1,
"can not update to an existing UUID - swim_cfg fails");
- ok(swim_error_check_match("exists"), "diag says 'exists'");
+ ok(swim_test_error_check_match("exists"), "diag says 'exists'");
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_cfg(void)
{
- swim_start_test(16);
+ swim_test_start(16);
struct swim *s = swim_new();
assert(s != NULL);
is(swim_cfg(s, NULL, -1, -1, -1, NULL), -1, "first cfg failed - no URI");
- ok(swim_error_check_match("mandatory"), "diag says 'mandatory'");
+ ok(swim_test_error_check_match("mandatory"), "diag says 'mandatory'");
const char *uri = "127.0.0.1:1";
is(swim_cfg(s, uri, -1, -1, -1, NULL), -1, "first cfg failed - no UUID");
- ok(swim_error_check_match("mandatory"), "diag says 'mandatory'");
+ ok(swim_test_error_check_match("mandatory"), "diag says 'mandatory'");
struct tt_uuid uuid = uuid_nil;
uuid.time_low = 1;
is(swim_cfg(s, uri, -1, -1, -1, &uuid), 0, "configured first time");
@@ -148,26 +148,28 @@ swim_test_cfg(void)
uuid2.time_low = 2;
is(swim_cfg(s2, bad_uri1, -1, -1, -1, &uuid2), -1,
"can not use invalid URI");
- ok(swim_error_check_match("invalid uri"), "diag says 'invalid uri'");
+ ok(swim_test_error_check_match("invalid uri"),
+ "diag says 'invalid uri'");
is(swim_cfg(s2, bad_uri2, -1, -1, -1, &uuid2), -1,
"can not use domain names");
- ok(swim_error_check_match("invalid uri"), "diag says 'invalid uri'");
+ ok(swim_test_error_check_match("invalid uri"),
+ "diag says 'invalid uri'");
is(swim_cfg(s2, bad_uri3, -1, -1, -1, &uuid2), -1,
"UNIX sockets are not supported");
- ok(swim_error_check_match("only IP"), "diag says 'only IP'");
+ ok(swim_test_error_check_match("only IP"), "diag says 'only IP'");
is(swim_cfg(s2, uri, -1, -1, -1, &uuid2), -1,
"can not bind to an occupied port");
- ok(swim_error_check_match("bind"), "diag says 'bind'");
+ ok(swim_test_error_check_match("bind"), "diag says 'bind'");
swim_delete(s2);
swim_delete(s);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_add_remove(void)
{
- swim_start_test(13);
+ swim_test_start(13);
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_add_link(cluster, 0, 1);
@@ -179,7 +181,7 @@ swim_test_add_remove(void)
is(swim_add_member(s1, swim_member_uri(s2_self),
swim_member_uuid(s2_self)), -1,
"can not add an existing member");
- ok(swim_error_check_match("already exists"),
+ ok(swim_test_error_check_match("already exists"),
"diag says 'already exists'");
const char *bad_uri = "127.0.0101010101";
@@ -187,11 +189,12 @@ swim_test_add_remove(void)
uuid.time_low = 1000;
is(swim_add_member(s1, bad_uri, &uuid), -1,
"can not add a invalid uri");
- ok(swim_error_check_match("invalid uri"), "diag says 'invalid uri'");
+ ok(swim_test_error_check_match("invalid uri"),
+ "diag says 'invalid uri'");
is(swim_remove_member(s2, swim_member_uuid(s2_self)), -1,
"can not remove self");
- ok(swim_error_check_match("can not remove self"),
+ ok(swim_test_error_check_match("can not remove self"),
"diag says the same");
isnt(swim_member_by_uuid(s1, swim_member_uuid(s2_self)), NULL,
@@ -214,7 +217,7 @@ swim_test_add_remove(void)
* before its completion.
*/
swim_cluster_block_io(cluster, 0);
- swim_run_for(1);
+ swim_test_run_for(1);
/*
* Now the message from s1 is in 'fly', round step is not
* finished.
@@ -226,13 +229,13 @@ swim_test_add_remove(void)
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_basic_failure_detection(void)
{
- swim_start_test(7);
+ swim_test_start(7);
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_set_ack_timeout(cluster, 0.5);
@@ -262,24 +265,24 @@ swim_test_basic_failure_detection(void)
/* A member can be removed during an ACK wait. */
swim_cluster_block_io(cluster, 1);
/* Next round after 1 sec + let ping hang for 0.25 sec. */
- swim_run_for(1.25);
+ swim_test_run_for(1.25);
struct swim *s1 = swim_cluster_node(cluster, 0);
struct swim *s2 = swim_cluster_node(cluster, 1);
const struct swim_member *s2_self = swim_self(s2);
swim_remove_member(s1, swim_member_uuid(s2_self));
swim_cluster_unblock_io(cluster, 1);
- swim_run_for(0.1);
+ swim_test_run_for(0.1);
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
"a member is added back on an ACK");
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_basic_gossip(void)
{
- swim_start_test(4);
+ swim_test_start(4);
struct swim_cluster *cluster = swim_cluster_new(3);
swim_cluster_set_ack_timeout(cluster, 10);
/*
@@ -298,7 +301,7 @@ swim_test_basic_gossip(void)
* Wait two no-ACKs on S1 from S2. +1 sec to send a first
* ping.
*/
- swim_run_for(20 + 1);
+ swim_test_run_for(20 + 1);
swim_cluster_add_link(cluster, 0, 2);
swim_cluster_add_link(cluster, 2, 1);
/*
@@ -307,11 +310,11 @@ swim_test_basic_gossip(void)
* S1 from informing S3 about that the S3 IO is blocked
* for a short time.
*/
- swim_run_for(9);
+ swim_test_run_for(9);
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
"S1 still thinks that S2 is alive");
swim_cluster_block_io(cluster, 2);
- swim_run_for(1);
+ swim_test_run_for(1);
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD, "but one "\
"more second, and a third ack timed out - S1 sees S2 as dead");
is(swim_cluster_member_status(cluster, 2, 1), MEMBER_ALIVE,
@@ -325,13 +328,13 @@ swim_test_basic_gossip(void)
"S3 learns about dead S2 from S1");
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_probe(void)
{
- swim_start_test(2);
+ swim_test_start(2);
struct swim_cluster *cluster = swim_cluster_new(2);
struct swim *s1 = swim_cluster_node(cluster, 0);
@@ -342,13 +345,13 @@ swim_test_probe(void)
"receive ACK on probe and get fullmesh")
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_refute(void)
{
- swim_start_test(4);
+ swim_test_start(4);
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_set_ack_timeout(cluster, 2);
@@ -368,13 +371,13 @@ swim_test_refute(void)
"S2 learned its old bigger incarnation 1 from S0");
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_too_big_packet(void)
{
- swim_start_test(2);
+ swim_test_start(2);
int size = 50;
struct swim_cluster *cluster = swim_cluster_new(size);
for (int i = 1; i < size; ++i)
@@ -402,13 +405,13 @@ swim_test_too_big_packet(void)
is(i, size, "S%d drops all the packets - it should become dead",
drop_id + 1);
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static void
swim_test_undead(void)
{
- swim_start_test(2);
+ swim_test_start(2);
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_set_gc(cluster, SWIM_GC_OFF);
swim_cluster_set_ack_timeout(cluster, 1);
@@ -417,17 +420,17 @@ swim_test_undead(void)
swim_cluster_set_drop(cluster, 1, true);
is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 4), 0,
"member S2 is dead");
- swim_run_for(5);
+ swim_test_run_for(5);
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD,
"but it is never deleted due to the cfg option");
swim_cluster_delete(cluster);
- swim_finish_test();
+ swim_test_finish();
}
static int
main_f(va_list ap)
{
- swim_start_test(11);
+ swim_test_start(11);
(void) ap;
swim_test_ev_init();
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index bb413372c..006c7446f 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -270,7 +270,7 @@ swim_loop_check_false(struct swim_cluster *cluster, void *data)
}
void
-swim_run_for(double duration)
+swim_test_run_for(double duration)
{
swim_wait_timeout(duration, NULL, swim_loop_check_false, NULL);
}
@@ -384,7 +384,7 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id,
}
bool
-swim_error_check_match(const char *msg)
+swim_test_error_check_match(const char *msg)
{
return strstr(diag_last_error(diag_get())->errmsg, msg) != NULL;
}
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index d2ef00817..4a1ff4cb8 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -61,7 +61,7 @@ swim_cluster_delete(struct swim_cluster *cluster);
/** Check that an error in diag contains @a msg. */
bool
-swim_error_check_match(const char *msg);
+swim_test_error_check_match(const char *msg);
/** Get a SWIM instance by its ordinal number. */
struct swim *
@@ -130,15 +130,15 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id,
/** Process SWIM events for @a duration fake seconds. */
void
-swim_run_for(double duration);
+swim_test_run_for(double duration);
-#define swim_start_test(n) { \
+#define swim_test_start(n) { \
header(); \
say_verbose("-------- SWIM start test %s --------", __func__); \
plan(n); \
}
-#define swim_finish_test() { \
+#define swim_test_finish() { \
say_verbose("-------- SWIM end test %s --------", __func__); \
swim_test_ev_reset(); \
check_plan(); \
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [tarantool-patches] [PATCH v2 4/6] test: speed up swim big cluster failure detection
2019-04-09 11:46 [tarantool-patches] [PATCH v2 0/6] swim dissemination Vladislav Shpilevoy
` (2 preceding siblings ...)
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 3/6] test: rename some swim test methods and macros Vladislav Shpilevoy
@ 2019-04-09 11:46 ` Vladislav Shpilevoy
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 5/6] test: set packet drop rate instead of flag in swim tests Vladislav Shpilevoy
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 6/6] swim: introduce dissemination component Vladislav Shpilevoy
5 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-09 11:46 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
The test checks that if a member has failed in a big cluster, it
is eventually deleted from all instances. But it takes too much
real time despite usage of virtual time.
This is because member total deletion takes
O(N + ack_timeout * 5) time. N so as to wait until every member
pinged the failed one at least once, + 3 * ack_timeout to learn
that it is dead, and + 2 * ack_timeout to drop it. Of course, it
is an upper border, and usually it is faster but not much. For
example, on the cluster of size 50 it takes easily 55 virtual
seconds.
On the contrary, to just learn that a member is dead on every
instance takes O(log(N)) according to the SWIM paper. On the
same test with 50 instances cluster it takes ~15 virtual seconds
to disseminate 'dead' status of the failed member on every
instance. And even without dissemination component, with
anti-entropy only.
Leaping ahead, for the subsequent patches it is tested that with
the dissemination component it takes already ~6 virtual seconds.
In the summary, without losing test coverage it is much faster to
turn off SWIM GC and wait until the failed member looks dead on
all instances.
Part of #3234
---
test/unit/swim.c | 52 ++++++++++++++++++++-------------
test/unit/swim.result | 5 ++--
test/unit/swim_test_utils.c | 57 +++++++++++++++++++++++++++++++++++++
test/unit/swim_test_utils.h | 20 +++++++++++++
4 files changed, 112 insertions(+), 22 deletions(-)
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 5b7b08ae1..2542eac1d 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -377,33 +377,45 @@ swim_test_refute(void)
static void
swim_test_too_big_packet(void)
{
- swim_test_start(2);
+ swim_test_start(3);
int size = 50;
+ double ack_timeout = 1;
+ double first_dead_timeout = 20;
+ double everywhere_dead_timeout = size * 3;
+ int drop_id = size / 2;
+
struct swim_cluster *cluster = swim_cluster_new(size);
for (int i = 1; i < size; ++i)
swim_cluster_add_link(cluster, 0, i);
- is(swim_cluster_wait_fullmesh(cluster, size), 0, "despite S1 can not "\
- "send all the %d members in a one packet, fullmesh is eventually "\
- "reached", size);
- swim_cluster_set_ack_timeout(cluster, 1);
- int drop_id = size / 2;
+
+ is(swim_cluster_wait_fullmesh(cluster, size * 2), 0, "despite S1 can "\
+ "not send all the %d members in a one packet, fullmesh is "\
+ "eventually reached", size);
+
+ swim_cluster_set_ack_timeout(cluster, ack_timeout);
swim_cluster_set_drop(cluster, drop_id, true);
+ is(swim_cluster_wait_status_anywhere(cluster, drop_id, MEMBER_DEAD,
+ first_dead_timeout), 0,
+ "a dead member is detected in time not depending on cluster size");
/*
- * Dissemination of a detected failure takes long time
- * without help of the component, intended for that.
+ * GC is off to simplify and speed up checks. When no GC
+ * the test is sure that it is safe to check for
+ * MEMBER_DEAD everywhere, because it is impossible that a
+ * member is considered dead in one place, but already
+ * deleted on another. Also, total member deletion takes
+ * linear time, because a member is deleted from an
+ * instance only when *that* instance will not receive
+ * some direct acks from the member. Deletion and
+ * additional pings are not triggered if a member dead
+ * status is received indirectly via dissemination or
+ * anti-entropy. Otherwise it could produce linear network
+ * load on the already weak member.
*/
- double timeout = size * 10;
- int i = 0;
- for (; i < size; ++i) {
- double start = swim_time();
- if (i != drop_id &&
- swim_cluster_wait_status(cluster, i, drop_id,
- swim_member_status_MAX, timeout) != 0)
- break;
- timeout -= swim_time() - start;
- }
- is(i, size, "S%d drops all the packets - it should become dead",
- drop_id + 1);
+ swim_cluster_set_gc(cluster, SWIM_GC_OFF);
+ is(swim_cluster_wait_status_everywhere(cluster, drop_id, MEMBER_DEAD,
+ everywhere_dead_timeout), 0,
+ "S%d death is eventually learned by everyone", drop_id + 1);
+
swim_cluster_delete(cluster);
swim_test_finish();
}
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 904f061f6..3393870c2 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -94,9 +94,10 @@ ok 8 - subtests
ok 9 - subtests
*** swim_test_basic_gossip: done ***
*** swim_test_too_big_packet ***
- 1..2
+ 1..3
ok 1 - despite S1 can not send all the 50 members in a one packet, fullmesh is eventually reached
- ok 2 - S26 drops all the packets - it should become dead
+ ok 2 - a dead member is detected in time not depending on cluster size
+ ok 3 - S26 death is eventually learned by everyone
ok 10 - subtests
*** swim_test_too_big_packet: done ***
*** swim_test_undead ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 006c7446f..02149f256 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -361,6 +361,39 @@ swim_loop_check_member(struct swim_cluster *cluster, void *data)
return true;
}
+/**
+ * Callback to check that a member matches a template on any
+ * instance in the cluster.
+ */
+static bool
+swim_loop_check_member_anywhere(struct swim_cluster *cluster, void *data)
+{
+ struct swim_member_template *t = (struct swim_member_template *) data;
+ for (t->node_id = 0; t->node_id < cluster->size; ++t->node_id) {
+ if (t->node_id != t->member_id &&
+ swim_loop_check_member(cluster, data))
+ return true;
+ }
+ return false;
+}
+
+/**
+ * Callback to check that a member matches a template on every
+ * instance in the cluster.
+ */
+static bool
+swim_loop_check_member_everywhere(struct swim_cluster *cluster, void *data)
+{
+ struct swim_member_template *t = (struct swim_member_template *) data;
+ for (t->node_id = 0; t->node_id < cluster->size; ++t->node_id) {
+ if (t->node_id != t->member_id &&
+ !swim_loop_check_member(cluster, data))
+ return false;
+ }
+ return true;
+}
+
+
int
swim_cluster_wait_status(struct swim_cluster *cluster, int node_id,
int member_id, enum swim_member_status status,
@@ -383,6 +416,30 @@ swim_cluster_wait_incarnation(struct swim_cluster *cluster, int node_id,
return swim_wait_timeout(timeout, cluster, swim_loop_check_member, &t);
}
+int
+swim_cluster_wait_status_anywhere(struct swim_cluster *cluster, int member_id,
+ enum swim_member_status status,
+ double timeout)
+{
+ struct swim_member_template t;
+ swim_member_template_create(&t, -1, member_id);
+ swim_member_template_set_status(&t, status);
+ return swim_wait_timeout(timeout, cluster,
+ swim_loop_check_member_anywhere, &t);
+}
+
+int
+swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id,
+ enum swim_member_status status,
+ double timeout)
+{
+ struct swim_member_template t;
+ swim_member_template_create(&t, -1, member_id);
+ swim_member_template_set_status(&t, status);
+ return swim_wait_timeout(timeout, cluster,
+ swim_loop_check_member_everywhere, &t);
+}
+
bool
swim_test_error_check_match(const char *msg)
{
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 4a1ff4cb8..13781d037 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -118,6 +118,26 @@ swim_cluster_wait_status(struct swim_cluster *cluster, int node_id,
int member_id, enum swim_member_status status,
double timeout);
+/**
+ * Wait until a member with id @a member_id is seen with @a status
+ * in the membership table of any instance in @a cluster. At most
+ * @a timeout seconds.
+ */
+int
+swim_cluster_wait_status_anywhere(struct swim_cluster *cluster, int member_id,
+ enum swim_member_status status,
+ double timeout);
+
+/**
+ * Wait until a member with id @a member_id is seen with @a status
+ * in the membership table of every instance in @a cluster. At
+ * most @a timeout seconds.
+ */
+int
+swim_cluster_wait_status_everywhere(struct swim_cluster *cluster, int member_id,
+ enum swim_member_status status,
+ double timeout);
+
/**
* Wait until a member with id @a member_id is seen with @a
* incarnation in the membership table of a member with id @a
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [tarantool-patches] [PATCH v2 5/6] test: set packet drop rate instead of flag in swim tests
2019-04-09 11:46 [tarantool-patches] [PATCH v2 0/6] swim dissemination Vladislav Shpilevoy
` (3 preceding siblings ...)
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 4/6] test: speed up swim big cluster failure detection Vladislav Shpilevoy
@ 2019-04-09 11:46 ` Vladislav Shpilevoy
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 6/6] swim: introduce dissemination component Vladislav Shpilevoy
5 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-09 11:46 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
Before dissemination component it was enough in the tests to
either drop all packets to/from a certain member, or do not drop
at all. But after dissemination it will be time to test more
granulated packet loss table: not 0/100, but 5/10/20/50/.../100
packet loss rate.
Part of #3234
---
test/unit/swim.c | 10 +++++-----
test/unit/swim_test_transport.c | 27 +++++++++++++++++++--------
test/unit/swim_test_transport.h | 9 +++++----
test/unit/swim_test_utils.c | 2 +-
test/unit/swim_test_utils.h | 2 +-
5 files changed, 31 insertions(+), 19 deletions(-)
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 2542eac1d..002ea1a5b 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -296,7 +296,7 @@ swim_test_basic_gossip(void)
*/
swim_cluster_add_link(cluster, 0, 1);
swim_cluster_add_link(cluster, 1, 0);
- swim_cluster_set_drop(cluster, 1, true);
+ swim_cluster_set_drop(cluster, 1, 100);
/*
* Wait two no-ACKs on S1 from S2. +1 sec to send a first
* ping.
@@ -356,9 +356,9 @@ swim_test_refute(void)
swim_cluster_set_ack_timeout(cluster, 2);
swim_cluster_add_link(cluster, 0, 1);
- swim_cluster_set_drop(cluster, 1, true);
+ swim_cluster_set_drop(cluster, 1, 100);
fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 7) != 0);
- swim_cluster_set_drop(cluster, 1, false);
+ swim_cluster_set_drop(cluster, 1, 0);
is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
"S2 increments its own incarnation to refute its death");
is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0,
@@ -393,7 +393,7 @@ swim_test_too_big_packet(void)
"eventually reached", size);
swim_cluster_set_ack_timeout(cluster, ack_timeout);
- swim_cluster_set_drop(cluster, drop_id, true);
+ swim_cluster_set_drop(cluster, drop_id, 100);
is(swim_cluster_wait_status_anywhere(cluster, drop_id, MEMBER_DEAD,
first_dead_timeout), 0,
"a dead member is detected in time not depending on cluster size");
@@ -429,7 +429,7 @@ swim_test_undead(void)
swim_cluster_set_ack_timeout(cluster, 1);
swim_cluster_add_link(cluster, 0, 1);
swim_cluster_add_link(cluster, 1, 0);
- swim_cluster_set_drop(cluster, 1, true);
+ swim_cluster_set_drop(cluster, 1, 100);
is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 4), 0,
"member S2 is dead");
swim_test_run_for(5);
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index 3b59eaf4d..e563185ee 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -97,10 +97,10 @@ struct swim_fd {
*/
bool is_opened;
/**
- * True if any message sent to that fd should be just
- * dropped, not queued.
+ * Probability of packet loss. For both sends and
+ * receipts.
*/
- bool is_dropping;
+ double drop_rate;
/**
* Link in the list of opened and non-blocked descriptors.
* Used to feed them all EV_WRITE.
@@ -130,7 +130,7 @@ swim_fd_open(struct swim_fd *fd)
return -1;
}
fd->is_opened = true;
- fd->is_dropping = false;
+ fd->drop_rate = 0;
rlist_add_tail_entry(&swim_fd_active, fd, in_active);
return 0;
}
@@ -156,7 +156,7 @@ swim_test_transport_init(void)
for (int i = 0, evfd = FAKE_FD_BASE; i < FAKE_FD_NUMBER; ++i, ++evfd) {
swim_fd[i].evfd = evfd;
swim_fd[i].is_opened = false;
- swim_fd[i].is_dropping = false;
+ swim_fd[i].drop_rate = 0;
rlist_create(&swim_fd[i].in_active);
rlist_create(&swim_fd[i].recv_queue);
rlist_create(&swim_fd[i].send_queue);
@@ -268,11 +268,21 @@ swim_test_transport_unblock_fd(int fd)
}
void
-swim_test_transport_set_drop(int fd, bool value)
+swim_test_transport_set_drop(int fd, double value)
{
struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
if (sfd->is_opened)
- sfd->is_dropping = value;
+ sfd->drop_rate = value;
+}
+
+/**
+ * Returns true with probability @a rate, and is used to decided
+ * wether to drop a packet or not.
+ */
+static inline bool
+swim_test_is_drop(double rate)
+{
+ return ((double) rand() / RAND_MAX) * 100 < rate;
}
/** Send one packet to destination's recv queue. */
@@ -285,7 +295,8 @@ swim_fd_send_packet(struct swim_fd *fd)
rlist_shift_entry(&fd->send_queue, struct swim_test_packet,
in_queue);
struct swim_fd *dst = &swim_fd[ntohs(p->dst.sin_port)];
- if (dst->is_opened && ! dst->is_dropping && ! fd->is_dropping)
+ if (dst->is_opened && !swim_test_is_drop(dst->drop_rate) &&
+ !swim_test_is_drop(fd->drop_rate))
rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
else
swim_test_packet_delete(p);
diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h
index 5a1a92271..d291abe91 100644
--- a/test/unit/swim_test_transport.h
+++ b/test/unit/swim_test_transport.h
@@ -60,12 +60,13 @@ void
swim_test_transport_unblock_fd(int fd);
/**
- * Set to true, if all incomming and outgoing packets should be
- * dropped. Note, that the node, owning @a fd, thinks, that its
- * packets are sent.
+ * Drop rate of incomming and outgoing packets. Note, that even if
+ * a packet is dropped on send, the node, owning @a fd, still
+ * thinks, that the packet is sent. It is not a sender-visible
+ * error.
*/
void
-swim_test_transport_set_drop(int fd, bool value);
+swim_test_transport_set_drop(int fd, double value);
/** Initialize test transport system. */
void
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 02149f256..015447a55 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -177,7 +177,7 @@ swim_cluster_unblock_io(struct swim_cluster *cluster, int i)
}
void
-swim_cluster_set_drop(struct swim_cluster *cluster, int i, bool value)
+swim_cluster_set_drop(struct swim_cluster *cluster, int i, double value)
{
swim_test_transport_set_drop(swim_fd(cluster->node[i]), value);
}
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 13781d037..69cd8fc70 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -80,7 +80,7 @@ void
swim_cluster_unblock_io(struct swim_cluster *cluster, int i);
void
-swim_cluster_set_drop(struct swim_cluster *cluster, int i, bool value);
+swim_cluster_set_drop(struct swim_cluster *cluster, int i, double value);
/**
* Explicitly add a member of id @a from_id to a member of id
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [tarantool-patches] [PATCH v2 6/6] swim: introduce dissemination component
2019-04-09 11:46 [tarantool-patches] [PATCH v2 0/6] swim dissemination Vladislav Shpilevoy
` (4 preceding siblings ...)
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 5/6] test: set packet drop rate instead of flag in swim tests Vladislav Shpilevoy
@ 2019-04-09 11:46 ` Vladislav Shpilevoy
2019-04-09 13:47 ` [tarantool-patches] " Konstantin Osipov
5 siblings, 1 reply; 8+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-09 11:46 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
Dissemination components broadcasts events about member status
updates. When any member attribute is updated (incarnation,
status, UUID, address), the member stands into an event queue.
Members from the queue are encoded into each round step message
with a higher priority and before anti-entropy section.
It means, then even if a cluster consists of hundreds of members
and one of them was updated on one of instances, this update will
be disseminated regardless of whether this memeber is encoded
into anti-entropy section or not. It drastically speeds events
dissemination up, according to the SWIM paper, and is noticed in
the tests.
Part of #3234
---
src/lib/swim/swim.c | 183 +++++++++++++++++++++++++++++++++++++-
src/lib/swim/swim.h | 4 +
src/lib/swim/swim_proto.c | 26 ++++++
src/lib/swim/swim_proto.h | 56 ++++++++++++
test/unit/swim.c | 48 ++++++++--
test/unit/swim.result | 22 +++--
6 files changed, 321 insertions(+), 18 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index a30a83886..eec5d7d25 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -269,6 +269,46 @@ struct swim_member {
* message to it.
*/
struct heap_node in_wait_ack_heap;
+ /**
+ *
+ * Dissemination component
+ *
+ * Dissemination component sends events. Event is a
+ * notification about some member state update. The member
+ * maintains a different event type for each significant
+ * attribute - status, incarnation, etc not to send entire
+ * member state each time any member attribute changes.
+ *
+ * According to SWIM, an event should be sent to all
+ * members at least once - for that a TTL (time-to-live)
+ * counter is maintained for each independent event type.
+ *
+ * When a member state changes, the TTL is reset to the
+ * cluster size. It is then decremented after each send.
+ * This guarantees that each member state change is sent
+ * to each SWIM member at least once. If a new event of
+ * the same type is generated before a round is finished,
+ * the current event object is updated in place with reset
+ * of the TTL.
+ *
+ * To conclude, TTL works in two ways: to see which
+ * specific member attribute needs dissemination and to
+ * track how many cluster members still need to learn
+ * about the change from this instance.
+ */
+ /**
+ * General TTL reset each time when any visible member
+ * attribute is updated. It is always bigger or equal than
+ * any other TTLs. In addition it helps to keep a dead
+ * member not dropped until the TTL gets zero so as to
+ * allow other members to learn the dead status.
+ */
+ int status_ttl;
+ /**
+ * All created events are put into a queue sorted by event
+ * time.
+ */
+ struct rlist in_dissemination_queue;
};
#define mh_name _swim_table
@@ -364,6 +404,17 @@ struct swim {
struct ev_timer wait_ack_tick;
/** GC state saying how to remove dead members. */
enum swim_gc_mode gc_mode;
+ /**
+ *
+ * Dissemination component
+ */
+ /**
+ * Queue of all members which have dissemination
+ * information. A member is added to the queue whenever
+ * any of its attributes changes, and stays in the queue
+ * as long as the event TTL is non-zero.
+ */
+ struct rlist dissemination_queue;
};
/** Put the member into a list of ACK waiters. */
@@ -377,6 +428,26 @@ swim_wait_ack(struct swim *swim, struct swim_member *member)
}
}
+/**
+ * On literally any update of a member it is added to a queue of
+ * members to disseminate updates. Regardless of other TTLs, each
+ * update also resets status TTL. Status TTL is always greater
+ * than any other event-related TTL, so it's sufficient to look at
+ * it alone to see that a member needs information dissemination.
+ * The status change itself occupies only 2 bytes in a packet, so
+ * it is cheap to send it on any update, while does reduce
+ * entropy.
+ */
+static inline void
+swim_register_event(struct swim *swim, struct swim_member *member)
+{
+ if (rlist_empty(&member->in_dissemination_queue)) {
+ rlist_add_tail_entry(&swim->dissemination_queue, member,
+ in_dissemination_queue);
+ }
+ member->status_ttl = mh_size(swim->members);
+}
+
/**
* Make all needed actions to process a member's update like a
* change of its status, or incarnation, or both.
@@ -384,8 +455,8 @@ swim_wait_ack(struct swim *swim, struct swim_member *member)
static void
swim_on_member_update(struct swim *swim, struct swim_member *member)
{
- (void) swim;
member->unacknowledged_pings = 0;
+ swim_register_event(swim, member);
}
/**
@@ -468,6 +539,9 @@ swim_member_delete(struct swim_member *member)
swim_task_destroy(&member->ack_task);
swim_task_destroy(&member->ping_task);
+ /* Dissemination component. */
+ assert(rlist_empty(&member->in_dissemination_queue));
+
free(member);
}
@@ -509,6 +583,10 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
swim_task_create(&member->ack_task, NULL, NULL, "ack");
swim_task_create(&member->ping_task, swim_ping_task_complete, NULL,
"ping");
+
+ /* Dissemination component. */
+ rlist_create(&member->in_dissemination_queue);
+
return member;
}
@@ -531,6 +609,9 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
if (! heap_node_is_stray(&member->in_wait_ack_heap))
wait_ack_heap_delete(&swim->wait_ack_heap, member);
+ /* Dissemination component. */
+ rlist_del_entry(member, in_dissemination_queue);
+
swim_member_delete(member);
}
@@ -590,6 +671,10 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
}
if (mh_size(swim->members) > 1)
swim_ev_timer_start(loop(), &swim->round_tick);
+
+ /* Dissemination component. */
+ swim_on_member_update(swim, member);
+
say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
swim_uuid_str(&member->uuid), mh_size(swim->members));
return member;
@@ -727,6 +812,43 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
return 1;
}
+/**
+ * Encode dissemination component.
+ * @retval Number of key-values added to the packet's root map.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
+{
+ struct swim_diss_header_bin diss_header_bin;
+ int size = sizeof(diss_header_bin);
+ char *header = swim_packet_reserve(packet, size);
+ if (header == NULL)
+ return 0;
+ int i = 0;
+ char *pos = header + size;
+ struct swim_member *m;
+ struct swim_event_bin event_bin;
+ swim_event_bin_create(&event_bin);
+ rlist_foreach_entry(m, &swim->dissemination_queue,
+ in_dissemination_queue) {
+ int new_size = size + sizeof(event_bin);
+ if (swim_packet_reserve(packet, new_size) == NULL)
+ break;
+ swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
+ m->incarnation);
+ memcpy(pos, &event_bin, sizeof(event_bin));
+ pos += sizeof(event_bin);
+ size = new_size;
+ ++i;
+ }
+ if (i == 0)
+ return 0;
+ swim_diss_header_bin_create(&diss_header_bin, i);
+ memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+ swim_packet_advance(packet, size);
+ return 1;
+}
+
/** Encode SWIM components into a UDP packet. */
static void
swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
@@ -737,12 +859,36 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
map_size += swim_encode_src_uuid(swim, packet);
map_size += swim_encode_failure_detection(swim, packet,
SWIM_FD_MSG_PING);
+ map_size += swim_encode_dissemination(swim, packet);
map_size += swim_encode_anti_entropy(swim, packet);
assert(mp_sizeof_map(map_size) == 1 && map_size >= 2);
mp_encode_map(header, map_size);
}
+/**
+ * Decrement TTLs of all events. It is done after each round step.
+ * Note, since we decrement TTL of all events, even those which
+ * have not been actually encoded and sent, if there are more
+ * events than can fit into a packet, the tail of the queue begins
+ * reeking and rotting. The most recently added members could even
+ * be deleted without being sent once. This is, however, very
+ * unlikely, since even 1000 bytes can fit 37 events containing
+ * ~27 bytes each, which means only happens upon a failure of 37
+ * instances. In such a case event loss is the mildest problem to
+ * deal with.
+ */
+static void
+swim_decrease_event_ttl(struct swim *swim)
+{
+ struct swim_member *member, *tmp;
+ rlist_foreach_entry_safe(member, &swim->dissemination_queue,
+ in_dissemination_queue, tmp) {
+ if (--member->status_ttl == 0)
+ rlist_del_entry(member, in_dissemination_queue);
+ }
+}
+
/**
* Once per specified timeout trigger a next round step. In round
* step a next memeber is taken from the round queue and a round
@@ -799,10 +945,12 @@ swim_complete_step(struct swim_task *task,
rlist_shift(&swim->round_queue);
if (rc > 0) {
/*
- * Each round message contains failure
- * detection section with a ping.
+ * Each round message contains
+ * dissemination and failure detection
+ * sections.
*/
swim_wait_ack(swim, m);
+ swim_decrease_event_ttl(swim);
}
}
}
@@ -872,7 +1020,7 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
break;
case MEMBER_DEAD:
if (m->unacknowledged_pings >= NO_ACKS_TO_GC &&
- swim->gc_mode == SWIM_GC_ON) {
+ swim->gc_mode == SWIM_GC_ON && m->status_ttl == 0) {
swim_delete_member(swim, m);
continue;
}
@@ -1121,6 +1269,18 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
return 0;
}
+/**
+ * Decode a dissemination message. Schedule new events, update
+ * members.
+ */
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+ say_verbose("SWIM %d: process dissemination", swim_fd(swim));
+ const char *prefix = "invald dissemination message:";
+ return swim_process_members(swim, prefix, pos, end);
+}
+
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
@@ -1160,6 +1320,10 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
src, &uuid) != 0)
goto error;
break;
+ case SWIM_DISSEMINATION:
+ if (swim_process_dissemination(swim, &pos, end) != 0)
+ goto error;
+ break;
default:
diag_set(SwimError, "%s unexpected key", prefix);
goto error;
@@ -1199,6 +1363,10 @@ swim_new(void)
ACK_TIMEOUT_DEFAULT, 0);
swim->wait_ack_tick.data = (void *) swim;
swim->gc_mode = SWIM_GC_ON;
+
+ /* Dissemination component. */
+ rlist_create(&swim->dissemination_queue);
+
return swim;
}
@@ -1393,6 +1561,12 @@ swim_info(struct swim *swim, struct info_handler *info)
info_end(info);
}
+int
+swim_size(const struct swim *swim)
+{
+ return mh_size(swim->members);
+}
+
void
swim_delete(struct swim *swim)
{
@@ -1407,6 +1581,7 @@ swim_delete(struct swim *swim)
rlist_del_entry(m, in_round_queue);
if (! heap_node_is_stray(&m->in_wait_ack_heap))
wait_ack_heap_delete(&swim->wait_ack_heap, m);
+ rlist_del_entry(m, in_dissemination_queue);
swim_member_delete(m);
}
wait_ack_heap_destroy(&swim->wait_ack_heap);
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 94ddc5dfa..ec924f36f 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -140,6 +140,10 @@ swim_probe_member(struct swim *swim, const char *uri);
void
swim_info(struct swim *swim, struct info_handler *info);
+/** Get SWIM member table size. */
+int
+swim_size(const struct swim *swim);
+
/** Get a SWIM member, describing this instance. */
const struct swim_member *
swim_self(struct swim *swim);
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 416e1d99e..6b3197790 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -358,6 +358,32 @@ swim_member_bin_create(struct swim_member_bin *header)
swim_passport_bin_create(&header->passport);
}
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+ uint16_t batch_size)
+{
+ header->k_header = SWIM_DISSEMINATION;
+ header->m_header = 0xdc;
+ header->v_header = mp_bswap_u16(batch_size);
+}
+
+void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+ swim_passport_bin_create(&header->passport);
+}
+
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+ enum swim_member_status status,
+ const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+ uint64_t incarnation)
+{
+ header->m_header = 0x85;
+ swim_passport_bin_fill(&header->passport, addr, uuid, status,
+ incarnation);
+}
+
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
const struct sockaddr_in *src)
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 0e73f37fb..826443a3b 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -61,6 +61,19 @@
* | |
* | OR/AND |
* | |
+ * | SWIM_DISSEMINATION: [ |
+ * | { |
+ * | SWIM_MEMBER_STATUS: uint, enum member_status, |
+ * | SWIM_MEMBER_ADDRESS: uint, ip, |
+ * | SWIM_MEMBER_PORT: uint, port, |
+ * | SWIM_MEMBER_UUID: 16 byte UUID, |
+ * | SWIM_MEMBER_INCARNATION: uint |
+ * | }, |
+ * | ... |
+ * | ], |
+ * | |
+ * | OR/AND |
+ * | |
* | SWIM_ANTI_ENTROPY: [ |
* | { |
* | SWIM_MEMBER_STATUS: uint, enum member_status, |
@@ -114,6 +127,7 @@ enum swim_body_key {
SWIM_SRC_UUID = 0,
SWIM_ANTI_ENTROPY,
SWIM_FAILURE_DETECTION,
+ SWIM_DISSEMINATION,
};
/**
@@ -308,6 +322,48 @@ swim_member_bin_fill(struct swim_member_bin *header,
/** }}} Anti-entropy component */
+/** {{{ Dissemination component */
+
+/** SWIM dissemination MessagePack template. */
+struct PACKED swim_diss_header_bin {
+ /** mp_encode_uint(SWIM_DISSEMINATION) */
+ uint8_t k_header;
+ /** mp_encode_array() */
+ uint8_t m_header;
+ uint16_t v_header;
+};
+
+/** Initialize dissemination header. */
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+ uint16_t batch_size);
+
+/** SWIM event MessagePack template. */
+struct PACKED swim_event_bin {
+ /** mp_encode_map(5 or 6) */
+ uint8_t m_header;
+ /** Basic member info like status, address. */
+ struct swim_passport_bin passport;
+};
+
+/** Initialize dissemination record. */
+void
+swim_event_bin_create(struct swim_event_bin *header);
+
+/**
+ * Since usually there are many evnets, it is faster to reset a
+ * few fields in an existing template, then each time create a
+ * new template. So the usage pattern is create(), fill(),
+ * fill() ... .
+ */
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+ enum swim_member_status status,
+ const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+ uint64_t incarnation);
+
+/** }}} Dissemination component */
+
/** {{{ Meta component */
/**
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 002ea1a5b..4412e252a 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -98,7 +98,7 @@ swim_test_sequence(void)
static void
swim_test_uuid_update(void)
{
- swim_test_start(4);
+ swim_test_start(5);
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_add_link(cluster, 0, 1);
@@ -109,6 +109,7 @@ swim_test_uuid_update(void)
is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), 0, "UUID update");
is(swim_cluster_wait_fullmesh(cluster, 1), 0,
"old UUID is returned back as a 'ghost' member");
+ is(swim_size(s), 3, "two members in each + ghost third member");
new_uuid.time_low = 2;
is(swim_cfg(s, NULL, -1, -1, -1, &new_uuid), -1,
"can not update to an existing UUID - swim_cfg fails");
@@ -248,11 +249,12 @@ swim_test_basic_failure_detection(void)
is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 0.1), 0,
"but it is dead after one more");
- is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX,
- 0.9), -1,
- "after 1 more unack the member still is not deleted");
- is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX,
- 0.1), 0, "but it is dropped after 1 more");
+ swim_test_run_for(1);
+ is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD, "after 2 "\
+ "more unacks the member still is not deleted - dissemination TTL "\
+ "keeps it");
+ is(swim_cluster_wait_status(cluster, 0, 1, swim_member_status_MAX, 2),
+ 0, "but it is dropped after 2 rounds when TTL gets 0");
/*
* After IO unblock pending messages will be processed all
@@ -381,7 +383,7 @@ swim_test_too_big_packet(void)
int size = 50;
double ack_timeout = 1;
double first_dead_timeout = 20;
- double everywhere_dead_timeout = size * 3;
+ double everywhere_dead_timeout = size;
int drop_id = size / 2;
struct swim_cluster *cluster = swim_cluster_new(size);
@@ -420,6 +422,35 @@ swim_test_too_big_packet(void)
swim_test_finish();
}
+static void
+swim_test_packet_loss(void)
+{
+ double network_drop_rate[] = {5, 10, 20, 50, 90};
+ swim_test_start(lengthof(network_drop_rate));
+ int size = 20;
+ int drop_id = 0;
+ double ack_timeout = 1;
+
+ for (int i = 0; i < (int) lengthof(network_drop_rate); ++i) {
+ double rate = network_drop_rate[i];
+ struct swim_cluster *cluster = swim_cluster_new(size);
+ for (int j = 0; j < size; ++j) {
+ swim_cluster_set_drop(cluster, j, rate);
+ for (int k = 0; k < size; ++k)
+ swim_cluster_add_link(cluster, j, k);
+ }
+ swim_cluster_set_ack_timeout(cluster, ack_timeout);
+ swim_cluster_set_drop(cluster, drop_id, 100);
+ swim_cluster_set_gc(cluster, SWIM_GC_OFF);
+ double timeout = size * 100.0 / (100 - rate);
+ is(swim_cluster_wait_status_everywhere(cluster, drop_id,
+ MEMBER_DEAD, 1000), 0,
+ "drop rate = %.2f, but the failure is disseminated", rate);
+ swim_cluster_delete(cluster);
+ }
+ swim_test_finish();
+}
+
static void
swim_test_undead(void)
{
@@ -442,7 +473,7 @@ swim_test_undead(void)
static int
main_f(va_list ap)
{
- swim_test_start(11);
+ swim_test_start(12);
(void) ap;
swim_test_ev_init();
@@ -459,6 +490,7 @@ main_f(va_list ap)
swim_test_basic_gossip();
swim_test_too_big_packet();
swim_test_undead();
+ swim_test_packet_loss();
swim_test_transport_free();
swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 3393870c2..615327e27 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
*** main_f ***
-1..11
+1..12
*** swim_test_one_link ***
1..6
ok 1 - no rounds - no fullmesh
@@ -16,11 +16,12 @@ ok 1 - subtests
ok 2 - subtests
*** swim_test_sequence: done ***
*** swim_test_uuid_update ***
- 1..4
+ 1..5
ok 1 - UUID update
ok 2 - old UUID is returned back as a 'ghost' member
- ok 3 - can not update to an existing UUID - swim_cfg fails
- ok 4 - diag says 'exists'
+ ok 3 - two members in each + ghost third member
+ ok 4 - can not update to an existing UUID - swim_cfg fails
+ ok 5 - diag says 'exists'
ok 3 - subtests
*** swim_test_uuid_update: done ***
*** swim_test_cfg ***
@@ -65,8 +66,8 @@ ok 5 - subtests
ok 1 - node is added as alive
ok 2 - member still is not dead after 2 noacks
ok 3 - but it is dead after one more
- ok 4 - after 1 more unack the member still is not deleted
- ok 5 - but it is dropped after 1 more
+ ok 4 - after 2 more unacks the member still is not deleted - dissemination TTL keeps it
+ ok 5 - but it is dropped after 2 rounds when TTL gets 0
ok 6 - fullmesh is restored
ok 7 - a member is added back on an ACK
ok 6 - subtests
@@ -106,4 +107,13 @@ ok 10 - subtests
ok 2 - but it is never deleted due to the cfg option
ok 11 - subtests
*** swim_test_undead: done ***
+ *** swim_test_packet_loss ***
+ 1..5
+ ok 1 - drop rate = 5.00, but the failure is disseminated
+ ok 2 - drop rate = 10.00, but the failure is disseminated
+ ok 3 - drop rate = 20.00, but the failure is disseminated
+ ok 4 - drop rate = 50.00, but the failure is disseminated
+ ok 5 - drop rate = 90.00, but the failure is disseminated
+ok 12 - subtests
+ *** swim_test_packet_loss: done ***
*** main_f: done ***
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [tarantool-patches] Re: [PATCH v2 6/6] swim: introduce dissemination component
2019-04-09 11:46 ` [tarantool-patches] [PATCH v2 6/6] swim: introduce dissemination component Vladislav Shpilevoy
@ 2019-04-09 13:47 ` Konstantin Osipov
0 siblings, 0 replies; 8+ messages in thread
From: Konstantin Osipov @ 2019-04-09 13:47 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/09 14:49]:
> Dissemination components broadcasts events about member status
> updates. When any member attribute is updated (incarnation,
> status, UUID, address), the member stands into an event queue.
> Members from the queue are encoded into each round step message
> with a higher priority and before anti-entropy section.
>
> It means, then even if a cluster consists of hundreds of members
> and one of them was updated on one of instances, this update will
> be disseminated regardless of whether this memeber is encoded
> into anti-entropy section or not. It drastically speeds events
> dissemination up, according to the SWIM paper, and is noticed in
> the tests.
>
> Part of #3234
I have pushed the entire stack and my own rename to master at
13:06. It certainly didn't take me the whole day to do a rename: I
did the entire thing, plus ran tests in 15 minutes after lunch.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 8+ messages in thread