Tarantool development patches archive
From: AKhatskevich <avkhatskevich@tarantool.org>
To: tarantool-patches@freelists.org, v.shpilevoy@tarantool.org
Subject: [tarantool-patches] [PATCH 2/2] Complete module reload
Date: Sat,  9 Jun 2018 20:47:16 +0300
Message-ID: <d132dfc18a98151b4d27f4017a7c3d5e54ece3a6.1528566184.git.avkhatskevich@tarantool.org>
In-Reply-To: <cover.1528566184.git.avkhatskevich@tarantool.org>

In case one needs to upgrade vshard to a new version, this commit
introduces a reload mechanism which makes that possible for a
wide variety of changes between the two versions.

Reload process:
 * reload all vshard modules
 * create new `replicaset` and `replica` objects
 * reuse old netbox connections in new replica objects if
   possible
 * update router/storage.internal table
 * after a `reload_deprecate_timeout` disable old instances of
   `replicaset` and `replica` objects
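
The last step can be illustrated with a minimal plain-Lua sketch. It
is not the patch code: `new_replicaset` and `deprecate` are
hypothetical names, and the object is a stand-in for a real
replicaset. It only shows the idea of replacing every method in the
old metatable with a function that raises an error.

```lua
-- Hypothetical stand-in for the metatable of an old replicaset object.
local old_replicaset_mt = {
    __index = {
        callro = function(self, func)
            return 'called ' .. func .. ' on ' .. self.name
        end,
    },
}

-- Hypothetical constructor, for illustration only.
local function new_replicaset(name)
    return setmetatable({name = name}, old_replicaset_mt)
end

-- Replace every method of the metatable with a function that raises.
-- Assigning to existing keys while iterating with pairs() is allowed.
local function deprecate(mt, obj_name)
    local function outdated()
        error(string.format('Object %s is outdated. Use new instance',
                            obj_name))
    end
    for method_name in pairs(mt.__index) do
        mt.__index[method_name] = outdated
    end
end

local rs = new_replicaset('rs1')
local before = rs:callro('echo')
deprecate(old_replicaset_mt, 'replicaset')
-- Every object that still uses the old metatable now fails on any call.
local ok, err = pcall(rs.callro, rs, 'echo')
```

In a real reload the new objects get a fresh metatable from the
reloaded module, so only the old instances are affected.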

Reload works for modules:
 * vshard.router
 * vshard.storage

If the reload process fails, the old router/storage module continues
working properly.

Extra changes:
 * add `util.async_task` method, which runs a function after a
   delay
 * delete replicaset:rebind_connections method as it is replaced
   with `rebind_replicasets` which updates all replicasets at once
 * introduce `module_reloading` for distinguishing reload from
   reconfigure
 * introduce MODULE_INTERNALS which stores the name of the module
   internal data in the global namespace
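
How module internals kept in the global namespace survive a reload
can be sketched in plain Lua. This is an illustration under
assumptions, not the vshard code: the module name `example_module`,
the key `__module_example` and the `bump` function are hypothetical,
and `package.preload` stands in for a module file on disk.

```lua
-- Hypothetical key under which the module keeps its internal state.
local MODULE_INTERNALS = '__module_example'

-- package.preload plays the role of the module file: require() runs
-- this loader every time the module is absent from package.loaded.
package.preload['example_module'] = function()
    local M = rawget(_G, MODULE_INTERNALS)
    if not M then
        -- First load: create the state and publish it globally.
        M = {module_version = 0, counter = 0}
        rawset(_G, MODULE_INTERNALS, M)
    else
        -- Reload: reuse the existing state, only bump the version.
        M.module_version = M.module_version + 1
    end
    return {
        bump = function()
            M.counter = M.counter + 1
            return M.counter
        end,
        module_version = function()
            return M.module_version
        end,
    }
end

local first = require('example_module')
local count_before = first.bump()

-- Manual reload: drop the cached module and require it again.
package.loaded['example_module'] = nil
local second = require('example_module')
local version_after = second.module_version()
local count_after = second.bump()
```

The state table is shared between the old and the new module
instance, which is why the counter continues from its previous value
after the reload.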

Closes #112
---
 test/router/reload.result    | 109 +++++++++++++++++++++++++++++++++++++++++++
 test/router/reload.test.lua  |  41 ++++++++++++++++
 test/router/router.result    |   3 +-
 test/storage/reload.result   |  61 ++++++++++++++++++++++++
 test/storage/reload.test.lua |  24 ++++++++++
 vshard/replicaset.lua        |  91 +++++++++++++++++++++++++++++-------
 vshard/router/init.lua       |  40 ++++++++++++----
 vshard/storage/init.lua      |  44 ++++++++++++-----
 vshard/util.lua              |  20 ++++++++
 9 files changed, 392 insertions(+), 41 deletions(-)

diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..083475f 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -174,6 +174,115 @@ vshard.router.module_version()
 check_reloaded()
 ---
 ...
+--
+-- Deprecate old replicaset and replica objets.
+--
+rs = vshard.router.route(1)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_ = require('vshard.router')
+---
+...
+-- Make sure async task (deprecator) has had cpu time.
+fiber.sleep(0.005)
+---
+...
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+---
+...
+err:match('Object replicaset is outdated.*')
+---
+- Object replicaset is outdated. Use new instance
+...
+vshard.router = require('vshard.router')
+---
+...
+rs = vshard.router.route(1)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+-- Test deprecate timeout.
+vshard.router.internal.reload_deprecate_timeout = 0.3
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_ = require('vshard.router')
+---
+...
+vshard.router.internal.reload_deprecate_timeout = nil
+---
+...
+vshard.router = require('vshard.router')
+---
+...
+rs_new = vshard.router.route(1)
+---
+...
+_ = rs_new:callro('echo', {'some_data'})
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+fiber.sleep(0.2)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+fiber.sleep(0.2)
+---
+...
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+---
+...
+err:match('Object replicaset is outdated.*')
+---
+- Object replicaset is outdated. Use new instance
+...
+_ = rs_new:callro('echo', {'some_data'})
+---
+...
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
+vshard.router.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.router.internal)
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_, err = pcall(require, 'vshard.router')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.router.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.router.internal)
+---
+- true
+...
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
 test_run:switch('default')
 ---
 - true
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..94cd8cc 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -86,6 +86,47 @@ _ = require('vshard.router')
 vshard.router.module_version()
 check_reloaded()
 
+--
+-- Deprecate old replicaset and replica objets.
+--
+rs = vshard.router.route(1)
+_ = rs:callro('echo', {'some_data'})
+package.loaded["vshard.router"] = nil
+_ = require('vshard.router')
+-- Make sure async task (deprecator) has had cpu time.
+fiber.sleep(0.005)
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+vshard.router = require('vshard.router')
+rs = vshard.router.route(1)
+_ = rs:callro('echo', {'some_data'})
+-- Test deprecate timeout.
+vshard.router.internal.reload_deprecate_timeout = 0.3
+package.loaded["vshard.router"] = nil
+_ = require('vshard.router')
+vshard.router.internal.reload_deprecate_timeout = nil
+vshard.router = require('vshard.router')
+rs_new = vshard.router.route(1)
+_ = rs_new:callro('echo', {'some_data'})
+_ = rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+_ = rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+_ = rs_new:callro('echo', {'some_data'})
+
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+vshard.router.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.router.internal)
+package.loaded["vshard.router"] = nil
+_, err = pcall(require, 'vshard.router')
+err:match('Error injection:.*')
+vshard.router.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.router.internal)
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+
 test_run:switch('default')
 test_run:cmd('stop server router_1')
 test_run:cmd('cleanup server router_1')
diff --git a/test/router/router.result b/test/router/router.result
index 3ebab5d..71e156c 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1024,11 +1024,10 @@ error_messages
 - - Use replicaset:callro(...) instead of replicaset.callro(...)
   - Use replicaset:connect_master(...) instead of replicaset.connect_master(...)
   - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
-  - Use replicaset:rebind_connections(...) instead of replicaset.rebind_connections(...)
   - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)
   - Use replicaset:call(...) instead of replicaset.call(...)
-  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
   - Use replicaset:connect(...) instead of replicaset.connect(...)
+  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
   - Use replicaset:callrw(...) instead of replicaset.callrw(...)
   - Use replicaset:connect_all(...) instead of replicaset.connect_all(...)
 ...
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..b7070e5 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -174,6 +174,67 @@ vshard.storage.module_version()
 check_reloaded()
 ---
 ...
+--
+-- Deprecate old replicaset and replica objets.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+---
+...
+err:match('Object replicaset is outdated.*')
+---
+- Object replicaset is outdated. Use new instance
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs.callro(rs, 'echo', {'some_data'})
+---
+...
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_, err = pcall(require, 'vshard.storage')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.storage.internal)
+---
+- true
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
 test_run:switch('default')
 ---
 - true
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..bf8e803 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -87,6 +87,30 @@ _ = require('vshard.storage')
 vshard.storage.module_version()
 check_reloaded()
 
+--
+-- Deprecate old replicaset and replica objets.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs.callro(rs, 'echo', {'some_data'})
+
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_, err = pcall(require, 'vshard.storage')
+err:match('Error injection:.*')
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.storage.internal)
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+
 test_run:switch('default')
 test_run:drop_cluster(REPLICASET_2)
 test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 99f59aa..48053a0 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -48,7 +48,8 @@ local lerror = require('vshard.error')
 local fiber = require('fiber')
 local luri = require('uri')
 local ffi = require('ffi')
-local gsc = require('vshard.util').generate_self_checker
+local util = require('vshard.util')
+local gsc = util.generate_self_checker
 
 --
 -- on_connect() trigger for net.box
@@ -338,26 +339,79 @@ local function replicaset_tostring(replicaset)
 end
 
 --
--- Rebind connections of old replicas to new ones.
+-- Deprecate old objects in case of reload.
+--
+local function clean_old_objects(old_replicaset_mt, old_replica_mt,
+                                 old_replicas)
+    local function get_deprecated_warning(obj_name)
+        return function(...)
+            local finfo = debug.getinfo(2)
+            local file = finfo.short_src
+            local name = finfo.name
+            local line = finfo.currentline
+            local err_fmt = '%s.%s:%s: Object %s is outdated. Use new instance'
+            error(string.format(err_fmt, file, name, line, obj_name))
+        end
+    end
+    local function replace_mt_methods(mt, method)
+        if not mt then
+            return
+        end
+        for name, _ in pairs(mt.__index) do
+            mt.__index[name] = method
+        end
+    end
+    replace_mt_methods(old_replicaset_mt, get_deprecated_warning('replicaset'))
+    replace_mt_methods(old_replica_mt, get_deprecated_warning('replica'))
+    for _, replica in pairs(old_replicas) do
+        replica.conn = nil
+    end
+    log.info('Old replicaset and replica objects are cleaned up.')
+end
+
 --
-local function replicaset_rebind_connections(replicaset)
-    for _, replica in pairs(replicaset.replicas) do
-        local old_replica = replica.old_replica
-        if old_replica then
-            local conn = old_replica.conn
-            replica.conn = conn
-            replica.down_ts = old_replica.down_ts
-            replica.net_timeout = old_replica.net_timeout
-            replica.net_sequential_ok = old_replica.net_sequential_ok
-            replica.net_sequential_fail = old_replica.net_sequential_fail
-            if conn then
-                conn.replica = replica
-                conn.replicaset = replicaset
-                old_replica.conn = nil
+-- Copy netbox connections from old replica objects to new ones.
+--
+local function rebind_replicasets(replicasets, deprecate, deprecate_timeout)
+    -- Collect data to deprecate old objects.
+    local old_replicaset_mt = nil
+    local old_replica_mt = nil
+    local old_replicas = {}
+    for _, replicaset in pairs(replicasets) do
+        if replicaset.old_replicaset then
+            local old_mt = getmetatable(replicaset.old_replicaset)
+            assert(old_replicaset_mt == nil or old_replicaset_mt == old_mt)
+            assert(not deprecate or
+                   old_replicaset_mt ~= getmetatable(replicaset))
+            old_replicaset_mt = old_mt
+            replicaset.old_replicaset = nil
+        end
+        for _, replica in pairs(replicaset.replicas) do
+            local old_replica = replica.old_replica
+            if old_replica then
+                local old_mt = getmetatable(old_replica)
+                assert(old_replica_mt == nil or old_replica_mt == old_mt)
+                assert(not deprecate or old_replica_mt ~= getmetatable(replica))
+                old_replica_mt = old_mt
+                table.insert(old_replicas, old_replica)
+                local conn = old_replica.conn
+                replica.conn = conn
+                replica.down_ts = old_replica.down_ts
+                replica.net_timeout = old_replica.net_timeout
+                replica.net_sequential_ok = old_replica.net_sequential_ok
+                replica.net_sequential_fail = old_replica.net_sequential_fail
+                if conn then
+                    conn.replica = replica
+                    conn.replicaset = replicaset
+                end
+                replica.old_replica = nil
             end
-            replica.old_replica = nil
         end
     end
+    if deprecate then
+        util.async_task(deprecate_timeout, clean_old_objects, old_replicaset_mt,
+                        old_replica_mt, old_replicas)
+    end
 end
 
 --
@@ -369,7 +423,6 @@ local replicaset_mt = {
         connect_master = replicaset_connect_master;
         connect_all = replicaset_connect_all;
         connect_replica = replicaset_connect_to_replica;
-        rebind_connections = replicaset_rebind_connections;
         down_replica_priority = replicaset_down_replica_priority;
         up_replica_priority = replicaset_up_replica_priority;
         call = replicaset_master_call;
@@ -523,6 +576,7 @@ local function buildall(sharding_cfg, old_replicasets)
             weight = replicaset.weight,
             bucket_count = 0,
             lock = replicaset.lock,
+            old_replicaset = old_replicaset,
         }, replicaset_mt)
         local priority_list = {}
         for replica_uuid, replica in pairs(replicaset.replicas) do
@@ -596,4 +650,5 @@ return {
     buildall = buildall,
     calculate_etalon_balance = cluster_calculate_etalon_balance,
     wait_masters_connect = wait_masters_connect,
+    rebind_replicasets = rebind_replicasets,
 }
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 1dee80c..c075436 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -1,5 +1,17 @@
 local log = require('log')
 local lfiber = require('fiber')
+
+local MODULE_INTERNALS = '__module_vshard_router'
+-- Reload requirements, in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+    local vshard_modules = {
+        'vshard.consts', 'vshard.error', 'vshard.cfg',
+        'vshard.hash', 'vshard.replicaset', 'vshard.util',
+    }
+    for _, module in pairs(vshard_modules) do
+        package.loaded[module] = nil
+    end
+end
 local consts = require('vshard.consts')
 local lerror = require('vshard.error')
 local lcfg = require('vshard.cfg')
@@ -7,7 +19,7 @@ local lhash = require('vshard.hash')
 local lreplicaset = require('vshard.replicaset')
 local util = require('vshard.util')
 
-local M = rawget(_G, '__module_vshard_router')
+local M = rawget(_G, MODULE_INTERNALS)
 if not M then
     M = {
         errinj = {
@@ -17,6 +29,8 @@ if not M then
         },
         -- Bucket map cache.
         route_map = {},
+        -- Time to deprecate old objects on reload.
+        reload_deprecate_timeout = nil,
         -- All known replicasets used for bucket re-balancing
         replicasets = nil,
         -- Fiber to maintain replica connections.
@@ -129,6 +143,9 @@ local function discovery_f(module_version)
         consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
     while module_version == M.module_version do
         for _, replicaset in pairs(M.replicasets) do
+            if module_version ~= M.module_version then
+                return
+            end
             local active_buckets, err =
                 replicaset:callro('vshard.storage.buckets_discovery', {},
                                   {timeout = 2})
@@ -457,8 +474,11 @@ end
 -- Configuration
 --------------------------------------------------------------------------------
 
+-- Distinguish reload from reconfigure.
+local module_reloading = true
 local function router_cfg(cfg)
     cfg = lcfg.check(cfg)
+    local current_cfg = table.deepcopy(cfg)
     if not M.replicasets then
         log.info('Starting router configuration')
     else
@@ -482,15 +502,16 @@ local function router_cfg(cfg)
     end
     M.total_bucket_count = total_bucket_count
     M.collect_lua_garbage = collect_lua_garbage
+    M.current_cfg = current_cfg
     -- TODO: update existing route map in-place
     M.route_map = {}
     M.replicasets = new_replicasets
     -- Move connections from an old configuration to a new one.
     -- It must be done with no yields to prevent usage both of not
     -- fully moved old replicasets, and not fully built new ones.
-    for _, replicaset in pairs(new_replicasets) do
-        replicaset:rebind_connections()
-    end
+    lreplicaset.rebind_replicasets(new_replicasets, module_reloading,
+                                   M.reload_deprecate_timeout)
+    module_reloading = false
     -- Now the new replicasets are fully built. Can establish
     -- connections and yield.
     for _, replicaset in pairs(new_replicasets) do
@@ -776,15 +797,16 @@ end
 -- About functions, saved in M, and reloading see comment in
 -- storage/init.lua.
 --
-M.discovery_f = discovery_f
-M.failover_f = failover_f
-
-if not rawget(_G, '__module_vshard_router') then
-    rawset(_G, '__module_vshard_router', M)
+if not rawget(_G, MODULE_INTERNALS) then
+    rawset(_G, MODULE_INTERNALS, M)
 else
+    router_cfg(M.current_cfg)
     M.module_version = M.module_version + 1
 end
 
+M.discovery_f = discovery_f
+M.failover_f = failover_f
+
 return {
     cfg = router_cfg;
     info = router_info;
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 879c7c4..b570821 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -2,14 +2,27 @@ local log = require('log')
 local luri = require('uri')
 local lfiber = require('fiber')
 local netbox = require('net.box') -- for net.box:self()
+local trigger = require('internal.trigger')
+
+local MODULE_INTERNALS = '__module_vshard_storage'
+-- Reload requirements, in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+    local vshard_modules = {
+        'vshard.consts', 'vshard.error', 'vshard.cfg',
+        'vshard.replicaset', 'vshard.util',
+    }
+    for _, module in pairs(vshard_modules) do
+        package.loaded[module] = nil
+    end
+end
 local consts = require('vshard.consts')
 local lerror = require('vshard.error')
-local util = require('vshard.util')
 local lcfg = require('vshard.cfg')
 local lreplicaset = require('vshard.replicaset')
-local trigger = require('internal.trigger')
+local util = require('vshard.util')
 
-local M = rawget(_G, '__module_vshard_storage')
+
+local M = rawget(_G, MODULE_INTERNALS)
 if not M then
     --
     -- The module is loaded for the first time.
@@ -21,6 +34,8 @@ if not M then
         -- See format in replicaset.lua.
         --
         replicasets = nil,
+        -- Time to deprecate old objects on reload.
+        reload_deprecate_timeout = nil,
         -- Triggers on master switch event. They are called right
         -- before the event occurs.
         _on_master_enable = trigger.new('_on_master_enable'),
@@ -1445,11 +1460,14 @@ end
 --------------------------------------------------------------------------------
 -- Configuration
 --------------------------------------------------------------------------------
+-- Distinguish reload from reconfigure.
+local module_reloading = true
 local function storage_cfg(cfg, this_replica_uuid)
     if this_replica_uuid == nil then
         error('Usage: cfg(configuration, this_replica_uuid)')
     end
     cfg = lcfg.check(cfg)
+    local current_cfg = table.deepcopy(cfg)
     if cfg.weights or cfg.zone then
         error('Weights and zone are not allowed for storage configuration')
     end
@@ -1572,9 +1590,8 @@ local function storage_cfg(cfg, this_replica_uuid)
     box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
 
     M.replicasets = new_replicasets
-    for _, replicaset in pairs(new_replicasets) do
-        replicaset:rebind_connections()
-    end
+    lreplicaset.rebind_replicasets(new_replicasets, module_reloading)
+    module_reloading = false
     M.this_replicaset = this_replicaset
     M.this_replica = this_replica
     M.total_bucket_count = total_bucket_count
@@ -1583,6 +1600,7 @@ local function storage_cfg(cfg, this_replica_uuid)
     M.shard_index = shard_index
     M.collect_bucket_garbage_interval = collect_bucket_garbage_interval
     M.collect_lua_garbage = collect_lua_garbage
+    M.current_cfg = current_cfg
 
     if was_master and not is_master then
         local_on_master_disable()
@@ -1813,6 +1831,14 @@ end
 -- restarted (or is restarted from M.background_f, which is not
 -- changed) and continues use old func1 and func2.
 --
+
+if not rawget(_G, MODULE_INTERNALS) then
+    rawset(_G, MODULE_INTERNALS, M)
+else
+    storage_cfg(M.current_cfg, M.this_replica.uuid)
+    M.module_version = M.module_version + 1
+end
+
 M.recovery_f = recovery_f
 M.collect_garbage_f = collect_garbage_f
 M.rebalancer_f = rebalancer_f
@@ -1828,12 +1854,6 @@ M.rebalancer_build_routes = rebalancer_build_routes
 M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
 M.cached_find_sharded_spaces = find_sharded_spaces
 
-if not rawget(_G, '__module_vshard_storage') then
-    rawset(_G, '__module_vshard_storage', M)
-else
-    M.module_version = M.module_version + 1
-end
-
 return {
     sync = sync,
     bucket_force_create = bucket_force_create,
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..f5bd78c 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -75,8 +75,28 @@ local function generate_self_checker(obj_name, func_name, mt, func)
     end
 end
 
+
+--
+-- Run a function without interrupting current fiber.
+-- @param delay Delay in seconds before the task should be
+--        executed.
+-- @param task Function to be executed.
+-- @param ... Arguments which would be passed to the `task`.
+--
+local function async_task(delay, task, ...)
+    assert(delay == nil or type(delay) == 'number')
+    local function wait_and_call(...)
+        if delay then
+            fiber.sleep(delay)
+        end
+        task(...)
+    end
+    fiber.create(wait_and_call, ...)
+end
+
 return {
     tuple_extract_key = tuple_extract_key,
     reloadable_fiber_f = reloadable_fiber_f,
     generate_self_checker = generate_self_checker,
+    async_task = async_task,
 }
-- 
2.14.1
