From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149])
 (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
 (No client certificate requested)
 by dev.tarantool.org (Postfix) with ESMTPS id AFC3142EF63
 for ; Fri, 3 Jul 2020 09:33:31 +0300 (MSK)
Received: by smtpng3.m.smailru.net with esmtpa (envelope-from )
 id 1jrFGN-0003jC-85
 for tarantool-patches@dev.tarantool.org; Fri, 03 Jul 2020 09:33:31 +0300
From: Aleksandr Lyapunov
Date: Fri, 3 Jul 2020 09:33:14 +0300
Message-Id: <1593757997-4145-13-git-send-email-alyapunov@tarantool.org>
In-Reply-To: <1593757997-4145-1-git-send-email-alyapunov@tarantool.org>
References: <1593757997-4145-1-git-send-email-alyapunov@tarantool.org>
Subject: [Tarantool-patches] [PATCH 12/15] tx: introduce txm_story
List-Id: Tarantool development patches
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
To: tarantool-patches@dev.tarantool.org

---
 src/box/txn.c | 531 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/box/txn.h | 173 +++++++++++++++++++
 2 files changed, 703 insertions(+), 1 deletion(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 79384f0..26377c6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -35,13 +35,44 @@
 #include
 #include "xrow.h"
 #include "errinj.h"
+#include "small/mempool.h"
+
+static uint32_t
+txm_story_key_hash(const struct tuple *a)
+{
+	uintptr_t u = (uintptr_t)a;
+	if (sizeof(uintptr_t) <= sizeof(uint32_t))
+		return u;
+	else
+		return u ^ (u >> 32);
+}
+
+#define mh_name _history
+#define mh_key_t struct tuple *
+#define mh_node_t struct txm_story *
+#define mh_arg_t int
+#define mh_hash(a, arg) (txm_story_key_hash((*(a))->tuple))
+#define mh_hash_key(a, arg) (txm_story_key_hash(a))
+#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple)
+#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple)
+#define MH_SOURCE
+#include "salad/mhash.h"
 
 struct tx_manager
 {
 	/** Last prepare-sequence-number that was assigned to preped TX. */
 	int64_t last_psn;
+	/** Mempools for tx_story objects with different index counts. */
+	struct mempool txm_story_pool[BOX_INDEX_MAX];
+	/** Hash table tuple -> txm_story of that tuple. */
+	struct mh_history_t *history;
+	/** List of all txm_story objects. */
+	struct rlist all_stories;
+	/** Iterator that sequentially traverses all txm_story objects. */
+	struct rlist *traverse_all_stories;
 };
 
+/** The one and only instance of tx_manager. */
 static struct tx_manager txm;
 
 struct tx_conflict_tracker {
@@ -119,6 +150,9 @@ txn_stmt_new(struct region *region)
 	stmt->space = NULL;
 	stmt->old_tuple = NULL;
 	stmt->new_tuple = NULL;
+	stmt->add_story = NULL;
+	stmt->del_story = NULL;
+	stmt->next_in_del_list = NULL;
 	stmt->engine_savepoint = NULL;
 	stmt->row = NULL;
 	stmt->has_triggers = false;
@@ -1022,11 +1056,23 @@ txn_on_yield(struct trigger *trigger, void *event)
 void
 tx_manager_init()
 {
+	for (size_t i = 0; i < BOX_INDEX_MAX; i++) {
+		size_t item_size = sizeof(struct txm_story) +
+			i * sizeof(struct txm_story_link);
+		mempool_create(&txm.txm_story_pool[i],
+			       cord_slab_cache(), item_size);
+	}
+	txm.history = mh_history_new();
+	rlist_create(&txm.all_stories);
+	txm.traverse_all_stories = &txm.all_stories;
 }
 
 void
 tx_manager_free()
 {
+	mh_history_delete(txm.history);
+	for (size_t i = 0; i < BOX_INDEX_MAX; i++)
+		mempool_destroy(&txm.txm_story_pool[i]);
 }
 
 int
@@ -1070,4 +1116,487 @@ txm_cause_conflict(struct txn *wreaker, struct txn *victim)
 	rlist_add(&wreaker->conflict_list, &tracker->in_conflict_list);
 	rlist_add(&wreaker->conflicted_by_list, &tracker->in_conflicted_by_list);
 	return 0;
-}
\ No newline at end of file
+}
+
+struct txm_story *
+txm_story_new(struct tuple *tuple, struct txn_stmt *stmt, uint32_t index_count)
+{
+	assert(!tuple->is_dirty);
+	assert(index_count < BOX_INDEX_MAX);
+	struct mempool *pool = &txm.txm_story_pool[index_count];
+	struct txm_story *story = (struct txm_story *)mempool_alloc(pool);
+	if (story == NULL) {
+		size_t item_size = sizeof(struct txm_story) +
+				   index_count * sizeof(struct txm_story_link);
+		diag_set(OutOfMemory, item_size,
+			 "tx_manager", "tx story");
+		return NULL;
+	}
+	story->tuple = tuple;
+
+	const struct txm_story **put_story = (const struct txm_story **)&story;
+	struct txm_story **empty = NULL;
+	mh_int_t pos = mh_history_put(txm.history, put_story, &empty, 0);
+	if (pos == mh_end(txm.history)) {
+		mempool_free(pool, story);
+		diag_set(OutOfMemory, pos + 1,
+			 "tx_manager", "tx history hash table");
+		return NULL;
+	}
+	tuple->is_dirty = true;
+	tuple_ref(tuple);
+
+	story->index_count = index_count;
+	story->add_stmt = stmt;
+	story->add_psn = 0;
+	story->del_stmt = NULL;
+	story->del_psn = 0;
+	rlist_create(&story->reader_list);
+	rlist_add(&txm.all_stories, &story->in_all_stories);
+	memset(story->link, 0, sizeof(story->link[0]) * index_count);
+	return story;
+}
+
+/** Temporary object allocated on region that stores a conflicting TX. */
+struct txn_conflict
+{
+	struct txn *wreaker;
+	struct txn_conflict *next;
+};
+
+int
+txm_check_and_link_add_story(struct txm_story *story, struct txn_stmt *stmt,
+			     enum dup_replace_mode mode)
+{
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		assert(!story->link[i].is_old_story);
+		struct tuple *next_tuple = story->link[i].old_tuple;
+		if (next_tuple != NULL && next_tuple->is_dirty) {
+			struct txm_story *next = txm_story_get(next_tuple);
+			assert(next->link[i].new_story == NULL);
+			story->link[i].is_old_story = true;
+			story->link[i].old_story = next;
+			next->link[i].new_story = story;
+		}
+	}
+
+	struct region *region = &stmt->txn->region;
+	size_t region_svp = region_used(region);
+	struct txn_conflict *collected_conflicts = NULL;
+
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		struct tuple *visible = NULL;
+		struct txm_story *node = story;
+		while (true) {
+			if (!node->link[i].is_old_story) {
+				/*
+				 * the tuple is so old that we don't
+				 * know its story.
+				 */
+				visible = node->link[i].old_tuple;
+				assert(visible == NULL || !visible->is_dirty);
+				break;
+			}
+			node = node->link[i].old_story;
+
+			if (node->del_psn != 0) {
+				/* deleted by at least prepared TX. */
+				break;
+			}
+			if (node->del_stmt != NULL &&
+			    node->del_stmt->txn == stmt->txn)
+				break; /* deleted by us. */
+			if (node->add_psn != 0) {
+				/* added by at least prepared TX. */
+				visible = node->tuple;
+				break;
+			}
+			if (node->add_stmt == NULL) {
+				/*
+				 * the tuple is so old that we lost
+				 * the beginning of its story.
+				 */
+				visible = node->tuple;
+				break;
+			}
+			if (node->add_stmt->txn == stmt->txn) {
+				/* added by us. */
+				visible = node->tuple;
+				break;
+			}
+			/*
+			 * We skip the story, but if it is committed before
+			 * our TX, that may cause a conflict.
+			 * The conflict is unavoidable if this statement
+			 * relies on old_tuple. If not (it's a replace),
+			 * the conflict will take place only for secondary
+			 * index if the story will not be overwritten in primary
+			 * index.
+			 */
+			bool cross_conflict = false;
+			if (stmt->preserve_old_tuple) {
+				cross_conflict = true;
+			} else if (i != 0) {
+				struct txm_story *look_up = node;
+				cross_conflict = true;
+				while (look_up->link[0].new_story != NULL) {
+					struct txm_story *over;
+					over = look_up->link[0].new_story;
+					if (over->add_stmt->txn == stmt->txn) {
+						cross_conflict = false;
+						break;
+					}
+					look_up = over;
+				}
+			}
+			if (!cross_conflict)
+				continue;
+			size_t err_size;
+			struct txn_conflict *next_conflict;
+			next_conflict =
+				region_alloc_object(region,
+						    struct txn_conflict,
+						    &err_size);
+			if (next_conflict == NULL) {
+				diag_set(OutOfMemory, err_size,
+					 "txn_region", "txn conflict");
+				goto fail;
+			}
+			next_conflict->wreaker = node->add_stmt->txn;
+			next_conflict->next = collected_conflicts;
+			collected_conflicts = next_conflict;
+		}
+
+		int errcode;
+		errcode = replace_check_dup(stmt->old_tuple, visible,
+					    i == 0 ? mode : DUP_INSERT);
+		if (errcode != 0) {
+			struct space *sp = stmt->space;
+			if (sp != NULL)
+				diag_set(ClientError, errcode,
+					 sp->index[i]->def->name,
+					 space_name(sp));
+			goto fail;
+		}
+	}
+
+	if (story->link[0].is_old_story) {
+		stmt->next_in_del_list = story->link[0].old_story->del_stmt;
+		story->link[0].old_story->del_stmt = stmt;
+		for (uint32_t i = 0; i < story->index_count; i++) {
+			if (story->link[i].is_old_story)
+				continue;
+			if (story->link[i].old_tuple != NULL)
+				tuple_ref(story->link[i].old_tuple);
+		}
+	} else if (story->link[0].old_tuple != NULL) {
+		struct tuple *old_tuple = story->link[0].old_tuple;
+		struct txm_story *del_story;
+		del_story = txm_story_new(old_tuple, NULL, story->index_count);
+		if (del_story == NULL)
+			goto fail;
+		del_story->del_stmt = stmt;
+		for (uint32_t i = 0; i < story->index_count; i++) {
+			if (story->link[i].is_old_story)
+				continue;
+			if (story->link[i].old_tuple == old_tuple) {
+				story->link[i].is_old_story = true;
+				story->link[i].old_story = del_story;
+				del_story->link[i].new_story = story;
+			} else if (story->link[i].old_tuple != NULL) {
+				tuple_ref(story->link[i].old_tuple);
+			}
+		}
+	}
+
+	while (collected_conflicts != NULL) {
+		if (txm_cause_conflict(collected_conflicts->wreaker,
+				       stmt->txn) != 0) {
+			goto fail;
+		}
+		collected_conflicts = collected_conflicts->next;
+	}
+	stmt->add_story = story;
+
+	region_truncate(region, region_svp);
+	return 0;
+
+fail:
+	for (uint32_t j = story->index_count; j > 0; j--) {
+		uint32_t i = j - 1;
+		if (story->link[i].is_old_story) {
+			struct txm_story *next = story->link[i].old_story;
+			story->link[i].is_old_story = false;
+			story->link[i].old_tuple = next->tuple;
+			next->link[i].new_story = NULL;
+		}
+	}
+	region_truncate(region, region_svp);
+	return -1;
+}
+
+int
+txm_link_del_story(struct tuple *old_tuple, struct txn_stmt *stmt,
+		   uint32_t index_count)
+{
+	if (old_tuple->is_dirty) {
+		struct txm_story *story = txm_story_get(old_tuple);
+		stmt->next_in_del_list = story->del_stmt;
+		story->del_stmt = stmt;
+		stmt->del_story = story;
+		return 0;
+	}
+	struct txm_story *del_story;
+	del_story = txm_story_new(old_tuple, NULL, index_count);
+	if (del_story == NULL)
+		return -1;
+	del_story->del_stmt = stmt;
+	stmt->del_story = del_story;
+	return 0;
+}
+
+void
+txm_unlink_add_story(struct txn_stmt *stmt)
+{
+	assert(stmt->add_story != NULL);
+
+	struct txm_story *story = stmt->add_story;
+
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		struct txm_story_link *from = &story->link[i];
+		if (from->new_story == NULL) {
+			struct tuple *unused;
+			struct index *index = stmt->space->index[i];
+			struct tuple *rollback = from->is_old_story ?
+						 from->old_story->tuple :
+						 from->old_tuple;
+			if (index_replace(index, story->tuple, rollback,
+					  DUP_INSERT, &unused) != 0) {
+				diag_log();
+				unreachable();
+				panic("failed to rollback change");
+			}
+			if (i == 0 && rollback != NULL)
+				tuple_ref(rollback);
+			if (from->is_old_story)
+				from->old_story->link[i].new_story = NULL;
+		} else {
+			struct txm_story *new_story = from->new_story;
+			struct txm_story_link *to = &new_story->link[i];
+			assert(to->is_old_story);
+			assert(to->old_story == story);
+			to->is_old_story = from->is_old_story;
+			if (from->is_old_story) {
+				to->old_story = from->old_story;
+				from->old_story->link[i].new_story = new_story;
+				from->old_story = NULL;
+			} else {
+				to->old_tuple = from->old_tuple;
+				from->old_tuple = NULL;
+			}
+			from->is_old_story = false;
+		}
+	}
+
+	stmt->add_story = NULL;
+}
+
+void
+txm_unlink_del_story(struct txn_stmt *stmt)
+{
+	assert(stmt->del_story != NULL);
+
+	struct txm_story *story = stmt->del_story;
+
+	struct txn_stmt **prev = &story->del_stmt;
+	while (*prev != stmt) {
+		prev = &(*prev)->next_in_del_list;
+		assert(*prev != NULL);
+	}
+	*prev = stmt->next_in_del_list;
+
+	stmt->del_story = NULL;
+}
+
+void
+txm_prepare_add_story(struct txn_stmt *stmt)
+{
+	assert(stmt->txn->psn != 0);
+	assert(stmt->add_story != NULL);
+	struct txm_story *story = stmt->add_story;
+
+	/* Move the story to the past, behind unprepared stories. */
+
+	while (true) {
+		if (!story->link[0].is_old_story)
+			break; /* tuple is prepared. */
+		struct txm_story *old_story = story->link[0].old_story;
+		if (old_story->del_psn != 0)
+			break; /* if psn is set, the change is prepared. */
+		if (old_story->add_psn != 0)
+			break; /* if psn is set, the change is prepared. */
+		if (old_story->add_stmt == NULL)
+			break; /* ancient. */
+		if (old_story->add_stmt->txn == stmt->txn)
+			break; /* added by us. */
+
+		/* Swap story and old story. */
+		for (uint32_t i = 0; i < old_story->index_count; i++) {
+			if (!story->link[i].is_old_story ||
+			    story->link[i].old_story != old_story)
+				continue;
+			if (story->link[i].new_story == NULL) {
+				/* we have to replace the tuple in index. */
+				struct tuple *unused;
+				struct index *index = stmt->space->index[i];
+				if (index_replace(index, story->tuple,
+						  old_story->tuple,
+						  DUP_INSERT, &unused) != 0) {
+					diag_log();
+					unreachable();
+					panic("failed to rollback change");
+				}
+			} else {
+				assert(story->link[i].new_story->link[i].old_story == story);
+				story->link[i].new_story->link[i].old_story = old_story;
+			}
+			old_story->link[i].new_story = story->link[i].new_story;
+
+			story->link[i].is_old_story = old_story->link[i].is_old_story;
+			if (old_story->link[i].is_old_story) {
+				story->link[i].old_story = old_story->link[i].old_story;
+				assert(old_story->link[i].old_story->link[i].new_story == old_story);
+				old_story->link[i].old_story->link[i].new_story = story;
+			} else {
+				story->link[i].old_tuple = old_story->link[i].old_tuple;
+			}
+			old_story->link[i].is_old_story = true;
+			old_story->link[i].old_story = story;
+			story->link[i].new_story = old_story;
+		}
+	}
+	story->add_psn = stmt->txn->psn;
+}
+
+void
+txm_let_it_go(struct txn_stmt *stmt)
+{
+	if (stmt->add_story != NULL) {
+		stmt->add_story->add_stmt = NULL;
+		stmt->add_story = NULL;
+	}
+	if (stmt->del_story != NULL) {
+		struct txn_stmt **dels = &stmt->del_story->del_stmt;
+		while ((*dels) != stmt) {
+			assert(*dels != NULL);
+			dels = &(*dels)->next_in_del_list;
+		}
+		*dels = (*dels)->next_in_del_list;
+		stmt->del_story = NULL;
+	}
+}
+
+struct txm_story *
+txm_story_get(struct tuple *tuple)
+{
+	assert(tuple->is_dirty);
+
+	mh_int_t pos = mh_history_find(txm.history, tuple, 0);
+	assert(pos != mh_end(txm.history));
+	return *mh_history_node(txm.history, pos);
+}
+
+struct tuple *
+txm_tuple_clarify_slow(struct txn *txn, struct tuple *tuple, uint32_t index,
+		       uint32_t mk_index, bool prepared_ok)
+{
+	struct tuple *result = NULL;
+	struct mh_history_t *hist = txm.history;
+	mh_int_t pos = mh_history_find(hist, tuple, 0);
+	assert(pos != mh_end(hist));
+	struct txm_story *story = *mh_history_node(hist, pos);
+	bool own_change = false;
+
+	while (true) {
+		struct txn_stmt *dels = story->del_stmt;
+		while (dels != NULL) {
+			if (dels->txn == txn) {
+				/* deleted by us. */
+				own_change = true;
+				break;
+			}
+			dels = dels->next_in_del_list;
+		}
+		if (own_change)
+			break; /* deleted by us, so invisible to us. */
+		if (prepared_ok && story->del_psn != 0) {
+			/* deleted by prepared. */
+			break;
+		}
+		if (story->del_psn != 0 && story->del_stmt == NULL) {
+			/* deleted by committed. */
+			break;
+		}
+
+		if (story->add_stmt != NULL && story->add_stmt->txn == txn) {
+			/* added by us. */
+			result = story->tuple;
+			break;
+		}
+
+		if (prepared_ok && story->add_psn != 0) {
+			/* added by prepared. */
+			result = story->tuple;
+			break;
+		}
+
+		if (story->add_psn != 0 && story->add_stmt == NULL) {
+			/* added by committed. */
+			result = story->tuple;
+			break;
+		}
+
+		if (!story->link[index].is_old_story) {
+			return story->link[index].old_tuple;
+		}
+
+		story = story->link[index].old_story;
+	}
+	(void)own_change; /* TODO: add conflict */
+	(void)mk_index; /* TODO: multiindex */
+	return result;
+}
+
+void
+txm_story_delete(struct txm_story *story)
+{
+	if (txm.traverse_all_stories == &story->in_all_stories)
+		txm.traverse_all_stories = rlist_next(txm.traverse_all_stories);
+	rlist_del(&story->in_all_stories);
+
+	/* The tuple must not resolve to a freed story anymore. */
+	mh_int_t pos = mh_history_find(txm.history, story->tuple, 0);
+	assert(pos != mh_end(txm.history));
+	mh_history_del(txm.history, pos, 0);
+
+	/* Clear the flag before the unref that may free the tuple. */
+	story->tuple->is_dirty = false;
+	tuple_unref(story->tuple);
+
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		if (!story->link[i].is_old_story &&
+		    story->link[i].old_tuple != NULL) {
+			tuple_unref(story->link[i].old_tuple);
+		}
+	}
+
+	/* Choose the pool before the story memory is poisoned. */
+	struct mempool *pool = &txm.txm_story_pool[story->index_count];
+#ifndef NDEBUG
+	const char poison_char = '?';
+	size_t item_size = sizeof(struct txm_story) +
+		story->index_count * sizeof(struct txm_story_link);
+	memset(story, poison_char, item_size);
+#endif
+	mempool_free(pool, story);
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index 92c0116..b5eda1e 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -36,6 +36,7 @@
 #include "trigger.h"
 #include "fiber.h"
 #include "space.h"
+#include "tuple.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -123,6 +124,18 @@ struct txn_stmt {
 	struct space *space;
 	struct tuple *old_tuple;
 	struct tuple *new_tuple;
+	/**
+	 * If new_tuple != NULL and this transaction was not prepared,
+	 * this member holds added story of the new_tuple.
+	 */
+	struct txm_story *add_story;
+	/**
+	 * If new_tuple == NULL and this transaction was not prepared,
+	 * this member holds the story of the deleted old_tuple.
+	 */
+	struct txm_story *del_story;
+	/** Link in txm_story::del_stmt linked list. */
+	struct txn_stmt *next_in_del_list;
 	/** Engine savepoint for the start of this statement. */
 	void *engine_savepoint;
 	/** Redo info: the binary log row */
@@ -284,6 +297,80 @@ struct txn {
 	struct rlist conflicted_by_list;
 };
 
+/**
+ * Link that connects a txm_story with older and newer stories of the same
+ * key in index.
+ */
+struct txm_story_link {
+	/** Newer story of the same key, NULL if this story is the latest. */
+	struct txm_story *new_story;
+	/** Whether there is an older story of the same key in the index. */
+	bool is_old_story;
+	union {
+		/** is_old_story = true. Older story of the same key. */
+		struct txm_story *old_story;
+		/**
+		 * is_old_story = false. Tuple that was in the index before
+		 * this story. That tuple is either NULL or so old that
+		 * we don't have a story about it.
+		 */
+
+		struct tuple *old_tuple;
+	};
+};
+
+/**
+ * A part of a history of a value in space.
+ * It's a story about a tuple, from the point it was added to space to the
+ * point when it was deleted from a space.
+ * All stories are linked into a list of stories of the same key of each index.
+ */
+struct txm_story {
+	/** The story is about this tuple. */
+	struct tuple *tuple;
+	/**
+	 * Statement that told this story. Is set to NULL when the statement's
+	 * transaction becomes committed. Can also be NULL if we don't know who
+	 * introduced that story.
+	 */
+	struct txn_stmt *add_stmt;
+	/**
+	 * Prepare sequence number of add_stmt's transaction. Is set when
+	 * the transaction is prepared. Can be 0 if the transaction is
+	 * in progress or we don't know who introduced that story.
+	 */
+	int64_t add_psn;
+	/**
+	 * Statement that ended this story. Is set to NULL when the statement's
+	 * transaction becomes committed. Can also be NULL if the tuple has not
+	 * been deleted yet.
+	 */
+	struct txn_stmt *del_stmt;
+	/**
+	 * Prepare sequence number of del_stmt's transaction. Is set when
+	 * the transaction is prepared. Can be 0 if the transaction is
+	 * in progress or if nobody has deleted the tuple.
+	 */
+	int64_t del_psn;
+	/**
+	 * List of trackers - transactions that have read this tuple.
+	 */
+	struct rlist reader_list;
+	/**
+	 * Link in tx_manager::all_stories
+	 */
+	struct rlist in_all_stories;
+	/**
+	 * Number of indexes in this space - and the count of link[].
+	 */
+	uint32_t index_count;
+	/**
+	 * Link with older and newer stories (and just tuples) for each
+	 * index respectively.
+	 */
+	struct txm_story_link link[];
+};
+
 static inline bool
 txn_has_flag(struct txn *txn, enum txn_flag flag)
 {
@@ -652,6 +739,92 @@ tx_manager_free();
 int
 txm_cause_conflict(struct txn *wreaker, struct txn *victim);
 
+/**
+ * Allocate a story of @a tuple added by @a stmt (NULL if the
+ * author is unknown) with @a index_count links, register it in
+ * the history hash, mark the tuple dirty and reference it.
+ * Return NULL and set diag on memory error.
+ */
+struct txm_story *
+txm_story_new(struct tuple *tuple, struct txn_stmt *stmt, uint32_t index_count);
+
+/**
+ * Link @a story added by @a stmt with the stories of the tuples
+ * it replaces, check for duplicates among the tuples visible to
+ * the transaction of @a stmt (@a mode is applied to the primary
+ * index, DUP_INSERT to the others) and register conflicts with
+ * the transactions whose changes were skipped.
+ * Return 0 on success, -1 and set diag on error.
+ */
+int
+txm_check_and_link_add_story(struct txm_story *story, struct txn_stmt *stmt,
+			     enum dup_replace_mode mode);
+
+/**
+ * Register @a stmt as a deleter of @a old_tuple, creating a
+ * story of the tuple if it has none yet.
+ * Return 0 on success, -1 and set diag on error.
+ */
+int
+txm_link_del_story(struct tuple *old_tuple, struct txn_stmt *stmt,
+		   uint32_t index_count);
+
+/**
+ * Rollback the story added by @a stmt: unlink it from the chains
+ * and put the older tuple back where the story is the latest.
+ */
+void
+txm_unlink_add_story(struct txn_stmt *stmt);
+
+/** Rollback: remove @a stmt from the deleters of its story. */
+void
+txm_unlink_del_story(struct txn_stmt *stmt);
+
+/**
+ * Move the story added by @a stmt below the stories of other
+ * in-progress transactions and set its add_psn.
+ */
+void
+txm_prepare_add_story(struct txn_stmt *stmt);
+
+/**
+ * Detach @a stmt from the stories it added or deleted, as
+ * required when its transaction becomes committed.
+ */
+void
+txm_let_it_go(struct txn_stmt *stmt);
+
+/** Find the story of a dirty @a tuple. The story must exist. */
+struct txm_story *
+txm_story_get(struct tuple *tuple);
+
+/**
+ * Find the version of dirty @a tuple, found in index @a index,
+ * that is visible to @a txn. Changes of prepared but not yet
+ * committed transactions are visible only if @a prepared_ok.
+ * Return NULL if there is no visible version.
+ */
+struct tuple *
+txm_tuple_clarify_slow(struct txn *txn, struct tuple *tuple, uint32_t index,
+		       uint32_t mk_index, bool prepared_ok);
+
+/**
+ * Same as txm_tuple_clarify_slow(), but a tuple without a story
+ * (not dirty) is returned as is.
+ */
+static inline struct tuple *
+txm_tuple_clarify(struct txn *txn, struct tuple *tuple, uint32_t index,
+		  uint32_t mk_index, bool prepared_ok)
+{
+	if (!tuple->is_dirty)
+		return tuple;
+	return txm_tuple_clarify_slow(txn, tuple, index, mk_index, prepared_ok);
+}
+
+/** Unregister and free @a story, dropping its tuple references. */
+void
+txm_story_delete(struct txm_story *story);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
-- 
2.7.4