[tarantool-patches] [PATCH v4 7/9] txn: introduce asynchronous txn commit

Georgy Kirichenko georgy at tarantool.org
Thu Jun 20 00:23:14 MSK 2019


This commit implements asynchronous transaction processing using
txn_write(). The function prepares a transaction and sends it to the
journal without yielding until the transaction is finished. The
transaction outcome can be tracked via on_commit/on_rollback triggers.
txn_commit() keeps its synchronous behavior: it calls txn_write() and
then waits for the transaction to complete.
To support asynchronous transactions, journal_write() now only
schedules the write, and the transaction learns the result through
the journal entry completion callback.

Prerequisites: #1254
---
 src/box/journal.c |   2 -
 src/box/journal.h |  10 +---
 src/box/txn.c     | 137 ++++++++++++++++++++++++++++++----------------
 src/box/txn.h     |  16 ++++++
 src/box/wal.c     |  23 +++-----
 5 files changed, 116 insertions(+), 72 deletions(-)

diff --git a/src/box/journal.c b/src/box/journal.c
index eb0db9af2..b4f3515f0 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -30,7 +30,6 @@
  */
 #include "journal.h"
 #include <small/region.h>
-#include <fiber.h>
 #include <diag.h>
 
 /**
@@ -73,7 +72,6 @@ journal_entry_new(size_t n_rows, struct region *region,
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
 	entry->res = -1;
-	entry->fiber = fiber();
 	entry->on_done_cb = on_done_cb;
 	entry->on_done_cb_data = on_done_cb_data;
 	return entry;
diff --git a/src/box/journal.h b/src/box/journal.h
index b704b5c67..e85ff2c9e 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -33,6 +33,7 @@
 #include <stdint.h>
 #include <stdbool.h>
 #include "salad/stailq.h"
+#include "fiber.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -54,10 +55,6 @@ struct journal_entry {
 	 * the committed transaction, on error is -1
 	 */
 	int64_t res;
-	/**
-	 * The fiber issuing the request.
-	 */
-	struct fiber *fiber;
 	/**
 	 * A journal entry completion callback.
 	 */
@@ -110,10 +107,9 @@ struct journal {
 extern struct journal *current_journal;
 
 /**
- * Record a single entry.
+ * Send a single entry to write; completion is reported via on_done_cb.
  *
- * @return   a log sequence number (vclock signature) of the entry
- *           or -1 on error.
+ * @return 0 if the write was scheduled, -1 on error.
  */
 static inline int64_t
 journal_write(struct journal_entry *entry)
diff --git a/src/box/txn.c b/src/box/txn.c
index 52e16f3e6..493bc2e3c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -199,6 +199,9 @@ txn_begin()
 	txn->engine = NULL;
 	txn->engine_tx = NULL;
 	txn->psql_txn = NULL;
+	txn->entry = NULL;
+	txn->fiber = NULL;
+	txn->done = false;
 	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
 	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
@@ -359,7 +362,11 @@ txn_complete(struct txn *txn)
 			panic("rollback trigger failed");
 		}
 		fiber_set_txn(fiber(), NULL);
-
+		txn->done = true;
+		if (txn->fiber == NULL)
+			txn_free(txn);
+		else if (txn->fiber != fiber())
+			fiber_wakeup(txn->fiber);
 		return;
 	}
 	/*
@@ -368,6 +375,15 @@ txn_complete(struct txn *txn)
 	 */
 	if (txn->engine != NULL)
 		engine_commit(txn->engine, txn);
+
+	ev_tstamp stop_tm = ev_monotonic_now(loop());
+	if (stop_tm - txn->start_tm > too_long_threshold) {
+		int n_rows = txn->n_new_rows + txn->n_applier_rows;
+		say_warn_ratelimited("too long WAL write: %d rows at "
+				     "LSN %lld: %.3f sec", n_rows,
+				     txn->signature - n_rows + 1,
+				     stop_tm - txn->start_tm);
+	}
 	/*
 	 * Some of triggers require for in_txn variable is set so
 	 * restore it for time a trigger is in progress.
@@ -378,6 +394,7 @@ txn_complete(struct txn *txn)
 	 * may throw. In case an error has happened, there is
 	 * no other option but terminate.
 	 */
+	fiber_set_txn(fiber(), txn);
 	if (txn->has_triggers &&
 	    trigger_run(&txn->on_commit, txn) != 0) {
 		diag_log();
@@ -386,35 +403,41 @@ txn_complete(struct txn *txn)
 	}
 
 	fiber_set_txn(fiber(), NULL);
+	txn->done = true;
+	if (txn->fiber == NULL)
+		txn_free(txn);
+	else if (txn->fiber != fiber())
+		fiber_wakeup(txn->fiber);
 }
 
 static void
 txn_entry_done_cb(struct journal_entry *entry, void *data)
 {
 	struct txn *txn = (struct txn *)data;
+	assert(txn->entry == entry);
 	txn->signature = entry->res;
 	txn_complete(txn);
 }
 
-
 static int64_t
-txn_write_to_wal(struct txn *txn)
+txn_journal_write(struct txn *txn)
 {
+	assert(txn->entry == NULL);
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
-	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
-						      txn->n_applier_rows,
-						      &txn->region,
-						      txn_entry_done_cb,
-						      txn);
-	if (req == NULL) {
+	/* Prepare a journal entry. */
+	txn->entry = journal_entry_new(txn->n_new_rows +
+				       txn->n_applier_rows,
+				       &txn->region,
+				       txn_entry_done_cb, txn);
+	if (txn->entry == NULL) {
 		txn_rollback(txn);
 		return -1;
 	}
 
 	struct txn_stmt *stmt;
-	struct xrow_header **remote_row = req->rows;
-	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+	struct xrow_header **remote_row = txn->entry->rows;
+	struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
@@ -422,34 +445,25 @@ txn_write_to_wal(struct txn *txn)
 			*local_row++ = stmt->row;
 		else
 			*remote_row++ = stmt->row;
-		req->approx_len += xrow_approx_len(stmt->row);
+		txn->entry->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(remote_row == req->rows + txn->n_applier_rows);
+	assert(remote_row == txn->entry->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
-	ev_tstamp start = ev_monotonic_now(loop());
-	int64_t res = journal_write(req);
-	ev_tstamp stop = ev_monotonic_now(loop());
-
-	if (res < 0) {
+	/* Send entry to a journal. */
+	if (journal_write(txn->entry) < 0) {
 		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-	} else if (stop - start > too_long_threshold) {
-		int n_rows = txn->n_new_rows + txn->n_applier_rows;
-		say_warn_ratelimited("too long WAL write: %d rows at "
-				     "LSN %lld: %.3f sec", n_rows,
-				     res - n_rows + 1, stop - start);
+		return -1;
 	}
-	/*
-	 * Use vclock_sum() from WAL writer as transaction signature.
-	 */
-	return res;
+	return 0;
 }
 
-int
-txn_commit(struct txn *txn)
+/**
+ * Prepare a transaction: check deferred SQL FKs, then engine_prepare().
+ */
+static int
+txn_prepare(struct txn *txn)
 {
-	assert(txn == in_txn());
 	/*
 	 * If transaction has been started in SQL, deferred
 	 * foreign key constraints must not be violated.
@@ -459,7 +473,7 @@ txn_commit(struct txn *txn)
 		struct sql_txn *sql_txn = txn->psql_txn;
 		if (sql_txn->fk_deferred_count != 0) {
 			diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
-			goto fail;
+			return -1;
 		}
 	}
 	/*
@@ -467,32 +481,76 @@ txn_commit(struct txn *txn)
 	 * we have a bunch of IPROTO_NOP statements.
 	 */
 	if (txn->engine != NULL) {
-		if (engine_prepare(txn->engine, txn) != 0)
-			goto fail;
+		if (engine_prepare(txn->engine, txn) != 0) {
+			return -1;
+		}
 	}
 	trigger_clear(&txn->fiber_on_stop);
+	return 0;
+}
 
-	fiber_set_txn(fiber(), NULL);
-	if (txn->n_new_rows + txn->n_applier_rows > 0) {
-		txn->signature = txn_write_to_wal(txn);
-		if (txn->signature < 0)
-			return -1;
-	} else {
-		/*
-		 * However there is noting to write to wal a completion
-		 * should be fired.
-		 */
+/**
+ * Prepare a transaction and send it to a journal without
+ * waiting for the write to finish. The outcome is reported
+ * through on_commit/on_rollback triggers. If txn->fiber is
+ * NULL, the transaction is freed once it is completed.
+ */
+int
+txn_write(struct txn *txn)
+{
+	if (txn_prepare(txn) != 0)
+		goto fail;
+
+	txn->start_tm = ev_monotonic_now(loop());
+	if (txn->n_new_rows + txn->n_applier_rows == 0) {
+		/* Nothing to do. */
 		txn->signature = 0;
 		txn_complete(txn);
+		return 0;
 	}
 
-	txn_free(txn);
+	if (txn_journal_write(txn) != 0)
+		return -1;
+	fiber_set_txn(fiber(), NULL);
 	return 0;
 fail:
 	txn_rollback(txn);
 	return -1;
 }
 
+int
+txn_commit(struct txn *txn)
+{
+	txn->fiber = fiber();
+
+	if (txn_write(txn) != 0) {
+		/*
+		 * txn->fiber is set, so txn_complete() did not free
+		 * the transaction: this fiber owns it. Prepare and
+		 * entry allocation failures have already rolled it
+		 * back; a journal that rejected the entry without
+		 * calling the completion callback has not.
+		 */
+		if (!txn->done)
+			txn_rollback(txn);
+		txn_free(txn);
+		return -1;
+	}
+	/*
+	 * A non-yielding journal may have completed the transaction
+	 * already; loop to be safe against spurious wakeups.
+	 */
+	bool cancellable = fiber_set_cancellable(false);
+	while (!txn->done)
+		fiber_yield();
+	fiber_set_cancellable(cancellable);
+	int res = txn->signature >= 0 ? 0 : -1;
+	if (res != 0)
+		diag_set(ClientError, ER_WAL_IO);
+	txn_free(txn);
+	return res;
+}
+
 void
 txn_rollback_stmt(struct txn *txn)
 {
@@ -505,11 +550,9 @@ txn_rollback_stmt(struct txn *txn)
 void
 txn_rollback(struct txn *txn)
 {
-	assert(txn == in_txn());
 	trigger_clear(&txn->fiber_on_stop);
 	txn->signature = -1;
 	txn_complete(txn);
-	txn_free(txn);
 }
 
 void
diff --git a/src/box/txn.h b/src/box/txn.h
index 569978ce9..bd6b695a9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -195,6 +195,17 @@ struct txn {
 	 /** Commit and rollback triggers */
 	struct rlist on_commit, on_rollback;
 	struct sql_txn *psql_txn;
+	/** Journal entry to control txn write. */
+	struct journal_entry *entry;
+	/** Timestamp of entry write start. */
+	ev_tstamp start_tm;
+	/**
+	 * A fiber to wake up when the transaction is finished.
+	 * If NULL, the transaction is freed on completion.
+	 */
+	struct fiber *fiber;
+	/** True once commit or rollback processing is done. */
+	bool done;
 };
 
 /* Pointer to the current transaction (if any) */
@@ -228,6 +239,20 @@ txn_commit(struct txn *txn);
 void
 txn_rollback(struct txn *txn);
 
+/**
+ * Prepare a transaction and send it to the journal without
+ * waiting for the write to finish. The outcome is reported
+ * through on_commit/on_rollback triggers. If txn->fiber is
+ * NULL, the transaction is freed when it completes, which
+ * may happen before this function returns.
+ *
+ * @retval 0 the transaction was sent to the journal or, having
+ *           no rows to write, was completed immediately.
+ * @retval -1 error.
+ */
+int
+txn_write(struct txn *txn);
+
 /**
  * Roll back the transaction but keep the object around.
  * A special case for memtx transaction abort on yield. In this
diff --git a/src/box/wal.c b/src/box/wal.c
index 62b6391fd..582ae4598 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -265,7 +265,6 @@ tx_schedule_f(va_list ap)
 						   struct journal_entry, fifo);
 			if (req->on_done_cb != NULL)
 				req->on_done_cb(req, req->on_done_cb_data);
-			fiber_wakeup(req->fiber);
 		}
 		writer->is_in_rollback = false;
 		fiber_cond_wait(&writer->schedule_cond);
@@ -274,7 +273,7 @@ tx_schedule_f(va_list ap)
 }
 
 /**
- * Attach requests to a scheduling queue.
+ * Signal done condition.
  */
 static void
 tx_schedule_queue(struct stailq *queue)
@@ -380,7 +379,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->wal_max_size = wal_max_size;
 	writer->is_in_rollback = false;
 	journal_create(&writer->base, wal_mode == WAL_NONE ?
-		       wal_write_in_wal_mode_none : wal_write, NULL);
+		       wal_write_in_wal_mode_none : wal_write,
+		       NULL);
 
 	struct xlog_opts opts = xlog_opts_default;
 	opts.sync_is_async = true;
@@ -1153,9 +1153,9 @@ wal_writer_f(va_list ap)
 
 /**
  * WAL writer main entry point: queue a single request
- * to be written to disk and wait until this task is completed.
+ * to be written to disk.
  */
-int64_t
+static int64_t
 wal_write(struct journal *journal, struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
@@ -1212,19 +1212,10 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	batch->approx_len += entry->approx_len;
 	writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
 	cpipe_flush_input(&writer->wal_pipe);
-	/**
-	 * It's not safe to spuriously wakeup this fiber
-	 * since in that case it will ignore a possible
-	 * error from WAL writer and not roll back the
-	 * transaction.
-	 */
-	bool cancellable = fiber_set_cancellable(false);
-	fiber_yield(); /* Request was inserted. */
-	fiber_set_cancellable(cancellable);
-	return entry->res;
+	return 0;
 }
 
-int64_t
+static int64_t
 wal_write_in_wal_mode_none(struct journal *journal,
 			   struct journal_entry *entry)
 {
-- 
2.22.0





More information about the Tarantool-patches mailing list