From: Cyrill Gorcunov
Date: Thu, 19 Mar 2020 12:05:37 +0300
Message-Id: <20200319090537.5613-12-gorcunov@gmail.com>
In-Reply-To: <20200319090537.5613-1-gorcunov@gmail.com>
References: <20200319090537.5613-1-gorcunov@gmail.com>
Subject: [Tarantool-patches] [PATCH v13 11/11] box/journal: redesign journal operations
List-Id: Tarantool development patches
To: tml

Currently the journal provides only one method -- write -- which implies
a callback to be triggered upon write completion (in contrast with the
1.10 series, where all commits were processed synchronously). Let's make
the difference between sync and async writes more explicit: provide a
journal::write_async method which takes callback data as an argument,
while journal::write handles a transaction synchronously.

Redesign notes:

1) The callback for async writes is set once at journal creation; there
   is no need to carry the callback in every journal entry, which saves
   some memory.
2) Both journal_write and journal_write_async require the caller to pass
   a valid fiber->storage.txn; on error the txn must not be reset but
   preserved, so callers are able to run txn_rollback.
3) txn_commit and txn_commit_async call txn_rollback where appropriate.
4) There is no need to call journal_entry_complete on sync writes anymore.
5) wal_write_in_wal_mode_none is too long, so it is renamed to
   wal_write_none.

Signed-off-by: Cyrill Gorcunov
---
 src/box/box.cc    |  6 +--
 src/box/journal.c | 39 +++++++++++++++-----
 src/box/journal.h | 94 +++++++++++++++++++++++++++++++++--------------
 src/box/txn.c     | 53 ++++++++++++--------------
 src/box/vy_log.c  |  4 +-
 src/box/wal.c     | 74 ++++++++++++++++++++++++++++++++-----
 6 files changed, 191 insertions(+), 79 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index cf79affca..87ddbeb3a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -322,7 +322,6 @@ recovery_journal_write(struct journal *base,
 {
        struct recovery_journal *journal = (struct recovery_journal *) base;
        entry->res = vclock_sum(journal->vclock);
-       journal_entry_complete(entry);
        return 0;
 }
 
@@ -330,8 +329,9 @@ static void
 recovery_journal_create(struct vclock *v)
 {
        static struct recovery_journal journal;
-
-       journal_create(&journal.base, recovery_journal_write, NULL);
+       journal_create(&journal.base, journal_no_write_async,
+                      journal_no_write_async_cb,
+                      recovery_journal_write, NULL);
        journal.vclock = v;
        journal_set(&journal.base);
 }
diff --git a/src/box/journal.c b/src/box/journal.c
index 11e78990d..036aad87a 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -32,6 +32,29 @@
 #include
 #include
 
+int
+journal_no_write_async(struct journal *journal,
+                       struct journal_entry *entry,
+                       void *complete_data)
+{
+       (void)complete_data;
+       (void)journal;
+
+       say_error("journal: write_async invalid context");
+       entry->res = -1;
+       return -1;
+}
+
+void
+journal_no_write_async_cb(struct journal_entry *entry,
+                          void *complete_data)
+{
+       (void)complete_data;
+
+       say_error("journal: write_async_cb invalid context");
+       entry->res = -1;
+}
+
 /**
  * Used to load from a memtx snapshot. LSN is not used,
  * but txn_commit() must work.
@@ -41,21 +64,19 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
        (void) journal;
        entry->res = 0;
-       journal_entry_complete(entry);
        return 0;
 }
 
 static struct journal dummy_journal = {
-       dummy_journal_write,
-       NULL,
+       .write_async    = journal_no_write_async,
+       .write_async_cb = journal_no_write_async_cb,
+       .write          = dummy_journal_write,
 };
 
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region,
-                  journal_entry_complete_cb on_complete_cb,
-                  void *on_complete_cb_data)
+journal_entry_new(size_t n_rows, struct region *region)
 {
        struct journal_entry *entry;
 
@@ -68,11 +89,11 @@
                diag_set(OutOfMemory, size, "region", "struct journal_entry");
                return NULL;
        }
+
+       entry->complete_data = NULL;
        entry->approx_len = 0;
        entry->n_rows = n_rows;
        entry->res = -1;
-       entry->on_complete_cb = on_complete_cb;
-       entry->on_complete_cb_data = on_complete_cb_data;
+
        return entry;
 }
-
diff --git a/src/box/journal.h b/src/box/journal.h
index 64f167c6f..e1947edd1 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
 #include
 #include "salad/stailq.h"
 #include "fiber.h"
+#include "txn.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -60,17 +61,10 @@ struct journal_entry {
         * the committed transaction, on error is -1
         */
        int64_t res;
-       /**
-        * A journal entry finalization callback which is going to be called
-        * after the entry processing was finished in both cases: success
-        * or fail. Entry->res is set to a result value before the callback
-        * is fired.
-        */
-       journal_entry_complete_cb on_complete_cb;
       /**
        * A journal entry completion callback argument.
        */
-       void *on_complete_cb_data;
+       void *complete_data;
       /**
        * Approximate size of this request when encoded.
        */
@@ -93,18 +87,7 @@ struct region;
  * @return NULL if out of memory, fiber diagnostics area is set
  */
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region,
-                  journal_entry_complete_cb on_complete_cb,
-                  void *on_complete_cb_data);
-
-/**
- * Finalize a single entry.
- */
-static inline void
-journal_entry_complete(struct journal_entry *entry)
-{
-       entry->on_complete_cb(entry, entry->on_complete_cb_data);
-}
+journal_entry_new(size_t n_rows, struct region *region);
 
 /**
  * An API for an abstract journal for all transactions of this
@@ -112,11 +95,33 @@ journal_entry_complete(struct journal_entry *entry)
  * synchronous replication.
  */
 struct journal {
+       /** Asynchronous write */
+       int (*write_async)(struct journal *journal,
+                          struct journal_entry *entry,
+                          void *complete_data);
+
+       /** Asynchronous write completion */
+       void (*write_async_cb)(struct journal_entry *entry,
+                              void *complete_data);
+
+       /** Synchronous write */
        int (*write)(struct journal *journal,
-                    struct journal_entry *req);
+                    struct journal_entry *entry);
+
+       /** Journal destroy */
        void (*destroy)(struct journal *journal);
 };
 
+/**
+ * Finalize a single entry.
+ */
+static inline void
+journal_async_complete(struct journal *journal, struct journal_entry *entry)
+{
+       assert(journal->write_async_cb != NULL);
+       journal->write_async_cb(entry, entry->complete_data);
+}
+
 /**
  * Depending on the step of recovery and instance configuration
  * points at a concrete implementation of the journal.
@@ -124,16 +129,30 @@ struct journal {
 extern struct journal *current_journal;
 
 /**
- * Send a single entry to write.
+ * Write a single entry to the journal in a synchronous way.
  *
- * @return 0 if write was scheduled or -1 in case of an error.
+ * @return 0 if write was processed by a backend or -1 in case of an error.
  */
 static inline int
 journal_write(struct journal_entry *entry)
 {
+       assert(in_txn() != NULL);
        return current_journal->write(current_journal, entry);
 }
 
+/**
+ * Queue a single entry to the journal in an asynchronous way.
+ *
+ * @return 0 if write was queued to a backend or -1 in case of an error.
+ */
+static inline int
+journal_write_async(struct journal_entry *entry, void *complete_data)
+{
+       assert(in_txn() != NULL);
+       return current_journal->write_async(current_journal, entry,
+                                           complete_data);
+}
+
 /**
  * Change the current implementation of the journaling API.
  * Happens during life cycle of an instance:
@@ -165,13 +184,34 @@ journal_set(struct journal *new_journal)
 
 static inline void
 journal_create(struct journal *journal,
-              int (*write)(struct journal *, struct journal_entry *),
-              void (*destroy)(struct journal *))
+              int (*write_async)(struct journal *journal,
+                                 struct journal_entry *entry,
+                                 void *complete_data),
+              void (*write_async_cb)(struct journal_entry *entry,
+                                     void *complete_data),
+              int (*write)(struct journal *journal,
+                           struct journal_entry *entry),
+              void (*destroy)(struct journal *journal))
 {
-       journal->write = write;
-       journal->destroy = destroy;
+       journal->write_async    = write_async;
+       journal->write_async_cb = write_async_cb;
+       journal->write          = write;
+       journal->destroy        = destroy;
 }
 
+/**
+ * A stub to issue an error in case asynchronous
+ * write is disabled in the backend.
+ */
+extern int
+journal_no_write_async(struct journal *journal,
+                       struct journal_entry *entry,
+                       void *complete_data);
+
+extern void
+journal_no_write_async_cb(struct journal_entry *entry,
+                          void *complete_data);
+
 static inline bool
 journal_is_initialized(struct journal *journal)
 {
diff --git a/src/box/txn.c b/src/box/txn.c
index 11c20aceb..33302586d 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -487,7 +487,7 @@ txn_journal_entry_new(struct txn *txn)
        assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
        req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
-                               &txn->region, txn_complete_async, txn);
+                               &txn->region);
        if (req == NULL)
                return NULL;
 
@@ -518,24 +518,6 @@ txn_journal_entry_new(struct txn *txn)
        return req;
 }
 
-static int64_t
-txn_write_to_wal(struct journal_entry *req)
-{
-       /*
-        * Send the entry to the journal.
-        *
-        * After this point the transaction must not be used
-        * so reset the corresponding key in the fiber storage.
-        */
-       fiber_set_txn(fiber(), NULL);
-       if (journal_write(req) < 0) {
-               diag_set(ClientError, ER_WAL_IO);
-               diag_log();
-               return -1;
-       }
-       return 0;
-}
-
 /*
  * Prepare a transaction using engines.
  */
@@ -608,7 +590,15 @@ txn_commit_async(struct txn *txn)
                return -1;
        }
 
-       return txn_write_to_wal(req);
+       if (journal_write_async(req, txn) != 0) {
+               txn_rollback(txn);
+
+               diag_set(ClientError, ER_WAL_IO);
+               diag_log();
+               return -1;
+       }
+
+       return 0;
 }
 
 int
@@ -636,21 +626,26 @@ txn_commit(struct txn *txn)
                return -1;
        }
 
-       if (txn_write_to_wal(req) != 0)
+       if (journal_write(req) != 0) {
+               txn_rollback(txn);
+               txn_free(txn);
+
+               diag_set(ClientError, ER_WAL_IO);
+               diag_log();
                return -1;
+       }
 
-       /*
-        * In case of non-yielding journal the transaction could already
-        * be done and there is nothing to wait in such cases.
-        */
        if (!txn_has_flag(txn, TXN_IS_DONE)) {
-               bool cancellable = fiber_set_cancellable(false);
-               fiber_yield();
-               fiber_set_cancellable(cancellable);
+               txn->signature = req->res;
+               txn_complete(txn);
+               fiber_set_txn(fiber(), NULL);
        }
+
        int res = txn->signature >= 0 ? 0 : -1;
-       if (res != 0)
+       if (res != 0) {
                diag_set(ClientError, ER_WAL_IO);
+               diag_log();
+       }
 
        /* Synchronous transactions are freed by the calling fiber. */
        txn_free(txn);
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index cb291f3c8..92171ec21 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -815,8 +815,8 @@ vy_log_tx_flush(struct vy_log_tx *tx)
                tx_size++;
 
        size_t used = region_used(&fiber()->gc);
-       struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
-                                                       NULL, NULL);
+
+       struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
        if (entry == NULL)
                goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..ba9f22e7a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -32,6 +32,7 @@
 
 #include "vclock.h"
 #include "fiber.h"
+#include "txn.h"
 #include "fio.h"
 #include "errinj.h"
 #include "error.h"
@@ -60,11 +61,17 @@
 const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 int wal_dir_lock = -1;
 
+static int
+wal_write_async(struct journal *, struct journal_entry *, void *);
+
 static int
 wal_write(struct journal *, struct journal_entry *);
 
 static int
-wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+wal_write_none_async(struct journal *, struct journal_entry *, void *);
+
+static int
+wal_write_none(struct journal *, struct journal_entry *);
 
 /*
  * WAL writer - maintain a Write Ahead Log for every change
@@ -253,9 +260,10 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 static void
 tx_schedule_queue(struct stailq *queue)
 {
+       struct wal_writer *writer = &wal_writer_singleton;
        struct journal_entry *req, *tmp;
        stailq_foreach_entry_safe(req, tmp, queue, fifo)
-               journal_entry_complete(req);
+               journal_async_complete(&writer->base, req);
 }
 
 /**
@@ -349,8 +357,14 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 {
        writer->wal_mode = wal_mode;
        writer->wal_max_size = wal_max_size;
-       journal_create(&writer->base, wal_mode == WAL_NONE ?
-                      wal_write_in_wal_mode_none : wal_write, NULL);
+
+       journal_create(&writer->base,
+                      wal_mode == WAL_NONE ?
+                      wal_write_none_async : wal_write_async,
+                      txn_complete_async,
+                      wal_mode == WAL_NONE ?
+                      wal_write_none : wal_write,
+                      NULL);
 
        struct xlog_opts opts = xlog_opts_default;
        opts.sync_is_async = true;
@@ -1170,9 +1184,18 @@ wal_writer_f(va_list ap)
  * to be written to disk.
  */
 static int
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_write_async(struct journal *journal, struct journal_entry *entry,
+               void *complete_data)
 {
        struct wal_writer *writer = (struct wal_writer *) journal;
+       entry->complete_data = complete_data;
+       struct txn *txn = complete_data;
+
+       /*
+        * After this point the transaction should not
+        * be bound to the fiber; it is handled by a callback.
+        */
+       fiber_set_txn(fiber(), NULL);
 
        ERROR_INJECT(ERRINJ_WAL_IO, {
                goto fail;
@@ -1220,14 +1243,39 @@ wal_write(struct journal *journal, struct journal_entry *entry)
        return 0;
 
 fail:
+       /*
+        * Don't forget to restore the transaction
+        * in the fiber storage: the caller should
+        * be able to run a rollback procedure.
+        */
+       fiber_set_txn(fiber(), txn);
        entry->res = -1;
-       journal_entry_complete(entry);
        return -1;
 }
 
 static int
-wal_write_in_wal_mode_none(struct journal *journal,
-                           struct journal_entry *entry)
+wal_write(struct journal *journal, struct journal_entry *entry)
+{
+       struct txn *txn = in_txn();
+
+       /*
+        * We can reuse the async WAL engine transparently
+        * to the caller.
+        */
+       if (wal_write_async(journal, entry, txn) != 0)
+               return -1;
+
+       bool cancellable = fiber_set_cancellable(false);
+       fiber_yield();
+       fiber_set_cancellable(cancellable);
+
+       return 0;
+}
+
+static int
+wal_write_none_async(struct journal *journal,
+                     struct journal_entry *entry,
+                     void *complete_data)
 {
        struct wal_writer *writer = (struct wal_writer *) journal;
        struct vclock vclock_diff;
@@ -1237,10 +1285,18 @@ wal_write_in_wal_mode_none(struct journal *journal,
        vclock_merge(&writer->vclock, &vclock_diff);
        vclock_copy(&replicaset.vclock, &writer->vclock);
        entry->res = vclock_sum(&writer->vclock);
-       journal_entry_complete(entry);
+
+       (void)complete_data;
+       fiber_set_txn(fiber(), NULL);
        return 0;
 }
 
+static int
+wal_write_none(struct journal *journal, struct journal_entry *entry)
+{
+       return wal_write_none_async(journal, entry, NULL);
+}
+
 void
 wal_init_vy_log()
 {
-- 
2.20.1
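
Editor's note: for readers who want a quick mental model of the redesigned API,
here is a minimal sketch (not part of the patch) of how a backend registers both
paths with journal_create() and signals async completion through
journal_async_complete(). The demo_* names are hypothetical and exist only for
illustration; the journal_* helpers, entry->complete_data and entry->res are the
ones introduced in src/box/journal.h above, and box/journal.h is assumed to be
included.

    static struct journal demo_journal;

    /* Hypothetical asynchronous write: completes the entry immediately. */
    static int
    demo_write_async(struct journal *journal, struct journal_entry *entry,
                     void *complete_data)
    {
            entry->complete_data = complete_data;
            entry->res = 0;
            /* Fires the write_async_cb that was set at journal creation. */
            journal_async_complete(journal, entry);
            return 0;
    }

    /* Hypothetical completion callback, paired with demo_write_async. */
    static void
    demo_write_async_cb(struct journal_entry *entry, void *complete_data)
    {
            (void)entry;
            (void)complete_data;
    }

    /* Hypothetical synchronous write: the caller gets the result directly. */
    static int
    demo_write(struct journal *journal, struct journal_entry *entry)
    {
            (void)journal;
            entry->res = 0;
            return 0;
    }

    static void
    demo_journal_init(void)
    {
            journal_create(&demo_journal, demo_write_async, demo_write_async_cb,
                           demo_write, NULL);
            journal_set(&demo_journal);
    }

The real backends in the patch follow the same split: txn_commit_async() goes
through journal_write_async() and is finished later by txn_complete_async via
tx_schedule_queue(), while txn_commit() uses journal_write(), which in wal_write()
reuses the async path and yields the fiber until the completion callback runs.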