From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@freelists.org Cc: kostja@tarantool.org Subject: [tarantool-patches] [PATCH 1/2] swim: split send/recv into phases Date: Mon, 29 Apr 2019 21:13:43 +0300 [thread overview] Message-ID: <61a41781d019b3edcdec85519dd93f5ea9c3acd7.1556561487.git.v.shpilevoy@tarantool.org> (raw) In-Reply-To: <cover.1556561487.git.v.shpilevoy@tarantool.org> At this moment swim_scheduler_on_output() is a relatively simple function. It takes a task, builds its meta and flushes a result into the network. But soon SWIM will be able to encrypt messages. It means that, in addition to regular preprocessing like building meta headers, a new phase will appear - encryption. What is more - conditional encryption, because a user may want not to encrypt messages. The same applies to swim_scheduler_on_input() - if a SWIM instance uses encryption, it should decrypt incoming messages before forwarding them into the SWIM code logic. The chosen strategy - let's reuse on_output/on_input virtuality and create two versions of on_input/on_output functions: swim_on_plain_input() | swim_on_encrypted_input() swim_on_plain_output() | swim_on_encrypted_output() One of these pairs will be chosen depending on whether the instance uses encryption. To make these 4 functions as simple and short as possible this commit creates two sets of functions, doing all the logic except encryption: swim_begin_send() swim_do_send() swim_complete_send() swim_begin_recv() swim_do_recv() swim_complete_recv() These functions will be used by on_input/on_output functions with different arguments. 
Part of #3234 --- src/lib/swim/swim_io.c | 129 +++++++++++++++++++++++++++++++++-------- 1 file changed, 104 insertions(+), 25 deletions(-) diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index ee05123c9..2c1448c1f 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -335,66 +335,135 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler) swim_scheduler_stop_input(scheduler); } -static void -swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) +/** + * Begin packet transmission. Prepare a next task in the queue to + * send its packet: build a meta header, pop the task from the + * queue. + * @param scheduler Scheduler to pop a task from. + * @param loop Event loop passed by libev. + * @param io Descriptor to send to. + * @param events Mask of happened events passed by libev. + * @param[out] dst Destination address to send the packet to. Can + * be different from task.dst, for example, if task.proxy + * is specified. + * + * @retval NULL The queue is empty. Input has been stopped. + * @retval not NULL A task ready to be sent. + */ +static struct swim_task * +swim_begin_send(struct swim_scheduler *scheduler, struct ev_loop *loop, + struct ev_io *io, int events, const struct sockaddr_in **dst) { assert((events & EV_WRITE) != 0); (void) events; - struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; if (rlist_empty(&scheduler->queue_output)) { /* * Possible, if a member pushed a task and then * was deleted together with it. */ swim_ev_io_stop(loop, io); - return; + return NULL; } struct swim_task *task = rlist_shift_entry(&scheduler->queue_output, struct swim_task, in_queue_output); const struct sockaddr_in *src = &scheduler->transport.addr; - const struct sockaddr_in *dst = &task->dst; - const char *dst_str = swim_inaddr_str(dst); + *dst = &task->dst; + const char *dst_str = swim_inaddr_str(*dst); if (! 
swim_inaddr_is_empty(&task->proxy)) { - dst = &task->proxy; + *dst = &task->proxy; dst_str = tt_sprintf("%s via %s", dst_str, - swim_inaddr_str(dst)); + swim_inaddr_str(*dst)); swim_packet_build_meta(&task->packet, src, src, &task->dst); } else { swim_packet_build_meta(&task->packet, src, NULL, NULL); } say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler), task->desc, dst_str); - int rc = swim_transport_send(&scheduler->transport, task->packet.buf, - task->packet.pos - task->packet.buf, - (const struct sockaddr *) dst, - sizeof(*dst)); - if (rc < 0) + return task; +} + +/** Send a packet into the network. */ +static inline ssize_t +swim_do_send(struct swim_scheduler *scheduler, const char *buf, int size, + const struct sockaddr_in *dst) +{ + return swim_transport_send(&scheduler->transport, buf, size, + (const struct sockaddr *) dst, sizeof(*dst)); +} + +/** + * Finalize packet transmission, call the completion callback. + * @param scheduler Scheduler owning @a task. + * @param task Sent (or failed to be sent) task. + * @param size Result of send(). + */ +static inline void +swim_complete_send(struct swim_scheduler *scheduler, struct swim_task *task, + ssize_t size) +{ + if (size < 0) diag_log(); if (task->complete != NULL) - task->complete(task, scheduler, rc); + task->complete(task, scheduler, size); } static void -swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events) +swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) +{ + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; + const struct sockaddr_in *dst; + struct swim_task *task = swim_begin_send(scheduler, loop, io, events, + &dst); + if (task == NULL) + return; + ssize_t size = swim_do_send(scheduler, task->packet.buf, + task->packet.pos - task->packet.buf, dst); + swim_complete_send(scheduler, task, size); +} + +/** + * Begin packet receipt. 
Note, this function is no-op, and exists + * just for consistency with begin/do/complete_send() functions. + */ +static inline void +swim_begin_recv(struct swim_scheduler *scheduler, struct ev_loop *loop, + struct ev_io *io, int events) { assert((events & EV_READ) != 0); + (void) io; + (void) scheduler; (void) events; (void) loop; - struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; +} + +/** Receive a packet from the network. */ +static ssize_t +swim_do_recv(struct swim_scheduler *scheduler, char *buf, int size) +{ struct sockaddr_in src; socklen_t len = sizeof(src); - char buf[UDP_PACKET_SIZE]; - ssize_t size = swim_transport_recv(&scheduler->transport, buf, - sizeof(buf), - (struct sockaddr *) &src, &len); - if (size <= 0) { - if (size < 0) - goto error; - return; - } + ssize_t rc = swim_transport_recv(&scheduler->transport, buf, size, + (struct sockaddr *) &src, &len); + if (rc <= 0) + return rc; say_verbose("SWIM %d: received from %s", swim_scheduler_fd(scheduler), swim_inaddr_str(&src)); + return rc; +} + +/** + * Finalize packet receipt, call the SWIM core callbacks, or + * forward the packet to a next node. + */ +static void +swim_complete_recv(struct swim_scheduler *scheduler, const char *buf, + ssize_t size) +{ + if (size < 0) + goto error; + if (size == 0) + return; struct swim_meta_def meta; const char *pos = buf, *end = pos + size; if (swim_meta_def_decode(&meta, &pos, end) < 0) @@ -443,6 +512,16 @@ error: diag_log(); } +static void +swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events) +{ + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data; + char buf[UDP_PACKET_SIZE]; + swim_begin_recv(scheduler, loop, io, events); + ssize_t size = swim_do_recv(scheduler, buf, UDP_PACKET_SIZE); + swim_complete_recv(scheduler, buf, size); +} + const char * swim_inaddr_str(const struct sockaddr_in *addr) { -- 2.20.1 (Apple Git-117)
next prev parent reply other threads:[~2019-04-29 18:13 UTC|newest] Thread overview: 7+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-04-29 18:13 [tarantool-patches] [PATCH 0/2] swim crypto Vladislav Shpilevoy 2019-04-29 18:13 ` Vladislav Shpilevoy [this message] 2019-04-29 18:13 ` [tarantool-patches] [PATCH 2/2] swim: implement and expose transport-level encryption Vladislav Shpilevoy 2019-05-08 8:52 ` [tarantool-patches] " Vladislav Shpilevoy 2019-05-08 9:11 ` Konstantin Osipov 2019-05-02 15:43 ` [tarantool-patches] Re: [PATCH 0/2] swim crypto Vladislav Shpilevoy 2019-05-13 21:57 [tarantool-patches] " Vladislav Shpilevoy 2019-05-13 21:57 ` [tarantool-patches] [PATCH 1/2] swim: split send/recv into phases Vladislav Shpilevoy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=61a41781d019b3edcdec85519dd93f5ea9c3acd7.1556561487.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=kostja@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH 1/2] swim: split send/recv into phases' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox