[Tarantool-patches] [PATCH v4 09/12] txm: clarify all fetched tuples

Aleksandr Lyapunov alyapunov at tarantool.org
Tue Sep 8 13:22:09 MSK 2020


If a tuple fetched from an index is dirty - it must be clarified.
Let's fix all fetched from indexeds in that way.
Also fix a snapshot iterator - it must save a part of history
along with creating a read view in order to clean tuple during
iteration from another thread.

Part of #4897
---
 src/box/memtx_bitset.c |  31 ++++++++-----
 src/box/memtx_hash.c   |  82 +++++++++++++++++++++++++++------
 src/box/memtx_rtree.c  |  32 +++++++++++--
 src/box/memtx_tree.c   | 120 ++++++++++++++++++++++++++++++++++++++++++-------
 4 files changed, 224 insertions(+), 41 deletions(-)

diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c
index 67eaf6f..2283a47 100644
--- a/src/box/memtx_bitset.c
+++ b/src/box/memtx_bitset.c
@@ -39,7 +39,10 @@
 #include "bitset/index.h"
 #include "fiber.h"
 #include "index.h"
+#include "schema.h"
 #include "tuple.h"
+#include "txn.h"
+#include "memtx_tx.h"
 #include "memtx_engine.h"
 
 struct memtx_bitset_index {
@@ -198,19 +201,27 @@ bitset_index_iterator_next(struct iterator *iterator, struct tuple **ret)
 	assert(iterator->free == bitset_index_iterator_free);
 	struct bitset_index_iterator *it = bitset_index_iterator(iterator);
 
-	size_t value = tt_bitset_iterator_next(&it->bitset_it);
-	if (value == SIZE_MAX) {
-		*ret = NULL;
-		return 0;
-	}
-
+	do {
+		size_t value = tt_bitset_iterator_next(&it->bitset_it);
+		if (value == SIZE_MAX) {
+			*ret = NULL;
+			return 0;
+		}
 #ifndef OLD_GOOD_BITSET
-	struct memtx_bitset_index *index =
-		(struct memtx_bitset_index *)iterator->index;
-	*ret = memtx_bitset_index_value_to_tuple(index, value);
+		struct memtx_bitset_index *index =
+			(struct memtx_bitset_index *)iterator->index;
+		struct tuple *tuple =
+			memtx_bitset_index_value_to_tuple(index, value);
 #else /* #ifndef OLD_GOOD_BITSET */
-	*ret = value_to_tuple(value);
+		struct tuple *tuple = value_to_tuple(value);
 #endif /* #ifndef OLD_GOOD_BITSET */
+		uint32_t iid = iterator->index->def->iid;
+		struct txn *txn = in_txn();
+		struct space *space = space_by_id(iterator->space_id);
+		bool is_rw = txn != NULL;
+		*ret = memtx_tx_tuple_clarify(txn, space, tuple, iid, 0, is_rw);
+	} while (*ret == NULL);
+
 	return 0;
 }
 
diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
index cdd531c..ed4dba9 100644
--- a/src/box/memtx_hash.c
+++ b/src/box/memtx_hash.c
@@ -33,9 +33,11 @@
 #include "fiber.h"
 #include "index.h"
 #include "tuple.h"
+#include "txn.h"
+#include "memtx_tx.h"
 #include "memtx_engine.h"
 #include "space.h"
-#include "schema.h" /* space_cache_find() */
+#include "schema.h" /* space_by_id(), space_cache_find() */
 #include "errinj.h"
 
 #include <small/mempool.h>
@@ -101,7 +103,7 @@ hash_iterator_free(struct iterator *iterator)
 }
 
 static int
-hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
+hash_iterator_ge_base(struct iterator *ptr, struct tuple **ret)
 {
 	assert(ptr->free == hash_iterator_free);
 	struct hash_iterator *it = (struct hash_iterator *) ptr;
@@ -113,10 +115,10 @@ hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
 }
 
 static int
-hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
+hash_iterator_gt_base(struct iterator *ptr, struct tuple **ret)
 {
 	assert(ptr->free == hash_iterator_free);
-	ptr->next = hash_iterator_ge;
+	ptr->next = hash_iterator_ge_base;
 	struct hash_iterator *it = (struct hash_iterator *) ptr;
 	struct memtx_hash_index *index = (struct memtx_hash_index *)ptr->index;
 	struct tuple **res = light_index_iterator_get_and_next(&index->hash_table,
@@ -128,6 +130,32 @@ hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
 	return 0;
 }
 
+#define WRAP_ITERATOR_METHOD(name)						\
+static int									\
+name(struct iterator *iterator, struct tuple **ret)				\
+{										\
+	struct txn *txn = in_txn();						\
+	struct space *space = space_by_id(iterator->space_id);			\
+	bool is_rw = txn != NULL;						\
+	uint32_t iid = iterator->index->def->iid;				\
+	bool is_first = true;							\
+	do {									\
+		int rc = is_first ? name##_base(iterator, ret)			\
+				  : hash_iterator_ge_base(iterator, ret);	\
+		if (rc != 0 || *ret == NULL)					\
+			return rc;						\
+		is_first = false;						\
+		*ret = memtx_tx_tuple_clarify(txn, space, *ret, iid, 0, is_rw); \
+	} while (*ret == NULL);							\
+	return 0;								\
+}										\
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(hash_iterator_ge);
+WRAP_ITERATOR_METHOD(hash_iterator_gt);
+
+#undef WRAP_ITERATOR_METHOD
+
 static int
 hash_iterator_eq_next(MAYBE_UNUSED struct iterator *it, struct tuple **ret)
 {
@@ -139,7 +167,15 @@ static int
 hash_iterator_eq(struct iterator *it, struct tuple **ret)
 {
 	it->next = hash_iterator_eq_next;
-	return hash_iterator_ge(it, ret);
+	hash_iterator_ge_base(it, ret); /* always returns zero. */
+	if (*ret == NULL)
+		return 0;
+	struct txn *txn = in_txn();
+	struct space *sp = space_by_id(it->space_id);
+	bool is_rw = txn != NULL;
+	*ret = memtx_tx_tuple_clarify(txn, sp, *ret, it->index->def->iid,
+				      0, is_rw);
+	return 0;
 }
 
 /* }}} */
@@ -279,11 +315,18 @@ memtx_hash_index_get(struct index *base, const char *key,
 	       part_count == base->def->key_def->part_count);
 	(void) part_count;
 
+	struct space *space = space_by_id(base->def->space_id);
 	*result = NULL;
 	uint32_t h = key_hash(key, base->def->key_def);
 	uint32_t k = light_index_find_key(&index->hash_table, h, key);
-	if (k != light_index_end)
-		*result = light_index_get(&index->hash_table, k);
+	if (k != light_index_end) {
+		struct tuple *tuple = light_index_get(&index->hash_table, k);
+		uint32_t iid = base->def->iid;
+		struct txn *txn = in_txn();
+		bool is_rw = txn != NULL;
+		*result = memtx_tx_tuple_clarify(txn, space, tuple, iid,
+						 0, is_rw);
+	}
 	return 0;
 }
 
@@ -401,6 +444,7 @@ struct hash_snapshot_iterator {
 	struct snapshot_iterator base;
 	struct memtx_hash_index *index;
 	struct light_index_iterator iterator;
+	struct memtx_tx_snapshot_cleaner cleaner;
 };
 
 /**
@@ -418,6 +462,7 @@ hash_snapshot_iterator_free(struct snapshot_iterator *iterator)
 				      it->index->base.engine);
 	light_index_iterator_destroy(&it->index->hash_table, &it->iterator);
 	index_unref(&it->index->base);
+	memtx_tx_snapshot_cleaner_destroy(&it->cleaner);
 	free(iterator);
 }
 
@@ -434,13 +479,24 @@ hash_snapshot_iterator_next(struct snapshot_iterator *iterator,
 	struct hash_snapshot_iterator *it =
 		(struct hash_snapshot_iterator *) iterator;
 	struct light_index_core *hash_table = &it->index->hash_table;
-	struct tuple **res = light_index_iterator_get_and_next(hash_table,
-							       &it->iterator);
-	if (res == NULL) {
-		*data = NULL;
-		return 0;
+
+	while (true) {
+		struct tuple **res =
+			light_index_iterator_get_and_next(hash_table,
+			                                  &it->iterator);
+		if (res == NULL) {
+			*data = NULL;
+			return 0;
+		}
+
+		struct tuple *tuple = *res;
+		tuple = memtx_tx_snapshot_clarify(&it->cleaner, tuple);
+
+		if (tuple != NULL) {
+			*data = tuple_data_range(*res, size);
+			return 0;
+		}
 	}
-	*data = tuple_data_range(*res, size);
 	return 0;
 }
 
diff --git a/src/box/memtx_rtree.c b/src/box/memtx_rtree.c
index 612fcb2..0bd683d 100644
--- a/src/box/memtx_rtree.c
+++ b/src/box/memtx_rtree.c
@@ -40,7 +40,10 @@
 #include "trivia/util.h"
 
 #include "tuple.h"
+#include "txn.h"
+#include "memtx_tx.h"
 #include "space.h"
+#include "schema.h"
 #include "memtx_engine.h"
 
 struct memtx_rtree_index {
@@ -148,7 +151,16 @@ static int
 index_rtree_iterator_next(struct iterator *i, struct tuple **ret)
 {
 	struct index_rtree_iterator *itr = (struct index_rtree_iterator *)i;
-	*ret = (struct tuple *)rtree_iterator_next(&itr->impl);
+	do {
+		*ret = (struct tuple *) rtree_iterator_next(&itr->impl);
+		if (*ret == NULL)
+			break;
+		uint32_t iid = i->index->def->iid;
+		struct txn *txn = in_txn();
+		struct space *space = space_by_id(i->space_id);
+		bool is_rw = txn != NULL;
+		*ret = memtx_tx_tuple_clarify(txn, space, *ret, iid, 0, is_rw);
+	} while (*ret == NULL);
 	return 0;
 }
 
@@ -213,8 +225,22 @@ memtx_rtree_index_get(struct index *base, const char *key,
 		unreachable();
 
 	*result = NULL;
-	if (rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator))
-		*result = (struct tuple *)rtree_iterator_next(&iterator);
+	if (!rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator)) {
+		rtree_iterator_destroy(&iterator);
+		return 0;
+	}
+	do {
+		struct tuple *tuple = (struct tuple *)
+			rtree_iterator_next(&iterator);
+		if (tuple == NULL)
+			break;
+		uint32_t iid = base->def->iid;
+		struct txn *txn = in_txn();
+		struct space *space = space_by_id(base->def->space_id);
+		bool is_rw = txn != NULL;
+		*result = memtx_tx_tuple_clarify(txn, space, tuple, iid,
+						 0, is_rw);
+	} while (*result == NULL);
 	rtree_iterator_destroy(&iterator);
 	return 0;
 }
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 76ff3fc..5af482f 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -31,12 +31,14 @@
 #include "memtx_tree.h"
 #include "memtx_engine.h"
 #include "space.h"
-#include "schema.h" /* space_cache_find() */
+#include "schema.h" /* space_by_id(), space_cache_find() */
 #include "errinj.h"
 #include "memory.h"
 #include "fiber.h"
 #include "key_list.h"
 #include "tuple.h"
+#include "txn.h"
+#include "memtx_tx.h"
 #include <third_party/qsort_arg.h>
 #include <small/mempool.h>
 
@@ -175,7 +177,7 @@ tree_iterator_dummie(struct iterator *iterator, struct tuple **ret)
 }
 
 static int
-tree_iterator_next(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_base(struct iterator *iterator, struct tuple **ret)
 {
 	struct memtx_tree_index *index =
 		(struct memtx_tree_index *)iterator->index;
@@ -205,7 +207,7 @@ tree_iterator_next(struct iterator *iterator, struct tuple **ret)
 }
 
 static int
-tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_base(struct iterator *iterator, struct tuple **ret)
 {
 	struct memtx_tree_index *index =
 		(struct memtx_tree_index *)iterator->index;
@@ -234,7 +236,7 @@ tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
 }
 
 static int
-tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_equal_base(struct iterator *iterator, struct tuple **ret)
 {
 	struct memtx_tree_index *index =
 		(struct memtx_tree_index *)iterator->index;
@@ -270,7 +272,7 @@ tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
 }
 
 static int
-tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_equal_base(struct iterator *iterator, struct tuple **ret)
 {
 	struct memtx_tree_index *index =
 		(struct memtx_tree_index *)iterator->index;
@@ -304,6 +306,47 @@ tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
 	return 0;
 }
 
+#define WRAP_ITERATOR_METHOD(name)						\
+static int									\
+name(struct iterator *iterator, struct tuple **ret)				\
+{										\
+	struct memtx_tree *tree =						\
+		&((struct memtx_tree_index *)iterator->index)->tree;		\
+	struct tree_iterator *it = tree_iterator(iterator);			\
+	struct memtx_tree_iterator *ti = &it->tree_iterator;			\
+	uint32_t iid = iterator->index->def->iid;				\
+	bool is_multikey = iterator->index->def->key_def->is_multikey;		\
+	struct txn *txn = in_txn();						\
+	struct space *space = space_by_id(iterator->space_id);			\
+	bool is_rw = txn != NULL;						\
+	do {									\
+		int rc = name##_base(iterator, ret);				\
+		if (rc != 0 || *ret == NULL)					\
+			return rc;						\
+		uint32_t mk_index = 0;						\
+		if (is_multikey) {						\
+			struct memtx_tree_data *check =				\
+				memtx_tree_iterator_get_elem(tree, ti);		\
+			assert(check != NULL);					\
+			mk_index = check->hint;					\
+		}								\
+		*ret = memtx_tx_tuple_clarify(txn, space, *ret,			\
+					      iid, mk_index, is_rw);		\
+	} while (*ret == NULL);							\
+	tuple_unref(it->current.tuple);						\
+	it->current.tuple = *ret;						\
+	tuple_ref(it->current.tuple);						\
+	return 0;								\
+}										\
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(tree_iterator_next);
+WRAP_ITERATOR_METHOD(tree_iterator_prev);
+WRAP_ITERATOR_METHOD(tree_iterator_next_equal);
+WRAP_ITERATOR_METHOD(tree_iterator_prev_equal);
+
+#undef WRAP_ITERATOR_METHOD
+
 static void
 tree_iterator_set_next_method(struct tree_iterator *it)
 {
@@ -388,6 +431,22 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret)
 	tuple_ref(*ret);
 	it->current = *res;
 	tree_iterator_set_next_method(it);
+
+	uint32_t iid = iterator->index->def->iid;
+	bool is_multikey = iterator->index->def->key_def->is_multikey;
+	struct txn *txn = in_txn();
+	struct space *space = space_by_id(iterator->space_id);
+	bool is_rw = txn != NULL;
+	uint32_t mk_index = is_multikey ? res->hint : 0;
+	*ret = memtx_tx_tuple_clarify(txn, space, *ret, iid, mk_index, is_rw);
+	if (*ret == NULL) {
+		return iterator->next(iterator, ret);
+	} else {
+		tuple_unref(it->current.tuple);
+		it->current.tuple = *ret;
+		tuple_ref(it->current.tuple);
+	}
+
 	return 0;
 }
 
@@ -539,7 +598,16 @@ memtx_tree_index_get(struct index *base, const char *key,
 	key_data.part_count = part_count;
 	key_data.hint = key_hint(key, part_count, cmp_def);
 	struct memtx_tree_data *res = memtx_tree_find(&index->tree, &key_data);
-	*result = res != NULL ? res->tuple : NULL;
+	if (res == NULL) {
+		*result = NULL;
+		return 0;
+	}
+	struct txn *txn = in_txn();
+	struct space *space = space_by_id(base->def->space_id);
+	bool is_rw = txn != NULL;
+	uint32_t mk_index = base->def->key_def->is_multikey ? res->hint : 0;
+	*result = memtx_tx_tuple_clarify(txn, space, res->tuple, base->def->iid,
+					 mk_index, is_rw);
 	return 0;
 }
 
@@ -1208,6 +1276,7 @@ struct tree_snapshot_iterator {
 	struct snapshot_iterator base;
 	struct memtx_tree_index *index;
 	struct memtx_tree_iterator tree_iterator;
+	struct memtx_tx_snapshot_cleaner cleaner;
 };
 
 static void
@@ -1220,6 +1289,7 @@ tree_snapshot_iterator_free(struct snapshot_iterator *iterator)
 				      it->index->base.engine);
 	memtx_tree_iterator_destroy(&it->index->tree, &it->tree_iterator);
 	index_unref(&it->index->base);
+	memtx_tx_snapshot_cleaner_destroy(&it->cleaner);
 	free(iterator);
 }
 
@@ -1231,14 +1301,27 @@ tree_snapshot_iterator_next(struct snapshot_iterator *iterator,
 	struct tree_snapshot_iterator *it =
 		(struct tree_snapshot_iterator *)iterator;
 	struct memtx_tree *tree = &it->index->tree;
-	struct memtx_tree_data *res = memtx_tree_iterator_get_elem(tree,
-							&it->tree_iterator);
-	if (res == NULL) {
-		*data = NULL;
-		return 0;
+
+	while (true) {
+		struct memtx_tree_data *res =
+			memtx_tree_iterator_get_elem(tree, &it->tree_iterator);
+
+		if (res == NULL) {
+			*data = NULL;
+			return 0;
+		}
+
+		memtx_tree_iterator_next(tree, &it->tree_iterator);
+
+		struct tuple *tuple = res->tuple;
+		tuple = memtx_tx_snapshot_clarify(&it->cleaner, tuple);
+
+		if (tuple != NULL) {
+			*data = tuple_data_range(tuple, size);
+			return 0;
+		}
 	}
-	memtx_tree_iterator_next(tree, &it->tree_iterator);
-	*data = tuple_data_range(res->tuple, size);
+
 	return 0;
 }
 
@@ -1251,14 +1334,21 @@ static struct snapshot_iterator *
 memtx_tree_index_create_snapshot_iterator(struct index *base)
 {
 	struct memtx_tree_index *index = (struct memtx_tree_index *)base;
-	struct tree_snapshot_iterator *it = (struct tree_snapshot_iterator *)
-		calloc(1, sizeof(*it));
+	struct tree_snapshot_iterator *it =
+		(struct tree_snapshot_iterator *) calloc(1, sizeof(*it));
 	if (it == NULL) {
 		diag_set(OutOfMemory, sizeof(struct tree_snapshot_iterator),
 			 "memtx_tree_index", "create_snapshot_iterator");
 		return NULL;
 	}
 
+	struct space *space = space_cache_find(base->def->space_id);
+	if (memtx_tx_snapshot_cleaner_create(&it->cleaner, space,
+					     "memtx_tree_index") != 0) {
+		free(it);
+		return NULL;
+	}
+
 	it->base.free = tree_snapshot_iterator_free;
 	it->base.next = tree_snapshot_iterator_next;
 	it->index = index;
-- 
2.7.4



More information about the Tarantool-patches mailing list