From: sergos via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Serge Petrenko <sergepetrenko@tarantool.org>
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v2 2/2] replication: fix replica disconnect upon reconfiguration
Date: Tue, 12 Oct 2021 13:14:27 +0300
Message-ID: <648688AC-03B8-48B3-B644-8F3B9A9A9212@tarantool.org>
In-Reply-To: <5f68cfcac8c44346a690e0f48ad0da1d7646ac83.1633439400.git.sergepetrenko@tarantool.org>

Hi! Thanks for the patch!

Just a request for some addition to the test; otherwise LGTM.

Sergos

> On 5 Oct 2021, at 16:18, Serge Petrenko <sergepetrenko@tarantool.org> wrote:
>
> Replication reconfiguration used to work as follows: upon receiving a
> new config, disconnect from all the existing masters and try to
> connect to all the masters from the new config.
>
> This led to instances doing extra work when the old and the new config
> had the same nodes in them: instead of doing nothing, tarantool
> reinitialized the connection.
>
> There was another problem: when an existing connection is broken, the
> master takes some time to notice that. So, when the replica resets the
> connection, it may try to reconnect faster than the master is able to
> notice its absence. In this case the replica wouldn't be able to
> reconnect due to `duplicate connection with the same replica UUID`. So
> replication would stop for a replication_timeout, which may be quite
> large (seconds or tens of seconds).
>
> Let's prevent tarantool from reconnecting to the same instance if
> there already is a working connection.
>
> Closes #4669
> ---
>  .../gh-4669-applier-reconfig-reconnect.md |  5 ++
>  src/box/box.cc                            | 31 ++++++---
>  src/box/replication.cc                    | 41 ++++++++----
>  src/box/replication.h                     |  4 +-
>  test/instance_files/base_instance.lua     |  3 +-
>  test/instance_files/replica.lua           | 17 +++++
>  test/luatest_helpers/server.lua           | 65 +++++++++++++++++++
>  .../gh_4669_applier_reconnect_test.lua    | 40 ++++++++++++
>  8 files changed, 184 insertions(+), 22 deletions(-)
>  create mode 100644 changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>  create mode 100755 test/instance_files/replica.lua
>  create mode 100644 test/replication-luatest/gh_4669_applier_reconnect_test.lua
>
> diff --git a/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
> new file mode 100644
> index 000000000..2a776872b
> --- /dev/null
> +++ b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
> @@ -0,0 +1,5 @@
> +## bugfix/replication
> +
> +* Fixed replica reconnecting to a living master on any `box.cfg{replication=...}`
> +  change. Such reconnects could lead to replica failing to restore connection
> +  for `replication_timeout` seconds (gh-4669).
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 219ffa38d..cc4ada47e 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1249,7 +1249,7 @@ cfg_get_replication(int *p_count)
>   * don't start appliers.
>   */
>  static void
> -box_sync_replication(bool connect_quorum)
> +box_sync_replication(bool do_quorum, bool do_reuse)
>  {
>          int count = 0;
>          struct applier **appliers = cfg_get_replication(&count);
> @@ -1260,12 +1260,27 @@
>          for (int i = 0; i < count; i++)
>                  applier_delete(appliers[i]); /* doesn't affect diag */
>          });
> -
> -        replicaset_connect(appliers, count, connect_quorum);
> +        replicaset_connect(appliers, count, do_quorum, do_reuse);
>
>          guard.is_active = false;
>  }
>
> +static inline void
> +box_restart_replication(void)
> +{
> +        const bool do_quorum = true;
> +        const bool do_reuse = false;
> +        box_sync_replication(do_quorum, do_reuse);
> +}
> +
> +static inline void
> +box_update_replication(void)
> +{
> +        const bool do_quorum = false;
> +        const bool do_reuse = true;
> +        box_sync_replication(do_quorum, do_reuse);
> +}
> +
>  void
>  box_set_replication(void)
>  {
> @@ -1284,7 +1299,7 @@ box_set_replication(void)
>           * Stay in orphan mode in case we fail to connect to at least
>           * 'replication_connect_quorum' remote instances.
>           */
> -        box_sync_replication(false);
> +        box_update_replication();
>          /* Follow replica */
>          replicaset_follow();
>          /* Wait until appliers are in sync */
> @@ -1404,7 +1419,7 @@ box_set_replication_anon(void)
>           * them can register and others resend a
>           * non-anonymous subscribe.
>           */
> -        box_sync_replication(true);
> +        box_restart_replication();
>          /*
>           * Wait until the master has registered this
>           * instance.
> @@ -3258,7 +3273,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
>           * with connecting to 'replication_connect_quorum' masters.
>           * If this also fails, throw an error.
>           */
> -        box_sync_replication(true);
> +        box_restart_replication();
>
>          struct replica *master = replicaset_find_join_master();
>          assert(master == NULL || master->applier != NULL);
> @@ -3335,7 +3350,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>          if (wal_dir_lock >= 0) {
>                  if (box_listen() != 0)
>                          diag_raise();
> -                box_sync_replication(false);
> +                box_update_replication();
>
>                  struct replica *master;
>                  if (replicaset_needs_rejoin(&master)) {
> @@ -3414,7 +3429,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>                  vclock_copy(&replicaset.vclock, &recovery->vclock);
>                  if (box_listen() != 0)
>                          diag_raise();
> -                box_sync_replication(false);
> +                box_update_replication();
>          }
>          stream_guard.is_active = false;
>          recovery_finalize(recovery);
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 1288bc9b1..10b4ac915 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -507,7 +507,7 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
>   * upon reconfiguration of box.cfg.replication.
>   */
>  static void
> -replicaset_update(struct applier **appliers, int count)
> +replicaset_update(struct applier **appliers, int count, bool keep_connect)
>  {
>          replica_hash_t uniq;
>          memset(&uniq, 0, sizeof(uniq));
> @@ -572,22 +572,39 @@ replicaset_update(struct applier **appliers, int count)
>                  applier_stop(applier);
>                  applier_delete(applier);
>          }
> +
> +        replicaset.applier.total = count;
> +        replicaset.applier.connected = 0;
> +        replicaset.applier.loading = 0;
> +        replicaset.applier.synced = 0;
>          replicaset_foreach(replica) {
>                  if (replica->applier == NULL)
>                          continue;
> -                applier = replica->applier;
> -                replica_clear_applier(replica);
> -                replica->applier_sync_state = APPLIER_DISCONNECTED;
> +                struct replica *other = replica_hash_search(&uniq, replica);
> +                if (keep_connect && other != NULL &&
> +                    (replica->applier->state == APPLIER_FOLLOW ||
> +                     replica->applier->state == APPLIER_SYNC)) {
> +                        /*
> +                         * Try not to interrupt working appliers upon
> +                         * reconfiguration.
> +                         */
> +                        replicaset.applier.connected++;
> +                        replicaset.applier.synced++;
> +                        replica_hash_remove(&uniq, other);
> +                        applier = other->applier;
> +                        replica_clear_applier(other);
> +                        replica_delete(other);
> +                } else {
> +                        applier = replica->applier;
> +                        replica_clear_applier(replica);
> +                        replica->applier_sync_state = APPLIER_DISCONNECTED;
> +                }
>                  applier_stop(applier);
>                  applier_delete(applier);
>          }
>
> -        /* Save new appliers */
> -        replicaset.applier.total = count;
> -        replicaset.applier.connected = 0;
> -        replicaset.applier.loading = 0;
> -        replicaset.applier.synced = 0;
> -
> +        /* Save new appliers */
>          replica_hash_foreach_safe(&uniq, replica, next) {
>                  replica_hash_remove(&uniq, replica);
>
> @@ -664,11 +681,11 @@ applier_on_connect_f(struct trigger *trigger, void *event)
>
>  void
>  replicaset_connect(struct applier **appliers, int count,
> -                   bool connect_quorum)
> +                   bool connect_quorum, bool keep_connect)
>  {
>          if (count == 0) {
>                  /* Cleanup the replica set. */
> -                replicaset_update(appliers, count);
> +                replicaset_update(appliers, 0, false);
>                  return;
>          }
>
> @@ -772,7 +789,7 @@ replicaset_connect(struct applier **appliers, int count,
>
>          /* Now all the appliers are connected, update the replica set. */
>          try {
> -                replicaset_update(appliers, count);
> +                replicaset_update(appliers, count, keep_connect);
>          } catch (Exception *e) {
>                  goto error;
>          }
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 4c82cd839..a8fed45e8 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -439,10 +439,12 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid);
>   * \param connect_quorum if this flag is set, fail unless at
>   *                       least replication_connect_quorum
>   *                       appliers have successfully connected.
> + * \param keep_connect   if this flag is set do not force a reconnect if the
> + *                       old connection to the replica is fine.
>   */
>  void
>  replicaset_connect(struct applier **appliers, int count,
> -                   bool connect_quorum);
> +                   bool connect_quorum, bool keep_connect);
>
>  /**
>   * Check if the current instance fell too much behind its
> diff --git a/test/instance_files/base_instance.lua b/test/instance_files/base_instance.lua
> index 45bdbc7e8..e579c3843 100755
> --- a/test/instance_files/base_instance.lua
> +++ b/test/instance_files/base_instance.lua
> @@ -5,7 +5,8 @@ local listen = os.getenv('TARANTOOL_LISTEN')
>  box.cfg({
>      work_dir = workdir,
>      -- listen = 'localhost:3310'
> -    listen = listen
> +    listen = listen,
> +    log = workdir..'/tarantool.log',
>  })
>
>  box.schema.user.grant('guest', 'read,write,execute,create', 'universe')
> diff --git a/test/instance_files/replica.lua b/test/instance_files/replica.lua
> new file mode 100755
> index 000000000..587345f24
> --- /dev/null
> +++ b/test/instance_files/replica.lua
> @@ -0,0 +1,17 @@
> +#!/usr/bin/env tarantool
> +local workdir = os.getenv('TARANTOOL_WORKDIR')
> +local listen = os.getenv('TARANTOOL_LISTEN')
> +local SOCKET_DIR = os.getenv('VARDIR')
> +local MASTER = arg[1] or "master"
> +
> +local function master_uri()
> +    return SOCKET_DIR..'/'..MASTER..'.sock'
> +end
> +
> +box.cfg({
> +    work_dir = workdir,
> +    listen = listen,
> +    replication = master_uri(),
> +})
> +
> +_G.ready = true
> diff --git a/test/luatest_helpers/server.lua b/test/luatest_helpers/server.lua
> index 018ea4f7d..0306a24ef 100644
> --- a/test/luatest_helpers/server.lua
> +++ b/test/luatest_helpers/server.lua
> @@ -5,6 +5,7 @@ local fiber = require('fiber')
>  local fio = require('fio')
>  local fun = require('fun')
>  local json = require('json')
> +local errno = require('errno')
>
>  local checks = require('checks')
>  local luatest = require('luatest')
> @@ -59,6 +60,70 @@ function Server:initialize()
>      getmetatable(getmetatable(self)).initialize(self)
>  end
>
> +-- A copy of test_run:grep_log.
> +function Server:grep_log(what, bytes, opts)
> +    local opts = opts or {}
> +    local noreset = opts.noreset or false
> +    -- if instance has crashed provide filename to use grep_log
> +    local filename = opts.filename or self:eval('return box.cfg.log')
> +    local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'})
> +
> +    local function fail(msg)
> +        local err = errno.strerror()
> +        if file ~= nil then
> +            file:close()
> +        end
> +        error(string.format("%s: %s: %s", msg, filename, err))
> +    end
> +
> +    if file == nil then
> +        fail("Failed to open log file")
> +    end
> +    io.flush() -- attempt to flush stdout == log fd
> +    local filesize = file:seek(0, 'SEEK_END')
> +    if filesize == nil then
> +        fail("Failed to get log file size")
> +    end
> +    local bytes = bytes or 65536 -- don't read whole log - it can be huge
> +    bytes = bytes > filesize and filesize or bytes
> +    if file:seek(-bytes, 'SEEK_END') == nil then
> +        fail("Failed to seek log file")
> +    end
> +    local found, buf
> +    repeat -- read file in chunks
> +        local s = file:read(2048)
> +        if s == nil then
> +            fail("Failed to read log file")
> +        end
> +        local pos = 1
> +        repeat -- split read string in lines
> +            local endpos = string.find(s, '\n', pos)
> +            endpos = endpos and endpos - 1 -- strip terminating \n
> +            local line = string.sub(s, pos, endpos)
> +            if endpos == nil and s ~= '' then
> +                -- line doesn't end with \n or eof, append it to buffer
> +                -- to be checked on next iteration
> +                buf = buf or {}
> +                table.insert(buf, line)
> +            else
> +                if buf ~= nil then -- prepend line with buffered data
> +                    table.insert(buf, line)
> +                    line = table.concat(buf)
> +                    buf = nil
> +                end
> +                if string.match(line, "Starting instance") and not noreset then
> +                    found = nil -- server was restarted, reset search
> +                else
> +                    found = string.match(line, what) or found
> +                end
> +            end
> +            pos = endpos and endpos + 2 -- jump to char after \n
> +        until pos == nil
> +    until s == ''
> +    file:close()
> +    return found
> +end
> +
>  --- Generates environment to run process with.
>  -- The result is merged into os.environ().
>  -- @return map
> diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
> new file mode 100644
> index 000000000..19e10b136
> --- /dev/null
> +++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
> @@ -0,0 +1,40 @@
> +local t = require('luatest')
> +local cluster = require('test.luatest_helpers.cluster')
> +
> +local g = t.group('gh-4669-applier-reconnect')
> +
> +local function check_follow_master(server)
> +    return t.assert_equals(
> +        server:eval('return box.info.replication[1].upstream.status'),
> +        'follow')
> +end
> +
> +g.before_each(function()
> +    g.cluster = cluster:new({})
> +    g.master = g.cluster:build_server(
> +        {}, {alias = 'master'}, 'base_instance.lua')
> +    g.replica = g.cluster:build_server(
> +        {args = {'master'}}, {alias = 'replica'}, 'replica.lua')
> +
> +    g.cluster:join_server(g.master)
> +    g.cluster:join_server(g.replica)
> +    g.cluster:start()
> +    check_follow_master(g.replica)
> +end)
> +
> +g.after_each(function()
> +    g.cluster:stop()
> +end)
> +
> +-- Test that appliers aren't recreated upon replication reconfiguration.
> +g.test_applier_connection_on_reconfig = function(g)
> +    g.replica:eval([[
> +        box.cfg{
> +            replication = {
> +                os.getenv("TARANTOOL_LISTEN"),
> +                box.cfg.replication[1],
> +            }
> +        }
> +    ]])

Should we test this with dynamic add/remove of a 2nd replica?

> +    check_follow_master(g.replica)
> +    t.assert_equals(g.master:grep_log("exiting the relay loop"), nil)
> +end
> --
> 2.30.1 (Apple Git-130)
>
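To illustrate the question above: a rough sketch of the dynamic
add/remove case, reusing the helpers from the quoted test
(cluster:build_server, cluster:join_server, check_follow_master).
The replica2:start()/:stop() methods and the 'replica2.sock' socket
name are assumptions about the luatest harness, so treat this as a
starting point rather than a ready-made test:

    -- Check that the connection to the master survives both adding
    -- and removing a second upstream in box.cfg.replication.
    g.test_applier_connection_on_replica_add_remove = function(g)
        local replica2 = g.cluster:build_server(
            {args = {'master'}}, {alias = 'replica2'}, 'replica.lua')
        g.cluster:join_server(replica2)
        replica2:start()
        -- Add replica2 as a second upstream of the first replica.
        g.replica:eval([[
            box.cfg{
                replication = {
                    box.cfg.replication[1],
                    os.getenv('VARDIR')..'/replica2.sock',
                }
            }
        ]])
        check_follow_master(g.replica)
        -- Remove it again; the master must not drop its relay in either step.
        g.replica:eval([[box.cfg{replication = {box.cfg.replication[1]}}]])
        check_follow_master(g.replica)
        t.assert_equals(g.master:grep_log("exiting the relay loop"), nil)
        replica2:stop()
    end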