[PATCH 3/6] Don't take schema lock for checkpointing

Vladimir Davydov vdavydov.dev at gmail.com
Sun Jun 30 22:40:16 MSK 2019


Memtx checkpointing proceeds as follows: first we open iterators over
primary indexes of all spaces and save them to a list, then we start
a thread that uses the iterators to dump space contents to a snap file.
To avoid accessing a freed tuple, we put the small allocator into
delayed free mode. However, this doesn't prevent an index from being
dropped, so we also take the schema lock to lock out any DDL operation
that can potentially destroy a space or an index. Note that vinyl
doesn't need this lock because it implements index reference counting
under the hood.

Actually, we don't really need to take a lock - instead we can simply
postpone index destruction until checkpointing is complete, similarly
to how we postpone destruction of individual tuples. We even have all
the infrastructure for this - it's delayed garbage collection. So this
patch tweaks it a bit so that the actual index destruction is delayed
until checkpointing is complete.

This is a step forward towards removal of the schema lock, which stands
in the way of transactional DDL.
---
 src/box/gc.c                    |  8 ------
 src/box/memtx_engine.c          | 42 ++++++++++++++++++++++++++------
 src/box/memtx_engine.h          |  5 ++++
 test/app-tap/snapshot.test.lua  |  5 +++-
 test/engine/errinj_ddl.result   | 54 +++++++++++++++++++++++++++++++++++++++++
 test/engine/errinj_ddl.test.lua | 24 ++++++++++++++++++
 6 files changed, 121 insertions(+), 17 deletions(-)

diff --git a/src/box/gc.c b/src/box/gc.c
index 5639edd8..a2c963e0 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -54,7 +54,6 @@
 #include "say.h"
 #include "vclock.h"
 #include "cbus.h"
-#include "schema.h"
 #include "engine.h"		/* engine_collect_garbage() */
 #include "wal.h"		/* wal_collect_garbage() */
 #include "checkpoint_schedule.h"
@@ -369,12 +368,6 @@ gc_do_checkpoint(void)
 	gc.checkpoint_is_in_progress = true;
 
 	/*
-	 * We don't support DDL operations while making a checkpoint.
-	 * Lock them out.
-	 */
-	latch_lock(&schema_lock);
-
-	/*
 	 * Rotate WAL and call engine callbacks to create a checkpoint
 	 * on disk for each registered engine.
 	 */
@@ -398,7 +391,6 @@ out:
 	if (rc != 0)
 		engine_abort_checkpoint();
 
-	latch_unlock(&schema_lock);
 	gc.checkpoint_is_in_progress = false;
 	return rc;
 }
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index e259bf62..c8376110 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -527,20 +527,20 @@ checkpoint_write_row(struct xlog *l, struct xrow_header *row)
 }
 
 static int
-checkpoint_write_tuple(struct xlog *l, struct space *space,
+checkpoint_write_tuple(struct xlog *l, uint32_t space_id, uint32_t group_id,
 		       const char *data, uint32_t size)
 {
 	struct request_replace_body body;
 	body.m_body = 0x82; /* map of two elements. */
 	body.k_space_id = IPROTO_SPACE_ID;
 	body.m_space_id = 0xce; /* uint32 */
-	body.v_space_id = mp_bswap_u32(space_id(space));
+	body.v_space_id = mp_bswap_u32(space_id);
 	body.k_tuple = IPROTO_TUPLE;
 
 	struct xrow_header row;
 	memset(&row, 0, sizeof(struct xrow_header));
 	row.type = IPROTO_INSERT;
-	row.group_id = space_group_id(space);
+	row.group_id = group_id;
 
 	row.bodycnt = 2;
 	row.body[0].iov_base = &body;
@@ -551,7 +551,8 @@ checkpoint_write_tuple(struct xlog *l, struct space *space,
 }
 
 struct checkpoint_entry {
-	struct space *space;
+	uint32_t space_id;
+	uint32_t group_id;
 	struct snapshot_iterator *iterator;
 	struct rlist link;
 };
@@ -641,7 +642,8 @@ checkpoint_add_space(struct space *sp, void *data)
 	}
 	rlist_add_tail_entry(&ckpt->entries, entry, link);
 
-	entry->space = sp;
+	entry->space_id = space_id(sp);
+	entry->group_id = space_group_id(sp);
 	entry->iterator = index_create_snapshot_iterator(pk);
 	if (entry->iterator == NULL)
 		return -1;
@@ -677,8 +679,8 @@ checkpoint_f(va_list ap)
 		struct snapshot_iterator *it = entry->iterator;
 		for (data = it->next(it, &size); data != NULL;
 		     data = it->next(it, &size)) {
-			if (checkpoint_write_tuple(&snap, entry->space,
-					data, size) != 0) {
+			if (checkpoint_write_tuple(&snap, entry->space_id,
+					entry->group_id, data, size) != 0) {
 				xlog_close(&snap, false);
 				return -1;
 			}
@@ -748,6 +750,19 @@ memtx_engine_wait_checkpoint(struct engine *engine,
 	return result;
 }
 
+/**
+ * Called after checkpointing is complete: frees every task parked on
+ * gc_to_free by memtx_engine_run_gc() and then resets the list.
+ */
+static void
+memtx_engine_gc_after_checkpoint(struct memtx_engine *memtx)
+{
+	struct memtx_gc_task *task, *next;
+	stailq_foreach_entry_safe(task, next, &memtx->gc_to_free, link)
+		task->vtab->free(task);
+	stailq_create(&memtx->gc_to_free);
+}
+
 static void
 memtx_engine_commit_checkpoint(struct engine *engine,
 			       const struct vclock *vclock)
@@ -785,6 +800,8 @@ memtx_engine_commit_checkpoint(struct engine *engine,
 
 	checkpoint_delete(memtx->checkpoint);
 	memtx->checkpoint = NULL;
+
+	memtx_engine_gc_after_checkpoint(memtx);
 }
 
 static void
@@ -969,7 +986,15 @@ memtx_engine_run_gc(struct memtx_engine *memtx, bool *stop)
 	task->vtab->run(task, &task_done);
 	if (task_done) {
 		stailq_shift(&memtx->gc_queue);
-		task->vtab->free(task);
+		/*
+		 * If checkpointing is in progress, the index may be
+		 * used by the checkpoint thread so we postpone freeing
+		 * until checkpointing is complete.
+		 */
+		if (memtx->checkpoint == NULL)
+			task->vtab->free(task);
+		else
+			stailq_add_entry(&memtx->gc_to_free, task, link);
 	}
 }
 
@@ -1041,6 +1066,7 @@ memtx_engine_new(const char *snap_dirname, bool force_recovery,
 	}
 
 	stailq_create(&memtx->gc_queue);
+	stailq_create(&memtx->gc_to_free);
 	memtx->gc_fiber = fiber_new("memtx.gc", memtx_engine_gc_f);
 	if (memtx->gc_fiber == NULL)
 		goto fail;
diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h
index ccb51678..fcf595e7 100644
--- a/src/box/memtx_engine.h
+++ b/src/box/memtx_engine.h
@@ -155,6 +155,11 @@ struct memtx_engine {
 	 * memtx_gc_task::link.
 	 */
 	struct stailq gc_queue;
+	/**
+	 * List of tasks awaiting to be freed once checkpointing
+	 * is complete, linked by memtx_gc_task::link.
+	 */
+	struct stailq gc_to_free;
 };
 
 struct memtx_gc_task;
diff --git a/test/app-tap/snapshot.test.lua b/test/app-tap/snapshot.test.lua
index f8ad1631..d86f32fe 100755
--- a/test/app-tap/snapshot.test.lua
+++ b/test/app-tap/snapshot.test.lua
@@ -86,7 +86,8 @@ local i2 = s2:create_index('test', { type = 'tree', parts = {1, 'unsigned'} })
 
 for i = 1,1000 do s1:insert{i, i, i} end
 
-fiber.create(function () box.snapshot() end)
+local snap_chan = fiber.channel()
+fiber.create(function () box.snapshot() snap_chan:put(true) end)
 
 fiber.sleep(0)
 
@@ -96,6 +97,8 @@ s2:update({1}, {{'+', 2, 2}})
 s1:drop()
 s2:drop()
 
+snap_chan:get()
+
 test:ok(true, "gh-1185: no crash in matras_touch")
 
 -------------------------------------------------------------------------------
diff --git a/test/engine/errinj_ddl.result b/test/engine/errinj_ddl.result
index 5f404300..a28223a6 100644
--- a/test/engine/errinj_ddl.result
+++ b/test/engine/errinj_ddl.result
@@ -350,3 +350,57 @@ s:count() -- 200
 s:drop()
 ---
 ...
+--
+-- Dropping and recreating a space while it is being written
+-- to a checkpoint.
+--
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = s:create_index('pk')
+---
+...
+for i = 1, 5 do s:insert{i, i} end
+---
+...
+errinj.set("ERRINJ_SNAP_WRITE_DELAY", true)
+---
+- ok
+...
+ch = fiber.channel(1)
+---
+...
+_ = fiber.create(function() box.snapshot() ch:put(true) end)
+---
+...
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = s:create_index('pk')
+---
+...
+for i = 1, 5 do s:insert{i, i * 10} end
+---
+...
+errinj.set("ERRINJ_SNAP_WRITE_DELAY", false)
+---
+- ok
+...
+ch:get()
+---
+- true
+...
+s:select()
+---
+- - [1, 10]
+  - [2, 20]
+  - [3, 30]
+  - [4, 40]
+  - [5, 50]
+...
+s:drop()
+---
+...
diff --git a/test/engine/errinj_ddl.test.lua b/test/engine/errinj_ddl.test.lua
index a382daa4..c40eae93 100644
--- a/test/engine/errinj_ddl.test.lua
+++ b/test/engine/errinj_ddl.test.lua
@@ -164,3 +164,27 @@ ch:get()
 
 s:count() -- 200
 s:drop()
+
+--
+-- Dropping and recreating a space while it is being written
+-- to a checkpoint.
+--
+s = box.schema.space.create('test', {engine = engine})
+_ = s:create_index('pk')
+for i = 1, 5 do s:insert{i, i} end
+
+errinj.set("ERRINJ_SNAP_WRITE_DELAY", true)
+ch = fiber.channel(1)
+_ = fiber.create(function() box.snapshot() ch:put(true) end)
+
+s:drop()
+
+s = box.schema.space.create('test', {engine = engine})
+_ = s:create_index('pk')
+for i = 1, 5 do s:insert{i, i * 10} end
+
+errinj.set("ERRINJ_SNAP_WRITE_DELAY", false)
+ch:get()
+
+s:select()
+s:drop()
-- 
2.11.0




More information about the Tarantool-patches mailing list