[tarantool-patches] [PATCH v2 2/3] swim: introduce routing

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Apr 24 23:21:59 MSK 2019


Before the patch SWIM packets were sent straightforwardly from
one instance to another, with transparent routing on the Internet
layer of TCP/IP. But the SWIM paper describes one last component
that is not implemented yet - the suspicion mechanism.

So as not to overload this message with suspicion details, it is
enough to say that it makes it possible to send a packet through
an intermediate SWIM instance, not directly.

This commit extends the SWIM protocol with a new transport-level
section named 'routing'. It allows sending indirect SWIM messages
transparently via packet forwarding implemented entirely inside
the transport component, in swim_io.c.

Part of #3234
---
 src/lib/swim/swim.c         |  4 +-
 src/lib/swim/swim_io.c      | 96 +++++++++++++++++++++++++++++++++----
 src/lib/swim/swim_io.h      | 15 +++++-
 src/lib/swim/swim_proto.c   | 83 +++++++++++++++++++++++++++++++-
 src/lib/swim/swim_proto.h   | 88 ++++++++++++++++++++++++++++++++--
 test/unit/swim_proto.c      | 56 +++++++++++++++++++++-
 test/unit/swim_proto.result | 11 ++++-
 7 files changed, 336 insertions(+), 17 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index bb1ded713..1b4a4365d 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1430,8 +1430,10 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
 /** Process a new message. */
 static void
 swim_on_input(struct swim_scheduler *scheduler, const char *pos,
-	      const char *end, const struct sockaddr_in *src)
+	      const char *end, const struct sockaddr_in *src,
+	      const struct sockaddr_in *proxy)
 {
+	(void) proxy;
 	const char *prefix = "invalid message:";
 	struct swim *swim = swim_by_scheduler(scheduler);
 	struct tt_uuid uuid;
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index a55c15f30..c7ffc31d5 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -60,8 +60,11 @@ swim_packet_create(struct swim_packet *packet)
 /** Fill metadata prefix of a packet. */
 static inline void
 swim_packet_build_meta(struct swim_packet *packet,
-		       const struct sockaddr_in *src)
+		       const struct sockaddr_in *src,
+		       const struct sockaddr_in *route_src,
+		       const struct sockaddr_in *route_dst)
 {
+	assert((route_src != NULL) == (route_dst != NULL));
 	char *meta = packet->meta;
 	char *end = packet->body;
 	/*
@@ -71,9 +74,18 @@ swim_packet_build_meta(struct swim_packet *packet,
 	if (meta == end)
 		return;
 	struct swim_meta_header_bin header;
-	swim_meta_header_bin_create(&header, src);
-	assert(meta + sizeof(header) == end);
+	swim_meta_header_bin_create(&header, src, route_dst != NULL);
+	assert(meta + sizeof(header) <= end);
 	memcpy(meta, &header, sizeof(header));
+	meta += sizeof(header);
+	if (route_dst != NULL) {
+		struct swim_route_bin route;
+		swim_route_bin_create(&route, route_src, route_dst);
+		assert(meta + sizeof(route) <= end);
+		memcpy(meta, &route, sizeof(route));
+		meta += sizeof(route);
+	}
+	assert(meta == end);
 	/*
 	 * Once meta is built, it is consumed by the body. Used
 	 * not to rebuild the meta again if the task will be
@@ -82,6 +94,21 @@ swim_packet_build_meta(struct swim_packet *packet,
 	packet->body = packet->meta;
 }
 
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
+{
+	/*
+	 * Route meta should be reserved before body encoding is
+	 * started. Otherwise the already encoded body would have
+	 * to be moved later, maybe with its tail trimmed off
+	 * because of the limited UDP packet size.
+	 */
+	assert(swim_packet_body_size(&task->packet) == 0);
+	assert(! swim_inaddr_is_empty(proxy));
+	task->proxy = *proxy;
+	swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+}
+
 void
 swim_task_create(struct swim_task *task, swim_task_f complete,
 		 swim_task_f cancel, const char *desc)
@@ -161,6 +188,11 @@ swim_bcast_task_delete_cb(struct swim_task *task,
 static int
 swim_bcast_task_next_addr(struct swim_bcast_task *task)
 {
+	/*
+	 * Broadcast + proxy is not supported yet, and it will
+	 * hardly ever be needed.
+	 */
+	assert(swim_inaddr_is_empty(&task->base.proxy));
 	for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
 		int flags = i->ifa_flags;
 		if ((flags & IFF_UP) == 0)
@@ -320,13 +352,23 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
 	struct swim_task *task =
 		rlist_shift_entry(&scheduler->queue_output, struct swim_task,
 				  in_queue_output);
+	const struct sockaddr_in *src = &scheduler->transport.addr;
+	const struct sockaddr_in *dst = &task->dst;
+	const char *dst_str = swim_inaddr_str(dst);
+	if (! swim_inaddr_is_empty(&task->proxy)) {
+		dst = &task->proxy;
+		dst_str = tt_sprintf("%s via %s", dst_str,
+				     swim_inaddr_str(dst));
+		swim_packet_build_meta(&task->packet, src, src, &task->dst);
+	} else {
+		swim_packet_build_meta(&task->packet, src, NULL, NULL);
+	}
 	say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
-		    task->desc, swim_inaddr_str(&task->dst));
-	swim_packet_build_meta(&task->packet, &scheduler->transport.addr);
+		    task->desc, dst_str);
 	int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
 				     task->packet.pos - task->packet.buf,
-				     (const struct sockaddr *) &task->dst,
-				     sizeof(task->dst));
+				     (const struct sockaddr *) dst,
+				     sizeof(*dst));
 	if (rc < 0)
 		diag_log();
 	if (task->complete != NULL)
@@ -357,7 +399,45 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
 	const char *pos = buf, *end = pos + size;
 	if (swim_meta_def_decode(&meta, &pos, end) < 0)
 		goto error;
-	scheduler->on_input(scheduler, pos, end, &meta.src);
+	/*
+	 * Check if this instance is not the destination and
+	 * possibly forward the packet.
+	 */
+	if (! meta.is_route_specified) {
+		scheduler->on_input(scheduler, pos, end, &meta.src, NULL);
+		return;
+	}
+	struct sockaddr_in *self = &scheduler->transport.addr;
+	if (swim_inaddr_eq(&meta.route.dst, self)) {
+		scheduler->on_input(scheduler, pos, end, &meta.route.src,
+				    &meta.src);
+		return;
+	}
+	/* Forward the foreign packet. */
+	struct swim_task *task = swim_task_new(swim_task_delete_cb,
+					       swim_task_delete_cb,
+					       "foreign packet");
+	if (task == NULL)
+		goto error;
+	/*
+	 * Allocate route meta explicitly, because the packet
+	 * should keep route meta even when it is sent to the
+	 * final destination directly.
+	 */
+	swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+	/*
+	 * Meta should be rebuilt with the different source
+	 * address - this instance. It is used by the receiver to
+	 * send a reply through this instance again.
+	 */
+	swim_packet_build_meta(&task->packet, self, &meta.route.src,
+			       &meta.route.dst);
+	/* Copy the original body untouched. */
+	size = end - pos;
+	char *body = swim_packet_alloc(&task->packet, size);
+	assert(body != NULL);
+	memcpy(body, pos, size);
+	swim_task_send(task, &meta.route.dst, scheduler);
 	return;
 error:
 	diag_log();
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 884680859..977859db7 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -140,7 +140,8 @@ swim_packet_create(struct swim_packet *packet);
 
 typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
 					  const char *buf, const char *end,
-					  const struct sockaddr_in *src);
+					  const struct sockaddr_in *src,
+					  const struct sockaddr_in *proxy);
 
 /** Planner and executor of input and output operations.*/
 struct swim_scheduler {
@@ -212,6 +213,11 @@ struct swim_task {
 	struct swim_packet packet;
 	/** Destination address. */
 	struct sockaddr_in dst;
+	/**
+	 * Optional proxy via which the destination should be
+	 * reached.
+	 */
+	struct sockaddr_in proxy;
 	/** Place in a queue of tasks. */
 	struct rlist in_queue_output;
 	/**
@@ -232,6 +238,13 @@ swim_task_is_scheduled(struct swim_task *task)
 	return ! rlist_empty(&task->in_queue_output);
 }
 
+/**
+ * Set the proxy for the task. The packet is sent to the proxy,
+ * and the real destination is dumped into the routing section.
+ */
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
+
 /**
  * Put the task into a queue of tasks. Eventually it will be sent.
  */
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 796559e8e..18c20abf3 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -439,9 +439,9 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
 
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
-			    const struct sockaddr_in *src)
+			    const struct sockaddr_in *src, bool has_routing)
 {
-	int map_size = 1 + SWIM_INADDR_BIN_SIZE;
+	int map_size = 1 + SWIM_INADDR_BIN_SIZE + has_routing;
 	assert(mp_sizeof_map(map_size) == 1);
 	header->m_header = 0x80 | map_size;
 	header->k_version = SWIM_META_TARANTOOL_VERSION;
@@ -452,6 +452,64 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header,
 	swim_inaddr_bin_fill(&header->src_addr, src);
 }
 
+/**
+ * Decode meta routing section into meta definition object.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos MessagePack buffer to decode.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success, the route is marked as specified.
+ * @retval -1 Error: bad format, unknown key or empty address.
+ */
+static int
+swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos,
+			   const char *end)
+{
+	const char *prefix = "invalid routing section:";
+	uint32_t size;
+	def->route.src.sin_family = AF_INET;
+	def->route.dst.sin_family = AF_INET;
+	if (swim_decode_map(pos, end, &size, prefix, "route") != 0)
+		return -1;
+	for (uint32_t i = 0; i < size; ++i) {
+		uint64_t key;
+		if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
+			return -1;
+		switch (key) {
+		case SWIM_ROUTE_SRC_ADDRESS:
+			if (swim_decode_ip(&def->route.src, pos, end, prefix,
+					   "source address") != 0)
+				return -1;
+			break;
+		case SWIM_ROUTE_SRC_PORT:
+			if (swim_decode_port(&def->route.src, pos, end,
+					     prefix, "source port") != 0)
+				return -1;
+			break;
+		case SWIM_ROUTE_DST_ADDRESS:
+			if (swim_decode_ip(&def->route.dst, pos, end, prefix,
+					   "destination address") != 0)
+				return -1;
+			break;
+		case SWIM_ROUTE_DST_PORT:
+			if (swim_decode_port(&def->route.dst, pos, end,
+					     prefix, "destination port") != 0)
+				return -1;
+			break;
+		default:
+			diag_set(SwimError, "%s unknown key", prefix);
+			return -1;
+		}
+	}
+	if (swim_check_inaddr_not_empty(&def->route.src, prefix,
+					"source") != 0 ||
+	    swim_check_inaddr_not_empty(&def->route.dst, prefix,
+					"destination") != 0)
+		return -1;
+	def->is_route_specified = true;
+	return 0;
+}
+
 int
 swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 		     const char *end)
@@ -467,6 +525,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 		if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
 			return -1;
 		switch (key) {
+		case SWIM_META_ROUTING:
+			if (swim_meta_def_decode_route(def, pos, end) != 0)
+				return -1;
+			break;
 		case SWIM_META_TARANTOOL_VERSION:
 			if (swim_decode_uint(pos, end, &key, prefix,
 					     "version") != 0)
@@ -509,3 +571,20 @@ swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation)
 	header->m_incarnation = 0xcf;
 	header->v_incarnation = mp_bswap_u64(incarnation);
 }
+
+void
+swim_route_bin_create(struct swim_route_bin *route,
+		      const struct sockaddr_in *src,
+		      const struct sockaddr_in *dst)
+{
+	int map_size = SWIM_INADDR_BIN_SIZE * 2;
+	assert(mp_sizeof_map(map_size) == 1);
+	route->k_routing = SWIM_META_ROUTING;
+	route->m_routing = 0x80 | map_size;
+	swim_inaddr_bin_create(&route->src_addr, SWIM_ROUTE_SRC_ADDRESS,
+			       SWIM_ROUTE_SRC_PORT);
+	swim_inaddr_bin_create(&route->dst_addr, SWIM_ROUTE_DST_ADDRESS,
+			       SWIM_ROUTE_DST_PORT);
+	swim_inaddr_bin_fill(&route->src_addr, src);
+	swim_inaddr_bin_fill(&route->dst_addr, dst);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index ddf9e28db..f70ac708a 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -51,7 +51,13 @@ enum {
  * | {                                                           |
  * |     SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,|
  * |     SWIM_META_SRC_ADDRESS: uint, ip,                        |
- * |     SWIM_META_SRC_PORT: uint, port                          |
+ * |     SWIM_META_SRC_PORT: uint, port,                         |
+ * |     SWIM_META_ROUTING: {                                    |
+ * |         SWIM_ROUTE_SRC_ADDRESS: uint, ip,                   |
+ * |         SWIM_ROUTE_SRC_PORT: uint, port,                    |
+ * |         SWIM_ROUTE_DST_ADDRESS: uint, ip,                   |
+ * |         SWIM_ROUTE_DST_PORT: uint, port                     |
+ * |     }                                                       |
  * | }                                                           |
  * +-------------------Protocol logic section--------------------+
  * | {                                                           |
@@ -421,6 +427,33 @@ enum swim_meta_key {
 	 */
 	SWIM_META_SRC_ADDRESS,
 	SWIM_META_SRC_PORT,
+	/**
+	 * Routing section allows specifying routes of up to 3
+	 * nodes: source, proxy, and destination. It contains two
+	 * addresses - the message originator and the final
+	 * destination. Here is an example of an indirect message
+	 * transmission. Assume, there are 3 nodes: S1, S2, S3.
+	 * S1 sends a message to S3 via S2. The following steps
+	 * are executed in order to deliver the message:
+	 *
+	 * S1 -> S2
+	 * { src: S1, routing: {src: S1, dst: S3}, body: ... }
+	 *
+	 * S2 receives the message and sees: routing.dst != S2 -
+	 * it is a foreign packet. S2 forwards it to S3 preserving
+	 * all the data - body and routing sections.
+	 *
+	 *
+	 * S2 -> S3
+	 * {src: S2, routing: {src: S1, dst: S3}, body: ...}
+	 *
+	 * S3 receives the message and sees: routing.dst == S3 -
+	 * the message is delivered. If S3 wants to answer, it
+	 * sends a response via the same proxy. It knows, that the
+	 * message was delivered from S2, and sends an answer via
+	 * S2.
+	 */
+	SWIM_META_ROUTING,
 };
 
 /**
@@ -432,7 +465,7 @@ enum swim_meta_key {
  * separate MessagePack map.
  */
 struct PACKED swim_meta_header_bin {
-	/** mp_encode_map(3) */
+	/** mp_encode_map(3 or 4) */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */
@@ -448,7 +481,7 @@ struct PACKED swim_meta_header_bin {
 /** Initialize meta section. */
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
-			    const struct sockaddr_in *src);
+			    const struct sockaddr_in *src, bool has_routing);
 
 /** Meta definition. */
 struct swim_meta_def {
@@ -456,6 +489,18 @@ struct swim_meta_def {
 	uint32_t version;
 	/** Source of the message. */
 	struct sockaddr_in src;
+	/** Route source and destination. */
+	struct {
+		struct sockaddr_in src;
+		struct sockaddr_in dst;
+	} route;
+	/**
+	 * True, if both @a src and @a dst are not empty. This
+	 * flag is just sugar so as not to check both addresses
+	 * manually. Also in future more fields could be added
+	 * here.
+	 */
+	bool is_route_specified;
 };
 
 /**
@@ -471,6 +516,43 @@ int
 swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 		     const char *end);
 
+enum swim_route_key {
+	/**
+	 * True source of the packet. Can be different from the
+	 * packet sender. It is expected that a response should be
+	 * sent back to this address. Maybe indirectly through the
+	 * same proxy.
+	 */
+	SWIM_ROUTE_SRC_ADDRESS = 0,
+	SWIM_ROUTE_SRC_PORT,
+	/**
+	 * True destination of the packet. Can be different from
+	 * this instance, receiver. If it is for another instance,
+	 * then this packet is forwarded to the latter.
+	 */
+	SWIM_ROUTE_DST_ADDRESS,
+	SWIM_ROUTE_DST_PORT,
+	swim_route_key_MAX,
+};
+
+/** Route section template. Describes source, destination. */
+struct PACKED swim_route_bin {
+	/** mp_encode_uint(SWIM_META_ROUTING) */
+	uint8_t k_routing;
+	/** mp_encode_map(4) */
+	uint8_t m_routing;
+	/** SWIM_ROUTE_SRC_ADDRESS and SWIM_ROUTE_SRC_PORT. */
+	struct swim_inaddr_bin src_addr;
+	/** SWIM_ROUTE_DST_ADDRESS and SWIM_ROUTE_DST_PORT. */
+	struct swim_inaddr_bin dst_addr;
+};
+
+/** Initialize routing section. */
+void
+swim_route_bin_create(struct swim_route_bin *route,
+		      const struct sockaddr_in *src,
+		      const struct sockaddr_in *dst);
+
 /** }}}                     Meta component                      */
 
 enum swim_quit_key {
diff --git a/test/unit/swim_proto.c b/test/unit/swim_proto.c
index 64487fb7d..d7fafc7f0 100644
--- a/test/unit/swim_proto.c
+++ b/test/unit/swim_proto.c
@@ -189,11 +189,64 @@ swim_test_meta(void)
 	footer();
 }
 
+static void
+swim_test_route(void)
+{
+	header();
+	plan(5);
+
+	char buffer[1024];
+	struct swim_meta_def mdef;
+	struct swim_meta_header_bin header;
+	struct sockaddr_in addr;
+	addr.sin_port = htons(1234);
+	fail_if(inet_aton("127.0.0.1", &addr.sin_addr) == 0);
+
+	swim_meta_header_bin_create(&header, &addr, true);
+	memcpy(buffer, &header, sizeof(header));
+	char *last_valid = buffer + sizeof(header);
+	char *end = last_valid;
+	const char *pos = buffer;
+
+	is(swim_meta_def_decode(&mdef, &pos, end), -1,
+	   "route was expected, but map is too short");
+
+	end = mp_encode_uint(end, SWIM_META_ROUTING);
+	pos = buffer;
+	is(swim_meta_def_decode(&mdef, &pos, end), -1, "no route map");
+
+	end = mp_encode_map(end, 0);
+	pos = buffer;
+	is(swim_meta_def_decode(&mdef, &pos, end), -1, "empty route map");
+
+	struct swim_route_bin route;
+	struct sockaddr_in src, dst;
+	memset(&src, 0, sizeof(src));
+	memset(&dst, 0, sizeof(dst));
+	swim_route_bin_create(&route, &src, &dst);
+	memcpy(last_valid, &route, sizeof(route));
+	end = last_valid + sizeof(route);
+	pos = buffer;
+	is(swim_meta_def_decode(&mdef, &pos, end), -1, "zero addresses");
+
+	src.sin_port = 1;
+	src.sin_addr = addr.sin_addr;
+	dst.sin_port = 1;
+	dst.sin_addr = addr.sin_addr;
+	swim_route_bin_create(&route, &src, &dst);
+	memcpy(last_valid, &route, sizeof(route));
+	pos = buffer;
+	is(swim_meta_def_decode(&mdef, &pos, end), 0, "normal route");
+
+	check_plan();
+	footer();
+}
+
 int
 main()
 {
 	header();
-	plan(2);
+	plan(3);
 	memory_init();
 	fiber_init(fiber_c_invoke);
 	int fd = open("log.txt", O_TRUNC);
@@ -203,6 +256,7 @@ main()
 
 	swim_test_member_def();
 	swim_test_meta();
+	swim_test_route();
 
 	say_logger_free();
 	fiber_free();
diff --git a/test/unit/swim_proto.result b/test/unit/swim_proto.result
index c22769161..8a41a5d40 100644
--- a/test/unit/swim_proto.result
+++ b/test/unit/swim_proto.result
@@ -1,5 +1,5 @@
 	*** main ***
-1..2
+1..3
 	*** swim_test_member_def ***
     1..12
     ok 1 - not map header
@@ -28,4 +28,13 @@ ok 1 - subtests
     ok 8 - normal meta
 ok 2 - subtests
 	*** swim_test_meta: done ***
+	*** swim_test_route ***
+    1..5
+    ok 1 - route was expected, but map is too short
+    ok 2 - no route map
+    ok 3 - empty route map
+    ok 4 - zero addresses
+    ok 5 - normal route
+ok 3 - subtests
+	*** swim_test_route: done ***
 	*** main: done ***
-- 
2.20.1 (Apple Git-117)





More information about the Tarantool-patches mailing list