[Tarantool-patches] [PATCH v3 12/13] txm: clarify all fetched tuples

Aleksandr Lyapunov alyapunov at tarantool.org
Wed Jul 15 16:55:35 MSK 2020


If a tuple fetched from an index is dirty, it must be clarified.
Let's fix all fetches from indexes in that way.
Also fix the snapshot iterators - each must save a part of the history
along with creating a read view in order to clean tuples during
iteration from another thread.

Part of #4897
---
 src/box/memtx_bitset.c |  30 ++++---
 src/box/memtx_hash.c   |  79 ++++++++++++++---
 src/box/memtx_rtree.c  |  30 ++++++-
 src/box/memtx_tree.c   | 119 +++++++++++++++++++++----
 src/box/space.c        |   2 +
 src/box/space.h        |   4 +
 src/box/txn.c          | 230 +++++++++++++++++++++++++++++++++++++++++++++----
 src/box/txn.h          |  64 +++++++++++++-
 8 files changed, 499 insertions(+), 59 deletions(-)

diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c
index 67eaf6f..68845c5 100644
--- a/src/box/memtx_bitset.c
+++ b/src/box/memtx_bitset.c
@@ -39,7 +39,9 @@
 #include "bitset/index.h"
 #include "fiber.h"
 #include "index.h"
+#include "schema.h"
 #include "tuple.h"
+#include "txn.h"
 #include "memtx_engine.h"
 
 struct memtx_bitset_index {
@@ -198,19 +200,27 @@ bitset_index_iterator_next(struct iterator *iterator, struct tuple **ret)
 	assert(iterator->free == bitset_index_iterator_free);
 	struct bitset_index_iterator *it = bitset_index_iterator(iterator);
 
-	size_t value = tt_bitset_iterator_next(&it->bitset_it);
-	if (value == SIZE_MAX) {
-		*ret = NULL;
-		return 0;
-	}
-
+	do {
+		size_t value = tt_bitset_iterator_next(&it->bitset_it);
+		if (value == SIZE_MAX) {
+			*ret = NULL;
+			return 0;
+		}
 #ifndef OLD_GOOD_BITSET
-	struct memtx_bitset_index *index =
-		(struct memtx_bitset_index *)iterator->index;
-	*ret = memtx_bitset_index_value_to_tuple(index, value);
+		struct memtx_bitset_index *index =
+			(struct memtx_bitset_index *)iterator->index;
+		struct tuple *tuple =
+			memtx_bitset_index_value_to_tuple(index, value);
 #else /* #ifndef OLD_GOOD_BITSET */
-	*ret = value_to_tuple(value);
+		struct tuple *tuple = value_to_tuple(value);
 #endif /* #ifndef OLD_GOOD_BITSET */
+		uint32_t iid = iterator->index->def->iid;
+		struct txn *txn = in_txn();
+		struct space *space = space_by_id(iterator->space_id);
+		bool is_rw = txn != NULL;
+		*ret = txm_tuple_clarify(txn, space, tuple, iid, 0, is_rw);
+	} while (*ret == NULL);
+
 	return 0;
 }
 
diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
index cdd531c..6e3bc18 100644
--- a/src/box/memtx_hash.c
+++ b/src/box/memtx_hash.c
@@ -33,9 +33,10 @@
 #include "fiber.h"
 #include "index.h"
 #include "tuple.h"
+#include "txn.h"
 #include "memtx_engine.h"
 #include "space.h"
-#include "schema.h" /* space_cache_find() */
+#include "schema.h" /* space_by_id(), space_cache_find() */
 #include "errinj.h"
 
 #include <small/mempool.h>
@@ -101,7 +102,7 @@ hash_iterator_free(struct iterator *iterator)
 }
 
 static int
-hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
+hash_iterator_ge_base(struct iterator *ptr, struct tuple **ret)
 {
 	assert(ptr->free == hash_iterator_free);
 	struct hash_iterator *it = (struct hash_iterator *) ptr;
@@ -113,10 +114,10 @@ hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
 }
 
 static int
-hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
+hash_iterator_gt_base(struct iterator *ptr, struct tuple **ret)
 {
 	assert(ptr->free == hash_iterator_free);
-	ptr->next = hash_iterator_ge;
+	ptr->next = hash_iterator_ge_base;
 	struct hash_iterator *it = (struct hash_iterator *) ptr;
 	struct memtx_hash_index *index = (struct memtx_hash_index *)ptr->index;
 	struct tuple **res = light_index_iterator_get_and_next(&index->hash_table,
@@ -128,6 +129,32 @@ hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
 	return 0;
 }
 
+#define WRAP_ITERATOR_METHOD(name)						\
+static int									\
+name(struct iterator *iterator, struct tuple **ret)				\
+{										\
+	struct txn *txn = in_txn();						\
+	struct space *space = space_by_id(iterator->space_id);			\
+	bool is_rw = txn != NULL;						\
+	uint32_t iid = iterator->index->def->iid;				\
+	bool is_first = true;							\
+	do {									\
+		int rc = is_first ? name##_base(iterator, ret)			\
+				  : hash_iterator_ge_base(iterator, ret);	\
+		if (rc != 0 || *ret == NULL)					\
+			return rc;						\
+		is_first = false;						\
+		*ret = txm_tuple_clarify(txn, space, *ret, iid, 0, is_rw);	\
+	} while (*ret == NULL);							\
+	return 0;								\
+}										\
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(hash_iterator_ge);
+WRAP_ITERATOR_METHOD(hash_iterator_gt);
+
+#undef WRAP_ITERATOR_METHOD
+
 static int
 hash_iterator_eq_next(MAYBE_UNUSED struct iterator *it, struct tuple **ret)
 {
@@ -139,7 +166,14 @@ static int
 hash_iterator_eq(struct iterator *it, struct tuple **ret)
 {
 	it->next = hash_iterator_eq_next;
-	return hash_iterator_ge(it, ret);
+	hash_iterator_ge_base(it, ret); /* always returns zero. */
+	if (*ret == NULL)
+		return 0;
+	struct txn *txn = in_txn();
+	struct space *sp = space_by_id(it->space_id);
+	bool is_rw = txn != NULL;
+	*ret = txm_tuple_clarify(txn, sp, *ret, it->index->def->iid, 0, is_rw);
+	return 0;
 }
 
 /* }}} */
@@ -279,11 +313,17 @@ memtx_hash_index_get(struct index *base, const char *key,
 	       part_count == base->def->key_def->part_count);
 	(void) part_count;
 
+	struct space *space = space_by_id(base->def->space_id);
 	*result = NULL;
 	uint32_t h = key_hash(key, base->def->key_def);
 	uint32_t k = light_index_find_key(&index->hash_table, h, key);
-	if (k != light_index_end)
-		*result = light_index_get(&index->hash_table, k);
+	if (k != light_index_end) {
+		struct tuple *tuple = light_index_get(&index->hash_table, k);
+		uint32_t iid = base->def->iid;
+		struct txn *txn = in_txn();
+		bool is_rw = txn != NULL;
+		*result = txm_tuple_clarify(txn, space, tuple, iid, 0, is_rw);
+	}
 	return 0;
 }
 
@@ -401,6 +441,7 @@ struct hash_snapshot_iterator {
 	struct snapshot_iterator base;
 	struct memtx_hash_index *index;
 	struct light_index_iterator iterator;
+	struct txm_snapshot_cleaner cleaner;
 };
 
 /**
@@ -418,6 +459,7 @@ hash_snapshot_iterator_free(struct snapshot_iterator *iterator)
 				      it->index->base.engine);
 	light_index_iterator_destroy(&it->index->hash_table, &it->iterator);
 	index_unref(&it->index->base);
+	txm_snapshot_cleaner_destroy(&it->cleaner);
 	free(iterator);
 }
 
@@ -434,13 +476,24 @@ hash_snapshot_iterator_next(struct snapshot_iterator *iterator,
 	struct hash_snapshot_iterator *it =
 		(struct hash_snapshot_iterator *) iterator;
 	struct light_index_core *hash_table = &it->index->hash_table;
-	struct tuple **res = light_index_iterator_get_and_next(hash_table,
-							       &it->iterator);
-	if (res == NULL) {
-		*data = NULL;
-		return 0;
+
+	while (true) {
+		struct tuple **res =
+			light_index_iterator_get_and_next(hash_table,
+							  &it->iterator);
+		if (res == NULL) {
+			*data = NULL;
+			return 0;
+		}
+		/* The index may hold a dirty tuple: map it to the clean one. */
+		struct tuple *tuple = *res;
+		tuple = txm_snapshot_clarify(&it->cleaner, tuple);
+		/* NULL: no clean version to emit, move on to the next entry. */
+		if (tuple != NULL) {
+			*data = tuple_data_range(tuple, size);
+			return 0;
+		}
+	}
-	*data = tuple_data_range(*res, size);
 	return 0;
 }
 
diff --git a/src/box/memtx_rtree.c b/src/box/memtx_rtree.c
index 612fcb2..c7f1655 100644
--- a/src/box/memtx_rtree.c
+++ b/src/box/memtx_rtree.c
@@ -40,7 +40,9 @@
 #include "trivia/util.h"
 
 #include "tuple.h"
+#include "txn.h"
 #include "space.h"
+#include "schema.h"
 #include "memtx_engine.h"
 
 struct memtx_rtree_index {
@@ -148,7 +150,16 @@ static int
 index_rtree_iterator_next(struct iterator *i, struct tuple **ret)
 {
 	struct index_rtree_iterator *itr = (struct index_rtree_iterator *)i;
-	*ret = (struct tuple *)rtree_iterator_next(&itr->impl);
+	do {
+		*ret = (struct tuple *) rtree_iterator_next(&itr->impl);
+		if (*ret == NULL)
+			break;
+		uint32_t iid = i->index->def->iid;
+		struct txn *txn = in_txn();
+		struct space *space = space_by_id(i->space_id);
+		bool is_rw = txn != NULL;
+		*ret = txm_tuple_clarify(txn, space, *ret, iid, 0, is_rw);
+	} while (*ret == NULL);
 	return 0;
 }
 
@@ -213,8 +224,21 @@ memtx_rtree_index_get(struct index *base, const char *key,
 		unreachable();
 
 	*result = NULL;
-	if (rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator))
-		*result = (struct tuple *)rtree_iterator_next(&iterator);
+	if (!rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator)) {
+		rtree_iterator_destroy(&iterator);
+		return 0;
+	}
+	do {
+		struct tuple *tuple = (struct tuple *)
+			rtree_iterator_next(&iterator);
+		if (tuple == NULL)
+			break;
+		uint32_t iid = base->def->iid;
+		struct txn *txn = in_txn();
+		struct space *space = space_by_id(base->def->space_id);
+		bool is_rw = txn != NULL;
+		*result = txm_tuple_clarify(txn, space, tuple, iid, 0, is_rw);
+	} while (*result == NULL);
 	rtree_iterator_destroy(&iterator);
 	return 0;
 }
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 76ff3fc..fe93467 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -31,12 +31,13 @@
 #include "memtx_tree.h"
 #include "memtx_engine.h"
 #include "space.h"
-#include "schema.h" /* space_cache_find() */
+#include "schema.h" /* space_by_id(), space_cache_find() */
 #include "errinj.h"
 #include "memory.h"
 #include "fiber.h"
 #include "key_list.h"
 #include "tuple.h"
+#include "txn.h"
 #include <third_party/qsort_arg.h>
 #include <small/mempool.h>
 
@@ -175,7 +176,7 @@ tree_iterator_dummie(struct iterator *iterator, struct tuple **ret)
 }
 
 static int
-tree_iterator_next(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_base(struct iterator *iterator, struct tuple **ret)
 {
 	struct memtx_tree_index *index =
 		(struct memtx_tree_index *)iterator->index;
@@ -205,7 +206,7 @@ tree_iterator_next(struct iterator *iterator, struct tuple **ret)
 }
 
 static int
-tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_base(struct iterator *iterator, struct tuple **ret)
 {
 	struct memtx_tree_index *index =
 		(struct memtx_tree_index *)iterator->index;
@@ -234,7 +235,7 @@ tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
 }
 
 static int
-tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_equal_base(struct iterator *iterator, struct tuple **ret)
 {
 	struct memtx_tree_index *index =
 		(struct memtx_tree_index *)iterator->index;
@@ -270,7 +271,7 @@ tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
 }
 
 static int
-tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_equal_base(struct iterator *iterator, struct tuple **ret)
 {
 	struct memtx_tree_index *index =
 		(struct memtx_tree_index *)iterator->index;
@@ -304,6 +305,47 @@ tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
 	return 0;
 }
 
+#define WRAP_ITERATOR_METHOD(name)						\
+static int									\
+name(struct iterator *iterator, struct tuple **ret)				\
+{										\
+	struct memtx_tree *tree =						\
+		&((struct memtx_tree_index *)iterator->index)->tree;		\
+	struct tree_iterator *it = tree_iterator(iterator);			\
+	struct memtx_tree_iterator *ti = &it->tree_iterator;			\
+	uint32_t iid = iterator->index->def->iid;				\
+	bool is_multikey = iterator->index->def->key_def->is_multikey;		\
+	struct txn *txn = in_txn();						\
+	struct space *space = space_by_id(iterator->space_id);			\
+	bool is_rw = txn != NULL;						\
+	do {									\
+		int rc = name##_base(iterator, ret);				\
+		if (rc != 0 || *ret == NULL)					\
+			return rc;						\
+		uint32_t mk_index = 0;						\
+		if (is_multikey) {						\
+			struct memtx_tree_data *check =				\
+				memtx_tree_iterator_get_elem(tree, ti);		\
+			assert(check != NULL);					\
+			mk_index = check->hint;					\
+		}								\
+		*ret = txm_tuple_clarify(txn, space, *ret,			\
+					 iid, mk_index, is_rw);			\
+	} while (*ret == NULL);							\
+	tuple_unref(it->current.tuple);						\
+	it->current.tuple = *ret;						\
+	tuple_ref(it->current.tuple);						\
+	return 0;								\
+}										\
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(tree_iterator_next);
+WRAP_ITERATOR_METHOD(tree_iterator_prev);
+WRAP_ITERATOR_METHOD(tree_iterator_next_equal);
+WRAP_ITERATOR_METHOD(tree_iterator_prev_equal);
+
+#undef WRAP_ITERATOR_METHOD
+
 static void
 tree_iterator_set_next_method(struct tree_iterator *it)
 {
@@ -388,6 +430,22 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret)
 	tuple_ref(*ret);
 	it->current = *res;
 	tree_iterator_set_next_method(it);
+
+	uint32_t iid = iterator->index->def->iid;
+	bool is_multikey = iterator->index->def->key_def->is_multikey;
+	struct txn *txn = in_txn();
+	struct space *space = space_by_id(iterator->space_id);
+	bool is_rw = txn != NULL;
+	uint32_t mk_index = is_multikey ? res->hint : 0;
+	*ret = txm_tuple_clarify(txn, space, *ret, iid, mk_index, is_rw);
+	if (*ret == NULL) {
+		return iterator->next(iterator, ret);
+	} else {
+		tuple_unref(it->current.tuple);
+		it->current.tuple = *ret;
+		tuple_ref(it->current.tuple);
+	}
+
 	return 0;
 }
 
@@ -539,7 +597,16 @@ memtx_tree_index_get(struct index *base, const char *key,
 	key_data.part_count = part_count;
 	key_data.hint = key_hint(key, part_count, cmp_def);
 	struct memtx_tree_data *res = memtx_tree_find(&index->tree, &key_data);
-	*result = res != NULL ? res->tuple : NULL;
+	if (res == NULL) {
+		*result = NULL;
+		return 0;
+	}
+	struct txn *txn = in_txn();
+	struct space *space = space_by_id(base->def->space_id);
+	bool is_rw = txn != NULL;
+	uint32_t mk_index = base->def->key_def->is_multikey ? res->hint : 0;
+	*result = txm_tuple_clarify(txn, space, res->tuple, base->def->iid,
+				    mk_index, is_rw);
 	return 0;
 }
 
@@ -1208,6 +1275,7 @@ struct tree_snapshot_iterator {
 	struct snapshot_iterator base;
 	struct memtx_tree_index *index;
 	struct memtx_tree_iterator tree_iterator;
+	struct txm_snapshot_cleaner cleaner;
 };
 
 static void
@@ -1220,6 +1288,7 @@ tree_snapshot_iterator_free(struct snapshot_iterator *iterator)
 				      it->index->base.engine);
 	memtx_tree_iterator_destroy(&it->index->tree, &it->tree_iterator);
 	index_unref(&it->index->base);
+	txm_snapshot_cleaner_destroy(&it->cleaner);
 	free(iterator);
 }
 
@@ -1231,14 +1300,27 @@ tree_snapshot_iterator_next(struct snapshot_iterator *iterator,
 	struct tree_snapshot_iterator *it =
 		(struct tree_snapshot_iterator *)iterator;
 	struct memtx_tree *tree = &it->index->tree;
-	struct memtx_tree_data *res = memtx_tree_iterator_get_elem(tree,
-							&it->tree_iterator);
-	if (res == NULL) {
-		*data = NULL;
-		return 0;
+
+	while (true) {
+		struct memtx_tree_data *res =
+			memtx_tree_iterator_get_elem(tree, &it->tree_iterator);
+
+		if (res == NULL) {
+			*data = NULL;
+			return 0;
+		}
+
+		memtx_tree_iterator_next(tree, &it->tree_iterator);
+
+		struct tuple *tuple = res->tuple;
+		tuple = txm_snapshot_clarify(&it->cleaner, tuple);
+
+		if (tuple != NULL) {
+			*data = tuple_data_range(tuple, size);
+			return 0;
+		}
 	}
-	memtx_tree_iterator_next(tree, &it->tree_iterator);
-	*data = tuple_data_range(res->tuple, size);
+
 	return 0;
 }
 
@@ -1251,14 +1333,21 @@ static struct snapshot_iterator *
 memtx_tree_index_create_snapshot_iterator(struct index *base)
 {
 	struct memtx_tree_index *index = (struct memtx_tree_index *)base;
-	struct tree_snapshot_iterator *it = (struct tree_snapshot_iterator *)
-		calloc(1, sizeof(*it));
+	struct tree_snapshot_iterator *it =
+		(struct tree_snapshot_iterator *) calloc(1, sizeof(*it));
 	if (it == NULL) {
 		diag_set(OutOfMemory, sizeof(struct tree_snapshot_iterator),
 			 "memtx_tree_index", "create_snapshot_iterator");
 		return NULL;
 	}
 
+	struct space *space = space_cache_find(base->def->space_id);
+	if (txm_snapshot_cleaner_create(&it->cleaner, space,
+					"memtx_tree_index") != 0) {
+		free(it);
+		return NULL;
+	}
+
 	it->base.free = tree_snapshot_iterator_free;
 	it->base.next = tree_snapshot_iterator_next;
 	it->index = index;
diff --git a/src/box/space.c b/src/box/space.c
index 1d375cc..7394316 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -210,6 +210,7 @@ space_create(struct space *space, struct engine *engine,
 			 "constraint_ids");
 		goto fail;
 	}
+	rlist_create(&space->txm_stories);
 	return 0;
 
 fail_free_indexes:
@@ -252,6 +253,7 @@ space_new_ephemeral(struct space_def *def, struct rlist *key_list)
 void
 space_delete(struct space *space)
 {
+	rlist_del(&space->txm_stories);
 	assert(space->ck_constraint_trigger == NULL);
 	for (uint32_t j = 0; j <= space->index_id_max; j++) {
 		struct index *index = space->index_map[j];
diff --git a/src/box/space.h b/src/box/space.h
index bbdd3ef..9dcc4e7 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -239,6 +239,10 @@ struct space {
 	 * Hash table with constraint identifiers hashed by name.
 	 */
 	struct mh_strnptr_t *constraint_ids;
+	/**
+	 * List of all tx stories in the space.
+	 */
+	struct rlist txm_stories;
 };
 
 /** Initialize a base space instance. */
diff --git a/src/box/txn.c b/src/box/txn.c
index 4c46b60..b4888b3 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -101,6 +101,20 @@ struct tx_conflict_tracker {
 	struct rlist in_conflicted_by_list;
 };
 
+/**
+ * Record that links a transaction and a story that the transaction has read.
+ */
+struct tx_read_tracker {
+	/** The TX that read the story. */
+	struct txn *reader;
+	/** The story that was read by reader. */
+	struct txm_story *story;
+	/** Link in story->reader_list. */
+	struct rlist in_reader_list;
+	/** Link in reader->read_set. */
+	struct rlist in_read_set;
+};
+
 double too_long_threshold;
 
 /* Txn cache. */
@@ -263,6 +277,7 @@ txn_new(void)
 	}
 	assert(region_used(&region) == sizeof(*txn));
 	txn->region = region;
+	rlist_create(&txn->read_set);
 	rlist_create(&txn->conflict_list);
 	rlist_create(&txn->conflicted_by_list);
 	rlist_create(&txn->in_read_view_txs);
@@ -275,6 +290,14 @@ txn_new(void)
 inline static void
 txn_free(struct txn *txn)
 {
+	struct tx_read_tracker *tracker, *tmp;
+	rlist_foreach_entry_safe(tracker, &txn->read_set,
+				 in_read_set, tmp) {
+		rlist_del(&tracker->in_reader_list);
+		rlist_del(&tracker->in_read_set);
+	}
+	assert(rlist_empty(&txn->read_set));
+
 	struct tx_conflict_tracker *entry, *next;
 	rlist_foreach_entry_safe(entry, &txn->conflict_list,
 				 in_conflict_list, next) {
@@ -1430,6 +1453,7 @@ txm_story_new(struct space *space, struct tuple *tuple)
 	story->del_psn = 0;
 	rlist_create(&story->reader_list);
 	rlist_add_tail(&txm.all_stories, &story->in_all_stories);
+	rlist_add(&space->txm_stories, &story->in_space_stories);
 	memset(story->link, 0, sizeof(story->link[0]) * index_count);
 	return story;
 }
@@ -1963,26 +1987,32 @@ txm_history_prepare_stmt(struct txn_stmt *stmt)
 			i++;
 			continue;
 		}
+		bool old_story_is_prepared = false;
 		struct txm_story *old_story =
 			story->link[i].older.story;
 		if (old_story->del_psn != 0) {
-			/* is psn is set, the change is prepared. */
-			i++;
-			continue;
-		}
-		if (old_story->add_psn != 0) {
-			/* is psn is set, the change is prepared. */
-			i++;
-			continue;
-		}
-
-		if (old_story->add_stmt != NULL) {
+			/* del_psn is set: the deletion is prepared. */
+			old_story_is_prepared = true;
+		} else if (old_story->add_psn != 0) {
+			/* add_psn is set: the addition is prepared. */
+			old_story_is_prepared = true;
+		} else if (old_story->add_stmt == NULL) {
 			/* ancient. */
-			i++;
-			continue;
-		}
-		if (old_story->add_stmt->txn == stmt->txn) {
+			old_story_is_prepared = true;
+		} else if (old_story->add_stmt->txn == stmt->txn) {
 			/* added by us. */
+		}
+
+		if (old_story_is_prepared) {
+			struct tx_read_tracker *tracker;
+			rlist_foreach_entry(tracker, &old_story->reader_list,
+					    in_reader_list) {
+				if (tracker->reader == stmt->txn)
+					continue;
+				if (tracker->reader->status != TXN_INPROGRESS)
+					continue;
+				tracker->reader->status = TXN_CONFLICTED;
+			}
 			i++;
 			continue;
 		}
@@ -2091,7 +2121,6 @@ txm_tuple_clarify_slow(struct txn *txn, struct space *space,
 		       struct tuple *tuple, uint32_t index,
 		       uint32_t mk_index, bool is_prepared_ok)
 {
-	(void)space;
 	assert(tuple->is_dirty);
 	struct txm_story *story = txm_story_get(tuple);
 	bool own_change = false;
@@ -2109,7 +2138,8 @@ txm_tuple_clarify_slow(struct txn *txn, struct space *space,
 			break;
 		}
 	}
-	(void)own_change; /* TODO: add conflict */
+	if (!own_change)
+		txm_track_read(txn, space, tuple);
 	(void)mk_index; /* TODO: multiindex */
 	return result;
 }
@@ -2123,6 +2153,7 @@ txm_story_delete(struct txm_story *story)
 	if (txm.traverse_all_stories == &story->in_all_stories)
 		txm.traverse_all_stories = rlist_next(txm.traverse_all_stories);
 	rlist_del(&story->in_all_stories);
+	rlist_del(&story->in_space_stories);
 
 	mh_int_t pos = mh_history_find(txm.history, story->tuple, 0);
 	assert(pos != mh_end(txm.history));
@@ -2143,3 +2174,168 @@ txm_story_delete(struct txm_story *story)
 	struct mempool *pool = &txm.txm_story_pool[story->index_count];
 	mempool_free(pool, story);
 }
+
+
+static uint32_t
+txm_snapshot_cleaner_hash(const struct tuple *a)
+{
+	/* Hash the address itself; fold the high half in when it is wide. */
+	const uintptr_t addr = (uintptr_t)a;
+	if (sizeof(addr) > sizeof(uint32_t))
+		return addr ^ (addr >> 32);
+	return addr;
+}
+
+struct txm_snapshot_cleaner_entry
+{
+	struct tuple *from;
+	struct tuple *to;
+};
+
+#define mh_name _snapshot_cleaner
+#define mh_key_t struct tuple *
+#define mh_node_t struct txm_snapshot_cleaner_entry
+#define mh_arg_t int
+#define mh_hash(a, arg) (txm_snapshot_cleaner_hash((a)->from))
+#define mh_hash_key(a, arg) (txm_snapshot_cleaner_hash(a))
+#define mh_cmp(a, b, arg) (((a)->from) != ((b)->from))
+#define mh_cmp_key(a, b, arg) ((a) != ((b)->from))
+#define MH_SOURCE
+#include "salad/mhash.h"
+
+int
+txm_snapshot_cleaner_create(struct txm_snapshot_cleaner *cleaner,
+			    struct space *space, const char *index_name)
+{
+	cleaner->ht = NULL;
+	if (space == NULL || rlist_empty(&space->txm_stories))
+		return 0;
+	struct mh_snapshot_cleaner_t *ht = mh_snapshot_cleaner_new();
+	if (ht == NULL) {
+		/* Nothing was allocated, so there is nothing to free. */
+		diag_set(OutOfMemory, sizeof(*ht),
+			 index_name, "snapshot cleaner");
+		return -1;
+	}
+
+	struct txm_story *story;
+	rlist_foreach_entry(story, &space->txm_stories, in_space_stories) {
+		struct tuple *tuple = story->tuple;
+		struct tuple *clean =
+			txm_tuple_clarify_slow(NULL, space, tuple, 0, 0, true);
+		if (clean == tuple)
+			continue;
+		/* Remember which tuple must be shown instead of this one. */
+		struct txm_snapshot_cleaner_entry entry;
+		entry.from = tuple;
+		entry.to = clean;
+		mh_int_t res = mh_snapshot_cleaner_put(ht, &entry, NULL, 0);
+		if (res == mh_end(ht)) {
+			diag_set(OutOfMemory, sizeof(entry),
+				 index_name, "snapshot rollback entry");
+			mh_snapshot_cleaner_delete(ht);
+			return -1;
+		}
+	}
+
+	cleaner->ht = ht;
+	return 0;
+}
+
+struct tuple *
+txm_snapshot_clarify_slow(struct txm_snapshot_cleaner *cleaner,
+			  struct tuple *tuple)
+{
+	assert(cleaner->ht != NULL);
+
+	/* Follow the from -> to chain until a tuple without an entry. */
+	struct mh_snapshot_cleaner_t *table = cleaner->ht;
+	mh_int_t slot = mh_snapshot_cleaner_find(table, tuple, 0);
+	while (slot != mh_end(table)) {
+		struct txm_snapshot_cleaner_entry *link =
+			mh_snapshot_cleaner_node(table, slot);
+		assert(link->from == tuple);
+		tuple = link->to;
+		slot = mh_snapshot_cleaner_find(table, tuple, 0);
+	}
+
+	return tuple;
+}
+
+/** Release the cleaner's hash table; a cleaner without one is a no-op. */
+void
+txm_snapshot_cleaner_destroy(struct txm_snapshot_cleaner *cleaner)
+{
+	if (cleaner->ht != NULL)
+		mh_snapshot_cleaner_delete(cleaner->ht);
+}
+
+int
+txm_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
+{
+	if (tuple == NULL || txn == NULL || space == NULL)
+		return 0;
+
+	struct txm_story *story;
+	struct tx_read_tracker *tracker = NULL;
+
+	if (!tuple->is_dirty) {
+		/* Not dirty: create a story to link the new tracker into. */
+		story = txm_story_new(space, tuple);
+		if (story == NULL)
+			return -1;
+		size_t sz;
+		tracker = region_alloc_object(&txn->region,
+					      struct tx_read_tracker, &sz);
+		if (tracker == NULL) {
+			diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+			txm_story_delete(story);
+			return -1;
+		}
+		tracker->reader = txn;
+		tracker->story = story;
+		rlist_add(&story->reader_list, &tracker->in_reader_list);
+		rlist_add(&txn->read_set, &tracker->in_read_set);
+		return 0;
+	}
+	story = txm_story_get(tuple);
+	/*
+	 * Search for an existing tracker of this txn and story, walking
+	 * both lists in lockstep to stop at the end of the shorter one.
+	 */
+	struct rlist *r1 = story->reader_list.next;
+	struct rlist *r2 = txn->read_set.next;
+	while (r1 != &story->reader_list && r2 != &txn->read_set) {
+		tracker = rlist_entry(r1, struct tx_read_tracker,
+				      in_reader_list);
+		assert(tracker->story == story);
+		if (tracker->reader == txn)
+			break;
+		tracker = rlist_entry(r2, struct tx_read_tracker,
+				      in_read_set);
+		assert(tracker->reader == txn);
+		if (tracker->story == story)
+			break;
+		tracker = NULL;
+		r1 = r1->next;
+		r2 = r2->next;
+	}
+	if (tracker != NULL) {
+		/* Move to the head of both lists to speed up later lookups. */
+		rlist_del(&tracker->in_reader_list);
+		rlist_del(&tracker->in_read_set);
+	} else {
+		size_t sz;
+		tracker = region_alloc_object(&txn->region,
+					      struct tx_read_tracker, &sz);
+		if (tracker == NULL) {
+			diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+			return -1;
+		}
+		tracker->reader = txn;
+		tracker->story = story;
+	}
+	rlist_add(&story->reader_list, &tracker->in_reader_list);
+	rlist_add(&txn->read_set, &tracker->in_read_set);
+	return 0;
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index e294497..5fe87c5 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -379,6 +379,8 @@ struct txn {
 	 * Link in tx_manager::read_view_txs.
 	 */
 	struct rlist in_read_view_txs;
+	/** List of tx_read_trackers with stories that the TX has read. */
+	struct rlist read_set;
 };
 
 /**
@@ -453,6 +455,10 @@ struct txm_story {
 	 */
 	struct rlist in_all_stories;
 	/**
+	 * Link in space::txm_stories.
+	 */
+	struct rlist in_space_stories;
+	/**
 	 * Number of indexes in this space - and the count of link[].
 	 */
 	uint32_t index_count;
@@ -913,6 +919,13 @@ txm_history_prepare_stmt(struct txn_stmt *stmt);
 ssize_t
 txm_history_commit_stmt(struct txn_stmt *stmt);
 
+/**
+ * Record in TX manager that a transaction @txn has read a @tuple in @space.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+txm_track_read(struct txn *txn, struct space *space, struct tuple *tuple);
+
 /** Helper of txm_tuple_clarify */
 struct tuple *
 txm_tuple_clarify_slow(struct txn *txn, struct space *space,
@@ -936,12 +949,61 @@ txm_tuple_clarify(struct txn *txn, struct space *space,
 {
 	if (!tx_manager_use_mvcc_engine)
 		return tuple;
-	if (!tuple->is_dirty)
+	if (!tuple->is_dirty) {
+		txm_track_read(txn, space, tuple);
 		return tuple;
+	}
 	return txm_tuple_clarify_slow(txn, space, tuple, index, mk_index,
 				      is_prepared_ok);
 }
 
+/**
+ * Snapshot cleaner is a short part of history that is supposed to clarify
+ * tuples in an index snapshot. It's also supposed to be used in another
+ * thread while common clarify would probably crash in that case.
+ */
+struct txm_snapshot_cleaner {
+	struct mh_snapshot_cleaner_t *ht;
+};
+
+/**
+ * Create a snapshot cleaner.
+ * @param cleaner - cleaner to create.
+ * @param space - space for which the cleaner must be created.
+ * @param index_name - name of index for diag in case of memory error.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+txm_snapshot_cleaner_create(struct txm_snapshot_cleaner *cleaner,
+			    struct space *space, const char *index_name);
+
+/** Helper of txm_snapshot_clarify. */
+struct tuple *
+txm_snapshot_clarify_slow(struct txm_snapshot_cleaner *cleaner,
+			  struct tuple *tuple);
+
+/**
+ * Like the common clarify, this function returns the proper tuple if the
+ * original tuple in the index is dirty.
+ * @param cleaner - pre-created snapshot cleaner.
+ * @param tuple - tuple to clean.
+ * @return cleaned tuple, can be NULL.
+ */
+static inline struct tuple *
+txm_snapshot_clarify(struct txm_snapshot_cleaner *cleaner,
+		     struct tuple *tuple)
+{
+	if (cleaner->ht == NULL)
+		return tuple;
+	return txm_snapshot_clarify_slow(cleaner, tuple);
+}
+
+/**
+ * Free resources in snapshot @cleaner.
+ */
+void
+txm_snapshot_cleaner_destroy(struct txm_snapshot_cleaner *cleaner);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
-- 
2.7.4



More information about the Tarantool-patches mailing list