From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp36.i.mail.ru (smtp36.i.mail.ru [94.100.177.96]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 1506A42EF63 for ; Tue, 30 Jun 2020 02:15:45 +0300 (MSK) From: Vladislav Shpilevoy Date: Tue, 30 Jun 2020 01:15:20 +0200 Message-Id: <6cce3594cdf04b56e18b73c7f1e9fcaebbbcf215.1593472477.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v2 19/19] replication: block async transactions when not empty limbo List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org When there is an uncommitted synchronous transaction, any attempt to commit the next transaction should be suspended, even if it is an async transaction. This restriction comes from the theoretically possible dependency of what is written in the async transactions on what was written in the previous sync transactions. For that there is a new txn flag - TXN_WAIT_SYNC. Previously the only synchro replication flag was TXN_WAIT_ACK. And now a transaction can be sync, but not wait for ACKs. In particular, if a transaction: - Is synchronous, then it has TXN_WAIT_SYNC (it is sync), and TXN_WAIT_ACK (need to collect ACKs, or get a CONFIRM); - Is asynchronous, and the limbo was empty at the moment of commit, then it does not have any of these flags and is committed like before; - Is asynchronous, and the limbo was not empty at the moment of commit, then it will have only TXN_WAIT_SYNC. So it will be finished right after all the previous sync transactions are done. Note: *without waiting for ACKs* - the transaction is still asynchronous in the sense that it does not need to wait for quorum replication. 
Follow-up #4845 --- src/box/applier.cc | 8 ++ src/box/txn.c | 16 ++-- src/box/txn.h | 7 ++ src/box/txn_limbo.c | 49 +++++++++--- .../sync_replication_sanity.result | 75 +++++++++++++++++++ .../sync_replication_sanity.test.lua | 26 +++++++ test/unit/snap_quorum_delay.cc | 6 +- 7 files changed, 172 insertions(+), 15 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 7e63dc544..7e70211b7 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -280,6 +280,14 @@ process_confirm_rollback(struct request *request, bool is_confirm) txn_limbo.instance_id); return -1; } + assert(txn->n_applier_rows == 0); + /* + * This is not really a transaction. It just uses txn API + * to put the data into WAL. And obviously it should not + * go to the limbo and block on the very same sync + * transaction which it tries to confirm now. + */ + txn_set_flag(txn, TXN_FORCE_ASYNC); if (txn_begin_stmt(txn, NULL) != 0) return -1; diff --git a/src/box/txn.c b/src/box/txn.c index 37955752a..bc2bb8e11 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -442,7 +442,7 @@ txn_complete(struct txn *txn) engine_rollback(txn->engine, txn); if (txn_has_flag(txn, TXN_HAS_TRIGGERS)) txn_run_rollback_triggers(txn, &txn->on_rollback); - } else if (!txn_has_flag(txn, TXN_WAIT_ACK)) { + } else if (!txn_has_flag(txn, TXN_WAIT_SYNC)) { /* Commit the transaction. */ if (txn->engine != NULL) engine_commit(txn->engine, txn); @@ -552,8 +552,14 @@ txn_journal_entry_new(struct txn *txn) * space can't be synchronous. So if there is at least one * synchronous space, the transaction is not local. 
*/ - if (is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC)) - txn_set_flag(txn, TXN_WAIT_ACK); + if (!txn_has_flag(txn, TXN_FORCE_ASYNC)) { + if (is_sync) { + txn_set_flag(txn, TXN_WAIT_SYNC); + txn_set_flag(txn, TXN_WAIT_ACK); + } else if (!txn_limbo_is_empty(&txn_limbo)) { + txn_set_flag(txn, TXN_WAIT_SYNC); + } + } assert(remote_row == req->rows + txn->n_applier_rows); assert(local_row == remote_row + txn->n_new_rows); @@ -662,7 +668,7 @@ txn_commit_async(struct txn *txn) return -1; } - bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK); + bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC); struct txn_limbo_entry *limbo_entry; if (is_sync) { /* @@ -737,7 +743,7 @@ txn_commit(struct txn *txn) return -1; } - bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK); + bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC); if (is_sync) { /* * Remote rows, if any, come before local rows, so diff --git a/src/box/txn.h b/src/box/txn.h index c631d7033..c484fcb56 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -66,11 +66,18 @@ enum txn_flag { TXN_CAN_YIELD, /** on_commit and/or on_rollback list is not empty. */ TXN_HAS_TRIGGERS, + /** + * A transaction is either synchronous itself and needs to + * be synced with replicas, or it is async, but is blocked + * by a not yet finished synchronous transaction. + */ + TXN_WAIT_SYNC, /** * Transaction, touched sync spaces, enters 'waiting for * acks' state before commit. In this state it waits until * it is replicated onto a quorum of replicas, and only * then finishes commit and returns success to a user. + * TXN_WAIT_SYNC is always set, if TXN_WAIT_ACK is set. 
*/ TXN_WAIT_ACK, /** diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index fbe4dcecf..bfb404e8e 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -47,7 +47,7 @@ txn_limbo_create(struct txn_limbo *limbo) struct txn_limbo_entry * txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn) { - assert(txn_has_flag(txn, TXN_WAIT_ACK)); + assert(txn_has_flag(txn, TXN_WAIT_SYNC)); if (id == 0) id = instance_id; if (limbo->instance_id != id) { @@ -143,7 +143,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) struct txn *txn = entry->txn; assert(entry->lsn > 0); assert(!txn_has_flag(txn, TXN_IS_DONE)); - assert(txn_has_flag(txn, TXN_WAIT_ACK)); + assert(txn_has_flag(txn, TXN_WAIT_SYNC)); if (txn_limbo_check_complete(limbo, entry)) { txn_limbo_remove(limbo, entry); return 0; @@ -160,6 +160,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT; txn_limbo_pop(limbo, e); txn_clear_flag(e->txn, TXN_WAIT_ACK); + txn_clear_flag(e->txn, TXN_WAIT_SYNC); txn_complete(e->txn); if (e == entry) break; @@ -179,6 +180,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) } txn_limbo_remove(limbo, entry); txn_clear_flag(txn, TXN_WAIT_ACK); + txn_clear_flag(txn, TXN_WAIT_SYNC); return 0; } @@ -209,6 +211,13 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, struct txn *txn = txn_begin(); if (txn == NULL) return -1; + /* + * This is not really a transaction. It just uses txn API + * to put the data into WAL. And obviously it should not + * go to the limbo and block on the very same sync + * transaction which it tries to confirm now. 
+ */ + txn_set_flag(txn, TXN_FORCE_ASYNC); if (txn_begin_stmt(txn, NULL) != 0) goto rollback; @@ -238,11 +247,21 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) assert(limbo->instance_id != REPLICA_ID_NIL); struct txn_limbo_entry *e, *tmp; rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) { - if (e->lsn > lsn) + /* + * Confirm a transaction if + * - it is a sync transaction covered by the + * confirmation LSN; + * - it is an async transaction, and it is the + * last in the queue. So it does not depend on + * a not finished sync transaction anymore and + * can be confirmed too. + */ + if (e->lsn > lsn && txn_has_flag(e->txn, TXN_WAIT_ACK)) break; e->is_commit = true; txn_limbo_remove(limbo, e); txn_clear_flag(e->txn, TXN_WAIT_ACK); + txn_clear_flag(e->txn, TXN_WAIT_SYNC); /* * If txn_complete_async() was already called, * finish tx processing. Otherwise just clear the @@ -277,6 +296,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn) e->is_rollback = true; txn_limbo_pop(limbo, e); txn_clear_flag(e->txn, TXN_WAIT_ACK); + txn_clear_flag(e->txn, TXN_WAIT_SYNC); if (e->txn->signature >= 0) { /* Rollback the transaction. */ e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK; @@ -307,15 +327,26 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) struct txn_limbo_entry *e; struct txn_limbo_entry *last_quorum = NULL; rlist_foreach_entry(e, &limbo->queue, in_queue) { - if (e->lsn <= prev_lsn) - continue; if (e->lsn > lsn) break; - if (++e->ack_count >= replication_synchro_quorum) { - e->is_commit = true; - last_quorum = e; - } + if (e->lsn <= prev_lsn) + continue; assert(e->ack_count <= VCLOCK_MAX); + /* + * Sync transactions need to collect acks. Async + * transactions are automatically committed right + * after all the previous sync transactions are. 
+ */ + if (txn_has_flag(e->txn, TXN_WAIT_ACK)) { + if (++e->ack_count < replication_synchro_quorum) + continue; + } else { + assert(txn_has_flag(e->txn, TXN_WAIT_SYNC)); + if (last_quorum == NULL) + continue; + } + e->is_commit = true; + last_quorum = e; } if (last_quorum != NULL) { if (txn_limbo_write_confirm(limbo, last_quorum) != 0) { diff --git a/test/replication/sync_replication_sanity.result b/test/replication/sync_replication_sanity.result index 8b37ba6f5..f713d4b08 100644 --- a/test/replication/sync_replication_sanity.result +++ b/test/replication/sync_replication_sanity.result @@ -224,6 +224,81 @@ box.space.sync:select{4} | - - [4] | ... +-- +-- Async transactions should wait for existing sync transactions +-- finish. +-- +test_run:switch('default') + | --- + | - true + | ... +-- Start 2 fibers, which will execute one right after the other +-- in the same event loop iteration. +f = fiber.create(box.space.sync.replace, box.space.sync, {5}) s:replace{5} + | --- + | ... +f:status() + | --- + | - dead + | ... +s:select{5} + | --- + | - - [5] + | ... +box.space.sync:select{5} + | --- + | - - [5] + | ... +test_run:switch('replica') + | --- + | - true + | ... +box.space.test:select{5} + | --- + | - - [5] + | ... +box.space.sync:select{5} + | --- + | - - [5] + | ... +-- Ensure sync rollback will affect all pending async transactions +-- too. +test_run:switch('default') + | --- + | - true + | ... +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3} + | --- + | ... +f = fiber.create(box.space.sync.replace, box.space.sync, {6}) s:replace{6} + | --- + | - error: Quorum collection for a synchronous transaction is timed out + | ... +f:status() + | --- + | - dead + | ... +s:select{6} + | --- + | - [] + | ... +box.space.sync:select{6} + | --- + | - [] + | ... +test_run:switch('replica') + | --- + | - true + | ... +box.space.test:select{6} + | --- + | - [] + | ... +box.space.sync:select{6} + | --- + | - [] + | ... + -- Cleanup. 
test_run:cmd('switch default') | --- diff --git a/test/replication/sync_replication_sanity.test.lua b/test/replication/sync_replication_sanity.test.lua index b0326fd4b..f84b6ee19 100644 --- a/test/replication/sync_replication_sanity.test.lua +++ b/test/replication/sync_replication_sanity.test.lua @@ -92,6 +92,32 @@ box.space.sync:replace{4} test_run:switch('replica') box.space.sync:select{4} +-- +-- Async transactions should wait for existing sync transactions +-- finish. +-- +test_run:switch('default') +-- Start 2 fibers, which will execute one right after the other +-- in the same event loop iteration. +f = fiber.create(box.space.sync.replace, box.space.sync, {5}) s:replace{5} +f:status() +s:select{5} +box.space.sync:select{5} +test_run:switch('replica') +box.space.test:select{5} +box.space.sync:select{5} +-- Ensure sync rollback will affect all pending async transactions +-- too. +test_run:switch('default') +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3} +f = fiber.create(box.space.sync.replace, box.space.sync, {6}) s:replace{6} +f:status() +s:select{6} +box.space.sync:select{6} +test_run:switch('replica') +box.space.test:select{6} +box.space.sync:select{6} + -- Cleanup. test_run:cmd('switch default') diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc index 7a200673a..e6cf381bf 100644 --- a/test/unit/snap_quorum_delay.cc +++ b/test/unit/snap_quorum_delay.cc @@ -97,8 +97,12 @@ txn_process_func(va_list ap) enum process_type process_type = (enum process_type)va_arg(ap, int); struct txn *txn = txn_begin(); txn->fiber = fiber(); - /* Set the TXN_WAIT_ACK flag to simulate a sync transaction.*/ + /* + * Set the TXN_WAIT_ACK + SYNC flags to simulate a sync + * transaction. + */ txn_set_flag(txn, TXN_WAIT_ACK); + txn_set_flag(txn, TXN_WAIT_SYNC); /* * The true way to push the transaction to limbo is to call * txn_commit() for sync transaction. But, if txn_commit() -- 2.21.1 (Apple Git-122.3)