Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org
Subject: [PATCH 08/12] vinyl: remove optimized comparators
Date: Thu, 21 Feb 2019 13:26:08 +0300	[thread overview]
Message-ID: <52cf8dd45c4ebd27368bef7311c71ef665931f53.1550744027.git.vdavydov.dev@gmail.com> (raw)
In-Reply-To: <cover.1550744027.git.vdavydov.dev@gmail.com>
In-Reply-To: <cover.1550744027.git.vdavydov.dev@gmail.com>

A vinyl statement (vy_stmt struct) may represent either a tuple or a
key. We differentiate between the two kinds by statement type - we use
SELECT for keys and other types for tuples. This was done that way so
that we could pass both tuples and keys to a read iterator as a search
key. To avoid branching in comparators when the types of compared
statements are known in advance, we provide several comparators, each of
which expects certain statement types, e.g. a tuple and a key. Actually,
such a micro optimization looks like an overkill, because a typical
comparator is called by a function pointer and has a lot of comparisons
in the code, see tuple_compare_slowpath for instance. Eliminating one
branch will hardly make the code perform better. At the same time, it
makes the code more difficult to write. Besides, once we remove nils
from statements read from disk (aka surrogate tuples), which is required
to support multikey index, the number of places where types of compared
statements are known will diminish drastically. That said, let's remove
optimized comparators and always use vy_stmt_compare, which checks types
of compared statements and calls the appropriate comparator.
---
 src/box/vinyl.c             |   8 +--
 src/box/vy_cache.c          |   8 +--
 src/box/vy_cache.h          |   2 +-
 src/box/vy_lsm.c            |   8 +--
 src/box/vy_mem.c            |  15 +++---
 src/box/vy_mem.h            |   2 +-
 src/box/vy_range.c          |  13 +++--
 src/box/vy_read_iterator.c  |  22 ++++----
 src/box/vy_run.c            |  36 ++++++-------
 src/box/vy_stmt.h           | 121 +++++++-------------------------------------
 src/box/vy_tx.c             |   6 +--
 src/box/vy_upsert.c         |   2 +-
 src/box/vy_write_iterator.c |   2 +-
 13 files changed, 77 insertions(+), 168 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 69880238..f2f37734 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1306,7 +1306,7 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
 		return -1;
 
 	if (*result == NULL ||
-	    vy_tuple_compare(*result, tuple, lsm->key_def) != 0) {
+	    vy_stmt_compare(*result, tuple, lsm->key_def) != 0) {
 		/*
 		 * If a tuple read from a secondary index doesn't
 		 * match the tuple corresponding to it in the
@@ -1525,7 +1525,7 @@ vy_check_is_unique_secondary(struct vy_tx *tx, const struct vy_read_view **rv,
 	 * fail here.
 	 */
 	if (found != NULL && vy_stmt_type(stmt) == IPROTO_REPLACE &&
-	    vy_tuple_compare(stmt, found, lsm->pk->key_def) == 0) {
+	    vy_stmt_compare(stmt, found, lsm->pk->key_def) == 0) {
 		tuple_unref(found);
 		return 0;
 	}
@@ -1730,7 +1730,7 @@ vy_check_update(struct space *space, const struct vy_lsm *pk,
 		uint64_t column_mask)
 {
 	if (!key_update_can_be_skipped(pk->key_def->column_mask, column_mask) &&
-	    vy_tuple_compare(old_tuple, new_tuple, pk->key_def) != 0) {
+	    vy_stmt_compare(old_tuple, new_tuple, pk->key_def) != 0) {
 		diag_set(ClientError, ER_CANT_UPDATE_PRIMARY_KEY,
 			 index_name_by_id(space, pk->index_id),
 			 space_name(space));
@@ -3507,7 +3507,7 @@ vy_squash_process(struct vy_squash *squash)
 	while (!vy_mem_tree_iterator_is_invalid(&mem_itr)) {
 		const struct tuple *mem_stmt;
 		mem_stmt = *vy_mem_tree_iterator_get_elem(&mem->tree, &mem_itr);
-		if (vy_tuple_compare(result, mem_stmt, lsm->cmp_def) != 0 ||
+		if (vy_stmt_compare(result, mem_stmt, lsm->cmp_def) != 0 ||
 		    vy_stmt_type(mem_stmt) != IPROTO_UPSERT)
 			break;
 		assert(vy_stmt_lsn(mem_stmt) >= MAX_LSN);
diff --git a/src/box/vy_cache.c b/src/box/vy_cache.c
index 320ea04d..8354b1c2 100644
--- a/src/box/vy_cache.c
+++ b/src/box/vy_cache.c
@@ -370,8 +370,8 @@ vy_cache_add(struct vy_cache *cache, struct tuple *stmt,
 							&inserted);
 		assert(*prev_check_entry != NULL);
 		struct tuple *prev_check_stmt = (*prev_check_entry)->stmt;
-		int cmp = vy_tuple_compare(prev_stmt, prev_check_stmt,
-					   cache->cmp_def);
+		int cmp = vy_stmt_compare(prev_stmt, prev_check_stmt,
+					  cache->cmp_def);
 
 		if (entry->flags & flag) {
 			/* The found entry must be exactly prev_stmt. (2) */
@@ -722,8 +722,8 @@ vy_cache_iterator_skip(struct vy_cache_iterator *itr,
 	if (itr->search_started &&
 	    (itr->curr_stmt == NULL || last_stmt == NULL ||
 	     iterator_direction(itr->iterator_type) *
-	     vy_tuple_compare(itr->curr_stmt, last_stmt,
-			      itr->cache->cmp_def) > 0))
+	     vy_stmt_compare(itr->curr_stmt, last_stmt,
+			     itr->cache->cmp_def) > 0))
 		return 0;
 
 	*stop = false;
diff --git a/src/box/vy_cache.h b/src/box/vy_cache.h
index 7cb93155..a846622e 100644
--- a/src/box/vy_cache.h
+++ b/src/box/vy_cache.h
@@ -74,7 +74,7 @@ static inline int
 vy_cache_tree_cmp(struct vy_cache_entry *a,
 		  struct vy_cache_entry *b, struct key_def *cmp_def)
 {
-	return vy_tuple_compare(a->stmt, b->stmt, cmp_def);
+	return vy_stmt_compare(a->stmt, b->stmt, cmp_def);
 }
 
 /**
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 1b59e436..73d369fb 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -405,7 +405,7 @@ vy_lsm_recover_slice(struct vy_lsm *lsm, struct vy_range *range,
 			goto out;
 	}
 	if (begin != NULL && end != NULL &&
-	    vy_key_compare(begin, end, lsm->cmp_def) >= 0) {
+	    vy_stmt_compare(begin, end, lsm->cmp_def) >= 0) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("begin >= end for slice %lld",
 				    (long long)slice_info->id));
@@ -451,7 +451,7 @@ vy_lsm_recover_range(struct vy_lsm *lsm,
 			goto out;
 	}
 	if (begin != NULL && end != NULL &&
-	    vy_key_compare(begin, end, lsm->cmp_def) >= 0) {
+	    vy_stmt_compare(begin, end, lsm->cmp_def) >= 0) {
 		diag_set(ClientError, ER_INVALID_VYLOG_FILE,
 			 tt_sprintf("begin >= end for range %lld",
 				    (long long)range_info->id));
@@ -632,8 +632,8 @@ vy_lsm_recover(struct vy_lsm *lsm, struct vy_recovery *recovery,
 		int cmp = 0;
 		if (prev != NULL &&
 		    (prev->end == NULL || range->begin == NULL ||
-		     (cmp = vy_key_compare(prev->end, range->begin,
-					   lsm->cmp_def)) != 0)) {
+		     (cmp = vy_stmt_compare(prev->end, range->begin,
+					    lsm->cmp_def)) != 0)) {
 			const char *errmsg = cmp > 0 ?
 				"Nearby ranges %lld and %lld overlap" :
 				"Keys between ranges %lld and %lld not spanned";
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index b672cc6f..79417671 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -146,7 +146,7 @@ vy_mem_older_lsn(struct vy_mem *mem, const struct tuple *stmt)
 
 	const struct tuple *result;
 	result = *vy_mem_tree_iterator_get_elem(&mem->tree, &itr);
-	if (vy_tuple_compare(result, stmt, mem->cmp_def) != 0)
+	if (vy_stmt_compare(result, stmt, mem->cmp_def) != 0)
 		return NULL;
 	return result;
 }
@@ -195,7 +195,7 @@ vy_mem_insert_upsert(struct vy_mem *mem, const struct tuple *stmt)
 	const struct tuple **older = vy_mem_tree_iterator_get_elem(&mem->tree,
 								   &inserted);
 	if (older == NULL || vy_stmt_type(*older) != IPROTO_UPSERT ||
-	    vy_tuple_compare(stmt, *older, mem->cmp_def) != 0)
+	    vy_stmt_compare(stmt, *older, mem->cmp_def) != 0)
 		return 0;
 	uint8_t n_upserts = vy_stmt_n_upserts(*older);
 	/*
@@ -335,8 +335,8 @@ vy_mem_iterator_find_lsn(struct vy_mem_iterator *itr,
 							       &prev_pos);
 			if (vy_stmt_lsn(prev_stmt) > (**itr->read_view).vlsn ||
 			    vy_stmt_flags(prev_stmt) & VY_STMT_SKIP_READ ||
-			    vy_tuple_compare(itr->curr_stmt, prev_stmt,
-					     cmp_def) != 0)
+			    vy_stmt_compare(itr->curr_stmt, prev_stmt,
+					    cmp_def) != 0)
 				break;
 			itr->curr_pos = prev_pos;
 			itr->curr_stmt = prev_stmt;
@@ -463,7 +463,7 @@ vy_mem_iterator_next_key(struct vy_mem_iterator *itr)
 			itr->curr_stmt = NULL;
 			return 1;
 		}
-	} while (vy_tuple_compare(prev_stmt, itr->curr_stmt, cmp_def) == 0);
+	} while (vy_stmt_compare(prev_stmt, itr->curr_stmt, cmp_def) == 0);
 
 	if (itr->iterator_type == ITER_EQ &&
 	    vy_stmt_compare(itr->key, itr->curr_stmt, cmp_def) != 0) {
@@ -497,7 +497,7 @@ next:
 
 	const struct tuple *next_stmt;
 	next_stmt = *vy_mem_tree_iterator_get_elem(&itr->mem->tree, &next_pos);
-	if (vy_tuple_compare(itr->curr_stmt, next_stmt, cmp_def) != 0)
+	if (vy_stmt_compare(itr->curr_stmt, next_stmt, cmp_def) != 0)
 		return 1;
 
 	itr->curr_pos = next_pos;
@@ -551,8 +551,7 @@ vy_mem_iterator_skip(struct vy_mem_iterator *itr,
 	if (itr->search_started &&
 	    (itr->curr_stmt == NULL || last_stmt == NULL ||
 	     iterator_direction(itr->iterator_type) *
-	     vy_tuple_compare(itr->curr_stmt, last_stmt,
-			      itr->mem->cmp_def) > 0))
+	     vy_stmt_compare(itr->curr_stmt, last_stmt, itr->mem->cmp_def) > 0))
 		return 0;
 
 	vy_history_cleanup(history);
diff --git a/src/box/vy_mem.h b/src/box/vy_mem.h
index b41c3c0b..39f238b3 100644
--- a/src/box/vy_mem.h
+++ b/src/box/vy_mem.h
@@ -90,7 +90,7 @@ static int
 vy_mem_tree_cmp(const struct tuple *a, const struct tuple *b,
 		struct key_def *cmp_def)
 {
-	int res = vy_tuple_compare(a, b, cmp_def);
+	int res = vy_stmt_compare(a, b, cmp_def);
 	if (res)
 		return res;
 	int64_t a_lsn = vy_stmt_lsn(a), b_lsn = vy_stmt_lsn(b);
diff --git a/src/box/vy_range.c b/src/box/vy_range.c
index bbd40615..b36198f2 100644
--- a/src/box/vy_range.c
+++ b/src/box/vy_range.c
@@ -63,8 +63,8 @@ vy_range_tree_cmp(struct vy_range *range_a, struct vy_range *range_b)
 		return 1;
 
 	assert(range_a->cmp_def == range_b->cmp_def);
-	return vy_key_compare(range_a->begin, range_b->begin,
-			      range_a->cmp_def);
+	return vy_stmt_compare(range_a->begin, range_b->begin,
+			       range_a->cmp_def);
 }
 
 int
@@ -73,7 +73,7 @@ vy_range_tree_key_cmp(const struct tuple *stmt, struct vy_range *range)
 	/* Any key > -inf. */
 	if (range->begin == NULL)
 		return 1;
-	return vy_stmt_compare_with_key(stmt, range->begin, range->cmp_def);
+	return vy_stmt_compare(stmt, range->begin, range->cmp_def);
 }
 
 struct vy_range *
@@ -125,8 +125,7 @@ vy_range_tree_find_by_key(vy_range_tree_t *tree,
 		/* switch to previous for case (4) */
 		if (range != NULL && range->begin != NULL &&
 		    !vy_stmt_is_full_key(key, range->cmp_def) &&
-		    vy_stmt_compare_with_key(key, range->begin,
-					     range->cmp_def) == 0)
+		    vy_stmt_compare(key, range->begin, range->cmp_def) == 0)
 			range = vy_range_tree_prev(tree, range);
 		/* for case 5 or subcase of case 4 */
 		if (range == NULL)
@@ -160,8 +159,8 @@ vy_range_tree_find_by_key(vy_range_tree_t *tree,
 		if (range != NULL) {
 			/* fix curr_range for cases 2 and 3 */
 			if (range->begin != NULL &&
-			    vy_stmt_compare_with_key(key, range->begin,
-						     range->cmp_def) != 0) {
+			    vy_stmt_compare(key, range->begin,
+					    range->cmp_def) != 0) {
 				struct vy_range *prev;
 				prev = vy_range_tree_prev(tree, range);
 				if (prev != NULL)
diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c
index 740ee940..94c5f4b9 100644
--- a/src/box/vy_read_iterator.c
+++ b/src/box/vy_read_iterator.c
@@ -148,17 +148,17 @@ vy_read_iterator_range_is_done(struct vy_read_iterator *itr,
 	int dir = iterator_direction(itr->iterator_type);
 
 	if (dir > 0 && range->end != NULL &&
-	    (next_key == NULL || vy_tuple_compare_with_key(next_key,
-					range->end, cmp_def) >= 0) &&
+	    (next_key == NULL || vy_stmt_compare(next_key, range->end,
+						 cmp_def) >= 0) &&
 	    (itr->iterator_type != ITER_EQ ||
-	     vy_stmt_compare_with_key(itr->key, range->end, cmp_def) >= 0))
+	     vy_stmt_compare(itr->key, range->end, cmp_def) >= 0))
 		return true;
 
 	if (dir < 0 && range->begin != NULL &&
-	    (next_key == NULL || vy_tuple_compare_with_key(next_key,
-					range->begin, cmp_def) < 0) &&
+	    (next_key == NULL || vy_stmt_compare(next_key, range->begin,
+						 cmp_def) < 0) &&
 	    (itr->iterator_type != ITER_REQ ||
-	     vy_stmt_compare_with_key(itr->key, range->begin, cmp_def) <= 0))
+	     vy_stmt_compare(itr->key, range->begin, cmp_def) <= 0))
 		return true;
 
 	return false;
@@ -185,7 +185,7 @@ vy_read_iterator_cmp_stmt(struct vy_read_iterator *itr,
 	if (a == NULL && b == NULL)
 		return 0;
 	return iterator_direction(itr->iterator_type) *
-		vy_tuple_compare(a, b, itr->lsm->cmp_def);
+		vy_stmt_compare(a, b, itr->lsm->cmp_def);
 }
 
 /**
@@ -763,12 +763,12 @@ vy_read_iterator_next_range(struct vy_read_iterator *itr)
 		 * Make sure the next statement falls in the range.
 		 */
 		if (dir > 0 && (range->end == NULL ||
-				vy_tuple_compare_with_key(itr->last_stmt,
-						range->end, cmp_def) < 0))
+				vy_stmt_compare(itr->last_stmt, range->end,
+						cmp_def) < 0))
 			break;
 		if (dir < 0 && (range->begin == NULL ||
-				vy_tuple_compare_with_key(itr->last_stmt,
-						range->begin, cmp_def) > 0))
+				vy_stmt_compare(itr->last_stmt, range->begin,
+						cmp_def) > 0))
 			break;
 	}
 	itr->curr_range = range;
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 915e7f95..06ef299e 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -449,21 +449,21 @@ vy_slice_cut(struct vy_slice *slice, int64_t id, struct tuple *begin,
 	*result = NULL;
 
 	if (begin != NULL && slice->end != NULL &&
-	    vy_key_compare(begin, slice->end, cmp_def) >= 0)
+	    vy_stmt_compare(begin, slice->end, cmp_def) >= 0)
 		return 0; /* no intersection: begin >= slice->end */
 
 	if (end != NULL && slice->begin != NULL &&
-	    vy_key_compare(end, slice->begin, cmp_def) <= 0)
+	    vy_stmt_compare(end, slice->begin, cmp_def) <= 0)
 		return 0; /* no intersection: end <= slice->end */
 
 	/* begin = MAX(begin, slice->begin) */
 	if (slice->begin != NULL &&
-	    (begin == NULL || vy_key_compare(begin, slice->begin, cmp_def) < 0))
+	    (begin == NULL || vy_stmt_compare(begin, slice->begin, cmp_def) < 0))
 		begin = slice->begin;
 
 	/* end = MIN(end, slice->end) */
 	if (slice->end != NULL &&
-	    (end == NULL || vy_key_compare(end, slice->end, cmp_def) > 0))
+	    (end == NULL || vy_stmt_compare(end, slice->end, cmp_def) > 0))
 		end = slice->end;
 
 	*result = vy_slice_new(id, slice->run, begin, end, cmp_def);
@@ -1206,8 +1206,8 @@ vy_run_iterator_find_lsn(struct vy_run_iterator *itr,
 				return -1;
 			if (vy_stmt_lsn(test_stmt) > (**itr->read_view).vlsn ||
 			    vy_stmt_flags(test_stmt) & VY_STMT_SKIP_READ ||
-			    vy_tuple_compare(itr->curr_stmt, test_stmt,
-					     cmp_def) != 0) {
+			    vy_stmt_compare(itr->curr_stmt, test_stmt,
+					    cmp_def) != 0) {
 				tuple_unref(test_stmt);
 				break;
 			}
@@ -1219,8 +1219,7 @@ vy_run_iterator_find_lsn(struct vy_run_iterator *itr,
 	/* Check if the result is within the slice boundaries. */
 	if (iterator_type == ITER_LE || iterator_type == ITER_LT) {
 		if (slice->begin != NULL &&
-		    vy_tuple_compare_with_key(itr->curr_stmt, slice->begin,
-					      cmp_def) < 0) {
+		    vy_stmt_compare(itr->curr_stmt, slice->begin, cmp_def) < 0) {
 			vy_run_iterator_stop(itr);
 			return 0;
 		}
@@ -1228,8 +1227,7 @@ vy_run_iterator_find_lsn(struct vy_run_iterator *itr,
 		assert(iterator_type == ITER_GE || iterator_type == ITER_GT ||
 		       iterator_type == ITER_EQ);
 		if (slice->end != NULL &&
-		    vy_tuple_compare_with_key(itr->curr_stmt, slice->end,
-					      cmp_def) >= 0) {
+		    vy_stmt_compare(itr->curr_stmt, slice->end, cmp_def) >= 0) {
 			vy_run_iterator_stop(itr);
 			return 0;
 		}
@@ -1360,7 +1358,7 @@ vy_run_iterator_seek(struct vy_run_iterator *itr,
 		 *         | ge  | begin | ge  |
 		 *         | eq  |    stop     |
 		 */
-		cmp = vy_stmt_compare_with_key(key, slice->begin, cmp_def);
+		cmp = vy_stmt_compare(key, slice->begin, cmp_def);
 		if (cmp < 0 && iterator_type == ITER_EQ) {
 			vy_run_iterator_stop(itr);
 			return 0;
@@ -1386,7 +1384,7 @@ vy_run_iterator_seek(struct vy_run_iterator *itr,
 		 * > end   | lt  | end   | lt  |
 		 *         | le  | end   | lt  |
 		 */
-		cmp = vy_stmt_compare_with_key(key, slice->end, cmp_def);
+		cmp = vy_stmt_compare(key, slice->end, cmp_def);
 		if (cmp > 0 || (cmp == 0 && iterator_type != ITER_LT)) {
 			iterator_type = ITER_LT;
 			key = slice->end;
@@ -1479,7 +1477,7 @@ vy_run_iterator_next_key(struct vy_run_iterator *itr, struct tuple **ret)
 
 		if (vy_run_iterator_read(itr, itr->curr_pos, &next_key) != 0)
 			return -1;
-	} while (vy_tuple_compare(itr->curr_stmt, next_key, itr->cmp_def) == 0);
+	} while (vy_stmt_compare(itr->curr_stmt, next_key, itr->cmp_def) == 0);
 
 	tuple_unref(itr->curr_stmt);
 	itr->curr_stmt = next_key;
@@ -1520,7 +1518,7 @@ next:
 	if (vy_run_iterator_read(itr, next_pos, &next_key) != 0)
 		return -1;
 
-	if (vy_tuple_compare(itr->curr_stmt, next_key, itr->cmp_def) != 0) {
+	if (vy_stmt_compare(itr->curr_stmt, next_key, itr->cmp_def) != 0) {
 		tuple_unref(next_key);
 		return 0;
 	}
@@ -1570,8 +1568,7 @@ vy_run_iterator_skip(struct vy_run_iterator *itr,
 	if (itr->search_started &&
 	    (itr->curr_stmt == NULL || last_stmt == NULL ||
 	     iterator_direction(itr->iterator_type) *
-	     vy_tuple_compare(itr->curr_stmt, last_stmt,
-			      itr->cmp_def) > 0))
+	     vy_stmt_compare(itr->curr_stmt, last_stmt, itr->cmp_def) > 0))
 		return 0;
 
 	vy_history_cleanup(history);
@@ -2584,8 +2581,8 @@ vy_slice_stream_search(struct vy_stmt_stream *virt_stream)
 					stream->is_primary);
 		if (fnd_key == NULL)
 			return -1;
-		int cmp = vy_tuple_compare_with_key(fnd_key,
-				stream->slice->begin, stream->cmp_def);
+		int cmp = vy_stmt_compare(fnd_key, stream->slice->begin,
+					  stream->cmp_def);
 		if (cmp < 0)
 			beg = mid + 1;
 		else
@@ -2636,8 +2633,7 @@ vy_slice_stream_next(struct vy_stmt_stream *virt_stream, struct tuple **ret)
 	/* Check that the tuple is not out of slice bounds = */
 	if (stream->slice->end != NULL &&
 	    stream->page_no >= stream->slice->last_page_no &&
-	    vy_tuple_compare_with_key(tuple, stream->slice->end,
-				      stream->cmp_def) >= 0) {
+	    vy_stmt_compare(tuple, stream->slice->end, stream->cmp_def) >= 0) {
 		tuple_unref(tuple);
 		return 0;
 	}
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 0832aa0f..ab352906 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -321,94 +321,9 @@ vy_stmt_unref_if_possible(struct tuple *stmt)
 }
 
 /**
- * Specialized comparators are faster than general-purpose comparators.
- * For example, vy_stmt_compare - slowest comparator because it in worst case
- * checks all combinations of key and tuple types, but
- * vy_key_compare - fastest comparator, because it shouldn't check statement
- * types.
+ * Compare two vinyl statements taking into account their
+ * formats (key or tuple).
  */
-
-/**
- * Compare SELECT/DELETE statements using the key definition
- * @param a left operand (SELECT/DELETE)
- * @param b right operand (SELECT/DELETE)
- * @param cmp_def key definition, with primary parts
- *
- * @retval 0   if a == b
- * @retval > 0 if a > b
- * @retval < 0 if a < b
- *
- * @sa key_compare()
- */
-static inline int
-vy_key_compare(const struct tuple *a, const struct tuple *b,
-	       struct key_def *cmp_def)
-{
-	assert(vy_stmt_type(a) == IPROTO_SELECT);
-	assert(vy_stmt_type(b) == IPROTO_SELECT);
-	return key_compare(tuple_data(a), tuple_data(b), cmp_def);
-}
-
-/**
- * Compare REPLACE/UPSERTS statements.
- * @param a left operand (REPLACE/UPSERT)
- * @param b right operand (REPLACE/UPSERT)
- * @param cmp_def key definition with primary parts
- *
- * @retval 0   if a == b
- * @retval > 0 if a > b
- * @retval < 0 if a < b
- *
- * @sa tuple_compare()
- */
-static inline int
-vy_tuple_compare(const struct tuple *a, const struct tuple *b,
-		 struct key_def *cmp_def)
-{
-	enum iproto_type type;
-	type = vy_stmt_type(a);
-	assert(type == IPROTO_INSERT || type == IPROTO_REPLACE ||
-	       type == IPROTO_UPSERT || type == IPROTO_DELETE);
-	type = vy_stmt_type(b);
-	assert(type == IPROTO_INSERT || type == IPROTO_REPLACE ||
-	       type == IPROTO_UPSERT || type == IPROTO_DELETE);
-	(void)type;
-	return tuple_compare(a, b, cmp_def);
-}
-
-/**
- * Compare REPLACE/UPSERT with SELECT/DELETE using the key
- * definition
- * @param tuple Left operand (REPLACE/UPSERT)
- * @param key   MessagePack array of key fields, right operand.
- *
- * @retval > 0  tuple > key.
- * @retval == 0 tuple == key in all fields
- * @retval == 0 tuple is prefix of key
- * @retval == 0 key is a prefix of tuple
- * @retval < 0  tuple < key.
- *
- * @sa tuple_compare_with_key()
- */
-static inline int
-vy_tuple_compare_with_raw_key(const struct tuple *tuple, const char *key,
-			      struct key_def *key_def)
-{
-	uint32_t part_count = mp_decode_array(&key);
-	return tuple_compare_with_key(tuple, key, part_count, key_def);
-}
-
-/** @sa vy_tuple_compare_with_raw_key(). */
-static inline int
-vy_tuple_compare_with_key(const struct tuple *tuple, const struct tuple *key,
-			  struct key_def *key_def)
-{
-	const char *key_mp = tuple_data(key);
-	uint32_t part_count = mp_decode_array(&key_mp);
-	return tuple_compare_with_key(tuple, key_mp, part_count, key_def);
-}
-
-/** @sa tuple_compare. */
 static inline int
 vy_stmt_compare(const struct tuple *a, const struct tuple *b,
 		struct key_def *key_def)
@@ -416,36 +331,36 @@ vy_stmt_compare(const struct tuple *a, const struct tuple *b,
 	bool a_is_tuple = vy_stmt_type(a) != IPROTO_SELECT;
 	bool b_is_tuple = vy_stmt_type(b) != IPROTO_SELECT;
 	if (a_is_tuple && b_is_tuple) {
-		return vy_tuple_compare(a, b, key_def);
+		return tuple_compare(a, b, key_def);
 	} else if (a_is_tuple && !b_is_tuple) {
-		return vy_tuple_compare_with_key(a, b, key_def);
+		const char *key = tuple_data(b);
+		uint32_t part_count = mp_decode_array(&key);
+		return tuple_compare_with_key(a, key, part_count, key_def);
 	} else if (!a_is_tuple && b_is_tuple) {
-		return -vy_tuple_compare_with_key(b, a, key_def);
+		const char *key = tuple_data(a);
+		uint32_t part_count = mp_decode_array(&key);
+		return -tuple_compare_with_key(b, key, part_count, key_def);
 	} else {
 		assert(!a_is_tuple && !b_is_tuple);
-		return vy_key_compare(a, b, key_def);
+		return key_compare(tuple_data(a), tuple_data(b), key_def);
 	}
 }
 
-/** @sa tuple_compare_with_raw_key. */
+/**
+ * Compare a vinyl statement (key or tuple) with a raw key
+ * (msgpack array).
+ */
 static inline int
 vy_stmt_compare_with_raw_key(const struct tuple *stmt, const char *key,
 			     struct key_def *key_def)
 {
-	if (vy_stmt_type(stmt) != IPROTO_SELECT)
-		return vy_tuple_compare_with_raw_key(stmt, key, key_def);
+	if (vy_stmt_type(stmt) != IPROTO_SELECT) {
+		uint32_t part_count = mp_decode_array(&key);
+		return tuple_compare_with_key(stmt, key, part_count, key_def);
+	}
 	return key_compare(tuple_data(stmt), key, key_def);
 }
 
-/** @sa tuple_compare_with_key. */
-static inline int
-vy_stmt_compare_with_key(const struct tuple *stmt, const struct tuple *key,
-			 struct key_def *key_def)
-{
-	assert(vy_stmt_type(key) == IPROTO_SELECT);
-	return vy_stmt_compare_with_raw_key(stmt, tuple_data(key), key_def);
-}
-
 /**
  * Create the SELECT statement from raw MessagePack data.
  * @param format     Format of an index.
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index ac02ee4d..b9a837a3 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -67,7 +67,7 @@ write_set_cmp(struct txv *a, struct txv *b)
 {
 	int rc = a->lsm < b->lsm ? -1 : a->lsm > b->lsm;
 	if (rc == 0)
-		return vy_tuple_compare(a->stmt, b->stmt, a->lsm->cmp_def);
+		return vy_stmt_compare(a->stmt, b->stmt, a->lsm->cmp_def);
 	return rc;
 }
 
@@ -1189,8 +1189,8 @@ vy_txw_iterator_skip(struct vy_txw_iterator *itr,
 	if (itr->search_started &&
 	    (itr->curr_txv == NULL || last_stmt == NULL ||
 	     iterator_direction(itr->iterator_type) *
-	     vy_tuple_compare(itr->curr_txv->stmt, last_stmt,
-			      itr->lsm->cmp_def) > 0))
+	     vy_stmt_compare(itr->curr_txv->stmt, last_stmt,
+			     itr->lsm->cmp_def) > 0))
 		return 0;
 
 	vy_history_cleanup(history);
diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c
index ebea2789..3c5a4abb 100644
--- a/src/box/vy_upsert.c
+++ b/src/box/vy_upsert.c
@@ -205,7 +205,7 @@ check_key:
 	 * Check that key hasn't been changed after applying operations.
 	 */
 	if (!key_update_can_be_skipped(cmp_def->column_mask, column_mask) &&
-	    vy_tuple_compare(old_stmt, result_stmt, cmp_def) != 0) {
+	    vy_stmt_compare(old_stmt, result_stmt, cmp_def) != 0) {
 		/*
 		 * Key has been changed: ignore this UPSERT and
 		 * @retval the old stmt.
diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c
index 7a6a2062..dacd2e72 100644
--- a/src/box/vy_write_iterator.c
+++ b/src/box/vy_write_iterator.c
@@ -227,7 +227,7 @@ heap_less(heap_t *heap, struct heap_node *node1, struct heap_node *node2)
 	struct vy_write_src *src2 =
 		container_of(node2, struct vy_write_src, heap_node);
 
-	int cmp = vy_tuple_compare(src1->tuple, src2->tuple, stream->cmp_def);
+	int cmp = vy_stmt_compare(src1->tuple, src2->tuple, stream->cmp_def);
 	if (cmp != 0)
 		return cmp < 0;
 
-- 
2.11.0

  parent reply	other threads:[~2019-02-21 10:26 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-02-21 10:26 [PATCH 00/12] vinyl: do not fill secondary tuples with nulls Vladimir Davydov
2019-02-21 10:26 ` [PATCH 01/12] vinyl: use vy_lsm_env::empty_key where appropriate Vladimir Davydov
2019-02-21 10:59   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 10:26 ` [PATCH 02/12] vinyl: make vy_tuple_delete static Vladimir Davydov
2019-02-21 11:00   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 10:26 ` [PATCH 03/12] key_def: cleanup virtual function initialization Vladimir Davydov
2019-02-21 11:01   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 12:05     ` Vladimir Davydov
2019-02-21 10:26 ` [PATCH 04/12] key_def: move cmp and hash functions declarations to key_def.h Vladimir Davydov
2019-02-21 11:02   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 10:26 ` [PATCH 05/12] vinyl: move vy_tuple_key_contains_null to generic code Vladimir Davydov
2019-02-21 11:02   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 10:26 ` [PATCH 06/12] vinyl: move vy_key_dup " Vladimir Davydov
2019-02-21 11:04   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 11:52     ` Vladimir Davydov
2019-02-21 10:26 ` [PATCH 07/12] vinyl: sanitize full/empty key stmt detection Vladimir Davydov
2019-02-21 11:10   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 12:11     ` Vladimir Davydov
2019-03-01 12:57   ` Vladimir Davydov
2019-02-21 10:26 ` Vladimir Davydov [this message]
2019-02-21 11:11   ` [tarantool-patches] Re: [PATCH 08/12] vinyl: remove optimized comparators Konstantin Osipov
2019-02-21 10:26 ` [PATCH 09/12] vinyl: introduce statement environment Vladimir Davydov
2019-02-21 11:14   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 10:26 ` [PATCH 10/12] vinyl: rename key stmt construction routine Vladimir Davydov
2019-02-21 11:15   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 12:14     ` Vladimir Davydov
2019-02-21 10:26 ` [PATCH 11/12] vinyl: don't use IPROTO_SELECT type for key statements Vladimir Davydov
2019-02-21 11:16   ` [tarantool-patches] " Konstantin Osipov
2019-02-21 10:26 ` [PATCH 12/12] vinyl: do not fill secondary tuples with nulls when decoded Vladimir Davydov
2019-02-21 15:39 ` [PATCH 00/12] vinyl: do not fill secondary tuples with nulls Vladimir Davydov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=52cf8dd45c4ebd27368bef7311c71ef665931f53.1550744027.git.vdavydov.dev@gmail.com \
    --to=vdavydov.dev@gmail.com \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [PATCH 08/12] vinyl: remove optimized comparators' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox