* [tarantool-patches] [PATCH] Do not latch schema while checkpoint
@ 2019-05-23 8:19 Georgy Kirichenko
0 siblings, 0 replies; only message in thread
From: Georgy Kirichenko @ 2019-05-23 8:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Increase a primary key index's reference count until the end of a
checkpoint instead of latching the schema. The attached test is not
valid for the vinyl engine because vy_log is unable to stop recovery
at a recoverable instance vclock. This change allows replication not
to be stopped on DDL while a checkpoint is running, and enables a
checkpoint during a DDL operation.
Prerequisites: #1254
---
Issue: https://github.com/tarantool/tarantool/issues/1254
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1254-checkpoint-does-not-latch-schema
src/box/gc.c | 7 --
src/box/index.cc | 2 +
src/box/index.h | 17 ++++
src/box/memtx_engine.c | 21 +++--
src/box/space.c | 4 +-
test/engine/checkpoint.result | 157 ++++++++++++++++++++++++++++++++
test/engine/checkpoint.test.lua | 44 +++++++++
test/engine/engine.cfg | 3 +
8 files changed, 239 insertions(+), 16 deletions(-)
create mode 100644 test/engine/checkpoint.result
create mode 100644 test/engine/checkpoint.test.lua
diff --git a/src/box/gc.c b/src/box/gc.c
index 5639edd81..a5e3c20d0 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -368,12 +368,6 @@ gc_do_checkpoint(void)
assert(!gc.checkpoint_is_in_progress);
gc.checkpoint_is_in_progress = true;
- /*
- * We don't support DDL operations while making a checkpoint.
- * Lock them out.
- */
- latch_lock(&schema_lock);
-
/*
* Rotate WAL and call engine callbacks to create a checkpoint
* on disk for each registered engine.
@@ -398,7 +392,6 @@ out:
if (rc != 0)
engine_abort_checkpoint();
- latch_unlock(&schema_lock);
gc.checkpoint_is_in_progress = false;
return rc;
}
diff --git a/src/box/index.cc b/src/box/index.cc
index 4a444e5d0..a440330f3 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -487,6 +487,7 @@ index_create(struct index *index, struct engine *engine,
index->engine = engine;
index->def = def;
index->space_cache_version = space_cache_version;
+ index->ref_count = 1;
return 0;
}
@@ -498,6 +499,7 @@ index_delete(struct index *index)
* engine might still need it, e.g. to check if
* the index is primary or secondary.
*/
+ assert(index->ref_count == 0);
struct index_def *def = index->def;
index->vtab->destroy(index);
index_def_delete(def);
diff --git a/src/box/index.h b/src/box/index.h
index 97d600c96..7cda2ebd7 100644
--- a/src/box/index.h
+++ b/src/box/index.h
@@ -456,6 +456,8 @@ struct index {
struct index_def *def;
/* Space cache version at the time of construction. */
uint32_t space_cache_version;
+ /* Index reference count. */
+ uint32_t ref_count;
};
/**
@@ -498,10 +500,25 @@ int
index_create(struct index *index, struct engine *engine,
const struct index_vtab *vtab, struct index_def *def);
+static inline void
+index_ref(struct index *index) {
+ assert(index->ref_count > 0);
+ ++index->ref_count;
+}
+
/** Free an index instance. */
void
index_delete(struct index *index);
+static inline void
+index_unref(struct index *index)
+{
+ assert(index->ref_count > 0);
+ --index->ref_count;
+ if (index->ref_count == 0)
+ index_delete(index);
+}
+
/** Build this index based on the contents of another index. */
int
index_build(struct index *index, struct index *pk);
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index f4312484a..94f5038fb 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -530,20 +530,20 @@ checkpoint_write_row(struct xlog *l, struct xrow_header *row)
}
static int
-checkpoint_write_tuple(struct xlog *l, struct space *space,
+checkpoint_write_tuple(struct xlog *l, uint32_t space_id, uint32_t group_id,
const char *data, uint32_t size)
{
struct request_replace_body body;
body.m_body = 0x82; /* map of two elements. */
body.k_space_id = IPROTO_SPACE_ID;
body.m_space_id = 0xce; /* uint32 */
- body.v_space_id = mp_bswap_u32(space_id(space));
+ body.v_space_id = mp_bswap_u32(space_id);
body.k_tuple = IPROTO_TUPLE;
struct xrow_header row;
memset(&row, 0, sizeof(struct xrow_header));
row.type = IPROTO_INSERT;
- row.group_id = space_group_id(space);
+ row.group_id = group_id;
row.bodycnt = 2;
row.body[0].iov_base = &body;
@@ -554,7 +554,9 @@ checkpoint_write_tuple(struct xlog *l, struct space *space,
}
struct checkpoint_entry {
- struct space *space;
+ uint32_t space_id;
+ uint32_t group_id;
+ struct index *index;
struct snapshot_iterator *iterator;
struct rlist link;
};
@@ -604,6 +606,7 @@ checkpoint_delete(struct checkpoint *ckpt)
struct checkpoint_entry *entry, *tmp;
rlist_foreach_entry_safe(entry, &ckpt->entries, link, tmp) {
entry->iterator->free(entry->iterator);
+ index_unref(entry->index);
free(entry);
}
xdir_destroy(&ckpt->dir);
@@ -644,7 +647,10 @@ checkpoint_add_space(struct space *sp, void *data)
}
rlist_add_tail_entry(&ckpt->entries, entry, link);
- entry->space = sp;
+ entry->space_id = space_id(sp);
+ entry->group_id = space_group_id(sp);
+ entry->index = pk;
+ index_ref(entry->index);
entry->iterator = index_create_snapshot_iterator(pk);
if (entry->iterator == NULL)
return -1;
@@ -679,8 +685,9 @@ checkpoint_f(va_list ap)
struct snapshot_iterator *it = entry->iterator;
for (data = it->next(it, &size); data != NULL;
data = it->next(it, &size)) {
- if (checkpoint_write_tuple(&snap, entry->space,
- data, size) != 0) {
+ if (checkpoint_write_tuple(&snap, entry->space_id,
+ entry->group_id,
+ data, size) != 0) {
xlog_close(&snap, false);
return -1;
}
diff --git a/src/box/space.c b/src/box/space.c
index 243e7da2f..d3c13fff0 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -171,7 +171,7 @@ fail_free_indexes:
for (uint32_t i = 0; i <= index_id_max; i++) {
struct index *index = space->index_map[i];
if (index != NULL)
- index_delete(index);
+ index_unref(index);
}
fail:
free(space->index_map);
@@ -209,7 +209,7 @@ space_delete(struct space *space)
for (uint32_t j = 0; j <= space->index_id_max; j++) {
struct index *index = space->index_map[j];
if (index != NULL)
- index_delete(index);
+ index_unref(index);
}
free(space->index_map);
if (space->format != NULL)
diff --git a/test/engine/checkpoint.result b/test/engine/checkpoint.result
new file mode 100644
index 000000000..c02b8944d
--- /dev/null
+++ b/test/engine/checkpoint.result
@@ -0,0 +1,157 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+-- test ddl while snapshoting
+test_run:cmd('create server ddl with script = "engine/ddl.lua"')
+---
+- true
+...
+test_run:cmd("start server ddl")
+---
+- true
+...
+test_run:cmd("switch ddl")
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+_ = box.schema.space.create('ddl', {engine = engine}):create_index('pk')
+---
+...
+for i = 1, 20 do box.space.ddl:replace({i}) end
+---
+...
+errinj = box.error.injection
+---
+...
+errinj.set('ERRINJ_SNAP_WRITE_ROW_TIMEOUT', 0.01)
+---
+- ok
+...
+ch = fiber.channel(1)
+---
+...
+_ = fiber.create(function () box.snapshot() ch:put(true) end)
+---
+...
+-- snapshot was started, remember the current xlog
+fio = require('fio')
+---
+...
+xlog1 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.xlog", box.info.lsn))
+---
+...
+vylog1 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.vylog", box.info.lsn))
+---
+...
+box.space.ddl:truncate()
+---
+...
+errinj.set('ERRINJ_SNAP_WRITE_ROW_TIMEOUT', 0.00)
+---
+- ok
+...
+ch:get()
+---
+- true
+...
+box.space.ddl:select()
+---
+- []
+...
+xlog2 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.xlog", box.info.lsn))
+---
+...
+vylog2 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.vylog", box.info.lsn))
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+xlog1 = test_run:eval('ddl', 'return xlog1')[1]
+---
+...
+xlog2 = test_run:eval('ddl', 'return xlog2')[1]
+---
+...
+vylog1 = test_run:eval('ddl', 'return vylog1')[1]
+---
+...
+vylog2 = test_run:eval('ddl', 'return vylog2')[1]
+---
+...
+test_run:cmd("stop server ddl")
+---
+- true
+...
+-- delete last wals in order to check truncated data remains in a snapshot
+fio = require('fio')
+---
+...
+fio.unlink(xlog1)
+---
+- true
+...
+fio.unlink(xlog2)
+---
+- true
+...
+_ = fio.unlink(vylog1)
+---
+...
+_ = fio.unlink(vylog2)
+---
+...
+-- start server ddl from snapshot only
+test_run:cmd('start server ddl')
+---
+- true
+...
+test_run:cmd("switch ddl")
+---
+- true
+...
+box.space.ddl:select()
+---
+- - [1]
+ - [2]
+ - [3]
+ - [4]
+ - [5]
+ - [6]
+ - [7]
+ - [8]
+ - [9]
+ - [10]
+ - [11]
+ - [12]
+ - [13]
+ - [14]
+ - [15]
+ - [16]
+ - [17]
+ - [18]
+ - [19]
+ - [20]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server ddl")
+---
+- true
+...
+test_run:cmd('cleanup server ddl')
+---
+- true
+...
diff --git a/test/engine/checkpoint.test.lua b/test/engine/checkpoint.test.lua
new file mode 100644
index 000000000..53124f524
--- /dev/null
+++ b/test/engine/checkpoint.test.lua
@@ -0,0 +1,44 @@
+env = require('test_run')
+test_run = env.new()
+
+-- test ddl while snapshoting
+test_run:cmd('create server ddl with script = "engine/ddl.lua"')
+test_run:cmd("start server ddl")
+test_run:cmd("switch ddl")
+fiber = require('fiber')
+engine = test_run:get_cfg('engine')
+_ = box.schema.space.create('ddl', {engine = engine}):create_index('pk')
+for i = 1, 20 do box.space.ddl:replace({i}) end
+errinj = box.error.injection
+errinj.set('ERRINJ_SNAP_WRITE_ROW_TIMEOUT', 0.01)
+ch = fiber.channel(1)
+_ = fiber.create(function () box.snapshot() ch:put(true) end)
+-- snapshot was started, remember the current xlog
+fio = require('fio')
+xlog1 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.xlog", box.info.lsn))
+vylog1 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.vylog", box.info.lsn))
+box.space.ddl:truncate()
+errinj.set('ERRINJ_SNAP_WRITE_ROW_TIMEOUT', 0.00)
+ch:get()
+box.space.ddl:select()
+xlog2 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.xlog", box.info.lsn))
+vylog2 = fio.pathjoin(box.cfg.wal_dir, string.format("%020d.vylog", box.info.lsn))
+test_run:cmd("switch default")
+xlog1 = test_run:eval('ddl', 'return xlog1')[1]
+xlog2 = test_run:eval('ddl', 'return xlog2')[1]
+vylog1 = test_run:eval('ddl', 'return vylog1')[1]
+vylog2 = test_run:eval('ddl', 'return vylog2')[1]
+test_run:cmd("stop server ddl")
+-- delete last wals in order to check truncated data remains in a snapshot
+fio = require('fio')
+fio.unlink(xlog1)
+fio.unlink(xlog2)
+_ = fio.unlink(vylog1)
+_ = fio.unlink(vylog2)
+-- start server ddl from snapshot only
+test_run:cmd('start server ddl')
+test_run:cmd("switch ddl")
+box.space.ddl:select()
+test_run:cmd("switch default")
+test_run:cmd("stop server ddl")
+test_run:cmd('cleanup server ddl')
diff --git a/test/engine/engine.cfg b/test/engine/engine.cfg
index 9f07629b4..d71a51afc 100644
--- a/test/engine/engine.cfg
+++ b/test/engine/engine.cfg
@@ -1,4 +1,7 @@
{
+ "checkpoint.test.lua": {
+ "memtx": {"engine": "memtx"}
+ },
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.21.0
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2019-05-23 8:21 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-05-23 8:19 [tarantool-patches] [PATCH] Do not latch schema while checkpoint Georgy Kirichenko
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox