[Tarantool-patches] [PATCH 2/2] box: introduce a cfg handle to become synchro leader

Serge Petrenko sergepetrenko at tarantool.org
Sun Jul 5 00:55:49 MSK 2020


Introduce the replication_synchro_leader option to box.cfg.

Once an instance is promoted to leader, it makes sure that txn_limbo is
free of the previous leader's transactions. To achieve this, the instance
first waits for two replication_synchro_timeout intervals so that
confirmations and rollbacks from the former leader reach it.

If the limbo remains non-empty, the new leader determines which
transactions should be confirmed and which should be rolled back. To do
so, the instance scans the vclocks of all the instances that replicate
from it, together with its own vclock, and finds the greatest LSN of the
former leader that has been reached by replication_synchro_quorum
instances.

Then the instance writes the appropriate CONFIRM and ROLLBACK entries.
After these actions the limbo must be empty, and the instance may
proceed with appending its own entries to the limbo.

Closes #4849
---
 src/box/box.cc           | 79 ++++++++++++++++++++++++++++++++++++++++
 src/box/box.h            |  1 +
 src/box/lua/cfg.cc       |  9 +++++
 src/box/lua/load_cfg.lua |  4 ++
 src/box/txn_limbo.c      | 16 +-------
 src/box/txn_limbo.h      | 15 ++++++++
 6 files changed, 110 insertions(+), 14 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ca24b98ca..087710383 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
 #include "sequence.h"
 #include "sql_stmt_cache.h"
 #include "msgpack.h"
+#include "trivia/util.h"
 
 static char status[64] = "unknown";
 
@@ -945,6 +946,129 @@ box_set_replication_anon(void)
 
 }
 
+/**
+ * Apply the value of box.cfg.replication_synchro_leader.
+ *
+ * Nothing is done when the option is false or when txn_limbo
+ * is not owned by another instance. Otherwise the limbo is
+ * cleared of the former leader's transactions: first the
+ * instance waits up to two confirm timeouts for the former
+ * leader's CONFIRM/ROLLBACK entries to arrive, then it writes
+ * CONFIRM for the entries whose LSN has been reached by
+ * replication_synchro_quorum instances and ROLLBACK for the
+ * first entry that has not.
+ *
+ * If writing CONFIRM or ROLLBACK fails, the error is logged
+ * and the unprocessed limbo entries are left in place.
+ */
+void
+box_set_replication_synchro_leader(void)
+{
+	bool is_leader = cfg_geti("replication_synchro_leader");
+	/*
+	 * For now no actions required when an instance stops
+	 * being a leader. We should probably wait until txn_limbo
+	 * becomes empty.
+	 */
+	if (!is_leader)
+		return;
+	uint32_t former_leader_id = txn_limbo.instance_id;
+	if (former_leader_id == REPLICA_ID_NIL ||
+	    former_leader_id == instance_id) {
+		return;
+	}
+
+	/* Wait until pending confirmations/rollbacks reach us. */
+	double timeout = 2 * txn_limbo_confirm_timeout(&txn_limbo);
+	double start_tm = fiber_time();
+	while (!txn_limbo_is_empty(&txn_limbo)) {
+		if (fiber_time() - start_tm > timeout)
+			break;
+		fiber_sleep(0.001);
+	}
+	if (txn_limbo_is_empty(&txn_limbo))
+		return;
+
+	/*
+	 * Collect the former leader's LSN as seen by every
+	 * connected non-anonymous replica and by this instance.
+	 * The array bound is checked before every store rather
+	 * than after it, when the overflow has already happened.
+	 */
+	int64_t lsns[VCLOCK_MAX];
+	int len = 0;
+	replicaset_foreach(replica) {
+		if (replica->relay != NULL &&
+		    relay_get_state(replica->relay) != RELAY_OFF &&
+		    !replica->anon) {
+			assert(!tt_uuid_is_equal(&INSTANCE_UUID,
+						 &replica->uuid));
+			assert(len < VCLOCK_MAX);
+			const struct vclock *vclock =
+				relay_vclock(replica->relay);
+			lsns[len++] = vclock_get(vclock, former_leader_id);
+		}
+	}
+	assert(len < VCLOCK_MAX);
+	lsns[len++] = vclock_get(box_vclock, former_leader_id);
+
+	/*
+	 * After an ascending sort lsns[len - quorum] is the
+	 * greatest LSN reached by at least quorum instances.
+	 * With fewer instances than quorum nothing is confirmed.
+	 */
+	int64_t confirm_lsn = 0;
+	if (len >= replication_synchro_quorum) {
+		qsort(lsns, len, sizeof(int64_t), cmp_i64);
+		confirm_lsn = lsns[len - replication_synchro_quorum];
+	}
+
+	/*
+	 * Only entries waiting for acks (TXN_WAIT_ACK) matter:
+	 * find the last one covered by confirm_lsn and the first
+	 * one that is not.
+	 */
+	struct txn_limbo_entry *e, *last_quorum = NULL;
+	struct txn_limbo_entry *rollback = NULL;
+	rlist_foreach_entry(e, &txn_limbo.queue, in_queue) {
+		if (txn_has_flag(e->txn, TXN_WAIT_ACK)) {
+			if (e->lsn <= confirm_lsn) {
+				last_quorum = e;
+			} else {
+				rollback = e;
+				break;
+			}
+		}
+	}
+
+	/*
+	 * Apply CONFIRM/ROLLBACK locally only once the record is
+	 * written, so the in-memory limbo never runs ahead of WAL.
+	 */
+	if (last_quorum != NULL) {
+		confirm_lsn = last_quorum->lsn;
+		if (txn_limbo_write_confirm(&txn_limbo, confirm_lsn) != 0) {
+			say_error("failed to write CONFIRM while becoming "
+				  "synchro leader");
+			diag_log();
+			return;
+		}
+		txn_limbo_read_confirm(&txn_limbo, confirm_lsn);
+	}
+	if (rollback != NULL) {
+		if (txn_limbo_write_rollback(&txn_limbo,
+					     rollback->lsn) != 0) {
+			say_error("failed to write ROLLBACK while becoming "
+				  "synchro leader");
+			diag_log();
+			return;
+		}
+		txn_limbo_read_rollback(&txn_limbo, rollback->lsn - 1);
+	}
+
+	assert(txn_limbo_is_empty(&txn_limbo));
+}
+
 void
 box_listen(void)
 {
diff --git a/src/box/box.h b/src/box/box.h
index f9789154e..565b0ebce 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -245,6 +245,7 @@ void box_set_replication_connect_quorum(void);
 void box_set_replication_sync_lag(void);
 int box_set_replication_synchro_quorum(void);
 int box_set_replication_synchro_timeout(void);
+void box_set_replication_synchro_leader(void);
 void box_set_replication_sync_timeout(void);
 void box_set_replication_skip_conflict(void);
 void box_set_replication_anon(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index d481155cd..adc1fcf3f 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -329,6 +329,19 @@ lbox_cfg_set_replication_synchro_timeout(struct lua_State *L)
 	return 0;
 }
 
+/**
+ * Lua binding for box.cfg.replication_synchro_leader: applies
+ * the configured value via box_set_replication_synchro_leader().
+ * The Lua stack is not used; no results are returned to Lua.
+ */
+static int
+lbox_cfg_set_replication_synchro_leader(struct lua_State *L)
+{
+	(void) L;
+	box_set_replication_synchro_leader();
+	return 0;
+}
+
 static int
 lbox_cfg_set_replication_sync_timeout(struct lua_State *L)
 {
@@ -388,6 +401,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_replication_sync_lag", lbox_cfg_set_replication_sync_lag},
 		{"cfg_set_replication_synchro_quorum", lbox_cfg_set_replication_synchro_quorum},
 		{"cfg_set_replication_synchro_timeout", lbox_cfg_set_replication_synchro_timeout},
+		{"cfg_set_replication_synchro_leader", lbox_cfg_set_replication_synchro_leader},
 		{"cfg_set_replication_sync_timeout", lbox_cfg_set_replication_sync_timeout},
 		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
 		{"cfg_set_replication_anon", lbox_cfg_set_replication_anon},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 107bc1582..9a968f30e 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -91,6 +91,7 @@ local default_cfg = {
     replication_sync_timeout = 300,
     replication_synchro_quorum = 1,
     replication_synchro_timeout = 5,
+    replication_synchro_leader = false,
     replication_connect_timeout = 30,
     replication_connect_quorum = nil, -- connect all
     replication_skip_conflict = false,
@@ -168,6 +169,7 @@ local template_cfg = {
     replication_sync_timeout = 'number',
     replication_synchro_quorum = 'number',
     replication_synchro_timeout = 'number',
+    replication_synchro_leader = 'boolean',
     replication_connect_timeout = 'number',
     replication_connect_quorum = 'number',
     replication_skip_conflict = 'boolean',
@@ -286,6 +288,7 @@ local dynamic_cfg = {
     replication_sync_timeout = private.cfg_set_replication_sync_timeout,
     replication_synchro_quorum = private.cfg_set_replication_synchro_quorum,
     replication_synchro_timeout = private.cfg_set_replication_synchro_timeout,
+    replication_synchro_leader = private.cfg_set_replication_synchro_leader,
     replication_skip_conflict = private.cfg_set_replication_skip_conflict,
     replication_anon        = private.cfg_set_replication_anon,
     instance_uuid           = check_instance_uuid,
@@ -333,6 +336,7 @@ local dynamic_cfg_order = {
     -- the new one. This should be fixed when box.cfg is able to
     -- apply some parameters together and atomically.
     replication_anon        = 250,
+    replication_synchro_leader = 250,
 }
 
 local function sort_cfg_cb(l, r)
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 44a0c7273..992115ad1 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -148,9 +148,6 @@ txn_limbo_check_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	return entry->is_commit;
 }
 
-static int
-txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
-
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 {
@@ -261,11 +258,7 @@ rollback:
 	return -1;
 }
 
-/**
- * Write a confirmation entry to WAL. After it's written all the
- * transactions waiting for confirmation may be finished.
- */
-static int
+int
 txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
 	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
@@ -303,12 +296,7 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 	}
 }
 
-/**
- * Write a rollback message to WAL. After it's written
- * all the transactions following the current one and waiting
- * for confirmation must be rolled back.
- */
-static int
+int
 txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 {
 	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 3abbe9e85..5bf5827ac 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -205,6 +205,21 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
 int
 txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
 
+/**
+ * Write a confirmation entry to WAL. After it's written all the
+ * transactions waiting for confirmation may be finished.
+ */
+int
+txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn);
+
+/**
+ * Write a rollback message to WAL. After it's written
+ * all the transactions following the current one and waiting
+ * for confirmation must be rolled back.
+ */
+int
+txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn);
+
 /**
  * Confirm all the entries up to the given master's LSN.
  */
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list