[PATCH 2/4] vinyl: do not use index lsn to identify indexes in vylog
    Vladimir Davydov 
    vdavydov.dev at gmail.com
       
    Sat Mar 17 20:56:35 MSK 2018
    
    
  
An index can be dropped and then recreated with the same space/index id.
To discriminate between different incarnations of the same index during
recovery, we use the LSN from the time of index creation, as it is
supposed to be unique. However, the uniqueness property won't hold once
ALTER is implemented for vinyl spaces: if the primary index is altered,
we have to rebuild all indexes of the space and commit them
simultaneously, i.e. with the same LSN.
Actually, we don't really need a unique index ID stored both in vylog
and in WAL to recover all incarnations of vinyl indexes. Since WAL is
replayed in chronological order, we can find the index corresponding
to an index creation statement replayed from WAL by its
space_id/index_id and the number of times the index has been dropped
since we started to replay WAL. As we don't actually recover indexes
that are dropped during WAL recovery, we don't need to know their
properties; we just need to know that they existed. So we don't need
to store a separate object for each index incarnation in the recovery
context: to be able to recover all incarnations of the same vinyl
index, it's enough to add a counter to the vy_index_recovery_info
structure representing the number of incarnations the index has had
since the last checkpoint. Then on recovery, vy_index_recover() would
check the incarnation counter of the index and
 - recover the last incarnation if it is 1
 - emit a create/drop record if it is > 1
 - do nothing if it is 0
and decrement the counter on success so that the next call to
vy_index_recover() with the same space and index id proceeds to
the next incarnation of this index. This is what this patch does
in a nutshell.
The tricky part is preserving backward compatibility. We can't just
replace index_lsn with space_id/index_id everywhere, because there
can still be legacy records that use index_lsn to identify an index,
and we need to process them. For example, VY_LOG_CREATE_INDEX is fine
in this regard, because it contains both space_id/index_id and
index_lsn, but VY_LOG_DROP_INDEX isn't, as it only has index_lsn. So
we have to maintain two index hash tables during vylog recovery
instead of just one: one for new records, where an index is referenced
by space_id/index_id, and another for legacy records, where index_lsn
is used instead.
---
 src/box/vinyl.c          |  39 ++-----
 src/box/vy_index.c       |  54 ++++-----
 src/box/vy_index.h       |  12 +-
 src/box/vy_log.c         | 286 ++++++++++++++++++++++++++++-------------------
 src/box/vy_log.h         |  97 +++++++++++-----
 src/box/vy_scheduler.c   |  12 +-
 test/unit/vy_log_stub.c  |   2 +-
 test/vinyl/layout.result |  32 +++---
 8 files changed, 300 insertions(+), 234 deletions(-)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index d3659b0b..244360fa 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -765,7 +765,6 @@ vy_index_open(struct vy_env *env, struct vy_index *index)
 		 * the index files from it.
 		 */
 		rc = vy_index_recover(index, env->recovery, &env->run_env,
-				vclock_sum(env->recovery_vclock),
 				env->status == VINYL_INITIAL_RECOVERY_LOCAL,
 				env->force_recovery);
 		break;
@@ -778,6 +777,8 @@ vy_index_open(struct vy_env *env, struct vy_index *index)
 static void
 vinyl_index_commit_create(struct index *base, int64_t lsn)
 {
+	(void)lsn;
+
 	struct vy_env *env = vy_env(base->engine);
 	struct vy_index *index = vy_index(base);
 
@@ -791,32 +792,13 @@ vinyl_index_commit_create(struct index *base, int64_t lsn)
 		 * the index isn't in the recovery context and we
 		 * need to retry to log it now.
 		 */
-		if (index->commit_lsn >= 0) {
+		if (index->is_committed) {
 			vy_scheduler_add_index(&env->scheduler, index);
 			return;
 		}
 	}
 
-	if (env->status == VINYL_INITIAL_RECOVERY_REMOTE) {
-		/*
-		 * Records received during initial join do not
-		 * have LSNs so we use a fake one to identify
-		 * the index in vylog.
-		 */
-		lsn = ++env->join_lsn;
-	}
-
-	/*
-	 * Backward compatibility fixup: historically, we used
-	 * box.info.signature for LSN of index creation, which
-	 * lags behind the LSN of the record that created the
-	 * index by 1. So for legacy indexes use the LSN from
-	 * index options.
-	 */
-	if (index->opts.lsn != 0)
-		lsn = index->opts.lsn;
-
-	index->commit_lsn = lsn;
+	index->is_committed = true;
 
 	assert(index->range_count == 1);
 	struct vy_range *range = vy_range_tree_first(index->tree);
@@ -830,9 +812,8 @@ vinyl_index_commit_create(struct index *base, int64_t lsn)
 	 * recovery.
 	 */
 	vy_log_tx_begin();
-	vy_log_create_index(index->commit_lsn, index->id,
-			    index->space_id, index->key_def);
-	vy_log_insert_range(index->commit_lsn, range->id, NULL, NULL);
+	vy_log_create_index(index->space_id, index->id, index->key_def);
+	vy_log_insert_range(index->space_id, index->id, range->id, NULL, NULL);
 	vy_log_tx_try_commit();
 	/*
 	 * After we committed the index in the log, we can schedule
@@ -889,7 +870,7 @@ vinyl_index_commit_drop(struct index *base)
 
 	vy_log_tx_begin();
 	vy_log_index_prune(index, checkpoint_last(NULL));
-	vy_log_drop_index(index->commit_lsn);
+	vy_log_drop_index(index->space_id, index->id);
 	vy_log_tx_try_commit();
 }
 
@@ -938,7 +919,7 @@ vinyl_space_prepare_truncate(struct space *old_space, struct space *new_space)
 		struct vy_index *old_index = vy_index(old_space->index[i]);
 		struct vy_index *new_index = vy_index(new_space->index[i]);
 
-		new_index->commit_lsn = old_index->commit_lsn;
+		new_index->is_committed = old_index->is_committed;
 
 		if (truncate_done) {
 			/*
@@ -1015,9 +996,9 @@ vinyl_space_commit_truncate(struct space *old_space, struct space *new_space)
 		assert(new_index->range_count == 1);
 
 		vy_log_index_prune(old_index, gc_lsn);
-		vy_log_insert_range(new_index->commit_lsn,
+		vy_log_insert_range(new_index->space_id, new_index->id,
 				    range->id, NULL, NULL);
-		vy_log_truncate_index(new_index->commit_lsn,
+		vy_log_truncate_index(new_index->space_id, new_index->id,
 				      new_index->truncate_count);
 	}
 	vy_log_tx_try_commit();
diff --git a/src/box/vy_index.c b/src/box/vy_index.c
index 9c199ddd..5a994c47 100644
--- a/src/box/vy_index.c
+++ b/src/box/vy_index.c
@@ -221,7 +221,6 @@ vy_index_new(struct vy_index_env *index_env, struct vy_cache_env *cache_env,
 		goto fail_mem;
 
 	index->refs = 1;
-	index->commit_lsn = -1;
 	index->dump_lsn = -1;
 	vy_cache_create(&index->cache, cache_env, cmp_def);
 	rlist_create(&index->sealed);
@@ -533,29 +532,26 @@ out:
 
 int
 vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
-		 struct vy_run_env *run_env, int64_t lsn,
+		 struct vy_run_env *run_env,
 		 bool is_checkpoint_recovery, bool force_recovery)
 {
 	assert(index->range_count == 0);
 
 	/*
-	 * Backward compatibility fixup: historically, we used
-	 * box.info.signature for LSN of index creation, which
-	 * lags behind the LSN of the record that created the
-	 * index by 1. So for legacy indexes use the LSN from
-	 * index options.
-	 */
-	if (index->opts.lsn != 0)
-		lsn = index->opts.lsn;
-
-	/*
 	 * Look up the last incarnation of the index in vylog.
 	 */
+	struct vy_index_recovery_id index_id = {
+		.index_id = index->id,
+		.space_id = index->space_id,
+	};
 	struct vy_index_recovery_info *index_info;
-	index_info = vy_recovery_lookup_index(recovery,
-					      index->space_id, index->id);
-	if (is_checkpoint_recovery) {
-		if (index_info == NULL) {
+	index_info = vy_recovery_lookup_index(recovery, &index_id);
+	if (index_info == NULL || index_info->incarnation_count == 0) {
+		/*
+		 * The index was either not found in vylog or
+		 * all incarnations have already been loaded.
+		 */
+		if (is_checkpoint_recovery) {
 			/*
 			 * All indexes created from snapshot rows must
 			 * be present in vylog, because snapshot can
@@ -568,16 +564,6 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
 					    (unsigned)index->id));
 			return -1;
 		}
-		if (lsn > index_info->index_lsn) {
-			/*
-			 * The last incarnation of the index was created
-			 * before the last checkpoint, load it now.
-			 */
-			lsn = index_info->index_lsn;
-		}
-	}
-
-	if (index_info == NULL || lsn > index_info->index_lsn) {
 		/*
 		 * If we failed to log index creation before restart,
 		 * we won't find it in the log on recovery. This is
@@ -588,9 +574,12 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
 		return vy_index_init_range_tree(index);
 	}
 
-	index->commit_lsn = lsn;
+	index->is_committed = true;
+
+	/* Advance to the next incarnation. */
+	index_info->incarnation_count--;
 
-	if (lsn < index_info->index_lsn || index_info->is_dropped) {
+	if (index_info->incarnation_count > 0 || index_info->is_dropped) {
 		/*
 		 * Loading a past incarnation of the index, i.e.
 		 * the index is going to dropped during final
@@ -671,8 +660,9 @@ vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
 	}
 	if (prev == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Index %lld has empty range tree",
-				    (long long)index->commit_lsn));
+			 tt_sprintf("Index %u/%u has empty range tree",
+				    (unsigned)index->space_id,
+				    (unsigned)index->id));
 		return -1;
 	}
 	if (prev->end != NULL) {
@@ -1049,7 +1039,7 @@ vy_index_split_range(struct vy_index *index, struct vy_range *range)
 	vy_log_delete_range(range->id);
 	for (int i = 0; i < n_parts; i++) {
 		part = parts[i];
-		vy_log_insert_range(index->commit_lsn, part->id,
+		vy_log_insert_range(index->space_id, index->id, part->id,
 				    tuple_data_or_null(part->begin),
 				    tuple_data_or_null(part->end));
 		rlist_foreach_entry(slice, &part->slices, in_range)
@@ -1115,7 +1105,7 @@ vy_index_coalesce_range(struct vy_index *index, struct vy_range *range)
 	 * Log change in metadata.
 	 */
 	vy_log_tx_begin();
-	vy_log_insert_range(index->commit_lsn, result->id,
+	vy_log_insert_range(index->space_id, index->id, result->id,
 			    tuple_data_or_null(result->begin),
 			    tuple_data_or_null(result->end));
 	for (it = first; it != end; it = vy_range_tree_next(index->tree, it)) {
diff --git a/src/box/vy_index.h b/src/box/vy_index.h
index 4368f6a5..5a4aa29b 100644
--- a/src/box/vy_index.h
+++ b/src/box/vy_index.h
@@ -254,10 +254,10 @@ struct vy_index {
 	 */
 	int64_t dump_lsn;
 	/**
-	 * LSN of the row that committed the index or -1 if
-	 * the index was not committed to the metadata log.
+	 * This flag is set if the index creation was
+	 * committed to the metadata log.
 	 */
-	int64_t commit_lsn;
+	bool is_committed;
 	/**
 	 * This flag is set if the index was dropped.
 	 * It is also set on local recovery if the index
@@ -381,14 +381,10 @@ vy_index_create(struct vy_index *index);
  * WAL, in which case a missing index is OK - it just means we
  * failed to log it before restart and have to retry during
  * WAL replay.
- *
- * @lsn is the LSN of the row that created the index.
- * If the index is recovered from a snapshot, it is set
- * to the snapshot signature.
  */
 int
 vy_index_recover(struct vy_index *index, struct vy_recovery *recovery,
-		 struct vy_run_env *run_env, int64_t lsn,
+		 struct vy_run_env *run_env,
 		 bool is_checkpoint_recovery, bool force_recovery);
 
 /**
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 8b95282b..cbd6dc16 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -304,11 +304,6 @@ vy_log_record_encode(const struct vy_log_record *record,
 	size += mp_sizeof_array(2);
 	size += mp_sizeof_uint(record->type);
 	size_t n_keys = 0;
-	if (record->index_lsn > 0) {
-		size += mp_sizeof_uint(VY_LOG_KEY_INDEX_LSN);
-		size += mp_sizeof_uint(record->index_lsn);
-		n_keys++;
-	}
 	if (record->range_id > 0) {
 		size += mp_sizeof_uint(VY_LOG_KEY_RANGE_ID);
 		size += mp_sizeof_uint(record->range_id);
@@ -386,10 +381,6 @@ vy_log_record_encode(const struct vy_log_record *record,
 	pos = mp_encode_array(pos, 2);
 	pos = mp_encode_uint(pos, record->type);
 	pos = mp_encode_map(pos, n_keys);
-	if (record->index_lsn > 0) {
-		pos = mp_encode_uint(pos, VY_LOG_KEY_INDEX_LSN);
-		pos = mp_encode_uint(pos, record->index_lsn);
-	}
 	if (record->range_id > 0) {
 		pos = mp_encode_uint(pos, VY_LOG_KEY_RANGE_ID);
 		pos = mp_encode_uint(pos, record->range_id);
@@ -1087,30 +1078,70 @@ vy_log_write(const struct vy_log_record *record)
  * vy_recovery::index_id_hash map.
  */
 static inline int64_t
-vy_recovery_index_id_hash(uint32_t space_id, uint32_t index_id)
+vy_index_recovery_id_hash(const struct vy_index_recovery_id *id)
 {
-	return ((uint64_t)space_id << 32) + index_id;
+	return ((uint64_t)id->space_id << 32) + id->index_id;
 }
 
-/** Lookup a vinyl index in vy_recovery::index_id_hash map. */
-struct vy_index_recovery_info *
-vy_recovery_lookup_index(struct vy_recovery *recovery,
-			 uint32_t space_id, uint32_t index_id)
+/** An snprint-style function to print an index id. */
+static int
+vy_index_recovery_id_snprint(char *buf, int size,
+			     const struct vy_index_recovery_id *id)
 {
-	int64_t key = vy_recovery_index_id_hash(space_id, index_id);
-	struct mh_i64ptr_t *h = recovery->index_id_hash;
-	mh_int_t k = mh_i64ptr_find(h, key, NULL);
-	if (k == mh_end(h))
-		return NULL;
-	return mh_i64ptr_node(h, k)->val;
+	int total = 0;
+	if (id->index_lsn != 0) {
+		/*
+		 * Legacy code: index_lsn is used as index ID.
+		 * Print the lsn and space_id/index_id if available.
+		 */
+		SNPRINT(total, snprintf, buf, size, "%lld",
+			(long long)id->index_lsn);
+		if (id->space_id != 0)
+			SNPRINT(total, snprintf, buf, size, " (%u/%u)",
+				(unsigned)id->space_id, (unsigned)id->index_id);
+	} else {
+		SNPRINT(total, snprintf, buf, size, "%u/%u",
+			(unsigned)id->space_id, (unsigned)id->index_id);
+	}
+	return total;
+}
+
+/**
+ * Return a string containing a human readable representation
+ * of an index id.
+ */
+static const char *
+vy_index_recovery_id_str(const struct vy_index_recovery_id *id)
+{
+	char *buf = tt_static_buf();
+	vy_index_recovery_id_snprint(buf, TT_STATIC_BUF_LEN, id);
+	return buf;
 }
 
-/** Lookup a vinyl index in vy_recovery::index_lsn_hash map. */
-static struct vy_index_recovery_info *
-vy_recovery_lookup_index_by_lsn(struct vy_recovery *recovery, int64_t index_lsn)
+/**
+ * Lookup a vinyl index in a recovery context.
+ *
+ * If the index id contains LSN (legacy), we look it up
+ * in index_lsn_hash, otherwise in index_id_hash.
+ */
+struct vy_index_recovery_info *
+vy_recovery_lookup_index(struct vy_recovery *recovery,
+			 const struct vy_index_recovery_id *id)
 {
-	struct mh_i64ptr_t *h = recovery->index_lsn_hash;
-	mh_int_t k = mh_i64ptr_find(h, index_lsn, NULL);
+	int64_t key;
+	struct mh_i64ptr_t *h;
+	if (id->index_lsn != 0) {
+		/*
+		 * Legacy code: index_lsn is used as index ID.
+		 * Look up the index in the corresponding hash.
+		 */
+		key = id->index_lsn;
+		h = recovery->index_lsn_hash;
+	} else {
+		key = vy_index_recovery_id_hash(id);
+		h = recovery->index_id_hash;
+	}
+	mh_int_t k = mh_i64ptr_find(h, key, NULL);
 	if (k == mh_end(h))
 		return NULL;
 	return mh_i64ptr_node(h, k)->val;
@@ -1151,13 +1182,13 @@ vy_recovery_lookup_slice(struct vy_recovery *recovery, int64_t slice_id)
 
 /**
  * Handle a VY_LOG_CREATE_INDEX log record.
- * This function allocates a new vinyl index with ID @index_lsn
+ * This function allocates a new vinyl index with ID @id
  * and inserts it to the hash.
  * Return 0 on success, -1 on failure (ID collision or OOM).
  */
 static int
-vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn,
-			 uint32_t index_id, uint32_t space_id,
+vy_recovery_create_index(struct vy_recovery *recovery,
+			 const struct vy_index_recovery_id *id,
 			 const struct key_part_def *key_parts,
 			 uint32_t key_part_count)
 {
@@ -1173,8 +1204,8 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn,
 	 */
 	if (key_parts == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Missing key definition for index %lld",
-				    (long long)index_lsn));
+			 tt_sprintf("Missing key definition for index %s",
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	key_parts_copy = malloc(sizeof(*key_parts) * key_part_count);
@@ -1189,7 +1220,7 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn,
 	 * Look up the index in the hash.
 	 */
 	h = recovery->index_id_hash;
-	node.key = vy_recovery_index_id_hash(space_id, index_id);
+	node.key = vy_index_recovery_id_hash(id);
 	k = mh_i64ptr_find(h, node.key, NULL);
 	index = (k != mh_end(h)) ? mh_i64ptr_node(h, k)->val : NULL;
 
@@ -1207,10 +1238,11 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn,
 			free(key_parts_copy);
 			return -1;
 		}
-		index->index_id = index_id;
-		index->space_id = space_id;
+		index->index_id = id->index_id;
+		index->space_id = id->space_id;
 		rlist_create(&index->ranges);
 		rlist_create(&index->runs);
+		index->incarnation_count = 1;
 
 		node.val = index;
 		if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
@@ -1226,112 +1258,124 @@ vy_recovery_create_index(struct vy_recovery *recovery, int64_t index_lsn,
 		 * The index was dropped and recreated with the
 		 * same ID. Update its key definition (because it
 		 * could have changed since the last time it was
-		 * used) and reset its state.
+		 * used), reset its state, and bump the incarnation
+		 * counter.
 		 */
 		if (!index->is_dropped) {
 			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-				 tt_sprintf("Index %u/%u created twice",
-					    (unsigned)space_id,
-					    (unsigned)index_id));
+				 tt_sprintf("Index %s created twice",
+					    vy_index_recovery_id_str(id)));
 			free(key_parts_copy);
 			return -1;
 		}
-		assert(index->index_id == index_id);
-		assert(index->space_id == space_id);
+		assert(index->index_id == id->index_id);
+		assert(index->space_id == id->space_id);
+		index->incarnation_count++;
 		free(index->key_parts);
 	}
 
-	index->index_lsn = index_lsn;
 	index->key_parts = key_parts_copy;
 	index->key_part_count = key_part_count;
 	index->is_dropped = false;
 	index->dump_lsn = -1;
 	index->truncate_count = 0;
 
-	/*
-	 * Add the index to the LSN hash.
-	 */
-	h = recovery->index_lsn_hash;
-	node.key = index_lsn;
-	node.val = index;
-	if (mh_i64ptr_find(h, index_lsn, NULL) != mh_end(h)) {
-		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Duplicate index id %lld",
-				    (long long)index_lsn));
-		return -1;
-	}
-	if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
-		diag_set(OutOfMemory, 0, "mh_i64ptr_put",
-			 "mh_i64ptr_node_t");
-		return -1;
+	if (id->index_lsn != 0) {
+		/*
+		 * Legacy code: index_lsn is used as index ID.
+		 * Add the index to the corresponding hash.
+		 */
+		h = recovery->index_lsn_hash;
+		node.key = id->index_lsn;
+		node.val = index;
+		if (mh_i64ptr_find(h, id->index_lsn, NULL) != mh_end(h)) {
+			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
+				 tt_sprintf("Duplicate index id %s",
+					    vy_index_recovery_id_str(id)));
+			return -1;
+		}
+		if (mh_i64ptr_put(h, &node, NULL, NULL) == mh_end(h)) {
+			diag_set(OutOfMemory, 0, "mh_i64ptr_put",
+				 "mh_i64ptr_node_t");
+			return -1;
+		}
 	}
 	return 0;
 }
 
 /**
  * Handle a VY_LOG_DROP_INDEX log record.
- * This function marks the vinyl index with ID @index_lsn as dropped.
+ * This function marks the vinyl index with ID @id as dropped.
  * All ranges and runs of the index must have been deleted by now.
  * Returns 0 on success, -1 if ID not found or index is already marked.
  */
 static int
-vy_recovery_drop_index(struct vy_recovery *recovery, int64_t index_lsn)
+vy_recovery_drop_index(struct vy_recovery *recovery,
+		       const struct vy_index_recovery_id *id)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
+	index = vy_recovery_lookup_index(recovery, id);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Index %lld deleted but not registered",
-				    (long long)index_lsn));
+			 tt_sprintf("Index %s deleted but not registered",
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	if (index->is_dropped) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Index %lld deleted twice",
-				    (long long)index_lsn));
+			 tt_sprintf("Index %s deleted twice",
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	if (!rlist_empty(&index->ranges)) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Dropped index %lld has ranges",
-				    (long long)index_lsn));
+			 tt_sprintf("Dropped index %s has ranges",
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	struct vy_run_recovery_info *run;
 	rlist_foreach_entry(run, &index->runs, in_index) {
 		if (!run->is_dropped && !run->is_incomplete) {
 			diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-				 tt_sprintf("Dropped index %lld has active "
-					    "runs", (long long)index_lsn));
+				 tt_sprintf("Dropped index %s has active runs",
+					    vy_index_recovery_id_str(id)));
 			return -1;
 		}
 	}
 	index->is_dropped = true;
+	if (!recovery->seen_snapshot) {
+		/*
+		 * The index was dropped before the last checkpoint.
+		 * Reset the incarnation counter, because we won't
+		 * load this incarnation during WAL recovery.
+		 */
+		index->incarnation_count = 0;
+	}
 	return 0;
 }
 
 /**
  * Handle a VY_LOG_DUMP_INDEX log record.
  * This function updates LSN of the last dump of the vinyl index
- * with ID @index_lsn.
+ * with ID @id.
  * Returns 0 on success, -1 if ID not found or index is dropped.
  */
 static int
 vy_recovery_dump_index(struct vy_recovery *recovery,
-		       int64_t index_lsn, int64_t dump_lsn)
+		       const struct vy_index_recovery_id *id, int64_t dump_lsn)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
+	index = vy_recovery_lookup_index(recovery, id);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Dump of unregistered index %lld",
-				    (long long)index_lsn));
+			 tt_sprintf("Dump of unregistered index %s",
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	if (index->is_dropped) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Dump of deleted index %lld",
-				    (long long)index_lsn));
+			 tt_sprintf("Dump of deleted index %s",
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	index->dump_lsn = dump_lsn;
@@ -1340,25 +1384,26 @@ vy_recovery_dump_index(struct vy_recovery *recovery,
 
 /**
  * Handle a VY_LOG_TRUNCATE_INDEX log record.
- * This function updates truncate_count of the index with ID @index_lsn.
+ * This function updates truncate_count of the index with ID @id.
  * Returns 0 on success, -1 if ID not found or index is dropped.
  */
 static int
 vy_recovery_truncate_index(struct vy_recovery *recovery,
-			   int64_t index_lsn, int64_t truncate_count)
+			   const struct vy_index_recovery_id *id,
+			   int64_t truncate_count)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
+	index = vy_recovery_lookup_index(recovery, id);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Truncation of unregistered index %lld",
-				    (long long)index_lsn));
+			 tt_sprintf("Truncation of unregistered index %s",
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	if (index->is_dropped) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
-			 tt_sprintf("Truncation of deleted index %lld",
-				    (long long)index_lsn));
+			 tt_sprintf("Truncation of deleted index %s",
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	index->truncate_count = truncate_count;
@@ -1402,21 +1447,21 @@ vy_recovery_do_create_run(struct vy_recovery *recovery, int64_t run_id)
 /**
  * Handle a VY_LOG_PREPARE_RUN log record.
  * This function creates a new incomplete vinyl run with ID @run_id
- * and adds it to the list of runs of the index with ID @index_lsn.
+ * and adds it to the list of runs of the index with ID @id.
  * Return 0 on success, -1 if run already exists, index not found,
  * or OOM.
  */
 static int
-vy_recovery_prepare_run(struct vy_recovery *recovery, int64_t index_lsn,
-			int64_t run_id)
+vy_recovery_prepare_run(struct vy_recovery *recovery,
+			const struct vy_index_recovery_id *id, int64_t run_id)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
+	index = vy_recovery_lookup_index(recovery, id);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Run %lld created for unregistered "
-				    "index %lld", (long long)run_id,
-				    (long long)index_lsn));
+				    "index %s", (long long)run_id,
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	if (vy_recovery_lookup_run(recovery, run_id) != NULL) {
@@ -1437,29 +1482,30 @@ vy_recovery_prepare_run(struct vy_recovery *recovery, int64_t index_lsn,
 /**
  * Handle a VY_LOG_CREATE_RUN log record.
  * This function adds the vinyl run with ID @run_id to the list
- * of runs of the index with ID @index_lsn and marks it committed.
+ * of runs of the index with ID @id and marks it committed.
  * If the run does not exist, it will be created.
  * Return 0 on success, -1 if index not found, run or index
  * is dropped, or OOM.
  */
 static int
-vy_recovery_create_run(struct vy_recovery *recovery, int64_t index_lsn,
+vy_recovery_create_run(struct vy_recovery *recovery,
+		       const struct vy_index_recovery_id *id,
 		       int64_t run_id, int64_t dump_lsn)
 {
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
+	index = vy_recovery_lookup_index(recovery, id);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Run %lld created for unregistered "
-				    "index %lld", (long long)run_id,
-				    (long long)index_lsn));
+				    "index %s", (long long)run_id,
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	if (index->is_dropped) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Run %lld created for deleted "
-				    "index %lld", (long long)run_id,
-				    (long long)index_lsn));
+				    "index %s", (long long)run_id,
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 	struct vy_run_recovery_info *run;
@@ -1538,11 +1584,12 @@ vy_recovery_forget_run(struct vy_recovery *recovery, int64_t run_id)
  * Handle a VY_LOG_INSERT_RANGE log record.
  * This function allocates a new vinyl range with ID @range_id,
  * inserts it to the hash, and adds it to the list of ranges of the
- * index with ID @index_lsn.
+ * index with ID @id.
  * Return 0 on success, -1 on failure (ID collision or OOM).
  */
 static int
-vy_recovery_insert_range(struct vy_recovery *recovery, int64_t index_lsn,
+vy_recovery_insert_range(struct vy_recovery *recovery,
+			 const struct vy_index_recovery_id *id,
 			 int64_t range_id, const char *begin, const char *end)
 {
 	if (vy_recovery_lookup_range(recovery, range_id) != NULL) {
@@ -1552,12 +1599,12 @@ vy_recovery_insert_range(struct vy_recovery *recovery, int64_t index_lsn,
 		return -1;
 	}
 	struct vy_index_recovery_info *index;
-	index = vy_recovery_lookup_index_by_lsn(recovery, index_lsn);
+	index = vy_recovery_lookup_index(recovery, id);
 	if (index == NULL) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("Range %lld created for unregistered "
-				    "index %lld", (long long)range_id,
-				    (long long)index_lsn));
+				    "index %s", (long long)range_id,
+				    vy_index_recovery_id_str(id)));
 		return -1;
 	}
 
@@ -1761,29 +1808,34 @@ static int
 vy_recovery_process_record(struct vy_recovery *recovery,
 			   const struct vy_log_record *record)
 {
+	struct vy_index_recovery_id index_id = {
+		.space_id = record->space_id,
+		.index_id = record->index_id,
+		.index_lsn = record->index_lsn,
+	};
+
 	int rc;
 	switch (record->type) {
 	case VY_LOG_CREATE_INDEX:
-		rc = vy_recovery_create_index(recovery, record->index_lsn,
-				record->index_id, record->space_id,
+		rc = vy_recovery_create_index(recovery, &index_id,
 				record->key_parts, record->key_part_count);
 		break;
 	case VY_LOG_DROP_INDEX:
-		rc = vy_recovery_drop_index(recovery, record->index_lsn);
+		rc = vy_recovery_drop_index(recovery, &index_id);
 		break;
 	case VY_LOG_INSERT_RANGE:
-		rc = vy_recovery_insert_range(recovery, record->index_lsn,
+		rc = vy_recovery_insert_range(recovery, &index_id,
 				record->range_id, record->begin, record->end);
 		break;
 	case VY_LOG_DELETE_RANGE:
 		rc = vy_recovery_delete_range(recovery, record->range_id);
 		break;
 	case VY_LOG_PREPARE_RUN:
-		rc = vy_recovery_prepare_run(recovery, record->index_lsn,
+		rc = vy_recovery_prepare_run(recovery, &index_id,
 					     record->run_id);
 		break;
 	case VY_LOG_CREATE_RUN:
-		rc = vy_recovery_create_run(recovery, record->index_lsn,
+		rc = vy_recovery_create_run(recovery, &index_id,
 					    record->run_id, record->dump_lsn);
 		break;
 	case VY_LOG_DROP_RUN:
@@ -1802,11 +1854,11 @@ vy_recovery_process_record(struct vy_recovery *recovery,
 		rc = vy_recovery_delete_slice(recovery, record->slice_id);
 		break;
 	case VY_LOG_DUMP_INDEX:
-		rc = vy_recovery_dump_index(recovery, record->index_lsn,
+		rc = vy_recovery_dump_index(recovery, &index_id,
 					    record->dump_lsn);
 		break;
 	case VY_LOG_TRUNCATE_INDEX:
-		rc = vy_recovery_truncate_index(recovery, record->index_lsn,
+		rc = vy_recovery_truncate_index(recovery, &index_id,
 						record->truncate_count);
 		break;
 	default:
@@ -1841,6 +1893,7 @@ vy_recovery_new_f(va_list ap)
 	recovery->run_hash = NULL;
 	recovery->slice_hash = NULL;
 	recovery->max_id = -1;
+	recovery->seen_snapshot = false;
 
 	recovery->index_id_hash = mh_i64ptr_new();
 	recovery->index_lsn_hash = mh_i64ptr_new();
@@ -1879,6 +1932,7 @@ vy_recovery_new_f(va_list ap)
 		say_verbose("load vylog record: %s",
 			    vy_log_record_str(&record));
 		if (record.type == VY_LOG_SNAPSHOT) {
+			recovery->seen_snapshot = true;
 			if (only_checkpoint)
 				break;
 			continue;
@@ -2012,7 +2066,6 @@ vy_log_append_index(struct xlog *xlog, struct vy_index_recovery_info *index)
 
 	vy_log_record_init(&record);
 	record.type = VY_LOG_CREATE_INDEX;
-	record.index_lsn = index->index_lsn;
 	record.index_id = index->index_id;
 	record.space_id = index->space_id;
 	record.key_parts = index->key_parts;
@@ -2023,7 +2076,8 @@ vy_log_append_index(struct xlog *xlog, struct vy_index_recovery_info *index)
 	if (index->truncate_count > 0) {
 		vy_log_record_init(&record);
 		record.type = VY_LOG_TRUNCATE_INDEX;
-		record.index_lsn = index->index_lsn;
+		record.index_id = index->index_id;
+		record.space_id = index->space_id;
 		record.truncate_count = index->truncate_count;
 		if (vy_log_append_record(xlog, &record) != 0)
 			return -1;
@@ -2032,7 +2086,8 @@ vy_log_append_index(struct xlog *xlog, struct vy_index_recovery_info *index)
 	if (index->dump_lsn >= 0) {
 		vy_log_record_init(&record);
 		record.type = VY_LOG_DUMP_INDEX;
-		record.index_lsn = index->index_lsn;
+		record.index_id = index->index_id;
+		record.space_id = index->space_id;
 		record.dump_lsn = index->dump_lsn;
 		if (vy_log_append_record(xlog, &record) != 0)
 			return -1;
@@ -2046,7 +2101,8 @@ vy_log_append_index(struct xlog *xlog, struct vy_index_recovery_info *index)
 			record.type = VY_LOG_CREATE_RUN;
 			record.dump_lsn = run->dump_lsn;
 		}
-		record.index_lsn = index->index_lsn;
+		record.index_id = index->index_id;
+		record.space_id = index->space_id;
 		record.run_id = run->id;
 		if (vy_log_append_record(xlog, &record) != 0)
 			return -1;
@@ -2065,7 +2121,8 @@ vy_log_append_index(struct xlog *xlog, struct vy_index_recovery_info *index)
 	rlist_foreach_entry(range, &index->ranges, in_index) {
 		vy_log_record_init(&record);
 		record.type = VY_LOG_INSERT_RANGE;
-		record.index_lsn = index->index_lsn;
+		record.index_id = index->index_id;
+		record.space_id = index->space_id;
 		record.range_id = range->id;
 		record.begin = range->begin;
 		record.end = range->end;
@@ -2092,7 +2149,8 @@ vy_log_append_index(struct xlog *xlog, struct vy_index_recovery_info *index)
 	if (index->is_dropped) {
 		vy_log_record_init(&record);
 		record.type = VY_LOG_DROP_INDEX;
-		record.index_lsn = index->index_lsn;
+		record.index_id = index->index_id;
+		record.space_id = index->space_id;
 		if (vy_log_append_record(xlog, &record) != 0)
 			return -1;
 	}
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index 8fbacd0f..67dcd418 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -65,18 +65,19 @@ struct mh_i64ptr_t;
 enum vy_log_record_type {
 	/**
 	 * Create a new vinyl index.
-	 * Requires vy_log_record::index_lsn, index_id, space_id,
+	 * Requires vy_log_record::space_id, index_id,
 	 * key_def (with primary key parts).
 	 */
 	VY_LOG_CREATE_INDEX		= 0,
 	/**
 	 * Drop an index.
-	 * Requires vy_log_record::index_lsn.
+	 * Requires vy_log_record::space_id, index_id.
 	 */
 	VY_LOG_DROP_INDEX		= 1,
 	/**
 	 * Insert a new range into a vinyl index.
-	 * Requires vy_log_record::index_lsn, range_id, begin, end.
+	 * Requires vy_log_record::space_id, index_id, range_id,
+	 * begin, end.
 	 */
 	VY_LOG_INSERT_RANGE		= 2,
 	/**
@@ -86,7 +87,7 @@ enum vy_log_record_type {
 	VY_LOG_DELETE_RANGE		= 3,
 	/**
 	 * Prepare a vinyl run file.
-	 * Requires vy_log_record::index_lsn, run_id.
+	 * Requires vy_log_record::space_id, index_id, run_id.
 	 *
 	 * Record of this type is written before creating a run file.
 	 * It is needed to keep track of unfinished due to errors run
@@ -95,7 +96,8 @@ enum vy_log_record_type {
 	VY_LOG_PREPARE_RUN		= 4,
 	/**
 	 * Commit a vinyl run file creation.
-	 * Requires vy_log_record::index_lsn, run_id, dump_lsn.
+	 * Requires vy_log_record::space_id, index_id, run_id,
+	 * dump_lsn.
 	 *
 	 * Written after a run file was successfully created.
 	 */
@@ -126,7 +128,8 @@ enum vy_log_record_type {
 	VY_LOG_FORGET_RUN		= 7,
 	/**
 	 * Insert a run slice into a range.
-	 * Requires vy_log_record::range_id, run_id, slice_id, begin, end.
+	 * Requires vy_log_record::range_id, run_id, slice_id,
+	 * begin, end.
 	 */
 	VY_LOG_INSERT_SLICE		= 8,
 	/**
@@ -136,7 +139,7 @@ enum vy_log_record_type {
 	VY_LOG_DELETE_SLICE		= 9,
 	/**
 	 * Update LSN of the last index dump.
-	 * Requires vy_log_record::index_lsn, dump_lsn.
+	 * Requires vy_log_record::space_id, index_id, dump_lsn.
 	 */
 	VY_LOG_DUMP_INDEX		= 10,
 	/**
@@ -152,7 +155,7 @@ enum vy_log_record_type {
 	VY_LOG_SNAPSHOT			= 11,
 	/**
 	 * Update truncate count of a vinyl index.
-	 * Requires vy_log_record::index_lsn, truncate_count.
+	 * Requires vy_log_record::space_id, index_id, truncate_count.
 	 */
 	VY_LOG_TRUNCATE_INDEX		= 12,
 
@@ -165,7 +168,11 @@ struct vy_log_record {
 	enum vy_log_record_type type;
 	/**
 	 * LSN from the time of index creation.
-	 * Used to identify indexes in vylog.
+	 *
+	 * We used to identify different incarnations of
+	 * the same index by LSN. Now it isn't needed, but
+	 * we have to maintain it to handle records created
+	 * by older versions.
 	 */
 	int64_t index_lsn;
 	/** Unique ID of the vinyl range. */
@@ -216,7 +223,13 @@ struct vy_recovery {
 	struct rlist indexes;
 	/** space_id, index_id -> vy_index_recovery_info. */
 	struct mh_i64ptr_t *index_id_hash;
-	/** index_lsn -> vy_index_recovery_info. */
+	/**
+	 * index_lsn -> vy_index_recovery_info.
+	 *
+	 * This is a legacy from the time when index_lsn was
+	 * used as a unique index ID. We need it to process
+	 * records written by older versions.
+	 */
 	struct mh_i64ptr_t *index_lsn_hash;
 	/** ID -> vy_range_recovery_info. */
 	struct mh_i64ptr_t *range_hash;
@@ -225,18 +238,34 @@ struct vy_recovery {
 	/** ID -> vy_slice_recovery_info. */
 	struct mh_i64ptr_t *slice_hash;
 	/**
+	 * The following flag is set once the VY_LOG_SNAPSHOT
+	 * record is processed.
+	 */
+	bool seen_snapshot;
+	/**
 	 * Maximal vinyl object ID, according to the metadata log,
 	 * or -1 in case no vinyl objects were recovered.
 	 */
 	int64_t max_id;
 };
 
+/**
+ * A helper structure that consists of vy_log_record
+ * fields needed to identify an index.
+ */
+struct vy_index_recovery_id {
+	/** vy_log_record::space_id */
+	uint32_t space_id;
+	/** vy_log_record::index_id */
+	uint32_t index_id;
+	/** vy_log_record::index_lsn (legacy) */
+	int64_t index_lsn;
+};
+
 /** Vinyl index info stored in a recovery context. */
 struct vy_index_recovery_info {
 	/** Link in vy_recovery::indexes. */
 	struct rlist in_recovery;
-	/** LSN of the index creation. */
-	int64_t index_lsn;
 	/** Ordinal index number in the space. */
 	uint32_t index_id;
 	/** Space ID. */
@@ -252,6 +281,11 @@ struct vy_index_recovery_info {
 	/** Truncate count. */
 	int64_t truncate_count;
 	/**
+	 * Number of incarnations the index has had
+	 * since the last checkpoint.
+	 */
+	int incarnation_count;
+	/**
 	 * List of all ranges in the index, linked by
 	 * vy_range_recovery_info::in_index.
 	 */
@@ -481,7 +515,7 @@ vy_recovery_delete(struct vy_recovery *recovery);
  */
 struct vy_index_recovery_info *
 vy_recovery_lookup_index(struct vy_recovery *recovery,
-			 uint32_t space_id, uint32_t index_id);
+			 const struct vy_index_recovery_id *id);
 
 /**
  * Initialize a log record with default values.
@@ -496,39 +530,40 @@ vy_log_record_init(struct vy_log_record *record)
 
 /** Helper to log a vinyl index creation. */
 static inline void
-vy_log_create_index(int64_t index_lsn, uint32_t index_id, uint32_t space_id,
+vy_log_create_index(uint32_t space_id, uint32_t index_id,
 		    const struct key_def *key_def)
 {
 	struct vy_log_record record;
 	vy_log_record_init(&record);
 	record.type = VY_LOG_CREATE_INDEX;
-	record.index_lsn = index_lsn;
-	record.index_id = index_id;
 	record.space_id = space_id;
+	record.index_id = index_id;
 	record.key_def = key_def;
 	vy_log_write(&record);
 }
 
 /** Helper to log a vinyl index drop. */
 static inline void
-vy_log_drop_index(int64_t index_lsn)
+vy_log_drop_index(uint32_t space_id, uint32_t index_id)
 {
 	struct vy_log_record record;
 	vy_log_record_init(&record);
 	record.type = VY_LOG_DROP_INDEX;
-	record.index_lsn = index_lsn;
+	record.space_id = space_id;
+	record.index_id = index_id;
 	vy_log_write(&record);
 }
 
 /** Helper to log a vinyl range insertion. */
 static inline void
-vy_log_insert_range(int64_t index_lsn, int64_t range_id,
+vy_log_insert_range(uint32_t space_id, uint32_t index_id, int64_t range_id,
 		    const char *begin, const char *end)
 {
 	struct vy_log_record record;
 	vy_log_record_init(&record);
 	record.type = VY_LOG_INSERT_RANGE;
-	record.index_lsn = index_lsn;
+	record.space_id = space_id;
+	record.index_id = index_id;
 	record.range_id = range_id;
 	record.begin = begin;
 	record.end = end;
@@ -548,24 +583,27 @@ vy_log_delete_range(int64_t range_id)
 
 /** Helper to log a vinyl run file creation. */
 static inline void
-vy_log_prepare_run(int64_t index_lsn, int64_t run_id)
+vy_log_prepare_run(uint32_t space_id, uint32_t index_id, int64_t run_id)
 {
 	struct vy_log_record record;
 	vy_log_record_init(&record);
 	record.type = VY_LOG_PREPARE_RUN;
-	record.index_lsn = index_lsn;
+	record.space_id = space_id;
+	record.index_id = index_id;
 	record.run_id = run_id;
 	vy_log_write(&record);
 }
 
 /** Helper to log a vinyl run creation. */
 static inline void
-vy_log_create_run(int64_t index_lsn, int64_t run_id, int64_t dump_lsn)
+vy_log_create_run(uint32_t space_id, uint32_t index_id,
+		  int64_t run_id, int64_t dump_lsn)
 {
 	struct vy_log_record record;
 	vy_log_record_init(&record);
 	record.type = VY_LOG_CREATE_RUN;
-	record.index_lsn = index_lsn;
+	record.space_id = space_id;
+	record.index_id = index_id;
 	record.run_id = run_id;
 	record.dump_lsn = dump_lsn;
 	vy_log_write(&record);
@@ -623,24 +661,27 @@ vy_log_delete_slice(int64_t slice_id)
 
 /** Helper to log index dump. */
 static inline void
-vy_log_dump_index(int64_t index_lsn, int64_t dump_lsn)
+vy_log_dump_index(uint32_t space_id, uint32_t index_id, int64_t dump_lsn)
 {
 	struct vy_log_record record;
 	vy_log_record_init(&record);
 	record.type = VY_LOG_DUMP_INDEX;
-	record.index_lsn = index_lsn;
+	record.space_id = space_id;
+	record.index_id = index_id;
 	record.dump_lsn = dump_lsn;
 	vy_log_write(&record);
 }
 
 /** Helper to log index truncation. */
 static inline void
-vy_log_truncate_index(int64_t index_lsn, int64_t truncate_count)
+vy_log_truncate_index(uint32_t space_id, uint32_t index_id,
+		      int64_t truncate_count)
 {
 	struct vy_log_record record;
 	vy_log_record_init(&record);
 	record.type = VY_LOG_TRUNCATE_INDEX;
-	record.index_lsn = index_lsn;
+	record.space_id = space_id;
+	record.index_id = index_id;
 	record.truncate_count = truncate_count;
 	vy_log_write(&record);
 }
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 382cf071..0fde3138 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -583,7 +583,7 @@ vy_run_prepare(struct vy_run_env *run_env, struct vy_index *index)
 	if (run == NULL)
 		return NULL;
 	vy_log_tx_begin();
-	vy_log_prepare_run(index->commit_lsn, run->id);
+	vy_log_prepare_run(index->space_id, index->id, run->id);
 	if (vy_log_tx_commit() < 0) {
 		vy_run_unref(run);
 		return NULL;
@@ -706,7 +706,7 @@ vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 		 * to log index dump anyway.
 		 */
 		vy_log_tx_begin();
-		vy_log_dump_index(index->commit_lsn, dump_lsn);
+		vy_log_dump_index(index->space_id, index->id, dump_lsn);
 		if (vy_log_tx_commit() < 0)
 			goto fail;
 		vy_run_discard(new_run);
@@ -766,7 +766,7 @@ vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 	 * Log change in metadata.
 	 */
 	vy_log_tx_begin();
-	vy_log_create_run(index->commit_lsn, new_run->id, dump_lsn);
+	vy_log_create_run(index->space_id, index->id, new_run->id, dump_lsn);
 	for (range = begin_range, i = 0; range != end_range;
 	     range = vy_range_tree_next(index->tree, range), i++) {
 		assert(i < index->range_count);
@@ -778,7 +778,7 @@ vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 		if (++loops % VY_YIELD_LOOPS == 0)
 			fiber_sleep(0); /* see comment above */
 	}
-	vy_log_dump_index(index->commit_lsn, dump_lsn);
+	vy_log_dump_index(index->space_id, index->id, dump_lsn);
 	if (vy_log_tx_commit() < 0)
 		goto fail_free_slices;
 
@@ -1103,8 +1103,8 @@ vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task)
 	rlist_foreach_entry(run, &unused_runs, in_unused)
 		vy_log_drop_run(run->id, gc_lsn);
 	if (new_slice != NULL) {
-		vy_log_create_run(index->commit_lsn, new_run->id,
-				  new_run->dump_lsn);
+		vy_log_create_run(index->space_id, index->id,
+				  new_run->id, new_run->dump_lsn);
 		vy_log_insert_slice(range->id, new_run->id, new_slice->id,
 				    tuple_data_or_null(new_slice->begin),
 				    tuple_data_or_null(new_slice->end));
diff --git a/test/unit/vy_log_stub.c b/test/unit/vy_log_stub.c
index 1fda0a6b..f3387846 100644
--- a/test/unit/vy_log_stub.c
+++ b/test/unit/vy_log_stub.c
@@ -53,7 +53,7 @@ vy_log_write(const struct vy_log_record *record) {}
 
 struct vy_index_recovery_info *
 vy_recovery_lookup_index(struct vy_recovery *recovery,
-			 uint32_t space_id, uint32_t index_id)
+			 const struct vy_index_recovery_id *id)
 {
 	unreachable();
 }
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 603d2865..c59d8c5f 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -128,19 +128,19 @@ result
     - - HEADER:
           type: INSERT
         BODY:
-          tuple: [0, {0: 3, 7: [{'field': 0, 'collation': 1, 'type': 'string'}], 6: 512}]
+          tuple: [0, {7: [{'field': 0, 'collation': 1, 'type': 'string'}], 6: 512}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [10, {0: 3, 9: 9}]
+          tuple: [10, {9: 9, 6: 512}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [5, {0: 3, 2: 6, 9: 9}]
+          tuple: [5, {2: 6, 9: 9, 6: 512}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [4, {0: 3, 2: 3}]
+          tuple: [4, {2: 3, 6: 512}]
       - HEADER:
           type: INSERT
         BODY:
@@ -148,7 +148,7 @@ result
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [2, {0: 3}]
+          tuple: [2, {6: 512}]
       - HEADER:
           type: INSERT
         BODY:
@@ -156,19 +156,19 @@ result
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [0, {0: 4, 5: 1, 6: 512, 7: [{'field': 1, 'is_nullable': true, 'type': 'unsigned'}]}]
+          tuple: [0, {5: 1, 6: 512, 7: [{'field': 1, 'is_nullable': true, 'type': 'unsigned'}]}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [10, {0: 4, 9: 9}]
+          tuple: [10, {9: 9, 5: 1, 6: 512}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [5, {0: 4, 2: 4, 9: 9}]
+          tuple: [5, {2: 4, 5: 1, 6: 512, 9: 9}]
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [4, {0: 4, 2: 2}]
+          tuple: [4, {2: 2, 5: 1, 6: 512}]
       - HEADER:
           type: INSERT
         BODY:
@@ -176,7 +176,7 @@ result
       - HEADER:
           type: INSERT
         BODY:
-          tuple: [2, {0: 4, 1: 1}]
+          tuple: [2, {1: 1, 5: 1, 6: 512}]
       - HEADER:
           type: INSERT
         BODY:
@@ -199,12 +199,12 @@ result
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [4, {0: 4, 2: 8}]
+          tuple: [4, {2: 8, 5: 1, 6: 512}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [5, {0: 4, 2: 8, 9: 12}]
+          tuple: [5, {2: 8, 5: 1, 6: 512, 9: 12}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
@@ -214,17 +214,17 @@ result
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [10, {0: 4, 9: 12}]
+          tuple: [10, {9: 12, 5: 1, 6: 512}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [4, {0: 3, 2: 10}]
+          tuple: [4, {2: 10, 6: 512}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [5, {0: 3, 2: 10, 9: 12}]
+          tuple: [5, {2: 10, 9: 12, 6: 512}]
       - HEADER:
           timestamp: <timestamp>
           type: INSERT
@@ -234,7 +234,7 @@ result
           timestamp: <timestamp>
           type: INSERT
         BODY:
-          tuple: [10, {0: 3, 9: 12}]
+          tuple: [10, {9: 12, 6: 512}]
   - - 00000000000000000006.index
     - - HEADER:
           type: RUNINFO
-- 
2.11.0
    
    
More information about the Tarantool-patches
mailing list