From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id B3EB230D4A for ; Wed, 19 Jun 2019 17:23:22 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id D7TtACNHuFHc for ; Wed, 19 Jun 2019 17:23:22 -0400 (EDT) Received: from smtp63.i.mail.ru (smtp63.i.mail.ru [217.69.128.43]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 508C130CFD for ; Wed, 19 Jun 2019 17:23:22 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback Date: Thu, 20 Jun 2019 00:23:13 +0300 Message-Id: <079a143aa17e185c44c5fe9c53ef207d7988fe56.1560978655.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Finalize a transaction through a journal entry completion callback, so that transaction processing no longer relies on fiber scheduling. This also enforces the order in which transactions are finalized, which matters because triggers might fail. 
Prerequisites: #1254 --- src/box/alter.cc | 5 +++ src/box/box.cc | 7 ++- src/box/journal.c | 10 ++++- src/box/journal.h | 12 ++++- src/box/txn.c | 112 ++++++++++++++++++++++++++++++---------------- src/box/vy_log.c | 3 +- src/box/wal.c | 20 ++++++++- 7 files changed, 122 insertions(+), 47 deletions(-) diff --git a/src/box/alter.cc b/src/box/alter.cc index a37a68ce4..aa6a79264 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void *event) { (void) trigger; (void) event; + /* + * A trigger could be processed by the wal scheduler fiber + * so steal the latch first. + */ + latch_steal(&schema_lock); latch_unlock(&schema_lock); } diff --git a/src/box/box.cc b/src/box/box.cc index 5e5cd2b08..2994363ab 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -309,10 +309,13 @@ struct recovery_journal { */ static int64_t recovery_journal_write(struct journal *base, - struct journal_entry * /* entry */) + struct journal_entry *entry) { struct recovery_journal *journal = (struct recovery_journal *) base; - return vclock_sum(journal->vclock); + entry->res = vclock_sum(journal->vclock); + if (entry->on_done_cb) + entry->on_done_cb(entry, entry->on_done_cb_data); + return entry->res; } static inline void diff --git a/src/box/journal.c b/src/box/journal.c index fe13fb6ee..eb0db9af2 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -41,7 +41,9 @@ static int64_t dummy_journal_write(struct journal *journal, struct journal_entry *entry) { (void) journal; - (void) entry; + entry->res = 0; + if (entry->on_done_cb) + entry->on_done_cb(entry, entry->on_done_cb_data); return 0; } @@ -53,7 +55,9 @@ static struct journal dummy_journal = { struct journal *current_journal = &dummy_journal; struct journal_entry * -journal_entry_new(size_t n_rows, struct region *region) +journal_entry_new(size_t n_rows, struct region *region, + void (*on_done_cb)(struct journal_entry *entry, void *data), + void *on_done_cb_data) { struct 
journal_entry *entry; @@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region) entry->n_rows = n_rows; entry->res = -1; entry->fiber = fiber(); + entry->on_done_cb = on_done_cb; + entry->on_done_cb_data = on_done_cb_data; return entry; } diff --git a/src/box/journal.h b/src/box/journal.h index 8ac32ee5e..b704b5c67 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -58,6 +58,14 @@ struct journal_entry { * The fiber issuing the request. */ struct fiber *fiber; + /** + * A journal entry completion callback. + */ + void (*on_done_cb)(struct journal_entry *entry, void *data); + /** + * A journal entry completion callback argument. + */ + void *on_done_cb_data; /** * Approximate size of this request when encoded. */ @@ -80,7 +88,9 @@ struct region; * @return NULL if out of memory, fiber diagnostics area is set */ struct journal_entry * -journal_entry_new(size_t n_rows, struct region *region); +journal_entry_new(size_t n_rows, struct region *region, + void (*on_done_cb)(struct journal_entry *entry, void *data), + void *on_done_cb_data); /** * An API for an abstract journal for all transactions of this diff --git a/src/box/txn.c b/src/box/txn.c index 21f7e98b4..52e16f3e6 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -337,6 +337,66 @@ fail: return -1; } +/** + * Complete transaction processing. + */ +static void +txn_complete(struct txn *txn) +{ + if (txn->signature < 0) { + if (txn->engine) + engine_rollback(txn->engine, txn); + /* + * Some of triggers require for in_txn variable is set so + * restore it for time a trigger is in progress. + */ + fiber_set_txn(fiber(), txn); + /* Rollback triggers must not throw. */ + if (txn->has_triggers && + trigger_run(&txn->on_rollback, txn) != 0) { + diag_log(); + unreachable(); + panic("rollback trigger failed"); + } + fiber_set_txn(fiber(), NULL); + + return; + } + /* + * Engine can be NULL if transaction contains IPROTO_NOP + * statements only. 
+ */ + if (txn->engine != NULL) + engine_commit(txn->engine, txn); + /* + * Some of triggers require for in_txn variable is set so + * restore it for time a trigger is in progress. + */ + fiber_set_txn(fiber(), txn); + /* + * The transaction is in the binary log. No action below + * may throw. In case an error has happened, there is + * no other option but terminate. + */ + if (txn->has_triggers && + trigger_run(&txn->on_commit, txn) != 0) { + diag_log(); + unreachable(); + panic("commit trigger failed"); + } + + fiber_set_txn(fiber(), NULL); +} + +static void +txn_entry_done_cb(struct journal_entry *entry, void *data) +{ + struct txn *txn = (struct txn *)data; + txn->signature = entry->res; + txn_complete(txn); +} + + static int64_t txn_write_to_wal(struct txn *txn) { @@ -344,7 +404,9 @@ txn_write_to_wal(struct txn *txn) struct journal_entry *req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows, - &txn->region); + &txn->region, + txn_entry_done_cb, + txn); if (req == NULL) { txn_rollback(txn); return -1; @@ -370,14 +432,6 @@ txn_write_to_wal(struct txn *txn) ev_tstamp stop = ev_monotonic_now(loop()); if (res < 0) { - /* Cascading rollback. */ - txn_rollback(txn); /* Perform our part of cascading rollback. */ - /* - * Move fiber to end of event loop to avoid - * execution of any new requests before all - * pending rollbacks are processed. - */ - fiber_reschedule(); diag_set(ClientError, ER_WAL_IO); diag_log(); } else if (stop - start > too_long_threshold) { @@ -418,31 +472,20 @@ txn_commit(struct txn *txn) } trigger_clear(&txn->fiber_on_stop); + fiber_set_txn(fiber(), NULL); if (txn->n_new_rows + txn->n_applier_rows > 0) { txn->signature = txn_write_to_wal(txn); if (txn->signature < 0) return -1; + } else { + /* + * However there is noting to write to wal a completion + * should be fired. + */ + txn->signature = 0; + txn_complete(txn); } - /* - * Engine can be NULL if transaction contains IPROTO_NOP - * statements only. 
- */ - if (txn->engine != NULL) - engine_commit(txn->engine, txn); - /* - * The transaction is in the binary log. No action below - * may throw. In case an error has happened, there is - * no other option but terminate. - */ - if (txn->has_triggers && - trigger_run(&txn->on_commit, txn) != 0) { - diag_log(); - unreachable(); - panic("commit trigger failed"); - } - - fiber_set_txn(fiber(), NULL); txn_free(txn); return 0; fail: @@ -464,17 +507,8 @@ txn_rollback(struct txn *txn) { assert(txn == in_txn()); trigger_clear(&txn->fiber_on_stop); - if (txn->engine) - engine_rollback(txn->engine, txn); - /* Rollback triggers must not throw. */ - if (txn->has_triggers && - trigger_run(&txn->on_rollback, txn) != 0) { - diag_log(); - unreachable(); - panic("rollback trigger failed"); - } - - fiber_set_txn(fiber(), NULL); + txn->signature = -1; + txn_complete(txn); txn_free(txn); } diff --git a/src/box/vy_log.c b/src/box/vy_log.c index bdc1cfa31..7cf5ff3e9 100644 --- a/src/box/vy_log.c +++ b/src/box/vy_log.c @@ -772,7 +772,8 @@ vy_log_flush(void) tx_size++; size_t used = region_used(&fiber()->gc); - struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc); + struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc, + NULL, NULL); if (entry == NULL) goto err; diff --git a/src/box/wal.c b/src/box/wal.c index 71f6dbb5c..62b6391fd 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -263,6 +263,8 @@ tx_schedule_f(va_list ap) struct journal_entry *req = stailq_shift_entry(&writer->schedule_queue, struct journal_entry, fifo); + if (req->on_done_cb != NULL) + req->on_done_cb(req, req->on_done_cb_data); fiber_wakeup(req->fiber); } writer->is_in_rollback = false; @@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; - ERROR_INJECT_RETURN(ERRINJ_WAL_IO); + ERROR_INJECT(ERRINJ_WAL_IO, { + entry->res = -1; + if (entry->on_done_cb != NULL) + entry->on_done_cb(entry, 
entry->on_done_cb_data); + return -1; + }); if (writer->is_in_rollback) { /* @@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct journal_entry *entry) say_error("Aborting transaction %llu during " "cascading rollback", vclock_sum(&writer->vclock)); + entry->res = -1; + if (entry->on_done_cb != NULL) + entry->on_done_cb(entry, entry->on_done_cb_data); return -1; } @@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct journal_entry *entry) if (batch == NULL) { diag_set(OutOfMemory, sizeof(struct wal_msg), "region", "struct wal_msg"); + entry->res = -1; + if (entry->on_done_cb != NULL) + entry->on_done_cb(entry, entry->on_done_cb_data); return -1; } wal_msg_create(batch); @@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal, entry->rows + entry->n_rows); vclock_merge(&writer->vclock, &vclock_diff); vclock_copy(&replicaset.vclock, &writer->vclock); - return vclock_sum(&writer->vclock); + entry->res = vclock_sum(&writer->vclock); + if (entry->on_done_cb) + entry->on_done_cb(entry, entry->on_done_cb_data); + return entry->res; } void -- 2.22.0