From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id C3AA52E2D2 for ; Sun, 9 Jun 2019 16:44:54 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id DBRSz5SphnkK for ; Sun, 9 Jun 2019 16:44:54 -0400 (EDT) Received: from smtp39.i.mail.ru (smtp39.i.mail.ru [94.100.177.99]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 5C0122E26D for ; Sun, 9 Jun 2019 16:44:54 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v3 08/14] wal: enable asynchronous wal writes Date: Sun, 9 Jun 2019 23:44:37 +0300 Message-Id: <6eb76383a0e8cb04b52e7c4dd0374e18cf536e23.1560112747.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Allow sending a journal entry to the WAL without waiting for the write to finish. Two methods are introduced: * the async_write method submits an entry to be written and returns 0 if the entry was successfully scheduled; * the async_wait method waits until the write has finished and returns the result of the journal write. 
Prerequisites: #1254 --- src/box/box.cc | 30 ++++++++++++++++++++--- src/box/journal.c | 21 +++++++++++++++- src/box/journal.h | 30 +++++++++++++++++++++++ src/box/wal.c | 62 +++++++++++++++++++++++++++++++++++++++++------ 4 files changed, 130 insertions(+), 13 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index a88e762c0..d0616095b 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -308,19 +308,41 @@ struct recovery_journal { * min/max LSN of created LSM levels. */ static int64_t -recovery_journal_write(struct journal *base, - struct journal_entry *entry) +recovery_journal_async_write(struct journal *base, + struct journal_entry *entry) { struct recovery_journal *journal = (struct recovery_journal *) base; + entry->res = vclock_sum(journal->vclock); entry->done = true; fiber_cond_broadcast(&entry->done_cond); - return vclock_sum(journal->vclock); + return 0; +} + +static int64_t +recovery_journal_async_wait(struct journal *base, + struct journal_entry *entry) +{ + (void) base; + assert(entry->done); + return entry->res; +} + +static int64_t +recovery_journal_write(struct journal *base, + struct journal_entry *entry) +{ + if (recovery_journal_async_write(base, entry) == 0) + return recovery_journal_async_wait(base, entry); + return -1; } static inline void recovery_journal_create(struct recovery_journal *journal, struct vclock *v) { - journal_create(&journal->base, recovery_journal_write, NULL); + journal_create(&journal->base, recovery_journal_write, + recovery_journal_async_write, + recovery_journal_async_wait, + NULL); journal->vclock = v; } diff --git a/src/box/journal.c b/src/box/journal.c index 6406d6f01..b978e6752 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -37,16 +37,35 @@ * but txn_commit() must work. 
*/ static int64_t -dummy_journal_write(struct journal *journal, struct journal_entry *entry) +dummy_async_write(struct journal *journal, struct journal_entry *entry) { (void) journal; + entry->res = 0; entry->done = true; fiber_cond_broadcast(&entry->done_cond); return 0; } +static int64_t +dummy_async_wait(struct journal *journal, struct journal_entry *entry) +{ + (void) journal; + assert(entry->done); + return entry->res; +} + +static int64_t +dummy_journal_write(struct journal *journal, struct journal_entry *entry) +{ + if (dummy_async_write(journal, entry) == 0) + return dummy_async_wait(journal, entry); + return -1; +} + static struct journal dummy_journal = { dummy_journal_write, + dummy_async_write, + dummy_async_wait, NULL, }; diff --git a/src/box/journal.h b/src/box/journal.h index 618c68eb2..e7fe9154a 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -95,6 +95,10 @@ journal_entry_new(size_t n_rows, struct region *region); struct journal { int64_t (*write)(struct journal *journal, struct journal_entry *req); + int64_t (*async_write)(struct journal *journal, + struct journal_entry *req); + int64_t (*async_wait)(struct journal *journal, + struct journal_entry *req); void (*destroy)(struct journal *journal); }; @@ -116,6 +120,28 @@ journal_write(struct journal_entry *entry) return current_journal->write(current_journal, entry); } +/** + * Send a single entry to write. + * + * @return 0 if write was scheduled or -1 on error. + */ +static inline int64_t +journal_async_write(struct journal_entry *entry) +{ + return current_journal->async_write(current_journal, entry); +} + +/** + * Wait until entry processing finished. + * @return a log sequence number (vclock signature) of the entry + * or -1 on error. + */ +static inline int64_t +journal_async_wait(struct journal_entry *entry) +{ + return current_journal->async_wait(current_journal, entry); +} + /** * Change the current implementation of the journaling API. 
* Happens during life cycle of an instance: @@ -148,9 +174,13 @@ journal_set(struct journal *new_journal) static inline void journal_create(struct journal *journal, int64_t (*write)(struct journal *, struct journal_entry *), + int64_t (*async_write)(struct journal *, struct journal_entry *), + int64_t (*async_wait)(struct journal *, struct journal_entry *), void (*destroy)(struct journal *)) { journal->write = write; + journal->async_write = async_write; + journal->async_wait = async_wait; journal->destroy = destroy; } diff --git a/src/box/wal.c b/src/box/wal.c index 5951817d0..86d021896 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -63,9 +63,18 @@ int wal_dir_lock = -1; static int64_t wal_write(struct journal *, struct journal_entry *); +static int64_t +wal_async_write(struct journal *, struct journal_entry *); + +static int64_t +wal_async_wait(struct journal *, struct journal_entry *); + static int64_t wal_write_in_wal_mode_none(struct journal *, struct journal_entry *); +static int64_t +wal_async_write_in_wal_mode_none(struct journal *, struct journal_entry *); + /* * WAL writer - maintain a Write Ahead Log for every change * in the data state. @@ -351,7 +360,10 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, writer->wal_max_rows = wal_max_rows; writer->wal_max_size = wal_max_size; journal_create(&writer->base, wal_mode == WAL_NONE ? - wal_write_in_wal_mode_none : wal_write, NULL); + wal_write_in_wal_mode_none : wal_write, + wal_mode == WAL_NONE ? + wal_async_write_in_wal_mode_none: wal_async_write, + wal_async_wait, NULL); struct xlog_opts opts = xlog_opts_default; opts.sync_is_async = true; @@ -1118,10 +1130,10 @@ wal_writer_f(va_list ap) /** * WAL writer main entry point: queue a single request - * to be written to disk and wait until this task is completed. + * to be written to disk. 
*/ -int64_t -wal_write(struct journal *journal, struct journal_entry *entry) +static int64_t +wal_async_write(struct journal *journal, struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; @@ -1138,6 +1150,8 @@ wal_write(struct journal *journal, struct journal_entry *entry) say_error("Aborting transaction %llu during " "cascading rollback", vclock_sum(&writer->vclock)); + entry->done = true; + fiber_cond_broadcast(&entry->done_cond); return -1; } @@ -1152,6 +1166,8 @@ wal_write(struct journal *journal, struct journal_entry *entry) if (batch == NULL) { diag_set(OutOfMemory, sizeof(struct wal_msg), "region", "struct wal_msg"); + entry->done = true; + fiber_cond_broadcast(&entry->done_cond); return -1; } wal_msg_create(batch); @@ -1166,16 +1182,34 @@ wal_write(struct journal *journal, struct journal_entry *entry) batch->approx_len += entry->approx_len; writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX; cpipe_flush_input(&writer->wal_pipe); + return 0; +} +static int64_t +wal_async_wait(struct journal *journal, struct journal_entry *entry) +{ + (void) journal; while (!entry->done) fiber_cond_wait(&entry->done_cond); return entry->res; } -int64_t -wal_write_in_wal_mode_none(struct journal *journal, - struct journal_entry *entry) +/** + * WAL writer main entry point: queue a single request + * to be written to disk and wait until this task is completed. 
+ */ +static int64_t +wal_write(struct journal *journal, struct journal_entry *entry) +{ + if (wal_async_write(journal, entry) != 0) + return -1; + return wal_async_wait(journal, entry); +} + +static int64_t +wal_async_write_in_wal_mode_none(struct journal *journal, + struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; struct vclock vclock_diff; @@ -1184,7 +1218,19 @@ wal_write_in_wal_mode_none(struct journal *journal, entry->rows + entry->n_rows); vclock_merge(&writer->vclock, &vclock_diff); vclock_copy(&replicaset.vclock, &writer->vclock); - return vclock_sum(&writer->vclock); + entry->res = vclock_sum(&writer->vclock); + entry->done = true; + fiber_cond_broadcast(&entry->done_cond); + return 0; +} + +static int64_t +wal_write_in_wal_mode_none(struct journal *journal, + struct journal_entry *entry) +{ + if (wal_async_write_in_wal_mode_none(journal, entry) != 0) + return -1; + return wal_async_wait(journal, entry); } void -- 2.21.0