[tarantool-patches] [PATCH] Do not latch the schema during a checkpoint

Georgy Kirichenko georgy at tarantool.org
Thu May 23 11:19:48 MSK 2019


Instead of latching the schema for the whole checkpoint, take an extra
reference to the primary key index of every space being checkpointed
and drop it when the checkpoint ends. As a result, replication no
longer stops when it applies DDL while a checkpoint is running, and a
checkpoint can be made while a DDL operation is in progress.

The attached test is memtx-only: it is not valid for the vinyl engine
because vy_log cannot stop recovery at the vclock of the instance
being recovered.

Prerequisites: #1254
---
Issue: https://github.com/tarantool/tarantool/issues/1254
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1254-checkpoint-does-not-latch-schema

 src/box/gc.c                    |   7 --
 src/box/index.cc                |   2 +
 src/box/index.h                 |  17 ++++
 src/box/memtx_engine.c          |  21 +++--
 src/box/space.c                 |   4 +-
 test/engine/checkpoint.result   | 157 ++++++++++++++++++++++++++++++++
 test/engine/checkpoint.test.lua |  44 +++++++++
 test/engine/engine.cfg          |   3 +
 8 files changed, 239 insertions(+), 16 deletions(-)
 create mode 100644 test/engine/checkpoint.result
 create mode 100644 test/engine/checkpoint.test.lua

diff --git a/src/box/gc.c b/src/box/gc.c
index 5639edd81..a5e3c20d0 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -368,12 +368,6 @@ gc_do_checkpoint(void)
 	assert(!gc.checkpoint_is_in_progress);
 	gc.checkpoint_is_in_progress = true;
 
-	/*
-	 * We don't support DDL operations while making a checkpoint.
-	 * Lock them out.
-	 */
-	latch_lock(&schema_lock);
-
 	/*
 	 * Rotate WAL and call engine callbacks to create a checkpoint
 	 * on disk for each registered engine.
@@ -398,7 +392,6 @@ out:
 	if (rc != 0)
 		engine_abort_checkpoint();
 
-	latch_unlock(&schema_lock);
 	gc.checkpoint_is_in_progress = false;
 	return rc;
 }
diff --git a/src/box/index.cc b/src/box/index.cc
index 4a444e5d0..a440330f3 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -487,6 +487,7 @@ index_create(struct index *index, struct engine *engine,
 	index->engine = engine;
 	index->def = def;
 	index->space_cache_version = space_cache_version;
+	index->ref_count = 1;
 	return 0;
 }
 
@@ -498,6 +499,7 @@ index_delete(struct index *index)
 	 * engine might still need it, e.g. to check if
 	 * the index is primary or secondary.
 	 */
+	assert(index->ref_count == 0);
 	struct index_def *def = index->def;
 	index->vtab->destroy(index);
 	index_def_delete(def);
diff --git a/src/box/index.h b/src/box/index.h
index 97d600c96..7cda2ebd7 100644
--- a/src/box/index.h
+++ b/src/box/index.h
@@ -456,6 +456,8 @@ struct index {
 	struct index_def *def;
 	/* Space cache version at the time of construction. */
 	uint32_t space_cache_version;
+	/* Reference count; the index is deleted when it drops to 0. */
+	uint32_t ref_count;
 };
 
 /**
@@ -498,10 +500,37 @@ int
 index_create(struct index *index, struct engine *engine,
 	     const struct index_vtab *vtab, struct index_def *def);
 
+/**
+ * Take an additional reference to @a index. The index must
+ * still be alive, i.e. hold at least one reference (the one
+ * set by index_create()).
+ */
+static inline void
+index_ref(struct index *index)
+{
+	assert(index->ref_count > 0);
+	++index->ref_count;
+}
+
 /** Free an index instance. */
 void
 index_delete(struct index *index);
 
+/**
+ * Drop a reference to @a index. When the last reference
+ * is dropped the index is destroyed with index_delete(),
+ * so @a index must not be used after this call unless the
+ * caller still holds another reference.
+ */
+static inline void
+index_unref(struct index *index)
+{
+	assert(index->ref_count > 0);
+	--index->ref_count;
+	if (index->ref_count == 0)
+		index_delete(index);
+}
+
 /** Build this index based on the contents of another index. */
 int
 index_build(struct index *index, struct index *pk);
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index f4312484a..94f5038fb 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -530,20 +530,20 @@ checkpoint_write_row(struct xlog *l, struct xrow_header *row)
 }
 
 static int
-checkpoint_write_tuple(struct xlog *l, struct space *space,
+checkpoint_write_tuple(struct xlog *l, uint32_t space_id, uint32_t group_id,
 		       const char *data, uint32_t size)
 {
 	struct request_replace_body body;
 	body.m_body = 0x82; /* map of two elements. */
 	body.k_space_id = IPROTO_SPACE_ID;
 	body.m_space_id = 0xce; /* uint32 */
-	body.v_space_id = mp_bswap_u32(space_id(space));
+	body.v_space_id = mp_bswap_u32(space_id);
 	body.k_tuple = IPROTO_TUPLE;
 
 	struct xrow_header row;
 	memset(&row, 0, sizeof(struct xrow_header));
 	row.type = IPROTO_INSERT;
-	row.group_id = space_group_id(space);
+	row.group_id = group_id;
 
 	row.bodycnt = 2;
 	row.body[0].iov_base = &body;
@@ -554,7 +554,9 @@ checkpoint_write_tuple(struct xlog *l, struct space *space,
 }
 
 struct checkpoint_entry {
-	struct space *space;
+	uint32_t space_id;
+	uint32_t group_id;
+	struct index *index;
 	struct snapshot_iterator *iterator;
 	struct rlist link;
 };
@@ -604,6 +606,7 @@ checkpoint_delete(struct checkpoint *ckpt)
 	struct checkpoint_entry *entry, *tmp;
 	rlist_foreach_entry_safe(entry, &ckpt->entries, link, tmp) {
 		entry->iterator->free(entry->iterator);
+		index_unref(entry->index);
 		free(entry);
 	}
 	xdir_destroy(&ckpt->dir);
@@ -644,7 +647,10 @@ checkpoint_add_space(struct space *sp, void *data)
 	}
 	rlist_add_tail_entry(&ckpt->entries, entry, link);
 
-	entry->space = sp;
+	entry->space_id = space_id(sp);
+	entry->group_id = space_group_id(sp);
+	entry->index = pk;
+	index_ref(entry->index);
 	entry->iterator = index_create_snapshot_iterator(pk);
 	if (entry->iterator == NULL)
 		return -1;
@@ -679,8 +685,9 @@ checkpoint_f(va_list ap)
 		struct snapshot_iterator *it = entry->iterator;
 		for (data = it->next(it, &size); data != NULL;
 		     data = it->next(it, &size)) {
-			if (checkpoint_write_tuple(&snap, entry->space,
-					data, size) != 0) {
+			if (checkpoint_write_tuple(&snap, entry->space_id,
+						   entry->group_id,
+						   data, size) != 0) {
 				xlog_close(&snap, false);
 				return -1;
 			}
diff --git a/src/box/space.c b/src/box/space.c
index 243e7da2f..d3c13fff0 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -171,7 +171,7 @@ fail_free_indexes:
 	for (uint32_t i = 0; i <= index_id_max; i++) {
 		struct index *index = space->index_map[i];
 		if (index != NULL)
-			index_delete(index);
+			index_unref(index);
 	}
 fail:
 	free(space->index_map);
@@ -209,7 +209,7 @@ space_delete(struct space *space)
 	for (uint32_t j = 0; j <= space->index_id_max; j++) {
 		struct index *index = space->index_map[j];
 		if (index != NULL)
-			index_delete(index);
+			index_unref(index);
 	}
 	free(space->index_map);
 	if (space->format != NULL)
diff --git a/test/engine/checkpoint.result b/test/engine/checkpoint.result
new file mode 100644
index 000000000..c02b8944d
--- /dev/null
+++ b/test/engine/checkpoint.result
@@ -0,0 +1,157 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+-- test ddl while snapshoting
+test_run:cmd('create server ddl with script = "engine/ddl.lua"')
+---
+- true
+...
+test_run:cmd("start server ddl")
+---
+- true
+...
+test_run:cmd("switch ddl")
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+_ = box.schema.space.create('ddl', {engine = engine}):create_index('pk')
+---
+...
+for i = 1, 20 do box.space.ddl:replace({i}) end
+---
+...
+errinj = box.error.injection
+---
+...
+errinj.set('ERRINJ_SNAP_WRITE_ROW_TIMEOUT', 0.01)
+---
+- ok
+...
+ch = fiber.channel(1)
+---
+...
+_ = fiber.create(function () box.snapshot() ch:put(true) end)
+---
+...
+-- snapshot was started, remember the current xlog
+fio = require('fio')
+---
+...
+xlog1 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.xlog", box.info.lsn))
+---
+...
+vylog1 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.vylog", box.info.lsn))
+---
+...
+box.space.ddl:truncate()
+---
+...
+errinj.set('ERRINJ_SNAP_WRITE_ROW_TIMEOUT', 0.00)
+---
+- ok
+...
+ch:get()
+---
+- true
+...
+box.space.ddl:select()
+---
+- []
+...
+xlog2 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.xlog", box.info.lsn))
+---
+...
+vylog2 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.vylog", box.info.lsn))
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+xlog1 = test_run:eval('ddl', 'return xlog1')[1]
+---
+...
+xlog2 = test_run:eval('ddl', 'return xlog2')[1]
+---
+...
+vylog1 = test_run:eval('ddl', 'return vylog1')[1]
+---
+...
+vylog2 = test_run:eval('ddl', 'return vylog2')[1]
+---
+...
+test_run:cmd("stop server ddl")
+---
+- true
+...
+-- delete last wals in order to check truncated data remains in a snapshot
+fio = require('fio')
+---
+...
+fio.unlink(xlog1)
+---
+- true
+...
+fio.unlink(xlog2)
+---
+- true
+...
+_ = fio.unlink(vylog1)
+---
+...
+_ = fio.unlink(vylog2)
+---
+...
+-- start server ddl from snapshot only
+test_run:cmd('start server ddl')
+---
+- true
+...
+test_run:cmd("switch ddl")
+---
+- true
+...
+box.space.ddl:select()
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5]
+  - [6]
+  - [7]
+  - [8]
+  - [9]
+  - [10]
+  - [11]
+  - [12]
+  - [13]
+  - [14]
+  - [15]
+  - [16]
+  - [17]
+  - [18]
+  - [19]
+  - [20]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server ddl")
+---
+- true
+...
+test_run:cmd('cleanup server ddl')
+---
+- true
+...
diff --git a/test/engine/checkpoint.test.lua b/test/engine/checkpoint.test.lua
new file mode 100644
index 000000000..53124f524
--- /dev/null
+++ b/test/engine/checkpoint.test.lua
@@ -0,0 +1,44 @@
+env = require('test_run')
+test_run = env.new()
+
+-- test ddl while snapshoting
+test_run:cmd('create server ddl with script = "engine/ddl.lua"')
+test_run:cmd("start server ddl")
+test_run:cmd("switch ddl")
+fiber = require('fiber')
+engine = test_run:get_cfg('engine')
+_ = box.schema.space.create('ddl', {engine = engine}):create_index('pk')
+for i = 1, 20 do box.space.ddl:replace({i}) end
+errinj = box.error.injection
+errinj.set('ERRINJ_SNAP_WRITE_ROW_TIMEOUT', 0.01)
+ch = fiber.channel(1)
+_ = fiber.create(function () box.snapshot() ch:put(true) end)
+-- snapshot was started, remember the current xlog
+fio = require('fio')
+xlog1 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.xlog", box.info.lsn))
+vylog1 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.vylog", box.info.lsn))
+box.space.ddl:truncate()
+errinj.set('ERRINJ_SNAP_WRITE_ROW_TIMEOUT', 0.00)
+ch:get()
+box.space.ddl:select()
+xlog2 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.xlog", box.info.lsn))
+vylog2 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.vylog", box.info.lsn))
+test_run:cmd("switch default")
+xlog1 = test_run:eval('ddl', 'return xlog1')[1]
+xlog2 = test_run:eval('ddl', 'return xlog2')[1]
+vylog1 = test_run:eval('ddl', 'return vylog1')[1]
+vylog2 = test_run:eval('ddl', 'return vylog2')[1]
+test_run:cmd("stop server ddl")
+-- delete last wals in order to check truncated data remains in a snapshot
+fio = require('fio')
+fio.unlink(xlog1)
+fio.unlink(xlog2)
+_ = fio.unlink(vylog1)
+_ = fio.unlink(vylog2)
+-- start server ddl from snapshot only
+test_run:cmd('start server ddl')
+test_run:cmd("switch ddl")
+box.space.ddl:select()
+test_run:cmd("switch default")
+test_run:cmd("stop server ddl")
+test_run:cmd('cleanup server ddl')
diff --git a/test/engine/engine.cfg b/test/engine/engine.cfg
index 9f07629b4..d71a51afc 100644
--- a/test/engine/engine.cfg
+++ b/test/engine/engine.cfg
@@ -1,4 +1,7 @@
 {
+    "checkpoint.test.lua": {
+        "memtx": {"engine": "memtx"}
+    },
     "*": {
         "memtx": {"engine": "memtx"}, 
         "vinyl": {"engine": "vinyl"}
-- 
2.21.0





More information about the Tarantool-patches mailing list