From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from smtpng2.m.smailru.net (smtpng2.m.smailru.net [94.100.179.3])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by dev.tarantool.org (Postfix) with ESMTPS id BC7F344532D
	for ; Wed, 8 Jul 2020 18:14:35 +0300 (MSK)
From: Aleksandr Lyapunov
Date: Wed, 8 Jul 2020 18:14:20 +0300
Message-Id: <1594221263-6228-14-git-send-email-alyapunov@tarantool.org>
In-Reply-To: <1594221263-6228-1-git-send-email-alyapunov@tarantool.org>
References: <1594221263-6228-1-git-send-email-alyapunov@tarantool.org>
Subject: [Tarantool-patches] [PATCH 13/16] tx: introduce txm_story
List-Id: Tarantool development patches
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
To: tarantool-patches@dev.tarantool.org
Cc: v.shpilevoy@tarantool.org

---
 src/box/txn.c | 800 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/box/txn.h | 127 ++++++++++
 2 files changed, 926 insertions(+), 1 deletion(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index b9f1737..78c542f 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -36,13 +36,44 @@
 #include "xrow.h"
 #include "errinj.h"
 #include "iproto_constants.h"
+#include "small/mempool.h"
+
+static uint32_t
+txm_story_key_hash(const struct tuple *a)
+{
+	uintptr_t u = (uintptr_t)a;
+	if (sizeof(uintptr_t) <= sizeof(uint32_t))
+		return u;
+	else
+		return u ^ (u >> 32);
+}
+
+#define mh_name _history
+#define mh_key_t struct tuple *
+#define mh_node_t struct txm_story *
+#define mh_arg_t int
+#define mh_hash(a, arg) (txm_story_key_hash((*(a))->tuple))
+#define mh_hash_key(a, arg) (txm_story_key_hash(a))
+#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple)
+#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple)
+#define MH_SOURCE
+#include "salad/mhash.h"
 
 struct tx_manager
 {
 	/** Last prepare-sequence-number that was assigned to preped TX. */
 	int64_t last_psn;
+	/** Mempools for txm_story objects with different index counts. */
+	struct mempool txm_story_pool[BOX_INDEX_MAX];
+	/** Hash table tuple -> txm_story of that tuple. */
+	struct mh_history_t *history;
+	/** List of all txm_story objects. */
+	struct rlist all_stories;
+	/** Iterator that sequentially traverses all txm_story objects. */
+	struct rlist *traverse_all_stories;
 };
 
+/** The one and only instance of tx_manager. */
 static struct tx_manager txm;
 
 struct tx_conflict_tracker {
@@ -120,6 +151,9 @@ txn_stmt_new(struct region *region)
 	stmt->space = NULL;
 	stmt->old_tuple = NULL;
 	stmt->new_tuple = NULL;
+	stmt->add_story = NULL;
+	stmt->del_story = NULL;
+	stmt->next_in_del_list = NULL;
 	stmt->engine_savepoint = NULL;
 	stmt->row = NULL;
 	stmt->has_triggers = false;
@@ -1047,11 +1081,23 @@ txn_on_yield(struct trigger *trigger, void *event)
 void
 tx_manager_init()
 {
+	for (size_t i = 0; i < BOX_INDEX_MAX; i++) {
+		size_t item_size = sizeof(struct txm_story) +
+			i * sizeof(struct txm_story_link);
+		mempool_create(&txm.txm_story_pool[i],
+			       cord_slab_cache(), item_size);
+	}
+	txm.history = mh_history_new();
+	rlist_create(&txm.all_stories);
+	txm.traverse_all_stories = &txm.all_stories;
 }
 
 void
 tx_manager_free()
 {
+	mh_history_delete(txm.history);
+	for (size_t i = 0; i < BOX_INDEX_MAX; i++)
+		mempool_destroy(&txm.txm_story_pool[i]);
 }
 
 int
@@ -1095,4 +1141,756 @@ txm_cause_conflict(struct txn *wreaker, struct txn *victim)
 	rlist_add(&wreaker->conflict_list, &tracker->in_conflict_list);
 	rlist_add(&wreaker->conflicted_by_list, &tracker->in_conflicted_by_list);
 	return 0;
-}
\ No newline at end of file
+}
+
+/**
+ * Allocate a story for @a tuple with @a index_count links, register it in
+ * txm.history and txm.all_stories, mark the tuple dirty and reference it.
+ * @return the new story or NULL (diag is set) if an allocation failed.
+ */
+struct txm_story *
+txm_story_new(struct tuple *tuple, uint32_t index_count)
+{
+	assert(!tuple->is_dirty);
+	assert(index_count < BOX_INDEX_MAX);
+	struct mempool *pool = &txm.txm_story_pool[index_count];
+	struct txm_story *story = (struct txm_story *)mempool_alloc(pool);
+	if (story == NULL) {
+		size_t item_size = sizeof(struct txm_story) +
+			index_count * sizeof(struct txm_story_link);
+		diag_set(OutOfMemory, item_size,
+			 "tx_manager", "tx story");
+		return story;
+	}
+	story->tuple = tuple;
+
+	const struct txm_story **put_story = (const struct txm_story **)&story;
+	struct txm_story **empty = NULL;
+	mh_int_t pos = mh_history_put(txm.history, put_story, &empty, 0);
+	if (pos == mh_end(txm.history)) {
+		mempool_free(pool, story);
+		diag_set(OutOfMemory, pos + 1,
+			 "tx_manager", "tx history hash table");
+		return NULL;
+	}
+	tuple->is_dirty = true;
+	tuple_ref(tuple);
+
+	story->index_count = index_count;
+	story->add_stmt = NULL;
+	story->add_psn = 0;
+	story->del_stmt = NULL;
+	story->del_psn = 0;
+	rlist_create(&story->reader_list);
+	rlist_add(&txm.all_stories, &story->in_all_stories);
+	memset(story->link, 0, sizeof(story->link[0]) * index_count);
+	return story;
+}
+
+/** Create a story for @a tuple that is added by @a stmt. */
+static struct txm_story *
+txm_story_new_add_stmt(struct tuple *tuple, struct txn_stmt *stmt)
+{
+	struct txm_story *res = txm_story_new(tuple, stmt->space->index_count);
+	if (res == NULL)
+		return NULL;
+	res->add_stmt = stmt;
+	assert(stmt->add_story == NULL);
+	stmt->add_story = res;
+	return res;
+}
+
+/** Create a story for @a tuple that is deleted by @a stmt. */
+static struct txm_story *
+txm_story_new_del_stmt(struct tuple *tuple, struct txn_stmt *stmt)
+{
+	struct txm_story *res = txm_story_new(tuple, stmt->space->index_count);
+	if (res == NULL)
+		return NULL;
+	res->del_stmt = stmt;
+	assert(stmt->del_story == NULL);
+	stmt->del_story = res;
+	return res;
+}
+
+/** Detach the story from its add_stmt and destroy it. */
+static void
+txm_story_delete_add_stmt(struct txm_story *story)
+{
+	story->add_stmt->add_story = NULL;
+	story->add_stmt = NULL;
+	txm_story_delete(story);
+}
+
+/** Detach the story from its del_stmt and destroy it. */
+static void
+txm_story_delete_del_stmt(struct txm_story *story)
+{
+	story->del_stmt->del_story = NULL;
+	story->del_stmt = NULL;
+	txm_story_delete(story);
+}
+
+
+/** Find the story of a dirty @a tuple; it must be in txm.history. */
+static struct txm_story *
+txm_story_get(struct tuple *tuple)
+{
+	assert(tuple->is_dirty);
+
+	mh_int_t pos = mh_history_find(txm.history, tuple, 0);
+	assert(pos != mh_end(txm.history));
+	return *mh_history_node(txm.history, pos);
+}
+
+/** Tuple on the older side of @a link, taken from the story if any. */
+static struct tuple *
+txm_story_older_tuple(struct txm_story_link *link)
+{
+	return link->older.is_story ? link->older.story->tuple
+				    : link->older.tuple;
+}
+
+/** Make @a older_story the older neighbour of @a story in @a index. */
+static void
+txm_story_link_story(struct txm_story *story, struct txm_story *older_story,
+		     uint32_t index)
+{
+	assert(older_story != NULL);
+	struct txm_story_link *link = &story->link[index];
+	/* Must be unlinked. */
+	assert(!link->older.is_story);
+	assert(link->older.tuple == NULL);
+	link->older.is_story = true;
+	link->older.story = older_story;
+	older_story->link[index].newer_story = story;
+}
+
+/** Link @a older_tuple, or its story if the tuple is dirty, below @a story. */
+static void
+txm_story_link_tuple(struct txm_story *story, struct tuple *older_tuple,
+		     uint32_t index)
+{
+	struct txm_story_link *link = &story->link[index];
+	/* Must be unlinked. */
+	assert(!link->older.is_story);
+	assert(link->older.tuple == NULL);
+	if (older_tuple == NULL)
+		return;
+	if (older_tuple->is_dirty) {
+		txm_story_link_story(story, txm_story_get(older_tuple), index);
+		return;
+	}
+	tusp_setNT(&link->older.tuple, older_tuple);
+}
+
+/** Break the link of @a story with its older neighbour in @a index. */
+static void
+txm_story_unlink(struct txm_story *story, uint32_t index)
+{
+	struct txm_story_link *link = &story->link[index];
+	if (link->older.is_story)
+		link->older.story->link[index].newer_story = NULL;
+	else
+		tusp_setUN(&link->older.tuple, NULL);
+	link->older.is_story = false;
+	link->older.tuple = NULL;
+}
+
+/**
+ * Check whether @a story alone decides what @a txn sees. If so, return true
+ * and set @a visible_tuple to the story's tuple or to NULL if the tuple is
+ * deleted; @a own_change tells whether @a txn itself made that change.
+ * Otherwise return false: the caller has to examine an older story.
+ */
+static bool
+txm_story_is_visible(struct txm_story *story, struct txn *txn,
+		     struct tuple **visible_tuple, bool prepared_ok,
+		     bool *own_change)
+{
+	*own_change = false;
+	struct txn_stmt *dels = story->del_stmt;
+	while (dels != NULL) {
+		if (dels->txn == txn) {
+			/* deleted by us. */
+			*visible_tuple = NULL;
+			*own_change = true;
+			return true;
+		}
+		dels = dels->next_in_del_list;
+	}
+	if (prepared_ok && story->del_psn != 0) {
+		/* deleted by at least prepared TX. */
+		*visible_tuple = NULL;
+		return true;
+	}
+	if (story->del_psn != 0 && story->del_stmt == NULL) {
+		/* deleted by committed TX. */
+		*visible_tuple = NULL;
+		return true;
+	}
+
+	if (story->add_stmt != NULL && story->add_stmt->txn == txn) {
+		/* added by us. */
+		*visible_tuple = story->tuple;
+		*own_change = true;
+		return true;
+	}
+	if (prepared_ok && story->add_psn != 0) {
+		/* added by at least prepared TX. */
+		*visible_tuple = story->tuple;
+		return true;
+	}
+	if (story->add_psn != 0 && story->add_stmt == NULL) {
+		/* added by committed TX. */
+		*visible_tuple = story->tuple;
+		return true;
+	}
+	if (story->add_psn == 0 && story->add_stmt == NULL) {
+		/* added long ago, we don't know by whom. */
+		*visible_tuple = story->tuple;
+		return true;
+	}
+	return false;
+}
+
+/** Temporary allocated on region that stores a conflicting TX. */
+struct txn_conflict
+{
+	struct txn *other_txn;
+	struct txn_conflict *next;
+};
+
+/** Push @a other_txn onto the region-allocated list of conflicts. */
+static int
+txm_save_conflict(struct txn *other_txn, struct txn_conflict **conflicts_head,
+		  struct region *region)
+{
+	size_t err_size;
+	struct txn_conflict *next_conflict;
+	next_conflict = region_alloc_object(region, struct txn_conflict,
+					    &err_size);
+	if (next_conflict == NULL) {
+		diag_set(OutOfMemory, err_size,
+			 "txn_region", "txn conflict");
+		return -1;
+	}
+	next_conflict->other_txn = other_txn;
+	next_conflict->next = *conflicts_head;
+	*conflicts_head = next_conflict;
+	return 0;
+}
+
+/**
+ * Walk from @a story towards older stories in the chain of index @a ind
+ * and store in @a visible_replaced the first tuple (can be NULL) that is
+ * visible to the statement's transaction, prepared changes included.
+ * Owners of skipped stories that may conflict with @a stmt are pushed to
+ * @a collected_conflicts. Returns 0, or -1 if a region allocation failed.
+ */
+int
+txm_story_find_visible(struct txn_stmt *stmt, struct txm_story *story,
+		       uint32_t ind, struct tuple **visible_replaced,
+		       struct txn_conflict **collected_conflicts,
+		       struct region *region)
+{
+	while (true) {
+		if (!story->link[ind].older.is_story) {
+			/*
+			 * the tuple is so old that we don't
+			 * know its story.
+			 */
+			*visible_replaced = story->link[ind].older.tuple;
+			assert(*visible_replaced == NULL ||
+			       !(*visible_replaced)->is_dirty);
+			break;
+		}
+		story = story->link[ind].older.story;
+		bool unused;
+		if (txm_story_is_visible(story, stmt->txn,
+					 visible_replaced, true, &unused))
+			break;
+
+		/*
+		 * We skip the story but once the story is committed
+		 * before our TX that may cause conflict.
+		 * The conflict will be unavoidable if this statement
+		 * relies on old_tuple. If not (it's a replace),
+		 * the conflict will take place only for secondary
+		 * index if the story will not be overwritten in primary
+		 * index.
+		 */
+		bool cross_conflict = false;
+		if (stmt->preserve_old_tuple) {
+			cross_conflict = true;
+		} else if (ind != 0) {
+			struct txm_story *look_up = story;
+			cross_conflict = true;
+			while (look_up->link[ind].newer_story != NULL) {
+				struct txm_story *over;
+				over = look_up->link[ind].newer_story;
+				if (over->add_stmt->txn == stmt->txn) {
+					cross_conflict = false;
+					break;
+				}
+				look_up = over;
+			}
+		}
+		if (cross_conflict) {
+			if (txm_save_conflict(story->add_stmt->txn,
+					      collected_conflicts,
+					      region) != 0)
+				return -1;
+
+		}
+	}
+	return 0;
+}
+
+/**
+ * Put the change made by @a stmt into the history: insert @a new_tuple,
+ * if any, into every index of the space and register @a stmt as a deleter
+ * of the tuple it replaces or deletes. That tuple is returned via
+ * @a result (NULL if nothing is replaced).
+ * @return 0 on success; -1 on error, the changes made so far are undone.
+ */
+int
+txm_history_link_stmt(struct txn_stmt *stmt,
+		      struct tuple *old_tuple, struct tuple *new_tuple,
+		      enum dup_replace_mode mode, struct tuple **result)
+{
+	struct txm_story *add_story = NULL;
+	uint32_t add_story_linked = 0;
+	struct txm_story *del_story = NULL;
+	bool del_story_created = false;
+	struct region *region = &stmt->txn->region;
+	size_t region_svp = region_used(region);
+
+	/**
+	 * List of transactions that will conflict us once one of them
+	 * becomes committed.
+	 */
+	struct txn_conflict *collected_conflicts = NULL;
+
+	if (new_tuple != NULL) {
+		add_story = txm_story_new_add_stmt(new_tuple, stmt);
+		if (add_story == NULL)
+			goto fail;
+
+		for (uint32_t i = 0; i < stmt->space->index_count; i++) {
+			struct tuple *replaced;
+			struct index *index = stmt->space->index[i];
+			if (index_replace(index, NULL, new_tuple,
+					  DUP_REPLACE_OR_INSERT,
+					  &replaced) != 0)
+				goto fail;
+			txm_story_link_tuple(add_story, replaced, i);
+			add_story_linked++;
+
+			struct tuple *visible_replaced = NULL;
+			if (txm_story_find_visible(stmt, add_story, i,
+						   &visible_replaced,
+						   &collected_conflicts,
+						   region) != 0)
+				goto fail;
+
+			uint32_t errcode;
+			errcode = replace_check_dup(old_tuple,
+						    visible_replaced,
+						    i == 0 ? mode : DUP_INSERT);
+			if (errcode != 0) {
+				struct space *sp = stmt->space;
+				if (sp != NULL)
+					diag_set(ClientError, errcode,
+						 sp->index[i]->def->name,
+						 space_name(sp));
+				goto fail;
+			}
+
+			if (i == 0) {
+				old_tuple = visible_replaced;
+				*result = visible_replaced;
+			}
+		}
+	} else {
+		assert(old_tuple != NULL);
+		*result = old_tuple;
+	}
+
+	struct tuple *del_tuple = NULL;
+	if (new_tuple != NULL) {
+		struct txm_story_link *link = &add_story->link[0];
+		if (link->older.is_story) {
+			del_story = link->older.story;
+			del_tuple = del_story->tuple;
+		} else {
+			del_tuple = link->older.tuple;
+		}
+	} else {
+		del_tuple = old_tuple;
+	}
+	if (del_tuple != NULL && del_story == NULL) {
+		if (del_tuple->is_dirty) {
+			del_story = txm_story_get(del_tuple);
+		} else {
+			del_story = txm_story_new_del_stmt(del_tuple, stmt);
+			if (del_story == NULL)
+				goto fail;
+			del_story_created = true;
+		}
+	}
+	if (new_tuple != NULL && del_story_created) {
+		for (uint32_t i = 0; i < add_story->index_count; i++) {
+			struct txm_story_link *link = &add_story->link[i];
+			if (link->older.is_story)
+				continue;
+			if (link->older.tuple == del_tuple) {
+				txm_story_unlink(add_story, i);
+				txm_story_link_story(add_story, del_story, i);
+			}
+		}
+	}
+	/*
+	 * A freshly created del_story already has this statement as its
+	 * del_stmt; pushing it again would link the statement to itself.
+	 */
+	if (del_story != NULL && !del_story_created) {
+		stmt->next_in_del_list = del_story->del_stmt;
+		del_story->del_stmt = stmt;
+		stmt->del_story = del_story;
+	}
+
+	while (collected_conflicts != NULL) {
+		if (txm_cause_conflict(collected_conflicts->other_txn,
+				       stmt->txn) != 0) {
+			goto fail;
+		}
+		collected_conflicts = collected_conflicts->next;
+	}
+	region_truncate(region, region_svp);
+	if (stmt->new_tuple != NULL)
+		tuple_ref(stmt->new_tuple);
+	if (*result != NULL)
+		tuple_ref(*result);
+	return 0;
+
+fail:
+	if (add_story != NULL) {
+		while (add_story_linked > 0) {
+			--add_story_linked;
+			uint32_t i = add_story_linked;
+
+			struct index *index = stmt->space->index[i];
+			struct txm_story_link *link = &add_story->link[i];
+			struct tuple *was = txm_story_older_tuple(link);
+			struct tuple *unused;
+			if (index_replace(index, new_tuple, was,
+					  DUP_INSERT,
+					  &unused) != 0) {
+				diag_log();
+				unreachable();
+				panic("failed to rollback change");
+			}
+
+			txm_story_unlink(stmt->add_story, i);
+
+		}
+		txm_story_delete_add_stmt(stmt->add_story);
+	}
+
+	if (del_story_created) {
+		/* The story was made for this statement only: drop it. */
+		txm_story_delete_del_stmt(stmt->del_story);
+	} else {
+		if (del_story != NULL && del_story->del_stmt == stmt) {
+			del_story->del_stmt = stmt->next_in_del_list;
+			stmt->next_in_del_list = NULL;
+		}
+		stmt->del_story = NULL;
+	}
+
+	region_truncate(region, region_svp);
+	return -1;
+}
+
+/**
+ * Take @a stmt out of the history: remove its add_story from the chains of
+ * all indexes and remove the statement from the deleters of its del_story.
+ */
+void
+txm_history_unlink_stmt(struct txn_stmt *stmt)
+{
+	if (stmt->add_story != NULL) {
+		assert(stmt->add_story->tuple == stmt->new_tuple);
+		struct txm_story *story = stmt->add_story;
+
+		for (uint32_t i = 0; i < story->index_count; i++) {
+			struct txm_story_link *link = &story->link[i];
+			if (link->newer_story == NULL) {
+				struct tuple *unused;
+				struct index *index = stmt->space->index[i];
+				struct tuple *was = txm_story_older_tuple(link);
+				if (index_replace(index, story->tuple, was,
+						  DUP_INSERT, &unused) != 0) {
+					diag_log();
+					unreachable();
+					panic("failed to rollback change");
+				}
+			} else {
+				struct txm_story *newer = link->newer_story;
+				assert(newer->link[i].older.is_story);
+				assert(newer->link[i].older.story == story);
+				txm_story_unlink(newer, i);
+				if (link->older.is_story) {
+					struct txm_story *to = link->older.story;
+					/* Unlink before relinking @a to. */
+					txm_story_unlink(story, i);
+					txm_story_link_story(newer, to, i);
+				} else {
+					struct tuple *to = link->older.tuple;
+					txm_story_link_tuple(newer, to, i);
+				}
+			}
+			txm_story_unlink(story, i);
+		}
+		stmt->add_story->add_stmt = NULL;
+		stmt->add_story = NULL;
+		tuple_unref(stmt->new_tuple);
+	}
+
+	if (stmt->del_story != NULL) {
+		struct txm_story *story = stmt->del_story;
+
+		struct txn_stmt **prev = &story->del_stmt;
+		while (*prev != stmt) {
+			prev = &(*prev)->next_in_del_list;
+			assert(*prev != NULL);
+		}
+		/* Only this statement leaves the list, the rest stay. */
+		*prev = stmt->next_in_del_list;
+		stmt->next_in_del_list = NULL;
+
+		stmt->del_story = NULL;
+	}
+}
+
+/**
+ * Process @a stmt when its transaction has got a psn. In every index chain
+ * the added story is moved below the stories of in-progress statements of
+ * other transactions; transactions broken by that are marked as
+ * TXN_CONFLICTED. Finally the psn is stored in add_story and del_story.
+ */
+void
+txm_history_prepare_stmt(struct txn_stmt *stmt)
+{
+	assert(stmt->txn->psn != 0);
+
+	/* Move story to the past to prepared stories. */
+
+	struct txm_story *story = stmt->add_story;
+	uint32_t index_count = story == NULL ? 0 : story->index_count;
+	for (uint32_t i = 0; i < index_count; ) {
+		if (!story->link[i].older.is_story) {
+			/* tuple is old. */
+			i++;
+			continue;
+		}
+		struct txm_story *old_story =
+			story->link[i].older.story;
+		if (old_story->del_psn != 0) {
+			/* if psn is set, the change is prepared. */
+			i++;
+			continue;
+		}
+		if (old_story->add_psn != 0) {
+			/* if psn is set, the change is prepared. */
+			i++;
+			continue;
+		}
+
+		if (old_story->add_stmt == NULL) {
+			/* ancient */
+			i++;
+			continue;
+		}
+		if (old_story->add_stmt->txn == stmt->txn) {
+			/* added by us. */
+			i++;
+			continue;
+		}
+
+		if (old_story->add_stmt->preserve_old_tuple || i != 0)
+			old_story->add_stmt->txn->status = TXN_CONFLICTED;
+
+		/* Swap story and old story. */
+		struct txm_story_link *link = &story->link[i];
+		/*
+		 * Detach story from old_story first: unlinking resets
+		 * old_story's newer_story, which is set anew below.
+		 */
+		txm_story_unlink(story, i);
+		if (link->newer_story == NULL) {
+			/* we have to replace the tuple in index. */
+			struct tuple *unused;
+			struct index *index = stmt->space->index[i];
+			if (index_replace(index, story->tuple,
+					  old_story->tuple,
+					  DUP_INSERT, &unused) != 0) {
+				diag_log();
+				unreachable();
+				panic("failed to rollback change");
+			}
+		} else {
+			struct txm_story *newer = link->newer_story;
+			assert(newer->link[i].older.is_story);
+			assert(newer->link[i].older.story == story);
+			txm_story_unlink(newer, i);
+			txm_story_link_story(newer, old_story, i);
+		}
+
+		if (old_story->link[i].older.is_story) {
+			struct txm_story *to =
+				old_story->link[i].older.story;
+			txm_story_unlink(old_story, i);
+			txm_story_link_story(story, to, i);
+		} else {
+			struct tuple *to =
+				old_story->link[i].older.tuple;
+			txm_story_unlink(old_story, i);
+			txm_story_link_tuple(story, to, i);
+		}
+
+		txm_story_link_story(old_story, story, i);
+
+		if (i == 0) {
+			assert(stmt->del_story == old_story);
+			assert(story->link[0].older.is_story ||
+			       story->link[0].older.tuple == NULL);
+
+			struct txn_stmt *dels = old_story->del_stmt;
+			assert(dels != NULL);
+			do {
+				if (dels->txn != stmt->txn)
+					dels->txn->status = TXN_CONFLICTED;
+				dels->del_story = NULL;
+				struct txn_stmt *next = dels->next_in_del_list;
+				dels->next_in_del_list = NULL;
+				dels = next;
+			} while (dels != NULL);
+			old_story->del_stmt = NULL;
+
+			if (story->link[0].older.is_story) {
+				struct txm_story *oldest_story =
+					story->link[0].older.story;
+				dels = oldest_story->del_stmt;
+				while (dels != NULL) {
+					assert(dels->txn != stmt->txn);
+					dels->del_story = NULL;
+					struct txn_stmt *next =
+						dels->next_in_del_list;
+					dels->next_in_del_list = NULL;
+					dels = next;
+				}
+				oldest_story->del_stmt = stmt;
+				stmt->del_story = oldest_story;
+			}
+		}
+	}
+	if (stmt->add_story != NULL)
+		stmt->add_story->add_psn = stmt->txn->psn;
+
+	if (stmt->del_story != NULL)
+		stmt->del_story->del_psn = stmt->txn->psn;
+}
+
+/**
+ * Detach @a stmt from its add_story and del_story and drop the reference
+ * to the deleted tuple.
+ * @return bsize of the added tuple minus bsize of the deleted tuple.
+ */
+ssize_t
+txm_history_release_stmt(struct txn_stmt *stmt)
+{
+	ssize_t res = 0;
+	if (stmt->add_story != NULL) {
+		assert(stmt->add_story->add_stmt == stmt);
+		res += stmt->add_story->tuple->bsize;
+		stmt->add_story->add_stmt = NULL;
+		stmt->add_story = NULL;
+	}
+	if (stmt->del_story != NULL) {
+		assert(stmt->del_story->del_stmt == stmt);
+		assert(stmt->next_in_del_list == NULL);
+		res -= stmt->del_story->tuple->bsize;
+		tuple_unref(stmt->del_story->tuple);
+		stmt->del_story->del_stmt = NULL;
+		stmt->del_story = NULL;
+	}
+	return res;
+}
+
+/**
+ * Starting from the story of the dirty @a tuple, walk the chain of index
+ * @a index towards older stories until a decisive one is found.
+ * @return the tuple visible to @a txn, NULL if there is none.
+ */
+struct tuple *
+txm_tuple_clarify_slow(struct txn *txn, struct tuple *tuple, uint32_t index,
+		       uint32_t mk_index, bool prepared_ok)
+{
+	assert(tuple->is_dirty);
+	struct txm_story *story = txm_story_get(tuple);
+	bool own_change = false;
+	struct tuple *result = NULL;
+
+	while (true) {
+		if (txm_story_is_visible(story, txn, &result,
+					 prepared_ok, &own_change)) {
+			break;
+		}
+		if (story->link[index].older.is_story) {
+			story = story->link[index].older.story;
+		} else {
+			result = story->link[index].older.tuple;
+			break;
+		}
+	}
+	(void)own_change; /* TODO: add conflict */
+	(void)mk_index; /* TODO: multiindex */
+	return result;
+}
+
+/**
+ * Destroy a story that is not referenced by any statement: remove it from
+ * txm.all_stories and txm.history, clear the dirty flag of its tuple,
+ * unreference the tuple and return the memory to the mempool.
+ */
+void
+txm_story_delete(struct txm_story *story)
+{
+	assert(story->add_stmt == NULL);
+	assert(story->del_stmt == NULL);
+
+	if (txm.traverse_all_stories == &story->in_all_stories)
+		txm.traverse_all_stories = rlist_next(txm.traverse_all_stories);
+	rlist_del(&story->in_all_stories);
+
+	mh_int_t pos = mh_history_find(txm.history, story->tuple, 0);
+	assert(pos != mh_end(txm.history));
+	mh_history_del(txm.history, pos, 0);
+
+	story->tuple->is_dirty = false;
+	tuple_unref(story->tuple);
+
+#ifndef NDEBUG
+	/* Expecting to delete fully unlinked story. */
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		assert(story->link[i].newer_story == NULL);
+		assert(story->link[i].older.is_story == false);
+		assert(story->link[i].older.tuple == NULL);
+	}
+#endif
+
+	struct mempool *pool = &txm.txm_story_pool[story->index_count];
+	mempool_free(pool, story);
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index 92c0116..bbe1c51 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -36,6 +36,7 @@
 #include "trigger.h"
 #include "fiber.h"
 #include "space.h"
+#include "tuple.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -123,6 +124,18 @@ struct txn_stmt {
 	struct space *space;
 	struct tuple *old_tuple;
 	struct tuple *new_tuple;
+	/**
+	 * If new_tuple != NULL and this transaction was not prepared,
+	 * this member holds the story of the new_tuple.
+	 */
+	struct txm_story *add_story;
+	/**
+	 * Story of the tuple that this statement deletes or replaces,
+	 * if any. It is set by txm_history_link_stmt().
+	 */
+	struct txm_story *del_story;
+	/** Link in txm_story::del_stmt linked list. */
+	struct txn_stmt *next_in_del_list;
 	/** Engine savepoint for the start of this statement. */
 	void *engine_savepoint;
 	/** Redo info: the binary log row */
@@ -284,6 +297,87 @@ struct txn {
 	struct rlist conflicted_by_list;
 };
 
+/**
+ * Pointer to tuple or story.
+ */
+struct txm_story_or_tuple {
+	/** Flag whether it's a story. */
+	bool is_story;
+	union {
+		/** Pointer to story, it must be reverse linked. */
+		struct txm_story *story;
+		/** Smart pointer to tuple: the tuple is referenced if set. */
+		struct tuple *tuple;
+	};
+};
+
+/**
+ * Link that connects a txm_story with older and newer stories of the same
+ * key in index.
+ */
+struct txm_story_link {
+	/** Story that happened after this story ended. */
+	struct txm_story *newer_story;
+	/**
+	 * Older story or ancient tuple (so old that its story was lost).
+	 * In case of a tuple it can also be NULL.
+	 */
+	struct txm_story_or_tuple older;
+};
+
+/**
+ * A part of a history of a value in space.
+ * It's a story about a tuple, from the point it was added to space to the
+ * point when it was deleted from a space.
+ * All stories are linked into a list of stories of the same key of each index.
+ */
+struct txm_story {
+	/** The story is about this tuple. The tuple is referenced. */
+
+	struct tuple *tuple;
+	/**
+	 * Statement that told this story. Is set to NULL when the statement's
+	 * transaction becomes committed. Can also be NULL if we don't know who
+	 * introduced that story.
+	 */
+	struct txn_stmt *add_stmt;
+	/**
+	 * Prepare sequence number of add_stmt's transaction. Is set when
+	 * the transaction is prepared. Can be 0 if the transaction is
+	 * in progress or we don't know who introduced that story.
+	 */
+	int64_t add_psn;
+	/**
+	 * Statement that ended this story. Is set to NULL when the statement's
+	 * transaction becomes committed. Can also be NULL if the tuple has not
+	 * been deleted yet.
+	 */
+	struct txn_stmt *del_stmt;
+	/**
+	 * Prepare sequence number of del_stmt's transaction. Is set when
+	 * the transaction is prepared. Can be 0 if the transaction is
+	 * in progress or if nobody has deleted the tuple.
+	 */
+	int64_t del_psn;
+	/**
+	 * List of trackers - transactions that have read this tuple.
+	 */
+	struct rlist reader_list;
+	/**
+	 * Link in tx_manager::all_stories
+	 */
+	struct rlist in_all_stories;
+	/**
+	 * Number of indexes in this space - and the count of link[].
+	 */
+	uint32_t index_count;
+	/**
+	 * Link with older and newer stories (and just tuples) for each
+	 * index respectively.
+	 */
+	struct txm_story_link link[];
+};
+
 static inline bool
 txn_has_flag(struct txn *txn, enum txn_flag flag)
 {
@@ -652,6 +746,39 @@ tx_manager_free();
 int
 txm_cause_conflict(struct txn *wreaker, struct txn *victim);
 
+struct txm_story *
+txm_story_new(struct tuple *tuple, uint32_t index_count);
+
+int
+txm_history_link_stmt(struct txn_stmt *stmt,
+		      struct tuple *old_tuple, struct tuple *new_tuple,
+		      enum dup_replace_mode mode, struct tuple **result);
+
+void
+txm_history_unlink_stmt(struct txn_stmt *stmt);
+
+void
+txm_history_prepare_stmt(struct txn_stmt *stmt);
+
+ssize_t
+txm_history_release_stmt(struct txn_stmt *stmt);
+
+struct tuple *
+txm_tuple_clarify_slow(struct txn *txn, struct tuple *tuple, uint32_t index,
+		       uint32_t mk_index, bool prepared_ok);
+
+static inline struct tuple *
+txm_tuple_clarify(struct txn *txn, struct tuple *tuple, uint32_t index,
+		  uint32_t mk_index, bool prepared_ok)
+{
+	if (!tuple->is_dirty)
+		return tuple;
+	return txm_tuple_clarify_slow(txn, tuple, index, mk_index, prepared_ok);
+}
+
+void
+txm_story_delete(struct txm_story *story);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
-- 
2.7.4