[Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story

Aleksandr Lyapunov alyapunov at tarantool.org
Tue Sep 8 13:22:07 MSK 2020


Memtx story is a part of a history of a value in space.
It's a story about a tuple, from the point it was added to space
to the point when it was deleted from the space.
All stories are linked into a list of stories of the same key of
each index.

Part of #4897
---
 src/box/memtx_tx.c | 969 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/box/memtx_tx.h | 217 ++++++++++++
 src/box/space.c    |   2 +
 src/box/space.h    |   4 +
 src/box/txn.c      |  12 +
 src/box/txn.h      |  24 ++
 6 files changed, 1228 insertions(+)

diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index eb2ab51..dfad6f7 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -35,6 +35,29 @@
 #include <stdint.h>
 
 #include "txn.h"
+#include "schema_def.h"
+#include "small/mempool.h"
+
+/** Hash a tuple by its address, folding high pointer bits into low. */
+static uint32_t
+memtx_tx_story_key_hash(const struct tuple *a)
+{
+	/* Widening keeps the shift defined when pointers are 32-bit. */
+	uint64_t u = (uintptr_t)a;
+
+	return (uint32_t)(u ^ (u >> 32));
+}
+
+#define mh_name _history
+#define mh_key_t struct tuple *
+#define mh_node_t struct memtx_story *
+#define mh_arg_t int
+#define mh_hash(a, arg) (memtx_tx_story_key_hash((*(a))->tuple))
+#define mh_hash_key(a, arg) (memtx_tx_story_key_hash(a))
+#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple)
+#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple)
+#define MH_SOURCE
+#include "salad/mhash.h"
 
 struct tx_manager
 {
@@ -44,6 +67,23 @@ struct tx_manager
 	 * so the list is ordered by rv_psn.
 	 */
 	struct rlist read_view_txs;
+	/** Mempools for tx_story objects with different index count. */
+	struct mempool memtx_tx_story_pool[BOX_INDEX_MAX];
+	/** Hash table tuple -> memtx_story of that tuple. */
+	struct mh_history_t *history;
+	/** List of all memtx_story objects. */
+	struct rlist all_stories;
+	/** Iterator that sequentially traverses all memtx_story objects. */
+	struct rlist *traverse_all_stories;
+};
+
+enum {
+	/**
+	 * Number of GC steps the TX manager performs on each creation of a
+	 * new story; every step examines at most one story and deletes it
+	 * if it is no longer used.
+	 */
+	TX_MANAGER_GC_STEPS_SIZE = 2,
 };
 
 /** That's a definition, see declaration for description. */
@@ -56,6 +96,15 @@ void
 memtx_tx_manager_init()
 {
 	rlist_create(&txm.read_view_txs);
+	for (size_t i = 0; i < BOX_INDEX_MAX; i++) {
+		size_t item_size = sizeof(struct memtx_story) +
+				   i * sizeof(struct memtx_story_link);
+		mempool_create(&txm.memtx_tx_story_pool[i],
+			       cord_slab_cache(), item_size);
+	}
+	txm.history = mh_history_new();
+	rlist_create(&txm.all_stories);
+	txm.traverse_all_stories = &txm.all_stories;
 }
 
 void
@@ -136,3 +185,923 @@ memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim)
 		victim->status = TXN_CONFLICTED;
 	}
 }
+
+/** See definition for details */
+static void
+memtx_tx_story_gc_step();
+
+/**
+ * Create a new story and link it with the @tuple.
+ * @return story on success, NULL on error (diag is set).
+ */
+static struct memtx_story *
+memtx_tx_story_new(struct space *space, struct tuple *tuple)
+{
+	/* Free some memory. */
+	for (size_t i = 0; i < TX_MANAGER_GC_STEPS_SIZE; i++)
+		memtx_tx_story_gc_step();
+	assert(!tuple->is_dirty);
+	uint32_t index_count = space->index_count;
+	assert(index_count < BOX_INDEX_MAX);
+	struct mempool *pool = &txm.memtx_tx_story_pool[index_count];
+	struct memtx_story *story = (struct memtx_story *) mempool_alloc(pool);
+	if (story == NULL) {
+		size_t item_size = sizeof(struct memtx_story) +
+				   index_count *
+				   sizeof(struct memtx_story_link);
+		diag_set(OutOfMemory, item_size, "mempool_alloc", "story");
+		return NULL;
+	}
+	story->tuple = tuple;
+
+	const struct memtx_story **put_story =
+		(const struct memtx_story **) &story;
+	struct memtx_story **empty = NULL;
+	mh_int_t pos = mh_history_put(txm.history, put_story, &empty, 0);
+	if (pos == mh_end(txm.history)) {
+		mempool_free(pool, story);
+		diag_set(OutOfMemory, pos + 1, "mh_history_put",
+			 "mh_history_node");
+		return NULL;
+	}
+	tuple->is_dirty = true;
+	tuple_ref(tuple);
+
+	story->space = space;
+	story->index_count = index_count;
+	story->add_stmt = NULL;
+	story->add_psn = 0;
+	story->del_stmt = NULL;
+	story->del_psn = 0;
+	rlist_create(&story->reader_list);
+	rlist_add_tail(&txm.all_stories, &story->in_all_stories);
+	rlist_add(&space->memtx_stories, &story->in_space_stories);
+	memset(story->link, 0, sizeof(story->link[0]) * index_count);
+	return story;
+}
+
+static void
+memtx_tx_story_delete(struct memtx_story *story);
+
+/**
+ * Create a new story of a @tuple that was added by @stmt.
+ * @return story on success, NULL on error (diag is set).
+ */
+static struct memtx_story *
+memtx_tx_story_new_add_stmt(struct tuple *tuple, struct txn_stmt *stmt)
+{
+	struct memtx_story *res = memtx_tx_story_new(stmt->space, tuple);
+	if (res == NULL)
+		return NULL;
+	res->add_stmt = stmt;
+	assert(stmt->add_story == NULL);
+	stmt->add_story = res;
+	return res;
+}
+
+/**
+ * Create a new story of a @tuple that was deleted by @stmt.
+ * @return story on success, NULL on error (diag is set).
+ */
+static struct memtx_story *
+memtx_tx_story_new_del_stmt(struct tuple *tuple, struct txn_stmt *stmt)
+{
+	struct memtx_story *res = memtx_tx_story_new(stmt->space, tuple);
+	if (res == NULL)
+		return NULL;
+	res->del_stmt = stmt;
+	assert(stmt->del_story == NULL);
+	stmt->del_story = res;
+	return res;
+}
+
+/**
+ * Undo memtx_tx_story_new_add_stmt.
+ */
+static void
+memtx_tx_story_delete_add_stmt(struct memtx_story *story)
+{
+	story->add_stmt->add_story = NULL;
+	story->add_stmt = NULL;
+	memtx_tx_story_delete(story);
+}
+
+/**
+ * Undo memtx_tx_story_new_del_stmt.
+ */
+static void
+memtx_tx_story_delete_del_stmt(struct memtx_story *story)
+{
+	story->del_stmt->del_story = NULL;
+	story->del_stmt = NULL;
+	memtx_tx_story_delete(story);
+}
+
+
+/**
+ * Find the story of a @tuple. The story is expected to be present (asserted).
+ */
+static struct memtx_story *
+memtx_tx_story_get(struct tuple *tuple)
+{
+	assert(tuple->is_dirty);
+
+	mh_int_t pos = mh_history_find(txm.history, tuple, 0);
+	assert(pos != mh_end(txm.history));
+	return *mh_history_node(txm.history, pos);
+}
+
+/**
+ * Get the older tuple, extracting it from older story if necessary.
+ */
+static struct tuple *
+memtx_tx_story_older_tuple(struct memtx_story_link *link)
+{
+	return link->older.is_story ? link->older.story->tuple
+				    : link->older.tuple;
+}
+
+/**
+ * Link a @story with @older_story in @index (in both directions).
+ */
+static void
+memtx_tx_story_link_story(struct memtx_story *story,
+			  struct memtx_story *older_story,
+			  uint32_t index)
+{
+	assert(older_story != NULL);
+	struct memtx_story_link *link = &story->link[index];
+	/* Must be unlinked. */
+	assert(!link->older.is_story);
+	assert(link->older.tuple == NULL);
+	link->older.is_story = true;
+	link->older.story = older_story;
+	older_story->link[index].newer_story = story;
+}
+
+/**
+ * Link a @story with an older @tuple in @index. If the tuple is dirty, find
+ * its story and link with that story instead.
+ */
+static void
+memtx_tx_story_link_tuple(struct memtx_story *story,
+			  struct tuple *older_tuple,
+			  uint32_t index)
+{
+	struct memtx_story_link *link = &story->link[index];
+	/* Must be unlinked. */
+	assert(!link->older.is_story);
+	assert(link->older.tuple == NULL);
+	if (older_tuple == NULL)
+		return;
+	if (older_tuple->is_dirty) {
+		memtx_tx_story_link_story(story,
+					  memtx_tx_story_get(older_tuple),
+					  index);
+		return;
+	}
+	link->older.tuple = older_tuple;
+	tuple_ref(link->older.tuple);
+}
+
+/**
+ * Unlink a @story from its older story/tuple in @index. The older story's
+ * back link is reset only while it still points to @story.
+ */
+static void
+memtx_tx_story_unlink(struct memtx_story *story, uint32_t index)
+{
+	struct memtx_story_link *link = &story->link[index];
+	if (!link->older.is_story && link->older.tuple != NULL)
+		tuple_unref(link->older.tuple);
+	else if (link->older.is_story &&
+		 link->older.story->link[index].newer_story == story)
+		link->older.story->link[index].newer_story = NULL;
+	link->older.is_story = false;
+	link->older.tuple = NULL;
+}
+
+/**
+ * Run one step of a crawler that traverses all stories and removes stories
+ * that are no longer used.
+ */
+static void
+memtx_tx_story_gc_step()
+{
+	if (txm.traverse_all_stories == &txm.all_stories) {
+		/* We came to the head of the list. */
+		txm.traverse_all_stories = txm.traverse_all_stories->next;
+		return;
+	}
+
+	/* Lowest read view PSN */
+	int64_t lowest_rv_psn = txn_last_psn;
+	if (!rlist_empty(&txm.read_view_txs)) {
+		struct txn *txn =
+			rlist_first_entry(&txm.read_view_txs, struct txn,
+					  in_read_view_txs);
+		assert(txn->rv_psn != 0);
+		lowest_rv_psn = txn->rv_psn;
+	}
+
+	struct memtx_story *story =
+		rlist_entry(txm.traverse_all_stories, struct memtx_story,
+			    in_all_stories);
+	txm.traverse_all_stories = txm.traverse_all_stories->next;
+
+	if (story->add_stmt != NULL || story->del_stmt != NULL ||
+	    !rlist_empty(&story->reader_list)) {
+		/* The story is used directly by some transactions. */
+		return;
+	}
+	if (story->add_psn >= lowest_rv_psn ||
+	    story->del_psn >= lowest_rv_psn) {
+		/* The story can be used by a read view. */
+		return;
+	}
+
+	/* Unlink and delete the story */
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		struct memtx_story_link *link = &story->link[i];
+		if (link->newer_story == NULL) {
+			/*
+			 * Top of the chain, so story->tuple is in the index;
+			 * if the story ended in a deletion, take it out.
+			 */
+			if (story->del_psn > 0) {
+				struct index *index = story->space->index[i];
+				struct tuple *unused;
+				if (index_replace(index, story->tuple, NULL,
+						  DUP_INSERT, &unused) != 0) {
+					diag_log();
+					unreachable();
+					panic("failed to rollback change");
+				}
+				assert(story->tuple == unused);
+			}
+			memtx_tx_story_unlink(story, i);
+		} else {
+			/* Splice out, re-aiming the older back link too. */
+			struct memtx_story *newer = link->newer_story;
+			newer->link[i].older = link->older;
+			if (link->older.is_story)
+				link->older.story->link[i].newer_story = newer;
+			memset(link, 0, sizeof(*link));
+		}
+	}
+
+	memtx_tx_story_delete(story);
+}
+
+/**
+ * Check if a @story is visible for transaction @txn. Return visible tuple to
+ * @visible_tuple (can be set to NULL).
+ * @param is_prepared_ok - whether prepared (not committed) change is acceptable.
+ * @param own_change - return true if the change was made by @txn itself.
+ * @return true if the story is visible, false otherwise.
+ */
+static bool
+memtx_tx_story_is_visible(struct memtx_story *story, struct txn *txn,
+			  struct tuple **visible_tuple, bool is_prepared_ok,
+			  bool *own_change)
+{
+	*own_change = false;
+	*visible_tuple = NULL;
+
+	int64_t rv_psn = INT64_MAX;
+	if (txn != NULL && txn->rv_psn != 0)
+		rv_psn = txn->rv_psn;
+
+	struct txn_stmt *dels = story->del_stmt;
+	while (dels != NULL) {
+		if (dels->txn == txn) {
+			/* Tuple is deleted by us (@txn). */
+			*own_change = true;
+			return true;
+		}
+		dels = dels->next_in_del_list;
+	}
+	if (is_prepared_ok && story->del_psn != 0 && story->del_psn < rv_psn) {
+		/* Tuple is deleted by prepared TX. */
+		return true;
+	}
+	if (story->del_psn != 0 && story->del_stmt == NULL &&
+	    story->del_psn < rv_psn) {
+		/* Tuple is deleted by committed TX. */
+		return true;
+	}
+
+	if (story->add_stmt != NULL && story->add_stmt->txn == txn) {
+		/* Tuple is added by us (@txn). */
+		*visible_tuple = story->tuple;
+		*own_change = true;
+		return true;
+	}
+	if (is_prepared_ok && story->add_psn != 0 && story->add_psn < rv_psn) {
+		/* Tuple is added by another prepared TX. */
+		*visible_tuple = story->tuple;
+		return true;
+	}
+	if (story->add_psn != 0 && story->add_stmt == NULL &&
+	    story->add_psn < rv_psn) {
+		/* Tuple is added by committed TX. */
+		*visible_tuple = story->tuple;
+		return true;
+	}
+	if (story->add_psn == 0 && story->add_stmt == NULL) {
+		/* added long time ago. */
+		*visible_tuple = story->tuple;
+		return true;
+	}
+	return false;
+}
+
+/**
+ * Temporary (allocated on region) struct that stores a conflicting TX.
+ */
+struct memtx_tx_conflict
+{
+	/* The transaction that will conflict with us upon commit. */
+	struct txn *breaker;
+	/* Link in single-linked list. */
+	struct memtx_tx_conflict *next;
+};
+
+/**
+ * Save @breaker in list with head @conflicts_head. New list node is allocated
+ * on @region.
+ * @return 0 on success, -1 on memory error.
+ */
+static int
+memtx_tx_save_conflict(struct txn *breaker,
+		       struct memtx_tx_conflict **conflicts_head,
+		       struct region *region)
+{
+	size_t err_size;
+	struct memtx_tx_conflict *next_conflict;
+	next_conflict = region_alloc_object(region, struct memtx_tx_conflict,
+					    &err_size);
+	if (next_conflict == NULL) {
+		diag_set(OutOfMemory, err_size, "txn_region", "txn conflict");
+		return -1;
+	}
+	next_conflict->breaker = breaker;
+	next_conflict->next = *conflicts_head;
+	*conflicts_head = next_conflict;
+	return 0;
+}
+
+/**
+ * Scan the history below @story in @index for a tuple visible to @stmt's
+ * transaction (a prepared one suits), returned via @visible_replaced.
+ * Collect a list of transactions that will abort the current transaction if
+ * they are committed.
+ *
+ * @return 0 on success, -1 on memory error.
+ */
+static int
+memtx_tx_story_find_visible(struct memtx_story *story, struct txn_stmt *stmt,
+			    uint32_t index, struct tuple **visible_replaced,
+			    struct memtx_tx_conflict **collected_conflicts,
+			    struct region *region)
+{
+	while (true) {
+		if (!story->link[index].older.is_story) {
+			/* The tuple is so old that we don't know its story. */
+			*visible_replaced = story->link[index].older.tuple;
+			assert(*visible_replaced == NULL ||
+			       !(*visible_replaced)->is_dirty);
+			break;
+		}
+		story = story->link[index].older.story;
+		bool unused;
+		if (memtx_tx_story_is_visible(story, stmt->txn,
+					      visible_replaced, true, &unused))
+			break;
+
+		/*
+		 * We skip the story as invisible, but if the corresponding
+		 * TX is committed, our TX can become conflicted.
+		 * The conflict will be unavoidable if this statement
+		 * relies on old_tuple. If not (it's a replace),
+		 * the conflict will take place only for a secondary
+		 * index, and only if the story is not overwritten in the
+		 * primary index.
+		 */
+		bool cross_conflict = false;
+		if (stmt->does_require_old_tuple) {
+			cross_conflict = true;
+		} else if (index != 0) {
+			struct memtx_story *look_up = story;
+			cross_conflict = true;
+			while (look_up->link[0].newer_story != NULL) {
+				struct memtx_story *over;
+				over = look_up->link[0].newer_story;
+				if (over->add_stmt->txn == stmt->txn) {
+					cross_conflict = false;
+					break;
+				}
+				look_up = over;
+			}
+		}
+		if (cross_conflict) {
+			if (memtx_tx_save_conflict(story->add_stmt->txn,
+						   collected_conflicts,
+						   region) != 0)
+				return -1;
+
+		}
+	}
+	return 0;
+}
+
+int
+memtx_tx_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple,
+			  struct tuple *new_tuple, enum dup_replace_mode mode,
+			  struct tuple **result)
+{
+	assert(new_tuple != NULL || old_tuple != NULL);
+	struct space *space = stmt->space;
+	struct memtx_story *add_story = NULL;
+	uint32_t add_story_linked = 0;
+	struct memtx_story *del_story = NULL;
+	bool del_story_created = false;
+	struct region *region = &stmt->txn->region;
+	size_t region_svp = region_used(region);
+
+	/*
+	 * List of transactions that will conflict us once one of them
+	 * become committed.
+	 */
+	struct memtx_tx_conflict *collected_conflicts = NULL;
+
+	/* Create add_story if necessary. */
+	if (new_tuple != NULL) {
+		add_story = memtx_tx_story_new_add_stmt(new_tuple, stmt);
+		if (add_story == NULL)
+			goto fail;
+
+		for (uint32_t i = 0; i < space->index_count; i++) {
+			struct tuple *replaced;
+			struct index *index = space->index[i];
+			if (index_replace(index, NULL, new_tuple,
+					  DUP_REPLACE_OR_INSERT,
+					  &replaced) != 0)
+				goto fail;
+			memtx_tx_story_link_tuple(add_story, replaced, i);
+			add_story_linked++;
+
+			struct tuple *visible_replaced = NULL;
+			if (memtx_tx_story_find_visible(add_story, stmt, i,
+							&visible_replaced,
+							&collected_conflicts,
+							region) != 0)
+				goto fail;
+
+			uint32_t errcode;
+			errcode = replace_check_dup(old_tuple, visible_replaced,
+						    i == 0 ? mode : DUP_INSERT);
+			if (errcode != 0) {
+				if (space != NULL)
+					diag_set(ClientError, errcode,
+						 index->def->name,
+						 space_name(space));
+				goto fail;
+			}
+
+			if (i == 0)
+				old_tuple = visible_replaced;
+		}
+	}
+
+	/* Create del_story if necessary. */
+	struct tuple *del_tuple = NULL;
+	if (new_tuple != NULL) {
+		struct memtx_story_link *link = &add_story->link[0];
+		if (link->older.is_story) {
+			del_story = link->older.story;
+			del_tuple = del_story->tuple;
+		} else {
+			del_tuple = link->older.tuple;
+		}
+	} else {
+		del_tuple = old_tuple;
+	}
+	if (del_tuple != NULL && del_story == NULL) {
+		if (del_tuple->is_dirty) {
+			del_story = memtx_tx_story_get(del_tuple);
+		} else {
+			del_story = memtx_tx_story_new_del_stmt(del_tuple,
+								stmt);
+			if (del_story == NULL)
+				goto fail;
+			del_story_created = true;
+		}
+	}
+	if (new_tuple != NULL && del_story_created) {
+		for (uint32_t i = 0; i < add_story->index_count; i++) {
+			struct memtx_story_link *link = &add_story->link[i];
+			if (link->older.is_story)
+				continue;
+			if (link->older.tuple == del_tuple) {
+				memtx_tx_story_unlink(add_story, i);
+				memtx_tx_story_link_story(add_story, del_story,
+							  i);
+			}
+		}
+	}
+	if (del_story != NULL && !del_story_created) {
+		stmt->next_in_del_list = del_story->del_stmt;
+		del_story->del_stmt = stmt;
+		stmt->del_story = del_story;
+	}
+
+	/* Purge found conflicts. */
+	while (collected_conflicts != NULL) {
+		if (memtx_tx_cause_conflict(collected_conflicts->breaker,
+					    stmt->txn) != 0)
+			goto fail;
+		collected_conflicts = collected_conflicts->next;
+	}
+
+	/*
+	 * We now reference both new and old tuple because the stmt holds
+	 * pointers to them.
+	 */
+	if (stmt->new_tuple != NULL)
+		tuple_ref(stmt->new_tuple);
+	*result = old_tuple;
+	if (*result != NULL)
+		tuple_ref(*result);
+	return 0;
+
+	fail:
+	if (add_story != NULL) {
+		while (add_story_linked > 0) {
+			--add_story_linked;
+			uint32_t i = add_story_linked;
+
+			struct index *index = space->index[i];
+			struct memtx_story_link *link = &add_story->link[i];
+			struct tuple *was = memtx_tx_story_older_tuple(link);
+			struct tuple *unused;
+			if (index_replace(index, new_tuple, was,
+					  DUP_INSERT,
+					  &unused) != 0) {
+				diag_log();
+				unreachable();
+				panic("failed to rollback change");
+			}
+
+			memtx_tx_story_unlink(stmt->add_story, i);
+
+		}
+		memtx_tx_story_delete_add_stmt(stmt->add_story);
+	}
+
+	if (del_story != NULL && del_story->del_stmt == stmt) {
+		del_story->del_stmt = stmt->next_in_del_list;
+		stmt->next_in_del_list = NULL;
+	}
+
+	if (del_story_created)
+		memtx_tx_story_delete_del_stmt(stmt->del_story);
+	else
+		stmt->del_story = NULL;
+
+	region_truncate(region, region_svp);
+	return -1;
+}
+
+void
+memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
+{
+	if (stmt->add_story != NULL) {
+		assert(stmt->add_story->tuple == stmt->new_tuple);
+		struct memtx_story *story = stmt->add_story;
+		/* Take the added story out of the chain of every index. */
+		for (uint32_t i = 0; i < story->index_count; i++) {
+			struct memtx_story_link *link = &story->link[i];
+			if (link->newer_story == NULL) {
+				struct tuple *unused;
+				struct index *index = stmt->space->index[i];
+				struct tuple *was = memtx_tx_story_older_tuple(link);
+				if (index_replace(index, story->tuple, was,
+						  DUP_INSERT, &unused) != 0) {
+					diag_log();
+					unreachable();
+					panic("failed to rollback change");
+				}
+			} else {
+				struct memtx_story *newer = link->newer_story;
+				assert(newer->link[i].older.is_story);
+				assert(newer->link[i].older.story == story);
+				memtx_tx_story_unlink(newer, i);
+				if (link->older.is_story) {
+					struct memtx_story *to = link->older.story;
+					memtx_tx_story_link_story(newer, to, i);
+				} else {
+					struct tuple *to = link->older.tuple;
+					memtx_tx_story_link_tuple(newer, to, i);
+				}
+			}
+			memtx_tx_story_unlink(story, i);
+		}
+		stmt->add_story->add_stmt = NULL;
+		memtx_tx_story_delete(stmt->add_story);
+		stmt->add_story = NULL;
+		tuple_unref(stmt->new_tuple);
+	}
+
+	if (stmt->del_story != NULL) {
+		struct memtx_story *story = stmt->del_story;
+		struct txn_stmt **prev = &story->del_stmt;
+		while (*prev != stmt) {
+			prev = &(*prev)->next_in_del_list;
+			assert(*prev != NULL);
+		}
+		*prev = stmt->next_in_del_list;
+		stmt->next_in_del_list = NULL;
+		/* Other deleters stay listed; drop only our prepare mark. */
+		if (story->del_psn == stmt->txn->psn)
+			story->del_psn = 0;
+		stmt->del_story = NULL;
+	}
+}
+
+void
+memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
+{
+	assert(stmt->txn->psn != 0);
+
+	/* Move story to the past to prepared stories. */
+
+	struct memtx_story *story = stmt->add_story;
+	uint32_t index_count = story == NULL ? 0 : story->index_count;
+	/*
+	 * Note that if stmt->add_story == NULL, the index_count is set to 0,
+	 * and we will not enter the loop.
+	 */
+	for (uint32_t i = 0; i < index_count; ) {
+		if (!story->link[i].older.is_story) {
+			/* tuple is old. */
+			i++;
+			continue;
+		}
+		bool old_story_is_prepared = false;
+		struct memtx_story *old_story = story->link[i].older.story;
+		if (old_story->del_psn != 0) {
+			/* if psn is set, the change is prepared. */
+			old_story_is_prepared = true;
+		} else if (old_story->add_psn != 0) {
+			/* if psn is set, the change is prepared. */
+			old_story_is_prepared = true;
+		} else if (old_story->add_stmt == NULL) {
+			/* ancient. */
+			old_story_is_prepared = true;
+		} else if (old_story->add_stmt->txn == stmt->txn) {
+			/* added by us. */
+		}
+
+		if (old_story_is_prepared) {
+			struct tx_read_tracker *tracker;
+			rlist_foreach_entry(tracker, &old_story->reader_list,
+					    in_reader_list) {
+				if (tracker->reader == stmt->txn)
+					continue;
+				if (tracker->reader->status != TXN_INPROGRESS)
+					continue;
+				memtx_tx_handle_conflict(stmt->txn,
+							 tracker->reader);
+			}
+			i++;
+			continue;
+		}
+
+		if (old_story->add_stmt->does_require_old_tuple || i != 0)
+			old_story->add_stmt->txn->status = TXN_CONFLICTED;
+
+		/* Swap story and old story. */
+		struct memtx_story_link *link = &story->link[i];
+		if (link->newer_story == NULL) {
+			/* we have to replace the tuple in index. */
+			struct tuple *unused;
+			struct index *index = stmt->space->index[i];
+			if (index_replace(index, story->tuple, old_story->tuple,
+					  DUP_INSERT, &unused) != 0) {
+				diag_log();
+				panic("failed to rollback change");
+			}
+		} else {
+			struct memtx_story *newer = link->newer_story;
+			assert(newer->link[i].older.is_story);
+			assert(newer->link[i].older.story == story);
+			memtx_tx_story_unlink(newer, i);
+			memtx_tx_story_link_story(newer, old_story, i);
+		}
+
+		memtx_tx_story_unlink(story, i);
+		if (old_story->link[i].older.is_story) {
+			struct memtx_story *to =
+				old_story->link[i].older.story;
+			memtx_tx_story_unlink(old_story, i);
+			memtx_tx_story_link_story(story, to, i);
+		} else {
+			struct tuple *to =
+				old_story->link[i].older.tuple;
+			memtx_tx_story_unlink(old_story, i);
+			memtx_tx_story_link_tuple(story, to, i);
+		}
+
+		memtx_tx_story_link_story(old_story, story, i);
+
+		if (i == 0) {
+			assert(stmt->del_story == old_story);
+			assert(story->link[0].older.is_story ||
+			       story->link[0].older.tuple == NULL);
+
+			struct txn_stmt *dels = old_story->del_stmt;
+			assert(dels != NULL);
+			do {
+				if (dels->txn != stmt->txn)
+					dels->txn->status = TXN_CONFLICTED;
+				dels->del_story = NULL;
+				struct txn_stmt *next = dels->next_in_del_list;
+				dels->next_in_del_list = NULL;
+				dels = next;
+			} while (dels != NULL);
+			old_story->del_stmt = NULL;
+
+			if (story->link[0].older.is_story) {
+				struct memtx_story *oldest_story =
+					story->link[0].older.story;
+				dels = oldest_story->del_stmt;
+				while (dels != NULL) {
+					assert(dels->txn != stmt->txn);
+					dels->del_story = NULL;
+					struct txn_stmt *next =
+						dels->next_in_del_list;
+					dels->next_in_del_list = NULL;
+					dels = next;
+				}
+				oldest_story->del_stmt = stmt;
+				stmt->del_story = oldest_story;
+			}
+		}
+	}
+	if (stmt->add_story != NULL)
+		stmt->add_story->add_psn = stmt->txn->psn;
+
+	if (stmt->del_story != NULL)
+		stmt->del_story->del_psn = stmt->txn->psn;
+}
+
+ssize_t
+memtx_tx_history_commit_stmt(struct txn_stmt *stmt)
+{
+	size_t res = 0;
+	if (stmt->add_story != NULL) {
+		assert(stmt->add_story->add_stmt == stmt);
+		res += stmt->add_story->tuple->bsize;
+		stmt->add_story->add_stmt = NULL;
+		stmt->add_story = NULL;
+	}
+	if (stmt->del_story != NULL) {
+		assert(stmt->del_story->del_stmt == stmt);
+		assert(stmt->next_in_del_list == NULL);
+		res -= stmt->del_story->tuple->bsize;
+		tuple_unref(stmt->del_story->tuple);
+		stmt->del_story->del_stmt = NULL;
+		stmt->del_story = NULL;
+	}
+	return res;
+}
+
+struct tuple *
+memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,
+			    struct tuple *tuple, uint32_t index,
+			    uint32_t mk_index, bool is_prepared_ok)
+{
+	assert(tuple->is_dirty);
+	struct memtx_story *story = memtx_tx_story_get(tuple);
+	bool own_change = false;
+	struct tuple *result = NULL;
+
+	while (true) {
+		if (memtx_tx_story_is_visible(story, txn, &result,
+					      is_prepared_ok, &own_change)) {
+			break;
+		}
+		if (story->link[index].older.is_story) {
+			story = story->link[index].older.story;
+		} else {
+			result = story->link[index].older.tuple;
+			break;
+		}
+	}
+	if (!own_change)
+		memtx_tx_track_read(txn, space, tuple);
+	(void)mk_index; /* TODO: multiindex */
+	return result;
+}
+
+static void
+memtx_tx_story_delete(struct memtx_story *story)
+{
+	assert(story->add_stmt == NULL);
+	assert(story->del_stmt == NULL);
+
+	if (txm.traverse_all_stories == &story->in_all_stories)
+		txm.traverse_all_stories = rlist_next(txm.traverse_all_stories);
+	rlist_del(&story->in_all_stories);
+	rlist_del(&story->in_space_stories);
+
+	mh_int_t pos = mh_history_find(txm.history, story->tuple, 0);
+	assert(pos != mh_end(txm.history));
+	mh_history_del(txm.history, pos, 0);
+
+	story->tuple->is_dirty = false;
+	tuple_unref(story->tuple);
+
+#ifndef NDEBUG
+	/* Expecting to delete fully unlinked story. */
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		assert(story->link[i].newer_story == NULL);
+		assert(story->link[i].older.is_story == false);
+		assert(story->link[i].older.tuple == NULL);
+	}
+#endif
+
+	struct mempool *pool = &txm.memtx_tx_story_pool[story->index_count];
+	mempool_free(pool, story);
+}
+
+int
+memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
+{
+	if (tuple == NULL)
+		return 0;
+	if (txn == NULL)
+		return 0;
+	if (space == NULL)
+		return 0;
+
+	struct memtx_story *story;
+	struct tx_read_tracker *tracker = NULL;
+
+	if (!tuple->is_dirty) {
+		story = memtx_tx_story_new(space, tuple);
+		if (story == NULL)
+			return -1;
+		size_t sz;
+		tracker = region_alloc_object(&txn->region,
+					      struct tx_read_tracker, &sz);
+		if (tracker == NULL) {
+			diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+			memtx_tx_story_delete(story);
+			return -1;
+		}
+		tracker->reader = txn;
+		tracker->story = story;
+		rlist_add(&story->reader_list, &tracker->in_reader_list);
+		rlist_add(&txn->read_set, &tracker->in_read_set);
+		return 0;
+	}
+	story = memtx_tx_story_get(tuple);
+
+	struct rlist *r1 = story->reader_list.next;
+	struct rlist *r2 = txn->read_set.next;
+	while (r1 != &story->reader_list && r2 != &txn->read_set) {
+		tracker = rlist_entry(r1, struct tx_read_tracker,
+				      in_reader_list);
+		assert(tracker->story == story);
+		if (tracker->reader == txn)
+			break;
+		tracker = rlist_entry(r2, struct tx_read_tracker,
+				      in_read_set);
+		assert(tracker->reader == txn);
+		if (tracker->story == story)
+			break;
+		tracker = NULL;
+		r1 = r1->next;
+		r2 = r2->next;
+	}
+	if (tracker != NULL) {
+		/* Move to the beginning of a list for faster further lookups.*/
+		rlist_del(&tracker->in_reader_list);
+		rlist_del(&tracker->in_read_set);
+	} else {
+		size_t sz;
+		tracker = region_alloc_object(&txn->region,
+					      struct tx_read_tracker, &sz);
+		if (tracker == NULL) {
+			diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+			return -1;
+		}
+		tracker->reader = txn;
+		tracker->story = story;
+	}
+	rlist_add(&story->reader_list, &tracker->in_reader_list);
+	rlist_add(&txn->read_set, &tracker->in_read_set);
+	return 0;
+}
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index 6143a22..4670ebe 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -31,6 +31,12 @@
  */
 
 #include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "small/rlist.h"
+#include "index.h"
+#include "tuple.h"
 
 #include "small/rlist.h"
 
@@ -61,6 +67,119 @@ struct tx_conflict_tracker {
 };
 
 /**
+ * Record that links a transaction and a story that the transaction has read.
+ */
+struct tx_read_tracker {
+	/** The TX that read story. */
+	struct txn *reader;
+	/** The story that was read by reader. */
+	struct memtx_story *story;
+	/** Link in story->reader_list. */
+	struct rlist in_reader_list;
+	/** Link in reader->read_set. */
+	struct rlist in_read_set;
+};
+
+/**
+ * Pointer to tuple or story.
+ */
+struct memtx_story_or_tuple {
+	/** Flag whether it's a story. */
+	bool is_story;
+	union {
+		/** Pointer to story, it must be reverse linked. */
+		struct memtx_story *story;
+		/** Smart pointer to tuple: the tuple is referenced if set. */
+		struct tuple *tuple;
+	};
+};
+
+/**
+ * Link that connects a memtx_story with older and newer stories of the same
+ * key in index.
+ */
+struct memtx_story_link {
+	/** Story that happened after this story had ended. */
+	struct memtx_story *newer_story;
+	/**
+	 * Older story or ancient tuple (so old that its story was lost).
+	 * In case of a tuple it can also be NULL.
+	 */
+	struct memtx_story_or_tuple older;
+};
+
+/**
+ * A part of a history of a value in space.
+ * It's a story about a tuple, from the point it was added to space to the
+ * point when it was deleted from a space.
+ * All stories are linked into a list of stories of the same key of each index.
+ */
+struct memtx_story {
+	/** The story is about this tuple. The tuple is referenced. */
+
+	struct tuple *tuple;
+	/**
+	 * Statement that told this story. Is set to NULL when the statement's
+	 * transaction becomes committed. Can also be NULL if we don't know who
+	 * introduced that story, the tuple was added by a transaction that
+	 * was completed and destroyed some time ago.
+	 */
+	struct txn_stmt *add_stmt;
+	/**
+	 * Prepare sequence number of add_stmt's transaction. Is set when
+	 * the transaction is prepared. Can be 0 if the transaction is
+	 * in progress or we don't know who introduced that story.
+	 */
+	int64_t add_psn;
+	/**
+	 * Statement that ended this story. Is set to NULL when the statement's
+	 * transaction becomes committed. Can also be NULL if the tuple has not
+	 * been deleted yet.
+	 */
+	struct txn_stmt *del_stmt;
+	/**
+	 * Prepare sequence number of del_stmt's transaction. Is set when
+	 * the transaction is prepared. Can be 0 if the transaction is
+	 * in progress or if nobody has deleted the tuple.
+	 */
+	int64_t del_psn;
+	/**
+	 * List of trackers - transactions that have read this tuple.
+	 */
+	struct rlist reader_list;
+	/**
+	 * Link in tx_manager::all_stories
+	 */
+	struct rlist in_all_stories;
+	/**
+	 * Link in space::memtx_stories.
+	 */
+	struct rlist in_space_stories;
+	/**
+	 * The space where the tuple is supposed to be.
+	 */
+	struct space *space;
+	/**
+	 * Number of indexes in this space - and the count of link[].
+	 */
+	uint32_t index_count;
+	/**
+	 * Link with older and newer stories (and just tuples) for each
+	 * index respectively.
+	 */
+	struct memtx_story_link link[];
+};
+
+/**
+ * Snapshot cleaner is a short part of history that is supposed to clarify
+ * tuples in an index snapshot. It's also supposed to be used in another
+ * thread while common clarify would probably crash in that case.
+ */
+struct memtx_tx_snapshot_cleaner {
+	struct mh_snapshot_cleaner_t *ht;
+};
+
+/**
  * Initialize memtx transaction manager.
  */
 void
@@ -94,6 +213,104 @@ memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim);
 void
 memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim);
 
+/**
+ * @brief Add a statement to transaction manager's history.
+ * Until unlinking or releasing, the space could internally contain
+ * wrong tuples and must be cleaned through a memtx_tx_tuple_clarify call.
+ * With that clarifying the statement will be visible to the current
+ * transaction, but invisible to all others.
+ * Follows signature of @sa memtx_space_replace_all_keys .
+ *
+ * @param stmt current statement.
+ * @param old_tuple the tuple that should be removed (can be NULL).
+ * @param new_tuple the tuple that should be inserted (can be NULL).
+ * @param mode      dup_replace_mode, used only if new_tuple is not
+ *                  NULL and old_tuple is NULL, and only for the
+ *                  primary key.
+ * @param result - old or replaced tuple.
+ * @return 0 on success, -1 on error (diag is set).
+ */
+int
+memtx_tx_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple,
+			  struct tuple *new_tuple, enum dup_replace_mode mode,
+			  struct tuple **result);
+
+/**
+ * @brief Rollback (undo) a statement from transaction manager's history.
+ * It just makes the statement invisible to all.
+ * Prepared statements could also be removed, but for consistency all later
+ * prepared statements must also be rolled back.
+ *
+ * @param stmt current statement.
+ */
+void
+memtx_tx_history_rollback_stmt(struct txn_stmt *stmt);
+
+/**
+ * @brief Prepare statement in history for further commit.
+ * Prepared statements are still invisible for read-only transactions
+ * but are visible to all read-write transactions.
+ * Prepared and in-progress transactions use the same links for creating
+ * chains of stories in history. The difference is that the order of
+ * prepared transactions is fixed while in-progress transactions are
+ * added to the end of list in any order. Thus to switch to prepared
+ * we have to reorder story in such a way that current story will be
+ * between earlier prepared stories and in-progress stories. That's what
+ * this function does.
+ *
+ * @param stmt current statement.
+ */
+void
+memtx_tx_history_prepare_stmt(struct txn_stmt *stmt);
+
+/**
+ * @brief Commit statement in history.
+ * Make the statement's changes permanent. It becomes visible to all.
+ *
+ * @param stmt current statement.
+ * @return the change in space bsize.
+ */
+ssize_t
+memtx_tx_history_commit_stmt(struct txn_stmt *stmt);
+
+/** Helper of memtx_tx_tuple_clarify */
+struct tuple *
+memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,
+			    struct tuple *tuple, uint32_t index,
+			    uint32_t mk_index, bool is_prepared_ok);
+
+/**
+ * Record in TX manager that a transaction @txn has read a @tuple in @space.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple);
+
+/**
+ * Clean a tuple if it's dirty - finds a visible tuple in history.
+ * @param txn - current transaction.
+ * @param space - space in which the tuple was found.
+ * @param tuple - tuple to clean.
+ * @param index - index number.
+ * @param mk_index - multikey index (if the index is multikey).
+ * @param is_prepared_ok - allow to return prepared tuples.
+ * @return clean tuple (can be NULL).
+ */
+static inline struct tuple *
+memtx_tx_tuple_clarify(struct txn *txn, struct space *space,
+		       struct tuple *tuple, uint32_t index,
+		       uint32_t mk_index, bool is_prepared_ok)
+{
+	if (!memtx_tx_manager_use_mvcc_engine)
+		return tuple;
+	if (!tuple->is_dirty) {
+		memtx_tx_track_read(txn, space, tuple);
+		return tuple;
+	}
+	return memtx_tx_tuple_clarify_slow(txn, space, tuple, index, mk_index,
+					   is_prepared_ok);
+}
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/space.c b/src/box/space.c
index 1d375cc..1243932 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -210,6 +210,7 @@ space_create(struct space *space, struct engine *engine,
 			 "constraint_ids");
 		goto fail;
 	}
+	rlist_create(&space->memtx_stories);
 	return 0;
 
 fail_free_indexes:
@@ -252,6 +253,7 @@ space_new_ephemeral(struct space_def *def, struct rlist *key_list)
 void
 space_delete(struct space *space)
 {
+	rlist_del(&space->memtx_stories);
 	assert(space->ck_constraint_trigger == NULL);
 	for (uint32_t j = 0; j <= space->index_id_max; j++) {
 		struct index *index = space->index_map[j];
diff --git a/src/box/space.h b/src/box/space.h
index bbdd3ef..7cfba65 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -239,6 +239,10 @@ struct space {
 	 * Hash table with constraint identifiers hashed by name.
 	 */
 	struct mh_strnptr_t *constraint_ids;
+	/**
+	 * List of all tx stories in the space.
+	 */
+	struct rlist memtx_stories;
 };
 
 /** Initialize a base space instance. */
diff --git a/src/box/txn.c b/src/box/txn.c
index 976e17c..023da91 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -110,6 +110,9 @@ txn_stmt_new(struct region *region)
 	stmt->space = NULL;
 	stmt->old_tuple = NULL;
 	stmt->new_tuple = NULL;
+	stmt->add_story = NULL;
+	stmt->del_story = NULL;
+	stmt->next_in_del_list = NULL;
 	stmt->engine_savepoint = NULL;
 	stmt->row = NULL;
 	stmt->has_triggers = false;
@@ -194,6 +197,7 @@ txn_new(void)
 	}
 	assert(region_used(&region) == sizeof(*txn));
 	txn->region = region;
+	rlist_create(&txn->read_set);
 	rlist_create(&txn->conflict_list);
 	rlist_create(&txn->conflicted_by_list);
 	rlist_create(&txn->in_read_view_txs);
@@ -206,6 +210,14 @@ txn_new(void)
 inline static void
 txn_free(struct txn *txn)
 {
+	struct tx_read_tracker *tracker, *tmp;
+	rlist_foreach_entry_safe(tracker, &txn->read_set,
+				 in_read_set, tmp) {
+		rlist_del(&tracker->in_reader_list);
+		rlist_del(&tracker->in_read_set);
+	}
+	assert(rlist_empty(&txn->read_set));
+
 	struct tx_conflict_tracker *entry, *next;
 	rlist_foreach_entry_safe(entry, &txn->conflict_list,
 				 in_conflict_list, next) {
diff --git a/src/box/txn.h b/src/box/txn.h
index f957d1e..ba818d0 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -36,6 +36,7 @@
 #include "trigger.h"
 #include "fiber.h"
 #include "space.h"
+#include "tuple.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -172,6 +173,27 @@ struct txn_stmt {
 	struct space *space;
 	struct tuple *old_tuple;
 	struct tuple *new_tuple;
+	/**
+	 * If new_tuple != NULL and this transaction was not prepared,
+	 * this member holds added story of the new_tuple.
+	 */
+	struct memtx_story *add_story;
+	/**
+	 * If new_tuple == NULL and this transaction was not prepared,
+	 * this member holds added story of the old_tuple.
+	 */
+	struct memtx_story *del_story;
+	/**
+	 * Link in memtx_story::del_stmt linked list.
+	 * Only one prepared TX can delete a tuple and a story. But
+	 * when there are several in-progress transactions and they delete
+	 * the same tuple we have to store several delete statements in one
+	 * story. It's implemented in that way: story has a pointer to the first
+	 * deleting statement, that statement has a pointer to the next etc,
+	 * with NULL in the end.
+	 * This member is the pointer to the next deleting statement.
+	 */
+	struct txn_stmt *next_in_del_list;
 	/** Engine savepoint for the start of this statement. */
 	void *engine_savepoint;
 	/** Redo info: the binary log row */
@@ -354,6 +376,8 @@ struct txn {
 	 * Link in tx_manager::read_view_txs.
 	 */
 	struct rlist in_read_view_txs;
+	/** List of tx_read_trackers with stories that the TX has read. */
+	struct rlist read_set;
 };
 
 static inline bool
-- 
2.7.4



More information about the Tarantool-patches mailing list