From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id EC7AC315EC for ; Fri, 21 Jun 2019 17:48:26 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id lxwlFRklVzWE for ; Fri, 21 Jun 2019 17:48:26 -0400 (EDT) Received: from smtp51.i.mail.ru (smtp51.i.mail.ru [94.100.177.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 6EB3F31606 for ; Fri, 21 Jun 2019 17:48:26 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v5 4/7] wal: introduce a journal entry finalization callback Date: Sat, 22 Jun 2019 00:48:18 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Finalize a transaction through a journal entry callback, so that transaction processing doesn't rely on fiber scheduling. Also allow a fiber which isn't the owner of a locked latch to steal ownership of that latch. This is required to process transaction triggers asynchronously. 
Prerequisites: #1254 --- src/box/alter.cc | 6 +++ src/box/box.cc | 6 ++- src/box/journal.c | 9 +++- src/box/journal.h | 29 +++++++++- src/box/txn.c | 123 +++++++++++++++++++++++++++++-------------- src/box/vy_log.c | 3 +- src/box/wal.c | 21 ++++++-- src/lib/core/latch.h | 10 ++++ 8 files changed, 157 insertions(+), 50 deletions(-) diff --git a/src/box/alter.cc b/src/box/alter.cc index a37a68ce4..1595e27af 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -3558,6 +3558,12 @@ unlock_after_dd(struct trigger *trigger, void *event) { (void) trigger; (void) event; + /* + * In case of yielding journal will this trigger be processed + * in a context of tx_prio endpoint instead of a context of + * a fiber which has this latch locked. So steal the latch first. + */ + latch_steal(&schema_lock); latch_unlock(&schema_lock); } diff --git a/src/box/box.cc b/src/box/box.cc index d53b0cdc5..f5bd29dd5 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -303,10 +303,12 @@ struct recovery_journal { */ static int64_t recovery_journal_write(struct journal *base, - struct journal_entry * /* entry */) + struct journal_entry *entry) { struct recovery_journal *journal = (struct recovery_journal *) base; - return vclock_sum(journal->vclock); + entry->res = vclock_sum(journal->vclock); + journal_entry_complete(entry); + return entry->res; } static inline void diff --git a/src/box/journal.c b/src/box/journal.c index fe13fb6ee..4c1997f36 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -41,7 +41,8 @@ static int64_t dummy_journal_write(struct journal *journal, struct journal_entry *entry) { (void) journal; - (void) entry; + entry->res = 0; + journal_entry_complete(entry); return 0; } @@ -53,7 +54,9 @@ static struct journal dummy_journal = { struct journal *current_journal = &dummy_journal; struct journal_entry * -journal_entry_new(size_t n_rows, struct region *region) +journal_entry_new(size_t n_rows, struct region *region, + journal_entry_done_cb on_done_cb, + void 
*on_done_cb_data) { struct journal_entry *entry; @@ -70,6 +73,8 @@ journal_entry_new(size_t n_rows, struct region *region) entry->n_rows = n_rows; entry->res = -1; entry->fiber = fiber(); + entry->on_done_cb = on_done_cb; + entry->on_done_cb_data = on_done_cb_data; return entry; } diff --git a/src/box/journal.h b/src/box/journal.h index 8ac32ee5e..52b8a715c 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -39,6 +39,11 @@ extern "C" { #endif /* defined(__cplusplus) */ struct xrow_header; +struct journal_entry; + +/** Journal entry finalization callback typedef. */ +typedef void (*journal_entry_done_cb)(struct journal_entry *entry, void *data); + /** * An entry for an abstract journal. * Simply put, a write ahead log request. @@ -58,6 +63,17 @@ struct journal_entry { * The fiber issuing the request. */ struct fiber *fiber; + /** + * A journal entry finalization callback which is going to be called + * after the entry processing was finished in both cases: success + * or failure. Entry->res is set to a result value before the callback + * is fired. + */ + journal_entry_done_cb on_done_cb; + /** + * A journal entry completion callback argument. + */ + void *on_done_cb_data; /** * Approximate size of this request when encoded. */ @@ -80,7 +96,18 @@ struct region; * @return NULL if out of memory, fiber diagnostics area is set */ struct journal_entry * -journal_entry_new(size_t n_rows, struct region *region); +journal_entry_new(size_t n_rows, struct region *region, + journal_entry_done_cb on_done_cb, + void *on_done_cb_data); + +/** + * Finalize a single entry. 
+ */ +static inline void +journal_entry_complete(struct journal_entry *entry) +{ + entry->on_done_cb(entry, entry->on_done_cb_data); +} /** * An API for an abstract journal for all transactions of this diff --git a/src/box/txn.c b/src/box/txn.c index 1eb4db6a3..5825acc34 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -337,6 +337,66 @@ fail: return -1; } +/* + * A helper function to process on_commit/on_rollback triggers. + */ +static inline void +txn_process_trigger(struct rlist *trigger, struct txn *txn) +{ + /* + * Some of triggers require for in_txn variable is set so + * restore it for time a trigger is in progress. + */ + fiber_set_txn(fiber(), txn); + /* Rollback triggers must not throw. */ + if (trigger_run(trigger, txn) != 0) { + /* + * As transaction couldn't handle a trigger error so + * there is no option except than panic. + */ + diag_log(); + unreachable(); + panic("rollback trigger failed"); + } + fiber_set_txn(fiber(), NULL); +} + +/** + * Complete transaction processing. + */ +static void +txn_complete(struct txn *txn) +{ + if (txn->signature < 0) { + /* Undo the transaction. */ + if (txn->engine) + engine_rollback(txn->engine, txn); + if (txn->has_triggers) + txn_process_trigger(&txn->on_rollback, txn); + + return; + } else { + /* Accept the transaction. */ + /* + * Engine can be NULL if transaction contains IPROTO_NOP + * statements only. 
+ */ + if (txn->engine != NULL) + engine_commit(txn->engine, txn); + if (txn->has_triggers) + txn_process_trigger(&txn->on_commit, txn); + } +} + +static void +txn_entry_done_cb(struct journal_entry *entry, void *data) +{ + struct txn *txn = (struct txn *)data; + txn->signature = entry->res; + txn_complete(txn); +} + + static int64_t txn_write_to_wal(struct txn *txn) { @@ -344,7 +404,9 @@ txn_write_to_wal(struct txn *txn) struct journal_entry *req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows, - &txn->region); + &txn->region, + txn_entry_done_cb, + txn); if (req == NULL) { txn_rollback(txn); return -1; @@ -370,16 +432,14 @@ txn_write_to_wal(struct txn *txn) ev_tstamp stop = ev_monotonic_now(loop()); if (res < 0) { - /* Cascading rollback. */ - txn_rollback(txn); /* Perform our part of cascading rollback. */ - /* - * Move fiber to end of event loop to avoid - * execution of any new requests before all - * pending rollbacks are processed. - */ - fiber_reschedule(); diag_set(ClientError, ER_WAL_IO); diag_log(); + /* + * However, the transaction is rolled back by + * finalization handler we are still in duty to + * free it. + */ + txn_free(txn); } else if (stop - start > too_long_threshold) { int n_rows = txn->n_new_rows + txn->n_applier_rows; say_warn_ratelimited("too long WAL write: %d rows at " @@ -418,30 +478,23 @@ txn_commit(struct txn *txn) } trigger_clear(&txn->fiber_on_stop); + /* + * After this transaction could not be used more + * so reset corresponding key in a fiber storage. + */ + fiber_set_txn(fiber(), NULL); if (txn->n_new_rows + txn->n_applier_rows > 0) { txn->signature = txn_write_to_wal(txn); if (txn->signature < 0) return -1; + } else { + /* + * However, there is nothing to write to wal a finalization + * should be fired. + */ + txn->signature = 0; + txn_complete(txn); } - /* - * Engine can be NULL if transaction contains IPROTO_NOP - * statements only. 
- */ - if (txn->engine != NULL) - engine_commit(txn->engine, txn); - /* - * The transaction is in the binary log. No action below - * may throw. In case an error has happened, there is - * no other option but terminate. - */ - if (txn->has_triggers && - trigger_run(&txn->on_commit, txn) != 0) { - diag_log(); - unreachable(); - panic("commit trigger failed"); - } - - fiber_set_txn(fiber(), NULL); txn_free(txn); return 0; fail: @@ -463,18 +516,10 @@ txn_rollback(struct txn *txn) { assert(txn == in_txn()); trigger_clear(&txn->fiber_on_stop); - if (txn->engine) - engine_rollback(txn->engine, txn); - /* Rollback triggers must not throw. */ - if (txn->has_triggers && - trigger_run(&txn->on_rollback, txn) != 0) { - diag_log(); - unreachable(); - panic("rollback trigger failed"); - } - - fiber_set_txn(fiber(), NULL); + txn->signature = -1; + txn_complete(txn); txn_free(txn); + fiber_set_txn(fiber(), NULL); } void diff --git a/src/box/vy_log.c b/src/box/vy_log.c index 098a01419..bf50f5520 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -819,7 +819,8 @@ vy_log_tx_flush(struct vy_log_tx *tx) tx_size++; size_t used = region_used(&fiber()->gc); - struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc); + struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc, + NULL, NULL); if (entry == NULL) goto err; diff --git a/src/box/wal.c b/src/box/wal.c index 0ea15a432..4fa9beca0 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -260,8 +260,10 @@ tx_schedule_queue(struct stailq *queue) * are many ready fibers. */ struct journal_entry *req; - stailq_foreach_entry(req, queue, fifo) + stailq_foreach_entry(req, queue, fifo) { + journal_entry_complete(req); fiber_wakeup(req->fiber); + } } /** @@ -1131,7 +1133,9 @@ wal_write(struct journal *journal, struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; - ERROR_INJECT_RETURN(ERRINJ_WAL_IO); + ERROR_INJECT(ERRINJ_WAL_IO, { + goto fail; + }); if (! 
stailq_empty(&writer->rollback)) { /* @@ -1144,7 +1148,7 @@ wal_write(struct journal *journal, struct journal_entry *entry) say_error("Aborting transaction %llu during " "cascading rollback", vclock_sum(&writer->vclock)); - return -1; + goto fail; } struct wal_msg *batch; @@ -1158,7 +1162,7 @@ wal_write(struct journal *journal, struct journal_entry *entry) if (batch == NULL) { diag_set(OutOfMemory, sizeof(struct wal_msg), "region", "struct wal_msg"); - return -1; + goto fail; } wal_msg_create(batch); /* @@ -1182,6 +1186,11 @@ wal_write(struct journal *journal, struct journal_entry *entry) fiber_yield(); /* Request was inserted. */ fiber_set_cancellable(cancellable); return entry->res; + +fail: + entry->res = -1; + journal_entry_complete(entry); + return -1; } int64_t @@ -1195,7 +1204,9 @@ wal_write_in_wal_mode_none(struct journal *journal, entry->rows + entry->n_rows); vclock_merge(&writer->vclock, &vclock_diff); vclock_copy(&replicaset.vclock, &writer->vclock); - return vclock_sum(&writer->vclock); + entry->res = vclock_sum(&writer->vclock); + journal_entry_complete(entry); + return entry->res; } void diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h index 49c59cf63..580942564 100644 --- a/src/lib/core/latch.h +++ b/src/lib/core/latch.h @@ -155,6 +155,16 @@ latch_trylock(struct latch *l) return latch_lock_timeout(l, 0); } +/** + * Take a latch ownership + */ +static inline void +latch_steal(struct latch *l) +{ + assert(l->owner != NULL); + l->owner = fiber(); +} + /** * \copydoc box_latch_unlock */ -- 2.22.0