[PATCH v1 1/1] box: rework tuple_init_field_map to allocate field_map
Kirill Shcherbatov
kshcherbatov at tarantool.org
Tue Feb 19 16:26:02 MSK 2019
With multikey indexes, the size of the field map may depend on the
particular tuple. Therefore the tuple_init_field_map function has been
reworked (and renamed to tuple_field_map_create) so that it allocates
a field map of the required size on a region and returns it.
Needed for #1257
---
src/box/memtx_engine.c | 35 +++++++++++++++++----------
src/box/tuple.c | 49 ++++++++++++++++++++++----------------
src/box/tuple.h | 6 ++---
src/box/tuple_format.c | 32 ++++++++++++++-----------
src/box/tuple_format.h | 14 ++++++-----
src/box/vy_stmt.c | 54 ++++++++++++++++++++++++++----------------
6 files changed, 113 insertions(+), 77 deletions(-)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 64f43456e..1d27f11ee 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -1124,18 +1124,31 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
{
struct memtx_engine *memtx = (struct memtx_engine *)format->engine;
assert(mp_typeof(*data) == MP_ARRAY);
+ struct tuple *tuple = NULL;
+ struct region *region = &fiber()->gc;
+ size_t region_svp = region_used(region);
+ uint32_t field_map_size = 0;
+ uint32_t *field_map = NULL;
+ if (tuple_format_field_count(format) > 0) {
+ field_map = tuple_field_map_create(format, data, true,
+ &field_map_size, region);
+ if (field_map == NULL)
+ goto end;
+ } else {
+ assert(format->field_map_size == 0);
+ }
+
size_t tuple_len = end - data;
- size_t total = sizeof(struct memtx_tuple) + format->field_map_size +
- tuple_len;
+ size_t total = sizeof(struct memtx_tuple) + field_map_size + tuple_len;
ERROR_INJECT(ERRINJ_TUPLE_ALLOC, {
diag_set(OutOfMemory, total, "slab allocator", "memtx_tuple");
- return NULL;
+ goto end;
});
if (unlikely(total > memtx->max_tuple_size)) {
diag_set(ClientError, ER_MEMTX_MAX_TUPLE_SIZE, total);
error_log(diag_last_error(diag_get()));
- return NULL;
+ goto end;
}
struct memtx_tuple *memtx_tuple;
@@ -1147,9 +1160,9 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
}
if (memtx_tuple == NULL) {
diag_set(OutOfMemory, total, "slab allocator", "memtx_tuple");
- return NULL;
+ goto end;
}
- struct tuple *tuple = &memtx_tuple->base;
+ tuple = &memtx_tuple->base;
tuple->refs = 0;
memtx_tuple->version = memtx->snapshot_version;
assert(tuple_len <= UINT32_MAX); /* bsize is UINT32_MAX */
@@ -1161,15 +1174,13 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
* tuple base, not from memtx_tuple, because the struct
* tuple is not the first field of the memtx_tuple.
*/
- tuple->data_offset = sizeof(struct tuple) + format->field_map_size;
+ tuple->data_offset = sizeof(struct tuple) + field_map_size;
char *raw = (char *) tuple + tuple->data_offset;
- uint32_t *field_map = (uint32_t *) raw;
+ memcpy(raw - field_map_size, field_map, field_map_size);
memcpy(raw, data, tuple_len);
- if (tuple_init_field_map(format, field_map, raw, true)) {
- memtx_tuple_delete(format, tuple);
- return NULL;
- }
say_debug("%s(%zu) = %p", __func__, tuple_len, memtx_tuple);
+end:
+ region_truncate(region, region_svp);
return tuple;
}
diff --git a/src/box/tuple.c b/src/box/tuple.c
index 0770db66b..011fff64b 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -76,30 +76,40 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end
assert(format->vtab.tuple_delete == tuple_format_runtime_vtab.tuple_delete);
mp_tuple_assert(data, end);
- size_t data_len = end - data;
- size_t total = sizeof(struct tuple) + format->field_map_size +
- data_len;
+ struct tuple *tuple = NULL;
+ struct region *region = &fiber()->gc;
+ size_t region_svp = region_used(region);
+ uint32_t field_map_size = 0;
+ uint32_t *field_map = NULL;
+ if (tuple_format_field_count(format) > 0) {
+ field_map = tuple_field_map_create(format, data, true,
+ &field_map_size, region);
+ if (field_map == NULL)
+ goto end;
+ } else {
+ assert(format->field_map_size == 0);
+ }
- struct tuple *tuple = (struct tuple *) smalloc(&runtime_alloc, total);
+ size_t data_len = end - data;
+ size_t total = sizeof(struct tuple) + field_map_size + data_len;
+ tuple = (struct tuple *) smalloc(&runtime_alloc, total);
if (tuple == NULL) {
diag_set(OutOfMemory, (unsigned) total,
"malloc", "tuple");
- return NULL;
+ goto end;
}
tuple->refs = 0;
tuple->bsize = data_len;
tuple->format_id = tuple_format_id(format);
tuple_format_ref(format);
- tuple->data_offset = sizeof(struct tuple) + format->field_map_size;
+ tuple->data_offset = sizeof(struct tuple) + field_map_size;
char *raw = (char *) tuple + tuple->data_offset;
- uint32_t *field_map = (uint32_t *) raw;
+ memcpy(raw - field_map_size, field_map, field_map_size);
memcpy(raw, data, data_len);
- if (tuple_init_field_map(format, field_map, raw, true)) {
- runtime_tuple_delete(format, tuple);
- return NULL;
- }
say_debug("%s(%zu) = %p", __func__, data_len, tuple);
+end:
+ region_truncate(region, region_svp);
return tuple;
}
@@ -123,17 +133,14 @@ tuple_validate_raw(struct tuple_format *format, const char *tuple)
struct region *region = &fiber()->gc;
size_t region_svp = region_used(region);
- uint32_t *field_map = region_alloc(region, format->field_map_size);
- if (field_map == NULL) {
- diag_set(OutOfMemory, format->field_map_size, "region_alloc",
- "field_map");
- return -1;
- }
- field_map = (uint32_t *)((char *)field_map + format->field_map_size);
- if (tuple_init_field_map(format, field_map, tuple, true) != 0)
- return -1;
+ int rc = 0;
+ uint32_t field_map_size;
+ if (tuple_field_map_create(format, tuple, true, &field_map_size,
+ region) == NULL)
+ rc = -1;
+ assert(rc != 0 || field_map_size == format->field_map_size);
region_truncate(region, region_svp);
- return 0;
+ return rc;
}
/**
diff --git a/src/box/tuple.h b/src/box/tuple.h
index e803260cb..b17f3e8f9 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -450,7 +450,7 @@ tuple_delete(struct tuple *tuple)
/**
* Check tuple data correspondence to space format.
- * Actually checks everything that checks tuple_init_field_map.
+ * Actually checks everything that checks tuple_field_map_create.
* @param format Format to which the tuple must match.
* @param tuple MessagePack array.
*
@@ -478,7 +478,7 @@ tuple_validate(struct tuple_format *format, struct tuple *tuple)
* Return a field map for the tuple.
* @param tuple tuple
* @returns a field map for the tuple.
- * @sa tuple_init_field_map()
+ * @sa tuple_field_map_create()
*/
static inline const uint32_t *
tuple_field_map(const struct tuple *tuple)
@@ -586,7 +586,7 @@ parse:
* @param field_no the index of field to return
*
* @returns field data if field exists or NULL
- * @sa tuple_init_field_map()
+ * @sa tuple_field_map_create()
*/
static inline const char *
tuple_field_raw(struct tuple_format *format, const char *tuple,
diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
index b26b367a1..445bc725c 100644
--- a/src/box/tuple_format.c
+++ b/src/box/tuple_format.c
@@ -801,18 +801,22 @@ tuple_format1_can_store_format2_tuples(struct tuple_format *format1,
}
/** @sa declaration for details. */
-int
-tuple_init_field_map(struct tuple_format *format, uint32_t *field_map,
- const char *tuple, bool validate)
+uint32_t *
+tuple_field_map_create(struct tuple_format *format, const char *tuple,
+ bool validate, uint32_t *field_map_size,
+ struct region *region)
{
- if (tuple_format_field_count(format) == 0)
- return 0; /* Nothing to initialize */
+ assert(tuple_format_field_count(format) > 0);
+ *field_map_size = format->field_map_size;
+ uint32_t *field_map = region_alloc(region, *field_map_size);
+ if (field_map == NULL) {
+ diag_set(OutOfMemory, *field_map_size, "region_alloc",
+ "field_map");
+ goto error;
+ }
+ field_map = (uint32_t *)((char *)field_map + *field_map_size);
- struct region *region = &fiber()->gc;
- size_t region_svp = region_used(region);
const char *pos = tuple;
- int rc = 0;
-
/* Check to see if the tuple has a sufficient number of fields. */
uint32_t field_count = mp_decode_array(&pos);
if (validate && format->exact_field_count > 0 &&
@@ -853,8 +857,7 @@ tuple_init_field_map(struct tuple_format *format, uint32_t *field_map,
* Nullify field map to be able to detect by 0,
* which key fields are absent in tuple_field().
*/
- memset((char *)field_map - format->field_map_size, 0,
- format->field_map_size);
+ memset((char *)field_map - *field_map_size, 0, *field_map_size);
/*
* Prepare mp stack of the size equal to the maximum depth
* of the indexed field in the format::fields tree
@@ -978,10 +981,11 @@ finish:
}
}
out:
- region_truncate(region, region_svp);
- return rc;
+ if (likely(field_map != NULL))
+ field_map = (uint32_t *)((char *)field_map - *field_map_size);
+ return field_map;
error:
- rc = -1;
+ field_map = NULL;
goto out;
}
diff --git a/src/box/tuple_format.h b/src/box/tuple_format.h
index f4e142d53..9d8647c20 100644
--- a/src/box/tuple_format.h
+++ b/src/box/tuple_format.h
@@ -386,12 +386,13 @@ box_tuple_format_unref(box_tuple_format_t *format);
/** \endcond public */
/**
- * Fill the field map of tuple with field offsets.
+ * Allocate the field map of tuple with field offsets on region.
* @param format Tuple format.
- * @param field_map A pointer behind the last element of the field
- * map.
* @param tuple MessagePack array.
* @param validate If set, validate the tuple against the format.
+ * @param field_map_size[out] The pointer to variable to store
+ * field map size.
+ * @param region The region to use to perform allocation.
*
* @retval 0 Success.
* @retval -1 Format error.
@@ -402,9 +403,10 @@ box_tuple_format_unref(box_tuple_format_t *format);
* field_map
* tuple + off_i = indexed_field_i;
*/
-int
-tuple_init_field_map(struct tuple_format *format, uint32_t *field_map,
- const char *tuple, bool validate);
+uint32_t *
+tuple_field_map_create(struct tuple_format *format, const char *tuple,
+ bool validate, uint32_t *field_map_size,
+ struct region *region);
/**
* Initialize tuple format subsystem.
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index af5c64086..c11f8a892 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -286,18 +286,46 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
for (int i = 0; i < op_count; ++i)
ops_size += ops[i].iov_len;
+ struct tuple *stmt = NULL;
+ struct region *region = &fiber()->gc;
+ size_t region_svp = region_used(region);
+ /*
+ * Calculate offsets for key parts.
+ *
+ * Note, an overwritten statement loaded from a primary
+ * index run file may not conform to the current format
+ * in case the space was altered (e.g. a new field was
+ * added which is missing in a deleted tuple). Although
+ * we should never return such statements to the user,
+ * we may still need to decode them while iterating over
+ * a run so we skip tuple validation here. This is OK as
+ * tuples inserted into a space are validated explicitly
+ * with tuple_validate() anyway.
+ */
+ uint32_t field_map_size = 0;
+ uint32_t *field_map = NULL;
+ if (tuple_format_field_count(format) > 0) {
+ field_map = tuple_field_map_create(format, tuple_begin, false,
+ &field_map_size, region);
+ if (field_map == NULL)
+ goto end;
+ assert(field_map_size == format->field_map_size);
+ } else {
+ assert(format->field_map_size == 0);
+ }
/*
* Allocate stmt. Offsets: one per key part + offset of the
* statement end.
*/
size_t mpsize = (tuple_end - tuple_begin);
size_t bsize = mpsize + ops_size;
- struct tuple *stmt = vy_stmt_alloc(format, bsize);
+ stmt = vy_stmt_alloc(format, bsize);
if (stmt == NULL)
- return NULL;
+ goto end;
/* Copy MsgPack data */
char *raw = (char *) tuple_data(stmt);
char *wpos = raw;
+ memcpy(wpos - field_map_size, field_map, field_map_size);
memcpy(wpos, tuple_begin, mpsize);
wpos += mpsize;
for (struct iovec *op = ops, *end = ops + op_count;
@@ -306,24 +334,8 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
wpos += op->iov_len;
}
vy_stmt_set_type(stmt, type);
-
- /*
- * Calculate offsets for key parts.
- *
- * Note, an overwritten statement loaded from a primary
- * index run file may not conform to the current format
- * in case the space was altered (e.g. a new field was
- * added which is missing in a deleted tuple). Although
- * we should never return such statements to the user,
- * we may still need to decode them while iterating over
- * a run so we skip tuple validation here. This is OK as
- * tuples inserted into a space are validated explicitly
- * with tuple_validate() anyway.
- */
- if (tuple_init_field_map(format, (uint32_t *) raw, raw, false)) {
- tuple_unref(stmt);
- return NULL;
- }
+end:
+ region_truncate(region, region_svp);
return stmt;
}
@@ -523,7 +535,7 @@ vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
* Perform simultaneous parsing of the tuple and
* format::fields tree traversal to copy indexed field
* data and initialize field map. In many details the code
- * above works like tuple_init_field_map, read it's
+ * above works like tuple_field_map_create, read it's
* comments for more details.
*/
uint32_t frames_sz = format->fields_depth * sizeof(struct mp_frame);
--
2.20.1
More information about the Tarantool-patches
mailing list