[tarantool-patches] [PATCH v3 11/14] txn: introduce asynchronous txn commit
    Georgy Kirichenko 
    georgy at tarantool.org
       
    Sun Jun  9 23:44:40 MSK 2019
    
    
  
Allow asynchronous transaction commit. This adds two functions:
 * txn_write, which sends a transaction to the journal
 * txn_wait, which waits until the transaction has been processed
Prerequisites: #1254
---
 src/box/box.cc    |  24 ++---
 src/box/journal.c |  22 +++--
 src/box/journal.h |  40 +++++----
 src/box/txn.c     | 220 +++++++++++++++++++++++++++++++---------------
 src/box/txn.h     |  12 +++
 src/box/vy_log.c  |   2 +-
 src/box/wal.c     |   8 ++
 7 files changed, 221 insertions(+), 107 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index d0616095b..510f3fc99 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -308,19 +308,21 @@ struct recovery_journal {
  * min/max LSN of created LSM levels.
  */
 static int64_t
-recovery_journal_async_write(struct journal *base,
-			     struct journal_entry *entry)
+recovery_journal_write(struct journal *base,
+		       struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 	entry->res = vclock_sum(journal->vclock);
+	if (entry->on_done_cb)
+		entry->on_done_cb(entry, entry->on_done_cb_data);
 	entry->done = true;
 	fiber_cond_broadcast(&entry->done_cond);
 	return 0;
 }
 
 static int64_t
-recovery_journal_async_wait(struct journal *base,
-			    struct journal_entry *entry)
+recovery_journal_wait(struct journal *base,
+		      struct journal_entry *entry)
 {
 	(void) base;
 	assert(entry->done);
@@ -328,20 +330,20 @@ recovery_journal_async_wait(struct journal *base,
 }
 
 static int64_t
-recovery_journal_write(struct journal *base,
-		       struct journal_entry *entry)
+recovery_journal_write_sync(struct journal *base,
+			    struct journal_entry *entry)
 {
-	if (recovery_journal_async_write(base, entry) == 0)
-		return recovery_journal_async_wait(base, entry);
+	if (recovery_journal_write(base, entry) == 0)
+		return recovery_journal_wait(base, entry);
 	return -1;
 }
 
 static inline void
 recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 {
-	journal_create(&journal->base, recovery_journal_write,
-		       recovery_journal_async_write,
-		       recovery_journal_async_wait,
+	journal_create(&journal->base, recovery_journal_write_sync,
+		       recovery_journal_write,
+		       recovery_journal_wait,
 		       NULL);
 	journal->vclock = v;
 }
diff --git a/src/box/journal.c b/src/box/journal.c
index b978e6752..dadff771e 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -37,17 +37,19 @@
  * but txn_commit() must work.
  */
 static int64_t
-dummy_async_write(struct journal *journal, struct journal_entry *entry)
+dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	entry->res = 0;
+	if (entry->on_done_cb)
+		entry->on_done_cb(entry, entry->on_done_cb_data);
 	entry->done = true;
 	fiber_cond_broadcast(&entry->done_cond);
 	return 0;
 }
 
 static int64_t
-dummy_async_wait(struct journal *journal, struct journal_entry *entry)
+dummy_journal_wait(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
 	assert(entry->done);
@@ -55,24 +57,26 @@ dummy_async_wait(struct journal *journal, struct journal_entry *entry)
 }
 
 static int64_t
-dummy_journal_write(struct journal *journal, struct journal_entry *entry)
+dummy_journal_write_sync(struct journal *journal, struct journal_entry *entry)
 {
-	if (dummy_async_write(journal, entry) == 0)
-		return dummy_async_wait(journal, entry);
+	if (dummy_journal_write(journal, entry) == 0)
+		return dummy_journal_wait(journal, entry);
 	return -1;
 }
 
 static struct journal dummy_journal = {
+	dummy_journal_write_sync,
 	dummy_journal_write,
-	dummy_async_write,
-	dummy_async_wait,
+	dummy_journal_wait,
 	NULL,
 };
 
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region)
+journal_entry_new(size_t n_rows, struct region *region,
+		  void (*on_done_cb)(struct journal_entry *entry, void *data),
+		  void *on_done_cb_data)
 {
 	struct journal_entry *entry;
 
@@ -90,6 +94,8 @@ journal_entry_new(size_t n_rows, struct region *region)
 	entry->res = -1;
 	entry->done = false;
 	fiber_cond_create(&entry->done_cond);
+	entry->on_done_cb = on_done_cb;
+	entry->on_done_cb_data = on_done_cb_data;
 	return entry;
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index e7fe9154a..c7e467969 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -63,6 +63,14 @@ struct journal_entry {
 	 * Condition to broadcast when processing is done.
 	 */
 	struct fiber_cond done_cond;
+	/**
+	 * A journal entry completion callback.
+	 */
+	void (*on_done_cb)(struct journal_entry *entry, void *data);
+	/**
+	 * A journal entry completion callback argument.
+	 */
+	void *on_done_cb_data;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -85,7 +93,9 @@ struct region;
  * @return NULL if out of memory, fiber diagnostics area is set
  */
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region);
+journal_entry_new(size_t n_rows, struct region *region,
+		  void (*on_done_cb)(struct journal_entry *entry, void *data),
+		  void *on_done_cb_data);
 
 /**
  * An API for an abstract journal for all transactions of this
@@ -93,12 +103,12 @@ journal_entry_new(size_t n_rows, struct region *region);
  * synchronous replication.
  */
 struct journal {
+	int64_t (*write_sync)(struct journal *journal,
+			      struct journal_entry *req);
 	int64_t (*write)(struct journal *journal,
 			 struct journal_entry *req);
-	int64_t (*async_write)(struct journal *journal,
-			       struct journal_entry *req);
-	int64_t (*async_wait)(struct journal *journal,
-			      struct journal_entry *req);
+	int64_t (*wait)(struct journal *journal,
+			struct journal_entry *req);
 	void (*destroy)(struct journal *journal);
 };
 
@@ -115,9 +125,9 @@ extern struct journal *current_journal;
  *           or -1 on error.
  */
 static inline int64_t
-journal_write(struct journal_entry *entry)
+journal_write_sync(struct journal_entry *entry)
 {
-	return current_journal->write(current_journal, entry);
+	return current_journal->write_sync(current_journal, entry);
 }
 
 /**
@@ -126,9 +136,9 @@ journal_write(struct journal_entry *entry)
  * @return   0 if write was scheduled or -1 on error.
  */
 static inline int64_t
-journal_async_write(struct journal_entry *entry)
+journal_write(struct journal_entry *entry)
 {
-	return current_journal->async_write(current_journal, entry);
+	return current_journal->write(current_journal, entry);
 }
 
 /**
@@ -137,9 +147,9 @@ journal_async_write(struct journal_entry *entry)
  *           or -1 on error.
  */
 static inline int64_t
-journal_async_wait(struct journal_entry *entry)
+journal_wait(struct journal_entry *entry)
 {
-	return current_journal->async_wait(current_journal, entry);
+	return current_journal->wait(current_journal, entry);
 }
 
 /**
@@ -173,14 +183,14 @@ journal_set(struct journal *new_journal)
 
 static inline void
 journal_create(struct journal *journal,
+	       int64_t (*write_sync)(struct journal *, struct journal_entry *),
 	       int64_t (*write)(struct journal *, struct journal_entry *),
-	       int64_t (*async_write)(struct journal *, struct journal_entry *),
-	       int64_t (*async_wait)(struct journal *, struct journal_entry *),
+	       int64_t (*wait)(struct journal *, struct journal_entry *),
 	       void (*destroy)(struct journal *))
 {
+	journal->write_sync = write_sync;
 	journal->write = write;
-	journal->async_write = async_write,
-	journal->async_wait = async_wait,
+	journal->wait = wait;
 	journal->destroy = destroy;
 }
 
diff --git a/src/box/txn.c b/src/box/txn.c
index a08652af1..815d635fe 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -199,6 +199,7 @@ txn_begin()
 	txn->engine = NULL;
 	txn->engine_tx = NULL;
 	txn->psql_txn = NULL;
+	txn->entry = NULL;
 	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
 	return txn;
@@ -318,22 +319,77 @@ fail:
 	return -1;
 }
 
+/**
+ * Complete transaction processing.
+ */
+static void
+txn_complete(struct txn *txn)
+{
+	if (txn->signature < 0) {
+		if (txn->engine)
+			engine_rollback(txn->engine, txn);
+		/* Rollback triggers must not throw. */
+		fiber_set_txn(fiber(), txn);
+		if (txn->has_triggers &&
+		    trigger_run(&txn->on_rollback, txn) != 0) {
+			diag_log();
+			unreachable();
+			panic("rollback trigger failed");
+		}
+		fiber_set_txn(fiber(), NULL);
+
+		return;
+	}
+	/*
+	 * Engine can be NULL if transaction contains IPROTO_NOP
+	 * statements only.
+	 */
+	if (txn->engine != NULL)
+		engine_commit(txn->engine, txn);
+	/*
+	 * The transaction is in the binary log. No action below
+	 * may throw. In case an error has happened, there is
+	 * no other option but terminate.
+	 */
+	fiber_set_txn(fiber(), txn);
+	if (txn->has_triggers &&
+	    trigger_run(&txn->on_commit, txn) != 0) {
+		diag_log();
+		unreachable();
+		panic("commit trigger failed");
+	}
+
+	fiber_set_txn(fiber(), NULL);
+}
+
+static void
+txn_entry_done_cb(struct journal_entry *entry, void *data)
+{
+	struct txn *txn = (struct txn *)data;
+	assert(txn->entry == entry);
+	txn->signature = entry->res;
+	txn_complete(txn);
+}
+
 static int64_t
-txn_write_to_wal(struct txn *txn)
+txn_journal_write(struct txn *txn)
 {
+	assert(txn->entry == NULL);
 	assert(txn->n_new_rows + txn->n_applier_rows > 0);
 
-	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
-						      txn->n_applier_rows,
-						      &txn->region);
-	if (req == NULL) {
+	/* Prepare a journal entry. */
+	txn->entry = journal_entry_new(txn->n_new_rows +
+				       txn->n_applier_rows,
+				       &txn->region,
+				       txn_entry_done_cb, txn);
+	if (txn->entry == NULL) {
 		txn_rollback(txn);
 		return -1;
 	}
 
 	struct txn_stmt *stmt;
-	struct xrow_header **remote_row = req->rows;
-	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
+	struct xrow_header **remote_row = txn->entry->rows;
+	struct xrow_header **local_row = txn->entry->rows + txn->n_applier_rows;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
@@ -341,42 +397,39 @@ txn_write_to_wal(struct txn *txn)
 			*local_row++ = stmt->row;
 		else
 			*remote_row++ = stmt->row;
-		req->approx_len += xrow_approx_len(stmt->row);
+		txn->entry->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(remote_row == req->rows + txn->n_applier_rows);
+	assert(remote_row == txn->entry->rows + txn->n_applier_rows);
 	assert(local_row == remote_row + txn->n_new_rows);
 
-	ev_tstamp start = ev_monotonic_now(loop());
-	int64_t res = journal_write(req);
-	ev_tstamp stop = ev_monotonic_now(loop());
-
-	if (res < 0) {
-		/* Cascading rollback. */
-		txn_rollback(txn); /* Perform our part of cascading rollback. */
-		/*
-		 * Move fiber to end of event loop to avoid
-		 * execution of any new requests before all
-		 * pending rollbacks are processed.
-		 */
-		fiber_reschedule();
+	/* Send entry to a journal. */
+	if (journal_write(txn->entry) < 0) {
 		diag_set(ClientError, ER_WAL_IO);
-		diag_log();
-	} else if (stop - start > too_long_threshold) {
-		int n_rows = txn->n_new_rows + txn->n_applier_rows;
-		say_warn_ratelimited("too long WAL write: %d rows at "
-				     "LSN %lld: %.3f sec", n_rows,
-				     res - n_rows + 1, stop - start);
+		return -1;
 	}
-	/*
-	 * Use vclock_sum() from WAL writer as transaction signature.
-	 */
-	return res;
+	return 0;
 }
 
-int
-txn_commit(struct txn *txn)
+/*
+ * Wait until journal processing has finished.
+ */
+static int64_t
+txn_journal_wait(struct txn *txn)
+{
+	assert(txn->entry != NULL);
+	int64_t signature = journal_wait(txn->entry);
+	assert(signature == txn->signature);
+	if (signature < 0)
+		diag_set(ClientError, ER_WAL_IO);
+	return signature;
+}
+
+/*
+ * Prepare a transaction using engines.
+ */
+static int
+txn_prepare(struct txn *txn)
 {
-	assert(txn == in_txn());
 	/*
 	 * If transaction has been started in SQL, deferred
 	 * foreign key constraints must not be violated.
@@ -386,7 +439,7 @@ txn_commit(struct txn *txn)
 		struct sql_txn *sql_txn = txn->psql_txn;
 		if (sql_txn->fk_deferred_count != 0) {
 			diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
-			goto fail;
+			return -1;
 		}
 	}
 	/*
@@ -394,42 +447,75 @@ txn_commit(struct txn *txn)
 	 * we have a bunch of IPROTO_NOP statements.
 	 */
 	if (txn->engine != NULL) {
-		if (engine_prepare(txn->engine, txn) != 0)
-			goto fail;
+		if (engine_prepare(txn->engine, txn) != 0) {
+			return -1;
+		}
 	}
+	return 0;
+}
 
-	if (txn->n_new_rows + txn->n_applier_rows > 0) {
-		txn->signature = txn_write_to_wal(txn);
-		if (txn->signature < 0)
-			return -1;
+/*
+ * Send a transaction to a journal.
+ */
+int
+txn_write(struct txn *txn)
+{
+	if (txn_prepare(txn) != 0)
+		goto fail;
+
+	txn->start_tm = ev_monotonic_now(loop());
+	if (txn->n_new_rows + txn->n_applier_rows == 0) {
+		/* Nothing to do. */
+		txn->signature = 0;
+		txn_complete(txn);
+		return 0;
 	}
-	/*
-	 * Engine can be NULL if transaction contains IPROTO_NOP
-	 * statements only.
-	 */
-	if (txn->engine != NULL)
-		engine_commit(txn->engine, txn);
-	/*
-	 * The transaction is in the binary log. No action below
-	 * may throw. In case an error has happened, there is
-	 * no other option but terminate.
-	 */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_commit, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("commit trigger failed");
+
+	if (txn_journal_write(txn) != 0)
+		goto fail;
+	fiber_set_txn(fiber(), NULL);
+	return 0;
+fail:
+	txn_rollback(txn);
+	return -1;
+}
+
+/*
+ * Wait until transaction processing has finished.
+ */
+int
+txn_wait(struct txn *txn)
+{
+	if (txn->n_new_rows + txn->n_applier_rows > 0 &&
+	    txn_journal_wait(txn) < 0)
+		goto fail;
+	ev_tstamp stop_tm = ev_monotonic_now(loop());
+	if (stop_tm - txn->start_tm > too_long_threshold) {
+		int n_rows = txn->n_new_rows + txn->n_applier_rows;
+		say_warn_ratelimited("too long WAL write: %d rows at "
+				     "LSN %lld: %.3f sec", n_rows,
+				     txn->signature - n_rows + 1,
+				     stop_tm - txn->start_tm);
 	}
 
 
-	fiber_set_txn(fiber(), NULL);
 	txn_free(txn);
 	return 0;
+
 fail:
-	txn_rollback(txn);
+	txn_free(txn);
 	return -1;
 }
 
+int
+txn_commit(struct txn *txn)
+{
+	if (txn_write(txn) != 0 ||
+	    txn_wait(txn) < 0)
+		return -1;
+	return 0;
+}
+
 void
 txn_rollback_stmt(struct txn *txn)
 {
@@ -442,18 +528,8 @@ txn_rollback_stmt(struct txn *txn)
 void
 txn_rollback(struct txn *txn)
 {
-	assert(txn == in_txn());
-	if (txn->engine)
-		engine_rollback(txn->engine, txn);
-	/* Rollback triggers must not throw. */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_rollback, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("rollback trigger failed");
-	}
-
-	fiber_set_txn(fiber(), NULL);
+	txn->signature = -1;
+	txn_complete(txn);
 	txn_free(txn);
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index d211e5012..84d4f27d3 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -195,6 +195,12 @@ struct txn {
 	 /** Commit and rollback triggers */
 	struct rlist on_commit, on_rollback;
 	struct sql_txn *psql_txn;
+	/** Journal entry to control txn write. */
+	struct journal_entry *entry;
+	/** Transaction completion trigger. */
+	struct trigger entry_done;
+	/** Timestamp of entry write start. */
+	ev_tstamp start_tm;
 };
 
 /* Pointer to the current transaction (if any) */
@@ -228,6 +234,12 @@ txn_commit(struct txn *txn);
 void
 txn_rollback(struct txn *txn);
 
+int
+txn_write(struct txn *txn);
+
+int
+txn_wait(struct txn *txn);
+
 /**
  * Roll back the transaction but keep the object around.
  * A special case for memtx transaction abort on yield. In this
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index bdc1cfa31..d37d44011 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -772,7 +772,7 @@ vy_log_flush(void)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
+	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index e868a8e71..eff48b4fe 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -272,6 +272,8 @@ tx_schedule_f(va_list ap)
 			struct journal_entry *req =
 				stailq_shift_entry(&writer->schedule_queue,
 						   struct journal_entry, fifo);
+			if (req->on_done_cb != NULL)
+				req->on_done_cb(req, req->on_done_cb_data);
 			req->done = true;
 			fiber_cond_broadcast(&req->done_cond);
 		}
@@ -1184,6 +1186,8 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
 		say_error("Aborting transaction %llu during "
 			  "cascading rollback",
 			  vclock_sum(&writer->vclock));
+		if (entry->on_done_cb)
+			entry->on_done_cb(entry, entry->on_done_cb_data);
 		entry->done = true;
 		fiber_cond_broadcast(&entry->done_cond);
 		return -1;
@@ -1200,6 +1204,8 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
 		if (batch == NULL) {
 			diag_set(OutOfMemory, sizeof(struct wal_msg),
 				 "region", "struct wal_msg");
+			if (entry->on_done_cb)
+				entry->on_done_cb(entry, entry->on_done_cb_data);
 			entry->done = true;
 			fiber_cond_broadcast(&entry->done_cond);
 			return -1;
@@ -1253,6 +1259,8 @@ wal_async_write_in_wal_mode_none(struct journal *journal,
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
+	if (entry->on_done_cb)
+		entry->on_done_cb(entry, entry->on_done_cb_data);
 	entry->done = true;
 	fiber_cond_broadcast(&entry->done_cond);
 	return 0;
-- 
2.21.0
    
    
More information about the Tarantool-patches
mailing list