[Tarantool-patches] [PATCH v4 06/12] txm: introduce conflict tracker
Aleksandr Lyapunov
alyapunov at tarantool.org
Tue Sep 8 13:22:06 MSK 2020
There are situations when we have to track that if some TX is
committed then some others must be aborted due to a conflict.
The common case is that one r/w TX has read some value while a
second one is about to overwrite that value; if the second is
committed, the first must be aborted.
Thus we have to store many-to-many TX relations between breaker
TXs and victim TXs.
The patch implements that.
Part of #4897
---
src/box/memtx_tx.c | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
src/box/memtx_tx.h | 38 +++++++++++++++++++++++
src/box/txn.c | 63 ++++++++++++++++++++++++++++++++++++++
src/box/txn.h | 21 +++++++++++++
4 files changed, 209 insertions(+), 1 deletion(-)
diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index 479aa48..eb2ab51 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -30,8 +30,20 @@
*/
#include "memtx_tx.h"
+#include <assert.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include "txn.h"
+
struct tx_manager
{
+ /**
+ * List of all transactions that are in a read view.
+ * New transactions are added to the tail of this list,
+ * so the list is ordered by rv_psn.
+ */
+ struct rlist read_view_txs;
};
/** That's a definition, see declaration for description. */
@@ -43,10 +55,84 @@ static struct tx_manager txm;
void
memtx_tx_manager_init()
{
- (void)txm;
+ rlist_create(&txm.read_view_txs);
}
void
memtx_tx_manager_free()
{
}
+
+int
+memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim)
+{
+ struct tx_conflict_tracker *tracker = NULL; /* Existing link, if any. */
+ struct rlist *r1 = breaker->conflict_list.next;
+ struct rlist *r2 = victim->conflicted_by_list.next;
+ while (r1 != &breaker->conflict_list && /* Walk both lists at once; */
+ r2 != &victim->conflicted_by_list) { /* stop when either one ends. */
+ tracker = rlist_entry(r1, struct tx_conflict_tracker,
+ in_conflict_list);
+ assert(tracker->breaker == breaker);
+ if (tracker->victim == victim)
+ break;
+ tracker = rlist_entry(r2, struct tx_conflict_tracker,
+ in_conflicted_by_list);
+ assert(tracker->victim == victim);
+ if (tracker->breaker == breaker)
+ break;
+ tracker = NULL; /* Neither candidate links this pair. */
+ r1 = r1->next;
+ r2 = r2->next;
+ }
+ if (tracker != NULL) {
+ /*
+ * Found: unlink it here; it is re-added at the head of both
+ * lists below, which makes a repeated lookup find it first.
+ */
+ rlist_del(&tracker->in_conflict_list);
+ rlist_del(&tracker->in_conflicted_by_list);
+ } else { /* No link yet: allocate one on the victim's region. */
+ size_t size; /* Reported in the OOM diagnostic below. */
+ tracker = region_alloc_object(&victim->region,
+ struct tx_conflict_tracker,
+ &size);
+ if (tracker == NULL) {
+ diag_set(OutOfMemory, size, "tx region",
+ "conflict_tracker");
+ return -1;
+ }
+ tracker->breaker = breaker;
+ tracker->victim = victim;
+ }
+ rlist_add(&breaker->conflict_list, &tracker->in_conflict_list);
+ rlist_add(&victim->conflicted_by_list, &tracker->in_conflicted_by_list);
+ return 0;
+}
+
+/**
+ * Handle conflict when @breaker transaction is prepared.
+ * The conflict happens if @victim has read something that @breaker
+ * overwrites.
+ * If @victim is read-only or hasn't made any changes, it should be sent
+ * to a read view, in which it will not see @breaker.
+ * Otherwise @victim must be marked as conflicted.
+ */
+void
+memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim)
+{
+ assert(breaker->psn != 0); /* Breaker must already have a psn. */
+ if (victim->status != TXN_INPROGRESS) {
+ /* Not in progress: e.g. already conflicted or in a read view. */
+ return;
+ }
+ if (stailq_empty(&victim->stmts)) {
+ /* No statements yet: send to a read view at breaker's psn. */
+ victim->status = TXN_IN_READ_VIEW;
+ victim->rv_psn = breaker->psn;
+ rlist_add_tail(&txm.read_view_txs, &victim->in_read_view_txs);
+ } else {
+ /* Has statements: mark as conflicted. */
+ victim->status = TXN_CONFLICTED;
+ }
+}
diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
index fb2cb4d..6143a22 100644
--- a/src/box/memtx_tx.h
+++ b/src/box/memtx_tx.h
@@ -32,6 +32,8 @@
#include <stdbool.h>
+#include "small/rlist.h"
+
#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */
@@ -44,6 +46,21 @@ extern "C" {
extern bool memtx_tx_manager_use_mvcc_engine;
/**
+ * Record that links two transactions, breaker and victim.
+ * Created (or reused) by memtx_tx_cause_conflict().
+ */
+struct tx_conflict_tracker {
+ /** TX whose commit must abort (or send to a read view) the victim. */
+ struct txn *breaker;
+ /** TX that is aborted or sent to a read view on breaker's commit. */
+ struct txn *victim;
+ /** Link in breaker->conflict_list. */
+ struct rlist in_conflict_list;
+ /** Link in victim->conflicted_by_list. */
+ struct rlist in_conflicted_by_list;
+};
+
+/**
* Initialize memtx transaction manager.
*/
void
@@ -55,6 +72,27 @@ memtx_tx_manager_init();
void
memtx_tx_manager_free();
+/**
+ * Notify TX manager that if transaction @breaker is committed then the
+ * transaction @victim must be aborted due to conflict.
+ * For example: there are two rw transactions in progress, one has read
+ * some value while the second is about to overwrite it. If the second
+ * is committed first, the first must be aborted.
+ * @return 0 on success, -1 on memory error.
+ */
+int
+memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim);
+
+/**
+ * Handle conflict when @breaker transaction is prepared.
+ * The conflict happens if @victim has read something that @breaker
+ * overwrites.
+ * If @victim is read-only or hasn't made any changes, it should be sent
+ * to a read view, in which it will not see @breaker.
+ * Otherwise @victim must be marked as conflicted.
+ */
+void
+memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim);
#if defined(__cplusplus)
} /* extern "C" */
diff --git a/src/box/txn.c b/src/box/txn.c
index 1dfe59f..976e17c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -29,6 +29,7 @@
* SUCH DAMAGE.
*/
#include "txn.h"
+#include "memtx_tx.h"
#include "txn_limbo.h"
#include "engine.h"
#include "tuple.h"
@@ -193,6 +194,9 @@ txn_new(void)
}
assert(region_used(&region) == sizeof(*txn));
txn->region = region;
+ rlist_create(&txn->conflict_list);
+ rlist_create(&txn->conflicted_by_list);
+ rlist_create(&txn->in_read_view_txs);
return txn;
}
@@ -202,6 +206,22 @@ txn_new(void)
inline static void
txn_free(struct txn *txn)
{
+ struct tx_conflict_tracker *entry, *next;
+ rlist_foreach_entry_safe(entry, &txn->conflict_list,
+ in_conflict_list, next) {
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ rlist_foreach_entry_safe(entry, &txn->conflicted_by_list,
+ in_conflicted_by_list, next) {
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ assert(rlist_empty(&txn->conflict_list));
+ assert(rlist_empty(&txn->conflicted_by_list));
+
+ rlist_del(&txn->in_read_view_txs);
+
struct txn_stmt *stmt;
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_destroy(stmt);
@@ -219,6 +239,8 @@ txn_begin(void)
struct txn *txn = txn_new();
if (txn == NULL)
return NULL;
+ assert(rlist_empty(&txn->conflict_list));
+ assert(rlist_empty(&txn->conflicted_by_list));
/* Initialize members explicitly to save time on memset() */
stailq_create(&txn->stmts);
@@ -229,6 +251,7 @@ txn_begin(void)
txn->in_sub_stmt = 0;
txn->id = ++tsn;
txn->psn = 0;
+ txn->rv_psn = 0;
txn->status = TXN_INPROGRESS;
txn->signature = TXN_SIGNATURE_ROLLBACK;
txn->engine = NULL;
@@ -277,6 +300,15 @@ txn_begin_stmt(struct txn *txn, struct space *space)
diag_set(ClientError, ER_SUB_STMT_MAX);
return -1;
}
+
+ /*
+ * A conflict has happened; there is no reason to continue the TX.
+ */
+ if (txn->status == TXN_CONFLICTED) {
+ diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+ return -1;
+ }
+
struct txn_stmt *stmt = txn_stmt_new(&txn->region);
if (stmt == NULL)
return -1;
@@ -669,6 +701,17 @@ txn_prepare(struct txn *txn)
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
return -1;
}
+
+ /*
+ * Somebody else has written some value that we have read.
+ * The RW transaction is not possible.
+ */
+ if (txn->status == TXN_CONFLICTED ||
+ (txn->status == TXN_IN_READ_VIEW && !stailq_empty(&txn->stmts))) {
+ diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+ return -1;
+ }
+
/*
* Perform transaction conflict resolution. Engine == NULL when
* we have a bunch of IPROTO_NOP statements.
@@ -677,6 +720,26 @@ txn_prepare(struct txn *txn)
if (engine_prepare(txn->engine, txn) != 0)
return -1;
}
+
+ struct tx_conflict_tracker *entry, *next;
+ /* Handle conflicts. */
+ rlist_foreach_entry_safe(entry, &txn->conflict_list,
+ in_conflict_list, next) {
+ assert(entry->breaker == txn);
+ memtx_tx_handle_conflict(txn, entry->victim);
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ /* Just drop the conflicted-by list - we don't need it anymore. */
+ rlist_foreach_entry_safe(entry, &txn->conflicted_by_list,
+ in_conflicted_by_list, next) {
+ assert(entry->victim == txn);
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ assert(rlist_empty(&txn->conflict_list));
+ assert(rlist_empty(&txn->conflicted_by_list));
+
trigger_clear(&txn->fiber_on_stop);
if (!txn_has_flag(txn, TXN_CAN_YIELD))
trigger_clear(&txn->fiber_on_yield);
diff --git a/src/box/txn.h b/src/box/txn.h
index d7e77e5..f957d1e 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -272,6 +272,11 @@ struct txn {
* Transactions are committed in that order.
*/
int64_t psn;
+ /**
+ * Read view of that TX. The TX can see only changes with psn < rv_psn.
+ * Is nonzero if and only if status = TXN_IN_READ_VIEW.
+ */
+ int64_t rv_psn;
/** Status of the TX */
enum txn_status status;
/** List of statements in a transaction. */
@@ -333,6 +338,22 @@ struct txn {
uint32_t fk_deferred_count;
/** List of savepoints to find savepoint by name. */
struct rlist savepoints;
+ /**
+ * List of tx_conflict_tracker records where .breaker is the current
+ * transaction and .victim is the transaction that must be aborted
+ * if the current transaction is committed.
+ */
+ struct rlist conflict_list;
+ /**
+ * List of tx_conflict_tracker records where .victim is the current
+ * transaction and .breaker is the transaction that, if committed,
+ * will abort the current transaction.
+ */
+ struct rlist conflicted_by_list;
+ /**
+ * Link in tx_manager::read_view_txs.
+ */
+ struct rlist in_read_view_txs;
};
static inline bool
--
2.7.4
More information about the Tarantool-patches
mailing list