[Tarantool-patches] [PATCH rfc 1/1] replication: stop resetting existing connections

Olga Arkhangelskaia arkholga at tarantool.org
Thu Apr 2 16:29:48 MSK 2020


Every time the replication configuration changes, the replica resets all
connections, even those whose source is still present in the new
configuration. This slows down reconfiguration through ER_CFG errors from
the already existing relay.
The patch creates appliers only for sources that are actually new.

Closes #4669
Part of #4668
---
Branch:
https://github.com/tarantool/tarantool/tree/OKriw/gh-4669-Any-change-in-box.cfg-replication-list-resets-all-connections
 src/box/box.cc                    | 106 +++++++++++++++++++++++++-----
 src/box/box.h                     |   1 +
 src/box/replication.cc            |  56 +++++++++++-----
 src/box/replication.h             |   3 +-
 test/replication/misc.result      |   6 ++
 test/replication/misc.test.lua    |   2 +
 test/replication/quorum.result    |   3 +
 test/replication/quorum.test.lua  |   1 +
 test/replication/replica_self.lua |  11 ++++
 test/replication/self.result      |  68 +++++++++++++++++++
 test/replication/self.test.lua    |  25 +++++++
 11 files changed, 248 insertions(+), 34 deletions(-)
 create mode 100644 test/replication/replica_self.lua
 create mode 100644 test/replication/self.result
 create mode 100644 test/replication/self.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index 1b2b27d61..b23cf991c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -139,6 +139,12 @@ static struct fiber_pool tx_fiber_pool;
  */
 static struct cbus_endpoint tx_prio_endpoint;
 
+/**
+ * Array of replication sources whose appliers
+ * will be removed.
+ */
+char *remove_sources[VCLOCK_MAX-1];
+
 static int
 box_check_writable(void)
 {
@@ -666,11 +672,38 @@ box_check_config()
 	box_check_sql_cache_size(cfg_geti("sql_cache_size"));
 }
 
+/*
+ * Check if a source from new config is already in the replicaset.
+ */
+static bool source_exist(const char *source)
+{
+	replicaset_foreach(replica) {
+		if (replica->applier != NULL) {
+			if (strcmp(replica->applier->source, source) == 0)
+				return true;
+		}
+	}
+	return false;
+}
+
+/*
+ * Check whether replicaset source is in new configuration.
+ */
+ static bool source_left(int count, const char *source_old)
+{
+	for (int i = 0; i < count; i++) {
+		const char *source_new = cfg_getarr_elem("replication", i);
+		if(strcmp(source_new, source_old) == 0)
+			return true;
+	}
+	return false;
+}
+
 /*
  * Parse box.cfg.replication and create appliers.
  */
 static struct applier **
-cfg_get_replication(int *p_count)
+cfg_get_replication(int *p_count, int *new_count)
 {
 
 	/* Use static buffer for result */
@@ -681,21 +714,55 @@ cfg_get_replication(int *p_count)
 		tnt_raise(ClientError, ER_CFG, "replication",
 				"too many replicas");
 	}
-
-	for (int i = 0; i < count; i++) {
-		const char *source = cfg_getarr_elem("replication", i);
-		struct applier *applier = applier_new(source);
-		if (applier == NULL) {
+	/* Transition between anon and non-anon replicas is treated
+	 * separately.*/
+	if (box_check_replication_anon() != replication_anon) {
+		int j = 0;
+		for (int i = 0; i < count; i++) {
+			const char *source = cfg_getarr_elem("replication", i);
+			struct applier *applier = applier_new(source);
+			if (applier == NULL) {
 			/* Delete created appliers */
-			while (--i >= 0)
-				applier_delete(appliers[i]);
-			return NULL;
+				while (--j >= 0)
+					applier_delete(appliers[j]);
+				return NULL;
+			}
+			appliers[i] = applier; /* link to the list */
+			/* Anon replica should be removed.*/
+			remove_sources[i] = strdup(source);
+		}
+		*p_count = count;
+		*new_count = count;
+	} else {
+		int j = 0;
+		/* Create appliers only for new sources */
+		for (int i = 0; i < count; i++) {
+			const char *source = cfg_getarr_elem("replication", i);
+				if (!source_exist(source)) {
+					struct applier *applier = applier_new(source);
+				if (applier == NULL) {
+				/* Delete created appliers */
+					while (--j >= 0)
+						applier_delete(appliers[j]);
+					return NULL;
+				}
+				appliers[j] = applier; /* link to the list */
+				j++;
+			}
+		}
+		int i = 0;
+		/* List of appliers to be deleted */
+		replicaset_foreach(replica) {
+			if (replica->applier != NULL &&
+			    !source_left(count, replica->applier->source)) {
+				remove_sources[i] = strdup(replica->applier->source);
+				i++;
 		}
-		appliers[i] = applier; /* link to the list */
+	}	
+		/* new_count counts only the newly created appliers. */
+		*p_count = count;
+		*new_count = j;
 	}
-
-	*p_count = count;
-
 	return appliers;
 }
 
@@ -707,16 +774,18 @@ static void
 box_sync_replication(bool connect_quorum)
 {
 	int count = 0;
-	struct applier **appliers = cfg_get_replication(&count);
+	int new_count = 0;
+	struct applier **appliers = cfg_get_replication(&count, &new_count);
+
 	if (appliers == NULL)
 		diag_raise();
 
 	auto guard = make_scoped_guard([=]{
-		for (int i = 0; i < count; i++)
+		for (int i = 0; i < new_count; i++)
 			applier_delete(appliers[i]); /* doesn't affect diag */
 	});
 
-	replicaset_connect(appliers, count, connect_quorum);
+	replicaset_connect(appliers, count, connect_quorum, new_count);
 
 	guard.is_active = false;
 }
@@ -796,7 +865,6 @@ box_set_replication_anon(void)
 			replication_anon = !anon;
 		});
 		/* Turn anonymous instance into a normal one. */
-		replication_anon = anon;
 		/*
 		 * Reset all appliers. This will interrupt
 		 * anonymous follow they're in so that one of
@@ -804,6 +872,10 @@ box_set_replication_anon(void)
 		 * non-anonymous subscribe.
 		 */
 		box_sync_replication(false);
+		/* Turn anonymous instance into a normal one.
+		 * Check the config first, then set replication_anon.
+		 */
+		replication_anon = anon;
 		/*
 		 * Wait until the master has registered this
 		 * instance.
diff --git a/src/box/box.h b/src/box/box.h
index a212e6510..5babfd448 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -471,6 +471,7 @@ box_process_rw(struct request *request, struct space *space,
 int
 boxk(int type, uint32_t space_id, const char *format, ...);
 
+extern char *remove_sources[31];
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/replication.cc b/src/box/replication.cc
index e7bfa22ab..a71a6f0f0 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -482,12 +482,27 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
 	return 0;
 }
 
+static bool
+check_and_remove_sources(const char *source) {
+	for(int i = 0; i < VCLOCK_MAX - 1; i++) {
+		if (remove_sources[i] != NULL) {
+			if (strcmp(source, remove_sources[i]) == 0) {
+				free(remove_sources[i]);
+				remove_sources[i] = NULL;
+				return true;
+			}
+		}
+	}
+	return false;
+}
+
 /**
  * Update the replica set with new "applier" objects
  * upon reconfiguration of box.cfg.replication.
+ * count is the total number of appliers; new_count counts only the new ones.
  */
 static void
-replicaset_update(struct applier **appliers, int count)
+replicaset_update(struct applier **appliers, int count, int new_count)
 {
 	replica_hash_t uniq;
 	memset(&uniq, 0, sizeof(uniq));
@@ -505,7 +520,7 @@ replicaset_update(struct applier **appliers, int count)
 	});
 
 	/* Check for duplicate UUID */
-	for (int i = 0; i < count; i++) {
+	for (int i = 0; i < new_count; i++) {
 		applier = appliers[i];
 		replica = replica_new();
 		replica_set_applier(replica, applier);
@@ -540,7 +555,7 @@ replicaset_update(struct applier **appliers, int count)
 	 * apply the new configuration. Nothing can fail after this point.
 	 */
 
-	/* Prune old appliers */
+	/* Prune all appliers that haven't finished bootstrap yet.*/
 	while (!rlist_empty(&replicaset.anon)) {
 		replica = rlist_first_entry(&replicaset.anon,
 					    typeof(*replica), in_anon);
@@ -551,21 +566,25 @@ replicaset_update(struct applier **appliers, int count)
 		applier_stop(applier);
 		applier_delete(applier);
 	}
+	/* Drop appliers from the remove list and already stopped ones. */
 	replicaset_foreach(replica) {
 		if (replica->applier == NULL)
 			continue;
 		applier = replica->applier;
-		replica_clear_applier(replica);
-		replica->applier_sync_state = APPLIER_DISCONNECTED;
-		applier_stop(applier);
-		applier_delete(applier);
+		if (check_and_remove_sources(applier->source) || count == 0 ||
+		    applier->state == APPLIER_OFF) {
+			replica_clear_applier(replica);
+			replica->applier_sync_state = APPLIER_DISCONNECTED;
+			applier_stop(applier);
+			applier_delete(applier);
+		}
 	}
 
 	/* Save new appliers */
 	replicaset.applier.total = count;
 	replicaset.applier.connected = 0;
 	replicaset.applier.loading = 0;
-	replicaset.applier.synced = 0;
+	replicaset.applier.synced = count - new_count;
 
 	replica_hash_foreach_safe(&uniq, replica, next) {
 		replica_hash_remove(&uniq, replica);
@@ -640,15 +659,16 @@ applier_on_connect_f(struct trigger *trigger, void *event)
 
 void
 replicaset_connect(struct applier **appliers, int count,
-		   bool connect_quorum)
+		   bool connect_quorum, int new_count)
 {
 	if (count == 0) {
 		/* Cleanup the replica set. */
-		replicaset_update(appliers, count);
+		replicaset_update(appliers, count, new_count);
 		return;
 	}
 
-	say_info("connecting to %d replicas", count);
+	say_info("connected to %d replicas", count);
+	say_info("connecting to %d replicas", new_count);
 
 	if (!connect_quorum) {
 		/*
@@ -680,14 +700,15 @@ replicaset_connect(struct applier **appliers, int count,
 	struct applier_on_connect triggers[VCLOCK_MAX];
 
 	struct replicaset_connect_state state;
-	state.connected = state.failed = 0;
+	state.connected  = count - new_count;
+	state.failed = 0;
 	fiber_cond_create(&state.wakeup);
 
 	double timeout = replication_connect_timeout;
 	int quorum = MIN(count, replication_connect_quorum);
 
 	/* Add triggers and start simulations connection to remote peers */
-	for (int i = 0; i < count; i++) {
+	for (int i = 0; i < new_count; i++) {
 		struct applier *applier = appliers[i];
 		struct applier_on_connect *trigger = &triggers[i];
 		/* Register a trigger to wake us up when peer is connected */
@@ -733,7 +754,7 @@ replicaset_connect(struct applier **appliers, int count,
 		say_info("connected to %d replicas", state.connected);
 	}
 
-	for (int i = 0; i < count; i++) {
+	for (int i = 0; i < new_count; i++) {
 		/* Unregister the temporary trigger used to wake us up */
 		trigger_clear(&triggers[i].base);
 		/*
@@ -748,14 +769,17 @@ replicaset_connect(struct applier **appliers, int count,
 
 	/* Now all the appliers are connected, update the replica set. */
 	try {
-		replicaset_update(appliers, count);
+		replicaset_update(appliers, count, new_count);
 	} catch (Exception *e) {
 		goto error;
 	}
+	for (int i = 0; i < new_count; i++) {
+		appliers[i] = NULL;
+	}
 	return;
 error:
 	/* Destroy appliers */
-	for (int i = 0; i < count; i++) {
+	for (int i = 0; i < new_count; i++) {
 		trigger_clear(&triggers[i].base);
 		applier_stop(appliers[i]);
 	}
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ef1255b3..4db35b0d1 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -398,10 +398,11 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid);
  * \param connect_quorum if this flag is set, fail unless at
  *                       least replication_connect_quorum
  *                       appliers have successfully connected.
+ * \param new_count number of newly created appliers in the array
  */
 void
 replicaset_connect(struct applier **appliers, int count,
-		   bool connect_quorum);
+		   bool connect_quorumm, int new_count);
 
 /**
  * Check if the current instance fell too much behind its
diff --git a/test/replication/misc.result b/test/replication/misc.result
index b63d72846..94115804a 100644
--- a/test/replication/misc.result
+++ b/test/replication/misc.result
@@ -457,6 +457,9 @@ test_run:cmd("switch replica")
 replication = box.cfg.replication[1]
 ---
 ...
+box.cfg{replication = {}}
+---
+...
 box.cfg{replication = {replication, replication}}
 ---
 - error: 'Incorrect value for option ''replication'': duplicate connection to the
@@ -477,6 +480,9 @@ test_run:cmd("switch replica")
 ---
 - true
 ...
+box.cfg{replication = {}}
+---
+...
 box.cfg{replication_connect_quorum = 0, replication_connect_timeout = 0.01}
 ---
 ...
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
index c454a0992..96c87be88 100644
--- a/test/replication/misc.test.lua
+++ b/test/replication/misc.test.lua
@@ -190,6 +190,7 @@ test_run:cmd("create server replica with rpl_master=default, script='replication
 test_run:cmd("start server replica")
 test_run:cmd("switch replica")
 replication = box.cfg.replication[1]
+box.cfg{replication = {}}
 box.cfg{replication = {replication, replication}}
 
 -- Check the case when duplicate connection is detected in the background.
@@ -198,6 +199,7 @@ listen = box.cfg.listen
 box.cfg{listen = ''}
 
 test_run:cmd("switch replica")
+box.cfg{replication = {}}
 box.cfg{replication_connect_quorum = 0, replication_connect_timeout = 0.01}
 box.cfg{replication = {replication, replication}}
 
diff --git a/test/replication/quorum.result b/test/replication/quorum.result
index 07abe7f2a..35a42cc03 100644
--- a/test/replication/quorum.result
+++ b/test/replication/quorum.result
@@ -432,6 +432,9 @@ test_run:cmd('switch replica_quorum')
 ...
 -- If replication_connect_quorum was ignored here, the instance
 -- would exit with an error.
+box.cfg{replication={}}
+---
+...
 box.cfg{replication={INSTANCE_URI, nonexistent_uri(1)}}
 ---
 ...
diff --git a/test/replication/quorum.test.lua b/test/replication/quorum.test.lua
index 5f2872675..c8d8e3700 100644
--- a/test/replication/quorum.test.lua
+++ b/test/replication/quorum.test.lua
@@ -164,6 +164,7 @@ test_run:cmd('start server replica_quorum with wait=True, wait_load=True, args="
 test_run:cmd('switch replica_quorum')
 -- If replication_connect_quorum was ignored here, the instance
 -- would exit with an error.
+box.cfg{replication={}}
 box.cfg{replication={INSTANCE_URI, nonexistent_uri(1)}}
 box.info.id
 test_run:cmd('switch default')
diff --git a/test/replication/replica_self.lua b/test/replication/replica_self.lua
new file mode 100644
index 000000000..7f972570d
--- /dev/null
+++ b/test/replication/replica_self.lua
@@ -0,0 +1,11 @@
+#!/usr/bin/env tarantool
+
+box.cfg({
+    listen              = os.getenv("LISTEN"),
+    replication         = {os.getenv("MASTER"), os.getenv("LISTEN")},
+    memtx_memory        = 107374182,
+    replication_timeout = 0.1,
+    replication_connect_timeout = 0.5,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/self.result b/test/replication/self.result
new file mode 100644
index 000000000..77eb417dc
--- /dev/null
+++ b/test/replication/self.result
@@ -0,0 +1,68 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--box.schema.user.revoke('guest', 'replication')
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+
+--
+-- gh-4669 any change in box.cfg replication list resets all connections
+--
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica_self.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd("start server replica")
+ | ---
+ | - true
+ | ...
+test_run:cmd("switch replica")
+ | ---
+ | - true
+ | ...
+replication = box.cfg.replication
+ | ---
+ | ...
+replication_change = {}
+ | ---
+ | ...
+replication_change[1] = replication[2]
+ | ---
+ | ...
+replication_change[2] = replication[1]
+ | ---
+ | ...
+box.cfg{replication = replication_change}
+ | ---
+ | ...
+
+test_run:cmd("switch default")
+ | ---
+ | - true
+ | ...
+--search log for error with duplicated connection from relay
+test_run:grep_log('replica', 'duplicate connection') == nil
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server replica")
+ | ---
+ | - true
+ | ...
+test_run:cmd("cleanup server replica")
+ | ---
+ | - true
+ | ...
+test_run:cmd("delete server replica")
+ | ---
+ | - true
+ | ...
+test_run:cleanup_cluster()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
diff --git a/test/replication/self.test.lua b/test/replication/self.test.lua
new file mode 100644
index 000000000..0fcf0c98d
--- /dev/null
+++ b/test/replication/self.test.lua
@@ -0,0 +1,25 @@
+test_run = require('test_run').new()
+
+--box.schema.user.revoke('guest', 'replication')
+box.schema.user.grant('guest', 'replication')
+
+--
+-- gh-4669 any change in box.cfg replication list resets all connections
+--
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica_self.lua"')
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+replication = box.cfg.replication
+replication_change = {}
+replication_change[1] = replication[2]
+replication_change[2] = replication[1]
+box.cfg{replication = replication_change}
+
+test_run:cmd("switch default")
+--search log for error with duplicated connection from relay
+test_run:grep_log('replica', 'duplicate connection') == nil
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+test_run:cmd("delete server replica")
+test_run:cleanup_cluster()
+box.schema.user.revoke('guest', 'replication')
-- 
2.20.1 (Apple Git-117)



More information about the Tarantool-patches mailing list