From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mail-lf1-f68.google.com (mail-lf1-f68.google.com [209.85.167.68]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 51EA044185F for ; Fri, 20 Mar 2020 11:22:06 +0300 (MSK) Received: by mail-lf1-f68.google.com with SMTP id j15so3803181lfk.6 for ; Fri, 20 Mar 2020 01:22:06 -0700 (PDT) From: Cyrill Gorcunov Date: Fri, 20 Mar 2020 11:19:56 +0300 Message-Id: <20200320081956.30650-12-gorcunov@gmail.com> In-Reply-To: <20200320081956.30650-1-gorcunov@gmail.com> References: <20200320081956.30650-1-gorcunov@gmail.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v15 11/11] box/journal: redesign journal operations List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tml Currently the journal provides only one method -- write, which implies a callback to trigger upon write completion (in contrary with 1.10 series where all commits were processing in synchronous way). Lets make difference between sync and async writes more notable: provide journal::write_async method which runs completion function once entry is written, in turn journal:write handle transaction in synchronous way. Redesing notes: 1) The callback for async write set once in journal creation. There is no need to carry callback in every journal entry. This allows us to save some memory; 2) txn_commit and txn_commit_async call txn_rollback where appropriate; 3) no need to call journal_entry_complete on sync writes anymore; 4) wal_write_in_wal_mode_none is too long, renamed to wal_write_none; 5) wal engine use async writes internally but it is transparent to callers. Signed-off-by: Cyrill Gorcunov --- src/box/box.cc | 8 ++--- src/box/journal.c | 33 +++++++++++++----- src/box/journal.h | 86 +++++++++++++++++++++++++++++++---------------- src/box/txn.c | 61 +++++++++++++++++---------------- src/box/txn.h | 2 +- src/box/vy_log.c | 5 +-- src/box/wal.c | 61 ++++++++++++++++++++++++++------- src/box/wal.h | 4 +-- 8 files changed, 171 insertions(+), 89 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index cf79affca..3a3bda78e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -322,7 +322,6 @@ recovery_journal_write(struct journal *base, { struct recovery_journal *journal = (struct recovery_journal *) base; entry->res = vclock_sum(journal->vclock); - journal_entry_complete(entry); return 0; } @@ -330,8 +329,9 @@ static void recovery_journal_create(struct vclock *v) { static struct recovery_journal journal; - - journal_create(&journal.base, recovery_journal_write, NULL); + journal_create(&journal.base, journal_no_write_async, + journal_no_write_async_cb, + recovery_journal_write, NULL); journal.vclock = v; journal_set(&journal.base); } @@ -2353,7 +2353,7 @@ box_cfg_xc(void) int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size")); enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode")); - if (wal_init(wal_mode, cfg_gets("wal_dir"), + if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"), wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection, on_wal_checkpoint_threshold) != 0) { diag_raise(); diff --git a/src/box/journal.c b/src/box/journal.c index 11e78990d..b7535dc68 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -32,6 +32,24 @@ #include #include +int +journal_no_write_async(struct journal *journal, + struct journal_entry *entry) +{ + (void)journal; + + assert(true); + entry->res = -1; + return -1; +} + +void +journal_no_write_async_cb(struct journal_entry *entry) +{ + assert(true); + entry->res = -1; +} + /** * Used to load from a memtx snapshot. LSN is not used, * but txn_commit() must work. @@ -41,21 +59,20 @@ dummy_journal_write(struct journal *journal, struct journal_entry *entry) { (void) journal; entry->res = 0; - journal_entry_complete(entry); return 0; } static struct journal dummy_journal = { - dummy_journal_write, - NULL, + .write_async = journal_no_write_async, + .write_async_cb = journal_no_write_async_cb, + .write = dummy_journal_write, }; struct journal *current_journal = &dummy_journal; struct journal_entry * journal_entry_new(size_t n_rows, struct region *region, - journal_entry_complete_cb on_complete_cb, - void *on_complete_cb_data) + void *complete_data) { struct journal_entry *entry; @@ -68,11 +85,11 @@ journal_entry_new(size_t n_rows, struct region *region, diag_set(OutOfMemory, size, "region", "struct journal_entry"); return NULL; } + + entry->complete_data = complete_data; entry->approx_len = 0; entry->n_rows = n_rows; entry->res = -1; - entry->on_complete_cb = on_complete_cb; - entry->on_complete_cb_data = on_complete_cb_data; + return entry; } - diff --git a/src/box/journal.h b/src/box/journal.h index 64f167c6f..a3f3170ed 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -42,9 +42,6 @@ extern "C" { struct xrow_header; struct journal_entry; -/** Journal entry finalization callback typedef. */ -typedef void (*journal_entry_complete_cb)(struct journal_entry *entry, void *data); - /** * An entry for an abstract journal. * Simply put, a write ahead log request. @@ -60,17 +57,10 @@ struct journal_entry { * the committed transaction, on error is -1 */ int64_t res; - /** - * A journal entry finalization callback which is going to be called - * after the entry processing was finished in both cases: success - * or fail. Entry->res is set to a result value before the callback - * is fired. - */ - journal_entry_complete_cb on_complete_cb; /** * A journal entry completion callback argument. */ - void *on_complete_cb_data; + void *complete_data; /** * Approximate size of this request when encoded. */ @@ -94,17 +84,7 @@ struct region; */ struct journal_entry * journal_entry_new(size_t n_rows, struct region *region, - journal_entry_complete_cb on_complete_cb, - void *on_complete_cb_data); - -/** - * Finalize a single entry. - */ -static inline void -journal_entry_complete(struct journal_entry *entry) -{ - entry->on_complete_cb(entry, entry->on_complete_cb_data); -} + void *complete_data); /** * An API for an abstract journal for all transactions of this @@ -112,11 +92,31 @@ journal_entry_complete(struct journal_entry *entry) * synchronous replication. */ struct journal { + /** Asynchronous write */ + int (*write_async)(struct journal *journal, + struct journal_entry *entry); + + /** Asynchronous write completion */ + void (*write_async_cb)(struct journal_entry *entry); + + /** Synchronous write */ int (*write)(struct journal *journal, - struct journal_entry *req); + struct journal_entry *entry); + + /** Journal destroy */ void (*destroy)(struct journal *journal); }; +/** + * Finalize a single entry. + */ +static inline void +journal_async_complete(struct journal *journal, struct journal_entry *entry) +{ + assert(journal->write_async_cb != NULL); + journal->write_async_cb(entry); +} + /** * Depending on the step of recovery and instance configuration * points at a concrete implementation of the journal. @@ -124,9 +124,9 @@ struct journal { extern struct journal *current_journal; /** - * Send a single entry to write. + * Write a single entry to the journal in synchronous way. * - * @return 0 if write was scheduled or -1 in case of an error. + * @return 0 if write was processed by a backend or -1 in case of an error. */ static inline int journal_write(struct journal_entry *entry) @@ -134,6 +134,17 @@ journal_write(struct journal_entry *entry) return current_journal->write(current_journal, entry); } +/** + * Queue a single entry to the journal in asynchronous way. + * + * @return 0 if write was queued to a backend or -1 in case of an error. + */ +static inline int +journal_write_async(struct journal_entry *entry) +{ + return current_journal->write_async(current_journal, entry); +} + /** * Change the current implementation of the journaling API. * Happens during life cycle of an instance: @@ -165,13 +176,30 @@ journal_set(struct journal *new_journal) static inline void journal_create(struct journal *journal, - int (*write)(struct journal *, struct journal_entry *), - void (*destroy)(struct journal *)) + int (*write_async)(struct journal *journal, + struct journal_entry *entry), + void (*write_async_cb)(struct journal_entry *entry), + int (*write)(struct journal *journal, + struct journal_entry *entry), + void (*destroy)(struct journal *journal)) { - journal->write = write; - journal->destroy = destroy; + journal->write_async = write_async; + journal->write_async_cb = write_async_cb; + journal->write = write; + journal->destroy = destroy; } +/** + * A stub to issue an error in case if asynchronous + * write is diabled in the backend. + */ +extern int +journal_no_write_async(struct journal *journal, + struct journal_entry *entry); + +extern void +journal_no_write_async_cb(struct journal_entry *entry); + static inline bool journal_is_initialized(struct journal *journal) { diff --git a/src/box/txn.c b/src/box/txn.c index 11c20aceb..b42df3df6 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -463,9 +463,9 @@ txn_complete(struct txn *txn) } void -txn_complete_async(struct journal_entry *entry, void *complete_data) +txn_complete_async(struct journal_entry *entry) { - struct txn *txn = complete_data; + struct txn *txn = entry->complete_data; txn->signature = entry->res; /* * Some commit/rollback triggers require for in_txn fiber @@ -487,7 +487,7 @@ txn_journal_entry_new(struct txn *txn) assert(txn->n_new_rows + txn->n_applier_rows > 0); req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows, - &txn->region, txn_complete_async, txn); + &txn->region, txn); if (req == NULL) return NULL; @@ -518,24 +518,6 @@ txn_journal_entry_new(struct txn *txn) return req; } -static int64_t -txn_write_to_wal(struct journal_entry *req) -{ - /* - * Send the entry to the journal. - * - * After this point the transaction must not be used - * so reset the corresponding key in the fiber storage. - */ - fiber_set_txn(fiber(), NULL); - if (journal_write(req) < 0) { - diag_set(ClientError, ER_WAL_IO); - diag_log(); - return -1; - } - return 0; -} - /* * Prepare a transaction using engines. */ @@ -608,7 +590,17 @@ txn_commit_async(struct txn *txn) return -1; } - return txn_write_to_wal(req); + fiber_set_txn(fiber(), NULL); + if (journal_write_async(req) != 0) { + fiber_set_txn(fiber(), txn); + txn_rollback(txn); + + diag_set(ClientError, ER_WAL_IO); + diag_log(); + return -1; + } + + return 0; } int @@ -636,21 +628,28 @@ txn_commit(struct txn *txn) return -1; } - if (txn_write_to_wal(req) != 0) + fiber_set_txn(fiber(), NULL); + if (journal_write(req) != 0) { + fiber_set_txn(fiber(), txn); + txn_rollback(txn); + txn_free(txn); + + diag_set(ClientError, ER_WAL_IO); + diag_log(); return -1; + } - /* - * In case of non-yielding journal the transaction could already - * be done and there is nothing to wait in such cases. - */ if (!txn_has_flag(txn, TXN_IS_DONE)) { - bool cancellable = fiber_set_cancellable(false); - fiber_yield(); - fiber_set_cancellable(cancellable); + txn->signature = req->res; + txn_complete(txn); + fiber_set_txn(fiber(), NULL); } + int res = txn->signature >= 0 ? 0 : -1; - if (res != 0) + if (res != 0) { diag_set(ClientError, ER_WAL_IO); + diag_log(); + } /* Synchronous transactions are freed by the calling fiber. */ txn_free(txn); diff --git a/src/box/txn.h b/src/box/txn.h index 572c76d84..3f6d79d5c 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -292,7 +292,7 @@ txn_rollback(struct txn *txn); * Complete asynchronous transaction. */ void -txn_complete_async(struct journal_entry *entry, void *complete_data); +txn_complete_async(struct journal_entry *entry); /** * Submit a transaction to the journal. diff --git a/src/box/vy_log.c b/src/box/vy_log.c index cb291f3c8..9ead066af 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -815,8 +815,9 @@ vy_log_tx_flush(struct vy_log_tx *tx) tx_size++; size_t used = region_used(&fiber()->gc); - struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc, - NULL, NULL); + + struct journal_entry *entry; + entry = journal_entry_new(tx_size, &fiber()->gc, NULL); if (entry == NULL) goto err; diff --git a/src/box/wal.c b/src/box/wal.c index 1668c9348..3b094b0e8 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -60,11 +60,17 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL }; int wal_dir_lock = -1; +static int +wal_write_async(struct journal *, struct journal_entry *); + static int wal_write(struct journal *, struct journal_entry *); static int -wal_write_in_wal_mode_none(struct journal *, struct journal_entry *); +wal_write_none_async(struct journal *, struct journal_entry *); + +static int +wal_write_none(struct journal *, struct journal_entry *); /* * WAL writer - maintain a Write Ahead Log for every change @@ -253,9 +259,10 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry) static void tx_schedule_queue(struct stailq *queue) { + struct wal_writer *writer = &wal_writer_singleton; struct journal_entry *req, *tmp; stailq_foreach_entry_safe(req, tmp, queue, fifo) - journal_entry_complete(req); + journal_async_complete(&writer->base, req); } /** @@ -342,6 +349,7 @@ tx_notify_checkpoint(struct cmsg *msg) */ static void wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, + void (*wall_async_cb)(struct journal_entry *entry), const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, wal_on_garbage_collection_f on_garbage_collection, @@ -349,8 +357,14 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, { writer->wal_mode = wal_mode; writer->wal_max_size = wal_max_size; - journal_create(&writer->base, wal_mode == WAL_NONE ? - wal_write_in_wal_mode_none : wal_write, NULL); + + journal_create(&writer->base, + wal_mode == WAL_NONE ? + wal_write_none_async : wal_write_async, + wall_async_cb, + wal_mode == WAL_NONE ? + wal_write_none : wal_write, + NULL); struct xlog_opts opts = xlog_opts_default; opts.sync_is_async = true; @@ -458,14 +472,14 @@ wal_open(struct wal_writer *writer) } int -wal_init(enum wal_mode wal_mode, const char *wal_dirname, - int64_t wal_max_size, const struct tt_uuid *instance_uuid, +wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry), + const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold) { /* Initialize the state. */ struct wal_writer *writer = &wal_writer_singleton; - wal_writer_create(writer, wal_mode, wal_dirname, + wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname, wal_max_size, instance_uuid, on_garbage_collection, on_checkpoint_threshold); @@ -1170,7 +1184,7 @@ wal_writer_f(va_list ap) * to be written to disk. */ static int -wal_write(struct journal *journal, struct journal_entry *entry) +wal_write_async(struct journal *journal, struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; @@ -1221,26 +1235,49 @@ wal_write(struct journal *journal, struct journal_entry *entry) fail: entry->res = -1; - journal_entry_complete(entry); return -1; } static int -wal_write_in_wal_mode_none(struct journal *journal, - struct journal_entry *entry) +wal_write(struct journal *journal, struct journal_entry *entry) +{ + /* + * We can reuse async WAL engine transparently + * to the caller. + */ + if (wal_write_async(journal, entry) != 0) + return -1; + + bool cancellable = fiber_set_cancellable(false); + fiber_yield(); + fiber_set_cancellable(cancellable); + + return 0; +} + +static int +wal_write_none_async(struct journal *journal, + struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; struct vclock vclock_diff; + vclock_create(&vclock_diff); wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows, entry->rows + entry->n_rows); vclock_merge(&writer->vclock, &vclock_diff); vclock_copy(&replicaset.vclock, &writer->vclock); entry->res = vclock_sum(&writer->vclock); - journal_entry_complete(entry); + return 0; } +static int +wal_write_none(struct journal *journal, struct journal_entry *entry) +{ + return wal_write_none_async(journal, entry); +} + void wal_init_vy_log() { diff --git a/src/box/wal.h b/src/box/wal.h index 76b44941a..11a66a20a 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void); * Start WAL thread and initialize WAL writer. */ int -wal_init(enum wal_mode wal_mode, const char *wal_dirname, - int64_t wal_max_size, const struct tt_uuid *instance_uuid, +wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry), + const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold); -- 2.20.1