From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 199D144532C for ; Wed, 15 Jul 2020 16:55:46 +0300 (MSK) Received: by smtpng3.m.smailru.net with esmtpa (envelope-from ) id 1jvhsv-0002ht-Ib for tarantool-patches@dev.tarantool.org; Wed, 15 Jul 2020 16:55:45 +0300 From: Aleksandr Lyapunov Date: Wed, 15 Jul 2020 16:55:35 +0300 Message-Id: <1594821336-14468-13-git-send-email-alyapunov@tarantool.org> In-Reply-To: <1594821336-14468-1-git-send-email-alyapunov@tarantool.org> References: <1594821336-14468-1-git-send-email-alyapunov@tarantool.org> Subject: [Tarantool-patches] [PATCH v3 12/13] txm: clarify all fetched tuples List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org If a tuple fetched from an index is dirty, it must be clarified. Let's fix all fetches from indexes in that way. Also fix the snapshot iterator: it must save a part of the history when creating a read view, so that tuples can be cleaned during iteration from another thread. 
Part of #4897 --- src/box/memtx_bitset.c | 30 ++++--- src/box/memtx_hash.c | 79 ++++++++++++++--- src/box/memtx_rtree.c | 30 ++++++- src/box/memtx_tree.c | 119 +++++++++++++++++++++---- src/box/space.c | 2 + src/box/space.h | 4 + src/box/txn.c | 230 +++++++++++++++++++++++++++++++++++++++++++++---- src/box/txn.h | 64 +++++++++++++- 8 files changed, 499 insertions(+), 59 deletions(-) diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c index 67eaf6f..68845c5 100644 --- a/src/box/memtx_bitset.c +++ b/src/box/memtx_bitset.c @@ -39,7 +39,9 @@ #include "bitset/index.h" #include "fiber.h" #include "index.h" +#include "schema.h" #include "tuple.h" +#include "txn.h" #include "memtx_engine.h" struct memtx_bitset_index { @@ -198,19 +200,27 @@ bitset_index_iterator_next(struct iterator *iterator, struct tuple **ret) assert(iterator->free == bitset_index_iterator_free); struct bitset_index_iterator *it = bitset_index_iterator(iterator); - size_t value = tt_bitset_iterator_next(&it->bitset_it); - if (value == SIZE_MAX) { - *ret = NULL; - return 0; - } - + do { + size_t value = tt_bitset_iterator_next(&it->bitset_it); + if (value == SIZE_MAX) { + *ret = NULL; + return 0; + } #ifndef OLD_GOOD_BITSET - struct memtx_bitset_index *index = - (struct memtx_bitset_index *)iterator->index; - *ret = memtx_bitset_index_value_to_tuple(index, value); + struct memtx_bitset_index *index = + (struct memtx_bitset_index *)iterator->index; + struct tuple *tuple = + memtx_bitset_index_value_to_tuple(index, value); #else /* #ifndef OLD_GOOD_BITSET */ - *ret = value_to_tuple(value); + struct tuple *tuple = value_to_tuple(value); #endif /* #ifndef OLD_GOOD_BITSET */ + uint32_t iid = iterator->index->def->iid; + struct txn *txn = in_txn(); + struct space *space = space_by_id(iterator->space_id); + bool is_rw = txn != NULL; + *ret = txm_tuple_clarify(txn, space, tuple, iid, 0, is_rw); + } while (*ret == NULL); + return 0; } diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c index 
cdd531c..6e3bc18 100644 --- a/src/box/memtx_hash.c +++ b/src/box/memtx_hash.c @@ -33,9 +33,10 @@ #include "fiber.h" #include "index.h" #include "tuple.h" +#include "txn.h" #include "memtx_engine.h" #include "space.h" -#include "schema.h" /* space_cache_find() */ +#include "schema.h" /* space_by_id(), space_cache_find() */ #include "errinj.h" #include @@ -101,7 +102,7 @@ hash_iterator_free(struct iterator *iterator) } static int -hash_iterator_ge(struct iterator *ptr, struct tuple **ret) +hash_iterator_ge_base(struct iterator *ptr, struct tuple **ret) { assert(ptr->free == hash_iterator_free); struct hash_iterator *it = (struct hash_iterator *) ptr; @@ -113,10 +114,10 @@ hash_iterator_ge(struct iterator *ptr, struct tuple **ret) } static int -hash_iterator_gt(struct iterator *ptr, struct tuple **ret) +hash_iterator_gt_base(struct iterator *ptr, struct tuple **ret) { assert(ptr->free == hash_iterator_free); - ptr->next = hash_iterator_ge; + ptr->next = hash_iterator_ge_base; struct hash_iterator *it = (struct hash_iterator *) ptr; struct memtx_hash_index *index = (struct memtx_hash_index *)ptr->index; struct tuple **res = light_index_iterator_get_and_next(&index->hash_table, @@ -128,6 +129,32 @@ hash_iterator_gt(struct iterator *ptr, struct tuple **ret) return 0; } +#define WRAP_ITERATOR_METHOD(name) \ +static int \ +name(struct iterator *iterator, struct tuple **ret) \ +{ \ + struct txn *txn = in_txn(); \ + struct space *space = space_by_id(iterator->space_id); \ + bool is_rw = txn != NULL; \ + uint32_t iid = iterator->index->def->iid; \ + bool is_first = true; \ + do { \ + int rc = is_first ? 
name##_base(iterator, ret) \ + : hash_iterator_ge_base(iterator, ret); \ + if (rc != 0 || *ret == NULL) \ + return rc; \ + is_first = false; \ + *ret = txm_tuple_clarify(txn, space, *ret, iid, 0, is_rw); \ + } while (*ret == NULL); \ + return 0; \ +} \ +struct forgot_to_add_semicolon + +WRAP_ITERATOR_METHOD(hash_iterator_ge); +WRAP_ITERATOR_METHOD(hash_iterator_gt); + +#undef WRAP_ITERATOR_METHOD + static int hash_iterator_eq_next(MAYBE_UNUSED struct iterator *it, struct tuple **ret) { @@ -139,7 +166,14 @@ static int hash_iterator_eq(struct iterator *it, struct tuple **ret) { it->next = hash_iterator_eq_next; - return hash_iterator_ge(it, ret); + hash_iterator_ge_base(it, ret); /* always returns zero. */ + if (*ret == NULL) + return 0; + struct txn *txn = in_txn(); + struct space *sp = space_by_id(it->space_id); + bool is_rw = txn != NULL; + *ret = txm_tuple_clarify(txn, sp, *ret, it->index->def->iid, 0, is_rw); + return 0; } /* }}} */ @@ -279,11 +313,17 @@ memtx_hash_index_get(struct index *base, const char *key, part_count == base->def->key_def->part_count); (void) part_count; + struct space *space = space_by_id(base->def->space_id); *result = NULL; uint32_t h = key_hash(key, base->def->key_def); uint32_t k = light_index_find_key(&index->hash_table, h, key); - if (k != light_index_end) - *result = light_index_get(&index->hash_table, k); + if (k != light_index_end) { + struct tuple *tuple = light_index_get(&index->hash_table, k); + uint32_t iid = base->def->iid; + struct txn *txn = in_txn(); + bool is_rw = txn != NULL; + *result = txm_tuple_clarify(txn, space, tuple, iid, 0, is_rw); + } return 0; } @@ -401,6 +441,7 @@ struct hash_snapshot_iterator { struct snapshot_iterator base; struct memtx_hash_index *index; struct light_index_iterator iterator; + struct txm_snapshot_cleaner cleaner; }; /** @@ -418,6 +459,7 @@ hash_snapshot_iterator_free(struct snapshot_iterator *iterator) it->index->base.engine); light_index_iterator_destroy(&it->index->hash_table, 
&it->iterator); index_unref(&it->index->base); + txm_snapshot_cleaner_destroy(&it->cleaner); free(iterator); } @@ -434,13 +476,24 @@ hash_snapshot_iterator_next(struct snapshot_iterator *iterator, struct hash_snapshot_iterator *it = (struct hash_snapshot_iterator *) iterator; struct light_index_core *hash_table = &it->index->hash_table; - struct tuple **res = light_index_iterator_get_and_next(hash_table, - &it->iterator); - if (res == NULL) { - *data = NULL; - return 0; + + while (true) { + struct tuple **res = + light_index_iterator_get_and_next(hash_table, + &it->iterator); + if (res == NULL) { + *data = NULL; + return 0; + } + + struct tuple *tuple = *res; + tuple = txm_snapshot_clarify(&it->cleaner, tuple); + + if (tuple != NULL) { + *data = tuple_data_range(tuple, size); + return 0; + } } - *data = tuple_data_range(*res, size); return 0; } diff --git a/src/box/memtx_rtree.c b/src/box/memtx_rtree.c index 612fcb2..c7f1655 100644 --- a/src/box/memtx_rtree.c +++ b/src/box/memtx_rtree.c @@ -40,7 +40,9 @@ #include "trivia/util.h" #include "tuple.h" +#include "txn.h" #include "space.h" +#include "schema.h" #include "memtx_engine.h" struct memtx_rtree_index { @@ -148,7 +150,16 @@ static int index_rtree_iterator_next(struct iterator *i, struct tuple **ret) { struct index_rtree_iterator *itr = (struct index_rtree_iterator *)i; - *ret = (struct tuple *)rtree_iterator_next(&itr->impl); + do { + *ret = (struct tuple *) rtree_iterator_next(&itr->impl); + if (*ret == NULL) + break; + uint32_t iid = i->index->def->iid; + struct txn *txn = in_txn(); + struct space *space = space_by_id(i->space_id); + bool is_rw = txn != NULL; + *ret = txm_tuple_clarify(txn, space, *ret, iid, 0, is_rw); + } while (*ret == NULL); return 0; } @@ -213,8 +224,21 @@ memtx_rtree_index_get(struct index *base, const char *key, unreachable(); *result = NULL; - if (rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator)) - *result = (struct tuple *)rtree_iterator_next(&iterator); + if 
(!rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator)) { + rtree_iterator_destroy(&iterator); + return 0; + } + do { + struct tuple *tuple = (struct tuple *) + rtree_iterator_next(&iterator); + if (tuple == NULL) + break; + uint32_t iid = base->def->iid; + struct txn *txn = in_txn(); + struct space *space = space_by_id(base->def->space_id); + bool is_rw = txn != NULL; + *result = txm_tuple_clarify(txn, space, tuple, iid, 0, is_rw); + } while (*result == NULL); rtree_iterator_destroy(&iterator); return 0; } diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c index 76ff3fc..fe93467 100644 --- a/src/box/memtx_tree.c +++ b/src/box/memtx_tree.c @@ -31,12 +31,13 @@ #include "memtx_tree.h" #include "memtx_engine.h" #include "space.h" -#include "schema.h" /* space_cache_find() */ +#include "schema.h" /* space_by_id(), space_cache_find() */ #include "errinj.h" #include "memory.h" #include "fiber.h" #include "key_list.h" #include "tuple.h" +#include "txn.h" #include #include @@ -175,7 +176,7 @@ tree_iterator_dummie(struct iterator *iterator, struct tuple **ret) } static int -tree_iterator_next(struct iterator *iterator, struct tuple **ret) +tree_iterator_next_base(struct iterator *iterator, struct tuple **ret) { struct memtx_tree_index *index = (struct memtx_tree_index *)iterator->index; @@ -205,7 +206,7 @@ tree_iterator_next(struct iterator *iterator, struct tuple **ret) } static int -tree_iterator_prev(struct iterator *iterator, struct tuple **ret) +tree_iterator_prev_base(struct iterator *iterator, struct tuple **ret) { struct memtx_tree_index *index = (struct memtx_tree_index *)iterator->index; @@ -234,7 +235,7 @@ tree_iterator_prev(struct iterator *iterator, struct tuple **ret) } static int -tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret) +tree_iterator_next_equal_base(struct iterator *iterator, struct tuple **ret) { struct memtx_tree_index *index = (struct memtx_tree_index *)iterator->index; @@ -270,7 +271,7 @@ 
tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret) } static int -tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret) +tree_iterator_prev_equal_base(struct iterator *iterator, struct tuple **ret) { struct memtx_tree_index *index = (struct memtx_tree_index *)iterator->index; @@ -304,6 +305,47 @@ tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret) return 0; } +#define WRAP_ITERATOR_METHOD(name) \ +static int \ +name(struct iterator *iterator, struct tuple **ret) \ +{ \ + struct memtx_tree *tree = \ + &((struct memtx_tree_index *)iterator->index)->tree; \ + struct tree_iterator *it = tree_iterator(iterator); \ + struct memtx_tree_iterator *ti = &it->tree_iterator; \ + uint32_t iid = iterator->index->def->iid; \ + bool is_multikey = iterator->index->def->key_def->is_multikey; \ + struct txn *txn = in_txn(); \ + struct space *space = space_by_id(iterator->space_id); \ + bool is_rw = txn != NULL; \ + do { \ + int rc = name##_base(iterator, ret); \ + if (rc != 0 || *ret == NULL) \ + return rc; \ + uint32_t mk_index = 0; \ + if (is_multikey) { \ + struct memtx_tree_data *check = \ + memtx_tree_iterator_get_elem(tree, ti); \ + assert(check != NULL); \ + mk_index = check->hint; \ + } \ + *ret = txm_tuple_clarify(txn, space, *ret, \ + iid, mk_index, is_rw); \ + } while (*ret == NULL); \ + tuple_unref(it->current.tuple); \ + it->current.tuple = *ret; \ + tuple_ref(it->current.tuple); \ + return 0; \ +} \ +struct forgot_to_add_semicolon + +WRAP_ITERATOR_METHOD(tree_iterator_next); +WRAP_ITERATOR_METHOD(tree_iterator_prev); +WRAP_ITERATOR_METHOD(tree_iterator_next_equal); +WRAP_ITERATOR_METHOD(tree_iterator_prev_equal); + +#undef WRAP_ITERATOR_METHOD + static void tree_iterator_set_next_method(struct tree_iterator *it) { @@ -388,6 +430,22 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret) tuple_ref(*ret); it->current = *res; tree_iterator_set_next_method(it); + + uint32_t iid = 
iterator->index->def->iid; + bool is_multikey = iterator->index->def->key_def->is_multikey; + struct txn *txn = in_txn(); + struct space *space = space_by_id(iterator->space_id); + bool is_rw = txn != NULL; + uint32_t mk_index = is_multikey ? res->hint : 0; + *ret = txm_tuple_clarify(txn, space, *ret, iid, mk_index, is_rw); + if (*ret == NULL) { + return iterator->next(iterator, ret); + } else { + tuple_unref(it->current.tuple); + it->current.tuple = *ret; + tuple_ref(it->current.tuple); + } + return 0; } @@ -539,7 +597,16 @@ memtx_tree_index_get(struct index *base, const char *key, key_data.part_count = part_count; key_data.hint = key_hint(key, part_count, cmp_def); struct memtx_tree_data *res = memtx_tree_find(&index->tree, &key_data); - *result = res != NULL ? res->tuple : NULL; + if (res == NULL) { + *result = NULL; + return 0; + } + struct txn *txn = in_txn(); + struct space *space = space_by_id(base->def->space_id); + bool is_rw = txn != NULL; + uint32_t mk_index = base->def->key_def->is_multikey ? 
res->hint : 0; + *result = txm_tuple_clarify(txn, space, res->tuple, base->def->iid, + mk_index, is_rw); return 0; } @@ -1208,6 +1275,7 @@ struct tree_snapshot_iterator { struct snapshot_iterator base; struct memtx_tree_index *index; struct memtx_tree_iterator tree_iterator; + struct txm_snapshot_cleaner cleaner; }; static void @@ -1220,6 +1288,7 @@ tree_snapshot_iterator_free(struct snapshot_iterator *iterator) it->index->base.engine); memtx_tree_iterator_destroy(&it->index->tree, &it->tree_iterator); index_unref(&it->index->base); + txm_snapshot_cleaner_destroy(&it->cleaner); free(iterator); } @@ -1231,14 +1300,27 @@ tree_snapshot_iterator_next(struct snapshot_iterator *iterator, struct tree_snapshot_iterator *it = (struct tree_snapshot_iterator *)iterator; struct memtx_tree *tree = &it->index->tree; - struct memtx_tree_data *res = memtx_tree_iterator_get_elem(tree, - &it->tree_iterator); - if (res == NULL) { - *data = NULL; - return 0; + + while (true) { + struct memtx_tree_data *res = + memtx_tree_iterator_get_elem(tree, &it->tree_iterator); + + if (res == NULL) { + *data = NULL; + return 0; + } + + memtx_tree_iterator_next(tree, &it->tree_iterator); + + struct tuple *tuple = res->tuple; + tuple = txm_snapshot_clarify(&it->cleaner, tuple); + + if (tuple != NULL) { + *data = tuple_data_range(tuple, size); + return 0; + } } - memtx_tree_iterator_next(tree, &it->tree_iterator); - *data = tuple_data_range(res->tuple, size); + return 0; } @@ -1251,14 +1333,21 @@ static struct snapshot_iterator * memtx_tree_index_create_snapshot_iterator(struct index *base) { struct memtx_tree_index *index = (struct memtx_tree_index *)base; - struct tree_snapshot_iterator *it = (struct tree_snapshot_iterator *) - calloc(1, sizeof(*it)); + struct tree_snapshot_iterator *it = + (struct tree_snapshot_iterator *) calloc(1, sizeof(*it)); if (it == NULL) { diag_set(OutOfMemory, sizeof(struct tree_snapshot_iterator), "memtx_tree_index", "create_snapshot_iterator"); return NULL; } + struct 
space *space = space_cache_find(base->def->space_id); + if (txm_snapshot_cleaner_create(&it->cleaner, space, + "memtx_tree_index") != 0) { + free(it); + return NULL; + } + it->base.free = tree_snapshot_iterator_free; it->base.next = tree_snapshot_iterator_next; it->index = index; diff --git a/src/box/space.c b/src/box/space.c index 1d375cc..7394316 100644 --- a/src/box/space.c +++ b/src/box/space.c @@ -210,6 +210,7 @@ space_create(struct space *space, struct engine *engine, "constraint_ids"); goto fail; } + rlist_create(&space->txm_stories); return 0; fail_free_indexes: @@ -252,6 +253,7 @@ space_new_ephemeral(struct space_def *def, struct rlist *key_list) void space_delete(struct space *space) { + rlist_del(&space->txm_stories); assert(space->ck_constraint_trigger == NULL); for (uint32_t j = 0; j <= space->index_id_max; j++) { struct index *index = space->index_map[j]; diff --git a/src/box/space.h b/src/box/space.h index bbdd3ef..9dcc4e7 100644 --- a/src/box/space.h +++ b/src/box/space.h @@ -239,6 +239,10 @@ struct space { * Hash table with constraint identifiers hashed by name. */ struct mh_strnptr_t *constraint_ids; + /** + * List of all tx stories in the space. + */ + struct rlist txm_stories; }; /** Initialize a base space instance. */ diff --git a/src/box/txn.c b/src/box/txn.c index 4c46b60..b4888b3 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -101,6 +101,20 @@ struct tx_conflict_tracker { struct rlist in_conflicted_by_list; }; +/** + * Record that links transaction and a story that the transaction have read. + */ +struct tx_read_tracker { + /** The TX that read story. */ + struct txn *reader; + /** The story that was read by reader. */ + struct txm_story *story; + /** Link in story->reader_list. */ + struct rlist in_reader_list; + /** Link in reader->read_set. */ + struct rlist in_read_set; +}; + double too_long_threshold; /* Txn cache. 
*/ @@ -263,6 +277,7 @@ txn_new(void) } assert(region_used(&region) == sizeof(*txn)); txn->region = region; + rlist_create(&txn->read_set); rlist_create(&txn->conflict_list); rlist_create(&txn->conflicted_by_list); rlist_create(&txn->in_read_view_txs); @@ -275,6 +290,14 @@ txn_new(void) inline static void txn_free(struct txn *txn) { + struct tx_read_tracker *tracker, *tmp; + rlist_foreach_entry_safe(tracker, &txn->read_set, + in_read_set, tmp) { + rlist_del(&tracker->in_reader_list); + rlist_del(&tracker->in_read_set); + } + assert(rlist_empty(&txn->read_set)); + struct tx_conflict_tracker *entry, *next; rlist_foreach_entry_safe(entry, &txn->conflict_list, in_conflict_list, next) { @@ -1430,6 +1453,7 @@ txm_story_new(struct space *space, struct tuple *tuple) story->del_psn = 0; rlist_create(&story->reader_list); rlist_add_tail(&txm.all_stories, &story->in_all_stories); + rlist_add(&space->txm_stories, &story->in_space_stories); memset(story->link, 0, sizeof(story->link[0]) * index_count); return story; } @@ -1963,26 +1987,32 @@ txm_history_prepare_stmt(struct txn_stmt *stmt) i++; continue; } + bool old_story_is_prepared = false; struct txm_story *old_story = story->link[i].older.story; if (old_story->del_psn != 0) { - /* is psn is set, the change is prepared. */ - i++; - continue; - } - if (old_story->add_psn != 0) { - /* is psn is set, the change is prepared. */ - i++; - continue; - } - - if (old_story->add_stmt != NULL) { + /* if psn is set, the change is prepared. */ + old_story_is_prepared = true; + } else if (old_story->add_psn != 0) { + /* if psn is set, the change is prepared. */ + old_story_is_prepared = true; + } else if (old_story->add_stmt == NULL) { /* ancient. */ - i++; - continue; - } - if (old_story->add_stmt->txn == stmt->txn) { + old_story_is_prepared = true; + } else if (old_story->add_stmt->txn == stmt->txn) { /* added by us. 
*/ + } + + if (old_story_is_prepared) { + struct tx_read_tracker *tracker; + rlist_foreach_entry(tracker, &old_story->reader_list, + in_reader_list) { + if (tracker->reader == stmt->txn) + continue; + if (tracker->reader->status != TXN_INPROGRESS) + continue; + tracker->reader->status = TXN_CONFLICTED; + } i++; continue; } @@ -2091,7 +2121,6 @@ txm_tuple_clarify_slow(struct txn *txn, struct space *space, struct tuple *tuple, uint32_t index, uint32_t mk_index, bool is_prepared_ok) { - (void)space; assert(tuple->is_dirty); struct txm_story *story = txm_story_get(tuple); bool own_change = false; @@ -2109,7 +2138,8 @@ txm_tuple_clarify_slow(struct txn *txn, struct space *space, break; } } - (void)own_change; /* TODO: add conflict */ + if (!own_change) + txm_track_read(txn, space, tuple); (void)mk_index; /* TODO: multiindex */ return result; } @@ -2123,6 +2153,7 @@ txm_story_delete(struct txm_story *story) if (txm.traverse_all_stories == &story->in_all_stories) txm.traverse_all_stories = rlist_next(txm.traverse_all_stories); rlist_del(&story->in_all_stories); + rlist_del(&story->in_space_stories); mh_int_t pos = mh_history_find(txm.history, story->tuple, 0); assert(pos != mh_end(txm.history)); @@ -2143,3 +2174,168 @@ txm_story_delete(struct txm_story *story) struct mempool *pool = &txm.txm_story_pool[story->index_count]; mempool_free(pool, story); } + + +static uint32_t +txm_snapshot_cleaner_hash(const struct tuple *a) +{ + uintptr_t u = (uintptr_t)a; + if (sizeof(uintptr_t) <= sizeof(uint32_t)) + return u; + else + return u ^ (u >> 32); +} + +struct txm_snapshot_cleaner_entry +{ + struct tuple *from; + struct tuple *to; +}; + +#define mh_name _snapshot_cleaner +#define mh_key_t struct tuple * +#define mh_node_t struct txm_snapshot_cleaner_entry +#define mh_arg_t int +#define mh_hash(a, arg) (txm_snapshot_cleaner_hash((a)->from)) +#define mh_hash_key(a, arg) (txm_snapshot_cleaner_hash(a)) +#define mh_cmp(a, b, arg) (((a)->from) != ((b)->from)) +#define mh_cmp_key(a, b, 
arg) ((a) != ((b)->from)) +#define MH_SOURCE +#include "salad/mhash.h" + +int +txm_snapshot_cleaner_create(struct txm_snapshot_cleaner *cleaner, + struct space *space, const char *index_name) +{ + cleaner->ht = NULL; + if (space == NULL || rlist_empty(&space->txm_stories)) + return 0; + struct mh_snapshot_cleaner_t *ht = mh_snapshot_cleaner_new(); + if (ht == NULL) { + diag_set(OutOfMemory, sizeof(*ht), + index_name, "snapshot cleaner"); + free(ht); + return -1; + } + + struct txm_story *story; + rlist_foreach_entry(story, &space->txm_stories, in_space_stories) { + struct tuple *tuple = story->tuple; + struct tuple *clean = + txm_tuple_clarify_slow(NULL, space, tuple, 0, 0, true); + if (clean == tuple) + continue; + + struct txm_snapshot_cleaner_entry entry; + entry.from = tuple; + entry.to = clean; + mh_int_t res = mh_snapshot_cleaner_put(ht, &entry, NULL, 0); + if (res == mh_end(ht)) { + diag_set(OutOfMemory, sizeof(entry), + index_name, "snapshot rollback entry"); + mh_snapshot_cleaner_delete(ht); + return -1; + } + } + + cleaner->ht = ht; + return 0; +} + +struct tuple * +txm_snapshot_clarify_slow(struct txm_snapshot_cleaner *cleaner, + struct tuple *tuple) +{ + assert(cleaner->ht != NULL); + + struct mh_snapshot_cleaner_t *ht = cleaner->ht; + while (true) { + mh_int_t pos = mh_snapshot_cleaner_find(ht, tuple, 0); + if (pos == mh_end(ht)) + break; + struct txm_snapshot_cleaner_entry *entry = + mh_snapshot_cleaner_node(ht, pos); + assert(entry->from == tuple); + tuple = entry->to; + } + + return tuple; +} + + +void +txm_snapshot_cleaner_destroy(struct txm_snapshot_cleaner *cleaner) +{ + if (cleaner->ht != NULL) + mh_snapshot_cleaner_delete(cleaner->ht); +} + +int +txm_track_read(struct txn *txn, struct space *space, struct tuple *tuple) +{ + if (tuple == NULL) + return 0; + if (txn == NULL) + return 0; + if (space == NULL) + return 0; + + struct txm_story *story; + struct tx_read_tracker *tracker = NULL; + + if (!tuple->is_dirty) { + story = txm_story_new(space, 
tuple); + if (story == NULL) + return -1; + size_t sz; + tracker = region_alloc_object(&txn->region, + struct tx_read_tracker, &sz); + if (tracker == NULL) { + diag_set(OutOfMemory, sz, "tx region", "read_tracker"); + txm_story_delete(story); + return -1; + } + tracker->reader = txn; + tracker->story = story; + rlist_add(&story->reader_list, &tracker->in_reader_list); + rlist_add(&txn->read_set, &tracker->in_read_set); + return 0; + } + story = txm_story_get(tuple); + + struct rlist *r1 = story->reader_list.next; + struct rlist *r2 = txn->read_set.next; + while (r1 != &story->reader_list && r2 != &txn->read_set) { + tracker = rlist_entry(r1, struct tx_read_tracker, + in_reader_list); + assert(tracker->story == story); + if (tracker->reader == txn) + break; + tracker = rlist_entry(r2, struct tx_read_tracker, + in_read_set); + assert(tracker->reader == txn); + if (tracker->story == story) + break; + tracker = NULL; + r1 = r1->next; + r2 = r2->next; + } + if (tracker != NULL) { + /* Move to the beginning of a list for faster further lookups.*/ + rlist_del(&tracker->in_reader_list); + rlist_del(&tracker->in_read_set); + } else { + size_t sz; + tracker = region_alloc_object(&txn->region, + struct tx_read_tracker, &sz); + if (tracker == NULL) { + diag_set(OutOfMemory, sz, "tx region", "read_tracker"); + return -1; + } + tracker->reader = txn; + tracker->story = story; + } + rlist_add(&story->reader_list, &tracker->in_reader_list); + rlist_add(&txn->read_set, &tracker->in_read_set); + return 0; +} diff --git a/src/box/txn.h b/src/box/txn.h index e294497..5fe87c5 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -379,6 +379,8 @@ struct txn { * Link in tx_manager::read_view_txs. */ struct rlist in_read_view_txs; + /** List of tx_read_trackers with stories that the TX have read. */ + struct rlist read_set; }; /** @@ -453,6 +455,10 @@ struct txm_story { */ struct rlist in_all_stories; /** + * Link in space::txm_stories. 
+ */ + struct rlist in_space_stories; + /** * Number of indexes in this space - and the count of link[]. */ uint32_t index_count; @@ -913,6 +919,13 @@ txm_history_prepare_stmt(struct txn_stmt *stmt); ssize_t txm_history_commit_stmt(struct txn_stmt *stmt); +/** + * Record in TX manager that a transaction @txn have read a @tuple in @space. + * @return 0 on success, -1 on memory error. + */ +int +txm_track_read(struct txn *txn, struct space *space, struct tuple *tuple); + /** Helper of txm_tuple_clarify */ struct tuple * txm_tuple_clarify_slow(struct txn *txn, struct space *space, @@ -936,12 +949,61 @@ txm_tuple_clarify(struct txn *txn, struct space *space, { if (!tx_manager_use_mvcc_engine) return tuple; - if (!tuple->is_dirty) + if (!tuple->is_dirty) { + txm_track_read(txn, space, tuple); return tuple; + } return txm_tuple_clarify_slow(txn, space, tuple, index, mk_index, is_prepared_ok); } +/** + * Snapshot cleaner is a short part of history that is supposed to clarify + * tuples in a index snapshot. It's also supposed to be used in another + * thread while common clarify would probably crash in that case. + */ +struct txm_snapshot_cleaner { + struct mh_snapshot_cleaner_t *ht; +}; + +/** + * Create a snapshot cleaner. + * @param cleaner - cleaner to create. + * @param space - space for which the cleaner must be created. + * @param index_name - name of index for diag in case of memory error. + * @return 0 on success, -1 on memory erorr. + */ +int +txm_snapshot_cleaner_create(struct txm_snapshot_cleaner *cleaner, + struct space *space, const char *index_name); + +/** Helper of txm_snapshot_clafify. */ +struct tuple * +txm_snapshot_clarify_slow(struct txm_snapshot_cleaner *cleaner, + struct tuple *tuple); + +/** + * Like a common clarify that function returns proper tuple if original + * tuple in index is dirty. + * @param cleaner - pre-created snapshot cleaner. + * @param tuple - tuple to clean. + * @return cleaned tuple, can be NULL. 
+ */ +static inline struct tuple * +txm_snapshot_clarify(struct txm_snapshot_cleaner *cleaner, + struct tuple *tuple) +{ + if (cleaner->ht == NULL) + return tuple; + return txm_snapshot_clarify_slow(cleaner, tuple); +} + +/** + * Free resources in snapshot @cleaner. + */ +void +txm_snapshot_cleaner_destroy(struct txm_snapshot_cleaner *cleaner); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ -- 2.7.4