[tarantool-patches] Re: [PATCH 3/3] Introduce destroy module feature

Alex Khatskevich avkhatskevich at tarantool.org
Tue Jun 26 00:54:46 MSK 2018


>> --- a/test/lua_libs/util.lua
>> +++ b/test/lua_libs/util.lua
>> @@ -69,9 +69,43 @@ local function wait_master(test_run, replicaset, master)
>>       log.info('Slaves are connected to a master "%s"', master)
>>   end
>>   +function vshard_fiber_list()
>> +    -- Flush jit traces to prevent them from
>> +    -- keeping its upvalues in memory.
>> +    jit.flush()
>> +    collectgarbage()
>> +    -- Give a fiber time to clean itself.
>> +    fiber.sleep(0.05)
>
> 1. Why do you need this sleep? As far as I know
> collectgarbage just cleans all the garbage in the same
> fiber with no yields or async tasks.
netbox: if `collectgarbage()` has collected a connection, its fiber needs
time to wake up after fiber:cancel(), process the cancellation and exit.
Other modules may behave the same way.
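As a rough analogy only, using plain Lua coroutines rather than Tarantool
fibers: cancelling a cooperative task merely marks it, and the task finishes
only once it gets scheduled again. The `task` table and the `cancel` function
are hypothetical names made up for this sketch.

```lua
-- Hypothetical cooperative task; not a Tarantool fiber.
local task = {cancelled = false}
task.co = coroutine.create(function()
    while not task.cancelled do
        coroutine.yield()
    end
    -- Cleanup would happen here, once the task runs again.
end)

-- Cancelling only sets a flag; it does not run the task.
local function cancel(t)
    t.cancelled = true
end

coroutine.resume(task.co)                 -- task runs and parks at yield
cancel(task)
local before = coroutine.status(task.co)  -- task has not noticed yet
coroutine.resume(task.co)                 -- give it a turn to exit
local after = coroutine.status(task.co)
print(before, after)
```

In the test helper the short sleep plays the role of the second resume: it
yields so that the cancelled fiber can run and exit.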
>>
>> +        local add = true
>> +        for _, pattern in pairs(non_vshard_patterns) do
>> +            if fib.name:match(pattern) then
>> +                add = false
>> +                break
>> +            end
>> +        end
>> +        if add then
>> +            table.insert(names, fib.name)
>> +        end
>> +    end
>> +    table.sort(names)
>
> 2. Sort of an array just does nothing, it is not?
Fibers are returned in an unpredictable order.
If one calls this function just to print the fiber list immediately, the
return value is expected to be consistent over time.
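To illustrate the ordering point, here is a minimal plain-Lua sketch, not the
patch code itself. The `fibers` table is a hand-written stand-in for the
result of `fiber.info()`, and `filter_names` is an illustrative helper. Since
`pairs()` gives no ordering guarantee, the sort is what makes the output
comparable against a `.result` file.

```lua
-- Hypothetical stand-in for fiber.info(): a table keyed by fiber id.
local fibers = {
    [101] = {name = 'main'},
    [205] = {name = 'vshard.recovery'},
    [117] = {name = 'vshard.gc'},
    [302] = {name = 'console/unix/:'},
}

-- Illustrative helper: drop names matching any pattern, sort the rest.
local function filter_names(fiber_map, skip_patterns)
    local names = {}
    for _, fib in pairs(fiber_map) do
        local add = true
        for _, pattern in ipairs(skip_patterns) do
            if fib.name:match(pattern) then
                add = false
                break
            end
        end
        if add then
            table.insert(names, fib.name)
        end
    end
    -- pairs() order is unspecified, so sort for a stable result.
    table.sort(names)
    return names
end

local names = filter_names(fibers, {'^console', '^main$'})
print(table.concat(names, ', '))
```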
>
>> diff --git a/test/router/destroy.result b/test/router/destroy.result
>> +        sleep(0.05)
>
> 3. Maybe fiber.sleep? Process sleep looks weird here.
>> +---
>> diff --git a/test/storage/destroy.result b/test/storage/destroy.result
>> +        sleep(0.05)
> 4. Same.

Fixed, thanks
>> +    end
>> +end;
>> +---
>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 21093e5..da8e49e 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -33,6 +33,27 @@ if not M then
>>       }
>>   end
>>   +--
>> +-- Destroy router module.
>> +--
>> +local function destroy()
>> +    local MODULE_INTERNALS = '__module_vshard_router'
>> +    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
>> +    local bg_fibers = {
>> +        'failover_fiber',
>> +        'discovery_fiber',
>> +    }
>
> 5. Please, just inline this cycle in 2 lines. Anyway on a new
> fiber it is necessary to add a new name to bg_fibers.
done
>
>> +    for _, fib_name in pairs(bg_fibers) do
>> +        if M[fib_name] then
>> +            M[fib_name]:cancel()
>> +            M[fib_name] = nil
>> +        end
>> +    end
>> +    vshard.router.internal = nil
>> +    rawset(_G, MODULE_INTERNALS, nil)
>> +    M = nil
>
> 6. I do not like, that recfg now looks like
>
> destroy
> package.loaded = nil
> cfg
>
> It should not be necessary to nullify the package. Today in
> the public chat a customer asked about
>
> box.cfg{}
> box.stop()
> box.cfg{}
> box.stop()
>
> So there is no place for package.loaded. Most of people
> even do not know about this thing existence.
Reimplemented; waiting for feedback.
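For context, a minimal plain-Lua sketch of the in-place reset idea used by
the new version. `deepcopy` is a simplified illustrative stand-in for
Tarantool's `table.deepcopy`, and the `skeleton` fields are made up. The
internals table is emptied and refilled rather than replaced, so references
that callers already hold stay valid and nothing has to be cleared from
`package.loaded`.

```lua
-- Simplified stand-in for table.deepcopy (no metatables, no cycles).
local function deepcopy(v)
    if type(v) ~= 'table' then
        return v
    end
    local copy = {}
    for k, item in pairs(v) do
        copy[k] = deepcopy(item)
    end
    return copy
end

-- Reset `tbl` to the contents of `template` without changing its identity.
local function override_table(tbl, template)
    for k in pairs(tbl) do
        tbl[k] = nil
    end
    for k, v in pairs(template) do
        tbl[k] = deepcopy(v)
    end
end

-- Made-up skeleton and module state for the illustration.
local skeleton = {route_map = {}, total_bucket_count = 0}
local M = deepcopy(skeleton)
local alias = M  -- a reference some caller kept earlier
M.route_map[1] = 'replicaset_1'
M.total_bucket_count = 3000
M.extra = 'leftover'

override_table(M, skeleton)
```

After the call `alias` still points at the same table, which now holds only
fresh copies of the skeleton values.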
>
>> +end
>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>> index 059e705..ac37163 100644
>> --- a/vshard/storage/init.lua
>> +++ b/vshard/storage/init.lua
>> @@ -207,6 +207,43 @@ local function on_master_enable(...)
>>       end
>>   end
>> +--------------------------------------------------------------------------------
>> +-- Destroy
>> +-------------------------------------------------------------------------------- 
>>
>> +
>> +--
>> +-- Destroy storage module.
>> +--
>> +local function destroy()
>> +    local MODULE_INTERNALS = '__module_vshard_storage'
>> +    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
>> +    box.space._bucket:drop()
> 7. When this instance is master for another instance (it will be so
> when we return to master-master topology), this drop will be replicated
> to active instances. Please, forbid destroy when the instance has active
> relays.
This feature is under development...
>> +    for _, name in ipairs(storage_api) do
>> +        box.schema.func.drop(name)
>> +    end
>> +    local bg_fibers = {
>> +        'recovery_fiber',
>> +        'collect_bucket_garbage_fiber',
>> +        'rebalancer_applier_fiber',
>> +        'rebalancer_fiber',
>> +    }
>> +    for _, fib_name in pairs(bg_fibers) do
>> +        if M[fib_name] then
>> +            M[fib_name]:cancel()
>> +            M[fib_name] = nil
>> +        end
>> +    end
>> +    local box_cfg = table.deepcopy(box.cfg)
>> +    box_cfg.replication = nil
>> +    box_cfg.read_only = nil
>
> 8. Assign nil to table is like removal of the key. So here actually
> you just recall empty box.cfg{}. To turn off the replication you
> should pass empty replication array explicitly. To turn off the
> read_only you should pass read_only = false explicitly.
Thanks
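The point in comment 8 is plain Lua table semantics and can be checked
without Tarantool. In this sketch `cfg` is a made-up table standing in for a
copy of `box.cfg`, and `count_keys` is an illustrative helper.

```lua
-- Illustrative helper: count the keys present in a table.
local function count_keys(t)
    local n = 0
    for _ in pairs(t) do
        n = n + 1
    end
    return n
end

-- Made-up stand-in for a copy of box.cfg.
local cfg = {listen = 3301, replication = {'host:3302'}, read_only = true}

-- Assigning nil removes the keys, so the receiver cannot tell them
-- apart from options that were never specified.
local removed = {
    listen = cfg.listen,
    replication = cfg.replication,
    read_only = cfg.read_only,
}
removed.replication = nil
removed.read_only = nil

-- Explicit values keep the keys present.
local explicit = {listen = cfg.listen, replication = {}, read_only = false}

print(count_keys(removed), count_keys(explicit))
```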
>> +    box.snapshot()
>
> 9. Why?
deleted

Full diff


commit b0cddc2a3ba30d3ecf57e71ce8d095bf2f093a5e
Author: AKhatskevich <avkhatskevich at tarantool.org>
Date:   Wed Jun 20 00:31:25 2018 +0300

     Introduce destroy module feature

     Introduce functions:
      * vshard.router.destroy()
      * vshard.storage.destroy()

     Those functions:
      * close connections
      * stop background fibers
      * delete vshard spaces
      * delete vshard functions
      * delete `once` metadata

     After the destroy, the module can be configured as if it had just been loaded.

     Extra changes:
      * introduce fiber_list function which returns names of non-tarantool
        fibers
      * introduce update_M function, which updates M (module internals) with
        values defined in the module

     Closes #121

diff --git a/test/lua_libs/util.lua b/test/lua_libs/util.lua
index f2d3b48..ce0ea67 100644
--- a/test/lua_libs/util.lua
+++ b/test/lua_libs/util.lua
@@ -69,9 +69,43 @@ local function wait_master(test_run, replicaset, master)
      log.info('Slaves are connected to a master "%s"', master)
  end

+function vshard_fiber_list()
+    -- Flush jit traces to prevent them from
+    -- keeping its upvalues in memory.
+    jit.flush()
+    collectgarbage()
+    -- Give a fiber time to clean itself.
+    fiber.sleep(0.05)
+    local fibers = fiber.info()
+    local non_vshard_patterns = {
+        '^console',
+        'feedback_daemon$',
+        '^checkpoint_daemon$',
+        '^main$',
+        '^memtx%.gc$',
+        '^vinyl%.scheduler$',
+    }
+    local names = {}
+    for _, fib in pairs(fibers) do
+        local add = true
+        for _, pattern in pairs(non_vshard_patterns) do
+            if fib.name:match(pattern) then
+                add = false
+                break
+            end
+        end
+        if add then
+            table.insert(names, fib.name)
+        end
+    end
+    table.sort(names)
+    return names
+end;
+
  return {
      check_error = check_error,
      shuffle_masters = shuffle_masters,
      collect_timeouts = collect_timeouts,
      wait_master = wait_master,
+    vshard_fiber_list = vshard_fiber_list,
  }
diff --git a/test/router/destroy.result b/test/router/destroy.result
new file mode 100644
index 0000000..9284ef6
--- /dev/null
+++ b/test/router/destroy.result
@@ -0,0 +1,105 @@
+test_run = require('test_run').new()
+---
+...
+netbox = require('net.box')
+---
+...
+fiber = require('fiber')
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'storage')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'storage')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+---
+- true
+...
+test_run:cmd("start server router_1")
+---
+- true
+...
+_ = test_run:cmd("switch router_1")
+---
+...
+util = require('util')
+---
+...
+fiber = require('fiber')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function wait_fibers_exit()
+    while #util.vshard_fiber_list() > 0 do
+        fiber.sleep(0.05)
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Validate destroy finction by fiber_list.
+-- Netbox fibers are deleted after replicas by GC.
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3301 (net.box)
+  - 127.0.0.1:3302 (net.box)
+  - 127.0.0.1:3303 (net.box)
+  - 127.0.0.1:3304 (net.box)
+  - discovery_fiber
+  - vshard.failover
+...
+vshard.router.destroy()
+---
+...
+wait_fibers_exit()
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3301 (net.box)
+  - 127.0.0.1:3302 (net.box)
+  - 127.0.0.1:3303 (net.box)
+  - 127.0.0.1:3304 (net.box)
+  - discovery_fiber
+  - vshard.failover
+...
+_ = test_run:cmd("switch default")
+---
+...
+test_run:cmd('stop server router_1')
+---
+- true
+...
+test_run:cmd('cleanup server router_1')
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
diff --git a/test/router/destroy.test.lua b/test/router/destroy.test.lua
new file mode 100644
index 0000000..caf4d8e
--- /dev/null
+++ b/test/router/destroy.test.lua
@@ -0,0 +1,38 @@
+test_run = require('test_run').new()
+netbox = require('net.box')
+fiber = require('fiber')
+
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+test_run:cmd("start server router_1")
+
+_ = test_run:cmd("switch router_1")
+util = require('util')
+fiber = require('fiber')
+test_run:cmd("setopt delimiter ';'")
+function wait_fibers_exit()
+    while #util.vshard_fiber_list() > 0 do
+        fiber.sleep(0.05)
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Validate destroy finction by fiber_list.
+-- Netbox fibers are deleted after replicas by GC.
+util.vshard_fiber_list()
+vshard.router.destroy()
+wait_fibers_exit()
+vshard.router.cfg(cfg)
+util.vshard_fiber_list()
+
+_ = test_run:cmd("switch default")
+test_run:cmd('stop server router_1')
+test_run:cmd('cleanup server router_1')
+test_run:drop_cluster(REPLICASET_2)
diff --git a/test/storage/destroy.result b/test/storage/destroy.result
new file mode 100644
index 0000000..180fc0b
--- /dev/null
+++ b/test/storage/destroy.result
@@ -0,0 +1,129 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'storage')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'storage')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+_ = test_run:cmd("switch storage_1_a")
+---
+...
+util = require('util')
+---
+...
+fiber = require('fiber')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function wait_fibers_exit()
+    while #util.vshard_fiber_list() > 0 do
+        fiber.sleep(0.05)
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Storage is configured.
+-- Establish net.box connection.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs:callro('echo', {'some data'})
+---
+- some data
+- null
+- null
+...
+rs = nil
+---
+...
+-- Validate destroy finction by fiber_list.
+-- Netbox fibers are deleted after replicas by GC.
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3303 (net.box)
+  - vshard.gc
+  - vshard.recovery
+...
+box.schema.user.exists('storage') == true
+---
+- true
+...
+box.space._bucket ~= nil
+---
+- true
+...
+-- Destroy storage.
+vshard.storage.destroy()
+---
+...
+wait_fibers_exit()
+---
+...
+box.space._bucket == nil
+---
+- true
+...
+-- Reconfigure storage.
+-- gh-52: Allow use existing user.
+box.schema.user.exists('storage') == true
+---
+- true
+...
+vshard.storage.cfg(cfg, names['storage_1_a'])
+---
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs:callro('echo', {'some data'})
+---
+- some data
+- null
+- null
+...
+rs = nil
+---
+...
+box.space._bucket ~= nil
+---
+- true
+...
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3303 (net.box)
+  - vshard.gc
+  - vshard.recovery
+...
+_ = test_run:cmd("switch default")
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
diff --git a/test/storage/destroy.test.lua b/test/storage/destroy.test.lua
new file mode 100644
index 0000000..38e7c0a
--- /dev/null
+++ b/test/storage/destroy.test.lua
@@ -0,0 +1,50 @@
+test_run = require('test_run').new()
+
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+
+_ = test_run:cmd("switch storage_1_a")
+util = require('util')
+fiber = require('fiber')
+test_run:cmd("setopt delimiter ';'")
+function wait_fibers_exit()
+    while #util.vshard_fiber_list() > 0 do
+        fiber.sleep(0.05)
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Storage is configured.
+-- Establish net.box connection.
+_, rs = next(vshard.storage.internal.replicasets)
+rs:callro('echo', {'some data'})
+rs = nil
+-- Validate destroy finction by fiber_list.
+-- Netbox fibers are deleted after replicas by GC.
+util.vshard_fiber_list()
+box.schema.user.exists('storage') == true
+box.space._bucket ~= nil
+-- Destroy storage.
+vshard.storage.destroy()
+wait_fibers_exit()
+box.space._bucket == nil
+
+-- Reconfigure storage.
+-- gh-52: Allow use existing user.
+box.schema.user.exists('storage') == true
+vshard.storage.cfg(cfg, names['storage_1_a'])
+_, rs = next(vshard.storage.internal.replicasets)
+rs:callro('echo', {'some data'})
+rs = nil
+box.space._bucket ~= nil
+util.vshard_fiber_list()
+
+_ = test_run:cmd("switch default")
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 21093e5..e56c38a 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -7,30 +7,49 @@ local lhash = require('vshard.hash')
  local lreplicaset = require('vshard.replicaset')
  local util = require('vshard.util')

+local MODULE_SKELETON = {
+    errinj = {
+        ERRINJ_FAILOVER_CHANGE_CFG = false,
+        ERRINJ_RELOAD = false,
+    },
+    -- Bucket map cache.
+    route_map = {},
+    -- All known replicasets used for bucket re-balancing
+    replicasets = nil,
+    -- Fiber to maintain replica connections.
+    failover_fiber = nil,
+    -- Fiber to discovery buckets in background.
+    discovery_fiber = nil,
+    -- Bucket count stored on all replicasets.
+    total_bucket_count = 0,
+    -- If true, then discovery fiber starts to call
+    -- collectgarbage() periodically.
+    collect_lua_garbage = nil,
+    -- This counter is used to restart background fibers with
+    -- new reloaded code.
+    module_version = 0,
+}
+
  local M = rawget(_G, '__module_vshard_router')
-if not M then
-    M = {
-        errinj = {
-            ERRINJ_FAILOVER_CHANGE_CFG = false,
-            ERRINJ_RELOAD = false,
-        },
-        -- Bucket map cache.
-        route_map = {},
-        -- All known replicasets used for bucket re-balancing
-        replicasets = nil,
-        -- Fiber to maintain replica connections.
-        failover_fiber = nil,
-        -- Fiber to discovery buckets in background.
-        discovery_fiber = nil,
-        -- Bucket count stored on all replicasets.
-        total_bucket_count = 0,
-        -- If true, then discovery fiber starts to call
-        -- collectgarbage() periodically.
-        collect_lua_garbage = nil,
-        -- This counter is used to restart background fibers with
-        -- new reloaded code.
-        module_version = 0,
-    }
+local update_M
+
+--
+-- Destroy router module.
+--
+local function destroy()
+    local MODULE_INTERNALS = '__module_vshard_router'
+    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
+    -- Cancel background fibers.
+    for _, fib_name in pairs({
+        'failover_fiber',
+        'discovery_fiber',
+    }) do
+        if M[fib_name] then
+            M[fib_name]:cancel()
+        end
+    end
+    util.override_table(M, MODULE_SKELETON)
+    update_M()
  end

  -- Set a replicaset by container of a bucket.
@@ -758,10 +777,6 @@ local function router_sync(timeout)
      end
  end

-if M.errinj.ERRINJ_RELOAD then
-    error('Error injection: reload')
-end
-
  --------------------------------------------------------------------------------
  -- Module definition
  --------------------------------------------------------------------------------
@@ -769,15 +784,28 @@ end
  -- About functions, saved in M, and reloading see comment in
  -- storage/init.lua.
  --
-M.discovery_f = discovery_f
-M.failover_f = failover_f
+
+--
+-- Store module-defined values in M.
+--
+update_M = function()
+    M.discovery_f = discovery_f
+    M.failover_f = failover_f
+end

  if not rawget(_G, '__module_vshard_router') then
+    M = table.deepcopy(MODULE_SKELETON)
+    update_M()
      rawset(_G, '__module_vshard_router', M)
  else
+    M = rawget(_G, '__module_vshard_router')
      M.module_version = M.module_version + 1
  end

+if M.errinj.ERRINJ_RELOAD then
+    error('Error injection: reload')
+end
+
  return {
      cfg = router_cfg;
      info = router_info;
@@ -792,6 +820,7 @@ return {
      sync = router_sync;
      bootstrap = cluster_bootstrap;
      bucket_discovery = bucket_discovery;
+    destroy = destroy,
      discovery_wakeup = discovery_wakeup;
      internal = M;
      module_version = function() return M.module_version end;
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index c80bfbf..0d3cc24 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -9,89 +9,88 @@ local lcfg = require('vshard.cfg')
  local lreplicaset = require('vshard.replicaset')
  local trigger = require('internal.trigger')

-local M = rawget(_G, '__module_vshard_storage')
-if not M then
+--
+-- The module is loaded for the first time.
+--
+local MODULE_SKELETON = {
+    ---------------- Common module attributes ----------------
      --
-    -- The module is loaded for the first time.
+    -- All known replicasets used for bucket re-balancing.
+    -- See format in replicaset.lua.
      --
-    M = {
-        ---------------- Common module attributes ----------------
-        --
-        -- All known replicasets used for bucket re-balancing.
-        -- See format in replicaset.lua.
-        --
-        replicasets = nil,
-        -- Triggers on master switch event. They are called right
-        -- before the event occurs.
-        _on_master_enable = trigger.new('_on_master_enable'),
-        _on_master_disable = trigger.new('_on_master_disable'),
-        -- Index which is a trigger to shard its space by numbers in
-        -- this index. It must have at first part either unsigned,
-        -- or integer or number type and be not nullable. Values in
-        -- this part are considered as bucket identifiers.
-        shard_index = nil,
-        -- Bucket count stored on all replicasets.
-        total_bucket_count = 0,
-        errinj = {
-            ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
-            ERRINJ_RELOAD = false,
-            ERRINJ_CFG_DELAY = false,
-            ERRINJ_LONG_RECEIVE = false,
-        },
-        -- This counter is used to restart background fibers with
-        -- new reloaded code.
-        module_version = 0,
-        --
-        -- Timeout to wait sync with slaves. Used on master
-        -- demotion or on a manual sync() call.
-        --
-        sync_timeout = consts.DEFAULT_SYNC_TIMEOUT,
-        -- References to a parent replicaset and self in it.
-        this_replicaset = nil,
-        this_replica = nil,
-
-        ------------------- Garbage collection -------------------
-        -- Fiber to remove garbage buckets data.
-        collect_bucket_garbage_fiber = nil,
-        -- Do buckets garbage collection once per this time.
-        collect_bucket_garbage_interval = nil,
-        -- If true, then bucket garbage collection fiber starts to
-        -- call collectgarbage() periodically.
-        collect_lua_garbage = nil,
-
-        -------------------- Bucket recovery ---------------------
-        -- Bucket identifiers which are not active and are not being
-        -- sent - their status is unknown. Their state must be checked
-        -- periodically in recovery fiber.
-        buckets_to_recovery = {},
-        buckets_to_recovery_count = 0,
-        recovery_fiber = nil,
-
-        ----------------------- Rebalancer -----------------------
-        -- Fiber to rebalance a cluster.
-        rebalancer_fiber = nil,
-        -- Fiber which applies routes one by one. Its presense and
-        -- active status means that the rebalancing is in progress
-        -- now on the current node.
-        rebalancer_applier_fiber = nil,
-        -- Internal flag to activate and deactivate rebalancer. Mostly
-        -- for tests.
-        is_rebalancer_active = true,
-        -- Maximal allowed percent deviation of bucket count on a
-        -- replicaset from etalon bucket count.
-        rebalancer_disbalance_threshold = 0,
-        -- Maximal bucket count that can be received by a single
-        -- replicaset simultaneously.
-        rebalancer_max_receiving = 0,
-        -- Identifier of a bucket that rebalancer is sending now,
-        -- or else 0. If a bucket has state SENDING, but its id is
-        -- not stored here, it means, that its sending was
-        -- interrupted, for example by restart of an instance, and
-        -- a destination replicaset must drop already received
-        -- data.
-        rebalancer_sending_bucket = 0,
-    }
-end
+    replicasets = nil,
+    -- Triggers on master switch event. They are called right
+    -- before the event occurs.
+    _on_master_enable = trigger.new('_on_master_enable'),
+    _on_master_disable = trigger.new('_on_master_disable'),
+    -- Index which is a trigger to shard its space by numbers in
+    -- this index. It must have at first part either unsigned,
+    -- or integer or number type and be not nullable. Values in
+    -- this part are considered as bucket identifiers.
+    shard_index = nil,
+    -- Bucket count stored on all replicasets.
+    total_bucket_count = 0,
+    errinj = {
+        ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
+        ERRINJ_RELOAD = false,
+        ERRINJ_CFG_DELAY = false,
+        ERRINJ_LONG_RECEIVE = false,
+    },
+    -- This counter is used to restart background fibers with
+    -- new reloaded code.
+    module_version = 0,
+    --
+    -- Timeout to wait sync with slaves. Used on master
+    -- demotion or on a manual sync() call.
+    --
+    sync_timeout = consts.DEFAULT_SYNC_TIMEOUT,
+    -- References to a parent replicaset and self in it.
+    this_replicaset = nil,
+    this_replica = nil,
+
+    ------------------- Garbage collection -------------------
+    -- Fiber to remove garbage buckets data.
+    collect_bucket_garbage_fiber = nil,
+    -- Do buckets garbage collection once per this time.
+    collect_bucket_garbage_interval = nil,
+    -- If true, then bucket garbage collection fiber starts to
+    -- call collectgarbage() periodically.
+    collect_lua_garbage = nil,
+
+    -------------------- Bucket recovery ---------------------
+    -- Bucket identifiers which are not active and are not being
+    -- sent - their status is unknown. Their state must be checked
+    -- periodically in recovery fiber.
+    buckets_to_recovery = {},
+    buckets_to_recovery_count = 0,
+    recovery_fiber = nil,
+
+    ----------------------- Rebalancer -----------------------
+    -- Fiber to rebalance a cluster.
+    rebalancer_fiber = nil,
+    -- Fiber which applies routes one by one. Its presense and
+    -- active status means that the rebalancing is in progress
+    -- now on the current node.
+    rebalancer_applier_fiber = nil,
+    -- Internal flag to activate and deactivate rebalancer. Mostly
+    -- for tests.
+    is_rebalancer_active = true,
+    -- Maximal allowed percent deviation of bucket count on a
+    -- replicaset from etalon bucket count.
+    rebalancer_disbalance_threshold = 0,
+    -- Maximal bucket count that can be received by a single
+    -- replicaset simultaneously.
+    rebalancer_max_receiving = 0,
+    -- Identifier of a bucket that rebalancer is sending now,
+    -- or else 0. If a bucket has state SENDING, but its id is
+    -- not stored here, it means, that its sending was
+    -- interrupted, for example by restart of an instance, and
+    -- a destination replicaset must drop already received
+    -- data.
+    rebalancer_sending_bucket = 0,
+}
+local M
+local update_M

  --
  -- Check if this replicaset is locked. It means be invisible for
@@ -138,6 +137,22 @@ end
  --------------------------------------------------------------------------------
  -- Schema
  --------------------------------------------------------------------------------
+local storage_api = {
+    'vshard.storage.sync',
+    'vshard.storage.call',
+    'vshard.storage.bucket_force_create',
+    'vshard.storage.bucket_force_drop',
+    'vshard.storage.bucket_collect',
+    'vshard.storage.bucket_send',
+    'vshard.storage.bucket_recv',
+    'vshard.storage.bucket_stat',
+    'vshard.storage.buckets_count',
+    'vshard.storage.buckets_info',
+    'vshard.storage.buckets_discovery',
+    'vshard.storage.rebalancer_request_state',
+    'vshard.storage.rebalancer_apply_routes',
+}
+
  local function storage_schema_v1(username, password)
      log.info("Initializing schema")
      box.schema.user.create(username, {
@@ -157,22 +172,6 @@ local function storage_schema_v1(username, password)
      bucket:create_index('pk', {parts = {'id'}})
      bucket:create_index('status', {parts = {'status'}, unique = false})

-    local storage_api = {
-        'vshard.storage.sync',
-        'vshard.storage.call',
-        'vshard.storage.bucket_force_create',
-        'vshard.storage.bucket_force_drop',
-        'vshard.storage.bucket_collect',
-        'vshard.storage.bucket_send',
-        'vshard.storage.bucket_recv',
-        'vshard.storage.bucket_stat',
-        'vshard.storage.buckets_count',
-        'vshard.storage.buckets_info',
-        'vshard.storage.buckets_discovery',
-        'vshard.storage.rebalancer_request_state',
-        'vshard.storage.rebalancer_apply_routes',
-    }
-
      for _, name in ipairs(storage_api) do
          box.schema.func.create(name, {setuid = true})
          box.schema.user.grant(username, 'execute', 'function', name)
@@ -203,6 +202,40 @@ local function on_master_enable(...)
      end
  end

+--------------------------------------------------------------------------------
+-- Destroy
+--------------------------------------------------------------------------------
+
+--
+-- Destroy storage module.
+--
+local function destroy()
+    local MODULE_INTERNALS = '__module_vshard_storage'
+    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
+    box.space._bucket:drop()
+    for _, name in ipairs(storage_api) do
+        box.schema.func.drop(name)
+    end
+    -- Cancel background fibers.
+    for _, fib_name in pairs({
+        'recovery_fiber',
+        'collect_bucket_garbage_fiber',
+        'rebalancer_applier_fiber',
+        'rebalancer_fiber',
+    }) do
+        if M[fib_name] then
+            M[fib_name]:cancel()
+        end
+    end
+    local box_cfg = table.deepcopy(box.cfg)
+    box_cfg.replication = {}
+    box_cfg.read_only = false
+    box.cfg(box_cfg)
+    box.space._schema:delete{'oncevshard:storage:1'}
+    util.override_table(M, MODULE_SKELETON)
+    update_M()
+end
+
  --------------------------------------------------------------------------------
  -- Recovery
  --------------------------------------------------------------------------------
@@ -594,10 +627,6 @@ local function bucket_collect_internal(bucket_id)
      return data
  end

-if M.errinj.ERRINJ_RELOAD then
-    error('Error injection: reload')
-end
-
  --
  -- Collect content of ACTIVE bucket.
  --
@@ -1808,27 +1837,37 @@ end
  -- restarted (or is restarted from M.background_f, which is not
  -- changed) and continues use old func1 and func2.
  --
-M.recovery_f = recovery_f
-M.collect_garbage_f = collect_garbage_f
-M.rebalancer_f = rebalancer_f

  --
--- These functions are saved in M not for atomic reload, but for
--- unit testing.
+-- Store module-defined values in M.
  --
-M.find_garbage_bucket = find_garbage_bucket
-M.collect_garbage_step = collect_garbage_step
-M.collect_garbage_f = collect_garbage_f
-M.rebalancer_build_routes = rebalancer_build_routes
-M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
-M.cached_find_sharded_spaces = find_sharded_spaces
+update_M = function()
+    M.recovery_f = recovery_f
+    M.collect_garbage_f = collect_garbage_f
+    M.rebalancer_f = rebalancer_f
+    -- These functions are saved in M not for atomic reload, but for
+    -- unit testing.
+    M.find_garbage_bucket = find_garbage_bucket
+    M.collect_garbage_step = collect_garbage_step
+    M.collect_garbage_f = collect_garbage_f
+    M.rebalancer_build_routes = rebalancer_build_routes
+    M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
+    M.cached_find_sharded_spaces = find_sharded_spaces
+end

  if not rawget(_G, '__module_vshard_storage') then
+    M = table.deepcopy(MODULE_SKELETON)
+    update_M()
      rawset(_G, '__module_vshard_storage', M)
  else
+    M = rawget(_G, '__module_vshard_storage')
      M.module_version = M.module_version + 1
  end

+if M.errinj.ERRINJ_RELOAD then
+    error('Error injection: reload')
+end
+
  return {
      sync = sync,
      bucket_force_create = bucket_force_create,
@@ -1840,6 +1879,7 @@ return {
      bucket_pin = bucket_pin,
      bucket_unpin = bucket_unpin,
      bucket_delete_garbage = bucket_delete_garbage,
+    destroy = destroy,
      garbage_collector_wakeup = garbage_collector_wakeup,
      rebalancer_wakeup = rebalancer_wakeup,
      rebalancer_apply_routes = rebalancer_apply_routes,
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..d57cf44 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -75,7 +75,22 @@ local function generate_self_checker(obj_name, func_name, mt, func)
      end
  end

+--
+-- Replace internals of a table with a template.
+-- @param tbl Table to be updated.
+-- @param template Internals of the `tbl` after the call.
+--
+local function override_table(tbl, template)
+    for k, _ in pairs(tbl) do
+        tbl[k] = nil
+    end
+    for k, v in pairs(template) do
+        tbl[k] = table.deepcopy(v)
+    end
+end
+
  return {
+    override_table = override_table,
      tuple_extract_key = tuple_extract_key,
      reloadable_fiber_f = reloadable_fiber_f,
      generate_self_checker = generate_self_checker,



