From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko <georgy@tarantool.org>
Subject: [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit
Date: Fri, 19 Apr 2019 15:44:06 +0300 [thread overview]
Message-ID: <11b51518548691fb892ccfa955438f000671af18.1555677159.git.georgy@tarantool.org> (raw)
In-Reply-To: <cover.1555677159.git.georgy@tarantool.org>
Allow asynchronous transaction commit. This adds two functions:
* txn_async_commit that sends a transaction to a journal
* txn_async_wait that waits until the transaction processing was done
Prerequisites: #1254
---
src/box/txn.c | 121 +++++++++++++++++++++++++++++++++++++-------------
src/box/txn.h | 10 +++++
2 files changed, 99 insertions(+), 32 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 8f5b66480..eb57b861a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -194,6 +194,7 @@ txn_begin()
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
+ txn->entry = NULL;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
return txn;
@@ -324,20 +325,24 @@ journal_write_error_cb(struct trigger *trigger, void *event)
txn->engine = NULL;
}
+/*
+ * Send the txn to a journal.
+ */
static int64_t
-txn_write_to_wal(struct txn *txn)
+txn_journal_async_write(struct txn *txn)
{
+ assert(txn->entry == NULL);
assert(txn->n_new_rows + txn->n_applier_rows > 0);
+ /* Prepare a journal entry. */
struct journal_entry *req = journal_entry_new(txn->n_new_rows +
txn->n_applier_rows,
&txn->region);
if (req == NULL)
return -1;
- struct trigger on_error;
- trigger_create(&on_error, journal_write_error_cb, txn, NULL);
- journal_entry_on_error(req, &on_error);
+ trigger_create(&txn->on_error, journal_write_error_cb, txn, NULL);
+ journal_entry_on_error(req, &txn->on_error);
struct txn_stmt *stmt;
struct xrow_header **remote_row = req->rows;
@@ -354,31 +359,34 @@ txn_write_to_wal(struct txn *txn)
assert(remote_row == req->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
- ev_tstamp start = ev_monotonic_now(loop());
- int64_t res = journal_write(req);
- ev_tstamp stop = ev_monotonic_now(loop());
-
- if (res < 0) {
- /* Cascading rollback. */
- txn_rollback(txn); /* Perform our part of cascading rollback. */
+ txn->entry = req;
+ /* Send entry to a journal. */
+ if (journal_async_write(txn->entry) < 0) {
diag_set(ClientError, ER_WAL_IO);
- diag_log();
- } else if (stop - start > too_long_threshold) {
- int n_rows = txn->n_new_rows + txn->n_applier_rows;
- say_warn_ratelimited("too long WAL write: %d rows at "
- "LSN %lld: %.3f sec", n_rows,
- res - n_rows + 1, stop - start);
+ return -1;
}
- /*
- * Use vclock_sum() from WAL writer as transaction signature.
- */
- return res;
+ return 0;
}
-int
-txn_commit(struct txn *txn)
+/*
+ * Wait until journal processing finished.
+ */
+static int64_t
+txn_journal_async_wait(struct txn *txn)
+{
+ assert(txn->entry != NULL);
+ txn->signature = journal_async_wait(txn->entry);
+ if (txn->signature < 0)
+ diag_set(ClientError, ER_WAL_IO);
+ return txn->signature;
+}
+
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
{
- assert(txn == in_txn());
/*
* If transaction has been started in SQL, deferred
* foreign key constraints must not be violated.
@@ -388,7 +396,7 @@ txn_commit(struct txn *txn)
struct sql_txn *sql_txn = txn->psql_txn;
if (sql_txn->fk_deferred_count != 0) {
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
- goto fail;
+ return -1;
}
}
/*
@@ -396,15 +404,54 @@ txn_commit(struct txn *txn)
* we have a bunch of IPROTO_NOP statements.
*/
if (txn->engine != NULL) {
- if (engine_prepare(txn->engine, txn) != 0)
- goto fail;
+ if (engine_prepare(txn->engine, txn) != 0) {
+ return -1;
+ }
}
+ return 0;
+}
- if (txn->n_new_rows + txn->n_applier_rows > 0) {
- txn->signature = txn_write_to_wal(txn);
- if (txn->signature < 0)
- return -1;
+/*
+ * Send a transaction to a journal.
+ */
+int
+txn_async_commit(struct txn *txn)
+{
+ assert(txn == in_txn());
+ if (txn_prepare(txn) != 0)
+ goto fail;
+
+ txn->start_tm = ev_monotonic_now(loop());
+ if (txn->n_new_rows + txn->n_applier_rows == 0)
+ return 0;
+
+ if (txn_journal_async_write(txn) != 0)
+ goto fail;
+
+ return 0;
+fail:
+ txn_rollback(txn);
+ return -1;
+}
+
+/*
+ * Wait until transaction processing was finished.
+ */
+int
+txn_async_wait(struct txn *txn)
+{
+ if (txn->n_new_rows + txn->n_applier_rows > 0 &&
+ txn_journal_async_wait(txn) < 0)
+ goto fail;
+ ev_tstamp stop_tm = ev_monotonic_now(loop());
+ if (stop_tm - txn->start_tm > too_long_threshold) {
+ int n_rows = txn->n_new_rows + txn->n_applier_rows;
+ say_warn_ratelimited("too long WAL write: %d rows at "
+ "LSN %lld: %.3f sec", n_rows,
+ txn->signature - n_rows + 1,
+ stop_tm - txn->start_tm);
}
+
/*
* The transaction is in the binary log. No action below
* may throw. In case an error has happened, there is
@@ -417,7 +464,7 @@ txn_commit(struct txn *txn)
panic("commit trigger failed");
}
/*
- * Engine can be NULL if transaction contains IPROTO_NOP
+ * Engine can be NULL if the transaction contains IPROTO_NOP
* statements only.
*/
if (txn->engine != NULL)
@@ -430,11 +477,21 @@ txn_commit(struct txn *txn)
fiber_set_txn(fiber(), NULL);
txn_free(txn);
return 0;
+
fail:
txn_rollback(txn);
return -1;
}
+int
+txn_commit(struct txn *txn)
+{
+ if (txn_async_commit(txn) != 0 ||
+ txn_async_wait(txn) < 0)
+ return -1;
+ return 0;
+}
+
void
txn_rollback_stmt(struct txn *txn)
{
diff --git a/src/box/txn.h b/src/box/txn.h
index 96719638b..09f086fa4 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -187,6 +187,16 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
+ /**
+ * Trigger to call if write failed.
+ */
+ struct trigger on_error;
+ /**
+ * Journal entry to control txn write.
+ */
+ struct journal_entry *entry;
+ /** Timestampt of entry write start. */
+ ev_tstamp start_tm;
};
/* Pointer to the current transaction (if any) */
--
2.21.0
next prev parent reply other threads:[~2019-04-19 12:44 UTC|newest]
Thread overview: 21+ messages / expand[flat|nested] mbox.gz Atom feed top
2019-04-19 12:43 [tarantool-patches] [PATCH 00/10] Transaction refactoring Georgy Kirichenko
2019-04-19 12:43 ` [tarantool-patches] [PATCH 01/10] Introduce a txn memory region Georgy Kirichenko
2019-04-24 18:20 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:43 ` [tarantool-patches] [PATCH 02/10] Alloc journal entry on " Georgy Kirichenko
2019-04-24 18:21 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:43 ` [tarantool-patches] [PATCH 03/10] Encode a dml statement to a transaction " Georgy Kirichenko
2019-04-24 18:28 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 04/10] Get rid of autocommit from a txn structure Georgy Kirichenko
2019-04-24 19:07 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 05/10] Get rid of fiber_gc from txn_rollback Georgy Kirichenko
2019-04-24 19:12 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 06/10] Require for txn in case of txn_begin_stmt/txn_rollback_stmt Georgy Kirichenko
2019-04-24 19:13 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 07/10] Remove fiber from a journal_entry structure Georgy Kirichenko
2019-04-24 19:16 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 08/10] Use mempool to alloc wal messages Georgy Kirichenko
2019-04-24 19:18 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` [tarantool-patches] [PATCH 09/10] Enable asyncronous wal writes Georgy Kirichenko
2019-04-24 19:19 ` [tarantool-patches] " Konstantin Osipov
2019-04-19 12:44 ` Georgy Kirichenko [this message]
2019-04-24 19:20 ` [tarantool-patches] Re: [PATCH 10/10] Introduce asynchronous txn commit Konstantin Osipov
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=11b51518548691fb892ccfa955438f000671af18.1555677159.git.georgy@tarantool.org \
--to=georgy@tarantool.org \
--cc=tarantool-patches@freelists.org \
--subject='Re: [tarantool-patches] [PATCH 10/10] Introduce asynchronous txn commit' \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox