On Thu, May 23, 2019 at 05:11:11PM +0300, Serge Petrenko wrote:
Memtx index build used to stall the event loop for the whole build period.
Add occasional yields so that the loop is not blocked for too long.
Also make index build set on_replace triggers so that concurrent
replaces are also correctly handled during the build.
Closes #3976
Worth mentioning this in the documentation?
Hi! Thank you for review.
I’ve addressed your comments in v3. Please check it out.
---
https://github.com/tarantool/tarantool/issues/3976
https://github.com/tarantool/tarantool/tree/sp/gh-3976-background-index-build
Changes in v2:
- add an on_replace trigger
to handle concurrent replaces
while index build yields
- modify test case slightly,
test concurrent replaces.
src/box/memtx_space.c | 84 +++++++++++++++++++
test/box/memtx_background_index_build.result | 77 +++++++++++++++++
.../box/memtx_background_index_build.test.lua | 46 ++++++++++
test/box/suite.ini | 2 +-
4 files changed, 208 insertions(+), 1 deletion(-)
create mode 100644 test/box/memtx_background_index_build.result
create mode 100644 test/box/memtx_background_index_build.test.lua
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 5ddb4f7ee..2c72fa8f2 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -870,10 +870,63 @@ memtx_init_ephemeral_space(struct space *space)
memtx_space_add_primary_key(space);
}
+struct memtx_build_state {
+ struct index *index;
+ struct tuple_format *format;
+ struct tuple *tuple;
+ struct diag diag;
+ int rc;
+};
+
+static void
+memtx_build_on_replace(struct trigger *trigger, void *event)
+{
+ struct txn *txn = event;
+ struct memtx_build_state *state = trigger->data;
+ struct txn_stmt *stmt = txn_current_stmt(txn);
+
+ if (stmt->new_tuple != NULL &&
+ tuple_validate(state->format, stmt->new_tuple) != 0) {
+ state->rc = -1;
+ diag_move(diag_get(), &state->diag);
+ return;
+ }
+
+ struct tuple *cmp_tuple = stmt->new_tuple != NULL ? stmt->new_tuple :
+ stmt->old_tuple;
+ struct key_def *cmp_def = state->index->def->cmp_def;
+ hint_t state_hint = tuple_hint(state->tuple, cmp_def);
+ hint_t cmp_hint = tuple_hint(cmp_tuple, cmp_def);
+ /*
+ * Only update the already built part of an index. All the other
+ * tuples will be inserted when build continues.
+ */
+ if (tuple_compare(state->tuple, state_hint, cmp_tuple, cmp_hint, cmp_def) < 0)
+ return;
It's pointless to compute hints with tuple_hint() before calling tuple_compare() - you could simply pass HINT_NONE to get the same effect. Anyway, this isn't going to work in case of a multikey index, because the latter uses hints to store multikey offsets. Take a look at the implementation of memtx_tree_index_replace_multikey().
+
+ struct tuple *delete = NULL;
+ state->rc = index_replace(state->index, stmt->old_tuple, stmt->new_tuple,
+ DUP_REPLACE, &delete);
You should abort index build if a duplicate record is inserted.
+
+ if (state->rc != 0) {
+ diag_move(diag_get(), &state->diag);
+ }
+ return;
+}
+
static int
memtx_space_build_index(struct space *src_space, struct index *new_index,
struct tuple_format *new_format)
{
+ /*
+ * Yield every 1K tuples.
+ * In debug mode yield more often for testing purposes.
+ */
+#ifdef NDEBUG
+ enum { YIELD_LOOPS = 1000 };
+#else
+ enum { YIELD_LOOPS = 10 };
+#endif
/**
* If it's a secondary key, and we're not building them
* yet (i.e. it's snapshot recovery for memtx), do nothing.
@@ -899,6 +952,16 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
if (it == NULL)
return -1;
+ struct memtx_build_state state;
+ state.index = new_index;
+ state.format = new_format;
+ state.rc = 0;
+ diag_create(&state.diag);
+
+ struct trigger on_replace;
+ trigger_create(&on_replace, memtx_build_on_replace, &state, NULL);
+ trigger_add(&src_space->on_replace, &on_replace);
+
/*
* The index has to be built tuple by tuple, since
* there is no guarantee that all tuples satisfy
@@ -909,6 +972,7 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
/* Build the new index. */
int rc;
struct tuple *tuple;
+ size_t count = 0;
while ((rc = iterator_next(it, &tuple)) == 0 && tuple != NULL) {
/*
* Check that the tuple is OK according to the
@@ -933,8 +997,28 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
*/
if (new_index->def->iid == 0)
tuple_ref(tuple);
+ if (++count % YIELD_LOOPS == 0) {
+ /*
+ * Remember the latest inserted tuple to
+ * avoid re-inserting already processed
+ * tuples in on_replace trigger.
+ */
+ state.tuple = tuple;
+ fiber_sleep(0);
+ /*
+ * The on_replace trigger may have failed
+ * during the yield.
+ */
+ if (state.rc != 0) {
+ rc = -1;
+ diag_move(&state.diag, diag_get());
+ break;
+ }
+ }
}
iterator_delete(it);
+ diag_destroy(&state.diag);
+ trigger_clear(&on_replace);
return rc;
}
diff --git a/test/box/memtx_background_index_build.test.lua b/test/box/memtx_background_index_build.test.lua
new file mode 100644
index 000000000..34cd3da54
--- /dev/null
+++ b/test/box/memtx_background_index_build.test.lua
@@ -0,0 +1,46 @@
+env = require('test_run')
+test_run = env.new()
+
+fiber = require('fiber')
+
+
+test_run:cmd('setopt delimiter ";"')
+
+-- the fiber executing this function will serve two purposes.
+-- First of all it will count the number of context swtiches
+-- that happened during the index build.
+-- Secondly, it will issue concurrent replaces in
+-- the space to check that such replaces are handled correctly
+-- during the index build.
+function f()
+ local i = 0
+ while true do
+ i = i + 1
+ box.space.test:replace{i, "new"}
+ box.space.test:replace{1001-i, "new"}
+ fiber.yield()
+ fiber.testcancel()
+ end
+end;
+test_run:cmd('setopt delimiter ""');
+
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+
+for i = 1,1000 do box.space.test:insert{i} end
+
+csw = 0
+fib = fiber.new(f) _ = box.space.test:create_index('sk') csw = fiber.info()[fib.id()].csw
+fiber.cancel(fib)
+
+-- index build in debug mode should yield every 10 iterations.
+-- This means we will have at least 100 event loop iterations
+-- during index build.
+csw >= 100 or csw
Why don't you simply introduce a new error injection that would stall index build, opening a time window for inserting a new record? IMO the test would be much easier to understand that way. OTOH it might make sense to write a stress test that would insert random records and check index consistency upon completion. E.g. take a look at vinyl/ddl.test.lua.
+-- check that replaces generated by concurrent transactions
+-- also update the index correctly
+box.space.test.index[1]:get(1)
+box.space.test.index[1]:get(1000)
+box.space.test.index[1]:get(500)
+
+box.space.test:drop()
The test is insufficient. You should also check the following cases:
- Index build is aborted if a tuple that doesn't match the new index format is inserted into the space.
- Index build is aborted if the unique constraint of the new index is violated.
- Modifying the space while a multikey index is being built works fine.
- Indexes are consistent after recovery.
I think it would be best if you adopted corresponding vinyl/ddl test cases and moved them to the engine suite.