[patches] [schema 1/1] Allow omitting tail nullable index columns
Konstantin Osipov
kostja at tarantool.org
Thu Jan 25 15:26:15 MSK 2018
Hi,
What's your take on this patch?
* Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/01/25 13:59]:
> If a column is nullable and is the last defined one (via index parts
> or space format), it can be omitted on insertion. Such absent fields
> are treated as NULLs in comparators and are not stored.
>
> Closes #2988
>
> Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
> ---
> src/box/tuple_compare.cc | 77 +++++--
> src/box/tuple_extract_key.cc | 173 +++++++++++---
> src/box/tuple_format.c | 11 +-
> src/box/tuple_format.h | 10 +-
> src/box/tuple_hash.cc | 32 ++-
> src/box/vy_stmt.c | 17 +-
> src/box/vy_stmt.h | 2 +-
> test/engine/null.result | 526 +++++++++++++++++++++++++++++++++++++++++++
> test/engine/null.test.lua | 130 +++++++++++
> 9 files changed, 912 insertions(+), 66 deletions(-)
>
> diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
> index cf8fc130e..4c249a942 100644
> --- a/src/box/tuple_compare.cc
> +++ b/src/box/tuple_compare.cc
> @@ -471,8 +471,8 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
> part->fieldno);
> field_b = tuple_field_raw(format_b, tuple_b_raw, field_map_b,
> part->fieldno);
> - assert(field_a != NULL && field_b != NULL);
> if (! is_nullable) {
> + assert(field_a != NULL && field_b != NULL);
> rc = tuple_compare_field(field_a, field_b, part->type,
> part->coll);
> if (rc != 0)
> @@ -480,8 +480,10 @@ tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
> else
> continue;
> }
> - enum mp_type a_type = mp_typeof(*field_a);
> - enum mp_type b_type = mp_typeof(*field_b);
> + enum mp_type a_type =
> + field_a != NULL ? mp_typeof(*field_a) : MP_NIL;
> + enum mp_type b_type =
> + field_b != NULL ? mp_typeof(*field_b) : MP_NIL;
> if (a_type == MP_NIL) {
> if (b_type != MP_NIL)
> return -1;
> @@ -544,7 +546,8 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
> return tuple_compare_field(field, key, part->type,
> part->coll);
> }
> - enum mp_type a_type = mp_typeof(*field);
> + enum mp_type a_type =
> + field != NULL ? mp_typeof(*field) : MP_NIL;
> enum mp_type b_type = mp_typeof(*key);
> if (a_type == MP_NIL) {
> return b_type == MP_NIL ? 0 : -1;
> @@ -571,7 +574,8 @@ tuple_compare_with_key_slowpath(const struct tuple *tuple, const char *key,
> else
> continue;
> }
> - enum mp_type a_type = mp_typeof(*field);
> + enum mp_type a_type =
> + field != NULL ? mp_typeof(*field) : MP_NIL;
> enum mp_type b_type = mp_typeof(*key);
> if (a_type == MP_NIL) {
> if (b_type != MP_NIL)
> @@ -653,6 +657,7 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
>
> if (!is_sequential_nullable_tuples || !was_null_met)
> return 0;
> + assert(part_count == key_def->unique_part_count);
> end = key_def->parts + key_def->unique_part_count;
> for (; part < end; ++part, mp_next(&key_a), mp_next(&key_b)) {
> rc = tuple_compare_field(key_a, key_b, part->type, part->coll);
> @@ -662,10 +667,44 @@ key_compare_parts(const char *key_a, const char *key_b, uint32_t part_count,
> return 0;
> }
>
> -template<bool is_nullable>
> static inline int
> -tuple_compare_with_key_sequential(const struct tuple *tuple,
> - const char *key, uint32_t part_count, const struct key_def *key_def)
> +tuple_compare_with_key_sequential_nullable(const struct tuple *tuple,
> + const char *key,
> + uint32_t part_count,
> + const struct key_def *key_def)
> +{
> + assert(key_def_is_sequential(key_def));
> + assert(key_def->is_nullable);
> + const char *tuple_key = tuple_data(tuple);
> + uint32_t field_count = mp_decode_array(&tuple_key);
> + uint32_t defined_part_count = MIN(field_count, part_count);
> + int rc = key_compare_parts<true, false>(tuple_key, key,
> + defined_part_count, key_def);
> + if (rc != 0)
> + return rc;
> + /*
> + * If some tuple indexed fields are absent, then check
> + * corresponding key fields to be equal to NULL.
> + */
> + if (field_count < part_count) {
> + /*
> + * Key's and tuple's first field_count fields are
> + * equal, and their bsize too.
> + */
> + key += tuple->bsize - mp_sizeof_array(field_count);
> + for (uint32_t i = field_count; i < part_count; ++i,
> + mp_next(&key)) {
> + if (mp_typeof(*key) != MP_NIL)
> + return -1;
> + }
> + }
> + return 0;
> +}
> +
> +static inline int
> +tuple_compare_with_key_sequential(const struct tuple *tuple, const char *key,
> + uint32_t part_count,
> + const struct key_def *key_def)
> {
> assert(key_def_is_sequential(key_def));
> const char *tuple_key = tuple_data(tuple);
> @@ -673,8 +712,8 @@ tuple_compare_with_key_sequential(const struct tuple *tuple,
> assert(tuple_field_count >= key_def->part_count);
> assert(part_count <= key_def->part_count);
> (void) tuple_field_count;
> - return key_compare_parts<is_nullable, false>(tuple_key, key, part_count,
> - key_def);
> + return key_compare_parts<false, false>(tuple_key, key, part_count,
> + key_def);
> }
>
> int
> @@ -722,15 +761,13 @@ tuple_compare_sequential_nullable(const struct tuple *tuple_a,
> assert(key_def->is_nullable);
> assert(key_def_is_sequential(key_def));
> const char *key_a = tuple_data(tuple_a);
> - uint32_t field_count = mp_decode_array(&key_a);
> - assert(field_count >= key_def->part_count);
> + uint32_t field_count_a = mp_decode_array(&key_a);
> const char *key_b = tuple_data(tuple_b);
> - field_count = mp_decode_array(&key_b);
> - assert(field_count >= key_def->part_count);
> - (void) field_count;
> - return key_compare_parts<true, true>(key_a, key_b,
> - key_def->unique_part_count,
> - key_def);
> + uint32_t field_count_b = mp_decode_array(&key_b);
> + uint32_t part_count = MIN(field_count_a, field_count_b);
> + if (part_count > key_def->unique_part_count)
> + part_count = key_def->unique_part_count;
> + return key_compare_parts<true, true>(key_a, key_b, part_count, key_def);
> }
>
> template <int TYPE>
> @@ -1124,7 +1161,7 @@ tuple_compare_with_key_create(const struct key_def *def)
> {
> if (def->is_nullable) {
> if (key_def_is_sequential(def))
> - return tuple_compare_with_key_sequential<true>;
> + return tuple_compare_with_key_sequential_nullable;
> return tuple_compare_with_key_slowpath<true>;
> }
> if (!key_def_has_collation(def)) {
> @@ -1147,7 +1184,7 @@ tuple_compare_with_key_create(const struct key_def *def)
> }
> }
> if (key_def_is_sequential(def))
> - return tuple_compare_with_key_sequential<false>;
> + return tuple_compare_with_key_sequential;
> return tuple_compare_with_key_slowpath<false>;
> }
>
> diff --git a/src/box/tuple_extract_key.cc b/src/box/tuple_extract_key.cc
> index 8765c16b5..32d367140 100644
> --- a/src/box/tuple_extract_key.cc
> +++ b/src/box/tuple_extract_key.cc
> @@ -1,29 +1,39 @@
> #include "tuple_extract_key.h"
> #include "fiber.h"
>
> +enum { MSGPACK_NULL = 0xc0 };
> +
> /**
> * Optimized version of tuple_extract_key_raw() for sequential key defs
> * @copydoc tuple_extract_key_raw()
> */
> +template <bool is_nullable>
> static char *
> tuple_extract_key_sequential_raw(const char *data, const char *data_end,
> const struct key_def *key_def,
> uint32_t *key_size)
> {
> assert(key_def_is_sequential(key_def));
> + assert(is_nullable == key_def->is_nullable);
> + assert(data_end != NULL);
> + assert(mp_sizeof_nil() == 1);
> const char *field_start = data;
> uint32_t bsize = mp_sizeof_array(key_def->part_count);
> -
> - mp_decode_array(&field_start);
> + uint32_t field_count = mp_decode_array(&field_start);
> const char *field_end = field_start;
> -
> - for (uint32_t i = 0; i < key_def->part_count; i++)
> - mp_next(&field_end);
> + uint32_t null_count;
> + if (! is_nullable || field_count > key_def->part_count) {
> + for (uint32_t i = 0; i < key_def->part_count; i++)
> + mp_next(&field_end);
> + null_count = 0;
> + } else {
> + null_count = key_def->part_count - field_count;
> + field_end = data_end;
> + bsize += null_count * mp_sizeof_nil();
> + }
> + assert(field_end - field_start <= data_end - data);
> bsize += field_end - field_start;
>
> - assert(!data_end || (field_end - field_start <= data_end - data));
> - (void) data_end;
> -
> char *key = (char *) region_alloc(&fiber()->gc, bsize);
> if (key == NULL) {
> diag_set(OutOfMemory, bsize, "region",
> @@ -32,6 +42,10 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end,
> }
> char *key_buf = mp_encode_array(key, key_def->part_count);
> memcpy(key_buf, field_start, field_end - field_start);
> + if (is_nullable && null_count > 0) {
> + key_buf += field_end - field_start;
> + memset(key_buf, MSGPACK_NULL, null_count);
> + }
>
> if (key_size != NULL)
> *key_size = bsize;
> @@ -42,54 +56,73 @@ tuple_extract_key_sequential_raw(const char *data, const char *data_end,
> * Optimized version of tuple_extract_key() for sequential key defs
> * @copydoc tuple_extract_key()
> */
> +template <bool is_nullable>
> static inline char *
> tuple_extract_key_sequential(const struct tuple *tuple,
> const struct key_def *key_def,
> uint32_t *key_size)
> {
> assert(key_def_is_sequential(key_def));
> + assert(is_nullable == key_def->is_nullable);
> const char *data = tuple_data(tuple);
> - return tuple_extract_key_sequential_raw(data, NULL, key_def, key_size);
> + const char *data_end = data + tuple->bsize;
> + return tuple_extract_key_sequential_raw<is_nullable>(data, data_end,
> + key_def, key_size);
> }
>
> /**
> * General-purpose implementation of tuple_extract_key()
> * @copydoc tuple_extract_key()
> */
> -template <bool contains_sequential_parts>
> +template <bool contains_sequential_parts, bool is_nullable>
> static char *
> tuple_extract_key_slowpath(const struct tuple *tuple,
> const struct key_def *key_def, uint32_t *key_size)
> {
> + assert(key_def->is_nullable == is_nullable);
> + assert(mp_sizeof_nil() == 1);
> const char *data = tuple_data(tuple);
> uint32_t part_count = key_def->part_count;
> uint32_t bsize = mp_sizeof_array(part_count);
> const struct tuple_format *format = tuple_format(tuple);
> const uint32_t *field_map = tuple_field_map(tuple);
> + const char *tuple_end = data + tuple->bsize;
>
> /* Calculate the key size. */
> for (uint32_t i = 0; i < part_count; ++i) {
> const char *field =
> tuple_field_raw(format, data, field_map,
> key_def->parts[i].fieldno);
> + if (is_nullable && field == NULL) {
> + assert(key_def->parts[i].is_nullable);
> + bsize += mp_sizeof_nil();
> + continue;
> + }
> + assert(field != NULL);
> const char *end = field;
> if (contains_sequential_parts) {
> /*
> * Skip sequential part in order to
> * minimize tuple_field_raw() calls.
> */
> - for (; i < key_def->part_count - 1; i++) {
> + for (; i < part_count - 1; i++) {
> if (key_def->parts[i].fieldno + 1 !=
> - key_def->parts[i + 1].fieldno) {
> + key_def->parts[i + 1].fieldno) {
> /*
> * End of sequential part.
> */
> break;
> }
> - mp_next(&end);
> + if (!is_nullable || end < tuple_end)
> + mp_next(&end);
> + else
> + bsize += mp_sizeof_nil();
> }
> }
> - mp_next(&end);
> + if (!is_nullable || end < tuple_end)
> + mp_next(&end);
> + else
> + bsize += mp_sizeof_nil();
> bsize += end - field;
> }
>
> @@ -103,27 +136,42 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
> const char *field =
> tuple_field_raw(format, data, field_map,
> key_def->parts[i].fieldno);
> + if (is_nullable && field == NULL) {
> + key_buf = mp_encode_nil(key_buf);
> + continue;
> + }
> const char *end = field;
> + uint32_t null_count = 0;
> if (contains_sequential_parts) {
> /*
> * Skip sequential part in order to
> * minimize tuple_field_raw() calls.
> */
> - for (; i < key_def->part_count - 1; i++) {
> + for (; i < part_count - 1; i++) {
> if (key_def->parts[i].fieldno + 1 !=
> - key_def->parts[i + 1].fieldno) {
> + key_def->parts[i + 1].fieldno) {
> /*
> * End of sequential part.
> */
> break;
> }
> - mp_next(&end);
> + if (!is_nullable || end < tuple_end)
> + mp_next(&end);
> + else
> + ++null_count;
> }
> }
> - mp_next(&end);
> + if (!is_nullable || end < tuple_end)
> + mp_next(&end);
> + else
> + ++null_count;
> bsize = end - field;
> memcpy(key_buf, field, bsize);
> key_buf += bsize;
> + if (is_nullable && null_count != 0) {
> + memset(key_buf, MSGPACK_NULL, null_count);
> + key_buf += null_count * mp_sizeof_nil();
> + }
> }
> if (key_size != NULL)
> *key_size = key_buf - key;
> @@ -134,11 +182,14 @@ tuple_extract_key_slowpath(const struct tuple *tuple,
> * General-purpose version of tuple_extract_key_raw()
> * @copydoc tuple_extract_key_raw()
> */
> +template <bool is_nullable>
> static char *
> tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
> const struct key_def *key_def,
> uint32_t *key_size)
> {
> + assert(is_nullable == key_def->is_nullable);
> + assert(mp_sizeof_nil() == 1);
> /* allocate buffer with maximal possible size */
> char *key = (char *) region_alloc(&fiber()->gc, data_end - data);
> if (key == NULL) {
> @@ -148,7 +199,12 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
> }
> char *key_buf = mp_encode_array(key, key_def->part_count);
> const char *field0 = data;
> - mp_decode_array(&field0);
> + uint32_t field_count = mp_decode_array(&field0);
> + /*
> + * A tuple can not be empty - at least a pk always exists.
> + */
> + assert(field_count > 0);
> + (void) field_count;
> const char *field0_end = field0;
> mp_next(&field0_end);
> const char *field = field0;
> @@ -156,11 +212,14 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
> uint32_t current_fieldno = 0;
> for (uint32_t i = 0; i < key_def->part_count; i++) {
> uint32_t fieldno = key_def->parts[i].fieldno;
> + uint32_t null_count = 0;
> for (; i < key_def->part_count - 1; i++) {
> if (key_def->parts[i].fieldno + 1 !=
> key_def->parts[i + 1].fieldno)
> break;
> }
> + uint32_t end_fieldno = key_def->parts[i].fieldno;
> +
> if (fieldno < current_fieldno) {
> /* Rewind. */
> field = field0;
> @@ -168,6 +227,19 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
> current_fieldno = 0;
> }
>
> + /*
> + * First fieldno in a key columns can be out of
> + * tuple size for nullable indexes because of
> + * absense of indexed fields. Treat such fields
> + * as NULLs.
> + */
> + if (is_nullable && fieldno >= field_count) {
> + /* Nullify entire columns range. */
> + null_count = fieldno - end_fieldno + 1;
> + memset(key_buf, MSGPACK_NULL, null_count);
> + key_buf += null_count * mp_sizeof_nil();
> + continue;
> + }
> while (current_fieldno < fieldno) {
> /* search first field of key in tuple raw data */
> field = field_end;
> @@ -175,14 +247,27 @@ tuple_extract_key_slowpath_raw(const char *data, const char *data_end,
> current_fieldno++;
> }
>
> - while (current_fieldno < key_def->parts[i].fieldno) {
> - /* search the last field in subsequence */
> - mp_next(&field_end);
> - current_fieldno++;
> + /*
> + * If the last fieldno is out of tuple size, then
> + * fill rest of columns with NULLs.
> + */
> + if (is_nullable && end_fieldno >= field_count) {
> + null_count = end_fieldno - field_count + 1;
> + field_end = data_end;
> + } else {
> + while (current_fieldno < end_fieldno) {
> + mp_next(&field_end);
> + current_fieldno++;
> + }
> }
> memcpy(key_buf, field, field_end - field);
> key_buf += field_end - field;
> - assert(key_buf - key <= data_end - data);
> + if (is_nullable && null_count != 0) {
> + memset(key_buf, MSGPACK_NULL, null_count);
> + key_buf += null_count * mp_sizeof_nil();
> + } else {
> + assert(key_buf - key <= data_end - data);
> + }
> }
> if (key_size != NULL)
> *key_size = (uint32_t)(key_buf - key);
> @@ -207,16 +292,42 @@ void
> tuple_extract_key_set(struct key_def *key_def)
> {
> if (key_def_is_sequential(key_def)) {
> - key_def->tuple_extract_key = tuple_extract_key_sequential;
> - key_def->tuple_extract_key_raw = tuple_extract_key_sequential_raw;
> - } else {
> - if (key_def_contains_sequential_parts(key_def)) {
> + if (key_def->is_nullable) {
> key_def->tuple_extract_key =
> - tuple_extract_key_slowpath<true>;
> + tuple_extract_key_sequential<true>;
> + key_def->tuple_extract_key_raw =
> + tuple_extract_key_sequential_raw<true>;
> } else {
> key_def->tuple_extract_key =
> - tuple_extract_key_slowpath<false>;
> + tuple_extract_key_sequential<false>;
> + key_def->tuple_extract_key_raw =
> + tuple_extract_key_sequential_raw<false>;
> }
> - key_def->tuple_extract_key_raw = tuple_extract_key_slowpath_raw;
> + } else {
> + if (key_def->is_nullable) {
> + if (key_def_contains_sequential_parts(key_def)) {
> + key_def->tuple_extract_key =
> + tuple_extract_key_slowpath<true, true>;
> + } else {
> + key_def->tuple_extract_key =
> + tuple_extract_key_slowpath<false, true>;
> + }
> + } else {
> + if (key_def_contains_sequential_parts(key_def)) {
> + key_def->tuple_extract_key =
> + tuple_extract_key_slowpath<true, false>;
> + } else {
> + key_def->tuple_extract_key =
> + tuple_extract_key_slowpath<false,
> + false>;
> + }
> + }
> + }
> + if (key_def->is_nullable) {
> + key_def->tuple_extract_key_raw =
> + tuple_extract_key_slowpath_raw<true>;
> + } else {
> + key_def->tuple_extract_key_raw =
> + tuple_extract_key_slowpath_raw<false>;
> }
> }
> diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
> index 819cdc71e..b2a4e82b0 100644
> --- a/src/box/tuple_format.c
> +++ b/src/box/tuple_format.c
> @@ -261,6 +261,7 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count,
> uint32_t space_field_count)
> {
> uint32_t index_field_count = 0;
> + uint32_t min_field_count = 0;
> /* find max max field no */
> for (uint16_t key_no = 0; key_no < key_count; ++key_no) {
> const struct key_def *key_def = keys[key_no];
> @@ -269,6 +270,10 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count,
> for (; part < pend; part++) {
> index_field_count = MAX(index_field_count,
> part->fieldno + 1);
> + if (! part->is_nullable) {
> + min_field_count = MAX(min_field_count,
> + part->fieldno + 1);
> + }
> }
> }
> uint32_t field_count = MAX(space_field_count, index_field_count);
> @@ -305,7 +310,7 @@ tuple_format_alloc(struct key_def * const *keys, uint16_t key_count,
> format->field_count = field_count;
> format->index_field_count = index_field_count;
> format->exact_field_count = 0;
> - format->min_field_count = index_field_count;
> + format->min_field_count = min_field_count;
> return format;
>
> error_name_hash_reserve:
> @@ -474,6 +479,10 @@ tuple_init_field_map(const struct tuple_format *format, uint32_t *field_map,
> ++field;
> uint32_t i = 1;
> uint32_t defined_field_count = MIN(field_count, format->field_count);
> + if (field_count < format->index_field_count) {
> + memset((char *)field_map - format->field_map_size, 0,
> + format->field_map_size);
> + }
> for (; i < defined_field_count; ++i, ++field) {
> mp_type = mp_typeof(*pos);
> if (key_mp_type_validate(field->type, mp_type, ER_FIELD_TYPE,
> diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h
> index bb92e14bb..9b17ff054 100644
> --- a/src/box/tuple_format.h
> +++ b/src/box/tuple_format.h
> @@ -304,7 +304,7 @@ static inline const char *
> tuple_field_raw(const struct tuple_format *format, const char *tuple,
> const uint32_t *field_map, uint32_t field_no)
> {
> - if (likely(field_no < format->field_count)) {
> + if (likely(field_no < format->index_field_count)) {
> /* Indexed field */
>
> if (field_no == 0) {
> @@ -313,8 +313,12 @@ tuple_field_raw(const struct tuple_format *format, const char *tuple,
> }
>
> int32_t offset_slot = format->fields[field_no].offset_slot;
> - if (offset_slot != TUPLE_OFFSET_SLOT_NIL)
> - return tuple + field_map[offset_slot];
> + if (offset_slot != TUPLE_OFFSET_SLOT_NIL) {
> + if (field_map[offset_slot] != 0)
> + return tuple + field_map[offset_slot];
> + else
> + return NULL;
> + }
> }
> ERROR_INJECT(ERRINJ_TUPLE_FIELD, return NULL);
> uint32_t field_count = mp_decode_array(&tuple);
> diff --git a/src/box/tuple_hash.cc b/src/box/tuple_hash.cc
> index e08bc5ff5..f4ab5fb94 100644
> --- a/src/box/tuple_hash.cc
> +++ b/src/box/tuple_hash.cc
> @@ -212,6 +212,7 @@ static const hasher_signature hash_arr[] = {
>
> #undef HASHER
>
> +template <bool is_nullable>
> uint32_t
> tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def);
>
> @@ -254,7 +255,10 @@ tuple_hash_func_set(struct key_def *key_def) {
> }
>
> slowpath:
> - key_def->tuple_hash = tuple_hash_slowpath;
> + if (key_def->is_nullable)
> + key_def->tuple_hash = tuple_hash_slowpath<true>;
> + else
> + key_def->tuple_hash = tuple_hash_slowpath<false>;
> key_def->key_hash = key_hash_slowpath;
> }
>
> @@ -295,7 +299,16 @@ tuple_hash_field(uint32_t *ph1, uint32_t *pcarry, const char **field,
> return size;
> }
>
> +static inline uint32_t
> +tuple_hash_null(uint32_t *ph1, uint32_t *pcarry)
> +{
> + assert(mp_sizeof_nil() == 1);
> + const char null = 0xc0;
> + PMurHash32_Process(ph1, pcarry, &null, 1);
> + return mp_sizeof_nil();
> +}
>
> +template <bool is_nullable>
> uint32_t
> tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
> {
> @@ -304,8 +317,12 @@ tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
> uint32_t total_size = 0;
> uint32_t prev_fieldno = key_def->parts[0].fieldno;
> const char* field = tuple_field(tuple, key_def->parts[0].fieldno);
> - total_size += tuple_hash_field(&h, &carry, &field,
> - key_def->parts[0].coll);
> + if (is_nullable && field == NULL) {
> + total_size += tuple_hash_null(&h, &carry);
> + } else {
> + total_size += tuple_hash_field(&h, &carry, &field,
> + key_def->parts[0].coll);
> + }
> for (uint32_t part_id = 1; part_id < key_def->part_count; part_id++) {
> /* If parts of key_def are not sequential we need to call
> * tuple_field. Otherwise, tuple is hashed sequentially without
> @@ -314,8 +331,13 @@ tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
> if (prev_fieldno + 1 != key_def->parts[part_id].fieldno) {
> field = tuple_field(tuple, key_def->parts[part_id].fieldno);
> }
> - total_size += tuple_hash_field(&h, &carry, &field,
> - key_def->parts[part_id].coll);
> + if (is_nullable && field == NULL) {
> + total_size += tuple_hash_null(&h, &carry);
> + } else {
> + total_size +=
> + tuple_hash_field(&h, &carry, &field,
> + key_def->parts[part_id].coll);
> + }
> prev_fieldno = key_def->parts[part_id].fieldno;
> }
>
> diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
> index 8a24e62fe..833e45bd1 100644
> --- a/src/box/vy_stmt.c
> +++ b/src/box/vy_stmt.c
> @@ -384,7 +384,7 @@ vy_stmt_new_surrogate_from_key(const char *key, enum iproto_type type,
> assert(part_count <= field_count);
> uint32_t nulls_count = field_count - cmp_def->part_count;
> uint32_t bsize = mp_sizeof_array(field_count) +
> - mp_sizeof_nil() * nulls_count;
> + mp_sizeof_nil() * nulls_count;
> for (uint32_t i = 0; i < part_count; ++i) {
> const struct key_part *part = &cmp_def->parts[i];
> assert(part->fieldno < field_count);
> @@ -445,14 +445,21 @@ vy_stmt_new_surrogate_delete(struct tuple_format *format,
>
> const char *src_pos = src_data;
> uint32_t src_count = mp_decode_array(&src_pos);
> - uint32_t field_count = format->index_field_count;
> - assert(src_count >= field_count);
> - (void) src_count;
> + assert(src_count >= format->min_field_count);
> + uint32_t field_count;
> + if (src_count < format->index_field_count) {
> + field_count = src_count;
> + memset((char *)field_map - format->field_map_size, 0,
> + format->field_map_size);
> + } else {
> + field_count = format->index_field_count;
> + }
> char *pos = mp_encode_array(data, field_count);
> for (uint32_t i = 0; i < field_count; ++i) {
> const struct tuple_field *field = &format->fields[i];
> if (! field->is_key_part) {
> - /* Unindexed field - write NIL */
> + /* Unindexed field - write NIL. */
> + assert(i < src_count);
> pos = mp_encode_nil(pos);
> mp_next(&src_pos);
> continue;
> diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
> index f884fca44..94477cd66 100644
> --- a/src/box/vy_stmt.h
> +++ b/src/box/vy_stmt.h
> @@ -707,7 +707,7 @@ vy_tuple_key_contains_null(const struct tuple *tuple, const struct key_def *def)
> {
> for (uint32_t i = 0; i < def->part_count; ++i) {
> const char *field = tuple_field(tuple, def->parts[i].fieldno);
> - if (mp_typeof(*field) == MP_NIL)
> + if (field == NULL || mp_typeof(*field) == MP_NIL)
> return true;
> }
> return false;
> diff --git a/test/engine/null.result b/test/engine/null.result
> index e68db1e49..2eeb32e03 100644
> --- a/test/engine/null.result
> +++ b/test/engine/null.result
> @@ -751,3 +751,529 @@ s.index.i5:select()
> s:drop()
> ---
> ...
> +--
> +-- gh-2988: allow absense of tail nullable indexed fields.
> +--
> +s = box.schema.space.create('test', {engine = engine})
> +---
> +...
> +pk = s:create_index('pk')
> +---
> +...
> +sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
> +---
> +...
> +-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath.
> +s:replace{} -- Fail
> +---
> +- error: Tuple field count 0 is less than required (expected at least 1)
> +...
> +-- Compare full vs not full.
> +s:replace{2}
> +---
> +- [2]
> +...
> +s:replace{1, 2}
> +---
> +- [1, 2]
> +...
> +s:select{}
> +---
> +- - [1, 2]
> + - [2]
> +...
> +sk:select{box.NULL}
> +---
> +- - [2]
> +...
> +sk:select{2}
> +---
> +- - [1, 2]
> +...
> +-- Compare not full vs full.
> +s:replace{4, 5}
> +---
> +- [4, 5]
> +...
> +s:replace{3}
> +---
> +- [3]
> +...
> +s:select{}
> +---
> +- - [1, 2]
> + - [2]
> + - [3]
> + - [4, 5]
> +...
> +sk:select{box.NULL}
> +---
> +- - [2]
> + - [3]
> +...
> +sk:select{5}
> +---
> +- - [4, 5]
> +...
> +-- Compare extended keys.
> +s:replace{7}
> +---
> +- [7]
> +...
> +s:replace{6}
> +---
> +- [6]
> +...
> +s:select{}
> +---
> +- - [1, 2]
> + - [2]
> + - [3]
> + - [4, 5]
> + - [6]
> + - [7]
> +...
> +sk:select{box.NULL}
> +---
> +- - [2]
> + - [3]
> + - [6]
> + - [7]
> +...
> +sk:select{}
> +---
> +- - [2]
> + - [3]
> + - [6]
> + - [7]
> + - [1, 2]
> + - [4, 5]
> +...
> +-- Test tuple extract key during dump for vinyl.
> +box.snapshot()
> +---
> +- ok
> +...
> +sk:select{}
> +---
> +- - [2]
> + - [3]
> + - [6]
> + - [7]
> + - [1, 2]
> + - [4, 5]
> +...
> +s:select{}
> +---
> +- - [1, 2]
> + - [2]
> + - [3]
> + - [4, 5]
> + - [6]
> + - [7]
> +...
> +-- Test tuple_compare_sequential_nullable,
> +-- tuple_compare_with_key_sequential.
> +s:drop()
> +---
> +...
> +s = box.schema.space.create('test', {engine = engine})
> +---
> +...
> +pk = s:create_index('pk')
> +---
> +...
> +parts = {}
> +---
> +...
> +parts[1] = {1, 'unsigned'}
> +---
> +...
> +parts[2] = {2, 'unsigned', is_nullable = true}
> +---
> +...
> +parts[3] = {3, 'unsigned', is_nullable = true}
> +---
> +...
> +sk = s:create_index('sk', {parts = parts})
> +---
> +...
> +-- Compare full vs not full.
> +s:replace{1, 2, 3}
> +---
> +- [1, 2, 3]
> +...
> +s:replace{3}
> +---
> +- [3]
> +...
> +s:replace{2, 3}
> +---
> +- [2, 3]
> +...
> +sk:select{}
> +---
> +- - [1, 2, 3]
> + - [2, 3]
> + - [3]
> +...
> +sk:select{3, box.NULL}
> +---
> +- - [3]
> +...
> +sk:select{3, box.NULL, box.NULL}
> +---
> +- - [3]
> +...
> +sk:select{2}
> +---
> +- - [2, 3]
> +...
> +sk:select{2, 3}
> +---
> +- - [2, 3]
> +...
> +sk:select{3, 100}
> +---
> +- []
> +...
> +sk:select{3, box.NULL, 100}
> +---
> +- []
> +...
> +sk:select({3, box.NULL}, {iterator = 'GE'})
> +---
> +- - [3]
> +...
> +sk:select({3, box.NULL}, {iterator = 'LE'})
> +---
> +- - [3]
> + - [2, 3]
> + - [1, 2, 3]
> +...
> +s:select{}
> +---
> +- - [1, 2, 3]
> + - [2, 3]
> + - [3]
> +...
> +-- Test tuple extract key for vinyl.
> +box.snapshot()
> +---
> +- ok
> +...
> +sk:select{}
> +---
> +- - [1, 2, 3]
> + - [2, 3]
> + - [3]
> +...
> +sk:select{3, box.NULL}
> +---
> +- - [3]
> +...
> +sk:select{3, box.NULL, box.NULL}
> +---
> +- - [3]
> +...
> +sk:select{2}
> +---
> +- - [2, 3]
> +...
> +sk:select{2, 3}
> +---
> +- - [2, 3]
> +...
> +sk:select{3, 100}
> +---
> +- []
> +...
> +sk:select{3, box.NULL, 100}
> +---
> +- []
> +...
> +sk:select({3, box.NULL}, {iterator = 'GE'})
> +---
> +- - [3]
> +...
> +sk:select({3, box.NULL}, {iterator = 'LE'})
> +---
> +- - [3]
> + - [2, 3]
> + - [1, 2, 3]
> +...
> +--
> +-- Partially sequential keys. See tuple_extract_key.cc and
> +-- contains_sequential_parts template flag.
> +--
> +s:drop()
> +---
> +...
> +s = box.schema.space.create('test', {engine = engine})
> +---
> +...
> +pk = s:create_index('pk')
> +---
> +...
> +parts = {}
> +---
> +...
> +parts[1] = {2, 'unsigned', is_nullable = true}
> +---
> +...
> +parts[2] = {3, 'unsigned', is_nullable = true}
> +---
> +...
> +parts[3] = {5, 'unsigned', is_nullable = true}
> +---
> +...
> +parts[4] = {6, 'unsigned', is_nullable = true}
> +---
> +...
> +parts[5] = {4, 'unsigned', is_nullable = true}
> +---
> +...
> +parts[6] = {7, 'unsigned', is_nullable = true}
> +---
> +...
> +sk = s:create_index('sk', {parts = parts})
> +---
> +...
> +s:insert{1, 1, 1, 1, 1, 1, 1}
> +---
> +- [1, 1, 1, 1, 1, 1, 1]
> +...
> +s:insert{8, 1, 1, 1, 1, box.NULL}
> +---
> +- [8, 1, 1, 1, 1, null]
> +...
> +s:insert{9, 1, 1, 1, box.NULL}
> +---
> +- [9, 1, 1, 1, null]
> +...
> +s:insert{6, 6}
> +---
> +- [6, 6]
> +...
> +s:insert{10, 6, box.NULL}
> +---
> +- [10, 6, null]
> +...
> +s:insert{2, 2, 2, 2, 2, 2}
> +---
> +- [2, 2, 2, 2, 2, 2]
> +...
> +s:insert{7}
> +---
> +- [7]
> +...
> +s:insert{5, 5, 5}
> +---
> +- [5, 5, 5]
> +...
> +s:insert{3, 5, box.NULL, box.NULL, box.NULL}
> +---
> +- [3, 5, null, null, null]
> +...
> +s:insert{4, 5, 5, 5, box.NULL}
> +---
> +- [4, 5, 5, 5, null]
> +...
> +s:insert{11, 4, 4, 4}
> +---
> +- [11, 4, 4, 4]
> +...
> +s:insert{12, 4, box.NULL, 4}
> +---
> +- [12, 4, null, 4]
> +...
> +s:insert{13, 3, 3, 3, 3}
> +---
> +- [13, 3, 3, 3, 3]
> +...
> +s:insert{14, box.NULL, 3, box.NULL, 3}
> +---
> +- [14, null, 3, null, 3]
> +...
> +s:select{}
> +---
> +- - [1, 1, 1, 1, 1, 1, 1]
> + - [2, 2, 2, 2, 2, 2]
> + - [3, 5, null, null, null]
> + - [4, 5, 5, 5, null]
> + - [5, 5, 5]
> + - [6, 6]
> + - [7]
> + - [8, 1, 1, 1, 1, null]
> + - [9, 1, 1, 1, null]
> + - [10, 6, null]
> + - [11, 4, 4, 4]
> + - [12, 4, null, 4]
> + - [13, 3, 3, 3, 3]
> + - [14, null, 3, null, 3]
> +...
> +sk:select{}
> +---
> +- - [7]
> + - [14, null, 3, null, 3]
> + - [9, 1, 1, 1, null]
> + - [8, 1, 1, 1, 1, null]
> + - [1, 1, 1, 1, 1, 1, 1]
> + - [2, 2, 2, 2, 2, 2]
> + - [13, 3, 3, 3, 3]
> + - [12, 4, null, 4]
> + - [11, 4, 4, 4]
> + - [3, 5, null, null, null]
> + - [5, 5, 5]
> + - [4, 5, 5, 5, null]
> + - [6, 6]
> + - [10, 6, null]
> +...
> +sk:select{5, 5, box.NULL}
> +---
> +- - [5, 5, 5]
> + - [4, 5, 5, 5, null]
> +...
> +sk:select{5, 5, box.NULL, 100}
> +---
> +- []
> +...
> +sk:select({7, box.NULL}, {iterator = 'LT'})
> +---
> +- - [10, 6, null]
> + - [6, 6]
> + - [4, 5, 5, 5, null]
> + - [5, 5, 5]
> + - [3, 5, null, null, null]
> + - [11, 4, 4, 4]
> + - [12, 4, null, 4]
> + - [13, 3, 3, 3, 3]
> + - [2, 2, 2, 2, 2, 2]
> + - [1, 1, 1, 1, 1, 1, 1]
> + - [8, 1, 1, 1, 1, null]
> + - [9, 1, 1, 1, null]
> + - [14, null, 3, null, 3]
> + - [7]
> +...
> +box.snapshot()
> +---
> +- ok
> +...
> +sk:select{}
> +---
> +- - [7]
> + - [14, null, 3, null, 3]
> + - [9, 1, 1, 1, null]
> + - [8, 1, 1, 1, 1, null]
> + - [1, 1, 1, 1, 1, 1, 1]
> + - [2, 2, 2, 2, 2, 2]
> + - [13, 3, 3, 3, 3]
> + - [12, 4, null, 4]
> + - [11, 4, 4, 4]
> + - [3, 5, null, null, null]
> + - [5, 5, 5]
> + - [4, 5, 5, 5, null]
> + - [6, 6]
> + - [10, 6, null]
> +...
> +sk:select{5, 5, box.NULL}
> +---
> +- - [5, 5, 5]
> + - [4, 5, 5, 5, null]
> +...
> +sk:select{5, 5, box.NULL, 100}
> +---
> +- []
> +...
> +sk:select({7, box.NULL}, {iterator = 'LT'})
> +---
> +- - [10, 6, null]
> + - [6, 6]
> + - [4, 5, 5, 5, null]
> + - [5, 5, 5]
> + - [3, 5, null, null, null]
> + - [11, 4, 4, 4]
> + - [12, 4, null, 4]
> + - [13, 3, 3, 3, 3]
> + - [2, 2, 2, 2, 2, 2]
> + - [1, 1, 1, 1, 1, 1, 1]
> + - [8, 1, 1, 1, 1, null]
> + - [9, 1, 1, 1, null]
> + - [14, null, 3, null, 3]
> + - [7]
> +...
> +s:drop()
> +---
> +...
> +--
> +-- The main case of absent nullable fields - create an index over
> +-- them on a non-empty space (available on memtx only).
> +--
> +s = box.schema.space.create('test', {engine = 'memtx'})
> +---
> +...
> +pk = s:create_index('pk')
> +---
> +...
> +s:replace{1}
> +---
> +- [1]
> +...
> +s:replace{2}
> +---
> +- [2]
> +...
> +s:replace{3}
> +---
> +- [3]
> +...
> +sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
> +---
> +...
> +s:replace{4}
> +---
> +- [4]
> +...
> +s:replace{5, 6}
> +---
> +- [5, 6]
> +...
> +s:replace{7, 8}
> +---
> +- [7, 8]
> +...
> +s:replace{9, box.NULL}
> +---
> +- [9, null]
> +...
> +s:select{}
> +---
> +- - [1]
> + - [2]
> + - [3]
> + - [4]
> + - [5, 6]
> + - [7, 8]
> + - [9, null]
> +...
> +sk:select{}
> +---
> +- - [1]
> + - [2]
> + - [3]
> + - [4]
> + - [9, null]
> + - [5, 6]
> + - [7, 8]
> +...
> +sk:select{box.NULL}
> +---
> +- - [1]
> + - [2]
> + - [3]
> + - [4]
> + - [9, null]
> +...
> +s:drop()
> +---
> +...
> diff --git a/test/engine/null.test.lua b/test/engine/null.test.lua
> index 19a70d68a..ba329737d 100644
> --- a/test/engine/null.test.lua
> +++ b/test/engine/null.test.lua
> @@ -242,3 +242,133 @@ s.index.i4:select()
> s.index.i5:select()
>
> s:drop()
> +
> +--
> +-- gh-2988: allow absence of tail nullable indexed fields.
> +--
> +s = box.schema.space.create('test', {engine = engine})
> +pk = s:create_index('pk')
> +sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
> +
> +-- Test tuple_compare_slowpath, tuple_compare_with_key_slowpath.
> +
> +s:replace{} -- Fail
> +-- Compare full vs not full.
> +s:replace{2}
> +s:replace{1, 2}
> +s:select{}
> +sk:select{box.NULL}
> +sk:select{2}
> +-- Compare not full vs full.
> +s:replace{4, 5}
> +s:replace{3}
> +s:select{}
> +sk:select{box.NULL}
> +sk:select{5}
> +-- Compare extended keys.
> +s:replace{7}
> +s:replace{6}
> +s:select{}
> +sk:select{box.NULL}
> +sk:select{}
> +-- Test tuple extract key during dump for vinyl.
> +box.snapshot()
> +sk:select{}
> +s:select{}
> +
> +-- Test tuple_compare_sequential_nullable,
> +-- tuple_compare_with_key_sequential.
> +s:drop()
> +s = box.schema.space.create('test', {engine = engine})
> +pk = s:create_index('pk')
> +parts = {}
> +parts[1] = {1, 'unsigned'}
> +parts[2] = {2, 'unsigned', is_nullable = true}
> +parts[3] = {3, 'unsigned', is_nullable = true}
> +sk = s:create_index('sk', {parts = parts})
> +-- Compare full vs not full.
> +s:replace{1, 2, 3}
> +s:replace{3}
> +s:replace{2, 3}
> +sk:select{}
> +sk:select{3, box.NULL}
> +sk:select{3, box.NULL, box.NULL}
> +sk:select{2}
> +sk:select{2, 3}
> +sk:select{3, 100}
> +sk:select{3, box.NULL, 100}
> +sk:select({3, box.NULL}, {iterator = 'GE'})
> +sk:select({3, box.NULL}, {iterator = 'LE'})
> +s:select{}
> +-- Test tuple extract key for vinyl.
> +box.snapshot()
> +sk:select{}
> +sk:select{3, box.NULL}
> +sk:select{3, box.NULL, box.NULL}
> +sk:select{2}
> +sk:select{2, 3}
> +sk:select{3, 100}
> +sk:select{3, box.NULL, 100}
> +sk:select({3, box.NULL}, {iterator = 'GE'})
> +sk:select({3, box.NULL}, {iterator = 'LE'})
> +
> +--
> +-- Partially sequential keys. See tuple_extract_key.cc and
> +-- contains_sequential_parts template flag.
> +--
> +s:drop()
> +s = box.schema.space.create('test', {engine = engine})
> +pk = s:create_index('pk')
> +parts = {}
> +parts[1] = {2, 'unsigned', is_nullable = true}
> +parts[2] = {3, 'unsigned', is_nullable = true}
> +parts[3] = {5, 'unsigned', is_nullable = true}
> +parts[4] = {6, 'unsigned', is_nullable = true}
> +parts[5] = {4, 'unsigned', is_nullable = true}
> +parts[6] = {7, 'unsigned', is_nullable = true}
> +sk = s:create_index('sk', {parts = parts})
> +s:insert{1, 1, 1, 1, 1, 1, 1}
> +s:insert{8, 1, 1, 1, 1, box.NULL}
> +s:insert{9, 1, 1, 1, box.NULL}
> +s:insert{6, 6}
> +s:insert{10, 6, box.NULL}
> +s:insert{2, 2, 2, 2, 2, 2}
> +s:insert{7}
> +s:insert{5, 5, 5}
> +s:insert{3, 5, box.NULL, box.NULL, box.NULL}
> +s:insert{4, 5, 5, 5, box.NULL}
> +s:insert{11, 4, 4, 4}
> +s:insert{12, 4, box.NULL, 4}
> +s:insert{13, 3, 3, 3, 3}
> +s:insert{14, box.NULL, 3, box.NULL, 3}
> +s:select{}
> +sk:select{}
> +sk:select{5, 5, box.NULL}
> +sk:select{5, 5, box.NULL, 100}
> +sk:select({7, box.NULL}, {iterator = 'LT'})
> +box.snapshot()
> +sk:select{}
> +sk:select{5, 5, box.NULL}
> +sk:select{5, 5, box.NULL, 100}
> +sk:select({7, box.NULL}, {iterator = 'LT'})
> +
> +s:drop()
> +
> +--
> +-- The main case of absent nullable fields - create an index over
> +-- them on a non-empty space (available on memtx only).
> +--
> +s = box.schema.space.create('test', {engine = 'memtx'})
> +pk = s:create_index('pk')
> +s:replace{1}
> +s:replace{2}
> +s:replace{3}
> +sk = s:create_index('sk', {parts = {{2, 'unsigned', is_nullable = true}}})
> +s:replace{4}
> +s:replace{5, 6}
> +s:replace{7, 8}
> +s:replace{9, box.NULL}
> +s:select{}
> +sk:select{}
> +sk:select{box.NULL}
> +s:drop()
> --
> 2.11.0 (Apple Git-81)
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.org - www.twitter.com/kostja_osipov
More information about the Tarantool-patches
mailing list