[PATCH 13/13] relay: join new replicas off read view
Vladimir Davydov
vdavydov.dev at gmail.com
Sat Aug 10 13:03:40 MSK 2019
Historically, we join a new replica off the last checkpoint. As a
result, we must always keep the last memtx snapshot and all vinyl data
files corresponding to it. Actually, there's no need to use the last
checkpoint for joining a replica. Instead we can use the current read
view as both memtx and vinyl support it. This should speed up the
process of joining a new replica, because we don't need to replay all
xlogs written after the last checkpoint, only those that are accumulated
while we are relaying the current read view. This should also allow us
to avoid creating a snapshot file on bootstrap, because the only reason
we need it is to allow replicas to join. Besides, this is a step
towards decoupling the vinyl metadata log from checkpointing in
particular and from xlogs in general.
Closes #1271
---
src/box/blackhole.c | 3 +-
src/box/box.cc | 33 +-
src/box/engine.c | 21 --
src/box/engine.h | 27 +-
src/box/memtx_engine.c | 74 ----
src/box/relay.cc | 170 +++++++--
src/box/relay.h | 2 +-
src/box/sysview.c | 3 +-
src/box/vinyl.c | 374 ++++----------------
src/box/vy_run.c | 6 -
src/box/vy_tx.c | 10 +-
src/box/vy_tx.h | 8 +
src/lib/core/errinj.h | 2 +-
test/box/errinj.result | 38 +-
test/replication-py/cluster.result | 13 -
test/replication-py/cluster.test.py | 25 --
test/replication/join_without_snap.result | 88 +++++
test/replication/join_without_snap.test.lua | 32 ++
test/replication/suite.cfg | 1 +
test/vinyl/errinj.result | 4 +-
test/vinyl/errinj.test.lua | 4 +-
test/xlog/panic_on_broken_lsn.result | 31 +-
test/xlog/panic_on_broken_lsn.test.lua | 20 +-
23 files changed, 419 insertions(+), 570 deletions(-)
create mode 100644 test/replication/join_without_snap.result
create mode 100644 test/replication/join_without_snap.test.lua
diff --git a/src/box/blackhole.c b/src/box/blackhole.c
index b69e543a..d2fa82be 100644
--- a/src/box/blackhole.c
+++ b/src/box/blackhole.c
@@ -178,7 +178,6 @@ blackhole_engine_create_space(struct engine *engine, struct space_def *def,
static const struct engine_vtab blackhole_engine_vtab = {
/* .shutdown = */ blackhole_engine_shutdown,
/* .create_space = */ blackhole_engine_create_space,
- /* .join = */ generic_engine_join,
/* .begin = */ generic_engine_begin,
/* .begin_statement = */ generic_engine_begin_statement,
/* .prepare = */ generic_engine_prepare,
@@ -213,6 +212,6 @@ blackhole_engine_new(void)
engine->vtab = &blackhole_engine_vtab;
engine->name = "blackhole";
- engine->flags = ENGINE_BYPASS_TX;
+ engine->flags = ENGINE_BYPASS_TX | ENGINE_EXCLUDE_FROM_SNAPSHOT;
return engine;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 66cd6d3a..95ce0bc1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1467,41 +1467,13 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
"wal_mode = 'none'");
}
- /*
- * The only case when the directory index is empty is
- * when someone has deleted a snapshot and tries to join
- * as a replica. Our best effort is to not crash in such
- * case: raise ER_MISSING_SNAPSHOT.
- */
- struct gc_checkpoint *checkpoint = gc_last_checkpoint();
- if (checkpoint == NULL)
- tnt_raise(ClientError, ER_MISSING_SNAPSHOT);
-
- /* Remember start vclock. */
- struct vclock start_vclock;
- vclock_copy(&start_vclock, &checkpoint->vclock);
-
- /*
- * Make sure the checkpoint files won't be deleted while
- * initial join is in progress.
- */
- struct gc_checkpoint_ref gc;
- gc_ref_checkpoint(checkpoint, &gc, "replica %s",
- tt_uuid_str(&instance_uuid));
- auto gc_guard = make_scoped_guard([&]{ gc_unref_checkpoint(&gc); });
-
- /* Respond to JOIN request with start_vclock. */
- struct xrow_header row;
- xrow_encode_vclock_xc(&row, &start_vclock);
- row.sync = header->sync;
- coio_write_xrow(io, &row);
-
say_info("joining replica %s at %s",
tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
/*
* Initial stream: feed replica with dirty data from engines.
*/
+ struct vclock start_vclock;
relay_initial_join(io->fd, header->sync, &start_vclock);
say_info("initial data sent.");
@@ -1513,6 +1485,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
*/
box_on_join(&instance_uuid);
+ ERROR_INJECT_YIELD(ERRINJ_REPLICA_JOIN_DELAY);
+
/* Remember master's vclock after the last request */
struct vclock stop_vclock;
vclock_copy(&stop_vclock, &replicaset.vclock);
@@ -1530,6 +1504,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
diag_raise();
/* Send end of initial stage data marker */
+ struct xrow_header row;
xrow_encode_vclock_xc(&row, &stop_vclock);
row.sync = header->sync;
coio_write_xrow(io, &row);
diff --git a/src/box/engine.c b/src/box/engine.c
index a52d0ed1..73ff0464 100644
--- a/src/box/engine.c
+++ b/src/box/engine.c
@@ -174,17 +174,6 @@ engine_backup(const struct vclock *vclock, engine_backup_cb cb, void *cb_arg)
return 0;
}
-int
-engine_join(const struct vclock *vclock, struct xstream *stream)
-{
- struct engine *engine;
- engine_foreach(engine) {
- if (engine->vtab->join(engine, vclock, stream) != 0)
- return -1;
- }
- return 0;
-}
-
void
engine_memory_stat(struct engine_memory_stat *stat)
{
@@ -204,16 +193,6 @@ engine_reset_stat(void)
/* {{{ Virtual method stubs */
-int
-generic_engine_join(struct engine *engine, const struct vclock *vclock,
- struct xstream *stream)
-{
- (void)engine;
- (void)vclock;
- (void)stream;
- return 0;
-}
-
int
generic_engine_begin(struct engine *engine, struct txn *txn)
{
diff --git a/src/box/engine.h b/src/box/engine.h
index a302b3bc..c83042b4 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -73,11 +73,6 @@ struct engine_vtab {
/** Allocate a new space instance. */
struct space *(*create_space)(struct engine *engine,
struct space_def *def, struct rlist *key_list);
- /**
- * Write statements stored in checkpoint @vclock to @stream.
- */
- int (*join)(struct engine *engine, const struct vclock *vclock,
- struct xstream *stream);
/**
* Begin a new single or multi-statement transaction.
* Called on first statement in a transaction, not when
@@ -205,6 +200,12 @@ enum {
* transactions w/o throwing ER_CROSS_ENGINE_TRANSACTION.
*/
ENGINE_BYPASS_TX = 1 << 0,
+ /**
+ * This flag is set for virtual engines, such as sysview,
+ * that don't actually store any data. It means that we
+ * must not relay their content to a newly joined replica.
+ */
+ ENGINE_EXCLUDE_FROM_SNAPSHOT = 1 << 1,
};
struct engine {
@@ -330,13 +331,6 @@ engine_begin_final_recovery(void);
int
engine_end_recovery(void);
-/**
- * Feed checkpoint data as join events to the replicas.
- * (called on the master).
- */
-int
-engine_join(const struct vclock *vclock, struct xstream *stream);
-
int
engine_begin_checkpoint(void);
@@ -364,8 +358,6 @@ engine_reset_stat(void);
/*
* Virtual method stubs.
*/
-int generic_engine_join(struct engine *, const struct vclock *,
- struct xstream *);
int generic_engine_begin(struct engine *, struct txn *);
int generic_engine_begin_statement(struct engine *, struct txn *);
int generic_engine_prepare(struct engine *, struct txn *);
@@ -468,13 +460,6 @@ engine_end_recovery_xc(void)
diag_raise();
}
-static inline void
-engine_join_xc(const struct vclock *vclock, struct xstream *stream)
-{
- if (engine_join(vclock, stream) != 0)
- diag_raise();
-}
-
#endif /* defined(__cplusplus) */
#endif /* TARANTOOL_BOX_ENGINE_H_INCLUDED */
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index ea197cad..28c516c2 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -726,79 +726,6 @@ memtx_engine_backup(struct engine *engine, const struct vclock *vclock,
return cb(filename, cb_arg);
}
-/** Used to pass arguments to memtx_initial_join_f */
-struct memtx_join_arg {
- const char *snap_dirname;
- int64_t checkpoint_lsn;
- struct xstream *stream;
-};
-
-/**
- * Invoked from a thread to feed snapshot rows.
- */
-static int
-memtx_initial_join_f(va_list ap)
-{
- struct memtx_join_arg *arg = va_arg(ap, struct memtx_join_arg *);
- const char *snap_dirname = arg->snap_dirname;
- int64_t checkpoint_lsn = arg->checkpoint_lsn;
- struct xstream *stream = arg->stream;
-
- struct xdir dir;
- /*
- * snap_dirname and INSTANCE_UUID don't change after start,
- * safe to use in another thread.
- */
- xdir_create(&dir, snap_dirname, SNAP, &INSTANCE_UUID,
- &xlog_opts_default);
- struct xlog_cursor cursor;
- int rc = xdir_open_cursor(&dir, checkpoint_lsn, &cursor);
- xdir_destroy(&dir);
- if (rc < 0)
- return -1;
-
- struct xrow_header row;
- while ((rc = xlog_cursor_next(&cursor, &row, true)) == 0) {
- rc = xstream_write(stream, &row);
- if (rc < 0)
- break;
- }
- xlog_cursor_close(&cursor, false);
- if (rc < 0)
- return -1;
-
- /**
- * We should never try to read snapshots with no EOF
- * marker - such snapshots are very likely corrupted and
- * should not be trusted.
- */
- /* TODO: replace panic with diag_set() */
- if (!xlog_cursor_is_eof(&cursor))
- panic("snapshot `%s' has no EOF marker", cursor.name);
- return 0;
-}
-
-static int
-memtx_engine_join(struct engine *engine, const struct vclock *vclock,
- struct xstream *stream)
-{
- struct memtx_engine *memtx = (struct memtx_engine *)engine;
-
- /*
- * cord_costart() passes only void * pointer as an argument.
- */
- struct memtx_join_arg arg = {
- /* .snap_dirname = */ memtx->snap_dir.dirname,
- /* .checkpoint_lsn = */ vclock_sum(vclock),
- /* .stream = */ stream
- };
-
- /* Send snapshot using a thread */
- struct cord cord;
- cord_costart(&cord, "initial_join", memtx_initial_join_f, &arg);
- return cord_cojoin(&cord);
-}
-
static int
small_stats_noop_cb(const struct mempool_stats *stats, void *cb_ctx)
{
@@ -822,7 +749,6 @@ memtx_engine_memory_stat(struct engine *engine, struct engine_memory_stat *stat)
static const struct engine_vtab memtx_engine_vtab = {
/* .shutdown = */ memtx_engine_shutdown,
/* .create_space = */ memtx_engine_create_space,
- /* .join = */ memtx_engine_join,
/* .begin = */ memtx_engine_begin,
/* .begin_statement = */ generic_engine_begin_statement,
/* .prepare = */ generic_engine_prepare,
diff --git a/src/box/relay.cc b/src/box/relay.cc
index efa3373f..a29a62fa 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -41,11 +41,13 @@
#include "coio.h"
#include "coio_task.h"
-#include "engine.h"
#include "gc.h"
+#include "index.h"
#include "iproto_constants.h"
#include "recovery.h"
#include "replication.h"
+#include "schema.h"
+#include "space.h"
#include "trigger.h"
#include "vclock.h"
#include "version.h"
@@ -168,8 +170,6 @@ relay_last_row_time(const struct relay *relay)
static void
relay_send(struct relay *relay, struct xrow_header *packet);
static void
-relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);
-static void
relay_send_row(struct xstream *stream, struct xrow_header *row);
struct relay *
@@ -283,20 +283,156 @@ relay_set_cord_name(int fd)
cord_set_name(name);
}
+/**
+ * A space to feed to a replica on initial join.
+ */
+struct relay_space {
+ /** Link in the list of spaces to feed. */
+ struct rlist link;
+ /** Space id. */
+ uint32_t space_id;
+ /** Read view iterator. */
+ struct snapshot_iterator *iterator;
+};
+
+/**
+ * Add a space to the list of spaces to feed to a replica
+ * if eligible. We don't need to relay the following kinds
+ * of spaces:
+ *
+ * - Temporary spaces, obviously.
+ * - Spaces that are local to this instance.
+ * - Virtual spaces, such as sysview.
+ * - Spaces that don't have the primary index.
+ */
+static int
+relay_space_add(struct space *sp, void *data)
+{
+ struct rlist *spaces = (struct rlist *)data;
+
+ if (space_is_temporary(sp))
+ return 0;
+ if (space_group_id(sp) == GROUP_LOCAL)
+ return 0;
+ if (sp->engine->flags & ENGINE_EXCLUDE_FROM_SNAPSHOT)
+ return 0;
+ struct index *pk = space_index(sp, 0);
+ if (pk == NULL)
+ return 0;
+
+ struct relay_space *r = (struct relay_space *)malloc(sizeof(*r));
+ if (r == NULL) {
+ diag_set(OutOfMemory, sizeof(*r),
+ "malloc", "struct relay_space");
+ return -1;
+ }
+ r->space_id = space_id(sp);
+ r->iterator = index_create_snapshot_iterator(pk);
+ if (r->iterator == NULL) {
+ free(r);
+ return -1;
+ }
+ rlist_add_tail_entry(spaces, r, link);
+ return 0;
+}
+
+/**
+ * Relay a single space row to a replica.
+ */
+static void
+relay_space_send_row(uint32_t space_id, const char *data, uint32_t size,
+ struct ev_io *io, uint64_t sync)
+{
+ struct request_replace_body body;
+ request_replace_body_create(&body, space_id);
+
+ struct xrow_header row;
+ memset(&row, 0, sizeof(row));
+ row.type = IPROTO_INSERT;
+ row.sync = sync;
+
+ row.bodycnt = 2;
+ row.body[0].iov_base = &body;
+ row.body[0].iov_len = sizeof(body);
+ row.body[1].iov_base = (char *)data;
+ row.body[1].iov_len = size;
+
+ coio_write_xrow(io, &row);
+}
+
+/**
+ * Relay a read view of a space content to a replica.
+ */
+static void
+relay_space_send(struct relay_space *r, struct ev_io *io, uint64_t sync)
+{
+ int rc;
+ struct snapshot_iterator *it = r->iterator;
+
+ uint32_t size;
+ const char *data;
+ while ((rc = it->next(it, &data, &size)) == 0 && data != NULL)
+ relay_space_send_row(r->space_id, data, size, io, sync);
+
+ if (rc != 0)
+ diag_raise();
+}
+
+/**
+ * Close the read view iterator associated with the space
+ * and free the container object.
+ */
+static void
+relay_space_free(struct relay_space *r)
+{
+ rlist_del_entry(r, link);
+ r->iterator->free(r->iterator);
+ free(r);
+}
+
void
relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
{
- struct relay *relay = relay_new(NULL);
- if (relay == NULL)
- diag_raise();
+ struct ev_io io;
+ coio_create(&io, fd);
- relay_start(relay, fd, sync, relay_send_initial_join_row);
- auto relay_guard = make_scoped_guard([=] {
- relay_stop(relay);
- relay_delete(relay);
+ RLIST_HEAD(spaces);
+ auto guard = make_scoped_guard([&spaces] {
+ struct relay_space *r, *next;
+ rlist_foreach_entry_safe(r, &spaces, link, next)
+ relay_space_free(r);
});
- engine_join_xc(vclock, &relay->stream);
+ /*
+ * First, we open read view iterators over spaces that need
+ * to be fed to the replica. Note, we can't yield in the loop,
+ * because otherwise we could get an inconsistent view of the
+ * database.
+ */
+ if (space_foreach(relay_space_add, &spaces) != 0)
+ diag_raise();
+
+ /*
+ * Second, we must sync WAL to make sure that all changes
+ * visible by the iterators are successfully committed.
+ */
+ if (wal_sync() != 0)
+ diag_raise();
+
+ vclock_copy(vclock, &replicaset.vclock);
+
+ /* Respond to JOIN request with the current vclock. */
+ struct xrow_header row;
+ xrow_encode_vclock_xc(&row, vclock);
+ row.sync = sync;
+ coio_write_xrow(&io, &row);
+
+ /* Finally, send the read view to the replica. */
+ struct relay_space *r, *next;
+ rlist_foreach_entry_safe(r, &spaces, link, next) {
+ relay_space_send(r, &io, sync);
+ relay_space_free(r);
+ }
}
int
@@ -697,18 +833,6 @@ relay_send(struct relay *relay, struct xrow_header *packet)
fiber_sleep(inj->dparam);
}
-static void
-relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
-{
- struct relay *relay = container_of(stream, struct relay, stream);
- /*
- * Ignore replica local requests as we don't need to promote
- * vclock while sending a snapshot.
- */
- if (row->group_id != GROUP_LOCAL)
- relay_send(relay, row);
-}
-
/** Send a single row to the client. */
static void
relay_send_row(struct xstream *stream, struct xrow_header *packet)
diff --git a/src/box/relay.h b/src/box/relay.h
index 869f8f2e..e1782d78 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -102,7 +102,7 @@ relay_last_row_time(const struct relay *relay);
*
* @param fd client connection
* @param sync sync from incoming JOIN request
- * @param vclock vclock of the last checkpoint
+ * @param vclock[out] vclock of the read view sent to the replica
*/
void
relay_initial_join(int fd, uint64_t sync, struct vclock *vclock);
diff --git a/src/box/sysview.c b/src/box/sysview.c
index 46cf1e13..8c2d63c9 100644
--- a/src/box/sysview.c
+++ b/src/box/sysview.c
@@ -566,7 +566,6 @@ sysview_engine_create_space(struct engine *engine, struct space_def *def,
static const struct engine_vtab sysview_engine_vtab = {
/* .shutdown = */ sysview_engine_shutdown,
/* .create_space = */ sysview_engine_create_space,
- /* .join = */ generic_engine_join,
/* .begin = */ generic_engine_begin,
/* .begin_statement = */ generic_engine_begin_statement,
/* .prepare = */ generic_engine_prepare,
@@ -601,6 +600,6 @@ sysview_engine_new(void)
sysview->base.vtab = &sysview_engine_vtab;
sysview->base.name = "sysview";
- sysview->base.flags = ENGINE_BYPASS_TX;
+ sysview->base.flags = ENGINE_BYPASS_TX | ENGINE_EXCLUDE_FROM_SNAPSHOT;
return sysview;
}
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 9e93153b..297c3a40 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -177,6 +177,12 @@ struct vinyl_iterator {
struct trigger on_tx_destroy;
};
+struct vinyl_snapshot_iterator {
+ struct snapshot_iterator base;
+ struct vy_read_view *rv;
+ struct vy_read_iterator iterator;
+};
+
static const struct engine_vtab vinyl_engine_vtab;
static const struct space_vtab vinyl_space_vtab;
static const struct index_vtab vinyl_index_vtab;
@@ -2914,311 +2920,6 @@ vinyl_engine_end_recovery(struct engine *engine)
/** {{{ Replication */
-/** Relay context, passed to all relay functions. */
-struct vy_join_ctx {
- /** Environment. */
- struct vy_env *env;
- /** Stream to relay statements to. */
- struct xstream *stream;
- /** Pipe to the relay thread. */
- struct cpipe relay_pipe;
- /** Pipe to the tx thread. */
- struct cpipe tx_pipe;
- /**
- * Cbus message, used for calling functions
- * on behalf of the relay thread.
- */
- struct cbus_call_msg cmsg;
- /** ID of the space currently being relayed. */
- uint32_t space_id;
- /**
- * LSM tree key definition, as defined by the user.
- * We only send the primary key, so the definition
- * provided by the user is correct for compare.
- */
- struct key_def *key_def;
- /** LSM tree format used for REPLACE and DELETE statements. */
- struct tuple_format *format;
- /**
- * Write iterator for merging runs before sending
- * them to the replica.
- */
- struct vy_stmt_stream *wi;
- /**
- * List of run slices of the current range, linked by
- * vy_slice::in_join. The newer a slice the closer it
- * is to the head of the list.
- */
- struct rlist slices;
-};
-
-/**
- * Recover a slice and add it to the list of slices.
- * Newer slices are supposed to be recovered first.
- * Returns 0 on success, -1 on failure.
- */
-static int
-vy_prepare_send_slice(struct vy_join_ctx *ctx,
- struct vy_slice_recovery_info *slice_info)
-{
- int rc = -1;
- struct vy_run *run = NULL;
- struct vy_entry begin = vy_entry_none();
- struct vy_entry end = vy_entry_none();
-
- run = vy_run_new(&ctx->env->run_env, slice_info->run->id);
- if (run == NULL)
- goto out;
- if (vy_run_recover(run, ctx->env->path, ctx->space_id, 0,
- ctx->key_def) != 0)
- goto out;
-
- if (slice_info->begin != NULL) {
- begin = vy_entry_key_from_msgpack(ctx->env->lsm_env.key_format,
- ctx->key_def,
- slice_info->begin);
- if (begin.stmt == NULL)
- goto out;
- }
- if (slice_info->end != NULL) {
- end = vy_entry_key_from_msgpack(ctx->env->lsm_env.key_format,
- ctx->key_def,
- slice_info->end);
- if (end.stmt == NULL)
- goto out;
- }
-
- struct vy_slice *slice = vy_slice_new(slice_info->id, run,
- begin, end, ctx->key_def);
- if (slice == NULL)
- goto out;
-
- rlist_add_tail_entry(&ctx->slices, slice, in_join);
- rc = 0;
-out:
- if (run != NULL)
- vy_run_unref(run);
- if (begin.stmt != NULL)
- tuple_unref(begin.stmt);
- if (end.stmt != NULL)
- tuple_unref(end.stmt);
- return rc;
-}
-
-static int
-vy_send_range_f(struct cbus_call_msg *cmsg)
-{
- struct vy_join_ctx *ctx = container_of(cmsg, struct vy_join_ctx, cmsg);
-
- int rc = ctx->wi->iface->start(ctx->wi);
- if (rc != 0)
- goto err;
- struct vy_entry entry;
- while ((rc = ctx->wi->iface->next(ctx->wi, &entry)) == 0 &&
- entry.stmt != NULL) {
- struct xrow_header xrow;
- rc = vy_stmt_encode_primary(entry.stmt, ctx->key_def,
- ctx->space_id, &xrow);
- if (rc != 0)
- break;
- /*
- * Reset the LSN as the replica will ignore it
- * anyway - see comment to vy_env::join_lsn.
- */
- xrow.lsn = 0;
- rc = xstream_write(ctx->stream, &xrow);
- if (rc != 0)
- break;
- fiber_gc();
- }
-err:
- ctx->wi->iface->stop(ctx->wi);
- fiber_gc();
- return rc;
-}
-
-/** Merge and send all runs of the given range. */
-static int
-vy_send_range(struct vy_join_ctx *ctx,
- struct vy_range_recovery_info *range_info)
-{
- int rc;
- struct vy_slice *slice, *tmp;
-
- if (rlist_empty(&range_info->slices))
- return 0; /* nothing to do */
-
- /* Recover slices. */
- struct vy_slice_recovery_info *slice_info;
- rlist_foreach_entry(slice_info, &range_info->slices, in_range) {
- rc = vy_prepare_send_slice(ctx, slice_info);
- if (rc != 0)
- goto out_delete_slices;
- }
-
- /* Create a write iterator. */
- struct rlist fake_read_views;
- rlist_create(&fake_read_views);
- ctx->wi = vy_write_iterator_new(ctx->key_def, true, true,
- &fake_read_views, NULL);
- if (ctx->wi == NULL) {
- rc = -1;
- goto out;
- }
- rlist_foreach_entry(slice, &ctx->slices, in_join) {
- rc = vy_write_iterator_new_slice(ctx->wi, slice, ctx->format);
- if (rc != 0)
- goto out_delete_wi;
- }
-
- /* Do the actual work from the relay thread. */
- bool cancellable = fiber_set_cancellable(false);
- rc = cbus_call(&ctx->relay_pipe, &ctx->tx_pipe, &ctx->cmsg,
- vy_send_range_f, NULL, TIMEOUT_INFINITY);
- fiber_set_cancellable(cancellable);
-
-out_delete_wi:
- ctx->wi->iface->close(ctx->wi);
- ctx->wi = NULL;
-out_delete_slices:
- rlist_foreach_entry_safe(slice, &ctx->slices, in_join, tmp)
- vy_slice_delete(slice);
- rlist_create(&ctx->slices);
-out:
- return rc;
-}
-
-/** Send all tuples stored in the given LSM tree. */
-static int
-vy_send_lsm(struct vy_join_ctx *ctx, struct vy_lsm_recovery_info *lsm_info)
-{
- int rc = -1;
-
- if (lsm_info->drop_lsn >= 0 || lsm_info->create_lsn < 0) {
- /* Dropped or not yet built LSM tree. */
- return 0;
- }
- if (lsm_info->group_id == GROUP_LOCAL) {
- /* Replica local space. */
- return 0;
- }
-
- /*
- * We are only interested in the primary index LSM tree.
- * Secondary keys will be rebuilt on the destination.
- */
- if (lsm_info->index_id != 0)
- return 0;
-
- ctx->space_id = lsm_info->space_id;
-
- /* Create key definition and tuple format. */
- ctx->key_def = key_def_new(lsm_info->key_parts,
- lsm_info->key_part_count, false);
- if (ctx->key_def == NULL)
- goto out;
- ctx->format = vy_stmt_format_new(&ctx->env->stmt_env, &ctx->key_def, 1,
- NULL, 0, 0, NULL);
- if (ctx->format == NULL)
- goto out_free_key_def;
- tuple_format_ref(ctx->format);
-
- /* Send ranges. */
- struct vy_range_recovery_info *range_info;
- assert(!rlist_empty(&lsm_info->ranges));
- rlist_foreach_entry(range_info, &lsm_info->ranges, in_lsm) {
- rc = vy_send_range(ctx, range_info);
- if (rc != 0)
- break;
- }
-
- tuple_format_unref(ctx->format);
- ctx->format = NULL;
-out_free_key_def:
- key_def_delete(ctx->key_def);
- ctx->key_def = NULL;
-out:
- return rc;
-}
-
-/** Relay cord function. */
-static int
-vy_join_f(va_list ap)
-{
- struct vy_join_ctx *ctx = va_arg(ap, struct vy_join_ctx *);
-
- coio_enable();
-
- cpipe_create(&ctx->tx_pipe, "tx");
-
- struct cbus_endpoint endpoint;
- cbus_endpoint_create(&endpoint, cord_name(cord()),
- fiber_schedule_cb, fiber());
-
- cbus_loop(&endpoint);
-
- cbus_endpoint_destroy(&endpoint, cbus_process);
- cpipe_destroy(&ctx->tx_pipe);
- return 0;
-}
-
-static int
-vinyl_engine_join(struct engine *engine, const struct vclock *vclock,
- struct xstream *stream)
-{
- struct vy_env *env = vy_env(engine);
- int rc = -1;
-
- /* Allocate the relay context. */
- struct vy_join_ctx *ctx = malloc(sizeof(*ctx));
- if (ctx == NULL) {
- diag_set(OutOfMemory, PATH_MAX, "malloc", "struct vy_join_ctx");
- goto out;
- }
- memset(ctx, 0, sizeof(*ctx));
- ctx->env = env;
- ctx->stream = stream;
- rlist_create(&ctx->slices);
-
- /* Start the relay cord. */
- char name[FIBER_NAME_MAX];
- snprintf(name, sizeof(name), "initial_join_%p", stream);
- struct cord cord;
- if (cord_costart(&cord, name, vy_join_f, ctx) != 0)
- goto out_free_ctx;
- cpipe_create(&ctx->relay_pipe, name);
-
- /*
- * Load the recovery context from the given point in time.
- * Send all runs stored in it to the replica.
- */
- struct vy_recovery *recovery;
- recovery = vy_recovery_new(vclock_sum(vclock),
- VY_RECOVERY_LOAD_CHECKPOINT);
- if (recovery == NULL) {
- say_error("failed to recover vylog to join a replica");
- goto out_join_cord;
- }
- rc = 0;
- struct vy_lsm_recovery_info *lsm_info;
- rlist_foreach_entry(lsm_info, &recovery->lsms, in_recovery) {
- rc = vy_send_lsm(ctx, lsm_info);
- if (rc != 0)
- break;
- }
- vy_recovery_delete(recovery);
-
-out_join_cord:
- cbus_stop_loop(&ctx->relay_pipe);
- cpipe_destroy(&ctx->relay_pipe);
- if (cord_cojoin(&cord) != 0)
- rc = -1;
-out_free_ctx:
- free(ctx);
-out:
- return rc;
-}
-
static int
vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
{
@@ -3939,6 +3640,66 @@ vinyl_index_create_iterator(struct index *base, enum iterator_type type,
return (struct iterator *)it;
}
+static int
+vinyl_snapshot_iterator_next(struct snapshot_iterator *base,
+ const char **data, uint32_t *size)
+{
+ assert(base->next == vinyl_snapshot_iterator_next);
+ struct vinyl_snapshot_iterator *it =
+ (struct vinyl_snapshot_iterator *)base;
+ struct vy_entry entry;
+ if (vy_read_iterator_next(&it->iterator, &entry) != 0)
+ return -1;
+ *data = entry.stmt != NULL ? tuple_data_range(entry.stmt, size) : NULL;
+ return 0;
+}
+
+static void
+vinyl_snapshot_iterator_free(struct snapshot_iterator *base)
+{
+ assert(base->free == vinyl_snapshot_iterator_free);
+ struct vinyl_snapshot_iterator *it =
+ (struct vinyl_snapshot_iterator *)base;
+ struct vy_lsm *lsm = it->iterator.lsm;
+ struct vy_env *env = vy_env(lsm->base.engine);
+ vy_read_iterator_close(&it->iterator);
+ tx_manager_destroy_read_view(env->xm, it->rv);
+ vy_lsm_unref(lsm);
+ free(it);
+}
+
+static struct snapshot_iterator *
+vinyl_index_create_snapshot_iterator(struct index *base)
+{
+ struct vy_lsm *lsm = vy_lsm(base);
+ struct vy_env *env = vy_env(base->engine);
+
+ struct vinyl_snapshot_iterator *it = malloc(sizeof(*it));
+ if (it == NULL) {
+ diag_set(OutOfMemory, sizeof(*it), "malloc",
+ "struct vinyl_snapshot_iterator");
+ return NULL;
+ }
+ it->base.next = vinyl_snapshot_iterator_next;
+ it->base.free = vinyl_snapshot_iterator_free;
+
+ it->rv = tx_manager_read_view(env->xm);
+ if (it->rv == NULL) {
+ free(it);
+ return NULL;
+ }
+ vy_read_iterator_open(&it->iterator, lsm, NULL,
+ ITER_ALL, lsm->env->empty_key,
+ (const struct vy_read_view **)&it->rv);
+ /*
+ * The index may be dropped while we are reading it.
+ * The iterator must go on as if nothing happened.
+ */
+ vy_lsm_ref(lsm);
+
+ return &it->base;
+}
+
static int
vinyl_index_get(struct index *index, const char *key,
uint32_t part_count, struct tuple **ret)
@@ -4665,7 +4426,6 @@ static struct trigger on_replace_vinyl_deferred_delete = {
static const struct engine_vtab vinyl_engine_vtab = {
/* .shutdown = */ vinyl_engine_shutdown,
/* .create_space = */ vinyl_engine_create_space,
- /* .join = */ vinyl_engine_join,
/* .begin = */ vinyl_engine_begin,
/* .begin_statement = */ vinyl_engine_begin_statement,
/* .prepare = */ vinyl_engine_prepare,
@@ -4732,7 +4492,7 @@ static const struct index_vtab vinyl_index_vtab = {
/* .replace = */ generic_index_replace,
/* .create_iterator = */ vinyl_index_create_iterator,
/* .create_snapshot_iterator = */
- generic_index_create_snapshot_iterator,
+ vinyl_index_create_snapshot_iterator,
/* .stat = */ vinyl_index_stat,
/* .compact = */ vinyl_index_compact,
/* .reset_stat = */ vinyl_index_reset_stat,
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index c6c17aee..25b6dcd3 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -1675,14 +1675,8 @@ vy_run_recover(struct vy_run *run, const char *dir,
/* Read run header. */
struct xrow_header xrow;
- ERROR_INJECT(ERRINJ_VYRUN_INDEX_GARBAGE, {
- errinj(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL)->bparam = true;
- });
/* all rows should be in one tx */
int rc = xlog_cursor_next_tx(&cursor);
- ERROR_INJECT(ERRINJ_VYRUN_INDEX_GARBAGE, {
- errinj(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL)->bparam = false;
- });
if (rc != 0) {
if (rc > 0)
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 9b300fde..6884ed8d 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -156,8 +156,7 @@ tx_manager_mem_used(struct tx_manager *xm)
return ret;
}
-/** Create or reuse an instance of a read view. */
-static struct vy_read_view *
+struct vy_read_view *
tx_manager_read_view(struct tx_manager *xm)
{
struct vy_read_view *rv;
@@ -195,12 +194,9 @@ tx_manager_read_view(struct tx_manager *xm)
return rv;
}
-/** Dereference and possibly destroy a read view. */
-static void
-tx_manager_destroy_read_view(struct tx_manager *xm,
- const struct vy_read_view *read_view)
+void
+tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv)
{
- struct vy_read_view *rv = (struct vy_read_view *) read_view;
if (rv == xm->p_global_read_view)
return;
assert(rv->refs);
diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h
index 376f4330..3144c921 100644
--- a/src/box/vy_tx.h
+++ b/src/box/vy_tx.h
@@ -289,6 +289,14 @@ tx_manager_delete(struct tx_manager *xm);
size_t
tx_manager_mem_used(struct tx_manager *xm);
+/** Create or reuse an instance of a read view. */
+struct vy_read_view *
+tx_manager_read_view(struct tx_manager *xm);
+
+/** Dereference and possibly destroy a read view. */
+void
+tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv);
+
/**
* Abort all rw transactions that affect the given space
* and haven't reached WAL yet. Called before executing a DDL
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index fe9c2237..cda179e9 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -101,11 +101,11 @@ struct errinj {
_(ERRINJ_RELAY_REPORT_INTERVAL, ERRINJ_DOUBLE, {.dparam = 0}) \
_(ERRINJ_RELAY_FINAL_SLEEP, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_RELAY_FINAL_JOIN, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_REPLICA_JOIN_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_PORT_DUMP, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_XLOG_META, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_XLOG_READ, ERRINJ_INT, {.iparam = -1}) \
- _(ERRINJ_VYRUN_INDEX_GARBAGE, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_VYRUN_DATA_READ, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_CHECK_FORMAT_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_BUILD_INDEX, ERRINJ_INT, {.iparam = -1}) \
diff --git a/test/box/errinj.result b/test/box/errinj.result
index af2f8877..e7d58f49 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -38,17 +38,17 @@ errinj.info()
state: -1
ERRINJ_WAL_WRITE_EOF:
state: false
- ERRINJ_VY_LOG_FILE_RENAME:
- state: false
- ERRINJ_VYRUN_INDEX_GARBAGE:
- state: false
- ERRINJ_BUILD_INDEX_DELAY:
+ ERRINJ_HTTP_RESPONSE_ADD_WAIT:
state: false
ERRINJ_BUILD_INDEX:
state: -1
- ERRINJ_VY_INDEX_FILE_RENAME:
+ ERRINJ_BUILD_INDEX_DELAY:
state: false
- ERRINJ_CHECK_FORMAT_DELAY:
+ ERRINJ_VY_RUN_FILE_RENAME:
+ state: false
+ ERRINJ_VY_COMPACTION_DELAY:
+ state: false
+ ERRINJ_VY_DUMP_DELAY:
state: false
ERRINJ_VY_DELAY_PK_LOOKUP:
state: false
@@ -56,16 +56,16 @@ errinj.info()
state: false
ERRINJ_PORT_DUMP:
state: false
- ERRINJ_VY_DUMP_DELAY:
- state: false
+ ERRINJ_WAL_BREAK_LSN:
+ state: -1
ERRINJ_WAL_IO:
state: false
ERRINJ_WAL_FALLOCATE:
state: 0
- ERRINJ_WAL_BREAK_LSN:
- state: -1
ERRINJ_RELAY_BREAK_LSN:
state: -1
+ ERRINJ_VY_INDEX_FILE_RENAME:
+ state: false
ERRINJ_TUPLE_FORMAT_COUNT:
state: -1
ERRINJ_TUPLE_ALLOC:
@@ -76,7 +76,7 @@ errinj.info()
state: false
ERRINJ_RELAY_REPORT_INTERVAL:
state: 0
- ERRINJ_VY_RUN_FILE_RENAME:
+ ERRINJ_VY_LOG_FILE_RENAME:
state: false
ERRINJ_VY_READ_PAGE_TIMEOUT:
state: 0
@@ -84,23 +84,23 @@ errinj.info()
state: false
ERRINJ_SIO_READ_MAX:
state: -1
- ERRINJ_HTTP_RESPONSE_ADD_WAIT:
- state: false
- ERRINJ_WAL_WRITE_DISK:
- state: false
ERRINJ_SNAP_COMMIT_DELAY:
state: false
+ ERRINJ_WAL_WRITE_DISK:
+ state: false
ERRINJ_SNAP_WRITE_DELAY:
state: false
- ERRINJ_VY_RUN_WRITE:
- state: false
ERRINJ_LOG_ROTATE:
state: false
+ ERRINJ_VY_RUN_WRITE:
+ state: false
+ ERRINJ_CHECK_FORMAT_DELAY:
+ state: false
ERRINJ_VY_LOG_FLUSH_DELAY:
state: false
ERRINJ_RELAY_FINAL_JOIN:
state: false
- ERRINJ_VY_COMPACTION_DELAY:
+ ERRINJ_REPLICA_JOIN_DELAY:
state: false
ERRINJ_RELAY_FINAL_SLEEP:
state: false
diff --git a/test/replication-py/cluster.result b/test/replication-py/cluster.result
index 04f06f74..f68a6af7 100644
--- a/test/replication-py/cluster.result
+++ b/test/replication-py/cluster.result
@@ -23,19 +23,6 @@ box.schema.user.grant('guest', 'replication')
...
ok - join with granted role
-------------------------------------------------------------
-gh-707: Master crashes on JOIN if it does not have snapshot files
-gh-480: If socket is closed while JOIN, replica wont reconnect
--------------------------------------------------------------
-ok - join without snapshots
-ok - _cluster did not change after unsuccessful JOIN
-box.schema.user.revoke('guest', 'replication')
----
-...
-box.snapshot()
----
-- ok
-...
--------------------------------------------------------------
gh-434: Assertion if replace _cluster tuple for local server
-------------------------------------------------------------
box.space._cluster:replace{1, require('uuid').NULL:str()}
diff --git a/test/replication-py/cluster.test.py b/test/replication-py/cluster.test.py
index 0140a6bd..088ca9c3 100644
--- a/test/replication-py/cluster.test.py
+++ b/test/replication-py/cluster.test.py
@@ -71,31 +71,6 @@ server.iproto.reconnect() # re-connect with new permissions
server_id = check_join('join with granted role')
server.iproto.py_con.space('_cluster').delete(server_id)
-print '-------------------------------------------------------------'
-print 'gh-707: Master crashes on JOIN if it does not have snapshot files'
-print 'gh-480: If socket is closed while JOIN, replica wont reconnect'
-print '-------------------------------------------------------------'
-
-data_dir = os.path.join(server.vardir, server.name)
-for k in glob.glob(os.path.join(data_dir, '*.snap')):
- os.unlink(k)
-
-# remember the number of servers in _cluster table
-server_count = len(server.iproto.py_con.space('_cluster').select(()))
-
-rows = list(server.iproto.py_con.join(replica_uuid))
-print len(rows) > 0 and rows[-1].return_message.find('.snap') >= 0 and \
- 'ok' or 'not ok', '-', 'join without snapshots'
-res = server.iproto.py_con.space('_cluster').select(())
-if server_count <= len(res):
- print 'ok - _cluster did not change after unsuccessful JOIN'
-else:
- print 'not ok - _cluster did change after unsuccessful JOIN'
- print res
-
-server.admin("box.schema.user.revoke('guest', 'replication')")
-server.admin('box.snapshot()')
-
print '-------------------------------------------------------------'
print 'gh-434: Assertion if replace _cluster tuple for local server'
print '-------------------------------------------------------------'
diff --git a/test/replication/join_without_snap.result b/test/replication/join_without_snap.result
new file mode 100644
index 00000000..becdfd21
--- /dev/null
+++ b/test/replication/join_without_snap.result
@@ -0,0 +1,88 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-1271: check that replica join works off the current read view,
+-- not the last checkpoint. To do that, delete the last snapshot file
+-- and check that a replica can still join.
+--
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+for i = 1, 5 do box.space.test:insert{i} end
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+
+fio = require('fio')
+ | ---
+ | ...
+fio.unlink(fio.pathjoin(box.cfg.memtx_dir, string.format('%020d.snap', box.info.signature)))
+ | ---
+ | - true
+ | ...
+
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('switch replica')
+ | ---
+ | - true
+ | ...
+
+box.space.test:select()
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('cleanup server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+test_run:cleanup_cluster()
+ | ---
+ | ...
+
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
diff --git a/test/replication/join_without_snap.test.lua b/test/replication/join_without_snap.test.lua
new file mode 100644
index 00000000..6a23d741
--- /dev/null
+++ b/test/replication/join_without_snap.test.lua
@@ -0,0 +1,32 @@
+test_run = require('test_run').new()
+
+--
+-- gh-1271: check that replica join works off the current read view,
+-- not the last checkpoint. To do that, delete the last snapshot file
+-- and check that a replica can still join.
+--
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+for i = 1, 5 do box.space.test:insert{i} end
+box.snapshot()
+
+fio = require('fio')
+fio.unlink(fio.pathjoin(box.cfg.memtx_dir, string.format('%020d.snap', box.info.signature)))
+
+box.schema.user.grant('guest', 'replication')
+
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+test_run:cmd('start server replica')
+test_run:cmd('switch replica')
+
+box.space.test:select()
+
+test_run:cmd('switch default')
+test_run:cmd('stop server replica')
+test_run:cmd('cleanup server replica')
+test_run:cmd('delete server replica')
+test_run:cleanup_cluster()
+
+box.schema.user.revoke('guest', 'replication')
+box.space.test:drop()
+box.snapshot()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 91e884ec..eb25077d 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -10,6 +10,7 @@
"force_recovery.test.lua": {},
"on_schema_init.test.lua": {},
"long_row_timeout.test.lua": {},
+ "join_without_snap.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index e8795143..2635da26 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1116,7 +1116,7 @@ box.snapshot()
box.schema.user.grant('guest', 'replication')
---
...
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', true)
+errinj.set('ERRINJ_VY_READ_PAGE', true)
---
- ok
...
@@ -1136,7 +1136,7 @@ test_run:cmd("delete server replica")
---
- true
...
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', false)
+errinj.set('ERRINJ_VY_READ_PAGE', false)
---
- ok
...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 034ed34c..4230cfae 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -404,12 +404,12 @@ _ = s:create_index('pk')
s:replace{1, 2, 3}
box.snapshot()
box.schema.user.grant('guest', 'replication')
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', true)
+errinj.set('ERRINJ_VY_READ_PAGE', true)
test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
test_run:cmd("start server replica with crash_expected=True")
test_run:cmd("cleanup server replica")
test_run:cmd("delete server replica")
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', false)
+errinj.set('ERRINJ_VY_READ_PAGE', false)
box.schema.user.revoke('guest', 'replication')
s:drop()
diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
index cddc9c3b..60283281 100644
--- a/test/xlog/panic_on_broken_lsn.result
+++ b/test/xlog/panic_on_broken_lsn.result
@@ -123,16 +123,33 @@ box.space.test:auto_increment{'v0'}
---
- [1, 'v0']
...
-lsn = box.info.vclock[1]
+-- Inject a broken LSN in the final join stage.
+lsn = -1
---
...
-box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true)
---
- ok
...
-box.space.test:auto_increment{'v1'}
+fiber = require('fiber')
---
-- [2, 'v1']
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+_ = fiber.create(function()
+ test_run:wait_cond(function() return box.space._cluster:get(2) ~= nil end)
+ lsn = box.info.vclock[1]
+ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+ box.space.test:auto_increment{'v1'}
+ box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
+end);
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
...
test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
---
@@ -142,12 +159,6 @@ test_run:cmd('start server replica with crash_expected=True')
---
- false
...
-fiber = require('fiber')
----
-...
-while box.info.replication[2] == nil do fiber.sleep(0.001) end
----
-...
box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
---
- ok
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua
index cdf287a1..ca304345 100644
--- a/test/xlog/panic_on_broken_lsn.test.lua
+++ b/test/xlog/panic_on_broken_lsn.test.lua
@@ -57,14 +57,24 @@ box.schema.user.grant('guest', 'replication')
_ = box.schema.space.create('test', {id = 9000})
_ = box.space.test:create_index('pk')
box.space.test:auto_increment{'v0'}
-lsn = box.info.vclock[1]
-box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
-box.space.test:auto_increment{'v1'}
+
+-- Inject a broken LSN in the final join stage.
+lsn = -1
+box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true)
+
+fiber = require('fiber')
+test_run:cmd("setopt delimiter ';'")
+_ = fiber.create(function()
+ test_run:wait_cond(function() return box.space._cluster:get(2) ~= nil end)
+ lsn = box.info.vclock[1]
+ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+ box.space.test:auto_increment{'v1'}
+ box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
+end);
+test_run:cmd("setopt delimiter ''");
test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
test_run:cmd('start server replica with crash_expected=True')
-fiber = require('fiber')
-while box.info.replication[2] == nil do fiber.sleep(0.001) end
box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
-- Check that log contains the mention of broken LSN and the request printout
--
2.20.1
More information about the Tarantool-patches
mailing list