[tarantool-patches] [PATCH v5 5/7] txn: introduce asynchronous txn commit
Georgy Kirichenko
georgy at tarantool.org
Sat Jun 22 00:48:19 MSK 2019
This commit implements asynchronous transaction processing using
txn_write. The method prepares a transaction and sends it to a journal
without yielding, i.e. without waiting for the transaction to finish. The
transaction status can be tracked via on_commit/on_rollback triggers.
In order to support asynchronous transactions, the journal_write method is
turned into an asynchronous one, and the transaction engine now tracks the
journal status using the journal entry finalization callback.
Prerequisites: #1254
---
src/box/journal.c | 2 -
src/box/journal.h | 10 +--
src/box/txn.c | 153 ++++++++++++++++++++++++++++++----------------
src/box/txn.h | 16 +++++
src/box/wal.c | 29 ++-------
5 files changed, 125 insertions(+), 85 deletions(-)
diff --git a/src/box/journal.c b/src/box/journal.c
index 4c1997f36..d762613dd 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -30,7 +30,6 @@
*/
#include "journal.h"
#include <small/region.h>
-#include <fiber.h>
#include <diag.h>
/**
@@ -72,7 +71,6 @@ journal_entry_new(size_t n_rows, struct region *region,
entry->approx_len = 0;
entry->n_rows = n_rows;
entry->res = -1;
- entry->fiber = fiber();
entry->on_done_cb = on_done_cb;
entry->on_done_cb_data = on_done_cb_data;
return entry;
diff --git a/src/box/journal.h b/src/box/journal.h
index 52b8a715c..cac82c15e 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -33,6 +33,7 @@
#include <stdint.h>
#include <stdbool.h>
#include "salad/stailq.h"
+#include "fiber.h"
#if defined(__cplusplus)
extern "C" {
@@ -59,10 +60,6 @@ struct journal_entry {
* the committed transaction, on error is -1
*/
int64_t res;
- /**
- * The fiber issuing the request.
- */
- struct fiber *fiber;
/**
* A journal entry finalization callback which is going to be called
* after the entry processing was winished in both cases: succes
@@ -127,10 +124,9 @@ struct journal {
extern struct journal *current_journal;
/**
- * Record a single entry.
+ * Send a single entry to write.
*
- * @return a log sequence number (vclock signature) of the entry
- * or -1 on error.
+ * @return 0 if write was scheduled or -1 in case of an error.
*/
static inline int64_t
journal_write(struct journal_entry *entry)
diff --git a/src/box/txn.c b/src/box/txn.c
index 5825acc34..f331642f9 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -199,6 +199,9 @@ txn_begin()
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
+ txn->entry = NULL;
+ txn->fiber = NULL;
+ txn->done = false;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
@@ -374,7 +377,6 @@ txn_complete(struct txn *txn)
if (txn->has_triggers)
txn_process_trigger(&txn->on_rollback, txn);
- return;
} else {
/* Accept the transaction. */
/*
@@ -385,6 +387,27 @@ txn_complete(struct txn *txn)
engine_commit(txn->engine, txn);
if (txn->has_triggers)
txn_process_trigger(&txn->on_commit, txn);
+ ev_tstamp stop_tm = ev_monotonic_now(loop());
+ if (stop_tm - txn->start_tm > too_long_threshold) {
+ int n_rows = txn->n_new_rows + txn->n_applier_rows;
+ say_warn_ratelimited("too long WAL write: %d rows at "
+ "LSN %lld: %.3f sec", n_rows,
+ txn->signature - n_rows + 1,
+ stop_tm - txn->start_tm);
+ }
+ }
+ /*
+ * If there is no fiber waiting for the transaction then
+ * the transaction could be safely freed. In the opposite case
+ * the fiber is responsible for freeing this transaction.
+ */
+ if (txn->fiber == NULL)
+ txn_free(txn);
+ else {
+ txn->done = true;
+ if (txn->fiber != fiber())
+ /* Wake a waiting fiber up. */
+ fiber_wakeup(txn->fiber);
}
}
@@ -392,29 +415,30 @@ static void
txn_entry_done_cb(struct journal_entry *entry, void *data)
{
struct txn *txn = (struct txn *)data;
+ assert(txn->entry == entry);
txn->signature = entry->res;
txn_complete(txn);
}
-
static int64_t
txn_write_to_wal(struct txn *txn)
{
+ assert(txn->entry == NULL);
assert(txn->n_new_rows + txn->n_applier_rows > 0);
- struct journal_entry *req = journal_entry_new(txn->n_new_rows +
- txn->n_applier_rows,
- &txn->region,
- txn_entry_done_cb,
- txn);
- if (req == NULL) {
+ /* Prepare a journal entry. */
+ txn->entry = journal_entry_new(txn->n_new_rows +
+ txn->n_applier_rows,
+ &txn->region,
+ txn_entry_done_cb, txn);
+ if (txn->entry == NULL) {
txn_rollback(txn);
return -1;
}
struct txn_stmt *stmt;
- struct xrow_header **remote_row = req->rows;
- struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+ struct xrow_header **remote_row = txn->entry->rows;
+ struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
@@ -422,40 +446,26 @@ txn_write_to_wal(struct txn *txn)
*local_row++ = stmt->row;
else
*remote_row++ = stmt->row;
- req->approx_len += xrow_approx_len(stmt->row);
+ txn->entry->approx_len += xrow_approx_len(stmt->row);
}
- assert(remote_row == req->rows + txn->n_applier_rows);
+ assert(remote_row == txn->entry->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
- ev_tstamp start = ev_monotonic_now(loop());
- int64_t res = journal_write(req);
- ev_tstamp stop = ev_monotonic_now(loop());
-
- if (res < 0) {
+ /* Send entry to a journal. */
+ if (journal_write(txn->entry) < 0) {
diag_set(ClientError, ER_WAL_IO);
diag_log();
- /*
- * However, the transaction is rolled back by
- * finalization handler we are still in duty to
- * free it.
- */
- txn_free(txn);
- } else if (stop - start > too_long_threshold) {
- int n_rows = txn->n_new_rows + txn->n_applier_rows;
- say_warn_ratelimited("too long WAL write: %d rows at "
- "LSN %lld: %.3f sec", n_rows,
- res - n_rows + 1, stop - start);
+ return -1;
}
- /*
- * Use vclock_sum() from WAL writer as transaction signature.
- */
- return res;
+ return 0;
}
-int
-txn_commit(struct txn *txn)
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
{
- assert(txn == in_txn());
/*
* If transaction has been started in SQL, deferred
* foreign key constraints must not be violated.
@@ -465,7 +475,7 @@ txn_commit(struct txn *txn)
struct sql_txn *sql_txn = txn->psql_txn;
if (sql_txn->fk_deferred_count != 0) {
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
- goto fail;
+ return -1;
}
}
/*
@@ -473,33 +483,71 @@ txn_commit(struct txn *txn)
* we have a bunch of IPROTO_NOP statements.
*/
if (txn->engine != NULL) {
- if (engine_prepare(txn->engine, txn) != 0)
- goto fail;
+ if (engine_prepare(txn->engine, txn) != 0) {
+ return -1;
+ }
}
trigger_clear(&txn->fiber_on_stop);
+ return 0;
+}
+
+/*
+ * Send a transaction to a journal.
+ */
+int
+txn_write(struct txn *txn)
+{
+ if (txn_prepare(txn) != 0) {
+ txn_rollback(txn);
+ return -1;
+ }
/*
* After this transaction could not be used more
* so reset corresponding key in a fiber storage.
- */
+ */
fiber_set_txn(fiber(), NULL);
- if (txn->n_new_rows + txn->n_applier_rows > 0) {
- txn->signature = txn_write_to_wal(txn);
- if (txn->signature < 0)
- return -1;
- } else {
- /*
- * However, there is nothing to write to wal a finalization
- * should be fired.
- */
+ txn->start_tm = ev_monotonic_now(loop());
+ if (txn->n_new_rows + txn->n_applier_rows == 0) {
+ /* Nothing to do. */
txn->signature = 0;
txn_complete(txn);
+ return 0;
+ }
+
+ if (txn_write_to_wal(txn) != 0) {
+ /*
+ * After journal write the transaction would be finalized
+ * with its journal entry finalization callback,
+ * just return an error.
+ */
+ return -1;
}
- txn_free(txn);
return 0;
-fail:
- txn_rollback(txn);
- return -1;
+}
+
+int
+txn_commit(struct txn *txn)
+{
+ txn->fiber = fiber();
+
+ if (txn_write(txn) != 0)
+ return -1;
+ /*
+ * In case of non-yielding journal the transaction could already
+ * be done and there is nothing to wait in such cases.
+ */
+ if (!txn->done) {
+ bool cancellable = fiber_set_cancellable(false);
+ fiber_yield();
+ fiber_set_cancellable(cancellable);
+ }
+ int res = txn->signature >= 0? 0: -1;
+ if (res != 0)
+ diag_set(ClientError, ER_WAL_IO);
+ /* The current fiber is the one waiting for the transaction, so free it. */
+ txn_free(txn);
+ return res;
}
void
@@ -518,7 +566,6 @@ txn_rollback(struct txn *txn)
trigger_clear(&txn->fiber_on_stop);
txn->signature = -1;
txn_complete(txn);
- txn_free(txn);
fiber_set_txn(fiber(), NULL);
}
diff --git a/src/box/txn.h b/src/box/txn.h
index f20428fad..ddcac3bb9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -195,6 +195,16 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
+ /** Journal entry to control txn write. */
+ struct journal_entry *entry;
+ /** Transaction completion trigger. */
+ struct trigger entry_done;
+ /** Timestamp of entry write start. */
+ ev_tstamp start_tm;
+ /* A fiber to wake up when transaction is finished. */
+ struct fiber *fiber;
+ /* True when transaction is processed. */
+ bool done;
};
/* Pointer to the current transaction (if any) */
@@ -228,6 +238,12 @@ txn_commit(struct txn *txn);
void
txn_rollback(struct txn *txn);
+int
+txn_write(struct txn *txn);
+
+int
+txn_wait(struct txn *txn);
+
/**
* Roll back the transaction but keep the object around.
* A special case for memtx transaction abort on yield. In this
diff --git a/src/box/wal.c b/src/box/wal.c
index 4fa9beca0..dce5fee6b 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -245,13 +245,6 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
return xlog_tx_commit(l);
}
-/**
- * Invoke fibers waiting for their journal_entry's to be
- * completed. The fibers are invoked in strict fifo order:
- * this ensures that, in case of rollback, requests are
- * rolled back in strict reverse order, producing
- * a consistent database state.
- */
static void
tx_schedule_queue(struct stailq *queue)
{
@@ -259,10 +252,9 @@ tx_schedule_queue(struct stailq *queue)
* fiber_wakeup() is faster than fiber_call() when there
* are many ready fibers.
*/
- struct journal_entry *req;
- stailq_foreach_entry(req, queue, fifo) {
+ struct journal_entry *req, *tmp;
+ stailq_foreach_entry_safe(req, tmp, queue, fifo) {
journal_entry_complete(req);
- fiber_wakeup(req->fiber);
}
}
@@ -1126,9 +1118,9 @@ wal_writer_f(va_list ap)
/**
* WAL writer main entry point: queue a single request
- * to be written to disk and wait until this task is completed.
+ * to be written to disk.
*/
-int64_t
+static int64_t
wal_write(struct journal *journal, struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
@@ -1176,16 +1168,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
batch->approx_len += entry->approx_len;
writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
cpipe_flush_input(&writer->wal_pipe);
- /**
- * It's not safe to spuriously wakeup this fiber
- * since in that case it will ignore a possible
- * error from WAL writer and not roll back the
- * transaction.
- */
- bool cancellable = fiber_set_cancellable(false);
- fiber_yield(); /* Request was inserted. */
- fiber_set_cancellable(cancellable);
- return entry->res;
+ return 0;
fail:
entry->res = -1;
@@ -1193,7 +1176,7 @@ fail:
return -1;
}
-int64_t
+static int64_t
wal_write_in_wal_mode_none(struct journal *journal,
struct journal_entry *entry)
{
--
2.22.0
More information about the Tarantool-patches
mailing list