[tarantool-patches] [PATCH v5 4/7] wal: introduce a journal entry finalization callback

Georgy Kirichenko georgy at tarantool.org
Sat Jun 22 00:48:18 MSK 2019


Finalize a transaction through a journal entry callback, so that
transaction processing doesn't rely on fiber scheduling.
Also allow a fiber which isn't the owner of a locked latch to steal
its ownership. This is required to process transaction triggers
asynchronously.

Prerequisites: #1254
---
 src/box/alter.cc     |   6 +++
 src/box/box.cc       |   6 ++-
 src/box/journal.c    |   9 +++-
 src/box/journal.h    |  29 +++++++++-
 src/box/txn.c        | 123 +++++++++++++++++++++++++++++--------------
 src/box/vy_log.c     |   3 +-
 src/box/wal.c        |  21 ++++++--
 src/lib/core/latch.h |  10 ++++
 8 files changed, 157 insertions(+), 50 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index a37a68ce4..1595e27af 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3558,6 +3558,12 @@ unlock_after_dd(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	(void) event;
+	/*
+	 * In case of a yielding journal this trigger is processed
+	 * in the context of the tx_prio endpoint instead of the context
+	 * of the fiber which has this latch locked. So steal the latch first.
+	 */
+	latch_steal(&schema_lock);
 	latch_unlock(&schema_lock);
 }
 
diff --git a/src/box/box.cc b/src/box/box.cc
index d53b0cdc5..f5bd29dd5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -303,10 +303,12 @@ struct recovery_journal {
  */
 static int64_t
 recovery_journal_write(struct journal *base,
-		       struct journal_entry * /* entry */)
+		       struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
-	return vclock_sum(journal->vclock);
+	entry->res = vclock_sum(journal->vclock);
+	journal_entry_complete(entry);
+	return entry->res;
 }
 
 static inline void
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..4c1997f36 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -41,7 +41,8 @@ static int64_t
 dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
-	(void) entry;
+	entry->res = 0;
+	journal_entry_complete(entry);
 	return 0;
 }
 
@@ -53,7 +54,9 @@ static struct journal dummy_journal = {
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region)
+journal_entry_new(size_t n_rows, struct region *region,
+		  journal_entry_done_cb on_done_cb,
+		  void *on_done_cb_data)
 {
 	struct journal_entry *entry;
 
@@ -70,6 +73,8 @@ journal_entry_new(size_t n_rows, struct region *region)
 	entry->n_rows = n_rows;
 	entry->res = -1;
 	entry->fiber = fiber();
+	entry->on_done_cb = on_done_cb;
+	entry->on_done_cb_data = on_done_cb_data;
 	return entry;
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..52b8a715c 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -39,6 +39,11 @@ extern "C" {
 #endif /* defined(__cplusplus) */
 
 struct xrow_header;
+struct journal_entry;
+
+/** Journal entry finalization callback typedef. */
+typedef void (*journal_entry_done_cb)(struct journal_entry *entry, void *data);
+
 /**
  * An entry for an abstract journal.
  * Simply put, a write ahead log request.
@@ -58,6 +63,17 @@ struct journal_entry {
 	 * The fiber issuing the request.
 	 */
 	struct fiber *fiber;
+	/**
+	 * A journal entry finalization callback which is going to be called
+	 * after the entry processing has finished in both cases: success
+	 * or failure. entry->res is set to the result value before the
+	 * callback is fired.
+	 */
+	journal_entry_done_cb on_done_cb;
+	/**
+	 * A journal entry completion callback argument.
+	 */
+	void *on_done_cb_data;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -80,7 +96,18 @@ struct region;
  * @return NULL if out of memory, fiber diagnostics area is set
  */
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region);
+journal_entry_new(size_t n_rows, struct region *region,
+		  journal_entry_done_cb on_done_cb,
+		  void *on_done_cb_data);
+
+/**
+ * Finalize a single entry.
+ */
+static inline void
+journal_entry_complete(struct journal_entry *entry)
+{
+	entry->on_done_cb(entry, entry->on_done_cb_data);
+}
 
 /**
  * An API for an abstract journal for all transactions of this
diff --git a/src/box/txn.c b/src/box/txn.c
index 1eb4db6a3..5825acc34 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -337,6 +337,66 @@ fail:
 	return -1;
 }
 
+/*
+ * A helper function to process on_commit/on_rollback triggers.
+ */
+static inline void
+txn_process_trigger(struct rlist *trigger, struct txn *txn)
+{
+	/*
+	 * Some triggers require the in_txn variable to be set, so
+	 * restore it for the time a trigger is in progress.
+	 */
+	fiber_set_txn(fiber(), txn);
+	/* Neither commit nor rollback triggers may throw. */
+	if (trigger_run(trigger, txn) != 0) {
+		/*
+		 * As a transaction can't handle a trigger error,
+		 * there is no option other than to panic.
+		 */
+		diag_log();
+		unreachable();
+		panic("rollback trigger failed");
+	}
+	fiber_set_txn(fiber(), NULL);
+}
+
+/**
+ * Complete transaction processing.
+ */
+static void
+txn_complete(struct txn *txn)
+{
+	if (txn->signature < 0) {
+		/* Undo the transaction. */
+		if (txn->engine)
+			engine_rollback(txn->engine, txn);
+		if (txn->has_triggers)
+			txn_process_trigger(&txn->on_rollback, txn);
+
+		return;
+	} else {
+		/* Accept the transaction. */
+		/*
+		 * Engine can be NULL if transaction contains IPROTO_NOP
+		 * statements only.
+		 */
+		if (txn->engine != NULL)
+			engine_commit(txn->engine, txn);
+		if (txn->has_triggers)
+			txn_process_trigger(&txn->on_commit, txn);
+	}
+}
+
+static void
+txn_entry_done_cb(struct journal_entry *entry, void *data)
+{
+	struct txn *txn = (struct txn *)data;
+	txn->signature = entry->res;
+	txn_complete(txn);
+}
+
+
 static int64_t
 txn_write_to_wal(struct txn *txn)
 {
@@ -344,7 +404,9 @@ txn_write_to_wal(struct txn *txn)
 
 	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
 						      txn->n_applier_rows,
-						      &txn->region);
+						      &txn->region,
+						      txn_entry_done_cb,
+						      txn);
 	if (req == NULL) {
 		txn_rollback(txn);
 		return -1;
@@ -370,16 +432,14 @@ txn_write_to_wal(struct txn *txn)
 	ev_tstamp stop = ev_monotonic_now(loop());
 
 	if (res < 0) {
-		/* Cascading rollback. */
-		txn_rollback(txn); /* Perform our part of cascading rollback. */
-		/*
-		 * Move fiber to end of event loop to avoid
-		 * execution of any new requests before all
-		 * pending rollbacks are processed.
-		 */
-		fiber_reschedule();
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
+		/*
+		 * Although the transaction is rolled back by the
+		 * finalization handler, it is still our duty to
+		 * free it.
+		 */
+		txn_free(txn);
 	} else if (stop - start > too_long_threshold) {
 		int n_rows = txn->n_new_rows + txn->n_applier_rows;
 		say_warn_ratelimited("too long WAL write: %d rows at "
@@ -418,30 +478,23 @@ txn_commit(struct txn *txn)
 	}
 	trigger_clear(&txn->fiber_on_stop);
 
+	/*
+	 * After this point the transaction can't be used any more,
+	 * so reset the corresponding key in the fiber storage.
+	 */
+	fiber_set_txn(fiber(), NULL);
 	if (txn->n_new_rows + txn->n_applier_rows > 0) {
 		txn->signature = txn_write_to_wal(txn);
 		if (txn->signature < 0)
 			return -1;
+	} else {
+		/*
+		 * Although there is nothing to write to the WAL, the
+		 * finalization should still be fired.
+		 */
+		txn->signature = 0;
+		txn_complete(txn);
 	}
-	/*
-	 * Engine can be NULL if transaction contains IPROTO_NOP
-	 * statements only.
-	 */
-	if (txn->engine != NULL)
-		engine_commit(txn->engine, txn);
-	/*
-	 * The transaction is in the binary log. No action below
-	 * may throw. In case an error has happened, there is
-	 * no other option but terminate.
-	 */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_commit, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("commit trigger failed");
-	}
-
-	fiber_set_txn(fiber(), NULL);
 	txn_free(txn);
 	return 0;
 fail:
@@ -463,18 +516,10 @@ txn_rollback(struct txn *txn)
 {
 	assert(txn == in_txn());
 	trigger_clear(&txn->fiber_on_stop);
-	if (txn->engine)
-		engine_rollback(txn->engine, txn);
-	/* Rollback triggers must not throw. */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_rollback, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("rollback trigger failed");
-	}
-
-	fiber_set_txn(fiber(), NULL);
+	txn->signature = -1;
+	txn_complete(txn);
 	txn_free(txn);
+	fiber_set_txn(fiber(), NULL);
 }
 
 void
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 098a01419..bf50f5520 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -819,7 +819,8 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
+	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
+							NULL, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 0ea15a432..4fa9beca0 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -260,8 +260,10 @@ tx_schedule_queue(struct stailq *queue)
 	 * are many ready fibers.
 	 */
 	struct journal_entry *req;
-	stailq_foreach_entry(req, queue, fifo)
+	stailq_foreach_entry(req, queue, fifo) {
+		journal_entry_complete(req);
 		fiber_wakeup(req->fiber);
+	}
 }
 
 /**
@@ -1131,7 +1133,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 
-	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
+	ERROR_INJECT(ERRINJ_WAL_IO, {
+		goto fail;
+	});
 
 	if (! stailq_empty(&writer->rollback)) {
 		/*
@@ -1144,7 +1148,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		say_error("Aborting transaction %llu during "
 			  "cascading rollback",
 			  vclock_sum(&writer->vclock));
-		return -1;
+		goto fail;
 	}
 
 	struct wal_msg *batch;
@@ -1158,7 +1162,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		if (batch == NULL) {
 			diag_set(OutOfMemory, sizeof(struct wal_msg),
 				 "region", "struct wal_msg");
-			return -1;
+			goto fail;
 		}
 		wal_msg_create(batch);
 		/*
@@ -1182,6 +1186,11 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
 	return entry->res;
+
+fail:
+	entry->res = -1;
+	journal_entry_complete(entry);
+	return -1;
 }
 
 int64_t
@@ -1195,7 +1204,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
 		       entry->rows + entry->n_rows);
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
-	return vclock_sum(&writer->vclock);
+	entry->res = vclock_sum(&writer->vclock);
+	journal_entry_complete(entry);
+	return entry->res;
 }
 
 void
diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
index 49c59cf63..580942564 100644
--- a/src/lib/core/latch.h
+++ b/src/lib/core/latch.h
@@ -155,6 +155,16 @@ latch_trylock(struct latch *l)
 	return latch_lock_timeout(l, 0);
 }
 
+/**
+ * Take over the ownership of a locked latch for the current fiber.
+ */
+static inline void
+latch_steal(struct latch *l)
+{
+	assert(l->owner != NULL);
+	l->owner = fiber();
+}
+
 /**
  * \copydoc box_latch_unlock
  */
-- 
2.22.0





More information about the Tarantool-patches mailing list