[Tarantool-patches] [PATCH 6/9] raft: keep track of greatest known term and filter replication sources based on that

Serge Petrenko sergepetrenko at tarantool.org
Sun Apr 11 20:56:01 MSK 2021


Start writing the actual leader term together with the PROMOTE request
and process terms in PROMOTE requests on receiver side.

Make applier only apply synchronous transactions from the instance which
has the greatest term as received in PROMOTE requests.

Closes #5445
---
 ...very => qsync-multi-statement-recovery.md} |  0
 changelogs/unreleased/raft-promote.md         |  4 +++
 src/box/applier.cc                            | 18 +++++++++++
 src/box/box.cc                                | 15 ++++++----
 src/lib/raft/raft.c                           |  1 +
 src/lib/raft/raft.h                           | 30 +++++++++++++++++++
 6 files changed, 63 insertions(+), 5 deletions(-)
 rename changelogs/unreleased/{qsync-multi-statement-recovery => qsync-multi-statement-recovery.md} (100%)
 create mode 100644 changelogs/unreleased/raft-promote.md

diff --git a/changelogs/unreleased/qsync-multi-statement-recovery b/changelogs/unreleased/qsync-multi-statement-recovery.md
similarity index 100%
rename from changelogs/unreleased/qsync-multi-statement-recovery
rename to changelogs/unreleased/qsync-multi-statement-recovery.md
diff --git a/changelogs/unreleased/raft-promote.md b/changelogs/unreleased/raft-promote.md
new file mode 100644
index 000000000..e5dac599c
--- /dev/null
+++ b/changelogs/unreleased/raft-promote.md
@@ -0,0 +1,4 @@
+## bugfix/replication
+
+* Fixed a bug in synchronous replication where rolled-back transactions could
+  reappear once a sufficiently outdated instance reconnected (gh-5445).
diff --git a/src/box/applier.cc b/src/box/applier.cc
index e8cbbe27a..926d2f7ea 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -849,6 +849,9 @@ apply_synchro_row(struct xrow_header *row)
 
 	txn_limbo_process(&txn_limbo, &req);
 
+	if (req.type == IPROTO_PROMOTE)
+		raft_source_update_term(box_raft(), req.origin_id, req.term);
+
 	struct synchro_entry *entry;
 	entry = synchro_entry_new(row, &req);
 	if (entry == NULL)
@@ -1044,6 +1047,21 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		}
 	}
 
+	/*
+	 * All the synchronous rows coming from outdated instances are ignored
+	 * and replaced with NOPs to save vclock consistency.
+	 */
+	struct applier_tx_row *item;
+	if (raft_source_has_outdated_term(box_raft(), applier->instance_id) &&
+	    (last_row->wait_sync ||
+	     (iproto_type_is_synchro_request(first_row->type) &&
+	     !iproto_type_is_promote_request(first_row->type)))) {
+		stailq_foreach_entry(item, rows, next) {
+			struct xrow_header *row = &item->row;
+			row->type = IPROTO_NOP;
+			row->bodycnt = 0;
+		}
+	}
 	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
 		/*
 		 * Synchro messages are not transactions, in terms
diff --git a/src/box/box.cc b/src/box/box.cc
index b093341d3..9b6323b3f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -426,6 +426,10 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
 		return -1;
 	}
 	txn_limbo_process(&txn_limbo, &syn_req);
+	if (syn_req.type == IPROTO_PROMOTE) {
+		raft_source_update_term(box_raft(), syn_req.origin_id,
+					syn_req.term);
+	}
 	return 0;
 }
 
@@ -1556,11 +1560,10 @@ box_clear_synchro_queue(bool try_wait)
 			rc = -1;
 		} else {
 promote:
-			/*
-			 * Term parameter is unused now, We'll pass
-			 * box_raft()->term there later.
-			 */
-			txn_limbo_write_promote(&txn_limbo, wait_lsn, 0);
+			/* We cannot possibly get here in a volatile state. */
+			assert(box_raft()->volatile_term == box_raft()->term);
+			txn_limbo_write_promote(&txn_limbo, wait_lsn,
+						box_raft()->term);
 			struct synchro_request req = {
 				.type = 0, /* unused */
 				.replica_id = 0, /* unused */
@@ -1570,6 +1573,8 @@ promote:
 			};
 			txn_limbo_read_promote(&txn_limbo, &req);
 			assert(txn_limbo_is_empty(&txn_limbo));
+			raft_source_update_term(box_raft(), req.origin_id,
+						box_raft()->term);
 		}
 	}
 	in_clear_synchro_queue = false;
diff --git a/src/lib/raft/raft.c b/src/lib/raft/raft.c
index 4ea4fc3f8..e9ce8cade 100644
--- a/src/lib/raft/raft.c
+++ b/src/lib/raft/raft.c
@@ -985,6 +985,7 @@ raft_create(struct raft *raft, const struct raft_vtab *vtab)
 		.death_timeout = 5,
 		.vtab = vtab,
 	};
+	vclock_create(&raft->term_map);
 	raft_ev_timer_init(&raft->timer, raft_sm_schedule_new_election_cb,
 			   0, 0);
 	raft->timer.data = raft;
diff --git a/src/lib/raft/raft.h b/src/lib/raft/raft.h
index e447f6634..cba45a67d 100644
--- a/src/lib/raft/raft.h
+++ b/src/lib/raft/raft.h
@@ -207,6 +207,19 @@ struct raft {
 	 * subsystems, such as Raft.
 	 */
 	const struct vclock *vclock;
+	/**
+	 * The biggest term seen by this instance and persisted in WAL as part
+	 * of a PROMOTE request. May be smaller than @a term while elections
+	 * are ongoing, or when the leader is already known but this instance
+	 * hasn't read its PROMOTE request yet.
+	 * At all other times it must be equal to @a term.
+	 */
+	uint64_t greatest_known_term;
+	/**
+	 * Latest terms received with PROMOTE entries from remote instances.
+	 * Raft uses them to determine data from which sources may be applied.
+	 */
+	struct vclock term_map;
 	/** State machine timed event trigger. */
 	struct ev_timer timer;
 	/** Configured election timeout in seconds. */
@@ -243,6 +256,18 @@ raft_is_source_allowed(const struct raft *raft, uint32_t source_id)
 	return !raft->is_enabled || raft->leader == source_id;
 }
 
+/**
+ * Check whether the latest term recorded for @a source_id in the term map
+ * is smaller than the greatest term this instance knows of. Always false
+ * while Raft is disabled.
+ */
+static inline bool
+raft_source_has_outdated_term(const struct raft *raft, uint32_t source_id)
+{
+	uint64_t source_term = vclock_get(&raft->term_map, source_id);
+	return raft->is_enabled && source_term < raft->greatest_known_term;
+}
+
 /** Check if Raft is enabled. */
 static inline bool
 raft_is_enabled(const struct raft *raft)
@@ -250,6 +270,22 @@ raft_is_enabled(const struct raft *raft)
 	return raft->is_enabled;
 }
 
+/**
+ * Remember @a term as the latest term seen from @a source_id. Terms not
+ * greater than the one already recorded for the source are ignored. The
+ * greatest known term is raised when @a term exceeds it.
+ */
+static inline void
+raft_source_update_term(struct raft *raft, uint32_t source_id, uint64_t term)
+{
+	/* The term map only moves forward: skip stale or repeated terms. */
+	if ((uint64_t) vclock_get(&raft->term_map, source_id) >= term)
+		return;
+	vclock_follow(&raft->term_map, source_id, term);
+	if (term > raft->greatest_known_term)
+		raft->greatest_known_term = term;
+}
+
 /** Process a raft entry stored in WAL/snapshot. */
 void
 raft_process_recovery(struct raft *raft, const struct raft_msg *req);
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list