From: Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH 1/2] swim: pool IO tasks
Date: Sat, 6 Jul 2019 00:40:08 +0200
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org

Before the patch, each SWIM member had two preallocated task
objects, 3KB in total. That was a waste of memory, because the
network load per member in SWIM is ~2 messages per round step
regardless of cluster size.

This patch moves the tasks to a pool, where they can be reused,
even by different SWIM instances running on the same node.
---
 src/lib/swim/swim.c    | 78 +++++++++++++++++++++++++++----------
 src/lib/swim/swim_io.c | 88 +++++++++++++++++++++++++++++++++++++++---
 src/lib/swim/swim_io.h | 26 ++++++++++---
 3 files changed, 161 insertions(+), 31 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 7d07a353d..9a7b4ce85 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -414,10 +414,14 @@ struct swim_member {
 	 * message to it.
 	 */
 	struct heap_node in_wait_ack_heap;
-	/** Ready at hand regular ACK task. */
-	struct swim_task ack_task;
-	/** Ready at hand regular PING task. */
-	struct swim_task ping_task;
+	/**
+	 * Last sent failure detection tasks. Kept so as
+	 * 1) not to send them twice;
+	 * 2) to be able to cancel them when the member is
+	 *    deleted.
+	 */
+	struct swim_task *ack_task;
+	struct swim_task *ping_task;
 };
 
 #define mh_name _swim_table
@@ -759,16 +763,31 @@ static void
 swim_ping_task_complete(struct swim_task *task,
 			struct swim_scheduler *scheduler, int rc)
 {
+	struct swim *swim = swim_by_scheduler(scheduler);
+	struct swim_member *m = task->member;
+	assert(m != NULL);
+	assert(m->ping_task == task);
 	/*
 	 * If ping send has failed, it makes no sense to wait for
 	 * an ACK.
 	 */
-	if (rc < 0)
-		return;
-	struct swim *swim = swim_by_scheduler(scheduler);
-	struct swim_member *m = container_of(task, struct swim_member,
-					     ping_task);
-	swim_wait_ack(swim, m, false);
+	if (rc >= 0)
+		swim_wait_ack(swim, m, false);
+	m->ping_task = NULL;
+	swim_task_delete(task);
+}
+
+/** When ACK is completed, allow to send a new ACK. */
+static void
+swim_ack_task_complete(struct swim_task *task, struct swim_scheduler *scheduler,
+		       int rc)
+{
+	(void) scheduler;
+	(void) rc;
+	assert(task->member != NULL);
+	assert(task->member->ack_task == task);
+	task->member->ack_task = NULL;
+	swim_task_delete(task);
 }
 
 void
@@ -802,8 +821,14 @@ swim_member_delete(struct swim_member *member)
 
 	/* Failure detection component. */
 	assert(heap_node_is_stray(&member->in_wait_ack_heap));
-	swim_task_destroy(&member->ack_task);
-	swim_task_destroy(&member->ping_task);
+	if (member->ack_task != NULL) {
+		swim_task_delete(member->ack_task);
+		member->ack_task = NULL;
+	}
+	if (member->ping_task != NULL) {
+		swim_task_delete(member->ping_task);
+		member->ping_task = NULL;
+	}
 
 	/* Dissemination component. */
 	assert(rlist_empty(&member->in_dissemination_queue));
@@ -833,9 +858,6 @@ swim_member_new(const struct sockaddr_in *addr, const struct tt_uuid *uuid,
 	/* Failure detection component. */
 	member->incarnation = *incarnation;
 	heap_node_create(&member->in_wait_ack_heap);
-	swim_task_create(&member->ack_task, NULL, NULL, "ack");
-	swim_task_create(&member->ping_task, swim_ping_task_complete, NULL,
-			 "ping");
 
 	/* Dissemination component. */
 	rlist_create(&member->in_dissemination_queue);
@@ -1340,7 +1362,7 @@ swim_iping_task_complete(struct swim_task *task,
 	if (m != NULL && m->status != MEMBER_ALIVE)
 		swim_wait_ack(swim, m, true);
 finish:
-	swim_task_delete_cb(task, scheduler, rc);
+	swim_task_delete(task);
 }
 
 /**
@@ -1439,7 +1461,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 		default:
 			unreachable();
 		}
-		swim_send_ping(swim, &m->ping_task, &m->addr);
+		m->ping_task = swim_task_new(swim_ping_task_complete, NULL,
+					     "ping");
+		if (m->ping_task != NULL) {
+			m->ping_task->member = m;
+			swim_send_ping(swim, m->ping_task, &m->addr);
+		} else {
+			diag_log();
+		}
 	}
 }
 
@@ -1683,8 +1712,16 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 			if (swim_send_indirect_ack(swim, &member->addr,
 						   proxy) != 0)
 				diag_log();
-		} else if (! swim_task_is_scheduled(&member->ack_task)) {
-			swim_send_ack(swim, &member->ack_task, &member->addr);
+		} else if (member->ack_task == NULL) {
+			member->ack_task = swim_task_new(swim_ack_task_complete,
+							 NULL, "ack");
+			if (member->ack_task != NULL) {
+				member->ack_task->member = member;
+				swim_send_ack(swim, member->ack_task,
+					      &member->addr);
+			} else {
+				diag_log();
+			}
 		}
 		break;
 	case SWIM_FD_MSG_ACK:
@@ -2297,8 +2334,7 @@ swim_quit(struct swim *swim)
 	swim_new_round(swim);
 	struct swim_task *task = &swim->round_step_task;
 	swim_task_destroy(task);
-	swim_task_create(task, swim_quit_step_complete, swim_task_delete_cb,
-			 "quit");
+	swim_task_create(task, swim_quit_step_complete, NULL, "quit");
 	char *header = swim_packet_alloc(&task->packet, 1);
 	int rc = swim_encode_src_uuid(swim, &task->packet) +
 		 swim_encode_quit(swim, &task->packet);
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index e7ff321d4..d83f52ae9 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -36,6 +36,59 @@
 #include
 #include
 
+enum {
+	/**
+	 * A rough estimation of how many tasks a SWIM instance
+	 * would need simultaneously. 1 for an ACK, 2 for indirect
+	 * ping, 1 for direct ping. Total is 4 for normal
+	 * operation. Others are 1) to get a beautiful number,
+	 * 2) in case random() is not perfect and this instance
+	 * interacts with 2 or more other instances during one
+	 * round.
+	 */
+	TASKS_PER_SCHEDULER = 16,
+};
+
+/**
+ * All the SWIM instances and their members use the same objects
+ * to send data - tasks. Each task is ~1.5KB, and on one hand it
+ * would be a waste of memory to keep preallocated tasks for each
+ * member. On the other hand it would be too slow to allocate
+ * and delete ~1.5KB on each interaction, ~3KB on each round step.
+ * Here is a pool of free tasks shared among all SWIM instances
+ * to avoid allocations, but do not keep a separate task for each
+ * member.
+ */
+static struct stailq swim_task_pool;
+/** Number of pooled tasks. */
+static int swim_task_pool_size = 0;
+/**
+ * Number of currently active schedulers. Used to limit max size
+ * of the pool.
+ */
+static int scheduler_count = 0;
+
+/** First scheduler should create the pool. */
+static inline void
+swim_task_pool_create(void)
+{
+	assert(scheduler_count == 1);
+	assert(swim_task_pool_size == 0);
+	stailq_create(&swim_task_pool);
+}
+
+/** Last scheduler destroys the pool. */
+static inline void
+swim_task_pool_destroy(void)
+{
+	assert(scheduler_count == 0);
+	while (! stailq_empty(&swim_task_pool)) {
+		free(stailq_shift_entry(&swim_task_pool, struct swim_task,
+					in_pool));
+	}
+	swim_task_pool_size = 0;
+}
+
 /**
  * Allocate memory for meta. The same as mere alloc, but moves
  * body pointer.
@@ -124,22 +177,42 @@ swim_task_create(struct swim_task *task, swim_task_f complete,
 struct swim_task *
 swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc)
 {
-	struct swim_task *task = (struct swim_task *) malloc(sizeof(*task));
-	if (task == NULL) {
-		diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
-		return NULL;
+	struct swim_task *task;
+	if (swim_task_pool_size > 0) {
+		assert(! stailq_empty(&swim_task_pool));
+		--swim_task_pool_size;
+		task = stailq_shift_entry(&swim_task_pool, struct swim_task,
+					  in_pool);
+	} else {
+		task = (struct swim_task *) malloc(sizeof(*task));
+		if (task == NULL) {
+			diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+			return NULL;
+		}
 	}
 	swim_task_create(task, complete, cancel, desc);
 	return task;
 }
 
+void
+swim_task_delete(struct swim_task *task)
+{
+	swim_task_destroy(task);
+	if (swim_task_pool_size < TASKS_PER_SCHEDULER * scheduler_count) {
+		stailq_add_entry(&swim_task_pool, task, in_pool);
+		++swim_task_pool_size;
+	} else {
+		free(task);
+	}
+}
+
 void
 swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
 		    int rc)
 {
 	(void) rc;
 	(void) scheduler;
-	free(task);
+	swim_task_delete(task);
 }
 
 /** Put the task into the queue of output tasks. */
@@ -283,6 +356,8 @@ swim_scheduler_create(struct swim_scheduler *scheduler,
 			      CRYPTO_MODE_ECB, NULL, 0);
 	assert(rc == 0);
 	(void) rc;
+	if (++scheduler_count == 1)
+		swim_task_pool_create();
 }
 
 int
@@ -326,6 +401,9 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
 	swim_transport_destroy(&scheduler->transport);
 	swim_ev_io_stop(swim_loop(), &scheduler->output);
 	swim_scheduler_stop_input(scheduler);
+	assert(scheduler_count > 0);
+	if (--scheduler_count == 0)
+		swim_task_pool_destroy();
 }
 
 /**
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 6bf42cb05..3e631935d 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -46,6 +46,7 @@
  */
 
 struct swim_task;
+struct swim_member;
 struct swim_scheduler;
 
 enum {
@@ -247,11 +248,22 @@ struct swim_task {
 	 * A short description of the packet content. For logging.
 	 */
 	const char *desc;
-	/**
-	 * Sender's UUID used by ping tasks to schedule deadline
-	 * for an ACK.
-	 */
-	struct tt_uuid uuid;
+	union {
+		/**
+		 * Receiver's UUID used by ping tasks to schedule
+		 * deadline for an ACK.
+		 */
+		struct tt_uuid uuid;
+		/**
+		 * Alternative to UUID - direct pointer to the
+		 * receiver member. It works when members and
+		 * tasks of a certain type are isomorphic. It is
+		 * faster than lookup by UUID.
+		 */
+		struct swim_member *member;
+	};
+	/** Link in the task pool. */
+	struct stailq_entry in_pool;
 };
 
 /** Check if @a task is already scheduled. */
@@ -284,6 +296,10 @@ swim_task_create(struct swim_task *task, swim_task_f complete,
 struct swim_task *
 swim_task_new(swim_task_f complete, swim_task_f cancel, const char *desc);
 
+/** Destroy a task, free its memory. */
+void
+swim_task_delete(struct swim_task *task);
+
/** Callback to delete a task after its completion. */
 void
 swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
-- 
2.20.1 (Apple Git-117)
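
[Editor's illustration, not part of the patch] Below is a minimal
standalone sketch of the bounded free-list pattern the patch applies to
swim_task objects. All names in it (obj, obj_new, obj_delete, POOL_MAX)
are hypothetical; the real code pools struct swim_task on a stailq and
bounds the pool by TASKS_PER_SCHEDULER * scheduler_count.

	#include <stdlib.h>

	struct obj {
		/* Intrusive link, used only while the object sits in the pool. */
		struct obj *next_free;
		char payload[1500];
	};

	enum { POOL_MAX = 16 };

	static struct obj *pool_head = NULL;
	static int pool_size = 0;

	/* Take a cached object if one is available, else malloc() a new one. */
	static struct obj *
	obj_new(void)
	{
		if (pool_head != NULL) {
			struct obj *o = pool_head;
			pool_head = o->next_free;
			--pool_size;
			return o;
		}
		return malloc(sizeof(struct obj));
	}

	/* Cache the object for reuse while the pool is not full, else free it. */
	static void
	obj_delete(struct obj *o)
	{
		if (pool_size < POOL_MAX) {
			o->next_free = pool_head;
			pool_head = o;
			++pool_size;
		} else {
			free(o);
		}
	}

The patch additionally scales the cap with the number of live
schedulers and destroys the whole pool when the last scheduler is
destroyed, so no memory stays cached once SWIM is shut down.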