* [tarantool-patches] [PATCH v2 2/3] swim: introduce routing
2019-04-24 20:21 [tarantool-patches] [PATCH v2 0/3] swim suspicion Vladislav Shpilevoy
2019-04-24 20:21 ` [tarantool-patches] [PATCH v2 1/3] swim: store sender UUID in swim io tasks Vladislav Shpilevoy
@ 2019-04-24 20:21 ` Vladislav Shpilevoy
2019-04-25 10:43 ` [tarantool-patches] " Konstantin Osipov
2019-04-24 20:22 ` [tarantool-patches] [PATCH v2 3/3] swim: introduce suspicion Vladislav Shpilevoy
2019-04-25 13:50 ` [tarantool-patches] Re: [PATCH v2 0/3] swim suspicion Vladislav Shpilevoy
3 siblings, 1 reply; 10+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 20:21 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
Before the patch SWIM packets were sent directly from one
instance to another, relying on transparent routing at the
Internet layer of TCP/IP. But the SWIM paper also describes a
component that is not implemented yet - the suspicion
mechanism.
So as not to overload this message with suspicion details, it is
enough to say that suspicion requires the ability to send a
packet through an intermediate SWIM instance instead of directly.
This commit extends the SWIM protocol with a new transport-level
section named 'routing'. It allows sending indirect SWIM messages
transparently via packet forwarding, implemented entirely inside
the transport component, in swim_io.c.
Part of #3234
---
src/lib/swim/swim.c | 4 +-
src/lib/swim/swim_io.c | 96 +++++++++++++++++++++++++++++++++----
src/lib/swim/swim_io.h | 15 +++++-
src/lib/swim/swim_proto.c | 83 +++++++++++++++++++++++++++++++-
src/lib/swim/swim_proto.h | 88 ++++++++++++++++++++++++++++++++--
test/unit/swim_proto.c | 56 +++++++++++++++++++++-
test/unit/swim_proto.result | 11 ++++-
7 files changed, 336 insertions(+), 17 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index bb1ded713..1b4a4365d 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1430,8 +1430,10 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
- const char *end, const struct sockaddr_in *src)
+ const char *end, const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy)
{
+ (void) proxy;
const char *prefix = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index a55c15f30..c7ffc31d5 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -60,8 +60,11 @@ swim_packet_create(struct swim_packet *packet)
/** Fill metadata prefix of a packet. */
static inline void
swim_packet_build_meta(struct swim_packet *packet,
- const struct sockaddr_in *src)
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *route_src,
+ const struct sockaddr_in *route_dst)
{
+ assert((route_src != NULL) == (route_dst != NULL));
char *meta = packet->meta;
char *end = packet->body;
/*
@@ -71,9 +74,18 @@ swim_packet_build_meta(struct swim_packet *packet,
if (meta == end)
return;
struct swim_meta_header_bin header;
- swim_meta_header_bin_create(&header, src);
- assert(meta + sizeof(header) == end);
+ swim_meta_header_bin_create(&header, src, route_dst != NULL);
+ assert(meta + sizeof(header) <= end);
memcpy(meta, &header, sizeof(header));
+ meta += sizeof(header);
+ if (route_dst != NULL) {
+ struct swim_route_bin route;
+ swim_route_bin_create(&route, route_src, route_dst);
+ assert(meta + sizeof(route) <= end);
+ memcpy(meta, &route, sizeof(route));
+ meta += sizeof(route);
+ }
+ assert(meta == end);
/*
* Once meta is built, it is consumed by the body. Used
* not to rebuild the meta again if the task will be
@@ -82,6 +94,21 @@ swim_packet_build_meta(struct swim_packet *packet,
packet->body = packet->meta;
}
+/**
+ * Route @a task through @a proxy: remember the proxy address and
+ * reserve room for a routing section in the packet meta. Must be
+ * called while the packet body is still empty.
+ */
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
+{
+ /*
+ * Route meta should be reserved before body encoding is
+ * started. Otherwise it would later be necessary to move
+ * the already encoded body, possibly trimming its tail
+ * because of the limited UDP packet size.
+ */
+ assert(swim_packet_body_size(&task->packet) == 0);
+ assert(! swim_inaddr_is_empty(proxy));
+ task->proxy = *proxy;
+ swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+}
+
void
swim_task_create(struct swim_task *task, swim_task_f complete,
swim_task_f cancel, const char *desc)
@@ -161,6 +188,11 @@ swim_bcast_task_delete_cb(struct swim_task *task,
static int
swim_bcast_task_next_addr(struct swim_bcast_task *task)
{
+ /*
+ * Broadcast + proxy is not supported yet, and barely it
+ * will be needed anytime.
+ */
+ assert(swim_inaddr_is_empty(&task->base.proxy));
for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
int flags = i->ifa_flags;
if ((flags & IFF_UP) == 0)
@@ -320,13 +352,23 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
struct swim_task *task =
rlist_shift_entry(&scheduler->queue_output, struct swim_task,
in_queue_output);
+ const struct sockaddr_in *src = &scheduler->transport.addr;
+ const struct sockaddr_in *dst = &task->dst;
+ const char *dst_str = swim_inaddr_str(dst);
+ if (! swim_inaddr_is_empty(&task->proxy)) {
+ dst = &task->proxy;
+ dst_str = tt_sprintf("%s via %s", dst_str,
+ swim_inaddr_str(dst));
+ swim_packet_build_meta(&task->packet, src, src, &task->dst);
+ } else {
+ swim_packet_build_meta(&task->packet, src, NULL, NULL);
+ }
say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
- task->desc, swim_inaddr_str(&task->dst));
- swim_packet_build_meta(&task->packet, &scheduler->transport.addr);
+ task->desc, dst_str);
int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
task->packet.pos - task->packet.buf,
- (const struct sockaddr *) &task->dst,
- sizeof(task->dst));
+ (const struct sockaddr *) dst,
+ sizeof(*dst));
if (rc < 0)
diag_log();
if (task->complete != NULL)
@@ -357,7 +399,45 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
const char *pos = buf, *end = pos + size;
if (swim_meta_def_decode(&meta, &pos, end) < 0)
goto error;
- scheduler->on_input(scheduler, pos, end, &meta.src);
+ /*
+ * Check if this instance is not the destination and
+ * possibly forward the packet.
+ */
+ if (! meta.is_route_specified) {
+ scheduler->on_input(scheduler, pos, end, &meta.src, NULL);
+ return;
+ }
+ struct sockaddr_in *self = &scheduler->transport.addr;
+ if (swim_inaddr_eq(&meta.route.dst, self)) {
+ scheduler->on_input(scheduler, pos, end, &meta.route.src,
+ &meta.src);
+ return;
+ }
+ /* Forward the foreign packet. */
+ struct swim_task *task = swim_task_new(swim_task_delete_cb,
+ swim_task_delete_cb,
+ "foreign packet");
+ if (task == NULL)
+ goto error;
+ /*
+ * Allocate route meta explicitly, because the packet
+ * should keep route meta even being sent to the final
+ * destination directly.
+ */
+ swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+ /*
+ * Meta should be rebuilt with the different source
+ * address - this instance. It is used by the receiver to
+ * send a reply through this instance again.
+ */
+ swim_packet_build_meta(&task->packet, self, &meta.route.src,
+ &meta.route.dst);
+ /* Copy the original body without a touch. */
+ size = end - pos;
+ char *body = swim_packet_alloc(&task->packet, size);
+ assert(body != NULL);
+ memcpy(body, pos, size);
+ swim_task_send(task, &meta.route.dst, scheduler);
return;
error:
diag_log();
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 884680859..977859db7 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -140,7 +140,8 @@ swim_packet_create(struct swim_packet *packet);
typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
const char *buf, const char *end,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy);
/** Planner and executor of input and output operations.*/
struct swim_scheduler {
@@ -212,6 +213,11 @@ struct swim_task {
struct swim_packet packet;
/** Destination address. */
struct sockaddr_in dst;
+ /**
+ * Optional proxy via which the destination should be
+ * reached.
+ */
+ struct sockaddr_in proxy;
/** Place in a queue of tasks. */
struct rlist in_queue_output;
/**
@@ -232,6 +238,13 @@ swim_task_is_scheduled(struct swim_task *task)
return ! rlist_empty(&task->in_queue_output);
}
+/**
+ * Set the proxy for the task. Before sending this proxy is dumped
+ * into metadata section.
+ */
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
+
/**
* Put the task into a queue of tasks. Eventually it will be sent.
*/
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 796559e8e..18c20abf3 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -439,9 +439,9 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src)
+ const struct sockaddr_in *src, bool has_routing)
{
- int map_size = 1 + SWIM_INADDR_BIN_SIZE;
+ int map_size = 1 + SWIM_INADDR_BIN_SIZE + has_routing;
assert(mp_sizeof_map(map_size) == 1);
header->m_header = 0x80 | map_size;
header->k_version = SWIM_META_TARANTOOL_VERSION;
@@ -452,6 +452,64 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header,
swim_inaddr_bin_fill(&header->src_addr, src);
}
+/**
+ * Decode the meta routing section into a meta definition object.
+ * The section is a map of SWIM_ROUTE_* keys. Any unknown key is
+ * an error, and both the source and the destination addresses
+ * must be present and non-empty. On success
+ * @a def->is_route_specified is set to true.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos MessagePack buffer to decode.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+static int
+swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos,
+ const char *end)
+{
+ const char *prefix = "invalid routing section:";
+ uint32_t size;
+ /* Route addresses are always decoded as IPv4. */
+ def->route.src.sin_family = AF_INET;
+ def->route.dst.sin_family = AF_INET;
+ if (swim_decode_map(pos, end, &size, prefix, "route") != 0)
+ return -1;
+ for (uint32_t i = 0; i < size; ++i) {
+ uint64_t key;
+ if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
+ return -1;
+ switch (key) {
+ case SWIM_ROUTE_SRC_ADDRESS:
+ if (swim_decode_ip(&def->route.src, pos, end, prefix,
+ "source address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_SRC_PORT:
+ if (swim_decode_port(&def->route.src, pos, end,
+ prefix, "source port") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_ADDRESS:
+ if (swim_decode_ip(&def->route.dst, pos, end, prefix,
+ "destination address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_PORT:
+ if (swim_decode_port(&def->route.dst, pos, end,
+ prefix, "destination port") != 0)
+ return -1;
+ break;
+ default:
+ diag_set(SwimError, "%s unknown key", prefix);
+ return -1;
+ }
+ }
+ /* A route is usable only when both endpoints are known. */
+ if (swim_check_inaddr_not_empty(&def->route.src, prefix,
+ "source") != 0 ||
+ swim_check_inaddr_not_empty(&def->route.dst, prefix,
+ "destination") != 0)
+ return -1;
+ def->is_route_specified = true;
+ return 0;
+}
+
int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end)
@@ -467,6 +525,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
return -1;
switch (key) {
+ case SWIM_META_ROUTING:
+ if (swim_meta_def_decode_route(def, pos, end) != 0)
+ return -1;
+ break;
case SWIM_META_TARANTOOL_VERSION:
if (swim_decode_uint(pos, end, &key, prefix,
"version") != 0)
@@ -509,3 +571,20 @@ swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation)
header->m_incarnation = 0xcf;
header->v_incarnation = mp_bswap_u64(incarnation);
}
+
+/**
+ * Fill a routing section template: the SWIM_META_ROUTING key
+ * followed by a map with the source and destination addresses.
+ */
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst)
+{
+ /* Address and port keys for each of the two endpoints. */
+ int map_size = SWIM_INADDR_BIN_SIZE * 2;
+ /* The map header must fit into one byte (fixmap). */
+ assert(mp_sizeof_map(map_size) == 1);
+ /*
+ * The key is stored as a single byte, i.e. as a positive
+ * fixint, so SWIM_META_ROUTING must stay below 128.
+ */
+ route->k_routing = SWIM_META_ROUTING;
+ route->m_routing = 0x80 | map_size;
+ swim_inaddr_bin_create(&route->src_addr, SWIM_ROUTE_SRC_ADDRESS,
+ SWIM_ROUTE_SRC_PORT);
+ swim_inaddr_bin_create(&route->dst_addr, SWIM_ROUTE_DST_ADDRESS,
+ SWIM_ROUTE_DST_PORT);
+ swim_inaddr_bin_fill(&route->src_addr, src);
+ swim_inaddr_bin_fill(&route->dst_addr, dst);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index ddf9e28db..f70ac708a 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -51,7 +51,13 @@ enum {
* | { |
* | SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,|
* | SWIM_META_SRC_ADDRESS: uint, ip, |
- * | SWIM_META_SRC_PORT: uint, port |
+ * | SWIM_META_SRC_PORT: uint, port, |
+ * | SWIM_META_ROUTING: { |
+ * | SWIM_ROUTE_SRC_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_SRC_PORT: uint, port, |
+ * | SWIM_ROUTE_DST_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_DST_PORT: uint, port |
+ * | } |
* | } |
* +-------------------Protocol logic section--------------------+
* | { |
@@ -421,6 +427,33 @@ enum swim_meta_key {
*/
SWIM_META_SRC_ADDRESS,
SWIM_META_SRC_PORT,
+ /**
+ * Routing section allows to specify routes of up to 3
+ * nodes: source, proxy, and destination. It contains two
+ * addresses - the message originator and the final
+ * destination. Here is an example of an indirect message
+ * transmission. Assume, there are 3 nodes: S1, S2, S3.
+ * S1 sends a message to S3 via S2. The following steps
+ * are executed in order to deliver the message:
+ *
+ * S1 -> S2
+ * { src: S1, routing: {src: S1, dst: S3}, body: ... }
+ *
+ * S2 receives the message and sees: routing.dst != S2 -
+ * it is a foreign packet. S2 forwards it to S3 preserving
+ * all the data - body and routing sections.
+ *
+ *
+ * S2 -> S3
+ * {src: S2, routing: {src: S1, dst: S3}, body: ...}
+ *
+ * S3 receives the message and sees: routing.dst == S3 -
+ * the message is delivered. If S3 wants to answer, it
+ * sends a response via the same proxy. It knows, that the
+ * message was delivered from S2, and sends an answer via
+ * S2.
+ */
+ SWIM_META_ROUTING,
};
/**
@@ -432,7 +465,7 @@ enum swim_meta_key {
* separate MessagePack map.
*/
struct PACKED swim_meta_header_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(3 or 4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */
@@ -448,7 +481,7 @@ struct PACKED swim_meta_header_bin {
/** Initialize meta section. */
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src, bool has_routing);
/** Meta definition. */
struct swim_meta_def {
@@ -456,6 +489,18 @@ struct swim_meta_def {
uint32_t version;
/** Source of the message. */
struct sockaddr_in src;
+ /** Route source and destination. */
+ struct {
+ struct sockaddr_in src;
+ struct sockaddr_in dst;
+ } route;
+ /**
+ * True, if both @a src and @a dst are not empty. This
+ * flag is just sugar so as not to check both addresses
+ * manually. Also in future more fields could be added
+ * here.
+ */
+ bool is_route_specified;
};
/**
@@ -471,6 +516,43 @@ int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end);
+/** Keys of the map stored under SWIM_META_ROUTING. */
+enum swim_route_key {
+ /**
+ * True source of the packet. Can differ from the packet
+ * sender. A response is expected to be sent back to this
+ * address, possibly indirectly through the same proxy.
+ */
+ SWIM_ROUTE_SRC_ADDRESS = 0,
+ SWIM_ROUTE_SRC_PORT,
+ /**
+ * True destination of the packet. Can differ from the
+ * receiving instance. If the packet is addressed to another
+ * instance, the receiver forwards it there.
+ */
+ SWIM_ROUTE_DST_ADDRESS,
+ SWIM_ROUTE_DST_PORT,
+ /** Number of route keys. */
+ swim_route_key_MAX,
+};
+
+/**
+ * Route section template: the SWIM_META_ROUTING key and its map
+ * value with the source and destination addresses. It is appended
+ * to the meta section map as one more key-value pair.
+ */
+struct PACKED swim_route_bin {
+ /** mp_encode_uint(SWIM_META_ROUTING) */
+ uint8_t k_routing;
+ /** mp_encode_map(4) */
+ uint8_t m_routing;
+ /** SWIM_ROUTE_SRC_ADDRESS and SWIM_ROUTE_SRC_PORT. */
+ struct swim_inaddr_bin src_addr;
+ /** SWIM_ROUTE_DST_ADDRESS and SWIM_ROUTE_DST_PORT. */
+ struct swim_inaddr_bin dst_addr;
+};
+
+/**
+ * Initialize a routing section.
+ * @param[out] route Routing section to fill.
+ * @param src True source of the packet.
+ * @param dst True (final) destination of the packet.
+ */
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst);
+
/** }}} Meta component */
enum swim_quit_key {
diff --git a/test/unit/swim_proto.c b/test/unit/swim_proto.c
index 64487fb7d..d7fafc7f0 100644
--- a/test/unit/swim_proto.c
+++ b/test/unit/swim_proto.c
@@ -189,11 +189,64 @@ swim_test_meta(void)
footer();
}
+/** Check decoding of the meta routing section, broken and valid. */
+static void
+swim_test_route(void)
+{
+ header();
+ plan(5);
+
+ char buffer[1024];
+ struct swim_meta_def mdef;
+ struct swim_meta_header_bin header;
+ struct sockaddr_in addr;
+ addr.sin_port = htons(1234);
+ fail_if(inet_aton("127.0.0.1", &addr.sin_addr) == 0);
+
+ /*
+ * The header announces one more key for routing, but
+ * nothing follows it in the buffer yet.
+ */
+ swim_meta_header_bin_create(&header, &addr, true);
+ memcpy(buffer, &header, sizeof(header));
+ char *last_valid = buffer + sizeof(header);
+ char *end = last_valid;
+ const char *pos = buffer;
+
+ is(swim_meta_def_decode(&mdef, &pos, end), -1,
+ "route was expected, but map is too short");
+
+ /* The routing key is present, but its value is missing. */
+ end = mp_encode_uint(end, SWIM_META_ROUTING);
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), -1, "no route map");
+
+ /* An empty route map lacks both mandatory addresses. */
+ end = mp_encode_map(end, 0);
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), -1, "empty route map");
+
+ /* A complete route section, but with empty addresses. */
+ struct swim_route_bin route;
+ struct sockaddr_in src, dst;
+ memset(&src, 0, sizeof(src));
+ memset(&dst, 0, sizeof(dst));
+ swim_route_bin_create(&route, &src, &dst);
+ memcpy(last_valid, &route, sizeof(route));
+ end = last_valid + sizeof(route);
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), -1, "zero addresses");
+
+ /* Non-empty source and destination - decoding succeeds. */
+ src.sin_port = 1;
+ src.sin_addr = addr.sin_addr;
+ dst.sin_port = 1;
+ dst.sin_addr = addr.sin_addr;
+ swim_route_bin_create(&route, &src, &dst);
+ memcpy(last_valid, &route, sizeof(route));
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), 0, "normal route");
+
+ check_plan();
+ footer();
+}
+
int
main()
{
header();
- plan(2);
+ plan(3);
memory_init();
fiber_init(fiber_c_invoke);
int fd = open("log.txt", O_TRUNC);
@@ -203,6 +256,7 @@ main()
swim_test_member_def();
swim_test_meta();
+ swim_test_route();
say_logger_free();
fiber_free();
diff --git a/test/unit/swim_proto.result b/test/unit/swim_proto.result
index c22769161..8a41a5d40 100644
--- a/test/unit/swim_proto.result
+++ b/test/unit/swim_proto.result
@@ -1,5 +1,5 @@
*** main ***
-1..2
+1..3
*** swim_test_member_def ***
1..12
ok 1 - not map header
@@ -28,4 +28,13 @@ ok 1 - subtests
ok 8 - normal meta
ok 2 - subtests
*** swim_test_meta: done ***
+ *** swim_test_route ***
+ 1..5
+ ok 1 - route was expected, but map is too short
+ ok 2 - no route map
+ ok 3 - empty route map
+ ok 4 - zero addresses
+ ok 5 - normal route
+ok 3 - subtests
+ *** swim_test_route: done ***
*** main: done ***
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 10+ messages in thread
* [tarantool-patches] [PATCH v2 3/3] swim: introduce suspicion
2019-04-24 20:21 [tarantool-patches] [PATCH v2 0/3] swim suspicion Vladislav Shpilevoy
2019-04-24 20:21 ` [tarantool-patches] [PATCH v2 1/3] swim: store sender UUID in swim io tasks Vladislav Shpilevoy
2019-04-24 20:21 ` [tarantool-patches] [PATCH v2 2/3] swim: introduce routing Vladislav Shpilevoy
@ 2019-04-24 20:22 ` Vladislav Shpilevoy
2019-04-25 10:44 ` [tarantool-patches] " Konstantin Osipov
2019-04-25 13:50 ` [tarantool-patches] Re: [PATCH v2 0/3] swim suspicion Vladislav Shpilevoy
3 siblings, 1 reply; 10+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-24 20:22 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
The suspicion component is how SWIM protects against
false-positive failure detections. When the network is slow, or
a SWIM node fails to process messages in time because it is
overloaded, other nodes do not receive ACKs in time, but it is
too early to declare the member dead.
Instead, the nodes mark the member as suspected and ping it
indirectly, via other members. This 1) gives the suspected member
more time to respond to pings, and 2) protects against network
problems on particular channels.
Part of #3234
---
src/lib/swim/swim.c | 146 ++++++++++++++++++++++++++++++----
src/lib/swim/swim_constants.h | 6 ++
src/lib/swim/swim_proto.c | 1 +
test/unit/swim.c | 76 +++++++++++++-----
test/unit/swim.result | 30 ++++---
5 files changed, 213 insertions(+), 46 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 1b4a4365d..3917c71f3 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -143,11 +143,17 @@ enum {
*/
ACK_TIMEOUT_DEFAULT = 30,
/**
- * If a member has not been responding to pings this
- * number of times, it is considered dead. According to
- * the SWIM paper, for a member it is sufficient to miss
- * one direct ping, and an arbitrary but fixed number of
- * simultaneous indirect pings, to be considered dead.
+ * If an alive member has not been responding to pings
+ * this number of times, it is suspected to be dead. To
+ * confirm the death it should fail more pings.
+ */
+ NO_ACKS_TO_SUSPECT = 2,
+ /**
+ * If a suspected member has not been responding to pings
+ * this number of times, it is considered dead. According
+ * to the SWIM paper, for a member it is sufficient to
+ * miss one direct ping, and an arbitrary but fixed number
+ * of simultaneous indirect pings, to be considered dead.
* Seems too little, so here it is bigger.
*/
NO_ACKS_TO_DEAD = 3,
@@ -161,6 +167,13 @@ enum {
* anti-entropy components.
*/
NO_ACKS_TO_GC = 2,
+ /**
+ * Number of pings sent indirectly to a member via other
+ * members when it did not answer on a regular ping. The
+ * messages are sent in parallel and via different
+ * members.
+ */
+ INDIRECT_PING_COUNT = 2,
};
/**
@@ -476,10 +489,19 @@ swim_cached_round_msg_invalidate(struct swim *swim)
/** Put the member into a list of ACK waiters. */
static void
-swim_wait_ack(struct swim *swim, struct swim_member *member)
+swim_wait_ack(struct swim *swim, struct swim_member *member,
+ bool was_ping_indirect)
{
if (heap_node_is_stray(&member->in_wait_ack_heap)) {
- member->ping_deadline = swim_time() + swim->wait_ack_tick.at;
+ double timeout = swim->wait_ack_tick.at;
+ /*
+ * Direct ping is two hops: PING + ACK.
+ * Indirect ping is four hops: PING, FORWARD PING,
+ * ACK, FORWARD ACK. This is why x2 for indirects.
+ */
+ if (was_ping_indirect)
+ timeout *= 2;
+ member->ping_deadline = swim_time() + timeout;
wait_ack_heap_insert(&swim->wait_ack_heap, member);
swim_ev_timer_start(loop(), &swim->wait_ack_tick);
}
@@ -609,7 +631,7 @@ swim_ping_task_complete(struct swim_task *task,
struct swim *swim = swim_by_scheduler(scheduler);
struct swim_member *m = container_of(task, struct swim_member,
ping_task);
- swim_wait_ack(swim, m);
+ swim_wait_ack(swim, m, false);
}
/** Free member's resources. */
@@ -1064,7 +1086,7 @@ swim_complete_step(struct swim_task *task,
* dissemination and failure detection
* sections.
*/
- swim_wait_ack(swim, m);
+ swim_wait_ack(swim, m, false);
swim_decrease_event_ttd(swim);
}
}
@@ -1073,13 +1095,16 @@ swim_complete_step(struct swim_task *task,
/** Schedule send of a failure detection message. */
static void
swim_send_fd_msg(struct swim *swim, struct swim_task *task,
- const struct sockaddr_in *dst, enum swim_fd_msg_type type)
+ const struct sockaddr_in *dst, enum swim_fd_msg_type type,
+ const struct sockaddr_in *proxy)
{
/*
* Reset packet allocator in case if task is being reused.
*/
assert(! swim_task_is_scheduled(task));
swim_packet_create(&task->packet);
+ if (proxy != NULL)
+ swim_task_proxy(task, proxy);
char *header = swim_packet_alloc(&task->packet, 1);
int map_size = swim_encode_src_uuid(swim, &task->packet);
map_size += swim_encode_failure_detection(swim, &task->packet, type);
@@ -1095,7 +1120,21 @@ static inline void
swim_send_ack(struct swim *swim, struct swim_task *task,
const struct sockaddr_in *dst)
{
- swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK);
+ swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK, NULL);
+}
+
+/**
+ * Schedule an ACK to @a dst routed through @a proxy. Each call
+ * allocates a new task with swim_task_delete_cb as both its
+ * completion and cancellation callback.
+ * @retval 0 Success.
+ * @retval -1 Task allocation failed.
+ */
+static inline int
+swim_send_indirect_ack(struct swim *swim, const struct sockaddr_in *dst,
+ const struct sockaddr_in *proxy)
+{
+ struct swim_task *task =
+ swim_task_new(swim_task_delete_cb, swim_task_delete_cb,
+ "indirect ack");
+ if (task == NULL)
+ return -1;
+ swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_ACK, proxy);
+ return 0;
}
/** Schedule send of a ping. */
@@ -1103,7 +1142,68 @@ static inline void
swim_send_ping(struct swim *swim, struct swim_task *task,
const struct sockaddr_in *dst)
{
- swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_PING);
+ swim_send_fd_msg(swim, task, dst, SWIM_FD_MSG_PING, NULL);
+}
+
+/**
+ * Indirect ping task completion callback. The target member is
+ * looked up by the task's UUID. If the ping was sent successfully
+ * and the target is still present and not alive, the target is
+ * put into the ACK waiters with the doubled (indirect) timeout.
+ * The task is deleted in any case.
+ */
+static void
+swim_iping_task_complete(struct swim_task *task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ /* A ping that failed to be sent is not awaited. */
+ if (rc < 0)
+ goto finish;
+ struct swim *swim = swim_by_scheduler(scheduler);
+ struct swim_member *m = swim_find_member(swim, &task->uuid);
+ /*
+ * The member may already have been removed, probably
+ * manually, so check for NULL. Also, an ACK may have been
+ * received before this indirect ping got EV_WRITE, making
+ * the member alive again. Then there is nothing to do.
+ */
+ if (m != NULL && m->status != MEMBER_ALIVE)
+ swim_wait_ack(swim, m, true);
+finish:
+ swim_task_delete_cb(task, scheduler, rc);
+}
+
+/**
+ * Schedule up to INDIRECT_PING_COUNT indirect pings to a member
+ * @a dst. Each ping goes via a different proxy member, chosen by
+ * walking the member table from a random position. Neither self
+ * nor @a dst itself is used as a proxy.
+ * @retval 0 Success.
+ * @retval -1 A task could not be allocated.
+ */
+static inline int
+swim_send_indirect_pings(struct swim *swim, const struct swim_member *dst)
+{
+ struct mh_swim_table_t *t = swim->members;
+ int member_count = mh_size(t);
+ int rnd = swim_scaled_rand(0, member_count - 1);
+ mh_int_t node = mh_swim_table_random(t, rnd), end = mh_end(t);
+ for (int member_i = 0, task_i = 0; member_i < member_count &&
+ task_i < INDIRECT_PING_COUNT; ++member_i) {
+ struct swim_member *m = *mh_swim_table_node(t, node);
+ /*
+ * It makes no sense to send an indirect ping via
+ * self and via destination - it would be just
+ * direct ping then.
+ */
+ if (m != swim->self && !swim_inaddr_eq(&dst->addr, &m->addr)) {
+ struct swim_task *task =
+ swim_task_new(swim_iping_task_complete,
+ swim_task_delete_cb,
+ "indirect ping");
+ if (task == NULL)
+ return -1;
+ /* Used by the completion callback to find dst. */
+ task->uuid = dst->uuid;
+ /*
+ * No explicit swim_task_proxy() here:
+ * swim_send_fd_msg() resets the packet and then
+ * sets the proxy itself.
+ */
+ swim_send_fd_msg(swim, task, &dst->addr, SWIM_FD_MSG_PING,
+ &m->addr);
+ /* Bound the number of pings by INDIRECT_PING_COUNT. */
+ ++task_i;
+ }
+ /*
+ * First random member could be chosen too close
+ * to the hash end. Here the cycle is wrapped.
+ */
+ node = mh_next(t, node);
+ if (node == end)
+ node = mh_first(t);
+ }
+ return 0;
}
/**
@@ -1128,6 +1228,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
++m->unacknowledged_pings;
switch (m->status) {
case MEMBER_ALIVE:
+ if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT)
+ break;
+ m->status = MEMBER_SUSPECTED;
+ swim_on_member_update(swim, m);
+ if (swim_send_indirect_pings(swim, m) != 0)
+ diag_log();
+ break;
+ case MEMBER_SUSPECTED:
if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
swim_on_member_update(swim, m);
@@ -1315,7 +1423,8 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
static int
swim_process_failure_detection(struct swim *swim, const char **pos,
const char *end, const struct sockaddr_in *src,
- const struct tt_uuid *uuid)
+ const struct tt_uuid *uuid,
+ const struct sockaddr_in *proxy)
{
const char *prefix = "invalid failure detection message:";
struct swim_failure_detection_def def;
@@ -1360,8 +1469,13 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
switch (def.type) {
case SWIM_FD_MSG_PING:
- if (! swim_task_is_scheduled(&member->ack_task))
+ if (proxy != NULL) {
+ if (swim_send_indirect_ack(swim, &member->addr,
+ proxy) != 0)
+ diag_log();
+ } else if (! swim_task_is_scheduled(&member->ack_task)) {
swim_send_ack(swim, &member->ack_task, &member->addr);
+ }
break;
case SWIM_FD_MSG_ACK:
member->unacknowledged_pings = 0;
@@ -1433,7 +1547,6 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
const char *end, const struct sockaddr_in *src,
const struct sockaddr_in *proxy)
{
- (void) proxy;
const char *prefix = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
@@ -1465,7 +1578,8 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
break;
case SWIM_FAILURE_DETECTION:
if (swim_process_failure_detection(swim, &pos, end,
- src, &uuid) != 0)
+ src, &uuid,
+ proxy) != 0)
goto error;
break;
case SWIM_DISSEMINATION:
diff --git a/src/lib/swim/swim_constants.h b/src/lib/swim/swim_constants.h
index 7869ddf3e..4f8404ce3 100644
--- a/src/lib/swim/swim_constants.h
+++ b/src/lib/swim/swim_constants.h
@@ -37,6 +37,12 @@
enum swim_member_status {
/** The instance is ok, responds to requests. */
MEMBER_ALIVE = 0,
+ /**
+ * If a member has not responded to a ping, it is declared
+ * as suspected to be dead. After more failed pings it
+ * is finally dead.
+ */
+ MEMBER_SUSPECTED,
/**
* The member is considered dead. It will disappear from
* the membership after some unacknowledged pings.
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 18c20abf3..6502e40a1 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -45,6 +45,7 @@ swim_inaddr_str(const struct sockaddr_in *addr)
const char *swim_member_status_strs[] = {
"alive",
+ "suspected",
"dead",
"left",
};
diff --git a/test/unit/swim.c b/test/unit/swim.c
index e375e6607..2deaf138a 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -240,7 +240,7 @@ swim_test_add_remove(void)
static void
swim_test_basic_failure_detection(void)
{
- swim_start_test(7);
+ swim_start_test(9);
struct swim_cluster *cluster = swim_cluster_new(2);
swim_cluster_set_ack_timeout(cluster, 0.5);
@@ -248,8 +248,15 @@ swim_test_basic_failure_detection(void)
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
"node is added as alive");
swim_cluster_block_io(cluster, 1);
- is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 2.4), -1,
- "member still is not dead after 2 noacks");
+ /* Roll one round to send a first ping. */
+ swim_run_for(1);
+
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_SUSPECTED, 0.9), -1,
+ "member still is not suspected after 1 noack");
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_SUSPECTED, 0.1), 0,
+ "but it is suspected after one more");
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 1.4), -1,
+ "it is not dead after 2 more noacks");
is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 0.1), 0,
"but it is dead after one more");
@@ -304,34 +311,35 @@ swim_test_basic_gossip(void)
swim_cluster_add_link(cluster, 1, 0);
swim_cluster_set_drop(cluster, 1, 100);
/*
- * Wait two no-ACKs on S1 from S2. +1 sec to send a first
+ * Wait one no-ACK on S1 from S2. +1 sec to send a first
* ping.
*/
- swim_run_for(20 + 1);
+ swim_run_for(10 + 1);
swim_cluster_add_link(cluster, 0, 2);
swim_cluster_add_link(cluster, 2, 1);
/*
* After 10 seconds (one ack timeout) S1 should see S2 as
- * dead. But S3 still should see S2 as alive. To prevent
- * S1 from informing S3 about that the S3 IO is blocked
- * for a short time.
+ * suspected. But S3 still should see S2 as alive. To
+ * prevent S1 from informing S3 about that the S3 IO is
+ * blocked for a short time.
*/
swim_run_for(9);
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_ALIVE,
"S1 still thinks that S2 is alive");
swim_cluster_block_io(cluster, 2);
swim_run_for(1);
- is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD, "but one "\
- "more second, and a third ack timed out - S1 sees S2 as dead");
+ is(swim_cluster_member_status(cluster, 0, 1), MEMBER_SUSPECTED,
+ "but one more second, and a second ack timed out - S1 sees S2 as "\
+ "suspected");
is(swim_cluster_member_status(cluster, 2, 1), MEMBER_ALIVE,
"S3 still thinks that S2 is alive");
swim_cluster_unblock_io(cluster, 2);
/*
- * At most after two round steps S1 sends 'S2 is dead' to
- * S3.
+ * At most after two round steps S1 sends
+ * 'S2 is suspected' to S3.
*/
- is(swim_cluster_wait_status(cluster, 2, 1, MEMBER_DEAD, 2), 0,
- "S3 learns about dead S2 from S1");
+ is(swim_cluster_wait_status(cluster, 2, 1, MEMBER_SUSPECTED, 2), 0,
+ "S3 learns about suspected S2 from S1");
swim_cluster_delete(cluster);
swim_finish_test();
@@ -363,10 +371,14 @@ swim_test_refute(void)
swim_cluster_add_link(cluster, 0, 1);
swim_cluster_set_drop(cluster, 1, 100);
- fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 7) != 0);
+ /* Roll one round to send a first ping. */
+ swim_run_for(1);
+
+ fail_if(swim_cluster_wait_status(cluster, 0, 1,
+ MEMBER_SUSPECTED, 4) != 0);
swim_cluster_set_drop(cluster, 1, 0);
is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 1), 0,
- "S2 increments its own incarnation to refute its death");
+ "S2 increments its own incarnation to refute its suspicion");
is(swim_cluster_wait_incarnation(cluster, 0, 1, 1, 1), 0,
"new incarnation has reached S1 with a next round message");
@@ -386,7 +398,7 @@ swim_test_too_big_packet(void)
swim_start_test(3);
int size = 50;
double ack_timeout = 1;
- double first_dead_timeout = 20;
+ double first_dead_timeout = 30;
double everywhere_dead_timeout = size;
int drop_id = size / 2;
@@ -465,7 +477,9 @@ swim_test_undead(void)
swim_cluster_add_link(cluster, 0, 1);
swim_cluster_add_link(cluster, 1, 0);
swim_cluster_set_drop(cluster, 1, 100);
- is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 4), 0,
+ /* Roll one round to send a first ping. */
+ swim_run_for(1);
+ is(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 5), 0,
"member S2 is dead");
swim_run_for(5);
is(swim_cluster_member_status(cluster, 0, 1), MEMBER_DEAD,
@@ -833,10 +847,33 @@ swim_test_payload_refutation(void)
swim_finish_test();
}
+static void
+swim_test_indirect_ping(void)
+{ /* S1 and S2 are cut off; test expects S3 to relay. */
+ swim_start_test(2);
+ uint16_t cluster_size = 3;
+ struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+ swim_cluster_set_ack_timeout(cluster, 1); /* 10 fit in the run below. */
+ for (int i = 0; i < cluster_size; ++i) { /* Link every pair. */
+ for (int j = i + 1; j < cluster_size; ++j)
+ swim_cluster_interconnect(cluster, i, j);
+ }
+ swim_cluster_set_drop_channel(cluster, 0, 1, true); /* Cut S1 <-> S2 */
+ swim_cluster_set_drop_channel(cluster, 1, 0, true); /* both ways. */
+ swim_run_for(10); /* Direct S1 <-> S2 pings can't be acked here. */
+ is(swim_cluster_wait_status_everywhere(cluster, 0, MEMBER_ALIVE, 0),
+ 0, "S1 is still alive everywhere");
+ is(swim_cluster_wait_status_everywhere(cluster, 1, MEMBER_ALIVE, 0),
+ 0, "as well as S2 - they communicated via S3");
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
static int
main_f(va_list ap)
{
- swim_start_test(17);
+ swim_start_test(18);
(void) ap;
swim_test_ev_init();
@@ -859,6 +896,7 @@ main_f(va_list ap)
swim_test_broadcast();
swim_test_payload_basic();
swim_test_payload_refutation();
+ swim_test_indirect_ping();
swim_test_transport_free();
swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 4b1407db3..a450d8427 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
*** main_f ***
-1..17
+1..18
*** swim_test_one_link ***
1..6
ok 1 - no rounds - no fullmesh
@@ -64,14 +64,16 @@ ok 4 - subtests
ok 5 - subtests
*** swim_test_add_remove: done ***
*** swim_test_basic_failure_detection ***
- 1..7
+ 1..9
ok 1 - node is added as alive
- ok 2 - member still is not dead after 2 noacks
- ok 3 - but it is dead after one more
- ok 4 - after 2 more unacks the member still is not deleted - dissemination TTD keeps it
- ok 5 - but it is dropped after 2 rounds when TTD gets 0
- ok 6 - fullmesh is restored
- ok 7 - a member is added back on an ACK
+ ok 2 - member still is not suspected after 1 noack
+ ok 3 - but it is suspected after one more
+ ok 4 - it is not dead after 2 more noacks
+ ok 5 - but it is dead after one more
+ ok 6 - after 2 more unacks the member still is not deleted - dissemination TTD keeps it
+ ok 7 - but it is dropped after 2 rounds when TTD gets 0
+ ok 8 - fullmesh is restored
+ ok 9 - a member is added back on an ACK
ok 6 - subtests
*** swim_test_basic_failure_detection: done ***
*** swim_test_probe ***
@@ -82,7 +84,7 @@ ok 7 - subtests
*** swim_test_probe: done ***
*** swim_test_refute ***
1..4
- ok 1 - S2 increments its own incarnation to refute its death
+ ok 1 - S2 increments its own incarnation to refute its suspicion
ok 2 - new incarnation has reached S1 with a next round message
ok 3 - after restart S2's incarnation is 0 again
ok 4 - S2 learned its old bigger incarnation 1 from S0
@@ -91,9 +93,9 @@ ok 8 - subtests
*** swim_test_basic_gossip ***
1..4
ok 1 - S1 still thinks that S2 is alive
- ok 2 - but one more second, and a third ack timed out - S1 sees S2 as dead
+ ok 2 - but one more second, and a second ack timed out - S1 sees S2 as suspected
ok 3 - S3 still thinks that S2 is alive
- ok 4 - S3 learns about dead S2 from S1
+ ok 4 - S3 learns about suspected S2 from S1
ok 9 - subtests
*** swim_test_basic_gossip: done ***
*** swim_test_too_big_packet ***
@@ -177,4 +179,10 @@ ok 16 - subtests
ok 11 - S3 learns S1's payload from S2
ok 17 - subtests
*** swim_test_payload_refutation: done ***
+ *** swim_test_indirect_ping ***
+ 1..2
+ ok 1 - S1 is still alive everywhere
+ ok 2 - as well as S2 - they communicated via S3
+ok 18 - subtests
+ *** swim_test_indirect_ping: done ***
*** main_f: done ***
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 10+ messages in thread