Tarantool development patches archive
 help / color / mirror / Atom feed
From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko <georgy@tarantool.org>
Subject: [tarantool-patches] [PATCH v5 4/7] wal: introduce a journal entry finalization callback
Date: Sat, 22 Jun 2019 00:48:18 +0300	[thread overview]
Message-ID: <b060c4b11be03126990d2c0434ae6f6e6a038d3b.1561153472.git.georgy@tarantool.org> (raw)
In-Reply-To: <cover.1561153472.git.georgy@tarantool.org>

Finalize a transaction through a journal entry callback, so that
transaction processing doesn't rely on fiber scheduling.
Also allow a fiber which isn't the owner of a locked latch to steal
the latch ownership. This is required to process transaction triggers
asynchronously.

Prerequisites: #1254
---
 src/box/alter.cc     |   6 +++
 src/box/box.cc       |   6 ++-
 src/box/journal.c    |   9 +++-
 src/box/journal.h    |  29 +++++++++-
 src/box/txn.c        | 123 +++++++++++++++++++++++++++++--------------
 src/box/vy_log.c     |   3 +-
 src/box/wal.c        |  21 ++++++--
 src/lib/core/latch.h |  10 ++++
 8 files changed, 157 insertions(+), 50 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index a37a68ce4..1595e27af 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3558,6 +3558,12 @@ unlock_after_dd(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	(void) event;
+	/*
+	 * In case of a yielding journal this trigger will be processed
+	 * in the context of the tx_prio endpoint instead of the context
+	 * of the fiber which has this latch locked. So steal the latch first.
+	 */
+	latch_steal(&schema_lock);
 	latch_unlock(&schema_lock);
 }
 
diff --git a/src/box/box.cc b/src/box/box.cc
index d53b0cdc5..f5bd29dd5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -303,10 +303,12 @@ struct recovery_journal {
  */
 static int64_t
 recovery_journal_write(struct journal *base,
-		       struct journal_entry * /* entry */)
+		       struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
-	return vclock_sum(journal->vclock);
+	entry->res = vclock_sum(journal->vclock);
+	journal_entry_complete(entry);
+	return entry->res;
 }
 
 static inline void
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6ee..4c1997f36 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -41,7 +41,8 @@ static int64_t
 dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
-	(void) entry;
+	entry->res = 0;
+	journal_entry_complete(entry);
 	return 0;
 }
 
@@ -53,7 +54,9 @@ static struct journal dummy_journal = {
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region)
+journal_entry_new(size_t n_rows, struct region *region,
+		  journal_entry_done_cb on_done_cb,
+		  void *on_done_cb_data)
 {
 	struct journal_entry *entry;
 
@@ -70,6 +73,8 @@ journal_entry_new(size_t n_rows, struct region *region)
 	entry->n_rows = n_rows;
 	entry->res = -1;
 	entry->fiber = fiber();
+	entry->on_done_cb = on_done_cb;
+	entry->on_done_cb_data = on_done_cb_data;
 	return entry;
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e..52b8a715c 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -39,6 +39,11 @@ extern "C" {
 #endif /* defined(__cplusplus) */
 
 struct xrow_header;
+struct journal_entry;
+
+/** Journal entry finalization callback typedef. */
+typedef void (*journal_entry_done_cb)(struct journal_entry *entry, void *data);
+
 /**
  * An entry for an abstract journal.
  * Simply put, a write ahead log request.
@@ -58,6 +63,17 @@ struct journal_entry {
 	 * The fiber issuing the request.
 	 */
 	struct fiber *fiber;
+	/**
+	 * A journal entry finalization callback which is called after
+	 * the entry processing has finished, on both success and
+	 * failure. Entry->res is set to the result value before the
+	 * callback is fired.
+	 */
+	journal_entry_done_cb on_done_cb;
+	/**
+	 * A journal entry completion callback argument.
+	 */
+	void *on_done_cb_data;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -80,7 +96,18 @@ struct region;
  * @return NULL if out of memory, fiber diagnostics area is set
  */
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region);
+journal_entry_new(size_t n_rows, struct region *region,
+		  journal_entry_done_cb on_done_cb,
+		  void *on_done_cb_data);
+
+/**
+ * Finalize a single entry.
+ */
+static inline void
+journal_entry_complete(struct journal_entry *entry)
+{
+	entry->on_done_cb(entry, entry->on_done_cb_data);
+}
 
 /**
  * An API for an abstract journal for all transactions of this
diff --git a/src/box/txn.c b/src/box/txn.c
index 1eb4db6a3..5825acc34 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -337,6 +337,66 @@ fail:
 	return -1;
 }
 
+/*
+ * A helper function to process on_commit/on_rollback triggers.
+ */
+static inline void
+txn_process_trigger(struct rlist *trigger, struct txn *txn)
+{
+	/*
+	 * Some triggers require the in_txn variable to be set, so
+	 * restore it for the time a trigger is in progress.
+	 */
+	fiber_set_txn(fiber(), txn);
+	/* Commit and rollback triggers must not throw. */
+	if (trigger_run(trigger, txn) != 0) {
+		/*
+		 * The transaction can't handle a trigger error, so
+		 * there is no option other than to panic.
+		 */
+		diag_log();
+		unreachable();
+		panic("rollback trigger failed");
+	}
+	fiber_set_txn(fiber(), NULL);
+}
+
+/**
+ * Complete transaction processing.
+ */
+static void
+txn_complete(struct txn *txn)
+{
+	if (txn->signature < 0) {
+		/* Undo the transaction. */
+		if (txn->engine)
+			engine_rollback(txn->engine, txn);
+		if (txn->has_triggers)
+			txn_process_trigger(&txn->on_rollback, txn);
+
+		return;
+	} else {
+		/* Accept the transaction. */
+		/*
+		 * Engine can be NULL if transaction contains IPROTO_NOP
+		 * statements only.
+		 */
+		if (txn->engine != NULL)
+			engine_commit(txn->engine, txn);
+		if (txn->has_triggers)
+			txn_process_trigger(&txn->on_commit, txn);
+	}
+}
+
+static void
+txn_entry_done_cb(struct journal_entry *entry, void *data)
+{
+	struct txn *txn = (struct txn *)data;
+	txn->signature = entry->res;
+	txn_complete(txn);
+}
+
+
 static int64_t
 txn_write_to_wal(struct txn *txn)
 {
@@ -344,7 +404,9 @@ txn_write_to_wal(struct txn *txn)
 
 	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
 						      txn->n_applier_rows,
-						      &txn->region);
+						      &txn->region,
+						      txn_entry_done_cb,
+						      txn);
 	if (req == NULL) {
 		txn_rollback(txn);
 		return -1;
@@ -370,16 +432,14 @@ txn_write_to_wal(struct txn *txn)
 	ev_tstamp stop = ev_monotonic_now(loop());
 
 	if (res < 0) {
-		/* Cascading rollback. */
-		txn_rollback(txn); /* Perform our part of cascading rollback. */
-		/*
-		 * Move fiber to end of event loop to avoid
-		 * execution of any new requests before all
-		 * pending rollbacks are processed.
-		 */
-		fiber_reschedule();
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
+		/*
+		 * Although the transaction is rolled back by the
+		 * finalization handler, it is still our duty to
+		 * free it.
+		 */
+		txn_free(txn);
 	} else if (stop - start > too_long_threshold) {
 		int n_rows = txn->n_new_rows + txn->n_applier_rows;
 		say_warn_ratelimited("too long WAL write: %d rows at "
@@ -418,30 +478,23 @@ txn_commit(struct txn *txn)
 	}
 	trigger_clear(&txn->fiber_on_stop);
 
+	/*
+	 * After this point the transaction can no longer be used,
+	 * so reset the corresponding key in the fiber storage.
+	 */
+	fiber_set_txn(fiber(), NULL);
 	if (txn->n_new_rows + txn->n_applier_rows > 0) {
 		txn->signature = txn_write_to_wal(txn);
 		if (txn->signature < 0)
 			return -1;
+	} else {
+		/*
+		 * Even though there is nothing to write to the WAL,
+		 * finalization should still be fired.
+		 */
+		txn->signature = 0;
+		txn_complete(txn);
 	}
-	/*
-	 * Engine can be NULL if transaction contains IPROTO_NOP
-	 * statements only.
-	 */
-	if (txn->engine != NULL)
-		engine_commit(txn->engine, txn);
-	/*
-	 * The transaction is in the binary log. No action below
-	 * may throw. In case an error has happened, there is
-	 * no other option but terminate.
-	 */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_commit, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("commit trigger failed");
-	}
-
-	fiber_set_txn(fiber(), NULL);
 	txn_free(txn);
 	return 0;
 fail:
@@ -463,18 +516,10 @@ txn_rollback(struct txn *txn)
 {
 	assert(txn == in_txn());
 	trigger_clear(&txn->fiber_on_stop);
-	if (txn->engine)
-		engine_rollback(txn->engine, txn);
-	/* Rollback triggers must not throw. */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_rollback, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("rollback trigger failed");
-	}
-
-	fiber_set_txn(fiber(), NULL);
+	txn->signature = -1;
+	txn_complete(txn);
 	txn_free(txn);
+	fiber_set_txn(fiber(), NULL);
 }
 
 void
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 098a01419..bf50f5520 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -819,7 +819,8 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
+	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
+							NULL, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 0ea15a432..4fa9beca0 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -260,8 +260,10 @@ tx_schedule_queue(struct stailq *queue)
 	 * are many ready fibers.
 	 */
 	struct journal_entry *req;
-	stailq_foreach_entry(req, queue, fifo)
+	stailq_foreach_entry(req, queue, fifo) {
+		journal_entry_complete(req);
 		fiber_wakeup(req->fiber);
+	}
 }
 
 /**
@@ -1131,7 +1133,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 
-	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
+	ERROR_INJECT(ERRINJ_WAL_IO, {
+		goto fail;
+	});
 
 	if (! stailq_empty(&writer->rollback)) {
 		/*
@@ -1144,7 +1148,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		say_error("Aborting transaction %llu during "
 			  "cascading rollback",
 			  vclock_sum(&writer->vclock));
-		return -1;
+		goto fail;
 	}
 
 	struct wal_msg *batch;
@@ -1158,7 +1162,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		if (batch == NULL) {
 			diag_set(OutOfMemory, sizeof(struct wal_msg),
 				 "region", "struct wal_msg");
-			return -1;
+			goto fail;
 		}
 		wal_msg_create(batch);
 		/*
@@ -1182,6 +1186,11 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
 	return entry->res;
+
+fail:
+	entry->res = -1;
+	journal_entry_complete(entry);
+	return -1;
 }
 
 int64_t
@@ -1195,7 +1204,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
 		       entry->rows + entry->n_rows);
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
-	return vclock_sum(&writer->vclock);
+	entry->res = vclock_sum(&writer->vclock);
+	journal_entry_complete(entry);
+	return entry->res;
 }
 
 void
diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
index 49c59cf63..580942564 100644
--- a/src/lib/core/latch.h
+++ b/src/lib/core/latch.h
@@ -155,6 +155,16 @@ latch_trylock(struct latch *l)
 	return latch_lock_timeout(l, 0);
 }
 
+/**
+ * Take ownership of a locked latch.
+ */
+static inline void
+latch_steal(struct latch *l)
+{
+	assert(l->owner != NULL);
+	l->owner = fiber();
+}
+
 /**
  * \copydoc box_latch_unlock
  */
-- 
2.22.0

  parent reply	other threads:[~2019-06-21 21:48 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-06-21 21:48 [tarantool-patches] [PATCH v5 0/7] Parallel applier Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 1/7] txn: unref statement at txn_free Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 2/7] txn: get rid of autocommit from a txn structure Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 3/7] txn: get rid of fiber_gc from txn_rollback Georgy Kirichenko
2019-06-21 21:48 ` Georgy Kirichenko [this message]
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 5/7] txn: introduce asynchronous txn commit Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 6/7] applier: apply transaction in parallel Georgy Kirichenko
2019-06-21 21:48 ` [tarantool-patches] [PATCH v5 7/7] test: fix flaky test Georgy Kirichenko
2019-06-25 16:08 ` [tarantool-patches] [PATCH v5 0/7] Parallel applier Vladimir Davydov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=b060c4b11be03126990d2c0434ae6f6e6a038d3b.1561153472.git.georgy@tarantool.org \
    --to=georgy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH v5 4/7] wal: introduce a journal entry finalization callback' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox