[Tarantool-patches] [PATCH rfc 1/1] replication: stop resetting existing connections
Olga Arkhangelskaia
arkholga at tarantool.org
Thu Apr 2 16:29:48 MSK 2020
Every time the replication configuration changes, the replica resets all
connections, even when a source is still present in the new configuration.
This slows down reconfiguration because of ER_CFG ("duplicate connection")
errors raised by the already existing relay.
The patch creates appliers only for sources that are genuinely new.
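For illustration, here is a minimal Lua sketch of the reconfiguration this
patch optimizes (the URIs are hypothetical); before the patch the applier
for 'replica1:3301' would be torn down and recreated even though the source
stays in the list:

    -- initial configuration with a single source
    box.cfg{replication = {'replica1:3301'}}
    -- add a second source: the existing connection to
    -- 'replica1:3301' is now kept instead of being reset
    box.cfg{replication = {'replica1:3301', 'replica2:3302'}}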
Closes #4669
Part of #4668
---
Branch:
https://github.com/tarantool/tarantool/tree/OKriw/gh-4669-Any-change-in-box.cfg-replication-list-resets-all-connections
src/box/box.cc | 106 +++++++++++++++++++++++++-----
src/box/box.h | 1 +
src/box/replication.cc | 56 +++++++++++-----
src/box/replication.h | 3 +-
test/replication/misc.result | 6 ++
test/replication/misc.test.lua | 2 +
test/replication/quorum.result | 3 +
test/replication/quorum.test.lua | 1 +
test/replication/replica_self.lua | 11 ++++
test/replication/self.result | 68 +++++++++++++++++++
test/replication/self.test.lua | 25 +++++++
11 files changed, 248 insertions(+), 34 deletions(-)
create mode 100644 test/replication/replica_self.lua
create mode 100644 test/replication/self.result
create mode 100644 test/replication/self.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index 1b2b27d61..b23cf991c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -139,6 +139,12 @@ static struct fiber_pool tx_fiber_pool;
*/
static struct cbus_endpoint tx_prio_endpoint;
+/**
+ * Array that stores the replication sources whose appliers
+ * will be removed.
+ */
+char *remove_sources[VCLOCK_MAX-1];
+
static int
box_check_writable(void)
{
@@ -666,11 +672,38 @@ box_check_config()
box_check_sql_cache_size(cfg_geti("sql_cache_size"));
}
+/*
+ * Check if a source from the new configuration is already in the replicaset.
+ */
+static bool source_exist(const char *source)
+{
+ replicaset_foreach(replica) {
+ if (replica->applier != NULL) {
+ if (strcmp(replica->applier->source, source) == 0)
+ return true;
+ }
+ }
+ return false;
+}
+
+/*
+ * Check whether a replicaset source is still in the new configuration.
+ */
+static bool source_left(int count, const char *source_old)
+{
+ for (int i = 0; i < count; i++) {
+ const char *source_new = cfg_getarr_elem("replication", i);
+ if (strcmp(source_new, source_old) == 0)
+ return true;
+ }
+ return false;
+}
+
/*
* Parse box.cfg.replication and create appliers.
*/
static struct applier **
-cfg_get_replication(int *p_count)
+cfg_get_replication(int *p_count, int *new_count)
{
/* Use static buffer for result */
@@ -681,21 +714,55 @@ cfg_get_replication(int *p_count)
tnt_raise(ClientError, ER_CFG, "replication",
"too many replicas");
}
-
- for (int i = 0; i < count; i++) {
- const char *source = cfg_getarr_elem("replication", i);
- struct applier *applier = applier_new(source);
- if (applier == NULL) {
+ /* The transition between anonymous and non-anonymous replicas
+ * is treated separately. */
+ if (box_check_replication_anon() != replication_anon) {
+ int j = 0;
+ for (int i = 0; i < count; i++) {
+ const char *source = cfg_getarr_elem("replication", i);
+ struct applier *applier = applier_new(source);
+ if (applier == NULL) {
/* Delete created appliers */
- while (--i >= 0)
- applier_delete(appliers[i]);
- return NULL;
+ while (--j >= 0)
+ applier_delete(appliers[j]);
+ return NULL;
+ }
+ appliers[j++] = applier; /* link to the list */
+ /* On an anon/non-anon transition all existing appliers are reset. */
+ remove_sources[i] = strdup(source);
+ }
+ *p_count = count;
+ *new_count = count;
+ } else {
+ int j = 0;
+ /* Create appliers only for new sources */
+ for (int i = 0; i < count; i++) {
+ const char *source = cfg_getarr_elem("replication", i);
+ if (!source_exist(source)) {
+ struct applier *applier = applier_new(source);
+ if (applier == NULL) {
+ /* Delete created appliers */
+ while (--j >= 0)
+ applier_delete(appliers[j]);
+ return NULL;
+ }
+ appliers[j] = applier; /* link to the list */
+ j++;
+ }
+ }
+ int i = 0;
+ /* Collect sources whose appliers should be deleted. */
+ replicaset_foreach(replica) {
+ if (replica->applier != NULL &&
+ !source_left(count, replica->applier->source)) {
+ remove_sources[i] = strdup(replica->applier->source);
+ i++;
}
- appliers[i] = applier; /* link to the list */
+ }
+ /* Return the total count and the number of newly created appliers. */
+ *p_count = count;
+ *new_count = j;
}
-
- *p_count = count;
-
return appliers;
}
@@ -707,16 +774,18 @@ static void
box_sync_replication(bool connect_quorum)
{
int count = 0;
- struct applier **appliers = cfg_get_replication(&count);
+ int new_count = 0;
+ struct applier **appliers = cfg_get_replication(&count, &new_count);
+
if (appliers == NULL)
diag_raise();
auto guard = make_scoped_guard([=]{
- for (int i = 0; i < count; i++)
+ for (int i = 0; i < new_count; i++)
applier_delete(appliers[i]); /* doesn't affect diag */
});
- replicaset_connect(appliers, count, connect_quorum);
+ replicaset_connect(appliers, count, connect_quorum, new_count);
guard.is_active = false;
}
@@ -796,7 +865,6 @@ box_set_replication_anon(void)
replication_anon = !anon;
});
/* Turn anonymous instance into a normal one. */
- replication_anon = anon;
/*
* Reset all appliers. This will interrupt
* anonymous follow they're in so that one of
@@ -804,6 +872,10 @@ box_set_replication_anon(void)
* non-anonymous subscribe.
*/
box_sync_replication(false);
+ /* Turn the anonymous instance into a normal one. The config
+ * must be checked first, and only then is replication_anon set.
+ */
+ replication_anon = anon;
/*
* Wait until the master has registered this
* instance.
diff --git a/src/box/box.h b/src/box/box.h
index a212e6510..5babfd448 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -471,6 +471,7 @@ box_process_rw(struct request *request, struct space *space,
int
boxk(int type, uint32_t space_id, const char *format, ...);
+extern char *remove_sources[31];
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/replication.cc b/src/box/replication.cc
index e7bfa22ab..a71a6f0f0 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -482,12 +482,27 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
return 0;
}
+static bool
+check_and_remove_sources(const char *source) {
+ for (int i = 0; i < VCLOCK_MAX - 1; i++) {
+ if (remove_sources[i] != NULL) {
+ if (strcmp(source, remove_sources[i]) == 0) {
+ free(remove_sources[i]);
+ remove_sources[i] = NULL;
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
/**
* Update the replica set with new "applier" objects
* upon reconfiguration of box.cfg.replication.
+ * count is the total number of appliers, new_count only the new ones.
*/
static void
-replicaset_update(struct applier **appliers, int count)
+replicaset_update(struct applier **appliers, int count, int new_count)
{
replica_hash_t uniq;
memset(&uniq, 0, sizeof(uniq));
@@ -505,7 +520,7 @@ replicaset_update(struct applier **appliers, int count)
});
/* Check for duplicate UUID */
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < new_count; i++) {
applier = appliers[i];
replica = replica_new();
replica_set_applier(replica, applier);
@@ -540,7 +555,7 @@ replicaset_update(struct applier **appliers, int count)
* apply the new configuration. Nothing can fail after this point.
*/
- /* Prune old appliers */
+ /* Prune all appliers that haven't finished bootstrap yet. */
while (!rlist_empty(&replicaset.anon)) {
replica = rlist_first_entry(&replicaset.anon,
typeof(*replica), in_anon);
@@ -551,21 +566,25 @@ replicaset_update(struct applier **appliers, int count)
applier_stop(applier);
applier_delete(applier);
}
+ /* Drop appliers whose source is in the remove list, plus stopped ones. */
replicaset_foreach(replica) {
if (replica->applier == NULL)
continue;
applier = replica->applier;
- replica_clear_applier(replica);
- replica->applier_sync_state = APPLIER_DISCONNECTED;
- applier_stop(applier);
- applier_delete(applier);
+ if (check_and_remove_sources(applier->source) || count == 0 ||
+ applier->state == APPLIER_OFF) {
+ replica_clear_applier(replica);
+ replica->applier_sync_state = APPLIER_DISCONNECTED;
+ applier_stop(applier);
+ applier_delete(applier);
+ }
}
/* Save new appliers */
replicaset.applier.total = count;
replicaset.applier.connected = 0;
replicaset.applier.loading = 0;
- replicaset.applier.synced = 0;
+ replicaset.applier.synced = count - new_count;
replica_hash_foreach_safe(&uniq, replica, next) {
replica_hash_remove(&uniq, replica);
@@ -640,15 +659,16 @@ applier_on_connect_f(struct trigger *trigger, void *event)
void
replicaset_connect(struct applier **appliers, int count,
- bool connect_quorum)
+ bool connect_quorum, int new_count)
{
if (count == 0) {
/* Cleanup the replica set. */
- replicaset_update(appliers, count);
+ replicaset_update(appliers, count, new_count);
return;
}
- say_info("connecting to %d replicas", count);
+ say_info("connected to %d replicas", count);
+ say_info("connecting to %d replicas", new_count);
if (!connect_quorum) {
/*
@@ -680,14 +700,15 @@ replicaset_connect(struct applier **appliers, int count,
struct applier_on_connect triggers[VCLOCK_MAX];
struct replicaset_connect_state state;
- state.connected = state.failed = 0;
+ state.connected = count - new_count;
+ state.failed = 0;
fiber_cond_create(&state.wakeup);
double timeout = replication_connect_timeout;
int quorum = MIN(count, replication_connect_quorum);
/* Add triggers and start simulations connection to remote peers */
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < new_count; i++) {
struct applier *applier = appliers[i];
struct applier_on_connect *trigger = &triggers[i];
/* Register a trigger to wake us up when peer is connected */
@@ -733,7 +754,7 @@ replicaset_connect(struct applier **appliers, int count,
say_info("connected to %d replicas", state.connected);
}
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < new_count; i++) {
/* Unregister the temporary trigger used to wake us up */
trigger_clear(&triggers[i].base);
/*
@@ -748,14 +769,17 @@ replicaset_connect(struct applier **appliers, int count,
/* Now all the appliers are connected, update the replica set. */
try {
- replicaset_update(appliers, count);
+ replicaset_update(appliers, count, new_count);
} catch (Exception *e) {
goto error;
}
+ for (int i = 0; i < new_count; i++) {
+ appliers[i] = NULL;
+ }
return;
error:
/* Destroy appliers */
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < new_count; i++) {
trigger_clear(&triggers[i].base);
applier_stop(appliers[i]);
}
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ef1255b3..4db35b0d1 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -398,10 +398,11 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid);
* \param connect_quorum if this flag is set, fail unless at
* least replication_connect_quorum
* appliers have successfully connected.
+ * \param new_count number of newly created appliers
*/
void
replicaset_connect(struct applier **appliers, int count,
- bool connect_quorum);
+ bool connect_quorum, int new_count);
/**
* Check if the current instance fell too much behind its
diff --git a/test/replication/misc.result b/test/replication/misc.result
index b63d72846..94115804a 100644
--- a/test/replication/misc.result
+++ b/test/replication/misc.result
@@ -457,6 +457,9 @@ test_run:cmd("switch replica")
replication = box.cfg.replication[1]
---
...
+box.cfg{replication = {}}
+---
+...
box.cfg{replication = {replication, replication}}
---
- error: 'Incorrect value for option ''replication'': duplicate connection to the
@@ -477,6 +480,9 @@ test_run:cmd("switch replica")
---
- true
...
+box.cfg{replication = {}}
+---
+...
box.cfg{replication_connect_quorum = 0, replication_connect_timeout = 0.01}
---
...
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
index c454a0992..96c87be88 100644
--- a/test/replication/misc.test.lua
+++ b/test/replication/misc.test.lua
@@ -190,6 +190,7 @@ test_run:cmd("create server replica with rpl_master=default, script='replication
test_run:cmd("start server replica")
test_run:cmd("switch replica")
replication = box.cfg.replication[1]
+box.cfg{replication = {}}
box.cfg{replication = {replication, replication}}
-- Check the case when duplicate connection is detected in the background.
@@ -198,6 +199,7 @@ listen = box.cfg.listen
box.cfg{listen = ''}
test_run:cmd("switch replica")
+box.cfg{replication = {}}
box.cfg{replication_connect_quorum = 0, replication_connect_timeout = 0.01}
box.cfg{replication = {replication, replication}}
diff --git a/test/replication/quorum.result b/test/replication/quorum.result
index 07abe7f2a..35a42cc03 100644
--- a/test/replication/quorum.result
+++ b/test/replication/quorum.result
@@ -432,6 +432,9 @@ test_run:cmd('switch replica_quorum')
...
-- If replication_connect_quorum was ignored here, the instance
-- would exit with an error.
+box.cfg{replication={}}
+---
+...
box.cfg{replication={INSTANCE_URI, nonexistent_uri(1)}}
---
...
diff --git a/test/replication/quorum.test.lua b/test/replication/quorum.test.lua
index 5f2872675..c8d8e3700 100644
--- a/test/replication/quorum.test.lua
+++ b/test/replication/quorum.test.lua
@@ -164,6 +164,7 @@ test_run:cmd('start server replica_quorum with wait=True, wait_load=True, args="
test_run:cmd('switch replica_quorum')
-- If replication_connect_quorum was ignored here, the instance
-- would exit with an error.
+box.cfg{replication={}}
box.cfg{replication={INSTANCE_URI, nonexistent_uri(1)}}
box.info.id
test_run:cmd('switch default')
diff --git a/test/replication/replica_self.lua b/test/replication/replica_self.lua
new file mode 100644
index 000000000..7f972570d
--- /dev/null
+++ b/test/replication/replica_self.lua
@@ -0,0 +1,11 @@
+#!/usr/bin/env tarantool
+
+box.cfg({
+ listen = os.getenv("LISTEN"),
+ replication = {os.getenv("MASTER"), os.getenv("LISTEN")},
+ memtx_memory = 107374182,
+ replication_timeout = 0.1,
+ replication_connect_timeout = 0.5,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/self.result b/test/replication/self.result
new file mode 100644
index 000000000..77eb417dc
--- /dev/null
+++ b/test/replication/self.result
@@ -0,0 +1,68 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+--box.schema.user.revoke('guest', 'replication')
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+
+--
+-- gh-4669 any change in box.cfg replication list resets all connections
+--
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica_self.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd("start server replica")
+ | ---
+ | - true
+ | ...
+test_run:cmd("switch replica")
+ | ---
+ | - true
+ | ...
+replication = box.cfg.replication
+ | ---
+ | ...
+replication_change = {}
+ | ---
+ | ...
+replication_change[1] = replication[2]
+ | ---
+ | ...
+replication_change[2] = replication[1]
+ | ---
+ | ...
+box.cfg{replication = replication_change}
+ | ---
+ | ...
+
+test_run:cmd("switch default")
+ | ---
+ | - true
+ | ...
+-- Search the log for a 'duplicate connection' error from the relay.
+test_run:grep_log('replica', 'duplicate connection') == nil
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server replica")
+ | ---
+ | - true
+ | ...
+test_run:cmd("cleanup server replica")
+ | ---
+ | - true
+ | ...
+test_run:cmd("delete server replica")
+ | ---
+ | - true
+ | ...
+test_run:cleanup_cluster()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
diff --git a/test/replication/self.test.lua b/test/replication/self.test.lua
new file mode 100644
index 000000000..0fcf0c98d
--- /dev/null
+++ b/test/replication/self.test.lua
@@ -0,0 +1,25 @@
+test_run = require('test_run').new()
+
+--box.schema.user.revoke('guest', 'replication')
+box.schema.user.grant('guest', 'replication')
+
+--
+-- gh-4669 any change in box.cfg replication list resets all connections
+--
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica_self.lua"')
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+replication = box.cfg.replication
+replication_change = {}
+replication_change[1] = replication[2]
+replication_change[2] = replication[1]
+box.cfg{replication = replication_change}
+
+test_run:cmd("switch default")
+-- Search the log for a 'duplicate connection' error from the relay.
+test_run:grep_log('replica', 'duplicate connection') == nil
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+test_run:cmd("delete server replica")
+test_run:cleanup_cluster()
+box.schema.user.revoke('guest', 'replication')
--
2.20.1 (Apple Git-117)