[Tarantool-patches] [PATCH v2 2/2] replication: fix replica disconnect upon reconfiguration

Serge Petrenko sergepetrenko at tarantool.org
Tue Oct 12 17:14:34 MSK 2021



12.10.2021 13:14, sergos пишет:
> Hi!
>
> Thanks for the patch!
>
> Just request for some addition to the test, otherwise LGTM.
>
> Sergos
>
>
>> On 5 Oct 2021, at 16:18, Serge Petrenko <sergepetrenko at tarantool.org> wrote:
>>
>> Replication reconfiguration used to work as follows: upon receiving a
>> new config disconnect from all the existing masters and try to connect
>> to all the masters from the new config.
>>
>> This lead to instances doing extra work when old config and new config
>> had the same nodes in them: instead of doing nothing, tarantool
>> reinitialized the connection.
>>
>> There was another problem: when an existing connection is broken, master
>> takes some time to notice that. So, when replica resets the connection,
>> it may try to reconnect faster than the master is able to notice its
>> absence. In this case replica wouldn't be able to reconnect due to
>> `duplicate connection with the same replica UUID`. So replication would
>> stop for a replication_timeout, which may be quite large (seconds or
>> tens of seconds).
>>
>> Let's prevent tarantool from reconnecting to the same instance, if there
>> already is a working connection.
>>
>> Closes #4669
>> ---
>> .../gh-4669-applier-reconfig-reconnect.md     |  5 ++
>> src/box/box.cc                                | 31 ++++++---
>> src/box/replication.cc                        | 41 ++++++++----
>> src/box/replication.h                         |  4 +-
>> test/instance_files/base_instance.lua         |  3 +-
>> test/instance_files/replica.lua               | 17 +++++
>> test/luatest_helpers/server.lua               | 65 +++++++++++++++++++
>> .../gh_4669_applier_reconnect_test.lua        | 40 ++++++++++++
>> 8 files changed, 184 insertions(+), 22 deletions(-)
>> create mode 100644 changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>> create mode 100755 test/instance_files/replica.lua
>> create mode 100644 test/replication-luatest/gh_4669_applier_reconnect_test.lua
>>
>> diff --git a/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>> new file mode 100644
>> index 000000000..2a776872b
>> --- /dev/null
>> +++ b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
>> @@ -0,0 +1,5 @@
>> +## bugfix/replication
>> +
>> +* Fixed replica reconnecting to a living master on any `box.cfg{replication=...}`
>> +  change. Such reconnects could lead to replica failing to restore connection
>> +  for `replication_timeout` seconds (gh-4669).
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 219ffa38d..cc4ada47e 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1249,7 +1249,7 @@ cfg_get_replication(int *p_count)
>>   * don't start appliers.
>>   */
>> static void
>> -box_sync_replication(bool connect_quorum)
>> +box_sync_replication(bool do_quorum, bool do_reuse)
>> {
>> 	int count = 0;
>> 	struct applier **appliers = cfg_get_replication(&count);
>> @@ -1260,12 +1260,27 @@ box_sync_replication(bool connect_quorum)
>> 		for (int i = 0; i < count; i++)
>> 			applier_delete(appliers[i]); /* doesn't affect diag */
>> 	});
>> -
>> -	replicaset_connect(appliers, count, connect_quorum);
>> +	replicaset_connect(appliers, count, do_quorum, do_reuse);
>>
>> 	guard.is_active = false;
>> }
>>
>> +static inline void
>> +box_restart_replication(void)
>> +{
>> +	const bool do_quorum = true;
>> +	const bool do_reuse = false;
>> +	box_sync_replication(do_quorum, do_reuse);
>> +}
>> +
>> +static inline void
>> +box_update_replication(void)
>> +{
>> +	const bool do_quorum = false;
>> +	const bool do_reuse = true;
>> +	box_sync_replication(do_quorum, do_reuse);
>> +}
>> +
>> void
>> box_set_replication(void)
>> {
>> @@ -1284,7 +1299,7 @@ box_set_replication(void)
>> 	 * Stay in orphan mode in case we fail to connect to at least
>> 	 * 'replication_connect_quorum' remote instances.
>> 	 */
>> -	box_sync_replication(false);
>> +	box_update_replication();
>> 	/* Follow replica */
>> 	replicaset_follow();
>> 	/* Wait until appliers are in sync */
>> @@ -1404,7 +1419,7 @@ box_set_replication_anon(void)
>> 		 * them can register and others resend a
>> 		 * non-anonymous subscribe.
>> 		 */
>> -		box_sync_replication(true);
>> +		box_restart_replication();
>> 		/*
>> 		 * Wait until the master has registered this
>> 		 * instance.
>> @@ -3258,7 +3273,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
>> 	 * with connecting to 'replication_connect_quorum' masters.
>> 	 * If this also fails, throw an error.
>> 	 */
>> -	box_sync_replication(true);
>> +	box_restart_replication();
>>
>> 	struct replica *master = replicaset_find_join_master();
>> 	assert(master == NULL || master->applier != NULL);
>> @@ -3335,7 +3350,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>> 	if (wal_dir_lock >= 0) {
>> 		if (box_listen() != 0)
>> 			diag_raise();
>> -		box_sync_replication(false);
>> +		box_update_replication();
>>
>> 		struct replica *master;
>> 		if (replicaset_needs_rejoin(&master)) {
>> @@ -3414,7 +3429,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>> 		vclock_copy(&replicaset.vclock, &recovery->vclock);
>> 		if (box_listen() != 0)
>> 			diag_raise();
>> -		box_sync_replication(false);
>> +		box_update_replication();
>> 	}
>> 	stream_guard.is_active = false;
>> 	recovery_finalize(recovery);
>> diff --git a/src/box/replication.cc b/src/box/replication.cc
>> index 1288bc9b1..10b4ac915 100644
>> --- a/src/box/replication.cc
>> +++ b/src/box/replication.cc
>> @@ -507,7 +507,7 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
>>   * upon reconfiguration of box.cfg.replication.
>>   */
>> static void
>> -replicaset_update(struct applier **appliers, int count)
>> +replicaset_update(struct applier **appliers, int count, bool keep_connect)
>> {
>> 	replica_hash_t uniq;
>> 	memset(&uniq, 0, sizeof(uniq));
>> @@ -572,22 +572,39 @@ replicaset_update(struct applier **appliers, int count)
>> 		applier_stop(applier);
>> 		applier_delete(applier);
>> 	}
>> +
>> +	replicaset.applier.total = count;
>> +	replicaset.applier.connected = 0;
>> +	replicaset.applier.loading = 0;
>> +	replicaset.applier.synced = 0;
>> 	replicaset_foreach(replica) {
>> 		if (replica->applier == NULL)
>> 			continue;
>> -		applier = replica->applier;
>> -		replica_clear_applier(replica);
>> -		replica->applier_sync_state = APPLIER_DISCONNECTED;
>> +		struct replica *other = replica_hash_search(&uniq, replica);
>> +		if (keep_connect && other != NULL &&
>> +		    (replica->applier->state == APPLIER_FOLLOW ||
>> +		     replica->applier->state == APPLIER_SYNC)) {
>> +			/*
>> +			 * Try not to interrupt working appliers upon
>> +			 * reconfiguration.
>> +			 */
>> +			replicaset.applier.connected++;
>> +			replicaset.applier.synced++;
>> +			replica_hash_remove(&uniq, other);
>> +			applier = other->applier;
>> +			replica_clear_applier(other);
>> +			replica_delete(other);
>> +		} else {
>> +			applier = replica->applier;
>> +			replica_clear_applier(replica);
>> +			replica->applier_sync_state = APPLIER_DISCONNECTED;
>> +		}
>> 		applier_stop(applier);
>> 		applier_delete(applier);
>> 	}
>>
>> -	/* Save new appliers */
>> -	replicaset.applier.total = count;
>> -	replicaset.applier.connected = 0;
>> -	replicaset.applier.loading = 0;
>> -	replicaset.applier.synced = 0;
>>
>> +	/* Save new appliers */
>> 	replica_hash_foreach_safe(&uniq, replica, next) {
>> 		replica_hash_remove(&uniq, replica);
>>
>> @@ -664,11 +681,11 @@ applier_on_connect_f(struct trigger *trigger, void *event)
>>
>> void
>> replicaset_connect(struct applier **appliers, int count,
>> -		   bool connect_quorum)
>> +		   bool connect_quorum, bool keep_connect)
>> {
>> 	if (count == 0) {
>> 		/* Cleanup the replica set. */
>> -		replicaset_update(appliers, count);
>> +		replicaset_update(appliers, 0, false);
>> 		return;
>> 	}
>>
>> @@ -772,7 +789,7 @@ replicaset_connect(struct applier **appliers, int count,
>>
>> 	/* Now all the appliers are connected, update the replica set. */
>> 	try {
>> -		replicaset_update(appliers, count);
>> +		replicaset_update(appliers, count, keep_connect);
>> 	} catch (Exception *e) {
>> 		goto error;
>> 	}
>> diff --git a/src/box/replication.h b/src/box/replication.h
>> index 4c82cd839..a8fed45e8 100644
>> --- a/src/box/replication.h
>> +++ b/src/box/replication.h
>> @@ -439,10 +439,12 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid);
>>   * \param connect_quorum if this flag is set, fail unless at
>>   *                       least replication_connect_quorum
>>   *                       appliers have successfully connected.
>> + * \param keep_connect   if this flag is set do not force a reconnect if the
>> + *                       old connection to the replica is fine.
>>   */
>> void
>> replicaset_connect(struct applier **appliers, int count,
>> -		   bool connect_quorum);
>> +		   bool connect_quorum, bool keep_connect);
>>
>> /**
>>   * Check if the current instance fell too much behind its
>> diff --git a/test/instance_files/base_instance.lua b/test/instance_files/base_instance.lua
>> index 45bdbc7e8..e579c3843 100755
>> --- a/test/instance_files/base_instance.lua
>> +++ b/test/instance_files/base_instance.lua
>> @@ -5,7 +5,8 @@ local listen = os.getenv('TARANTOOL_LISTEN')
>> box.cfg({
>>      work_dir = workdir,
>> --     listen = 'localhost:3310'
>> -    listen = listen
>> +    listen = listen,
>> +    log = workdir..'/tarantool.log',
>> })
>>
>> box.schema.user.grant('guest', 'read,write,execute,create', 'universe')
>> diff --git a/test/instance_files/replica.lua b/test/instance_files/replica.lua
>> new file mode 100755
>> index 000000000..587345f24
>> --- /dev/null
>> +++ b/test/instance_files/replica.lua
>> @@ -0,0 +1,17 @@
>> +#!/usr/bin/env tarantool
>> +local workdir = os.getenv('TARANTOOL_WORKDIR')
>> +local listen = os.getenv('TARANTOOL_LISTEN')
>> +local SOCKET_DIR = os.getenv('VARDIR')
>> +local MASTER = arg[1] or "master"
>> +
>> +local function master_uri()
>> +    return SOCKET_DIR..'/'..MASTER..'.sock'
>> +end
>> +
>> +box.cfg({
>> +    work_dir = workdir,
>> +    listen = listen,
>> +    replication = master_uri(),
>> +})
>> +
>> +_G.ready = true
>> diff --git a/test/luatest_helpers/server.lua b/test/luatest_helpers/server.lua
>> index 018ea4f7d..0306a24ef 100644
>> --- a/test/luatest_helpers/server.lua
>> +++ b/test/luatest_helpers/server.lua
>> @@ -5,6 +5,7 @@ local fiber = require('fiber')
>> local fio = require('fio')
>> local fun = require('fun')
>> local json = require('json')
>> +local errno = require('errno')
>>
>> local checks = require('checks')
>> local luatest = require('luatest')
>> @@ -59,6 +60,70 @@ function Server:initialize()
>>      getmetatable(getmetatable(self)).initialize(self)
>> end
>>
>> +-- A copy of test_run:grep_log.
>> +function Server:grep_log(what, bytes, opts)
>> +    local opts = opts or {}
>> +    local noreset = opts.noreset or false
>> +    -- if instance has crashed provide filename to use grep_log
>> +    local filename = opts.filename or self:eval('return box.cfg.log')
>> +    local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'})
>> +
>> +    local function fail(msg)
>> +        local err = errno.strerror()
>> +        if file ~= nil then
>> +            file:close()
>> +        end
>> +        error(string.format("%s: %s: %s", msg, filename, err))
>> +    end
>> +
>> +    if file == nil then
>> +        fail("Failed to open log file")
>> +    end
>> +    io.flush() -- attempt to flush stdout == log fd
>> +    local filesize = file:seek(0, 'SEEK_END')
>> +    if filesize == nil then
>> +        fail("Failed to get log file size")
>> +    end
>> +    local bytes = bytes or 65536 -- don't read whole log - it can be huge
>> +    bytes = bytes > filesize and filesize or bytes
>> +    if file:seek(-bytes, 'SEEK_END') == nil then
>> +        fail("Failed to seek log file")
>> +    end
>> +    local found, buf
>> +    repeat -- read file in chunks
>> +        local s = file:read(2048)
>> +        if s == nil then
>> +            fail("Failed to read log file")
>> +        end
>> +        local pos = 1
>> +        repeat -- split read string in lines
>> +            local endpos = string.find(s, '\n', pos)
>> +            endpos = endpos and endpos - 1 -- strip terminating \n
>> +            local line = string.sub(s, pos, endpos)
>> +            if endpos == nil and s ~= '' then
>> +                -- line doesn't end with \n or eof, append it to buffer
>> +                -- to be checked on next iteration
>> +                buf = buf or {}
>> +                table.insert(buf, line)
>> +            else
>> +                if buf ~= nil then -- prepend line with buffered data
>> +                    table.insert(buf, line)
>> +                    line = table.concat(buf)
>> +                    buf = nil
>> +                end
>> +                if string.match(line, "Starting instance") and not noreset then
>> +                    found = nil -- server was restarted, reset search
>> +                else
>> +                    found = string.match(line, what) or found
>> +                end
>> +            end
>> +            pos = endpos and endpos + 2 -- jump to char after \n
>> +        until pos == nil
>> +    until s == ''
>> +    file:close()
>> +    return found
>> +end
>> +
>> --- Generates environment to run process with.
>> -- The result is merged into os.environ().
>> -- @return map
>> diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
>> new file mode 100644
>> index 000000000..19e10b136
>> --- /dev/null
>> +++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
>> @@ -0,0 +1,40 @@
>> +local t = require('luatest')
>> +local cluster = require('test.luatest_helpers.cluster')
>> +
>> +local g = t.group('gh-4669-applier-reconnect')
>> +
>> +local function check_follow_master(server)
>> +    return t.assert_equals(
>> +        server:eval('return box.info.replication[1].upstream.status'), 'follow')
>> +end
>> +
>> +g.before_each(function()
>> +    g.cluster = cluster:new({})
>> +    g.master = g.cluster:build_server(
>> +        {}, {alias = 'master'}, 'base_instance.lua')
>> +    g.replica = g.cluster:build_server(
>> +        {args={'master'}}, {alias = 'replica'}, 'replica.lua')
>> +
>> +    g.cluster:join_server(g.master)
>> +    g.cluster:join_server(g.replica)
>> +    g.cluster:start()
>> +    check_follow_master(g.replica)
>> +end)
>> +
>> +g.after_each(function()
>> +    g.cluster:stop()
>> +end)
>> +
>> +-- Test that appliers aren't recreated upon replication reconfiguration.
>> +g.test_applier_connection_on_reconfig = function(g)
>> +    g.replica:eval([[
>> +        box.cfg{
>> +            replication = {
>> +                os.getenv("TARANTOOL_LISTEN"),
>> +                box.cfg.replication[1],
>> +            }
>> +        }
>> +    ]])
> Should we test this with dynamic add/remove of a 2nd replica?

No problem. Here's the diff:

============================

diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua 
b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
index 19e10b136..e082515fd 100644
--- a/test/replication-luatest/gh_4669_applier_reconnect_test.lua
+++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
@@ -14,9 +14,12 @@ g.before_each(function()
          {}, {alias = 'master'}, 'base_instance.lua')
      g.replica = g.cluster:build_server(
          {args={'master'}}, {alias = 'replica'}, 'replica.lua')
+    g.replica2 = g.cluster:build_server(
+        {args={'master'}}, {alias = 'replica2'}, 'replica.lua')

      g.cluster:join_server(g.master)
      g.cluster:join_server(g.replica)
+    g.cluster:join_server(g.replica2)
      g.cluster:start()
      check_follow_master(g.replica)
  end)
@@ -26,12 +29,20 @@ g.after_each(function()
  end)

  -- Test that appliers aren't recreated upon replication reconfiguration.
+-- Add and then remove two extra replicas to the configuration. The master
+-- connection should stay intact.
  g.test_applier_connection_on_reconfig = function(g)
      g.replica:eval([[
          box.cfg{
              replication = {
                  os.getenv("TARANTOOL_LISTEN"),
                  box.cfg.replication[1],
+                os.getenv('VARDIR')..'/replica2.sock'
+            }
+        }
+        box.cfg{
+            replication = {
+                box.cfg.replication[2]
              }
          }
      ]])

============================

>
>> +    check_follow_master(g.replica)
>> +    t.assert_equals(g.master:grep_log("exiting the relay loop"), nil)
>> +end
>> -- 
>> 2.30.1 (Apple Git-130)
>>

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list