From mboxrd@z Thu Jan  1 00:00:00 1970
From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
Reply-To: Serge Petrenko
To: Cyrill Gorcunov, tml
Cc: Vladislav Shpilevoy
Date: Mon, 11 Oct 2021 16:24:09 +0300
Message-ID: <946dca1e-e36a-0650-6e9d-98d92098213d@tarantool.org>
In-Reply-To: <20211008175809.349501-3-gorcunov@gmail.com>
References: <20211008175809.349501-1-gorcunov@gmail.com> <20211008175809.349501-3-gorcunov@gmail.com>
Subject: Re: [Tarantool-patches] [PATCH v21 2/3] qsync: order access to the limbo terms
List-Id: Tarantool development patches
08.10.2021 20:58, Cyrill Gorcunov wrote:
> Limbo terms tracking is shared between appliers, and when
> one of the appliers is waiting for a write to complete inside
> the journal_write() routine, another may need to read the
> term value to figure out if a promote request is valid to
> apply. Due to cooperative multitasking, access to the terms
> is not consistent, so we need to be sure that other fibers
> read up-to-date terms (ie ones already written to the WAL).
>
> For this sake we use a latching mechanism: when one fiber
> takes a lock for updating, other readers wait until
> the operation is complete.
>
> For example, here is a call graph of two appliers:

Hi! Thanks for the fixes!
Please find a couple of minor comments below.
Other than that LGTM!

>
> applier 1
> ---------
> applier_apply_tx
>   (promote term = 3
>    current max term = 2)
>   applier_synchro_filter_tx
>   apply_synchro_row
>   journal_write
>   (sleeping)
>
> at this moment another applier comes in with obsolete
> data and term 2
>
> applier 2
> ---------
> applier_apply_tx
>   (term 2)
>   applier_synchro_filter_tx
>   txn_limbo_is_replica_outdated -> false
>   journal_write (sleep)
>
> applier 1
> ---------
> journal wakes up
>   apply_synchro_row_cb
>   set max term to 3
>
> So applier 2 didn't notice that term 3 had already been seen,
> and wrote obsolete data. With locking, applier 2 will
> wait until applier 1 has finished its write.
>
> Also, Serge Petrenko pointed out that we have a somewhat similar
> situation with txn_limbo_ack() [we might try to write a confirm on an
> entry while a new promote is in flight and not yet applied, so the
> confirm might be invalid] and txn_limbo_on_parameters_change() [where
> we might confirm entries, reducing the quorum number, while we are not
> even the limbo owner]. Thus we need to fix these problems as well.
>
> We introduce the following helpers:
>
> 1) txn_limbo_process_begin: takes a lock
> 2) txn_limbo_process_commit and txn_limbo_process_rollback:
>    simply release the lock but have different names
>    for better semantics
> 3) txn_limbo_process: a general function which uses the x_begin
>    and x_commit helpers internally
> 4) txn_limbo_process_core: does the real job of processing the
>    request; it implies that txn_limbo_process_begin has been called
> 5) txn_limbo_ack() and txn_limbo_on_parameters_change(): both
>    respect the current limbo owner via the promote latch
>
> Part-of #6036
>
> Signed-off-by: Cyrill Gorcunov
> ---
>  src/box/applier.cc             | 12 +++--
>  src/box/box.cc                 | 15 ++++---
>  src/box/relay.cc               | 11 ++---
>  src/box/txn.c                  |  2 +-
>  src/box/txn_limbo.c            | 41 +++++++++++++++--
>  src/box/txn_limbo.h            | 80 +++++++++++++++++++++++++++++++---
>  test/unit/snap_quorum_delay.cc |  5 ++-
>  7 files changed, 136 insertions(+), 30 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index b981bd436..46c36e259 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -857,7 +857,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>  		applier_rollback_by_wal_io(entry->res);
>  	} else {
>  		replica_txn_wal_write_cb(synchro_entry->rcb);
> -		txn_limbo_process(&txn_limbo, synchro_entry->req);
> +		txn_limbo_process_core(&txn_limbo, synchro_entry->req);
>  		trigger_run(&replicaset.applier.on_wal_write, NULL);
>  	}
>  	fiber_wakeup(synchro_entry->owner);
> @@ -873,6 +873,8 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>  	if (xrow_decode_synchro(row, &req) != 0)
>  		goto err;
>
> +	txn_limbo_process_begin(&txn_limbo);
> +
>  	struct replica_cb_data rcb_data;
>  	struct synchro_entry entry;
>  	/*
> @@ -910,12 +912,16 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>  	 * transactions side, including the async ones.
>  	 */
>  	if (journal_write(&entry.base) != 0)
> -		goto err;
> +		goto err_rollback;
>  	if (entry.base.res < 0) {
>  		diag_set_journal_res(entry.base.res);
> -		goto err;
> +		goto err_rollback;
>  	}
> +	txn_limbo_process_commit(&txn_limbo);
>  	return 0;
> +
> +err_rollback:
> +	txn_limbo_process_rollback(&txn_limbo);
>  err:
>  	diag_log();
>  	return -1;
> diff --git a/src/box/box.cc b/src/box/box.cc
> index e082e1a3d..6a9be745a 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1677,8 +1677,6 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
>  	struct raft *raft = box_raft();
>  	assert(raft->volatile_term == raft->term);
>  	assert(promote_lsn >= 0);
> -	txn_limbo_write_promote(&txn_limbo, promote_lsn,
> -				raft->term);
>  	struct synchro_request req = {
>  		.type = IPROTO_RAFT_PROMOTE,
>  		.replica_id = prev_leader_id,
> @@ -1686,8 +1684,11 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
>  		.lsn = promote_lsn,
>  		.term = raft->term,
>  	};
> -	txn_limbo_process(&txn_limbo, &req);
> +	txn_limbo_process_begin(&txn_limbo);
> +	txn_limbo_write_promote(&txn_limbo, req.lsn, req.term);
> +	txn_limbo_process_core(&txn_limbo, &req);
>  	assert(txn_limbo_is_empty(&txn_limbo));
> +	txn_limbo_process_commit(&txn_limbo);
>  }
>
>  /** A guard to block multiple simultaneous promote()/demote() invocations.
>  */
> @@ -1699,8 +1700,6 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
>  {
>  	assert(box_raft()->volatile_term == box_raft()->term);
>  	assert(promote_lsn >= 0);
> -	txn_limbo_write_demote(&txn_limbo, promote_lsn,
> -			       box_raft()->term);
>  	struct synchro_request req = {
>  		.type = IPROTO_RAFT_DEMOTE,
>  		.replica_id = prev_leader_id,
> @@ -1708,8 +1707,12 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
>  		.lsn = promote_lsn,
>  		.term = box_raft()->term,
>  	};
> -	txn_limbo_process(&txn_limbo, &req);
> +	txn_limbo_process_begin(&txn_limbo);
> +	txn_limbo_write_demote(&txn_limbo, promote_lsn,
> +			       box_raft()->term);
> +	txn_limbo_process_core(&txn_limbo, &req);
>  	assert(txn_limbo_is_empty(&txn_limbo));
> +	txn_limbo_process_commit(&txn_limbo);
>  }
>
>  int
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index f5852df7b..61ef1e3a5 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -545,15 +545,10 @@ tx_status_update(struct cmsg *msg)
>  	ack.vclock = &status->vclock;
>  	/*
>  	 * Let pending synchronous transactions know, which of
> -	 * them were successfully sent to the replica. Acks are
> -	 * collected only by the transactions originator (which is
> -	 * the single master in 100% so far). Other instances wait
> -	 * for master's CONFIRM message instead.
> +	 * them were successfully sent to the replica.
>  	 */
> -	if (txn_limbo.owner_id == instance_id) {
> -		txn_limbo_ack(&txn_limbo, ack.source,
> -			      vclock_get(ack.vclock, instance_id));
> -	}
> +	txn_limbo_ack(&txn_limbo, instance_id, ack.source,
> +		      vclock_get(ack.vclock, instance_id));
>  	trigger_run(&replicaset.on_ack, &ack);
>
>  	static const struct cmsg_hop route[] = {
> diff --git a/src/box/txn.c b/src/box/txn.c
> index e7fc81683..06bb85a09 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -939,7 +939,7 @@ txn_commit(struct txn *txn)
>  			txn_limbo_assign_local_lsn(&txn_limbo, limbo_entry,
>  						   lsn);
>  			/* Local WAL write is a first 'ACK'.
>  			 */
> -			txn_limbo_ack(&txn_limbo, txn_limbo.owner_id, lsn);
> +			txn_limbo_ack_self(&txn_limbo, lsn);
>  		}
>  		if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0)
>  			goto rollback;
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 70447caaf..176a83cb5 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -47,6 +47,7 @@ txn_limbo_create(struct txn_limbo *limbo)
>  	vclock_create(&limbo->vclock);
>  	vclock_create(&limbo->promote_term_map);
>  	limbo->promote_greatest_term = 0;
> +	latch_create(&limbo->promote_latch);
>  	limbo->confirmed_lsn = 0;
>  	limbo->rollback_count = 0;
>  	limbo->is_in_rollback = false;
> @@ -542,10 +543,23 @@ txn_limbo_read_demote(struct txn_limbo *limbo, int64_t lsn)
>  }
>
>  void
> -txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
> +txn_limbo_ack(struct txn_limbo *limbo, uint32_t owner_id,
> +	      uint32_t replica_id, int64_t lsn)
>  {
>  	if (rlist_empty(&limbo->queue))
>  		return;
> +	/*
> +	 * ACKs are collected only by the transactions originator
> +	 * (which is the single master in 100% so far). Other instances
> +	 * wait for master's CONFIRM message instead.
> +	 *
> +	 * Due to cooperative multitasking there might be limbo owner
> +	 * migration in-fly (while writing data to journal), so for
> +	 * simplicity sake the test for owner is done here instead
> +	 * of putting this check to the callers.
> +	 */
> +	if (!txn_limbo_is_owner(limbo, owner_id))
> +		return;

I feel like you should check for limbo->queue emptiness **after**
taking the latch. Anything may change while we sleep.

>  	/*
>  	 * If limbo is currently writing a rollback, it means that the whole
>  	 * queue will be rolled back.
Because rollback is written only for > @@ -724,11 +738,14 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout) > } > > void > -txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) > +txn_limbo_process_core(struct txn_limbo *limbo, > + const struct synchro_request *req) > { > + assert(latch_is_locked(&limbo->promote_latch)); > + > uint64_t term = req->term; > uint32_t origin = req->origin_id; > - if (txn_limbo_replica_term(limbo, origin) < term) { > + if (vclock_get(&limbo->promote_term_map, origin) < (int64_t)term) { > vclock_follow(&limbo->promote_term_map, origin, term); > if (term > limbo->promote_greatest_term) > limbo->promote_greatest_term = term; > @@ -786,11 +803,29 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) > return; > } > > +void > +txn_limbo_process(struct txn_limbo *limbo, > + const struct synchro_request *req) > +{ > + txn_limbo_process_begin(limbo); > + txn_limbo_process_core(limbo, req); > + txn_limbo_process_commit(limbo); > +} > + > void > txn_limbo_on_parameters_change(struct txn_limbo *limbo) > { > if (rlist_empty(&limbo->queue)) > return; > + /* > + * In case if we're not current leader (ie not owning the > + * limbo) then we should not confirm anything, otherwise > + * we could reduce quorum number and start writing CONFIRM > + * while leader node carries own maybe bigger quorum value. > + */ > + if (!txn_limbo_is_owner(limbo, instance_id)) > + return; > + Same as above, let's check for queue emptiness after taking the latch (checking txn_limbo_is_owner). 
>  	struct txn_limbo_entry *e;
>  	int64_t confirm_lsn = -1;
>  	rlist_foreach_entry(e, &limbo->queue, in_queue) {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 53e52f676..aaff444e4 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -31,6 +31,7 @@
>   */
>  #include "small/rlist.h"
>  #include "vclock/vclock.h"
> +#include "latch.h"
>
>  #include
>
> @@ -147,6 +148,10 @@ struct txn_limbo {
>  	 * limbo and raft are in sync and the terms are the same.
>  	 */
>  	uint64_t promote_greatest_term;
> +	/**
> +	 * To order access to the promote data.
> +	 */
> +	struct latch promote_latch;
>  	/**
>  	 * Maximal LSN gathered quorum and either already confirmed in WAL, or
>  	 * whose confirmation is in progress right now. Any attempt to confirm
> @@ -194,6 +199,23 @@ txn_limbo_is_empty(struct txn_limbo *limbo)
>  	return rlist_empty(&limbo->queue);
>  }
>
> +/**
> + * Test if the \a owner_id is current limbo owner.
> + */
> +static inline bool
> +txn_limbo_is_owner(struct txn_limbo *limbo, uint32_t owner_id)
> +{
> +	/*
> +	 * A guard needed to prevent race with in-fly promote
> +	 * packets which are sitting inside journal but not yet
> +	 * written.
> +	 */
> +	latch_lock(&limbo->promote_latch);
> +	bool v = limbo->owner_id == owner_id;
> +	latch_unlock(&limbo->promote_latch);
> +	return v;
> +}
> +
>  bool
>  txn_limbo_is_ro(struct txn_limbo *limbo);
>
> @@ -216,9 +238,12 @@ txn_limbo_last_entry(struct txn_limbo *limbo)
>   * @a replica_id.
>   */
>  static inline uint64_t
> -txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
> +txn_limbo_replica_term(struct txn_limbo *limbo, uint32_t replica_id)
>  {
> -	return vclock_get(&limbo->promote_term_map, replica_id);
> +	latch_lock(&limbo->promote_latch);
> +	uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> +	latch_unlock(&limbo->promote_latch);
> +	return v;
>  }
>
>  /**
> @@ -226,11 +251,14 @@ txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
>   * data from it.
>   * The check is only valid when elections are enabled.
>   */
>  static inline bool
> -txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
> +txn_limbo_is_replica_outdated(struct txn_limbo *limbo,
>  			      uint32_t replica_id)
>  {
> -	return txn_limbo_replica_term(limbo, replica_id) <
> -	       limbo->promote_greatest_term;
> +	latch_lock(&limbo->promote_latch);
> +	uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> +	bool res = v < limbo->promote_greatest_term;
> +	latch_unlock(&limbo->promote_latch);
> +	return res;
>  }
>
>  /**
> @@ -287,7 +315,15 @@ txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
>   * replica with the specified ID.
>   */
>  void
> -txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
> +txn_limbo_ack(struct txn_limbo *limbo, uint32_t owner_id,
> +	      uint32_t replica_id, int64_t lsn);
> +
> +static inline void
> +txn_limbo_ack_self(struct txn_limbo *limbo, int64_t lsn)
> +{
> +	return txn_limbo_ack(limbo, limbo->owner_id,
> +			     limbo->owner_id, lsn);
> +}
>
>  /**
>   * Block the current fiber until the transaction in the limbo
> @@ -300,7 +336,37 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>  int
>  txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>
> -/** Execute a synchronous replication request. */
> +/**
> + * Initiate execution of a synchronous replication request.
> + */
> +static inline void
> +txn_limbo_process_begin(struct txn_limbo *limbo)
> +{
> +	latch_lock(&limbo->promote_latch);
> +}
> +
> +/** Commit a synchronous replication request. */
> +static inline void
> +txn_limbo_process_commit(struct txn_limbo *limbo)
> +{
> +	assert(latch_is_locked(&limbo->promote_latch));

latch_unlock itself has an assertion, an even stricter one.
So this assertion here and in process_rollback() is extraneous.

> +	latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Rollback a synchronous replication request.
> + */
> +static inline void
> +txn_limbo_process_rollback(struct txn_limbo *limbo)
> +{
> +	assert(latch_is_locked(&limbo->promote_latch));
> +	latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Core of processing synchronous replication request. */
> +void
> +txn_limbo_process_core(struct txn_limbo *limbo,
> +		       const struct synchro_request *req);
> +
> +/** Process a synchronous replication request. */
>  void
>  txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
>
> diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc
> index 803bbbea8..d43b4cd2c 100644
> --- a/test/unit/snap_quorum_delay.cc
> +++ b/test/unit/snap_quorum_delay.cc
> @@ -130,7 +130,7 @@ txn_process_func(va_list ap)
>  	}
>
>  	txn_limbo_assign_local_lsn(&txn_limbo, entry, fake_lsn);
> -	txn_limbo_ack(&txn_limbo, txn_limbo.owner_id, fake_lsn);
> +	txn_limbo_ack_self(&txn_limbo, fake_lsn);
>  	txn_limbo_wait_complete(&txn_limbo, entry);
>
>  	switch (process_type) {
> @@ -157,7 +157,8 @@ txn_confirm_func(va_list ap)
>  	 * inside gc_checkpoint().
>  	 */
>  	fiber_sleep(0);
> -	txn_limbo_ack(&txn_limbo, relay_id, fake_lsn);
> +	txn_limbo_ack(&txn_limbo, txn_limbo.owner_id,
> +		      relay_id, fake_lsn);
>  	return 0;
>  }

-- 
Serge Petrenko