From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@freelists.org Cc: kostja@tarantool.org Subject: [tarantool-patches] [PATCH 5/6] swim: introduce routing Date: Wed, 24 Apr 2019 17:36:19 +0300 [thread overview] Message-ID: <6592ce57d8b6c7ef46202551f858a37ca2e18a2c.1556116199.git.v.shpilevoy@tarantool.org> (raw) In-Reply-To: <cover.1556116199.git.v.shpilevoy@tarantool.org> Before the patch, SWIM packets were sent quite straightforwardly from one instance to another, with transparent routing on the Internet layer of TCP/IP. But the SWIM paper describes one last, not yet implemented component - the suspicion mechanism. So as not to overload this message with suspicion details, it is enough to say that it makes it possible to send a packet through an intermediate SWIM instance, not directly. This commit extends the SWIM protocol with a new transport-level section named 'routing'. It allows sending indirect SWIM messages transparently via packet forwarding implemented entirely inside the transport component, in swim_io.c. Part of #3234 --- src/lib/swim/swim.c | 4 +- src/lib/swim/swim_io.c | 96 +++++++++++++++++++++++++++++++++---- src/lib/swim/swim_io.h | 15 +++++- src/lib/swim/swim_proto.c | 83 +++++++++++++++++++++++++++++++- src/lib/swim/swim_proto.h | 62 ++++++++++++++++++++++-- test/unit/swim_proto.c | 56 +++++++++++++++++++++- test/unit/swim_proto.result | 11 ++++- 7 files changed, 310 insertions(+), 17 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index bb1ded713..1b4a4365d 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -1430,8 +1430,10 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end, /** Process a new message. 
*/ static void swim_on_input(struct swim_scheduler *scheduler, const char *pos, - const char *end, const struct sockaddr_in *src) + const char *end, const struct sockaddr_in *src, + const struct sockaddr_in *proxy) { + (void) proxy; const char *prefix = "invalid message:"; struct swim *swim = swim_by_scheduler(scheduler); struct tt_uuid uuid; diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index a55c15f30..c7ffc31d5 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -60,8 +60,11 @@ swim_packet_create(struct swim_packet *packet) /** Fill metadata prefix of a packet. */ static inline void swim_packet_build_meta(struct swim_packet *packet, - const struct sockaddr_in *src) + const struct sockaddr_in *src, + const struct sockaddr_in *route_src, + const struct sockaddr_in *route_dst) { + assert((route_src != NULL) == (route_dst != NULL)); char *meta = packet->meta; char *end = packet->body; /* @@ -71,9 +74,18 @@ swim_packet_build_meta(struct swim_packet *packet, if (meta == end) return; struct swim_meta_header_bin header; - swim_meta_header_bin_create(&header, src); - assert(meta + sizeof(header) == end); + swim_meta_header_bin_create(&header, src, route_dst != NULL); + assert(meta + sizeof(header) <= end); memcpy(meta, &header, sizeof(header)); + meta += sizeof(header); + if (route_dst != NULL) { + struct swim_route_bin route; + swim_route_bin_create(&route, route_src, route_dst); + assert(meta + sizeof(route) <= end); + memcpy(meta, &route, sizeof(route)); + meta += sizeof(route); + } + assert(meta == end); /* * Once meta is built, it is consumed by the body. Used * not to rebuild the meta again if the task will be @@ -82,6 +94,21 @@ swim_packet_build_meta(struct swim_packet *packet, packet->body = packet->meta; } +void +swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy) +{ + /* + * Route meta should be reserved before body encoding is + * started. 
Otherwise later it would be necessary to move + * already encoded body, maybe having its tail trimmed off + * because of limited UDP packet size. + */ + assert(swim_packet_body_size(&task->packet) == 0); + assert(! swim_inaddr_is_empty(proxy)); + task->proxy = *proxy; + swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin)); +} + void swim_task_create(struct swim_task *task, swim_task_f complete, swim_task_f cancel, const char *desc) @@ -161,6 +188,11 @@ swim_bcast_task_delete_cb(struct swim_task *task, static int swim_bcast_task_next_addr(struct swim_bcast_task *task) { + /* + * Broadcast + proxy is not supported yet, and it is + * unlikely to ever be needed. + */ + assert(swim_inaddr_is_empty(&task->base.proxy)); for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) { int flags = i->ifa_flags; if ((flags & IFF_UP) == 0) @@ -320,13 +352,23 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) struct swim_task *task = rlist_shift_entry(&scheduler->queue_output, struct swim_task, in_queue_output); + const struct sockaddr_in *src = &scheduler->transport.addr; + const struct sockaddr_in *dst = &task->dst; + const char *dst_str = swim_inaddr_str(dst); + if (! 
swim_inaddr_is_empty(&task->proxy)) { + dst = &task->proxy; + dst_str = tt_sprintf("%s via %s", dst_str, + swim_inaddr_str(dst)); + swim_packet_build_meta(&task->packet, src, src, &task->dst); + } else { + swim_packet_build_meta(&task->packet, src, NULL, NULL); + } say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler), - task->desc, swim_inaddr_str(&task->dst)); - swim_packet_build_meta(&task->packet, &scheduler->transport.addr); + task->desc, dst_str); int rc = swim_transport_send(&scheduler->transport, task->packet.buf, task->packet.pos - task->packet.buf, - (const struct sockaddr *) &task->dst, - sizeof(task->dst)); + (const struct sockaddr *) dst, + sizeof(*dst)); if (rc < 0) diag_log(); if (task->complete != NULL) @@ -357,7 +399,45 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events) const char *pos = buf, *end = pos + size; if (swim_meta_def_decode(&meta, &pos, end) < 0) goto error; - scheduler->on_input(scheduler, pos, end, &meta.src); + /* + * Check if this instance is not the destination and + * possibly forward the packet. + */ + if (! meta.is_route_specified) { + scheduler->on_input(scheduler, pos, end, &meta.src, NULL); + return; + } + struct sockaddr_in *self = &scheduler->transport.addr; + if (swim_inaddr_eq(&meta.route.dst, self)) { + scheduler->on_input(scheduler, pos, end, &meta.route.src, + &meta.src); + return; + } + /* Forward the foreign packet. */ + struct swim_task *task = swim_task_new(swim_task_delete_cb, + swim_task_delete_cb, + "foreign packet"); + if (task == NULL) + goto error; + /* + * Allocate route meta explicitly, because the packet + * should keep route meta even being sent to the final + * destination directly. + */ + swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin)); + /* + * Meta should be rebuilt with the different source + * address - this instance. It is used by the receiver to + * send a reply through this instance again. 
+ */ + swim_packet_build_meta(&task->packet, self, &meta.route.src, + &meta.route.dst); + /* Copy the original body without a touch. */ + size = end - pos; + char *body = swim_packet_alloc(&task->packet, size); + assert(body != NULL); + memcpy(body, pos, size); + swim_task_send(task, &meta.route.dst, scheduler); return; error: diag_log(); diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index 88a7f424d..9c567e157 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -139,7 +139,8 @@ swim_packet_create(struct swim_packet *packet); typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler, const char *buf, const char *end, - const struct sockaddr_in *src); + const struct sockaddr_in *src, + const struct sockaddr_in *proxy); /** Planner and executor of input and output operations.*/ struct swim_scheduler { @@ -211,6 +212,11 @@ struct swim_task { struct swim_packet packet; /** Destination address. */ struct sockaddr_in dst; + /** + * Optional proxy via which the destination should be + * reached. + */ + struct sockaddr_in proxy; /** Place in a queue of tasks. */ struct rlist in_queue_output; /** @@ -226,6 +232,13 @@ swim_task_is_scheduled(struct swim_task *task) return ! rlist_empty(&task->in_queue_output); } +/** + * Set the proxy for the task. Before sending this proxy is dumped + * into metadata section. + */ +void +swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy); + /** * Put the task into a queue of tasks. Eventually it will be sent. 
*/ diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index 796559e8e..18c20abf3 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -439,9 +439,9 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header, void swim_meta_header_bin_create(struct swim_meta_header_bin *header, - const struct sockaddr_in *src) + const struct sockaddr_in *src, bool has_routing) { - int map_size = 1 + SWIM_INADDR_BIN_SIZE; + int map_size = 1 + SWIM_INADDR_BIN_SIZE + has_routing; assert(mp_sizeof_map(map_size) == 1); header->m_header = 0x80 | map_size; header->k_version = SWIM_META_TARANTOOL_VERSION; @@ -452,6 +452,64 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header, swim_inaddr_bin_fill(&header->src_addr, src); } +/** + * Decode meta routing section into meta definition object. + * @param[out] def Definition to decode into. + * @param[in][out] pos MessagePack buffer to decode. + * @param end End of the MessagePack buffer. + * + * @retval 0 Success. + * @retval -1 Error. 
+ */ +static int +swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos, + const char *end) +{ + const char *prefix = "invalid routing section:"; + uint32_t size; + def->route.src.sin_family = AF_INET; + def->route.dst.sin_family = AF_INET; + if (swim_decode_map(pos, end, &size, prefix, "route") != 0) + return -1; + for (uint32_t i = 0; i < size; ++i) { + uint64_t key; + if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0) + return -1; + switch (key) { + case SWIM_ROUTE_SRC_ADDRESS: + if (swim_decode_ip(&def->route.src, pos, end, prefix, + "source address") != 0) + return -1; + break; + case SWIM_ROUTE_SRC_PORT: + if (swim_decode_port(&def->route.src, pos, end, + prefix, "source port") != 0) + return -1; + break; + case SWIM_ROUTE_DST_ADDRESS: + if (swim_decode_ip(&def->route.dst, pos, end, prefix, + "destination address") != 0) + return -1; + break; + case SWIM_ROUTE_DST_PORT: + if (swim_decode_port(&def->route.dst, pos, end, + prefix, "destination port") != 0) + return -1; + break; + default: + diag_set(SwimError, "%s unknown key", prefix); + return -1; + } + } + if (swim_check_inaddr_not_empty(&def->route.src, prefix, + "source") != 0 || + swim_check_inaddr_not_empty(&def->route.dst, prefix, + "destination") != 0) + return -1; + def->is_route_specified = true; + return 0; +} + int swim_meta_def_decode(struct swim_meta_def *def, const char **pos, const char *end) @@ -467,6 +525,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos, if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0) return -1; switch (key) { + case SWIM_META_ROUTING: + if (swim_meta_def_decode_route(def, pos, end) != 0) + return -1; + break; case SWIM_META_TARANTOOL_VERSION: if (swim_decode_uint(pos, end, &key, prefix, "version") != 0) @@ -509,3 +571,20 @@ swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation) header->m_incarnation = 0xcf; header->v_incarnation = mp_bswap_u64(incarnation); } + +void +swim_route_bin_create(struct 
swim_route_bin *route, + const struct sockaddr_in *src, + const struct sockaddr_in *dst) +{ + int map_size = SWIM_INADDR_BIN_SIZE * 2; + assert(mp_sizeof_map(map_size) == 1); + route->k_routing = SWIM_META_ROUTING; + route->m_routing = 0x80 | map_size; + swim_inaddr_bin_create(&route->src_addr, SWIM_ROUTE_SRC_ADDRESS, + SWIM_ROUTE_SRC_PORT); + swim_inaddr_bin_create(&route->dst_addr, SWIM_ROUTE_DST_ADDRESS, + SWIM_ROUTE_DST_PORT); + swim_inaddr_bin_fill(&route->src_addr, src); + swim_inaddr_bin_fill(&route->dst_addr, dst); +} diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index ddf9e28db..045e55415 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -51,7 +51,13 @@ enum { * | { | * | SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,| * | SWIM_META_SRC_ADDRESS: uint, ip, | - * | SWIM_META_SRC_PORT: uint, port | + * | SWIM_META_SRC_PORT: uint, port, | + * | SWIM_META_ROUTING: { | + * | SWIM_ROUTE_SRC_ADDRESS: uint, ip, | + * | SWIM_ROUTE_SRC_PORT: uint, port, | + * | SWIM_ROUTE_DST_ADDRESS: uint, ip, | + * | SWIM_ROUTE_DST_PORT: uint, port | + * | } | * | } | * +-------------------Protocol logic section--------------------+ * | { | @@ -421,6 +427,7 @@ enum swim_meta_key { */ SWIM_META_SRC_ADDRESS, SWIM_META_SRC_PORT, + SWIM_META_ROUTING, }; /** @@ -432,7 +439,7 @@ enum swim_meta_key { * separate MessagePack map. */ struct PACKED swim_meta_header_bin { - /** mp_encode_map(3) */ + /** mp_encode_map(3 or 4) */ uint8_t m_header; /** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */ @@ -448,7 +455,7 @@ struct PACKED swim_meta_header_bin { /** Initialize meta section. */ void swim_meta_header_bin_create(struct swim_meta_header_bin *header, - const struct sockaddr_in *src); + const struct sockaddr_in *src, bool has_routing); /** Meta definition. */ struct swim_meta_def { @@ -456,6 +463,18 @@ struct swim_meta_def { uint32_t version; /** Source of the message. */ struct sockaddr_in src; + /** Route source and destination. 
*/ + struct { + struct sockaddr_in src; + struct sockaddr_in dst; + } route; + /** + * True, if both @a src and @a dst are not empty. This + * flag is just sugar so as not to check both addresses + * manually. Also, in the future more fields could be added + * here. + */ + bool is_route_specified; }; /** @@ -471,6 +490,43 @@ int swim_meta_def_decode(struct swim_meta_def *def, const char **pos, const char *end); +enum swim_route_key { + /** + * True source of the packet. Can be different from the + * packet sender. It is expected that a response should be + * sent back to this address. Maybe indirectly through the + * same proxy. + */ + SWIM_ROUTE_SRC_ADDRESS = 0, + SWIM_ROUTE_SRC_PORT, + /** + * True destination of the packet. Can be different from + * this instance, receiver. If it is for another instance, + * then this packet is forwarded to the latter. + */ + SWIM_ROUTE_DST_ADDRESS, + SWIM_ROUTE_DST_PORT, + swim_route_key_MAX, +}; + +/** Route section template. Describes source, destination. */ +struct PACKED swim_route_bin { + /** mp_encode_uint(SWIM_META_ROUTING) */ + uint8_t k_routing; + /** mp_encode_map(4) */ + uint8_t m_routing; + /** SWIM_ROUTE_SRC_ADDRESS and SWIM_ROUTE_SRC_PORT. */ + struct swim_inaddr_bin src_addr; + /** SWIM_ROUTE_DST_ADDRESS and SWIM_ROUTE_DST_PORT. */ + struct swim_inaddr_bin dst_addr; +}; + +/** Initialize routing section. 
*/ +void +swim_route_bin_create(struct swim_route_bin *route, + const struct sockaddr_in *src, + const struct sockaddr_in *dst); + /** }}} Meta component */ enum swim_quit_key { diff --git a/test/unit/swim_proto.c b/test/unit/swim_proto.c index 64487fb7d..d7fafc7f0 100644 --- a/test/unit/swim_proto.c +++ b/test/unit/swim_proto.c @@ -189,11 +189,64 @@ swim_test_meta(void) footer(); } +static void +swim_test_route(void) +{ + header(); + plan(5); + + char buffer[1024]; + struct swim_meta_def mdef; + struct swim_meta_header_bin header; + struct sockaddr_in addr; + addr.sin_port = htons(1234); + fail_if(inet_aton("127.0.0.1", &addr.sin_addr) == 0); + + swim_meta_header_bin_create(&header, &addr, true); + memcpy(buffer, &header, sizeof(header)); + char *last_valid = buffer + sizeof(header); + char *end = last_valid; + const char *pos = buffer; + + is(swim_meta_def_decode(&mdef, &pos, end), -1, + "route was expected, but map is too short"); + + end = mp_encode_uint(end, SWIM_META_ROUTING); + pos = buffer; + is(swim_meta_def_decode(&mdef, &pos, end), -1, "no route map"); + + end = mp_encode_map(end, 0); + pos = buffer; + is(swim_meta_def_decode(&mdef, &pos, end), -1, "empty route map"); + + struct swim_route_bin route; + struct sockaddr_in src, dst; + memset(&src, 0, sizeof(src)); + memset(&dst, 0, sizeof(dst)); + swim_route_bin_create(&route, &src, &dst); + memcpy(last_valid, &route, sizeof(route)); + end = last_valid + sizeof(route); + pos = buffer; + is(swim_meta_def_decode(&mdef, &pos, end), -1, "zero addresses"); + + src.sin_port = 1; + src.sin_addr = addr.sin_addr; + dst.sin_port = 1; + dst.sin_addr = addr.sin_addr; + swim_route_bin_create(&route, &src, &dst); + memcpy(last_valid, &route, sizeof(route)); + pos = buffer; + is(swim_meta_def_decode(&mdef, &pos, end), 0, "normal route"); + + check_plan(); + footer(); +} + int main() { header(); - plan(2); + plan(3); memory_init(); fiber_init(fiber_c_invoke); int fd = open("log.txt", O_TRUNC); @@ -203,6 +256,7 @@ main() 
swim_test_member_def(); swim_test_meta(); + swim_test_route(); say_logger_free(); fiber_free(); diff --git a/test/unit/swim_proto.result b/test/unit/swim_proto.result index c22769161..8a41a5d40 100644 --- a/test/unit/swim_proto.result +++ b/test/unit/swim_proto.result @@ -1,5 +1,5 @@ *** main *** -1..2 +1..3 *** swim_test_member_def *** 1..12 ok 1 - not map header @@ -28,4 +28,13 @@ ok 1 - subtests ok 8 - normal meta ok 2 - subtests *** swim_test_meta: done *** + *** swim_test_route *** + 1..5 + ok 1 - route was expected, but map is too short + ok 2 - no route map + ok 3 - empty route map + ok 4 - zero addresses + ok 5 - normal route +ok 3 - subtests + *** swim_test_route: done *** *** main: done *** -- 2.20.1 (Apple Git-117)
next prev parent reply other threads:[~2019-04-24 14:36 UTC|newest] Thread overview: 22+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-04-24 14:36 [tarantool-patches] [PATCH 0/6] swim suspicion Vladislav Shpilevoy 2019-04-24 14:36 ` [tarantool-patches] [PATCH 1/6] test: rename swim_cluster_node to swim_cluster_member Vladislav Shpilevoy 2019-04-24 16:37 ` [tarantool-patches] " Konstantin Osipov 2019-04-24 14:36 ` [tarantool-patches] [PATCH 2/6] test: remove swim packet filter destructors Vladislav Shpilevoy 2019-04-24 16:37 ` [tarantool-patches] " Konstantin Osipov 2019-04-24 14:36 ` [tarantool-patches] [PATCH 3/6] test: introduce swim packet filter by destination address Vladislav Shpilevoy 2019-04-24 16:38 ` [tarantool-patches] " Konstantin Osipov 2019-04-24 14:36 ` [tarantool-patches] [PATCH 4/6] swim: wrap sio_strfaddr() Vladislav Shpilevoy 2019-04-24 16:40 ` [tarantool-patches] " Konstantin Osipov 2019-04-24 20:23 ` Vladislav Shpilevoy 2019-04-25 10:34 ` Konstantin Osipov 2019-04-25 13:50 ` Vladislav Shpilevoy 2019-04-24 14:36 ` Vladislav Shpilevoy [this message] 2019-04-24 16:46 ` [tarantool-patches] Re: [PATCH 5/6] swim: introduce routing Konstantin Osipov 2019-04-24 20:25 ` Vladislav Shpilevoy 2019-04-25 10:39 ` Konstantin Osipov 2019-04-25 13:50 ` Vladislav Shpilevoy 2019-04-25 13:57 ` Konstantin Osipov 2019-04-24 14:36 ` [tarantool-patches] [PATCH 6/6] swim: introduce suspicion Vladislav Shpilevoy 2019-04-24 17:01 ` [tarantool-patches] " Konstantin Osipov 2019-04-24 20:28 ` Vladislav Shpilevoy 2019-04-25 10:42 ` Konstantin Osipov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=6592ce57d8b6c7ef46202551f858a37ca2e18a2c.1556116199.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=kostja@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH 5/6] swim: introduce routing' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.