[patches] [schema 1/1] Allow omitting tail nullable index columns

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Jan 25 13:56:11 MSK 2018


If a column is nullable and is the last one defined (via index parts
or the space format), it can be omitted on insertion. Such absent
fields are treated as NULLs in comparators and are not stored.
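
For example (a minimal usage sketch mirroring the tests below; the
space and index names are illustrative, not part of the patch):

    s = box.schema.space.create('test')
    pk = s:create_index('pk')
    sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
    s:replace{1}        -- the second field is omitted entirely ...
    sk:select{box.NULL} -- ... yet is indexed as NULL: returns [1]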

Closes #2988

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 src/box/tuple_compare.cc     |  77 +++++--
 src/box/tuple_extract_key.cc | 173 +++++++++++---
 src/box/tuple_format.c       |  11 +-
 src/box/tuple_format.h       |  10 +-
 src/box/tuple_hash.cc        |  32 ++-
 src/box/vy_stmt.c            |  17 +-
 src/box/vy_stmt.h            |   2 +-
 test/engine/null.result      | 526 +++++++++++++++++++++++++++++++++++++++++++
 test/engine/null.test.lua    | 130 +++++++++++
 9 files changed, 912 insertions(+), 66 deletions(-)

diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index cf8fc130e..4c249a942 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -471,8 +471,8 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 					  part->fieldno);
 		field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b,
 					  part->fieldno);
-		assert(field_a != NULL && field_b != NULL);
 		if (! is_nullable) {
+			assert(field_a != NULL && field_b != NULL);
 			rc = tuple_compare_field(field_a, field_b, part->type,
 						 part->coll);
 			if (rc != 0)
@@ -480,8 +480,10 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
 			else
 				continue;
 		}
-		enum mp_type a_type = mp_typeof(*field_a);
-		enum mp_type b_type = mp_typeof(*field_b);
+		enum mp_type a_type =
+			field_a != NULL ? mp_typeof(*field_a) : MP_NIL;
+		enum mp_type b_type =
+			field_b != NULL ? mp_typeof(*field_b) : MP_NIL;
 		if (a_type == MP_NIL) {
 			if (b_type != MP_NIL)
 				return -1;
@@ -544,7 +546,8 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 			return tuple_compare_field(field, key, part->type,
 						   part->coll);
 		}
-		enum mp_type a_type = mp_typeof(*field);
+		enum mp_type a_type =
+			field != NULL ? mp_typeof(*field) : MP_NIL;
 		enum mp_type b_type = mp_typeof(*key);
 		if (a_type == MP_NIL) {
 			return b_type == MP_NIL ? 0 : -1;
@@ -571,7 +574,8 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
 			else
 				continue;
 		}
-		enum mp_type a_type = mp_typeof(*field);
+		enum mp_type a_type =
+			field != NULL ? mp_typeof(*field) : MP_NIL;
 		enum mp_type b_type = mp_typeof(*key);
 		if (a_type == MP_NIL) {
 			if (b_type != MP_NIL)
@@ -653,6 +657,7 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
 
 	if (!is_sequential_nullable_tuples || !was_null_met)
 		return 0;
+	assert(part_count == key_def->unique_part_count);
 	end = key_def->parts + key_def->unique_part_count;
 	for (; part < end; ++part, mp_next(&key_a), mp_next(&key_b)) {
 		rc = tuple_compare_field(key_a, key_b, part->type, part->coll);
@@ -662,10 +667,44 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
 	return 0;
 }
 
-template<bool is_nullable>
 static inline int
-tuple_compare_with_key_sequential(const struct tuple *tuple,
-	const char *key, uint32_t part_count, const struct key_def *key_def)
+tuple_compare_with_key_sequential_nullable(const struct tuple *tuple,
+					   const char *key,
+					   uint32_t part_count,
+					   const struct key_def *key_def)
+{
+	assert(key_def_is_sequential(key_def));
+	assert(key_def->is_nullable);
+	const char *tuple_key = tuple_data(tuple);
+	uint32_t field_count = mp_decode_array(&tuple_key);
+	uint32_t defined_part_count = MIN(field_count, part_count);
+	int rc = key_compare_parts<true, false>(tuple_key, key,
+						defined_part_count, key_def);
+	if (rc != 0)
+		return rc;
+	/*
+	 * If some of the tuple's indexed fields are absent, check
+	 * that the corresponding key fields are NULL.
+	 */
+	if (field_count < part_count) {
+		/*
+		 * The key's and the tuple's first field_count
+		 * fields are equal, so their bsize is too.
+		 */
+		key += tuple->bsize - mp_sizeof_array(field_count);
+		for (uint32_t i = field_count; i < part_count; ++i,
+		     mp_next(&key)) {
+			if (mp_typeof(*key) != MP_NIL)
+				return -1;
+		}
+	}
+	return 0;
+}
+
+static inline int
+tuple_compare_with_key_sequential(const struct tuple *tuple, const char *key,
+				  uint32_t part_count,
+				  const struct key_def *key_def)
 {
 	assert(key_def_is_sequential(key_def));
 	const char *tuple_key = tuple_data(tuple);
@@ -673,8 +712,8 @@ tuple_compare_with_key_sequential(const struct tuple *tuple,
 	assert(tuple_field_count >= key_def->part_count);
 	assert(part_count <= key_def->part_count);
 	(void) tuple_field_count;
-	return key_compare_parts<is_nullable, false>(tuple_key, key, part_count,
-						     key_def);
+	return key_compare_parts<false, false>(tuple_key, key, part_count,
+					       key_def);
 }
 
 int
@@ -722,15 +761,13 @@ tuple_compare_sequential_nullable(const struct tuple *tuple_a,
 	assert(key_def->is_nullable);
 	assert(key_def_is_sequential(key_def));
 	const char *key_a = tuple_data(tuple_a);
-	uint32_t field_count = mp_decode_array(&key_a);
-	assert(field_count >= key_def->part_count);
+	uint32_t field_count_a = mp_decode_array(&key_a);
 	const char *key_b = tuple_data(tuple_b);
-	field_count = mp_decode_array(&key_b);
-	assert(field_count >= key_def->part_count);
-	(void) field_count;
-	return key_compare_parts<true, true>(key_a, key_b,
-					     key_def->unique_part_count,
-					     key_def);
+	uint32_t field_count_b = mp_decode_array(&key_b);
+	uint32_t part_count = MIN(field_count_a, field_count_b);
+	if (part_count > key_def->unique_part_count)
+		part_count = key_def->unique_part_count;
+	return key_compare_parts<true, true>(key_a, key_b, part_count, key_def);
 }
 
 template <int TYPE>
@@ -1124,7 +1161,7 @@ tuple_compare_with_key_create(const struct key_def *def)
 {
 	if (def->is_nullable) {
 		if (key_def_is_sequential(def))
-			return tuple_compare_with_key_sequential<true>;
+			return tuple_compare_with_key_sequential_nullable;
 		return tuple_compare_with_key_slowpath<true>;
 	}
 	if (!key_def_has_collation(def)) {
@@ -1147,7 +1184,7 @@ tuple_compare_with_key_create(const struct key_def *def)
 		}
 	}
 	if (key_def_is_sequential(def))
-		return tuple_compare_with_key_sequential<false>;
+		return tuple_compare_with_key_sequential;
 	return tuple_compare_with_key_slowpath<false>;
 }
 
diff --git a/src/box/tuple_extract_key.cc b/src/box/tuple_extract_key.cc
index 8765c16b5..32d367140 100644
--- a/src/box/tuple_extract_key.cc
+++ b/src/box/tuple_extract_key.cc
@@ -1,29 +1,39 @@
 #include "tuple_extract_key.h"
 #include "fiber.h"
 
+enum { MSGPACK_NULL = 0xc0 };
+
 /**
  * Optimized version of tuple_extract_key_raw() for sequential key defs
  * @copydoc tuple_extract_key_raw()
  */
+template <bool is_nullable>
 static char *
 tuple_extract_key_sequential_raw(const char *data, const char *data_end,
 				 const struct key_def *key_def,
 				 uint32_t *key_size)
 {
 	assert(key_def_is_sequential(key_def));
+	assert(is_nullable == key_def->is_nullable);
+	assert(data_end != NULL);
+	assert(mp_sizeof_nil() == 1);
 	const char *field_start = data;
 	uint32_t bsize = mp_sizeof_array(key_def->part_count);
-
-	mp_decode_array(&field_start);
+	uint32_t field_count = mp_decode_array(&field_start);
 	const char *field_end = field_start;
-
-	for (uint32_t i = 0; i < key_def->part_count; i++)
-		mp_next(&field_end);
+	uint32_t null_count;
+	if (! is_nullable || field_count > key_def->part_count) {
+		for (uint32_t i = 0; i < key_def->part_count; i++)
+			mp_next(&field_end);
+		null_count = 0;
+	} else {
+		null_count = key_def->part_count - field_count;
+		field_end = data_end;
+		bsize += null_count * mp_sizeof_nil();
+	}
+	assert(field_end - field_start <= data_end - data);
 	bsize += field_end - field_start;
 
-	assert(!data_end || (field_end - field_start <= data_end - data));
-	(void) data_end;
-
 	char *key = (char *) region_alloc(&fiber()->gc, bsize);
 	if (key == NULL) {
 		diag_set(OutOfMemory, bsize, "region",
@@ -32,6 +42,10 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end,
 	}
 	char *key_buf = mp_encode_array(key, key_def->part_count);
 	memcpy(key_buf, field_start, field_end - field_start);
+	if (is_nullable && null_count > 0) {
+		key_buf += field_end - field_start;
+		memset(key_buf, MSGPACK_NULL, null_count);
+	}
 
 	if (key_size != NULL)
 		*key_size = bsize;
@@ -42,54 +56,73 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end,
  * Optimized version of tuple_extract_key() for sequential key defs
  * @copydoc tuple_extract_key()
  */
+template <bool is_nullable>
 static inline char *
 tuple_extract_key_sequential(const struct tuple *tuple,
 			     const struct key_def *key_def,
 			     uint32_t *key_size)
 {
 	assert(key_def_is_sequential(key_def));
+	assert(is_nullable == key_def->is_nullable);
 	const char *data = tuple_data(tuple);
-	return tuple_extract_key_sequential_raw(data, NULL, key_def, key_size);
+	const char *data_end = data + tuple->bsize;
+	return tuple_extract_key_sequential_raw<is_nullable>(data, data_end,
+							     key_def, key_size);
 }
 
 /**
  * General-purpose implementation of tuple_extract_key()
  * @copydoc tuple_extract_key()
  */
-template <bool contains_sequential_parts>
+template <bool contains_sequential_parts, bool is_nullable>
 static char *
 tuple_extract_key_slowpath(const struct tuple *tuple,
 			   const struct key_def *key_def, uint32_t *key_size)
 {
+	assert(key_def->is_nullable == is_nullable);
+	assert(mp_sizeof_nil() == 1);
 	const char *data = tuple_data(tuple);
 	uint32_t part_count = key_def->part_count;
 	uint32_t bsize = mp_sizeof_array(part_count);
 	const struct tuple_format *format = tuple_format(tuple);
 	const uint32_t *field_map = tuple_field_map(tuple);
+	const char *tuple_end = data + tuple->bsize;
 
 	/* Calculate the key size. */
 	for (uint32_t i = 0; i < part_count; ++i) {
 		const char *field =
 			tuple_field_raw(format, data, field_map,
 					key_def->parts[i].fieldno);
+		if (is_nullable && field == NULL) {
+			assert(key_def->parts[i].is_nullable);
+			bsize += mp_sizeof_nil();
+			continue;
+		}
+		assert(field != NULL);
 		const char *end = field;
 		if (contains_sequential_parts) {
 			/*
 			 * Skip sequential part in order to
 			 * minimize tuple_field_raw() calls.
 			 */
-			for (; i < key_def->part_count - 1; i++) {
+			for (; i < part_count - 1; i++) {
 				if (key_def->parts[i].fieldno + 1 !=
-					key_def->parts[i + 1].fieldno) {
+				    key_def->parts[i + 1].fieldno) {
 					/*
 					 * End of sequential part.
 					 */
 					break;
 				}
-				mp_next(&end);
+				if (!is_nullable || end < tuple_end)
+					mp_next(&end);
+				else
+					bsize += mp_sizeof_nil();
 			}
 		}
-		mp_next(&end);
+		if (!is_nullable || end < tuple_end)
+			mp_next(&end);
+		else
+			bsize += mp_sizeof_nil();
 		bsize += end - field;
 	}
 
@@ -103,27 +136,42 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
 		const char *field =
 			tuple_field_raw(format, data, field_map,
 					key_def->parts[i].fieldno);
+		if (is_nullable && field == NULL) {
+			key_buf = mp_encode_nil(key_buf);
+			continue;
+		}
 		const char *end = field;
+		uint32_t null_count = 0;
 		if (contains_sequential_parts) {
 			/*
 			 * Skip sequential part in order to
 			 * minimize tuple_field_raw() calls.
 			 */
-			for (; i < key_def->part_count - 1; i++) {
+			for (; i < part_count - 1; i++) {
 				if (key_def->parts[i].fieldno + 1 !=
-					key_def->parts[i + 1].fieldno) {
+				    key_def->parts[i + 1].fieldno) {
 					/*
 					 * End of sequential part.
 					 */
 					break;
 				}
-				mp_next(&end);
+				if (!is_nullable || end < tuple_end)
+					mp_next(&end);
+				else
+					++null_count;
 			}
 		}
-		mp_next(&end);
+		if (!is_nullable || end < tuple_end)
+			mp_next(&end);
+		else
+			++null_count;
 		bsize = end - field;
 		memcpy(key_buf, field, bsize);
 		key_buf += bsize;
+		if (is_nullable && null_count != 0) {
+			memset(key_buf, MSGPACK_NULL, null_count);
+			key_buf += null_count * mp_sizeof_nil();
+		}
 	}
 	if (key_size != NULL)
 		*key_size = key_buf - key;
@@ -134,11 +182,14 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
  * General-purpose version of tuple_extract_key_raw()
  * @copydoc tuple_extract_key_raw()
  */
+template <bool is_nullable>
 static char *
 tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 			       const struct key_def *key_def,
 			       uint32_t *key_size)
 {
+	assert(is_nullable == key_def->is_nullable);
+	assert(mp_sizeof_nil() == 1);
 	/* allocate buffer with maximal possible size */
 	char *key = (char *) region_alloc(&fiber()->gc, data_end - data);
 	if (key == NULL) {
@@ -148,7 +199,12 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 	}
 	char *key_buf = mp_encode_array(key, key_def->part_count);
 	const char *field0 = data;
-	mp_decode_array(&field0);
+	uint32_t field_count = mp_decode_array(&field0);
+	/*
+	 * A tuple cannot be empty - at least a primary key exists.
+	 */
+	assert(field_count > 0);
+	(void) field_count;
 	const char *field0_end = field0;
 	mp_next(&field0_end);
 	const char *field = field0;
@@ -156,11 +212,14 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 	uint32_t current_fieldno = 0;
 	for (uint32_t i = 0; i < key_def->part_count; i++) {
 		uint32_t fieldno = key_def->parts[i].fieldno;
+		uint32_t null_count = 0;
 		for (; i < key_def->part_count - 1; i++) {
 			if (key_def->parts[i].fieldno + 1 !=
 			    key_def->parts[i + 1].fieldno)
 				break;
 		}
+		uint32_t end_fieldno = key_def->parts[i].fieldno;
+
 		if (fieldno < current_fieldno) {
 			/* Rewind. */
 			field = field0;
@@ -168,6 +227,19 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 			current_fieldno = 0;
 		}
 
+	/*
+	 * The first fieldno in the key columns can exceed
+	 * the tuple's field count for nullable indexes,
+	 * because indexed fields may be absent. Treat such
+	 * fields as NULLs.
+	 */
+		if (is_nullable && fieldno >= field_count) {
+			/* Nullify the entire column range. */
+			null_count = fieldno - end_fieldno + 1;
+			memset(key_buf, MSGPACK_NULL, null_count);
+			key_buf += null_count * mp_sizeof_nil();
+			continue;
+		}
 		while (current_fieldno < fieldno) {
 			/* search first field of key in tuple raw data */
 			field = field_end;
@@ -175,14 +247,27 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
 			current_fieldno++;
 		}
 
-		while (current_fieldno < key_def->parts[i].fieldno) {
-			/* search the last field in subsequence */
-			mp_next(&field_end);
-			current_fieldno++;
+		/*
+		 * If the last fieldno exceeds the tuple's field
+		 * count, fill the rest of the columns with NULLs.
+		 */
+		if (is_nullable && end_fieldno >= field_count) {
+			null_count = end_fieldno - field_count + 1;
+			field_end = data_end;
+		} else {
+			while (current_fieldno < end_fieldno) {
+				mp_next(&field_end);
+				current_fieldno++;
+			}
 		}
 		memcpy(key_buf, field, field_end - field);
 		key_buf += field_end - field;
-		assert(key_buf - key <= data_end - data);
+		if (is_nullable && null_count != 0) {
+			memset(key_buf, MSGPACK_NULL, null_count);
+			key_buf += null_count * mp_sizeof_nil();
+		} else {
+			assert(key_buf - key <= data_end - data);
+		}
 	}
 	if (key_size != NULL)
 		*key_size = (uint32_t)(key_buf - key);
@@ -207,16 +292,42 @@ void
 tuple_extract_key_set(struct key_def *key_def)
 {
 	if (key_def_is_sequential(key_def)) {
-		key_def->tuple_extract_key = tuple_extract_key_sequential;
-		key_def->tuple_extract_key_raw = tuple_extract_key_sequential_raw;
-	} else {
-		if (key_def_contains_sequential_parts(key_def)) {
+		if (key_def->is_nullable) {
 			key_def->tuple_extract_key =
-				tuple_extract_key_slowpath<true>;
+				tuple_extract_key_sequential<true>;
+			key_def->tuple_extract_key_raw =
+				tuple_extract_key_sequential_raw<true>;
 		} else {
 			key_def->tuple_extract_key =
-				tuple_extract_key_slowpath<false>;
+				tuple_extract_key_sequential<false>;
+			key_def->tuple_extract_key_raw =
+				tuple_extract_key_sequential_raw<false>;
 		}
-		key_def->tuple_extract_key_raw = tuple_extract_key_slowpath_raw;
+	} else {
+		if (key_def->is_nullable) {
+			if (key_def_contains_sequential_parts(key_def)) {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<true, true>;
+			} else {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<false, true>;
+			}
+		} else {
+			if (key_def_contains_sequential_parts(key_def)) {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<true, false>;
+			} else {
+				key_def->tuple_extract_key =
+					tuple_extract_key_slowpath<false,
+								   false>;
+			}
+		}
+	}
+	if (key_def->is_nullable) {
+		key_def->tuple_extract_key_raw =
+			tuple_extract_key_slowpath_raw<true>;
+	} else {
+		key_def->tuple_extract_key_raw =
+			tuple_extract_key_slowpath_raw<false>;
 	}
 }
diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
index 819cdc71e..b2a4e82b0 100644
--- a/src/box/tuple_format.c
+++ b/src/box/tuple_format.c
@@ -261,6 +261,7 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count,
 		   uint32_t space_field_count)
 {
 	uint32_t index_field_count = 0;
+	uint32_t min_field_count = 0;
 	/* find max max field no */
 	for (uint16_t key_no = 0; key_no < key_count; ++key_no) {
 		const struct key_def *key_def = keys[key_no];
@@ -269,6 +270,10 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count,
 		for (; part < pend; part++) {
 			index_field_count = MAX(index_field_count,
 						part->fieldno + 1);
+			if (! part->is_nullable) {
+				min_field_count = MAX(min_field_count,
+						      part->fieldno + 1);
+			}
 		}
 	}
 	uint32_t field_count = MAX(space_field_count, index_field_count);
@@ -305,7 +310,7 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count,
 	format->field_count = field_count;
 	format->index_field_count = index_field_count;
 	format->exact_field_count = 0;
-	format->min_field_count = index_field_count;
+	format->min_field_count = min_field_count;
 	return format;
 
 error_name_hash_reserve:
@@ -474,6 +479,10 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
 	++field;
 	uint32_t i = 1;
 	uint32_t defined_field_count = MIN(field_count, format->field_count);
+	if (field_count < format->index_field_count) {
+		memset((char *)field_map - format->field_map_size, 0,
+		       format->field_map_size);
+	}
 	for (; i < defined_field_count; ++i, ++field) {
 		mp_type = mp_typeof(*pos);
 		if (key_mp_type_validate(field->type, mp_type, ER_FIELD_TYPE,
diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h
index bb92e14bb..9b17ff054 100644
--- a/src/box/tuple_format.h
+++ b/src/box/tuple_format.h
@@ -304,7 +304,7 @@ static inline const char *
 tuple_field_raw(const struct tuple_format *format, const char *tuple,
 		const uint32_t *field_map, uint32_t field_no)
 {
-	if (likely(field_no < format->field_count)) {
+	if (likely(field_no < format->index_field_count)) {
 		/* Indexed field */
 
 		if (field_no == 0) {
@@ -313,8 +313,12 @@ tuple_field_raw(const struct tuple_format *format, const char *tuple,
 		}
 
 		int32_t offset_slot = format->fields[field_no].offset_slot;
-		if (offset_slot != TUPLE_OFFSET_SLOT_NIL)
-			return tuple + field_map[offset_slot];
+		if (offset_slot != TUPLE_OFFSET_SLOT_NIL) {
+			if (field_map[offset_slot] != 0)
+				return tuple + field_map[offset_slot];
+			else
+				return NULL;
+		}
 	}
 	ERROR_INJECT(ERRINJ_TUPLE_FIELD, return NULL);
 	uint32_t field_count = mp_decode_array(&tuple);
diff --git a/src/box/tuple_hash.cc b/src/box/tuple_hash.cc
index e08bc5ff5..f4ab5fb94 100644
--- a/src/box/tuple_hash.cc
+++ b/src/box/tuple_hash.cc
@@ -212,6 +212,7 @@ static const hasher_signature hash_arr[] = {
 
 #undef HASHER
 
+template <bool is_nullable>
 uint32_t
 tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def);
 
@@ -254,7 +255,10 @@ tuple_hash_func_set(struct key_def *key_def) {
 	}
 
 slowpath:
-	key_def->tuple_hash = tuple_hash_slowpath;
+	if (key_def->is_nullable)
+		key_def->tuple_hash = tuple_hash_slowpath<true>;
+	else
+		key_def->tuple_hash = tuple_hash_slowpath<false>;
 	key_def->key_hash = key_hash_slowpath;
 }
 
@@ -295,7 +299,16 @@ tuple_hash_field(uint32_t *ph1, uint32_t *pcarry, const char **field,
 	return size;
 }
 
+static inline uint32_t
+tuple_hash_null(uint32_t *ph1, uint32_t *pcarry)
+{
+	assert(mp_sizeof_nil() == 1);
+	const char null = 0xc0;
+	PMurHash32_Process(ph1, pcarry, &null, 1);
+	return mp_sizeof_nil();
+}
 
+template <bool is_nullable>
 uint32_t
 tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
 {
@@ -304,8 +317,12 @@ tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
 	uint32_t total_size = 0;
 	uint32_t prev_fieldno = key_def->parts[0].fieldno;
 	const char* field = tuple_field(tuple, key_def->parts[0].fieldno);
-	total_size += tuple_hash_field(&h, &carry, &field,
-				       key_def->parts[0].coll);
+	if (is_nullable && field == NULL) {
+		total_size += tuple_hash_null(&h, &carry);
+	} else {
+		total_size += tuple_hash_field(&h, &carry, &field,
+					       key_def->parts[0].coll);
+	}
 	for (uint32_t part_id = 1; part_id < key_def->part_count; part_id++) {
 		/* If parts of key_def are not sequential we need to call
 		 * tuple_field. Otherwise, tuple is hashed sequentially without
@@ -314,8 +331,13 @@ tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
 		if (prev_fieldno + 1 != key_def->parts[part_id].fieldno) {
 			field = tuple_field(tuple, key_def->parts[part_id].fieldno);
 		}
-		total_size += tuple_hash_field(&h, &carry, &field,
-					       key_def->parts[part_id].coll);
+		if (is_nullable && field == NULL) {
+			total_size += tuple_hash_null(&h, &carry);
+		} else {
+			total_size +=
+				tuple_hash_field(&h, &carry, &field,
+						 key_def->parts[part_id].coll);
+		}
 		prev_fieldno = key_def->parts[part_id].fieldno;
 	}
 
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 8a24e62fe..833e45bd1 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -384,7 +384,7 @@ vy_stmt_new_surrogate_from_key(const char *key, enum iproto_type type,
 	assert(part_count <= field_count);
 	uint32_t nulls_count = field_count - cmp_def->part_count;
 	uint32_t bsize = mp_sizeof_array(field_count) +
-		mp_sizeof_nil() * nulls_count;
+			 mp_sizeof_nil() * nulls_count;
 	for (uint32_t i = 0; i < part_count; ++i) {
 		const struct key_part *part = &cmp_def->parts[i];
 		assert(part->fieldno < field_count);
@@ -445,14 +445,21 @@ vy_stmt_new_surrogate_delete(struct tuple_format *format,
 
 	const char *src_pos = src_data;
 	uint32_t src_count = mp_decode_array(&src_pos);
-	uint32_t field_count = format->index_field_count;
-	assert(src_count >= field_count);
-	(void) src_count;
+	assert(src_count >= format->min_field_count);
+	uint32_t field_count;
+	if (src_count < format->index_field_count) {
+		field_count = src_count;
+		memset((char *)field_map - format->field_map_size, 0,
+		       format->field_map_size);
+	} else {
+		field_count = format->index_field_count;
+	}
 	char *pos = mp_encode_array(data, field_count);
 	for (uint32_t i = 0; i < field_count; ++i) {
 		const struct tuple_field *field = &format->fields[i];
 		if (! field->is_key_part) {
-			/* Unindexed field - write NIL */
+			/* Unindexed field - write NIL. */
+			assert(i < src_count);
 			pos = mp_encode_nil(pos);
 			mp_next(&src_pos);
 			continue;
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index f884fca44..94477cd66 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -707,7 +707,7 @@ vy_tuple_key_contains_null(const struct tuple *tuple, const struct key_def *def)
 {
 	for (uint32_t i = 0; i < def->part_count; ++i) {
 		const char *field = tuple_field(tuple, def->parts[i].fieldno);
-		if (mp_typeof(*field) == MP_NIL)
+		if (field == NULL || mp_typeof(*field) == MP_NIL)
 			return true;
 	}
 	return false;
diff --git a/test/engine/null.result b/test/engine/null.result
index e68db1e49..2eeb32e03 100644
--- a/test/engine/null.result
+++ b/test/engine/null.result
@@ -751,3 +751,529 @@ s.index.i5:select()
 s:drop()
 ---
 ...
+--
+-- gh-2988: allow absence of tail nullable indexed fields.
+--
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+---
+...
+-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath.
+s:replace{} -- Fail
+---
+- error: Tuple field count 0 is less than required (expected at least 1)
+...
+-- Compare full vs not full.
+s:replace{2}
+---
+- [2]
+...
+s:replace{1, 2}
+---
+- [1, 2]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+...
+sk:select{box.NULL}
+---
+- - [2]
+...
+sk:select{2}
+---
+- - [1, 2]
+...
+-- Compare not full vs full.
+s:replace{4, 5}
+---
+- [4, 5]
+...
+s:replace{3}
+---
+- [3]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+  - [3]
+  - [4, 5]
+...
+sk:select{box.NULL}
+---
+- - [2]
+  - [3]
+...
+sk:select{5}
+---
+- - [4, 5]
+...
+-- Compare extended keys.
+s:replace{7}
+---
+- [7]
+...
+s:replace{6}
+---
+- [6]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+  - [3]
+  - [4, 5]
+  - [6]
+  - [7]
+...
+sk:select{box.NULL}
+---
+- - [2]
+  - [3]
+  - [6]
+  - [7]
+...
+sk:select{}
+---
+- - [2]
+  - [3]
+  - [6]
+  - [7]
+  - [1, 2]
+  - [4, 5]
+...
+-- Test tuple extract key during dump for vinyl.
+box.snapshot()
+---
+- ok
+...
+sk:select{}
+---
+- - [2]
+  - [3]
+  - [6]
+  - [7]
+  - [1, 2]
+  - [4, 5]
+...
+s:select{}
+---
+- - [1, 2]
+  - [2]
+  - [3]
+  - [4, 5]
+  - [6]
+  - [7]
+...
+-- Test tuple_compare_sequential_nullable,
+-- tuple_compare_with_key_sequential.
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+parts = {}
+---
+...
+parts[1] = {1, 'unsigned'}
+---
+...
+parts[2] = {2, 'unsigned', is_nullable = true}
+---
+...
+parts[3] = {3, 'unsigned', is_nullable = true}
+---
+...
+sk = s:create_index('sk', {parts = parts})
+---
+...
+-- Compare full vs not full.
+s:replace{1, 2, 3}
+---
+- [1, 2, 3]
+...
+s:replace{3}
+---
+- [3]
+...
+s:replace{2, 3}
+---
+- [2, 3]
+...
+sk:select{}
+---
+- - [1, 2, 3]
+  - [2, 3]
+  - [3]
+...
+sk:select{3, box.NULL}
+---
+- - [3]
+...
+sk:select{3, box.NULL, box.NULL}
+---
+- - [3]
+...
+sk:select{2}
+---
+- - [2, 3]
+...
+sk:select{2, 3}
+---
+- - [2, 3]
+...
+sk:select{3, 100}
+---
+- []
+...
+sk:select{3, box.NULL, 100}
+---
+- []
+...
+sk:select({3, box.NULL}, {iterator = 'GE'})
+---
+- - [3]
+...
+sk:select({3, box.NULL}, {iterator = 'LE'})
+---
+- - [3]
+  - [2, 3]
+  - [1, 2, 3]
+...
+s:select{}
+---
+- - [1, 2, 3]
+  - [2, 3]
+  - [3]
+...
+-- Test tuple extract key for vinyl.
+box.snapshot()
+---
+- ok
+...
+sk:select{}
+---
+- - [1, 2, 3]
+  - [2, 3]
+  - [3]
+...
+sk:select{3, box.NULL}
+---
+- - [3]
+...
+sk:select{3, box.NULL, box.NULL}
+---
+- - [3]
+...
+sk:select{2}
+---
+- - [2, 3]
+...
+sk:select{2, 3}
+---
+- - [2, 3]
+...
+sk:select{3, 100}
+---
+- []
+...
+sk:select{3, box.NULL, 100}
+---
+- []
+...
+sk:select({3, box.NULL}, {iterator = 'GE'})
+---
+- - [3]
+...
+sk:select({3, box.NULL}, {iterator = 'LE'})
+---
+- - [3]
+  - [2, 3]
+  - [1, 2, 3]
+...
+--
+-- Partially sequential keys. See tuple_extract_key.cc and
+-- contains_sequential_parts template flag.
+--
+s:drop()
+---
+...
+s = box.schema.space.create('test', {engine = engine})
+---
+...
+pk = s:create_index('pk')
+---
+...
+parts = {}
+---
+...
+parts[1] = {2, 'unsigned', is_nullable = true}
+---
+...
+parts[2] = {3, 'unsigned', is_nullable = true}
+---
+...
+parts[3] = {5, 'unsigned', is_nullable = true}
+---
+...
+parts[4] = {6, 'unsigned', is_nullable = true}
+---
+...
+parts[5] = {4, 'unsigned', is_nullable = true}
+---
+...
+parts[6] = {7, 'unsigned', is_nullable = true}
+---
+...
+sk = s:create_index('sk', {parts = parts})
+---
+...
+s:insert{1, 1, 1, 1, 1, 1, 1}
+---
+- [1, 1, 1, 1, 1, 1, 1]
+...
+s:insert{8, 1, 1, 1, 1, box.NULL}
+---
+- [8, 1, 1, 1, 1, null]
+...
+s:insert{9, 1, 1, 1, box.NULL}
+---
+- [9, 1, 1, 1, null]
+...
+s:insert{6, 6}
+---
+- [6, 6]
+...
+s:insert{10, 6, box.NULL}
+---
+- [10, 6, null]
+...
+s:insert{2, 2, 2, 2, 2, 2}
+---
+- [2, 2, 2, 2, 2, 2]
+...
+s:insert{7}
+---
+- [7]
+...
+s:insert{5, 5, 5}
+---
+- [5, 5, 5]
+...
+s:insert{3, 5, box.NULL, box.NULL, box.NULL}
+---
+- [3, 5, null, null, null]
+...
+s:insert{4, 5, 5, 5, box.NULL}
+---
+- [4, 5, 5, 5, null]
+...
+s:insert{11, 4, 4, 4}
+---
+- [11, 4, 4, 4]
+...
+s:insert{12, 4, box.NULL, 4}
+---
+- [12, 4, null, 4]
+...
+s:insert{13, 3, 3, 3, 3}
+---
+- [13, 3, 3, 3, 3]
+...
+s:insert{14, box.NULL, 3, box.NULL, 3}
+---
+- [14, null, 3, null, 3]
+...
+s:select{}
+---
+- - [1, 1, 1, 1, 1, 1, 1]
+  - [2, 2, 2, 2, 2, 2]
+  - [3, 5, null, null, null]
+  - [4, 5, 5, 5, null]
+  - [5, 5, 5]
+  - [6, 6]
+  - [7]
+  - [8, 1, 1, 1, 1, null]
+  - [9, 1, 1, 1, null]
+  - [10, 6, null]
+  - [11, 4, 4, 4]
+  - [12, 4, null, 4]
+  - [13, 3, 3, 3, 3]
+  - [14, null, 3, null, 3]
+...
+sk:select{}
+---
+- - [7]
+  - [14, null, 3, null, 3]
+  - [9, 1, 1, 1, null]
+  - [8, 1, 1, 1, 1, null]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [2, 2, 2, 2, 2, 2]
+  - [13, 3, 3, 3, 3]
+  - [12, 4, null, 4]
+  - [11, 4, 4, 4]
+  - [3, 5, null, null, null]
+  - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+  - [6, 6]
+  - [10, 6, null]
+...
+sk:select{5, 5, box.NULL}
+---
+- - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+...
+sk:select{5, 5, box.NULL, 100}
+---
+- []
+...
+sk:select({7, box.NULL}, {iterator = 'LT'})
+---
+- - [10, 6, null]
+  - [6, 6]
+  - [4, 5, 5, 5, null]
+  - [5, 5, 5]
+  - [3, 5, null, null, null]
+  - [11, 4, 4, 4]
+  - [12, 4, null, 4]
+  - [13, 3, 3, 3, 3]
+  - [2, 2, 2, 2, 2, 2]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [8, 1, 1, 1, 1, null]
+  - [9, 1, 1, 1, null]
+  - [14, null, 3, null, 3]
+  - [7]
+...
+box.snapshot()
+---
+- ok
+...
+sk:select{}
+---
+- - [7]
+  - [14, null, 3, null, 3]
+  - [9, 1, 1, 1, null]
+  - [8, 1, 1, 1, 1, null]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [2, 2, 2, 2, 2, 2]
+  - [13, 3, 3, 3, 3]
+  - [12, 4, null, 4]
+  - [11, 4, 4, 4]
+  - [3, 5, null, null, null]
+  - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+  - [6, 6]
+  - [10, 6, null]
+...
+sk:select{5, 5, box.NULL}
+---
+- - [5, 5, 5]
+  - [4, 5, 5, 5, null]
+...
+sk:select{5, 5, box.NULL, 100}
+---
+- []
+...
+sk:select({7, box.NULL}, {iterator = 'LT'})
+---
+- - [10, 6, null]
+  - [6, 6]
+  - [4, 5, 5, 5, null]
+  - [5, 5, 5]
+  - [3, 5, null, null, null]
+  - [11, 4, 4, 4]
+  - [12, 4, null, 4]
+  - [13, 3, 3, 3, 3]
+  - [2, 2, 2, 2, 2, 2]
+  - [1, 1, 1, 1, 1, 1, 1]
+  - [8, 1, 1, 1, 1, null]
+  - [9, 1, 1, 1, null]
+  - [14, null, 3, null, 3]
+  - [7]
+...
+s:drop()
+---
+...
+--
+-- The main use case of absent nullable fields - creating an index
+-- over them on a non-empty space (available on memtx only).
+--
+s = box.schema.space.create('test', {engine = 'memtx'})
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+s:replace{2}
+---
+- [2]
+...
+s:replace{3}
+---
+- [3]
+...
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+---
+...
+s:replace{4}
+---
+- [4]
+...
+s:replace{5, 6}
+---
+- [5, 6]
+...
+s:replace{7, 8}
+---
+- [7, 8]
+...
+s:replace{9, box.NULL}
+---
+- [9, null]
+...
+s:select{}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5, 6]
+  - [7, 8]
+  - [9, null]
+...
+sk:select{}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [9, null]
+  - [5, 6]
+  - [7, 8]
+...
+sk:select{box.NULL}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [9, null]
+...
+s:drop()
+---
+...
diff --git a/test/engine/null.test.lua b/test/engine/null.test.lua
index 19a70d68a..ba329737d 100644
--- a/test/engine/null.test.lua
+++ b/test/engine/null.test.lua
@@ -242,3 +242,133 @@ s.index.i4:select()
 s.index.i5:select()
 
 s:drop()
+
+--
+-- gh-2988: allow absence of tail nullable indexed fields.
+--
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+
+-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath.
+
+s:replace{} -- Fail
+-- Compare full vs not full.
+s:replace{2}
+s:replace{1, 2}
+s:select{}
+sk:select{box.NULL}
+sk:select{2}
+-- Compare not full vs full.
+s:replace{4, 5}
+s:replace{3}
+s:select{}
+sk:select{box.NULL}
+sk:select{5}
+-- Compare extended keys.
+s:replace{7}
+s:replace{6}
+s:select{}
+sk:select{box.NULL}
+sk:select{}
+-- Test tuple extract key during dump for vinyl.
+box.snapshot()
+sk:select{}
+s:select{}
+
+-- Test tuple_compare_sequential_nullable,
+-- tuple_compare_with_key_sequential.
+s:drop()
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+parts = {}
+parts[1] = {1, 'unsigned'}
+parts[2] = {2, 'unsigned', is_nullable = true}
+parts[3] = {3, 'unsigned', is_nullable = true}
+sk = s:create_index('sk', {parts = parts})
+-- Compare full vs not full.
+s:replace{1, 2, 3}
+s:replace{3}
+s:replace{2, 3}
+sk:select{}
+sk:select{3, box.NULL}
+sk:select{3, box.NULL, box.NULL}
+sk:select{2}
+sk:select{2, 3}
+sk:select{3, 100}
+sk:select{3, box.NULL, 100}
+sk:select({3, box.NULL}, {iterator = 'GE'})
+sk:select({3, box.NULL}, {iterator = 'LE'})
+s:select{}
+-- Test tuple extract key for vinyl.
+box.snapshot()
+sk:select{}
+sk:select{3, box.NULL}
+sk:select{3, box.NULL, box.NULL}
+sk:select{2}
+sk:select{2, 3}
+sk:select{3, 100}
+sk:select{3, box.NULL, 100}
+sk:select({3, box.NULL}, {iterator = 'GE'})
+sk:select({3, box.NULL}, {iterator = 'LE'})
+
+--
+-- Partially sequential keys. See tuple_extract_key.cc and
+-- contains_sequential_parts template flag.
+--
+s:drop()
+s = box.schema.space.create('test', {engine = engine})
+pk = s:create_index('pk')
+parts = {}
+parts[1] = {2, 'unsigned', is_nullable = true}
+parts[2] = {3, 'unsigned', is_nullable = true}
+parts[3] = {5, 'unsigned', is_nullable = true}
+parts[4] = {6, 'unsigned', is_nullable = true}
+parts[5] = {4, 'unsigned', is_nullable = true}
+parts[6] = {7, 'unsigned', is_nullable = true}
+sk = s:create_index('sk', {parts = parts})
+s:insert{1, 1, 1, 1, 1, 1, 1}
+s:insert{8, 1, 1, 1, 1, box.NULL}
+s:insert{9, 1, 1, 1, box.NULL}
+s:insert{6, 6}
+s:insert{10, 6, box.NULL}
+s:insert{2, 2, 2, 2, 2, 2}
+s:insert{7}
+s:insert{5, 5, 5}
+s:insert{3, 5, box.NULL, box.NULL, box.NULL}
+s:insert{4, 5, 5, 5, box.NULL}
+s:insert{11, 4, 4, 4}
+s:insert{12, 4, box.NULL, 4}
+s:insert{13, 3, 3, 3, 3}
+s:insert{14, box.NULL, 3, box.NULL, 3}
+s:select{}
+sk:select{}
+sk:select{5, 5, box.NULL}
+sk:select{5, 5, box.NULL, 100}
+sk:select({7, box.NULL}, {iterator = 'LT'})
+box.snapshot()
+sk:select{}
+sk:select{5, 5, box.NULL}
+sk:select{5, 5, box.NULL, 100}
+sk:select({7, box.NULL}, {iterator = 'LT'})
+
+s:drop()
+
+--
+-- The main use case of absent nullable fields - creating an index
+-- over them on a non-empty space (available on memtx only).
+--
+s = box.schema.space.create('test', {engine = 'memtx'})
+pk = s:create_index('pk')
+s:replace{1}
+s:replace{2}
+s:replace{3}
+sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
+s:replace{4}
+s:replace{5, 6}
+s:replace{7, 8}
+s:replace{9, box.NULL}
+s:select{}
+sk:select{}
+sk:select{box.NULL}
+s:drop()
-- 
2.11.0 (Apple Git-81)



