[tarantool-patches] [PATCH 1/2] swim: split send/recv into phases
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Tue May 14 00:57:14 MSK 2019
At this moment swim_scheduler_on_output() is a relatively simple
function. It takes a task, builds its meta and flushes a result
into the network. But soon SWIM will be able to encrypt messages.
It means that, in addition to regular preprocessing like building
meta headers, a new phase will appear - encryption. What is more,
the encryption will be conditional, because a user may not want to
encrypt messages.
The same applies to swim_scheduler_on_input() - if a SWIM
instance uses encryption, it should decrypt incoming messages
before forwarding them into the SWIM code logic.
The chosen strategy is to reuse the on_output/on_input virtuality
and create two versions of the on_input/on_output functions:
swim_on_plain_input() | swim_on_encrypted_input()
swim_on_plain_output() | swim_on_encrypted_output()
One of these pairs will be chosen depending on whether the
instance uses encryption.
To make these 4 functions as simple and short as possible, this
commit creates two sets of functions doing all the logic except
encryption:
swim_begin_send()
swim_do_send()
swim_complete_send()
swim_begin_recv()
swim_do_recv()
swim_complete_recv()
These functions will be used by on_input/on_output functions with
different arguments.
Part of #3234
---
src/lib/swim/swim_io.c | 129 +++++++++++++++++++++++++++++++++--------
1 file changed, 104 insertions(+), 25 deletions(-)
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 107f47140..5bfa138df 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -338,66 +338,135 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
swim_scheduler_stop_input(scheduler);
}
-static void
-swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
+/**
+ * Begin packet transmission. Prepare a next task in the queue to
+ * send its packet: build a meta header, pop the task from the
+ * queue.
+ * @param scheduler Scheduler to pop a task from.
+ * @param loop Event loop passed by libev.
+ * @param io Descriptor to send to.
+ * @param events Mask of happened events passed by libev.
+ * @param[out] dst Destination address to send the packet to. Can
+ * be different from task.dst, for example, if task.proxy
+ * is specified.
+ *
+ * @retval NULL The queue is empty. Output has been stopped.
+ * @retval not NULL A task ready to be sent.
+ */
+static struct swim_task *
+swim_begin_send(struct swim_scheduler *scheduler, struct ev_loop *loop,
+ struct ev_io *io, int events, const struct sockaddr_in **dst)
{
assert((events & EV_WRITE) != 0);
(void) events;
- struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
if (rlist_empty(&scheduler->queue_output)) {
/*
* Possible, if a member pushed a task and then
* was deleted together with it.
*/
swim_ev_io_stop(loop, io);
- return;
+ return NULL;
}
struct swim_task *task =
rlist_shift_entry(&scheduler->queue_output, struct swim_task,
in_queue_output);
const struct sockaddr_in *src = &scheduler->transport.addr;
- const struct sockaddr_in *dst = &task->dst;
- const char *dst_str = swim_inaddr_str(dst);
+ *dst = &task->dst;
+ const char *dst_str = swim_inaddr_str(*dst);
if (! swim_inaddr_is_empty(&task->proxy)) {
- dst = &task->proxy;
+ *dst = &task->proxy;
dst_str = tt_sprintf("%s via %s", dst_str,
- swim_inaddr_str(dst));
+ swim_inaddr_str(*dst));
swim_packet_build_meta(&task->packet, src, src, &task->dst);
} else {
swim_packet_build_meta(&task->packet, src, NULL, NULL);
}
say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
task->desc, dst_str);
- int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
- task->packet.pos - task->packet.buf,
- (const struct sockaddr *) dst,
- sizeof(*dst));
- if (rc < 0)
+ return task;
+}
+
+/** Send a packet into the network. */
+static inline ssize_t
+swim_do_send(struct swim_scheduler *scheduler, const char *buf, int size,
+ const struct sockaddr_in *dst)
+{
+ return swim_transport_send(&scheduler->transport, buf, size,
+ (const struct sockaddr *) dst, sizeof(*dst));
+}
+
+/**
+ * Finalize packet transmission, call the completion callback.
+ * @param scheduler Scheduler owning @a task.
+ * @param task Sent (or failed to be sent) task.
+ * @param size Result of send().
+ */
+static inline void
+swim_complete_send(struct swim_scheduler *scheduler, struct swim_task *task,
+ ssize_t size)
+{
+ if (size < 0)
diag_log();
if (task->complete != NULL)
- task->complete(task, scheduler, rc);
+ task->complete(task, scheduler, size);
}
static void
-swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
+swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+ const struct sockaddr_in *dst;
+ struct swim_task *task = swim_begin_send(scheduler, loop, io, events,
+ &dst);
+ if (task == NULL)
+ return;
+ ssize_t size = swim_do_send(scheduler, task->packet.buf,
+ task->packet.pos - task->packet.buf, dst);
+ swim_complete_send(scheduler, task, size);
+}
+
+/**
+ * Begin packet receipt. Note, this function is a no-op, and exists
+ * just for consistency with begin/do/complete_send() functions.
+ */
+static inline void
+swim_begin_recv(struct swim_scheduler *scheduler, struct ev_loop *loop,
+ struct ev_io *io, int events)
{
assert((events & EV_READ) != 0);
+ (void) io;
+ (void) scheduler;
(void) events;
(void) loop;
- struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+}
+
+/** Receive a packet from the network. */
+static ssize_t
+swim_do_recv(struct swim_scheduler *scheduler, char *buf, int size)
+{
struct sockaddr_in src;
socklen_t len = sizeof(src);
- char buf[UDP_PACKET_SIZE];
- ssize_t size = swim_transport_recv(&scheduler->transport, buf,
- sizeof(buf),
- (struct sockaddr *) &src, &len);
- if (size <= 0) {
- if (size < 0)
- goto error;
- return;
- }
+ ssize_t rc = swim_transport_recv(&scheduler->transport, buf, size,
+ (struct sockaddr *) &src, &len);
+ if (rc <= 0)
+ return rc;
say_verbose("SWIM %d: received from %s", swim_scheduler_fd(scheduler),
swim_inaddr_str(&src));
+ return rc;
+}
+
+/**
+ * Finalize packet receipt, call the SWIM core callbacks, or
+ * forward the packet to a next node.
+ */
+static void
+swim_complete_recv(struct swim_scheduler *scheduler, const char *buf,
+ ssize_t size)
+{
+ if (size < 0)
+ goto error;
+ if (size == 0)
+ return;
struct swim_meta_def meta;
const char *pos = buf, *end = pos + size;
if (swim_meta_def_decode(&meta, &pos, end) < 0)
@@ -446,6 +515,16 @@ error:
diag_log();
}
+static void
+swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+ char buf[UDP_PACKET_SIZE];
+ swim_begin_recv(scheduler, loop, io, events);
+ ssize_t size = swim_do_recv(scheduler, buf, UDP_PACKET_SIZE);
+ swim_complete_recv(scheduler, buf, size);
+}
+
const char *
swim_inaddr_str(const struct sockaddr_in *addr)
{
--
2.20.1 (Apple Git-117)
More information about the Tarantool-patches
mailing list