From: Yan Shtunder via Tarantool-patches <tarantool-patches@dev.tarantool.org> To: sergepetrenko@tarantool.org, shemeneval@gmail.com Cc: tarantool-patches@dev.tarantool.org, Yan Shtunder <ya.shtunder@gmail.com> Subject: [Tarantool-patches] [PATCH] replication: removing anonymous replicas from synchro quorum Date: Fri, 17 Sep 2021 21:10:58 +0300 [thread overview] Message-ID: <20210917181058.47687-1-ya.shtunder@gmail.com> (raw) Transactions have to committed after they reaches quorum of "real" cluster members. Therefore, anonymous replicas don't have to participate in the quorum. Closes #5418 --- Issue: https://github.com/tarantool/tarantool/issues/5418 Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-5418-qsync-with-anon-replicas src/box/relay.cc | 3 +- test/replication-luatest/gh_5418_test.lua | 73 +++++++ .../instance_files/master.lua | 18 ++ .../instance_files/replica.lua | 18 ++ test/replication-luatest/suite.ini | 3 + test/replication/qsync_with_anon.result | 187 ++---------------- test/replication/qsync_with_anon.test.lua | 83 ++------ 7 files changed, 148 insertions(+), 237 deletions(-) create mode 100644 test/replication-luatest/gh_5418_test.lua create mode 100755 test/replication-luatest/instance_files/master.lua create mode 100755 test/replication-luatest/instance_files/replica.lua create mode 100644 test/replication-luatest/suite.ini diff --git a/src/box/relay.cc b/src/box/relay.cc index 115037fc3..4d5f9a625 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -515,6 +515,7 @@ tx_status_update(struct cmsg *msg) struct replication_ack ack; ack.source = status->relay->replica->id; ack.vclock = &status->vclock; + bool anon = status->relay->replica->anon; /* * Let pending synchronous transactions know, which of * them were successfully sent to the replica. Acks are @@ -522,7 +523,7 @@ tx_status_update(struct cmsg *msg) * the single master in 100% so far). Other instances wait * for master's CONFIRM message instead. */ - if (txn_limbo.owner_id == instance_id) { + if (txn_limbo.owner_id == instance_id && !anon) { txn_limbo_ack(&txn_limbo, ack.source, vclock_get(ack.vclock, instance_id)); } diff --git a/test/replication-luatest/gh_5418_test.lua b/test/replication-luatest/gh_5418_test.lua new file mode 100644 index 000000000..b20b4c577 --- /dev/null +++ b/test/replication-luatest/gh_5418_test.lua @@ -0,0 +1,73 @@ +local t = require('luatest') +local log = require('log') + + +local g = t.group() +local fio = require('fio') + +local Server = t.Server + +g.before_all(function() + g.master = Server:new({ + alias = 'master', + command = './test/replication-luatest/instance_files/master.lua', + workdir = fio.tempdir(), + http_port = 8081, + net_box_port = 13301, + }) + + g.replica = Server:new({ + alias = 'replica', + command = './test/replication-luatest/instance_files/replica.lua', + workdir = fio.tempdir(), + http_port = 8082, + net_box_port = 13302, + }) + + + g.master:start() + g.replica:start() + + t.helpers.retrying({}, function() g.master:connect_net_box() end) + t.helpers.retrying({}, function() g.replica:connect_net_box() end) + + log.info('Everything is started') +end) + + +g.after_all(function() + g.replica:stop() + g.master:stop() + fio.rmtree(g.master.workdir) + fio.rmtree(g.replica.workdir) +end) + + +local function wait_vclock() + lsn = g.master:eval("return box.info.vclock[1]") + _, tbl = g.master:eval("return next(box.info.replication_anon())") + to_lsn = tbl.downstream.vclock[1] + while to_lsn == nil or to_lsn < lsn do + require('fiber').sleep(0.001) + _, tbl = g.master:eval("return next(box.info.replication_anon())") + to_lsn = tbl.downstream.vclock[1] + log.info(string.format("master lsn: %d; replica_anon lsn: %d", + lsn, to_lsn)) + end + return +end + + +g.test_qsync_with_anon = function() + g.master:eval("box.schema.space.create('sync', {is_sync = true})") + g.master:eval("box.space.sync:create_index('pk')") + + t.assert_error_msg_content_equals("Quorum collection for a synchronous transaction is timed out", + function() g.master:eval("return box.space.sync:insert{1}") end) + + -- Wait until everything is replicated from the master to the replica + wait_vclock() + + t.assert_equals(g.master:eval("return box.space.sync:select()"), {}) + t.assert_equals(g.replica:eval("return box.space.sync:select()"), {}) +end diff --git a/test/replication-luatest/instance_files/master.lua b/test/replication-luatest/instance_files/master.lua new file mode 100755 index 000000000..691ac696f --- /dev/null +++ b/test/replication-luatest/instance_files/master.lua @@ -0,0 +1,18 @@ +#!/usr/bin/env tarantool + +local function instance_uri(instance_id) + return 'localhost:'..(13300 + instance_id) +end + +box.cfg({ + --log_level = 7, + work_dir = os.getenv('TARANTOOL_WORKDIR'), + listen = os.getenv('TARANTOOL_LISTEN'), + replication = {instance_uri(1)}, + memtx_memory = 107374182, + replication_synchro_quorum = 2, + replication_timeout = 0.1 +}) + +box.schema.user.grant('guest', 'read, write, execute, create', 'universe', nil, {if_not_exists=true}) +require('log').warn("master is ready") diff --git a/test/replication-luatest/instance_files/replica.lua b/test/replication-luatest/instance_files/replica.lua new file mode 100755 index 000000000..e3410d0ac --- /dev/null +++ b/test/replication-luatest/instance_files/replica.lua @@ -0,0 +1,18 @@ +#!/usr/bin/env tarantool + +local function instance_uri(instance_id) + return 'localhost:'..(13300 + instance_id) +end + +box.cfg({ + work_dir = os.getenv('TARANTOOL_WORKDIR'), + listen = os.getenv('TARANTOOL_LISTEN'), + replication = {instance_uri(1), instance_uri(2)}, + memtx_memory = 107374182, + replication_timeout = 0.1, + replication_connect_timeout = 0.5, + read_only = true, + replication_anon = true +}) + +require('log').warn("replica is ready") diff --git a/test/replication-luatest/suite.ini b/test/replication-luatest/suite.ini new file mode 100644 index 000000000..ccd624099 --- /dev/null +++ b/test/replication-luatest/suite.ini @@ -0,0 +1,3 @@ +[default] +core = luatest +description = first try of using luatest diff --git a/test/replication/qsync_with_anon.result b/test/replication/qsync_with_anon.result index 6a2952a32..d847a77aa 100644 --- a/test/replication/qsync_with_anon.result +++ b/test/replication/qsync_with_anon.result @@ -1,195 +1,57 @@ -- test-run result file version 2 -env = require('test_run') - | --- - | ... -test_run = env.new() - | --- - | ... -engine = test_run:get_cfg('engine') - | --- - | ... - -orig_synchro_quorum = box.cfg.replication_synchro_quorum - | --- - | ... -orig_synchro_timeout = box.cfg.replication_synchro_timeout +test_run = require('test_run').new() | --- | ... NUM_INSTANCES = 2 | --- | ... -BROKEN_QUORUM = NUM_INSTANCES + 1 - | --- - | ... box.schema.user.grant('guest', 'replication') | --- | ... - --- Setup a cluster with anonymous replica. -test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon1.lua"') - | --- - | - true - | ... -test_run:cmd('start server replica_anon') - | --- - | - true - | ... -test_run:cmd('switch replica_anon') - | --- - | - true - | ... - --- [RFC, Asynchronous replication] successful transaction applied on async --- replica. --- Testcase setup. -test_run:switch('default') - | --- - | - true - | ... -box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000} - | --- - | ... -_ = box.schema.space.create('sync', {is_sync=true, engine=engine}) - | --- - | ... -_ = box.space.sync:create_index('pk') - | --- - | ... --- Testcase body. -test_run:switch('default') - | --- - | - true - | ... -box.space.sync:insert{1} -- success - | --- - | - [1] - | ... -box.space.sync:insert{2} -- success - | --- - | - [2] - | ... -box.space.sync:insert{3} -- success - | --- - | - [3] - | ... -test_run:cmd('switch replica_anon') - | --- - | - true - | ... -box.space.sync:select{} -- 1, 2, 3 - | --- - | - - [1] - | - [2] - | - [3] - | ... --- Testcase cleanup. -test_run:switch('default') - | --- - | - true - | ... -box.space.sync:drop() - | --- - | ... - --- [RFC, Asynchronous replication] failed transaction rolled back on async --- replica. --- Testcase setup. -box.cfg{replication_synchro_quorum = NUM_INSTANCES, replication_synchro_timeout = 1000} +box.cfg{replication_synchro_quorum=NUM_INSTANCES} | --- | ... -_ = box.schema.space.create('sync', {is_sync=true, engine=engine}) +_ = box.schema.space.create('sync', {is_sync=true}) | --- | ... _ = box.space.sync:create_index('pk') | --- | ... --- Write something to flush the current master's state to replica. -_ = box.space.sync:insert{1} - | --- - | ... -_ = box.space.sync:delete{1} - | --- - | ... -box.cfg{replication_synchro_quorum = BROKEN_QUORUM, replication_synchro_timeout = 1000} - | --- - | ... -fiber = require('fiber') - | --- - | ... -ok, err = nil - | --- - | ... -f = fiber.create(function() \ - ok, err = pcall(box.space.sync.insert, box.space.sync, {1}) \ -end) - | --- - | ... -test_run:cmd('switch replica_anon') +-- Setup a cluster with anonymous replica +test_run:cmd('create server replica_anon with rpl_master=default,\ + script="replication/anon1.lua"') | --- | - true | ... -test_run:wait_cond(function() return box.space.sync:count() == 1 end) +test_run:cmd('start server replica_anon') | --- | - true | ... -box.space.sync:select{} - | --- - | - - [1] - | ... -test_run:switch('default') - | --- - | - true - | ... -box.cfg{replication_synchro_timeout = 0.001} - | --- - | ... -test_run:wait_cond(function() return f:status() == 'dead' end) + +-- Testcase +box.space.sync:insert{1} -- error | --- - | - true + | - error: Quorum collection for a synchronous transaction is timed out | ... -box.space.sync:select{} +box.space.sync:insert{3} -- error | --- - | - [] + | - error: Quorum collection for a synchronous transaction is timed out | ... - test_run:cmd('switch replica_anon') | --- | - true | ... -test_run:wait_cond(function() return box.space.sync:count() == 0 end) - | --- - | - true - | ... -box.space.sync:select{} +box.space.sync:select{} -- [] | --- | - [] | ... -test_run:switch('default') - | --- - | - true - | ... -box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000} - | --- - | ... -box.space.sync:insert{1} -- success - | --- - | - [1] - | ... -test_run:cmd('switch replica_anon') - | --- - | - true - | ... -box.space.sync:select{} -- 1 - | --- - | - - [1] - | ... --- Testcase cleanup. +-- Testcase cleanup test_run:switch('default') | --- | - true @@ -198,12 +60,13 @@ box.space.sync:drop() | --- | ... --- Teardown. -test_run:switch('default') + +-- Teardown +test_run:cmd('stop server replica_anon') | --- | - true | ... -test_run:cmd('stop server replica_anon') +test_run:cmd('cleanup server replica_anon') | --- | - true | ... @@ -211,15 +74,3 @@ test_run:cmd('delete server replica_anon') | --- | - true | ... -box.schema.user.revoke('guest', 'replication') - | --- - | ... -box.cfg{ \ - replication_synchro_quorum = orig_synchro_quorum, \ - replication_synchro_timeout = orig_synchro_timeout, \ -} - | --- - | ... -test_run:cleanup_cluster() - | --- - | ... diff --git a/test/replication/qsync_with_anon.test.lua b/test/replication/qsync_with_anon.test.lua index d7ecaa107..2d92f08aa 100644 --- a/test/replication/qsync_with_anon.test.lua +++ b/test/replication/qsync_with_anon.test.lua @@ -1,84 +1,31 @@ -env = require('test_run') -test_run = env.new() -engine = test_run:get_cfg('engine') - -orig_synchro_quorum = box.cfg.replication_synchro_quorum -orig_synchro_timeout = box.cfg.replication_synchro_timeout +test_run = require('test_run').new() NUM_INSTANCES = 2 -BROKEN_QUORUM = NUM_INSTANCES + 1 box.schema.user.grant('guest', 'replication') - --- Setup a cluster with anonymous replica. -test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon1.lua"') -test_run:cmd('start server replica_anon') -test_run:cmd('switch replica_anon') - --- [RFC, Asynchronous replication] successful transaction applied on async --- replica. --- Testcase setup. -test_run:switch('default') -box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000} -_ = box.schema.space.create('sync', {is_sync=true, engine=engine}) +box.cfg{replication_synchro_quorum=NUM_INSTANCES} +_ = box.schema.space.create('sync', {is_sync=true}) _ = box.space.sync:create_index('pk') --- Testcase body. -test_run:switch('default') -box.space.sync:insert{1} -- success -box.space.sync:insert{2} -- success -box.space.sync:insert{3} -- success -test_run:cmd('switch replica_anon') -box.space.sync:select{} -- 1, 2, 3 --- Testcase cleanup. -test_run:switch('default') -box.space.sync:drop() --- [RFC, Asynchronous replication] failed transaction rolled back on async --- replica. --- Testcase setup. -box.cfg{replication_synchro_quorum = NUM_INSTANCES, replication_synchro_timeout = 1000} -_ = box.schema.space.create('sync', {is_sync=true, engine=engine}) -_ = box.space.sync:create_index('pk') --- Write something to flush the current master's state to replica. -_ = box.space.sync:insert{1} -_ = box.space.sync:delete{1} - -box.cfg{replication_synchro_quorum = BROKEN_QUORUM, replication_synchro_timeout = 1000} -fiber = require('fiber') -ok, err = nil -f = fiber.create(function() \ - ok, err = pcall(box.space.sync.insert, box.space.sync, {1}) \ -end) -test_run:cmd('switch replica_anon') -test_run:wait_cond(function() return box.space.sync:count() == 1 end) -box.space.sync:select{} +-- Setup a cluster with anonymous replica +test_run:cmd('create server replica_anon with rpl_master=default,\ + script="replication/anon1.lua"') +test_run:cmd('start server replica_anon') -test_run:switch('default') -box.cfg{replication_synchro_timeout = 0.001} -test_run:wait_cond(function() return f:status() == 'dead' end) -box.space.sync:select{} +-- Testcase +box.space.sync:insert{1} -- error +box.space.sync:insert{3} -- error test_run:cmd('switch replica_anon') -test_run:wait_cond(function() return box.space.sync:count() == 0 end) -box.space.sync:select{} +box.space.sync:select{} -- [] -test_run:switch('default') -box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000} -box.space.sync:insert{1} -- success -test_run:cmd('switch replica_anon') -box.space.sync:select{} -- 1 --- Testcase cleanup. +-- Testcase cleanup test_run:switch('default') box.space.sync:drop() --- Teardown. -test_run:switch('default') + +-- Teardown test_run:cmd('stop server replica_anon') +test_run:cmd('cleanup server replica_anon') test_run:cmd('delete server replica_anon') -box.schema.user.revoke('guest', 'replication') -box.cfg{ \ - replication_synchro_quorum = orig_synchro_quorum, \ - replication_synchro_timeout = orig_synchro_timeout, \ -} -test_run:cleanup_cluster() -- 2.25.1
next reply other threads:[~2021-09-17 18:11 UTC|newest] Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top 2021-09-17 18:10 Yan Shtunder via Tarantool-patches [this message] -- strict thread matches above, loose matches on Subject: below -- 2021-09-21 10:00 Yan Shtunder via Tarantool-patches 2021-09-27 6:20 ` Serge Petrenko via Tarantool-patches 2021-09-20 19:59 Yan Shtunder via Tarantool-patches 2021-09-06 11:49 Yan Shtunder via Tarantool-patches
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20210917181058.47687-1-ya.shtunder@gmail.com \ --to=tarantool-patches@dev.tarantool.org \ --cc=sergepetrenko@tarantool.org \ --cc=shemeneval@gmail.com \ --cc=ya.shtunder@gmail.com \ --subject='Re: [Tarantool-patches] [PATCH] replication: removing anonymous replicas from synchro quorum' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox