Tarantool development patches archive
 help / color / mirror / Atom feed
From: sergos via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Serge Petrenko <sergepetrenko@tarantool.org>
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v2 2/2] replication: fix replica disconnect upon reconfiguration
Date: Tue, 12 Oct 2021 13:14:27 +0300
Message-ID: <648688AC-03B8-48B3-B644-8F3B9A9A9212@tarantool.org> (raw)
In-Reply-To: <5f68cfcac8c44346a690e0f48ad0da1d7646ac83.1633439400.git.sergepetrenko@tarantool.org>

Hi!

Thanks for the patch!

Just a request for some addition to the test; otherwise LGTM.

Sergos


> On 5 Oct 2021, at 16:18, Serge Petrenko <sergepetrenko@tarantool.org> wrote:
> 
> Replication reconfiguration used to work as follows: upon receiving a
> new config disconnect from all the existing masters and try to connect
> to all the masters from the new config.
> 
> This led to instances doing extra work when old config and new config
> had the same nodes in them: instead of doing nothing, tarantool
> reinitialized the connection.
> 
> There was another problem: when an existing connection is broken, master
> takes some time to notice that. So, when replica resets the connection,
> it may try to reconnect faster than the master is able to notice its
> absence. In this case replica wouldn't be able to reconnect due to
> `duplicate connection with the same replica UUID`. So replication would
> stop for a replication_timeout, which may be quite large (seconds or
> tens of seconds).
> 
> Let's prevent tarantool from reconnecting to the same instance, if there
> already is a working connection.
> 
> Closes #4669
> ---
> .../gh-4669-applier-reconfig-reconnect.md     |  5 ++
> src/box/box.cc                                | 31 ++++++---
> src/box/replication.cc                        | 41 ++++++++----
> src/box/replication.h                         |  4 +-
> test/instance_files/base_instance.lua         |  3 +-
> test/instance_files/replica.lua               | 17 +++++
> test/luatest_helpers/server.lua               | 65 +++++++++++++++++++
> .../gh_4669_applier_reconnect_test.lua        | 40 ++++++++++++
> 8 files changed, 184 insertions(+), 22 deletions(-)
> create mode 100644 changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
> create mode 100755 test/instance_files/replica.lua
> create mode 100644 test/replication-luatest/gh_4669_applier_reconnect_test.lua
> 
> diff --git a/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
> new file mode 100644
> index 000000000..2a776872b
> --- /dev/null
> +++ b/changelogs/unreleased/gh-4669-applier-reconfig-reconnect.md
> @@ -0,0 +1,5 @@
> +## bugfix/replication
> +
> +* Fixed replica reconnecting to a living master on any `box.cfg{replication=...}`
> +  change. Such reconnects could lead to replica failing to restore connection
> +  for `replication_timeout` seconds (gh-4669).
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 219ffa38d..cc4ada47e 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -1249,7 +1249,7 @@ cfg_get_replication(int *p_count)
>  * don't start appliers.
>  */
> static void
> -box_sync_replication(bool connect_quorum)
> +box_sync_replication(bool do_quorum, bool do_reuse)
> {
> 	int count = 0;
> 	struct applier **appliers = cfg_get_replication(&count);
> @@ -1260,12 +1260,27 @@ box_sync_replication(bool connect_quorum)
> 		for (int i = 0; i < count; i++)
> 			applier_delete(appliers[i]); /* doesn't affect diag */
> 	});
> -
> -	replicaset_connect(appliers, count, connect_quorum);
> +	replicaset_connect(appliers, count, do_quorum, do_reuse);
> 
> 	guard.is_active = false;
> }
> 
> +static inline void
> +box_restart_replication(void)
> +{
> +	const bool do_quorum = true;
> +	const bool do_reuse = false;
> +	box_sync_replication(do_quorum, do_reuse);
> +}
> +
> +static inline void
> +box_update_replication(void)
> +{
> +	const bool do_quorum = false;
> +	const bool do_reuse = true;
> +	box_sync_replication(do_quorum, do_reuse);
> +}
> +
> void
> box_set_replication(void)
> {
> @@ -1284,7 +1299,7 @@ box_set_replication(void)
> 	 * Stay in orphan mode in case we fail to connect to at least
> 	 * 'replication_connect_quorum' remote instances.
> 	 */
> -	box_sync_replication(false);
> +	box_update_replication();
> 	/* Follow replica */
> 	replicaset_follow();
> 	/* Wait until appliers are in sync */
> @@ -1404,7 +1419,7 @@ box_set_replication_anon(void)
> 		 * them can register and others resend a
> 		 * non-anonymous subscribe.
> 		 */
> -		box_sync_replication(true);
> +		box_restart_replication();
> 		/*
> 		 * Wait until the master has registered this
> 		 * instance.
> @@ -3258,7 +3273,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
> 	 * with connecting to 'replication_connect_quorum' masters.
> 	 * If this also fails, throw an error.
> 	 */
> -	box_sync_replication(true);
> +	box_restart_replication();
> 
> 	struct replica *master = replicaset_find_join_master();
> 	assert(master == NULL || master->applier != NULL);
> @@ -3335,7 +3350,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
> 	if (wal_dir_lock >= 0) {
> 		if (box_listen() != 0)
> 			diag_raise();
> -		box_sync_replication(false);
> +		box_update_replication();
> 
> 		struct replica *master;
> 		if (replicaset_needs_rejoin(&master)) {
> @@ -3414,7 +3429,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
> 		vclock_copy(&replicaset.vclock, &recovery->vclock);
> 		if (box_listen() != 0)
> 			diag_raise();
> -		box_sync_replication(false);
> +		box_update_replication();
> 	}
> 	stream_guard.is_active = false;
> 	recovery_finalize(recovery);
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 1288bc9b1..10b4ac915 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -507,7 +507,7 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
>  * upon reconfiguration of box.cfg.replication.
>  */
> static void
> -replicaset_update(struct applier **appliers, int count)
> +replicaset_update(struct applier **appliers, int count, bool keep_connect)
> {
> 	replica_hash_t uniq;
> 	memset(&uniq, 0, sizeof(uniq));
> @@ -572,22 +572,39 @@ replicaset_update(struct applier **appliers, int count)
> 		applier_stop(applier);
> 		applier_delete(applier);
> 	}
> +
> +	replicaset.applier.total = count;
> +	replicaset.applier.connected = 0;
> +	replicaset.applier.loading = 0;
> +	replicaset.applier.synced = 0;
> 	replicaset_foreach(replica) {
> 		if (replica->applier == NULL)
> 			continue;
> -		applier = replica->applier;
> -		replica_clear_applier(replica);
> -		replica->applier_sync_state = APPLIER_DISCONNECTED;
> +		struct replica *other = replica_hash_search(&uniq, replica);
> +		if (keep_connect && other != NULL &&
> +		    (replica->applier->state == APPLIER_FOLLOW ||
> +		     replica->applier->state == APPLIER_SYNC)) {
> +			/*
> +			 * Try not to interrupt working appliers upon
> +			 * reconfiguration.
> +			 */
> +			replicaset.applier.connected++;
> +			replicaset.applier.synced++;
> +			replica_hash_remove(&uniq, other);
> +			applier = other->applier;
> +			replica_clear_applier(other);
> +			replica_delete(other);
> +		} else {
> +			applier = replica->applier;
> +			replica_clear_applier(replica);
> +			replica->applier_sync_state = APPLIER_DISCONNECTED;
> +		}
> 		applier_stop(applier);
> 		applier_delete(applier);
> 	}
> 
> -	/* Save new appliers */
> -	replicaset.applier.total = count;
> -	replicaset.applier.connected = 0;
> -	replicaset.applier.loading = 0;
> -	replicaset.applier.synced = 0;
> 
> +	/* Save new appliers */
> 	replica_hash_foreach_safe(&uniq, replica, next) {
> 		replica_hash_remove(&uniq, replica);
> 
> @@ -664,11 +681,11 @@ applier_on_connect_f(struct trigger *trigger, void *event)
> 
> void
> replicaset_connect(struct applier **appliers, int count,
> -		   bool connect_quorum)
> +		   bool connect_quorum, bool keep_connect)
> {
> 	if (count == 0) {
> 		/* Cleanup the replica set. */
> -		replicaset_update(appliers, count);
> +		replicaset_update(appliers, 0, false);
> 		return;
> 	}
> 
> @@ -772,7 +789,7 @@ replicaset_connect(struct applier **appliers, int count,
> 
> 	/* Now all the appliers are connected, update the replica set. */
> 	try {
> -		replicaset_update(appliers, count);
> +		replicaset_update(appliers, count, keep_connect);
> 	} catch (Exception *e) {
> 		goto error;
> 	}
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 4c82cd839..a8fed45e8 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -439,10 +439,12 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid);
>  * \param connect_quorum if this flag is set, fail unless at
>  *                       least replication_connect_quorum
>  *                       appliers have successfully connected.
> + * \param keep_connect   if this flag is set do not force a reconnect if the
> + *                       old connection to the replica is fine.
>  */
> void
> replicaset_connect(struct applier **appliers, int count,
> -		   bool connect_quorum);
> +		   bool connect_quorum, bool keep_connect);
> 
> /**
>  * Check if the current instance fell too much behind its
> diff --git a/test/instance_files/base_instance.lua b/test/instance_files/base_instance.lua
> index 45bdbc7e8..e579c3843 100755
> --- a/test/instance_files/base_instance.lua
> +++ b/test/instance_files/base_instance.lua
> @@ -5,7 +5,8 @@ local listen = os.getenv('TARANTOOL_LISTEN')
> box.cfg({
>     work_dir = workdir,
> --     listen = 'localhost:3310'
> -    listen = listen
> +    listen = listen,
> +    log = workdir..'/tarantool.log',
> })
> 
> box.schema.user.grant('guest', 'read,write,execute,create', 'universe')
> diff --git a/test/instance_files/replica.lua b/test/instance_files/replica.lua
> new file mode 100755
> index 000000000..587345f24
> --- /dev/null
> +++ b/test/instance_files/replica.lua
> @@ -0,0 +1,17 @@
> +#!/usr/bin/env tarantool
> +local workdir = os.getenv('TARANTOOL_WORKDIR')
> +local listen = os.getenv('TARANTOOL_LISTEN')
> +local SOCKET_DIR = os.getenv('VARDIR')
> +local MASTER = arg[1] or "master"
> +
> +local function master_uri()
> +    return SOCKET_DIR..'/'..MASTER..'.sock'
> +end
> +
> +box.cfg({
> +    work_dir = workdir,
> +    listen = listen,
> +    replication = master_uri(),
> +})
> +
> +_G.ready = true
> diff --git a/test/luatest_helpers/server.lua b/test/luatest_helpers/server.lua
> index 018ea4f7d..0306a24ef 100644
> --- a/test/luatest_helpers/server.lua
> +++ b/test/luatest_helpers/server.lua
> @@ -5,6 +5,7 @@ local fiber = require('fiber')
> local fio = require('fio')
> local fun = require('fun')
> local json = require('json')
> +local errno = require('errno')
> 
> local checks = require('checks')
> local luatest = require('luatest')
> @@ -59,6 +60,70 @@ function Server:initialize()
>     getmetatable(getmetatable(self)).initialize(self)
> end
> 
> +-- A copy of test_run:grep_log.
> +function Server:grep_log(what, bytes, opts)
> +    local opts = opts or {}
> +    local noreset = opts.noreset or false
> +    -- if instance has crashed provide filename to use grep_log
> +    local filename = opts.filename or self:eval('return box.cfg.log')
> +    local file = fio.open(filename, {'O_RDONLY', 'O_NONBLOCK'})
> +
> +    local function fail(msg)
> +        local err = errno.strerror()
> +        if file ~= nil then
> +            file:close()
> +        end
> +        error(string.format("%s: %s: %s", msg, filename, err))
> +    end
> +
> +    if file == nil then
> +        fail("Failed to open log file")
> +    end
> +    io.flush() -- attempt to flush stdout == log fd
> +    local filesize = file:seek(0, 'SEEK_END')
> +    if filesize == nil then
> +        fail("Failed to get log file size")
> +    end
> +    local bytes = bytes or 65536 -- don't read whole log - it can be huge
> +    bytes = bytes > filesize and filesize or bytes
> +    if file:seek(-bytes, 'SEEK_END') == nil then
> +        fail("Failed to seek log file")
> +    end
> +    local found, buf
> +    repeat -- read file in chunks
> +        local s = file:read(2048)
> +        if s == nil then
> +            fail("Failed to read log file")
> +        end
> +        local pos = 1
> +        repeat -- split read string in lines
> +            local endpos = string.find(s, '\n', pos)
> +            endpos = endpos and endpos - 1 -- strip terminating \n
> +            local line = string.sub(s, pos, endpos)
> +            if endpos == nil and s ~= '' then
> +                -- line doesn't end with \n or eof, append it to buffer
> +                -- to be checked on next iteration
> +                buf = buf or {}
> +                table.insert(buf, line)
> +            else
> +                if buf ~= nil then -- prepend line with buffered data
> +                    table.insert(buf, line)
> +                    line = table.concat(buf)
> +                    buf = nil
> +                end
> +                if string.match(line, "Starting instance") and not noreset then
> +                    found = nil -- server was restarted, reset search
> +                else
> +                    found = string.match(line, what) or found
> +                end
> +            end
> +            pos = endpos and endpos + 2 -- jump to char after \n
> +        until pos == nil
> +    until s == ''
> +    file:close()
> +    return found
> +end
> +
> --- Generates environment to run process with.
> -- The result is merged into os.environ().
> -- @return map
> diff --git a/test/replication-luatest/gh_4669_applier_reconnect_test.lua b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
> new file mode 100644
> index 000000000..19e10b136
> --- /dev/null
> +++ b/test/replication-luatest/gh_4669_applier_reconnect_test.lua
> @@ -0,0 +1,40 @@
> +local t = require('luatest')
> +local cluster = require('test.luatest_helpers.cluster')
> +
> +local g = t.group('gh-4669-applier-reconnect')
> +
> +local function check_follow_master(server)
> +    return t.assert_equals(
> +        server:eval('return box.info.replication[1].upstream.status'), 'follow')
> +end
> +
> +g.before_each(function()
> +    g.cluster = cluster:new({})
> +    g.master = g.cluster:build_server(
> +        {}, {alias = 'master'}, 'base_instance.lua')
> +    g.replica = g.cluster:build_server(
> +        {args={'master'}}, {alias = 'replica'}, 'replica.lua')
> +
> +    g.cluster:join_server(g.master)
> +    g.cluster:join_server(g.replica)
> +    g.cluster:start()
> +    check_follow_master(g.replica)
> +end)
> +
> +g.after_each(function()
> +    g.cluster:stop()
> +end)
> +
> +-- Test that appliers aren't recreated upon replication reconfiguration.
> +g.test_applier_connection_on_reconfig = function(g)
> +    g.replica:eval([[
> +        box.cfg{
> +            replication = {
> +                os.getenv("TARANTOOL_LISTEN"),
> +                box.cfg.replication[1],
> +            }
> +        }
> +    ]])

Should we test this with dynamic add/remove of a 2nd replica?

> +    check_follow_master(g.replica)
> +    t.assert_equals(g.master:grep_log("exiting the relay loop"), nil)
> +end
> -- 
> 2.30.1 (Apple Git-130)
> 


  reply	other threads:[~2021-10-12 10:14 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-10-05 13:18 [Tarantool-patches] [PATCH v2 0/2] replication: fix reconnect on box.cfg.replication change Serge Petrenko via Tarantool-patches
2021-10-05 13:18 ` [Tarantool-patches] [PATCH v2 1/2] replicaiton: make anon replica connect to quorum upon reconfiguration Serge Petrenko via Tarantool-patches
2021-10-12  9:56   ` sergos via Tarantool-patches
2021-10-12 13:38     ` Serge Petrenko via Tarantool-patches
2021-10-13 16:00       ` sergos via Tarantool-patches
2021-10-05 13:18 ` [Tarantool-patches] [PATCH v2 2/2] replication: fix replica disconnect " Serge Petrenko via Tarantool-patches
2021-10-12 10:14   ` sergos via Tarantool-patches [this message]
2021-10-12 14:14     ` Serge Petrenko via Tarantool-patches
2021-10-13 16:00       ` sergos via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=648688AC-03B8-48B3-B644-8F3B9A9A9212@tarantool.org \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=sergepetrenko@tarantool.org \
    --cc=sergos@tarantool.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Tarantool development patches archive

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://lists.tarantool.org/tarantool-patches/0 tarantool-patches/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 tarantool-patches tarantool-patches/ https://lists.tarantool.org/tarantool-patches \
		tarantool-patches@dev.tarantool.org.
	public-inbox-index tarantool-patches

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git