* [PATCH 00/13] Join replicas off the current read view
@ 2019-08-10 10:03 Vladimir Davydov
2019-08-10 10:03 ` [PATCH 01/13] vinyl: embed engine in vy_env Vladimir Davydov
` (12 more replies)
0 siblings, 13 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
Currently, we join replicas off the last checkpoint. As a result, we
must keep all files corresponding to the last checkpoint. This means
that we must always create a memtx snapshot file on the initial call to
box.cfg() even though it is virtually the same for all instances.
Besides, we must rotate the vylog file synchronously with snapshot
creation, otherwise we wouldn't be able to pull all vinyl files
corresponding to the last checkpoint. This interconnection between
vylog and xlog makes the code difficult to maintain.
Actually, nothing prevents us from relaying the current read view
instead of the last checkpoint on initial join, as both memtx and
vinyl support a consistent read view. This patch does the trick.
This is a step towards making vylog independent of checkpointing
and WAL.
https://github.com/tarantool/tarantool/issues/1271
https://github.com/tarantool/tarantool/commits/dv/gh-1271-rework-replica-join
Vladimir Davydov (13):
vinyl: embed engine in vy_env
vinyl: embed index in vy_lsm
vinyl: move reference counting from vy_lsm to index
vinyl: don't pin index for iterator lifetime
vinyl: don't exempt dropped indexes from dump and compaction
memtx: don't store pointers to index internals in iterator
memtx: use ref counting to pin indexes for snapshot
memtx: allow snapshot iterator to fail
memtx: enter small delayed free mode from snapshot iterator
wal: make wal_sync fail on write error
xrow: factor out helper for setting REPLACE request body
test: disable replication/on_schema_init
relay: join new replicas off read view
src/box/blackhole.c | 3 +-
src/box/box.cc | 53 +-
src/box/engine.c | 21 -
src/box/engine.h | 27 +-
src/box/index.cc | 2 +
src/box/index.h | 29 +-
src/box/iproto_constants.h | 11 +
src/box/lua/info.c | 3 +-
src/box/lua/stat.c | 3 +-
src/box/memtx_bitset.c | 10 +-
src/box/memtx_engine.c | 152 +----
src/box/memtx_engine.h | 28 +-
src/box/memtx_hash.c | 52 +-
src/box/memtx_tree.c | 100 ++--
src/box/relay.cc | 170 +++++-
src/box/relay.h | 2 +-
src/box/sequence.c | 24 +-
src/box/space.c | 4 +-
src/box/sysview.c | 3 +-
src/box/vinyl.c | 596 ++++++--------------
src/box/vinyl.h | 26 +-
src/box/vy_lsm.c | 6 +-
src/box/vy_lsm.h | 22 +-
src/box/vy_run.c | 6 -
src/box/vy_scheduler.c | 95 ++--
src/box/vy_scheduler.h | 10 +-
src/box/vy_tx.c | 10 +-
src/box/vy_tx.h | 8 +
src/box/wal.c | 29 +-
src/box/wal.h | 5 +-
src/lib/core/errinj.h | 2 +-
test/box/errinj.result | 38 +-
test/replication-py/cluster.result | 13 -
test/replication-py/cluster.test.py | 25 -
test/replication/join_without_snap.result | 88 +++
test/replication/join_without_snap.test.lua | 32 ++
test/replication/suite.cfg | 1 +
test/replication/suite.ini | 2 +-
test/unit/vy_point_lookup.c | 3 +-
test/vinyl/errinj.result | 4 +-
test/vinyl/errinj.test.lua | 4 +-
test/xlog/panic_on_broken_lsn.result | 31 +-
test/xlog/panic_on_broken_lsn.test.lua | 20 +-
43 files changed, 826 insertions(+), 947 deletions(-)
create mode 100644 test/replication/join_without_snap.result
create mode 100644 test/replication/join_without_snap.test.lua
--
2.20.1
* [PATCH 01/13] vinyl: embed engine in vy_env
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:14 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:09 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 02/13] vinyl: embed index in vy_lsm Vladimir Davydov
` (11 subsequent siblings)
12 siblings, 2 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
There's no point in having vinyl_engine and vinyl_index wrapper structs
to bind vy_env and vy_lsm to struct engine and index. Instead we can
simply embed engine and index in vy_env and vy_lsm. This will simplify
further development, e.g. this will allow us to move reference counting
from vy_lsm up to struct index so that it can be used in the generic
code.
---
src/box/box.cc | 20 ++++------
src/box/lua/info.c | 3 +-
src/box/lua/stat.c | 3 +-
src/box/vinyl.c | 96 +++++++++++++++++++++-------------------------
src/box/vinyl.h | 26 ++++++-------
5 files changed, 65 insertions(+), 83 deletions(-)
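Note: the embedding described in the commit message can be illustrated
outside of Tarantool with a minimal sketch. All demo_* names below are
hypothetical stand-ins, not the real engine or vy_env definitions. The
sketch assumes the generic struct is the first member of the specific
one, so a pointer to the base can be cast back to the container, with a
vtab check guarding the cast as vy_env() does in this patch.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-ins for the real Tarantool types. */
struct demo_engine_vtab {
	const char *engine_name;
};

struct demo_engine {
	const struct demo_engine_vtab *vtab;
};

static const struct demo_engine_vtab demo_vinyl_vtab = { "vinyl" };

/* Environment with the generic engine embedded as its first member. */
struct demo_env {
	struct demo_engine base;
	double timeout;
};

/* Downcast: valid because base is the first member of demo_env. */
static inline struct demo_env *
demo_env(struct demo_engine *engine)
{
	assert(engine->vtab == &demo_vinyl_vtab);
	return (struct demo_env *)engine;
}

/* Allocate the environment and hand out a pointer to the embedded base. */
static struct demo_engine *
demo_engine_new(void)
{
	struct demo_env *env = calloc(1, sizeof(*env));
	if (env == NULL)
		return NULL;
	env->base.vtab = &demo_vinyl_vtab;
	return &env->base;
}

/* Setter that takes the generic engine pointer, like the reworked API. */
static void
demo_engine_set_timeout(struct demo_engine *engine, double timeout)
{
	demo_env(engine)->timeout = timeout;
}

/* A single free() releases the base and the environment together. */
static void
demo_engine_delete(struct demo_engine *engine)
{
	free(demo_env(engine));
}
```

With this layout there is one allocation instead of two, and callers
only ever see the generic pointer, which is presumably why the wrapper
struct becomes unnecessary.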
diff --git a/src/box/box.cc b/src/box/box.cc
index 80249919..66cd6d3a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -784,8 +784,7 @@ box_set_snap_io_rate_limit(void)
assert(memtx != NULL);
memtx_engine_set_snap_io_rate_limit(memtx,
cfg_getd("snap_io_rate_limit"));
- struct vinyl_engine *vinyl;
- vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
+ struct engine *vinyl = engine_by_name("vinyl");
assert(vinyl != NULL);
vinyl_engine_set_snap_io_rate_limit(vinyl,
cfg_getd("snap_io_rate_limit"));
@@ -816,8 +815,7 @@ box_set_too_long_threshold(void)
{
too_long_threshold = cfg_getd("too_long_threshold");
- struct vinyl_engine *vinyl;
- vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
+ struct engine *vinyl = engine_by_name("vinyl");
assert(vinyl != NULL);
vinyl_engine_set_too_long_threshold(vinyl, too_long_threshold);
}
@@ -855,8 +853,7 @@ box_set_checkpoint_wal_threshold(void)
void
box_set_vinyl_memory(void)
{
- struct vinyl_engine *vinyl;
- vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
+ struct engine *vinyl = engine_by_name("vinyl");
assert(vinyl != NULL);
vinyl_engine_set_memory_xc(vinyl,
box_check_vinyl_memory(cfg_geti64("vinyl_memory")));
@@ -865,8 +862,7 @@ box_set_vinyl_memory(void)
void
box_set_vinyl_max_tuple_size(void)
{
- struct vinyl_engine *vinyl;
- vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
+ struct engine *vinyl = engine_by_name("vinyl");
assert(vinyl != NULL);
vinyl_engine_set_max_tuple_size(vinyl,
cfg_geti("vinyl_max_tuple_size"));
@@ -875,8 +871,7 @@ box_set_vinyl_max_tuple_size(void)
void
box_set_vinyl_cache(void)
{
- struct vinyl_engine *vinyl;
- vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
+ struct engine *vinyl = engine_by_name("vinyl");
assert(vinyl != NULL);
vinyl_engine_set_cache(vinyl, cfg_geti64("vinyl_cache"));
}
@@ -884,8 +879,7 @@ box_set_vinyl_cache(void)
void
box_set_vinyl_timeout(void)
{
- struct vinyl_engine *vinyl;
- vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
+ struct engine *vinyl = engine_by_name("vinyl");
assert(vinyl != NULL);
vinyl_engine_set_timeout(vinyl, cfg_getd("vinyl_timeout"));
}
@@ -1718,7 +1712,7 @@ engine_init()
struct engine *blackhole = blackhole_engine_new_xc();
engine_register(blackhole);
- struct vinyl_engine *vinyl;
+ struct engine *vinyl;
vinyl = vinyl_engine_new_xc(cfg_gets("vinyl_dir"),
cfg_geti64("vinyl_memory"),
cfg_geti("vinyl_read_threads"),
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index d0e553b1..55382fd7 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -463,8 +463,7 @@ lbox_info_vinyl_call(struct lua_State *L)
{
struct info_handler h;
luaT_info_handler_create(&h, L);
- struct vinyl_engine *vinyl;
- vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
+ struct engine *vinyl = engine_by_name("vinyl");
assert(vinyl != NULL);
vinyl_engine_stat(vinyl, &h);
return 1;
diff --git a/src/box/lua/stat.c b/src/box/lua/stat.c
index eee2b104..29ec38b2 100644
--- a/src/box/lua/stat.c
+++ b/src/box/lua/stat.c
@@ -121,8 +121,7 @@ lbox_stat_vinyl(struct lua_State *L)
{
struct info_handler h;
luaT_info_handler_create(&h, L);
- struct vinyl_engine *vinyl;
- vinyl = (struct vinyl_engine *)engine_by_name("vinyl");
+ struct engine *vinyl = engine_by_name("vinyl");
assert(vinyl != NULL);
vinyl_engine_stat(vinyl, &h);
return 1;
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index cd009c1c..327d0c39 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -94,6 +94,7 @@ enum vy_status {
};
struct vy_env {
+ struct engine base;
/** Recovery status */
enum vy_status status;
/** TX manager */
@@ -141,19 +142,6 @@ struct vy_env {
bool force_recovery;
};
-struct vinyl_engine {
- struct engine base;
- /** Vinyl environment. */
- struct vy_env *env;
-};
-
-/** Extract vy_env from an engine object. */
-static inline struct vy_env *
-vy_env(struct engine *engine)
-{
- return ((struct vinyl_engine *)engine)->env;
-}
-
struct vinyl_index {
struct index base;
/** LSM tree that stores index data. */
@@ -210,6 +198,14 @@ static const struct index_vtab vinyl_index_vtab;
static struct trigger on_replace_vinyl_deferred_delete;
+/** Extract vy_env from an engine object. */
+static inline struct vy_env *
+vy_env(struct engine *engine)
+{
+ assert(engine->vtab == &vinyl_engine_vtab);
+ return (struct vy_env *)engine;
+}
+
/**
* A quick intro into Vinyl cosmology and file format
* --------------------------------------------------
@@ -327,9 +323,9 @@ vy_info_append_disk(struct vy_env *env, struct info_handler *h)
}
void
-vinyl_engine_stat(struct vinyl_engine *vinyl, struct info_handler *h)
+vinyl_engine_stat(struct engine *engine, struct info_handler *h)
{
- struct vy_env *env = vinyl->env;
+ struct vy_env *env = vy_env(engine);
info_begin(h);
vy_info_append_tx(env, h);
@@ -692,14 +688,13 @@ static struct index *
vinyl_space_create_index(struct space *space, struct index_def *index_def)
{
assert(index_def->type == TREE);
- struct vinyl_engine *vinyl = (struct vinyl_engine *)space->engine;
struct vinyl_index *index = calloc(1, sizeof(*index));
if (index == NULL) {
diag_set(OutOfMemory, sizeof(*index),
"malloc", "struct vinyl_index");
return NULL;
}
- struct vy_env *env = vinyl->env;
+ struct vy_env *env = vy_env(space->engine);
struct vy_lsm *pk = NULL;
if (index_def->iid > 0) {
pk = vy_lsm(space_index(space, 0));
@@ -712,7 +707,7 @@ vinyl_space_create_index(struct space *space, struct index_def *index_def)
free(index);
return NULL;
}
- if (index_create(&index->base, (struct engine *)vinyl,
+ if (index_create(&index->base, &env->base,
&vinyl_index_vtab, index_def) != 0) {
vy_lsm_delete(lsm);
free(index);
@@ -2689,82 +2684,77 @@ vy_env_complete_recovery(struct vy_env *env)
vy_regulator_start(&env->regulator);
}
-struct vinyl_engine *
+struct engine *
vinyl_engine_new(const char *dir, size_t memory,
int read_threads, int write_threads, bool force_recovery)
{
- struct vinyl_engine *vinyl = calloc(1, sizeof(*vinyl));
- if (vinyl == NULL) {
- diag_set(OutOfMemory, sizeof(*vinyl),
- "malloc", "struct vinyl_engine");
+ struct vy_env *env = vy_env_new(dir, memory, read_threads,
+ write_threads, force_recovery);
+ if (env == NULL)
return NULL;
- }
- vinyl->env = vy_env_new(dir, memory, read_threads,
- write_threads, force_recovery);
- if (vinyl->env == NULL) {
- free(vinyl);
- return NULL;
- }
-
- vinyl->base.vtab = &vinyl_engine_vtab;
- vinyl->base.name = "vinyl";
- return vinyl;
+ env->base.vtab = &vinyl_engine_vtab;
+ env->base.name = "vinyl";
+ return &env->base;
}
static void
vinyl_engine_shutdown(struct engine *engine)
{
- struct vinyl_engine *vinyl = (struct vinyl_engine *)engine;
- vy_env_delete(vinyl->env);
- free(vinyl);
+ struct vy_env *env = vy_env(engine);
+ vy_env_delete(env);
}
void
-vinyl_engine_set_cache(struct vinyl_engine *vinyl, size_t quota)
+vinyl_engine_set_cache(struct engine *engine, size_t quota)
{
- vy_cache_env_set_quota(&vinyl->env->cache_env, quota);
+ struct vy_env *env = vy_env(engine);
+ vy_cache_env_set_quota(&env->cache_env, quota);
}
int
-vinyl_engine_set_memory(struct vinyl_engine *vinyl, size_t size)
+vinyl_engine_set_memory(struct engine *engine, size_t size)
{
- if (size < vinyl->env->quota.limit) {
+ struct vy_env *env = vy_env(engine);
+ if (size < env->quota.limit) {
diag_set(ClientError, ER_CFG, "vinyl_memory",
"cannot decrease memory size at runtime");
return -1;
}
- vy_regulator_set_memory_limit(&vinyl->env->regulator, size);
+ vy_regulator_set_memory_limit(&env->regulator, size);
return 0;
}
void
-vinyl_engine_set_max_tuple_size(struct vinyl_engine *vinyl, size_t max_size)
+vinyl_engine_set_max_tuple_size(struct engine *engine, size_t max_size)
{
- vinyl->env->stmt_env.max_tuple_size = max_size;
+ struct vy_env *env = vy_env(engine);
+ env->stmt_env.max_tuple_size = max_size;
}
void
-vinyl_engine_set_timeout(struct vinyl_engine *vinyl, double timeout)
+vinyl_engine_set_timeout(struct engine *engine, double timeout)
{
- vinyl->env->timeout = timeout;
+ struct vy_env *env = vy_env(engine);
+ env->timeout = timeout;
}
void
-vinyl_engine_set_too_long_threshold(struct vinyl_engine *vinyl,
+vinyl_engine_set_too_long_threshold(struct engine *engine,
double too_long_threshold)
{
- vinyl->env->quota.too_long_threshold = too_long_threshold;
- vinyl->env->lsm_env.too_long_threshold = too_long_threshold;
+ struct vy_env *env = vy_env(engine);
+ env->quota.too_long_threshold = too_long_threshold;
+ env->lsm_env.too_long_threshold = too_long_threshold;
}
void
-vinyl_engine_set_snap_io_rate_limit(struct vinyl_engine *vinyl, double limit)
+vinyl_engine_set_snap_io_rate_limit(struct engine *engine, double limit)
{
+ struct vy_env *env = vy_env(engine);
int64_t limit_in_bytes = limit * 1024 * 1024;
- vinyl->env->run_env.snap_io_rate_limit = limit_in_bytes;
- vy_regulator_reset_dump_bandwidth(&vinyl->env->regulator,
- limit_in_bytes);
+ env->run_env.snap_io_rate_limit = limit_in_bytes;
+ vy_regulator_reset_dump_bandwidth(&env->regulator, limit_in_bytes);
}
/** }}} Environment */
diff --git a/src/box/vinyl.h b/src/box/vinyl.h
index 21f99e45..2a3e8f1f 100644
--- a/src/box/vinyl.h
+++ b/src/box/vinyl.h
@@ -39,9 +39,9 @@ extern "C" {
#endif /* defined(__cplusplus) */
struct info_handler;
-struct vinyl_engine;
+struct engine;
-struct vinyl_engine *
+struct engine *
vinyl_engine_new(const char *dir, size_t memory,
int read_threads, int write_threads, bool force_recovery);
@@ -49,55 +49,55 @@ vinyl_engine_new(const char *dir, size_t memory,
* Vinyl engine statistics (box.stat.vinyl()).
*/
void
-vinyl_engine_stat(struct vinyl_engine *vinyl, struct info_handler *handler);
+vinyl_engine_stat(struct engine *engine, struct info_handler *handler);
/**
* Update vinyl cache size.
*/
void
-vinyl_engine_set_cache(struct vinyl_engine *vinyl, size_t quota);
+vinyl_engine_set_cache(struct engine *engine, size_t quota);
/**
* Update vinyl memory size.
*/
int
-vinyl_engine_set_memory(struct vinyl_engine *vinyl, size_t size);
+vinyl_engine_set_memory(struct engine *engine, size_t size);
/**
* Update max tuple size.
*/
void
-vinyl_engine_set_max_tuple_size(struct vinyl_engine *vinyl, size_t max_size);
+vinyl_engine_set_max_tuple_size(struct engine *engine, size_t max_size);
/**
* Update query timeout.
*/
void
-vinyl_engine_set_timeout(struct vinyl_engine *vinyl, double timeout);
+vinyl_engine_set_timeout(struct engine *engine, double timeout);
/**
* Update too_long_threshold.
*/
void
-vinyl_engine_set_too_long_threshold(struct vinyl_engine *vinyl,
+vinyl_engine_set_too_long_threshold(struct engine *engine,
double too_long_threshold);
/**
* Update snap_io_rate_limit.
*/
void
-vinyl_engine_set_snap_io_rate_limit(struct vinyl_engine *vinyl, double limit);
+vinyl_engine_set_snap_io_rate_limit(struct engine *engine, double limit);
#ifdef __cplusplus
} /* extern "C" */
#include "diag.h"
-static inline struct vinyl_engine *
+static inline struct engine *
vinyl_engine_new_xc(const char *dir, size_t memory,
int read_threads, int write_threads, bool force_recovery)
{
- struct vinyl_engine *vinyl;
+ struct engine *vinyl;
vinyl = vinyl_engine_new(dir, memory, read_threads,
write_threads, force_recovery);
if (vinyl == NULL)
@@ -106,9 +106,9 @@ vinyl_engine_new_xc(const char *dir, size_t memory,
}
static inline void
-vinyl_engine_set_memory_xc(struct vinyl_engine *vinyl, size_t size)
+vinyl_engine_set_memory_xc(struct engine *engine, size_t size)
{
- if (vinyl_engine_set_memory(vinyl, size) != 0)
+ if (vinyl_engine_set_memory(engine, size) != 0)
diag_raise();
}
--
2.20.1
* [PATCH 02/13] vinyl: embed index in vy_lsm
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
2019-08-10 10:03 ` [PATCH 01/13] vinyl: embed engine in vy_env Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:14 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:09 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 03/13] vinyl: move reference counting from vy_lsm to index Vladimir Davydov
` (10 subsequent siblings)
12 siblings, 2 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
There's no point in having vinyl_engine and vinyl_index wrapper structs
to bind vy_env and vy_lsm to struct engine and index. Instead we can
simply embed engine and index in vy_env and vy_lsm. This will simplify
further development, e.g. this will allow us to move reference counting
from vy_lsm up to struct index so that it can be used in the generic
code.
---
src/box/vinyl.c | 39 ++++++++++++---------------------------
src/box/vy_lsm.h | 3 ++-
2 files changed, 14 insertions(+), 28 deletions(-)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 327d0c39..645e36ca 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -142,19 +142,6 @@ struct vy_env {
bool force_recovery;
};
-struct vinyl_index {
- struct index base;
- /** LSM tree that stores index data. */
- struct vy_lsm *lsm;
-};
-
-/** Extract vy_lsm from an index object. */
-struct vy_lsm *
-vy_lsm(struct index *index)
-{
- return ((struct vinyl_index *)index)->lsm;
-}
-
/** Mask passed to vy_gc(). */
enum {
/** Delete incomplete runs. */
@@ -206,6 +193,14 @@ vy_env(struct engine *engine)
return (struct vy_env *)engine;
}
+/** Extract vy_lsm from an index object. */
+struct vy_lsm *
+vy_lsm(struct index *index)
+{
+ assert(index->vtab == &vinyl_index_vtab);
+ return (struct vy_lsm *)index;
+}
+
/**
* A quick intro into Vinyl cosmology and file format
* --------------------------------------------------
@@ -688,12 +683,6 @@ static struct index *
vinyl_space_create_index(struct space *space, struct index_def *index_def)
{
assert(index_def->type == TREE);
- struct vinyl_index *index = calloc(1, sizeof(*index));
- if (index == NULL) {
- diag_set(OutOfMemory, sizeof(*index),
- "malloc", "struct vinyl_index");
- return NULL;
- }
struct vy_env *env = vy_env(space->engine);
struct vy_lsm *pk = NULL;
if (index_def->iid > 0) {
@@ -703,18 +692,15 @@ vinyl_space_create_index(struct space *space, struct index_def *index_def)
struct vy_lsm *lsm = vy_lsm_new(&env->lsm_env, &env->cache_env,
&env->mem_env, index_def, space->format,
pk, space_group_id(space));
- if (lsm == NULL) {
- free(index);
+ if (lsm == NULL)
return NULL;
- }
- if (index_create(&index->base, &env->base,
+
+ if (index_create(&lsm->base, &env->base,
&vinyl_index_vtab, index_def) != 0) {
vy_lsm_delete(lsm);
- free(index);
return NULL;
}
- index->lsm = lsm;
- return &index->base;
+ return &lsm->base;
}
static void
@@ -727,7 +713,6 @@ vinyl_index_destroy(struct index *index)
* is gone.
*/
vy_lsm_unref(lsm);
- free(index);
}
/**
diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h
index d9e4b582..327a886b 100644
--- a/src/box/vy_lsm.h
+++ b/src/box/vy_lsm.h
@@ -37,6 +37,7 @@
#include <small/mempool.h>
#include <small/rlist.h>
+#include "index.h"
#include "index_def.h"
#define HEAP_FORWARD_DECLARATION
#include "salad/heap.h"
@@ -51,7 +52,6 @@ extern "C" {
#endif /* defined(__cplusplus) */
struct histogram;
-struct index;
struct tuple;
struct tuple_format;
struct vy_lsm;
@@ -179,6 +179,7 @@ vy_lsm_env_destroy(struct vy_lsm_env *env);
* secondary key, i.e. the tuple stored. This is key_def.
*/
struct vy_lsm {
+ struct index base;
/** Common LSM tree environment. */
struct vy_lsm_env *env;
/**
--
2.20.1
* [PATCH 03/13] vinyl: move reference counting from vy_lsm to index
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
2019-08-10 10:03 ` [PATCH 01/13] vinyl: embed engine in vy_env Vladimir Davydov
2019-08-10 10:03 ` [PATCH 02/13] vinyl: embed index in vy_lsm Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:16 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:09 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 04/13] vinyl: don't pin index for iterator lifetime Vladimir Davydov
` (9 subsequent siblings)
12 siblings, 2 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
Now, as vy_lsm and index are basically the same object, we can implement
reference counting right in struct index. This will allow us to prevent
an index from destruction when a space object it belongs to is freed
anywhere in the code, not just in vinyl.
---
src/box/index.cc | 2 ++
src/box/index.h | 26 ++++++++++++++++++++++++++
src/box/space.c | 4 ++--
src/box/vinyl.c | 7 +------
src/box/vy_lsm.c | 2 --
src/box/vy_lsm.h | 12 ++----------
test/unit/vy_point_lookup.c | 3 ++-
7 files changed, 35 insertions(+), 21 deletions(-)
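Note: the reference counting scheme from the commit message, reduced to
a standalone sketch. The demo_* names and the demo_deleted_count
counter are hypothetical and exist only for illustration; the real
index_ref()/index_unref() are the ones added to src/box/index.h below.
The sketch assumes the creator holds the initial reference and the
object is destroyed when the last reference is dropped.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical simplified index; only the counter matters here. */
struct demo_index {
	int refs;
};

/* Number of indexes destroyed so far, for demonstration only. */
static int demo_deleted_count;

static struct demo_index *
demo_index_new(void)
{
	struct demo_index *index = calloc(1, sizeof(*index));
	if (index != NULL)
		index->refs = 1; /* the creator holds the first reference */
	return index;
}

static void
demo_index_delete(struct demo_index *index)
{
	assert(index->refs == 0);
	demo_deleted_count++;
	free(index);
}

/* Pin the index so that it outlives its owner. */
static inline void
demo_index_ref(struct demo_index *index)
{
	assert(index->refs > 0);
	index->refs++;
}

/* Drop a reference; destroy the index once nobody uses it. */
static inline void
demo_index_unref(struct demo_index *index)
{
	assert(index->refs > 0);
	if (--index->refs == 0)
		demo_index_delete(index);
}
```

A user that calls demo_index_ref() before the owner calls
demo_index_unref() keeps the object alive until it drops its own
reference, which mirrors how space_delete() now calls index_unref()
rather than index_delete().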
diff --git a/src/box/index.cc b/src/box/index.cc
index 00a1b502..4e486711 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -488,6 +488,7 @@ index_create(struct index *index, struct engine *engine,
index->vtab = vtab;
index->engine = engine;
index->def = def;
+ index->refs = 1;
index->space_cache_version = space_cache_version;
return 0;
}
@@ -495,6 +496,7 @@ index_create(struct index *index, struct engine *engine,
void
index_delete(struct index *index)
{
+ assert(index->refs == 0);
/*
* Free index_def after destroying the index as
* engine might still need it, e.g. to check if
diff --git a/src/box/index.h b/src/box/index.h
index 2b1d0104..89b5e134 100644
--- a/src/box/index.h
+++ b/src/box/index.h
@@ -454,6 +454,8 @@ struct index {
struct engine *engine;
/* Description of a possibly multipart key. */
struct index_def *def;
+ /** Reference counter. */
+ int refs;
/* Space cache version at the time of construction. */
uint32_t space_cache_version;
};
@@ -502,6 +504,30 @@ index_create(struct index *index, struct engine *engine,
void
index_delete(struct index *index);
+/**
+ * Increment the reference counter of an index to prevent
+ * it from being destroyed when the space it belongs to is
+ * freed.
+ */
+static inline void
+index_ref(struct index *index)
+{
+ assert(index->refs > 0);
+ index->refs++;
+}
+
+/**
+ * Decrement the reference counter of an index.
+ * Destroy the index if it isn't used anymore.
+ */
+static inline void
+index_unref(struct index *index)
+{
+ assert(index->refs > 0);
+ if (--index->refs == 0)
+ index_delete(index);
+}
+
/** Build this index based on the contents of another index. */
int
index_build(struct index *index, struct index *pk);
diff --git a/src/box/space.c b/src/box/space.c
index 1a646899..0d1ad3b3 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -208,7 +208,7 @@ fail_free_indexes:
for (uint32_t i = 0; i <= index_id_max; i++) {
struct index *index = space->index_map[i];
if (index != NULL)
- index_delete(index);
+ index_unref(index);
}
fail:
free(space->index_map);
@@ -248,7 +248,7 @@ space_delete(struct space *space)
for (uint32_t j = 0; j <= space->index_id_max; j++) {
struct index *index = space->index_map[j];
if (index != NULL)
- index_delete(index);
+ index_unref(index);
}
free(space->index_map);
free(space->check_unique_constraint_map);
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 645e36ca..2d07e336 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -707,12 +707,7 @@ static void
vinyl_index_destroy(struct index *index)
{
struct vy_lsm *lsm = vy_lsm(index);
- /*
- * There still may be a task scheduled for this LSM tree
- * so postpone actual deletion until the last reference
- * is gone.
- */
- vy_lsm_unref(lsm);
+ vy_lsm_delete(lsm);
}
/**
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index c67db87a..8fba1792 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -188,7 +188,6 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_cache_env *cache_env,
goto fail_mem;
lsm->id = -1;
- lsm->refs = 1;
lsm->dump_lsn = -1;
lsm->commit_lsn = -1;
vy_cache_create(&lsm->cache, cache_env, cmp_def, index_def->iid == 0);
@@ -245,7 +244,6 @@ vy_range_tree_free_cb(vy_range_tree_t *t, struct vy_range *range, void *arg)
void
vy_lsm_delete(struct vy_lsm *lsm)
{
- assert(lsm->refs == 0);
assert(heap_node_is_stray(&lsm->in_dump));
assert(heap_node_is_stray(&lsm->in_compaction));
assert(vy_lsm_read_set_empty(&lsm->read_set));
diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h
index 327a886b..c8b0e297 100644
--- a/src/box/vy_lsm.h
+++ b/src/box/vy_lsm.h
@@ -182,11 +182,6 @@ struct vy_lsm {
struct index base;
/** Common LSM tree environment. */
struct vy_lsm_env *env;
- /**
- * Reference counter. Used to postpone LSM tree deletion
- * until all pending operations have completed.
- */
- int refs;
/** Unique ID of this LSM tree. */
int64_t id;
/** ID of the index this LSM tree is for. */
@@ -370,8 +365,7 @@ vy_lsm_dumps_per_compaction(struct vy_lsm *lsm)
static inline void
vy_lsm_ref(struct vy_lsm *lsm)
{
- assert(lsm->refs >= 0);
- lsm->refs++;
+ index_ref(&lsm->base);
}
/**
@@ -382,9 +376,7 @@ vy_lsm_ref(struct vy_lsm *lsm)
static inline void
vy_lsm_unref(struct vy_lsm *lsm)
{
- assert(lsm->refs > 0);
- if (--lsm->refs == 0)
- vy_lsm_delete(lsm);
+ index_unref(&lsm->base);
}
/**
diff --git a/test/unit/vy_point_lookup.c b/test/unit/vy_point_lookup.c
index 9961a0f7..fb075c57 100644
--- a/test/unit/vy_point_lookup.c
+++ b/test/unit/vy_point_lookup.c
@@ -15,6 +15,7 @@ uint32_t schema_version;
uint32_t space_cache_version;
struct space *space_by_id(uint32_t id) { return NULL; }
struct vy_lsm *vy_lsm(struct index *index) { return NULL; }
+void index_delete(struct index *index) { unreachable(); }
static int
write_run(struct vy_run *run, const char *dir_name,
@@ -303,7 +304,7 @@ test_basic()
is(results_ok, true, "select results");
is(has_errors, false, "no errors happened");
- vy_lsm_unref(pk);
+ vy_lsm_delete(pk);
index_def_delete(index_def);
tuple_format_unref(format);
vy_cache_destroy(&cache);
--
2.20.1
* [PATCH 04/13] vinyl: don't pin index for iterator lifetime
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (2 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 03/13] vinyl: move reference counting from vy_lsm to index Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 05/13] vinyl: don't exempt dropped indexes from dump and compaction Vladimir Davydov
` (8 subsequent siblings)
12 siblings, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
vinyl_iterator keeps a reference to the LSM tree it was created for
until it is destroyed, which may take indefinitely long in case the
iterator is used in Lua. Actually, we don't need to keep a reference to
the index for the whole iterator lifetime, because iterator_next()
wrapper guarantees that iterator->next won't be called for a dropped
index. What we need to do is keep a reference while we are yielding on
disk read, similarly to vinyl_index_get().
Currently, pinning an index indefinitely is harmless: an LSM tree is
exempted from dump/compaction as soon as it is dropped, so we merely pin
some memory. However, the following patches are going to enable
dump/compaction for dropped but pinned indexes in order to implement
snapshot iterators, so we'd better relax the dependency of an iterator
on an index now.
While we are at it, let's remove env and lsm members of vinyl_iterator
struct: lsm can be accessed via vy_read_iterator embedded in the struct
while env is only needed to access iterator_pool so we better store a
pointer to the pool in vinyl_iterator instead.
---
src/box/vinyl.c | 44 +++++++++++++++++++++++++++-----------------
1 file changed, 27 insertions(+), 17 deletions(-)
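Note: a standalone sketch of the "pin only while reading" idea from the
commit message. Everything named demo_* is hypothetical: the yield is
simulated by a plain function in which the owner drops its reference,
and "deletion" only sets a flag instead of freeing memory so that the
outcome can be inspected afterwards.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical LSM tree stand-in with a reference counter. */
struct demo_lsm {
	int refs;
	bool is_deleted;
};

static void
demo_lsm_ref(struct demo_lsm *lsm)
{
	assert(lsm->refs > 0);
	lsm->refs++;
}

static void
demo_lsm_unref(struct demo_lsm *lsm)
{
	assert(lsm->refs > 0);
	if (--lsm->refs == 0)
		lsm->is_deleted = true; /* real code would free the tree */
}

/*
 * Stand-in for a disk read that yields. While the reader is
 * suspended, the owner drops its reference to the tree.
 */
static int
demo_read_with_yield(struct demo_lsm *lsm)
{
	demo_lsm_unref(lsm); /* owner drops the index during the yield */
	return lsm->is_deleted ? -1 : 0;
}

/* Hold a reference only for the duration of one next() call. */
static int
demo_iterator_next(struct demo_lsm *lsm)
{
	demo_lsm_ref(lsm);
	int rc = demo_read_with_yield(lsm);
	demo_lsm_unref(lsm);
	return rc;
}
```

Because the reference is taken on entry and released on every exit
path, the tree stays valid across the simulated yield and is released
as soon as the call returns, rather than when the iterator object is
eventually freed.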
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 2d07e336..93002fdf 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -156,10 +156,8 @@ vy_gc(struct vy_env *env, struct vy_recovery *recovery,
struct vinyl_iterator {
struct iterator base;
- /** Vinyl environment. */
- struct vy_env *env;
- /** LSM tree this iterator is for. */
- struct vy_lsm *lsm;
+ /** Memory pool the iterator was allocated from. */
+ struct mempool *pool;
/**
* Points either to tx_autocommit for autocommit mode
* or to a multi-statement transaction active when the
@@ -3727,8 +3725,6 @@ static void
vinyl_iterator_close(struct vinyl_iterator *it)
{
vy_read_iterator_close(&it->iterator);
- vy_lsm_unref(it->lsm);
- it->lsm = NULL;
tuple_unref(it->key.stmt);
it->key = vy_entry_none();
if (it->tx == &it->tx_autocommit) {
@@ -3801,10 +3797,17 @@ vinyl_iterator_primary_next(struct iterator *base, struct tuple **ret)
assert(base->next = vinyl_iterator_primary_next);
struct vinyl_iterator *it = (struct vinyl_iterator *)base;
- assert(it->lsm->index_id == 0);
+ struct vy_lsm *lsm = it->iterator.lsm;
+ assert(lsm->index_id == 0);
+ /*
+ * Make sure the LSM tree isn't deleted while we are
+ * reading from it.
+ */
+ vy_lsm_ref(lsm);
if (vinyl_iterator_check_tx(it) != 0)
goto fail;
+
struct vy_entry entry;
if (vy_read_iterator_next(&it->iterator, &entry) != 0)
goto fail;
@@ -3817,9 +3820,11 @@ vinyl_iterator_primary_next(struct iterator *base, struct tuple **ret)
tuple_bless(entry.stmt);
}
*ret = entry.stmt;
+ vy_lsm_unref(lsm);
return 0;
fail:
vinyl_iterator_close(it);
+ vy_lsm_unref(lsm);
return -1;
}
@@ -3830,9 +3835,15 @@ vinyl_iterator_secondary_next(struct iterator *base, struct tuple **ret)
assert(base->next = vinyl_iterator_secondary_next);
struct vinyl_iterator *it = (struct vinyl_iterator *)base;
- assert(it->lsm->index_id > 0);
- struct vy_entry partial, entry;
+ struct vy_lsm *lsm = it->iterator.lsm;
+ assert(lsm->index_id > 0);
+ /*
+ * Make sure the LSM tree isn't deleted while we are
+ * reading from it.
+ */
+ vy_lsm_ref(lsm);
+ struct vy_entry partial, entry;
next:
if (vinyl_iterator_check_tx(it) != 0)
goto fail;
@@ -3846,12 +3857,11 @@ next:
vinyl_iterator_account_read(it, start_time, NULL);
vinyl_iterator_close(it);
*ret = NULL;
- return 0;
+ goto out;
}
ERROR_INJECT_YIELD(ERRINJ_VY_DELAY_PK_LOOKUP);
/* Get the full tuple from the primary index. */
- if (vy_get_by_secondary_tuple(it->lsm, it->tx,
- vy_tx_read_view(it->tx),
+ if (vy_get_by_secondary_tuple(lsm, it->tx, vy_tx_read_view(it->tx),
partial, &entry) != 0)
goto fail;
if (entry.stmt == NULL)
@@ -3861,9 +3871,12 @@ next:
*ret = entry.stmt;
tuple_bless(*ret);
tuple_unref(*ret);
+out:
+ vy_lsm_unref(lsm);
return 0;
fail:
vinyl_iterator_close(it);
+ vy_lsm_unref(lsm);
return -1;
}
@@ -3874,7 +3887,7 @@ vinyl_iterator_free(struct iterator *base)
struct vinyl_iterator *it = (struct vinyl_iterator *)base;
if (base->next != vinyl_iterator_last)
vinyl_iterator_close(it);
- mempool_free(&it->env->iterator_pool, it);
+ mempool_free(it->pool, it);
}
static struct iterator *
@@ -3915,10 +3928,7 @@ vinyl_index_create_iterator(struct index *base, enum iterator_type type,
else
it->base.next = vinyl_iterator_secondary_next;
it->base.free = vinyl_iterator_free;
-
- it->env = env;
- it->lsm = lsm;
- vy_lsm_ref(lsm);
+ it->pool = &env->iterator_pool;
if (tx != NULL) {
/*
--
2.20.1
* [PATCH 05/13] vinyl: don't exempt dropped indexes from dump and compaction
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (3 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 04/13] vinyl: don't pin index for iterator lifetime Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 06/13] memtx: don't store pointers to index internals in iterator Vladimir Davydov
` (7 subsequent siblings)
12 siblings, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
We remove an LSM tree from the scheduler queues as soon as it is
dropped, even though the tree may hang around for a while after
that, e.g. because it is pinned by an iterator. As a result, once
an index is dropped, it won't be dumped anymore - its memory level
will simply disappear without a trace. This is okay for now, but
to implement snapshot iterators we must make sure that an index
will stay valid as long as there's an iterator that references it.
Therefore, let's delay removal of an index from the scheduler queues
until it is about to be destroyed.
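
For illustration only, here is a minimal standalone sketch of the idea:
the scheduler registers a destroy callback when a tree is added, and the
tree leaves the queues only when its last reference goes away. All names
below (struct tree, scheduler_add, tree_unref and so on) are hypothetical
stand-ins rather than the actual vy_lsm or trigger API, and a single
callback slot replaces the trigger list for brevity.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins; not the real vy_lsm/trigger API. */
struct tree;
typedef void (*destroy_cb)(struct tree *tree, void *data);

struct tree {
	int refs;
	bool is_dropped;
	/* One callback slot instead of a trigger list, for brevity. */
	destroy_cb on_destroy;
	void *on_destroy_data;
};

struct scheduler {
	/* Number of trees currently sitting in the queues. */
	int queued;
};

/* Runs right before a tree is freed: remove it from the queues. */
void
scheduler_on_destroy(struct tree *tree, void *data)
{
	(void)tree;
	struct scheduler *scheduler = data;
	assert(scheduler->queued > 0);
	scheduler->queued--;
}

/* Queue the tree and arrange for automatic removal on destruction. */
void
scheduler_add(struct scheduler *scheduler, struct tree *tree)
{
	tree->on_destroy = scheduler_on_destroy;
	tree->on_destroy_data = scheduler;
	scheduler->queued++;
}

struct tree *
tree_new(void)
{
	struct tree *tree = calloc(1, sizeof(*tree));
	if (tree != NULL)
		tree->refs = 1;
	return tree;
}

void
tree_ref(struct tree *tree)
{
	tree->refs++;
}

void
tree_unref(struct tree *tree)
{
	assert(tree->refs > 0);
	if (--tree->refs > 0)
		return;
	/* Last reference is gone: notify, then free. */
	if (tree->on_destroy != NULL)
		tree->on_destroy(tree, tree->on_destroy_data);
	free(tree);
}
```

In this sketch, marking a tree as dropped does not touch the queues at
all; a tree pinned by an iterator stays schedulable until the iterator
releases its reference.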
---
src/box/vinyl.c | 16 +------
src/box/vy_lsm.c | 4 ++
src/box/vy_lsm.h | 7 ++++
src/box/vy_scheduler.c | 95 ++++++++++++++++--------------------------
src/box/vy_scheduler.h | 10 ++---
5 files changed, 52 insertions(+), 80 deletions(-)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 93002fdf..ed7c21dd 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -766,18 +766,8 @@ vinyl_index_open(struct index *index)
/*
* Add the new LSM tree to the scheduler so that it can
* be dumped and compacted.
- *
- * Note, during local recovery an LSM tree may be marked
- * as dropped, which means that it will be dropped before
- * recovery is complete. In this case there's no need in
- * letting the scheduler know about it.
*/
- if (!lsm->is_dropped)
- vy_scheduler_add_lsm(&env->scheduler, lsm);
- else
- assert(env->status == VINYL_INITIAL_RECOVERY_LOCAL ||
- env->status == VINYL_FINAL_RECOVERY_LOCAL);
- return 0;
+ return vy_scheduler_add_lsm(&env->scheduler, lsm);
}
static void
@@ -856,8 +846,6 @@ vinyl_index_abort_create(struct index *index)
return;
}
- vy_scheduler_remove_lsm(&env->scheduler, lsm);
-
lsm->is_dropped = true;
vy_log_tx_begin();
@@ -911,8 +899,6 @@ vinyl_index_commit_drop(struct index *index, int64_t lsn)
if (env->status == VINYL_FINAL_RECOVERY_LOCAL && lsm->is_dropped)
return;
- vy_scheduler_remove_lsm(&env->scheduler, lsm);
-
lsm->is_dropped = true;
vy_log_tx_begin();
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 8fba1792..aa4bce9e 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -45,6 +45,7 @@
#include "say.h"
#include "schema.h"
#include "tuple.h"
+#include "trigger.h"
#include "vy_log.h"
#include "vy_mem.h"
#include "vy_range.h"
@@ -207,6 +208,7 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_cache_env *cache_env,
lsm->group_id = group_id;
lsm->opts = index_def->opts;
vy_lsm_read_set_new(&lsm->read_set);
+ rlist_create(&lsm->on_destroy);
lsm_env->lsm_count++;
return lsm;
@@ -244,6 +246,8 @@ vy_range_tree_free_cb(vy_range_tree_t *t, struct vy_range *range, void *arg)
void
vy_lsm_delete(struct vy_lsm *lsm)
{
+ trigger_run(&lsm->on_destroy, lsm);
+
assert(heap_node_is_stray(&lsm->in_dump));
assert(heap_node_is_stray(&lsm->in_compaction));
assert(vy_lsm_read_set_empty(&lsm->read_set));
diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h
index c8b0e297..47f8ee6a 100644
--- a/src/box/vy_lsm.h
+++ b/src/box/vy_lsm.h
@@ -312,6 +312,13 @@ struct vy_lsm {
* this LSM tree.
*/
vy_lsm_read_set_t read_set;
+ /**
+ * Triggers run when the last reference to this LSM tree
+ * is dropped and the LSM tree is about to be destroyed.
+ * A pointer to this LSM tree is passed to the trigger
+ * callback in the 'event' argument.
+ */
+ struct rlist on_destroy;
};
/** Extract vy_lsm from an index object. */
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index f3bded20..ee361c31 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -510,35 +510,47 @@ vy_scheduler_reset_stat(struct vy_scheduler *scheduler)
stat->compaction_output = 0;
}
-void
+static void
+vy_scheduler_on_delete_lsm(struct trigger *trigger, void *event)
+{
+ struct vy_lsm *lsm = event;
+ struct vy_scheduler *scheduler = trigger->data;
+ assert(! heap_node_is_stray(&lsm->in_dump));
+ assert(! heap_node_is_stray(&lsm->in_compaction));
+ vy_dump_heap_delete(&scheduler->dump_heap, lsm);
+ vy_compaction_heap_delete(&scheduler->compaction_heap, lsm);
+ trigger_clear(trigger);
+ free(trigger);
+}
+
+int
vy_scheduler_add_lsm(struct vy_scheduler *scheduler, struct vy_lsm *lsm)
{
- assert(!lsm->is_dropped);
assert(heap_node_is_stray(&lsm->in_dump));
assert(heap_node_is_stray(&lsm->in_compaction));
+ /*
+ * Register a trigger that will remove this LSM tree from
+ * the scheduler queues on destruction.
+ */
+ struct trigger *trigger = malloc(sizeof(*trigger));
+ if (trigger == NULL) {
+ diag_set(OutOfMemory, sizeof(*trigger), "malloc", "trigger");
+ return -1;
+ }
+ trigger_create(trigger, vy_scheduler_on_delete_lsm, scheduler, NULL);
+ trigger_add(&lsm->on_destroy, trigger);
+ /*
+ * Add this LSM tree to the scheduler queues so that it
+ * can be dumped and compacted in a timely manner.
+ */
vy_dump_heap_insert(&scheduler->dump_heap, lsm);
vy_compaction_heap_insert(&scheduler->compaction_heap, lsm);
-}
-
-void
-vy_scheduler_remove_lsm(struct vy_scheduler *scheduler, struct vy_lsm *lsm)
-{
- assert(!lsm->is_dropped);
- assert(! heap_node_is_stray(&lsm->in_dump));
- assert(! heap_node_is_stray(&lsm->in_compaction));
- vy_dump_heap_delete(&scheduler->dump_heap, lsm);
- vy_compaction_heap_delete(&scheduler->compaction_heap, lsm);
+ return 0;
}
static void
vy_scheduler_update_lsm(struct vy_scheduler *scheduler, struct vy_lsm *lsm)
{
- if (lsm->is_dropped) {
- /* Dropped LSM trees are exempted from scheduling. */
- assert(heap_node_is_stray(&lsm->in_dump));
- assert(heap_node_is_stray(&lsm->in_compaction));
- return;
- }
assert(! heap_node_is_stray(&lsm->in_dump));
assert(! heap_node_is_stray(&lsm->in_compaction));
vy_dump_heap_update(&scheduler->dump_heap, lsm);
@@ -1267,15 +1279,9 @@ vy_task_dump_abort(struct vy_task *task)
/* The iterator has been cleaned up in a worker thread. */
task->wi->iface->close(task->wi);
- /*
- * It's no use alerting the user if the server is
- * shutting down or the LSM tree was dropped.
- */
- if (!lsm->is_dropped) {
- struct error *e = diag_last_error(&task->diag);
- error_log(e);
- say_error("%s: dump failed", vy_lsm_name(lsm));
- }
+ struct error *e = diag_last_error(&task->diag);
+ error_log(e);
+ say_error("%s: dump failed", vy_lsm_name(lsm));
vy_run_discard(task->new_run);
@@ -1287,18 +1293,6 @@ vy_task_dump_abort(struct vy_task *task)
assert(scheduler->dump_task_count > 0);
scheduler->dump_task_count--;
-
- /*
- * If the LSM tree was dropped during dump, we abort
- * the dump task, but we should still poke the scheduler
- * to check if the current dump round is complete.
- * If we don't and this LSM tree happens to be the last
- * one of the current generation, the scheduler will
- * never be notified about dump completion and hence
- * memory will never be released.
- */
- if (lsm->is_dropped)
- vy_scheduler_complete_dump(scheduler);
}
/**
@@ -1317,7 +1311,6 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_worker *worker,
.abort = vy_task_dump_abort,
};
- assert(!lsm->is_dropped);
assert(!lsm->is_dumping);
assert(lsm->pin_count == 0);
assert(vy_lsm_generation(lsm) == scheduler->dump_generation);
@@ -1602,16 +1595,10 @@ vy_task_compaction_abort(struct vy_task *task)
/* The iterator has been cleaned up in worker. */
task->wi->iface->close(task->wi);
- /*
- * It's no use alerting the user if the server is
- * shutting down or the LSM tree was dropped.
- */
- if (!lsm->is_dropped) {
- struct error *e = diag_last_error(&task->diag);
- error_log(e);
- say_error("%s: failed to compact range %s",
- vy_lsm_name(lsm), vy_range_str(range));
- }
+ struct error *e = diag_last_error(&task->diag);
+ error_log(e);
+ say_error("%s: failed to compact range %s",
+ vy_lsm_name(lsm), vy_range_str(range));
vy_run_discard(task->new_run);
@@ -1629,7 +1616,6 @@ vy_task_compaction_new(struct vy_scheduler *scheduler, struct vy_worker *worker,
.complete = vy_task_compaction_complete,
.abort = vy_task_compaction_abort,
};
- assert(!lsm->is_dropped);
struct vy_range *range = vy_range_heap_top(&lsm->range_heap);
assert(range != NULL);
@@ -1945,12 +1931,6 @@ vy_task_complete(struct vy_task *task)
assert(scheduler->stat.tasks_inprogress > 0);
scheduler->stat.tasks_inprogress--;
- if (task->lsm->is_dropped) {
- if (task->ops->abort)
- task->ops->abort(task);
- goto out;
- }
-
struct diag *diag = &task->diag;
if (task->is_failed) {
assert(!diag_is_empty(diag));
@@ -1967,7 +1947,6 @@ vy_task_complete(struct vy_task *task)
diag_move(diag_get(), diag);
goto fail;
}
-out:
scheduler->stat.tasks_completed++;
return 0;
fail:
diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h
index 2d4352d7..bc953975 100644
--- a/src/box/vy_scheduler.h
+++ b/src/box/vy_scheduler.h
@@ -194,16 +194,12 @@ vy_scheduler_reset_stat(struct vy_scheduler *scheduler);
/**
* Add an LSM tree to scheduler dump/compaction queues.
+ * When the LSM tree is destroyed, it will be removed
+ * from the queues automatically.
*/
-void
+int
vy_scheduler_add_lsm(struct vy_scheduler *, struct vy_lsm *);
-/**
- * Remove an LSM tree from scheduler dump/compaction queues.
- */
-void
-vy_scheduler_remove_lsm(struct vy_scheduler *, struct vy_lsm *);
-
/**
* Trigger dump of all currently existing in-memory trees.
*/
--
2.20.1
* [PATCH 06/13] memtx: don't store pointers to index internals in iterator
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (4 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 05/13] vinyl: don't exempt dropped indexes from dump and compaction Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:21 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:10 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 07/13] memtx: use ref counting to pin indexes for snapshot Vladimir Davydov
` (6 subsequent siblings)
12 siblings, 2 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
Storing these pointers is pointless, as we can always access the index
via iterator->index.
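
As a rough sketch of the pattern (hypothetical, simplified types; the
real structures carry far more state): the engine-specific index embeds
the generic index as its first member, so the iterator needs only the
generic pointer and can downcast on demand.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the generic index and iterator. */
struct index {
	int id;
};

struct iterator {
	/* The only pointer to the index the iterator keeps. */
	struct index *index;
};

struct tree_index {
	/* Must be the first member so the downcast below is valid. */
	struct index base;
	/* Stand-in for the engine-specific internals. */
	int tree;
};

struct tree_iterator {
	struct iterator base;
	int pos;
};

/* Reach the index internals through the generic pointer. */
int *
tree_iterator_tree(struct iterator *it)
{
	struct tree_index *index = (struct tree_index *)it->index;
	return &index->tree;
}
```

The cast relies on base being the first member of the derived struct,
which is the same convention the iterator structs here already follow.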
---
src/box/memtx_bitset.c | 10 ++-----
src/box/memtx_hash.c | 20 ++++++-------
src/box/memtx_tree.c | 67 +++++++++++++++++++++++-------------------
3 files changed, 49 insertions(+), 48 deletions(-)
diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c
index 59bc9642..67eaf6fd 100644
--- a/src/box/memtx_bitset.c
+++ b/src/box/memtx_bitset.c
@@ -168,9 +168,6 @@ value_to_tuple(size_t value)
struct bitset_index_iterator {
struct iterator base; /* Must be the first member. */
struct tt_bitset_iterator bitset_it;
-#ifndef OLD_GOOD_BITSET
- struct memtx_bitset_index *bitset_index;
-#endif /* #ifndef OLD_GOOD_BITSET */
/** Memory pool the iterator was allocated from. */
struct mempool *pool;
};
@@ -208,7 +205,9 @@ bitset_index_iterator_next(struct iterator *iterator, struct tuple **ret)
}
#ifndef OLD_GOOD_BITSET
- *ret = memtx_bitset_index_value_to_tuple(it->bitset_index, value);
+ struct memtx_bitset_index *index =
+ (struct memtx_bitset_index *)iterator->index;
+ *ret = memtx_bitset_index_value_to_tuple(index, value);
#else /* #ifndef OLD_GOOD_BITSET */
*ret = value_to_tuple(value);
#endif /* #ifndef OLD_GOOD_BITSET */
@@ -348,9 +347,6 @@ memtx_bitset_index_create_iterator(struct index *base, enum iterator_type type,
it->base.free = bitset_index_iterator_free;
tt_bitset_iterator_create(&it->bitset_it, realloc);
-#ifndef OLD_GOOD_BITSET
- it->bitset_index = index;
-#endif
const void *bitset_key = NULL;
uint32_t bitset_key_size = 0;
diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
index 3174ae2c..e30170d1 100644
--- a/src/box/memtx_hash.c
+++ b/src/box/memtx_hash.c
@@ -83,7 +83,6 @@ struct memtx_hash_index {
struct hash_iterator {
struct iterator base; /* Must be the first member. */
- struct light_index_core *hash_table;
struct light_index_iterator iterator;
/** Memory pool the iterator was allocated from. */
struct mempool *pool;
@@ -106,7 +105,8 @@ hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
{
assert(ptr->free == hash_iterator_free);
struct hash_iterator *it = (struct hash_iterator *) ptr;
- struct tuple **res = light_index_iterator_get_and_next(it->hash_table,
+ struct memtx_hash_index *index = (struct memtx_hash_index *)ptr->index;
+ struct tuple **res = light_index_iterator_get_and_next(&index->hash_table,
&it->iterator);
*ret = res != NULL ? *res : NULL;
return 0;
@@ -118,10 +118,11 @@ hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
assert(ptr->free == hash_iterator_free);
ptr->next = hash_iterator_ge;
struct hash_iterator *it = (struct hash_iterator *) ptr;
- struct tuple **res = light_index_iterator_get_and_next(it->hash_table,
+ struct memtx_hash_index *index = (struct memtx_hash_index *)ptr->index;
+ struct tuple **res = light_index_iterator_get_and_next(&index->hash_table,
&it->iterator);
if (res != NULL)
- res = light_index_iterator_get_and_next(it->hash_table,
+ res = light_index_iterator_get_and_next(&index->hash_table,
&it->iterator);
*ret = res != NULL ? *res : NULL;
return 0;
@@ -364,27 +365,26 @@ memtx_hash_index_create_iterator(struct index *base, enum iterator_type type,
iterator_create(&it->base, base);
it->pool = &memtx->iterator_pool;
it->base.free = hash_iterator_free;
- it->hash_table = &index->hash_table;
- light_index_iterator_begin(it->hash_table, &it->iterator);
+ light_index_iterator_begin(&index->hash_table, &it->iterator);
switch (type) {
case ITER_GT:
if (part_count != 0) {
- light_index_iterator_key(it->hash_table, &it->iterator,
+ light_index_iterator_key(&index->hash_table, &it->iterator,
key_hash(key, base->def->key_def), key);
it->base.next = hash_iterator_gt;
} else {
- light_index_iterator_begin(it->hash_table, &it->iterator);
+ light_index_iterator_begin(&index->hash_table, &it->iterator);
it->base.next = hash_iterator_ge;
}
break;
case ITER_ALL:
- light_index_iterator_begin(it->hash_table, &it->iterator);
+ light_index_iterator_begin(&index->hash_table, &it->iterator);
it->base.next = hash_iterator_ge;
break;
case ITER_EQ:
assert(part_count > 0);
- light_index_iterator_key(it->hash_table, &it->iterator,
+ light_index_iterator_key(&index->hash_table, &it->iterator,
key_hash(key, base->def->key_def), key);
it->base.next = hash_iterator_eq;
break;
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 084b4aff..aeba2ba3 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -134,8 +134,6 @@ memtx_tree_qcompare(const void* a, const void *b, void *c)
/* {{{ MemtxTree Iterators ****************************************/
struct tree_iterator {
struct iterator base;
- const struct memtx_tree *tree;
- struct index_def *index_def;
struct memtx_tree_iterator tree_iterator;
enum iterator_type type;
struct memtx_tree_key_data key_data;
@@ -179,20 +177,21 @@ tree_iterator_dummie(struct iterator *iterator, struct tuple **ret)
static int
tree_iterator_next(struct iterator *iterator, struct tuple **ret)
{
+ struct memtx_tree_index *index =
+ (struct memtx_tree_index *)iterator->index;
struct tree_iterator *it = tree_iterator(iterator);
assert(it->current.tuple != NULL);
struct memtx_tree_data *check =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_get_elem(&index->tree, &it->tree_iterator);
if (check == NULL || !memtx_tree_data_is_equal(check, &it->current)) {
- it->tree_iterator =
- memtx_tree_upper_bound_elem(it->tree, it->current,
- NULL);
+ it->tree_iterator = memtx_tree_upper_bound_elem(&index->tree,
+ it->current, NULL);
} else {
- memtx_tree_iterator_next(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_next(&index->tree, &it->tree_iterator);
}
tuple_unref(it->current.tuple);
struct memtx_tree_data *res =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_get_elem(&index->tree, &it->tree_iterator);
if (res == NULL) {
iterator->next = tree_iterator_dummie;
it->current.tuple = NULL;
@@ -208,18 +207,20 @@ tree_iterator_next(struct iterator *iterator, struct tuple **ret)
static int
tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
{
+ struct memtx_tree_index *index =
+ (struct memtx_tree_index *)iterator->index;
struct tree_iterator *it = tree_iterator(iterator);
assert(it->current.tuple != NULL);
struct memtx_tree_data *check =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_get_elem(&index->tree, &it->tree_iterator);
if (check == NULL || !memtx_tree_data_is_equal(check, &it->current)) {
- it->tree_iterator =
- memtx_tree_lower_bound_elem(it->tree, it->current, NULL);
+ it->tree_iterator = memtx_tree_lower_bound_elem(&index->tree,
+ it->current, NULL);
}
- memtx_tree_iterator_prev(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_prev(&index->tree, &it->tree_iterator);
tuple_unref(it->current.tuple);
struct memtx_tree_data *res =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_get_elem(&index->tree, &it->tree_iterator);
if (!res) {
iterator->next = tree_iterator_dummie;
it->current.tuple = NULL;
@@ -235,26 +236,28 @@ tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
static int
tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
{
+ struct memtx_tree_index *index =
+ (struct memtx_tree_index *)iterator->index;
struct tree_iterator *it = tree_iterator(iterator);
assert(it->current.tuple != NULL);
struct memtx_tree_data *check =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_get_elem(&index->tree, &it->tree_iterator);
if (check == NULL || !memtx_tree_data_is_equal(check, &it->current)) {
- it->tree_iterator =
- memtx_tree_upper_bound_elem(it->tree, it->current, NULL);
+ it->tree_iterator = memtx_tree_upper_bound_elem(&index->tree,
+ it->current, NULL);
} else {
- memtx_tree_iterator_next(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_next(&index->tree, &it->tree_iterator);
}
tuple_unref(it->current.tuple);
struct memtx_tree_data *res =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_get_elem(&index->tree, &it->tree_iterator);
/* Use user key def to save a few loops. */
if (res == NULL ||
tuple_compare_with_key(res->tuple, res->hint,
it->key_data.key,
it->key_data.part_count,
it->key_data.hint,
- it->index_def->key_def) != 0) {
+ index->base.def->key_def) != 0) {
iterator->next = tree_iterator_dummie;
it->current.tuple = NULL;
*ret = NULL;
@@ -269,25 +272,27 @@ tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
static int
tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
{
+ struct memtx_tree_index *index =
+ (struct memtx_tree_index *)iterator->index;
struct tree_iterator *it = tree_iterator(iterator);
assert(it->current.tuple != NULL);
struct memtx_tree_data *check =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_get_elem(&index->tree, &it->tree_iterator);
if (check == NULL || !memtx_tree_data_is_equal(check, &it->current)) {
- it->tree_iterator =
- memtx_tree_lower_bound_elem(it->tree, it->current, NULL);
+ it->tree_iterator = memtx_tree_lower_bound_elem(&index->tree,
+ it->current, NULL);
}
- memtx_tree_iterator_prev(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_prev(&index->tree, &it->tree_iterator);
tuple_unref(it->current.tuple);
struct memtx_tree_data *res =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_get_elem(&index->tree, &it->tree_iterator);
/* Use user key def to save a few loops. */
if (res == NULL ||
tuple_compare_with_key(res->tuple, res->hint,
it->key_data.key,
it->key_data.part_count,
it->key_data.hint,
- it->index_def->key_def) != 0) {
+ index->base.def->key_def) != 0) {
iterator->next = tree_iterator_dummie;
it->current.tuple = NULL;
*ret = NULL;
@@ -331,9 +336,11 @@ static int
tree_iterator_start(struct iterator *iterator, struct tuple **ret)
{
*ret = NULL;
+ struct memtx_tree_index *index =
+ (struct memtx_tree_index *)iterator->index;
struct tree_iterator *it = tree_iterator(iterator);
it->base.next = tree_iterator_dummie;
- const struct memtx_tree *tree = it->tree;
+ struct memtx_tree *tree = &index->tree;
enum iterator_type type = it->type;
bool exact = false;
assert(it->current.tuple == NULL);
@@ -369,12 +376,12 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret)
* iterator_next call will convert the iterator to the
* last position in the tree, that's what we need.
*/
- memtx_tree_iterator_prev(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_prev(tree, &it->tree_iterator);
}
}
- struct memtx_tree_data *res =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ struct memtx_tree_data *res = memtx_tree_iterator_get_elem(tree,
+ &it->tree_iterator);
if (!res)
return 0;
*ret = res->tuple;
@@ -1002,8 +1009,6 @@ memtx_tree_index_create_iterator(struct index *base, enum iterator_type type,
it->key_data.key = key;
it->key_data.part_count = part_count;
it->key_data.hint = key_hint(key, part_count, cmp_def);
- it->index_def = base->def;
- it->tree = &index->tree;
it->tree_iterator = memtx_tree_invalid_iterator();
it->current.tuple = NULL;
return (struct iterator *)it;
--
2.20.1
* [PATCH 07/13] memtx: use ref counting to pin indexes for snapshot
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (5 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 06/13] memtx: don't store pointers to index internals in iterator Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:24 ` [tarantool-patches] " Konstantin Osipov
2019-08-10 10:03 ` [PATCH 08/13] memtx: allow snapshot iterator to fail Vladimir Davydov
` (5 subsequent siblings)
12 siblings, 1 reply; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
Currently, to prevent an index from going away while it is being
written to a snapshot, we postpone memtx_gc_task's free() invocation
until checkpointing is complete, see commit 94de0a081b3a ("Don't take
schema lock for checkpointing"). This works fine, but makes it rather
difficult to reuse snapshot iterators for other purposes, e.g. feeding
a consistent read view to a newly joined replica.
Let's instead use index reference counting for pinning indexes for
checkpointing. A reference is taken in a snapshot iterator constructor
and released when the snapshot iterator is destroyed.
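
A minimal sketch of the pinning scheme, assuming a plain integer
reference counter. The types and the 'destroyed' flag below are
hypothetical and exist only to make the object's lifetime observable;
they are not the real index API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical simplified index with a reference counter. */
struct index {
	int refs;
	/* Set when the index is destroyed; for demonstration only. */
	bool *destroyed;
};

void
index_ref(struct index *index)
{
	index->refs++;
}

void
index_unref(struct index *index)
{
	assert(index->refs > 0);
	if (--index->refs == 0) {
		*index->destroyed = true;
		free(index);
	}
}

struct snapshot_iterator {
	struct index *index;
};

/* The constructor takes a reference, pinning the index. */
struct snapshot_iterator *
snapshot_iterator_new(struct index *index)
{
	struct snapshot_iterator *it = malloc(sizeof(*it));
	if (it == NULL)
		return NULL;
	it->index = index;
	index_ref(index);
	return it;
}

/* The destructor releases the reference taken in the constructor. */
void
snapshot_iterator_free(struct snapshot_iterator *it)
{
	index_unref(it->index);
	free(it);
}
```

With this in place the owner may drop its own reference at any time; the
index is actually freed only once the last snapshot iterator is gone.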
---
src/box/memtx_engine.c | 26 +-------------------------
src/box/memtx_engine.h | 5 -----
src/box/memtx_hash.c | 15 +++++++++------
src/box/memtx_tree.c | 16 +++++++++-------
4 files changed, 19 insertions(+), 43 deletions(-)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 59ad1682..ec667e7a 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -651,19 +651,6 @@ memtx_engine_wait_checkpoint(struct engine *engine,
return result;
}
-/**
- * Called after checkpointing is complete to free indexes dropped
- * while checkpointing was in progress, see memtx_engine_run_gc().
- */
-static void
-memtx_engine_gc_after_checkpoint(struct memtx_engine *memtx)
-{
- struct memtx_gc_task *task, *next;
- stailq_foreach_entry_safe(task, next, &memtx->gc_to_free, link)
- task->vtab->free(task);
- stailq_create(&memtx->gc_to_free);
-}
-
static void
memtx_engine_commit_checkpoint(struct engine *engine,
const struct vclock *vclock)
@@ -701,8 +688,6 @@ memtx_engine_commit_checkpoint(struct engine *engine,
checkpoint_delete(memtx->checkpoint);
memtx->checkpoint = NULL;
-
- memtx_engine_gc_after_checkpoint(memtx);
}
static void
@@ -887,15 +872,7 @@ memtx_engine_run_gc(struct memtx_engine *memtx, bool *stop)
task->vtab->run(task, &task_done);
if (task_done) {
stailq_shift(&memtx->gc_queue);
- /*
- * If checkpointing is in progress, the index may be
- * used by the checkpoint thread so we postpone freeing
- * until checkpointing is complete.
- */
- if (memtx->checkpoint == NULL)
- task->vtab->free(task);
- else
- stailq_add_entry(&memtx->gc_to_free, task, link);
+ task->vtab->free(task);
}
}
@@ -967,7 +944,6 @@ memtx_engine_new(const char *snap_dirname, bool force_recovery,
}
stailq_create(&memtx->gc_queue);
- stailq_create(&memtx->gc_to_free);
memtx->gc_fiber = fiber_new("memtx.gc", memtx_engine_gc_f);
if (memtx->gc_fiber == NULL)
goto fail;
diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h
index fcf595e7..ccb51678 100644
--- a/src/box/memtx_engine.h
+++ b/src/box/memtx_engine.h
@@ -155,11 +155,6 @@ struct memtx_engine {
* memtx_gc_task::link.
*/
struct stailq gc_queue;
- /**
- * List of tasks awaiting to be freed once checkpointing
- * is complete, linked by memtx_gc_task::link.
- */
- struct stailq gc_to_free;
};
struct memtx_gc_task;
diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
index e30170d1..2762d973 100644
--- a/src/box/memtx_hash.c
+++ b/src/box/memtx_hash.c
@@ -399,7 +399,7 @@ memtx_hash_index_create_iterator(struct index *base, enum iterator_type type,
struct hash_snapshot_iterator {
struct snapshot_iterator base;
- struct light_index_core *hash_table;
+ struct memtx_hash_index *index;
struct light_index_iterator iterator;
};
@@ -414,7 +414,8 @@ hash_snapshot_iterator_free(struct snapshot_iterator *iterator)
assert(iterator->free == hash_snapshot_iterator_free);
struct hash_snapshot_iterator *it =
(struct hash_snapshot_iterator *) iterator;
- light_index_iterator_destroy(it->hash_table, &it->iterator);
+ light_index_iterator_destroy(&it->index->hash_table, &it->iterator);
+ index_unref(&it->index->base);
free(iterator);
}
@@ -429,7 +430,8 @@ hash_snapshot_iterator_next(struct snapshot_iterator *iterator, uint32_t *size)
assert(iterator->free == hash_snapshot_iterator_free);
struct hash_snapshot_iterator *it =
(struct hash_snapshot_iterator *) iterator;
- struct tuple **res = light_index_iterator_get_and_next(it->hash_table,
+ struct light_index_core *hash_table = &it->index->hash_table;
+ struct tuple **res = light_index_iterator_get_and_next(hash_table,
&it->iterator);
if (res == NULL)
return NULL;
@@ -455,9 +457,10 @@ memtx_hash_index_create_snapshot_iterator(struct index *base)
it->base.next = hash_snapshot_iterator_next;
it->base.free = hash_snapshot_iterator_free;
- it->hash_table = &index->hash_table;
- light_index_iterator_begin(it->hash_table, &it->iterator);
- light_index_iterator_freeze(it->hash_table, &it->iterator);
+ it->index = index;
+ index_ref(base);
+ light_index_iterator_begin(&index->hash_table, &it->iterator);
+ light_index_iterator_freeze(&index->hash_table, &it->iterator);
return (struct snapshot_iterator *) it;
}
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index aeba2ba3..77223a6d 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -1205,7 +1205,7 @@ memtx_tree_index_end_build(struct index *base)
struct tree_snapshot_iterator {
struct snapshot_iterator base;
- struct memtx_tree *tree;
+ struct memtx_tree_index *index;
struct memtx_tree_iterator tree_iterator;
};
@@ -1215,8 +1215,8 @@ tree_snapshot_iterator_free(struct snapshot_iterator *iterator)
assert(iterator->free == tree_snapshot_iterator_free);
struct tree_snapshot_iterator *it =
(struct tree_snapshot_iterator *)iterator;
- struct memtx_tree *tree = (struct memtx_tree *)it->tree;
- memtx_tree_iterator_destroy(tree, &it->tree_iterator);
+ memtx_tree_iterator_destroy(&it->index->tree, &it->tree_iterator);
+ index_unref(&it->index->base);
free(iterator);
}
@@ -1226,11 +1226,12 @@ tree_snapshot_iterator_next(struct snapshot_iterator *iterator, uint32_t *size)
assert(iterator->free == tree_snapshot_iterator_free);
struct tree_snapshot_iterator *it =
(struct tree_snapshot_iterator *)iterator;
- struct memtx_tree_data *res =
- memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+ struct memtx_tree *tree = &it->index->tree;
+ struct memtx_tree_data *res = memtx_tree_iterator_get_elem(tree,
+ &it->tree_iterator);
if (res == NULL)
return NULL;
- memtx_tree_iterator_next(it->tree, &it->tree_iterator);
+ memtx_tree_iterator_next(tree, &it->tree_iterator);
return tuple_data_range(res->tuple, size);
}
@@ -1253,7 +1254,8 @@ memtx_tree_index_create_snapshot_iterator(struct index *base)
it->base.free = tree_snapshot_iterator_free;
it->base.next = tree_snapshot_iterator_next;
- it->tree = &index->tree;
+ it->index = index;
+ index_ref(base);
it->tree_iterator = memtx_tree_iterator_first(&index->tree);
memtx_tree_iterator_freeze(&index->tree, &it->tree_iterator);
return (struct snapshot_iterator *) it;
--
2.20.1
* [PATCH 08/13] memtx: allow snapshot iterator to fail
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (6 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 07/13] memtx: use ref counting to pin indexes for snapshot Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:25 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:10 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 09/13] memtx: enter small delayed free mode from snapshot iterator Vladimir Davydov
` (4 subsequent siblings)
12 siblings, 2 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
Memtx iterators never fail, which is why the snapshot iterator interface
doesn't support failures. However, once we introduce snapshot iterator
support for vinyl, we will need a way to report errors through the API.
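
To illustrate the calling convention (a sketch with hypothetical types;
array_iterator and its fail_at field are made up for the example): next()
returns 0 on success and -1 on error, and signals EOF by setting *data
to NULL while still returning 0.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

struct snapshot_iterator {
	/* Returns 0 on success (*data == NULL means EOF), -1 on error. */
	int (*next)(struct snapshot_iterator *it,
		    const char **data, uint32_t *size);
};

/* Hypothetical iterator over an array of C strings. */
struct array_iterator {
	struct snapshot_iterator base;
	const char **items;
	uint32_t count;
	uint32_t pos;
	/* Position at which next() fails; UINT32_MAX means never. */
	uint32_t fail_at;
};

int
array_iterator_next(struct snapshot_iterator *base,
		    const char **data, uint32_t *size)
{
	struct array_iterator *it = (struct array_iterator *)base;
	if (it->pos == it->fail_at)
		return -1;
	if (it->pos == it->count) {
		*data = NULL;
		return 0;
	}
	*data = it->items[it->pos++];
	*size = (uint32_t)strlen(*data);
	return 0;
}

/* Sum up the sizes of all items; stop early on error. */
int
drain(struct snapshot_iterator *it, uint32_t *total)
{
	int rc;
	const char *data;
	uint32_t size;
	*total = 0;
	while ((rc = it->next(it, &data, &size)) == 0 && data != NULL)
		*total += size;
	return rc;
}
```

Checking rc after the loop, as drain() does, is what tells an error
apart from a normal end of iteration.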
---
src/box/index.h | 3 ++-
src/box/memtx_engine.c | 22 ++++++++++++----------
src/box/memtx_hash.c | 14 +++++++++-----
src/box/memtx_tree.c | 14 +++++++++-----
src/box/sequence.c | 24 ++++++++++++++----------
5 files changed, 46 insertions(+), 31 deletions(-)
diff --git a/src/box/index.h b/src/box/index.h
index 89b5e134..86148023 100644
--- a/src/box/index.h
+++ b/src/box/index.h
@@ -293,7 +293,8 @@ struct snapshot_iterator {
* Returns a pointer to the tuple data and its
* size or NULL if EOF.
*/
- const char *(*next)(struct snapshot_iterator *, uint32_t *size);
+ int (*next)(struct snapshot_iterator *,
+ const char **data, uint32_t *size);
/**
* Destroy the iterator.
*/
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index ec667e7a..87806775 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -575,25 +575,27 @@ checkpoint_f(va_list ap)
ERROR_INJECT_SLEEP(ERRINJ_SNAP_WRITE_DELAY);
struct checkpoint_entry *entry;
rlist_foreach_entry(entry, &ckpt->entries, link) {
+ int rc;
uint32_t size;
const char *data;
struct snapshot_iterator *it = entry->iterator;
- for (data = it->next(it, &size); data != NULL;
- data = it->next(it, &size)) {
+ while ((rc = it->next(it, &data, &size)) == 0 && data != NULL) {
if (checkpoint_write_tuple(&snap, entry->space_id,
- entry->group_id, data, size) != 0) {
- xlog_close(&snap, false);
- return -1;
- }
+ entry->group_id, data, size) != 0)
+ goto fail;
}
+ if (rc != 0)
+ goto fail;
}
- if (xlog_flush(&snap) < 0) {
- xlog_close(&snap, false);
- return -1;
- }
+ if (xlog_flush(&snap) < 0)
+ goto fail;
+
xlog_close(&snap, false);
say_info("done");
return 0;
+fail:
+ xlog_close(&snap, false);
+ return -1;
}
static int
diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
index 2762d973..920f1032 100644
--- a/src/box/memtx_hash.c
+++ b/src/box/memtx_hash.c
@@ -424,8 +424,9 @@ hash_snapshot_iterator_free(struct snapshot_iterator *iterator)
* Virtual method of snapshot iterator.
* @sa index_vtab::create_snapshot_iterator.
*/
-static const char *
-hash_snapshot_iterator_next(struct snapshot_iterator *iterator, uint32_t *size)
+static int
+hash_snapshot_iterator_next(struct snapshot_iterator *iterator,
+ const char **data, uint32_t *size)
{
assert(iterator->free == hash_snapshot_iterator_free);
struct hash_snapshot_iterator *it =
@@ -433,9 +434,12 @@ hash_snapshot_iterator_next(struct snapshot_iterator *iterator, uint32_t *size)
struct light_index_core *hash_table = &it->index->hash_table;
struct tuple **res = light_index_iterator_get_and_next(hash_table,
&it->iterator);
- if (res == NULL)
- return NULL;
- return tuple_data_range(*res, size);
+ if (res == NULL) {
+ *data = NULL;
+ return 0;
+ }
+ *data = tuple_data_range(*res, size);
+ return 0;
}
/**
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 77223a6d..831a2715 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -1220,8 +1220,9 @@ tree_snapshot_iterator_free(struct snapshot_iterator *iterator)
free(iterator);
}
-static const char *
-tree_snapshot_iterator_next(struct snapshot_iterator *iterator, uint32_t *size)
+static int
+tree_snapshot_iterator_next(struct snapshot_iterator *iterator,
+ const char **data, uint32_t *size)
{
assert(iterator->free == tree_snapshot_iterator_free);
struct tree_snapshot_iterator *it =
@@ -1229,10 +1230,13 @@ tree_snapshot_iterator_next(struct snapshot_iterator *iterator, uint32_t *size)
struct memtx_tree *tree = &it->index->tree;
struct memtx_tree_data *res = memtx_tree_iterator_get_elem(tree,
&it->tree_iterator);
- if (res == NULL)
- return NULL;
+ if (res == NULL) {
+ *data = NULL;
+ return 0;
+ }
memtx_tree_iterator_next(tree, &it->tree_iterator);
- return tuple_data_range(res->tuple, size);
+ *data = tuple_data_range(res->tuple, size);
+ return 0;
}
/**
diff --git a/src/box/sequence.c b/src/box/sequence.c
index 1aacc505..5ebfa274 100644
--- a/src/box/sequence.c
+++ b/src/box/sequence.c
@@ -311,27 +311,31 @@ struct sequence_data_iterator {
#define SEQUENCE_TUPLE_BUF_SIZE (mp_sizeof_array(2) + \
2 * mp_sizeof_uint(UINT64_MAX))
-static const char *
-sequence_data_iterator_next(struct snapshot_iterator *base, uint32_t *size)
+static int
+sequence_data_iterator_next(struct snapshot_iterator *base,
+ const char **data, uint32_t *size)
{
struct sequence_data_iterator *iter =
(struct sequence_data_iterator *)base;
- struct sequence_data *data =
+ struct sequence_data *sd =
light_sequence_iterator_get_and_next(&sequence_data_index,
&iter->iter);
- if (data == NULL)
- return NULL;
+ if (sd == NULL) {
+ *data = NULL;
+ return 0;
+ }
char *buf_end = iter->tuple;
buf_end = mp_encode_array(buf_end, 2);
- buf_end = mp_encode_uint(buf_end, data->id);
- buf_end = (data->value >= 0 ?
- mp_encode_uint(buf_end, data->value) :
- mp_encode_int(buf_end, data->value));
+ buf_end = mp_encode_uint(buf_end, sd->id);
+ buf_end = (sd->value >= 0 ?
+ mp_encode_uint(buf_end, sd->value) :
+ mp_encode_int(buf_end, sd->value));
assert(buf_end <= iter->tuple + SEQUENCE_TUPLE_BUF_SIZE);
+ *data = iter->tuple;
*size = buf_end - iter->tuple;
- return iter->tuple;
+ return 0;
}
static void
--
2.20.1
* [PATCH 09/13] memtx: enter small delayed free mode from snapshot iterator
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (7 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 08/13] memtx: allow snapshot iterator to fail Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:27 ` [tarantool-patches] " Konstantin Osipov
2019-08-10 10:03 ` [PATCH 10/13] wal: make wal_sync fail on write error Vladimir Davydov
` (3 subsequent siblings)
12 siblings, 1 reply; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
We must enable SMALL_DELAYED_FREE_MODE to safely use a memtx snapshot
iterator. Currently, we do that in checkpoint-related callbacks, but if
we want to reuse snapshot iterators for other purposes, e.g. feeding
a read view to a newly joined replica, we had better hide this code
behind snapshot iterator constructors and destructors.
---
src/box/memtx_engine.c | 24 ++++++++++++++++--------
src/box/memtx_engine.h | 23 +++++++++++++++++++++++
src/box/memtx_hash.c | 3 +++
src/box/memtx_tree.c | 3 +++
4 files changed, 45 insertions(+), 8 deletions(-)
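Editorial sketch of the reentrancy described in the commit message above: every snapshot iterator enters delayed free mode on creation and leaves it on destruction, and the allocator option is toggled only on the first enter and the last leave. The struct and the sketch_* names are hypothetical stand-ins for illustration; they are not the memtx types, and the boolean merely models the SMALL_DELAYED_FREE_MODE allocator option.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the engine state touched by the patch. */
struct engine_sketch {
	/* Bumped on every enter, like memtx->snapshot_version. */
	uint32_t snapshot_version;
	/* Number of users currently holding delayed free mode. */
	uint32_t delayed_free_mode;
	/* Models the allocator's SMALL_DELAYED_FREE_MODE option. */
	bool allocator_delays_free;
};

static void
sketch_enter_delayed_free_mode(struct engine_sketch *e)
{
	e->snapshot_version++;
	/* Only the first user switches the allocator option on. */
	if (e->delayed_free_mode++ == 0)
		e->allocator_delays_free = true;
}

static void
sketch_leave_delayed_free_mode(struct engine_sketch *e)
{
	assert(e->delayed_free_mode > 0);
	/* Only the last user switches the allocator option off. */
	if (--e->delayed_free_mode == 0)
		e->allocator_delays_free = false;
}
```

With two iterators open at once, the option stays on until the second one is freed, which is what lets a checkpoint and a replica join overlap safely.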
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 87806775..c92ed82b 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -614,10 +614,6 @@ memtx_engine_begin_checkpoint(struct engine *engine)
memtx->checkpoint = NULL;
return -1;
}
-
- /* increment snapshot version; set tuple deletion to delayed mode */
- memtx->snapshot_version++;
- small_alloc_setopt(&memtx->alloc, SMALL_DELAYED_FREE_MODE, true);
return 0;
}
@@ -665,8 +661,6 @@ memtx_engine_commit_checkpoint(struct engine *engine,
/* waitCheckpoint() must have been done. */
assert(!memtx->checkpoint->waiting_for_snap_thread);
- small_alloc_setopt(&memtx->alloc, SMALL_DELAYED_FREE_MODE, false);
-
if (!memtx->checkpoint->touch) {
int64_t lsn = vclock_sum(&memtx->checkpoint->vclock);
struct xdir *dir = &memtx->checkpoint->dir;
@@ -707,8 +701,6 @@ memtx_engine_abort_checkpoint(struct engine *engine)
memtx->checkpoint->waiting_for_snap_thread = false;
}
- small_alloc_setopt(&memtx->alloc, SMALL_DELAYED_FREE_MODE, false);
-
/** Remove garbage .inprogress file. */
const char *filename =
xdir_format_filename(&memtx->checkpoint->dir,
@@ -1018,6 +1010,22 @@ memtx_engine_set_max_tuple_size(struct memtx_engine *memtx, size_t max_size)
memtx->max_tuple_size = max_size;
}
+void
+memtx_enter_delayed_free_mode(struct memtx_engine *memtx)
+{
+ memtx->snapshot_version++;
+ if (memtx->delayed_free_mode++ == 0)
+ small_alloc_setopt(&memtx->alloc, SMALL_DELAYED_FREE_MODE, true);
+}
+
+void
+memtx_leave_delayed_free_mode(struct memtx_engine *memtx)
+{
+ assert(memtx->delayed_free_mode > 0);
+ if (--memtx->delayed_free_mode == 0)
+ small_alloc_setopt(&memtx->alloc, SMALL_DELAYED_FREE_MODE, false);
+}
+
struct tuple *
memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
{
diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h
index ccb51678..c092f5d8 100644
--- a/src/box/memtx_engine.h
+++ b/src/box/memtx_engine.h
@@ -137,6 +137,12 @@ struct memtx_engine {
size_t max_tuple_size;
/** Incremented with each next snapshot. */
uint32_t snapshot_version;
+ /**
+ * Unless zero, freeing of tuples allocated before the last
+ * call to memtx_enter_delayed_free_mode() is delayed until
+ * memtx_leave_delayed_free_mode() is called.
+ */
+ uint32_t delayed_free_mode;
/** Memory pool for rtree index iterator. */
struct mempool rtree_iterator_pool;
/**
@@ -205,6 +211,23 @@ memtx_engine_set_memory(struct memtx_engine *memtx, size_t size);
void
memtx_engine_set_max_tuple_size(struct memtx_engine *memtx, size_t max_size);
+/**
+ * Enter tuple delayed free mode: tuples allocated before the call
+ * won't be freed until memtx_leave_delayed_free_mode() is called.
+ * This function is reentrant, meaning it's okay to call it multiple
+ * times from the same or different fibers - one just has to leave
+ * the delayed free mode the same number of times afterwards.
+ */
+void
+memtx_enter_delayed_free_mode(struct memtx_engine *memtx);
+
+/**
+ * Leave tuple delayed free mode. This function undoes the effect
+ * of memtx_enter_delayed_free_mode().
+ */
+void
+memtx_leave_delayed_free_mode(struct memtx_engine *memtx);
+
/** Allocate a memtx tuple. @sa tuple_new(). */
struct tuple *
memtx_tuple_new(struct tuple_format *format, const char *data, const char *end);
diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
index 920f1032..cdd531cb 100644
--- a/src/box/memtx_hash.c
+++ b/src/box/memtx_hash.c
@@ -414,6 +414,8 @@ hash_snapshot_iterator_free(struct snapshot_iterator *iterator)
assert(iterator->free == hash_snapshot_iterator_free);
struct hash_snapshot_iterator *it =
(struct hash_snapshot_iterator *) iterator;
+ memtx_leave_delayed_free_mode((struct memtx_engine *)
+ it->index->base.engine);
light_index_iterator_destroy(&it->index->hash_table, &it->iterator);
index_unref(&it->index->base);
free(iterator);
@@ -465,6 +467,7 @@ memtx_hash_index_create_snapshot_iterator(struct index *base)
index_ref(base);
light_index_iterator_begin(&index->hash_table, &it->iterator);
light_index_iterator_freeze(&index->hash_table, &it->iterator);
+ memtx_enter_delayed_free_mode((struct memtx_engine *)base->engine);
return (struct snapshot_iterator *) it;
}
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 831a2715..e155ecd6 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -1215,6 +1215,8 @@ tree_snapshot_iterator_free(struct snapshot_iterator *iterator)
assert(iterator->free == tree_snapshot_iterator_free);
struct tree_snapshot_iterator *it =
(struct tree_snapshot_iterator *)iterator;
+ memtx_leave_delayed_free_mode((struct memtx_engine *)
+ it->index->base.engine);
memtx_tree_iterator_destroy(&it->index->tree, &it->tree_iterator);
index_unref(&it->index->base);
free(iterator);
@@ -1262,6 +1264,7 @@ memtx_tree_index_create_snapshot_iterator(struct index *base)
index_ref(base);
it->tree_iterator = memtx_tree_iterator_first(&index->tree);
memtx_tree_iterator_freeze(&index->tree, &it->tree_iterator);
+ memtx_enter_delayed_free_mode((struct memtx_engine *)base->engine);
return (struct snapshot_iterator *) it;
}
--
2.20.1
* [PATCH 10/13] wal: make wal_sync fail on write error
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (8 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 09/13] memtx: enter small delayed free mode from snapshot iterator Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:29 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 16:48 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 11/13] xrow: factor out helper for setting REPLACE request body Vladimir Davydov
` (2 subsequent siblings)
12 siblings, 2 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
wal_sync() simply flushes the tx<->wal request queue; it doesn't
guarantee that all pending writes are successfully committed to disk.
This works for now, but in order to implement replica join off the
current read view, we need to make sure that all pending writes have
been persisted and won't be rolled back before we can use memtx
snapshot iterators. So this patch adds a return code to wal_sync():
from now on it returns -1 if a rollback is in progress and hence
some in-memory changes are going to be rolled back. We will use
this method after opening memtx snapshot iterators used for feeding
a consistent read view to a newly joined replica so as to ensure that
changes frozen by the iterators have made it to the disk.
---
src/box/vinyl.c | 22 ++++++++++++++--------
src/box/wal.c | 29 ++++++++++++++++++++++++++---
src/box/wal.h | 5 +++--
3 files changed, 43 insertions(+), 13 deletions(-)
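Editorial sketch of the caller-side contract described above: once the sync function can fail, a caller has to check the result and skip the scan that relies on persisted data. All sketch_* names and the boolean rollback flag are hypothetical simplifications; the real wal_sync() checks the rollback state both in the tx thread and in the WAL thread via cbus_call().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum sketch_wal_mode { SKETCH_WAL_NONE, SKETCH_WAL_WRITE };

/* Hypothetical, heavily simplified model of the WAL writer state. */
struct sketch_wal_writer {
	enum sketch_wal_mode mode;
	bool rollback_in_progress;
};

/* Returns 0 if pending writes are known to be safe, -1 on rollback. */
static int
sketch_wal_sync(const struct sketch_wal_writer *w)
{
	assert(w != NULL);
	if (w->mode == SKETCH_WAL_NONE)
		return 0; /* nothing is ever written, nothing to wait for */
	if (w->rollback_in_progress)
		return -1; /* some in-memory changes will be undone */
	return 0;
}

/* Caller pattern: bail out before scanning if the sync failed. */
static int
sketch_scan_after_sync(const struct sketch_wal_writer *w, int *rows_scanned)
{
	int rc = sketch_wal_sync(w);
	if (rc != 0)
		goto out;
	*rows_scanned = 3; /* placeholder for the real read-view scan */
out:
	return rc;
}
```

The goto mirrors the shape of the vinyl.c callers in this patch, where cleanup after the label must run whether or not the sync succeeded.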
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index ed7c21dd..9e93153b 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1098,13 +1098,16 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
* trigger so that changes made by newer transactions are checked
* by the trigger callback.
*/
- if (need_wal_sync)
- wal_sync();
+ int rc;
+ if (need_wal_sync) {
+ rc = wal_sync();
+ if (rc != 0)
+ goto out;
+ }
struct vy_read_iterator itr;
vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, pk->env->empty_key,
&env->xm->p_committed_read_view);
- int rc;
int loops = 0;
struct vy_entry entry;
while ((rc = vy_read_iterator_next(&itr, &entry)) == 0) {
@@ -1129,7 +1132,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
break;
}
vy_read_iterator_close(&itr);
-
+out:
diag_destroy(&ctx.diag);
trigger_clear(&on_replace);
txn_can_yield(txn, false);
@@ -4373,13 +4376,16 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
* trigger so that changes made by newer transactions are checked
* by the trigger callback.
*/
- if (need_wal_sync)
- wal_sync();
+ int rc;
+ if (need_wal_sync) {
+ rc = wal_sync();
+ if (rc != 0)
+ goto out;
+ }
struct vy_read_iterator itr;
vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, pk->env->empty_key,
&env->xm->p_committed_read_view);
- int rc;
int loops = 0;
struct vy_entry entry;
int64_t build_lsn = env->xm->lsn;
@@ -4443,7 +4449,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
diag_move(&ctx.diag, diag_get());
rc = -1;
}
-
+out:
diag_destroy(&ctx.diag);
trigger_clear(&on_replace);
txn_can_yield(txn, false);
diff --git a/src/box/wal.c b/src/box/wal.c
index 58a58e5b..267cafed 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -524,13 +524,36 @@ wal_free(void)
wal_writer_destroy(writer);
}
-void
+static int
+wal_sync_f(struct cbus_call_msg *msg)
+{
+ (void)msg;
+ struct wal_writer *writer = &wal_writer_singleton;
+ if (writer->in_rollback.route != NULL) {
+ /* We're rolling back a failed write. */
+ diag_set(ClientError, ER_WAL_IO);
+ return -1;
+ }
+ return 0;
+}
+
+int
wal_sync(void)
{
struct wal_writer *writer = &wal_writer_singleton;
if (writer->wal_mode == WAL_NONE)
- return;
- cbus_flush(&writer->wal_pipe, &writer->tx_prio_pipe, NULL);
+ return 0;
+ if (!stailq_empty(&writer->rollback)) {
+ /* We're rolling back a failed write. */
+ diag_set(ClientError, ER_WAL_IO);
+ return -1;
+ }
+ bool cancellable = fiber_set_cancellable(false);
+ struct cbus_call_msg msg;
+ int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
+ &msg, wal_sync_f, NULL, TIMEOUT_INFINITY);
+ fiber_set_cancellable(cancellable);
+ return rc;
}
static int
diff --git a/src/box/wal.h b/src/box/wal.h
index 4e500d2a..6725f26d 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -171,9 +171,10 @@ enum wal_mode
wal_mode();
/**
- * Wait till all pending changes to the WAL are flushed.
+ * Wait until all submitted writes are successfully flushed
+ * to disk. Returns 0 on success, -1 if write failed.
*/
-void
+int
wal_sync(void);
struct wal_checkpoint {
--
2.20.1
* [PATCH 11/13] xrow: factor out helper for setting REPLACE request body
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (9 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 10/13] wal: make wal_sync fail on write error Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:29 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:11 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 12/13] test: disable replication/on_schema_init Vladimir Davydov
2019-08-10 10:03 ` [PATCH 13/13] relay: join new replicas off read view Vladimir Davydov
12 siblings, 2 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
We will reuse it to relay a snapshot to a newly joined replica.
---
src/box/iproto_constants.h | 11 +++++++++++
src/box/memtx_engine.c | 6 +-----
2 files changed, 12 insertions(+), 5 deletions(-)
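Editorial sketch of the eight bytes the factored-out helper produces: a MsgPack fixmap header for two pairs, the space id key with a big-endian uint32 value, and the tuple key, after which the tuple itself is appended as a separate iovec. The function name is hypothetical, and the key values 0x10 and 0x21 are assumed here for IPROTO_SPACE_ID and IPROTO_TUPLE; check iproto_constants.h before relying on them.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum {
	SKETCH_IPROTO_SPACE_ID = 0x10, /* assumed value of IPROTO_SPACE_ID */
	SKETCH_IPROTO_TUPLE = 0x21,    /* assumed value of IPROTO_TUPLE */
	SKETCH_REPLACE_BODY_SIZE = 8,
};

/*
 * Write the fixed-size REPLACE body prefix byte by byte instead of
 * through a packed struct, so the layout does not depend on the host
 * byte order. Returns the number of bytes written.
 */
static size_t
sketch_replace_body_encode(uint8_t buf[SKETCH_REPLACE_BODY_SIZE],
			   uint32_t space_id)
{
	assert(buf != NULL);
	buf[0] = 0x82;                      /* fixmap with two pairs */
	buf[1] = SKETCH_IPROTO_SPACE_ID;    /* key: space id */
	buf[2] = 0xce;                      /* value type: uint32 */
	buf[3] = (uint8_t)(space_id >> 24); /* big-endian payload */
	buf[4] = (uint8_t)(space_id >> 16);
	buf[5] = (uint8_t)(space_id >> 8);
	buf[6] = (uint8_t)space_id;
	buf[7] = SKETCH_IPROTO_TUPLE;       /* key: tuple, value follows */
	return SKETCH_REPLACE_BODY_SIZE;
}
```

The explicit shifts play the role of mp_bswap_u32() in the patch: MsgPack stores integers in network byte order regardless of the host.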
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 126d7335..724cce53 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -343,6 +343,17 @@ struct PACKED request_replace_body {
uint8_t k_tuple;
};
+static inline void
+request_replace_body_create(struct request_replace_body *body,
+ uint32_t space_id)
+{
+ body->m_body = 0x82; /* map of two elements. */
+ body->k_space_id = IPROTO_SPACE_ID;
+ body->m_space_id = 0xce; /* uint32 */
+ body->v_space_id = mp_bswap_u32(space_id);
+ body->k_tuple = IPROTO_TUPLE;
+}
+
/**
* Xrow keys for Vinyl run information.
* @sa struct vy_run_info.
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index c92ed82b..ea197cad 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -432,11 +432,7 @@ checkpoint_write_tuple(struct xlog *l, uint32_t space_id, uint32_t group_id,
const char *data, uint32_t size)
{
struct request_replace_body body;
- body.m_body = 0x82; /* map of two elements. */
- body.k_space_id = IPROTO_SPACE_ID;
- body.m_space_id = 0xce; /* uint32 */
- body.v_space_id = mp_bswap_u32(space_id);
- body.k_tuple = IPROTO_TUPLE;
+ request_replace_body_create(&body, space_id);
struct xrow_header row;
memset(&row, 0, sizeof(struct xrow_header));
--
2.20.1
* [PATCH 12/13] test: disable replication/on_schema_init
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (10 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 11/13] xrow: factor out helper for setting REPLACE request body Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
2019-08-12 22:31 ` [tarantool-patches] " Konstantin Osipov
2019-08-10 10:03 ` [PATCH 13/13] relay: join new replicas off read view Vladimir Davydov
12 siblings, 1 reply; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
The test uses box.on_schema_init to install space.before_replace trigger
that changes the engine/locality of a space received by a replica. This
works only because we don't make a snapshot after creating those spaces
on the master so that they are relayed from an xlog. If we added
box.snapshot(), the test would fail, because space.before_replace
trigger isn't run for changes received on initial join (see #4417).
Once we make the initial join stage work off the current read view
rather than the last snapshot (see #1271), the test will fail as well.
Let's disable the test until the issue is resolved.
Needed for #1271
See #4417
---
test/replication/suite.ini | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index ac35b94a..2f7cab27 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -2,7 +2,7 @@
core = tarantool
script = master.lua
description = tarantool/box, replication
-disabled = consistent.test.lua
+disabled = consistent.test.lua on_schema_init.test.lua
release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua
config = suite.cfg
lua_libs = lua/fast_replica.lua lua/rlimit.lua
--
2.20.1
* [PATCH 13/13] relay: join new replicas off read view
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
` (11 preceding siblings ...)
2019-08-10 10:03 ` [PATCH 12/13] test: disable replication/on_schema_init Vladimir Davydov
@ 2019-08-10 10:03 ` Vladimir Davydov
12 siblings, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-10 10:03 UTC (permalink / raw)
To: tarantool-patches
Historically, we join a new replica off the last checkpoint. As a
result, we must always keep the last memtx snapshot and all vinyl data
files corresponding to it. Actually, there's no need to use the last
checkpoint for joining a replica. Instead we can use the current read
view as both memtx and vinyl support it. This should speed up the
process of joining a new replica, because we don't need to replay all
xlogs written after the last checkpoint, only those that are accumulated
while we are relaying the current read view. This should also allow us
to avoid creating a snapshot file on bootstrap, because the only reason
we need it is to allow replicas to join. Besides, this is a step
towards decoupling the vinyl metadata log from checkpointing in
particular and from xlogs in general.
Closes #1271
---
src/box/blackhole.c | 3 +-
src/box/box.cc | 33 +-
src/box/engine.c | 21 --
src/box/engine.h | 27 +-
src/box/memtx_engine.c | 74 ----
src/box/relay.cc | 170 +++++++--
src/box/relay.h | 2 +-
src/box/sysview.c | 3 +-
src/box/vinyl.c | 374 ++++----------------
src/box/vy_run.c | 6 -
src/box/vy_tx.c | 10 +-
src/box/vy_tx.h | 8 +
src/lib/core/errinj.h | 2 +-
test/box/errinj.result | 38 +-
test/replication-py/cluster.result | 13 -
test/replication-py/cluster.test.py | 25 --
test/replication/join_without_snap.result | 88 +++++
test/replication/join_without_snap.test.lua | 32 ++
test/replication/suite.cfg | 1 +
test/vinyl/errinj.result | 4 +-
test/vinyl/errinj.test.lua | 4 +-
test/xlog/panic_on_broken_lsn.result | 31 +-
test/xlog/panic_on_broken_lsn.test.lua | 20 +-
23 files changed, 419 insertions(+), 570 deletions(-)
create mode 100644 test/replication/join_without_snap.result
create mode 100644 test/replication/join_without_snap.test.lua
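Editorial sketch of how the relay in this patch consumes a read view: it drains each snapshot iterator through the fallible next() signature from earlier in the series, where the return code reports errors and a NULL data pointer marks the end of the space. The array-backed iterator and all sketch_* names are hypothetical; real iterators walk frozen memtx index structures, and the real loop writes each row to the replica instead of counting it.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical iterator over an in-memory array of C strings. */
struct sketch_iterator {
	int (*next)(struct sketch_iterator *it,
		    const char **data, uint32_t *size);
	const char **rows;
	size_t count;
	size_t pos;
	/* Position at which next() fails; SIZE_MAX means never. */
	size_t fail_at;
};

static int
sketch_array_next(struct sketch_iterator *it,
		  const char **data, uint32_t *size)
{
	if (it->pos == it->fail_at)
		return -1; /* error: the caller must stop and report */
	if (it->pos == it->count) {
		*data = NULL; /* end of data is not an error */
		return 0;
	}
	*data = it->rows[it->pos++];
	*size = (uint32_t)strlen(*data);
	return 0;
}

/* Drain the iterator; the counters stand in for sending rows. */
static int
sketch_send_all(struct sketch_iterator *it, size_t *sent, size_t *bytes)
{
	int rc;
	uint32_t size;
	const char *data;
	*sent = 0;
	*bytes = 0;
	while ((rc = it->next(it, &data, &size)) == 0 && data != NULL) {
		(*sent)++;
		*bytes += size;
	}
	return rc;
}
```

Checking rc after the loop is what distinguishes a clean end of the space from a failed read, which the old pointer-returning next() could not express.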
diff --git a/src/box/blackhole.c b/src/box/blackhole.c
index b69e543a..d2fa82be 100644
--- a/src/box/blackhole.c
+++ b/src/box/blackhole.c
@@ -178,7 +178,6 @@ blackhole_engine_create_space(struct engine *engine, struct space_def *def,
static const struct engine_vtab blackhole_engine_vtab = {
/* .shutdown = */ blackhole_engine_shutdown,
/* .create_space = */ blackhole_engine_create_space,
- /* .join = */ generic_engine_join,
/* .begin = */ generic_engine_begin,
/* .begin_statement = */ generic_engine_begin_statement,
/* .prepare = */ generic_engine_prepare,
@@ -213,6 +212,6 @@ blackhole_engine_new(void)
engine->vtab = &blackhole_engine_vtab;
engine->name = "blackhole";
- engine->flags = ENGINE_BYPASS_TX;
+ engine->flags = ENGINE_BYPASS_TX | ENGINE_EXCLUDE_FROM_SNAPSHOT;
return engine;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 66cd6d3a..95ce0bc1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1467,41 +1467,13 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
"wal_mode = 'none'");
}
- /*
- * The only case when the directory index is empty is
- * when someone has deleted a snapshot and tries to join
- * as a replica. Our best effort is to not crash in such
- * case: raise ER_MISSING_SNAPSHOT.
- */
- struct gc_checkpoint *checkpoint = gc_last_checkpoint();
- if (checkpoint == NULL)
- tnt_raise(ClientError, ER_MISSING_SNAPSHOT);
-
- /* Remember start vclock. */
- struct vclock start_vclock;
- vclock_copy(&start_vclock, &checkpoint->vclock);
-
- /*
- * Make sure the checkpoint files won't be deleted while
- * initial join is in progress.
- */
- struct gc_checkpoint_ref gc;
- gc_ref_checkpoint(checkpoint, &gc, "replica %s",
- tt_uuid_str(&instance_uuid));
- auto gc_guard = make_scoped_guard([&]{ gc_unref_checkpoint(&gc); });
-
- /* Respond to JOIN request with start_vclock. */
- struct xrow_header row;
- xrow_encode_vclock_xc(&row, &start_vclock);
- row.sync = header->sync;
- coio_write_xrow(io, &row);
-
say_info("joining replica %s at %s",
tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
/*
* Initial stream: feed replica with dirty data from engines.
*/
+ struct vclock start_vclock;
relay_initial_join(io->fd, header->sync, &start_vclock);
say_info("initial data sent.");
@@ -1513,6 +1485,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
*/
box_on_join(&instance_uuid);
+ ERROR_INJECT_YIELD(ERRINJ_REPLICA_JOIN_DELAY);
+
/* Remember master's vclock after the last request */
struct vclock stop_vclock;
vclock_copy(&stop_vclock, &replicaset.vclock);
@@ -1530,6 +1504,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
diag_raise();
/* Send end of initial stage data marker */
+ struct xrow_header row;
xrow_encode_vclock_xc(&row, &stop_vclock);
row.sync = header->sync;
coio_write_xrow(io, &row);
diff --git a/src/box/engine.c b/src/box/engine.c
index a52d0ed1..73ff0464 100644
--- a/src/box/engine.c
+++ b/src/box/engine.c
@@ -174,17 +174,6 @@ engine_backup(const struct vclock *vclock, engine_backup_cb cb, void *cb_arg)
return 0;
}
-int
-engine_join(const struct vclock *vclock, struct xstream *stream)
-{
- struct engine *engine;
- engine_foreach(engine) {
- if (engine->vtab->join(engine, vclock, stream) != 0)
- return -1;
- }
- return 0;
-}
-
void
engine_memory_stat(struct engine_memory_stat *stat)
{
@@ -204,16 +193,6 @@ engine_reset_stat(void)
/* {{{ Virtual method stubs */
-int
-generic_engine_join(struct engine *engine, const struct vclock *vclock,
- struct xstream *stream)
-{
- (void)engine;
- (void)vclock;
- (void)stream;
- return 0;
-}
-
int
generic_engine_begin(struct engine *engine, struct txn *txn)
{
diff --git a/src/box/engine.h b/src/box/engine.h
index a302b3bc..c83042b4 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -73,11 +73,6 @@ struct engine_vtab {
/** Allocate a new space instance. */
struct space *(*create_space)(struct engine *engine,
struct space_def *def, struct rlist *key_list);
- /**
- * Write statements stored in checkpoint @vclock to @stream.
- */
- int (*join)(struct engine *engine, const struct vclock *vclock,
- struct xstream *stream);
/**
* Begin a new single or multi-statement transaction.
* Called on first statement in a transaction, not when
@@ -205,6 +200,12 @@ enum {
* transactions w/o throwing ER_CROSS_ENGINE_TRANSACTION.
*/
ENGINE_BYPASS_TX = 1 << 0,
+ /**
+ * This flag is set for virtual engines, such as sysview,
+ * that don't actually store any data. It means that we
+ * must not relay their content to a newly joined replica.
+ */
+ ENGINE_EXCLUDE_FROM_SNAPSHOT = 1 << 1,
};
struct engine {
@@ -330,13 +331,6 @@ engine_begin_final_recovery(void);
int
engine_end_recovery(void);
-/**
- * Feed checkpoint data as join events to the replicas.
- * (called on the master).
- */
-int
-engine_join(const struct vclock *vclock, struct xstream *stream);
-
int
engine_begin_checkpoint(void);
@@ -364,8 +358,6 @@ engine_reset_stat(void);
/*
* Virtual method stubs.
*/
-int generic_engine_join(struct engine *, const struct vclock *,
- struct xstream *);
int generic_engine_begin(struct engine *, struct txn *);
int generic_engine_begin_statement(struct engine *, struct txn *);
int generic_engine_prepare(struct engine *, struct txn *);
@@ -468,13 +460,6 @@ engine_end_recovery_xc(void)
diag_raise();
}
-static inline void
-engine_join_xc(const struct vclock *vclock, struct xstream *stream)
-{
- if (engine_join(vclock, stream) != 0)
- diag_raise();
-}
-
#endif /* defined(__cplusplus) */
#endif /* TARANTOOL_BOX_ENGINE_H_INCLUDED */
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index ea197cad..28c516c2 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -726,79 +726,6 @@ memtx_engine_backup(struct engine *engine, const struct vclock *vclock,
return cb(filename, cb_arg);
}
-/** Used to pass arguments to memtx_initial_join_f */
-struct memtx_join_arg {
- const char *snap_dirname;
- int64_t checkpoint_lsn;
- struct xstream *stream;
-};
-
-/**
- * Invoked from a thread to feed snapshot rows.
- */
-static int
-memtx_initial_join_f(va_list ap)
-{
- struct memtx_join_arg *arg = va_arg(ap, struct memtx_join_arg *);
- const char *snap_dirname = arg->snap_dirname;
- int64_t checkpoint_lsn = arg->checkpoint_lsn;
- struct xstream *stream = arg->stream;
-
- struct xdir dir;
- /*
- * snap_dirname and INSTANCE_UUID don't change after start,
- * safe to use in another thread.
- */
- xdir_create(&dir, snap_dirname, SNAP, &INSTANCE_UUID,
- &xlog_opts_default);
- struct xlog_cursor cursor;
- int rc = xdir_open_cursor(&dir, checkpoint_lsn, &cursor);
- xdir_destroy(&dir);
- if (rc < 0)
- return -1;
-
- struct xrow_header row;
- while ((rc = xlog_cursor_next(&cursor, &row, true)) == 0) {
- rc = xstream_write(stream, &row);
- if (rc < 0)
- break;
- }
- xlog_cursor_close(&cursor, false);
- if (rc < 0)
- return -1;
-
- /**
- * We should never try to read snapshots with no EOF
- * marker - such snapshots are very likely corrupted and
- * should not be trusted.
- */
- /* TODO: replace panic with diag_set() */
- if (!xlog_cursor_is_eof(&cursor))
- panic("snapshot `%s' has no EOF marker", cursor.name);
- return 0;
-}
-
-static int
-memtx_engine_join(struct engine *engine, const struct vclock *vclock,
- struct xstream *stream)
-{
- struct memtx_engine *memtx = (struct memtx_engine *)engine;
-
- /*
- * cord_costart() passes only void * pointer as an argument.
- */
- struct memtx_join_arg arg = {
- /* .snap_dirname = */ memtx->snap_dir.dirname,
- /* .checkpoint_lsn = */ vclock_sum(vclock),
- /* .stream = */ stream
- };
-
- /* Send snapshot using a thread */
- struct cord cord;
- cord_costart(&cord, "initial_join", memtx_initial_join_f, &arg);
- return cord_cojoin(&cord);
-}
-
static int
small_stats_noop_cb(const struct mempool_stats *stats, void *cb_ctx)
{
@@ -822,7 +749,6 @@ memtx_engine_memory_stat(struct engine *engine, struct engine_memory_stat *stat)
static const struct engine_vtab memtx_engine_vtab = {
/* .shutdown = */ memtx_engine_shutdown,
/* .create_space = */ memtx_engine_create_space,
- /* .join = */ memtx_engine_join,
/* .begin = */ memtx_engine_begin,
/* .begin_statement = */ generic_engine_begin_statement,
/* .prepare = */ generic_engine_prepare,
diff --git a/src/box/relay.cc b/src/box/relay.cc
index efa3373f..a29a62fa 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -41,11 +41,13 @@
#include "coio.h"
#include "coio_task.h"
-#include "engine.h"
#include "gc.h"
+#include "index.h"
#include "iproto_constants.h"
#include "recovery.h"
#include "replication.h"
+#include "schema.h"
+#include "space.h"
#include "trigger.h"
#include "vclock.h"
#include "version.h"
@@ -168,8 +170,6 @@ relay_last_row_time(const struct relay *relay)
static void
relay_send(struct relay *relay, struct xrow_header *packet);
static void
-relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
-static void
relay_send_row(struct xstream *stream, struct xrow_header *row);
struct relay *
@@ -283,20 +283,156 @@ relay_set_cord_name(int fd)
cord_set_name(name);
}
+/**
+ * A space to feed to a replica on initial join.
+ */
+struct relay_space {
+ /** Link in the list of spaces to feed. */
+ struct rlist link;
+ /** Space id. */
+ uint32_t space_id;
+ /** Read view iterator. */
+ struct snapshot_iterator *iterator;
+};
+
+/**
+ * Add a space to the list of spaces to feed to a replica
+ * if eligible. We don't need to relay the following kinds
+ * of spaces:
+ *
+ * - Temporary spaces, apparently.
+ * - Spaces that are local to this instance.
+ * - Virtual spaces, such as sysview.
+ * - Spaces that don't have the primary index.
+ */
+static int
+relay_space_add(struct space *sp, void *data)
+{
+ struct rlist *spaces = (struct rlist *)data;
+
+ if (space_is_temporary(sp))
+ return 0;
+ if (space_group_id(sp) == GROUP_LOCAL)
+ return 0;
+ if (sp->engine->flags & ENGINE_EXCLUDE_FROM_SNAPSHOT)
+ return 0;
+ struct index *pk = space_index(sp, 0);
+ if (pk == NULL)
+ return 0;
+
+ struct relay_space *r = (struct relay_space *)malloc(sizeof(*r));
+ if (r == NULL) {
+ diag_set(OutOfMemory, sizeof(*r),
+ "malloc", "struct relay_space");
+ return -1;
+ }
+ r->space_id = space_id(sp);
+ r->iterator = index_create_snapshot_iterator(pk);
+ if (r->iterator == NULL) {
+ free(r);
+ return -1;
+ }
+ rlist_add_tail_entry(spaces, r, link);
+ return 0;
+}
+
+/**
+ * Relay a single space row to a replica.
+ */
+static void
+relay_space_send_row(uint32_t space_id, const char *data, uint32_t size,
+ struct ev_io *io, uint64_t sync)
+{
+ struct request_replace_body body;
+ request_replace_body_create(&body, space_id);
+
+ struct xrow_header row;
+ memset(&row, 0, sizeof(row));
+ row.type = IPROTO_INSERT;
+ row.sync = sync;
+
+ row.bodycnt = 2;
+ row.body[0].iov_base = &body;
+ row.body[0].iov_len = sizeof(body);
+ row.body[1].iov_base = (char *)data;
+ row.body[1].iov_len = size;
+
+ coio_write_xrow(io, &row);
+}
+
+/**
+ * Relay a read view of a space content to a replica.
+ */
+static void
+relay_space_send(struct relay_space *r, struct ev_io *io, uint64_t sync)
+{
+ int rc;
+ struct snapshot_iterator *it = r->iterator;
+
+ uint32_t size;
+ const char *data;
+ while ((rc = it->next(it, &data, &size)) == 0 && data != NULL)
+ relay_space_send_row(r->space_id, data, size, io, sync);
+
+ if (rc != 0)
+ diag_raise();
+}
+
+/**
+ * Close the read view iterator associated with the space
+ * and free the container object.
+ */
+static void
+relay_space_free(struct relay_space *r)
+{
+ rlist_del_entry(r, link);
+ r->iterator->free(r->iterator);
+ free(r);
+}
+
void
relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
{
- struct relay *relay = relay_new(NULL);
- if (relay == NULL)
- diag_raise();
+ struct ev_io io;
+ coio_create(&io, fd);
- relay_start(relay, fd, sync, relay_send_initial_join_row);
- auto relay_guard = make_scoped_guard([=] {
- relay_stop(relay);
- relay_delete(relay);
+ RLIST_HEAD(spaces);
+ auto guard = make_scoped_guard([&spaces] {
+ struct relay_space *r, *next;
+ rlist_foreach_entry_safe(r, &spaces, link, next)
+ relay_space_free(r);
});
- engine_join_xc(vclock, &relay->stream);
+ /*
+ * First, we open read view iterators over spaces that need
+ * to be fed to the replica. Note, we can't yield in the loop,
+ * because otherwise we could get an inconsistent view of the
+ * database.
+ */
+ if (space_foreach(relay_space_add, &spaces) != 0)
+ diag_raise();
+
+ /*
+ * Second, we must sync WAL to make sure that all changes
+ * visible to the iterators are successfully committed.
+ */
+ if (wal_sync() != 0)
+ diag_raise();
+
+ vclock_copy(vclock, &replicaset.vclock);
+
+ /* Respond to JOIN request with the current vclock. */
+ struct xrow_header row;
+ xrow_encode_vclock_xc(&row, vclock);
+ row.sync = sync;
+ coio_write_xrow(&io, &row);
+
+ /* Finally, send the read view to the replica. */
+ struct relay_space *r, *next;
+ rlist_foreach_entry_safe(r, &spaces, link, next) {
+ relay_space_send(r, &io, sync);
+ relay_space_free(r);
+ }
}
int
@@ -697,18 +833,6 @@ relay_send(struct relay *relay, struct xrow_header *packet)
fiber_sleep(inj->dparam);
}
-static void
-relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
-{
- struct relay *relay = container_of(stream, struct relay, stream);
- /*
- * Ignore replica local requests as we don't need to promote
- * vclock while sending a snapshot.
- */
- if (row->group_id != GROUP_LOCAL)
- relay_send(relay, row);
-}
-
/** Send a single row to the client. */
static void
relay_send_row(struct xstream *stream, struct xrow_header *packet)
diff --git a/src/box/relay.h b/src/box/relay.h
index 869f8f2e..e1782d78 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -102,7 +102,7 @@ relay_last_row_time(const struct relay *relay);
*
* @param fd client connection
* @param sync sync from incoming JOIN request
- * @param vclock vclock of the last checkpoint
+ * @param[out] vclock vclock of the read view sent to the replica
*/
void
relay_initial_join(int fd, uint64_t sync, struct vclock *vclock);
diff --git a/src/box/sysview.c b/src/box/sysview.c
index 46cf1e13..8c2d63c9 100644
--- a/src/box/sysview.c
+++ b/src/box/sysview.c
@@ -566,7 +566,6 @@ sysview_engine_create_space(struct engine *engine, struct space_def *def,
static const struct engine_vtab sysview_engine_vtab = {
/* .shutdown = */ sysview_engine_shutdown,
/* .create_space = */ sysview_engine_create_space,
- /* .join = */ generic_engine_join,
/* .begin = */ generic_engine_begin,
/* .begin_statement = */ generic_engine_begin_statement,
/* .prepare = */ generic_engine_prepare,
@@ -601,6 +600,6 @@ sysview_engine_new(void)
sysview->base.vtab = &sysview_engine_vtab;
sysview->base.name = "sysview";
- sysview->base.flags = ENGINE_BYPASS_TX;
+ sysview->base.flags = ENGINE_BYPASS_TX | ENGINE_EXCLUDE_FROM_SNAPSHOT;
return sysview;
}
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 9e93153b..297c3a40 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -177,6 +177,12 @@ struct vinyl_iterator {
struct trigger on_tx_destroy;
};
+struct vinyl_snapshot_iterator {
+ struct snapshot_iterator base;
+ struct vy_read_view *rv;
+ struct vy_read_iterator iterator;
+};
+
static const struct engine_vtab vinyl_engine_vtab;
static const struct space_vtab vinyl_space_vtab;
static const struct index_vtab vinyl_index_vtab;
@@ -2914,311 +2920,6 @@ vinyl_engine_end_recovery(struct engine *engine)
/** {{{ Replication */
-/** Relay context, passed to all relay functions. */
-struct vy_join_ctx {
- /** Environment. */
- struct vy_env *env;
- /** Stream to relay statements to. */
- struct xstream *stream;
- /** Pipe to the relay thread. */
- struct cpipe relay_pipe;
- /** Pipe to the tx thread. */
- struct cpipe tx_pipe;
- /**
- * Cbus message, used for calling functions
- * on behalf of the relay thread.
- */
- struct cbus_call_msg cmsg;
- /** ID of the space currently being relayed. */
- uint32_t space_id;
- /**
- * LSM tree key definition, as defined by the user.
- * We only send the primary key, so the definition
- * provided by the user is correct for compare.
- */
- struct key_def *key_def;
- /** LSM tree format used for REPLACE and DELETE statements. */
- struct tuple_format *format;
- /**
- * Write iterator for merging runs before sending
- * them to the replica.
- */
- struct vy_stmt_stream *wi;
- /**
- * List of run slices of the current range, linked by
- * vy_slice::in_join. The newer a slice the closer it
- * is to the head of the list.
- */
- struct rlist slices;
-};
-
-/**
- * Recover a slice and add it to the list of slices.
- * Newer slices are supposed to be recovered first.
- * Returns 0 on success, -1 on failure.
- */
-static int
-vy_prepare_send_slice(struct vy_join_ctx *ctx,
- struct vy_slice_recovery_info *slice_info)
-{
- int rc = -1;
- struct vy_run *run = NULL;
- struct vy_entry begin = vy_entry_none();
- struct vy_entry end = vy_entry_none();
-
- run = vy_run_new(&ctx->env->run_env, slice_info->run->id);
- if (run == NULL)
- goto out;
- if (vy_run_recover(run, ctx->env->path, ctx->space_id, 0,
- ctx->key_def) != 0)
- goto out;
-
- if (slice_info->begin != NULL) {
- begin = vy_entry_key_from_msgpack(ctx->env->lsm_env.key_format,
- ctx->key_def,
- slice_info->begin);
- if (begin.stmt == NULL)
- goto out;
- }
- if (slice_info->end != NULL) {
- end = vy_entry_key_from_msgpack(ctx->env->lsm_env.key_format,
- ctx->key_def,
- slice_info->end);
- if (end.stmt == NULL)
- goto out;
- }
-
- struct vy_slice *slice = vy_slice_new(slice_info->id, run,
- begin, end, ctx->key_def);
- if (slice == NULL)
- goto out;
-
- rlist_add_tail_entry(&ctx->slices, slice, in_join);
- rc = 0;
-out:
- if (run != NULL)
- vy_run_unref(run);
- if (begin.stmt != NULL)
- tuple_unref(begin.stmt);
- if (end.stmt != NULL)
- tuple_unref(end.stmt);
- return rc;
-}
-
-static int
-vy_send_range_f(struct cbus_call_msg *cmsg)
-{
- struct vy_join_ctx *ctx = container_of(cmsg, struct vy_join_ctx, cmsg);
-
- int rc = ctx->wi->iface->start(ctx->wi);
- if (rc != 0)
- goto err;
- struct vy_entry entry;
- while ((rc = ctx->wi->iface->next(ctx->wi, &entry)) == 0 &&
- entry.stmt != NULL) {
- struct xrow_header xrow;
- rc = vy_stmt_encode_primary(entry.stmt, ctx->key_def,
- ctx->space_id, &xrow);
- if (rc != 0)
- break;
- /*
- * Reset the LSN as the replica will ignore it
- * anyway - see comment to vy_env::join_lsn.
- */
- xrow.lsn = 0;
- rc = xstream_write(ctx->stream, &xrow);
- if (rc != 0)
- break;
- fiber_gc();
- }
-err:
- ctx->wi->iface->stop(ctx->wi);
- fiber_gc();
- return rc;
-}
-
-/** Merge and send all runs of the given range. */
-static int
-vy_send_range(struct vy_join_ctx *ctx,
- struct vy_range_recovery_info *range_info)
-{
- int rc;
- struct vy_slice *slice, *tmp;
-
- if (rlist_empty(&range_info->slices))
- return 0; /* nothing to do */
-
- /* Recover slices. */
- struct vy_slice_recovery_info *slice_info;
- rlist_foreach_entry(slice_info, &range_info->slices, in_range) {
- rc = vy_prepare_send_slice(ctx, slice_info);
- if (rc != 0)
- goto out_delete_slices;
- }
-
- /* Create a write iterator. */
- struct rlist fake_read_views;
- rlist_create(&fake_read_views);
- ctx->wi = vy_write_iterator_new(ctx->key_def, true, true,
- &fake_read_views, NULL);
- if (ctx->wi == NULL) {
- rc = -1;
- goto out;
- }
- rlist_foreach_entry(slice, &ctx->slices, in_join) {
- rc = vy_write_iterator_new_slice(ctx->wi, slice, ctx->format);
- if (rc != 0)
- goto out_delete_wi;
- }
-
- /* Do the actual work from the relay thread. */
- bool cancellable = fiber_set_cancellable(false);
- rc = cbus_call(&ctx->relay_pipe, &ctx->tx_pipe, &ctx->cmsg,
- vy_send_range_f, NULL, TIMEOUT_INFINITY);
- fiber_set_cancellable(cancellable);
-
-out_delete_wi:
- ctx->wi->iface->close(ctx->wi);
- ctx->wi = NULL;
-out_delete_slices:
- rlist_foreach_entry_safe(slice, &ctx->slices, in_join, tmp)
- vy_slice_delete(slice);
- rlist_create(&ctx->slices);
-out:
- return rc;
-}
-
-/** Send all tuples stored in the given LSM tree. */
-static int
-vy_send_lsm(struct vy_join_ctx *ctx, struct vy_lsm_recovery_info *lsm_info)
-{
- int rc = -1;
-
- if (lsm_info->drop_lsn >= 0 || lsm_info->create_lsn < 0) {
- /* Dropped or not yet built LSM tree. */
- return 0;
- }
- if (lsm_info->group_id == GROUP_LOCAL) {
- /* Replica local space. */
- return 0;
- }
-
- /*
- * We are only interested in the primary index LSM tree.
- * Secondary keys will be rebuilt on the destination.
- */
- if (lsm_info->index_id != 0)
- return 0;
-
- ctx->space_id = lsm_info->space_id;
-
- /* Create key definition and tuple format. */
- ctx->key_def = key_def_new(lsm_info->key_parts,
- lsm_info->key_part_count, false);
- if (ctx->key_def == NULL)
- goto out;
- ctx->format = vy_stmt_format_new(&ctx->env->stmt_env, &ctx->key_def, 1,
- NULL, 0, 0, NULL);
- if (ctx->format == NULL)
- goto out_free_key_def;
- tuple_format_ref(ctx->format);
-
- /* Send ranges. */
- struct vy_range_recovery_info *range_info;
- assert(!rlist_empty(&lsm_info->ranges));
- rlist_foreach_entry(range_info, &lsm_info->ranges, in_lsm) {
- rc = vy_send_range(ctx, range_info);
- if (rc != 0)
- break;
- }
-
- tuple_format_unref(ctx->format);
- ctx->format = NULL;
-out_free_key_def:
- key_def_delete(ctx->key_def);
- ctx->key_def = NULL;
-out:
- return rc;
-}
-
-/** Relay cord function. */
-static int
-vy_join_f(va_list ap)
-{
- struct vy_join_ctx *ctx = va_arg(ap, struct vy_join_ctx *);
-
- coio_enable();
-
- cpipe_create(&ctx->tx_pipe, "tx");
-
- struct cbus_endpoint endpoint;
- cbus_endpoint_create(&endpoint, cord_name(cord()),
- fiber_schedule_cb, fiber());
-
- cbus_loop(&endpoint);
-
- cbus_endpoint_destroy(&endpoint, cbus_process);
- cpipe_destroy(&ctx->tx_pipe);
- return 0;
-}
-
-static int
-vinyl_engine_join(struct engine *engine, const struct vclock *vclock,
- struct xstream *stream)
-{
- struct vy_env *env = vy_env(engine);
- int rc = -1;
-
- /* Allocate the relay context. */
- struct vy_join_ctx *ctx = malloc(sizeof(*ctx));
- if (ctx == NULL) {
- diag_set(OutOfMemory, PATH_MAX, "malloc", "struct vy_join_ctx");
- goto out;
- }
- memset(ctx, 0, sizeof(*ctx));
- ctx->env = env;
- ctx->stream = stream;
- rlist_create(&ctx->slices);
-
- /* Start the relay cord. */
- char name[FIBER_NAME_MAX];
- snprintf(name, sizeof(name), "initial_join_%p", stream);
- struct cord cord;
- if (cord_costart(&cord, name, vy_join_f, ctx) != 0)
- goto out_free_ctx;
- cpipe_create(&ctx->relay_pipe, name);
-
- /*
- * Load the recovery context from the given point in time.
- * Send all runs stored in it to the replica.
- */
- struct vy_recovery *recovery;
- recovery = vy_recovery_new(vclock_sum(vclock),
- VY_RECOVERY_LOAD_CHECKPOINT);
- if (recovery == NULL) {
- say_error("failed to recover vylog to join a replica");
- goto out_join_cord;
- }
- rc = 0;
- struct vy_lsm_recovery_info *lsm_info;
- rlist_foreach_entry(lsm_info, &recovery->lsms, in_recovery) {
- rc = vy_send_lsm(ctx, lsm_info);
- if (rc != 0)
- break;
- }
- vy_recovery_delete(recovery);
-
-out_join_cord:
- cbus_stop_loop(&ctx->relay_pipe);
- cpipe_destroy(&ctx->relay_pipe);
- if (cord_cojoin(&cord) != 0)
- rc = -1;
-out_free_ctx:
- free(ctx);
-out:
- return rc;
-}
-
static int
vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
{
@@ -3939,6 +3640,66 @@ vinyl_index_create_iterator(struct index *base, enum iterator_type type,
return (struct iterator *)it;
}
+static int
+vinyl_snapshot_iterator_next(struct snapshot_iterator *base,
+ const char **data, uint32_t *size)
+{
+ assert(base->next == vinyl_snapshot_iterator_next);
+ struct vinyl_snapshot_iterator *it =
+ (struct vinyl_snapshot_iterator *)base;
+ struct vy_entry entry;
+ if (vy_read_iterator_next(&it->iterator, &entry) != 0)
+ return -1;
+ *data = entry.stmt != NULL ? tuple_data_range(entry.stmt, size) : NULL;
+ return 0;
+}
+
+static void
+vinyl_snapshot_iterator_free(struct snapshot_iterator *base)
+{
+ assert(base->free == vinyl_snapshot_iterator_free);
+ struct vinyl_snapshot_iterator *it =
+ (struct vinyl_snapshot_iterator *)base;
+ struct vy_lsm *lsm = it->iterator.lsm;
+ struct vy_env *env = vy_env(lsm->base.engine);
+ vy_read_iterator_close(&it->iterator);
+ tx_manager_destroy_read_view(env->xm, it->rv);
+ vy_lsm_unref(lsm);
+ free(it);
+}
+
+static struct snapshot_iterator *
+vinyl_index_create_snapshot_iterator(struct index *base)
+{
+ struct vy_lsm *lsm = vy_lsm(base);
+ struct vy_env *env = vy_env(base->engine);
+
+ struct vinyl_snapshot_iterator *it = malloc(sizeof(*it));
+ if (it == NULL) {
+ diag_set(OutOfMemory, sizeof(*it), "malloc",
+ "struct vinyl_snapshot_iterator");
+ return NULL;
+ }
+ it->base.next = vinyl_snapshot_iterator_next;
+ it->base.free = vinyl_snapshot_iterator_free;
+
+ it->rv = tx_manager_read_view(env->xm);
+ if (it->rv == NULL) {
+ free(it);
+ return NULL;
+ }
+ vy_read_iterator_open(&it->iterator, lsm, NULL,
+ ITER_ALL, lsm->env->empty_key,
+ (const struct vy_read_view **)&it->rv);
+ /*
+ * The index may be dropped while we are reading it.
+ * The iterator must go on as if nothing happened.
+ */
+ vy_lsm_ref(lsm);
+
+ return &it->base;
+}
+
static int
vinyl_index_get(struct index *index, const char *key,
uint32_t part_count, struct tuple **ret)
@@ -4665,7 +4426,6 @@ static struct trigger on_replace_vinyl_deferred_delete = {
static const struct engine_vtab vinyl_engine_vtab = {
/* .shutdown = */ vinyl_engine_shutdown,
/* .create_space = */ vinyl_engine_create_space,
- /* .join = */ vinyl_engine_join,
/* .begin = */ vinyl_engine_begin,
/* .begin_statement = */ vinyl_engine_begin_statement,
/* .prepare = */ vinyl_engine_prepare,
@@ -4732,7 +4492,7 @@ static const struct index_vtab vinyl_index_vtab = {
/* .replace = */ generic_index_replace,
/* .create_iterator = */ vinyl_index_create_iterator,
/* .create_snapshot_iterator = */
- generic_index_create_snapshot_iterator,
+ vinyl_index_create_snapshot_iterator,
/* .stat = */ vinyl_index_stat,
/* .compact = */ vinyl_index_compact,
/* .reset_stat = */ vinyl_index_reset_stat,
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index c6c17aee..25b6dcd3 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -1675,14 +1675,8 @@ vy_run_recover(struct vy_run *run, const char *dir,
/* Read run header. */
struct xrow_header xrow;
- ERROR_INJECT(ERRINJ_VYRUN_INDEX_GARBAGE, {
- errinj(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL)->bparam = true;
- });
/* all rows should be in one tx */
int rc = xlog_cursor_next_tx(&cursor);
- ERROR_INJECT(ERRINJ_VYRUN_INDEX_GARBAGE, {
- errinj(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL)->bparam = false;
- });
if (rc != 0) {
if (rc > 0)
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 9b300fde..6884ed8d 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -156,8 +156,7 @@ tx_manager_mem_used(struct tx_manager *xm)
return ret;
}
-/** Create or reuse an instance of a read view. */
-static struct vy_read_view *
+struct vy_read_view *
tx_manager_read_view(struct tx_manager *xm)
{
struct vy_read_view *rv;
@@ -195,12 +194,9 @@ tx_manager_read_view(struct tx_manager *xm)
return rv;
}
-/** Dereference and possibly destroy a read view. */
-static void
-tx_manager_destroy_read_view(struct tx_manager *xm,
- const struct vy_read_view *read_view)
+void
+tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv)
{
- struct vy_read_view *rv = (struct vy_read_view *) read_view;
if (rv == xm->p_global_read_view)
return;
assert(rv->refs);
diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h
index 376f4330..3144c921 100644
--- a/src/box/vy_tx.h
+++ b/src/box/vy_tx.h
@@ -289,6 +289,14 @@ tx_manager_delete(struct tx_manager *xm);
size_t
tx_manager_mem_used(struct tx_manager *xm);
+/** Create or reuse an instance of a read view. */
+struct vy_read_view *
+tx_manager_read_view(struct tx_manager *xm);
+
+/** Dereference and possibly destroy a read view. */
+void
+tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv);
+
/**
* Abort all rw transactions that affect the given space
* and haven't reached WAL yet. Called before executing a DDL
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index fe9c2237..cda179e9 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -101,11 +101,11 @@ struct errinj {
_(ERRINJ_RELAY_REPORT_INTERVAL, ERRINJ_DOUBLE, {.dparam = 0}) \
_(ERRINJ_RELAY_FINAL_SLEEP, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_RELAY_FINAL_JOIN, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_REPLICA_JOIN_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_PORT_DUMP, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_XLOG_META, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_XLOG_READ, ERRINJ_INT, {.iparam = -1}) \
- _(ERRINJ_VYRUN_INDEX_GARBAGE, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_VYRUN_DATA_READ, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_CHECK_FORMAT_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_BUILD_INDEX, ERRINJ_INT, {.iparam = -1}) \
diff --git a/test/box/errinj.result b/test/box/errinj.result
index af2f8877..e7d58f49 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -38,17 +38,17 @@ errinj.info()
state: -1
ERRINJ_WAL_WRITE_EOF:
state: false
- ERRINJ_VY_LOG_FILE_RENAME:
- state: false
- ERRINJ_VYRUN_INDEX_GARBAGE:
- state: false
- ERRINJ_BUILD_INDEX_DELAY:
+ ERRINJ_HTTP_RESPONSE_ADD_WAIT:
state: false
ERRINJ_BUILD_INDEX:
state: -1
- ERRINJ_VY_INDEX_FILE_RENAME:
+ ERRINJ_BUILD_INDEX_DELAY:
state: false
- ERRINJ_CHECK_FORMAT_DELAY:
+ ERRINJ_VY_RUN_FILE_RENAME:
+ state: false
+ ERRINJ_VY_COMPACTION_DELAY:
+ state: false
+ ERRINJ_VY_DUMP_DELAY:
state: false
ERRINJ_VY_DELAY_PK_LOOKUP:
state: false
@@ -56,16 +56,16 @@ errinj.info()
state: false
ERRINJ_PORT_DUMP:
state: false
- ERRINJ_VY_DUMP_DELAY:
- state: false
+ ERRINJ_WAL_BREAK_LSN:
+ state: -1
ERRINJ_WAL_IO:
state: false
ERRINJ_WAL_FALLOCATE:
state: 0
- ERRINJ_WAL_BREAK_LSN:
- state: -1
ERRINJ_RELAY_BREAK_LSN:
state: -1
+ ERRINJ_VY_INDEX_FILE_RENAME:
+ state: false
ERRINJ_TUPLE_FORMAT_COUNT:
state: -1
ERRINJ_TUPLE_ALLOC:
@@ -76,7 +76,7 @@ errinj.info()
state: false
ERRINJ_RELAY_REPORT_INTERVAL:
state: 0
- ERRINJ_VY_RUN_FILE_RENAME:
+ ERRINJ_VY_LOG_FILE_RENAME:
state: false
ERRINJ_VY_READ_PAGE_TIMEOUT:
state: 0
@@ -84,23 +84,23 @@ errinj.info()
state: false
ERRINJ_SIO_READ_MAX:
state: -1
- ERRINJ_HTTP_RESPONSE_ADD_WAIT:
- state: false
- ERRINJ_WAL_WRITE_DISK:
- state: false
ERRINJ_SNAP_COMMIT_DELAY:
state: false
+ ERRINJ_WAL_WRITE_DISK:
+ state: false
ERRINJ_SNAP_WRITE_DELAY:
state: false
- ERRINJ_VY_RUN_WRITE:
- state: false
ERRINJ_LOG_ROTATE:
state: false
+ ERRINJ_VY_RUN_WRITE:
+ state: false
+ ERRINJ_CHECK_FORMAT_DELAY:
+ state: false
ERRINJ_VY_LOG_FLUSH_DELAY:
state: false
ERRINJ_RELAY_FINAL_JOIN:
state: false
- ERRINJ_VY_COMPACTION_DELAY:
+ ERRINJ_REPLICA_JOIN_DELAY:
state: false
ERRINJ_RELAY_FINAL_SLEEP:
state: false
diff --git a/test/replication-py/cluster.result b/test/replication-py/cluster.result
index 04f06f74..f68a6af7 100644
--- a/test/replication-py/cluster.result
+++ b/test/replication-py/cluster.result
@@ -23,19 +23,6 @@ box.schema.user.grant('guest', 'replication')
...
ok - join with granted role
-------------------------------------------------------------
-gh-707: Master crashes on JOIN if it does not have snapshot files
-gh-480: If socket is closed while JOIN, replica wont reconnect
--------------------------------------------------------------
-ok - join without snapshots
-ok - _cluster did not change after unsuccessful JOIN
-box.schema.user.revoke('guest', 'replication')
----
-...
-box.snapshot()
----
-- ok
-...
--------------------------------------------------------------
gh-434: Assertion if replace _cluster tuple for local server
-------------------------------------------------------------
box.space._cluster:replace{1, require('uuid').NULL:str()}
diff --git a/test/replication-py/cluster.test.py b/test/replication-py/cluster.test.py
index 0140a6bd..088ca9c3 100644
--- a/test/replication-py/cluster.test.py
+++ b/test/replication-py/cluster.test.py
@@ -71,31 +71,6 @@ server.iproto.reconnect() # re-connect with new permissions
server_id = check_join('join with granted role')
server.iproto.py_con.space('_cluster').delete(server_id)
-print '-------------------------------------------------------------'
-print 'gh-707: Master crashes on JOIN if it does not have snapshot files'
-print 'gh-480: If socket is closed while JOIN, replica wont reconnect'
-print '-------------------------------------------------------------'
-
-data_dir = os.path.join(server.vardir, server.name)
-for k in glob.glob(os.path.join(data_dir, '*.snap')):
- os.unlink(k)
-
-# remember the number of servers in _cluster table
-server_count = len(server.iproto.py_con.space('_cluster').select(()))
-
-rows = list(server.iproto.py_con.join(replica_uuid))
-print len(rows) > 0 and rows[-1].return_message.find('.snap') >= 0 and \
- 'ok' or 'not ok', '-', 'join without snapshots'
-res = server.iproto.py_con.space('_cluster').select(())
-if server_count <= len(res):
- print 'ok - _cluster did not change after unsuccessful JOIN'
-else:
- print 'not ok - _cluster did change after unsuccessful JOIN'
- print res
-
-server.admin("box.schema.user.revoke('guest', 'replication')")
-server.admin('box.snapshot()')
-
print '-------------------------------------------------------------'
print 'gh-434: Assertion if replace _cluster tuple for local server'
print '-------------------------------------------------------------'
diff --git a/test/replication/join_without_snap.result b/test/replication/join_without_snap.result
new file mode 100644
index 00000000..becdfd21
--- /dev/null
+++ b/test/replication/join_without_snap.result
@@ -0,0 +1,88 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-1271: check that replica join works off the current read view,
+-- not the last checkpoint. To do that, delete the last snapshot file
+-- and check that a replica can still join.
+--
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+for i = 1, 5 do box.space.test:insert{i} end
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+
+fio = require('fio')
+ | ---
+ | ...
+fio.unlink(fio.pathjoin(box.cfg.memtx_dir, string.format('%020d.snap', box.info.signature)))
+ | ---
+ | - true
+ | ...
+
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('switch replica')
+ | ---
+ | - true
+ | ...
+
+box.space.test:select()
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('cleanup server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+test_run:cleanup_cluster()
+ | ---
+ | ...
+
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
diff --git a/test/replication/join_without_snap.test.lua b/test/replication/join_without_snap.test.lua
new file mode 100644
index 00000000..6a23d741
--- /dev/null
+++ b/test/replication/join_without_snap.test.lua
@@ -0,0 +1,32 @@
+test_run = require('test_run').new()
+
+--
+-- gh-1271: check that replica join works off the current read view,
+-- not the last checkpoint. To do that, delete the last snapshot file
+-- and check that a replica can still join.
+--
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+for i = 1, 5 do box.space.test:insert{i} end
+box.snapshot()
+
+fio = require('fio')
+fio.unlink(fio.pathjoin(box.cfg.memtx_dir, string.format('%020d.snap', box.info.signature)))
+
+box.schema.user.grant('guest', 'replication')
+
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+test_run:cmd('start server replica')
+test_run:cmd('switch replica')
+
+box.space.test:select()
+
+test_run:cmd('switch default')
+test_run:cmd('stop server replica')
+test_run:cmd('cleanup server replica')
+test_run:cmd('delete server replica')
+test_run:cleanup_cluster()
+
+box.schema.user.revoke('guest', 'replication')
+box.space.test:drop()
+box.snapshot()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 91e884ec..eb25077d 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -10,6 +10,7 @@
"force_recovery.test.lua": {},
"on_schema_init.test.lua": {},
"long_row_timeout.test.lua": {},
+ "join_without_snap.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index e8795143..2635da26 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1116,7 +1116,7 @@ box.snapshot()
box.schema.user.grant('guest', 'replication')
---
...
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', true)
+errinj.set('ERRINJ_VY_READ_PAGE', true)
---
- ok
...
@@ -1136,7 +1136,7 @@ test_run:cmd("delete server replica")
---
- true
...
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', false)
+errinj.set('ERRINJ_VY_READ_PAGE', false)
---
- ok
...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 034ed34c..4230cfae 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -404,12 +404,12 @@ _ = s:create_index('pk')
s:replace{1, 2, 3}
box.snapshot()
box.schema.user.grant('guest', 'replication')
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', true)
+errinj.set('ERRINJ_VY_READ_PAGE', true)
test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
test_run:cmd("start server replica with crash_expected=True")
test_run:cmd("cleanup server replica")
test_run:cmd("delete server replica")
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', false)
+errinj.set('ERRINJ_VY_READ_PAGE', false)
box.schema.user.revoke('guest', 'replication')
s:drop()
diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
index cddc9c3b..60283281 100644
--- a/test/xlog/panic_on_broken_lsn.result
+++ b/test/xlog/panic_on_broken_lsn.result
@@ -123,16 +123,33 @@ box.space.test:auto_increment{'v0'}
---
- [1, 'v0']
...
-lsn = box.info.vclock[1]
+-- Inject a broken LSN in the final join stage.
+lsn = -1
---
...
-box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true)
---
- ok
...
-box.space.test:auto_increment{'v1'}
+fiber = require('fiber')
---
-- [2, 'v1']
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+_ = fiber.create(function()
+ test_run:wait_cond(function() return box.space._cluster:get(2) ~= nil end)
+ lsn = box.info.vclock[1]
+ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+ box.space.test:auto_increment{'v1'}
+ box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
+end);
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
...
test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
---
@@ -142,12 +159,6 @@ test_run:cmd('start server replica with crash_expected=True')
---
- false
...
-fiber = require('fiber')
----
-...
-while box.info.replication[2] == nil do fiber.sleep(0.001) end
----
-...
box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
---
- ok
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua
index cdf287a1..ca304345 100644
--- a/test/xlog/panic_on_broken_lsn.test.lua
+++ b/test/xlog/panic_on_broken_lsn.test.lua
@@ -57,14 +57,24 @@ box.schema.user.grant('guest', 'replication')
_ = box.schema.space.create('test', {id = 9000})
_ = box.space.test:create_index('pk')
box.space.test:auto_increment{'v0'}
-lsn = box.info.vclock[1]
-box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
-box.space.test:auto_increment{'v1'}
+
+-- Inject a broken LSN in the final join stage.
+lsn = -1
+box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true)
+
+fiber = require('fiber')
+test_run:cmd("setopt delimiter ';'")
+_ = fiber.create(function()
+ test_run:wait_cond(function() return box.space._cluster:get(2) ~= nil end)
+ lsn = box.info.vclock[1]
+ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+ box.space.test:auto_increment{'v1'}
+ box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
+end);
+test_run:cmd("setopt delimiter ''");
test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
test_run:cmd('start server replica with crash_expected=True')
-fiber = require('fiber')
-while box.info.replication[2] == nil do fiber.sleep(0.001) end
box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
-- Check that log contains the mention of broken LSN and the request printout
--
2.20.1
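
The loop in relay_space_send() above relies on a small contract: next()
returns 0 on success and sets the data pointer to NULL at end of stream,
and returns nonzero on error. Below is a minimal, self-contained sketch of
that contract, not Tarantool code. The names demo_iterator, demo_next,
demo_drain and the fail_at field are hypothetical and exist only for
illustration; rows are plain C strings rather than msgpack.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for struct snapshot_iterator. */
struct demo_iterator {
    /* Returns 0 on success (*data is NULL at end of stream), -1 on error. */
    int (*next)(struct demo_iterator *it, const char **data, uint32_t *size);
    const char **rows;  /* rows to yield, in order */
    size_t count;       /* number of rows */
    size_t pos;         /* index of the next row to yield */
    size_t fail_at;     /* position at which next() fails; SIZE_MAX = never */
};

static int
demo_next(struct demo_iterator *it, const char **data, uint32_t *size)
{
    if (it->pos == it->fail_at)
        return -1;          /* simulated read error */
    if (it->pos == it->count) {
        *data = NULL;       /* end of stream */
        return 0;
    }
    *data = it->rows[it->pos++];
    *size = (uint32_t)strlen(*data);
    return 0;
}

/*
 * Same loop shape as relay_space_send(): consume rows until the
 * iterator reports end of stream or an error. Returns the number
 * of rows consumed, or -1 if the iterator failed.
 */
static int
demo_drain(struct demo_iterator *it, size_t *total_bytes)
{
    int rc;
    int sent = 0;
    uint32_t size;
    const char *data;

    *total_bytes = 0;
    while ((rc = it->next(it, &data, &size)) == 0 && data != NULL) {
        *total_bytes += size;
        sent++;
    }
    return rc != 0 ? -1 : sent;
}
```

The point of separating "rc != 0" from "data == NULL" is that a failed
read is never mistaken for a clean end of stream, so the caller can raise
the error instead of silently sending a truncated read view.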
* [tarantool-patches] Re: [PATCH 01/13] vinyl: embed engine in vy_env
2019-08-10 10:03 ` [PATCH 01/13] vinyl: embed engine in vy_env Vladimir Davydov
@ 2019-08-12 22:14 ` Konstantin Osipov
2019-08-14 13:09 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:14 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> There's no point in having vinyl_engine and vinyl_index wrapper structs
> to bind vy_env and vy_lsm to struct engine and index. Instead we can
> simply embed engine and index in vy_env and vy_lsm. This will simplify
> further development, e.g. this will allow us to move reference counting
> from vy_lsm up to struct index so that it can be used in the generic
> code.
lgtm
--
Konstantin Osipov, Moscow, Russia
* [tarantool-patches] Re: [PATCH 02/13] vinyl: embed index in vy_lsm
2019-08-10 10:03 ` [PATCH 02/13] vinyl: embed index in vy_lsm Vladimir Davydov
@ 2019-08-12 22:14 ` Konstantin Osipov
2019-08-14 13:09 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:14 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> There's no point in having vinyl_engine and vinyl_index wrapper structs
> to bind vy_env and vy_lsm to struct engine and index. Instead we can
> simply embed engine and index in vy_env and vy_lsm. This will simplify
> further development, e.g. this will allow us to move reference counting
> from vy_lsm up to struct index so that it can be used in the generic
> code.
lgtm
--
Konstantin Osipov, Moscow, Russia
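
For readers unfamiliar with the embedding idiom discussed in patches 01
and 02, here is a minimal sketch of the general technique: the generic
object is a member of the specific one, and the specific object is
recovered from a pointer to the member. This is an illustration only; the
demo_engine and demo_vy_env types and the demo_container_of macro are
hypothetical and are not the definitions used in the patches.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the generic struct engine. */
struct demo_engine {
    const char *name;
};

/* Hypothetical stand-in for vy_env with the engine embedded in it. */
struct demo_vy_env {
    struct demo_engine base;
    int run_count;
};

/* Recover the enclosing object from a pointer to its embedded member. */
#define demo_container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Downcast from the generic engine to the environment that embeds it. */
static struct demo_vy_env *
demo_vy_env(struct demo_engine *engine)
{
    assert(engine != NULL);
    return demo_container_of(engine, struct demo_vy_env, base);
}
```

With this layout there is a single allocation and no separate wrapper
object to keep in sync, which is what makes it possible to move shared
bookkeeping into the generic struct.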
* [tarantool-patches] Re: [PATCH 03/13] vinyl: move reference counting from vy_lsm to index
2019-08-10 10:03 ` [PATCH 03/13] vinyl: move reference counting from vy_lsm to index Vladimir Davydov
@ 2019-08-12 22:16 ` Konstantin Osipov
2019-08-14 13:09 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:16 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> Now, as vy_lsm and index are basically the same object, we can implement
> reference counting right in struct index. This will allow us to prevent
> an index from destruction when a space object it belongs to is freed
> anywhere in the code, not just in vinyl.
OK, you won. Let's reference count it!
LGTM.
--
Konstantin Osipov, Moscow, Russia
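
As a rough illustration of the reference counting discussed in patch 03,
the sketch below shows an object that survives until its last reference
is dropped. It is not the code from the patch: demo_index, demo_index_ref
and demo_index_unref are hypothetical names, and the destroyed flag is a
test-only hook added so the moment of destruction can be observed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for a reference-counted struct index. */
struct demo_index {
    int refs;          /* number of owners; the object dies at zero */
    bool *destroyed;   /* test-only hook: set to true on destruction */
};

/* Create an index holding one reference, owned by the caller. */
static struct demo_index *
demo_index_new(bool *destroyed)
{
    struct demo_index *index = malloc(sizeof(*index));
    if (index == NULL)
        return NULL;
    index->refs = 1;
    index->destroyed = destroyed;
    *destroyed = false;
    return index;
}

/* Take an additional reference. */
static void
demo_index_ref(struct demo_index *index)
{
    assert(index->refs > 0);
    index->refs++;
}

/* Drop a reference; free the index when the last one goes away. */
static void
demo_index_unref(struct demo_index *index)
{
    assert(index->refs > 0);
    if (--index->refs == 0) {
        *index->destroyed = true;
        free(index);
    }
}
```

In this sketch, freeing the owning space would amount to one
demo_index_unref() call; any other holder that took a reference
beforehand keeps the index alive until it drops its own.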
* [tarantool-patches] Re: [PATCH 06/13] memtx: don't store pointers to index internals in iterator
2019-08-10 10:03 ` [PATCH 06/13] memtx: don't store pointers to index internals in iterator Vladimir Davydov
@ 2019-08-12 22:21 ` Konstantin Osipov
2019-08-14 13:10 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:21 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> It's pointless as we can always access the index via iterator->index.
> ---
> src/box/memtx_bitset.c | 10 ++-----
> src/box/memtx_hash.c | 20 ++++++-------
> src/box/memtx_tree.c | 67 +++++++++++++++++++++++-------------------
> 3 files changed, 49 insertions(+), 48 deletions(-)
lgtm
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 34+ messages in thread
* [tarantool-patches] Re: [PATCH 07/13] memtx: use ref counting to pin indexes for snapshot
2019-08-10 10:03 ` [PATCH 07/13] memtx: use ref counting to pin indexes for snapshot Vladimir Davydov
@ 2019-08-12 22:24 ` Konstantin Osipov
2019-08-13 10:56 ` Vladimir Davydov
0 siblings, 1 reply; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:24 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> Currently, to prevent an index from going away while it is being
> written to a snapshot, we postpone memtx_gc_task's free() invocation
> until checkpointing is complete, see commit 94de0a081b3a ("Don't take
> schema lock for checkpointing"). This works fine, but makes it rather
> difficult to reuse snapshot iterators for other purposes, e.g. feeding
> a consistent read view to a newly joined replica.
>
> Let's instead use index reference counting for pinning indexes for
> checkpointing. A reference is taken in a snapshot iterator constructor
> and released when the snapshot iterator is destroyed.
I don't see how this can work in general: a memtx index cannot live
without its memtx space and tuple format. These are not referenced
from the index object.
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 34+ messages in thread
* [tarantool-patches] Re: [PATCH 08/13] memtx: allow snapshot iterator to fail
2019-08-10 10:03 ` [PATCH 08/13] memtx: allow snapshot iterator to fail Vladimir Davydov
@ 2019-08-12 22:25 ` Konstantin Osipov
2019-08-14 13:10 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:25 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> Memtx iterators never fail, that's why the snapshot iterator interface
> doesn't support failures. However, once we introduce snapshot iterator
> support for vinyl, we will need a way to handle errors in the API.
lgtm
--
Konstantin Osipov, Moscow, Russia
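A minimal sketch of a snapshot iterator interface that can report
failure, assuming next() returns 0 on success and -1 on error, and
signals the end of the data by setting *data to NULL. The struct and
function names are hypothetical; the array-backed iterator exists only
to exercise the error path.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical interface: next() may fail, unlike a pointer-returning one. */
struct sketch_snapshot_iterator {
	int (*next)(struct sketch_snapshot_iterator *it,
		    const char **data, uint32_t *size);
};

/* Array-backed iterator that fails when it reaches position fail_at. */
struct sketch_array_iterator {
	struct sketch_snapshot_iterator base;
	const char **rows;
	size_t count;
	size_t pos;
	size_t fail_at;  /* SIZE_MAX means "never fail" */
};

int
sketch_array_next(struct sketch_snapshot_iterator *base,
		  const char **data, uint32_t *size)
{
	struct sketch_array_iterator *it =
		(struct sketch_array_iterator *)base;
	if (it->pos == it->fail_at)
		return -1;               /* simulated read error */
	if (it->pos >= it->count) {
		*data = NULL;            /* end of data, not an error */
		*size = 0;
		return 0;
	}
	*data = it->rows[it->pos++];
	*size = (uint32_t)strlen(*data);
	return 0;
}

/* Consume the iterator; stop and report -1 as soon as next() fails. */
int
sketch_dump(struct sketch_snapshot_iterator *it, size_t *n_rows)
{
	const char *data;
	uint32_t size;
	*n_rows = 0;
	for (;;) {
		if (it->next(it, &data, &size) != 0)
			return -1;
		if (data == NULL)
			return 0;
		(*n_rows)++;
	}
}
```

Separating the return code from the data pointer lets the caller tell
"no more rows" apart from "the read failed".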
^ permalink raw reply [flat|nested] 34+ messages in thread
* [tarantool-patches] Re: [PATCH 09/13] memtx: enter small delayed free mode from snapshot iterator
2019-08-10 10:03 ` [PATCH 09/13] memtx: enter small delayed free mode from snapshot iterator Vladimir Davydov
@ 2019-08-12 22:27 ` Konstantin Osipov
2019-08-13 10:59 ` Vladimir Davydov
0 siblings, 1 reply; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:27 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> We must enable SMALL_DELAYED_FREE_MODE to safely use a memtx snapshot
> iterator. Currently, we do that in checkpoint related callbacks, but if
> we want to reuse snapshot iterators for other purposes, e.g. feeding
> a read view to a newly joined replica, we'd better hide this code behind
> snapshot iterator constructors.
This is not enough: you may have multiple replicas joining, and
you need multiple checkpoint support for that.
Currently, delayed free mode supports only one active checkpoint.
> +void
> +memtx_enter_delayed_free_mode(struct memtx_engine *memtx)
> +{
> + memtx->snapshot_version++;
> + if (memtx->delayed_free_mode++ == 0)
> + small_alloc_setopt(&memtx->alloc, SMALL_DELAYED_FREE_MODE, true);
> +}
With just a counter, memory may never be freed if the counter
never drops below 1.
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 34+ messages in thread
* [tarantool-patches] Re: [PATCH 10/13] wal: make wal_sync fail on write error
2019-08-10 10:03 ` [PATCH 10/13] wal: make wal_sync fail on write error Vladimir Davydov
@ 2019-08-12 22:29 ` Konstantin Osipov
2019-08-14 16:48 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:29 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> wal_sync() simply flushes the tx<->wal request queue, it doesn't
> guarantee that all pending writes are successfully committed to disk.
> This works for now, but in order to implement replica join off the
> current read view, we need to make sure that all pending writes have
> been persisted and won't be rolled back before we can use memtx
> snapshot iterators. So this patch adds a return code to wal_sync():
> from now on it returns -1 if rollback is in progress and hence
> some in-memory changes are going to be rolled back. We will use
> this method after opening memtx snapshot iterators used for feeding
> a consistent read view to a newly joined replica so as to ensure that
> changes frozen by the iterators have made it to the disk.
Please add a test case, otherwise LGTM.
--
Konstantin Osipov, Moscow, Russia
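A minimal sketch of the ordering described in the commit message above:
open the read view first, then sync the WAL, and give up on the join if
the sync reports a rollback. The types and functions are hypothetical
stand-ins, and the in_rollback flag replaces whatever state the real WAL
writer keeps.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical WAL state: only the rollback flag matters here. */
struct sketch_wal {
	bool in_rollback;
};

/* Returns -1 if some in-memory changes are about to be rolled back. */
int
sketch_wal_sync(const struct sketch_wal *wal)
{
	return wal->in_rollback ? -1 : 0;
}

/*
 * Open the read view, then make sure everything it has frozen is on disk.
 * On failure the read view is closed again and the join is aborted.
 */
int
sketch_join(const struct sketch_wal *wal, bool *read_view_open)
{
	*read_view_open = true;          /* stands for opening the iterators */
	if (sketch_wal_sync(wal) != 0) {
		*read_view_open = false; /* frozen data may be rolled back */
		return -1;
	}
	return 0;                        /* safe to relay the read view */
}
```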
^ permalink raw reply [flat|nested] 34+ messages in thread
* [tarantool-patches] Re: [PATCH 11/13] xrow: factor out helper for setting REPLACE request body
2019-08-10 10:03 ` [PATCH 11/13] xrow: factor out helper for setting REPLACE request body Vladimir Davydov
@ 2019-08-12 22:29 ` Konstantin Osipov
2019-08-14 13:11 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:29 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> We will reuse it to relay a snapshot to a newly joined replica.
lgtm
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 34+ messages in thread
* [tarantool-patches] Re: [PATCH 12/13] test: disable replication/on_schema_init
2019-08-10 10:03 ` [PATCH 12/13] test: disable replication/on_schema_init Vladimir Davydov
@ 2019-08-12 22:31 ` Konstantin Osipov
0 siblings, 0 replies; 34+ messages in thread
From: Konstantin Osipov @ 2019-08-12 22:31 UTC (permalink / raw)
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> The test uses box.on_schema_init to install space.before_replace trigger
> that changes the engine/locality of a space received by a replica. This
> works only because we don't make a snapshot after creating those spaces
> on the master so that they are relayed from an xlog. If we added
> box.snapshot(), the test would fail, because space.before_replace
> trigger isn't run for changes received on initial join (see #4417).
> Once we make the initial join stage work off the current read view
> rather than the last snapshot (see #1271), the test will fail as well.
> Let's disable the test until the issue is resolved.
Ugh.
--
Konstantin Osipov, Moscow, Russia
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 07/13] memtx: use ref counting to pin indexes for snapshot
2019-08-12 22:24 ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-13 10:56 ` Vladimir Davydov
2019-08-13 16:08 ` Georgy Kirichenko
0 siblings, 1 reply; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-13 10:56 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
On Tue, Aug 13, 2019 at 01:24:20AM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> > Currently, to prevent an index from going away while it is being
> > written to a snapshot, we postpone memtx_gc_task's free() invocation
> > until checkpointing is complete, see commit 94de0a081b3a ("Don't take
> > schema lock for checkpointing"). This works fine, but makes it rather
> > difficult to reuse snapshot iterators for other purposes, e.g. feeding
> > a consistent read view to a newly joined replica.
> >
> > Let's instead use index reference counting for pinning indexes for
> > checkpointing. A reference is taken in a snapshot iterator constructor
> > and released when the snapshot iterator is destroyed.
>
> I don't see how this can work in general: a memtx index cannot live
> without its memtx space and tuple format. These are not referenced
> from the index object.
A memtx index doesn't need to reference a space - we move indexes from
space to space on alter without any problems. Regarding the tuple
format, it is referenced by tuples anyway while tuples are referenced by
the primary index so we don't need to reference it explicitly.
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 09/13] memtx: enter small delayed free mode from snapshot iterator
2019-08-12 22:27 ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-13 10:59 ` Vladimir Davydov
0 siblings, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-13 10:59 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
On Tue, Aug 13, 2019 at 01:27:49AM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> > We must enable SMALL_DELAYED_FREE_MODE to safely use a memtx snapshot
> > iterator. Currently, we do that in checkpoint related callbacks, but if
> > we want to reuse snapshot iterators for other purposes, e.g. feeding
> > a read view to a newly joined replica, we'd better hide this code behind
> > snapshot iterator constructors.
>
> This is not enough: you may have multiple replicas joining, and
> you need multiple checkpoint support for that.
>
> Currently, delayed free mode supports only one active checkpoint.
Yeah, sure. If replicas keep joining, deleted tuples won't be freed.
That's a pitfall. The right way to fix it is to patch the small
allocator to support generations, but it's going to take a while.
Since the number of replicas is limited and joining a new replica is
a rare event, I'd prefer to commit this for now and then look into
implementation of generational garbage collection in the small
allocator.
>
> > +void
> > +memtx_enter_delayed_free_mode(struct memtx_engine *memtx)
> > +{
> > + memtx->snapshot_version++;
> > + if (memtx->delayed_free_mode++ == 0)
> > + small_alloc_setopt(&memtx->alloc, SMALL_DELAYED_FREE_MODE, true);
> > +}
>
> With just a counter, memory may never be freed if the counter
> never drops below 1.
Well, it will free memory allocated after the counter was incremented
(that's what snapshot_version is about), but in general you're right -
we need to deal with it somehow.
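A minimal sketch of the behaviour discussed above, assuming a tuple
records the snapshot version current at allocation time and is released
immediately when no read view is open or when it was allocated after the
latest version bump. Only the two counters mirror the quoted patch;
the structs, the helper names and the freed_now/freed_later bookkeeping
are hypothetical and do not model the small allocator itself.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical engine state. */
struct sketch_engine {
	uint32_t snapshot_version;  /* bumped each time a read view opens */
	int delayed_free_mode;      /* number of open snapshot iterators */
	int freed_now;              /* tuples released immediately */
	int freed_later;            /* tuples parked until read views close */
};

struct sketch_tuple {
	uint32_t version;           /* snapshot_version at allocation time */
};

void
sketch_enter_delayed_free_mode(struct sketch_engine *e)
{
	e->snapshot_version++;
	e->delayed_free_mode++;
}

void
sketch_leave_delayed_free_mode(struct sketch_engine *e)
{
	assert(e->delayed_free_mode > 0);
	e->delayed_free_mode--;
}

void
sketch_tuple_new(const struct sketch_engine *e, struct sketch_tuple *t)
{
	t->version = e->snapshot_version;
}

void
sketch_tuple_delete(struct sketch_engine *e, const struct sketch_tuple *t)
{
	/* A tuple newer than every open read view cannot be seen by any. */
	if (e->delayed_free_mode == 0 || t->version == e->snapshot_version)
		e->freed_now++;
	else
		e->freed_later++;
}
```

If a second join starts before the first one finishes, the counter stays
at 1 after the first leave, so tuples parked for the first read view
remain parked; that is the pitfall raised in the review.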
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 07/13] memtx: use ref counting to pin indexes for snapshot
2019-08-13 10:56 ` Vladimir Davydov
@ 2019-08-13 16:08 ` Georgy Kirichenko
0 siblings, 0 replies; 34+ messages in thread
From: Georgy Kirichenko @ 2019-08-13 16:08 UTC (permalink / raw)
To: tarantool-patches; +Cc: Vladimir Davydov, Konstantin Osipov
On Tuesday, August 13, 2019 1:56:29 PM MSK Vladimir Davydov wrote:
> On Tue, Aug 13, 2019 at 01:24:20AM +0300, Konstantin Osipov wrote:
> > * Vladimir Davydov <vdavydov.dev@gmail.com> [19/08/10 23:22]:
> > > Currently, to prevent an index from going away while it is being
> > > written to a snapshot, we postpone memtx_gc_task's free() invocation
> > > until checkpointing is complete, see commit 94de0a081b3a ("Don't take
> > > schema lock for checkpointing"). This works fine, but makes it rather
> > > difficult to reuse snapshot iterators for other purposes, e.g. feeding
> > > a consistent read view to a newly joined replica.
> > >
> > > Let's instead use index reference counting for pinning indexes for
> > > checkpointing. A reference is taken in a snapshot iterator constructor
> > > and released when the snapshot iterator is destroyed.
> >
> > I don't see how this can work in general: a memtx index cannot live
> > without its memtx space and tuple format. These are not referenced
> > from the index object.
>
> A memtx index doesn't need to reference a space - we move indexes from
> space to space on alter without any problems. Regarding the tuple
> format, it is referenced by tuples anyway while tuples are referenced by
> the primary index so we don't need to reference it explicitly.
I believe we should bind all index internal structures and the corresponding
tuple together, as well as other schema members (spaces, functions, sequences).
That would allow us to get rid of the crappy alter.cc file and would simplify
object lifetimes around the transaction manager (which we do not have yet).
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [PATCH 01/13] vinyl: embed engine in vy_env
2019-08-10 10:03 ` [PATCH 01/13] vinyl: embed engine in vy_env Vladimir Davydov
2019-08-12 22:14 ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-14 13:09 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-14 13:09 UTC (permalink / raw)
To: tarantool-patches
Pushed to master.
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [PATCH 02/13] vinyl: embed index in vy_lsm
2019-08-10 10:03 ` [PATCH 02/13] vinyl: embed index in vy_lsm Vladimir Davydov
2019-08-12 22:14 ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-14 13:09 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-14 13:09 UTC (permalink / raw)
To: tarantool-patches
Pushed to master.
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [PATCH 03/13] vinyl: move reference counting from vy_lsm to index
2019-08-10 10:03 ` [PATCH 03/13] vinyl: move reference counting from vy_lsm to index Vladimir Davydov
2019-08-12 22:16 ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-14 13:09 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-14 13:09 UTC (permalink / raw)
To: tarantool-patches
Pushed to master.
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [PATCH 06/13] memtx: don't store pointers to index internals in iterator
2019-08-10 10:03 ` [PATCH 06/13] memtx: don't store pointers to index internals in iterator Vladimir Davydov
2019-08-12 22:21 ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-14 13:10 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-14 13:10 UTC (permalink / raw)
To: tarantool-patches
Pushed to master.
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [PATCH 08/13] memtx: allow snapshot iterator to fail
2019-08-10 10:03 ` [PATCH 08/13] memtx: allow snapshot iterator to fail Vladimir Davydov
2019-08-12 22:25 ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-14 13:10 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-14 13:10 UTC (permalink / raw)
To: tarantool-patches
Pushed to master.
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [PATCH 11/13] xrow: factor out helper for setting REPLACE request body
2019-08-10 10:03 ` [PATCH 11/13] xrow: factor out helper for setting REPLACE request body Vladimir Davydov
2019-08-12 22:29 ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-14 13:11 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-14 13:11 UTC (permalink / raw)
To: tarantool-patches
Pushed to master.
^ permalink raw reply [flat|nested] 34+ messages in thread
* Re: [PATCH 10/13] wal: make wal_sync fail on write error
2019-08-10 10:03 ` [PATCH 10/13] wal: make wal_sync fail on write error Vladimir Davydov
2019-08-12 22:29 ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-14 16:48 ` Vladimir Davydov
1 sibling, 0 replies; 34+ messages in thread
From: Vladimir Davydov @ 2019-08-14 16:48 UTC (permalink / raw)
To: tarantool-patches
Pushed to master after adding a test case:
diff --git a/src/box/wal.c b/src/box/wal.c
index 267cafed..9219d677 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -540,6 +540,11 @@ wal_sync_f(struct cbus_call_msg *msg)
int
wal_sync(void)
{
+ ERROR_INJECT(ERRINJ_WAL_SYNC, {
+ diag_set(ClientError, ER_INJECTION, "wal sync");
+ return -1;
+ });
+
struct wal_writer *writer = &wal_writer_singleton;
if (writer->wal_mode == WAL_NONE)
return 0;
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index fe9c2237..e75a620d 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -73,6 +73,7 @@ struct errinj {
#define ERRINJ_LIST(_) \
_(ERRINJ_TESTING, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_WAL_IO, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_WAL_SYNC, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_WAL_ROTATE, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_WAL_WRITE, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_WAL_WRITE_PARTIAL, ERRINJ_INT, {.iparam = -1}) \
diff --git a/test/box/errinj.result b/test/box/errinj.result
index af2f8877..5784758d 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -20,25 +20,27 @@ errinj.info()
state: false
ERRINJ_VYRUN_DATA_READ:
state: false
+ ERRINJ_SWIM_FD_ONLY:
+ state: false
ERRINJ_SQL_NAME_NORMALIZATION:
state: false
ERRINJ_VY_SCHED_TIMEOUT:
state: 0
- ERRINJ_SWIM_FD_ONLY:
- state: false
ERRINJ_COIO_SENDFILE_CHUNK:
state: -1
+ ERRINJ_VY_LOG_FILE_RENAME:
+ state: false
ERRINJ_WAL_WRITE_PARTIAL:
state: -1
ERRINJ_VY_GC:
state: false
ERRINJ_WAL_DELAY:
state: false
- ERRINJ_VY_INDEX_DUMP:
- state: -1
+ ERRINJ_INDEX_ALLOC:
+ state: false
ERRINJ_WAL_WRITE_EOF:
state: false
- ERRINJ_VY_LOG_FILE_RENAME:
+ ERRINJ_WAL_SYNC:
state: false
ERRINJ_VYRUN_INDEX_GARBAGE:
state: false
@@ -122,20 +124,20 @@ errinj.info()
state: false
ERRINJ_XLOG_GARBAGE:
state: false
- ERRINJ_INDEX_ALLOC:
- state: false
+ ERRINJ_VY_INDEX_DUMP:
+ state: -1
ERRINJ_VY_READ_PAGE_DELAY:
state: false
ERRINJ_TESTING:
state: false
- ERRINJ_RELAY_TIMEOUT:
- state: 0
+ ERRINJ_RELAY_SEND_DELAY:
+ state: false
ERRINJ_VY_SQUASH_TIMEOUT:
state: 0
ERRINJ_VY_LOG_FLUSH:
state: false
- ERRINJ_RELAY_SEND_DELAY:
- state: false
+ ERRINJ_RELAY_TIMEOUT:
+ state: 0
...
errinj.set("some-injection", true)
---
diff --git a/test/vinyl/errinj_ddl.result b/test/vinyl/errinj_ddl.result
index 8fe9064b..deebda89 100644
--- a/test/vinyl/errinj_ddl.result
+++ b/test/vinyl/errinj_ddl.result
@@ -689,3 +689,59 @@ test_run:cmd("setopt delimiter ''");
box.cfg{vinyl_cache = default_vinyl_cache}
---
...
+--
+-- Check that DDL fails if it fails to flush pending WAL writes.
+-- Done in the scope of gh-1271.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('primary')
+---
+...
+s:replace{1, 2}
+---
+- [1, 2]
+...
+box.error.injection.set('ERRINJ_WAL_SYNC', true)
+---
+- ok
+...
+s:format({{'a', 'unsigned'}, {'b', 'unsigned'}}) -- ok
+---
+...
+_ = s:create_index('secondary', {parts = {2, 'unsigned'}}) -- ok
+---
+...
+s:format({})
+---
+...
+s.index.secondary:drop()
+---
+...
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+---
+- ok
+...
+_ = fiber.create(function() s:replace{3, 4} end)
+---
+...
+s:format({{'a', 'unsigned'}, {'b', 'unsigned'}}) -- error
+---
+- error: Error injection 'wal sync'
+...
+_ = s:create_index('secondary', {parts = {2, 'unsigned'}}) -- error
+---
+- error: Error injection 'wal sync'
+...
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+---
+- ok
+...
+box.error.injection.set('ERRINJ_WAL_SYNC', false)
+---
+- ok
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/errinj_ddl.test.lua b/test/vinyl/errinj_ddl.test.lua
index 3bc8bed8..9039d357 100644
--- a/test/vinyl/errinj_ddl.test.lua
+++ b/test/vinyl/errinj_ddl.test.lua
@@ -300,3 +300,29 @@ box.error.injection.set('ERRINJ_VY_READ_PAGE_TIMEOUT', 0);
test_run:cmd("setopt delimiter ''");
box.cfg{vinyl_cache = default_vinyl_cache}
+
+--
+-- Check that DDL fails if it fails to flush pending WAL writes.
+-- Done in the scope of gh-1271.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('primary')
+s:replace{1, 2}
+
+box.error.injection.set('ERRINJ_WAL_SYNC', true)
+
+s:format({{'a', 'unsigned'}, {'b', 'unsigned'}}) -- ok
+_ = s:create_index('secondary', {parts = {2, 'unsigned'}}) -- ok
+
+s:format({})
+s.index.secondary:drop()
+
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+_ = fiber.create(function() s:replace{3, 4} end)
+
+s:format({{'a', 'unsigned'}, {'b', 'unsigned'}}) -- error
+_ = s:create_index('secondary', {parts = {2, 'unsigned'}}) -- error
+
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+box.error.injection.set('ERRINJ_WAL_SYNC', false)
+s:drop()
^ permalink raw reply [flat|nested] 34+ messages in thread
Thread overview: 34+ messages
2019-08-10 10:03 [PATCH 00/13] Join replicas off the current read view Vladimir Davydov
2019-08-10 10:03 ` [PATCH 01/13] vinyl: embed engine in vy_env Vladimir Davydov
2019-08-12 22:14 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:09 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 02/13] vinyl: embed index in vy_lsm Vladimir Davydov
2019-08-12 22:14 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:09 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 03/13] vinyl: move reference counting from vy_lsm to index Vladimir Davydov
2019-08-12 22:16 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:09 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 04/13] vinyl: don't pin index for iterator lifetime Vladimir Davydov
2019-08-10 10:03 ` [PATCH 05/13] vinyl: don't exempt dropped indexes from dump and compaction Vladimir Davydov
2019-08-10 10:03 ` [PATCH 06/13] memtx: don't store pointers to index internals in iterator Vladimir Davydov
2019-08-12 22:21 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:10 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 07/13] memtx: use ref counting to pin indexes for snapshot Vladimir Davydov
2019-08-12 22:24 ` [tarantool-patches] " Konstantin Osipov
2019-08-13 10:56 ` Vladimir Davydov
2019-08-13 16:08 ` Georgy Kirichenko
2019-08-10 10:03 ` [PATCH 08/13] memtx: allow snapshot iterator to fail Vladimir Davydov
2019-08-12 22:25 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:10 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 09/13] memtx: enter small delayed free mode from snapshot iterator Vladimir Davydov
2019-08-12 22:27 ` [tarantool-patches] " Konstantin Osipov
2019-08-13 10:59 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 10/13] wal: make wal_sync fail on write error Vladimir Davydov
2019-08-12 22:29 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 16:48 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 11/13] xrow: factor out helper for setting REPLACE request body Vladimir Davydov
2019-08-12 22:29 ` [tarantool-patches] " Konstantin Osipov
2019-08-14 13:11 ` Vladimir Davydov
2019-08-10 10:03 ` [PATCH 12/13] test: disable replication/on_schema_init Vladimir Davydov
2019-08-12 22:31 ` [tarantool-patches] " Konstantin Osipov
2019-08-10 10:03 ` [PATCH 13/13] relay: join new replicas off read view Vladimir Davydov