27 мая 2019 г., в 19:45, Vladimir Davydov <vdavydov.dev@gmail.com> написал(а):

On Thu, May 23, 2019 at 05:11:11PM +0300, Serge Petrenko wrote:
Memtx index build used to stall event loop for all the build period.
Add occasional yields so that the loop is not blocked for too long.
Also make index build set on_replace triggers so that concurrent
replaces are also correctly handled during the build.

Closes #3976

Worth mentioning this in the documentation?

Hi! Thank you for the review.

I’ve addressed your comments in v3. Please check it out.


---
https://github.com/tarantool/tarantool/issues/3976
https://github.com/tarantool/tarantool/tree/sp/gh-3976-background-index-build

Changes in v2:
 - add an on_replace trigger
   to handle concurrent replaces
   while index build yields
 - modify test case slightly,
   test concurrent replaces.

src/box/memtx_space.c                         | 84 +++++++++++++++++++
test/box/memtx_background_index_build.result  | 77 +++++++++++++++++
.../box/memtx_background_index_build.test.lua | 46 ++++++++++
test/box/suite.ini                            |  2 +-
4 files changed, 208 insertions(+), 1 deletion(-)
create mode 100644 test/box/memtx_background_index_build.result
create mode 100644 test/box/memtx_background_index_build.test.lua

diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 5ddb4f7ee..2c72fa8f2 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -870,10 +870,63 @@ memtx_init_ephemeral_space(struct space *space)
memtx_space_add_primary_key(space);
}

+struct memtx_build_state {
+ struct index *index;
+ struct tuple_format *format;
+ struct tuple *tuple;
+ struct diag diag;
+ int rc;
+};
+
+static void
+memtx_build_on_replace(struct trigger *trigger, void *event)
+{
+ struct txn *txn = event;
+ struct memtx_build_state *state = trigger->data;
+ struct txn_stmt *stmt = txn_current_stmt(txn);
+
+ if (stmt->new_tuple != NULL &&
+     tuple_validate(state->format, stmt->new_tuple) != 0) {
+ state->rc = -1;
+ diag_move(diag_get(), &state->diag);
+ return;
+ }
+
+ struct tuple *cmp_tuple = stmt->new_tuple != NULL ? stmt->new_tuple :
+     stmt->old_tuple;
+ struct key_def *cmp_def = state->index->def->cmp_def;
+ hint_t state_hint = tuple_hint(state->tuple, cmp_def);
+ hint_t cmp_hint = tuple_hint(cmp_tuple, cmp_def);
+ /*
+  * Only update the already built part of an index. All the other
+  * tuples will be inserted when build continues.
+  */
+ if (tuple_compare(state->tuple, state_hint, cmp_tuple, cmp_hint, cmp_def) < 0)
+ return;

It's pointless to compute hints with tuple_hint() before calling
tuple_compare() - you could simply pass HINT_NONE to get the same
effect.

Anyway, this isn't going to work in case of a multikey index, because
the latter uses hints to store multikey offsets. Take a look at the
implementation of memtx_tree_index_replace_multikey().

+
+ struct tuple *delete = NULL;
+ state->rc = index_replace(state->index, stmt->old_tuple, stmt->new_tuple,
+   DUP_REPLACE, &delete);

You should abort index build if a duplicate record is inserted.

+
+ if (state->rc != 0) {
+ diag_move(diag_get(), &state->diag);
+ }
+ return;
+}
+
static int
memtx_space_build_index(struct space *src_space, struct index *new_index,
struct tuple_format *new_format)
{
+ /*
+  * Yield every 1K tuples.
+  * In debug mode yield more often for testing purposes.
+  */
+#ifdef NDEBUG
+ enum { YIELD_LOOPS = 1000 };
+#else
+ enum { YIELD_LOOPS = 10 };
+#endif
/**
 * If it's a secondary key, and we're not building them
 * yet (i.e. it's snapshot recovery for memtx), do nothing.
@@ -899,6 +952,16 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
if (it == NULL)
return -1;

+ struct memtx_build_state state;
+ state.index = new_index;
+ state.format = new_format;
+ state.rc = 0;
+ diag_create(&state.diag);
+
+ struct trigger on_replace;
+ trigger_create(&on_replace, memtx_build_on_replace, &state, NULL);
+ trigger_add(&src_space->on_replace, &on_replace);
+
/*
 * The index has to be built tuple by tuple, since
 * there is no guarantee that all tuples satisfy
@@ -909,6 +972,7 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
/* Build the new index. */
int rc;
struct tuple *tuple;
+ size_t count = 0;
while ((rc = iterator_next(it, &tuple)) == 0 && tuple != NULL) {
/*
 * Check that the tuple is OK according to the
@@ -933,8 +997,28 @@ memtx_space_build_index(struct space *src_space, struct index *new_index,
 */
if (new_index->def->iid == 0)
tuple_ref(tuple);
+ if (++count % YIELD_LOOPS == 0) {
+ /*
+  * Remember the latest inserted tuple to
+  * avoid re-inserting already processed
+  * tuples in on_replace trigger.
+  */
+ state.tuple = tuple;
+ fiber_sleep(0);
+ /*
+  * The on_replace trigger may have failed
+  * during the yield.
+  */
+ if (state.rc != 0) {
+ rc = -1;
+ diag_move(&state.diag, diag_get());
+ break;
+ }
+ }
}
iterator_delete(it);
+ diag_destroy(&state.diag);
+ trigger_clear(&on_replace);
return rc;
}

diff --git a/test/box/memtx_background_index_build.test.lua b/test/box/memtx_background_index_build.test.lua
new file mode 100644
index 000000000..34cd3da54
--- /dev/null
+++ b/test/box/memtx_background_index_build.test.lua
@@ -0,0 +1,46 @@
+env = require('test_run')
+test_run = env.new()
+
+fiber = require('fiber')
+
+
+test_run:cmd('setopt delimiter ";"')
+
+-- the fiber executing this function will serve two purposes.
+-- First of all it will count the number of context switches
+-- that happened during the index build.
+-- Secondly, it will issue concurrent replaces in
+-- the space to check that such replaces are handled correctly
+-- during the index build.
+function f()
+    local i = 0
+    while true do
+        i = i + 1
+        box.space.test:replace{i, "new"}
+        box.space.test:replace{1001-i, "new"}
+        fiber.yield()
+        fiber.testcancel()
+    end
+end;
+test_run:cmd('setopt delimiter ""');
+
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+
+for i = 1,1000 do box.space.test:insert{i} end
+
+csw = 0
+fib = fiber.new(f) _ = box.space.test:create_index('sk') csw = fiber.info()[fib.id()].csw
+fiber.cancel(fib)
+
+-- index build in debug mode should yield every 10 iterations.
+-- This means we will have at least 100 event loop iterations
+-- during index build.
+csw >= 100 or csw

Why don't you simply introduce a new error injection that would stall
the index build, opening a time window for inserting a new record? IMO
the test would be much easier to understand that way.

OTOH it might make sense to write a stress test that would insert random
records and check index consistency upon completion. E.g. take a look at
vinyl/ddl.test.lua.

+-- check that replaces generated by concurrent transactions
+-- also update the index correctly
+box.space.test.index[1]:get(1)
+box.space.test.index[1]:get(1000)
+box.space.test.index[1]:get(500)
+
+box.space.test:drop()

The test is insufficient. You should also check the following cases:

- Index build is aborted if a tuple that doesn't match the new index
  format is inserted into the space.
- Index build is aborted if the unique constraint of the new index is
  violated.
- Modifying the space while a multikey index is being built works fine.
- Indexes are consistent after recovery.

I think it would be best if you adopted the corresponding vinyl/ddl test
cases and moved them to the engine suite.