From mboxrd@z Thu Jan 1 00:00:00 1970
To: tml
Date: Wed, 4 Aug 2021 22:07:51 +0300
Message-Id: <20210804190752.488147-4-gorcunov@gmail.com>
X-Mailer: git-send-email 2.31.1
In-Reply-To: <20210804190752.488147-1-gorcunov@gmail.com>
References: <20210804190752.488147-1-gorcunov@gmail.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH v10 3/4] limbo: filter incoming synchro requests
List-Id: Tarantool development patches
From: Cyrill Gorcunov via Tarantool-patches
Reply-To: Cyrill Gorcunov
Cc: Vladislav Shpilevoy

When we receive synchro requests we can't just apply them blindly,
because in the worst case they may come from a split-brain
configuration (where a cluster has split into several clusters, each
one has elected its own leader, and then the clusters try to merge
back into the original one). We need to do our best to detect such
disunity and force these nodes to rejoin from scratch for the sake of
data consistency.

Thus when we're processing requests we pass them to the packet filter
first, which validates their contents and refuses to apply them if
they do not match.

Depending on the request type, each packet traverses the appropriate
chain(s)

                                            +---> FILTER_PROMOTE
                                            |
                                            +---> FILTER_CONFIRM
reader -> FILTER_IN -> request:{type} ->    |
            |                               +---> FILTER_ROLLBACK
            | error                         |
            V                               +---> FILTER_DEMOTE
          exit

FILTER_IN
  Common chain for any synchro packet.
  1) Zero LSN is allowed for PROMOTE | DEMOTE packets only, since
     CONFIRM | ROLLBACK have to process some real data with an LSN
     already assigned.
  2) request:replica_id = 0 is allowed for PROMOTE requests only.
  3) request:replica_id should match limbo:owner_id, iow the limbo
     migration should be noticed by all instances in the cluster.

FILTER_CONFIRM
FILTER_ROLLBACK
  1) Neither confirm nor rollback requests can be considered if the
     limbo is already empty, ie there is no data in the local queue
     and everything is processed already. Such a request comes from
     a node which is out of date.

FILTER_PROMOTE
FILTER_DEMOTE
  1) The requests should come in with a nonzero term, otherwise the
     packet is corrupted.
  2) The request's term should not be less than the maximal known
     one, iow it should not come in from nodes which didn't notice
     raft epoch changes and are living in the past.
  3) If the LSN of the request matches the current confirmed LSN,
     the packet is correct to process.
  4) If the LSN is less than the confirmed LSN then the request is
     wrong, we have processed the requested LSN already.
  5) If the LSN is greater than the confirmed LSN then
     a) If the limbo is empty we can't do anything, since the data
        is already processed, and should issue an error;
     b) If there is some data in the limbo then the requested LSN
        should be in the range of the limbo's [first; last] LSNs,
        thus the request will be able to commit and rollback the
        limbo queue.
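
The PROMOTE/DEMOTE rules above can be modelled in isolation. What follows is a minimal sketch in C, not the code from this patch: `struct limbo_model`, `enum verdict` and `check_promote_demote()` are hypothetical names introduced only for illustration, and the limbo is reduced to the few fields the rules mention.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for the limbo state the rules read. */
struct limbo_model {
	int64_t confirmed_lsn;  /* last confirmed LSN */
	bool is_empty;          /* true when nothing is queued */
	int64_t first_lsn;      /* LSN of the oldest queued entry */
	int64_t last_lsn;       /* LSN of the newest queued entry */
	uint64_t greatest_term; /* maximal term seen so far */
};

/* Hypothetical verdicts, one per rejection rule. */
enum verdict {
	VERDICT_OK,
	VERDICT_ZERO_TERM,
	VERDICT_OBSOLETE_TERM,
	VERDICT_BACKWARD_LSN,
	VERDICT_FORWARD_LSN_EMPTY,
	VERDICT_LSN_OUT_OF_RANGE,
};

/* Apply rules 1-5 of the PROMOTE/DEMOTE chain in order. */
static enum verdict
check_promote_demote(const struct limbo_model *l, uint64_t term, int64_t lsn)
{
	/* Rule 1: a zero term means a corrupted packet. */
	if (term == 0)
		return VERDICT_ZERO_TERM;
	/* Rule 2: the sender missed newer elections. */
	if (term < l->greatest_term)
		return VERDICT_OBSOLETE_TERM;
	/* Rule 3: the LSN matches what is already confirmed. */
	if (lsn == l->confirmed_lsn)
		return VERDICT_OK;
	/* Rule 4: the LSN has been processed already. */
	if (lsn < l->confirmed_lsn)
		return VERDICT_BACKWARD_LSN;
	/* Rule 5a: LSN is ahead, but there is nothing queued. */
	if (l->is_empty)
		return VERDICT_FORWARD_LSN_EMPTY;
	/* Rule 5b: LSN must fall inside [first; last]. */
	assert(l->first_lsn <= l->last_lsn);
	if (lsn < l->first_lsn || lsn > l->last_lsn)
		return VERDICT_LSN_OUT_OF_RANGE;
	return VERDICT_OK;
}
```

For instance, with confirmed_lsn = 10 and queued entries covering LSNs 11..15, a request whose term is not below the greatest known one is accepted for LSN 13, rejected as backward for LSN 9, and rejected as out of range for LSN 16.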

Closes #6036

Signed-off-by: Cyrill Gorcunov
---
 src/box/applier.cc     |  23 ++-
 src/box/box.cc         |  11 +-
 src/box/memtx_engine.c |   3 +-
 src/box/txn_limbo.c    | 354 ++++++++++++++++++++++++++++++++++++-----
 src/box/txn_limbo.h    |  33 +++-
 5 files changed, 378 insertions(+), 46 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9db286ae2..f64b6fa35 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -458,7 +458,8 @@ applier_wait_snapshot(struct applier *applier)
 			struct synchro_request req;
 			if (xrow_decode_synchro(&row, &req) != 0)
 				diag_raise();
-			txn_limbo_process(&txn_limbo, &req);
+			if (txn_limbo_process(&txn_limbo, &req) != 0)
+				diag_raise();
 		} else if (iproto_type_is_raft_request(row.type)) {
 			struct raft_request req;
 			if (xrow_decode_raft(&row, &req, NULL) != 0)
@@ -514,6 +515,11 @@ applier_fetch_snapshot(struct applier *applier)
 	struct ev_io *coio = &applier->io;
 	struct xrow_header row;
 
+	txn_limbo_filter_disable(&txn_limbo);
+	auto filter_guard = make_scoped_guard([&]{
+		txn_limbo_filter_enable(&txn_limbo);
+	});
+
 	memset(&row, 0, sizeof(row));
 	row.type = IPROTO_FETCH_SNAPSHOT;
 	coio_write_xrow(coio, &row);
@@ -587,6 +593,11 @@ applier_register(struct applier *applier, bool was_anon)
 	struct ev_io *coio = &applier->io;
 	struct xrow_header row;
 
+	txn_limbo_filter_disable(&txn_limbo);
+	auto filter_guard = make_scoped_guard([&]{
+		txn_limbo_filter_enable(&txn_limbo);
+	});
+
 	memset(&row, 0, sizeof(row));
 	/*
 	 * Send this instance's current vclock together
@@ -620,6 +631,11 @@ applier_join(struct applier *applier)
 	struct xrow_header row;
 	uint64_t row_count;
 
+	txn_limbo_filter_disable(&txn_limbo);
+	auto filter_guard = make_scoped_guard([&]{
+		txn_limbo_filter_enable(&txn_limbo);
+	});
+
 	xrow_encode_join_xc(&row, &INSTANCE_UUID);
 	coio_write_xrow(coio, &row);
 
@@ -874,6 +890,11 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
 		goto err;
 
 	txn_limbo_term_lock(&txn_limbo);
+	if (txn_limbo_filter_locked(&txn_limbo, &req) != 0) {
+		txn_limbo_term_unlock(&txn_limbo);
+		goto err;
+	}
+
 	struct replica_cb_data rcb_data;
 	struct synchro_entry entry;
 	/*
diff --git a/src/box/box.cc b/src/box/box.cc
index 8dc3b130b..c3516b7a4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1675,7 +1675,8 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
 		.lsn = promote_lsn,
 		.term = raft->term,
 	};
-	txn_limbo_process(&txn_limbo, &req);
+	if (txn_limbo_process(&txn_limbo, &req) != 0)
+		diag_raise();
 	assert(txn_limbo_is_empty(&txn_limbo));
 }
 
@@ -1697,7 +1698,8 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
 		.lsn = promote_lsn,
 		.term = box_raft()->term,
 	};
-	txn_limbo_process(&txn_limbo, &req);
+	if (txn_limbo_process(&txn_limbo, &req) != 0)
+		diag_raise();
 	assert(txn_limbo_is_empty(&txn_limbo));
 }
 
@@ -3284,6 +3286,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
 
+	txn_limbo_filter_disable(&txn_limbo);
+	auto filter_guard = make_scoped_guard([&]{
+		txn_limbo_filter_enable(&txn_limbo);
+	});
+
 	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
 	auto stream_guard = make_scoped_guard([&]{
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 0b06e5e63..4aed24fe3 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -238,7 +238,8 @@ memtx_engine_recover_synchro(const struct xrow_header *row)
 	 * because all its rows have a zero replica_id.
 	 */
 	req.origin_id = req.replica_id;
-	txn_limbo_process(&txn_limbo, &req);
+	if (txn_limbo_process(&txn_limbo, &req) != 0)
+		return -1;
 	return 0;
 }
 
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a718c55a2..59fb51fac 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -51,6 +51,7 @@ txn_limbo_create(struct txn_limbo *limbo)
 	limbo->confirmed_lsn = 0;
 	limbo->rollback_count = 0;
 	limbo->is_in_rollback = false;
+	limbo->is_filtering = true;
 }
 
 bool
@@ -724,6 +725,302 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
 	return 0;
 }
 
+enum filter_chain {
+	FILTER_IN,
+	FILTER_CONFIRM,
+	FILTER_ROLLBACK,
+	FILTER_PROMOTE,
+	FILTER_DEMOTE,
+	FILTER_MAX,
+};
+
+/**
+ * Fill the reject reason with request data.
+ * The function is not reentrant, use with caution.
+ */
+static char *
+reject_str(const struct synchro_request *req)
+{
+	static char prefix[128];
+
+	snprintf(prefix, sizeof(prefix), "RAFT: rejecting %s (%d) "
+		 "request from origin_id %u replica_id %u term %llu",
+		 iproto_type_name(req->type), req->type,
+		 req->origin_id, req->replica_id,
+		 (long long)req->term);
+
+	return prefix;
+}
+
+/**
+ * Common chain for any incoming packet.
+ */
+static int
+filter_in(struct txn_limbo *limbo, const struct synchro_request *req)
+{
+	(void)limbo;
+
+	/*
+	 * Zero LSN is allowed for PROMOTE
+	 * and DEMOTE requests only.
+	 */
+	if (req->lsn == 0) {
+		if (!iproto_type_is_promote_request(req->type)) {
+			say_info("%s. Zero lsn detected",
+				 reject_str(req));
+
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Replication",
+				 "zero LSN on promote/demote");
+			return -1;
+		}
+	}
+
+	/*
+	 * Zero @a replica_id is allowed for PROMOTE packets only.
+	 */
+	if (req->replica_id == REPLICA_ID_NIL) {
+		if (req->type != IPROTO_PROMOTE) {
+			say_info("%s. Zero replica_id detected",
+				 reject_str(req));
+
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Replication",
+				 "zero replica_id");
+			return -1;
+
+		}
+	}
+
+	/*
+	 * Incoming packets should respect the limbo owner,
+	 * if it doesn't match it means the sender
+	 * missed limbo owner migrations and is out of date.
+	 */
+	if (req->replica_id != limbo->owner_id) {
+		say_info("%s. Limbo owner mismatch, owner_id %u",
+			 reject_str(req), limbo->owner_id);
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication",
+			 "sync queue silent owner migration");
+		return -1;
+	}
+
+	return 0;
+}
+
+/**
+ * Filter CONFIRM and ROLLBACK packets.
+ */
+static int
+filter_confirm_rollback(struct txn_limbo *limbo,
+			const struct synchro_request *req)
+{
+	/*
+	 * When limbo is empty we have nothing to
+	 * confirm/commit and if this request comes
+	 * in it means the split brain has happened.
+	 */
+	if (!txn_limbo_is_empty(limbo))
+		return 0;
+
+	say_info("%s. Empty limbo detected", reject_str(req));
+
+	diag_set(ClientError, ER_UNSUPPORTED,
+		 "Replication",
+		 "confirm/rollback with empty limbo");
+	return -1;
+}
+
+/**
+ * Filter PROMOTE and DEMOTE packets.
+ */
+static int
+filter_promote_demote(struct txn_limbo *limbo,
+		      const struct synchro_request *req)
+{
+	int64_t promote_lsn = req->lsn;
+
+	/*
+	 * PROMOTE and DEMOTE packets must not have zero
+	 * term supplied, otherwise it is a broken packet.
+	 */
+	if (req->term == 0) {
+		say_info("%s. Zero term detected", reject_str(req));
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication", "zero term");
+		return -1;
+	}
+
+	/*
+	 * If the term is already seen it means it comes
+	 * from a node which didn't notice new elections,
+	 * thus it has been living in a subdomain and its
+	 * data is no longer consistent.
+	 */
+	if (limbo->promote_greatest_term > req->term) {
+		say_info("%s. Max term seen is %llu", reject_str(req),
+			 (long long)limbo->promote_greatest_term);
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication", "obsolete terms");
+		return -1;
+	}
+
+	/*
+	 * Easy case -- processed LSN matches the new
+	 * one which comes inside request, everything
+	 * is consistent.
+	 */
+	if (limbo->confirmed_lsn == promote_lsn)
+		return 0;
+
+	/*
+	 * Explicit split brain situation. Promote
+	 * comes in with an old LSN which we've already
+	 * processed.
+	 */
+	if (limbo->confirmed_lsn > promote_lsn) {
+		say_info("%s. confirmed_lsn %lld > promote_lsn %lld",
+			 reject_str(req),
+			 (long long)limbo->confirmed_lsn,
+			 (long long)promote_lsn);
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication",
+			 "backward promote LSN (split brain)");
+		return -1;
+	}
+
+	/*
+	 * The last case requires a few subcases.
+	 */
+	assert(limbo->confirmed_lsn < promote_lsn);
+
+	if (txn_limbo_is_empty(limbo)) {
+		/*
+		 * Transactions are rolled back already,
+		 * since the limbo is empty.
+		 */
+		say_info("%s. confirmed_lsn %lld < promote_lsn %lld "
+			 "and empty limbo", reject_str(req),
+			 (long long)limbo->confirmed_lsn,
+			 (long long)promote_lsn);
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication",
+			 "forward promote LSN "
+			 "(empty limbo, split brain)");
+		return -1;
+	} else {
+		/*
+		 * Some entries are present in the limbo,
+		 * we need to make sure the @a promote_lsn
+		 * lies inside limbo [first; last] range.
+		 * So that the promote request has some
+		 * queued data to process, otherwise it
+		 * means the request comes from split
+		 * brained node.
+		 */
+		struct txn_limbo_entry *first, *last;
+
+		first = txn_limbo_first_entry(limbo);
+		last = txn_limbo_last_entry(limbo);
+
+		if (promote_lsn < first->lsn ||
+		    promote_lsn > last->lsn) {
+			say_info("%s. promote_lsn %lld out of "
+				 "range [%lld; %lld]",
+				 reject_str(req),
+				 (long long)promote_lsn,
+				 (long long)first->lsn,
+				 (long long)last->lsn);
+
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Replication",
+				 "promote LSN out of queue range "
+				 "(split brain)");
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+static int (*filter_req[FILTER_MAX])
+(struct txn_limbo *limbo, const struct synchro_request *req) = {
+	[FILTER_IN]		= filter_in,
+	[FILTER_CONFIRM]	= filter_confirm_rollback,
+	[FILTER_ROLLBACK]	= filter_confirm_rollback,
+	[FILTER_PROMOTE]	= filter_promote_demote,
+	[FILTER_DEMOTE]		= filter_promote_demote,
+};
+
+int
+txn_limbo_filter_locked(struct txn_limbo *limbo,
+			const struct synchro_request *req)
+{
+	unsigned int mask = (1u << FILTER_IN);
+	unsigned int pos = 0;
+
+	assert(latch_is_locked(&limbo->promote_latch));
+
+#ifndef NDEBUG
+	say_info("limbo: filter %s replica_id %u origin_id %u "
+		 "term %lld lsn %lld, queue owner_id %u len %lld "
+		 "confirmed_lsn %lld (%s)",
+		 iproto_type_name(req->type),
+		 req->replica_id, req->origin_id,
+		 (long long)req->term, (long long)req->lsn,
+		 limbo->owner_id, (long long)limbo->len,
+		 (long long)limbo->confirmed_lsn,
+		 limbo->is_filtering ? "on" : "off");
+#endif
+
+	if (!limbo->is_filtering)
+		return 0;
+
+	switch (req->type) {
+	case IPROTO_CONFIRM:
+		mask |= (1u << FILTER_CONFIRM);
+		break;
+	case IPROTO_ROLLBACK:
+		mask |= (1u << FILTER_ROLLBACK);
+		break;
+	case IPROTO_PROMOTE:
+		mask |= (1u << FILTER_PROMOTE);
+		break;
+	case IPROTO_DEMOTE:
+		mask |= (1u << FILTER_DEMOTE);
+		break;
+	default:
+		say_info("RAFT: rejecting unexpected %d "
+			 "request from instance id %u "
+			 "for term %llu.",
+			 req->type, req->origin_id,
+			 (long long)req->term);
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication",
+			 "unexpected request type");
+		return -1;
+	}
+
+	while (mask != 0) {
+		if ((mask & 1) != 0) {
+			assert(pos < lengthof(filter_req));
+			assert(filter_req[pos] != NULL);
+			if (filter_req[pos](limbo, req) != 0)
+				return -1;
+		}
+		pos++;
+		mask >>= 1;
+	}
+
+	return 0;
+}
+
 void
 txn_limbo_process_locked(struct txn_limbo *limbo,
 			 const struct synchro_request *req)
@@ -732,71 +1029,46 @@ txn_limbo_process_locked(struct txn_limbo *limbo,
 
 	uint64_t term = req->term;
 	uint32_t origin = req->origin_id;
+
 	if (txn_limbo_replica_term_locked(limbo, origin) < term) {
 		vclock_follow(&limbo->promote_term_map, origin, term);
 		if (term > limbo->promote_greatest_term)
 			limbo->promote_greatest_term = term;
-	} else if (iproto_type_is_promote_request(req->type) &&
-		   limbo->promote_greatest_term > 1) {
-		/* PROMOTE for outdated term. Ignore. */
-		say_info("RAFT: ignoring %s request from instance "
-			 "id %u for term %llu. Greatest term seen "
-			 "before (%llu) is bigger.",
-			 iproto_type_name(req->type), origin, (long long)term,
-			 (long long)limbo->promote_greatest_term);
-		return;
 	}
 
-	int64_t lsn = req->lsn;
-	if (req->replica_id == REPLICA_ID_NIL) {
-		/*
-		 * The limbo was empty on the instance issuing the request.
-		 * This means this instance must empty its limbo as well.
-		 */
-		assert(lsn == 0 && iproto_type_is_promote_request(req->type));
-	} else if (req->replica_id != limbo->owner_id) {
-		/*
-		 * Ignore CONFIRM/ROLLBACK messages for a foreign master.
-		 * These are most likely outdated messages for already confirmed
-		 * data from an old leader, who has just started and written
-		 * confirm right on synchronous transaction recovery.
-		 */
-		if (!iproto_type_is_promote_request(req->type))
-			return;
-		/*
-		 * Promote has a bigger term, and tries to steal the limbo. It
-		 * means it probably was elected with a quorum, and it makes no
-		 * sense to wait here for confirmations. The other nodes already
-		 * elected a new leader. Rollback all the local txns.
-		 */
-		lsn = 0;
-	}
 	switch (req->type) {
 	case IPROTO_CONFIRM:
-		txn_limbo_read_confirm(limbo, lsn);
+		txn_limbo_read_confirm(limbo, req->lsn);
 		break;
 	case IPROTO_ROLLBACK:
-		txn_limbo_read_rollback(limbo, lsn);
+		txn_limbo_read_rollback(limbo, req->lsn);
 		break;
 	case IPROTO_PROMOTE:
-		txn_limbo_read_promote(limbo, req->origin_id, lsn);
+		txn_limbo_read_promote(limbo, req->origin_id, req->lsn);
 		break;
 	case IPROTO_DEMOTE:
-		txn_limbo_read_demote(limbo, lsn);
+		txn_limbo_read_demote(limbo, req->lsn);
 		break;
 	default:
-		unreachable();
+		panic("limbo: unexpected request type %d",
+		      req->type);
+		break;
 	}
-	return;
 }
 
-void
+int
 txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
 {
+	int rc;
+
 	txn_limbo_term_lock(limbo);
-	txn_limbo_process_locked(limbo, req);
+	rc = txn_limbo_filter_locked(limbo, req);
+	if (rc == 0)
+		txn_limbo_process_locked(limbo, req);
 	txn_limbo_term_unlock(limbo);
+
+	return rc;
 }
 
 void
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index c77c501e9..de33037f5 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -184,6 +184,14 @@ struct txn_limbo {
 	 * by the 'reversed rollback order' rule - contradiction.
 	 */
 	bool is_in_rollback;
+	/**
+	 * Whether the limbo should filter incoming requests.
+	 * During local recovery from the WAL file and during the
+	 * applier's join phase we completely trust the incoming data,
+	 * because this data forms the initial limbo state, so the
+	 * requests should not be filtered out.
+	 */
+	bool is_filtering;
 };
 
 /**
@@ -333,15 +341,38 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
+/**
+ * Verify if the request is valid for processing.
+ */
+int
+txn_limbo_filter_locked(struct txn_limbo *limbo,
+			const struct synchro_request *req);
+
 /** Execute a synchronous replication request. */
 void
 txn_limbo_process_locked(struct txn_limbo *limbo,
 			 const struct synchro_request *req);
 
 /** Lock limbo terms and execute a synchronous replication request. */
-void
+int
 txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
 
+/** Enable filtering of synchro requests. */
+static inline void
+txn_limbo_filter_enable(struct txn_limbo *limbo)
+{
+	limbo->is_filtering = true;
+	say_info("limbo: filter enabled");
+}
+
+/** Disable filtering of synchro requests. */
+static inline void
+txn_limbo_filter_disable(struct txn_limbo *limbo)
+{
+	limbo->is_filtering = false;
+	say_info("limbo: filter disabled");
+}
+
 /**
  * Waiting for confirmation of all "sync" transactions
  * during confirm timeout or fail.
-- 
2.31.1