From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id AE052445322 for ; Wed, 15 Jul 2020 16:55:44 +0300 (MSK) Received: by smtpng3.m.smailru.net with esmtpa (envelope-from ) id 1jvhsu-0002ht-6D for tarantool-patches@dev.tarantool.org; Wed, 15 Jul 2020 16:55:44 +0300 From: Aleksandr Lyapunov Date: Wed, 15 Jul 2020 16:55:34 +0300 Message-Id: <1594821336-14468-12-git-send-email-alyapunov@tarantool.org> In-Reply-To: <1594821336-14468-1-git-send-email-alyapunov@tarantool.org> References: <1594821336-14468-1-git-send-email-alyapunov@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v3 11/13] txm: introduce txm_story List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org TXM story is a part of a history of a value in space. It's a story about a tuple, from the point it was added to space to the point when it was deleted from the space. All stories are linked into a list of stories of the same key of each index. 
Part of #4897 --- src/box/txn.c | 798 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/box/txn.h | 193 ++++++++++++++ 2 files changed, 991 insertions(+) diff --git a/src/box/txn.c b/src/box/txn.c index ba81b58..4c46b60 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -37,6 +37,28 @@ #include "xrow.h" #include "errinj.h" #include "iproto_constants.h" +#include "small/mempool.h" + +static uint32_t +txm_story_key_hash(const struct tuple *a) +{ + uintptr_t u = (uintptr_t)a; + if (sizeof(uintptr_t) <= sizeof(uint32_t)) + return u; + else + return u ^ (u >> 32); +} + +#define mh_name _history +#define mh_key_t struct tuple * +#define mh_node_t struct txm_story * +#define mh_arg_t int +#define mh_hash(a, arg) (txm_story_key_hash((*(a))->tuple)) +#define mh_hash_key(a, arg) (txm_story_key_hash(a)) +#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple) +#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple) +#define MH_SOURCE +#include "salad/mhash.h" struct tx_manager { @@ -48,6 +70,14 @@ struct tx_manager * so the list is ordered by rv_psn. */ struct rlist read_view_txs; + /** Mempools for tx_story objects with difference index count. */ + struct mempool txm_story_pool[BOX_INDEX_MAX]; + /** Hash table tuple -> txm_story of that tuple. */ + struct mh_history_t *history; + /** List of all txm_story objects. */ + struct rlist all_stories; + /** Iterator that sequentially traverses all txm_story objects. */ + struct rlist *traverse_all_stories; }; /** That's a definition, see declaration for description. 
*/ @@ -146,6 +176,9 @@ txn_stmt_new(struct region *region) stmt->space = NULL; stmt->old_tuple = NULL; stmt->new_tuple = NULL; + stmt->add_story = NULL; + stmt->del_story = NULL; + stmt->next_in_del_list = NULL; stmt->engine_savepoint = NULL; stmt->row = NULL; stmt->has_triggers = false; @@ -1294,11 +1327,23 @@ void tx_manager_init() { rlist_create(&txm.read_view_txs); + for (size_t i = 0; i < BOX_INDEX_MAX; i++) { + size_t item_size = sizeof(struct txm_story) + + i * sizeof(struct txm_story_link); + mempool_create(&txm.txm_story_pool[i], + cord_slab_cache(), item_size); + } + txm.history = mh_history_new(); + rlist_create(&txm.all_stories); + txm.traverse_all_stories = &txm.all_stories; } void tx_manager_free() { + mh_history_delete(txm.history); + for (size_t i = 0; i < BOX_INDEX_MAX; i++) + mempool_destroy(&txm.txm_story_pool[i]); } int @@ -1345,3 +1390,756 @@ txm_cause_conflict(struct txn *breaker, struct txn *victim) rlist_add(&victim->conflicted_by_list, &tracker->in_conflicted_by_list); return 0; } + +/** + * Creates new story and links it with the @tuple. + * @return story on success, NULL on error (diag is set). 
+ */ +static struct txm_story * +txm_story_new(struct space *space, struct tuple *tuple) +{ + assert(!tuple->is_dirty); + uint32_t index_count = space->index_count; + assert(index_count < BOX_INDEX_MAX); + struct mempool *pool = &txm.txm_story_pool[index_count]; + struct txm_story *story = (struct txm_story *)mempool_alloc(pool); + if (story == NULL) { + size_t item_size = sizeof(struct txm_story) + + index_count * sizeof(struct txm_story_link); + diag_set(OutOfMemory, item_size, "tx_manager", "tx story"); + return story; + } + story->tuple = tuple; + + const struct txm_story **put_story = (const struct txm_story **)&story; + struct txm_story **empty = NULL; + mh_int_t pos = mh_history_put(txm.history, put_story, &empty, 0); + if (pos == mh_end(txm.history)) { + mempool_free(pool, story); + diag_set(OutOfMemory, pos + 1, + "tx_manager", "tx history hash table"); + return NULL; + } + tuple->is_dirty = true; + tuple_ref(tuple); + + story->index_count = index_count; + story->add_stmt = NULL; + story->add_psn = 0; + story->del_stmt = NULL; + story->del_psn = 0; + rlist_create(&story->reader_list); + rlist_add_tail(&txm.all_stories, &story->in_all_stories); + memset(story->link, 0, sizeof(story->link[0]) * index_count); + return story; +} + +static void +txm_story_delete(struct txm_story *story); + +/** + * Creates new story of a @tuple that was added by @stmt. + * @return story on success, NULL on error (diag is set). + */ +static struct txm_story * +txm_story_new_add_stmt(struct tuple *tuple, struct txn_stmt *stmt) +{ + struct txm_story *res = txm_story_new(stmt->space, tuple); + if (res == NULL) + return NULL; + res->add_stmt = stmt; + assert(stmt->add_story == NULL); + stmt->add_story = res; + return res; +} + +/** + * Creates new story of a @tuple that was deleted by @stmt. + * @return story on success, NULL on error (diag is set). 
+ */ +static struct txm_story * +txm_story_new_del_stmt(struct tuple *tuple, struct txn_stmt *stmt) +{ + struct txm_story *res = txm_story_new(stmt->space, tuple); + if (res == NULL) + return NULL; + res->del_stmt = stmt; + assert(stmt->del_story == NULL); + stmt->del_story = res; + return res; +} + +/** + * Undo txm_story_new_add_stmt. + */ +static void +txm_story_delete_add_stmt(struct txm_story *story) +{ + story->add_stmt->add_story = NULL; + story->add_stmt = NULL; + txm_story_delete(story); +} + +/** + * Undo txm_story_new_del_stmt. + */ +static void +txm_story_delete_del_stmt(struct txm_story *story) +{ + story->del_stmt->del_story = NULL; + story->del_stmt = NULL; + txm_story_delete(story); +} + + +/** + * Find a story of a @tuple. The story expected to be present (assert). + */ +static struct txm_story * +txm_story_get(struct tuple *tuple) +{ + assert(tuple->is_dirty); + + mh_int_t pos = mh_history_find(txm.history, tuple, 0); + assert(pos != mh_end(txm.history)); + return *mh_history_node(txm.history, pos); +} + +/** + * Get the older tuple, extracting it from older story if necessary. + */ +static struct tuple * +txm_story_older_tuple(struct txm_story_link *link) +{ + return link->older.is_story ? link->older.story->tuple + : link->older.tuple; +} + +/** + * Link a @story with older story in @index (in both directions). + */ +static void +txm_story_link_story(struct txm_story *story, struct txm_story *older_story, + uint32_t index) +{ + assert(older_story != NULL); + struct txm_story_link *link = &story->link[index]; + /* Must be unlinked. */ + assert(!link->older.is_story); + assert(link->older.tuple == NULL); + link->older.is_story = true; + link->older.story = older_story; + older_story->link[index].newer_story = story; +} + +/** + * Link a @story with older tuple in @index. In case if the tuple is dirty - + * find and link with the corresponding story. 
+ */ +static void +txm_story_link_tuple(struct txm_story *story, struct tuple *older_tuple, + uint32_t index) +{ + struct txm_story_link *link = &story->link[index]; + /* Must be unlinked. */ + assert(!link->older.is_story); + assert(link->older.tuple == NULL); + if (older_tuple == NULL) + return; + if (older_tuple->is_dirty) { + txm_story_link_story(story, txm_story_get(older_tuple), index); + return; + } + link->older.tuple = older_tuple; + tuple_ref(link->older.tuple); +} + +/** + * Unlink a @story with older story/tuple in @index. + */ +static void +txm_story_unlink(struct txm_story *story, uint32_t index) +{ + struct txm_story_link *link = &story->link[index]; + if (link->older.is_story) { + link->older.story->link[index].newer_story = NULL; + } else if (link->older.tuple != NULL) { + tuple_unref(link->older.tuple); + link->older.tuple = NULL; + } + link->older.is_story = false; + link->older.tuple = NULL; +} + +/** + * Check if a @story is visible for transaction @txn. Return visible tuple to + * @visible_tuple (can be set to NULL). + * @param is_prepared_ok - whether prepared (not committed) change is acceptable. + * @param own_change - return true if the change was made by @txn itself. + * @return true if the story is visible, false otherwise. + */ +static bool +txm_story_is_visible(struct txm_story *story, struct txn *txn, + struct tuple **visible_tuple, bool is_prepared_ok, + bool *own_change) +{ + *own_change = false; + *visible_tuple = NULL; + struct txn_stmt *dels = story->del_stmt; + while (dels != NULL) { + if (dels->txn == txn) { + /* Tuple is deleted by us (@txn). */ + *own_change = true; + return true; + } + dels = dels->next_in_del_list; + } + if (is_prepared_ok && story->del_psn != 0 && + (txn->rv_psn == 0 || story->del_psn < txn->rv_psn)) { + /* Tuple is deleted by prepared TX. 
*/ + return true; + } + if (story->del_psn != 0 && story->del_stmt == NULL && + (txn->rv_psn == 0 || story->del_psn < txn->rv_psn)) { + /* Tuple is deleted by committed TX. */ + return true; + } + + if (story->add_stmt != NULL && story->add_stmt->txn == txn) { + /* Tuple is added by us (@txn). */ + *visible_tuple = story->tuple; + *own_change = true; + return true; + } + if (is_prepared_ok && story->add_psn != 0 && + (txn->rv_psn == 0 || story->add_psn < txn->rv_psn)) { + /* Tuple is added by another prepared TX. */ + *visible_tuple = story->tuple; + return true; + } + if (story->add_psn != 0 && story->add_stmt == NULL && + (txn->rv_psn == 0 || story->add_psn < txn->rv_psn)) { + /* Tuple is added by committed TX. */ + *visible_tuple = story->tuple; + return true; + } + if (story->add_psn == 0 && story->add_stmt == NULL) { + /* added long time ago. */ + *visible_tuple = story->tuple; + return true; + } + return false; +} + +/** + * Temporary (allocated on region) struct that stores a conflicting TX. + */ +struct txn_conflict +{ + struct txn *other_txn; + struct txn_conflict *next; +}; + +/** + * Save @other_txn in list with head @coflicts_head. New list node is allocated + * on @region. + * @return 0 on success, -1 on memory error. + */ +static int +txm_save_conflict(struct txn *other_txn, struct txn_conflict **coflicts_head, + struct region *region) +{ + size_t err_size; + struct txn_conflict *next_conflict; + next_conflict = region_alloc_object(region, struct txn_conflict, + &err_size); + if (next_conflict == NULL) { + diag_set(OutOfMemory, err_size, "txn_region", "txn conflict"); + return -1; + } + next_conflict->other_txn = other_txn; + next_conflict->next = *coflicts_head; + *coflicts_head = next_conflict; + return 0; +} + +/** + * Scan a history starting by @stmt statement in @index for a visible tuple + * (prepared suits), returned via @visible_replaced. + * Collect a list of transactions that will abort current transaction if they + * are committed. 
 + * + * @return 0 on success, -1 on memory error. + */ +static int +txm_story_find_visible(struct txn_stmt *stmt, struct txm_story *story, + uint32_t index, struct tuple **visible_replaced, + struct txn_conflict **collected_conflicts, + struct region *region) +{ + while (true) { + if (!story->link[index].older.is_story) { + /* + * The tuple is so old that we don't + * know its story. + */ + *visible_replaced = story->link[index].older.tuple; + assert(*visible_replaced == NULL || + !(*visible_replaced)->is_dirty); + break; + } + story = story->link[index].older.story; + bool unused; + if (txm_story_is_visible(story, stmt->txn, visible_replaced, + true, &unused)) + break; + + /* + * We skip the story, but if the story gets committed + * before our TX, that may cause a conflict. + * The conflict will be unavoidable if this statement + * relies on old_tuple. If not (it's a replace), + * the conflict will take place only for secondary + * index if the story will not be overwritten in primary + * index. 
+ */ + bool cross_conflict = false; + if (stmt->does_require_old_tuple) { + cross_conflict = true; + } else if (index != 0) { + struct txm_story *look_up = story; + cross_conflict = true; + while (look_up->link[index].newer_story != NULL) { + struct txm_story *over; + over = look_up->link[index].newer_story; + if (over->add_stmt->txn == stmt->txn) { + cross_conflict = false; + break; + } + look_up = over; + } + } + if (cross_conflict) { + if (txm_save_conflict(story->add_stmt->txn, + collected_conflicts, + region) != 0) + return -1; + + } + } + return 0; +} + +int +txm_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple, + struct tuple *new_tuple, enum dup_replace_mode mode, + struct tuple **result) +{ + assert(new_tuple != NULL || old_tuple != NULL); + struct space *space = stmt->space; + struct txm_story *add_story = NULL; + uint32_t add_story_linked = 0; + struct txm_story *del_story = NULL; + bool del_story_created = false; + struct region *region = &stmt->txn->region; + size_t region_svp = region_used(region); + + /* + * List of transactions that will conflict us once one of them + * become committed. + */ + struct txn_conflict *collected_conflicts = NULL; + + /* Create add_story if necessary. */ + if (new_tuple != NULL) { + add_story = txm_story_new_add_stmt(new_tuple, stmt); + if (add_story == NULL) + goto fail; + + for (uint32_t i = 0; i < space->index_count; i++) { + struct tuple *replaced; + struct index *index = space->index[i]; + if (index_replace(index, NULL, new_tuple, + DUP_REPLACE_OR_INSERT, + &replaced) != 0) + goto fail; + txm_story_link_tuple(add_story, replaced, i); + add_story_linked++; + + struct tuple *visible_replaced = NULL; + if (txm_story_find_visible(stmt, add_story, i, + &visible_replaced, + &collected_conflicts, + region) != 0) + goto fail; + + uint32_t errcode; + errcode = replace_check_dup(old_tuple, visible_replaced, + i == 0 ? 
mode : DUP_INSERT); + if (errcode != 0) { + struct space *sp = stmt->space; + if (sp != NULL) + diag_set(ClientError, errcode, + sp->index[i]->def->name, + space_name(sp)); + goto fail; + } + + if (i == 0) { + old_tuple = visible_replaced; + } + } + } + + /* Create del_story if necessary. */ + struct tuple *del_tuple = NULL; + if (new_tuple != NULL) { + struct txm_story_link *link = &add_story->link[0]; + if (link->older.is_story) { + del_story = link->older.story; + del_tuple = del_story->tuple; + } else { + del_tuple = link->older.tuple; + } + } else { + del_tuple = old_tuple; + } + if (del_tuple != NULL && del_story == NULL) { + if (del_tuple->is_dirty) { + del_story = txm_story_get(del_tuple); + } else { + del_story = txm_story_new_del_stmt(del_tuple, stmt); + if (del_story == NULL) + goto fail; + del_story_created = true; + } + } + if (new_tuple != NULL && del_story_created) { + for (uint32_t i = 0; i < add_story->index_count; i++) { + struct txm_story_link *link = &add_story->link[i]; + if (link->older.is_story) + continue; + if (link->older.tuple == del_tuple) { + txm_story_unlink(add_story, i); + txm_story_link_story(add_story, del_story, i); + } + } + } + if (del_story != NULL && !del_story_created) { + stmt->next_in_del_list = del_story->del_stmt; + del_story->del_stmt = stmt; + stmt->del_story = del_story; + } + + /* Purge found conflicts. */ + while (collected_conflicts != NULL) { + if (txm_cause_conflict(collected_conflicts->other_txn, + stmt->txn) != 0) + goto fail; + collected_conflicts = collected_conflicts->next; + } + region_truncate(region, region_svp); + + /* + * We now reference both new and old tuple because the stmt holds + * pointers to them. 
+ */ + if (stmt->new_tuple != NULL) + tuple_ref(stmt->new_tuple); + *result = old_tuple; + if (*result != NULL) + tuple_ref(*result); + return 0; + +fail: + if (add_story != NULL) { + while (add_story_linked > 0) { + --add_story_linked; + uint32_t i = add_story_linked; + + struct index *index = space->index[i]; + struct txm_story_link *link = &add_story->link[i]; + struct tuple *was = txm_story_older_tuple(link); + struct tuple *unused; + if (index_replace(index, new_tuple, was, + DUP_INSERT, + &unused) != 0) { + diag_log(); + unreachable(); + panic("failed to rollback change"); + } + + txm_story_unlink(stmt->add_story, i); + + } + txm_story_delete_add_stmt(stmt->add_story); + } + + if (del_story != NULL && del_story->del_stmt == stmt) { + del_story->del_stmt = stmt->next_in_del_list; + stmt->next_in_del_list = NULL; + } + + if (del_story_created) + txm_story_delete_del_stmt(stmt->del_story); + else + stmt->del_story = NULL; + + region_truncate(region, region_svp); + return -1; +} + +void +txm_history_rollback_stmt(struct txn_stmt *stmt) +{ + if (stmt->add_story != NULL) { + assert(stmt->add_story->tuple == stmt->new_tuple); + struct txm_story *story = stmt->add_story; + + for (uint32_t i = 0; i < story->index_count; i++) { + struct txm_story_link *link = &story->link[i]; + if (link->newer_story == NULL) { + struct tuple *unused; + struct index *index = stmt->space->index[i]; + struct tuple *was = txm_story_older_tuple(link); + if (index_replace(index, story->tuple, was, + DUP_INSERT, &unused) != 0) { + diag_log(); + unreachable(); + panic("failed to rollback change"); + } + } else { + struct txm_story *newer = link->newer_story; + assert(newer->link[i].older.is_story); + assert(newer->link[i].older.story == story); + txm_story_unlink(newer, i); + if (link->older.is_story) { + struct txm_story *to = link->older.story; + txm_story_link_story(newer,to, i); + } else { + struct tuple *to = link->older.tuple; + txm_story_link_tuple(newer, to, i); + } + } + 
txm_story_unlink(story, i); + } + stmt->add_story->add_stmt = NULL; + stmt->add_story = NULL; + tuple_unref(stmt->new_tuple); + } + + if (stmt->del_story != NULL) { + struct txm_story *story = stmt->del_story; + + struct txn_stmt **prev = &story->del_stmt; + while (*prev != stmt) { + prev = &(*prev)->next_in_del_list; + assert(*prev != NULL); + } + *prev = stmt->next_in_del_list; + stmt->next_in_del_list = NULL; + + /* Keep story->del_stmt: other deleters may still be linked. */ + stmt->del_story = NULL; + } +} + +void +txm_history_prepare_stmt(struct txn_stmt *stmt) +{ + assert(stmt->txn->psn != 0); + + /* Move the story into the past, next to the prepared stories. */ + + struct txm_story *story = stmt->add_story; + uint32_t index_count = story == NULL ? 0 : story->index_count; + /* + * Note that if stmt->add_story == NULL, the index_count is set to 0, + * and we will not enter the loop. + */ + for (uint32_t i = 0; i < index_count; ) { + if (!story->link[i].older.is_story) { + /* tuple is old. */ + i++; + continue; + } + struct txm_story *old_story = + story->link[i].older.story; + if (old_story->del_psn != 0) { + /* if psn is set, the change is prepared. */ + i++; + continue; + } + if (old_story->add_psn != 0) { + /* if psn is set, the change is prepared. */ + i++; + continue; + } + + if (old_story->add_stmt == NULL) { + /* ancient: no adding statement is known, skip it. */ + i++; + continue; + } + if (old_story->add_stmt->txn == stmt->txn) { + /* added by us. */ + i++; + continue; + } + + if (old_story->add_stmt->does_require_old_tuple || i != 0) + old_story->add_stmt->txn->status = TXN_CONFLICTED; + + /* Swap story and old story. */ + struct txm_story_link *link = &story->link[i]; + if (link->newer_story == NULL) { + /* we have to replace the tuple in index. 
*/ + struct tuple *unused; + struct index *index = stmt->space->index[i]; + if (index_replace(index, story->tuple, old_story->tuple, + DUP_INSERT, &unused) != 0) { + diag_log(); + panic("failed to rollback change"); + } + } else { + struct txm_story *newer = link->newer_story; + assert(newer->link[i].older.is_story); + assert(newer->link[i].older.story == story); + txm_story_unlink(newer, i); + txm_story_link_story(newer, old_story, i); + } + + txm_story_unlink(story, i); + if (old_story->link[i].older.is_story) { + struct txm_story *to = + old_story->link[i].older.story; + txm_story_unlink(old_story, i); + txm_story_link_story(story, to, i); + } else { + struct tuple *to = + old_story->link[i].older.tuple; + txm_story_unlink(old_story, i); + txm_story_link_tuple(story, to, i); + } + + txm_story_link_story(old_story, story, i); + + if (i == 0) { + assert(stmt->del_story == old_story); + assert(story->link[0].older.is_story || + story->link[0].older.tuple == NULL); + + struct txn_stmt *dels = old_story->del_stmt; + assert(dels != NULL); + do { + if (dels->txn != stmt->txn) + dels->txn->status = TXN_CONFLICTED; + dels->del_story = NULL; + struct txn_stmt *next = dels->next_in_del_list; + dels->next_in_del_list = NULL; + dels = next; + } while (dels != NULL); + old_story->del_stmt = NULL; + + if (story->link[0].older.is_story) { + struct txm_story *oldest_story = + story->link[0].older.story; + dels = oldest_story->del_stmt; + while (dels != NULL) { + assert(dels->txn != stmt->txn); + dels->del_story = NULL; + struct txn_stmt *next = + dels->next_in_del_list; + dels->next_in_del_list = NULL; + dels = next; + } + oldest_story->del_stmt = stmt; + stmt->del_story = oldest_story; + } + } + } + if (stmt->add_story != NULL) + stmt->add_story->add_psn = stmt->txn->psn; + + if (stmt->del_story != NULL) + stmt->del_story->del_psn = stmt->txn->psn; +} + +ssize_t +txm_history_commit_stmt(struct txn_stmt *stmt) +{ + size_t res = 0; + if (stmt->add_story != NULL) { + 
assert(stmt->add_story->add_stmt == stmt); + res += stmt->add_story->tuple->bsize; + stmt->add_story->add_stmt = NULL; + stmt->add_story = NULL; + } + if (stmt->del_story != NULL) { + assert(stmt->del_story->del_stmt == stmt); + assert(stmt->next_in_del_list == NULL); + res -= stmt->del_story->tuple->bsize; + tuple_unref(stmt->del_story->tuple); + stmt->del_story->del_stmt = NULL; + stmt->del_story = NULL; + } + return res; +} + +struct tuple * +txm_tuple_clarify_slow(struct txn *txn, struct space *space, + struct tuple *tuple, uint32_t index, + uint32_t mk_index, bool is_prepared_ok) +{ + (void)space; + assert(tuple->is_dirty); + struct txm_story *story = txm_story_get(tuple); + bool own_change = false; + struct tuple *result = NULL; + + while (true) { + if (txm_story_is_visible(story, txn, &result, + is_prepared_ok, &own_change)) { + break; + } + if (story->link[index].older.is_story) { + story = story->link[index].older.story; + } else { + result = story->link[index].older.tuple; + break; + } + } + (void)own_change; /* TODO: add conflict */ + (void)mk_index; /* TODO: multiindex */ + return result; +} + +static void +txm_story_delete(struct txm_story *story) +{ + assert(story->add_stmt == NULL); + assert(story->del_stmt == NULL); + + if (txm.traverse_all_stories == &story->in_all_stories) + txm.traverse_all_stories = rlist_next(txm.traverse_all_stories); + rlist_del(&story->in_all_stories); + + mh_int_t pos = mh_history_find(txm.history, story->tuple, 0); + assert(pos != mh_end(txm.history)); + mh_history_del(txm.history, pos, 0); + + story->tuple->is_dirty = false; + tuple_unref(story->tuple); + +#ifndef NDEBUG + /* Expecting to delete fully unlinked story. 
*/ + for (uint32_t i = 0; i < story->index_count; i++) { + assert(story->link[i].newer_story == NULL); + assert(story->link[i].older.is_story == false); + assert(story->link[i].older.tuple == NULL); + } +#endif + + struct mempool *pool = &txm.txm_story_pool[story->index_count]; + mempool_free(pool, story); +} diff --git a/src/box/txn.h b/src/box/txn.h index 03ccb76..e294497 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -36,6 +36,7 @@ #include "trigger.h" #include "fiber.h" #include "space.h" +#include "tuple.h" #if defined(__cplusplus) extern "C" { @@ -175,6 +176,27 @@ struct txn_stmt { struct space *space; struct tuple *old_tuple; struct tuple *new_tuple; + /** + * If new_tuple != NULL and this transaction was not prepared, + * this member holds added story of the new_tuple. + */ + struct txm_story *add_story; + /** + * If new_tuple == NULL and this transaction was not prepared, + * this member holds added story of the old_tuple. + */ + struct txm_story *del_story; + /** + * Link in txm_story::del_stmt linked list. + * Only one prepared TX can delete a tuple and a story. But + * when there are several in-progress transactions and they delete + * the same tuple we have to store several delete statements in one + * story. It's implemented in that way: story has a pointer to the first + * deleting statement, that statement has a pointer to the next etc, + * with NULL in the end. + * That member is that the pointer to next deleting statement. + */ + struct txn_stmt *next_in_del_list; /** Engine savepoint for the start of this statement. */ void *engine_savepoint; /** Redo info: the binary log row */ @@ -359,6 +381,88 @@ struct txn { struct rlist in_read_view_txs; }; +/** + * Pointer to tuple or story. + */ +struct txm_story_or_tuple { + /** Flag whether it's a story. */ + bool is_story; + union { + /** Pointer to story, it must be reverse liked. */ + struct txm_story *story; + /** Smart pointer to tuple: the tuple is referenced if set. 
 */ + struct tuple *tuple; + }; +}; + +/** + * Link that connects a txm_story with older and newer stories of the same + * key in index. + */ +struct txm_story_link { + /** Story that happened after this story ended. */ + struct txm_story *newer_story; + /** + * Older story or ancient tuple (so old that its story was lost). + * In case of a tuple it can also be NULL. + */ + struct txm_story_or_tuple older; +}; + +/** + * A part of a history of a value in space. + * It's a story about a tuple, from the point it was added to space to the + * point when it was deleted from a space. + * All stories are linked into a list of stories of the same key of each index. + */ +struct txm_story { + /** The story is about this tuple. The tuple is referenced. */ + + struct tuple *tuple; + /** + * Statement that told this story. Is set to NULL when the statement's + * transaction becomes committed. Can also be NULL if we don't know who + * introduced that story, the tuple was added by a transaction that + * was completed and destroyed some time ago. + */ + struct txn_stmt *add_stmt; + /** + * Prepare sequence number of add_stmt's transaction. Is set when + * the transaction is prepared. Can be 0 if the transaction is + * in progress or we don't know who introduced that story. + */ + int64_t add_psn; + /** + * Statement that ended this story. Is set to NULL when the statement's + * transaction becomes committed. Can also be NULL if the tuple has not + * been deleted yet. + */ + struct txn_stmt *del_stmt; + /** + * Prepare sequence number of del_stmt's transaction. Is set when + * the transaction is prepared. Can be 0 if the transaction is + * in progress or if nobody has deleted the tuple. + */ + int64_t del_psn; + /** + * List of trackers - transactions that have read this tuple. + */ + struct rlist reader_list; + /** + * Link in tx_manager::all_stories + */ + struct rlist in_all_stories; + /** + * Number of indexes in this space - and the count of link[]. 
+ */ + uint32_t index_count; + /** + * Link with older and newer stories (and just tuples) for each + * index respectively. + */ + struct txm_story_link link[]; +}; + static inline bool txn_has_flag(struct txn *txn, enum txn_flag flag) { @@ -749,6 +853,95 @@ tx_manager_free(); int txm_cause_conflict(struct txn *breaker, struct txn *victim); +/** + * @brief Add a statement to transaction manager's history. + * Until unlinking or releasing the space could internally contain + * wrong tuples and must be cleaned through txm_tuple_clarify call. + * With that clarifying the statement will be visible to current transaction, + * but invisible to all others. + * Follows signature of @sa memtx_space_replace_all_keys . + * + * @param stmt current statement. + * @param old_tuple the tuple that should be removed (can be NULL). + * @param new_tuple the tuple that should be inserted (can be NULL). + * @param mode dup_replace_mode, used only if new_tuple is not + * NULL and old_tuple is NULL, and only for the + * primary key. + * @param result - old or replaced tuple. + * @return 0 on success, -1 on error (diag is set). + */ +int +txm_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple, + struct tuple *new_tuple, enum dup_replace_mode mode, + struct tuple **result); + +/** + * @brief Rollback (undo) a statement from transaction manager's history. + * It's just make the statement invisible to all. + * Prepared statements could be also removed, but for consistency all latter + * prepared statement must be also rolled back. + * + * @param stmt current statement. + */ +void +txm_history_rollback_stmt(struct txn_stmt *stmt); + +/** + * @brief Prepare statement in history for further commit. + * Prepared statements are still invisible for read-only transactions + * but are visible to all read-write transactions. + * Prepared and in-progress transactions use the same links for creating + * chains of stories in history. 
 The difference is that the order of + * prepared transactions is fixed while in-progress transactions are + * added to the end of the list in any order. Thus to switch to prepared + * we have to reorder story in such a way that current story will be + * between earlier prepared stories and in-progress stories. That's what + * this function does. + * + * @param stmt current statement. + */ +void +txm_history_prepare_stmt(struct txn_stmt *stmt); + +/** + * @brief Commit statement in history. + * Make the statement's changes permanent. It becomes visible to all. + * + * @param stmt current statement. + * @return the change in space bsize. + */ +ssize_t +txm_history_commit_stmt(struct txn_stmt *stmt); + +/** Helper of txm_tuple_clarify */ +struct tuple * +txm_tuple_clarify_slow(struct txn *txn, struct space *space, + struct tuple *tuple, uint32_t index, + uint32_t mk_index, bool is_prepared_ok); + +/** + * Cleans a tuple if it's dirty - finds a visible tuple in history. + * @param txn - current transaction. + * @param space - space in which the tuple was found. + * @param tuple - tuple to clean. + * @param index - index number. + * @param mk_index - multikey index (if the index is multikey). + * @param is_prepared_ok - allow to return prepared tuples. + * @return clean tuple (can be NULL). + */ +static inline struct tuple * +txm_tuple_clarify(struct txn *txn, struct space *space, + struct tuple *tuple, uint32_t index, + uint32_t mk_index, bool is_prepared_ok) +{ + if (!tx_manager_use_mvcc_engine) + return tuple; + if (!tuple->is_dirty) + return tuple; + return txm_tuple_clarify_slow(txn, space, tuple, index, mk_index, + is_prepared_ok); +} + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ -- 2.7.4