[PATCH v1 1/1] box: rework tuple_init_field_map to allocate field_map

Kirill Shcherbatov kshcherbatov at tarantool.org
Tue Feb 19 16:26:02 MSK 2019


Due to the fact that in the case of multikey indexes, the size of
the field map may depend on a particular tuple, the
tuple_int_field_map function has been reworked in such a way as
to allocate the field map of the required size and return it.

Needed for #1257
---
 src/box/memtx_engine.c | 35 +++++++++++++++++----------
 src/box/tuple.c        | 49 ++++++++++++++++++++++----------------
 src/box/tuple.h        |  6 ++---
 src/box/tuple_format.c | 32 ++++++++++++++-----------
 src/box/tuple_format.h | 14 ++++++-----
 src/box/vy_stmt.c      | 54 ++++++++++++++++++++++++++----------------
 6 files changed, 113 insertions(+), 77 deletions(-)

diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 64f43456e..1d27f11ee 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -1124,18 +1124,31 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
 {
 	struct memtx_engine *memtx = (struct memtx_engine *)format->engine;
 	assert(mp_typeof(*data) == MP_ARRAY);
+	struct tuple *tuple = NULL;
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	uint32_t field_map_size = 0;
+	uint32_t *field_map = NULL;
+	if (tuple_format_field_count(format) > 0) {
+		field_map = tuple_field_map_create(format, data, true,
+						   &field_map_size, region);
+		if (field_map == NULL)
+			goto end;
+	} else {
+		assert(format->field_map_size == 0);
+	}
+
 	size_t tuple_len = end - data;
-	size_t total = sizeof(struct memtx_tuple) + format->field_map_size +
-		tuple_len;
+	size_t total = sizeof(struct memtx_tuple) + field_map_size + tuple_len;
 
 	ERROR_INJECT(ERRINJ_TUPLE_ALLOC, {
 		diag_set(OutOfMemory, total, "slab allocator", "memtx_tuple");
-		return NULL;
+		goto end;
 	});
 	if (unlikely(total > memtx->max_tuple_size)) {
 		diag_set(ClientError, ER_MEMTX_MAX_TUPLE_SIZE, total);
 		error_log(diag_last_error(diag_get()));
-		return NULL;
+		goto end;
 	}
 
 	struct memtx_tuple *memtx_tuple;
@@ -1147,9 +1160,9 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
 	}
 	if (memtx_tuple == NULL) {
 		diag_set(OutOfMemory, total, "slab allocator", "memtx_tuple");
-		return NULL;
+		goto end;
 	}
-	struct tuple *tuple = &memtx_tuple->base;
+	tuple = &memtx_tuple->base;
 	tuple->refs = 0;
 	memtx_tuple->version = memtx->snapshot_version;
 	assert(tuple_len <= UINT32_MAX); /* bsize is UINT32_MAX */
@@ -1161,15 +1174,13 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
 	 * tuple base, not from memtx_tuple, because the struct
 	 * tuple is not the first field of the memtx_tuple.
 	 */
-	tuple->data_offset = sizeof(struct tuple) + format->field_map_size;
+	tuple->data_offset = sizeof(struct tuple) + field_map_size;
 	char *raw = (char *) tuple + tuple->data_offset;
-	uint32_t *field_map = (uint32_t *) raw;
+	memcpy(raw - field_map_size, field_map, field_map_size);
 	memcpy(raw, data, tuple_len);
-	if (tuple_init_field_map(format, field_map, raw, true)) {
-		memtx_tuple_delete(format, tuple);
-		return NULL;
-	}
 	say_debug("%s(%zu) = %p", __func__, tuple_len, memtx_tuple);
+end:
+	region_truncate(region, region_svp);
 	return tuple;
 }
 
diff --git a/src/box/tuple.c b/src/box/tuple.c
index 0770db66b..011fff64b 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -76,30 +76,40 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end
 	assert(format->vtab.tuple_delete == tuple_format_runtime_vtab.tuple_delete);
 
 	mp_tuple_assert(data, end);
-	size_t data_len = end - data;
-	size_t total = sizeof(struct tuple) + format->field_map_size +
-		data_len;
+	struct tuple *tuple = NULL;
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	uint32_t field_map_size = 0;
+	uint32_t *field_map = NULL;
+	if (tuple_format_field_count(format) > 0) {
+		field_map = tuple_field_map_create(format, data, true,
+						   &field_map_size, region);
+		if (field_map == NULL)
+			goto end;
+	} else {
+		assert(format->field_map_size == 0);
+	}
 
-	struct tuple *tuple = (struct tuple *) smalloc(&runtime_alloc, total);
+	size_t data_len = end - data;
+	size_t total = sizeof(struct tuple) + field_map_size + data_len;
+	tuple = (struct tuple *) smalloc(&runtime_alloc, total);
 	if (tuple == NULL) {
 		diag_set(OutOfMemory, (unsigned) total,
 			 "malloc", "tuple");
-		return NULL;
+		goto end;
 	}
 
 	tuple->refs = 0;
 	tuple->bsize = data_len;
 	tuple->format_id = tuple_format_id(format);
 	tuple_format_ref(format);
-	tuple->data_offset = sizeof(struct tuple) + format->field_map_size;
+	tuple->data_offset = sizeof(struct tuple) + field_map_size;
 	char *raw = (char *) tuple + tuple->data_offset;
-	uint32_t *field_map = (uint32_t *) raw;
+	memcpy(raw - field_map_size, field_map, field_map_size);
 	memcpy(raw, data, data_len);
-	if (tuple_init_field_map(format, field_map, raw, true)) {
-		runtime_tuple_delete(format, tuple);
-		return NULL;
-	}
 	say_debug("%s(%zu) = %p", __func__, data_len, tuple);
+end:
+	region_truncate(region, region_svp);
 	return tuple;
 }
 
@@ -123,17 +133,14 @@ tuple_validate_raw(struct tuple_format *format, const char *tuple)
 
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
-	uint32_t *field_map = region_alloc(region, format->field_map_size);
-	if (field_map == NULL) {
-		diag_set(OutOfMemory, format->field_map_size, "region_alloc",
-			 "field_map");
-		return -1;
-	}
-	field_map = (uint32_t *)((char *)field_map + format->field_map_size);
-	if (tuple_init_field_map(format, field_map, tuple, true) != 0)
-		return -1;
+	int rc = 0;
+	uint32_t field_map_size;
+	if (tuple_field_map_create(format, tuple, true, &field_map_size,
+				   region) == NULL)
+		rc = -1;
+	assert(rc != 0 || field_map_size == format->field_map_size);
 	region_truncate(region, region_svp);
-	return 0;
+	return rc;
 }
 
 /**
diff --git a/src/box/tuple.h b/src/box/tuple.h
index e803260cb..b17f3e8f9 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -450,7 +450,7 @@ tuple_delete(struct tuple *tuple)
 
 /**
  * Check tuple data correspondence to space format.
- * Actually checks everything that checks tuple_init_field_map.
+ * Actually checks everything that checks tuple_field_map_create.
  * @param format Format to which the tuple must match.
  * @param tuple  MessagePack array.
  *
@@ -478,7 +478,7 @@ tuple_validate(struct tuple_format *format, struct tuple *tuple)
  * Return a field map for the tuple.
  * @param tuple tuple
  * @returns a field map for the tuple.
- * @sa tuple_init_field_map()
+ * @sa tuple_field_map_create()
  */
 static inline const uint32_t *
 tuple_field_map(const struct tuple *tuple)
@@ -586,7 +586,7 @@ parse:
  * @param field_no the index of field to return
  *
  * @returns field data if field exists or NULL
- * @sa tuple_init_field_map()
+ * @sa tuple_field_map_create()
  */
 static inline const char *
 tuple_field_raw(struct tuple_format *format, const char *tuple,
diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
index b26b367a1..445bc725c 100644
--- a/src/box/tuple_format.c
+++ b/src/box/tuple_format.c
@@ -801,18 +801,22 @@ tuple_format1_can_store_format2_tuples(struct tuple_format *format1,
 }
 
 /** @sa declaration for details. */
-int
-tuple_init_field_map(struct tuple_format *format, uint32_t *field_map,
-		     const char *tuple, bool validate)
+uint32_t *
+tuple_field_map_create(struct tuple_format *format, const char *tuple,
+		       bool validate, uint32_t *field_map_size,
+		       struct region *region)
 {
-	if (tuple_format_field_count(format) == 0)
-		return 0; /* Nothing to initialize */
+	assert(tuple_format_field_count(format) > 0);
+	*field_map_size = format->field_map_size;
+	uint32_t *field_map = region_alloc(region, *field_map_size);
+	if (field_map == NULL) {
+		diag_set(OutOfMemory, *field_map_size, "region_alloc",
+			 "field_map");
+		goto error;
+	}
+	field_map = (uint32_t *)((char *)field_map + *field_map_size);
 
-	struct region *region = &fiber()->gc;
-	size_t region_svp = region_used(region);
 	const char *pos = tuple;
-	int rc = 0;
-
 	/* Check to see if the tuple has a sufficient number of fields. */
 	uint32_t field_count = mp_decode_array(&pos);
 	if (validate && format->exact_field_count > 0 &&
@@ -853,8 +857,7 @@ tuple_init_field_map(struct tuple_format *format, uint32_t *field_map,
 	 * Nullify field map to be able to detect by 0,
 	 * which key fields are absent in tuple_field().
 	 */
-	memset((char *)field_map - format->field_map_size, 0,
-		format->field_map_size);
+	memset((char *)field_map - *field_map_size, 0, *field_map_size);
 	/*
 	 * Prepare mp stack of the size equal to the maximum depth
 	 * of the indexed field in the format::fields tree
@@ -978,10 +981,11 @@ finish:
 		}
 	}
 out:
-	region_truncate(region, region_svp);
-	return rc;
+	if (likely(field_map != NULL))
+		field_map = (uint32_t *)((char *)field_map - *field_map_size);
+	return field_map;
 error:
-	rc = -1;
+	field_map = NULL;
 	goto out;
 }
 
diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h
index f4e142d53..9d8647c20 100644
--- a/src/box/tuple_format.h
+++ b/src/box/tuple_format.h
@@ -386,12 +386,13 @@ box_tuple_format_unref(box_tuple_format_t *format);
 /** \endcond public */
 
 /**
- * Fill the field map of tuple with field offsets.
+ * Allocate the field map of tuple with field offsets on region.
  * @param format    Tuple format.
- * @param field_map A pointer behind the last element of the field
- *                  map.
  * @param tuple     MessagePack array.
  * @param validate  If set, validate the tuple against the format.
+ * @param field_map_size[out] The pointer to variable to store
+ *                            field map size.
+ * @param region    The region to use to perform allocation.
  *
  * @retval  0 Success.
  * @retval -1 Format error.
@@ -402,9 +403,10 @@ box_tuple_format_unref(box_tuple_format_t *format);
  *                             field_map
  * tuple + off_i = indexed_field_i;
  */
-int
-tuple_init_field_map(struct tuple_format *format, uint32_t *field_map,
-		     const char *tuple, bool validate);
+uint32_t *
+tuple_field_map_create(struct tuple_format *format, const char *tuple,
+		       bool validate, uint32_t *field_map_size,
+		       struct region *region);
 
 /**
  * Initialize tuple format subsystem.
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index af5c64086..c11f8a892 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -286,18 +286,46 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 	for (int i = 0; i < op_count; ++i)
 		ops_size += ops[i].iov_len;
 
+	struct tuple *stmt = NULL;
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	/*
+	 * Calculate offsets for key parts.
+	 *
+	 * Note, an overwritten statement loaded from a primary
+	 * index run file may not conform to the current format
+	 * in case the space was altered (e.g. a new field was
+	 * added which is missing in a deleted tuple). Although
+	 * we should never return such statements to the user,
+	 * we may still need to decode them while iterating over
+	 * a run so we skip tuple validation here. This is OK as
+	 * tuples inserted into a space are validated explicitly
+	 * with tuple_validate() anyway.
+	 */
+	uint32_t field_map_size = 0;
+	uint32_t *field_map = NULL;
+	if (tuple_format_field_count(format) > 0) {
+		field_map = tuple_field_map_create(format, tuple_begin, false,
+						   &field_map_size, region);
+		if (field_map == NULL)
+			goto end;
+		assert(field_map_size == format->field_map_size);
+	} else {
+		assert(format->field_map_size == 0);
+	}
 	/*
 	 * Allocate stmt. Offsets: one per key part + offset of the
 	 * statement end.
 	 */
 	size_t mpsize = (tuple_end - tuple_begin);
 	size_t bsize = mpsize + ops_size;
-	struct tuple *stmt = vy_stmt_alloc(format, bsize);
+	stmt = vy_stmt_alloc(format, bsize);
 	if (stmt == NULL)
-		return NULL;
+		goto end;
 	/* Copy MsgPack data */
 	char *raw = (char *) tuple_data(stmt);
 	char *wpos = raw;
+	memcpy(wpos - field_map_size, field_map, field_map_size);
 	memcpy(wpos, tuple_begin, mpsize);
 	wpos += mpsize;
 	for (struct iovec *op = ops, *end = ops + op_count;
@@ -306,24 +334,8 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 		wpos += op->iov_len;
 	}
 	vy_stmt_set_type(stmt, type);
-
-	/*
-	 * Calculate offsets for key parts.
-	 *
-	 * Note, an overwritten statement loaded from a primary
-	 * index run file may not conform to the current format
-	 * in case the space was altered (e.g. a new field was
-	 * added which is missing in a deleted tuple). Although
-	 * we should never return such statements to the user,
-	 * we may still need to decode them while iterating over
-	 * a run so we skip tuple validation here. This is OK as
-	 * tuples inserted into a space are validated explicitly
-	 * with tuple_validate() anyway.
-	 */
-	if (tuple_init_field_map(format, (uint32_t *) raw, raw, false)) {
-		tuple_unref(stmt);
-		return NULL;
-	}
+end:
+	region_truncate(region, region_svp);
 	return stmt;
 }
 
@@ -523,7 +535,7 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 	 * Perform simultaneous parsing of the tuple and
 	 * format::fields tree traversal to copy indexed field
 	 * data and initialize field map. In many details the code
-	 * above works like tuple_init_field_map, read it's
+	 * above works like tuple_field_map_create, read it's
 	 * comments for more details.
 	 */
 	uint32_t frames_sz = format->fields_depth * sizeof(struct mp_frame);
-- 
2.20.1




More information about the Tarantool-patches mailing list