[tarantool-patches] [PATCH 3/3] Introduce destroy module feature

AKhatskevich avkhatskevich at tarantool.org
Sat Jun 23 00:43:35 MSK 2018


Introduce functions:
 * vshard.router.destroy()
 * vshard.storage.destroy()

Those functions:
 * close connections
 * stop background fibers
 * delete vshard spaces
 * delete vshard functions
 * delete `once` metadata

Closes #121
---
 test/lua_libs/util.lua        |  34 +++++++++++
 test/router/destroy.result    | 108 ++++++++++++++++++++++++++++++++++
 test/router/destroy.test.lua  |  39 +++++++++++++
 test/storage/destroy.result   | 132 ++++++++++++++++++++++++++++++++++++++++++
 test/storage/destroy.test.lua |  51 ++++++++++++++++
 vshard/router/init.lua        |  22 +++++++
 vshard/storage/init.lua       |  70 +++++++++++++++++-----
 7 files changed, 440 insertions(+), 16 deletions(-)
 create mode 100644 test/router/destroy.result
 create mode 100644 test/router/destroy.test.lua
 create mode 100644 test/storage/destroy.result
 create mode 100644 test/storage/destroy.test.lua

diff --git a/test/lua_libs/util.lua b/test/lua_libs/util.lua
index f2d3b48..ce0ea67 100644
--- a/test/lua_libs/util.lua
+++ b/test/lua_libs/util.lua
@@ -69,9 +69,43 @@ local function wait_master(test_run, replicaset, master)
     log.info('Slaves are connected to a master "%s"', master)
 end
 
+-- Return a sorted list of names of the fibers which match none of
+-- the known non-vshard patterns.
+local function vshard_fiber_list()
+    -- Flush jit traces so that they do not keep upvalues in memory.
+    jit.flush()
+    collectgarbage()
+    -- Give a fiber time to clean itself.
+    fiber.sleep(0.05)
+    local non_vshard_patterns = {
+        '^console',
+        'feedback_daemon$',
+        '^checkpoint_daemon$',
+        '^main$',
+        '^memtx%.gc$',
+        '^vinyl%.scheduler$',
+    }
+    local names = {}
+    for _, fib in pairs(fiber.info()) do
+        local add = true
+        for _, pattern in ipairs(non_vshard_patterns) do
+            if fib.name:match(pattern) then
+                add = false
+                break
+            end
+        end
+        if add then
+            table.insert(names, fib.name)
+        end
+    end
+    table.sort(names)
+    return names
+end
+
 return {
     check_error = check_error,
     shuffle_masters = shuffle_masters,
     collect_timeouts = collect_timeouts,
     wait_master = wait_master,
+    vshard_fiber_list = vshard_fiber_list,
 }
diff --git a/test/router/destroy.result b/test/router/destroy.result
new file mode 100644
index 0000000..48ba661
--- /dev/null
+++ b/test/router/destroy.result
@@ -0,0 +1,108 @@
+test_run = require('test_run').new()
+---
+...
+netbox = require('net.box')
+---
+...
+fiber = require('fiber')
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'storage')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'storage')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+---
+- true
+...
+test_run:cmd("start server router_1")
+---
+- true
+...
+_ = test_run:cmd("switch router_1")
+---
+...
+util = require('util')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function wait_fibers_exit()
+    while #util.vshard_fiber_list() > 0 do
+        sleep(0.05)
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Validate destroy finction by fiber_list.
+-- Netbox fibers are deleted after replicas by GC.
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3301 (net.box)
+  - 127.0.0.1:3302 (net.box)
+  - 127.0.0.1:3303 (net.box)
+  - 127.0.0.1:3304 (net.box)
+  - discovery_fiber
+  - vshard.failover
+...
+vshard.router.destroy()
+---
+...
+wait_fibers_exit()
+---
+...
+package.loaded['vshard.router'] = nil
+---
+...
+vshard.router = require('vshard.router')
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3301 (net.box)
+  - 127.0.0.1:3302 (net.box)
+  - 127.0.0.1:3303 (net.box)
+  - 127.0.0.1:3304 (net.box)
+  - discovery_fiber
+  - vshard.failover
+...
+_ = test_run:cmd("switch default")
+---
+...
+test_run:cmd('stop server router_1')
+---
+- true
+...
+test_run:cmd('cleanup server router_1')
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
diff --git a/test/router/destroy.test.lua b/test/router/destroy.test.lua
new file mode 100644
index 0000000..1f972bd
--- /dev/null
+++ b/test/router/destroy.test.lua
@@ -0,0 +1,39 @@
+test_run = require('test_run').new()
+netbox = require('net.box')
+fiber = require('fiber')
+
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+test_run:cmd("start server router_1")
+
+_ = test_run:cmd("switch router_1")
+util = require('util')
+test_run:cmd("setopt delimiter ';'")
+function wait_fibers_exit()
+    while #util.vshard_fiber_list() > 0 do
+        sleep(0.05)
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Validate destroy finction by fiber_list.
+-- Netbox fibers are deleted after replicas by GC.
+util.vshard_fiber_list()
+vshard.router.destroy()
+wait_fibers_exit()
+package.loaded['vshard.router'] = nil
+vshard.router = require('vshard.router')
+vshard.router.cfg(cfg)
+util.vshard_fiber_list()
+
+_ = test_run:cmd("switch default")
+test_run:cmd('stop server router_1')
+test_run:cmd('cleanup server router_1')
+test_run:drop_cluster(REPLICASET_2)
diff --git a/test/storage/destroy.result b/test/storage/destroy.result
new file mode 100644
index 0000000..23ec6bd
--- /dev/null
+++ b/test/storage/destroy.result
@@ -0,0 +1,132 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'storage')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'storage')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+_ = test_run:cmd("switch storage_1_a")
+---
+...
+util = require('util')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function wait_fibers_exit()
+    while #util.vshard_fiber_list() > 0 do
+        sleep(0.05)
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Storage is configured.
+-- Establish net.box connection.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs:callro('echo', {'some data'})
+---
+- some data
+- null
+- null
+...
+rs = nil
+---
+...
+-- Validate destroy finction by fiber_list.
+-- Netbox fibers are deleted after replicas by GC.
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3303 (net.box)
+  - vshard.gc
+  - vshard.recovery
+...
+box.schema.user.exists('storage') == true
+---
+- true
+...
+box.space._bucket ~= nil
+---
+- true
+...
+-- Destroy storage.
+vshard.storage.destroy()
+---
+...
+wait_fibers_exit()
+---
+...
+box.space._bucket == nil
+---
+- true
+...
+-- Reconfigure storage.
+-- gh-52: Allow use existing user.
+box.schema.user.exists('storage') == true
+---
+- true
+...
+package.loaded['vshard.storage'] = nil
+---
+...
+vshard.storage = require('vshard.storage')
+---
+...
+vshard.storage.cfg(cfg, names['storage_1_a'])
+---
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs:callro('echo', {'some data'})
+---
+- some data
+- null
+- null
+...
+rs = nil
+---
+...
+box.space._bucket ~= nil
+---
+- true
+...
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3303 (net.box)
+  - vshard.gc
+  - vshard.recovery
+...
+_ = test_run:cmd("switch default")
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
diff --git a/test/storage/destroy.test.lua b/test/storage/destroy.test.lua
new file mode 100644
index 0000000..156b76e
--- /dev/null
+++ b/test/storage/destroy.test.lua
@@ -0,0 +1,51 @@
+test_run = require('test_run').new()
+
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+
+_ = test_run:cmd("switch storage_1_a")
+util = require('util')
+test_run:cmd("setopt delimiter ';'")
+function wait_fibers_exit()
+    while #util.vshard_fiber_list() > 0 do
+        sleep(0.05)
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Storage is configured.
+-- Establish net.box connection.
+_, rs = next(vshard.storage.internal.replicasets)
+rs:callro('echo', {'some data'})
+rs = nil
+-- Validate destroy finction by fiber_list.
+-- Netbox fibers are deleted after replicas by GC.
+util.vshard_fiber_list()
+box.schema.user.exists('storage') == true
+box.space._bucket ~= nil
+-- Destroy storage.
+vshard.storage.destroy()
+wait_fibers_exit()
+box.space._bucket == nil
+
+-- Reconfigure storage.
+-- gh-52: Allow use existing user.
+box.schema.user.exists('storage') == true
+package.loaded['vshard.storage'] = nil
+vshard.storage = require('vshard.storage')
+vshard.storage.cfg(cfg, names['storage_1_a'])
+_, rs = next(vshard.storage.internal.replicasets)
+rs:callro('echo', {'some data'})
+rs = nil
+box.space._bucket ~= nil
+util.vshard_fiber_list()
+
+_ = test_run:cmd("switch default")
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 21093e5..da8e49e 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -33,6 +33,27 @@ if not M then
     }
 end
 
+--
+-- Destroy router module: cancel its background fibers and
+-- drop the module internals.
+--
+local function destroy()
+    local MODULE_INTERNALS = '__module_vshard_router'
+    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
+    for _, fib_name in ipairs({'failover_fiber', 'discovery_fiber'}) do
+        if M[fib_name] then
+            M[fib_name]:cancel()
+            M[fib_name] = nil
+        end
+    end
+    -- Do not rely on a global `vshard`: the user may keep the
+    -- module in a local variable only.
+    local router = package.loaded['vshard.router']
+    if router then router.internal = nil end
+    rawset(_G, MODULE_INTERNALS, nil)
+    M = nil
+end
+
 -- Set a replicaset by container of a bucket.
 local function bucket_set(bucket_id, replicaset)
     assert(replicaset)
@@ -792,6 +813,7 @@ return {
     sync = router_sync;
     bootstrap = cluster_bootstrap;
     bucket_discovery = bucket_discovery;
+    destroy = destroy,
     discovery_wakeup = discovery_wakeup;
     internal = M;
     module_version = function() return M.module_version end;
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 059e705..ac37163 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -148,6 +148,22 @@ local function create_user(username, password)
                           {if_not_exists = true})
 end
 
+-- Names of the functions storage_schema_v1() creates and grants.
+local storage_api = {
+    'vshard.storage.sync', 'vshard.storage.call',
+    'vshard.storage.bucket_force_create',
+    'vshard.storage.bucket_force_drop',
+    'vshard.storage.bucket_collect',
+    'vshard.storage.bucket_send',
+    'vshard.storage.bucket_recv',
+    'vshard.storage.bucket_stat',
+    'vshard.storage.buckets_count',
+    'vshard.storage.buckets_info',
+    'vshard.storage.buckets_discovery',
+    'vshard.storage.rebalancer_request_state',
+    'vshard.storage.rebalancer_apply_routes',
+}
+
 local function storage_schema_v1(username, password)
     log.info("Initializing schema")
     create_user(username, password)
@@ -161,22 +177,6 @@ local function storage_schema_v1(username, password)
     bucket:create_index('pk', {parts = {'id'}})
     bucket:create_index('status', {parts = {'status'}, unique = false})
 
-    local storage_api = {
-        'vshard.storage.sync',
-        'vshard.storage.call',
-        'vshard.storage.bucket_force_create',
-        'vshard.storage.bucket_force_drop',
-        'vshard.storage.bucket_collect',
-        'vshard.storage.bucket_send',
-        'vshard.storage.bucket_recv',
-        'vshard.storage.bucket_stat',
-        'vshard.storage.buckets_count',
-        'vshard.storage.buckets_info',
-        'vshard.storage.buckets_discovery',
-        'vshard.storage.rebalancer_request_state',
-        'vshard.storage.rebalancer_apply_routes',
-    }
-
     for _, name in ipairs(storage_api) do
         box.schema.func.create(name, {setuid = true})
         box.schema.user.grant(username, 'execute', 'function', name)
@@ -207,6 +207,43 @@ local function on_master_enable(...)
     end
 end
 
+--------------------------------------------------------------------------------
+-- Destroy
+--------------------------------------------------------------------------------
+
+--
+-- Destroy storage module: stop fibers, drop vshard schema objects.
+--
+local function destroy()
+    local MODULE_INTERNALS = '__module_vshard_storage'
+    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
+    -- Cancel fibers before dropping the schema they may work with.
+    for _, fib_name in ipairs({'recovery_fiber',
+                               'collect_bucket_garbage_fiber',
+                               'rebalancer_applier_fiber',
+                               'rebalancer_fiber'}) do
+        if M[fib_name] then
+            M[fib_name]:cancel()
+            M[fib_name] = nil
+        end
+    end
+    box.space._bucket:drop()
+    for _, name in ipairs(storage_api) do
+        box.schema.func.drop(name)
+    end
+    local box_cfg = table.deepcopy(box.cfg)
+    box_cfg.replication = nil
+    box_cfg.read_only = nil
+    box.cfg(box_cfg)
+    -- Do not rely on a global `vshard` being defined by the user.
+    local storage = package.loaded['vshard.storage']
+    if storage then storage.internal = nil end
+    box.space._schema:delete{'oncevshard:storage:1'}
+    rawset(_G, MODULE_INTERNALS, nil)
+    M = nil
+    box.snapshot()
+end
+
 --------------------------------------------------------------------------------
 -- Recovery
 --------------------------------------------------------------------------------
@@ -1844,6 +1881,7 @@ return {
     bucket_pin = bucket_pin,
     bucket_unpin = bucket_unpin,
     bucket_delete_garbage = bucket_delete_garbage,
+    destroy = destroy,
     garbage_collector_wakeup = garbage_collector_wakeup,
     rebalancer_wakeup = rebalancer_wakeup,
     rebalancer_apply_routes = rebalancer_apply_routes,
-- 
2.14.1





More information about the Tarantool-patches mailing list