[Tarantool-patches] [RFC v5 4/5] limbo: order access to the promote terms

Serge Petrenko sergepetrenko at tarantool.org
Thu Jul 15 14:48:48 MSK 2021



15.07.2021 00:23, Cyrill Gorcunov пишет:
> Promote terms tracking is shared between appliers, and when
> one of the appliers is waiting for a write to complete inside
> the journal_write() routine, another may need to read the
> term value to figure out if a promote request is valid to
> apply. Due to cooperative multitasking, access to the terms
> is not consistent, so we need to be sure that other fibers
> read up-to-date terms (i.e. those written to the WAL).
>
> For this sake we use a latching mechanism: when one fiber
> takes the promote-lock to update the terms, other readers
> wait until the update is complete.
>
> For example here is a call graph of two appliers
>
> applier 1
> ---------
> applier_apply_tx
>    (promote term = 3
>     current max term = 2)
>    applier_synchro_filter_tx
>    apply_synchro_row
>      journal_write
>        (sleeping)
>
> at this moment another applier comes in with obsolete
> data and term 2
>
>                                applier 2
>                                ---------
>                                applier_apply_tx
>                                  (term 2)
>                                  applier_synchro_filter_tx
>                                    txn_limbo_is_replica_outdated -> false
>                                  journal_write (sleep)
>
> applier 1
> ---------
> journal wakes up
>    apply_synchro_row_cb
>      set max term to 3
>
> So the applier 2 didn't notice that term 3 is already seen
> and wrote obsolete data. With locking the applier 2 will
> wait until applier 1 has finished its write.
>
> Part-of #6036
>
> Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
> ---
>   src/box/applier.cc  | 10 +++++---
>   src/box/box.cc      |  3 +--
>   src/box/txn_limbo.c | 18 ++++++++++++--
>   src/box/txn_limbo.h | 59 +++++++++++++++++++++++++++++++++++++++++----
>   4 files changed, 78 insertions(+), 12 deletions(-)

Thanks for the patch!

I think all panic_on(!cond) invocations may be replaced with assert(cond).

Even if we use some functions incorrectly, this may be caught by an
assertion.

Other than that looks good so far.
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 978383e64..838aa372d 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -854,7 +854,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>   		applier_rollback_by_wal_io(entry->res);
>   	} else {
>   		replica_txn_wal_write_cb(synchro_entry->rcb);
> -		txn_limbo_process(&txn_limbo, synchro_entry->req);
> +		txn_limbo_process_locked(&txn_limbo, synchro_entry->req);
>   		trigger_run(&replicaset.applier.on_wal_write, NULL);
>   	}
>   	fiber_wakeup(synchro_entry->owner);
> @@ -870,6 +870,7 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>   	if (xrow_decode_synchro(row, &req) != 0)
>   		goto err;
>   
> +	txn_limbo_promote_lock(&txn_limbo);
>   	struct replica_cb_data rcb_data;
>   	struct synchro_entry entry;
>   	/*
> @@ -907,12 +908,15 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>   	 * transactions side, including the async ones.
>   	 */
>   	if (journal_write(&entry.base) != 0)
> -		goto err;
> +		goto err_unlock;
>   	if (entry.base.res < 0) {
>   		diag_set_journal_res(entry.base.res);
> -		goto err;
> +		goto err_unlock;
>   	}
> +	txn_limbo_promote_unlock(&txn_limbo);
>   	return 0;
> +err_unlock:
> +	txn_limbo_promote_unlock(&txn_limbo);
>   err:
>   	diag_log();
>   	return -1;
> diff --git a/src/box/box.cc b/src/box/box.cc
> index d211589b5..8b0f9859e 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1566,10 +1566,9 @@ box_clear_synchro_queue(bool demote)
>   		 * (synchronous replication and leader election are in sync, and
>   		 * both chose this node as a leader).
>   		 */
> -		if (!demote && txn_limbo_replica_term(&txn_limbo, instance_id) ==
> +		if (!demote && txn_limbo_term(&txn_limbo, instance_id) ==
>   		    box_raft()->term)
>   			return 0;
> -
>   		break;
>   	default:
>   		unreachable();
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 957fe0d1e..d24df3606 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -40,6 +40,7 @@ struct txn_limbo txn_limbo;
>   static void
>   txn_limbo_promote_create(struct txn_limbo_promote *pmt)
>   {
> +	latch_create(&pmt->latch);
>   	vclock_create(&pmt->terms_map);
>   	pmt->terms_max = 0;
>   }
> @@ -731,12 +732,17 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
>   }
>   
>   void
> -txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
> +txn_limbo_process_locked(struct txn_limbo *limbo,
> +			 const struct synchro_request *req)
>   {
>   	struct txn_limbo_promote *pmt = &limbo->promote;
>   	uint64_t term = req->term;
>   	uint32_t origin = req->origin_id;
> -	if (txn_limbo_replica_term(limbo, origin) < term) {
> +
> +	panic_on(!txn_limbo_promote_is_locked(limbo),
> +		 "limbo: unlocked processing of a request");
> +
> +	if (txn_limbo_term_locked(limbo, origin) < term) {
>   		vclock_follow(&pmt->terms_map, origin, term);
>   		if (term > pmt->terms_max)
>   			pmt->terms_max = term;
> @@ -794,6 +800,14 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
>   	return;
>   }
>   
> +void
> +txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
> +{
> +	txn_limbo_promote_lock(limbo);
> +	txn_limbo_process_locked(limbo, req);
> +	txn_limbo_promote_unlock(limbo);
> +}
> +
>   void
>   txn_limbo_on_parameters_change(struct txn_limbo *limbo)
>   {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 70a5fbfd5..a2595bcff 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -31,6 +31,7 @@
>    */
>   #include "small/rlist.h"
>   #include "vclock/vclock.h"
> +#include "latch.h"
>   
>   #include <stdint.h>
>   
> @@ -80,6 +81,10 @@ txn_limbo_entry_is_complete(const struct txn_limbo_entry *e)
>    * situation and other errors.
>    */
>   struct txn_limbo_promote {
> +	/**
> +	 * To order access to the promote data.
> +	 */
> +	struct latch latch;
>   	/**
>   	 * Latest terms received with PROMOTE entries from remote instances.
>   	 * Limbo uses them to filter out the transactions coming not from the
> @@ -222,15 +227,52 @@ txn_limbo_last_entry(struct txn_limbo *limbo)
>   				in_queue);
>   }
>   
> +/** Lock promote data. */
> +static inline void
> +txn_limbo_promote_lock(struct txn_limbo *limbo)
> +{
> +	struct txn_limbo_promote *pmt = &limbo->promote;
> +	latch_lock(&pmt->latch);
> +}
> +
> +/** Unlock promote data. */
> +static void
> +txn_limbo_promote_unlock(struct txn_limbo *limbo)
> +{
> +	struct txn_limbo_promote *pmt = &limbo->promote;
> +	latch_unlock(&pmt->latch);
> +}
> +
> +/** Test if promote data is locked. */
> +static inline bool
> +txn_limbo_promote_is_locked(struct txn_limbo *limbo)
> +{
> +	const struct txn_limbo_promote *pmt = &limbo->promote;
> +	return latch_is_locked(&pmt->latch);
> +}
> +
> +/** Fetch replica's term with lock taken. */
> +static inline uint64_t
> +txn_limbo_term_locked(struct txn_limbo *limbo, uint32_t replica_id)
> +{
> +	struct txn_limbo_promote *pmt = &limbo->promote;
> +	panic_on(!txn_limbo_promote_is_locked(limbo),
> +		 "limbo: unlocked term read for replica %u",
> +		 replica_id);
> +	return vclock_get(&pmt->terms_map, replica_id);
> +}
> +
>   /**
>    * Return the latest term as seen in PROMOTE requests from instance with id
>    * @a replica_id.
>    */
>   static inline uint64_t
> -txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
> +txn_limbo_term(struct txn_limbo *limbo, uint32_t replica_id)
>   {
> -	const struct txn_limbo_promote *pmt = &limbo->promote;
> -	return vclock_get(&pmt->terms_map, replica_id);
> +	txn_limbo_promote_lock(limbo);
> +	uint64_t v = txn_limbo_term_locked(limbo, replica_id);
> +	txn_limbo_promote_unlock(limbo);
> +	return v;
>   }
>   
>   /**
> @@ -238,12 +280,15 @@ txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
>    * data from it. The check is only valid when elections are enabled.
>    */
>   static inline bool
> -txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
> +txn_limbo_is_replica_outdated(struct txn_limbo *limbo,
>   			      uint32_t replica_id)
>   {
>   	const struct txn_limbo_promote *pmt = &limbo->promote;
> -	return txn_limbo_replica_term(limbo, replica_id) <
> +	txn_limbo_promote_lock(limbo);
> +	bool res = txn_limbo_term_locked(limbo, replica_id) <
>   	       pmt->terms_max;
> +	txn_limbo_promote_unlock(limbo);
> +	return res;
>   }
>   
>   /**
> @@ -317,6 +362,10 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>   void
>   txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
>   
> +void
> +txn_limbo_process_locked(struct txn_limbo *limbo,
> +			 const struct synchro_request *req);
> +
>   /**
>    * Waiting for confirmation of all "sync" transactions
>    * during confirm timeout or fail.

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list