From mboxrd@z Thu Jan  1 00:00:00 1970
From: Cyrill Gorcunov via Tarantool-patches
Reply-To: Cyrill Gorcunov
To: tml
Cc: Vladislav Shpilevoy
Subject: [Tarantool-patches] [PATCH v14 5/6] qsync: filter incoming synchro requests
Date: Fri, 10 Sep 2021 18:29:09 +0300
Message-Id: <20210910152910.607398-6-gorcunov@gmail.com>
In-Reply-To: <20210910152910.607398-1-gorcunov@gmail.com>
References: <20210910152910.607398-1-gorcunov@gmail.com>
List-Id: Tarantool development patches
X-Mailer: git-send-email 2.31.1

When we receive synchro requests we can't just apply them blindly,
because in the worst case they may come from a split-brain
configuration (where a cluster has split into several clusters, each
of them has elected its own leader, and then the clusters try to merge
back into the original one). We need to do our best to detect such
disunity and force these nodes to rejoin from scratch for the sake of
data consistency.

Thus when we're processing requests we pass them to a packet filter
first, which validates their contents and refuses to apply them if
they don't pass the checks. The filter logic depends on the request
type.

First there is a common chain for any synchro packet, a kind of
general pre-validation:

 1) Zero LSN is allowed for PROMOTE | DEMOTE packets only, since
    CONFIRM | ROLLBACK have to refer to some real data with an LSN
    already assigned.
 2) request:replica_id = 0 is allowed for PROMOTE requests only.
 3) request:replica_id should match limbo:owner_id, in other words the
    limbo migration should have been noticed by all instances in the
    cluster.

For CONFIRM and ROLLBACK packets:

 1) Neither of them can be considered if the limbo is already empty,
    i.e. there is no data in the local queue and everything is
    processed already. Such a request is obviously from a node which
    is out of date.

For PROMOTE and DEMOTE packets:

 1) The request should come in with a nonzero term, otherwise the
    packet is corrupted.
 2) The request's term should not be less than the maximal term seen,
    in other words it should not come from a node which didn't notice
    the raft epoch changes and is living in the past.
 3) If the LSN of the request matches the current confirmed LSN, the
    packet is obviously correct to process.
 4) If the LSN is less than the confirmed LSN, the request is wrong:
    we have processed the requested LSN already.
 5) If the LSN is greater than the confirmed LSN, then:
    a) If the limbo is empty we can't do anything, since the data is
       already processed, and we should issue an error;
    b) If there is some data in the limbo, the requested LSN should be
       in the range of the limbo's [first; last] LSNs, so that the
       request is able to commit and roll back the limbo queue.

Because a snapshot carries a PROMOTE packet, we disable filtering at
the moment of joining a leader node, and similarly during local
recovery. The thing is that our filtration procedure implies that the
limbo is already initialized to some valid state, otherwise we would
have to distinguish initial states from working ones; this can be done
actually, but it would make the code more complex. Thus for now let's
keep the ability to switch filtration on and off.

Closes #6036
Signed-off-by: Cyrill Gorcunov
---
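Note (not part of the patch): a standalone sketch of the common
pre-validation chain plus the CONFIRM/ROLLBACK rule described above.
The names req, prevalidate, limbo_owner_id and limbo_is_empty are
simplified stand-ins introduced only for illustration; the
authoritative code is filter_in() and filter_confirm_rollback() in
src/box/txn_limbo.c below.

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum req_type { PROMOTE, DEMOTE, CONFIRM, ROLLBACK };

struct req {
	enum req_type type;
	uint32_t replica_id;
	int64_t lsn;
};

static bool
prevalidate(const struct req *r, uint32_t limbo_owner_id, bool limbo_is_empty)
{
	/* 1) Zero LSN is allowed for PROMOTE/DEMOTE only. */
	if (r->lsn == 0 && r->type != PROMOTE && r->type != DEMOTE)
		return false;
	/* 2) Zero replica_id is allowed for PROMOTE only. */
	if (r->replica_id == 0 && r->type != PROMOTE)
		return false;
	/* 3) The sender must agree with us on who owns the limbo. */
	if (r->replica_id != limbo_owner_id)
		return false;
	/* CONFIRM/ROLLBACK with an empty limbo means the sender is stale. */
	if ((r->type == CONFIRM || r->type == ROLLBACK) && limbo_is_empty)
		return false;
	return true;
}

int
main(void)
{
	struct req confirm = { CONFIRM, 1, 100 };
	assert(prevalidate(&confirm, 1, false));  /* owner matches, queue busy */
	assert(!prevalidate(&confirm, 2, false)); /* missed an owner migration */
	assert(!prevalidate(&confirm, 1, true));  /* nothing left to confirm */
	return 0;
}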
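Also not part of the patch: a sketch of the PROMOTE/DEMOTE LSN
decision table from cases 3-5 above (the term checks from cases 1-2
are omitted here). confirmed_lsn, first_lsn and last_lsn stand in for
the limbo fields; the authoritative code is filter_promote_demote()
in src/box/txn_limbo.c below.

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum promote_verdict { PROMOTE_OK, PROMOTE_SPLIT_BRAIN };

static enum promote_verdict
check_promote_lsn(int64_t confirmed_lsn, bool limbo_is_empty,
		  int64_t first_lsn, int64_t last_lsn, int64_t promote_lsn)
{
	/* 3) The sender confirmed exactly what we have confirmed. */
	if (promote_lsn == confirmed_lsn)
		return PROMOTE_OK;
	/* 4) The sender is behind data we have already confirmed. */
	if (promote_lsn < confirmed_lsn)
		return PROMOTE_SPLIT_BRAIN;
	/* 5a) Nothing is queued locally, yet the sender is ahead of us. */
	if (limbo_is_empty)
		return PROMOTE_SPLIT_BRAIN;
	/* 5b) The LSN must fall inside the queued [first; last] range. */
	if (promote_lsn < first_lsn || promote_lsn > last_lsn)
		return PROMOTE_SPLIT_BRAIN;
	return PROMOTE_OK;
}

int
main(void)
{
	/* Local queue holds LSNs 8..10, everything up to 7 is confirmed. */
	assert(check_promote_lsn(7, false, 8, 10, 7) == PROMOTE_OK);
	assert(check_promote_lsn(7, false, 8, 10, 9) == PROMOTE_OK);
	assert(check_promote_lsn(7, false, 8, 10, 5) == PROMOTE_SPLIT_BRAIN);
	assert(check_promote_lsn(7, true, 0, 0, 9) == PROMOTE_SPLIT_BRAIN);
	return 0;
}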
 .../gh-6036-qsync-filter-packets.md           |   9 +
 src/box/applier.cc                            |  21 +-
 src/box/box.cc                                |  30 +-
 src/box/errcode.h                             |   1 +
 src/box/memtx_engine.cc                       |   3 +-
 src/box/txn_limbo.c                           | 309 +++++++++++++++---
 src/box/txn_limbo.h                           |  33 +-
 test/box/error.result                         |   1 +
 8 files changed, 350 insertions(+), 57 deletions(-)
 create mode 100644 changelogs/unreleased/gh-6036-qsync-filter-packets.md

diff --git a/changelogs/unreleased/gh-6036-qsync-filter-packets.md b/changelogs/unreleased/gh-6036-qsync-filter-packets.md
new file mode 100644
index 000000000..0db629e83
--- /dev/null
+++ b/changelogs/unreleased/gh-6036-qsync-filter-packets.md
@@ -0,0 +1,9 @@
+## feature/replication
+
+* Implemented filtering of incoming synchronous packets to discard
+  requests from outdated cluster nodes. This can happen when a
+  replication cluster is partitioned on the transport level and
+  two or more sub-clusters are running simultaneously for some
+  time, then try to merge back. Since the sub-clusters had their
+  own leaders, they should not be able to form the original cluster
+  because the data is no longer consistent (gh-6036).
diff --git a/src/box/applier.cc b/src/box/applier.cc index 845a7d015..45098e3dd 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -458,7 +458,8 @@ applier_wait_snapshot(struct applier *applier) struct synchro_request req; if (xrow_decode_synchro(&row, &req) != 0) diag_raise(); - txn_limbo_process(&txn_limbo, &req); + if (txn_limbo_process(&txn_limbo, &req) != 0) + diag_raise(); } else if (iproto_type_is_raft_request(row.type)) { struct raft_request req; if (xrow_decode_raft(&row, &req, NULL) != 0) @@ -514,6 +515,11 @@ applier_fetch_snapshot(struct applier *applier) struct ev_io *coio = &applier->io; struct xrow_header row; + txn_limbo_filter_disable(&txn_limbo); + auto filter_guard = make_scoped_guard([&]{ + txn_limbo_filter_enable(&txn_limbo); + }); + memset(&row, 0, sizeof(row)); row.type = IPROTO_FETCH_SNAPSHOT; coio_write_xrow(coio, &row); @@ -587,6 +593,11 @@ applier_register(struct applier *applier, bool was_anon) struct ev_io *coio = &applier->io; struct xrow_header row; + txn_limbo_filter_disable(&txn_limbo); + auto filter_guard = make_scoped_guard([&]{ + txn_limbo_filter_enable(&txn_limbo); + }); + memset(&row, 0, sizeof(row)); /* * Send this instance's current vclock together @@ -620,6 +631,11 @@ applier_join(struct applier *applier) struct xrow_header row; uint64_t row_count; + txn_limbo_filter_disable(&txn_limbo); + auto filter_guard = make_scoped_guard([&]{ + txn_limbo_filter_enable(&txn_limbo); + }); + xrow_encode_join_xc(&row, &INSTANCE_UUID); coio_write_xrow(coio, &row); @@ -874,6 +890,9 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row) goto err; txn_limbo_term_lock(&txn_limbo); + if (txn_limbo_filter_locked(&txn_limbo, &req) != 0) + goto err; + struct replica_cb_data rcb_data; struct synchro_entry entry; /* diff --git a/src/box/box.cc b/src/box/box.cc index 7b11d56d6..f134dc8bb 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -424,8 +424,7 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row) say_error("couldn't decode a synchro request"); return -1; } - txn_limbo_process(&txn_limbo, &syn_req); - return 0; + return txn_limbo_process(&txn_limbo, &syn_req); } static int @@ -1671,7 +1670,7 @@ box_wait_limbo_acked(double timeout) } /** Write and process a PROMOTE request. */ -static void +static int box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn) { struct raft *raft = box_raft(); @@ -1686,15 +1685,17 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn) .lsn = promote_lsn, .term = raft->term, }; - txn_limbo_process(&txn_limbo, &req); + if (txn_limbo_process(&txn_limbo, &req) != 0) + return -1; assert(txn_limbo_is_empty(&txn_limbo)); + return 0; } /** A guard to block multiple simultaneous promote()/demote() invocations. */ static bool is_in_box_promote = false; /** Write and process a DEMOTE request. 
*/ -static void +static int box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn) { assert(box_raft()->volatile_term == box_raft()->term); @@ -1708,8 +1709,10 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn) .lsn = promote_lsn, .term = box_raft()->term, }; - txn_limbo_process(&txn_limbo, &req); + if (txn_limbo_process(&txn_limbo, &req) != 0) + return -1; assert(txn_limbo_is_empty(&txn_limbo)); + return 0; } int @@ -1732,8 +1735,7 @@ box_promote_qsync(void) diag_set(ClientError, ER_NOT_LEADER, raft->leader); return -1; } - box_issue_promote(txn_limbo.owner_id, wait_lsn); - return 0; + return box_issue_promote(txn_limbo.owner_id, wait_lsn); } int @@ -1789,9 +1791,7 @@ box_promote(void) if (wait_lsn < 0) return -1; - box_issue_promote(txn_limbo.owner_id, wait_lsn); - - return 0; + return box_issue_promote(txn_limbo.owner_id, wait_lsn); } int @@ -1826,8 +1826,7 @@ box_demote(void) int64_t wait_lsn = box_wait_limbo_acked(replication_synchro_timeout); if (wait_lsn < 0) return -1; - box_issue_demote(txn_limbo.owner_id, wait_lsn); - return 0; + return box_issue_demote(txn_limbo.owner_id, wait_lsn); } int @@ -3296,6 +3295,11 @@ local_recovery(const struct tt_uuid *instance_uuid, say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID)); + txn_limbo_filter_disable(&txn_limbo); + auto filter_guard = make_scoped_guard([&]{ + txn_limbo_filter_enable(&txn_limbo); + }); + struct wal_stream wal_stream; wal_stream_create(&wal_stream); auto stream_guard = make_scoped_guard([&]{ diff --git a/src/box/errcode.h b/src/box/errcode.h index a6f096698..002fcc1e1 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -283,6 +283,7 @@ struct errcode_record { /*228 */_(ER_SYNC_QUEUE_FOREIGN, "The synchronous transaction queue belongs to other instance with id %u")\ /*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \ /*227 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process %s request out of stream") \ + /*228 */_(ER_CLUSTER_SPLIT, "Cluster split detected. %s") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index de918c335..de4298929 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -238,7 +238,8 @@ memtx_engine_recover_synchro(const struct xrow_header *row) * because all its rows have a zero replica_id. */ req.origin_id = req.replica_id; - txn_limbo_process(&txn_limbo, &req); + if (txn_limbo_process(&txn_limbo, &req) != 0) + return -1; return 0; } diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index 65fbd0cac..925f401e7 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -52,6 +52,7 @@ txn_limbo_create(struct txn_limbo *limbo) limbo->rollback_count = 0; limbo->is_in_rollback = false; limbo->has_initial_promote = false; + limbo->is_filtering = true; } bool @@ -737,6 +738,261 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout) return 0; } +/** + * Fill the reject reason with request data. + * The function is not reenterable, use with caution. + */ +static char * +reject_str(const struct synchro_request *req) +{ + static char prefix[128]; + + snprintf(prefix, sizeof(prefix), "RAFT: rejecting %s (%d) " + "request from origin_id %u replica_id %u term %llu", + iproto_type_name(req->type), req->type, + req->origin_id, req->replica_id, + (long long)req->term); + + return prefix; +} + +/** + * Common chain for any incoming packet. 
+ */ +static int +filter_in(struct txn_limbo *limbo, const struct synchro_request *req) +{ + assert(limbo->is_filtering); + + /* + * Zero LSN are allowed for PROMOTE + * and DEMOTE requests only. + */ + if (req->lsn == 0 && !iproto_type_is_promote_request(req->type)) { + say_error("%s. Zero lsn detected", reject_str(req)); + diag_set(ClientError, ER_CLUSTER_SPLIT, + "Zero LSN on promote/demote"); + return -1; + } + + /* + * Zero @a replica_id is allowed for PROMOTE packets only. + */ + if (req->replica_id == REPLICA_ID_NIL && + req->type != IPROTO_RAFT_PROMOTE) { + say_error("%s. Zero replica_id detected", + reject_str(req)); + diag_set(ClientError, ER_CLUSTER_SPLIT, + "Zero replica_id"); + return -1; + } + + /* + * Incoming packets should esteem limbo owner, + * if it doesn't match it means the sender + * missed limbo owner migrations and out of date. + */ + if (req->replica_id != limbo->owner_id) { + say_error("%s. Limbo owner mismatch, owner_id %u", + reject_str(req), limbo->owner_id); + diag_set(ClientError, ER_CLUSTER_SPLIT, + "Sync queue silent owner migration"); + return -1; + } + + return 0; +} + +/** + * Filter CONFIRM and ROLLBACK packets. + */ +static int +filter_confirm_rollback(struct txn_limbo *limbo, + const struct synchro_request *req) +{ + assert(limbo->is_filtering); + + /* + * When limbo is empty we have nothing to + * confirm/commit and if this request comes + * in it means the split brain has happened. + */ + if (!txn_limbo_is_empty(limbo)) + return 0; + + say_error("%s. Empty limbo detected", reject_str(req)); + diag_set(ClientError, ER_CLUSTER_SPLIT, + "Confirm/rollback with empty limbo"); + return -1; +} + +/** + * Filter PROMOTE and DEMOTE packets. + */ +static int +filter_promote_demote(struct txn_limbo *limbo, + const struct synchro_request *req) +{ + assert(limbo->is_filtering); + + /* + * PROMOTE and DEMOTE packets must not have zero + * term supplied, otherwise it is a broken packet. + */ + if (req->term == 0) { + say_error("%s. Zero term detected", reject_str(req)); + diag_set(ClientError, ER_CLUSTER_SPLIT, + "Request with zero term"); + return -1; + } + + /* + * If the term is already seen it means it comes + * from a node which didn't notice new elections, + * thus been living in subdomain and its data is + * no longer consistent. + */ + if (limbo->promote_greatest_term > req->term) { + say_error("%s. Max term seen is %llu", reject_str(req), + (long long)limbo->promote_greatest_term); + diag_set(ClientError, ER_CLUSTER_SPLIT, + "Obsolete terms"); + return -1; + } + + int64_t promote_lsn = req->lsn; + + /* + * Easy case -- processed LSN matches the new + * one which comes inside request, everything + * is consistent. + */ + if (limbo->confirmed_lsn == promote_lsn) + return 0; + + /* + * Explicit split brain situation. Promote + * comes in with an old LSN which we've already + * processed. + */ + if (limbo->confirmed_lsn > promote_lsn) { + say_error("%s. confirmed_lsn %lld > promote_lsn %lld", + reject_str(req), + (long long)limbo->confirmed_lsn, + (long long)promote_lsn); + diag_set(ClientError, ER_CLUSTER_SPLIT, + "Backward promote LSN"); + return -1; + } + + /* + * The last case requires a few subcases. + */ + assert(limbo->confirmed_lsn < promote_lsn); + + if (txn_limbo_is_empty(limbo)) { + /* + * Transactions are rolled back already, + * since the limbo is empty. + */ + say_error("%s. 
confirmed_lsn %lld < promote_lsn %lld " + "and empty limbo", reject_str(req), + (long long)limbo->confirmed_lsn, + (long long)promote_lsn); + diag_set(ClientError, ER_CLUSTER_SPLIT, + "Forward promote LSN"); + return -1; + } + + /* + * Some entries are present in the limbo, + * we need to make sure the @a promote_lsn + * lays inside limbo [first; last] range. + * So that the promote request has some + * queued data to process, otherwise it + * means the request comes from split + * brained node. + */ + struct txn_limbo_entry *first, *last; + + first = txn_limbo_first_entry(limbo); + last = txn_limbo_last_entry(limbo); + + if (first->lsn > promote_lsn || last->lsn < promote_lsn) { + say_error("%s. promote_lsn %lld out of " + "range [%lld; %lld]", + reject_str(req), + (long long)promote_lsn, + (long long)first->lsn, + (long long)last->lsn); + diag_set(ClientError, ER_CLUSTER_SPLIT, + "Promote LSN out of queue range"); + return -1; + } + + return 0; +} + +int +txn_limbo_filter_locked(struct txn_limbo *limbo, + const struct synchro_request *req) +{ + assert(latch_is_locked(&limbo->promote_latch)); + +#ifndef NDEBUG + say_info("limbo: filter %s replica_id %u origin_id %u " + "term %lld lsn %lld, queue owner_id %u len %lld " + "promote_greatest_term %lld confirmed_lsn %lld (%s)", + iproto_type_name(req->type), + req->replica_id, req->origin_id, + (long long)req->term, (long long)req->lsn, + limbo->owner_id, (long long)limbo->len, + (long long)limbo->promote_greatest_term, + (long long)limbo->confirmed_lsn, + limbo->is_filtering ? "on" : "off"); +#endif + + /* + * Our filtering engine implies that limbo is + * in "steady" state where variables are initialized, + * thus filtering prevent wrong data to step in. Still + * there are stages such as local recovery and joining + * to another leader node where we fetch an initial state + * of the limbo such as we can't apply the filtering rules + * at this moment. + */ + if (!limbo->is_filtering) + return 0; + + if (filter_in(limbo, req) != 0) + return -1; + + switch (req->type) { + case IPROTO_RAFT_CONFIRM: + case IPROTO_RAFT_ROLLBACK: + if (filter_confirm_rollback(limbo, req) != 0) + return -1; + break; + case IPROTO_RAFT_PROMOTE: + case IPROTO_RAFT_DEMOTE: + if (filter_promote_demote(limbo, req) != 0) + return -1; + break; + default: + say_error("RAFT: rejecting unexpected %d " + "request from instance id %u " + "for term %llu.", + req->type, req->origin_id, + (long long)req->term); + diag_set(ClientError, ER_UNSUPPORTED, + "Replication", + "unexpected request type"); + return -1; + } + + return 0; +} + void txn_limbo_process_locked(struct txn_limbo *limbo, const struct synchro_request *req) @@ -745,71 +1001,42 @@ txn_limbo_process_locked(struct txn_limbo *limbo, uint64_t term = req->term; uint32_t origin = req->origin_id; + if (txn_limbo_replica_term_locked(limbo, origin) < term) { vclock_follow(&limbo->promote_term_map, origin, term); if (term > limbo->promote_greatest_term) limbo->promote_greatest_term = term; - } else if (iproto_type_is_promote_request(req->type) && - limbo->promote_greatest_term > 1) { - /* PROMOTE for outdated term. Ignore. */ - say_info("RAFT: ignoring %s request from instance " - "id %u for term %llu. Greatest term seen " - "before (%llu) is bigger.", - iproto_type_name(req->type), origin, (long long)term, - (long long)limbo->promote_greatest_term); - return; } - int64_t lsn = req->lsn; - if (req->replica_id == REPLICA_ID_NIL) { - /* - * The limbo was empty on the instance issuing the request. 
- * This means this instance must empty its limbo as well. - */ - assert(lsn == 0 && iproto_type_is_promote_request(req->type)); - } else if (req->replica_id != limbo->owner_id) { - /* - * Ignore CONFIRM/ROLLBACK messages for a foreign master. - * These are most likely outdated messages for already confirmed - * data from an old leader, who has just started and written - * confirm right on synchronous transaction recovery. - */ - if (!iproto_type_is_promote_request(req->type)) - return; - /* - * Promote has a bigger term, and tries to steal the limbo. It - * means it probably was elected with a quorum, and it makes no - * sense to wait here for confirmations. The other nodes already - * elected a new leader. Rollback all the local txns. - */ - lsn = 0; - } switch (req->type) { case IPROTO_RAFT_CONFIRM: - txn_limbo_read_confirm(limbo, lsn); + txn_limbo_read_confirm(limbo, req->lsn); break; case IPROTO_RAFT_ROLLBACK: - txn_limbo_read_rollback(limbo, lsn); + txn_limbo_read_rollback(limbo, req->lsn); break; case IPROTO_RAFT_PROMOTE: - txn_limbo_read_promote(limbo, req->origin_id, lsn); + txn_limbo_read_promote(limbo, req->origin_id, req->lsn); break; case IPROTO_RAFT_DEMOTE: - txn_limbo_read_demote(limbo, lsn); + txn_limbo_read_demote(limbo, req->lsn); break; default: - unreachable(); + panic("limbo: unexpected request type %d", req->type); + break; } - return; } -void +int txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) { txn_limbo_term_lock(limbo); - txn_limbo_process_locked(limbo, req); + int rc = txn_limbo_filter_locked(limbo, req); + if (rc == 0) + txn_limbo_process_locked(limbo, req); txn_limbo_term_unlock(limbo); + return rc; } void diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 1ee815d1c..74c77c16b 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -190,6 +190,14 @@ struct txn_limbo { * procedure. */ bool has_initial_promote; + /** + * Whether the limbo should filter incoming requests. + * The phases of local recovery from WAL file and on applier's + * join phase we are in complete trust of incoming data because + * this data forms an initial limbo state and should not + * filter out requests. + */ + bool is_filtering; }; /** @@ -339,15 +347,38 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); +/** + * Verify if the request is valid for processing. + */ +int +txn_limbo_filter_locked(struct txn_limbo *limbo, + const struct synchro_request *req); + /** Execute a synchronous replication request. */ void txn_limbo_process_locked(struct txn_limbo *limbo, const struct synchro_request *req); /** Lock limbo terms and execute a synchronous replication request. */ -void +int txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req); +/** Enable filtering of synchro requests. */ +static inline void +txn_limbo_filter_enable(struct txn_limbo *limbo) +{ + limbo->is_filtering = true; + say_info("limbo: filter enabled"); +} + +/** Disable filtering of synchro requests. */ +static inline void +txn_limbo_filter_disable(struct txn_limbo *limbo) +{ + limbo->is_filtering = false; + say_info("limbo: filter disabled"); +} + /** * Waiting for confirmation of all "sync" transactions * during confirm timeout or fail. 
diff --git a/test/box/error.result b/test/box/error.result index bc804197a..45ea7714c 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -449,6 +449,7 @@ t; | 228: box.error.SYNC_QUEUE_FOREIGN | 229: box.error.UNABLE_TO_PROCESS_IN_STREAM | 230: box.error.UNABLE_TO_PROCESS_OUT_OF_STREAM + | 231: box.error.CLUSTER_SPLIT | ... test_run:cmd("setopt delimiter ''"); -- 2.31.1