From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Cyrill Gorcunov <gorcunov@gmail.com>,
	tml <tarantool-patches@dev.tarantool.org>
Cc: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH v22 2/3] qsync: order access to the limbo terms
Date: Tue, 12 Oct 2021 12:40:43 +0300
Message-ID: <8a02db75-ce19-cb8b-1b41-b74184b167fc@tarantool.org>
In-Reply-To: <20211011191635.573685-3-gorcunov@gmail.com>



11.10.2021 22:16, Cyrill Gorcunov wrote:
> Limbo term tracking is shared between appliers. While one
> applier is waiting for a write to complete inside the
> journal_write() routine, another may need to read the
> term value to figure out whether a promote request is valid
> to apply. Due to cooperative multitasking, access to the terms
> is not consistent, so we need to be sure that other fibers
> read up-to-date terms (i.e. ones already written to the WAL).
>
> For this purpose we use a latching mechanism: when one fiber
> takes the lock for an update, other readers wait until
> the operation is complete.
>
> For example, here is a call graph of two appliers:

Thanks for the changes!
One final nit and we're good to go.

>
> applier 1
> ---------
> applier_apply_tx
>    (promote term = 3
>     current max term = 2)
>    applier_synchro_filter_tx
>    apply_synchro_row
>      journal_write
>        (sleeping)
>
> at this moment another applier comes in with obsolete
> data and term 2
>
>                                applier 2
>                                ---------
>                                applier_apply_tx
>                                  (term 2)
>                                  applier_synchro_filter_tx
>                                    txn_limbo_is_replica_outdated -> false
>                                  journal_write (sleep)
>
> applier 1
> ---------
> journal wakes up
>    apply_synchro_row_cb
>      set max term to 3
>
> So applier 2 didn't notice that term 3 had already been seen
> and wrote obsolete data. With locking, applier 2 will
> wait until applier 1 has finished its write.
>
> Serge Petrenko also pointed out that we have a somewhat similar
> situation with txn_limbo_ack() [we might try to write a confirm for
> an entry while a new promote is in flight and not yet applied, so the
> confirm might be invalid] and txn_limbo_on_parameters_change() [where
> we might confirm entries after reducing the quorum while we are not
> even the limbo owner]. Thus we need to fix these problems as well.
>
> We introduce the following helpers:
>
> 1) txn_limbo_process_begin, which takes the lock
> 2) txn_limbo_process_commit and txn_limbo_process_rollback,
>     which simply release the lock but have different names
>     for better semantics
> 3) txn_limbo_process, a general function which uses the x_begin
>     and x_commit helpers internally
> 4) txn_limbo_process_core, which does the real work of processing
>     the request; it assumes txn_limbo_process_begin has been called
> 5) txn_limbo_ack() and txn_limbo_on_parameters_change(), which both
>     respect the current limbo owner via the promote latch.
>
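
The begin/commit/rollback shape described in the list can be sketched
outside the tree as follows. This is an illustration only: struct toy_latch
and the toy_* names are hypothetical, a plain flag replaces the fiber-aware
struct latch, and write_rc stands in for the journal_write() result.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy non-recursive lock; the patch uses struct latch from latch.h. */
struct toy_latch {
	bool locked;
};

/* Take the lock before touching the term state. */
static void
toy_process_begin(struct toy_latch *l)
{
	assert(!l->locked);
	l->locked = true;
}

/* Release the lock after a successful write. */
static void
toy_process_commit(struct toy_latch *l)
{
	assert(l->locked);
	l->locked = false;
}

/* Release the lock after a failed write; same effect, clearer name. */
static void
toy_process_rollback(struct toy_latch *l)
{
	assert(l->locked);
	l->locked = false;
}

/*
 * Same control flow as apply_synchro_row() in the patch: lock, write,
 * then commit on success or roll back on failure.
 */
static int
toy_apply_row(struct toy_latch *l, int write_rc)
{
	toy_process_begin(l);
	if (write_rc != 0)
		goto err_rollback;
	toy_process_commit(l);
	return 0;

err_rollback:
	toy_process_rollback(l);
	return -1;
}
```

The point of the sketch is that every exit path after toy_process_begin()
releases the lock exactly once, which is what the err_rollback label in
applier.cc is there for.
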
> Part-of #6036
>
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
> ---
>   src/box/applier.cc             | 12 ++++--
>   src/box/box.cc                 | 15 ++++---
>   src/box/relay.cc               | 11 ++---
>   src/box/txn.c                  |  2 +-
>   src/box/txn_limbo.c            | 49 +++++++++++++++++++--
>   src/box/txn_limbo.h            | 78 +++++++++++++++++++++++++++++++---
>   test/unit/snap_quorum_delay.cc |  5 ++-
>   7 files changed, 142 insertions(+), 30 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index b981bd436..46c36e259 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -857,7 +857,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>   		applier_rollback_by_wal_io(entry->res);
>   	} else {
>   		replica_txn_wal_write_cb(synchro_entry->rcb);
> -		txn_limbo_process(&txn_limbo, synchro_entry->req);
> +		txn_limbo_process_core(&txn_limbo, synchro_entry->req);
>   		trigger_run(&replicaset.applier.on_wal_write, NULL);
>   	}
>   	fiber_wakeup(synchro_entry->owner);
> @@ -873,6 +873,8 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>   	if (xrow_decode_synchro(row, &req) != 0)
>   		goto err;
>   
> +	txn_limbo_process_begin(&txn_limbo);
> +
>   	struct replica_cb_data rcb_data;
>   	struct synchro_entry entry;
>   	/*
> @@ -910,12 +912,16 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>   	 * transactions side, including the async ones.
>   	 */
>   	if (journal_write(&entry.base) != 0)
> -		goto err;
> +		goto err_rollback;
>   	if (entry.base.res < 0) {
>   		diag_set_journal_res(entry.base.res);
> -		goto err;
> +		goto err_rollback;
>   	}
> +	txn_limbo_process_commit(&txn_limbo);
>   	return 0;
> +
> +err_rollback:
> +	txn_limbo_process_rollback(&txn_limbo);
>   err:
>   	diag_log();
>   	return -1;
> diff --git a/src/box/box.cc b/src/box/box.cc
> index e082e1a3d..6a9be745a 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1677,8 +1677,6 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
>   	struct raft *raft = box_raft();
>   	assert(raft->volatile_term == raft->term);
>   	assert(promote_lsn >= 0);
> -	txn_limbo_write_promote(&txn_limbo, promote_lsn,
> -				raft->term);
>   	struct synchro_request req = {
>   		.type = IPROTO_RAFT_PROMOTE,
>   		.replica_id = prev_leader_id,
> @@ -1686,8 +1684,11 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
>   		.lsn = promote_lsn,
>   		.term = raft->term,
>   	};
> -	txn_limbo_process(&txn_limbo, &req);
> +	txn_limbo_process_begin(&txn_limbo);
> +	txn_limbo_write_promote(&txn_limbo, req.lsn, req.term);
> +	txn_limbo_process_core(&txn_limbo, &req);
>   	assert(txn_limbo_is_empty(&txn_limbo));
> +	txn_limbo_process_commit(&txn_limbo);
>   }
>   
>   /** A guard to block multiple simultaneous promote()/demote() invocations. */
> @@ -1699,8 +1700,6 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
>   {
>   	assert(box_raft()->volatile_term == box_raft()->term);
>   	assert(promote_lsn >= 0);
> -	txn_limbo_write_demote(&txn_limbo, promote_lsn,
> -				box_raft()->term);
>   	struct synchro_request req = {
>   		.type = IPROTO_RAFT_DEMOTE,
>   		.replica_id = prev_leader_id,
> @@ -1708,8 +1707,12 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
>   		.lsn = promote_lsn,
>   		.term = box_raft()->term,
>   	};
> -	txn_limbo_process(&txn_limbo, &req);
> +	txn_limbo_process_begin(&txn_limbo);
> +	txn_limbo_write_demote(&txn_limbo, promote_lsn,
> +				box_raft()->term);
> +	txn_limbo_process_core(&txn_limbo, &req);
>   	assert(txn_limbo_is_empty(&txn_limbo));
> +	txn_limbo_process_commit(&txn_limbo);
>   }
>   
>   int
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index f5852df7b..61ef1e3a5 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -545,15 +545,10 @@ tx_status_update(struct cmsg *msg)
>   	ack.vclock = &status->vclock;
>   	/*
>   	 * Let pending synchronous transactions know, which of
> -	 * them were successfully sent to the replica. Acks are
> -	 * collected only by the transactions originator (which is
> -	 * the single master in 100% so far). Other instances wait
> -	 * for master's CONFIRM message instead.
> +	 * them were successfully sent to the replica.
>   	 */
> -	if (txn_limbo.owner_id == instance_id) {
> -		txn_limbo_ack(&txn_limbo, ack.source,
> -			      vclock_get(ack.vclock, instance_id));
> -	}
> +	txn_limbo_ack(&txn_limbo, instance_id, ack.source,
> +		      vclock_get(ack.vclock, instance_id));
>   	trigger_run(&replicaset.on_ack, &ack);
>   
>   	static const struct cmsg_hop route[] = {
> diff --git a/src/box/txn.c b/src/box/txn.c
> index e7fc81683..06bb85a09 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -939,7 +939,7 @@ txn_commit(struct txn *txn)
>   			txn_limbo_assign_local_lsn(&txn_limbo, limbo_entry,
>   						   lsn);
>   			/* Local WAL write is a first 'ACK'. */
> -			txn_limbo_ack(&txn_limbo, txn_limbo.owner_id, lsn);
> +			txn_limbo_ack_self(&txn_limbo, lsn);
>   		}
>   		if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0)
>   			goto rollback;
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index 70447caaf..8f9bc11c7 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -47,6 +47,7 @@ txn_limbo_create(struct txn_limbo *limbo)
>   	vclock_create(&limbo->vclock);
>   	vclock_create(&limbo->promote_term_map);
>   	limbo->promote_greatest_term = 0;
> +	latch_create(&limbo->promote_latch);
>   	limbo->confirmed_lsn = 0;
>   	limbo->rollback_count = 0;
>   	limbo->is_in_rollback = false;
> @@ -542,10 +543,30 @@ txn_limbo_read_demote(struct txn_limbo *limbo, int64_t lsn)
>   }
>   
>   void
> -txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
> +txn_limbo_ack(struct txn_limbo *limbo, uint32_t owner_id,
> +	      uint32_t replica_id, int64_t lsn)
>   {
> +	/*
> +	 * ACKs are collected only by the transactions originator
> +	 * (which is the single master in 100% so far). Other instances
> +	 * wait for master's CONFIRM message instead.
> +	 *
> +	 * Due to cooperative multitasking there might be limbo owner
> +	 * migration in-fly (while writing data to journal), so for
> +	 * simplicity sake the test for owner is done here instead
> +	 * of putting this check to the callers.
> +	 */
> +	if (!txn_limbo_is_owner(limbo, owner_id))
> +		return;
> +
> +	/*
> +	 * Test for empty queue is done _after_ txn_limbo_is_owner
> +	 * call because we need to be sure that limbo is not been
> +	 * changed under our feets while we're reading it.

feets -> feet.


> +	 */
>   	if (rlist_empty(&limbo->queue))
>   		return;
> +
>   	/*
>   	 * If limbo is currently writing a rollback, it means that the whole
>   	 * queue will be rolled back. Because rollback is written only for
> @@ -724,11 +745,14 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
>   }
>   
>   void
> -txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
> +txn_limbo_process_core(struct txn_limbo *limbo,
> +		       const struct synchro_request *req)
>   {
> +	assert(latch_is_locked(&limbo->promote_latch));
> +
>   	uint64_t term = req->term;
>   	uint32_t origin = req->origin_id;
> -	if (txn_limbo_replica_term(limbo, origin) < term) {
> +	if (vclock_get(&limbo->promote_term_map, origin) < (int64_t)term) {
>   		vclock_follow(&limbo->promote_term_map, origin, term);
>   		if (term > limbo->promote_greatest_term)
>   			limbo->promote_greatest_term = term;
> @@ -786,11 +810,30 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
>   	return;
>   }
>   
> +void
> +txn_limbo_process(struct txn_limbo *limbo,
> +		  const struct synchro_request *req)
> +{
> +	txn_limbo_process_begin(limbo);
> +	txn_limbo_process_core(limbo, req);
> +	txn_limbo_process_commit(limbo);
> +}
> +
>   void
>   txn_limbo_on_parameters_change(struct txn_limbo *limbo)
>   {
> +	/*
> +	 * In case if we're not current leader (ie not owning the
> +	 * limbo) then we should not confirm anything, otherwise
> +	 * we could reduce quorum number and start writing CONFIRM
> +	 * while leader node carries own maybe bigger quorum value.
> +	 */
> +	if (!txn_limbo_is_owner(limbo, instance_id))
> +		return;
> +
>   	if (rlist_empty(&limbo->queue))
>   		return;
> +
>   	struct txn_limbo_entry *e;
>   	int64_t confirm_lsn = -1;
>   	rlist_foreach_entry(e, &limbo->queue, in_queue) {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 53e52f676..33cacef8f 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -31,6 +31,7 @@
>    */
>   #include "small/rlist.h"
>   #include "vclock/vclock.h"
> +#include "latch.h"
>   
>   #include <stdint.h>
>   
> @@ -147,6 +148,10 @@ struct txn_limbo {
>   	 * limbo and raft are in sync and the terms are the same.
>   	 */
>   	uint64_t promote_greatest_term;
> +	/**
> +	 * To order access to the promote data.
> +	 */
> +	struct latch promote_latch;
>   	/**
>   	 * Maximal LSN gathered quorum and either already confirmed in WAL, or
>   	 * whose confirmation is in progress right now. Any attempt to confirm
> @@ -194,6 +199,23 @@ txn_limbo_is_empty(struct txn_limbo *limbo)
>   	return rlist_empty(&limbo->queue);
>   }
>   
> +/**
> + * Test if the \a owner_id is current limbo owner.
> + */
> +static inline bool
> +txn_limbo_is_owner(struct txn_limbo *limbo, uint32_t owner_id)
> +{
> +	/*
> +	 * A guard needed to prevent race with in-fly promote
> +	 * packets which are sitting inside journal but not yet
> +	 * written.
> +	 */
> +	latch_lock(&limbo->promote_latch);
> +	bool v = limbo->owner_id == owner_id;
> +	latch_unlock(&limbo->promote_latch);
> +	return v;
> +}
> +
>   bool
>   txn_limbo_is_ro(struct txn_limbo *limbo);
>   
> @@ -216,9 +238,12 @@ txn_limbo_last_entry(struct txn_limbo *limbo)
>    * @a replica_id.
>    */
>   static inline uint64_t
> -txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
> +txn_limbo_replica_term(struct txn_limbo *limbo, uint32_t replica_id)
>   {
> -	return vclock_get(&limbo->promote_term_map, replica_id);
> +	latch_lock(&limbo->promote_latch);
> +	uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> +	latch_unlock(&limbo->promote_latch);
> +	return v;
>   }
>   
>   /**
> @@ -226,11 +251,14 @@ txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
>    * data from it. The check is only valid when elections are enabled.
>    */
>   static inline bool
> -txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
> +txn_limbo_is_replica_outdated(struct txn_limbo *limbo,
>   			      uint32_t replica_id)
>   {
> -	return txn_limbo_replica_term(limbo, replica_id) <
> -	       limbo->promote_greatest_term;
> +	latch_lock(&limbo->promote_latch);
> +	uint64_t v = vclock_get(&limbo->promote_term_map, replica_id);
> +	bool res = v < limbo->promote_greatest_term;
> +	latch_unlock(&limbo->promote_latch);
> +	return res;
>   }
>   
>   /**
> @@ -287,7 +315,15 @@ txn_limbo_assign_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry,
>    * replica with the specified ID.
>    */
>   void
> -txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
> +txn_limbo_ack(struct txn_limbo *limbo, uint32_t owner_id,
> +	      uint32_t replica_id, int64_t lsn);
> +
> +static inline void
> +txn_limbo_ack_self(struct txn_limbo *limbo, int64_t lsn)
> +{
> +	return txn_limbo_ack(limbo, limbo->owner_id,
> +			     limbo->owner_id, lsn);
> +}
>   
>   /**
>    * Block the current fiber until the transaction in the limbo
> @@ -300,7 +336,35 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>   int
>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>   
> -/** Execute a synchronous replication request. */
> +/**
> + * Initiate execution of a synchronous replication request.
> + */
> +static inline void
> +txn_limbo_process_begin(struct txn_limbo *limbo)
> +{
> +	latch_lock(&limbo->promote_latch);
> +}
> +
> +/** Commit a synchronous replication request. */
> +static inline void
> +txn_limbo_process_commit(struct txn_limbo *limbo)
> +{
> +	latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Rollback a synchronous replication request. */
> +static inline void
> +txn_limbo_process_rollback(struct txn_limbo *limbo)
> +{
> +	latch_unlock(&limbo->promote_latch);
> +}
> +
> +/** Core of processing synchronous replication request. */
> +void
> +txn_limbo_process_core(struct txn_limbo *limbo,
> +		       const struct synchro_request *req);
> +
> +/** Process a synchronous replication request. */
>   void
>   txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
>   
> diff --git a/test/unit/snap_quorum_delay.cc b/test/unit/snap_quorum_delay.cc
> index 803bbbea8..d43b4cd2c 100644
> --- a/test/unit/snap_quorum_delay.cc
> +++ b/test/unit/snap_quorum_delay.cc
> @@ -130,7 +130,7 @@ txn_process_func(va_list ap)
>   	}
>   
>   	txn_limbo_assign_local_lsn(&txn_limbo, entry, fake_lsn);
> -	txn_limbo_ack(&txn_limbo, txn_limbo.owner_id, fake_lsn);
> +	txn_limbo_ack_self(&txn_limbo, fake_lsn);
>   	txn_limbo_wait_complete(&txn_limbo, entry);
>   
>   	switch (process_type) {
> @@ -157,7 +157,8 @@ txn_confirm_func(va_list ap)
>   	 * inside gc_checkpoint().
>   	 */
>   	fiber_sleep(0);
> -	txn_limbo_ack(&txn_limbo, relay_id, fake_lsn);
> +	txn_limbo_ack(&txn_limbo, txn_limbo.owner_id,
> +		      relay_id, fake_lsn);
>   	return 0;
>   }
>   

-- 
Serge Petrenko

