LGTM. Sergos > On 12 Oct 2021, at 17:14, Serge Petrenko wrote: > > > > 12.10.2021 13:14, sergos пишет: >> Hi! >> >> Thanks for the patch! >> >> Just request for some addition to the test, otherwise LGTM. >> >> Sergos >> >> >>> On 5 Oct 2021, at 16:18, Serge Petrenko wrote: >>> >>> Replication reconfiguration used to work as follows: upon receiving a >>> new config disconnect from all the existing masters and try to connect >>> to all the masters from the new config. >>> >>> This led to instances doing extra work when old config and new config >>> had the same nodes in them: instead of doing nothing, tarantool >>> reinitialized the connection. >>> >>> There was another problem: when an existing connection is broken, master >>> takes some time to notice that. So, when replica resets the connection, >>> it may try to reconnect faster than the master is able to notice its >>> absence. In this case replica wouldn't be able to reconnect due to >>> `duplicate connection with the same replica UUID`. So replication would >>> stop for a replication_timeout, which may be quite large (seconds or >>> tens of seconds). >>> >>> Let's prevent tarantool from reconnecting to the same instance, if there >>> already is a working connection. 
>>> >>> Closes #4669 >>> --- >>> .../gh-4669-applier-reconfig-reconnect.md | 5 ++ >>> src/box/box.cc | 31 ++++++--- >>> src/box/replication.cc | 41 ++++++++---- >>> src/box/replication.h | 4 +- >>> test/instance_files/base_instance.lua | 3 +- >>> test/instance_files/replica.lua | 17 +++++ >>> test/luatest_helpers/server.lua | 65 +++++++++++++++++++ >>> .../gh_4669_applier_reconnect_test.lua | 40 ++++++++++++ >>> 8 files changed, 184 insertions(+), 22 deletions(-) >>> create mode 100644 changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md >>> create mode 100755 test/instance_files/replica.lua >>> create mode 100644 test/replication-luatest/gh_4669_applier_reconnect_test.lua >>> >>> diff --git a/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md >>> new file mode 100644 >>> index 000000000..2a776872b >>> --- /dev/null >>> +++ b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md >>> @@ -0,0 +1,5 @@ >>> +## bugfix/replication >>> + >>> +* Fixed replica reconnecting to a living master on any `box.cfg{replication=...}` >>> + change. Such reconnects could lead to replica failing to restore connection >>> + for `replication_timeout` seconds (gh-4669). >>> diff --git a/src/box/box.cc b/src/box/box.cc >>> index 219ffa38d..cc4ada47e 100644 >>> --- a/src/box/box.cc >>> +++ b/src/box/box.cc >>> @@ -1249,7 +1249,7 @@ cfg_get_replication(int *p_count) >>> * don't start appliers. 
>>> */ >>> static void >>> -box_sync_replication(bool connect_quorum) >>> +box_sync_replication(bool do_quorum, bool do_reuse) >>> { >>> int count = 0; >>> struct applier **appliers = cfg_get_replication(&count); >>> @@ -1260,12 +1260,27 @@ box_sync_replication(bool connect_quorum) >>> for (int i = 0; i < count; i++) >>> applier_delete(appliers[i]); /* doesn't affect diag */ >>> }); >>> - >>> - replicaset_connect(appliers, count, connect_quorum); >>> + replicaset_connect(appliers, count, do_quorum, do_reuse); >>> >>> guard.is_active = false; >>> } >>> >>> +static inline void >>> +box_restart_replication(void) >>> +{ >>> + const bool do_quorum = true; >>> + const bool do_reuse = false; >>> + box_sync_replication(do_quorum, do_reuse); >>> +} >>> + >>> +static inline void >>> +box_update_replication(void) >>> +{ >>> + const bool do_quorum = false; >>> + const bool do_reuse = true; >>> + box_sync_replication(do_quorum, do_reuse); >>> +} >>> + >>> void >>> box_set_replication(void) >>> { >>> @@ -1284,7 +1299,7 @@ box_set_replication(void) >>> * Stay in orphan mode in case we fail to connect to at least >>> * 'replication_connect_quorum' remote instances. >>> */ >>> - box_sync_replication(false); >>> + box_update_replication(); >>> /* Follow replica */ >>> replicaset_follow(); >>> /* Wait until appliers are in sync */ >>> @@ -1404,7 +1419,7 @@ box_set_replication_anon(void) >>> * them can register and others resend a >>> * non-anonymous subscribe. >>> */ >>> - box_sync_replication(true); >>> + box_restart_replication(); >>> /* >>> * Wait until the master has registered this >>> * instance. >>> @@ -3258,7 +3273,7 @@ bootstrap(const struct tt_uuid *instance_uuid, >>> * with connecting to 'replication_connect_quorum' masters. >>> * If this also fails, throw an error. 
>>> */ >>> - box_sync_replication(true); >>> + box_restart_replication(); >>> >>> struct replica *master = replicaset_find_join_master(); >>> assert(master == NULL || master->applier != NULL); >>> @@ -3335,7 +3350,7 @@ local_recovery(const struct tt_uuid *instance_uuid, >>> if (wal_dir_lock >= 0) { >>> if (box_listen() != 0) >>> diag_raise(); >>> - box_sync_replication(false); >>> + box_update_replication(); >>> >>> struct replica *master; >>> if (replicaset_needs_rejoin(&master)) { >>> @@ -3414,7 +3429,7 @@ local_recovery(const struct tt_uuid *instance_uuid, >>> vclock_copy(&replicaset.vclock, &recovery->vclock); >>> if (box_listen() != 0) >>> diag_raise(); >>> - box_sync_replication(false); >>> + box_update_replication(); >>> } >>> stream_guard.is_active = false; >>> recovery_finalize(recovery); >>> diff --git a/src/box/replication.cc b/src/box/replication.cc >>> index 1288bc9b1..10b4ac915 100644 >>> --- a/src/box/replication.cc >>> +++ b/src/box/replication.cc >>> @@ -507,7 +507,7 @@ replica_on_applier_state_f(struct trigger *trigger, void *event) >>> * upon reconfiguration of box.cfg.replication. 
>>> */ >>> static void >>> -replicaset_update(struct applier **appliers, int count) >>> +replicaset_update(struct applier **appliers, int count, bool keep_connect) >>> { >>> replica_hash_t uniq; >>> memset(&uniq, 0, sizeof(uniq)); >>> @@ -572,22 +572,39 @@ replicaset_update(struct applier **appliers, int count) >>> applier_stop(applier); >>> applier_delete(applier); >>> } >>> + >>> + replicaset.applier.total = count; >>> + replicaset.applier.connected = 0; >>> + replicaset.applier.loading = 0; >>> + replicaset.applier.synced = 0; >>> replicaset_foreach(replica) { >>> if (replica->applier == NULL) >>> continue; >>> - applier = replica->applier; >>> - replica_clear_applier(replica); >>> - replica->applier_sync_state = APPLIER_DISCONNECTED; >>> + struct replica *other = replica_hash_search(&uniq, replica); >>> + if (keep_connect && other != NULL && >>> + (replica->applier->state == APPLIER_FOLLOW || >>> + replica->applier->state == APPLIER_SYNC)) { >>> + /* >>> + * Try not to interrupt working appliers upon >>> + * reconfiguration. 
>>> + */ >>> + replicaset.applier.connected++; >>> + replicaset.applier.synced++; >>> + replica_hash_remove(&uniq, other); >>> + applier = other->applier; >>> + replica_clear_applier(other); >>> + replica_delete(other); >>> + } else { >>> + applier = replica->applier; >>> + replica_clear_applier(replica); >>> + replica->applier_sync_state = APPLIER_DISCONNECTED; >>> + } >>> applier_stop(applier); >>> applier_delete(applier); >>> } >>> >>> - /* Save new appliers */ >>> - replicaset.applier.total = count; >>> - replicaset.applier.connected = 0; >>> - replicaset.applier.loading = 0; >>> - replicaset.applier.synced = 0; >>> >>> + /* Save new appliers */ >>> replica_hash_foreach_safe(&uniq, replica, next) { >>> replica_hash_remove(&uniq, replica); >>> >>> @@ -664,11 +681,11 @@ applier_on_connect_f(struct trigger *trigger, void *event) >>> >>> void >>> replicaset_connect(struct applier **appliers, int count, >>> - bool connect_quorum) >>> + bool connect_quorum, bool keep_connect) >>> { >>> if (count == 0) { >>> /* Cleanup the replica set. */ >>> - replicaset_update(appliers, count); >>> + replicaset_update(appliers, 0, false); >>> return; >>> } >>> >>> @@ -772,7 +789,7 @@ replicaset_connect(struct applier **appliers, int count, >>> >>> /* Now all the appliers are connected, update the replica set. */ >>> try { >>> - replicaset_update(appliers, count); >>> + replicaset_update(appliers, count, keep_connect); >>> } catch (Exception *e) { >>> goto error; >>> } >>> diff --git a/src/box/replication.h b/src/box/replication.h >>> index 4c82cd839..a8fed45e8 100644 >>> --- a/src/box/replication.h >>> +++ b/src/box/replication.h >>> @@ -439,10 +439,12 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid); >>> * \param connect_quorum if this flag is set, fail unless at >>> * least replication_connect_quorum >>> * appliers have successfully connected. >>> + * \param keep_connect if this flag is set do not force a reconnect if the >>> + * old connection to the replica is fine. 
>>> */ >>> void >>> replicaset_connect(struct applier **appliers, int count, >>> - bool connect_quorum); >>> + bool connect_quorum, bool keep_connect); >>> >>> /** >>> * Check if the current instance fell too much behind its >>> diff --git a/test/instance_files/base_instance.lua b/test/instance_files/base_instance.lua >>> index 45bdbc7e8..e579c3843 100755 >>> --- a/test/instance_files/base_instance.lua >>> +++ b/test/instance_files/base_instance.lua >>> @@ -5,7 +5,8 @@ local listen = os.getenv('TARANTOOL_LISTEN') >>> box.cfg({ >>> work_dir = workdir, >>> -- listen = 'localhost:3310' >>> - listen = listen >>> + listen = listen, >>> + log = workdir..'/tarantool.log', >>> }) >>> >>> box.schema.user.grant('guest', 'read,write,execute,create', 'universe') >>> diff --git a/test/instance_files/replica.lua b/test/instance_files/replica.lua >>> new file mode 100755 >>> index 000000000..587345f24 >>> --- /dev/null >>> +++ b/test/instance_files/replica.lua >>> @@ -0,0 +1,17 @@ >>> +#!/usr/bin/env tarantool >>> +local workdir = os.getenv('TARANTOOL_WORKDIR') >>> +local listen = os.getenv('TARANTOOL_LISTEN') >>> +local SOCKET_DIR = os.getenv('VARDIR') >>> +local MASTER = arg[1] or "master" >>> + >>> +local function master_uri() >>> + return SOCKET_DIR..'/'..MASTER..'.sock' >>> +end >>> + >>> +box.cfg({ >>> + work_dir = workdir, >>> + listen = listen, >>> + replication = master_uri(), >>> +}) >>> + >>> +_G.ready = true >>> diff --git a/test/luatest_helpers/server.lua b/test/luatest_helpers/server.lua >>> index 018ea4f7d..0306a24ef 100644 >>> --- a/test/luatest_helpers/server.lua >>> +++ b/test/luatest_helpers/server.lua >>> @@ -5,6 +5,7 @@ local fiber = require('fiber') >>> local fio = require('fio') >>> local fun = require('fun') >>> local json = require('json') >>> +local errno = require('errno') >>> >>> local checks = require('checks') >>> local luatest = require('luatest') >>> @@ -59,6 +60,70 @@ function Server:initialize() >>> 
getmetatable(getmetatable(self)).initialize(self) >>> end >>> >>> +-- A copy of test_run:grep_log. >>> +function Server:grep_log(what, bytes, opts) >>> + local opts = opts or {} >>> + local noreset = opts.noreset or false >>> + -- if instance has crashed provide filename to use grep_log >>> + local filename = opts.filename or self:eval('return box.cfg.log') >>> + local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'}) >>> + >>> + local function fail(msg) >>> + local err = errno.strerror() >>> + if file ~= nil then >>> + file:close() >>> + end >>> + error(string.format("%s: %s: %s", msg, filename, err)) >>> + end >>> + >>> + if file == nil then >>> + fail("Failed to open log file") >>> + end >>> + io.flush() -- attempt to flush stdout == log fd >>> + local filesize = file:seek(0, 'SEEK_END') >>> + if filesize == nil then >>> + fail("Failed to get log file size") >>> + end >>> + local bytes = bytes or 65536 -- don't read whole log - it can be huge >>> + bytes = bytes > filesize and filesize or bytes >>> + if file:seek(-bytes, 'SEEK_END') == nil then >>> + fail("Failed to seek log file") >>> + end >>> + local found, buf >>> + repeat -- read file in chunks >>> + local s = file:read(2048) >>> + if s == nil then >>> + fail("Failed to read log file") >>> + end >>> + local pos = 1 >>> + repeat -- split read string in lines >>> + local endpos = string.find(s, '\n', pos) >>> + endpos = endpos and endpos - 1 -- strip terminating \n >>> + local line = string.sub(s, pos, endpos) >>> + if endpos == nil and s ~= '' then >>> + -- line doesn't end with \n or eof, append it to buffer >>> + -- to be checked on next iteration >>> + buf = buf or {} >>> + table.insert(buf, line) >>> + else >>> + if buf ~= nil then -- prepend line with buffered data >>> + table.insert(buf, line) >>> + line = table.concat(buf) >>> + buf = nil >>> + end >>> + if string.match(line, "Starting instance") and not noreset then >>> + found = nil -- server was restarted, reset search >>> + else >>> + found = 
string.match(line, what) or found >>> + end >>> + end >>> + pos = endpos and endpos + 2 -- jump to char after \n >>> + until pos == nil >>> + until s == '' >>> + file:close() >>> + return found >>> +end >>> + >>> --- Generates environment to run process with. >>> -- The result is merged into os.environ(). >>> -- @return map >>> diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua b/test/replication-luatest/gh_4669_applier_reconnect_test.lua >>> new file mode 100644 >>> index 000000000..19e10b136 >>> --- /dev/null >>> +++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua >>> @@ -0,0 +1,40 @@ >>> +local t = require('luatest') >>> +local cluster = require('test.luatest_helpers.cluster') >>> + >>> +local g = t.group('gh-4669-applier-reconnect') >>> + >>> +local function check_follow_master(server) >>> + return t.assert_equals( >>> + server:eval('return box.info.replication[1].upstream.status'), 'follow') >>> +end >>> + >>> +g.before_each(function() >>> + g.cluster = cluster:new({}) >>> + g.master = g.cluster:build_server( >>> + {}, {alias = 'master'}, 'base_instance.lua') >>> + g.replica = g.cluster:build_server( >>> + {args={'master'}}, {alias = 'replica'}, 'replica.lua') >>> + >>> + g.cluster:join_server(g.master) >>> + g.cluster:join_server(g.replica) >>> + g.cluster:start() >>> + check_follow_master(g.replica) >>> +end) >>> + >>> +g.after_each(function() >>> + g.cluster:stop() >>> +end) >>> + >>> +-- Test that appliers aren't recreated upon replication reconfiguration. >>> +g.test_applier_connection_on_reconfig = function(g) >>> + g.replica:eval([[ >>> + box.cfg{ >>> + replication = { >>> + os.getenv("TARANTOOL_LISTEN"), >>> + box.cfg.replication[1], >>> + } >>> + } >>> + ]]) >> Should we test this with dynamic add/remove of a 2nd replica? > > No problem. 
Here's the diff: > > ============================ > > diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua b/test/replication-luatest/gh_4669_applier_reconnect_test.lua > index 19e10b136..e082515fd 100644 > --- a/test/replication-luatest/gh_4669_applier_reconnect_test.lua > +++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua > @@ -14,9 +14,12 @@ g.before_each(function() > {}, {alias = 'master'}, 'base_instance.lua') > g.replica = g.cluster:build_server( > {args={'master'}}, {alias = 'replica'}, 'replica.lua') > + g.replica2 = g.cluster:build_server( > + {args={'master'}}, {alias = 'replica2'}, 'replica.lua') > > g.cluster:join_server(g.master) > g.cluster:join_server(g.replica) > + g.cluster:join_server(g.replica2) > g.cluster:start() > check_follow_master(g.replica) > end) > @@ -26,12 +29,20 @@ g.after_each(function() > end) > > -- Test that appliers aren't recreated upon replication reconfiguration. > +-- Add and then remove two extra replicas to the configuration. The master > +-- connection should stay intact. > g.test_applier_connection_on_reconfig = function(g) > g.replica:eval([[ > box.cfg{ > replication = { > os.getenv("TARANTOOL_LISTEN"), > box.cfg.replication[1], > + os.getenv('VARDIR')..'/replica2.sock' > + } > + } > + box.cfg{ > + replication = { > + box.cfg.replication[2] > } > } > ]]) > > ============================ > >> >>> + check_follow_master(g.replica) >>> + t.assert_equals(g.master:grep_log("exiting the relay loop"), nil) >>> +end >>> -- >>> 2.30.1 (Apple Git-130) >>> > > -- > Serge Petrenko