[tarantool-patches] [replication 1/1] replication: Add methods to prune dead replicas

Ilya Markov imarkov at tarantool.org
Mon Jun 4 21:26:21 MSK 2018


Introduce the box.replication.get_alive_replicas and
box.replication.prune_dead_replicas methods.

The first method returns a table of the replicas that were alive
within the specified time period.

The second method deletes from box.space._cluster all replicas that
are not present in the table of alive replicas.

A replica is considered alive within the period if its upstream.status
or downstream.status is not "stopped" or "disconnected", or if it was
added during the period.
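
For example (a minimal usage sketch; the 2-second liveness window is an
arbitrary choice):

    -- Collect the replicas seen alive within a 2-second window, then
    -- drop every other entry from box.space._cluster.
    local alive = box.replication.get_alive_replicas(2)
    box.replication.prune_dead_replicas(alive)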

Add a test-run filter to some tests in the engine suite so that error
messages do not depend on builtin/*.lua line numbers.

Closes #3110
---
branch: gh-3110-prune-dead-replicas
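
A possible way to call the new methods periodically (just a sketch, not
part of the patch; the fiber name, the 0.5-second liveness window and
the 60-second period are arbitrary):

    local fiber = require('fiber')

    -- Background fiber that prunes dead replicas once a minute.
    fiber.create(function()
        fiber.name('prune_dead_replicas')
        while true do
            local alive = box.replication.get_alive_replicas(0.5)
            box.replication.prune_dead_replicas(alive)
            fiber.sleep(60)
        end
    end)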

 src/box/lua/schema.lua                        |  60 +++++
 test/box/misc.result                          |   1 +
 test/engine/iterator.result                   |   6 +-
 test/engine/iterator.test.lua                 |   2 +-
 test/engine/savepoint.result                  |  16 +-
 test/engine/savepoint.test.lua                |   1 +
 test/replication/prune_dead_replicas.result   | 329 ++++++++++++++++++++++++++
 test/replication/prune_dead_replicas.test.lua | 102 ++++++++
 test/replication/suite.cfg                    |   1 +
 9 files changed, 510 insertions(+), 8 deletions(-)
 create mode 100644 test/replication/prune_dead_replicas.result
 create mode 100644 test/replication/prune_dead_replicas.test.lua

diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 0b7849f..9662036 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -7,6 +7,7 @@ local fun = require('fun')
 local log = require('log')
 local fio = require('fio')
 local json = require('json')
+local fiber = require('fiber')
 local session = box.session
 local internal = require('box.internal')
 local function setmap(table)
@@ -2308,4 +2309,63 @@ box.feedback.save = function(file_name)
     fh:close()
 end
 
+box.replication = {}
+
+local function replica_is_alive(replica_info)
+    if replica_info ~= nil and replica_info.uuid == box.info.uuid then
+        -- The current replica is always assumed to be alive.
+        return true
+    end
+    if replica_info == nil or
+            (replica_info.downstream == nil and replica_info.upstream == nil) then
+        return false
+    end
+    if replica_info.downstream ~= nil then
+        return replica_info.downstream.status ~= "disconnected" and
+                replica_info.downstream.status ~= "stopped"
+    else
+        -- downstream == nil here, hence upstream ~= nil.
+        return replica_info.upstream.status ~= "disconnected" and
+                replica_info.upstream.status ~= "stopped"
+    end
+end
+
+box.replication.get_alive_replicas = function(timeout)
+    if timeout ~= nil then
+        if type(timeout) ~= 'number' then
+            error('Usage: box.replication.get_alive_replicas([timeout])')
+        end
+    end
+    local res = {}
+    timeout = timeout or 0.5
+    -- Take two snapshots of box.info.replication, `timeout` seconds apart.
+    local info_old = box.info.replication
+    fiber.sleep(timeout)
+    local info_new = box.info.replication
+    for i, new_value in pairs(info_new) do
+        local old_value = info_old[i]
+        if old_value == nil or old_value.uuid ~= new_value.uuid then
+            -- The replica was added during the waiting period, so there is no
+            -- previous status to compare against; assume it is alive regardless.
+            -- The UUIDs differ only if the old replica was deleted and a new one
+            -- joined meanwhile; a replica re-bootstrapped with a new id is alive too.
+            res[new_value.uuid] = i
+        elseif replica_is_alive(old_value) or replica_is_alive(new_value) then
+            res[new_value.uuid] = i
+        end
+    end
+    return res
+end
+
+box.replication.prune_dead_replicas = function(alive_replicas)
+    if type(alive_replicas) ~= 'table' then
+        error("Usage: box.replication.prune_dead_replicas(alive_replicas)")
+    end
+    for _, tuple in box.space._cluster:pairs() do
+        if alive_replicas[tuple[2]] == nil then
+            box.space._cluster.index.uuid:delete{tuple[2]}
+        end
+    end
+end
+
 box.NULL = msgpack.NULL
diff --git a/test/box/misc.result b/test/box/misc.result
index 8f94f55..52e0044 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -68,6 +68,7 @@ t
   - info
   - internal
   - once
+  - replication
   - rollback
   - rollback_to_savepoint
   - runtime
diff --git a/test/engine/iterator.result b/test/engine/iterator.result
index 423ed0b..47e86ed 100644
--- a/test/engine/iterator.result
+++ b/test/engine/iterator.result
@@ -3812,6 +3812,10 @@ inspector:cmd("clear filter")
 ---
 - true
 ...
+inspector:cmd("push filter '(error: .builtin/.*[.]lua):[0-9]+' to '\\1'")
+---
+- true
+...
 --
 -- gh-1875 Add support for index:pairs(key, iterator-type) syntax
 --
@@ -4211,7 +4215,7 @@ s:replace{35}
 ...
 state, value = gen(param,state)
 ---
-- error: 'builtin/box/schema.lua:985: usage: next(param, state)'
+- error: 'builtin/box/schema.lua: usage: next(param, state)'
 ...
 value
 ---
diff --git a/test/engine/iterator.test.lua b/test/engine/iterator.test.lua
index 905173a..c518010 100644
--- a/test/engine/iterator.test.lua
+++ b/test/engine/iterator.test.lua
@@ -257,7 +257,7 @@ space:drop()
 
 inspector:cmd("clear filter")
 
-
+inspector:cmd("push filter '(error: .builtin/.*[.]lua):[0-9]+' to '\\1'")
 --
 -- gh-1875 Add support for index:pairs(key, iterator-type) syntax
 --
diff --git a/test/engine/savepoint.result b/test/engine/savepoint.result
index d440efa..76a0aed 100644
--- a/test/engine/savepoint.result
+++ b/test/engine/savepoint.result
@@ -4,6 +4,10 @@ env = require('test_run')
 test_run = env.new()
 ---
 ...
+test_run:cmd("push filter '(.builtin/.*[.]lua):[0-9]+' to '\\1'")
+---
+- true
+...
 -- gh-2025 box.savepoint
 s1 = nil
 ---
@@ -14,7 +18,7 @@ s1 = box.savepoint()
 ...
 box.rollback_to_savepoint(s1)
 ---
-- error: 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- error: 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
 ...
 box.begin() s1 = box.savepoint()
 ---
@@ -323,27 +327,27 @@ test_run:cmd("setopt delimiter ''");
 ok1, errmsg1
 ---
 - false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
 ...
 ok2, errmsg2
 ---
 - false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
 ...
 ok3, errmsg3
 ---
 - false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
 ...
 ok4, errmsg4
 ---
 - false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
 ...
 ok5, errmsg5
 ---
 - false
-- 'builtin/box/schema.lua:300: Usage: box.rollback_to_savepoint(savepoint)'
+- 'builtin/box/schema.lua: Usage: box.rollback_to_savepoint(savepoint)'
 ...
 s:select{}
 ---
diff --git a/test/engine/savepoint.test.lua b/test/engine/savepoint.test.lua
index de8f297..28a88d0 100644
--- a/test/engine/savepoint.test.lua
+++ b/test/engine/savepoint.test.lua
@@ -1,6 +1,7 @@
 env = require('test_run')
 test_run = env.new()
 
+test_run:cmd("push filter '(.builtin/.*[.]lua):[0-9]+' to '\\1'")
 -- gh-2025 box.savepoint
 
 s1 = nil
diff --git a/test/replication/prune_dead_replicas.result b/test/replication/prune_dead_replicas.result
new file mode 100644
index 0000000..831ac88
--- /dev/null
+++ b/test/replication/prune_dead_replicas.result
@@ -0,0 +1,329 @@
+test_run = require('test_run').new()
+---
+...
+replica_set = require('fast_replica')
+---
+...
+fiber = require('fiber')
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd("push filter 'lsn: [0-9]+' to 'lsn: <lsn>'")
+---
+- true
+...
+test_run:cmd("push filter 'uuid: .*' to 'uuid: <uuid>'")
+---
+- true
+...
+test_run:cmd("push filter 'idle: .*' to 'idle: <idle>'")
+---
+- true
+...
+test_run:cmd("push filter 'peer: .*' to 'peer: <peer>'")
+---
+- true
+...
+test_run:cmd("push filter 'lag: .*' to 'lag: <lag>'")
+---
+- true
+...
+test_run:cmd("push filter 'vclock: .*' to 'vclock: <vclock>'")
+---
+- true
+...
+test_run:cmd("push filter '(error: .builtin/.*[.]lua):[0-9]+' to '\\1'")
+---
+- true
+...
+box.replication.get_alive_replicas("name")
+---
+- error: 'builtin/box/schema.lua: Usage: box.replication.get_alive_replicas([timeout])'
+...
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 1
+---
+- true
+...
+replica_set.join(test_run, 2)
+---
+...
+while box.space._cluster:len() ~= 3 do fiber.sleep(0.001) end
+---
+...
+box.space._cluster:len() == 3
+---
+- true
+...
+box.info.replication
+---
+- 1:
+    id: 1
+    uuid: <uuid>
+    lsn: <lsn>
+  2:
+    id: 2
+    uuid: <uuid>
+    lsn: <lsn>
+    downstream:
+      vclock: <vclock>
+  3:
+    id: 3
+    uuid: <uuid>
+    lsn: <lsn>
+    downstream:
+      vclock: <vclock>
+...
+test_run:cmd("switch replica1")
+---
+- true
+...
+box.space._cluster:len() == 3
+---
+- true
+...
+box.info.replication
+---
+- 1:
+    id: 1
+    uuid: <uuid>
+    lsn: <lsn>
+    upstream:
+      status: follow
+      idle: <idle>
+      peer: <peer>
+      lag: <lag>
+  2:
+    id: 2
+    uuid: <uuid>
+    lsn: <lsn>
+  3:
+    id: 3
+    uuid: <uuid>
+    lsn: <lsn>
+...
+test_run:cmd("switch default")
+---
+- true
+...
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+-- nothing is deleted
+box.space._cluster:len() == 3
+---
+- true
+...
+box.info.replication
+---
+- 1:
+    id: 1
+    uuid: <uuid>
+    lsn: <lsn>
+  2:
+    id: 2
+    uuid: <uuid>
+    lsn: <lsn>
+    downstream:
+      vclock: <vclock>
+  3:
+    id: 3
+    uuid: <uuid>
+    lsn: <lsn>
+    downstream:
+      vclock: <vclock>
+...
+test_run:cmd("switch replica2")
+---
+- true
+...
+box.cfg{replication = {}}
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+while box.info.replication[3].downstream ~= nil and box.info.replication[3].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+---
+...
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 2
+---
+- true
+...
+box.info.replication
+---
+- 1:
+    id: 1
+    uuid: <uuid>
+    lsn: <lsn>
+  2:
+    id: 2
+    uuid: <uuid>
+    lsn: <lsn>
+    downstream:
+      vclock: <vclock>
+...
+test_run:cmd("restart server replica2 with cleanup=1")
+---
+- true
+...
+while box.space._cluster:len() ~= 3 do fiber.sleep(0.001) end
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+ch = fiber.channel(1)
+---
+...
+-- The function get_alive_replicas takes two snapshots of box.info.replication,
+-- one before and one after the timeout. A replica is considered alive if it
+-- looks alive in at least one of the two snapshots. Test both cases. These
+-- tests rely heavily on the replica becoming disconnected, or restarting,
+-- faster than the specified timeout.
+-- The first snapshot sees the replica alive, the second sees it dead. Such a replica must not be deleted.
+func = function(timeout) ch:put(1) ch:put(box.replication.get_alive_replicas(timeout)) end
+---
+...
+f = fiber.create(func, 0.5)
+---
+...
+ch:get()
+---
+- 1
+...
+test_run:cmd("switch replica2")
+---
+- true
+...
+box.cfg{replication = {}}
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+while box.info.replication[3].downstream ~= nil and box.info.replication[3].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+---
+...
+alive = ch:get()
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 3
+---
+- true
+...
+-- The first snapshot sees the replica dead, the second sees it alive. Such a replica must not be deleted.
+f = fiber.create(func, 1.0)
+---
+...
+ch:get()
+---
+- 1
+...
+test_run:cmd("restart server replica2 with cleanup=1")
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+alive = ch:get()
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 3
+---
+- true
+...
+test_run:cmd("switch replica1")
+---
+- true
+...
+test_run:cmd("stop server default")
+---
+- true
+...
+while box.info.replication[1].upstream.status == "follow" do fiber.sleep(0.001) end
+---
+...
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 1
+---
+- true
+...
+box.info.replication
+---
+- 2:
+    id: 2
+    uuid: <uuid>
+    lsn: <lsn>
+...
+test_run:cmd("deploy server default")
+---
+- true
+...
+test_run:cmd("start server default")
+---
+- true
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run = require('test_run').new()
+---
+...
+replica_set = require('fast_replica')
+---
+...
+replica_set.drop_all(test_run)
+---
+...
+-- Cleanup. Don't revoke the guest replication privilege: it was lost on deploy.
+alive = box.replication.get_alive_replicas(0.001)
+---
+...
+box.replication.prune_dead_replicas(alive)
+---
+...
+box.space._cluster:len() == 1
+---
+- true
+...
+box.info.replication
+---
+- 1:
+    id: 1
+    uuid: <uuid>
+    lsn: <lsn>
+...
diff --git a/test/replication/prune_dead_replicas.test.lua b/test/replication/prune_dead_replicas.test.lua
new file mode 100644
index 0000000..2124047
--- /dev/null
+++ b/test/replication/prune_dead_replicas.test.lua
@@ -0,0 +1,102 @@
+test_run = require('test_run').new()
+replica_set = require('fast_replica')
+fiber = require('fiber')
+
+box.schema.user.grant('guest', 'replication')
+test_run:cmd("push filter 'lsn: [0-9]+' to 'lsn: <lsn>'")
+test_run:cmd("push filter 'uuid: .*' to 'uuid: <uuid>'")
+test_run:cmd("push filter 'idle: .*' to 'idle: <idle>'")
+test_run:cmd("push filter 'peer: .*' to 'peer: <peer>'")
+test_run:cmd("push filter 'lag: .*' to 'lag: <lag>'")
+test_run:cmd("push filter 'vclock: .*' to 'vclock: <vclock>'")
+test_run:cmd("push filter '(error: .builtin/.*[.]lua):[0-9]+' to '\\1'")
+
+box.replication.get_alive_replicas("name")
+
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 1
+
+
+replica_set.join(test_run, 2)
+while box.space._cluster:len() ~= 3 do fiber.sleep(0.001) end
+box.space._cluster:len() == 3
+box.info.replication
+
+test_run:cmd("switch replica1")
+box.space._cluster:len() == 3
+box.info.replication
+
+test_run:cmd("switch default")
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+
+-- nothing is deleted
+box.space._cluster:len() == 3
+box.info.replication
+
+test_run:cmd("switch replica2")
+box.cfg{replication = {}}
+test_run:cmd("switch default")
+
+while box.info.replication[3].downstream ~= nil and box.info.replication[3].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 2
+box.info.replication
+
+test_run:cmd("restart server replica2 with cleanup=1")
+while box.space._cluster:len() ~= 3 do fiber.sleep(0.001) end
+test_run:cmd("switch default")
+
+ch = fiber.channel(1)
+-- The function get_alive_replicas takes two snapshots of box.info.replication,
+-- one before and one after the timeout. A replica is considered alive if it
+-- looks alive in at least one of the two snapshots. Test both cases. These
+-- tests rely heavily on the replica becoming disconnected, or restarting,
+-- faster than the specified timeout.
+
+-- The first snapshot sees the replica alive, the second sees it dead. Such a replica must not be deleted.
+func = function(timeout) ch:put(1) ch:put(box.replication.get_alive_replicas(timeout)) end
+f = fiber.create(func, 0.5)
+ch:get()
+
+test_run:cmd("switch replica2")
+box.cfg{replication = {}}
+test_run:cmd("switch default")
+while box.info.replication[3].downstream ~= nil and box.info.replication[3].downstream.status ~= "stopped" do fiber.sleep(0.001) end
+alive = ch:get()
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 3
+
+-- The first snapshot sees the replica dead, the second sees it alive. Such a replica must not be deleted.
+f = fiber.create(func, 1.0)
+ch:get()
+test_run:cmd("restart server replica2 with cleanup=1")
+test_run:cmd("switch default")
+alive = ch:get()
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 3
+
+test_run:cmd("switch replica1")
+test_run:cmd("stop server default")
+while box.info.replication[1].upstream.status == "follow" do fiber.sleep(0.001) end
+
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 1
+box.info.replication
+
+test_run:cmd("deploy server default")
+test_run:cmd("start server default")
+test_run:cmd("switch default")
+
+test_run = require('test_run').new()
+replica_set = require('fast_replica')
+replica_set.drop_all(test_run)
+-- Cleanup. Don't revoke the guest replication privilege: it was lost on deploy.
+
+alive = box.replication.get_alive_replicas(0.001)
+box.replication.prune_dead_replicas(alive)
+box.space._cluster:len() == 1
+box.info.replication
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 95e94e5..a11a168 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -6,6 +6,7 @@
     "wal_off.test.lua": {},
     "hot_standby.test.lua": {},
     "rebootstrap.test.lua": {},
+    "prune_dead_replicas.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.7.4