From: Vladislav Shpilevoy
Date: Fri, 3 Jul 2020 01:40:29 +0200
Subject: [Tarantool-patches] [PATCH 4/5] [tosquash] replication: rework how local transactions wait sync
To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org

There was a bug in how async transactions were blocked by a non-empty
limbo. It affected fully local transactions. The problem is that they
don't have an LSN in the needed vclock component - it is always 0.
This means their limbo entry can't get a valid LSN by any means. Even
a copy of the previous sync transaction's LSN won't work, because the
latter may not be written to WAL yet.

This patch makes async transactions always keep lsn -1 in their limbo
entry, because the LSN does not matter anyway: it is needed neither to
collect ACKs nor to propagate the limbo's vclock. Now even a fully
local transaction can be blocked by a pending sync transaction.

Note, this does not cover the case when the transaction is not fully
local and its last row is local.
---
 src/box/txn.c                         |  18 ++--
 src/box/txn_limbo.c                   |  60 +++++++------
 test/replication/qsync_basic.result   | 120 ++++++++++++++++++++++++++
 test/replication/qsync_basic.test.lua |  45 ++++++++++
 4 files changed, 208 insertions(+), 35 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index 6c333cbed..fe0591197 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -688,15 +688,15 @@ txn_commit_async(struct txn *txn)
 
 	/* See txn_commit(). */
 	uint32_t origin_id = req->rows[0]->replica_id;
-	int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
 	limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
 	if (limbo_entry == NULL) {
 		txn_rollback(txn);
 		return -1;
 	}
-	assert(lsn > 0);
-	txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
-
+	if (txn_has_flag(txn, TXN_WAIT_ACK)) {
+		int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
+		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
+	}
 	/*
 	 * Set a trigger to abort waiting for confirm on
 	 * WAL write failure.
@@ -779,10 +779,12 @@ txn_commit(struct txn *txn)
 			return -1;
 		}
 		if (is_sync) {
-			int64_t lsn = req->rows[req->n_rows - 1]->lsn;
-			txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
-			/* Local WAL write is a first 'ACK'. */
-			txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn);
+			if (txn_has_flag(txn, TXN_WAIT_ACK)) {
+				int64_t lsn = req->rows[req->n_rows - 1]->lsn;
+				txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
+				/* Local WAL write is a first 'ACK'. */
+				txn_limbo_ack(&txn_limbo, txn_limbo.instance_id, lsn);
+			}
 			if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0) {
 				txn_free(txn);
 				return -1;
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 387cfd337..44a0c7273 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -120,6 +120,7 @@ txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
 	assert(limbo->instance_id != REPLICA_ID_NIL);
 	assert(entry->lsn == -1);
 	assert(lsn > 0);
+	assert(txn_has_flag(entry->txn, TXN_WAIT_ACK));
 	(void) limbo;
 	entry->lsn = lsn;
 }
@@ -129,6 +130,12 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
 	if (txn_limbo_entry_is_complete(entry))
 		return true;
+	/*
+	 * Async transaction can't complete itself. It is always
+	 * completed by a previous sync transaction.
+	 */
+	if (!txn_has_flag(entry->txn, TXN_WAIT_ACK))
+		return false;
 	struct vclock_iterator iter;
 	vclock_iterator_init(&iter, &limbo->vclock);
 	int ack_count = 0;
@@ -142,14 +149,13 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 }
 
 static int
-txn_limbo_write_rollback(struct txn_limbo *limbo,
-			 struct txn_limbo_entry *entry);
+txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
 
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
 	struct txn *txn = entry->txn;
-	assert(entry->lsn > 0);
+	assert(entry->lsn > 0 || !txn_has_flag(entry->txn, TXN_WAIT_ACK));
 	assert(!txn_has_flag(txn, TXN_IS_DONE));
 	assert(txn_has_flag(txn, TXN_WAIT_SYNC));
 	if (txn_limbo_check_complete(limbo, entry))
@@ -176,7 +182,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 		goto complete;
 	}
 
-	txn_limbo_write_rollback(limbo, entry);
+	txn_limbo_write_rollback(limbo, entry->lsn);
 	struct txn_limbo_entry *e, *tmp;
 	rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) {
@@ -210,10 +216,11 @@ complete:
 }
 
 static int
-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
-				 struct txn_limbo_entry *entry,
+txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
 				 bool is_confirm)
 {
+	assert(lsn > 0);
+
 	struct xrow_header row;
 	struct request request = {
 		.header = &row,
@@ -221,14 +228,13 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo,
 
 	int res = 0;
 	if (is_confirm) {
-		res = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
+		res = xrow_encode_confirm(&row, limbo->instance_id, lsn);
 	} else {
 		/*
 		 * This entry is the first to be rolled back, so
-		 * the last "safe" lsn is entry->lsn - 1.
+		 * the last "safe" lsn is lsn - 1.
 		 */
-		res = xrow_encode_rollback(&row, limbo->instance_id,
-					   entry->lsn - 1);
+		res = xrow_encode_rollback(&row, limbo->instance_id, lsn - 1);
 	}
 	if (res == -1)
 		return -1;
@@ -260,10 +266,9 @@ rollback:
  * transactions waiting for confirmation may be finished.
  */
 static int
-txn_limbo_write_confirm(struct txn_limbo *limbo,
-			struct txn_limbo_entry *entry)
+txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
-	return txn_limbo_write_confirm_rollback(limbo, entry, true);
+	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
 }
 
 void
@@ -300,14 +305,13 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 
 /**
  * Write a rollback message to WAL. After it's written
- * all the tarnsactions following the current one and waiting
+ * all the transactions following the current one and waiting
  * for confirmation must be rolled back.
  */
 static int
-txn_limbo_write_rollback(struct txn_limbo *limbo,
-			 struct txn_limbo_entry *entry)
+txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 {
-	return txn_limbo_write_confirm_rollback(limbo, entry, false);
+	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
 }
 
 void
@@ -316,7 +320,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 	assert(limbo->instance_id != REPLICA_ID_NIL);
 	struct txn_limbo_entry *e, *tmp;
 	rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) {
-		if (e->lsn <= lsn)
+		if (e->lsn <= lsn && txn_has_flag(e->txn, TXN_WAIT_ACK))
 			break;
 		e->is_rollback = true;
 		txn_limbo_pop(limbo, e);
@@ -350,31 +354,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
 	vclock_follow(&limbo->vclock, replica_id, lsn);
 	struct txn_limbo_entry *e, *last_quorum = NULL;
+	int64_t confirm_lsn = -1;
 	rlist_foreach_entry(e, &limbo->queue, in_queue) {
+		assert(e->ack_count <= VCLOCK_MAX);
 		if (e->lsn > lsn)
 			break;
-		if (e->lsn <= prev_lsn)
-			continue;
-		assert(e->ack_count <= VCLOCK_MAX);
 		/*
 		 * Sync transactions need to collect acks. Async
 		 * transactions are automatically committed right
 		 * after all the previous sync transactions are.
 		 */
-		if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
-			if (++e->ack_count < replication_synchro_quorum)
-				continue;
-		} else {
-			assert(txn_has_flag(e->txn, TXN_WAIT_SYNC));
+		if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
+			assert(e->lsn == -1);
 			if (last_quorum == NULL)
 				continue;
+		} else if (e->lsn <= prev_lsn) {
+			continue;
+		} else if (++e->ack_count < replication_synchro_quorum) {
+			continue;
+		} else {
+			confirm_lsn = e->lsn;
 		}
 		e->is_commit = true;
		last_quorum = e;
 	}
 	if (last_quorum == NULL)
 		return;
-	if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
+	if (txn_limbo_write_confirm(limbo, confirm_lsn) != 0) {
 		// TODO: what to do here?.
 		// We already failed writing the CONFIRM
 		// message. What are the chances we'll be
diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result
index 32deb2ac3..339fc0e33 100644
--- a/test/replication/qsync_basic.result
+++ b/test/replication/qsync_basic.result
@@ -299,6 +299,126 @@ box.space.sync:select{6}
  | - []
  | ...
 
+--
+-- Fully local async transaction also waits for existing sync txn.
+--
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_timeout = 1000, replication_synchro_quorum = 2}
+ | ---
+ | ...
+_ = box.schema.create_space('locallocal', {is_local = true})
+ | ---
+ | ...
+_ = _:create_index('pk')
+ | ---
+ | ...
+-- Propagate local vclock to some insane value to ensure it won't
+-- affect anything.
+box.begin() for i = 1, 2000 do box.space.locallocal:replace{1} end box.commit()
+ | ---
+ | ...
+do                                                                             \
+    f1 = fiber.create(box.space.sync.replace, box.space.sync, {8})             \
+    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {8}) \
+    box.space.test:replace{8}                                                  \
+end
+ | ---
+ | ...
+f1:status()
+ | ---
+ | - dead
+ | ...
+f2:status()
+ | ---
+ | - dead
+ | ...
+box.space.sync:select{8}
+ | ---
+ | - - [8]
+ | ...
+box.space.locallocal:select{8}
+ | ---
+ | - - [8]
+ | ...
+box.space.test:select{8}
+ | ---
+ | - - [8]
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{8}
+ | ---
+ | - - [8]
+ | ...
+box.space.locallocal:select{8}
+ | ---
+ | - []
+ | ...
+box.space.test:select{8}
+ | ---
+ | - - [8]
+ | ...
+
+-- Ensure sync rollback will affect all pending fully local async
+-- transactions too.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
+ | ---
+ | ...
+do                                                                             \
+    f1 = fiber.create(box.space.sync.replace, box.space.sync, {9})             \
+    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {9}) \
+    box.space.test:replace{9}                                                  \
+end
+ | ---
+ | - error: A rollback for a synchronous transaction is received
+ | ...
+f1:status()
+ | ---
+ | - dead
+ | ...
+f2:status()
+ | ---
+ | - dead
+ | ...
+box.space.sync:select{9}
+ | ---
+ | - []
+ | ...
+box.space.locallocal:select{9}
+ | ---
+ | - []
+ | ...
+box.space.test:select{9}
+ | ---
+ | - []
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{9}
+ | ---
+ | - []
+ | ...
+box.space.locallocal:select{9}
+ | ---
+ | - []
+ | ...
+box.space.test:select{9}
+ | ---
+ | - []
+ | ...
+
 --
 -- gh-5123: quorum 1 still should write CONFIRM.
 --
diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua
index 361f22bc3..6e40131bf 100644
--- a/test/replication/qsync_basic.test.lua
+++ b/test/replication/qsync_basic.test.lua
@@ -118,6 +118,51 @@ test_run:switch('replica')
 box.space.test:select{6}
 box.space.sync:select{6}
 
+--
+-- Fully local async transaction also waits for existing sync txn.
+--
+test_run:switch('default')
+box.cfg{replication_synchro_timeout = 1000, replication_synchro_quorum = 2}
+_ = box.schema.create_space('locallocal', {is_local = true})
+_ = _:create_index('pk')
+-- Propagate local vclock to some insane value to ensure it won't
+-- affect anything.
+box.begin() for i = 1, 2000 do box.space.locallocal:replace{1} end box.commit()
+do                                                                             \
+    f1 = fiber.create(box.space.sync.replace, box.space.sync, {8})             \
+    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {8}) \
+    box.space.test:replace{8}                                                  \
+end
+f1:status()
+f2:status()
+box.space.sync:select{8}
+box.space.locallocal:select{8}
+box.space.test:select{8}
+
+test_run:switch('replica')
+box.space.sync:select{8}
+box.space.locallocal:select{8}
+box.space.test:select{8}
+
+-- Ensure sync rollback will affect all pending fully local async
+-- transactions too.
+test_run:switch('default')
+box.cfg{replication_synchro_timeout = 0.001, replication_synchro_quorum = 3}
+do                                                                             \
+    f1 = fiber.create(box.space.sync.replace, box.space.sync, {9})             \
+    f2 = fiber.create(box.space.locallocal.replace, box.space.locallocal, {9}) \
+    box.space.test:replace{9}                                                  \
+end
+f1:status()
+f2:status()
+box.space.sync:select{9}
+box.space.locallocal:select{9}
+box.space.test:select{9}
+test_run:switch('replica')
+box.space.sync:select{9}
+box.space.locallocal:select{9}
+box.space.test:select{9}
+
 --
 -- gh-5123: quorum 1 still should write CONFIRM.
 --
-- 
2.21.1 (Apple Git-122.3)
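P.S. For review convenience, a minimal sketch of the behavior this patch
establishes. It is not an excerpt from the test suite: it assumes the
'sync' (is_sync = true) and 'locallocal' (is_local = true) spaces and the
box.cfg options used in qsync_basic.test.lua above, on an instance whose
replica set can reach quorum 2.

fiber = require('fiber')
box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 1000}

-- The sync write waits in the limbo until the quorum confirms it.
f = fiber.create(function() box.space.sync:replace{10} end)

-- The fully local async write gets a limbo entry with lsn -1. It can't
-- collect ACKs itself, so it blocks behind the sync entry and completes
-- only once {10} is confirmed in 'sync'.
box.space.locallocal:replace{10}

-- If the quorum is not reached before replication_synchro_timeout
-- expires, a ROLLBACK is written and this local write is rolled back
-- together with the sync one.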