* [tarantool-patches] [PATCH 1/6] test: rename swim_cluster_node to swim_cluster_member
2019-04-24 14:36 [tarantool-patches] [PATCH 0/6] swim suspicion Vladislav Shpilevoy
@ 2019-04-24 14:36 ` Vladislav Shpilevoy
2019-04-24 16:37 ` [tarantool-patches] " Konstantin Osipov
2019-04-24 14:36 ` [tarantool-patches] [PATCH 2/6] test: remove swim packet filter destructors Vladislav Shpilevoy
` (4 subsequent siblings)
5 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 14:36 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
There are two different structures - public struct swim_member
exposed by SWIM API, and struct swim_node defined and used inside
tests. Before this patch swim_cluster_node() was returning struct
swim_member, just historically. But more and more places appear
where it is wanted to safely take struct swim_node, not
swim_member, with an appropriate assertion on an invalid index.
This patch renames swim_cluster_node() to swim_cluster_member(),
and introduces new swim_cluster_node() returning swim_node.
---
test/unit/swim.c | 28 ++++++++++----------
test/unit/swim_test_utils.c | 51 +++++++++++++++++++++----------------
test/unit/swim_test_utils.h | 2 +-
3 files changed, 44 insertions(+), 37 deletions(-)
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 1cdfd45aa..e375e6607 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -105,7 +105,7 @@ swim_test_uuid_update(void)
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_add_link(cluster, 0, 1);
fail_if(swim_cluster_wait_fullmesh(cluster, 1) != 0);
- struct swim *s = swim_cluster_node(cluster, 0);
+ struct swim *s = swim_cluster_member(cluster, 0);
struct tt_uuid old_uuid = *swim_member_uuid(swim_self(s));
struct tt_uuid new_uuid = uuid_nil;
new_uuid.time_low = 1000;
@@ -180,8 +180,8 @@ swim_test_add_remove(void)
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_add_link(cluster, 0, 1);
fail_if(swim_cluster_wait_fullmesh(cluster, 1) != 0);
- struct swim *s1 = swim_cluster_node(cluster, 0);
- struct swim *s2 = swim_cluster_node(cluster, 1);
+ struct swim *s1 = swim_cluster_member(cluster, 0);
+ struct swim *s2 = swim_cluster_member(cluster, 1);
const struct swim_member *s2_self = swim_self(s2);
is(swim_add_member(s1, swim_member_uri(s2_self),
@@ -272,8 +272,8 @@ swim_test_basic_failure_detection(void)
swim_cluster_block_io(cluster, 1);
/* Next round after 1 sec + let ping hang for 0.25 sec. */
swim_run_for(1.25);
- struct swim *s1 = swim_cluster_node(cluster, 0);
- struct swim *s2 = swim_cluster_node(cluster, 1);
+ struct swim *s1 = swim_cluster_member(cluster, 0);
+ struct swim *s2 = swim_cluster_member(cluster, 1);
const struct swim_member *s2_self = swim_self(s2);
swim_remove_member(s1, swim_member_uuid(s2_self));
swim_cluster_unblock_io(cluster, 1);
@@ -343,8 +343,8 @@ swim_test_probe(void)
swim_start_test(2);
struct swim_cluster *cluster = swim_cluster_new(2);
- struct swim *s1 = swim_cluster_node(cluster, 0);
- struct swim *s2 = swim_cluster_node(cluster, 1);
+ struct swim *s1 = swim_cluster_member(cluster, 0);
+ struct swim *s2 = swim_cluster_member(cluster, 1);
const char *s2_uri = swim_member_uri(swim_self(s2));
is(swim_probe_member(s1, s2_uri), 0, "send probe");
is(swim_cluster_wait_fullmesh(cluster, 0.1), 0,
@@ -503,11 +503,11 @@ swim_test_quit(void)
* received the 'quit' message with the same UUID. Of
* course, it should be refuted.
*/
- struct swim *s0 = swim_cluster_node(cluster, 0);
+ struct swim *s0 = swim_cluster_member(cluster, 0);
struct tt_uuid s0_uuid = *swim_member_uuid(swim_self(s0));
- struct swim *s1 = swim_cluster_node(cluster, 1);
+ struct swim *s1 = swim_cluster_member(cluster, 1);
swim_remove_member(s1, &s0_uuid);
- struct swim *s2 = swim_cluster_node(cluster, 2);
+ struct swim *s2 = swim_cluster_member(cluster, 2);
swim_remove_member(s2, &s0_uuid);
swim_cluster_quit_node(cluster, 0);
@@ -578,7 +578,7 @@ swim_test_uri_update(void)
swim_cluster_add_link(cluster, 2, 1);
swim_cluster_add_link(cluster, 2, 0);
- struct swim *s0 = swim_cluster_node(cluster, 0);
+ struct swim *s0 = swim_cluster_member(cluster, 0);
const struct swim_member *s0_self = swim_self(s0);
const char *new_s0_uri = "127.0.0.5:1";
fail_if(swim_cfg(s0, "127.0.0.5:1", -1, -1, -1, NULL) != 0);
@@ -586,7 +586,7 @@ swim_test_uri_update(void)
* Since S1 knows about S2 only, one round step is enough.
*/
swim_run_for(1);
- struct swim *s1 = swim_cluster_node(cluster, 1);
+ struct swim *s1 = swim_cluster_member(cluster, 1);
const struct swim_member *s0_view =
swim_member_by_uuid(s1, swim_member_uuid(s0_self));
is(strcmp(new_s0_uri, swim_member_uri(s0_view)), 0,
@@ -619,8 +619,8 @@ swim_test_broadcast(void)
swim_start_test(6);
int size = 4;
struct swim_cluster *cluster = swim_cluster_new(size);
- struct swim *s0 = swim_cluster_node(cluster, 0);
- struct swim *s1 = swim_cluster_node(cluster, 1);
+ struct swim *s0 = swim_cluster_member(cluster, 0);
+ struct swim *s1 = swim_cluster_member(cluster, 1);
const char *s1_uri = swim_member_uri(swim_self(s1));
struct uri u;
fail_if(uri_parse(&u, s1_uri) != 0 || u.service == NULL);
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 45570cce5..4f488fb7b 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -137,12 +137,25 @@ swim_cluster_delete(struct swim_cluster *cluster)
free(cluster);
}
+/** Safely get node of @a cluster with id @a i. */
+static inline struct swim_node *
+swim_cluster_node(struct swim_cluster *cluster, int i)
+{
+ assert(i >= 0 && i < cluster->size);
+ return &cluster->node[i];
+}
+
+struct swim *
+swim_cluster_member(struct swim_cluster *cluster, int i)
+{
+ return swim_cluster_node(cluster, i)->swim;
+}
+
int
swim_cluster_update_uuid(struct swim_cluster *cluster, int i,
const struct tt_uuid *new_uuid)
{
- assert(i >= 0 && i < cluster->size);
- struct swim_node *n = &cluster->node[i];
+ struct swim_node *n = swim_cluster_node(cluster, i);
if (swim_cfg(n->swim, NULL, -1, -1, -1, new_uuid) != 0)
return -1;
n->uuid = *new_uuid;
@@ -152,9 +165,10 @@ swim_cluster_update_uuid(struct swim_cluster *cluster, int i,
int
swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id)
{
- const struct swim_member *from = swim_self(cluster->node[from_id].swim);
- return swim_add_member(cluster->node[to_id].swim, swim_member_uri(from),
- swim_member_uuid(from));
+ const struct swim_member *from =
+ swim_self(swim_cluster_member(cluster, from_id));
+ return swim_add_member(swim_cluster_member(cluster, to_id),
+ swim_member_uri(from), swim_member_uuid(from));
}
static const struct swim_member *
@@ -165,8 +179,9 @@ swim_cluster_member_view(struct swim_cluster *cluster, int node_id,
* Do not use node[member_id].swim - it can be NULL
* already, for example, in case of quit or deletion.
*/
- return swim_member_by_uuid(cluster->node[node_id].swim,
- &cluster->node[member_id].uuid);
+ struct swim_node *n = swim_cluster_node(cluster, member_id);
+ return swim_member_by_uuid(swim_cluster_member(cluster, node_id),
+ &n->uuid);
}
enum swim_member_status
@@ -208,22 +223,14 @@ int
swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
const char *payload, uint16_t size)
{
- struct swim *s = swim_cluster_node(cluster, i);
+ struct swim *s = swim_cluster_member(cluster, i);
return swim_set_payload(s, payload, size);
}
-struct swim *
-swim_cluster_node(struct swim_cluster *cluster, int i)
-{
- assert(i >= 0 && i < cluster->size);
- return cluster->node[i].swim;
-}
-
void
swim_cluster_quit_node(struct swim_cluster *cluster, int i)
{
- assert(i >= 0 && i < cluster->size);
- struct swim_node *n = &cluster->node[i];
+ struct swim_node *n = swim_cluster_node(cluster, i);
assert(tt_uuid_is_equal(&n->uuid,
swim_member_uuid(swim_self(n->swim))));
swim_quit(n->swim);
@@ -233,8 +240,7 @@ swim_cluster_quit_node(struct swim_cluster *cluster, int i)
void
swim_cluster_restart_node(struct swim_cluster *cluster, int i)
{
- assert(i >= 0 && i < cluster->size);
- struct swim_node *n = &cluster->node[i];
+ struct swim_node *n = swim_cluster_node(cluster, i);
struct swim *s = n->swim;
char uri[128];
swim_cluster_id_to_uri(uri, i);
@@ -261,7 +267,8 @@ swim_cluster_block_io(struct swim_cluster *cluster, int i)
void
swim_cluster_unblock_io(struct swim_cluster *cluster, int i)
{
- swim_test_transport_unblock_fd(swim_fd(cluster->node[i].swim));
+ struct swim *s = swim_cluster_member(cluster, i);
+ swim_test_transport_unblock_fd(swim_fd(s));
}
/** A structure used by drop rate packet filter. */
@@ -308,7 +315,7 @@ static void
swim_cluster_set_drop_generic(struct swim_cluster *cluster, int i,
double value, bool is_for_in, bool is_for_out)
{
- int fd = swim_fd(swim_cluster_node(cluster, i));
+ int fd = swim_fd(swim_cluster_member(cluster, i));
if (value == 0) {
swim_test_transport_remove_filter(fd, swim_filter_drop_rate);
return;
@@ -374,7 +381,7 @@ void
swim_cluster_drop_components(struct swim_cluster *cluster, int i,
const int *keys, int key_count)
{
- int fd = swim_fd(swim_cluster_node(cluster, i));
+ int fd = swim_fd(swim_cluster_member(cluster, i));
if (key_count == 0) {
swim_test_transport_remove_filter(fd,
swim_filter_drop_component);
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 100a67e0c..145af9b1f 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -70,7 +70,7 @@ swim_error_check_match(const char *msg);
/** Get a SWIM instance by its ordinal number. */
struct swim *
-swim_cluster_node(struct swim_cluster *cluster, int i);
+swim_cluster_member(struct swim_cluster *cluster, int i);
/** Quit a member with id @a id. */
void
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] [PATCH 2/6] test: remove swim packet filter destructors
2019-04-24 14:36 [tarantool-patches] [PATCH 0/6] swim suspicion Vladislav Shpilevoy
2019-04-24 14:36 ` [tarantool-patches] [PATCH 1/6] test: rename swim_cluster_node to swim_cluster_member Vladislav Shpilevoy
@ 2019-04-24 14:36 ` Vladislav Shpilevoy
2019-04-24 16:37 ` [tarantool-patches] " Konstantin Osipov
2019-04-24 14:36 ` [tarantool-patches] [PATCH 3/6] test: introduce swim packet filter by destination address Vladislav Shpilevoy
` (3 subsequent siblings)
5 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 14:36 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
Swim test packet filters are supposed to filter out packets
matching certain criteria or with a probability. They were
implemented as a filter-function and userdata passed into the
former on each invocation. Usually it was allocated on heap and
needed deletion. But it appeared that much simpler is to store
the filters inside struct swim_node, pass it as userdata, and get
rid of userdata destructors and dynamic allocations.
The patch is motivated by necessity to add one new filter, which
anyway will need struct swim_node as userdata.
---
test/unit/swim_test_transport.c | 14 +---
test/unit/swim_test_transport.h | 11 +--
test/unit/swim_test_utils.c | 127 +++++++++++++++++++++-----------
3 files changed, 86 insertions(+), 66 deletions(-)
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index d82066388..513274c2c 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -109,11 +109,6 @@ swim_test_packet_dup(struct swim_test_packet *p)
struct swim_fd_filter {
/** A function to decide whether to drop a packet. */
swim_test_filter_check_f check;
- /**
- * A function called when the filter is deleted to free
- * @a udata if necessary.
- */
- swim_test_filter_delete_f delete;
/**
* Arbitrary user data. Passed to each call of @a check.
*/
@@ -124,14 +119,12 @@ struct swim_fd_filter {
/** Create a new filter. */
static inline struct swim_fd_filter *
-swim_fd_filter_new(swim_test_filter_check_f check,
- swim_test_filter_delete_f delete, void *udata)
+swim_fd_filter_new(swim_test_filter_check_f check, void *udata)
{
struct swim_fd_filter *f = (struct swim_fd_filter *) malloc(sizeof(*f));
assert(f != NULL);
f->udata = udata;
f->check = check;
- f->delete = delete;
rlist_create(&f->in_filters);
return f;
}
@@ -141,7 +134,6 @@ static inline void
swim_fd_filter_delete(struct swim_fd_filter *filter)
{
rlist_del_entry(filter, in_filters);
- filter->delete(filter->udata);
free(filter);
}
@@ -211,11 +203,11 @@ swim_test_transport_remove_filter(int fd, swim_test_filter_check_f check)
void
swim_test_transport_add_filter(int fd, swim_test_filter_check_f check,
- swim_test_filter_delete_f delete, void *udata)
+ void *udata)
{
struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
assert(sfd->is_opened);
- struct swim_fd_filter *f = swim_fd_filter_new(check, delete, udata);
+ struct swim_fd_filter *f = swim_fd_filter_new(check, udata);
swim_test_transport_remove_filter(fd, check);
rlist_add_tail_entry(&sfd->filters, f, in_filters);
}
diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h
index d166dc600..a66cf2d81 100644
--- a/test/unit/swim_test_transport.h
+++ b/test/unit/swim_test_transport.h
@@ -50,13 +50,6 @@ struct ev_loop;
typedef bool (*swim_test_filter_check_f)(const char *data, int size,
void *udata, int dir);
-/**
- * It is possible that a filter is complex and uses helper data
- * allocated somewhere. This function is called when the filter
- * is dropped and allows to free user data.
- */
-typedef void (*swim_test_filter_delete_f)(void *udata);
-
/**
* Until there are no new IO events, feed EV_WRITE event to all
* opened descriptors; EV_READ to ones, who have not empty recv
@@ -84,14 +77,12 @@ swim_test_transport_unblock_fd(int fd);
* @param fd File descriptor to add filter to.
* @param check Check function. It is called for each packet and
* should return true, when the packet should be dropped.
- * @param delete A destructor for @a udata called when the filter
- * is dropped.
* @param udata Arbitrary user data, passed to each @a check
* invocation.
*/
void
swim_test_transport_add_filter(int fd, swim_test_filter_check_f check,
- swim_test_filter_delete_f delete, void *udata);
+ void *udata);
/**
* Remove a filter having @a check function. Works just like the
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 4f488fb7b..0beeab65d 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -37,6 +37,49 @@
#include "fiber.h"
#include "msgpuck.h"
+/**
+ * Drop rate packet filter to drop packets with a certain
+ * probability.
+ */
+struct swim_drop_rate {
+ /** True if should be applied to incoming packets. */
+ bool is_for_in;
+ /** True if should be applied to outgoing packets. */
+ bool is_for_out;
+ /** Drop rate percentage. */
+ double rate;
+};
+
+/** Initialize drop rate packet filter. */
+static inline void
+swim_drop_rate_create(struct swim_drop_rate *dr, double rate, bool is_for_in,
+ bool is_for_out)
+{
+ dr->is_for_in = is_for_in;
+ dr->is_for_out = is_for_out;
+ dr->rate = rate;
+}
+
+/**
+ * Drop components packet filter to drop packets containing
+ * specified SWIM components.
+ */
+struct swim_drop_components {
+ /** List of component body keys. */
+ const int *keys;
+ /** Length of @a keys. */
+ int key_count;
+};
+
+/** Initialize drop components packet filter. */
+static inline void
+swim_drop_components_create(struct swim_drop_components *dc, const int *keys,
+ int key_count)
+{
+ dc->keys = keys;
+ dc->key_count = key_count;
+}
+
/**
* SWIM cluster node and its UUID. UUID is stored separately
* because sometimes a test wants to drop a SWIM instance, but
@@ -52,6 +95,15 @@ struct swim_node {
* that instance.
*/
struct tt_uuid uuid;
+ /**
+ * Filter to drop packets with a certain probability
+ * from/to a specified direction.
+ */
+ struct swim_drop_rate drop_rate;
+ /**
+ * Filter to drop packets with specified SWIM components.
+ */
+ struct swim_drop_components drop_components;
};
/**
@@ -76,6 +128,24 @@ swim_cluster_id_to_uri(char *buffer, int id)
sprintf(buffer, "127.0.0.1:%d", id + 1);
}
+/** Create a SWIM cluster node @a n with a 0-based @a id. */
+static inline void
+swim_node_create(struct swim_node *n, int id)
+{
+ n->swim = swim_new();
+ assert(n->swim != NULL);
+ char uri[128];
+ swim_cluster_id_to_uri(uri, id);
+ n->uuid = uuid_nil;
+ n->uuid.time_low = id + 1;
+ int rc = swim_cfg(n->swim, uri, -1, -1, -1, &n->uuid);
+ assert(rc == 0);
+ (void) rc;
+
+ swim_drop_rate_create(&n->drop_rate, 0, false, false);
+ swim_drop_components_create(&n->drop_components, NULL, 0);
+}
+
struct swim_cluster *
swim_cluster_new(int size)
{
@@ -87,20 +157,9 @@ swim_cluster_new(int size)
res->size = size;
res->ack_timeout = -1;
res->gc_mode = SWIM_GC_DEFAULT;
- struct tt_uuid uuid;
- memset(&uuid, 0, sizeof(uuid));
- char *uri = tt_static_buf();
struct swim_node *n = res->node;
- for (int i = 0; i < size; ++i, ++n) {
- n->swim = swim_new();
- assert(n->swim != NULL);
- swim_cluster_id_to_uri(uri, i);
- uuid.time_low = i + 1;
- n->uuid = uuid;
- int rc = swim_cfg(n->swim, uri, -1, -1, -1, &uuid);
- assert(rc == 0);
- (void) rc;
- }
+ for (int i = 0; i < size; ++i, ++n)
+ swim_node_create(n, i);
return res;
}
@@ -271,16 +330,6 @@ swim_cluster_unblock_io(struct swim_cluster *cluster, int i)
swim_test_transport_unblock_fd(swim_fd(s));
}
-/** A structure used by drop rate packet filter. */
-struct swim_drop_rate {
- /** True if should be applied to incoming packets. */
- bool is_for_in;
- /** True if should be applied to outgoing packets. */
- bool is_for_out;
- /** Drop rate percentage. */
- double rate;
-};
-
/** Create a new drop rate filter helper. */
static inline struct swim_drop_rate *
swim_drop_rate_new(double rate, bool is_for_in, bool is_for_out)
@@ -315,14 +364,15 @@ static void
swim_cluster_set_drop_generic(struct swim_cluster *cluster, int i,
double value, bool is_for_in, bool is_for_out)
{
- int fd = swim_fd(swim_cluster_member(cluster, i));
+ struct swim_node *n = swim_cluster_node(cluster, i);
+ int fd = swim_fd(n->swim);
if (value == 0) {
swim_test_transport_remove_filter(fd, swim_filter_drop_rate);
return;
}
- struct swim_drop_rate *dr = swim_drop_rate_new(value, is_for_in,
- is_for_out);
- swim_test_transport_add_filter(fd, swim_filter_drop_rate, free, dr);
+ swim_drop_rate_create(&n->drop_rate, value, is_for_in, is_for_out);
+ swim_test_transport_add_filter(fd, swim_filter_drop_rate,
+ &n->drop_rate);
}
void
@@ -343,16 +393,6 @@ swim_cluster_set_drop_in(struct swim_cluster *cluster, int i, double value)
swim_cluster_set_drop_generic(cluster, i, value, true, false);
}
-/**
- * A list of components to drop used by component packet filter.
- */
-struct swim_drop_components {
- /** List of component body keys. */
- const int *keys;
- /** Length of @a keys. */
- int key_count;
-};
-
/**
* Check if a packet contains any of the components to filter out.
*/
@@ -381,19 +421,16 @@ void
swim_cluster_drop_components(struct swim_cluster *cluster, int i,
const int *keys, int key_count)
{
- int fd = swim_fd(swim_cluster_member(cluster, i));
+ struct swim_node *n = swim_cluster_node(cluster, i);
+ int fd = swim_fd(n->swim);
if (key_count == 0) {
swim_test_transport_remove_filter(fd,
swim_filter_drop_component);
return;
}
- struct swim_drop_components *dc =
- (struct swim_drop_components *) malloc(sizeof(*dc));
- assert(dc != NULL);
- dc->key_count = key_count;
- dc->keys = keys;
- swim_test_transport_add_filter(fd, swim_filter_drop_component, free,
- dc);
+ swim_drop_components_create(&n->drop_components, keys, key_count);
+ swim_test_transport_add_filter(fd, swim_filter_drop_component,
+ &n->drop_components);
}
/** Check if @a s1 knows every member of @a s2's table. */
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 2/6] test: remove swim packet filter destructors
2019-04-24 14:36 ` [tarantool-patches] [PATCH 2/6] test: remove swim packet filter destructors Vladislav Shpilevoy
@ 2019-04-24 16:37 ` Konstantin Osipov
0 siblings, 0 replies; 22+ messages in thread
From: Konstantin Osipov @ 2019-04-24 16:37 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/24 18:50]:
> Swim test packet filters are supposed to filter out packets
> matching certain criteria or with a probability. They were
> implemented as a filter-function and userdata passed into the
> former on each invocation. Usually it was allocated on heap and
> needed deletion. But it appeared that much simpler is to store
> the filters inside struct swim_node, pass it as userdata, and get
> rid of userdata destructors and dynamic allocations.
>
> The patch is motivated by necessity to add one new filter, which
> anyway will need struct swim_node as userdata.
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] [PATCH 3/6] test: introduce swim packet filter by destination address
2019-04-24 14:36 [tarantool-patches] [PATCH 0/6] swim suspicion Vladislav Shpilevoy
2019-04-24 14:36 ` [tarantool-patches] [PATCH 1/6] test: rename swim_cluster_node to swim_cluster_member Vladislav Shpilevoy
2019-04-24 14:36 ` [tarantool-patches] [PATCH 2/6] test: remove swim packet filter destructors Vladislav Shpilevoy
@ 2019-04-24 14:36 ` Vladislav Shpilevoy
2019-04-24 16:38 ` [tarantool-patches] " Konstantin Osipov
2019-04-24 14:36 ` [tarantool-patches] [PATCH 4/6] swim: wrap sio_strfaddr() Vladislav Shpilevoy
` (2 subsequent siblings)
5 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 14:36 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
The filter is going to be used to test the SWIM suspicion
component. The destination filter will break certain network
channels, and the suspicion component shall withstand that.
Part of #3234
---
test/unit/swim_test_transport.c | 10 +--
test/unit/swim_test_transport.h | 6 +-
test/unit/swim_test_utils.c | 114 +++++++++++++++++++++++++++++++-
test/unit/swim_test_utils.h | 11 +++
4 files changed, 133 insertions(+), 8 deletions(-)
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index 513274c2c..c4a1dd774 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -238,14 +238,16 @@ swim_fd_close(struct swim_fd *fd)
* Check all the packet filters if any wants to drop @a p packet.
* @a dir parameter says direction. Values are the same as for
* standard in/out descriptors: 0 for input, 1 for output.
+ * @a peer_fd says sender/receiver file descriptor depending on
+ * @a dir.
*/
static inline bool
swim_fd_test_if_drop(struct swim_fd *fd, const struct swim_test_packet *p,
- int dir)
+ int dir, int peer_fd)
{
struct swim_fd_filter *f;
rlist_foreach_entry(f, &fd->filters, in_filters) {
- if (f->check(p->data, p->size, f->udata, dir))
+ if (f->check(p->data, p->size, f->udata, dir, peer_fd))
return true;
}
return false;
@@ -380,8 +382,8 @@ static inline void
swim_move_packet(struct swim_fd *src, struct swim_fd *dst,
struct swim_test_packet *p)
{
- if (dst->is_opened && !swim_fd_test_if_drop(dst, p, 0) &&
- !swim_fd_test_if_drop(src, p, 1))
+ if (dst->is_opened && !swim_fd_test_if_drop(dst, p, 0, src->evfd) &&
+ !swim_fd_test_if_drop(src, p, 1, dst->evfd))
rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
else
swim_test_packet_delete(p);
diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h
index a66cf2d81..6235278d0 100644
--- a/test/unit/swim_test_transport.h
+++ b/test/unit/swim_test_transport.h
@@ -45,10 +45,12 @@ struct ev_loop;
* arbitrary user data, and should return true, if the packet
* should be dropped. False otherwise. Direction is said via
* @a dir parameter. 0 means incoming packet, 1 means outgoing
- * packet, just like standard IO descriptors.
+ * packet, just like standard IO descriptors. Via @a peer_fd
+ * parameter a sender/receiver descriptor number is passed
+ * depending on @a dir.
*/
typedef bool (*swim_test_filter_check_f)(const char *data, int size,
- void *udata, int dir);
+ void *udata, int dir, int peer_fd);
/**
* Until there are no new IO events, feed EV_WRITE event to all
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 0beeab65d..de1bef6e7 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -80,6 +80,67 @@ swim_drop_components_create(struct swim_drop_components *dc, const int *keys,
dc->key_count = key_count;
}
+/** Packet filter to drop packets with specified destinations. */
+struct swim_drop_channel {
+ /**
+ * An array of file descriptors to drop messages sent to
+ * them.
+ */
+ int *drop_fd;
+ /** Length of @a drop_fd. */
+ int drop_fd_size;
+ /** Capacity of @a drop_fd. */
+ int drop_fd_cap;
+};
+
+/** Initialize drop channel packet filter. */
+static inline void
+swim_drop_channel_create(struct swim_drop_channel *dc)
+{
+ dc->drop_fd = NULL;
+ dc->drop_fd_size = 0;
+ dc->drop_fd_cap = 0;
+}
+
+/**
+ * Set @a new_fd file descriptor into @a dc drop channel packet
+ * filter in place of @a old_fd descriptor. Just like dup2()
+ * system call.
+ * @retval 0 Success.
+ * @retval -1 @a old_fd is not found.
+ */
+static inline int
+swim_drop_channel_dup_fd(const struct swim_drop_channel *dc, int new_fd,
+ int old_fd)
+{
+ for (int i = 0; i < dc->drop_fd_size; ++i) {
+ if (dc->drop_fd[i] == old_fd) {
+ dc->drop_fd[i] = new_fd;
+ return 0;
+ }
+ }
+ return -1;
+}
+
+/** Add @a fd to @a dc drop channel packet filter. */
+static inline void
+swim_drop_channel_add_fd(struct swim_drop_channel *dc, int fd)
+{
+ if (swim_drop_channel_dup_fd(dc, fd, -1) == 0)
+ return;
+ dc->drop_fd_cap += dc->drop_fd_cap + 1;
+ int new_bsize = dc->drop_fd_cap * sizeof(int);
+ dc->drop_fd = (int *) realloc(dc->drop_fd, new_bsize);
+ dc->drop_fd[dc->drop_fd_size++] = fd;
+}
+
+/** Destroy drop channel packet filter. */
+static inline void
+swim_drop_channel_destroy(struct swim_drop_channel *dc)
+{
+ free(dc->drop_fd);
+}
+
/**
* SWIM cluster node and its UUID. UUID is stored separately
* because sometimes a test wants to drop a SWIM instance, but
@@ -104,6 +165,8 @@ struct swim_node {
* Filter to drop packets with specified SWIM components.
*/
struct swim_drop_components drop_components;
+ /** Filter to drop packets with specified destinations. */
+ struct swim_drop_channel drop_channel;
};
/**
@@ -144,6 +207,7 @@ swim_node_create(struct swim_node *n, int id)
swim_drop_rate_create(&n->drop_rate, 0, false, false);
swim_drop_components_create(&n->drop_components, NULL, 0);
+ swim_drop_channel_create(&n->drop_channel);
}
struct swim_cluster *
@@ -191,6 +255,7 @@ swim_cluster_delete(struct swim_cluster *cluster)
for (int i = 0; i < cluster->size; ++i) {
if (cluster->node[i].swim != NULL)
swim_delete(cluster->node[i].swim);
+ swim_drop_channel_destroy(&cluster->node[i].drop_channel);
}
free(cluster->node);
free(cluster);
@@ -347,10 +412,12 @@ swim_drop_rate_new(double rate, bool is_for_in, bool is_for_out)
* A packet filter dropping a packet with a certain probability.
*/
static bool
-swim_filter_drop_rate(const char *data, int size, void *udata, int dir)
+swim_filter_drop_rate(const char *data, int size, void *udata, int dir,
+ int peer_fd)
{
(void) data;
(void) size;
+ (void) peer_fd;
struct swim_drop_rate *dr = (struct swim_drop_rate *) udata;
if ((dir == 0 && !dr->is_for_in) || (dir == 1 && !dr->is_for_out))
return false;
@@ -397,10 +464,12 @@ swim_cluster_set_drop_in(struct swim_cluster *cluster, int i, double value)
* Check if a packet contains any of the components to filter out.
*/
static bool
-swim_filter_drop_component(const char *data, int size, void *udata, int dir)
+swim_filter_drop_component(const char *data, int size, void *udata, int dir,
+ int peer_fd)
{
(void) size;
(void) dir;
+ (void) peer_fd;
struct swim_drop_components *dc = (struct swim_drop_components *) udata;
/* Skip meta. */
mp_next(&data);
@@ -433,6 +502,47 @@ swim_cluster_drop_components(struct swim_cluster *cluster, int i,
&n->drop_components);
}
+/**
+ * Check if the packet sender should drop a packet outgoing to
+ * @a peer_fd file descriptor.
+ */
+static bool
+swim_filter_drop_channel(const char *data, int size, void *udata, int dir,
+ int peer_fd)
+{
+ (void) data;
+ (void) size;
+ if (dir != 1)
+ return false;
+ struct swim_drop_channel *dc = (struct swim_drop_channel *) udata;
+ /*
+ * Fullscan is totally ok - there are no more than 2-3
+ * blocks simultaneously in the tests.
+ */
+ for (int i = 0; i < dc->drop_fd_size; ++i) {
+ if (dc->drop_fd[i] == peer_fd)
+ return true;
+ }
+ return false;
+}
+
+void
+swim_cluster_set_drop_channel(struct swim_cluster *cluster, int from_id,
+ int to_id, bool value)
+{
+ int to_fd = swim_fd(swim_cluster_member(cluster, to_id));
+ struct swim_node *from_node = swim_cluster_node(cluster, from_id);
+ struct swim_drop_channel *dc = &from_node->drop_channel;
+ if (! value) {
+ swim_drop_channel_dup_fd(dc, -1, to_fd);
+ return;
+ }
+ swim_drop_channel_add_fd(dc, to_fd);
+ swim_test_transport_add_filter(swim_fd(from_node->swim),
+ swim_filter_drop_channel,
+ &from_node->drop_channel);
+}
+
/** Check if @a s1 knows every member of @a s2's table. */
static inline bool
swim1_contains_swim2(struct swim *s1, struct swim *s2)
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 145af9b1f..c78894820 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -118,6 +118,17 @@ void
swim_cluster_drop_components(struct swim_cluster *cluster, int i,
const int *keys, int key_count);
+/**
+ * When @a value is true, break a one direction network link
+ * between @a to_id and @a from_id SWIM instances. It is a pure
+ * network block, the member tables are not touched. All the
+ * packets trying to go directly from @a from_id to @a to_id are
+ * dropped. When @a value is false, the channel is restored.
+ */
+void
+swim_cluster_set_drop_channel(struct swim_cluster *cluster, int from_id,
+ int to_id, bool value);
+
/**
* Explicitly add a member of id @a from_id to a member of id
* @a to_id.
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] [PATCH 4/6] swim: wrap sio_strfaddr()
2019-04-24 14:36 [tarantool-patches] [PATCH 0/6] swim suspicion Vladislav Shpilevoy
` (2 preceding siblings ...)
2019-04-24 14:36 ` [tarantool-patches] [PATCH 3/6] test: introduce swim packet filter by destination address Vladislav Shpilevoy
@ 2019-04-24 14:36 ` Vladislav Shpilevoy
2019-04-24 16:40 ` [tarantool-patches] " Konstantin Osipov
2019-04-24 14:36 ` [tarantool-patches] [PATCH 5/6] swim: introduce routing Vladislav Shpilevoy
2019-04-24 14:36 ` [tarantool-patches] [PATCH 6/6] swim: introduce suspicion Vladislav Shpilevoy
5 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 14:36 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
SIO provides a function sio_strfaddr() to obtain string
representation of arbitrary struct sockaddr. And it uses
singleton buffer to store results, because some of them can't
fit into tt_static_buf() according to POSIX name limits.
SWIM uses only AF_INET addresses, they are short enough to fit
into tt_static_buf(). Also SWIM is going to call sio_strfaddr()
on more than 1 address in a row in subsequent patches, and
singleton buffer does not work here - each call will overwrite
result of the previous. Besides, SWIM never uses struct sockaddr
type. All these reasons are sufficient to wrap sio_strfaddr()
with encapsulated cast to struct sockaddr *, and copying on
tt_static_buf().
Part of #3234
---
src/lib/swim/swim.c | 10 +++-------
src/lib/swim/swim_io.c | 5 ++---
src/lib/swim/swim_proto.c | 8 ++++++++
src/lib/swim/swim_proto.h | 10 ++++++++++
4 files changed, 23 insertions(+), 10 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 230ec52d4..bb1ded713 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1086,8 +1086,7 @@ swim_send_fd_msg(struct swim *swim, struct swim_task *task,
assert(map_size == 2);
mp_encode_map(header, map_size);
say_verbose("SWIM %d: schedule %s to %s", swim_fd(swim),
- swim_fd_msg_type_strs[type],
- sio_strfaddr((struct sockaddr *) dst, sizeof(*dst)));
+ swim_fd_msg_type_strs[type], swim_inaddr_str(dst));
swim_task_send(task, dst, &swim->scheduler);
}
@@ -1732,9 +1731,7 @@ swim_info(struct swim *swim, struct info_handler *info)
node = mh_next(swim->members, node)) {
struct swim_member *m =
*mh_swim_table_node(swim->members, node);
- info_table_begin(info,
- sio_strfaddr((struct sockaddr *) &m->addr,
- sizeof(m->addr)));
+ info_table_begin(info, swim_inaddr_str(&m->addr));
info_append_str(info, "status",
swim_member_status_strs[m->status]);
info_append_str(info, "uuid", swim_uuid_str(&m->uuid));
@@ -1888,8 +1885,7 @@ swim_iterator_close(struct swim_iterator *iterator)
const char *
swim_member_uri(const struct swim_member *member)
{
- return sio_strfaddr((const struct sockaddr *) &member->addr,
- sizeof(member->addr));
+ return swim_inaddr_str(&member->addr);
}
const struct tt_uuid *
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 7d6addf02..a55c15f30 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -321,8 +321,7 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
rlist_shift_entry(&scheduler->queue_output, struct swim_task,
in_queue_output);
say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
- task->desc, sio_strfaddr((struct sockaddr *) &task->dst,
- sizeof(task->dst)));
+ task->desc, swim_inaddr_str(&task->dst));
swim_packet_build_meta(&task->packet, &scheduler->transport.addr);
int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
task->packet.pos - task->packet.buf,
@@ -353,7 +352,7 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
return;
}
say_verbose("SWIM %d: received from %s", swim_scheduler_fd(scheduler),
- sio_strfaddr((struct sockaddr *) &src, len));
+ swim_inaddr_str(&src));
struct swim_meta_def meta;
const char *pos = buf, *end = pos + size;
if (swim_meta_def_decode(&meta, &pos, end) < 0)
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 91500518d..796559e8e 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -33,8 +33,16 @@
#include "say.h"
#include "version.h"
#include "diag.h"
+#include "sio.h"
#include <sys/socket.h> /* AF_INET for FreeBSD. */
+const char *
+swim_inaddr_str(const struct sockaddr_in *addr)
+{
+ return tt_sprintf("%s", sio_strfaddr((struct sockaddr *) addr,
+ sizeof(*addr)));
+}
+
const char *swim_member_status_strs[] = {
"alive",
"dead",
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 59a4d2086..ddf9e28db 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -540,4 +540,14 @@ swim_inaddr_eq(const struct sockaddr_in *a1, const struct sockaddr_in *a2)
a1->sin_addr.s_addr == a2->sin_addr.s_addr;
}
+/**
+ * A wrapper around sio_strfaddr() so as to 1) do not clog SWIM
+ * code with huge casts to 'struct sockaddr *' and passes of
+ * sizeof(struct sockaddr_in); 2) copy on tt_static_buf instead of
+ * internal singleton sio buffer - otherwise non-first invocation
+ * of sio_strfaddr() rewrites the previous one.
+ */
+const char *
+swim_inaddr_str(const struct sockaddr_in *addr);
+
#endif /* TARANTOOL_SWIM_PROTO_H_INCLUDED */
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 4/6] swim: wrap sio_strfaddr()
2019-04-24 14:36 ` [tarantool-patches] [PATCH 4/6] swim: wrap sio_strfaddr() Vladislav Shpilevoy
@ 2019-04-24 16:40 ` Konstantin Osipov
2019-04-24 20:23 ` Vladislav Shpilevoy
0 siblings, 1 reply; 22+ messages in thread
From: Konstantin Osipov @ 2019-04-24 16:40 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/24 18:50]:
> SIO provides a function sio_strfaddr() to obtain string
> representation of arbitrary struct sockaddr. And it uses
> singleton buffer to store results, because some of them can't
> fit into tt_static_buf() according to POSIX name limits.
This is minor, but I would make sio_strfaddr() use the same set of
buffers as provided by tt_static_buf() instead, with a static
assert that the buffer size is large enough.
> SWIM uses only AF_INET addresses, they are short enough to fit
> into tt_static_buf(). Also SWIM is going to call sio_strfaddr()
> on more than 1 address in a row in subsequent patches, and
> singleton buffer does not work here - each call will overwrite
> result of the previous. Besides, SWIM never uses struct sockaddr
> type. All these reasons are sufficient to wrap sio_strfaddr()
> with encapsulated cast to struct sockaddr *, and copying on
> tt_static_buf().
>
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 4/6] swim: wrap sio_strfaddr()
2019-04-24 16:40 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-24 20:23 ` Vladislav Shpilevoy
2019-04-25 10:34 ` Konstantin Osipov
0 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 20:23 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov
On 24/04/2019 19:40, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/24 18:50]:
>> SIO provides a function sio_strfaddr() to obtain string
>> representation of arbitrary struct sockaddr. And it uses
>> singleton buffer to store results, because some of them can't
>> fit into tt_static_buf() according to POSIX name limits.
>
> This is minor, but I would make sio_strfaddr() use the same set of
> buffers as provided by tt_static_buf() instead, with a static
> assert that the buffer size is large enough.
As I said, the tt_static_buf is not big enough. SIO uses NI_MAXHOST
constant which on Linux is 1025 > size of tt_static_buf.
http://man7.org/linux/man-pages/man3/getnameinfo.3.html
If you still think we will not break anything, I can do that. Just
confirm that you are sure.
It will not eliminate swim_inaddr_str() though, because most of
the problems were about huge cast to 'const struct sockaddr *'
and passing sizeof(struct sockaddr_in) each time - it made code
unreadable.
>
>> SWIM uses only AF_INET addresses, they are short enough to fit
>> into tt_static_buf(). Also SWIM is going to call sio_strfaddr()
>> on more than 1 address in a row in subsequent patches, and
>> singleton buffer does not work here - each call will overwrite
>> result of the previous. Besides, SWIM never uses struct sockaddr
>> type. All these reasons are sufficient to wrap sio_strfaddr()
>> with encapsulated cast to struct sockaddr *, and copying on
>> tt_static_buf().
>>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 4/6] swim: wrap sio_strfaddr()
2019-04-24 20:23 ` Vladislav Shpilevoy
@ 2019-04-25 10:34 ` Konstantin Osipov
2019-04-25 13:50 ` Vladislav Shpilevoy
0 siblings, 1 reply; 22+ messages in thread
From: Konstantin Osipov @ 2019-04-25 10:34 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/25 00:46]:
> > This is minor, but I would make sio_strfaddr() use the same set of
> > buffers as provided by tt_static_buf() instead, with a static
> > assert that the buffer size is large enough.
>
> As I said, the tt_static_buf is not big enough. SIO uses NI_MAXHOST
> constant which on Linux is 1025 > size of tt_static_buf.
>
> http://man7.org/linux/man-pages/man3/getnameinfo.3.html
>
> If you still think we will not break anything, I can do that. Just
> confirm that you are sure.
>
> It will not eliminate swim_inaddr_str() though, because most of
> the problems were about huge cast to 'const struct sockaddr *'
> and passing sizeof(struct sockaddr_in) each time - it made code
> unreadable.
I don't have a strong opinion either way, but the length problem
could be fixed by increasing tt_static_buf() size, and adding a
static assert that it's greater than NI_MAXHOST.
It's OK to keep swim_inaddr_str() wrapper.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 4/6] swim: wrap sio_strfaddr()
2019-04-25 10:34 ` Konstantin Osipov
@ 2019-04-25 13:50 ` Vladislav Shpilevoy
0 siblings, 0 replies; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-25 13:50 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
On 25/04/2019 13:34, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/25 00:46]:
>>> This is minor, but I would make sio_strfaddr() use the same set of
>>> buffers as provided by tt_static_buf() instead, with a static
>>> assert that the buffer size is large enough.
>>
>> As I said, the tt_static_buf is not big enough. SIO uses NI_MAXHOST
>> constant which on Linux is 1025 > size of tt_static_buf.
>>
>> http://man7.org/linux/man-pages/man3/getnameinfo.3.html
>>
>> If you still think we will not break anything, I can do that. Just
>> confirm that you are sure.
>>
>> It will not eliminate swim_inaddr_str() though, because most of
>> the problems were about huge cast to 'const struct sockaddr *'
>> and passing sizeof(struct sockaddr_in) each time - it made code
>> unreadable.
>
> I don't have a strong opinion either way, but the length problem
> could be fixed by increasing tt_static_buf() size, and adding a
> static assert that it's greater than NI_MAXHOST.
Done in a separate commit.
>
> It's OK to keep swim_inaddr_str() wrapper.
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] [PATCH 5/6] swim: introduce routing
2019-04-24 14:36 [tarantool-patches] [PATCH 0/6] swim suspicion Vladislav Shpilevoy
` (3 preceding siblings ...)
2019-04-24 14:36 ` [tarantool-patches] [PATCH 4/6] swim: wrap sio_strfaddr() Vladislav Shpilevoy
@ 2019-04-24 14:36 ` Vladislav Shpilevoy
2019-04-24 16:46 ` [tarantool-patches] " Konstantin Osipov
2019-04-24 14:36 ` [tarantool-patches] [PATCH 6/6] swim: introduce suspicion Vladislav Shpilevoy
5 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 14:36 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
Before the patch SWIM packets were being sent quite
straightforward from one instance to another with transparent
routing on Internet Level of TCP/IP. But the SWIM paper
describes last yet not implemented component - suspicion
mechanism.
So as not to overload this message with suspicion details it is
enough to say that it makes possible sending a packet through an
intermediate SWIM instance, not directly.
This commit extends the SWIM protocol with a new transport-level
section named 'routing'. It allows to send indirect SWIM messages
transparently via packet forwarding implemented fully inside
transportation component, in swim_io.c.
Part of #3234
---
src/lib/swim/swim.c | 4 +-
src/lib/swim/swim_io.c | 96 +++++++++++++++++++++++++++++++++----
src/lib/swim/swim_io.h | 15 +++++-
src/lib/swim/swim_proto.c | 83 +++++++++++++++++++++++++++++++-
src/lib/swim/swim_proto.h | 62 ++++++++++++++++++++++--
test/unit/swim_proto.c | 56 +++++++++++++++++++++-
test/unit/swim_proto.result | 11 ++++-
7 files changed, 310 insertions(+), 17 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index bb1ded713..1b4a4365d 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1430,8 +1430,10 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
- const char *end, const struct sockaddr_in *src)
+ const char *end, const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy)
{
+ (void) proxy;
const char *prefix = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index a55c15f30..c7ffc31d5 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -60,8 +60,11 @@ swim_packet_create(struct swim_packet *packet)
/** Fill metadata prefix of a packet. */
static inline void
swim_packet_build_meta(struct swim_packet *packet,
- const struct sockaddr_in *src)
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *route_src,
+ const struct sockaddr_in *route_dst)
{
+ assert((route_src != NULL) == (route_dst != NULL));
char *meta = packet->meta;
char *end = packet->body;
/*
@@ -71,9 +74,18 @@ swim_packet_build_meta(struct swim_packet *packet,
if (meta == end)
return;
struct swim_meta_header_bin header;
- swim_meta_header_bin_create(&header, src);
- assert(meta + sizeof(header) == end);
+ swim_meta_header_bin_create(&header, src, route_dst != NULL);
+ assert(meta + sizeof(header) <= end);
memcpy(meta, &header, sizeof(header));
+ meta += sizeof(header);
+ if (route_dst != NULL) {
+ struct swim_route_bin route;
+ swim_route_bin_create(&route, route_src, route_dst);
+ assert(meta + sizeof(route) <= end);
+ memcpy(meta, &route, sizeof(route));
+ meta += sizeof(route);
+ }
+ assert(meta == end);
/*
* Once meta is built, it is consumed by the body. Used
* not to rebuild the meta again if the task will be
@@ -82,6 +94,21 @@ swim_packet_build_meta(struct swim_packet *packet,
packet->body = packet->meta;
}
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
+{
+ /*
+ * Route meta should be reserved before body encoding is
+ * started. Otherwise later it would be necessary to move
+ * already encoded body, maybe having its tail trimmed off
+ * because of limited UDP packet size.
+ */
+ assert(swim_packet_body_size(&task->packet) == 0);
+ assert(! swim_inaddr_is_empty(proxy));
+ task->proxy = *proxy;
+ swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+}
+
void
swim_task_create(struct swim_task *task, swim_task_f complete,
swim_task_f cancel, const char *desc)
@@ -161,6 +188,11 @@ swim_bcast_task_delete_cb(struct swim_task *task,
static int
swim_bcast_task_next_addr(struct swim_bcast_task *task)
{
+ /*
+ * Broadcast + proxy is not supported yet, and barely it
+ * will be needed anytime.
+ */
+ assert(swim_inaddr_is_empty(&task->base.proxy));
for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
int flags = i->ifa_flags;
if ((flags & IFF_UP) == 0)
@@ -320,13 +352,23 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
struct swim_task *task =
rlist_shift_entry(&scheduler->queue_output, struct swim_task,
in_queue_output);
+ const struct sockaddr_in *src = &scheduler->transport.addr;
+ const struct sockaddr_in *dst = &task->dst;
+ const char *dst_str = swim_inaddr_str(dst);
+ if (! swim_inaddr_is_empty(&task->proxy)) {
+ dst = &task->proxy;
+ dst_str = tt_sprintf("%s via %s", dst_str,
+ swim_inaddr_str(dst));
+ swim_packet_build_meta(&task->packet, src, src, &task->dst);
+ } else {
+ swim_packet_build_meta(&task->packet, src, NULL, NULL);
+ }
say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
- task->desc, swim_inaddr_str(&task->dst));
- swim_packet_build_meta(&task->packet, &scheduler->transport.addr);
+ task->desc, dst_str);
int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
task->packet.pos - task->packet.buf,
- (const struct sockaddr *) &task->dst,
- sizeof(task->dst));
+ (const struct sockaddr *) dst,
+ sizeof(*dst));
if (rc < 0)
diag_log();
if (task->complete != NULL)
@@ -357,7 +399,45 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
const char *pos = buf, *end = pos + size;
if (swim_meta_def_decode(&meta, &pos, end) < 0)
goto error;
- scheduler->on_input(scheduler, pos, end, &meta.src);
+ /*
+ * Check if this instance is not the destination and
+ * possibly forward the packet.
+ */
+ if (! meta.is_route_specified) {
+ scheduler->on_input(scheduler, pos, end, &meta.src, NULL);
+ return;
+ }
+ struct sockaddr_in *self = &scheduler->transport.addr;
+ if (swim_inaddr_eq(&meta.route.dst, self)) {
+ scheduler->on_input(scheduler, pos, end, &meta.route.src,
+ &meta.src);
+ return;
+ }
+ /* Forward the foreign packet. */
+ struct swim_task *task = swim_task_new(swim_task_delete_cb,
+ swim_task_delete_cb,
+ "foreign packet");
+ if (task == NULL)
+ goto error;
+ /*
+ * Allocate route meta explicitly, because the packet
+ * should keep route meta even being sent to the final
+ * destination directly.
+ */
+ swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+ /*
+ * Meta should be rebuilt with the different source
+ * address - this instance. It is used by the receiver to
+ * send a reply through this instance again.
+ */
+ swim_packet_build_meta(&task->packet, self, &meta.route.src,
+ &meta.route.dst);
+ /* Copy the original body without a touch. */
+ size = end - pos;
+ char *body = swim_packet_alloc(&task->packet, size);
+ assert(body != NULL);
+ memcpy(body, pos, size);
+ swim_task_send(task, &meta.route.dst, scheduler);
return;
error:
diag_log();
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 88a7f424d..9c567e157 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -139,7 +139,8 @@ swim_packet_create(struct swim_packet *packet);
typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
const char *buf, const char *end,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy);
/** Planner and executor of input and output operations.*/
struct swim_scheduler {
@@ -211,6 +212,11 @@ struct swim_task {
struct swim_packet packet;
/** Destination address. */
struct sockaddr_in dst;
+ /**
+ * Optional proxy via which the destination should be
+ * reached.
+ */
+ struct sockaddr_in proxy;
/** Place in a queue of tasks. */
struct rlist in_queue_output;
/**
@@ -226,6 +232,13 @@ swim_task_is_scheduled(struct swim_task *task)
return ! rlist_empty(&task->in_queue_output);
}
+/**
+ * Set the proxy for the task. Before sending this proxy is dumped
+ * into metadata section.
+ */
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
+
/**
* Put the task into a queue of tasks. Eventually it will be sent.
*/
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 796559e8e..18c20abf3 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -439,9 +439,9 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src)
+ const struct sockaddr_in *src, bool has_routing)
{
- int map_size = 1 + SWIM_INADDR_BIN_SIZE;
+ int map_size = 1 + SWIM_INADDR_BIN_SIZE + has_routing;
assert(mp_sizeof_map(map_size) == 1);
header->m_header = 0x80 | map_size;
header->k_version = SWIM_META_TARANTOOL_VERSION;
@@ -452,6 +452,64 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header,
swim_inaddr_bin_fill(&header->src_addr, src);
}
+/**
+ * Decode meta routing section into meta definition object.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos MessagePack buffer to decode.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+static int
+swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos,
+ const char *end)
+{
+ const char *prefix = "invalid routing section:";
+ uint32_t size;
+ def->route.src.sin_family = AF_INET;
+ def->route.dst.sin_family = AF_INET;
+ if (swim_decode_map(pos, end, &size, prefix, "route") != 0)
+ return -1;
+ for (uint32_t i = 0; i < size; ++i) {
+ uint64_t key;
+ if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
+ return -1;
+ switch (key) {
+ case SWIM_ROUTE_SRC_ADDRESS:
+ if (swim_decode_ip(&def->route.src, pos, end, prefix,
+ "source address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_SRC_PORT:
+ if (swim_decode_port(&def->route.src, pos, end,
+ prefix, "source port") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_ADDRESS:
+ if (swim_decode_ip(&def->route.dst, pos, end, prefix,
+ "destination address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_PORT:
+ if (swim_decode_port(&def->route.dst, pos, end,
+ prefix, "destination port") != 0)
+ return -1;
+ break;
+ default:
+ diag_set(SwimError, "%s unknown key", prefix);
+ return -1;
+ }
+ }
+ if (swim_check_inaddr_not_empty(&def->route.src, prefix,
+ "source") != 0 ||
+ swim_check_inaddr_not_empty(&def->route.dst, prefix,
+ "destination") != 0)
+ return -1;
+ def->is_route_specified = true;
+ return 0;
+}
+
int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end)
@@ -467,6 +525,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
return -1;
switch (key) {
+ case SWIM_META_ROUTING:
+ if (swim_meta_def_decode_route(def, pos, end) != 0)
+ return -1;
+ break;
case SWIM_META_TARANTOOL_VERSION:
if (swim_decode_uint(pos, end, &key, prefix,
"version") != 0)
@@ -509,3 +571,20 @@ swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation)
header->m_incarnation = 0xcf;
header->v_incarnation = mp_bswap_u64(incarnation);
}
+
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst)
+{
+ int map_size = SWIM_INADDR_BIN_SIZE * 2;
+ assert(mp_sizeof_map(map_size) == 1);
+ route->k_routing = SWIM_META_ROUTING;
+ route->m_routing = 0x80 | map_size;
+ swim_inaddr_bin_create(&route->src_addr, SWIM_ROUTE_SRC_ADDRESS,
+ SWIM_ROUTE_SRC_PORT);
+ swim_inaddr_bin_create(&route->dst_addr, SWIM_ROUTE_DST_ADDRESS,
+ SWIM_ROUTE_DST_PORT);
+ swim_inaddr_bin_fill(&route->src_addr, src);
+ swim_inaddr_bin_fill(&route->dst_addr, dst);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index ddf9e28db..045e55415 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -51,7 +51,13 @@ enum {
* | { |
* | SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,|
* | SWIM_META_SRC_ADDRESS: uint, ip, |
- * | SWIM_META_SRC_PORT: uint, port |
+ * | SWIM_META_SRC_PORT: uint, port, |
+ * | SWIM_META_ROUTING: { |
+ * | SWIM_ROUTE_SRC_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_SRC_PORT: uint, port, |
+ * | SWIM_ROUTE_DST_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_DST_PORT: uint, port |
+ * | } |
* | } |
* +-------------------Protocol logic section--------------------+
* | { |
@@ -421,6 +427,7 @@ enum swim_meta_key {
*/
SWIM_META_SRC_ADDRESS,
SWIM_META_SRC_PORT,
+ SWIM_META_ROUTING,
};
/**
@@ -432,7 +439,7 @@ enum swim_meta_key {
* separate MessagePack map.
*/
struct PACKED swim_meta_header_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(3 or 4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */
@@ -448,7 +455,7 @@ struct PACKED swim_meta_header_bin {
/** Initialize meta section. */
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src, bool has_routing);
/** Meta definition. */
struct swim_meta_def {
@@ -456,6 +463,18 @@ struct swim_meta_def {
uint32_t version;
/** Source of the message. */
struct sockaddr_in src;
+ /** Route source and destination. */
+ struct {
+ struct sockaddr_in src;
+ struct sockaddr_in dst;
+ } route;
+ /**
+ * True, if both @a src and @a dst are not empty. This
+ * flag is just sugar so as not to check both addresses
+ * manually. Also in future more fields could be added
+ * here.
+ */
+ bool is_route_specified;
};
/**
@@ -471,6 +490,43 @@ int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end);
+enum swim_route_key {
+ /**
+ * True source of the packet. Can be different from the
+ * packet sender. It is expected that a response should be
+ * sent back to this address. Maybe indirectly through the
+ * same proxy.
+ */
+ SWIM_ROUTE_SRC_ADDRESS = 0,
+ SWIM_ROUTE_SRC_PORT,
+ /**
+ * True destination of the packet. Can be different from
+ * this instance, receiver. If it is for another instance,
+ * then this packet is forwarded to the latter.
+ */
+ SWIM_ROUTE_DST_ADDRESS,
+ SWIM_ROUTE_DST_PORT,
+ swim_route_key_MAX,
+};
+
+/** Route section template. Describes source, destination. */
+struct PACKED swim_route_bin {
+ /** mp_encode_uint(SWIM_ROUTING) */
+ uint8_t k_routing;
+ /** mp_encode_map(4) */
+ uint8_t m_routing;
+ /** SWIM_ROUTE_SRC_ADDRESS and SWIM_ROUTE_SRC_PORT. */
+ struct swim_inaddr_bin src_addr;
+ /** SWIM_ROUTE_DST_ADDRESS and SWIM_ROUTE_DST_PORT. */
+ struct swim_inaddr_bin dst_addr;
+};
+
+/** Initialize routing section. */
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst);
+
/** }}} Meta component */
enum swim_quit_key {
diff --git a/test/unit/swim_proto.c b/test/unit/swim_proto.c
index 64487fb7d..d7fafc7f0 100644
--- a/test/unit/swim_proto.c
+++ b/test/unit/swim_proto.c
@@ -189,11 +189,64 @@ swim_test_meta(void)
footer();
}
+static void
+swim_test_route(void)
+{
+ header();
+ plan(5);
+
+ char buffer[1024];
+ struct swim_meta_def mdef;
+ struct swim_meta_header_bin header;
+ struct sockaddr_in addr;
+ addr.sin_port = htons(1234);
+ fail_if(inet_aton("127.0.0.1", &addr.sin_addr) == 0);
+
+ swim_meta_header_bin_create(&header, &addr, true);
+ memcpy(buffer, &header, sizeof(header));
+ char *last_valid = buffer + sizeof(header);
+ char *end = last_valid;
+ const char *pos = buffer;
+
+ is(swim_meta_def_decode(&mdef, &pos, end), -1,
+ "route was expected, but map is too short");
+
+ end = mp_encode_uint(end, SWIM_META_ROUTING);
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), -1, "no route map");
+
+ end = mp_encode_map(end, 0);
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), -1, "empty route map");
+
+ struct swim_route_bin route;
+ struct sockaddr_in src, dst;
+ memset(&src, 0, sizeof(src));
+ memset(&dst, 0, sizeof(dst));
+ swim_route_bin_create(&route, &src, &dst);
+ memcpy(last_valid, &route, sizeof(route));
+ end = last_valid + sizeof(route);
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), -1, "zero addresses");
+
+ src.sin_port = 1;
+ src.sin_addr = addr.sin_addr;
+ dst.sin_port = 1;
+ dst.sin_addr = addr.sin_addr;
+ swim_route_bin_create(&route, &src, &dst);
+ memcpy(last_valid, &route, sizeof(route));
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), 0, "normal route");
+
+ check_plan();
+ footer();
+}
+
int
main()
{
header();
- plan(2);
+ plan(3);
memory_init();
fiber_init(fiber_c_invoke);
int fd = open("log.txt", O_TRUNC);
@@ -203,6 +256,7 @@ main()
swim_test_member_def();
swim_test_meta();
+ swim_test_route();
say_logger_free();
fiber_free();
diff --git a/test/unit/swim_proto.result b/test/unit/swim_proto.result
index c22769161..8a41a5d40 100644
--- a/test/unit/swim_proto.result
+++ b/test/unit/swim_proto.result
@@ -1,5 +1,5 @@
*** main ***
-1..2
+1..3
*** swim_test_member_def ***
1..12
ok 1 - not map header
@@ -28,4 +28,13 @@ ok 1 - subtests
ok 8 - normal meta
ok 2 - subtests
*** swim_test_meta: done ***
+ *** swim_test_route ***
+ 1..5
+ ok 1 - route was expected, but map is too short
+ ok 2 - no route map
+ ok 3 - empty route map
+ ok 4 - zero addresses
+ ok 5 - normal route
+ok 3 - subtests
+ *** swim_test_route: done ***
*** main: done ***
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 5/6] swim: introduce routing
2019-04-24 14:36 ` [tarantool-patches] [PATCH 5/6] swim: introduce routing Vladislav Shpilevoy
@ 2019-04-24 16:46 ` Konstantin Osipov
2019-04-24 20:25 ` Vladislav Shpilevoy
0 siblings, 1 reply; 22+ messages in thread
From: Konstantin Osipov @ 2019-04-24 16:46 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/24 18:50]:
there is a couple of things I don't understand:
- how do you make a decision that you need to use a proxy? is it
random? is it if there are too many unacknowledged pings?
- how do you make a decision which proxy to use? Do you choose a
single peer or all available peers when you need to send a
message via a proxy?
- how is the result returned to the sender? As far as I can see
from the route section, there is only source and destination
addresses, but it doesn't contain entire routing path. Do you
assume there is always only one intermediate hop in case of an
indirect ping and always respond to the direct sender, assuming
the sender has a direct connection to the originator?
Please document answers to these questions in the code, it
constitutes the mechanics of indirect pings.
> +void
> +swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
Why not "swim_task_set_proxy_addr"?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 5/6] swim: introduce routing
2019-04-24 16:46 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-24 20:25 ` Vladislav Shpilevoy
2019-04-25 10:39 ` Konstantin Osipov
0 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 20:25 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
On 24/04/2019 19:46, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/24 18:50]:
>
> there is a couple of things I don't understand:
>
> - how do you make a decision that you need to use a proxy? is it
> random? is it if there are too many unacknowledged pings?
First of all this patch does not use proxies at all. It just introduces
new section SWIM_META_ROUTING.
Answering your question - the second option. The last patch, which
introduces suspicion, uses proxy when too many direct pings are
unacked.
>
> - how do you make a decision which proxy to use? Do you choose a
> single peer or all available peers when you need to send a
> message via a proxy?
I choose multiple random proxies as the SWIM paper says.
>
> - how is the result returned to the sender?
I send an answer via the same proxy.
> As far as I can see
> from the route section, there is only source and destination
> addresses, but it doesn't contain entire routing path.
In fact it does. Meta contains SWIM_META_ROUTING +
{SWIM_META_SRC_PORT, SWIM_META_SRC_ADDRESS}. It is 3 ip/port pairs.
SWIM_META_SRC_PORT/ADDRESS is the message sender, always.
SWIM_META_ROUING contains the message originator (it is not
always sender), and the destination.
Let's consider an example. There are 3 nodes S1, S2, S3. S1
sends a ping to S3 via S2.
S1 -> S2
{
route: {src: S1, dst: S3},
src: S1
}
S2 -> S3
{
route: {src: S1, dst: S3},
src: S2
}
S3 -> S2
{
route: {src: S3, dst: S1},
src: S3
}
S2 -> S1
{
route: {src: S3, dst: S1},
src: S2
}
As you can see, 'src' is set to real sender
on each hop. 'route' is unchanged until the
message reaches the end.
'src' is used by S3 to send the response back
via the same proxy.
> Do you
> assume there is always only one intermediate hop in case of an
> indirect ping and always respond to the direct sender, assuming
> the sender has a direct connection to the originator?
Yes, I send an indirect message only via one intermediate member,
as the SWIM paper says. And respond by the same route.
>
> Please document answers to these questions in the code, it
> constitutes the mechanics of indirect pings.
Most of your questions are answered in comments inside the
next commit. But you fairly noticed that it is worth
documenting that only one additional hop is possible. Arbitrary
routes of any length are not implemented. I thought about it, but
decided it would be ultra overkill for such a simple task.
A new comment:
===================================================================
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 045e55415..f70ac708a 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -427,6 +427,32 @@ enum swim_meta_key {
*/
SWIM_META_SRC_ADDRESS,
SWIM_META_SRC_PORT,
+ /**
+ * Routing section allows to specify routes of up to 3
+ * nodes: source, proxy, and destination. It contains two
+ * addresses - the message originator and the final
+ * destination. Here is an example of an indirect message
+ * transmission. Assume, there are 3 nodes: S1, S2, S3.
+ * S1 sends a message to S3 via S2. The following steps
+ * are executed in order to deliver the message:
+ *
+ * S1 -> S2
+ * { src: S1, routing: {src: S1, dst: S3}, body: ... }
+ *
+ * S2 receives the message and sees: routing.dst != S2 -
+ * it is a foreign packet. S2 forwards it to S3 preserving
+ * all the data - body and routing sections.
+ *
+ *
+ * S2 -> S3
+ * {src: S2, routing: {src: S1, dst: S3}, body: ...}
+ *
+ * S3 receives the message and sees: routing.dst == S3 -
+ * the message is delivered. If S3 wants to answer, it
+ * sends a response via the same proxy. It knows, that the
+ * message was delivered from S2, and sends an answer via
+ * S2.
+ */
SWIM_META_ROUTING,
};
===================================================================
>
>> +void
>> +swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
>
> Why not "swim_task_set_proxy_addr"?
1) Because the transport level operates on addresses only. It is excessive
to say everywhere '_addr' suffix. It is impossible to send to UUID or
anything else except inet address;
2) For consistency - 'swim_task_send()' is not named
'swim_task_send_to_addr()';
3) '_addr' suffix is too long. I tried it on the first SWIM version,
but it does not help at all, only pads the code out.
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 5/6] swim: introduce routing
2019-04-24 20:25 ` Vladislav Shpilevoy
@ 2019-04-25 10:39 ` Konstantin Osipov
2019-04-25 13:50 ` Vladislav Shpilevoy
0 siblings, 1 reply; 22+ messages in thread
From: Konstantin Osipov @ 2019-04-25 10:39 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/25 00:46]:
>
>
> On 24/04/2019 19:46, Konstantin Osipov wrote:
> > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/24 18:50]:
> >
> > there is a couple of things I don't understand:
> >
> > - how do you make a decision that you need to use a proxy? is it
> > random? is it if there are too many unacknowledged pings?
>
> First of all this patch does not use proxies at all. It just introduces
> new section SWIM_META_ROUTING.
>
> Answering your question - the second option. The last patch, which
> introduces suspicion, uses proxy when too many direct pings are
> unacked.
>
> >
> > - how do you make a decision which proxy to use? Do you choose a
> > single peer or all available peers when you need to send a
> > message via a proxy?
>
> I choose multiple random proxies as the SWIM paper says.
>
> >
> > - how is the result returned to the sender?
>
> I send an answer via the same proxy.
>
> > As far as I can see
> > from the route section, there is only source and destination
> > addresses, but it doesn't contain entire routing path.
>
> In fact it does. Meta contains SWIM_META_ROUTING +
> {SWIM_META_SRC_PORT, SWIM_META_SRC_ADDRESS}. It is 3 ip/port pairs.
>
> SWIM_META_SRC_PORT/ADDRESS is the message sender, always.
>
> SWIM_META_ROUING contains the message originator (it is not
> always sender), and the destination.
>
> Let's consider an example. There are 3 nodes S1, S2, S3. S1
> sends a ping to S3 via S2.
>
> S1 -> S2
> {
> route: {src: S1, dst: S3},
> src: S1
> }
>
> S2 -> S3
> {
> route: {src: S1, dst: S3},
> src: S2
> }
>
> S3 -> S2
> {
> route: {src: S3, dst: S1},
> src: S3
> }
>
> S2 -> S1
> {
> route: {src: S3, dst: S1},
> src: S2
> }
>
>
> As you can see, 'src' is set to real sender
> on each hop. 'route' is unchanged until the
> message reaches the end.
>
> 'src' is used by S3 to send the response back
> via the same proxy.
>
> > Do you
> > assume there is always only one intermediate hop in case of an
> > indirect ping and always respond to the direct sender, assuming
> > the sender has a direct connection to the originator?
>
> Yes, I send an indirect message only via one intermediate member,
> as the SWIM paper says. And respond by the same route.
>
> >
> > Please document answers to these questions in the code, it
> > constitutes the mechanics of indirect pings.
>
> Most of your questions are answered in comments inside the
> next commit. But you fairly noticed that it is worth
> documenting that only one additional hop is possible. Arbitrary
> routes of any length are not implemented. I thought about it, but
> decided it would be ultra overkill for such a simple task.
>
> A new comment:
This comment is good but could be even better if it contains
answers to all of the questions, not just how routing is done:
> > - how do you make a decision that you need to use a proxy? is it
> > random? is it if there are too many unacknowledged pings?
>
> First of all this patch does not use proxies at all. It just introduces
> new section SWIM_META_ROUTING.
OK, but when I look at routing itself I want to know how it's
used. Even if this patch does not use the routing, I want a patch
with a comment about the usage. Please feel free to add to the
current comment or submit separately.
>
> Answering your question - the second option. The last patch, which
> introduces suspicion, uses proxy when too many direct pings are
> unacked.
>
> >
> > - how do you make a decision which proxy to use? Do you choose a
> > single peer or all available peers when you need to send a
> > message via a proxy?
>
> I choose multiple random proxies as the SWIM paper says.
Please document this decision and the reasons for it as you stated
above.
> >> +void
> >> +swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
> >
> > Why not "swim_task_set_proxy_addr"?
>
> 1) Because the transport level operates on addresses only. It is excessive
> to say everywhere '_addr' suffix. It is impossible to send to UUID or
> anything else except inet address;
>
> 2) For consistency - 'swim_task_send()' is not named
> 'swim_task_send_to_addr()';
>
> 3) '_addr' suffix is too long. I tried it on the first SWIM version,
> but it does not help at all, only pads the code out.
Then swim_task_set_proxy(). A method should have a verb in its
name, otherwise the name looks like a constructor.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 5/6] swim: introduce routing
2019-04-25 10:39 ` Konstantin Osipov
@ 2019-04-25 13:50 ` Vladislav Shpilevoy
2019-04-25 13:57 ` Konstantin Osipov
0 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-25 13:50 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
> Please document this decision and the reasons for it as you stated
> above.
Done in the last commit.
=========================================================================
@@ -59,11 +59,22 @@
* ping. Replies are processed out of the main cycle,
* asynchronously.
*
- * Random selection provides even network load of ~1 message on
- * each member per one protocol step regardless of the cluster
- * size. Without randomness each member would receive a network
- * load of N messages in each protocol step, where N is the
- * cluster size.
+ * When a member unacknowledged too many pings, its status is
+ * changed to 'suspected'. The SWIM paper describes suspicion
+ * subcomponent as a protection against false-positive detection
+ * of alive members as dead. It happens when a member is
+ * overloaded and responds to pings too slow, or when the network
+ * is in trouble and packets can not go through some channels.
+ * When a member is suspected, another instance pings it
+ * indirectly via other members. It sends a fixed number of pings
+ * to the suspected one in parallel via additional hops selected
+ * randomly among other members.
+ *
+ * Random selection in all the components provides even network
+ * load of ~1 message on each member per one protocol step
+ * regardless of the cluster size. Without randomness each member
+ * would receive a network load of N messages in each protocol
+ * step, where N is the cluster size.
*
* To speed up propagation of new information by means of a few
* random messages SWIM proposes a kind of fairness: when
@@ -1111,7 +1122,12 @@ swim_send_ack(struct swim *swim, struct swim_task *task,
swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK, NULL);
}
-/** Schedule an indirect ack through @a proxy. */
+/**
+ * Schedule an indirect ack through @a proxy. Indirect ACK is sent
+ * only when this instance receives an indirect ping. It means
+ * that another member tries to reach this one via other nodes,
+ * and inexplicably failed to do it directly.
+ */
static inline int
swim_send_indirect_ack(struct swim *swim, const struct sockaddr_in *dst,
const struct sockaddr_in *proxy)
@@ -1155,7 +1171,16 @@ finish:
swim_task_delete_cb(task, scheduler, rc);
}
-/** Schedule a number of indirect pings to a member @a dst. */
+/**
+ * Schedule a number of indirect pings to a member @a dst.
+ * Indirect pings are used when direct pings are not acked too
+ * long. The SWIM paper explains that it is a protection against
+ * false-positive failure detection when a node sends ACKs too
+ * slow, or the network is in trouble. Then other nodes can try to
+ * access it via different channels and members. The algorithm is
+ * simple - choose a fixed number of random members and send pings
+ * to the suspected member via them in parallel.
+ */
static inline int
swim_send_indirect_pings(struct swim *swim, const struct swim_member *dst)
{
=========================================================================
>>>> +void
>>>> +swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
>>>
>>> Why not "swim_task_set_proxy_addr"?
>>
>> 1) Because the transport level operates on addresses only. It is excessive
>> to say everywhere '_addr' suffix. It is impossible to send to UUID or
>> anything else except inet address;
>>
>> 2) For consistency - 'swim_task_send()' is not named
>> 'swim_task_send_to_addr()';
>>
>> 3) '_addr' suffix is too long. I tried it on the first SWIM version,
>> but it does not help at all, only pads the code out.
>
> Then swim_task_set_proxy(). A method should have a verb in its
> name, otherwise the name looks like a constructor.
Done.
=========================================================================
void
-swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
+swim_task_set_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
{
/*
* Route meta should be reserved before body encoding is
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 977859db7..03f268bc1 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -243,7 +243,7 @@ swim_task_is_scheduled(struct swim_task *task)
* into metadata section.
*/
void
-swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
+swim_task_set_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
=========================================================================
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] [PATCH 6/6] swim: introduce suspicion
2019-04-24 14:36 [tarantool-patches] [PATCH 0/6] swim suspicion Vladislav Shpilevoy
` (4 preceding siblings ...)
2019-04-24 14:36 ` [tarantool-patches] [PATCH 5/6] swim: introduce routing Vladislav Shpilevoy
@ 2019-04-24 14:36 ` Vladislav Shpilevoy
2019-04-24 17:01 ` [tarantool-patches] " Konstantin Osipov
5 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 14:36 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
Suspicion component is a way how SWIM protects from
false-positive failure detections. When the network is slow, or
a SWIM node does not manage to process messages in time because
of being overloaded, other nodes will not receive ACKs in time,
but it is too soon to declare the member dead.
The nodes will mark the member as suspected, and will ping it
indirectly, via other members. It 1) gives the suspected member
more time to respond on ACKs, 2) protects from the case when it
is a network problem on particular channels.
Part of #3234
---
src/lib/swim/swim.c | 245 +++++++++++++++++++++++++++++++---
src/lib/swim/swim_constants.h | 6 +
src/lib/swim/swim_proto.c | 1 +
test/unit/swim.c | 76 ++++++++---
test/unit/swim.result | 30 +++--
5 files changed, 311 insertions(+), 47 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 1b4a4365d..ac4061701 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -143,11 +143,17 @@ enum {
*/
ACK_TIMEOUT_DEFAULT = 30,
/**
- * If a member has not been responding to pings this
- * number of times, it is considered dead. According to
- * the SWIM paper, for a member it is sufficient to miss
- * one direct ping, and an arbitrary but fixed number of
- * simultaneous indirect pings, to be considered dead.
+ * If an alive member has not been responding to pings
+ * this number of times, it is suspected to be dead. To
+ * confirm the death it should fail more pings.
+ */
+ NO_ACKS_TO_SUSPECT = 2,
+ /**
+ * If a suspected member has not been responding to pings
+ * this number of times, it is considered dead. According
+ * to the SWIM paper, for a member it is sufficient to
+ * miss one direct ping, and an arbitrary but fixed number
+ * of simultaneous indirect pings, to be considered dead.
* Seems too little, so here it is bigger.
*/
NO_ACKS_TO_DEAD = 3,
@@ -161,6 +167,11 @@ enum {
* anti-entropy components.
*/
NO_ACKS_TO_GC = 2,
+ /**
+ * Number of attempts to reach out a member via another
+ * members when it did not answer on a regular ping .
+ */
+ INDIRECT_PING_COUNT = 2,
};
/**
@@ -474,12 +485,17 @@ swim_cached_round_msg_invalidate(struct swim *swim)
swim->is_round_packet_valid = false;
}
-/** Put the member into a list of ACK waiters. */
+/**
+ * Put the member into a list of ACK waiters. @a hop_count says
+ * how many hops from one member to another the ACK is expected to
+ * do.
+ */
static void
-swim_wait_ack(struct swim *swim, struct swim_member *member)
+swim_wait_ack(struct swim *swim, struct swim_member *member, int hop_count)
{
if (heap_node_is_stray(&member->in_wait_ack_heap)) {
- member->ping_deadline = swim_time() + swim->wait_ack_tick.at;
+ double timeout = swim->wait_ack_tick.at * hop_count;
+ member->ping_deadline = swim_time() + timeout;
wait_ack_heap_insert(&swim->wait_ack_heap, member);
swim_ev_timer_start(loop(), &swim->wait_ack_tick);
}
@@ -609,7 +625,7 @@ swim_ping_task_complete(struct swim_task *task,
struct swim *swim = swim_by_scheduler(scheduler);
struct swim_member *m = container_of(task, struct swim_member,
ping_task);
- swim_wait_ack(swim, m);
+ swim_wait_ack(swim, m, 1);
}
/** Free member's resources. */
@@ -1064,7 +1080,7 @@ swim_complete_step(struct swim_task *task,
* dissemination and failure detection
* sections.
*/
- swim_wait_ack(swim, m);
+ swim_wait_ack(swim, m, 1);
swim_decrease_event_ttd(swim);
}
}
@@ -1073,13 +1089,16 @@ swim_complete_step(struct swim_task *task,
/** Schedule send of a failure detection message. */
static void
swim_send_fd_msg(struct swim *swim, struct swim_task *task,
- const struct sockaddr_in *dst, enum swim_fd_msg_type type)
+ const struct sockaddr_in *dst, enum swim_fd_msg_type type,
+ const struct sockaddr_in *proxy)
{
/*
* Reset packet allocator in case if task is being reused.
*/
assert(! swim_task_is_scheduled(task));
swim_packet_create(&task->packet);
+ if (proxy != NULL)
+ swim_task_proxy(task, proxy);
char *header = swim_packet_alloc(&task->packet, 1);
int map_size = swim_encode_src_uuid(swim, &task->packet);
map_size += swim_encode_failure_detection(swim, &task->packet, type);
@@ -1095,7 +1114,21 @@ static inline void
swim_send_ack(struct swim *swim, struct swim_task *task,
const struct sockaddr_in *dst)
{
- swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK);
+ swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK, NULL);
+}
+
+/** Schedule an indirect ack through @a proxy. */
+static inline int
+swim_send_indirect_ack(struct swim *swim, const struct sockaddr_in *dst,
+ const struct sockaddr_in *proxy)
+{
+ struct swim_task *task =
+ swim_task_new(swim_task_delete_cb, swim_task_delete_cb,
+ "indirect ack");
+ if (task == NULL)
+ return -1;
+ swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK, proxy);
+ return 0;
}
/** Schedule send of a ping. */
@@ -1103,7 +1136,171 @@ static inline void
swim_send_ping(struct swim *swim, struct swim_task *task,
const struct sockaddr_in *dst)
{
- swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_PING);
+ swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_PING, NULL);
+}
+
+struct swim_iping_block;
+
+/**
+ * When indirect pings are sent, each is represented by this
+ * structure.
+ */
+struct swim_iping_task {
+ /** Base task used by the scheduler. */
+ struct swim_task base;
+ /**
+ * Reference to a block of other indirect ping tasks sent
+ * at the same moment. Used to decide whether it is needed
+ * to start waiting for an ACK, and on which member.
+ */
+ struct swim_iping_block *block;
+};
+
+/**
+ * An array of indirect ping tasks sent simultaneously via
+ * different proxies. The block contains meta information allowing
+ * to 1) start waiting for an ACK only after first successful
+ * send; 2) determine which member has sent the pings - UUID is
+ * needed for that, inet address is not enough.
+ *
+ * The block is deleted when the last task is finished.
+ */
+struct swim_iping_block {
+ /** Array of indirect ping tasks. */
+ struct swim_iping_task tasks[INDIRECT_PING_COUNT];
+ /**
+ * UUID of the destination member. Used to set ping
+ * deadline in that member when at least one is sent
+ * successfully.
+ */
+ struct tt_uuid dst_uuid;
+ /**
+ * The flag is used to wait for an ACK only once after a
+ * first ping is sent to protect from a case, when an ACK
+ * is received faster, than the last ping is sent. Then
+ * the whole indirect ping block should be considered
+ * acked.
+ */
+ bool need_wait_ack;
+};
+
+/** Destroy block's tasks and free its memory. */
+static inline void
+swim_iping_block_delete(struct swim_iping_block *b)
+{
+ for (int i = 0; i < INDIRECT_PING_COUNT; ++i)
+ swim_task_destroy(&b->tasks[i].base);
+ free(b);
+}
+
+/**
+ * Try to delete the task @a t. It can be deleted individually
+ * because is stored in an array, but if @a t is last in the
+ * block, then all other tasks have been sent as well, and the
+ * block can be deleted.
+ */
+static inline void
+swim_iping_task_delete(struct swim_iping_task *t)
+{
+ if (t == &t->block->tasks[INDIRECT_PING_COUNT])
+ swim_iping_block_delete(t->block);
+}
+
+/**
+ * Wrapper for iping task destructor to be called by the scheduler
+ * when a task is canceled.
+ */
+static void
+swim_iping_task_delete_cb(struct swim_task *base_task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) rc;
+ (void) scheduler;
+ struct swim_iping_task *t = (struct swim_iping_task *) base_task;
+ swim_iping_task_delete(t);
+}
+
+/**
+ * Indirect ping task completion callback. If it is a first
+ * successful transmission, then the sender starts waiting for an
+ * ACK.
+ */
+static void
+swim_iping_task_complete(struct swim_task *base_task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ struct swim_iping_task *t = (struct swim_iping_task *) base_task;
+ struct swim_iping_block *b = t->block;
+ if (rc >= 0 && b->need_wait_ack) {
+ b->need_wait_ack = false;
+ struct swim *swim = swim_by_scheduler(scheduler);
+ struct swim_member *m =
+ swim_find_member(swim, &b->dst_uuid);
+ if (m != NULL)
+ swim_wait_ack(swim, m, 2);
+ }
+ swim_iping_task_delete(t);
+}
+
+/**
+ * Create a new block of indirect ping tasks to be sent to a
+ * member with UUID @a dst_uuid.
+ */
+static inline struct swim_iping_block *
+swim_iping_block_new(const struct tt_uuid *dst_uuid)
+{
+ struct swim_iping_block *b =
+ (struct swim_iping_block *) malloc(sizeof(*b));
+ if (b == NULL) {
+ diag_set(OutOfMemory, sizeof(*b), "malloc", "b");
+ return NULL;
+ }
+ b->need_wait_ack = true;
+ b->dst_uuid = *dst_uuid;
+ for (int i = 0; i < INDIRECT_PING_COUNT; ++i) {
+ swim_task_create(&b->tasks[i].base, swim_iping_task_complete,
+ swim_iping_task_delete_cb, "indirect ping");
+ b->tasks[i].block = b;
+ }
+ return b;
+}
+
+/** Schedule a number of indirect pings to a member @a dst. */
+static inline int
+swim_send_indirect_pings(struct swim *swim, const struct swim_member *dst)
+{
+ struct mh_swim_table_t *t = swim->members;
+ int member_count = mh_size(t);
+ int rnd = swim_scaled_rand(0, member_count - 1);
+ mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
+ struct swim_iping_block *b = swim_iping_block_new(&dst->uuid);
+ if (b == NULL)
+ return -1;
+ for (int member_i = 0, task_i = 0; member_i < member_count &&
+ task_i < INDIRECT_PING_COUNT; ++member_i) {
+ struct swim_member *m = *mh_swim_table_node(t, rc);
+ /*
+ * It makes no sense to send an indirect ping via
+ * self and via destination - it would be just
+ * direct ping then.
+ */
+ if (m != swim->self && !swim_inaddr_eq(&dst->addr, &m->addr)) {
+ struct swim_iping_task *t = &b->tasks[task_i++];
+ swim_task_proxy(&t->base, &m->addr);
+ swim_send_fd_msg(swim, &t->base, &dst->addr,
+ SWIM_FD_MSG_PING, &m->addr);
+ }
+ /*
+ * First random member could be chosen too close
+ * to the hash end. Here the cycle is wrapped, if
+ * a packet still has free memory, but the
+ * iterator has already reached the hash end.
+ */
+ rc = mh_next(t, rc);
+ if (rc == end)
+ rc = mh_first(t);
+ }
+ return 0;
}
/**
@@ -1128,6 +1325,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
++m->unacknowledged_pings;
switch (m->status) {
case MEMBER_ALIVE:
+ if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT)
+ break;
+ m->status = MEMBER_SUSPECTED;
+ swim_on_member_update(swim, m);
+ if (swim_send_indirect_pings(swim, m) != 0)
+ diag_log();
+ break;
+ case MEMBER_SUSPECTED:
if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
swim_on_member_update(swim, m);
@@ -1315,7 +1520,8 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
static int
swim_process_failure_detection(struct swim *swim, const char **pos,
const char *end, const struct sockaddr_in *src,
- const struct tt_uuid *uuid)
+ const struct tt_uuid *uuid,
+ const struct sockaddr_in *proxy)
{
const char *prefix = "invalid failure detection message:";
struct swim_failure_detection_def def;
@@ -1360,8 +1566,13 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
switch (def.type) {
case SWIM_FD_MSG_PING:
- if (! swim_task_is_scheduled(&member->ack_task))
+ if (proxy != NULL) {
+ if (swim_send_indirect_ack(swim, &member->addr,
+ proxy) != 0)
+ diag_log();
+ } else if (! swim_task_is_scheduled(&member->ack_task)) {
swim_send_ack(swim, &member->ack_task, &member->addr);
+ }
break;
case SWIM_FD_MSG_ACK:
member->unacknowledged_pings = 0;
@@ -1433,7 +1644,6 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
const char *end, const struct sockaddr_in *src,
const struct sockaddr_in *proxy)
{
- (void) proxy;
const char *prefix = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
@@ -1465,7 +1675,8 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
break;
case SWIM_FAILURE_DETECTION:
if (swim_process_failure_detection(swim, &pos, end,
- src, &uuid) != 0)
+ src, &uuid,
+ proxy) != 0)
goto error;
break;
case SWIM_DISSEMINATION:
diff --git a/src/lib/swim/swim_constants.h b/src/lib/swim/swim_constants.h
index 7869ddf3e..4f8404ce3 100644
--- a/src/lib/swim/swim_constants.h
+++ b/src/lib/swim/swim_constants.h
@@ -37,6 +37,12 @@
enum swim_member_status {
/** The instance is ok, responds to requests. */
MEMBER_ALIVE = 0,
+ /**
+ * If a member has not responded to a ping, it is declared
+ * as suspected to be dead. After more failed pings it
+ * is finally dead.
+ */
+ MEMBER_SUSPECTED,
/**
* The member is considered dead. It will disappear from
* the membership after some unacknowledged pings.
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 18c20abf3..6502e40a1 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -45,6 +45,7 @@ swim_inaddr_str(const struct sockaddr_in *addr)
const char *swim_member_status_strs[] = {
"alive",
+ "suspected",
"dead",
"left",
};
diff --git a/test/unit/swim.c b/test/unit/swim.c
index e375e6607..2deaf138a 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -240,7 +240,7 @@ swim_test_add_remove(void)
static void
swim_test_basic_failure_detection(void)
{
- swim_start_test(7);
+ swim_start_test(9);
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_set_ack_timeout(cluster, 0.5);
@@ -248,8 +248,15 @@ swim_test_basic_failure_detection(void)
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
"node is added as alive");
swim_cluster_block_io(cluster, 1);
- is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 2.4), -1,
- "member still is not dead after 2 noacks");
+ /* Roll one round to send a first ping. */
+ swim_run_for(1);
+
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_SUSPECTED, 0.9), -1,
+ "member still is not suspected after 1 noack");
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_SUSPECTED, 0.1), 0,
+ "but it is suspected after one more");
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 1.4), -1,
+ "it is not dead after 2 more noacks");
is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 0.1), 0,
"but it is dead after one more");
@@ -304,34 +311,35 @@ swim_test_basic_gossip(void)
swim_cluster_add_link(cluster, 1, 0);
swim_cluster_set_drop(cluster, 1, 100);
/*
- * Wait two no-ACKs on S1 from S2. +1 sec to send a first
+ * Wait one no-ACK on S1 from S2. +1 sec to send a first
* ping.
*/
- swim_run_for(20 + 1);
+ swim_run_for(10 + 1);
swim_cluster_add_link(cluster, 0, 2);
swim_cluster_add_link(cluster, 2, 1);
/*
* After 10 seconds (one ack timeout) S1 should see S2 as
- * dead. But S3 still should see S2 as alive. To prevent
- * S1 from informing S3 about that the S3 IO is blocked
- * for a short time.
+ * suspected. But S3 still should see S2 as alive. To
+ * prevent S1 from informing S3 about that the S3 IO is
+ * blocked for a short time.
*/
swim_run_for(9);
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
"S1 still thinks that S2 is alive");
swim_cluster_block_io(cluster, 2);
swim_run_for(1);
- is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD, "but one "\
- "more second, and a third ack timed out - S1 sees S2 as dead");
+ is(swim_cluster_member_status(cluster, 0, 1), MEMBER_SUSPECTED,
+ "but one more second, and a second ack timed out - S1 sees S2 as "\
+ "suspected");
is(swim_cluster_member_status(cluster, 2, 1), MEMBER_ALIVE,
"S3 still thinks that S2 is alive");
swim_cluster_unblock_io(cluster, 2);
/*
- * At most after two round steps S1 sends 'S2 is dead' to
- * S3.
+ * At most after two round steps S1 sends
+ * 'S2 is suspected' to S3.
*/
- is(swim_cluster_wait_status(cluster, 2, 1, MEMBER_DEAD, 2), 0,
- "S3 learns about dead S2 from S1");
+ is(swim_cluster_wait_status(cluster, 2, 1, MEMBER_SUSPECTED, 2), 0,
+ "S3 learns about suspected S2 from S1");
swim_cluster_delete(cluster);
swim_finish_test();
@@ -363,10 +371,14 @@ swim_test_refute(void)
swim_cluster_add_link(cluster, 0, 1);
swim_cluster_set_drop(cluster, 1, 100);
- fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 7) != 0);
+ /* Roll one round to send a first ping. */
+ swim_run_for(1);
+
+ fail_if(swim_cluster_wait_status(cluster, 0, 1,
+ MEMBER_SUSPECTED, 4) != 0);
swim_cluster_set_drop(cluster, 1, 0);
is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
- "S2 increments its own incarnation to refute its death");
+ "S2 increments its own incarnation to refute its suspicion");
is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0,
"new incarnation has reached S1 with a next round message");
@@ -386,7 +398,7 @@ swim_test_too_big_packet(void)
swim_start_test(3);
int size = 50;
double ack_timeout = 1;
- double first_dead_timeout = 20;
+ double first_dead_timeout = 30;
double everywhere_dead_timeout = size;
int drop_id = size / 2;
@@ -465,7 +477,9 @@ swim_test_undead(void)
swim_cluster_add_link(cluster, 0, 1);
swim_cluster_add_link(cluster, 1, 0);
swim_cluster_set_drop(cluster, 1, 100);
- is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 4), 0,
+ /* Roll one round to send a first ping. */
+ swim_run_for(1);
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 5), 0,
"member S2 is dead");
swim_run_for(5);
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD,
@@ -833,10 +847,33 @@ swim_test_payload_refutation(void)
swim_finish_test();
}
+static void
+swim_test_indirect_ping(void)
+{
+ swim_start_test(2);
+ uint16_t cluster_size = 3;
+ struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+ swim_cluster_set_ack_timeout(cluster, 1);
+ for (int i = 0; i < cluster_size; ++i) {
+ for (int j = i + 1; j < cluster_size; ++j)
+ swim_cluster_interconnect(cluster, i, j);
+ }
+ swim_cluster_set_drop_channel(cluster, 0, 1, true);
+ swim_cluster_set_drop_channel(cluster, 1, 0, true);
+ swim_run_for(10);
+ is(swim_cluster_wait_status_everywhere(cluster, 0, MEMBER_ALIVE, 0),
+ 0, "S1 is still alive everywhere");
+ is(swim_cluster_wait_status_everywhere(cluster, 1, MEMBER_ALIVE, 0),
+ 0, "as well as S2 - they communicated via S3");
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
static int
main_f(va_list ap)
{
- swim_start_test(17);
+ swim_start_test(18);
(void) ap;
swim_test_ev_init();
@@ -859,6 +896,7 @@ main_f(va_list ap)
swim_test_broadcast();
swim_test_payload_basic();
swim_test_payload_refutation();
+ swim_test_indirect_ping();
swim_test_transport_free();
swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 4b1407db3..a450d8427 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
*** main_f ***
-1..17
+1..18
*** swim_test_one_link ***
1..6
ok 1 - no rounds - no fullmesh
@@ -64,14 +64,16 @@ ok 4 - subtests
ok 5 - subtests
*** swim_test_add_remove: done ***
*** swim_test_basic_failure_detection ***
- 1..7
+ 1..9
ok 1 - node is added as alive
- ok 2 - member still is not dead after 2 noacks
- ok 3 - but it is dead after one more
- ok 4 - after 2 more unacks the member still is not deleted - dissemination TTD keeps it
- ok 5 - but it is dropped after 2 rounds when TTD gets 0
- ok 6 - fullmesh is restored
- ok 7 - a member is added back on an ACK
+ ok 2 - member still is not suspected after 1 noack
+ ok 3 - but it is suspected after one more
+ ok 4 - it is not dead after 2 more noacks
+ ok 5 - but it is dead after one more
+ ok 6 - after 2 more unacks the member still is not deleted - dissemination TTD keeps it
+ ok 7 - but it is dropped after 2 rounds when TTD gets 0
+ ok 8 - fullmesh is restored
+ ok 9 - a member is added back on an ACK
ok 6 - subtests
*** swim_test_basic_failure_detection: done ***
*** swim_test_probe ***
@@ -82,7 +84,7 @@ ok 7 - subtests
*** swim_test_probe: done ***
*** swim_test_refute ***
1..4
- ok 1 - S2 increments its own incarnation to refute its death
+ ok 1 - S2 increments its own incarnation to refute its suspicion
ok 2 - new incarnation has reached S1 with a next round message
ok 3 - after restart S2's incarnation is 0 again
ok 4 - S2 learned its old bigger incarnation 1 from S0
@@ -91,9 +93,9 @@ ok 8 - subtests
*** swim_test_basic_gossip ***
1..4
ok 1 - S1 still thinks that S2 is alive
- ok 2 - but one more second, and a third ack timed out - S1 sees S2 as dead
+ ok 2 - but one more second, and a second ack timed out - S1 sees S2 as suspected
ok 3 - S3 still thinks that S2 is alive
- ok 4 - S3 learns about dead S2 from S1
+ ok 4 - S3 learns about suspected S2 from S1
ok 9 - subtests
*** swim_test_basic_gossip: done ***
*** swim_test_too_big_packet ***
@@ -177,4 +179,10 @@ ok 16 - subtests
ok 11 - S3 learns S1's payload from S2
ok 17 - subtests
*** swim_test_payload_refutation: done ***
+ *** swim_test_indirect_ping ***
+ 1..2
+ ok 1 - S1 is still alive everywhere
+ ok 2 - as well as S2 - they communicated via S3
+ok 18 - subtests
+ *** swim_test_indirect_ping: done ***
*** main_f: done ***
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 6/6] swim: introduce suspicion
2019-04-24 14:36 ` [tarantool-patches] [PATCH 6/6] swim: introduce suspicion Vladislav Shpilevoy
@ 2019-04-24 17:01 ` Konstantin Osipov
2019-04-24 20:28 ` Vladislav Shpilevoy
0 siblings, 1 reply; 22+ messages in thread
From: Konstantin Osipov @ 2019-04-24 17:01 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/24 18:50]:
> + /**
> + * Number of attempts to reach out a member via another
> + * members when it did not answer on a regular ping .
> + */
> + INDIRECT_PING_COUNT = 2,
As far as I can see it's not the number of attempts, it's the
number of cluster members involved as proxies for sending an
indirect ping. The fact that you attempt an indirect ping only
once before switching member state to dead is secondary.
This has to do with lack of a description in changeset or code
comment of indirect pings strategy.
> +/**
> + * Put the member into a list of ACK waiters. @a hop_count says
> + * how many hops from one member to another the ACK is expected to
> + * do.
> + */
> static void
> -swim_wait_ack(struct swim *swim, struct swim_member *member)
> +swim_wait_ack(struct swim *swim, struct swim_member *member, int hop_count)
> {
> if (heap_node_is_stray(&member->in_wait_ack_heap)) {
> - member->ping_deadline = swim_time() + swim->wait_ack_tick.at;
> + double timeout = swim->wait_ack_tick.at * hop_count;
Is hop_count ever greater than two? If not, I would not use int
hop_count, but use bool is_indirect_ping, and add a comment
explaining that in case of indirect pings the number of hops is
exactly two, so we need to increase the timeout.
> +/**
> + * When indirect pings are sent, each is represented by this
> + * structure.
> + */
> +struct swim_iping_task {
> + /** Base task used by the scheduler. */
> + struct swim_task base;
> + /**
> + * Reference to a block of other indirect ping tasks sent
> + * at the same moment. Used to decide whether it is needed
> + * to start waiting for an ACK, and on which member.
> + */
> + struct swim_iping_block *block;
I don't understand this comment, why do you need a reference?
I would not use it at all, seems like we have a separate data
structure when existing structures would do just fine.
> +};
> +
> +/**
> + * An array of indirect ping tasks sent simultaneously via
> + * different proxies. The block contains meta information allowing
> + * to 1) start waiting for an ACK only after first successful
> + * send; 2) determine which member has sent the pings - UUID is
> + * needed for that, inet address is not enough.
> + *
> + * The block is deleted when the last task is finished.
> + */
> +struct swim_iping_block {
> + /** Array of indirect ping tasks. */
> + struct swim_iping_task tasks[INDIRECT_PING_COUNT];
> + /**
> + * UUID of the destination member. Used to set ping
> + * deadline in that member when at least one is sent
> + * successfully.
> + */
> + struct tt_uuid dst_uuid;
> + /**
> + * The flag is used to wait for an ACK only once after a
> + * first ping is sent to protect from a case, when an ACK
> + * is received faster, than the last ping is sent. Then
> + * the whole indirect ping block should be considered
> + * acked.
> + */
Why care if indirect ping block is acknowledged or not? What
matters is member status, and it is changed when we get an ack.
In other words, I'd drop the whole idea with blocks and just send
multiple independent pings.
> switch (def.type) {
> case SWIM_FD_MSG_PING:
> - if (! swim_task_is_scheduled(&member->ack_task))
> + if (proxy != NULL) {
> + if (swim_send_indirect_ack(swim, &member->addr,
> + proxy) != 0)
> + diag_log();
> + } else if (! swim_task_is_scheduled(&member->ack_task)) {
> swim_send_ack(swim, &member->ack_task, &member->addr);
> + }
> break;
Why do you ever need to distinguish direct and indirect acks? Why not
respond to the sender address, while preserving the routing table
from the ping request, assuming the sender can figure out whether
the response is addressed to him directly or it should proxy it
further based on the received proxy section of the response?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 22+ messages in thread
* [tarantool-patches] Re: [PATCH 6/6] swim: introduce suspicion
2019-04-24 17:01 ` [tarantool-patches] " Konstantin Osipov
@ 2019-04-24 20:28 ` Vladislav Shpilevoy
2019-04-25 10:42 ` Konstantin Osipov
0 siblings, 1 reply; 22+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 20:28 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
On 24/04/2019 20:01, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/04/24 18:50]:
>> + /**
>> + * Number of attempts to reach out a member via another
>> + * members when it did not answer on a regular ping .
>> + */
>> + INDIRECT_PING_COUNT = 2,
>
> As far as I can see it's not the number of attempts, it's the
> number of cluster members involved as proxies for sending an
> indirect ping. The fact that you attempt an indirect ping only
> once before switching member state to dead is secondary.
>
> This has to do with lack of a description in changeset or code
> comment of indirect pings strategy.
Sorry, you are right. They are not 'attempts' strictly
speaking. New comment:
==================================================================
*/
NO_ACKS_TO_GC = 2,
/**
- * Number of attempts to reach out a member via another
- * members when it did not answer on a regular ping .
+ * Number of pings sent indirectly to a member via other
+ * members when it did not answer on a regular ping. The
+ * messages are sent in parallel and via different
+ * members.
*/
INDIRECT_PING_COUNT = 2,
};
==================================================================
>
>> +/**
>> + * Put the member into a list of ACK waiters. @a hop_count says
>> + * how many hops from one member to another the ACK is expected to
>> + * do.
>> + */
>> static void
>> -swim_wait_ack(struct swim *swim, struct swim_member *member)
>> +swim_wait_ack(struct swim *swim, struct swim_member *member, int hop_count)
>> {
>> if (heap_node_is_stray(&member->in_wait_ack_heap)) {
>> - member->ping_deadline = swim_time() + swim->wait_ack_tick.at;
>> + double timeout = swim->wait_ack_tick.at * hop_count;
>
> Is hop_count ever greater than two? If not, I would not use int
> hop_count, but use bool is_indirect_ping, and add a comment
> explaining that in case of indirect pings the number of hops is
> exactly two, so we need to increase the timeout.
Ok, I do not mind.
==================================================================
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index af9b59cbb..24229bf0c 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -493,10 +493,18 @@ swim_cached_round_msg_invalidate(struct swim *swim)
* do.
*/
static void
-swim_wait_ack(struct swim *swim, struct swim_member *member, int hop_count)
+swim_wait_ack(struct swim *swim, struct swim_member *member,
+ bool was_ping_indirect)
{
if (heap_node_is_stray(&member->in_wait_ack_heap)) {
- double timeout = swim->wait_ack_tick.at * hop_count;
+ double timeout = swim->wait_ack_tick.at;
+ /*
+ * Direct ping is two hops: PING + ACK.
+ * Indirect ping is four hops: PING, FORWARD PING,
+ * ACK, FORWARD ACK. This is why x2 for indirects.
+ */
+ if (was_ping_indirect)
+ timeout *= 2;
member->ping_deadline = swim_time() + timeout;
wait_ack_heap_insert(&swim->wait_ack_heap, member);
swim_ev_timer_start(loop(), &swim->wait_ack_tick);
@@ -627,7 +635,7 @@ swim_ping_task_complete(struct swim_task *task,
struct swim *swim = swim_by_scheduler(scheduler);
struct swim_member *m = container_of(task, struct swim_member,
ping_task);
- swim_wait_ack(swim, m, 1);
+ swim_wait_ack(swim, m, false);
}
/** Free member's resources. */
@@ -1082,7 +1090,7 @@ swim_complete_step(struct swim_task *task,
* dissemination and failure detection
* sections.
*/
- swim_wait_ack(swim, m, 1);
+ swim_wait_ack(swim, m, false);
swim_decrease_event_ttd(swim);
}
}
@@ -1239,7 +1247,7 @@ swim_iping_task_complete(struct swim_task *base_task,
struct swim_member *m =
swim_find_member(swim, &b->dst_uuid);
if (m != NULL)
- swim_wait_ack(swim, m, 2);
+ swim_wait_ack(swim, m, true);
}
swim_iping_task_delete(t);
}
==================================================================
>
>> +/**
>> + * When indirect pings are sent, each is represented by this
>> + * structure.
>> + */
>> +struct swim_iping_task {
>> + /** Base task used by the scheduler. */
>> + struct swim_task base;
>> + /**
>> + * Reference to a block of other indirect ping tasks sent
>> + * at the same moment. Used to decide whether it is needed
>> + * to start waiting for an ACK, and on which member.
>> + */
>> + struct swim_iping_block *block;
>
> I don't understand this comment, why do you need a reference?
>
> I would not use it at all, seems like we have a separate data
> structure when existing structures would do just fine.
>
Talking of block - we have decided to get rid of it. To
understand why, I should explain why was it needed.
1) To wait for an ACK I need a struct swim_member to put it
into ack waiter queue. But in task_complete() callback I don't
know the member nor its UUID. struct swim_task is too encapsulated
inside the transport level. Iping block stores the UUID of sender.
2) It is possible, that a first indirect ACK was sent and we got
a result faster than managed to send a next indirect ACK. In such a
case we will wait for the second ACK, even having a response on
the first one.
We've decided that block can be removed if UUID is added to
struct swim_task, and if we mark ACK and ping with timestamps.
But I found another solution for ACK/ping timing problem.
When a next indirect ping is sent, lets watch at
swim_member status. If it is not suspected anymore, it
means, that an ACK had already been received while this ping
was waiting for EV_WRITE.
I will not paste here diff of this change since it is too big.
I would rather send V2 of this patchset.
>> switch (def.type) {
>> case SWIM_FD_MSG_PING:
>> - if (! swim_task_is_scheduled(&member->ack_task))
>> + if (proxy != NULL) {
>> + if (swim_send_indirect_ack(swim, &member->addr,
>> + proxy) != 0)
>> + diag_log();
>> + } else if (! swim_task_is_scheduled(&member->ack_task)) {
>> swim_send_ack(swim, &member->ack_task, &member->addr);
>> + }
>> break;
>
> Why do you ever need to distinguish direct and indirect acks? Why not
> respond to the sender address, while preserving the routing table
> from the ping request, assuming the sender can figure out whether
> the response is addressed to him directly or it should proxy it
> further based on the received proxy section of the response?
Such a long sentence, do not know how to split it in parts. Ok.
Firstly, I do not distinguish indirect ACKs when they are received
by the pinger. This is thanks to the transport level full isolation.
I do not even have a special request type like 'PING_REQUEST' or
something, like classical implementation does. In theory, I could even
send usual anti-entropy and dissemination messages indirectly.
Secondly, to preserve routing table I need to distinguish direct
from indirect ACKs on sender side, because routing table is generated
only when proxy address is set. I can't just say
'swim_send_ack(swim, member)' and let the transport level decide
whether it should use proxy or not.
The transport level doesn't have an address-table to choose proxies,
as well as it does not know, that this message is a response to another
message, which in turn was received via proxy. Only SWIM core has this
information. So it should give the transport level a hint via
'swim_task_proxy()' method.
swim_send_ack and swim_send_indirect_ack are just wrappers around
swim_send_fd_msg(), passing different proxy values. The former sets
proxy NULL, the latter not NULL. That's all.
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
^ permalink raw reply [flat|nested] 22+ messages in thread