[Tarantool-patches] [PATCH v19 2/3] qsync: order access to the limbo terms
Serge Petrenko
sergepetrenko at tarantool.org
Fri Oct 1 15:14:52 MSK 2021
30.09.2021 12:44, Cyrill Gorcunov пишет:
> Limbo terms tracking is shared between appliers, and when
> one of the appliers is waiting for a write to complete inside
> the journal_write() routine, another one may need to read the
> term value to figure out whether a promote request is valid to
> apply. Due to cooperative multitasking, access to the terms
> is not consistent, so we need to be sure that other fibers
> read up-to-date terms (i.e. ones already written to the WAL).
>
> For this purpose we use a latching mechanism: when one fiber
> takes the lock for an update, other readers wait until
> the operation is complete.
>
> For example here is a call graph of two appliers
>
> applier 1
> ---------
> applier_apply_tx
> (promote term = 3
> current max term = 2)
> applier_synchro_filter_tx
> apply_synchro_row
> journal_write
> (sleeping)
>
> at this moment another applier comes in with obsolete
> data and term 2
>
> applier 2
> ---------
> applier_apply_tx
> (term 2)
> applier_synchro_filter_tx
> txn_limbo_is_replica_outdated -> false
> journal_write (sleep)
>
> applier 1
> ---------
> journal wakes up
> apply_synchro_row_cb
> set max term to 3
>
> So applier 2 didn't notice that term 3 had already been seen
> and wrote obsolete data. With locking, applier 2 will
> wait until applier 1 has finished its write.
>
> We introduce the following helpers:
>
> 1) txn_limbo_process_begin: takes the lock
> 2) txn_limbo_process_commit and txn_limbo_process_rollback:
> simply release the lock, but have different names
> for better semantics
> 3) txn_limbo_process: a general function which uses the
> x_begin and x_commit helpers internally
> 4) txn_limbo_process_core: does the actual processing of the
> request; it requires that txn_limbo_process_begin has been called
>
> Part-of #6036
>
> Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
> ---
> src/box/applier.cc | 12 +++++++---
> src/box/box.cc | 15 ++++++++-----
> src/box/txn_limbo.c | 17 +++++++++++++--
> src/box/txn_limbo.h | 53 ++++++++++++++++++++++++++++++++++++++++-----
> 4 files changed, 80 insertions(+), 17 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index b981bd436..46c36e259 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -857,7 +857,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
> applier_rollback_by_wal_io(entry->res);
> } else {
> replica_txn_wal_write_cb(synchro_entry->rcb);
> - txn_limbo_process(&txn_limbo, synchro_entry->req);
> + txn_limbo_process_core(&txn_limbo, synchro_entry->req);
> trigger_run(&replicaset.applier.on_wal_write, NULL);
> }
> fiber_wakeup(synchro_entry->owner);
> @@ -873,6 +873,8 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
> if (xrow_decode_synchro(row, &req) != 0)
> goto err;
>
> + txn_limbo_process_begin(&txn_limbo);
> +
> struct replica_cb_data rcb_data;
> struct synchro_entry entry;
> /*
> @@ -910,12 +912,16 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
> * transactions side, including the async ones.
> */
> if (journal_write(&entry.base) != 0)
> - goto err;
> + goto err_rollback;
> if (entry.base.res < 0) {
> diag_set_journal_res(entry.base.res);
> - goto err;
> + goto err_rollback;
> }
> + txn_limbo_process_commit(&txn_limbo);
> return 0;
> +
> +err_rollback:
> + txn_limbo_process_rollback(&txn_limbo);
> err:
> diag_log();
> return -1;
> diff --git a/src/box/box.cc b/src/box/box.cc
> index e082e1a3d..6a9be745a 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1677,8 +1677,6 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
> struct raft *raft = box_raft();
> assert(raft->volatile_term == raft->term);
> assert(promote_lsn >= 0);
> - txn_limbo_write_promote(&txn_limbo, promote_lsn,
> - raft->term);
> struct synchro_request req = {
> .type = IPROTO_RAFT_PROMOTE,
> .replica_id = prev_leader_id,
> @@ -1686,8 +1684,11 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
> .lsn = promote_lsn,
> .term = raft->term,
> };
> - txn_limbo_process(&txn_limbo, &req);
> + txn_limbo_process_begin(&txn_limbo);
> + txn_limbo_write_promote(&txn_limbo, req.lsn, req.term);
> + txn_limbo_process_core(&txn_limbo, &req);
> assert(txn_limbo_is_empty(&txn_limbo));
> + txn_limbo_process_commit(&txn_limbo);
> }
>
> /** A guard to block multiple simultaneous promote()/demote() invocations. */
> @@ -1699,8 +1700,6 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
> {
> assert(box_raft()->volatile_term == box_raft()->term);
> assert(promote_lsn >= 0);
> - txn_limbo_write_demote(&txn_limbo, promote_lsn,
> - box_raft()->term);
> struct synchro_request req = {
> .type = IPROTO_RAFT_DEMOTE,
> .replica_id = prev_leader_id,
> @@ -1708,8 +1707,12 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
> .lsn = promote_lsn,
> .term = box_raft()->term,
> };
> - txn_limbo_process(&txn_limbo, &req);
> + txn_limbo_process_begin(&txn_limbo);
> + txn_limbo_write_demote(&txn_limbo, promote_lsn,
> + box_raft()->term);
> + txn_limbo_process_core(&txn_limbo, &req);
> assert(txn_limbo_is_empty(&txn_limbo));
> + txn_limbo_process_commit(&txn_limbo);
> }
>
> int
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 70447caaf..855c98c98 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -47,6 +47,7 @@ txn_limbo_create(struct txn_limbo *limbo)
> vclock_create(&limbo->vclock);
> vclock_create(&limbo->promote_term_map);
> limbo->promote_greatest_term = 0;
> + latch_create(&limbo->promote_latch);
> limbo->confirmed_lsn = 0;
> limbo->rollback_count = 0;
> limbo->is_in_rollback = false;
> @@ -724,11 +725,14 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
> }
>
> void
> -txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
> +txn_limbo_process_core(struct txn_limbo *limbo,
> + const struct synchro_request *req)
> {
> + assert(latch_is_locked(&limbo->promote_latch));
> +
> uint64_t term = req->term;
> uint32_t origin = req->origin_id;
> - if (txn_limbo_replica_term(limbo, origin) < term) {
> + if (vclock_get(&limbo->promote_term_map, origin) < (int64_t)term) {
> vclock_follow(&limbo->promote_term_map, origin, term);
> if (term > limbo->promote_greatest_term)
> limbo->promote_greatest_term = term;
> @@ -786,6 +790,15 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
> return;
> }
>
> +void
> +txn_limbo_process(struct txn_limbo *limbo,
> + const struct synchro_request *req)
> +{
> + txn_limbo_process_begin(limbo);
> + txn_limbo_process_core(limbo, req);
> + txn_limbo_process_commit(limbo);
> +}
> +
> void
> txn_limbo_on_parameters_change(struct txn_limbo *limbo)
> {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 53e52f676..fdb214711 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -31,6 +31,7 @@
> */
> #include "small/rlist.h"
> #include "vclock/vclock.h"
> +#include "latch.h"
>
> #include <stdint.h>
>
> @@ -147,6 +148,10 @@ struct txn_limbo {
> * limbo and raft are in sync and the terms are the same.
> */
> uint64_t promote_greatest_term;
> + /**
> + * To order access to the promote data.
> + */
> + struct latch promote_latch;
> /**
> * Maximal LSN gathered quorum and either already confirmed in WAL, or
> * whose confirmation is in progress right now. Any attempt to confirm
> @@ -216,9 +221,12 @@ txn_limbo_last_entry(struct txn_limbo *limbo)
> * @a replica_id.
> */
> static inline uint64_t
> -txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
> +txn_limbo_replica_term(struct txn_limbo *limbo, uint32_t replica_id)
> {
> - return vclock_get(&limbo->promote_term_map, replica_id);
> + latch_lock(&limbo->promote_latch);
> + uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> + latch_unlock(&limbo->promote_latch);
> + return v;
> }
>
> /**
> @@ -226,11 +234,14 @@ txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
> * data from it. The check is only valid when elections are enabled.
> */
> static inline bool
> -txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
> +txn_limbo_is_replica_outdated(struct txn_limbo *limbo,
> uint32_t replica_id)
> {
> - return txn_limbo_replica_term(limbo, replica_id) <
> - limbo->promote_greatest_term;
> + latch_lock(&limbo->promote_latch);
> + uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> + bool res = v < limbo->promote_greatest_term;
> + latch_unlock(&limbo->promote_latch);
> + return res;
> }
>
> /**
> @@ -300,7 +311,37 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
> int
> txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>
> -/** Execute a synchronous replication request. */
> +/**
> + * Initiate execution of a synchronous replication request.
> + */
> +static inline void
> +txn_limbo_process_begin(struct txn_limbo *limbo)
> +{
> + latch_lock(&limbo->promote_latch);
> +}
> +
> +/** Commit a synchronous replication request. */
> +static inline void
> +txn_limbo_process_commit(struct txn_limbo *limbo)
> +{
> + assert(latch_is_locked(&limbo->promote_latch));
> + latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Rollback a synchronous replication request. */
> +static inline void
> +txn_limbo_process_rollback(struct txn_limbo *limbo)
> +{
> + assert(latch_is_locked(&limbo->promote_latch));
> + latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Core of processing synchronous replication request. */
> +void
> +txn_limbo_process_core(struct txn_limbo *limbo,
> + const struct synchro_request *req);
> +
> +/** Process a synchronous replication request. */
> void
> txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
Thanks for the patch!
Mostly ok with one question:
What about the txn_limbo_write_confirm/txn_limbo_read_confirm pairs issued
inside txn_limbo_ack() and txn_limbo_on_parameters_change()?
Shouldn't they take the latch as well? I mean, txn_limbo_ack() and
txn_limbo_on_parameters_change() as a whole.
>
--
Serge Petrenko
More information about the Tarantool-patches
mailing list