[Tarantool-patches] [PATCH] limbo: introduce request processing hooks

Cyrill Gorcunov gorcunov at gmail.com
Sun Jul 11 01:28:03 MSK 2021


Guys, this is an early RFC since I would like to discuss the
design first before going further. Currently we don't interrupt
incoming synchro requests, which doesn't allow us to detect a cluster
split-brain situation. As we discussed verbally, there are
a number of signs to detect it, and we need to stop receiving data
from obsolete nodes.

The main problem though is that such filtering of incoming packets
should happen at the moment where we still can take a step back and
inform the peer that the data has been declined, but currently our
applier code processes synchro requests inside a WAL trigger, i.e. when
the data is already applied or being rolled back.

Thus we need to separate the "filter" and "apply" stages of processing.
What is more interesting is that we filter incoming requests via an
in-memory vclock and update it immediately. Thus the following situation
is possible -- a promote request comes in, we remember it inside
promote_term_map, but then the write to WAL fails and we never revert
the promote_term_map variable, thus the other peer won't be able to
resend us this promote request because now we think that we've
already applied the promote.

To solve this I split the processing routine into stages:

 - filter stage: we inspect in-flight packets and remember
   their terms in the @terms_infly vclock
 - apply stage: the data is verified and we try to apply the
   data and write it to the disk; once everything is fine
   we update the @terms_applied vclock, which serves as a backup
   of @terms_infly
 - error stage: data application failed and we should revert
   the @terms_infly vclock to the previous value

The stages are processed by the txn_limbo_apply routine, which takes
a mask of the stages it should execute. The old txn_limbo_process is
now simply an inline wrapper with the appropriate flags.

Please note that I _didn't_ put the complete conditions into
limbo_op_filter yet, I only moved the current code there.

So take a look once time permits, maybe there is some easier
approach to the code structure.

branch gorcunov/gh-6036-rollback-confirm-notest

Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
---
 src/box/applier.cc  |  13 +++-
 src/box/box.cc      |   6 +-
 src/box/txn_limbo.c | 149 ++++++++++++++++++++++++++++++++++++++------
 src/box/txn_limbo.h |  97 ++++++++++++++++++++++------
 4 files changed, 223 insertions(+), 42 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 978383e64..8a44bf1b2 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -458,7 +458,8 @@ applier_wait_snapshot(struct applier *applier)
 				struct synchro_request req;
 				if (xrow_decode_synchro(&row, &req) != 0)
 					diag_raise();
-				txn_limbo_process(&txn_limbo, &req);
+				if (txn_limbo_process(&txn_limbo, &req) != 0)
+					diag_raise();
 			} else if (iproto_type_is_raft_request(row.type)) {
 				struct raft_request req;
 				if (xrow_decode_raft(&row, &req, NULL) != 0)
@@ -850,11 +851,16 @@ apply_synchro_row_cb(struct journal_entry *entry)
 	assert(entry->complete_data != NULL);
 	struct synchro_entry *synchro_entry =
 		(struct synchro_entry *)entry->complete_data;
+	struct synchro_request *req = synchro_entry->req;
+
 	if (entry->res < 0) {
+		txn_limbo_apply(&txn_limbo, req,
+				LIMBO_OP_ERROR | LIMBO_OP_ATTR_PANIC);
 		applier_rollback_by_wal_io(entry->res);
 	} else {
 		replica_txn_wal_write_cb(synchro_entry->rcb);
-		txn_limbo_process(&txn_limbo, synchro_entry->req);
+		txn_limbo_apply(&txn_limbo, synchro_entry->req,
+				LIMBO_OP_APPLY | LIMBO_OP_ATTR_PANIC);
 		trigger_run(&replicaset.applier.on_wal_write, NULL);
 	}
 	fiber_wakeup(synchro_entry->owner);
@@ -870,6 +876,9 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
 	if (xrow_decode_synchro(row, &req) != 0)
 		goto err;
 
+	if (txn_limbo_apply(&txn_limbo, &req, LIMBO_OP_FILTER) != 0)
+		goto err;
+
 	struct replica_cb_data rcb_data;
 	struct synchro_entry entry;
 	/*
diff --git a/src/box/box.cc b/src/box/box.cc
index bc68ee4c8..79bacfa08 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -428,7 +428,8 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
 		say_error("couldn't decode a synchro request");
 		return -1;
 	}
-	txn_limbo_process(&txn_limbo, &syn_req);
+	if (txn_limbo_process(&txn_limbo, &syn_req) != 0)
+		return -1;
 	return 0;
 }
 
@@ -1701,7 +1702,8 @@ box_clear_synchro_queue(bool demote)
 				.lsn = wait_lsn,
 				.term = term,
 			};
-			txn_limbo_process(&txn_limbo, &req);
+			if (txn_limbo_process(&txn_limbo, &req) != 0)
+				return -1;
 			assert(txn_limbo_is_empty(&txn_limbo));
 		}
 	}
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index d2e4dcb1e..0650702b9 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -37,6 +37,15 @@
 
 struct txn_limbo txn_limbo;
 
+static void
+txn_limbo_promote_create(struct txn_limbo_promote *pmt)
+{
+	vclock_create(&pmt->terms_applied);
+	vclock_create(&pmt->terms_infly);
+	pmt->term_max = 0;
+	pmt->term_max_infly = 0;
+}
+
 static inline void
 txn_limbo_create(struct txn_limbo *limbo)
 {
@@ -45,11 +54,11 @@ txn_limbo_create(struct txn_limbo *limbo)
 	limbo->owner_id = REPLICA_ID_NIL;
 	fiber_cond_create(&limbo->wait_cond);
 	vclock_create(&limbo->vclock);
-	vclock_create(&limbo->promote_term_map);
-	limbo->promote_greatest_term = 0;
 	limbo->confirmed_lsn = 0;
 	limbo->rollback_count = 0;
 	limbo->is_in_rollback = false;
+
+	txn_limbo_promote_create(&limbo->promote);
 }
 
 bool
@@ -305,10 +314,12 @@ void
 txn_limbo_checkpoint(const struct txn_limbo *limbo,
 		     struct synchro_request *req)
 {
+	const struct txn_limbo_promote *pmt = &limbo->promote;
+
 	req->type = IPROTO_PROMOTE;
 	req->replica_id = limbo->owner_id;
 	req->lsn = limbo->confirmed_lsn;
-	req->term = limbo->promote_greatest_term;
+	req->term = pmt->term_max_infly;
 }
 
 void
@@ -696,27 +707,38 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
 	return 0;
 }
 
-void
-txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
+static int
+limbo_op_filter(struct txn_limbo *limbo, const struct synchro_request *req)
 {
-	uint64_t term = req->term;
+	struct txn_limbo_promote *pmt = &limbo->promote;
 	uint32_t origin = req->origin_id;
+	uint64_t term = req->term;
+
 	if (txn_limbo_replica_term(limbo, origin) < term) {
-		vclock_follow(&limbo->promote_term_map, origin, term);
-		if (term > limbo->promote_greatest_term)
-			limbo->promote_greatest_term = term;
+		vclock_follow(&pmt->terms_infly, origin, term);
+		if (term > pmt->term_max_infly)
+			pmt->term_max_infly = term;
 	} else if (iproto_type_is_promote_request(req->type) &&
-		   limbo->promote_greatest_term > 1) {
-		/* PROMOTE for outdated term. Ignore. */
-		say_info("RAFT: ignoring %s request from instance "
+		   pmt->term_max_infly > 1) {
+		say_info("RAFT: declining %s request from instance "
 			 "id %u for term %llu. Greatest term seen "
 			 "before (%llu) is bigger.",
-			 iproto_type_name(req->type), origin, (long long)term,
-			 (long long)limbo->promote_greatest_term);
-		return;
+			 iproto_type_name(req->type), origin,
+			 (long long)term,
+			 (long long)pmt->term_max_infly);
+		diag_set(ClientError, ER_UNSUPPORTED, "RAFT",
+			 "backward terms");
+		return -1;
 	}
 
+	return 0;
+}
+
+static int
+limbo_op_apply(struct txn_limbo *limbo, const struct synchro_request *req)
+{
 	int64_t lsn = req->lsn;
+
 	if (req->replica_id == REPLICA_ID_NIL) {
 		/*
 		 * The limbo was empty on the instance issuing the request.
@@ -731,7 +753,7 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
 		 * confirm right on synchronous transaction recovery.
 		 */
 		if (!iproto_type_is_promote_request(req->type))
-			return;
+			goto out;
 		/*
 		 * Promote has a bigger term, and tries to steal the limbo. It
 		 * means it probably was elected with a quorum, and it makes no
@@ -740,6 +762,7 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
 		 */
 		lsn = 0;
 	}
+
 	switch (req->type) {
 	case IPROTO_CONFIRM:
 		txn_limbo_read_confirm(limbo, lsn);
@@ -754,9 +777,99 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
 		txn_limbo_read_demote(limbo, lsn);
 		break;
 	default:
-		unreachable();
+		panic("limbo: unreacheable stage detected");
+		break;
 	}
-	return;
+
+
+out:
+	struct txn_limbo_promote *pmt = &limbo->promote;
+	uint32_t replica_id = req->origin_id;
+	uint64_t term = req->term;
+
+	uint64_t v = vclock_get(&pmt->terms_applied, replica_id);
+	if (v < term) {
+		vclock_follow(&pmt->terms_applied, replica_id, term);
+		if (term > pmt->term_max)
+			pmt->term_max = term;
+	}
+	return 0;
+}
+
+static int
+limbo_op_error(struct txn_limbo *limbo, const struct synchro_request *req)
+{
+	struct txn_limbo_promote *pmt = &limbo->promote;
+	uint32_t replica_id = req->origin_id;
+	/*
+	 * Restore to the applied value in case of error,
+	 * this will allow to reapply the entry when remote
+	 * node get error and try to resend data.
+	 */
+	uint64_t v = vclock_get(&pmt->terms_applied, replica_id);
+	vclock_reset(&pmt->terms_infly, replica_id, v);
+
+	/*
+	 * The max value has to be recalculated in a linear
+	 * form, the errors should not happen frequently so
+	 * this is not a hot path.
+	 */
+	int64_t maxv = 0;
+	struct vclock_iterator it;
+	vclock_iterator_init(&it, &pmt->terms_infly);
+	vclock_foreach(&it, r)
+		maxv = r.lsn > maxv ? r.lsn : maxv;
+	pmt->term_max_infly = maxv;
+	return 0;
+}
+
+static int (*limbo_apply_ops[LIMBO_OP_MAX])
+	(struct txn_limbo *limbo, const struct synchro_request *req) = {
+	[LIMBO_OP_FILTER_BIT]	= limbo_op_filter,
+	[LIMBO_OP_APPLY_BIT]	= limbo_op_apply,
+	[LIMBO_OP_ERROR_BIT]	= limbo_op_error,
+};
+
+static const char *
+limbo_apply_op_str(unsigned int bit)
+{
+	static const char *str[] = {
+		[LIMBO_OP_FILTER_BIT]	= "LIMBO_OP_FILTER",
+		[LIMBO_OP_APPLY_BIT]	= "LIMBO_OP_APPLY",
+		[LIMBO_OP_ERROR_BIT]	= "LIMBO_OP_ERROR",
+	};
+
+	if (bit < lengthof(limbo_apply_ops))
+		return str[bit];
+
+	return "UNKNOWN";
+}
+
+int
+txn_limbo_apply(struct txn_limbo *limbo,
+		const struct synchro_request *req,
+		unsigned int op_mask)
+{
+	unsigned int mask = op_mask & LIMBO_OP_MASK;
+	unsigned int bit = LIMBO_OP_MIN;
+
+	while (mask) {
+		if (mask & 1) {
+			if (limbo_apply_ops[bit] == NULL)
+				panic("limbo: empty apply operation");
+			say_info("limbo: apply operation %s",
+				 limbo_apply_op_str(bit));
+			if (limbo_apply_ops[bit](limbo, req) != 0) {
+				if (op_mask & LIMBO_OP_ATTR_PANIC)
+					panic("limbo: panicing op");
+				return -1;
+			}
+		}
+		mask >>= 1;
+		bit++;
+	};
+
+	return 0;
 }
 
 void
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 8e7315947..b93450f65 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -75,6 +75,40 @@ txn_limbo_entry_is_complete(const struct txn_limbo_entry *e)
 	return e->is_commit || e->is_rollback;
 }
 
+/**
+ * To keep state of promote requests to handle split-brain
+ * situation and other errors.
+ */
+struct txn_limbo_promote {
+	/**
+	 * Latest terms received with PROMOTE entries from remote instances.
+	 * Limbo uses them to filter out the transactions coming not from the
+	 * limbo owner, but so outdated that they are rolled back everywhere
+	 * except outdated nodes.
+	 */
+	struct vclock terms_applied;
+	/**
+	 * Infly representation of @a terms_applied, the term might
+	 * be not yet written to WAL but already seen.
+	 */
+	struct vclock terms_infly;
+	/**
+	 * The biggest PROMOTE term seen by the instance and persisted in WAL.
+	 * It is related to raft term, but not the same. Synchronous replication
+	 * represented by the limbo is interested only in the won elections
+	 * ended with PROMOTE request.
+	 * It means the limbo's term might be smaller than the raft term, while
+	 * there are ongoing elections, or the leader is already known and this
+	 * instance hasn't read its PROMOTE request yet. During other times the
+	 * limbo and raft are in sync and the terms are the same.
+	 */
+	uint64_t term_max;
+	/**
+	 * Infly representation of @a term_max.
+	 */
+	uint64_t term_max_infly;
+};
+
 /**
  * Limbo is a place where transactions are stored, which are
  * finished, but not committed nor rolled back. These are
@@ -130,23 +164,9 @@ struct txn_limbo {
 	 */
 	struct vclock vclock;
 	/**
-	 * Latest terms received with PROMOTE entries from remote instances.
-	 * Limbo uses them to filter out the transactions coming not from the
-	 * limbo owner, but so outdated that they are rolled back everywhere
-	 * except outdated nodes.
+	 * To track PROMOTE requests.
 	 */
-	struct vclock promote_term_map;
-	/**
-	 * The biggest PROMOTE term seen by the instance and persisted in WAL.
-	 * It is related to raft term, but not the same. Synchronous replication
-	 * represented by the limbo is interested only in the won elections
-	 * ended with PROMOTE request.
-	 * It means the limbo's term might be smaller than the raft term, while
-	 * there are ongoing elections, or the leader is already known and this
-	 * instance hasn't read its PROMOTE request yet. During other times the
-	 * limbo and raft are in sync and the terms are the same.
-	 */
-	uint64_t promote_greatest_term;
+	struct txn_limbo_promote promote;
 	/**
 	 * Maximal LSN gathered quorum and either already confirmed in WAL, or
 	 * whose confirmation is in progress right now. Any attempt to confirm
@@ -218,7 +238,8 @@ txn_limbo_last_entry(struct txn_limbo *limbo)
 static inline uint64_t
 txn_limbo_replica_term(const struct txn_limbo *limbo, uint32_t replica_id)
 {
-	return vclock_get(&limbo->promote_term_map, replica_id);
+	const struct txn_limbo_promote *pmt = &limbo->promote;
+	return vclock_get(&pmt->terms_infly, replica_id);
 }
 
 /**
@@ -229,8 +250,9 @@ static inline bool
 txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
 			      uint32_t replica_id)
 {
+	const struct txn_limbo_promote *pmt = &limbo->promote;
 	return txn_limbo_replica_term(limbo, replica_id) <
-	       limbo->promote_greatest_term;
+	       pmt->term_max_infly;
 }
 
 /**
@@ -300,9 +322,44 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
+enum {
+	LIMBO_OP_MIN		= 0,
+	LIMBO_OP_FILTER_BIT	= 0,
+	LIMBO_OP_APPLY_BIT	= 1,
+	LIMBO_OP_ERROR_BIT	= 2,
+	LIMBO_OP_MAX,
+
+	LIMBO_OP_ATTR_MIN	= LIMBO_OP_MAX,
+	LIMBO_OP_ATTR_PANIC_BIT	= LIMBO_OP_ATTR_MIN + 1,
+	LIMBO_OP_ATTR_MAX,
+};
+
+enum {
+	LIMBO_OP_FILTER		= (1u << LIMBO_OP_FILTER_BIT),
+	LIMBO_OP_APPLY		= (1u << LIMBO_OP_APPLY_BIT),
+	LIMBO_OP_ERROR		= (1u << LIMBO_OP_ERROR_BIT),
+	LIMBO_OP_MASK		= (1u << LIMBO_OP_MAX) - 1,
+
+	LIMBO_OP_ATTR_PANIC	= (1u << LIMBO_OP_ATTR_PANIC_BIT),
+};
+
+/**
+ * Apply a synchronous replication request, the @a op_mask
+ * specifies stages to process.
+ */
+int
+txn_limbo_apply(struct txn_limbo *limbo,
+		const struct synchro_request *req,
+		unsigned int op_mask);
+
 /** Execute a synchronous replication request. */
-void
-txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
+static inline int
+txn_limbo_process(struct txn_limbo *limbo,
+		  const struct synchro_request *req)
+{
+	const int op_mask = LIMBO_OP_FILTER | LIMBO_OP_APPLY;
+	return txn_limbo_apply(limbo, req, op_mask);
+}
 
 /**
  * Waiting for confirmation of all "sync" transactions
-- 
2.31.1



More information about the Tarantool-patches mailing list