From: Cyrill Gorcunov
Date: Thu, 5 Mar 2020 15:29:43 +0300
Message-Id: <20200305122943.7324-11-gorcunov@gmail.com>
In-Reply-To: <20200305122943.7324-1-gorcunov@gmail.com>
References: <20200305122943.7324-1-gorcunov@gmail.com>
Subject: [Tarantool-patches] [PATCH 10/10] box/journal: redesign sync and async writes
List-Id: Tarantool development patches
To: tml

Currently the journal provides only one method -- write -- which implies
a callback to trigger upon write completion (in contrast to the 1.10
series, where all commits were processed synchronously).

Let's make the difference between sync and async writes more explicit:
provide a journal::write_async method which takes a callback and callback
data as arguments, while journal::write handles the transaction
synchronously.

Redesign notes:

1) Both journal_write and journal_write_async require the caller to
   pass a valid fiber->storage.txn; on error the txn must not be reset
   but preserved, so that callers are able to run txn_rollback
2) txn_commit and txn_commit_async call txn_rollback where appropriate
3) no need to call journal_entry_complete on sync writes anymore,
   it is handled by txn_commit itself
4) wal_write_in_wal_mode_none is too long, renamed to wal_write_none

Signed-off-by: Cyrill Gorcunov
---
 src/box/box.cc    |   4 +-
 src/box/journal.c |  21 +++++++--
 src/box/journal.h |  48 ++++++++++++++++++---
 src/box/txn.c     | 102 ++++++++++++++++++++++----------------------
 src/box/wal.c     | 106 ++++++++++++++++++++++++++++++++++++++++++----
 5 files changed, 211 insertions(+), 70 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index eb5931e37..03510eef9 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -322,14 +322,14 @@ recovery_journal_write(struct journal *base,
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
-	journal_entry_complete(entry);
 	return 0;
 }
 
 static inline void
 recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 {
-	journal_create(&journal->base, recovery_journal_write, NULL);
+	journal_create(&journal->base, journal_no_write_async,
+		       recovery_journal_write, NULL);
 	journal->vclock = v;
 	journal_set(&journal->base);
 }
diff --git a/src/box/journal.c b/src/box/journal.c
index 266ee5d1f..2f78a4bc4 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -32,6 +32,21 @@
 #include
 #include
 
+int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry,
+		       journal_entry_complete_cb on_complete_cb,
+		       void *on_complete_cb_data)
+{
+	(void)journal;
+	(void)entry;
+	(void)on_complete_cb;
+	(void)on_complete_cb_data;
+
+	say_error("journal: write_async called from invalid context");
+	return -1;
+}
+
 /**
  * Used to load from a memtx snapshot. LSN is not used,
  * but txn_commit() must work.
@@ -41,13 +56,12 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	entry->res = 0;
-	journal_entry_complete(entry);
 	return 0;
 }
 
 static struct journal dummy_journal = {
-	dummy_journal_write,
-	NULL,
+	.write_async = journal_no_write_async,
+	.write = dummy_journal_write,
 };
 
 struct journal *current_journal = &dummy_journal;
@@ -75,4 +89,3 @@ journal_entry_new(size_t n_rows, struct region *region)
 
 	return entry;
 }
-
diff --git a/src/box/journal.h b/src/box/journal.h
index e74c69910..fac4d4e78 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -34,6 +34,7 @@
 #include
 #include "salad/stailq.h"
 #include "fiber.h"
+#include "txn.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -110,6 +111,10 @@ journal_entry_complete(struct journal_entry *entry)
  * synchronous replication.
  */
 struct journal {
+	int (*write_async)(struct journal *journal,
+			   struct journal_entry *entry,
+			   journal_entry_complete_cb on_complete_cb,
+			   void *on_complete_cb_data);
 	int (*write)(struct journal *journal,
 		     struct journal_entry *req);
 	void (*destroy)(struct journal *journal);
@@ -122,16 +127,33 @@ struct journal {
 extern struct journal *current_journal;
 
 /**
- * Send a single entry to write.
+ * Write a single entry to the journal in a synchronous way.
  *
- * @return 0 if write was scheduled or -1 in case of an error.
+ * @return 0 if write was processed by a backend or -1 in case of an error.
  */
 static inline int
 journal_write(struct journal_entry *entry)
 {
+	assert(in_txn() != NULL);
 	return current_journal->write(current_journal, entry);
 }
 
+/**
+ * Queue a single entry to the journal in an asynchronous way.
+ *
+ * @return 0 if write was queued to a backend or -1 in case of an error.
+ */
+static inline int
+journal_write_async(struct journal_entry *entry,
+		    journal_entry_complete_cb on_complete_cb,
+		    void *on_complete_cb_data)
+{
+	assert(in_txn() != NULL);
+	return current_journal->write_async(current_journal, entry,
+					    on_complete_cb,
+					    on_complete_cb_data);
+}
+
 /**
  * Change the current implementation of the journaling API.
  * Happens during life cycle of an instance:
@@ -163,17 +185,33 @@ journal_set(struct journal *new_journal)
 
 static inline void
 journal_create(struct journal *journal,
+	       int (*write_async)(struct journal *journal,
+				  struct journal_entry *entry,
+				  journal_entry_complete_cb on_complete_cb,
+				  void *on_complete_cb_data),
 	       int (*write)(struct journal *, struct journal_entry *),
 	       void (*destroy)(struct journal *))
 {
-	journal->write = write;
-	journal->destroy = destroy;
+	journal->write_async = write_async;
+	journal->write = write;
+	journal->destroy = destroy;
 }
 
+/**
+ * A stub to issue an error in case asynchronous
+ * write is disabled in the backend.
+ */
+extern int
+journal_no_write_async(struct journal *journal,
+		       struct journal_entry *entry,
+		       journal_entry_complete_cb on_complete_cb,
+		       void *on_complete_cb_data);
+
 static inline bool
 journal_is_initialized(struct journal *journal)
 {
-	return journal->write != NULL;
+	return journal->write != NULL &&
+	       journal->write_async != NULL;
 }
 
 #if defined(__cplusplus)
diff --git a/src/box/txn.c b/src/box/txn.c
index 613da181b..27aa3d35e 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -463,7 +463,7 @@ txn_complete(struct txn *txn)
 }
 
 static void
-txn_entry_complete_cb(struct journal_entry *entry, void *data)
+txn_async_complete(struct journal_entry *entry, void *data)
 {
 	struct txn *txn = data;
 	txn->signature = entry->res;
@@ -478,6 +478,10 @@ txn_entry_complete_cb(struct journal_entry *entry, void *data)
 	fiber_set_txn(fiber(), NULL);
 }
 
+/**
+ * Allocate a new journal entry with the transaction
+ * data to write.
+ */
 static struct journal_entry *
 txn_journal_entry_new(struct txn *txn)
 {
@@ -518,24 +522,6 @@ txn_journal_entry_new(struct txn *txn)
 	return req;
 }
 
-static int64_t
-txn_write_to_wal(struct journal_entry *req)
-{
-	/*
-	 * Send the entry to the journal.
-	 *
-	 * After this point the transaction must not be used
-	 * so reset the corresponding key in the fiber storage.
-	 */
-	fiber_set_txn(fiber(), NULL);
-	if (journal_write(req) < 0) {
-		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-		return -1;
-	}
-	return 0;
-}
-
 /*
  * Prepare a transaction using engines.
  */
@@ -596,42 +582,51 @@ txn_commit_nop(struct txn *txn)
 	return false;
 }
 
+/**
+ * Commit a transaction asynchronously; the
+ * completion is processed by a callback.
+ */
 int
 txn_commit_async(struct txn *txn)
 {
 	struct journal_entry *req;
 
-	if (txn_prepare(txn) != 0) {
-		txn_rollback(txn);
-		return -1;
-	}
+	if (txn_prepare(txn) != 0)
+		goto out_rollback;
 
 	if (txn_commit_nop(txn))
 		return 0;
 
 	req = txn_journal_entry_new(txn);
-	if (req == NULL) {
-		txn_rollback(txn);
-		return -1;
+	if (req == NULL)
+		goto out_rollback;
+
+	if (journal_write_async(req, txn_async_complete, txn) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		goto out_rollback;
 	}
 
-	req->on_complete_cb = txn_entry_complete_cb;
-	req->on_complete_cb_data = txn;
-	return txn_write_to_wal(req);
+	return 0;
+
+out_rollback:
+	txn_rollback(txn);
+	return -1;
 }
 
+/**
+ * Commit a transaction synchronously.
+ */
 int
 txn_commit(struct txn *txn)
 {
 	struct journal_entry *req;
-	int res = -1;
+	int res;
 
 	txn->fiber = fiber();
 
-	if (txn_prepare(txn) != 0) {
-		txn_rollback(txn);
-		goto out;
-	}
+	if (txn_prepare(txn) != 0)
+		goto out_rollback;
 
 	if (txn_commit_nop(txn)) {
 		res = 0;
@@ -639,33 +634,40 @@
 	}
 
 	req = txn_journal_entry_new(txn);
-	if (req == NULL) {
-		txn_rollback(txn);
-		goto out;
-	}
-	req->on_complete_cb = txn_entry_complete_cb;
-	req->on_complete_cb_data = txn;
-
-	if (txn_write_to_wal(req) != 0)
-		return -1;
+	if (req == NULL)
+		goto out_rollback;
 
 	/*
-	 * In case of non-yielding journal the transaction could already
-	 * be done and there is nothing to wait in such cases.
+	 * FIXME: Move error setup inside the
+	 * journal engine itself. The ClientError
+	 * here is too general.
 	 */
-	if (!txn_has_flag(txn, TXN_IS_DONE)) {
-		bool cancellable = fiber_set_cancellable(false);
-		fiber_yield();
-		fiber_set_cancellable(cancellable);
+
+	if (journal_write(req) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		goto out_rollback;
 	}
 
+	txn->signature = req->res;
 	res = txn->signature >= 0 ? 0 : -1;
-	if (res != 0)
+	if (res != 0) {
 		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+	}
 
+	txn_complete(txn);
+	fiber_set_txn(fiber(), NULL);
 out:
+	/* Synchronous transactions are freed by the calling fiber. */
 	txn_free(txn);
 	return res;
+
+out_rollback:
+	res = -1;
+	txn_rollback(txn);
+	goto out;
 }
 
 void
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..dd9563f31 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -32,6 +32,7 @@
 
 #include "vclock.h"
 #include "fiber.h"
+#include "txn.h"
 #include "fio.h"
 #include "errinj.h"
 #include "error.h"
@@ -60,11 +61,19 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
 int wal_dir_lock = -1;
 
+static int
+wal_write_async(struct journal *, struct journal_entry *,
+		journal_entry_complete_cb, void *);
+
 static int
 wal_write(struct journal *, struct journal_entry *);
 
 static int
-wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
+wal_write_none_async(struct journal *, struct journal_entry *,
+		     journal_entry_complete_cb, void *);
+
+static int
+wal_write_none(struct journal *, struct journal_entry *);
 
 /*
  * WAL writer - maintain a Write Ahead Log for every change
@@ -349,8 +358,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 {
 	writer->wal_mode = wal_mode;
 	writer->wal_max_size = wal_max_size;
-	journal_create(&writer->base, wal_mode == WAL_NONE ?
-		       wal_write_in_wal_mode_none : wal_write, NULL);
+	journal_create(&writer->base,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none_async : wal_write_async,
+		       wal_mode == WAL_NONE ?
+		       wal_write_none : wal_write,
+		       NULL);
 
 	struct xlog_opts opts = xlog_opts_default;
 	opts.sync_is_async = true;
@@ -1170,9 +1183,21 @@ wal_writer_f(va_list ap)
  * to be written to disk.
  */
 static int
-wal_write(struct journal *journal, struct journal_entry *entry)
+wal_write_async(struct journal *journal, struct journal_entry *entry,
+		journal_entry_complete_cb on_complete_cb,
+		void *on_complete_cb_data)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
+	struct txn *txn = in_txn();
+
+	/*
+	 * After this point the transaction will
+	 * live on its own and be processed via callbacks,
+	 * so reset the fiber storage.
+	 */
+	entry->on_complete_cb = on_complete_cb;
+	entry->on_complete_cb_data = on_complete_cb_data;
+	fiber_set_txn(fiber(), NULL);
 	ERROR_INJECT(ERRINJ_WAL_IO, {
 		goto fail;
 	});
@@ -1220,27 +1245,90 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	return 0;
 
 fail:
+	/*
+	 * Don't forget to restore the transaction
+	 * in the fiber storage: the caller should
+	 * be able to run a rollback procedure.
+	 */
+	fiber_set_txn(fiber(), txn);
 	entry->res = -1;
-	journal_entry_complete(entry);
+	txn->signature = -1;
 	return -1;
 }
 
+static void
+wal_write_cb(struct journal_entry *entry, void *data)
+{
+	struct txn *txn = data;
+	(void)entry;
+
+	/*
+	 * On a synchronous write just wake up
+	 * the waiter, which will complete the
+	 * transaction.
+	 */
+	fiber_wakeup(txn->fiber);
+}
+
+/*
+ * Queue an entry to write and wait until it is processed.
+ */
 static int
-wal_write_in_wal_mode_none(struct journal *journal,
-			   struct journal_entry *entry)
+wal_write(struct journal *journal, struct journal_entry *entry)
 {
-	struct wal_writer *writer = (struct wal_writer *) journal;
+	struct txn *txn = in_txn();
+
+	/*
+	 * Let's reuse the async WAL engine to shrink the code a bit.
+	 */
+	if (wal_write_async(journal, entry, wal_write_cb, txn) != 0)
+		return -1;
+
+	bool cancellable = fiber_set_cancellable(false);
+	fiber_yield();
+	fiber_set_cancellable(cancellable);
+
+	/*
+	 * Unlike the async write we preserve the transaction
+	 * in the fiber storage, where the caller should finish
+	 * the transaction.
+	 */
+	fiber_set_txn(fiber(), txn);
+	return 0;
+}
+
+static int
+wal_write_none_async(struct journal *journal,
+		     struct journal_entry *entry,
+		     journal_entry_complete_cb on_complete_cb,
+		     void *on_complete_cb_data)
+{
+	struct wal_writer *writer = (struct wal_writer *)journal;
 	struct vclock vclock_diff;
+	struct txn *txn = in_txn();
+
+	(void)on_complete_cb;
+	(void)on_complete_cb_data;
+
+	fiber_set_txn(fiber(), NULL);
+
 	vclock_create(&vclock_diff);
 	wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
 		       entry->rows + entry->n_rows);
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
-	journal_entry_complete(entry);
+
+	txn->signature = entry->res;
 	return 0;
 }
 
+static int
+wal_write_none(struct journal *journal, struct journal_entry *entry)
+{
+	return wal_write_none_async(journal, entry, NULL, NULL);
+}
+
 void
 wal_init_vy_log()
 {
-- 
2.20.1
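
For readers following the series, here is a minimal sketch (not part of the
patch) of how a journal backend plugs into the reworked API. It mirrors what
dummy_journal and the WAL writer do above; the toy_* names are invented for
illustration, and only journal_create(), journal_set(), journal_entry_complete()
and the callback signatures introduced by this series are assumed.

#include "journal.h"

/*
 * Illustration only: a trivial in-memory backend wired into the
 * reworked journal_create(). The toy_* names are made up.
 */
static int
toy_write_async(struct journal *journal, struct journal_entry *entry,
		journal_entry_complete_cb on_complete_cb,
		void *on_complete_cb_data)
{
	(void)journal;
	/* Pretend the write landed immediately... */
	entry->res = 0;
	entry->on_complete_cb = on_complete_cb;
	entry->on_complete_cb_data = on_complete_cb_data;
	/* ...and fire the completion callback (txn_async_complete). */
	journal_entry_complete(entry);
	return 0;
}

static int
toy_write(struct journal *journal, struct journal_entry *entry)
{
	(void)journal;
	/*
	 * Synchronous path: only report the result; txn_commit()
	 * completes the transaction itself, no callback is fired.
	 */
	entry->res = 0;
	return 0;
}

static struct journal toy_journal;

static void
toy_journal_init(void)
{
	journal_create(&toy_journal, toy_write_async, toy_write, NULL);
	journal_set(&toy_journal);
}

A backend that cannot serve asynchronous writes would pass
journal_no_write_async instead of toy_write_async, exactly as
recovery_journal_create() does in this patch.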