[tarantool-patches] [PATCH 2/3] Complete module reload

AKhatskevich avkhatskevich at tarantool.org
Wed Jul 18 20:47:35 MSK 2018


In case one need to upgrade vshard to a new version, this commit
improves reload mechanism to allow to do that for a wider variety of
possible changes (between two versions).

Changes:
 * introduce cfg option `connection_outdate_delay`
 * improve reload mechanism
 * add `util.async_task` method, which runs a function after a
   delay
 * delete replicaset:rebind_connections method as it is replaced
   with `rebind_replicasets` which updates all replicasets at once

Reload mechanism:
 * reload all vshard modules
 * create new `replicaset` and `replica` objects
 * reuse old netbox connections in new replica objects if
   possible
 * update router/storage.internal table
 * after a `connection_outdate_delay` disable old instances of
   `replicaset` and `replica` objects

Reload works for modules:
 * vshard.router
 * vshard.storage

Here is a module reload algorithm:
 * old vshard is working
 * delete old vshard src
 * install new vshard
 * call: package.loaded['vshard.router'] = nil
 * call: old_router = vshard.router -- Save working router copy.
 * call: vshard.router = require('vshard.router')
 * if require fails: continue using old_router
 * if require succeeds: use vshard.router

In case reload process fails, old router/storage module, replicaset and
replica objects continue working properly. If reload succeeds, all old
objects would be deprecated.

Extra changes:
 * introduce MODULE_INTERNALS which stores name of the module
   internal data in the global namespace

Part of <shut git>112
---
 test/router/reload.result    | 159 +++++++++++++++++++++++++++++++++++++++++++
 test/router/reload.test.lua  |  48 +++++++++++++
 test/router/router.result    |   3 +-
 test/storage/reload.result   |  68 ++++++++++++++++++
 test/storage/reload.test.lua |  23 +++++++
 vshard/cfg.lua               |   6 ++
 vshard/error.lua             |   5 ++
 vshard/replicaset.lua        | 100 ++++++++++++++++++++-------
 vshard/router/init.lua       |  44 ++++++++----
 vshard/storage/init.lua      |  45 ++++++++----
 vshard/util.lua              |  20 ++++++
 11 files changed, 466 insertions(+), 55 deletions(-)

diff --git a/test/router/reload.result b/test/router/reload.result
index 47f3c2e..3fbbe6e 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -174,6 +174,165 @@ vshard.router.module_version()
 check_reloaded()
 ---
 ...
+--
+-- Outdate old replicaset and replica objets.
+--
+rs = vshard.router.route(1)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_ = require('vshard.router')
+---
+...
+-- Make sure outdate async task has had cpu time.
+fiber.sleep(0.005)
+---
+...
+rs.callro(rs, 'echo', {'some_data'})
+---
+- null
+- type: ShardingError
+  name: OBJECT_IS_OUTDATED
+  message: Object is outdated after module reload/reconfigure. Use new instance.
+  code: 20
+...
+vshard.router = require('vshard.router')
+---
+...
+rs = vshard.router.route(1)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Test `connection_outdate_delay`.
+old_connection_delay = cfg.connection_outdate_delay
+---
+...
+cfg.connection_outdate_delay = 0.3
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+cfg.connection_outdate_delay = old_connection_delay
+---
+...
+vshard.router.internal.connection_outdate_delay = nil
+---
+...
+vshard.router = require('vshard.router')
+---
+...
+rs_new = vshard.router.route(1)
+---
+...
+rs_old = rs
+---
+...
+_, replica_old = next(rs_old.replicas)
+---
+...
+rs_new:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Check old objets are still valid.
+rs_old:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+replica_old.conn ~= nil
+---
+- true
+...
+fiber.sleep(0.2)
+---
+...
+rs_old:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+replica_old.conn ~= nil
+---
+- true
+...
+replica_old.outdated == nil
+---
+- true
+...
+fiber.sleep(0.2)
+---
+...
+rs_old:callro('echo', {'some_data'})
+---
+- null
+- type: ShardingError
+  name: OBJECT_IS_OUTDATED
+  message: Object is outdated after module reload/reconfigure. Use new instance.
+  code: 20
+...
+replica_old.conn == nil
+---
+- true
+...
+replica_old.outdated == true
+---
+- true
+...
+rs_new:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
+vshard.router.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.router.internal)
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_, err = pcall(require, 'vshard.router')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.router.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.router.internal)
+---
+- true
+...
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
 test_run:switch('default')
 ---
 - true
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index af2939d..8d2b2cf 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -86,6 +86,54 @@ _ = require('vshard.router')
 vshard.router.module_version()
 check_reloaded()
 
+--
+-- Outdate old replicaset and replica objets.
+--
+rs = vshard.router.route(1)
+rs:callro('echo', {'some_data'})
+package.loaded["vshard.router"] = nil
+_ = require('vshard.router')
+-- Make sure outdate async task has had cpu time.
+fiber.sleep(0.005)
+rs.callro(rs, 'echo', {'some_data'})
+vshard.router = require('vshard.router')
+rs = vshard.router.route(1)
+rs:callro('echo', {'some_data'})
+-- Test `connection_outdate_delay`.
+old_connection_delay = cfg.connection_outdate_delay
+cfg.connection_outdate_delay = 0.3
+vshard.router.cfg(cfg)
+cfg.connection_outdate_delay = old_connection_delay
+vshard.router.internal.connection_outdate_delay = nil
+vshard.router = require('vshard.router')
+rs_new = vshard.router.route(1)
+rs_old = rs
+_, replica_old = next(rs_old.replicas)
+rs_new:callro('echo', {'some_data'})
+-- Check old objets are still valid.
+rs_old:callro('echo', {'some_data'})
+replica_old.conn ~= nil
+fiber.sleep(0.2)
+rs_old:callro('echo', {'some_data'})
+replica_old.conn ~= nil
+replica_old.outdated == nil
+fiber.sleep(0.2)
+rs_old:callro('echo', {'some_data'})
+replica_old.conn == nil
+replica_old.outdated == true
+rs_new:callro('echo', {'some_data'})
+
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+vshard.router.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.router.internal)
+package.loaded["vshard.router"] = nil
+_, err = pcall(require, 'vshard.router')
+err:match('Error injection:.*')
+vshard.router.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.router.internal)
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+
 test_run:switch('default')
 test_run:cmd('stop server router_1')
 test_run:cmd('cleanup server router_1')
diff --git a/test/router/router.result b/test/router/router.result
index 7ec3e15..ab939ce 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1024,11 +1024,10 @@ error_messages
 - - Use replicaset:callro(...) instead of replicaset.callro(...)
   - Use replicaset:connect_master(...) instead of replicaset.connect_master(...)
   - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
-  - Use replicaset:rebind_connections(...) instead of replicaset.rebind_connections(...)
   - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)
   - Use replicaset:call(...) instead of replicaset.call(...)
-  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
   - Use replicaset:connect(...) instead of replicaset.connect(...)
+  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
   - Use replicaset:callrw(...) instead of replicaset.callrw(...)
   - Use replicaset:connect_all(...) instead of replicaset.connect_all(...)
 ...
diff --git a/test/storage/reload.result b/test/storage/reload.result
index 531d984..0281e27 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -174,6 +174,74 @@ vshard.storage.module_version()
 check_reloaded()
 ---
 ...
+--
+-- Outdate old replicaset and replica objets.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+rs.callro(rs, 'echo', {'some_data'})
+---
+- null
+- type: ShardingError
+  name: OBJECT_IS_OUTDATED
+  message: Object is outdated after module reload/reconfigure. Use new instance.
+  code: 20
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs.callro(rs, 'echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_, err = pcall(require, 'vshard.storage')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.storage.internal)
+---
+- true
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
 test_run:switch('default')
 ---
 - true
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 64c3a60..b51a364 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -87,6 +87,29 @@ _ = require('vshard.storage')
 vshard.storage.module_version()
 check_reloaded()
 
+--
+-- Outdate old replicaset and replica objets.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+rs.callro(rs, 'echo', {'some_data'})
+_, rs = next(vshard.storage.internal.replicasets)
+rs.callro(rs, 'echo', {'some_data'})
+
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+rs:callro('echo', {'some_data'})
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_, err = pcall(require, 'vshard.storage')
+err:match('Error injection:.*')
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.storage.internal)
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+
 test_run:switch('default')
 test_run:drop_cluster(REPLICASET_2)
 test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index d5429af..87d0fc8 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -217,6 +217,10 @@ local cfg_template = {
         type = 'non-negative number', name = 'Sync timeout', is_optional = true,
         default = consts.DEFAULT_SYNC_TIMEOUT
     }},
+    {'connection_outdate_delay', {
+        type = 'non-negative number', name = 'Object outdate timeout',
+        is_optional = true, default = nil
+    }},
 }
 
 --
@@ -264,6 +268,8 @@ local function remove_non_box_options(cfg)
     cfg.collect_bucket_garbage_interval = nil
     cfg.collect_lua_garbage = nil
     cfg.sync_timeout = nil
+    cfg.sync_timeout = nil
+    cfg.connection_outdate_delay = nil
 end
 
 return {
diff --git a/vshard/error.lua b/vshard/error.lua
index cf2f9d2..f79107b 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -100,6 +100,11 @@ local error_message_template = {
     [19] = {
         name = 'REPLICASET_IS_LOCKED',
         msg = 'Replicaset is locked'
+    },
+    [20] = {
+        name = 'OBJECT_IS_OUTDATED',
+        msg = 'Object is outdated after module reload/reconfigure. ' ..
+              'Use new instance.'
     }
 }
 
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 99f59aa..ec6e95b 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -48,7 +48,8 @@ local lerror = require('vshard.error')
 local fiber = require('fiber')
 local luri = require('uri')
 local ffi = require('ffi')
-local gsc = require('vshard.util').generate_self_checker
+local util = require('vshard.util')
+local gsc = util.generate_self_checker
 
 --
 -- on_connect() trigger for net.box
@@ -337,27 +338,39 @@ local function replicaset_tostring(replicaset)
                          master)
 end
 
+local outdate_replicasets
 --
--- Rebind connections of old replicas to new ones.
+-- Copy netbox conections from old replica objects to new ones
+-- and outdate old objects.
+-- @param replicasets New replicasets
+-- @param old_replicasets Replicasets and replicas to be outdated.
+-- @param outdate_delay Number of seconds; delay to outdate
+--        old objects.
 --
-local function replicaset_rebind_connections(replicaset)
-    for _, replica in pairs(replicaset.replicas) do
-        local old_replica = replica.old_replica
-        if old_replica then
-            local conn = old_replica.conn
-            replica.conn = conn
-            replica.down_ts = old_replica.down_ts
-            replica.net_timeout = old_replica.net_timeout
-            replica.net_sequential_ok = old_replica.net_sequential_ok
-            replica.net_sequential_fail = old_replica.net_sequential_fail
-            if conn then
-                conn.replica = replica
-                conn.replicaset = replicaset
-                old_replica.conn = nil
+local function rebind_replicasets(replicasets, old_replicasets, outdate_delay)
+    for replicaset_uuid, replicaset in pairs(replicasets) do
+        local old_replicaset = old_replicasets and
+                               old_replicasets[replicaset_uuid]
+        for replica_uuid, replica in pairs(replicaset.replicas) do
+            local old_replica = old_replicaset and
+                                old_replicaset.replicas[replica_uuid]
+            if old_replica then
+                local conn = old_replica.conn
+                replica.conn = conn
+                replica.down_ts = old_replica.down_ts
+                replica.net_timeout = old_replica.net_timeout
+                replica.net_sequential_ok = old_replica.net_sequential_ok
+                replica.net_sequential_fail = old_replica.net_sequential_fail
+                if conn then
+                    conn.replica = replica
+                    conn.replicaset = replicaset
+                end
             end
-            replica.old_replica = nil
         end
     end
+    if old_replicasets then
+        util.async_task(outdate_delay, outdate_replicasets, old_replicasets)
+    end
 end
 
 --
@@ -369,7 +382,6 @@ local replicaset_mt = {
         connect_master = replicaset_connect_master;
         connect_all = replicaset_connect_all;
         connect_replica = replicaset_connect_to_replica;
-        rebind_connections = replicaset_rebind_connections;
         down_replica_priority = replicaset_down_replica_priority;
         up_replica_priority = replicaset_up_replica_priority;
         call = replicaset_master_call;
@@ -412,6 +424,49 @@ for name, func in pairs(replica_mt.__index) do
 end
 replica_mt.__index = index
 
+--
+-- Meta-methods of outdated objects
+-- They define only arrtibutes from corresponding metatables to
+-- make user able to access fields of old objects.
+--
+local function outdated_warning(...)
+    return nil, lerror.vshard(lerror.code.OBJECT_IS_OUTDATED)
+end
+
+local outdated_replicaset_mt = {
+    __index = {
+        outdated = true
+    }
+}
+for fname, func in pairs(replicaset_mt.__index) do
+    outdated_replicaset_mt.__index[fname] = outdated_warning
+end
+
+local outdated_replica_mt = {
+    __index = {
+        outdated = true
+    }
+}
+for fname, func in pairs(replica_mt.__index) do
+    outdated_replica_mt.__index[fname] = outdated_warning
+end
+
+--
+-- Outdate replicaset and replica objects:
+--  * Set outdated_metatables.
+--  * Remove connections.
+--
+outdate_replicasets = function(replicasets)
+    for _, replicaset in pairs(replicasets) do
+        setmetatable(replicaset, outdated_replicaset_mt)
+        for _, replica in pairs(replicaset.replicas) do
+            setmetatable(replica, outdated_replica_mt)
+            replica.conn = nil
+        end
+    end
+    log.info('Old replicaset and replica objects are outdated.')
+end
+
 --
 -- Calculate for each replicaset its etalon bucket count.
 -- Iterative algorithm is used to learn the best balance in a
@@ -503,7 +558,7 @@ end
 --
 -- Update/build replicasets from configuration
 --
-local function buildall(sharding_cfg, old_replicasets)
+local function buildall(sharding_cfg)
     local new_replicasets = {}
     local weights = sharding_cfg.weights
     local zone = sharding_cfg.zone
@@ -515,8 +570,6 @@ local function buildall(sharding_cfg, old_replicasets)
     end
     local curr_ts = fiber.time()
     for replicaset_uuid, replicaset in pairs(sharding_cfg.sharding) do
-        local old_replicaset = old_replicasets and
-                               old_replicasets[replicaset_uuid]
         local new_replicaset = setmetatable({
             replicas = {},
             uuid = replicaset_uuid,
@@ -526,8 +579,6 @@ local function buildall(sharding_cfg, old_replicasets)
         }, replicaset_mt)
         local priority_list = {}
         for replica_uuid, replica in pairs(replicaset.replicas) do
-            local old_replica = old_replicaset and
-                                old_replicaset.replicas[replica_uuid]
             -- The old replica is saved in the new object to
             -- rebind its connection at the end of a
             -- router/storage reconfiguration.
@@ -535,7 +586,7 @@ local function buildall(sharding_cfg, old_replicasets)
                 uri = replica.uri, name = replica.name, uuid = replica_uuid,
                 zone = replica.zone, net_timeout = consts.CALL_TIMEOUT_MIN,
                 net_sequential_ok = 0, net_sequential_fail = 0,
-                down_ts = curr_ts, old_replica = old_replica,
+                down_ts = curr_ts,
             }, replica_mt)
             new_replicaset.replicas[replica_uuid] = new_replica
             if replica.master then
@@ -596,4 +647,5 @@ return {
     buildall = buildall,
     calculate_etalon_balance = cluster_calculate_etalon_balance,
     wait_masters_connect = wait_masters_connect,
+    rebind_replicasets = rebind_replicasets,
 }
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index a143070..a8f6bbc 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -1,5 +1,17 @@
 local log = require('log')
 local lfiber = require('fiber')
+
+local MODULE_INTERNALS = '__module_vshard_router'
+-- Reload requirements, in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+    local vshard_modules = {
+        'vshard.consts', 'vshard.error', 'vshard.cfg',
+        'vshard.hash', 'vshard.replicaset', 'vshard.util',
+    }
+    for _, module in pairs(vshard_modules) do
+        package.loaded[module] = nil
+    end
+end
 local consts = require('vshard.consts')
 local lerror = require('vshard.error')
 local lcfg = require('vshard.cfg')
@@ -7,7 +19,7 @@ local lhash = require('vshard.hash')
 local lreplicaset = require('vshard.replicaset')
 local util = require('vshard.util')
 
-local M = rawget(_G, '__module_vshard_router')
+local M = rawget(_G, MODULE_INTERNALS)
 if not M then
     M = {
         errinj = {
@@ -16,6 +28,8 @@ if not M then
             ERRINJ_RELOAD = false,
             ERRINJ_LONG_DISCOVERY = false,
         },
+        -- Time to outdate old objects on reload.
+        connection_outdate_delay = nil,
         -- Bucket map cache.
         route_map = {},
         -- All known replicasets used for bucket re-balancing
@@ -479,12 +493,13 @@ local function router_cfg(cfg)
     else
         log.info('Starting router reconfiguration')
     end
-    local new_replicasets = lreplicaset.buildall(cfg, M.replicasets)
+    local new_replicasets = lreplicaset.buildall(cfg)
     local total_bucket_count = cfg.bucket_count
     local collect_lua_garbage = cfg.collect_lua_garbage
-    lcfg.remove_non_box_options(cfg)
+    local box_cfg = table.deepcopy(cfg)
+    lcfg.remove_non_box_options(box_cfg)
     log.info("Calling box.cfg()...")
-    for k, v in pairs(cfg) do
+    for k, v in pairs(box_cfg) do
         log.info({[k] = v})
     end
     -- It is considered that all possible errors during cfg
@@ -493,18 +508,18 @@ local function router_cfg(cfg)
     if M.errinj.ERRINJ_CFG then
         error('Error injection: cfg')
     end
-    box.cfg(cfg)
+    box.cfg(box_cfg)
     log.info("Box has been configured")
+    M.connection_outdate_delay = cfg.connection_outdate_delay
     M.total_bucket_count = total_bucket_count
     M.collect_lua_garbage = collect_lua_garbage
-    M.replicasets = new_replicasets
     M.current_cfg = new_cfg
     -- Move connections from an old configuration to a new one.
     -- It must be done with no yields to prevent usage both of not
     -- fully moved old replicasets, and not fully built new ones.
-    for _, replicaset in pairs(new_replicasets) do
-        replicaset:rebind_connections()
-    end
+    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets,
+                                   M.connection_outdate_delay)
+    M.replicasets = new_replicasets
     -- Now the new replicasets are fully built. Can establish
     -- connections and yield.
     for _, replicaset in pairs(new_replicasets) do
@@ -793,15 +808,16 @@ end
 -- About functions, saved in M, and reloading see comment in
 -- storage/init.lua.
 --
-M.discovery_f = discovery_f
-M.failover_f = failover_f
-
-if not rawget(_G, '__module_vshard_router') then
-    rawset(_G, '__module_vshard_router', M)
+if not rawget(_G, MODULE_INTERNALS) then
+    rawset(_G, MODULE_INTERNALS, M)
 else
+    router_cfg(M.current_cfg)
     M.module_version = M.module_version + 1
 end
 
+M.discovery_f = discovery_f
+M.failover_f = failover_f
+
 return {
     cfg = router_cfg;
     info = router_info;
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 052e94f..bf560e6 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -2,20 +2,34 @@ local log = require('log')
 local luri = require('uri')
 local lfiber = require('fiber')
 local netbox = require('net.box') -- for net.box:self()
+local trigger = require('internal.trigger')
+
+local MODULE_INTERNALS = '__module_vshard_storage'
+-- Reload requirements, in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+    local vshard_modules = {
+        'vshard.consts', 'vshard.error', 'vshard.cfg',
+        'vshard.replicaset', 'vshard.util',
+    }
+    for _, module in pairs(vshard_modules) do
+        package.loaded[module] = nil
+    end
+end
 local consts = require('vshard.consts')
 local lerror = require('vshard.error')
-local util = require('vshard.util')
 local lcfg = require('vshard.cfg')
 local lreplicaset = require('vshard.replicaset')
-local trigger = require('internal.trigger')
+local util = require('vshard.util')
 
-local M = rawget(_G, '__module_vshard_storage')
+local M = rawget(_G, MODULE_INTERNALS)
 if not M then
     --
     -- The module is loaded for the first time.
     --
     M = {
         ---------------- Common module attributes ----------------
+        -- The last passed configuration.
+        current_cfg = nil,
         --
         -- All known replicasets used for bucket re-balancing.
         -- See format in replicaset.lua.
@@ -1497,7 +1511,7 @@ local function storage_cfg(cfg, this_replica_uuid)
 
     local this_replicaset
     local this_replica
-    local new_replicasets = lreplicaset.buildall(cfg, M.replicasets)
+    local new_replicasets = lreplicaset.buildall(cfg)
     local min_master
     for rs_uuid, rs in pairs(new_replicasets) do
         for replica_uuid, replica in pairs(rs.replicas) do
@@ -1576,7 +1590,6 @@ local function storage_cfg(cfg, this_replica_uuid)
     --
     local old_sync_timeout = M.sync_timeout
     M.sync_timeout = cfg.sync_timeout
-    lcfg.remove_non_box_options(cfg)
 
     if was_master and not is_master then
         local_on_master_disable_prepare()
@@ -1585,7 +1598,9 @@ local function storage_cfg(cfg, this_replica_uuid)
         local_on_master_enable_prepare()
     end
 
-    local ok, err = pcall(box.cfg, cfg)
+    local box_cfg = table.deepcopy(cfg)
+    lcfg.remove_non_box_options(box_cfg)
+    local ok, err = pcall(box.cfg, box_cfg)
     while M.errinj.ERRINJ_CFG_DELAY do
         lfiber.sleep(0.01)
     end
@@ -1604,10 +1619,8 @@ local function storage_cfg(cfg, this_replica_uuid)
     local uri = luri.parse(this_replica.uri)
     box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
 
+    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
     M.replicasets = new_replicasets
-    for _, replicaset in pairs(new_replicasets) do
-        replicaset:rebind_connections()
-    end
     M.this_replicaset = this_replicaset
     M.this_replica = this_replica
     M.total_bucket_count = total_bucket_count
@@ -1846,6 +1859,14 @@ end
 -- restarted (or is restarted from M.background_f, which is not
 -- changed) and continues use old func1 and func2.
 --
+
+if not rawget(_G, MODULE_INTERNALS) then
+    rawset(_G, MODULE_INTERNALS, M)
+else
+    storage_cfg(M.current_cfg, M.this_replica.uuid)
+    M.module_version = M.module_version + 1
+end
+
 M.recovery_f = recovery_f
 M.collect_garbage_f = collect_garbage_f
 M.rebalancer_f = rebalancer_f
@@ -1861,12 +1882,6 @@ M.rebalancer_build_routes = rebalancer_build_routes
 M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
 M.cached_find_sharded_spaces = find_sharded_spaces
 
-if not rawget(_G, '__module_vshard_storage') then
-    rawset(_G, '__module_vshard_storage', M)
-else
-    M.module_version = M.module_version + 1
-end
-
 return {
     sync = sync,
     bucket_force_create = bucket_force_create,
diff --git a/vshard/util.lua b/vshard/util.lua
index ce79930..fb875ce 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -100,9 +100,29 @@ end
 -- Update latest versions of function
 M.reloadable_fiber_f = reloadable_fiber_f
 
+local function sync_task(delay, task, ...)
+    if delay then
+        fiber.sleep(delay)
+    end
+    task(...)
+end
+
+--
+-- Run a function without interrupting current fiber.
+-- @param delay Delay in seconds before the task should be
+--        executed.
+-- @param task Function to be executed.
+-- @param ... Arguments which would be passed to the `task`.
+--
+local function async_task(delay, task, ...)
+    assert(delay == nil or type(delay) == 'number')
+    fiber.create(sync_task, delay, task, ...)
+end
+
 return {
     tuple_extract_key = tuple_extract_key,
     reloadable_fiber_f = reloadable_fiber_f,
     generate_self_checker = generate_self_checker,
+    async_task = async_task,
     internal = M,
 }
-- 
2.14.1





More information about the Tarantool-patches mailing list