From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from localhost (localhost [127.0.0.1])
	by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id CC0152C1E1
	for ; Wed, 24 Apr 2019 16:22:03 -0400 (EDT)
Received: from turing.freelists.org ([127.0.0.1])
	by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024)
	with ESMTP id HGiMDMnr9xEi for ; Wed, 24 Apr 2019 16:22:03 -0400 (EDT)
Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 672022C1D6
	for ; Wed, 24 Apr 2019 16:22:03 -0400 (EDT)
From: Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH v2 2/3] swim: introduce routing
Date: Wed, 24 Apr 2019 23:21:59 +0300
Message-Id: <13469cbbd78fa21b79db877dcde3fc8378d27af3.1556137229.git.v.shpilevoy@tarantool.org>
In-Reply-To:
References:
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Sender: tarantool-patches-bounce@freelists.org
Errors-to: tarantool-patches-bounce@freelists.org
Reply-To: tarantool-patches@freelists.org
List-Help:
List-Unsubscribe:
List-software: Ecartis version 1.0.0
List-Id: tarantool-patches
List-Subscribe:
List-Owner:
List-post:
List-Archive:
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org

Before this patch, SWIM packets were sent directly from one
instance to another, with routing handled transparently by the
Internet layer of TCP/IP. But the SWIM paper describes one more
component that is not implemented yet: the suspicion mechanism.
To avoid overloading this message with suspicion details, it is
enough to say that it makes it possible to send a packet through
an intermediate SWIM instance instead of directly.

This commit extends the SWIM protocol with a new transport-level
section named 'routing'.
It allows sending indirect SWIM messages transparently via packet
forwarding, implemented entirely inside the transport component,
in swim_io.c.

Part of #3234
---
 src/lib/swim/swim.c         |  4 +-
 src/lib/swim/swim_io.c      | 96 +++++++++++++++++++++++++++++++++----
 src/lib/swim/swim_io.h      | 15 +++++-
 src/lib/swim/swim_proto.c   | 83 +++++++++++++++++++++++++++++++-
 src/lib/swim/swim_proto.h   | 88 ++++++++++++++++++++++++++++++++--
 test/unit/swim_proto.c      | 56 +++++++++++++++++++++-
 test/unit/swim_proto.result | 11 ++++-
 7 files changed, 336 insertions(+), 17 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index bb1ded713..1b4a4365d 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1430,8 +1430,10 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
 /** Process a new message. */
 static void
 swim_on_input(struct swim_scheduler *scheduler, const char *pos,
-	      const char *end, const struct sockaddr_in *src)
+	      const char *end, const struct sockaddr_in *src,
+	      const struct sockaddr_in *proxy)
 {
+	(void) proxy;
 	const char *prefix = "invalid message:";
 	struct swim *swim = swim_by_scheduler(scheduler);
 	struct tt_uuid uuid;
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index a55c15f30..c7ffc31d5 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -60,8 +60,11 @@ swim_packet_create(struct swim_packet *packet)
 /** Fill metadata prefix of a packet. */
 static inline void
 swim_packet_build_meta(struct swim_packet *packet,
-		       const struct sockaddr_in *src)
+		       const struct sockaddr_in *src,
+		       const struct sockaddr_in *route_src,
+		       const struct sockaddr_in *route_dst)
 {
+	assert((route_src != NULL) == (route_dst != NULL));
 	char *meta = packet->meta;
 	char *end = packet->body;
 	/*
@@ -71,9 +74,18 @@ swim_packet_build_meta(struct swim_packet *packet,
 	if (meta == end)
 		return;
 	struct swim_meta_header_bin header;
-	swim_meta_header_bin_create(&header, src);
-	assert(meta + sizeof(header) == end);
+	swim_meta_header_bin_create(&header, src, route_dst != NULL);
+	assert(meta + sizeof(header) <= end);
 	memcpy(meta, &header, sizeof(header));
+	meta += sizeof(header);
+	if (route_dst != NULL) {
+		struct swim_route_bin route;
+		swim_route_bin_create(&route, route_src, route_dst);
+		assert(meta + sizeof(route) <= end);
+		memcpy(meta, &route, sizeof(route));
+		meta += sizeof(route);
+	}
+	assert(meta == end);
 	/*
 	 * Once meta is built, it is consumed by the body. Used
 	 * not to rebuild the meta again if the task will be
@@ -82,6 +94,21 @@ swim_packet_build_meta(struct swim_packet *packet,
 	packet->body = packet->meta;
 }
 
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
+{
+	/*
+	 * Route meta should be reserved before body encoding is
+	 * started. Otherwise later it would be necessary to move
+	 * already encoded body, maybe having its tail trimmed off
+	 * because of limited UDP packet size.
+	 */
+	assert(swim_packet_body_size(&task->packet) == 0);
+	assert(! swim_inaddr_is_empty(proxy));
+	task->proxy = *proxy;
+	swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+}
+
 void
 swim_task_create(struct swim_task *task, swim_task_f complete,
 		 swim_task_f cancel, const char *desc)
@@ -161,6 +188,11 @@ swim_bcast_task_delete_cb(struct swim_task *task,
 static int
 swim_bcast_task_next_addr(struct swim_bcast_task *task)
 {
+	/*
+	 * Broadcast + proxy is not supported yet, and barely it
+	 * will be needed anytime.
+	 */
+	assert(swim_inaddr_is_empty(&task->base.proxy));
 	for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
 		int flags = i->ifa_flags;
 		if ((flags & IFF_UP) == 0)
@@ -320,13 +352,23 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
 	struct swim_task *task =
 		rlist_shift_entry(&scheduler->queue_output, struct swim_task,
 				  in_queue_output);
+	const struct sockaddr_in *src = &scheduler->transport.addr;
+	const struct sockaddr_in *dst = &task->dst;
+	const char *dst_str = swim_inaddr_str(dst);
+	if (! swim_inaddr_is_empty(&task->proxy)) {
+		dst = &task->proxy;
+		dst_str = tt_sprintf("%s via %s", dst_str,
+				     swim_inaddr_str(dst));
+		swim_packet_build_meta(&task->packet, src, src, &task->dst);
+	} else {
+		swim_packet_build_meta(&task->packet, src, NULL, NULL);
+	}
 	say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
-		    task->desc, swim_inaddr_str(&task->dst));
-	swim_packet_build_meta(&task->packet, &scheduler->transport.addr);
+		    task->desc, dst_str);
 	int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
 				     task->packet.pos - task->packet.buf,
-				     (const struct sockaddr *) &task->dst,
-				     sizeof(task->dst));
+				     (const struct sockaddr *) dst,
+				     sizeof(*dst));
 	if (rc < 0)
 		diag_log();
 	if (task->complete != NULL)
@@ -357,7 +399,45 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
 	const char *pos = buf, *end = pos + size;
 	if (swim_meta_def_decode(&meta, &pos, end) < 0)
 		goto error;
-	scheduler->on_input(scheduler, pos, end, &meta.src);
+	/*
+	 * Check if this instance is not the destination and
+	 * possibly forward the packet.
+	 */
+	if (! meta.is_route_specified) {
+		scheduler->on_input(scheduler, pos, end, &meta.src, NULL);
+		return;
+	}
+	struct sockaddr_in *self = &scheduler->transport.addr;
+	if (swim_inaddr_eq(&meta.route.dst, self)) {
+		scheduler->on_input(scheduler, pos, end, &meta.route.src,
+				    &meta.src);
+		return;
+	}
+	/* Forward the foreign packet. */
+	struct swim_task *task = swim_task_new(swim_task_delete_cb,
+					       swim_task_delete_cb,
+					       "foreign packet");
+	if (task == NULL)
+		goto error;
+	/*
+	 * Allocate route meta explicitly, because the packet
+	 * should keep route meta even being sent to the final
+	 * destination directly.
+	 */
+	swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+	/*
+	 * Meta should be rebuilt with the different source
+	 * address - this instance. It is used by the receiver to
+	 * send a reply through this instance again.
+	 */
+	swim_packet_build_meta(&task->packet, self, &meta.route.src,
+			       &meta.route.dst);
+	/* Copy the original body without a touch. */
+	size = end - pos;
+	char *body = swim_packet_alloc(&task->packet, size);
+	assert(body != NULL);
+	memcpy(body, pos, size);
+	swim_task_send(task, &meta.route.dst, scheduler);
 	return;
 error:
 	diag_log();
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 884680859..977859db7 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -140,7 +140,8 @@ swim_packet_create(struct swim_packet *packet);
 typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
 					  const char *buf,
 					  const char *end,
-					  const struct sockaddr_in *src);
+					  const struct sockaddr_in *src,
+					  const struct sockaddr_in *proxy);
 
 /** Planner and executor of input and output operations.*/
 struct swim_scheduler {
@@ -212,6 +213,11 @@ struct swim_task {
 	struct swim_packet packet;
 	/** Destination address. */
 	struct sockaddr_in dst;
+	/**
+	 * Optional proxy via which the destination should be
+	 * reached.
+	 */
+	struct sockaddr_in proxy;
 	/** Place in a queue of tasks. */
 	struct rlist in_queue_output;
 	/**
@@ -232,6 +238,13 @@ swim_task_is_scheduled(struct swim_task *task)
 	return ! rlist_empty(&task->in_queue_output);
 }
 
+/**
+ * Set the proxy for the task. Before sending this proxy is dumped
+ * into metadata section.
+ */
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
+
 /**
  * Put the task into a queue of tasks. Eventually it will be sent.
  */
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 796559e8e..18c20abf3 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -439,9 +439,9 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
 
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
-			    const struct sockaddr_in *src)
+			    const struct sockaddr_in *src, bool has_routing)
 {
-	int map_size = 1 + SWIM_INADDR_BIN_SIZE;
+	int map_size = 1 + SWIM_INADDR_BIN_SIZE + has_routing;
 	assert(mp_sizeof_map(map_size) == 1);
 	header->m_header = 0x80 | map_size;
 	header->k_version = SWIM_META_TARANTOOL_VERSION;
@@ -452,6 +452,64 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header,
 	swim_inaddr_bin_fill(&header->src_addr, src);
 }
 
+/**
+ * Decode meta routing section into meta definition object.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos MessagePack buffer to decode.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+static int
+swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos,
+			   const char *end)
+{
+	const char *prefix = "invalid routing section:";
+	uint32_t size;
+	def->route.src.sin_family = AF_INET;
+	def->route.dst.sin_family = AF_INET;
+	if (swim_decode_map(pos, end, &size, prefix, "route") != 0)
+		return -1;
+	for (uint32_t i = 0; i < size; ++i) {
+		uint64_t key;
+		if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
+			return -1;
+		switch (key) {
+		case SWIM_ROUTE_SRC_ADDRESS:
+			if (swim_decode_ip(&def->route.src, pos, end, prefix,
+					   "source address") != 0)
+				return -1;
+			break;
+		case SWIM_ROUTE_SRC_PORT:
+			if (swim_decode_port(&def->route.src, pos, end,
+					     prefix, "source port") != 0)
+				return -1;
+			break;
+		case SWIM_ROUTE_DST_ADDRESS:
+			if (swim_decode_ip(&def->route.dst, pos, end, prefix,
+					   "destination address") != 0)
+				return -1;
+			break;
+		case SWIM_ROUTE_DST_PORT:
+			if (swim_decode_port(&def->route.dst, pos, end,
+					     prefix, "destination port") != 0)
+				return -1;
+			break;
+		default:
+			diag_set(SwimError, "%s unknown key", prefix);
+			return -1;
+		}
+	}
+	if (swim_check_inaddr_not_empty(&def->route.src, prefix,
+					"source") != 0 ||
+	    swim_check_inaddr_not_empty(&def->route.dst, prefix,
+					"destination") != 0)
+		return -1;
+	def->is_route_specified = true;
+	return 0;
+}
+
 int
 swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 		     const char *end)
@@ -467,6 +525,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 		if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
 			return -1;
 		switch (key) {
+		case SWIM_META_ROUTING:
+			if (swim_meta_def_decode_route(def, pos, end) != 0)
+				return -1;
+			break;
 		case SWIM_META_TARANTOOL_VERSION:
 			if (swim_decode_uint(pos, end, &key, prefix,
 					     "version") != 0)
@@ -509,3 +571,20 @@ swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation)
 	header->m_incarnation = 0xcf;
 	header->v_incarnation = mp_bswap_u64(incarnation);
 }
+
+void
+swim_route_bin_create(struct swim_route_bin *route,
+		      const struct sockaddr_in *src,
+		      const struct sockaddr_in *dst)
+{
+	int map_size = SWIM_INADDR_BIN_SIZE * 2;
+	assert(mp_sizeof_map(map_size) == 1);
+	route->k_routing = SWIM_META_ROUTING;
+	route->m_routing = 0x80 | map_size;
+	swim_inaddr_bin_create(&route->src_addr, SWIM_ROUTE_SRC_ADDRESS,
+			       SWIM_ROUTE_SRC_PORT);
+	swim_inaddr_bin_create(&route->dst_addr, SWIM_ROUTE_DST_ADDRESS,
+			       SWIM_ROUTE_DST_PORT);
+	swim_inaddr_bin_fill(&route->src_addr, src);
+	swim_inaddr_bin_fill(&route->dst_addr, dst);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index ddf9e28db..f70ac708a 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -51,7 +51,13 @@ enum {
  * | {                                                           |
  * |     SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,|
  * |     SWIM_META_SRC_ADDRESS: uint, ip,                        |
- * |     SWIM_META_SRC_PORT: uint, port                          |
+ * |     SWIM_META_SRC_PORT: uint, port,                         |
+ * |     SWIM_META_ROUTING: {                                    |
+ * |         SWIM_ROUTE_SRC_ADDRESS: uint, ip,                   |
+ * |         SWIM_ROUTE_SRC_PORT: uint, port,                    |
+ * |         SWIM_ROUTE_DST_ADDRESS: uint, ip,                   |
+ * |         SWIM_ROUTE_DST_PORT: uint, port                     |
+ * |     }                                                       |
  * | }                                                           |
  * +-------------------Protocol logic section--------------------+
  * | {                                                           |
@@ -421,6 +427,33 @@ enum swim_meta_key {
 	 */
 	SWIM_META_SRC_ADDRESS,
 	SWIM_META_SRC_PORT,
+	/**
+	 * Routing section allows to specify routes of up to 3
+	 * nodes: source, proxy, and destination. It contains two
+	 * addresses - the message originator and the final
+	 * destination. Here is an example of an indirect message
+	 * transmission. Assume, there are 3 nodes: S1, S2, S3.
+	 * S1 sends a message to S3 via S2. The following steps
+	 * are executed in order to deliver the message:
+	 *
+	 * S1 -> S2
+	 * { src: S1, routing: {src: S1, dst: S3}, body: ... }
+	 *
+	 * S2 receives the message and sees: routing.dst != S2 -
+	 * it is a foreign packet. S2 forwards it to S3 preserving
+	 * all the data - body and routing sections.
+	 *
+	 *
+	 * S2 -> S3
+	 * {src: S2, routing: {src: S1, dst: S3}, body: ...}
+	 *
+	 * S3 receives the message and sees: routing.dst == S3 -
+	 * the message is delivered. If S3 wants to answer, it
+	 * sends a response via the same proxy. It knows, that the
+	 * message was delivered from S2, and sends an answer via
+	 * S2.
+	 */
+	SWIM_META_ROUTING,
 };
 
 /**
@@ -432,7 +465,7 @@ enum swim_meta_key {
  * separate MessagePack map.
  */
 struct PACKED swim_meta_header_bin {
-	/** mp_encode_map(3) */
+	/** mp_encode_map(3 or 4) */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */
@@ -448,7 +481,7 @@ struct PACKED swim_meta_header_bin {
 /** Initialize meta section. */
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
-			    const struct sockaddr_in *src);
+			    const struct sockaddr_in *src, bool has_routing);
 
 /** Meta definition. */
 struct swim_meta_def {
@@ -456,6 +489,18 @@ struct swim_meta_def {
 	uint32_t version;
 	/** Source of the message. */
 	struct sockaddr_in src;
+	/** Route source and destination. */
+	struct {
+		struct sockaddr_in src;
+		struct sockaddr_in dst;
+	} route;
+	/**
+	 * True, if both @a src and @a dst are not empty. This
+	 * flag is just sugar so as not to check both addresses
+	 * manually. Also in future more fields could be added
+	 * here.
+	 */
+	bool is_route_specified;
 };
 
 /**
@@ -471,6 +516,43 @@ int
 swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 		     const char *end);
 
+enum swim_route_key {
+	/**
+	 * True source of the packet. Can be different from the
+	 * packet sender. It is expected that a response should be
+	 * sent back to this address. Maybe indirectly through the
+	 * same proxy.
+	 */
+	SWIM_ROUTE_SRC_ADDRESS = 0,
+	SWIM_ROUTE_SRC_PORT,
+	/**
+	 * True destination of the packet. Can be different from
+	 * this instance, receiver. If it is for another instance,
+	 * then this packet is forwarded to the latter.
+	 */
+	SWIM_ROUTE_DST_ADDRESS,
+	SWIM_ROUTE_DST_PORT,
+	swim_route_key_MAX,
+};
+
+/** Route section template. Describes source, destination. */
+struct PACKED swim_route_bin {
+	/** mp_encode_uint(SWIM_ROUTING) */
+	uint8_t k_routing;
+	/** mp_encode_map(4) */
+	uint8_t m_routing;
+	/** SWIM_ROUTE_SRC_ADDRESS and SWIM_ROUTE_SRC_PORT. */
+	struct swim_inaddr_bin src_addr;
+	/** SWIM_ROUTE_DST_ADDRESS and SWIM_ROUTE_DST_PORT. */
+	struct swim_inaddr_bin dst_addr;
+};
+
+/** Initialize routing section. */
+void
+swim_route_bin_create(struct swim_route_bin *route,
+		      const struct sockaddr_in *src,
+		      const struct sockaddr_in *dst);
+
 /** }}}                     Meta component                      */
 
 enum swim_quit_key {
diff --git a/test/unit/swim_proto.c b/test/unit/swim_proto.c
index 64487fb7d..d7fafc7f0 100644
--- a/test/unit/swim_proto.c
+++ b/test/unit/swim_proto.c
@@ -189,11 +189,64 @@ swim_test_meta(void)
 	footer();
 }
 
+static void
+swim_test_route(void)
+{
+	header();
+	plan(5);
+
+	char buffer[1024];
+	struct swim_meta_def mdef;
+	struct swim_meta_header_bin header;
+	struct sockaddr_in addr;
+	addr.sin_port = htons(1234);
+	fail_if(inet_aton("127.0.0.1", &addr.sin_addr) == 0);
+
+	swim_meta_header_bin_create(&header, &addr, true);
+	memcpy(buffer, &header, sizeof(header));
+	char *last_valid = buffer + sizeof(header);
+	char *end = last_valid;
+	const char *pos = buffer;
+
+	is(swim_meta_def_decode(&mdef, &pos, end), -1,
+	   "route was expected, but map is too short");
+
+	end = mp_encode_uint(end, SWIM_META_ROUTING);
+	pos = buffer;
+	is(swim_meta_def_decode(&mdef, &pos, end), -1, "no route map");
+
+	end = mp_encode_map(end, 0);
+	pos = buffer;
+	is(swim_meta_def_decode(&mdef, &pos, end), -1, "empty route map");
+
+	struct swim_route_bin route;
+	struct sockaddr_in src, dst;
+	memset(&src, 0, sizeof(src));
+	memset(&dst, 0, sizeof(dst));
+	swim_route_bin_create(&route, &src, &dst);
+	memcpy(last_valid, &route, sizeof(route));
+	end = last_valid + sizeof(route);
+	pos = buffer;
+	is(swim_meta_def_decode(&mdef, &pos, end), -1, "zero addresses");
+
+	src.sin_port = 1;
+	src.sin_addr = addr.sin_addr;
+	dst.sin_port = 1;
+	dst.sin_addr = addr.sin_addr;
+	swim_route_bin_create(&route, &src, &dst);
+	memcpy(last_valid, &route, sizeof(route));
+	pos = buffer;
+	is(swim_meta_def_decode(&mdef, &pos, end), 0, "normal route");
+
+	check_plan();
+	footer();
+}
+
 int
 main()
 {
 	header();
-	plan(2);
+	plan(3);
 	memory_init();
 	fiber_init(fiber_c_invoke);
 	int fd = open("log.txt", O_TRUNC);
@@ -203,6 +256,7 @@ main()
 
 	swim_test_member_def();
 	swim_test_meta();
+	swim_test_route();
 
 	say_logger_free();
 	fiber_free();
diff --git a/test/unit/swim_proto.result b/test/unit/swim_proto.result
index c22769161..8a41a5d40 100644
--- a/test/unit/swim_proto.result
+++ b/test/unit/swim_proto.result
@@ -1,5 +1,5 @@
 	*** main ***
-1..2
+1..3
 	*** swim_test_member_def ***
     1..12
     ok 1 - not map header
@@ -28,4 +28,13 @@ ok 1 - subtests
     ok 8 - normal meta
 ok 2 - subtests
 	*** swim_test_meta: done ***
+	*** swim_test_route ***
+    1..5
+    ok 1 - route was expected, but map is too short
+    ok 2 - no route map
+    ok 3 - empty route map
+    ok 4 - zero addresses
+    ok 5 - normal route
+ok 3 - subtests
+	*** swim_test_route: done ***
 	*** main: done ***
-- 
2.20.1 (Apple Git-117)
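
Illustration only, not part of the patch: the three-way decision that
swim_scheduler_on_input() makes on a decoded meta section can be
sketched in isolation as below. This is a minimal sketch under
simplifying assumptions. The types struct addr and struct meta and the
functions addr_eq(), route_decide() and route_forward_meta() are
hypothetical stand-ins invented for the example; they are not the
sockaddr_in-based structures or the functions from swim_io.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical simplified address: IPv4 and port as plain integers. */
struct addr {
	uint32_t ip;
	uint16_t port;
};

/* Hypothetical decoded meta, loosely modelled on struct swim_meta_def. */
struct meta {
	/* Direct sender of the packet (the previous hop). */
	struct addr src;
	/* True if the packet carries a routing section. */
	bool is_route_specified;
	/* Original sender and final destination from the routing section. */
	struct addr route_src;
	struct addr route_dst;
};

/* What a receiver does with an incoming packet. */
enum action {
	/* No routing section: process it, the reply goes straight to src. */
	DELIVER_DIRECT,
	/* Routed and addressed to us: process it, reply via the proxy. */
	DELIVER_ROUTED,
	/* Routed and addressed to someone else: pass it on. */
	FORWARD,
};

static bool
addr_eq(const struct addr *a, const struct addr *b)
{
	return a->ip == b->ip && a->port == b->port;
}

/* Decide how the instance with address 'self' handles meta 'm'. */
static enum action
route_decide(const struct meta *m, const struct addr *self)
{
	if (!m->is_route_specified)
		return DELIVER_DIRECT;
	if (addr_eq(&m->route_dst, self))
		return DELIVER_ROUTED;
	return FORWARD;
}

/*
 * Build the meta a proxy puts on the forwarded copy: the direct
 * sender becomes the proxy itself, the routing section is kept
 * unchanged so the receiver still sees the original sender.
 */
static struct meta
route_forward_meta(const struct meta *in, const struct addr *self)
{
	struct meta out = *in;
	out.src = *self;
	return out;
}
```

With S1, S2 and S3 as in the comment for SWIM_META_ROUTING, a packet
from S1 with routing {src: S1, dst: S3} yields FORWARD on S2. The copy
built by route_forward_meta() on S2 has src set to S2 and yields
DELIVER_ROUTED on S3, which can then reply through S2.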