[Tarantool-patches] [PATCH v10 3/4] limbo: filter incoming synchro requests

Cyrill Gorcunov gorcunov at gmail.com
Wed Aug 4 22:07:51 MSK 2021


When we receive synchro requests we can't just apply
them blindly because in the worst case they may come from
a split-brain configuration (where a cluster has split into
several clusters, each one has elected its own leader, and
then the clusters try to merge back into the original
one). We need to do our best to detect such disunity
and force these nodes to rejoin from scratch for
the sake of data consistency.

Thus when we're processing requests we pass them to the
packet filter first, which validates their contents and
refuses to apply them if they do not match.

Depending on the request type each packet traverses the
appropriate chain(s):

                                          +---> FILTER_PROMOTE
                                          |
                                          +---> FILTER_CONFIRM
 reader -> FILTER_IN -> request:{type} -> |
              |                           +---> FILTER_ROLLBACK
              | error                     |
              V                           +---> FILTER_DEMOTE
             exit

FILTER_IN
 Common chain for any synchro packet.
 1) Zero LSN is allowed for PROMOTE | DEMOTE packets only, since
    CONFIRM | ROLLBACK have to process some real data with
    LSN already assigned.
 2) request:replica_id = 0 allowed for PROMOTE request only.
 3) request:replica_id should match limbo:owner_id, iow the
    limbo migration should be noticed by all instances in the
    cluster.

FILTER_CONFIRM
FILTER_ROLLBACK
 1) Both confirm and rollback requests can't be considered
    if limbo is already empty, ie there is no data in a
    local queue and everything is processed already. The
    request is obviously from the node which is out of date.

FILTER_PROMOTE
FILTER_DEMOTE
 1) The requests should come in with nonzero term, otherwise
    the packet is corrupted.
 2) The request's term should not be less than the maximal known
    one, iow it should not come in from nodes which didn't notice
    raft epoch changes and are living in the past.
 3) If LSN of the request matches current confirmed LSN the packet
    is obviously correct to process.
 4) If LSN is less than confirmed LSN then the request is wrong,
    we have processed the requested LSN already.
 5) If LSN is greater than confirmed LSN then
    a) If limbo is empty we can't do anything, since data is already
       processed and should issue an error;
    b) If there is some data in the limbo then requested LSN should
       be in range of limbo's [first; last] LSNs, thus the request
       will be able to commit and rollback limbo queue.

Closes #6036

Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
 src/box/applier.cc     |  23 ++-
 src/box/box.cc         |  11 +-
 src/box/memtx_engine.c |   3 +-
 src/box/txn_limbo.c    | 354 ++++++++++++++++++++++++++++++++++++-----
 src/box/txn_limbo.h    |  33 +++-
 5 files changed, 378 insertions(+), 46 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9db286ae2..f64b6fa35 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -458,7 +458,8 @@ applier_wait_snapshot(struct applier *applier)
 				struct synchro_request req;
 				if (xrow_decode_synchro(&row, &req) != 0)
 					diag_raise();
-				txn_limbo_process(&txn_limbo, &req);
+				if (txn_limbo_process(&txn_limbo, &req) != 0)
+					diag_raise();
 			} else if (iproto_type_is_raft_request(row.type)) {
 				struct raft_request req;
 				if (xrow_decode_raft(&row, &req, NULL) != 0)
@@ -514,6 +515,11 @@ applier_fetch_snapshot(struct applier *applier)
 	struct ev_io *coio = &applier->io;
 	struct xrow_header row;
 
+	txn_limbo_filter_disable(&txn_limbo);
+	auto filter_guard = make_scoped_guard([&]{
+		txn_limbo_filter_enable(&txn_limbo);
+	});
+
 	memset(&row, 0, sizeof(row));
 	row.type = IPROTO_FETCH_SNAPSHOT;
 	coio_write_xrow(coio, &row);
@@ -587,6 +593,11 @@ applier_register(struct applier *applier, bool was_anon)
 	struct ev_io *coio = &applier->io;
 	struct xrow_header row;
 
+	txn_limbo_filter_disable(&txn_limbo);
+	auto filter_guard = make_scoped_guard([&]{
+		txn_limbo_filter_enable(&txn_limbo);
+	});
+
 	memset(&row, 0, sizeof(row));
 	/*
 	 * Send this instance's current vclock together
@@ -620,6 +631,11 @@ applier_join(struct applier *applier)
 	struct xrow_header row;
 	uint64_t row_count;
 
+	txn_limbo_filter_disable(&txn_limbo);
+	auto filter_guard = make_scoped_guard([&]{
+		txn_limbo_filter_enable(&txn_limbo);
+	});
+
 	xrow_encode_join_xc(&row, &INSTANCE_UUID);
 	coio_write_xrow(coio, &row);
 
@@ -874,6 +890,11 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
 		goto err;
 
 	txn_limbo_term_lock(&txn_limbo);
+	if (txn_limbo_filter_locked(&txn_limbo, &req) != 0) {
+		txn_limbo_term_unlock(&txn_limbo);
+		goto err;
+	}
+
 	struct replica_cb_data rcb_data;
 	struct synchro_entry entry;
 	/*
diff --git a/src/box/box.cc b/src/box/box.cc
index 8dc3b130b..c3516b7a4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1675,7 +1675,8 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
 		.lsn = promote_lsn,
 		.term = raft->term,
 	};
-	txn_limbo_process(&txn_limbo, &req);
+	if (txn_limbo_process(&txn_limbo, &req) != 0)
+		diag_raise();
 	assert(txn_limbo_is_empty(&txn_limbo));
 }
 
@@ -1697,7 +1698,8 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
 		.lsn = promote_lsn,
 		.term = box_raft()->term,
 	};
-	txn_limbo_process(&txn_limbo, &req);
+	if (txn_limbo_process(&txn_limbo, &req) != 0)
+		diag_raise();
 	assert(txn_limbo_is_empty(&txn_limbo));
 }
 
@@ -3284,6 +3286,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
 
+	txn_limbo_filter_disable(&txn_limbo);
+	auto filter_guard = make_scoped_guard([&]{
+		txn_limbo_filter_enable(&txn_limbo);
+	});
+
 	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
 	auto stream_guard = make_scoped_guard([&]{
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 0b06e5e63..4aed24fe3 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -238,7 +238,8 @@ memtx_engine_recover_synchro(const struct xrow_header *row)
 	 * because all its rows have a zero replica_id.
 	 */
 	req.origin_id = req.replica_id;
-	txn_limbo_process(&txn_limbo, &req);
+	if (txn_limbo_process(&txn_limbo, &req) != 0)
+		return -1;
 	return 0;
 }
 
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a718c55a2..59fb51fac 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -51,6 +51,7 @@ txn_limbo_create(struct txn_limbo *limbo)
 	limbo->confirmed_lsn = 0;
 	limbo->rollback_count = 0;
 	limbo->is_in_rollback = false;
+	limbo->is_filtering = true;
 }
 
 bool
@@ -724,6 +725,302 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout)
 	return 0;
 }
 
+enum filter_chain {
+	FILTER_IN,		/* common checks, run for every request */
+	FILTER_CONFIRM,		/* extra checks for IPROTO_CONFIRM */
+	FILTER_ROLLBACK,	/* extra checks for IPROTO_ROLLBACK */
+	FILTER_PROMOTE,		/* extra checks for IPROTO_PROMOTE */
+	FILTER_DEMOTE,		/* extra checks for IPROTO_DEMOTE */
+	FILTER_MAX,		/* number of chains, sizes filter_req[] */
+};
+
+/**
+ * Format the common "rejecting ..." log prefix for @a req.
+ * Not reentrant: the result lives in a static buffer reused by each call.
+ */
+static char *
+reject_str(const struct synchro_request *req)
+{
+	static char prefix[128];
+
+	snprintf(prefix, sizeof(prefix), "RAFT: rejecting %s (%d) "
+		 "request from origin_id %u replica_id %u term %llu",
+		 iproto_type_name(req->type), req->type,
+		 req->origin_id, req->replica_id,
+		 (unsigned long long)req->term);
+
+	return prefix;
+}
+
+/**
+ * Common chain for any incoming packet: validates zero LSN and
+ * zero replica_id usage and checks that the request is addressed
+ * to the current limbo owner.
+ * Returns 0 if the request passed, -1 (with diag set) if rejected.
+ */
+static int
+filter_in(struct txn_limbo *limbo, const struct synchro_request *req)
+{
+	/*
+	 * Zero LSN is allowed for PROMOTE
+	 * and DEMOTE requests only.
+	 */
+	if (req->lsn == 0) {
+		if (!iproto_type_is_promote_request(req->type)) {
+			say_info("%s. Zero lsn detected",
+				 reject_str(req));
+
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Replication",
+				 "zero LSN on non-promote/demote request");
+			return -1;
+		}
+	}
+
+	/*
+	 * Zero @a replica_id is allowed for PROMOTE packets only.
+	 */
+	if (req->replica_id == REPLICA_ID_NIL) {
+		if (req->type != IPROTO_PROMOTE) {
+			say_info("%s. Zero replica_id detected",
+				 reject_str(req));
+
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Replication",
+				 "zero replica_id");
+			return -1;
+		}
+	}
+
+	/*
+	 * Incoming packets must respect the limbo owner:
+	 * a mismatch means the sender missed limbo owner
+	 * migrations and is out of date.
+	 */
+	if (req->replica_id != limbo->owner_id) {
+		say_info("%s. Limbo owner mismatch, owner_id %u",
+			 reject_str(req), limbo->owner_id);
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication",
+			 "sync queue silent owner migration");
+		return -1;
+	}
+
+	return 0;
+}
+
+/**
+ * CONFIRM/ROLLBACK chain: such a request needs queued entries to act on.
+ */
+static int
+filter_confirm_rollback(struct txn_limbo *limbo,
+			const struct synchro_request *req)
+{
+	/*
+	 * An empty limbo has nothing left to confirm or
+	 * roll back, so a request arriving now is treated
+	 * as a sign of split brain and gets rejected.
+	 */
+	if (txn_limbo_is_empty(limbo)) {
+		say_info("%s. Empty limbo detected", reject_str(req));
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication",
+			 "confirm/rollback with empty limbo");
+		return -1;
+	}
+	return 0;
+}
+
+/**
+ * Filter PROMOTE and DEMOTE packets: 0 to accept, -1 with diag to reject.
+ */
+static int
+filter_promote_demote(struct txn_limbo *limbo,
+		      const struct synchro_request *req)
+{
+	int64_t promote_lsn = req->lsn;
+
+	/*
+	 * PROMOTE and DEMOTE packets must not have zero
+	 * term supplied, otherwise it is a broken packet.
+	 */
+	if (req->term == 0) {
+		say_info("%s. Zero term detected", reject_str(req));
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication", "zero term");
+		return -1;
+	}
+
+	/*
+	 * A term below the greatest one seen means the
+	 * request comes from a node which didn't notice
+	 * new elections, thus has been living in a subdomain
+	 * and its data is no longer consistent.
+	 */
+	if (limbo->promote_greatest_term > req->term) {
+		say_info("%s. Max term seen is %llu", reject_str(req),
+			 (unsigned long long)limbo->promote_greatest_term);
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication", "obsolete terms");
+		return -1;
+	}
+
+	/*
+	 * Easy case -- processed LSN matches the new
+	 * one which comes inside request, everything
+	 * is consistent.
+	 */
+	if (limbo->confirmed_lsn == promote_lsn)
+		return 0;
+
+	/*
+	 * Explicit split brain situation. Promote
+	 * comes in with an old LSN which we've already
+	 * processed.
+	 */
+	if (limbo->confirmed_lsn > promote_lsn) {
+		say_info("%s. confirmed_lsn %lld > promote_lsn %lld",
+			 reject_str(req),
+			 (long long)limbo->confirmed_lsn,
+			 (long long)promote_lsn);
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication",
+			 "backward promote LSN (split brain)");
+		return -1;
+	}
+
+	/*
+	 * The last case requires a few subcases.
+	 */
+	assert(limbo->confirmed_lsn < promote_lsn);
+
+	if (txn_limbo_is_empty(limbo)) {
+		/*
+		 * Transactions are rolled back already,
+		 * since the limbo is empty.
+		 */
+		say_info("%s. confirmed_lsn %lld < promote_lsn %lld "
+			 "and empty limbo", reject_str(req),
+			 (long long)limbo->confirmed_lsn,
+			 (long long)promote_lsn);
+
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication",
+			 "forward promote LSN "
+			 "(empty limbo, split brain)");
+		return -1;
+	} else {
+		/*
+		 * Some entries are present in the limbo,
+		 * we need to make sure the @a promote_lsn
+		 * lies inside limbo [first; last] range.
+		 * So that the promote request has some
+		 * queued data to process, otherwise it
+		 * means the request comes from split
+		 * brained node.
+		 */
+		struct txn_limbo_entry *first, *last;
+
+		first = txn_limbo_first_entry(limbo);
+		last = txn_limbo_last_entry(limbo);
+
+		if (promote_lsn < first->lsn ||
+		    promote_lsn > last->lsn) {
+			say_info("%s. promote_lsn %lld out of "
+				 "range [%lld; %lld]",
+				 reject_str(req),
+				 (long long)promote_lsn,
+				 (long long)first->lsn,
+				 (long long)last->lsn);
+
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Replication",
+				 "promote LSN out of queue range "
+				 "(split brain)");
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+/** Chain handlers, indexed by enum filter_chain. */
+static int (*filter_req[FILTER_MAX])
+(struct txn_limbo *limbo, const struct synchro_request *req) = {
+	[FILTER_IN]		= filter_in,
+	[FILTER_CONFIRM]	= filter_confirm_rollback,
+	[FILTER_ROLLBACK]	= filter_confirm_rollback,
+	[FILTER_PROMOTE]	= filter_promote_demote,
+	[FILTER_DEMOTE]		= filter_promote_demote,
+};
+
+int
+txn_limbo_filter_locked(struct txn_limbo *limbo,
+			const struct synchro_request *req)
+{
+	unsigned int mask = (1u << FILTER_IN);
+	unsigned int pos = 0;
+
+	assert(latch_is_locked(&limbo->promote_latch));
+
+#ifndef NDEBUG
+	say_info("limbo: filter %s replica_id %u origin_id %u "
+		 "term %lld lsn %lld, queue owner_id %u len %lld "
+		 "confirmed_lsn %lld (%s)",
+		 iproto_type_name(req->type),
+		 req->replica_id, req->origin_id,
+		 (long long)req->term, (long long)req->lsn,
+		 limbo->owner_id, (long long)limbo->len,
+		 (long long)limbo->confirmed_lsn,
+		 limbo->is_filtering ? "on" : "off");
+#endif
+
+	if (!limbo->is_filtering)
+		return 0;
+
+	switch (req->type) {
+	case IPROTO_CONFIRM:
+		mask |= (1u << FILTER_CONFIRM);
+		break;
+	case IPROTO_ROLLBACK:
+		mask |= (1u << FILTER_ROLLBACK);
+		break;
+	case IPROTO_PROMOTE:
+		mask |= (1u << FILTER_PROMOTE);
+		break;
+	case IPROTO_DEMOTE:
+		mask |= (1u << FILTER_DEMOTE);
+		break;
+	default:
+		say_info("RAFT: rejecting unexpected %d "
+			 "request from instance id %u "
+			 "for term %llu.",
+			 req->type, req->origin_id,
+			 (unsigned long long)req->term);
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "Replication",
+			 "unexpected request type");
+		return -1;
+	}
+
+	/* Run every selected chain, lowest bit first; first reject wins. */
+	for (; mask != 0; pos++, mask >>= 1) {
+		if ((mask & 1) == 0)
+			continue;
+		assert(pos < lengthof(filter_req));
+		assert(filter_req[pos] != NULL);
+		if (filter_req[pos](limbo, req) != 0)
+			return -1;
+	}
+
+	return 0;
+}
+
 void
 txn_limbo_process_locked(struct txn_limbo *limbo,
 			 const struct synchro_request *req)
@@ -732,71 +1029,46 @@ txn_limbo_process_locked(struct txn_limbo *limbo,
 
 	uint64_t term = req->term;
 	uint32_t origin = req->origin_id;
+
 	if (txn_limbo_replica_term_locked(limbo, origin) < term) {
 		vclock_follow(&limbo->promote_term_map, origin, term);
 		if (term > limbo->promote_greatest_term)
 			limbo->promote_greatest_term = term;
-	} else if (iproto_type_is_promote_request(req->type) &&
-		   limbo->promote_greatest_term > 1) {
-		/* PROMOTE for outdated term. Ignore. */
-		say_info("RAFT: ignoring %s request from instance "
-			 "id %u for term %llu. Greatest term seen "
-			 "before (%llu) is bigger.",
-			 iproto_type_name(req->type), origin, (long long)term,
-			 (long long)limbo->promote_greatest_term);
-		return;
 	}
 
-	int64_t lsn = req->lsn;
-	if (req->replica_id == REPLICA_ID_NIL) {
-		/*
-		 * The limbo was empty on the instance issuing the request.
-		 * This means this instance must empty its limbo as well.
-		 */
-		assert(lsn == 0 && iproto_type_is_promote_request(req->type));
-	} else if (req->replica_id != limbo->owner_id) {
-		/*
-		 * Ignore CONFIRM/ROLLBACK messages for a foreign master.
-		 * These are most likely outdated messages for already confirmed
-		 * data from an old leader, who has just started and written
-		 * confirm right on synchronous transaction recovery.
-		 */
-		if (!iproto_type_is_promote_request(req->type))
-			return;
-		/*
-		 * Promote has a bigger term, and tries to steal the limbo. It
-		 * means it probably was elected with a quorum, and it makes no
-		 * sense to wait here for confirmations. The other nodes already
-		 * elected a new leader. Rollback all the local txns.
-		 */
-		lsn = 0;
-	}
 	switch (req->type) {
 	case IPROTO_CONFIRM:
-		txn_limbo_read_confirm(limbo, lsn);
+		txn_limbo_read_confirm(limbo, req->lsn);
 		break;
 	case IPROTO_ROLLBACK:
-		txn_limbo_read_rollback(limbo, lsn);
+		txn_limbo_read_rollback(limbo, req->lsn);
 		break;
 	case IPROTO_PROMOTE:
-		txn_limbo_read_promote(limbo, req->origin_id, lsn);
+		txn_limbo_read_promote(limbo, req->origin_id, req->lsn);
 		break;
 	case IPROTO_DEMOTE:
-		txn_limbo_read_demote(limbo, lsn);
+		txn_limbo_read_demote(limbo, req->lsn);
 		break;
 	default:
-		unreachable();
+		panic("limbo: unexpected request type %d",
+		      req->type);
+		break;
 	}
-	return;
 }
 
-void
+int
 txn_limbo_process(struct txn_limbo *limbo,
 		  const struct synchro_request *req)
 {
+	int rc;
+
 	txn_limbo_term_lock(limbo);
-	txn_limbo_process_locked(limbo, req);
+	rc = txn_limbo_filter_locked(limbo, req);
+	if (rc == 0)
+		txn_limbo_process_locked(limbo, req);
 	txn_limbo_term_unlock(limbo);
+
+	return rc;
 }
 
 void
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index c77c501e9..de33037f5 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -184,6 +184,14 @@ struct txn_limbo {
 	 * by the 'reversed rollback order' rule - contradiction.
 	 */
 	bool is_in_rollback;
+	/**
+	 * Whether the limbo should filter incoming requests.
+	 * During local recovery from the WAL and during the applier's
+	 * join phase we fully trust the incoming data, because it
+	 * forms the initial limbo state, so requests must not be
+	 * filtered out there.
+	 */
+	bool is_filtering;
 };
 
 /**
@@ -333,15 +341,38 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
+/**
+ * Verify if the request is valid for processing.
+ */
+int
+txn_limbo_filter_locked(struct txn_limbo *limbo,
+			const struct synchro_request *req);
+
 /** Execute a synchronous replication request. */
 void
 txn_limbo_process_locked(struct txn_limbo *limbo,
 			 const struct synchro_request *req);
 
 /** Lock limbo terms and execute a synchronous replication request. */
-void
+int
 txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
 
+/** Turn synchro request filtering on (sets is_filtering) and log it. */
+static inline void
+txn_limbo_filter_enable(struct txn_limbo *limbo)
+{
+	limbo->is_filtering = true;
+	say_info("limbo: filter enabled");
+}
+
+/** Turn synchro request filtering off (clears is_filtering) and log it. */
+static inline void
+txn_limbo_filter_disable(struct txn_limbo *limbo)
+{
+	limbo->is_filtering = false;
+	say_info("limbo: filter disabled");
+}
+
 /**
  * Waiting for confirmation of all "sync" transactions
  * during confirm timeout or fail.
-- 
2.31.1



More information about the Tarantool-patches mailing list