[PATCH 4/4] vinyl: log new index before WAL write on DDL

Vladimir Davydov vdavydov.dev at gmail.com
Tue Mar 27 18:03:03 MSK 2018


Currently, we write new indexes to vylog only after successful WAL
write (see vinyl_index_commit_create). This is incompatible with space
ALTER - the problem is during ALTER vinyl may need to create new run
files, which we need to track in order not to leave garbage if ALTER
fails or tarantool exits before ALTER is complete.

So this patch splits index creation in two stages, prepare and commit.
The 'commit' stage is represented by existing VY_LOG_CREATE_LSM record,
which is written from index_vtab::commit_create callback, just like
before. For the 'prepare' stage we introduce a new record type,
VY_LOG_REPARE_LSM, written from index_vtab::build_secondary_key and
index_vtab::add_primary_key callbacks, i.e. before WAL write. For now,
we don't write anything to prepared, but uncommitted indexes (this will
be done later), but we do add prepared indexes to the scheduler so that
they can be dumped and compacted. If ALTER fails, we drop prepared
indexes in index_vtab::abort_create callback. Prepared but uncommitted
indexes are ignored by backup and replication.

Note, we have to rework vinyl/errinj_vylog test in this patch, because
index creation (and hence space truncation) commands now fail on vylog
error, i.e. a situation when the same index is dropped and recreated
multiple times in xlog without having corresponding records in vylog is
now impossible.

Also, space truncation is not linearizable for vinyl anymore as it may
yield before WAL write, while trying to prepare an index in vylog. This
is OK - we never promised it is. Remove the corresponding test case.
---
 src/box/vinyl.c                  |  84 +++++++---
 src/box/vy_log.c                 | 347 ++++++++++++++++++++++++++++-----------
 src/box/vy_log.h                 |  40 ++++-
 src/box/vy_lsm.c                 |  47 ++++--
 test/engine/truncate.result      |  87 ----------
 test/engine/truncate.test.lua    |  48 ------
 test/vinyl/errinj_vylog.result   | 169 +++++++++----------
 test/vinyl/errinj_vylog.test.lua | 112 ++++++-------
 test/vinyl/layout.result         |   4 +-
 9 files changed, 527 insertions(+), 411 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index fa67a99a..0c7ca38d 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -775,6 +775,8 @@ vinyl_index_open(struct index *index)
 	default:
 		unreachable();
 	}
+	if (rc == 0)
+		vy_scheduler_add_lsm(&env->scheduler, lsm);
 	return rc;
 }
 
@@ -796,10 +798,8 @@ vinyl_index_commit_create(struct index *index, int64_t lsn)
 		 * the index isn't in the recovery context and we
 		 * need to retry to log it now.
 		 */
-		if (lsm->is_committed) {
-			vy_scheduler_add_lsm(&env->scheduler, lsm);
+		if (lsm->is_committed)
 			return;
-		}
 	}
 
 	if (env->status == VINYL_INITIAL_RECOVERY_REMOTE) {
@@ -824,9 +824,6 @@ vinyl_index_commit_create(struct index *index, int64_t lsn)
 	assert(!lsm->is_committed);
 	lsm->is_committed = true;
 
-	assert(lsm->range_count == 1);
-	struct vy_range *range = vy_range_tree_first(lsm->tree);
-
 	/*
 	 * Since it's too late to fail now, in case of vylog write
 	 * failure we leave the records we attempted to write in
@@ -838,13 +835,7 @@ vinyl_index_commit_create(struct index *index, int64_t lsn)
 	vy_log_tx_begin();
 	vy_log_create_lsm(lsm->id, lsm->space_id, lsm->index_id,
 			  lsm->key_def, lsn);
-	vy_log_insert_range(lsm->id, range->id, NULL, NULL);
 	vy_log_tx_try_commit();
-	/*
-	 * After we committed the index in the log, we can schedule
-	 * a task for it.
-	 */
-	vy_scheduler_add_lsm(&env->scheduler, lsm);
 }
 
 /*
@@ -873,6 +864,25 @@ vy_log_lsm_prune(struct vy_lsm *lsm, int64_t gc_lsn)
 }
 
 static void
+vinyl_index_abort_create(struct index *index)
+{
+	struct vy_env *env = vy_env(index->engine);
+	struct vy_lsm *lsm = vy_lsm(index);
+
+	assert(env->status != VINYL_INITIAL_RECOVERY_LOCAL &&
+	       env->status != VINYL_FINAL_RECOVERY_LOCAL);
+
+	vy_scheduler_remove_lsm(&env->scheduler, lsm);
+
+	lsm->is_dropped = true;
+
+	vy_log_tx_begin();
+	vy_log_lsm_prune(lsm, 0);
+	vy_log_drop_lsm(lsm->id);
+	vy_log_tx_try_commit();
+}
+
+static void
 vinyl_index_commit_drop(struct index *index)
 {
 	struct vy_env *env = vy_env(index->engine);
@@ -3043,7 +3053,8 @@ vy_send_lsm(struct vy_join_ctx *ctx, struct vy_lsm_recovery_info *lsm_info)
 {
 	int rc = -1;
 
-	if (lsm_info->is_dropped)
+	/* Skip dropped or incomplete indexes. */
+	if (lsm_info->is_dropped || lsm_info->commit_lsn < 0)
 		return 0;
 
 	/*
@@ -3282,18 +3293,48 @@ vy_gc(struct vy_env *env, struct vy_recovery *recovery,
 	int loops = 0;
 	struct vy_lsm_recovery_info *lsm_info;
 	rlist_foreach_entry(lsm_info, &recovery->lsms, in_recovery) {
+		/*
+		 * Delete unused run files.
+		 */
 		struct vy_run_recovery_info *run_info;
 		rlist_foreach_entry(run_info, &lsm_info->runs, in_lsm) {
-			if ((run_info->is_dropped &&
-			     run_info->gc_lsn < gc_lsn &&
-			     (gc_mask & VY_GC_DROPPED) != 0) ||
-			    (run_info->is_incomplete &&
-			     (gc_mask & VY_GC_INCOMPLETE) != 0)) {
+			bool purge = false;
+
+			/* Run was deleted before the oldest checkpoint. */
+			if ((gc_mask & VY_GC_DROPPED) != 0 &&
+			    run_info->is_dropped && run_info->gc_lsn < gc_lsn)
+				purge = true;
+
+			/* Index or run was not committed. */
+			if ((gc_mask & VY_GC_INCOMPLETE) != 0 &&
+			    (lsm_info->commit_lsn < 0 || run_info->is_incomplete))
+				purge = true;
+
+			if (purge)
 				vy_gc_run(env, lsm_info, run_info);
-			}
+
 			if (loops % VY_YIELD_LOOPS == 0)
 				fiber_sleep(0);
 		}
+
+		/*
+		 * Purge incomplete indexes from vylog.
+		 */
+		if (lsm_info->commit_lsn >= 0 || lsm_info->is_dropped ||
+		    (gc_mask & VY_GC_INCOMPLETE) == 0)
+			continue;
+
+		vy_log_tx_begin();
+		struct vy_range_recovery_info *range_info;
+		rlist_foreach_entry(range_info, &lsm_info->ranges, in_lsm) {
+			struct vy_slice_recovery_info *slice_info;
+			rlist_foreach_entry(slice_info, &range_info->slices,
+					    in_range)
+				vy_log_delete_slice(slice_info->id);
+			vy_log_delete_range(range_info->id);
+		}
+		vy_log_drop_lsm(lsm_info->id);
+		vy_log_tx_try_commit();
 	}
 }
 
@@ -3345,7 +3386,8 @@ vinyl_engine_backup(struct engine *engine, struct vclock *vclock,
 	int loops = 0;
 	struct vy_lsm_recovery_info *lsm_info;
 	rlist_foreach_entry(lsm_info, &recovery->lsms, in_recovery) {
-		if (lsm_info->is_dropped)
+		/* Skip dropped or incomplete indexes. */
+		if (lsm_info->is_dropped || lsm_info->commit_lsn < 0)
 			continue;
 		struct vy_run_recovery_info *run_info;
 		rlist_foreach_entry(run_info, &lsm_info->runs, in_lsm) {
@@ -3952,7 +3994,7 @@ static const struct space_vtab vinyl_space_vtab = {
 static const struct index_vtab vinyl_index_vtab = {
 	/* .destroy = */ vinyl_index_destroy,
 	/* .commit_create = */ vinyl_index_commit_create,
-	/* .abort_create = */ generic_index_abort_create,
+	/* .abort_create = */ vinyl_index_abort_create,
 	/* .commit_drop = */ vinyl_index_commit_drop,
 	/* .update_def = */ generic_index_update_def,
 	/* .size = */ vinyl_index_size,
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index aeb31781..dce9807c 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -115,6 +115,7 @@ static const char *vy_log_type_name[] = {
 	[VY_LOG_DUMP_LSM]		= "dump_lsm",
 	[VY_LOG_SNAPSHOT]		= "snapshot",
 	[VY_LOG_TRUNCATE_LSM]		= "truncate_lsm",
+	[VY_LOG_PREPARE_LSM]		= "prepare_lsm",
 };
 
 /** Metadata log object. */
@@ -171,6 +172,10 @@ static struct vy_log vy_log;
 static struct vy_recovery *
 vy_recovery_new_locked(int64_t signature, bool only_checkpoint);
 
+static int
+vy_recovery_process_record(struct vy_recovery *recovery,
+			   const struct vy_log_record *record);
+
 /**
  * Return the name of the vylog file that has the given signature.
  */
@@ -845,6 +850,14 @@ vy_log_end_recovery(void)
 {
 	assert(vy_log.recovery != NULL);
 
+	/*
+	 * Update the recovery context with records written during
+	 * recovery - we will need them for garbage collection.
+	 */
+	struct vy_log_record *record;
+	stailq_foreach_entry(record, &vy_log.tx, in_tx)
+		vy_recovery_process_record(vy_log.recovery, record);
+
 	/* Flush all pending records. */
 	if (vy_log_flush() < 0) {
 		diag_log();
@@ -1099,17 +1112,57 @@ vy_log_write(const struct vy_log_record *record)
  * vy_recovery::index_id_hash map.
  */
 static inline int64_t
-vy_recovery_index_id_hash(uint32_t space_id, uint32_t index_id)
+vy_recovery_index_id_hash_key(uint32_t space_id, uint32_t index_id)
 {
 	return ((uint64_t)space_id << 32) + index_id;
 }
 
+/**
+ * Insert an LSM tree to vy_recovery::index_id_hash.
+ *
+ * If there is an LSM tree with the same space_id/index_id
+ * present in the hash table, it will be silently replaced.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int
+vy_recovery_hash_index_id(struct vy_recovery *recovery,
+			  struct vy_lsm_recovery_info *lsm)
+{
+	struct mh_i64ptr_t *h = recovery->index_id_hash;
+	struct mh_i64ptr_node_t node;
+
+	node.key = vy_recovery_index_id_hash_key(lsm->space_id, lsm->index_id);
+	node.val = lsm;
+
+	if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
+		diag_set(OutOfMemory, 0, "mh_i64ptr_put", "mh_i64ptr_node_t");
+		return -1;
+	}
+	return 0;
+}
+
+/**
+ * Remove the entry corresponding to the space_id/index_id
+ * from vy_recovery::index_id_hash.
+ */
+static void
+vy_recovery_unhash_index_id(struct vy_recovery *recovery,
+			    uint32_t space_id, uint32_t index_id)
+{
+	struct mh_i64ptr_t *h = recovery->index_id_hash;
+	int64_t key = vy_recovery_index_id_hash_key(space_id, index_id);
+	mh_int_t k = mh_i64ptr_find(h, key, NULL);
+	if (k != mh_end(h))
+		mh_i64ptr_del(h, k, NULL);
+}
+
 /** Lookup an LSM tree in vy_recovery::index_id_hash map. */
 struct vy_lsm_recovery_info *
 vy_recovery_lsm_by_index_id(struct vy_recovery *recovery,
 			    uint32_t space_id, uint32_t index_id)
 {
-	int64_t key = vy_recovery_index_id_hash(space_id, index_id);
+	int64_t key = vy_recovery_index_id_hash_key(space_id, index_id);
 	struct mh_i64ptr_t *h = recovery->index_id_hash;
 	mh_int_t k = mh_i64ptr_find(h, key, NULL);
 	if (k == mh_end(h))
@@ -1162,10 +1215,121 @@ vy_recovery_lookup_slice(struct vy_recovery *recovery, int64_t slice_id)
 }
 
 /**
+ * Allocate a new LSM tree with the given ID and add it to
+ * the recovery context.
+ *
+ * Returns the new LSM tree on success, NULL on error.
+ */
+static struct vy_lsm_recovery_info *
+vy_recovery_do_create_lsm(struct vy_recovery *recovery, int64_t id,
+			  uint32_t space_id, uint32_t index_id)
+{
+	struct vy_lsm_recovery_info *lsm = malloc(sizeof(*lsm));
+	if (lsm == NULL) {
+		diag_set(OutOfMemory, sizeof(*lsm),
+			 "malloc", "struct vy_lsm_recovery_info");
+		return NULL;
+	}
+	struct mh_i64ptr_t *h = recovery->lsm_hash;
+	struct mh_i64ptr_node_t node = { id, lsm };
+	struct mh_i64ptr_node_t *old_node = NULL;
+	if (mh_i64ptr_put(h, &node, &old_node, NULL) == mh_end(h)) {
+		diag_set(OutOfMemory, 0, "mh_i64ptr_put", "mh_i64ptr_node_t");
+		free(lsm);
+		return NULL;
+	}
+	assert(old_node == NULL);
+	lsm->id = id;
+	lsm->space_id = space_id;
+	lsm->index_id = index_id;
+	lsm->key_parts = NULL;
+	lsm->key_part_count = 0;
+	lsm->is_dropped = false;
+	lsm->commit_lsn = -1;
+	lsm->dump_lsn = -1;
+	lsm->prepared = NULL;
+	rlist_create(&lsm->ranges);
+	rlist_create(&lsm->runs);
+	/*
+	 * Keep newer LSM trees closer to the tail of the list
+	 * so that on log rotation we create/drop past incarnations
+	 * before the final version.
+	 */
+	rlist_add_tail_entry(&recovery->lsms, lsm, in_recovery);
+	if (recovery->max_id < id)
+		recovery->max_id = id;
+	return lsm;
+}
+
+/**
+ * Handle a VY_LOG_PREPARE_LSM log record.
+ *
+ * This function allocates a new LSM tree with the given ID and
+ * either associates it with the existing LSM tree hashed under
+ * the same space_id/index_id or inserts it into the hash if
+ * there's none.
+ *
+ * Note, we link incomplete LSM trees to index_id_hash (either
+ * directly or indirectly via vy_lsm_recovery_info::prepared),
+ * because an LSM tree may have been fully built and logged in
+ * WAL, but uncommitted to vylog. We need to be able to identify
+ * such LSM trees during local recovery so that instead of
+ * rebuilding them we can simply retry vylog write.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int
+vy_recovery_prepare_lsm(struct vy_recovery *recovery, int64_t id,
+			uint32_t space_id, uint32_t index_id)
+{
+	if (vy_recovery_lookup_lsm(recovery, id) != NULL) {
+		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+			 tt_sprintf("Duplicate LSM tree id %lld",
+				    (long long)id));
+		return -1;
+	}
+	struct vy_lsm_recovery_info *new_lsm;
+	new_lsm = vy_recovery_do_create_lsm(recovery, id, space_id, index_id);
+	if (new_lsm == NULL)
+		return -1;
+
+	struct vy_lsm_recovery_info *lsm;
+	lsm = vy_recovery_lsm_by_index_id(recovery, space_id, index_id);
+	if (lsm == NULL) {
+		/*
+		 * There's no LSM tree for these space_id/index_id
+		 * in the recovery context. Insert the new LSM tree
+		 * into the index_id_hash.
+		 */
+		return vy_recovery_hash_index_id(recovery, new_lsm);
+	}
+
+	/*
+	 * If there's an LSM tree for the given space_id/index_id,
+	 * it can't be incomplete (i.e. it must be committed albeit
+	 * it may be dropped), neither can it have a prepared LSM
+	 * tree associated with it.
+	 */
+	if (lsm->commit_lsn < 0 || lsm->prepared != NULL) {
+		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+			 tt_sprintf("Index %u/%u prepared twice",
+				    (unsigned)space_id, (unsigned)index_id));
+		return -1;
+	}
+
+	/* Link the new LSM tree to the existing one. */
+	lsm->prepared = new_lsm;
+	return 0;
+}
+
+/**
  * Handle a VY_LOG_CREATE_LSM log record.
- * This function allocates a new vinyl LSM tree with ID @id
- * and inserts it to the hash.
- * Return 0 on success, -1 on failure (ID collision or OOM).
+ *
+ * Depending on whether the LSM tree was previously prepared,
+ * this function either commits it or allocates a new one and
+ * inserts it into the recovery hash.
+ *
+ * Returns 0 on success, -1 on error.
  */
 static int
 vy_recovery_create_lsm(struct vy_recovery *recovery, int64_t id,
@@ -1173,115 +1337,80 @@ vy_recovery_create_lsm(struct vy_recovery *recovery, int64_t id,
 		       const struct key_part_def *key_parts,
 		       uint32_t key_part_count, int64_t commit_lsn)
 {
-	struct vy_lsm_recovery_info *lsm;
-	struct key_part_def *key_parts_copy;
-	struct mh_i64ptr_node_t node;
-	struct mh_i64ptr_t *h;
-	mh_int_t k;
-
-	/*
-	 * Make a copy of the key definition to be used for
-	 * the new LSM tree incarnation.
-	 */
 	if (key_parts == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Missing key definition for LSM tree %lld",
 				    (long long)id));
 		return -1;
 	}
-	key_parts_copy = malloc(sizeof(*key_parts) * key_part_count);
-	if (key_parts_copy == NULL) {
-		diag_set(OutOfMemory, sizeof(*key_parts) * key_part_count,
-			 "malloc", "struct key_part_def");
-		return -1;
-	}
-	memcpy(key_parts_copy, key_parts, sizeof(*key_parts) * key_part_count);
 
-	/*
-	 * Look up the LSM tree in the hash.
-	 */
-	h = recovery->index_id_hash;
-	node.key = vy_recovery_index_id_hash(space_id, index_id);
-	k = mh_i64ptr_find(h, node.key, NULL);
-	lsm = (k != mh_end(h)) ? mh_i64ptr_node(h, k)->val : NULL;
-
-	if (lsm == NULL) {
+	struct vy_lsm_recovery_info *lsm;
+	lsm = vy_recovery_lookup_lsm(recovery, id);
+	if (lsm != NULL) {
 		/*
-		 * This is the first time the LSM tree is created
-		 * (there's no previous incarnation in the context).
-		 * Allocate a node for the LSM tree and add it to
-		 * the hash.
+		 * If the LSM tree already exists, it must be in
+		 * the prepared state (i.e. not committed or dropped).
 		 */
-		lsm = malloc(sizeof(*lsm));
-		if (lsm == NULL) {
-			diag_set(OutOfMemory, sizeof(*lsm),
-				 "malloc", "struct vy_lsm_recovery_info");
-			free(key_parts_copy);
-			return -1;
-		}
-		lsm->index_id = index_id;
-		lsm->space_id = space_id;
-		rlist_create(&lsm->ranges);
-		rlist_create(&lsm->runs);
-
-		node.val = lsm;
-		if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
-			diag_set(OutOfMemory, 0, "mh_i64ptr_put",
-				 "mh_i64ptr_node_t");
-			free(key_parts_copy);
-			free(lsm);
+		if (lsm->commit_lsn >= 0 || lsm->is_dropped) {
+			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+				 tt_sprintf("Duplicate LSM tree id %lld",
+					    (long long)id));
 			return -1;
 		}
-		rlist_add_entry(&recovery->lsms, lsm, in_recovery);
 	} else {
-		/*
-		 * The LSM tree was dropped and recreated with the
-		 * same ID. Update its key definition (because it
-		 * could have changed since the last time it was
-		 * used) and reset its state.
-		 */
-		if (!lsm->is_dropped) {
-			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-				 tt_sprintf("LSM tree %u/%u created twice",
-					    (unsigned)space_id,
-					    (unsigned)index_id));
-			free(key_parts_copy);
+		lsm = vy_recovery_do_create_lsm(recovery, id,
+						space_id, index_id);
+		if (lsm == NULL)
 			return -1;
-		}
-		assert(lsm->index_id == index_id);
-		assert(lsm->space_id == space_id);
-		free(lsm->key_parts);
 	}
 
-	lsm->id = id;
-	lsm->key_parts = key_parts_copy;
+	/* Set the key definition. */
+	assert(lsm->key_parts == NULL);
+	lsm->key_parts = malloc(sizeof(*key_parts) * key_part_count);
+	if (lsm->key_parts == NULL) {
+		diag_set(OutOfMemory, sizeof(*key_parts) * key_part_count,
+			 "malloc", "struct key_part_def");
+		return -1;
+	}
+	memcpy(lsm->key_parts, key_parts, sizeof(*key_parts) * key_part_count);
 	lsm->key_part_count = key_part_count;
-	lsm->is_dropped = false;
+
+	/* Mark the LSM tree committed by assigning LSN. */
+	assert(lsm->commit_lsn < 0);
 	lsm->commit_lsn = commit_lsn;
-	lsm->dump_lsn = -1;
 
 	/*
-	 * Add the LSM tree to the hash.
+	 * Hash the new LSM tree under the given space_id/index_id.
+	 * First, look up the LSM tree that is presently in the hash.
 	 */
-	h = recovery->lsm_hash;
-	node.key = id;
-	node.val = lsm;
-	if (mh_i64ptr_find(h, id, NULL) != mh_end(h)) {
-		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Duplicate LSM tree id %lld",
-				    (long long)id));
-		return -1;
-	}
-	if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
-		diag_set(OutOfMemory, 0, "mh_i64ptr_put",
-			 "mh_i64ptr_node_t");
-		return -1;
+	struct vy_lsm_recovery_info *old_lsm;
+	old_lsm = vy_recovery_lsm_by_index_id(recovery, space_id, index_id);
+	if (old_lsm == lsm) {
+		/*
+		 * The new LSM tree is already hashed, nothing to do
+		 * (it was hashed on prepare).
+		 */
+		return 0;
 	}
 
-	if (recovery->max_id < id)
-		recovery->max_id = id;
+	/* Unlink the new LSM tree from the old one, if any. */
+	if (old_lsm != NULL) {
+		assert(old_lsm->commit_lsn >= 0);
+		if (!old_lsm->is_dropped) {
+			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+				 tt_sprintf("LSM tree %u/%u created twice",
+					    (unsigned)space_id,
+					    (unsigned)index_id));
+			return -1;
+		}
+		if (old_lsm->prepared != NULL) {
+			assert(old_lsm->prepared == lsm);
+			old_lsm->prepared = NULL;
+		}
+	}
 
-	return 0;
+	/* Update the hash with the new LSM tree. */
+	return vy_recovery_hash_index_id(recovery, lsm);
 }
 
 /**
@@ -1323,6 +1452,35 @@ vy_recovery_drop_lsm(struct vy_recovery *recovery, int64_t id)
 		}
 	}
 	lsm->is_dropped = true;
+
+	if (lsm->commit_lsn >= 0)
+		return 0;
+	/*
+	 * If the dropped LSM tree has never been committed,
+	 * it means that ALTER for the corresponding index was
+	 * aborted, hence we don't need to keep it in the
+	 * index_id_hash, because the LSM tree is pure garbage
+	 * and will never be recovered. Unlink it now.
+	 */
+	struct vy_lsm_recovery_info *hashed_lsm;
+	hashed_lsm = vy_recovery_lsm_by_index_id(recovery,
+				lsm->space_id, lsm->index_id);
+	if (hashed_lsm == lsm) {
+		/*
+		 * The LSM tree is linked to the index_id_hash
+		 * directly. Remove the corresponding hash entry.
+		 */
+		vy_recovery_unhash_index_id(recovery,
+				lsm->space_id, lsm->index_id);
+	} else {
+		/*
+		 * The LSM tree was linked to an existing LSM
+		 * tree via vy_lsm_recovery_info::prepared.
+		 * Clear the reference.
+		 */
+		assert(hashed_lsm->prepared == lsm);
+		hashed_lsm->prepared = NULL;
+	}
 	return 0;
 }
 
@@ -1752,6 +1910,10 @@ vy_recovery_process_record(struct vy_recovery *recovery,
 {
 	int rc;
 	switch (record->type) {
+	case VY_LOG_PREPARE_LSM:
+		rc = vy_recovery_prepare_lsm(recovery, record->lsm_id,
+				record->space_id, record->index_id);
+		break;
 	case VY_LOG_CREATE_LSM:
 		rc = vy_recovery_create_lsm(recovery, record->lsm_id,
 				record->space_id, record->index_id,
@@ -1999,7 +2161,8 @@ vy_log_append_lsm(struct xlog *xlog, struct vy_lsm_recovery_info *lsm)
 	struct vy_log_record record;
 
 	vy_log_record_init(&record);
-	record.type = VY_LOG_CREATE_LSM;
+	record.type = lsm->commit_lsn < 0 ?
+		VY_LOG_PREPARE_LSM : VY_LOG_CREATE_LSM;
 	record.lsm_id = lsm->id;
 	record.index_id = lsm->index_id;
 	record.space_id = lsm->space_id;
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index 2dad4ee9..a05ed9d9 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -164,6 +164,21 @@ enum vy_log_record_type {
 	 * WAL records written after truncation.
 	 */
 	VY_LOG_TRUNCATE_LSM		= 12,
+	/**
+	 * Prepare a new LSM tree for building.
+	 * Requires vy_log_record::lsm_id, index_id, space_id.
+	 *
+	 * Index ALTER operation consists of two stages. First, we
+	 * build a new LSM tree, checking constraints if necessary.
+	 * This is done before writing the operation to WAL. Then,
+	 * provided the first stage succeeded, we commit the LSM
+	 * tree to the metadata log.
+	 *
+	 * The following record is used to prepare a new LSM tree
+	 * for building. Once the index has been built, we write
+	 * a VY_LOG_CREATE_LSM record to commit it.
+	 */
+	VY_LOG_PREPARE_LSM		= 13,
 
 	vy_log_record_type_MAX
 };
@@ -253,7 +268,12 @@ struct vy_lsm_recovery_info {
 	uint32_t key_part_count;
 	/** True if the LSM tree was dropped. */
 	bool is_dropped;
-	/** LSN of the WAL row that committed the LSM tree. */
+	/**
+	 * LSN of the WAL row that committed the LSM tree,
+	 * or -1 if the LSM tree was not committed (that is
+	 * there was an VY_LOG_PREPARE_LSM record but no
+	 * VY_LOG_CREATE_LSM).
+	 */
 	int64_t commit_lsn;
 	/** LSN of the last LSM tree dump. */
 	int64_t dump_lsn;
@@ -268,6 +288,11 @@ struct vy_lsm_recovery_info {
 	 * vy_run_recovery_info::in_lsm.
 	 */
 	struct rlist runs;
+	/**
+	 * Pointer to an LSM tree that is going to replace
+	 * this one after successful ALTER.
+	 */
+	struct vy_lsm_recovery_info *prepared;
 };
 
 /** Vinyl range info stored in a recovery context. */
@@ -500,6 +525,19 @@ vy_log_record_init(struct vy_log_record *record)
 	memset(record, 0, sizeof(*record));
 }
 
+/** Helper to log a vinyl LSM tree preparation. */
+static inline void
+vy_log_prepare_lsm(int64_t id, uint32_t space_id, uint32_t index_id)
+{
+	struct vy_log_record record;
+	vy_log_record_init(&record);
+	record.type = VY_LOG_PREPARE_LSM;
+	record.lsm_id = id;
+	record.space_id = space_id;
+	record.index_id = index_id;
+	vy_log_write(&record);
+}
+
 /** Helper to log a vinyl LSM tree creation. */
 static inline void
 vy_log_create_lsm(int64_t id, uint32_t space_id, uint32_t index_id,
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 88a75e32..ea07f2ac 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -371,7 +371,20 @@ vy_lsm_create(struct vy_lsm *lsm)
 	lsm->id = vy_log_next_id();
 
 	/* Allocate initial range. */
-	return vy_lsm_init_range_tree(lsm);
+	if (vy_lsm_init_range_tree(lsm) != 0)
+		return -1;
+
+	assert(lsm->range_count == 1);
+	struct vy_range *range = vy_range_tree_first(lsm->tree);
+
+	/* Write the new index to the metadata log. */
+	vy_log_tx_begin();
+	vy_log_prepare_lsm(lsm->id, lsm->space_id, lsm->index_id);
+	vy_log_insert_range(lsm->id, range->id, NULL, NULL);
+	if (vy_log_tx_commit() < 0)
+		return -1;
+
+	return 0;
 }
 
 static struct vy_run *
@@ -545,10 +558,10 @@ vy_lsm_recover(struct vy_lsm *lsm, struct vy_recovery *recovery,
 	lsm_info = vy_recovery_lsm_by_index_id(recovery,
 			lsm->space_id, lsm->index_id);
 	if (is_checkpoint_recovery) {
-		if (lsm_info == NULL) {
+		if (lsm_info == NULL || lsm_info->commit_lsn < 0) {
 			/*
 			 * All LSM trees created from snapshot rows must
-			 * be present in vylog, because snapshot can
+			 * be committed to vylog, because snapshot can
 			 * only succeed if vylog has been successfully
 			 * flushed.
 			 */
@@ -568,20 +581,34 @@ vy_lsm_recover(struct vy_lsm *lsm, struct vy_recovery *recovery,
 		}
 	}
 
-	if (lsm_info == NULL || lsn > lsm_info->commit_lsn) {
+	if (lsm_info == NULL || (lsm_info->commit_lsn >= 0 &&
+				 lsm_info->prepared == NULL &&
+				 lsn > lsm_info->commit_lsn)) {
 		/*
-		 * If we failed to log LSM tree creation before restart,
-		 * we won't find it in the log on recovery. This is OK as
+		 * Before VY_LOG_PREPARE_LSM record type was introduced,
+		 * we wrote a new index to vylog only after successful
+		 * WAL write. So if we can't find the index in vylog,
+		 * we failed to log it before restart. This is OK as
 		 * the LSM tree doesn't have any runs in this case. We will
 		 * retry to log LSM tree in vinyl_index_commit_create().
-		 * For now, just create the initial range and assign id.
+		 * For now, retry index creation.
 		 */
-		lsm->id = vy_log_next_id();
-		return vy_lsm_init_range_tree(lsm);
+		return vy_lsm_create(lsm);
+	}
+
+	if (lsm_info->commit_lsn >= 0 && lsn > lsm_info->commit_lsn) {
+		/*
+		 * The index we are recovering was prepared, successfully
+		 * built, and committed to WAL, but it was not committed
+		 * to vylog. Recover the prepared LSM tree. We will retry
+		 * to write it to vylog in vinyl_index_commit_create().
+		 */
+		lsm_info = lsm_info->prepared;
+		assert(lsm_info != NULL);
 	}
 
 	lsm->id = lsm_info->id;
-	lsm->is_committed = true;
+	lsm->is_committed = lsm_info->commit_lsn >= 0;
 
 	if (lsn < lsm_info->commit_lsn || lsm_info->is_dropped) {
 		/*
diff --git a/test/engine/truncate.result b/test/engine/truncate.result
index b6e1a99a..af4850b6 100644
--- a/test/engine/truncate.result
+++ b/test/engine/truncate.result
@@ -195,93 +195,6 @@ s:drop()
 ---
 ...
 --
--- Check that space truncation is linearizable.
---
--- Create a space with several indexes and start three fibers:
--- 1st and 3rd update the space, 2nd truncates it. Then wait
--- until all fibers are done. The space should contain data
--- inserted by the 3rd fiber.
---
--- Note, this is guaranteed to be true only if space updates
--- don't yield, which is always true for memtx and is true
--- for vinyl in case there's no data on disk, as in this case.
---
-s = box.schema.create_space('test', {engine = engine})
----
-...
-_ = s:create_index('i1', {parts = {1, 'unsigned'}})
----
-...
-_ = s:create_index('i2', {parts = {2, 'unsigned'}})
----
-...
-_ = s:create_index('i3', {parts = {3, 'string'}})
----
-...
-_ = s:insert{1, 1, 'a'}
----
-...
-_ = s:insert{2, 2, 'b'}
----
-...
-_ = s:insert{3, 3, 'c'}
----
-...
-c = fiber.channel(3)
----
-...
-test_run:cmd("setopt delimiter ';'")
----
-- true
-...
-fiber.create(function()
-    box.begin()
-    s:replace{1, 10, 'aa'}
-    s:replace{2, 20, 'bb'}
-    s:replace{3, 30, 'cc'}
-    box.commit()
-    c:put(true)
-end)
-fiber.create(function()
-    s:truncate()
-    c:put(true)
-end)
-fiber.create(function()
-    box.begin()
-    s:replace{1, 100, 'aaa'}
-    s:replace{2, 200, 'bbb'}
-    s:replace{3, 300, 'ccc'}
-    box.commit()
-    c:put(true)
-end)
-test_run:cmd("setopt delimiter ''");
----
-...
-for i = 1, 3 do c:get() end
----
-...
-s.index.i1:select()
----
-- - [1, 100, 'aaa']
-  - [2, 200, 'bbb']
-  - [3, 300, 'ccc']
-...
-s.index.i2:select()
----
-- - [1, 100, 'aaa']
-  - [2, 200, 'bbb']
-  - [3, 300, 'ccc']
-...
-s.index.i3:select()
----
-- - [1, 100, 'aaa']
-  - [2, 200, 'bbb']
-  - [3, 300, 'ccc']
-...
-s:drop()
----
-...
---
 -- Calling space.truncate concurrently.
 --
 s = box.schema.create_space('test', {engine = engine})
diff --git a/test/engine/truncate.test.lua b/test/engine/truncate.test.lua
index 66cbd156..d54f4b9f 100644
--- a/test/engine/truncate.test.lua
+++ b/test/engine/truncate.test.lua
@@ -82,54 +82,6 @@ s.index.i3:select()
 s:drop()
 
 --
--- Check that space truncation is linearizable.
---
--- Create a space with several indexes and start three fibers:
--- 1st and 3rd update the space, 2nd truncates it. Then wait
--- until all fibers are done. The space should contain data
--- inserted by the 3rd fiber.
---
--- Note, this is guaranteed to be true only if space updates
--- don't yield, which is always true for memtx and is true
--- for vinyl in case there's no data on disk, as in this case.
---
-s = box.schema.create_space('test', {engine = engine})
-_ = s:create_index('i1', {parts = {1, 'unsigned'}})
-_ = s:create_index('i2', {parts = {2, 'unsigned'}})
-_ = s:create_index('i3', {parts = {3, 'string'}})
-_ = s:insert{1, 1, 'a'}
-_ = s:insert{2, 2, 'b'}
-_ = s:insert{3, 3, 'c'}
-c = fiber.channel(3)
-test_run:cmd("setopt delimiter ';'")
-fiber.create(function()
-    box.begin()
-    s:replace{1, 10, 'aa'}
-    s:replace{2, 20, 'bb'}
-    s:replace{3, 30, 'cc'}
-    box.commit()
-    c:put(true)
-end)
-fiber.create(function()
-    s:truncate()
-    c:put(true)
-end)
-fiber.create(function()
-    box.begin()
-    s:replace{1, 100, 'aaa'}
-    s:replace{2, 200, 'bbb'}
-    s:replace{3, 300, 'ccc'}
-    box.commit()
-    c:put(true)
-end)
-test_run:cmd("setopt delimiter ''");
-for i = 1, 3 do c:get() end
-s.index.i1:select()
-s.index.i2:select()
-s.index.i3:select()
-s:drop()
-
---
 -- Calling space.truncate concurrently.
 --
 s = box.schema.create_space('test', {engine = engine})
diff --git a/test/vinyl/errinj_vylog.result b/test/vinyl/errinj_vylog.result
index f78201c9..ca23cb45 100644
--- a/test/vinyl/errinj_vylog.result
+++ b/test/vinyl/errinj_vylog.result
@@ -67,7 +67,7 @@ s:drop()
 ---
 ...
 --
--- Check that an index drop/truncate/create record we failed to
+-- Check that an index drop/create record we failed to
 -- write to vylog is flushed along with the next record.
 --
 fiber = require 'fiber'
@@ -76,62 +76,57 @@ fiber = require 'fiber'
 s1 = box.schema.space.create('test1', {engine = 'vinyl'})
 ---
 ...
-_ = s1:create_index('pk')
----
-...
-_ = s1:insert{1, 'a'}
----
-...
 s2 = box.schema.space.create('test2', {engine = 'vinyl'})
 ---
 ...
 _ = s2:create_index('pk')
 ---
 ...
-_ = s2:insert{2, 'b'}
+_ = s2:insert{1, 'a'}
 ---
 ...
 box.snapshot()
 ---
 - ok
 ...
-_ = s1:insert{3, 'c'}
----
-...
-_ = s2:insert{4, 'd'}
+_ = s2:insert{2, 'b'}
 ---
 ...
-SCHED_TIMEOUT = 0.01
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
 ---
+- ok
 ...
-box.error.injection.set('ERRINJ_VY_SCHED_TIMEOUT', SCHED_TIMEOUT)
+-- VY_LOG_PREPARE_LSM written, but VY_LOG_CREATE_LSM missing
+ch = fiber.channel(1)
 ---
-- ok
 ...
-box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true);
+_ = fiber.create(function() s1:create_index('pk') ch:put(true) end)
 ---
-- ok
 ...
-s1:drop()
+fiber.sleep(0.001)
 ---
 ...
-s2:truncate()
+box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true)
 ---
+- ok
 ...
-_ = s2:insert{5, 'e'}
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
 ---
+- ok
 ...
-s3 = box.schema.space.create('test3', {engine = 'vinyl'})
+ch:get()
 ---
+- true
 ...
-_ = s3:create_index('pk')
+_ = s1:insert{3, 'c'}
 ---
 ...
-_ = s3:insert{6, 'f'}
+-- VY_LOG_DROP_LSM missing
+s2.index.pk:drop()
 ---
 ...
 -- pending records must not be rolled back on error
-box.snapshot()
+s2:create_index('pk') -- error
 ---
 - error: Error injection 'vinyl log flush'
 ...
@@ -139,65 +134,46 @@ box.error.injection.set('ERRINJ_VY_LOG_FLUSH', false);
 ---
 - ok
 ...
-fiber.sleep(2 * SCHED_TIMEOUT) -- wait for scheduler to unthrottle
+_ = s1:insert{4, 'd'}
 ---
 ...
-box.error.injection.set('ERRINJ_VY_SCHED_TIMEOUT', 0)
----
-- ok
-...
-box.snapshot()
----
-- ok
-...
-_ = s2:insert{7, 'g'}
+_ = s2:create_index('pk')
 ---
 ...
-_ = s3:insert{8, 'h'}
+_ = s2:insert{5, 'e'}
 ---
 ...
 test_run:cmd('restart server default')
 s1 = box.space.test1
 ---
 ...
-s1 == nil
+s2 = box.space.test2
 ---
-- true
 ...
-s2 = box.space.test2
+s1:select()
 ---
+- - [3, 'c']
+  - [4, 'd']
 ...
 s2:select()
 ---
 - - [5, 'e']
-  - [7, 'g']
-...
-s2:drop()
----
-...
-s3 = box.space.test3
----
 ...
-s3:select()
+s1:drop()
 ---
-- - [6, 'f']
-  - [8, 'h']
 ...
-s3:drop()
+s2:drop()
 ---
 ...
 --
--- Check that if a buffered index drop/truncate/create record
--- does not make it to the vylog before restart, it will be
--- replayed on recovery.
+-- Check that if a buffered index drop/create record does not
+-- make it to the vylog before restart, it will be replayed on
+-- recovery.
 --
-s1 = box.schema.space.create('test1', {engine = 'vinyl'})
----
-...
-_ = s1:create_index('pk')
+fiber = require 'fiber'
 ---
 ...
-_ = s1:insert{111, 'aaa'}
+s1 = box.schema.space.create('test1', {engine = 'vinyl'})
 ---
 ...
 s2 = box.schema.space.create('test2', {engine = 'vinyl'})
@@ -206,82 +182,97 @@ s2 = box.schema.space.create('test2', {engine = 'vinyl'})
 _ = s2:create_index('pk')
 ---
 ...
-_ = s2:insert{222, 'bbb'}
+_ = s2:insert{111, 'aaa'}
 ---
 ...
 box.snapshot()
 ---
 - ok
 ...
-_ = s1:insert{333, 'ccc'}
----
-...
-_ = s2:insert{444, 'ddd'}
+_ = s2:insert{222, 'bbb'}
 ---
 ...
-box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true);
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
 ---
 - ok
 ...
-s1:drop()
+-- VY_LOG_PREPARE_LSM written, but VY_LOG_CREATE_LSM missing
+ch = fiber.channel(1)
 ---
 ...
-s2:truncate()
+_ = fiber.create(function() s1:create_index('pk') ch:put(true) end)
 ---
 ...
-_ = s2:insert{555, 'eee'}
+fiber.sleep(0.001)
 ---
 ...
-s3 = box.schema.space.create('test3', {engine = 'vinyl'})
+box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true)
 ---
+- ok
 ...
-_ = s3:create_index('pk')
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
 ---
+- ok
 ...
-_ = s3:insert{666, 'fff'}
+ch:get()
 ---
+- true
 ...
--- gh-2532: replaying create/drop from xlog crashes tarantool
-test_run:cmd("setopt delimiter ';'")
+_ = s1:insert{333, 'ccc'}
 ---
-- true
 ...
-for i = 1, 10 do
-    s = box.schema.space.create('test', {engine = 'vinyl'})
-    s:create_index('primary')
-    s:create_index('secondary', {unique = false, parts = {2, 'string'}})
-    s:insert{i, 'test' .. i}
-    s:truncate()
-    s:drop()
-end
-test_run:cmd("setopt delimiter ''");
+-- VY_LOG_DROP_LSM missing
+s2.index.pk:drop()
 ---
 ...
 test_run:cmd('restart server default')
 s1 = box.space.test1
 ---
 ...
-s1 == nil
+s2 = box.space.test2
 ---
-- true
 ...
-s2 = box.space.test2
+_ = s1:insert{444, 'ddd'}
+---
+...
+_ = s2:create_index('pk')
 ---
 ...
+_ = s2:insert{555, 'eee'}
+---
+...
+s1:select()
+---
+- - [333, 'ccc']
+  - [444, 'ddd']
+...
 s2:select()
 ---
 - - [555, 'eee']
 ...
-s2:drop()
+box.snapshot()
 ---
+- ok
 ...
-s3 = box.space.test3
+test_run:cmd('restart server default')
+s1 = box.space.test1
 ---
 ...
-s3:select()
+s2 = box.space.test2
 ---
-- - [666, 'fff']
 ...
-s3:drop()
+s1:select()
+---
+- - [333, 'ccc']
+  - [444, 'ddd']
+...
+s2:select()
+---
+- - [555, 'eee']
+...
+s1:drop()
+---
+...
+s2:drop()
 ---
 ...
diff --git a/test/vinyl/errinj_vylog.test.lua b/test/vinyl/errinj_vylog.test.lua
index 36b3659d..3d90755d 100644
--- a/test/vinyl/errinj_vylog.test.lua
+++ b/test/vinyl/errinj_vylog.test.lua
@@ -32,111 +32,101 @@ s:select()
 s:drop()
 
 --
--- Check that an index drop/truncate/create record we failed to
+-- Check that an index drop/create record we failed to
 -- write to vylog is flushed along with the next record.
 --
 fiber = require 'fiber'
 
 s1 = box.schema.space.create('test1', {engine = 'vinyl'})
-_ = s1:create_index('pk')
-_ = s1:insert{1, 'a'}
-
 s2 = box.schema.space.create('test2', {engine = 'vinyl'})
 _ = s2:create_index('pk')
+_ = s2:insert{1, 'a'}
+box.snapshot()
 _ = s2:insert{2, 'b'}
 
-box.snapshot()
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
 
-_ = s1:insert{3, 'c'}
-_ = s2:insert{4, 'd'}
+-- VY_LOG_PREPARE_LSM written, but VY_LOG_CREATE_LSM missing
+ch = fiber.channel(1)
+_ = fiber.create(function() s1:create_index('pk') ch:put(true) end)
+fiber.sleep(0.001)
 
-SCHED_TIMEOUT = 0.01
-box.error.injection.set('ERRINJ_VY_SCHED_TIMEOUT', SCHED_TIMEOUT)
-box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true);
+box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true)
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
 
-s1:drop()
-s2:truncate()
-_ = s2:insert{5, 'e'}
+ch:get()
+_ = s1:insert{3, 'c'}
 
-s3 = box.schema.space.create('test3', {engine = 'vinyl'})
-_ = s3:create_index('pk')
-_ = s3:insert{6, 'f'}
+-- VY_LOG_DROP_LSM missing
+s2.index.pk:drop()
 
 -- pending records must not be rolled back on error
-box.snapshot()
+s2:create_index('pk') -- error
 
 box.error.injection.set('ERRINJ_VY_LOG_FLUSH', false);
-fiber.sleep(2 * SCHED_TIMEOUT) -- wait for scheduler to unthrottle
-box.error.injection.set('ERRINJ_VY_SCHED_TIMEOUT', 0)
-
-box.snapshot()
 
-_ = s2:insert{7, 'g'}
-_ = s3:insert{8, 'h'}
+_ = s1:insert{4, 'd'}
+_ = s2:create_index('pk')
+_ = s2:insert{5, 'e'}
 
 test_run:cmd('restart server default')
 
 s1 = box.space.test1
-s1 == nil
-
 s2 = box.space.test2
+s1:select()
 s2:select()
+s1:drop()
 s2:drop()
 
-s3 = box.space.test3
-s3:select()
-s3:drop()
-
 --
--- Check that if a buffered index drop/truncate/create record
--- does not make it to the vylog before restart, it will be
--- replayed on recovery.
+-- Check that if a buffered index drop/create record does not
+-- make it to the vylog before restart, it will be replayed on
+-- recovery.
 --
+fiber = require 'fiber'
 
 s1 = box.schema.space.create('test1', {engine = 'vinyl'})
-_ = s1:create_index('pk')
-_ = s1:insert{111, 'aaa'}
-
 s2 = box.schema.space.create('test2', {engine = 'vinyl'})
 _ = s2:create_index('pk')
+_ = s2:insert{111, 'aaa'}
+box.snapshot()
 _ = s2:insert{222, 'bbb'}
 
-box.snapshot()
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+
+-- VY_LOG_PREPARE_LSM written, but VY_LOG_CREATE_LSM missing
+ch = fiber.channel(1)
+_ = fiber.create(function() s1:create_index('pk') ch:put(true) end)
+fiber.sleep(0.001)
+
+box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true)
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
 
+ch:get()
 _ = s1:insert{333, 'ccc'}
-_ = s2:insert{444, 'ddd'}
 
-box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true);
+-- VY_LOG_DROP_LSM missing
+s2.index.pk:drop()
 
-s1:drop()
-s2:truncate()
+test_run:cmd('restart server default')
+
+s1 = box.space.test1
+s2 = box.space.test2
+
+_ = s1:insert{444, 'ddd'}
+_ = s2:create_index('pk')
 _ = s2:insert{555, 'eee'}
 
-s3 = box.schema.space.create('test3', {engine = 'vinyl'})
-_ = s3:create_index('pk')
-_ = s3:insert{666, 'fff'}
-
--- gh-2532: replaying create/drop from xlog crashes tarantool
-test_run:cmd("setopt delimiter ';'")
-for i = 1, 10 do
-    s = box.schema.space.create('test', {engine = 'vinyl'})
-    s:create_index('primary')
-    s:create_index('secondary', {unique = false, parts = {2, 'string'}})
-    s:insert{i, 'test' .. i}
-    s:truncate()
-    s:drop()
-end
-test_run:cmd("setopt delimiter ''");
+s1:select()
+s2:select()
+
+box.snapshot()
 
 test_run:cmd('restart server default')
 
 s1 = box.space.test1
-s1 == nil
-
 s2 = box.space.test2
+s1:select()
 s2:select()
+s1:drop()
 s2:drop()
-
-s3 = box.space.test3
-s3:select()
-s3:drop()
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 8878cb5e..a56beb69 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -191,12 +191,12 @@ result
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [7, {2: 4}]
+          tuple: [7, {2: 5}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [7, {2: 5}]
+          tuple: [7, {2: 4}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
-- 
2.11.0




More information about the Tarantool-patches mailing list