[tarantool-patches] [PATCH 1/2] swim: split send/recv into phases

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue May 14 00:57:14 MSK 2019


At this moment swim_scheduler_on_output() is a relatively simple
function. It takes a task, builds its meta and flushes a result
into the network. But soon SWIM will be able to encrypt messages.

It means, that in addition to regular preprocessing like building
meta headers a new phase will appear - encryption. What is more -
conditional encryption, because a user may want to do not encrypt
messages.

All the same is about swim_scheduler_on_input() - if a SWIM
instance uses encryption, it should decrypt incoming messages
before forwarding them into the SWIM code logic.

The chosen strategy - lets reuse on_output/on_input virtuality
and create two version of on_input/on_output functions:

    swim_on_plain_input()  | swim_on_encrypted_input()
    swim_on_plain_output() | swim_on_encrypted_output()

One of these pairs will be chosen depending on if the instance
uses encryption.

To make these 4 functions as simple and short as possible this
commit creates two sets of functions, doing all the logic except
encryption:

    swim_begin_send()
    swim_do_send()
    swim_complete_send()

    swim_begin_recv()
    swim_do_recv()
    swim_complete_recv()

These functions will be used by on_input/on_output functions with
different arguments.

Part of #3234
---
 src/lib/swim/swim_io.c | 129 +++++++++++++++++++++++++++++++++--------
 1 file changed, 104 insertions(+), 25 deletions(-)

diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 107f47140..5bfa138df 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -338,66 +338,135 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
 	swim_scheduler_stop_input(scheduler);
 }
 
-static void
-swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
+/**
+ * Begin packet transmission. Prepare a next task in the queue to
+ * send its packet: build a meta header, pop the task from the
+ * queue.
+ * @param scheduler Scheduler to pop a task from.
+ * @param loop Event loop passed by libev.
+ * @param io Descriptor to send to.
+ * @param events Mask of happened events passed by libev.
+ * @param[out] dst Destination address to send the packet to. Can
+ *        be different from task.dst, for example, if task.proxy
+ *        is specified.
+ *
+ * @retval NULL The queue is empty. Input has been stopped.
+ * @retval not NULL A task ready to be sent.
+ */
+static struct swim_task *
+swim_begin_send(struct swim_scheduler *scheduler, struct ev_loop *loop,
+		struct ev_io *io, int events, const struct sockaddr_in **dst)
 {
 	assert((events & EV_WRITE) != 0);
 	(void) events;
-	struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
 	if (rlist_empty(&scheduler->queue_output)) {
 		/*
 		 * Possible, if a member pushed a task and then
 		 * was deleted together with it.
 		 */
 		swim_ev_io_stop(loop, io);
-		return;
+		return NULL;
 	}
 	struct swim_task *task =
 		rlist_shift_entry(&scheduler->queue_output, struct swim_task,
 				  in_queue_output);
 	const struct sockaddr_in *src = &scheduler->transport.addr;
-	const struct sockaddr_in *dst = &task->dst;
-	const char *dst_str = swim_inaddr_str(dst);
+	*dst = &task->dst;
+	const char *dst_str = swim_inaddr_str(*dst);
 	if (! swim_inaddr_is_empty(&task->proxy)) {
-		dst = &task->proxy;
+		*dst = &task->proxy;
 		dst_str = tt_sprintf("%s via %s", dst_str,
-				     swim_inaddr_str(dst));
+				     swim_inaddr_str(*dst));
 		swim_packet_build_meta(&task->packet, src, src, &task->dst);
 	} else {
 		swim_packet_build_meta(&task->packet, src, NULL, NULL);
 	}
 	say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
 		    task->desc, dst_str);
-	int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
-				     task->packet.pos - task->packet.buf,
-				     (const struct sockaddr *) dst,
-				     sizeof(*dst));
-	if (rc < 0)
+	return task;
+}
+
+/** Send a packet into the network. */
+static inline ssize_t
+swim_do_send(struct swim_scheduler *scheduler, const char *buf, int size,
+	     const struct sockaddr_in *dst)
+{
+	return swim_transport_send(&scheduler->transport, buf, size,
+				   (const struct sockaddr *) dst, sizeof(*dst));
+}
+
+/**
+ * Finalize packet transmission, call the completion callback.
+ * @param scheduler Scheduler owning @a task.
+ * @param task Sent (or failed to be sent) task.
+ * @param size Result of send().
+ */
+static inline void
+swim_complete_send(struct swim_scheduler *scheduler, struct swim_task *task,
+		   ssize_t size)
+{
+	if (size < 0)
 		diag_log();
 	if (task->complete != NULL)
-		task->complete(task, scheduler, rc);
+		task->complete(task, scheduler, size);
 }
 
 static void
-swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
+swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
+{
+	struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+	const struct sockaddr_in *dst;
+	struct swim_task *task = swim_begin_send(scheduler, loop, io, events,
+						 &dst);
+	if (task == NULL)
+		return;
+	ssize_t size = swim_do_send(scheduler, task->packet.buf,
+				    task->packet.pos - task->packet.buf, dst);
+	swim_complete_send(scheduler, task, size);
+}
+
+/**
+ * Begin packet receipt. Note, this function is no-op, and exists
+ * just for consistency with begin/do/complete_send() functions.
+ */
+static inline void
+swim_begin_recv(struct swim_scheduler *scheduler, struct ev_loop *loop,
+		struct ev_io *io, int events)
 {
 	assert((events & EV_READ) != 0);
+	(void) io;
+	(void) scheduler;
 	(void) events;
 	(void) loop;
-	struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+}
+
+/** Receive a packet from the network. */
+static ssize_t
+swim_do_recv(struct swim_scheduler *scheduler, char *buf, int size)
+{
 	struct sockaddr_in src;
 	socklen_t len = sizeof(src);
-	char buf[UDP_PACKET_SIZE];
-	ssize_t size = swim_transport_recv(&scheduler->transport, buf,
-					   sizeof(buf),
-					   (struct sockaddr *) &src, &len);
-	if (size <= 0) {
-		if (size < 0)
-			goto error;
-		return;
-	}
+	ssize_t rc = swim_transport_recv(&scheduler->transport, buf, size,
+					 (struct sockaddr *) &src, &len);
+	if (rc <= 0)
+		return rc;
 	say_verbose("SWIM %d: received from %s", swim_scheduler_fd(scheduler),
 		    swim_inaddr_str(&src));
+	return rc;
+}
+
+/**
+ * Finalize packet receipt, call the SWIM core callbacks, or
+ * forward the packet to a next node.
+ */
+static void
+swim_complete_recv(struct swim_scheduler *scheduler, const char *buf,
+		   ssize_t size)
+{
+	if (size < 0)
+		goto error;
+	if (size == 0)
+		return;
 	struct swim_meta_def meta;
 	const char *pos = buf, *end = pos + size;
 	if (swim_meta_def_decode(&meta, &pos, end) < 0)
@@ -446,6 +515,16 @@ error:
 	diag_log();
 }
 
+static void
+swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
+{
+	struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+	char buf[UDP_PACKET_SIZE];
+	swim_begin_recv(scheduler, loop, io, events);
+	ssize_t size = swim_do_recv(scheduler, buf, UDP_PACKET_SIZE);
+	swim_complete_recv(scheduler, buf, size);
+}
+
 const char *
 swim_inaddr_str(const struct sockaddr_in *addr)
 {
-- 
2.20.1 (Apple Git-117)





More information about the Tarantool-patches mailing list