From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 54832446441 for ; Tue, 8 Sep 2020 13:22:17 +0300 (MSK) Received: by smtpng1.m.smailru.net with esmtpa (envelope-from ) id 1kFalU-0006tc-NS for tarantool-patches@dev.tarantool.org; Tue, 08 Sep 2020 13:22:17 +0300 From: Aleksandr Lyapunov Date: Tue, 8 Sep 2020 13:22:07 +0300 Message-Id: <1599560532-27089-8-git-send-email-alyapunov@tarantool.org> In-Reply-To: <1599560532-27089-1-git-send-email-alyapunov@tarantool.org> References: <1599560532-27089-1-git-send-email-alyapunov@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Memtx story is a part of a history of a value in space. It's a story about a tuple, from the point it was added to space to the point when it was deleted from the space. All stories are linked into a list of stories of the same key of each index. Part of #4897 --- src/box/memtx_tx.c | 969 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/box/memtx_tx.h | 217 ++++++++++++ src/box/space.c | 2 + src/box/space.h | 4 + src/box/txn.c | 12 + src/box/txn.h | 24 ++ 6 files changed, 1228 insertions(+) diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c index eb2ab51..dfad6f7 100644 --- a/src/box/memtx_tx.c +++ b/src/box/memtx_tx.c @@ -35,6 +35,29 @@ #include #include "txn.h" +#include "schema_def.h" +#include "small/mempool.h" + +static uint32_t +memtx_tx_story_key_hash(const struct tuple *a) +{ + uintptr_t u = (uintptr_t)a; + if (sizeof(uintptr_t) <= sizeof(uint32_t)) + return u; + else + return u ^ (u >> 32); +} + +#define mh_name _history +#define mh_key_t struct tuple * +#define mh_node_t struct memtx_story * +#define mh_arg_t int +#define mh_hash(a, arg) (memtx_tx_story_key_hash((*(a))->tuple)) +#define mh_hash_key(a, arg) (memtx_tx_story_key_hash(a)) +#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple) +#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple) +#define MH_SOURCE +#include "salad/mhash.h" struct tx_manager { @@ -44,6 +67,23 @@ struct tx_manager * so the list is ordered by rv_psn. */ struct rlist read_view_txs; + /** Mempools for tx_story objects with different index count. */ + struct mempool memtx_tx_story_pool[BOX_INDEX_MAX]; + /** Hash table tuple -> memtx_story of that tuple. */ + struct mh_history_t *history; + /** List of all memtx_story objects. */ + struct rlist all_stories; + /** Iterator that sequentially traverses all memtx_story objects. */ + struct rlist *traverse_all_stories; +}; + +enum { + /** + * Number of iterations that is allowed for TX manager to do for + * searching and deleting no more used memtx_tx_stories per creation of + * a new story. + */ + TX_MANAGER_GC_STEPS_SIZE = 2, }; /** That's a definition, see declaration for description. 
*/ @@ -56,6 +96,15 @@ void memtx_tx_manager_init() { rlist_create(&txm.read_view_txs); + for (size_t i = 0; i < BOX_INDEX_MAX; i++) { + size_t item_size = sizeof(struct memtx_story) + + i * sizeof(struct memtx_story_link); + mempool_create(&txm.memtx_tx_story_pool[i], + cord_slab_cache(), item_size); + } + txm.history = mh_history_new(); + rlist_create(&txm.all_stories); + txm.traverse_all_stories = &txm.all_stories; } void @@ -136,3 +185,923 @@ memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim) victim->status = TXN_CONFLICTED; } } + +/** See definition for details */ +static void +memtx_tx_story_gc_step(); + +/** + * Create a new story and link it with the @tuple. + * @return story on success, NULL on error (diag is set). + */ +static struct memtx_story * +memtx_tx_story_new(struct space *space, struct tuple *tuple) +{ + /* Free some memory. */ + for (size_t i = 0; i < TX_MANAGER_GC_STEPS_SIZE; i++) + memtx_tx_story_gc_step(); + assert(!tuple->is_dirty); + uint32_t index_count = space->index_count; + assert(index_count < BOX_INDEX_MAX); + struct mempool *pool = &txm.memtx_tx_story_pool[index_count]; + struct memtx_story *story = (struct memtx_story *) mempool_alloc(pool); + if (story == NULL) { + size_t item_size = sizeof(struct memtx_story) + + index_count * + sizeof(struct memtx_story_link); + diag_set(OutOfMemory, item_size, "mempool_alloc", "story"); + return NULL; + } + story->tuple = tuple; + + const struct memtx_story **put_story = + (const struct memtx_story **) &story; + struct memtx_story **empty = NULL; + mh_int_t pos = mh_history_put(txm.history, put_story, &empty, 0); + if (pos == mh_end(txm.history)) { + mempool_free(pool, story); + diag_set(OutOfMemory, pos + 1, "mh_history_put", + "mh_history_node"); + return NULL; + } + tuple->is_dirty = true; + tuple_ref(tuple); + + story->space = space; + story->index_count = index_count; + story->add_stmt = NULL; + story->add_psn = 0; + story->del_stmt = NULL; + story->del_psn = 0; + rlist_create(&story->reader_list); + rlist_add_tail(&txm.all_stories, &story->in_all_stories); + rlist_add(&space->memtx_stories, &story->in_space_stories); + memset(story->link, 0, sizeof(story->link[0]) * index_count); + return story; +} + +static void +memtx_tx_story_delete(struct memtx_story *story); + +/** + * Create a new story of a @tuple that was added by @stmt. + * @return story on success, NULL on error (diag is set). + */ +static struct memtx_story * +memtx_tx_story_new_add_stmt(struct tuple *tuple, struct txn_stmt *stmt) +{ + struct memtx_story *res = memtx_tx_story_new(stmt->space, tuple); + if (res == NULL) + return NULL; + res->add_stmt = stmt; + assert(stmt->add_story == NULL); + stmt->add_story = res; + return res; +} + +/** + * Create a new story of a @tuple that was deleted by @stmt. + * @return story on success, NULL on error (diag is set). + */ +static struct memtx_story * +memtx_tx_story_new_del_stmt(struct tuple *tuple, struct txn_stmt *stmt) +{ + struct memtx_story *res = memtx_tx_story_new(stmt->space, tuple); + if (res == NULL) + return NULL; + res->del_stmt = stmt; + assert(stmt->del_story == NULL); + stmt->del_story = res; + return res; +} + +/** + * Undo memtx_tx_story_new_add_stmt. + */ +static void +memtx_tx_story_delete_add_stmt(struct memtx_story *story) +{ + story->add_stmt->add_story = NULL; + story->add_stmt = NULL; + memtx_tx_story_delete(story); +} + +/** + * Undo memtx_tx_story_new_del_stmt. 
+ */ +static void +memtx_tx_story_delete_del_stmt(struct memtx_story *story) +{ + story->del_stmt->del_story = NULL; + story->del_stmt = NULL; + memtx_tx_story_delete(story); +} + + +/** + * Find a story of a @tuple. The story expected to be present (assert). + */ +static struct memtx_story * +memtx_tx_story_get(struct tuple *tuple) +{ + assert(tuple->is_dirty); + + mh_int_t pos = mh_history_find(txm.history, tuple, 0); + assert(pos != mh_end(txm.history)); + return *mh_history_node(txm.history, pos); +} + +/** + * Get the older tuple, extracting it from older story if necessary. + */ +static struct tuple * +memtx_tx_story_older_tuple(struct memtx_story_link *link) +{ + return link->older.is_story ? link->older.story->tuple + : link->older.tuple; +} + +/** + * Link a @story with @older_story in @index (in both directions). + */ +static void +memtx_tx_story_link_story(struct memtx_story *story, + struct memtx_story *older_story, + uint32_t index) +{ + assert(older_story != NULL); + struct memtx_story_link *link = &story->link[index]; + /* Must be unlinked. */ + assert(!link->older.is_story); + assert(link->older.tuple == NULL); + link->older.is_story = true; + link->older.story = older_story; + older_story->link[index].newer_story = story; +} + +/** + * Link a @story with older @tuple in @index. In case if the tuple is dirty - + * find and link with the corresponding story. + */ +static void +memtx_tx_story_link_tuple(struct memtx_story *story, + struct tuple *older_tuple, + uint32_t index) +{ + struct memtx_story_link *link = &story->link[index]; + /* Must be unlinked. */ + assert(!link->older.is_story); + assert(link->older.tuple == NULL); + if (older_tuple == NULL) + return; + if (older_tuple->is_dirty) { + memtx_tx_story_link_story(story, + memtx_tx_story_get(older_tuple), + index); + return; + } + link->older.tuple = older_tuple; + tuple_ref(link->older.tuple); +} + +/** + * Unlink a @story with older story/tuple in @index. + */ +static void +memtx_tx_story_unlink(struct memtx_story *story, uint32_t index) +{ + struct memtx_story_link *link = &story->link[index]; + if (link->older.is_story) { + link->older.story->link[index].newer_story = NULL; + } else if (link->older.tuple != NULL) { + tuple_unref(link->older.tuple); + link->older.tuple = NULL; + } + link->older.is_story = false; + link->older.tuple = NULL; +} + +/** + * Run one step of a crawler that traverses all stories and removes no more + * used stories. + */ +static void +memtx_tx_story_gc_step() +{ + if (txm.traverse_all_stories == &txm.all_stories) { + /* We came to the head of the list. */ + txm.traverse_all_stories = txm.traverse_all_stories->next; + return; + } + + /* Lowest read view PSN */ + int64_t lowest_rv_psm = txn_last_psn; + if (!rlist_empty(&txm.read_view_txs)) { + struct txn *txn = + rlist_first_entry(&txm.read_view_txs, struct txn, + in_read_view_txs); + assert(txn->rv_psn != 0); + lowest_rv_psm = txn->rv_psn; + } + + struct memtx_story *story = + rlist_entry(txm.traverse_all_stories, struct memtx_story, + in_all_stories); + txm.traverse_all_stories = txm.traverse_all_stories->next; + + if (story->add_stmt != NULL || story->del_stmt != NULL || + !rlist_empty(&story->reader_list)) { + /* The story is used directly by some transactions. */ + return; + } + if (story->add_psn >= lowest_rv_psm || + story->del_psn >= lowest_rv_psm) { + /* The story can be used by a read view. 
*/ + return; + } + + /* Unlink and delete the story */ + for (uint32_t i = 0; i < story->index_count; i++) { + struct memtx_story_link *link = &story->link[i]; + if (link->newer_story == NULL) { + /* + * We are at the top of the chain. That means + * that story->tuple is in index. If the story is + * actually delete the tuple, it must be deleted from + * index. + */ + if (story->del_psn > 0) { + struct index *index = story->space->index[i]; + struct tuple *unused; + if (index_replace(index, story->tuple, NULL, + DUP_INSERT, &unused) != 0) { + diag_log(); + unreachable(); + panic("failed to rollback change"); + } + assert(story->tuple == unused); + } + memtx_tx_story_unlink(story, i); + } else { + link->newer_story->link[i].older = link->older; + link->older.is_story = false; + link->older.story = NULL; + link->newer_story = NULL; + } + } + + memtx_tx_story_delete(story); +} + +/** + * Check if a @story is visible for transaction @txn. Return visible tuple to + * @visible_tuple (can be set to NULL). + * @param is_prepared_ok - whether prepared (not committed) change is acceptable. + * @param own_change - return true if the change was made by @txn itself. + * @return true if the story is visible, false otherwise. + */ +static bool +memtx_tx_story_is_visible(struct memtx_story *story, struct txn *txn, + struct tuple **visible_tuple, bool is_prepared_ok, + bool *own_change) +{ + *own_change = false; + *visible_tuple = NULL; + + int64_t rv_psn = INT64_MAX; + if (txn != NULL && txn->rv_psn != 0) + rv_psn = txn->rv_psn; + + struct txn_stmt *dels = story->del_stmt; + while (dels != NULL) { + if (dels->txn == txn) { + /* Tuple is deleted by us (@txn). */ + *own_change = true; + return true; + } + dels = dels->next_in_del_list; + } + if (is_prepared_ok && story->del_psn != 0 && story->del_psn < rv_psn) { + /* Tuple is deleted by prepared TX. */ + return true; + } + if (story->del_psn != 0 && story->del_stmt == NULL && + story->del_psn < rv_psn) { + /* Tuple is deleted by committed TX. */ + return true; + } + + if (story->add_stmt != NULL && story->add_stmt->txn == txn) { + /* Tuple is added by us (@txn). */ + *visible_tuple = story->tuple; + *own_change = true; + return true; + } + if (is_prepared_ok && story->add_psn != 0 && story->add_psn < rv_psn) { + /* Tuple is added by another prepared TX. */ + *visible_tuple = story->tuple; + return true; + } + if (story->add_psn != 0 && story->add_stmt == NULL && + story->add_psn < rv_psn) { + /* Tuple is added by committed TX. */ + *visible_tuple = story->tuple; + return true; + } + if (story->add_psn == 0 && story->add_stmt == NULL) { + /* added long time ago. */ + *visible_tuple = story->tuple; + return true; + } + return false; +} + +/** + * Temporary (allocated on region) struct that stores a conflicting TX. + */ +struct memtx_tx_conflict +{ + /* The thansaction that will conflict us upon commit. */ + struct txn *breaker; + /* Link in single-linked list. */ + struct memtx_tx_conflict *next; +}; + +/** + * Save @breaker in list with head @conflicts_head. New list node is allocated + * on @region. + * @return 0 on success, -1 on memory error. 
+ */ +static int +memtx_tx_save_conflict(struct txn *breaker, + struct memtx_tx_conflict **conflicts_head, + struct region *region) +{ + size_t err_size; + struct memtx_tx_conflict *next_conflict; + next_conflict = region_alloc_object(region, struct memtx_tx_conflict, + &err_size); + if (next_conflict == NULL) { + diag_set(OutOfMemory, err_size, "txn_region", "txn conflict"); + return -1; + } + next_conflict->breaker = breaker; + next_conflict->next = *conflicts_head; + *conflicts_head = next_conflict; + return 0; +} + +/** + * Scan a history starting by @stmt statement in @index for a visible tuple + * (prepared suits), returned via @visible_replaced. + * Collect a list of transactions that will abort current transaction if they + * are committed. + * + * @return 0 on success, -1 on memory error. + */ +static int +memtx_tx_story_find_visible(struct memtx_story *story, struct txn_stmt *stmt, + uint32_t index, struct tuple **visible_replaced, + struct memtx_tx_conflict **collected_conflicts, + struct region *region) +{ + while (true) { + if (!story->link[index].older.is_story) { + /* The tuple is so old that we don't know its story. */ + *visible_replaced = story->link[index].older.tuple; + assert(*visible_replaced == NULL || + !(*visible_replaced)->is_dirty); + break; + } + story = story->link[index].older.story; + bool unused; + if (memtx_tx_story_is_visible(story, stmt->txn, + visible_replaced, true, &unused)) + break; + + /* + * We skip the story as invisible but the corresponding TX + * is committed our TX can become conflicted. + * The conflict will be unavoidable if this statement + * relies on old_tuple. If not (it's a replace), + * the conflict will take place only for secondary + * index if the story will not be overwritten in primary + * index. + */ + bool cross_conflict = false; + if (stmt->does_require_old_tuple) { + cross_conflict = true; + } else if (index != 0) { + struct memtx_story *look_up = story; + cross_conflict = true; + while (look_up->link[0].newer_story != NULL) { + struct memtx_story *over; + over = look_up->link[0].newer_story; + if (over->add_stmt->txn == stmt->txn) { + cross_conflict = false; + break; + } + look_up = over; + } + } + if (cross_conflict) { + if (memtx_tx_save_conflict(story->add_stmt->txn, + collected_conflicts, + region) != 0) + return -1; + + } + } + return 0; +} + +int +memtx_tx_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple, + struct tuple *new_tuple, enum dup_replace_mode mode, + struct tuple **result) +{ + assert(new_tuple != NULL || old_tuple != NULL); + struct space *space = stmt->space; + struct memtx_story *add_story = NULL; + uint32_t add_story_linked = 0; + struct memtx_story *del_story = NULL; + bool del_story_created = false; + struct region *region = &stmt->txn->region; + size_t region_svp = region_used(region); + + /* + * List of transactions that will conflict us once one of them + * become committed. + */ + struct memtx_tx_conflict *collected_conflicts = NULL; + + /* Create add_story if necessary. 
*/ + if (new_tuple != NULL) { + add_story = memtx_tx_story_new_add_stmt(new_tuple, stmt); + if (add_story == NULL) + goto fail; + + for (uint32_t i = 0; i < space->index_count; i++) { + struct tuple *replaced; + struct index *index = space->index[i]; + if (index_replace(index, NULL, new_tuple, + DUP_REPLACE_OR_INSERT, + &replaced) != 0) + goto fail; + memtx_tx_story_link_tuple(add_story, replaced, i); + add_story_linked++; + + struct tuple *visible_replaced = NULL; + if (memtx_tx_story_find_visible(add_story, stmt, i, + &visible_replaced, + &collected_conflicts, + region) != 0) + goto fail; + + uint32_t errcode; + errcode = replace_check_dup(old_tuple, visible_replaced, + i == 0 ? mode : DUP_INSERT); + if (errcode != 0) { + if (space != NULL) + diag_set(ClientError, errcode, + index->def->name, + space_name(space)); + goto fail; + } + + if (i == 0) + old_tuple = visible_replaced; + } + } + + /* Create del_story if necessary. */ + struct tuple *del_tuple = NULL; + if (new_tuple != NULL) { + struct memtx_story_link *link = &add_story->link[0]; + if (link->older.is_story) { + del_story = link->older.story; + del_tuple = del_story->tuple; + } else { + del_tuple = link->older.tuple; + } + } else { + del_tuple = old_tuple; + } + if (del_tuple != NULL && del_story == NULL) { + if (del_tuple->is_dirty) { + del_story = memtx_tx_story_get(del_tuple); + } else { + del_story = memtx_tx_story_new_del_stmt(del_tuple, + stmt); + if (del_story == NULL) + goto fail; + del_story_created = true; + } + } + if (new_tuple != NULL && del_story_created) { + for (uint32_t i = 0; i < add_story->index_count; i++) { + struct memtx_story_link *link = &add_story->link[i]; + if (link->older.is_story) + continue; + if (link->older.tuple == del_tuple) { + memtx_tx_story_unlink(add_story, i); + memtx_tx_story_link_story(add_story, del_story, + i); + } + } + } + if (del_story != NULL && !del_story_created) { + stmt->next_in_del_list = del_story->del_stmt; + del_story->del_stmt = stmt; + stmt->del_story = del_story; + } + + /* Purge found conflicts. */ + while (collected_conflicts != NULL) { + if (memtx_tx_cause_conflict(collected_conflicts->breaker, + stmt->txn) != 0) + goto fail; + collected_conflicts = collected_conflicts->next; + } + + /* + * We now reference both new and old tuple because the stmt holds + * pointers to them. 
+ */ + if (stmt->new_tuple != NULL) + tuple_ref(stmt->new_tuple); + *result = old_tuple; + if (*result != NULL) + tuple_ref(*result); + return 0; + + fail: + if (add_story != NULL) { + while (add_story_linked > 0) { + --add_story_linked; + uint32_t i = add_story_linked; + + struct index *index = space->index[i]; + struct memtx_story_link *link = &add_story->link[i]; + struct tuple *was = memtx_tx_story_older_tuple(link); + struct tuple *unused; + if (index_replace(index, new_tuple, was, + DUP_INSERT, + &unused) != 0) { + diag_log(); + unreachable(); + panic("failed to rollback change"); + } + + memtx_tx_story_unlink(stmt->add_story, i); + + } + memtx_tx_story_delete_add_stmt(stmt->add_story); + } + + if (del_story != NULL && del_story->del_stmt == stmt) { + del_story->del_stmt = stmt->next_in_del_list; + stmt->next_in_del_list = NULL; + } + + if (del_story_created) + memtx_tx_story_delete_del_stmt(stmt->del_story); + else + stmt->del_story = NULL; + + region_truncate(region, region_svp); + return -1; +} + +void +memtx_tx_history_rollback_stmt(struct txn_stmt *stmt) +{ + if (stmt->add_story != NULL) { + assert(stmt->add_story->tuple == stmt->new_tuple); + struct memtx_story *story = stmt->add_story; + + for (uint32_t i = 0; i < story->index_count; i++) { + struct memtx_story_link *link = &story->link[i]; + if (link->newer_story == NULL) { + struct tuple *unused; + struct index *index = stmt->space->index[i]; + struct tuple *was = memtx_tx_story_older_tuple(link); + if (index_replace(index, story->tuple, was, + DUP_INSERT, &unused) != 0) { + diag_log(); + unreachable(); + panic("failed to rollback change"); + } + } else { + struct memtx_story *newer = link->newer_story; + assert(newer->link[i].older.is_story); + assert(newer->link[i].older.story == story); + memtx_tx_story_unlink(newer, i); + if (link->older.is_story) { + struct memtx_story *to = link->older.story; + memtx_tx_story_link_story(newer, to, i); + } else { + struct tuple *to = link->older.tuple; + memtx_tx_story_link_tuple(newer, to, i); + } + } + memtx_tx_story_unlink(story, i); + } + stmt->add_story->add_stmt = NULL; + memtx_tx_story_delete(stmt->add_story); + stmt->add_story = NULL; + tuple_unref(stmt->new_tuple); + } + + if (stmt->del_story != NULL) { + struct memtx_story *story = stmt->del_story; + + struct txn_stmt **prev = &story->del_stmt; + while (*prev != stmt) { + prev = &(*prev)->next_in_del_list; + assert(*prev != NULL); + } + *prev = stmt->next_in_del_list; + stmt->next_in_del_list = NULL; + + stmt->del_story->del_stmt = NULL; + stmt->del_story = NULL; + } +} + +void +memtx_tx_history_prepare_stmt(struct txn_stmt *stmt) +{ + assert(stmt->txn->psn != 0); + + /* Move story to the past to prepared stories. */ + + struct memtx_story *story = stmt->add_story; + uint32_t index_count = story == NULL ? 0 : story->index_count; + /* + * Note that if stmt->add_story == NULL, the index_count is set to 0, + * and we will not enter the loop. + */ + for (uint32_t i = 0; i < index_count; ) { + if (!story->link[i].older.is_story) { + /* tuple is old. */ + i++; + continue; + } + bool old_story_is_prepared = false; + struct memtx_story *old_story = story->link[i].older.story; + if (old_story->del_psn != 0) { + /* if psn is set, the change is prepared. */ + old_story_is_prepared = true; + } else if (old_story->add_psn != 0) { + /* if psn is set, the change is prepared. */ + old_story_is_prepared = true; + } else if (old_story->add_stmt == NULL) { + /* ancient. 
*/ + old_story_is_prepared = true; + } else if (old_story->add_stmt->txn == stmt->txn) { + /* added by us. */ + } + + if (old_story_is_prepared) { + struct tx_read_tracker *tracker; + rlist_foreach_entry(tracker, &old_story->reader_list, + in_reader_list) { + if (tracker->reader == stmt->txn) + continue; + if (tracker->reader->status != TXN_INPROGRESS) + continue; + memtx_tx_handle_conflict(stmt->txn, + tracker->reader); + } + i++; + continue; + } + + if (old_story->add_stmt->does_require_old_tuple || i != 0) + old_story->add_stmt->txn->status = TXN_CONFLICTED; + + /* Swap story and old story. */ + struct memtx_story_link *link = &story->link[i]; + if (link->newer_story == NULL) { + /* we have to replace the tuple in index. */ + struct tuple *unused; + struct index *index = stmt->space->index[i]; + if (index_replace(index, story->tuple, old_story->tuple, + DUP_INSERT, &unused) != 0) { + diag_log(); + panic("failed to rollback change"); + } + } else { + struct memtx_story *newer = link->newer_story; + assert(newer->link[i].older.is_story); + assert(newer->link[i].older.story == story); + memtx_tx_story_unlink(newer, i); + memtx_tx_story_link_story(newer, old_story, i); + } + + memtx_tx_story_unlink(story, i); + if (old_story->link[i].older.is_story) { + struct memtx_story *to = + old_story->link[i].older.story; + memtx_tx_story_unlink(old_story, i); + memtx_tx_story_link_story(story, to, i); + } else { + struct tuple *to = + old_story->link[i].older.tuple; + memtx_tx_story_unlink(old_story, i); + memtx_tx_story_link_tuple(story, to, i); + } + + memtx_tx_story_link_story(old_story, story, i); + + if (i == 0) { + assert(stmt->del_story == old_story); + assert(story->link[0].older.is_story || + story->link[0].older.tuple == NULL); + + struct txn_stmt *dels = old_story->del_stmt; + assert(dels != NULL); + do { + if (dels->txn != stmt->txn) + dels->txn->status = TXN_CONFLICTED; + dels->del_story = NULL; + struct txn_stmt *next = dels->next_in_del_list; + dels->next_in_del_list = NULL; + dels = next; + } while (dels != NULL); + old_story->del_stmt = NULL; + + if (story->link[0].older.is_story) { + struct memtx_story *oldest_story = + story->link[0].older.story; + dels = oldest_story->del_stmt; + while (dels != NULL) { + assert(dels->txn != stmt->txn); + dels->del_story = NULL; + struct txn_stmt *next = + dels->next_in_del_list; + dels->next_in_del_list = NULL; + dels = next; + } + oldest_story->del_stmt = stmt; + stmt->del_story = oldest_story; + } + } + } + if (stmt->add_story != NULL) + stmt->add_story->add_psn = stmt->txn->psn; + + if (stmt->del_story != NULL) + stmt->del_story->del_psn = stmt->txn->psn; +} + +ssize_t +memtx_tx_history_commit_stmt(struct txn_stmt *stmt) +{ + size_t res = 0; + if (stmt->add_story != NULL) { + assert(stmt->add_story->add_stmt == stmt); + res += stmt->add_story->tuple->bsize; + stmt->add_story->add_stmt = NULL; + stmt->add_story = NULL; + } + if (stmt->del_story != NULL) { + assert(stmt->del_story->del_stmt == stmt); + assert(stmt->next_in_del_list == NULL); + res -= stmt->del_story->tuple->bsize; + tuple_unref(stmt->del_story->tuple); + stmt->del_story->del_stmt = NULL; + stmt->del_story = NULL; + } + return res; +} + +struct tuple * +memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space, + struct tuple *tuple, uint32_t index, + uint32_t mk_index, bool is_prepared_ok) +{ + assert(tuple->is_dirty); + struct memtx_story *story = memtx_tx_story_get(tuple); + bool own_change = false; + struct tuple *result = NULL; + + while (true) { + if 
(memtx_tx_story_is_visible(story, txn, &result, + is_prepared_ok, &own_change)) { + break; + } + if (story->link[index].older.is_story) { + story = story->link[index].older.story; + } else { + result = story->link[index].older.tuple; + break; + } + } + if (!own_change) + memtx_tx_track_read(txn, space, tuple); + (void)mk_index; /* TODO: multiindex */ + return result; +} + +static void +memtx_tx_story_delete(struct memtx_story *story) +{ + assert(story->add_stmt == NULL); + assert(story->del_stmt == NULL); + + if (txm.traverse_all_stories == &story->in_all_stories) + txm.traverse_all_stories = rlist_next(txm.traverse_all_stories); + rlist_del(&story->in_all_stories); + rlist_del(&story->in_space_stories); + + mh_int_t pos = mh_history_find(txm.history, story->tuple, 0); + assert(pos != mh_end(txm.history)); + mh_history_del(txm.history, pos, 0); + + story->tuple->is_dirty = false; + tuple_unref(story->tuple); + +#ifndef NDEBUG + /* Expecting to delete fully unlinked story. */ + for (uint32_t i = 0; i < story->index_count; i++) { + assert(story->link[i].newer_story == NULL); + assert(story->link[i].older.is_story == false); + assert(story->link[i].older.tuple == NULL); + } +#endif + + struct mempool *pool = &txm.memtx_tx_story_pool[story->index_count]; + mempool_free(pool, story); +} + +int +memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple) +{ + if (tuple == NULL) + return 0; + if (txn == NULL) + return 0; + if (space == NULL) + return 0; + + struct memtx_story *story; + struct tx_read_tracker *tracker = NULL; + + if (!tuple->is_dirty) { + story = memtx_tx_story_new(space, tuple); + if (story == NULL) + return -1; + size_t sz; + tracker = region_alloc_object(&txn->region, + struct tx_read_tracker, &sz); + if (tracker == NULL) { + diag_set(OutOfMemory, sz, "tx region", "read_tracker"); + memtx_tx_story_delete(story); + return -1; + } + tracker->reader = txn; + tracker->story = story; + rlist_add(&story->reader_list, &tracker->in_reader_list); + rlist_add(&txn->read_set, &tracker->in_read_set); + return 0; + } + story = memtx_tx_story_get(tuple); + + struct rlist *r1 = story->reader_list.next; + struct rlist *r2 = txn->read_set.next; + while (r1 != &story->reader_list && r2 != &txn->read_set) { + tracker = rlist_entry(r1, struct tx_read_tracker, + in_reader_list); + assert(tracker->story == story); + if (tracker->reader == txn) + break; + tracker = rlist_entry(r2, struct tx_read_tracker, + in_read_set); + assert(tracker->reader == txn); + if (tracker->story == story) + break; + tracker = NULL; + r1 = r1->next; + r2 = r2->next; + } + if (tracker != NULL) { + /* Move to the beginning of a list for faster further lookups.*/ + rlist_del(&tracker->in_reader_list); + rlist_del(&tracker->in_read_set); + } else { + size_t sz; + tracker = region_alloc_object(&txn->region, + struct tx_read_tracker, &sz); + if (tracker == NULL) { + diag_set(OutOfMemory, sz, "tx region", "read_tracker"); + return -1; + } + tracker->reader = txn; + tracker->story = story; + } + rlist_add(&story->reader_list, &tracker->in_reader_list); + rlist_add(&txn->read_set, &tracker->in_read_set); + return 0; +} diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h index 6143a22..4670ebe 100644 --- a/src/box/memtx_tx.h +++ b/src/box/memtx_tx.h @@ -31,6 +31,12 @@ */ #include +#include +#include + +#include "small/rlist.h" +#include "index.h" +#include "tuple.h" #include "small/rlist.h" @@ -61,6 +67,119 @@ struct tx_conflict_tracker { }; /** + * Record that links transaction and a story that the transaction 
have read. + */ +struct tx_read_tracker { + /** The TX that read story. */ + struct txn *reader; + /** The story that was read by reader. */ + struct memtx_story *story; + /** Link in story->reader_list. */ + struct rlist in_reader_list; + /** Link in reader->read_set. */ + struct rlist in_read_set; +}; + +/** + * Pointer to tuple or story. + */ +struct memtx_story_or_tuple { + /** Flag whether it's a story. */ + bool is_story; + union { + /** Pointer to story, it must be reverse liked. */ + struct memtx_story *story; + /** Smart pointer to tuple: the tuple is referenced if set. */ + struct tuple *tuple; + }; +}; + +/** + * Link that connects a memtx_story with older and newer stories of the same + * key in index. + */ +struct memtx_story_link { + /** Story that was happened after that story was ended. */ + struct memtx_story *newer_story; + /** + * Older story or ancient tuple (so old that its story was lost). + * In case of tuple is can also be NULL. + */ + struct memtx_story_or_tuple older; +}; + +/** + * A part of a history of a value in space. + * It's a story about a tuple, from the point it was added to space to the + * point when it was deleted from a space. + * All stories are linked into a list of stories of the same key of each index. + */ +struct memtx_story { + /** The story is about this tuple. The tuple is referenced. */ + + struct tuple *tuple; + /** + * Statement that told this story. Is set to NULL when the statement's + * transaction becomes committed. Can also be NULL if we don't know who + * introduced that story, the tuple was added by a transaction that + * was completed and destroyed some time ago. + */ + struct txn_stmt *add_stmt; + /** + * Prepare sequence number of add_stmt's transaction. Is set when + * the transaction is prepared. Can be 0 if the transaction is + * in progress or we don't know who introduced that story. + */ + int64_t add_psn; + /** + * Statement that ended this story. Is set to NULL when the statement's + * transaction becomes committed. Can also be NULL if the tuple has not + * been deleted yet. + */ + struct txn_stmt *del_stmt; + /** + * Prepare sequence number of del_stmt's transaction. Is set when + * the transaction is prepared. Can be 0 if the transaction is + * in progress or if nobody has deleted the tuple. + */ + int64_t del_psn; + /** + * List of trackers - transactions that has read this tuple. + */ + struct rlist reader_list; + /** + * Link in tx_manager::all_stories + */ + struct rlist in_all_stories; + /** + * Link in space::memtx_tx_stories. + */ + struct rlist in_space_stories; + /** + * The space where the tuple is supposed to be. + */ + struct space *space; + /** + * Number of indexes in this space - and the count of link[]. + */ + uint32_t index_count; + /** + * Link with older and newer stories (and just tuples) for each + * index respectively. + */ + struct memtx_story_link link[]; +}; + +/** + * Snapshot cleaner is a short part of history that is supposed to clarify + * tuples in a index snapshot. It's also supposed to be used in another + * thread while common clarify would probably crash in that case. + */ +struct memtx_tx_snapshot_cleaner { + struct mh_snapshot_cleaner_t *ht; +}; + +/** * Initialize memtx transaction manager. */ void @@ -94,6 +213,104 @@ memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim); void memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim); +/** + * @brief Add a statement to transaction manager's history. 
+ * Until unlinking or releasing the space could internally contain + * wrong tuples and must be cleaned through memtx_tx_tuple_clarify call. + * With that clarifying the statement will be visible to current transaction, + * but invisible to all others. + * Follows signature of @sa memtx_space_replace_all_keys . + * + * @param stmt current statement. + * @param old_tuple the tuple that should be removed (can be NULL). + * @param new_tuple the tuple that should be inserted (can be NULL). + * @param mode dup_replace_mode, used only if new_tuple is not + * NULL and old_tuple is NULL, and only for the + * primary key. + * @param result - old or replaced tuple. + * @return 0 on success, -1 on error (diag is set). + */ +int +memtx_tx_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple, + struct tuple *new_tuple, enum dup_replace_mode mode, + struct tuple **result); + +/** + * @brief Rollback (undo) a statement from transaction manager's history. + * It's just make the statement invisible to all. + * Prepared statements could be also removed, but for consistency all latter + * prepared statement must be also rolled back. + * + * @param stmt current statement. + */ +void +memtx_tx_history_rollback_stmt(struct txn_stmt *stmt); + +/** + * @brief Prepare statement in history for further commit. + * Prepared statements are still invisible for read-only transactions + * but are visible to all read-write transactions. + * Prepared and in-progress transactions use the same links for creating + * chains of stories in history. The difference is that the order of + * prepared transactions is fixed while in-progress transactions are + * added to the end of list in any order. Thus to switch to prepared + * we have to reorder story in such a way that current story will be + * between earlier prepared stories and in-progress stories. That's what + * this function does. + * + * @param stmt current statement. + */ +void +memtx_tx_history_prepare_stmt(struct txn_stmt *stmt); + +/** + * @brief Commit statement in history. + * Make the statement's changes permanent. It becomes visible to all. + * + * @param stmt current statement. + * @return the change in space bsize. + */ +ssize_t +memtx_tx_history_commit_stmt(struct txn_stmt *stmt); + +/** Helper of memtx_tx_tuple_clarify */ +struct tuple * +memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space, + struct tuple *tuple, uint32_t index, + uint32_t mk_index, bool is_prepared_ok); + +/** + * Record in TX manager that a transaction @txn have read a @tuple in @space. + * @return 0 on success, -1 on memory error. + */ +int +memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple); + +/** + * Clean a tuple if it's dirty - finds a visible tuple in history. + * @param txn - current transactions. + * @param space - space in which the tuple was found. + * @param tuple - tuple to clean. + * @param index - index number. + * @param mk_index - multikey index (iа the index is multikey). + * @param is_prepared_ok - allow to return prepared tuples. + * @return clean tuple (can be NULL). 
+ */ +static inline struct tuple * +memtx_tx_tuple_clarify(struct txn *txn, struct space *space, + struct tuple *tuple, uint32_t index, + uint32_t mk_index, bool is_prepared_ok) +{ + if (!memtx_tx_manager_use_mvcc_engine) + return tuple; + if (!tuple->is_dirty) { + memtx_tx_track_read(txn, space, tuple); + return tuple; + } + return memtx_tx_tuple_clarify_slow(txn, space, tuple, index, mk_index, + is_prepared_ok); +} + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/space.c b/src/box/space.c index 1d375cc..1243932 100644 --- a/src/box/space.c +++ b/src/box/space.c @@ -210,6 +210,7 @@ space_create(struct space *space, struct engine *engine, "constraint_ids"); goto fail; } + rlist_create(&space->memtx_stories); return 0; fail_free_indexes: @@ -252,6 +253,7 @@ space_new_ephemeral(struct space_def *def, struct rlist *key_list) void space_delete(struct space *space) { + rlist_del(&space->memtx_stories); assert(space->ck_constraint_trigger == NULL); for (uint32_t j = 0; j <= space->index_id_max; j++) { struct index *index = space->index_map[j]; diff --git a/src/box/space.h b/src/box/space.h index bbdd3ef..7cfba65 100644 --- a/src/box/space.h +++ b/src/box/space.h @@ -239,6 +239,10 @@ struct space { * Hash table with constraint identifiers hashed by name. */ struct mh_strnptr_t *constraint_ids; + /** + * List of all tx stories in the space. + */ + struct rlist memtx_stories; }; /** Initialize a base space instance. */ diff --git a/src/box/txn.c b/src/box/txn.c index 976e17c..023da91 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -110,6 +110,9 @@ txn_stmt_new(struct region *region) stmt->space = NULL; stmt->old_tuple = NULL; stmt->new_tuple = NULL; + stmt->add_story = NULL; + stmt->del_story = NULL; + stmt->next_in_del_list = NULL; stmt->engine_savepoint = NULL; stmt->row = NULL; stmt->has_triggers = false; @@ -194,6 +197,7 @@ txn_new(void) } assert(region_used(®ion) == sizeof(*txn)); txn->region = region; + rlist_create(&txn->read_set); rlist_create(&txn->conflict_list); rlist_create(&txn->conflicted_by_list); rlist_create(&txn->in_read_view_txs); @@ -206,6 +210,14 @@ txn_new(void) inline static void txn_free(struct txn *txn) { + struct tx_read_tracker *tracker, *tmp; + rlist_foreach_entry_safe(tracker, &txn->read_set, + in_read_set, tmp) { + rlist_del(&tracker->in_reader_list); + rlist_del(&tracker->in_read_set); + } + assert(rlist_empty(&txn->read_set)); + struct tx_conflict_tracker *entry, *next; rlist_foreach_entry_safe(entry, &txn->conflict_list, in_conflict_list, next) { diff --git a/src/box/txn.h b/src/box/txn.h index f957d1e..ba818d0 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -36,6 +36,7 @@ #include "trigger.h" #include "fiber.h" #include "space.h" +#include "tuple.h" #if defined(__cplusplus) extern "C" { @@ -172,6 +173,27 @@ struct txn_stmt { struct space *space; struct tuple *old_tuple; struct tuple *new_tuple; + /** + * If new_tuple != NULL and this transaction was not prepared, + * this member holds added story of the new_tuple. + */ + struct memtx_story *add_story; + /** + * If new_tuple == NULL and this transaction was not prepared, + * this member holds added story of the old_tuple. + */ + struct memtx_story *del_story; + /** + * Link in memtx_story::del_stmt linked list. + * Only one prepared TX can delete a tuple and a story. But + * when there are several in-progress transactions and they delete + * the same tuple we have to store several delete statements in one + * story. 
It's implemented as follows: the story has a pointer to the first + * deleting statement, that statement has a pointer to the next one, and so on, + * with NULL at the end. + * This member is the pointer to the next deleting statement. + */ + struct txn_stmt *next_in_del_list; /** Engine savepoint for the start of this statement. */ void *engine_savepoint; /** Redo info: the binary log row */ @@ -354,6 +376,8 @@ struct txn { * Link in tx_manager::read_view_txs. */ struct rlist in_read_view_txs; + /** List of tx_read_trackers with stories that the TX has read. */ + struct rlist read_set; }; static inline bool -- 2.7.4
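
P.S. for reviewers: below is a minimal, self-contained sketch of the visibility rule that the story chains in this patch implement. It is illustrative only: the names (struct version, find_visible, reader_psn) are invented for the example and are not part of the patch, and it covers only the psn comparison; the real memtx_tx_story_is_visible() additionally handles the transaction's own changes and statements that are still in progress.

/*
 * Each key keeps a newest-to-oldest chain of versions.  A version remembers
 * the prepare sequence numbers (psn) of the change that added it and of the
 * change that deleted it.  A reader with a read-view psn walks the chain
 * until it finds a version that was added, and not yet deleted, before its
 * psn.  Build with: cc -std=c99 sketch.c && ./a.out
 */
#include <stdint.h>
#include <stdio.h>

struct version {
	const char *data;	/* payload of this tuple version */
	int64_t add_psn;	/* 0 means not yet prepared/committed */
	int64_t del_psn;	/* 0 means not deleted */
	struct version *older;	/* next (older) version in the chain */
};

/* A version is visible if it was added before the reader's psn and was not
 * deleted before it. */
static int
is_visible(const struct version *v, int64_t reader_psn)
{
	if (v->add_psn == 0 || v->add_psn >= reader_psn)
		return 0;
	if (v->del_psn != 0 && v->del_psn < reader_psn)
		return 0;
	return 1;
}

/* Walk the chain from newest to oldest, as the clarify step does. */
static const char *
find_visible(const struct version *newest, int64_t reader_psn)
{
	for (const struct version *v = newest; v != NULL; v = v->older)
		if (is_visible(v, reader_psn))
			return v->data;
	return NULL;	/* the key does not exist for this reader */
}

int
main(void)
{
	struct version v1 = { "old value", 10, 20, NULL };	/* replaced at psn 20 */
	struct version v2 = { "new value", 20, 0, &v1 };	/* current version */

	printf("reader at psn 15 sees: %s\n", find_visible(&v2, 15));
	printf("reader at psn 25 sees: %s\n", find_visible(&v2, 25));
	const char *t = find_visible(&v2, 5);
	printf("reader at psn 5 sees: %s\n", t != NULL ? t : "(nothing)");
	return 0;
}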
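
A similarly hedged sketch of the del_stmt list described in the txn.h comment above: several in-progress statements may delete the same tuple, so the story keeps the head of a singly-linked list of deleting statements chained through next_in_del_list, with new deleters pushed at the head and unlinked again on rollback. struct stmt and struct story here are simplified stand-ins, not the patch's types.

#include <assert.h>
#include <stddef.h>

struct stmt {
	struct stmt *next_in_del_list;	/* next statement deleting the same story */
};

struct story {
	struct stmt *del_stmt;		/* head of the list of deleting statements */
};

/* A new deleting statement is pushed to the head of the list. */
static void
story_add_deleter(struct story *s, struct stmt *stmt)
{
	stmt->next_in_del_list = s->del_stmt;
	s->del_stmt = stmt;
}

/* On rollback the statement is unlinked from wherever it sits in the list,
 * the same pointer-to-pointer walk memtx_tx_history_rollback_stmt() uses. */
static void
story_remove_deleter(struct story *s, struct stmt *stmt)
{
	struct stmt **prev = &s->del_stmt;
	while (*prev != stmt)
		prev = &(*prev)->next_in_del_list;
	*prev = stmt->next_in_del_list;
	stmt->next_in_del_list = NULL;
}

int
main(void)
{
	struct story s = { NULL };
	struct stmt a = { NULL }, b = { NULL };

	story_add_deleter(&s, &a);
	story_add_deleter(&s, &b);	/* list is now: b -> a */
	story_remove_deleter(&s, &a);	/* list is now: b */
	assert(s.del_stmt == &b);
	assert(b.next_in_del_list == NULL);
	return 0;
}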