From: sergos via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Serge Petrenko <sergepetrenko@tarantool.org>
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v2 2/2] replication: fix replica disconnect upon reconfiguration
Date: Wed, 13 Oct 2021 19:00:29 +0300 [thread overview]
Message-ID: <31EB9C74-5BAD-4D52-A5BB-EE347C4104A5@tarantool.org> (raw)
In-Reply-To: <8e453790-534e-be84-b4b2-fd30c9740eb7@tarantool.org>
LGTM.
Sergos
> On 12 Oct 2021, at 17:14, Serge Petrenko <sergepetrenko@tarantool.org> wrote:
>
>
>
> 12.10.2021 13:14, sergos wrote:
>> Hi!
>>
>> Thanks for the patch!
>>
>> Just request for some addition to the test, otherwise LGTM.
>>
>> Sergos
>>
>>
>>> On 5 Oct 2021, at 16:18, Serge Petrenko <sergepetrenko@tarantool.org> wrote:
>>>
>>> Replication reconfiguration used to work as follows: upon receiving a
>>> new config disconnect from all the existing masters and try to connect
>>> to all the masters from the new config.
>>>
>>> This led to instances doing extra work when the old and the new configs
>>> contained the same nodes: instead of doing nothing, tarantool
>>> reinitialized the connection.
>>>
>>> There was another problem: when an existing connection is broken, the
>>> master takes some time to notice that. So, when the replica resets the
>>> connection, it may try to reconnect faster than the master is able to
>>> notice its absence. In this case the replica wouldn't be able to
>>> reconnect due to `duplicate connection with the same replica UUID`, so
>>> replication would stop for a replication_timeout, which may be quite
>>> large (seconds or tens of seconds).
>>>
>>> Let's prevent tarantool from reconnecting to the same instance if there
>>> is already a working connection.
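
(An aside, just to restate the user-visible effect: a minimal sketch of the
scenario this fixes, not part of the patch; the URIs below are made up for
illustration.)

    -- initial configuration on the replica
    box.cfg{replication = {'unix/:/tmp/master.sock'}}
    -- later: extend the list while keeping the existing master URI.
    -- With the patch the working applier to master.sock is reused;
    -- before it, every such call dropped and re-created all appliers,
    -- and the reconnect could hit "duplicate connection with the same
    -- replica UUID", stalling replication for up to replication_timeout.
    box.cfg{replication = {'unix/:/tmp/master.sock',
                           'unix/:/tmp/replica2.sock'}}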
>>>
>>> Closes #4669
>>> ---
>>> .../gh-4669-applier-reconfig-reconnect.md | 5 ++
>>> src/box/box.cc | 31 ++++++---
>>> src/box/replication.cc | 41 ++++++++----
>>> src/box/replication.h | 4 +-
>>> test/instance_files/base_instance.lua | 3 +-
>>> test/instance_files/replica.lua | 17 +++++
>>> test/luatest_helpers/server.lua | 65 +++++++++++++++++++
>>> .../gh_4669_applier_reconnect_test.lua | 40 ++++++++++++
>>> 8 files changed, 184 insertions(+), 22 deletions(-)
>>> create mode 100644 changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>>> create mode 100755 test/instance_files/replica.lua
>>> create mode 100644 test/replication-luatest/gh_4669_applier_reconnect_test.lua
>>>
>>> diff --git a/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>>> new file mode 100644
>>> index 000000000..2a776872b
>>> --- /dev/null
>>> +++ b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>>> @@ -0,0 +1,5 @@
>>> +## bugfix/replication
>>> +
>>> +* Fixed replica reconnecting to a living master on any `box.cfg{replication=...}`
>>> + change. Such reconnects could lead to replica failing to restore connection
>>> + for `replication_timeout` seconds (gh-4669).
>>> diff --git a/src/box/box.cc b/src/box/box.cc
>>> index 219ffa38d..cc4ada47e 100644
>>> --- a/src/box/box.cc
>>> +++ b/src/box/box.cc
>>> @@ -1249,7 +1249,7 @@ cfg_get_replication(int *p_count)
>>> * don't start appliers.
>>> */
>>> static void
>>> -box_sync_replication(bool connect_quorum)
>>> +box_sync_replication(bool do_quorum, bool do_reuse)
>>> {
>>> int count = 0;
>>> struct applier **appliers = cfg_get_replication(&count);
>>> @@ -1260,12 +1260,27 @@ box_sync_replication(bool connect_quorum)
>>> for (int i = 0; i < count; i++)
>>> applier_delete(appliers[i]); /* doesn't affect diag */
>>> });
>>> -
>>> - replicaset_connect(appliers, count, connect_quorum);
>>> + replicaset_connect(appliers, count, do_quorum, do_reuse);
>>>
>>> guard.is_active = false;
>>> }
>>>
>>> +static inline void
>>> +box_restart_replication(void)
>>> +{
>>> + const bool do_quorum = true;
>>> + const bool do_reuse = false;
>>> + box_sync_replication(do_quorum, do_reuse);
>>> +}
>>> +
>>> +static inline void
>>> +box_update_replication(void)
>>> +{
>>> + const bool do_quorum = false;
>>> + const bool do_reuse = true;
>>> + box_sync_replication(do_quorum, do_reuse);
>>> +}
>>> +
>>> void
>>> box_set_replication(void)
>>> {
>>> @@ -1284,7 +1299,7 @@ box_set_replication(void)
>>> * Stay in orphan mode in case we fail to connect to at least
>>> * 'replication_connect_quorum' remote instances.
>>> */
>>> - box_sync_replication(false);
>>> + box_update_replication();
>>> /* Follow replica */
>>> replicaset_follow();
>>> /* Wait until appliers are in sync */
>>> @@ -1404,7 +1419,7 @@ box_set_replication_anon(void)
>>> * them can register and others resend a
>>> * non-anonymous subscribe.
>>> */
>>> - box_sync_replication(true);
>>> + box_restart_replication();
>>> /*
>>> * Wait until the master has registered this
>>> * instance.
>>> @@ -3258,7 +3273,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
>>> * with connecting to 'replication_connect_quorum' masters.
>>> * If this also fails, throw an error.
>>> */
>>> - box_sync_replication(true);
>>> + box_restart_replication();
>>>
>>> struct replica *master = replicaset_find_join_master();
>>> assert(master == NULL || master->applier != NULL);
>>> @@ -3335,7 +3350,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>>> if (wal_dir_lock >= 0) {
>>> if (box_listen() != 0)
>>> diag_raise();
>>> - box_sync_replication(false);
>>> + box_update_replication();
>>>
>>> struct replica *master;
>>> if (replicaset_needs_rejoin(&master)) {
>>> @@ -3414,7 +3429,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>>> vclock_copy(&replicaset.vclock, &recovery->vclock);
>>> if (box_listen() != 0)
>>> diag_raise();
>>> - box_sync_replication(false);
>>> + box_update_replication();
>>> }
>>> stream_guard.is_active = false;
>>> recovery_finalize(recovery);
>>> diff --git a/src/box/replication.cc b/src/box/replication.cc
>>> index 1288bc9b1..10b4ac915 100644
>>> --- a/src/box/replication.cc
>>> +++ b/src/box/replication.cc
>>> @@ -507,7 +507,7 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
>>> * upon reconfiguration of box.cfg.replication.
>>> */
>>> static void
>>> -replicaset_update(struct applier **appliers, int count)
>>> +replicaset_update(struct applier **appliers, int count, bool keep_connect)
>>> {
>>> replica_hash_t uniq;
>>> memset(&uniq, 0, sizeof(uniq));
>>> @@ -572,22 +572,39 @@ replicaset_update(struct applier **appliers, int count)
>>> applier_stop(applier);
>>> applier_delete(applier);
>>> }
>>> +
>>> + replicaset.applier.total = count;
>>> + replicaset.applier.connected = 0;
>>> + replicaset.applier.loading = 0;
>>> + replicaset.applier.synced = 0;
>>> replicaset_foreach(replica) {
>>> if (replica->applier == NULL)
>>> continue;
>>> - applier = replica->applier;
>>> - replica_clear_applier(replica);
>>> - replica->applier_sync_state = APPLIER_DISCONNECTED;
>>> + struct replica *other = replica_hash_search(&uniq, replica);
>>> + if (keep_connect && other != NULL &&
>>> + (replica->applier->state == APPLIER_FOLLOW ||
>>> + replica->applier->state == APPLIER_SYNC)) {
>>> + /*
>>> + * Try not to interrupt working appliers upon
>>> + * reconfiguration.
>>> + */
>>> + replicaset.applier.connected++;
>>> + replicaset.applier.synced++;
>>> + replica_hash_remove(&uniq, other);
>>> + applier = other->applier;
>>> + replica_clear_applier(other);
>>> + replica_delete(other);
>>> + } else {
>>> + applier = replica->applier;
>>> + replica_clear_applier(replica);
>>> + replica->applier_sync_state = APPLIER_DISCONNECTED;
>>> + }
>>> applier_stop(applier);
>>> applier_delete(applier);
>>> }
>>>
>>> - /* Save new appliers */
>>> - replicaset.applier.total = count;
>>> - replicaset.applier.connected = 0;
>>> - replicaset.applier.loading = 0;
>>> - replicaset.applier.synced = 0;
>>>
>>> + /* Save new appliers */
>>> replica_hash_foreach_safe(&uniq, replica, next) {
>>> replica_hash_remove(&uniq, replica);
>>>
>>> @@ -664,11 +681,11 @@ applier_on_connect_f(struct trigger *trigger, void *event)
>>>
>>> void
>>> replicaset_connect(struct applier **appliers, int count,
>>> - bool connect_quorum)
>>> + bool connect_quorum, bool keep_connect)
>>> {
>>> if (count == 0) {
>>> /* Cleanup the replica set. */
>>> - replicaset_update(appliers, count);
>>> + replicaset_update(appliers, 0, false);
>>> return;
>>> }
>>>
>>> @@ -772,7 +789,7 @@ replicaset_connect(struct applier **appliers, int count,
>>>
>>> /* Now all the appliers are connected, update the replica set. */
>>> try {
>>> - replicaset_update(appliers, count);
>>> + replicaset_update(appliers, count, keep_connect);
>>> } catch (Exception *e) {
>>> goto error;
>>> }
>>> diff --git a/src/box/replication.h b/src/box/replication.h
>>> index 4c82cd839..a8fed45e8 100644
>>> --- a/src/box/replication.h
>>> +++ b/src/box/replication.h
>>> @@ -439,10 +439,12 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid);
>>> * \param connect_quorum if this flag is set, fail unless at
>>> * least replication_connect_quorum
>>> * appliers have successfully connected.
>>> + * \param keep_connect if this flag is set do not force a reconnect if the
>>> + * old connection to the replica is fine.
>>> */
>>> void
>>> replicaset_connect(struct applier **appliers, int count,
>>> - bool connect_quorum);
>>> + bool connect_quorum, bool keep_connect);
>>>
>>> /**
>>> * Check if the current instance fell too much behind its
>>> diff --git a/test/instance_files/base_instance.lua b/test/instance_files/base_instance.lua
>>> index 45bdbc7e8..e579c3843 100755
>>> --- a/test/instance_files/base_instance.lua
>>> +++ b/test/instance_files/base_instance.lua
>>> @@ -5,7 +5,8 @@ local listen = os.getenv('TARANTOOL_LISTEN')
>>> box.cfg({
>>> work_dir = workdir,
>>> -- listen = 'localhost:3310'
>>> - listen = listen
>>> + listen = listen,
>>> + log = workdir..'/tarantool.log',
>>> })
>>>
>>> box.schema.user.grant('guest', 'read,write,execute,create', 'universe')
>>> diff --git a/test/instance_files/replica.lua b/test/instance_files/replica.lua
>>> new file mode 100755
>>> index 000000000..587345f24
>>> --- /dev/null
>>> +++ b/test/instance_files/replica.lua
>>> @@ -0,0 +1,17 @@
>>> +#!/usr/bin/env tarantool
>>> +local workdir = os.getenv('TARANTOOL_WORKDIR')
>>> +local listen = os.getenv('TARANTOOL_LISTEN')
>>> +local SOCKET_DIR = os.getenv('VARDIR')
>>> +local MASTER = arg[1] or "master"
>>> +
>>> +local function master_uri()
>>> + return SOCKET_DIR..'/'..MASTER..'.sock'
>>> +end
>>> +
>>> +box.cfg({
>>> + work_dir = workdir,
>>> + listen = listen,
>>> + replication = master_uri(),
>>> +})
>>> +
>>> +_G.ready = true
>>> diff --git a/test/luatest_helpers/server.lua b/test/luatest_helpers/server.lua
>>> index 018ea4f7d..0306a24ef 100644
>>> --- a/test/luatest_helpers/server.lua
>>> +++ b/test/luatest_helpers/server.lua
>>> @@ -5,6 +5,7 @@ local fiber = require('fiber')
>>> local fio = require('fio')
>>> local fun = require('fun')
>>> local json = require('json')
>>> +local errno = require('errno')
>>>
>>> local checks = require('checks')
>>> local luatest = require('luatest')
>>> @@ -59,6 +60,70 @@ function Server:initialize()
>>> getmetatable(getmetatable(self)).initialize(self)
>>> end
>>>
>>> +-- A copy of test_run:grep_log.
>>> +function Server:grep_log(what, bytes, opts)
>>> + local opts = opts or {}
>>> + local noreset = opts.noreset or false
>>> + -- if instance has crashed provide filename to use grep_log
>>> + local filename = opts.filename or self:eval('return box.cfg.log')
>>> + local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'})
>>> +
>>> + local function fail(msg)
>>> + local err = errno.strerror()
>>> + if file ~= nil then
>>> + file:close()
>>> + end
>>> + error(string.format("%s: %s: %s", msg, filename, err))
>>> + end
>>> +
>>> + if file == nil then
>>> + fail("Failed to open log file")
>>> + end
>>> + io.flush() -- attempt to flush stdout == log fd
>>> + local filesize = file:seek(0, 'SEEK_END')
>>> + if filesize == nil then
>>> + fail("Failed to get log file size")
>>> + end
>>> + local bytes = bytes or 65536 -- don't read whole log - it can be huge
>>> + bytes = bytes > filesize and filesize or bytes
>>> + if file:seek(-bytes, 'SEEK_END') == nil then
>>> + fail("Failed to seek log file")
>>> + end
>>> + local found, buf
>>> + repeat -- read file in chunks
>>> + local s = file:read(2048)
>>> + if s == nil then
>>> + fail("Failed to read log file")
>>> + end
>>> + local pos = 1
>>> + repeat -- split read string in lines
>>> + local endpos = string.find(s, '\n', pos)
>>> + endpos = endpos and endpos - 1 -- strip terminating \n
>>> + local line = string.sub(s, pos, endpos)
>>> + if endpos == nil and s ~= '' then
>>> + -- line doesn't end with \n or eof, append it to buffer
>>> + -- to be checked on next iteration
>>> + buf = buf or {}
>>> + table.insert(buf, line)
>>> + else
>>> + if buf ~= nil then -- prepend line with buffered data
>>> + table.insert(buf, line)
>>> + line = table.concat(buf)
>>> + buf = nil
>>> + end
>>> + if string.match(line, "Starting instance") and not noreset then
>>> + found = nil -- server was restarted, reset search
>>> + else
>>> + found = string.match(line, what) or found
>>> + end
>>> + end
>>> + pos = endpos and endpos + 2 -- jump to char after \n
>>> + until pos == nil
>>> + until s == ''
>>> + file:close()
>>> + return found
>>> +end
>>> +
>>> --- Generates environment to run process with.
>>> -- The result is merged into os.environ().
>>> -- @return map
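
(A usage sketch for the new grep_log helper, under my reading of the code
above; 'srv' is a placeholder for any luatest_helpers server object.)

    -- returns the matched string or nil; by default it scans only the
    -- last 64 KiB of box.cfg.log on the given server
    local line = srv:grep_log("exiting the relay loop")
    -- a byte window and an explicit file can be passed, e.g. when the
    -- instance has already crashed and box.cfg.log can't be eval'ed:
    -- srv:grep_log("some pattern", 1024 * 1024, {filename = '/path/to/tarantool.log'})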
>>> diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
>>> new file mode 100644
>>> index 000000000..19e10b136
>>> --- /dev/null
>>> +++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
>>> @@ -0,0 +1,40 @@
>>> +local t = require('luatest')
>>> +local cluster = require('test.luatest_helpers.cluster')
>>> +
>>> +local g = t.group('gh-4669-applier-reconnect')
>>> +
>>> +local function check_follow_master(server)
>>> + return t.assert_equals(
>>> + server:eval('return box.info.replication[1].upstream.status'), 'follow')
>>> +end
>>> +
>>> +g.before_each(function()
>>> + g.cluster = cluster:new({})
>>> + g.master = g.cluster:build_server(
>>> + {}, {alias = 'master'}, 'base_instance.lua')
>>> + g.replica = g.cluster:build_server(
>>> + {args={'master'}}, {alias = 'replica'}, 'replica.lua')
>>> +
>>> + g.cluster:join_server(g.master)
>>> + g.cluster:join_server(g.replica)
>>> + g.cluster:start()
>>> + check_follow_master(g.replica)
>>> +end)
>>> +
>>> +g.after_each(function()
>>> + g.cluster:stop()
>>> +end)
>>> +
>>> +-- Test that appliers aren't recreated upon replication reconfiguration.
>>> +g.test_applier_connection_on_reconfig = function(g)
>>> + g.replica:eval([[
>>> + box.cfg{
>>> + replication = {
>>> + os.getenv("TARANTOOL_LISTEN"),
>>> + box.cfg.replication[1],
>>> + }
>>> + }
>>> + ]])
>> Should we test this with dynamic add/remove of a 2nd replica?
>
> No problem. Here's the diff:
>
> ============================
>
> diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
> index 19e10b136..e082515fd 100644
> --- a/test/replication-luatest/gh_4669_applier_reconnect_test.lua
> +++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
> @@ -14,9 +14,12 @@ g.before_each(function()
> {}, {alias = 'master'}, 'base_instance.lua')
> g.replica = g.cluster:build_server(
> {args={'master'}}, {alias = 'replica'}, 'replica.lua')
> + g.replica2 = g.cluster:build_server(
> + {args={'master'}}, {alias = 'replica2'}, 'replica.lua')
>
> g.cluster:join_server(g.master)
> g.cluster:join_server(g.replica)
> + g.cluster:join_server(g.replica2)
> g.cluster:start()
> check_follow_master(g.replica)
> end)
> @@ -26,12 +29,20 @@ g.after_each(function()
> end)
>
> -- Test that appliers aren't recreated upon replication reconfiguration.
> +-- Add and then remove two extra replicas to the configuration. The master
> +-- connection should stay intact.
> g.test_applier_connection_on_reconfig = function(g)
> g.replica:eval([[
> box.cfg{
> replication = {
> os.getenv("TARANTOOL_LISTEN"),
> box.cfg.replication[1],
> + os.getenv('VARDIR')..'/replica2.sock'
> + }
> + }
> + box.cfg{
> + replication = {
> + box.cfg.replication[2]
> }
> }
> ]])
>
> ============================
>
>>
>>> + check_follow_master(g.replica)
>>> + t.assert_equals(g.master:grep_log("exiting the relay loop"), nil)
>>> +end
>>> --
>>> 2.30.1 (Apple Git-130)
>>>
>
> --
> Serge Petrenko
Thread overview: 9+ messages
2021-10-05 13:18 [Tarantool-patches] [PATCH v2 0/2] replication: fix reconnect on box.cfg.replication change Serge Petrenko via Tarantool-patches
2021-10-05 13:18 ` [Tarantool-patches] [PATCH v2 1/2] replicaiton: make anon replica connect to quorum upon reconfiguration Serge Petrenko via Tarantool-patches
2021-10-12 9:56 ` sergos via Tarantool-patches
2021-10-12 13:38 ` Serge Petrenko via Tarantool-patches
2021-10-13 16:00 ` sergos via Tarantool-patches
2021-10-05 13:18 ` [Tarantool-patches] [PATCH v2 2/2] replication: fix replica disconnect " Serge Petrenko via Tarantool-patches
2021-10-12 10:14 ` sergos via Tarantool-patches
2021-10-12 14:14 ` Serge Petrenko via Tarantool-patches
2021-10-13 16:00 ` sergos via Tarantool-patches [this message]