[PATCH] vinyl: allow to build secondary index for non-empty space

Vladimir Davydov vdavydov.dev at gmail.com
Sat Apr 21 16:46:31 MSK 2018


This patch implements space_vtab::build_index callback for vinyl spaces.
Now instead of returning an error in case the space is not empty, the
callback will actually try to build a new index. The build procedure
consists of four steps:

 1. Prepare the LSM tree for building. This implies writing a special
    record to vylog, VY_LOG_PREPARE_LSM, and adding the new index to
    the vinyl scheduler so that it can be dumped during build. We need
    to log the new LSM tree so that we can keep track of run files
    created for it during build and remove them if build procedure
    fails.

 2. Inserting tuples stored in the space into the new LSM tree. Since
    there may concurrent DML requests, we install a trigger to forward
    them to the new index.

 3. Dumping the index to disk so that we don't have to rebuild it after
    recovery.

 4. Committing the new LSM tree in vylog (VY_LOG_CREATE_LSM).

Steps 1-3 are done from the space_vtab::build_index callback while
step 4 is done after WAL write, from index_vtab::commit_create.

While step 3 is being performed, new DML requests may be executed for
the altered space. Those requests will be reflected in the new index
thanks to the on_replace trigger, however they won't be recovered during
WAL recovery as they will appear in WAL before the ALTER record that
created the index. To recover them, we replay all statements stored in
the primary key's memory level when replaying the ALTER record during
WAL recovery.
---
https://github.com/tarantool/tarantool/commits/vy-allow-to-build-secondary-indexes

 src/box/vinyl.c                  | 624 ++++++++++++++++++++++++++++++++----
 src/box/vy_log.c                 | 348 ++++++++++++++------
 src/box/vy_log.h                 |  40 ++-
 src/box/vy_lsm.c                 |  40 ++-
 src/box/vy_lsm.h                 |   7 +-
 src/box/vy_mem.c                 |   8 +-
 src/box/vy_quota.h               |  10 +
 src/box/vy_scheduler.c           |  38 ++-
 src/box/vy_scheduler.h           |   7 +
 test/box/alter.result            | 501 -----------------------------
 test/box/alter.test.lua          | 159 ---------
 test/engine/ddl.result           | 672 +++++++++++++++++++++++++++++++++++++++
 test/engine/ddl.test.lua         | 221 +++++++++++++
 test/unit/vy_mem.c               |   4 +-
 test/vinyl/ddl.result            | 442 ++++++++++++++-----------
 test/vinyl/ddl.test.lua          | 218 ++++++++-----
 test/vinyl/errinj.result         | 256 +++++++++++++++
 test/vinyl/errinj.test.lua       | 115 +++++++
 test/vinyl/errinj_gc.result      |  74 ++++-
 test/vinyl/errinj_gc.test.lua    |  37 ++-
 test/vinyl/errinj_vylog.result   |  57 ++++
 test/vinyl/errinj_vylog.test.lua |  29 ++
 test/vinyl/gh.result             |   2 +-
 test/vinyl/layout.result         |  40 +--
 24 files changed, 2832 insertions(+), 1117 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index ff4c2831..1f88be08 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -62,6 +62,7 @@
 #include "txn.h"
 #include "xrow.h"
 #include "xlog.h"
+#include "schema.h"
 #include "engine.h"
 #include "space.h"
 #include "index.h"
@@ -256,6 +257,11 @@ static const struct engine_vtab vinyl_engine_vtab;
 static const struct space_vtab vinyl_space_vtab;
 static const struct index_vtab vinyl_index_vtab;
 
+static int
+vy_lsm_get(struct vy_lsm *lsm, struct vy_tx *tx,
+	   const struct vy_read_view **rv,
+	   struct tuple *key, struct tuple **result);
+
 /**
  * A quick intro into Vinyl cosmology and file format
  * --------------------------------------------------
@@ -812,10 +818,10 @@ vinyl_index_commit_create(struct index *index, int64_t lsn)
 	struct vy_env *env = vy_env(index->engine);
 	struct vy_lsm *lsm = vy_lsm(index);
 
-	assert(lsm->id >= 0);
-
 	if (env->status == VINYL_INITIAL_RECOVERY_LOCAL ||
 	    env->status == VINYL_FINAL_RECOVERY_LOCAL) {
+		if (lsm->id >= 0)
+			vy_scheduler_add_lsm(&env->scheduler, lsm);
 		/*
 		 * Normally, if this is local recovery, the index
 		 * should have been logged before restart. There's
@@ -824,10 +830,8 @@ vinyl_index_commit_create(struct index *index, int64_t lsn)
 		 * the index isn't in the recovery context and we
 		 * need to retry to log it now.
 		 */
-		if (lsm->commit_lsn >= 0) {
-			vy_scheduler_add_lsm(&env->scheduler, lsm);
+		if (lsm->commit_lsn >= 0)
 			return;
-		}
 	}
 
 	if (env->status == VINYL_INITIAL_RECOVERY_REMOTE) {
@@ -852,9 +856,6 @@ vinyl_index_commit_create(struct index *index, int64_t lsn)
 	assert(lsm->commit_lsn < 0);
 	lsm->commit_lsn = lsn;
 
-	assert(lsm->range_count == 1);
-	struct vy_range *range = vy_range_tree_first(lsm->tree);
-
 	/*
 	 * Since it's too late to fail now, in case of vylog write
 	 * failure we leave the records we attempted to write in
@@ -864,15 +865,25 @@ vinyl_index_commit_create(struct index *index, int64_t lsn)
 	 * recovery.
 	 */
 	vy_log_tx_begin();
-	vy_log_create_lsm(lsm->id, lsm->space_id, lsm->index_id,
-			  lsm->key_def, lsn);
-	vy_log_insert_range(lsm->id, range->id, NULL, NULL);
+	if (lsm->id < 0) {
+		lsm->id = vy_log_next_id();
+		vy_log_create_lsm(lsm->id, lsm->space_id, lsm->index_id,
+				  lsm->key_def, lsn);
+
+		assert(lsm->range_count == 1);
+		struct vy_range *range = vy_range_tree_first(lsm->tree);
+		vy_log_insert_range(lsm->id, range->id, NULL, NULL);
+
+		vy_scheduler_add_lsm(&env->scheduler, lsm);
+	} else {
+		/*
+		 * The LSM tree has been prepared and built,
+		 * just commit it.
+		 */
+		vy_log_create_lsm(lsm->id, lsm->space_id, lsm->index_id,
+				  lsm->key_def, lsn);
+	}
 	vy_log_tx_try_commit();
-	/*
-	 * After we committed the index in the log, we can schedule
-	 * a task for it.
-	 */
-	vy_scheduler_add_lsm(&env->scheduler, lsm);
 }
 
 static void
@@ -929,6 +940,35 @@ vy_log_lsm_prune(struct vy_lsm *lsm, int64_t gc_lsn)
 }
 
 static void
+vinyl_index_abort_create(struct index *index)
+{
+	struct vy_env *env = vy_env(index->engine);
+	struct vy_lsm *lsm = vy_lsm(index);
+
+	lsm->is_dropped = true;
+
+	/* Nothing to do if the LSM tree was not prepared for build. */
+	if (lsm->id < 0)
+		return;
+
+	/*
+	 * No need to clean up vylog if this is a failure during
+	 * local recovery.
+	 */
+	if (env->status == VINYL_INITIAL_RECOVERY_LOCAL ||
+	    env->status == VINYL_FINAL_RECOVERY_LOCAL)
+		return;
+
+	vy_scheduler_remove_lsm(&env->scheduler, lsm);
+
+	/* Expunge the incomplete LSM tree from vylog. */
+	vy_log_tx_begin();
+	vy_log_lsm_prune(lsm, 0);
+	vy_log_drop_lsm(lsm->id);
+	vy_log_tx_try_commit();
+}
+
+static void
 vinyl_index_commit_drop(struct index *index)
 {
 	struct vy_env *env = vy_env(index->engine);
@@ -1172,50 +1212,491 @@ vinyl_space_drop_primary_key(struct space *space)
 	(void)space;
 }
 
+/**
+ * Prepare a new LSM tree for building.
+ */
 static int
-vinyl_space_build_index(struct space *src_space, struct index *new_index,
-			struct tuple_format *new_format)
+vy_build_prepare(struct vy_env *env, struct vy_lsm *lsm)
 {
-	(void)new_format;
+	/*
+	 * First, log the new LSM tree in vylog. This is necessary,
+	 * because the LSM tree may be dumped during build and so we
+	 * need to keep track of run files created for it so that we
+	 * can clean up in case build fails.
+	 */
+	vy_log_tx_begin();
 
-	struct vy_env *env = vy_env(src_space->engine);
-	struct vy_lsm *pk = vy_lsm(src_space->index[0]);
+	int64_t id = vy_log_next_id();
+	vy_log_prepare_lsm(id, lsm->space_id, lsm->index_id);
+
+	assert(lsm->range_count == 1);
+	struct vy_range *range = vy_range_tree_first(lsm->tree);
+	vy_log_insert_range(id, range->id, NULL, NULL);
+
+	if (vy_log_tx_commit() < 0)
+		return -1;
+
+	/*
+	 * Assign id. Note, it has to be done after successfully logging
+	 * the new LSM tree, because vinyl_index_abort_create() uses id
+	 * as indicator that index has been successfully prepared.
+	 */
+	assert(lsm->id < 0);
+	lsm->id = id;
+
+	/*
+	 * After we committed the new LSM tree in vylog, we add it to
+	 * the scheduler so that it can be dumped and compacted during
+	 * build.
+	 */
+	vy_scheduler_add_lsm(&env->scheduler, lsm);
+	return 0;
+}
+
+/** Argument passed to vy_build_on_replace(). */
+struct vy_build_ctx {
+	/** LSM tree under construction. */
+	struct vy_lsm *lsm;
+	/** Format to check new tuples against. */
+	struct tuple_format *format;
+	/**
+	 * Names of the altered space and the new index.
+	 * Used for error reporting.
+	 */
+	const char *space_name;
+	const char *index_name;
+	/** Set in case a build error occurred. */
+	bool is_failed;
+	/** Container for storing errors. */
+	struct diag diag;
+};
 
+/**
+ * This is an on_replace trigger callback that forwards DML requests
+ * to the index that is currently being built.
+ */
+static void
+vy_build_on_replace(struct trigger *trigger, void *event)
+{
+	struct txn *txn = event;
+	struct txn_stmt *stmt = txn_current_stmt(txn);
+	struct vy_build_ctx *ctx = trigger->data;
+	struct vy_tx *tx = txn->engine_tx;
+	struct tuple_format *format = ctx->format;
+	struct vy_lsm *lsm = ctx->lsm;
+
+	if (ctx->is_failed)
+		return; /* already failed, nothing to do */
+
+	struct tuple *delete = NULL;
+	struct tuple *insert = NULL;
+
+	/* Check new tuples for conformity to the new format. */
+	if (stmt->new_tuple != NULL &&
+	    tuple_validate(format, stmt->new_tuple) != 0)
+		goto err;
+
+	/* Check key uniqueness if necessary. */
+	if (stmt->new_tuple != NULL && lsm->check_is_unique &&
+	    (!lsm->key_def->is_nullable ||
+	     !vy_tuple_key_contains_null(stmt->new_tuple, lsm->key_def))) {
+		struct tuple *key = vy_stmt_extract_key(stmt->new_tuple,
+							lsm->key_def,
+							lsm->env->key_format);
+		if (key == NULL)
+			goto err;
+		struct tuple *found;
+		int rc = vy_lsm_get(lsm, tx, vy_tx_read_view(tx), key, &found);
+		tuple_unref(key);
+		if (rc != 0)
+			goto err;
+		if (found != NULL) {
+			tuple_unref(found);
+			diag_set(ClientError, ER_TUPLE_FOUND,
+				 ctx->index_name, ctx->space_name);
+			goto err;
+		}
+	}
+
+	/* Forward the statement to the new LSM tree. */
+	if (stmt->old_tuple != NULL) {
+		delete = vy_stmt_new_surrogate_delete(format, stmt->old_tuple);
+		if (delete == NULL)
+			goto err;
+	}
+	if (stmt->new_tuple != NULL) {
+		uint32_t data_len;
+		const char *data = tuple_data_range(stmt->new_tuple, &data_len);
+		insert = vy_stmt_new_insert(format, data, data + data_len);
+		if (insert == NULL)
+			goto err;
+	}
+	if (delete != NULL && vy_tx_set(tx, lsm, delete) != 0)
+		goto err;
+	if (insert != NULL && vy_tx_set(tx, lsm, insert) != 0)
+		goto err;
+out:
+	if (delete != NULL)
+		tuple_unref(delete);
+	if (insert != NULL)
+		tuple_unref(insert);
+	return;
+err:
+	ctx->is_failed = true;
+	diag_move(diag_get(), &ctx->diag);
+	goto out;
+}
+
+/**
+ * Insert a tuple fetched from the space into the LSM tree that
+ * is currently being built.
+ */
+static int
+vy_build_insert_tuple(struct vy_env *env, struct vy_lsm *lsm,
+		      const char *space_name, const char *index_name,
+		      struct tuple_format *new_format, struct tuple *tuple)
+{
 	/*
-	 * During local recovery we are loading existing indexes
-	 * from disk, not building new ones.
-	 */
-	if (env->status != VINYL_INITIAL_RECOVERY_LOCAL &&
-	    env->status != VINYL_FINAL_RECOVERY_LOCAL) {
-		if (pk->stat.disk.count.rows != 0 ||
-		    pk->stat.memory.count.rows != 0) {
-			diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
-				 "building an index for a non-empty space");
+	 * Use the last LSN to the time, because read iterator
+	 * guarantees that this is the newest tuple version.
+	 */
+	int64_t lsn = env->xm->lsn;
+	struct vy_mem *mem = lsm->mem;
+
+	/* Check the tuple against the new space format. */
+	if (tuple_validate(new_format, tuple) != 0)
+		return -1;
+
+	/* Check unique constraint if necessary. */
+	if (lsm->check_is_unique &&
+	    (!lsm->key_def->is_nullable ||
+	     !vy_tuple_key_contains_null(tuple, lsm->key_def))) {
+		struct tuple *key = vy_stmt_extract_key(tuple, lsm->key_def,
+							lsm->env->key_format);
+		if (key == NULL)
+			return -1;
+		/*
+		 * Make sure the in-memory index won't go away
+		 * while we are reading disk.
+		 */
+		vy_mem_pin(mem);
+
+		struct tuple *found;
+		int rc = vy_lsm_get(lsm, NULL, &env->xm->p_committed_read_view,
+				    key, &found);
+		vy_mem_unpin(mem);
+		tuple_unref(key);
+
+		if (rc != 0)
+			return -1;
+
+		if (found != NULL &&
+		    vy_stmt_compare(tuple, found, lsm->cmp_def) == 0) {
+			tuple_unref(found);
+			return 0;
+		}
+		if (found != NULL) {
+			tuple_unref(found);
+			diag_set(ClientError, ER_TUPLE_FOUND,
+				 index_name, space_name);
 			return -1;
 		}
 	}
 
+	/* Reallocate the new tuple using the new space format. */
+	uint32_t data_len;
+	const char *data = tuple_data_range(tuple, &data_len);
+	struct tuple *stmt = vy_stmt_new_replace(new_format, data,
+						 data + data_len);
+	if (stmt == NULL)
+		return -1;
+
+	/* Insert the new tuple into the in-memory index. */
+	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
+	const struct tuple *region_stmt = vy_stmt_dup_lsregion(stmt,
+				&mem->env->allocator, mem->generation);
+	tuple_unref(stmt);
+	if (region_stmt == NULL)
+		return -1;
+	vy_stmt_set_lsn((struct tuple *)region_stmt, lsn);
+	if (vy_mem_insert(mem, region_stmt) != 0)
+		return -1;
+
+	mem->min_lsn = MIN(mem->min_lsn, lsn);
+	mem->max_lsn = MAX(mem->max_lsn, lsn);
+	vy_stmt_counter_acct_tuple(&lsm->stat.memory.count, region_stmt);
+
+	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
+	assert(mem_used_after >= mem_used_before);
+	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_wait(&env->quota);
+	return 0;
+}
+
+/**
+ * Recover a single statement that was inserted into the space
+ * while the newly built index was dumped to disk.
+ */
+static int
+vy_build_recover_mem_stmt(struct vy_lsm *lsm, struct vy_lsm *pk,
+			  const struct tuple *mem_stmt)
+{
+	int64_t lsn = vy_stmt_lsn(mem_stmt);
+	if (lsn <= lsm->dump_lsn)
+		return 0; /* statement was dumped, nothing to do */
+
+	/* Lookup the tuple that was affected by the statement. */
+	const struct vy_read_view rv = { .vlsn = lsn - 1 };
+	const struct vy_read_view *p_rv = &rv;
+	struct tuple *old_tuple;
+	if (vy_point_lookup(pk, NULL, &p_rv, (struct tuple *)mem_stmt,
+			    &old_tuple) != 0)
+		return -1;
+
+	int rc = -1;
+	uint32_t data_len;
+	const char *data;
+	struct tuple *new_tuple;
+	struct tuple *delete = NULL;
+	struct tuple *insert = NULL;
+
+	/*
+	 * Create DELETE + REPLACE statements corresponding to
+	 * the given statement in the secondary index.
+	 */
+	switch (vy_stmt_type(mem_stmt)) {
+	case IPROTO_INSERT:
+	case IPROTO_REPLACE:
+		data = tuple_data_range(mem_stmt, &data_len);
+		insert = vy_stmt_new_insert(lsm->mem_format,
+					    data, data + data_len);
+		if (insert == NULL)
+			goto out;
+		goto make_delete;
+
+	case IPROTO_UPSERT:
+		new_tuple = vy_apply_upsert(mem_stmt, old_tuple, pk->cmp_def,
+					    pk->mem_format, true);
+		if (new_tuple == NULL)
+			goto out;
+		data = tuple_data_range(new_tuple, &data_len);
+		insert = vy_stmt_new_insert(lsm->mem_format,
+					    data, data + data_len);
+		tuple_unref(new_tuple);
+		if (insert == NULL)
+			goto out;
+		goto make_delete;
+
+	case IPROTO_DELETE: make_delete:
+		if (old_tuple != NULL) {
+			delete = vy_stmt_new_surrogate_delete(lsm->mem_format,
+							      old_tuple);
+			if (delete == NULL)
+				goto out;
+		}
+		break;
+	default:
+		unreachable();
+	}
+
+	/* Insert DELETE + REPLACE into the LSM tree. */
+	if (delete != NULL) {
+		struct tuple *region_stmt = vy_stmt_dup_lsregion(delete,
+						&lsm->mem->env->allocator,
+						lsm->mem->generation);
+		if (region_stmt == NULL)
+			goto out;
+		vy_stmt_set_lsn((struct tuple *)region_stmt, lsn);
+		if (vy_mem_insert(lsm->mem, region_stmt) != 0)
+			goto out;
+		vy_stmt_counter_acct_tuple(&lsm->stat.memory.count, delete);
+	}
+	if (insert != NULL) {
+		struct tuple *region_stmt = vy_stmt_dup_lsregion(insert,
+						&lsm->mem->env->allocator,
+						lsm->mem->generation);
+		if (region_stmt == NULL)
+			goto out;
+		vy_stmt_set_lsn((struct tuple *)region_stmt, lsn);
+		if (vy_mem_insert(lsm->mem, region_stmt) != 0)
+			goto out;
+		vy_stmt_counter_acct_tuple(&lsm->stat.memory.count, insert);
+	}
+	lsm->mem->min_lsn = MIN(lsm->mem->min_lsn, lsn);
+	lsm->mem->max_lsn = MAX(lsm->mem->max_lsn, lsn);
+
+	rc = 0;
+out:
+	if (old_tuple != NULL)
+		tuple_unref(old_tuple);
+	if (delete != NULL)
+		tuple_unref(delete);
+	if (insert != NULL)
+		tuple_unref(insert);
+	return rc;
+
+}
+
+/**
+ * Recover all statements stored in the given in-memory index
+ * that were inserted into the space while the newly built index
+ * was dumped to disk.
+ */
+static int
+vy_build_recover_mem(struct vy_lsm *lsm, struct vy_lsm *pk, struct vy_mem *mem)
+{
 	/*
-	 * Unlike Memtx, Vinyl does not need building of a secondary index.
-	 * This is true because of two things:
-	 * 1) Vinyl does not support alter of non-empty spaces
-	 * 2) During recovery a Vinyl index already has all needed data on disk.
-	 * And there are 3 cases:
-	 * I. The secondary index is added in snapshot. Then Vinyl was
-	 * snapshotted too and all necessary for that moment data is on disk.
-	 * II. The secondary index is added in WAL. That means that vinyl
-	 * space had no data at that point and had nothing to build. The
-	 * index actually could contain recovered data, but it will handle it
-	 * by itself during WAL recovery.
-	 * III. Vinyl is online. The space is definitely empty and there's
-	 * nothing to build.
+	 * Recover statements starting from the oldest one.
+	 * Key order doesn't matter so we simply iterate over
+	 * the in-memory index in reverse order.
 	 */
+	struct vy_mem_tree_iterator itr;
+	itr = vy_mem_tree_iterator_last(&mem->tree);
+	while (!vy_mem_tree_iterator_is_invalid(&itr)) {
+		const struct tuple *mem_stmt;
+		mem_stmt = *vy_mem_tree_iterator_get_elem(&mem->tree, &itr);
+		if (vy_build_recover_mem_stmt(lsm, pk, mem_stmt) != 0)
+			return -1;
+		vy_mem_tree_iterator_prev(&mem->tree, &itr);
+	}
+	return 0;
+}
+
+/**
+ * Recover the memory level of a newly built index.
+ *
+ * During the final dump of a newly built index, new statements may
+ * be inserted into the space. If those statements are not dumped to
+ * disk before restart, they won't be recovered from WAL, because at
+ * the time they were generated the new index didn't exist. In order
+ * to recover them, we replay all statements stored in the memory
+ * level of the primary index.
+ */
+static int
+vy_build_recover(struct vy_env *env, struct vy_lsm *lsm, struct vy_lsm *pk)
+{
+	int rc;
+	struct vy_mem *mem;
+	size_t mem_used_before, mem_used_after;
+
+	mem_used_before = lsregion_used(&env->mem_env.allocator);
+	rlist_foreach_entry_reverse(mem, &pk->sealed, in_sealed) {
+		rc = vy_build_recover_mem(lsm, pk, mem);
+		if (rc != 0)
+			goto out;
+	}
+	rc = vy_build_recover_mem(lsm, pk, pk->mem);
+out:
+	mem_used_after = lsregion_used(&env->mem_env.allocator);
+	assert(mem_used_after >= mem_used_before);
+	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	return rc;
+}
+
+static int
+vinyl_space_build_index(struct space *src_space, struct index *new_index,
+			struct tuple_format *new_format)
+{
+	struct vy_env *env = vy_env(src_space->engine);
+	struct vy_lsm *pk = vy_lsm(src_space->index[0]);
+	bool is_empty = (pk->stat.disk.count.rows == 0 &&
+			 pk->stat.memory.count.rows == 0);
+
+	if (new_index->def->iid == 0 && !is_empty) {
+		diag_set(ClientError, ER_UNSUPPORTED, "Vinyl",
+			 "rebuilding the primary index of a non-empty space");
+		return -1;
+	}
+
 	if (vinyl_index_open(new_index) != 0)
 		return -1;
 
 	/* Set pointer to the primary key for the new index. */
-	vy_lsm_update_pk(vy_lsm(new_index), pk);
-	return 0;
+	struct vy_lsm *new_lsm = vy_lsm(new_index);
+	vy_lsm_update_pk(new_lsm, pk);
+
+	if (env->status == VINYL_INITIAL_RECOVERY_LOCAL ||
+	    env->status == VINYL_FINAL_RECOVERY_LOCAL)
+		return vy_build_recover(env, new_lsm, pk);
+
+	if (is_empty)
+		return 0;
+
+	if (vy_build_prepare(env, new_lsm) != 0)
+		return -1;
+
+	/*
+	 * Iterate over all tuples stored in the space and insert
+	 * each of them into the new LSM tree. Since read iterator
+	 * may yield, we install an on_replace trigger to forward
+	 * DML requests issued during the build.
+	 */
+	struct tuple *key = vy_stmt_new_select(pk->env->key_format, NULL, 0);
+	if (key == NULL)
+		return -1;
+
+	struct trigger on_replace;
+	struct vy_build_ctx ctx;
+	ctx.lsm = new_lsm;
+	ctx.format = new_format;
+	ctx.space_name = space_name(src_space);
+	ctx.index_name = new_index->def->name;
+	ctx.is_failed = false;
+	diag_create(&ctx.diag);
+	trigger_create(&on_replace, vy_build_on_replace, &ctx, NULL);
+	trigger_add(&src_space->on_replace, &on_replace);
+
+	struct vy_read_iterator itr;
+	vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, key,
+			      &env->xm->p_committed_read_view);
+	int rc;
+	int loops = 0;
+	struct tuple *tuple;
+	while ((rc = vy_read_iterator_next(&itr, &tuple)) == 0) {
+		if (tuple == NULL)
+			break;
+		/*
+		 * Note, yield is not allowed here. If we yielded,
+		 * the tuple could be overwritten by a concurrent
+		 * transaction, in which case we would insert an
+		 * outdated tuple to the new index.
+		 */
+		rc = vy_build_insert_tuple(env, new_lsm, space_name(src_space),
+				new_index->def->name, new_format, tuple);
+		if (rc != 0)
+			break;
+		/*
+		 * Read iterator yields only when it reads runs.
+		 * Yield periodically in order not to stall the
+		 * tx thread in case there are a lot of tuples in
+		 * mems or cache.
+		 */
+		if (++loops % VY_YIELD_LOOPS == 0)
+			fiber_sleep(0);
+		if (ctx.is_failed) {
+			diag_move(&ctx.diag, diag_get());
+			rc = -1;
+			break;
+		}
+	}
+	vy_read_iterator_close(&itr);
+	tuple_unref(key);
+
+	/*
+	 * Dump the new index upon build completion so that we don't
+	 * have to rebuild it on recovery.
+	 */
+	if (rc == 0)
+		rc = vy_scheduler_dump(&env->scheduler);
+
+	if (rc == 0 && ctx.is_failed) {
+		diag_move(&ctx.diag, diag_get());
+		rc = -1;
+	}
+
+	diag_destroy(&ctx.diag);
+	trigger_clear(&on_replace);
+	return rc;
 }
 
 static size_t
@@ -1322,7 +1803,7 @@ vy_is_committed(struct vy_env *env, struct space *space)
  * @param  0 Success.
  * @param -1 Memory error or read error.
  */
-static inline int
+static int
 vy_lsm_get(struct vy_lsm *lsm, struct vy_tx *tx,
 	     const struct vy_read_view **rv,
 	     struct tuple *key, struct tuple **result)
@@ -2929,6 +3410,7 @@ vinyl_engine_end_recovery(struct engine *engine)
 		vy_gc(e, e->recovery, VY_GC_INCOMPLETE, INT64_MAX);
 		vy_recovery_delete(e->recovery);
 		e->recovery = NULL;
+		e->xm->lsn = vclock_sum(e->recovery_vclock);
 		e->recovery_vclock = NULL;
 		e->status = VINYL_ONLINE;
 		vy_quota_set_limit(&e->quota, e->memory);
@@ -3129,7 +3611,8 @@ vy_send_lsm(struct vy_join_ctx *ctx, struct vy_lsm_recovery_info *lsm_info)
 {
 	int rc = -1;
 
-	if (lsm_info->is_dropped)
+	/* Skip dropped or incomplete indexes. */
+	if (lsm_info->is_dropped || lsm_info->create_lsn < 0)
 		return 0;
 
 	/*
@@ -3361,18 +3844,48 @@ vy_gc(struct vy_env *env, struct vy_recovery *recovery,
 	int loops = 0;
 	struct vy_lsm_recovery_info *lsm_info;
 	rlist_foreach_entry(lsm_info, &recovery->lsms, in_recovery) {
+		/*
+		 * Delete unused run files.
+		 */
 		struct vy_run_recovery_info *run_info;
 		rlist_foreach_entry(run_info, &lsm_info->runs, in_lsm) {
-			if ((run_info->is_dropped &&
-			     run_info->gc_lsn < gc_lsn &&
-			     (gc_mask & VY_GC_DROPPED) != 0) ||
-			    (run_info->is_incomplete &&
-			     (gc_mask & VY_GC_INCOMPLETE) != 0)) {
+			bool purge = false;
+
+			/* Run was deleted before the oldest checkpoint. */
+			if ((gc_mask & VY_GC_DROPPED) != 0 &&
+			    run_info->is_dropped && run_info->gc_lsn < gc_lsn)
+				purge = true;
+
+			/* Index or run was not committed. */
+			if ((gc_mask & VY_GC_INCOMPLETE) != 0 &&
+			    (lsm_info->create_lsn < 0 || run_info->is_incomplete))
+				purge = true;
+
+			if (purge)
 				vy_gc_run(env, lsm_info, run_info);
-			}
+
 			if (loops % VY_YIELD_LOOPS == 0)
 				fiber_sleep(0);
 		}
+
+		/*
+		 * Purge incomplete indexes from vylog.
+		 */
+		if (lsm_info->create_lsn >= 0 || lsm_info->is_dropped ||
+		    (gc_mask & VY_GC_INCOMPLETE) == 0)
+			continue;
+
+		vy_log_tx_begin();
+		struct vy_range_recovery_info *range_info;
+		rlist_foreach_entry(range_info, &lsm_info->ranges, in_lsm) {
+			struct vy_slice_recovery_info *slice_info;
+			rlist_foreach_entry(slice_info, &range_info->slices,
+					    in_range)
+				vy_log_delete_slice(slice_info->id);
+			vy_log_delete_range(range_info->id);
+		}
+		vy_log_drop_lsm(lsm_info->id);
+		vy_log_tx_try_commit();
 	}
 }
 
@@ -3424,7 +3937,8 @@ vinyl_engine_backup(struct engine *engine, struct vclock *vclock,
 	int loops = 0;
 	struct vy_lsm_recovery_info *lsm_info;
 	rlist_foreach_entry(lsm_info, &recovery->lsms, in_recovery) {
-		if (lsm_info->is_dropped)
+		/* Skip dropped or incomplete indexes. */
+		if (lsm_info->is_dropped || lsm_info->create_lsn < 0)
 			continue;
 		struct vy_run_recovery_info *run_info;
 		rlist_foreach_entry(run_info, &lsm_info->runs, in_lsm) {
@@ -4030,7 +4544,7 @@ static const struct space_vtab vinyl_space_vtab = {
 static const struct index_vtab vinyl_index_vtab = {
 	/* .destroy = */ vinyl_index_destroy,
 	/* .commit_create = */ vinyl_index_commit_create,
-	/* .abort_create = */ generic_index_abort_create,
+	/* .abort_create = */ vinyl_index_abort_create,
 	/* .commit_modify = */ vinyl_index_commit_modify,
 	/* .commit_drop = */ vinyl_index_commit_drop,
 	/* .update_def = */ generic_index_update_def,
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 172a1b01..195d4550 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -118,6 +118,7 @@ static const char *vy_log_type_name[] = {
 	[VY_LOG_SNAPSHOT]		= "snapshot",
 	[VY_LOG_TRUNCATE_LSM]		= "truncate_lsm",
 	[VY_LOG_MODIFY_LSM]		= "modify_lsm",
+	[VY_LOG_PREPARE_LSM]		= "prepare_lsm",
 };
 
 /** Metadata log object. */
@@ -174,6 +175,10 @@ static struct vy_log vy_log;
 static struct vy_recovery *
 vy_recovery_new_locked(int64_t signature, bool only_checkpoint);
 
+static int
+vy_recovery_process_record(struct vy_recovery *recovery,
+			   const struct vy_log_record *record);
+
 /**
  * Return the name of the vylog file that has the given signature.
  */
@@ -871,6 +876,14 @@ vy_log_end_recovery(void)
 {
 	assert(vy_log.recovery != NULL);
 
+	/*
+	 * Update the recovery context with records written during
+	 * recovery - we will need them for garbage collection.
+	 */
+	struct vy_log_record *record;
+	stailq_foreach_entry(record, &vy_log.tx, in_tx)
+		vy_recovery_process_record(vy_log.recovery, record);
+
 	/* Flush all pending records. */
 	if (vy_log_flush() < 0) {
 		diag_log();
@@ -1125,17 +1138,57 @@ vy_log_write(const struct vy_log_record *record)
  * vy_recovery::index_id_hash map.
  */
 static inline int64_t
-vy_recovery_index_id_hash(uint32_t space_id, uint32_t index_id)
+vy_recovery_index_id_hash_key(uint32_t space_id, uint32_t index_id)
 {
 	return ((uint64_t)space_id << 32) + index_id;
 }
 
+/**
+ * Insert an LSM tree into vy_recovery::index_id_hash.
+ *
+ * If there is an LSM tree with the same space_id/index_id
+ * present in the hash table, it will be silently replaced.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int
+vy_recovery_hash_index_id(struct vy_recovery *recovery,
+			  struct vy_lsm_recovery_info *lsm)
+{
+	struct mh_i64ptr_t *h = recovery->index_id_hash;
+	struct mh_i64ptr_node_t node;
+
+	node.key = vy_recovery_index_id_hash_key(lsm->space_id, lsm->index_id);
+	node.val = lsm;
+
+	if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
+		diag_set(OutOfMemory, 0, "mh_i64ptr_put", "mh_i64ptr_node_t");
+		return -1;
+	}
+	return 0;
+}
+
+/**
+ * Remove the entry corresponding to the space_id/index_id
+ * from vy_recovery::index_id_hash.
+ */
+static void
+vy_recovery_unhash_index_id(struct vy_recovery *recovery,
+			    uint32_t space_id, uint32_t index_id)
+{
+	struct mh_i64ptr_t *h = recovery->index_id_hash;
+	int64_t key = vy_recovery_index_id_hash_key(space_id, index_id);
+	mh_int_t k = mh_i64ptr_find(h, key, NULL);
+	if (k != mh_end(h))
+		mh_i64ptr_del(h, k, NULL);
+}
+
 /** Lookup an LSM tree in vy_recovery::index_id_hash map. */
 struct vy_lsm_recovery_info *
 vy_recovery_lsm_by_index_id(struct vy_recovery *recovery,
 			    uint32_t space_id, uint32_t index_id)
 {
-	int64_t key = vy_recovery_index_id_hash(space_id, index_id);
+	int64_t key = vy_recovery_index_id_hash_key(space_id, index_id);
 	struct mh_i64ptr_t *h = recovery->index_id_hash;
 	mh_int_t k = mh_i64ptr_find(h, key, NULL);
 	if (k == mh_end(h))
@@ -1188,10 +1241,122 @@ vy_recovery_lookup_slice(struct vy_recovery *recovery, int64_t slice_id)
 }
 
 /**
+ * Allocate a new LSM tree with the given ID and add it to
+ * the recovery context.
+ *
+ * Returns the new LSM tree on success, NULL on error.
+ */
+static struct vy_lsm_recovery_info *
+vy_recovery_do_create_lsm(struct vy_recovery *recovery, int64_t id,
+			  uint32_t space_id, uint32_t index_id)
+{
+	struct vy_lsm_recovery_info *lsm = malloc(sizeof(*lsm));
+	if (lsm == NULL) {
+		diag_set(OutOfMemory, sizeof(*lsm),
+			 "malloc", "struct vy_lsm_recovery_info");
+		return NULL;
+	}
+	struct mh_i64ptr_t *h = recovery->lsm_hash;
+	struct mh_i64ptr_node_t node = { id, lsm };
+	struct mh_i64ptr_node_t *old_node = NULL;
+	if (mh_i64ptr_put(h, &node, &old_node, NULL) == mh_end(h)) {
+		diag_set(OutOfMemory, 0, "mh_i64ptr_put", "mh_i64ptr_node_t");
+		free(lsm);
+		return NULL;
+	}
+	assert(old_node == NULL);
+	lsm->id = id;
+	lsm->space_id = space_id;
+	lsm->index_id = index_id;
+	lsm->key_parts = NULL;
+	lsm->key_part_count = 0;
+	lsm->is_dropped = false;
+	lsm->create_lsn = -1;
+	lsm->modify_lsn = -1;
+	lsm->dump_lsn = -1;
+	lsm->prepared = NULL;
+	rlist_create(&lsm->ranges);
+	rlist_create(&lsm->runs);
+	/*
+	 * Keep newer LSM trees closer to the tail of the list
+	 * so that on log rotation we create/drop past incarnations
+	 * before the final version.
+	 */
+	rlist_add_tail_entry(&recovery->lsms, lsm, in_recovery);
+	if (recovery->max_id < id)
+		recovery->max_id = id;
+	return lsm;
+}
+
+/**
+ * Handle a VY_LOG_PREPARE_LSM log record.
+ *
+ * This function allocates a new LSM tree with the given ID and
+ * either associates it with the existing LSM tree hashed under
+ * the same space_id/index_id or inserts it into the hash if
+ * there's none.
+ *
+ * Note, we link incomplete LSM trees to index_id_hash (either
+ * directly or indirectly via vy_lsm_recovery_info::prepared),
+ * because an LSM tree may have been fully built and logged in
+ * WAL, but not committed to vylog. We need to be able to identify
+ * such LSM trees during local recovery so that instead of
+ * rebuilding them we can simply retry vylog write.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+static int
+vy_recovery_prepare_lsm(struct vy_recovery *recovery, int64_t id,
+			uint32_t space_id, uint32_t index_id)
+{
+	if (vy_recovery_lookup_lsm(recovery, id) != NULL) {
+		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+			 tt_sprintf("Duplicate LSM tree id %lld",
+				    (long long)id));
+		return -1;
+	}
+	struct vy_lsm_recovery_info *new_lsm;
+	new_lsm = vy_recovery_do_create_lsm(recovery, id, space_id, index_id);
+	if (new_lsm == NULL)
+		return -1;
+
+	struct vy_lsm_recovery_info *lsm;
+	lsm = vy_recovery_lsm_by_index_id(recovery, space_id, index_id);
+	if (lsm == NULL) {
+		/*
+		 * There's no LSM tree for these space_id/index_id
+		 * in the recovery context. Insert the new LSM tree
+		 * into the index_id_hash.
+		 */
+		return vy_recovery_hash_index_id(recovery, new_lsm);
+	}
+
+	/*
+	 * If there's an LSM tree for the given space_id/index_id,
+	 * it can't be incomplete (i.e. it must be committed albeit
+	 * it may be dropped), neither can it have a prepared LSM
+	 * tree associated with it.
+	 */
+	if (lsm->create_lsn < 0 || lsm->prepared != NULL) {
+		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+			 tt_sprintf("Index %u/%u prepared twice",
+				    (unsigned)space_id, (unsigned)index_id));
+		return -1;
+	}
+
+	/* Link the new LSM tree to the existing one. */
+	lsm->prepared = new_lsm;
+	return 0;
+}
+
+/**
  * Handle a VY_LOG_CREATE_LSM log record.
- * This function allocates a new vinyl LSM tree with ID @id
- * and inserts it to the hash.
- * Return 0 on success, -1 on failure (ID collision or OOM).
+ *
+ * Depending on whether the LSM tree was previously prepared,
+ * this function either commits it or allocates a new one and
+ * inserts it into the recovery hash.
+ *
+ * Returns 0 on success, -1 on error.
  */
 static int
 vy_recovery_create_lsm(struct vy_recovery *recovery, int64_t id,
@@ -1200,116 +1365,81 @@ vy_recovery_create_lsm(struct vy_recovery *recovery, int64_t id,
 		       uint32_t key_part_count, int64_t create_lsn,
 		       int64_t modify_lsn, int64_t dump_lsn)
 {
-	struct vy_lsm_recovery_info *lsm;
-	struct key_part_def *key_parts_copy;
-	struct mh_i64ptr_node_t node;
-	struct mh_i64ptr_t *h;
-	mh_int_t k;
-
-	/*
-	 * Make a copy of the key definition to be used for
-	 * the new LSM tree incarnation.
-	 */
 	if (key_parts == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Missing key definition for LSM tree %lld",
 				    (long long)id));
 		return -1;
 	}
-	key_parts_copy = malloc(sizeof(*key_parts) * key_part_count);
-	if (key_parts_copy == NULL) {
-		diag_set(OutOfMemory, sizeof(*key_parts) * key_part_count,
-			 "malloc", "struct key_part_def");
-		return -1;
-	}
-	memcpy(key_parts_copy, key_parts, sizeof(*key_parts) * key_part_count);
-
-	/*
-	 * Look up the LSM tree in the hash.
-	 */
-	h = recovery->index_id_hash;
-	node.key = vy_recovery_index_id_hash(space_id, index_id);
-	k = mh_i64ptr_find(h, node.key, NULL);
-	lsm = (k != mh_end(h)) ? mh_i64ptr_node(h, k)->val : NULL;
 
-	if (lsm == NULL) {
+	struct vy_lsm_recovery_info *lsm;
+	lsm = vy_recovery_lookup_lsm(recovery, id);
+	if (lsm != NULL) {
 		/*
-		 * This is the first time the LSM tree is created
-		 * (there's no previous incarnation in the context).
-		 * Allocate a node for the LSM tree and add it to
-		 * the hash.
+		 * If the LSM tree already exists, it must be in
+		 * the prepared state (i.e. not committed or dropped).
 		 */
-		lsm = malloc(sizeof(*lsm));
-		if (lsm == NULL) {
-			diag_set(OutOfMemory, sizeof(*lsm),
-				 "malloc", "struct vy_lsm_recovery_info");
-			free(key_parts_copy);
-			return -1;
-		}
-		lsm->index_id = index_id;
-		lsm->space_id = space_id;
-		rlist_create(&lsm->ranges);
-		rlist_create(&lsm->runs);
-
-		node.val = lsm;
-		if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
-			diag_set(OutOfMemory, 0, "mh_i64ptr_put",
-				 "mh_i64ptr_node_t");
-			free(key_parts_copy);
-			free(lsm);
+		if (lsm->create_lsn >= 0 || lsm->is_dropped) {
+			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+				 tt_sprintf("Duplicate LSM tree id %lld",
+					    (long long)id));
 			return -1;
 		}
-		rlist_add_entry(&recovery->lsms, lsm, in_recovery);
 	} else {
-		/*
-		 * The LSM tree was dropped and recreated with the
-		 * same ID. Update its key definition (because it
-		 * could have changed since the last time it was
-		 * used) and reset its state.
-		 */
-		if (!lsm->is_dropped) {
-			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-				 tt_sprintf("LSM tree %u/%u created twice",
-					    (unsigned)space_id,
-					    (unsigned)index_id));
-			free(key_parts_copy);
+		lsm = vy_recovery_do_create_lsm(recovery, id,
+						space_id, index_id);
+		if (lsm == NULL)
 			return -1;
-		}
-		assert(lsm->index_id == index_id);
-		assert(lsm->space_id == space_id);
-		free(lsm->key_parts);
+		lsm->dump_lsn = dump_lsn;
 	}
 
-	lsm->id = id;
-	lsm->key_parts = key_parts_copy;
+	/* Set the key definition. */
+	assert(lsm->key_parts == NULL);
+	lsm->key_parts = malloc(sizeof(*key_parts) * key_part_count);
+	if (lsm->key_parts == NULL) {
+		diag_set(OutOfMemory, sizeof(*key_parts) * key_part_count,
+			 "malloc", "struct key_part_def");
+		return -1;
+	}
+	memcpy(lsm->key_parts, key_parts, sizeof(*key_parts) * key_part_count);
 	lsm->key_part_count = key_part_count;
-	lsm->is_dropped = false;
+
+	/* Mark the LSM tree committed by assigning LSN. */
 	lsm->create_lsn = create_lsn;
 	lsm->modify_lsn = modify_lsn;
-	lsm->dump_lsn = dump_lsn;
 
 	/*
-	 * Add the LSM tree to the hash.
+	 * Hash the new LSM tree under the given space_id/index_id.
+	 * First, look up the LSM tree that is presently in the hash.
 	 */
-	h = recovery->lsm_hash;
-	node.key = id;
-	node.val = lsm;
-	if (mh_i64ptr_find(h, id, NULL) != mh_end(h)) {
-		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Duplicate LSM tree id %lld",
-				    (long long)id));
-		return -1;
-	}
-	if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
-		diag_set(OutOfMemory, 0, "mh_i64ptr_put",
-			 "mh_i64ptr_node_t");
-		return -1;
+	struct vy_lsm_recovery_info *old_lsm;
+	old_lsm = vy_recovery_lsm_by_index_id(recovery, space_id, index_id);
+	if (old_lsm == lsm) {
+		/*
+		 * The new LSM tree is already hashed, nothing to do
+		 * (it was hashed on prepare).
+		 */
+		return 0;
 	}
 
-	if (recovery->max_id < id)
-		recovery->max_id = id;
+	/* Unlink the new LSM tree from the old one, if any. */
+	if (old_lsm != NULL) {
+		assert(old_lsm->create_lsn >= 0);
+		if (!old_lsm->is_dropped) {
+			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+				 tt_sprintf("LSM tree %u/%u created twice",
+					    (unsigned)space_id,
+					    (unsigned)index_id));
+			return -1;
+		}
+		if (old_lsm->prepared != NULL) {
+			assert(old_lsm->prepared == lsm);
+			old_lsm->prepared = NULL;
+		}
+	}
 
-	return 0;
+	/* Update the hash with the new LSM tree. */
+	return vy_recovery_hash_index_id(recovery, lsm);
 }
 
 /**
@@ -1388,6 +1518,35 @@ vy_recovery_drop_lsm(struct vy_recovery *recovery, int64_t id)
 		}
 	}
 	lsm->is_dropped = true;
+
+	if (lsm->create_lsn >= 0)
+		return 0;
+	/*
+	 * If the dropped LSM tree has never been committed,
+	 * it means that ALTER for the corresponding index was
+	 * aborted, hence we don't need to keep it in the
+	 * index_id_hash, because the LSM tree is pure garbage
+	 * and will never be recovered. Unlink it now.
+	 */
+	struct vy_lsm_recovery_info *hashed_lsm;
+	hashed_lsm = vy_recovery_lsm_by_index_id(recovery,
+				lsm->space_id, lsm->index_id);
+	if (hashed_lsm == lsm) {
+		/*
+		 * The LSM tree is linked to the index_id_hash
+		 * directly. Remove the corresponding hash entry.
+		 */
+		vy_recovery_unhash_index_id(recovery,
+				lsm->space_id, lsm->index_id);
+	} else {
+		/*
+		 * The LSM tree was linked to an existing LSM
+		 * tree via vy_lsm_recovery_info::prepared.
+		 * Clear the reference.
+		 */
+		assert(hashed_lsm->prepared == lsm);
+		hashed_lsm->prepared = NULL;
+	}
 	return 0;
 }
 
@@ -1817,6 +1976,10 @@ vy_recovery_process_record(struct vy_recovery *recovery,
 {
 	int rc;
 	switch (record->type) {
+	case VY_LOG_PREPARE_LSM:
+		rc = vy_recovery_prepare_lsm(recovery, record->lsm_id,
+				record->space_id, record->index_id);
+		break;
 	case VY_LOG_CREATE_LSM:
 		rc = vy_recovery_create_lsm(recovery, record->lsm_id,
 				record->space_id, record->index_id,
@@ -2070,7 +2233,8 @@ vy_log_append_lsm(struct xlog *xlog, struct vy_lsm_recovery_info *lsm)
 	struct vy_log_record record;
 
 	vy_log_record_init(&record);
-	record.type = VY_LOG_CREATE_LSM;
+	record.type = lsm->create_lsn < 0 ?
+		VY_LOG_PREPARE_LSM : VY_LOG_CREATE_LSM;
 	record.lsm_id = lsm->id;
 	record.index_id = lsm->index_id;
 	record.space_id = lsm->space_id;
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index 1b2b419f..eda337e0 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -169,6 +169,21 @@ enum vy_log_record_type {
 	 * Requires vy_log_record::lsm_id, key_def, modify_lsn.
 	 */
 	VY_LOG_MODIFY_LSM		= 13,
+	/*
+	 * Prepare a new LSM tree for building.
+	 * Requires vy_log_record::lsm_id, index_id, space_id.
+	 *
+	 * Index ALTER operation consists of two stages. First, we
+	 * build a new LSM tree, checking constraints if necessary.
+	 * This is done before writing the operation to WAL. Then,
+	 * provided the first stage succeeded, we commit the LSM
+	 * tree to the metadata log.
+	 *
+	 * The following record is used to prepare a new LSM tree
+	 * for building. Once the index has been built, we write
+	 * a VY_LOG_CREATE_LSM record to commit it.
+	 */
+	VY_LOG_PREPARE_LSM		= 14,
 
 	vy_log_record_type_MAX
 };
@@ -260,7 +275,12 @@ struct vy_lsm_recovery_info {
 	uint32_t key_part_count;
 	/** True if the LSM tree was dropped. */
 	bool is_dropped;
-	/** LSN of the WAL row that created the LSM tree. */
+	/**
+	 * LSN of the WAL row that created the LSM tree,
+	 * or -1 if the LSM tree was not committed to WAL
+	 * (that is there was an VY_LOG_PREPARE_LSM record
+	 * but no VY_LOG_CREATE_LSM).
+	 */
 	int64_t create_lsn;
 	/** LSN of the WAL row that last modified the LSM tree. */
 	int64_t modify_lsn;
@@ -277,6 +297,11 @@ struct vy_lsm_recovery_info {
 	 * vy_run_recovery_info::in_lsm.
 	 */
 	struct rlist runs;
+	/**
+	 * Pointer to an LSM tree that is going to replace
+	 * this one after successful ALTER.
+	 */
+	struct vy_lsm_recovery_info *prepared;
 };
 
 /** Vinyl range info stored in a recovery context. */
@@ -509,6 +534,19 @@ vy_log_record_init(struct vy_log_record *record)
 	memset(record, 0, sizeof(*record));
 }
 
+/** Helper to log a vinyl LSM tree preparation. */
+static inline void
+vy_log_prepare_lsm(int64_t id, uint32_t space_id, uint32_t index_id)
+{
+	struct vy_log_record record;
+	vy_log_record_init(&record);
+	record.type = VY_LOG_PREPARE_LSM;
+	record.lsm_id = id;
+	record.space_id = space_id;
+	record.index_id = index_id;
+	vy_log_write(&record);
+}
+
 /** Helper to log a vinyl LSM tree creation. */
 static inline void
 vy_log_create_lsm(int64_t id, uint32_t space_id, uint32_t index_id,
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 4dfc0a25..63e18594 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -327,10 +327,6 @@ vy_lsm_create(struct vy_lsm *lsm)
 		return -1;
 	}
 
-	/* Assign unique id. */
-	assert(lsm->id < 0);
-	lsm->id = vy_log_next_id();
-
 	/* Allocate initial range. */
 	return vy_lsm_init_range_tree(lsm);
 }
@@ -505,10 +501,10 @@ vy_lsm_recover(struct vy_lsm *lsm, struct vy_recovery *recovery,
 	lsm_info = vy_recovery_lsm_by_index_id(recovery,
 			lsm->space_id, lsm->index_id);
 	if (is_checkpoint_recovery) {
-		if (lsm_info == NULL) {
+		if (lsm_info == NULL || lsm_info->create_lsn < 0) {
 			/*
 			 * All LSM trees created from snapshot rows must
-			 * be present in vylog, because snapshot can
+			 * be committed to vylog, because snapshot can
 			 * only succeed if vylog has been successfully
 			 * flushed.
 			 */
@@ -528,18 +524,30 @@ vy_lsm_recover(struct vy_lsm *lsm, struct vy_recovery *recovery,
 		}
 	}
 
-	if (lsm_info == NULL || lsn > lsm_info->create_lsn) {
+	if (lsm_info == NULL || (lsm_info->create_lsn >= 0 &&
+				 lsm_info->prepared == NULL &&
+				 lsn > lsm_info->create_lsn)) {
 		/*
 		 * If we failed to log LSM tree creation before restart,
 		 * we won't find it in the log on recovery. This is OK as
 		 * the LSM tree doesn't have any runs in this case. We will
 		 * retry to log LSM tree in vinyl_index_commit_create().
-		 * For now, just create the initial range and assign id.
+		 * For now, just create the initial range.
 		 */
-		lsm->id = vy_log_next_id();
 		return vy_lsm_init_range_tree(lsm);
 	}
 
+	if (lsm_info->create_lsn >= 0 && lsn > lsm_info->create_lsn) {
+		/*
+		 * The index we are recovering was prepared, successfully
+		 * built, and committed to WAL, but it was not committed
+		 * to vylog. Recover the prepared LSM tree. We will retry
+		 * to write it to vylog in vinyl_index_commit_create().
+		 */
+		lsm_info = lsm_info->prepared;
+		assert(lsm_info != NULL);
+	}
+
 	lsm->id = lsm_info->id;
 	lsm->commit_lsn = lsm_info->modify_lsn;
 
@@ -743,11 +751,20 @@ int
 vy_lsm_set(struct vy_lsm *lsm, struct vy_mem *mem,
 	   const struct tuple *stmt, const struct tuple **region_stmt)
 {
+	uint32_t format_id = stmt->format_id;
+
 	assert(vy_stmt_is_refable(stmt));
 	assert(*region_stmt == NULL || !vy_stmt_is_refable(*region_stmt));
 
-	/* Allocate region_stmt on demand. */
-	if (*region_stmt == NULL) {
+	/*
+	 * Allocate region_stmt on demand.
+	 *
+	 * Also, reallocate region_stmt if it uses a different tuple
+	 * format. This may happen during ALTER, when the LSM tree
+	 * that is currently being built uses the new space format
+	 * while other LSM trees still use the old space format.
+	 */
+	if (*region_stmt == NULL || (*region_stmt)->format_id != format_id) {
 		*region_stmt = vy_stmt_dup_lsregion(stmt, &mem->env->allocator,
 						    mem->generation);
 		if (*region_stmt == NULL)
@@ -758,7 +775,6 @@ vy_lsm_set(struct vy_lsm *lsm, struct vy_mem *mem,
 	lsm->stat.memory.count.bytes += tuple_size(stmt);
 
 	/* Abort transaction if format was changed by DDL */
-	uint32_t format_id = stmt->format_id;
 	if (format_id != tuple_format_id(mem->format_with_colmask) &&
 	    format_id != tuple_format_id(mem->format)) {
 		diag_set(ClientError, ER_TRANSACTION_CONFLICT);
diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h
index c7ccb7d6..8f7b4513 100644
--- a/src/box/vy_lsm.h
+++ b/src/box/vy_lsm.h
@@ -152,7 +152,12 @@ struct vy_lsm {
 	 * until all pending operations have completed.
 	 */
 	int refs;
-	/** Unique ID of this LSM tree. */
+	/**
+	 * Unique ID of this LSM tree.
+	 *
+	 * Assigned when the LSM tree is first logged
+	 * (VY_LOG_PREPARE_LSM or VY_LOG_CREATE_LSM).
+	 */
 	int64_t id;
 	/** ID of the index this LSM tree is for. */
 	uint32_t index_id;
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index 65e31ea5..3954bf3a 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -245,11 +245,9 @@ vy_mem_commit_stmt(struct vy_mem *mem, const struct tuple *stmt)
 	/* The statement must be from a lsregion. */
 	assert(!vy_stmt_is_refable(stmt));
 	int64_t lsn = vy_stmt_lsn(stmt);
-	if (mem->min_lsn == INT64_MAX)
-		mem->min_lsn = lsn;
-	assert(mem->min_lsn <= lsn);
-	if (mem->max_lsn < lsn)
-		mem->max_lsn = lsn;
+	mem->min_lsn = MIN(mem->min_lsn, lsn);
+	mem->max_lsn = MAX(mem->max_lsn, lsn);
+	mem->version++;
 }
 
 void
diff --git a/src/box/vy_quota.h b/src/box/vy_quota.h
index 89f88bdc..73fc6a1b 100644
--- a/src/box/vy_quota.h
+++ b/src/box/vy_quota.h
@@ -175,6 +175,16 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 	return 0;
 }
 
+/**
+ * Block the caller until the quota is not exceeded.
+ */
+static inline void
+vy_quota_wait(struct vy_quota *q)
+{
+	while (q->used > q->limit)
+		fiber_cond_wait(&q->cond);
+}
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 912d9028..67c3423e 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -458,6 +458,35 @@ vy_scheduler_trigger_dump(struct vy_scheduler *scheduler)
 	fiber_cond_signal(&scheduler->scheduler_cond);
 }
 
+int
+vy_scheduler_dump(struct vy_scheduler *scheduler)
+{
+	/*
+	 * We must not start dump if checkpoint is in progress
+	 * so first wait for checkpoint to complete.
+	 */
+	while (scheduler->checkpoint_in_progress)
+		fiber_cond_wait(&scheduler->dump_cond);
+
+	/* Trigger dump. */
+	if (scheduler->generation == scheduler->dump_generation)
+		scheduler->dump_start = ev_monotonic_now(loop());
+	int64_t generation = ++scheduler->generation;
+	fiber_cond_signal(&scheduler->scheduler_cond);
+
+	/* Wait for dump to complete. */
+	while (scheduler->dump_generation < generation) {
+		if (scheduler->is_throttled) {
+			/* Dump error occurred. */
+			struct error *e = diag_last_error(&scheduler->diag);
+			diag_add_error(diag_get(), e);
+			return -1;
+		}
+		fiber_cond_wait(&scheduler->dump_cond);
+	}
+	return 0;
+}
+
 /**
  * Check whether the current dump round is complete.
  * If it is, free memory and proceed to the next dump round.
@@ -722,7 +751,7 @@ vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 		goto delete_mems;
 	}
 
-	assert(new_run->info.min_lsn > lsm->dump_lsn);
+	assert(new_run->info.min_lsn >= lsm->dump_lsn);
 	assert(new_run->info.max_lsn <= dump_lsn);
 
 	/*
@@ -1119,11 +1148,12 @@ vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 		return -1;
 	}
 
-	if (gc_lsn < 0) {
+	if (gc_lsn < 0 || lsm->commit_lsn < 0) {
 		/*
 		 * If there is no last snapshot, i.e. we are in
-		 * the middle of join, we can delete compacted
-		 * run files right away.
+		 * the middle of join, or the index hasn't been
+		 * committed, i.e. we are building it right now,
+		 * we can delete compacted run files right away.
 		 */
 		vy_log_tx_begin();
 		rlist_foreach_entry(run, &unused_runs, in_unused) {
diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h
index 4db86152..7dbd6a96 100644
--- a/src/box/vy_scheduler.h
+++ b/src/box/vy_scheduler.h
@@ -202,6 +202,13 @@ void
 vy_scheduler_trigger_dump(struct vy_scheduler *scheduler);
 
 /**
+ * Trigger dump of all currently existing in-memory trees
+ * and wait until it is complete. Returns 0 on success.
+ */
+int
+vy_scheduler_dump(struct vy_scheduler *scheduler);
+
+/**
  * Schedule a checkpoint. Please call vy_scheduler_wait_checkpoint()
  * after that.
  */
diff --git a/test/box/alter.result b/test/box/alter.result
index a78dd3e9..91712674 100644
--- a/test/box/alter.result
+++ b/test/box/alter.result
@@ -817,507 +817,6 @@ ts:drop()
 ---
 ...
 --
--- gh-1557: NULL in indexes.
---
-NULL = require('msgpack').NULL
----
-...
-format = {}
----
-...
-format[1] = { name = 'field1', type = 'unsigned', is_nullable = true }
----
-...
-format[2] = { name = 'field2', type = 'unsigned', is_nullable = true }
----
-...
-s = box.schema.space.create('test', { format = format })
----
-...
-s:create_index('primary', { parts = { 'field1' } })
----
-- error: Primary index of the space 'test' can not contain nullable parts
-...
-s:create_index('primary', { parts = {{'field1', is_nullable = false}} })
----
-- error: Field 1 is nullable in space format, but not nullable in index parts
-...
-format[1].is_nullable = false
----
-...
-s:format(format)
----
-...
-s:create_index('primary', { parts = {{'field1', is_nullable = true}} })
----
-- error: Primary index of the space 'test' can not contain nullable parts
-...
-s:create_index('primary', { parts = {'field1'} })
----
-- unique: true
-  parts:
-  - type: unsigned
-    is_nullable: false
-    fieldno: 1
-  id: 0
-  space_id: 733
-  name: primary
-  type: TREE
-...
--- Check that is_nullable can't be set to false on non-empty space
-s:insert({1, NULL})
----
-- [1, null]
-...
-format[1].is_nullable = true
----
-...
-s:format(format)
----
-- error: Field 1 is nullable in space format, but not nullable in index parts
-...
-format[1].is_nullable = false
----
-...
-format[2].is_nullable = false
----
-...
-s:format(format)
----
-- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
-...
-s:delete(1)
----
-- [1, null]
-...
--- Disable is_nullable on empty space
-s:format(format)
----
-...
--- Disable is_nullable on a non-empty space.
-format[2].is_nullable = true
----
-...
-s:format(format)
----
-...
-s:replace{1, 1}
----
-- [1, 1]
-...
-format[2].is_nullable = false
----
-...
-s:format(format)
----
-...
--- Enable is_nullable on a non-empty space.
-format[2].is_nullable = true
----
-...
-s:format(format)
----
-...
-s:replace{1, box.NULL}
----
-- [1, null]
-...
-s:delete{1}
----
-- [1, null]
-...
-s:format({})
----
-...
-s:create_index('secondary', { parts = {{2, 'string', is_nullable = true}} })
----
-- unique: true
-  parts:
-  - type: string
-    is_nullable: true
-    fieldno: 2
-  id: 1
-  space_id: 733
-  name: secondary
-  type: TREE
-...
-s:insert({1, NULL})
----
-- [1, null]
-...
-s.index.secondary:alter({ parts = {{2, 'string', is_nullable = false} }})
----
-- error: 'Tuple field 2 type does not match one required by operation: expected string'
-...
-s:delete({1})
----
-- [1, null]
-...
-s.index.secondary:alter({ parts = {{2, 'string', is_nullable = false} }})
----
-...
-s:insert({1, NULL})
----
-- error: 'Tuple field 2 type does not match one required by operation: expected string'
-...
-s:insert({2, 'xxx'})
----
-- [2, 'xxx']
-...
-s.index.secondary:alter({ parts = {{2, 'string', is_nullable = true} }})
----
-...
-s:insert({1, NULL})
----
-- [1, null]
-...
-s:drop()
----
-...
-s = box.schema.create_space('test')
----
-...
-test_run:cmd("setopt delimiter ';'")
----
-- true
-...
-s:format({
-    [1] = { name = 'id1', type = 'unsigned'},
-    [2] = { name = 'id2', type = 'unsigned'},
-    [3] = { name = 'id3', type = 'string'},
-    [4] = { name = 'id4', type = 'string'},
-    [5] = { name = 'id5', type = 'string'},
-    [6] = { name = 'id6', type = 'string'},
-});
----
-...
-test_run:cmd("setopt delimiter ''");
----
-- true
-...
-s:format()
----
-- [{'name': 'id1', 'type': 'unsigned'}, {'name': 'id2', 'type': 'unsigned'}, {'name': 'id3',
-    'type': 'string'}, {'name': 'id4', 'type': 'string'}, {'name': 'id5', 'type': 'string'},
-  {'name': 'id6', 'type': 'string'}]
-...
-_ = s:create_index('primary')
----
-...
-s:insert({1, 1, 'a', 'b', 'c', 'd'})
----
-- [1, 1, 'a', 'b', 'c', 'd']
-...
-s:drop()
----
-...
-s = box.schema.create_space('test')
----
-...
-idx = s:create_index('idx')
----
-...
-box.space.test == s
----
-- true
-...
-s:drop()
----
-...
---
--- gh-3000: index modifying must change key_def parts and
--- comparators. They can be changed, if there was compatible index
--- parts change. For example, a part type was changed from
--- unsigned to number. In such a case comparators must be reset
--- and part types updated.
---
-s = box.schema.create_space('test')
----
-...
-pk = s:create_index('pk')
----
-...
-s:replace{1}
----
-- [1]
-...
-pk:alter{parts = {{1, 'integer'}}}
----
-...
-s:replace{-2}
----
-- [-2]
-...
-s:select{}
----
-- - [-2]
-  - [1]
-...
-s:drop()
----
-...
---
--- Allow to change is_nullable in index definition on non-empty
--- space.
---
-s = box.schema.create_space('test')
----
-...
-pk = s:create_index('pk')
----
-...
-sk1 = s:create_index('sk1', {parts = {{2, 'unsigned', is_nullable = true}}})
----
-...
-sk2 = s:create_index('sk2', {parts = {{3, 'unsigned', is_nullable = false}}})
----
-...
-s:replace{1, box.NULL, 1}
----
-- [1, null, 1]
-...
-sk1:alter({parts = {{2, 'unsigned', is_nullable = false}}})
----
-- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
-...
-s:replace{1, 1, 1}
----
-- [1, 1, 1]
-...
-sk1:alter({parts = {{2, 'unsigned', is_nullable = false}}})
----
-...
-s:replace{1, 1, box.NULL}
----
-- error: 'Tuple field 3 type does not match one required by operation: expected unsigned'
-...
-sk2:alter({parts = {{3, 'unsigned', is_nullable = true}}})
----
-...
-s:replace{1, 1, box.NULL}
----
-- [1, 1, null]
-...
-s:replace{2, 10, 100}
----
-- [2, 10, 100]
-...
-s:replace{3, 0, 20}
----
-- [3, 0, 20]
-...
-s:replace{4, 15, 150}
----
-- [4, 15, 150]
-...
-s:replace{5, 9, box.NULL}
----
-- [5, 9, null]
-...
-sk1:select{}
----
-- - [3, 0, 20]
-  - [1, 1, null]
-  - [5, 9, null]
-  - [2, 10, 100]
-  - [4, 15, 150]
-...
-sk2:select{}
----
-- - [1, 1, null]
-  - [5, 9, null]
-  - [3, 0, 20]
-  - [2, 10, 100]
-  - [4, 15, 150]
-...
-s:drop()
----
-...
---
--- gh-3008: allow multiple types on the same field.
---
-format = {}
----
-...
-format[1] = {name = 'field1', type = 'unsigned'}
----
-...
-format[2] = {name = 'field2', type = 'scalar'}
----
-...
-format[3] = {name = 'field3', type = 'integer'}
----
-...
-s = box.schema.create_space('test', {format = format})
----
-...
-pk = s:create_index('pk')
----
-...
-sk1 = s:create_index('sk1', {parts = {{2, 'number'}}})
----
-...
-sk2 = s:create_index('sk2', {parts = {{2, 'integer'}}})
----
-...
-sk3 = s:create_index('sk3', {parts = {{2, 'unsigned'}}})
----
-...
-sk4 = s:create_index('sk4', {parts = {{3, 'number'}}})
----
-...
-s:format()
----
-- [{'name': 'field1', 'type': 'unsigned'}, {'name': 'field2', 'type': 'scalar'}, {
-    'name': 'field3', 'type': 'integer'}]
-...
-s:replace{1, '100', -20.2}
----
-- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
-...
-s:replace{1, 100, -20.2}
----
-- error: 'Tuple field 3 type does not match one required by operation: expected integer'
-...
-s:replace{1, 100, -20}
----
-- [1, 100, -20]
-...
-s:replace{2, 50, 0}
----
-- [2, 50, 0]
-...
-s:replace{3, 150, -60}
----
-- [3, 150, -60]
-...
-s:replace{4, 0, 120}
----
-- [4, 0, 120]
-...
-pk:select{}
----
-- - [1, 100, -20]
-  - [2, 50, 0]
-  - [3, 150, -60]
-  - [4, 0, 120]
-...
-sk1:select{}
----
-- - [4, 0, 120]
-  - [2, 50, 0]
-  - [1, 100, -20]
-  - [3, 150, -60]
-...
-sk2:select{}
----
-- - [4, 0, 120]
-  - [2, 50, 0]
-  - [1, 100, -20]
-  - [3, 150, -60]
-...
-sk3:select{}
----
-- - [4, 0, 120]
-  - [2, 50, 0]
-  - [1, 100, -20]
-  - [3, 150, -60]
-...
-sk4:select{}
----
-- - [3, 150, -60]
-  - [1, 100, -20]
-  - [2, 50, 0]
-  - [4, 0, 120]
-...
-sk1:alter{parts = {{2, 'unsigned'}}}
----
-...
-sk2:alter{parts = {{2, 'unsigned'}}}
----
-...
-sk4:alter{parts = {{3, 'integer'}}}
----
-...
-s:replace{1, 50.5, 1.5}
----
-- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
-...
-s:replace{1, 50, 1.5}
----
-- error: 'Tuple field 3 type does not match one required by operation: expected integer'
-...
-s:replace{5, 5, 5}
----
-- [5, 5, 5]
-...
-sk1:select{}
----
-- - [4, 0, 120]
-  - [5, 5, 5]
-  - [2, 50, 0]
-  - [1, 100, -20]
-  - [3, 150, -60]
-...
-sk2:select{}
----
-- - [4, 0, 120]
-  - [5, 5, 5]
-  - [2, 50, 0]
-  - [1, 100, -20]
-  - [3, 150, -60]
-...
-sk3:select{}
----
-- - [4, 0, 120]
-  - [5, 5, 5]
-  - [2, 50, 0]
-  - [1, 100, -20]
-  - [3, 150, -60]
-...
-sk4:select{}
----
-- - [3, 150, -60]
-  - [1, 100, -20]
-  - [2, 50, 0]
-  - [5, 5, 5]
-  - [4, 0, 120]
-...
-sk1:drop()
----
-...
-sk2:drop()
----
-...
-sk3:drop()
----
-...
--- Remove 'unsigned' constraints from indexes, and 'scalar' now
--- can be inserted in the second field.
-s:replace{1, true, 100}
----
-- [1, true, 100]
-...
-s:select{}
----
-- - [1, true, 100]
-  - [2, 50, 0]
-  - [3, 150, -60]
-  - [4, 0, 120]
-  - [5, 5, 5]
-...
-sk4:select{}
----
-- - [3, 150, -60]
-  - [2, 50, 0]
-  - [5, 5, 5]
-  - [1, true, 100]
-  - [4, 0, 120]
-...
-s:drop()
----
-...
---
 -- gh-2914: Allow any space name which consists of printable characters
 --
 identifier = require("identifier")
diff --git a/test/box/alter.test.lua b/test/box/alter.test.lua
index ab713584..b088478d 100644
--- a/test/box/alter.test.lua
+++ b/test/box/alter.test.lua
@@ -316,165 +316,6 @@ n
 ts:drop()
 
 --
--- gh-1557: NULL in indexes.
---
-
-NULL = require('msgpack').NULL
-
-format = {}
-format[1] = { name = 'field1', type = 'unsigned', is_nullable = true }
-format[2] = { name = 'field2', type = 'unsigned', is_nullable = true }
-s = box.schema.space.create('test', { format = format })
-s:create_index('primary', { parts = { 'field1' } })
-s:create_index('primary', { parts = {{'field1', is_nullable = false}} })
-format[1].is_nullable = false
-s:format(format)
-s:create_index('primary', { parts = {{'field1', is_nullable = true}} })
-
-s:create_index('primary', { parts = {'field1'} })
-
--- Check that is_nullable can't be set to false on non-empty space
-s:insert({1, NULL})
-format[1].is_nullable = true
-s:format(format)
-format[1].is_nullable = false
-format[2].is_nullable = false
-s:format(format)
-s:delete(1)
--- Disable is_nullable on empty space
-s:format(format)
--- Disable is_nullable on a non-empty space.
-format[2].is_nullable = true
-s:format(format)
-s:replace{1, 1}
-format[2].is_nullable = false
-s:format(format)
--- Enable is_nullable on a non-empty space.
-format[2].is_nullable = true
-s:format(format)
-s:replace{1, box.NULL}
-s:delete{1}
-s:format({})
-
-s:create_index('secondary', { parts = {{2, 'string', is_nullable = true}} })
-s:insert({1, NULL})
-s.index.secondary:alter({ parts = {{2, 'string', is_nullable = false} }})
-s:delete({1})
-s.index.secondary:alter({ parts = {{2, 'string', is_nullable = false} }})
-s:insert({1, NULL})
-s:insert({2, 'xxx'})
-s.index.secondary:alter({ parts = {{2, 'string', is_nullable = true} }})
-s:insert({1, NULL})
-
-s:drop()
-
-s = box.schema.create_space('test')
-test_run:cmd("setopt delimiter ';'")
-s:format({
-    [1] = { name = 'id1', type = 'unsigned'},
-    [2] = { name = 'id2', type = 'unsigned'},
-    [3] = { name = 'id3', type = 'string'},
-    [4] = { name = 'id4', type = 'string'},
-    [5] = { name = 'id5', type = 'string'},
-    [6] = { name = 'id6', type = 'string'},
-});
-test_run:cmd("setopt delimiter ''");
-s:format()
-_ = s:create_index('primary')
-s:insert({1, 1, 'a', 'b', 'c', 'd'})
-s:drop()
-
-s = box.schema.create_space('test')
-idx = s:create_index('idx')
-box.space.test == s
-s:drop()
-
---
--- gh-3000: index modifying must change key_def parts and
--- comparators. They can be changed, if there was compatible index
--- parts change. For example, a part type was changed from
--- unsigned to number. In such a case comparators must be reset
--- and part types updated.
---
-s = box.schema.create_space('test')
-pk = s:create_index('pk')
-s:replace{1}
-pk:alter{parts = {{1, 'integer'}}}
-s:replace{-2}
-s:select{}
-s:drop()
-
---
--- Allow to change is_nullable in index definition on non-empty
--- space.
---
-s = box.schema.create_space('test')
-pk = s:create_index('pk')
-sk1 = s:create_index('sk1', {parts = {{2, 'unsigned', is_nullable = true}}})
-sk2 = s:create_index('sk2', {parts = {{3, 'unsigned', is_nullable = false}}})
-s:replace{1, box.NULL, 1}
-sk1:alter({parts = {{2, 'unsigned', is_nullable = false}}})
-s:replace{1, 1, 1}
-sk1:alter({parts = {{2, 'unsigned', is_nullable = false}}})
-s:replace{1, 1, box.NULL}
-sk2:alter({parts = {{3, 'unsigned', is_nullable = true}}})
-s:replace{1, 1, box.NULL}
-s:replace{2, 10, 100}
-s:replace{3, 0, 20}
-s:replace{4, 15, 150}
-s:replace{5, 9, box.NULL}
-sk1:select{}
-sk2:select{}
-s:drop()
-
---
--- gh-3008: allow multiple types on the same field.
---
-format = {}
-format[1] = {name = 'field1', type = 'unsigned'}
-format[2] = {name = 'field2', type = 'scalar'}
-format[3] = {name = 'field3', type = 'integer'}
-s = box.schema.create_space('test', {format = format})
-pk = s:create_index('pk')
-sk1 = s:create_index('sk1', {parts = {{2, 'number'}}})
-sk2 = s:create_index('sk2', {parts = {{2, 'integer'}}})
-sk3 = s:create_index('sk3', {parts = {{2, 'unsigned'}}})
-sk4 = s:create_index('sk4', {parts = {{3, 'number'}}})
-s:format()
-s:replace{1, '100', -20.2}
-s:replace{1, 100, -20.2}
-s:replace{1, 100, -20}
-s:replace{2, 50, 0}
-s:replace{3, 150, -60}
-s:replace{4, 0, 120}
-pk:select{}
-sk1:select{}
-sk2:select{}
-sk3:select{}
-sk4:select{}
-
-sk1:alter{parts = {{2, 'unsigned'}}}
-sk2:alter{parts = {{2, 'unsigned'}}}
-sk4:alter{parts = {{3, 'integer'}}}
-s:replace{1, 50.5, 1.5}
-s:replace{1, 50, 1.5}
-s:replace{5, 5, 5}
-sk1:select{}
-sk2:select{}
-sk3:select{}
-sk4:select{}
-
-sk1:drop()
-sk2:drop()
-sk3:drop()
--- Remove 'unsigned' constraints from indexes, and 'scalar' now
--- can be inserted in the second field.
-s:replace{1, true, 100}
-s:select{}
-sk4:select{}
-s:drop()
-
---
 -- gh-2914: Allow any space name which consists of printable characters
 --
 identifier = require("identifier")
diff --git a/test/engine/ddl.result b/test/engine/ddl.result
index 04062ac1..6c915879 100644
--- a/test/engine/ddl.result
+++ b/test/engine/ddl.result
@@ -1352,3 +1352,675 @@ s:select()
 s:drop()
 ---
 ...
+--
+-- gh-1557: NULL in indexes.
+--
+NULL = require('msgpack').NULL
+---
+...
+format = {}
+---
+...
+format[1] = { name = 'field1', type = 'unsigned', is_nullable = true }
+---
+...
+format[2] = { name = 'field2', type = 'unsigned', is_nullable = true }
+---
+...
+s = box.schema.space.create('test', {engine = engine, format = format})
+---
+...
+s:create_index('primary', { parts = { 'field1' } })
+---
+- error: Primary index of the space 'test' can not contain nullable parts
+...
+s:create_index('primary', { parts = {{'field1', is_nullable = false}} })
+---
+- error: Field 1 is nullable in space format, but not nullable in index parts
+...
+format[1].is_nullable = false
+---
+...
+s:format(format)
+---
+...
+s:create_index('primary', { parts = {{'field1', is_nullable = true}} })
+---
+- error: Primary index of the space 'test' can not contain nullable parts
+...
+i = s:create_index('primary', { parts = {'field1'} })
+---
+...
+i.parts
+---
+- - type: unsigned
+    is_nullable: false
+    fieldno: 1
+...
+-- Check that is_nullable can't be set to false on non-empty space
+s:insert({1, NULL})
+---
+- [1, null]
+...
+format[1].is_nullable = true
+---
+...
+s:format(format)
+---
+- error: Field 1 is nullable in space format, but not nullable in index parts
+...
+format[1].is_nullable = false
+---
+...
+format[2].is_nullable = false
+---
+...
+s:format(format)
+---
+- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
+...
+_ = s:delete(1)
+---
+...
+-- Disable is_nullable on empty space
+s:format(format)
+---
+...
+-- Disable is_nullable on a non-empty space.
+format[2].is_nullable = true
+---
+...
+s:format(format)
+---
+...
+s:replace{1, 1}
+---
+- [1, 1]
+...
+format[2].is_nullable = false
+---
+...
+s:format(format)
+---
+...
+-- Enable is_nullable on a non-empty space.
+format[2].is_nullable = true
+---
+...
+s:format(format)
+---
+...
+s:replace{1, box.NULL}
+---
+- [1, null]
+...
+_ = s:delete{1}
+---
+...
+s:format({})
+---
+...
+i = s:create_index('secondary', { parts = {{2, 'string', is_nullable = true}} })
+---
+...
+i.parts
+---
+- - type: string
+    is_nullable: true
+    fieldno: 2
+...
+s:insert({1, NULL})
+---
+- [1, null]
+...
+s.index.secondary:alter({ parts = {{2, 'string', is_nullable = false} }})
+---
+- error: 'Tuple field 2 type does not match one required by operation: expected string'
+...
+_ = s:delete({1})
+---
+...
+s.index.secondary:alter({ parts = {{2, 'string', is_nullable = false} }})
+---
+...
+s:insert({1, NULL})
+---
+- error: 'Tuple field 2 type does not match one required by operation: expected string'
+...
+s:insert({2, 'xxx'})
+---
+- [2, 'xxx']
+...
+s.index.secondary:alter({ parts = {{2, 'string', is_nullable = true} }})
+---
+...
+s:insert({1, NULL})
+---
+- [1, null]
+...
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+inspector:cmd("setopt delimiter ';'")
+---
+- true
+...
+s:format({
+    [1] = { name = 'id1', type = 'unsigned'},
+    [2] = { name = 'id2', type = 'unsigned'},
+    [3] = { name = 'id3', type = 'string'},
+    [4] = { name = 'id4', type = 'string'},
+    [5] = { name = 'id5', type = 'string'},
+    [6] = { name = 'id6', type = 'string'},
+});
+---
+...
+inspector:cmd("setopt delimiter ''");
+---
+- true
+...
+s:format()
+---
+- [{'name': 'id1', 'type': 'unsigned'}, {'name': 'id2', 'type': 'unsigned'}, {'name': 'id3',
+    'type': 'string'}, {'name': 'id4', 'type': 'string'}, {'name': 'id5', 'type': 'string'},
+  {'name': 'id6', 'type': 'string'}]
+...
+_ = s:create_index('primary')
+---
+...
+s:insert({1, 1, 'a', 'b', 'c', 'd'})
+---
+- [1, 1, 'a', 'b', 'c', 'd']
+...
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+idx = s:create_index('idx')
+---
+...
+box.space.test == s
+---
+- true
+...
+s:drop()
+---
+...
+--
+-- gh-3000: index modifying must change key_def parts and
+-- comparators. They can be changed, if there was compatible index
+-- parts change. For example, a part type was changed from
+-- unsigned to number. In such a case comparators must be reset
+-- and part types updated.
+--
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+pk:alter{parts = {{1, 'integer'}}}
+---
+...
+s:replace{-2}
+---
+- [-2]
+...
+s:select{}
+---
+- - [-2]
+  - [1]
+...
+s:drop()
+---
+...
+--
+-- Allow to change is_nullable in index definition on non-empty
+-- space.
+--
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+sk1 = s:create_index('sk1', {parts = {{2, 'unsigned', is_nullable = true}}})
+---
+...
+sk2 = s:create_index('sk2', {parts = {{3, 'unsigned', is_nullable = false}}})
+---
+...
+s:replace{1, box.NULL, 1}
+---
+- [1, null, 1]
+...
+sk1:alter({parts = {{2, 'unsigned', is_nullable = false}}})
+---
+- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
+...
+s:replace{1, 1, 1}
+---
+- [1, 1, 1]
+...
+sk1:alter({parts = {{2, 'unsigned', is_nullable = false}}})
+---
+...
+s:replace{1, 1, box.NULL}
+---
+- error: 'Tuple field 3 type does not match one required by operation: expected unsigned'
+...
+sk2:alter({parts = {{3, 'unsigned', is_nullable = true}}})
+---
+...
+s:replace{1, 1, box.NULL}
+---
+- [1, 1, null]
+...
+s:replace{2, 10, 100}
+---
+- [2, 10, 100]
+...
+s:replace{3, 0, 20}
+---
+- [3, 0, 20]
+...
+s:replace{4, 15, 150}
+---
+- [4, 15, 150]
+...
+s:replace{5, 9, box.NULL}
+---
+- [5, 9, null]
+...
+sk1:select{}
+---
+- - [3, 0, 20]
+  - [1, 1, null]
+  - [5, 9, null]
+  - [2, 10, 100]
+  - [4, 15, 150]
+...
+sk2:select{}
+---
+- - [1, 1, null]
+  - [5, 9, null]
+  - [3, 0, 20]
+  - [2, 10, 100]
+  - [4, 15, 150]
+...
+s:drop()
+---
+...
+--
+-- gh-3008: allow multiple types on the same field.
+--
+format = {}
+---
+...
+format[1] = {name = 'field1', type = 'unsigned'}
+---
+...
+format[2] = {name = 'field2', type = 'scalar'}
+---
+...
+format[3] = {name = 'field3', type = 'integer'}
+---
+...
+s = box.schema.space.create('test', {engine = engine, format = format})
+---
+...
+pk = s:create_index('pk')
+---
+...
+sk1 = s:create_index('sk1', {parts = {{2, 'number'}}})
+---
+...
+sk2 = s:create_index('sk2', {parts = {{2, 'integer'}}})
+---
+...
+sk3 = s:create_index('sk3', {parts = {{2, 'unsigned'}}})
+---
+...
+sk4 = s:create_index('sk4', {parts = {{3, 'number'}}})
+---
+...
+s:format()
+---
+- [{'name': 'field1', 'type': 'unsigned'}, {'name': 'field2', 'type': 'scalar'}, {
+    'name': 'field3', 'type': 'integer'}]
+...
+s:replace{1, '100', -20.2}
+---
+- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
+...
+s:replace{1, 100, -20.2}
+---
+- error: 'Tuple field 3 type does not match one required by operation: expected integer'
+...
+s:replace{1, 100, -20}
+---
+- [1, 100, -20]
+...
+s:replace{2, 50, 0}
+---
+- [2, 50, 0]
+...
+s:replace{3, 150, -60}
+---
+- [3, 150, -60]
+...
+s:replace{4, 0, 120}
+---
+- [4, 0, 120]
+...
+pk:select{}
+---
+- - [1, 100, -20]
+  - [2, 50, 0]
+  - [3, 150, -60]
+  - [4, 0, 120]
+...
+sk1:select{}
+---
+- - [4, 0, 120]
+  - [2, 50, 0]
+  - [1, 100, -20]
+  - [3, 150, -60]
+...
+sk2:select{}
+---
+- - [4, 0, 120]
+  - [2, 50, 0]
+  - [1, 100, -20]
+  - [3, 150, -60]
+...
+sk3:select{}
+---
+- - [4, 0, 120]
+  - [2, 50, 0]
+  - [1, 100, -20]
+  - [3, 150, -60]
+...
+sk4:select{}
+---
+- - [3, 150, -60]
+  - [1, 100, -20]
+  - [2, 50, 0]
+  - [4, 0, 120]
+...
+sk1:alter{parts = {{2, 'unsigned'}}}
+---
+...
+sk2:alter{parts = {{2, 'unsigned'}}}
+---
+...
+sk4:alter{parts = {{3, 'integer'}}}
+---
+...
+s:replace{1, 50.5, 1.5}
+---
+- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
+...
+s:replace{1, 50, 1.5}
+---
+- error: 'Tuple field 3 type does not match one required by operation: expected integer'
+...
+s:replace{5, 5, 5}
+---
+- [5, 5, 5]
+...
+sk1:select{}
+---
+- - [4, 0, 120]
+  - [5, 5, 5]
+  - [2, 50, 0]
+  - [1, 100, -20]
+  - [3, 150, -60]
+...
+sk2:select{}
+---
+- - [4, 0, 120]
+  - [5, 5, 5]
+  - [2, 50, 0]
+  - [1, 100, -20]
+  - [3, 150, -60]
+...
+sk3:select{}
+---
+- - [4, 0, 120]
+  - [5, 5, 5]
+  - [2, 50, 0]
+  - [1, 100, -20]
+  - [3, 150, -60]
+...
+sk4:select{}
+---
+- - [3, 150, -60]
+  - [1, 100, -20]
+  - [2, 50, 0]
+  - [5, 5, 5]
+  - [4, 0, 120]
+...
+sk1:drop()
+---
+...
+sk2:drop()
+---
+...
+sk3:drop()
+---
+...
+-- Remove 'unsigned' constraints from indexes, and 'scalar' now
+-- can be inserted in the second field.
+s:replace{1, true, 100}
+---
+- [1, true, 100]
+...
+s:select{}
+---
+- - [1, true, 100]
+  - [2, 50, 0]
+  - [3, 150, -60]
+  - [4, 0, 120]
+  - [5, 5, 5]
+...
+sk4:select{}
+---
+- - [3, 150, -60]
+  - [2, 50, 0]
+  - [5, 5, 5]
+  - [1, true, 100]
+  - [4, 0, 120]
+...
+s:drop()
+---
+...
+--
+-- Creating/altering a secondary index of a non-empty space.
+--
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = s:create_index('pk')
+---
+...
+_ = s:insert{1, 'zzz', 'aaa', 999}
+---
+...
+_ = s:insert{2, 'yyy', 'bbb', 888}
+---
+...
+_ = s:insert{3, 'xxx', 'ccc', 777}
+---
+...
+box.snapshot()
+---
+- ok
+...
+_ = s:update(1, {{'!', -1, 'eee'}})
+---
+...
+_ = s:upsert({2, '2', '2', -2}, {{'=', 4, -888}})
+---
+...
+_ = s:replace(s:get(3):update{{'=', 3, box.NULL}})
+---
+...
+_ = s:upsert({4, 'zzz', 'ddd', -666}, {{'!', -1, 'abc'}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+_ = s:update(1, {{'=', 5, 'fff'}})
+---
+...
+_ = s:upsert({3, '3', '3', -3}, {{'=', 5, 'ggg'}})
+---
+...
+_ = s:insert{5, 'xxx', 'eee', 555, 'hhh'}
+---
+...
+_ = s:replace{6, 'yyy', box.NULL, -444}
+---
+...
+s:select()
+---
+- - [1, 'zzz', 'aaa', 999, 'fff']
+  - [2, 'yyy', 'bbb', -888]
+  - [3, 'xxx', null, 777, 'ggg']
+  - [4, 'zzz', 'ddd', -666]
+  - [5, 'xxx', 'eee', 555, 'hhh']
+  - [6, 'yyy', null, -444]
+...
+s:create_index('sk', {parts = {2, 'string'}}) -- error: unique constraint
+---
+- error: Duplicate key exists in unique index 'sk' in space 'test'
+...
+s:create_index('sk', {parts = {3, 'string'}}) -- error: nullability constraint
+---
+- error: 'Tuple field 3 type does not match one required by operation: expected string'
+...
+s:create_index('sk', {parts = {4, 'unsigned'}}) -- error: field type
+---
+- error: 'Tuple field 4 type does not match one required by operation: expected unsigned'
+...
+s:create_index('sk', {parts = {4, 'integer', 5, 'string'}}) -- error: field missing
+---
+- error: Tuple field count 4 is less than required by space format or defined indexes
+    (expected at least 5)
+...
+i1 = s:create_index('i1', {parts = {2, 'string'}, unique = false})
+---
+...
+i2 = s:create_index('i2', {parts = {{3, 'string', is_nullable = true}}})
+---
+...
+i3 = s:create_index('i3', {parts = {4, 'integer'}})
+---
+...
+i1:select()
+---
+- - [3, 'xxx', null, 777, 'ggg']
+  - [5, 'xxx', 'eee', 555, 'hhh']
+  - [2, 'yyy', 'bbb', -888]
+  - [6, 'yyy', null, -444]
+  - [1, 'zzz', 'aaa', 999, 'fff']
+  - [4, 'zzz', 'ddd', -666]
+...
+i2:select()
+---
+- - [3, 'xxx', null, 777, 'ggg']
+  - [6, 'yyy', null, -444]
+  - [1, 'zzz', 'aaa', 999, 'fff']
+  - [2, 'yyy', 'bbb', -888]
+  - [4, 'zzz', 'ddd', -666]
+  - [5, 'xxx', 'eee', 555, 'hhh']
+...
+i3:select()
+---
+- - [2, 'yyy', 'bbb', -888]
+  - [4, 'zzz', 'ddd', -666]
+  - [6, 'yyy', null, -444]
+  - [5, 'xxx', 'eee', 555, 'hhh']
+  - [3, 'xxx', null, 777, 'ggg']
+  - [1, 'zzz', 'aaa', 999, 'fff']
+...
+i1:alter{unique = true} -- error: unique contraint
+---
+- error: Duplicate key exists in unique index 'i1' in space 'test'
+...
+i2:alter{parts = {3, 'string'}} -- error: nullability contraint
+---
+- error: 'Tuple field 3 type does not match one required by operation: expected string'
+...
+i3:alter{parts = {4, 'unsigned'}} -- error: field type
+---
+- error: 'Tuple field 4 type does not match one required by operation: expected unsigned'
+...
+i3:alter{parts = {4, 'integer', 5, 'string'}} -- error: field missing
+---
+- error: Tuple field count 4 is less than required by space format or defined indexes
+    (expected at least 5)
+...
+i3:alter{parts = {2, 'string', 4, 'integer'}} -- ok
+---
+...
+i3:select()
+---
+- - [5, 'xxx', 'eee', 555, 'hhh']
+  - [3, 'xxx', null, 777, 'ggg']
+  - [2, 'yyy', 'bbb', -888]
+  - [6, 'yyy', null, -444]
+  - [4, 'zzz', 'ddd', -666]
+  - [1, 'zzz', 'aaa', 999, 'fff']
+...
+-- Check that recovery works.
+inspector:cmd("restart server default")
+s = box.space.test
+---
+...
+s.index.i1:select()
+---
+- - [3, 'xxx', null, 777, 'ggg']
+  - [5, 'xxx', 'eee', 555, 'hhh']
+  - [2, 'yyy', 'bbb', -888]
+  - [6, 'yyy', null, -444]
+  - [1, 'zzz', 'aaa', 999, 'fff']
+  - [4, 'zzz', 'ddd', -666]
+...
+s.index.i2:select()
+---
+- - [3, 'xxx', null, 777, 'ggg']
+  - [6, 'yyy', null, -444]
+  - [1, 'zzz', 'aaa', 999, 'fff']
+  - [2, 'yyy', 'bbb', -888]
+  - [4, 'zzz', 'ddd', -666]
+  - [5, 'xxx', 'eee', 555, 'hhh']
+...
+s.index.i3:select()
+---
+- - [5, 'xxx', 'eee', 555, 'hhh']
+  - [3, 'xxx', null, 777, 'ggg']
+  - [2, 'yyy', 'bbb', -888]
+  - [6, 'yyy', null, -444]
+  - [4, 'zzz', 'ddd', -666]
+  - [1, 'zzz', 'aaa', 999, 'fff']
+...
+box.snapshot()
+---
+- ok
+...
+s:drop()
+---
+...
diff --git a/test/engine/ddl.test.lua b/test/engine/ddl.test.lua
index f3d68e1b..2593535b 100644
--- a/test/engine/ddl.test.lua
+++ b/test/engine/ddl.test.lua
@@ -498,3 +498,224 @@ s:format(format)
 s:format()
 s:select()
 s:drop()
+
+--
+-- gh-1557: NULL in indexes.
+--
+
+NULL = require('msgpack').NULL
+
+format = {}
+format[1] = { name = 'field1', type = 'unsigned', is_nullable = true }
+format[2] = { name = 'field2', type = 'unsigned', is_nullable = true }
+s = box.schema.space.create('test', {engine = engine, format = format})
+s:create_index('primary', { parts = { 'field1' } })
+s:create_index('primary', { parts = {{'field1', is_nullable = false}} })
+format[1].is_nullable = false
+s:format(format)
+s:create_index('primary', { parts = {{'field1', is_nullable = true}} })
+
+i = s:create_index('primary', { parts = {'field1'} })
+i.parts
+
+-- Check that is_nullable can't be set to false on non-empty space
+s:insert({1, NULL})
+format[1].is_nullable = true
+s:format(format)
+format[1].is_nullable = false
+format[2].is_nullable = false
+s:format(format)
+_ = s:delete(1)
+-- Disable is_nullable on empty space
+s:format(format)
+-- Disable is_nullable on a non-empty space.
+format[2].is_nullable = true
+s:format(format)
+s:replace{1, 1}
+format[2].is_nullable = false
+s:format(format)
+-- Enable is_nullable on a non-empty space.
+format[2].is_nullable = true
+s:format(format)
+s:replace{1, box.NULL}
+_ = s:delete{1}
+s:format({})
+
+i = s:create_index('secondary', { parts = {{2, 'string', is_nullable = true}} })
+i.parts
+
+s:insert({1, NULL})
+s.index.secondary:alter({ parts = {{2, 'string', is_nullable = false} }})
+_ = s:delete({1})
+s.index.secondary:alter({ parts = {{2, 'string', is_nullable = false} }})
+s:insert({1, NULL})
+s:insert({2, 'xxx'})
+s.index.secondary:alter({ parts = {{2, 'string', is_nullable = true} }})
+s:insert({1, NULL})
+
+s:drop()
+
+s = box.schema.space.create('test', {engine = engine})
+inspector:cmd("setopt delimiter ';'")
+s:format({
+    [1] = { name = 'id1', type = 'unsigned'},
+    [2] = { name = 'id2', type = 'unsigned'},
+    [3] = { name = 'id3', type = 'string'},
+    [4] = { name = 'id4', type = 'string'},
+    [5] = { name = 'id5', type = 'string'},
+    [6] = { name = 'id6', type = 'string'},
+});
+inspector:cmd("setopt delimiter ''");
+s:format()
+_ = s:create_index('primary')
+s:insert({1, 1, 'a', 'b', 'c', 'd'})
+s:drop()
+
+s = box.schema.space.create('test', {engine = engine})
+idx = s:create_index('idx')
+box.space.test == s
+s:drop()
+
+--
+-- gh-3000: index modifying must change key_def parts and
+-- comparators. They can be changed, if there was compatible index
+-- parts change. For example, a part type was changed from
+-- unsigned to number. In such a case comparators must be reset
+-- and part types updated.
+--
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+s:replace{1}
+pk:alter{parts = {{1, 'integer'}}}
+s:replace{-2}
+s:select{}
+s:drop()
+
+--
+-- Allow to change is_nullable in index definition on non-empty
+-- space.
+--
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+sk1 = s:create_index('sk1', {parts = {{2, 'unsigned', is_nullable = true}}})
+sk2 = s:create_index('sk2', {parts = {{3, 'unsigned', is_nullable = false}}})
+s:replace{1, box.NULL, 1}
+sk1:alter({parts = {{2, 'unsigned', is_nullable = false}}})
+s:replace{1, 1, 1}
+sk1:alter({parts = {{2, 'unsigned', is_nullable = false}}})
+s:replace{1, 1, box.NULL}
+sk2:alter({parts = {{3, 'unsigned', is_nullable = true}}})
+s:replace{1, 1, box.NULL}
+s:replace{2, 10, 100}
+s:replace{3, 0, 20}
+s:replace{4, 15, 150}
+s:replace{5, 9, box.NULL}
+sk1:select{}
+sk2:select{}
+s:drop()
+
+--
+-- gh-3008: allow multiple types on the same field.
+--
+format = {}
+format[1] = {name = 'field1', type = 'unsigned'}
+format[2] = {name = 'field2', type = 'scalar'}
+format[3] = {name = 'field3', type = 'integer'}
+s = box.schema.space.create('test', {engine = engine, format = format})
+pk = s:create_index('pk')
+sk1 = s:create_index('sk1', {parts = {{2, 'number'}}})
+sk2 = s:create_index('sk2', {parts = {{2, 'integer'}}})
+sk3 = s:create_index('sk3', {parts = {{2, 'unsigned'}}})
+sk4 = s:create_index('sk4', {parts = {{3, 'number'}}})
+s:format()
+s:replace{1, '100', -20.2}
+s:replace{1, 100, -20.2}
+s:replace{1, 100, -20}
+s:replace{2, 50, 0}
+s:replace{3, 150, -60}
+s:replace{4, 0, 120}
+pk:select{}
+sk1:select{}
+sk2:select{}
+sk3:select{}
+sk4:select{}
+
+sk1:alter{parts = {{2, 'unsigned'}}}
+sk2:alter{parts = {{2, 'unsigned'}}}
+sk4:alter{parts = {{3, 'integer'}}}
+s:replace{1, 50.5, 1.5}
+s:replace{1, 50, 1.5}
+s:replace{5, 5, 5}
+sk1:select{}
+sk2:select{}
+sk3:select{}
+sk4:select{}
+
+sk1:drop()
+sk2:drop()
+sk3:drop()
+-- Remove 'unsigned' constraints from indexes, and 'scalar' now
+-- can be inserted in the second field.
+s:replace{1, true, 100}
+s:select{}
+sk4:select{}
+s:drop()
+
+--
+-- Creating/altering a secondary index of a non-empty space.
+--
+s = box.schema.space.create('test', {engine = engine})
+_ = s:create_index('pk')
+
+_ = s:insert{1, 'zzz', 'aaa', 999}
+_ = s:insert{2, 'yyy', 'bbb', 888}
+_ = s:insert{3, 'xxx', 'ccc', 777}
+
+box.snapshot()
+
+_ = s:update(1, {{'!', -1, 'eee'}})
+_ = s:upsert({2, '2', '2', -2}, {{'=', 4, -888}})
+_ = s:replace(s:get(3):update{{'=', 3, box.NULL}})
+_ = s:upsert({4, 'zzz', 'ddd', -666}, {{'!', -1, 'abc'}})
+
+box.snapshot()
+
+_ = s:update(1, {{'=', 5, 'fff'}})
+_ = s:upsert({3, '3', '3', -3}, {{'=', 5, 'ggg'}})
+_ = s:insert{5, 'xxx', 'eee', 555, 'hhh'}
+_ = s:replace{6, 'yyy', box.NULL, -444}
+
+s:select()
+
+s:create_index('sk', {parts = {2, 'string'}}) -- error: unique constraint
+s:create_index('sk', {parts = {3, 'string'}}) -- error: nullability constraint
+s:create_index('sk', {parts = {4, 'unsigned'}}) -- error: field type
+s:create_index('sk', {parts = {4, 'integer', 5, 'string'}}) -- error: field missing
+
+i1 = s:create_index('i1', {parts = {2, 'string'}, unique = false})
+i2 = s:create_index('i2', {parts = {{3, 'string', is_nullable = true}}})
+i3 = s:create_index('i3', {parts = {4, 'integer'}})
+
+i1:select()
+i2:select()
+i3:select()
+
+i1:alter{unique = true} -- error: unique contraint
+i2:alter{parts = {3, 'string'}} -- error: nullability contraint
+i3:alter{parts = {4, 'unsigned'}} -- error: field type
+i3:alter{parts = {4, 'integer', 5, 'string'}} -- error: field missing
+
+i3:alter{parts = {2, 'string', 4, 'integer'}} -- ok
+i3:select()
+
+-- Check that recovery works.
+inspector:cmd("restart server default")
+
+s = box.space.test
+s.index.i1:select()
+s.index.i2:select()
+s.index.i3:select()
+
+box.snapshot()
+
+s:drop()
diff --git a/test/unit/vy_mem.c b/test/unit/vy_mem.c
index 6641dc0c..6666f94c 100644
--- a/test/unit/vy_mem.c
+++ b/test/unit/vy_mem.c
@@ -51,9 +51,9 @@ test_basic(void)
 
 	/* Check version  */
 	stmt = vy_mem_insert_template(mem, &stmts[4]);
-	is(mem->version, 6, "vy_mem->version")
+	is(mem->version, 8, "vy_mem->version")
 	vy_mem_commit_stmt(mem, stmt);
-	is(mem->version, 6, "vy_mem->version")
+	is(mem->version, 9, "vy_mem->version")
 
 	/* Clean up */
 	vy_mem_delete(mem);
diff --git a/test/vinyl/ddl.result b/test/vinyl/ddl.result
index 4607a44e..3f672474 100644
--- a/test/vinyl/ddl.result
+++ b/test/vinyl/ddl.result
@@ -68,202 +68,61 @@ index = space:create_index('primary', {type = 'hash'})
 space:drop()
 ---
 ...
--- creation of a new index and altering the definition of an existing
--- index are unsupported for non-empty spaces
+-- rebuild of the primary index is not supported
 space = box.schema.space.create('test', { engine = 'vinyl' })
 ---
 ...
-index = space:create_index('primary')
+pk = space:create_index('pk', {run_count_per_level = 1, run_size_ratio = 10})
 ---
 ...
-space:insert({1})
+space:replace{1, 1}
 ---
-- [1]
-...
--- fail because of wrong tuple format {1}, but need {1, ...}
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
----
-- error: Vinyl does not support building an index for a non-empty space
-...
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
----
-- error: Vinyl does not support building an index for a non-empty space
-...
-#box.space._index:select({space.id})
----
-- 1
-...
-box.space._index:get{space.id, 0}[6]
----
-- [[0, 'unsigned']]
-...
-space:drop()
----
-...
-space = box.schema.space.create('test', { engine = 'vinyl' })
----
-...
-index = space:create_index('primary')
----
-...
-space:insert({1, 2})
----
-- [1, 2]
-...
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
----
-- error: Vinyl does not support building an index for a non-empty space
-...
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
----
-- error: Vinyl does not support building an index for a non-empty space
-...
-#box.space._index:select({space.id})
----
-- 1
-...
-box.space._index:get{space.id, 0}[6]
----
-- [[0, 'unsigned']]
-...
-space:drop()
----
-...
-space = box.schema.space.create('test', { engine = 'vinyl' })
----
-...
-index = space:create_index('primary')
----
-...
-space:insert({1, 2})
----
-- [1, 2]
-...
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
----
-- error: Vinyl does not support building an index for a non-empty space
-...
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
----
-- error: Vinyl does not support building an index for a non-empty space
-...
-#box.space._index:select({space.id})
----
-- 1
-...
-box.space._index:get{space.id, 0}[6]
----
-- [[0, 'unsigned']]
-...
-space:delete({1})
----
-...
--- must fail because vy_mems have data
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
----
-- error: Vinyl does not support building an index for a non-empty space
+- [1, 1]
 ...
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
+pk:alter{parts = {2, 'unsigned'}} -- error: mem not empty
 ---
-- error: Vinyl does not support building an index for a non-empty space
+- error: Vinyl does not support rebuilding the primary index of a non-empty space
 ...
 box.snapshot()
 ---
 - ok
 ...
-while space.index.primary:info().rows ~= 0 do fiber.sleep(0.01) end
----
-...
--- after a dump REPLACE + DELETE = nothing, so the space is empty now and
--- can be altered.
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
----
-...
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
----
-...
-#box.space._index:select({space.id})
----
-- 2
-...
-box.space._index:get{space.id, 0}[6]
+pk:alter{parts = {2, 'unsigned'}} -- error: run not empty
 ---
-- [[0, 'unsigned'], [1, 'unsigned']]
+- error: Vinyl does not support rebuilding the primary index of a non-empty space
 ...
-space:insert({1, 2})
----
-- [1, 2]
-...
-index:select{}
----
-- - [1, 2]
-...
-index2:select{}
----
-- - [1, 2]
-...
-space:drop()
+space:replace{2, 2}
 ---
+- [2, 2]
 ...
-space = box.schema.space.create('test', { engine = 'vinyl' })
+space:delete{1}
 ---
 ...
-index = space:create_index('primary', { run_count_per_level = 2 })
+space:delete{2}
 ---
 ...
-space:insert({1, 2})
+pk:alter{parts = {2, 'unsigned'}} -- error: mem/run not empty
 ---
-- [1, 2]
+- error: Vinyl does not support rebuilding the primary index of a non-empty space
 ...
 box.snapshot()
 ---
 - ok
 ...
-space:delete({1})
----
-...
-box.snapshot()
+-- wait for compaction to complete
+while pk:info().disk.compact.count == 0 do fiber.sleep(0.01) end
 ---
-- ok
-...
-while space.index.primary:info().run_count ~= 2 do fiber.sleep(0.01) end
----
-...
--- must fail because vy_runs have data
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
----
-- error: Vinyl does not support building an index for a non-empty space
-...
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
----
-- error: Vinyl does not support building an index for a non-empty space
-...
--- After compaction the REPLACE + DELETE + DELETE = nothing, so
--- the space is now empty and can be altered.
-space:delete({1})
----
-...
--- Make sure the run is big enough to trigger compaction.
-space:replace({2, 3})
----
-- [2, 3]
-...
-space:delete({2})
----
-...
-box.snapshot()
----
-- ok
 ...
--- Wait until the dump is finished.
-while space.index.primary:info().rows ~= 0 do fiber.sleep(0.01) end
+pk:alter{parts = {2, 'unsigned'}} -- success: space is empty now
 ---
 ...
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
+space:replace{1, 2}
 ---
+- [1, 2]
 ...
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
+space:get(2)
 ---
+- [1, 2]
 ...
 space:drop()
 ---
@@ -291,7 +150,7 @@ space:auto_increment{3}
 ...
 box.space._index:replace{space.id, 0, 'pk', 'tree', {unique=true}, {{0, 'unsigned'}, {1, 'unsigned'}}}
 ---
-- error: Vinyl does not support building an index for a non-empty space
+- error: Vinyl does not support rebuilding the primary index of a non-empty space
 ...
 space:select{}
 ---
@@ -747,14 +606,6 @@ s.index.secondary.unique
 ---
 - false
 ...
-s.index.secondary:alter{unique = true} -- error
----
-- error: Vinyl does not support building an index for a non-empty space
-...
-s.index.secondary.unique
----
-- false
-...
 s:insert{2, 10}
 ---
 - [2, 10]
@@ -771,27 +622,256 @@ s:drop()
 s = box.schema.space.create('test', {engine = 'vinyl'})
 ---
 ...
-_ = s:create_index('i1')
+pk = s:create_index('pk', {parts = {1, 'unsigned'}})
 ---
 ...
-_ = s:create_index('i2', {parts = {2, 'integer'}})
+s:replace{1}
 ---
+- [1]
 ...
-_ = s:create_index('i3', {parts = {{3, 'string', is_nullable = true}}})
+-- Extending field type is allowed without rebuild.
+pk:alter{parts = {1, 'integer'}}
 ---
 ...
-_ = s:replace{1, 1, 'test'}
+-- Should fail as we do not support rebuilding the primary index of a non-empty space.
+pk:alter{parts = {1, 'unsigned'}}
 ---
+- error: Vinyl does not support rebuilding the primary index of a non-empty space
 ...
--- Should fail with 'Vinyl does not support building an index for a non-empty space'.
-s.index.i2:alter{parts = {2, 'unsigned'}}
+s:replace{-1}
 ---
-- error: Vinyl does not support building an index for a non-empty space
+- [-1]
 ...
-s.index.i3:alter{parts = {{3, 'string', is_nullable = false}}}
+s:drop()
 ---
-- error: Vinyl does not support building an index for a non-empty space
 ...
-s:drop()
+--
+-- Check that all modifications done to the space during index build
+-- are reflected in the new index.
+--
+math.randomseed(os.time())
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin();
+---
+...
+for i = 1, 1000 do
+    if (i % 100 == 0) then
+        box.commit()
+        box.begin()
+    end
+    if i % 300 == 0 then
+        box.snapshot()
+    end
+    box.space.test:replace{i, i, i}
+end;
+---
+...
+box.commit();
+---
+...
+last_val = 1000;
+---
+...
+function gen_load()
+    local s = box.space.test
+    for i = 1, 200 do
+        local op = math.random(4)
+        local key = math.random(1000)
+        local val1 = math.random(1000)
+        local val2 = last_val + 1
+        last_val = val2
+        if op == 1 then
+            pcall(s.insert, s, {key, val1, val2})
+        elseif op == 2 then
+            pcall(s.replace, s, {key, val1, val2})
+        elseif op == 3 then
+            pcall(s.delete, s, {key})
+        elseif op == 4 then
+            pcall(s.upsert, s, {key, val1, val2}, {{'=', 2, val1}, {'=', 3, val2}})
+        end
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+ch = fiber.channel(1)
+---
+...
+_ = fiber.create(function() gen_load() ch:put(true) end)
+---
+...
+_ = box.space.test:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+---
+...
+ch:get()
+---
+- true
+...
+_ = fiber.create(function() gen_load() ch:put(true) end)
+---
+...
+_ = box.space.test:create_index('tk', {unique = true, parts = {3, 'unsigned'}})
+---
+...
+ch:get()
+---
+- true
+...
+box.space.test.index.pk:count() == box.space.test.index.sk:count()
+---
+- true
+...
+box.space.test.index.pk:count() == box.space.test.index.tk:count()
+---
+- true
+...
+test_run:cmd("restart server default")
+box.space.test.index.pk:count() == box.space.test.index.sk:count()
 ---
+- true
+...
+box.space.test.index.pk:count() == box.space.test.index.tk:count()
+---
+- true
+...
+box.snapshot()
+---
+- ok
+...
+box.space.test.index.pk:count() == box.space.test.index.sk:count()
+---
+- true
+...
+box.space.test.index.pk:count() == box.space.test.index.tk:count()
+---
+- true
+...
+box.space.test:drop()
+---
+...
+--
+-- Check that creation of a secondary index triggers dump
+-- if memory quota is exceeded.
+--
+test_run:cmd("create server test with script='vinyl/low_quota.lua'")
+---
+- true
+...
+test_run:cmd("start server test with args='1048576'")
+---
+- true
+...
+test_run:cmd("switch test")
+---
+- true
+...
+_ = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = box.space.test:create_index('pk')
+---
+...
+pad = string.rep('x', 1000)
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin();
+---
+...
+for i = 1, 1000 do
+    if (i % 100 == 0) then
+        box.commit()
+        box.begin()
+    end
+    box.space.test:replace{i, i, pad}
+end;
+---
+...
+box.commit();
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = box.space.test:create_index('sk', {parts = {2, 'unsigned', 3, 'string'}})
+---
+...
+box.space.test.index.sk:info().disk.dump.count > 1
+---
+- true
+...
+box.space.test.index.sk:count()
+---
+- 1000
+...
+test_run:cmd("restart server test with args='1048576'")
+box.space.test.index.sk:count()
+---
+- 1000
+...
+box.space.test.index.sk:drop()
+---
+...
+box.snapshot()
+---
+- ok
+...
+--
+-- Check that run files left from an index we failed to build
+-- are removed by garbage collection.
+--
+fio = require('fio')
+---
+...
+_ = box.space.test:replace{1001, 1, string.rep('x', 1000)}
+---
+...
+box.space.test:create_index('sk', {parts = {2, 'unsigned', 3, 'string'}})
+---
+- error: Duplicate key exists in unique index 'sk' in space 'test'
+...
+#fio.listdir(fio.pathjoin(box.cfg.vinyl_dir, box.space.test.id, 1)) > 0
+---
+- true
+...
+box.snapshot()
+---
+- ok
+...
+#fio.listdir(fio.pathjoin(box.cfg.vinyl_dir, box.space.test.id, 1)) == 0
+---
+- true
+...
+box.space.test:drop()
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
 ...
diff --git a/test/vinyl/ddl.test.lua b/test/vinyl/ddl.test.lua
index 637a331d..b0378600 100644
--- a/test/vinyl/ddl.test.lua
+++ b/test/vinyl/ddl.test.lua
@@ -23,76 +23,23 @@ space = box.schema.space.create('test', { engine = 'vinyl' })
 index = space:create_index('primary', {type = 'hash'})
 space:drop()
 
--- creation of a new index and altering the definition of an existing
--- index are unsupported for non-empty spaces
+-- rebuild of the primary index is not supported
 space = box.schema.space.create('test', { engine = 'vinyl' })
-index = space:create_index('primary')
-space:insert({1})
--- fail because of wrong tuple format {1}, but need {1, ...}
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
-#box.space._index:select({space.id})
-box.space._index:get{space.id, 0}[6]
-space:drop()
-
-space = box.schema.space.create('test', { engine = 'vinyl' })
-index = space:create_index('primary')
-space:insert({1, 2})
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
-#box.space._index:select({space.id})
-box.space._index:get{space.id, 0}[6]
-space:drop()
-
-space = box.schema.space.create('test', { engine = 'vinyl' })
-index = space:create_index('primary')
-space:insert({1, 2})
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
-#box.space._index:select({space.id})
-box.space._index:get{space.id, 0}[6]
-space:delete({1})
-
--- must fail because vy_mems have data
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
-box.snapshot()
-while space.index.primary:info().rows ~= 0 do fiber.sleep(0.01) end
-
--- after a dump REPLACE + DELETE = nothing, so the space is empty now and
--- can be altered.
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
-#box.space._index:select({space.id})
-box.space._index:get{space.id, 0}[6]
-space:insert({1, 2})
-index:select{}
-index2:select{}
-space:drop()
-
-space = box.schema.space.create('test', { engine = 'vinyl' })
-index = space:create_index('primary', { run_count_per_level = 2 })
-space:insert({1, 2})
-box.snapshot()
-space:delete({1})
+pk = space:create_index('pk', {run_count_per_level = 1, run_size_ratio = 10})
+space:replace{1, 1}
+pk:alter{parts = {2, 'unsigned'}} -- error: mem not empty
 box.snapshot()
-while space.index.primary:info().run_count ~= 2 do fiber.sleep(0.01) end
--- must fail because vy_runs have data
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
-
--- After compaction the REPLACE + DELETE + DELETE = nothing, so
--- the space is now empty and can be altered.
-space:delete({1})
--- Make sure the run is big enough to trigger compaction.
-space:replace({2, 3})
-space:delete({2})
+pk:alter{parts = {2, 'unsigned'}} -- error: run not empty
+space:replace{2, 2}
+space:delete{1}
+space:delete{2}
+pk:alter{parts = {2, 'unsigned'}} -- error: mem/run not empty
 box.snapshot()
--- Wait until the dump is finished.
-while space.index.primary:info().rows ~= 0 do fiber.sleep(0.01) end
-index2 = space:create_index('secondary', { parts = {2, 'unsigned'} })
-space.index.primary:alter({parts = {1, 'unsigned', 2, 'unsigned'}})
-
+-- wait for compaction to complete
+while pk:info().disk.compact.count == 0 do fiber.sleep(0.01) end
+pk:alter{parts = {2, 'unsigned'}} -- success: space is empty now
+space:replace{1, 2}
+space:get(2)
 space:drop()
 
 --
@@ -273,19 +220,138 @@ _ = s:create_index('secondary', {unique = true, parts = {2, 'unsigned'}})
 s:insert{1, 10}
 s.index.secondary:alter{unique = false} -- ok
 s.index.secondary.unique
-s.index.secondary:alter{unique = true} -- error
-s.index.secondary.unique
 s:insert{2, 10}
 s.index.secondary:select(10)
 s:drop()
 
 -- Narrowing indexed field type entails index rebuild.
 s = box.schema.space.create('test', {engine = 'vinyl'})
-_ = s:create_index('i1')
-_ = s:create_index('i2', {parts = {2, 'integer'}})
-_ = s:create_index('i3', {parts = {{3, 'string', is_nullable = true}}})
-_ = s:replace{1, 1, 'test'}
--- Should fail with 'Vinyl does not support building an index for a non-empty space'.
-s.index.i2:alter{parts = {2, 'unsigned'}}
-s.index.i3:alter{parts = {{3, 'string', is_nullable = false}}}
+pk = s:create_index('pk', {parts = {1, 'unsigned'}})
+s:replace{1}
+-- Extending field type is allowed without rebuild.
+pk:alter{parts = {1, 'integer'}}
+-- Should fail as we do not support rebuilding the primary index of a non-empty space.
+pk:alter{parts = {1, 'unsigned'}}
+s:replace{-1}
 s:drop()
+
+--
+-- Check that all modifications done to the space during index build
+-- are reflected in the new index.
+--
+math.randomseed(os.time())
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk')
+
+test_run:cmd("setopt delimiter ';'")
+
+box.begin();
+for i = 1, 1000 do
+    if (i % 100 == 0) then
+        box.commit()
+        box.begin()
+    end
+    if i % 300 == 0 then
+        box.snapshot()
+    end
+    box.space.test:replace{i, i, i}
+end;
+box.commit();
+
+last_val = 1000;
+
+function gen_load()
+    local s = box.space.test
+    for i = 1, 200 do
+        local op = math.random(4)
+        local key = math.random(1000)
+        local val1 = math.random(1000)
+        local val2 = last_val + 1
+        last_val = val2
+        if op == 1 then
+            pcall(s.insert, s, {key, val1, val2})
+        elseif op == 2 then
+            pcall(s.replace, s, {key, val1, val2})
+        elseif op == 3 then
+            pcall(s.delete, s, {key})
+        elseif op == 4 then
+            pcall(s.upsert, s, {key, val1, val2}, {{'=', 2, val1}, {'=', 3, val2}})
+        end
+    end
+end;
+
+test_run:cmd("setopt delimiter ''");
+
+ch = fiber.channel(1)
+
+_ = fiber.create(function() gen_load() ch:put(true) end)
+_ = box.space.test:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+ch:get()
+
+_ = fiber.create(function() gen_load() ch:put(true) end)
+_ = box.space.test:create_index('tk', {unique = true, parts = {3, 'unsigned'}})
+ch:get()
+
+box.space.test.index.pk:count() == box.space.test.index.sk:count()
+box.space.test.index.pk:count() == box.space.test.index.tk:count()
+
+test_run:cmd("restart server default")
+
+box.space.test.index.pk:count() == box.space.test.index.sk:count()
+box.space.test.index.pk:count() == box.space.test.index.tk:count()
+box.snapshot()
+box.space.test.index.pk:count() == box.space.test.index.sk:count()
+box.space.test.index.pk:count() == box.space.test.index.tk:count()
+box.space.test:drop()
+
+--
+-- Check that creation of a secondary index triggers dump
+-- if memory quota is exceeded.
+--
+test_run:cmd("create server test with script='vinyl/low_quota.lua'")
+test_run:cmd("start server test with args='1048576'")
+test_run:cmd("switch test")
+
+_ = box.schema.space.create('test', {engine = 'vinyl'})
+_ = box.space.test:create_index('pk')
+
+pad = string.rep('x', 1000)
+
+test_run:cmd("setopt delimiter ';'")
+box.begin();
+for i = 1, 1000 do
+    if (i % 100 == 0) then
+        box.commit()
+        box.begin()
+    end
+    box.space.test:replace{i, i, pad}
+end;
+box.commit();
+test_run:cmd("setopt delimiter ''");
+
+_ = box.space.test:create_index('sk', {parts = {2, 'unsigned', 3, 'string'}})
+box.space.test.index.sk:info().disk.dump.count > 1
+box.space.test.index.sk:count()
+
+test_run:cmd("restart server test with args='1048576'")
+
+box.space.test.index.sk:count()
+box.space.test.index.sk:drop()
+box.snapshot()
+
+--
+-- Check that run files left from an index we failed to build
+-- are removed by garbage collection.
+--
+fio = require('fio')
+_ = box.space.test:replace{1001, 1, string.rep('x', 1000)}
+box.space.test:create_index('sk', {parts = {2, 'unsigned', 3, 'string'}})
+#fio.listdir(fio.pathjoin(box.cfg.vinyl_dir, box.space.test.id, 1)) > 0
+box.snapshot()
+#fio.listdir(fio.pathjoin(box.cfg.vinyl_dir, box.space.test.id, 1)) == 0
+box.space.test:drop()
+
+test_run:cmd("switch default")
+test_run:cmd("stop server test")
+test_run:cmd("cleanup server test")
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index fd21f7bb..fb090109 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1395,3 +1395,259 @@ s:count() -- 200
 s:drop()
 ---
 ...
+--
+-- Check that ALTER is aborted if a tuple inserted during index build
+-- doesn't conform to the new format.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk', {page_size = 16})
+---
+...
+pad = string.rep('x', 16)
+---
+...
+for i = 101, 200 do s:replace{i, i, pad} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+ch = fiber.channel(1)
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+_ = fiber.create(function()
+    fiber.sleep(0.01)
+    for i = 1, 100 do
+        s:replace{i}
+    end
+    ch:put(true)
+end);
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+---
+- ok
+...
+s:create_index('sk', {parts = {2, 'unsigned'}}) -- must fail
+---
+- error: Tuple field count 1 is less than required by space format or defined indexes
+    (expected at least 2)
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+---
+- ok
+...
+ch:get()
+---
+- true
+...
+s:count() -- 200
+---
+- 200
+...
+s:drop()
+---
+...
+--
+-- Check that ALTER is aborted if a tuple inserted during index build
+-- violates unique constraint.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk', {page_size = 16})
+---
+...
+pad = string.rep('x', 16)
+---
+...
+for i = 101, 200 do s:replace{i, i, pad} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+ch = fiber.channel(1)
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+_ = fiber.create(function()
+    fiber.sleep(0.01)
+    for i = 1, 100 do
+        s:replace{i, i + 1}
+    end
+    ch:put(true)
+end);
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+---
+- ok
+...
+s:create_index('sk', {parts = {2, 'unsigned'}}) -- must fail
+---
+- error: Duplicate key exists in unique index 'sk' in space 'test'
+...
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+---
+- ok
+...
+ch:get()
+---
+- true
+...
+s:count() -- 200
+---
+- 200
+...
+s:drop()
+---
+...
+--
+-- Check that modifications done to the space during the final dump
+-- of a newly built index are recovered properly.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk')
+---
+...
+for i = 1, 5 do s:replace{i, i} end
+---
+...
+errinj.set("ERRINJ_VY_RUN_WRITE_TIMEOUT", 0.1)
+---
+- ok
+...
+ch = fiber.channel(1)
+---
+...
+_ = fiber.create(function() s:create_index('sk', {parts = {2, 'integer'}}) ch:put(true) end)
+---
+...
+errinj.set("ERRINJ_VY_RUN_WRITE_TIMEOUT", 0)
+---
+- ok
+...
+fiber.sleep(0.01)
+---
+...
+_ = s:delete{1}
+---
+...
+_ = s:replace{2, -2}
+---
+...
+_ = s:delete{2}
+---
+...
+_ = s:replace{3, -3}
+---
+...
+_ = s:replace{3, -2}
+---
+...
+_ = s:replace{3, -1}
+---
+...
+_ = s:delete{3}
+---
+...
+_ = s:upsert({3, 3}, {{'=', 2, 1}})
+---
+...
+_ = s:upsert({3, 3}, {{'=', 2, 2}})
+---
+...
+_ = s:delete{3}
+---
+...
+_ = s:replace{4, -1}
+---
+...
+_ = s:replace{4, -2}
+---
+...
+_ = s:replace{4, -4}
+---
+...
+_ = s:upsert({5, 1}, {{'=', 2, 1}})
+---
+...
+_ = s:upsert({5, 2}, {{'=', 2, -5}})
+---
+...
+_ = s:replace{6, -6}
+---
+...
+_ = s:upsert({7, -7}, {{'=', 2, -7}})
+---
+...
+ch:get()
+---
+- true
+...
+s.index.sk:select()
+---
+- - [7, -7]
+  - [6, -6]
+  - [5, -5]
+  - [4, -4]
+...
+s.index.sk:info().memory.rows
+---
+- 27
+...
+test_run:cmd('restart server default')
+s = box.space.test
+---
+...
+s.index.sk:select()
+---
+- - [7, -7]
+  - [6, -6]
+  - [5, -5]
+  - [4, -4]
+...
+s.index.sk:info().memory.rows
+---
+- 27
+...
+box.snapshot()
+---
+- ok
+...
+s.index.sk:select()
+---
+- - [7, -7]
+  - [6, -6]
+  - [5, -5]
+  - [4, -4]
+...
+s.index.sk:info().memory.rows
+---
+- 0
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 64d04c62..e434b098 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -548,3 +548,118 @@ ch:get()
 
 s:count() -- 200
 s:drop()
+
+--
+-- Check that ALTER is aborted if a tuple inserted during index build
+-- doesn't conform to the new format.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk', {page_size = 16})
+
+pad = string.rep('x', 16)
+for i = 101, 200 do s:replace{i, i, pad} end
+box.snapshot()
+
+ch = fiber.channel(1)
+test_run:cmd("setopt delimiter ';'")
+_ = fiber.create(function()
+    fiber.sleep(0.01)
+    for i = 1, 100 do
+        s:replace{i}
+    end
+    ch:put(true)
+end);
+test_run:cmd("setopt delimiter ''");
+
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+s:create_index('sk', {parts = {2, 'unsigned'}}) -- must fail
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+
+ch:get()
+
+s:count() -- 200
+s:drop()
+
+--
+-- Check that ALTER is aborted if a tuple inserted during index build
+-- violates unique constraint.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk', {page_size = 16})
+
+pad = string.rep('x', 16)
+for i = 101, 200 do s:replace{i, i, pad} end
+box.snapshot()
+
+ch = fiber.channel(1)
+test_run:cmd("setopt delimiter ';'")
+_ = fiber.create(function()
+    fiber.sleep(0.01)
+    for i = 1, 100 do
+        s:replace{i, i + 1}
+    end
+    ch:put(true)
+end);
+test_run:cmd("setopt delimiter ''");
+
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001)
+s:create_index('sk', {parts = {2, 'unsigned'}}) -- must fail
+errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0)
+
+ch:get()
+
+s:count() -- 200
+s:drop()
+
+--
+-- Check that modifications done to the space during the final dump
+-- of a newly built index are recovered properly.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk')
+
+for i = 1, 5 do s:replace{i, i} end
+
+errinj.set("ERRINJ_VY_RUN_WRITE_TIMEOUT", 0.1)
+ch = fiber.channel(1)
+_ = fiber.create(function() s:create_index('sk', {parts = {2, 'integer'}}) ch:put(true) end)
+errinj.set("ERRINJ_VY_RUN_WRITE_TIMEOUT", 0)
+
+fiber.sleep(0.01)
+
+_ = s:delete{1}
+_ = s:replace{2, -2}
+_ = s:delete{2}
+_ = s:replace{3, -3}
+_ = s:replace{3, -2}
+_ = s:replace{3, -1}
+_ = s:delete{3}
+_ = s:upsert({3, 3}, {{'=', 2, 1}})
+_ = s:upsert({3, 3}, {{'=', 2, 2}})
+_ = s:delete{3}
+_ = s:replace{4, -1}
+_ = s:replace{4, -2}
+_ = s:replace{4, -4}
+_ = s:upsert({5, 1}, {{'=', 2, 1}})
+_ = s:upsert({5, 2}, {{'=', 2, -5}})
+_ = s:replace{6, -6}
+_ = s:upsert({7, -7}, {{'=', 2, -7}})
+
+ch:get()
+
+s.index.sk:select()
+s.index.sk:info().memory.rows
+
+test_run:cmd('restart server default')
+
+s = box.space.test
+
+s.index.sk:select()
+s.index.sk:info().memory.rows
+
+box.snapshot()
+
+s.index.sk:select()
+s.index.sk:info().memory.rows
+
+s:drop()
diff --git a/test/vinyl/errinj_gc.result b/test/vinyl/errinj_gc.result
index 704cae6e..4681c986 100644
--- a/test/vinyl/errinj_gc.result
+++ b/test/vinyl/errinj_gc.result
@@ -180,9 +180,6 @@ s:select()
 - - [100, '12345']
   - [200, '67890']
 ...
---
--- Cleanup.
---
 s:drop()
 ---
 ...
@@ -199,3 +196,74 @@ temp:drop()
 box.cfg{checkpoint_count = default_checkpoint_count}
 ---
 ...
+--
+-- Check that if we failed to clean up an incomplete index before restart,
+-- we will clean it up after recovery.
+--
+fio = require('fio')
+---
+...
+fiber = require('fiber')
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk')
+---
+...
+_ = s:insert{1, 1}
+---
+...
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+---
+- ok
+...
+ch = fiber.channel(1)
+---
+...
+_ = fiber.create(function() pcall(s.create_index, s, 'sk', {parts = {2, 'unsigned'}}) ch:put(true) end)
+---
+...
+-- wait for ALTER to stall on WAL after preparing the new index
+while s.index.pk:info().disk.dump.count == 0 do fiber.sleep(0.001) end
+---
+...
+box.error.injection.set('ERRINJ_VY_GC', true)
+---
+- ok
+...
+box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true);
+---
+- ok
+...
+box.error.injection.set("ERRINJ_WAL_WRITE", true)
+---
+- ok
+...
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+---
+- ok
+...
+ch:get()
+---
+- true
+...
+#fio.listdir(fio.pathjoin(box.cfg.vinyl_dir, s.id, 1)) > 0
+---
+- true
+...
+test_run:cmd('restart server default')
+fio = require('fio')
+---
+...
+s = box.space.test
+---
+...
+#fio.listdir(fio.pathjoin(box.cfg.vinyl_dir, s.id, 1)) == 0
+---
+- true
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/errinj_gc.test.lua b/test/vinyl/errinj_gc.test.lua
index 44291091..ee10bcd5 100644
--- a/test/vinyl/errinj_gc.test.lua
+++ b/test/vinyl/errinj_gc.test.lua
@@ -85,10 +85,6 @@ file_count()
 
 s:select()
 
---
--- Cleanup.
---
-
 s:drop()
 gc()
 file_count()
@@ -96,3 +92,36 @@ file_count()
 temp:drop()
 
 box.cfg{checkpoint_count = default_checkpoint_count}
+
+--
+-- Check that if we failed to clean up an incomplete index before restart,
+-- we will clean it up after recovery.
+--
+fio = require('fio')
+fiber = require('fiber')
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk')
+_ = s:insert{1, 1}
+
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+ch = fiber.channel(1)
+_ = fiber.create(function() pcall(s.create_index, s, 'sk', {parts = {2, 'unsigned'}}) ch:put(true) end)
+
+-- wait for ALTER to stall on WAL after preparing the new index
+while s.index.pk:info().disk.dump.count == 0 do fiber.sleep(0.001) end
+
+box.error.injection.set('ERRINJ_VY_GC', true)
+box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true);
+box.error.injection.set("ERRINJ_WAL_WRITE", true)
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+ch:get()
+
+#fio.listdir(fio.pathjoin(box.cfg.vinyl_dir, s.id, 1)) > 0
+
+test_run:cmd('restart server default')
+
+fio = require('fio')
+s = box.space.test
+#fio.listdir(fio.pathjoin(box.cfg.vinyl_dir, s.id, 1)) == 0
+s:drop()
diff --git a/test/vinyl/errinj_vylog.result b/test/vinyl/errinj_vylog.result
index f78201c9..3525c674 100644
--- a/test/vinyl/errinj_vylog.result
+++ b/test/vinyl/errinj_vylog.result
@@ -285,3 +285,60 @@ s3:select()
 s3:drop()
 ---
 ...
+--
+-- Check that an index that was prepared, but not committed,
+-- is recovered properly.
+--
+fiber = require('fiber')
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk')
+---
+...
+_ = s:insert{1, 1}
+---
+...
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+---
+- ok
+...
+ch = fiber.channel(1)
+---
+...
+_ = fiber.create(function() s:create_index('sk', {parts = {2, 'unsigned'}}) ch:put(true) end)
+---
+...
+-- wait for ALTER to stall on WAL after preparing the new index
+while s.index.pk:info().disk.dump.count == 0 do fiber.sleep(0.001) end
+---
+...
+box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true);
+---
+- ok
+...
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+---
+- ok
+...
+ch:get()
+---
+- true
+...
+test_run:cmd('restart server default')
+s = box.space.test
+---
+...
+s.index.pk:select()
+---
+- - [1, 1]
+...
+s.index.sk:select()
+---
+- - [1, 1]
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/errinj_vylog.test.lua b/test/vinyl/errinj_vylog.test.lua
index 36b3659d..697298c8 100644
--- a/test/vinyl/errinj_vylog.test.lua
+++ b/test/vinyl/errinj_vylog.test.lua
@@ -140,3 +140,32 @@ s2:drop()
 s3 = box.space.test3
 s3:select()
 s3:drop()
+
+--
+-- Check that an index that was prepared, but not committed,
+-- is recovered properly.
+--
+fiber = require('fiber')
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk')
+_ = s:insert{1, 1}
+
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+ch = fiber.channel(1)
+_ = fiber.create(function() s:create_index('sk', {parts = {2, 'unsigned'}}) ch:put(true) end)
+
+-- wait for ALTER to stall on WAL after preparing the new index
+while s.index.pk:info().disk.dump.count == 0 do fiber.sleep(0.001) end
+
+box.error.injection.set('ERRINJ_VY_LOG_FLUSH', true);
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+ch:get()
+
+test_run:cmd('restart server default')
+
+s = box.space.test
+s.index.pk:select()
+s.index.sk:select()
+
+s:drop()
diff --git a/test/vinyl/gh.result b/test/vinyl/gh.result
index 47695acb..76beab09 100644
--- a/test/vinyl/gh.result
+++ b/test/vinyl/gh.result
@@ -144,7 +144,7 @@ s:insert{5, 5}
 ...
 s.index.primary:alter({parts={2,'unsigned'}})
 ---
-- error: Vinyl does not support building an index for a non-empty space
+- error: Vinyl does not support rebuilding the primary index of a non-empty space
 ...
 s:drop()
 ---
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index e6294d3c..29e88e67 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -130,16 +130,16 @@ result
     - - HEADER:
           type: INSERT
         BODY:
-          tuple: [0, {6: 512, 7: [{'field': 0, 'collation': 1, 'type': 'string'}],
+          tuple: [0, {0: 1, 6: 512, 7: [{'field': 0, 'collation': 1, 'type': 'string'}],
               9: 10, 12: 3, 13: 7}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [5, {2: 8, 9: 10}]
+          tuple: [5, {0: 1, 2: 8, 9: 10}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [4, {2: 5}]
+          tuple: [4, {0: 1, 2: 5}]
       - HEADER:
           type: INSERT
         BODY:
@@ -147,24 +147,24 @@ result
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [2, {1: 1}]
+          tuple: [2, {0: 1}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [8, {1: 1, 2: 8, 8: 9}]
+          tuple: [8, {2: 8, 8: 9}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [0, {0: 2, 5: 1, 6: 512, 7: [{'field': 1, 'is_nullable': true, 'type': 'unsigned'}],
+          tuple: [0, {0: 3, 5: 1, 6: 512, 7: [{'field': 1, 'is_nullable': true, 'type': 'unsigned'}],
               9: 10, 12: 4, 13: 7}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [5, {0: 2, 2: 6, 9: 10}]
+          tuple: [5, {0: 3, 2: 6, 9: 10}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [4, {0: 2, 2: 4}]
+          tuple: [4, {0: 3, 2: 4}]
       - HEADER:
           type: INSERT
         BODY:
@@ -172,11 +172,11 @@ result
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [2, {0: 2, 1: 3}]
+          tuple: [2, {0: 3, 1: 2}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [8, {1: 3, 2: 6, 8: 7}]
+          tuple: [8, {1: 2, 2: 6, 8: 7}]
       - HEADER:
           type: INSERT
         BODY:
@@ -185,52 +185,52 @@ result
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [7, {2: 4}]
+          tuple: [7, {2: 5}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [7, {2: 5}]
+          tuple: [7, {2: 4}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [4, {0: 2, 2: 10}]
+          tuple: [4, {0: 3, 2: 10}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [5, {0: 2, 2: 10, 9: 13}]
+          tuple: [5, {0: 3, 2: 10, 9: 13}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [8, {1: 3, 2: 10, 8: 11}]
+          tuple: [8, {1: 2, 2: 10, 8: 11}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [10, {0: 2, 9: 13}]
+          tuple: [10, {0: 3, 9: 13}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [4, {2: 12}]
+          tuple: [4, {0: 1, 2: 12}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [5, {2: 12, 9: 13}]
+          tuple: [5, {0: 1, 2: 12, 9: 13}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [8, {1: 1, 2: 12, 8: 13}]
+          tuple: [8, {2: 12, 8: 13}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [10, {9: 13}]
+          tuple: [10, {0: 1, 9: 13}]
   - - 00000000000000000008.index
     - - HEADER:
           type: RUNINFO
-- 
2.11.0




More information about the Tarantool-patches mailing list