[Tarantool-patches] [PATCH 12/15] tx: introduce txm_story

Aleksandr Lyapunov alyapunov at tarantool.org
Fri Jul 3 09:33:14 MSK 2020


---
 src/box/txn.c | 516 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/box/txn.h | 129 +++++++++++++++
 2 files changed, 644 insertions(+), 1 deletion(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 79384f0..26377c6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -35,13 +35,44 @@
 #include <fiber.h>
 #include "xrow.h"
 #include "errinj.h"
+#include "small/mempool.h"
+
+static uint32_t
+txm_story_key_hash(const struct tuple *a)
+{
+	uintptr_t u = (uintptr_t)a;
+	if (sizeof(uintptr_t) <= sizeof(uint32_t))
+		return u;
+	else
+		return u ^ (u >> 32);
+}
+
+#define mh_name _history
+#define mh_key_t struct tuple *
+#define mh_node_t struct txm_story *
+#define mh_arg_t int
+#define mh_hash(a, arg) (txm_story_key_hash((*(a))->tuple))
+#define mh_hash_key(a, arg) (txm_story_key_hash(a))
+#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple)
+#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple)
+#define MH_SOURCE
+#include "salad/mhash.h"
 
 struct tx_manager
 {
 	/** Last prepare-sequence-number that was assigned to preped TX. */
 	int64_t last_psn;
+	/** Mempools for txm_story objects, one pool per index count. */
+	struct mempool txm_story_pool[BOX_INDEX_MAX];
+	/** Hash table tuple -> txm_story of that tuple. */
+	struct mh_history_t *history;
+	/** List of all txm_story objects. */
+	struct rlist all_stories;
+	/** Iterator that sequentially traverses all txm_story objects. */
+	struct rlist *traverse_all_stories;
 };
 
+/** The one and only instance of tx_manager. */
 static struct tx_manager txm;
 
 struct tx_conflict_tracker {
@@ -119,6 +150,9 @@ txn_stmt_new(struct region *region)
 	stmt->space = NULL;
 	stmt->old_tuple = NULL;
 	stmt->new_tuple = NULL;
+	stmt->add_story = NULL;
+	stmt->del_story = NULL;
+	stmt->next_in_del_list = NULL;
 	stmt->engine_savepoint = NULL;
 	stmt->row = NULL;
 	stmt->has_triggers = false;
@@ -1022,11 +1056,23 @@ txn_on_yield(struct trigger *trigger, void *event)
 void
 tx_manager_init()
 {
+	for (size_t i = 0; i < BOX_INDEX_MAX; i++) {
+		size_t item_size = sizeof(struct txm_story) +
+			i * sizeof(struct txm_story_link);
+		mempool_create(&txm.txm_story_pool[i],
+			       cord_slab_cache(), item_size);
+	}
+	txm.history = mh_history_new();
+	rlist_create(&txm.all_stories);
+	txm.traverse_all_stories = &txm.all_stories;
 }
 
 void
 tx_manager_free()
 {
+	mh_history_delete(txm.history);
+	for (size_t i = 0; i < BOX_INDEX_MAX; i++)
+		mempool_destroy(&txm.txm_story_pool[i]);
 }
 
 int
@@ -1070,4 +1116,472 @@ txm_cause_conflict(struct txn *wreaker, struct txn *victim)
 	rlist_add(&wreaker->conflict_list, &tracker->in_conflict_list);
 	rlist_add(&wreaker->conflicted_by_list, &tracker->in_conflicted_by_list);
 	return 0;
-}
\ No newline at end of file
+}
+
+struct txm_story *
+txm_story_new(struct tuple *tuple, struct txn_stmt *stmt, uint32_t index_count)
+{
+	assert(!tuple->is_dirty);
+	assert(index_count < BOX_INDEX_MAX);
+	struct mempool *pool = &txm.txm_story_pool[index_count];
+	struct txm_story *story = (struct txm_story *)mempool_alloc(pool);
+	if (story == NULL) {
+		size_t item_size = sizeof(struct txm_story) +
+			index_count * sizeof(struct txm_story_link);
+		diag_set(OutOfMemory, item_size,
+			 "tx_manager", "tx story");
+		return story;
+	}
+	story->tuple = tuple;
+
+	const struct txm_story **put_story = (const struct txm_story **)&story;
+	struct txm_story **empty = NULL;
+	mh_int_t pos = mh_history_put(txm.history, put_story, &empty, 0);
+	if (pos == mh_end(txm.history)) {
+		mempool_free(pool, story);
+		diag_set(OutOfMemory, pos + 1,
+			 "tx_manager", "tx history hash table");
+		return NULL;
+	}
+	tuple->is_dirty = true;
+	tuple_ref(tuple);
+
+	story->index_count = index_count;
+	story->add_stmt = stmt;
+	story->add_psn = 0;
+	story->del_stmt = NULL;
+	story->del_psn = 0;
+	rlist_create(&story->reader_list);
+	rlist_add(&txm.all_stories, &story->in_all_stories);
+	memset(story->link, 0, sizeof(story->link[0]) * index_count);
+	return story;
+}
+
+/** Temporary object, allocated on a region, that stores a conflicting TX. */
+struct txn_conflict
+{
+	struct txn *wreaker;
+	struct txn_conflict *next;
+};
+
+int
+txm_check_and_link_add_story(struct txm_story *story, struct txn_stmt *stmt,
+			     enum dup_replace_mode mode)
+{
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		assert(!story->link[i].is_old_story);
+		struct tuple *next_tuple = story->link[i].old_tuple;
+		if (next_tuple != NULL && next_tuple->is_dirty) {
+			struct txm_story *next = txm_story_get(next_tuple);
+			assert(next->link[i].new_story == NULL);
+			story->link[i].is_old_story = true;
+			story->link[i].old_story = next;
+			next->link[i].new_story = story;
+		}
+	}
+
+	struct region *region = &stmt->txn->region;
+	size_t region_svp = region_used(region);
+	struct txn_conflict *collected_conflicts = NULL;
+
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		struct tuple *visible = NULL;
+		struct txm_story *node = story;
+		while (true) {
+			if (!node->link[i].is_old_story) {
+				/*
+				 * the tuple is so old that we don't
+				 * know its story.
+				 */
+				visible = node->link[i].old_tuple;
+				assert(visible == NULL || !visible->is_dirty);
+				break;
+			}
+			node = node->link[i].old_story;
+
+			if (node->del_psn != 0) {
+				/* deleted by at least prepared TX. */
+				break;
+			}
+			if (node->del_stmt != NULL &&
+			    node->del_stmt->txn == stmt->txn)
+				break; /* deleted by us. */
+			if (node->add_psn != 0) {
+				/* added by at least prepared TX. */
+				visible = node->tuple;
+				break;
+			}
+			if (node->add_stmt == NULL) {
+				/*
+				 * the tuple is so old that we lost
+				 * the beginning of its story.
+				 */
+				visible = node->tuple;
+				break;
+			}
+			if (node->add_stmt->txn == stmt->txn) {
+				/* added by us. */
+				visible = node->tuple;
+				break;
+			}
+			/*
+			 * We skip the story, but if the story gets committed
+			 * before our TX, that may cause a conflict.
+			 * The conflict will be unavoidable if this statement
+			 * relies on old_tuple. If not (it's a replace),
+			 * the conflict will take place only for secondary
+			 * index if the story will not be overwritten in primary
+			 * index.
+			 */
+			bool cross_conflict = false;
+			if (stmt->preserve_old_tuple) {
+				cross_conflict = true;
+			} else if (i != 0) {
+				struct txm_story *look_up = node;
+				cross_conflict = true;
+				while (look_up->link[0].new_story != NULL) {
+					struct txm_story *over;
+					over = look_up->link[0].new_story;
+					if (over->add_stmt->txn == stmt->txn) {
+						cross_conflict = false;
+						break;
+					}
+					look_up = over;
+				}
+			}
+			if (!cross_conflict)
+				continue;
+			size_t err_size;
+			struct txn_conflict *next_conflict;
+			next_conflict =
+				region_alloc_object(region,
+						    struct txn_conflict,
+						    &err_size);
+			if (next_conflict == NULL) {
+				diag_set(OutOfMemory, err_size,
+					 "txn_region", "txn conflict");
+				goto fail;
+			}
+			next_conflict->wreaker = node->add_stmt->txn;
+			next_conflict->next = collected_conflicts;
+			collected_conflicts = next_conflict;
+		}
+
+		int errcode;
+		errcode = replace_check_dup(stmt->old_tuple, visible,
+					    i == 0 ? mode : DUP_INSERT);
+		if (errcode != 0) {
+			struct space *sp = stmt->space;
+			if (sp != NULL)
+				diag_set(ClientError, errcode,
+					 sp->index[i]->def->name,
+					 space_name(sp));
+			goto fail;
+		}
+	}
+
+	if (story->link[0].is_old_story) {
+		stmt->next_in_del_list = story->link[0].old_story->del_stmt;
+		story->link[0].old_story->del_stmt = stmt;
+		for (uint32_t i = 0; i < story->index_count; i++) {
+			if (story->link[i].is_old_story)
+				continue;
+			if (story->link[i].old_tuple != NULL)
+				tuple_ref(story->link[i].old_tuple);
+		}
+	} else if (story->link[0].old_tuple != NULL) {
+		struct tuple *old_tuple = story->link[0].old_tuple;
+		struct txm_story *del_story;
+		del_story = txm_story_new(old_tuple, NULL, story->index_count);
+		if (del_story == NULL)
+			goto fail;
+		del_story->del_stmt = stmt;
+		for (uint32_t i = 0; i < story->index_count; i++) {
+			if (story->link[i].is_old_story)
+				continue;
+			if (story->link[i].old_tuple == old_tuple) {
+				story->link[i].is_old_story = true;
+				story->link[i].old_story = del_story;
+			} else if (story->link[i].old_tuple != NULL) {
+				tuple_ref(story->link[i].old_tuple);
+			}
+		}
+	}
+
+	while (collected_conflicts != NULL) {
+		if (txm_cause_conflict(collected_conflicts->wreaker,
+				       stmt->txn) != 0) {
+			goto fail;
+		}
+		collected_conflicts = collected_conflicts->next;
+	}
+	stmt->add_story = story;
+
+	region_truncate(region, region_svp);
+	return 0;
+
+fail:
+	for (uint32_t j = story->index_count; j > 0; j--) {
+		uint32_t i = j - 1;
+		if (story->link[i].is_old_story) {
+			struct txm_story *next = story->link[i].old_story;
+			story->link[i].is_old_story = false;
+			story->link[i].old_tuple = next->tuple;
+			next->link[i].new_story = NULL;
+		}
+	}
+	region_truncate(region, region_svp);
+	return -1;
+}
+
+int
+txm_link_del_story(struct tuple *old_tuple, struct txn_stmt *stmt,
+		   uint32_t index_count)
+{
+	if (old_tuple->is_dirty) {
+		struct txm_story *story = txm_story_get(old_tuple);
+		stmt->next_in_del_list = story->del_stmt;
+		story->del_stmt = stmt;
+		stmt->del_story = story;
+		return 0;
+	}
+	struct txm_story *del_story;
+	del_story = txm_story_new(old_tuple, NULL, index_count);
+	if (del_story == NULL)
+		return -1;
+	del_story->del_stmt = stmt;
+	stmt->del_story = del_story;
+	return 0;
+}
+
+void
+txm_unlink_add_story(struct txn_stmt *stmt)
+{
+	assert(stmt->add_story != NULL);
+
+	struct txm_story *story = stmt->add_story;
+
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		struct txm_story_link *from = &story->link[i];
+		if (from->new_story == NULL) {
+			struct tuple *unused;
+			struct index *index = stmt->space->index[i];
+			struct tuple *rollback = from->is_old_story ?
+					 from->old_story->tuple :
+					 from->old_tuple;
+			if (index_replace(index, story->tuple, rollback,
+					  DUP_INSERT, &unused) != 0) {
+				diag_log();
+				unreachable();
+				panic("failed to rollback change");
+			}
+			if (i == 0 && rollback != NULL)
+				tuple_ref(rollback);
+		} else {
+			struct txm_story *new_story = from->new_story;
+			struct txm_story_link *to = &new_story->link[i];
+			assert(to->is_old_story);
+			assert(to->old_story == story);
+			to->is_old_story = from->is_old_story;
+			if (from->is_old_story) {
+				to->old_story = from->old_story;
+				from->old_story = NULL;
+			} else {
+				to->old_tuple = from->old_tuple;
+				from->old_tuple = NULL;
+			}
+			from->is_old_story = false;
+		}
+	}
+
+	stmt->add_story = NULL;
+}
+
+void
+txm_unlink_del_story(struct txn_stmt *stmt)
+{
+	assert(stmt->del_story != NULL);
+
+	struct txm_story *story = stmt->del_story;
+
+	struct txn_stmt **prev = &story->del_stmt;
+	while (*prev != stmt) {
+		prev = &(*prev)->next_in_del_list;
+		assert(*prev != NULL);
+	}
+	*prev = stmt->next_in_del_list;
+
+	stmt->del_story = NULL;
+}
+
+/** Move the story added by @a stmt below unprepared stories, set add_psn. */
+void
+txm_prepare_add_story(struct txn_stmt *stmt)
+{
+	assert(stmt->txn->psn != 0);
+	assert(stmt->add_story != NULL);
+	struct txm_story *story = stmt->add_story;
+	/* Move story to the past to prepared stories. */
+	while (true) {
+		if (!story->link[0].is_old_story)
+			break; /* tuple is prepared. */
+		struct txm_story *old_story = story->link[0].old_story;
+		if (old_story->del_psn != 0)
+			break; /* if psn is set, the change is prepared. */
+		if (old_story->add_psn != 0)
+			break; /* if psn is set, the change is prepared. */
+		if (old_story->add_stmt == NULL)
+			break; /* ancient */
+		if (old_story->add_stmt->txn == stmt->txn)
+			break; /* added by us. */
+
+		/* Swap story and old story. */
+		for (uint32_t i = 0; i < old_story->index_count; i++) {
+			if (!story->link[i].is_old_story ||
+			    story->link[i].old_story != old_story)
+				continue;
+			if (story->link[i].new_story == NULL) {
+				/* we have to replace the tuple in index. */
+				struct tuple *unused;
+				struct index *index = stmt->space->index[i];
+				if (index_replace(index, story->tuple,
+				                  old_story->tuple,
+				                  DUP_INSERT, &unused) != 0) {
+					diag_log();
+					unreachable();
+					panic("failed to rollback change");
+				}
+			} else {
+				assert(story->link[i].new_story->link[i].old_story == story);
+				story->link[i].new_story->link[i].old_story = old_story;
+			}
+			old_story->link[i].new_story = story->link[i].new_story;
+			/* After the swap old_story is the newer neighbour of story. */
+			story->link[i].new_story = old_story;
+			story->link[i].is_old_story = old_story->link[i].is_old_story;
+			if (old_story->link[i].is_old_story) {
+				story->link[i].old_story = old_story->link[i].old_story;
+				assert(old_story->link[i].old_story->link[i].new_story == old_story);
+				old_story->link[i].old_story->link[i].new_story = story;
+			} else {
+				story->link[i].old_tuple = old_story->link[i].old_tuple;
+			}
+			old_story->link[i].is_old_story = true;
+			old_story->link[i].old_story = story;
+		}
+	}
+	story->add_psn = stmt->txn->psn;
+}
+
+void
+txm_let_it_go(struct txn_stmt *stmt)
+{
+	if (stmt->add_story != NULL) {
+		stmt->add_story->add_stmt = NULL;
+		stmt->add_story = NULL;
+	}
+	if (stmt->del_story != NULL) {
+		struct txn_stmt **dels = &stmt->del_story->del_stmt;
+		while ((*dels) != stmt) {
+			assert(*dels != NULL);
+			dels = &(*dels)->next_in_del_list;
+		}
+		*dels = (*dels)->next_in_del_list;
+		stmt->del_story = NULL;
+	}
+}
+
+struct txm_story *
+txm_story_get(struct tuple *tuple)
+{
+	assert(tuple->is_dirty);
+
+	mh_int_t pos = mh_history_find(txm.history, tuple, 0);
+	assert(pos != mh_end(txm.history));
+	return *mh_history_node(txm.history, pos);
+}
+
+/**
+ * Walk the story chain of dirty @a tuple in index @a index towards the past
+ * and return the version visible to @a txn, or NULL if none is visible.
+ */
+struct tuple *
+txm_tuple_clarify_slow(struct txn *txn, struct tuple *tuple, uint32_t index,
+                       uint32_t mk_index, bool prepared_ok)
+{
+	struct tuple *result = NULL;
+	struct mh_history_t *hist = txm.history;
+	mh_int_t pos = mh_history_find(hist, tuple, 0);
+	assert(pos != mh_end(hist));
+	struct txm_story *story = *mh_history_node(hist, pos);
+	bool own_change = false;
+	while (true) {
+		struct txn_stmt *dels = story->del_stmt;
+		while (dels != NULL) {
+			if (dels->txn == txn) {
+				/* deleted by us. */
+				own_change = true;
+				break;
+			}
+			dels = dels->next_in_del_list;
+		}
+		if (own_change || (prepared_ok && story->del_psn != 0)) {
+			/* deleted by us or by prepared: nothing to see. */
+			break;
+		}
+		if (story->del_psn != 0 && story->del_stmt == NULL) {
+			/* deleted by committed. */
+			break;
+		}
+
+		if (story->add_stmt != NULL && story->add_stmt->txn == txn) {
+			/* added by us. */
+			result = story->tuple;
+			break;
+		}
+
+		if (prepared_ok && story->add_psn != 0) {
+			/* added by prepared. */
+			result = story->tuple;
+			break;
+		}
+
+		if (story->add_psn != 0 && story->add_stmt == NULL) {
+			/* added by committed. */
+			result = story->tuple;
+			break;
+		}
+		if (!story->link[index].is_old_story)
+			return story->link[index].old_tuple;
+		story = story->link[index].old_story;
+	}
+	(void)own_change; /* TODO: add conflict */
+	(void)mk_index; /* TODO: multiindex */
+	return result;
+}
+/** Drop the references held by @a story and return it to its mempool. */
+void
+txm_story_delete(struct txm_story *story)
+{
+	if (txm.traverse_all_stories == &story->in_all_stories)
+		txm.traverse_all_stories = rlist_next(txm.traverse_all_stories);
+	rlist_del(&story->in_all_stories);
+	/* Clear the flag first: the unref below may drop the last reference. */
+	story->tuple->is_dirty = false;
+	tuple_unref(story->tuple);
+	for (uint32_t i = 0; i < story->index_count; i++) {
+		if (!story->link[i].is_old_story &&
+		    story->link[i].old_tuple != NULL)
+			tuple_unref(story->link[i].old_tuple);
+	}
+
+	/* Pick the pool before poisoning wipes story->index_count. */
+	struct mempool *pool = &txm.txm_story_pool[story->index_count];
+#ifndef NDEBUG
+	const char poison_char = '?';
+	size_t item_size = sizeof(struct txm_story) +
+		story->index_count * sizeof(struct txm_story_link);
+	memset(story, poison_char, item_size);
+#endif
+	mempool_free(pool, story);
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index 92c0116..b5eda1e 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -36,6 +36,7 @@
 #include "trigger.h"
 #include "fiber.h"
 #include "space.h"
+#include "tuple.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -123,6 +124,18 @@ struct txn_stmt {
 	struct space *space;
 	struct tuple *old_tuple;
 	struct tuple *new_tuple;
+	/**
+	 * If new_tuple != NULL and this transaction was not prepared,
+	 * this member holds the story that was added for the new_tuple.
+	 */
+	struct txm_story *add_story;
+	/**
+	 * If new_tuple == NULL and this transaction was not prepared,
+	 * this member holds the story of the old_tuple being deleted.
+	 */
+	struct txm_story *del_story;
+	/** Link in txm_story::del_stmt linked list. */
+	struct txn_stmt *next_in_del_list;
 	/** Engine savepoint for the start of this statement. */
 	void *engine_savepoint;
 	/** Redo info: the binary log row */
@@ -284,6 +297,80 @@ struct txn {
 	struct rlist conflicted_by_list;
 };
 
+/**
+ * Link that connects a txm_story with the older and newer stories of the
+ * same key in one index.
+ */
+struct txm_story_link {
+	/** Story that happened after this story ended, or NULL if none. */
+	struct txm_story *new_story;
+	/** True if old_story is the valid union member, false if old_tuple. */
+	bool is_old_story;
+	union {
+		/** is_old_story = true. Older story of the same key. */
+		struct txm_story *old_story;
+		/**
+		 * is_old_story = false. Tuple that was in the index before
+		 * this story. That tuple is either NULL or so old that
+		 * we don't have a story about it.
+		 */
+
+		struct tuple *old_tuple;
+	};
+};
+
+/**
+ * A part of the history of a value in a space.
+ * It's a story about a tuple, from the point it was added to the space to
+ * the point when it was deleted from the space.
+ * All stories are linked into a list of stories of the same key of each index.
+ */
+struct txm_story {
+	/** The story is about this tuple. */
+	struct tuple *tuple;
+	/**
+	 * Statement that told this story. Is set to NULL when the statement's
+	 * transaction becomes committed. Can also be NULL if we don't know who
+	 * introduced that story.
+	 */
+	struct txn_stmt *add_stmt;
+	/**
+	 * Prepare sequence number of add_stmt's transaction. Is set when
+	 * the transaction is prepared. Can be 0 if the transaction is
+	 * in progress or we don't know who introduced that story.
+	 */
+	int64_t add_psn;
+	/**
+	 * Head of the list of statements that end this story, linked through
+	 * txn_stmt::next_in_del_list. Is set to NULL when the statement's
+	 * transaction is committed; NULL if the tuple has not been deleted.
+	 */
+	struct txn_stmt *del_stmt;
+	/**
+	 * Prepare sequence number of del_stmt's transaction. Is set when
+	 * the transaction is prepared. Can be 0 if the transaction is
+	 * in progress or if nobody has deleted the tuple.
+	 */
+	int64_t del_psn;
+	/**
+	 * List of trackers - transactions that have read this tuple.
+	 */
+	struct rlist reader_list;
+	/**
+	 * Link in tx_manager::all_stories
+	 */
+	struct rlist in_all_stories;
+	/**
+	 * Number of indexes in this space - and the count of link[].
+	 */
+	uint32_t index_count;
+	/**
+	 * Link with older and newer stories (and just tuples) for each
+	 * index respectively.
+	 */
+	struct txm_story_link link[];
+};
+
 static inline bool
 txn_has_flag(struct txn *txn, enum txn_flag flag)
 {
@@ -652,6 +739,48 @@ tx_manager_free();
 int
 txm_cause_conflict(struct txn *wreaker, struct txn *victim);
 
+struct txm_story *
+txm_story_new(struct tuple *tuple, struct txn_stmt *stmt, uint32_t index_count);
+
+int
+txm_check_and_link_add_story(struct txm_story *story, struct txn_stmt *stmt,
+			     enum dup_replace_mode mode);
+
+int
+txm_link_del_story(struct tuple *old_tuple, struct txn_stmt *stmt,
+		   uint32_t index_count);
+
+void
+txm_unlink_add_story(struct txn_stmt *stmt);
+
+void
+txm_unlink_del_story(struct txn_stmt *stmt);
+
+void
+txm_prepare_add_story(struct txn_stmt *stmt);
+
+void
+txm_let_it_go(struct txn_stmt *stmt);
+
+struct txm_story *
+txm_story_get(struct tuple *tuple);
+
+struct tuple *
+txm_tuple_clarify_slow(struct txn *txn, struct tuple *tuple, uint32_t index,
+                       uint32_t mk_index, bool prepared_ok);
+
+static inline struct tuple*
+txm_tuple_clarify(struct txn *txn, struct tuple* tuple, uint32_t index,
+                  uint32_t mk_index, bool prepared_ok)
+{
+	if (!tuple->is_dirty)
+		return tuple;
+	return txm_tuple_clarify_slow(txn, tuple, index, mk_index, prepared_ok);
+}
+
+void
+txm_story_delete(struct txm_story *story);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
-- 
2.7.4



More information about the Tarantool-patches mailing list