[PATCH 07/12] vinyl: sanitize full/empty key stmt detection

Vladimir Davydov vdavydov.dev at gmail.com
Thu Feb 21 13:26:07 MSK 2019


Historically, we use tuple_field_count to check whether a statement
represents an empty key (match all) or a full key (point lookup): if
the number of fields in a tuple is greater than or equal to the number
of parts in a key definition, it can be used as a full key; if the
number of fields is zero, then the statement represents an empty key.

While this used to be correct not so long ago, the appearance of JSON
indexes changed the rules of the game: now a tuple can have nested
indexed fields so that the same field number appears in the key
definition multiple times. This means tuple_field_count can be less
than the number of key parts and hence the full key check won't work
for a statement representing a tuple.

Actually, any tuple in vinyl can be used as a full key as it has all
key parts by definition, so there's no need to use tuple_field_count
for such statements - we only need to do that for statements
representing keys. Keeping that in mind, let's introduce helpers for
checking whether a statement can be used as a full/empty key and use
them throughout the code.
---
 src/box/vinyl.c            |  2 +-
 src/box/vy_cache.c         | 14 +++++++++-----
 src/box/vy_mem.c           |  2 +-
 src/box/vy_point_lookup.c  |  5 ++---
 src/box/vy_range.c         |  5 ++---
 src/box/vy_read_iterator.c |  6 +++---
 src/box/vy_read_set.c      | 18 ++++++------------
 src/box/vy_run.c           |  2 +-
 src/box/vy_stmt.h          | 38 ++++++++++++++++++++++++++++++++++++++
 src/box/vy_tx.c            |  4 ++--
 10 files changed, 65 insertions(+), 31 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 428bae1d..69880238 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1374,7 +1374,7 @@ vy_get(struct vy_lsm *lsm, struct vy_tx *tx,
 	int rc;
 	struct tuple *tuple;
 
-	if (tuple_field_count(key) >= lsm->cmp_def->part_count) {
+	if (vy_stmt_is_full_key(key, lsm->cmp_def)) {
 		/*
 		 * Use point lookup for a full key.
 		 */
diff --git a/src/box/vy_cache.c b/src/box/vy_cache.c
index ead56e95..320ea04d 100644
--- a/src/box/vy_cache.c
+++ b/src/box/vy_cache.c
@@ -270,13 +270,15 @@ vy_cache_add(struct vy_cache *cache, struct tuple *stmt,
 			 * Regardless of order, the statement is the first in
 			 * sequence of statements that is equal to the key.
 			 */
-			boundary_level = tuple_field_count(key);
+			boundary_level = vy_stmt_key_part_count(key,
+							cache->cmp_def);
 		}
 	} else {
 		assert(prev_stmt != NULL);
 		if (order == ITER_EQ || order == ITER_REQ) {
 			/* that is the last statement that is equal to key */
-			boundary_level = tuple_field_count(key);
+			boundary_level = vy_stmt_key_part_count(key,
+							cache->cmp_def);
 		} else {
 			/* that is the last statement */
 			boundary_level = 0;
@@ -527,7 +529,8 @@ static inline bool
 vy_cache_iterator_is_stop(struct vy_cache_iterator *itr,
 			  struct vy_cache_entry *entry)
 {
-	uint8_t key_level = tuple_field_count(itr->key);
+	uint8_t key_level = vy_stmt_key_part_count(itr->key,
+						   itr->cache->cmp_def);
 	/* select{} is actually an EQ iterator with part_count == 0 */
 	bool iter_is_eq = itr->iterator_type == ITER_EQ || key_level == 0;
 	if (iterator_direction(itr->iterator_type) > 0) {
@@ -556,7 +559,8 @@ static inline bool
 vy_cache_iterator_is_end_stop(struct vy_cache_iterator *itr,
 			      struct vy_cache_entry *last_entry)
 {
-	uint8_t key_level = tuple_field_count(itr->key);
+	uint8_t key_level = vy_stmt_key_part_count(itr->key,
+						   itr->cache->cmp_def);
 	/* select{} is actually an EQ iterator with part_count == 0 */
 	bool iter_is_eq = itr->iterator_type == ITER_EQ || key_level == 0;
 	if (iterator_direction(itr->iterator_type) > 0) {
@@ -644,7 +648,7 @@ vy_cache_iterator_seek(struct vy_cache_iterator *itr,
 	*entry = NULL;
 	itr->cache->stat.lookup++;
 
-	if (tuple_field_count(key) > 0) {
+	if (!vy_stmt_is_empty_key(key)) {
 		bool exact;
 		itr->curr_pos = iterator_type == ITER_EQ ||
 				iterator_type == ITER_GE ||
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index f8b89d4e..b672cc6f 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -367,7 +367,7 @@ vy_mem_iterator_seek(struct vy_mem_iterator *itr,
 	tree_key.stmt = key;
 	/* (lsn == INT64_MAX - 1) means that lsn is ignored in comparison */
 	tree_key.lsn = INT64_MAX - 1;
-	if (tuple_field_count(key) > 0) {
+	if (!vy_stmt_is_empty_key(key)) {
 		if (iterator_type == ITER_EQ) {
 			bool exact;
 			itr->curr_pos =
diff --git a/src/box/vy_point_lookup.c b/src/box/vy_point_lookup.c
index 6b8bbd23..18d5622b 100644
--- a/src/box/vy_point_lookup.c
+++ b/src/box/vy_point_lookup.c
@@ -197,8 +197,7 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx,
 		struct tuple *key, struct tuple **ret)
 {
 	/* All key parts must be set for a point lookup. */
-	assert(vy_stmt_type(key) != IPROTO_SELECT ||
-	       tuple_field_count(key) >= lsm->cmp_def->part_count);
+	assert(vy_stmt_is_full_key(key, lsm->cmp_def));
 
 	*ret = NULL;
 	double start_time = ev_monotonic_now(loop());
@@ -301,7 +300,7 @@ int
 vy_point_lookup_mem(struct vy_lsm *lsm, const struct vy_read_view **rv,
 		    struct tuple *key, struct tuple **ret)
 {
-	assert(tuple_field_count(key) >= lsm->cmp_def->part_count);
+	assert(vy_stmt_is_full_key(key, lsm->cmp_def));
 
 	int rc;
 	struct vy_history history;
diff --git a/src/box/vy_range.c b/src/box/vy_range.c
index d9afcfff..bbd40615 100644
--- a/src/box/vy_range.c
+++ b/src/box/vy_range.c
@@ -81,8 +81,7 @@ vy_range_tree_find_by_key(vy_range_tree_t *tree,
 			  enum iterator_type iterator_type,
 			  const struct tuple *key)
 {
-	uint32_t key_field_count = tuple_field_count(key);
-	if (key_field_count == 0) {
+	if (vy_stmt_is_empty_key(key)) {
 		switch (iterator_type) {
 		case ITER_LT:
 		case ITER_LE:
@@ -125,7 +124,7 @@ vy_range_tree_find_by_key(vy_range_tree_t *tree,
 		range = vy_range_tree_psearch(tree, key);
 		/* switch to previous for case (4) */
 		if (range != NULL && range->begin != NULL &&
-		    key_field_count < range->cmp_def->part_count &&
+		    !vy_stmt_is_full_key(key, range->cmp_def) &&
 		    vy_stmt_compare_with_key(key, range->begin,
 					     range->cmp_def) == 0)
 			range = vy_range_tree_prev(tree, range);
diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c
index 2984174a..740ee940 100644
--- a/src/box/vy_read_iterator.c
+++ b/src/box/vy_read_iterator.c
@@ -208,7 +208,7 @@ vy_read_iterator_is_exact_match(struct vy_read_iterator *itr,
 	return itr->last_stmt == NULL && stmt != NULL &&
 		(type == ITER_EQ || type == ITER_REQ ||
 		 type == ITER_GE || type == ITER_LE) &&
-		tuple_field_count(key) >= cmp_def->part_count &&
+		vy_stmt_is_full_key(key, cmp_def) &&
 		vy_stmt_compare(stmt, key, cmp_def) == 0;
 }
 
@@ -445,7 +445,7 @@ vy_read_iterator_advance(struct vy_read_iterator *itr)
 {
 	if (itr->last_stmt != NULL && (itr->iterator_type == ITER_EQ ||
 				       itr->iterator_type == ITER_REQ) &&
-	    tuple_field_count(itr->key) >= itr->lsm->cmp_def->part_count) {
+	    vy_stmt_is_full_key(itr->key, itr->lsm->cmp_def)) {
 		/*
 		 * There may be one statement at max satisfying
 		 * EQ with a full key.
@@ -678,7 +678,7 @@ vy_read_iterator_open(struct vy_read_iterator *itr, struct vy_lsm *lsm,
 	itr->key = key;
 	itr->read_view = rv;
 
-	if (tuple_field_count(key) == 0) {
+	if (vy_stmt_is_empty_key(key)) {
 		/*
 		 * Strictly speaking, a GT/LT iterator should return
 		 * nothing if the key is empty, because every key is
diff --git a/src/box/vy_read_set.c b/src/box/vy_read_set.c
index 0f3fab61..b95d2e4e 100644
--- a/src/box/vy_read_set.c
+++ b/src/box/vy_read_set.c
@@ -53,10 +53,8 @@ vy_read_interval_cmpl(const struct vy_read_interval *a,
 		return -1;
 	if (!a->left_belongs && b->left_belongs)
 		return 1;
-	uint32_t a_parts = tuple_field_count(a->left);
-	uint32_t b_parts = tuple_field_count(b->left);
-	a_parts = MIN(a_parts, cmp_def->part_count);
-	b_parts = MIN(b_parts, cmp_def->part_count);
+	uint32_t a_parts = vy_stmt_key_part_count(a->left, cmp_def);
+	uint32_t b_parts = vy_stmt_key_part_count(b->left, cmp_def);
 	if (a->left_belongs)
 		return a_parts < b_parts ? -1 : a_parts > b_parts;
 	else
@@ -76,10 +74,8 @@ vy_read_interval_cmpr(const struct vy_read_interval *a,
 		return 1;
 	if (!a->right_belongs && b->right_belongs)
 		return -1;
-	uint32_t a_parts = tuple_field_count(a->right);
-	uint32_t b_parts = tuple_field_count(b->right);
-	a_parts = MIN(a_parts, cmp_def->part_count);
-	b_parts = MIN(b_parts, cmp_def->part_count);
+	uint32_t a_parts = vy_stmt_key_part_count(a->right, cmp_def);
+	uint32_t b_parts = vy_stmt_key_part_count(b->right, cmp_def);
 	if (a->right_belongs)
 		return a_parts > b_parts ? -1 : a_parts < b_parts;
 	else
@@ -102,10 +98,8 @@ vy_read_interval_should_merge(const struct vy_read_interval *l,
 		return true;
 	if (!l->right_belongs && !r->left_belongs)
 		return false;
-	uint32_t l_parts = tuple_field_count(l->right);
-	uint32_t r_parts = tuple_field_count(r->left);
-	l_parts = MIN(l_parts, cmp_def->part_count);
-	r_parts = MIN(r_parts, cmp_def->part_count);
+	uint32_t l_parts = vy_stmt_key_part_count(l->right, cmp_def);
+	uint32_t r_parts = vy_stmt_key_part_count(r->left, cmp_def);
 	if (l->right_belongs)
 		return l_parts <= r_parts;
 	else
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index b85582b2..915e7f95 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -1273,7 +1273,7 @@ vy_run_iterator_do_seek(struct vy_run_iterator *itr,
 	struct vy_run_iterator_pos end_pos = {run->info.page_count, 0};
 	bool equal_found = false;
 	int rc;
-	if (tuple_field_count(key) > 0) {
+	if (!vy_stmt_is_empty_key(key)) {
 		rc = vy_run_iterator_search(itr, iterator_type, key,
 					    &itr->curr_pos, &equal_found);
 		if (rc != 0 || itr->search_ended)
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index dff0b94f..0832aa0f 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -220,6 +220,44 @@ vy_stmt_set_n_upserts(struct tuple *stmt, uint8_t n)
 }
 
 /**
+ * Return the number of key parts defined in the given vinyl
+ * statement.
+ *
+ * If the statement represents a tuple, we assume that it has
+ * all key parts defined.
+ */
+static inline uint32_t
+vy_stmt_key_part_count(const struct tuple *stmt, struct key_def *key_def)
+{
+	if (vy_stmt_type(stmt) == IPROTO_SELECT) {
+		uint32_t part_count = tuple_field_count(stmt);
+		assert(part_count <= key_def->part_count);
+		return part_count;
+	}
+	return key_def->part_count;
+}
+
+/**
+ * Return true if the given vinyl statement contains all
+ * key parts, i.e. can be used for an exact match lookup.
+ */
+static inline bool
+vy_stmt_is_full_key(const struct tuple *stmt, struct key_def *key_def)
+{
+	return vy_stmt_key_part_count(stmt, key_def) == key_def->part_count;
+}
+
+/**
+ * Return true if the given vinyl statement stores an empty
+ * (match all) key.
+ */
+static inline bool
+vy_stmt_is_empty_key(const struct tuple *stmt)
+{
+	return tuple_field_count(stmt) == 0;
+}
+
+/**
  * Duplicate the statememnt.
  *
  * @param stmt statement
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index b1fcfabb..ac02ee4d 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -943,7 +943,7 @@ vy_tx_track(struct vy_tx *tx, struct vy_lsm *lsm,
 int
 vy_tx_track_point(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt)
 {
-	assert(tuple_field_count(stmt) >= lsm->cmp_def->part_count);
+	assert(vy_stmt_is_full_key(stmt, lsm->cmp_def));
 
 	if (vy_tx_is_in_read_view(tx)) {
 		/* No point in tracking reads. */
@@ -1102,7 +1102,7 @@ vy_txw_iterator_seek(struct vy_txw_iterator *itr,
 	struct vy_lsm *lsm = itr->lsm;
 	struct write_set_key k = { lsm, key };
 	struct txv *txv;
-	if (tuple_field_count(key) > 0) {
+	if (!vy_stmt_is_empty_key(key)) {
 		if (iterator_type == ITER_EQ)
 			txv = write_set_search(&itr->tx->write_set, &k);
 		else if (iterator_type == ITER_GE || iterator_type == ITER_GT)
-- 
2.11.0




More information about the Tarantool-patches mailing list