From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 12/12] vinyl: do not fill secondary tuples with nulls when decoded Date: Thu, 21 Feb 2019 13:26:12 +0300 Message-Id: <31d507dd7de3de8b2cee577b48723051b7fce406.1550744027.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: In contrast to a primary index, which stores full tuples, secondary indexes only store extended (secondary + primary) keys on disk. To make them look like tuples, we fill missing fields with nulls (aka tuple surrogate). This isn't going to work nicely with multikey indexes though: how would you make a surrogate array from a key? We could special-case multikey index handling, but that would look cumbersome. So this patch removes nulls from secondary tuples restored from disk altogether. To achieve that, it's enough to use key_format for them - then the comparators will detect that it's actually a key, not a tuple and use the appropriate primitive. --- src/box/key_def.c | 36 +++++++++++++++++++++ src/box/key_def.h | 24 ++++++++++++++ src/box/tuple_bloom.c | 64 +++++++++++++++++++++++++++++-------- src/box/tuple_bloom.h | 15 +++++++++ src/box/tuple_compare.cc | 24 ++++++++++++++ src/box/vinyl.c | 39 ++++++++++++++++------- src/box/vy_lsm.c | 42 +++++++++++++----------- src/box/vy_lsm.h | 13 +++++--- src/box/vy_run.c | 78 ++++++++++++++++++++++++++++++--------------- src/box/vy_scheduler.c | 2 +- src/box/vy_stmt.c | 40 +++++++++++++---------- test/unit/vy_point_lookup.c | 2 +- 12 files changed, 285 insertions(+), 94 deletions(-) diff --git a/src/box/key_def.c b/src/box/key_def.c index 92b2586d..9a12322f 100644 --- a/src/box/key_def.c +++ b/src/box/key_def.c @@ -690,6 +690,42 @@ key_def_merge(const struct key_def *first, const struct key_def *second) return new_def; } +struct key_def * +key_def_extract(const struct key_def *cmp_def, const struct key_def *pk_def, + struct region *region) +{ + struct key_def *extracted_def = NULL; + size_t region_svp = region_used(region); + + /* First, dump primary key parts as is. */ + struct key_part_def *parts = region_alloc(region, + pk_def->part_count * sizeof(*parts)); + if (parts == NULL) { + diag_set(OutOfMemory, pk_def->part_count * sizeof(*parts), + "region", "key def parts"); + goto out; + } + if (key_def_dump_parts(pk_def, parts, region) != 0) + goto out; + /* + * Second, update field numbers to match the primary key + * parts in a secondary key. + */ + for (uint32_t i = 0; i < pk_def->part_count; i++) { + const struct key_part *part = key_def_find(cmp_def, + &pk_def->parts[i]); + assert(part != NULL); + parts[i].fieldno = part - cmp_def->parts; + parts[i].path = NULL; + } + + /* Finally, allocate the new key definition. */ + extracted_def = key_def_new(parts, pk_def->part_count); +out: + region_truncate(region, region_svp); + return extracted_def; +} + int key_validate_parts(const struct key_def *key_def, const char *key, uint32_t part_count, bool allow_nullable) diff --git a/src/box/key_def.h b/src/box/key_def.h index dd62f6a3..62f4cb21 100644 --- a/src/box/key_def.h +++ b/src/box/key_def.h @@ -375,6 +375,20 @@ key_def_contains(const struct key_def *first, const struct key_def *second); struct key_def * key_def_merge(const struct key_def *first, const struct key_def *second); +/** + * Create a key definition suitable for extracting primary key + * parts from an extended secondary key. + * @param cmp_def Extended secondary key definition + * (must include primary key parts). + * @param pk_def Primary key definition. + * @param region Region used for temporary allocations. + * @retval not NULL Pointer to the extracted key definition. + * @retval NULL Memory allocation error. + */ +struct key_def * +key_def_extract(const struct key_def *cmp_def, const struct key_def *pk_def, + struct region *region); + /* * Check that parts of the key match with the key definition. * @param key_def Key definition. @@ -523,6 +537,16 @@ tuple_common_key_parts(const struct tuple *tuple_a, const struct tuple *tuple_b, struct key_def *key_def); /** + * Return the length of the longest common prefix of two keys. + * @param key_a first key + * @param key_b second key + * @param key_def key defintion + * @return number of key parts the two keys have in common + */ +uint32_t +key_common_parts(const char *key_a, const char *key_b, struct key_def *key_def); + +/** * Compare keys using the key definition. * @param key_a key parts with MessagePack array header * @param part_count_a the number of parts in the key_a diff --git a/src/box/tuple_bloom.c b/src/box/tuple_bloom.c index cf887c7b..60a7a8fd 100644 --- a/src/box/tuple_bloom.c +++ b/src/box/tuple_bloom.c @@ -70,6 +70,25 @@ tuple_bloom_builder_delete(struct tuple_bloom_builder *builder) free(builder); } +static int +tuple_hash_array_add(struct tuple_hash_array *hash_arr, uint32_t hash) +{ + if (hash_arr->count >= hash_arr->capacity) { + uint32_t capacity = MAX(hash_arr->capacity * 2, 1024U); + uint32_t *values = realloc(hash_arr->values, + capacity * sizeof(*values)); + if (values == NULL) { + diag_set(OutOfMemory, capacity * sizeof(*values), + "malloc", "tuple hash array"); + return -1; + } + hash_arr->capacity = capacity; + hash_arr->values = values; + } + hash_arr->values[hash_arr->count++] = hash; + return 0; +} + int tuple_bloom_builder_add(struct tuple_bloom_builder *builder, const struct tuple *tuple, struct key_def *key_def, @@ -92,21 +111,38 @@ tuple_bloom_builder_add(struct tuple_bloom_builder *builder, */ continue; } - struct tuple_hash_array *hash_arr = &builder->parts[i]; - if (hash_arr->count >= hash_arr->capacity) { - uint32_t capacity = MAX(hash_arr->capacity * 2, 1024U); - uint32_t *values = realloc(hash_arr->values, - capacity * sizeof(*values)); - if (values == NULL) { - diag_set(OutOfMemory, capacity * sizeof(*values), - "malloc", "tuple hash array"); - return -1; - } - hash_arr->capacity = capacity; - hash_arr->values = values; + tuple_hash_array_add(&builder->parts[i], + PMurHash32_Result(h, carry, total_size)); + } + return 0; +} + +int +tuple_bloom_builder_add_key(struct tuple_bloom_builder *builder, + const char *key, uint32_t part_count, + struct key_def *key_def, uint32_t hashed_parts) +{ + (void)part_count; + assert(part_count >= key_def->part_count); + assert(builder->part_count == key_def->part_count); + + uint32_t h = HASH_SEED; + uint32_t carry = 0; + uint32_t total_size = 0; + + for (uint32_t i = 0; i < key_def->part_count; i++) { + total_size += tuple_hash_field(&h, &carry, &key, + key_def->parts[i].coll); + if (i < hashed_parts) { + /* + * This part is already in the bloom, proceed + * to the next one. Note, we can't skip to + * hashed_parts, as we need to compute the hash. + */ + continue; } - uint32_t hash = PMurHash32_Result(h, carry, total_size); - hash_arr->values[hash_arr->count++] = hash; + tuple_hash_array_add(&builder->parts[i], + PMurHash32_Result(h, carry, total_size)); } return 0; } diff --git a/src/box/tuple_bloom.h b/src/box/tuple_bloom.h index b05fee18..c45bc30d 100644 --- a/src/box/tuple_bloom.h +++ b/src/box/tuple_bloom.h @@ -121,6 +121,21 @@ tuple_bloom_builder_add(struct tuple_bloom_builder *builder, uint32_t hashed_parts); /** + * Add a key hash to a tuple bloom filter builder. + * @param builder - bloom filter builder + * @param key - key to add + * @param part_count - number of parts in the key + * @param key_def - key definition + * @param hashed_parts - number of key parts that have already + * been added to the builder + * @return 0 on success, -1 on OOM + */ +int +tuple_bloom_builder_add_key(struct tuple_bloom_builder *builder, + const char *key, uint32_t part_count, + struct key_def *key_def, uint32_t hashed_parts); + +/** * Create a new tuple bloom filter. * @param builder - bloom filter builder * @param fpr - desired false positive rate diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc index b8dc350f..b617d6d8 100644 --- a/src/box/tuple_compare.cc +++ b/src/box/tuple_compare.cc @@ -458,6 +458,30 @@ tuple_common_key_parts(const struct tuple *tuple_a, const struct tuple *tuple_b, return i; } +uint32_t +key_common_parts(const char *key_a, const char *key_b, struct key_def *key_def) +{ + uint32_t i; + uint32_t part_count_a = mp_decode_array(&key_a); + uint32_t part_count_b = mp_decode_array(&key_b); + uint32_t part_count = MIN(part_count_a, part_count_b); + part_count = MIN(part_count, key_def->part_count); + for (i = 0; i < part_count; i++) { + struct key_part *part = &key_def->parts[i]; + enum mp_type a_type = mp_typeof(*key_a); + enum mp_type b_type = mp_typeof(*key_b); + if (a_type == MP_NIL && b_type == MP_NIL) + continue; + if (a_type == MP_NIL || b_type == MP_NIL || + tuple_compare_field_with_hint(key_a, a_type, + key_b, b_type, part->type, part->coll) != 0) + break; + mp_next(&key_a); + mp_next(&key_b); + } + return i; +} + template static inline int tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b, diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 925784b0..ce0368ab 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -720,10 +720,9 @@ vinyl_space_create_index(struct space *space, struct index_def *index_def) pk = vy_lsm(space_index(space, 0)); assert(pk != NULL); } - struct vy_lsm *lsm = vy_lsm_new(&env->lsm_env, &env->stmt_env, - &env->cache_env, &env->mem_env, - index_def, space->format, pk, - space_group_id(space)); + struct vy_lsm *lsm = vy_lsm_new(&env->lsm_env, &env->cache_env, + &env->mem_env, index_def, space->format, + pk, space_group_id(space)); if (lsm == NULL) { free(index); return NULL; @@ -1302,13 +1301,27 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx, const struct vy_read_view **rv, struct tuple *tuple, struct tuple **result) { + int rc = 0; assert(lsm->index_id > 0); - if (vy_point_lookup(lsm->pk, tx, rv, tuple, result) != 0) - return -1; + struct tuple *key; + if (vy_stmt_is_key(tuple)) { + key = vy_stmt_extract_key(tuple, lsm->pk_extractor, + lsm->env->key_format); + if (key == NULL) + return -1; + } else { + key = tuple; + tuple_ref(key); + } + + if (vy_point_lookup(lsm->pk, tx, rv, key, result) != 0) { + rc = -1; + goto out; + } if (*result == NULL || - vy_stmt_compare(*result, tuple, lsm->key_def) != 0) { + vy_stmt_compare(*result, tuple, lsm->cmp_def) != 0) { /* * If a tuple read from a secondary index doesn't * match the tuple corresponding to it in the @@ -1328,7 +1341,7 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx, * the tuple cache implementation. */ vy_cache_on_write(&lsm->cache, tuple, NULL); - return 0; + goto out; } /* @@ -1341,13 +1354,15 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx, */ if (tx != NULL && vy_tx_track_point(tx, lsm->pk, *result) != 0) { tuple_unref(*result); - return -1; + rc = -1; + goto out; } if ((*rv)->vlsn == INT64_MAX) - vy_cache_add(&lsm->pk->cache, *result, NULL, tuple, ITER_EQ); - - return 0; + vy_cache_add(&lsm->pk->cache, *result, NULL, key, ITER_EQ); +out: + tuple_unref(key); + return rc; } /** diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c index 2bb56d87..43f8dba8 100644 --- a/src/box/vy_lsm.c +++ b/src/box/vy_lsm.c @@ -118,10 +118,9 @@ vy_lsm_mem_tree_size(struct vy_lsm *lsm) } struct vy_lsm * -vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env, - struct vy_cache_env *cache_env, struct vy_mem_env *mem_env, - struct index_def *index_def, struct tuple_format *format, - struct vy_lsm *pk, uint32_t group_id) +vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_cache_env *cache_env, + struct vy_mem_env *mem_env, struct index_def *index_def, + struct tuple_format *format, struct vy_lsm *pk, uint32_t group_id) { static int64_t run_buckets[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100, @@ -138,16 +137,14 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env, } lsm->env = lsm_env; - struct key_def *key_def = key_def_dup(index_def->key_def); - if (key_def == NULL) + lsm->key_def = key_def_dup(index_def->key_def); + if (lsm->key_def == NULL) goto fail_key_def; - struct key_def *cmp_def = key_def_dup(index_def->cmp_def); - if (cmp_def == NULL) + lsm->cmp_def = key_def_dup(index_def->cmp_def); + if (lsm->cmp_def == NULL) goto fail_cmp_def; - lsm->cmp_def = cmp_def; - lsm->key_def = key_def; if (index_def->iid == 0) { /* * Disk tuples can be returned to an user from a @@ -156,10 +153,12 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env, */ lsm->disk_format = format; } else { - lsm->disk_format = vy_stmt_format_new(stmt_env, &cmp_def, 1, - NULL, 0, 0, NULL); - if (lsm->disk_format == NULL) - goto fail_format; + lsm->disk_format = lsm_env->key_format; + + lsm->pk_extractor = key_def_extract(lsm->cmp_def, pk->key_def, + &fiber()->gc); + if (lsm->pk_extractor == NULL) + goto fail_pk_extractor; } tuple_format_ref(lsm->disk_format); @@ -170,7 +169,7 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env, if (lsm->run_hist == NULL) goto fail_run_hist; - lsm->mem = vy_mem_new(mem_env, cmp_def, format, + lsm->mem = vy_mem_new(mem_env, lsm->cmp_def, format, *lsm->env->p_generation, space_cache_version); if (lsm->mem == NULL) @@ -180,7 +179,8 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env, lsm->refs = 1; lsm->dump_lsn = -1; lsm->commit_lsn = -1; - vy_cache_create(&lsm->cache, cache_env, cmp_def, index_def->iid == 0); + vy_cache_create(&lsm->cache, cache_env, lsm->cmp_def, + index_def->iid == 0); rlist_create(&lsm->sealed); vy_range_tree_new(&lsm->range_tree); vy_range_heap_create(&lsm->range_heap); @@ -208,10 +208,12 @@ fail_run_hist: vy_lsm_stat_destroy(&lsm->stat); fail_stat: tuple_format_unref(lsm->disk_format); -fail_format: - key_def_delete(cmp_def); + if (lsm->pk_extractor != NULL) + key_def_delete(lsm->pk_extractor); +fail_pk_extractor: + key_def_delete(lsm->cmp_def); fail_cmp_def: - key_def_delete(key_def); + key_def_delete(lsm->key_def); fail_key_def: free(lsm); fail: @@ -262,6 +264,8 @@ vy_lsm_delete(struct vy_lsm *lsm) tuple_format_unref(lsm->disk_format); key_def_delete(lsm->cmp_def); key_def_delete(lsm->key_def); + if (lsm->pk_extractor != NULL) + key_def_delete(lsm->pk_extractor); histogram_delete(lsm->run_hist); vy_lsm_stat_destroy(&lsm->stat); vy_cache_destroy(&lsm->cache); diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h index b94e7a43..981893ec 100644 --- a/src/box/vy_lsm.h +++ b/src/box/vy_lsm.h @@ -200,6 +200,12 @@ struct vy_lsm { /** Key definition passed by the user. */ struct key_def *key_def; /** + * Key definition to extract primary key parts from + * a secondary key. NULL if this LSM tree corresponds + * to a primary index. + */ + struct key_def *pk_extractor; + /** * If the following flag is set, the index this LSM tree * is created for is unique and it must be checked for * duplicates on INSERT. Otherwise, the check can be skipped, @@ -333,10 +339,9 @@ vy_lsm_mem_tree_size(struct vy_lsm *lsm); /** Allocate a new LSM tree object. */ struct vy_lsm * -vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env, - struct vy_cache_env *cache_env, struct vy_mem_env *mem_env, - struct index_def *index_def, struct tuple_format *format, - struct vy_lsm *pk, uint32_t group_id); +vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_cache_env *cache_env, + struct vy_mem_env *mem_env, struct index_def *index_def, + struct tuple_format *format, struct vy_lsm *pk, uint32_t group_id); /** Free an LSM tree object. */ void diff --git a/src/box/vy_run.c b/src/box/vy_run.c index e3470c09..01257144 100644 --- a/src/box/vy_run.c +++ b/src/box/vy_run.c @@ -1412,8 +1412,7 @@ vy_run_iterator_open(struct vy_run_iterator *itr, struct vy_slice *slice, enum iterator_type iterator_type, const struct tuple *key, const struct vy_read_view **rv, struct key_def *cmp_def, struct key_def *key_def, - struct tuple_format *format, - bool is_primary) + struct tuple_format *format, bool is_primary) { itr->stat = stat; itr->cmp_def = cmp_def; @@ -2149,7 +2148,8 @@ vy_run_writer_start_page(struct vy_run_writer *writer, if (run->info.page_count >= writer->page_info_capacity && vy_run_alloc_page_info(run, &writer->page_info_capacity) != 0) return -1; - const char *key = tuple_extract_key(first_stmt, writer->cmp_def, NULL); + const char *key = vy_stmt_is_key(first_stmt) ? tuple_data(first_stmt) : + tuple_extract_key(first_stmt, writer->cmp_def, NULL); if (key == NULL) return -1; if (run->info.page_count == 0) { @@ -2165,6 +2165,27 @@ vy_run_writer_start_page(struct vy_run_writer *writer, return 0; } +static int +vy_tuple_bloom_add(struct tuple_bloom_builder *bloom, struct key_def *key_def, + struct tuple *stmt, struct tuple *last_stmt) +{ + if (vy_stmt_is_key(stmt)) { + assert(last_stmt == NULL || vy_stmt_is_key(last_stmt)); + const char *key = tuple_data(stmt); + uint32_t hashed_parts = last_stmt == NULL ? 0 : + key_common_parts(key, tuple_data(last_stmt), key_def); + uint32_t part_count = mp_decode_array(&key); + return tuple_bloom_builder_add_key(bloom, key, part_count, + key_def, hashed_parts); + } else { + assert(last_stmt == NULL || !vy_stmt_is_key(last_stmt)); + uint32_t hashed_parts = last_stmt == NULL ? 0 : + tuple_common_key_parts(stmt, last_stmt, key_def); + return tuple_bloom_builder_add(bloom, stmt, key_def, + hashed_parts); + } +} + /** * Write @a stmt into a current page. * @param writer Run writer. @@ -2176,13 +2197,10 @@ vy_run_writer_start_page(struct vy_run_writer *writer, static int vy_run_writer_write_to_page(struct vy_run_writer *writer, struct tuple *stmt) { - if (writer->bloom != NULL) { - uint32_t hashed_parts = writer->last_stmt == NULL ? 0 : - tuple_common_key_parts(stmt, writer->last_stmt, - writer->key_def); - tuple_bloom_builder_add(writer->bloom, stmt, - writer->key_def, hashed_parts); - } + if (writer->bloom != NULL && + vy_tuple_bloom_add(writer->bloom, writer->key_def, stmt, + writer->last_stmt) != 0) + return -1; if (writer->last_stmt != NULL) vy_stmt_unref_if_possible(writer->last_stmt); writer->last_stmt = stmt; @@ -2302,7 +2320,9 @@ vy_run_writer_commit(struct vy_run_writer *writer) } assert(writer->last_stmt != NULL); - const char *key = tuple_extract_key(writer->last_stmt, + const char *key = vy_stmt_is_key(writer->last_stmt) ? + tuple_data(writer->last_stmt) : + tuple_extract_key(writer->last_stmt, writer->cmp_def, NULL); if (key == NULL) goto out; @@ -2370,6 +2390,7 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir, uint32_t page_info_capacity = 0; const char *key = NULL; + char *page_min_key = NULL; int64_t max_lsn = 0; int64_t min_lsn = INT64_MAX; struct tuple *prev_tuple = NULL; @@ -2390,7 +2411,6 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir, if (run->info.page_count == page_info_capacity && vy_run_alloc_page_info(run, &page_info_capacity) != 0) goto close_err; - const char *page_min_key = NULL; uint32_t page_row_count = 0; uint64_t page_row_index_offset = 0; uint64_t row_offset = xlog_cursor_tx_pos(&cursor); @@ -2407,14 +2427,14 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir, format, iid == 0); if (tuple == NULL) goto close_err; - if (bloom_builder != NULL) { - uint32_t hashed_parts = prev_tuple == NULL ? 0 : - tuple_common_key_parts(prev_tuple, - tuple, key_def); - tuple_bloom_builder_add(bloom_builder, tuple, - key_def, hashed_parts); + if (bloom_builder != NULL && + vy_tuple_bloom_add(bloom_builder, key_def, + tuple, prev_tuple) != 0) { + tuple_unref(tuple); + goto close_err; } - key = tuple_extract_key(tuple, cmp_def, NULL); + key = vy_stmt_is_key(tuple) ? tuple_data(tuple) : + tuple_extract_key(tuple, cmp_def, NULL); if (prev_tuple != NULL) tuple_unref(prev_tuple); prev_tuple = tuple; @@ -2425,8 +2445,11 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir, if (run->info.min_key == NULL) goto close_err; } - if (page_min_key == NULL) - page_min_key = key; + if (page_min_key == NULL) { + page_min_key = key_dup(key); + if (page_min_key == NULL) + goto close_err; + } if (xrow.lsn > max_lsn) max_lsn = xrow.lsn; if (xrow.lsn < min_lsn) @@ -2443,11 +2466,8 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir, info->row_index_offset = page_row_index_offset; ++run->info.page_count; vy_run_acct_page(run, info); - } - - if (prev_tuple != NULL) { - tuple_unref(prev_tuple); - prev_tuple = NULL; + free(page_min_key); + page_min_key = NULL; } if (key != NULL) { @@ -2455,6 +2475,10 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir, if (run->info.max_key == NULL) goto close_err; } + if (prev_tuple != NULL) { + tuple_unref(prev_tuple); + prev_tuple = NULL; + } run->info.max_lsn = max_lsn; run->info.min_lsn = min_lsn; @@ -2487,6 +2511,8 @@ close_err: region_truncate(region, mem_used); if (prev_tuple != NULL) tuple_unref(prev_tuple); + if (page_min_key != NULL) + free(page_min_key); if (bloom_builder != NULL) tuple_bloom_builder_delete(bloom_builder); if (xlog_cursor_is_open(&cursor)) diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index ad28712b..48e739af 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -1407,7 +1407,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_worker *worker, */ struct vy_stmt_stream *wi; bool is_last_level = (lsm->run_count == 0); - wi = vy_write_iterator_new(task->cmp_def, lsm->disk_format, + wi = vy_write_iterator_new(task->cmp_def, lsm->mem_format, lsm->index_id == 0, is_last_level, scheduler->read_views, NULL); if (wi == NULL) diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c index 5a76bfe4..29accf37 100644 --- a/src/box/vy_stmt.c +++ b/src/box/vy_stmt.c @@ -699,6 +699,8 @@ int vy_stmt_encode_primary(const struct tuple *value, struct key_def *key_def, uint32_t space_id, struct xrow_header *xrow) { + assert(!vy_stmt_is_key(value)); + memset(xrow, 0, sizeof(*xrow)); enum iproto_type type = vy_stmt_type(value); xrow->type = type; @@ -755,9 +757,14 @@ vy_stmt_encode_secondary(const struct tuple *value, struct key_def *cmp_def, memset(&request, 0, sizeof(request)); request.type = type; uint32_t size; - const char *extracted = tuple_extract_key(value, cmp_def, &size); - if (extracted == NULL) - return -1; + const char *extracted; + if (!vy_stmt_is_key(value)) { + extracted = tuple_extract_key(value, cmp_def, &size); + if (extracted == NULL) + return -1; + } else { + extracted = tuple_data_range(value, &size); + } if (type == IPROTO_REPLACE || type == IPROTO_INSERT) { request.tuple = extracted; request.tuple_end = extracted + size; @@ -791,21 +798,20 @@ vy_stmt_decode(struct xrow_header *xrow, const struct key_def *key_def, switch (request.type) { case IPROTO_DELETE: /* extract key */ - stmt = vy_stmt_new_surrogate_from_key(request.key, - IPROTO_DELETE, - key_def, format); - break; - case IPROTO_INSERT: - case IPROTO_REPLACE: - if (is_primary) { - stmt = vy_stmt_new_with_ops(format, request.tuple, - request.tuple_end, - NULL, 0, request.type); - } else { - stmt = vy_stmt_new_surrogate_from_key(request.tuple, - request.type, + if (is_primary) + stmt = vy_stmt_new_surrogate_from_key(request.key, + IPROTO_DELETE, key_def, format); - } + else + stmt = vy_stmt_new_with_ops(format, request.key, + request.key_end, + NULL, 0, IPROTO_DELETE); + break; + case IPROTO_INSERT: + case IPROTO_REPLACE: + stmt = vy_stmt_new_with_ops(format, request.tuple, + request.tuple_end, + NULL, 0, request.type); break; case IPROTO_UPSERT: ops.iov_base = (char *)request.ops; diff --git a/test/unit/vy_point_lookup.c b/test/unit/vy_point_lookup.c index 5f20d09b..6268d2d2 100644 --- a/test/unit/vy_point_lookup.c +++ b/test/unit/vy_point_lookup.c @@ -94,7 +94,7 @@ test_basic() index_def_new(512, 0, "primary", sizeof("primary") - 1, TREE, &index_opts, key_def, NULL); - struct vy_lsm *pk = vy_lsm_new(&lsm_env, &stmt_env, &cache_env, &mem_env, + struct vy_lsm *pk = vy_lsm_new(&lsm_env, &cache_env, &mem_env, index_def, format, NULL, 0); isnt(pk, NULL, "lsm is not NULL") -- 2.11.0