From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 3/6] Don't take schema lock for checkpointing Date: Sun, 30 Jun 2019 22:40:16 +0300 Message-Id: <2b12ec00f416bbb2ab214c5a0ddfc373375ed038.1561922496.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: Memtx checkpointing proceeds as follows: first we open iterators over the primary indexes of all spaces and save them to a list, then we start a thread that uses the iterators to dump space contents to a snap file. To avoid accessing a freed tuple, we put the small allocator into delayed free mode. However, this doesn't prevent an index from being dropped, so we also take the schema lock to lock out any DDL operation that could potentially destroy a space or an index. Note that vinyl doesn't need this lock, because it implements index reference counting under the hood. Actually, we don't need to take a lock at all: instead, we can simply postpone index destruction until checkpointing is complete, similarly to how we postpone destruction of individual tuples. We already have all the infrastructure for this: delayed garbage collection. So this patch tweaks it a bit so that the actual index destruction is postponed until checkpointing is complete. Since a space may now be dropped while the checkpoint thread is running, checkpoint entries store the space id and group id instead of a pointer to the space. This is a step towards removing the schema lock, which stands in the way of transactional DDL. 
--- src/box/gc.c | 8 ------ src/box/memtx_engine.c | 42 ++++++++++++++++++++++++++------ src/box/memtx_engine.h | 5 ++++ test/app-tap/snapshot.test.lua | 5 +++- test/engine/errinj_ddl.result | 54 +++++++++++++++++++++++++++++++++++++++++ test/engine/errinj_ddl.test.lua | 24 ++++++++++++++++++ 6 files changed, 121 insertions(+), 17 deletions(-) diff --git a/src/box/gc.c b/src/box/gc.c index 5639edd8..a2c963e0 100644 --- a/src/box/gc.c +++ b/src/box/gc.c @@ -54,7 +54,6 @@ #include "say.h" #include "vclock.h" #include "cbus.h" -#include "schema.h" #include "engine.h" /* engine_collect_garbage() */ #include "wal.h" /* wal_collect_garbage() */ #include "checkpoint_schedule.h" @@ -369,12 +368,6 @@ gc_do_checkpoint(void) gc.checkpoint_is_in_progress = true; /* - * We don't support DDL operations while making a checkpoint. - * Lock them out. - */ - latch_lock(&schema_lock); - - /* * Rotate WAL and call engine callbacks to create a checkpoint * on disk for each registered engine. */ @@ -398,7 +391,6 @@ out: if (rc != 0) engine_abort_checkpoint(); - latch_unlock(&schema_lock); gc.checkpoint_is_in_progress = false; return rc; } diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index e259bf62..c8376110 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -527,20 +527,20 @@ checkpoint_write_row(struct xlog *l, struct xrow_header *row) } static int -checkpoint_write_tuple(struct xlog *l, struct space *space, +checkpoint_write_tuple(struct xlog *l, uint32_t space_id, uint32_t group_id, const char *data, uint32_t size) { struct request_replace_body body; body.m_body = 0x82; /* map of two elements. 
*/ body.k_space_id = IPROTO_SPACE_ID; body.m_space_id = 0xce; /* uint32 */ - body.v_space_id = mp_bswap_u32(space_id(space)); + body.v_space_id = mp_bswap_u32(space_id); body.k_tuple = IPROTO_TUPLE; struct xrow_header row; memset(&row, 0, sizeof(struct xrow_header)); row.type = IPROTO_INSERT; - row.group_id = space_group_id(space); + row.group_id = group_id; row.bodycnt = 2; row.body[0].iov_base = &body; @@ -551,7 +551,8 @@ checkpoint_write_tuple(struct xlog *l, struct space *space, } struct checkpoint_entry { - struct space *space; + uint32_t space_id; + uint32_t group_id; struct snapshot_iterator *iterator; struct rlist link; }; @@ -641,7 +642,8 @@ checkpoint_add_space(struct space *sp, void *data) } rlist_add_tail_entry(&ckpt->entries, entry, link); - entry->space = sp; + entry->space_id = space_id(sp); + entry->group_id = space_group_id(sp); entry->iterator = index_create_snapshot_iterator(pk); if (entry->iterator == NULL) return -1; @@ -677,8 +679,8 @@ checkpoint_f(va_list ap) struct snapshot_iterator *it = entry->iterator; for (data = it->next(it, &size); data != NULL; data = it->next(it, &size)) { - if (checkpoint_write_tuple(&snap, entry->space, - data, size) != 0) { + if (checkpoint_write_tuple(&snap, entry->space_id, + entry->group_id, data, size) != 0) { xlog_close(&snap, false); return -1; } @@ -748,6 +750,19 @@ memtx_engine_wait_checkpoint(struct engine *engine, return result; } +/** + * Called after checkpointing is complete to free indexes dropped + * while checkpointing was in progress, see memtx_engine_run_gc(). 
+ */ +static void +memtx_engine_gc_after_checkpoint(struct memtx_engine *memtx) +{ + struct memtx_gc_task *task, *next; + stailq_foreach_entry_safe(task, next, &memtx->gc_to_free, link) + task->vtab->free(task); + stailq_create(&memtx->gc_to_free); +} + static void memtx_engine_commit_checkpoint(struct engine *engine, const struct vclock *vclock) @@ -785,6 +800,8 @@ memtx_engine_commit_checkpoint(struct engine *engine, checkpoint_delete(memtx->checkpoint); memtx->checkpoint = NULL; + + memtx_engine_gc_after_checkpoint(memtx); } static void @@ -969,7 +986,15 @@ memtx_engine_run_gc(struct memtx_engine *memtx, bool *stop) task->vtab->run(task, &task_done); if (task_done) { stailq_shift(&memtx->gc_queue); - task->vtab->free(task); + /* + * If checkpointing is in progress, the index may be + * used by the checkpoint thread so we postpone freeing + * until checkpointing is complete. + */ + if (memtx->checkpoint == NULL) + task->vtab->free(task); + else + stailq_add_entry(&memtx->gc_to_free, task, link); } } @@ -1041,6 +1066,7 @@ memtx_engine_new(const char *snap_dirname, bool force_recovery, } stailq_create(&memtx->gc_queue); + stailq_create(&memtx->gc_to_free); memtx->gc_fiber = fiber_new("memtx.gc", memtx_engine_gc_f); if (memtx->gc_fiber == NULL) goto fail; diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h index ccb51678..fcf595e7 100644 --- a/src/box/memtx_engine.h +++ b/src/box/memtx_engine.h @@ -155,6 +155,11 @@ struct memtx_engine { * memtx_gc_task::link. */ struct stailq gc_queue; + /** + * List of tasks awaiting to be freed once checkpointing + * is complete, linked by memtx_gc_task::link. 
+ */ + struct stailq gc_to_free; }; struct memtx_gc_task; diff --git a/test/app-tap/snapshot.test.lua b/test/app-tap/snapshot.test.lua index f8ad1631..d86f32fe 100755 --- a/test/app-tap/snapshot.test.lua +++ b/test/app-tap/snapshot.test.lua @@ -86,7 +86,8 @@ local i2 = s2:create_index('test', { type = 'tree', parts = {1, 'unsigned'} }) for i = 1,1000 do s1:insert{i, i, i} end -fiber.create(function () box.snapshot() end) +local snap_chan = fiber.channel() +fiber.create(function () box.snapshot() snap_chan:put(true) end) fiber.sleep(0) @@ -96,6 +97,8 @@ s2:update({1}, {{'+', 2, 2}}) s1:drop() s2:drop() +snap_chan:get() + test:ok(true, "gh-1185: no crash in matras_touch") ------------------------------------------------------------------------------- diff --git a/test/engine/errinj_ddl.result b/test/engine/errinj_ddl.result index 5f404300..a28223a6 100644 --- a/test/engine/errinj_ddl.result +++ b/test/engine/errinj_ddl.result @@ -350,3 +350,57 @@ s:count() -- 200 s:drop() --- ... +-- +-- Dropping and recreating a space while it is being written +-- to a checkpoint. +-- +s = box.schema.space.create('test', {engine = engine}) +--- +... +_ = s:create_index('pk') +--- +... +for i = 1, 5 do s:insert{i, i} end +--- +... +errinj.set("ERRINJ_SNAP_WRITE_DELAY", true) +--- +- ok +... +ch = fiber.channel(1) +--- +... +_ = fiber.create(function() box.snapshot() ch:put(true) end) +--- +... +s:drop() +--- +... +s = box.schema.space.create('test', {engine = engine}) +--- +... +_ = s:create_index('pk') +--- +... +for i = 1, 5 do s:insert{i, i * 10} end +--- +... +errinj.set("ERRINJ_SNAP_WRITE_DELAY", false) +--- +- ok +... +ch:get() +--- +- true +... +s:select() +--- +- - [1, 10] + - [2, 20] + - [3, 30] + - [4, 40] + - [5, 50] +... +s:drop() +--- +... 
diff --git a/test/engine/errinj_ddl.test.lua b/test/engine/errinj_ddl.test.lua index a382daa4..c40eae93 100644 --- a/test/engine/errinj_ddl.test.lua +++ b/test/engine/errinj_ddl.test.lua @@ -164,3 +164,27 @@ ch:get() s:count() -- 200 s:drop() + +-- +-- Dropping and recreating a space while it is being written +-- to a checkpoint. +-- +s = box.schema.space.create('test', {engine = engine}) +_ = s:create_index('pk') +for i = 1, 5 do s:insert{i, i} end + +errinj.set("ERRINJ_SNAP_WRITE_DELAY", true) +ch = fiber.channel(1) +_ = fiber.create(function() box.snapshot() ch:put(true) end) + +s:drop() + +s = box.schema.space.create('test', {engine = engine}) +_ = s:create_index('pk') +for i = 1, 5 do s:insert{i, i * 10} end + +errinj.set("ERRINJ_SNAP_WRITE_DELAY", false) +ch:get() + +s:select() +s:drop() -- 2.11.0