[PATCH v4 09/12] [RAW] swim: introduce routing
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Jan 31 00:28:38 MSK 2019
It is going to be used for indirect ping/acks.
Part of #3234
---
src/lib/swim/swim.c | 4 +-
src/lib/swim/swim_io.c | 100 +++++++++++++++++++++++++++++++++++---
src/lib/swim/swim_io.h | 17 ++++++-
src/lib/swim/swim_proto.c | 92 ++++++++++++++++++++++++++++++++++-
src/lib/swim/swim_proto.h | 76 +++++++++++++++++++++++++++--
5 files changed, 274 insertions(+), 15 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 78dbc6092..5cec3789a 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1200,8 +1200,10 @@ swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
- const char *end, const struct sockaddr_in *src)
+ const char *end, const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy)
{
+ (void) proxy;
const char *msg_pref = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 170d7af77..a8fb1f588 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -54,6 +54,50 @@ swim_packet_create(struct swim_packet *packet)
swim_packet_alloc_meta(packet, sizeof(struct swim_meta_header_bin));
}
+/** Fill the reserved meta prefix: header, plus route if route_dst != NULL. */
+static inline void
+swim_packet_build_meta(struct swim_packet *packet,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *route_src,
+ const struct sockaddr_in *route_dst)
+{
+ char *meta = packet->meta;
+ char *end = packet->body;
+ /*
+ * No space is left between meta and body - the meta is already
+ * built. Used for packet forwarding, when route source != this instance.
+ */
+ if (meta == end)
+ return;
+ struct swim_meta_header_bin header;
+ struct swim_route_bin route;
+ assert(meta + sizeof(header) <= end);
+ swim_meta_header_bin_create(&header, src, route_dst != NULL);
+ memcpy(meta, &header, sizeof(header));
+ if (route_dst != NULL) {
+ meta += sizeof(header);
+ assert(meta + sizeof(route) <= end);
+ swim_route_bin_create(&route, route_src, route_dst);
+ memcpy(meta, &route, sizeof(route));
+ }
+ /* Now the meta is built and the body consumes it. */
+ packet->body = packet->meta;
+}
+
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
+{
+ /*
+ * Meta should be reserved before body encoding is
+ * started. Otherwise the already encoded body would have
+ * to be moved, possibly losing its tail.
+ */
+ assert(swim_packet_body_size(&task->packet) == 0);
+ task->proxy = *proxy;
+ task->is_proxy_specified = true;
+ swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+}
+
void
swim_task_create(struct swim_task *task, swim_task_f complete,
swim_task_f cancel)
@@ -178,16 +222,24 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
struct swim_task *task =
rlist_shift_entry(&scheduler->queue_output, struct swim_task,
in_queue_output);
+ const struct sockaddr_in *src = &scheduler->transport.addr;
+ const struct sockaddr_in *dst, *proxy_dst, *proxy_src;
+ if (task->is_proxy_specified) {
+ dst = &task->proxy;
+ proxy_dst = dst;
+ proxy_src = src;
+ } else {
+ dst = &task->dst;
+ proxy_dst = NULL;
+ proxy_src = NULL;
+ }
+ swim_packet_build_meta(&task->packet, src, proxy_src, proxy_dst);
say_verbose("SWIM: send to %s",
- sio_strfaddr((struct sockaddr *) &task->dst,
- sizeof(task->dst)));
- struct swim_meta_header_bin header;
- swim_meta_header_bin_create(&header, &scheduler->transport.addr);
- memcpy(task->packet.meta, &header, sizeof(header));
+ sio_strfaddr((struct sockaddr *) dst, sizeof(*dst)));
int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
task->packet.pos - task->packet.buf,
- (const struct sockaddr *) &task->dst,
- sizeof(task->dst));
+ (const struct sockaddr *) dst,
+ sizeof(*dst));
if (rc != 0)
diag_log();
if (task->complete != NULL)
@@ -216,9 +268,41 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
sio_strfaddr((struct sockaddr *) &src, len));
struct swim_meta_def meta;
const char *pos = buf, *end = pos + size;
+ struct sockaddr_in *self = &scheduler->transport.addr;
if (swim_meta_def_decode(&meta, &pos, end) < 0)
goto error;
- scheduler->on_input(scheduler, pos, end, &meta.src);
+ /*
+ * Check if this instance is not a receiver and possibly
+ * forward the packet.
+ */
+ if (! meta.is_route_specified) {
+ scheduler->on_input(scheduler, pos, end, &meta.src, NULL);
+ } else if (meta.route.dst.sin_port == self->sin_port &&
+ meta.route.dst.sin_addr.s_addr == self->sin_addr.s_addr) {
+ scheduler->on_input(scheduler, pos, end, &meta.route.src,
+ &meta.src);
+ } else {
+ /* Forward the packet. */
+ struct swim_task *task = swim_task_new(swim_task_delete_cb,
+ swim_task_delete_cb);
+ if (task == NULL)
+ goto error;
+ swim_task_proxy(task, &meta.route.dst);
+ /*
+ * Meta should be rebuilt with the different
+ * source address - this instance. It is used by a
+ * receiver to send a reply through this instance
+ * again.
+ */
+ swim_packet_build_meta(&task->packet, self, &meta.route.src,
+ &meta.route.dst);
+ /* Copy the original body without a touch. */
+ size = end - pos;
+ char *body = swim_packet_alloc(&task->packet, size);
+ assert(body != NULL);
+ memcpy(body, pos, size);
+ swim_task_send(task, &meta.route.dst, scheduler);
+ }
return;
error:
diag_log();
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 4d694857d..0ba8972f0 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -138,7 +138,8 @@ swim_packet_create(struct swim_packet *packet);
typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
const char *buf, const char *end,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy);
/** Planner and executor of input and output operations.*/
struct swim_scheduler {
@@ -206,10 +207,24 @@ struct swim_task {
struct swim_packet packet;
/** Destination address. */
struct sockaddr_in dst;
+ /**
+ * Optional proxy via which the destination should be
+ * reached.
+ */
+ struct sockaddr_in proxy;
+ /** True if a proxy is specified. */
+ bool is_proxy_specified;
/** Place in a queue of tasks. */
struct rlist in_queue_output;
};
+/**
+ * Set the proxy for the task. Before sending this proxy will be
+ * dumped into metadata.
+ */
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
+
/**
* Put the task into a queue of tasks. Eventually it will be sent.
*/
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 284d35695..8b1ed76a7 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -429,9 +429,9 @@ swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header,
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src)
+ const struct sockaddr_in *src, bool has_routing)
{
- header->m_header = 0x83;
+ header->m_header = 0x83 + has_routing;
header->k_version = SWIM_META_TARANTOOL_VERSION;
header->m_version = 0xce;
header->v_version = mp_bswap_u32(tarantool_version_id());
@@ -443,6 +443,69 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header,
header->v_port = mp_bswap_u16(src->sin_port);
}
+/**
+ * Decode the meta routing section into a meta definition object.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos MessagePack buffer to decode.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success, def->is_route_specified is set to true.
+ * @retval -1 Decode error, unknown key, or zero route address/port.
+ */
+static int
+swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos,
+ const char *end)
+{
+ const char *msg_pref = "invalid routing section:";
+ uint32_t size;
+ if (swim_decode_map(pos, end, &size, msg_pref, "route") != 0)
+ return -1;
+ for (uint32_t i = 0; i < size; ++i) {
+ uint64_t key;
+ if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0)
+ return -1;
+ switch (key) {
+ case SWIM_ROUTE_SRC_ADDRESS:
+ if (swim_decode_ip(&def->route.src, pos, end, msg_pref,
+ "source address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_SRC_PORT:
+ if (swim_decode_port(&def->route.src, pos, end,
+ msg_pref, "source port") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_ADDRESS:
+ if (swim_decode_ip(&def->route.dst, pos, end, msg_pref,
+ "destination address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_PORT:
+ if (swim_decode_port(&def->route.dst, pos, end,
+ msg_pref, "destination port") != 0)
+ return -1;
+ break;
+ default:
+ diag_set(SwimError, "%s unknown key", msg_pref);
+ return -1;
+ }
+ }
+ if (def->route.src.sin_port == 0 ||
+ def->route.src.sin_addr.s_addr == 0) {
+ diag_set(SwimError, "%s source address should be specified",
+ msg_pref);
+ return -1;
+ }
+ if (def->route.dst.sin_port == 0 ||
+ def->route.dst.sin_addr.s_addr == 0) {
+ diag_set(SwimError, "%s destination address should be "\
+ "specified", msg_pref);
+ return -1;
+ }
+ def->is_route_specified = true;
+ return 0;
+}
+
int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end)
@@ -457,6 +520,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0)
return -1;
switch (key) {
+ case SWIM_META_ROUTING:
+ if (swim_meta_def_decode_route(def, pos, end) != 0)
+ return -1;
+ break;
case SWIM_META_TARANTOOL_VERSION:
if (swim_decode_uint(pos, end, &key, msg_pref,
"version") != 0)
@@ -493,3 +560,24 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
}
return 0;
}
+
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst)
+{
+ route->k_routing = SWIM_META_ROUTING;
+ route->m_routing = 0x84; /* MessagePack fixmap of 4 pairs. */
+ route->k_src_addr = SWIM_ROUTE_SRC_ADDRESS;
+ route->m_src_addr = 0xce; /* MessagePack uint32 marker. */
+ route->v_src_addr = mp_bswap_u32(src->sin_addr.s_addr);
+ route->k_src_port = SWIM_ROUTE_SRC_PORT;
+ route->m_src_port = 0xcd; /* MessagePack uint16 marker. */
+ route->v_src_port = mp_bswap_u16(src->sin_port);
+ route->k_dst_addr = SWIM_ROUTE_DST_ADDRESS;
+ route->m_dst_addr = 0xce; /* MessagePack uint32 marker. */
+ route->v_dst_addr = mp_bswap_u32(dst->sin_addr.s_addr);
+ route->k_dst_port = SWIM_ROUTE_DST_PORT;
+ route->m_dst_port = 0xcd; /* MessagePack uint16 marker. */
+ route->v_dst_port = mp_bswap_u16(dst->sin_port);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 353605c35..fe9eb85c5 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -48,7 +48,13 @@ enum {
* | { |
* | SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,|
* | SWIM_META_SRC_ADDRESS: uint, ip, |
- * | SWIM_META_SRC_PORT: uint, port |
+ * | SWIM_META_SRC_PORT: uint, port, |
+ * | SWIM_META_ROUTING: { |
+ * | SWIM_ROUTE_SRC_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_SRC_PORT: uint, port, |
+ * | SWIM_ROUTE_DST_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_DST_PORT: uint, port |
+ * | } |
* | } |
* +-------------------Protocol logic section--------------------+
* | { |
@@ -458,6 +464,7 @@ enum swim_meta_key {
*/
SWIM_META_SRC_ADDRESS,
SWIM_META_SRC_PORT,
+ SWIM_META_ROUTING,
};
/**
@@ -469,7 +476,7 @@ enum swim_meta_key {
* separate MessagePack map.
*/
struct PACKED swim_meta_header_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(3 or 4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */
@@ -494,7 +501,7 @@ struct PACKED swim_meta_header_bin {
/** Initialize meta section. */
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src, bool has_routing);
/** Meta definition. */
struct swim_meta_def {
@@ -502,6 +509,12 @@ struct swim_meta_def {
uint32_t version;
/** Source of the message. */
struct sockaddr_in src;
+ /** Route source and destination. */
+ bool is_route_specified;
+ struct {
+ struct sockaddr_in src;
+ struct sockaddr_in dst;
+ } route;
};
/**
@@ -517,6 +530,63 @@ int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end);
+enum swim_route_key {
+ /**
+ * True source of the packet. Can be different from the
+ * packet sender. It is expected that the answer is sent
+ * back to this address, possibly indirectly through the
+ * same proxy.
+ */
+ SWIM_ROUTE_SRC_ADDRESS = 0,
+ SWIM_ROUTE_SRC_PORT,
+ /**
+ * True destination of the packet. Can be different from
+ * this instance, the receiver. If it is for another
+ * instance, then the packet is forwarded to the latter.
+ */
+ SWIM_ROUTE_DST_ADDRESS,
+ SWIM_ROUTE_DST_PORT,
+ swim_route_key_MAX,
+};
+
+/** Route section template. Describes source, destination. */
+struct PACKED swim_route_bin {
+ /** mp_encode_uint(SWIM_META_ROUTING) */
+ uint8_t k_routing;
+ /** mp_encode_map(4) */
+ uint8_t m_routing;
+
+ /** mp_encode_uint(SWIM_ROUTE_SRC_ADDRESS) */
+ uint8_t k_src_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_src_addr;
+ uint32_t v_src_addr;
+
+ /** mp_encode_uint(SWIM_ROUTE_SRC_PORT) */
+ uint8_t k_src_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_src_port;
+ uint16_t v_src_port;
+
+ /** mp_encode_uint(SWIM_ROUTE_DST_ADDRESS) */
+ uint8_t k_dst_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_dst_addr;
+ uint32_t v_dst_addr;
+
+ /** mp_encode_uint(SWIM_ROUTE_DST_PORT) */
+ uint8_t k_dst_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_dst_port;
+ uint16_t v_dst_port;
+};
+
+/** Initialize routing section. */
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst);
+
/** }}} Meta component */
/**
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list