From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from smtp14.mail.ru (smtp14.mail.ru [94.100.181.95])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by dev.tarantool.org (Postfix) with ESMTPS id BA26742EF5C
	for ; Fri, 3 Jul 2020 15:28:50 +0300 (MSK)
References: <6cce3594cdf04b56e18b73c7f1e9fcaebbbcf215.1593472477.git.v.shpilevoy@tarantool.org>
From: Serge Petrenko
Message-ID:
Date: Fri, 3 Jul 2020 15:28:49 +0300
MIME-Version: 1.0
In-Reply-To: <6cce3594cdf04b56e18b73c7f1e9fcaebbbcf215.1593472477.git.v.shpilevoy@tarantool.org>
Content-Type: text/plain; charset="utf-8"; format="flowed"
Content-Transfer-Encoding: 8bit
Content-Language: en-GB
Subject: Re: [Tarantool-patches] [PATCH v2 19/19] replication: block async transactions when not empty limbo
List-Id: Tarantool development patches
List-Unsubscribe:
List-Archive:
List-Post:
List-Help:
List-Subscribe:
To: Vladislav Shpilevoy, tarantool-patches@dev.tarantool.org

On 30.06.2020 02:15, Vladislav Shpilevoy wrote:
> When there is an uncommitted synchronous transaction, any attempt
> to commit the next transaction should be suspended, even if it is
> an async transaction.
>
> This restriction comes from the theoretically possible dependency
> of what is written in the async transactions on what was written
> in the previous sync transactions.
>
> For that there is a new txn flag - TXN_WAIT_SYNC. Previously the
> only synchro replication flag was TXN_WAIT_ACK. And now a
> transaction can be sync, but not wait for ACKs.
>
> In particular, if a transaction:
>
> - Is synchronous, then it has TXN_WAIT_SYNC (it is sync), and
>   TXN_WAIT_ACK (need to collect ACKs, or get a CONFIRM);
>
> - Is asynchronous, and the limbo was empty at the moment of
>   commit, then it does not have any of these flags and is
>   committed as before;
>
> - Is asynchronous, and the limbo was not empty at the moment of
>   commit. Then it will have only TXN_WAIT_SYNC.
>   So it will be finished right after all the previous sync
>   transactions are done. Note: *without waiting for ACKs* - the
>   transaction is still asynchronous in the sense that it does not
>   need to wait for quorum replication.
>
> Follow-up #4845

Thanks for the patch! LGTM.

> ---
>  src/box/applier.cc                            |  8 ++
>  src/box/txn.c                                 | 16 ++--
>  src/box/txn.h                                 |  7 ++
>  src/box/txn_limbo.c                           | 49 +++++++++---
>  .../sync_replication_sanity.result            | 75 +++++++++++++++++++
>  .../sync_replication_sanity.test.lua          | 26 +++++++
>  test/unit/snap_quorum_delay.cc                |  6 +-
>  7 files changed, 172 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 7e63dc544..7e70211b7 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -280,6 +280,14 @@ process_confirm_rollback(struct request *request, bool is_confirm)
>  			 txn_limbo.instance_id);
>  		return -1;
>  	}
> +	assert(txn->n_applier_rows == 0);
> +	/*
> +	 * This is not really a transaction. It just uses txn API
> +	 * to put the data into WAL. And obviously it should not
> +	 * go to the limbo and block on the very same sync
> +	 * transaction which it tries to confirm now.
> +	 */
> +	txn_set_flag(txn, TXN_FORCE_ASYNC);
>
>  	if (txn_begin_stmt(txn, NULL) != 0)
>  		return -1;
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 37955752a..bc2bb8e11 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -442,7 +442,7 @@ txn_complete(struct txn *txn)
>  			engine_rollback(txn->engine, txn);
>  		if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
>  			txn_run_rollback_triggers(txn, &txn->on_rollback);
> -	} else if (!txn_has_flag(txn, TXN_WAIT_ACK)) {
> +	} else if (!txn_has_flag(txn, TXN_WAIT_SYNC)) {
>  		/* Commit the transaction. */
>  		if (txn->engine != NULL)
>  			engine_commit(txn->engine, txn);
> @@ -552,8 +552,14 @@ txn_journal_entry_new(struct txn *txn)
>  	 * space can't be synchronous. So if there is at least one
>  	 * synchronous space, the transaction is not local.
>  	 */
> -	if (is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC))
> -		txn_set_flag(txn, TXN_WAIT_ACK);
> +	if (!txn_has_flag(txn, TXN_FORCE_ASYNC)) {
> +		if (is_sync) {
> +			txn_set_flag(txn, TXN_WAIT_SYNC);
> +			txn_set_flag(txn, TXN_WAIT_ACK);
> +		} else if (!txn_limbo_is_empty(&txn_limbo)) {
> +			txn_set_flag(txn, TXN_WAIT_SYNC);
> +		}
> +	}
>
>  	assert(remote_row == req->rows + txn->n_applier_rows);
>  	assert(local_row == remote_row + txn->n_new_rows);
> @@ -662,7 +668,7 @@ txn_commit_async(struct txn *txn)
>  		return -1;
>  	}
>
> -	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
> +	bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC);
>  	struct txn_limbo_entry *limbo_entry;
>  	if (is_sync) {
>  		/*
> @@ -737,7 +743,7 @@ txn_commit(struct txn *txn)
>  		return -1;
>  	}
>
> -	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
> +	bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC);
>  	if (is_sync) {
>  		/*
>  		 * Remote rows, if any, come before local rows, so
> diff --git a/src/box/txn.h b/src/box/txn.h
> index c631d7033..c484fcb56 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -66,11 +66,18 @@ enum txn_flag {
>  	TXN_CAN_YIELD,
>  	/** on_commit and/or on_rollback list is not empty. */
>  	TXN_HAS_TRIGGERS,
> +	/**
> +	 * A transaction is either synchronous itself and needs to
> +	 * be synced with replicas, or it is async, but is blocked
> +	 * by a not yet finished synchronous transaction.
> +	 */
> +	TXN_WAIT_SYNC,
>  	/**
>  	 * Transaction, touched sync spaces, enters 'waiting for
>  	 * acks' state before commit. In this state it waits until
>  	 * it is replicated onto a quorum of replicas, and only
>  	 * then finishes commit and returns success to a user.
> +	 * TXN_WAIT_SYNC is always set, if TXN_WAIT_ACK is set.
>  	 */
>  	TXN_WAIT_ACK,
>  	/**
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index fbe4dcecf..bfb404e8e 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -47,7 +47,7 @@ txn_limbo_create(struct txn_limbo *limbo)
>  struct txn_limbo_entry *
>  txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn)
>  {
> -	assert(txn_has_flag(txn, TXN_WAIT_ACK));
> +	assert(txn_has_flag(txn, TXN_WAIT_SYNC));
>  	if (id == 0)
>  		id = instance_id;
>  	if (limbo->instance_id != id) {
> @@ -143,7 +143,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>  	struct txn *txn = entry->txn;
>  	assert(entry->lsn > 0);
>  	assert(!txn_has_flag(txn, TXN_IS_DONE));
> -	assert(txn_has_flag(txn, TXN_WAIT_ACK));
> +	assert(txn_has_flag(txn, TXN_WAIT_SYNC));
>  	if (txn_limbo_check_complete(limbo, entry)) {
>  		txn_limbo_remove(limbo, entry);
>  		return 0;
> @@ -160,6 +160,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>  			e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
>  			txn_limbo_pop(limbo, e);
>  			txn_clear_flag(e->txn, TXN_WAIT_ACK);
> +			txn_clear_flag(e->txn, TXN_WAIT_SYNC);
>  			txn_complete(e->txn);
>  			if (e == entry)
>  				break;
> @@ -179,6 +180,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>  	}
>  	txn_limbo_remove(limbo, entry);
>  	txn_clear_flag(txn, TXN_WAIT_ACK);
> +	txn_clear_flag(txn, TXN_WAIT_SYNC);
>  	return 0;
>  }
>
> @@ -209,6 +211,13 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
>  	struct txn *txn = txn_begin();
>  	if (txn == NULL)
>  		return -1;
> +	/*
> +	 * This is not really a transaction. It just uses txn API
> +	 * to put the data into WAL. And obviously it should not
> +	 * go to the limbo and block on the very same sync
> +	 * transaction which it tries to confirm now.
> +	 */
> +	txn_set_flag(txn, TXN_FORCE_ASYNC);
>
>  	if (txn_begin_stmt(txn, NULL) != 0)
>  		goto rollback;
> @@ -238,11 +247,21 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
>  	assert(limbo->instance_id != REPLICA_ID_NIL);
>  	struct txn_limbo_entry *e, *tmp;
>  	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
> -		if (e->lsn > lsn)
> +		/*
> +		 * Confirm a transaction if
> +		 * - it is a sync transaction covered by the
> +		 *   confirmation LSN;
> +		 * - it is an async transaction, and it is the
> +		 *   last in the queue. So it does not depend on
> +		 *   a not finished sync transaction anymore and
> +		 *   can be confirmed too.
> +		 */
> +		if (e->lsn > lsn && txn_has_flag(e->txn, TXN_WAIT_ACK))
>  			break;
>  		e->is_commit = true;
>  		txn_limbo_remove(limbo, e);
>  		txn_clear_flag(e->txn, TXN_WAIT_ACK);
> +		txn_clear_flag(e->txn, TXN_WAIT_SYNC);
>  		/*
>  		 * If txn_complete_async() was already called,
>  		 * finish tx processing. Otherwise just clear the
> @@ -277,6 +296,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
>  		e->is_rollback = true;
>  		txn_limbo_pop(limbo, e);
>  		txn_clear_flag(e->txn, TXN_WAIT_ACK);
> +		txn_clear_flag(e->txn, TXN_WAIT_SYNC);
>  		if (e->txn->signature >= 0) {
>  			/* Rollback the transaction. */
>  			e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
> @@ -307,15 +327,26 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>  	struct txn_limbo_entry *e;
>  	struct txn_limbo_entry *last_quorum = NULL;
>  	rlist_foreach_entry(e, &limbo->queue, in_queue) {
> -		if (e->lsn <= prev_lsn)
> -			continue;
>  		if (e->lsn > lsn)
>  			break;
> -		if (++e->ack_count >= replication_synchro_quorum) {
> -			e->is_commit = true;
> -			last_quorum = e;
> -		}
> +		if (e->lsn <= prev_lsn)
> +			continue;
>  		assert(e->ack_count <= VCLOCK_MAX);
> +		/*
> +		 * Sync transactions need to collect acks. Async
> +		 * transactions are automatically committed right
> +		 * after all the previous sync transactions are.
> +		 */
> +		if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> +			if (++e->ack_count < replication_synchro_quorum)
> +				continue;
> +		} else {
> +			assert(txn_has_flag(e->txn, TXN_WAIT_SYNC));
> +			if (last_quorum == NULL)
> +				continue;
> +		}
> +		e->is_commit = true;
> +		last_quorum = e;
>  	}
>  	if (last_quorum != NULL) {
>  		if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
> diff --git a/test/replication/sync_replication_sanity.result b/test/replication/sync_replication_sanity.result
> index 8b37ba6f5..f713d4b08 100644
> --- a/test/replication/sync_replication_sanity.result
> +++ b/test/replication/sync_replication_sanity.result
> @@ -224,6 +224,81 @@ box.space.sync:select{4}
>   | - - [4]
>   | ...
>
> +--
> +-- Async transactions should wait for existing sync transactions
> +-- finish.
> +--
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +-- Start 2 fibers, which will execute one right after the other
> +-- in the same event loop iteration.
> +f = fiber.create(box.space.sync.replace, box.space.sync, {5}) s:replace{5}
> + | ---
> + | ...
> +f:status()
> + | ---
> + | - dead
> + | ...
> +s:select{5}
> + | ---
> + | - - [5]
> + | ...
> +box.space.sync:select{5}
> + | ---
> + | - - [5]
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.test:select{5}
> + | ---
> + | - - [5]
> + | ...
> +box.space.sync:select{5}
> + | ---
> + | - - [5]
> + | ...
> +-- Ensure sync rollback will affect all pending async transactions
> +-- too.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
> + | ---
> + | ...
> +f = fiber.create(box.space.sync.replace, box.space.sync, {6}) s:replace{6}
> + | ---
> + | - error: Quorum collection for a synchronous transaction is timed out
> + | ...
> +f:status()
> + | ---
> + | - dead
> + | ...
> +s:select{6}
> + | ---
> + | - []
> + | ...
> +box.space.sync:select{6}
> + | ---
> + | - []
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.test:select{6}
> + | ---
> + | - []
> + | ...
> +box.space.sync:select{6}
> + | ---
> + | - []
> + | ...
> +
>  -- Cleanup.
>  test_run:cmd('switch default')
>   | ---
> diff --git a/test/replication/sync_replication_sanity.test.lua b/test/replication/sync_replication_sanity.test.lua
> index b0326fd4b..f84b6ee19 100644
> --- a/test/replication/sync_replication_sanity.test.lua
> +++ b/test/replication/sync_replication_sanity.test.lua
> @@ -92,6 +92,32 @@ box.space.sync:replace{4}
>  test_run:switch('replica')
>  box.space.sync:select{4}
>
> +--
> +-- Async transactions should wait for existing sync transactions
> +-- finish.
> +--
> +test_run:switch('default')
> +-- Start 2 fibers, which will execute one right after the other
> +-- in the same event loop iteration.
> +f = fiber.create(box.space.sync.replace, box.space.sync, {5}) s:replace{5}
> +f:status()
> +s:select{5}
> +box.space.sync:select{5}
> +test_run:switch('replica')
> +box.space.test:select{5}
> +box.space.sync:select{5}
> +-- Ensure sync rollback will affect all pending async transactions
> +-- too.
> +test_run:switch('default')
> +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
> +f = fiber.create(box.space.sync.replace, box.space.sync, {6}) s:replace{6}
> +f:status()
> +s:select{6}
> +box.space.sync:select{6}
> +test_run:switch('replica')
> +box.space.test:select{6}
> +box.space.sync:select{6}
> +
>  -- Cleanup.
>  test_run:cmd('switch default')
>
> diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc
> index 7a200673a..e6cf381bf 100644
> --- a/test/unit/snap_quorum_delay.cc
> +++ b/test/unit/snap_quorum_delay.cc
> @@ -97,8 +97,12 @@ txn_process_func(va_list ap)
>  	enum process_type process_type = (enum process_type)va_arg(ap, int);
>  	struct txn *txn = txn_begin();
>  	txn->fiber = fiber();
> -	/* Set the TXN_WAIT_ACK flag to simulate a sync transaction.*/
> +	/*
> +	 * Set the TXN_WAIT_ACK + SYNC flags to simulate a sync
> +	 * transaction.
> +	 */
>  	txn_set_flag(txn, TXN_WAIT_ACK);
> +	txn_set_flag(txn, TXN_WAIT_SYNC);
>  	/*
>  	 * The true way to push the transaction to limbo is to call
>  	 * txn_commit() for sync transaction. But, if txn_commit()

--
Serge Petrenko
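
For readers of the archive: the flag rules from the commit message and the
reworked loop in txn_limbo_ack() can be modelled in isolation. The sketch
below is not Tarantool code and makes several assumptions. The MODEL_*
constants, struct model_entry and both model_* functions are hypothetical
names introduced only for illustration, the queue is a plain array rather
than an rlist, and writing CONFIRM to the WAL is left out entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for TXN_WAIT_SYNC and TXN_WAIT_ACK. */
enum {
	MODEL_WAIT_SYNC = 1 << 0,
	MODEL_WAIT_ACK = 1 << 1,
};

/*
 * Models the branch added to txn_journal_entry_new(): which of the
 * two flags a transaction receives when it is committed.
 */
static unsigned
model_commit_flags(bool is_sync, bool force_async, bool limbo_is_empty)
{
	if (force_async)
		return 0;
	if (is_sync)
		return MODEL_WAIT_SYNC | MODEL_WAIT_ACK;
	/* Async, but queued behind unfinished sync transactions. */
	if (!limbo_is_empty)
		return MODEL_WAIT_SYNC;
	return 0;
}

/* Hypothetical, reduced limbo entry. */
struct model_entry {
	int64_t lsn;
	unsigned flags;
	int ack_count;
	bool is_commit;
};

/*
 * Models the loop in txn_limbo_ack(): one replica moved its
 * acknowledged LSN from prev_lsn to lsn. Returns the index of the
 * last entry that became committable, or -1 if there is none.
 */
static int
model_limbo_ack(struct model_entry *queue, int count, int64_t prev_lsn,
		int64_t lsn, int quorum)
{
	int last_quorum = -1;
	for (int i = 0; i < count; i++) {
		struct model_entry *e = &queue[i];
		if (e->lsn > lsn)
			break;
		if (e->lsn <= prev_lsn)
			continue;
		if ((e->flags & MODEL_WAIT_ACK) != 0) {
			/* Sync entries have to collect a quorum. */
			if (++e->ack_count < quorum)
				continue;
		} else if (last_quorum < 0) {
			/* Async entry with no sync entry confirmed before it. */
			continue;
		}
		e->is_commit = true;
		last_quorum = i;
	}
	return last_quorum;
}
```

With a queue of a sync entry (lsn 1), a blocked async entry (lsn 2) and
another sync entry (lsn 3) and a quorum of 2, the first replica ack up to
lsn 3 commits nothing in this model. A second replica ack up to lsn 2 then
commits the first sync entry and the async entry right behind it, while the
entry at lsn 3 keeps waiting.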