[PATCH v2] relay: join new replicas off read view

Vladimir Davydov vdavydov.dev at gmail.com
Tue Aug 27 11:33:04 MSK 2019


Historically, we join a new replica off the last checkpoint. As a
result, we must always keep the last memtx snapshot and all vinyl data
files corresponding to it. Actually, there's no need to use the last
checkpoint for joining a replica. Instead we can use the current read
view as both memtx and vinyl support it. This should speed up the
process of joining a new replica, because we don't need to replay all
xlogs written after the last checkpoint, only those that are accumulated
while we are relaying the current read view. This should also allow us
to avoid creating a snapshot file on bootstrap, because the only reason
why we need it is allowing joining replicas. Besides, this is a step
towards decoupling the vinyl metadata log from checkpointing in
particular and from xlogs in general.

Closes #1271
---
https://github.com/tarantool/tarantool/issues/1271
https://github.com/tarantool/tarantool/tree/dv/gh-1271-rework-replica-join

Changes in v2:
 - In case of memtx, use snapshot iterators from a separate thread
   so as not to consume too much of precious tx cpu time. Vinyl's
   join callback still runs from the tx thread, because its iterators
   can't be used from another thread.
 - Setup WAL consumer before starting initial join, because we don't
   pin a checkpoint anymore and hence can't be sure that xlogs following
   the read view fed to the new replica won't be dropped while the
   initial join stage is in progress.

 src/box/blackhole.c                         |   2 +
 src/box/box.cc                              |  60 +--
 src/box/engine.c                            |  68 ++-
 src/box/engine.h                            |  50 ++-
 src/box/memtx_engine.c                      | 163 +++++---
 src/box/relay.cc                            |  25 +-
 src/box/relay.h                             |   2 +-
 src/box/sysview.c                           |   2 +
 src/box/vinyl.c                             | 441 +++++++-------------
 src/box/vy_run.c                            |   6 -
 src/box/vy_tx.c                             |  10 +-
 src/box/vy_tx.h                             |   8 +
 src/lib/core/errinj.h                       |   2 +-
 test/box/errinj.result                      |  38 +-
 test/replication-py/cluster.result          |  13 -
 test/replication-py/cluster.test.py         |  25 --
 test/replication/join_without_snap.result   |  88 ++++
 test/replication/join_without_snap.test.lua |  32 ++
 test/replication/suite.cfg                  |   1 +
 test/vinyl/errinj.result                    |   4 +-
 test/vinyl/errinj.test.lua                  |   4 +-
 test/xlog/panic_on_broken_lsn.result        |  31 +-
 test/xlog/panic_on_broken_lsn.test.lua      |  20 +-
 23 files changed, 614 insertions(+), 481 deletions(-)
 create mode 100644 test/replication/join_without_snap.result
 create mode 100644 test/replication/join_without_snap.test.lua

diff --git a/src/box/blackhole.c b/src/box/blackhole.c
index 22ef324b..4f6ea9ab 100644
--- a/src/box/blackhole.c
+++ b/src/box/blackhole.c
@@ -177,7 +177,9 @@ blackhole_engine_create_space(struct engine *engine, struct space_def *def,
 static const struct engine_vtab blackhole_engine_vtab = {
 	/* .shutdown = */ blackhole_engine_shutdown,
 	/* .create_space = */ blackhole_engine_create_space,
+	/* .prepare_join = */ generic_engine_prepare_join,
 	/* .join = */ generic_engine_join,
+	/* .complete_join = */ generic_engine_complete_join,
 	/* .begin = */ generic_engine_begin,
 	/* .begin_statement = */ generic_engine_begin_statement,
 	/* .prepare = */ generic_engine_prepare,
diff --git a/src/box/box.cc b/src/box/box.cc
index 66cd6d3a..fd4487da 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1468,33 +1468,14 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	}
 
 	/*
-	 * The only case when the directory index is empty is
-	 * when someone has deleted a snapshot and tries to join
-	 * as a replica. Our best effort is to not crash in such
-	 * case: raise ER_MISSING_SNAPSHOT.
+	 * Register the replica as a WAL consumer so that
+	 * it can resume FINAL JOIN where INITIAL JOIN ends.
 	 */
-	struct gc_checkpoint *checkpoint = gc_last_checkpoint();
-	if (checkpoint == NULL)
-		tnt_raise(ClientError, ER_MISSING_SNAPSHOT);
-
-	/* Remember start vclock. */
-	struct vclock start_vclock;
-	vclock_copy(&start_vclock, &checkpoint->vclock);
-
-	/*
-	 * Make sure the checkpoint files won't be deleted while
-	 * initial join is in progress.
-	 */
-	struct gc_checkpoint_ref gc;
-	gc_ref_checkpoint(checkpoint, &gc, "replica %s",
-			  tt_uuid_str(&instance_uuid));
-	auto gc_guard = make_scoped_guard([&]{ gc_unref_checkpoint(&gc); });
-
-	/* Respond to JOIN request with start_vclock. */
-	struct xrow_header row;
-	xrow_encode_vclock_xc(&row, &start_vclock);
-	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
+				"replica %s", tt_uuid_str(&instance_uuid));
+	if (gc == NULL)
+		diag_raise();
+	auto gc_guard = make_scoped_guard([&] { gc_consumer_unregister(gc); });
 
 	say_info("joining replica %s at %s",
 		 tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
@@ -1502,6 +1483,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	/*
 	 * Initial stream: feed replica with dirty data from engines.
 	 */
+	struct vclock start_vclock;
 	relay_initial_join(io->fd, header->sync, &start_vclock);
 	say_info("initial data sent.");
 
@@ -1513,23 +1495,14 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	 */
 	box_on_join(&instance_uuid);
 
+	ERROR_INJECT_YIELD(ERRINJ_REPLICA_JOIN_DELAY);
+
 	/* Remember master's vclock after the last request */
 	struct vclock stop_vclock;
 	vclock_copy(&stop_vclock, &replicaset.vclock);
 
-	/*
-	 * Register the replica as a WAL consumer so that
-	 * it can resume SUBSCRIBE where FINAL JOIN ends.
-	 */
-	replica = replica_by_uuid(&instance_uuid);
-	if (replica->gc != NULL)
-		gc_consumer_unregister(replica->gc);
-	replica->gc = gc_consumer_register(&stop_vclock, "replica %s",
-					   tt_uuid_str(&instance_uuid));
-	if (replica->gc == NULL)
-		diag_raise();
-
 	/* Send end of initial stage data marker */
+	struct xrow_header row;
 	xrow_encode_vclock_xc(&row, &stop_vclock);
 	row.sync = header->sync;
 	coio_write_xrow(io, &row);
@@ -1545,6 +1518,17 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	xrow_encode_vclock_xc(&row, &replicaset.vclock);
 	row.sync = header->sync;
 	coio_write_xrow(io, &row);
+
+	/*
+	 * Advance the WAL consumer state to the position where
+	 * FINAL JOIN ended and assign it to the replica.
+	 */
+	gc_consumer_advance(gc, &stop_vclock);
+	replica = replica_by_uuid(&instance_uuid);
+	if (replica->gc != NULL)
+		gc_consumer_unregister(replica->gc);
+	replica->gc = gc;
+	gc_guard.is_active = false;
 }
 
 void
diff --git a/src/box/engine.c b/src/box/engine.c
index a52d0ed1..8dc0df1d 100644
--- a/src/box/engine.c
+++ b/src/box/engine.c
@@ -36,6 +36,12 @@
 
 RLIST_HEAD(engines);
 
+/**
+ * For simplicity, assume that the engine count can't exceed
+ * the value of this constant.
+ */
+enum { MAX_ENGINE_COUNT = 10 };
+
 /** Register engine instance. */
 void engine_register(struct engine *engine)
 {
@@ -175,16 +181,54 @@ engine_backup(const struct vclock *vclock, engine_backup_cb cb, void *cb_arg)
 }
 
 int
-engine_join(const struct vclock *vclock, struct xstream *stream)
+engine_prepare_join(struct engine_join_ctx *ctx)
 {
+	ctx->array = calloc(MAX_ENGINE_COUNT, sizeof(void *));
+	if (ctx->array == NULL) {
+		diag_set(OutOfMemory, MAX_ENGINE_COUNT * sizeof(void *),
+			 "malloc", "engine join context");
+		return -1;
+	}
+	int i = 0;
 	struct engine *engine;
 	engine_foreach(engine) {
-		if (engine->vtab->join(engine, vclock, stream) != 0)
+		assert(i < MAX_ENGINE_COUNT);
+		if (engine->vtab->prepare_join(engine, &ctx->array[i]) != 0)
+			goto fail;
+		i++;
+	}
+	return 0;
+fail:
+	engine_complete_join(ctx);
+	return -1;
+}
+
+int
+engine_join(struct engine_join_ctx *ctx, struct xstream *stream)
+{
+	int i = 0;
+	struct engine *engine;
+	engine_foreach(engine) {
+		if (engine->vtab->join(engine, ctx->array[i], stream) != 0)
 			return -1;
+		i++;
 	}
 	return 0;
 }
 
+void
+engine_complete_join(struct engine_join_ctx *ctx)
+{
+	int i = 0;
+	struct engine *engine;
+	engine_foreach(engine) {
+		if (ctx->array[i] != NULL)
+			engine->vtab->complete_join(engine, ctx->array[i]);
+		i++;
+	}
+	free(ctx->array);
+}
+
 void
 engine_memory_stat(struct engine_memory_stat *stat)
 {
@@ -205,15 +249,29 @@ engine_reset_stat(void)
 /* {{{ Virtual method stubs */
 
 int
-generic_engine_join(struct engine *engine, const struct vclock *vclock,
-		    struct xstream *stream)
+generic_engine_prepare_join(struct engine *engine, void **ctx)
 {
 	(void)engine;
-	(void)vclock;
+	*ctx = NULL;
+	return 0;
+}
+
+int
+generic_engine_join(struct engine *engine, void *ctx, struct xstream *stream)
+{
+	(void)engine;
+	(void)ctx;
 	(void)stream;
 	return 0;
 }
 
+void
+generic_engine_complete_join(struct engine *engine, void *ctx)
+{
+	(void)engine;
+	(void)ctx;
+}
+
 int
 generic_engine_begin(struct engine *engine, struct txn *txn)
 {
diff --git a/src/box/engine.h b/src/box/engine.h
index ef694da0..07d7fac9 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -74,10 +74,21 @@ struct engine_vtab {
 	struct space *(*create_space)(struct engine *engine,
 			struct space_def *def, struct rlist *key_list);
 	/**
-	 * Write statements stored in checkpoint @vclock to @stream.
+	 * Freeze a read view to feed to a new replica.
+	 * Setup and return a context that will be used
+	 * on further steps.
 	 */
-	int (*join)(struct engine *engine, const struct vclock *vclock,
-		    struct xstream *stream);
+	int (*prepare_join)(struct engine *engine, void **ctx);
+	/**
+	 * Feed the read view frozen on the previous step to
+	 * the given stream.
+	 */
+	int (*join)(struct engine *engine, void *ctx, struct xstream *stream);
+	/**
+	 * Release the read view and free the context prepared
+	 * on the first step.
+	 */
+	void (*complete_join)(struct engine *engine, void *ctx);
 	/**
 	 * Begin a new single or multi-statement transaction.
 	 * Called on first statement in a transaction, not when
@@ -220,6 +231,11 @@ struct engine {
 	struct rlist link;
 };
 
+struct engine_join_ctx {
+	/** Array of engine join contexts, one per each engine. */
+	void **array;
+};
+
 /** Register engine engine instance. */
 void engine_register(struct engine *engine);
 
@@ -328,12 +344,14 @@ engine_begin_final_recovery(void);
 int
 engine_end_recovery(void);
 
-/**
- * Feed checkpoint data as join events to the replicas.
- * (called on the master).
- */
 int
-engine_join(const struct vclock *vclock, struct xstream *stream);
+engine_prepare_join(struct engine_join_ctx *ctx);
+
+int
+engine_join(struct engine_join_ctx *ctx, struct xstream *stream);
+
+void
+engine_complete_join(struct engine_join_ctx *ctx);
 
 int
 engine_begin_checkpoint(void);
@@ -362,8 +380,9 @@ engine_reset_stat(void);
 /*
  * Virtual method stubs.
  */
-int generic_engine_join(struct engine *, const struct vclock *,
-			struct xstream *);
+int generic_engine_prepare_join(struct engine *, void **);
+int generic_engine_join(struct engine *, void *, struct xstream *);
+void generic_engine_complete_join(struct engine *, void *);
 int generic_engine_begin(struct engine *, struct txn *);
 int generic_engine_begin_statement(struct engine *, struct txn *);
 int generic_engine_prepare(struct engine *, struct txn *);
@@ -467,9 +486,16 @@ engine_end_recovery_xc(void)
 }
 
 static inline void
-engine_join_xc(const struct vclock *vclock, struct xstream *stream)
+engine_prepare_join_xc(struct engine_join_ctx *ctx)
 {
-	if (engine_join(vclock, stream) != 0)
+	if (engine_prepare_join(ctx) != 0)
+		diag_raise();
+}
+
+static inline void
+engine_join_xc(struct engine_join_ctx *ctx, struct xstream *stream)
+{
+	if (engine_join(ctx, stream) != 0)
 		diag_raise();
 }
 
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index f6a33282..c6bf3f07 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -734,79 +734,136 @@ memtx_engine_backup(struct engine *engine, const struct vclock *vclock,
 	return cb(filename, cb_arg);
 }
 
-/** Used to pass arguments to memtx_initial_join_f */
-struct memtx_join_arg {
-	const char *snap_dirname;
-	int64_t checkpoint_lsn;
+struct memtx_join_entry {
+	struct rlist in_ctx;
+	uint32_t space_id;
+	struct snapshot_iterator *iterator;
+};
+
+struct memtx_join_ctx {
+	struct rlist entries;
 	struct xstream *stream;
 };
 
-/**
- * Invoked from a thread to feed snapshot rows.
- */
 static int
-memtx_initial_join_f(va_list ap)
+memtx_join_add_space(struct space *space, void *arg)
 {
-	struct memtx_join_arg *arg = va_arg(ap, struct memtx_join_arg *);
-	const char *snap_dirname = arg->snap_dirname;
-	int64_t checkpoint_lsn = arg->checkpoint_lsn;
-	struct xstream *stream = arg->stream;
+	struct memtx_join_ctx *ctx = arg;
+	if (!space_is_memtx(space))
+		return 0;
+	if (space_is_temporary(space))
+		return 0;
+	if (space_group_id(space) == GROUP_LOCAL)
+		return 0;
+	struct index *pk = space_index(space, 0);
+	if (pk == NULL)
+		return 0;
+	struct memtx_join_entry *entry = malloc(sizeof(*entry));
+	if (entry == NULL) {
+		diag_set(OutOfMemory, sizeof(*entry),
+			 "malloc", "struct memtx_join_entry");
+		return -1;
+	}
+	entry->space_id = space_id(space);
+	entry->iterator = index_create_snapshot_iterator(pk);
+	if (entry->iterator == NULL) {
+		free(entry);
+		return -1;
+	}
+	rlist_add_tail_entry(&ctx->entries, entry, in_ctx);
+	return 0;
+}
 
-	struct xdir dir;
-	/*
-	 * snap_dirname and INSTANCE_UUID don't change after start,
-	 * safe to use in another thread.
-	 */
-	xdir_create(&dir, snap_dirname, SNAP, &INSTANCE_UUID,
-		    &xlog_opts_default);
-	struct xlog_cursor cursor;
-	int rc = xdir_open_cursor(&dir, checkpoint_lsn, &cursor);
-	xdir_destroy(&dir);
-	if (rc < 0)
+static int
+memtx_engine_prepare_join(struct engine *engine, void **arg)
+{
+	(void)engine;
+	struct memtx_join_ctx *ctx = malloc(sizeof(*ctx));
+	if (ctx == NULL) {
+		diag_set(OutOfMemory, sizeof(*ctx),
+			 "malloc", "struct memtx_join_ctx");
 		return -1;
+	}
+	rlist_create(&ctx->entries);
+	if (space_foreach(memtx_join_add_space, ctx) != 0) {
+		free(ctx);
+		return -1;
+	}
+	*arg = ctx;
+	return 0;
+}
+
+static int
+memtx_join_send_tuple(struct xstream *stream, uint32_t space_id,
+		      const char *data, size_t size)
+{
+	struct request_replace_body body;
+	request_replace_body_create(&body, space_id);
 
 	struct xrow_header row;
-	while ((rc = xlog_cursor_next(&cursor, &row, true)) == 0) {
-		rc = xstream_write(stream, &row);
-		if (rc < 0)
-			break;
+	memset(&row, 0, sizeof(row));
+	row.type = IPROTO_INSERT;
+
+	row.bodycnt = 2;
+	row.body[0].iov_base = &body;
+	row.body[0].iov_len = sizeof(body);
+	row.body[1].iov_base = (char *)data;
+	row.body[1].iov_len = size;
+
+	return xstream_write(stream, &row);
+}
+
+static int
+memtx_join_f(va_list ap)
+{
+	struct memtx_join_ctx *ctx = va_arg(ap, struct memtx_join_ctx *);
+	struct memtx_join_entry *entry;
+	rlist_foreach_entry(entry, &ctx->entries, in_ctx) {
+		struct snapshot_iterator *it = entry->iterator;
+		int rc;
+		uint32_t size;
+		const char *data;
+		while ((rc = it->next(it, &data, &size)) == 0 && data != NULL) {
+			if (memtx_join_send_tuple(ctx->stream, entry->space_id,
+						  data, size) != 0)
+				return -1;
+		}
+		if (rc != 0)
+			return -1;
 	}
-	xlog_cursor_close(&cursor, false);
-	if (rc < 0)
-		return -1;
-
-	/**
-	 * We should never try to read snapshots with no EOF
-	 * marker - such snapshots are very likely corrupted and
-	 * should not be trusted.
-	 */
-	/* TODO: replace panic with diag_set() */
-	if (!xlog_cursor_is_eof(&cursor))
-		panic("snapshot `%s' has no EOF marker", cursor.name);
 	return 0;
 }
 
 static int
-memtx_engine_join(struct engine *engine, const struct vclock *vclock,
-		  struct xstream *stream)
+memtx_engine_join(struct engine *engine, void *arg, struct xstream *stream)
 {
-	struct memtx_engine *memtx = (struct memtx_engine *)engine;
-
+	(void)engine;
+	struct memtx_join_ctx *ctx = arg;
+	ctx->stream = stream;
 	/*
-	 * cord_costart() passes only void * pointer as an argument.
+	 * Memtx snapshot iterators are safe to use from another
+	 * thread and so we do so as not to consume too much of
+	 * precious tx cpu time while a new replica is joining.
 	 */
-	struct memtx_join_arg arg = {
-		/* .snap_dirname   = */ memtx->snap_dir.dirname,
-		/* .checkpoint_lsn = */ vclock_sum(vclock),
-		/* .stream         = */ stream
-	};
-
-	/* Send snapshot using a thread */
 	struct cord cord;
-	cord_costart(&cord, "initial_join", memtx_initial_join_f, &arg);
+	if (cord_costart(&cord, "initial_join", memtx_join_f, ctx) != 0)
+		return -1;
 	return cord_cojoin(&cord);
 }
 
+static void
+memtx_engine_complete_join(struct engine *engine, void *arg)
+{
+	(void)engine;
+	struct memtx_join_ctx *ctx = arg;
+	struct memtx_join_entry *entry, *next;
+	rlist_foreach_entry_safe(entry, &ctx->entries, in_ctx, next) {
+		entry->iterator->free(entry->iterator);
+		free(entry);
+	}
+	free(ctx);
+}
+
 static int
 small_stats_noop_cb(const struct mempool_stats *stats, void *cb_ctx)
 {
@@ -830,7 +887,9 @@ memtx_engine_memory_stat(struct engine *engine, struct engine_memory_stat *stat)
 static const struct engine_vtab memtx_engine_vtab = {
 	/* .shutdown = */ memtx_engine_shutdown,
 	/* .create_space = */ memtx_engine_create_space,
+	/* .prepare_join = */ memtx_engine_prepare_join,
 	/* .join = */ memtx_engine_join,
+	/* .complete_join = */ memtx_engine_complete_join,
 	/* .begin = */ memtx_engine_begin,
 	/* .begin_statement = */ generic_engine_begin_statement,
 	/* .prepare = */ generic_engine_prepare,
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a19abf6a..74588cba 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -298,7 +298,30 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 		relay_delete(relay);
 	});
 
-	engine_join_xc(vclock, &relay->stream);
+	/* Freeze a read view in engines. */
+	struct engine_join_ctx ctx;
+	engine_prepare_join_xc(&ctx);
+	auto join_guard = make_scoped_guard([&] {
+		engine_complete_join(&ctx);
+	});
+
+	/*
+	 * Sync WAL to make sure that all changes visible from
+	 * the frozen read view are successfully committed.
+	 */
+	if (wal_sync() != 0)
+		diag_raise();
+
+	vclock_copy(vclock, &replicaset.vclock);
+
+	/* Respond to the JOIN request with the current vclock. */
+	struct xrow_header row;
+	xrow_encode_vclock_xc(&row, vclock);
+	row.sync = sync;
+	coio_write_xrow(&relay->io, &row);
+
+	/* Send read view to the replica. */
+	engine_join_xc(&ctx, &relay->stream);
 }
 
 int
diff --git a/src/box/relay.h b/src/box/relay.h
index 869f8f2e..e1782d78 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -102,7 +102,7 @@ relay_last_row_time(const struct relay *relay);
  *
  * @param fd        client connection
  * @param sync      sync from incoming JOIN request
- * @param vclock    vclock of the last checkpoint
+ * @param vclock[out] vclock of the read view sent to the replica
  */
 void
 relay_initial_join(int fd, uint64_t sync, struct vclock *vclock);
diff --git a/src/box/sysview.c b/src/box/sysview.c
index 1fbe3aa2..00c320b6 100644
--- a/src/box/sysview.c
+++ b/src/box/sysview.c
@@ -565,7 +565,9 @@ sysview_engine_create_space(struct engine *engine, struct space_def *def,
 static const struct engine_vtab sysview_engine_vtab = {
 	/* .shutdown = */ sysview_engine_shutdown,
 	/* .create_space = */ sysview_engine_create_space,
+	/* .prepare_join = */ generic_engine_prepare_join,
 	/* .join = */ generic_engine_join,
+	/* .complete_join = */ generic_engine_complete_join,
 	/* .begin = */ generic_engine_begin,
 	/* .begin_statement = */ generic_engine_begin_statement,
 	/* .prepare = */ generic_engine_prepare,
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index de4a06c4..20bc6e1a 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -166,6 +166,12 @@ struct vinyl_iterator {
 	struct trigger on_tx_destroy;
 };
 
+struct vinyl_snapshot_iterator {
+	struct snapshot_iterator base;
+	struct vy_read_view *rv;
+	struct vy_read_iterator iterator;
+};
+
 static const struct engine_vtab vinyl_engine_vtab;
 static const struct space_vtab vinyl_space_vtab;
 static const struct index_vtab vinyl_index_vtab;
@@ -2894,309 +2900,118 @@ vinyl_engine_end_recovery(struct engine *engine)
 
 /** {{{ Replication */
 
-/** Relay context, passed to all relay functions. */
+struct vy_join_entry {
+	struct rlist in_ctx;
+	uint32_t space_id;
+	struct snapshot_iterator *iterator;
+};
+
 struct vy_join_ctx {
-	/** Environment. */
-	struct vy_env *env;
-	/** Stream to relay statements to. */
-	struct xstream *stream;
-	/** Pipe to the relay thread. */
-	struct cpipe relay_pipe;
-	/** Pipe to the tx thread. */
-	struct cpipe tx_pipe;
-	/**
-	 * Cbus message, used for calling functions
-	 * on behalf of the relay thread.
-	 */
-	struct cbus_call_msg cmsg;
-	/** ID of the space currently being relayed. */
-	uint32_t space_id;
-	/**
-	 * LSM tree key definition, as defined by the user.
-	 * We only send the primary key, so the definition
-	 * provided by the user is correct for compare.
-	 */
-	struct key_def *key_def;
-	/** LSM tree format used for REPLACE and DELETE statements. */
-	struct tuple_format *format;
-	/**
-	 * Write iterator for merging runs before sending
-	 * them to the replica.
-	 */
-	struct vy_stmt_stream *wi;
-	/**
-	 * List of run slices of the current range, linked by
-	 * vy_slice::in_join. The newer a slice the closer it
-	 * is to the head of the list.
-	 */
-	struct rlist slices;
+	struct rlist entries;
 };
 
-/**
- * Recover a slice and add it to the list of slices.
- * Newer slices are supposed to be recovered first.
- * Returns 0 on success, -1 on failure.
- */
 static int
-vy_prepare_send_slice(struct vy_join_ctx *ctx,
-		      struct vy_slice_recovery_info *slice_info)
+vy_join_add_space(struct space *space, void *arg)
 {
-	int rc = -1;
-	struct vy_run *run = NULL;
-	struct vy_entry begin = vy_entry_none();
-	struct vy_entry end = vy_entry_none();
-
-	run = vy_run_new(&ctx->env->run_env, slice_info->run->id);
-	if (run == NULL)
-		goto out;
-	if (vy_run_recover(run, ctx->env->path, ctx->space_id, 0,
-			   ctx->key_def) != 0)
-		goto out;
-
-	if (slice_info->begin != NULL) {
-		begin = vy_entry_key_from_msgpack(ctx->env->lsm_env.key_format,
-						  ctx->key_def,
-						  slice_info->begin);
-		if (begin.stmt == NULL)
-			goto out;
-	}
-	if (slice_info->end != NULL) {
-		end = vy_entry_key_from_msgpack(ctx->env->lsm_env.key_format,
-						ctx->key_def,
-						slice_info->end);
-		if (end.stmt == NULL)
-			goto out;
-	}
-
-	struct vy_slice *slice = vy_slice_new(slice_info->id, run,
-					      begin, end, ctx->key_def);
-	if (slice == NULL)
-		goto out;
-
-	rlist_add_tail_entry(&ctx->slices, slice, in_join);
-	rc = 0;
-out:
-	if (run != NULL)
-		vy_run_unref(run);
-	if (begin.stmt != NULL)
-		tuple_unref(begin.stmt);
-	if (end.stmt != NULL)
-		tuple_unref(end.stmt);
-	return rc;
-}
-
-static int
-vy_send_range_f(struct cbus_call_msg *cmsg)
-{
-	struct vy_join_ctx *ctx = container_of(cmsg, struct vy_join_ctx, cmsg);
-
-	int rc = ctx->wi->iface->start(ctx->wi);
-	if (rc != 0)
-		goto err;
-	struct vy_entry entry;
-	while ((rc = ctx->wi->iface->next(ctx->wi, &entry)) == 0 &&
-	       entry.stmt != NULL) {
-		struct xrow_header xrow;
-		rc = vy_stmt_encode_primary(entry.stmt, ctx->key_def,
-					    ctx->space_id, &xrow);
-		if (rc != 0)
-			break;
-		/*
-		 * Reset the LSN as the replica will ignore it
-		 * anyway.
-		 */
-		xrow.lsn = 0;
-		rc = xstream_write(ctx->stream, &xrow);
-		if (rc != 0)
-			break;
-		fiber_gc();
-	}
-err:
-	ctx->wi->iface->stop(ctx->wi);
-	fiber_gc();
-	return rc;
-}
-
-/** Merge and send all runs of the given range. */
-static int
-vy_send_range(struct vy_join_ctx *ctx,
-	      struct vy_range_recovery_info *range_info)
-{
-	int rc;
-	struct vy_slice *slice, *tmp;
-
-	if (rlist_empty(&range_info->slices))
-		return 0; /* nothing to do */
-
-	/* Recover slices. */
-	struct vy_slice_recovery_info *slice_info;
-	rlist_foreach_entry(slice_info, &range_info->slices, in_range) {
-		rc = vy_prepare_send_slice(ctx, slice_info);
-		if (rc != 0)
-			goto out_delete_slices;
-	}
-
-	/* Create a write iterator. */
-	struct rlist fake_read_views;
-	rlist_create(&fake_read_views);
-	ctx->wi = vy_write_iterator_new(ctx->key_def, true, true,
-					&fake_read_views, NULL);
-	if (ctx->wi == NULL) {
-		rc = -1;
-		goto out;
-	}
-	rlist_foreach_entry(slice, &ctx->slices, in_join) {
-		rc = vy_write_iterator_new_slice(ctx->wi, slice, ctx->format);
-		if (rc != 0)
-			goto out_delete_wi;
-	}
-
-	/* Do the actual work from the relay thread. */
-	bool cancellable = fiber_set_cancellable(false);
-	rc = cbus_call(&ctx->relay_pipe, &ctx->tx_pipe, &ctx->cmsg,
-		       vy_send_range_f, NULL, TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
-
-out_delete_wi:
-	ctx->wi->iface->close(ctx->wi);
-	ctx->wi = NULL;
-out_delete_slices:
-	rlist_foreach_entry_safe(slice, &ctx->slices, in_join, tmp)
-		vy_slice_delete(slice);
-	rlist_create(&ctx->slices);
-out:
-	return rc;
-}
-
-/** Send all tuples stored in the given LSM tree. */
-static int
-vy_send_lsm(struct vy_join_ctx *ctx, struct vy_lsm_recovery_info *lsm_info)
-{
-	int rc = -1;
-
-	if (lsm_info->drop_lsn >= 0 || lsm_info->create_lsn < 0) {
-		/* Dropped or not yet built LSM tree. */
+	struct vy_join_ctx *ctx = arg;
+	if (!space_is_vinyl(space))
 		return 0;
-	}
-	if (lsm_info->group_id == GROUP_LOCAL) {
-		/* Replica local space. */
+	if (space_group_id(space) == GROUP_LOCAL)
 		return 0;
-	}
-
-	/*
-	 * We are only interested in the primary index LSM tree.
-	 * Secondary keys will be rebuilt on the destination.
-	 */
-	if (lsm_info->index_id != 0)
+	struct index *pk = space_index(space, 0);
+	if (pk == NULL)
 		return 0;
-
-	ctx->space_id = lsm_info->space_id;
-
-	/* Create key definition and tuple format. */
-	ctx->key_def = key_def_new(lsm_info->key_parts,
-				   lsm_info->key_part_count, false);
-	if (ctx->key_def == NULL)
-		goto out;
-	ctx->format = vy_stmt_format_new(&ctx->env->stmt_env, &ctx->key_def, 1,
-					 NULL, 0, 0, NULL);
-	if (ctx->format == NULL)
-		goto out_free_key_def;
-	tuple_format_ref(ctx->format);
-
-	/* Send ranges. */
-	struct vy_range_recovery_info *range_info;
-	assert(!rlist_empty(&lsm_info->ranges));
-	rlist_foreach_entry(range_info, &lsm_info->ranges, in_lsm) {
-		rc = vy_send_range(ctx, range_info);
-		if (rc != 0)
-			break;
-	}
-
-	tuple_format_unref(ctx->format);
-	ctx->format = NULL;
-out_free_key_def:
-	key_def_delete(ctx->key_def);
-	ctx->key_def = NULL;
-out:
-	return rc;
-}
-
-/** Relay cord function. */
-static int
-vy_join_f(va_list ap)
-{
-	struct vy_join_ctx *ctx = va_arg(ap, struct vy_join_ctx *);
-
-	coio_enable();
-
-	cpipe_create(&ctx->tx_pipe, "tx");
-
-	struct cbus_endpoint endpoint;
-	cbus_endpoint_create(&endpoint, cord_name(cord()),
-			     fiber_schedule_cb, fiber());
-
-	cbus_loop(&endpoint);
-
-	cbus_endpoint_destroy(&endpoint, cbus_process);
-	cpipe_destroy(&ctx->tx_pipe);
+	struct vy_join_entry *entry = malloc(sizeof(*entry));
+	if (entry == NULL) {
+		diag_set(OutOfMemory, sizeof(*entry),
+			 "malloc", "struct vy_join_entry");
+		return -1;
+	}
+	entry->space_id = space_id(space);
+	entry->iterator = index_create_snapshot_iterator(pk);
+	if (entry->iterator == NULL) {
+		free(entry);
+		return -1;
+	}
+	rlist_add_tail_entry(&ctx->entries, entry, in_ctx);
 	return 0;
 }
 
 static int
-vinyl_engine_join(struct engine *engine, const struct vclock *vclock,
-		  struct xstream *stream)
+vinyl_engine_prepare_join(struct engine *engine, void **arg)
 {
-	struct vy_env *env = vy_env(engine);
-	int rc = -1;
-
-	/* Allocate the relay context. */
+	(void)engine;
 	struct vy_join_ctx *ctx = malloc(sizeof(*ctx));
 	if (ctx == NULL) {
-		diag_set(OutOfMemory, PATH_MAX, "malloc", "struct vy_join_ctx");
-		goto out;
+		diag_set(OutOfMemory, sizeof(*ctx),
+			 "malloc", "struct vy_join_ctx");
+		return -1;
 	}
-	memset(ctx, 0, sizeof(*ctx));
-	ctx->env = env;
-	ctx->stream = stream;
-	rlist_create(&ctx->slices);
+	rlist_create(&ctx->entries);
+	if (space_foreach(vy_join_add_space, ctx) != 0) {
+		free(ctx);
+		return -1;
+	}
+	*arg = ctx;
+	return 0;
+}
 
-	/* Start the relay cord. */
-	char name[FIBER_NAME_MAX];
-	snprintf(name, sizeof(name), "initial_join_%p", stream);
-	struct cord cord;
-	if (cord_costart(&cord, name, vy_join_f, ctx) != 0)
-		goto out_free_ctx;
-	cpipe_create(&ctx->relay_pipe, name);
+static int
+vy_join_send_tuple(struct xstream *stream, uint32_t space_id,
+		   const char *data, size_t size)
+{
+	struct request_replace_body body;
+	request_replace_body_create(&body, space_id);
 
-	/*
-	 * Load the recovery context from the given point in time.
-	 * Send all runs stored in it to the replica.
-	 */
-	struct vy_recovery *recovery;
-	recovery = vy_recovery_new(vclock_sum(vclock),
-				   VY_RECOVERY_LOAD_CHECKPOINT);
-	if (recovery == NULL) {
-		say_error("failed to recover vylog to join a replica");
-		goto out_join_cord;
-	}
-	rc = 0;
-	struct vy_lsm_recovery_info *lsm_info;
-	rlist_foreach_entry(lsm_info, &recovery->lsms, in_recovery) {
-		rc = vy_send_lsm(ctx, lsm_info);
+	struct xrow_header row;
+	memset(&row, 0, sizeof(row));
+	row.type = IPROTO_INSERT;
+
+	row.bodycnt = 2;
+	row.body[0].iov_base = &body;
+	row.body[0].iov_len = sizeof(body);
+	row.body[1].iov_base = (char *)data;
+	row.body[1].iov_len = size;
+
+	return xstream_write(stream, &row);
+}
+
+static int
+vinyl_engine_join(struct engine *engine, void *arg, struct xstream *stream)
+{
+	(void)engine;
+	int loops = 0;
+	struct vy_join_ctx *ctx = arg;
+	struct vy_join_entry *entry;
+	rlist_foreach_entry(entry, &ctx->entries, in_ctx) {
+		struct snapshot_iterator *it = entry->iterator;
+		int rc;
+		uint32_t size;
+		const char *data;
+		while ((rc = it->next(it, &data, &size)) == 0 && data != NULL) {
+			if (vy_join_send_tuple(stream, entry->space_id,
+					       data, size) != 0)
+				return -1;
+		}
 		if (rc != 0)
-			break;
+			return -1;
+		if (++loops % VY_YIELD_LOOPS == 0)
+			fiber_sleep(0);
 	}
-	vy_recovery_delete(recovery);
+	return 0;
+}
 
-out_join_cord:
-	cbus_stop_loop(&ctx->relay_pipe);
-	cpipe_destroy(&ctx->relay_pipe);
-	if (cord_cojoin(&cord) != 0)
-		rc = -1;
-out_free_ctx:
+static void
+vinyl_engine_complete_join(struct engine *engine, void *arg)
+{
+	(void)engine;
+	struct vy_join_ctx *ctx = arg;
+	struct vy_join_entry *entry, *next;
+	rlist_foreach_entry_safe(entry, &ctx->entries, in_ctx, next) {
+		entry->iterator->free(entry->iterator);
+		free(entry);
+	}
 	free(ctx);
-out:
-	return rc;
 }
 
 /* }}} Replication */
@@ -3852,6 +3667,66 @@ vinyl_index_create_iterator(struct index *base, enum iterator_type type,
 	return (struct iterator *)it;
 }
 
+static int
+vinyl_snapshot_iterator_next(struct snapshot_iterator *base,
+			     const char **data, uint32_t *size)
+{
+	assert(base->next == vinyl_snapshot_iterator_next);
+	struct vinyl_snapshot_iterator *it =
+		(struct vinyl_snapshot_iterator *)base;
+	struct vy_entry entry;
+	if (vy_read_iterator_next(&it->iterator, &entry) != 0)
+		return -1;
+	*data = entry.stmt != NULL ? tuple_data_range(entry.stmt, size) : NULL;
+	return 0;
+}
+
+static void
+vinyl_snapshot_iterator_free(struct snapshot_iterator *base)
+{
+	assert(base->free == vinyl_snapshot_iterator_free);
+	struct vinyl_snapshot_iterator *it =
+		(struct vinyl_snapshot_iterator *)base;
+	struct vy_lsm *lsm = it->iterator.lsm;
+	struct vy_env *env = vy_env(lsm->base.engine);
+	vy_read_iterator_close(&it->iterator);
+	tx_manager_destroy_read_view(env->xm, it->rv);
+	vy_lsm_unref(lsm);
+	free(it);
+}
+
+static struct snapshot_iterator *
+vinyl_index_create_snapshot_iterator(struct index *base)
+{
+	struct vy_lsm *lsm = vy_lsm(base);
+	struct vy_env *env = vy_env(base->engine);
+
+	struct vinyl_snapshot_iterator *it = malloc(sizeof(*it));
+	if (it == NULL) {
+		diag_set(OutOfMemory, sizeof(*it), "malloc",
+			 "struct vinyl_snapshot_iterator");
+		return NULL;
+	}
+	it->base.next = vinyl_snapshot_iterator_next;
+	it->base.free = vinyl_snapshot_iterator_free;
+
+	it->rv = tx_manager_read_view(env->xm);
+	if (it->rv == NULL) {
+		free(it);
+		return NULL;
+	}
+	vy_read_iterator_open(&it->iterator, lsm, NULL,
+			      ITER_ALL, lsm->env->empty_key,
+			      (const struct vy_read_view **)&it->rv);
+	/*
+	 * The index may be dropped while we are reading it.
+	 * The iterator must go on as if nothing happened.
+	 */
+	vy_lsm_ref(lsm);
+
+	return &it->base;
+}
+
 static int
 vinyl_index_get(struct index *index, const char *key,
 		uint32_t part_count, struct tuple **ret)
@@ -4578,7 +4453,9 @@ static struct trigger on_replace_vinyl_deferred_delete = {
 static const struct engine_vtab vinyl_engine_vtab = {
 	/* .shutdown = */ vinyl_engine_shutdown,
 	/* .create_space = */ vinyl_engine_create_space,
+	/* .prepare_join = */ vinyl_engine_prepare_join,
 	/* .join = */ vinyl_engine_join,
+	/* .complete_join = */ vinyl_engine_complete_join,
 	/* .begin = */ vinyl_engine_begin,
 	/* .begin_statement = */ vinyl_engine_begin_statement,
 	/* .prepare = */ vinyl_engine_prepare,
@@ -4644,7 +4521,7 @@ static const struct index_vtab vinyl_index_vtab = {
 	/* .replace = */ generic_index_replace,
 	/* .create_iterator = */ vinyl_index_create_iterator,
 	/* .create_snapshot_iterator = */
-		generic_index_create_snapshot_iterator,
+		vinyl_index_create_snapshot_iterator,
 	/* .stat = */ vinyl_index_stat,
 	/* .compact = */ vinyl_index_compact,
 	/* .reset_stat = */ vinyl_index_reset_stat,
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index c6c17aee..25b6dcd3 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -1675,14 +1675,8 @@ vy_run_recover(struct vy_run *run, const char *dir,
 
 	/* Read run header. */
 	struct xrow_header xrow;
-	ERROR_INJECT(ERRINJ_VYRUN_INDEX_GARBAGE, {
-		errinj(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL)->bparam = true;
-	});
 	/* all rows should be in one tx */
 	int rc = xlog_cursor_next_tx(&cursor);
-	ERROR_INJECT(ERRINJ_VYRUN_INDEX_GARBAGE, {
-		errinj(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL)->bparam = false;
-	});
 
 	if (rc != 0) {
 		if (rc > 0)
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 1a5d4837..d092e0cd 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -156,8 +156,7 @@ tx_manager_mem_used(struct tx_manager *xm)
 	return ret;
 }
 
-/** Create or reuse an instance of a read view. */
-static struct vy_read_view *
+struct vy_read_view *
 tx_manager_read_view(struct tx_manager *xm)
 {
 	struct vy_read_view *rv;
@@ -195,12 +194,9 @@ tx_manager_read_view(struct tx_manager *xm)
 	return rv;
 }
 
-/** Dereference and possibly destroy a read view. */
-static void
-tx_manager_destroy_read_view(struct tx_manager *xm,
-			     const struct vy_read_view *read_view)
+void
+tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv)
 {
-	struct vy_read_view *rv = (struct vy_read_view *) read_view;
 	if (rv == xm->p_global_read_view)
 		return;
 	assert(rv->refs);
diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h
index 376f4330..3144c921 100644
--- a/src/box/vy_tx.h
+++ b/src/box/vy_tx.h
@@ -289,6 +289,14 @@ tx_manager_delete(struct tx_manager *xm);
 size_t
 tx_manager_mem_used(struct tx_manager *xm);
 
+/** Create or reuse an instance of a read view. */
+struct vy_read_view *
+tx_manager_read_view(struct tx_manager *xm);
+
+/** Dereference and possibly destroy a read view. */
+void
+tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv);
+
 /**
  * Abort all rw transactions that affect the given space
  * and haven't reached WAL yet. Called before executing a DDL
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index e75a620d..3072a00e 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -102,11 +102,11 @@ struct errinj {
 	_(ERRINJ_RELAY_REPORT_INTERVAL, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_RELAY_FINAL_SLEEP, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_RELAY_FINAL_JOIN, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_REPLICA_JOIN_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_PORT_DUMP, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_XLOG_META, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_XLOG_READ, ERRINJ_INT, {.iparam = -1}) \
-	_(ERRINJ_VYRUN_INDEX_GARBAGE, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_VYRUN_DATA_READ, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_CHECK_FORMAT_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_BUILD_INDEX, ERRINJ_INT, {.iparam = -1}) \
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 73e661a7..5a04e1fb 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -37,7 +37,7 @@ errinj.info()
     state: 0
   ERRINJ_COIO_SENDFILE_CHUNK:
     state: -1
-  ERRINJ_VY_LOG_FILE_RENAME:
+  ERRINJ_HTTP_RESPONSE_ADD_WAIT:
     state: false
   ERRINJ_WAL_WRITE_PARTIAL:
     state: -1
@@ -51,15 +51,15 @@ errinj.info()
     state: false
   ERRINJ_WAL_SYNC:
     state: false
-  ERRINJ_VYRUN_INDEX_GARBAGE:
-    state: false
-  ERRINJ_BUILD_INDEX_DELAY:
-    state: false
   ERRINJ_BUILD_INDEX:
     state: -1
-  ERRINJ_VY_INDEX_FILE_RENAME:
+  ERRINJ_BUILD_INDEX_DELAY:
     state: false
-  ERRINJ_CHECK_FORMAT_DELAY:
+  ERRINJ_VY_RUN_FILE_RENAME:
+    state: false
+  ERRINJ_VY_COMPACTION_DELAY:
+    state: false
+  ERRINJ_VY_DUMP_DELAY:
     state: false
   ERRINJ_VY_DELAY_PK_LOOKUP:
     state: false
@@ -67,16 +67,16 @@ errinj.info()
     state: false
   ERRINJ_PORT_DUMP:
     state: false
-  ERRINJ_VY_DUMP_DELAY:
-    state: false
+  ERRINJ_WAL_BREAK_LSN:
+    state: -1
   ERRINJ_WAL_IO:
     state: false
   ERRINJ_WAL_FALLOCATE:
     state: 0
-  ERRINJ_WAL_BREAK_LSN:
-    state: -1
   ERRINJ_RELAY_BREAK_LSN:
     state: -1
+  ERRINJ_VY_INDEX_FILE_RENAME:
+    state: false
   ERRINJ_TUPLE_FORMAT_COUNT:
     state: -1
   ERRINJ_TUPLE_ALLOC:
@@ -87,7 +87,7 @@ errinj.info()
     state: false
   ERRINJ_RELAY_REPORT_INTERVAL:
     state: 0
-  ERRINJ_VY_RUN_FILE_RENAME:
+  ERRINJ_VY_LOG_FILE_RENAME:
     state: false
   ERRINJ_VY_READ_PAGE_TIMEOUT:
     state: 0
@@ -95,23 +95,23 @@ errinj.info()
     state: false
   ERRINJ_SIO_READ_MAX:
     state: -1
-  ERRINJ_HTTP_RESPONSE_ADD_WAIT:
-    state: false
-  ERRINJ_WAL_WRITE_DISK:
-    state: false
   ERRINJ_SNAP_COMMIT_DELAY:
     state: false
+  ERRINJ_WAL_WRITE_DISK:
+    state: false
   ERRINJ_SNAP_WRITE_DELAY:
     state: false
-  ERRINJ_VY_RUN_WRITE:
-    state: false
   ERRINJ_LOG_ROTATE:
     state: false
+  ERRINJ_VY_RUN_WRITE:
+    state: false
+  ERRINJ_CHECK_FORMAT_DELAY:
+    state: false
   ERRINJ_VY_LOG_FLUSH_DELAY:
     state: false
   ERRINJ_RELAY_FINAL_JOIN:
     state: false
-  ERRINJ_VY_COMPACTION_DELAY:
+  ERRINJ_REPLICA_JOIN_DELAY:
     state: false
   ERRINJ_RELAY_FINAL_SLEEP:
     state: false
diff --git a/test/replication-py/cluster.result b/test/replication-py/cluster.result
index 04f06f74..f68a6af7 100644
--- a/test/replication-py/cluster.result
+++ b/test/replication-py/cluster.result
@@ -23,19 +23,6 @@ box.schema.user.grant('guest', 'replication')
 ...
 ok - join with granted role
 -------------------------------------------------------------
-gh-707: Master crashes on JOIN if it does not have snapshot files
-gh-480: If socket is closed while JOIN, replica wont reconnect
--------------------------------------------------------------
-ok - join without snapshots
-ok - _cluster did not change after unsuccessful JOIN
-box.schema.user.revoke('guest', 'replication')
----
-...
-box.snapshot()
----
-- ok
-...
--------------------------------------------------------------
 gh-434: Assertion if replace _cluster tuple for local server
 -------------------------------------------------------------
 box.space._cluster:replace{1, require('uuid').NULL:str()}
diff --git a/test/replication-py/cluster.test.py b/test/replication-py/cluster.test.py
index 0140a6bd..088ca9c3 100644
--- a/test/replication-py/cluster.test.py
+++ b/test/replication-py/cluster.test.py
@@ -71,31 +71,6 @@ server.iproto.reconnect() # re-connect with new permissions
 server_id = check_join('join with granted role')
 server.iproto.py_con.space('_cluster').delete(server_id)
 
-print '-------------------------------------------------------------'
-print 'gh-707: Master crashes on JOIN if it does not have snapshot files'
-print 'gh-480: If socket is closed while JOIN, replica wont reconnect'
-print '-------------------------------------------------------------'
-
-data_dir = os.path.join(server.vardir, server.name)
-for k in glob.glob(os.path.join(data_dir, '*.snap')):
-    os.unlink(k)
-
-# remember the number of servers in _cluster table
-server_count = len(server.iproto.py_con.space('_cluster').select(()))
-
-rows = list(server.iproto.py_con.join(replica_uuid))
-print len(rows) > 0 and rows[-1].return_message.find('.snap') >= 0 and \
-    'ok' or 'not ok', '-', 'join without snapshots'
-res = server.iproto.py_con.space('_cluster').select(())
-if server_count <= len(res):
-    print 'ok - _cluster did not change after unsuccessful JOIN'
-else:
-    print 'not ok - _cluster did change after unsuccessful JOIN'
-    print res
-
-server.admin("box.schema.user.revoke('guest', 'replication')")
-server.admin('box.snapshot()')
-
 print '-------------------------------------------------------------'
 print 'gh-434: Assertion if replace _cluster tuple for local server'
 print '-------------------------------------------------------------'
diff --git a/test/replication/join_without_snap.result b/test/replication/join_without_snap.result
new file mode 100644
index 00000000..becdfd21
--- /dev/null
+++ b/test/replication/join_without_snap.result
@@ -0,0 +1,88 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--
+-- gh-1271: check that replica join works off the current read view,
+-- not the last checkpoint. To do that, delete the last snapshot file
+-- and check that a replica can still join.
+--
+_ = box.schema.space.create('test')
+ | ---
+ | ...
+_ = box.space.test:create_index('pk')
+ | ---
+ | ...
+for i = 1, 5 do box.space.test:insert{i} end
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+
+fio = require('fio')
+ | ---
+ | ...
+fio.unlink(fio.pathjoin(box.cfg.memtx_dir, string.format('%020d.snap', box.info.signature)))
+ | ---
+ | - true
+ | ...
+
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('switch replica')
+ | ---
+ | - true
+ | ...
+
+box.space.test:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('cleanup server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+test_run:cleanup_cluster()
+ | ---
+ | ...
+
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
diff --git a/test/replication/join_without_snap.test.lua b/test/replication/join_without_snap.test.lua
new file mode 100644
index 00000000..6a23d741
--- /dev/null
+++ b/test/replication/join_without_snap.test.lua
@@ -0,0 +1,32 @@
+test_run = require('test_run').new()
+
+--
+-- gh-1271: check that replica join works off the current read view,
+-- not the last checkpoint. To do that, delete the last snapshot file
+-- and check that a replica can still join.
+--
+_ = box.schema.space.create('test')
+_ = box.space.test:create_index('pk')
+for i = 1, 5 do box.space.test:insert{i} end
+box.snapshot()
+
+fio = require('fio')
+fio.unlink(fio.pathjoin(box.cfg.memtx_dir, string.format('%020d.snap', box.info.signature)))
+
+box.schema.user.grant('guest', 'replication')
+
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+test_run:cmd('start server replica')
+test_run:cmd('switch replica')
+
+box.space.test:select()
+
+test_run:cmd('switch default')
+test_run:cmd('stop server replica')
+test_run:cmd('cleanup server replica')
+test_run:cmd('delete server replica')
+test_run:cleanup_cluster()
+
+box.schema.user.revoke('guest', 'replication')
+box.space.test:drop()
+box.snapshot()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 91e884ec..eb25077d 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -10,6 +10,7 @@
     "force_recovery.test.lua": {},
     "on_schema_init.test.lua": {},
     "long_row_timeout.test.lua": {},
+    "join_without_snap.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index e8795143..2635da26 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1116,7 +1116,7 @@ box.snapshot()
 box.schema.user.grant('guest', 'replication')
 ---
 ...
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', true)
+errinj.set('ERRINJ_VY_READ_PAGE', true)
 ---
 - ok
 ...
@@ -1136,7 +1136,7 @@ test_run:cmd("delete server replica")
 ---
 - true
 ...
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', false)
+errinj.set('ERRINJ_VY_READ_PAGE', false)
 ---
 - ok
 ...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 034ed34c..4230cfae 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -404,12 +404,12 @@ _ = s:create_index('pk')
 s:replace{1, 2, 3}
 box.snapshot()
 box.schema.user.grant('guest', 'replication')
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', true)
+errinj.set('ERRINJ_VY_READ_PAGE', true)
 test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
 test_run:cmd("start server replica with crash_expected=True")
 test_run:cmd("cleanup server replica")
 test_run:cmd("delete server replica")
-errinj.set('ERRINJ_VYRUN_INDEX_GARBAGE', false)
+errinj.set('ERRINJ_VY_READ_PAGE', false)
 box.schema.user.revoke('guest', 'replication')
 s:drop()
 
diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
index cddc9c3b..60283281 100644
--- a/test/xlog/panic_on_broken_lsn.result
+++ b/test/xlog/panic_on_broken_lsn.result
@@ -123,16 +123,33 @@ box.space.test:auto_increment{'v0'}
 ---
 - [1, 'v0']
 ...
-lsn = box.info.vclock[1]
+-- Inject a broken LSN in the final join stage.
+lsn = -1
 ---
 ...
-box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true)
 ---
 - ok
 ...
-box.space.test:auto_increment{'v1'}
+fiber = require('fiber')
 ---
-- [2, 'v1']
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+_ = fiber.create(function()
+    test_run:wait_cond(function() return box.space._cluster:get(2) ~= nil end)
+    lsn = box.info.vclock[1]
+    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+    box.space.test:auto_increment{'v1'}
+    box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
+end);
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
 ...
 test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
 ---
@@ -142,12 +159,6 @@ test_run:cmd('start server replica with crash_expected=True')
 ---
 - false
 ...
-fiber = require('fiber')
----
-...
-while box.info.replication[2] == nil do fiber.sleep(0.001) end
----
-...
 box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
 ---
 - ok
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua
index cdf287a1..ca304345 100644
--- a/test/xlog/panic_on_broken_lsn.test.lua
+++ b/test/xlog/panic_on_broken_lsn.test.lua
@@ -57,14 +57,24 @@ box.schema.user.grant('guest', 'replication')
 _ = box.schema.space.create('test', {id = 9000})
 _ = box.space.test:create_index('pk')
 box.space.test:auto_increment{'v0'}
-lsn = box.info.vclock[1]
-box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
-box.space.test:auto_increment{'v1'}
+
+-- Inject a broken LSN in the final join stage.
+lsn = -1
+box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true)
+
+fiber = require('fiber')
+test_run:cmd("setopt delimiter ';'")
+_ = fiber.create(function()
+    test_run:wait_cond(function() return box.space._cluster:get(2) ~= nil end)
+    lsn = box.info.vclock[1]
+    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+    box.space.test:auto_increment{'v1'}
+    box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
+end);
+test_run:cmd("setopt delimiter ''");
 
 test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
 test_run:cmd('start server replica with crash_expected=True')
-fiber = require('fiber')
-while box.info.replication[2] == nil do fiber.sleep(0.001) end
 box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
 
 -- Check that log contains the mention of broken LSN and the request printout
-- 
2.20.1




More information about the Tarantool-patches mailing list