[Tarantool-patches] [PATCH v2 2/4] replication: retry in case of XlogGapError
Serge Petrenko
sergepetrenko at tarantool.org
Tue Sep 15 10:35:51 MSK 2020
On 15.09.2020 02:11, Vladislav Shpilevoy wrote:
> Previously XlogGapError was considered a critical error which
> stopped replication. That is not as good as it looks.
>
> XlogGapError is a perfectly normal error which should not kill the
> replication connection. It should be retried instead.
>
> Here is an example where the gap can be recovered on its own.
> Consider the case: node1 is a leader, it is booted with
> vclock {1: 3}. Node2 connects and fetches node1's snapshot, so it
> also gets vclock {1: 3}. Then node1 writes something and its
> vclock becomes {1: 4}. Now node3 boots from node1 and gets the
> same vclock. The vclocks now look like this:
>
> - node1: {1: 4}, leader, has {1: 3} snap.
> - node2: {1: 3}, booted from node1, has only snap.
> - node3: {1: 4}, booted from node1, has only snap.
>
> If the cluster is a full mesh, node2 will send subscribe requests
> with vclock {1: 3}. If node3 receives one, it will respond with an
> xlog gap error, because it has only a snap with {1: 4} and nothing
> else. In that case node2 should retry connecting to node3, and in
> the meantime try to get the newer changes from node1.
>
> The example is totally valid. However, it is unreachable now,
> because the master registers all replicas in _cluster before
> allowing them to join. So they all bootstrap from a snapshot
> containing all their IDs. This is a bug, because such
> auto-registration also registers anonymous replicas, if they are
> present during bootstrap. It also blocks Raft, which can't work
> while there are registered but not yet joined nodes.
>
> Once the registration problem is solved in the next commit,
> XlogGapError will strike quite often during bootstrap. This patch
> won't let it stop replication.
>
> Needed for #5287
> ---
> src/box/applier.cc | 27 +++
> test/replication/force_recovery.result | 110 -----------
> test/replication/force_recovery.test.lua | 43 -----
> test/replication/replica.lua | 2 +
> test/replication/replica_rejoin.result | 6 +-
> test/replication/replica_rejoin.test.lua | 4 +-
> .../show_error_on_disconnect.result | 2 +-
> .../show_error_on_disconnect.test.lua | 2 +-
> test/xlog/panic_on_wal_error.result | 171 ------------------
> test/xlog/panic_on_wal_error.test.lua | 75 --------
> 10 files changed, 36 insertions(+), 406 deletions(-)
> delete mode 100644 test/replication/force_recovery.result
> delete mode 100644 test/replication/force_recovery.test.lua
> delete mode 100644 test/xlog/panic_on_wal_error.result
> delete mode 100644 test/xlog/panic_on_wal_error.test.lua
Hi! Thanks for the patch!
I propose you rework the tests so that they expect upstream.status ==
'loading' instead of deleting them altogether.
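
For example, force_recovery.test.lua could keep its whole scenario and
only change what it waits for. A rough sketch of the tail of the test,
reusing the lines from the deleted file (the second startup argument is
my assumption: it would pass replication_connect_quorum = 0 through the
updated replica.lua, so box.cfg() on the replica does not block while
the upstream is still loading):

    -- The gap is no longer fatal: the applier keeps retrying, so the
    -- replica reports a transient 'loading' upstream instead of
    -- stopping with an error.
    box.cfg{force_recovery = true}
    test_run:cmd("start server test with args='false 0'")
    test_run:cmd("switch test")
    test_run:wait_upstream(1, {status = 'loading', message_re = 'Missing %.xlog file'})
    box.space.test:select()
    test_run:cmd("switch default")
    box.cfg{force_recovery = false}

xlog/panic_on_wal_error.test.lua could be reworked the same way: the
while-loop that waits for upstream.status == "stopped" would become a
single wait_upstream() call expecting 'loading'.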
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index c1d07ca54..96dd48c0d 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -106,6 +106,7 @@ applier_log_error(struct applier *applier, struct error *e)
> case ER_SYSTEM:
> case ER_UNKNOWN_REPLICA:
> case ER_PASSWORD_MISMATCH:
> + case ER_XLOG_GAP:
> say_info("will retry every %.2lf second",
> replication_reconnect_interval());
> break;
> @@ -1333,6 +1334,32 @@ applier_f(va_list ap)
> applier_disconnect(applier, APPLIER_STOPPED);
> return -1;
> }
> + } catch (XlogGapError *e) {
> + /*
> + * Xlog gap error can't be a critical error, because it
> + * is totally normal during bootstrap. Consider the
> + * case: node1 is a leader, it is booted with vclock
> + * {1: 3}. Node2 connects and fetches snapshot of node1,
> + * it also gets vclock {1: 3}. Then node1 writes
> + * something and its vclock becomes {1: 4}. Now node3
> + * boots from node1, and gets the same vclock. Vclocks
> + * now look like this:
> + *
> + * - node1: {1: 4}, leader, has {1: 3} snap.
> + * - node2: {1: 3}, booted from node1, has only snap.
> + * - node3: {1: 4}, booted from node1, has only snap.
> + *
> + * If the cluster is a fullmesh, node2 will send
> + * subscribe requests with vclock {1: 3}. If node3
> + * receives it, it will respond with xlog gap error,
> + * because it only has a snap with {1: 4}, nothing else.
> + * In that case node2 should retry connecting to node3,
> + * and in the meantime try to get newer changes from
> + * node1.
> + */
> + applier_log_error(applier, e);
> + applier_disconnect(applier, APPLIER_LOADING);
> + goto reconnect;
> } catch (FiberIsCancelled *e) {
> if (!diag_is_empty(&applier->diag)) {
> diag_move(&applier->diag, &fiber()->diag);
> diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result
> deleted file mode 100644
> index f50452858..000000000
> --- a/test/replication/force_recovery.result
> +++ /dev/null
> @@ -1,110 +0,0 @@
> -test_run = require('test_run').new()
> ----
> -...
> -fio = require('fio')
> ----
> -...
> ---
> --- Test that box.cfg.force_recovery is ignored by relay threads (gh-3910).
> ---
> -_ = box.schema.space.create('test')
> ----
> -...
> -_ = box.space.test:create_index('primary')
> ----
> -...
> -box.schema.user.grant('guest', 'replication')
> ----
> -...
> --- Deploy a replica.
> -test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
> ----
> -- true
> -...
> -test_run:cmd("start server test")
> ----
> -- true
> -...
> --- Stop the replica and wait for the relay thread to exit.
> -test_run:cmd("stop server test")
> ----
> -- true
> -...
> -test_run:wait_cond(function() return box.info.replication[2].downstream.status == 'stopped' end, 10)
> ----
> -- true
> -...
> --- Delete an xlog file that is needed by the replica.
> -box.snapshot()
> ----
> -- ok
> -...
> -xlog = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.signature))
> ----
> -...
> -box.space.test:replace{1}
> ----
> -- [1]
> -...
> -box.snapshot()
> ----
> -- ok
> -...
> -box.space.test:replace{2}
> ----
> -- [2]
> -...
> -fio.unlink(xlog)
> ----
> -- true
> -...
> --- Check that even though box.cfg.force_recovery is set,
> --- replication will still fail due to LSN gap.
> -box.cfg{force_recovery = true}
> ----
> -...
> -test_run:cmd("start server test")
> ----
> -- true
> -...
> -test_run:cmd("switch test")
> ----
> -- true
> -...
> -box.space.test:select()
> ----
> -- []
> -...
> -box.info.replication[1].upstream.status == 'stopped' or box.info
> ----
> -- true
> -...
> -test_run:cmd("switch default")
> ----
> -- true
> -...
> -box.cfg{force_recovery = false}
> ----
> -...
> --- Cleanup.
> -test_run:cmd("stop server test")
> ----
> -- true
> -...
> -test_run:cmd("cleanup server test")
> ----
> -- true
> -...
> -test_run:cmd("delete server test")
> ----
> -- true
> -...
> -test_run:cleanup_cluster()
> ----
> -...
> -box.schema.user.revoke('guest', 'replication')
> ----
> -...
> -box.space.test:drop()
> ----
> -...
> diff --git a/test/replication/force_recovery.test.lua b/test/replication/force_recovery.test.lua
> deleted file mode 100644
> index 54307814b..000000000
> --- a/test/replication/force_recovery.test.lua
> +++ /dev/null
> @@ -1,43 +0,0 @@
> -test_run = require('test_run').new()
> -fio = require('fio')
> -
> ---
> --- Test that box.cfg.force_recovery is ignored by relay threads (gh-3910).
> ---
> -_ = box.schema.space.create('test')
> -_ = box.space.test:create_index('primary')
> -box.schema.user.grant('guest', 'replication')
> -
> --- Deploy a replica.
> -test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
> -test_run:cmd("start server test")
> -
> --- Stop the replica and wait for the relay thread to exit.
> -test_run:cmd("stop server test")
> -test_run:wait_cond(function() return box.info.replication[2].downstream.status == 'stopped' end, 10)
> -
> --- Delete an xlog file that is needed by the replica.
> -box.snapshot()
> -xlog = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.signature))
> -box.space.test:replace{1}
> -box.snapshot()
> -box.space.test:replace{2}
> -fio.unlink(xlog)
> -
> --- Check that even though box.cfg.force_recovery is set,
> --- replication will still fail due to LSN gap.
> -box.cfg{force_recovery = true}
> -test_run:cmd("start server test")
> -test_run:cmd("switch test")
> -box.space.test:select()
> -box.info.replication[1].upstream.status == 'stopped' or box.info
> -test_run:cmd("switch default")
> -box.cfg{force_recovery = false}
> -
> --- Cleanup.
> -test_run:cmd("stop server test")
> -test_run:cmd("cleanup server test")
> -test_run:cmd("delete server test")
> -test_run:cleanup_cluster()
> -box.schema.user.revoke('guest', 'replication')
> -box.space.test:drop()
> diff --git a/test/replication/replica.lua b/test/replication/replica.lua
> index f3a6dfe58..ef0d6d63a 100644
> --- a/test/replication/replica.lua
> +++ b/test/replication/replica.lua
> @@ -1,6 +1,7 @@
> #!/usr/bin/env tarantool
>
> repl_include_self = arg[1] and arg[1] == 'true' or false
> +repl_quorum = arg[2] and tonumber(arg[2]) or nil
> repl_list = nil
>
> if repl_include_self then
> @@ -14,6 +15,7 @@ box.cfg({
> replication = repl_list,
> memtx_memory = 107374182,
> replication_timeout = 0.1,
> + replication_connect_quorum = repl_quorum,
> })
>
> require('console').listen(os.getenv('ADMIN'))
> diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
> index dd04ae297..7b64c3813 100644
> --- a/test/replication/replica_rejoin.result
> +++ b/test/replication/replica_rejoin.result
> @@ -213,7 +213,7 @@ test_run:wait_cond(function() return #fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.
> box.cfg{checkpoint_count = checkpoint_count}
> ---
> ...
> -test_run:cmd("start server replica with args='true'")
> +test_run:cmd("start server replica with args='true 0'")
> ---
> - true
> ...
> @@ -221,9 +221,9 @@ test_run:cmd("switch replica")
> ---
> - true
> ...
> -box.info.status -- orphan
> +test_run:wait_upstream(1, {message_re = 'Missing %.xlog file'})
> ---
> -- orphan
> +- true
> ...
> box.space.test:select()
> ---
> diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
> index 410ef44d7..dcd0557cb 100644
> --- a/test/replication/replica_rejoin.test.lua
> +++ b/test/replication/replica_rejoin.test.lua
> @@ -79,9 +79,9 @@ for i = 1, 3 do box.space.test:insert{i * 100} end
> fio = require('fio')
> test_run:wait_cond(function() return #fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) == 1 end) or fio.pathjoin(box.cfg.wal_dir, '*.xlog')
> box.cfg{checkpoint_count = checkpoint_count}
> -test_run:cmd("start server replica with args='true'")
> +test_run:cmd("start server replica with args='true 0'")
> test_run:cmd("switch replica")
> -box.info.status -- orphan
> +test_run:wait_upstream(1, {message_re = 'Missing %.xlog file'})
> box.space.test:select()
>
> --
> diff --git a/test/replication/show_error_on_disconnect.result b/test/replication/show_error_on_disconnect.result
> index 48003db06..19d3886e4 100644
> --- a/test/replication/show_error_on_disconnect.result
> +++ b/test/replication/show_error_on_disconnect.result
> @@ -72,7 +72,7 @@ box.space.test:select()
> other_id = box.info.id % 2 + 1
> ---
> ...
> -test_run:wait_upstream(other_id, {status = 'stopped', message_re = 'Missing'})
> +test_run:wait_upstream(other_id, {status = 'loading', message_re = 'Missing'})
> ---
> - true
> ...
> diff --git a/test/replication/show_error_on_disconnect.test.lua b/test/replication/show_error_on_disconnect.test.lua
> index 1b0ea4373..dce926a34 100644
> --- a/test/replication/show_error_on_disconnect.test.lua
> +++ b/test/replication/show_error_on_disconnect.test.lua
> @@ -29,7 +29,7 @@ test_run:cmd("switch master_quorum1")
> box.cfg{replication = repl}
> box.space.test:select()
> other_id = box.info.id % 2 + 1
> -test_run:wait_upstream(other_id, {status = 'stopped', message_re = 'Missing'})
> +test_run:wait_upstream(other_id, {status = 'loading', message_re = 'Missing'})
> test_run:cmd("switch master_quorum2")
> box.space.test:select()
> other_id = box.info.id % 2 + 1
> diff --git a/test/xlog/panic_on_wal_error.result b/test/xlog/panic_on_wal_error.result
> deleted file mode 100644
> index 22f14f912..000000000
> --- a/test/xlog/panic_on_wal_error.result
> +++ /dev/null
> @@ -1,171 +0,0 @@
> --- preparatory stuff
> -env = require('test_run')
> ----
> -...
> -test_run = env.new()
> ----
> -...
> -test_run:cmd("restart server default with cleanup=True")
> -box.schema.user.grant('guest', 'replication')
> ----
> -...
> -_ = box.schema.space.create('test')
> ----
> -...
> -_ = box.space.test:create_index('pk')
> ----
> -...
> ---
> --- reopen xlog
> ---
> -test_run:cmd("restart server default")
> -box.space.test ~= nil
> ----
> -- true
> -...
> --- insert some stuff
> ---
> -box.space.test:auto_increment{'before snapshot'}
> ----
> -- [1, 'before snapshot']
> -...
> ---
> --- this snapshot will go to the replica
> ---
> -box.snapshot()
> ----
> -- ok
> -...
> ---
> --- create a replica, let it catch up somewhat
> ---
> -test_run:cmd("create server replica with rpl_master=default, script='xlog/replica.lua'")
> ----
> -- true
> -...
> -test_run:cmd("start server replica")
> ----
> -- true
> -...
> -test_run:cmd("switch replica")
> ----
> -- true
> -...
> -box.space.test:select{}
> ----
> -- - [1, 'before snapshot']
> -...
> ---
> --- stop replica, restart the master, insert more stuff
> --- which will make it into an xlog only
> ---
> -test_run:cmd("switch default")
> ----
> -- true
> -...
> -test_run:cmd("stop server replica")
> ----
> -- true
> -...
> -test_run:cmd("restart server default")
> -box.space.test:auto_increment{'after snapshot'}
> ----
> -- [2, 'after snapshot']
> -...
> -box.space.test:auto_increment{'after snapshot - one more row'}
> ----
> -- [3, 'after snapshot - one more row']
> -...
> ---
> --- save snapshot and remove xlogs
> ---
> -box.snapshot()
> ----
> -- ok
> -...
> -fio = require('fio')
> ----
> -...
> -glob = fio.pathjoin(box.cfg.wal_dir, '*.xlog')
> ----
> -...
> -files = fio.glob(glob)
> ----
> -...
> -for _, file in pairs(files) do fio.unlink(file) end
> ----
> -...
> ---
> --- make sure the server has some xlogs, otherwise the
> --- replica doesn't discover the gap in the logs
> ---
> -box.space.test:auto_increment{'after snapshot and restart'}
> ----
> -- [4, 'after snapshot and restart']
> -...
> -box.space.test:auto_increment{'after snapshot and restart - one more row'}
> ----
> -- [5, 'after snapshot and restart - one more row']
> -...
> ---
> --- check that panic is true
> ---
> -box.cfg{force_recovery=false}
> ----
> -...
> -box.cfg.force_recovery
> ----
> -- false
> -...
> ---
> --- try to start the replica, ha-ha
> --- (replication should fail, some rows are missing)
> ---
> -test_run:cmd("start server replica")
> ----
> -- true
> -...
> -test_run:cmd("switch replica")
> ----
> -- true
> -...
> -fiber = require('fiber')
> ----
> -...
> -while box.info.replication[1].upstream.status ~= "stopped" do fiber.sleep(0.001) end
> ----
> -...
> -box.info.replication[1].upstream.status
> ----
> -- stopped
> -...
> -box.info.replication[1].upstream.message
> ----
> -- 'Missing .xlog file between LSN 6 {1: 6} and 8 {1: 8}'
> -...
> -box.space.test:select{}
> ----
> -- - [1, 'before snapshot']
> -...
> ---
> ---
> -test_run:cmd("switch default")
> ----
> -- true
> -...
> -test_run:cmd("stop server replica")
> ----
> -- true
> -...
> -test_run:cmd("cleanup server replica")
> ----
> -- true
> -...
> ---
> --- cleanup
> -box.space.test:drop()
> ----
> -...
> -box.schema.user.revoke('guest', 'replication')
> ----
> -...
> diff --git a/test/xlog/panic_on_wal_error.test.lua b/test/xlog/panic_on_wal_error.test.lua
> deleted file mode 100644
> index 2e95431c6..000000000
> --- a/test/xlog/panic_on_wal_error.test.lua
> +++ /dev/null
> @@ -1,75 +0,0 @@
> --- preparatory stuff
> -env = require('test_run')
> -test_run = env.new()
> -
> -test_run:cmd("restart server default with cleanup=True")
> -box.schema.user.grant('guest', 'replication')
> -_ = box.schema.space.create('test')
> -_ = box.space.test:create_index('pk')
> ---
> --- reopen xlog
> ---
> -test_run:cmd("restart server default")
> -box.space.test ~= nil
> --- insert some stuff
> ---
> -box.space.test:auto_increment{'before snapshot'}
> ---
> --- this snapshot will go to the replica
> ---
> -box.snapshot()
> ---
> --- create a replica, let it catch up somewhat
> ---
> -test_run:cmd("create server replica with rpl_master=default, script='xlog/replica.lua'")
> -test_run:cmd("start server replica")
> -test_run:cmd("switch replica")
> -box.space.test:select{}
> ---
> --- stop replica, restart the master, insert more stuff
> --- which will make it into an xlog only
> ---
> -test_run:cmd("switch default")
> -test_run:cmd("stop server replica")
> -test_run:cmd("restart server default")
> -box.space.test:auto_increment{'after snapshot'}
> -box.space.test:auto_increment{'after snapshot - one more row'}
> ---
> --- save snapshot and remove xlogs
> ---
> -box.snapshot()
> -fio = require('fio')
> -glob = fio.pathjoin(box.cfg.wal_dir, '*.xlog')
> -files = fio.glob(glob)
> -for _, file in pairs(files) do fio.unlink(file) end
> ---
> --- make sure the server has some xlogs, otherwise the
> --- replica doesn't discover the gap in the logs
> ---
> -box.space.test:auto_increment{'after snapshot and restart'}
> -box.space.test:auto_increment{'after snapshot and restart - one more row'}
> ---
> --- check that panic is true
> ---
> -box.cfg{force_recovery=false}
> -box.cfg.force_recovery
> ---
> --- try to start the replica, ha-ha
> --- (replication should fail, some rows are missing)
> ---
> -test_run:cmd("start server replica")
> -test_run:cmd("switch replica")
> -fiber = require('fiber')
> -while box.info.replication[1].upstream.status ~= "stopped" do fiber.sleep(0.001) end
> -box.info.replication[1].upstream.status
> -box.info.replication[1].upstream.message
> -box.space.test:select{}
> ---
> ---
> -test_run:cmd("switch default")
> -test_run:cmd("stop server replica")
> -test_run:cmd("cleanup server replica")
> ---
> --- cleanup
> -box.space.test:drop()
> -box.schema.user.revoke('guest', 'replication')
--
Serge Petrenko