From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp53.i.mail.ru (smtp53.i.mail.ru [94.100.177.113]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C91B44696C4 for ; Thu, 2 Apr 2020 16:29:56 +0300 (MSK) From: Olga Arkhangelskaia Date: Thu, 2 Apr 2020 16:29:48 +0300 Message-Id: <20200402132948.12804-2-arkholga@tarantool.org> In-Reply-To: <20200402132948.12804-1-arkholga@tarantool.org> References: <20200402132948.12804-1-arkholga@tarantool.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH rfc 1/1] replication: stop resetting existing connections List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Every time the replication configuration changes, the replica resets all of its connections, even those whose source is still present in the new configuration. This slows down reconfiguration and can make it fail with ER_CFG (duplicate connection) reported by an already existing relay. With this patch, appliers are created only for sources that are new to the configuration; existing connections are kept.
Closes #4669 Part of #4668 --- Branch: https://github.com/tarantool/tarantool/tree/OKriw/gh-4669-Any-change-in-box.cfg-replication-list-resets-all-connections src/box/box.cc | 106 +++++++++++++++++++++++++----- src/box/box.h | 1 + src/box/replication.cc | 56 +++++++++++----- src/box/replication.h | 3 +- test/replication/misc.result | 6 ++ test/replication/misc.test.lua | 2 + test/replication/quorum.result | 3 + test/replication/quorum.test.lua | 1 + test/replication/replica_self.lua | 11 ++++ test/replication/self.result | 68 +++++++++++++++++++ test/replication/self.test.lua | 25 +++++++ 11 files changed, 248 insertions(+), 34 deletions(-) create mode 100644 test/replication/replica_self.lua create mode 100644 test/replication/self.result create mode 100644 test/replication/self.test.lua diff --git a/src/box/box.cc b/src/box/box.cc index 1b2b27d61..b23cf991c 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -139,6 +139,12 @@ static struct fiber_pool tx_fiber_pool; */ static struct cbus_endpoint tx_prio_endpoint; +/** + * Array to store sources of replication which appliers + * will removed. + */ +char *remove_sources[VCLOCK_MAX-1]; + static int box_check_writable(void) { @@ -666,11 +672,38 @@ box_check_config() box_check_sql_cache_size(cfg_geti("sql_cache_size")); } +/* + * Check if a source from new config is already in the replicaset. + */ +static bool source_exist(const char *source) +{ + replicaset_foreach(replica) { + if (replica->applier != NULL) { + if (strcmp(replica->applier->source, source) == 0) + return true; + } + } + return false; +} + +/* + * Check whether replicaset source is in new configuration. + */ + static bool source_left(int count, const char *source_old) +{ + for (int i = 0; i < count; i++) { + const char *source_new = cfg_getarr_elem("replication", i); + if(strcmp(source_new, source_old) == 0) + return true; + } + return false; +} + /* * Parse box.cfg.replication and create appliers. 
*/ static struct applier ** -cfg_get_replication(int *p_count) +cfg_get_replication(int *p_count, int *new_count) { /* Use static buffer for result */ @@ -681,21 +714,55 @@ cfg_get_replication(int *p_count) tnt_raise(ClientError, ER_CFG, "replication", "too many replicas"); } - - for (int i = 0; i < count; i++) { - const char *source = cfg_getarr_elem("replication", i); - struct applier *applier = applier_new(source); - if (applier == NULL) { + /* Transition between anon and non-anon replicas is treated + * separately.*/ + if (box_check_replication_anon() != replication_anon) { + int j = 0; + for (int i = 0; i < count; i++) { + const char *source = cfg_getarr_elem("replication", i); + struct applier *applier = applier_new(source); + if (applier == NULL) { /* Delete created appliers */ - while (--i >= 0) - applier_delete(appliers[i]); - return NULL; + while (--j >= 0) + applier_delete(appliers[j]); + return NULL; + } + appliers[i] = applier; /* link to the list */ + /* Anon replica should be removed.*/ + remove_sources[i] = strdup(source); + } + *p_count = count; + *new_count = count; + } else { + int j = 0; + /* Create appliers only for new sources */ + for (int i = 0; i < count; i++) { + const char *source = cfg_getarr_elem("replication", i); + if (!source_exist(source)) { + struct applier *applier = applier_new(source); + if (applier == NULL) { + /* Delete created appliers */ + while (--j >= 0) + applier_delete(appliers[j]); + return NULL; + } + appliers[j] = applier; /* link to the list */ + j++; + } + } + int i = 0; + /* List of appliers to be deleted */ + replicaset_foreach(replica) { + if (replica->applier != NULL && + !source_left(count, replica->applier->source)) { + remove_sources[i] = strdup(replica->applier->source); + i++; } - appliers[i] = applier; /* link to the list */ + } + /* Return only number on newly created appliers.*/ + *p_count = count; + *new_count = j; } - - *p_count = count; - return appliers; } @@ -707,16 +774,18 @@ static void 
box_sync_replication(bool connect_quorum) { int count = 0; - struct applier **appliers = cfg_get_replication(&count); + int new_count = 0; + struct applier **appliers = cfg_get_replication(&count, &new_count); + if (appliers == NULL) diag_raise(); auto guard = make_scoped_guard([=]{ - for (int i = 0; i < count; i++) + for (int i = 0; i < new_count; i++) applier_delete(appliers[i]); /* doesn't affect diag */ }); - replicaset_connect(appliers, count, connect_quorum); + replicaset_connect(appliers, count, connect_quorum, new_count); guard.is_active = false; } @@ -796,7 +865,6 @@ box_set_replication_anon(void) replication_anon = !anon; }); /* Turn anonymous instance into a normal one. */ - replication_anon = anon; /* * Reset all appliers. This will interrupt * anonymous follow they're in so that one of @@ -804,6 +872,10 @@ box_set_replication_anon(void) * non-anonymous subscribe. */ box_sync_replication(false); + /* Turn anonymous instance into a normal one. + * Need to check config first, and then set replication_anon + */ + replication_anon = anon; /* * Wait until the master has registered this * instance. 
diff --git a/src/box/box.h b/src/box/box.h index a212e6510..5babfd448 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -471,6 +471,7 @@ box_process_rw(struct request *request, struct space *space, int boxk(int type, uint32_t space_id, const char *format, ...); +extern char *remove_sources[31]; #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/replication.cc b/src/box/replication.cc index e7bfa22ab..a71a6f0f0 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -482,12 +482,27 @@ replica_on_applier_state_f(struct trigger *trigger, void *event) return 0; } +static bool +check_and_remove_sources(const char *source) { + for(int i = 0; i < VCLOCK_MAX - 1; i++) { + if (remove_sources[i] != NULL) { + if (strcmp(source, remove_sources[i]) == 0) { + free(remove_sources[i]); + remove_sources[i] = NULL; + return true; + } + } + } + return false; +} + /** * Update the replica set with new "applier" objects * upon reconfiguration of box.cfg.replication. + * Count is total number of applier, new_count - only new ones. */ static void -replicaset_update(struct applier **appliers, int count) +replicaset_update(struct applier **appliers, int count, int new_count) { replica_hash_t uniq; memset(&uniq, 0, sizeof(uniq)); @@ -505,7 +520,7 @@ replicaset_update(struct applier **appliers, int count) }); /* Check for duplicate UUID */ - for (int i = 0; i < count; i++) { + for (int i = 0; i < new_count; i++) { applier = appliers[i]; replica = replica_new(); replica_set_applier(replica, applier); @@ -540,7 +555,7 @@ replicaset_update(struct applier **appliers, int count) * apply the new configuration. Nothing can fail after this point. 
*/ - /* Prune old appliers */ + /* Prune all appliers that haven't finished bootstrap yet.*/ while (!rlist_empty(&replicaset.anon)) { replica = rlist_first_entry(&replicaset.anon, typeof(*replica), in_anon); @@ -551,21 +566,25 @@ replicaset_update(struct applier **appliers, int count) applier_stop(applier); applier_delete(applier); } + /*Throw away applier with sources from remove list and stopped once.*/ replicaset_foreach(replica) { if (replica->applier == NULL) continue; applier = replica->applier; - replica_clear_applier(replica); - replica->applier_sync_state = APPLIER_DISCONNECTED; - applier_stop(applier); - applier_delete(applier); + if (check_and_remove_sources(applier->source) || count == 0 || + applier->state == APPLIER_OFF) { + replica_clear_applier(replica); + replica->applier_sync_state = APPLIER_DISCONNECTED; + applier_stop(applier); + applier_delete(applier); + } } /* Save new appliers */ replicaset.applier.total = count; replicaset.applier.connected = 0; replicaset.applier.loading = 0; - replicaset.applier.synced = 0; + replicaset.applier.synced = count - new_count; replica_hash_foreach_safe(&uniq, replica, next) { replica_hash_remove(&uniq, replica); @@ -640,15 +659,16 @@ applier_on_connect_f(struct trigger *trigger, void *event) void replicaset_connect(struct applier **appliers, int count, - bool connect_quorum) + bool connect_quorum, int new_count) { if (count == 0) { /* Cleanup the replica set. 
*/ - replicaset_update(appliers, count); + replicaset_update(appliers, count, new_count); return; } - say_info("connecting to %d replicas", count); + say_info("connected to %d replicas", count); + say_info("connecting to %d replicas", new_count); if (!connect_quorum) { /* @@ -680,14 +700,15 @@ replicaset_connect(struct applier **appliers, int count, struct applier_on_connect triggers[VCLOCK_MAX]; struct replicaset_connect_state state; - state.connected = state.failed = 0; + state.connected = count - new_count; + state.failed = 0; fiber_cond_create(&state.wakeup); double timeout = replication_connect_timeout; int quorum = MIN(count, replication_connect_quorum); /* Add triggers and start simulations connection to remote peers */ - for (int i = 0; i < count; i++) { + for (int i = 0; i < new_count; i++) { struct applier *applier = appliers[i]; struct applier_on_connect *trigger = &triggers[i]; /* Register a trigger to wake us up when peer is connected */ @@ -733,7 +754,7 @@ replicaset_connect(struct applier **appliers, int count, say_info("connected to %d replicas", state.connected); } - for (int i = 0; i < count; i++) { + for (int i = 0; i < new_count; i++) { /* Unregister the temporary trigger used to wake us up */ trigger_clear(&triggers[i].base); /* @@ -748,14 +769,17 @@ replicaset_connect(struct applier **appliers, int count, /* Now all the appliers are connected, update the replica set. 
*/ try { - replicaset_update(appliers, count); + replicaset_update(appliers, count, new_count); } catch (Exception *e) { goto error; } + for (int i = 0; i < new_count; i++) { + appliers[i] = NULL; + } return; error: /* Destroy appliers */ - for (int i = 0; i < count; i++) { + for (int i = 0; i < new_count; i++) { trigger_clear(&triggers[i].base); applier_stop(appliers[i]); } diff --git a/src/box/replication.h b/src/box/replication.h index 2ef1255b3..4db35b0d1 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -398,10 +398,11 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid); * \param connect_quorum if this flag is set, fail unless at * least replication_connect_quorum * appliers have successfully connected. + * \param new_count size of new appliers array */ void replicaset_connect(struct applier **appliers, int count, - bool connect_quorum); + bool connect_quorumm, int new_count); /** * Check if the current instance fell too much behind its diff --git a/test/replication/misc.result b/test/replication/misc.result index b63d72846..94115804a 100644 --- a/test/replication/misc.result +++ b/test/replication/misc.result @@ -457,6 +457,9 @@ test_run:cmd("switch replica") replication = box.cfg.replication[1] --- ... +box.cfg{replication = {}} +--- +... box.cfg{replication = {replication, replication}} --- - error: 'Incorrect value for option ''replication'': duplicate connection to the @@ -477,6 +480,9 @@ test_run:cmd("switch replica") --- - true ... +box.cfg{replication = {}} +--- +... box.cfg{replication_connect_quorum = 0, replication_connect_timeout = 0.01} --- ... 
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua index c454a0992..96c87be88 100644 --- a/test/replication/misc.test.lua +++ b/test/replication/misc.test.lua @@ -190,6 +190,7 @@ test_run:cmd("create server replica with rpl_master=default, script='replication test_run:cmd("start server replica") test_run:cmd("switch replica") replication = box.cfg.replication[1] +box.cfg{replication = {}} box.cfg{replication = {replication, replication}} -- Check the case when duplicate connection is detected in the background. @@ -198,6 +199,7 @@ listen = box.cfg.listen box.cfg{listen = ''} test_run:cmd("switch replica") +box.cfg{replication = {}} box.cfg{replication_connect_quorum = 0, replication_connect_timeout = 0.01} box.cfg{replication = {replication, replication}} diff --git a/test/replication/quorum.result b/test/replication/quorum.result index 07abe7f2a..35a42cc03 100644 --- a/test/replication/quorum.result +++ b/test/replication/quorum.result @@ -432,6 +432,9 @@ test_run:cmd('switch replica_quorum') ... -- If replication_connect_quorum was ignored here, the instance -- would exit with an error. +box.cfg{replication={}} +--- +... box.cfg{replication={INSTANCE_URI, nonexistent_uri(1)}} --- ... diff --git a/test/replication/quorum.test.lua b/test/replication/quorum.test.lua index 5f2872675..c8d8e3700 100644 --- a/test/replication/quorum.test.lua +++ b/test/replication/quorum.test.lua @@ -164,6 +164,7 @@ test_run:cmd('start server replica_quorum with wait=True, wait_load=True, args=" test_run:cmd('switch replica_quorum') -- If replication_connect_quorum was ignored here, the instance -- would exit with an error. 
+box.cfg{replication={}} box.cfg{replication={INSTANCE_URI, nonexistent_uri(1)}} box.info.id test_run:cmd('switch default') diff --git a/test/replication/replica_self.lua b/test/replication/replica_self.lua new file mode 100644 index 000000000..7f972570d --- /dev/null +++ b/test/replication/replica_self.lua @@ -0,0 +1,11 @@ +#!/usr/bin/env tarantool + +box.cfg({ + listen = os.getenv("LISTEN"), + replication = {os.getenv("MASTER"), os.getenv("LISTEN")}, + memtx_memory = 107374182, + replication_timeout = 0.1, + replication_connect_timeout = 0.5, +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/self.result b/test/replication/self.result new file mode 100644 index 000000000..77eb417dc --- /dev/null +++ b/test/replication/self.result @@ -0,0 +1,68 @@ +-- test-run result file version 2 +test_run = require('test_run').new() + | --- + | ... + +--box.schema.user.revoke('guest', 'replication') +box.schema.user.grant('guest', 'replication') + | --- + | ... + +-- +-- gh-4669 any change in box.cfg replication list resets all connections +-- +test_run:cmd('create server replica with rpl_master=default, script="replication/replica_self.lua"') + | --- + | - true + | ... +test_run:cmd("start server replica") + | --- + | - true + | ... +test_run:cmd("switch replica") + | --- + | - true + | ... +replication = box.cfg.replication + | --- + | ... +replication_change = {} + | --- + | ... +replication_change[1] = replication[2] + | --- + | ... +replication_change[2] = replication[1] + | --- + | ... +box.cfg{replication = replication_change} + | --- + | ... + +test_run:cmd("switch default") + | --- + | - true + | ... +--search log for error with duplicated connection from relay +test_run:grep_log('replica', 'duplicate connection') == nil + | --- + | - true + | ... +test_run:cmd("stop server replica") + | --- + | - true + | ... +test_run:cmd("cleanup server replica") + | --- + | - true + | ... 
+test_run:cmd("delete server replica") + | --- + | - true + | ... +test_run:cleanup_cluster() + | --- + | ... +box.schema.user.revoke('guest', 'replication') + | --- + | ... diff --git a/test/replication/self.test.lua b/test/replication/self.test.lua new file mode 100644 index 000000000..0fcf0c98d --- /dev/null +++ b/test/replication/self.test.lua @@ -0,0 +1,25 @@ +test_run = require('test_run').new() + +--box.schema.user.revoke('guest', 'replication') +box.schema.user.grant('guest', 'replication') + +-- +-- gh-4669 any change in box.cfg replication list resets all connections +-- +test_run:cmd('create server replica with rpl_master=default, script="replication/replica_self.lua"') +test_run:cmd("start server replica") +test_run:cmd("switch replica") +replication = box.cfg.replication +replication_change = {} +replication_change[1] = replication[2] +replication_change[2] = replication[1] +box.cfg{replication = replication_change} + +test_run:cmd("switch default") +--search log for error with duplicated connection from relay +test_run:grep_log('replica', 'duplicate connection') == nil +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") +test_run:cmd("delete server replica") +test_run:cleanup_cluster() +box.schema.user.revoke('guest', 'replication') -- 2.20.1 (Apple Git-117)