[tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit
Georgy Kirichenko
georgy at tarantool.org
Sun Jun 9 23:44:40 MSK 2019
Allow asynchronous transaction commit. This adds two functions:
* txn_write, which sends a transaction to a journal;
* txn_wait, which waits until the transaction has been processed.
Prerequisites: #1254
---
src/box/box.cc | 24 ++---
src/box/journal.c | 22 +++--
src/box/journal.h | 40 +++++----
src/box/txn.c | 220 +++++++++++++++++++++++++++++++---------------
src/box/txn.h | 12 +++
src/box/vy_log.c | 2 +-
src/box/wal.c | 8 ++
7 files changed, 221 insertions(+), 107 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index d0616095b..510f3fc99 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -308,19 +308,21 @@ struct recovery_journal {
* min/max LSN of created LSM levels.
*/
static int64_t
-recovery_journal_async_write(struct journal *base,
- struct journal_entry *entry)
+recovery_journal_write(struct journal *base,
+ struct journal_entry *entry)
{
struct recovery_journal *journal = (struct recovery_journal *) base;
entry->res = vclock_sum(journal->vclock);
+ if (entry->on_done_cb != NULL)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
entry->done = true;
fiber_cond_broadcast(&entry->done_cond);
return 0;
}
static int64_t
-recovery_journal_async_wait(struct journal *base,
- struct journal_entry *entry)
+recovery_journal_wait(struct journal *base,
+ struct journal_entry *entry)
{
(void) base;
assert(entry->done);
@@ -328,20 +330,20 @@ recovery_journal_async_wait(struct journal *base,
}
static int64_t
-recovery_journal_write(struct journal *base,
- struct journal_entry *entry)
+recovery_journal_write_sync(struct journal *base,
+ struct journal_entry *entry)
{
- if (recovery_journal_async_write(base, entry) == 0)
- return recovery_journal_async_wait(base, entry);
+ if (recovery_journal_write(base, entry) == 0)
+ return recovery_journal_wait(base, entry);
return -1;
}
static inline void
recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
{
- journal_create(&journal->base, recovery_journal_write,
- recovery_journal_async_write,
- recovery_journal_async_wait,
+ journal_create(&journal->base, recovery_journal_write_sync,
+ recovery_journal_write,
+ recovery_journal_wait,
NULL);
journal->vclock = v;
}
diff --git a/src/box/journal.c b/src/box/journal.c
index b978e6752..dadff771e 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -37,17 +37,19 @@
* but txn_commit() must work.
*/
static int64_t
-dummy_async_write(struct journal *journal, struct journal_entry *entry)
+dummy_journal_write(struct journal *journal, struct journal_entry *entry)
{
(void) journal;
entry->res = 0;
+ if (entry->on_done_cb != NULL)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
entry->done = true;
fiber_cond_broadcast(&entry->done_cond);
return 0;
}
static int64_t
-dummy_async_wait(struct journal *journal, struct journal_entry *entry)
+dummy_journal_wait(struct journal *journal, struct journal_entry *entry)
{
(void) journal;
assert(entry->done);
@@ -55,24 +57,26 @@ dummy_async_wait(struct journal *journal, struct journal_entry *entry)
}
static int64_t
-dummy_journal_write(struct journal *journal, struct journal_entry *entry)
+dummy_journal_write_sync(struct journal *journal, struct journal_entry *entry)
{
- if (dummy_async_write(journal, entry) == 0)
- return dummy_async_wait(journal, entry);
+ if (dummy_journal_write(journal, entry) == 0)
+ return dummy_journal_wait(journal, entry);
return -1;
}
static struct journal dummy_journal = {
+ dummy_journal_write_sync,
dummy_journal_write,
- dummy_async_write,
- dummy_async_wait,
+ dummy_journal_wait,
NULL,
};
struct journal *current_journal = &dummy_journal;
struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region)
+journal_entry_new(size_t n_rows, struct region *region,
+ void (*on_done_cb)(struct journal_entry *entry, void *data),
+ void *on_done_cb_data)
{
struct journal_entry *entry;
@@ -90,6 +94,8 @@ journal_entry_new(size_t n_rows, struct region *region)
entry->res = -1;
entry->done = false;
fiber_cond_create(&entry->done_cond);
+ entry->on_done_cb = on_done_cb;
+ entry->on_done_cb_data = on_done_cb_data;
return entry;
}
diff --git a/src/box/journal.h b/src/box/journal.h
index e7fe9154a..c7e467969 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -63,6 +63,14 @@ struct journal_entry {
* Condition to broadcast when processing is done.
*/
struct fiber_cond done_cond;
+ /**
+ * Invoked by the journal before done is set. May be NULL.
+ */
+ void (*on_done_cb)(struct journal_entry *entry, void *data);
+ /**
+ * Opaque argument passed to on_done_cb as its data parameter.
+ */
+ void *on_done_cb_data;
/**
* Approximate size of this request when encoded.
*/
@@ -85,7 +93,9 @@ struct region;
* @return NULL if out of memory, fiber diagnostics area is set
*/
struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region);
+journal_entry_new(size_t n_rows, struct region *region,
+ void (*on_done_cb)(struct journal_entry *entry, void *data),
+ void *on_done_cb_data);
/**
* An API for an abstract journal for all transactions of this
@@ -93,12 +103,12 @@ journal_entry_new(size_t n_rows, struct region *region);
* synchronous replication.
*/
struct journal {
+ int64_t (*write_sync)(struct journal *journal,
+ struct journal_entry *req);
int64_t (*write)(struct journal *journal,
struct journal_entry *req);
- int64_t (*async_write)(struct journal *journal,
- struct journal_entry *req);
- int64_t (*async_wait)(struct journal *journal,
- struct journal_entry *req);
+ int64_t (*wait)(struct journal *journal,
+ struct journal_entry *req);
void (*destroy)(struct journal *journal);
};
@@ -115,9 +125,9 @@ extern struct journal *current_journal;
* or -1 on error.
*/
static inline int64_t
-journal_write(struct journal_entry *entry)
+journal_write_sync(struct journal_entry *entry)
{
- return current_journal->write(current_journal, entry);
+ return current_journal->write_sync(current_journal, entry);
}
/**
@@ -126,9 +136,9 @@ journal_write(struct journal_entry *entry)
* @return 0 if write was scheduled or -1 on error.
*/
static inline int64_t
-journal_async_write(struct journal_entry *entry)
+journal_write(struct journal_entry *entry)
{
- return current_journal->async_write(current_journal, entry);
+ return current_journal->write(current_journal, entry);
}
/**
@@ -137,9 +147,9 @@ journal_async_write(struct journal_entry *entry)
* or -1 on error.
*/
static inline int64_t
-journal_async_wait(struct journal_entry *entry)
+journal_wait(struct journal_entry *entry)
{
- return current_journal->async_wait(current_journal, entry);
+ return current_journal->wait(current_journal, entry);
}
/**
@@ -173,14 +183,14 @@ journal_set(struct journal *new_journal)
static inline void
journal_create(struct journal *journal,
+ int64_t (*write_sync)(struct journal *, struct journal_entry *),
int64_t (*write)(struct journal *, struct journal_entry *),
- int64_t (*async_write)(struct journal *, struct journal_entry *),
- int64_t (*async_wait)(struct journal *, struct journal_entry *),
+ int64_t (*wait)(struct journal *, struct journal_entry *),
void (*destroy)(struct journal *))
{
+ journal->write_sync = write_sync;
journal->write = write;
- journal->async_write = async_write,
- journal->async_wait = async_wait,
+ journal->wait = wait;
journal->destroy = destroy;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index a08652af1..815d635fe 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -199,6 +199,7 @@ txn_begin()
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
+ txn->entry = NULL;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
return txn;
@@ -318,22 +319,77 @@ fail:
return -1;
}
+/**
+ * Complete (commit, or roll back if signature < 0) a transaction.
+ */
+static void
+txn_complete(struct txn *txn)
+{
+ /* Already completed: journals set done after calling on_done_cb. */
+ if (txn->entry != NULL && txn->entry->done)
+ return;
+ if (txn->signature < 0) {
+ if (txn->engine)
+ engine_rollback(txn->engine, txn);
+ /* Rollback triggers must not throw. */
+ fiber_set_txn(fiber(), txn);
+ if (txn->has_triggers &&
+ trigger_run(&txn->on_rollback, txn) != 0) {
+ diag_log();
+ unreachable();
+ panic("rollback trigger failed");
+ }
+ fiber_set_txn(fiber(), NULL);
+ return;
+ }
+ /* Engine can be NULL if txn has IPROTO_NOP statements only. */
+ if (txn->engine != NULL)
+ engine_commit(txn->engine, txn);
+ /*
+ * The transaction is in the binary log. No action below
+ * may throw. In case an error has happened, there is
+ * no other option but terminate.
+ */
+ fiber_set_txn(fiber(), txn);
+ if (txn->has_triggers &&
+ trigger_run(&txn->on_commit, txn) != 0) {
+ diag_log();
+ unreachable();
+ panic("commit trigger failed");
+ }
+ fiber_set_txn(fiber(), NULL);
+}
+
+/** Journal completion callback: store the result, complete txn. */
+static void
+txn_entry_done_cb(struct journal_entry *entry, void *data)
+{
+ struct txn *txn = (struct txn *)data;
+ assert(txn->entry == entry);
+ txn->signature = entry->res;
+ txn_complete(txn);
+}
+
+/** Send txn to the journal. On failure txn is rolled back and freed. */
static int64_t
-txn_write_to_wal(struct txn *txn)
+txn_journal_write(struct txn *txn)
{
+ assert(txn->entry == NULL);
assert(txn->n_new_rows + txn->n_applier_rows > 0);
- struct journal_entry *req = journal_entry_new(txn->n_new_rows +
- txn->n_applier_rows,
- &txn->region);
- if (req == NULL) {
+ /* Prepare a journal entry. */
+ txn->entry = journal_entry_new(txn->n_new_rows +
+ txn->n_applier_rows,
+ &txn->region,
+ txn_entry_done_cb, txn);
+ if (txn->entry == NULL) {
txn_rollback(txn);
return -1;
}
struct txn_stmt *stmt;
- struct xrow_header **remote_row = req->rows;
- struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+ struct xrow_header **remote_row = txn->entry->rows;
+ struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
@@ -341,42 +397,39 @@ txn_write_to_wal(struct txn *txn)
*local_row++ = stmt->row;
else
*remote_row++ = stmt->row;
- req->approx_len += xrow_approx_len(stmt->row);
+ txn->entry->approx_len += xrow_approx_len(stmt->row);
}
- assert(remote_row == req->rows + txn->n_applier_rows);
+ assert(remote_row == txn->entry->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
- ev_tstamp start = ev_monotonic_now(loop());
- int64_t res = journal_write(req);
- ev_tstamp stop = ev_monotonic_now(loop());
-
- if (res < 0) {
- /* Cascading rollback. */
- txn_rollback(txn); /* Perform our part of cascading rollback. */
- /*
- * Move fiber to end of event loop to avoid
- * execution of any new requests before all
- * pending rollbacks are processed.
- */
- fiber_reschedule();
+ if (journal_write(txn->entry) < 0) {
+ txn_rollback(txn);
diag_set(ClientError, ER_WAL_IO);
- diag_log();
- } else if (stop - start > too_long_threshold) {
- int n_rows = txn->n_new_rows + txn->n_applier_rows;
- say_warn_ratelimited("too long WAL write: %d rows at "
- "LSN %lld: %.3f sec", n_rows,
- res - n_rows + 1, stop - start);
+ return -1;
}
- /*
- * Use vclock_sum() from WAL writer as transaction signature.
- */
- return res;
+ return 0;
}
-int
-txn_commit(struct txn *txn)
+/*
+ * Wait until the journal has processed txn->entry; return its signature.
+ */
+static int64_t
+txn_journal_wait(struct txn *txn)
+{
+ assert(txn->entry != NULL);
+ int64_t signature = journal_wait(txn->entry);
+ assert(signature == txn->signature);
+ if (signature < 0)
+ diag_set(ClientError, ER_WAL_IO);
+ return signature;
+}
+
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
{
- assert(txn == in_txn());
/*
* If transaction has been started in SQL, deferred
* foreign key constraints must not be violated.
@@ -386,7 +439,7 @@ txn_commit(struct txn *txn)
struct sql_txn *sql_txn = txn->psql_txn;
if (sql_txn->fk_deferred_count != 0) {
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
- goto fail;
+ return -1;
}
}
/*
@@ -394,42 +447,75 @@ txn_commit(struct txn *txn)
* we have a bunch of IPROTO_NOP statements.
*/
if (txn->engine != NULL) {
- if (engine_prepare(txn->engine, txn) != 0)
- goto fail;
+ if (engine_prepare(txn->engine, txn) != 0) {
+ return -1;
+ }
}
+ return 0;
+}
- if (txn->n_new_rows + txn->n_applier_rows > 0) {
- txn->signature = txn_write_to_wal(txn);
- if (txn->signature < 0)
- return -1;
+/*
+ * Send txn to the journal; on failure it is rolled back and freed.
+ */
+int
+txn_write(struct txn *txn)
+{
+ if (txn_prepare(txn) != 0)
+ goto fail;
+
+ txn->start_tm = ev_monotonic_now(loop());
+ if (txn->n_new_rows + txn->n_applier_rows == 0) {
+ /* Nothing to do. */
+ txn->signature = 0;
+ txn_complete(txn);
+ return 0;
}
- /*
- * Engine can be NULL if transaction contains IPROTO_NOP
- * statements only.
- */
- if (txn->engine != NULL)
- engine_commit(txn->engine, txn);
- /*
- * The transaction is in the binary log. No action below
- * may throw. In case an error has happened, there is
- * no other option but terminate.
- */
- if (txn->has_triggers &&
- trigger_run(&txn->on_commit, txn) != 0) {
- diag_log();
- unreachable();
- panic("commit trigger failed");
+ /* On failure txn_journal_write() has already freed txn. */
+ if (txn_journal_write(txn) != 0)
+ return -1;
+ fiber_set_txn(fiber(), NULL);
+ return 0;
+fail:
+ txn_rollback(txn);
+ return -1;
+}
+
+/*
+ * Wait until transaction processing is finished, then free txn.
+ */
+int
+txn_wait(struct txn *txn)
+{
+ if (txn->n_new_rows + txn->n_applier_rows > 0 &&
+ txn_journal_wait(txn) < 0)
+ goto fail;
+ ev_tstamp stop_tm = ev_monotonic_now(loop());
+ if (stop_tm - txn->start_tm > too_long_threshold) {
+ int n_rows = txn->n_new_rows + txn->n_applier_rows;
+ say_warn_ratelimited("too long WAL write: %d rows at "
+ "LSN %lld: %.3f sec", n_rows,
+ txn->signature - n_rows + 1,
+ stop_tm - txn->start_tm);
}
- fiber_set_txn(fiber(), NULL);
txn_free(txn);
return 0;
+
fail:
- txn_rollback(txn);
+ txn_free(txn);
return -1;
}
+int
+txn_commit(struct txn *txn)
+{
+ if (txn_write(txn) != 0 ||
+ txn_wait(txn) < 0)
+ return -1;
+ return 0;
+}
+
void
txn_rollback_stmt(struct txn *txn)
{
@@ -442,18 +528,8 @@ txn_rollback_stmt(struct txn *txn)
void
txn_rollback(struct txn *txn)
{
- assert(txn == in_txn());
- if (txn->engine)
- engine_rollback(txn->engine, txn);
- /* Rollback triggers must not throw. */
- if (txn->has_triggers &&
- trigger_run(&txn->on_rollback, txn) != 0) {
- diag_log();
- unreachable();
- panic("rollback trigger failed");
- }
-
- fiber_set_txn(fiber(), NULL);
+ txn->signature = -1;
+ txn_complete(txn);
txn_free(txn);
}
diff --git a/src/box/txn.h b/src/box/txn.h
index d211e5012..84d4f27d3 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -195,6 +195,12 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
+ /** Journal entry for the txn write; NULL until it is prepared. */
+ struct journal_entry *entry;
+ /** Completion trigger. NOTE(review): unused in this patch. */
+ struct trigger entry_done;
+ /** Journal write start time, for the too long WAL write warning. */
+ ev_tstamp start_tm;
};
/* Pointer to the current transaction (if any) */
@@ -228,6 +234,12 @@ txn_commit(struct txn *txn);
void
txn_rollback(struct txn *txn);
+int
+txn_write(struct txn *txn);
+
+int
+txn_wait(struct txn *txn);
+
/**
* Roll back the transaction but keep the object around.
* A special case for memtx transaction abort on yield. In this
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index bdc1cfa31..d37d44011 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -772,7 +772,7 @@ vy_log_flush(void)
tx_size++;
size_t used = region_used(&fiber()->gc);
- struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
+ struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
if (entry == NULL)
goto err;
diff --git a/src/box/wal.c b/src/box/wal.c
index e868a8e71..eff48b4fe 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -272,6 +272,8 @@ tx_schedule_f(va_list ap)
struct journal_entry *req =
stailq_shift_entry(&writer->schedule_queue,
struct journal_entry, fifo);
+ if (req->on_done_cb != NULL)
+ req->on_done_cb(req, req->on_done_cb_data);
req->done = true;
fiber_cond_broadcast(&req->done_cond);
}
@@ -1184,6 +1186,8 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
say_error("Aborting transaction %llu during "
"cascading rollback",
vclock_sum(&writer->vclock));
+ if (entry->on_done_cb != NULL)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
entry->done = true;
fiber_cond_broadcast(&entry->done_cond);
return -1;
@@ -1200,6 +1204,8 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
if (batch == NULL) {
diag_set(OutOfMemory, sizeof(struct wal_msg),
"region", "struct wal_msg");
+ if (entry->on_done_cb != NULL)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
entry->done = true;
fiber_cond_broadcast(&entry->done_cond);
return -1;
@@ -1253,6 +1259,8 @@ wal_async_write_in_wal_mode_none(struct journal *journal,
vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
entry->res = vclock_sum(&writer->vclock);
+ if (entry->on_done_cb != NULL)
+ entry->on_done_cb(entry, entry->on_done_cb_data);
entry->done = true;
fiber_cond_broadcast(&entry->done_cond);
return 0;
--
2.21.0
More information about the Tarantool-patches
mailing list