[patches] [PATCH 3/3] Allow omitting tail nullable index columns

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Feb 10 00:54:44 MSK 2018


If a column is nullable and is the last defined one (via index parts
or space format), it can be omitted on insertion. Such absent fields
are treated as NULLs in comparators and are not stored.
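
For example (a minimal usage sketch; the space and index names are
illustrative, not part of the patch):

    s = box.schema.space.create('test')
    pk = s:create_index('pk')
    sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
    s:replace{1}          -- field 2 is absent and is not stored
    s:replace{2, 10}
    sk:select{box.NULL}   -- finds [1]: the absent field compares as NULL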

Closes #2988

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 src/box/alter.cc             |  34 ++-
 src/box/key_def.cc           |  32 ++-
 src/box/key_def.h            |  13 +-
 src/box/schema.cc            |   8 +-
 src/box/tuple_compare.cc     | 259 ++++++++++++------
 src/box/tuple_extract_key.cc | 205 ++++++++++++---
 src/box/tuple_format.c       |  39 ++-
 src/box/tuple_format.h       |  25 +-
 src/box/tuple_hash.cc        |  36 ++-
 src/box/vinyl.c              |   1 +
 src/box/vy_stmt.c            |  21 +-
 src/box/vy_stmt.h            |   2 +-
 test/engine/null.result      | 605 +++++++++++++++++++++++++++++++++++++++++++
 test/engine/null.test.lua    | 158 +++++++++++
 14 files changed, 1281 insertions(+), 157 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index c48e89f9e..e549d3c34 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -323,7 +323,39 @@ index_def_new_from_tuple(struct tuple *tuple, struct space *space)
 					     space->def->field_count) != 0)
 			diag_raise();
 	}
-	key_def = key_def_new_with_parts(part_def, part_count);
+	struct space_def *sd = space->def;
+	int other_index_count = 0;
+	struct key_def **keys = NULL;
+	/*
+	 * To detect which key parts are optional, min_field_count
+	 * is required. But min_field_count from the old space
+	 * format can not be used. For example, consider the case
+	 * when a space has no format, has a primary index on the
+	 * first field and a single secondary index on a
+	 * non-nullable second field. Min field count here is 2.
+	 * Now alter the secondary index to make its part nullable.
+	 * The 'space' variable here is the old space, where
+	 * min_field_count is still 2, but actually it is already
+	 * 1. The actual min_field_count must be calculated from
+	 * the old unchanged indexes, the NEW definition of the
+	 * updated index and the space format defined by the user.
+	 */
+	for (uint32_t i = 0; i < space->index_count; ++i) {
+		if (space->index[i]->def->iid != index_id)
+			++other_index_count;
+	}
+	if (other_index_count > 0) {
+		size_t bsize = other_index_count * sizeof(keys[0]);
+		keys = (struct key_def **) region_alloc_xc(&fiber()->gc, bsize);
+		for (uint32_t i = 0, j = 0; i < space->index_count; ++i) {
+			if (space->index[i]->def->iid != index_id)
+				keys[j++] = space->index[i]->def->key_def;
+		}
+	}
+	uint32_t min_field_count =
+		tuple_format_min_field_count(keys, other_index_count,
+					     sd->fields, sd->field_count);
+	key_def = key_def_new_with_parts(part_def, part_count, min_field_count);
 	if (key_def == NULL)
 		diag_raise();
 	struct index_def *index_def =
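
The scenario from the comment above, as a Lua sketch (it mirrors the
"complex case" test at the end of this patch; names are illustrative):

    s = box.schema.create_space('test')
    pk = s:create_index('pk')
    sk = s:create_index('sk', {parts = {2, 'unsigned'}}) -- min_field_count == 2
    sk:alter({parts = {{2, 'unsigned', is_nullable = true}}})
    -- min_field_count drops to 1, so part 2 must become optional:
    s:replace{10} -- a one-field tuple is now legal
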
diff --git a/src/box/key_def.cc b/src/box/key_def.cc
index 7f00b82df..d613df122 100644
--- a/src/box/key_def.cc
+++ b/src/box/key_def.cc
@@ -131,15 +131,27 @@ key_def_new(uint32_t part_count)
 }
 
 struct key_def *
-key_def_new_with_parts(struct key_part_def *parts, uint32_t part_count)
+key_def_new_with_parts(struct key_part_def *parts, uint32_t part_count,
+		       uint32_t min_field_count)
 {
 	struct key_def *def = key_def_new(part_count);
 	if (def == NULL)
 		return NULL;
 
-	for (uint32_t i = 0; i < part_count; i++) {
+	/*
+	 * Update min_field_count to take a new key_def into
+	 * account.
+	 */
+	for (uint32_t i = 0; i < part_count; ++i) {
+		struct key_part_def *part = &parts[i];
+		if (!part->is_nullable && part->fieldno + 1 > min_field_count)
+			min_field_count = part->fieldno + 1;
+	}
+
+	for (uint32_t i = 0; i < part_count; ++i) {
 		struct key_part_def *part = &parts[i];
 		struct coll *coll = NULL;
+		bool is_optional = part->fieldno + 1 > min_field_count;
 		if (part->coll_id != COLL_NONE) {
 			coll = coll_by_id(part->coll_id);
 			if (coll == NULL) {
@@ -150,7 +162,7 @@ key_def_new_with_parts(struct key_part_def *parts, uint32_t part_count)
 			}
 		}
 		key_def_set_part(def, i, part->fieldno, part->type,
-				 part->is_nullable, coll);
+				 part->is_nullable, is_optional, coll);
 	}
 	return def;
 }
@@ -179,7 +191,7 @@ box_key_def_new(uint32_t *fields, uint32_t *types, uint32_t part_count)
 	for (uint32_t item = 0; item < part_count; ++item) {
 		key_def_set_part(key_def, item, fields[item],
 				 (enum field_type)types[item],
-				 key_part_def_default.is_nullable, NULL);
+				 key_part_def_default.is_nullable, false, NULL);
 	}
 	return key_def;
 }
@@ -238,15 +250,19 @@ key_part_check_compatibility(const struct key_part *old_parts,
 
 void
 key_def_set_part(struct key_def *def, uint32_t part_no, uint32_t fieldno,
-		 enum field_type type, bool is_nullable, struct coll *coll)
+		 enum field_type type, bool is_nullable, bool is_optional,
+		 struct coll *coll)
 {
 	assert(part_no < def->part_count);
 	assert(type < field_type_MAX);
+	assert(!is_optional || is_nullable);
 	def->is_nullable |= is_nullable;
+	def->is_optional |= is_optional;
 	def->parts[part_no].is_nullable = is_nullable;
 	def->parts[part_no].fieldno = fieldno;
 	def->parts[part_no].type = type;
 	def->parts[part_no].coll = coll;
+	def->parts[part_no].is_optional = is_optional;
 	column_mask_set_fieldno(&def->column_mask, fieldno);
 	/**
 	 * When all parts are set, initialize the tuple
@@ -514,7 +530,8 @@ key_def_merge(const struct key_def *first, const struct key_def *second)
 	end = part + first->part_count;
 	for (; part != end; part++) {
 		key_def_set_part(new_def, pos++, part->fieldno, part->type,
-				 part->is_nullable, part->coll);
+				 part->is_nullable, part->is_optional,
+				 part->coll);
 	}
 
 	/* Set-append second key def's part to the new key def. */
@@ -524,7 +541,8 @@ key_def_merge(const struct key_def *first, const struct key_def *second)
 		if (key_def_find(first, part->fieldno))
 			continue;
 		key_def_set_part(new_def, pos++, part->fieldno, part->type,
-				 part->is_nullable, part->coll);
+				 part->is_nullable, part->is_optional,
+				 part->coll);
 	}
 	return new_def;
 }
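
After this change a key part is optional exactly when it is nullable
and its field number is beyond the computed min_field_count. A hedged
Lua illustration (names are not part of the patch):

    s = box.schema.create_space('test')
    pk = s:create_index('pk') -- field 1 is required: min_field_count == 1
    sk = s:create_index('sk', {parts = {{3, 'unsigned', is_nullable = true}}})
    s:replace{5} -- ok: fields 2 and 3 are absent and treated as NULLs
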
diff --git a/src/box/key_def.h b/src/box/key_def.h
index 201863f5a..f4dbafc06 100644
--- a/src/box/key_def.h
+++ b/src/box/key_def.h
@@ -72,6 +72,8 @@ struct key_part {
 	struct coll *coll;
 	/** True if a part can store NULLs. */
 	bool is_nullable;
+	/** True, if a part can be absent in a tuple. */
+	bool is_optional;
 };
 
 struct key_def;
@@ -126,6 +128,11 @@ struct key_def {
 	uint32_t unique_part_count;
 	/** True, if at least one part can store NULL. */
 	bool is_nullable;
+	/**
+	 * True, if some key parts can be absent in a tuple. Such
+	 * fields are assumed to be MP_NIL.
+	 */
+	bool is_optional;
 	/** Key fields mask. @sa column_mask.h for details. */
 	uint64_t column_mask;
 	/** The size of the 'parts' array. */
@@ -187,7 +194,8 @@ key_def_new(uint32_t part_count);
  * and initialize its parts.
  */
 struct key_def *
-key_def_new_with_parts(struct key_part_def *parts, uint32_t part_count);
+key_def_new_with_parts(struct key_part_def *parts, uint32_t part_count,
+		       uint32_t min_field_count);
 
 /**
  * Dump part definitions of the given key def.
@@ -201,7 +209,8 @@ key_def_dump_parts(const struct key_def *def, struct key_part_def *parts);
  */
 void
 key_def_set_part(struct key_def *def, uint32_t part_no, uint32_t fieldno,
-		 enum field_type type, bool is_nullable, struct coll *coll);
+		 enum field_type type, bool is_nullable, bool is_optional,
+		 struct coll *coll);
 
 /**
  * An snprint-style function to print a key definition.
diff --git a/src/box/schema.cc b/src/box/schema.cc
index 2e8533b94..5028fba5c 100644
--- a/src/box/schema.cc
+++ b/src/box/schema.cc
@@ -281,13 +281,13 @@ schema_init()
 	auto key_def_guard = make_scoped_guard([&] { box_key_def_delete(key_def); });
 
 	key_def_set_part(key_def, 0 /* part no */, 0 /* field no */,
-			 FIELD_TYPE_STRING, false, NULL);
+			 FIELD_TYPE_STRING, false, false, NULL);
 	sc_space_new(BOX_SCHEMA_ID, "_schema", key_def, &on_replace_schema,
 		     NULL);
 
 	/* _space - home for all spaces. */
 	key_def_set_part(key_def, 0 /* part no */, 0 /* field no */,
-			 FIELD_TYPE_UNSIGNED, false, NULL);
+			 FIELD_TYPE_UNSIGNED, false, false, NULL);
 
 	/* _collation - collation description. */
 	sc_space_new(BOX_COLLATION_ID, "_collation", key_def,
@@ -335,10 +335,10 @@ schema_init()
 		diag_raise();
 	/* space no */
 	key_def_set_part(key_def, 0 /* part no */, 0 /* field no */,
-			 FIELD_TYPE_UNSIGNED, false, NULL);
+			 FIELD_TYPE_UNSIGNED, false, false, NULL);
 	/* index no */
 	key_def_set_part(key_def, 1 /* part no */, 1 /* field no */,
-			 FIELD_TYPE_UNSIGNED, false, NULL);
+			 FIELD_TYPE_UNSIGNED, false, false, NULL);
 	sc_space_new(BOX_INDEX_ID, "_index", key_def,
 		     &alter_space_on_replace_index, &on_stmt_begin_index);
 }
diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index cf8fc130e..77fc64428 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -426,15 +426,23 @@ tuple_compare_field_with_hint(const char *field_a, enum mp_type a_type,
 	}
 }
 
-template<bool is_nullable>
+template<bool is_nullable, bool is_optional>
 static inline int
 tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 		       const struct key_def *key_def)
 {
+	assert(!is_optional || is_nullable);
+	assert(is_nullable == key_def->is_nullable);
+	assert(is_optional == key_def->is_optional);
 	const struct key_part *part = key_def->parts;
 	const char *tuple_a_raw = tuple_data(tuple_a);
 	const char *tuple_b_raw = tuple_data(tuple_b);
 	if (key_def->part_count == 1 && part->fieldno == 0) {
+		/*
+		 * The first field can not be optional - empty
+		 * tuples can not exist.
+		 */
+		assert(!is_optional);
 		mp_decode_array(&tuple_a_raw);
 		mp_decode_array(&tuple_b_raw);
 		if (! is_nullable) {
@@ -458,8 +466,8 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 	const uint32_t *field_map_a = tuple_field_map(tuple_a);
 	const uint32_t *field_map_b = tuple_field_map(tuple_b);
 	const struct key_part *end;
-	const char *field_a;
-	const char *field_b;
+	const char *field_a, *field_b;
+	enum mp_type a_type, b_type;
 	int rc;
 	if (is_nullable)
 		end = part + key_def->unique_part_count;
@@ -471,7 +479,7 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 					  part->fieldno);
 		field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b,
 					  part->fieldno);
-		assert(field_a != NULL && field_b != NULL);
+		assert(is_optional || (field_a != NULL && field_b != NULL));
 		if (! is_nullable) {
 			rc = tuple_compare_field(field_a, field_b, part->type,
 						 part->coll);
@@ -480,8 +488,13 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 			else
 				continue;
 		}
-		enum mp_type a_type = mp_typeof(*field_a);
-		enum mp_type b_type = mp_typeof(*field_b);
+		if (is_optional) {
+			a_type = field_a != NULL ? mp_typeof(*field_a) : MP_NIL;
+			b_type = field_b != NULL ? mp_typeof(*field_b) : MP_NIL;
+		} else {
+			a_type = mp_typeof(*field_a);
+			b_type = mp_typeof(*field_b);
+		}
 		if (a_type == MP_NIL) {
 			if (b_type != MP_NIL)
 				return -1;
@@ -515,6 +528,10 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 					  part->fieldno);
 		field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b,
 					  part->fieldno);
+		/*
+		 * Extended parts are from the primary key, and
+		 * they can not be absent or be NULLs.
+		 */
 		assert(field_a != NULL && field_b != NULL);
 		rc = tuple_compare_field(field_a, field_b, part->type,
 					 part->coll);
@@ -524,18 +541,22 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 	return 0;
 }
 
-template<bool is_nullable>
+template<bool is_nullable, bool is_optional>
 static inline int
 tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 				uint32_t part_count,
 				const struct key_def *key_def)
 {
+	assert(!is_optional || is_nullable);
+	assert(is_nullable == key_def->is_nullable);
+	assert(is_optional == key_def->is_optional);
 	assert(key != NULL || part_count == 0);
 	assert(part_count <= key_def->part_count);
 	const struct key_part *part = key_def->parts;
 	const struct tuple_format *format = tuple_format(tuple);
 	const char *tuple_raw = tuple_data(tuple);
 	const uint32_t *field_map = tuple_field_map(tuple);
+	enum mp_type a_type, b_type;
 	if (likely(part_count == 1)) {
 		const char *field;
 		field = tuple_field_raw(format, tuple_raw, field_map,
@@ -544,8 +565,11 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 			return tuple_compare_field(field, key, part->type,
 						   part->coll);
 		}
-		enum mp_type a_type = mp_typeof(*field);
-		enum mp_type b_type = mp_typeof(*key);
+		if (is_optional)
+			a_type = field != NULL ? mp_typeof(*field) : MP_NIL;
+		else
+			a_type = mp_typeof(*field);
+		b_type = mp_typeof(*key);
 		if (a_type == MP_NIL) {
 			return b_type == MP_NIL ? 0 : -1;
 		} else if (b_type == MP_NIL) {
@@ -564,15 +588,18 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 		field = tuple_field_raw(format, tuple_raw, field_map,
 					part->fieldno);
 		if (! is_nullable) {
-			int rc = tuple_compare_field(field, key, part->type,
-						     part->coll);
+			rc = tuple_compare_field(field, key, part->type,
+						 part->coll);
 			if (rc != 0)
 				return rc;
 			else
 				continue;
 		}
-		enum mp_type a_type = mp_typeof(*field);
-		enum mp_type b_type = mp_typeof(*key);
+		if (is_optional)
+			a_type = field != NULL ? mp_typeof(*field) : MP_NIL;
+		else
+			a_type = mp_typeof(*field);
+		b_type = mp_typeof(*key);
 		if (a_type == MP_NIL) {
 			if (b_type != MP_NIL)
 				return -1;
@@ -589,18 +616,12 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 	return 0;
 }
 
-/**
- * is_sequential_nullable_tuples used to determine if this
- * function is used to compare two tuples, which keys are
- * sequential. In such a case it is necessary to compare them
- * using primary parts if a NULL was met.
- */
-template<bool is_nullable, bool is_sequential_nullable_tuples>
+template<bool is_nullable>
 static inline int
 key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
 		  const struct key_def *key_def)
 {
-	assert(!is_sequential_nullable_tuples || is_nullable);
+	assert(is_nullable == key_def->is_nullable);
 	assert((key_a != NULL && key_b != NULL) || part_count == 0);
 	const struct key_part *part = key_def->parts;
 	if (likely(part_count == 1)) {
@@ -622,7 +643,6 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
 		}
 	}
 
-	bool was_null_met = false;
 	const struct key_part *end = part + part_count;
 	int rc;
 	for (; part < end; ++part, mp_next(&key_a), mp_next(&key_b)) {
@@ -639,7 +659,6 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
 		if (a_type == MP_NIL) {
 			if (b_type != MP_NIL)
 				return -1;
-			was_null_met = true;
 		} else if (b_type == MP_NIL) {
 			return 1;
 		} else {
@@ -650,31 +669,49 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
 				return rc;
 		}
 	}
-
-	if (!is_sequential_nullable_tuples || !was_null_met)
-		return 0;
-	end = key_def->parts + key_def->unique_part_count;
-	for (; part < end; ++part, mp_next(&key_a), mp_next(&key_b)) {
-		rc = tuple_compare_field(key_a, key_b, part->type, part->coll);
-		if (rc != 0)
-			return rc;
-	}
 	return 0;
 }
 
-template<bool is_nullable>
+template<bool is_nullable, bool is_optional>
 static inline int
-tuple_compare_with_key_sequential(const struct tuple *tuple,
-	const char *key, uint32_t part_count, const struct key_def *key_def)
+tuple_compare_with_key_sequential(const struct tuple *tuple, const char *key,
+				  uint32_t part_count,
+				  const struct key_def *key_def)
 {
+	assert(!is_optional || is_nullable);
 	assert(key_def_is_sequential(key_def));
+	assert(is_nullable == key_def->is_nullable);
+	assert(is_optional == key_def->is_optional);
 	const char *tuple_key = tuple_data(tuple);
-	uint32_t tuple_field_count = mp_decode_array(&tuple_key);
-	assert(tuple_field_count >= key_def->part_count);
-	assert(part_count <= key_def->part_count);
-	(void) tuple_field_count;
-	return key_compare_parts<is_nullable, false>(tuple_key, key, part_count,
-						     key_def);
+	uint32_t field_count = mp_decode_array(&tuple_key);
+	uint32_t cmp_part_count;
+	if (is_optional && field_count < part_count) {
+		cmp_part_count = field_count;
+	} else {
+		assert(field_count >= part_count);
+		cmp_part_count = part_count;
+	}
+	int rc = key_compare_parts<is_nullable>(tuple_key, key, cmp_part_count,
+						key_def);
+	if (!is_optional || rc != 0)
+		return rc;
+	/*
+	 * If some of the tuple's indexed fields are absent, then
+	 * check that the corresponding key fields are NULL.
+	 */
+	if (field_count < part_count) {
+		/*
+		 * The key's and the tuple's first field_count
+		 * fields are equal, and so are their bsizes.
+		 */
+		key += tuple->bsize - mp_sizeof_array(field_count);
+		for (uint32_t i = field_count; i < part_count;
+		     ++i, mp_next(&key)) {
+			if (mp_typeof(*key) != MP_NIL)
+				return -1;
+		}
+	}
+	return 0;
 }
 
 int
@@ -688,49 +725,83 @@ key_compare(const char *key_a, const char *key_b,
 	uint32_t part_count = MIN(part_count_a, part_count_b);
 	assert(part_count <= key_def->part_count);
 	if (! key_def->is_nullable) {
-		return key_compare_parts<false, false>(key_a, key_b, part_count,
-						       key_def);
+		return key_compare_parts<false>(key_a, key_b, part_count,
+						key_def);
 	} else {
-		return key_compare_parts<true, false>(key_a, key_b, part_count,
-						      key_def);
+		return key_compare_parts<true>(key_a, key_b, part_count,
+					       key_def);
 	}
 }
 
+template <bool is_nullable, bool is_optional>
 static int
 tuple_compare_sequential(const struct tuple *tuple_a,
 			 const struct tuple *tuple_b,
 			 const struct key_def *key_def)
 {
+	assert(!is_optional || is_nullable);
+	assert(is_optional == key_def->is_optional);
 	assert(key_def_is_sequential(key_def));
+	assert(is_nullable == key_def->is_nullable);
 	const char *key_a = tuple_data(tuple_a);
-	uint32_t field_count_a = mp_decode_array(&key_a);
-	assert(field_count_a >= key_def->part_count);
-	(void) field_count_a;
-	const char *key_b = tuple_data(tuple_b);
-	uint32_t field_count_b = mp_decode_array(&key_b);
-	assert(field_count_b >= key_def->part_count);
-	(void) field_count_b;
-	return key_compare_parts<false, false>(key_a, key_b,
-					       key_def->part_count, key_def);
-}
-
-static int
-tuple_compare_sequential_nullable(const struct tuple *tuple_a,
-				  const struct tuple *tuple_b,
-				  const struct key_def *key_def)
-{
-	assert(key_def->is_nullable);
-	assert(key_def_is_sequential(key_def));
-	const char *key_a = tuple_data(tuple_a);
-	uint32_t field_count = mp_decode_array(&key_a);
-	assert(field_count >= key_def->part_count);
+	uint32_t fc_a = mp_decode_array(&key_a);
 	const char *key_b = tuple_data(tuple_b);
-	field_count = mp_decode_array(&key_b);
-	assert(field_count >= key_def->part_count);
-	(void) field_count;
-	return key_compare_parts<true, true>(key_a, key_b,
-					     key_def->unique_part_count,
-					     key_def);
+	uint32_t fc_b = mp_decode_array(&key_b);
+	if (!is_optional && !is_nullable) {
+		assert(fc_a >= key_def->part_count);
+		assert(fc_b >= key_def->part_count);
+		return key_compare_parts<is_nullable>(key_a, key_b,
+						      key_def->part_count,
+						      key_def);
+	}
+	bool was_null_met = false;
+	const struct key_part *part = key_def->parts;
+	const struct key_part *end = part + key_def->unique_part_count;
+	int rc;
+	uint32_t i = 0;
+	for (; part < end; ++part, ++i) {
+		enum mp_type a_type, b_type;
+		if (is_optional) {
+			a_type = i >= fc_a ? MP_NIL : mp_typeof(*key_a);
+			b_type = i >= fc_b ? MP_NIL : mp_typeof(*key_b);
+		} else {
+			a_type = mp_typeof(*key_a);
+			b_type = mp_typeof(*key_b);
+		}
+		if (a_type == MP_NIL) {
+			if (b_type != MP_NIL)
+				return -1;
+			was_null_met = true;
+		} else if (b_type == MP_NIL) {
+			return 1;
+		} else {
+			rc = tuple_compare_field_with_hint(key_a, a_type, key_b,
+							   b_type, part->type,
+							   part->coll);
+			if (rc != 0)
+				return rc;
+		}
+		if (!is_optional || i < fc_a)
+			mp_next(&key_a);
+		if (!is_optional || i < fc_b)
+			mp_next(&key_b);
+	}
+	if (! was_null_met)
+		return 0;
+	end = key_def->parts + key_def->part_count;
+	for (; part < end; ++part, ++i, mp_next(&key_a), mp_next(&key_b)) {
+		/*
+		 * If the tuples are equal by unique_part_count,
+		 * then the rest of the parts belong to the primary
+		 * key, which can not be absent or be null.
+		 */
+		assert(i < fc_a && i < fc_b);
+		rc = tuple_compare_field(key_a, key_b, part->type,
+					 part->coll);
+		if (rc != 0)
+			return rc;
+	}
+	return 0;
 }
 
 template <int TYPE>
@@ -913,12 +984,21 @@ static const comparator_signature cmp_arr[] = {
 #undef COMPARATOR
 
 tuple_compare_t
-tuple_compare_create(const struct key_def *def) {
+tuple_compare_create(const struct key_def *def)
+{
 	if (def->is_nullable) {
-		if (key_def_is_sequential(def))
-			return tuple_compare_sequential_nullable;
-		return tuple_compare_slowpath<true>;
+		if (key_def_is_sequential(def)) {
+			if (def->is_optional)
+				return tuple_compare_sequential<true, true>;
+			else
+				return tuple_compare_sequential<true, false>;
+		} else if (def->is_optional) {
+			return tuple_compare_slowpath<true, true>;
+		} else {
+			return tuple_compare_slowpath<true, false>;
+		}
 	}
+	assert(! def->is_optional);
 	if (!key_def_has_collation(def)) {
 		/* Precalculated comparators don't use collation */
 		for (uint32_t k = 0;
@@ -936,8 +1016,9 @@ tuple_compare_create(const struct key_def *def) {
 		}
 	}
 	if (key_def_is_sequential(def))
-		return tuple_compare_sequential;
-	return tuple_compare_slowpath<false>;
+		return tuple_compare_sequential<false, false>;
+	else
+		return tuple_compare_slowpath<false, false>;
 }
 
 /* }}} tuple_compare */
@@ -1123,10 +1204,21 @@ tuple_compare_with_key_t
 tuple_compare_with_key_create(const struct key_def *def)
 {
 	if (def->is_nullable) {
-		if (key_def_is_sequential(def))
-			return tuple_compare_with_key_sequential<true>;
-		return tuple_compare_with_key_slowpath<true>;
+		if (key_def_is_sequential(def)) {
+			if (def->is_optional) {
+				return tuple_compare_with_key_sequential<true,
+									 true>;
+			} else {
+				return tuple_compare_with_key_sequential<true,
+									 false>;
+			}
+		} else if (def->is_optional) {
+			return tuple_compare_with_key_slowpath<true, true>;
+		} else {
+			return tuple_compare_with_key_slowpath<true, false>;
+		}
 	}
+	assert(! def->is_optional);
 	if (!key_def_has_collation(def)) {
 		/* Precalculated comparators don't use collation */
 		for (uint32_t k = 0;
@@ -1147,8 +1239,9 @@ tuple_compare_with_key_create(const struct key_def *def)
 		}
 	}
 	if (key_def_is_sequential(def))
-		return tuple_compare_with_key_sequential<false>;
-	return tuple_compare_with_key_slowpath<false>;
+		return tuple_compare_with_key_sequential<false, false>;
+	else
+		return tuple_compare_with_key_slowpath<false, false>;
 }
 
 /* }}} tuple_compare_with_key */
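
In all of the comparators above an absent indexed field compares
exactly like an explicit NULL, and NULLs order before all other
values. A Lua illustration of the resulting order, assuming a space
with a primary index on field 1 and an index 'sk' over
{{2, 'unsigned', is_nullable = true}}:

    s:replace{1, 2}
    s:replace{2}           -- field 2 is absent
    s:replace{3, box.NULL} -- field 2 is an explicit NULL
    sk:select{}            -- [2] and [3, null] come before [1, 2]
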
diff --git a/src/box/tuple_extract_key.cc b/src/box/tuple_extract_key.cc
index 8765c16b5..860872c83 100644
--- a/src/box/tuple_extract_key.cc
+++ b/src/box/tuple_extract_key.cc
@@ -1,29 +1,52 @@
 #include "tuple_extract_key.h"
 #include "fiber.h"
 
+enum { MSGPACK_NULL = 0xc0 };
+
+/** True, if a key can contain two or more parts in sequence. */
+static bool
+key_def_contains_sequential_parts(const struct key_def *def)
+{
+	for (uint32_t i = 0; i < def->part_count - 1; ++i) {
+		if (def->parts[i].fieldno + 1 == def->parts[i + 1].fieldno)
+			return true;
+	}
+	return false;
+}
+
 /**
  * Optimized version of tuple_extract_key_raw() for sequential key defs
  * @copydoc tuple_extract_key_raw()
  */
+template <bool is_optional>
 static char *
 tuple_extract_key_sequential_raw(const char *data, const char *data_end,
 				 const struct key_def *key_def,
 				 uint32_t *key_size)
 {
+	assert(!is_optional || key_def->is_nullable);
 	assert(key_def_is_sequential(key_def));
+	assert(is_optional == key_def->is_optional);
+	assert(data_end != NULL);
+	assert(mp_sizeof_nil() == 1);
 	const char *field_start = data;
 	uint32_t bsize = mp_sizeof_array(key_def->part_count);
-
-	mp_decode_array(&field_start);
+	uint32_t field_count = mp_decode_array(&field_start);
 	const char *field_end = field_start;
-
-	for (uint32_t i = 0; i < key_def->part_count; i++)
-		mp_next(&field_end);
+	uint32_t null_count;
+	if (!is_optional || field_count > key_def->part_count) {
+		for (uint32_t i = 0; i < key_def->part_count; i++)
+			mp_next(&field_end);
+		null_count = 0;
+	} else {
+		assert(key_def->is_nullable);
+		null_count = key_def->part_count - field_count;
+		field_end = data_end;
+		bsize += null_count * mp_sizeof_nil();
+	}
+	assert(field_end - field_start <= data_end - data);
 	bsize += field_end - field_start;
 
-	assert(!data_end || (field_end - field_start <= data_end - data));
-	(void) data_end;
-
 	char *key = (char *) region_alloc(&fiber()->gc, bsize);
 	if (key == NULL) {
 		diag_set(OutOfMemory, bsize, "region",
@@ -32,6 +55,10 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end,
 	}
 	char *key_buf = mp_encode_array(key, key_def->part_count);
 	memcpy(key_buf, field_start, field_end - field_start);
+	if (is_optional && null_count > 0) {
+		key_buf += field_end - field_start;
+		memset(key_buf, MSGPACK_NULL, null_count);
+	}
 
 	if (key_size != NULL)
 		*key_size = bsize;
@@ -42,54 +69,77 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end,
  * Optimized version of tuple_extract_key() for sequential key defs
  * @copydoc tuple_extract_key()
  */
+template <bool is_optional>
 static inline char *
 tuple_extract_key_sequential(const struct tuple *tuple,
 			     const struct key_def *key_def,
 			     uint32_t *key_size)
 {
 	assert(key_def_is_sequential(key_def));
+	assert(!is_optional || key_def->is_nullable);
+	assert(is_optional == key_def->is_optional);
 	const char *data = tuple_data(tuple);
-	return tuple_extract_key_sequential_raw(data, NULL, key_def, key_size);
+	const char *data_end = data + tuple->bsize;
+	return tuple_extract_key_sequential_raw<is_optional>(data, data_end,
+							     key_def, key_size);
 }
 
 /**
  * General-purpose implementation of tuple_extract_key()
  * @copydoc tuple_extract_key()
  */
-template <bool contains_sequential_parts>
+template <bool contains_sequential_parts, bool is_optional>
 static char *
 tuple_extract_key_slowpath(const struct tuple *tuple,
 			   const struct key_def *key_def, uint32_t *key_size)
 {
+	assert(!is_optional || key_def->is_nullable);
+	assert(is_optional == key_def->is_optional);
+	assert(contains_sequential_parts ==
+	       key_def_contains_sequential_parts(key_def));
+	assert(mp_sizeof_nil() == 1);
 	const char *data = tuple_data(tuple);
 	uint32_t part_count = key_def->part_count;
 	uint32_t bsize = mp_sizeof_array(part_count);
 	const struct tuple_format *format = tuple_format(tuple);
 	const uint32_t *field_map = tuple_field_map(tuple);
+	const char *tuple_end = data + tuple->bsize;
 
 	/* Calculate the key size. */
 	for (uint32_t i = 0; i < part_count; ++i) {
 		const char *field =
 			tuple_field_raw(format, data, field_map,
 					key_def->parts[i].fieldno);
+		if (is_optional && field == NULL) {
+			assert(key_def->parts[i].is_optional);
+			bsize += mp_sizeof_nil();
+			continue;
+		}
+		assert(field != NULL);
 		const char *end = field;
 		if (contains_sequential_parts) {
 			/*
 			 * Skip sequential part in order to
 			 * minimize tuple_field_raw() calls.
 			 */
-			for (; i < key_def->part_count - 1; i++) {
+			for (; i < part_count - 1; i++) {
 				if (key_def->parts[i].fieldno + 1 !=
-					key_def->parts[i + 1].fieldno) {
+				    key_def->parts[i + 1].fieldno) {
 					/*
 					 * End of sequential part.
 					 */
 					break;
 				}
-				mp_next(&end);
+				if (!is_optional || end < tuple_end)
+					mp_next(&end);
+				else
+					bsize += mp_sizeof_nil();
 			}
 		}
-		mp_next(&end);
+		if (!is_optional || end < tuple_end)
+			mp_next(&end);
+		else
+			bsize += mp_sizeof_nil();
 		bsize += end - field;
 	}
 
@@ -103,27 +153,42 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
 		const char *field =
 			tuple_field_raw(format, data, field_map,
 					key_def->parts[i].fieldno);
+		if (is_optional && field == NULL) {
+			key_buf = mp_encode_nil(key_buf);
+			continue;
+		}
 		const char *end = field;
+		uint32_t null_count = 0;
 		if (contains_sequential_parts) {
 			/*
 			 * Skip sequential part in order to
 			 * minimize tuple_field_raw() calls.
 			 */
-			for (; i < key_def->part_count - 1; i++) {
+			for (; i < part_count - 1; i++) {
 				if (key_def->parts[i].fieldno + 1 !=
-					key_def->parts[i + 1].fieldno) {
+				    key_def->parts[i + 1].fieldno) {
 					/*
 					 * End of sequential part.
 					 */
 					break;
 				}
-				mp_next(&end);
+				if (!is_optional || end < tuple_end)
+					mp_next(&end);
+				else
+					++null_count;
 			}
 		}
-		mp_next(&end);
+		if (!is_optional || end < tuple_end)
+			mp_next(&end);
+		else
+			++null_count;
 		bsize = end - field;
 		memcpy(key_buf, field, bsize);
 		key_buf += bsize;
+		if (is_optional && null_count != 0) {
+			memset(key_buf, MSGPACK_NULL, null_count);
+			key_buf += null_count * mp_sizeof_nil();
+		}
 	}
 	if (key_size != NULL)
 		*key_size = key_buf - key;
@@ -134,11 +199,15 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
  * General-purpose version of tuple_extract_key_raw()
  * @copydoc tuple_extract_key_raw()
  */
+template <bool is_optional>
 static char *
 tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 			       const struct key_def *key_def,
 			       uint32_t *key_size)
 {
+	assert(!is_optional || key_def->is_nullable);
+	assert(is_optional == key_def->is_optional);
+	assert(mp_sizeof_nil() == 1);
 	/* allocate buffer with maximal possible size */
 	char *key = (char *) region_alloc(&fiber()->gc, data_end - data);
 	if (key == NULL) {
@@ -148,7 +217,12 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 	}
 	char *key_buf = mp_encode_array(key, key_def->part_count);
 	const char *field0 = data;
-	mp_decode_array(&field0);
+	uint32_t field_count = mp_decode_array(&field0);
+	/*
+	 * A tuple can not be empty - at least a pk always exists.
+	 */
+	assert(field_count > 0);
+	(void) field_count;
 	const char *field0_end = field0;
 	mp_next(&field0_end);
 	const char *field = field0;
@@ -156,11 +230,14 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 	uint32_t current_fieldno = 0;
 	for (uint32_t i = 0; i < key_def->part_count; i++) {
 		uint32_t fieldno = key_def->parts[i].fieldno;
+		uint32_t null_count = 0;
 		for (; i < key_def->part_count - 1; i++) {
 			if (key_def->parts[i].fieldno + 1 !=
 			    key_def->parts[i + 1].fieldno)
 				break;
 		}
+		uint32_t end_fieldno = key_def->parts[i].fieldno;
+
 		if (fieldno < current_fieldno) {
 			/* Rewind. */
 			field = field0;
@@ -168,6 +245,19 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 			current_fieldno = 0;
 		}
 
+		/*
+		 * The first fieldno of a range of key columns can
+		 * be beyond the tuple's field count for nullable
+		 * indexes because of the absence of indexed
+		 * fields. Treat such fields as NULLs.
+		 */
+		if (is_optional && fieldno >= field_count) {
+			/* Nullify entire columns range. */
+			null_count = fieldno - end_fieldno + 1;
+			memset(key_buf, MSGPACK_NULL, null_count);
+			key_buf += null_count * mp_sizeof_nil();
+			continue;
+		}
 		while (current_fieldno < fieldno) {
 			/* search first field of key in tuple raw data */
 			field = field_end;
@@ -175,31 +265,33 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 			current_fieldno++;
 		}
 
-		while (current_fieldno < key_def->parts[i].fieldno) {
-			/* search the last field in subsequence */
-			mp_next(&field_end);
-			current_fieldno++;
+		/*
+		 * If the last fieldno is beyond the tuple's field
+		 * count, fill the rest of the columns with NULLs.
+		 */
+		if (is_optional && end_fieldno >= field_count) {
+			null_count = end_fieldno - field_count + 1;
+			field_end = data_end;
+		} else {
+			while (current_fieldno < end_fieldno) {
+				mp_next(&field_end);
+				current_fieldno++;
+			}
 		}
 		memcpy(key_buf, field, field_end - field);
 		key_buf += field_end - field;
-		assert(key_buf - key <= data_end - data);
+		if (is_optional && null_count != 0) {
+			memset(key_buf, MSGPACK_NULL, null_count);
+			key_buf += null_count * mp_sizeof_nil();
+		} else {
+			assert(key_buf - key <= data_end - data);
+		}
 	}
 	if (key_size != NULL)
 		*key_size = (uint32_t)(key_buf - key);
 	return key;
 }
 
-/** True, if a key con contain two or more parts in sequence. */
-static bool
-key_def_contains_sequential_parts(struct key_def *def)
-{
-	for (uint32_t i = 0; i < def->part_count - 1; ++i) {
-		if (def->parts[i].fieldno + 1 == def->parts[i + 1].fieldno)
-			return true;
-	}
-	return false;
-}
-
 /**
  * Initialize tuple_extract_key() and tuple_extract_key_raw()
  */
@@ -207,16 +299,45 @@ void
 tuple_extract_key_set(struct key_def *key_def)
 {
 	if (key_def_is_sequential(key_def)) {
-		key_def->tuple_extract_key = tuple_extract_key_sequential;
-		key_def->tuple_extract_key_raw = tuple_extract_key_sequential_raw;
-	} else {
-		if (key_def_contains_sequential_parts(key_def)) {
+		if (key_def->is_optional) {
+			assert(key_def->is_nullable);
 			key_def->tuple_extract_key =
-				tuple_extract_key_slowpath<true>;
+				tuple_extract_key_sequential<true>;
+			key_def->tuple_extract_key_raw =
+				tuple_extract_key_sequential_raw<true>;
 		} else {
 			key_def->tuple_extract_key =
-				tuple_extract_key_slowpath<false>;
+				tuple_extract_key_sequential<false>;
+			key_def->tuple_extract_key_raw =
+				tuple_extract_key_sequential_raw<false>;
 		}
-		key_def->tuple_extract_key_raw = tuple_extract_key_slowpath_raw;
+	} else {
+		if (key_def->is_optional) {
+			assert(key_def->is_nullable);
+			if (key_def_contains_sequential_parts(key_def)) {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<true, true>;
+			} else {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<false, true>;
+			}
+		} else {
+			if (key_def_contains_sequential_parts(key_def)) {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<true, false>;
+			} else {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<false,
+								   false>;
+			}
+		}
+	}
+	if (key_def->is_optional) {
+		assert(key_def->is_nullable);
+		key_def->tuple_extract_key_raw =
+			tuple_extract_key_slowpath_raw<true>;
+	} else {
+		key_def->tuple_extract_key_raw =
+			tuple_extract_key_slowpath_raw<false>;
 	}
 }
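
Absent trailing key fields are encoded into the extracted key as
MP_NIL bytes (0xc0), so an extracted key always carries part_count
fields. The same padding, sketched with Tarantool's built-in msgpack
module (an illustration, not part of the patch):

    msgpack = require('msgpack')
    -- a key over fields {1, 2} extracted from the one-field tuple [2]:
    key = msgpack.encode({2, msgpack.NULL})
    #key -- 3: mp_sizeof_array(2) + the field '2' + one 0xc0 nil byte
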
diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
index 3e2c8bf57..26109cf3e 100644
--- a/src/box/tuple_format.c
+++ b/src/box/tuple_format.c
@@ -49,6 +49,9 @@ tuple_format_create(struct tuple_format *format, struct key_def * const *keys,
 		    uint16_t key_count, const struct field_def *fields,
 		    uint32_t field_count)
 {
+	format->min_field_count =
+		tuple_format_min_field_count(keys, key_count, fields,
+					     field_count);
 	if (format->field_count == 0) {
 		format->field_map_size = 0;
 		return 0;
@@ -59,8 +62,6 @@ tuple_format_create(struct tuple_format *format, struct key_def * const *keys,
 		format->fields[i].type = fields[i].type;
 		format->fields[i].offset_slot = TUPLE_OFFSET_SLOT_NIL;
 		format->fields[i].is_nullable = fields[i].is_nullable;
-		if (i + 1 > format->min_field_count && !fields[i].is_nullable)
-			format->min_field_count = i + 1;
 	}
 	/* Initialize remaining fields */
 	for (uint32_t i = field_count; i < format->field_count; i++)
@@ -89,6 +90,8 @@ tuple_format_create(struct tuple_format *format, struct key_def * const *keys,
 					 "nullable" : "not nullable");
 				return -1;
 			}
+			assert(part->fieldno + 1 <= format->min_field_count ||
+			       part->is_optional);
 
 			/*
 			 * Check that there are no conflicts
@@ -231,7 +234,7 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count,
 	format->field_count = field_count;
 	format->index_field_count = index_field_count;
 	format->exact_field_count = 0;
-	format->min_field_count = index_field_count;
+	format->min_field_count = 0;
 	return format;
 }
 
@@ -354,6 +357,14 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
 	++field;
 	uint32_t i = 1;
 	uint32_t defined_field_count = MIN(field_count, format->field_count);
+	if (field_count < format->index_field_count) {
+		/*
+		 * Zero the field map to be able to detect absent
+		 * key fields in tuple_field() by a 0 offset.
+		 */
+		memset((char *)field_map - format->field_map_size, 0,
+		       format->field_map_size);
+	}
 	for (; i < defined_field_count; ++i, ++field) {
 		mp_type = mp_typeof(*pos);
 		if (key_mp_type_validate(field->type, mp_type, ER_FIELD_TYPE,
@@ -369,6 +380,28 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
 	return 0;
 }
 
+uint32_t
+tuple_format_min_field_count(struct key_def * const *keys, uint16_t key_count,
+			     const struct field_def *space_fields,
+			     uint32_t space_field_count)
+{
+	uint32_t min_field_count = 0;
+	for (uint32_t i = 0; i < space_field_count; ++i) {
+		if (! space_fields[i].is_nullable)
+			min_field_count = i + 1;
+	}
+	for (uint32_t i = 0; i < key_count; ++i) {
+		const struct key_def *kd = keys[i];
+		for (uint32_t j = 0; j < kd->part_count; ++j) {
+			const struct key_part *kp = &kd->parts[j];
+			if (!kp->is_nullable &&
+			    kp->fieldno + 1 > min_field_count)
+				min_field_count = kp->fieldno + 1;
+		}
+	}
+	return min_field_count;
+}
+
 /** Destroy tuple format subsystem and free resourses */
 void
 tuple_format_free()
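
tuple_format_min_field_count() takes the maximum over the last
non-nullable field of the space format and the last non-nullable part
of every key. A Lua illustration of the resulting minimum (names are
illustrative):

    s = box.schema.create_space('test')
    s:format({{'a', 'unsigned'}, {'b', 'unsigned', is_nullable = true}})
    pk = s:create_index('pk') -- non-nullable part on field 1
    sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
    -- min_field_count == 1: only field 1 is required
    s:replace{1} -- ok
    s:replace{}  -- error: tuple field count 0 is less than required 1
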
diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h
index d33c77ae6..5460fa5ad 100644
--- a/src/box/tuple_format.h
+++ b/src/box/tuple_format.h
@@ -230,6 +230,21 @@ tuple_format_meta_size(const struct tuple_format *format)
 	return format->extra_size + format->field_map_size;
 }
 
+/**
+ * Calculate minimal field count of tuples with specified keys and
+ * space format.
+ * @param keys Array of key definitions of indexes.
+ * @param key_count Length of @a keys.
+ * @param space_fields Array of fields from a space format.
+ * @param space_field_count Length of @a space_fields.
+ *
+ * @retval Minimal field count.
+ */
+uint32_t
+tuple_format_min_field_count(struct key_def * const *keys, uint16_t key_count,
+			     const struct field_def *space_fields,
+			     uint32_t space_field_count);
+
 typedef struct tuple_format box_tuple_format_t;
 
 /** \cond public */
@@ -298,7 +313,7 @@ static inline const char *
 tuple_field_raw(const struct tuple_format *format, const char *tuple,
 		const uint32_t *field_map, uint32_t field_no)
 {
-	if (likely(field_no < format->field_count)) {
+	if (likely(field_no < format->index_field_count)) {
 		/* Indexed field */
 
 		if (field_no == 0) {
@@ -307,8 +322,12 @@ tuple_field_raw(const struct tuple_format *format, const char *tuple,
 		}
 
 		int32_t offset_slot = format->fields[field_no].offset_slot;
-		if (offset_slot != TUPLE_OFFSET_SLOT_NIL)
-			return tuple + field_map[offset_slot];
+		if (offset_slot != TUPLE_OFFSET_SLOT_NIL) {
+			if (field_map[offset_slot] != 0)
+				return tuple + field_map[offset_slot];
+			else
+				return NULL;
+		}
 	}
 	ERROR_INJECT(ERRINJ_TUPLE_FIELD, return NULL);
 	uint32_t field_count = mp_decode_array(&tuple);
diff --git a/src/box/tuple_hash.cc b/src/box/tuple_hash.cc
index e08bc5ff5..f317a2137 100644
--- a/src/box/tuple_hash.cc
+++ b/src/box/tuple_hash.cc
@@ -212,6 +212,7 @@ static const hasher_signature hash_arr[] = {
 
 #undef HASHER
 
+template <bool is_optional>
 uint32_t
 tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def);
 
@@ -254,7 +255,10 @@ tuple_hash_func_set(struct key_def *key_def) {
 	}
 
 slowpath:
-	key_def->tuple_hash = tuple_hash_slowpath;
+	if (key_def->is_optional)
+		key_def->tuple_hash = tuple_hash_slowpath<true>;
+	else
+		key_def->tuple_hash = tuple_hash_slowpath<false>;
 	key_def->key_hash = key_hash_slowpath;
 }
 
@@ -295,17 +299,32 @@ tuple_hash_field(uint32_t *ph1, uint32_t *pcarry, const char **field,
 	return size;
 }
 
+static inline uint32_t
+tuple_hash_null(uint32_t *ph1, uint32_t *pcarry)
+{
+	assert(mp_sizeof_nil() == 1);
+	const char null = 0xc0;
+	PMurHash32_Process(ph1, pcarry, &null, 1);
+	return mp_sizeof_nil();
+}
 
+template <bool is_optional>
 uint32_t
 tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
 {
+	assert(is_optional == key_def->is_optional);
 	uint32_t h = HASH_SEED;
 	uint32_t carry = 0;
 	uint32_t total_size = 0;
 	uint32_t prev_fieldno = key_def->parts[0].fieldno;
-	const char* field = tuple_field(tuple, key_def->parts[0].fieldno);
-	total_size += tuple_hash_field(&h, &carry, &field,
-				       key_def->parts[0].coll);
+	const char *field = tuple_field(tuple, key_def->parts[0].fieldno);
+	const char *end = (char *)tuple + tuple_size(tuple);
+	if (is_optional && field == NULL) {
+		total_size += tuple_hash_null(&h, &carry);
+	} else {
+		total_size += tuple_hash_field(&h, &carry, &field,
+					       key_def->parts[0].coll);
+	}
 	for (uint32_t part_id = 1; part_id < key_def->part_count; part_id++) {
 		/* If parts of key_def are not sequential we need to call
 		 * tuple_field. Otherwise, tuple is hashed sequentially without
@@ -314,8 +333,13 @@ tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
 		if (prev_fieldno + 1 != key_def->parts[part_id].fieldno) {
 			field = tuple_field(tuple, key_def->parts[part_id].fieldno);
 		}
-		total_size += tuple_hash_field(&h, &carry, &field,
-					       key_def->parts[part_id].coll);
+		if (is_optional && (field == NULL || field >= end)) {
+			total_size += tuple_hash_null(&h, &carry);
+		} else {
+			total_size +=
+				tuple_hash_field(&h, &carry, &field,
+						 key_def->parts[part_id].coll);
+		}
 		prev_fieldno = key_def->parts[part_id].fieldno;
 	}
 
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 248394239..3aae327ee 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -3016,6 +3016,7 @@ vy_join_cb(const struct vy_log_record *record, void *arg)
 		if (ctx->key_def != NULL)
 			free(ctx->key_def);
 		ctx->key_def = key_def_new_with_parts(record->key_parts,
+						      record->key_part_count,
 						      record->key_part_count);
 		if (ctx->key_def == NULL)
 			return -1;
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index c0f692986..54c456498 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -361,7 +361,7 @@ vy_stmt_new_surrogate_from_key(const char *key, enum iproto_type type,
 	assert(part_count <= field_count);
 	uint32_t nulls_count = field_count - cmp_def->part_count;
 	uint32_t bsize = mp_sizeof_array(field_count) +
-		mp_sizeof_nil() * nulls_count;
+			 mp_sizeof_nil() * nulls_count;
 	for (uint32_t i = 0; i < part_count; ++i) {
 		const struct key_part *part = &cmp_def->parts[i];
 		assert(part->fieldno < field_count);
@@ -422,14 +422,25 @@ vy_stmt_new_surrogate_delete(struct tuple_format *format,
 
 	const char *src_pos = src_data;
 	uint32_t src_count = mp_decode_array(&src_pos);
-	uint32_t field_count = format->index_field_count;
-	assert(src_count >= field_count);
-	(void) src_count;
+	assert(src_count >= format->min_field_count);
+	uint32_t field_count;
+	if (src_count < format->index_field_count) {
+		field_count = src_count;
+		/*
+		 * Zero the field map to be able to detect absent
+		 * key fields in tuple_field() by a 0 offset.
+		 */
+		memset((char *)field_map - format->field_map_size, 0,
+		       format->field_map_size);
+	} else {
+		field_count = format->index_field_count;
+	}
 	char *pos = mp_encode_array(data, field_count);
 	for (uint32_t i = 0; i < field_count; ++i) {
 		const struct tuple_field *field = &format->fields[i];
 		if (! field->is_key_part) {
-			/* Unindexed field - write NIL */
+			/* Unindexed field - write NIL. */
+			assert(i < src_count);
 			pos = mp_encode_nil(pos);
 			mp_next(&src_pos);
 			continue;
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 38c53216e..8f7c261e8 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -688,7 +688,7 @@ vy_tuple_key_contains_null(const struct tuple *tuple, const struct key_def *def)
 {
 	for (uint32_t i = 0; i < def->part_count; ++i) {
 		const char *field = tuple_field(tuple, def->parts[i].fieldno);
-		if (mp_typeof(*field) == MP_NIL)
+		if (field == NULL || mp_typeof(*field) == MP_NIL)
 			return true;
 	}
 	return false;
diff --git a/test/engine/null.result b/test/engine/null.result
index 835f4d6f4..5164da615 100644
--- a/test/engine/null.result
+++ b/test/engine/null.result
@@ -755,3 +755,608 @@ s.index.i5:select()
 s:drop()
 ---
 ...
+--
+-- gh-2988: allow absence of tail nullable indexed fields.
+--
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+---
+...
+-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath.
+s:replace{} -- Fail
+---
+- error: Tuple field count 0 is less than required by space format or defined indexes
+    (expected at least 1)
+...
+-- Compare full vs not full.
+s:replace{2}
+---
+- [2]
+...
+s:replace{1, 2}
+---
+- [1, 2]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+...
+sk:select{box.NULL}
+---
+- - [2]
+...
+sk:select{2}
+---
+- - [1, 2]
+...
+-- Compare not full vs full.
+s:replace{4, 5}
+---
+- [4, 5]
+...
+s:replace{3}
+---
+- [3]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+  - [3]
+  - [4, 5]
+...
+sk:select{box.NULL}
+---
+- - [2]
+  - [3]
+...
+sk:select{5}
+---
+- - [4, 5]
+...
+-- Compare extended keys.
+s:replace{7}
+---
+- [7]
+...
+s:replace{6}
+---
+- [6]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+  - [3]
+  - [4, 5]
+  - [6]
+  - [7]
+...
+sk:select{box.NULL}
+---
+- - [2]
+  - [3]
+  - [6]
+  - [7]
+...
+sk:select{}
+---
+- - [2]
+  - [3]
+  - [6]
+  - [7]
+  - [1, 2]
+  - [4, 5]
+...
+-- Test tuple extract key during dump for vinyl.
+box.snapshot()
+---
+- ok
+...
+sk:select{}
+---
+- - [2]
+  - [3]
+  - [6]
+  - [7]
+  - [1, 2]
+  - [4, 5]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+  - [3]
+  - [4, 5]
+  - [6]
+  - [7]
+...
+-- Test tuple_compare_sequential_nullable,
+-- tuple_compare_with_key_sequential.
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+parts = {}
+---
+...
+parts[1] = {1, 'unsigned'}
+---
+...
+parts[2] = {2, 'unsigned', is_nullable = true}
+---
+...
+parts[3] = {3, 'unsigned', is_nullable = true}
+---
+...
+sk = s:create_index('sk', {parts = parts})
+---
+...
+-- Compare full vs not full.
+s:replace{1, 2, 3}
+---
+- [1, 2, 3]
+...
+s:replace{3}
+---
+- [3]
+...
+s:replace{2, 3}
+---
+- [2, 3]
+...
+sk:select{}
+---
+- - [1, 2, 3]
+  - [2, 3]
+  - [3]
+...
+sk:select{3, box.NULL}
+---
+- - [3]
+...
+sk:select{3, box.NULL, box.NULL}
+---
+- - [3]
+...
+sk:select{2}
+---
+- - [2, 3]
+...
+sk:select{2, 3}
+---
+- - [2, 3]
+...
+sk:select{3, 100}
+---
+- []
+...
+sk:select{3, box.NULL, 100}
+---
+- []
+...
+sk:select({3, box.NULL}, {iterator = 'GE'})
+---
+- - [3]
+...
+sk:select({3, box.NULL}, {iterator = 'LE'})
+---
+- - [3]
+  - [2, 3]
+  - [1, 2, 3]
+...
+s:select{}
+---
+- - [1, 2, 3]
+  - [2, 3]
+  - [3]
+...
+-- Test tuple extract key for vinyl.
+box.snapshot()
+---
+- ok
+...
+sk:select{}
+---
+- - [1, 2, 3]
+  - [2, 3]
+  - [3]
+...
+sk:select{3, box.NULL}
+---
+- - [3]
+...
+sk:select{3, box.NULL, box.NULL}
+---
+- - [3]
+...
+sk:select{2}
+---
+- - [2, 3]
+...
+sk:select{2, 3}
+---
+- - [2, 3]
+...
+sk:select{3, 100}
+---
+- []
+...
+sk:select{3, box.NULL, 100}
+---
+- []
+...
+sk:select({3, box.NULL}, {iterator = 'GE'})
+---
+- - [3]
+...
+sk:select({3, box.NULL}, {iterator = 'LE'})
+---
+- - [3]
+  - [2, 3]
+  - [1, 2, 3]
+...
+-- Test a tuple_compare_sequential() for a case, when there are
+-- two equal tuples, but in one of them field count < unique field
+-- count.
+s:replace{1, box.NULL}
+---
+- [1, null]
+...
+s:replace{1, box.NULL, box.NULL}
+---
+- [1, null, null]
+...
+s:select{1}
+---
+- - [1, null, null]
+...
+--
+-- Partially sequential keys. See tuple_extract_key.cc and
+-- contains_sequential_parts template flag.
+--
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+parts = {}
+---
+...
+parts[1] = {2, 'unsigned', is_nullable = true}
+---
+...
+parts[2] = {3, 'unsigned', is_nullable = true}
+---
+...
+parts[3] = {5, 'unsigned', is_nullable = true}
+---
+...
+parts[4] = {6, 'unsigned', is_nullable = true}
+---
+...
+parts[5] = {4, 'unsigned', is_nullable = true}
+---
+...
+parts[6] = {7, 'unsigned', is_nullable = true}
+---
+...
+sk = s:create_index('sk', {parts = parts})
+---
+...
+s:insert{1, 1, 1, 1, 1, 1, 1}
+---
+- [1, 1, 1, 1, 1, 1, 1]
+...
+s:insert{8, 1, 1, 1, 1, box.NULL}
+---
+- [8, 1, 1, 1, 1, null]
+...
+s:insert{9, 1, 1, 1, box.NULL}
+---
+- [9, 1, 1, 1, null]
+...
+s:insert{6, 6}
+---
+- [6, 6]
+...
+s:insert{10, 6, box.NULL}
+---
+- [10, 6, null]
+...
+s:insert{2, 2, 2, 2, 2, 2}
+---
+- [2, 2, 2, 2, 2, 2]
+...
+s:insert{7}
+---
+- [7]
+...
+s:insert{5, 5, 5}
+---
+- [5, 5, 5]
+...
+s:insert{3, 5, box.NULL, box.NULL, box.NULL}
+---
+- [3, 5, null, null, null]
+...
+s:insert{4, 5, 5, 5, box.NULL}
+---
+- [4, 5, 5, 5, null]
+...
+s:insert{11, 4, 4, 4}
+---
+- [11, 4, 4, 4]
+...
+s:insert{12, 4, box.NULL, 4}
+---
+- [12, 4, null, 4]
+...
+s:insert{13, 3, 3, 3, 3}
+---
+- [13, 3, 3, 3, 3]
+...
+s:insert{14, box.NULL, 3, box.NULL, 3}
+---
+- [14, null, 3, null, 3]
+...
+s:select{}
+---
+- - [1, 1, 1, 1, 1, 1, 1]
+  - [2, 2, 2, 2, 2, 2]
+  - [3, 5, null, null, null]
+  - [4, 5, 5, 5, null]
+  - [5, 5, 5]
+  - [6, 6]
+  - [7]
+  - [8, 1, 1, 1, 1, null]
+  - [9, 1, 1, 1, null]
+  - [10, 6, null]
+  - [11, 4, 4, 4]
+  - [12, 4, null, 4]
+  - [13, 3, 3, 3, 3]
+  - [14, null, 3, null, 3]
+...
+sk:select{}
+---
+- - [7]
+  - [14, null, 3, null, 3]
+  - [9, 1, 1, 1, null]
+  - [8, 1, 1, 1, 1, null]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [2, 2, 2, 2, 2, 2]
+  - [13, 3, 3, 3, 3]
+  - [12, 4, null, 4]
+  - [11, 4, 4, 4]
+  - [3, 5, null, null, null]
+  - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+  - [6, 6]
+  - [10, 6, null]
+...
+sk:select{5, 5, box.NULL}
+---
+- - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+...
+sk:select{5, 5, box.NULL, 100}
+---
+- []
+...
+sk:select({7, box.NULL}, {iterator = 'LT'})
+---
+- - [10, 6, null]
+  - [6, 6]
+  - [4, 5, 5, 5, null]
+  - [5, 5, 5]
+  - [3, 5, null, null, null]
+  - [11, 4, 4, 4]
+  - [12, 4, null, 4]
+  - [13, 3, 3, 3, 3]
+  - [2, 2, 2, 2, 2, 2]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [8, 1, 1, 1, 1, null]
+  - [9, 1, 1, 1, null]
+  - [14, null, 3, null, 3]
+  - [7]
+...
+box.snapshot()
+---
+- ok
+...
+sk:select{}
+---
+- - [7]
+  - [14, null, 3, null, 3]
+  - [9, 1, 1, 1, null]
+  - [8, 1, 1, 1, 1, null]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [2, 2, 2, 2, 2, 2]
+  - [13, 3, 3, 3, 3]
+  - [12, 4, null, 4]
+  - [11, 4, 4, 4]
+  - [3, 5, null, null, null]
+  - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+  - [6, 6]
+  - [10, 6, null]
+...
+sk:select{5, 5, box.NULL}
+---
+- - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+...
+sk:select{5, 5, box.NULL, 100}
+---
+- []
+...
+sk:select({7, box.NULL}, {iterator = 'LT'})
+---
+- - [10, 6, null]
+  - [6, 6]
+  - [4, 5, 5, 5, null]
+  - [5, 5, 5]
+  - [3, 5, null, null, null]
+  - [11, 4, 4, 4]
+  - [12, 4, null, 4]
+  - [13, 3, 3, 3, 3]
+  - [2, 2, 2, 2, 2, 2]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [8, 1, 1, 1, 1, null]
+  - [9, 1, 1, 1, null]
+  - [14, null, 3, null, 3]
+  - [7]
+...
+s:drop()
+---
+...
+--
+-- The main case of absent nullable fields - create an index over
+-- them on a non-empty space (available on memtx only).
+--
+s = box.schema.space.create('test', {engine = 'memtx'})
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+s:replace{2}
+---
+- [2]
+...
+s:replace{3}
+---
+- [3]
+...
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+---
+...
+s:replace{4}
+---
+- [4]
+...
+s:replace{5, 6}
+---
+- [5, 6]
+...
+s:replace{7, 8}
+---
+- [7, 8]
+...
+s:replace{9, box.NULL}
+---
+- [9, null]
+...
+s:select{}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5, 6]
+  - [7, 8]
+  - [9, null]
+...
+sk:select{}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [9, null]
+  - [5, 6]
+  - [7, 8]
+...
+sk:select{box.NULL}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [9, null]
+...
+s:drop()
+---
+...
+--
+-- The complex case: when an index part's is_nullable is set to
+-- true and it changes min_field_count, this part must become
+-- optional and turn on comparators for optional fields. See the
+-- big comment in alter.cc in index_def_new_from_tuple().
+--
+s = box.schema.create_space('test', {engine = 'memtx'})
+---
+...
+pk = s:create_index('pk')
+---
+...
+sk = s:create_index('sk', {parts = {2, 'unsigned'}})
+---
+...
+s:replace{1, 1}
+---
+- [1, 1]
+...
+s:replace{2, box.NULL}
+---
+- error: 'Tuple field 2 type does not match one required by operation: expected unsigned'
+...
+s:select{}
+---
+- - [1, 1]
+...
+sk:alter({parts = {{2, 'unsigned', is_nullable = true}}})
+---
+...
+s:replace{20, box.NULL}
+---
+- [20, null]
+...
+sk:select{}
+---
+- - [20, null]
+  - [1, 1]
+...
+s:replace{10}
+---
+- [10]
+...
+sk:select{}
+---
+- - [10]
+  - [20, null]
+  - [1, 1]
+...
+s:replace{40}
+---
+- [40]
+...
+sk:select{}
+---
+- - [10]
+  - [20, null]
+  - [40]
+  - [1, 1]
+...
+s:drop()
+---
+...
diff --git a/test/engine/null.test.lua b/test/engine/null.test.lua
index 19a70d68a..12702484a 100644
--- a/test/engine/null.test.lua
+++ b/test/engine/null.test.lua
@@ -242,3 +242,161 @@ s.index.i4:select()
 s.index.i5:select()
 
 s:drop()
+
+--
+-- gh-2988: allow absence of tail nullable indexed fields.
+--
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+
+-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath.
+
+s:replace{} -- Fail
+-- Compare full vs not full.
+s:replace{2}
+s:replace{1, 2}
+s:select{}
+sk:select{box.NULL}
+sk:select{2}
+-- Compare not full vs full.
+s:replace{4, 5}
+s:replace{3}
+s:select{}
+sk:select{box.NULL}
+sk:select{5}
+-- Compare extended keys.
+s:replace{7}
+s:replace{6}
+s:select{}
+sk:select{box.NULL}
+sk:select{}
+-- Test tuple extract key during dump for vinyl.
+box.snapshot()
+sk:select{}
+s:select{}
+
+-- Test tuple_compare_sequential_nullable,
+-- tuple_compare_with_key_sequential.
+s:drop()
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+parts = {}
+parts[1] = {1, 'unsigned'}
+parts[2] = {2, 'unsigned', is_nullable = true}
+parts[3] = {3, 'unsigned', is_nullable = true}
+sk = s:create_index('sk', {parts = parts})
+-- Compare full vs not full.
+s:replace{1, 2, 3}
+s:replace{3}
+s:replace{2, 3}
+sk:select{}
+sk:select{3, box.NULL}
+sk:select{3, box.NULL, box.NULL}
+sk:select{2}
+sk:select{2, 3}
+sk:select{3, 100}
+sk:select{3, box.NULL, 100}
+sk:select({3, box.NULL}, {iterator = 'GE'})
+sk:select({3, box.NULL}, {iterator = 'LE'})
+s:select{}
+-- Test tuple extract key for vinyl.
+box.snapshot()
+sk:select{}
+sk:select{3, box.NULL}
+sk:select{3, box.NULL, box.NULL}
+sk:select{2}
+sk:select{2, 3}
+sk:select{3, 100}
+sk:select{3, box.NULL, 100}
+sk:select({3, box.NULL}, {iterator = 'GE'})
+sk:select({3, box.NULL}, {iterator = 'LE'})
+
+-- Test a tuple_compare_sequential() for a case, when there are
+-- two equal tuples, but in one of them field count < unique field
+-- count.
+s:replace{1, box.NULL}
+s:replace{1, box.NULL, box.NULL}
+s:select{1}
+
+--
+-- Partially sequential keys. See tuple_extract_key.cc and
+-- contains_sequential_parts template flag.
+--
+s:drop()
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+parts = {}
+parts[1] = {2, 'unsigned', is_nullable = true}
+parts[2] = {3, 'unsigned', is_nullable = true}
+parts[3] = {5, 'unsigned', is_nullable = true}
+parts[4] = {6, 'unsigned', is_nullable = true}
+parts[5] = {4, 'unsigned', is_nullable = true}
+parts[6] = {7, 'unsigned', is_nullable = true}
+sk = s:create_index('sk', {parts = parts})
+s:insert{1, 1, 1, 1, 1, 1, 1}
+s:insert{8, 1, 1, 1, 1, box.NULL}
+s:insert{9, 1, 1, 1, box.NULL}
+s:insert{6, 6}
+s:insert{10, 6, box.NULL}
+s:insert{2, 2, 2, 2, 2, 2}
+s:insert{7}
+s:insert{5, 5, 5}
+s:insert{3, 5, box.NULL, box.NULL, box.NULL}
+s:insert{4, 5, 5, 5, box.NULL}
+s:insert{11, 4, 4, 4}
+s:insert{12, 4, box.NULL, 4}
+s:insert{13, 3, 3, 3, 3}
+s:insert{14, box.NULL, 3, box.NULL, 3}
+s:select{}
+sk:select{}
+sk:select{5, 5, box.NULL}
+sk:select{5, 5, box.NULL, 100}
+sk:select({7, box.NULL}, {iterator = 'LT'})
+box.snapshot()
+sk:select{}
+sk:select{5, 5, box.NULL}
+sk:select{5, 5, box.NULL, 100}
+sk:select({7, box.NULL}, {iterator = 'LT'})
+
+s:drop()
+
+--
+-- The main case of absent nullable fields - create an index over
+-- them on a non-empty space (available on memtx only).
+--
+s = box.schema.space.create('test', {engine = 'memtx'})
+pk = s:create_index('pk')
+s:replace{1}
+s:replace{2}
+s:replace{3}
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+s:replace{4}
+s:replace{5, 6}
+s:replace{7, 8}
+s:replace{9, box.NULL}
+s:select{}
+sk:select{}
+sk:select{box.NULL}
+s:drop()
+
+--
+-- The complex case: when an index part's is_nullable is set to
+-- true and it changes min_field_count, this part must become
+-- optional and turn on comparators for optional fields. See the
+-- big comment in alter.cc in index_def_new_from_tuple().
+--
+s = box.schema.create_space('test', {engine = 'memtx'})
+pk = s:create_index('pk')
+sk = s:create_index('sk', {parts = {2, 'unsigned'}})
+s:replace{1, 1}
+s:replace{2, box.NULL}
+s:select{}
+sk:alter({parts = {{2, 'unsigned', is_nullable = true}}})
+s:replace{20, box.NULL}
+sk:select{}
+s:replace{10}
+sk:select{}
+s:replace{40}
+sk:select{}
+s:drop()
-- 
2.14.3 (Apple Git-98)



