[tarantool-patches] [replication 1/1] replication: Add script to prune dead replicas
From: Ilya Markov @ 2018-06-04 18:26 UTC
To: georgy; +Cc: tarantool-patches
Introduce the box.replication.get_alive_replicas and
box.replication.prune_dead_replicas methods.
The first method returns a table of the replicas that were seen alive
within the specified time period.
The second method deletes from box.space._cluster all replicas that are
not present in the given table of alive replicas.
A replica is considered alive within the period if its upstream.status
or downstream.status is not equal to "stopped" or "disconnected", or if
it was added during the specified period.
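
A minimal usage sketch (the 2-second liveness window is an illustrative
value, not part of the patch):

    -- Collect replicas observed alive within a 2-second window.
    local alive = box.replication.get_alive_replicas(2)
    -- Drop every _cluster record whose uuid is not in the alive table.
    box.replication.prune_dead_replicas(alive)
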
Add a test-run filter to some tests in the engine suite.
Closes #3110
---
branch: gh-3110-prune-dead-replicas
src/box/lua/schema.lua | 60 +++++
test/box/misc.result | 1 +
test/engine/iterator.result | 6 +-
test/engine/iterator.test.lua | 2 +-
test/engine/savepoint.result | 16 +-
test/engine/savepoint.test.lua | 1 +
test/replication/prune_dead_replicas.result | 329 ++++++++++++++++++++++++++
test/replication/prune_dead_replicas.test.lua | 102 ++++++++
test/replication/suite.cfg | 1 +
9 files changed, 510 insertions(+), 8 deletions(-)
create mode 100644 test/replication/prune_dead_replicas.result
create mode 100644 test/replication/prune_dead_replicas.test.lua
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 0b7849f..9662036 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -7,6 +7,7 @@ local fun = require('fun')
local log = require('log')
local fio = require('fio')
local json = require('json')
+local fiber = require('fiber')
local session = box.session
local internal = require('box.internal')
local function setmap(table)
@@ -2308,4 +2309,63 @@ box.feedback.save = function(file_name)
fh:close()
end
+box.replication = {}
+
+local function replica_is_alive(replica_info)
+ if replica_info ~= nil and replica_info.uuid == box.info.uuid then
+ -- current replica is assumed to be alive.
+ return true
+ end
+ if replica_info == nil or
+ (replica_info.downstream == nil and replica_info.upstream == nil) then
+ return false
+ end
+ if replica_info.downstream ~= nil then
+ return replica_info.downstream.status ~= "disconnected" and
+ replica_info.downstream.status ~= "stopped"
+ else
+ -- upstream ~= nil
+ return replica_info.upstream.status ~= "disconnected" and
+ replica_info.upstream.status ~= "stopped"
+ end
+end
+
+box.replication.get_alive_replicas = function(timeout)
+ if timeout ~= nil then
+ if type(timeout) ~= 'number' then
+ error('Usage: box.replication.get_alive_replicas([timeout])')
+ end
+ end
+ local res = {}
+ local info_old = box.info.replication
+    local info_new
+ timeout = timeout or 0.5
+ fiber.sleep(timeout)
+ info_new = box.info.replication
+ for i, new_value in pairs(info_new) do
+ local old_value = info_old[i]
+ if old_value == nil or old_value.uuid ~= new_value.uuid then
+            -- The replica was added during the waiting period, so there
+            -- is no previous status to compare with; assume it is alive.
+            -- The UUIDs differ only if the old replica was deleted and a
+            -- new one was added or re-bootstrapped; assume it alive too.
+            res[new_value.uuid] = i
+        elseif replica_is_alive(old_value) or replica_is_alive(new_value) then
+            res[new_value.uuid] = i
+ end
+ end
+ return res
+end
+
+box.replication.prune_dead_replicas = function(alive_replicas)
+ if type(alive_replicas) ~= 'table' then
+ error("Usage: box.replication.prune_dead_replicas(alive_replicas)")
+ end
+ for _, tuple in box.space._cluster:pairs() do
+ if alive_replicas[tuple[2]] == nil then
+ box.space._cluster.index.uuid:delete{tuple[2]}
+ end
+ end
+end
+
box.NULL = msgpack.NULL
diff --git a/test/box/misc.result b/test/box/misc.result
index 8f94f55..52e0044 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -68,6 +68,7 @@ t
- info
- internal
- once
+ - replication
- rollback
- rollback_to_savepoint
- runtime
diff --git a/test/engine/iterator.result b/test/engine/iterator.result
index 423ed0b..47e86ed 100644
--- a/test/engine/iterator.result
+++ b/test/engine/iterator.result
@@ -3812,6 +3812,10 @@ inspector:cmd("clear filter")
---
- true
...
+inspector:cmd("push filter '(error: .builtin/.*[.]lua):[0-9]+' to '\\1'")
+---
+- true
+...
--
-- gh-1875 Add support for index:pairs(key, iterator-type) syntax
--
@@ -4211,7 +4215,7 @@ s:replace{35}
...
state, value = gen(param,state)
---
-- error: 'builtin/box/schema.lua:985: usage: next(param, state)'
+- error: 'builtin/box/schema.lua: usage: next(param, state)'
...
value
---
diff --git a/test/engine/iterator.test.lua b/test/engine/iterator.test.lua
index 905173a..c518010 100644
--- a/test/engine/iterator.test.lua
+++ b/test/engine/iterator.test.lua
@@ -257,7 +257,7 @@ space:drop()
inspector:cmd("clear filter")
-
+inspector:cmd("push filter '(error: .builtin/.*[.]lua):[0-9]+' to '\\1'")
--
-- gh-1875 Add support for index:pairs(key, iterator-type) syntax
--
diff --git a/test/engine/savepoint.result b/test/engine/savepoint.result
index d440efa..76a0aed 100644
--- a/test/engine/savepoint.result
+++ b/test/engine/savepoint.result
@@ -4,6 +4,10 @@ env = require('test_run')
test_run = env.new()
---
...
+test_run:cmd("push filter '(.builtin/.*[.]lua):[0-9]+' to '\\1'")
+---
+- true
+...
-- gh-2025 box.savepoint
s1 = nil
---
@@ -14,7 +18,7 @@ s1 = box.savepoint()
...
box.rollback_to_savepoint(s1)
---
-- error: 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- error: 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
...
box.begin() s1 = box.savepoint()
---
@@ -323,27 +327,27 @@ test_run:cmd("setopt delimiter ''");
ok1, errmsg1
---
- false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
...
ok2, errmsg2
---
- false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
...
ok3, errmsg3
---
- false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
...
ok4, errmsg4
---
- false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
...
ok5, errmsg5
---
- false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
...
s:select{}
---
diff --git a/test/engine/savepoint.test.lua b/test/engine/savepoint.test.lua
index de8f297..28a88d0 100644
--- a/test/engine/savepoint.test.lua
+++ b/test/engine/savepoint.test.lua
@@ -1,6 +1,7 @@
env = require('test_run')
test_run = env.new()
+test_run:cmd("push filter '(.builtin/.*[.]lua):[0-9]+' to '\\1'")
-- gh-2025 box.savepoint
s1 = nil
diff --git a/test/replication/prune_dead_replicas.result b/test/replication/prune_dead_replicas.result
new file mode 100644
index 0000000..831ac88
--- /dev/null
+++ b/test/replication/prune_dead_replicas.result
@@ -0,0 +1,329 @@
+test_run = require('test_run').new()
+---
+...
+replica_set = require('fast_replica')
+---
+...
+fiber = require('fiber')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd("push filter 'lsn: [0-9]+' to 'lsn: <lsn>'")
+---
+- true
+...
+test_run:cmd("push filter 'uuid: .*' to 'uuid: <uuid>'")
+---
+- true
+...
+test_run:cmd("push filter 'idle: .*' to 'idle: <idle>'")
+---
+- true
+...
+test_run:cmd("push filter 'peer: .*' to 'peer: <peer>'")
+---
+- true
+...
+test_run:cmd("push filter 'lag: .*' to 'lag: <lag>'")
+---
+- true
+...
+test_run:cmd("push filter 'vclock: .*' to 'vclock: <vclock>'")
+---
+- true
+...
+test_run:cmd("push filter '(error: .builtin/.*[.]lua):[0-9]+' to '\\1'")
+---
+- true
+...
+box.replication.get_alive_replicas("name")
+---
+- error: 'builtin/box/schema.lua: Usage: box.replication.get_alive_replicas([timeout])'
+...
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 1
+---
+- true
+...
+replica_set.join(test_run, 2)
+---
+...
+while box.space._cluster:len() ~= 3 do fiber.sleep(0.001) end
+---
+...
+box.space._cluster:len() == 3
+---
+- true
+...
+box.info.replication
+---
+- 1:
+ id: 1
+ uuid: <uuid>
+ lsn: <lsn>
+ 2:
+ id: 2
+ uuid: <uuid>
+ lsn: <lsn>
+ downstream:
+ vclock: <vclock>
+ 3:
+ id: 3
+ uuid: <uuid>
+ lsn: <lsn>
+ downstream:
+ vclock: <vclock>
+...
+test_run:cmd("switch replica1")
+---
+- true
+...
+box.space._cluster:len() == 3
+---
+- true
+...
+box.info.replication
+---
+- 1:
+ id: 1
+ uuid: <uuid>
+ lsn: <lsn>
+ upstream:
+ status: follow
+ idle: <idle>
+ peer: <peer>
+ lag: <lag>
+ 2:
+ id: 2
+ uuid: <uuid>
+ lsn: <lsn>
+ 3:
+ id: 3
+ uuid: <uuid>
+ lsn: <lsn>
+...
+test_run:cmd("switch default")
+---
+- true
+...
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+-- nothing is deleted
+box.space._cluster:len() == 3
+---
+- true
+...
+box.info.replication
+---
+- 1:
+ id: 1
+ uuid: <uuid>
+ lsn: <lsn>
+ 2:
+ id: 2
+ uuid: <uuid>
+ lsn: <lsn>
+ downstream:
+ vclock: <vclock>
+ 3:
+ id: 3
+ uuid: <uuid>
+ lsn: <lsn>
+ downstream:
+ vclock: <vclock>
+...
+test_run:cmd("switch replica2")
+---
+- true
+...
+box.cfg{replication = {}}
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+while box.info.replication[3].downstream ~= nil and box.info.replication[3].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+---
+...
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 2
+---
+- true
+...
+box.info.replication
+---
+- 1:
+ id: 1
+ uuid: <uuid>
+ lsn: <lsn>
+ 2:
+ id: 2
+ uuid: <uuid>
+ lsn: <lsn>
+ downstream:
+ vclock: <vclock>
+...
+test_run:cmd("restart server replica2 with cleanup=1")
+---
+- true
+...
+while box.space._cluster:len() ~= 3 do fiber.sleep(0.001) end
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+ch = fiber.channel(1)
+---
+...
+-- The function get_alive_replicas performs two checks of box.info.replication.
+-- A replica is considered alive if it is alive in at least one of them.
+-- Test both checks. These tests rely heavily on the fact that the
+-- replica is detected as disconnected, or restarts, faster than the
+-- specified timeout.
+-- First check sees the replica alive, the second sees it dead. It must not be deleted.
+func = function(timeout) ch:put(1) ch:put(box.replication.get_alive_replicas(timeout)) end
+---
+...
+f = fiber.create(func, 0.5)
+---
+...
+ch:get()
+---
+- 1
+...
+test_run:cmd("switch replica2")
+---
+- true
+...
+box.cfg{replication = {}}
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+while box.info.replication[3].downstream ~= nil and box.info.replication[3].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+---
+...
+alive = ch:get()
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 3
+---
+- true
+...
+-- First check sees the replica dead, the second sees it alive. It must not be deleted.
+f = fiber.create(func, 1.0)
+---
+...
+ch:get()
+---
+- 1
+...
+test_run:cmd("restart server replica2 with cleanup=1")
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+alive = ch:get()
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 3
+---
+- true
+...
+test_run:cmd("switch replica1")
+---
+- true
+...
+test_run:cmd("stop server default")
+---
+- true
+...
+while box.info.replication[1].upstream.status == "follow" do fiber.sleep(0.001) end
+---
+...
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 1
+---
+- true
+...
+box.info.replication
+---
+- 2:
+ id: 2
+ uuid: <uuid>
+ lsn: <lsn>
+...
+test_run:cmd("deploy server default")
+---
+- true
+...
+test_run:cmd("start server default")
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run = require('test_run').new()
+---
+...
+replica_set = require('fast_replica')
+---
+...
+replica_set.drop_all(test_run)
+---
+...
+-- Cleanup. Do not revoke the guest privilege: it was lost on deploy.
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 1
+---
+- true
+...
+box.info.replication
+---
+- 1:
+ id: 1
+ uuid: <uuid>
+ lsn: <lsn>
+...
diff --git a/test/replication/prune_dead_replicas.test.lua b/test/replication/prune_dead_replicas.test.lua
new file mode 100644
index 0000000..2124047
--- /dev/null
+++ b/test/replication/prune_dead_replicas.test.lua
@@ -0,0 +1,102 @@
+test_run = require('test_run').new()
+replica_set = require('fast_replica')
+fiber = require('fiber')
+
+box.schema.user.grant('guest', 'replication')
+test_run:cmd("push filter 'lsn: [0-9]+' to 'lsn: <lsn>'")
+test_run:cmd("push filter 'uuid: .*' to 'uuid: <uuid>'")
+test_run:cmd("push filter 'idle: .*' to 'idle: <idle>'")
+test_run:cmd("push filter 'peer: .*' to 'peer: <peer>'")
+test_run:cmd("push filter 'lag: .*' to 'lag: <lag>'")
+test_run:cmd("push filter 'vclock: .*' to 'vclock: <vclock>'")
+test_run:cmd("push filter '(error: .builtin/.*[.]lua):[0-9]+' to '\\1'")
+
+box.replication.get_alive_replicas("name")
+
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 1
+
+
+replica_set.join(test_run, 2)
+while box.space._cluster:len() ~= 3 do fiber.sleep(0.001) end
+box.space._cluster:len() == 3
+box.info.replication
+
+test_run:cmd("switch replica1")
+box.space._cluster:len() == 3
+box.info.replication
+
+test_run:cmd("switch default")
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+
+-- nothing is deleted
+box.space._cluster:len() == 3
+box.info.replication
+
+test_run:cmd("switch replica2")
+box.cfg{replication = {}}
+test_run:cmd("switch default")
+
+while box.info.replication[3].downstream ~= nil and box.info.replication[3].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 2
+box.info.replication
+
+test_run:cmd("restart server replica2 with cleanup=1")
+while box.space._cluster:len() ~= 3 do fiber.sleep(0.001) end
+test_run:cmd("switch default")
+
+ch = fiber.channel(1)
+-- The function get_alive_replicas performs two checks of box.info.replication.
+-- A replica is considered alive if it is alive in at least one of them.
+-- Test both checks. These tests rely heavily on the fact that the
+-- replica is detected as disconnected, or restarts, faster than the
+-- specified timeout.
+
+-- First check sees the replica alive, the second sees it dead. It must not be deleted.
+func = function(timeout) ch:put(1) ch:put(box.replication.get_alive_replicas(timeout)) end
+f = fiber.create(func, 0.5)
+ch:get()
+
+test_run:cmd("switch replica2")
+box.cfg{replication = {}}
+test_run:cmd("switch default")
+while box.info.replication[3].downstream ~= nil and box.info.replication[3].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+alive = ch:get()
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 3
+
+-- First check sees the replica dead, the second sees it alive. It must not be deleted.
+f = fiber.create(func, 1.0)
+ch:get()
+test_run:cmd("restart server replica2 with cleanup=1")
+test_run:cmd("switch default")
+alive = ch:get()
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 3
+
+test_run:cmd("switch replica1")
+test_run:cmd("stop server default")
+while box.info.replication[1].upstream.status == "follow" do fiber.sleep(0.001) end
+
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 1
+box.info.replication
+
+test_run:cmd("deploy server default")
+test_run:cmd("start server default")
+test_run:cmd("switch default")
+
+test_run = require('test_run').new()
+replica_set = require('fast_replica')
+replica_set.drop_all(test_run)
+-- Cleanup. Do not revoke the guest privilege: it was lost on deploy.
+
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 1
+box.info.replication
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 95e94e5..a11a168 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -6,6 +6,7 @@
"wal_off.test.lua": {},
"hot_standby.test.lua": {},
"rebootstrap.test.lua": {},
+ "prune_dead_replicas.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.7.4