[PATCH v4 09/12] [RAW] swim: introduce routing

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Jan 31 00:28:38 MSK 2019


Routing is going to be used for indirect pings and acks.

Part of #3234
---
 src/lib/swim/swim.c       |   4 +-
 src/lib/swim/swim_io.c    | 100 +++++++++++++++++++++++++++++++++++---
 src/lib/swim/swim_io.h    |  17 ++++++-
 src/lib/swim/swim_proto.c |  92 ++++++++++++++++++++++++++++++++++-
 src/lib/swim/swim_proto.h |  76 +++++++++++++++++++++++++++--
 5 files changed, 274 insertions(+), 15 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 78dbc6092..5cec3789a 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1200,8 +1200,10 @@ swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
 /** Process a new message. */
 static void
 swim_on_input(struct swim_scheduler *scheduler, const char *pos,
-	      const char *end, const struct sockaddr_in *src)
+	      const char *end, const struct sockaddr_in *src,
+	      const struct sockaddr_in *proxy)
 {
+	(void) proxy; /* Not used yet. */
 	const char *msg_pref = "invalid message:";
 	struct swim *swim = swim_by_scheduler(scheduler);
 	struct tt_uuid uuid;
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 170d7af77..a8fb1f588 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -54,6 +54,50 @@ swim_packet_create(struct swim_packet *packet)
 	swim_packet_alloc_meta(packet, sizeof(struct swim_meta_header_bin));
 }
 
+/** Fill the packet meta: header, and route if @a route_dst is set. */
+static inline void
+swim_packet_build_meta(struct swim_packet *packet,
+		       const struct sockaddr_in *src,
+		       const struct sockaddr_in *route_src,
+		       const struct sockaddr_in *route_dst)
+{
+	char *meta = packet->meta;
+	char *end = packet->body;
+	/*
+	 * Meta is already built - nothing to do. It happens on
+	 * forwarding, when the meta is built before sending.
+	 */
+	if (meta == end)
+		return;
+	struct swim_meta_header_bin header;
+	struct swim_route_bin route;
+	assert(meta + sizeof(header) <= end);
+	swim_meta_header_bin_create(&header, src, route_dst != NULL);
+	memcpy(meta, &header, sizeof(header));
+	if (route_dst != NULL) {
+		meta += sizeof(header);
+		assert(meta + sizeof(route) <= end);
+		swim_route_bin_create(&route, route_src, route_dst);
+		memcpy(meta, &route, sizeof(route));
+	}
+	/* Mark the meta as built: body == meta, see the check above. */
+	packet->body = packet->meta;
+}
+
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
+{
+	/*
+	 * Route space should be reserved before the body is
+	 * encoded. Otherwise the encoded body would have to
+	 * be moved, maybe losing its tail.
+	 */
+	assert(swim_packet_body_size(&task->packet) == 0);
+	task->proxy = *proxy;
+	task->is_proxy_specified = true;
+	swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+}
+
 void
 swim_task_create(struct swim_task *task, swim_task_f complete,
 		 swim_task_f cancel)
@@ -178,16 +222,24 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
 	struct swim_task *task =
 		rlist_shift_entry(&scheduler->queue_output, struct swim_task,
 				  in_queue_output);
+	const struct sockaddr_in *src = &scheduler->transport.addr;
+	const struct sockaddr_in *dst = &task->dst;
+	const struct sockaddr_in *route_src = NULL, *route_dst = NULL;
+	if (task->is_proxy_specified) {
+		/*
+		 * Send to the proxy, which forwards it to task->dst.
+		 */
+		route_src = src;
+		route_dst = dst;
+		dst = &task->proxy;
+	}
+	swim_packet_build_meta(&task->packet, src, route_src, route_dst);
 	say_verbose("SWIM: send to %s",
-		    sio_strfaddr((struct sockaddr *) &task->dst,
-				 sizeof(task->dst)));
-	struct swim_meta_header_bin header;
-	swim_meta_header_bin_create(&header, &scheduler->transport.addr);
-	memcpy(task->packet.meta, &header, sizeof(header));
+		    sio_strfaddr((struct sockaddr *) dst, sizeof(*dst)));
 	int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
 				     task->packet.pos - task->packet.buf,
-				     (const struct sockaddr *) &task->dst,
-				     sizeof(task->dst));
+				     (const struct sockaddr *) dst,
+				     sizeof(*dst));
 	if (rc != 0)
 		diag_log();
 	if (task->complete != NULL)
@@ -216,9 +268,41 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
 		    sio_strfaddr((struct sockaddr *) &src, len));
 	struct swim_meta_def meta;
 	const char *pos = buf, *end = pos + size;
+	struct sockaddr_in *self = &scheduler->transport.addr;
 	if (swim_meta_def_decode(&meta, &pos, end) < 0)
 		goto error;
-	scheduler->on_input(scheduler, pos, end, &meta.src);
+	/*
+	 * A routed packet is either addressed to this instance,
+	 * or is forwarded further to its route destination.
+	 */
+	if (! meta.is_route_specified) {
+		scheduler->on_input(scheduler, pos, end, &meta.src, NULL);
+	} else if (meta.route.dst.sin_port == self->sin_port &&
+		   meta.route.dst.sin_addr.s_addr == self->sin_addr.s_addr) {
+		scheduler->on_input(scheduler, pos, end, &meta.route.src,
+				    &meta.src);
+	} else {
+		/* Forward the packet. */
+		struct swim_task *task = swim_task_new(swim_task_delete_cb,
+						       swim_task_delete_cb);
+		if (task == NULL)
+			goto error;
+		swim_task_proxy(task, &meta.route.dst);
+		/*
+		 * Rebuild meta with this instance as the source, so
+		 * as the receiver replies through it again.
+		 * NOTE(review): the rebuilt meta may be bigger than
+		 * the received one - is the body sure to fit below?
+		 */
+		swim_packet_build_meta(&task->packet, self, &meta.route.src,
+				       &meta.route.dst);
+		/* Copy the original body as is. */
+		size = end - pos;
+		char *body = swim_packet_alloc(&task->packet, size);
+		assert(body != NULL);
+		memcpy(body, pos, size);
+		swim_task_send(task, &meta.route.dst, scheduler);
+	}
 	return;
 error:
 	diag_log();
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 4d694857d..0ba8972f0 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -138,7 +138,8 @@ swim_packet_create(struct swim_packet *packet);
 
 typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
 					  const char *buf, const char *end,
-					  const struct sockaddr_in *src);
+					  const struct sockaddr_in *src,
+					  const struct sockaddr_in *proxy);
 
 /** Planner and executor of input and output operations.*/
 struct swim_scheduler {
@@ -206,10 +207,24 @@ struct swim_task {
 	struct swim_packet packet;
 	/** Destination address. */
 	struct sockaddr_in dst;
+	/**
+	 * Optional proxy via which the destination should be
+	 * reached.
+	 */
+	struct sockaddr_in proxy;
+	/** True if a proxy is specified. */
+	bool is_proxy_specified;
 	/** Place in a queue of tasks. */
 	struct rlist in_queue_output;
 };
 
+/**
+ * Send the task via @a proxy. Must be called before the body
+ * is encoded, because it reserves space for the route meta.
+ */
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
+
 /**
  * Put the task into a queue of tasks. Eventually it will be sent.
  */
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 284d35695..8b1ed76a7 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -429,9 +429,9 @@ swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header,
 
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
-			    const struct sockaddr_in *src)
+			    const struct sockaddr_in *src, bool has_routing)
 {
-	header->m_header = 0x83;
+	header->m_header = 0x83 + has_routing; /* map(3) or map(4) */
 	header->k_version = SWIM_META_TARANTOOL_VERSION;
 	header->m_version = 0xce;
 	header->v_version = mp_bswap_u32(tarantool_version_id());
@@ -443,6 +443,69 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header,
 	header->v_port = mp_bswap_u16(src->sin_port);
 }
 
+/**
+ * Decode the routing section into @a def->route.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos MessagePack buffer to decode.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success, def->is_route_specified is set.
+ * @retval -1 Malformed route or missing address/port.
+ */
+static int
+swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos,
+			   const char *end)
+{
+	const char *msg_pref = "invalid routing section:";
+	uint32_t size;
+	if (swim_decode_map(pos, end, &size, msg_pref, "route") != 0)
+		return -1;
+	for (uint32_t i = 0; i < size; ++i) {
+		uint64_t key;
+		if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0)
+			return -1;
+		switch (key) {
+		case SWIM_ROUTE_SRC_ADDRESS:
+			if (swim_decode_ip(&def->route.src, pos, end, msg_pref,
+					   "source address") != 0)
+				return -1;
+			break;
+		case SWIM_ROUTE_SRC_PORT:
+			if (swim_decode_port(&def->route.src, pos, end,
+					     msg_pref, "source port") != 0)
+				return -1;
+			break;
+		case SWIM_ROUTE_DST_ADDRESS:
+			if (swim_decode_ip(&def->route.dst, pos, end, msg_pref,
+					   "destination address") != 0)
+				return -1;
+			break;
+		case SWIM_ROUTE_DST_PORT:
+			if (swim_decode_port(&def->route.dst, pos, end,
+					     msg_pref, "destination port") != 0)
+				return -1;
+			break;
+		default:
+			diag_set(SwimError, "%s unknown key", msg_pref);
+			return -1;
+		}
+	}
+	if (def->route.src.sin_port == 0 || /* Expects def zeroed. */
+	    def->route.src.sin_addr.s_addr == 0) {
+		diag_set(SwimError, "%s source address should be specified",
+			 msg_pref);
+		return -1;
+	}
+	if (def->route.dst.sin_port == 0 ||
+	    def->route.dst.sin_addr.s_addr == 0) {
+		diag_set(SwimError, "%s destination address should be "\
+			 "specified", msg_pref);
+		return -1;
+	}
+	def->is_route_specified = true;
+	return 0;
+}
+
 int
 swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 		     const char *end)
@@ -457,6 +520,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 		if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0)
 			return -1;
 		switch (key) {
+		case SWIM_META_ROUTING: /* Only in routed packets. */
+			if (swim_meta_def_decode_route(def, pos, end) != 0)
+				return -1;
+			break;
 		case SWIM_META_TARANTOOL_VERSION:
 			if (swim_decode_uint(pos, end, &key, msg_pref,
 					     "version") != 0)
@@ -493,3 +560,24 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 	}
 	return 0;
 }
+
+void
+swim_route_bin_create(struct swim_route_bin *route,
+		      const struct sockaddr_in *src,
+		      const struct sockaddr_in *dst)
+{
+	route->k_routing = SWIM_META_ROUTING;
+	route->m_routing = 0x84; /* map(4) */
+	route->k_src_addr = SWIM_ROUTE_SRC_ADDRESS;
+	route->m_src_addr = 0xce; /* uint32 */
+	route->v_src_addr = mp_bswap_u32(src->sin_addr.s_addr);
+	route->k_src_port = SWIM_ROUTE_SRC_PORT;
+	route->m_src_port = 0xcd; /* uint16 */
+	route->v_src_port = mp_bswap_u16(src->sin_port);
+	route->k_dst_addr = SWIM_ROUTE_DST_ADDRESS;
+	route->m_dst_addr = 0xce; /* uint32 */
+	route->v_dst_addr = mp_bswap_u32(dst->sin_addr.s_addr);
+	route->k_dst_port = SWIM_ROUTE_DST_PORT;
+	route->m_dst_port = 0xcd; /* uint16 */
+	route->v_dst_port = mp_bswap_u16(dst->sin_port);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 353605c35..fe9eb85c5 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -48,7 +48,13 @@ enum {
  * | {                                                           |
  * |     SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,|
  * |     SWIM_META_SRC_ADDRESS: uint, ip,                        |
- * |     SWIM_META_SRC_PORT: uint, port                          |
+ * |     SWIM_META_SRC_PORT: uint, port,                         |
+ * |     SWIM_META_ROUTING: {                                    |
+ * |         SWIM_ROUTE_SRC_ADDRESS: uint, ip,                   |
+ * |         SWIM_ROUTE_SRC_PORT: uint, port,                    |
+ * |         SWIM_ROUTE_DST_ADDRESS: uint, ip,                   |
+ * |         SWIM_ROUTE_DST_PORT: uint, port                     |
+ * |     }                                                       |
  * | }                                                           |
  * +-------------------Protocol logic section--------------------+
  * | {                                                           |
@@ -458,6 +464,7 @@ enum swim_meta_key {
 	 */
 	SWIM_META_SRC_ADDRESS,
 	SWIM_META_SRC_PORT,
+	SWIM_META_ROUTING,
 };
 
 /**
@@ -469,7 +476,7 @@ enum swim_meta_key {
  * separate MessagePack map.
  */
 struct PACKED swim_meta_header_bin {
-	/** mp_encode_map(3) */
+	/** mp_encode_map(3), or map(4) with a routing section */
 	uint8_t m_header;
 
 	/** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */
@@ -494,7 +501,7 @@ struct PACKED swim_meta_header_bin {
 /** Initialize meta section. */
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
-			    const struct sockaddr_in *src);
+			    const struct sockaddr_in *src, bool has_routing);
 
 /** Meta definition. */
 struct swim_meta_def {
@@ -502,6 +509,12 @@ struct swim_meta_def {
 	uint32_t version;
 	/** Source of the message. */
 	struct sockaddr_in src;
+	/** True if the meta contains a routing section. */
+	bool is_route_specified;
+	struct {
+		struct sockaddr_in src; /* True source of the packet. */
+		struct sockaddr_in dst; /* True destination. */
+	} route;
 };
 
 /**
@@ -517,6 +530,63 @@ int
 swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 		     const char *end);
 
+enum swim_route_key {
+	/**
+	 * True source of the packet. Can be different from the
+	 * packet sender (a proxy). A reply is expected to be
+	 * sent back to this address, possibly through the same
+	 * proxy.
+	 */
+	SWIM_ROUTE_SRC_ADDRESS = 0,
+	SWIM_ROUTE_SRC_PORT,
+	/**
+	 * True destination of the packet. Can be different from
+	 * the instance which received the packet. Then the
+	 * packet is forwarded to the true destination.
+	 */
+	SWIM_ROUTE_DST_ADDRESS,
+	SWIM_ROUTE_DST_PORT,
+	swim_route_key_MAX,
+};
+
+/** Routing section template: true source and destination. */
+struct PACKED swim_route_bin {
+	/** mp_encode_uint(SWIM_META_ROUTING) */
+	uint8_t k_routing;
+	/** mp_encode_map(4) */
+	uint8_t m_routing;
+
+	/** mp_encode_uint(SWIM_ROUTE_SRC_ADDRESS) */
+	uint8_t k_src_addr;
+	/** mp_encode_uint(addr.sin_addr.s_addr) */
+	uint8_t m_src_addr;
+	uint32_t v_src_addr;
+
+	/** mp_encode_uint(SWIM_ROUTE_SRC_PORT) */
+	uint8_t k_src_port;
+	/** mp_encode_uint(addr.sin_port) */
+	uint8_t m_src_port;
+	uint16_t v_src_port;
+
+	/** mp_encode_uint(SWIM_ROUTE_DST_ADDRESS) */
+	uint8_t k_dst_addr;
+	/** mp_encode_uint(addr.sin_addr.s_addr) */
+	uint8_t m_dst_addr;
+	uint32_t v_dst_addr;
+
+	/** mp_encode_uint(SWIM_ROUTE_DST_PORT) */
+	uint8_t k_dst_port;
+	/** mp_encode_uint(addr.sin_port) */
+	uint8_t m_dst_port;
+	uint16_t v_dst_port;
+};
+
+/** Initialize the routing section template. */
+void
+swim_route_bin_create(struct swim_route_bin *route,
+		      const struct sockaddr_in *src,
+		      const struct sockaddr_in *dst);
+
 /** }}}                     Meta component                      */
 
 /**
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list