[tarantool-patches] Re: [PATCH v3 2/4] box: introduce slot_cache in key_part

Kirill Shcherbatov kshcherbatov at tarantool.org
Thu Sep 6 15:47:09 MSK 2018


> Hi! Thanks for the fixes!
Hi. Thank you for the review and fixes.

> 1. Comments.
/** ModifySpaceFormat - update format epoch in a new space. */
class ModifySpaceFormat: public AlterSpaceOp

/*
 * Alter format epoch in a new space format to be greater than
 * previous one.
 */
void
ModifySpaceFormat::alter(struct alter_space *alter)

> 2. Redundant white spaces.
> (fixed by me)
> 3. How new_space can be NULL? alter() called only after new_space
> is created.
> (fixed by me)
> 4. Makes no sense to continue the cycle from this
> moment.
> (fixed by me)
> 5. How old_space can be NULL here? It is stored in struct alter
> right in its constructor. What is more, how old_space->format
> can be NULL, if a new space format is not NULL? And you do not
> need is_format_epoch_changed in such a case.
> (fixed by me)
> 6. Garbage diff.
> (fixed by me)
> 7. Why 'the eldest'? As I remember, we decided to prefer newer
> format offset slots.

> 8. Looks like you ignored my comment about why do you
> cast struct key_part * to self. Let me fix it for you
> where possible. Where it is not possible, please,
> remove 'const struct key_def' qualifier from the function
> declaration and do not cast too.
> Also, as you could see, when a tuple_field function is
> introduced, it has two versions: raw and not raw. Not raw
> functions are used exactly for such hunks, when you do
> not need to get multiple fields.
> (partially fixed by me)
This refactoring is really global and time-consuming. I am going to do it last.

>> -template<bool is_nullable, bool has_optional_parts>
>> +template<bool is_nullable, bool has_optional_parts, bool has_json_path>
> 9. You said, that has_json_path is substituted with is_flat, but looks
> like you did not. Please, fix all the remarks of the previous review
> more accurate.
No, I've chosen has_json_path, which I use everywhere.

>> +	format->epoch = 1;
> 10. Why is this epoch invalid? As I understand, a format can live
> with it normally but comparators will be slow.
First, the invalid epoch is 0, and it is 0 on the branch. The value quoted here is a bug in the patch sent in the letter.
Second, we need an invalid epoch because vinyl creates indexes during space creation,
before alter sets up the correct epoch.
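
To make the role of the invalid epoch concrete, here is a minimal sketch of an epoch-validated slot cache. It is not the code from the patch: all toy_* names are hypothetical, and the rule "cache only when the format's epoch is greater than the cached one" is my reading of the "newest format is most relevant" strategy.

```c
#include <assert.h>
#include <stdint.h>

/* All toy_* names are hypothetical stand-ins, not tarantool API. */
struct toy_format {
	/* 0 is the invalid epoch: such a format never validates a cache. */
	uint64_t epoch;
	uint32_t field_count;
	/* Per-field offset slots, as a format would store them. */
	const int32_t *slots;
	/* Counts slow lookups, only to make the caching observable. */
	unsigned slow_lookups;
};

struct toy_part {
	uint32_t fieldno;
	/* Epoch of the format whose slot is cached, 0 if none. */
	uint64_t offset_slot_epoch;
	int32_t offset_slot;
};

/* The "expensive" path: ask the format itself. */
static int32_t
toy_format_lookup(struct toy_format *format, uint32_t fieldno)
{
	assert(fieldno < format->field_count);
	format->slow_lookups++;
	return format->slots[fieldno];
}

/*
 * Return the offset slot for the part in the given format.
 * A cache hit requires a valid (non-zero) format epoch equal to
 * the cached one. On a miss the cache is refreshed only if the
 * format is newer than the one cached so far.
 */
static int32_t
toy_part_offset_slot(struct toy_format *format, struct toy_part *part)
{
	if (format->epoch != 0 && part->offset_slot_epoch == format->epoch)
		return part->offset_slot;
	int32_t slot = toy_format_lookup(format, part->fieldno);
	if (format->epoch > part->offset_slot_epoch) {
		part->offset_slot_epoch = format->epoch;
		part->offset_slot = slot;
	}
	return slot;
}
```

In this sketch a format with epoch 0 always takes the slow path and never overwrites the cache, so a format created before alter assigns a real epoch stays correct, just slower.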

> 11. The same as in the previous review: do not call tuple_format()
> and other tuple functions multiple times when possible.
> (fixed by me)
=================================================

From 996ee351112bb070d511636bc702496bc445f047 Mon Sep 17 00:00:00 2001
Message-Id: <996ee351112bb070d511636bc702496bc445f047.1536237903.git.kshcherbatov at tarantool.org>
In-Reply-To: <cover.1536237903.git.kshcherbatov at tarantool.org>
References: <cover.1536237903.git.kshcherbatov at tarantool.org>
From: Kirill Shcherbatov <kshcherbatov at tarantool.org>
Date: Thu, 9 Aug 2018 15:02:44 +0300
Subject: [PATCH 2/4] box: introduce slot_cache in key_part

The same key_part can be used in different formats multiple
times, so a different field->offset_slot is allocated in each.
In most scenarios we work with a series of tuples of the same
format, and (in general) a format lookup for a field would be
an expensive operation for JSON paths defined in a key_part.
A new slot_cache field in the key_part structure and an
epoch-based mechanism to validate its actuality should be an
effective approach to improve performance.
The new routine tuple_field_by_part uses a tuple and a key_part
to access a field, which allows us to rework and speed up all
scenarios of accessing tuple data by index.
This also allows working with JSON-path key_parts later.

Part of #1012.
---
 src/box/alter.cc             |   6 +-
 src/box/blackhole.c          |   3 +-
 src/box/engine.h             |  11 ++--
 src/box/key_def.c            |  16 +++--
 src/box/key_def.h            |  38 +++++++-----
 src/box/memtx_bitset.c       |   5 +-
 src/box/memtx_engine.c       |   4 +-
 src/box/memtx_hash.h         |   4 +-
 src/box/memtx_rtree.c        |   3 +-
 src/box/memtx_space.c        |   3 +-
 src/box/memtx_space.h        |   2 +-
 src/box/schema.cc            |   4 +-
 src/box/space.c              |   4 +-
 src/box/space.h              |   8 ++-
 src/box/sysview.c            |   3 +-
 src/box/tuple.h              |  14 +++++
 src/box/tuple_bloom.c        |   8 +--
 src/box/tuple_bloom.h        |   8 +--
 src/box/tuple_compare.cc     | 141 ++++++++++++++++++++++++++-----------------
 src/box/tuple_compare.h      |   5 +-
 src/box/tuple_extract_key.cc |  53 +++++++++-------
 src/box/tuple_format.c       |  12 ++++
 src/box/tuple_format.h       |  18 ++++++
 src/box/tuple_hash.cc        |  63 ++++++++++++-------
 src/box/tuple_hash.h         |   9 ++-
 src/box/vinyl.c              |   3 +-
 src/box/vy_history.c         |   2 +-
 src/box/vy_history.h         |   2 +-
 src/box/vy_lsm.c             |   2 +
 src/box/vy_mem.c             |   8 +--
 src/box/vy_mem.h             |  10 +--
 src/box/vy_range.c           |   2 +-
 src/box/vy_range.h           |   4 +-
 src/box/vy_run.c             |  39 ++++++------
 src/box/vy_run.h             |  34 +++++------
 src/box/vy_stmt.c            |  15 ++---
 src/box/vy_stmt.h            |  39 ++++++------
 src/box/vy_upsert.c          |   2 +-
 src/box/vy_upsert.h          |   2 +-
 src/box/vy_write_iterator.c  |   8 +--
 src/box/vy_write_iterator.h  |   6 +-
 41 files changed, 374 insertions(+), 249 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index a6299a1..123db1f 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -883,7 +883,9 @@ alter_space_do(struct txn *txn, struct alter_space *alter)
 	 * Create a new (empty) space for the new definition.
 	 * Sic: the triggers are not moved over yet.
 	 */
-	alter->new_space = space_new_xc(alter->space_def, &alter->key_list);
+	alter->new_space = space_new_xc(alter->space_def, &alter->key_list,
+					alter->old_space->format != NULL ?
+					alter->old_space->format->epoch : 0);
 	/*
 	 * Copy the replace function, the new space is at the same recovery
 	 * phase as the old one. This hack is especially necessary for
@@ -1604,7 +1606,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
 		auto def_guard =
 			make_scoped_guard([=] { space_def_delete(def); });
 		RLIST_HEAD(empty_list);
-		struct space *space = space_new_xc(def, &empty_list);
+		struct space *space = space_new_xc(def, &empty_list, 0);
 		/**
 		 * The new space must be inserted in the space
 		 * cache right away to achieve linearisable
diff --git a/src/box/blackhole.c b/src/box/blackhole.c
index f979304..160154b 100644
--- a/src/box/blackhole.c
+++ b/src/box/blackhole.c
@@ -135,7 +135,7 @@ blackhole_engine_shutdown(struct engine *engine)
 
 static struct space *
 blackhole_engine_create_space(struct engine *engine, struct space_def *def,
-			      struct rlist *key_list)
+			      struct rlist *key_list, uint64_t epoch)
 {
 	if (!rlist_empty(key_list)) {
 		diag_set(ClientError, ER_UNSUPPORTED, "Blackhole", "indexes");
@@ -158,6 +158,7 @@ blackhole_engine_create_space(struct engine *engine, struct space_def *def,
 		return NULL;
 	}
 	format->exact_field_count = def->exact_field_count;
+	format->epoch = ++epoch;
 	tuple_format_ref(format);
 
 	if (space_create(space, engine, &blackhole_space_vtab,
diff --git a/src/box/engine.h b/src/box/engine.h
index 5b96c74..0e8c76c 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -72,7 +72,8 @@ struct engine_vtab {
 	void (*shutdown)(struct engine *);
 	/** Allocate a new space instance. */
 	struct space *(*create_space)(struct engine *engine,
-			struct space_def *def, struct rlist *key_list);
+			struct space_def *def, struct rlist *key_list,
+			uint64_t epoch);
 	/**
 	 * Write statements stored in checkpoint @vclock to @stream.
 	 */
@@ -237,9 +238,9 @@ engine_find(const char *name)
 
 static inline struct space *
 engine_create_space(struct engine *engine, struct space_def *def,
-		    struct rlist *key_list)
+		    struct rlist *key_list, uint64_t epoch)
 {
-	return engine->vtab->create_space(engine, def, key_list);
+	return engine->vtab->create_space(engine, def, key_list, epoch);
 }
 
 static inline int
@@ -390,9 +391,9 @@ engine_find_xc(const char *name)
 
 static inline struct space *
 engine_create_space_xc(struct engine *engine, struct space_def *def,
-		    struct rlist *key_list)
+		    struct rlist *key_list, uint64_t epoch)
 {
-	struct space *space = engine_create_space(engine, def, key_list);
+	struct space *space = engine_create_space(engine, def, key_list, epoch);
 	if (space == NULL)
 		diag_raise();
 	return space;
diff --git a/src/box/key_def.c b/src/box/key_def.c
index ee09dc9..2ef78c1 100644
--- a/src/box/key_def.c
+++ b/src/box/key_def.c
@@ -208,14 +208,14 @@ box_key_def_delete(box_key_def_t *key_def)
 
 int
 box_tuple_compare(const box_tuple_t *tuple_a, const box_tuple_t *tuple_b,
-		  const box_key_def_t *key_def)
+		  box_key_def_t *key_def)
 {
 	return tuple_compare(tuple_a, tuple_b, key_def);
 }
 
 int
 box_tuple_compare_with_key(const box_tuple_t *tuple_a, const char *key_b,
-			   const box_key_def_t *key_def)
+			   box_key_def_t *key_def)
 {
 	uint32_t part_count = mp_decode_array(&key_b);
 	return tuple_compare_with_key(tuple_a, key_b, part_count, key_def);
@@ -258,6 +258,8 @@ key_def_set_part(struct key_def *def, uint32_t part_no, uint32_t fieldno,
 	def->parts[part_no].type = type;
 	def->parts[part_no].coll = coll;
 	def->parts[part_no].coll_id = coll_id;
+	def->parts[part_no].offset_slot = TUPLE_OFFSET_SLOT_NIL;
+	def->parts[part_no].offset_slot_epoch = 0;
 	column_mask_set_fieldno(&def->column_mask, fieldno);
 	/**
 	 * When all parts are set, initialize the tuple
@@ -556,8 +558,11 @@ key_def_merge(const struct key_def *first, const struct key_def *second)
 	part = first->parts;
 	end = part + first->part_count;
 	for (; part != end; part++) {
-		key_def_set_part(new_def, pos++, part->fieldno, part->type,
+		key_def_set_part(new_def, pos, part->fieldno, part->type,
 				 part->is_nullable, part->coll, part->coll_id);
+		new_def->parts[pos].offset_slot_epoch = part->offset_slot_epoch;
+		new_def->parts[pos].offset_slot = part->offset_slot;
+		pos++;
 	}
 
 	/* Set-append second key def's part to the new key def. */
@@ -566,8 +571,11 @@ key_def_merge(const struct key_def *first, const struct key_def *second)
 	for (; part != end; part++) {
 		if (key_def_find(first, part->fieldno))
 			continue;
-		key_def_set_part(new_def, pos++, part->fieldno, part->type,
+		key_def_set_part(new_def, pos, part->fieldno, part->type,
 				 part->is_nullable, part->coll, part->coll_id);
+		new_def->parts[pos].offset_slot_epoch = part->offset_slot_epoch;
+		new_def->parts[pos].offset_slot = part->offset_slot;
+		pos++;
 	}
 	return new_def;
 }
diff --git a/src/box/key_def.h b/src/box/key_def.h
index aecbe03..07997b8 100644
--- a/src/box/key_def.h
+++ b/src/box/key_def.h
@@ -74,6 +74,17 @@ struct key_part {
 	struct coll *coll;
 	/** True if a part can store NULLs. */
 	bool is_nullable;
+	/**
+	 * Epoch of offset slot cache. Initialized with
+	 * incremental epoch of format on caching its field's
+	 * offset_slot via tuple_field_by_part_raw to speed up
+	 * access on subsequent calls with same format.
+	 * Cache is expected to use "the newest format is most
+	 * relevant" strategy.
+	 */
+	uint64_t offset_slot_epoch;
+	/** Cache with format's field offset slot. */
+	int32_t offset_slot;
 };
 
 struct key_def;
@@ -83,26 +94,26 @@ struct tuple;
 typedef int (*tuple_compare_with_key_t)(const struct tuple *tuple_a,
 					const char *key,
 					uint32_t part_count,
-					const struct key_def *key_def);
+					struct key_def *key_def);
 /** @copydoc tuple_compare() */
 typedef int (*tuple_compare_t)(const struct tuple *tuple_a,
 			       const struct tuple *tuple_b,
-			       const struct key_def *key_def);
+			       struct key_def *key_def);
 /** @copydoc tuple_extract_key() */
 typedef char *(*tuple_extract_key_t)(const struct tuple *tuple,
-				     const struct key_def *key_def,
+				     struct key_def *key_def,
 				     uint32_t *key_size);
 /** @copydoc tuple_extract_key_raw() */
 typedef char *(*tuple_extract_key_raw_t)(const char *data,
 					 const char *data_end,
-					 const struct key_def *key_def,
+					 struct key_def *key_def,
 					 uint32_t *key_size);
 /** @copydoc tuple_hash() */
 typedef uint32_t (*tuple_hash_t)(const struct tuple *tuple,
-				 const struct key_def *key_def);
+				 struct key_def *key_def);
 /** @copydoc key_hash() */
 typedef uint32_t (*key_hash_t)(const char *key,
-				const struct key_def *key_def);
+				struct key_def *key_def);
 
 /* Definition of a multipart key. */
 struct key_def {
@@ -201,7 +212,7 @@ box_key_def_delete(box_key_def_t *key_def);
  */
 int
 box_tuple_compare(const box_tuple_t *tuple_a, const box_tuple_t *tuple_b,
-		  const box_key_def_t *key_def);
+		  box_key_def_t *key_def);
 
 /**
  * @brief Compare tuple with key using the key definition.
@@ -216,7 +227,7 @@ box_tuple_compare(const box_tuple_t *tuple_a, const box_tuple_t *tuple_b,
 
 int
 box_tuple_compare_with_key(const box_tuple_t *tuple_a, const char *key_b,
-			   const box_key_def_t *key_def);
+			   box_key_def_t *key_def);
 
 /** \endcond public */
 
@@ -443,7 +454,7 @@ key_part_cmp(const struct key_part *parts1, uint32_t part_count1,
  * @retval NULL     Memory allocation error
  */
 static inline char *
-tuple_extract_key(const struct tuple *tuple, const struct key_def *key_def,
+tuple_extract_key(const struct tuple *tuple, struct key_def *key_def,
 		  uint32_t *key_size)
 {
 	return key_def->tuple_extract_key(tuple, key_def, key_size);
@@ -464,7 +475,7 @@ tuple_extract_key(const struct tuple *tuple, const struct key_def *key_def,
  */
 static inline char *
 tuple_extract_key_raw(const char *data, const char *data_end,
-		      const struct key_def *key_def, uint32_t *key_size)
+		      struct key_def *key_def, uint32_t *key_size)
 {
 	return key_def->tuple_extract_key_raw(data, data_end, key_def,
 					      key_size);
@@ -483,8 +494,7 @@ tuple_extract_key_raw(const char *data, const char *data_end,
  * @retval >0 if key_a > key_b
  */
 int
-key_compare(const char *key_a, const char *key_b,
-	    const struct key_def *key_def);
+key_compare(const char *key_a, const char *key_b, struct key_def *key_def);
 
 /**
  * Compare tuples using the key definition.
@@ -497,7 +507,7 @@ key_compare(const char *key_a, const char *key_b,
  */
 static inline int
 tuple_compare(const struct tuple *tuple_a, const struct tuple *tuple_b,
-	      const struct key_def *key_def)
+	      struct key_def *key_def)
 {
 	return key_def->tuple_compare(tuple_a, tuple_b, key_def);
 }
@@ -515,7 +525,7 @@ tuple_compare(const struct tuple *tuple_a, const struct tuple *tuple_b,
  */
 static inline int
 tuple_compare_with_key(const struct tuple *tuple, const char *key,
-		       uint32_t part_count, const struct key_def *key_def)
+		       uint32_t part_count, struct key_def *key_def)
 {
 	return key_def->tuple_compare_with_key(tuple, key, part_count, key_def);
 }
diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c
index a665f1a..cd7362e 100644
--- a/src/box/memtx_bitset.c
+++ b/src/box/memtx_bitset.c
@@ -283,8 +283,9 @@ memtx_bitset_index_replace(struct index *base, struct tuple *old_tuple,
 	}
 
 	if (new_tuple != NULL) {
-		const char *field;
-		field = tuple_field(new_tuple, base->def->key_def->parts[0].fieldno);
+		const char *field =
+			tuple_field_by_part(new_tuple,
+					    base->def->key_def->parts);
 		uint32_t key_len;
 		const void *key = make_key(field, &key_len);
 #ifndef OLD_GOOD_BITSET
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 1f80ce5..4b7d377 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -358,10 +358,10 @@ memtx_engine_end_recovery(struct engine *engine)
 
 static struct space *
 memtx_engine_create_space(struct engine *engine, struct space_def *def,
-			  struct rlist *key_list)
+			  struct rlist *key_list, uint64_t epoch)
 {
 	struct memtx_engine *memtx = (struct memtx_engine *)engine;
-	return memtx_space_new(memtx, def, key_list);
+	return memtx_space_new(memtx, def, key_list, epoch);
 }
 
 static int
diff --git a/src/box/memtx_hash.h b/src/box/memtx_hash.h
index a3b4805..10663fc 100644
--- a/src/box/memtx_hash.h
+++ b/src/box/memtx_hash.h
@@ -39,14 +39,14 @@ extern "C" {
 
 static inline bool
 memtx_hash_equal(struct tuple *tuple_a, struct tuple *tuple_b,
-		 const struct key_def *key_def)
+		 struct key_def *key_def)
 {
 	return tuple_compare(tuple_a, tuple_b, key_def) == 0;
 }
 
 static inline bool
 memtx_hash_equal_key(struct tuple *tuple, const char *key,
-		     const struct key_def *key_def)
+		     struct key_def *key_def)
 {
 	return tuple_compare_with_key(tuple, key, key_def->part_count,
 				      key_def) == 0;
diff --git a/src/box/memtx_rtree.c b/src/box/memtx_rtree.c
index 0b12cda..f2aa6c3 100644
--- a/src/box/memtx_rtree.c
+++ b/src/box/memtx_rtree.c
@@ -112,7 +112,8 @@ extract_rectangle(struct rtree_rect *rect, const struct tuple *tuple,
 		  struct index_def *index_def)
 {
 	assert(index_def->key_def->part_count == 1);
-	const char *elems = tuple_field(tuple, index_def->key_def->parts[0].fieldno);
+	const char *elems = tuple_field_by_part(tuple,
+						index_def->key_def->parts);
 	unsigned dimension = index_def->opts.dimension;
 	uint32_t count = mp_decode_array(&elems);
 	return mp_decode_rect(rect, dimension, elems, count, "Field");
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 08ae0da..8b30fc2 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -884,7 +884,7 @@ static const struct space_vtab memtx_space_vtab = {
 
 struct space *
 memtx_space_new(struct memtx_engine *memtx,
-		struct space_def *def, struct rlist *key_list)
+		struct space_def *def, struct rlist *key_list, uint64_t epoch)
 {
 	struct memtx_space *memtx_space = malloc(sizeof(*memtx_space));
 	if (memtx_space == NULL) {
@@ -918,6 +918,7 @@ memtx_space_new(struct memtx_engine *memtx,
 	format->engine = memtx;
 	format->is_temporary = def->opts.is_temporary;
 	format->exact_field_count = def->exact_field_count;
+	format->epoch = ++epoch;
 	tuple_format_ref(format);
 
 	if (space_create((struct space *)memtx_space, (struct engine *)memtx,
diff --git a/src/box/memtx_space.h b/src/box/memtx_space.h
index 7dc3410..b5bec0c 100644
--- a/src/box/memtx_space.h
+++ b/src/box/memtx_space.h
@@ -79,7 +79,7 @@ memtx_space_replace_all_keys(struct space *, struct tuple *, struct tuple *,
 
 struct space *
 memtx_space_new(struct memtx_engine *memtx,
-		struct space_def *def, struct rlist *key_list);
+		struct space_def *def, struct rlist *key_list, uint64_t epoch);
 
 #if defined(__cplusplus)
 } /* extern "C" */
diff --git a/src/box/schema.cc b/src/box/schema.cc
index 7f20f36..e52e19d 100644
--- a/src/box/schema.cc
+++ b/src/box/schema.cc
@@ -214,7 +214,7 @@ sc_space_new(uint32_t id, const char *name, struct key_def *key_def,
 	struct rlist key_list;
 	rlist_create(&key_list);
 	rlist_add_entry(&key_list, index_def, link);
-	struct space *space = space_new_xc(def, &key_list);
+	struct space *space = space_new_xc(def, &key_list, 0);
 	(void) space_cache_replace(space);
 	if (replace_trigger)
 		trigger_add(&space->on_replace, replace_trigger);
@@ -380,7 +380,7 @@ schema_init()
 			space_def_delete(def);
 		});
 		RLIST_HEAD(key_list);
-		struct space *space = space_new_xc(def, &key_list);
+		struct space *space = space_new_xc(def, &key_list, 0);
 		space_cache_replace(space);
 		init_system_space(space);
 		trigger_run_xc(&on_alter_space, space);
diff --git a/src/box/space.c b/src/box/space.c
index 871cc67..2e4df74 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -181,12 +181,12 @@ fail:
 }
 
 struct space *
-space_new(struct space_def *def, struct rlist *key_list)
+space_new(struct space_def *def, struct rlist *key_list, uint64_t epoch)
 {
 	struct engine *engine = engine_find(def->engine_name);
 	if (engine == NULL)
 		return NULL;
-	return engine_create_space(engine, def, key_list);
+	return engine_create_space(engine, def, key_list, epoch);
 }
 
 void
diff --git a/src/box/space.h b/src/box/space.h
index 8888ec8..068ea4b 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -378,10 +378,11 @@ struct field_def;
  * Allocate and initialize a space.
  * @param space_def Space definition.
  * @param key_list List of index_defs.
+ * @param epoch Last epoch to initialize format.
  * @retval Space object.
  */
 struct space *
-space_new(struct space_def *space_def, struct rlist *key_list);
+space_new(struct space_def *space_def, struct rlist *key_list, uint64_t epoch);
 
 /** Destroy and free a space. */
 void
@@ -416,9 +417,10 @@ int generic_space_prepare_alter(struct space *, struct space *);
 } /* extern "C" */
 
 static inline struct space *
-space_new_xc(struct space_def *space_def, struct rlist *key_list)
+space_new_xc(struct space_def *space_def, struct rlist *key_list,
+	     uint64_t epoch)
 {
-	struct space *space = space_new(space_def, key_list);
+	struct space *space = space_new(space_def, key_list, epoch);
 	if (space == NULL)
 		diag_raise();
 	return space;
diff --git a/src/box/sysview.c b/src/box/sysview.c
index a636c68..d35ff71 100644
--- a/src/box/sysview.c
+++ b/src/box/sysview.c
@@ -504,8 +504,9 @@ sysview_engine_shutdown(struct engine *engine)
 
 static struct space *
 sysview_engine_create_space(struct engine *engine, struct space_def *def,
-			    struct rlist *key_list)
+			    struct rlist *key_list, uint64_t epoch)
 {
+	(void)epoch;
 	struct space *space = (struct space *)calloc(1, sizeof(*space));
 	if (space == NULL) {
 		diag_set(OutOfMemory, sizeof(*space),
diff --git a/src/box/tuple.h b/src/box/tuple.h
index 2e84516..b638f50 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -43,6 +43,7 @@ extern "C" {
 
 struct slab_arena;
 struct quota;
+struct key_part;
 
 /**
  * A format for standalone tuples allocated on runtime arena.
@@ -522,6 +523,19 @@ tuple_field(const struct tuple *tuple, uint32_t fieldno)
 }
 
 /**
+ * Get a field referred to by index @part in tuple.
+ * @param tuple Tuple to get the field from.
+ * @param part Index part to use.
+ * @retval Field data if the field exists or NULL.
+ */
+static inline const char *
+tuple_field_by_part(const struct tuple *tuple, struct key_part *part)
+{
+	return tuple_field_by_part_raw(tuple_format(tuple), tuple_data(tuple),
+				       tuple_field_map(tuple), part);
+}
+
+/**
  * Get tuple field by its JSON path.
  * @param tuple Tuple to get field from.
  * @param path Field JSON path.
diff --git a/src/box/tuple_bloom.c b/src/box/tuple_bloom.c
index ffad151..dc40698 100644
--- a/src/box/tuple_bloom.c
+++ b/src/box/tuple_bloom.c
@@ -74,8 +74,7 @@ tuple_bloom_builder_delete(struct tuple_bloom_builder *builder)
 
 int
 tuple_bloom_builder_add(struct tuple_bloom_builder *builder,
-			const struct tuple *tuple,
-			const struct key_def *key_def,
+			const struct tuple *tuple, struct key_def *key_def,
 			uint32_t hashed_parts)
 {
 	assert(builder->part_count == key_def->part_count);
@@ -168,8 +167,7 @@ tuple_bloom_delete(struct tuple_bloom *bloom)
 
 bool
 tuple_bloom_maybe_has(const struct tuple_bloom *bloom,
-		      const struct tuple *tuple,
-		      const struct key_def *key_def)
+		      const struct tuple *tuple, struct key_def *key_def)
 {
 	if (bloom->is_legacy) {
 		return bloom_maybe_has(&bloom->parts[0],
@@ -195,7 +193,7 @@ tuple_bloom_maybe_has(const struct tuple_bloom *bloom,
 bool
 tuple_bloom_maybe_has_key(const struct tuple_bloom *bloom,
 			  const char *key, uint32_t part_count,
-			  const struct key_def *key_def)
+			  struct key_def *key_def)
 {
 	if (bloom->is_legacy) {
 		if (part_count < key_def->part_count)
diff --git a/src/box/tuple_bloom.h b/src/box/tuple_bloom.h
index 505933d..b05fee1 100644
--- a/src/box/tuple_bloom.h
+++ b/src/box/tuple_bloom.h
@@ -117,8 +117,7 @@ tuple_bloom_builder_delete(struct tuple_bloom_builder *builder);
  */
 int
 tuple_bloom_builder_add(struct tuple_bloom_builder *builder,
-			const struct tuple *tuple,
-			const struct key_def *key_def,
+			const struct tuple *tuple, struct key_def *key_def,
 			uint32_t hashed_parts);
 
 /**
@@ -147,8 +146,7 @@ tuple_bloom_delete(struct tuple_bloom *bloom);
  */
 bool
 tuple_bloom_maybe_has(const struct tuple_bloom *bloom,
-		      const struct tuple *tuple,
-		      const struct key_def *key_def);
+		      const struct tuple *tuple, struct key_def *key_def);
 
 /**
  * Check if a tuple matching a key was stored in a tuple bloom filter.
@@ -162,7 +160,7 @@ tuple_bloom_maybe_has(const struct tuple_bloom *bloom,
 bool
 tuple_bloom_maybe_has_key(const struct tuple_bloom *bloom,
 			  const char *key, uint32_t part_count,
-			  const struct key_def *key_def);
+			  struct key_def *key_def);
 
 /**
  * Return the size of a tuple bloom filter when encoded.
diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index e53afba..b14ac35 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -426,15 +426,24 @@ tuple_compare_field_with_hint(const char *field_a, enum mp_type a_type,
 }
 
 uint32_t
-tuple_common_key_parts(const struct tuple *tuple_a,
-		       const struct tuple *tuple_b,
-		       const struct key_def *key_def)
+tuple_common_key_parts(const struct tuple *tuple_a, const struct tuple *tuple_b,
+		       struct key_def *key_def)
 {
 	uint32_t i;
+	struct tuple_format *tuple_a_format = tuple_format(tuple_a);
+	struct tuple_format *tuple_b_format = tuple_format(tuple_b);
+	const char *tuple_a_raw = tuple_data(tuple_a);
+	const char *tuple_b_raw = tuple_data(tuple_b);
+	const uint32_t *tuple_a_field_map = tuple_field_map(tuple_a);
+	const uint32_t *tuple_b_field_map = tuple_field_map(tuple_b);
 	for (i = 0; i < key_def->part_count; i++) {
-		const struct key_part *part = &key_def->parts[i];
-		const char *field_a = tuple_field(tuple_a, part->fieldno);
-		const char *field_b = tuple_field(tuple_b, part->fieldno);
+		struct key_part *part = (struct key_part *)&key_def->parts[i];
+		const char *field_a =
+			tuple_field_by_part_raw(tuple_a_format, tuple_a_raw,
+						tuple_a_field_map, part);
+		const char *field_b =
+			tuple_field_by_part_raw(tuple_b_format, tuple_b_raw,
+						tuple_b_field_map, part);
 		enum mp_type a_type = field_a != NULL ?
 				      mp_typeof(*field_a) : MP_NIL;
 		enum mp_type b_type = field_b != NULL ?
@@ -449,15 +458,15 @@ tuple_common_key_parts(const struct tuple *tuple_a,
 	return i;
 }
 
-template<bool is_nullable, bool has_optional_parts>
+template<bool is_nullable, bool has_optional_parts, bool has_json_path>
 static inline int
 tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
-		       const struct key_def *key_def)
+		       struct key_def *key_def)
 {
 	assert(!has_optional_parts || is_nullable);
 	assert(is_nullable == key_def->is_nullable);
 	assert(has_optional_parts == key_def->has_optional_parts);
-	const struct key_part *part = key_def->parts;
+	struct key_part *part = key_def->parts;
 	const char *tuple_a_raw = tuple_data(tuple_a);
 	const char *tuple_b_raw = tuple_data(tuple_b);
 	if (key_def->part_count == 1 && part->fieldno == 0) {
@@ -488,7 +497,7 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 	const struct tuple_format *format_b = tuple_format(tuple_b);
 	const uint32_t *field_map_a = tuple_field_map(tuple_a);
 	const uint32_t *field_map_b = tuple_field_map(tuple_b);
-	const struct key_part *end;
+	struct key_part *end;
 	const char *field_a, *field_b;
 	enum mp_type a_type, b_type;
 	int rc;
@@ -498,10 +507,19 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 		end = part + key_def->part_count;
 
 	for (; part < end; part++) {
-		field_a = tuple_field_raw(format_a, tuple_a_raw, field_map_a,
-					  part->fieldno);
-		field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b,
-					  part->fieldno);
+		if (!has_json_path) {
+			field_a = tuple_field_raw(format_a, tuple_a_raw,
+						  field_map_a,
+						  part->fieldno);
+			field_b = tuple_field_raw(format_b, tuple_b_raw,
+						  field_map_b,
+						  part->fieldno);
+		} else {
+			field_a = tuple_field_by_part_raw(format_a, tuple_a_raw,
+							  field_map_a, part);
+			field_b = tuple_field_by_part_raw(format_b, tuple_b_raw,
+							  field_map_b, part);
+		}
 		assert(has_optional_parts ||
 		       (field_a != NULL && field_b != NULL));
 		if (! is_nullable) {
@@ -548,10 +566,19 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 	 */
 	end = key_def->parts + key_def->part_count;
 	for (; part < end; ++part) {
-		field_a = tuple_field_raw(format_a, tuple_a_raw, field_map_a,
-					  part->fieldno);
-		field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b,
-					  part->fieldno);
+		if (!has_json_path) {
+			field_a = tuple_field_raw(format_a, tuple_a_raw,
+						  field_map_a,
+						  part->fieldno);
+			field_b = tuple_field_raw(format_b, tuple_b_raw,
+						  field_map_b,
+						  part->fieldno);
+		} else {
+			field_a = tuple_field_by_part_raw(format_a, tuple_a_raw,
+							  field_map_a, part);
+			field_b = tuple_field_by_part_raw(format_b, tuple_b_raw,
+							  field_map_b, part);
+		}
 		/*
 		 * Extended parts are primary, and they can not
 		 * be absent or be NULLs.
@@ -565,26 +592,30 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 	return 0;
 }
 
-template<bool is_nullable, bool has_optional_parts>
+template<bool is_nullable, bool has_optional_parts, bool has_json_paths>
 static inline int
 tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
-				uint32_t part_count,
-				const struct key_def *key_def)
+				uint32_t part_count, struct key_def *key_def)
 {
 	assert(!has_optional_parts || is_nullable);
 	assert(is_nullable == key_def->is_nullable);
 	assert(has_optional_parts == key_def->has_optional_parts);
 	assert(key != NULL || part_count == 0);
 	assert(part_count <= key_def->part_count);
-	const struct key_part *part = key_def->parts;
+	struct key_part *part = key_def->parts;
 	const struct tuple_format *format = tuple_format(tuple);
 	const char *tuple_raw = tuple_data(tuple);
 	const uint32_t *field_map = tuple_field_map(tuple);
 	enum mp_type a_type, b_type;
 	if (likely(part_count == 1)) {
 		const char *field;
-		field = tuple_field_raw(format, tuple_raw, field_map,
-					part->fieldno);
+		if (!has_json_paths) {
+			field = tuple_field_raw(format, tuple_raw, field_map,
+						part->fieldno);
+		} else {
+			field = tuple_field_by_part_raw(format, tuple_raw,
+							field_map, part);
+		}
 		if (! is_nullable) {
 			return tuple_compare_field(field, key, part->type,
 						   part->coll);
@@ -605,12 +636,17 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 		}
 	}
 
-	const struct key_part *end = part + part_count;
+	struct key_part *end = part + part_count;
 	int rc;
 	for (; part < end; ++part, mp_next(&key)) {
 		const char *field;
-		field = tuple_field_raw(format, tuple_raw, field_map,
-					part->fieldno);
+		if (!has_json_paths) {
+			field = tuple_field_raw(format, tuple_raw, field_map,
+						part->fieldno);
+		} else {
+			field = tuple_field_by_part_raw(format, tuple_raw,
+							field_map, part);
+		}
 		if (! is_nullable) {
 			rc = tuple_compare_field(field, key, part->type,
 						 part->coll);
@@ -643,11 +679,11 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 template<bool is_nullable>
 static inline int
 key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
-		  const struct key_def *key_def)
+		  struct key_def *key_def)
 {
 	assert(is_nullable == key_def->is_nullable);
 	assert((key_a != NULL && key_b != NULL) || part_count == 0);
-	const struct key_part *part = key_def->parts;
+	struct key_part *part = key_def->parts;
 	if (likely(part_count == 1)) {
 		if (! is_nullable) {
 			return tuple_compare_field(key_a, key_b, part->type,
@@ -667,7 +703,7 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
 		}
 	}
 
-	const struct key_part *end = part + part_count;
+	struct key_part *end = part + part_count;
 	int rc;
 	for (; part < end; ++part, mp_next(&key_a), mp_next(&key_b)) {
 		if (! is_nullable) {
@@ -699,8 +735,7 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
 template<bool is_nullable, bool has_optional_parts>
 static inline int
 tuple_compare_with_key_sequential(const struct tuple *tuple, const char *key,
-				  uint32_t part_count,
-				  const struct key_def *key_def)
+				  uint32_t part_count, struct key_def *key_def)
 {
 	assert(!has_optional_parts || is_nullable);
 	assert(key_def_is_sequential(key_def));
@@ -739,8 +774,7 @@ tuple_compare_with_key_sequential(const struct tuple *tuple, const char *key,
 }
 
 int
-key_compare(const char *key_a, const char *key_b,
-	    const struct key_def *key_def)
+key_compare(const char *key_a, const char *key_b, struct key_def *key_def)
 {
 	uint32_t part_count_a = mp_decode_array(&key_a);
 	uint32_t part_count_b = mp_decode_array(&key_b);
@@ -760,8 +794,7 @@ key_compare(const char *key_a, const char *key_b,
 template <bool is_nullable, bool has_optional_parts>
 static int
 tuple_compare_sequential(const struct tuple *tuple_a,
-			 const struct tuple *tuple_b,
-			 const struct key_def *key_def)
+			 const struct tuple *tuple_b, struct key_def *key_def)
 {
 	assert(!has_optional_parts || is_nullable);
 	assert(has_optional_parts == key_def->has_optional_parts);
@@ -778,8 +811,8 @@ tuple_compare_sequential(const struct tuple *tuple_a,
 						key_def->part_count, key_def);
 	}
 	bool was_null_met = false;
-	const struct key_part *part = key_def->parts;
-	const struct key_part *end = part + key_def->unique_part_count;
+	struct key_part *part = key_def->parts;
+	struct key_part *end = part + key_def->unique_part_count;
 	int rc;
 	uint32_t i = 0;
 	for (; part < end; ++part, ++i) {
@@ -944,7 +977,7 @@ struct TupleCompare
 {
 	static int compare(const struct tuple *tuple_a,
 			   const struct tuple *tuple_b,
-			   const struct key_def *)
+			   struct key_def *)
 	{
 		struct tuple_format *format_a = tuple_format(tuple_a);
 		struct tuple_format *format_b = tuple_format(tuple_b);
@@ -963,7 +996,7 @@ template <int TYPE, int ...MORE_TYPES>
 struct TupleCompare<0, TYPE, MORE_TYPES...> {
 	static int compare(const struct tuple *tuple_a,
 			   const struct tuple *tuple_b,
-			   const struct key_def *)
+			   struct key_def *)
 	{
 		struct tuple_format *format_a = tuple_format(tuple_a);
 		struct tuple_format *format_b = tuple_format(tuple_b);
@@ -1016,9 +1049,9 @@ tuple_compare_create(const struct key_def *def)
 			else
 				return tuple_compare_sequential<true, false>;
 		} else if (def->has_optional_parts) {
-			return tuple_compare_slowpath<true, true>;
+			return tuple_compare_slowpath<true, true, false>;
 		} else {
-			return tuple_compare_slowpath<true, false>;
+			return tuple_compare_slowpath<true, false, false>;
 		}
 	}
 	assert(! def->has_optional_parts);
@@ -1041,7 +1074,7 @@ tuple_compare_create(const struct key_def *def)
 	if (key_def_is_sequential(def))
 		return tuple_compare_sequential<false, false>;
 	else
-		return tuple_compare_slowpath<false, false>;
+		return tuple_compare_slowpath<false, false, false>;
 }
 
 /* }}} tuple_compare */
@@ -1115,7 +1148,7 @@ struct FieldCompareWithKey<FLD_ID, IDX, TYPE, IDX2, TYPE2, MORE_TYPES...>
 {
 	inline static int
 	compare(const struct tuple *tuple, const char *key,
-		uint32_t part_count, const struct key_def *key_def,
+		uint32_t part_count, struct key_def *key_def,
 		const struct tuple_format *format, const char *field)
 	{
 		int r;
@@ -1141,11 +1174,11 @@ struct FieldCompareWithKey<FLD_ID, IDX, TYPE, IDX2, TYPE2, MORE_TYPES...>
 template <int FLD_ID, int IDX, int TYPE>
 struct FieldCompareWithKey<FLD_ID, IDX, TYPE> {
 	inline static int compare(const struct tuple *,
-					 const char *key,
-					 uint32_t,
-					 const struct key_def *,
-					 const struct tuple_format *,
-					 const char *field)
+				  const char *key,
+				  uint32_t,
+				  struct key_def *,
+				  const struct tuple_format *,
+				  const char *field)
 	{
 		return field_compare_with_key<TYPE>(&field, &key);
 	}
@@ -1159,7 +1192,7 @@ struct TupleCompareWithKey
 {
 	static int
 	compare(const struct tuple *tuple, const char *key,
-		uint32_t part_count, const struct key_def *key_def)
+		uint32_t part_count, struct key_def *key_def)
 	{
 		/* Part count can be 0 in wildcard searches. */
 		if (part_count == 0)
@@ -1180,7 +1213,7 @@ struct TupleCompareWithKey<0, 0, TYPE, MORE_TYPES...>
 	static int compare(const struct tuple *tuple,
 				  const char *key,
 				  uint32_t part_count,
-				  const struct key_def *key_def)
+				  struct key_def *key_def)
 	{
 		/* Part count can be 0 in wildcard searches. */
 		if (part_count == 0)
@@ -1236,9 +1269,9 @@ tuple_compare_with_key_create(const struct key_def *def)
 									 false>;
 			}
 		} else if (def->has_optional_parts) {
-			return tuple_compare_with_key_slowpath<true, true>;
+			return tuple_compare_with_key_slowpath<true, true, false>;
 		} else {
-			return tuple_compare_with_key_slowpath<true, false>;
+			return tuple_compare_with_key_slowpath<true, false, false>;
 		}
 	}
 	assert(! def->has_optional_parts);
@@ -1264,7 +1297,7 @@ tuple_compare_with_key_create(const struct key_def *def)
 	if (key_def_is_sequential(def))
 		return tuple_compare_with_key_sequential<false, false>;
 	else
-		return tuple_compare_with_key_slowpath<false, false>;
+		return tuple_compare_with_key_slowpath<false, false, false>;
 }
 
 /* }}} tuple_compare_with_key */
diff --git a/src/box/tuple_compare.h b/src/box/tuple_compare.h
index 2a9875a..e3a6320 100644
--- a/src/box/tuple_compare.h
+++ b/src/box/tuple_compare.h
@@ -49,9 +49,8 @@ extern "C" {
  * @return number of key parts the two tuples have in common
  */
 uint32_t
-tuple_common_key_parts(const struct tuple *tuple_a,
-		       const struct tuple *tuple_b,
-		       const struct key_def *key_def);
+tuple_common_key_parts(const struct tuple *tuple_a, const struct tuple *tuple_b,
+		       struct key_def *key_def);
 
 /**
  * Create a comparison function for the key_def
diff --git a/src/box/tuple_extract_key.cc b/src/box/tuple_extract_key.cc
index 880abb6..6b771f3 100644
--- a/src/box/tuple_extract_key.cc
+++ b/src/box/tuple_extract_key.cc
@@ -22,8 +22,7 @@ key_def_contains_sequential_parts(const struct key_def *def)
 template <bool has_optional_parts>
 static char *
 tuple_extract_key_sequential_raw(const char *data, const char *data_end,
-				 const struct key_def *key_def,
-				 uint32_t *key_size)
+				 struct key_def *key_def, uint32_t *key_size)
 {
 	assert(!has_optional_parts || key_def->is_nullable);
 	assert(key_def_is_sequential(key_def));
@@ -72,8 +71,7 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end,
  */
 template <bool has_optional_parts>
 static inline char *
-tuple_extract_key_sequential(const struct tuple *tuple,
-			     const struct key_def *key_def,
+tuple_extract_key_sequential(const struct tuple *tuple, struct key_def *key_def,
 			     uint32_t *key_size)
 {
 	assert(key_def_is_sequential(key_def));
@@ -91,10 +89,11 @@ tuple_extract_key_sequential(const struct tuple *tuple,
  * General-purpose implementation of tuple_extract_key()
  * @copydoc tuple_extract_key()
  */
-template <bool contains_sequential_parts, bool has_optional_parts>
+template <bool contains_sequential_parts, bool has_optional_parts,
+          bool has_json_paths>
 static char *
 tuple_extract_key_slowpath(const struct tuple *tuple,
-			   const struct key_def *key_def, uint32_t *key_size)
+			   struct key_def *key_def, uint32_t *key_size)
 {
 	assert(!has_optional_parts || key_def->is_nullable);
 	assert(has_optional_parts == key_def->has_optional_parts);
@@ -110,9 +109,14 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
 
 	/* Calculate the key size. */
 	for (uint32_t i = 0; i < part_count; ++i) {
-		const char *field =
-			tuple_field_raw(format, data, field_map,
-					key_def->parts[i].fieldno);
+		const char *field;
+		if (!has_json_paths) {
+			field = tuple_field_raw(format, data, field_map,
+						key_def->parts[i].fieldno);
+		} else {
+			field = tuple_field_by_part_raw(format, data, field_map,
+							&key_def->parts[i]);
+		}
 		if (has_optional_parts && field == NULL) {
 			bsize += mp_sizeof_nil();
 			continue;
@@ -152,9 +156,14 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
 	}
 	char *key_buf = mp_encode_array(key, part_count);
 	for (uint32_t i = 0; i < part_count; ++i) {
-		const char *field =
-			tuple_field_raw(format, data, field_map,
-					key_def->parts[i].fieldno);
+		const char *field;
+		if (!has_json_paths) {
+			field = tuple_field_raw(format, data, field_map,
+						key_def->parts[i].fieldno);
+		} else {
+			field = tuple_field_by_part_raw(format, data, field_map,
+							&key_def->parts[i]);
+		}
 		if (has_optional_parts && field == NULL) {
 			key_buf = mp_encode_nil(key_buf);
 			continue;
@@ -201,11 +210,10 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
  * General-purpose version of tuple_extract_key_raw()
  * @copydoc tuple_extract_key_raw()
  */
-template <bool has_optional_parts>
+template <bool has_optional_parts, bool has_json_paths>
 static char *
 tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
-			       const struct key_def *key_def,
-			       uint32_t *key_size)
+			       struct key_def *key_def, uint32_t *key_size)
 {
 	assert(!has_optional_parts || key_def->is_nullable);
 	assert(has_optional_parts == key_def->has_optional_parts);
@@ -318,18 +326,21 @@ tuple_extract_key_set(struct key_def *key_def)
 			assert(key_def->is_nullable);
 			if (key_def_contains_sequential_parts(key_def)) {
 				key_def->tuple_extract_key =
-					tuple_extract_key_slowpath<true, true>;
+					tuple_extract_key_slowpath<true, true,
+								   false>;
 			} else {
 				key_def->tuple_extract_key =
-					tuple_extract_key_slowpath<false, true>;
+					tuple_extract_key_slowpath<false, true,
+								   false>;
 			}
 		} else {
 			if (key_def_contains_sequential_parts(key_def)) {
 				key_def->tuple_extract_key =
-					tuple_extract_key_slowpath<true, false>;
+					tuple_extract_key_slowpath<true, false,
+								   false>;
 			} else {
 				key_def->tuple_extract_key =
-					tuple_extract_key_slowpath<false,
+					tuple_extract_key_slowpath<false, false,
 								   false>;
 			}
 		}
@@ -337,9 +348,9 @@ tuple_extract_key_set(struct key_def *key_def)
 	if (key_def->has_optional_parts) {
 		assert(key_def->is_nullable);
 		key_def->tuple_extract_key_raw =
-			tuple_extract_key_slowpath_raw<true>;
+			tuple_extract_key_slowpath_raw<true, false>;
 	} else {
 		key_def->tuple_extract_key_raw =
-			tuple_extract_key_slowpath_raw<false>;
+			tuple_extract_key_slowpath_raw<false, false>;
 	}
 }
diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
index b385c0d..6ae96e2 100644
--- a/src/box/tuple_format.c
+++ b/src/box/tuple_format.c
@@ -232,6 +232,11 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count,
 		format->dict = dict;
 		tuple_dictionary_ref(dict);
 	}
+	/*
+	 * Set an invalid epoch; it is expected to be replaced
+	 * with a valid one when the format is attached to a space.
+	 */
+	format->epoch = 0;
 	format->refs = 0;
 	format->id = FORMAT_ID_NIL;
 	format->field_count = field_count;
@@ -541,6 +546,13 @@ tuple_field_go_to_key(const char **field, const char *key, int len)
 	return -1;
 }
 
+const char *
+tuple_field_by_part_raw(const struct tuple_format *format, const char *data,
+			const uint32_t *field_map, struct key_part *part)
+{
+	return tuple_field_raw(format, data, field_map, part->fieldno);
+}
+
 int
 tuple_field_raw_by_path(struct tuple_format *format, const char *tuple,
                         const uint32_t *field_map, const char *path,
diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h
index c7dc48f..9406d5b 100644
--- a/src/box/tuple_format.h
+++ b/src/box/tuple_format.h
@@ -115,6 +115,12 @@ struct tuple_field {
  * Tuple format describes how tuple is stored and information about its fields
  */
 struct tuple_format {
+	/**
+	 * Counter that is incremented on space rebuild if the
+	 * new format has a different distribution of offset slots
+	 * compared with the previous one.
+	 */
+	uint64_t epoch;
 	/** Virtual function table */
 	struct tuple_format_vtab vtab;
 	/** Pointer to engine-specific data. */
@@ -324,6 +330,18 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
 		     const char *tuple);
 
 /**
+ * Get a field referred to by index @part in a tuple.
+ * @param format Tuple format.
+ * @param data A pointer to MessagePack array.
+ * @param field_map A pointer to the LAST element of field map.
+ * @param part Index part to use.
+ * @retval Field data if the field exists or NULL.
+ */
+const char *
+tuple_field_by_part_raw(const struct tuple_format *format, const char *data,
+			const uint32_t *field_map, struct key_part *part);
+
+/**
  * Get a field at the specific position in this MessagePack array.
  * Returns a pointer to MessagePack data.
  * @param format tuple format
diff --git a/src/box/tuple_hash.cc b/src/box/tuple_hash.cc
index dee9be3..01a0983 100644
--- a/src/box/tuple_hash.cc
+++ b/src/box/tuple_hash.cc
@@ -103,7 +103,7 @@ struct KeyFieldHash<TYPE> {
 
 template <int TYPE, int ...MORE_TYPES>
 struct KeyHash {
-	static uint32_t hash(const char *key, const struct key_def *)
+	static uint32_t hash(const char *key, struct key_def *)
 	{
 		uint32_t h = HASH_SEED;
 		uint32_t carry = 0;
@@ -116,7 +116,7 @@ struct KeyHash {
 
 template <>
 struct KeyHash<FIELD_TYPE_UNSIGNED> {
-	static uint32_t hash(const char *key, const struct key_def *key_def)
+	static uint32_t hash(const char *key, struct key_def *key_def)
 	{
 		uint64_t val = mp_decode_uint(&key);
 		(void) key_def;
@@ -152,12 +152,13 @@ template <int TYPE, int ...MORE_TYPES>
 struct TupleHash
 {
 	static uint32_t hash(const struct tuple *tuple,
-			     const struct key_def *key_def)
+			     struct key_def *key_def)
 	{
 		uint32_t h = HASH_SEED;
 		uint32_t carry = 0;
 		uint32_t total_size = 0;
-		const char *field = tuple_field(tuple, key_def->parts->fieldno);
+		const char *field =
+			tuple_field_by_part(tuple, key_def->parts);
 		TupleFieldHash<TYPE, MORE_TYPES...>::
 			hash(&field, &h, &carry, &total_size);
 		return PMurHash32_Result(h, carry, total_size);
@@ -167,9 +168,10 @@ struct TupleHash
 template <>
 struct TupleHash<FIELD_TYPE_UNSIGNED> {
 	static uint32_t	hash(const struct tuple *tuple,
-			     const struct key_def *key_def)
+			     struct key_def *key_def)
 	{
-		const char *field = tuple_field(tuple, key_def->parts->fieldno);
+		const char *field =
+			tuple_field_by_part(tuple, key_def->parts);
 		uint64_t val = mp_decode_uint(&field);
 		if (likely(val <= UINT32_MAX))
 			return val;
@@ -211,12 +213,12 @@ static const hasher_signature hash_arr[] = {
 
 #undef HASHER
 
-template <bool has_optional_parts>
+template <bool has_optional_parts, bool has_json_paths>
 uint32_t
-tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def);
+tuple_hash_slowpath(const struct tuple *tuple, struct key_def *key_def);
 
 uint32_t
-key_hash_slowpath(const char *key, const struct key_def *key_def);
+key_hash_slowpath(const char *key, struct key_def *key_def);
 
 void
 tuple_hash_func_set(struct key_def *key_def) {
@@ -255,9 +257,9 @@ tuple_hash_func_set(struct key_def *key_def) {
 
 slowpath:
 	if (key_def->has_optional_parts)
-		key_def->tuple_hash = tuple_hash_slowpath<true>;
+		key_def->tuple_hash = tuple_hash_slowpath<true, false>;
 	else
-		key_def->tuple_hash = tuple_hash_slowpath<false>;
+		key_def->tuple_hash = tuple_hash_slowpath<false, false>;
 	key_def->key_hash = key_hash_slowpath;
 }
 
@@ -308,26 +310,34 @@ tuple_hash_null(uint32_t *ph1, uint32_t *pcarry)
 }
 
 uint32_t
-tuple_hash_key_part(uint32_t *ph1, uint32_t *pcarry,
-		    const struct tuple *tuple,
-		    const struct key_part *part)
+tuple_hash_key_part(uint32_t *ph1, uint32_t *pcarry, const struct tuple *tuple,
+		    struct key_part *part)
 {
-	const char *field = tuple_field(tuple, part->fieldno);
+	const char *field = tuple_field_by_part(tuple, part);
 	if (field == NULL)
 		return tuple_hash_null(ph1, pcarry);
 	return tuple_hash_field(ph1, pcarry, &field, part->coll);
 }
 
-template <bool has_optional_parts>
+template <bool has_optional_parts, bool has_json_paths>
 uint32_t
-tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
+tuple_hash_slowpath(const struct tuple *tuple, struct key_def *key_def)
 {
 	assert(has_optional_parts == key_def->has_optional_parts);
 	uint32_t h = HASH_SEED;
 	uint32_t carry = 0;
 	uint32_t total_size = 0;
 	uint32_t prev_fieldno = key_def->parts[0].fieldno;
-	const char *field = tuple_field(tuple, key_def->parts[0].fieldno);
+	struct tuple_format *format = tuple_format(tuple);
+	const char *tuple_raw = tuple_data(tuple);
+	const uint32_t *field_map = tuple_field_map(tuple);
+	const char *field;
+	if (!has_json_paths) {
+		field = tuple_field(tuple, prev_fieldno);
+	} else {
+		field = tuple_field_by_part_raw(format, tuple_raw, field_map,
+						key_def->parts);
+	}
 	const char *end = (char *)tuple + tuple_size(tuple);
 	if (has_optional_parts && field == NULL) {
 		total_size += tuple_hash_null(&h, &carry);
@@ -341,7 +351,18 @@ tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
 		 * need of tuple_field
 		 */
 		if (prev_fieldno + 1 != key_def->parts[part_id].fieldno) {
-			field = tuple_field(tuple, key_def->parts[part_id].fieldno);
+			if (!has_json_paths) {
+				field = tuple_field(tuple,
+						    key_def->parts[part_id].
+						    fieldno);
+			} else {
+				struct key_part *part =
+					&key_def->parts[part_id];
+				field = tuple_field_by_part_raw(format,
+								tuple_raw,
+								field_map,
+								part);
+			}
 		}
 		if (has_optional_parts && (field == NULL || field >= end)) {
 			total_size += tuple_hash_null(&h, &carry);
@@ -357,13 +378,13 @@ tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
 }
 
 uint32_t
-key_hash_slowpath(const char *key, const struct key_def *key_def)
+key_hash_slowpath(const char *key, struct key_def *key_def)
 {
 	uint32_t h = HASH_SEED;
 	uint32_t carry = 0;
 	uint32_t total_size = 0;
 
-	for (const struct key_part *part = key_def->parts;
+	for (struct key_part *part = key_def->parts;
 	     part < key_def->parts + key_def->part_count; part++) {
 		total_size += tuple_hash_field(&h, &carry, &key, part->coll);
 	}
diff --git a/src/box/tuple_hash.h b/src/box/tuple_hash.h
index aab8f54..abc961b 100644
--- a/src/box/tuple_hash.h
+++ b/src/box/tuple_hash.h
@@ -70,9 +70,8 @@ tuple_hash_field(uint32_t *ph1, uint32_t *pcarry, const char **field,
  * This function updates @ph1 and @pcarry.
  */
 uint32_t
-tuple_hash_key_part(uint32_t *ph1, uint32_t *pcarry,
-		    const struct tuple *tuple,
-		    const struct key_part *part);
+tuple_hash_key_part(uint32_t *ph1, uint32_t *pcarry, const struct tuple *tuple,
+		    struct key_part *part);
 
 /**
  * Calculates a common hash value for a tuple
@@ -81,7 +80,7 @@ tuple_hash_key_part(uint32_t *ph1, uint32_t *pcarry,
  * @return - hash value
  */
 static inline uint32_t
-tuple_hash(const struct tuple *tuple, const struct key_def *key_def)
+tuple_hash(const struct tuple *tuple, struct key_def *key_def)
 {
 	return key_def->tuple_hash(tuple, key_def);
 }
@@ -93,7 +92,7 @@ tuple_hash(const struct tuple *tuple, const struct key_def *key_def)
  * @return - hash value
  */
 static inline uint32_t
-key_hash(const char *key, const struct key_def *key_def)
+key_hash(const char *key, struct key_def *key_def)
 {
 	return key_def->key_hash(key, key_def);
 }
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 0b33b6f..86a33ec 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -557,7 +557,7 @@ vinyl_engine_check_space_def(struct space_def *def)
 
 static struct space *
 vinyl_engine_create_space(struct engine *engine, struct space_def *def,
-			  struct rlist *key_list)
+			  struct rlist *key_list, uint64_t epoch)
 {
 	struct space *space = malloc(sizeof(*space));
 	if (space == NULL) {
@@ -589,6 +589,7 @@ vinyl_engine_create_space(struct engine *engine, struct space_def *def,
 		return NULL;
 	}
 	format->exact_field_count = def->exact_field_count;
+	format->epoch = ++epoch;
 	tuple_format_ref(format);
 
 	if (space_create(space, engine, &vinyl_space_vtab,
diff --git a/src/box/vy_history.c b/src/box/vy_history.c
index 498da97..0f3b711 100644
--- a/src/box/vy_history.c
+++ b/src/box/vy_history.c
@@ -73,7 +73,7 @@ vy_history_cleanup(struct vy_history *history)
 }
 
 int
-vy_history_apply(struct vy_history *history, const struct key_def *cmp_def,
+vy_history_apply(struct vy_history *history, struct key_def *cmp_def,
 		 struct tuple_format *format, bool keep_delete,
 		 int *upserts_applied, struct tuple **ret)
 {
diff --git a/src/box/vy_history.h b/src/box/vy_history.h
index 1f8bb59..e3c5a19 100644
--- a/src/box/vy_history.h
+++ b/src/box/vy_history.h
@@ -154,7 +154,7 @@ vy_history_cleanup(struct vy_history *history);
  * will return NULL unless @keep_delete flag is set.
  */
 int
-vy_history_apply(struct vy_history *history, const struct key_def *cmp_def,
+vy_history_apply(struct vy_history *history, struct key_def *cmp_def,
 		 struct tuple_format *format, bool keep_delete,
 		 int *upserts_applied, struct tuple **ret);
 
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index cb3c436..8fa86d3 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -158,6 +158,7 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_cache_env *cache_env,
 						    NULL);
 		if (lsm->disk_format == NULL)
 			goto fail_format;
+		lsm->disk_format->epoch = format->epoch;
 	}
 	tuple_format_ref(lsm->disk_format);
 
@@ -166,6 +167,7 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_cache_env *cache_env,
 			vy_tuple_format_new_with_colmask(format);
 		if (lsm->mem_format_with_colmask == NULL)
 			goto fail_mem_format_with_colmask;
+		lsm->mem_format_with_colmask->epoch = format->epoch;
 	} else {
 		lsm->mem_format_with_colmask = pk->mem_format_with_colmask;
 	}
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index f9be850..ccd079f 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -97,7 +97,7 @@ vy_mem_tree_extent_free(void *ctx, void *p)
 
 struct vy_mem *
 vy_mem_new(struct vy_mem_env *env, int64_t generation,
-	   const struct key_def *cmp_def, struct tuple_format *format,
+	   struct key_def *cmp_def, struct tuple_format *format,
 	   struct tuple_format *format_with_colmask,
 	   uint32_t space_cache_version)
 {
@@ -321,7 +321,7 @@ vy_mem_iterator_find_lsn(struct vy_mem_iterator *itr,
 {
 	assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
 	assert(itr->curr_stmt == vy_mem_iterator_curr_stmt(itr));
-	const struct key_def *cmp_def = itr->mem->cmp_def;
+	struct key_def *cmp_def = itr->mem->cmp_def;
 	while (vy_stmt_lsn(itr->curr_stmt) > (**itr->read_view).vlsn ||
 	       vy_stmt_flags(itr->curr_stmt) & VY_STMT_SKIP_READ) {
 		if (vy_mem_iterator_step(itr, iterator_type) != 0 ||
@@ -461,7 +461,7 @@ vy_mem_iterator_next_key(struct vy_mem_iterator *itr)
 	assert(itr->mem->version == itr->version);
 	assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
 	assert(itr->curr_stmt == vy_mem_iterator_curr_stmt(itr));
-	const struct key_def *cmp_def = itr->mem->cmp_def;
+	struct key_def *cmp_def = itr->mem->cmp_def;
 
 	const struct tuple *prev_stmt = itr->curr_stmt;
 	do {
@@ -493,7 +493,7 @@ vy_mem_iterator_next_lsn(struct vy_mem_iterator *itr)
 	assert(itr->mem->version == itr->version);
 	assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
 	assert(itr->curr_stmt == vy_mem_iterator_curr_stmt(itr));
-	const struct key_def *cmp_def = itr->mem->cmp_def;
+	struct key_def *cmp_def = itr->mem->cmp_def;
 
 	struct vy_mem_tree_iterator next_pos = itr->curr_pos;
 next:
diff --git a/src/box/vy_mem.h b/src/box/vy_mem.h
index 29b60ac..d6afeed 100644
--- a/src/box/vy_mem.h
+++ b/src/box/vy_mem.h
@@ -88,7 +88,7 @@ struct tree_mem_key {
  */
 static int
 vy_mem_tree_cmp(const struct tuple *a, const struct tuple *b,
-		const struct key_def *cmp_def)
+		struct key_def *cmp_def)
 {
 	int res = vy_tuple_compare(a, b, cmp_def);
 	if (res)
@@ -102,7 +102,7 @@ vy_mem_tree_cmp(const struct tuple *a, const struct tuple *b,
  */
 static int
 vy_mem_tree_cmp_key(const struct tuple *a, struct tree_mem_key *key,
-		    const struct key_def *cmp_def)
+		    struct key_def *cmp_def)
 {
 	int res = vy_stmt_compare(a, key->stmt, cmp_def);
 	if (res == 0) {
@@ -123,7 +123,7 @@ vy_mem_tree_cmp_key(const struct tuple *a, struct tree_mem_key *key,
 #define BPS_TREE_COMPARE_KEY(a, b, cmp_def) vy_mem_tree_cmp_key(a, b, cmp_def)
 #define bps_tree_elem_t const struct tuple *
 #define bps_tree_key_t struct tree_mem_key *
-#define bps_tree_arg_t const struct key_def *
+#define bps_tree_arg_t struct key_def *
 #define BPS_TREE_NO_DEBUG
 
 #include <salad/bps_tree.h>
@@ -183,7 +183,7 @@ struct vy_mem {
 	 * Key definition for this index, extended with primary
 	 * key parts.
 	 */
-	const struct key_def *cmp_def;
+	struct key_def *cmp_def;
 	/** version is initially 0 and is incremented on every write */
 	uint32_t version;
 	/** Data dictionary cache version at the time of creation. */
@@ -265,7 +265,7 @@ vy_mem_wait_pinned(struct vy_mem *mem)
  */
 struct vy_mem *
 vy_mem_new(struct vy_mem_env *env, int64_t generation,
-	   const struct key_def *cmp_def, struct tuple_format *format,
+	   struct key_def *cmp_def, struct tuple_format *format,
 	   struct tuple_format *format_with_colmask,
 	   uint32_t space_cache_version);
 
diff --git a/src/box/vy_range.c b/src/box/vy_range.c
index 6a55a01..6b6d23c 100644
--- a/src/box/vy_range.c
+++ b/src/box/vy_range.c
@@ -178,7 +178,7 @@ vy_range_tree_find_by_key(vy_range_tree_t *tree,
 
 struct vy_range *
 vy_range_new(int64_t id, struct tuple *begin, struct tuple *end,
-	     const struct key_def *cmp_def)
+	     struct key_def *cmp_def)
 {
 	struct vy_range *range = calloc(1, sizeof(*range));
 	if (range == NULL) {
diff --git a/src/box/vy_range.h b/src/box/vy_range.h
index d7031e7..2f67840 100644
--- a/src/box/vy_range.h
+++ b/src/box/vy_range.h
@@ -72,7 +72,7 @@ struct vy_range {
 	 * keys, to ensure an always distinct result for
 	 * non-unique keys.
 	 */
-	const struct key_def *cmp_def;
+	struct key_def *cmp_def;
 	/** An estimate of the number of statements in this range. */
 	struct vy_disk_stmt_counter count;
 	/**
@@ -194,7 +194,7 @@ vy_range_tree_find_by_key(vy_range_tree_t *tree,
  */
 struct vy_range *
 vy_range_new(int64_t id, struct tuple *begin, struct tuple *end,
-	     const struct key_def *cmp_def);
+	     struct key_def *cmp_def);
 
 /**
  * Free a range and all its slices.
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index f107e3a..7485d97 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -302,8 +302,8 @@ vy_run_bloom_size(struct vy_run *run)
  */
 static uint32_t
 vy_page_index_find_page(struct vy_run *run, const struct tuple *key,
-			const struct key_def *cmp_def,
-			enum iterator_type itype, bool *equal_key)
+			struct key_def *cmp_def, enum iterator_type itype,
+			bool *equal_key)
 {
 	if (itype == ITER_EQ)
 		itype = ITER_GE; /* One day it'll become obsolete */
@@ -365,9 +365,8 @@ vy_page_index_find_page(struct vy_run *run, const struct tuple *key,
 }
 
 struct vy_slice *
-vy_slice_new(int64_t id, struct vy_run *run,
-	     struct tuple *begin, struct tuple *end,
-	     const struct key_def *cmp_def)
+vy_slice_new(int64_t id, struct vy_run *run, struct tuple *begin,
+	     struct tuple *end, struct key_def *cmp_def)
 {
 	struct vy_slice *slice = malloc(sizeof(*slice));
 	if (slice == NULL) {
@@ -446,9 +445,8 @@ vy_slice_delete(struct vy_slice *slice)
 }
 
 int
-vy_slice_cut(struct vy_slice *slice, int64_t id,
-	     struct tuple *begin, struct tuple *end,
-	     const struct key_def *cmp_def,
+vy_slice_cut(struct vy_slice *slice, int64_t id, struct tuple *begin,
+	     struct tuple *end, struct key_def *cmp_def,
 	     struct vy_slice **result)
 {
 	*result = NULL;
@@ -1148,7 +1146,7 @@ vy_run_iterator_find_lsn(struct vy_run_iterator *itr,
 			 const struct tuple *key, struct tuple **ret)
 {
 	struct vy_slice *slice = itr->slice;
-	const struct key_def *cmp_def = itr->cmp_def;
+	struct key_def *cmp_def = itr->cmp_def;
 
 	*ret = NULL;
 
@@ -1228,7 +1226,7 @@ vy_run_iterator_do_seek(struct vy_run_iterator *itr,
 	*ret = NULL;
 
 	struct tuple_bloom *bloom = run->info.bloom;
-	const struct key_def *key_def = itr->key_def;
+	struct key_def *key_def = itr->key_def;
 	if (iterator_type == ITER_EQ && bloom != NULL) {
 		bool need_lookup;
 		if (vy_stmt_type(key) == IPROTO_SELECT) {
@@ -1318,7 +1316,7 @@ vy_run_iterator_seek(struct vy_run_iterator *itr,
 		     enum iterator_type iterator_type,
 		     const struct tuple *key, struct tuple **ret)
 {
-	const struct key_def *cmp_def = itr->cmp_def;
+	struct key_def *cmp_def = itr->cmp_def;
 	struct vy_slice *slice = itr->slice;
 	const struct tuple *check_eq_key = NULL;
 	int cmp;
@@ -1392,8 +1390,7 @@ vy_run_iterator_open(struct vy_run_iterator *itr,
 		     struct vy_run_iterator_stat *stat,
 		     struct vy_slice *slice, enum iterator_type iterator_type,
 		     const struct tuple *key, const struct vy_read_view **rv,
-		     const struct key_def *cmp_def,
-		     const struct key_def *key_def,
+		     struct key_def *cmp_def, struct key_def *key_def,
 		     struct tuple_format *format,
 		     bool is_primary)
 {
@@ -1729,7 +1726,7 @@ fail:
 /* dump statement to the run page buffers (stmt header and data) */
 static int
 vy_run_dump_stmt(const struct tuple *value, struct xlog *data_xlog,
-		 struct vy_page_info *info, const struct key_def *key_def,
+		 struct vy_page_info *info, struct key_def *key_def,
 		 bool is_primary)
 {
 	struct xrow_header xrow;
@@ -2019,9 +2016,9 @@ fail:
 
 int
 vy_run_writer_create(struct vy_run_writer *writer, struct vy_run *run,
-		const char *dirpath, uint32_t space_id, uint32_t iid,
-		const struct key_def *cmp_def, const struct key_def *key_def,
-		uint64_t page_size, double bloom_fpr)
+		     const char *dirpath, uint32_t space_id, uint32_t iid,
+		     struct key_def *cmp_def, struct key_def *key_def,
+		     uint64_t page_size, double bloom_fpr)
 {
 	memset(writer, 0, sizeof(*writer));
 	writer->run = run;
@@ -2285,10 +2282,8 @@ vy_run_writer_abort(struct vy_run_writer *writer)
 int
 vy_run_rebuild_index(struct vy_run *run, const char *dir,
 		     uint32_t space_id, uint32_t iid,
-		     const struct key_def *cmp_def,
-		     const struct key_def *key_def,
-		     struct tuple_format *format,
-		     const struct index_opts *opts)
+		     struct key_def *cmp_def, struct key_def *key_def,
+		     struct tuple_format *format, const struct index_opts *opts)
 {
 	assert(run->info.bloom == NULL);
 	assert(run->page_info == NULL);
@@ -2628,7 +2623,7 @@ static const struct vy_stmt_stream_iface vy_slice_stream_iface = {
 
 void
 vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
-		     const struct key_def *cmp_def, struct tuple_format *format,
+		     struct key_def *cmp_def, struct tuple_format *format,
 		     bool is_primary)
 {
 	stream->base.iface = &vy_slice_stream_iface;
diff --git a/src/box/vy_run.h b/src/box/vy_run.h
index 5030886..d74f216 100644
--- a/src/box/vy_run.h
+++ b/src/box/vy_run.h
@@ -218,9 +218,9 @@ struct vy_run_iterator {
 
 	/* Members needed for memory allocation and disk access */
 	/** Key definition used for comparing statements on disk. */
-	const struct key_def *cmp_def;
+	struct key_def *cmp_def;
 	/** Key definition provided by the user. */
-	const struct key_def *key_def;
+	struct key_def *key_def;
 	/**
 	 * Format ot allocate REPLACE and DELETE tuples read from
 	 * pages.
@@ -370,8 +370,7 @@ vy_run_recover(struct vy_run *run, const char *dir,
 int
 vy_run_rebuild_index(struct vy_run *run, const char *dir,
 		     uint32_t space_id, uint32_t iid,
-		     const struct key_def *cmp_def,
-		     const struct key_def *key_def,
+		     struct key_def *cmp_def, struct key_def *key_def,
 		     struct tuple_format *format,
 		     const struct index_opts *opts);
 
@@ -428,9 +427,8 @@ vy_run_remove_files(const char *dir, uint32_t space_id,
  * This function increments @run->refs.
  */
 struct vy_slice *
-vy_slice_new(int64_t id, struct vy_run *run,
-	     struct tuple *begin, struct tuple *end,
-	     const struct key_def *cmp_def);
+vy_slice_new(int64_t id, struct vy_run *run, struct tuple *begin,
+	     struct tuple *end, struct key_def *cmp_def);
 
 /**
  * Free a run slice.
@@ -480,9 +478,8 @@ vy_slice_wait_pinned(struct vy_slice *slice)
  * with [@begin, @end), @result is set to NULL.
  */
 int
-vy_slice_cut(struct vy_slice *slice, int64_t id,
-	     struct tuple *begin, struct tuple *end,
-	     const struct key_def *cmp_def,
+vy_slice_cut(struct vy_slice *slice, int64_t id, struct tuple *begin,
+	     struct tuple *end, struct key_def *cmp_def,
 	     struct vy_slice **result);
 
 /**
@@ -496,8 +493,7 @@ vy_run_iterator_open(struct vy_run_iterator *itr,
 		     struct vy_run_iterator_stat *stat,
 		     struct vy_slice *slice, enum iterator_type iterator_type,
 		     const struct tuple *key, const struct vy_read_view **rv,
-		     const struct key_def *cmp_def,
-		     const struct key_def *key_def,
+		     struct key_def *cmp_def, struct key_def *key_def,
 		     struct tuple_format *format, bool is_primary);
 
 /**
@@ -547,7 +543,7 @@ struct vy_slice_stream {
 	 * Key def for comparing with slice boundaries,
 	 * includes secondary key parts.
 	 */
-	const struct key_def *cmp_def;
+	struct key_def *cmp_def;
 	/** Format for allocating REPLACE and DELETE tuples read from pages. */
 	struct tuple_format *format;
 	/** Set if this iterator is for a primary index. */
@@ -559,7 +555,7 @@ struct vy_slice_stream {
  */
 void
 vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
-		     const struct key_def *cmp_def, struct tuple_format *format,
+		     struct key_def *cmp_def, struct tuple_format *format,
 		     bool is_primary);
 
 /**
@@ -580,9 +576,9 @@ struct vy_run_writer {
 	 * min key, run min/max keys, and secondary index
 	 * statements.
 	 */
-	const struct key_def *cmp_def;
+	struct key_def *cmp_def;
 	/** Key definition to calculate bloom. */
-	const struct key_def *key_def;
+	struct key_def *key_def;
 	/**
 	 * Minimal page size. When a page becames bigger, it is
 	 * dumped.
@@ -610,9 +606,9 @@ struct vy_run_writer {
 /** Create a run writer to fill a run with statements. */
 int
 vy_run_writer_create(struct vy_run_writer *writer, struct vy_run *run,
-		const char *dirpath, uint32_t space_id, uint32_t iid,
-		const struct key_def *cmp_def, const struct key_def *key_def,
-		uint64_t page_size, double bloom_fpr);
+		     const char *dirpath, uint32_t space_id, uint32_t iid,
+		     struct key_def *cmp_def, struct key_def *key_def,
+		     uint64_t page_size, double bloom_fpr);
 
 /**
  * Write a specified statement into a run.
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 37da282..8018dee 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -387,8 +387,7 @@ vy_stmt_new_surrogate_from_key(const char *key, enum iproto_type type,
 }
 
 struct tuple *
-vy_stmt_new_surrogate_delete_from_key(const char *key,
-				      const struct key_def *cmp_def,
+vy_stmt_new_surrogate_delete_from_key(const char *key, struct key_def *cmp_def,
 				      struct tuple_format *format)
 {
 	return vy_stmt_new_surrogate_from_key(key, IPROTO_DELETE,
@@ -457,7 +456,7 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 }
 
 struct tuple *
-vy_stmt_extract_key(const struct tuple *stmt, const struct key_def *key_def,
+vy_stmt_extract_key(const struct tuple *stmt, struct key_def *key_def,
 		    struct tuple_format *format)
 {
 	struct region *region = &fiber()->gc;
@@ -475,7 +474,7 @@ vy_stmt_extract_key(const struct tuple *stmt, const struct key_def *key_def,
 
 struct tuple *
 vy_stmt_extract_key_raw(const char *data, const char *data_end,
-			const struct key_def *key_def,
+			struct key_def *key_def,
 			struct tuple_format *format)
 {
 	struct region *region = &fiber()->gc;
@@ -543,9 +542,8 @@ vy_stmt_meta_decode(struct request *request, struct tuple *stmt)
 }
 
 int
-vy_stmt_encode_primary(const struct tuple *value,
-		       const struct key_def *key_def, uint32_t space_id,
-		       struct xrow_header *xrow)
+vy_stmt_encode_primary(const struct tuple *value, struct key_def *key_def,
+		       uint32_t space_id, struct xrow_header *xrow)
 {
 	memset(xrow, 0, sizeof(*xrow));
 	enum iproto_type type = vy_stmt_type(value);
@@ -591,8 +589,7 @@ vy_stmt_encode_primary(const struct tuple *value,
 }
 
 int
-vy_stmt_encode_secondary(const struct tuple *value,
-			 const struct key_def *cmp_def,
+vy_stmt_encode_secondary(const struct tuple *value, struct key_def *cmp_def,
 			 struct xrow_header *xrow)
 {
 	memset(xrow, 0, sizeof(*xrow));
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 273d5e8..b52b4e2 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -331,7 +331,7 @@ vy_stmt_unref_if_possible(struct tuple *stmt)
  */
 static inline int
 vy_key_compare(const struct tuple *a, const struct tuple *b,
-	       const struct key_def *cmp_def)
+	       struct key_def *cmp_def)
 {
 	assert(vy_stmt_type(a) == IPROTO_SELECT);
 	assert(vy_stmt_type(b) == IPROTO_SELECT);
@@ -352,7 +352,7 @@ vy_key_compare(const struct tuple *a, const struct tuple *b,
  */
 static inline int
 vy_tuple_compare(const struct tuple *a, const struct tuple *b,
-		 const struct key_def *cmp_def)
+		 struct key_def *cmp_def)
 {
 	enum iproto_type type;
 	type = vy_stmt_type(a);
@@ -381,7 +381,7 @@ vy_tuple_compare(const struct tuple *a, const struct tuple *b,
  */
 static inline int
 vy_tuple_compare_with_raw_key(const struct tuple *tuple, const char *key,
-			      const struct key_def *key_def)
+			      struct key_def *key_def)
 {
 	uint32_t part_count = mp_decode_array(&key);
 	return tuple_compare_with_key(tuple, key, part_count, key_def);
@@ -390,7 +390,7 @@ vy_tuple_compare_with_raw_key(const struct tuple *tuple, const char *key,
 /** @sa vy_tuple_compare_with_raw_key(). */
 static inline int
 vy_tuple_compare_with_key(const struct tuple *tuple, const struct tuple *key,
-			  const struct key_def *key_def)
+			  struct key_def *key_def)
 {
 	const char *key_mp = tuple_data(key);
 	uint32_t part_count = mp_decode_array(&key_mp);
@@ -400,7 +400,7 @@ vy_tuple_compare_with_key(const struct tuple *tuple, const struct tuple *key,
 /** @sa tuple_compare. */
 static inline int
 vy_stmt_compare(const struct tuple *a, const struct tuple *b,
-		const struct key_def *key_def)
+		struct key_def *key_def)
 {
 	bool a_is_tuple = vy_stmt_type(a) != IPROTO_SELECT;
 	bool b_is_tuple = vy_stmt_type(b) != IPROTO_SELECT;
@@ -419,7 +419,7 @@ vy_stmt_compare(const struct tuple *a, const struct tuple *b,
 /** @sa tuple_compare_with_raw_key. */
 static inline int
 vy_stmt_compare_with_raw_key(const struct tuple *stmt, const char *key,
-			     const struct key_def *key_def)
+			     struct key_def *key_def)
 {
 	if (vy_stmt_type(stmt) != IPROTO_SELECT)
 		return vy_tuple_compare_with_raw_key(stmt, key, key_def);
@@ -429,7 +429,7 @@ vy_stmt_compare_with_raw_key(const struct tuple *stmt, const char *key,
 /** @sa tuple_compare_with_key. */
 static inline int
 vy_stmt_compare_with_key(const struct tuple *stmt, const struct tuple *key,
-			 const struct key_def *key_def)
+			 struct key_def *key_def)
 {
 	assert(vy_stmt_type(key) == IPROTO_SELECT);
 	return vy_stmt_compare_with_raw_key(stmt, tuple_data(key), key_def);
@@ -476,7 +476,7 @@ vy_key_dup(const char *key);
  */
 struct tuple *
 vy_stmt_new_surrogate_delete_from_key(const char *key,
-				      const struct key_def *cmp_def,
+				      struct key_def *cmp_def,
 				      struct tuple_format *format);
 
 /**
@@ -628,7 +628,7 @@ vy_key_from_msgpack(struct tuple_format *format, const char *key)
  * malloc().
  */
 struct tuple *
-vy_stmt_extract_key(const struct tuple *stmt, const struct key_def *key_def,
+vy_stmt_extract_key(const struct tuple *stmt, struct key_def *key_def,
 		    struct tuple_format *format);
 
 /**
@@ -638,7 +638,7 @@ vy_stmt_extract_key(const struct tuple *stmt, const struct key_def *key_def,
  */
 struct tuple *
 vy_stmt_extract_key_raw(const char *data, const char *data_end,
-			const struct key_def *key_def,
+			struct key_def *key_def,
 			struct tuple_format *format);
 
 /**
@@ -654,9 +654,8 @@ vy_stmt_extract_key_raw(const char *data, const char *data_end,
  * @retval -1 if error
  */
 int
-vy_stmt_encode_primary(const struct tuple *value,
-		       const struct key_def *key_def, uint32_t space_id,
-		       struct xrow_header *xrow);
+vy_stmt_encode_primary(const struct tuple *value, struct key_def *key_def,
+		       uint32_t space_id, struct xrow_header *xrow);
 
 /**
  * Encode vy_stmt for a secondary key as xrow_header
@@ -669,8 +668,7 @@ vy_stmt_encode_primary(const struct tuple *value,
  * @retval -1 if error
  */
 int
-vy_stmt_encode_secondary(const struct tuple *value,
-			 const struct key_def *cmp_def,
+vy_stmt_encode_secondary(const struct tuple *value, struct key_def *cmp_def,
 			 struct xrow_header *xrow);
 
 /**
@@ -716,10 +714,15 @@ vy_tuple_format_new_with_colmask(struct tuple_format *mem_format);
  * @retval Does the key contain NULL or not?
  */
 static inline bool
-vy_tuple_key_contains_null(const struct tuple *tuple, const struct key_def *def)
+vy_tuple_key_contains_null(const struct tuple *tuple, struct key_def *def)
 {
-	for (uint32_t i = 0; i < def->part_count; ++i) {
-		const char *field = tuple_field(tuple, def->parts[i].fieldno);
+	struct tuple_format *format = tuple_format(tuple);
+	const char *data = tuple_data(tuple);
+	const uint32_t *field_map = tuple_field_map(tuple);
+	for (struct key_part *part = def->parts, *end = part + def->part_count;
+	     part < end; ++part) {
+		const char *field =
+			tuple_field_by_part_raw(format, data, field_map, part);
 		if (field == NULL || mp_typeof(*field) == MP_NIL)
 			return true;
 	}
diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c
index 7af58d9..ebea278 100644
--- a/src/box/vy_upsert.c
+++ b/src/box/vy_upsert.c
@@ -88,7 +88,7 @@ vy_upsert_try_to_squash(struct tuple_format *format, struct region *region,
 
 struct tuple *
 vy_apply_upsert(const struct tuple *new_stmt, const struct tuple *old_stmt,
-		const struct key_def *cmp_def, struct tuple_format *format,
+		struct key_def *cmp_def, struct tuple_format *format,
 		bool suppress_error)
 {
 	/*
diff --git a/src/box/vy_upsert.h b/src/box/vy_upsert.h
index 7878b1b..5649961 100644
--- a/src/box/vy_upsert.h
+++ b/src/box/vy_upsert.h
@@ -65,7 +65,7 @@ struct tuple_format;
  */
 struct tuple *
 vy_apply_upsert(const struct tuple *new_stmt, const struct tuple *old_stmt,
-		const struct key_def *cmp_def, struct tuple_format *format,
+		struct key_def *cmp_def, struct tuple_format *format,
 		bool suppress_error);
 
 #if defined(__cplusplus)
diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c
index df9c933..5252a1a 100644
--- a/src/box/vy_write_iterator.c
+++ b/src/box/vy_write_iterator.c
@@ -166,7 +166,7 @@ struct vy_write_iterator {
 	/* A heap to order the sources, newest LSN at heap top. */
 	heap_t src_heap;
 	/** Index key definition used to store statements on disk. */
-	const struct key_def *cmp_def;
+	struct key_def *cmp_def;
 	/** Format to allocate new REPLACE and DELETE tuples from vy_run */
 	struct tuple_format *format;
 	/* There is no LSM tree level older than the one we're writing to. */
@@ -340,9 +340,9 @@ static const struct vy_stmt_stream_iface vy_slice_stream_iface;
  * @return the iterator or NULL on error (diag is set).
  */
 struct vy_stmt_stream *
-vy_write_iterator_new(const struct key_def *cmp_def,
-		      struct tuple_format *format, bool is_primary,
-		      bool is_last_level, struct rlist *read_views,
+vy_write_iterator_new(struct key_def *cmp_def, struct tuple_format *format,
+		      bool is_primary, bool is_last_level,
+		      struct rlist *read_views,
 		      struct vy_deferred_delete_handler *handler)
 {
 	/*
diff --git a/src/box/vy_write_iterator.h b/src/box/vy_write_iterator.h
index 5214b60..ffdc584 100644
--- a/src/box/vy_write_iterator.h
+++ b/src/box/vy_write_iterator.h
@@ -269,9 +269,9 @@ struct vy_deferred_delete_handler {
  * @return the iterator or NULL on error (diag is set).
  */
 struct vy_stmt_stream *
-vy_write_iterator_new(const struct key_def *cmp_def,
-		      struct tuple_format *format, bool is_primary,
-		      bool is_last_level, struct rlist *read_views,
+vy_write_iterator_new(struct key_def *cmp_def, struct tuple_format *format,
+		      bool is_primary, bool is_last_level,
+		      struct rlist *read_views,
 		      struct vy_deferred_delete_handler *handler);
 
 /**
-- 
2.7.4



