[Tarantool-patches] [PATCH 4/4] box: introduce 1 byte field map offsets

Ilya Kosarev i.kosarev at tarantool.org
Tue Jan 19 02:50:33 MSK 2021


Now tiny tuples use 1-byte offsets instead of 4-byte offsets. This
allows saving even more memory for tiny tuples.

Closes #5385
---
 src/box/field_map.c          | 14 +++---
 src/box/field_map.h          | 19 +++++---
 src/box/lua/tuple.c          |  3 +-
 src/box/memtx_engine.c       |  7 +--
 src/box/sql.c                |  5 ++-
 src/box/tuple.c              | 27 +++++++-----
 src/box/tuple.h              | 39 +++++++++--------
 src/box/tuple_compare.cc     | 84 +++++++++++++++++++++++-------------
 src/box/tuple_extract_key.cc | 25 +++++++----
 src/box/tuple_format.c       | 13 +++---
 src/box/tuple_format.h       |  3 +-
 src/box/tuple_hash.cc        | 15 ++++---
 src/box/vy_stmt.c            | 18 +++++---
 13 files changed, 167 insertions(+), 105 deletions(-)

diff --git a/src/box/field_map.c b/src/box/field_map.c
index dc903115e..6f8b99ddf 100644
--- a/src/box/field_map.c
+++ b/src/box/field_map.c
@@ -80,7 +80,7 @@ field_map_builder_slot_extent_new(struct field_map_builder *builder,
 }
 
 void
-field_map_build(struct field_map_builder *builder, char *buffer)
+field_map_build(struct field_map_builder *builder, char *buffer, bool is_tiny)
 {
 	/*
 	 * To initialize the field map and its extents, prepare
@@ -97,8 +97,8 @@ field_map_build(struct field_map_builder *builder, char *buffer)
 	 * The buffer size is assumed to be sufficient to write
 	 * field_map_build_size(builder) bytes there.
 	 */
-	uint32_t *field_map =
-		(uint32_t *)(buffer + field_map_build_size(builder));
+	uint8_t *field_map =
+		(uint8_t *)(buffer + field_map_build_size(builder, is_tiny));
 	char *extent_wptr = buffer;
 	for (int32_t i = -1; i >= -(int32_t)builder->slot_count; i--) {
 		/*
@@ -108,13 +108,17 @@ field_map_build(struct field_map_builder *builder, char *buffer)
 		 * explicitly.
 		 */
 		if (!builder->slots[i].has_extent) {
-			store_u32(&field_map[i], builder->slots[i].offset);
+			is_tiny ? store_u8(&field_map[i],
+					   builder->slots[i].offset) :
+				  store_u32(&field_map[i * sizeof(uint32_t)],
+					    builder->slots[i].offset);
 			continue;
 		}
 		struct field_map_builder_slot_extent *extent =
 						builder->slots[i].extent;
 		/** Retrive memory for the extent. */
-		store_u32(&field_map[i], extent_wptr - (char *)field_map);
+		store_u32(&field_map[i * sizeof(uint32_t)],
+			  extent_wptr - (char *)field_map);
 		store_u32(extent_wptr, extent->size);
 		uint32_t extent_offset_sz = extent->size * sizeof(uint32_t);
 		memcpy(&((uint32_t *) extent_wptr)[1], extent->offset,
diff --git a/src/box/field_map.h b/src/box/field_map.h
index d8ef726a1..427f14701 100644
--- a/src/box/field_map.h
+++ b/src/box/field_map.h
@@ -149,15 +149,17 @@ struct field_map_builder_slot {
  * When a field is not in the data tuple, its offset is 0.
  */
 static inline uint32_t
-field_map_get_offset(const uint32_t *field_map, int32_t offset_slot,
-		     int multikey_idx)
+field_map_get_offset(const uint8_t *field_map, int32_t offset_slot,
+		     int multikey_idx, bool is_tiny)
 {
 	/*
 	 * Can not access field_map as a normal uint32 array
 	 * because its alignment may be < 4 bytes. Need to use
 	 * unaligned store-load operations explicitly.
 	 */
-	uint32_t offset = load_u32(&field_map[offset_slot]);
+	uint32_t offset = is_tiny ? load_u8(&field_map[offset_slot]) :
+				    load_u32(&field_map[offset_slot *
+							sizeof(uint32_t)]);
 	if (multikey_idx != MULTIKEY_NONE && (int32_t)offset < 0) {
 		/**
 		 * The field_map extent has the following
@@ -213,16 +215,18 @@ static inline int
 field_map_builder_set_slot(struct field_map_builder *builder,
 			   int32_t offset_slot, uint32_t offset,
 			   int32_t multikey_idx, uint32_t multikey_count,
-			   struct region *region)
+			   struct region *region, bool *is_tiny)
 {
 	assert(offset_slot < 0);
 	assert((uint32_t)-offset_slot <= builder->slot_count);
 	assert(offset > 0);
 	if (multikey_idx == MULTIKEY_NONE) {
 		builder->slots[offset_slot].offset = offset;
+		*is_tiny = ((*is_tiny) && (offset <= UINT8_MAX));
 	} else {
 		assert(multikey_idx >= 0);
 		assert(multikey_idx < (int32_t)multikey_count);
+		*is_tiny = false;
 		struct field_map_builder_slot_extent *extent;
 		if (builder->slots[offset_slot].has_extent) {
 			extent = builder->slots[offset_slot].extent;
@@ -243,9 +247,10 @@ field_map_builder_set_slot(struct field_map_builder *builder,
  * Calculate the size of tuple field_map to be built.
  */
 static inline uint32_t
-field_map_build_size(struct field_map_builder *builder)
+field_map_build_size(struct field_map_builder *builder, bool is_tiny)
 {
-	return builder->slot_count * sizeof(uint32_t) +
+	return builder->slot_count * is_tiny * sizeof(uint8_t) +
+	       builder->slot_count * !is_tiny * sizeof(uint32_t) +
 	       builder->extents_size;
 }
 
@@ -255,6 +260,6 @@ field_map_build_size(struct field_map_builder *builder)
  * The buffer must have at least field_map_build_size(builder) bytes.
  */
 void
-field_map_build(struct field_map_builder *builder, char *buffer);
+field_map_build(struct field_map_builder *builder, char *buffer, bool is_tiny);
 
 #endif /* TARANTOOL_BOX_FIELD_MAP_H_INCLUDED */
diff --git a/src/box/lua/tuple.c b/src/box/lua/tuple.c
index 3e6f043b4..0e54cf379 100644
--- a/src/box/lua/tuple.c
+++ b/src/box/lua/tuple.c
@@ -641,7 +641,8 @@ lbox_tuple_field_by_path(struct lua_State *L)
 					     tuple_data(tuple),
 					     tuple_field_map(tuple),
 					     path, (uint32_t)len,
-					     lua_hashstring(L, 2));
+					     lua_hashstring(L, 2),
+					     tuple_is_tiny(tuple));
 	if (field == NULL)
 		return 0;
 	luamp_decode(L, luaL_msgpack_default, &field);
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index adb90e1c8..0871e17fc 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -1222,9 +1222,9 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
 	size_t tuple_len = end - data;
 	bool is_tiny = (tuple_len <= UINT8_MAX);
 	struct field_map_builder builder;
-	if (tuple_field_map_create(format, data, true, &builder) != 0)
+	if (tuple_field_map_create(format, data, true, &builder, &is_tiny) != 0)
 		goto end;
-	uint32_t field_map_size = field_map_build_size(&builder);
+	uint32_t field_map_size = field_map_build_size(&builder, is_tiny);
 	/*
 	 * Data offset is calculated from the begin of the struct
 	 * tuple base, not from memtx_tuple, because the struct
@@ -1232,6 +1232,7 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
 	 */
 	is_tiny = (is_tiny && (sizeof(struct tuple) +
 			       field_map_size <= MAX_TINY_DATA_OFFSET));
+	field_map_size = field_map_build_size(&builder, is_tiny);
 	uint32_t extra_size = field_map_size +
 			      !is_tiny * sizeof(struct tuple_extra);
 	uint32_t data_offset = sizeof(struct tuple) + extra_size;
@@ -1276,7 +1277,7 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
 	tuple_set_data_offset(tuple, data_offset);
 	tuple_set_dirty_bit(tuple, false);
 	char *raw = (char *) tuple + data_offset;
-	field_map_build(&builder, raw - field_map_size);
+	field_map_build(&builder, raw - field_map_size, is_tiny);
 	memcpy(raw, data, tuple_len);
 	say_debug("%s(%zu) = %p", __func__, tuple_len, memtx_tuple);
 end:
diff --git a/src/box/sql.c b/src/box/sql.c
index 59e1e88fc..3702f4f19 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -727,7 +727,7 @@ tarantoolsqlIdxKeyCompare(struct BtCursor *cursor,
 	struct tuple *tuple;
 	const char *base;
 	struct tuple_format *format;
-	const uint32_t *field_map;
+	const uint8_t *field_map;
 	uint32_t field_count, next_fieldno = 0;
 	const char *p, *field0;
 	u32 i, n;
@@ -776,7 +776,8 @@ tarantoolsqlIdxKeyCompare(struct BtCursor *cursor,
 				uint32_t field_offset =
 					field_map_get_offset(field_map,
 							     field->offset_slot,
-							     MULTIKEY_NONE);
+							     MULTIKEY_NONE,
+							     tuple_is_tiny(tuple));
 				p = base + field_offset;
 			}
 		}
diff --git a/src/box/tuple.c b/src/box/tuple.c
index db95d5872..e9e9b7af2 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -82,11 +82,12 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end
 	size_t data_len = end - data;
 	bool is_tiny = (data_len <= UINT8_MAX);
 	struct field_map_builder builder;
-	if (tuple_field_map_create(format, data, true, &builder) != 0)
+	if (tuple_field_map_create(format, data, true, &builder, &is_tiny) != 0)
 		goto end;
-	uint32_t field_map_size = field_map_build_size(&builder);
+	uint32_t field_map_size = field_map_build_size(&builder, is_tiny);
 	is_tiny = (is_tiny && (sizeof(struct tuple) +
 			       field_map_size <= MAX_TINY_DATA_OFFSET));
+	field_map_size = field_map_build_size(&builder, is_tiny);
 	uint32_t data_offset = sizeof(struct tuple) + field_map_size +
 			       !is_tiny * sizeof(uint32_t);
 	assert(!is_tiny || data_offset <= MAX_TINY_DATA_OFFSET);
@@ -113,7 +114,7 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end
 	tuple_set_data_offset(tuple, data_offset);
 	tuple_set_dirty_bit(tuple, false);
 	char *raw = (char *) tuple + data_offset;
-	field_map_build(&builder, raw - field_map_size);
+	field_map_build(&builder, raw - field_map_size, is_tiny);
 	memcpy(raw, data, data_len);
 	say_debug("%s(%zu) = %p", __func__, data_len, tuple);
 end:
@@ -141,7 +142,9 @@ tuple_validate_raw(struct tuple_format *format, const char *tuple)
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
 	struct field_map_builder builder;
-	int rc = tuple_field_map_create(format, tuple, true, &builder);
+	bool is_tiny = false;
+	int rc = tuple_field_map_create(format, tuple, true, &builder,
+					&is_tiny);
 	region_truncate(region, region_svp);
 	return rc;
 }
@@ -494,8 +497,9 @@ tuple_go_to_path(const char **data, const char *path, uint32_t path_len,
 
 const char *
 tuple_field_raw_by_full_path(struct tuple_format *format, const char *tuple,
-			     const uint32_t *field_map, const char *path,
-			     uint32_t path_len, uint32_t path_hash)
+			     const uint8_t *field_map, const char *path,
+			     uint32_t path_len, uint32_t path_hash,
+			     bool is_tiny)
 {
 	assert(path_len > 0);
 	uint32_t fieldno;
@@ -507,7 +511,8 @@ tuple_field_raw_by_full_path(struct tuple_format *format, const char *tuple,
 	 */
 	if (tuple_fieldno_by_name(format->dict, path, path_len, path_hash,
 				  &fieldno) == 0)
-		return tuple_field_raw(format, tuple, field_map, fieldno);
+		return tuple_field_raw(format, tuple, field_map, fieldno,
+				       is_tiny);
 	struct json_lexer lexer;
 	struct json_token token;
 	json_lexer_create(&lexer, path, path_len, TUPLE_INDEX_BASE);
@@ -545,13 +550,13 @@ tuple_field_raw_by_full_path(struct tuple_format *format, const char *tuple,
 	return tuple_field_raw_by_path(format, tuple, field_map, fieldno,
 				       path + lexer.offset,
 				       path_len - lexer.offset,
-				       NULL, MULTIKEY_NONE);
+				       NULL, MULTIKEY_NONE, is_tiny);
 }
 
 uint32_t
 tuple_raw_multikey_count(struct tuple_format *format, const char *data,
-			       const uint32_t *field_map,
-			       struct key_def *key_def)
+			 const uint8_t *field_map,
+			 struct key_def *key_def, bool is_tiny)
 {
 	assert(key_def->is_multikey);
 	const char *array_raw =
@@ -559,7 +564,7 @@ tuple_raw_multikey_count(struct tuple_format *format, const char *data,
 					key_def->multikey_fieldno,
 					key_def->multikey_path,
 					key_def->multikey_path_len,
-					NULL, MULTIKEY_NONE);
+					NULL, MULTIKEY_NONE, is_tiny);
 	if (array_raw == NULL)
 		return 0;
 	enum mp_type type = mp_typeof(*array_raw);
diff --git a/src/box/tuple.h b/src/box/tuple.h
index 9eac52f01..685524672 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -723,11 +723,10 @@ tuple_validate(struct tuple_format *format, struct tuple *tuple)
  * @returns a field map for the tuple.
  * @sa tuple_field_map_create()
  */
-static inline const uint32_t *
+static inline const uint8_t *
 tuple_field_map(struct tuple *tuple)
 {
-	return (const uint32_t *) ((const char *) tuple +
-				   tuple_data_offset(tuple));
+	return (uint8_t *)tuple + tuple_data_offset(tuple);
 }
 
 /**
@@ -804,9 +803,10 @@ tuple_field_go_to_key(const char **field, const char *key, int len);
  */
 static inline const char *
 tuple_field_raw_by_path(struct tuple_format *format, const char *tuple,
-			const uint32_t *field_map, uint32_t fieldno,
+			const uint8_t *field_map, uint32_t fieldno,
 			const char *path, uint32_t path_len,
-			int32_t *offset_slot_hint, int multikey_idx)
+			int32_t *offset_slot_hint, int multikey_idx,
+			bool is_tiny)
 {
 	int32_t offset_slot;
 	if (offset_slot_hint != NULL &&
@@ -853,7 +853,7 @@ tuple_field_raw_by_path(struct tuple_format *format, const char *tuple,
 offset_slot_access:
 		/* Indexed field */
 		offset = field_map_get_offset(field_map, offset_slot,
-					      multikey_idx);
+					      multikey_idx, is_tiny);
 		if (offset == 0)
 			return NULL;
 		tuple += offset;
@@ -887,7 +887,7 @@ parse:
  */
 static inline const char *
 tuple_field_raw(struct tuple_format *format, const char *tuple,
-		const uint32_t *field_map, uint32_t field_no)
+		const uint8_t *field_map, uint32_t field_no, bool is_tiny)
 {
 	if (likely(field_no < format->index_field_count)) {
 		int32_t offset_slot;
@@ -903,7 +903,7 @@ tuple_field_raw(struct tuple_format *format, const char *tuple,
 		if (offset_slot == TUPLE_OFFSET_SLOT_NIL)
 			goto parse;
 		offset = field_map_get_offset(field_map, offset_slot,
-					      MULTIKEY_NONE);
+					      MULTIKEY_NONE, is_tiny);
 		if (offset == 0)
 			return NULL;
 		tuple += offset;
@@ -931,7 +931,8 @@ static inline const char *
 tuple_field(struct tuple *tuple, uint32_t fieldno)
 {
 	return tuple_field_raw(tuple_format(tuple), tuple_data(tuple),
-			       tuple_field_map(tuple), fieldno);
+			       tuple_field_map(tuple), fieldno,
+			       tuple_is_tiny(tuple));
 }
 
 /**
@@ -951,8 +952,9 @@ tuple_field(struct tuple *tuple, uint32_t fieldno)
  */
 const char *
 tuple_field_raw_by_full_path(struct tuple_format *format, const char *tuple,
-			     const uint32_t *field_map, const char *path,
-			     uint32_t path_len, uint32_t path_hash);
+			     const uint8_t *field_map, const char *path,
+			     uint32_t path_len, uint32_t path_hash,
+			     bool is_tiny);
 
 /**
  * Get a tuple field pointed to by an index part and multikey
@@ -966,8 +968,8 @@ tuple_field_raw_by_full_path(struct tuple_format *format, const char *tuple,
  */
 static inline const char *
 tuple_field_raw_by_part(struct tuple_format *format, const char *data,
-			const uint32_t *field_map,
-			struct key_part *part, int multikey_idx)
+			const uint8_t *field_map,
+			struct key_part *part, int multikey_idx, bool is_tiny)
 {
 	if (unlikely(part->format_epoch != format->epoch)) {
 		assert(format->epoch != 0);
@@ -980,7 +982,8 @@ tuple_field_raw_by_part(struct tuple_format *format, const char *data,
 	}
 	return tuple_field_raw_by_path(format, data, field_map, part->fieldno,
 				       part->path, part->path_len,
-				       &part->offset_slot_cache, multikey_idx);
+				       &part->offset_slot_cache, multikey_idx,
+				       is_tiny);
 }
 
 /**
@@ -996,7 +999,7 @@ tuple_field_by_part(struct tuple *tuple, struct key_part *part,
 {
 	return tuple_field_raw_by_part(tuple_format(tuple), tuple_data(tuple),
 				       tuple_field_map(tuple), part,
-				       multikey_idx);
+				       multikey_idx, tuple_is_tiny(tuple));
 }
 
 /**
@@ -1010,7 +1013,8 @@ tuple_field_by_part(struct tuple *tuple, struct key_part *part,
  */
 uint32_t
 tuple_raw_multikey_count(struct tuple_format *format, const char *data,
-			 const uint32_t *field_map, struct key_def *key_def);
+			 const uint8_t *field_map, struct key_def *key_def,
+			 bool is_tiny);
 
 /**
  * Get count of multikey index keys in tuple by given multikey
@@ -1023,7 +1027,8 @@ static inline uint32_t
 tuple_multikey_count(struct tuple *tuple, struct key_def *key_def)
 {
 	return tuple_raw_multikey_count(tuple_format(tuple), tuple_data(tuple),
-					tuple_field_map(tuple), key_def);
+					tuple_field_map(tuple), key_def,
+					tuple_is_tiny(tuple));
 }
 
 /**
diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index eb148c2f5..d9b286738 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -571,8 +571,8 @@ tuple_compare_slowpath(struct tuple *tuple_a, hint_t tuple_a_hint,
 	bool was_null_met = false;
 	struct tuple_format *format_a = tuple_format(tuple_a);
 	struct tuple_format *format_b = tuple_format(tuple_b);
-	const uint32_t *field_map_a = tuple_field_map(tuple_a);
-	const uint32_t *field_map_b = tuple_field_map(tuple_b);
+	const uint8_t *field_map_a = tuple_field_map(tuple_a);
+	const uint8_t *field_map_b = tuple_field_map(tuple_b);
 	struct key_part *end;
 	const char *field_a, *field_b;
 	enum mp_type a_type, b_type;
@@ -585,22 +585,28 @@ tuple_compare_slowpath(struct tuple *tuple_a, hint_t tuple_a_hint,
 		if (is_multikey) {
 			field_a = tuple_field_raw_by_part(format_a, tuple_a_raw,
 							  field_map_a, part,
-							  (int)tuple_a_hint);
+							  (int)tuple_a_hint,
+							  tuple_is_tiny(tuple_a));
 			field_b = tuple_field_raw_by_part(format_b, tuple_b_raw,
 							  field_map_b, part,
-							  (int)tuple_b_hint);
+							  (int)tuple_b_hint,
+							  tuple_is_tiny(tuple_b));
 		} else if (has_json_paths) {
 			field_a = tuple_field_raw_by_part(format_a, tuple_a_raw,
 							  field_map_a, part,
-							  MULTIKEY_NONE);
+							  MULTIKEY_NONE,
+							  tuple_is_tiny(tuple_a));
 			field_b = tuple_field_raw_by_part(format_b, tuple_b_raw,
 							  field_map_b, part,
-							  MULTIKEY_NONE);
+							  MULTIKEY_NONE,
+							  tuple_is_tiny(tuple_b));
 		} else {
 			field_a = tuple_field_raw(format_a, tuple_a_raw,
-						  field_map_a, part->fieldno);
+						  field_map_a, part->fieldno,
+						  tuple_is_tiny(tuple_a));
 			field_b = tuple_field_raw(format_b, tuple_b_raw,
-						  field_map_b, part->fieldno);
+						  field_map_b, part->fieldno,
+						  tuple_is_tiny(tuple_b));
 		}
 		assert(has_optional_parts ||
 		       (field_a != NULL && field_b != NULL));
@@ -651,22 +657,28 @@ tuple_compare_slowpath(struct tuple *tuple_a, hint_t tuple_a_hint,
 		if (is_multikey) {
 			field_a = tuple_field_raw_by_part(format_a, tuple_a_raw,
 							  field_map_a, part,
-							  (int)tuple_a_hint);
+							  (int)tuple_a_hint,
+							  tuple_is_tiny(tuple_a));
 			field_b = tuple_field_raw_by_part(format_b, tuple_b_raw,
 							  field_map_b, part,
-							  (int)tuple_b_hint);
+							  (int)tuple_b_hint,
+							  tuple_is_tiny(tuple_b));
 		} else if (has_json_paths) {
 			field_a = tuple_field_raw_by_part(format_a, tuple_a_raw,
 							  field_map_a, part,
-							  MULTIKEY_NONE);
+							  MULTIKEY_NONE,
+							  tuple_is_tiny(tuple_a));
 			field_b = tuple_field_raw_by_part(format_b, tuple_b_raw,
 							  field_map_b, part,
-							  MULTIKEY_NONE);
+							  MULTIKEY_NONE,
+							  tuple_is_tiny(tuple_b));
 		} else {
 			field_a = tuple_field_raw(format_a, tuple_a_raw,
-						  field_map_a, part->fieldno);
+						  field_map_a, part->fieldno,
+						  tuple_is_tiny(tuple_a));
 			field_b = tuple_field_raw(format_b, tuple_b_raw,
-						  field_map_b, part->fieldno);
+						  field_map_b, part->fieldno,
+						  tuple_is_tiny(tuple_b));
 		}
 		/*
 		 * Extended parts are primary, and they can not
@@ -703,21 +715,24 @@ tuple_compare_with_key_slowpath(struct tuple *tuple, hint_t tuple_hint,
 	struct key_part *part = key_def->parts;
 	struct tuple_format *format = tuple_format(tuple);
 	const char *tuple_raw = tuple_data(tuple);
-	const uint32_t *field_map = tuple_field_map(tuple);
+	const uint8_t *field_map = tuple_field_map(tuple);
 	enum mp_type a_type, b_type;
 	if (likely(part_count == 1)) {
 		const char *field;
 		if (is_multikey) {
 			field = tuple_field_raw_by_part(format, tuple_raw,
 							field_map, part,
-							(int)tuple_hint);
+							(int)tuple_hint,
+							tuple_is_tiny(tuple));
 		} else if (has_json_paths) {
 			field = tuple_field_raw_by_part(format, tuple_raw,
 							field_map, part,
-							MULTIKEY_NONE);
+							MULTIKEY_NONE,
+							tuple_is_tiny(tuple));
 		} else {
 			field = tuple_field_raw(format, tuple_raw, field_map,
-						part->fieldno);
+						part->fieldno,
+						tuple_is_tiny(tuple));
 		}
 		if (! is_nullable) {
 			return tuple_compare_field(field, key, part->type,
@@ -745,14 +760,16 @@ tuple_compare_with_key_slowpath(struct tuple *tuple, hint_t tuple_hint,
 		if (is_multikey) {
 			field = tuple_field_raw_by_part(format, tuple_raw,
 							field_map, part,
-							(int)tuple_hint);
+							(int)tuple_hint,
+							tuple_is_tiny(tuple));
 		} else if (has_json_paths) {
 			field = tuple_field_raw_by_part(format, tuple_raw,
 							field_map, part,
-							MULTIKEY_NONE);
+							MULTIKEY_NONE,
+							tuple_is_tiny(tuple));
 		} else {
 			field = tuple_field_raw(format, tuple_raw, field_map,
-						part->fieldno);
+						part->fieldno, tuple_is_tiny(tuple));
 		}
 		if (! is_nullable) {
 			rc = tuple_compare_field(field, key, part->type,
@@ -1062,10 +1079,10 @@ struct FieldCompare<IDX, TYPE, IDX2, TYPE2, MORE_TYPES...>
 				return r;
 			field_a = tuple_field_raw(format_a, tuple_data(tuple_a),
 						  tuple_field_map(tuple_a),
-						  IDX2);
+						  IDX2, tuple_is_tiny(tuple_a));
 			field_b = tuple_field_raw(format_b, tuple_data(tuple_b),
 						  tuple_field_map(tuple_b),
-						  IDX2);
+						  IDX2, tuple_is_tiny(tuple_b));
 		}
 		return FieldCompare<IDX2, TYPE2, MORE_TYPES...>::
 			compare(tuple_a, tuple_b, format_a,
@@ -1104,9 +1121,11 @@ struct TupleCompare
 		struct tuple_format *format_b = tuple_format(tuple_b);
 		const char *field_a, *field_b;
 		field_a = tuple_field_raw(format_a, tuple_data(tuple_a),
-					  tuple_field_map(tuple_a), IDX);
+					  tuple_field_map(tuple_a), IDX,
+					  tuple_is_tiny(tuple_a));
 		field_b = tuple_field_raw(format_b, tuple_data(tuple_b),
-					  tuple_field_map(tuple_b), IDX);
+					  tuple_field_map(tuple_b), IDX,
+					  tuple_is_tiny(tuple_b));
 		return FieldCompare<IDX, TYPE, MORE_TYPES...>::
 			compare(tuple_a, tuple_b, format_a,
 				format_b, field_a, field_b);
@@ -1248,7 +1267,8 @@ struct FieldCompareWithKey<FLD_ID, IDX, TYPE, IDX2, TYPE2, MORE_TYPES...>
 			if (r || part_count == FLD_ID + 1)
 				return r;
 			field = tuple_field_raw(format, tuple_data(tuple),
-						tuple_field_map(tuple), IDX2);
+						tuple_field_map(tuple), IDX2,
+						tuple_is_tiny(tuple));
 			mp_next(&key);
 		}
 		return FieldCompareWithKey<FLD_ID + 1, IDX2, TYPE2, MORE_TYPES...>::
@@ -1290,7 +1310,7 @@ struct TupleCompareWithKey
 		struct tuple_format *format = tuple_format(tuple);
 		const char *field = tuple_field_raw(format, tuple_data(tuple),
 						    tuple_field_map(tuple),
-						    IDX);
+						    IDX, tuple_is_tiny(tuple));
 		return FieldCompareWithKey<FLD_ID, IDX, TYPE, MORE_TYPES...>::
 				compare(tuple, key, part_count,
 					key_def, format, field);
@@ -1386,17 +1406,19 @@ func_index_compare(struct tuple *tuple_a, hint_t tuple_a_hint,
 	const char *tuple_b_raw = tuple_data(tuple_b);
 	struct tuple_format *format_a = tuple_format(tuple_a);
 	struct tuple_format *format_b = tuple_format(tuple_b);
-	const uint32_t *field_map_a = tuple_field_map(tuple_a);
-	const uint32_t *field_map_b = tuple_field_map(tuple_b);
+	const uint8_t *field_map_a = tuple_field_map(tuple_a);
+	const uint8_t *field_map_b = tuple_field_map(tuple_b);
 	const char *field_a, *field_b;
 	for (uint32_t i = key_part_count; i < cmp_def->part_count; i++) {
 		struct key_part *part = &cmp_def->parts[i];
 		field_a = tuple_field_raw_by_part(format_a, tuple_a_raw,
 						  field_map_a, part,
-						  MULTIKEY_NONE);
+						  MULTIKEY_NONE,
+						  tuple_is_tiny(tuple_a));
 		field_b = tuple_field_raw_by_part(format_b, tuple_b_raw,
 						  field_map_b, part,
-						  MULTIKEY_NONE);
+						  MULTIKEY_NONE,
+						  tuple_is_tiny(tuple_b));
 		assert(field_a != NULL && field_b != NULL);
 		rc = tuple_compare_field(field_a, field_b, part->type,
 					 part->coll);
diff --git a/src/box/tuple_extract_key.cc b/src/box/tuple_extract_key.cc
index 795dc6559..a61306fba 100644
--- a/src/box/tuple_extract_key.cc
+++ b/src/box/tuple_extract_key.cc
@@ -126,7 +126,7 @@ tuple_extract_key_slowpath(struct tuple *tuple, struct key_def *key_def,
 	uint32_t part_count = key_def->part_count;
 	uint32_t bsize = mp_sizeof_array(part_count);
 	struct tuple_format *format = tuple_format(tuple);
-	const uint32_t *field_map = tuple_field_map(tuple);
+	const uint8_t *field_map = tuple_field_map(tuple);
 	const char *tuple_end = data + tuple_bsize(tuple);
 
 	/* Calculate the key size. */
@@ -134,15 +134,18 @@ tuple_extract_key_slowpath(struct tuple *tuple, struct key_def *key_def,
 		const char *field;
 		if (!has_json_paths) {
 			field = tuple_field_raw(format, data, field_map,
-						key_def->parts[i].fieldno);
+						key_def->parts[i].fieldno,
+						tuple_is_tiny(tuple));
 		} else if (!is_multikey) {
 			field = tuple_field_raw_by_part(format, data, field_map,
 							&key_def->parts[i],
-							MULTIKEY_NONE);
+							MULTIKEY_NONE,
+							tuple_is_tiny(tuple));
 		} else {
 			field = tuple_field_raw_by_part(format, data, field_map,
 							&key_def->parts[i],
-							multikey_idx);
+							multikey_idx,
+							tuple_is_tiny(tuple));
 		}
 		if (has_optional_parts && field == NULL) {
 			bsize += mp_sizeof_nil();
@@ -186,15 +189,18 @@ tuple_extract_key_slowpath(struct tuple *tuple, struct key_def *key_def,
 		const char *field;
 		if (!has_json_paths) {
 			field = tuple_field_raw(format, data, field_map,
-						key_def->parts[i].fieldno);
+						key_def->parts[i].fieldno,
+						tuple_is_tiny(tuple));
 		} else if (!is_multikey) {
 			field = tuple_field_raw_by_part(format, data, field_map,
 							&key_def->parts[i],
-							MULTIKEY_NONE);
+							MULTIKEY_NONE,
+							tuple_is_tiny(tuple));
 		} else {
 			field = tuple_field_raw_by_part(format, data, field_map,
 							&key_def->parts[i],
-							multikey_idx);
+							multikey_idx,
+							tuple_is_tiny(tuple));
 		}
 		if (has_optional_parts && field == NULL) {
 			key_buf = mp_encode_nil(key_buf);
@@ -464,12 +470,13 @@ tuple_key_contains_null(struct tuple *tuple, struct key_def *def,
 {
 	struct tuple_format *format = tuple_format(tuple);
 	const char *data = tuple_data(tuple);
-	const uint32_t *field_map = tuple_field_map(tuple);
+	const uint8_t *field_map = tuple_field_map(tuple);
 	for (struct key_part *part = def->parts, *end = part + def->part_count;
 	     part < end; ++part) {
 		const char *field = tuple_field_raw_by_part(format, data,
 							    field_map, part,
-							    multikey_idx);
+							    multikey_idx,
+							    tuple_is_tiny(tuple));
 		if (field == NULL || mp_typeof(*field) == MP_NIL)
 			return true;
 	}
diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
index dca0fbddc..b2c4d1ea6 100644
--- a/src/box/tuple_format.c
+++ b/src/box/tuple_format.c
@@ -859,7 +859,8 @@ tuple_format_required_fields_validate(struct tuple_format *format,
 
 static int
 tuple_field_map_create_plain(struct tuple_format *format, const char *tuple,
-			     bool validate, struct field_map_builder *builder)
+			     bool validate, struct field_map_builder *builder,
+			     bool *is_tiny)
 {
 	struct region *region = &fiber()->gc;
 	const char *pos = tuple;
@@ -906,7 +907,7 @@ tuple_field_map_create_plain(struct tuple_format *format, const char *tuple,
 		if (field->offset_slot != TUPLE_OFFSET_SLOT_NIL &&
 		    field_map_builder_set_slot(builder, field->offset_slot,
 					       pos - tuple, MULTIKEY_NONE,
-					       0, NULL) != 0) {
+					       0, NULL, is_tiny) != 0) {
 			return -1;
 		}
 	}
@@ -922,7 +923,8 @@ end:
 /** @sa declaration for details. */
 int
 tuple_field_map_create(struct tuple_format *format, const char *tuple,
-		       bool validate, struct field_map_builder *builder)
+		       bool validate, struct field_map_builder *builder,
+		       bool *is_tiny)
 {
 	struct region *region = &fiber()->gc;
 	if (field_map_builder_create(builder, format->field_map_size,
@@ -937,7 +939,7 @@ tuple_field_map_create(struct tuple_format *format, const char *tuple,
 	 */
 	if (format->fields_depth == 1) {
 		return tuple_field_map_create_plain(format, tuple, validate,
-						    builder);
+						    builder, is_tiny);
 	}
 
 	uint32_t field_count;
@@ -954,7 +956,8 @@ tuple_field_map_create(struct tuple_format *format, const char *tuple,
 		if (entry.field->offset_slot != TUPLE_OFFSET_SLOT_NIL &&
 		    field_map_builder_set_slot(builder, entry.field->offset_slot,
 					entry.data - tuple, entry.multikey_idx,
-					entry.multikey_count, region) != 0)
+					entry.multikey_count, region,
+					is_tiny) != 0)
 			return -1;
 	}
 	return entry.data == NULL ? 0 : -1;
diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h
index 021072d3d..08ebf0939 100644
--- a/src/box/tuple_format.h
+++ b/src/box/tuple_format.h
@@ -427,7 +427,8 @@ box_tuple_format_unref(box_tuple_format_t *format);
  */
 int
 tuple_field_map_create(struct tuple_format *format, const char *tuple,
-		       bool validate, struct field_map_builder *builder);
+		       bool validate, struct field_map_builder *builder,
+		       bool *is_tiny);
 
 /**
  * Initialize tuple format subsystem.
diff --git a/src/box/tuple_hash.cc b/src/box/tuple_hash.cc
index 39f89a659..235590f30 100644
--- a/src/box/tuple_hash.cc
+++ b/src/box/tuple_hash.cc
@@ -372,14 +372,15 @@ tuple_hash_slowpath(struct tuple *tuple, struct key_def *key_def)
 	uint32_t prev_fieldno = key_def->parts[0].fieldno;
 	struct tuple_format *format = tuple_format(tuple);
 	const char *tuple_raw = tuple_data(tuple);
-	const uint32_t *field_map = tuple_field_map(tuple);
+	const uint8_t *field_map = tuple_field_map(tuple);
 	const char *field;
 	if (has_json_paths) {
 		field = tuple_field_raw_by_part(format, tuple_raw, field_map,
-						key_def->parts, MULTIKEY_NONE);
+						key_def->parts, MULTIKEY_NONE,
+						tuple_is_tiny(tuple));
 	} else {
 		field = tuple_field_raw(format, tuple_raw, field_map,
-					prev_fieldno);
+					prev_fieldno, tuple_is_tiny(tuple));
 	}
 	const char *end = (char *)tuple + tuple_size(tuple);
 	if (has_optional_parts && field == NULL) {
@@ -398,10 +399,12 @@ tuple_hash_slowpath(struct tuple *tuple, struct key_def *key_def)
 			if (has_json_paths) {
 				field = tuple_field_raw_by_part(format, tuple_raw,
 								field_map, part,
-								MULTIKEY_NONE);
+								MULTIKEY_NONE,
+								tuple_is_tiny(tuple));
 			} else {
-				field = tuple_field_raw(format, tuple_raw, field_map,
-						    part->fieldno);
+				field = tuple_field_raw(format, tuple_raw,
+							field_map, part->fieldno,
+							tuple_is_tiny(tuple));
 			}
 		}
 		if (has_optional_parts && (field == NULL || field >= end)) {
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 97bc1e408..6440c7d3f 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -345,9 +345,11 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 	 * with tuple_validate() anyway.
 	 */
 	struct field_map_builder builder;
-	if (tuple_field_map_create(format, tuple_begin, false, &builder) != 0)
+	bool is_tiny = false;
+	if (tuple_field_map_create(format, tuple_begin, false, &builder,
+				   &is_tiny) != 0)
 		goto end;
-	uint32_t field_map_size = field_map_build_size(&builder);
+	uint32_t field_map_size = field_map_build_size(&builder, is_tiny);
 	/*
 	 * Allocate stmt. Offsets: one per key part + offset of the
 	 * statement end.
@@ -361,7 +363,7 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 	/* Copy MsgPack data */
 	char *raw = (char *) tuple_data(stmt);
 	char *wpos = raw;
-	field_map_build(&builder, wpos - field_map_size);
+	field_map_build(&builder, wpos - field_map_size, is_tiny);
 	memcpy(wpos, tuple_begin, mpsize);
 	wpos += mpsize;
 	for (struct iovec *op = ops, *end = ops + op_count;
@@ -484,11 +486,12 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 					    entry.field->token.len);
 		}
 		/* Initialize field_map with data offset. */
+		bool is_tiny = false;
 		uint32_t offset_slot = entry.field->offset_slot;
 		if (offset_slot != TUPLE_OFFSET_SLOT_NIL &&
 		    field_map_builder_set_slot(&builder, offset_slot,
-					pos - data, entry.multikey_idx,
-					entry.multikey_count, region) != 0)
+				pos - data, entry.multikey_idx,
+				entry.multikey_count, region, &is_tiny) != 0)
 			goto out;
 		/* Copy field data. */
 		if (entry.field->type == FIELD_TYPE_ARRAY) {
@@ -503,8 +506,9 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 	if (entry.data != NULL)
 		goto out;
 	assert(pos <= data + src_size);
+	bool is_tiny = false;
 	uint32_t bsize = pos - data;
-	uint32_t field_map_size = field_map_build_size(&builder);
+	uint32_t field_map_size = field_map_build_size(&builder, is_tiny);
 	stmt = vy_stmt_alloc(format, size_of_struct_vy_stmt() + field_map_size,
 			     bsize);
 	if (stmt == NULL)
@@ -512,7 +516,7 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 	char *stmt_data = (char *) tuple_data(stmt);
 	char *stmt_field_map_begin = stmt_data - field_map_size;
 	memcpy(stmt_data, data, bsize);
-	field_map_build(&builder, stmt_field_map_begin);
+	field_map_build(&builder, stmt_field_map_begin, is_tiny);
 	vy_stmt_set_type(stmt, IPROTO_DELETE);
 	mp_tuple_assert(stmt_data, stmt_data + bsize);
 out:
-- 
2.17.1



More information about the Tarantool-patches mailing list