From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v4 09/12] [RAW] swim: introduce routing Date: Thu, 31 Jan 2019 00:28:38 +0300 Message-Id: <26059334ce87d4df7646fba4a7132161f9cc4e7a.1548883137.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org, vdavydov.dev@gmail.com List-ID: It is going to be used for indirect ping/acks. Part of #3234 --- src/lib/swim/swim.c | 4 +- src/lib/swim/swim_io.c | 100 +++++++++++++++++++++++++++++++++++--- src/lib/swim/swim_io.h | 17 ++++++- src/lib/swim/swim_proto.c | 92 ++++++++++++++++++++++++++++++++++- src/lib/swim/swim_proto.h | 76 +++++++++++++++++++++++++++-- 5 files changed, 274 insertions(+), 15 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 78dbc6092..5cec3789a 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -1200,8 +1200,10 @@ swim_process_dissemination(struct swim *swim, const char **pos, const char *end) /** Process a new message. */ static void swim_on_input(struct swim_scheduler *scheduler, const char *pos, - const char *end, const struct sockaddr_in *src) + const char *end, const struct sockaddr_in *src, + const struct sockaddr_in *proxy) { + (void) proxy; const char *msg_pref = "invalid message:"; struct swim *swim = swim_by_scheduler(scheduler); struct tt_uuid uuid; diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index 170d7af77..a8fb1f588 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -54,6 +54,50 @@ swim_packet_create(struct swim_packet *packet) swim_packet_alloc_meta(packet, sizeof(struct swim_meta_header_bin)); } +/** Fill metadata prefix of a packet.
*/ +static inline void +swim_packet_build_meta(struct swim_packet *packet, + const struct sockaddr_in *src, + const struct sockaddr_in *route_src, + const struct sockaddr_in *route_dst) +{ + char *meta = packet->meta; + char *end = packet->body; + /* + * Meta is already built, do nothing. It is used for + * packet forwarding, when route source != this instance. + */ + if (meta == end) + return; + struct swim_meta_header_bin header; + struct swim_route_bin route; + assert(meta + sizeof(header) <= end); + swim_meta_header_bin_create(&header, src, route_dst != NULL); + memcpy(meta, &header, sizeof(header)); + if (route_dst != NULL) { + meta += sizeof(header); + assert(meta + sizeof(route) <= end); + swim_route_bin_create(&route, route_src, route_dst); + memcpy(meta, &route, sizeof(route)); + } + /* Now the meta is built and the body consumes it. */ + packet->body = packet->meta; +} + +void +swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy) +{ + /* + * Meta should be reserved before body encoding is + * started. Otherwise it would be necessary to move + * already encoded body, maybe losing its tail.
*/ + assert(swim_packet_body_size(&task->packet) == 0); + task->proxy = *proxy; + task->is_proxy_specified = true; + swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin)); +} + void swim_task_create(struct swim_task *task, swim_task_f complete, swim_task_f cancel) @@ -178,16 +222,24 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) struct swim_task *task = rlist_shift_entry(&scheduler->queue_output, struct swim_task, in_queue_output); + const struct sockaddr_in *src = &scheduler->transport.addr; + const struct sockaddr_in *dst, *route_dst, *route_src; + if (task->is_proxy_specified) { + /* Send to the proxy, the route keeps the true target. */ + dst = &task->proxy; + route_dst = &task->dst; + route_src = src; + } else { + dst = &task->dst; + route_dst = route_src = NULL; + } + swim_packet_build_meta(&task->packet, src, route_src, route_dst); say_verbose("SWIM: send to %s", - sio_strfaddr((struct sockaddr *) &task->dst, - sizeof(task->dst))); - struct swim_meta_header_bin header; - swim_meta_header_bin_create(&header, &scheduler->transport.addr); - memcpy(task->packet.meta, &header, sizeof(header)); + sio_strfaddr((struct sockaddr *) dst, sizeof(*dst))); int rc = swim_transport_send(&scheduler->transport, task->packet.buf, task->packet.pos - task->packet.buf, - (const struct sockaddr *) &task->dst, - sizeof(task->dst)); + (const struct sockaddr *) dst, + sizeof(*dst)); if (rc != 0) diag_log(); if (task->complete != NULL) @@ -216,9 +268,41 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events) sio_strfaddr((struct sockaddr *) &src, len)); struct swim_meta_def meta; const char *pos = buf, *end = pos + size; + struct sockaddr_in *self = &scheduler->transport.addr; if (swim_meta_def_decode(&meta, &pos, end) < 0) goto error; - scheduler->on_input(scheduler, pos, end, &meta.src); + /* + * Check if this instance is not a receiver and possibly + * forward the packet. + */ + if (!
meta.is_route_specified) { + scheduler->on_input(scheduler, pos, end, &meta.src, NULL); + } else if (meta.route.dst.sin_port == self->sin_port && + meta.route.dst.sin_addr.s_addr == self->sin_addr.s_addr) { + scheduler->on_input(scheduler, pos, end, &meta.route.src, + &meta.src); + } else { + /* Forward the packet. */ + struct swim_task *task = swim_task_new(swim_task_delete_cb, + swim_task_delete_cb); + if (task == NULL) + goto error; + swim_task_proxy(task, &meta.route.dst); + /* + * Meta should be rebuilt with the different + * source address - this instance. It is used by a + * receiver to send a reply through this instance + * again. + */ + swim_packet_build_meta(&task->packet, self, &meta.route.src, + &meta.route.dst); + /* Copy the original body as is. */ + size = end - pos; + char *body = swim_packet_alloc(&task->packet, size); + assert(body != NULL); + memcpy(body, pos, size); + swim_task_send(task, &meta.route.dst, scheduler); + } return; error: diag_log(); diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index 4d694857d..0ba8972f0 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -138,7 +138,8 @@ swim_packet_create(struct swim_packet *packet); typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler, const char *buf, const char *end, - const struct sockaddr_in *src); + const struct sockaddr_in *src, + const struct sockaddr_in *proxy); /** Planner and executor of input and output operations.*/ struct swim_scheduler { @@ -206,10 +207,24 @@ struct swim_task { struct swim_packet packet; /** Destination address. */ struct sockaddr_in dst; + /** + * Optional proxy via which the destination should be + * reached. + */ + struct sockaddr_in proxy; + /** True if a proxy is specified. */ + bool is_proxy_specified; /** Place in a queue of tasks. */ struct rlist in_queue_output; }; +/** + * Set the proxy for the task. The packet is sent to the proxy, + * and the destination is dumped into routing metadata.
*/ +void +swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy); + /** * Put the task into a queue of tasks. Eventually it will be sent. */ diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index 284d35695..8b1ed76a7 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -429,9 +429,9 @@ swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header, void swim_meta_header_bin_create(struct swim_meta_header_bin *header, - const struct sockaddr_in *src) + const struct sockaddr_in *src, bool has_routing) { - header->m_header = 0x83; + header->m_header = has_routing ? 0x84 : 0x83; header->k_version = SWIM_META_TARANTOOL_VERSION; header->m_version = 0xce; header->v_version = mp_bswap_u32(tarantool_version_id()); @@ -443,6 +443,69 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header, header->v_port = mp_bswap_u16(src->sin_port); } +/** + * Decode meta routing section into meta definition object. + * @param[out] def Definition to decode into. + * @param[in][out] pos MessagePack buffer to decode. + * @param end End of the MessagePack buffer. + * + * @retval 0 Success. + * @retval -1 Error.
*/ +static int +swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos, + const char *end) +{ + const char *msg_pref = "invalid routing section:"; + uint32_t size; + if (swim_decode_map(pos, end, &size, msg_pref, "route") != 0) + return -1; + for (uint32_t i = 0; i < size; ++i) { + uint64_t key; + if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0) + return -1; + switch (key) { + case SWIM_ROUTE_SRC_ADDRESS: + if (swim_decode_ip(&def->route.src, pos, end, msg_pref, + "source address") != 0) + return -1; + break; + case SWIM_ROUTE_SRC_PORT: + if (swim_decode_port(&def->route.src, pos, end, + msg_pref, "source port") != 0) + return -1; + break; + case SWIM_ROUTE_DST_ADDRESS: + if (swim_decode_ip(&def->route.dst, pos, end, msg_pref, + "destination address") != 0) + return -1; + break; + case SWIM_ROUTE_DST_PORT: + if (swim_decode_port(&def->route.dst, pos, end, + msg_pref, "destination port") != 0) + return -1; + break; + default: + diag_set(SwimError, "%s unknown key", msg_pref); + return -1; + } + } + if (def->route.src.sin_port == 0 || + def->route.src.sin_addr.s_addr == 0) { + diag_set(SwimError, "%s source address should be specified", + msg_pref); + return -1; + } + if (def->route.dst.sin_port == 0 || + def->route.dst.sin_addr.s_addr == 0) { + diag_set(SwimError, "%s destination address should be "\ + "specified", msg_pref); + return -1; + } + def->is_route_specified = true; + return 0; +} + int swim_meta_def_decode(struct swim_meta_def *def, const char **pos, const char *end) @@ -457,6 +520,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos, if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0) return -1; switch (key) { + case SWIM_META_ROUTING: + if (swim_meta_def_decode_route(def, pos, end) != 0) + return -1; + break; case SWIM_META_TARANTOOL_VERSION: if (swim_decode_uint(pos, end, &key, msg_pref, "version") != 0) @@ -493,3 +560,24 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos, }
return 0; } + +void +swim_route_bin_create(struct swim_route_bin *route, + const struct sockaddr_in *src, + const struct sockaddr_in *dst) +{ + route->k_routing = SWIM_META_ROUTING; + route->m_routing = 0x84; + route->k_src_addr = SWIM_ROUTE_SRC_ADDRESS; + route->m_src_addr = 0xce; + route->v_src_addr = mp_bswap_u32(src->sin_addr.s_addr); + route->k_src_port = SWIM_ROUTE_SRC_PORT; + route->m_src_port = 0xcd; + route->v_src_port = mp_bswap_u16(src->sin_port); + route->k_dst_addr = SWIM_ROUTE_DST_ADDRESS; + route->m_dst_addr = 0xce; + route->v_dst_addr = mp_bswap_u32(dst->sin_addr.s_addr); + route->k_dst_port = SWIM_ROUTE_DST_PORT; + route->m_dst_port = 0xcd; + route->v_dst_port = mp_bswap_u16(dst->sin_port); +} diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index 353605c35..fe9eb85c5 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -48,7 +48,13 @@ enum { * | { | * | SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,| * | SWIM_META_SRC_ADDRESS: uint, ip, | - * | SWIM_META_SRC_PORT: uint, port | + * | SWIM_META_SRC_PORT: uint, port, | + * | SWIM_META_ROUTING: { | + * | SWIM_ROUTE_SRC_ADDRESS: uint, ip, | + * | SWIM_ROUTE_SRC_PORT: uint, port, | + * | SWIM_ROUTE_DST_ADDRESS: uint, ip, | + * | SWIM_ROUTE_DST_PORT: uint, port | + * | } | * | } | * +-------------------Protocol logic section--------------------+ * | { | @@ -458,6 +464,7 @@ enum swim_meta_key { */ SWIM_META_SRC_ADDRESS, SWIM_META_SRC_PORT, + SWIM_META_ROUTING, }; /** @@ -469,7 +476,7 @@ enum swim_meta_key { * separate MessagePack map. */ struct PACKED swim_meta_header_bin { - /** mp_encode_map(3) */ + /** mp_encode_map(3 or 4) */ uint8_t m_header; /** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */ @@ -494,7 +501,7 @@ struct PACKED swim_meta_header_bin { /** Initialize meta section.
*/ void swim_meta_header_bin_create(struct swim_meta_header_bin *header, - const struct sockaddr_in *src); + const struct sockaddr_in *src, bool has_routing); /** Meta definition. */ struct swim_meta_def { @@ -502,6 +509,12 @@ struct swim_meta_def { uint32_t version; /** Source of the message. */ struct sockaddr_in src; + /** Route source and destination. */ + bool is_route_specified; + struct { + struct sockaddr_in src; + struct sockaddr_in dst; + } route; }; /** @@ -517,6 +530,63 @@ int swim_meta_def_decode(struct swim_meta_def *def, const char **pos, const char *end); +enum swim_route_key { + /** + * True source of the packet. Can be different from the + * packet sender. It is expected that the answer should + * be sent back to this address. Maybe indirectly through + * the same proxy. + */ + SWIM_ROUTE_SRC_ADDRESS = 0, + SWIM_ROUTE_SRC_PORT, + /** + * True destination of the packet. Can be different from + * this instance, receiver. If it is for another instance, + * then this packet is forwarded to the latter. + */ + SWIM_ROUTE_DST_ADDRESS, + SWIM_ROUTE_DST_PORT, + swim_route_key_MAX, +}; + +/** Route section template. Describes source, destination. */ +struct PACKED swim_route_bin { + /** mp_encode_uint(SWIM_META_ROUTING) */ + uint8_t k_routing; + /** mp_encode_map(4) */ + uint8_t m_routing; + + /** mp_encode_uint(SWIM_ROUTE_SRC_ADDRESS) */ + uint8_t k_src_addr; + /** mp_encode_uint(addr.sin_addr.s_addr) */ + uint8_t m_src_addr; + uint32_t v_src_addr; + + /** mp_encode_uint(SWIM_ROUTE_SRC_PORT) */ + uint8_t k_src_port; + /** mp_encode_uint(addr.sin_port) */ + uint8_t m_src_port; + uint16_t v_src_port; + + /** mp_encode_uint(SWIM_ROUTE_DST_ADDRESS) */ + uint8_t k_dst_addr; + /** mp_encode_uint(addr.sin_addr.s_addr) */ + uint8_t m_dst_addr; + uint32_t v_dst_addr; + + /** mp_encode_uint(SWIM_ROUTE_DST_PORT) */ + uint8_t k_dst_port; + /** mp_encode_uint(addr.sin_port) */ + uint8_t m_dst_port; + uint16_t v_dst_port; +}; + +/** Initialize routing section.
*/ +void +swim_route_bin_create(struct swim_route_bin *route, + const struct sockaddr_in *src, + const struct sockaddr_in *dst); + /** }}} Meta component */ /** -- 2.17.2 (Apple Git-113)