From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 02D09279A4 for ; Fri, 20 Jul 2018 07:34:09 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id RgMjzJrpS6t4 for ; Fri, 20 Jul 2018 07:34:08 -0400 (EDT) Received: from smtp47.i.mail.ru (smtp47.i.mail.ru [94.100.177.107]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 8B3322791C for ; Fri, 20 Jul 2018 07:34:08 -0400 (EDT) Subject: [tarantool-patches] Re: [PATCH 2/3] Complete module reload From: Alex Khatskevich References: <80d76a63-7435-4148-e0ce-37ed2daee19b@tarantool.org> <7c9af9eb-7fe9-e07c-2589-56fc192e6614@tarantool.org> Message-ID: <8a41845e-5662-b908-b58a-41a7073710f9@tarantool.org> Date: Fri, 20 Jul 2018 14:34:01 +0300 MIME-Version: 1.0 In-Reply-To: <7c9af9eb-7fe9-e07c-2589-56fc192e6614@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-US Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: Vladislav Shpilevoy , tarantool-patches@freelists.org here is a full diff: commit cd3740a45322458cde10e67f8018bc4787f443aa Author: AKhatskevich Date:   Sat Jun 9 17:23:40 2018 +0300     Complete module reload     In case one need to upgrade vshard to a new version, this commit     improves reload mechanism to allow to do that for a wider variety of     possible changes (between two versions).     Changes:      * introduce cfg option `connection_outdate_delay`      * improve reload mechanism      * add `util.async_task` method, which runs a function after a        delay      * delete replicaset:rebind_connections method as it is replaced        with `rebind_replicasets` which updates all replicasets at once     Reload mechanism:      * reload all vshard modules      * create new `replicaset` and `replica` objects      * reuse old netbox connections in new replica objects if        possible      * update router/storage.internal table      * after a `connection_outdate_delay` disable old instances of        `replicaset` and `replica` objects     Reload works for modules:      * vshard.router      * vshard.storage     Here is a module reload algorithm:      * old vshard is working      * delete old vshard src      * install new vshard      * call: package.loaded['vshard.router'] = nil      * call: old_router = vshard.router -- Save working router copy.      * call: vshard.router = require('vshard.router')      * if require fails: continue using old_router      * if require succeeds: use vshard.router     In case reload process fails, old router/storage module, replicaset and     replica objects continue working properly. If reload succeeds, all old     objects would be deprecated.     Extra changes:      * introduce MODULE_INTERNALS which stores name of the module        internal data in the global namespace     Part of #112 diff --git a/test/router/reload.result b/test/router/reload.result index 47f3c2e..71f82b2 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -174,6 +174,132 @@ vshard.router.module_version()  check_reloaded()  ---  ... +-- +-- Outdate old replicaset and replica objets. +-- +rs = vshard.router.route(1) +--- +... +rs:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +package.loaded["vshard.router"] = nil +--- +... +_ = require('vshard.router') +--- +... +-- Make sure outdate async task has had cpu time. +while not rs.outdated do fiber.sleep(0.001) end +--- +... +rs.callro(rs, 'echo', {'some_data'}) +--- +- null +- type: ShardingError +  name: OBJECT_IS_OUTDATED +  message: Object is outdated after module reload/reconfigure. Use new instance. +  code: 20 +... +vshard.router = require('vshard.router') +--- +... +rs = vshard.router.route(1) +--- +... +rs:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +-- Test `connection_outdate_delay`. +old_connection_delay = cfg.connection_outdate_delay +--- +... +cfg.connection_outdate_delay = 0.3 +--- +... +vshard.router.cfg(cfg) +--- +... +cfg.connection_outdate_delay = old_connection_delay +--- +... +vshard.router.internal.connection_outdate_delay = nil +--- +... +rs_new = vshard.router.route(1) +--- +... +rs_old = rs +--- +... +_, replica_old = next(rs_old.replicas) +--- +... +rs_new:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +-- Check old objets are still valid. +rs_old:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +replica_old.conn ~= nil +--- +- true +... +fiber.sleep(0.2) +--- +... +rs_old:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +replica_old.conn ~= nil +--- +- true +... +replica_old.outdated == nil +--- +- true +... +fiber.sleep(0.2) +--- +... +rs_old:callro('echo', {'some_data'}) +--- +- null +- type: ShardingError +  name: OBJECT_IS_OUTDATED +  message: Object is outdated after module reload/reconfigure. Use new instance. +  code: 20 +... +replica_old.conn == nil +--- +- true +... +replica_old.outdated == true +--- +- true +... +rs_new:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +...  test_run:switch('default')  ---  - true diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index af2939d..d77ebe7 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -86,6 +86,42 @@ _ = require('vshard.router')  vshard.router.module_version()  check_reloaded() +-- +-- Outdate old replicaset and replica objets. +-- +rs = vshard.router.route(1) +rs:callro('echo', {'some_data'}) +package.loaded["vshard.router"] = nil +_ = require('vshard.router') +-- Make sure outdate async task has had cpu time. +while not rs.outdated do fiber.sleep(0.001) end +rs.callro(rs, 'echo', {'some_data'}) +vshard.router = require('vshard.router') +rs = vshard.router.route(1) +rs:callro('echo', {'some_data'}) +-- Test `connection_outdate_delay`. +old_connection_delay = cfg.connection_outdate_delay +cfg.connection_outdate_delay = 0.3 +vshard.router.cfg(cfg) +cfg.connection_outdate_delay = old_connection_delay +vshard.router.internal.connection_outdate_delay = nil +rs_new = vshard.router.route(1) +rs_old = rs +_, replica_old = next(rs_old.replicas) +rs_new:callro('echo', {'some_data'}) +-- Check old objets are still valid. +rs_old:callro('echo', {'some_data'}) +replica_old.conn ~= nil +fiber.sleep(0.2) +rs_old:callro('echo', {'some_data'}) +replica_old.conn ~= nil +replica_old.outdated == nil +fiber.sleep(0.2) +rs_old:callro('echo', {'some_data'}) +replica_old.conn == nil +replica_old.outdated == true +rs_new:callro('echo', {'some_data'}) +  test_run:switch('default')  test_run:cmd('stop server router_1')  test_run:cmd('cleanup server router_1') diff --git a/test/router/router.result b/test/router/router.result index 4919962..45394e1 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -1024,11 +1024,10 @@ error_messages  - - Use replicaset:callro(...) instead of replicaset.callro(...)    - Use replicaset:connect_master(...) instead of replicaset.connect_master(...)    - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...) -  - Use replicaset:rebind_connections(...) instead of replicaset.rebind_connections(...)    - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)    - Use replicaset:call(...) instead of replicaset.call(...) -  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)    - Use replicaset:connect(...) instead of replicaset.connect(...) +  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)    - Use replicaset:callrw(...) instead of replicaset.callrw(...)    - Use replicaset:connect_all(...) instead of replicaset.connect_all(...)  ... diff --git a/test/storage/reload.result b/test/storage/reload.result index 531d984..c890a9f 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -174,6 +174,35 @@ vshard.storage.module_version()  check_reloaded()  ---  ... +-- +-- Outdate old replicaset and replica objets. +-- +_, rs = next(vshard.storage.internal.replicasets) +--- +... +package.loaded["vshard.storage"] = nil +--- +... +_ = require('vshard.storage') +--- +... +rs.callro(rs, 'echo', {'some_data'}) +--- +- null +- type: ShardingError +  name: OBJECT_IS_OUTDATED +  message: Object is outdated after module reload/reconfigure. Use new instance. +  code: 20 +... +_, rs = next(vshard.storage.internal.replicasets) +--- +... +rs.callro(rs, 'echo', {'some_data'}) +--- +- some_data +- null +- null +...  test_run:switch('default')  ---  - true diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 64c3a60..b351813 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -87,6 +87,16 @@ _ = require('vshard.storage')  vshard.storage.module_version()  check_reloaded() +-- +-- Outdate old replicaset and replica objets. +-- +_, rs = next(vshard.storage.internal.replicasets) +package.loaded["vshard.storage"] = nil +_ = require('vshard.storage') +rs.callro(rs, 'echo', {'some_data'}) +_, rs = next(vshard.storage.internal.replicasets) +rs.callro(rs, 'echo', {'some_data'}) +  test_run:switch('default')  test_run:drop_cluster(REPLICASET_2)  test_run:drop_cluster(REPLICASET_1) diff --git a/vshard/cfg.lua b/vshard/cfg.lua index d5429af..bba12cc 100644 --- a/vshard/cfg.lua +++ b/vshard/cfg.lua @@ -217,6 +217,10 @@ local cfg_template = {          type = 'non-negative number', name = 'Sync timeout', is_optional = true,          default = consts.DEFAULT_SYNC_TIMEOUT      }}, +    {'connection_outdate_delay', { +        type = 'non-negative number', name = 'Object outdate timeout', +        is_optional = true +    }},  }  -- @@ -264,6 +268,7 @@ local function remove_non_box_options(cfg)      cfg.collect_bucket_garbage_interval = nil      cfg.collect_lua_garbage = nil      cfg.sync_timeout = nil +    cfg.connection_outdate_delay = nil  end  return { diff --git a/vshard/error.lua b/vshard/error.lua index cf2f9d2..f79107b 100644 --- a/vshard/error.lua +++ b/vshard/error.lua @@ -100,6 +100,11 @@ local error_message_template = {      [19] = {          name = 'REPLICASET_IS_LOCKED',          msg = 'Replicaset is locked' +    }, +    [20] = { +        name = 'OBJECT_IS_OUTDATED', +        msg = 'Object is outdated after module reload/reconfigure. ' .. +              'Use new instance.'      }  } diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index 99f59aa..f6e971b 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -21,6 +21,7 @@  --                                  requests to the replica>,  --             net_sequential_fail = , +--             outdated = nil/true,  --          }  --      },  --      master = , @@ -34,6 +35,7 @@  --      etalon_bucket_count = , +--      outdated = nil/true,  --  }  --  -- replicasets = { @@ -48,7 +50,8 @@ local lerror = require('vshard.error')  local fiber = require('fiber')  local luri = require('uri')  local ffi = require('ffi') -local gsc = require('vshard.util').generate_self_checker +local util = require('vshard.util') +local gsc = util.generate_self_checker  --  -- on_connect() trigger for net.box @@ -337,27 +340,39 @@ local function replicaset_tostring(replicaset)                           master)  end +local outdate_replicasets  -- --- Rebind connections of old replicas to new ones. +-- Copy netbox conections from old replica objects to new ones +-- and outdate old objects. +-- @param replicasets New replicasets +-- @param old_replicasets Replicasets and replicas to be outdated. +-- @param outdate_delay Number of seconds; delay to outdate +--        old objects.  -- -local function replicaset_rebind_connections(replicaset) -    for _, replica in pairs(replicaset.replicas) do -        local old_replica = replica.old_replica -        if old_replica then -            local conn = old_replica.conn -            replica.conn = conn -            replica.down_ts = old_replica.down_ts -            replica.net_timeout = old_replica.net_timeout -            replica.net_sequential_ok = old_replica.net_sequential_ok -            replica.net_sequential_fail = old_replica.net_sequential_fail -            if conn then -                conn.replica = replica -                conn.replicaset = replicaset -                old_replica.conn = nil +local function rebind_replicasets(replicasets, old_replicasets, outdate_delay) +    for replicaset_uuid, replicaset in pairs(replicasets) do +        local old_replicaset = old_replicasets and +                               old_replicasets[replicaset_uuid] +        for replica_uuid, replica in pairs(replicaset.replicas) do +            local old_replica = old_replicaset and + old_replicaset.replicas[replica_uuid] +            if old_replica then +                local conn = old_replica.conn +                replica.conn = conn +                replica.down_ts = old_replica.down_ts +                replica.net_timeout = old_replica.net_timeout +                replica.net_sequential_ok = old_replica.net_sequential_ok +                replica.net_sequential_fail = old_replica.net_sequential_fail +                if conn then +                    conn.replica = replica +                    conn.replicaset = replicaset +                end              end -            replica.old_replica = nil          end      end +    if old_replicasets then +        util.async_task(outdate_delay, outdate_replicasets, old_replicasets) +    end  end  -- @@ -369,7 +384,6 @@ local replicaset_mt = {          connect_master = replicaset_connect_master;          connect_all = replicaset_connect_all;          connect_replica = replicaset_connect_to_replica; -        rebind_connections = replicaset_rebind_connections;          down_replica_priority = replicaset_down_replica_priority;          up_replica_priority = replicaset_up_replica_priority;          call = replicaset_master_call; @@ -412,6 +426,49 @@ for name, func in pairs(replica_mt.__index) do  end  replica_mt.__index = index +-- +-- Meta-methods of outdated objects. +-- They define only attributes from corresponding metatables to +-- make user able to access fields of old objects. +-- +local function outdated_warning(...) +    return nil, lerror.vshard(lerror.code.OBJECT_IS_OUTDATED) +end + +local outdated_replicaset_mt = { +    __index = { +        outdated = true +    } +} +for fname, func in pairs(replicaset_mt.__index) do +    outdated_replicaset_mt.__index[fname] = outdated_warning +end + +local outdated_replica_mt = { +    __index = { +        outdated = true +    } +} +for fname, func in pairs(replica_mt.__index) do +    outdated_replica_mt.__index[fname] = outdated_warning +end + +-- +-- Outdate replicaset and replica objects: +--  * Set outdated_metatables. +--  * Remove connections. +-- +outdate_replicasets = function(replicasets) +    for _, replicaset in pairs(replicasets) do +        setmetatable(replicaset, outdated_replicaset_mt) +        for _, replica in pairs(replicaset.replicas) do +            setmetatable(replica, outdated_replica_mt) +            replica.conn = nil +        end +    end +    log.info('Old replicaset and replica objects are outdated.') +end +  --  -- Calculate for each replicaset its etalon bucket count.  -- Iterative algorithm is used to learn the best balance in a @@ -503,7 +560,7 @@ end  --  -- Update/build replicasets from configuration  -- -local function buildall(sharding_cfg, old_replicasets) +local function buildall(sharding_cfg)      local new_replicasets = {}      local weights = sharding_cfg.weights      local zone = sharding_cfg.zone @@ -515,8 +572,6 @@ local function buildall(sharding_cfg, old_replicasets)      end      local curr_ts = fiber.time()      for replicaset_uuid, replicaset in pairs(sharding_cfg.sharding) do -        local old_replicaset = old_replicasets and -                               old_replicasets[replicaset_uuid]          local new_replicaset = setmetatable({              replicas = {},              uuid = replicaset_uuid, @@ -526,8 +581,6 @@ local function buildall(sharding_cfg, old_replicasets)          }, replicaset_mt)          local priority_list = {}          for replica_uuid, replica in pairs(replicaset.replicas) do -            local old_replica = old_replicaset and - old_replicaset.replicas[replica_uuid]              -- The old replica is saved in the new object to              -- rebind its connection at the end of a              -- router/storage reconfiguration. @@ -535,7 +588,7 @@ local function buildall(sharding_cfg, old_replicasets)                  uri = replica.uri, name = replica.name, uuid = replica_uuid,                  zone = replica.zone, net_timeout = consts.CALL_TIMEOUT_MIN,                  net_sequential_ok = 0, net_sequential_fail = 0, -                down_ts = curr_ts, old_replica = old_replica, +                down_ts = curr_ts,              }, replica_mt)              new_replicaset.replicas[replica_uuid] = new_replica              if replica.master then @@ -596,4 +649,5 @@ return {      buildall = buildall,      calculate_etalon_balance = cluster_calculate_etalon_balance,      wait_masters_connect = wait_masters_connect, +    rebind_replicasets = rebind_replicasets,  } diff --git a/vshard/router/init.lua b/vshard/router/init.lua index a143070..142ddb6 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -1,5 +1,17 @@  local log = require('log')  local lfiber = require('fiber') + +local MODULE_INTERNALS = '__module_vshard_router' +-- Reload requirements, in case this module is reloaded manually. +if rawget(_G, MODULE_INTERNALS) then +    local vshard_modules = { +        'vshard.consts', 'vshard.error', 'vshard.cfg', +        'vshard.hash', 'vshard.replicaset', 'vshard.util', +    } +    for _, module in pairs(vshard_modules) do +        package.loaded[module] = nil +    end +end  local consts = require('vshard.consts')  local lerror = require('vshard.error')  local lcfg = require('vshard.cfg') @@ -7,15 +19,20 @@ local lhash = require('vshard.hash')  local lreplicaset = require('vshard.replicaset')  local util = require('vshard.util') -local M = rawget(_G, '__module_vshard_router') +local M = rawget(_G, MODULE_INTERNALS)  if not M then      M = { +        ---------------- Common module attributes ---------------- +        -- The last passed configuration. +        current_cfg = nil,          errinj = {              ERRINJ_CFG = false,              ERRINJ_FAILOVER_CHANGE_CFG = false,              ERRINJ_RELOAD = false,              ERRINJ_LONG_DISCOVERY = false,          }, +        -- Time to outdate old objects on reload. +        connection_outdate_delay = nil,          -- Bucket map cache.          route_map = {},          -- All known replicasets used for bucket re-balancing @@ -479,12 +496,13 @@ local function router_cfg(cfg)      else          log.info('Starting router reconfiguration')      end -    local new_replicasets = lreplicaset.buildall(cfg, M.replicasets) +    local new_replicasets = lreplicaset.buildall(cfg)      local total_bucket_count = cfg.bucket_count      local collect_lua_garbage = cfg.collect_lua_garbage -    lcfg.remove_non_box_options(cfg) +    local box_cfg = table.copy(cfg) +    lcfg.remove_non_box_options(box_cfg)      log.info("Calling box.cfg()...") -    for k, v in pairs(cfg) do +    for k, v in pairs(box_cfg) do          log.info({[k] = v})      end      -- It is considered that all possible errors during cfg @@ -493,18 +511,18 @@ local function router_cfg(cfg)      if M.errinj.ERRINJ_CFG then          error('Error injection: cfg')      end -    box.cfg(cfg) +    box.cfg(box_cfg)      log.info("Box has been configured") +    M.connection_outdate_delay = cfg.connection_outdate_delay      M.total_bucket_count = total_bucket_count      M.collect_lua_garbage = collect_lua_garbage -    M.replicasets = new_replicasets      M.current_cfg = new_cfg      -- Move connections from an old configuration to a new one.      -- It must be done with no yields to prevent usage both of not      -- fully moved old replicasets, and not fully built new ones. -    for _, replicaset in pairs(new_replicasets) do -        replicaset:rebind_connections() -    end +    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets, +                                   M.connection_outdate_delay) +    M.replicasets = new_replicasets      -- Now the new replicasets are fully built. Can establish      -- connections and yield.      for _, replicaset in pairs(new_replicasets) do @@ -793,15 +811,16 @@ end  -- About functions, saved in M, and reloading see comment in  -- storage/init.lua.  -- -M.discovery_f = discovery_f -M.failover_f = failover_f - -if not rawget(_G, '__module_vshard_router') then -    rawset(_G, '__module_vshard_router', M) +if not rawget(_G, MODULE_INTERNALS) then +    rawset(_G, MODULE_INTERNALS, M)  else +    router_cfg(M.current_cfg)      M.module_version = M.module_version + 1  end +M.discovery_f = discovery_f +M.failover_f = failover_f +  return {      cfg = router_cfg;      info = router_info; diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 052e94f..07bd00c 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -2,20 +2,34 @@ local log = require('log')  local luri = require('uri')  local lfiber = require('fiber')  local netbox = require('net.box') -- for net.box:self() +local trigger = require('internal.trigger') + +local MODULE_INTERNALS = '__module_vshard_storage' +-- Reload requirements, in case this module is reloaded manually. +if rawget(_G, MODULE_INTERNALS) then +    local vshard_modules = { +        'vshard.consts', 'vshard.error', 'vshard.cfg', +        'vshard.replicaset', 'vshard.util', +    } +    for _, module in pairs(vshard_modules) do +        package.loaded[module] = nil +    end +end  local consts = require('vshard.consts')  local lerror = require('vshard.error') -local util = require('vshard.util')  local lcfg = require('vshard.cfg')  local lreplicaset = require('vshard.replicaset') -local trigger = require('internal.trigger') +local util = require('vshard.util') -local M = rawget(_G, '__module_vshard_storage') +local M = rawget(_G, MODULE_INTERNALS)  if not M then      --      -- The module is loaded for the first time.      --      M = {          ---------------- Common module attributes ---------------- +        -- The last passed configuration. +        current_cfg = nil,          --          -- All known replicasets used for bucket re-balancing.          -- See format in replicaset.lua. @@ -1497,7 +1511,7 @@ local function storage_cfg(cfg, this_replica_uuid)      local this_replicaset      local this_replica -    local new_replicasets = lreplicaset.buildall(cfg, M.replicasets) +    local new_replicasets = lreplicaset.buildall(cfg)      local min_master      for rs_uuid, rs in pairs(new_replicasets) do          for replica_uuid, replica in pairs(rs.replicas) do @@ -1576,7 +1590,6 @@ local function storage_cfg(cfg, this_replica_uuid)      --      local old_sync_timeout = M.sync_timeout      M.sync_timeout = cfg.sync_timeout -    lcfg.remove_non_box_options(cfg)      if was_master and not is_master then          local_on_master_disable_prepare() @@ -1585,7 +1598,9 @@ local function storage_cfg(cfg, this_replica_uuid)          local_on_master_enable_prepare()      end -    local ok, err = pcall(box.cfg, cfg) +    local box_cfg = table.copy(cfg) +    lcfg.remove_non_box_options(box_cfg) +    local ok, err = pcall(box.cfg, box_cfg)      while M.errinj.ERRINJ_CFG_DELAY do          lfiber.sleep(0.01)      end @@ -1604,10 +1619,8 @@ local function storage_cfg(cfg, this_replica_uuid)      local uri = luri.parse(this_replica.uri)      box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password) +    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)      M.replicasets = new_replicasets -    for _, replicaset in pairs(new_replicasets) do -        replicaset:rebind_connections() -    end      M.this_replicaset = this_replicaset      M.this_replica = this_replica      M.total_bucket_count = total_bucket_count @@ -1846,6 +1859,14 @@ end  -- restarted (or is restarted from M.background_f, which is not  -- changed) and continues use old func1 and func2.  -- + +if not rawget(_G, MODULE_INTERNALS) then +    rawset(_G, MODULE_INTERNALS, M) +else +    storage_cfg(M.current_cfg, M.this_replica.uuid) +    M.module_version = M.module_version + 1 +end +  M.recovery_f = recovery_f  M.collect_garbage_f = collect_garbage_f  M.rebalancer_f = rebalancer_f @@ -1861,12 +1882,6 @@ M.rebalancer_build_routes = rebalancer_build_routes  M.rebalancer_calculate_metrics = rebalancer_calculate_metrics  M.cached_find_sharded_spaces = find_sharded_spaces -if not rawget(_G, '__module_vshard_storage') then -    rawset(_G, '__module_vshard_storage', M) -else -    M.module_version = M.module_version + 1 -end -  return {      sync = sync,      bucket_force_create = bucket_force_create, diff --git a/vshard/util.lua b/vshard/util.lua index ce79930..fb875ce 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -100,9 +100,29 @@ end  -- Update latest versions of function  M.reloadable_fiber_f = reloadable_fiber_f +local function sync_task(delay, task, ...) +    if delay then +        fiber.sleep(delay) +    end +    task(...) +end + +-- +-- Run a function without interrupting current fiber. +-- @param delay Delay in seconds before the task should be +--        executed. +-- @param task Function to be executed. +-- @param ... Arguments which would be passed to the `task`. +-- +local function async_task(delay, task, ...) +    assert(delay == nil or type(delay) == 'number') +    fiber.create(sync_task, delay, task, ...) +end +  return {      tuple_extract_key = tuple_extract_key,      reloadable_fiber_f = reloadable_fiber_f,      generate_self_checker = generate_self_checker, +    async_task = async_task,      internal = M,  }