From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 537CC24D17 for ; Mon, 23 Jul 2018 07:14:37 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id XfAckneiLsHL for ; Mon, 23 Jul 2018 07:14:37 -0400 (EDT) Received: from smtp36.i.mail.ru (smtp36.i.mail.ru [94.100.177.96]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id ADA3E2429D for ; Mon, 23 Jul 2018 07:14:36 -0400 (EDT) From: AKhatskevich Subject: [tarantool-patches] [PATCH 2/4] Complete module reload Date: Mon, 23 Jul 2018 14:14:20 +0300 Message-Id: <6ebbf656cf54e40298ffa067551c291beefd1f8c.1532344376.git.avkhatskevich@tarantool.org> In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: v.shpilevoy@tarantool.org, tarantool-patches@freelists.org In case one need to upgrade vshard to a new version, this commit improves reload mechanism to allow to do that for a wider variety of possible changes (between two versions). Changes: * introduce cfg option `connection_outdate_delay` * improve reload mechanism * add `util.async_task` method, which runs a function after a delay * delete replicaset:rebind_connections method as it is replaced with `rebind_replicasets` which updates all replicasets at once Reload mechanism: * reload all vshard modules * create new `replicaset` and `replica` objects * reuse old netbox connections in new replica objects if possible * update router/storage.internal table * after a `connection_outdate_delay` disable old instances of `replicaset` and `replica` objects Reload works for modules: * vshard.router * vshard.storage Here is a module reload algorithm: * old vshard is working * delete old vshard src * install new vshard * call: package.loaded['vshard.router'] = nil * call: old_router = vshard.router -- Save working router copy. * call: vshard.router = require('vshard.router') * if require fails: continue using old_router * if require succeeds: use vshard.router In case reload process fails, old router/storage module, replicaset and replica objects continue working properly. If reload succeeds, all old objects would be deprecated. Extra changes: * introduce MODULE_INTERNALS which stores name of the module internal data in the global namespace Part of #112 --- test/router/reload.result | 126 +++++++++++++++++++++++++++++++++++++++++++ test/router/reload.test.lua | 36 +++++++++++++ test/router/router.result | 3 +- test/storage/reload.result | 29 ++++++++++ test/storage/reload.test.lua | 10 ++++ vshard/cfg.lua | 5 ++ vshard/error.lua | 5 ++ vshard/replicaset.lua | 102 ++++++++++++++++++++++++++--------- vshard/router/init.lua | 47 +++++++++++----- vshard/storage/init.lua | 45 ++++++++++------ vshard/util.lua | 20 +++++++ 11 files changed, 373 insertions(+), 55 deletions(-) diff --git a/test/router/reload.result b/test/router/reload.result index 47f3c2e..88122aa 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -174,6 +174,132 @@ vshard.router.module_version() check_reloaded() --- ... +-- +-- Outdate old replicaset and replica objects. +-- +rs = vshard.router.route(1) +--- +... +rs:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +package.loaded["vshard.router"] = nil +--- +... +_ = require('vshard.router') +--- +... +-- Make sure outdate async task has had cpu time. +while not rs.is_outdated do fiber.sleep(0.001) end +--- +... +rs.callro(rs, 'echo', {'some_data'}) +--- +- null +- type: ShardingError + name: OBJECT_IS_OUTDATED + message: Object is outdated after module reload/reconfigure. Use new instance. + code: 20 +... +vshard.router = require('vshard.router') +--- +... +rs = vshard.router.route(1) +--- +... +rs:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +-- Test `connection_outdate_delay`. +old_connection_delay = cfg.connection_outdate_delay +--- +... +cfg.connection_outdate_delay = 0.3 +--- +... +vshard.router.cfg(cfg) +--- +... +cfg.connection_outdate_delay = old_connection_delay +--- +... +vshard.router.internal.connection_outdate_delay = nil +--- +... +rs_new = vshard.router.route(1) +--- +... +rs_old = rs +--- +... +_, replica_old = next(rs_old.replicas) +--- +... +rs_new:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +-- Check old objets are still valid. +rs_old:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +replica_old.conn ~= nil +--- +- true +... +fiber.sleep(0.2) +--- +... +rs_old:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +replica_old.conn ~= nil +--- +- true +... +replica_old.is_outdated == nil +--- +- true +... +fiber.sleep(0.2) +--- +... +rs_old:callro('echo', {'some_data'}) +--- +- null +- type: ShardingError + name: OBJECT_IS_OUTDATED + message: Object is outdated after module reload/reconfigure. Use new instance. + code: 20 +... +replica_old.conn == nil +--- +- true +... +replica_old.is_outdated == true +--- +- true +... +rs_new:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... test_run:switch('default') --- - true diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index af2939d..01b7163 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -86,6 +86,42 @@ _ = require('vshard.router') vshard.router.module_version() check_reloaded() +-- +-- Outdate old replicaset and replica objects. +-- +rs = vshard.router.route(1) +rs:callro('echo', {'some_data'}) +package.loaded["vshard.router"] = nil +_ = require('vshard.router') +-- Make sure outdate async task has had cpu time. +while not rs.is_outdated do fiber.sleep(0.001) end +rs.callro(rs, 'echo', {'some_data'}) +vshard.router = require('vshard.router') +rs = vshard.router.route(1) +rs:callro('echo', {'some_data'}) +-- Test `connection_outdate_delay`. +old_connection_delay = cfg.connection_outdate_delay +cfg.connection_outdate_delay = 0.3 +vshard.router.cfg(cfg) +cfg.connection_outdate_delay = old_connection_delay +vshard.router.internal.connection_outdate_delay = nil +rs_new = vshard.router.route(1) +rs_old = rs +_, replica_old = next(rs_old.replicas) +rs_new:callro('echo', {'some_data'}) +-- Check old objets are still valid. +rs_old:callro('echo', {'some_data'}) +replica_old.conn ~= nil +fiber.sleep(0.2) +rs_old:callro('echo', {'some_data'}) +replica_old.conn ~= nil +replica_old.is_outdated == nil +fiber.sleep(0.2) +rs_old:callro('echo', {'some_data'}) +replica_old.conn == nil +replica_old.is_outdated == true +rs_new:callro('echo', {'some_data'}) + test_run:switch('default') test_run:cmd('stop server router_1') test_run:cmd('cleanup server router_1') diff --git a/test/router/router.result b/test/router/router.result index 4919962..45394e1 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -1024,11 +1024,10 @@ error_messages - - Use replicaset:callro(...) instead of replicaset.callro(...) - Use replicaset:connect_master(...) instead of replicaset.connect_master(...) - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...) - - Use replicaset:rebind_connections(...) instead of replicaset.rebind_connections(...) - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...) - Use replicaset:call(...) instead of replicaset.call(...) - - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...) - Use replicaset:connect(...) instead of replicaset.connect(...) + - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...) - Use replicaset:callrw(...) instead of replicaset.callrw(...) - Use replicaset:connect_all(...) instead of replicaset.connect_all(...) ... diff --git a/test/storage/reload.result b/test/storage/reload.result index 531d984..b91b622 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -174,6 +174,35 @@ vshard.storage.module_version() check_reloaded() --- ... +-- +-- Outdate old replicaset and replica objects. +-- +_, rs = next(vshard.storage.internal.replicasets) +--- +... +package.loaded["vshard.storage"] = nil +--- +... +_ = require('vshard.storage') +--- +... +rs.callro(rs, 'echo', {'some_data'}) +--- +- null +- type: ShardingError + name: OBJECT_IS_OUTDATED + message: Object is outdated after module reload/reconfigure. Use new instance. + code: 20 +... +_, rs = next(vshard.storage.internal.replicasets) +--- +... +rs.callro(rs, 'echo', {'some_data'}) +--- +- some_data +- null +- null +... test_run:switch('default') --- - true diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 64c3a60..9140299 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -87,6 +87,16 @@ _ = require('vshard.storage') vshard.storage.module_version() check_reloaded() +-- +-- Outdate old replicaset and replica objects. +-- +_, rs = next(vshard.storage.internal.replicasets) +package.loaded["vshard.storage"] = nil +_ = require('vshard.storage') +rs.callro(rs, 'echo', {'some_data'}) +_, rs = next(vshard.storage.internal.replicasets) +rs.callro(rs, 'echo', {'some_data'}) + test_run:switch('default') test_run:drop_cluster(REPLICASET_2) test_run:drop_cluster(REPLICASET_1) diff --git a/vshard/cfg.lua b/vshard/cfg.lua index d5429af..bba12cc 100644 --- a/vshard/cfg.lua +++ b/vshard/cfg.lua @@ -217,6 +217,10 @@ local cfg_template = { type = 'non-negative number', name = 'Sync timeout', is_optional = true, default = consts.DEFAULT_SYNC_TIMEOUT }}, + {'connection_outdate_delay', { + type = 'non-negative number', name = 'Object outdate timeout', + is_optional = true + }}, } -- @@ -264,6 +268,7 @@ local function remove_non_box_options(cfg) cfg.collect_bucket_garbage_interval = nil cfg.collect_lua_garbage = nil cfg.sync_timeout = nil + cfg.connection_outdate_delay = nil end return { diff --git a/vshard/error.lua b/vshard/error.lua index cf2f9d2..f79107b 100644 --- a/vshard/error.lua +++ b/vshard/error.lua @@ -100,6 +100,11 @@ local error_message_template = { [19] = { name = 'REPLICASET_IS_LOCKED', msg = 'Replicaset is locked' + }, + [20] = { + name = 'OBJECT_IS_OUTDATED', + msg = 'Object is outdated after module reload/reconfigure. ' .. + 'Use new instance.' } } diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index 99f59aa..6c8d477 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -21,6 +21,7 @@ -- requests to the replica>, -- net_sequential_fail = , +-- is_outdated = nil/true, -- } -- }, -- master = , @@ -34,6 +35,7 @@ -- etalon_bucket_count = , +-- is_outdated = nil/true, -- } -- -- replicasets = { @@ -48,7 +50,8 @@ local lerror = require('vshard.error') local fiber = require('fiber') local luri = require('uri') local ffi = require('ffi') -local gsc = require('vshard.util').generate_self_checker +local util = require('vshard.util') +local gsc = util.generate_self_checker -- -- on_connect() trigger for net.box @@ -337,27 +340,39 @@ local function replicaset_tostring(replicaset) master) end +local outdate_replicasets -- --- Rebind connections of old replicas to new ones. +-- Copy netbox connections from old replica objects to new ones +-- and outdate old objects. +-- @param replicasets New replicasets +-- @param old_replicasets Replicasets and replicas to be outdated. +-- @param outdate_delay Number of seconds; delay to outdate +-- old objects. -- -local function replicaset_rebind_connections(replicaset) - for _, replica in pairs(replicaset.replicas) do - local old_replica = replica.old_replica - if old_replica then - local conn = old_replica.conn - replica.conn = conn - replica.down_ts = old_replica.down_ts - replica.net_timeout = old_replica.net_timeout - replica.net_sequential_ok = old_replica.net_sequential_ok - replica.net_sequential_fail = old_replica.net_sequential_fail - if conn then - conn.replica = replica - conn.replicaset = replicaset - old_replica.conn = nil +local function rebind_replicasets(replicasets, old_replicasets, outdate_delay) + for replicaset_uuid, replicaset in pairs(replicasets) do + local old_replicaset = old_replicasets and + old_replicasets[replicaset_uuid] + for replica_uuid, replica in pairs(replicaset.replicas) do + local old_replica = old_replicaset and + old_replicaset.replicas[replica_uuid] + if old_replica then + local conn = old_replica.conn + replica.conn = conn + replica.down_ts = old_replica.down_ts + replica.net_timeout = old_replica.net_timeout + replica.net_sequential_ok = old_replica.net_sequential_ok + replica.net_sequential_fail = old_replica.net_sequential_fail + if conn then + conn.replica = replica + conn.replicaset = replicaset + end end - replica.old_replica = nil end end + if old_replicasets then + util.async_task(outdate_delay, outdate_replicasets, old_replicasets) + end end -- @@ -369,7 +384,6 @@ local replicaset_mt = { connect_master = replicaset_connect_master; connect_all = replicaset_connect_all; connect_replica = replicaset_connect_to_replica; - rebind_connections = replicaset_rebind_connections; down_replica_priority = replicaset_down_replica_priority; up_replica_priority = replicaset_up_replica_priority; call = replicaset_master_call; @@ -412,6 +426,49 @@ for name, func in pairs(replica_mt.__index) do end replica_mt.__index = index +-- +-- Meta-methods of outdated objects. +-- They define only attributes from corresponding metatables to +-- make user able to access fields of old objects. +-- +local function outdated_warning(...) + return nil, lerror.vshard(lerror.code.OBJECT_IS_OUTDATED) +end + +local outdated_replicaset_mt = { + __index = { + is_outdated = true + } +} +for fname, func in pairs(replicaset_mt.__index) do + outdated_replicaset_mt.__index[fname] = outdated_warning +end + +local outdated_replica_mt = { + __index = { + is_outdated = true + } +} +for fname, func in pairs(replica_mt.__index) do + outdated_replica_mt.__index[fname] = outdated_warning +end + +-- +-- Outdate replicaset and replica objects: +-- * Set outdated_metatables. +-- * Remove connections. +-- +outdate_replicasets = function(replicasets) + for _, replicaset in pairs(replicasets) do + setmetatable(replicaset, outdated_replicaset_mt) + for _, replica in pairs(replicaset.replicas) do + setmetatable(replica, outdated_replica_mt) + replica.conn = nil + end + end + log.info('Old replicaset and replica objects are outdated.') +end + -- -- Calculate for each replicaset its etalon bucket count. -- Iterative algorithm is used to learn the best balance in a @@ -503,7 +560,7 @@ end -- -- Update/build replicasets from configuration -- -local function buildall(sharding_cfg, old_replicasets) +local function buildall(sharding_cfg) local new_replicasets = {} local weights = sharding_cfg.weights local zone = sharding_cfg.zone @@ -515,8 +572,6 @@ local function buildall(sharding_cfg, old_replicasets) end local curr_ts = fiber.time() for replicaset_uuid, replicaset in pairs(sharding_cfg.sharding) do - local old_replicaset = old_replicasets and - old_replicasets[replicaset_uuid] local new_replicaset = setmetatable({ replicas = {}, uuid = replicaset_uuid, @@ -526,8 +581,6 @@ local function buildall(sharding_cfg, old_replicasets) }, replicaset_mt) local priority_list = {} for replica_uuid, replica in pairs(replicaset.replicas) do - local old_replica = old_replicaset and - old_replicaset.replicas[replica_uuid] -- The old replica is saved in the new object to -- rebind its connection at the end of a -- router/storage reconfiguration. @@ -535,7 +588,7 @@ local function buildall(sharding_cfg, old_replicasets) uri = replica.uri, name = replica.name, uuid = replica_uuid, zone = replica.zone, net_timeout = consts.CALL_TIMEOUT_MIN, net_sequential_ok = 0, net_sequential_fail = 0, - down_ts = curr_ts, old_replica = old_replica, + down_ts = curr_ts, }, replica_mt) new_replicaset.replicas[replica_uuid] = new_replica if replica.master then @@ -596,4 +649,5 @@ return { buildall = buildall, calculate_etalon_balance = cluster_calculate_etalon_balance, wait_masters_connect = wait_masters_connect, + rebind_replicasets = rebind_replicasets, } diff --git a/vshard/router/init.lua b/vshard/router/init.lua index a143070..142ddb6 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -1,5 +1,17 @@ local log = require('log') local lfiber = require('fiber') + +local MODULE_INTERNALS = '__module_vshard_router' +-- Reload requirements, in case this module is reloaded manually. +if rawget(_G, MODULE_INTERNALS) then + local vshard_modules = { + 'vshard.consts', 'vshard.error', 'vshard.cfg', + 'vshard.hash', 'vshard.replicaset', 'vshard.util', + } + for _, module in pairs(vshard_modules) do + package.loaded[module] = nil + end +end local consts = require('vshard.consts') local lerror = require('vshard.error') local lcfg = require('vshard.cfg') @@ -7,15 +19,20 @@ local lhash = require('vshard.hash') local lreplicaset = require('vshard.replicaset') local util = require('vshard.util') -local M = rawget(_G, '__module_vshard_router') +local M = rawget(_G, MODULE_INTERNALS) if not M then M = { + ---------------- Common module attributes ---------------- + -- The last passed configuration. + current_cfg = nil, errinj = { ERRINJ_CFG = false, ERRINJ_FAILOVER_CHANGE_CFG = false, ERRINJ_RELOAD = false, ERRINJ_LONG_DISCOVERY = false, }, + -- Time to outdate old objects on reload. + connection_outdate_delay = nil, -- Bucket map cache. route_map = {}, -- All known replicasets used for bucket re-balancing @@ -479,12 +496,13 @@ local function router_cfg(cfg) else log.info('Starting router reconfiguration') end - local new_replicasets = lreplicaset.buildall(cfg, M.replicasets) + local new_replicasets = lreplicaset.buildall(cfg) local total_bucket_count = cfg.bucket_count local collect_lua_garbage = cfg.collect_lua_garbage - lcfg.remove_non_box_options(cfg) + local box_cfg = table.copy(cfg) + lcfg.remove_non_box_options(box_cfg) log.info("Calling box.cfg()...") - for k, v in pairs(cfg) do + for k, v in pairs(box_cfg) do log.info({[k] = v}) end -- It is considered that all possible errors during cfg @@ -493,18 +511,18 @@ local function router_cfg(cfg) if M.errinj.ERRINJ_CFG then error('Error injection: cfg') end - box.cfg(cfg) + box.cfg(box_cfg) log.info("Box has been configured") + M.connection_outdate_delay = cfg.connection_outdate_delay M.total_bucket_count = total_bucket_count M.collect_lua_garbage = collect_lua_garbage - M.replicasets = new_replicasets M.current_cfg = new_cfg -- Move connections from an old configuration to a new one. -- It must be done with no yields to prevent usage both of not -- fully moved old replicasets, and not fully built new ones. - for _, replicaset in pairs(new_replicasets) do - replicaset:rebind_connections() - end + lreplicaset.rebind_replicasets(new_replicasets, M.replicasets, + M.connection_outdate_delay) + M.replicasets = new_replicasets -- Now the new replicasets are fully built. Can establish -- connections and yield. for _, replicaset in pairs(new_replicasets) do @@ -793,15 +811,16 @@ end -- About functions, saved in M, and reloading see comment in -- storage/init.lua. -- -M.discovery_f = discovery_f -M.failover_f = failover_f - -if not rawget(_G, '__module_vshard_router') then - rawset(_G, '__module_vshard_router', M) +if not rawget(_G, MODULE_INTERNALS) then + rawset(_G, MODULE_INTERNALS, M) else + router_cfg(M.current_cfg) M.module_version = M.module_version + 1 end +M.discovery_f = discovery_f +M.failover_f = failover_f + return { cfg = router_cfg; info = router_info; diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 052e94f..07bd00c 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -2,20 +2,34 @@ local log = require('log') local luri = require('uri') local lfiber = require('fiber') local netbox = require('net.box') -- for net.box:self() +local trigger = require('internal.trigger') + +local MODULE_INTERNALS = '__module_vshard_storage' +-- Reload requirements, in case this module is reloaded manually. +if rawget(_G, MODULE_INTERNALS) then + local vshard_modules = { + 'vshard.consts', 'vshard.error', 'vshard.cfg', + 'vshard.replicaset', 'vshard.util', + } + for _, module in pairs(vshard_modules) do + package.loaded[module] = nil + end +end local consts = require('vshard.consts') local lerror = require('vshard.error') -local util = require('vshard.util') local lcfg = require('vshard.cfg') local lreplicaset = require('vshard.replicaset') -local trigger = require('internal.trigger') +local util = require('vshard.util') -local M = rawget(_G, '__module_vshard_storage') +local M = rawget(_G, MODULE_INTERNALS) if not M then -- -- The module is loaded for the first time. -- M = { ---------------- Common module attributes ---------------- + -- The last passed configuration. + current_cfg = nil, -- -- All known replicasets used for bucket re-balancing. -- See format in replicaset.lua. @@ -1497,7 +1511,7 @@ local function storage_cfg(cfg, this_replica_uuid) local this_replicaset local this_replica - local new_replicasets = lreplicaset.buildall(cfg, M.replicasets) + local new_replicasets = lreplicaset.buildall(cfg) local min_master for rs_uuid, rs in pairs(new_replicasets) do for replica_uuid, replica in pairs(rs.replicas) do @@ -1576,7 +1590,6 @@ local function storage_cfg(cfg, this_replica_uuid) -- local old_sync_timeout = M.sync_timeout M.sync_timeout = cfg.sync_timeout - lcfg.remove_non_box_options(cfg) if was_master and not is_master then local_on_master_disable_prepare() @@ -1585,7 +1598,9 @@ local function storage_cfg(cfg, this_replica_uuid) local_on_master_enable_prepare() end - local ok, err = pcall(box.cfg, cfg) + local box_cfg = table.copy(cfg) + lcfg.remove_non_box_options(box_cfg) + local ok, err = pcall(box.cfg, box_cfg) while M.errinj.ERRINJ_CFG_DELAY do lfiber.sleep(0.01) end @@ -1604,10 +1619,8 @@ local function storage_cfg(cfg, this_replica_uuid) local uri = luri.parse(this_replica.uri) box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password) + lreplicaset.rebind_replicasets(new_replicasets, M.replicasets) M.replicasets = new_replicasets - for _, replicaset in pairs(new_replicasets) do - replicaset:rebind_connections() - end M.this_replicaset = this_replicaset M.this_replica = this_replica M.total_bucket_count = total_bucket_count @@ -1846,6 +1859,14 @@ end -- restarted (or is restarted from M.background_f, which is not -- changed) and continues use old func1 and func2. -- + +if not rawget(_G, MODULE_INTERNALS) then + rawset(_G, MODULE_INTERNALS, M) +else + storage_cfg(M.current_cfg, M.this_replica.uuid) + M.module_version = M.module_version + 1 +end + M.recovery_f = recovery_f M.collect_garbage_f = collect_garbage_f M.rebalancer_f = rebalancer_f @@ -1861,12 +1882,6 @@ M.rebalancer_build_routes = rebalancer_build_routes M.rebalancer_calculate_metrics = rebalancer_calculate_metrics M.cached_find_sharded_spaces = find_sharded_spaces -if not rawget(_G, '__module_vshard_storage') then - rawset(_G, '__module_vshard_storage', M) -else - M.module_version = M.module_version + 1 -end - return { sync = sync, bucket_force_create = bucket_force_create, diff --git a/vshard/util.lua b/vshard/util.lua index ce79930..fb875ce 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -100,9 +100,29 @@ end -- Update latest versions of function M.reloadable_fiber_f = reloadable_fiber_f +local function sync_task(delay, task, ...) + if delay then + fiber.sleep(delay) + end + task(...) +end + +-- +-- Run a function without interrupting current fiber. +-- @param delay Delay in seconds before the task should be +-- executed. +-- @param task Function to be executed. +-- @param ... Arguments which would be passed to the `task`. +-- +local function async_task(delay, task, ...) + assert(delay == nil or type(delay) == 'number') + fiber.create(sync_task, delay, task, ...) +end + return { tuple_extract_key = tuple_extract_key, reloadable_fiber_f = reloadable_fiber_f, generate_self_checker = generate_self_checker, + async_task = async_task, internal = M, } -- 2.14.1