[PATCH 5/7] vinyl: use field_map_builder for constructing stmt field map

Vladimir Davydov vdavydov.dev at gmail.com
Wed May 8 20:22:37 MSK 2019


Currently, we construct the field map for a vinyl surrogate DELETE
statement by hand, which works fine as long as field maps don't have
extents. Once multikey indexes are introduced, field maps will have
extents, hence we must switch to field_map_builder.
---
 src/box/vy_stmt.c | 60 ++++++++++++++++++++++++++++---------------------------
 1 file changed, 31 insertions(+), 29 deletions(-)

diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index ddad26f5..170c258a 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -146,18 +146,19 @@ vy_stmt_format_new(struct vy_stmt_env *env, struct key_def *const *keys,
  * Allocate a vinyl statement object on base of the struct tuple
  * with malloc() and the reference counter equal to 1.
  * @param format Format of an index.
- * @param size   Size of the variable part of the statement. It
+ * @param data_offset Offset of MessagePack data within the tuple.
+ * @param bsize  Size of the variable part of the statement. It
  *               includes size of MessagePack tuple data and, for
  *               upserts, MessagePack array of operations.
  * @retval not NULL Success.
  * @retval     NULL Memory error.
  */
 static struct tuple *
-vy_stmt_alloc(struct tuple_format *format, uint32_t bsize)
+vy_stmt_alloc(struct tuple_format *format, uint32_t data_offset, uint32_t bsize)
 {
+	assert(data_offset >= sizeof(struct vy_stmt) + format->field_map_size);
 	struct vy_stmt_env *env = format->engine;
-	uint32_t total_size = sizeof(struct vy_stmt) + format->field_map_size +
-		bsize;
+	uint32_t total_size = data_offset + bsize;
 	if (unlikely(total_size > env->max_tuple_size)) {
 		diag_set(ClientError, ER_VINYL_MAX_TUPLE_SIZE,
 			 (unsigned) total_size);
@@ -169,14 +170,14 @@ vy_stmt_alloc(struct tuple_format *format, uint32_t bsize)
 		diag_set(OutOfMemory, total_size, "malloc", "struct vy_stmt");
 		return NULL;
 	}
-	say_debug("vy_stmt_alloc(format = %d %u, bsize = %zu) = %p",
-		format->id, format->field_map_size, bsize, tuple);
+	say_debug("vy_stmt_alloc(format = %d data_offset = %u, bsize = %u) = %p",
+		  format->id, data_offset, bsize, tuple);
 	tuple->refs = 1;
 	tuple->format_id = tuple_format_id(format);
 	if (cord_is_main())
 		tuple_format_ref(format);
 	tuple->bsize = bsize;
-	tuple->data_offset = sizeof(struct vy_stmt) + format->field_map_size;
+	tuple->data_offset = data_offset;
 	vy_stmt_set_lsn(tuple, 0);
 	vy_stmt_set_type(tuple, 0);
 	vy_stmt_set_flags(tuple, 0);
@@ -191,7 +192,8 @@ vy_stmt_dup(struct tuple *stmt)
 	 * tuple field map. This map can be simple memcopied from
 	 * the original tuple.
 	 */
-	struct tuple *res = vy_stmt_alloc(tuple_format(stmt), stmt->bsize);
+	struct tuple *res = vy_stmt_alloc(tuple_format(stmt),
+					  stmt->data_offset, stmt->bsize);
 	if (res == NULL)
 		return NULL;
 	assert(tuple_size(res) == tuple_size(stmt));
@@ -253,7 +255,7 @@ vy_key_new(struct tuple_format *format, const char *key, uint32_t part_count)
 	/* Allocate stmt */
 	uint32_t key_size = key_end - key;
 	uint32_t bsize = mp_sizeof_array(part_count) + key_size;
-	struct tuple *stmt = vy_stmt_alloc(format, bsize);
+	struct tuple *stmt = vy_stmt_alloc(format, sizeof(struct vy_stmt), bsize);
 	if (stmt == NULL)
 		return NULL;
 	/* Copy MsgPack data */
@@ -318,14 +320,14 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 	if (tuple_field_map_create(format, tuple_begin, false, &builder) != 0)
 		goto end;
 	uint32_t field_map_size = field_map_build_size(&builder);
-	assert(field_map_size == format->field_map_size);
 	/*
 	 * Allocate stmt. Offsets: one per key part + offset of the
 	 * statement end.
 	 */
 	size_t mpsize = (tuple_end - tuple_begin);
 	size_t bsize = mpsize + ops_size;
-	stmt = vy_stmt_alloc(format, bsize);
+	stmt = vy_stmt_alloc(format, sizeof(struct vy_stmt) +
+			     field_map_size, bsize);
 	if (stmt == NULL)
 		goto end;
 	/* Copy MsgPack data */
@@ -389,13 +391,13 @@ vy_stmt_replace_from_upsert(struct tuple *upsert)
 
 	/* Copy statement data excluding UPSERT operations */
 	struct tuple_format *format = tuple_format(upsert);
-	struct tuple *replace = vy_stmt_alloc(format, bsize);
+	struct tuple *replace = vy_stmt_alloc(format, upsert->data_offset, bsize);
 	if (replace == NULL)
 		return NULL;
 	/* Copy both data and field_map. */
 	char *dst = (char *)replace + sizeof(struct vy_stmt);
 	char *src = (char *)upsert + sizeof(struct vy_stmt);
-	memcpy(dst, src, format->field_map_size + bsize);
+	memcpy(dst, src, upsert->data_offset + bsize - sizeof(struct vy_stmt));
 	vy_stmt_set_type(replace, IPROTO_REPLACE);
 	vy_stmt_set_lsn(replace, vy_stmt_lsn(upsert));
 	return replace;
@@ -407,17 +409,18 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 {
 	struct tuple *stmt = NULL;
 	uint32_t src_size = src_data_end - src_data;
-	uint32_t total_size = src_size + format->field_map_size;
 	/* Surrogate tuple uses less memory than the original tuple */
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
-	char *data = region_alloc(region, total_size);
+	char *data = region_alloc(region, src_size);
 	if (data == NULL) {
 		diag_set(OutOfMemory, src_size, "region", "tuple");
 		return NULL;
 	}
-	char *field_map_begin = data + src_size;
-	uint32_t *field_map = (uint32_t *) (data + total_size);
+	struct field_map_builder builder;
+	if (field_map_builder_create(&builder, format->field_map_size,
+				     region) != 0)
+		goto out;
 	/*
 	 * Perform simultaneous parsing of the tuple and
 	 * format::fields tree traversal to copy indexed field
@@ -429,14 +432,7 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 			TUPLE_FORMAT_ITERATOR_KEY_PARTS_ONLY, &field_count,
 			region) != 0)
 		goto out;
-	/*
-	 * Nullify field map to be able to detect by 0, which key
-	 * fields are absent in tuple_field().
-	 */
-	memset((char *)field_map - format->field_map_size, 0,
-		format->field_map_size);
 	char *pos = mp_encode_array(data, field_count);
-
 	struct tuple_format_iterator_entry entry;
 	while (tuple_format_iterator_next(&it, &entry) == 0 &&
 	       entry.data != NULL) {
@@ -457,8 +453,12 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 					    entry.field->token.len);
 		}
 		/* Initialize field_map with data offset. */
-		if (entry.field->offset_slot != TUPLE_OFFSET_SLOT_NIL)
-			field_map[entry.field->offset_slot] = pos - data;
+		uint32_t offset_slot = entry.field->offset_slot;
+		if (offset_slot != TUPLE_OFFSET_SLOT_NIL &&
+		    field_map_builder_set_slot(&builder, offset_slot,
+					pos - data, entry.multikey_idx,
+					entry.multikey_count, region) != 0)
+			goto out;
 		/* Copy field data. */
 		if (entry.field->type == FIELD_TYPE_ARRAY) {
 			pos = mp_encode_array(pos, entry.count);
@@ -473,13 +473,15 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 		goto out;
 	assert(pos <= data + src_size);
 	uint32_t bsize = pos - data;
-	stmt = vy_stmt_alloc(format, bsize);
+	uint32_t field_map_size = field_map_build_size(&builder);
+	stmt = vy_stmt_alloc(format, sizeof(struct vy_stmt) + field_map_size,
+			     bsize);
 	if (stmt == NULL)
 		goto out;
 	char *stmt_data = (char *) tuple_data(stmt);
-	char *stmt_field_map_begin = stmt_data - format->field_map_size;
+	char *stmt_field_map_begin = stmt_data - field_map_size;
 	memcpy(stmt_data, data, bsize);
-	memcpy(stmt_field_map_begin, field_map_begin, format->field_map_size);
+	field_map_build(&builder, stmt_field_map_begin);
 	vy_stmt_set_type(stmt, IPROTO_DELETE);
 	mp_tuple_assert(stmt_data, stmt_data + bsize);
 out:
-- 
2.11.0




More information about the Tarantool-patches mailing list