[Tarantool-patches] [PATCH v10 3/4] limbo: filter incoming synchro requests

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Aug 6 02:33:39 MSK 2021


Thanks for the patch!

See 14 comments below.

> FILTER_PROMOTE
> FILTER_DEMOTE
>  1) The requests should come in with nonzero term, otherwise
>     the packet is corrupted.
>  2) The request's term should not be less than maximal known
>     one, iow it should not come in from nodes which didn't notice
>     raft epoch changes and living in the past.
>  3) If LSN of the request matches current confirmed LSN the packet
>     is obviously correct to process.
>  4) If LSN is less than confirmed LSN then the request is wrong,
>     we have processed the requested LSN already.
>  5) If LSN is less than confirmed LSN then

1. You already said "If LSN is less than confirmed LSN then" in (4).
In (5) it must be 'greater'.

>     a) If limbo is empty we can't do anything, since data is already
>        processed and should issue an error;
>     b) If there is some data in the limbo then requested LSN should
>        be in range of limbo's [first; last] LSNs, thus the request
>        will be able to commit and rollback limbo queue.
> 
> Closes #6036

2. You need to add a changelog file.

> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 9db286ae2..f64b6fa35 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -514,6 +515,11 @@ applier_fetch_snapshot(struct applier *applier)
>  	struct ev_io *coio = &applier->io;
>  	struct xrow_header row;
>  
> +	txn_limbo_filter_disable(&txn_limbo);
> +	auto filter_guard = make_scoped_guard([&]{
> +		txn_limbo_filter_enable(&txn_limbo);
> +	});

3. Why do you need to disable/enable the filter here? Shouldn't a snapshot
contain only valid data? Moreover, AFAIU it can't contain any limbo
rows at all. The limbo snapshot is sent separately, and the data flow
does not carry anything except pure data. The same goes for the
join.

And how is it related to applier_register below? It does not download
any data at all, does it?

> diff --git a/src/box/box.cc b/src/box/box.cc
> index 8dc3b130b..c3516b7a4 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1675,7 +1675,8 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
>  		.lsn = promote_lsn,
>  		.term = raft->term,
>  	};
> -	txn_limbo_process(&txn_limbo, &req);
> +	if (txn_limbo_process(&txn_limbo, &req) != 0)
> +		diag_raise();

4. box_issue_promote() is not called inside a try-catch anywhere. Please,
don't use exceptions in new code. The same for demote.
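
For illustration, a minimal self-contained sketch of the error-code style
asked for here. The struct and the stub below are hypothetical stand-ins,
not the real Tarantool definitions; the only point is that the failure
travels up as a return value instead of being raised with diag_raise().

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for struct synchro_request; only the shape matters. */
struct synchro_request_sketch {
	int64_t lsn;
	uint64_t term;
};

/*
 * Stub playing the role of txn_limbo_process(): returns 0 on success and
 * -1 on failure. In real code the diag area is assumed to be set on failure.
 */
static int
limbo_process_stub(const struct synchro_request_sketch *req)
{
	return req->term == 0 ? -1 : 0;
}

/*
 * Error-code variant of box_issue_promote(): no exception is thrown,
 * the caller decides what to do with the -1.
 */
int
issue_promote_sketch(uint64_t term, int64_t promote_lsn)
{
	struct synchro_request_sketch req = {
		.lsn = promote_lsn,
		.term = term,
	};
	if (limbo_process_stub(&req) != 0)
		return -1;
	return 0;
}
```

Every caller then has to check the result explicitly, which is exactly
what a missing try-catch can't enforce.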

>  	assert(txn_limbo_is_empty(&txn_limbo));
>  }
> @@ -3284,6 +3286,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  
>  	say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
>  
> +	txn_limbo_filter_disable(&txn_limbo);

5. Why do you need it turned off for recovery? If the data could
be applied in the first place, why can't it be replayed
in the same way during recovery?

> +	auto filter_guard = make_scoped_guard([&]{
> +		txn_limbo_filter_enable(&txn_limbo);
> +	});
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index a718c55a2..59fb51fac 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c

<...>

> +
> +/**
> + * Common chain for any incoming packet.
> + */
> +static int
> +filter_in(struct txn_limbo *limbo, const struct synchro_request *req)
> +{
> +	(void)limbo;

6. So you have the filtering enabled dynamically in the limbo, but
you do not use the limbo here? Why? Maybe at least add an assertion
that the filter is enabled?

> +
> +	/*
> +	 * Zero LSN are allowed for PROMOTE
> +	 * and DEMOTE requests only.
> +	 */
> +	if (req->lsn == 0) {
> +		if (!iproto_type_is_promote_request(req->type)) {
> +			say_info("%s. Zero lsn detected",
> +				 reject_str(req));

7. It should be say_error(). The same in the other places.

> +
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "Replication",
> +				 "zero LSN on promote/demote");
> +			return -1;

8. Please, try to be more compact. Even with the current indentation
level you don't need to wrap the lines so early. But the indentation
can easily be reduced even further:

====================
@@ -764,16 +764,11 @@ filter_in(struct txn_limbo *limbo, const struct synchro_request *req)
 	 * Zero LSN are allowed for PROMOTE
 	 * and DEMOTE requests only.
 	 */
-	if (req->lsn == 0) {
-		if (!iproto_type_is_promote_request(req->type)) {
-			say_info("%s. Zero lsn detected",
-				 reject_str(req));
-
-			diag_set(ClientError, ER_UNSUPPORTED,
-				 "Replication",
-				 "zero LSN on promote/demote");
-			return -1;
-		}
+	if (req->lsn == 0 && !iproto_type_is_promote_request(req->type)) {
+		say_info("%s. Zero lsn detected", reject_str(req));
+		diag_set(ClientError, ER_UNSUPPORTED, "Replication",
+			 "zero LSN on promote/demote");
+		return -1;
 	}
====================

The same applies to 2 places below.

> +		}
> +	}

<...>

> +
> +/**
> + * Filter CONFIRM and ROLLBACK packets.
> + */
> +static int
> +filter_confirm_rollback(struct txn_limbo *limbo,
> +			const struct synchro_request *req)
> +{
> +	/*
> +	 * When limbo is empty we have nothing to
> +	 * confirm/commit and if this request comes
> +	 * in it means the split brain has happened.
> +	 */
> +	if (!txn_limbo_is_empty(limbo))
> +		return 0;

9. What if the rollback is for an LSN > the limbo's last LSN? Then
there is also nothing to do. The same for a confirm with
LSN < the limbo's first LSN.
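
To make the question concrete, here is a rough sketch of such a check.
It assumes that CONFIRM(lsn) commits entries with LSN <= lsn and that
ROLLBACK(lsn) rolls back entries with LSN >= lsn. The types and names are
hypothetical and simplified, not the real limbo API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical request kinds, standing in for IPROTO_CONFIRM/ROLLBACK. */
enum sketch_type { SKETCH_CONFIRM, SKETCH_ROLLBACK };

/* Hypothetical, simplified view of the limbo queue boundaries. */
struct limbo_range {
	bool is_empty;
	int64_t first_lsn;
	int64_t last_lsn;
};

/*
 * Return true when the request has no queued entry to act on:
 * the limbo is empty, a CONFIRM ends before the first entry,
 * or a ROLLBACK starts after the last one.
 */
bool
synchro_has_nothing_to_do(const struct limbo_range *r,
			  enum sketch_type type, int64_t lsn)
{
	if (r->is_empty)
		return true;
	if (type == SKETCH_CONFIRM)
		return lsn < r->first_lsn;
	return lsn > r->last_lsn;
}
```

Whether these cases should be rejected like the empty limbo or silently
skipped is a separate decision; the sketch only detects them.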

> +
> +	say_info("%s. Empty limbo detected", reject_str(req));
> +
> +	diag_set(ClientError, ER_UNSUPPORTED,
> +		 "Replication",
> +		 "confirm/rollback with empty limbo");
> +	return -1;
> +}
> +
> +/**
> + * Filter PROMOTE and DEMOTE packets.
> + */
> +static int
> +filter_promote_demote(struct txn_limbo *limbo,
> +		      const struct synchro_request *req)
> +{
> +	int64_t promote_lsn = req->lsn;
> +
> +	/*
> +	 * PROMOTE and DEMOTE packets must not have zero
> +	 * term supplied, otherwise it is a broken packet.
> +	 */
> +	if (req->term == 0) {
> +		say_info("%s. Zero term detected", reject_str(req));
> +
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication", "zero term");
> +		return -1;
> +	}
> +
> +	/*
> +	 * If the term is already seen it means it comes
> +	 * from a node which didn't notice new elections,
> +	 * thus been living in subdomain and its data is
> +	 * no longer consistent.
> +	 */
> +	if (limbo->promote_greatest_term > req->term) {
> +		say_info("%s. Max term seen is %llu", reject_str(req),
> +			 (long long)limbo->promote_greatest_term);
> +
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication", "obsolete terms");
> +		return -1;
> +	}
> +
> +	/*
> +	 * Easy case -- processed LSN matches the new
> +	 * one which comes inside request, everything
> +	 * is consistent.
> +	 */
> +	if (limbo->confirmed_lsn == promote_lsn)
> +		return 0;
> +
> +	/*
> +	 * Explicit split brain situation. Promote
> +	 * comes in with an old LSN which we've already
> +	 * processed.
> +	 */
> +	if (limbo->confirmed_lsn > promote_lsn) {
> +		say_info("%s. confirmed_lsn %lld > promote_lsn %lld",
> +			 reject_str(req),
> +			 (long long)limbo->confirmed_lsn,
> +			 (long long)promote_lsn);
> +
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication",
> +			 "backward promote LSN (split brain)");
> +		return -1;
> +	}
> +
> +	/*
> +	 * The last case requires a few subcases.
> +	 */
> +	assert(limbo->confirmed_lsn < promote_lsn);
> +
> +	if (txn_limbo_is_empty(limbo)) {
> +		/*
> +		 * Transactions are rolled back already,
> +		 * since the limbo is empty.
> +		 */
> +		say_info("%s. confirmed_lsn %lld < promote_lsn %lld "
> +			 "and empty limbo", reject_str(req),
> +			 (long long)limbo->confirmed_lsn,
> +			 (long long)promote_lsn);
> +
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication",
> +			 "forward promote LSN "
> +			 "(empty limbo, split brain)");
> +		return -1;
> +	} else {

10. You don't need 'else' - the 'if' branch already returns.
Dropping it should help to reduce the indentation below.

> +		/*
> +		 * Some entries are present in the limbo,
> +		 * we need to make sure the @a promote_lsn
> +		 * lays inside limbo [first; last] range.
> +		 * So that the promote request has some
> +		 * queued data to process, otherwise it
> +		 * means the request comes from split
> +		 * brained node.
> +		 */
> +		struct txn_limbo_entry *first, *last;
> +
> +		first = txn_limbo_first_entry(limbo);
> +		last = txn_limbo_last_entry(limbo);
> +
> +		if (first->lsn < promote_lsn ||
> +		    last->lsn > promote_lsn) {

11. This seems to be broken. In the comment you said the
error is when

	promote < first or promote > last

And here in the condition you return an error when

	promote > first or promote < last

Why?
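
A minimal sketch of the LSN part of the check with comments 10 and 11
applied: no 'else' after a returning branch, and the range condition
written the way the code comment describes it. The struct is a
hypothetical simplification of the limbo state, and the term checks are
left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified snapshot of the limbo state. */
struct limbo_view {
	int64_t confirmed_lsn;
	bool is_empty;
	int64_t first_lsn;
	int64_t last_lsn;
};

/* Return 0 if promote_lsn is acceptable, -1 otherwise. */
int
filter_promote_lsn_sketch(const struct limbo_view *l, int64_t promote_lsn)
{
	/* Easy case: the request matches what is already confirmed. */
	if (l->confirmed_lsn == promote_lsn)
		return 0;
	/* The request points to an LSN which is already processed. */
	if (l->confirmed_lsn > promote_lsn)
		return -1;
	/* promote_lsn > confirmed_lsn, but there is nothing queued. */
	if (l->is_empty)
		return -1;
	/* Error when promote_lsn is outside of [first; last]. */
	if (promote_lsn < l->first_lsn || promote_lsn > l->last_lsn)
		return -1;
	return 0;
}
```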

> +			say_info("%s. promote_lsn %lld out of "
> +				 "range [%lld; %lld]",
> +				 reject_str(req),
> +				 (long long)promote_lsn,
> +				 (long long)first->lsn,
> +				 (long long)last->lsn);
> +
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "Replication",
> +				 "promote LSN out of queue range "
> +				 "(split brain)");
> +			return -1;
> +		}
> +	}
> +
> +	return 0;
> +}
> +
> +static int (*filter_req[FILTER_MAX])
> +(struct txn_limbo *limbo, const struct synchro_request *req) = {
> +	[FILTER_IN]		= filter_in,
> +	[FILTER_CONFIRM]	= filter_confirm_rollback,
> +	[FILTER_ROLLBACK]	= filter_confirm_rollback,
> +	[FILTER_PROMOTE]	= filter_promote_demote,
> +	[FILTER_DEMOTE]		= filter_promote_demote,

12. What is this? Wouldn't it be much much much simpler to just
call filter_in() always + make a switch case for the request type +
call the needed functions?

What is worse, you already have the switch-case anyway, but you
also added a strange loop, masks, and virtual functions.
I don't think I could make it more complex even if I wanted to,
sorry. Why so complicated?
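
Roughly what the simpler variant could look like, as a self-contained
sketch. The enum, the struct, and the stub filters are hypothetical
stand-ins for the real IPROTO types and filter functions; only the
control flow matters.

```c
#include <assert.h>

/* Hypothetical stand-ins for the IPROTO_* request types. */
enum sketch_req_type {
	REQ_CONFIRM, REQ_ROLLBACK, REQ_PROMOTE, REQ_DEMOTE, REQ_OTHER,
};

struct sketch_req {
	enum sketch_req_type type;
	long long lsn;
	unsigned long long term;
};

/* Common check: zero LSN is allowed for PROMOTE and DEMOTE only. */
static int
filter_in_sketch(const struct sketch_req *req)
{
	int is_promote = req->type == REQ_PROMOTE || req->type == REQ_DEMOTE;
	return (req->lsn == 0 && !is_promote) ? -1 : 0;
}

/* Stub: the real function would inspect the limbo queue. */
static int
filter_confirm_rollback_sketch(const struct sketch_req *req)
{
	(void)req;
	return 0;
}

/* Stub: only the zero term check is kept. */
static int
filter_promote_demote_sketch(const struct sketch_req *req)
{
	return req->term == 0 ? -1 : 0;
}

/* Common filter first, then a plain switch. No masks, loops or tables. */
int
filter_request_sketch(const struct sketch_req *req)
{
	if (filter_in_sketch(req) != 0)
		return -1;
	switch (req->type) {
	case REQ_CONFIRM:
	case REQ_ROLLBACK:
		return filter_confirm_rollback_sketch(req);
	case REQ_PROMOTE:
	case REQ_DEMOTE:
		return filter_promote_demote_sketch(req);
	default:
		return -1;
	}
}
```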

> +};
> +
> +int
> +txn_limbo_filter_locked(struct txn_limbo *limbo,
> +			const struct synchro_request *req)
> +{
> +	unsigned int mask = (1u << FILTER_IN);
> +	unsigned int pos = 0;
> +
> +	assert(latch_is_locked(&limbo->promote_latch));
> +
> +#ifndef NDEBUG
> +	say_info("limbo: filter %s replica_id %u origin_id %u "
> +		 "term %lld lsn %lld, queue owner_id %u len %lld "
> +		 "confirmed_lsn %lld (%s)",
> +		 iproto_type_name(req->type),
> +		 req->replica_id, req->origin_id,
> +		 (long long)req->term, (long long)req->lsn,
> +		 limbo->owner_id, (long long)limbo->len,
> +		 (long long)limbo->confirmed_lsn,
> +		 limbo->is_filtering ? "on" : "off");
> +#endif
> +
> +	if (!limbo->is_filtering)
> +		return 0;
> +
> +	switch (req->type) {
> +	case IPROTO_CONFIRM:
> +		mask |= (1u << FILTER_CONFIRM);
> +		break;
> +	case IPROTO_ROLLBACK:
> +		mask |= (1u << FILTER_ROLLBACK);
> +		break;
> +	case IPROTO_PROMOTE:
> +		mask |= (1u << FILTER_PROMOTE);
> +		break;
> +	case IPROTO_DEMOTE:
> +		mask |= (1u << FILTER_DEMOTE);
> +		break;
> +	default:
> +		say_info("RAFT: rejecting unexpected %d "
> +			 "request from instance id %u "
> +			 "for term %llu.",
> +			 req->type, req->origin_id,
> +			 (long long)req->term);
> +		diag_set(ClientError, ER_UNSUPPORTED,
> +			 "Replication",
> +			 "unexpected request type");
> +		return -1;
> +	}
> +
> +	while (mask != 0) {
> +		if ((mask & 1) != 0) {
> +			assert(pos < lengthof(filter_req));
> +			assert(filter_req[pos] != NULL);
> +			if (filter_req[pos](limbo, req) != 0)
> +				return -1;
> +		}
> +		pos++;
> +		mask >>= 1;
> +	};
> +
> +	return 0;
> +}
> +
>  void
>  txn_limbo_process_locked(struct txn_limbo *limbo,
>  			 const struct synchro_request *req)

<...>

> -void
> +int
>  txn_limbo_process(struct txn_limbo *limbo,
>  		  const struct synchro_request *req)
>  {
> +	int rc;
> +
>  	txn_limbo_term_lock(limbo);
> -	txn_limbo_process_locked(limbo, req);
> +	rc = txn_limbo_filter_locked(limbo, req);

13. We use relatively modern C. You can declare the variable where it
is first assigned: int rc = ...;

14. txn_limbo_process() can fail now, but its result is still ignored in
wal_stream_apply_synchro_row().
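
A small sketch of both points, using hypothetical simplified types
instead of the real limbo and WAL stream: rc is declared at first use,
the lock is released on both paths, and the caller does not drop the
result.

```c
#include <assert.h>

/* Hypothetical simplified limbo: a lock flag and a counter of applied requests. */
struct sketch_limbo {
	int locked;
	int processed;
};

/* Stub filter: accepts the request only when it is marked as valid. */
static int
filter_stub(int is_valid)
{
	return is_valid ? 0 : -1;
}

/* Model of txn_limbo_process() returning the filter result. */
int
limbo_process_sketch(struct sketch_limbo *limbo, int is_valid)
{
	limbo->locked = 1;
	int rc = filter_stub(is_valid); /* Declared where it is first assigned. */
	if (rc == 0)
		limbo->processed++;
	limbo->locked = 0; /* Unlock on success and on failure. */
	return rc;
}

/* Model of a caller such as a recovery row handler: the error is propagated. */
int
apply_synchro_row_sketch(struct sketch_limbo *limbo, int is_valid)
{
	if (limbo_process_sketch(limbo, is_valid) != 0)
		return -1;
	return 0;
}
```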

> +	if (rc == 0)
> +		txn_limbo_process_locked(limbo, req);
>  	txn_limbo_term_unlock(limbo);
> +
> +	return rc;
>  }

