[Tarantool-patches] [PATCH v2 2/2] replication: fix replica disconnect upon reconfiguration
Serge Petrenko
sergepetrenko at tarantool.org
Tue Oct 12 17:14:34 MSK 2021
12.10.2021 13:14, sergos wrote:
> Hi!
>
> Thanks for the patch!
>
> Just a request for a small addition to the test; otherwise LGTM.
>
> Sergos
>
>
>> On 5 Oct 2021, at 16:18, Serge Petrenko <sergepetrenko at tarantool.org> wrote:
>>
>> Replication reconfiguration used to work as follows: upon receiving a
>> new config, disconnect from all the existing masters and try to connect
>> to all the masters from the new config.
>>
>> This led to instances doing extra work when the old and the new configs
>> contained the same nodes: instead of doing nothing, tarantool
>> reinitialized those connections.
>>
>> There was another problem: when an existing connection breaks, the
>> master takes some time to notice that. So, when a replica resets the
>> connection, it may try to reconnect faster than the master notices its
>> absence. In this case the replica fails to reconnect due to
>> `duplicate connection with the same replica UUID`, and replication
>> stops for replication_timeout, which may be quite large (seconds or
>> tens of seconds).
>>
>> Let's prevent tarantool from reconnecting to the same instance if there
>> already is a working connection.
>>
>> Closes #4669
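To illustrate the second problem with a minimal sketch (the URIs here are
made up, not from the patch): a replica follows a healthy master, and the
config is then extended with one more node. Before this patch even the
untouched master connection was reset:

    -- On the replica: an established connection to the master exists.
    box.cfg{replication = {'master:3301'}}
    -- Adding another upstream used to drop and re-create the master
    -- connection as well; after this patch a healthy connection is kept.
    box.cfg{replication = {'master:3301', 'replica2:3302'}}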
>> ---
>> .../gh-4669-applier-reconfig-reconnect.md | 5 ++
>> src/box/box.cc | 31 ++++++---
>> src/box/replication.cc | 41 ++++++++----
>> src/box/replication.h | 4 +-
>> test/instance_files/base_instance.lua | 3 +-
>> test/instance_files/replica.lua | 17 +++++
>> test/luatest_helpers/server.lua | 65 +++++++++++++++++++
>> .../gh_4669_applier_reconnect_test.lua | 40 ++++++++++++
>> 8 files changed, 184 insertions(+), 22 deletions(-)
>> create mode 100644 changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>> create mode 100755 test/instance_files/replica.lua
>> create mode 100644 test/replication-luatest/gh_4669_applier_reconnect_test.lua
>>
>> diff --git a/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>> new file mode 100644
>> index 000000000..2a776872b
>> --- /dev/null
>> +++ b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>> @@ -0,0 +1,5 @@
>> +## bugfix/replication
>> +
>> +* Fixed replica reconnecting to a living master on any `box.cfg{replication=...}`
>> + change. Such reconnects could lead to the replica failing to restore the
>> + connection for `replication_timeout` seconds (gh-4669).
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 219ffa38d..cc4ada47e 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1249,7 +1249,7 @@ cfg_get_replication(int *p_count)
>> * don't start appliers.
>> */
>> static void
>> -box_sync_replication(bool connect_quorum)
>> +box_sync_replication(bool do_quorum, bool do_reuse)
>> {
>> int count = 0;
>> struct applier **appliers = cfg_get_replication(&count);
>> @@ -1260,12 +1260,27 @@ box_sync_replication(bool connect_quorum)
>> for (int i = 0; i < count; i++)
>> applier_delete(appliers[i]); /* doesn't affect diag */
>> });
>> -
>> - replicaset_connect(appliers, count, connect_quorum);
>> + replicaset_connect(appliers, count, do_quorum, do_reuse);
>>
>> guard.is_active = false;
>> }
>>
>> +static inline void
>> +box_restart_replication(void)
>> +{
>> + const bool do_quorum = true;
>> + const bool do_reuse = false;
>> + box_sync_replication(do_quorum, do_reuse);
>> +}
>> +
>> +static inline void
>> +box_update_replication(void)
>> +{
>> + const bool do_quorum = false;
>> + const bool do_reuse = true;
>> + box_sync_replication(do_quorum, do_reuse);
>> +}
>> +
>> void
>> box_set_replication(void)
>> {
>> @@ -1284,7 +1299,7 @@ box_set_replication(void)
>> * Stay in orphan mode in case we fail to connect to at least
>> * 'replication_connect_quorum' remote instances.
>> */
>> - box_sync_replication(false);
>> + box_update_replication();
>> /* Follow replica */
>> replicaset_follow();
>> /* Wait until appliers are in sync */
>> @@ -1404,7 +1419,7 @@ box_set_replication_anon(void)
>> * them can register and others resend a
>> * non-anonymous subscribe.
>> */
>> - box_sync_replication(true);
>> + box_restart_replication();
>> /*
>> * Wait until the master has registered this
>> * instance.
>> @@ -3258,7 +3273,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
>> * with connecting to 'replication_connect_quorum' masters.
>> * If this also fails, throw an error.
>> */
>> - box_sync_replication(true);
>> + box_restart_replication();
>>
>> struct replica *master = replicaset_find_join_master();
>> assert(master == NULL || master->applier != NULL);
>> @@ -3335,7 +3350,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>> if (wal_dir_lock >= 0) {
>> if (box_listen() != 0)
>> diag_raise();
>> - box_sync_replication(false);
>> + box_update_replication();
>>
>> struct replica *master;
>> if (replicaset_needs_rejoin(&master)) {
>> @@ -3414,7 +3429,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>> vclock_copy(&replicaset.vclock, &recovery->vclock);
>> if (box_listen() != 0)
>> diag_raise();
>> - box_sync_replication(false);
>> + box_update_replication();
>> }
>> stream_guard.is_active = false;
>> recovery_finalize(recovery);
>> diff --git a/src/box/replication.cc b/src/box/replication.cc
>> index 1288bc9b1..10b4ac915 100644
>> --- a/src/box/replication.cc
>> +++ b/src/box/replication.cc
>> @@ -507,7 +507,7 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
>> * upon reconfiguration of box.cfg.replication.
>> */
>> static void
>> -replicaset_update(struct applier **appliers, int count)
>> +replicaset_update(struct applier **appliers, int count, bool keep_connect)
>> {
>> replica_hash_t uniq;
>> memset(&uniq, 0, sizeof(uniq));
>> @@ -572,22 +572,39 @@ replicaset_update(struct applier **appliers, int count)
>> applier_stop(applier);
>> applier_delete(applier);
>> }
>> +
>> + replicaset.applier.total = count;
>> + replicaset.applier.connected = 0;
>> + replicaset.applier.loading = 0;
>> + replicaset.applier.synced = 0;
>> replicaset_foreach(replica) {
>> if (replica->applier == NULL)
>> continue;
>> - applier = replica->applier;
>> - replica_clear_applier(replica);
>> - replica->applier_sync_state = APPLIER_DISCONNECTED;
>> + struct replica *other = replica_hash_search(&uniq, replica);
>> + if (keep_connect && other != NULL &&
>> + (replica->applier->state == APPLIER_FOLLOW ||
>> + replica->applier->state == APPLIER_SYNC)) {
>> + /*
>> + * Try not to interrupt working appliers upon
>> + * reconfiguration.
>> + */
>> + replicaset.applier.connected++;
>> + replicaset.applier.synced++;
>> + replica_hash_remove(&uniq, other);
>> + applier = other->applier;
>> + replica_clear_applier(other);
>> + replica_delete(other);
>> + } else {
>> + applier = replica->applier;
>> + replica_clear_applier(replica);
>> + replica->applier_sync_state = APPLIER_DISCONNECTED;
>> + }
>> applier_stop(applier);
>> applier_delete(applier);
>> }
>>
>> - /* Save new appliers */
>> - replicaset.applier.total = count;
>> - replicaset.applier.connected = 0;
>> - replicaset.applier.loading = 0;
>> - replicaset.applier.synced = 0;
>>
>> + /* Save new appliers */
>> replica_hash_foreach_safe(&uniq, replica, next) {
>> replica_hash_remove(&uniq, replica);
>>
>> @@ -664,11 +681,11 @@ applier_on_connect_f(struct trigger *trigger, void *event)
>>
>> void
>> replicaset_connect(struct applier **appliers, int count,
>> - bool connect_quorum)
>> + bool connect_quorum, bool keep_connect)
>> {
>> if (count == 0) {
>> /* Cleanup the replica set. */
>> - replicaset_update(appliers, count);
>> + replicaset_update(appliers, 0, false);
>> return;
>> }
>>
>> @@ -772,7 +789,7 @@ replicaset_connect(struct applier **appliers, int count,
>>
>> /* Now all the appliers are connected, update the replica set. */
>> try {
>> - replicaset_update(appliers, count);
>> + replicaset_update(appliers, count, keep_connect);
>> } catch (Exception *e) {
>> goto error;
>> }
>> diff --git a/src/box/replication.h b/src/box/replication.h
>> index 4c82cd839..a8fed45e8 100644
>> --- a/src/box/replication.h
>> +++ b/src/box/replication.h
>> @@ -439,10 +439,12 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid);
>> * \param connect_quorum if this flag is set, fail unless at
>> * least replication_connect_quorum
>> * appliers have successfully connected.
>> + * \param keep_connect if this flag is set, do not force a reconnect if the
>> + * old connection to the replica is fine.
>> */
>> void
>> replicaset_connect(struct applier **appliers, int count,
>> - bool connect_quorum);
>> + bool connect_quorum, bool keep_connect);
>>
>> /**
>> * Check if the current instance fell too much behind its
>> diff --git a/test/instance_files/base_instance.lua b/test/instance_files/base_instance.lua
>> index 45bdbc7e8..e579c3843 100755
>> --- a/test/instance_files/base_instance.lua
>> +++ b/test/instance_files/base_instance.lua
>> @@ -5,7 +5,8 @@ local listen = os.getenv('TARANTOOL_LISTEN')
>> box.cfg({
>> work_dir = workdir,
>> -- listen = 'localhost:3310'
>> - listen = listen
>> + listen = listen,
>> + log = workdir..'/tarantool.log',
>> })
>>
>> box.schema.user.grant('guest', 'read,write,execute,create', 'universe')
>> diff --git a/test/instance_files/replica.lua b/test/instance_files/replica.lua
>> new file mode 100755
>> index 000000000..587345f24
>> --- /dev/null
>> +++ b/test/instance_files/replica.lua
>> @@ -0,0 +1,17 @@
>> +#!/usr/bin/env tarantool
>> +local workdir = os.getenv('TARANTOOL_WORKDIR')
>> +local listen = os.getenv('TARANTOOL_LISTEN')
>> +local SOCKET_DIR = os.getenv('VARDIR')
>> +local MASTER = arg[1] or "master"
>> +
>> +local function master_uri()
>> + return SOCKET_DIR..'/'..MASTER..'.sock'
>> +end
>> +
>> +box.cfg({
>> + work_dir = workdir,
>> + listen = listen,
>> + replication = master_uri(),
>> +})
>> +
>> +_G.ready = true
>> diff --git a/test/luatest_helpers/server.lua b/test/luatest_helpers/server.lua
>> index 018ea4f7d..0306a24ef 100644
>> --- a/test/luatest_helpers/server.lua
>> +++ b/test/luatest_helpers/server.lua
>> @@ -5,6 +5,7 @@ local fiber = require('fiber')
>> local fio = require('fio')
>> local fun = require('fun')
>> local json = require('json')
>> +local errno = require('errno')
>>
>> local checks = require('checks')
>> local luatest = require('luatest')
>> @@ -59,6 +60,70 @@ function Server:initialize()
>> getmetatable(getmetatable(self)).initialize(self)
>> end
>>
>> +-- A copy of test_run:grep_log.
>> +function Server:grep_log(what, bytes, opts)
>> + local opts = opts or {}
>> + local noreset = opts.noreset or false
>> + -- if the instance has crashed, provide a filename to use grep_log
>> + local filename = opts.filename or self:eval('return box.cfg.log')
>> + local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'})
>> +
>> + local function fail(msg)
>> + local err = errno.strerror()
>> + if file ~= nil then
>> + file:close()
>> + end
>> + error(string.format("%s: %s: %s", msg, filename, err))
>> + end
>> +
>> + if file == nil then
>> + fail("Failed to open log file")
>> + end
>> + io.flush() -- attempt to flush stdout == log fd
>> + local filesize = file:seek(0, 'SEEK_END')
>> + if filesize == nil then
>> + fail("Failed to get log file size")
>> + end
>> + local bytes = bytes or 65536 -- don't read whole log - it can be huge
>> + bytes = bytes > filesize and filesize or bytes
>> + if file:seek(-bytes, 'SEEK_END') == nil then
>> + fail("Failed to seek log file")
>> + end
>> + local found, buf
>> + repeat -- read file in chunks
>> + local s = file:read(2048)
>> + if s == nil then
>> + fail("Failed to read log file")
>> + end
>> + local pos = 1
>> + repeat -- split read string in lines
>> + local endpos = string.find(s, '\n', pos)
>> + endpos = endpos and endpos - 1 -- strip terminating \n
>> + local line = string.sub(s, pos, endpos)
>> + if endpos == nil and s ~= '' then
>> + -- line doesn't end with \n or eof, append it to buffer
>> + -- to be checked on next iteration
>> + buf = buf or {}
>> + table.insert(buf, line)
>> + else
>> + if buf ~= nil then -- prepend line with buffered data
>> + table.insert(buf, line)
>> + line = table.concat(buf)
>> + buf = nil
>> + end
>> + if string.match(line, "Starting instance") and not noreset then
>> + found = nil -- server was restarted, reset search
>> + else
>> + found = string.match(line, what) or found
>> + end
>> + end
>> + pos = endpos and endpos + 2 -- jump to char after \n
>> + until pos == nil
>> + until s == ''
>> + file:close()
>> + return found
>> +end
>> +
>> --- Generates environment to run process with.
>> -- The result is merged into os.environ().
>> -- @return map
>> diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
>> new file mode 100644
>> index 000000000..19e10b136
>> --- /dev/null
>> +++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
>> @@ -0,0 +1,40 @@
>> +local t = require('luatest')
>> +local cluster = require('test.luatest_helpers.cluster')
>> +
>> +local g = t.group('gh-4669-applier-reconnect')
>> +
>> +local function check_follow_master(server)
>> + return t.assert_equals(
>> + server:eval('return box.info.replication[1].upstream.status'), 'follow')
>> +end
>> +
>> +g.before_each(function()
>> + g.cluster = cluster:new({})
>> + g.master = g.cluster:build_server(
>> + {}, {alias = 'master'}, 'base_instance.lua')
>> + g.replica = g.cluster:build_server(
>> + {args={'master'}}, {alias = 'replica'}, 'replica.lua')
>> +
>> + g.cluster:join_server(g.master)
>> + g.cluster:join_server(g.replica)
>> + g.cluster:start()
>> + check_follow_master(g.replica)
>> +end)
>> +
>> +g.after_each(function()
>> + g.cluster:stop()
>> +end)
>> +
>> +-- Test that appliers aren't recreated upon replication reconfiguration.
>> +g.test_applier_connection_on_reconfig = function(g)
>> + g.replica:eval([[
>> + box.cfg{
>> + replication = {
>> + os.getenv("TARANTOOL_LISTEN"),
>> + box.cfg.replication[1],
>> + }
>> + }
>> + ]])
> Should we test this with dynamic add/remove of a 2nd replica?
No problem. Here's the diff:
============================
diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
index 19e10b136..e082515fd 100644
--- a/test/replication-luatest/gh_4669_applier_reconnect_test.lua
+++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
@@ -14,9 +14,12 @@ g.before_each(function()
{}, {alias = 'master'}, 'base_instance.lua')
g.replica = g.cluster:build_server(
{args={'master'}}, {alias = 'replica'}, 'replica.lua')
+ g.replica2 = g.cluster:build_server(
+ {args={'master'}}, {alias = 'replica2'}, 'replica.lua')
g.cluster:join_server(g.master)
g.cluster:join_server(g.replica)
+ g.cluster:join_server(g.replica2)
g.cluster:start()
check_follow_master(g.replica)
end)
@@ -26,12 +29,20 @@ g.after_each(function()
end)
-- Test that appliers aren't recreated upon replication reconfiguration.
+-- Add and then remove two extra replicas to the configuration. The master
+-- connection should stay intact.
g.test_applier_connection_on_reconfig = function(g)
g.replica:eval([[
box.cfg{
replication = {
os.getenv("TARANTOOL_LISTEN"),
box.cfg.replication[1],
+ os.getenv('VARDIR')..'/replica2.sock'
+ }
+ }
+ box.cfg{
+ replication = {
+ box.cfg.replication[2]
}
}
]])
============================
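To spell out what the updated test does: the first box.cfg call grows the
replica's replication list to {self, master, replica2}, and the second one
shrinks it back to the master alone, so both adding and removing peers are
exercised. Condensed (socket paths as the helpers above construct them):

    -- Step 1: add the replica's own URI and replica2 to the list.
    -- box.cfg.replication[1] is the master URI set at startup.
    box.cfg{replication = {
        os.getenv('TARANTOOL_LISTEN'),
        box.cfg.replication[1],
        os.getenv('VARDIR')..'/replica2.sock',
    }}
    -- Step 2: keep only the master (entry [2] of the list set above).
    box.cfg{replication = {box.cfg.replication[2]}}

Throughout both steps the established master connection must survive, which
is what the assertions below verify.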
>
>> + check_follow_master(g.replica)
>> + t.assert_equals(g.master:grep_log("exiting the relay loop"), nil)
>> +end
>> --
>> 2.30.1 (Apple Git-130)
>>
--
Serge Petrenko