From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Cyrill Gorcunov <gorcunov@gmail.com>, tml <tarantool-patches@dev.tarantool.org>
Cc: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH v19 2/3] qsync: order access to the limbo terms
Date: Fri, 1 Oct 2021 15:14:52 +0300
Message-ID: <0c64d172-4fa8-29ec-7845-ff772738c09a@tarantool.org>
In-Reply-To: <20210930094445.316694-3-gorcunov@gmail.com>

30.09.2021 12:44, Cyrill Gorcunov wrote:
> Limbo terms tracking is shared between appliers and when
> one of appliers is waiting for write to complete inside
> journal_write() routine, an other may need to access read
> term value to figure out if promote request is valid to
> apply. Due to cooperative multitasking access to the terms
> is not consistent so we need to be sure that other fibers
> read up to date terms (ie written to the WAL).
>
> For this sake we use a latching mechanism, when one fiber
> takes a lock for updating other readers are waiting until
> the operation is complete.
>
> For example here is a call graph of two appliers
>
> applier 1
> ---------
> applier_apply_tx
>   (promote term = 3
>    current max term = 2)
>   applier_synchro_filter_tx
>   apply_synchro_row
>     journal_write
>       (sleeping)
>
> at this moment another applier comes in with obsolete
> data and term 2
>
> applier 2
> ---------
> applier_apply_tx
>   (term 2)
>   applier_synchro_filter_tx
>     txn_limbo_is_replica_outdated -> false
>   journal_write (sleep)
>
> applier 1
> ---------
> journal wakes up
>   apply_synchro_row_cb
>     set max term to 3
>
> So the applier 2 didn't notice that term 3 is already seen
> and wrote obsolete data. With locking the applier 2 will
> wait until applier 1 has finished its write.
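
The interleaving in the call graph above can be replayed in a small
single-threaded model. This is only a sketch under stated assumptions:
`toy_limbo` and every `toy_*` function are hypothetical names invented
for illustration, the latch is a plain flag that reports "would block"
instead of suspending a fiber, and none of this is the Tarantool code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the limbo term state (not struct txn_limbo). */
struct toy_limbo {
	uint64_t greatest_term;	/* highest promote term applied so far */
	uint64_t replica_term;	/* term last seen from the replica being checked */
	bool latch_locked;	/* toy latch: true while a term update is in flight */
};

/* Unlocked check: what applier 2 effectively does without the latch. */
static bool
toy_is_outdated_unlocked(const struct toy_limbo *l)
{
	return l->replica_term < l->greatest_term;
}

/*
 * Locked check. A real latch would put the caller to sleep; this toy
 * returns false ("would block") while the latch is held, and returns
 * true with *outdated filled in once the latch is free.
 */
static bool
toy_is_outdated_locked(const struct toy_limbo *l, bool *outdated)
{
	if (l->latch_locked)
		return false;
	*outdated = toy_is_outdated_unlocked(l);
	return true;
}

/* Applier 1, first half: take the latch, then "sleep" in the WAL write. */
static void
toy_promote_begin(struct toy_limbo *l)
{
	assert(!l->latch_locked);
	l->latch_locked = true;
}

/* Applier 1, second half: WAL write done, publish the term, drop the latch. */
static void
toy_promote_commit(struct toy_limbo *l, uint64_t term)
{
	assert(l->latch_locked);
	if (term > l->greatest_term)
		l->greatest_term = term;
	l->latch_locked = false;
}
```

Replaying the scenario: start with both terms at 2 and call
toy_promote_begin(). While the write is in flight the unlocked check
answers "not outdated" (the stale result from the call graph), whereas
the locked check refuses to answer. After toy_promote_commit(&l, 3) the
locked check reports the replica as outdated.
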
>
> We introduce the following helpers:
>
> 1) txn_limbo_process_begin: which takes a lock
> 2) txn_limbo_process_commit and txn_limbo_process_rollback
>    which simply release the lock but have different names
>    for better semantics
> 3) txn_limbo_process is a general function which uses x_begin
>    and x_commit helper internally
> 4) txn_limbo_process_core to do a real job over processing the
>    request, it implies that txn_limbo_process_begin been called
>
> Part-of #6036
>
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
> ---
>  src/box/applier.cc  | 12 +++++++---
>  src/box/box.cc      | 15 ++++++++-----
>  src/box/txn_limbo.c | 17 +++++++++++++--
>  src/box/txn_limbo.h | 53 ++++++++++++++++++++++++++++++++++++++++-----
>  4 files changed, 80 insertions(+), 17 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index b981bd436..46c36e259 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -857,7 +857,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>  		applier_rollback_by_wal_io(entry->res);
>  	} else {
>  		replica_txn_wal_write_cb(synchro_entry->rcb);
> -		txn_limbo_process(&txn_limbo, synchro_entry->req);
> +		txn_limbo_process_core(&txn_limbo, synchro_entry->req);
>  		trigger_run(&replicaset.applier.on_wal_write, NULL);
>  	}
>  	fiber_wakeup(synchro_entry->owner);
> @@ -873,6 +873,8 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>  	if (xrow_decode_synchro(row, &req) != 0)
>  		goto err;
>
> +	txn_limbo_process_begin(&txn_limbo);
> +
>  	struct replica_cb_data rcb_data;
>  	struct synchro_entry entry;
>  	/*
> @@ -910,12 +912,16 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>  	 * transactions side, including the async ones.
>  	 */
>  	if (journal_write(&entry.base) != 0)
> -		goto err;
> +		goto err_rollback;
>  	if (entry.base.res < 0) {
>  		diag_set_journal_res(entry.base.res);
> -		goto err;
> +		goto err_rollback;
>  	}
> +	txn_limbo_process_commit(&txn_limbo);
>  	return 0;
> +
> +err_rollback:
> +	txn_limbo_process_rollback(&txn_limbo);
>  err:
>  	diag_log();
>  	return -1;
> diff --git a/src/box/box.cc b/src/box/box.cc
> index e082e1a3d..6a9be745a 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1677,8 +1677,6 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
>  	struct raft *raft = box_raft();
>  	assert(raft->volatile_term == raft->term);
>  	assert(promote_lsn >= 0);
> -	txn_limbo_write_promote(&txn_limbo, promote_lsn,
> -				raft->term);
>  	struct synchro_request req = {
>  		.type = IPROTO_RAFT_PROMOTE,
>  		.replica_id = prev_leader_id,
> @@ -1686,8 +1684,11 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
>  		.lsn = promote_lsn,
>  		.term = raft->term,
>  	};
> -	txn_limbo_process(&txn_limbo, &req);
> +	txn_limbo_process_begin(&txn_limbo);
> +	txn_limbo_write_promote(&txn_limbo, req.lsn, req.term);
> +	txn_limbo_process_core(&txn_limbo, &req);
>  	assert(txn_limbo_is_empty(&txn_limbo));
> +	txn_limbo_process_commit(&txn_limbo);
>  }
>
>  /** A guard to block multiple simultaneous promote()/demote() invocations. */
> @@ -1699,8 +1700,6 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
>  {
>  	assert(box_raft()->volatile_term == box_raft()->term);
>  	assert(promote_lsn >= 0);
> -	txn_limbo_write_demote(&txn_limbo, promote_lsn,
> -			       box_raft()->term);
>  	struct synchro_request req = {
>  		.type = IPROTO_RAFT_DEMOTE,
>  		.replica_id = prev_leader_id,
> @@ -1708,8 +1707,12 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
>  		.lsn = promote_lsn,
>  		.term = box_raft()->term,
>  	};
> -	txn_limbo_process(&txn_limbo, &req);
> +	txn_limbo_process_begin(&txn_limbo);
> +	txn_limbo_write_demote(&txn_limbo, promote_lsn,
> +			       box_raft()->term);
> +	txn_limbo_process_core(&txn_limbo, &req);
>  	assert(txn_limbo_is_empty(&txn_limbo));
> +	txn_limbo_process_commit(&txn_limbo);
>  }
>
>  int
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 70447caaf..855c98c98 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -47,6 +47,7 @@ txn_limbo_create(struct txn_limbo *limbo)
>  	vclock_create(&limbo->vclock);
>  	vclock_create(&limbo->promote_term_map);
>  	limbo->promote_greatest_term = 0;
> +	latch_create(&limbo->promote_latch);
>  	limbo->confirmed_lsn = 0;
>  	limbo->rollback_count = 0;
>  	limbo->is_in_rollback = false;
> @@ -724,11 +725,14 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
>  }
>
>  void
> -txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
> +txn_limbo_process_core(struct txn_limbo *limbo,
> +		       const struct synchro_request *req)
>  {
> +	assert(latch_is_locked(&limbo->promote_latch));
> +
>  	uint64_t term = req->term;
>  	uint32_t origin = req->origin_id;
> -	if (txn_limbo_replica_term(limbo, origin) < term) {
> +	if (vclock_get(&limbo->promote_term_map, origin) < (int64_t)term) {
>  		vclock_follow(&limbo->promote_term_map, origin, term);
>  		if (term > limbo->promote_greatest_term)
>  			limbo->promote_greatest_term = term;
> @@ -786,6 +790,15 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
>  	return;
>  }
>
> +void
> +txn_limbo_process(struct txn_limbo *limbo,
> +		  const struct synchro_request *req)
> +{
> +	txn_limbo_process_begin(limbo);
> +	txn_limbo_process_core(limbo, req);
> +	txn_limbo_process_commit(limbo);
> +}
> +
>  void
>  txn_limbo_on_parameters_change(struct txn_limbo *limbo)
>  {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 53e52f676..fdb214711 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -31,6 +31,7 @@
>   */
>  #include "small/rlist.h"
>  #include "vclock/vclock.h"
> +#include "latch.h"
>
>  #include <stdint.h>
>
> @@ -147,6 +148,10 @@ struct txn_limbo {
>  	 * limbo and raft are in sync and the terms are the same.
>  	 */
>  	uint64_t promote_greatest_term;
> +	/**
> +	 * To order access to the promote data.
> +	 */
> +	struct latch promote_latch;
>  	/**
>  	 * Maximal LSN gathered quorum and either already confirmed in WAL, or
>  	 * whose confirmation is in progress right now. Any attempt to confirm
> @@ -216,9 +221,12 @@ txn_limbo_last_entry(struct txn_limbo *limbo)
>   * @a replica_id.
>   */
>  static inline uint64_t
> -txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
> +txn_limbo_replica_term(struct txn_limbo *limbo, uint32_t replica_id)
>  {
> -	return vclock_get(&limbo->promote_term_map, replica_id);
> +	latch_lock(&limbo->promote_latch);
> +	uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> +	latch_unlock(&limbo->promote_latch);
> +	return v;
>  }
>
>  /**
> @@ -226,11 +234,14 @@ txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
>   * data from it. The check is only valid when elections are enabled.
>   */
>  static inline bool
> -txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
> +txn_limbo_is_replica_outdated(struct txn_limbo *limbo,
>  			      uint32_t replica_id)
>  {
> -	return txn_limbo_replica_term(limbo, replica_id) <
> -	       limbo->promote_greatest_term;
> +	latch_lock(&limbo->promote_latch);
> +	uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> +	bool res = v < limbo->promote_greatest_term;
> +	latch_unlock(&limbo->promote_latch);
> +	return res;
>  }
>
>  /**
> @@ -300,7 +311,37 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>  int
>  txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>
> -/** Execute a synchronous replication request. */
> +/**
> + * Initiate execution of a synchronous replication request.
> + */
> +static inline void
> +txn_limbo_process_begin(struct txn_limbo *limbo)
> +{
> +	latch_lock(&limbo->promote_latch);
> +}
> +
> +/** Commit a synchronous replication request. */
> +static inline void
> +txn_limbo_process_commit(struct txn_limbo *limbo)
> +{
> +	assert(latch_is_locked(&limbo->promote_latch));
> +	latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Rollback a synchronous replication request. */
> +static inline void
> +txn_limbo_process_rollback(struct txn_limbo *limbo)
> +{
> +	assert(latch_is_locked(&limbo->promote_latch));
> +	latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Core of processing synchronous replication request. */
> +void
> +txn_limbo_process_core(struct txn_limbo *limbo,
> +		       const struct synchro_request *req);
> +
> +/** Process a synchronous replication request. */
>  void
>  txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);

Thanks for the patch!

Mostly ok with one question:

What about txn_limbo_write_confirm/txn_limbo_read_confirm pairs issued
inside txn_limbo_ack() and txn_limbo_on_parameters_change()?
Shouldn't they take the latch as well? I mean, txn_limbo_ack() and
txn_limbo_on_parameters_change() as a whole.

-- 
Serge Petrenko
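
For reference, the begin/commit/rollback pairing from the quoted
apply_synchro_row() hunk can be modeled in isolation. The sketch below
is a simplified illustration, not the patch itself: `demo_latch`,
`demo_journal_write()` and `demo_apply_synchro()` are hypothetical
names, and the latch is a plain flag rather than the fiber latch from
latch.h. It shows the property the error labels are meant to provide:
the latch is released on every path taken after it was acquired, and
never touched on the path that fails before acquisition.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical non-recursive latch modeled as a flag. */
struct demo_latch {
	bool locked;
};

static void
demo_lock(struct demo_latch *l)
{
	assert(!l->locked);
	l->locked = true;
}

static void
demo_unlock(struct demo_latch *l)
{
	assert(l->locked);
	l->locked = false;
}

/* Hypothetical journal write: 0 on success, -1 on failure. */
static int
demo_journal_write(bool fail)
{
	return fail ? -1 : 0;
}

/*
 * Mirrors the control flow of the quoted hunk: a decode failure jumps
 * to "err" before the latch is taken, a write failure jumps to
 * "err_rollback" so the latch is released first.
 */
static int
demo_apply_synchro(struct demo_latch *latch, bool decode_fails,
		   bool write_fails)
{
	if (decode_fails)
		goto err;
	demo_lock(latch);	/* role of txn_limbo_process_begin() */
	if (demo_journal_write(write_fails) != 0)
		goto err_rollback;
	demo_unlock(latch);	/* role of txn_limbo_process_commit() */
	return 0;

err_rollback:
	demo_unlock(latch);	/* role of txn_limbo_process_rollback() */
err:
	return -1;
}
```

Whichever combination of failures is passed in, the latch is unlocked
when demo_apply_synchro() returns, which is the invariant any extra
caller taking the latch (such as the functions asked about above) would
also have to keep.
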