[Tarantool-patches] [PATCH v2 2/4] replication: retry in case of XlogGapError

Serge Petrenko sergepetrenko at tarantool.org
Tue Sep 15 10:35:51 MSK 2020


15.09.2020 02:11, Vladislav Shpilevoy пишет:
> Previously XlogGapError was considered a critical error stopping
> the replication. That may be not so good as it looks.
>
> XlogGapError is a perfectly fine error, which should not kill the
> replication connection. It should be retried instead.
>
> Because here is an example, when the gap can be recovered on its
> own. Consider the case: node1 is a leader, it is booted with
> vclock {1: 3}. Node2 connects and fetches snapshot of node1, it
> also gets vclock {1: 3}. Then node1 writes something and its
> vclock becomes {1: 4}. Now node3 boots from node1, and gets the
> same vclock. Vclocks now look like this:
>
>    - node1: {1: 4}, leader, has {1: 3} snap.
>    - node2: {1: 3}, booted from node1, has only snap.
>    - node3: {1: 4}, booted from node1, has only snap.
>
> If the cluster is a fullmesh, node2 will send subscribe requests
> with vclock {1: 3}. If node3 receives it, it will respond with
> xlog gap error, because it only has a snap with {1: 4}, nothing
> else. In that case node2 should retry connecting to node3, and in
> the meantime try to get newer changes from node1.
>
> The example is totally valid. However it is unreachable now
> because master registers all replicas in _cluster before allowing
> them to make a join. So they all bootstrap from a snapshot
> containing all their IDs. This is a bug, because such
> auto-registration leads to registration of anonymous replicas, if
> they are present during bootstrap. Also it blocks Raft, which
> can't work if there are registered, but not yet joined nodes.
>
> Once the registration problem will be solved in a next commit, the
> XlogGapError will strike quite often during bootstrap. This patch
> won't allow that happen.
>
> Needed for #5287
> ---
>   src/box/applier.cc                            |  27 +++
>   test/replication/force_recovery.result        | 110 -----------
>   test/replication/force_recovery.test.lua      |  43 -----
>   test/replication/replica.lua                  |   2 +
>   test/replication/replica_rejoin.result        |   6 +-
>   test/replication/replica_rejoin.test.lua      |   4 +-
>   .../show_error_on_disconnect.result           |   2 +-
>   .../show_error_on_disconnect.test.lua         |   2 +-
>   test/xlog/panic_on_wal_error.result           | 171 ------------------
>   test/xlog/panic_on_wal_error.test.lua         |  75 --------
>   10 files changed, 36 insertions(+), 406 deletions(-)
>   delete mode 100644 test/replication/force_recovery.result
>   delete mode 100644 test/replication/force_recovery.test.lua
>   delete mode 100644 test/xlog/panic_on_wal_error.result
>   delete mode 100644 test/xlog/panic_on_wal_error.test.lua

Hi! Thanks for the patch!

I propose you rework the tests so that they expect upstream.status == 
'loading'
instead of deleting them altogether.


>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index c1d07ca54..96dd48c0d 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -106,6 +106,7 @@ applier_log_error(struct applier *applier, struct error *e)
>   	case ER_SYSTEM:
>   	case ER_UNKNOWN_REPLICA:
>   	case ER_PASSWORD_MISMATCH:
> +	case ER_XLOG_GAP:
>   		say_info("will retry every %.2lf second",
>   			 replication_reconnect_interval());
>   		break;
> @@ -1333,6 +1334,32 @@ applier_f(va_list ap)
>   				applier_disconnect(applier, APPLIER_STOPPED);
>   				return -1;
>   			}
> +		} catch (XlogGapError *e) {
> +			/*
> +			 * Xlog gap error can't be a critical error. Because it
> +			 * is totally normal during bootstrap. Consider the
> +			 * case: node1 is a leader, it is booted with vclock
> +			 * {1: 3}. Node2 connects and fetches snapshot of node1,
> +			 * it also gets vclock {1: 3}. Then node1 writes
> +			 * something and its vclock becomes {1: 4}. Now node3
> +			 * boots from node1, and gets the same vclock. Vclocks
> +			 * now look like this:
> +			 *
> +			 * - node1: {1: 4}, leader, has {1: 3} snap.
> +			 * - node2: {1: 3}, booted from node1, has only snap.
> +			 * - node3: {1: 4}, booted from node1, has only snap.
> +			 *
> +			 * If the cluster is a fullmesh, node2 will send
> +			 * subscribe requests with vclock {1: 3}. If node3
> +			 * receives it, it will respond with xlog gap error,
> +			 * because it only has a snap with {1: 4}, nothing else.
> +			 * In that case node2 should retry connecting to node3,
> +			 * and in the meantime try to get newer changes from
> +			 * node1.
> +			 */
> +			applier_log_error(applier, e);
> +			applier_disconnect(applier, APPLIER_LOADING);
> +			goto reconnect;
>   		} catch (FiberIsCancelled *e) {
>   			if (!diag_is_empty(&applier->diag)) {
>   				diag_move(&applier->diag, &fiber()->diag);
> diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result
> deleted file mode 100644
> index f50452858..000000000
> --- a/test/replication/force_recovery.result
> +++ /dev/null
> @@ -1,110 +0,0 @@
> -test_run = require('test_run').new()
> ----
> -...
> -fio = require('fio')
> ----
> -...
> ---
> --- Test that box.cfg.force_recovery is ignored by relay threads (gh-3910).
> ---
> -_ = box.schema.space.create('test')
> ----
> -...
> -_ = box.space.test:create_index('primary')
> ----
> -...
> -box.schema.user.grant('guest', 'replication')
> ----
> -...
> --- Deploy a replica.
> -test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
> ----
> -- true
> -...
> -test_run:cmd("start server test")
> ----
> -- true
> -...
> --- Stop the replica and wait for the relay thread to exit.
> -test_run:cmd("stop server test")
> ----
> -- true
> -...
> -test_run:wait_cond(function() return box.info.replication[2].downstream.status == 'stopped' end, 10)
> ----
> -- true
> -...
> --- Delete an xlog file that is needed by the replica.
> -box.snapshot()
> ----
> -- ok
> -...
> -xlog = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.signature))
> ----
> -...
> -box.space.test:replace{1}
> ----
> -- [1]
> -...
> -box.snapshot()
> ----
> -- ok
> -...
> -box.space.test:replace{2}
> ----
> -- [2]
> -...
> -fio.unlink(xlog)
> ----
> -- true
> -...
> --- Check that even though box.cfg.force_recovery is set,
> --- replication will still fail due to LSN gap.
> -box.cfg{force_recovery = true}
> ----
> -...
> -test_run:cmd("start server test")
> ----
> -- true
> -...
> -test_run:cmd("switch test")
> ----
> -- true
> -...
> -box.space.test:select()
> ----
> -- []
> -...
> -box.info.replication[1].upstream.status == 'stopped' or box.info
> ----
> -- true
> -...
> -test_run:cmd("switch default")
> ----
> -- true
> -...
> -box.cfg{force_recovery = false}
> ----
> -...
> --- Cleanup.
> -test_run:cmd("stop server test")
> ----
> -- true
> -...
> -test_run:cmd("cleanup server test")
> ----
> -- true
> -...
> -test_run:cmd("delete server test")
> ----
> -- true
> -...
> -test_run:cleanup_cluster()
> ----
> -...
> -box.schema.user.revoke('guest', 'replication')
> ----
> -...
> -box.space.test:drop()
> ----
> -...
> diff --git a/test/replication/force_recovery.test.lua b/test/replication/force_recovery.test.lua
> deleted file mode 100644
> index 54307814b..000000000
> --- a/test/replication/force_recovery.test.lua
> +++ /dev/null
> @@ -1,43 +0,0 @@
> -test_run = require('test_run').new()
> -fio = require('fio')
> -
> ---
> --- Test that box.cfg.force_recovery is ignored by relay threads (gh-3910).
> ---
> -_ = box.schema.space.create('test')
> -_ = box.space.test:create_index('primary')
> -box.schema.user.grant('guest', 'replication')
> -
> --- Deploy a replica.
> -test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
> -test_run:cmd("start server test")
> -
> --- Stop the replica and wait for the relay thread to exit.
> -test_run:cmd("stop server test")
> -test_run:wait_cond(function() return box.info.replication[2].downstream.status == 'stopped' end, 10)
> -
> --- Delete an xlog file that is needed by the replica.
> -box.snapshot()
> -xlog = fio.pathjoin(box.cfg.wal_dir, string.format('%020d.xlog', box.info.signature))
> -box.space.test:replace{1}
> -box.snapshot()
> -box.space.test:replace{2}
> -fio.unlink(xlog)
> -
> --- Check that even though box.cfg.force_recovery is set,
> --- replication will still fail due to LSN gap.
> -box.cfg{force_recovery = true}
> -test_run:cmd("start server test")
> -test_run:cmd("switch test")
> -box.space.test:select()
> -box.info.replication[1].upstream.status == 'stopped' or box.info
> -test_run:cmd("switch default")
> -box.cfg{force_recovery = false}
> -
> --- Cleanup.
> -test_run:cmd("stop server test")
> -test_run:cmd("cleanup server test")
> -test_run:cmd("delete server test")
> -test_run:cleanup_cluster()
> -box.schema.user.revoke('guest', 'replication')
> -box.space.test:drop()
> diff --git a/test/replication/replica.lua b/test/replication/replica.lua
> index f3a6dfe58..ef0d6d63a 100644
> --- a/test/replication/replica.lua
> +++ b/test/replication/replica.lua
> @@ -1,6 +1,7 @@
>   #!/usr/bin/env tarantool
>   
>   repl_include_self = arg[1] and arg[1] == 'true' or false
> +repl_quorum = arg[2] and tonumber(arg[2]) or nil
>   repl_list = nil
>   
>   if repl_include_self then
> @@ -14,6 +15,7 @@ box.cfg({
>       replication         = repl_list,
>       memtx_memory        = 107374182,
>       replication_timeout = 0.1,
> +    replication_connect_quorum = repl_quorum,
>   })
>   
>   require('console').listen(os.getenv('ADMIN'))
> diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
> index dd04ae297..7b64c3813 100644
> --- a/test/replication/replica_rejoin.result
> +++ b/test/replication/replica_rejoin.result
> @@ -213,7 +213,7 @@ test_run:wait_cond(function() return #fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.
>   box.cfg{checkpoint_count = checkpoint_count}
>   ---
>   ...
> -test_run:cmd("start server replica with args='true'")
> +test_run:cmd("start server replica with args='true 0'")
>   ---
>   - true
>   ...
> @@ -221,9 +221,9 @@ test_run:cmd("switch replica")
>   ---
>   - true
>   ...
> -box.info.status -- orphan
> +test_run:wait_upstream(1, {message_re = 'Missing %.xlog file'})
>   ---
> -- orphan
> +- true
>   ...
>   box.space.test:select()
>   ---
> diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
> index 410ef44d7..dcd0557cb 100644
> --- a/test/replication/replica_rejoin.test.lua
> +++ b/test/replication/replica_rejoin.test.lua
> @@ -79,9 +79,9 @@ for i = 1, 3 do box.space.test:insert{i * 100} end
>   fio = require('fio')
>   test_run:wait_cond(function() return #fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) == 1 end) or fio.pathjoin(box.cfg.wal_dir, '*.xlog')
>   box.cfg{checkpoint_count = checkpoint_count}
> -test_run:cmd("start server replica with args='true'")
> +test_run:cmd("start server replica with args='true 0'")
>   test_run:cmd("switch replica")
> -box.info.status -- orphan
> +test_run:wait_upstream(1, {message_re = 'Missing %.xlog file'})
>   box.space.test:select()
>   
>   --
> diff --git a/test/replication/show_error_on_disconnect.result b/test/replication/show_error_on_disconnect.result
> index 48003db06..19d3886e4 100644
> --- a/test/replication/show_error_on_disconnect.result
> +++ b/test/replication/show_error_on_disconnect.result
> @@ -72,7 +72,7 @@ box.space.test:select()
>   other_id = box.info.id % 2 + 1
>   ---
>   ...
> -test_run:wait_upstream(other_id, {status = 'stopped', message_re = 'Missing'})
> +test_run:wait_upstream(other_id, {status = 'loading', message_re = 'Missing'})
>   ---
>   - true
>   ...
> diff --git a/test/replication/show_error_on_disconnect.test.lua b/test/replication/show_error_on_disconnect.test.lua
> index 1b0ea4373..dce926a34 100644
> --- a/test/replication/show_error_on_disconnect.test.lua
> +++ b/test/replication/show_error_on_disconnect.test.lua
> @@ -29,7 +29,7 @@ test_run:cmd("switch master_quorum1")
>   box.cfg{replication = repl}
>   box.space.test:select()
>   other_id = box.info.id % 2 + 1
> -test_run:wait_upstream(other_id, {status = 'stopped', message_re = 'Missing'})
> +test_run:wait_upstream(other_id, {status = 'loading', message_re = 'Missing'})
>   test_run:cmd("switch master_quorum2")
>   box.space.test:select()
>   other_id = box.info.id % 2 + 1
> diff --git a/test/xlog/panic_on_wal_error.result b/test/xlog/panic_on_wal_error.result
> deleted file mode 100644
> index 22f14f912..000000000
> --- a/test/xlog/panic_on_wal_error.result
> +++ /dev/null
> @@ -1,171 +0,0 @@
> --- preparatory stuff
> -env = require('test_run')
> ----
> -...
> -test_run = env.new()
> ----
> -...
> -test_run:cmd("restart server default with cleanup=True")
> -box.schema.user.grant('guest', 'replication')
> ----
> -...
> -_ = box.schema.space.create('test')
> ----
> -...
> -_ = box.space.test:create_index('pk')
> ----
> -...
> ---
> --- reopen xlog
> ---
> -test_run:cmd("restart server default")
> -box.space.test ~= nil
> ----
> -- true
> -...
> --- insert some stuff
> ---
> -box.space.test:auto_increment{'before snapshot'}
> ----
> -- [1, 'before snapshot']
> -...
> ---
> --- this snapshot will go to the replica
> ---
> -box.snapshot()
> ----
> -- ok
> -...
> ---
> --- create a replica, let it catch up somewhat
> ---
> -test_run:cmd("create server replica with rpl_master=default, script='xlog/replica.lua'")
> ----
> -- true
> -...
> -test_run:cmd("start server replica")
> ----
> -- true
> -...
> -test_run:cmd("switch replica")
> ----
> -- true
> -...
> -box.space.test:select{}
> ----
> -- - [1, 'before snapshot']
> -...
> ---
> --- stop replica, restart the master, insert more stuff
> --- which will make it into an xlog only
> ---
> -test_run:cmd("switch default")
> ----
> -- true
> -...
> -test_run:cmd("stop server replica")
> ----
> -- true
> -...
> -test_run:cmd("restart server default")
> -box.space.test:auto_increment{'after snapshot'}
> ----
> -- [2, 'after snapshot']
> -...
> -box.space.test:auto_increment{'after snapshot - one more row'}
> ----
> -- [3, 'after snapshot - one more row']
> -...
> ---
> --- save snapshot and remove xlogs
> ---
> -box.snapshot()
> ----
> -- ok
> -...
> -fio = require('fio')
> ----
> -...
> -glob = fio.pathjoin(box.cfg.wal_dir, '*.xlog')
> ----
> -...
> -files = fio.glob(glob)
> ----
> -...
> -for _, file in pairs(files) do fio.unlink(file) end
> ----
> -...
> ---
> --- make sure the server has some xlogs, otherwise the
> --- replica doesn't discover the gap in the logs
> ---
> -box.space.test:auto_increment{'after snapshot and restart'}
> ----
> -- [4, 'after snapshot and restart']
> -...
> -box.space.test:auto_increment{'after snapshot and restart - one more row'}
> ----
> -- [5, 'after snapshot and restart - one more row']
> -...
> ---
> ---  check that panic is true
> ---
> -box.cfg{force_recovery=false}
> ----
> -...
> -box.cfg.force_recovery
> ----
> -- false
> -...
> ---
> --- try to start the replica, ha-ha
> --- (replication should fail, some rows are missing)
> ---
> -test_run:cmd("start server replica")
> ----
> -- true
> -...
> -test_run:cmd("switch replica")
> ----
> -- true
> -...
> -fiber = require('fiber')
> ----
> -...
> -while box.info.replication[1].upstream.status ~= "stopped" do fiber.sleep(0.001) end
> ----
> -...
> -box.info.replication[1].upstream.status
> ----
> -- stopped
> -...
> -box.info.replication[1].upstream.message
> ----
> -- 'Missing .xlog file between LSN 6 {1: 6} and 8 {1: 8}'
> -...
> -box.space.test:select{}
> ----
> -- - [1, 'before snapshot']
> -...
> ---
> ---
> -test_run:cmd("switch default")
> ----
> -- true
> -...
> -test_run:cmd("stop server replica")
> ----
> -- true
> -...
> -test_run:cmd("cleanup server replica")
> ----
> -- true
> -...
> ---
> --- cleanup
> -box.space.test:drop()
> ----
> -...
> -box.schema.user.revoke('guest', 'replication')
> ----
> -...
> diff --git a/test/xlog/panic_on_wal_error.test.lua b/test/xlog/panic_on_wal_error.test.lua
> deleted file mode 100644
> index 2e95431c6..000000000
> --- a/test/xlog/panic_on_wal_error.test.lua
> +++ /dev/null
> @@ -1,75 +0,0 @@
> --- preparatory stuff
> -env = require('test_run')
> -test_run = env.new()
> -
> -test_run:cmd("restart server default with cleanup=True")
> -box.schema.user.grant('guest', 'replication')
> -_ = box.schema.space.create('test')
> -_ = box.space.test:create_index('pk')
> ---
> --- reopen xlog
> ---
> -test_run:cmd("restart server default")
> -box.space.test ~= nil
> --- insert some stuff
> ---
> -box.space.test:auto_increment{'before snapshot'}
> ---
> --- this snapshot will go to the replica
> ---
> -box.snapshot()
> ---
> --- create a replica, let it catch up somewhat
> ---
> -test_run:cmd("create server replica with rpl_master=default, script='xlog/replica.lua'")
> -test_run:cmd("start server replica")
> -test_run:cmd("switch replica")
> -box.space.test:select{}
> ---
> --- stop replica, restart the master, insert more stuff
> --- which will make it into an xlog only
> ---
> -test_run:cmd("switch default")
> -test_run:cmd("stop server replica")
> -test_run:cmd("restart server default")
> -box.space.test:auto_increment{'after snapshot'}
> -box.space.test:auto_increment{'after snapshot - one more row'}
> ---
> --- save snapshot and remove xlogs
> ---
> -box.snapshot()
> -fio = require('fio')
> -glob = fio.pathjoin(box.cfg.wal_dir, '*.xlog')
> -files = fio.glob(glob)
> -for _, file in pairs(files) do fio.unlink(file) end
> ---
> --- make sure the server has some xlogs, otherwise the
> --- replica doesn't discover the gap in the logs
> ---
> -box.space.test:auto_increment{'after snapshot and restart'}
> -box.space.test:auto_increment{'after snapshot and restart - one more row'}
> ---
> ---  check that panic is true
> ---
> -box.cfg{force_recovery=false}
> -box.cfg.force_recovery
> ---
> --- try to start the replica, ha-ha
> --- (replication should fail, some rows are missing)
> ---
> -test_run:cmd("start server replica")
> -test_run:cmd("switch replica")
> -fiber = require('fiber')
> -while box.info.replication[1].upstream.status ~= "stopped" do fiber.sleep(0.001) end
> -box.info.replication[1].upstream.status
> -box.info.replication[1].upstream.message
> -box.space.test:select{}
> ---
> ---
> -test_run:cmd("switch default")
> -test_run:cmd("stop server replica")
> -test_run:cmd("cleanup server replica")
> ---
> --- cleanup
> -box.space.test:drop()
> -box.schema.user.revoke('guest', 'replication')

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list