From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 9EEAE30D4B for ; Wed, 19 Jun 2019 17:23:23 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id 22cOtucbVATi for ; Wed, 19 Jun 2019 17:23:23 -0400 (EDT) Received: from smtp63.i.mail.ru (smtp63.i.mail.ru [217.69.128.43]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 3470E30CFD for ; Wed, 19 Jun 2019 17:23:23 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit Date: Thu, 20 Jun 2019 00:23:14 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko This commit implements asynchronous transaction processing using txn_write. The method prepares a transaction and sends it to the journal without yielding to wait for the transaction to finish. The transaction status can be tracked via on_commit/on_rollback triggers. To support asynchronous transactions, the journal_write method is turned into an asynchronous one: it now returns as soon as the entry is scheduled for writing, and the transaction engine tracks the journal status using the journal entry completion callback. txn_commit is reimplemented on top of txn_write: it sends the transaction to the journal and then waits until the transaction is done. 
Prerequisites: #1254 --- src/box/journal.c | 2 - src/box/journal.h | 10 +--- src/box/txn.c | 137 ++++++++++++++++++++++++++++++---------------- src/box/txn.h | 16 ++++++ src/box/wal.c | 23 +++----- 5 files changed, 116 insertions(+), 72 deletions(-) diff --git a/src/box/journal.c b/src/box/journal.c index eb0db9af2..b4f3515f0 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -30,7 +30,6 @@ */ #include "journal.h" #include -#include #include /** @@ -73,7 +72,6 @@ journal_entry_new(size_t n_rows, struct region *region, entry->approx_len = 0; entry->n_rows = n_rows; entry->res = -1; - entry->fiber = fiber(); entry->on_done_cb = on_done_cb; entry->on_done_cb_data = on_done_cb_data; return entry; diff --git a/src/box/journal.h b/src/box/journal.h index b704b5c67..e85ff2c9e 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -33,6 +33,7 @@ #include #include #include "salad/stailq.h" +#include "fiber.h" #if defined(__cplusplus) extern "C" { @@ -54,10 +55,6 @@ struct journal_entry { * the committed transaction, on error is -1 */ int64_t res; - /** - * The fiber issuing the request. - */ - struct fiber *fiber; /** * A journal entry completion callback. */ @@ -110,10 +107,9 @@ struct journal { extern struct journal *current_journal; /** - * Record a single entry. + * Send a single entry to write. * - * @return a log sequence number (vclock signature) of the entry - * or -1 on error. + * @return 0 if write was scheduled or -1 in case of an error. 
*/ static inline int64_t journal_write(struct journal_entry *entry) diff --git a/src/box/txn.c b/src/box/txn.c index 52e16f3e6..493bc2e3c 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -199,6 +199,9 @@ txn_begin() txn->engine = NULL; txn->engine_tx = NULL; txn->psql_txn = NULL; + txn->entry = NULL; + txn->fiber = NULL; + txn->done = false; /* fiber_on_yield/fiber_on_stop initialized by engine on demand */ fiber_set_txn(fiber(), txn); trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL); @@ -359,7 +362,11 @@ txn_complete(struct txn *txn) panic("rollback trigger failed"); } fiber_set_txn(fiber(), NULL); - + txn->done = true; + if (txn->fiber == NULL) + txn_free(txn); + else if (txn->fiber != fiber()) + fiber_wakeup(txn->fiber); return; } /* @@ -368,6 +375,15 @@ txn_complete(struct txn *txn) */ if (txn->engine != NULL) engine_commit(txn->engine, txn); + + ev_tstamp stop_tm = ev_monotonic_now(loop()); + if (stop_tm - txn->start_tm > too_long_threshold) { + int n_rows = txn->n_new_rows + txn->n_applier_rows; + say_warn_ratelimited("too long WAL write: %d rows at " + "LSN %lld: %.3f sec", n_rows, + txn->signature - n_rows + 1, + stop_tm - txn->start_tm); + } /* * Some of triggers require for in_txn variable is set so * restore it for time a trigger is in progress. @@ -378,6 +394,7 @@ txn_complete(struct txn *txn) * may throw. In case an error has happened, there is * no other option but terminate. 
*/ + fiber_set_txn(fiber(), txn); if (txn->has_triggers && trigger_run(&txn->on_commit, txn) != 0) { diag_log(); @@ -386,35 +403,41 @@ txn_complete(struct txn *txn) } fiber_set_txn(fiber(), NULL); + txn->done = true; + if (txn->fiber == NULL) + txn_free(txn); + else if (txn->fiber != fiber()) + fiber_wakeup(txn->fiber); } static void txn_entry_done_cb(struct journal_entry *entry, void *data) { struct txn *txn = (struct txn *)data; + assert(txn->entry == entry); txn->signature = entry->res; txn_complete(txn); } - static int64_t -txn_write_to_wal(struct txn *txn) +txn_journal_write(struct txn *txn) { + assert(txn->entry == NULL); assert(txn->n_new_rows + txn->n_applier_rows > 0); - struct journal_entry *req = journal_entry_new(txn->n_new_rows + - txn->n_applier_rows, - &txn->region, - txn_entry_done_cb, - txn); - if (req == NULL) { + /* Prepare a journal entry. */ + txn->entry = journal_entry_new(txn->n_new_rows + + txn->n_applier_rows, + &txn->region, + txn_entry_done_cb, txn); + if (txn->entry == NULL) { txn_rollback(txn); return -1; } struct txn_stmt *stmt; - struct xrow_header **remote_row = req->rows; - struct xrow_header **local_row = req->rows + txn->n_applier_rows; + struct xrow_header **remote_row = txn->entry->rows; + struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->row == NULL) continue; /* A read (e.g. 
select) request */ @@ -422,34 +445,25 @@ txn_write_to_wal(struct txn *txn) *local_row++ = stmt->row; else *remote_row++ = stmt->row; - req->approx_len += xrow_approx_len(stmt->row); + txn->entry->approx_len += xrow_approx_len(stmt->row); } - assert(remote_row == req->rows + txn->n_applier_rows); + assert(remote_row == txn->entry->rows + txn->n_applier_rows); assert(local_row == remote_row + txn->n_new_rows); - ev_tstamp start = ev_monotonic_now(loop()); - int64_t res = journal_write(req); - ev_tstamp stop = ev_monotonic_now(loop()); - - if (res < 0) { + /* Send entry to a journal. */ + if (journal_write(txn->entry) < 0) { diag_set(ClientError, ER_WAL_IO); - diag_log(); - } else if (stop - start > too_long_threshold) { - int n_rows = txn->n_new_rows + txn->n_applier_rows; - say_warn_ratelimited("too long WAL write: %d rows at " - "LSN %lld: %.3f sec", n_rows, - res - n_rows + 1, stop - start); + return -1; } - /* - * Use vclock_sum() from WAL writer as transaction signature. - */ - return res; + return 0; } -int -txn_commit(struct txn *txn) +/* + * Prepare a transaction using engines. + */ +static int +txn_prepare(struct txn *txn) { - assert(txn == in_txn()); /* * If transaction has been started in SQL, deferred * foreign key constraints must not be violated. @@ -459,7 +473,7 @@ txn_commit(struct txn *txn) struct sql_txn *sql_txn = txn->psql_txn; if (sql_txn->fk_deferred_count != 0) { diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT); - goto fail; + return -1; } } /* @@ -467,32 +481,63 @@ txn_commit(struct txn *txn) * we have a bunch of IPROTO_NOP statements. 
*/ if (txn->engine != NULL) { - if (engine_prepare(txn->engine, txn) != 0) - goto fail; + if (engine_prepare(txn->engine, txn) != 0) { + return -1; + } } trigger_clear(&txn->fiber_on_stop); + return 0; +} - fiber_set_txn(fiber(), NULL); - if (txn->n_new_rows + txn->n_applier_rows > 0) { - txn->signature = txn_write_to_wal(txn); - if (txn->signature < 0) - return -1; - } else { - /* - * However there is noting to write to wal a completion - * should be fired. - */ +/* + * Send a transaction to a journal. + */ +int +txn_write(struct txn *txn) +{ + if (txn_prepare(txn) != 0) + goto fail; + + txn->start_tm = ev_monotonic_now(loop()); + if (txn->n_new_rows + txn->n_applier_rows == 0) { + /* Nothing to do. */ txn->signature = 0; txn_complete(txn); + return 0; } - txn_free(txn); + if (txn_journal_write(txn) != 0) + return -1; + fiber_set_txn(fiber(), NULL); return 0; fail: txn_rollback(txn); return -1; } +int +txn_commit(struct txn *txn) +{ + txn->fiber = fiber(); + + if (txn_write(txn) != 0) + return -1; + /* + * In case of non-yielding journal the transaction could already + * be done and there is nothing to wait in such cases. + */ + if (!txn->done) { + bool cancellable = fiber_set_cancellable(false); + fiber_yield(); + fiber_set_cancellable(cancellable); + } + int res = txn->signature >= 0? 0: -1; + if (res != 0) + diag_set(ClientError, ER_WAL_IO); + txn_free(txn); + return res; +} + void txn_rollback_stmt(struct txn *txn) { @@ -505,11 +550,9 @@ txn_rollback_stmt(struct txn *txn) void txn_rollback(struct txn *txn) { - assert(txn == in_txn()); trigger_clear(&txn->fiber_on_stop); txn->signature = -1; txn_complete(txn); - txn_free(txn); } void diff --git a/src/box/txn.h b/src/box/txn.h index 569978ce9..bd6b695a9 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -195,6 +195,16 @@ struct txn { /** Commit and rollback triggers */ struct rlist on_commit, on_rollback; struct sql_txn *psql_txn; + /** Journal entry to control txn write. 
*/ + struct journal_entry *entry; + /** Transaction completion trigger. */ + struct trigger entry_done; + /** Timestampt of entry write start. */ + ev_tstamp start_tm; + /* A fiber to wake up when transaction is finished. */ + struct fiber *fiber; + /* True when transaction is processed. */ + bool done; }; /* Pointer to the current transaction (if any) */ @@ -228,6 +238,12 @@ txn_commit(struct txn *txn); void txn_rollback(struct txn *txn); +int +txn_write(struct txn *txn); + +int +txn_wait(struct txn *txn); + /** * Roll back the transaction but keep the object around. * A special case for memtx transaction abort on yield. In this diff --git a/src/box/wal.c b/src/box/wal.c index 62b6391fd..582ae4598 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -265,7 +265,6 @@ tx_schedule_f(va_list ap) struct journal_entry, fifo); if (req->on_done_cb != NULL) req->on_done_cb(req, req->on_done_cb_data); - fiber_wakeup(req->fiber); } writer->is_in_rollback = false; fiber_cond_wait(&writer->schedule_cond); @@ -274,7 +273,7 @@ tx_schedule_f(va_list ap) } /** - * Attach requests to a scheduling queue. + * Signal done condition. */ static void tx_schedule_queue(struct stailq *queue) @@ -380,7 +379,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, writer->wal_max_size = wal_max_size; writer->is_in_rollback = false; journal_create(&writer->base, wal_mode == WAL_NONE ? - wal_write_in_wal_mode_none : wal_write, NULL); + wal_write_in_wal_mode_none : wal_write, + NULL); struct xlog_opts opts = xlog_opts_default; opts.sync_is_async = true; @@ -1153,9 +1153,9 @@ wal_writer_f(va_list ap) /** * WAL writer main entry point: queue a single request - * to be written to disk and wait until this task is completed. + * to be written to disk. 
*/ -int64_t +static int64_t wal_write(struct journal *journal, struct journal_entry *entry) { struct wal_writer *writer = (struct wal_writer *) journal; @@ -1212,19 +1212,10 @@ wal_write(struct journal *journal, struct journal_entry *entry) batch->approx_len += entry->approx_len; writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX; cpipe_flush_input(&writer->wal_pipe); - /** - * It's not safe to spuriously wakeup this fiber - * since in that case it will ignore a possible - * error from WAL writer and not roll back the - * transaction. - */ - bool cancellable = fiber_set_cancellable(false); - fiber_yield(); /* Request was inserted. */ - fiber_set_cancellable(cancellable); - return entry->res; + return 0; } -int64_t +static int64_t wal_write_in_wal_mode_none(struct journal *journal, struct journal_entry *entry) { -- 2.22.0