[Tarantool-patches] [PATCH v9 4/5] limbo: filter incoming synchro requests
Serge Petrenko
sergepetrenko at tarantool.org
Tue Aug 3 13:51:44 MSK 2021
30.07.2021 14:35, Cyrill Gorcunov пишет:
> When we receive synchro requests we can't just apply
> them blindly because in the worst case they may come from
> a split-brain configuration (where a cluster has split into
> several subclusters, each one has elected its own leader,
> and then the subclusters try to merge back into the original
> cluster). We need to do our best to detect such configs
> and force these nodes to rejoin from scratch for
> the sake of data consistency.
>
> Thus when we're processing requests we pass them to the
> packet filter first, which validates their contents and
> refuses to apply them if validation fails.
>
> Depending on the request type each packet traverses the
> appropriate chain(s)
>
> FILTER_IN
> - Common chain for any synchro packet. We verify
> that if replica_id is nil then it shall be
> PROMOTE request with lsn 0 to migrate limbo owner
>
> FILTER_CONFIRM
> FILTER_ROLLBACK
> - Both confirm and rollback requests shall not come
> with empty limbo since it means the synchro queue
> is already processed and the peer didn't notice
> that
>
> FILTER_PROMOTE
> - Promote request should come in with new terms only,
> otherwise it means the peer didn't notice election
>
> - If limbo's confirmed_lsn is equal to promote LSN then
> it is a valid request to process
>
> - If limbo's confirmed_lsn is bigger than requested then
> it is valid in one case only -- limbo migration so the
> queue shall be empty
>
> - If limbo's confirmed_lsn is less than promote LSN then
> - If queue is empty then it means the transactions are
> already rolled back and request is invalid
> - If queue is not empty then its first entry might be
> greater than promote LSN and it means that old data
> either committed or rolled back already and request
> is invalid
>
> FILTER_DEMOTE
> - NOP, reserved for future use
>
> Closes #6036
Thanks for the patch!
You're definitely moving in the right direction here.
Please, find a couple of comments below.
>
> Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
> ---
> src/box/applier.cc | 21 ++-
> src/box/box.cc | 11 +-
> src/box/memtx_engine.c | 3 +-
> src/box/txn_limbo.c | 304 ++++++++++++++++++++++++++++++++++++++---
> src/box/txn_limbo.h | 33 ++++-
> 5 files changed, 347 insertions(+), 25 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index a7f472714..fefaa4ced 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
...
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 5ca617e32..000887aa6 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
...
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index be5e0adf5..b01b5a572 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -51,6 +51,7 @@ txn_limbo_create(struct txn_limbo *limbo)
> limbo->confirmed_lsn = 0;
> limbo->rollback_count = 0;
> limbo->is_in_rollback = false;
> + limbo->is_filtering = true;
> }
>
> bool
> @@ -724,35 +725,291 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
> return 0;
> }
>
> +enum filter_chain {
> + FILTER_IN,
> + FILTER_CONFIRM,
> + FILTER_ROLLBACK,
> + FILTER_PROMOTE,
> + FILTER_DEMOTE,
> + FILTER_MAX,
> +};
> +
> +/**
> + * Common chain for any incoming packet.
> + */
> +static int
> +filter_in(struct txn_limbo *limbo, const struct synchro_request *req)
> +{
> + (void)limbo;
> +
> + if (req->replica_id == REPLICA_ID_NIL) {
> + /*
> + * The limbo was empty on the instance issuing
> + * the request. This means this instance must
> + * empty its limbo as well.
> + */
> + if (req->lsn != 0 ||
> + !iproto_type_is_promote_request(req->type)) {
> + say_info("RAFT: rejecting %s request from "
> + "instance id %u for term %llu. "
> + "req->replica_id = 0 but lsn %lld.",
> + iproto_type_name(req->type),
> + req->origin_id, (long long)req->term,
> + (long long)req->lsn);
> +
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Replication",
> + "empty replica_id with nonzero LSN");
> + return -1;
> + }
> + }
I agree with Vlad. This may be moved to filter_confirm_rollback.
> +
> + return 0;
> +}
> +
> +/**
> + * Filter CONFIRM and ROLLBACK packets.
> + */
> +static int
> +filter_confirm_rollback(struct txn_limbo *limbo,
> + const struct synchro_request *req)
> +{
> + /*
> + * When limbo is empty we have nothing to
> + * confirm/commit and if this request comes
> + * in it means the split brain has happened.
> + */
> + if (!txn_limbo_is_empty(limbo))
> + return 0;
> +
> + say_info("RAFT: rejecting %s request from "
> + "instance id %u for term %llu. "
> + "Empty limbo detected.",
> + iproto_type_name(req->type),
> + req->origin_id,
> + (long long)req->term);
> +
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Replication",
> + "confirm/rollback with empty limbo");
> + return -1;
> +}
> +
> +/**
> + * Filter PROMOTE packets.
> + */
> +static int
> +filter_promote(struct txn_limbo *limbo, const struct synchro_request *req)
> +{
> + int64_t promote_lsn = req->lsn;
> +
> + /*
> + * If the term is already seen it means it comes
> + * from a node which didn't notice new elections,
> + * thus been living in subdomain and its data is
> + * no longer consistent.
> + */
> + if (limbo->promote_term_max > 1 &&
> + limbo->promote_term_max > req->term) {
> + say_info("RAFT: rejecting %s request from "
> + "instance id %u for term %llu. "
> + "Max term seen is %llu.",
> + iproto_type_name(req->type),
> + req->origin_id,
> + (long long)req->term,
> + (long long)limbo->promote_term_max);
> +
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Replication", "obsolete terms");
> + return -1;
> + }
> +
> + /*
> + * Either the limbo is empty or new promote will
> + * rollback all waiting transactions. Which
> + * is fine.
> + */
> + if (limbo->confirmed_lsn == promote_lsn)
> + return 0;
> +
> + /*
> + * Explicit split brain situation. Promote
> + * comes in with an old LSN which we've already
> + * processed.
> + */
> + if (limbo->confirmed_lsn > promote_lsn) {
> + /*
> + * If limbo is empty we're migrating
> + * the owner.
> + */
> + if (txn_limbo_is_empty(limbo))
> + return 0;
I don't understand this part. Are you sure this check is needed?
We're always migrating the owner with a promote.
> +
> + say_info("RAFT: rejecting %s request from "
> + "instance id %u for term %llu. "
> + "confirmed_lsn %lld > promote_lsn %lld.",
> + iproto_type_name(req->type),
> + req->origin_id, (long long)req->term,
> + (long long)limbo->confirmed_lsn,
> + (long long)promote_lsn);
> +
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Replication",
> + "backward promote LSN (split brain)");
> + return -1;
> + }
> +
> + /*
> + * The last case requires a few subcases.
> + */
> + assert(limbo->confirmed_lsn < promote_lsn);
> +
> + if (txn_limbo_is_empty(limbo)) {
> + /*
> + * Transactions are already rolled back
> + * since the limbo is empty.
> + */
> + say_info("RAFT: rejecting %s request from "
> + "instance id %u for term %llu. "
> + "confirmed_lsn %lld < promote_lsn %lld "
> + "and empty limbo.",
> + iproto_type_name(req->type),
> + req->origin_id, (long long)req->term,
> + (long long)limbo->confirmed_lsn,
> + (long long)promote_lsn);
> +
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Replication",
> + "forward promote LSN "
> + "(empty limbo, split brain)");
> + return -1;
I think it'd be better to have a separate error code for this purpose.
Say, ER_SPLITBRAIN or something.
Then applier would have more control over what to do when such an error
is raised. Say, never reconnect. (It doesn't reconnect on ER_UNSUPPORTED,
I believe, but a distinct error is still better).
> + } else {
> + /*
> + * Some entries are present in the limbo,
> + * and if first entry's LSN is greater than
> + * requested then old data either commited
> + * or rolled back, so can't continue.
> + */
> + struct txn_limbo_entry *first;
> +
> + first = txn_limbo_first_entry(limbo);
> + if (first->lsn > promote_lsn) {
This should only happen when confirmed_lsn > promote_lsn, shouldn't it?
If yes, then you've already handled it above.
> + say_info("RAFT: rejecting %s request from "
> + "instance id %u for term %llu. "
> + "confirmed_lsn %lld < promote_lsn %lld "
> + "and limbo first lsn %lld.",
> + iproto_type_name(req->type),
> + req->origin_id, (long long)req->term,
> + (long long)limbo->confirmed_lsn,
> + (long long)promote_lsn,
> + (long long)first->lsn);
> +
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Replication",
> + "promote LSN confilict "
> + "(limbo LSN ahead, split brain)");
> + return -1;
> + }
> + }
> +
> + return 0;
> +}
> +
> +/**
> + * Filter DEMOTE packets.
> + */
> +static int
> +filter_demote(struct txn_limbo *limbo, const struct synchro_request *req)
> +{
> + (void)limbo;
> + (void)req;
> + return 0;
> +}
> +
> +static int (*filter_req[FILTER_MAX])
> +(struct txn_limbo *limbo, const struct synchro_request *req) = {
> + [FILTER_IN] = filter_in,
> + [FILTER_CONFIRM] = filter_confirm_rollback,
> + [FILTER_ROLLBACK] = filter_confirm_rollback,
> + [FILTER_PROMOTE] = filter_promote,
> + [FILTER_DEMOTE] = filter_demote,
Demote should be filtered the same way as promote: they are
basically the same request with the same meaning.
Demote is just a promote for replica id 0, because we couldn't
do a promote for replica id 0.
> +};
> +
> +int
> +txn_limbo_filter_locked(struct txn_limbo *limbo,
> + const struct synchro_request *req)
> +{
> + unsigned int mask = (1u << FILTER_IN);
> + unsigned int pos = 0;
> +
> +#ifndef NDEBUG
> + say_info("limbo: filter %s replica_id %u origin_id %u "
> + "term %lld lsn %lld, queue owner_id %u len %lld "
> + "confirmed_lsn %lld (%s)",
> + iproto_type_name(req->type),
> + req->replica_id, req->origin_id,
> + (long long)req->term, (long long)req->lsn,
> + limbo->owner_id, (long long)limbo->len,
> + (long long)limbo->confirmed_lsn,
> + limbo->is_filtering ? "on" : "off");
> +#endif
> +
> + if (!limbo->is_filtering)
> + return 0;
> +
> + switch (req->type) {
> + case IPROTO_CONFIRM:
> + mask |= (1u << FILTER_CONFIRM);
> + break;
> + case IPROTO_ROLLBACK:
> + mask |= (1u << FILTER_ROLLBACK);
> + break;
> + case IPROTO_PROMOTE:
> + mask |= (1u << FILTER_PROMOTE);
> + break;
> + case IPROTO_DEMOTE:
> + mask |= (1u << FILTER_DEMOTE);
> + break;
> + default:
> + say_info("RAFT: rejecting unexpected %d "
> + "request from instance id %u "
> + "for term %llu.",
> + req->type, req->origin_id,
> + (long long)req->term);
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Replication",
> + "unexpected request type");
> + return -1;
> + }
> +
> + while (mask != 0) {
> + if ((mask & 1) != 0) {
> + assert(pos < lengthof(filter_req));
> + if (filter_req[pos](limbo, req) != 0)
> + return -1;
> + }
> + pos++;
> + mask >>= 1;
> + };
> +
> + return 0;
> +}
> +
> void
> txn_limbo_process_locked(struct txn_limbo *limbo,
> const struct synchro_request *req)
> {
> uint64_t term = req->term;
> uint32_t origin = req->origin_id;
> +
> if (txn_limbo_term_locked(limbo, origin) < term) {
> vclock_follow(&limbo->promote_term_map, origin, term);
> if (term > limbo->promote_term_max)
> limbo->promote_term_max = term;
> - } else if (iproto_type_is_promote_request(req->type) &&
> - limbo->promote_term_max > 1) {
> - /* PROMOTE for outdated term. Ignore. */
> - say_info("RAFT: ignoring %s request from instance "
> - "id %u for term %llu. Greatest term seen "
> - "before (%llu) is bigger.",
> - iproto_type_name(req->type), origin, (long long)term,
> - (long long)limbo->promote_term_max);
> - return;
> }
>
> int64_t lsn = req->lsn;
> - if (req->replica_id == REPLICA_ID_NIL) {
> - /*
> - * The limbo was empty on the instance issuing the request.
> - * This means this instance must empty its limbo as well.
> - */
> - assert(lsn == 0 && iproto_type_is_promote_request(req->type));
> - } else if (req->replica_id != limbo->owner_id) {
> + if (req->replica_id != limbo->owner_id) {
> /*
> * Ignore CONFIRM/ROLLBACK messages for a foreign master.
> * These are most likely outdated messages for already confirmed
We should error when req->replica_id != limbo->owner_id.
For every entry type: promote/demote/confirm/rollback.
req->replica_id != limbo->owner_id means that the remote instance has
taken some actions in the past (say, confirmed something) of which we
didn't know until now. This is basically a splitbrain again.
> @@ -783,18 +1040,25 @@ txn_limbo_process_locked(struct txn_limbo *limbo,
> txn_limbo_read_demote(limbo, lsn);
> break;
> default:
> - unreachable();
> + panic("limbo: unexpected request type %d",
> + req->type);
> + break;
> }
> - return;
> }
>
> -void
> +int
> txn_limbo_process(struct txn_limbo *limbo,
> const struct synchro_request *req)
> {
> + int rc;
> +
> txn_limbo_term_lock(limbo);
> - txn_limbo_process_locked(limbo, req);
> + rc = txn_limbo_filter_locked(limbo, req);
> + if (rc == 0)
> + txn_limbo_process_locked(limbo, req);
> txn_limbo_term_unlock(limbo);
> +
> + return rc;
> }
>
> void
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 25faffd2b..eb74dda00 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -184,6 +184,14 @@ struct txn_limbo {
> * by the 'reversed rollback order' rule - contradiction.
> */
> bool is_in_rollback;
> + /**
> + * Whether the limbo should filter incoming requests.
> + * The phases of local recovery from WAL file and on applier's
> + * join phase we are in complete trust of incoming data because
> + * this data forms an initial limbo state and should not
> + * filter out requests.
> + */
> + bool is_filtering;
> };
>
> /**
> @@ -355,15 +363,38 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
> int
> txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>
> +/**
> + * Verify if the request is valid for processing.
> + */
> +int
> +txn_limbo_filter_locked(struct txn_limbo *limbo,
> + const struct synchro_request *req);
> +
> /** Execute a synchronous replication request. */
> void
> txn_limbo_process_locked(struct txn_limbo *limbo,
> const struct synchro_request *req);
>
> /** Lock limbo terms and execute a synchronous replication request. */
> -void
> +int
> txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
>
> +/** Enable filtering of synchro requests. */
> +static inline void
> +txn_limbo_filter_enable(struct txn_limbo *limbo)
> +{
> + limbo->is_filtering = true;
> + say_info("limbo: filter enabled");
> +}
> +
> +/** Disable filtering of synchro requests. */
> +static inline void
> +txn_limbo_filter_disable(struct txn_limbo *limbo)
> +{
> + limbo->is_filtering = false;
> + say_info("limbo: filter disabled");
> +}
> +
> /**
> * Waiting for confirmation of all "sync" transactions
> * during confirm timeout or fail.
--
Serge Petrenko
More information about the Tarantool-patches
mailing list