[Tarantool-patches] [PATCH v3 10/13] tmx: introduce conflict tracker

Aleksandr Lyapunov alyapunov at tarantool.org
Wed Jul 15 16:55:33 MSK 2020


There are situations when we have to track that if some TX is
committed then some others must be aborted due to conflict.
The common case is that one r/w TX have read some value while the
second is about to overwrite the value; is the second is committed,
the first must be aborted.
Thus we have to store many-to-many TX relations between breaker
TX and victim TX.
The patch implements that.

Part of #4897
---
 src/box/txn.c | 156 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/box/txn.h |  32 ++++++++++++
 2 files changed, 187 insertions(+), 1 deletion(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 7a15986..ba81b58 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -42,6 +42,12 @@ struct tx_manager
 {
 	/** Last prepare-sequence-number that was assigned to prepared TX. */
 	int64_t last_psn;
+	/**
+	 * List of all transactions that are in a read view.
+	 * New transactions are added to the tail of this list,
+	 * so the list is ordered by rv_psn.
+	 */
+	struct rlist read_view_txs;
 };
 
 /** That's a definition, see declaration for description. */
@@ -50,6 +56,21 @@ bool tx_manager_use_mvcc_engine = false;
 /** The one and only instance of tx_manager. */
 static struct tx_manager txm;
 
+/**
+ * Record that links two transactions, breaker and victim.
+ * See txm_cause_conflict for details.
+ */
+struct tx_conflict_tracker {
+	/** TX that aborts victim on commit. */
+	struct txn *breaker;
+	/** TX that aborts will be aborted on breaker's commit. */
+	struct txn *victim;
+	/** Link in breaker->conflict_list. */
+	struct rlist in_conflict_list;
+	/** Link in victim->conflicted_by_list. */
+	struct rlist in_conflicted_by_list;
+};
+
 double too_long_threshold;
 
 /* Txn cache. */
@@ -209,6 +230,9 @@ txn_new(void)
 	}
 	assert(region_used(&region) == sizeof(*txn));
 	txn->region = region;
+	rlist_create(&txn->conflict_list);
+	rlist_create(&txn->conflicted_by_list);
+	rlist_create(&txn->in_read_view_txs);
 	return txn;
 }
 
@@ -218,6 +242,22 @@ txn_new(void)
 inline static void
 txn_free(struct txn *txn)
 {
+	struct tx_conflict_tracker *entry, *next;
+	rlist_foreach_entry_safe(entry, &txn->conflict_list,
+				 in_conflict_list, next) {
+		rlist_del(&entry->in_conflict_list);
+		rlist_del(&entry->in_conflicted_by_list);
+	}
+	rlist_foreach_entry_safe(entry, &txn->conflicted_by_list,
+				 in_conflicted_by_list, next) {
+		rlist_del(&entry->in_conflict_list);
+		rlist_del(&entry->in_conflicted_by_list);
+	}
+	assert(rlist_empty(&txn->conflict_list));
+	assert(rlist_empty(&txn->conflicted_by_list));
+
+	rlist_del(&txn->in_read_view_txs);
+
 	struct txn_stmt *stmt;
 	stailq_foreach_entry(stmt, &txn->stmts, next)
 		txn_stmt_destroy(stmt);
@@ -235,6 +275,8 @@ txn_begin(void)
 	struct txn *txn = txn_new();
 	if (txn == NULL)
 		return NULL;
+	assert(rlist_empty(&txn->conflict_list));
+	assert(rlist_empty(&txn->conflicted_by_list));
 
 	/* Initialize members explicitly to save time on memset() */
 	stailq_create(&txn->stmts);
@@ -245,6 +287,7 @@ txn_begin(void)
 	txn->in_sub_stmt = 0;
 	txn->id = ++tsn;
 	txn->psn = 0;
+	txn->rv_psn = 0;
 	txn->status = TXN_INPROGRESS;
 	txn->signature = TXN_SIGNATURE_ROLLBACK;
 	txn->engine = NULL;
@@ -293,6 +336,15 @@ txn_begin_stmt(struct txn *txn, struct space *space)
 		diag_set(ClientError, ER_SUB_STMT_MAX);
 		return -1;
 	}
+
+	/*
+	 * A conflict have happened; there is no reason to continue the TX.
+	 */
+	if (txn->status == TXN_CONFLICTED) {
+		diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+		return -1;
+	}
+
 	struct txn_stmt *stmt = txn_stmt_new(&txn->region);
 	if (stmt == NULL)
 		return -1;
@@ -647,6 +699,32 @@ txn_journal_entry_new(struct txn *txn)
 	return req;
 }
 
+/**
+ * Handle conflict when @breaker transaction is prepared.
+ * The conflict is happened if @victim have read something that @breaker
+ * overwrites.
+ * If @victim is read-only or haven't made any changes, it should be send
+ * to read view, in which is will not see @breaker.
+ * Otherwise @vistim must be marked as conflicted.
+ */
+static void
+txn_handle_conflict(struct txn *breaker, struct txn *victim)
+{
+	assert(breaker->psn != 0);
+	if (!victim->status != TXN_INPROGRESS) {
+		/* Was conflicted by somebody else. */
+		return;
+	}
+	if (stailq_empty(&victim->stmts)) {
+		/* Send to read view. */
+		victim->rv_psn = breaker->psn;
+		rlist_add_tail(&txm.read_view_txs, &victim->in_read_view_txs);
+	} else {
+		/* Mark as conflicted. */
+		victim->status = TXN_CONFLICTED;
+	}
+}
+
 /*
  * Prepare a transaction using engines.
  */
@@ -670,6 +748,17 @@ txn_prepare(struct txn *txn)
 		diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
 		return -1;
 	}
+
+	/*
+	 * Somebody else has written some value that we have read.
+	 * The transaction is not possible.
+	 */
+	if (txn->status == TXN_CONFLICTED || txn->status == TXN_IN_READ_VIEW) {
+		diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+		return -1;
+	}
+	assert(txn->status == TXN_INPROGRESS);
+
 	/*
 	 * Perform transaction conflict resolution. Engine == NULL when
 	 * we have a bunch of IPROTO_NOP statements.
@@ -678,6 +767,26 @@ txn_prepare(struct txn *txn)
 		if (engine_prepare(txn->engine, txn) != 0)
 			return -1;
 	}
+
+	struct tx_conflict_tracker *entry, *next;
+	/* Handle conflicts. */
+	rlist_foreach_entry_safe(entry, &txn->conflict_list,
+				 in_conflict_list, next) {
+		assert(entry->breaker == txn);
+		txn_handle_conflict(txn, entry->victim);
+		rlist_del(&entry->in_conflict_list);
+		rlist_del(&entry->in_conflicted_by_list);
+	}
+	/* Just free conflict list - we don't need it anymore. */
+	rlist_foreach_entry_safe(entry, &txn->conflicted_by_list,
+				 in_conflicted_by_list, next) {
+		assert(entry->victim == txn);
+		rlist_del(&entry->in_conflict_list);
+		rlist_del(&entry->in_conflicted_by_list);
+	}
+	assert(rlist_empty(&txn->conflict_list));
+	assert(rlist_empty(&txn->conflicted_by_list));
+
 	trigger_clear(&txn->fiber_on_stop);
 	if (!txn_has_flag(txn, TXN_CAN_YIELD))
 		trigger_clear(&txn->fiber_on_yield);
@@ -1184,10 +1293,55 @@ txn_on_yield(struct trigger *trigger, void *event)
 void
 tx_manager_init()
 {
-	(void)txm;
+	rlist_create(&txm.read_view_txs);
 }
 
 void
 tx_manager_free()
 {
 }
+
+int
+txm_cause_conflict(struct txn *breaker, struct txn *victim)
+{
+	struct tx_conflict_tracker *tracker = NULL;
+	struct rlist *r1 = breaker->conflict_list.next;
+	struct rlist *r2 = victim->conflicted_by_list.next;
+	while (r1 != &breaker->conflict_list &&
+	       r2 != &victim->conflicted_by_list) {
+		tracker = rlist_entry(r1, struct tx_conflict_tracker,
+				      in_conflict_list);
+		assert(tracker->breaker == breaker);
+		if (tracker->victim == victim)
+			break;
+		tracker = rlist_entry(r2, struct tx_conflict_tracker,
+				      in_conflicted_by_list);
+		assert(tracker->victim == victim);
+		if (tracker->breaker == breaker)
+			break;
+		tracker = NULL;
+		r1 = r1->next;
+		r2 = r2->next;
+	}
+	if (tracker != NULL) {
+		/* Move to the beginning of a list
+		 * for a case of subsequent lookups */
+		rlist_del(&tracker->in_conflict_list);
+		rlist_del(&tracker->in_conflicted_by_list);
+	} else {
+		size_t size;
+		tracker = region_alloc_object(&victim->region,
+					      struct tx_conflict_tracker,
+					      &size);
+		if (tracker == NULL) {
+			diag_set(OutOfMemory, size, "tx region",
+				 "conflict_tracker");
+			return -1;
+		}
+		tracker->breaker = breaker;
+		tracker->victim = victim;
+	}
+	rlist_add(&breaker->conflict_list, &tracker->in_conflict_list);
+	rlist_add(&victim->conflicted_by_list, &tracker->in_conflicted_by_list);
+	return 0;
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index b5b94fd..03ccb76 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -275,6 +275,11 @@ struct txn {
 	 * Transactions are committed in that order.
 	 */
 	int64_t psn;
+	/**
+	 * Read view of that TX. The TX can see only changes with ps < rv_psn.
+	 * Is nonzero if and only if status = TXN_IN_READ_VIEW.
+	 */
+	int64_t rv_psn;
 	/** Status of the TX */
 	enum txn_status status;
 	/** List of statements in a transaction. */
@@ -336,6 +341,22 @@ struct txn {
 	uint32_t fk_deferred_count;
 	/** List of savepoints to find savepoint by name. */
 	struct rlist savepoints;
+	/**
+	 * List of tx_conflict_tracker records where .breaker is the current
+	 * transaction and .victim is the transactions that must be aborted
+	 * if the current transaction is committed.
+	 */
+	struct rlist conflict_list;
+	/**
+	 * List of tx_conflict_tracker records where .victim is the current
+	 * transaction and .breaker is the transactions that, if committed,
+	 * will abort the current transaction.
+	 */
+	struct rlist conflicted_by_list;
+	/**
+	 * Link in tx_manager::read_view_txs.
+	 */
+	struct rlist in_read_view_txs;
 };
 
 static inline bool
@@ -717,6 +738,17 @@ tx_manager_init();
 void
 tx_manager_free();
 
+/**
+ * Notify TX manager that if transaction @breaker is committed then the
+ * transactions @victim must be aborted due to conflict.
+ * For example: there's two rw transaction in progress, one have read
+ * some value while the second is about to overwrite it. If the second
+ * is committed first, the first must be aborted.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+txm_cause_conflict(struct txn *breaker, struct txn *victim);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
-- 
2.7.4



More information about the Tarantool-patches mailing list