[PATCH v2 01/14] vinyl: remove optimized comparators

Vladimir Davydov vdavydov.dev at gmail.com
Wed Mar 13 11:52:47 MSK 2019


A vinyl statement (vy_stmt struct) may represent either a tuple or a
key. We differentiate between the two kinds by statement type - we use
SELECT for keys and other types for tuples. This was done so that we
could pass both tuples and keys to a read iterator as a search key.
To avoid branching in comparators when the types of compared
statements are known in advance, we provide several comparators, each of
which expects certain statement types, e.g. a tuple and a key. Actually,
such a micro-optimization looks like overkill, because a typical
comparator is called via a function pointer and already contains a lot
of comparisons, see tuple_compare_slowpath for instance. Eliminating one
branch will hardly make the code perform better. At the same time, it
makes the code more difficult to write. Besides, once we remove nils
from statements read from disk (aka surrogate tuples), which will
ease the implementation of multikey indexes, the number of places where
the types of compared statements are known will diminish drastically.
That said, let's remove optimized comparators and always use
vy_stmt_compare, which checks types of compared statements and calls
the appropriate comparator.
---
 src/box/vinyl.c             |   8 +--
 src/box/vy_cache.c          |   8 +--
 src/box/vy_cache.h          |   2 +-
 src/box/vy_lsm.c            |   8 +--
 src/box/vy_mem.c            |  13 +++--
 src/box/vy_mem.h            |   2 +-
 src/box/vy_range.c          |  13 +++--
 src/box/vy_read_iterator.c  |  22 ++++----
 src/box/vy_run.c            |  36 ++++++-------
 src/box/vy_stmt.h           | 121 +++++++-------------------------------------
 src/box/vy_tx.c             |   6 +--
 src/box/vy_upsert.c         |   2 +-
 src/box/vy_write_iterator.c |   2 +-
 13 files changed, 76 insertions(+), 167 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 63ceab41..4ffa79bf 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1305,7 +1305,7 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
 		return -1;
 
 	if (*result == NULL ||
-	    vy_tuple_compare(*result, tuple, lsm->key_def) != 0) {
+	    vy_stmt_compare(*result, tuple, lsm->key_def) != 0) {
 		/*
 		 * If a tuple read from a secondary index doesn't
 		 * match the tuple corresponding to it in the
@@ -1524,7 +1524,7 @@ vy_check_is_unique_secondary(struct vy_tx *tx, const struct vy_read_view **rv,
 	 * fail here.
 	 */
 	if (found != NULL && vy_stmt_type(stmt) == IPROTO_REPLACE &&
-	    vy_tuple_compare(stmt, found, lsm->pk->key_def) == 0) {
+	    vy_stmt_compare(stmt, found, lsm->pk->key_def) == 0) {
 		tuple_unref(found);
 		return 0;
 	}
@@ -1729,7 +1729,7 @@ vy_check_update(struct space *space, const struct vy_lsm *pk,
 		uint64_t column_mask)
 {
 	if (!key_update_can_be_skipped(pk->key_def->column_mask, column_mask) &&
-	    vy_tuple_compare(old_tuple, new_tuple, pk->key_def) != 0) {
+	    vy_stmt_compare(old_tuple, new_tuple, pk->key_def) != 0) {
 		diag_set(ClientError, ER_CANT_UPDATE_PRIMARY_KEY,
 			 index_name_by_id(space, pk->index_id),
 			 space_name(space));
@@ -3513,7 +3513,7 @@ vy_squash_process(struct vy_squash *squash)
 	while (!vy_mem_tree_iterator_is_invalid(&mem_itr)) {
 		const struct tuple *mem_stmt;
 		mem_stmt = *vy_mem_tree_iterator_get_elem(&mem->tree, &mem_itr);
-		if (vy_tuple_compare(result, mem_stmt, lsm->cmp_def) != 0 ||
+		if (vy_stmt_compare(result, mem_stmt, lsm->cmp_def) != 0 ||
 		    vy_stmt_type(mem_stmt) != IPROTO_UPSERT)
 			break;
 		assert(vy_stmt_lsn(mem_stmt) >= MAX_LSN);
diff --git a/src/box/vy_cache.c b/src/box/vy_cache.c
index 320ea04d..8354b1c2 100644
--- a/src/box/vy_cache.c
+++ b/src/box/vy_cache.c
@@ -370,8 +370,8 @@ vy_cache_add(struct vy_cache *cache, struct tuple *stmt,
 							&inserted);
 		assert(*prev_check_entry != NULL);
 		struct tuple *prev_check_stmt = (*prev_check_entry)->stmt;
-		int cmp = vy_tuple_compare(prev_stmt, prev_check_stmt,
-					   cache->cmp_def);
+		int cmp = vy_stmt_compare(prev_stmt, prev_check_stmt,
+					  cache->cmp_def);
 
 		if (entry->flags & flag) {
 			/* The found entry must be exactly prev_stmt. (2) */
@@ -722,8 +722,8 @@ vy_cache_iterator_skip(struct vy_cache_iterator *itr,
 	if (itr->search_started &&
 	    (itr->curr_stmt == NULL || last_stmt == NULL ||
 	     iterator_direction(itr->iterator_type) *
-	     vy_tuple_compare(itr->curr_stmt, last_stmt,
-			      itr->cache->cmp_def) > 0))
+	     vy_stmt_compare(itr->curr_stmt, last_stmt,
+			     itr->cache->cmp_def) > 0))
 		return 0;
 
 	*stop = false;
diff --git a/src/box/vy_cache.h b/src/box/vy_cache.h
index 7cb93155..a846622e 100644
--- a/src/box/vy_cache.h
+++ b/src/box/vy_cache.h
@@ -74,7 +74,7 @@ static inline int
 vy_cache_tree_cmp(struct vy_cache_entry *a,
 		  struct vy_cache_entry *b, struct key_def *cmp_def)
 {
-	return vy_tuple_compare(a->stmt, b->stmt, cmp_def);
+	return vy_stmt_compare(a->stmt, b->stmt, cmp_def);
 }
 
 /**
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 07da145b..52183799 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -405,7 +405,7 @@ vy_lsm_recover_slice(struct vy_lsm *lsm, struct vy_range *range,
 			goto out;
 	}
 	if (begin != NULL && end != NULL &&
-	    vy_key_compare(begin, end, lsm->cmp_def) >= 0) {
+	    vy_stmt_compare(begin, end, lsm->cmp_def) >= 0) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("begin >= end for slice %lld",
 				    (long long)slice_info->id));
@@ -451,7 +451,7 @@ vy_lsm_recover_range(struct vy_lsm *lsm,
 			goto out;
 	}
 	if (begin != NULL && end != NULL &&
-	    vy_key_compare(begin, end, lsm->cmp_def) >= 0) {
+	    vy_stmt_compare(begin, end, lsm->cmp_def) >= 0) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("begin >= end for range %lld",
 				    (long long)range_info->id));
@@ -632,8 +632,8 @@ vy_lsm_recover(struct vy_lsm *lsm, struct vy_recovery *recovery,
 		int cmp = 0;
 		if (prev != NULL &&
 		    (prev->end == NULL || range->begin == NULL ||
-		     (cmp = vy_key_compare(prev->end, range->begin,
-					   lsm->cmp_def)) != 0)) {
+		     (cmp = vy_stmt_compare(prev->end, range->begin,
+					    lsm->cmp_def)) != 0)) {
 			const char *errmsg = cmp > 0 ?
 				"Nearby ranges %lld and %lld overlap" :
 				"Keys between ranges %lld and %lld not spanned";
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index 6bc3859c..c47a3c36 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -146,7 +146,7 @@ vy_mem_older_lsn(struct vy_mem *mem, const struct tuple *stmt)
 
 	const struct tuple *result;
 	result = *vy_mem_tree_iterator_get_elem(&mem->tree, &itr);
-	if (vy_tuple_compare(result, stmt, mem->cmp_def) != 0)
+	if (vy_stmt_compare(result, stmt, mem->cmp_def) != 0)
 		return NULL;
 	return result;
 }
@@ -195,7 +195,7 @@ vy_mem_insert_upsert(struct vy_mem *mem, const struct tuple *stmt)
 	const struct tuple **older = vy_mem_tree_iterator_get_elem(&mem->tree,
 								   &inserted);
 	if (older == NULL || vy_stmt_type(*older) != IPROTO_UPSERT ||
-	    vy_tuple_compare(stmt, *older, mem->cmp_def) != 0)
+	    vy_stmt_compare(stmt, *older, mem->cmp_def) != 0)
 		return 0;
 	uint8_t n_upserts = vy_stmt_n_upserts(*older);
 	/*
@@ -339,7 +339,7 @@ vy_mem_iterator_find_lsn(struct vy_mem_iterator *itr)
 	const struct tuple *prev_stmt;
 	prev_stmt = *vy_mem_tree_iterator_get_elem(&itr->mem->tree, &prev_pos);
 	if (vy_stmt_lsn(prev_stmt) > (**itr->read_view).vlsn ||
-	    vy_tuple_compare(itr->curr_stmt, prev_stmt, cmp_def) != 0) {
+	    vy_stmt_compare(itr->curr_stmt, prev_stmt, cmp_def) != 0) {
 		/*
 		 * The next statement is either invisible in
 		 * the read view or for another key.
@@ -488,7 +488,7 @@ vy_mem_iterator_next_key(struct vy_mem_iterator *itr)
 	 * for this key so instead of iterating further we simply
 	 * look up the next key - it's pretty cheap anyway.
 	 */
-	if (vy_tuple_compare(prev_stmt, itr->curr_stmt, cmp_def) == 0)
+	if (vy_stmt_compare(prev_stmt, itr->curr_stmt, cmp_def) == 0)
 		return vy_mem_iterator_seek(itr, itr->curr_stmt);
 
 	if (itr->iterator_type == ITER_EQ &&
@@ -523,7 +523,7 @@ next:
 
 	const struct tuple *next_stmt;
 	next_stmt = *vy_mem_tree_iterator_get_elem(&itr->mem->tree, &next_pos);
-	if (vy_tuple_compare(itr->curr_stmt, next_stmt, cmp_def) != 0)
+	if (vy_stmt_compare(itr->curr_stmt, next_stmt, cmp_def) != 0)
 		return 1;
 
 	itr->curr_pos = next_pos;
@@ -577,8 +577,7 @@ vy_mem_iterator_skip(struct vy_mem_iterator *itr,
 	if (itr->search_started &&
 	    (itr->curr_stmt == NULL || last_stmt == NULL ||
 	     iterator_direction(itr->iterator_type) *
-	     vy_tuple_compare(itr->curr_stmt, last_stmt,
-			      itr->mem->cmp_def) > 0))
+	     vy_stmt_compare(itr->curr_stmt, last_stmt, itr->mem->cmp_def) > 0))
 		return 0;
 
 	vy_history_cleanup(history);
diff --git a/src/box/vy_mem.h b/src/box/vy_mem.h
index b41c3c0b..39f238b3 100644
--- a/src/box/vy_mem.h
+++ b/src/box/vy_mem.h
@@ -90,7 +90,7 @@ static int
 vy_mem_tree_cmp(const struct tuple *a, const struct tuple *b,
 		struct key_def *cmp_def)
 {
-	int res = vy_tuple_compare(a, b, cmp_def);
+	int res = vy_stmt_compare(a, b, cmp_def);
 	if (res)
 		return res;
 	int64_t a_lsn = vy_stmt_lsn(a), b_lsn = vy_stmt_lsn(b);
diff --git a/src/box/vy_range.c b/src/box/vy_range.c
index 2a2d2d00..f76615c4 100644
--- a/src/box/vy_range.c
+++ b/src/box/vy_range.c
@@ -63,8 +63,8 @@ vy_range_tree_cmp(struct vy_range *range_a, struct vy_range *range_b)
 		return 1;
 
 	assert(range_a->cmp_def == range_b->cmp_def);
-	return vy_key_compare(range_a->begin, range_b->begin,
-			      range_a->cmp_def);
+	return vy_stmt_compare(range_a->begin, range_b->begin,
+			       range_a->cmp_def);
 }
 
 int
@@ -73,7 +73,7 @@ vy_range_tree_key_cmp(const struct tuple *stmt, struct vy_range *range)
 	/* Any key > -inf. */
 	if (range->begin == NULL)
 		return 1;
-	return vy_stmt_compare_with_key(stmt, range->begin, range->cmp_def);
+	return vy_stmt_compare(stmt, range->begin, range->cmp_def);
 }
 
 struct vy_range *
@@ -125,8 +125,7 @@ vy_range_tree_find_by_key(vy_range_tree_t *tree,
 		/* switch to previous for case (4) */
 		if (range != NULL && range->begin != NULL &&
 		    !vy_stmt_is_full_key(key, range->cmp_def) &&
-		    vy_stmt_compare_with_key(key, range->begin,
-					     range->cmp_def) == 0)
+		    vy_stmt_compare(key, range->begin, range->cmp_def) == 0)
 			range = vy_range_tree_prev(tree, range);
 		/* for case 5 or subcase of case 4 */
 		if (range == NULL)
@@ -160,8 +159,8 @@ vy_range_tree_find_by_key(vy_range_tree_t *tree,
 		if (range != NULL) {
 			/* fix curr_range for cases 2 and 3 */
 			if (range->begin != NULL &&
-			    vy_stmt_compare_with_key(key, range->begin,
-						     range->cmp_def) != 0) {
+			    vy_stmt_compare(key, range->begin,
+					    range->cmp_def) != 0) {
 				struct vy_range *prev;
 				prev = vy_range_tree_prev(tree, range);
 				if (prev != NULL)
diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c
index 740ee940..94c5f4b9 100644
--- a/src/box/vy_read_iterator.c
+++ b/src/box/vy_read_iterator.c
@@ -148,17 +148,17 @@ vy_read_iterator_range_is_done(struct vy_read_iterator *itr,
 	int dir = iterator_direction(itr->iterator_type);
 
 	if (dir > 0 && range->end != NULL &&
-	    (next_key == NULL || vy_tuple_compare_with_key(next_key,
-					range->end, cmp_def) >= 0) &&
+	    (next_key == NULL || vy_stmt_compare(next_key, range->end,
+						 cmp_def) >= 0) &&
 	    (itr->iterator_type != ITER_EQ ||
-	     vy_stmt_compare_with_key(itr->key, range->end, cmp_def) >= 0))
+	     vy_stmt_compare(itr->key, range->end, cmp_def) >= 0))
 		return true;
 
 	if (dir < 0 && range->begin != NULL &&
-	    (next_key == NULL || vy_tuple_compare_with_key(next_key,
-					range->begin, cmp_def) < 0) &&
+	    (next_key == NULL || vy_stmt_compare(next_key, range->begin,
+						 cmp_def) < 0) &&
 	    (itr->iterator_type != ITER_REQ ||
-	     vy_stmt_compare_with_key(itr->key, range->begin, cmp_def) <= 0))
+	     vy_stmt_compare(itr->key, range->begin, cmp_def) <= 0))
 		return true;
 
 	return false;
@@ -185,7 +185,7 @@ vy_read_iterator_cmp_stmt(struct vy_read_iterator *itr,
 	if (a == NULL && b == NULL)
 		return 0;
 	return iterator_direction(itr->iterator_type) *
-		vy_tuple_compare(a, b, itr->lsm->cmp_def);
+		vy_stmt_compare(a, b, itr->lsm->cmp_def);
 }
 
 /**
@@ -763,12 +763,12 @@ vy_read_iterator_next_range(struct vy_read_iterator *itr)
 		 * Make sure the next statement falls in the range.
 		 */
 		if (dir > 0 && (range->end == NULL ||
-				vy_tuple_compare_with_key(itr->last_stmt,
-						range->end, cmp_def) < 0))
+				vy_stmt_compare(itr->last_stmt, range->end,
+						cmp_def) < 0))
 			break;
 		if (dir < 0 && (range->begin == NULL ||
-				vy_tuple_compare_with_key(itr->last_stmt,
-						range->begin, cmp_def) > 0))
+				vy_stmt_compare(itr->last_stmt, range->begin,
+						cmp_def) > 0))
 			break;
 	}
 	itr->curr_range = range;
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index fae123d8..8d40ef00 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -449,21 +449,21 @@ vy_slice_cut(struct vy_slice *slice, int64_t id, struct tuple *begin,
 	*result = NULL;
 
 	if (begin != NULL && slice->end != NULL &&
-	    vy_key_compare(begin, slice->end, cmp_def) >= 0)
+	    vy_stmt_compare(begin, slice->end, cmp_def) >= 0)
 		return 0; /* no intersection: begin >= slice->end */
 
 	if (end != NULL && slice->begin != NULL &&
-	    vy_key_compare(end, slice->begin, cmp_def) <= 0)
+	    vy_stmt_compare(end, slice->begin, cmp_def) <= 0)
 		return 0; /* no intersection: end <= slice->end */
 
 	/* begin = MAX(begin, slice->begin) */
 	if (slice->begin != NULL &&
-	    (begin == NULL || vy_key_compare(begin, slice->begin, cmp_def) < 0))
+	    (begin == NULL || vy_stmt_compare(begin, slice->begin, cmp_def) < 0))
 		begin = slice->begin;
 
 	/* end = MIN(end, slice->end) */
 	if (slice->end != NULL &&
-	    (end == NULL || vy_key_compare(end, slice->end, cmp_def) > 0))
+	    (end == NULL || vy_stmt_compare(end, slice->end, cmp_def) > 0))
 		end = slice->end;
 
 	*result = vy_slice_new(id, slice->run, begin, end, cmp_def);
@@ -1206,8 +1206,8 @@ vy_run_iterator_find_lsn(struct vy_run_iterator *itr,
 				return -1;
 			if (vy_stmt_lsn(test_stmt) > (**itr->read_view).vlsn ||
 			    vy_stmt_flags(test_stmt) & VY_STMT_SKIP_READ ||
-			    vy_tuple_compare(itr->curr_stmt, test_stmt,
-					     cmp_def) != 0) {
+			    vy_stmt_compare(itr->curr_stmt, test_stmt,
+					    cmp_def) != 0) {
 				tuple_unref(test_stmt);
 				break;
 			}
@@ -1219,8 +1219,7 @@ vy_run_iterator_find_lsn(struct vy_run_iterator *itr,
 	/* Check if the result is within the slice boundaries. */
 	if (iterator_type == ITER_LE || iterator_type == ITER_LT) {
 		if (slice->begin != NULL &&
-		    vy_tuple_compare_with_key(itr->curr_stmt, slice->begin,
-					      cmp_def) < 0) {
+		    vy_stmt_compare(itr->curr_stmt, slice->begin, cmp_def) < 0) {
 			vy_run_iterator_stop(itr);
 			return 0;
 		}
@@ -1228,8 +1227,7 @@ vy_run_iterator_find_lsn(struct vy_run_iterator *itr,
 		assert(iterator_type == ITER_GE || iterator_type == ITER_GT ||
 		       iterator_type == ITER_EQ);
 		if (slice->end != NULL &&
-		    vy_tuple_compare_with_key(itr->curr_stmt, slice->end,
-					      cmp_def) >= 0) {
+		    vy_stmt_compare(itr->curr_stmt, slice->end, cmp_def) >= 0) {
 			vy_run_iterator_stop(itr);
 			return 0;
 		}
@@ -1360,7 +1358,7 @@ vy_run_iterator_seek(struct vy_run_iterator *itr,
 		 *         | ge  | begin | ge  |
 		 *         | eq  |    stop     |
 		 */
-		cmp = vy_stmt_compare_with_key(key, slice->begin, cmp_def);
+		cmp = vy_stmt_compare(key, slice->begin, cmp_def);
 		if (cmp < 0 && iterator_type == ITER_EQ) {
 			vy_run_iterator_stop(itr);
 			*ret = NULL;
@@ -1387,7 +1385,7 @@ vy_run_iterator_seek(struct vy_run_iterator *itr,
 		 * > end   | lt  | end   | lt  |
 		 *         | le  | end   | lt  |
 		 */
-		cmp = vy_stmt_compare_with_key(key, slice->end, cmp_def);
+		cmp = vy_stmt_compare(key, slice->end, cmp_def);
 		if (cmp > 0 || (cmp == 0 && iterator_type != ITER_LT)) {
 			iterator_type = ITER_LT;
 			key = slice->end;
@@ -1480,7 +1478,7 @@ vy_run_iterator_next_key(struct vy_run_iterator *itr, struct tuple **ret)
 
 		if (vy_run_iterator_read(itr, itr->curr_pos, &next_key) != 0)
 			return -1;
-	} while (vy_tuple_compare(itr->curr_stmt, next_key, itr->cmp_def) == 0);
+	} while (vy_stmt_compare(itr->curr_stmt, next_key, itr->cmp_def) == 0);
 
 	tuple_unref(itr->curr_stmt);
 	itr->curr_stmt = next_key;
@@ -1521,7 +1519,7 @@ next:
 	if (vy_run_iterator_read(itr, next_pos, &next_key) != 0)
 		return -1;
 
-	if (vy_tuple_compare(itr->curr_stmt, next_key, itr->cmp_def) != 0) {
+	if (vy_stmt_compare(itr->curr_stmt, next_key, itr->cmp_def) != 0) {
 		tuple_unref(next_key);
 		return 0;
 	}
@@ -1571,8 +1569,7 @@ vy_run_iterator_skip(struct vy_run_iterator *itr,
 	if (itr->search_started &&
 	    (itr->curr_stmt == NULL || last_stmt == NULL ||
 	     iterator_direction(itr->iterator_type) *
-	     vy_tuple_compare(itr->curr_stmt, last_stmt,
-			      itr->cmp_def) > 0))
+	     vy_stmt_compare(itr->curr_stmt, last_stmt, itr->cmp_def) > 0))
 		return 0;
 
 	vy_history_cleanup(history);
@@ -2585,8 +2582,8 @@ vy_slice_stream_search(struct vy_stmt_stream *virt_stream)
 					stream->is_primary);
 		if (fnd_key == NULL)
 			return -1;
-		int cmp = vy_tuple_compare_with_key(fnd_key,
-				stream->slice->begin, stream->cmp_def);
+		int cmp = vy_stmt_compare(fnd_key, stream->slice->begin,
+					  stream->cmp_def);
 		if (cmp < 0)
 			beg = mid + 1;
 		else
@@ -2637,8 +2634,7 @@ vy_slice_stream_next(struct vy_stmt_stream *virt_stream, struct tuple **ret)
 	/* Check that the tuple is not out of slice bounds = */
 	if (stream->slice->end != NULL &&
 	    stream->page_no >= stream->slice->last_page_no &&
-	    vy_tuple_compare_with_key(tuple, stream->slice->end,
-				      stream->cmp_def) >= 0) {
+	    vy_stmt_compare(tuple, stream->slice->end, stream->cmp_def) >= 0) {
 		tuple_unref(tuple);
 		return 0;
 	}
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 1dfbe1f7..98c1a2f3 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -321,94 +321,9 @@ vy_stmt_unref_if_possible(struct tuple *stmt)
 }
 
 /**
- * Specialized comparators are faster than general-purpose comparators.
- * For example, vy_stmt_compare - slowest comparator because it in worst case
- * checks all combinations of key and tuple types, but
- * vy_key_compare - fastest comparator, because it shouldn't check statement
- * types.
+ * Compare two vinyl statements taking into account their
+ * formats (key or tuple).
  */
-
-/**
- * Compare SELECT/DELETE statements using the key definition
- * @param a left operand (SELECT/DELETE)
- * @param b right operand (SELECT/DELETE)
- * @param cmp_def key definition, with primary parts
- *
- * @retval 0   if a == b
- * @retval > 0 if a > b
- * @retval < 0 if a < b
- *
- * @sa key_compare()
- */
-static inline int
-vy_key_compare(const struct tuple *a, const struct tuple *b,
-	       struct key_def *cmp_def)
-{
-	assert(vy_stmt_type(a) == IPROTO_SELECT);
-	assert(vy_stmt_type(b) == IPROTO_SELECT);
-	return key_compare(tuple_data(a), tuple_data(b), cmp_def);
-}
-
-/**
- * Compare REPLACE/UPSERTS statements.
- * @param a left operand (REPLACE/UPSERT)
- * @param b right operand (REPLACE/UPSERT)
- * @param cmp_def key definition with primary parts
- *
- * @retval 0   if a == b
- * @retval > 0 if a > b
- * @retval < 0 if a < b
- *
- * @sa tuple_compare()
- */
-static inline int
-vy_tuple_compare(const struct tuple *a, const struct tuple *b,
-		 struct key_def *cmp_def)
-{
-	enum iproto_type type;
-	type = vy_stmt_type(a);
-	assert(type == IPROTO_INSERT || type == IPROTO_REPLACE ||
-	       type == IPROTO_UPSERT || type == IPROTO_DELETE);
-	type = vy_stmt_type(b);
-	assert(type == IPROTO_INSERT || type == IPROTO_REPLACE ||
-	       type == IPROTO_UPSERT || type == IPROTO_DELETE);
-	(void)type;
-	return tuple_compare(a, b, cmp_def);
-}
-
-/**
- * Compare REPLACE/UPSERT with SELECT/DELETE using the key
- * definition
- * @param tuple Left operand (REPLACE/UPSERT)
- * @param key   MessagePack array of key fields, right operand.
- *
- * @retval > 0  tuple > key.
- * @retval == 0 tuple == key in all fields
- * @retval == 0 tuple is prefix of key
- * @retval == 0 key is a prefix of tuple
- * @retval < 0  tuple < key.
- *
- * @sa tuple_compare_with_key()
- */
-static inline int
-vy_tuple_compare_with_raw_key(const struct tuple *tuple, const char *key,
-			      struct key_def *key_def)
-{
-	uint32_t part_count = mp_decode_array(&key);
-	return tuple_compare_with_key(tuple, key, part_count, key_def);
-}
-
-/** @sa vy_tuple_compare_with_raw_key(). */
-static inline int
-vy_tuple_compare_with_key(const struct tuple *tuple, const struct tuple *key,
-			  struct key_def *key_def)
-{
-	const char *key_mp = tuple_data(key);
-	uint32_t part_count = mp_decode_array(&key_mp);
-	return tuple_compare_with_key(tuple, key_mp, part_count, key_def);
-}
-
-/** @sa tuple_compare. */
 static inline int
 vy_stmt_compare(const struct tuple *a, const struct tuple *b,
 		struct key_def *key_def)
@@ -416,36 +331,36 @@ vy_stmt_compare(const struct tuple *a, const struct tuple *b,
 	bool a_is_tuple = vy_stmt_type(a) != IPROTO_SELECT;
 	bool b_is_tuple = vy_stmt_type(b) != IPROTO_SELECT;
 	if (a_is_tuple && b_is_tuple) {
-		return vy_tuple_compare(a, b, key_def);
+		return tuple_compare(a, b, key_def);
 	} else if (a_is_tuple && !b_is_tuple) {
-		return vy_tuple_compare_with_key(a, b, key_def);
+		const char *key = tuple_data(b);
+		uint32_t part_count = mp_decode_array(&key);
+		return tuple_compare_with_key(a, key, part_count, key_def);
 	} else if (!a_is_tuple && b_is_tuple) {
-		return -vy_tuple_compare_with_key(b, a, key_def);
+		const char *key = tuple_data(a);
+		uint32_t part_count = mp_decode_array(&key);
+		return -tuple_compare_with_key(b, key, part_count, key_def);
 	} else {
 		assert(!a_is_tuple && !b_is_tuple);
-		return vy_key_compare(a, b, key_def);
+		return key_compare(tuple_data(a), tuple_data(b), key_def);
 	}
 }
 
-/** @sa tuple_compare_with_raw_key. */
+/**
+ * Compare a vinyl statement (key or tuple) with a raw key
+ * (msgpack array).
+ */
 static inline int
 vy_stmt_compare_with_raw_key(const struct tuple *stmt, const char *key,
 			     struct key_def *key_def)
 {
-	if (vy_stmt_type(stmt) != IPROTO_SELECT)
-		return vy_tuple_compare_with_raw_key(stmt, key, key_def);
+	if (vy_stmt_type(stmt) != IPROTO_SELECT) {
+		uint32_t part_count = mp_decode_array(&key);
+		return tuple_compare_with_key(stmt, key, part_count, key_def);
+	}
 	return key_compare(tuple_data(stmt), key, key_def);
 }
 
-/** @sa tuple_compare_with_key. */
-static inline int
-vy_stmt_compare_with_key(const struct tuple *stmt, const struct tuple *key,
-			 struct key_def *key_def)
-{
-	assert(vy_stmt_type(key) == IPROTO_SELECT);
-	return vy_stmt_compare_with_raw_key(stmt, tuple_data(key), key_def);
-}
-
 /**
  * Create the SELECT statement from raw MessagePack data.
  * @param format     Format of an index.
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index a125a4a7..3768b3bd 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -68,7 +68,7 @@ write_set_cmp(struct txv *a, struct txv *b)
 {
 	int rc = a->lsm < b->lsm ? -1 : a->lsm > b->lsm;
 	if (rc == 0)
-		return vy_tuple_compare(a->stmt, b->stmt, a->lsm->cmp_def);
+		return vy_stmt_compare(a->stmt, b->stmt, a->lsm->cmp_def);
 	return rc;
 }
 
@@ -1213,8 +1213,8 @@ vy_txw_iterator_skip(struct vy_txw_iterator *itr,
 	if (itr->search_started &&
 	    (itr->curr_txv == NULL || last_stmt == NULL ||
 	     iterator_direction(itr->iterator_type) *
-	     vy_tuple_compare(itr->curr_txv->stmt, last_stmt,
-			      itr->lsm->cmp_def) > 0))
+	     vy_stmt_compare(itr->curr_txv->stmt, last_stmt,
+			     itr->lsm->cmp_def) > 0))
 		return 0;
 
 	vy_history_cleanup(history);
diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c
index ebea2789..3c5a4abb 100644
--- a/src/box/vy_upsert.c
+++ b/src/box/vy_upsert.c
@@ -205,7 +205,7 @@ check_key:
 	 * Check that key hasn't been changed after applying operations.
 	 */
 	if (!key_update_can_be_skipped(cmp_def->column_mask, column_mask) &&
-	    vy_tuple_compare(old_stmt, result_stmt, cmp_def) != 0) {
+	    vy_stmt_compare(old_stmt, result_stmt, cmp_def) != 0) {
 		/*
 		 * Key has been changed: ignore this UPSERT and
 		 * @retval the old stmt.
diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c
index 365b44bb..0e804ac1 100644
--- a/src/box/vy_write_iterator.c
+++ b/src/box/vy_write_iterator.c
@@ -225,7 +225,7 @@ heap_less(heap_t *heap, struct vy_write_src *src1, struct vy_write_src *src2)
 	struct vy_write_iterator *stream =
 		container_of(heap, struct vy_write_iterator, src_heap);
 
-	int cmp = vy_tuple_compare(src1->tuple, src2->tuple, stream->cmp_def);
+	int cmp = vy_stmt_compare(src1->tuple, src2->tuple, stream->cmp_def);
 	if (cmp != 0)
 		return cmp < 0;
 
-- 
2.11.0




More information about the Tarantool-patches mailing list