From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mail-lj1-f196.google.com (mail-lj1-f196.google.com [209.85.208.196]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id EF51F469719 for ; Sat, 7 Mar 2020 00:48:03 +0300 (MSK) Received: by mail-lj1-f196.google.com with SMTP id r7so3753856ljp.10 for ; Fri, 06 Mar 2020 13:48:03 -0800 (PST) Date: Sat, 7 Mar 2020 00:48:01 +0300 From: Konstantin Osipov Message-ID: <20200306214801.GJ8140@atlas> References: <20200305122943.7324-1-gorcunov@gmail.com> <20200305122943.7324-11-gorcunov@gmail.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20200305122943.7324-11-gorcunov@gmail.com> Subject: Re: [Tarantool-patches] [PATCH 10/10] box/journal: redesign sync and async writes List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Cyrill Gorcunov Cc: tml You're on track overall, please see comments inline. * Cyrill Gorcunov [20/03/05 15:32]: > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -322,14 +322,14 @@ recovery_journal_write(struct journal *base, > { > struct recovery_journal *journal = (struct recovery_journal *) base; > entry->res = vclock_sum(journal->vclock); > - journal_entry_complete(entry); > return 0; Yes! > } > > static inline void > recovery_journal_create(struct recovery_journal *journal, struct vclock *v) > { > - journal_create(&journal->base, recovery_journal_write, NULL); > + journal_create(&journal->base, journal_no_write_async, > + recovery_journal_write, NULL); Nice. 
> journal->vclock = v; > journal_set(&journal->base); > } > diff --git a/src/box/journal.c b/src/box/journal.c > index 266ee5d1f..2f78a4bc4 100644 > --- a/src/box/journal.c > +++ b/src/box/journal.c > @@ -32,6 +32,21 @@ > #include > #include > > +int > +journal_no_write_async(struct journal *journal, > + struct journal_entry *entry, > + journal_entry_complete_cb on_complete_cb, > + void *on_complete_cb_data) > +{ > + (void)journal; > + (void)entry; > + (void)on_complete_cb; > + (void)on_complete_cb_data; > + > + say_error("journal: write_async called from invalid context"); Please panic. > + return -1; > +} > + > /** > * Used to load from a memtx snapshot. LSN is not used, > * but txn_commit() must work. > @@ -41,13 +56,12 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry) > { > (void) journal; > entry->res = 0; > - journal_entry_complete(entry); Nice! > --- a/src/box/journal.h > +++ b/src/box/journal.h > @@ -34,6 +34,7 @@ > #include > #include "salad/stailq.h" > #include "fiber.h" > +#include "txn.h" Stray include. Please remove. > /** > - * Send a single entry to write. > + * Write a single entry to the journal in synchronous way. > * > - * @return 0 if write was scheduled or -1 in case of an error. > + * @return 0 if write was processed by a backend or -1 in case of an error. > */ > static inline int > journal_write(struct journal_entry *entry) > { > + assert(in_txn() != NULL); You should not create a dependency loop just because of an assert. > static inline bool > journal_is_initialized(struct journal *journal) > { > - return journal->write != NULL; > + return journal->write != NULL && > + journal->write_async != NULL; This is an unnecessary change. They are always set together. Please feel free to add a flag is_initialized if you don't like checking a single member. > static void > -txn_entry_complete_cb(struct journal_entry *entry, void *data) > +txn_async_complete(struct journal_entry *entry, void *data) > { Yes! 
> struct txn *txn = data; > txn->signature = entry->res; > @@ -478,6 +478,10 @@ txn_entry_complete_cb(struct journal_entry *entry, void *data) > fiber_set_txn(fiber(), NULL); > } > > +/** > + * Allocate new journal entry with transaction > + * data to write. > + */ > static struct journal_entry * > txn_journal_entry_new(struct txn *txn) > { > @@ -518,24 +522,6 @@ txn_journal_entry_new(struct txn *txn) > return req; > } > > -static int64_t > -txn_write_to_wal(struct journal_entry *req) > -{ > - /* > - * Send the entry to the journal. > - * > - * After this point the transaction must not be used > - * so reset the corresponding key in the fiber storage. > - */ > - fiber_set_txn(fiber(), NULL); > - if (journal_write(req) < 0) { > - diag_set(ClientError, ER_WAL_IO); > - diag_log(); > - return -1; > - } > - return 0; > -} Awesome. > - > /* > * Prepare a transaction using engines. > */ > @@ -596,42 +582,51 @@ txn_commit_nop(struct txn *txn) > return false; > } > > +/** > + * Commit a transaction asynchronously, the > + * completion is processed by a callback. > + */ > int > txn_commit_async(struct txn *txn) > { > struct journal_entry *req; > > - if (txn_prepare(txn) != 0) { > - txn_rollback(txn); > - return -1; > - } > + if (txn_prepare(txn) != 0) > + goto out_rollback; Why not simply rollback: or out:? out_rollback suggests there are more out_s. > > if (txn_commit_nop(txn)) > return 0; > > req = txn_journal_entry_new(txn); > - if (req == NULL) { > - txn_rollback(txn); > - return -1; > + if (req == NULL) > + goto out_rollback; > + > + if (journal_write_async(req, txn_async_complete, txn) != 0) { > + diag_set(ClientError, ER_WAL_IO); > + diag_log(); > + goto out_rollback; > } > - req->on_complete_cb = txn_entry_complete_cb; > - req->on_complete_cb_data = txn; > > - return txn_write_to_wal(req); > + return 0; > + > +out_rollback: > + txn_rollback(txn); > + return -1; > } > > +/** > + * Commit a transaction synchronously. 
> + */ > int > txn_commit(struct txn *txn) > { > struct journal_entry *req; > - int res = -1; > + int res; > > txn->fiber = fiber(); I believe you don't have to set it twice, it's already set. > > - if (txn_prepare(txn) != 0) { > - txn_rollback(txn); > - goto out; > - } > + if (txn_prepare(txn) != 0) > + goto out_rollback; > > if (txn_commit_nop(txn)) { > res = 0; > @@ -639,33 +634,40 @@ txn_commit(struct txn *txn) > } > > req = txn_journal_entry_new(txn); > - if (req == NULL) { > - txn_rollback(txn); > - goto out; > - } > - req->on_complete_cb = txn_entry_complete_cb; > - req->on_complete_cb_data = txn; > - > - if (txn_write_to_wal(req) != 0) > - return -1; > + if (req == NULL) > + goto out_rollback; > > /* > - * In case of non-yielding journal the transaction could already > - * be done and there is nothing to wait in such cases. > + * FIXME: Move error setup inside the > + * journal engine itself. The ClientError > + * here is too general. > */ > - if (!txn_has_flag(txn, TXN_IS_DONE)) { > - bool cancellable = fiber_set_cancellable(false); > - fiber_yield(); > - fiber_set_cancellable(cancellable); > + > + if (journal_write(req) != 0) { > + diag_set(ClientError, ER_WAL_IO); > + diag_log(); > + goto out_rollback; > } > + > + txn->signature = req->res; > res = txn->signature >= 0 ? 0 : -1; > - if (res != 0) > + if (res != 0) { > diag_set(ClientError, ER_WAL_IO); > + diag_log(); > + } > > + txn_complete(txn); > + fiber_set_txn(fiber(), NULL); > out: > + > /* Synchronous transactions are freed by the calling fiber. */ > txn_free(txn); > return res; > + > +out_rollback: > + res = -1; > + txn_rollback(txn); > + goto out; > } > > void > diff --git a/src/box/wal.c b/src/box/wal.c > index 1668c9348..dd9563f31 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -32,6 +32,7 @@ > > #include "vclock.h" > #include "fiber.h" > +#include "txn.h" Noooo. Please avoid dependency loops. 
> #include "fio.h" > #include "errinj.h" > #include "error.h" > @@ -60,11 +61,19 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL }; > > int wal_dir_lock = -1; > > +static int > +wal_write_async(struct journal *, struct journal_entry *, > + journal_entry_complete_cb, void *); > + > static int > wal_write(struct journal *, struct journal_entry *); > > static int > -wal_write_in_wal_mode_none(struct journal *, struct journal_entry *); > +wal_write_none_async(struct journal *, struct journal_entry *, > + journal_entry_complete_cb, void *); > + > +static int > +wal_write_none(struct journal *, struct journal_entry *); > > /* > * WAL writer - maintain a Write Ahead Log for every change > @@ -349,8 +358,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, > { > writer->wal_mode = wal_mode; > writer->wal_max_size = wal_max_size; > - journal_create(&writer->base, wal_mode == WAL_NONE ? > - wal_write_in_wal_mode_none : wal_write, NULL); > + journal_create(&writer->base, > + wal_mode == WAL_NONE ? > + wal_write_none_async : wal_write_async, > + wal_mode == WAL_NONE ? > + wal_write_none : wal_write, > + NULL); > > struct xlog_opts opts = xlog_opts_default; > opts.sync_is_async = true; > @@ -1170,9 +1183,21 @@ wal_writer_f(va_list ap) > * to be written to disk. > */ > static int > -wal_write(struct journal *journal, struct journal_entry *entry) > +wal_write_async(struct journal *journal, struct journal_entry *entry, > + journal_entry_complete_cb on_complete_cb, > + void *on_complete_cb_data) > { > struct wal_writer *writer = (struct wal_writer *) journal; > + struct txn *txn = in_txn(); > + > + /* > + * After this point the transaction will > + * live on its own and processed via callbacks, > + * so reset the fiber storage. > + */ > + entry->on_complete_cb = on_complete_cb; > + entry->on_complete_cb_data = on_complete_cb_data; > + fiber_set_txn(fiber(), NULL); This should be in txn.cc, why did you move it here? 
> > ERROR_INJECT(ERRINJ_WAL_IO, { > goto fail; > @@ -1220,27 +1245,90 @@ wal_write(struct journal *journal, struct journal_entry *entry) > return 0; > > fail: > + /* > + * Don't forget to restore transaction > + * in a fiber storage: the caller should > + * be able to run a rollback procedure. > + */ > + fiber_set_txn(fiber(), txn); This should not be needed now. > entry->res = -1; > - journal_entry_complete(entry); > + txn->signature = -1; > return -1; > } > > +static void > +wal_write_cb(struct journal_entry *entry, void *data) > +{ > + struct txn *txn = data; > + (void)entry; > + > + /* > + * On synchronous write just wake up > + * the waiter which will complete the > + * transaction. > + */ > + fiber_wakeup(txn->fiber); Why not pass the fiber as data? This would avoid having to include txn.h. > +} > + > +/* > + * Queue entry to write and wait until it processed. > + */ > static int > -wal_write_in_wal_mode_none(struct journal *journal, > - struct journal_entry *entry) > +wal_write(struct journal *journal, struct journal_entry *entry) > { > - struct wal_writer *writer = (struct wal_writer *) journal; > + struct txn *txn = in_txn(); > + > + /* > + * Lets reuse async WAL engine to shrink code a bit. > + */ > + if (wal_write_async(journal, entry, wal_write_cb, txn) != 0) Pass fiber_self() here instead of txn. > + return -1; > + > + bool cancellable = fiber_set_cancellable(false); > + fiber_yield(); > + fiber_set_cancellable(cancellable); > + > + /* > + * Unlike async write we preserve the transaction > + * in a fiber storage where the caller should finish > + * the transaction. 
> + */ > + fiber_set_txn(fiber(), txn); > + return 0; > +} > + > +static int > +wal_write_none_async(struct journal *journal, > + struct journal_entry *entry, > + journal_entry_complete_cb on_complete_cb, > + void *on_complete_cb_data) > +{ > + struct wal_writer *writer = (struct wal_writer *)journal; > struct vclock vclock_diff; > + struct txn *txn = in_txn(); > + > + (void)on_complete_cb; > + (void)on_complete_cb_data; > + > + fiber_set_txn(fiber(), NULL); > + > vclock_create(&vclock_diff); > wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows, > entry->rows + entry->n_rows); > vclock_merge(&writer->vclock, &vclock_diff); > vclock_copy(&replicaset.vclock, &writer->vclock); > entry->res = vclock_sum(&writer->vclock); > - journal_entry_complete(entry); > + > + txn->signature = entry->res; Please move back this assignment to txn.cc. > return 0; > } > > +static int > +wal_write_none(struct journal *journal, struct journal_entry *entry) > +{ > + return wal_write_none_async(journal, entry, NULL, NULL); > +} > + > -- Konstantin Osipov, Moscow, Russia