[Tarantool-patches] [PATCH] replication: removing anonymous replicas from synchro quorum

Yan Shtunder ya.shtunder at gmail.com
Fri Sep 17 21:10:58 MSK 2021


Transactions have to be committed after they reach a quorum of "real"
cluster members. Therefore, anonymous replicas must not
participate in the quorum.

Closes #5418
---
Issue: https://github.com/tarantool/tarantool/issues/5418
Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-5418-qsync-with-anon-replicas

 src/box/relay.cc                              |   3 +-
 test/replication-luatest/gh_5418_test.lua     |  73 +++++++
 .../instance_files/master.lua                 |  18 ++
 .../instance_files/replica.lua                |  18 ++
 test/replication-luatest/suite.ini            |   3 +
 test/replication/qsync_with_anon.result       | 187 ++----------------
 test/replication/qsync_with_anon.test.lua     |  83 ++------
 7 files changed, 148 insertions(+), 237 deletions(-)
 create mode 100644 test/replication-luatest/gh_5418_test.lua
 create mode 100755 test/replication-luatest/instance_files/master.lua
 create mode 100755 test/replication-luatest/instance_files/replica.lua
 create mode 100644 test/replication-luatest/suite.ini

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 115037fc3..4d5f9a625 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -515,6 +515,7 @@ tx_status_update(struct cmsg *msg)
 	struct replication_ack ack;
 	ack.source = status->relay->replica->id;
 	ack.vclock = &status->vclock;
+	bool anon = status->relay->replica->anon;
 	/*
 	 * Let pending synchronous transactions know, which of
 	 * them were successfully sent to the replica. Acks are
@@ -522,7 +523,7 @@ tx_status_update(struct cmsg *msg)
 	 * the single master in 100% so far). Other instances wait
 	 * for master's CONFIRM message instead.
 	 */
-	if (txn_limbo.owner_id == instance_id) {
+	if (txn_limbo.owner_id == instance_id && !anon) {
 		txn_limbo_ack(&txn_limbo, ack.source,
 			      vclock_get(ack.vclock, instance_id));
 	}
diff --git a/test/replication-luatest/gh_5418_test.lua b/test/replication-luatest/gh_5418_test.lua
new file mode 100644
index 000000000..b20b4c577
--- /dev/null
+++ b/test/replication-luatest/gh_5418_test.lua
@@ -0,0 +1,73 @@
+local t = require('luatest')
+local log = require('log')
+
+
+local g = t.group()
+local fio = require('fio')
+
+local Server = t.Server
+
+g.before_all(function()
+    g.master = Server:new({
+        alias = 'master',
+        command = './test/replication-luatest/instance_files/master.lua',
+        workdir = fio.tempdir(),
+        http_port = 8081,
+        net_box_port = 13301,
+    })
+
+    g.replica = Server:new({
+        alias = 'replica',
+        command = './test/replication-luatest/instance_files/replica.lua',
+        workdir = fio.tempdir(),
+        http_port = 8082,
+        net_box_port = 13302,
+    })
+
+
+    g.master:start()
+    g.replica:start()
+
+    t.helpers.retrying({}, function() g.master:connect_net_box() end)
+    t.helpers.retrying({}, function() g.replica:connect_net_box() end)
+
+    log.info('Everything is started')
+end)
+
+
+g.after_all(function()
+    g.replica:stop()
+    g.master:stop()
+    fio.rmtree(g.master.workdir)
+    fio.rmtree(g.replica.workdir)
+end)
+
+
+-- Poll master until the first anon replica's downstream vclock[1] reaches lsn.
+local function wait_vclock()
+    local lsn = g.master:eval("return box.info.vclock[1]")
+    local to_lsn
+    while to_lsn == nil or to_lsn < lsn do
+        local _, tbl = g.master:eval("return next(box.info.replication_anon())")
+        local vclock = tbl and tbl.downstream and tbl.downstream.vclock
+        to_lsn = vclock and vclock[1] -- may be nil; every level is guarded
+        log.info("master lsn: %s; replica_anon lsn: %s",
+                 tostring(lsn), tostring(to_lsn))
+        if to_lsn == nil or to_lsn < lsn then require('fiber').sleep(0.001) end
+    end
+end
+
+
+g.test_qsync_with_anon = function()
+    g.master:eval("box.schema.space.create('sync', {is_sync = true})")
+    g.master:eval("box.space.sync:create_index('pk')")
+
+    t.assert_error_msg_content_equals("Quorum collection for a synchronous transaction is timed out",
+        function() g.master:eval("return box.space.sync:insert{1}") end)
+
+    -- Wait until everything is replicated from the master to the replica
+    wait_vclock()
+
+    t.assert_equals(g.master:eval("return box.space.sync:select()"), {})
+    t.assert_equals(g.replica:eval("return box.space.sync:select()"), {})
+end
diff --git a/test/replication-luatest/instance_files/master.lua b/test/replication-luatest/instance_files/master.lua
new file mode 100755
index 000000000..691ac696f
--- /dev/null
+++ b/test/replication-luatest/instance_files/master.lua
@@ -0,0 +1,18 @@
+#!/usr/bin/env tarantool
+
+local function instance_uri(instance_id)
+    return 'localhost:'..(13300 + instance_id)
+end
+
+box.cfg({
+    --log_level         = 7,
+    work_dir            = os.getenv('TARANTOOL_WORKDIR'),
+    listen              = os.getenv('TARANTOOL_LISTEN'),
+    replication         = {instance_uri(1)},
+    memtx_memory        = 107374182,
+    replication_synchro_quorum = 2,
+    replication_timeout = 0.1
+})
+
+box.schema.user.grant('guest', 'read, write, execute, create', 'universe', nil, {if_not_exists=true})
+require('log').warn("master is ready")
diff --git a/test/replication-luatest/instance_files/replica.lua b/test/replication-luatest/instance_files/replica.lua
new file mode 100755
index 000000000..e3410d0ac
--- /dev/null
+++ b/test/replication-luatest/instance_files/replica.lua
@@ -0,0 +1,18 @@
+#!/usr/bin/env tarantool
+
+local function instance_uri(instance_id)
+    return 'localhost:'..(13300 + instance_id)
+end
+
+box.cfg({
+    work_dir            = os.getenv('TARANTOOL_WORKDIR'),
+    listen              = os.getenv('TARANTOOL_LISTEN'),
+    replication         = {instance_uri(1), instance_uri(2)},
+    memtx_memory        = 107374182,
+    replication_timeout = 0.1,
+    replication_connect_timeout = 0.5,
+    read_only           = true,
+    replication_anon    = true
+})
+
+require('log').warn("replica is ready")
diff --git a/test/replication-luatest/suite.ini b/test/replication-luatest/suite.ini
new file mode 100644
index 000000000..ccd624099
--- /dev/null
+++ b/test/replication-luatest/suite.ini
@@ -0,0 +1,3 @@
+[default]
+core = luatest
+description = first try of using luatest
diff --git a/test/replication/qsync_with_anon.result b/test/replication/qsync_with_anon.result
index 6a2952a32..d847a77aa 100644
--- a/test/replication/qsync_with_anon.result
+++ b/test/replication/qsync_with_anon.result
@@ -1,195 +1,57 @@
 -- test-run result file version 2
-env = require('test_run')
- | ---
- | ...
-test_run = env.new()
- | ---
- | ...
-engine = test_run:get_cfg('engine')
- | ---
- | ...
-
-orig_synchro_quorum = box.cfg.replication_synchro_quorum
- | ---
- | ...
-orig_synchro_timeout = box.cfg.replication_synchro_timeout
+test_run = require('test_run').new()
  | ---
  | ...

 NUM_INSTANCES = 2
  | ---
  | ...
-BROKEN_QUORUM = NUM_INSTANCES + 1
- | ---
- | ...

 box.schema.user.grant('guest', 'replication')
  | ---
  | ...
-
--- Setup a cluster with anonymous replica.
-test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon1.lua"')
- | ---
- | - true
- | ...
-test_run:cmd('start server replica_anon')
- | ---
- | - true
- | ...
-test_run:cmd('switch replica_anon')
- | ---
- | - true
- | ...
-
--- [RFC, Asynchronous replication] successful transaction applied on async
--- replica.
--- Testcase setup.
-test_run:switch('default')
- | ---
- | - true
- | ...
-box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000}
- | ---
- | ...
-_ = box.schema.space.create('sync', {is_sync=true, engine=engine})
- | ---
- | ...
-_ = box.space.sync:create_index('pk')
- | ---
- | ...
--- Testcase body.
-test_run:switch('default')
- | ---
- | - true
- | ...
-box.space.sync:insert{1} -- success
- | ---
- | - [1]
- | ...
-box.space.sync:insert{2} -- success
- | ---
- | - [2]
- | ...
-box.space.sync:insert{3} -- success
- | ---
- | - [3]
- | ...
-test_run:cmd('switch replica_anon')
- | ---
- | - true
- | ...
-box.space.sync:select{} -- 1, 2, 3
- | ---
- | - - [1]
- |   - [2]
- |   - [3]
- | ...
--- Testcase cleanup.
-test_run:switch('default')
- | ---
- | - true
- | ...
-box.space.sync:drop()
- | ---
- | ...
-
--- [RFC, Asynchronous replication] failed transaction rolled back on async
--- replica.
--- Testcase setup.
-box.cfg{replication_synchro_quorum = NUM_INSTANCES, replication_synchro_timeout = 1000}
+box.cfg{replication_synchro_quorum=NUM_INSTANCES}
  | ---
  | ...
-_ = box.schema.space.create('sync', {is_sync=true, engine=engine})
+_ = box.schema.space.create('sync', {is_sync=true})
  | ---
  | ...
 _ = box.space.sync:create_index('pk')
  | ---
  | ...
--- Write something to flush the current master's state to replica.
-_ = box.space.sync:insert{1}
- | ---
- | ...
-_ = box.space.sync:delete{1}
- | ---
- | ...

-box.cfg{replication_synchro_quorum = BROKEN_QUORUM, replication_synchro_timeout = 1000}
- | ---
- | ...
-fiber = require('fiber')
- | ---
- | ...
-ok, err = nil
- | ---
- | ...
-f = fiber.create(function()                                                     \
-    ok, err = pcall(box.space.sync.insert, box.space.sync, {1})                 \
-end)
- | ---
- | ...

-test_run:cmd('switch replica_anon')
+-- Setup a cluster with anonymous replica
+test_run:cmd('create server replica_anon with rpl_master=default,\
+              script="replication/anon1.lua"')
  | ---
  | - true
  | ...
-test_run:wait_cond(function() return box.space.sync:count() == 1 end)
+test_run:cmd('start server replica_anon')
  | ---
  | - true
  | ...
-box.space.sync:select{}
- | ---
- | - - [1]
- | ...

-test_run:switch('default')
- | ---
- | - true
- | ...
-box.cfg{replication_synchro_timeout = 0.001}
- | ---
- | ...
-test_run:wait_cond(function() return f:status() == 'dead' end)
+
+-- Testcase
+box.space.sync:insert{1}    -- error
  | ---
- | - true
+ | - error: Quorum collection for a synchronous transaction is timed out
  | ...
-box.space.sync:select{}
+box.space.sync:insert{3}    -- error
  | ---
- | - []
+ | - error: Quorum collection for a synchronous transaction is timed out
  | ...
-
 test_run:cmd('switch replica_anon')
  | ---
  | - true
  | ...
-test_run:wait_cond(function() return box.space.sync:count() == 0 end)
- | ---
- | - true
- | ...
-box.space.sync:select{}
+box.space.sync:select{}     -- []
  | ---
  | - []
  | ...

-test_run:switch('default')
- | ---
- | - true
- | ...
-box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000}
- | ---
- | ...
-box.space.sync:insert{1} -- success
- | ---
- | - [1]
- | ...
-test_run:cmd('switch replica_anon')
- | ---
- | - true
- | ...
-box.space.sync:select{} -- 1
- | ---
- | - - [1]
- | ...
--- Testcase cleanup.
+-- Testcase cleanup
 test_run:switch('default')
  | ---
  | - true
@@ -198,12 +60,13 @@ box.space.sync:drop()
  | ---
  | ...

--- Teardown.
-test_run:switch('default')
+
+-- Teardown
+test_run:cmd('stop server replica_anon')
  | ---
  | - true
  | ...
-test_run:cmd('stop server replica_anon')
+test_run:cmd('cleanup server replica_anon')
  | ---
  | - true
  | ...
@@ -211,15 +74,3 @@ test_run:cmd('delete server replica_anon')
  | ---
  | - true
  | ...
-box.schema.user.revoke('guest', 'replication')
- | ---
- | ...
-box.cfg{                                                                        \
-    replication_synchro_quorum = orig_synchro_quorum,                           \
-    replication_synchro_timeout = orig_synchro_timeout,                         \
-}
- | ---
- | ...
-test_run:cleanup_cluster()
- | ---
- | ...
diff --git a/test/replication/qsync_with_anon.test.lua b/test/replication/qsync_with_anon.test.lua
index d7ecaa107..2d92f08aa 100644
--- a/test/replication/qsync_with_anon.test.lua
+++ b/test/replication/qsync_with_anon.test.lua
@@ -1,84 +1,31 @@
-env = require('test_run')
-test_run = env.new()
-engine = test_run:get_cfg('engine')
-
-orig_synchro_quorum = box.cfg.replication_synchro_quorum
-orig_synchro_timeout = box.cfg.replication_synchro_timeout
+test_run = require('test_run').new()

 NUM_INSTANCES = 2
-BROKEN_QUORUM = NUM_INSTANCES + 1

 box.schema.user.grant('guest', 'replication')
-
--- Setup a cluster with anonymous replica.
-test_run:cmd('create server replica_anon with rpl_master=default, script="replication/anon1.lua"')
-test_run:cmd('start server replica_anon')
-test_run:cmd('switch replica_anon')
-
--- [RFC, Asynchronous replication] successful transaction applied on async
--- replica.
--- Testcase setup.
-test_run:switch('default')
-box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000}
-_ = box.schema.space.create('sync', {is_sync=true, engine=engine})
+box.cfg{replication_synchro_quorum=NUM_INSTANCES}
+_ = box.schema.space.create('sync', {is_sync=true})
 _ = box.space.sync:create_index('pk')
--- Testcase body.
-test_run:switch('default')
-box.space.sync:insert{1} -- success
-box.space.sync:insert{2} -- success
-box.space.sync:insert{3} -- success
-test_run:cmd('switch replica_anon')
-box.space.sync:select{} -- 1, 2, 3
--- Testcase cleanup.
-test_run:switch('default')
-box.space.sync:drop()

--- [RFC, Asynchronous replication] failed transaction rolled back on async
--- replica.
--- Testcase setup.
-box.cfg{replication_synchro_quorum = NUM_INSTANCES, replication_synchro_timeout = 1000}
-_ = box.schema.space.create('sync', {is_sync=true, engine=engine})
-_ = box.space.sync:create_index('pk')
--- Write something to flush the current master's state to replica.
-_ = box.space.sync:insert{1}
-_ = box.space.sync:delete{1}
-
-box.cfg{replication_synchro_quorum = BROKEN_QUORUM, replication_synchro_timeout = 1000}
-fiber = require('fiber')
-ok, err = nil
-f = fiber.create(function()                                                     \
-    ok, err = pcall(box.space.sync.insert, box.space.sync, {1})                 \
-end)

-test_run:cmd('switch replica_anon')
-test_run:wait_cond(function() return box.space.sync:count() == 1 end)
-box.space.sync:select{}
+-- Setup a cluster with anonymous replica
+test_run:cmd('create server replica_anon with rpl_master=default,\
+              script="replication/anon1.lua"')
+test_run:cmd('start server replica_anon')

-test_run:switch('default')
-box.cfg{replication_synchro_timeout = 0.001}
-test_run:wait_cond(function() return f:status() == 'dead' end)
-box.space.sync:select{}

+-- Testcase
+box.space.sync:insert{1}    -- error
+box.space.sync:insert{3}    -- error
 test_run:cmd('switch replica_anon')
-test_run:wait_cond(function() return box.space.sync:count() == 0 end)
-box.space.sync:select{}
+box.space.sync:select{}     -- []

-test_run:switch('default')
-box.cfg{replication_synchro_quorum=NUM_INSTANCES, replication_synchro_timeout=1000}
-box.space.sync:insert{1} -- success
-test_run:cmd('switch replica_anon')
-box.space.sync:select{} -- 1
--- Testcase cleanup.
+-- Testcase cleanup
 test_run:switch('default')
 box.space.sync:drop()

--- Teardown.
-test_run:switch('default')
+
+-- Teardown
 test_run:cmd('stop server replica_anon')
+test_run:cmd('cleanup server replica_anon')
 test_run:cmd('delete server replica_anon')
-box.schema.user.revoke('guest', 'replication')
-box.cfg{                                                                        \
-    replication_synchro_quorum = orig_synchro_quorum,                           \
-    replication_synchro_timeout = orig_synchro_timeout,                         \
-}
-test_run:cleanup_cluster()
--
2.25.1



More information about the Tarantool-patches mailing list