[tarantool-patches] [PATCH v2 2/3] swim: introduce routing
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Apr 24 23:21:59 MSK 2019
Before the patch SWIM packets were sent quite
straightforwardly from one instance to another, with transparent
routing on the Internet layer of TCP/IP. But the SWIM paper
describes one last component that is not implemented yet - the
suspicion mechanism.
So as not to overload this message with suspicion details, it is
enough to say that it involves sending a packet through an
intermediate SWIM instance, not directly.
This commit extends the SWIM protocol with a new transport-level
section named 'routing'. It allows sending indirect SWIM messages
transparently via packet forwarding implemented entirely inside
the transport component, in swim_io.c.
Part of #3234
---
src/lib/swim/swim.c | 4 +-
src/lib/swim/swim_io.c | 96 +++++++++++++++++++++++++++++++++----
src/lib/swim/swim_io.h | 15 +++++-
src/lib/swim/swim_proto.c | 83 +++++++++++++++++++++++++++++++-
src/lib/swim/swim_proto.h | 88 ++++++++++++++++++++++++++++++++--
test/unit/swim_proto.c | 56 +++++++++++++++++++++-
test/unit/swim_proto.result | 11 ++++-
7 files changed, 336 insertions(+), 17 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index bb1ded713..1b4a4365d 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1430,8 +1430,10 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
- const char *end, const struct sockaddr_in *src)
+ const char *end, const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy)
{
+ (void) proxy;
const char *prefix = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index a55c15f30..c7ffc31d5 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -60,8 +60,11 @@ swim_packet_create(struct swim_packet *packet)
/** Fill metadata prefix of a packet. */
static inline void
swim_packet_build_meta(struct swim_packet *packet,
- const struct sockaddr_in *src)
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *route_src,
+ const struct sockaddr_in *route_dst)
{
+ assert((route_src != NULL) == (route_dst != NULL));
char *meta = packet->meta;
char *end = packet->body;
/*
@@ -71,9 +74,18 @@ swim_packet_build_meta(struct swim_packet *packet,
if (meta == end)
return;
struct swim_meta_header_bin header;
- swim_meta_header_bin_create(&header, src);
- assert(meta + sizeof(header) == end);
+ swim_meta_header_bin_create(&header, src, route_dst != NULL);
+ assert(meta + sizeof(header) <= end);
memcpy(meta, &header, sizeof(header));
+ meta += sizeof(header);
+ if (route_dst != NULL) {
+ struct swim_route_bin route;
+ swim_route_bin_create(&route, route_src, route_dst);
+ assert(meta + sizeof(route) <= end);
+ memcpy(meta, &route, sizeof(route));
+ meta += sizeof(route);
+ }
+ assert(meta == end);
/*
* Once meta is built, it is consumed by the body. Used
* not to rebuild the meta again if the task will be
@@ -82,6 +94,21 @@ swim_packet_build_meta(struct swim_packet *packet,
packet->body = packet->meta;
}
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
+{
+ /*
+ * Route meta should be reserved before body encoding is
+ * started. Otherwise later it would be necessary to move
+ * already encoded body, maybe having its tail trimmed off
+ * because of limited UDP packet size.
+ */
+ assert(swim_packet_body_size(&task->packet) == 0);
+ assert(! swim_inaddr_is_empty(proxy));
+ task->proxy = *proxy;
+ swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+}
+
void
swim_task_create(struct swim_task *task, swim_task_f complete,
swim_task_f cancel, const char *desc)
@@ -161,6 +188,11 @@ swim_bcast_task_delete_cb(struct swim_task *task,
static int
swim_bcast_task_next_addr(struct swim_bcast_task *task)
{
+ /*
+ * Broadcast + proxy is not supported yet, and it is
+ * unlikely to ever be needed.
+ */
+ assert(swim_inaddr_is_empty(&task->base.proxy));
for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
int flags = i->ifa_flags;
if ((flags & IFF_UP) == 0)
@@ -320,13 +352,23 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
struct swim_task *task =
rlist_shift_entry(&scheduler->queue_output, struct swim_task,
in_queue_output);
+ const struct sockaddr_in *src = &scheduler->transport.addr;
+ const struct sockaddr_in *dst = &task->dst;
+ const char *dst_str = swim_inaddr_str(dst);
+ if (! swim_inaddr_is_empty(&task->proxy)) {
+ dst = &task->proxy;
+ dst_str = tt_sprintf("%s via %s", dst_str,
+ swim_inaddr_str(dst));
+ swim_packet_build_meta(&task->packet, src, src, &task->dst);
+ } else {
+ swim_packet_build_meta(&task->packet, src, NULL, NULL);
+ }
say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
- task->desc, swim_inaddr_str(&task->dst));
- swim_packet_build_meta(&task->packet, &scheduler->transport.addr);
+ task->desc, dst_str);
int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
task->packet.pos - task->packet.buf,
- (const struct sockaddr *) &task->dst,
- sizeof(task->dst));
+ (const struct sockaddr *) dst,
+ sizeof(*dst));
if (rc < 0)
diag_log();
if (task->complete != NULL)
@@ -357,7 +399,45 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
const char *pos = buf, *end = pos + size;
if (swim_meta_def_decode(&meta, &pos, end) < 0)
goto error;
- scheduler->on_input(scheduler, pos, end, &meta.src);
+ /*
+ * Check if this instance is not the destination and
+ * possibly forward the packet.
+ */
+ if (! meta.is_route_specified) {
+ scheduler->on_input(scheduler, pos, end, &meta.src, NULL);
+ return;
+ }
+ struct sockaddr_in *self = &scheduler->transport.addr;
+ if (swim_inaddr_eq(&meta.route.dst, self)) {
+ scheduler->on_input(scheduler, pos, end, &meta.route.src,
+ &meta.src);
+ return;
+ }
+ /* Forward the foreign packet. */
+ struct swim_task *task = swim_task_new(swim_task_delete_cb,
+ swim_task_delete_cb,
+ "foreign packet");
+ if (task == NULL)
+ goto error;
+ /*
+ * Allocate route meta explicitly, because the packet
+ * should keep route meta even when it is sent to the final
+ * destination directly.
+ */
+ swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+ /*
+ * Meta should be rebuilt with the different source
+ * address - this instance. It is used by the receiver to
+ * send a reply through this instance again.
+ */
+ swim_packet_build_meta(&task->packet, self, &meta.route.src,
+ &meta.route.dst);
+ /* Copy the original body without a touch. */
+ size = end - pos;
+ char *body = swim_packet_alloc(&task->packet, size);
+ assert(body != NULL);
+ memcpy(body, pos, size);
+ swim_task_send(task, &meta.route.dst, scheduler);
return;
error:
diag_log();
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 884680859..977859db7 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -140,7 +140,8 @@ swim_packet_create(struct swim_packet *packet);
typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
const char *buf, const char *end,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy);
/** Planner and executor of input and output operations.*/
struct swim_scheduler {
@@ -212,6 +213,11 @@ struct swim_task {
struct swim_packet packet;
/** Destination address. */
struct sockaddr_in dst;
+ /**
+ * Optional proxy via which the destination should be
+ * reached.
+ */
+ struct sockaddr_in proxy;
/** Place in a queue of tasks. */
struct rlist in_queue_output;
/**
@@ -232,6 +238,13 @@ swim_task_is_scheduled(struct swim_task *task)
return ! rlist_empty(&task->in_queue_output);
}
+/**
+ * Set the proxy for the task. The packet is sent to the proxy,
+ * and the route to the real destination is put into metadata.
+ */
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
+
/**
* Put the task into a queue of tasks. Eventually it will be sent.
*/
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 796559e8e..18c20abf3 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -439,9 +439,9 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src)
+ const struct sockaddr_in *src, bool has_routing)
{
- int map_size = 1 + SWIM_INADDR_BIN_SIZE;
+ int map_size = 1 + SWIM_INADDR_BIN_SIZE + has_routing;
assert(mp_sizeof_map(map_size) == 1);
header->m_header = 0x80 | map_size;
header->k_version = SWIM_META_TARANTOOL_VERSION;
@@ -452,6 +452,64 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header,
swim_inaddr_bin_fill(&header->src_addr, src);
}
+/**
+ * Decode meta routing section into meta definition object.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos MessagePack buffer to decode.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+static int
+swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos,
+ const char *end)
+{
+ const char *prefix = "invalid routing section:";
+ uint32_t size;
+ def->route.src.sin_family = AF_INET;
+ def->route.dst.sin_family = AF_INET;
+ if (swim_decode_map(pos, end, &size, prefix, "route") != 0)
+ return -1;
+ for (uint32_t i = 0; i < size; ++i) {
+ uint64_t key;
+ if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
+ return -1;
+ switch (key) {
+ case SWIM_ROUTE_SRC_ADDRESS:
+ if (swim_decode_ip(&def->route.src, pos, end, prefix,
+ "source address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_SRC_PORT:
+ if (swim_decode_port(&def->route.src, pos, end,
+ prefix, "source port") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_ADDRESS:
+ if (swim_decode_ip(&def->route.dst, pos, end, prefix,
+ "destination address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_PORT:
+ if (swim_decode_port(&def->route.dst, pos, end,
+ prefix, "destination port") != 0)
+ return -1;
+ break;
+ default:
+ diag_set(SwimError, "%s unknown key", prefix);
+ return -1;
+ }
+ }
+ if (swim_check_inaddr_not_empty(&def->route.src, prefix,
+ "source") != 0 ||
+ swim_check_inaddr_not_empty(&def->route.dst, prefix,
+ "destination") != 0)
+ return -1;
+ def->is_route_specified = true;
+ return 0;
+}
+
int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end)
@@ -467,6 +525,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
return -1;
switch (key) {
+ case SWIM_META_ROUTING:
+ if (swim_meta_def_decode_route(def, pos, end) != 0)
+ return -1;
+ break;
case SWIM_META_TARANTOOL_VERSION:
if (swim_decode_uint(pos, end, &key, prefix,
"version") != 0)
@@ -509,3 +571,20 @@ swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation)
header->m_incarnation = 0xcf;
header->v_incarnation = mp_bswap_u64(incarnation);
}
+
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst)
+{
+ int map_size = SWIM_INADDR_BIN_SIZE * 2;
+ assert(mp_sizeof_map(map_size) == 1);
+ route->k_routing = SWIM_META_ROUTING;
+ route->m_routing = 0x80 | map_size;
+ swim_inaddr_bin_create(&route->src_addr, SWIM_ROUTE_SRC_ADDRESS,
+ SWIM_ROUTE_SRC_PORT);
+ swim_inaddr_bin_create(&route->dst_addr, SWIM_ROUTE_DST_ADDRESS,
+ SWIM_ROUTE_DST_PORT);
+ swim_inaddr_bin_fill(&route->src_addr, src);
+ swim_inaddr_bin_fill(&route->dst_addr, dst);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index ddf9e28db..f70ac708a 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -51,7 +51,13 @@ enum {
* | { |
* | SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,|
* | SWIM_META_SRC_ADDRESS: uint, ip, |
- * | SWIM_META_SRC_PORT: uint, port |
+ * | SWIM_META_SRC_PORT: uint, port, |
+ * | SWIM_META_ROUTING: { |
+ * | SWIM_ROUTE_SRC_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_SRC_PORT: uint, port, |
+ * | SWIM_ROUTE_DST_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_DST_PORT: uint, port |
+ * | } |
* | } |
* +-------------------Protocol logic section--------------------+
* | { |
@@ -421,6 +427,33 @@ enum swim_meta_key {
*/
SWIM_META_SRC_ADDRESS,
SWIM_META_SRC_PORT,
+ /**
+ * Routing section allows specifying routes of up to 3
+ * nodes: source, proxy, and destination. It contains two
+ * addresses - the message originator and the final
+ * destination. Here is an example of an indirect message
+ * transmission. Assume, there are 3 nodes: S1, S2, S3.
+ * S1 sends a message to S3 via S2. The following steps
+ * are executed in order to deliver the message:
+ *
+ * S1 -> S2
+ * { src: S1, routing: {src: S1, dst: S3}, body: ... }
+ *
+ * S2 receives the message and sees: routing.dst != S2 -
+ * it is a foreign packet. S2 forwards it to S3 preserving
+ * all the data - body and routing sections.
+ *
+ *
+ * S2 -> S3
+ * {src: S2, routing: {src: S1, dst: S3}, body: ...}
+ *
+ * S3 receives the message and sees: routing.dst == S3 -
+ * the message is delivered. If S3 wants to answer, it
+ * sends a response via the same proxy. It knows, that the
+ * message was delivered from S2, and sends an answer via
+ * S2.
+ */
+ SWIM_META_ROUTING,
};
/**
@@ -432,7 +465,7 @@ enum swim_meta_key {
* separate MessagePack map.
*/
struct PACKED swim_meta_header_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(3 or 4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */
@@ -448,7 +481,7 @@ struct PACKED swim_meta_header_bin {
/** Initialize meta section. */
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src, bool has_routing);
/** Meta definition. */
struct swim_meta_def {
@@ -456,6 +489,18 @@ struct swim_meta_def {
uint32_t version;
/** Source of the message. */
struct sockaddr_in src;
+ /** Route source and destination. */
+ struct {
+ struct sockaddr_in src;
+ struct sockaddr_in dst;
+ } route;
+ /**
+ * True, if both @a src and @a dst are not empty. This
+ * flag is just sugar so as not to check both addresses
+ * manually. Also in future more fields could be added
+ * here.
+ */
+ bool is_route_specified;
};
/**
@@ -471,6 +516,43 @@ int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end);
+enum swim_route_key {
+ /**
+ * True source of the packet. Can be different from the
+ * packet sender. It is expected that a response should be
+ * sent back to this address. Maybe indirectly through the
+ * same proxy.
+ */
+ SWIM_ROUTE_SRC_ADDRESS = 0,
+ SWIM_ROUTE_SRC_PORT,
+ /**
+ * True destination of the packet. Can be different from
+ * this instance, receiver. If it is for another instance,
+ * then this packet is forwarded to the latter.
+ */
+ SWIM_ROUTE_DST_ADDRESS,
+ SWIM_ROUTE_DST_PORT,
+ swim_route_key_MAX,
+};
+
+/** Route section template. Describes source, destination. */
+struct PACKED swim_route_bin {
+ /** mp_encode_uint(SWIM_META_ROUTING) */
+ uint8_t k_routing;
+ /** mp_encode_map(4) */
+ uint8_t m_routing;
+ /** SWIM_ROUTE_SRC_ADDRESS and SWIM_ROUTE_SRC_PORT. */
+ struct swim_inaddr_bin src_addr;
+ /** SWIM_ROUTE_DST_ADDRESS and SWIM_ROUTE_DST_PORT. */
+ struct swim_inaddr_bin dst_addr;
+};
+
+/** Initialize routing section. */
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst);
+
/** }}} Meta component */
enum swim_quit_key {
diff --git a/test/unit/swim_proto.c b/test/unit/swim_proto.c
index 64487fb7d..d7fafc7f0 100644
--- a/test/unit/swim_proto.c
+++ b/test/unit/swim_proto.c
@@ -189,11 +189,64 @@ swim_test_meta(void)
footer();
}
+static void
+swim_test_route(void)
+{
+ header();
+ plan(5);
+
+ char buffer[1024];
+ struct swim_meta_def mdef;
+ struct swim_meta_header_bin header;
+ struct sockaddr_in addr;
+ addr.sin_port = htons(1234);
+ fail_if(inet_aton("127.0.0.1", &addr.sin_addr) == 0);
+
+ swim_meta_header_bin_create(&header, &addr, true);
+ memcpy(buffer, &header, sizeof(header));
+ char *last_valid = buffer + sizeof(header);
+ char *end = last_valid;
+ const char *pos = buffer;
+
+ is(swim_meta_def_decode(&mdef, &pos, end), -1,
+ "route was expected, but map is too short");
+
+ end = mp_encode_uint(end, SWIM_META_ROUTING);
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), -1, "no route map");
+
+ end = mp_encode_map(end, 0);
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), -1, "empty route map");
+
+ struct swim_route_bin route;
+ struct sockaddr_in src, dst;
+ memset(&src, 0, sizeof(src));
+ memset(&dst, 0, sizeof(dst));
+ swim_route_bin_create(&route, &src, &dst);
+ memcpy(last_valid, &route, sizeof(route));
+ end = last_valid + sizeof(route);
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), -1, "zero addresses");
+
+ src.sin_port = 1;
+ src.sin_addr = addr.sin_addr;
+ dst.sin_port = 1;
+ dst.sin_addr = addr.sin_addr;
+ swim_route_bin_create(&route, &src, &dst);
+ memcpy(last_valid, &route, sizeof(route));
+ pos = buffer;
+ is(swim_meta_def_decode(&mdef, &pos, end), 0, "normal route");
+
+ check_plan();
+ footer();
+}
+
int
main()
{
header();
- plan(2);
+ plan(3);
memory_init();
fiber_init(fiber_c_invoke);
int fd = open("log.txt", O_TRUNC);
@@ -203,6 +256,7 @@ main()
swim_test_member_def();
swim_test_meta();
+ swim_test_route();
say_logger_free();
fiber_free();
diff --git a/test/unit/swim_proto.result b/test/unit/swim_proto.result
index c22769161..8a41a5d40 100644
--- a/test/unit/swim_proto.result
+++ b/test/unit/swim_proto.result
@@ -1,5 +1,5 @@
*** main ***
-1..2
+1..3
*** swim_test_member_def ***
1..12
ok 1 - not map header
@@ -28,4 +28,13 @@ ok 1 - subtests
ok 8 - normal meta
ok 2 - subtests
*** swim_test_meta: done ***
+ *** swim_test_route ***
+ 1..5
+ ok 1 - route was expected, but map is too short
+ ok 2 - no route map
+ ok 3 - empty route map
+ ok 4 - zero addresses
+ ok 5 - normal route
+ok 3 - subtests
+ *** swim_test_route: done ***
*** main: done ***
--
2.20.1 (Apple Git-117)
More information about the Tarantool-patches
mailing list