From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id D854D2BC72 for ; Mon, 29 Apr 2019 14:13:49 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id lV8JaEtU6ZlZ for ; Mon, 29 Apr 2019 14:13:49 -0400 (EDT) Received: from smtp54.i.mail.ru (smtp54.i.mail.ru [217.69.128.34]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 949392DB15 for ; Mon, 29 Apr 2019 14:13:49 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH 1/2] swim: split send/recv into phases Date: Mon, 29 Apr 2019 21:13:43 +0300 Message-Id: <61a41781d019b3edcdec85519dd93f5ea9c3acd7.1556561487.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org At this moment swim_scheduler_on_output() is a relatively simple function. It takes a task, builds its meta and flushes a result into the network. But soon SWIM will be able to encrypt messages. It means, that in addition to regular preprocessing like building meta headers a new phase will appear - encryption. What is more - conditional encryption, because a user may want to do not encrypt messages. All the same is about swim_scheduler_on_input() - if a SWIM instance uses encryption, it should decrypt incoming messages before forwarding them into the SWIM code logic. The chosen strategy - lets reuse on_output/on_input virtuality and create two version of on_input/on_output functions: swim_on_plain_input() | swim_on_encrypted_input() swim_on_plain_output() | swim_on_encrypted_output() One of these pairs will be chosen depending on if the instance uses encryption. To make these 4 functions as simple and short and possible this commit creates two sets of functions, doing all the logic except encryption: swim_begin_send() swim_do_send() swim_complete_send() swim_begin_recv() swim_do_recv() swim_complete_recv() These functions will be used by on_input/on_output functions with different arguments. Part of #3234 --- src/lib/swim/swim_io.c | 129 +++++++++++++++++++++++++++++++++-------- 1 file changed, 104 insertions(+), 25 deletions(-) diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index ee05123c9..2c1448c1f 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -335,66 +335,135 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler) swim_scheduler_stop_input(scheduler); } -static void -swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) +/** + * Begin packet transmission. Prepare a next task in the queue to + * send its packet: build a meta header, pop the task from the + * queue. + * @param scheduler Scheduler to pop a task from. + * @param loop Event loop passed by libev. + * @param io Descriptor to send to. + * @param events Mask of happened events passed by libev. + * @param[out] dst Destination address to send the packet to. Can + * be different from task.dst, for example, if task.proxy + * is specified. + * + * @retval NULL The queue is empty. Input has been stopped. + * @retval not NULL A task ready to be sent. + */ +static struct swim_task * +swim_begin_send(struct swim_scheduler *scheduler, struct ev_loop *loop, + struct ev_io *io, int events, const struct sockaddr_in **dst) { assert((events & EV_WRITE) != 0); (void) events; - struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; if (rlist_empty(&scheduler->queue_output)) { /* * Possible, if a member pushed a task and then * was deleted together with it. */ swim_ev_io_stop(loop, io); - return; + return NULL; } struct swim_task *task = rlist_shift_entry(&scheduler->queue_output, struct swim_task, in_queue_output); const struct sockaddr_in *src = &scheduler->transport.addr; - const struct sockaddr_in *dst = &task->dst; - const char *dst_str = swim_inaddr_str(dst); + *dst = &task->dst; + const char *dst_str = swim_inaddr_str(*dst); if (! swim_inaddr_is_empty(&task->proxy)) { - dst = &task->proxy; + *dst = &task->proxy; dst_str = tt_sprintf("%s via %s", dst_str, - swim_inaddr_str(dst)); + swim_inaddr_str(*dst)); swim_packet_build_meta(&task->packet, src, src, &task->dst); } else { swim_packet_build_meta(&task->packet, src, NULL, NULL); } say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler), task->desc, dst_str); - int rc = swim_transport_send(&scheduler->transport, task->packet.buf, - task->packet.pos - task->packet.buf, - (const struct sockaddr *) dst, - sizeof(*dst)); - if (rc < 0) + return task; +} + +/** Send a packet into the network. */ +static inline ssize_t +swim_do_send(struct swim_scheduler *scheduler, const char *buf, int size, + const struct sockaddr_in *dst) +{ + return swim_transport_send(&scheduler->transport, buf, size, + (const struct sockaddr *) dst, sizeof(*dst)); +} + +/** + * Finalize packet transmission, call the completion callback. + * @param scheduler Scheduler owning @a task. + * @param task Sent (or failed to be sent) task. + * @param size Result of send(). + */ +static inline void +swim_complete_send(struct swim_scheduler *scheduler, struct swim_task *task, + ssize_t size) +{ + if (size < 0) diag_log(); if (task->complete != NULL) - task->complete(task, scheduler, rc); + task->complete(task, scheduler, size); } static void -swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events) +swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) +{ + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; + const struct sockaddr_in *dst; + struct swim_task *task = swim_begin_send(scheduler, loop, io, events, + &dst); + if (task == NULL) + return; + ssize_t size = swim_do_send(scheduler, task->packet.buf, + task->packet.pos - task->packet.buf, dst); + swim_complete_send(scheduler, task, size); +} + +/** + * Begin packet receipt. Note, this function is no-op, and exists + * just for consistency with begin/do/complete_send() functions. + */ +static inline void +swim_begin_recv(struct swim_scheduler *scheduler, struct ev_loop *loop, + struct ev_io *io, int events) { assert((events & EV_READ) != 0); + (void) io; + (void) scheduler; (void) events; (void) loop; - struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; +} + +/** Receive a packet from the network. */ +static ssize_t +swim_do_recv(struct swim_scheduler *scheduler, char *buf, int size) +{ struct sockaddr_in src; socklen_t len = sizeof(src); - char buf[UDP_PACKET_SIZE]; - ssize_t size = swim_transport_recv(&scheduler->transport, buf, - sizeof(buf), - (struct sockaddr *) &src, &len); - if (size <= 0) { - if (size < 0) - goto error; - return; - } + ssize_t rc = swim_transport_recv(&scheduler->transport, buf, size, + (struct sockaddr *) &src, &len); + if (rc <= 0) + return rc; say_verbose("SWIM %d: received from %s", swim_scheduler_fd(scheduler), swim_inaddr_str(&src)); + return rc; +} + +/** + * Finalize packet receipt, call the SWIM core callbacks, or + * forward the packet to a next node. + */ +static void +swim_complete_recv(struct swim_scheduler *scheduler, const char *buf, + ssize_t size) +{ + if (size < 0) + goto error; + if (size == 0) + return; struct swim_meta_def meta; const char *pos = buf, *end = pos + size; if (swim_meta_def_decode(&meta, &pos, end) < 0) @@ -443,6 +512,16 @@ error: diag_log(); } +static void +swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events) +{ + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; + char buf[UDP_PACKET_SIZE]; + swim_begin_recv(scheduler, loop, io, events); + ssize_t size = swim_do_recv(scheduler, buf, UDP_PACKET_SIZE); + swim_complete_recv(scheduler, buf, size); +} + const char * swim_inaddr_str(const struct sockaddr_in *addr) { -- 2.20.1 (Apple Git-117)