From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from localhost (localhost [127.0.0.1])
	by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 1C81C2F2D8
	for ; Thu, 23 May 2019 04:21:37 -0400 (EDT)
Received: from turing.freelists.org ([127.0.0.1])
	by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024)
	with ESMTP id mXiqQel-8Jvc for ; Thu, 23 May 2019 04:21:36 -0400 (EDT)
Received: from smtp37.i.mail.ru (smtp37.i.mail.ru [94.100.177.97])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 7406C2F293
	for ; Thu, 23 May 2019 04:19:46 -0400 (EDT)
From: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH v2 6/8] Offload tx_prio processing to a fiber
Date: Thu, 23 May 2019 11:19:38 +0300
Message-Id: <7ea0167cb98532c99905f74be0c07a95ead02d04.1558598679.git.georgy@tarantool.org>
In-Reply-To:
References:
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Sender: tarantool-patches-bounce@freelists.org
Errors-to: tarantool-patches-bounce@freelists.org
Reply-To: tarantool-patches@freelists.org
List-Help:
List-Unsubscribe:
List-software: Ecartis version 1.0.0
List-Id: tarantool-patches
List-Subscribe:
List-Owner:
List-post:
List-Archive:
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

---
 src/box/box.cc | 38 ++++++++++++++++++++++++++++++++++++--
 1 file changed, 36 insertions(+), 2 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index e10b73277..b8ef4b9ed 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2061,13 +2061,41 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	}
 }
 
+/* A structure containing tx_prio endpoint fiber context. */
+static struct tx_prio_ctx {
+	/* The fiber processing the tx_prio endpoint. */
+	struct fiber *fiber;
+	/* True if there are more messages to process. */
+	bool has_message;
+	/* Condition to signal when a new message arrived. */
+	struct fiber_cond message_cond;
+} tx_prio_ctx;
+
 static void
 tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
 {
 	(void) loop;
 	(void) events;
-	struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
-	cbus_process(endpoint);
+	(void) watcher;
+	tx_prio_ctx.has_message = true;
+	fiber_cond_signal(&tx_prio_ctx.message_cond);
+}
+
+/*
+ * Tx prio endpoint fiber function.
+ */
+static int
+tx_prio_process_f(va_list ap)
+{
+	(void) ap;
+	while (!fiber_is_cancelled()) {
+		while (tx_prio_ctx.has_message) {
+			tx_prio_ctx.has_message = false;
+			cbus_process(&tx_prio_endpoint);
+		}
+		fiber_cond_wait(&tx_prio_ctx.message_cond);
+	}
+	return 0;
 }
 
 static void
@@ -2119,6 +2147,12 @@ box_cfg_xc(void)
 			  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
 			  FIBER_POOL_IDLE_TIMEOUT);
 	/* Add an extra endpoint for WAL wake up/rollback messages. */
+	memset(&tx_prio_ctx, 0, sizeof(struct tx_prio_ctx));
+	fiber_cond_create(&tx_prio_ctx.message_cond);
+	tx_prio_ctx.fiber = fiber_new("tx_prio", tx_prio_process_f);
+	if (tx_prio_ctx.fiber == NULL)
+		panic("Could not create tx_prio fiber");
+	fiber_start(tx_prio_ctx.fiber, NULL);
 	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
 
 	rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
-- 
2.21.0