[PATCH 12/12] vinyl: do not fill secondary tuples with nulls when decoded

Vladimir Davydov vdavydov.dev at gmail.com
Thu Feb 21 13:26:12 MSK 2019


In contrast to a primary index, which stores full tuples, secondary
indexes only store extended (secondary + primary) keys on disk. To make
them look like tuples, we fill missing fields with nulls (aka tuple
surrogate). This isn't going to work nicely with multikey indexes
though: how would you make a surrogate array from a key? We could
special-case multikey index handling, but that would look cumbersome.
So this patch removes nulls from secondary tuples restored from disk
altogether. To achieve that, it's enough to use key_format for them -
then the comparators will detect that it's actually a key, not a tuple
and use the appropriate primitive.
---
 src/box/key_def.c           | 36 +++++++++++++++++++++
 src/box/key_def.h           | 24 ++++++++++++++
 src/box/tuple_bloom.c       | 64 +++++++++++++++++++++++++++++--------
 src/box/tuple_bloom.h       | 15 +++++++++
 src/box/tuple_compare.cc    | 24 ++++++++++++++
 src/box/vinyl.c             | 39 ++++++++++++++++-------
 src/box/vy_lsm.c            | 42 +++++++++++++-----------
 src/box/vy_lsm.h            | 13 +++++---
 src/box/vy_run.c            | 78 ++++++++++++++++++++++++++++++---------------
 src/box/vy_scheduler.c      |  2 +-
 src/box/vy_stmt.c           | 40 +++++++++++++----------
 test/unit/vy_point_lookup.c |  2 +-
 12 files changed, 285 insertions(+), 94 deletions(-)

diff --git a/src/box/key_def.c b/src/box/key_def.c
index 92b2586d..9a12322f 100644
--- a/src/box/key_def.c
+++ b/src/box/key_def.c
@@ -690,6 +690,42 @@ key_def_merge(const struct key_def *first, const struct key_def *second)
 	return new_def;
 }
 
+struct key_def *
+key_def_extract(const struct key_def *cmp_def, const struct key_def *pk_def,
+		struct region *region)
+{
+	struct key_def *extracted_def = NULL;
+	size_t region_svp = region_used(region); /* restored at 'out' */
+
+	/* First, dump primary key parts as is into a region array. */
+	struct key_part_def *parts = region_alloc(region,
+			pk_def->part_count * sizeof(*parts));
+	if (parts == NULL) {
+		diag_set(OutOfMemory, pk_def->part_count * sizeof(*parts),
+			 "region", "key def parts");
+		goto out;
+	}
+	if (key_def_dump_parts(pk_def, parts, region) != 0)
+		goto out;
+	/*
+	 * Second, renumber the parts: fieldno becomes the index of
+	 * the matching part in cmp_def, and the part path is cleared.
+	 */
+	for (uint32_t i = 0; i < pk_def->part_count; i++) {
+		const struct key_part *part = key_def_find(cmp_def,
+							   &pk_def->parts[i]);
+		assert(part != NULL); /* cmp_def must include every pk part */
+		parts[i].fieldno = part - cmp_def->parts;
+		parts[i].path = NULL;
+	}
+
+	/* Finally, allocate the new key definition; returned as is. */
+	extracted_def = key_def_new(parts, pk_def->part_count);
+out:
+	region_truncate(region, region_svp); /* drop temporary allocations */
+	return extracted_def;
+}
+
 int
 key_validate_parts(const struct key_def *key_def, const char *key,
 		   uint32_t part_count, bool allow_nullable)
diff --git a/src/box/key_def.h b/src/box/key_def.h
index dd62f6a3..62f4cb21 100644
--- a/src/box/key_def.h
+++ b/src/box/key_def.h
@@ -375,6 +375,20 @@ key_def_contains(const struct key_def *first, const struct key_def *second);
 struct key_def *
 key_def_merge(const struct key_def *first, const struct key_def *second);
 
+/**
+ * Create a key definition suitable for extracting primary key
+ * parts from an extended secondary key.
+ * @param cmp_def   Extended secondary key definition
+ *                  (must include primary key parts).
+ * @param pk_def    Primary key definition.
+ * @param region    Region used for temporary allocations.
+ * @retval not NULL Pointer to the extracted key definition.
+ * @retval NULL     Memory allocation error.
+ */
+struct key_def *
+key_def_extract(const struct key_def *cmp_def, const struct key_def *pk_def,
+		struct region *region);
+
 /*
  * Check that parts of the key match with the key definition.
  * @param key_def Key definition.
@@ -523,6 +537,16 @@ tuple_common_key_parts(const struct tuple *tuple_a, const struct tuple *tuple_b,
 		       struct key_def *key_def);
 
 /**
+ * Return the length of the longest common prefix of two keys.
+ * @param key_a first key
+ * @param key_b second key
+ * @param key_def key definition
+ * @return number of key parts the two keys have in common
+ */
+uint32_t
+key_common_parts(const char *key_a, const char *key_b, struct key_def *key_def);
+
+/**
  * Compare keys using the key definition.
  * @param key_a key parts with MessagePack array header
  * @param part_count_a the number of parts in the key_a
diff --git a/src/box/tuple_bloom.c b/src/box/tuple_bloom.c
index cf887c7b..60a7a8fd 100644
--- a/src/box/tuple_bloom.c
+++ b/src/box/tuple_bloom.c
@@ -70,6 +70,25 @@ tuple_bloom_builder_delete(struct tuple_bloom_builder *builder)
 	free(builder);
 }
 
+static int
+tuple_hash_array_add(struct tuple_hash_array *hash_arr, uint32_t hash)
+{
+	if (hash_arr->count >= hash_arr->capacity) { /* grow x2, min 1024 */
+		uint32_t capacity = MAX(hash_arr->capacity * 2, 1024U);
+		uint32_t *values = realloc(hash_arr->values,
+					   capacity * sizeof(*values));
+		if (values == NULL) {
+			diag_set(OutOfMemory, capacity * sizeof(*values),
+				 "malloc", "tuple hash array");
+			return -1; /* hash_arr is left unmodified */
+		}
+		hash_arr->capacity = capacity;
+		hash_arr->values = values;
+	}
+	hash_arr->values[hash_arr->count++] = hash; /* append the hash */
+	return 0;
+}
+
 int
 tuple_bloom_builder_add(struct tuple_bloom_builder *builder,
 			const struct tuple *tuple, struct key_def *key_def,
@@ -92,21 +111,42 @@ tuple_bloom_builder_add(struct tuple_bloom_builder *builder,
 			 */
 			continue;
 		}
-		struct tuple_hash_array *hash_arr = &builder->parts[i];
-		if (hash_arr->count >= hash_arr->capacity) {
-			uint32_t capacity = MAX(hash_arr->capacity * 2, 1024U);
-			uint32_t *values = realloc(hash_arr->values,
-						   capacity * sizeof(*values));
-			if (values == NULL) {
-				diag_set(OutOfMemory, capacity * sizeof(*values),
-					 "malloc", "tuple hash array");
-				return -1;
-			}
-			hash_arr->capacity = capacity;
-			hash_arr->values = values;
+		/* Propagate OOM instead of silently dropping the hash. */
+		if (tuple_hash_array_add(&builder->parts[i],
+				PMurHash32_Result(h, carry, total_size)) != 0)
+			return -1;
+	}
+	return 0;
+}
+
+int
+tuple_bloom_builder_add_key(struct tuple_bloom_builder *builder,
+			    const char *key, uint32_t part_count,
+			    struct key_def *key_def, uint32_t hashed_parts)
+{
+	(void)part_count;
+	assert(part_count >= key_def->part_count);
+	assert(builder->part_count == key_def->part_count);
+
+	uint32_t h = HASH_SEED;
+	uint32_t carry = 0;
+	uint32_t total_size = 0;
+
+	for (uint32_t i = 0; i < key_def->part_count; i++) {
+		total_size += tuple_hash_field(&h, &carry, &key,
+					       key_def->parts[i].coll);
+		if (i < hashed_parts) {
+			/*
+			 * This part is already in the bloom, proceed
+			 * to the next one. Note, we can't skip to
+			 * hashed_parts, as we need to compute the hash.
+			 */
+			continue;
 		}
-		uint32_t hash = PMurHash32_Result(h, carry, total_size);
-		hash_arr->values[hash_arr->count++] = hash;
+		/* Propagate OOM instead of silently dropping the hash. */
+		if (tuple_hash_array_add(&builder->parts[i],
+				PMurHash32_Result(h, carry, total_size)) != 0)
+			return -1;
 	}
 	return 0;
 }
diff --git a/src/box/tuple_bloom.h b/src/box/tuple_bloom.h
index b05fee18..c45bc30d 100644
--- a/src/box/tuple_bloom.h
+++ b/src/box/tuple_bloom.h
@@ -121,6 +121,21 @@ tuple_bloom_builder_add(struct tuple_bloom_builder *builder,
 			uint32_t hashed_parts);
 
 /**
+ * Add a key hash to a tuple bloom filter builder.
+ * @param builder - bloom filter builder
+ * @param key - key to add
+ * @param part_count - number of parts in the key
+ * @param key_def - key definition
+ * @param hashed_parts - number of key parts that have already
+ *  been added to the builder
+ * @return 0 on success, -1 on OOM
+ */
+int
+tuple_bloom_builder_add_key(struct tuple_bloom_builder *builder,
+			    const char *key, uint32_t part_count,
+			    struct key_def *key_def, uint32_t hashed_parts);
+
+/**
  * Create a new tuple bloom filter.
  * @param builder - bloom filter builder
  * @param fpr - desired false positive rate
diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index b8dc350f..b617d6d8 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -458,6 +458,30 @@ tuple_common_key_parts(const struct tuple *tuple_a, const struct tuple *tuple_b,
 	return i;
 }
 
+uint32_t
+key_common_parts(const char *key_a, const char *key_b, struct key_def *key_def)
+{
+	uint32_t i;
+	uint32_t part_count_a = mp_decode_array(&key_a);
+	uint32_t part_count_b = mp_decode_array(&key_b);
+	uint32_t part_count = MIN(part_count_a, part_count_b);
+	part_count = MIN(part_count, key_def->part_count);
+	for (i = 0; i < part_count; i++) {
+		struct key_part *part = &key_def->parts[i];
+		enum mp_type a_type = mp_typeof(*key_a);
+		enum mp_type b_type = mp_typeof(*key_b);
+		/* Nil equals only nil; every matched part is skipped below. */
+		if ((a_type != MP_NIL || b_type != MP_NIL) &&
+		    (a_type == MP_NIL || b_type == MP_NIL ||
+		     tuple_compare_field_with_hint(key_a, a_type,
+				key_b, b_type, part->type, part->coll) != 0))
+			break;
+		mp_next(&key_a); /* advance even past a nil/nil pair */
+		mp_next(&key_b);
+	}
+	return i;
+}
+
 template<bool is_nullable, bool has_optional_parts, bool has_json_paths>
 static inline int
 tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 925784b0..ce0368ab 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -720,10 +720,9 @@ vinyl_space_create_index(struct space *space, struct index_def *index_def)
 		pk = vy_lsm(space_index(space, 0));
 		assert(pk != NULL);
 	}
-	struct vy_lsm *lsm = vy_lsm_new(&env->lsm_env, &env->stmt_env,
-					&env->cache_env, &env->mem_env,
-					index_def, space->format, pk,
-					space_group_id(space));
+	struct vy_lsm *lsm = vy_lsm_new(&env->lsm_env, &env->cache_env,
+					&env->mem_env, index_def, space->format,
+					pk, space_group_id(space));
 	if (lsm == NULL) {
 		free(index);
 		return NULL;
@@ -1302,13 +1301,27 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
 			  const struct vy_read_view **rv,
 			  struct tuple *tuple, struct tuple **result)
 {
+	int rc = 0;
 	assert(lsm->index_id > 0);
 
-	if (vy_point_lookup(lsm->pk, tx, rv, tuple, result) != 0)
-		return -1;
+	struct tuple *key;
+	if (vy_stmt_is_key(tuple)) {
+		key = vy_stmt_extract_key(tuple, lsm->pk_extractor,
+					  lsm->env->key_format);
+		if (key == NULL)
+			return -1;
+	} else {
+		key = tuple;
+		tuple_ref(key);
+	}
+
+	if (vy_point_lookup(lsm->pk, tx, rv, key, result) != 0) {
+		rc = -1;
+		goto out;
+	}
 
 	if (*result == NULL ||
-	    vy_stmt_compare(*result, tuple, lsm->key_def) != 0) {
+	    vy_stmt_compare(*result, tuple, lsm->cmp_def) != 0) {
 		/*
 		 * If a tuple read from a secondary index doesn't
 		 * match the tuple corresponding to it in the
@@ -1328,7 +1341,7 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
 		 * the tuple cache implementation.
 		 */
 		vy_cache_on_write(&lsm->cache, tuple, NULL);
-		return 0;
+		goto out;
 	}
 
 	/*
@@ -1341,13 +1354,15 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
 	 */
 	if (tx != NULL && vy_tx_track_point(tx, lsm->pk, *result) != 0) {
 		tuple_unref(*result);
-		return -1;
+		rc = -1;
+		goto out;
 	}
 
 	if ((*rv)->vlsn == INT64_MAX)
-		vy_cache_add(&lsm->pk->cache, *result, NULL, tuple, ITER_EQ);
-
-	return 0;
+		vy_cache_add(&lsm->pk->cache, *result, NULL, key, ITER_EQ);
+out:
+	tuple_unref(key);
+	return rc;
 }
 
 /**
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 2bb56d87..43f8dba8 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -118,10 +118,9 @@ vy_lsm_mem_tree_size(struct vy_lsm *lsm)
 }
 
 struct vy_lsm *
-vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env,
-	   struct vy_cache_env *cache_env, struct vy_mem_env *mem_env,
-	   struct index_def *index_def, struct tuple_format *format,
-	   struct vy_lsm *pk, uint32_t group_id)
+vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_cache_env *cache_env,
+	   struct vy_mem_env *mem_env, struct index_def *index_def,
+	   struct tuple_format *format, struct vy_lsm *pk, uint32_t group_id)
 {
 	static int64_t run_buckets[] = {
 		0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100,
@@ -138,16 +137,14 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env,
 	}
 	lsm->env = lsm_env;
 
-	struct key_def *key_def = key_def_dup(index_def->key_def);
-	if (key_def == NULL)
+	lsm->key_def = key_def_dup(index_def->key_def);
+	if (lsm->key_def == NULL)
 		goto fail_key_def;
 
-	struct key_def *cmp_def = key_def_dup(index_def->cmp_def);
-	if (cmp_def == NULL)
+	lsm->cmp_def = key_def_dup(index_def->cmp_def);
+	if (lsm->cmp_def == NULL)
 		goto fail_cmp_def;
 
-	lsm->cmp_def = cmp_def;
-	lsm->key_def = key_def;
 	if (index_def->iid == 0) {
 		/*
 		 * Disk tuples can be returned to an user from a
@@ -156,10 +153,12 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env,
 		 */
 		lsm->disk_format = format;
 	} else {
-		lsm->disk_format = vy_stmt_format_new(stmt_env, &cmp_def, 1,
-						      NULL, 0, 0, NULL);
-		if (lsm->disk_format == NULL)
-			goto fail_format;
+		lsm->disk_format = lsm_env->key_format;
+
+		lsm->pk_extractor = key_def_extract(lsm->cmp_def, pk->key_def,
+						    &fiber()->gc);
+		if (lsm->pk_extractor == NULL)
+			goto fail_pk_extractor;
 	}
 	tuple_format_ref(lsm->disk_format);
 
@@ -170,7 +169,7 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env,
 	if (lsm->run_hist == NULL)
 		goto fail_run_hist;
 
-	lsm->mem = vy_mem_new(mem_env, cmp_def, format,
+	lsm->mem = vy_mem_new(mem_env, lsm->cmp_def, format,
 			      *lsm->env->p_generation,
 			      space_cache_version);
 	if (lsm->mem == NULL)
@@ -180,7 +179,8 @@ vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env,
 	lsm->refs = 1;
 	lsm->dump_lsn = -1;
 	lsm->commit_lsn = -1;
-	vy_cache_create(&lsm->cache, cache_env, cmp_def, index_def->iid == 0);
+	vy_cache_create(&lsm->cache, cache_env, lsm->cmp_def,
+			index_def->iid == 0);
 	rlist_create(&lsm->sealed);
 	vy_range_tree_new(&lsm->range_tree);
 	vy_range_heap_create(&lsm->range_heap);
@@ -208,10 +208,12 @@ fail_run_hist:
 	vy_lsm_stat_destroy(&lsm->stat);
 fail_stat:
 	tuple_format_unref(lsm->disk_format);
-fail_format:
-	key_def_delete(cmp_def);
+	if (lsm->pk_extractor != NULL)
+		key_def_delete(lsm->pk_extractor);
+fail_pk_extractor:
+	key_def_delete(lsm->cmp_def);
 fail_cmp_def:
-	key_def_delete(key_def);
+	key_def_delete(lsm->key_def);
 fail_key_def:
 	free(lsm);
 fail:
@@ -262,6 +264,8 @@ vy_lsm_delete(struct vy_lsm *lsm)
 	tuple_format_unref(lsm->disk_format);
 	key_def_delete(lsm->cmp_def);
 	key_def_delete(lsm->key_def);
+	if (lsm->pk_extractor != NULL)
+		key_def_delete(lsm->pk_extractor);
 	histogram_delete(lsm->run_hist);
 	vy_lsm_stat_destroy(&lsm->stat);
 	vy_cache_destroy(&lsm->cache);
diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h
index b94e7a43..981893ec 100644
--- a/src/box/vy_lsm.h
+++ b/src/box/vy_lsm.h
@@ -200,6 +200,12 @@ struct vy_lsm {
 	/** Key definition passed by the user. */
 	struct key_def *key_def;
 	/**
+	 * Key definition to extract primary key parts from
+	 * a secondary key. NULL if this LSM tree corresponds
+	 * to a primary index.
+	 */
+	struct key_def *pk_extractor;
+	/**
 	 * If the following flag is set, the index this LSM tree
 	 * is created for is unique and it must be checked for
 	 * duplicates on INSERT. Otherwise, the check can be skipped,
@@ -333,10 +339,9 @@ vy_lsm_mem_tree_size(struct vy_lsm *lsm);
 
 /** Allocate a new LSM tree object. */
 struct vy_lsm *
-vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_stmt_env *stmt_env,
-	   struct vy_cache_env *cache_env, struct vy_mem_env *mem_env,
-	   struct index_def *index_def, struct tuple_format *format,
-	   struct vy_lsm *pk, uint32_t group_id);
+vy_lsm_new(struct vy_lsm_env *lsm_env, struct vy_cache_env *cache_env,
+	   struct vy_mem_env *mem_env, struct index_def *index_def,
+	   struct tuple_format *format, struct vy_lsm *pk, uint32_t group_id);
 
 /** Free an LSM tree object. */
 void
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index e3470c09..01257144 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -1412,8 +1412,7 @@ vy_run_iterator_open(struct vy_run_iterator *itr,
 		     struct vy_slice *slice, enum iterator_type iterator_type,
 		     const struct tuple *key, const struct vy_read_view **rv,
 		     struct key_def *cmp_def, struct key_def *key_def,
-		     struct tuple_format *format,
-		     bool is_primary)
+		     struct tuple_format *format, bool is_primary)
 {
 	itr->stat = stat;
 	itr->cmp_def = cmp_def;
@@ -2149,7 +2148,8 @@ vy_run_writer_start_page(struct vy_run_writer *writer,
 	if (run->info.page_count >= writer->page_info_capacity &&
 	    vy_run_alloc_page_info(run, &writer->page_info_capacity) != 0)
 		return -1;
-	const char *key = tuple_extract_key(first_stmt, writer->cmp_def, NULL);
+	const char *key = vy_stmt_is_key(first_stmt) ? tuple_data(first_stmt) :
+			  tuple_extract_key(first_stmt, writer->cmp_def, NULL);
 	if (key == NULL)
 		return -1;
 	if (run->info.page_count == 0) {
@@ -2165,6 +2165,27 @@ vy_run_writer_start_page(struct vy_run_writer *writer,
 	return 0;
 }
 
+static int
+vy_tuple_bloom_add(struct tuple_bloom_builder *bloom, struct key_def *key_def,
+		   struct tuple *stmt, struct tuple *last_stmt)
+{
+	if (vy_stmt_is_key(stmt)) { /* key statement: hash its data directly */
+		assert(last_stmt == NULL || vy_stmt_is_key(last_stmt));
+		const char *key = tuple_data(stmt);
+		uint32_t hashed_parts = last_stmt == NULL ? 0 :
+			key_common_parts(key, tuple_data(last_stmt), key_def);
+		uint32_t part_count = mp_decode_array(&key); /* skip header */
+		return tuple_bloom_builder_add_key(bloom, key, part_count,
+						   key_def, hashed_parts);
+	} else { /* regular tuple statement */
+		assert(last_stmt == NULL || !vy_stmt_is_key(last_stmt));
+		uint32_t hashed_parts = last_stmt == NULL ? 0 :
+			tuple_common_key_parts(stmt, last_stmt, key_def);
+		return tuple_bloom_builder_add(bloom, stmt, key_def,
+					       hashed_parts);
+	}
+}
+
 /**
  * Write @a stmt into a current page.
  * @param writer Run writer.
@@ -2176,13 +2197,10 @@ vy_run_writer_start_page(struct vy_run_writer *writer,
 static int
 vy_run_writer_write_to_page(struct vy_run_writer *writer, struct tuple *stmt)
 {
-	if (writer->bloom != NULL) {
-		uint32_t hashed_parts = writer->last_stmt == NULL ? 0 :
-			tuple_common_key_parts(stmt, writer->last_stmt,
-					       writer->key_def);
-		tuple_bloom_builder_add(writer->bloom, stmt,
-					writer->key_def, hashed_parts);
-	}
+	if (writer->bloom != NULL &&
+	    vy_tuple_bloom_add(writer->bloom, writer->key_def, stmt,
+			       writer->last_stmt) != 0)
+		return -1;
 	if (writer->last_stmt != NULL)
 		vy_stmt_unref_if_possible(writer->last_stmt);
 	writer->last_stmt = stmt;
@@ -2302,7 +2320,9 @@ vy_run_writer_commit(struct vy_run_writer *writer)
 	}
 
 	assert(writer->last_stmt != NULL);
-	const char *key = tuple_extract_key(writer->last_stmt,
+	const char *key = vy_stmt_is_key(writer->last_stmt) ?
+		          tuple_data(writer->last_stmt) :
+			  tuple_extract_key(writer->last_stmt,
 					    writer->cmp_def, NULL);
 	if (key == NULL)
 		goto out;
@@ -2370,6 +2390,7 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 	uint32_t page_info_capacity = 0;
 
 	const char *key = NULL;
+	char *page_min_key = NULL;
 	int64_t max_lsn = 0;
 	int64_t min_lsn = INT64_MAX;
 	struct tuple *prev_tuple = NULL;
@@ -2390,7 +2411,6 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 		if (run->info.page_count == page_info_capacity &&
 		    vy_run_alloc_page_info(run, &page_info_capacity) != 0)
 			goto close_err;
-		const char *page_min_key = NULL;
 		uint32_t page_row_count = 0;
 		uint64_t page_row_index_offset = 0;
 		uint64_t row_offset = xlog_cursor_tx_pos(&cursor);
@@ -2407,14 +2427,14 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 							     format, iid == 0);
 			if (tuple == NULL)
 				goto close_err;
-			if (bloom_builder != NULL) {
-				uint32_t hashed_parts = prev_tuple == NULL ? 0 :
-					tuple_common_key_parts(prev_tuple,
-							       tuple, key_def);
-				tuple_bloom_builder_add(bloom_builder, tuple,
-							key_def, hashed_parts);
+			if (bloom_builder != NULL &&
+			    vy_tuple_bloom_add(bloom_builder, key_def,
+					       tuple, prev_tuple) != 0) {
+				tuple_unref(tuple);
+				goto close_err;
 			}
-			key = tuple_extract_key(tuple, cmp_def, NULL);
+			key = vy_stmt_is_key(tuple) ? tuple_data(tuple) :
+			      tuple_extract_key(tuple, cmp_def, NULL);
 			if (prev_tuple != NULL)
 				tuple_unref(prev_tuple);
 			prev_tuple = tuple;
@@ -2425,8 +2445,11 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 				if (run->info.min_key == NULL)
 					goto close_err;
 			}
-			if (page_min_key == NULL)
-				page_min_key = key;
+			if (page_min_key == NULL) {
+				page_min_key = key_dup(key);
+				if (page_min_key == NULL)
+					goto close_err;
+			}
 			if (xrow.lsn > max_lsn)
 				max_lsn = xrow.lsn;
 			if (xrow.lsn < min_lsn)
@@ -2443,11 +2466,8 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 		info->row_index_offset = page_row_index_offset;
 		++run->info.page_count;
 		vy_run_acct_page(run, info);
-	}
-
-	if (prev_tuple != NULL) {
-		tuple_unref(prev_tuple);
-		prev_tuple = NULL;
+		free(page_min_key);
+		page_min_key = NULL;
 	}
 
 	if (key != NULL) {
@@ -2455,6 +2475,10 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 		if (run->info.max_key == NULL)
 			goto close_err;
 	}
+	if (prev_tuple != NULL) {
+		tuple_unref(prev_tuple);
+		prev_tuple = NULL;
+	}
 	run->info.max_lsn = max_lsn;
 	run->info.min_lsn = min_lsn;
 
@@ -2487,6 +2511,8 @@ close_err:
 	region_truncate(region, mem_used);
 	if (prev_tuple != NULL)
 		tuple_unref(prev_tuple);
+	if (page_min_key != NULL)
+		free(page_min_key);
 	if (bloom_builder != NULL)
 		tuple_bloom_builder_delete(bloom_builder);
 	if (xlog_cursor_is_open(&cursor))
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index ad28712b..48e739af 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -1407,7 +1407,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_worker *worker,
 	 */
 	struct vy_stmt_stream *wi;
 	bool is_last_level = (lsm->run_count == 0);
-	wi = vy_write_iterator_new(task->cmp_def, lsm->disk_format,
+	wi = vy_write_iterator_new(task->cmp_def, lsm->mem_format,
 				   lsm->index_id == 0, is_last_level,
 				   scheduler->read_views, NULL);
 	if (wi == NULL)
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 5a76bfe4..29accf37 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -699,6 +699,8 @@ int
 vy_stmt_encode_primary(const struct tuple *value, struct key_def *key_def,
 		       uint32_t space_id, struct xrow_header *xrow)
 {
+	assert(!vy_stmt_is_key(value));
+
 	memset(xrow, 0, sizeof(*xrow));
 	enum iproto_type type = vy_stmt_type(value);
 	xrow->type = type;
@@ -755,9 +757,14 @@ vy_stmt_encode_secondary(const struct tuple *value, struct key_def *cmp_def,
 	memset(&request, 0, sizeof(request));
 	request.type = type;
 	uint32_t size;
-	const char *extracted = tuple_extract_key(value, cmp_def, &size);
-	if (extracted == NULL)
-		return -1;
+	const char *extracted;
+	if (!vy_stmt_is_key(value)) {
+		extracted = tuple_extract_key(value, cmp_def, &size);
+		if (extracted == NULL)
+			return -1;
+	} else {
+		extracted = tuple_data_range(value, &size);
+	}
 	if (type == IPROTO_REPLACE || type == IPROTO_INSERT) {
 		request.tuple = extracted;
 		request.tuple_end = extracted + size;
@@ -791,21 +798,20 @@ vy_stmt_decode(struct xrow_header *xrow, const struct key_def *key_def,
 	switch (request.type) {
 	case IPROTO_DELETE:
 		/* extract key */
-		stmt = vy_stmt_new_surrogate_from_key(request.key,
-						      IPROTO_DELETE,
-						      key_def, format);
-		break;
-	case IPROTO_INSERT:
-	case IPROTO_REPLACE:
-		if (is_primary) {
-			stmt = vy_stmt_new_with_ops(format, request.tuple,
-						    request.tuple_end,
-						    NULL, 0, request.type);
-		} else {
-			stmt = vy_stmt_new_surrogate_from_key(request.tuple,
-							      request.type,
+		if (is_primary)
+			stmt = vy_stmt_new_surrogate_from_key(request.key,
+							      IPROTO_DELETE,
 							      key_def, format);
-		}
+		else
+			stmt = vy_stmt_new_with_ops(format, request.key,
+						    request.key_end,
+						    NULL, 0, IPROTO_DELETE);
+		break;
+	case IPROTO_INSERT:
+	case IPROTO_REPLACE:
+		stmt = vy_stmt_new_with_ops(format, request.tuple,
+					    request.tuple_end,
+					    NULL, 0, request.type);
 		break;
 	case IPROTO_UPSERT:
 		ops.iov_base = (char *)request.ops;
diff --git a/test/unit/vy_point_lookup.c b/test/unit/vy_point_lookup.c
index 5f20d09b..6268d2d2 100644
--- a/test/unit/vy_point_lookup.c
+++ b/test/unit/vy_point_lookup.c
@@ -94,7 +94,7 @@ test_basic()
 		index_def_new(512, 0, "primary", sizeof("primary") - 1, TREE,
 			      &index_opts, key_def, NULL);
 
-	struct vy_lsm *pk = vy_lsm_new(&lsm_env, &stmt_env, &cache_env, &mem_env,
+	struct vy_lsm *pk = vy_lsm_new(&lsm_env, &cache_env, &mem_env,
 				       index_def, format, NULL, 0);
 	isnt(pk, NULL, "lsm is not NULL")
 
-- 
2.11.0




More information about the Tarantool-patches mailing list