[PATCH 2/4] vinyl: introduce bloom filters for partial key lookups

Vladimir Davydov vdavydov.dev at gmail.com
Sat Feb 24 11:32:38 MSK 2018


Currently, we store and use bloom only for full-key lookups. However,
there are use cases when we can also benefit from maintaining bloom
filters for partial keys as well - see #3177 for example. So this patch
replaces the current full-key bloom filter with a multipart one, which
is basically a set of bloom filters, one per each sub-key. Old bloom
filters stored on disk will be silently ignored. This will degrade
performance of existing deployments until major compaction is scheduled,
but this should be mitigated by #3139 (knob for forcing compaction).
Besides, there are still no vinyl deployments in production as far as we
know. Anyway, if there are complaints about the performance degradation,
we can add some code to load and use the legacy bloom filter if the new
one is unavailable.

When a key or tuple is checked against a multipart bloom filter, we
check all its sub-keys to reduce the false positive rate. Nevertheless,
there's no size optimization as of now. E.g. even if the cardinality of
a partial key is the same as of the full key, we will still store two
full-sized bloom filters although we could probably save some space in
this case by assuming that checking against the bloom corresponding to
a partial key would reduce the false positive rate of full key lookups.
This is addressed later in the series.

Before this patch we used a bloom spectrum object to construct a bloom
filter. A bloom spectrum is basically a set of bloom filters ranging in
size. The point of using a spectrum is that we don't know what the run
size will be while we are writing it so we create 10 bloom filters and
choose the best of them after we are done. With the default bloom fpr of
0.05 it is 10 byte overhead per record, which seems to be OK. However,
if we try to optimize other parameters as well, e.g. the number of hash
functions, the cost of a spectrum will become prohibitive. Funny thing
is a tuple hash is only 4 bytes long, which means if we stored all
hashes in an array and built a bloom filter after we'd written a run, we
would reduce the memory footprint by more than half! And that would only
slightly increase the run write time as scanning a memory map of hashes
and constructing a bloom filter is cheap in comparison to merging runs.
Putting it all together, we stop using bloom spectrum in this patch,
instead we stash all hashes in a new bloom builder object and use them
to build a perfect bloom filter after the run has been written and we
know the cardinality of each sub key.

Closes #3177
---
 src/box/CMakeLists.txt      |   1 +
 src/box/iproto_constants.c  |   3 +-
 src/box/iproto_constants.h  |   4 +-
 src/box/tuple_bloom.c       | 274 ++++++++++++++++++++++++++++++++++++++++++++
 src/box/tuple_bloom.h       | 191 ++++++++++++++++++++++++++++++
 src/box/tuple_compare.cc    |  24 ++++
 src/box/tuple_compare.h     |  12 ++
 src/box/tuple_hash.cc       |  13 ++-
 src/box/tuple_hash.h        |  30 +++++
 src/box/vy_run.c            | 266 +++++++++++++++++-------------------------
 src/box/vy_run.h            |  19 ++-
 src/box/vy_scheduler.c      |  25 +---
 src/box/xlog.c              |  27 -----
 src/box/xlog.h              |   9 --
 test/unit/vy_point_lookup.c |  10 +-
 test/vinyl/bloom.result     | 144 ++++++++++++++++++++++-
 test/vinyl/bloom.test.lua   |  60 +++++++++-
 test/vinyl/info.result      |  14 +--
 18 files changed, 868 insertions(+), 258 deletions(-)
 create mode 100644 src/box/tuple_bloom.c
 create mode 100644 src/box/tuple_bloom.h

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index ad7f910f..cb27db78 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -37,6 +37,7 @@ add_library(tuple STATIC
     tuple_compare.cc
     tuple_extract_key.cc
     tuple_hash.cc
+    tuple_bloom.c
     tuple_dictionary.c
     key_def.cc
     coll_def.c
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index cd7b1d03..4a8643cb 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -194,7 +194,8 @@ const char *vy_run_info_key_strs[VY_RUN_INFO_KEY_MAX] = {
 	"min lsn",
 	"max lsn",
 	"page count",
-	"bloom filter"
+	"bloom filter legacy",
+	"bloom filter",
 };
 
 const char *vy_row_index_key_strs[VY_ROW_INDEX_KEY_MAX] = {
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 95184248..92534ef6 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -295,8 +295,10 @@ enum vy_run_info_key {
 	VY_RUN_INFO_MAX_LSN = 4,
 	/** Number of pages in the run. */
 	VY_RUN_INFO_PAGE_COUNT = 5,
+	/** Legacy bloom filter implementation. */
+	VY_RUN_INFO_BLOOM_LEGACY = 6,
 	/** Bloom filter for keys. */
-	VY_RUN_INFO_BLOOM = 6,
+	VY_RUN_INFO_BLOOM = 7,
 	/** The last key in this enum + 1 */
 	VY_RUN_INFO_KEY_MAX
 };
diff --git a/src/box/tuple_bloom.c b/src/box/tuple_bloom.c
new file mode 100644
index 00000000..65e9827d
--- /dev/null
+++ b/src/box/tuple_bloom.c
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "tuple_bloom.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include <msgpuck.h>
+#include "diag.h"
+#include "errcode.h"
+#include "memory.h"
+#include "key_def.h"
+#include "tuple.h"
+#include "tuple_hash.h"
+#include "salad/bloom.h"
+#include "trivia/util.h"
+#include "third_party/PMurHash.h"
+
+enum { HASH_SEED = 13U };
+
+struct tuple_bloom_builder *
+tuple_bloom_builder_new(uint32_t part_count)
+{
+	size_t bytes = sizeof(struct tuple_bloom_builder) +
+		       part_count * sizeof(struct tuple_hash_array);
+	struct tuple_bloom_builder *res = malloc(bytes);
+	if (res != NULL) {
+		memset(res, 0, bytes); /* every hash array starts empty */
+		res->part_count = part_count;
+		return res;
+	}
+	diag_set(OutOfMemory, bytes, "malloc", "tuple bloom builder");
+	return NULL;
+}
+
+void
+tuple_bloom_builder_delete(struct tuple_bloom_builder *builder)
+{
+	for (uint32_t part = 0; part != builder->part_count; ++part)
+		free(builder->parts[part].values); /* NULL if never grown */
+	free(builder);
+}
+
+int
+tuple_bloom_builder_add(struct tuple_bloom_builder *builder,
+			const struct tuple *tuple,
+			const struct key_def *key_def,
+			uint32_t hashed_parts)
+{
+	assert(builder->part_count == key_def->part_count);
+
+	uint32_t h = HASH_SEED; /* running hash over key parts [0, i] */
+	uint32_t carry = 0;
+	uint32_t total_size = 0;
+
+	for (uint32_t i = 0; i < key_def->part_count; i++) {
+		total_size += tuple_hash_key_part(&h, &carry, tuple,
+						  &key_def->parts[i]);
+		if (i < hashed_parts)
+			continue; /* hashed, but not stored for this sub key */
+		struct tuple_hash_array *hash_arr = &builder->parts[i];
+		if (hash_arr->count >= hash_arr->capacity) {
+			uint32_t capacity = MAX(hash_arr->capacity * 2, 1024U);
+			size_t size = capacity * sizeof(*hash_arr->values);
+			uint32_t *values = realloc(hash_arr->values, size);
+			if (values == NULL) {
+				diag_set(OutOfMemory, size, "realloc",
+					 "tuple hash array");
+				return -1;
+			}
+			hash_arr->capacity = capacity;
+			hash_arr->values = values;
+		}
+		uint32_t hash = PMurHash32_Result(h, carry, total_size);
+		hash_arr->values[hash_arr->count++] = hash;
+	}
+	return 0;
+}
+
+struct tuple_bloom *
+tuple_bloom_new(struct tuple_bloom_builder *builder, double fpr)
+{
+	size_t alloc_size = sizeof(struct tuple_bloom) +
+		builder->part_count * sizeof(struct bloom);
+	struct tuple_bloom *res = malloc(alloc_size);
+	if (res == NULL) {
+		diag_set(OutOfMemory, alloc_size, "malloc", "tuple bloom");
+		return NULL;
+	}
+	/* Counts fully created parts so that a failure can be unwound. */
+	uint32_t built = 0;
+	for (; built < builder->part_count; built++) {
+		const struct tuple_hash_array *src = &builder->parts[built];
+		struct bloom *dst = &res->parts[built];
+		if (bloom_create(dst, src->count, fpr, runtime.quota) != 0)
+			goto fail;
+		for (uint32_t n = 0; n < src->count; n++)
+			bloom_add(dst, src->values[n]);
+	}
+	res->part_count = built;
+	return res;
+fail:
+	diag_set(OutOfMemory, 0, "bloom_create", "tuple bloom part");
+	for (uint32_t j = 0; j < built; j++)
+		bloom_destroy(&res->parts[j], runtime.quota);
+	free(res);
+	return NULL;
+}
+
+void
+tuple_bloom_delete(struct tuple_bloom *bloom)
+{
+	for (uint32_t part = 0; part != bloom->part_count; ++part)
+		bloom_destroy(bloom->parts + part, runtime.quota);
+	free(bloom);
+}
+
+bool
+tuple_bloom_possible_has(const struct tuple_bloom *bloom,
+			 const struct tuple *tuple,
+			 const struct key_def *key_def)
+{
+	assert(bloom->part_count == key_def->part_count);
+
+	/* The hash of sub key i covers key parts [0, i]. */
+	uint32_t state = HASH_SEED, carry = 0, len = 0;
+	const uint32_t part_count = key_def->part_count;
+
+	for (uint32_t part_no = 0; part_no < part_count; part_no++) {
+		len += tuple_hash_key_part(&state, &carry, tuple,
+					   &key_def->parts[part_no]);
+		uint32_t sub_hash = PMurHash32_Result(state, carry, len);
+		if (!bloom_possible_has(&bloom->parts[part_no], sub_hash))
+			return false;
+	}
+	return true;
+}
+
+bool
+tuple_bloom_possible_has_key(const struct tuple_bloom *bloom,
+			     const char *key, uint32_t part_count,
+			     const struct key_def *key_def)
+{
+	assert(part_count <= key_def->part_count);
+	assert(bloom->part_count == key_def->part_count);
+
+	/* Only the first part_count sub keys can be checked. */
+	uint32_t state = HASH_SEED, carry = 0, len = 0;
+	const struct key_part *part = key_def->parts;
+
+	for (uint32_t part_no = 0; part_no < part_count; part_no++, part++) {
+		/* tuple_hash_field() advances key to the next part. */
+		len += tuple_hash_field(&state, &carry, &key, part->coll);
+		uint32_t sub_hash = PMurHash32_Result(state, carry, len);
+		if (!bloom_possible_has(&bloom->parts[part_no], sub_hash))
+			return false;
+	}
+	return true;
+}
+
+static size_t
+tuple_bloom_sizeof_part(const struct bloom *part)
+{
+	/* Encoded as [table_size, hash_count, <bin: bloom table>]. */
+	size_t header = mp_sizeof_array(3) +
+			mp_sizeof_uint(part->table_size) +
+			mp_sizeof_uint(part->hash_count);
+	size_t table = mp_sizeof_bin(bloom_store_size(part));
+	return header + table;
+}
+
+static char *
+tuple_bloom_encode_part(const struct bloom *part, char *buf)
+{
+	char *pos = mp_encode_array(buf, 3);
+	pos = mp_encode_uint(pos, part->table_size);
+	pos = mp_encode_uint(pos, part->hash_count);
+	/* Bin header first, then bloom_store() writes right after it. */
+	pos = mp_encode_binl(pos, bloom_store_size(part));
+	return bloom_store(part, pos);
+}
+
+static int
+tuple_bloom_decode_part(struct bloom *part, const char **data)
+{
+	memset(part, 0, sizeof(*part));
+	if (mp_decode_array(data) != 3)
+		unreachable(); /* the encoder always writes 3 elements */
+	part->table_size = mp_decode_uint(data);
+	part->hash_count = mp_decode_uint(data);
+	size_t table_bytes = mp_decode_binl(data);
+	assert(table_bytes == bloom_store_size(part));
+	if (bloom_load_table(part, *data, runtime.quota) == 0) {
+		*data += table_bytes; /* step over the bin payload */
+		return 0;
+	}
+	diag_set(OutOfMemory, table_bytes, "bloom_load_table",
+		 "tuple bloom part");
+	return -1;
+}
+
+size_t
+tuple_bloom_size(const struct tuple_bloom *bloom)
+{
+	size_t total = mp_sizeof_array(bloom->part_count);
+	const struct bloom *end = bloom->parts + bloom->part_count;
+	for (const struct bloom *p = bloom->parts; p != end; p++)
+		total += tuple_bloom_sizeof_part(p);
+	return total;
+}
+
+char *
+tuple_bloom_encode(const struct tuple_bloom *bloom, char *buf)
+{
+	char *pos = mp_encode_array(buf, bloom->part_count);
+	for (uint32_t part = 0; part < bloom->part_count; part++)
+		pos = tuple_bloom_encode_part(bloom->parts + part, pos);
+	return pos;
+}
+
+struct tuple_bloom *
+tuple_bloom_decode(const char **data)
+{
+	uint32_t part_count = mp_decode_array(data);
+	size_t size = sizeof(struct tuple_bloom) +
+		part_count * sizeof(struct bloom);
+	struct tuple_bloom *res = malloc(size);
+	if (res == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "tuple bloom");
+		return NULL;
+	}
+	uint32_t loaded = 0; /* parts decoded so far */
+	for (; loaded < part_count; loaded++)
+		if (tuple_bloom_decode_part(&res->parts[loaded], data) != 0)
+			goto fail;
+	res->part_count = part_count;
+	return res;
+fail:
+	for (uint32_t j = 0; j < loaded; j++)
+		bloom_destroy(&res->parts[j], runtime.quota);
+	free(res);
+	return NULL;
+}
diff --git a/src/box/tuple_bloom.h b/src/box/tuple_bloom.h
new file mode 100644
index 00000000..24a7e1cb
--- /dev/null
+++ b/src/box/tuple_bloom.h
@@ -0,0 +1,191 @@
+#ifndef TARANTOOL_BOX_TUPLE_BLOOM_H_INCLUDED
+#define TARANTOOL_BOX_TUPLE_BLOOM_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include "salad/bloom.h"
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct tuple;
+struct key_def;
+
+/**
+ * Tuple bloom filter.
+ *
+ * Consists of a set of bloom filters, one per each sub key,
+ * where sub key i is the key prefix made of parts [0, i].
+ * A lookup checks every sub key of the searched key, which
+ * lowers the probability of false positive results.
+ */
+struct tuple_bloom {
+	/** Number of key parts, i.e. the length of @parts. */
+	uint32_t part_count;
+	/** Array of bloom filters, one per each sub key. */
+	struct bloom parts[0];
+};
+
+/**
+ * Growable array of tuple hashes collected for one sub key.
+ */
+struct tuple_hash_array {
+	/** Number of hashes stored in the array. */
+	uint32_t count;
+	/** Number of hashes the allocated array can hold. */
+	uint32_t capacity;
+	/** Heap-allocated array of hashes, NULL until first use. */
+	uint32_t *values;
+};
+
+/**
+ * Tuple bloom filter builder.
+ *
+ * Construction of a bloom filter proceeds as follows.
+ * First all tuple hashes are stored in a builder object.
+ * Once all hashes have been stored, a bloom filter of the
+ * optimal size is created and all the hashes are added to it.
+ */
+struct tuple_bloom_builder {
+	/** Number of key parts. */
+	uint32_t part_count;
+	/** Hash arrays, one per each sub key. */
+	struct tuple_hash_array parts[0];
+};
+
+/**
+ * Create a bloom filter builder.
+ * @param part_count - number of key parts
+ * @return bloom builder on success or NULL on OOM
+ */
+struct tuple_bloom_builder *
+tuple_bloom_builder_new(uint32_t part_count);
+
+/**
+ * Destroy a bloom filter builder.
+ * @param builder - bloom filter builder
+ */
+void
+tuple_bloom_builder_delete(struct tuple_bloom_builder *builder);
+
+/**
+ * Add a tuple hash to a bloom filter builder.
+ * @param builder - bloom filter builder
+ * @param tuple - tuple to add
+ * @param key_def - key definition
+ * @param hashed_parts - number of leading sub keys to skip:
+ *  their hashes are computed but not stored by this call
+ * @return 0 on success, -1 on OOM
+ */
+int
+tuple_bloom_builder_add(struct tuple_bloom_builder *builder,
+			const struct tuple *tuple,
+			const struct key_def *key_def,
+			uint32_t hashed_parts);
+
+/**
+ * Create a new bloom filter.
+ * @param builder - bloom filter builder
+ * @param fpr - desired false positive rate
+ * @return bloom filter on success or NULL on OOM
+ */
+struct tuple_bloom *
+tuple_bloom_new(struct tuple_bloom_builder *builder, double fpr);
+
+/**
+ * Delete a bloom filter.
+ * @param bloom - bloom filter to delete
+ */
+void
+tuple_bloom_delete(struct tuple_bloom *bloom);
+
+/**
+ * Check if a tuple was stored in a bloom filter.
+ * @param bloom - bloom filter
+ * @param tuple - tuple to check
+ * @param key_def - key definition
+ * @return true if the tuple may have been stored in the bloom,
+ *  false if the tuple is definitely not in the bloom
+ */
+bool
+tuple_bloom_possible_has(const struct tuple_bloom *bloom,
+			 const struct tuple *tuple,
+			 const struct key_def *key_def);
+
+/**
+ * Check if a tuple matching a key was stored in a bloom filter.
+ * @param bloom - bloom filter
+ * @param key - key to check
+ * @param part_count - number of parts in the key
+ * @param key_def - key definition
+ * @return true if there may be a tuple matching the key stored in
+ *  the bloom, false if there is definitely no such tuple
+ */
+bool
+tuple_bloom_possible_has_key(const struct tuple_bloom *bloom,
+			     const char *key, uint32_t part_count,
+			     const struct key_def *key_def);
+
+/**
+ * Return the size of a bloom filter when encoded.
+ * @param bloom - bloom filter
+ * @return size of the bloom filter, in bytes
+ */
+size_t
+tuple_bloom_size(const struct tuple_bloom *bloom);
+
+/**
+ * Encode a bloom filter in MsgPack.
+ * @param bloom - bloom filter
+ * @param buf - buffer where to store the bloom filter
+ * @return pointer to the first byte following encoded data
+ */
+char *
+tuple_bloom_encode(const struct tuple_bloom *bloom, char *buf);
+
+/**
+ * Decode a bloom filter from MsgPack.
+ * @param data - pointer to buffer storing encoded bloom filter;
+ *  on success it is advanced by the number of decoded bytes
+ * @return the decoded bloom on success or NULL on OOM
+ */
+struct tuple_bloom *
+tuple_bloom_decode(const char **data);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_BOX_TUPLE_BLOOM_H_INCLUDED */
diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index 13ebfc51..cfee0049 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -426,6 +426,30 @@ tuple_compare_field_with_hint(const char *field_a, enum mp_type a_type,
 	}
 }
 
+uint32_t
+tuple_common_key_parts(const struct tuple *tuple_a,
+		       const struct tuple *tuple_b,
+		       const struct key_def *key_def)
+{
+	uint32_t common = 0;
+	for (; common < key_def->part_count; common++) {
+		const struct key_part *part = &key_def->parts[common];
+		const char *a = tuple_field(tuple_a, part->fieldno);
+		const char *b = tuple_field(tuple_b, part->fieldno);
+		/* An absent field is treated the same as an explicit nil. */
+		enum mp_type a_type = a == NULL ? MP_NIL : mp_typeof(*a);
+		enum mp_type b_type = b == NULL ? MP_NIL : mp_typeof(*b);
+		if (a_type == MP_NIL && b_type == MP_NIL)
+			continue;
+		if (a_type == MP_NIL || b_type == MP_NIL)
+			break;
+		if (tuple_compare_field_with_hint(a, a_type, b, b_type,
+						  part->type, part->coll) != 0)
+			break;
+	}
+	return common;
+}
+
 template<bool is_nullable, bool has_optional_parts>
 static inline int
 tuple_compare_slowpath(const struct tuple *tuple_a, const struct tuple *tuple_b,
diff --git a/src/box/tuple_compare.h b/src/box/tuple_compare.h
index 3888bbb9..d80e1e70 100644
--- a/src/box/tuple_compare.h
+++ b/src/box/tuple_compare.h
@@ -42,6 +42,18 @@ extern "C" {
 #endif /* defined(__cplusplus) */
 
 /**
+ * Return the length of the longest common prefix of two tuples.
+ * @param tuple_a first tuple
+ * @param tuple_b second tuple
+ * @param key_def key definition
+ * @return number of parts the two tuples have in common
+ */
+uint32_t
+tuple_common_key_parts(const struct tuple *tuple_a,
+		       const struct tuple *tuple_b,
+		       const struct key_def *key_def);
+
+/**
  * Create a comparison function for the key_def
  *
  * @param key_def key_definition
diff --git a/src/box/tuple_hash.cc b/src/box/tuple_hash.cc
index 0f7ba91d..0fa8ea56 100644
--- a/src/box/tuple_hash.cc
+++ b/src/box/tuple_hash.cc
@@ -262,7 +262,7 @@ slowpath:
 	key_def->key_hash = key_hash_slowpath;
 }
 
-static uint32_t
+uint32_t
 tuple_hash_field(uint32_t *ph1, uint32_t *pcarry, const char **field,
 		 struct coll *coll)
 {
@@ -308,6 +308,17 @@ tuple_hash_null(uint32_t *ph1, uint32_t *pcarry)
 	return mp_sizeof_nil();
 }
 
+uint32_t
+tuple_hash_key_part(uint32_t *ph1, uint32_t *pcarry,
+		    const struct tuple *tuple,
+		    const struct key_part *part)
+{
+	const char *field = tuple_field(tuple, part->fieldno);
+	if (field != NULL)
+		return tuple_hash_field(ph1, pcarry, &field, part->coll);
+	return tuple_hash_null(ph1, pcarry); /* absent field hashes as nil */
+}
+
 template <bool has_optional_parts>
 uint32_t
 tuple_hash_slowpath(const struct tuple *tuple, const struct key_def *key_def)
diff --git a/src/box/tuple_hash.h b/src/box/tuple_hash.h
index 4b4985ba..aab8f54e 100644
--- a/src/box/tuple_hash.h
+++ b/src/box/tuple_hash.h
@@ -45,6 +45,36 @@ void
 tuple_hash_func_set(struct key_def *def);
 
 /**
+ * Compute hash of a tuple field.
+ * @param ph1 - pointer to running hash
+ * @param pcarry - pointer to carry
+ * @param field - pointer to field data
+ * @param coll - collation to use for hashing strings or NULL
+ * @return size of processed data
+ *
+ * This function updates @ph1 and @pcarry and advances @field
+ * by the number of processed bytes.
+ */
+uint32_t
+tuple_hash_field(uint32_t *ph1, uint32_t *pcarry, const char **field,
+		 struct coll *coll);
+
+/**
+ * Compute hash of a key part.
+ * @param ph1 - pointer to running hash
+ * @param pcarry - pointer to carry
+ * @param tuple - tuple to hash
+ * @param part - key part
+ * @return size of processed data
+ *
+ * This function updates @ph1 and @pcarry.
+ */
+uint32_t
+tuple_hash_key_part(uint32_t *ph1, uint32_t *pcarry,
+		    const struct tuple *tuple,
+		    const struct key_part *part);
+
+/**
  * Calculates a common hash value for a tuple
  * @param tuple - a tuple
  * @param key_def - key_def for field description
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 60180841..4e533053 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -40,7 +40,8 @@
 #include "coio_file.h"
 
 #include "replication.h"
-#include "tuple_hash.h" /* for bloom filter */
+#include "tuple_bloom.h"
+#include "tuple_compare.h"
 #include "xlog.h"
 #include "xrow.h"
 
@@ -57,8 +58,6 @@ static const uint64_t vy_run_info_key_map = (1 << VY_RUN_INFO_MIN_KEY) |
 					    (1 << VY_RUN_INFO_MAX_LSN) |
 					    (1 << VY_RUN_INFO_PAGE_COUNT);
 
-enum { VY_BLOOM_VERSION = 0 };
-
 /** xlog meta type for .run files */
 #define XLOG_META_TYPE_RUN "RUN"
 
@@ -238,7 +237,6 @@ vy_run_new(struct vy_run_env *env, int64_t id)
 	run->refs = 1;
 	rlist_create(&run->in_index);
 	rlist_create(&run->in_unused);
-	TRASH(&run->info.bloom);
 	return run;
 }
 
@@ -254,9 +252,10 @@ vy_run_clear(struct vy_run *run)
 	run->page_info = NULL;
 	run->page_index_size = 0;
 	run->info.page_count = 0;
-	if (run->info.has_bloom)
-		bloom_destroy(&run->info.bloom, runtime.quota);
-	run->info.has_bloom = false;
+	if (run->info.bloom != NULL) {
+		tuple_bloom_delete(run->info.bloom);
+		run->info.bloom = NULL;
+	}
 	free(run->info.min_key);
 	run->info.min_key = NULL;
 	free(run->info.max_key);
@@ -274,6 +273,12 @@ vy_run_delete(struct vy_run *run)
 	free(run);
 }
 
+size_t
+vy_run_bloom_size(struct vy_run *run)
+{
+	return run->info.bloom != NULL ? tuple_bloom_size(run->info.bloom) : 0;
+}
+
 /**
  * Find a page from which the iteration of a given key must be started.
  * LE and LT: the found page definitely contains the position
@@ -534,53 +539,6 @@ vy_page_info_decode(struct vy_page_info *page, const struct xrow_header *xrow,
 }
 
 /**
- * Read bloom filter from given buffer.
- * @param bloom - a bloom filter to read.
- * @param buffer[in/out] - a buffer to read from.
- *  The pointer is incremented on the number of bytes read.
- * @param filename Filename for error reporting.
- * @return - 0 on success or -1 on format/memory error
- */
-static int
-vy_run_bloom_decode(struct bloom *bloom, const char **buffer,
-		    const char *filename)
-{
-	const char **pos = buffer;
-	memset(bloom, 0, sizeof(*bloom));
-	uint32_t array_size = mp_decode_array(pos);
-	if (array_size != 4) {
-		diag_set(ClientError, ER_INVALID_INDEX_FILE, filename,
-			 tt_sprintf("Can't decode bloom meta: "
-				    "wrong array size (expected %d, got %u)",
-				    4, (unsigned)array_size));
-		return -1;
-	}
-	uint64_t version = mp_decode_uint(pos);
-	if (version != VY_BLOOM_VERSION) {
-		diag_set(ClientError, ER_INVALID_INDEX_FILE, filename,
-			 tt_sprintf("Can't decode bloom meta: "
-				    "wrong version (expected %d, got %u)",
-				    VY_BLOOM_VERSION, (unsigned)version));
-	}
-	bloom->table_size = mp_decode_uint(pos);
-	bloom->hash_count = mp_decode_uint(pos);
-	size_t table_size = mp_decode_binl(pos);
-	if (table_size != bloom_store_size(bloom)) {
-		diag_set(ClientError, ER_INVALID_INDEX_FILE, filename,
-			 tt_sprintf("Can't decode bloom meta: "
-				    "wrong table size (expected %zu, got %zu)",
-				    bloom_store_size(bloom), table_size));
-		return -1;
-	}
-	if (bloom_load_table(bloom, *pos, runtime.quota) != 0) {
-		diag_set(OutOfMemory, bloom_store_size(bloom), "mmap", "bloom");
-		return -1;
-	}
-	*pos += table_size;
-	return 0;
-}
-
-/**
  * Decode the run metadata from xrow.
  *
  * @param xrow xrow to decode
@@ -631,11 +589,13 @@ vy_run_info_decode(struct vy_run_info *run_info,
 		case VY_RUN_INFO_PAGE_COUNT:
 			run_info->page_count = mp_decode_uint(&pos);
 			break;
+		case VY_RUN_INFO_BLOOM_LEGACY:
+			/* Skip the value or it is misparsed as the next key. */
+			mp_next(&pos);
+			continue;
 		case VY_RUN_INFO_BLOOM:
-			if (vy_run_bloom_decode(&run_info->bloom, &pos,
-						filename) == 0)
-				run_info->has_bloom = true;
-			else
+			run_info->bloom = tuple_bloom_decode(&pos);
+			if (run_info->bloom == NULL)
 				return -1;
 			break;
 		default:
@@ -1321,18 +1279,20 @@ vy_run_iterator_do_seek(struct vy_run_iterator *itr,
 
 	*ret = NULL;
 
+	struct tuple_bloom *bloom = run->info.bloom;
 	const struct key_def *key_def = itr->key_def;
-	bool is_full_key = (tuple_field_count(key) >= key_def->part_count);
-	if (run->info.has_bloom && iterator_type == ITER_EQ && is_full_key) {
-		uint32_t hash;
+	if (iterator_type == ITER_EQ && bloom != NULL) {
+		bool need_lookup;
 		if (vy_stmt_type(key) == IPROTO_SELECT) {
 			const char *data = tuple_data(key);
-			mp_decode_array(&data);
-			hash = key_hash(data, key_def);
+			uint32_t part_count = mp_decode_array(&data);
+			need_lookup = tuple_bloom_possible_has_key(bloom,
+						data, part_count, key_def);
 		} else {
-			hash = tuple_hash(key, key_def);
+			need_lookup = tuple_bloom_possible_has(bloom, key,
+							       key_def);
 		}
-		if (!bloom_possible_has(&run->info.bloom, hash)) {
+		if (!need_lookup) {
 			itr->search_ended = true;
 			itr->stat->bloom_hit++;
 			return 0;
@@ -1378,7 +1338,7 @@ vy_run_iterator_do_seek(struct vy_run_iterator *itr,
 	if (iterator_type == ITER_EQ && !equal_found) {
 		vy_run_iterator_cache_clean(itr);
 		itr->search_ended = true;
-		if (run->info.has_bloom && is_full_key)
+		if (bloom != NULL)
 			itr->stat->bloom_miss++;
 		return 0;
 	}
@@ -1978,10 +1938,10 @@ vy_run_alloc_page_info(struct vy_run *run, uint32_t *page_info_capacity)
 static int
 vy_run_write_page(struct vy_run *run, struct xlog *data_xlog,
 		  struct vy_stmt_stream *wi, struct tuple **curr_stmt,
-		  uint64_t page_size, struct bloom_spectrum *bs,
 		  const struct key_def *cmp_def,
 		  const struct key_def *key_def, bool is_primary,
-		  uint32_t *page_info_capacity)
+		  uint64_t page_size, uint32_t *page_info_capacity,
+		  struct tuple_bloom_builder *bloom_builder)
 {
 	assert(curr_stmt != NULL);
 	assert(*curr_stmt != NULL);
@@ -2032,9 +1992,6 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog,
 				     cmp_def, is_primary) != 0)
 			goto error_rollback;
 
-		if (bs != NULL)
-			bloom_spectrum_add(bs, tuple_hash(*curr_stmt, key_def));
-
 		int64_t lsn = vy_stmt_lsn(*curr_stmt);
 		run->info.min_lsn = MIN(run->info.min_lsn, lsn);
 		run->info.max_lsn = MAX(run->info.max_lsn, lsn);
@@ -2042,6 +1999,14 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog,
 		if (wi->iface->next(wi, curr_stmt))
 			goto error_rollback;
 
+		/* A hash dropped on OOM would yield bloom false negatives. */
+		if (bloom_builder != NULL &&
+		    tuple_bloom_builder_add(bloom_builder, last_stmt, key_def,
+				*curr_stmt == NULL ? 0 :
+				tuple_common_key_parts(*curr_stmt, last_stmt,
+						       key_def)) != 0)
+			goto error_rollback;
+
 		if (*curr_stmt == NULL) {
 			end_of_run = true;
 		} else {
@@ -2124,10 +2089,10 @@ error_row_index:
 static int
 vy_run_write_data(struct vy_run *run, const char *dirpath,
 		  uint32_t space_id, uint32_t iid,
-		  struct vy_stmt_stream *wi, uint64_t page_size,
+		  struct vy_stmt_stream *wi,
 		  const struct key_def *cmp_def,
 		  const struct key_def *key_def,
-		  size_t max_output_count, double bloom_fpr)
+		  uint64_t page_size, double bloom_fpr)
 {
 	struct tuple *stmt;
 
@@ -2141,13 +2106,11 @@ vy_run_write_data(struct vy_run *run, const char *dirpath,
 	if (stmt == NULL)
 		goto done;
 
-	struct bloom_spectrum bs;
-	bool has_bloom = bloom_fpr < 1;
-	if (has_bloom && bloom_spectrum_create(&bs, max_output_count,
-					bloom_fpr, runtime.quota) != 0) {
-		diag_set(OutOfMemory, 0,
-			 "bloom_spectrum_create", "bloom_spectrum");
-		goto err;
+	struct tuple_bloom_builder *bloom_builder = NULL;
+	if (bloom_fpr < 1) {
+		bloom_builder = tuple_bloom_builder_new(key_def->part_count);
+		if (bloom_builder == NULL)
+			goto err;
 	}
 
 	char path[PATH_MAX];
@@ -2171,9 +2134,10 @@ vy_run_write_data(struct vy_run *run, const char *dirpath,
 	uint32_t page_info_capacity = 0;
 	int rc;
 	do {
-		rc = vy_run_write_page(run, &data_xlog, wi, &stmt, page_size,
-				       has_bloom ? &bs : NULL, cmp_def, key_def,
-				       iid == 0, &page_info_capacity);
+		rc = vy_run_write_page(run, &data_xlog, wi, &stmt,
+				       cmp_def, key_def, iid == 0,
+				       page_size, &page_info_capacity,
+				       bloom_builder);
 		if (rc < 0)
 			goto err_close_xlog;
 		fiber_gc();
@@ -2188,10 +2152,11 @@ vy_run_write_data(struct vy_run *run, const char *dirpath,
 	xlog_close(&data_xlog, true);
 	fiber_gc();
 
-	if (has_bloom) {
-		bloom_spectrum_choose(&bs, &run->info.bloom);
-		run->info.has_bloom = true;
-		bloom_spectrum_destroy(&bs, runtime.quota);
+	if (bloom_builder != NULL) {
+		run->info.bloom = tuple_bloom_new(bloom_builder, bloom_fpr);
+		if (run->info.bloom == NULL)
+			goto err_free_bloom;
+		tuple_bloom_builder_delete(bloom_builder);
 	}
 done:
 	wi->iface->stop(wi);
@@ -2201,8 +2166,8 @@ err_close_xlog:
 	xlog_close(&data_xlog, false);
 	fiber_gc();
 err_free_bloom:
-	if (has_bloom)
-		bloom_spectrum_destroy(&bs, runtime.quota);
+	if (bloom_builder != NULL)
+		tuple_bloom_builder_delete(bloom_builder);
 err:
 	wi->iface->stop(wi);
 	return -1;
@@ -2284,42 +2249,6 @@ vy_page_info_encode(const struct vy_page_info *page_info,
 /** {{{ vy_run_info */
 
 /**
- * Calculate the size on disk that is needed to store give bloom filter.
- * @param bloom - storing bloom filter.
- * @return - calculated size.
- */
-static size_t
-vy_run_bloom_encode_size(const struct bloom *bloom)
-{
-	size_t size = mp_sizeof_array(4);
-	size += mp_sizeof_uint(VY_BLOOM_VERSION); /* version */
-	size += mp_sizeof_uint(bloom->table_size);
-	size += mp_sizeof_uint(bloom->hash_count);
-	size += mp_sizeof_bin(bloom_store_size(bloom));
-	return size;
-}
-
-/**
- * Write bloom filter to given buffer.
- * The buffer must have at least vy_run_bloom_encode_size()
- * @param bloom - a bloom filter to write.
- * @param buffer - a buffer to write to.
- * @return - buffer + number of bytes written.
- */
-char *
-vy_run_bloom_encode(const struct bloom *bloom, char *buffer)
-{
-	char *pos = buffer;
-	pos = mp_encode_array(pos, 4);
-	pos = mp_encode_uint(pos, VY_BLOOM_VERSION);
-	pos = mp_encode_uint(pos, bloom->table_size);
-	pos = mp_encode_uint(pos, bloom->hash_count);
-	pos = mp_encode_binl(pos, bloom_store_size(bloom));
-	pos = bloom_store(bloom, pos);
-	return pos;
-}
-
-/**
  * Encode vy_run_info as xrow
  * Allocates using region alloc
  *
@@ -2342,7 +2271,7 @@ vy_run_info_encode(const struct vy_run_info *run_info,
 	size_t max_key_size = tmp - run_info->max_key;
 
 	uint32_t key_count = 5;
-	if (run_info->has_bloom)
+	if (run_info->bloom != NULL)
 		key_count++;
 
 	size_t size = mp_sizeof_map(key_count);
@@ -2354,9 +2283,9 @@ vy_run_info_encode(const struct vy_run_info *run_info,
 		mp_sizeof_uint(run_info->max_lsn);
 	size += mp_sizeof_uint(VY_RUN_INFO_PAGE_COUNT) +
 		mp_sizeof_uint(run_info->page_count);
-	if (run_info->has_bloom)
+	if (run_info->bloom != NULL)
 		size += mp_sizeof_uint(VY_RUN_INFO_BLOOM) +
-			vy_run_bloom_encode_size(&run_info->bloom);
+			tuple_bloom_size(run_info->bloom);
 
 	char *pos = region_alloc(&fiber()->gc, size);
 	if (pos == NULL) {
@@ -2379,9 +2308,9 @@ vy_run_info_encode(const struct vy_run_info *run_info,
 	pos = mp_encode_uint(pos, run_info->max_lsn);
 	pos = mp_encode_uint(pos, VY_RUN_INFO_PAGE_COUNT);
 	pos = mp_encode_uint(pos, run_info->page_count);
-	if (run_info->has_bloom) {
+	if (run_info->bloom != NULL) {
 		pos = mp_encode_uint(pos, VY_RUN_INFO_BLOOM);
-		pos = vy_run_bloom_encode(&run_info->bloom, pos);
+		pos = tuple_bloom_encode(run_info->bloom, pos);
 	}
 	xrow->body->iov_len = (void *)pos - xrow->body->iov_base;
 	xrow->bodycnt = 1;
@@ -2453,10 +2382,10 @@ fail:
 int
 vy_run_write(struct vy_run *run, const char *dirpath,
 	     uint32_t space_id, uint32_t iid,
-	     struct vy_stmt_stream *wi, uint64_t page_size,
+	     struct vy_stmt_stream *wi,
 	     const struct key_def *cmp_def,
 	     const struct key_def *key_def,
-	     size_t max_output_count, double bloom_fpr)
+	     uint64_t page_size, double bloom_fpr)
 {
 	ERROR_INJECT(ERRINJ_VY_RUN_WRITE,
 		     {diag_set(ClientError, ER_INJECTION,
@@ -2466,9 +2395,8 @@ vy_run_write(struct vy_run *run, const char *dirpath,
 	if (inj != NULL && inj->dparam > 0)
 		usleep(inj->dparam * 1000000);
 
-	if (vy_run_write_data(run, dirpath, space_id, iid,
-			      wi, page_size, cmp_def, key_def,
-			      max_output_count, bloom_fpr) != 0)
+	if (vy_run_write_data(run, dirpath, space_id, iid, wi,
+			      cmp_def, key_def, page_size, bloom_fpr) != 0)
 		return -1;
 
 	if (vy_run_is_empty(run))
@@ -2489,7 +2417,7 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 		     struct tuple_format *upsert_format,
 		     const struct index_opts *opts)
 {
-	assert(run->info.has_bloom == false);
+	assert(run->info.bloom == NULL);
 	assert(run->page_info == NULL);
 	struct region *region = &fiber()->gc;
 	size_t mem_used = region_used(region);
@@ -2505,11 +2433,18 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 
 	int rc = 0;
 	uint32_t page_info_capacity = 0;
-	uint32_t run_row_count = 0;
 
 	const char *key = NULL;
 	int64_t max_lsn = 0;
 	int64_t min_lsn = INT64_MAX;
+	struct tuple *prev_tuple = NULL;
+
+	struct tuple_bloom_builder *bloom_builder = NULL;
+	if (opts->bloom_fpr < 1) {
+		bloom_builder = tuple_bloom_builder_new(key_def->part_count);
+		if (bloom_builder == NULL)
+			goto close_err;
+	}
 
 	off_t page_offset, next_page_offset = xlog_cursor_pos(&cursor);
 	while ((rc = xlog_cursor_next_tx(&cursor)) == 0) {
@@ -2536,8 +2471,17 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 					mem_format, upsert_format, iid == 0);
 			if (tuple == NULL)
 				goto close_err;
+			if (bloom_builder != NULL) {
+				uint32_t hashed_parts = prev_tuple == NULL ? 0 :
+					tuple_common_key_parts(prev_tuple,
+							       tuple, key_def);
+				tuple_bloom_builder_add(bloom_builder, tuple,
+							key_def, hashed_parts);
+			}
 			key = tuple_extract_key(tuple, cmp_def, NULL);
-			tuple_unref(tuple);
+			if (prev_tuple != NULL)
+				tuple_unref(prev_tuple);
+			prev_tuple = tuple;
 			if (key == NULL)
 				goto close_err;
 			if (run->info.min_key == NULL) {
@@ -2562,11 +2506,15 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 		info->unpacked_size = xlog_cursor_tx_pos(&cursor);
 		info->row_index_offset = page_row_index_offset;
 		++run->info.page_count;
-		run_row_count += page_row_count;
 		vy_run_acct_page(run, info);
 		region_truncate(region, mem_used);
 	}
 
+	if (prev_tuple != NULL) {
+		tuple_unref(prev_tuple);
+		prev_tuple = NULL;
+	}
+
 	if (key != NULL) {
 		run->info.max_key = vy_key_dup(key);
 		if (run->info.max_key == NULL)
@@ -2575,32 +2523,19 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 	run->info.max_lsn = max_lsn;
 	run->info.min_lsn = min_lsn;
 
-	if (opts->bloom_fpr >= 1)
-		goto done;
-	if (xlog_cursor_reset(&cursor) != 0)
-		goto close_err;
-	if (bloom_create(&run->info.bloom, run_row_count,
-			 opts->bloom_fpr, runtime.quota) != 0) {
-		diag_set(OutOfMemory, 0,
-			 "bloom_create", "bloom");
-		goto close_err;
-	}
-	struct xrow_header xrow;
-	while ((rc = xlog_cursor_next(&cursor, &xrow, false)) == 0) {
-		if (xrow.type == VY_RUN_ROW_INDEX)
-			continue;
-
-		struct tuple *tuple = vy_stmt_decode(&xrow, cmp_def, mem_format,
-						     upsert_format, iid == 0);
-		if (tuple == NULL)
-			goto close_err;
-		bloom_add(&run->info.bloom, tuple_hash(tuple, key_def));
-	}
-	run->info.has_bloom = true;
-done:
 	region_truncate(region, mem_used);
 	run->fd = cursor.fd;
 	xlog_cursor_close(&cursor, true);
+
+	if (bloom_builder != NULL) {
+		run->info.bloom = tuple_bloom_new(bloom_builder,
+						  opts->bloom_fpr);
+		if (run->info.bloom == NULL)
+			goto close_err;
+		tuple_bloom_builder_delete(bloom_builder);
+		bloom_builder = NULL;
+	}
+
 	/* New run index is ready for write, unlink old file if exists */
 	vy_run_snprint_path(path, sizeof(path), dir,
 			    space_id, iid, run->id, VY_FILE_INDEX);
@@ -2615,7 +2550,12 @@ done:
 close_err:
 	vy_run_clear(run);
 	region_truncate(region, mem_used);
-	xlog_cursor_close(&cursor, false);
+	if (prev_tuple != NULL)
+		tuple_unref(prev_tuple);
+	if (bloom_builder != NULL)
+		tuple_bloom_builder_delete(bloom_builder);
+	if (xlog_cursor_is_open(&cursor))
+		xlog_cursor_close(&cursor, false);
 	return -1;
 }
 
diff --git a/src/box/vy_run.h b/src/box/vy_run.h
index de54937c..2ea06dcd 100644
--- a/src/box/vy_run.h
+++ b/src/box/vy_run.h
@@ -43,7 +43,6 @@
 #include "index_def.h"
 
 #include "small/mempool.h"
-#include "salad/bloom.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -82,10 +81,8 @@ struct vy_run_info {
 	int64_t max_lsn;
 	/** Number of pages in the run. */
 	uint32_t page_count;
-	/** Set iff bloom filter is available. */
-	bool has_bloom;
 	/** Bloom filter of all tuples in run */
-	struct bloom bloom;
+	struct tuple_bloom *bloom;
 };
 
 /**
@@ -301,11 +298,11 @@ vy_run_env_destroy(struct vy_run_env *env);
 void
 vy_run_env_enable_coio(struct vy_run_env *env, int threads);
 
-static inline size_t
-vy_run_bloom_size(struct vy_run *run)
-{
-	return run->info.has_bloom ? bloom_store_size(&run->info.bloom) : 0;
-}
+/**
+ * Return the size of a run bloom filter.
+ */
+size_t
+vy_run_bloom_size(struct vy_run *run);
 
 static inline struct vy_page_info *
 vy_run_page_info(struct vy_run *run, uint32_t pos)
@@ -421,10 +418,10 @@ vy_run_remove_files(const char *dir, uint32_t space_id,
 int
 vy_run_write(struct vy_run *run, const char *dirpath,
 	     uint32_t space_id, uint32_t iid,
-	     struct vy_stmt_stream *wi, uint64_t page_size,
+	     struct vy_stmt_stream *wi,
 	     const struct key_def *cmp_def,
 	     const struct key_def *key_def,
-	     size_t max_output_count, double bloom_fpr);
+	     uint64_t page_size, double bloom_fpr);
 
 /**
  * Allocate a new run slice.
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index df8eb87f..4a7e304b 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -131,12 +131,6 @@ struct vy_task {
 	 */
 	struct stailq_entry link;
 	/**
-	 * An estimate of the maximal number of statements that
-	 * can be written by the task. Used to create a bloom
-	 * filter of the perfect size.
-	 */
-	size_t max_output_count;
-	/**
 	 * Index options may be modified while a task is in
 	 * progress so we save them here to safely access them
 	 * from another thread.
@@ -628,9 +622,8 @@ vy_task_dump_execute(struct vy_task *task)
 
 	return vy_run_write(task->new_run, index->env->path,
 			    index->space_id, index->id, task->wi,
-			    task->page_size, index->cmp_def,
-			    index->key_def, task->max_output_count,
-			    task->bloom_fpr);
+			    index->cmp_def, index->key_def,
+			    task->page_size, task->bloom_fpr);
 }
 
 static int
@@ -900,7 +893,6 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index,
 	 * eligible for dump are over.
 	 */
 	int64_t dump_lsn = -1;
-	size_t max_output_count = 0;
 	struct vy_mem *mem, *next_mem;
 	rlist_foreach_entry_safe(mem, &index->sealed, in_sealed, next_mem) {
 		if (mem->generation > scheduler->dump_generation)
@@ -915,10 +907,9 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index,
 			continue;
 		}
 		dump_lsn = MAX(dump_lsn, mem->max_lsn);
-		max_output_count += mem->tree.size;
 	}
 
-	if (max_output_count == 0) {
+	if (dump_lsn < 0) {
 		/* Nothing to do, pick another index. */
 		vy_scheduler_update_index(scheduler, index);
 		vy_scheduler_complete_dump(scheduler);
@@ -934,7 +925,6 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index,
 	if (new_run == NULL)
 		goto err_run;
 
-	assert(dump_lsn >= 0);
 	new_run->dump_lsn = dump_lsn;
 
 	struct vy_stmt_stream *wi;
@@ -953,7 +943,6 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index,
 
 	task->new_run = new_run;
 	task->wi = wi;
-	task->max_output_count = max_output_count;
 	task->bloom_fpr = index->opts.bloom_fpr;
 	task->page_size = index->opts.page_size;
 
@@ -997,9 +986,8 @@ vy_task_compact_execute(struct vy_task *task)
 
 	return vy_run_write(task->new_run, index->env->path,
 			    index->space_id, index->id, task->wi,
-			    task->page_size, index->cmp_def,
-			    index->key_def, task->max_output_count,
-			    task->bloom_fpr);
+			    index->cmp_def, index->key_def,
+			    task->page_size, task->bloom_fpr);
 }
 
 static int
@@ -1231,11 +1219,8 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index,
 	rlist_foreach_entry(slice, &range->slices, in_range) {
 		if (vy_write_iterator_new_slice(wi, slice) != 0)
 			goto err_wi_sub;
-
-		task->max_output_count += slice->count.rows;
 		new_run->dump_lsn = MAX(new_run->dump_lsn,
 					slice->run->dump_lsn);
-
 		/* Remember the slices we are compacting. */
 		if (task->first_slice == NULL)
 			task->first_slice = slice;
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 4655d53f..d5967353 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -1871,33 +1871,6 @@ error:
 	return -1;
 }
 
-int
-xlog_cursor_reset(struct xlog_cursor *cursor)
-{
-	assert(xlog_cursor_is_open(cursor));
-	cursor->rbuf.rpos = cursor->rbuf.buf;
-	if (ibuf_used(&cursor->rbuf) != (size_t)cursor->read_offset) {
-		cursor->rbuf.wpos = cursor->rbuf.buf;
-		cursor->read_offset = 0;
-	}
-	if (cursor->state == XLOG_CURSOR_TX)
-		xlog_tx_cursor_destroy(&cursor->tx_cursor);
-	if (xlog_cursor_ensure(cursor, XLOG_META_LEN_MAX) == -1)
-		return -1;
-	int rc;
-	rc = xlog_meta_parse(&cursor->meta,
-			     (const char **)&cursor->rbuf.rpos,
-			     (const char *)cursor->rbuf.wpos);
-	if (rc == -1)
-		return -1;
-	if (rc > 0) {
-		diag_set(XlogError, "Unexpected end of file");
-		return -1;
-	}
-	cursor->state = XLOG_CURSOR_ACTIVE;
-	return 0;
-}
-
 void
 xlog_cursor_close(struct xlog_cursor *i, bool reuse_fd)
 {
diff --git a/src/box/xlog.h b/src/box/xlog.h
index 973910d1..4b9a5765 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -628,15 +628,6 @@ xlog_cursor_openmem(struct xlog_cursor *cursor, const char *data, size_t size,
 		    const char *name);
 
 /**
- * Reset cursor position
- * @param cursor cursor
- * @retval 0 succes
- * @retval -1 error, check diag
- */
-int
-xlog_cursor_reset(struct xlog_cursor *cursor);
-
-/**
  * Close cursor
  * @param cursor cursor
  */
diff --git a/test/unit/vy_point_lookup.c b/test/unit/vy_point_lookup.c
index d360b3b4..4cc05b6d 100644
--- a/test/unit/vy_point_lookup.c
+++ b/test/unit/vy_point_lookup.c
@@ -163,9 +163,8 @@ test_basic()
 	struct vy_run *run = vy_run_new(&run_env, 1);
 	isnt(run, NULL, "vy_run_new");
 
-	rc = vy_run_write(run, dir_name, 0, pk->id,
-			  write_stream, 4096, pk->cmp_def, pk->key_def,
-			  100500, 0.1);
+	rc = vy_run_write(run, dir_name, 0, pk->id, write_stream,
+			  pk->cmp_def, pk->key_def, 4096, 0.1);
 	is(rc, 0, "vy_run_write");
 
 	write_stream->iface->close(write_stream);
@@ -200,9 +199,8 @@ test_basic()
 	run = vy_run_new(&run_env, 2);
 	isnt(run, NULL, "vy_run_new");
 
-	rc = vy_run_write(run, dir_name, 0, pk->id,
-			  write_stream, 4096, pk->cmp_def, pk->key_def,
-			  100500, 0.1);
+	rc = vy_run_write(run, dir_name, 0, pk->id, write_stream,
+			  pk->cmp_def, pk->key_def, 4096, 0.1);
 	is(rc, 0, "vy_run_write");
 
 	write_stream->iface->close(write_stream);
diff --git a/test/vinyl/bloom.result b/test/vinyl/bloom.result
index 3c5b503d..2c718697 100644
--- a/test/vinyl/bloom.result
+++ b/test/vinyl/bloom.result
@@ -41,7 +41,7 @@ s:drop()
 s = box.schema.space.create('test', {engine = 'vinyl'})
 ---
 ...
-_ = s:create_index('pk')
+_ = s:create_index('pk', {parts = {1, 'unsigned', 2, 'unsigned', 3, 'unsigned', 4, 'unsigned'}})
 ---
 ...
 reflects = 0
@@ -62,7 +62,7 @@ function cur_seeks() return box.space.test.index.pk:info().disk.iterator.lookup
 function new_seeks() local o = seeks seeks = cur_seeks() return seeks - o end
 ---
 ...
-for i = 1,1000 do s:replace{i} end
+for i = 1, 1000 do s:replace{math.ceil(i / 10), math.ceil(i / 2), i, i * 2} end
 ---
 ...
 box.snapshot()
@@ -75,7 +75,29 @@ _ = new_reflects()
 _ = new_seeks()
 ---
 ...
-for i = 1,1000 do s:select{i} end
+for i = 1, 100 do s:select{i} end
+---
+...
+new_reflects() == 0
+---
+- true
+...
+new_seeks() == 100
+---
+- true
+...
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2)} end
+---
+...
+new_reflects() == 0
+---
+- true
+...
+new_seeks() == 1000
+---
+- true
+...
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2), i} end
 ---
 ...
 new_reflects() == 0
@@ -86,7 +108,51 @@ new_seeks() == 1000
 ---
 - true
 ...
-for i = 1001,2000 do s:select{i} end
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2), i, i * 2} end
+---
+...
+new_reflects() == 0
+---
+- true
+...
+new_seeks() == 1000
+---
+- true
+...
+for i = 1001, 2000 do s:select{i} end
+---
+...
+new_reflects() > 980
+---
+- true
+...
+new_seeks() < 20
+---
+- true
+...
+for i = 1, 1000 do s:select{i, i} end
+---
+...
+new_reflects() > 980
+---
+- true
+...
+new_seeks() < 20
+---
+- true
+...
+for i = 1, 1000 do s:select{i, i, i} end
+---
+...
+new_reflects() > 980
+---
+- true
+...
+new_seeks() < 20
+---
+- true
+...
+for i = 1, 1000 do s:select{i, i, i, i} end
 ---
 ...
 new_reflects() > 980
@@ -125,7 +191,29 @@ _ = new_reflects()
 _ = new_seeks()
 ---
 ...
-for i = 1,1000 do s:select{i} end
+for i = 1, 100 do s:select{i} end
+---
+...
+new_reflects() == 0
+---
+- true
+...
+new_seeks() == 100
+---
+- true
+...
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2)} end
+---
+...
+new_reflects() == 0
+---
+- true
+...
+new_seeks() == 1000
+---
+- true
+...
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2), i} end
 ---
 ...
 new_reflects() == 0
@@ -136,7 +224,51 @@ new_seeks() == 1000
 ---
 - true
 ...
-for i = 1001,2000 do s:select{i} end
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2), i, i * 2} end
+---
+...
+new_reflects() == 0
+---
+- true
+...
+new_seeks() == 1000
+---
+- true
+...
+for i = 1001, 2000 do s:select{i} end
+---
+...
+new_reflects() > 980
+---
+- true
+...
+new_seeks() < 20
+---
+- true
+...
+for i = 1, 1000 do s:select{i, i} end
+---
+...
+new_reflects() > 980
+---
+- true
+...
+new_seeks() < 20
+---
+- true
+...
+for i = 1, 1000 do s:select{i, i, i} end
+---
+...
+new_reflects() > 980
+---
+- true
+...
+new_seeks() < 20
+---
+- true
+...
+for i = 1, 1000 do s:select{i, i, i, i} end
 ---
 ...
 new_reflects() > 980
diff --git a/test/vinyl/bloom.test.lua b/test/vinyl/bloom.test.lua
index 5fecfde1..7165eec6 100644
--- a/test/vinyl/bloom.test.lua
+++ b/test/vinyl/bloom.test.lua
@@ -15,7 +15,7 @@ stat.disk.iterator.bloom.miss -- 0
 s:drop()
 
 s = box.schema.space.create('test', {engine = 'vinyl'})
-_ = s:create_index('pk')
+_ = s:create_index('pk', {parts = {1, 'unsigned', 2, 'unsigned', 3, 'unsigned', 4, 'unsigned'}})
 
 reflects = 0
 function cur_reflects() return box.space.test.index.pk:info().disk.iterator.bloom.hit end
@@ -24,16 +24,40 @@ seeks = 0
 function cur_seeks() return box.space.test.index.pk:info().disk.iterator.lookup end
 function new_seeks() local o = seeks seeks = cur_seeks() return seeks - o end
 
-for i = 1,1000 do s:replace{i} end
+for i = 1, 1000 do s:replace{math.ceil(i / 10), math.ceil(i / 2), i, i * 2} end
 box.snapshot()
 _ = new_reflects()
 _ = new_seeks()
 
-for i = 1,1000 do s:select{i} end
+for i = 1, 100 do s:select{i} end
+new_reflects() == 0
+new_seeks() == 100
+
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2)} end
+new_reflects() == 0
+new_seeks() == 1000
+
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2), i} end
 new_reflects() == 0
 new_seeks() == 1000
 
-for i = 1001,2000 do s:select{i} end
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2), i, i * 2} end
+new_reflects() == 0
+new_seeks() == 1000
+
+for i = 1001, 2000 do s:select{i} end
+new_reflects() > 980
+new_seeks() < 20
+
+for i = 1, 1000 do s:select{i, i} end
+new_reflects() > 980
+new_seeks() < 20
+
+for i = 1, 1000 do s:select{i, i, i} end
+new_reflects() > 980
+new_seeks() < 20
+
+for i = 1, 1000 do s:select{i, i, i, i} end
 new_reflects() > 980
 new_seeks() < 20
 
@@ -51,11 +75,35 @@ function new_seeks() local o = seeks seeks = cur_seeks() return seeks - o end
 _ = new_reflects()
 _ = new_seeks()
 
-for i = 1,1000 do s:select{i} end
+for i = 1, 100 do s:select{i} end
+new_reflects() == 0
+new_seeks() == 100
+
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2)} end
+new_reflects() == 0
+new_seeks() == 1000
+
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2), i} end
 new_reflects() == 0
 new_seeks() == 1000
 
-for i = 1001,2000 do s:select{i} end
+for i = 1, 1000 do s:select{math.ceil(i / 10), math.ceil(i / 2), i, i * 2} end
+new_reflects() == 0
+new_seeks() == 1000
+
+for i = 1001, 2000 do s:select{i} end
+new_reflects() > 980
+new_seeks() < 20
+
+for i = 1, 1000 do s:select{i, i} end
+new_reflects() > 980
+new_seeks() < 20
+
+for i = 1, 1000 do s:select{i, i, i} end
+new_reflects() > 980
+new_seeks() < 20
+
+for i = 1, 1000 do s:select{i, i, i, i} end
 new_reflects() > 980
 new_seeks() < 20
 
diff --git a/test/vinyl/info.result b/test/vinyl/info.result
index d87679bf..7605722c 100644
--- a/test/vinyl/info.result
+++ b/test/vinyl/info.result
@@ -266,7 +266,7 @@ stat_diff(istat(), st)
         bytes: 26049
     index_size: 294
     rows: 25
-    bloom_size: 64
+    bloom_size: 70
     pages: 7
     bytes: 26049
     bytes_compressed: <bytes_compressed>
@@ -1009,8 +1009,8 @@ i1:len(), i2:len()
 ...
 i1:bsize(), i2:bsize()
 ---
-- 358
-- 914
+- 364
+- 920
 ...
 s:bsize() == st1.disk.bytes
 ---
@@ -1055,8 +1055,8 @@ i1:len(), i2:len()
 ...
 i1:bsize(), i2:bsize()
 ---
-- 49510
-- 50066
+- 49516
+- 50072
 ...
 s:bsize() == st1.memory.bytes + st1.disk.bytes
 ---
@@ -1105,8 +1105,8 @@ i1:len(), i2:len()
 ...
 i1:bsize(), i2:bsize()
 ---
-- 358
-- 914
+- 364
+- 920
 ...
 s:bsize() == st1.disk.bytes
 ---
-- 
2.11.0




More information about the Tarantool-patches mailing list