[PATCH 01/13] vinyl: store tuple comparison hints in memory tree

Vladimir Davydov vdavydov.dev at gmail.com
Tue Apr 2 20:33:38 MSK 2019


This patch incorporates tuple comparison hints into vy_mem_tree,
similarly to how it was done in case of memtx_tree.

Apart from speeding up lookups, this is also needed for multikey index
support, because multikey indexes will reuse hints to store offsets of
indexed array entries.

While we are at it, rename tree_mem_key to vy_mem_tree_key.
---
 src/box/key_def.h        |  19 +++-
 src/box/tuple_compare.cc |  11 +++
 src/box/vinyl.c          |  27 +++---
 src/box/vy_mem.c         | 224 +++++++++++++++++++++++++++--------------------
 src/box/vy_mem.h         |  59 ++++++++++---
 src/box/vy_stmt.h        |  45 ++++++++++
 6 files changed, 263 insertions(+), 122 deletions(-)

diff --git a/src/box/key_def.h b/src/box/key_def.h
index 288cf727..7bb2986b 100644
--- a/src/box/key_def.h
+++ b/src/box/key_def.h
@@ -577,9 +577,7 @@ tuple_extract_key_raw(const char *data, const char *data_end,
 /**
  * Compare keys using the key definition.
  * @param key_a key parts with MessagePack array header
- * @param part_count_a the number of parts in the key_a
  * @param key_b key_parts with MessagePack array header
- * @param part_count_b the number of parts in the key_b
  * @param key_def key definition
  *
  * @retval 0  if key_a == key_b
@@ -590,6 +588,23 @@ int
 key_compare(const char *key_a, const char *key_b, struct key_def *key_def);
 
 /**
+ * Compare keys using the key definition and comparison hints.
+ * @param key_a key parts with MessagePack array header
+ * @param key_a_hint comparison hint of @a key_a
+ * @param key_b key_parts with MessagePack array header
+ * @param key_b_hint comparison hint of @a key_b
+ * @param key_def key definition
+ *
+ * @retval 0  if key_a == key_b
+ * @retval <0 if key_a < key_b
+ * @retval >0 if key_a > key_b
+ */
+int
+key_compare_hinted(const char *key_a, hint_t key_a_hint,
+		   const char *key_b, hint_t key_b_hint,
+		   struct key_def *key_def);
+
+/**
  * Compare tuples using the key definition.
  * @param tuple_a first tuple
  * @param tuple_b second tuple
diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc
index 93756365..055f3181 100644
--- a/src/box/tuple_compare.cc
+++ b/src/box/tuple_compare.cc
@@ -817,6 +817,17 @@ key_compare(const char *key_a, const char *key_b, struct key_def *key_def)
 	}
 }
 
+int
+key_compare_hinted(const char *key_a, hint_t key_a_hint,
+		   const char *key_b, hint_t key_b_hint,
+		   struct key_def *key_def)
+{
+	int rc = hint_cmp(key_a_hint, key_b_hint);
+	if (rc != 0)
+		return rc;
+	return key_compare(key_a, key_b, key_def);
+}
+
 template <bool is_nullable, bool has_optional_parts>
 static int
 tuple_compare_sequential_hinted(const struct tuple *tuple_a, hint_t tuple_a_hint,
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index da891025..77bacd77 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -3532,6 +3532,8 @@ vy_squash_process(struct vy_squash *squash)
 	if (result == NULL)
 		return 0;
 
+	hint_t hint = vy_stmt_hint(result, lsm->cmp_def);
+
 	/*
 	 * While we were reading on-disk runs, new statements could
 	 * have been prepared for the squashed key. We mustn't apply
@@ -3540,10 +3542,8 @@ vy_squash_process(struct vy_squash *squash)
 	 * vy_lsm_commit_upsert().
 	 */
 	struct vy_mem *mem = lsm->mem;
-	struct tree_mem_key tree_key = {
-		.stmt = result,
-		.lsn = vy_stmt_lsn(result),
-	};
+	struct vy_mem_tree_key tree_key = vy_mem_tree_key(result, hint,
+							  vy_stmt_lsn(result));
 	struct vy_mem_tree_iterator mem_itr =
 		vy_mem_tree_lower_bound(&mem->tree, &tree_key, NULL);
 	if (vy_mem_tree_iterator_is_invalid(&mem_itr)) {
@@ -3557,13 +3557,14 @@ vy_squash_process(struct vy_squash *squash)
 	vy_mem_tree_iterator_prev(&mem->tree, &mem_itr);
 	uint8_t n_upserts = 0;
 	while (!vy_mem_tree_iterator_is_invalid(&mem_itr)) {
-		const struct tuple *mem_stmt;
-		mem_stmt = *vy_mem_tree_iterator_get_elem(&mem->tree, &mem_itr);
-		if (vy_stmt_compare(result, mem_stmt, lsm->cmp_def) != 0 ||
-		    vy_stmt_type(mem_stmt) != IPROTO_UPSERT)
+		struct vy_mem_tree_elem elem;
+		elem = *vy_mem_tree_iterator_get_elem(&mem->tree, &mem_itr);
+		if (vy_stmt_compare_hinted(result, hint, elem.stmt, elem.hint,
+					   lsm->cmp_def) != 0 ||
+		    vy_stmt_type(elem.stmt) != IPROTO_UPSERT)
 			break;
-		assert(vy_stmt_lsn(mem_stmt) >= MAX_LSN);
-		vy_stmt_set_n_upserts((struct tuple *)mem_stmt, n_upserts);
+		assert(vy_stmt_lsn(elem.stmt) >= MAX_LSN);
+		vy_stmt_set_n_upserts((struct tuple *)elem.stmt, n_upserts);
 		if (n_upserts <= VY_UPSERT_THRESHOLD)
 			++n_upserts;
 		vy_mem_tree_iterator_prev(&mem->tree, &mem_itr);
@@ -4166,9 +4167,9 @@ vy_build_recover_mem(struct vy_lsm *lsm, struct vy_lsm *pk, struct vy_mem *mem)
 	struct vy_mem_tree_iterator itr;
 	itr = vy_mem_tree_iterator_last(&mem->tree);
 	while (!vy_mem_tree_iterator_is_invalid(&itr)) {
-		const struct tuple *mem_stmt;
-		mem_stmt = *vy_mem_tree_iterator_get_elem(&mem->tree, &itr);
-		if (vy_build_recover_stmt(lsm, pk, mem_stmt) != 0)
+		struct vy_mem_tree_elem elem;
+		elem = *vy_mem_tree_iterator_get_elem(&mem->tree, &itr);
+		if (vy_build_recover_stmt(lsm, pk, elem.stmt) != 0)
 			return -1;
 		vy_mem_tree_iterator_prev(&mem->tree, &itr);
 	}
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index e3b40083..220751d4 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -70,6 +70,23 @@ vy_mem_env_destroy(struct vy_mem_env *env)
 
 /** {{{ vy_mem */
 
+/**
+ * Assert that a memory tree iterator points to a given statement.
+ */
+static inline void
+vy_mem_tree_iterator_assert(struct vy_mem_tree *tree,
+			    struct vy_mem_tree_iterator *pos,
+			    struct vy_mem_tree_elem expected)
+{
+	assert(!vy_mem_tree_iterator_is_invalid(pos));
+	struct vy_mem_tree_elem elem;
+	elem = *vy_mem_tree_iterator_get_elem(tree, pos);
+	assert(elem.stmt == expected.stmt);
+	assert(elem.hint == expected.hint);
+	(void)elem;
+	(void)expected;
+}
+
 static void *
 vy_mem_tree_extent_alloc(void *ctx)
 {
@@ -134,9 +151,9 @@ vy_mem_delete(struct vy_mem *index)
 const struct tuple *
 vy_mem_older_lsn(struct vy_mem *mem, const struct tuple *stmt)
 {
-	struct tree_mem_key tree_key;
-	tree_key.stmt = stmt;
-	tree_key.lsn = vy_stmt_lsn(stmt) - 1;
+	hint_t hint = vy_stmt_hint(stmt, mem->cmp_def);
+	struct vy_mem_tree_key tree_key = vy_mem_tree_key(stmt, hint,
+						vy_stmt_lsn(stmt) - 1);
 	bool exact = false;
 	struct vy_mem_tree_iterator itr =
 		vy_mem_tree_lower_bound(&mem->tree, &tree_key, &exact);
@@ -144,11 +161,12 @@ vy_mem_older_lsn(struct vy_mem *mem, const struct tuple *stmt)
 	if (vy_mem_tree_iterator_is_invalid(&itr))
 		return NULL;
 
-	const struct tuple *result;
+	struct vy_mem_tree_elem result;
 	result = *vy_mem_tree_iterator_get_elem(&mem->tree, &itr);
-	if (vy_stmt_compare(result, stmt, mem->cmp_def) != 0)
+	if (vy_stmt_compare_hinted(result.stmt, result.hint,
+				   stmt, hint, mem->cmp_def) != 0)
 		return NULL;
-	return result;
+	return result.stmt;
 }
 
 int
@@ -159,15 +177,16 @@ vy_mem_insert_upsert(struct vy_mem *mem, const struct tuple *stmt)
 	assert(stmt->format_id == tuple_format_id(mem->format));
 	/* The statement must be from a lsregion. */
 	assert(!vy_stmt_is_refable(stmt));
+	hint_t hint = vy_stmt_hint(stmt, mem->cmp_def);
 	size_t size = tuple_size(stmt);
-	const struct tuple *replaced_stmt = NULL;
+	struct vy_mem_tree_elem new = vy_mem_tree_elem(stmt, hint);
+	struct vy_mem_tree_elem replaced = vy_mem_tree_elem_invalid();
 	struct vy_mem_tree_iterator inserted;
-	if (vy_mem_tree_insert_get_iterator(&mem->tree, stmt, &replaced_stmt,
+	if (vy_mem_tree_insert_get_iterator(&mem->tree, new, &replaced,
 					    &inserted) != 0)
 		return -1;
-	assert(! vy_mem_tree_iterator_is_invalid(&inserted));
-	assert(*vy_mem_tree_iterator_get_elem(&mem->tree, &inserted) == stmt);
-	if (replaced_stmt == NULL)
+	vy_mem_tree_iterator_assert(&mem->tree, &inserted, new);
+	if (replaced.stmt == NULL)
 		mem->count.rows++;
 	mem->count.bytes += size;
 	/*
@@ -192,12 +211,15 @@ vy_mem_insert_upsert(struct vy_mem *mem, const struct tuple *stmt)
 	 * UPSERTs subsequence.
 	 */
 	vy_mem_tree_iterator_next(&mem->tree, &inserted);
-	const struct tuple **older = vy_mem_tree_iterator_get_elem(&mem->tree,
-								   &inserted);
-	if (older == NULL || vy_stmt_type(*older) != IPROTO_UPSERT ||
-	    vy_stmt_compare(stmt, *older, mem->cmp_def) != 0)
+	if (vy_mem_tree_iterator_is_invalid(&inserted))
 		return 0;
-	uint8_t n_upserts = vy_stmt_n_upserts(*older);
+	struct vy_mem_tree_elem older;
+	older = *vy_mem_tree_iterator_get_elem(&mem->tree, &inserted);
+	if (vy_stmt_type(older.stmt) != IPROTO_UPSERT ||
+	    vy_stmt_compare_hinted(stmt, hint, older.stmt, older.hint,
+				   mem->cmp_def) != 0)
+		return 0;
+	uint8_t n_upserts = vy_stmt_n_upserts(older.stmt);
 	/*
 	 * Stop increment if the threshold is reached to avoid
 	 * creation of multiple squashing tasks.
@@ -219,11 +241,13 @@ vy_mem_insert(struct vy_mem *mem, const struct tuple *stmt)
 	       stmt->format_id == tuple_format_id(mem->format));
 	/* The statement must be from a lsregion. */
 	assert(!vy_stmt_is_refable(stmt));
+	hint_t hint = vy_stmt_hint(stmt, mem->cmp_def);
 	size_t size = tuple_size(stmt);
-	const struct tuple *replaced_stmt = NULL;
-	if (vy_mem_tree_insert(&mem->tree, stmt, &replaced_stmt))
+	struct vy_mem_tree_elem new = vy_mem_tree_elem(stmt, hint);
+	struct vy_mem_tree_elem replaced = vy_mem_tree_elem_invalid();
+	if (vy_mem_tree_insert(&mem->tree, new, &replaced))
 		return -1;
-	if (replaced_stmt == NULL)
+	if (replaced.stmt == NULL)
 		mem->count.rows++;
 	mem->count.bytes += size;
 	/*
@@ -261,7 +285,9 @@ vy_mem_rollback_stmt(struct vy_mem *mem, const struct tuple *stmt)
 {
 	/* This is the statement we've inserted before. */
 	assert(!vy_stmt_is_refable(stmt));
-	int rc = vy_mem_tree_delete(&mem->tree, stmt);
+	hint_t hint = vy_stmt_hint(stmt, mem->cmp_def);
+	struct vy_mem_tree_elem elem = vy_mem_tree_elem(stmt, hint);
+	int rc = vy_mem_tree_delete(&mem->tree, elem);
 	assert(rc == 0);
 	(void) rc;
 	/* We can't free memory in case of rollback. */
@@ -274,15 +300,6 @@ vy_mem_rollback_stmt(struct vy_mem *mem, const struct tuple *stmt)
 /* {{{ vy_mem_iterator support functions */
 
 /**
- * Get a stmt by current position
- */
-static const struct tuple *
-vy_mem_iterator_curr_stmt(struct vy_mem_iterator *itr)
-{
-	return *vy_mem_tree_iterator_get_elem(&itr->mem->tree, &itr->curr_pos);
-}
-
-/**
  * Make a step in the iterator direction.
  * @retval 0 success
  * @retval 1 EOF
@@ -296,7 +313,8 @@ vy_mem_iterator_step(struct vy_mem_iterator *itr)
 		vy_mem_tree_iterator_next(&itr->mem->tree, &itr->curr_pos);
 	if (vy_mem_tree_iterator_is_invalid(&itr->curr_pos))
 		return 1;
-	itr->curr_stmt = vy_mem_iterator_curr_stmt(itr);
+	itr->curr = *vy_mem_tree_iterator_get_elem(&itr->mem->tree,
+						   &itr->curr_pos);
 	return 0;
 }
 
@@ -311,16 +329,18 @@ vy_mem_iterator_step(struct vy_mem_iterator *itr)
 static int
 vy_mem_iterator_find_lsn(struct vy_mem_iterator *itr)
 {
+	vy_mem_tree_iterator_assert(&itr->mem->tree, &itr->curr_pos, itr->curr);
+
 	/* Skip to the first statement visible in the read view. */
-	assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
-	assert(itr->curr_stmt == vy_mem_iterator_curr_stmt(itr));
 	struct key_def *cmp_def = itr->mem->cmp_def;
-	while (vy_stmt_lsn(itr->curr_stmt) > (**itr->read_view).vlsn ||
-	       vy_stmt_flags(itr->curr_stmt) & VY_STMT_SKIP_READ) {
+	while (vy_stmt_lsn(itr->curr.stmt) > (**itr->read_view).vlsn ||
+	       vy_stmt_flags(itr->curr.stmt) & VY_STMT_SKIP_READ) {
 		if (vy_mem_iterator_step(itr) != 0 ||
 		    (itr->iterator_type == ITER_EQ &&
-		     vy_stmt_compare(itr->key, itr->curr_stmt, cmp_def))) {
-			itr->curr_stmt = NULL;
+		     vy_stmt_compare_hinted(itr->key, itr->hint,
+					    itr->curr.stmt, itr->curr.hint,
+					    cmp_def) != 0)) {
+			itr->curr = vy_mem_tree_elem_invalid();
 			return 1;
 		}
 	}
@@ -337,10 +357,11 @@ vy_mem_iterator_find_lsn(struct vy_mem_iterator *itr)
 		/* No more statements. */
 		return 0;
 	}
-	const struct tuple *prev_stmt;
-	prev_stmt = *vy_mem_tree_iterator_get_elem(&itr->mem->tree, &prev_pos);
-	if (vy_stmt_lsn(prev_stmt) > (**itr->read_view).vlsn ||
-	    vy_stmt_compare(itr->curr_stmt, prev_stmt, cmp_def) != 0) {
+	struct vy_mem_tree_elem prev;
+	prev = *vy_mem_tree_iterator_get_elem(&itr->mem->tree, &prev_pos);
+	if (vy_stmt_lsn(prev.stmt) > (**itr->read_view).vlsn ||
+	    vy_stmt_compare_hinted(itr->curr.stmt, itr->curr.hint,
+				   prev.stmt, prev.hint, cmp_def) != 0) {
 		/*
 		 * The next statement is either invisible in
 		 * the read view or for another key.
@@ -354,21 +375,20 @@ vy_mem_iterator_find_lsn(struct vy_mem_iterator *itr)
 	 * is going to take long. So instead we look it up - it's
 	 * pretty cheap anyway.
 	 */
-	struct tree_mem_key tree_key;
-	tree_key.stmt = itr->curr_stmt;
-	tree_key.lsn = (**itr->read_view).vlsn;
+	struct vy_mem_tree_key tree_key = vy_mem_tree_key(itr->curr.stmt,
+				itr->curr.hint, (**itr->read_view).vlsn);
 	itr->curr_pos = vy_mem_tree_lower_bound(&itr->mem->tree,
 						&tree_key, NULL);
 	assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
-	itr->curr_stmt = *vy_mem_tree_iterator_get_elem(&itr->mem->tree,
-							&itr->curr_pos);
+	itr->curr = *vy_mem_tree_iterator_get_elem(&itr->mem->tree,
+						   &itr->curr_pos);
 
 	/* Skip VY_STMT_SKIP_READ statements, if any. */
-	while (vy_stmt_flags(itr->curr_stmt) & VY_STMT_SKIP_READ) {
+	while (vy_stmt_flags(itr->curr.stmt) & VY_STMT_SKIP_READ) {
 		vy_mem_tree_iterator_next(&itr->mem->tree, &itr->curr_pos);
 		assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
-		itr->curr_stmt = *vy_mem_tree_iterator_get_elem(&itr->mem->tree,
-								&itr->curr_pos);
+		itr->curr = *vy_mem_tree_iterator_get_elem(&itr->mem->tree,
+							   &itr->curr_pos);
 	}
 	return 0;
 }
@@ -382,27 +402,27 @@ vy_mem_iterator_find_lsn(struct vy_mem_iterator *itr)
  * @retval 1 Not found
  */
 static int
-vy_mem_iterator_seek(struct vy_mem_iterator *itr, const struct tuple *last_key)
+vy_mem_iterator_seek(struct vy_mem_iterator *itr, const struct tuple *last_key,
+		     hint_t last_hint)
 {
 	itr->stat->lookup++;
 	itr->search_started = true;
 	itr->version = itr->mem->version;
-	itr->curr_stmt = NULL;
+	itr->curr = vy_mem_tree_elem_invalid();
 
-	const struct tuple *key = itr->key;
+	struct vy_mem_tree_key tree_key;
+	/* (lsn == INT64_MAX - 1) means that lsn is ignored in comparison */
+	tree_key = vy_mem_tree_key(itr->key, itr->hint, INT64_MAX - 1);
 	enum iterator_type iterator_type = itr->iterator_type;
 	if (last_key != NULL) {
-		key = last_key;
+		tree_key.stmt = last_key;
+		tree_key.hint = last_hint;
 		iterator_type = iterator_direction(itr->iterator_type) > 0 ?
 				ITER_GT : ITER_LT;
 	}
 
 	bool exact;
-	struct tree_mem_key tree_key;
-	tree_key.stmt = key;
-	/* (lsn == INT64_MAX - 1) means that lsn is ignored in comparison */
-	tree_key.lsn = INT64_MAX - 1;
-	if (!vy_stmt_is_empty_key(key)) {
+	if (!vy_stmt_is_empty_key(tree_key.stmt)) {
 		if (iterator_type == ITER_LE || iterator_type == ITER_GT) {
 			itr->curr_pos =
 				vy_mem_tree_upper_bound(&itr->mem->tree,
@@ -426,12 +446,15 @@ vy_mem_iterator_seek(struct vy_mem_iterator *itr, const struct tuple *last_key)
 		vy_mem_tree_iterator_prev(&itr->mem->tree, &itr->curr_pos);
 	if (vy_mem_tree_iterator_is_invalid(&itr->curr_pos))
 		return 1;
-	itr->curr_stmt = vy_mem_iterator_curr_stmt(itr);
+	itr->curr = *vy_mem_tree_iterator_get_elem(&itr->mem->tree,
+						   &itr->curr_pos);
 	if (itr->iterator_type == ITER_EQ &&
 	    ((last_key == NULL && !exact) ||
-	     (last_key != NULL && vy_stmt_compare(itr->key, itr->curr_stmt,
-						  itr->mem->cmp_def) != 0))) {
-		itr->curr_stmt = NULL;
+	     (last_key != NULL &&
+	      vy_stmt_compare_hinted(itr->key, itr->hint,
+				     itr->curr.stmt, itr->curr.hint,
+				     itr->mem->cmp_def) != 0))) {
+		itr->curr = vy_mem_tree_elem_invalid();
 		return 1;
 	}
 	return vy_mem_iterator_find_lsn(itr);
@@ -453,10 +476,11 @@ vy_mem_iterator_open(struct vy_mem_iterator *itr, struct vy_mem_iterator_stat *s
 
 	itr->iterator_type = iterator_type;
 	itr->key = key;
+	itr->hint = vy_stmt_hint(key, mem->cmp_def);
 	itr->read_view = rv;
 
 	itr->curr_pos = vy_mem_tree_invalid_iterator();
-	itr->curr_stmt = NULL;
+	itr->curr = vy_mem_tree_elem_invalid();
 
 	itr->search_started = false;
 }
@@ -470,17 +494,18 @@ static NODISCARD int
 vy_mem_iterator_next_key(struct vy_mem_iterator *itr)
 {
 	if (!itr->search_started)
-		return vy_mem_iterator_seek(itr, NULL);
-	if (!itr->curr_stmt) /* End of search. */
+		return vy_mem_iterator_seek(itr, NULL, HINT_NONE);
+	if (itr->curr.stmt == NULL)
 		return 1;
+
 	assert(itr->mem->version == itr->version);
-	assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
-	assert(itr->curr_stmt == vy_mem_iterator_curr_stmt(itr));
+	vy_mem_tree_iterator_assert(&itr->mem->tree, &itr->curr_pos, itr->curr);
+
 	struct key_def *cmp_def = itr->mem->cmp_def;
+	struct vy_mem_tree_elem prev = itr->curr;
 
-	const struct tuple *prev_stmt = itr->curr_stmt;
 	if (vy_mem_iterator_step(itr) != 0) {
-		itr->curr_stmt = NULL;
+		itr->curr = vy_mem_tree_elem_invalid();
 		return 1;
 	}
 	/*
@@ -489,12 +514,14 @@ vy_mem_iterator_next_key(struct vy_mem_iterator *itr)
 	 * for this key so instead of iterating further we simply
 	 * look up the next key - it's pretty cheap anyway.
 	 */
-	if (vy_stmt_compare(prev_stmt, itr->curr_stmt, cmp_def) == 0)
-		return vy_mem_iterator_seek(itr, itr->curr_stmt);
+	if (vy_stmt_compare_hinted(prev.stmt, prev.hint, itr->curr.stmt,
+				   itr->curr.hint, cmp_def) == 0)
+		return vy_mem_iterator_seek(itr, itr->curr.stmt, itr->curr.hint);
 
 	if (itr->iterator_type == ITER_EQ &&
-	    vy_stmt_compare(itr->key, itr->curr_stmt, cmp_def) != 0) {
-		itr->curr_stmt = NULL;
+	    vy_stmt_compare_hinted(itr->key, itr->hint, itr->curr.stmt,
+				   itr->curr.hint, cmp_def) != 0) {
+		itr->curr = vy_mem_tree_elem_invalid();
 		return 1;
 	}
 	return vy_mem_iterator_find_lsn(itr);
@@ -509,27 +536,28 @@ static NODISCARD int
 vy_mem_iterator_next_lsn(struct vy_mem_iterator *itr)
 {
 	assert(itr->search_started);
-	if (!itr->curr_stmt) /* End of search. */
+	if (itr->curr.stmt == NULL)
 		return 1;
+
 	assert(itr->mem->version == itr->version);
-	assert(!vy_mem_tree_iterator_is_invalid(&itr->curr_pos));
-	assert(itr->curr_stmt == vy_mem_iterator_curr_stmt(itr));
+	vy_mem_tree_iterator_assert(&itr->mem->tree, &itr->curr_pos, itr->curr);
+
 	struct key_def *cmp_def = itr->mem->cmp_def;
-
 	struct vy_mem_tree_iterator next_pos = itr->curr_pos;
 next:
 	vy_mem_tree_iterator_next(&itr->mem->tree, &next_pos);
 	if (vy_mem_tree_iterator_is_invalid(&next_pos))
 		return 1; /* EOF */
 
-	const struct tuple *next_stmt;
-	next_stmt = *vy_mem_tree_iterator_get_elem(&itr->mem->tree, &next_pos);
-	if (vy_stmt_compare(itr->curr_stmt, next_stmt, cmp_def) != 0)
+	struct vy_mem_tree_elem next;
+	next = *vy_mem_tree_iterator_get_elem(&itr->mem->tree, &next_pos);
+	if (vy_stmt_compare_hinted(itr->curr.stmt, itr->curr.hint,
+				   next.stmt, next.hint, cmp_def) != 0)
 		return 1;
 
 	itr->curr_pos = next_pos;
-	itr->curr_stmt = next_stmt;
-	if (vy_stmt_flags(itr->curr_stmt) & VY_STMT_SKIP_READ)
+	itr->curr = next;
+	if (vy_stmt_flags(itr->curr.stmt) & VY_STMT_SKIP_READ)
 		goto next;
 	return 0;
 }
@@ -544,7 +572,7 @@ vy_mem_iterator_get_history(struct vy_mem_iterator *itr,
 			    struct vy_history *history)
 {
 	do {
-		struct tuple *stmt = (struct tuple *)itr->curr_stmt;
+		struct tuple *stmt = (struct tuple *)itr->curr.stmt;
 		vy_stmt_counter_acct_tuple(&itr->stat->get, stmt);
 		if (vy_history_append_stmt(history, stmt) != 0)
 			return -1;
@@ -571,18 +599,22 @@ vy_mem_iterator_skip(struct vy_mem_iterator *itr,
 {
 	assert(!itr->search_started || itr->version == itr->mem->version);
 
+	hint_t last_hint = last_stmt == NULL ? HINT_NONE :
+			   vy_stmt_hint(last_stmt, itr->mem->cmp_def);
+
 	/*
 	 * Check if the iterator is already positioned
 	 * at the statement following last_stmt.
 	 */
 	if (itr->search_started &&
-	    (itr->curr_stmt == NULL || last_stmt == NULL ||
+	    (itr->curr.stmt == NULL || last_stmt == NULL ||
 	     iterator_direction(itr->iterator_type) *
-	     vy_stmt_compare(itr->curr_stmt, last_stmt, itr->mem->cmp_def) > 0))
+	     vy_stmt_compare_hinted(itr->curr.stmt, itr->curr.hint, last_stmt,
+				    last_hint, itr->mem->cmp_def) > 0))
 		return 0;
 
 	vy_history_cleanup(history);
-	if (vy_mem_iterator_seek(itr, last_stmt) == 0)
+	if (vy_mem_iterator_seek(itr, last_stmt, last_hint) == 0)
 		return vy_mem_iterator_get_history(itr, history);
 	return 0;
 }
@@ -595,10 +627,12 @@ vy_mem_iterator_restore(struct vy_mem_iterator *itr,
 	if (!itr->search_started || itr->version == itr->mem->version)
 		return 0;
 
-	vy_mem_iterator_seek(itr, last_stmt);
+	hint_t last_hint = last_stmt == NULL ? HINT_NONE :
+			   vy_stmt_hint(last_stmt, itr->mem->cmp_def);
+	vy_mem_iterator_seek(itr, last_stmt, last_hint);
 
 	vy_history_cleanup(history);
-	if (itr->curr_stmt != NULL &&
+	if (itr->curr.stmt != NULL &&
 	    vy_mem_iterator_get_history(itr, history) != 0)
 		return -1;
 	return 1;
@@ -616,16 +650,16 @@ vy_mem_stream_next(struct vy_stmt_stream *virt_stream, struct tuple **ret)
 	assert(virt_stream->iface->next == vy_mem_stream_next);
 	struct vy_mem_stream *stream = (struct vy_mem_stream *)virt_stream;
 
-	struct tuple **res = (struct tuple **)
-		vy_mem_tree_iterator_get_elem(&stream->mem->tree,
-					      &stream->curr_pos);
-	if (res == NULL) {
+	if (vy_mem_tree_iterator_is_invalid(&stream->curr_pos)) {
 		*ret = NULL;
-	} else {
-		*ret = *res;
-		vy_mem_tree_iterator_next(&stream->mem->tree,
-					  &stream->curr_pos);
+		return 0;
 	}
+
+	struct vy_mem_tree_elem elem;
+	elem = *vy_mem_tree_iterator_get_elem(&stream->mem->tree,
+					      &stream->curr_pos);
+	*ret = (struct tuple *)elem.stmt;
+	vy_mem_tree_iterator_next(&stream->mem->tree, &stream->curr_pos);
 	return 0;
 }
 
diff --git a/src/box/vy_mem.h b/src/box/vy_mem.h
index 39f238b3..160bf5e1 100644
--- a/src/box/vy_mem.h
+++ b/src/box/vy_mem.h
@@ -78,22 +78,54 @@ vy_mem_env_destroy(struct vy_mem_env *env);
 
 /** @cond false */
 
-struct tree_mem_key {
+struct vy_mem_tree_elem {
 	const struct tuple *stmt;
+	hint_t hint;
+};
+
+struct vy_mem_tree_key {
+	const struct tuple *stmt;
+	hint_t hint;
 	int64_t lsn;
 };
 
+static inline struct vy_mem_tree_key
+vy_mem_tree_key(const struct tuple *stmt, hint_t hint, int64_t lsn)
+{
+	struct vy_mem_tree_key key;
+	key.stmt = stmt;
+	key.hint = hint;
+	key.lsn = lsn;
+	return key;
+}
+
+static inline struct vy_mem_tree_elem
+vy_mem_tree_elem(const struct tuple *stmt, hint_t hint)
+{
+	struct vy_mem_tree_elem elem;
+	elem.stmt = stmt;
+	elem.hint = hint;
+	return elem;
+}
+
+static inline struct vy_mem_tree_elem
+vy_mem_tree_elem_invalid(void)
+{
+	return vy_mem_tree_elem(NULL, HINT_NONE);
+}
+
 /**
  * Internal. Extracted to speed up BPS tree.
  */
 static int
-vy_mem_tree_cmp(const struct tuple *a, const struct tuple *b,
+vy_mem_tree_cmp(struct vy_mem_tree_elem a, struct vy_mem_tree_elem b,
 		struct key_def *cmp_def)
 {
-	int res = vy_stmt_compare(a, b, cmp_def);
+	int res = vy_stmt_compare_hinted(a.stmt, a.hint,
+					 b.stmt, b.hint, cmp_def);
 	if (res)
 		return res;
-	int64_t a_lsn = vy_stmt_lsn(a), b_lsn = vy_stmt_lsn(b);
+	int64_t a_lsn = vy_stmt_lsn(a.stmt), b_lsn = vy_stmt_lsn(b.stmt);
 	return a_lsn > b_lsn ? -1 : a_lsn < b_lsn;
 }
 
@@ -101,14 +133,15 @@ vy_mem_tree_cmp(const struct tuple *a, const struct tuple *b,
  * Internal. Extracted to speed up BPS tree.
  */
 static int
-vy_mem_tree_cmp_key(const struct tuple *a, struct tree_mem_key *key,
+vy_mem_tree_cmp_key(struct vy_mem_tree_elem elem, struct vy_mem_tree_key *key,
 		    struct key_def *cmp_def)
 {
-	int res = vy_stmt_compare(a, key->stmt, cmp_def);
+	int res = vy_stmt_compare_hinted(elem.stmt, elem.hint,
+					 key->stmt, key->hint, cmp_def);
 	if (res == 0) {
 		if (key->lsn == INT64_MAX - 1)
 			return 0;
-		int64_t a_lsn = vy_stmt_lsn(a);
+		int64_t a_lsn = vy_stmt_lsn(elem.stmt);
 		res = a_lsn > key->lsn ? -1 : a_lsn < key->lsn;
 	}
 	return res;
@@ -121,8 +154,8 @@ vy_mem_tree_cmp_key(const struct tuple *a, struct tree_mem_key *key,
 #define BPS_TREE_EXTENT_SIZE VY_MEM_TREE_EXTENT_SIZE
 #define BPS_TREE_COMPARE(a, b, cmp_def) vy_mem_tree_cmp(a, b, cmp_def)
 #define BPS_TREE_COMPARE_KEY(a, b, cmp_def) vy_mem_tree_cmp_key(a, b, cmp_def)
-#define bps_tree_elem_t const struct tuple *
-#define bps_tree_key_t struct tree_mem_key *
+#define bps_tree_elem_t struct vy_mem_tree_elem
+#define bps_tree_key_t struct vy_mem_tree_key *
 #define bps_tree_arg_t struct key_def *
 #define BPS_TREE_NO_DEBUG
 
@@ -346,6 +379,8 @@ struct vy_mem_iterator {
 	enum iterator_type iterator_type;
 	/** Key to search. */
 	const struct tuple *key;
+	/** Comparison hint of the search key. */
+	hint_t hint;
 	/* LSN visibility, iterator shows values with lsn <= than that */
 	const struct vy_read_view **read_view;
 
@@ -354,11 +389,11 @@ struct vy_mem_iterator {
 	struct vy_mem_tree_iterator curr_pos;
 	/*
 	 * The pointer on a region allocated statement from vy_mem BPS tree.
-	 * There is no guarantee that curr_pos points on curr_stmt in the tree.
-	 * For example, cur_pos can be invalid but curr_stmt can point on a
+	 * There is no guarantee that curr_pos points on curr in the tree.
+	 * For example, cur_pos can be invalid but curr can point on a
 	 * valid statement.
 	 */
-	const struct tuple *curr_stmt;
+	struct vy_mem_tree_elem curr;
 	/* data version from vy_mem */
 	uint32_t version;
 
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 2da2593f..928ae348 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -363,6 +363,21 @@ vy_stmt_unref_if_possible(struct tuple *stmt)
 }
 
 /**
+ * Return a comparison hint of a vinyl statement.
+ */
+static inline hint_t
+vy_stmt_hint(const struct tuple *stmt, struct key_def *key_def)
+{
+	if (vy_stmt_is_key(stmt)) {
+		const char *key = tuple_data(stmt);
+		uint32_t part_count = mp_decode_array(&key);
+		return key_hint(key, part_count, key_def);
+	} else {
+		return tuple_hint(stmt, key_def);
+	}
+}
+
+/**
  * Compare two vinyl statements taking into account their
  * formats (key or tuple).
  */
@@ -389,6 +404,36 @@ vy_stmt_compare(const struct tuple *a, const struct tuple *b,
 }
 
 /**
+ * Compare two vinyl statements taking into account their
+ * formats (key or tuple) and using comparison hints.
+ */
+static inline int
+vy_stmt_compare_hinted(const struct tuple *a, hint_t a_hint,
+		       const struct tuple *b, hint_t b_hint,
+		       struct key_def *key_def)
+{
+	bool a_is_tuple = !vy_stmt_is_key(a);
+	bool b_is_tuple = !vy_stmt_is_key(b);
+	if (a_is_tuple && b_is_tuple) {
+		return tuple_compare_hinted(a, a_hint, b, b_hint, key_def);
+	} else if (a_is_tuple && !b_is_tuple) {
+		const char *key = tuple_data(b);
+		uint32_t part_count = mp_decode_array(&key);
+		return tuple_compare_with_key_hinted(a, a_hint, key, part_count,
+						     b_hint, key_def);
+	} else if (!a_is_tuple && b_is_tuple) {
+		const char *key = tuple_data(a);
+		uint32_t part_count = mp_decode_array(&key);
+		return -tuple_compare_with_key_hinted(b, b_hint, key, part_count,
+						      a_hint, key_def);
+	} else {
+		assert(!a_is_tuple && !b_is_tuple);
+		return key_compare_hinted(tuple_data(a), a_hint,
+					  tuple_data(b), b_hint, key_def);
+	}
+}
+
+/**
  * Compare a vinyl statement (key or tuple) with a raw key
  * (msgpack array).
  */
-- 
2.11.0




More information about the Tarantool-patches mailing list