From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 2EA9B2CCC5 for ; Fri, 19 Apr 2019 08:44:17 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id bAo_30LU8rkI for ; Fri, 19 Apr 2019 08:44:17 -0400 (EDT) Received: from smtp50.i.mail.ru (smtp50.i.mail.ru [94.100.177.110]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id DC8D12CCC0 for ; Fri, 19 Apr 2019 08:44:16 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit Date: Fri, 19 Apr 2019 15:44:06 +0300 Message-Id: <11b51518548691fb892ccfa955438f000671af18.1555677159.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Allow asynchronous transaction commit. 
This adds two functions: * txn_async_commit, which sends a transaction to the journal * txn_async_wait, which waits until the transaction has been processed Prerequisites: #1254 --- src/box/txn.c | 121 +++++++++++++++++++++++++++++++++++++------------- src/box/txn.h | 10 +++++ 2 files changed, 99 insertions(+), 32 deletions(-) diff --git a/src/box/txn.c b/src/box/txn.c index 8f5b66480..eb57b861a 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -194,6 +194,7 @@ txn_begin() txn->engine = NULL; txn->engine_tx = NULL; txn->psql_txn = NULL; + txn->entry = NULL; /* fiber_on_yield/fiber_on_stop initialized by engine on demand */ fiber_set_txn(fiber(), txn); return txn; @@ -324,20 +325,24 @@ journal_write_error_cb(struct trigger *trigger, void *event) txn->engine = NULL; } +/* + * Send the txn to a journal. + */ static int64_t -txn_write_to_wal(struct txn *txn) +txn_journal_async_write(struct txn *txn) { + assert(txn->entry == NULL); assert(txn->n_new_rows + txn->n_applier_rows > 0); + /* Prepare a journal entry. */ struct journal_entry *req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows, &txn->region); if (req == NULL) return -1; - struct trigger on_error; - trigger_create(&on_error, journal_write_error_cb, txn, NULL); - journal_entry_on_error(req, &on_error); + trigger_create(&txn->on_error, journal_write_error_cb, txn, NULL); + journal_entry_on_error(req, &txn->on_error); struct txn_stmt *stmt; struct xrow_header **remote_row = req->rows; @@ -354,31 +359,34 @@ txn_write_to_wal(struct txn *txn) assert(remote_row == req->rows + txn->n_applier_rows); assert(local_row == remote_row + txn->n_new_rows); - ev_tstamp start = ev_monotonic_now(loop()); - int64_t res = journal_write(req); - ev_tstamp stop = ev_monotonic_now(loop()); - - if (res < 0) { - /* Cascading rollback. */ - txn_rollback(txn); /* Perform our part of cascading rollback. */ + txn->entry = req; + /* Send entry to a journal. 
*/ + if (journal_async_write(txn->entry) < 0) { diag_set(ClientError, ER_WAL_IO); - diag_log(); - } else if (stop - start > too_long_threshold) { - int n_rows = txn->n_new_rows + txn->n_applier_rows; - say_warn_ratelimited("too long WAL write: %d rows at " - "LSN %lld: %.3f sec", n_rows, - res - n_rows + 1, stop - start); + return -1; } - /* - * Use vclock_sum() from WAL writer as transaction signature. - */ - return res; + return 0; } -int -txn_commit(struct txn *txn) +/* + * Wait until journal processing finished. + */ +static int64_t +txn_journal_async_wait(struct txn *txn) +{ + assert(txn->entry != NULL); + txn->signature = journal_async_wait(txn->entry); + if (txn->signature < 0) + diag_set(ClientError, ER_WAL_IO); + return txn->signature; +} + +/* + * Prepare a transaction using engines. + */ +static int +txn_prepare(struct txn *txn) { - assert(txn == in_txn()); /* * If transaction has been started in SQL, deferred * foreign key constraints must not be violated. @@ -388,7 +396,7 @@ txn_commit(struct txn *txn) struct sql_txn *sql_txn = txn->psql_txn; if (sql_txn->fk_deferred_count != 0) { diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT); - goto fail; + return -1; } } /* @@ -396,15 +404,54 @@ txn_commit(struct txn *txn) * we have a bunch of IPROTO_NOP statements. */ if (txn->engine != NULL) { - if (engine_prepare(txn->engine, txn) != 0) - goto fail; + if (engine_prepare(txn->engine, txn) != 0) { + return -1; + } } + return 0; +} - if (txn->n_new_rows + txn->n_applier_rows > 0) { - txn->signature = txn_write_to_wal(txn); - if (txn->signature < 0) - return -1; +/* + * Send a transaction to a journal. 
+ */ +int +txn_async_commit(struct txn *txn) +{ + assert(txn == in_txn()); + if (txn_prepare(txn) != 0) + goto fail; + + txn->start_tm = ev_monotonic_now(loop()); + if (txn->n_new_rows + txn->n_applier_rows == 0) + return 0; + + if (txn_journal_async_write(txn) != 0) + goto fail; + + return 0; +fail: + txn_rollback(txn); + return -1; +} + +/* + * Wait until transaction processing was finished. + */ +int +txn_async_wait(struct txn *txn) +{ + if (txn->n_new_rows + txn->n_applier_rows > 0 && + txn_journal_async_wait(txn) < 0) + goto fail; + ev_tstamp stop_tm = ev_monotonic_now(loop()); + if (stop_tm - txn->start_tm > too_long_threshold) { + int n_rows = txn->n_new_rows + txn->n_applier_rows; + say_warn_ratelimited("too long WAL write: %d rows at " + "LSN %lld: %.3f sec", n_rows, + txn->signature - n_rows + 1, + stop_tm - txn->start_tm); } + /* * The transaction is in the binary log. No action below * may throw. In case an error has happened, there is @@ -417,7 +464,7 @@ txn_commit(struct txn *txn) panic("commit trigger failed"); } /* - * Engine can be NULL if transaction contains IPROTO_NOP + * Engine can be NULL if the transaction contains IPROTO_NOP * statements only. */ if (txn->engine != NULL) @@ -430,11 +477,21 @@ txn_commit(struct txn *txn) fiber_set_txn(fiber(), NULL); txn_free(txn); return 0; + fail: txn_rollback(txn); return -1; } +int +txn_commit(struct txn *txn) +{ + if (txn_async_commit(txn) != 0 || + txn_async_wait(txn) < 0) + return -1; + return 0; +} + void txn_rollback_stmt(struct txn *txn) { diff --git a/src/box/txn.h b/src/box/txn.h index 96719638b..09f086fa4 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -187,6 +187,16 @@ struct txn { /** Commit and rollback triggers */ struct rlist on_commit, on_rollback; struct sql_txn *psql_txn; + /** + * Trigger to call if write failed. + */ + struct trigger on_error; + /** + * Journal entry to control txn write. + */ + struct journal_entry *entry; + /** Timestamp of entry write start. 
*/ + ev_tstamp start_tm; }; /* Pointer to the current transaction (if any) */ -- 2.21.0