[PATCH] vinyl: flush transactions before setting trigger on altered space

Vladimir Davydov vdavydov.dev at gmail.com
Wed Aug 1 13:44:10 MSK 2018


When building a new index or checking a space format, we propagate
changes done to the space during the procedure with the aid of an
on_replace trigger. The problem is there may be transactions with a
non-empty write set when we install the trigger. Changes done by
those transactions will not be seen by the trigger and so they won't
make it to the new index, resulting in an inconsistency between the
primary and secondary indexes. To fix this issue, let's flush all
active transactions after installing the trigger. If a transaction
fails to commit or roll back within box.cfg.vinyl_timeout seconds,
DDL simply aborts it.

Closes #3458
---
 src/box/vinyl.c            |  28 ++++---
 src/box/vy_lsm.h           |  11 +++
 src/box/vy_tx.c            |  76 +++++++++++++++++++
 src/box/vy_tx.h            |  36 +++++++++
 test/vinyl/ddl.result      |   1 -
 test/vinyl/ddl.test.lua    |   1 -
 test/vinyl/errinj.result   | 181 +++++++++++++++++++++++++++++++++++++++++++++
 test/vinyl/errinj.test.lua |  82 ++++++++++++++++++++
 test/vinyl/misc.result     |   5 ++
 test/vinyl/misc.test.lua   |   4 +
 10 files changed, 414 insertions(+), 11 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 5da1c4bc..ce211c47 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1084,9 +1084,16 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
 	trigger_create(&on_replace, vy_check_format_on_replace, &ctx, NULL);
 	trigger_add(&space->on_replace, &on_replace);
 
+	/*
+	 * Wait for all transactions that began before we installed
+	 * the on_replace trigger so that we don't miss their write
+	 * set while checking the space format.
+	 */
+	tx_manager_flush(env->xm, env->timeout);
+
 	struct vy_read_iterator itr;
 	vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, key,
-			      &env->xm->p_global_read_view);
+			      &env->xm->p_committed_read_view);
 	int rc;
 	int loops = 0;
 	struct tuple *tuple;
@@ -4158,10 +4165,8 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 {
 	struct vy_env *env = vy_env(src_space->engine);
 	struct vy_lsm *pk = vy_lsm(src_space->index[0]);
-	bool is_empty = (pk->stat.disk.count.rows == 0 &&
-			 pk->stat.memory.count.rows == 0);
 
-	if (new_index->def->iid == 0 && !is_empty) {
+	if (new_index->def->iid == 0 && !vy_lsm_is_empty(pk)) {
 		diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
 			 "rebuilding the primary index of a non-empty space");
 		return -1;
@@ -4178,9 +4183,6 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 	    env->status == VINYL_FINAL_RECOVERY_LOCAL)
 		return vy_build_recover(env, new_lsm, pk);
 
-	if (is_empty)
-		return 0;
-
 	/*
 	 * Iterate over all tuples stored in the space and insert
 	 * each of them into the new LSM tree. Since read iterator
@@ -4203,6 +4205,13 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 	trigger_create(&on_replace, vy_build_on_replace, &ctx, NULL);
 	trigger_add(&src_space->on_replace, &on_replace);
 
+	/*
+	 * Wait for all transactions that began before we installed
+	 * the on_replace trigger so that we don't miss their write
+	 * set while building the new index.
+	 */
+	tx_manager_flush(env->xm, env->timeout);
+
 	struct vy_read_iterator itr;
 	vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, key,
 			      &env->xm->p_committed_read_view);
@@ -4253,9 +4262,10 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 
 	/*
 	 * Dump the new index upon build completion so that we don't
-	 * have to rebuild it on recovery.
+	 * have to rebuild it on recovery. No need to trigger dump if
+	 * the space happens to be empty.
 	 */
-	if (rc == 0)
+	if (rc == 0 && !vy_lsm_is_empty(new_lsm))
 		rc = vy_scheduler_dump(&env->scheduler);
 
 	if (rc == 0 && ctx.is_failed) {
diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h
index f0b7ec9c..08d30530 100644
--- a/src/box/vy_lsm.h
+++ b/src/box/vy_lsm.h
@@ -311,6 +311,17 @@ void
 vy_lsm_delete(struct vy_lsm *lsm);
 
 /**
+ * Return true if the LSM tree has no statements, neither on disk
+ * nor in memory.
+ */
+static inline bool
+vy_lsm_is_empty(struct vy_lsm *lsm)
+{
+	return (lsm->stat.disk.count.rows == 0 &&
+		lsm->stat.memory.count.rows == 0);
+}
+
+/**
  * Increment the reference counter of an LSM tree.
  * An LSM tree cannot be deleted if its reference
  * counter is elevated.
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 5c186b87..9ff03d94 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -42,6 +42,7 @@
 #include "diag.h"
 #include "errcode.h"
 #include "fiber.h"
+#include "fiber_cond.h"
 #include "iproto_constants.h"
 #include "iterator_type.h"
 #include "salad/stailq.h"
@@ -106,6 +107,7 @@ tx_manager_new(void)
 		return NULL;
 	}
 
+	rlist_create(&xm->write_list);
 	rlist_create(&xm->read_views);
 	vy_global_read_view_create((struct vy_read_view *)&xm->global_read_view,
 				   INT64_MAX);
@@ -121,6 +123,8 @@ tx_manager_new(void)
 		       sizeof(struct vy_read_interval));
 	mempool_create(&xm->read_view_mempool, slab_cache,
 		       sizeof(struct vy_read_view));
+
+	fiber_cond_create(&xm->cond);
 	return xm;
 }
 
@@ -134,6 +138,59 @@ tx_manager_delete(struct tx_manager *xm)
 	free(xm);
 }
 
+/**
+ * Wait until all transactions with the write identifier equal
+ * to the given one or older are completed or the timeout occurs.
+ */
+static int
+tx_manager_wait_write(struct tx_manager *xm,
+		      int64_t write_id, double timeout)
+{
+	double deadline = ev_monotonic_now(loop()) + timeout;
+	do {
+		if (rlist_empty(&xm->write_list))
+			return 0;
+		struct vy_tx *tx = rlist_first_entry(&xm->write_list,
+					struct vy_tx, in_write_list);
+		if (tx->write_id > write_id)
+			return 0;
+	} while (fiber_cond_wait_deadline(&xm->cond, deadline) == 0);
+	return -1;
+}
+
+void
+tx_manager_flush(struct tx_manager *xm, double timeout)
+{
+	/*
+	 * Wait for transactions started before this point
+	 * to complete within the specified timeout.
+	 */
+	int64_t write_id = xm->last_write_id;
+	if (tx_manager_wait_write(xm, write_id, timeout) == 0)
+		return; /* all transactions have completed */
+
+	/*
+	 * Abort transactions that failed to complete within
+	 * the specified timeout. Note, transactions that have
+	 * been submitted to WAL can't be aborted.
+	 */
+	struct vy_tx *tx, *next_tx;
+	rlist_foreach_entry_safe(tx, &xm->write_list, in_write_list, next_tx) {
+		if (tx->write_id > write_id)
+			break;
+		if (tx->state == VINYL_TX_COMMIT)
+			continue;
+		assert(tx->state == VINYL_TX_READY);
+		tx->state = VINYL_TX_ABORT;
+		rlist_del_entry(tx, in_write_list);
+	}
+	fiber_cond_broadcast(&xm->cond);
+
+	/* Wait for transactions awaiting WAL write. */
+	if (tx_manager_wait_write(xm, write_id, TIMEOUT_INFINITY) != 0)
+		unreachable();
+}
+
 /** Create or reuse an instance of a read view. */
 static struct vy_read_view *
 tx_manager_read_view(struct tx_manager *xm)
@@ -288,11 +345,15 @@ vy_tx_create(struct tx_manager *xm, struct vy_tx *tx)
 	vy_tx_read_set_new(&tx->read_set);
 	tx->psn = 0;
 	rlist_create(&tx->on_destroy);
+	tx->write_id = -1;
+	rlist_create(&tx->in_write_list);
 }
 
 void
 vy_tx_destroy(struct vy_tx *tx)
 {
+	assert(rlist_empty(&tx->in_write_list));
+
 	trigger_run(&tx->on_destroy, NULL);
 	trigger_destroy(&tx->on_destroy);
 
@@ -346,7 +407,9 @@ vy_tx_send_to_read_view(struct vy_tx *tx, struct txv *v)
 		if (rv == NULL)
 			return -1;
 		abort->read_view = rv;
+		rlist_del_entry(abort, in_write_list);
 	}
+	fiber_cond_broadcast(&tx->xm->cond);
 	return 0;
 }
 
@@ -368,7 +431,9 @@ vy_tx_abort_readers(struct vy_tx *tx, struct txv *v)
 		if (abort->state != VINYL_TX_READY)
 			continue;
 		abort->state = VINYL_TX_ABORT;
+		rlist_del_entry(abort, in_write_list);
 	}
+	fiber_cond_broadcast(&tx->xm->cond);
 }
 
 struct vy_tx *
@@ -594,6 +659,9 @@ vy_tx_commit(struct vy_tx *tx, int64_t lsn)
 	if (tx->read_view != &xm->global_read_view)
 		tx->read_view->vlsn = lsn;
 out:
+	rlist_del_entry(tx, in_write_list);
+	fiber_cond_broadcast(&xm->cond);
+
 	vy_tx_destroy(tx);
 	mempool_free(&xm->tx_mempool, tx);
 }
@@ -656,6 +724,9 @@ vy_tx_rollback(struct vy_tx *tx)
 	if (tx->state == VINYL_TX_COMMIT)
 		vy_tx_rollback_after_prepare(tx);
 
+	rlist_del_entry(tx, in_write_list);
+	fiber_cond_broadcast(&xm->cond);
+
 	vy_tx_destroy(tx);
 	mempool_free(&xm->tx_mempool, tx);
 }
@@ -852,6 +923,11 @@ vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt)
 	tx->write_size += tuple_size(stmt);
 	vy_stmt_counter_acct_tuple(&lsm->stat.txw.count, stmt);
 	stailq_add_tail_entry(&tx->log, v, next_in_log);
+
+	if (tx->write_id < 0) {
+		tx->write_id = ++tx->xm->last_write_id;
+		rlist_add_tail_entry(&tx->xm->write_list, tx, in_write_list);
+	}
 	return 0;
 }
 
diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h
index dcf6a739..1a157299 100644
--- a/src/box/vy_tx.h
+++ b/src/box/vy_tx.h
@@ -39,6 +39,7 @@
 #include <small/rb.h>
 #include <small/rlist.h>
 
+#include "fiber_cond.h"
 #include "iterator_type.h"
 #include "salad/stailq.h"
 #include "trivia/util.h"
@@ -136,6 +137,14 @@ struct vy_tx {
 	/** Transaction manager. */
 	struct tx_manager *xm;
 	/**
+	 * Unique, monotonically growing identifier assigned
+	 * to a transaction when it performs its first write
+	 * operation. See also tx_manager::last_write_id.
+	 */
+	int64_t write_id;
+	/** Link in tx_manager::write_list. */
+	struct rlist in_write_list;
+	/**
 	 * In memory transaction log. Contains both reads
 	 * and writes.
 	 */
@@ -209,6 +218,19 @@ struct tx_manager {
 	 */
 	struct vy_tx *last_prepared_tx;
 	/**
+	 * Write identifier assigned to the most recent read-write
+	 * transaction. See also vy_tx::write_id.
+	 */
+	int64_t last_write_id;
+	/**
+	 * List of all transactions that have a non-empty write set
+	 * and may actually commit (i.e. haven't been sent to read
+	 * view or aborted). Newer transactions (with greater write
+	 * identifiers) are closer to the tail of the list.
+	 * Linked by vy_tx::in_write_list.
+	 */
+	struct rlist write_list;
+	/**
 	 * The list of TXs with a read view in order of vlsn.
 	 */
 	struct rlist read_views;
@@ -238,6 +260,12 @@ struct tx_manager {
 	 * transaction to use in such places.
 	 */
 	const struct vy_read_view *p_committed_read_view;
+	/**
+	 * Condition variable signaled whenever a transaction
+	 * is committed, rolled back, aborted, or sent to read
+	 * view.
+	 */
+	struct fiber_cond cond;
 	/** Transaction statistics. */
 	struct vy_tx_stat stat;
 	/** Sum size of statements pinned by the write set. */
@@ -262,6 +290,14 @@ tx_manager_new(void);
 void
 tx_manager_delete(struct tx_manager *xm);
 
+/**
+ * Wait until all currently active transactions have been
+ * committed or rolled back. Abort transactions that have
+ * not completed within the specified timeout.
+ */
+void
+tx_manager_flush(struct tx_manager *xm, double timeout);
+
 /** Initialize a tx object. */
 void
 vy_tx_create(struct tx_manager *xm, struct vy_tx *tx);
diff --git a/test/vinyl/ddl.result b/test/vinyl/ddl.result
index 3e65e232..bb4fc984 100644
--- a/test/vinyl/ddl.result
+++ b/test/vinyl/ddl.result
@@ -656,7 +656,6 @@ last_val = 1000;
 ---
 ...
 function gen_load()
-    fiber.sleep(0.001)
     local s = box.space.test
     for i = 1, 200 do
         local op = math.random(4)
diff --git a/test/vinyl/ddl.test.lua b/test/vinyl/ddl.test.lua
index 45c5cf8e..f7bf03bb 100644
--- a/test/vinyl/ddl.test.lua
+++ b/test/vinyl/ddl.test.lua
@@ -253,7 +253,6 @@ box.commit();
 last_val = 1000;
 
 function gen_load()
-    fiber.sleep(0.001)
     local s = box.space.test
     for i = 1, 200 do
         local op = math.random(4)
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 28271fc9..fe3af69f 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1844,3 +1844,184 @@ s.index.sk:stat().memory.rows
 s:drop()
 ---
 ...
+--
+-- gh-3458: the write set of a transaction started before DDL and
+-- committed during DDL is taken into account by space format check
+-- and index build.
+--
+fiber = require('fiber')
+---
+...
+errinj = box.error.injection
+---
+...
+vinyl_cache = box.cfg.vinyl_cache
+---
+...
+box.cfg{vinyl_cache = 0}
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk', {page_size = 16})
+---
+...
+pad = string.rep('x', 16)
+---
+...
+for i = 101, 200 do s:replace{i, i, pad} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+ch = fiber.channel(1)
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function async_replace(tuple, timeout)
+    fiber.create(function()
+        box.begin()
+        s:replace(tuple)
+        fiber.sleep(timeout)
+        local status = pcall(box.commit)
+        ch:put(status)
+    end)
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+async_replace({1}, 0.01)
+---
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+---
+- ok
+...
+s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail
+---
+- error: Tuple field count 1 is less than required by space format or defined indexes
+    (expected at least 2)
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+---
+- ok
+...
+ch:get()
+---
+- true
+...
+s:get{1} ~= nil
+---
+- true
+...
+s:delete{1}
+---
+...
+s:format{}
+---
+...
+async_replace({1, 1}, 0.01)
+---
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+---
+- ok
+...
+_ = s:create_index('sk', {parts = {2, 'unsigned'}})
+---
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+---
+- ok
+...
+ch:get()
+---
+- true
+...
+s:get{1} ~= nil
+---
+- true
+...
+s.index.pk:count() == s.index.sk:count()
+---
+- true
+...
+s.index.sk:drop()
+---
+...
+-- Transactions that failed to complete within timeout are aborted.
+vinyl_timeout = box.cfg.vinyl_timeout
+---
+...
+box.cfg{vinyl_timeout = 0.001}
+---
+...
+async_replace({2}, 0.01)
+---
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+---
+- ok
+...
+s:format{{'key', 'unsigned'}, {'value', 'unsigned'}}
+---
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+---
+- ok
+...
+ch:get()
+---
+- false
+...
+s:get{2} == nil
+---
+- true
+...
+s:format{}
+---
+...
+s:truncate()
+---
+...
+-- Transactions that reached WAL are never aborted.
+errinj.set('ERRINJ_WAL_DELAY', true)
+---
+- ok
+...
+_ = fiber.create(function() s:replace{1} end)
+---
+...
+_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end)
+---
+...
+fiber.sleep(0)
+---
+...
+s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail
+---
+- error: Tuple field count 1 is less than required by space format or defined indexes
+    (expected at least 2)
+...
+s:select()
+---
+- - [1]
+...
+s:drop()
+---
+...
+box.cfg{vinyl_timeout = vinyl_timeout}
+---
+...
+box.cfg{vinyl_cache = vinyl_cache}
+---
+...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 000067d3..0a872bc9 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -736,3 +736,85 @@ s.index.sk:select()
 s.index.sk:stat().memory.rows
 
 s:drop()
+
+--
+-- gh-3458: the write set of a transaction started before DDL and
+-- committed during DDL is taken into account by space format check
+-- and index build.
+--
+fiber = require('fiber')
+errinj = box.error.injection
+
+vinyl_cache = box.cfg.vinyl_cache
+box.cfg{vinyl_cache = 0}
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk', {page_size = 16})
+
+pad = string.rep('x', 16)
+for i = 101, 200 do s:replace{i, i, pad} end
+box.snapshot()
+
+ch = fiber.channel(1)
+
+test_run:cmd("setopt delimiter ';'")
+function async_replace(tuple, timeout)
+    fiber.create(function()
+        box.begin()
+        s:replace(tuple)
+        fiber.sleep(timeout)
+        local status = pcall(box.commit)
+        ch:put(status)
+    end)
+end;
+test_run:cmd("setopt delimiter ''");
+
+async_replace({1}, 0.01)
+
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+
+ch:get()
+s:get{1} ~= nil
+s:delete{1}
+s:format{}
+
+async_replace({1, 1}, 0.01)
+
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+_ = s:create_index('sk', {parts = {2, 'unsigned'}})
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+
+ch:get()
+s:get{1} ~= nil
+s.index.pk:count() == s.index.sk:count()
+s.index.sk:drop()
+
+-- Transactions that failed to complete within timeout are aborted.
+vinyl_timeout = box.cfg.vinyl_timeout
+box.cfg{vinyl_timeout = 0.001}
+
+async_replace({2}, 0.01)
+
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+s:format{{'key', 'unsigned'}, {'value', 'unsigned'}}
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+
+ch:get()
+s:get{2} == nil
+s:format{}
+s:truncate()
+
+-- Transactions that reached WAL are never aborted.
+errinj.set('ERRINJ_WAL_DELAY', true)
+_ = fiber.create(function() s:replace{1} end)
+_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end)
+
+fiber.sleep(0)
+s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail
+s:select()
+
+s:drop()
+box.cfg{vinyl_timeout = vinyl_timeout}
+box.cfg{vinyl_cache = vinyl_cache}
diff --git a/test/vinyl/misc.result b/test/vinyl/misc.result
index 409df3e0..d2a7ca68 100644
--- a/test/vinyl/misc.result
+++ b/test/vinyl/misc.result
@@ -140,6 +140,11 @@ i6 = s:create_index('i6', {unique = true, parts = {4, 'unsigned', 6, 'unsigned',
 i7 = s:create_index('i7', {unique = true, parts = {6, 'unsigned'}})
 ---
 ...
+-- space.create_index() does a lookup in the primary index
+-- so reset the stats before calling space.insert().
+box.stat.reset()
+---
+...
 s:insert{1, 1, 1, 1, 1, 1}
 ---
 - [1, 1, 1, 1, 1, 1]
diff --git a/test/vinyl/misc.test.lua b/test/vinyl/misc.test.lua
index 3dbfdb54..9f61ca0a 100644
--- a/test/vinyl/misc.test.lua
+++ b/test/vinyl/misc.test.lua
@@ -62,6 +62,10 @@ i5 = s:create_index('i5', {unique = true, parts = {4, 'unsigned', 5, 'unsigned',
 i6 = s:create_index('i6', {unique = true, parts = {4, 'unsigned', 6, 'unsigned', 5, 'unsigned'}})
 i7 = s:create_index('i7', {unique = true, parts = {6, 'unsigned'}})
 
+-- space.create_index() does a lookup in the primary index
+-- so reset the stats before calling space.insert().
+box.stat.reset()
+
 s:insert{1, 1, 1, 1, 1, 1}
 
 i1:stat().lookup -- 1
-- 
2.11.0




More information about the Tarantool-patches mailing list