From mboxrd@z Thu Jan 1 00:00:00 1970
From: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber
Date: Sun, 9 Jun 2019 23:44:38 +0300
Message-Id: <64f63cc1cea7a3c632561f36eb043cb386c4e472.1560112747.git.georgy@tarantool.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Reply-To: tarantool-patches@freelists.org
List-Id: tarantool-patches
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

In order to implement asynchronous transactions we need to run a
transaction completion handler, but tx_prio is not able to yield.
Move completion scheduling to a dedicated fiber.

Prerequisites: #1254
---
 src/box/wal.c | 48 +++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 41 insertions(+), 7 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 86d021896..e868a8e71 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -100,6 +100,12 @@ struct wal_writer
 	struct cpipe wal_pipe;
 	/** A memory pool for messages. */
 	struct mempool msg_pool;
+	/** A queue to schedule journal entry completions. */
+	struct stailq schedule_queue;
+	/** True if writer is in rollback state. */
+	bool is_in_rollback;
+	/** A condition to signal about new schedule queue entries. */
+	struct fiber_cond schedule_cond;
 	/* ----------------- wal ------------------- */
 	/** A setting from instance configuration - rows_per_wal */
 	int64_t wal_max_rows;
@@ -254,17 +260,36 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 	return xlog_tx_commit(l);
 }
 
+/*
+ * Tx schedule fiber function.
+ */
+static int
+tx_schedule_f(va_list ap)
+{
+	struct wal_writer *writer = va_arg(ap, struct wal_writer *);
+	while (!fiber_is_cancelled()) {
+		while (!stailq_empty(&writer->schedule_queue)) {
+			struct journal_entry *req =
+				stailq_shift_entry(&writer->schedule_queue,
+						   struct journal_entry, fifo);
+			req->done = true;
+			fiber_cond_broadcast(&req->done_cond);
+		}
+		writer->is_in_rollback = false;
+		fiber_cond_wait(&writer->schedule_cond);
+	}
+	return 0;
+}
+
 /**
- * Signal done condition.
+ * Attach requests to a scheduling queue.
  */
 static void
 tx_schedule_queue(struct stailq *queue)
 {
-	struct journal_entry *req;
-	stailq_foreach_entry(req, queue, fifo) {
-		req->done = true;
-		fiber_cond_broadcast(&req->done_cond);
-	}
+	struct wal_writer *writer = &wal_writer_singleton;
+	stailq_concat(&writer->schedule_queue, queue);
+	fiber_cond_signal(&writer->schedule_cond);
 }
 
 /**
@@ -309,6 +334,8 @@ tx_schedule_rollback(struct cmsg *msg)
 	/* Must not yield. */
 	tx_schedule_queue(&writer->rollback);
 	stailq_create(&writer->rollback);
+	writer->is_in_rollback = true;
+
 	if (msg != &writer->in_rollback)
 		mempool_free(&writer->msg_pool,
 			     container_of(msg, struct wal_msg, base));
@@ -359,6 +386,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
 	writer->wal_max_size = wal_max_size;
+	writer->is_in_rollback = false;
 	journal_create(&writer->base, wal_mode == WAL_NONE ?
 		       wal_write_in_wal_mode_none : wal_write,
 		       wal_mode == WAL_NONE ?
@@ -374,6 +402,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 
 	stailq_create(&writer->rollback);
 	cmsg_init(&writer->in_rollback, NULL);
+	stailq_create(&writer->schedule_queue);
+	fiber_cond_create(&writer->schedule_cond);
+	struct fiber *schedule_fiber = fiber_new("tx_schedule", tx_schedule_f);
+	if (schedule_fiber == NULL)
+		panic("Could not create schedule fiber");
+	fiber_start(schedule_fiber, writer);
 
 	writer->checkpoint_wal_size = 0;
 	writer->checkpoint_threshold = INT64_MAX;
@@ -1139,7 +1173,7 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
 
 	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
 
-	if (! stailq_empty(&writer->rollback)) {
+	if (writer->is_in_rollback) {
 		/*
 		 * The writer rollback queue is not empty,
 		 * roll back this transaction immediately.
-- 
2.21.0
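
For readers following the series, the hand-off this patch introduces can be sketched in plain C without the fiber runtime. This is only an illustration under stated assumptions, not code from the Tarantool tree: `struct entry`, `struct queue`, `struct writer` and the `queue_*` / `schedule_*` helpers are hypothetical stand-ins for `struct journal_entry`, `struct stailq`, `struct wal_writer` and the patched functions. The fiber and its condition variable are left out, so `schedule_drain()` models a single wake-up of `tx_schedule_f`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for struct journal_entry: only the fields used here. */
struct entry {
	struct entry *next;
	bool done;
};

/* Minimal FIFO with O(1) append and concat, standing in for struct stailq. */
struct queue {
	struct entry *first;
	struct entry **last; /* points at the terminating NULL link */
};

static void
queue_create(struct queue *q)
{
	q->first = NULL;
	q->last = &q->first;
}

static bool
queue_empty(const struct queue *q)
{
	return q->first == NULL;
}

static void
queue_add_tail(struct queue *q, struct entry *e)
{
	e->next = NULL;
	*q->last = e;
	q->last = &e->next;
}

/* Move every entry of src to the tail of dst; src is left empty. */
static void
queue_concat(struct queue *dst, struct queue *src)
{
	if (queue_empty(src))
		return;
	*dst->last = src->first;
	dst->last = src->last;
	queue_create(src);
}

/* Remove and return the head entry; the queue must not be empty. */
static struct entry *
queue_shift(struct queue *q)
{
	struct entry *e = q->first;
	assert(e != NULL);
	q->first = e->next;
	if (q->first == NULL)
		q->last = &q->first;
	return e;
}

/* Simplified writer state mirroring the queue and the flag the patch adds. */
struct writer {
	struct queue schedule_queue;
	bool is_in_rollback;
};

static void
writer_create(struct writer *w)
{
	queue_create(&w->schedule_queue);
	w->is_in_rollback = false;
}

/* Producer side: hand entries over without completing them. */
static void
schedule_entries(struct writer *w, struct queue *q)
{
	queue_concat(&w->schedule_queue, q);
}

/* Rollback path: hand entries over and raise the flag. */
static void
schedule_rollback(struct writer *w, struct queue *rollback)
{
	schedule_entries(w, rollback);
	w->is_in_rollback = true;
}

/* One wake-up of the scheduling loop; returns the number of completed entries. */
static int
schedule_drain(struct writer *w)
{
	int completed = 0;
	while (!queue_empty(&w->schedule_queue)) {
		struct entry *e = queue_shift(&w->schedule_queue);
		e->done = true;
		completed++;
	}
	w->is_in_rollback = false;
	return completed;
}
```

In this model the rollback flag stays raised from `schedule_rollback()` until the next `schedule_drain()` finishes, even though the rollback queue itself is emptied immediately. That is why the patch makes `wal_async_write` check `is_in_rollback` rather than `stailq_empty(&writer->rollback)`.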