From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 6E67F2F2EB for ; Thu, 23 May 2019 04:21:40 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id C_N0h0PLzeyo for ; Thu, 23 May 2019 04:21:40 -0400 (EDT) Received: from smtp37.i.mail.ru (smtp37.i.mail.ru [94.100.177.97]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 630D72F2A0 for ; Thu, 23 May 2019 04:19:48 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH v2 8/8] Introduce asynchronous txn commit Date: Thu, 23 May 2019 11:19:40 +0300 Message-Id: <42f0c89b62a99f2a4109aa43a59d4db472e6c59c.1558598679.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Allow asynchronous transaction commit. 
This adds two functions: * txn_async_commit that sends a transaction to a journal * txn_async_wait that waits until the transaction processing is done Prerequisites: #1254 --- src/box/txn.c | 117 +++++++++++++++++++++++++++++++++++--------------- src/box/txn.h | 6 +++ 2 files changed, 89 insertions(+), 34 deletions(-) diff --git a/src/box/txn.c b/src/box/txn.c index 273964d51..ae18db168 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -195,6 +195,7 @@ txn_begin() txn->engine = NULL; txn->engine_tx = NULL; txn->psql_txn = NULL; + txn->entry = NULL; /* fiber_on_yield/fiber_on_stop initialized by engine on demand */ fiber_set_txn(fiber(), txn); return txn; @@ -315,10 +316,12 @@ fail: } static int64_t -txn_write_to_wal(struct txn *txn) +txn_journal_async_write(struct txn *txn) { + assert(txn->entry == NULL); assert(txn->n_new_rows + txn->n_applier_rows > 0); + /* Prepare a journal entry. */ struct journal_entry *req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows, &txn->region); @@ -340,37 +343,34 @@ txn_write_to_wal(struct txn *txn) assert(remote_row == req->rows + txn->n_applier_rows); assert(local_row == remote_row + txn->n_new_rows); - ev_tstamp start = ev_monotonic_now(loop()); - int64_t res = journal_write(req); - ev_tstamp stop = ev_monotonic_now(loop()); - - if (res < 0) { - /* Cascading rollback. */ - txn_rollback(txn); /* Perform our part of cascading rollback. */ - /* - * Move fiber to end of event loop to avoid - * execution of any new requests before all - * pending rollbacks are processed. - */ - fiber_reschedule(); + txn->entry = req; + /* Send entry to a journal. 
*/ + if (journal_async_write(txn->entry) < 0) { diag_set(ClientError, ER_WAL_IO); - diag_log(); - } else if (stop - start > too_long_threshold) { - int n_rows = txn->n_new_rows + txn->n_applier_rows; - say_warn_ratelimited("too long WAL write: %d rows at " - "LSN %lld: %.3f sec", n_rows, - res - n_rows + 1, stop - start); + return -1; } - /* - * Use vclock_sum() from WAL writer as transaction signature. - */ - return res; + return 0; } -int -txn_commit(struct txn *txn) +/* + * Wait until journal processing finished. + */ +static int64_t +txn_journal_async_wait(struct txn *txn) +{ + assert(txn->entry != NULL); + txn->signature = journal_async_wait(txn->entry); + if (txn->signature < 0) + diag_set(ClientError, ER_WAL_IO); + return txn->signature; +} + +/* + * Prepare a transaction using engines. + */ +static int +txn_prepare(struct txn *txn) { - assert(txn == in_txn()); /* * If transaction has been started in SQL, deferred * foreign key constraints must not be violated. @@ -380,7 +380,7 @@ txn_commit(struct txn *txn) struct sql_txn *sql_txn = txn->psql_txn; if (sql_txn->fk_deferred_count != 0) { diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT); - goto fail; + return -1; } } /* @@ -388,15 +388,54 @@ txn_commit(struct txn *txn) * we have a bunch of IPROTO_NOP statements. */ if (txn->engine != NULL) { - if (engine_prepare(txn->engine, txn) != 0) - goto fail; + if (engine_prepare(txn->engine, txn) != 0) { + return -1; + } } + return 0; +} - if (txn->n_new_rows + txn->n_applier_rows > 0) { - txn->signature = txn_write_to_wal(txn); - if (txn->signature < 0) - return -1; +/* + * Send a transaction to a journal. 
+ */ + int +txn_async_commit(struct txn *txn) +{ + assert(txn == in_txn()); + if (txn_prepare(txn) != 0) + goto fail; + + txn->start_tm = ev_monotonic_now(loop()); + if (txn->n_new_rows + txn->n_applier_rows == 0) + return 0; + + if (txn_journal_async_write(txn) != 0) + goto fail; + + return 0; +fail: + txn_rollback(txn); + return -1; +} + +/* + * Wait until transaction processing was finished. + */ +int +txn_async_wait(struct txn *txn) +{ + if (txn->n_new_rows + txn->n_applier_rows > 0 && + txn_journal_async_wait(txn) < 0) + goto fail; + ev_tstamp stop_tm = ev_monotonic_now(loop()); + if (stop_tm - txn->start_tm > too_long_threshold) { + int n_rows = txn->n_new_rows + txn->n_applier_rows; + say_warn_ratelimited("too long WAL write: %d rows at " + "LSN %lld: %.3f sec", n_rows, + txn->signature - n_rows + 1, + stop_tm - txn->start_tm); } + /* * Engine can be NULL if transaction contains IPROTO_NOP * statements only. @@ -422,11 +461,21 @@ txn_commit(struct txn *txn) fiber_set_txn(fiber(), NULL); txn_free(txn); return 0; + fail: txn_rollback(txn); return -1; } +int +txn_commit(struct txn *txn) +{ + if (txn_async_commit(txn) != 0 || + txn_async_wait(txn) < 0) + return -1; + return 0; +} + void txn_rollback_stmt(struct txn *txn) { diff --git a/src/box/txn.h b/src/box/txn.h index 569978ce9..0b04b5488 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -195,6 +195,12 @@ struct txn { /** Commit and rollback triggers */ struct rlist on_commit, on_rollback; struct sql_txn *psql_txn; + /** + * Journal entry to control txn write. + */ + struct journal_entry *entry; + /** Timestamp of entry write start. */ + ev_tstamp start_tm; }; /* Pointer to the current transaction (if any) */ -- 2.21.0