[Tarantool-patches] [PATCH v2 19/19] replication: block async transactions when not empty limbo
Serge Petrenko
sergepetrenko at tarantool.org
Fri Jul 3 15:28:49 MSK 2020
30.06.2020 02:15, Vladislav Shpilevoy пишет:
> When there is an uncommitted synchronous transaction, any attempt
> to commit the next transaction should be suspended, even if it is an
> async transaction.
>
> This restriction comes from the theoretically possible dependency
> of what is written in the async transactions on what was written
> in the previous sync transactions.
>
> For that there is a new txn flag - TXN_WAIT_SYNC. Previously the
> only synchro replication flag was TXN_WAIT_ACK. And now a
> transaction can be sync, but not wait for ACKs.
>
> In particular, if a transaction:
>
> - Is synchronous, then it has TXN_WAIT_SYNC (it is sync), and
> TXN_WAIT_ACK (need to collect ACKs, or get a CONFIRM);
>
> - Is asynchronous, and the limbo was empty at the moment of
> commit, then it does not have any of these flags and is committed
> like earlier;
>
> - Is asynchronous, and the limbo was not empty at the moment of
> commit. Then it will have only TXN_WAIT_SYNC. So it will be
> finished right after all the previous sync transactions are
> done. Note: *without waiting for ACKs* - the transaction is
> still asynchronous in the sense that it doesn't need to wait for
> quorum replication.
>
> Follow-up #4845
Thanks for the patch! LGTM.
> ---
> src/box/applier.cc | 8 ++
> src/box/txn.c | 16 ++--
> src/box/txn.h | 7 ++
> src/box/txn_limbo.c | 49 +++++++++---
> .../sync_replication_sanity.result | 75 +++++++++++++++++++
> .../sync_replication_sanity.test.lua | 26 +++++++
> test/unit/snap_quorum_delay.cc | 6 +-
> 7 files changed, 172 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 7e63dc544..7e70211b7 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -280,6 +280,14 @@ process_confirm_rollback(struct request *request, bool is_confirm)
> txn_limbo.instance_id);
> return -1;
> }
> + assert(txn->n_applier_rows == 0);
> + /*
> + * This is not really a transaction. It just uses txn API
> + * to put the data into WAL. And obviously it should not
> + * go to the limbo and block on the very same sync
> + * transaction which it tries to confirm now.
> + */
> + txn_set_flag(txn, TXN_FORCE_ASYNC);
>
> if (txn_begin_stmt(txn, NULL) != 0)
> return -1;
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 37955752a..bc2bb8e11 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -442,7 +442,7 @@ txn_complete(struct txn *txn)
> engine_rollback(txn->engine, txn);
> if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
> txn_run_rollback_triggers(txn, &txn->on_rollback);
> - } else if (!txn_has_flag(txn, TXN_WAIT_ACK)) {
> + } else if (!txn_has_flag(txn, TXN_WAIT_SYNC)) {
> /* Commit the transaction. */
> if (txn->engine != NULL)
> engine_commit(txn->engine, txn);
> @@ -552,8 +552,14 @@ txn_journal_entry_new(struct txn *txn)
> * space can't be synchronous. So if there is at least one
> * synchronous space, the transaction is not local.
> */
> - if (is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC))
> - txn_set_flag(txn, TXN_WAIT_ACK);
> + if (!txn_has_flag(txn, TXN_FORCE_ASYNC)) {
> + if (is_sync) {
> + txn_set_flag(txn, TXN_WAIT_SYNC);
> + txn_set_flag(txn, TXN_WAIT_ACK);
> + } else if (!txn_limbo_is_empty(&txn_limbo)) {
> + txn_set_flag(txn, TXN_WAIT_SYNC);
> + }
> + }
>
> assert(remote_row == req->rows + txn->n_applier_rows);
> assert(local_row == remote_row + txn->n_new_rows);
> @@ -662,7 +668,7 @@ txn_commit_async(struct txn *txn)
> return -1;
> }
>
> - bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
> + bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC);
> struct txn_limbo_entry *limbo_entry;
> if (is_sync) {
> /*
> @@ -737,7 +743,7 @@ txn_commit(struct txn *txn)
> return -1;
> }
>
> - bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
> + bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC);
> if (is_sync) {
> /*
> * Remote rows, if any, come before local rows, so
> diff --git a/src/box/txn.h b/src/box/txn.h
> index c631d7033..c484fcb56 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -66,11 +66,18 @@ enum txn_flag {
> TXN_CAN_YIELD,
> /** on_commit and/or on_rollback list is not empty. */
> TXN_HAS_TRIGGERS,
> + /**
> + * A transaction is either synchronous itself and needs to
> + * be synced with replicas, or it is async, but is blocked
> + * by a not yet finished synchronous transaction.
> + */
> + TXN_WAIT_SYNC,
> /**
> * Transaction, touched sync spaces, enters 'waiting for
> * acks' state before commit. In this state it waits until
> * it is replicated onto a quorum of replicas, and only
> * then finishes commit and returns success to a user.
> + * TXN_WAIT_SYNC is always set, if TXN_WAIT_ACK is set.
> */
> TXN_WAIT_ACK,
> /**
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index fbe4dcecf..bfb404e8e 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -47,7 +47,7 @@ txn_limbo_create(struct txn_limbo *limbo)
> struct txn_limbo_entry *
> txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn)
> {
> - assert(txn_has_flag(txn, TXN_WAIT_ACK));
> + assert(txn_has_flag(txn, TXN_WAIT_SYNC));
> if (id == 0)
> id = instance_id;
> if (limbo->instance_id != id) {
> @@ -143,7 +143,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> struct txn *txn = entry->txn;
> assert(entry->lsn > 0);
> assert(!txn_has_flag(txn, TXN_IS_DONE));
> - assert(txn_has_flag(txn, TXN_WAIT_ACK));
> + assert(txn_has_flag(txn, TXN_WAIT_SYNC));
> if (txn_limbo_check_complete(limbo, entry)) {
> txn_limbo_remove(limbo, entry);
> return 0;
> @@ -160,6 +160,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
> txn_limbo_pop(limbo, e);
> txn_clear_flag(e->txn, TXN_WAIT_ACK);
> + txn_clear_flag(e->txn, TXN_WAIT_SYNC);
> txn_complete(e->txn);
> if (e == entry)
> break;
> @@ -179,6 +180,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> }
> txn_limbo_remove(limbo, entry);
> txn_clear_flag(txn, TXN_WAIT_ACK);
> + txn_clear_flag(txn, TXN_WAIT_SYNC);
> return 0;
> }
>
> @@ -209,6 +211,13 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
> struct txn *txn = txn_begin();
> if (txn == NULL)
> return -1;
> + /*
> + * This is not really a transaction. It just uses txn API
> + * to put the data into WAL. And obviously it should not
> + * go to the limbo and block on the very same sync
> + * transaction which it tries to confirm now.
> + */
> + txn_set_flag(txn, TXN_FORCE_ASYNC);
>
> if (txn_begin_stmt(txn, NULL) != 0)
> goto rollback;
> @@ -238,11 +247,21 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
> assert(limbo->instance_id != REPLICA_ID_NIL);
> struct txn_limbo_entry *e, *tmp;
> rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
> - if (e->lsn > lsn)
> + /*
> + * Confirm a transaction if
> + * - it is a sync transaction covered by the
> + * confirmation LSN;
> + * - it is an async transaction, and it is the
> + * last in the queue. So it does not depend on
> + * a not finished sync transaction anymore and
> + * can be confirmed too.
> + */
> + if (e->lsn > lsn && txn_has_flag(e->txn, TXN_WAIT_ACK))
> break;
> e->is_commit = true;
> txn_limbo_remove(limbo, e);
> txn_clear_flag(e->txn, TXN_WAIT_ACK);
> + txn_clear_flag(e->txn, TXN_WAIT_SYNC);
> /*
> * If txn_complete_async() was already called,
> * finish tx processing. Otherwise just clear the
> @@ -277,6 +296,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
> e->is_rollback = true;
> txn_limbo_pop(limbo, e);
> txn_clear_flag(e->txn, TXN_WAIT_ACK);
> + txn_clear_flag(e->txn, TXN_WAIT_SYNC);
> if (e->txn->signature >= 0) {
> /* Rollback the transaction. */
> e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
> @@ -307,15 +327,26 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
> struct txn_limbo_entry *e;
> struct txn_limbo_entry *last_quorum = NULL;
> rlist_foreach_entry(e, &limbo->queue, in_queue) {
> - if (e->lsn <= prev_lsn)
> - continue;
> if (e->lsn > lsn)
> break;
> - if (++e->ack_count >= replication_synchro_quorum) {
> - e->is_commit = true;
> - last_quorum = e;
> - }
> + if (e->lsn <= prev_lsn)
> + continue;
> assert(e->ack_count <= VCLOCK_MAX);
> + /*
> + * Sync transactions need to collect acks. Async
> + * transactions are automatically committed right
> + * after all the previous sync transactions are.
> + */
> + if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> + if (++e->ack_count < replication_synchro_quorum)
> + continue;
> + } else {
> + assert(txn_has_flag(e->txn, TXN_WAIT_SYNC));
> + if (last_quorum == NULL)
> + continue;
> + }
> + e->is_commit = true;
> + last_quorum = e;
> }
> if (last_quorum != NULL) {
> if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
> diff --git a/test/replication/sync_replication_sanity.result b/test/replication/sync_replication_sanity.result
> index 8b37ba6f5..f713d4b08 100644
> --- a/test/replication/sync_replication_sanity.result
> +++ b/test/replication/sync_replication_sanity.result
> @@ -224,6 +224,81 @@ box.space.sync:select{4}
> | - - [4]
> | ...
>
> +--
> +-- Async transactions should wait for existing sync transactions
> +-- finish.
> +--
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +-- Start 2 fibers, which will execute one right after the other
> +-- in the same event loop iteration.
> +f = fiber.create(box.space.sync.replace, box.space.sync, {5}) s:replace{5}
> + | ---
> + | ...
> +f:status()
> + | ---
> + | - dead
> + | ...
> +s:select{5}
> + | ---
> + | - - [5]
> + | ...
> +box.space.sync:select{5}
> + | ---
> + | - - [5]
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.test:select{5}
> + | ---
> + | - - [5]
> + | ...
> +box.space.sync:select{5}
> + | ---
> + | - - [5]
> + | ...
> +-- Ensure sync rollback will affect all pending async transactions
> +-- too.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
> + | ---
> + | ...
> +f = fiber.create(box.space.sync.replace, box.space.sync, {6}) s:replace{6}
> + | ---
> + | - error: Quorum collection for a synchronous transaction is timed out
> + | ...
> +f:status()
> + | ---
> + | - dead
> + | ...
> +s:select{6}
> + | ---
> + | - []
> + | ...
> +box.space.sync:select{6}
> + | ---
> + | - []
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.test:select{6}
> + | ---
> + | - []
> + | ...
> +box.space.sync:select{6}
> + | ---
> + | - []
> + | ...
> +
> -- Cleanup.
> test_run:cmd('switch default')
> | ---
> diff --git a/test/replication/sync_replication_sanity.test.lua b/test/replication/sync_replication_sanity.test.lua
> index b0326fd4b..f84b6ee19 100644
> --- a/test/replication/sync_replication_sanity.test.lua
> +++ b/test/replication/sync_replication_sanity.test.lua
> @@ -92,6 +92,32 @@ box.space.sync:replace{4}
> test_run:switch('replica')
> box.space.sync:select{4}
>
> +--
> +-- Async transactions should wait for existing sync transactions
> +-- finish.
> +--
> +test_run:switch('default')
> +-- Start 2 fibers, which will execute one right after the other
> +-- in the same event loop iteration.
> +f = fiber.create(box.space.sync.replace, box.space.sync, {5}) s:replace{5}
> +f:status()
> +s:select{5}
> +box.space.sync:select{5}
> +test_run:switch('replica')
> +box.space.test:select{5}
> +box.space.sync:select{5}
> +-- Ensure sync rollback will affect all pending async transactions
> +-- too.
> +test_run:switch('default')
> +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
> +f = fiber.create(box.space.sync.replace, box.space.sync, {6}) s:replace{6}
> +f:status()
> +s:select{6}
> +box.space.sync:select{6}
> +test_run:switch('replica')
> +box.space.test:select{6}
> +box.space.sync:select{6}
> +
> -- Cleanup.
> test_run:cmd('switch default')
>
> diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc
> index 7a200673a..e6cf381bf 100644
> --- a/test/unit/snap_quorum_delay.cc
> +++ b/test/unit/snap_quorum_delay.cc
> @@ -97,8 +97,12 @@ txn_process_func(va_list ap)
> enum process_type process_type = (enum process_type)va_arg(ap, int);
> struct txn *txn = txn_begin();
> txn->fiber = fiber();
> - /* Set the TXN_WAIT_ACK flag to simulate a sync transaction.*/
> + /*
> + * Set the TXN_WAIT_ACK + SYNC flags to simulate a sync
> + * transaction.
> + */
> txn_set_flag(txn, TXN_WAIT_ACK);
> + txn_set_flag(txn, TXN_WAIT_SYNC);
> /*
> * The true way to push the transaction to limbo is to call
> * txn_commit() for sync transaction. But, if txn_commit()
--
Serge Petrenko
More information about the Tarantool-patches
mailing list