From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id 1AD996EC55; Sun, 11 Jul 2021 01:28:13 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 1AD996EC55 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1625956093; bh=21bynDlca9pIVMJ/n1CP9+yoC6+8+h30I4oyRFmfY3I=; h=To:Date:Subject:List-Id:List-Unsubscribe:List-Archive:List-Post: List-Help:List-Subscribe:From:Reply-To:Cc:From; b=AZlbbF2KwAFII/2KYgPMGnn/pEs2ns7AeNGbzgQEbDg4ubilOq/AKAiWG32GgSDHG 2niMtW1MoLycv9noZTEt59jVdN10agdK38Uv+OKanSAD7n/bsIBiM4RWTe9mQTJ/0k joVpM61Yw1F2tQwXNtEspW78e0ts4MIpCLK2XTpU= Received: from mail-lf1-f48.google.com (mail-lf1-f48.google.com [209.85.167.48]) (using TLSv1.3 with cipher TLS_AES_128_GCM_SHA256 (128/128 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C667F6EC55 for ; Sun, 11 Jul 2021 01:28:08 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org C667F6EC55 Received: by mail-lf1-f48.google.com with SMTP id x25so19419090lfu.13 for ; Sat, 10 Jul 2021 15:28:08 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:from:to:cc:subject:date:message-id:mime-version :content-transfer-encoding; bh=Tk0VN69Nc97hWeX9hOe7kWBjERYj6rGxaEeWdnSRUlM=; b=TNK3fIrA/RpT8y4NRdKmCr9/OwF2L11vfM+nYLegc4mTp9fEbVnG2kw/3cZgGBnAih mI2lG1D3rr5rgx9QZNOccws37pcFNQsuEfJrqWAACobd6h9a4xv+cxTat7y5ccXvbbf3 qTtsMWPmwDG09Zu5GWZBEe1BZwMqjg/CXtKi3K4KBgRunnBSS/PaBuD23C2RQAxpaw0Z K8x8kfgDwD1Ftp/uZSqaoqeT8QCmnF4OVFuwEBn9tc+r+6wnjiWjUGnoaRelPJ1znltf UCYiORBU9iipdaHcnmS+uH5/rn7CrwRKs0yxE90AzkjWSQfKjfVMFA9b+AtnPuyBHxzH 4AMQ== X-Gm-Message-State: AOAM5321l3jPE84f+/G9K0iWoli4pIseogrGyZqTj9QVllrFEd5tGCbs taDUfy8GQHYNni2hHlYTompSYt5F1RHnkQ== X-Google-Smtp-Source: ABdhPJyn3RZ++WjTtUgIAhEdiivYyo8vM4VRmjfZILfaSpdiV5AKyyTQ4OaHbKjQ3och2tbu/5WlYA== X-Received: by 2002:ac2:5221:: with SMTP id i1mr21913157lfl.273.1625956087647; Sat, 10 Jul 2021 15:28:07 -0700 (PDT) Received: from grain.localdomain ([5.18.199.94]) by smtp.gmail.com with ESMTPSA id y11sm799829lfl.284.2021.07.10.15.28.06 (version=TLS1_3 cipher=TLS_AES_256_GCM_SHA384 bits=256/256); Sat, 10 Jul 2021 15:28:06 -0700 (PDT) Received: by grain.localdomain (Postfix, from userid 1000) id F2E725A001E; Sun, 11 Jul 2021 01:28:05 +0300 (MSK) To: tml Date: Sun, 11 Jul 2021 01:28:03 +0300 Message-Id: <20210710222803.253251-1-gorcunov@gmail.com> X-Mailer: git-send-email 2.31.1 MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH] limbo: introduce request processing hooks X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: Cyrill Gorcunov via Tarantool-patches Reply-To: Cyrill Gorcunov Cc: Vladislav Shpilevoy Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" Guys, this is an early rfc since I would like to discuss the design first before going further. Currently we don't interrupt incoming syncro requests which doesn't allow us to detect cluster split-brain situation, as we were discussing verbally there are a number of sign to detect it and we need to stop receiving data from obsolete nodes. The main problem though is that such filtering of incoming packets should happen at the moment where we still can do a step back and inform the peer that data has been declined, but currently our applier code process syncro requests inside WAL trigger, ie when data is already applied or rolling back. Thus we need to separate "filer" and "apply" stages of processing. What is more interesting is that we filter incomings via in memory vclock and update them immediately. Thus the following situation is possible -- a promote request comes in, we remember it inside promote_term_map but then write to WAL fails and we never revert the promote_term_map variable, thus other peer won't be able to resend us this promote request because now we think that we've alreday applied the promote. To solve this I split processing routine into stages: - filter stage: we investigate infly packets and remember their terms in @terms_infly vclock - apply stage: the data is verified and we try to apply the data and write it to the disk, once everything is fine we update @terms_applied vclock which serves as a backup of @terms_infly - error stage: data application failed and we should revert @terms_infly vclock to the previous value The stages are processed by txn_limbo_apply routine which takes a mask of stages it should execute. Old txn_limbo_process is simply an inline wrapper with appropriate flags. Please note that I _didn't_ put complete conditions into limbo_op_filter yet only moved current code there. So take a look once time permit, maybe there some easier approach in code structure. branch gorcunov/gh-6036-rollback-confirm-notest Signed-off-by: Cyrill Gorcunov --- src/box/applier.cc | 13 +++- src/box/box.cc | 6 +- src/box/txn_limbo.c | 149 ++++++++++++++++++++++++++++++++++++++------ src/box/txn_limbo.h | 97 ++++++++++++++++++++++------ 4 files changed, 223 insertions(+), 42 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 978383e64..8a44bf1b2 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -458,7 +458,8 @@ applier_wait_snapshot(struct applier *applier) struct synchro_request req; if (xrow_decode_synchro(&row, &req) != 0) diag_raise(); - txn_limbo_process(&txn_limbo, &req); + if (txn_limbo_process(&txn_limbo, &req) != 0) + diag_raise(); } else if (iproto_type_is_raft_request(row.type)) { struct raft_request req; if (xrow_decode_raft(&row, &req, NULL) != 0) @@ -850,11 +851,16 @@ apply_synchro_row_cb(struct journal_entry *entry) assert(entry->complete_data != NULL); struct synchro_entry *synchro_entry = (struct synchro_entry *)entry->complete_data; + struct synchro_request *req = synchro_entry->req; + if (entry->res < 0) { + txn_limbo_apply(&txn_limbo, req, + LIMBO_OP_ERROR | LIMBO_OP_ATTR_PANIC); applier_rollback_by_wal_io(entry->res); } else { replica_txn_wal_write_cb(synchro_entry->rcb); - txn_limbo_process(&txn_limbo, synchro_entry->req); + txn_limbo_apply(&txn_limbo, synchro_entry->req, + LIMBO_OP_APPLY | LIMBO_OP_ATTR_PANIC); trigger_run(&replicaset.applier.on_wal_write, NULL); } fiber_wakeup(synchro_entry->owner); @@ -870,6 +876,9 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row) if (xrow_decode_synchro(row, &req) != 0) goto err; + if (txn_limbo_apply(&txn_limbo, &req, LIMBO_OP_FILTER) != 0) + goto err; + struct replica_cb_data rcb_data; struct synchro_entry entry; /* diff --git a/src/box/box.cc b/src/box/box.cc index bc68ee4c8..79bacfa08 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -428,7 +428,8 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row) say_error("couldn't decode a synchro request"); return -1; } - txn_limbo_process(&txn_limbo, &syn_req); + if (txn_limbo_process(&txn_limbo, &syn_req) != 0) + return -1; return 0; } @@ -1701,7 +1702,8 @@ box_clear_synchro_queue(bool demote) .lsn = wait_lsn, .term = term, }; - txn_limbo_process(&txn_limbo, &req); + if (txn_limbo_process(&txn_limbo, &req) != 0) + return -1; assert(txn_limbo_is_empty(&txn_limbo)); } } diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index d2e4dcb1e..0650702b9 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -37,6 +37,15 @@ struct txn_limbo txn_limbo; +static void +txn_limbo_promote_create(struct txn_limbo_promote *pmt) +{ + vclock_create(&pmt->terms_applied); + vclock_create(&pmt->terms_infly); + pmt->term_max = 0; + pmt->term_max_infly = 0; +} + static inline void txn_limbo_create(struct txn_limbo *limbo) { @@ -45,11 +54,11 @@ txn_limbo_create(struct txn_limbo *limbo) limbo->owner_id = REPLICA_ID_NIL; fiber_cond_create(&limbo->wait_cond); vclock_create(&limbo->vclock); - vclock_create(&limbo->promote_term_map); - limbo->promote_greatest_term = 0; limbo->confirmed_lsn = 0; limbo->rollback_count = 0; limbo->is_in_rollback = false; + + txn_limbo_promote_create(&limbo->promote); } bool @@ -305,10 +314,12 @@ void txn_limbo_checkpoint(const struct txn_limbo *limbo, struct synchro_request *req) { + const struct txn_limbo_promote *pmt = &limbo->promote; + req->type = IPROTO_PROMOTE; req->replica_id = limbo->owner_id; req->lsn = limbo->confirmed_lsn; - req->term = limbo->promote_greatest_term; + req->term = pmt->term_max_infly; } void @@ -696,27 +707,38 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo) return 0; } -void -txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) +static int +limbo_op_filter(struct txn_limbo *limbo, const struct synchro_request *req) { - uint64_t term = req->term; + struct txn_limbo_promote *pmt = &limbo->promote; uint32_t origin = req->origin_id; + uint64_t term = req->term; + if (txn_limbo_replica_term(limbo, origin) < term) { - vclock_follow(&limbo->promote_term_map, origin, term); - if (term > limbo->promote_greatest_term) - limbo->promote_greatest_term = term; + vclock_follow(&pmt->terms_infly, origin, term); + if (term > pmt->term_max_infly) + pmt->term_max_infly = term; } else if (iproto_type_is_promote_request(req->type) && - limbo->promote_greatest_term > 1) { - /* PROMOTE for outdated term. Ignore. */ - say_info("RAFT: ignoring %s request from instance " + pmt->term_max_infly > 1) { + say_info("RAFT: declining %s request from instance " "id %u for term %llu. Greatest term seen " "before (%llu) is bigger.", - iproto_type_name(req->type), origin, (long long)term, - (long long)limbo->promote_greatest_term); - return; + iproto_type_name(req->type), origin, + (long long)term, + (long long)pmt->term_max_infly); + diag_set(ClientError, ER_UNSUPPORTED, "RAFT", + "backward terms"); + return -1; } + return 0; +} + +static int +limbo_op_apply(struct txn_limbo *limbo, const struct synchro_request *req) +{ int64_t lsn = req->lsn; + if (req->replica_id == REPLICA_ID_NIL) { /* * The limbo was empty on the instance issuing the request. @@ -731,7 +753,7 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) * confirm right on synchronous transaction recovery. */ if (!iproto_type_is_promote_request(req->type)) - return; + goto out; /* * Promote has a bigger term, and tries to steal the limbo. It * means it probably was elected with a quorum, and it makes no @@ -740,6 +762,7 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) */ lsn = 0; } + switch (req->type) { case IPROTO_CONFIRM: txn_limbo_read_confirm(limbo, lsn); @@ -754,9 +777,99 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) txn_limbo_read_demote(limbo, lsn); break; default: - unreachable(); + panic("limbo: unreacheable stage detected"); + break; } - return; + + +out: + struct txn_limbo_promote *pmt = &limbo->promote; + uint32_t replica_id = req->origin_id; + uint64_t term = req->term; + + uint64_t v = vclock_get(&pmt->terms_applied, replica_id); + if (v < term) { + vclock_follow(&pmt->terms_applied, replica_id, term); + if (term > pmt->term_max) + pmt->term_max = term; + } + return 0; +} + +static int +limbo_op_error(struct txn_limbo *limbo, const struct synchro_request *req) +{ + struct txn_limbo_promote *pmt = &limbo->promote; + uint32_t replica_id = req->origin_id; + /* + * Restore to the applied value in case of error, + * this will allow to reapply the entry when remote + * node get error and try to resend data. + */ + uint64_t v = vclock_get(&pmt->terms_applied, replica_id); + vclock_reset(&pmt->terms_infly, replica_id, v); + + /* + * The max value has to be recalculated in a linear + * form, the errors should not happen frequently so + * this is not a hot path. + */ + int64_t maxv = 0; + struct vclock_iterator it; + vclock_iterator_init(&it, &pmt->terms_infly); + vclock_foreach(&it, r) + maxv = r.lsn > maxv ? r.lsn : maxv; + pmt->term_max_infly = maxv; + return 0; +} + +static int (*limbo_apply_ops[LIMBO_OP_MAX]) + (struct txn_limbo *limbo, const struct synchro_request *req) = { + [LIMBO_OP_FILTER_BIT] = limbo_op_filter, + [LIMBO_OP_APPLY_BIT] = limbo_op_apply, + [LIMBO_OP_ERROR_BIT] = limbo_op_error, +}; + +static const char * +limbo_apply_op_str(unsigned int bit) +{ + static const char *str[] = { + [LIMBO_OP_FILTER_BIT] = "LIMBO_OP_FILTER", + [LIMBO_OP_APPLY_BIT] = "LIMBO_OP_APPLY", + [LIMBO_OP_ERROR_BIT] = "LIMBO_OP_ERROR", + }; + + if (bit < lengthof(limbo_apply_ops)) + return str[bit]; + + return "UNKNOWN"; +} + +int +txn_limbo_apply(struct txn_limbo *limbo, + const struct synchro_request *req, + unsigned int op_mask) +{ + unsigned int mask = op_mask & LIMBO_OP_MASK; + unsigned int bit = LIMBO_OP_MIN; + + while (mask) { + if (mask & 1) { + if (limbo_apply_ops[bit] == NULL) + panic("limbo: empty apply operation"); + say_info("limbo: apply operation %s", + limbo_apply_op_str(bit)); + if (limbo_apply_ops[bit](limbo, req) != 0) { + if (op_mask & LIMBO_OP_ATTR_PANIC) + panic("limbo: panicing op"); + return -1; + } + } + mask >>= 1; + bit++; + }; + + return 0; } void diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index 8e7315947..b93450f65 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -75,6 +75,40 @@ txn_limbo_entry_is_complete(const struct txn_limbo_entry *e) return e->is_commit || e->is_rollback; } +/** + * To keep state of promote requests to handle split-brain + * situation and other errors. + */ +struct txn_limbo_promote { + /** + * Latest terms received with PROMOTE entries from remote instances. + * Limbo uses them to filter out the transactions coming not from the + * limbo owner, but so outdated that they are rolled back everywhere + * except outdated nodes. + */ + struct vclock terms_applied; + /** + * Infly replresentation of @a terms_applied, the term might + * be not yet written to WAL but already seen. + */ + struct vclock terms_infly; + /** + * The biggest PROMOTE term seen by the instance and persisted in WAL. + * It is related to raft term, but not the same. Synchronous replication + * represented by the limbo is interested only in the won elections + * ended with PROMOTE request. + * It means the limbo's term might be smaller than the raft term, while + * there are ongoing elections, or the leader is already known and this + * instance hasn't read its PROMOTE request yet. During other times the + * limbo and raft are in sync and the terms are the same. + */ + uint64_t term_max; + /** + * Infly representation of @a term_max. + */ + uint64_t term_max_infly; +}; + /** * Limbo is a place where transactions are stored, which are * finished, but not committed nor rolled back. These are @@ -130,23 +164,9 @@ struct txn_limbo { */ struct vclock vclock; /** - * Latest terms received with PROMOTE entries from remote instances. - * Limbo uses them to filter out the transactions coming not from the - * limbo owner, but so outdated that they are rolled back everywhere - * except outdated nodes. + * To track PROMOTE requests. */ - struct vclock promote_term_map; - /** - * The biggest PROMOTE term seen by the instance and persisted in WAL. - * It is related to raft term, but not the same. Synchronous replication - * represented by the limbo is interested only in the won elections - * ended with PROMOTE request. - * It means the limbo's term might be smaller than the raft term, while - * there are ongoing elections, or the leader is already known and this - * instance hasn't read its PROMOTE request yet. During other times the - * limbo and raft are in sync and the terms are the same. - */ - uint64_t promote_greatest_term; + struct txn_limbo_promote promote; /** * Maximal LSN gathered quorum and either already confirmed in WAL, or * whose confirmation is in progress right now. Any attempt to confirm @@ -218,7 +238,8 @@ txn_limbo_last_entry(struct txn_limbo *limbo) static inline uint64_t txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id) { - return vclock_get(&limbo->promote_term_map, replica_id); + const struct txn_limbo_promote *pmt = &limbo->promote; + return vclock_get(&pmt->terms_infly, replica_id); } /** @@ -229,8 +250,9 @@ static inline bool txn_limbo_is_replica_outdated(const struct txn_limbo *limbo, uint32_t replica_id) { + const struct txn_limbo_promote *pmt = &limbo->promote; return txn_limbo_replica_term(limbo, replica_id) < - limbo->promote_greatest_term; + pmt->term_max_infly; } /** @@ -300,9 +322,44 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); int txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); +enum { + LIMBO_OP_MIN = 0, + LIMBO_OP_FILTER_BIT = 0, + LIMBO_OP_APPLY_BIT = 1, + LIMBO_OP_ERROR_BIT = 2, + LIMBO_OP_MAX, + + LIMBO_OP_ATTR_MIN = LIMBO_OP_MAX, + LIMBO_OP_ATTR_PANIC_BIT = LIMBO_OP_ATTR_MIN + 1, + LIMBO_OP_ATTR_MAX, +}; + +enum { + LIMBO_OP_FILTER = (1u << LIMBO_OP_FILTER_BIT), + LIMBO_OP_APPLY = (1u << LIMBO_OP_APPLY_BIT), + LIMBO_OP_ERROR = (1u << LIMBO_OP_ERROR_BIT), + LIMBO_OP_MASK = (1u << LIMBO_OP_MAX) - 1, + + LIMBO_OP_ATTR_PANIC = (1u << LIMBO_OP_ATTR_PANIC_BIT), +}; + +/** + * Apply a synchronous replication request, the @a op_mask + * specifies stages to process. + */ +int +txn_limbo_apply(struct txn_limbo *limbo, + const struct synchro_request *req, + unsigned int op_mask); + /** Execute a synchronous replication request. */ -void -txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req); +static inline int +txn_limbo_process(struct txn_limbo *limbo, + const struct synchro_request *req) +{ + const int op_mask = LIMBO_OP_FILTER | LIMBO_OP_APPLY; + return txn_limbo_apply(limbo, req, op_mask); +} /** * Waiting for confirmation of all "sync" transactions -- 2.31.1