[Tarantool-patches] [PATCH v3 12/13] txm: clarify all fetched tuples
Aleksandr Lyapunov
alyapunov at tarantool.org
Wed Jul 15 16:55:35 MSK 2020
If a tuple fetched from an index is dirty - it must be clarified.
Let's clarify all tuples fetched from indexes in that way.
Also fix a snapshot iterator - it must save a part of history
along with creating a read view in order to clean tuples during
iteration from another thread.
Part of #4897
---
src/box/memtx_bitset.c | 30 ++++---
src/box/memtx_hash.c | 79 ++++++++++++++---
src/box/memtx_rtree.c | 30 ++++++-
src/box/memtx_tree.c | 119 +++++++++++++++++++++----
src/box/space.c | 2 +
src/box/space.h | 4 +
src/box/txn.c | 230 +++++++++++++++++++++++++++++++++++++++++++++----
src/box/txn.h | 64 +++++++++++++-
8 files changed, 499 insertions(+), 59 deletions(-)
diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c
index 67eaf6f..68845c5 100644
--- a/src/box/memtx_bitset.c
+++ b/src/box/memtx_bitset.c
@@ -39,7 +39,9 @@
#include "bitset/index.h"
#include "fiber.h"
#include "index.h"
+#include "schema.h"
#include "tuple.h"
+#include "txn.h"
#include "memtx_engine.h"
struct memtx_bitset_index {
@@ -198,19 +200,27 @@ bitset_index_iterator_next(struct iterator *iterator, struct tuple **ret)
assert(iterator->free == bitset_index_iterator_free);
struct bitset_index_iterator *it = bitset_index_iterator(iterator);
- size_t value = tt_bitset_iterator_next(&it->bitset_it);
- if (value == SIZE_MAX) {
- *ret = NULL;
- return 0;
- }
-
+ do {
+ size_t value = tt_bitset_iterator_next(&it->bitset_it);
+ if (value == SIZE_MAX) {
+ *ret = NULL;
+ return 0;
+ }
#ifndef OLD_GOOD_BITSET
- struct memtx_bitset_index *index =
- (struct memtx_bitset_index *)iterator->index;
- *ret = memtx_bitset_index_value_to_tuple(index, value);
+ struct memtx_bitset_index *index =
+ (struct memtx_bitset_index *)iterator->index;
+ struct tuple *tuple =
+ memtx_bitset_index_value_to_tuple(index, value);
#else /* #ifndef OLD_GOOD_BITSET */
- *ret = value_to_tuple(value);
+ struct tuple *tuple = value_to_tuple(value);
#endif /* #ifndef OLD_GOOD_BITSET */
+ uint32_t iid = iterator->index->def->iid;
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(iterator->space_id);
+ bool is_rw = txn != NULL;
+ *ret = txm_tuple_clarify(txn, space, tuple, iid, 0, is_rw);
+ } while (*ret == NULL);
+
return 0;
}
diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
index cdd531c..6e3bc18 100644
--- a/src/box/memtx_hash.c
+++ b/src/box/memtx_hash.c
@@ -33,9 +33,10 @@
#include "fiber.h"
#include "index.h"
#include "tuple.h"
+#include "txn.h"
#include "memtx_engine.h"
#include "space.h"
-#include "schema.h" /* space_cache_find() */
+#include "schema.h" /* space_by_id(), space_cache_find() */
#include "errinj.h"
#include <small/mempool.h>
@@ -101,7 +102,7 @@ hash_iterator_free(struct iterator *iterator)
}
static int
-hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
+hash_iterator_ge_base(struct iterator *ptr, struct tuple **ret)
{
assert(ptr->free == hash_iterator_free);
struct hash_iterator *it = (struct hash_iterator *) ptr;
@@ -113,10 +114,10 @@ hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
}
static int
-hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
+hash_iterator_gt_base(struct iterator *ptr, struct tuple **ret)
{
assert(ptr->free == hash_iterator_free);
- ptr->next = hash_iterator_ge;
+ ptr->next = hash_iterator_ge_base;
struct hash_iterator *it = (struct hash_iterator *) ptr;
struct memtx_hash_index *index = (struct memtx_hash_index *)ptr->index;
struct tuple **res = light_index_iterator_get_and_next(&index->hash_table,
@@ -128,6 +129,32 @@ hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
return 0;
}
+/** Define @a name: call *_base until txm_tuple_clarify() gives a tuple. */
+#define WRAP_ITERATOR_METHOD(name) \
+static int \
+name(struct iterator *iterator, struct tuple **ret) \
+{ \
+	struct txn *tx = in_txn(); \
+	struct space *sp = space_by_id(iterator->space_id); \
+	bool rw = tx != NULL; \
+	uint32_t index_id = iterator->index->def->iid; \
+	int (*step)(struct iterator *, struct tuple **) = name##_base; \
+	do { \
+		int rc = step(iterator, ret); \
+		if (rc != 0 || *ret == NULL) \
+			return rc; \
+		step = hash_iterator_ge_base; /* any further step is GE */ \
+		*ret = txm_tuple_clarify(tx, sp, *ret, index_id, 0, rw); \
+	} while (*ret == NULL); \
+	return 0; \
+} \
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(hash_iterator_ge);
+WRAP_ITERATOR_METHOD(hash_iterator_gt);
+
+#undef WRAP_ITERATOR_METHOD
+
static int
hash_iterator_eq_next(MAYBE_UNUSED struct iterator *it, struct tuple **ret)
{
@@ -139,7 +166,14 @@ static int
hash_iterator_eq(struct iterator *it, struct tuple **ret)
{
it->next = hash_iterator_eq_next;
- return hash_iterator_ge(it, ret);
+ hash_iterator_ge_base(it, ret); /* always returns zero. */
+ if (*ret == NULL)
+ return 0;
+ struct txn *txn = in_txn();
+ struct space *sp = space_by_id(it->space_id);
+ bool is_rw = txn != NULL;
+ *ret = txm_tuple_clarify(txn, sp, *ret, it->index->def->iid, 0, is_rw);
+ return 0;
}
/* }}} */
@@ -279,11 +313,17 @@ memtx_hash_index_get(struct index *base, const char *key,
part_count == base->def->key_def->part_count);
(void) part_count;
+ struct space *space = space_by_id(base->def->space_id);
*result = NULL;
uint32_t h = key_hash(key, base->def->key_def);
uint32_t k = light_index_find_key(&index->hash_table, h, key);
- if (k != light_index_end)
- *result = light_index_get(&index->hash_table, k);
+ if (k != light_index_end) {
+ struct tuple *tuple = light_index_get(&index->hash_table, k);
+ uint32_t iid = base->def->iid;
+ struct txn *txn = in_txn();
+ bool is_rw = txn != NULL;
+ *result = txm_tuple_clarify(txn, space, tuple, iid, 0, is_rw);
+ }
return 0;
}
@@ -401,6 +441,7 @@ struct hash_snapshot_iterator {
struct snapshot_iterator base;
struct memtx_hash_index *index;
struct light_index_iterator iterator;
+ struct txm_snapshot_cleaner cleaner;
};
/**
@@ -418,6 +459,7 @@ hash_snapshot_iterator_free(struct snapshot_iterator *iterator)
it->index->base.engine);
light_index_iterator_destroy(&it->index->hash_table, &it->iterator);
index_unref(&it->index->base);
+ txm_snapshot_cleaner_destroy(&it->cleaner);
free(iterator);
}
@@ -434,13 +476,24 @@ hash_snapshot_iterator_next(struct snapshot_iterator *iterator,
struct hash_snapshot_iterator *it =
(struct hash_snapshot_iterator *) iterator;
struct light_index_core *hash_table = &it->index->hash_table;
- struct tuple **res = light_index_iterator_get_and_next(hash_table,
- &it->iterator);
- if (res == NULL) {
- *data = NULL;
- return 0;
+
+ while (true) {
+ struct tuple **res =
+ light_index_iterator_get_and_next(hash_table,
+ &it->iterator);
+ if (res == NULL) {
+ *data = NULL;
+ return 0;
+ }
+
+ struct tuple *tuple = *res;
+ tuple = txm_snapshot_clarify(&it->cleaner, tuple);
+
+ if (tuple != NULL) {
+ *data = tuple_data_range(*res, size);
+ return 0;
+ }
}
- *data = tuple_data_range(*res, size);
return 0;
}
diff --git a/src/box/memtx_rtree.c b/src/box/memtx_rtree.c
index 612fcb2..c7f1655 100644
--- a/src/box/memtx_rtree.c
+++ b/src/box/memtx_rtree.c
@@ -40,7 +40,9 @@
#include "trivia/util.h"
#include "tuple.h"
+#include "txn.h"
#include "space.h"
+#include "schema.h"
#include "memtx_engine.h"
struct memtx_rtree_index {
@@ -148,7 +150,16 @@ static int
index_rtree_iterator_next(struct iterator *i, struct tuple **ret)
{
struct index_rtree_iterator *itr = (struct index_rtree_iterator *)i;
- *ret = (struct tuple *)rtree_iterator_next(&itr->impl);
+ do {
+ *ret = (struct tuple *) rtree_iterator_next(&itr->impl);
+ if (*ret == NULL)
+ break;
+ uint32_t iid = i->index->def->iid;
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(i->space_id);
+ bool is_rw = txn != NULL;
+ *ret = txm_tuple_clarify(txn, space, *ret, iid, 0, is_rw);
+ } while (*ret == NULL);
return 0;
}
@@ -213,8 +224,21 @@ memtx_rtree_index_get(struct index *base, const char *key,
unreachable();
*result = NULL;
- if (rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator))
- *result = (struct tuple *)rtree_iterator_next(&iterator);
+ if (!rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator)) {
+ rtree_iterator_destroy(&iterator);
+ return 0;
+ }
+ do {
+ struct tuple *tuple = (struct tuple *)
+ rtree_iterator_next(&iterator);
+ if (tuple == NULL)
+ break;
+ uint32_t iid = base->def->iid;
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(base->def->space_id);
+ bool is_rw = txn != NULL;
+ *result = txm_tuple_clarify(txn, space, tuple, iid, 0, is_rw);
+ } while (*result == NULL);
rtree_iterator_destroy(&iterator);
return 0;
}
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 76ff3fc..fe93467 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -31,12 +31,13 @@
#include "memtx_tree.h"
#include "memtx_engine.h"
#include "space.h"
-#include "schema.h" /* space_cache_find() */
+#include "schema.h" /* space_by_id(), space_cache_find() */
#include "errinj.h"
#include "memory.h"
#include "fiber.h"
#include "key_list.h"
#include "tuple.h"
+#include "txn.h"
#include <third_party/qsort_arg.h>
#include <small/mempool.h>
@@ -175,7 +176,7 @@ tree_iterator_dummie(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_next(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -205,7 +206,7 @@ tree_iterator_next(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -234,7 +235,7 @@ tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_equal_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -270,7 +271,7 @@ tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_equal_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -304,6 +305,47 @@ tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
return 0;
}
+/** Define @a name: call its _base until txm_tuple_clarify() gives a tuple. */
+#define WRAP_ITERATOR_METHOD(name) \
+static int \
+name(struct iterator *iterator, struct tuple **ret) \
+{ \
+	struct tree_iterator *it = tree_iterator(iterator); \
+	struct memtx_tree_iterator *pos = &it->tree_iterator; \
+	struct memtx_tree *tree = \
+		&((struct memtx_tree_index *)iterator->index)->tree; \
+	uint32_t index_id = iterator->index->def->iid; \
+	bool multikey = iterator->index->def->key_def->is_multikey; \
+	struct txn *tx = in_txn(); \
+	struct space *sp = space_by_id(iterator->space_id); \
+	do { \
+		int rc = name##_base(iterator, ret); \
+		if (rc != 0 || *ret == NULL) \
+			return rc; \
+		uint32_t mk_index = 0; \
+		if (multikey) { \
+			struct memtx_tree_data *check = \
+				memtx_tree_iterator_get_elem(tree, pos); \
+			assert(check != NULL); \
+			mk_index = check->hint; \
+		} \
+		*ret = txm_tuple_clarify(tx, sp, *ret, index_id, \
+					 mk_index, tx != NULL); \
+	} while (*ret == NULL); \
+	tuple_unref(it->current.tuple); /* re-point the held reference */ \
+	it->current.tuple = *ret; \
+	tuple_ref(*ret); \
+	return 0; \
+} \
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(tree_iterator_next);
+WRAP_ITERATOR_METHOD(tree_iterator_prev);
+WRAP_ITERATOR_METHOD(tree_iterator_next_equal);
+WRAP_ITERATOR_METHOD(tree_iterator_prev_equal);
+
+#undef WRAP_ITERATOR_METHOD
+
static void
tree_iterator_set_next_method(struct tree_iterator *it)
{
@@ -388,6 +430,22 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret)
tuple_ref(*ret);
it->current = *res;
tree_iterator_set_next_method(it);
+
+ uint32_t iid = iterator->index->def->iid;
+ bool is_multikey = iterator->index->def->key_def->is_multikey;
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(iterator->space_id);
+ bool is_rw = txn != NULL;
+ uint32_t mk_index = is_multikey ? res->hint : 0;
+ *ret = txm_tuple_clarify(txn, space, *ret, iid, mk_index, is_rw);
+ if (*ret == NULL) {
+ return iterator->next(iterator, ret);
+ } else {
+ tuple_unref(it->current.tuple);
+ it->current.tuple = *ret;
+ tuple_ref(it->current.tuple);
+ }
+
return 0;
}
@@ -539,7 +597,16 @@ memtx_tree_index_get(struct index *base, const char *key,
key_data.part_count = part_count;
key_data.hint = key_hint(key, part_count, cmp_def);
struct memtx_tree_data *res = memtx_tree_find(&index->tree, &key_data);
- *result = res != NULL ? res->tuple : NULL;
+ if (res == NULL) {
+ *result = NULL;
+ return 0;
+ }
+ struct txn *txn = in_txn();
+ struct space *space = space_by_id(base->def->space_id);
+ bool is_rw = txn != NULL;
+ uint32_t mk_index = base->def->key_def->is_multikey ? res->hint : 0;
+ *result = txm_tuple_clarify(txn, space, res->tuple, base->def->iid,
+ mk_index, is_rw);
return 0;
}
@@ -1208,6 +1275,7 @@ struct tree_snapshot_iterator {
struct snapshot_iterator base;
struct memtx_tree_index *index;
struct memtx_tree_iterator tree_iterator;
+ struct txm_snapshot_cleaner cleaner;
};
static void
@@ -1220,6 +1288,7 @@ tree_snapshot_iterator_free(struct snapshot_iterator *iterator)
it->index->base.engine);
memtx_tree_iterator_destroy(&it->index->tree, &it->tree_iterator);
index_unref(&it->index->base);
+ txm_snapshot_cleaner_destroy(&it->cleaner);
free(iterator);
}
@@ -1231,14 +1300,27 @@ tree_snapshot_iterator_next(struct snapshot_iterator *iterator,
struct tree_snapshot_iterator *it =
(struct tree_snapshot_iterator *)iterator;
struct memtx_tree *tree = &it->index->tree;
- struct memtx_tree_data *res = memtx_tree_iterator_get_elem(tree,
- &it->tree_iterator);
- if (res == NULL) {
- *data = NULL;
- return 0;
+
+ while (true) {
+ struct memtx_tree_data *res =
+ memtx_tree_iterator_get_elem(tree, &it->tree_iterator);
+
+ if (res == NULL) {
+ *data = NULL;
+ return 0;
+ }
+
+ memtx_tree_iterator_next(tree, &it->tree_iterator);
+
+ struct tuple *tuple = res->tuple;
+ tuple = txm_snapshot_clarify(&it->cleaner, tuple);
+
+ if (tuple != NULL) {
+ *data = tuple_data_range(tuple, size);
+ return 0;
+ }
}
- memtx_tree_iterator_next(tree, &it->tree_iterator);
- *data = tuple_data_range(res->tuple, size);
+
return 0;
}
@@ -1251,14 +1333,21 @@ static struct snapshot_iterator *
memtx_tree_index_create_snapshot_iterator(struct index *base)
{
struct memtx_tree_index *index = (struct memtx_tree_index *)base;
- struct tree_snapshot_iterator *it = (struct tree_snapshot_iterator *)
- calloc(1, sizeof(*it));
+ struct tree_snapshot_iterator *it =
+ (struct tree_snapshot_iterator *) calloc(1, sizeof(*it));
if (it == NULL) {
diag_set(OutOfMemory, sizeof(struct tree_snapshot_iterator),
"memtx_tree_index", "create_snapshot_iterator");
return NULL;
}
+ struct space *space = space_cache_find(base->def->space_id);
+ if (txm_snapshot_cleaner_create(&it->cleaner, space,
+ "memtx_tree_index") != 0) {
+ free(it);
+ return NULL;
+ }
+
it->base.free = tree_snapshot_iterator_free;
it->base.next = tree_snapshot_iterator_next;
it->index = index;
diff --git a/src/box/space.c b/src/box/space.c
index 1d375cc..7394316 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -210,6 +210,7 @@ space_create(struct space *space, struct engine *engine,
"constraint_ids");
goto fail;
}
+ rlist_create(&space->txm_stories);
return 0;
fail_free_indexes:
@@ -252,6 +253,7 @@ space_new_ephemeral(struct space_def *def, struct rlist *key_list)
void
space_delete(struct space *space)
{
+ rlist_del(&space->txm_stories);
assert(space->ck_constraint_trigger == NULL);
for (uint32_t j = 0; j <= space->index_id_max; j++) {
struct index *index = space->index_map[j];
diff --git a/src/box/space.h b/src/box/space.h
index bbdd3ef..9dcc4e7 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -239,6 +239,10 @@ struct space {
* Hash table with constraint identifiers hashed by name.
*/
struct mh_strnptr_t *constraint_ids;
+ /**
+ * List of all tx stories in the space.
+ */
+ struct rlist txm_stories;
};
/** Initialize a base space instance. */
diff --git a/src/box/txn.c b/src/box/txn.c
index 4c46b60..b4888b3 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -101,6 +101,20 @@ struct tx_conflict_tracker {
struct rlist in_conflicted_by_list;
};
+/**
+ * A record linking a transaction to a story that the transaction has read.
+ */
+struct tx_read_tracker {
+ /** The transaction that has read the story. */
+ struct txn *reader;
+ /** The story that was read by the reader. */
+ struct txm_story *story;
+ /** Link in story->reader_list. */
+ struct rlist in_reader_list;
+ /** Link in reader->read_set. */
+ struct rlist in_read_set;
+};
+
double too_long_threshold;
/* Txn cache. */
@@ -263,6 +277,7 @@ txn_new(void)
}
assert(region_used(&region) == sizeof(*txn));
txn->region = region;
+ rlist_create(&txn->read_set);
rlist_create(&txn->conflict_list);
rlist_create(&txn->conflicted_by_list);
rlist_create(&txn->in_read_view_txs);
@@ -275,6 +290,14 @@ txn_new(void)
inline static void
txn_free(struct txn *txn)
{
+ struct tx_read_tracker *tracker, *tmp;
+ rlist_foreach_entry_safe(tracker, &txn->read_set,
+ in_read_set, tmp) {
+ rlist_del(&tracker->in_reader_list);
+ rlist_del(&tracker->in_read_set);
+ }
+ assert(rlist_empty(&txn->read_set));
+
struct tx_conflict_tracker *entry, *next;
rlist_foreach_entry_safe(entry, &txn->conflict_list,
in_conflict_list, next) {
@@ -1430,6 +1453,7 @@ txm_story_new(struct space *space, struct tuple *tuple)
story->del_psn = 0;
rlist_create(&story->reader_list);
rlist_add_tail(&txm.all_stories, &story->in_all_stories);
+ rlist_add(&space->txm_stories, &story->in_space_stories);
memset(story->link, 0, sizeof(story->link[0]) * index_count);
return story;
}
@@ -1963,26 +1987,32 @@ txm_history_prepare_stmt(struct txn_stmt *stmt)
i++;
continue;
}
+ bool old_story_is_prepared = false;
struct txm_story *old_story =
story->link[i].older.story;
if (old_story->del_psn != 0) {
- /* is psn is set, the change is prepared. */
- i++;
- continue;
- }
- if (old_story->add_psn != 0) {
- /* is psn is set, the change is prepared. */
- i++;
- continue;
- }
-
- if (old_story->add_stmt != NULL) {
+ /* if psn is set, the change is prepared. */
+ old_story_is_prepared = true;
+ } else if (old_story->add_psn != 0) {
+ /* if psn is set, the change is prepared. */
+ old_story_is_prepared = true;
+ } else if (old_story->add_stmt != NULL) {
/* ancient. */
- i++;
- continue;
- }
- if (old_story->add_stmt->txn == stmt->txn) {
+ old_story_is_prepared = true;
+ } else if (old_story->add_stmt->txn == stmt->txn) {
/* added by us. */
+ }
+
+ if (old_story_is_prepared) {
+ struct tx_read_tracker *tracker;
+ rlist_foreach_entry(tracker, &old_story->reader_list,
+ in_reader_list) {
+ if (tracker->reader == stmt->txn)
+ continue;
+ if (tracker->reader->status != TXN_INPROGRESS)
+ continue;
+ tracker->reader->status = TXN_CONFLICTED;
+ }
i++;
continue;
}
@@ -2091,7 +2121,6 @@ txm_tuple_clarify_slow(struct txn *txn, struct space *space,
struct tuple *tuple, uint32_t index,
uint32_t mk_index, bool is_prepared_ok)
{
- (void)space;
assert(tuple->is_dirty);
struct txm_story *story = txm_story_get(tuple);
bool own_change = false;
@@ -2109,7 +2138,8 @@ txm_tuple_clarify_slow(struct txn *txn, struct space *space,
break;
}
}
- (void)own_change; /* TODO: add conflict */
+ if (!own_change)
+ txm_track_read(txn, space, tuple);
(void)mk_index; /* TODO: multiindex */
return result;
}
@@ -2123,6 +2153,7 @@ txm_story_delete(struct txm_story *story)
if (txm.traverse_all_stories == &story->in_all_stories)
txm.traverse_all_stories = rlist_next(txm.traverse_all_stories);
rlist_del(&story->in_all_stories);
+ rlist_del(&story->in_space_stories);
mh_int_t pos = mh_history_find(txm.history, story->tuple, 0);
assert(pos != mh_end(txm.history));
@@ -2143,3 +2174,168 @@ txm_story_delete(struct txm_story *story)
struct mempool *pool = &txm.txm_story_pool[story->index_count];
mempool_free(pool, story);
}
+
+
+/**
+ * Hash a tuple by its address: XOR of the low and high 32-bit halves.
+ */
+static uint32_t
+txm_snapshot_cleaner_hash(const struct tuple *a)
+{
+	uint64_t u = (uintptr_t)a; /* widen: keeps `>> 32` valid on 32-bit */
+	return (uint32_t)(u ^ (u >> 32));
+}
+
+/** Hash table node: @a from is the key, @a to is what replaces it. */
+struct txm_snapshot_cleaner_entry {
+	struct tuple *from;
+	struct tuple *to;
+};
+
+#define mh_name _snapshot_cleaner
+#define mh_key_t struct tuple *
+#define mh_node_t struct txm_snapshot_cleaner_entry
+#define mh_arg_t int
+#define mh_hash(a, arg) (txm_snapshot_cleaner_hash((a)->from))
+#define mh_hash_key(a, arg) (txm_snapshot_cleaner_hash(a))
+#define mh_cmp(a, b, arg) (((a)->from) != ((b)->from))
+#define mh_cmp_key(a, b, arg) ((a) != ((b)->from))
+#define MH_SOURCE
+#include "salad/mhash.h"
+
+int
+txm_snapshot_cleaner_create(struct txm_snapshot_cleaner *cleaner,
+			    struct space *space, const char *index_name)
+{
+	/* A NULL table tells the other cleaner functions to do nothing. */
+	cleaner->ht = NULL;
+	if (space == NULL || rlist_empty(&space->txm_stories))
+		return 0;
+	struct mh_snapshot_cleaner_t *ht = mh_snapshot_cleaner_new();
+	if (ht == NULL) {
+		diag_set(OutOfMemory, sizeof(*ht),
+			 index_name, "snapshot cleaner");
+		return -1;
+	}
+
+	struct txm_story *story;
+	rlist_foreach_entry(story, &space->txm_stories, in_space_stories) {
+		struct tuple *tuple = story->tuple;
+		struct tuple *clean =
+			txm_tuple_clarify_slow(NULL, space, tuple, 0, 0, true);
+		if (clean == tuple)
+			continue;
+		/* Record a replacement only when it differs from the tuple. */
+		struct txm_snapshot_cleaner_entry entry;
+		entry.from = tuple;
+		entry.to = clean;
+		mh_int_t res = mh_snapshot_cleaner_put(ht, &entry, NULL, 0);
+		if (res == mh_end(ht)) {
+			diag_set(OutOfMemory, sizeof(entry),
+				 index_name, "snapshot rollback entry");
+			mh_snapshot_cleaner_delete(ht);
+			return -1;
+		}
+	}
+
+	cleaner->ht = ht;
+	return 0;
+}
+
+struct tuple *
+txm_snapshot_clarify_slow(struct txm_snapshot_cleaner *cleaner,
+			  struct tuple *tuple)
+{
+	assert(cleaner->ht != NULL);
+	struct mh_snapshot_cleaner_t *table = cleaner->ht;
+
+	/* Follow the from -> to chain until no replacement is recorded. */
+	for (mh_int_t pos = mh_snapshot_cleaner_find(table, tuple, 0);
+	     pos != mh_end(table);
+	     pos = mh_snapshot_cleaner_find(table, tuple, 0)) {
+		struct txm_snapshot_cleaner_entry *entry =
+			mh_snapshot_cleaner_node(table, pos);
+		assert(entry->from == tuple);
+		tuple = entry->to;
+	}
+
+	return tuple;
+}
+
+void
+txm_snapshot_cleaner_destroy(struct txm_snapshot_cleaner *cleaner)
+{
+	struct mh_snapshot_cleaner_t *table = cleaner->ht;
+	if (table != NULL) /* NULL: no table has been attached */
+		mh_snapshot_cleaner_delete(table);
+}
+
+/** Link @a txn to the story of @a tuple through a tx_read_tracker. */
+int
+txm_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
+{
+	/* Without a tuple, a reader or a space there is nothing to track. */
+	if (tuple == NULL || txn == NULL || space == NULL)
+		return 0;
+
+	struct txm_story *story;
+	struct tx_read_tracker *tracker = NULL;
+
+	if (!tuple->is_dirty) {
+		/* A clean tuple gets a fresh story, whose reader list is empty. */
+		story = txm_story_new(space, tuple);
+		if (story == NULL)
+			return -1;
+		size_t sz;
+		tracker = region_alloc_object(&txn->region,
+					      struct tx_read_tracker, &sz);
+		if (tracker == NULL) {
+			diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+			txm_story_delete(story);
+			return -1;
+		}
+		tracker->reader = txn;
+		tracker->story = story;
+		rlist_add(&story->reader_list, &tracker->in_reader_list);
+		rlist_add(&txn->read_set, &tracker->in_read_set);
+		return 0;
+	}
+	story = txm_story_get(tuple);
+	/* Search both lists in lockstep for a tracker of (txn, story). */
+
+	struct rlist *r1 = story->reader_list.next;
+	struct rlist *r2 = txn->read_set.next;
+	while (r1 != &story->reader_list && r2 != &txn->read_set) {
+		tracker = rlist_entry(r1, struct tx_read_tracker,
+				      in_reader_list);
+		assert(tracker->story == story);
+		if (tracker->reader == txn)
+			break;
+		tracker = rlist_entry(r2, struct tx_read_tracker,
+				      in_read_set);
+		assert(tracker->reader == txn);
+		if (tracker->story == story)
+			break;
+		tracker = NULL;
+		r1 = r1->next;
+		r2 = r2->next;
+	}
+	if (tracker != NULL) {
+		/* Move to the beginning of a list for faster further lookups.*/
+		rlist_del(&tracker->in_reader_list);
+		rlist_del(&tracker->in_read_set);
+	} else {
+		size_t sz;
+		tracker = region_alloc_object(&txn->region,
+					      struct tx_read_tracker, &sz);
+		if (tracker == NULL) {
+			diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+			return -1;
+		}
+		tracker->reader = txn;
+		tracker->story = story;
+	}
+	rlist_add(&story->reader_list, &tracker->in_reader_list);
+	rlist_add(&txn->read_set, &tracker->in_read_set);
+	return 0;
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index e294497..5fe87c5 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -379,6 +379,8 @@ struct txn {
* Link in tx_manager::read_view_txs.
*/
struct rlist in_read_view_txs;
+ /** List of tx_read_trackers with stories that the TX has read. */
+ struct rlist read_set;
};
/**
@@ -453,6 +455,10 @@ struct txm_story {
*/
struct rlist in_all_stories;
/**
+ * Link in space::txm_stories.
+ */
+ struct rlist in_space_stories;
+ /**
* Number of indexes in this space - and the count of link[].
*/
uint32_t index_count;
@@ -913,6 +919,13 @@ txm_history_prepare_stmt(struct txn_stmt *stmt);
ssize_t
txm_history_commit_stmt(struct txn_stmt *stmt);
+/**
+ * Record in TX manager that a transaction @txn has read a @tuple in @space.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+txm_track_read(struct txn *txn, struct space *space, struct tuple *tuple);
+
/** Helper of txm_tuple_clarify */
struct tuple *
txm_tuple_clarify_slow(struct txn *txn, struct space *space,
@@ -936,12 +949,61 @@ txm_tuple_clarify(struct txn *txn, struct space *space,
{
if (!tx_manager_use_mvcc_engine)
return tuple;
- if (!tuple->is_dirty)
+ if (!tuple->is_dirty) {
+ txm_track_read(txn, space, tuple);
return tuple;
+ }
return txm_tuple_clarify_slow(txn, space, tuple, index, mk_index,
is_prepared_ok);
}
+/**
+ * Snapshot cleaner is a short part of history that is supposed to clarify
+ * tuples in an index snapshot. It's also supposed to be used in another
+ * thread, where the common clarify would probably crash.
+ */
+struct txm_snapshot_cleaner {
+ struct mh_snapshot_cleaner_t *ht; /* NULL when there is nothing to clean */
+};
+
+/**
+ * Create a snapshot cleaner.
+ * @param cleaner - cleaner to create.
+ * @param space - space for which the cleaner must be created.
+ * @param index_name - name of index for diag in case of memory error.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+txm_snapshot_cleaner_create(struct txm_snapshot_cleaner *cleaner,
+ struct space *space, const char *index_name);
+
+/** Helper of txm_snapshot_clarify. */
+struct tuple *
+txm_snapshot_clarify_slow(struct txm_snapshot_cleaner *cleaner,
+ struct tuple *tuple);
+
+/**
+ * Like the common clarify, this function returns the proper tuple if the
+ * original tuple in the index is dirty.
+ * @param cleaner - pre-created snapshot cleaner.
+ * @param tuple - tuple to clean.
+ * @return cleaned tuple, can be NULL.
+ */
+static inline struct tuple *
+txm_snapshot_clarify(struct txm_snapshot_cleaner *cleaner,
+		     struct tuple *tuple)
+{
+	/* Without a hash table the tuple is returned untouched. */
+	bool has_ht = cleaner->ht != NULL;
+	return has_ht ? txm_snapshot_clarify_slow(cleaner, tuple) : tuple;
+}
+
+/**
+ * Free resources of the snapshot @cleaner.
+ */
+void
+txm_snapshot_cleaner_destroy(struct txm_snapshot_cleaner *cleaner);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
--
2.7.4
More information about the Tarantool-patches
mailing list