[Tarantool-patches] [PATCH] replication: removing anonymous replicas from synchro quorum

Serge Petrenko sergepetrenko at tarantool.org
Mon Sep 27 09:20:20 MSK 2021



On 21.09.2021 13:00, Yan Shtunder wrote:
> Transactions have to be committed after they reach a quorum of "real"
> cluster members. Therefore, anonymous replicas must not
> participate in the quorum.
>
> Closes #5418


Hi! Thanks for the patch!


> ---
> Issue: https://github.com/tarantool/tarantool/issues/5418
> Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-5418-qsync-with-anon-replicas


Please rebase the branch on top of the current master.


>
>   src/box/relay.cc                              |   3 +-
>   test/replication-luatest/gh_5418_test.lua     |  81 ++++++++
>   .../instance_files/master.lua                 |  19 ++
>   .../instance_files/replica.lua                |  22 +++
>   test/replication-luatest/suite.ini            |   3 +
>   test/replication/qsync_with_anon.result       | 187 ++----------------
>   test/replication/qsync_with_anon.test.lua     |  83 ++------
>   7 files changed, 161 insertions(+), 237 deletions(-)
>   create mode 100644 test/replication-luatest/gh_5418_test.lua
>   create mode 100755 test/replication-luatest/instance_files/master.lua
>   create mode 100755 test/replication-luatest/instance_files/replica.lua
>   create mode 100644 test/replication-luatest/suite.ini
>
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 115037fc3..4d5f9a625 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -515,6 +515,7 @@ tx_status_update(struct cmsg *msg)
>   	struct replication_ack ack;
>   	ack.source = status->relay->replica->id;
>   	ack.vclock = &status->vclock;
> +	bool anon = status->relay->replica->anon;
>   	/*
>   	 * Let pending synchronous transactions know, which of
>   	 * them were successfully sent to the replica. Acks are
> @@ -522,7 +523,7 @@ tx_status_update(struct cmsg *msg)
>   	 * the single master in 100% so far). Other instances wait
>   	 * for master's CONFIRM message instead.
>   	 */
> -	if (txn_limbo.owner_id == instance_id) {
> +	if (txn_limbo.owner_id == instance_id && !anon) {
>   		txn_limbo_ack(&txn_limbo, ack.source,
>   			      vclock_get(ack.vclock, instance_id));
>   	}
> diff --git a/test/replication-luatest/gh_5418_test.lua b/test/replication-luatest/gh_5418_test.lua
> new file mode 100644
> index 000000000..858fe342f
> --- /dev/null
> +++ b/test/replication-luatest/gh_5418_test.lua
> @@ -0,0 +1,81 @@
> +local fio = require('fio')
> +local log = require('log')
> +local fiber = require('fiber')
> +
> +local t = require('luatest')
> +local g = t.group()
> +
> +local Server = t.Server
> +
> +g.before_all(function()
> +    g.master = Server:new({
> +        alias = 'master',
> +        command = './test/replication-luatest/instance_files/master.lua',


Can the command be shortened to `./master.lua`?


> +        workdir = fio.tempdir(),
> +        http_port = 8081,


Can you omit the http_port?
It's unused in the test.


> +        net_box_port = 13301,
> +    })
> +
> +    g.replica = Server:new({
> +        alias = 'replica',
> +        command = './test/replication-luatest/instance_files/replica.lua',
> +        workdir = fio.tempdir(),
> +        env = {TARANTOOL_MASTER = '13301'},
> +        http_port = 8082,
> +        net_box_port = 13302,
> +    })
> +
> +
> +    g.master:start()
> +    g.replica:start()
> +
> +    t.helpers.retrying({}, function() g.master:connect_net_box() end)
> +    t.helpers.retrying({}, function() g.replica:connect_net_box() end)
> +
> +    log.info('Everything is started')
> +end)
> +
> +g.after_all(function()
> +    g.replica:stop()
> +    g.master:stop()
> +    fio.rmtree(g.master.workdir)
> +    fio.rmtree(g.replica.workdir)
> +end)
> +
> +local function wait_vclock(timeout)
> +    local started_at = fiber.clock()
> +    local lsn = g.master:eval("return box.info.vclock[1]")
> +
> +    local _, tbl = g.master:eval("return next(box.info.replication_anon())")
> +    local to_lsn = tbl.downstream.vclock[1]
> +
> +    while to_lsn == nil or to_lsn < lsn do
> +        fiber.sleep(0.001)
> +
> +        if (fiber.clock() - started_at) > timeout then
> +            return false
> +        end
> +
> +        _, tbl = g.master:eval("return next(box.info.replication_anon())")
> +        to_lsn = tbl.downstream.vclock[1]
> +
> +        log.info(string.format("master lsn: %d; replica_anon lsn: %d",
> +            lsn, to_lsn))
> +    end
> +
> +    return true
> +end
> +
> +g.test_qsync_with_anon = function()
> +    g.master:eval("box.schema.space.create('sync', {is_sync = true})")
> +    g.master:eval("box.space.sync:create_index('pk')")
> +
> +    t.assert_error_msg_content_equals("Quorum collection for a synchronous transaction is timed out",
> +        function() g.master:eval("return box.space.sync:insert{1}") end)
> +
> +    -- Wait until everything is replicated from the master to the replica
> +    t.assert(wait_vclock(1))
> +
> +    t.assert_equals(g.master:eval("return box.space.sync:select()"), {})
> +    t.assert_equals(g.replica:eval("return box.space.sync:select()"), {})
> +end
> diff --git a/test/replication-luatest/instance_files/master.lua b/test/replication-luatest/instance_files/master.lua
> new file mode 100755
> index 000000000..f2af4b529
> --- /dev/null
> +++ b/test/replication-luatest/instance_files/master.lua
> @@ -0,0 +1,19 @@
> +#!/usr/bin/env tarantool
> +
> +local function instance_uri(instance)
> +    local port = os.getenv(instance)
> +    return 'localhost:' .. port
> +end
> +
> +box.cfg({
> +    --log_level         = 7,
> +    work_dir            = os.getenv('TARANTOOL_WORKDIR'),
> +    listen              = os.getenv('TARANTOOL_LISTEN'),
> +    replication         = {instance_uri('TARANTOOL_LISTEN')},
> +    memtx_memory        = 107374182,
> +    replication_synchro_quorum = 2,
> +    replication_timeout = 0.1
> +})
> +
> +box.schema.user.grant('guest', 'read, write, execute, create', 'universe', nil, {if_not_exists=true})
> +require('log').warn("master is ready")
> diff --git a/test/replication-luatest/instance_files/replica.lua b/test/replication-luatest/instance_files/replica.lua
> new file mode 100755
> index 000000000..ed830cde4
> --- /dev/null
> +++ b/test/replication-luatest/instance_files/replica.lua
> @@ -0,0 +1,22 @@
> +#!/usr/bin/env tarantool
> +
> +local function instance_uri(instance)
> +    local port = os.getenv(instance)
> +    return 'localhost:' .. port
> +end
> +
> +box.cfg({
> +    work_dir            = os.getenv('TARANTOOL_WORKDIR'),
> +    listen              = os.getenv('TARANTOOL_LISTEN'),
> +    replication         = {
> +        instance_uri('TARANTOOL_MASTER'),
> +        instance_uri('TARANTOOL_LISTEN')
> +    },
> +    memtx_memory        = 107374182,
> +    replication_timeout = 0.1,
> +    replication_connect_timeout = 0.5,
> +    read_only           = true,
> +    replication_anon    = true
> +})
> +
> +require('log').warn("replica is ready")
> diff --git a/test/replication-luatest/suite.ini b/test/replication-luatest/suite.ini
> new file mode 100644
> index 000000000..ccd624099
> --- /dev/null
> +++ b/test/replication-luatest/suite.ini
> @@ -0,0 +1,3 @@
> +[default]
> +core = luatest
> +description = first try of using luatest
> diff --git a/test/replication/qsync_with_anon.result b/test/replication/qsync_with_anon.result
> index 6a2952a32..d847a77aa 100644
> --- a/test/replication/qsync_with_anon.result
> +++ b/test/replication/qsync_with_anon.result


Since you've rewritten the test in luatest, you may remove the 
diff-based test altogether.


> @@ -1,195 +1,57 @@
>   -- test-run result file version 2
> -env = require('test_run')
> - | ---
> - | ...
> -test_run = env.new()
> - | ---
> - | ...
> -engine = test_run:get_cfg('engine')
> - | ---
> - | ...
> -
> -orig_synchro_quorum = box.cfg.replication_synchro_quorum
> - | ---
> - | ...
> -orig_synchro_timeout = box.cfg.replication_synchro_timeout
> +test_run = require('test_run').new()
>    | ---
>    | ...
>
>   NUM_INSTANCES = 2
>    | ---
>    | ...
> -BROKEN_QUORUM = NUM_INSTANCES + 1
> - | ---
> - | ...
>
>   box.schema.user.grant('guest', 'replication')
>    | ---
>    | ...
> -
> --- Setup a cluster with anonymous replica.
> -test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon1.lua"')
> - | ---
> - | - true
> - | ...
> -test_run:cmd('start server replica_anon')
> - | ---
> - | - true
> - | ...
> -test_run:cmd('switch replica_anon')
> - | ---
> - | - true
> - | ...
> -
> --- [RFC, Asynchronous replication] successful transaction applied on async
> --- replica.
> --- Testcase setup.
> -test_run:switch('default')
> - | ---
> - | - true
> - | ...
> -box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000}
> - | ---
> - | ...
> -_ = box.schema.space.create('sync', {is_sync=true, engine=engine})
> - | ---
> - | ...
> -_ = box.space.sync:create_index('pk')
> - | ---
> - | ...
> --- Testcase body.
> -test_run:switch('default')
> - | ---
> - | - true
> - | ...
> -box.space.sync:insert{1} -- success
> - | ---
> - | - [1]
> - | ...
> -box.space.sync:insert{2} -- success
> - | ---
> - | - [2]
> - | ...
> -box.space.sync:insert{3} -- success
> - | ---
> - | - [3]
> - | ...
> -test_run:cmd('switch replica_anon')
> - | ---
> - | - true
> - | ...
> -box.space.sync:select{} -- 1, 2, 3
> - | ---
> - | - - [1]
> - |   - [2]
> - |   - [3]
> - | ...
> --- Testcase cleanup.
> -test_run:switch('default')
> - | ---
> - | - true
> - | ...
> -box.space.sync:drop()
> - | ---
> - | ...
> -
> --- [RFC, Asynchronous replication] failed transaction rolled back on async
> --- replica.
> --- Testcase setup.
> -box.cfg{replication_synchro_quorum = NUM_INSTANCES, replication_synchro_timeout = 1000}
> +box.cfg{replication_synchro_quorum=NUM_INSTANCES}
>    | ---
>    | ...
> -_ = box.schema.space.create('sync', {is_sync=true, engine=engine})
> +_ = box.schema.space.create('sync', {is_sync=true})
>    | ---
>    | ...
>   _ = box.space.sync:create_index('pk')
>    | ---
>    | ...
> --- Write something to flush the current master's state to replica.
> -_ = box.space.sync:insert{1}
> - | ---
> - | ...
> -_ = box.space.sync:delete{1}
> - | ---
> - | ...
>
> -box.cfg{replication_synchro_quorum = BROKEN_QUORUM, replication_synchro_timeout = 1000}
> - | ---
> - | ...
> -fiber = require('fiber')
> - | ---
> - | ...
> -ok, err = nil
> - | ---
> - | ...
> -f = fiber.create(function()                                                     \
> -    ok, err = pcall(box.space.sync.insert, box.space.sync, {1})                 \
> -end)
> - | ---
> - | ...
>
> -test_run:cmd('switch replica_anon')
> +-- Setup a cluster with anonymous replica
> +test_run:cmd('create server replica_anon with rpl_master=default,\
> +              script="replication/anon1.lua"')
>    | ---
>    | - true
>    | ...
> -test_run:wait_cond(function() return box.space.sync:count() == 1 end)
> +test_run:cmd('start server replica_anon')
>    | ---
>    | - true
>    | ...
> -box.space.sync:select{}
> - | ---
> - | - - [1]
> - | ...
>
> -test_run:switch('default')
> - | ---
> - | - true
> - | ...
> -box.cfg{replication_synchro_timeout = 0.001}
> - | ---
> - | ...
> -test_run:wait_cond(function() return f:status() == 'dead' end)
> +
> +-- Testcase
> +box.space.sync:insert{1}    -- error
>    | ---
> - | - true
> + | - error: Quorum collection for a synchronous transaction is timed out
>    | ...
> -box.space.sync:select{}
> +box.space.sync:insert{3}    -- error
>    | ---
> - | - []
> + | - error: Quorum collection for a synchronous transaction is timed out
>    | ...
> -
>   test_run:cmd('switch replica_anon')
>    | ---
>    | - true
>    | ...
> -test_run:wait_cond(function() return box.space.sync:count() == 0 end)
> - | ---
> - | - true
> - | ...
> -box.space.sync:select{}
> +box.space.sync:select{}     -- []
>    | ---
>    | - []
>    | ...
>
> -test_run:switch('default')
> - | ---
> - | - true
> - | ...
> -box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000}
> - | ---
> - | ...
> -box.space.sync:insert{1} -- success
> - | ---
> - | - [1]
> - | ...
> -test_run:cmd('switch replica_anon')
> - | ---
> - | - true
> - | ...
> -box.space.sync:select{} -- 1
> - | ---
> - | - - [1]
> - | ...
> --- Testcase cleanup.
> +-- Testcase cleanup
>   test_run:switch('default')
>    | ---
>    | - true
> @@ -198,12 +60,13 @@ box.space.sync:drop()
>    | ---
>    | ...
>
> --- Teardown.
> -test_run:switch('default')
> +
> +-- Teardown
> +test_run:cmd('stop server replica_anon')
>    | ---
>    | - true
>    | ...
> -test_run:cmd('stop server replica_anon')
> +test_run:cmd('cleanup server replica_anon')
>    | ---
>    | - true
>    | ...
> @@ -211,15 +74,3 @@ test_run:cmd('delete server replica_anon')
>    | ---
>    | - true
>    | ...
> -box.schema.user.revoke('guest', 'replication')
> - | ---
> - | ...
> -box.cfg{                                                                        \
> -    replication_synchro_quorum = orig_synchro_quorum,                           \
> -    replication_synchro_timeout = orig_synchro_timeout,                         \
> -}
> - | ---
> - | ...
> -test_run:cleanup_cluster()
> - | ---
> - | ...
> diff --git a/test/replication/qsync_with_anon.test.lua b/test/replication/qsync_with_anon.test.lua
> index d7ecaa107..2d92f08aa 100644
> --- a/test/replication/qsync_with_anon.test.lua
> +++ b/test/replication/qsync_with_anon.test.lua
> @@ -1,84 +1,31 @@
> -env = require('test_run')
> -test_run = env.new()
> -engine = test_run:get_cfg('engine')
> -
> -orig_synchro_quorum = box.cfg.replication_synchro_quorum
> -orig_synchro_timeout = box.cfg.replication_synchro_timeout
> +test_run = require('test_run').new()
>
>   NUM_INSTANCES = 2
> -BROKEN_QUORUM = NUM_INSTANCES + 1
>
>   box.schema.user.grant('guest', 'replication')
> -
> --- Setup a cluster with anonymous replica.
> -test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon1.lua"')
> -test_run:cmd('start server replica_anon')
> -test_run:cmd('switch replica_anon')
> -
> --- [RFC, Asynchronous replication] successful transaction applied on async
> --- replica.
> --- Testcase setup.
> -test_run:switch('default')
> -box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000}
> -_ = box.schema.space.create('sync', {is_sync=true, engine=engine})
> +box.cfg{replication_synchro_quorum=NUM_INSTANCES}
> +_ = box.schema.space.create('sync', {is_sync=true})
>   _ = box.space.sync:create_index('pk')
> --- Testcase body.
> -test_run:switch('default')
> -box.space.sync:insert{1} -- success
> -box.space.sync:insert{2} -- success
> -box.space.sync:insert{3} -- success
> -test_run:cmd('switch replica_anon')
> -box.space.sync:select{} -- 1, 2, 3
> --- Testcase cleanup.
> -test_run:switch('default')
> -box.space.sync:drop()
>
> --- [RFC, Asynchronous replication] failed transaction rolled back on async
> --- replica.
> --- Testcase setup.
> -box.cfg{replication_synchro_quorum = NUM_INSTANCES, replication_synchro_timeout = 1000}
> -_ = box.schema.space.create('sync', {is_sync=true, engine=engine})
> -_ = box.space.sync:create_index('pk')
> --- Write something to flush the current master's state to replica.
> -_ = box.space.sync:insert{1}
> -_ = box.space.sync:delete{1}
> -
> -box.cfg{replication_synchro_quorum = BROKEN_QUORUM, replication_synchro_timeout = 1000}
> -fiber = require('fiber')
> -ok, err = nil
> -f = fiber.create(function()                                                     \
> -    ok, err = pcall(box.space.sync.insert, box.space.sync, {1})                 \
> -end)
>
> -test_run:cmd('switch replica_anon')
> -test_run:wait_cond(function() return box.space.sync:count() == 1 end)
> -box.space.sync:select{}
> +-- Setup a cluster with anonymous replica
> +test_run:cmd('create server replica_anon with rpl_master=default,\
> +              script="replication/anon1.lua"')
> +test_run:cmd('start server replica_anon')
>
> -test_run:switch('default')
> -box.cfg{replication_synchro_timeout = 0.001}
> -test_run:wait_cond(function() return f:status() == 'dead' end)
> -box.space.sync:select{}
>
> +-- Testcase
> +box.space.sync:insert{1}    -- error
> +box.space.sync:insert{3}    -- error
>   test_run:cmd('switch replica_anon')
> -test_run:wait_cond(function() return box.space.sync:count() == 0 end)
> -box.space.sync:select{}
> +box.space.sync:select{}     -- []
>
> -test_run:switch('default')
> -box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000}
> -box.space.sync:insert{1} -- success
> -test_run:cmd('switch replica_anon')
> -box.space.sync:select{} -- 1
> --- Testcase cleanup.
> +-- Testcase cleanup
>   test_run:switch('default')
>   box.space.sync:drop()
>
> --- Teardown.
> -test_run:switch('default')
> +
> +-- Teardown
>   test_run:cmd('stop server replica_anon')
> +test_run:cmd('cleanup server replica_anon')
>   test_run:cmd('delete server replica_anon')
> -box.schema.user.revoke('guest', 'replication')
> -box.cfg{                                                                        \
> -    replication_synchro_quorum = orig_synchro_quorum,                           \
> -    replication_synchro_timeout = orig_synchro_timeout,                         \
> -}
> -test_run:cleanup_cluster()
> --
> 2.25.1
>

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list