[PATCH v2] vinyl: abort rw transactions before DDL

Vladimir Davydov vdavydov.dev at gmail.com
Fri Aug 24 13:46:53 MSK 2018


When building a new index or checking a space format, we propagate
changes done to the space during the procedure with the aid of an
on_replace trigger. The problem is that there may be transactions
with a non-empty write set when we install the trigger. Changes done
by those transactions will not be seen by the trigger and so they
won't make it to the new index, resulting in an inconsistency between
the primary and secondary indexes. To fix this issue, let's abort all
rw transactions affecting the space after setting the trigger. Note
that transactions that have reached WAL can't be aborted, so we wait
for them to complete.

Closes #3458
---
https://github.com/tarantool/tarantool/issues/3458
https://github.com/tarantool/tarantool/commits/dv/gh-3458-vy-abort-tx-before-ddl

Changes in v2:
 - Abort only transactions that affect the altered space.
 - Maintain the list of rw transactions, not all transactions.
 - Add some functional tests.

v1: https://www.freelists.org/post/tarantool-patches/PATCH-vinyl-abort-rw-transactions-before-DDL

 src/box/vinyl.c                     |  52 +++++++---
 src/box/vy_lsm.h                    |  11 ++
 src/box/vy_tx.c                     |  24 +++++
 src/box/vy_tx.h                     |  15 +++
 test/vinyl/ddl.result               |   1 -
 test/vinyl/ddl.test.lua             |   1 -
 test/vinyl/deferred_delete.result   |   5 +
 test/vinyl/deferred_delete.test.lua |   3 +
 test/vinyl/errinj.result            | 196 ++++++++++++++++++++++++++++++++++++
 test/vinyl/errinj.test.lua          |  85 ++++++++++++++++
 test/vinyl/misc.result              |   5 +
 test/vinyl/misc.test.lua            |   4 +
 12 files changed, 387 insertions(+), 15 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index fb121402..798a37f8 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1017,6 +1017,29 @@ vinyl_space_prepare_alter(struct space *old_space, struct space *new_space)
 	return 0;
 }
 
+/**
+ * Called after installing the on_replace trigger that propagates
+ * changes done during DDL. Aborts all rw transactions affecting
+ * the given LSM tree that began before the trigger was installed,
+ * so that DDL doesn't miss their write sets, then waits for the
+ * prepared ones. Returns 0 on success, -1 if either step fails.
+ */
+static int
+vy_abort_writers_for_ddl(struct vy_env *env, struct vy_lsm *lsm)
+{
+	if (tx_manager_abort_writers(env->xm, lsm) != 0)
+		return -1;
+	/*
+	 * Wait for prepared transactions to complete
+	 * (we can't abort them as they reached WAL).
+	 */
+	struct vclock unused;
+	if (wal_checkpoint(&unused, false) != 0)
+		return -1;
+
+	return 0;
+}
+
 /** Argument passed to vy_check_format_on_replace(). */
 struct vy_check_format_ctx {
 	/** Format to check new tuples against. */
@@ -1087,10 +1110,13 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
 	trigger_create(&on_replace, vy_check_format_on_replace, &ctx, NULL);
 	trigger_add(&space->on_replace, &on_replace);
 
+	int rc = vy_abort_writers_for_ddl(env, pk);
+	if (rc != 0)
+		goto out;
+
 	struct vy_read_iterator itr;
 	vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, key,
-			      &env->xm->p_global_read_view);
-	int rc;
+			      &env->xm->p_committed_read_view);
 	int loops = 0;
 	struct tuple *tuple;
 	while ((rc = vy_read_iterator_next(&itr, &tuple)) == 0) {
@@ -1114,6 +1140,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
 			break;
 	}
 	vy_read_iterator_close(&itr);
+out:
 	diag_destroy(&ctx.diag);
 	trigger_clear(&on_replace);
 	tuple_unref(key);
@@ -4190,10 +4217,8 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 {
 	struct vy_env *env = vy_env(src_space->engine);
 	struct vy_lsm *pk = vy_lsm(src_space->index[0]);
-	bool is_empty = (pk->stat.disk.count.rows == 0 &&
-			 pk->stat.memory.count.rows == 0);
 
-	if (new_index->def->iid == 0 && !is_empty) {
+	if (new_index->def->iid == 0 && !vy_lsm_is_empty(pk)) {
 		diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
 			 "rebuilding the primary index of a non-empty space");
 		return -1;
@@ -4210,9 +4235,6 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 	    env->status == VINYL_FINAL_RECOVERY_LOCAL)
 		return vy_build_recover(env, new_lsm, pk);
 
-	if (is_empty)
-		return 0;
-
 	/*
 	 * Iterate over all tuples stored in the space and insert
 	 * each of them into the new LSM tree. Since read iterator
@@ -4234,10 +4256,13 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 	trigger_create(&on_replace, vy_build_on_replace, &ctx, NULL);
 	trigger_add(&src_space->on_replace, &on_replace);
 
+	int rc = vy_abort_writers_for_ddl(env, pk);
+	if (rc != 0)
+		goto out;
+
 	struct vy_read_iterator itr;
 	vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, key,
 			      &env->xm->p_committed_read_view);
-	int rc;
 	int loops = 0;
 	struct tuple *tuple;
 	int64_t build_lsn = env->xm->lsn;
@@ -4280,22 +4305,23 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 		}
 	}
 	vy_read_iterator_close(&itr);
-	tuple_unref(key);
 
 	/*
 	 * Dump the new index upon build completion so that we don't
-	 * have to rebuild it on recovery.
+	 * have to rebuild it on recovery. No need to trigger dump if
+	 * the space happens to be empty.
 	 */
-	if (rc == 0)
+	if (rc == 0 && !vy_lsm_is_empty(new_lsm))
 		rc = vy_scheduler_dump(&env->scheduler);
 
 	if (rc == 0 && ctx.is_failed) {
 		diag_move(&ctx.diag, diag_get());
 		rc = -1;
 	}
-
+out:
 	diag_destroy(&ctx.diag);
 	trigger_clear(&on_replace);
+	tuple_unref(key);
 	return rc;
 }
 
diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h
index d2aa0c43..6917d475 100644
--- a/src/box/vy_lsm.h
+++ b/src/box/vy_lsm.h
@@ -316,6 +316,17 @@ void
 vy_lsm_delete(struct vy_lsm *lsm);
 
 /**
+ * Return true if the LSM tree holds no statements, i.e. both its
+ * disk and memory row counters (lsm->stat) are zero.
+ */
+static inline bool
+vy_lsm_is_empty(struct vy_lsm *lsm)
+{
+	return (lsm->stat.disk.count.rows == 0 &&
+		lsm->stat.memory.count.rows == 0);
+}
+
+/**
  * Increment the reference counter of an LSM tree.
  * An LSM tree cannot be deleted if its reference
  * counter is elevated.
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 590b4483..1e8775a0 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -108,6 +108,7 @@ tx_manager_new(void)
 		return NULL;
 	}
 
+	rlist_create(&xm->writers);
 	rlist_create(&xm->read_views);
 	vy_global_read_view_create((struct vy_read_view *)&xm->global_read_view,
 				   INT64_MAX);
@@ -290,6 +291,7 @@ vy_tx_create(struct tx_manager *xm, struct vy_tx *tx)
 	vy_tx_read_set_new(&tx->read_set);
 	tx->psn = 0;
 	rlist_create(&tx->on_destroy);
+	rlist_create(&tx->in_writers);
 }
 
 void
@@ -308,6 +310,7 @@ vy_tx_destroy(struct vy_tx *tx)
 	}
 
 	vy_tx_read_set_iter(&tx->read_set, NULL, vy_tx_read_set_free_cb, NULL);
+	rlist_del_entry(tx, in_writers);
 }
 
 /** Return true if the transaction is read-only. */
@@ -819,6 +822,8 @@ vy_tx_rollback_to_savepoint(struct vy_tx *tx, void *svp)
 		tx->write_set_version++;
 		txv_delete(v);
 	}
+	if (stailq_empty(&tx->log))
+		rlist_del_entry(tx, in_writers);
 }
 
 int
@@ -1018,10 +1023,29 @@ vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt)
 	tx->write_set_version++;
 	tx->write_size += tuple_size(stmt);
 	vy_stmt_counter_acct_tuple(&lsm->stat.txw.count, stmt);
+	if (stailq_empty(&tx->log))
+		rlist_add_entry(&tx->xm->writers, tx, in_writers);
 	stailq_add_tail_entry(&tx->log, v, next_in_log);
 	return 0;
 }
 
+int
+tx_manager_abort_writers(struct tx_manager *xm, struct vy_lsm *lsm)
+{
+	struct tuple *key = vy_stmt_new_select(lsm->env->key_format, NULL, 0);
+	if (key == NULL)
+		return -1;
+	struct vy_tx *tx;
+	rlist_foreach_entry(tx, &xm->writers, in_writers) {
+		assert(!vy_tx_is_ro(tx));
+		if (tx->state == VINYL_TX_READY &&
+		    write_set_search_key(&tx->write_set, lsm, key) != NULL)
+			tx->state = VINYL_TX_ABORT;
+	}
+	tuple_unref(key);
+	return 0;
+}
+
 void
 vy_txw_iterator_open(struct vy_txw_iterator *itr,
 		     struct vy_txw_iterator_stat *stat,
diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h
index dcf6a739..1d515c72 100644
--- a/src/box/vy_tx.h
+++ b/src/box/vy_tx.h
@@ -133,6 +133,8 @@ write_set_search_key(write_set_t *tree, struct vy_lsm *lsm,
 
 /** Transaction object. */
 struct vy_tx {
+	/** Link in tx_manager::writers. */
+	struct rlist in_writers;
 	/** Transaction manager. */
 	struct tx_manager *xm;
 	/**
@@ -209,6 +211,10 @@ struct tx_manager {
 	 */
 	struct vy_tx *last_prepared_tx;
 	/**
+	 * List of rw transactions, linked by vy_tx::in_writers.
+	 */
+	struct rlist writers;
+	/**
 	 * The list of TXs with a read view in order of vlsn.
 	 */
 	struct rlist read_views;
@@ -262,6 +268,15 @@ tx_manager_new(void);
 void
 tx_manager_delete(struct tx_manager *xm);
 
+/**
+ * Abort all rw transactions that affect the given LSM tree
+ * and haven't reached WAL yet.
+ *
+ * Returns 0 on success, -1 on memory allocation error.
+ */
+int
+tx_manager_abort_writers(struct tx_manager *xm, struct vy_lsm *lsm);
+
 /** Initialize a tx object. */
 void
 vy_tx_create(struct tx_manager *xm, struct vy_tx *tx);
diff --git a/test/vinyl/ddl.result b/test/vinyl/ddl.result
index 3e65e232..bb4fc984 100644
--- a/test/vinyl/ddl.result
+++ b/test/vinyl/ddl.result
@@ -656,7 +656,6 @@ last_val = 1000;
 ---
 ...
 function gen_load()
-    fiber.sleep(0.001)
     local s = box.space.test
     for i = 1, 200 do
         local op = math.random(4)
diff --git a/test/vinyl/ddl.test.lua b/test/vinyl/ddl.test.lua
index 45c5cf8e..f7bf03bb 100644
--- a/test/vinyl/ddl.test.lua
+++ b/test/vinyl/ddl.test.lua
@@ -253,7 +253,6 @@ box.commit();
 last_val = 1000;
 
 function gen_load()
-    fiber.sleep(0.001)
     local s = box.space.test
     for i = 1, 200 do
         local op = math.random(4)
diff --git a/test/vinyl/deferred_delete.result b/test/vinyl/deferred_delete.result
index 9811b6bc..ee9c0212 100644
--- a/test/vinyl/deferred_delete.result
+++ b/test/vinyl/deferred_delete.result
@@ -24,6 +24,11 @@ i1 = s:create_index('i1', {run_count_per_level = 10, parts = {2, 'unsigned'}, un
 i2 = s:create_index('i2', {run_count_per_level = 10, parts = {3, 'unsigned'}, unique = true})
 ---
 ...
+-- space.create_index() does a lookup in the primary index
+-- so reset the stats before filling up the space.
+box.stat.reset()
+---
+...
 for i = 1, 10 do s:replace{i, i, i} end
 ---
 ...
diff --git a/test/vinyl/deferred_delete.test.lua b/test/vinyl/deferred_delete.test.lua
index d18361a0..671a7ed1 100644
--- a/test/vinyl/deferred_delete.test.lua
+++ b/test/vinyl/deferred_delete.test.lua
@@ -13,6 +13,9 @@ s = box.schema.space.create('test', {engine = 'vinyl'})
 pk = s:create_index('pk', {run_count_per_level = 10})
 i1 = s:create_index('i1', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false})
 i2 = s:create_index('i2', {run_count_per_level = 10, parts = {3, 'unsigned'}, unique = true})
+-- space.create_index() does a lookup in the primary index
+-- so reset the stats before filling up the space.
+box.stat.reset()
 for i = 1, 10 do s:replace{i, i, i} end
 box.snapshot()
 for i = 1, 10, 2 do s:delete{i} end
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index cdffa198..17e4dc8c 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1922,3 +1922,199 @@ box.snapshot() -- ok
 s:drop()
 ---
 ...
+--
+-- gh-3458: check that rw transactions that started before DDL are
+-- aborted.
+--
+vinyl_cache = box.cfg.vinyl_cache
+---
+...
+box.cfg{vinyl_cache = 0}
+---
+...
+s1 = box.schema.space.create('test1', {engine = 'vinyl'})
+---
+...
+_ = s1:create_index('pk', {page_size = 16})
+---
+...
+s2 = box.schema.space.create('test2', {engine = 'vinyl'})
+---
+...
+_ = s2:create_index('pk')
+---
+...
+pad = string.rep('x', 16)
+---
+...
+for i = 101, 200 do s1:replace{i, i, pad} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function async_replace(space, tuple, timeout)
+    local c = fiber.channel(1)
+    fiber.create(function()
+        box.begin()
+        space:replace(tuple)
+        fiber.sleep(timeout)
+        local status = pcall(box.commit)
+        c:put(status)
+    end)
+    return c
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+c1 = async_replace(s1, {1}, 0.01)
+---
+...
+c2 = async_replace(s2, {1}, 0.01)
+---
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+---
+- ok
+...
+s1:format{{'key', 'unsigned'}, {'value', 'unsigned'}}
+---
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+---
+- ok
+...
+c1:get() -- false (transaction was aborted)
+---
+- false
+...
+c2:get() -- true
+---
+- true
+...
+s1:get(1) == nil
+---
+- true
+...
+s2:get(1) ~= nil
+---
+- true
+...
+s1:format()
+---
+- [{'name': 'key', 'type': 'unsigned'}, {'name': 'value', 'type': 'unsigned'}]
+...
+s1:format{}
+---
+...
+c1 = async_replace(s1, {2}, 0.01)
+---
+...
+c2 = async_replace(s2, {2}, 0.01)
+---
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+---
+- ok
+...
+_ = s1:create_index('sk', {parts = {2, 'unsigned'}})
+---
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+---
+- ok
+...
+c1:get() -- false (transaction was aborted)
+---
+- false
+...
+c2:get() -- true
+---
+- true
+...
+s1:get(2) == nil
+---
+- true
+...
+s2:get(2) ~= nil
+---
+- true
+...
+s1.index.pk:count() == s1.index.sk:count()
+---
+- true
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+box.cfg{vinyl_cache = vinyl_cache}
+---
+...
+-- Transactions that reached WAL must not be aborted.
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk')
+---
+...
+errinj.set('ERRINJ_WAL_DELAY', true)
+---
+- ok
+...
+_ = fiber.create(function() s:replace{1} end)
+---
+...
+_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end)
+---
+...
+fiber.sleep(0)
+---
+...
+s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail
+---
+- error: Tuple field count 1 is less than required by space format or defined indexes
+    (expected at least 2)
+...
+s:select()
+---
+- - [1]
+...
+s:truncate()
+---
+...
+errinj.set('ERRINJ_WAL_DELAY', true)
+---
+- ok
+...
+_ = fiber.create(function() s:replace{1} end)
+---
+...
+_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end)
+---
+...
+fiber.sleep(0)
+---
+...
+s:create_index('sk', {parts = {2, 'unsigned'}})
+---
+- error: Tuple field count 1 is less than required by space format or defined indexes
+    (expected at least 2)
+...
+s:select()
+---
+- - [1]
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index c2332a69..1b02c01c 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -765,3 +765,88 @@ errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0)
 
 box.snapshot() -- ok
 s:drop()
+
+--
+-- gh-3458: check that rw transactions that started before DDL are
+-- aborted.
+--
+vinyl_cache = box.cfg.vinyl_cache
+box.cfg{vinyl_cache = 0}
+
+s1 = box.schema.space.create('test1', {engine = 'vinyl'})
+_ = s1:create_index('pk', {page_size = 16})
+s2 = box.schema.space.create('test2', {engine = 'vinyl'})
+_ = s2:create_index('pk')
+
+pad = string.rep('x', 16)
+for i = 101, 200 do s1:replace{i, i, pad} end
+box.snapshot()
+
+test_run:cmd("setopt delimiter ';'")
+function async_replace(space, tuple, timeout)
+    local c = fiber.channel(1)
+    fiber.create(function()
+        box.begin()
+        space:replace(tuple)
+        fiber.sleep(timeout)
+        local status = pcall(box.commit)
+        c:put(status)
+    end)
+    return c
+end;
+test_run:cmd("setopt delimiter ''");
+
+c1 = async_replace(s1, {1}, 0.01)
+c2 = async_replace(s2, {1}, 0.01)
+
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+s1:format{{'key', 'unsigned'}, {'value', 'unsigned'}}
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+
+c1:get() -- false (transaction was aborted)
+c2:get() -- true
+
+s1:get(1) == nil
+s2:get(1) ~= nil
+s1:format()
+s1:format{}
+
+c1 = async_replace(s1, {2}, 0.01)
+c2 = async_replace(s2, {2}, 0.01)
+
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+_ = s1:create_index('sk', {parts = {2, 'unsigned'}})
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+
+c1:get() -- false (transaction was aborted)
+c2:get() -- true
+
+s1:get(2) == nil
+s2:get(2) ~= nil
+s1.index.pk:count() == s1.index.sk:count()
+
+s1:drop()
+s2:drop()
+box.cfg{vinyl_cache = vinyl_cache}
+
+-- Transactions that reached WAL must not be aborted.
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk')
+
+errinj.set('ERRINJ_WAL_DELAY', true)
+_ = fiber.create(function() s:replace{1} end)
+_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end)
+
+fiber.sleep(0)
+s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail
+s:select()
+s:truncate()
+
+errinj.set('ERRINJ_WAL_DELAY', true)
+_ = fiber.create(function() s:replace{1} end)
+_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end)
+
+fiber.sleep(0)
+s:create_index('sk', {parts = {2, 'unsigned'}})
+s:select()
+s:drop()
diff --git a/test/vinyl/misc.result b/test/vinyl/misc.result
index 409df3e0..d2a7ca68 100644
--- a/test/vinyl/misc.result
+++ b/test/vinyl/misc.result
@@ -140,6 +140,11 @@ i6 = s:create_index('i6', {unique = true, parts = {4, 'unsigned', 6, 'unsigned',
 i7 = s:create_index('i7', {unique = true, parts = {6, 'unsigned'}})
 ---
 ...
+-- space.create_index() does a lookup in the primary index
+-- so reset the stats before calling space.insert().
+box.stat.reset()
+---
+...
 s:insert{1, 1, 1, 1, 1, 1}
 ---
 - [1, 1, 1, 1, 1, 1]
diff --git a/test/vinyl/misc.test.lua b/test/vinyl/misc.test.lua
index 3dbfdb54..9f61ca0a 100644
--- a/test/vinyl/misc.test.lua
+++ b/test/vinyl/misc.test.lua
@@ -62,6 +62,10 @@ i5 = s:create_index('i5', {unique = true, parts = {4, 'unsigned', 5, 'unsigned',
 i6 = s:create_index('i6', {unique = true, parts = {4, 'unsigned', 6, 'unsigned', 5, 'unsigned'}})
 i7 = s:create_index('i7', {unique = true, parts = {6, 'unsigned'}})
 
+-- space.create_index() does a lookup in the primary index
+-- so reset the stats before calling space.insert().
+box.stat.reset()
+
 s:insert{1, 1, 1, 1, 1, 1}
 
 i1:stat().lookup -- 1
-- 
2.11.0




More information about the Tarantool-patches mailing list