[Tarantool-patches] [PATCH v3 11/13] txm: introduce txm_story
Aleksandr Lyapunov
alyapunov at tarantool.org
Wed Jul 15 16:55:34 MSK 2020
TXM story is a part of a history of a value in space.
It's a story about a tuple, from the point it was added to space
to the point when it was deleted from the space.
All stories are linked into a list of stories of the same key of
each index.
Part of #4897
---
src/box/txn.c | 798 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/box/txn.h | 193 ++++++++++++++
2 files changed, 991 insertions(+)
diff --git a/src/box/txn.c b/src/box/txn.c
index ba81b58..4c46b60 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -37,6 +37,28 @@
#include "xrow.h"
#include "errinj.h"
#include "iproto_constants.h"
+#include "small/mempool.h"
+
+/**
+ * Hash a tuple by its address: the pointer value is folded into 32 bits.
+ * @param a tuple whose address is hashed (the tuple is not dereferenced).
+ * @return 32-bit hash of the pointer value.
+ */
+static uint32_t
+txm_story_key_hash(const struct tuple *a)
+{
+	uintptr_t u = (uintptr_t)a;
+	if (sizeof(uintptr_t) <= sizeof(uint32_t))
+		return (uint32_t)u;
+	/*
+	 * Widen before shifting: this statement is compiled even on
+	 * targets where uintptr_t is 32 bits wide, and shifting a 32-bit
+	 * value by 32 is undefined behavior.
+	 */
+	uint64_t wide = u;
+	return (uint32_t)(wide ^ (wide >> 32));
+}
+
+/*
+ * Instantiate the "history" hash table (mh_history_*).
+ * Nodes are pointers to txm_story, keys are tuple pointers. Both hashing
+ * and comparison use the address of the story's tuple; the compare macros
+ * yield non-zero when the tuple pointers differ.
+ */
+#define mh_name _history
+#define mh_key_t struct tuple *
+#define mh_node_t struct txm_story *
+#define mh_arg_t int
+#define mh_hash(a, arg) (txm_story_key_hash((*(a))->tuple))
+#define mh_hash_key(a, arg) (txm_story_key_hash(a))
+#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple)
+#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple)
+#define MH_SOURCE
+#include "salad/mhash.h"
struct tx_manager
{
@@ -48,6 +70,14 @@ struct tx_manager
* so the list is ordered by rv_psn.
*/
struct rlist read_view_txs;
+ /** Mempools for txm_story objects, one pool per possible index count. */
+ struct mempool txm_story_pool[BOX_INDEX_MAX];
+ /** Hash table tuple -> txm_story of that tuple. */
+ struct mh_history_t *history;
+ /** List of all txm_story objects. */
+ struct rlist all_stories;
+ /** Iterator that sequentially traverses all txm_story objects. */
+ struct rlist *traverse_all_stories;
};
/** That's a definition, see declaration for description. */
@@ -146,6 +176,9 @@ txn_stmt_new(struct region *region)
stmt->space = NULL;
stmt->old_tuple = NULL;
stmt->new_tuple = NULL;
+ stmt->add_story = NULL;
+ stmt->del_story = NULL;
+ stmt->next_in_del_list = NULL;
stmt->engine_savepoint = NULL;
stmt->row = NULL;
stmt->has_triggers = false;
@@ -1294,11 +1327,23 @@ void
tx_manager_init()
{
rlist_create(&txm.read_view_txs);
+ /*
+ * Pool number i serves stories that carry i links, so its item size
+ * is the story header plus i trailing txm_story_link entries.
+ */
+ for (size_t i = 0; i < BOX_INDEX_MAX; i++) {
+ size_t item_size = sizeof(struct txm_story) +
+ i * sizeof(struct txm_story_link);
+ mempool_create(&txm.txm_story_pool[i],
+ cord_slab_cache(), item_size);
+ }
+ /* NOTE(review): the result of mh_history_new() is not checked here. */
+ txm.history = mh_history_new();
+ rlist_create(&txm.all_stories);
+ /* The traversal cursor starts at the (empty) list head. */
+ txm.traverse_all_stories = &txm.all_stories;
}
void
tx_manager_free()
{
+ /* Release the tuple -> story hash table created in tx_manager_init(). */
+ mh_history_delete(txm.history);
+ /* Destroy every per-index-count story pool. */
+ for (size_t i = 0; i < BOX_INDEX_MAX; i++)
+ mempool_destroy(&txm.txm_story_pool[i]);
}
int
@@ -1345,3 +1390,756 @@ txm_cause_conflict(struct txn *breaker, struct txn *victim)
rlist_add(&victim->conflicted_by_list, &tracker->in_conflicted_by_list);
return 0;
}
+
+/**
+ * Allocate a story for @tuple of @space and register it in the history
+ * hash table and in the list of all stories. The tuple is marked dirty and
+ * gets one extra reference.
+ * @return the new story on success, NULL on error (diag is set).
+ */
+static struct txm_story *
+txm_story_new(struct space *space, struct tuple *tuple)
+{
+	assert(!tuple->is_dirty);
+	uint32_t link_count = space->index_count;
+	assert(link_count < BOX_INDEX_MAX);
+	/* Each pool serves stories with one particular number of links. */
+	struct mempool *pool = &txm.txm_story_pool[link_count];
+	struct txm_story *story = (struct txm_story *)mempool_alloc(pool);
+	if (story == NULL) {
+		size_t story_size = sizeof(struct txm_story) +
+			link_count * sizeof(struct txm_story_link);
+		diag_set(OutOfMemory, story_size, "tx_manager", "tx story");
+		return NULL;
+	}
+	/* The hash macros read story->tuple, so set it before the put. */
+	story->tuple = tuple;
+
+	const struct txm_story **node = (const struct txm_story **)&story;
+	struct txm_story **replaced = NULL;
+	mh_int_t slot = mh_history_put(txm.history, node, &replaced, 0);
+	if (slot == mh_end(txm.history)) {
+		mempool_free(pool, story);
+		diag_set(OutOfMemory, slot + 1,
+			 "tx_manager", "tx history hash table");
+		return NULL;
+	}
+	tuple->is_dirty = true;
+	tuple_ref(tuple);
+
+	/* A fresh story has no statements, no psn and no links. */
+	story->index_count = link_count;
+	story->add_stmt = NULL;
+	story->del_stmt = NULL;
+	story->add_psn = 0;
+	story->del_psn = 0;
+	memset(story->link, 0, sizeof(story->link[0]) * link_count);
+	rlist_create(&story->reader_list);
+	rlist_add_tail(&txm.all_stories, &story->in_all_stories);
+	return story;
+}
+
+/** Forward declaration; the definition is at the end of the txm code. */
+static void
+txm_story_delete(struct txm_story *story);
+
+/**
+ * Create a story of @tuple and bind it to @stmt as the statement that
+ * added the tuple.
+ * @return story on success, NULL on error (diag is set).
+ */
+static struct txm_story *
+txm_story_new_add_stmt(struct tuple *tuple, struct txn_stmt *stmt)
+{
+	struct txm_story *story = txm_story_new(stmt->space, tuple);
+	if (story != NULL) {
+		/* Cross-link the story and the adding statement. */
+		story->add_stmt = stmt;
+		assert(stmt->add_story == NULL);
+		stmt->add_story = story;
+	}
+	return story;
+}
+
+/**
+ * Create a story of @tuple and bind it to @stmt as the statement that
+ * deleted the tuple.
+ * @return story on success, NULL on error (diag is set).
+ */
+static struct txm_story *
+txm_story_new_del_stmt(struct tuple *tuple, struct txn_stmt *stmt)
+{
+	struct txm_story *story = txm_story_new(stmt->space, tuple);
+	if (story != NULL) {
+		/* Cross-link the story and the deleting statement. */
+		story->del_stmt = stmt;
+		assert(stmt->del_story == NULL);
+		stmt->del_story = story;
+	}
+	return story;
+}
+
+/**
+ * Undo txm_story_new_add_stmt: detach the adding statement from @story
+ * and delete the story.
+ */
+static void
+txm_story_delete_add_stmt(struct txm_story *story)
+{
+	struct txn_stmt *adder = story->add_stmt;
+	adder->add_story = NULL;
+	/* txm_story_delete() asserts that no statement is attached. */
+	story->add_stmt = NULL;
+	txm_story_delete(story);
+}
+
+/**
+ * Undo txm_story_new_del_stmt: detach the deleting statement from @story
+ * and delete the story.
+ */
+static void
+txm_story_delete_del_stmt(struct txm_story *story)
+{
+	struct txn_stmt *deleter = story->del_stmt;
+	deleter->del_story = NULL;
+	/* txm_story_delete() asserts that no statement is attached. */
+	story->del_stmt = NULL;
+	txm_story_delete(story);
+}
+
+
+/**
+ * Look up the story of a dirty @tuple in the history hash table.
+ * The story is expected to exist (asserted).
+ */
+static struct txm_story *
+txm_story_get(struct tuple *tuple)
+{
+	assert(tuple->is_dirty);
+
+	mh_int_t slot = mh_history_find(txm.history, tuple, 0);
+	assert(slot != mh_end(txm.history));
+	struct txm_story **node = mh_history_node(txm.history, slot);
+	return *node;
+}
+
+/**
+ * Return the older tuple of @link: either the tuple stored directly or
+ * the tuple of the older story.
+ */
+static struct tuple *
+txm_story_older_tuple(struct txm_story_link *link)
+{
+	if (link->older.is_story)
+		return link->older.story->tuple;
+	return link->older.tuple;
+}
+
+/**
+ * Link @story with @older_story in @index, in both directions: the story
+ * points to the older one and the older one points back as to its newer.
+ */
+static void
+txm_story_link_story(struct txm_story *story, struct txm_story *older_story,
+		     uint32_t index)
+{
+	assert(older_story != NULL);
+	struct txm_story_or_tuple *older = &story->link[index].older;
+	/* Must be unlinked. */
+	assert(!older->is_story);
+	assert(older->tuple == NULL);
+	older->story = older_story;
+	older->is_story = true;
+	older_story->link[index].newer_story = story;
+}
+
+/**
+ * Link @story with @older_tuple in @index. A dirty tuple is replaced by
+ * its story; a clean tuple is stored directly and referenced. NULL leaves
+ * the link empty.
+ */
+static void
+txm_story_link_tuple(struct txm_story *story, struct tuple *older_tuple,
+		     uint32_t index)
+{
+	struct txm_story_link *link = &story->link[index];
+	/* Must be unlinked. */
+	assert(!link->older.is_story);
+	assert(link->older.tuple == NULL);
+	if (older_tuple == NULL)
+		return;
+	if (!older_tuple->is_dirty) {
+		link->older.tuple = older_tuple;
+		tuple_ref(older_tuple);
+		return;
+	}
+	txm_story_link_story(story, txm_story_get(older_tuple), index);
+}
+
+/**
+ * Unlink @story from its older story/tuple in @index. An older story
+ * loses its back pointer; a directly stored tuple is unreferenced.
+ */
+static void
+txm_story_unlink(struct txm_story *story, uint32_t index)
+{
+	struct txm_story_or_tuple *older = &story->link[index].older;
+	if (older->is_story)
+		older->story->link[index].newer_story = NULL;
+	else if (older->tuple != NULL)
+		tuple_unref(older->tuple);
+	/* Leave the link in the "unlinked" state in every case. */
+	older->is_story = false;
+	older->tuple = NULL;
+}
+
+/**
+ * Check whether @story is visible for transaction @txn.
+ * @param visible_tuple - out: the visible tuple; NULL when the story is
+ *  visible as a deletion or when the story is not visible.
+ * @param is_prepared_ok - whether prepared (not committed) change is
+ *  acceptable.
+ * @param own_change - out: true if the visible change was made by @txn.
+ * @return true if the story is visible, false otherwise.
+ */
+static bool
+txm_story_is_visible(struct txm_story *story, struct txn *txn,
+		     struct tuple **visible_tuple, bool is_prepared_ok,
+		     bool *own_change)
+{
+	*own_change = false;
+	*visible_tuple = NULL;
+
+	/* Tuple is deleted by us (@txn). */
+	for (struct txn_stmt *del = story->del_stmt; del != NULL;
+	     del = del->next_in_del_list) {
+		if (del->txn == txn) {
+			*own_change = true;
+			return true;
+		}
+	}
+
+	/*
+	 * Tuple is deleted by a prepared TX (when that is acceptable) or by
+	 * a committed one (del_stmt already dropped), within our read view.
+	 */
+	bool del_in_view = txn->rv_psn == 0 || story->del_psn < txn->rv_psn;
+	if (story->del_psn != 0 && del_in_view &&
+	    (is_prepared_ok || story->del_stmt == NULL))
+		return true;
+
+	/* Tuple is added by us (@txn). */
+	if (story->add_stmt != NULL && story->add_stmt->txn == txn) {
+		*visible_tuple = story->tuple;
+		*own_change = true;
+		return true;
+	}
+
+	/* Tuple is added by a prepared or committed TX in our read view. */
+	bool add_in_view = txn->rv_psn == 0 || story->add_psn < txn->rv_psn;
+	if (story->add_psn != 0 && add_in_view &&
+	    (is_prepared_ok || story->add_stmt == NULL)) {
+		*visible_tuple = story->tuple;
+		return true;
+	}
+
+	/* Added long time ago: nobody remembers who did it. */
+	if (story->add_psn == 0 && story->add_stmt == NULL) {
+		*visible_tuple = story->tuple;
+		return true;
+	}
+	return false;
+}
+
+/**
+ * Temporary (allocated on region) struct that stores a conflicting TX.
+ * Nodes form a singly linked list; txm_save_conflict() pushes new nodes
+ * at the head.
+ */
+struct txn_conflict
+{
+ /** The other transaction; passed as breaker to txm_cause_conflict(). */
+ struct txn *other_txn;
+ /** Next node of the list, NULL at the end. */
+ struct txn_conflict *next;
+};
+
+/**
+ * Push @other_txn to the front of the list with head @coflicts_head.
+ * The new list node is allocated on @region.
+ * @return 0 on success, -1 on memory error (diag is set).
+ */
+static int
+txm_save_conflict(struct txn *other_txn, struct txn_conflict **coflicts_head,
+		  struct region *region)
+{
+	size_t err_size;
+	struct txn_conflict *node =
+		region_alloc_object(region, struct txn_conflict, &err_size);
+	if (node == NULL) {
+		diag_set(OutOfMemory, err_size, "txn_region", "txn conflict");
+		return -1;
+	}
+	node->other_txn = other_txn;
+	node->next = *coflicts_head;
+	*coflicts_head = node;
+	return 0;
+}
+
+/**
+ * Scan the history in @index, starting from the story older than @story,
+ * for a tuple visible to the transaction of @stmt (prepared changes suit).
+ * The tuple is returned via @visible_replaced and can be NULL.
+ * Transactions whose stories were skipped, and that would abort the current
+ * transaction if they are committed, are pushed to @collected_conflicts
+ * (nodes are allocated on @region).
+ *
+ * @return 0 on success, -1 on memory error.
+ */
+static int
+txm_story_find_visible(struct txn_stmt *stmt, struct txm_story *story,
+ uint32_t index, struct tuple **visible_replaced,
+ struct txn_conflict **collected_conflicts,
+ struct region *region)
+{
+ while (true) {
+ if (!story->link[index].older.is_story) {
+ /*
+ * The tuple is so old that we don't
+ * know its story.
+ */
+ *visible_replaced = story->link[index].older.tuple;
+ assert(*visible_replaced == NULL ||
+ !(*visible_replaced)->is_dirty);
+ break;
+ }
+ story = story->link[index].older.story;
+ bool unused;
+ if (txm_story_is_visible(story, stmt->txn, visible_replaced,
+ true, &unused))
+ break;
+
+ /*
+ * We skip the story, but if the story is committed
+ * before our TX, that may cause a conflict.
+ * The conflict will be unavoidable if this statement
+ * relies on old_tuple. If not (it's a replace),
+ * the conflict will take place only for a secondary
+ * index, and only if the story is not overwritten in
+ * that index by a newer story of our own transaction.
+ */
+ bool cross_conflict = false;
+ if (stmt->does_require_old_tuple) {
+ cross_conflict = true;
+ } else if (index != 0) {
+ struct txm_story *look_up = story;
+ cross_conflict = true;
+ /* Walk towards newer stories looking for our own one. */
+ while (look_up->link[index].newer_story != NULL) {
+ struct txm_story *over;
+ over = look_up->link[index].newer_story;
+ /*
+ * NOTE(review): over->add_stmt is dereferenced
+ * unchecked - presumably newer stories in the
+ * chain always have a live add_stmt; confirm.
+ */
+ if (over->add_stmt->txn == stmt->txn) {
+ cross_conflict = false;
+ break;
+ }
+ look_up = over;
+ }
+ }
+ if (cross_conflict) {
+ /*
+ * NOTE(review): story->add_stmt is dereferenced
+ * unchecked - presumably an invisible story always
+ * has a live add_stmt; confirm.
+ */
+ if (txm_save_conflict(story->add_stmt->txn,
+ collected_conflicts,
+ region) != 0)
+ return -1;
+
+ }
+ }
+ return 0;
+}
+
+/**
+ * Add a statement to the history: create a story for @new_tuple (if any),
+ * put the tuple into every index of the space, find the tuple visibly
+ * replaced in each index, attach the statement to the story of the deleted
+ * tuple (if any) and register the collected conflicts.
+ * On failure every index change and every created story is rolled back.
+ *
+ * @param stmt current statement.
+ * @param old_tuple the tuple that should be removed (can be NULL).
+ * @param new_tuple the tuple that should be inserted (can be NULL).
+ * @param mode duplicate check mode, applied to the primary index only;
+ *  secondary indexes are always checked with DUP_INSERT.
+ * @param result out: old or replaced tuple (referenced if not NULL).
+ * @return 0 on success, -1 on error.
+ */
+int
+txm_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple,
+		     struct tuple *new_tuple, enum dup_replace_mode mode,
+		     struct tuple **result)
+{
+	assert(new_tuple != NULL || old_tuple != NULL);
+	struct space *space = stmt->space;
+	struct txm_story *add_story = NULL;
+	uint32_t add_story_linked = 0;
+	struct txm_story *del_story = NULL;
+	bool del_story_created = false;
+	struct region *region = &stmt->txn->region;
+	size_t region_svp = region_used(region);
+
+	/*
+	 * List of transactions that will conflict us once one of them
+	 * become committed.
+	 */
+	struct txn_conflict *collected_conflicts = NULL;
+
+	/* Create add_story if necessary. */
+	if (new_tuple != NULL) {
+		add_story = txm_story_new_add_stmt(new_tuple, stmt);
+		if (add_story == NULL)
+			goto fail;
+
+		for (uint32_t i = 0; i < space->index_count; i++) {
+			struct tuple *replaced;
+			struct index *index = space->index[i];
+			if (index_replace(index, NULL, new_tuple,
+					  DUP_REPLACE_OR_INSERT,
+					  &replaced) != 0)
+				goto fail;
+			txm_story_link_tuple(add_story, replaced, i);
+			/* Counts links that the fail path must undo. */
+			add_story_linked++;
+
+			struct tuple *visible_replaced = NULL;
+			if (txm_story_find_visible(stmt, add_story, i,
+						   &visible_replaced,
+						   &collected_conflicts,
+						   region) != 0)
+				goto fail;
+
+			uint32_t errcode;
+			errcode = replace_check_dup(old_tuple, visible_replaced,
+						    i == 0 ? mode : DUP_INSERT);
+			if (errcode != 0) {
+				struct space *sp = stmt->space;
+				if (sp != NULL)
+					diag_set(ClientError, errcode,
+						 sp->index[i]->def->name,
+						 space_name(sp));
+				goto fail;
+			}
+
+			if (i == 0) {
+				old_tuple = visible_replaced;
+			}
+		}
+	}
+
+	/* Create del_story if necessary. */
+	struct tuple *del_tuple = NULL;
+	if (new_tuple != NULL) {
+		struct txm_story_link *link = &add_story->link[0];
+		if (link->older.is_story) {
+			del_story = link->older.story;
+			del_tuple = del_story->tuple;
+		} else {
+			del_tuple = link->older.tuple;
+		}
+	} else {
+		del_tuple = old_tuple;
+	}
+	if (del_tuple != NULL && del_story == NULL) {
+		if (del_tuple->is_dirty) {
+			del_story = txm_story_get(del_tuple);
+		} else {
+			del_story = txm_story_new_del_stmt(del_tuple, stmt);
+			if (del_story == NULL)
+				goto fail;
+			del_story_created = true;
+		}
+	}
+	if (new_tuple != NULL && del_story_created) {
+		/*
+		 * The deleted tuple has just got a story: links that still
+		 * point to the bare tuple must point to the story instead.
+		 */
+		for (uint32_t i = 0; i < add_story->index_count; i++) {
+			struct txm_story_link *link = &add_story->link[i];
+			if (link->older.is_story)
+				continue;
+			if (link->older.tuple == del_tuple) {
+				txm_story_unlink(add_story, i);
+				txm_story_link_story(add_story, del_story, i);
+			}
+		}
+	}
+	if (del_story != NULL && !del_story_created) {
+		/* Push the statement to the story's list of deleters. */
+		stmt->next_in_del_list = del_story->del_stmt;
+		del_story->del_stmt = stmt;
+		stmt->del_story = del_story;
+	}
+
+	/* Purge found conflicts. */
+	while (collected_conflicts != NULL) {
+		if (txm_cause_conflict(collected_conflicts->other_txn,
+				       stmt->txn) != 0)
+			goto fail;
+		collected_conflicts = collected_conflicts->next;
+	}
+	region_truncate(region, region_svp);
+
+	/*
+	 * We now reference both new and old tuple because the stmt holds
+	 * pointers to them.
+	 */
+	if (stmt->new_tuple != NULL)
+		tuple_ref(stmt->new_tuple);
+	*result = old_tuple;
+	if (*result != NULL)
+		tuple_ref(*result);
+	return 0;
+
+fail:
+	if (add_story != NULL) {
+		/* Restore the indexes in reverse order. */
+		while (add_story_linked > 0) {
+			--add_story_linked;
+			uint32_t i = add_story_linked;
+
+			struct index *index = space->index[i];
+			struct txm_story_link *link = &add_story->link[i];
+			struct tuple *was = txm_story_older_tuple(link);
+			struct tuple *unused;
+			if (index_replace(index, new_tuple, was,
+					  DUP_INSERT,
+					  &unused) != 0) {
+				/*
+				 * Must panic in every build type: marking
+				 * this path unreachable would turn a failed
+				 * rollback into undefined behavior in
+				 * release builds.
+				 */
+				diag_log();
+				panic("failed to rollback change");
+			}
+
+			txm_story_unlink(stmt->add_story, i);
+
+		}
+		txm_story_delete_add_stmt(stmt->add_story);
+	}
+
+	if (del_story != NULL && del_story->del_stmt == stmt) {
+		del_story->del_stmt = stmt->next_in_del_list;
+		stmt->next_in_del_list = NULL;
+	}
+
+	if (del_story_created)
+		txm_story_delete_del_stmt(stmt->del_story);
+	else
+		stmt->del_story = NULL;
+
+	region_truncate(region, region_svp);
+	return -1;
+}
+
+/**
+ * Rollback (undo) a statement in the history.
+ * The story added by the statement is cut out of the chain of every index
+ * (or the index itself is restored when the story is the newest one), and
+ * the statement is removed from the list of deleters of its del_story.
+ *
+ * @param stmt current statement.
+ */
+void
+txm_history_rollback_stmt(struct txn_stmt *stmt)
+{
+	if (stmt->add_story != NULL) {
+		assert(stmt->add_story->tuple == stmt->new_tuple);
+		struct txm_story *story = stmt->add_story;
+
+		for (uint32_t i = 0; i < story->index_count; i++) {
+			struct txm_story_link *link = &story->link[i];
+			struct txm_story *newer = link->newer_story;
+			if (newer == NULL) {
+				/* Newest story: its tuple is in the index. */
+				struct tuple *unused;
+				struct index *index = stmt->space->index[i];
+				struct tuple *was = txm_story_older_tuple(link);
+				if (index_replace(index, story->tuple, was,
+						  DUP_INSERT, &unused) != 0) {
+					/*
+					 * Must panic in every build type, so
+					 * the path is not marked unreachable.
+					 */
+					diag_log();
+					panic("failed to rollback change");
+				}
+				txm_story_unlink(story, i);
+				continue;
+			}
+			assert(newer->link[i].older.is_story);
+			assert(newer->link[i].older.story == story);
+			txm_story_unlink(newer, i);
+			if (link->older.is_story) {
+				/*
+				 * Unlink the story first: unlinking resets
+				 * the back pointer of the older story and
+				 * would otherwise erase the one set while
+				 * linking the newer story to it.
+				 */
+				struct txm_story *to = link->older.story;
+				txm_story_unlink(story, i);
+				txm_story_link_story(newer, to, i);
+			} else {
+				/*
+				 * Link first: the newer story takes its own
+				 * reference before ours is dropped.
+				 */
+				struct tuple *to = link->older.tuple;
+				txm_story_link_tuple(newer, to, i);
+				txm_story_unlink(story, i);
+			}
+		}
+		stmt->add_story->add_stmt = NULL;
+		stmt->add_story = NULL;
+		tuple_unref(stmt->new_tuple);
+	}
+
+	if (stmt->del_story != NULL) {
+		struct txm_story *story = stmt->del_story;
+
+		/* Remove the statement from the story's list of deleters. */
+		struct txn_stmt **prev = &story->del_stmt;
+		while (*prev != stmt) {
+			prev = &(*prev)->next_in_del_list;
+			assert(*prev != NULL);
+		}
+		*prev = stmt->next_in_del_list;
+		stmt->next_in_del_list = NULL;
+
+		/*
+		 * The head of the list is already correct here; it must not
+		 * be reset, other statements may still delete this story.
+		 */
+		stmt->del_story = NULL;
+	}
+}
+
+/**
+ * Prepare a statement in the history.
+ * In every index the story added by the statement is moved towards the
+ * past, below all in-progress stories of other transactions, by swapping
+ * it with its older neighbour until that neighbour is a bare tuple, a
+ * prepared story, an ancient story or a story of the same transaction.
+ * Transactions that are overtaken this way and depend on the old tuple are
+ * marked as conflicted. Finally the psn of the transaction is stored in
+ * the added and the deleted story.
+ *
+ * @param stmt current statement.
+ */
+void
+txm_history_prepare_stmt(struct txn_stmt *stmt)
+{
+	assert(stmt->txn->psn != 0);
+
+	/* Move story to the past to prepared stories. */
+
+	struct txm_story *story = stmt->add_story;
+	uint32_t index_count = story == NULL ? 0 : story->index_count;
+	/*
+	 * Note that if stmt->add_story == NULL, the index_count is set to 0,
+	 * and we will not enter the loop.
+	 * The index is advanced only when no swap is made: after a swap the
+	 * same index is examined again with the new older neighbour.
+	 */
+	for (uint32_t i = 0; i < index_count; ) {
+		if (!story->link[i].older.is_story) {
+			/* tuple is old. */
+			i++;
+			continue;
+		}
+		struct txm_story *old_story =
+			story->link[i].older.story;
+		if (old_story->del_psn != 0) {
+			/* if psn is set, the change is prepared. */
+			i++;
+			continue;
+		}
+		if (old_story->add_psn != 0) {
+			/* if psn is set, the change is prepared. */
+			i++;
+			continue;
+		}
+
+		if (old_story->add_stmt == NULL) {
+			/* ancient: nobody is known to have added it. */
+			i++;
+			continue;
+		}
+		if (old_story->add_stmt->txn == stmt->txn) {
+			/* added by us. */
+			i++;
+			continue;
+		}
+
+		if (old_story->add_stmt->does_require_old_tuple || i != 0)
+			old_story->add_stmt->txn->status = TXN_CONFLICTED;
+
+		/* Swap story and old story. */
+		struct txm_story *newer = story->link[i].newer_story;
+		if (newer == NULL) {
+			/* we have to replace the tuple in index. */
+			struct tuple *unused;
+			struct index *index = stmt->space->index[i];
+			if (index_replace(index, story->tuple, old_story->tuple,
+					  DUP_INSERT, &unused) != 0) {
+				diag_log();
+				panic("failed to rollback change");
+			}
+		} else {
+			assert(newer->link[i].older.is_story);
+			assert(newer->link[i].older.story == story);
+			txm_story_unlink(newer, i);
+		}
+
+		/*
+		 * Unlink the story before linking the newer story to the
+		 * old one: unlinking resets the back pointer of old_story
+		 * and would otherwise erase the freshly set one.
+		 */
+		txm_story_unlink(story, i);
+		if (newer != NULL)
+			txm_story_link_story(newer, old_story, i);
+
+		if (old_story->link[i].older.is_story) {
+			struct txm_story *to =
+				old_story->link[i].older.story;
+			txm_story_unlink(old_story, i);
+			txm_story_link_story(story, to, i);
+		} else {
+			/*
+			 * Take the new reference before the old one is
+			 * dropped, so the tuple is never left unreferenced.
+			 */
+			struct tuple *to =
+				old_story->link[i].older.tuple;
+			txm_story_link_tuple(story, to, i);
+			txm_story_unlink(old_story, i);
+		}
+
+		txm_story_link_story(old_story, story, i);
+
+		if (i == 0) {
+			assert(stmt->del_story == old_story);
+			assert(story->link[0].older.is_story ||
+			       story->link[0].older.tuple == NULL);
+
+			/*
+			 * The old story is not what we delete anymore:
+			 * detach all its deleters, and conflict those that
+			 * belong to other transactions.
+			 */
+			struct txn_stmt *dels = old_story->del_stmt;
+			assert(dels != NULL);
+			do {
+				if (dels->txn != stmt->txn)
+					dels->txn->status = TXN_CONFLICTED;
+				dels->del_story = NULL;
+				struct txn_stmt *next = dels->next_in_del_list;
+				dels->next_in_del_list = NULL;
+				dels = next;
+			} while (dels != NULL);
+			old_story->del_stmt = NULL;
+
+			if (story->link[0].older.is_story) {
+				/* We become the only deleter of it. */
+				struct txm_story *oldest_story =
+					story->link[0].older.story;
+				dels = oldest_story->del_stmt;
+				while (dels != NULL) {
+					assert(dels->txn != stmt->txn);
+					dels->del_story = NULL;
+					struct txn_stmt *next =
+						dels->next_in_del_list;
+					dels->next_in_del_list = NULL;
+					dels = next;
+				}
+				oldest_story->del_stmt = stmt;
+				stmt->del_story = oldest_story;
+			}
+		}
+	}
+	if (stmt->add_story != NULL)
+		stmt->add_story->add_psn = stmt->txn->psn;
+
+	if (stmt->del_story != NULL)
+		stmt->del_story->del_psn = stmt->txn->psn;
+}
+
+/**
+ * Commit a statement in the history: detach the statement from its added
+ * and deleted stories, and unreference the tuple of the deleted story.
+ *
+ * @param stmt current statement.
+ * @return the change in space bsize (can be negative).
+ */
+ssize_t
+txm_history_commit_stmt(struct txn_stmt *stmt)
+{
+	/* Signed accumulator: the delta is negative for a pure delete. */
+	ssize_t res = 0;
+	if (stmt->add_story != NULL) {
+		assert(stmt->add_story->add_stmt == stmt);
+		res += stmt->add_story->tuple->bsize;
+		stmt->add_story->add_stmt = NULL;
+		stmt->add_story = NULL;
+	}
+	if (stmt->del_story != NULL) {
+		assert(stmt->del_story->del_stmt == stmt);
+		assert(stmt->next_in_del_list == NULL);
+		res -= stmt->del_story->tuple->bsize;
+		tuple_unref(stmt->del_story->tuple);
+		stmt->del_story->del_stmt = NULL;
+		stmt->del_story = NULL;
+	}
+	return res;
+}
+
+/**
+ * Slow path of txm_tuple_clarify: walk the history of a dirty @tuple in
+ * @index from its story towards older ones until a story visible to @txn
+ * is found or the chain ends with a bare tuple.
+ * @return the visible tuple (can be NULL).
+ */
+struct tuple *
+txm_tuple_clarify_slow(struct txn *txn, struct space *space,
+		       struct tuple *tuple, uint32_t index,
+		       uint32_t mk_index, bool is_prepared_ok)
+{
+	(void)space;
+	assert(tuple->is_dirty);
+	struct txm_story *story = txm_story_get(tuple);
+	bool own_change = false;
+	struct tuple *result = NULL;
+
+	while (!txm_story_is_visible(story, txn, &result, is_prepared_ok,
+				     &own_change)) {
+		struct txm_story_or_tuple *older = &story->link[index].older;
+		if (!older->is_story) {
+			/* End of the chain: a tuple without a story. */
+			result = older->tuple;
+			break;
+		}
+		story = older->story;
+	}
+	(void)own_change; /* TODO: add conflict */
+	(void)mk_index; /* TODO: multiindex */
+	return result;
+}
+
+/**
+ * Destroy a @story that has no statements attached and is fully unlinked:
+ * remove it from the list of all stories and from the history hash table,
+ * make its tuple clean, drop the tuple reference and free the story.
+ */
+static void
+txm_story_delete(struct txm_story *story)
+{
+	assert(story->add_stmt == NULL);
+	assert(story->del_stmt == NULL);
+
+	/* Move the traversal cursor off the node that is being removed. */
+	struct rlist *self = &story->in_all_stories;
+	if (txm.traverse_all_stories == self)
+		txm.traverse_all_stories = rlist_next(self);
+	rlist_del(self);
+
+	mh_int_t slot = mh_history_find(txm.history, story->tuple, 0);
+	assert(slot != mh_end(txm.history));
+	mh_history_del(txm.history, slot, 0);
+
+	struct tuple *tuple = story->tuple;
+	tuple->is_dirty = false;
+	tuple_unref(tuple);
+
+#ifndef NDEBUG
+	/* Expecting to delete fully unlinked story. */
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		struct txm_story_link *link = &story->link[i];
+		assert(link->newer_story == NULL);
+		assert(link->older.is_story == false);
+		assert(link->older.tuple == NULL);
+	}
+#endif
+
+	/* Return the memory to the pool the story was allocated from. */
+	uint32_t link_count = story->index_count;
+	mempool_free(&txm.txm_story_pool[link_count], story);
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index 03ccb76..e294497 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -36,6 +36,7 @@
#include "trigger.h"
#include "fiber.h"
#include "space.h"
+#include "tuple.h"
#if defined(__cplusplus)
extern "C" {
@@ -175,6 +176,27 @@ struct txn_stmt {
struct space *space;
struct tuple *old_tuple;
struct tuple *new_tuple;
+ /**
+ * If new_tuple != NULL and this transaction was not prepared,
+ * this member holds added story of the new_tuple.
+ */
+ struct txm_story *add_story;
+ /**
+ * If this statement deletes or replaces a tuple and this transaction
+ * was not prepared, this member holds the story of that old tuple.
+ */
+ struct txm_story *del_story;
+ /**
+ * Link in txm_story::del_stmt linked list.
+ * Only one prepared TX can delete a tuple and a story. But
+ * when there are several in-progress transactions and they delete
+ * the same tuple we have to store several delete statements in one
+ * story. It's implemented in that way: story has a pointer to the first
+ * deleting statement, that statement has a pointer to the next etc,
+ * with NULL in the end.
+ * This member is the pointer to the next deleting statement.
+ */
+ struct txn_stmt *next_in_del_list;
/** Engine savepoint for the start of this statement. */
void *engine_savepoint;
/** Redo info: the binary log row */
@@ -359,6 +381,88 @@ struct txn {
struct rlist in_read_view_txs;
};
+/**
+ * Pointer to tuple or story; the is_story flag tells which member of the
+ * union is in use.
+ */
+struct txm_story_or_tuple {
+ /** Flag whether it's a story. */
+ bool is_story;
+ union {
+ /** Pointer to story, it must be reverse linked. */
+ struct txm_story *story;
+ /** Smart pointer to tuple: the tuple is referenced if set. */
+ struct tuple *tuple;
+ };
+};
+
+/**
+ * Link that connects a txm_story with older and newer stories of the same
+ * key in index.
+ */
+struct txm_story_link {
+ /** Story that happened after this story ended (NULL if none). */
+ struct txm_story *newer_story;
+ /**
+ * Older story or ancient tuple (so old that its story was lost).
+ * In case of a tuple it can also be NULL.
+ */
+ struct txm_story_or_tuple older;
+};
+
+/**
+ * A part of a history of a value in space.
+ * It's a story about a tuple, from the point it was added to space to the
+ * point when it was deleted from a space.
+ * All stories are linked into a list of stories of the same key of each index.
+ */
+struct txm_story {
+ /** The story is about this tuple. The tuple is referenced. */
+
+ struct tuple *tuple;
+ /**
+ * Statement that told this story. Is set to NULL when the statement's
+ * transaction becomes committed. Can also be NULL if we don't know who
+ * introduced that story, the tuple was added by a transaction that
+ * was completed and destroyed some time ago.
+ */
+ struct txn_stmt *add_stmt;
+ /**
+ * Prepare sequence number of add_stmt's transaction. Is set when
+ * the transaction is prepared. Can be 0 if the transaction is
+ * in progress or we don't know who introduced that story.
+ */
+ int64_t add_psn;
+ /**
+ * Statement that ended this story. Is set to NULL when the statement's
+ * transaction becomes committed. Can also be NULL if the tuple has not
+ * been deleted yet. It is the head of a list of deleting statements
+ * linked via txn_stmt::next_in_del_list.
+ */
+ struct txn_stmt *del_stmt;
+ /**
+ * Prepare sequence number of del_stmt's transaction. Is set when
+ * the transaction is prepared. Can be 0 if the transaction is
+ * in progress or if nobody has deleted the tuple.
+ */
+ int64_t del_psn;
+ /**
+ * List of trackers - transactions that have read this tuple.
+ */
+ struct rlist reader_list;
+ /**
+ * Link in tx_manager::all_stories
+ */
+ struct rlist in_all_stories;
+ /**
+ * Number of indexes in this space - and the count of link[].
+ */
+ uint32_t index_count;
+ /**
+ * Link with older and newer stories (and just tuples) for each
+ * index respectively.
+ */
+ struct txm_story_link link[];
+};
+
static inline bool
txn_has_flag(struct txn *txn, enum txn_flag flag)
{
@@ -749,6 +853,95 @@ tx_manager_free();
int
txm_cause_conflict(struct txn *breaker, struct txn *victim);
+/**
+ * @brief Add a statement to transaction manager's history.
+ * Until unlinking or releasing the space could internally contain
+ * wrong tuples and must be cleaned through txm_tuple_clarify call.
+ * With that clarifying the statement will be visible to current transaction,
+ * but invisible to all others.
+ * Follows signature of @sa memtx_space_replace_all_keys .
+ *
+ * @param stmt current statement.
+ * @param old_tuple the tuple that should be removed (can be NULL).
+ * @param new_tuple the tuple that should be inserted (can be NULL).
+ * @param mode dup_replace_mode, used only if new_tuple is not
+ * NULL and old_tuple is NULL, and only for the
+ * primary key.
+ * @param result - old or replaced tuple.
+ * @return 0 on success, -1 on error (diag is set).
+ */
+int
+txm_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple,
+ struct tuple *new_tuple, enum dup_replace_mode mode,
+ struct tuple **result);
+
+/**
+ * @brief Rollback (undo) a statement from transaction manager's history.
+ * It just makes the statement invisible to all.
+ * Prepared statements can also be removed, but for consistency all later
+ * prepared statements must also be rolled back.
+ *
+ * @param stmt current statement.
+ */
+void
+txm_history_rollback_stmt(struct txn_stmt *stmt);
+
+/**
+ * @brief Prepare statement in history for further commit.
+ * Prepared statements are still invisible for read-only transactions
+ * but are visible to all read-write transactions.
+ * Prepared and in-progress transactions use the same links for creating
+ * chains of stories in history. The difference is that the order of
+ * prepared transactions is fixed while in-progress transactions are
+ * added to the end of list in any order. Thus to switch to prepared
+ * we have to reorder story in such a way that current story will be
+ * between earlier prepared stories and in-progress stories. That's what
+ * this function does.
+ *
+ * @param stmt current statement.
+ */
+void
+txm_history_prepare_stmt(struct txn_stmt *stmt);
+
+/**
+ * @brief Commit statement in history.
+ * Make the statement's changes permanent. It becomes visible to all.
+ *
+ * @param stmt current statement.
+ * @return the change in space bsize.
+ */
+ssize_t
+txm_history_commit_stmt(struct txn_stmt *stmt);
+
+/** Helper of txm_tuple_clarify */
+struct tuple *
+txm_tuple_clarify_slow(struct txn *txn, struct space *space,
+ struct tuple *tuple, uint32_t index,
+ uint32_t mk_index, bool is_prepared_ok);
+
+/**
+ * Cleans a tuple if it's dirty - finds a visible tuple in history.
+ * The tuple is returned as is when the MVCC engine is off or the tuple is
+ * not dirty.
+ * @param txn - current transaction.
+ * @param space - space in which the tuple was found.
+ * @param tuple - tuple to clean.
+ * @param index - index number.
+ * @param mk_index - multikey index (if the index is multikey).
+ * @param is_prepared_ok - allow to return prepared tuples.
+ * @return clean tuple (can be NULL).
+ */
+static inline struct tuple *
+txm_tuple_clarify(struct txn *txn, struct space *space,
+		  struct tuple *tuple, uint32_t index,
+		  uint32_t mk_index, bool is_prepared_ok)
+{
+	/* Fast path: nothing to clarify. */
+	if (!tx_manager_use_mvcc_engine || !tuple->is_dirty)
+		return tuple;
+	return txm_tuple_clarify_slow(txn, space, tuple, index, mk_index,
+				      is_prepared_ok);
+}
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
--
2.7.4
More information about the Tarantool-patches
mailing list