From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v3 5/6] [RAW] swim: send one UDP packet per EV_WRITE event Date: Sat, 29 Dec 2018 13:14:14 +0300 Message-Id: <606bb0e206b7f6a4a45a92e9d200e35d58e815f1.1546077015.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com, kostja@tarantool.org List-ID: Since the first commit of #3234, where the anti-entropy component was introduced, a single SWIM message could be split into multiple UDP packets. But so far these packets were being sent in a mere 'for' loop on a single EV_WRITE event. That is not a proper way of using an event loop, but it was the simplest one, because it does not require any externally stored positions in packet lists. The previous commit introduced such a global list of UDP packets to send, and now it is much simpler to send each packet on a separate EV_WRITE event. This commit does that. Part of #3234 --- src/lib/swim/swim_io.c | 19 +++++++++++-------- src/lib/swim/swim_io.h | 1 + 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index 00a16a2bb..dadf14db2 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -60,6 +60,7 @@ swim_task_schedule(struct swim_task *task, swim_transport_send_f send, assert(! 
swim_task_is_active(task)); task->send = send; task->dst = *dst; + task->pos = swim_msg_first_packet(&task->msg); rlist_add_tail_entry(&task->scheduler->queue_output, task, in_queue_output); ev_io_start(loop(), &task->scheduler->output); @@ -133,19 +134,21 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) return; } struct swim_task *task = - rlist_shift_entry(&scheduler->queue_output, struct swim_task, + rlist_first_entry(&scheduler->queue_output, struct swim_task, in_queue_output); say_verbose("SWIM: send to %s", sio_strfaddr((struct sockaddr *) &task->dst, sizeof(task->dst))); - for (struct swim_packet *packet = swim_msg_first_packet(&task->msg); - packet != NULL; packet = swim_packet_next(packet)) { - if (task->send(io->fd, packet->body, packet->pos - packet->body, - (struct sockaddr *) &task->dst, - sizeof(task->dst)) == -1) - diag_log(); + struct swim_packet *packet = task->pos; + if (task->send(io->fd, packet->body, packet->pos - packet->body, + (struct sockaddr *) &task->dst, + sizeof(task->dst)) == -1) + diag_log(); + task->pos = swim_packet_next(packet); + if (task->pos == NULL) { + task->complete(task); + rlist_del_entry(task, in_queue_output); } - task->complete(task); } static void diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index f08bd1ef3..605542c4e 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -256,6 +256,7 @@ struct swim_task { swim_task_f complete; /** Message to send. */ struct swim_msg msg; + struct swim_packet *pos; /** Destination address. */ struct sockaddr_in dst; /** Place in a queue of tasks. */ -- 2.17.2 (Apple Git-113)