[PATCH v2 1/3] memtx: fix txn_on_yield for DDL transactions

Vladimir Davydov vdavydov.dev at gmail.com
Fri Jul 12 17:54:19 MSK 2019


On Wed, Jul 10, 2019 at 11:34:39PM +0300, Konstantin Osipov wrote:
> > -	/* .prepare = */ memtx_engine_prepare,
> > +	/* .begin_statement = */ generic_engine_begin_statement,
> > +	/* .prepare = */ generic_engine_prepare,
> 
> If some of these callbacks are now empty in all engines, they
> should be removed from vtab.

Those methods are still used in Vinyl so we can't remove them :-(

> >  	txn->has_triggers  = false;
> >  	txn->is_done = false;
> >  	txn->is_aborted = false;
> > +	txn->is_yield_disabled = false;
> 
> It is high time to convert these to bit fields.

NP but let's please do it in a separate patch if we have to.

> Overall I agree the approach is much better than before, at least
> easier to track since the logic is not hidden in the engine.
> 
> Please let's make the names more evident now, and the api simple.
> I think txn_can_yield(true/false) is good, and I also think that
> we need to collapse all txn flags into a bit field now that we
> have a critical mass.

Agree, the current API does look kinda crooked :-/ Reworked according to
your review. Please see the updated patch below and on the branch.
---

From 73cb8a22680bf362a5dce925b0aebd2e7508d334 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov.dev at gmail.com>
Date: Fri, 5 Jul 2019 17:40:56 +0300
Subject: [PATCH] memtx: fix txn_on_yield for DDL transactions

The memtx engine doesn't allow yielding inside a transaction. To achieve
that, it installs a fiber->on_yield trigger that aborts the current
transaction (rolls it back, but keeps it around so that commit fails).

There's an exception though - DDL statements are allowed to yield.
This is required so as not to block the event loop while a new index
is built or a space format is checked. Currently, we handle this
exception by checking space id and omitting installation of the
trigger for system spaces. This isn't entirely correct, because we
may yield after a DDL statement is complete, in which case the
transaction won't be aborted even though it should be:

  box.begin()
  box.space.my_space:create_index('my_index')
  fiber.sleep(0) -- doesn't abort the transaction!

This patch fixes the problem by making the memtx engine install the
on_yield trigger unconditionally, for all kinds of transactions, and
instead explicitly disabling the trigger for yielding DDL operations.

In order not to spread the yield-in-transaction logic between memtx
and txn code, let's move all fiber_on_yield related stuff to txn,
export a method to disable yields, and use the method in memtx.

diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index c8376110..869cd343 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -55,58 +55,6 @@
 static void
 checkpoint_cancel(struct checkpoint *ckpt);
 
-/*
- * Memtx yield-in-transaction trigger: roll back the effects
- * of the transaction and mark the transaction as aborted.
- */
-static void
-txn_on_yield(struct trigger *trigger, void *event)
-{
-	(void) trigger;
-	(void) event;
-
-	struct txn *txn = in_txn();
-	assert(txn && txn->engine_tx);
-	if (txn == NULL || txn->engine_tx == NULL)
-		return;
-	txn_abort(txn);                 /* doesn't yield or fail */
-}
-
-/**
- * Initialize context for yield triggers.
- * In case of a yield inside memtx multi-statement transaction
- * we must, first of all, roll back the effects of the transaction
- * so that concurrent transactions won't see dirty, uncommitted
- * data.
- * Second, we must abort the transaction, since it has been rolled
- * back implicitly. The transaction can not be rolled back
- * completely from within a yield trigger, since a yield trigger
- * can't fail. Instead, we mark the transaction as aborted and
- * raise an error on attempt to commit it.
- *
- * So much hassle to be user-friendly until we have a true
- * interactive transaction support in memtx.
- */
-void
-memtx_init_txn(struct txn *txn)
-{
-	struct fiber *fiber = fiber();
-
-	trigger_create(&txn->fiber_on_yield, txn_on_yield,
-		       NULL, NULL);
-	/*
-	 * Memtx doesn't allow yields between statements of
-	 * a transaction. Set a trigger which would roll
-	 * back the transaction if there is a yield.
-	 */
-	trigger_add(&fiber->on_yield, &txn->fiber_on_yield);
-	/*
-	 * This serves as a marker that the triggers are
-	 * initialized.
-	 */
-	txn->engine_tx = txn;
-}
-
 struct PACKED memtx_tuple {
 	/*
 	 * sic: the header of the tuple is used
@@ -372,45 +320,10 @@ memtx_engine_create_space(struct engine *engine, struct space_def *def,
 }
 
 static int
-memtx_engine_prepare(struct engine *engine, struct txn *txn)
-{
-	(void)engine;
-	if (txn->engine_tx == 0)
-		return 0;
-	/*
-	 * These triggers are only used for memtx and only
-	 * when autocommit == false, so we are saving
-	 * on calls to trigger_create/trigger_clear.
-	 */
-	trigger_clear(&txn->fiber_on_yield);
-	if (txn->is_aborted) {
-		diag_set(ClientError, ER_TRANSACTION_YIELD);
-		diag_log();
-		return -1;
-	}
-	return 0;
-}
-
-static int
 memtx_engine_begin(struct engine *engine, struct txn *txn)
 {
 	(void)engine;
-	(void)txn;
-	return 0;
-}
-
-static int
-memtx_engine_begin_statement(struct engine *engine, struct txn *txn)
-{
-	(void)engine;
-	(void)txn;
-	if (txn->engine_tx == NULL) {
-		struct space *space = txn_last_stmt(txn)->space;
-
-		if (space->def->id > BOX_SYSTEM_ID_MAX)
-			/* Setup triggers for non-ddl transactions. */
-			memtx_init_txn(txn);
-	}
+	txn_can_yield(txn, false);
 	return 0;
 }
 
@@ -459,9 +372,6 @@ memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
 static void
 memtx_engine_rollback(struct engine *engine, struct txn *txn)
 {
-	if (txn->engine_tx != NULL) {
-		trigger_clear(&txn->fiber_on_yield);
-	}
 	struct txn_stmt *stmt;
 	stailq_reverse(&txn->stmts);
 	stailq_foreach_entry(stmt, &txn->stmts, next)
@@ -948,8 +858,8 @@ static const struct engine_vtab memtx_engine_vtab = {
 	/* .create_space = */ memtx_engine_create_space,
 	/* .join = */ memtx_engine_join,
 	/* .begin = */ memtx_engine_begin,
-	/* .begin_statement = */ memtx_engine_begin_statement,
-	/* .prepare = */ memtx_engine_prepare,
+	/* .begin_statement = */ generic_engine_begin_statement,
+	/* .prepare = */ generic_engine_prepare,
 	/* .commit = */ generic_engine_commit,
 	/* .rollback_statement = */ memtx_engine_rollback_statement,
 	/* .rollback = */ memtx_engine_rollback,
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index f0e1cfd2..926b3f18 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -878,6 +878,8 @@ memtx_check_on_replace(struct trigger *trigger, void *event)
 static int
 memtx_space_check_format(struct space *space, struct tuple_format *format)
 {
+	struct txn *txn = in_txn();
+
 	if (space->index_count == 0)
 		return 0;
 	struct index *pk = space->index[0];
@@ -888,6 +890,8 @@ memtx_space_check_format(struct space *space, struct tuple_format *format)
 	if (it == NULL)
 		return -1;
 
+	txn_can_yield(txn, true);
+
 	struct memtx_engine *memtx = (struct memtx_engine *)space->engine;
 	struct memtx_ddl_state state;
 	state.format = format;
@@ -930,6 +934,7 @@ memtx_space_check_format(struct space *space, struct tuple_format *format)
 	iterator_delete(it);
 	diag_destroy(&state.diag);
 	trigger_clear(&on_replace);
+	txn_can_yield(txn, false);
 	return rc;
 }
 
@@ -1002,6 +1007,7 @@ static int
 memtx_space_build_index(struct space *src_space, struct index *new_index,
 			struct tuple_format *new_format)
 {
+	struct txn *txn = in_txn();
 	/**
 	 * If it's a secondary key, and we're not building them
 	 * yet (i.e. it's snapshot recovery for memtx), do nothing.
@@ -1027,6 +1033,8 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
 	if (it == NULL)
 		return -1;
 
+	txn_can_yield(txn, true);
+
 	struct memtx_engine *memtx = (struct memtx_engine *)src_space->engine;
 	struct memtx_ddl_state state;
 	state.index = new_index;
@@ -1103,6 +1111,7 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
 	iterator_delete(it);
 	diag_destroy(&state.diag);
 	trigger_clear(&on_replace);
+	txn_can_yield(txn, false);
 	return rc;
 }
 
diff --git a/src/box/txn.c b/src/box/txn.c
index c605345d..5193b49c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -40,6 +40,12 @@ double too_long_threshold;
 /* Txn cache. */
 static struct stailq txn_cache = {NULL, &txn_cache.first};
 
+static void
+txn_on_stop(struct trigger *trigger, void *event);
+
+static void
+txn_on_yield(struct trigger *trigger, void *event);
+
 static inline void
 fiber_set_txn(struct fiber *fiber, struct txn *txn)
 {
@@ -193,7 +199,8 @@ txn_begin()
 	txn->n_applier_rows = 0;
 	txn->has_triggers  = false;
 	txn->is_done = false;
-	txn->is_aborted = false;
+	txn->can_yield = true;
+	txn->is_aborted_by_yield = false;
 	txn->in_sub_stmt = 0;
 	txn->id = ++tsn;
 	txn->signature = -1;
@@ -201,8 +208,8 @@ txn_begin()
 	txn->engine_tx = NULL;
 	txn->psql_txn = NULL;
 	txn->fiber = NULL;
-	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
+	/* fiber_on_yield is initialized by engine on demand */
 	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
 	trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
 	return txn;
@@ -463,6 +470,12 @@ txn_write_to_wal(struct txn *txn)
 static int
 txn_prepare(struct txn *txn)
 {
+	if (txn->is_aborted_by_yield) {
+		assert(!txn->can_yield);
+		diag_set(ClientError, ER_TRANSACTION_YIELD);
+		diag_log();
+		return -1;
+	}
 	/*
 	 * If transaction has been started in SQL, deferred
 	 * foreign key constraints must not be violated.
@@ -484,6 +497,8 @@ txn_prepare(struct txn *txn)
 			return -1;
 	}
 	trigger_clear(&txn->fiber_on_stop);
+	if (!txn->can_yield)
+		trigger_clear(&txn->fiber_on_yield);
 	return 0;
 }
 
@@ -550,23 +565,13 @@ txn_rollback(struct txn *txn)
 {
 	assert(txn == in_txn());
 	trigger_clear(&txn->fiber_on_stop);
+	if (!txn->can_yield)
+		trigger_clear(&txn->fiber_on_yield);
 	txn->signature = -1;
 	txn_complete(txn);
 	fiber_set_txn(fiber(), NULL);
 }
 
-void
-txn_abort(struct txn *txn)
-{
-	assert(in_txn() == txn);
-	txn_rollback_to_svp(txn, NULL);
-	if (txn->has_triggers) {
-		txn_run_triggers(txn, &txn->on_rollback);
-		txn->has_triggers = false;
-	}
-	txn->is_aborted = true;
-}
-
 int
 txn_check_singlestatement(struct txn *txn, const char *where)
 {
@@ -578,6 +583,20 @@ txn_check_singlestatement(struct txn *txn, const char *where)
 	return 0;
 }
 
+void
+txn_can_yield(struct txn *txn, bool set)
+{
+	assert(txn == in_txn());
+	assert(txn->can_yield != set);
+	txn->can_yield = set;
+	if (set) {
+		trigger_clear(&txn->fiber_on_yield);
+	} else {
+		trigger_create(&txn->fiber_on_yield, txn_on_yield, NULL, NULL);
+		trigger_add(&fiber()->on_yield, &txn->fiber_on_yield);
+	}
+}
+
 int64_t
 box_txn_id(void)
 {
@@ -711,7 +730,7 @@ box_txn_rollback_to_savepoint(box_txn_savepoint_t *svp)
 	return 0;
 }
 
-void
+static void
 txn_on_stop(struct trigger *trigger, void *event)
 {
 	(void) trigger;
@@ -719,3 +738,34 @@ txn_on_stop(struct trigger *trigger, void *event)
 	txn_rollback(in_txn());                 /* doesn't yield or fail */
 }
 
+/**
+ * Memtx yield-in-transaction trigger callback.
+ *
+ * In case of a yield inside memtx multi-statement transaction
+ * we must, first of all, roll back the effects of the transaction
+ * so that concurrent transactions won't see dirty, uncommitted
+ * data.
+ *
+ * Second, we must abort the transaction, since it has been rolled
+ * back implicitly. The transaction can not be rolled back
+ * completely from within a yield trigger, since a yield trigger
+ * can't fail. Instead, we mark the transaction as aborted and
+ * raise an error on attempt to commit it.
+ *
+ * So much hassle to be user-friendly until we have a true
+ * interactive transaction support in memtx.
+ */
+static void
+txn_on_yield(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	(void) event;
+	struct txn *txn = in_txn();
+	assert(txn != NULL && !txn->can_yield);
+	txn_rollback_to_svp(txn, NULL);
+	if (txn->has_triggers) {
+		txn_run_triggers(txn, &txn->on_rollback);
+		txn->has_triggers = false;
+	}
+	txn->is_aborted_by_yield = true;
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index d1ef220a..a6d6878b 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -168,7 +168,12 @@ struct txn {
 	 * True if the transaction was aborted so should be
 	 * rolled back at commit.
 	 */
-	bool is_aborted;
+	bool is_aborted_by_yield;
+	/**
+	 * True if yields are allowed inside a transaction,
+	 * see txn_can_yield().
+	 */
+	bool can_yield;
 	/** True if on_commit and on_rollback lists are non-empty. */
 	bool has_triggers;
 	/** The number of active nested statement-level transactions. */
@@ -259,17 +264,6 @@ int
 txn_write(struct txn *txn);
 
 /**
- * Roll back the transaction but keep the object around.
- * A special case for memtx transaction abort on yield. In this
- * case we need to abort the transaction to avoid dirty reads but
- * need to keep it around to ensure a new one is not implicitly
- * started and committed by the user program. Later, at
- * transaction commit we will raise an exception.
- */
-void
-txn_abort(struct txn *txn);
-
-/**
  * Most txns don't have triggers, and txn objects
  * are created on every access to data, so txns
  * are partially initialized.
@@ -371,6 +365,21 @@ int
 txn_check_singlestatement(struct txn *txn, const char *where);
 
 /**
+ * Enables or disables fiber yields inside the current transaction
+ * depending on the value of the given flag. Yields are disabled
+ * by installing a fiber-on-yield trigger that marks the transaction
+ * as aborted, which results in rolling back the transaction on
+ * commit.
+ *
+ * This function is used by the memtx engine, because it doesn't
+ * support yields inside transactions. It is also used to temporarily
+ * enable yields for long DDL operations such as building an index
+ * or checking a space format.
+ */
+void
+txn_can_yield(struct txn *txn, bool set);
+
+/**
  * Returns true if the transaction has a single statement.
  * Supposed to be used from a space on_replace trigger to
  * detect transaction boundaries.
@@ -400,12 +409,6 @@ txn_last_stmt(struct txn *txn)
 }
 
 /**
- * Fiber-stop trigger: roll back the transaction.
- */
-void
-txn_on_stop(struct trigger *trigger, void *event);
-
-/**
  * Return VDBE that is being currently executed.
  *
  * @retval VDBE that is being currently executed.
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index e0de65d0..7635c84c 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1093,6 +1093,7 @@ static int
 vinyl_space_check_format(struct space *space, struct tuple_format *format)
 {
 	struct vy_env *env = vy_env(space->engine);
+	struct txn *txn = in_txn();
 
 	/*
 	 * If this is local recovery, the space was checked before
@@ -1121,6 +1122,8 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
 	bool need_wal_sync;
 	tx_manager_abort_writers_for_ddl(env->xm, space, &need_wal_sync);
 
+	txn_can_yield(txn, true);
+
 	struct trigger on_replace;
 	struct vy_check_format_ctx ctx;
 	ctx.format = format;
@@ -1168,6 +1171,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
 
 	diag_destroy(&ctx.diag);
 	trigger_clear(&on_replace);
+	txn_can_yield(txn, false);
 	return rc;
 }
 
@@ -4314,6 +4318,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 {
 	struct vy_env *env = vy_env(src_space->engine);
 	struct vy_lsm *pk = vy_lsm(src_space->index[0]);
+	struct txn *txn = in_txn();
 
 	if (new_index->def->iid == 0 && !vy_lsm_is_empty(pk)) {
 		diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
@@ -4345,6 +4350,8 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 	bool need_wal_sync;
 	tx_manager_abort_writers_for_ddl(env->xm, src_space, &need_wal_sync);
 
+	txn_can_yield(txn, true);
+
 	/*
 	 * Iterate over all tuples stored in the space and insert
 	 * each of them into the new LSM tree. Since read iterator
@@ -4438,6 +4445,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 
 	diag_destroy(&ctx.diag);
 	trigger_clear(&on_replace);
+	txn_can_yield(txn, false);
 	return rc;
 }
 
diff --git a/test/box/transaction.result b/test/box/transaction.result
index 857314b7..ad2d650c 100644
--- a/test/box/transaction.result
+++ b/test/box/transaction.result
@@ -722,3 +722,19 @@ box.commit() -- error
 s:drop()
 ---
 ...
+--
+-- Check that a DDL transaction is rolled back on fiber yield.
+--
+s = box.schema.space.create('test')
+---
+...
+box.begin() s:create_index('pk') fiber.sleep(0)
+---
+...
+box.commit() -- error
+---
+- error: Transaction has been aborted by a fiber yield
+...
+s:drop()
+---
+...
diff --git a/test/box/transaction.test.lua b/test/box/transaction.test.lua
index 8ffae2fe..8cd3e8ba 100644
--- a/test/box/transaction.test.lua
+++ b/test/box/transaction.test.lua
@@ -373,3 +373,11 @@ fiber.sleep(0)
 s.index.pk == nil
 box.commit() -- error
 s:drop()
+
+--
+-- Check that a DDL transaction is rolled back on fiber yield.
+--
+s = box.schema.space.create('test')
+box.begin() s:create_index('pk') fiber.sleep(0)
+box.commit() -- error
+s:drop()
diff --git a/test/engine/truncate.result b/test/engine/truncate.result
index 277e7bda..d65a1df1 100644
--- a/test/engine/truncate.result
+++ b/test/engine/truncate.result
@@ -8,7 +8,7 @@ fiber =  require('fiber')
 ---
 ...
 --
--- Check that space truncation is forbidden in a transaction.
+-- Check that space truncation works fine in a transaction.
 --
 s = box.schema.create_space('test', {engine = engine})
 ---
@@ -19,13 +19,7 @@ _ = s:create_index('pk')
 _ = s:insert{123}
 ---
 ...
-box.begin()
----
-...
-s:truncate()
----
-...
-box.commit()
+box.begin() s:truncate() box.commit()
 ---
 ...
 s:select()
diff --git a/test/engine/truncate.test.lua b/test/engine/truncate.test.lua
index 74fdd52b..fe897fb6 100644
--- a/test/engine/truncate.test.lua
+++ b/test/engine/truncate.test.lua
@@ -4,14 +4,12 @@ engine = test_run:get_cfg('engine')
 fiber =  require('fiber')
 
 --
--- Check that space truncation is forbidden in a transaction.
+-- Check that space truncation works fine in a transaction.
 --
 s = box.schema.create_space('test', {engine = engine})
 _ = s:create_index('pk')
 _ = s:insert{123}
-box.begin()
-s:truncate()
-box.commit()
+box.begin() s:truncate() box.commit()
 s:select()
 s:drop()
 



More information about the Tarantool-patches mailing list