[Tarantool-patches] [RFC v5 5/5] limbo: filter incoming requests
Serge Petrenko
sergepetrenko at tarantool.org
Thu Jul 15 14:59:02 MSK 2021
15.07.2021 00:23, Cyrill Gorcunov пишет:
> FIXME: This is incomplete PoC
>
> Closes #6036
>
> Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
> ---
> src/box/applier.cc | 3 +++
> src/box/txn_limbo.c | 52 ++++++++++++++++++++++++++++++++++++---------
> src/box/txn_limbo.h | 9 +++++++-
> 3 files changed, 53 insertions(+), 11 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 838aa372d..c3f3a154a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -871,6 +871,9 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
> goto err;
>
> txn_limbo_promote_lock(&txn_limbo);
> + if (txn_limbo_filter_locked(&txn_limbo, &req) != 0)
> + goto err_unlock;
> +
> struct replica_cb_data rcb_data;
> struct synchro_entry entry;
> /*
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index d24df3606..330ba57b2 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -731,6 +731,40 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
> return 0;
> }
>
> +int
> +txn_limbo_filter_locked(struct txn_limbo *limbo,
> + const struct synchro_request *req)
> +{
> + struct txn_limbo_promote *pmt = &limbo->promote;
> + uint32_t replica_id = req->origin_id;
> + uint64_t term = req->term;
> +
> + panic_on(!txn_limbo_promote_is_locked(limbo),
> + "limbo: unlocked filtering of a request");
> +
> + /*
> + * In case of split brain has happened the promote
> + * request may come in with already seen term.
> + */
> + uint64_t seen_term = txn_limbo_term_locked(limbo, replica_id);
You need to filter by "term_max". Any term smaller than "term_max"
is bad.
> + if (seen_term >= term) {
> + if (iproto_type_is_promote_request(req->type) &&
> + pmt->terms_max > 1) {
> + say_info("RAFT: rejecting %s obsolete request "
> + "from instance id %u term %llu. "
> + "Current max term %llu.",
> + iproto_type_name(req->type),
> + replica_id, (long long)term,
> + (long long)pmt->terms_max);
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Replication", "obsolete terms");
> + return -1;
> + }
> + }
> +
> + return 0;
> +}
> +
> void
> txn_limbo_process_locked(struct txn_limbo *limbo,
> const struct synchro_request *req)
> @@ -742,19 +776,14 @@ txn_limbo_process_locked(struct txn_limbo *limbo,
> panic_on(!txn_limbo_promote_is_locked(limbo),
> "limbo: unlocked processing of a request");
>
> + /*
> + * Update promote tracking since bad requests must
> + * be filtered out already.
> + */
> if (txn_limbo_term_locked(limbo, origin) < term) {
Filtering was done above, shouldn't this if(() always evaluate to true now?
> vclock_follow(&pmt->terms_map, origin, term);
> if (term > pmt->terms_max)
> pmt->terms_max = term;
> - } else if (iproto_type_is_promote_request(req->type) &&
> - pmt->terms_max > 1) {
> - /* PROMOTE for outdated term. Ignore. */
> - say_info("RAFT: ignoring %s request from instance "
> - "id %u for term %llu. Greatest term seen "
> - "before (%llu) is bigger.",
> - iproto_type_name(req->type), origin, (long long)term,
> - (long long)pmt->terms_max);
> - return;
> }
>
> int64_t lsn = req->lsn;
> @@ -800,12 +829,15 @@ txn_limbo_process_locked(struct txn_limbo *limbo,
> return;
> }
>
> -void
> +int
> txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
> {
> txn_limbo_promote_lock(limbo);
> + if (txn_limbo_filter_locked(limbo, req) != 0)
> + return -1;
> txn_limbo_process_locked(limbo, req);
> txn_limbo_promote_unlock(limbo);
> + return 0;
> }
>
> void
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index a2595bcff..bfdfef0e0 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -358,8 +358,15 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
> int
> txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>
> +/**
> + * Verify if the request is valid for processing.
> + */
> +int
> +txn_limbo_filter_locked(struct txn_limbo *limbo,
> + const struct synchro_request *req);
> +
> /** Execute a synchronous replication request. */
> -void
> +int
> txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
>
> void
--
Serge Petrenko
More information about the Tarantool-patches
mailing list