[Tarantool-patches] [PATCH v9 4/5] limbo: filter incoming synchro requests

Serge Petrenko sergepetrenko at tarantool.org
Tue Aug 3 13:51:44 MSK 2021



30.07.2021 14:35, Cyrill Gorcunov пишет:
> When we receive synchro requests we can't just apply
> them blindly because in the worst case they may come from
> a split-brain configuration (where a cluster has split into
> several subclusters, each one has elected its own leader,
> and the subclusters are then trying to merge back into the
> original cluster). We need to do our best to detect such
> configs and force these nodes to rejoin from scratch for
> the sake of data consistency.
>
> Thus when we're processing requests we pass them to the
> packet filter first, which validates their contents and
> refuses to apply them if they fail validation.
>
> Depending on the request type each packet traverses the
> appropriate chain(s):
>
> FILTER_IN
>   - Common chain for any synchro packet. We verify
>     that if replica_id is nil then it shall be
>     PROMOTE request with lsn 0 to migrate limbo owner
>
> FILTER_CONFIRM
> FILTER_ROLLBACK
>   - Neither confirm nor rollback requests shall come
>     with an empty limbo since it means the synchro queue
>     is already processed and the peer didn't notice
>     that
>
> FILTER_PROMOTE
>   - Promote request should come in with new terms only,
>     otherwise it means the peer didn't notice election
>
>   - If limbo's confirmed_lsn is equal to promote LSN then
>     it is a valid request to process
>
>   - If limbo's confirmed_lsn is bigger than requested then
>     it is valid in one case only -- limbo migration so the
>     queue shall be empty
>
>   - If limbo's confirmed_lsn is less than promote LSN then
>     - If queue is empty then it means the transactions are
>       already rolled back and request is invalid
>     - If queue is not empty then its first entry might be
>       greater than promote LSN and it means that old data
>       either committed or rolled back already and request
>       is invalid
>
> FILTER_DEMOTE
>   - NOP, reserved for future use
>
> Closes #6036

Thanks for the patch!
You're definitely moving in the right direction here.
Please, find a couple of comments below.

>
> Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
> ---
>   src/box/applier.cc     |  21 ++-
>   src/box/box.cc         |  11 +-
>   src/box/memtx_engine.c |   3 +-
>   src/box/txn_limbo.c    | 304 ++++++++++++++++++++++++++++++++++++++---
>   src/box/txn_limbo.h    |  33 ++++-
>   5 files changed, 347 insertions(+), 25 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index a7f472714..fefaa4ced 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
...

> diff --git a/src/box/box.cc b/src/box/box.cc
> index 5ca617e32..000887aa6 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc

...

> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index be5e0adf5..b01b5a572 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -51,6 +51,7 @@ txn_limbo_create(struct txn_limbo *limbo)
>   	limbo->confirmed_lsn = 0;
>   	limbo->rollback_count = 0;
>   	limbo->is_in_rollback = false;
> +	limbo->is_filtering = true;
>   }
>   
>   bool
> @@ -724,35 +725,291 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
>   	return 0;
>   }
>   
> +enum filter_chain {
> +	FILTER_IN,
> +	FILTER_CONFIRM,
> +	FILTER_ROLLBACK,
> +	FILTER_PROMOTE,
> +	FILTER_DEMOTE,
> +	FILTER_MAX,
> +};
> +
> +/**
> + * Common chain for any incoming packet.
> + */
> +static int
> +filter_in(struct txn_limbo *limbo, const struct synchro_request *req)
> +{
> +	(void)limbo;
> +
> +	if (req->replica_id == REPLICA_ID_NIL) {
> +		/*
> +		 * The limbo was empty on the instance issuing
> +		 * the request. This means this instance must
> +		 * empty its limbo as well.
> +		 */
> +		if (req->lsn != 0 ||
> +		    !iproto_type_is_promote_request(req->type)) {
> +			say_info("RAFT: rejecting %s request from "
> +				 "instance id %u for term %llu. "
> +				 "req->replica_id = 0 but lsn %lld.",
> +				 iproto_type_name(req->type),
> +				 req->origin_id, (long long)req->term,
> +				 (long long)req->lsn);
> +
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "Replication",
> +				 "empty replica_id with nonzero LSN");
> +			return -1;
> +		}
> +	}

I agree with Vlad. This may be moved to filter_confirm_rollback.

> +
> +	return 0;
> +}
> +
> +/**
> + * Filter CONFIRM and ROLLBACK packets.
> + */
> +static int
> +filter_confirm_rollback(struct txn_limbo *limbo,
> +			const struct synchro_request *req)
> +{
> +	/*
> +	 * When limbo is empty we have nothing to
> +	 * confirm/commit and if this request comes
> +	 * in it means the split brain has happened.
> +	 */
> +	if (!txn_limbo_is_empty(limbo))
> +		return 0;
> +
> +	say_info("RAFT: rejecting %s request from "
> +		 "instance id %u for term %llu. "
> +		 "Empty limbo detected.",
> +		 iproto_type_name(req->type),
> +		 req->origin_id,
> +		 (long long)req->term);
> +
> +	diag_set(ClientError, ER_UNSUPPORTED,
> +		 "Replication",
> +		 "confirm/rollback with empty limbo");
> +	return -1;
> +}
> +
> +/**
> + * Filter PROMOTE packets.
> + */
> +static int
> +filter_promote(struct txn_limbo *limbo, const struct synchro_request *req)
> +{
> +	int64_t promote_lsn = req->lsn;
> +
> +	/*
> +	 * If the term is already seen it means it comes
> +	 * from a node which didn't notice new elections,
> +	 * thus been living in subdomain and its data is
> +	 * no longer consistent.
> +	 */
> +	if (limbo->promote_term_max > 1 &&
> +	    limbo->promote_term_max > req->term) {
> +		say_info("RAFT: rejecting %s request from "
> +			 "instance id %u for term %llu. "
> +			 "Max term seen is %llu.",
> +			 iproto_type_name(req->type),
> +			 req->origin_id,
> +			 (long long)req->term,
> +			 (long long)limbo->promote_term_max);
> +
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication", "obsolete terms");
> +		return -1;
> +	}
> +
> +	/*
> +	 * Either the limbo is empty or new promote will
> +	 * rollback all waiting transactions. Which
> +	 * is fine.
> +	 */
> +	if (limbo->confirmed_lsn == promote_lsn)
> +		return 0;
> +
> +	/*
> +	 * Explicit split brain situation. Promote
> +	 * comes in with an old LSN which we've already
> +	 * processed.
> +	 */
> +	if (limbo->confirmed_lsn > promote_lsn) {
> +		/*
> +		 * If limbo is empty we're migrating
> +		 * the owner.
> +		 */
> +		if (txn_limbo_is_empty(limbo))
> +			return 0;

I don't understand this part. Are you sure this check is needed?
We're always migrating the owner with a promote.

> +
> +		say_info("RAFT: rejecting %s request from "
> +			 "instance id %u for term %llu. "
> +			 "confirmed_lsn %lld > promote_lsn %lld.",
> +			 iproto_type_name(req->type),
> +			 req->origin_id, (long long)req->term,
> +			 (long long)limbo->confirmed_lsn,
> +			 (long long)promote_lsn);
> +
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication",
> +			 "backward promote LSN (split brain)");
> +		return -1;
> +	}
> +
> +	/*
> +	 * The last case requires a few subcases.
> +	 */
> +	assert(limbo->confirmed_lsn < promote_lsn);
> +
> +	if (txn_limbo_is_empty(limbo)) {
> +		/*
> +		 * Transactions are already rolled back
> +		 * since the limbo is empty.
> +		 */
> +		say_info("RAFT: rejecting %s request from "
> +			 "instance id %u for term %llu. "
> +			 "confirmed_lsn %lld < promote_lsn %lld "
> +			 "and empty limbo.",
> +			 iproto_type_name(req->type),
> +			 req->origin_id, (long long)req->term,
> +			 (long long)limbo->confirmed_lsn,
> +			 (long long)promote_lsn);
> +
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication",
> +			 "forward promote LSN "
> +			 "(empty limbo, split brain)");
> +		return -1;

I think it'd be better to have a separate error code for this purpose.
Say, ER_SPLITBRAIN or something.
Then the applier would have more control over what to do when such an
error is raised. Say, never reconnect. (It doesn't reconnect on
ER_UNSUPPORTED, I believe, but a distinct error is still better).

> +	} else {
> +		/*
> +		 * Some entries are present in the limbo,
> +		 * and if first entry's LSN is greater than
> +		 * requested then old data either commited
> +		 * or rolled back, so can't continue.
> +		 */
> +		struct txn_limbo_entry *first;
> +
> +		first = txn_limbo_first_entry(limbo);
> +		if (first->lsn > promote_lsn) {

This should only happen when confirmed_lsn > promote_lsn, shouldn't it?
If yes, then you've already handled it above.

> +			say_info("RAFT: rejecting %s request from "
> +				 "instance id %u for term %llu. "
> +				 "confirmed_lsn %lld < promote_lsn %lld "
> +				 "and limbo first lsn %lld.",
> +				 iproto_type_name(req->type),
> +				 req->origin_id, (long long)req->term,
> +				 (long long)limbo->confirmed_lsn,
> +				 (long long)promote_lsn,
> +				 (long long)first->lsn);
> +
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "Replication",
> +				 "promote LSN confilict "
> +				 "(limbo LSN ahead, split brain)");
> +			return -1;
> +		}
> +	}
> +
> +	return 0;
> +}
> +
> +/**
> + * Filter DEMOTE packets.
> + */
> +static int
> +filter_demote(struct txn_limbo *limbo, const struct synchro_request *req)
> +{
> +	(void)limbo;
> +	(void)req;
> +	return 0;
> +}
> +
> +static int (*filter_req[FILTER_MAX])
> +(struct txn_limbo *limbo, const struct synchro_request *req) = {
> +	[FILTER_IN]		= filter_in,
> +	[FILTER_CONFIRM]	= filter_confirm_rollback,
> +	[FILTER_ROLLBACK]	= filter_confirm_rollback,
> +	[FILTER_PROMOTE]	= filter_promote,
> +	[FILTER_DEMOTE]		= filter_demote,

Demote should be filtered the same way as promote, they are
basically the same requests with same meaning.

Demote is just a promote for replica id 0, because we couldn't
do a promote for replica id 0.

> +};
> +
> +int
> +txn_limbo_filter_locked(struct txn_limbo *limbo,
> +			const struct synchro_request *req)
> +{
> +	unsigned int mask = (1u << FILTER_IN);
> +	unsigned int pos = 0;
> +
> +#ifndef NDEBUG
> +	say_info("limbo: filter %s replica_id %u origin_id %u "
> +		 "term %lld lsn %lld, queue owner_id %u len %lld "
> +		 "confirmed_lsn %lld (%s)",
> +		 iproto_type_name(req->type),
> +		 req->replica_id, req->origin_id,
> +		 (long long)req->term, (long long)req->lsn,
> +		 limbo->owner_id, (long long)limbo->len,
> +		 (long long)limbo->confirmed_lsn,
> +		 limbo->is_filtering ? "on" : "off");
> +#endif
> +
> +	if (!limbo->is_filtering)
> +		return 0;
> +
> +	switch (req->type) {
> +	case IPROTO_CONFIRM:
> +		mask |= (1u << FILTER_CONFIRM);
> +		break;
> +	case IPROTO_ROLLBACK:
> +		mask |= (1u << FILTER_ROLLBACK);
> +		break;
> +	case IPROTO_PROMOTE:
> +		mask |= (1u << FILTER_PROMOTE);
> +		break;
> +	case IPROTO_DEMOTE:
> +		mask |= (1u << FILTER_DEMOTE);
> +		break;
> +	default:
> +		say_info("RAFT: rejecting unexpected %d "
> +			 "request from instance id %u "
> +			 "for term %llu.",
> +			 req->type, req->origin_id,
> +			 (long long)req->term);
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication",
> +			 "unexpected request type");
> +		return -1;
> +	}
> +
> +	while (mask != 0) {
> +		if ((mask & 1) != 0) {
> +			assert(pos < lengthof(filter_req));
> +			if (filter_req[pos](limbo, req) != 0)
> +				return -1;
> +		}
> +		pos++;
> +		mask >>= 1;
> +	};
> +
> +	return 0;
> +}
> +
>   void
>   txn_limbo_process_locked(struct txn_limbo *limbo,
>   			 const struct synchro_request *req)
>   {
>   	uint64_t term = req->term;
>   	uint32_t origin = req->origin_id;
> +
>   	if (txn_limbo_term_locked(limbo, origin) < term) {
>   		vclock_follow(&limbo->promote_term_map, origin, term);
>   		if (term > limbo->promote_term_max)
>   			limbo->promote_term_max = term;
> -	} else if (iproto_type_is_promote_request(req->type) &&
> -		   limbo->promote_term_max > 1) {
> -		/* PROMOTE for outdated term. Ignore. */
> -		say_info("RAFT: ignoring %s request from instance "
> -			 "id %u for term %llu. Greatest term seen "
> -			 "before (%llu) is bigger.",
> -			 iproto_type_name(req->type), origin, (long long)term,
> -			 (long long)limbo->promote_term_max);
> -		return;
>   	}
>   
>   	int64_t lsn = req->lsn;
> -	if (req->replica_id == REPLICA_ID_NIL) {
> -		/*
> -		 * The limbo was empty on the instance issuing the request.
> -		 * This means this instance must empty its limbo as well.
> -		 */
> -		assert(lsn == 0 && iproto_type_is_promote_request(req->type));
> -	} else if (req->replica_id != limbo->owner_id) {
> +	if (req->replica_id != limbo->owner_id) {
>   		/*
>   		 * Ignore CONFIRM/ROLLBACK messages for a foreign master.
>   		 * These are most likely outdated messages for already confirmed

We should raise an error when req->replica_id != limbo->owner_id.
For every entry type: promote/demote/confirm/rollback.

req->replica_id != limbo->owner_id means that the remote instance has
taken some actions in the past (say, confirmed something) of which we
didn't know until now. This is basically a splitbrain again.


> @@ -783,18 +1040,25 @@ txn_limbo_process_locked(struct txn_limbo *limbo,
>   		txn_limbo_read_demote(limbo, lsn);
>   		break;
>   	default:
> -		unreachable();
> +		panic("limbo: unexpected request type %d",
> +		      req->type);
> +		break;
>   	}
> -	return;
>   }
>   
> -void
> +int
>   txn_limbo_process(struct txn_limbo *limbo,
>   		  const struct synchro_request *req)
>   {
> +	int rc;
> +
>   	txn_limbo_term_lock(limbo);
> -	txn_limbo_process_locked(limbo, req);
> +	rc = txn_limbo_filter_locked(limbo, req);
> +	if (rc == 0)
> +		txn_limbo_process_locked(limbo, req);
>   	txn_limbo_term_unlock(limbo);
> +
> +	return rc;
>   }
>   
>   void
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 25faffd2b..eb74dda00 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -184,6 +184,14 @@ struct txn_limbo {
>   	 * by the 'reversed rollback order' rule - contradiction.
>   	 */
>   	bool is_in_rollback;
> +	/**
> +	 * Whether the limbo should filter incoming requests.
> +	 * The phases of local recovery from WAL file and on applier's
> +	 * join phase we are in complete trust of incoming data because
> +	 * this data forms an initial limbo state and should not
> +	 * filter out requests.
> +	 */
> +	bool is_filtering;
>   };
>   
>   /**
> @@ -355,15 +363,38 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>   int
>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>   
> +/**
> + * Verify if the request is valid for processing.
> + */
> +int
> +txn_limbo_filter_locked(struct txn_limbo *limbo,
> +			const struct synchro_request *req);
> +
>   /** Execute a synchronous replication request. */
>   void
>   txn_limbo_process_locked(struct txn_limbo *limbo,
>   			 const struct synchro_request *req);
>   
>   /** Lock limbo terms and execute a synchronous replication request. */
> -void
> +int
>   txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
>   
> +/** Enable filtering of synchro requests. */
> +static inline void
> +txn_limbo_filter_enable(struct txn_limbo *limbo)
> +{
> +	limbo->is_filtering = true;
> +	say_info("limbo: filter enabled");
> +}
> +
> +/** Disable filtering of synchro requests. */
> +static inline void
> +txn_limbo_filter_disable(struct txn_limbo *limbo)
> +{
> +	limbo->is_filtering = false;
> +	say_info("limbo: filter disabled");
> +}
> +
>   /**
>    * Waiting for confirmation of all "sync" transactions
>    * during confirm timeout or fail.

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list