From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp54.i.mail.ru (smtp54.i.mail.ru [217.69.128.34]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 540D0445320 for ; Sun, 5 Jul 2020 12:04:07 +0300 (MSK) References: From: Serge Petrenko Message-ID: <09a2d0b1-4c7e-635a-6c9c-8a3627736ed1@tarantool.org> Date: Sun, 5 Jul 2020 12:04:05 +0300 MIME-Version: 1.0 In-Reply-To: Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-GB Subject: Re: [Tarantool-patches] [PATCH 4/5] [tosquash] replication: rework how local transactions wait sync List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Vladislav Shpilevoy , tarantool-patches@dev.tarantool.org 03.07.2020 02:40, Vladislav Shpilevoy пишет: > There was a bug about how async transactions were blocked by a > non-empty limbo. This was about fully local transactions. The > problem is that they don't have LSN in the needed vclock > component - it is always 0. It means that their limbo entry > can't get a valid LSN by any means. Even a copy of the previous > sync transaction's LSN won't work, because the latter may > still not be written to WAL. > > This patch makes async transactions always have lsn -1 in their > limbo entry, because it does not matter anyway. It is not needed > to collect ACKs, nor to propagate limbo's vclock. > > Now even a fully local transaction can be blocked by a pending > sync transaction. > > Note, this does not cover the case when the transaction is not > fully local, and its last row is local. Hi! Thanks for the patch! Looks good, with one comment below. 
> --- > src/box/txn.c | 18 ++-- > src/box/txn_limbo.c | 60 +++++++------ > test/replication/qsync_basic.result | 120 ++++++++++++++++++++++++++ > test/replication/qsync_basic.test.lua | 45 ++++++++++ > 4 files changed, 208 insertions(+), 35 deletions(-) > > diff --git a/src/box/txn.c b/src/box/txn.c > index 6c333cbed..fe0591197 100644 > --- a/src/box/txn.c > +++ b/src/box/txn.c > @@ -688,15 +688,15 @@ txn_commit_async(struct txn *txn) > > /* See txn_commit(). */ > uint32_t origin_id = req->rows[0]->replica_id; > - int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn; > limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn); > if (limbo_entry == NULL) { > txn_rollback(txn); > return -1; > } > - assert(lsn > 0); > - txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn); > - > + if (txn_has_flag(txn, TXN_WAIT_ACK)) { > + int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn; > + txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn); > + } > /* > * Set a trigger to abort waiting for confirm on > * WAL write failure. > @@ -779,10 +779,12 @@ txn_commit(struct txn *txn) > return -1; > } > if (is_sync) { > - int64_t lsn = req->rows[req->n_rows - 1]->lsn; > - txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn); > - /* Local WAL write is a first 'ACK'. */ > - txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn); > + if (txn_has_flag(txn, TXN_WAIT_ACK)) { > + int64_t lsn = req->rows[req->n_rows - 1]->lsn; > + txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn); > + /* Local WAL write is a first 'ACK'. 
*/ > + txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn); > + } > if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) { > txn_free(txn); > return -1; > diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c > index 387cfd337..44a0c7273 100644 > --- a/src/box/txn_limbo.c > +++ b/src/box/txn_limbo.c > @@ -120,6 +120,7 @@ txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry, > assert(limbo->instance_id != REPLICA_ID_NIL); > assert(entry->lsn == -1); > assert(lsn > 0); > + assert(txn_has_flag(entry->txn, TXN_WAIT_ACK)); > (void) limbo; > entry->lsn = lsn; > } > @@ -129,6 +130,12 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) > { > if (txn_limbo_entry_is_complete(entry)) > return true; > + /* > + * Async transaction can't complete itself. It is always > + * completed by a previous sync transaction. > + */ > + if (!txn_has_flag(entry->txn, TXN_WAIT_ACK)) > + return false; > struct vclock_iterator iter; > vclock_iterator_init(&iter, &limbo->vclock); > int ack_count = 0; > @@ -142,14 +149,13 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) > } > > static int > -txn_limbo_write_rollback(struct txn_limbo *limbo, > - struct txn_limbo_entry *entry); > +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn); > > int > txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) > { > struct txn *txn = entry->txn; > - assert(entry->lsn > 0); > + assert(entry->lsn > 0 || !txn_has_flag(entry->txn, TXN_WAIT_ACK)); > assert(!txn_has_flag(txn, TXN_IS_DONE)); > assert(txn_has_flag(txn, TXN_WAIT_SYNC)); > if (txn_limbo_check_complete(limbo, entry)) > @@ -176,7 +182,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) > goto complete; > } > > - txn_limbo_write_rollback(limbo, entry); > + txn_limbo_write_rollback(limbo, entry->lsn); > struct txn_limbo_entry *e, *tmp; > rlist_foreach_entry_safe_reverse(e, &limbo->queue, > 
in_queue, tmp) { > @@ -210,10 +216,11 @@ complete: > } > > static int > -txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, > - struct txn_limbo_entry *entry, > +txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn, > bool is_confirm) > { > + assert(lsn > 0); > + > struct xrow_header row; > struct request request = { > .header = &row, > @@ -221,14 +228,13 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, > > int res = 0; > if (is_confirm) { > - res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn); > + res = xrow_encode_confirm(&row, limbo->instance_id, lsn); > } else { > /* > * This entry is the first to be rolled back, so > - * the last "safe" lsn is entry->lsn - 1. > + * the last "safe" lsn is lsn - 1. > */ > - res = xrow_encode_rollback(&row, limbo->instance_id, > - entry->lsn - 1); > + res = xrow_encode_rollback(&row, limbo->instance_id, lsn - 1); > } > if (res == -1) > return -1; > @@ -260,10 +266,9 @@ rollback: > * transactions waiting for confirmation may be finished. > */ > static int > -txn_limbo_write_confirm(struct txn_limbo *limbo, > - struct txn_limbo_entry *entry) > +txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn) > { > - return txn_limbo_write_confirm_rollback(limbo, entry, true); > + return txn_limbo_write_confirm_rollback(limbo, lsn, true); > } > > void > @@ -300,14 +305,13 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) > > /** > * Write a rollback message to WAL. After it's written > - * all the tarnsactions following the current one and waiting > + * all the transactions following the current one and waiting > * for confirmation must be rolled back. 
> */ > static int > -txn_limbo_write_rollback(struct txn_limbo *limbo, > - struct txn_limbo_entry *entry) > +txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn) > { > - return txn_limbo_write_confirm_rollback(limbo, entry, false); > + return txn_limbo_write_confirm_rollback(limbo, lsn, false); > } > > void > @@ -316,7 +320,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn) > assert(limbo->instance_id != REPLICA_ID_NIL); > struct txn_limbo_entry *e, *tmp; > rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) { > - if (e->lsn <= lsn) > + if (e->lsn <= lsn && txn_has_flag(e->txn, TXN_WAIT_ACK)) > break; Are you rolling back the async transactions that are before the last sync transaction to be rolled back? Why? Shouldn't this condition stay the same? > e->is_rollback = true; > txn_limbo_pop(limbo, e); > @@ -350,31 +354,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) > int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id); > vclock_follow(&limbo->vclock, replica_id, lsn); > struct txn_limbo_entry *e, *last_quorum = NULL; > + int64_t confirm_lsn = -1; > rlist_foreach_entry(e, &limbo->queue, in_queue) { > + assert(e->ack_count <= VCLOCK_MAX); > if (e->lsn > lsn) > break; > - if (e->lsn <= prev_lsn) > - continue; > - assert(e->ack_count <= VCLOCK_MAX); > /* > * Sync transactions need to collect acks. Async > * transactions are automatically committed right > * after all the previous sync transactions are. 
> */ > - if (txn_has_flag(e->txn, TXN_WAIT_ACK)) { > - if (++e->ack_count < replication_synchro_quorum) > - continue; > - } else { > - assert(txn_has_flag(e->txn, TXN_WAIT_SYNC)); > + if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) { > + assert(e->lsn == -1); > if (last_quorum == NULL) > continue; > + } else if (e->lsn <= prev_lsn) { > + continue; > + } else if (++e->ack_count < replication_synchro_quorum) { > + continue; > + } else { > + confirm_lsn = e->lsn; > } > e->is_commit = true; > last_quorum = e; > } > if (last_quorum == NULL) > return; > - if (txn_limbo_write_confirm(limbo, last_quorum) != 0) { > + if (txn_limbo_write_confirm(limbo, confirm_lsn) != 0) { > // TODO: what to do here?. > // We already failed writing the CONFIRM > // message. What are the chances we'll be > diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result > index 32deb2ac3..339fc0e33 100644 > --- a/test/replication/qsync_basic.result > +++ b/test/replication/qsync_basic.result > @@ -299,6 +299,126 @@ box.space.sync:select{6} > | - [] > | ... > > +-- > +-- Fully local async transaction also waits for existing sync txn. > +-- > +test_run:switch('default') > + | --- > + | - true > + | ... > +box.cfg{replication_synchro_timeout = 1000, replication_synchro_quorum = 2} > + | --- > + | ... > +_ = box.schema.create_space('locallocal', {is_local = true}) > + | --- > + | ... > +_ = _:create_index('pk') > + | --- > + | ... > +-- Propagate local vclock to some insane value to ensure it won't > +-- affect anything. > +box.begin() for i = 1, 2000 do box.space.locallocal:replace{1} end box.commit() > + | --- > + | ... > +do \ > + f1 = fiber.create(box.space.sync.replace, box.space.sync, {8}) \ > + f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {8}) \ > + box.space.test:replace{8} \ > +end > + | --- > + | ... > +f1:status() > + | --- > + | - dead > + | ... > +f2:status() > + | --- > + | - dead > + | ... 
> +box.space.sync:select{8} > + | --- > + | - - [8] > + | ... > +box.space.locallocal:select{8} > + | --- > + | - - [8] > + | ... > +box.space.test:select{8} > + | --- > + | - - [8] > + | ... > + > +test_run:switch('replica') > + | --- > + | - true > + | ... > +box.space.sync:select{8} > + | --- > + | - - [8] > + | ... > +box.space.locallocal:select{8} > + | --- > + | - [] > + | ... > +box.space.test:select{8} > + | --- > + | - - [8] > + | ... > + > +-- Ensure sync rollback will affect all pending fully local async > +-- transactions too. > +test_run:switch('default') > + | --- > + | - true > + | ... > +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3} > + | --- > + | ... > +do \ > + f1 = fiber.create(box.space.sync.replace, box.space.sync, {9}) \ > + f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {9}) \ > + box.space.test:replace{9} \ > +end > + | --- > + | - error: A rollback for a synchronous transaction is received > + | ... > +f1:status() > + | --- > + | - dead > + | ... > +f2:status() > + | --- > + | - dead > + | ... > +box.space.sync:select{9} > + | --- > + | - [] > + | ... > +box.space.locallocal:select{9} > + | --- > + | - [] > + | ... > +box.space.test:select{9} > + | --- > + | - [] > + | ... > +test_run:switch('replica') > + | --- > + | - true > + | ... > +box.space.sync:select{9} > + | --- > + | - [] > + | ... > +box.space.locallocal:select{9} > + | --- > + | - [] > + | ... > +box.space.test:select{9} > + | --- > + | - [] > + | ... > + > -- > -- gh-5123: quorum 1 still should write CONFIRM. > -- > diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua > index 361f22bc3..6e40131bf 100644 > --- a/test/replication/qsync_basic.test.lua > +++ b/test/replication/qsync_basic.test.lua > @@ -118,6 +118,51 @@ test_run:switch('replica') > box.space.test:select{6} > box.space.sync:select{6} > > +-- > +-- Fully local async transaction also waits for existing sync txn. 
> +-- > +test_run:switch('default') > +box.cfg{replication_synchro_timeout = 1000, replication_synchro_quorum = 2} > +_ = box.schema.create_space('locallocal', {is_local = true}) > +_ = _:create_index('pk') > +-- Propagate local vclock to some insane value to ensure it won't > +-- affect anything. > +box.begin() for i = 1, 2000 do box.space.locallocal:replace{1} end box.commit() > +do \ > + f1 = fiber.create(box.space.sync.replace, box.space.sync, {8}) \ > + f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {8}) \ > + box.space.test:replace{8} \ > +end > +f1:status() > +f2:status() > +box.space.sync:select{8} > +box.space.locallocal:select{8} > +box.space.test:select{8} > + > +test_run:switch('replica') > +box.space.sync:select{8} > +box.space.locallocal:select{8} > +box.space.test:select{8} > + > +-- Ensure sync rollback will affect all pending fully local async > +-- transactions too. > +test_run:switch('default') > +box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3} > +do \ > + f1 = fiber.create(box.space.sync.replace, box.space.sync, {9}) \ > + f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {9}) \ > + box.space.test:replace{9} \ > +end > +f1:status() > +f2:status() > +box.space.sync:select{9} > +box.space.locallocal:select{9} > +box.space.test:select{9} > +test_run:switch('replica') > +box.space.sync:select{9} > +box.space.locallocal:select{9} > +box.space.test:select{9} > + > -- > -- gh-5123: quorum 1 still should write CONFIRM. > -- -- Serge Petrenko