[PATCH v2] memtx: add yields during index build

Serge Petrenko sergepetrenko at tarantool.org
Thu May 23 17:11:11 MSK 2019


Memtx index build used to stall the event loop for the entire build
period. Add occasional yields so that the loop is not blocked for too
long. Also make the index build set an on_replace trigger so that
concurrent replaces are handled correctly during the build.

Closes #3976
---
https://github.com/tarantool/tarantool/issues/3976
https://github.com/tarantool/tarantool/tree/sp/gh-3976-background-index-build

Changes in v2:
  - add an on_replace trigger
    to handle concurrent replaces
    while index build yields
  - modify test case slightly,
    test concurrent replaces.

 src/box/memtx_space.c                         | 84 +++++++++++++++++++
 test/box/memtx_background_index_build.result  | 77 +++++++++++++++++
 .../box/memtx_background_index_build.test.lua | 46 ++++++++++
 test/box/suite.ini                            |  2 +-
 4 files changed, 208 insertions(+), 1 deletion(-)
 create mode 100644 test/box/memtx_background_index_build.result
 create mode 100644 test/box/memtx_background_index_build.test.lua

diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 5ddb4f7ee..2c72fa8f2 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -870,10 +870,63 @@ memtx_init_ephemeral_space(struct space *space)
 	memtx_space_add_primary_key(space);
 }
 
+struct memtx_build_state { /* Shared by the build loop and its on_replace trigger. */
+	struct index *index; /* The index being built. */
+	struct tuple_format *format; /* Format new tuples are validated against. */
+	struct tuple *tuple; /* Last tuple inserted by the build before it yielded. */
+	struct diag diag; /* Error saved by the trigger for the build to report. */
+	int rc; /* Non-zero once the trigger has failed. */
+};
+/* on_replace trigger: mirror a concurrent change into the index being built. */
+static void
+memtx_build_on_replace(struct trigger *trigger, void *event)
+{
+	struct txn *txn = event;
+	struct memtx_build_state *state = trigger->data;
+	struct txn_stmt *stmt = txn_current_stmt(txn);
+
+	/* Keep the first error: a later success must not reset state->rc. */
+	if (state->rc != 0)
+		return;
+	if (stmt->new_tuple != NULL &&
+	    tuple_validate(state->format, stmt->new_tuple) != 0) {
+		state->rc = -1;
+		diag_move(diag_get(), &state->diag);
+		return;
+	}
+
+	struct tuple *cmp_tuple = stmt->new_tuple != NULL ? stmt->new_tuple :
+							    stmt->old_tuple;
+	struct key_def *cmp_def = state->index->def->cmp_def;
+	hint_t state_hint = tuple_hint(state->tuple, cmp_def);
+	hint_t cmp_hint = tuple_hint(cmp_tuple, cmp_def);
+	/*
+	 * Only update the already built part of an index. All the other
+	 * tuples will be inserted when build continues.
+	 */
+	if (tuple_compare(state->tuple, state_hint, cmp_tuple, cmp_hint, cmp_def) < 0)
+		return;
+
+	struct tuple *delete = NULL;
+	state->rc = index_replace(state->index, stmt->old_tuple, stmt->new_tuple,
+				  DUP_REPLACE, &delete);
+	if (state->rc != 0)
+		diag_move(diag_get(), &state->diag);
+}
+
 static int
 memtx_space_build_index(struct space *src_space, struct index *new_index,
 			struct tuple_format *new_format)
 {
+	/*
+	 * Yield every 1K tuples.
+	 * In debug mode yield more often for testing purposes.
+	 */
+#ifdef NDEBUG
+	enum { YIELD_LOOPS = 1000 };
+#else
+	enum { YIELD_LOOPS = 10 };
+#endif
 	/**
 	 * If it's a secondary key, and we're not building them
 	 * yet (i.e. it's snapshot recovery for memtx), do nothing.
@@ -899,6 +952,16 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
 	if (it == NULL)
 		return -1;
 
+	struct memtx_build_state state;
+	state.index = new_index;
+	state.format = new_format;
+	state.rc = 0;
+	diag_create(&state.diag);
+
+	struct trigger on_replace;
+	trigger_create(&on_replace, memtx_build_on_replace, &state, NULL);
+	trigger_add(&src_space->on_replace, &on_replace);
+
 	/*
 	 * The index has to be built tuple by tuple, since
 	 * there is no guarantee that all tuples satisfy
@@ -909,6 +972,7 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
 	/* Build the new index. */
 	int rc;
 	struct tuple *tuple;
+	size_t count = 0;
 	while ((rc = iterator_next(it, &tuple)) == 0 && tuple != NULL) {
 		/*
 		 * Check that the tuple is OK according to the
@@ -933,8 +997,28 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
 		 */
 		if (new_index->def->iid == 0)
 			tuple_ref(tuple);
+		if (++count % YIELD_LOOPS == 0) {
+			/*
+			 * Remember the latest inserted tuple to
+			 * avoid re-inserting already processed
+			 * tuples in on_replace trigger.
+			 */
+			state.tuple = tuple;
+			fiber_sleep(0);
+			/*
+			 * The on_replace trigger may have failed
+			 * during the yield.
+			 */
+			if (state.rc != 0) {
+				rc = -1;
+				diag_move(&state.diag, diag_get());
+				break;
+			}
+		}
 	}
 	iterator_delete(it);
+	diag_destroy(&state.diag);
+	trigger_clear(&on_replace);
 	return rc;
 }
 
diff --git a/test/box/memtx_background_index_build.result b/test/box/memtx_background_index_build.result
new file mode 100644
index 000000000..d47a01900
--- /dev/null
+++ b/test/box/memtx_background_index_build.result
@@ -0,0 +1,77 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+fiber = require('fiber')
+---
+...
+test_run:cmd('setopt delimiter ";"')
+---
+- true
+...
+-- the fiber executing this function will serve two purposes.
+-- First of all it will count the number of context swtiches
+-- that happened during the index build.
+-- Secondly, it will issue concurrent replaces in
+-- the space to check that such replaces are handled correctly
+-- during the index build.
+function f()
+    local i = 0
+    while true do
+        i = i + 1
+        box.space.test:replace{i, "new"}
+        box.space.test:replace{1001-i, "new"}
+        fiber.yield()
+        fiber.testcancel()
+    end
+end;
+---
+...
+test_run:cmd('setopt delimiter ""');
+---
+- true
+...
+_ = box.schema.space.create('test')
+---
+...
+_ = box.space.test:create_index('pk')
+---
+...
+for i = 1,1000 do box.space.test:insert{i} end
+---
+...
+csw = 0
+---
+...
+fib = fiber.new(f) _ = box.space.test:create_index('sk') csw = fiber.info()[fib.id()].csw
+---
+...
+fiber.cancel(fib)
+---
+...
+-- index build in debug mode should yield every 10 iterations.
+-- This means we will have at least 100 event loop iterations
+-- during index build.
+csw >= 100 or csw
+---
+- true
+...
+-- check that replaces generated by concurrent transactions
+-- also update the index correctly
+box.space.test.index[1]:get(1)
+---
+- [1, 'new']
+...
+box.space.test.index[1]:get(1000)
+---
+- [1000, 'new']
+...
+box.space.test.index[1]:get(500)
+---
+- [500]
+...
+box.space.test:drop()
+---
+...
diff --git a/test/box/memtx_background_index_build.test.lua b/test/box/memtx_background_index_build.test.lua
new file mode 100644
index 000000000..34cd3da54
--- /dev/null
+++ b/test/box/memtx_background_index_build.test.lua
@@ -0,0 +1,46 @@
+env = require('test_run')
+test_run = env.new()
+
+fiber = require('fiber')
+
+
+test_run:cmd('setopt delimiter ";"')
+
+-- the fiber executing this function will serve two purposes.
+-- First of all it will count the number of context swtiches
+-- that happened during the index build.
+-- Secondly, it will issue concurrent replaces in
+-- the space to check that such replaces are handled correctly
+-- during the index build.
+function f()
+    local i = 0
+    while true do
+        i = i + 1
+        box.space.test:replace{i, "new"}
+        box.space.test:replace{1001-i, "new"}
+        fiber.yield()
+        fiber.testcancel()
+    end
+end;
+test_run:cmd('setopt delimiter ""');
+
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+
+for i = 1,1000 do box.space.test:insert{i} end
+
+csw = 0
+fib = fiber.new(f) _ = box.space.test:create_index('sk') csw = fiber.info()[fib.id()].csw
+fiber.cancel(fib)
+
+-- index build in debug mode should yield every 10 iterations.
+-- This means we will have at least 100 event loop iterations
+-- during index build.
+csw >= 100 or csw
+-- check that replaces generated by concurrent transactions
+-- also update the index correctly
+box.space.test.index[1]:get(1)
+box.space.test.index[1]:get(1000)
+box.space.test.index[1]:get(500)
+
+box.space.test:drop()
diff --git a/test/box/suite.ini b/test/box/suite.ini
index c7b75c173..5283d2031 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 description = Database tests
 script = box.lua
 disabled = rtree_errinj.test.lua tuple_bench.test.lua
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua memtx_background_index_build.test.lua
 lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua
 use_unix_sockets = True
 is_parallel = True
-- 
2.20.1 (Apple Git-117)




More information about the Tarantool-patches mailing list