To: Cyrill Gorcunov, tml
Cc: Vladislav Shpilevoy
References: <20211011191635.573685-1-gorcunov@gmail.com>
 <20211011191635.573685-3-gorcunov@gmail.com>
Message-ID: <8a02db75-ce19-cb8b-1b41-b74184b167fc@tarantool.org>
Date: Tue, 12 Oct 2021 12:40:43 +0300
In-Reply-To: <20211011191635.573685-3-gorcunov@gmail.com>
Subject: Re: [Tarantool-patches] [PATCH v22 2/3] qsync: order access to the limbo terms
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko

11.10.2021 22:16, Cyrill Gorcunov wrote:
> Limbo terms tracking is shared between appliers and when
> one of appliers is waiting for write to complete inside
> journal_write() routine, an other may need to access read
> term value to figure out if promote request is valid to
> apply. Due to cooperative multitasking access to the terms
> is not consistent so we need to be sure that other fibers
> read up to date terms (ie written to the WAL).
>
> For this sake we use a latching mechanism, when one fiber
> takes a lock for updating other readers are waiting until
> the operation is complete.
>
> For example here is a call graph of two appliers

Thanks for the changes! One final nit and we're good to go.

>
> applier 1
> ---------
> applier_apply_tx
>   (promote term = 3
>    current max term = 2)
>   applier_synchro_filter_tx
>   apply_synchro_row
>     journal_write
>       (sleeping)
>
> at this moment another applier comes in with obsolete
> data and term 2
>
> applier 2
> ---------
> applier_apply_tx
>   (term 2)
>   applier_synchro_filter_tx
>     txn_limbo_is_replica_outdated -> false
>   journal_write (sleep)
>
> applier 1
> ---------
> journal wakes up
>   apply_synchro_row_cb
>     set max term to 3
>
> So the applier 2 didn't notice that term 3 is already seen
> and wrote obsolete data. With locking the applier 2 will
> wait until applier 1 has finished its write.
>
> Also Serge Petrenko pointed that we have somewhat similar situation
> with txn_limbo_ack() [we might try to write confirm on entry while
> new promote is in fly and not yet applied, so confirm might be invalid]
> and txn_limbo_on_parameters_change() [where we might confirm entries
> reducing quorum number while we even not a limbo owner]. Thus we need
> to fix these problems as well.
>
> We introduce the following helpers:
>
> 1) txn_limbo_process_begin: which takes a lock
> 2) txn_limbo_process_commit and txn_limbo_process_rollback
>    which simply release the lock but have different names
>    for better semantics
> 3) txn_limbo_process is a general function which uses x_begin
>    and x_commit helper internally
> 4) txn_limbo_process_core to do a real job over processing the
>    request, it implies that txn_limbo_process_begin been called
> 5) txn_limbo_ack() and txn_limbo_on_parameters_change() both
>    respect current limbo owner via promote latch.
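
Not a review comment, just double-checking my reading of the new API.
Below is a minimal sketch of how I understand a caller is supposed to
pair the helpers around a journal write (the wrapper function itself is
made up for illustration, only the txn_limbo_* helpers and
journal_write() come from the tree). The key point is that the promote
latch stays taken across the WAL write, so readers of the terms never
see a half-applied request:

/*
 * Reviewer's sketch, not part of the patch: a hypothetical caller
 * pairing the new helpers around a WAL write, similar to what
 * box_issue_promote() does further down in the diff.
 */
static int
limbo_write_synchro_request(struct txn_limbo *limbo,
                            const struct synchro_request *req,
                            struct journal_entry *entry)
{
        /* Block other writers and term readers. */
        txn_limbo_process_begin(limbo);
        if (journal_write(entry) != 0 || entry->res < 0) {
                /* Nothing reached the WAL, only drop the latch. */
                txn_limbo_process_rollback(limbo);
                return -1;
        }
        /* The request is durable, now update the in-memory terms. */
        txn_limbo_process_core(limbo, req);
        txn_limbo_process_commit(limbo);
        return 0;
}

If that matches what you intended, the begin/commit/rollback naming
reads fine to me.
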
>
> Part-of #6036
>
> Signed-off-by: Cyrill Gorcunov
> ---
>  src/box/applier.cc             | 12 ++++--
>  src/box/box.cc                 | 15 ++++---
>  src/box/relay.cc               | 11 ++---
>  src/box/txn.c                  |  2 +-
>  src/box/txn_limbo.c            | 49 +++++++++++++++++++--
>  src/box/txn_limbo.h            | 78 +++++++++++++++++++++++++++++++---
>  test/unit/snap_quorum_delay.cc |  5 ++-
>  7 files changed, 142 insertions(+), 30 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index b981bd436..46c36e259 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -857,7 +857,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>                  applier_rollback_by_wal_io(entry->res);
>          } else {
>                  replica_txn_wal_write_cb(synchro_entry->rcb);
> -                txn_limbo_process(&txn_limbo, synchro_entry->req);
> +                txn_limbo_process_core(&txn_limbo, synchro_entry->req);
>                  trigger_run(&replicaset.applier.on_wal_write, NULL);
>          }
>          fiber_wakeup(synchro_entry->owner);
> @@ -873,6 +873,8 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>          if (xrow_decode_synchro(row, &req) != 0)
>                  goto err;
>
> +        txn_limbo_process_begin(&txn_limbo);
> +
>          struct replica_cb_data rcb_data;
>          struct synchro_entry entry;
>          /*
> @@ -910,12 +912,16 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>           * transactions side, including the async ones.
>           */
>          if (journal_write(&entry.base) != 0)
> -                goto err;
> +                goto err_rollback;
>          if (entry.base.res < 0) {
>                  diag_set_journal_res(entry.base.res);
> -                goto err;
> +                goto err_rollback;
>          }
> +        txn_limbo_process_commit(&txn_limbo);
>          return 0;
> +
> +err_rollback:
> +        txn_limbo_process_rollback(&txn_limbo);
>  err:
>          diag_log();
>          return -1;
> diff --git a/src/box/box.cc b/src/box/box.cc
> index e082e1a3d..6a9be745a 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1677,8 +1677,6 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
>          struct raft *raft = box_raft();
>          assert(raft->volatile_term == raft->term);
>          assert(promote_lsn >= 0);
> -        txn_limbo_write_promote(&txn_limbo, promote_lsn,
> -                                raft->term);
>          struct synchro_request req = {
>                  .type = IPROTO_RAFT_PROMOTE,
>                  .replica_id = prev_leader_id,
> @@ -1686,8 +1684,11 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
>                  .lsn = promote_lsn,
>                  .term = raft->term,
>          };
> -        txn_limbo_process(&txn_limbo, &req);
> +        txn_limbo_process_begin(&txn_limbo);
> +        txn_limbo_write_promote(&txn_limbo, req.lsn, req.term);
> +        txn_limbo_process_core(&txn_limbo, &req);
>          assert(txn_limbo_is_empty(&txn_limbo));
> +        txn_limbo_process_commit(&txn_limbo);
>  }
>
>  /** A guard to block multiple simultaneous promote()/demote() invocations. */
> @@ -1699,8 +1700,6 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
>  {
>          assert(box_raft()->volatile_term == box_raft()->term);
>          assert(promote_lsn >= 0);
> -        txn_limbo_write_demote(&txn_limbo, promote_lsn,
> -                               box_raft()->term);
>          struct synchro_request req = {
>                  .type = IPROTO_RAFT_DEMOTE,
>                  .replica_id = prev_leader_id,
> @@ -1708,8 +1707,12 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
>                  .lsn = promote_lsn,
>                  .term = box_raft()->term,
>          };
> -        txn_limbo_process(&txn_limbo, &req);
> +        txn_limbo_process_begin(&txn_limbo);
> +        txn_limbo_write_demote(&txn_limbo, promote_lsn,
> +                               box_raft()->term);
> +        txn_limbo_process_core(&txn_limbo, &req);
>          assert(txn_limbo_is_empty(&txn_limbo));
> +        txn_limbo_process_commit(&txn_limbo);
>  }
>
>  int
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index f5852df7b..61ef1e3a5 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -545,15 +545,10 @@ tx_status_update(struct cmsg *msg)
>          ack.vclock = &status->vclock;
>          /*
>           * Let pending synchronous transactions know, which of
> -         * them were successfully sent to the replica. Acks are
> -         * collected only by the transactions originator (which is
> -         * the single master in 100% so far). Other instances wait
> -         * for master's CONFIRM message instead.
> +         * them were successfully sent to the replica.
>           */
> -        if (txn_limbo.owner_id == instance_id) {
> -                txn_limbo_ack(&txn_limbo, ack.source,
> -                              vclock_get(ack.vclock, instance_id));
> -        }
> +        txn_limbo_ack(&txn_limbo, instance_id, ack.source,
> +                      vclock_get(ack.vclock, instance_id));
>          trigger_run(&replicaset.on_ack, &ack);
>
>          static const struct cmsg_hop route[] = {
> diff --git a/src/box/txn.c b/src/box/txn.c
> index e7fc81683..06bb85a09 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -939,7 +939,7 @@ txn_commit(struct txn *txn)
>                          txn_limbo_assign_local_lsn(&txn_limbo, limbo_entry,
>                                                     lsn);
>                          /* Local WAL write is a first 'ACK'. */
> -                        txn_limbo_ack(&txn_limbo, txn_limbo.owner_id, lsn);
> +                        txn_limbo_ack_self(&txn_limbo, lsn);
>                  }
>                  if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0)
>                          goto rollback;
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 70447caaf..8f9bc11c7 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -47,6 +47,7 @@ txn_limbo_create(struct txn_limbo *limbo)
>          vclock_create(&limbo->vclock);
>          vclock_create(&limbo->promote_term_map);
>          limbo->promote_greatest_term = 0;
> +        latch_create(&limbo->promote_latch);
>          limbo->confirmed_lsn = 0;
>          limbo->rollback_count = 0;
>          limbo->is_in_rollback = false;
> @@ -542,10 +543,30 @@ txn_limbo_read_demote(struct txn_limbo *limbo, int64_t lsn)
>  }
>
>  void
> -txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
> +txn_limbo_ack(struct txn_limbo *limbo, uint32_t owner_id,
> +              uint32_t replica_id, int64_t lsn)
>  {
> +        /*
> +         * ACKs are collected only by the transactions originator
> +         * (which is the single master in 100% so far). Other instances
> +         * wait for master's CONFIRM message instead.
> +         *
> +         * Due to cooperative multitasking there might be limbo owner
> +         * migration in-fly (while writing data to journal), so for
> +         * simplicity sake the test for owner is done here instead
> +         * of putting this check to the callers.
> +         */
> +        if (!txn_limbo_is_owner(limbo, owner_id))
> +                return;
> +
> +        /*
> +         * Test for empty queue is done _after_ txn_limbo_is_owner
> +         * call because we need to be sure that limbo is not been
> +         * changed under our feets while we're reading it.

feets -> feet.

> +         */
>          if (rlist_empty(&limbo->queue))
>                  return;
> +
>          /*
>           * If limbo is currently writing a rollback, it means that the whole
>           * queue will be rolled back. Because rollback is written only for
> @@ -724,11 +745,14 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
>  }
>
>  void
> -txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
> +txn_limbo_process_core(struct txn_limbo *limbo,
> +                       const struct synchro_request *req)
>  {
> +        assert(latch_is_locked(&limbo->promote_latch));
> +
>          uint64_t term = req->term;
>          uint32_t origin = req->origin_id;
> -        if (txn_limbo_replica_term(limbo, origin) < term) {
> +        if (vclock_get(&limbo->promote_term_map, origin) < (int64_t)term) {
>                  vclock_follow(&limbo->promote_term_map, origin, term);
>                  if (term > limbo->promote_greatest_term)
>                          limbo->promote_greatest_term = term;
> @@ -786,11 +810,30 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
>          return;
>  }
>
> +void
> +txn_limbo_process(struct txn_limbo *limbo,
> +                  const struct synchro_request *req)
> +{
> +        txn_limbo_process_begin(limbo);
> +        txn_limbo_process_core(limbo, req);
> +        txn_limbo_process_commit(limbo);
> +}
> +
>  void
>  txn_limbo_on_parameters_change(struct txn_limbo *limbo)
>  {
> +        /*
> +         * In case if we're not current leader (ie not owning the
> +         * limbo) then we should not confirm anything, otherwise
> +         * we could reduce quorum number and start writing CONFIRM
> +         * while leader node carries own maybe bigger quorum value.
> +         */
> +        if (!txn_limbo_is_owner(limbo, instance_id))
> +                return;
> +
>          if (rlist_empty(&limbo->queue))
>                  return;
> +
>          struct txn_limbo_entry *e;
>          int64_t confirm_lsn = -1;
>          rlist_foreach_entry(e, &limbo->queue, in_queue) {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 53e52f676..33cacef8f 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -31,6 +31,7 @@
>   */
>  #include "small/rlist.h"
>  #include "vclock/vclock.h"
> +#include "latch.h"
>
>  #include
>
> @@ -147,6 +148,10 @@ struct txn_limbo {
>           * limbo and raft are in sync and the terms are the same.
>           */
>          uint64_t promote_greatest_term;
> +        /**
> +         * To order access to the promote data.
> +         */
> +        struct latch promote_latch;
>          /**
>           * Maximal LSN gathered quorum and either already confirmed in WAL, or
>           * whose confirmation is in progress right now. Any attempt to confirm
> @@ -194,6 +199,23 @@ txn_limbo_is_empty(struct txn_limbo *limbo)
>          return rlist_empty(&limbo->queue);
>  }
>
> +/**
> + * Test if the \a owner_id is current limbo owner.
> + */
> +static inline bool
> +txn_limbo_is_owner(struct txn_limbo *limbo, uint32_t owner_id)
> +{
> +        /*
> +         * A guard needed to prevent race with in-fly promote
> +         * packets which are sitting inside journal but not yet
> +         * written.
> +         */
> +        latch_lock(&limbo->promote_latch);
> +        bool v = limbo->owner_id == owner_id;
> +        latch_unlock(&limbo->promote_latch);
> +        return v;
> +}
> +
>  bool
>  txn_limbo_is_ro(struct txn_limbo *limbo);
>
> @@ -216,9 +238,12 @@ txn_limbo_last_entry(struct txn_limbo *limbo)
>   * @a replica_id.
>   */
>  static inline uint64_t
> -txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
> +txn_limbo_replica_term(struct txn_limbo *limbo, uint32_t replica_id)
>  {
> -        return vclock_get(&limbo->promote_term_map, replica_id);
> +        latch_lock(&limbo->promote_latch);
> +        uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> +        latch_unlock(&limbo->promote_latch);
> +        return v;
>  }
>
>  /**
> @@ -226,11 +251,14 @@ txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
>   * data from it. The check is only valid when elections are enabled.
>   */
>  static inline bool
> -txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
> +txn_limbo_is_replica_outdated(struct txn_limbo *limbo,
>                                uint32_t replica_id)
>  {
> -        return txn_limbo_replica_term(limbo, replica_id) <
> -               limbo->promote_greatest_term;
> +        latch_lock(&limbo->promote_latch);
> +        uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> +        bool res = v < limbo->promote_greatest_term;
> +        latch_unlock(&limbo->promote_latch);
> +        return res;
>  }
>
>  /**
> @@ -287,7 +315,15 @@ txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
>   * replica with the specified ID.
>   */
>  void
> -txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
> +txn_limbo_ack(struct txn_limbo *limbo, uint32_t owner_id,
> +              uint32_t replica_id, int64_t lsn);
> +
> +static inline void
> +txn_limbo_ack_self(struct txn_limbo *limbo, int64_t lsn)
> +{
> +        return txn_limbo_ack(limbo, limbo->owner_id,
> +                             limbo->owner_id, lsn);
> +}
>
>  /**
>   * Block the current fiber until the transaction in the limbo
> @@ -300,7 +336,35 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>  int
>  txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>
> -/** Execute a synchronous replication request. */
> +/**
> + * Initiate execution of a synchronous replication request.
> + */
> +static inline void
> +txn_limbo_process_begin(struct txn_limbo *limbo)
> +{
> +        latch_lock(&limbo->promote_latch);
> +}
> +
> +/** Commit a synchronous replication request. */
> +static inline void
> +txn_limbo_process_commit(struct txn_limbo *limbo)
> +{
> +        latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Rollback a synchronous replication request. */
> +static inline void
> +txn_limbo_process_rollback(struct txn_limbo *limbo)
> +{
> +        latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Core of processing synchronous replication request. */
> +void
> +txn_limbo_process_core(struct txn_limbo *limbo,
> +                       const struct synchro_request *req);
> +
> +/** Process a synchronous replication request. */
>  void
>  txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
>
> diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc
> index 803bbbea8..d43b4cd2c 100644
> --- a/test/unit/snap_quorum_delay.cc
> +++ b/test/unit/snap_quorum_delay.cc
> @@ -130,7 +130,7 @@ txn_process_func(va_list ap)
>          }
>
>          txn_limbo_assign_local_lsn(&txn_limbo, entry, fake_lsn);
> -        txn_limbo_ack(&txn_limbo, txn_limbo.owner_id, fake_lsn);
> +        txn_limbo_ack_self(&txn_limbo, fake_lsn);
>          txn_limbo_wait_complete(&txn_limbo, entry);
>
>          switch (process_type) {
> @@ -157,7 +157,8 @@ txn_confirm_func(va_list ap)
>           * inside gc_checkpoint().
>           */
>          fiber_sleep(0);
> -        txn_limbo_ack(&txn_limbo, relay_id, fake_lsn);
> +        txn_limbo_ack(&txn_limbo, txn_limbo.owner_id,
> +                      relay_id, fake_lsn);
>          return 0;
>  }

-- 
Serge Petrenko