[PATCH v2 12/14] vinyl: zap vy_stmt_new_surrogate_from_key
Vladimir Davydov
vdavydov.dev at gmail.com
Wed Mar 13 11:52:58 MSK 2019
This heavy function isn't needed anymore, as we can now insert key
statements into the memory level.
---
src/box/vinyl.c | 4 +-
src/box/vy_lsm.c | 3 +-
src/box/vy_mem.c | 3 +-
src/box/vy_point_lookup.c | 2 +-
src/box/vy_read_iterator.c | 3 +-
src/box/vy_run.c | 31 +++-------
src/box/vy_run.h | 9 +--
src/box/vy_stmt.c | 147 +++++---------------------------------------
src/box/vy_stmt.h | 38 +++++-------
src/box/vy_write_iterator.c | 2 +-
test/vinyl/stat.result | 2 +-
11 files changed, 53 insertions(+), 191 deletions(-)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index e9a4ae0d..f2d9ab55 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1719,8 +1719,8 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
}
} else {
assert(lsm->index_id == 0);
- delete = vy_stmt_new_surrogate_delete_from_key(request->key,
- pk->key_def, pk->mem_format);
+ delete = vy_stmt_new_delete(pk->env->key_format,
+ request->key, request->key_end);
if (delete == NULL)
return -1;
if (space->index_count > 1)
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 28cd487c..a5e92887 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -898,7 +898,8 @@ vy_lsm_set(struct vy_lsm *lsm, struct vy_mem *mem,
lsm->stat.memory.count.bytes += tuple_size(stmt);
/* Abort transaction if format was changed by DDL */
- if (format_id != tuple_format_id(mem->format)) {
+ if (!vy_stmt_is_key(stmt) &&
+ format_id != tuple_format_id(mem->format)) {
diag_set(ClientError, ER_TRANSACTION_CONFLICT);
return -1;
}
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index c47a3c36..7fa90cbd 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -215,7 +215,8 @@ vy_mem_insert(struct vy_mem *mem, const struct tuple *stmt)
{
assert(vy_stmt_type(stmt) != IPROTO_UPSERT);
/* Check if the statement can be inserted in the vy_mem. */
- assert(stmt->format_id == tuple_format_id(mem->format));
+ assert(vy_stmt_is_key(stmt) ||
+ stmt->format_id == tuple_format_id(mem->format));
/* The statement must be from a lsregion. */
assert(!vy_stmt_is_refable(stmt));
size_t size = tuple_size(stmt);
diff --git a/src/box/vy_point_lookup.c b/src/box/vy_point_lookup.c
index 51226ce8..9f7796eb 100644
--- a/src/box/vy_point_lookup.c
+++ b/src/box/vy_point_lookup.c
@@ -144,7 +144,7 @@ vy_point_lookup_scan_slice(struct vy_lsm *lsm, struct vy_slice *slice,
struct vy_run_iterator run_itr;
vy_run_iterator_open(&run_itr, &lsm->stat.disk.iterator, slice,
ITER_EQ, key, rv, lsm->cmp_def, lsm->key_def,
- lsm->disk_format, lsm->index_id == 0);
+ lsm->disk_format);
struct vy_history slice_history;
vy_history_create(&slice_history, &lsm->env->history_node_pool);
int rc = vy_run_iterator_next(&run_itr, &slice_history);
diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c
index cc884bec..2864a280 100644
--- a/src/box/vy_read_iterator.c
+++ b/src/box/vy_read_iterator.c
@@ -622,8 +622,7 @@ vy_read_iterator_add_disk(struct vy_read_iterator *itr)
&lsm->stat.disk.iterator, slice,
iterator_type, itr->key,
itr->read_view, lsm->cmp_def,
- lsm->key_def, lsm->disk_format,
- lsm->index_id == 0);
+ lsm->key_def, lsm->disk_format);
}
}
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 928ca57f..699941a8 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -707,22 +707,19 @@ vy_page_xrow(struct vy_page *page, uint32_t stmt_no,
* Read raw stmt data from the page
* @param page Page.
* @param stmt_no Statement position in the page.
- * @param cmp_def Key definition, including primary key parts.
* @param format Format for REPLACE/DELETE tuples.
- * @param is_primary True if the index is primary.
*
* @retval not NULL Statement read from page.
* @retval NULL Memory error.
*/
static struct tuple *
vy_page_stmt(struct vy_page *page, uint32_t stmt_no,
- const struct key_def *cmp_def, struct tuple_format *format,
- bool is_primary)
+ struct tuple_format *format)
{
struct xrow_header xrow;
if (vy_page_xrow(page, stmt_no, &xrow) != 0)
return NULL;
- return vy_stmt_decode(&xrow, cmp_def, format, is_primary);
+ return vy_stmt_decode(&xrow, format);
}
/**
@@ -1025,8 +1022,7 @@ vy_run_iterator_read(struct vy_run_iterator *itr,
int rc = vy_run_iterator_load_page(itr, pos.page_no, &page);
if (rc != 0)
return rc;
- *stmt = vy_page_stmt(page, pos.pos_in_page, itr->cmp_def,
- itr->format, itr->is_primary);
+ *stmt = vy_page_stmt(page, pos.pos_in_page, itr->format);
if (*stmt == NULL)
return -1;
return 0;
@@ -1052,9 +1048,7 @@ vy_run_iterator_search_in_page(struct vy_run_iterator *itr,
iterator_type == ITER_LE ? -1 : 0);
while (beg != end) {
uint32_t mid = beg + (end - beg) / 2;
- struct tuple *fnd_key = vy_page_stmt(page, mid, itr->cmp_def,
- itr->format,
- itr->is_primary);
+ struct tuple *fnd_key = vy_page_stmt(page, mid, itr->format);
if (fnd_key == NULL)
return end;
int cmp = vy_stmt_compare(fnd_key, key, itr->cmp_def);
@@ -1402,14 +1396,12 @@ vy_run_iterator_open(struct vy_run_iterator *itr,
struct vy_slice *slice, enum iterator_type iterator_type,
const struct tuple *key, const struct vy_read_view **rv,
struct key_def *cmp_def, struct key_def *key_def,
- struct tuple_format *format,
- bool is_primary)
+ struct tuple_format *format)
{
itr->stat = stat;
itr->cmp_def = cmp_def;
itr->key_def = key_def;
itr->format = format;
- itr->is_primary = is_primary;
itr->slice = slice;
itr->iterator_type = iterator_type;
@@ -2393,8 +2385,7 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
continue;
}
++page_row_count;
- struct tuple *tuple = vy_stmt_decode(&xrow, cmp_def,
- format, iid == 0);
+ struct tuple *tuple = vy_stmt_decode(&xrow, format);
if (tuple == NULL)
goto close_err;
if (bloom_builder != NULL &&
@@ -2574,8 +2565,7 @@ vy_slice_stream_search(struct vy_stmt_stream *virt_stream)
while (beg != end) {
uint32_t mid = beg + (end - beg) / 2;
struct tuple *fnd_key = vy_page_stmt(stream->page, mid,
- stream->cmp_def, stream->format,
- stream->is_primary);
+ stream->format);
if (fnd_key == NULL)
return -1;
int cmp = vy_stmt_compare(fnd_key, stream->slice->begin,
@@ -2622,8 +2612,7 @@ vy_slice_stream_next(struct vy_stmt_stream *virt_stream, struct tuple **ret)
/* Read current tuple from the page */
struct tuple *tuple = vy_page_stmt(stream->page, stream->pos_in_page,
- stream->cmp_def, stream->format,
- stream->is_primary);
+ stream->format);
if (tuple == NULL) /* Read or memory error */
return -1;
@@ -2696,8 +2685,7 @@ static const struct vy_stmt_stream_iface vy_slice_stream_iface = {
void
vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
- struct key_def *cmp_def, struct tuple_format *format,
- bool is_primary)
+ struct key_def *cmp_def, struct tuple_format *format)
{
stream->base.iface = &vy_slice_stream_iface;
@@ -2710,5 +2698,4 @@ vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
stream->cmp_def = cmp_def;
stream->format = format;
tuple_format_ref(format);
- stream->is_primary = is_primary;
}
diff --git a/src/box/vy_run.h b/src/box/vy_run.h
index 18ca1729..beb87b78 100644
--- a/src/box/vy_run.h
+++ b/src/box/vy_run.h
@@ -248,8 +248,6 @@ struct vy_run_iterator {
* pages.
*/
struct tuple_format *format;
- /** Set if this iterator is for a primary index. */
- bool is_primary;
/** The run slice to iterate. */
struct vy_slice *slice;
@@ -523,7 +521,7 @@ vy_run_iterator_open(struct vy_run_iterator *itr,
struct vy_slice *slice, enum iterator_type iterator_type,
const struct tuple *key, const struct vy_read_view **rv,
struct key_def *cmp_def, struct key_def *key_def,
- struct tuple_format *format, bool is_primary);
+ struct tuple_format *format);
/**
* Advance a run iterator to the next key.
@@ -575,8 +573,6 @@ struct vy_slice_stream {
struct key_def *cmp_def;
/** Format for allocating REPLACE and DELETE tuples read from pages. */
struct tuple_format *format;
- /** Set if this iterator is for a primary index. */
- bool is_primary;
};
/**
@@ -584,8 +580,7 @@ struct vy_slice_stream {
*/
void
vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
- struct key_def *cmp_def, struct tuple_format *format,
- bool is_primary);
+ struct key_def *cmp_def, struct tuple_format *format);
/**
* Run_writer fills a created run with statements one by one,
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index fd7850cc..2e26e843 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -371,6 +371,14 @@ vy_stmt_new_insert(struct tuple_format *format, const char *tuple_begin,
}
struct tuple *
+vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
+ const char *tuple_end)
+{
+ return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
+ NULL, 0, IPROTO_DELETE);
+}
+
+struct tuple *
vy_stmt_replace_from_upsert(const struct tuple *upsert)
{
assert(vy_stmt_type(upsert) == IPROTO_UPSERT);
@@ -393,122 +401,6 @@ vy_stmt_replace_from_upsert(const struct tuple *upsert)
return replace;
}
-static struct tuple *
-vy_stmt_new_surrogate_from_key(const char *key, enum iproto_type type,
- const struct key_def *cmp_def,
- struct tuple_format *format)
-{
- /* UPSERT can't be surrogate. */
- assert(type != IPROTO_UPSERT);
- struct region *region = &fiber()->gc;
- size_t region_svp = region_used(region);
-
- uint32_t field_count = format->index_field_count;
- uint32_t iov_sz = sizeof(struct iovec) * format->total_field_count;
- struct iovec *iov = region_alloc(region, iov_sz);
- if (iov == NULL) {
- diag_set(OutOfMemory, iov_sz, "region",
- "iov for surrogate key");
- return NULL;
- }
- memset(iov, 0, iov_sz);
- uint32_t part_count = mp_decode_array(&key);
- assert(part_count == cmp_def->part_count);
- assert(part_count <= format->total_field_count);
- /**
- * Calculate bsize using format::min_tuple_size tuple
- * where parts_count nulls replaced with extracted keys.
- */
- uint32_t bsize = format->min_tuple_size - mp_sizeof_nil() * part_count;
- for (uint32_t i = 0; i < part_count; ++i) {
- const struct key_part *part = &cmp_def->parts[i];
- assert(part->fieldno < field_count);
- struct tuple_field *field =
- tuple_format_field_by_path(format, part->fieldno,
- part->path, part->path_len);
- assert(field != NULL);
- const char *svp = key;
- iov[field->id].iov_base = (char *) key;
- mp_next(&key);
- iov[field->id].iov_len = key - svp;
- bsize += key - svp;
- }
-
- struct tuple *stmt = vy_stmt_alloc(format, bsize);
- if (stmt == NULL)
- goto out;
-
- char *raw = (char *) tuple_data(stmt);
- uint32_t *field_map = (uint32_t *) raw;
- memset((char *)field_map - format->field_map_size, 0,
- format->field_map_size);
- char *wpos = mp_encode_array(raw, field_count);
- struct tuple_field *field;
- json_tree_foreach_entry_preorder(field, &format->fields.root,
- struct tuple_field, token) {
- /*
- * Do not restore fields not involved in index
- * (except gaps in the mp array that may be filled
- * with nils later).
- */
- if (!field->is_key_part)
- continue;
- if (field->token.type == JSON_TOKEN_NUM) {
- /*
- * Write nil istead of omitted array
- * members.
- */
- struct json_token **neighbors =
- field->token.parent->children;
- for (int i = field->token.sibling_idx - 1; i >= 0; i--) {
- if (neighbors[i] != NULL &&
- json_tree_entry(neighbors[i],
- struct tuple_field,
- token)->is_key_part)
- break;
- wpos = mp_encode_nil(wpos);
- }
- } else {
- /* Write a key string for map member. */
- assert(field->token.type == JSON_TOKEN_STR);
- const char *str = field->token.str;
- uint32_t len = field->token.len;
- wpos = mp_encode_str(wpos, str, len);
- }
- int max_child_idx = field->token.max_child_idx;
- if (json_token_is_leaf(&field->token)) {
- if (iov[field->id].iov_len == 0) {
- wpos = mp_encode_nil(wpos);
- } else {
- memcpy(wpos, iov[field->id].iov_base,
- iov[field->id].iov_len);
- uint32_t data_offset = wpos - raw;
- int32_t slot = field->offset_slot;
- if (slot != TUPLE_OFFSET_SLOT_NIL)
- field_map[slot] = data_offset;
- wpos += iov[field->id].iov_len;
- }
- } else if (field->type == FIELD_TYPE_ARRAY) {
- wpos = mp_encode_array(wpos, max_child_idx + 1);
- } else if (field->type == FIELD_TYPE_MAP) {
- wpos = mp_encode_map(wpos, max_child_idx + 1);
- }
- }
- assert(wpos == raw + bsize);
- vy_stmt_set_type(stmt, type);
-out:
- region_truncate(region, region_svp);
- return stmt;
-}
-
-struct tuple *
-vy_stmt_new_surrogate_delete_from_key(const char *key, struct key_def *cmp_def,
- struct tuple_format *format)
-{
- return vy_stmt_new_surrogate_from_key(key, IPROTO_DELETE,
- cmp_def, format);
-}
-
struct tuple *
vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
const char *src_data, const char *src_data_end)
@@ -748,8 +640,6 @@ int
vy_stmt_encode_primary(const struct tuple *value, struct key_def *key_def,
uint32_t space_id, struct xrow_header *xrow)
{
- assert(!vy_stmt_is_key(value));
-
memset(xrow, 0, sizeof(*xrow));
enum iproto_type type = vy_stmt_type(value);
xrow->type = type;
@@ -763,8 +653,9 @@ vy_stmt_encode_primary(const struct tuple *value, struct key_def *key_def,
const char *extracted = NULL;
switch (type) {
case IPROTO_DELETE:
- /* extract key */
- extracted = tuple_extract_key(value, key_def, &size);
+ extracted = vy_stmt_is_key(value) ?
+ tuple_data_range(value, &size) :
+ tuple_extract_key(value, key_def, &size);
if (extracted == NULL)
return -1;
request.key = extracted;
@@ -829,9 +720,9 @@ vy_stmt_encode_secondary(const struct tuple *value, struct key_def *cmp_def,
}
struct tuple *
-vy_stmt_decode(struct xrow_header *xrow, const struct key_def *key_def,
- struct tuple_format *format, bool is_primary)
+vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format)
{
+ struct vy_stmt_env *env = format->engine;
struct request request;
uint64_t key_map = dml_request_key_map(xrow->type);
key_map &= ~(1ULL << IPROTO_SPACE_ID); /* space_id is optional */
@@ -841,14 +732,10 @@ vy_stmt_decode(struct xrow_header *xrow, const struct key_def *key_def,
struct iovec ops;
switch (request.type) {
case IPROTO_DELETE:
- if (is_primary)
- stmt = vy_stmt_new_surrogate_from_key(request.key,
- IPROTO_DELETE,
- key_def, format);
- else
- stmt = vy_stmt_new_with_ops(format, request.key,
- request.key_end,
- NULL, 0, IPROTO_DELETE);
+ /* Always use key format for DELETE statements. */
+ stmt = vy_stmt_new_with_ops(env->key_format,
+ request.key, request.key_end,
+ NULL, 0, IPROTO_DELETE);
break;
case IPROTO_INSERT:
case IPROTO_REPLACE:
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 7b93883d..187aa8dd 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -419,27 +419,6 @@ char *
vy_key_dup(const char *key);
/**
- * Create a new surrogate DELETE from @a key using format.
- *
- * Example:
- * key: {a3, a5}
- * key_def: { 3, 5 }
- * result: {nil, nil, a3, nil, a5}
- *
- * @param key MessagePack array with key fields.
- * @param cmp_def Key definition of the result statement (incudes
- * primary key parts).
- * @param format Target tuple format.
- *
- * @retval not NULL Success.
- * @retval NULL Memory or format error.
- */
-struct tuple *
-vy_stmt_new_surrogate_delete_from_key(const char *key,
- struct key_def *cmp_def,
- struct tuple_format *format);
-
-/**
* Create a new surrogate DELETE from @a tuple using @a format.
* A surrogate tuple has format->field_count fields from the source
* with all unindexed fields replaced with MessagePack NIL.
@@ -497,6 +476,20 @@ struct tuple *
vy_stmt_new_insert(struct tuple_format *format, const char *tuple_begin,
const char *tuple_end);
+/**
+ * Create a DELETE statement from raw MessagePack data.
+ * @param format Tuple format used to generate field offsets.
+ * @param tuple_begin MessagePack data that contains an array of fields WITH the
+ * array header.
+ * @param tuple_end End of the array that begins at @a tuple_begin.
+ *
+ * @retval NULL Memory allocation error.
+ * @retval not NULL Success.
+ */
+struct tuple *
+vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
+ const char *tuple_end);
+
/**
* Create the UPSERT statement from raw MessagePack data.
* @param tuple_begin MessagePack data that contain an array of fields WITH the
@@ -649,8 +642,7 @@ vy_stmt_encode_secondary(const struct tuple *value, struct key_def *cmp_def,
* @retval NULL on error
*/
struct tuple *
-vy_stmt_decode(struct xrow_header *xrow, const struct key_def *key_def,
- struct tuple_format *format, bool is_primary);
+vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format);
/**
* Format a statement into string.
diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c
index 36e43bc9..ee2cb7ae 100644
--- a/src/box/vy_write_iterator.c
+++ b/src/box/vy_write_iterator.c
@@ -476,7 +476,7 @@ vy_write_iterator_new_slice(struct vy_stmt_stream *vstream,
if (src == NULL)
return -1;
vy_slice_stream_open(&src->slice_stream, slice, stream->cmp_def,
- disk_format, stream->is_primary);
+ disk_format);
return 0;
}
diff --git a/test/vinyl/stat.result b/test/vinyl/stat.result
index 01da5f14..08da1658 100644
--- a/test/vinyl/stat.result
+++ b/test/vinyl/stat.result
@@ -1391,7 +1391,7 @@ st2 = i2:stat()
...
s:bsize()
---
-- 107449
+- 107199
...
i1:len(), i2:len()
---
--
2.11.0
More information about the Tarantool-patches
mailing list