From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 3B66544643D for ; Tue, 8 Sep 2020 13:22:16 +0300 (MSK) Received: by smtpng1.m.smailru.net with esmtpa (envelope-from ) id 1kFalT-0006tc-ON for tarantool-patches@dev.tarantool.org; Tue, 08 Sep 2020 13:22:16 +0300 From: Aleksandr Lyapunov Date: Tue, 8 Sep 2020 13:22:06 +0300 Message-Id: <1599560532-27089-7-git-send-email-alyapunov@tarantool.org> In-Reply-To: <1599560532-27089-1-git-send-email-alyapunov@tarantool.org> References: <1599560532-27089-1-git-send-email-alyapunov@tarantool.org> Subject: [Tarantool-patches] [PATCH v4 06/12] txm: introduce conflict tracker List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org There are situations when we have to track that if some TX is committed then some others must be aborted due to conflict. The common case is that one r/w TX have read some value while the second is about to overwrite the value; if the second is committed, the first must be aborted. Thus we have to store many-to-many TX relations between breaker TX and victim TX. The patch implements that. Part of #4897 --- src/box/memtx_tx.c | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++- src/box/memtx_tx.h | 38 +++++++++++++++++++++++ src/box/txn.c | 63 ++++++++++++++++++++++++++++++++++++++ src/box/txn.h | 21 +++++++++++++ 4 files changed, 209 insertions(+), 1 deletion(-) diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c index 479aa48..eb2ab51 100644 --- a/src/box/memtx_tx.c +++ b/src/box/memtx_tx.c @@ -30,8 +30,20 @@ */ #include "memtx_tx.h" +#include +#include +#include + +#include "txn.h" + struct tx_manager { + /** + * List of all transactions that are in a read view. 
+ * New transactions are added to the tail of this list, + * so the list is ordered by rv_psn. + */ + struct rlist read_view_txs; }; /** That's a definition, see declaration for description. */ @@ -43,10 +55,84 @@ static struct tx_manager txm; void memtx_tx_manager_init() { - (void)txm; + rlist_create(&txm.read_view_txs); } void memtx_tx_manager_free() { } + +int +memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim) +{ + struct tx_conflict_tracker *tracker = NULL; + struct rlist *r1 = breaker->conflict_list.next; + struct rlist *r2 = victim->conflicted_by_list.next; + while (r1 != &breaker->conflict_list && + r2 != &victim->conflicted_by_list) { + tracker = rlist_entry(r1, struct tx_conflict_tracker, + in_conflict_list); + assert(tracker->breaker == breaker); + if (tracker->victim == victim) + break; + tracker = rlist_entry(r2, struct tx_conflict_tracker, + in_conflicted_by_list); + assert(tracker->victim == victim); + if (tracker->breaker == breaker) + break; + tracker = NULL; + r1 = r1->next; + r2 = r2->next; + } + if (tracker != NULL) { + /* + * Move to the beginning of a list + * for a case of subsequent lookups. + */ + rlist_del(&tracker->in_conflict_list); + rlist_del(&tracker->in_conflicted_by_list); + } else { + size_t size; + tracker = region_alloc_object(&victim->region, + struct tx_conflict_tracker, + &size); + if (tracker == NULL) { + diag_set(OutOfMemory, size, "tx region", + "conflict_tracker"); + return -1; + } + tracker->breaker = breaker; + tracker->victim = victim; + } + rlist_add(&breaker->conflict_list, &tracker->in_conflict_list); + rlist_add(&victim->conflicted_by_list, &tracker->in_conflicted_by_list); + return 0; +} + +/** + * Handle conflict when @breaker transaction is prepared. + * The conflict happens if @victim has read something that @breaker + * overwrites. + * If @victim is read-only or hasn't made any changes, it should be sent + * to read view, in which it will not see @breaker.
+ * Otherwise @victim must be marked as conflicted. + */ +void +memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim) +{ + assert(breaker->psn != 0); + if (victim->status != TXN_INPROGRESS) { + /* Was conflicted by somebody else. */ + return; + } + if (stailq_empty(&victim->stmts)) { + /* Send to read view. */ + victim->status = TXN_IN_READ_VIEW; + victim->rv_psn = breaker->psn; + rlist_add_tail(&txm.read_view_txs, &victim->in_read_view_txs); + } else { + /* Mark as conflicted. */ + victim->status = TXN_CONFLICTED; + } +} diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h index fb2cb4d..6143a22 100644 --- a/src/box/memtx_tx.h +++ b/src/box/memtx_tx.h @@ -32,6 +32,8 @@ #include +#include "small/rlist.h" + #if defined(__cplusplus) extern "C" { #endif /* defined(__cplusplus) */ @@ -44,6 +46,21 @@ extern "C" { extern bool memtx_tx_manager_use_mvcc_engine; /** + * Record that links two transactions, breaker and victim. + * See memtx_tx_cause_conflict for details. + */ +struct tx_conflict_tracker { + /** TX that aborts victim on commit. */ + struct txn *breaker; + /** TX that will be aborted on breaker's commit. */ + struct txn *victim; + /** Link in breaker->conflict_list. */ + struct rlist in_conflict_list; + /** Link in victim->conflicted_by_list. */ + struct rlist in_conflicted_by_list; +}; + +/** * Initialize memtx transaction manager. */ void @@ -55,6 +72,27 @@ memtx_tx_manager_init(); void memtx_tx_manager_free(); +/** + * Notify TX manager that if transaction @breaker is committed then the + * transaction @victim must be aborted due to conflict. + * For example: there are two rw transactions in progress, one has read + * some value while the second is about to overwrite it. If the second + * is committed first, the first must be aborted. + * @return 0 on success, -1 on memory error. + */ +int +memtx_tx_cause_conflict(struct txn *breaker, struct txn *victim); + +/** + * Handle conflict when @breaker transaction is prepared.
+ * The conflict happens if @victim has read something that @breaker + * overwrites. + * If @victim is read-only or hasn't made any changes, it should be sent + * to read view, in which it will not see @breaker. + * Otherwise @victim must be marked as conflicted. + */ +void +memtx_tx_handle_conflict(struct txn *breaker, struct txn *victim); #if defined(__cplusplus) } /* extern "C" */ diff --git a/src/box/txn.c b/src/box/txn.c index 1dfe59f..976e17c 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -29,6 +29,7 @@ * SUCH DAMAGE. */ #include "txn.h" +#include "memtx_tx.h" #include "txn_limbo.h" #include "engine.h" #include "tuple.h" @@ -193,6 +194,9 @@ txn_new(void) } assert(region_used(&region) == sizeof(*txn)); txn->region = region; + rlist_create(&txn->conflict_list); + rlist_create(&txn->conflicted_by_list); + rlist_create(&txn->in_read_view_txs); return txn; } @@ -202,6 +206,22 @@ txn_new(void) inline static void txn_free(struct txn *txn) { + struct tx_conflict_tracker *entry, *next; + rlist_foreach_entry_safe(entry, &txn->conflict_list, + in_conflict_list, next) { + rlist_del(&entry->in_conflict_list); + rlist_del(&entry->in_conflicted_by_list); + } + rlist_foreach_entry_safe(entry, &txn->conflicted_by_list, + in_conflicted_by_list, next) { + rlist_del(&entry->in_conflict_list); + rlist_del(&entry->in_conflicted_by_list); + } + assert(rlist_empty(&txn->conflict_list)); + assert(rlist_empty(&txn->conflicted_by_list)); + + rlist_del(&txn->in_read_view_txs); + struct txn_stmt *stmt; stailq_foreach_entry(stmt, &txn->stmts, next) txn_stmt_destroy(stmt); @@ -219,6 +239,8 @@ txn_begin(void) struct txn *txn = txn_new(); if (txn == NULL) return NULL; + assert(rlist_empty(&txn->conflict_list)); + assert(rlist_empty(&txn->conflicted_by_list)); /* Initialize members explicitly to save time on memset() */ stailq_create(&txn->stmts); @@ -229,6 +251,7 @@ txn_begin(void) txn->in_sub_stmt = 0; txn->id = ++tsn; txn->psn = 0; + txn->rv_psn = 0; txn->status = TXN_INPROGRESS;
txn->signature = TXN_SIGNATURE_ROLLBACK; txn->engine = NULL; @@ -277,6 +300,15 @@ txn_begin_stmt(struct txn *txn, struct space *space) diag_set(ClientError, ER_SUB_STMT_MAX); return -1; } + + /* + * A conflict has happened; there is no reason to continue the TX. + */ + if (txn->status == TXN_CONFLICTED) { + diag_set(ClientError, ER_TRANSACTION_CONFLICT); + return -1; + } + struct txn_stmt *stmt = txn_stmt_new(&txn->region); if (stmt == NULL) return -1; @@ -669,6 +701,17 @@ txn_prepare(struct txn *txn) diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT); return -1; } + + /* + * Somebody else has written some value that we have read. + * The RW transaction is not possible. + */ + if (txn->status == TXN_CONFLICTED || + (txn->status == TXN_IN_READ_VIEW && !stailq_empty(&txn->stmts))) { + diag_set(ClientError, ER_TRANSACTION_CONFLICT); + return -1; + } + /* * Perform transaction conflict resolution. Engine == NULL when * we have a bunch of IPROTO_NOP statements. @@ -677,6 +720,26 @@ txn_prepare(struct txn *txn) if (engine_prepare(txn->engine, txn) != 0) return -1; } + + struct tx_conflict_tracker *entry, *next; + /* Handle conflicts. */ + rlist_foreach_entry_safe(entry, &txn->conflict_list, + in_conflict_list, next) { + assert(entry->breaker == txn); + memtx_tx_handle_conflict(txn, entry->victim); + rlist_del(&entry->in_conflict_list); + rlist_del(&entry->in_conflicted_by_list); + } + /* Just free conflict list - we don't need it anymore.
*/ + rlist_foreach_entry_safe(entry, &txn->conflicted_by_list, + in_conflicted_by_list, next) { + assert(entry->victim == txn); + rlist_del(&entry->in_conflict_list); + rlist_del(&entry->in_conflicted_by_list); + } + assert(rlist_empty(&txn->conflict_list)); + assert(rlist_empty(&txn->conflicted_by_list)); + trigger_clear(&txn->fiber_on_stop); if (!txn_has_flag(txn, TXN_CAN_YIELD)) trigger_clear(&txn->fiber_on_yield); diff --git a/src/box/txn.h b/src/box/txn.h index d7e77e5..f957d1e 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -272,6 +272,11 @@ struct txn { * Transactions are committed in that order. */ int64_t psn; + /** + * Read view of that TX. The TX can see only changes with psn < rv_psn. + * Is nonzero if and only if status = TXN_IN_READ_VIEW. + */ + int64_t rv_psn; /** Status of the TX */ enum txn_status status; /** List of statements in a transaction. */ @@ -333,6 +338,22 @@ struct txn { uint32_t fk_deferred_count; /** List of savepoints to find savepoint by name. */ struct rlist savepoints; + /** + * List of tx_conflict_tracker records where .breaker is the current + * transaction and .victim is the transaction that must be aborted + * if the current transaction is committed. + */ + struct rlist conflict_list; + /** + * List of tx_conflict_tracker records where .victim is the current + * transaction and .breaker is the transaction that, if committed, + * will abort the current transaction. + */ + struct rlist conflicted_by_list; + /** + * Link in tx_manager::read_view_txs. + */ + struct rlist in_read_view_txs; }; static inline bool -- 2.7.4