[tarantool-patches] Re: [PATCH 2/4] Refactor reloadable fiber

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Aug 1 14:54:42 MSK 2018


Thanks for the patch! Pushed into the master.

On 31/07/2018 14:30, Alex Khatskevich wrote:
> full diff
> 
> commit 8afc94e2bb4ec880815e7f2154607d9f23a88e27
> Author: AKhatskevich <avkhatskevich at tarantool.org>
> Date:   Thu Jul 26 19:42:14 2018 +0300
> 
>      Refactor reloadable fiber
> 
>      Reloadable fiber changes:
>      * renamed reloadable_fiber_f -> reloadable_fiber_main_loop;
>        fibers are now started via reloadable_fiber_create
>      * reloadable_fiber_create creates the fiber and names it on its own
>      * worker name replaced with the function name in logs
>      * added extra data argument, which is passed to a function
> 
> diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result
> index 88cbaae..3df8d4b 100644
> --- a/test/rebalancer/rebalancer.result
> +++ b/test/rebalancer/rebalancer.result
> @@ -418,7 +418,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
>   fiber = require('fiber')
>   ---
>   ...
> -while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
> +while not test_run:grep_log('box_2_a', "rebalancer_f has been started") do fiber.sleep(0.1) end
>   ---
>   ...
>   while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
> diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua
> index 01f2061..0cfe9a9 100644
> --- a/test/rebalancer/rebalancer.test.lua
> +++ b/test/rebalancer/rebalancer.test.lua
> @@ -195,7 +195,7 @@ switch_rs1_master()
>   vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
> 
>   fiber = require('fiber')
> -while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
> +while not test_run:grep_log('box_2_a', "rebalancer_f has been started") do fiber.sleep(0.1) end
>   while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
> 
>   --
> diff --git a/test/router/reload.result b/test/router/reload.result
> index 88122aa..f0badc3 100644
> --- a/test/router/reload.result
> +++ b/test/router/reload.result
> @@ -116,10 +116,10 @@ vshard.router.module_version()
>   check_reloaded()
>   ---
>   ...
> -while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('router_1', 'failover_f has been started') == nil do fiber.sleep(0.1) end
>   ---
>   ...
> -while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'discovery_f has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>   ---
>   ...
>   check_reloaded()
> diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
> index 01b7163..528222a 100644
> --- a/test/router/reload.test.lua
> +++ b/test/router/reload.test.lua
> @@ -59,8 +59,8 @@ vshard.router.module_version()
> 
>   check_reloaded()
> 
> -while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'failover_f has been started') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('router_1', 'discovery_f has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> 
>   check_reloaded()
> 
> diff --git a/test/storage/reload.result b/test/storage/reload.result
> index b91b622..c354dba 100644
> --- a/test/storage/reload.result
> +++ b/test/storage/reload.result
> @@ -113,13 +113,13 @@ vshard.storage.module_version()
>   check_reloaded()
>   ---
>   ...
> -while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('storage_2_a', 'collect_garbage_f has been started') == nil do fiber.sleep(0.1) end
>   ---
>   ...
> -while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
> +while test_run:grep_log('storage_2_a', 'recovery_f has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
>   ---
>   ...
> -while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
> +while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
>   ---
>   ...
>   check_reloaded()
> diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
> index 9140299..a8df1df 100644
> --- a/test/storage/reload.test.lua
> +++ b/test/storage/reload.test.lua
> @@ -59,9 +59,9 @@ vshard.storage.module_version()
> 
>   check_reloaded()
> 
> -while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
> -while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
> +while test_run:grep_log('storage_2_a', 'collect_garbage_f has been started') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('storage_2_a', 'recovery_f has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
> +while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
> 
>   check_reloaded()
> 
> diff --git a/test/unit/garbage.result b/test/unit/garbage.result
> index 0da8ee1..a352bd8 100644
> --- a/test/unit/garbage.result
> +++ b/test/unit/garbage.result
> @@ -343,7 +343,7 @@ control.bucket_generation_collected
>   collect_f = vshard.storage.internal.collect_garbage_f
>   ---
>   ...
> -f = fiber.create(collect_f, vshard.storage.module_version())
> +f = fiber.create(collect_f)
>   ---
>   ...
>   fill_spaces_with_garbage()
> diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
> index db7821f..80d37e7 100644
> --- a/test/unit/garbage.test.lua
> +++ b/test/unit/garbage.test.lua
> @@ -149,7 +149,7 @@ control.bucket_generation_collected
>   -- Test continuous garbage collection via background fiber.
>   --
>   collect_f = vshard.storage.internal.collect_garbage_f
> -f = fiber.create(collect_f, vshard.storage.module_version())
> +f = fiber.create(collect_f)
>   fill_spaces_with_garbage()
>   -- Wait until garbage collection is finished.
>   while #s2:select{} ~= 2 do fiber.sleep(0.1) end
> diff --git a/test/unit/util.result b/test/unit/util.result
> index 30906d1..096e36f 100644
> --- a/test/unit/util.result
> +++ b/test/unit/util.result
> @@ -34,22 +34,22 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end
>   fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end
>   ---
>   ...
> -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')
>   ---
>   ...
> -while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
> +while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
>   ---
>   ...
>   fib:cancel()
>   ---
>   ...
> -test_run:grep_log('default', 'Worker_name is reloaded, restarting')
> +test_run:grep_log('default', 'module is reloaded, restarting')
>   ---
> -- Worker_name is reloaded, restarting
> +- module is reloaded, restarting
>   ...
> -test_run:grep_log('default', 'Worker_name has been started')
> +test_run:grep_log('default', 'reloadable_function has been started')
>   ---
> -- Worker_name has been started
> +- reloadable_function has been started
>   ...
>   log.info(string.rep('a', 1000))
>   ---
> @@ -58,16 +58,16 @@ log.info(string.rep('a', 1000))
>   fake_M.reloadable_function = function () fiber.sleep(0.01); return true end
>   ---
>   ...
> -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')
>   ---
>   ...
> -while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end
> +while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end
>   ---
>   ...
> -fib:cancel()
> +test_run:grep_log('default', 'reloadable_function has been started', 1000)
>   ---
> +- reloadable_function has been started
>   ...
> -test_run:grep_log('default', 'Worker_name has been started', 1000)
> +fib:cancel()
>   ---
> -- Worker_name has been started
>   ...
> diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
> index 131274c..5f39e06 100644
> --- a/test/unit/util.test.lua
> +++ b/test/unit/util.test.lua
> @@ -14,16 +14,16 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end
>   -- Check autoreload on function change during failure.
>   fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end
> 
> -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> -while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
> +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')
> +while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
>   fib:cancel()
> -test_run:grep_log('default', 'Worker_name is reloaded, restarting')
> -test_run:grep_log('default', 'Worker_name has been started')
> +test_run:grep_log('default', 'module is reloaded, restarting')
> +test_run:grep_log('default', 'reloadable_function has been started')
>   log.info(string.rep('a', 1000))
> 
>   -- Check reload feature.
>   fake_M.reloadable_function = function () fiber.sleep(0.01); return true end
> -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> -while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end
> +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')
> +while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end
> +test_run:grep_log('default', 'reloadable_function has been started', 1000)
>   fib:cancel()
> -test_run:grep_log('default', 'Worker_name has been started', 1000)
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 1a0ed2f..4cb19fd 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -149,9 +149,8 @@ end
>   -- Background fiber to perform discovery. It periodically scans
>   -- replicasets one by one and updates route_map.
>   --
> -local function discovery_f(module_version)
> -    lfiber.name('discovery_fiber')
> -    M.discovery_fiber = lfiber.self()
> +local function discovery_f()
> +    local module_version = M.module_version
>       local iterations_until_lua_gc =
>           consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
>       while module_version == M.module_version do
> @@ -459,9 +458,8 @@ end
>   -- tries to reconnect to the best replica. When the connection is
>   -- established, it replaces the original replica.
>   --
> -local function failover_f(module_version)
> -    lfiber.name('vshard.failover')
> -    M.failover_fiber = lfiber.self()
> +local function failover_f()
> +    local module_version = M.module_version
>       local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT,
>                                    consts.FAILOVER_DOWN_TIMEOUT)
>       -- This flag is used to avoid logging like:
> @@ -545,10 +543,12 @@ local function router_cfg(cfg)
>           M.route_map[bucket] = M.replicasets[rs.uuid]
>       end
>       if M.failover_fiber == nil then
> -        lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
> +        M.failover_fiber = util.reloadable_fiber_create(
> +            'vshard.failover', M, 'failover_f')
>       end
>       if M.discovery_fiber == nil then
> -        lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
> +        M.discovery_fiber = util.reloadable_fiber_create(
> +            'vshard.discovery', M, 'discovery_f')
>       end
>       -- Destroy connections, not used in a new configuration.
>       collectgarbage()
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index c1df0e6..5ec6898 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -352,10 +352,10 @@ end
>   -- @param module_version Module version, on which the current
>   --        function had been started. If the actual module version
>   --        appears to be changed, then stop recovery. It is
> ---        restarted in reloadable_fiber_f().
> +--        restarted in reloadable_fiber.
>   --
> -local function recovery_f(module_version)
> -    lfiber.name('vshard.recovery')
> +local function recovery_f()
> +    local module_version = M.module_version
>       local _bucket = box.space._bucket
>       M.buckets_to_recovery = {}
>       for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do
> @@ -728,10 +728,9 @@ local function local_on_master_enable()
>       M._on_master_enable:run()
>       -- Start background process to collect garbage.
>       M.collect_bucket_garbage_fiber =
> -        lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
> -                      'Garbage collector')
> +        util.reloadable_fiber_create('vshard.gc', M, 'collect_garbage_f')
>       M.recovery_fiber =
> -        lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
> +        util.reloadable_fiber_create('vshard.recovery', M, 'recovery_f')
>       -- TODO: check current status
>       log.info("Took on replicaset master role")
>   end
> @@ -1024,10 +1023,10 @@ end
>   -- @param module_version Module version, on which the current
>   --        function had been started. If the actual module version
>   --        appears to be changed, then stop GC. It is restarted
> ---        in reloadable_fiber_f().
> +--        in reloadable_fiber.
>   --
> -function collect_garbage_f(module_version)
> -    lfiber.name('vshard.gc')
> +function collect_garbage_f()
> +    local module_version = M.module_version
>       -- Collector controller. Changes of _bucket increments
>       -- bucket generation. Garbage collector has its own bucket
>       -- generation which is <= actual. Garbage collection is
> @@ -1351,8 +1350,8 @@ end
>   -- Background rebalancer. Works on a storage which has the
>   -- smallest replicaset uuid and which is master.
>   --
> -local function rebalancer_f(module_version)
> -    lfiber.name('vshard.rebalancer')
> +local function rebalancer_f()
> +    local module_version = M.module_version
>       while module_version == M.module_version do
>           while not M.is_rebalancer_active do
>               log.info('Rebalancer is disabled. Sleep')
> @@ -1642,8 +1641,8 @@ local function storage_cfg(cfg, this_replica_uuid)
> 
>       if min_master == this_replica then
>           if not M.rebalancer_fiber then
> -            M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
> -                                               'rebalancer_f', 'Rebalancer')
> +            M.rebalancer_fiber = util.reloadable_fiber_create(
> +                'vshard.rebalancer', M, 'rebalancer_f')
>           else
>               log.info('Wakeup rebalancer')
>               -- Configuration had changed. Time to rebalance.
> diff --git a/vshard/util.lua b/vshard/util.lua
> index fb875ce..ea676ff 100644
> --- a/vshard/util.lua
> +++ b/vshard/util.lua
> @@ -10,7 +10,7 @@ if not M then
>       --
>       M = {
>           -- Latest versions of functions.
> -        reloadable_fiber_f = nil,
> +        reloadable_fiber_main_loop = nil,
>       }
>       rawset(_G, MODULE_INTERNALS, M)
>   end
> @@ -34,40 +34,52 @@ end
>   --
>   -- Wrapper to run a func in infinite loop and restart it on
>   -- errors and module reload.
> --- To handle module reload and run new version of a function
> --- in the module, the function should just return.
> ---
> --- @param module Module which can be reloaded.
> --- @param func_name Name of a function to be executed in the
> ---        module.
> --- @param worker_name Name of the reloadable background subsystem.
> ---        For example: "Garbage Collector", "Recovery", "Discovery",
> ---        "Rebalancer". Used only for an activity logging.
> +-- This loop executes the latest version of itself in case of
> +-- reload of that module.
> +-- See description of parameters in `reloadable_fiber_create`.
>   --
> -local function reloadable_fiber_f(module, func_name, worker_name)
> -    log.info('%s has been started', worker_name)
> +local function reloadable_fiber_main_loop(module, func_name)
> +    log.info('%s has been started', func_name)
>       local func = module[func_name]
> -::reload_loop::
> -    local ok, err = pcall(func, module.module_version)
> -    -- yield serves two pursoses:
> +::restart_loop::
> +    local ok, err = pcall(func)
> +    -- yield serves two purposes:
>       --  * makes this fiber cancellable
>       --  * prevents 100% cpu consumption
>       fiber.yield()
>       if not ok then
> -        log.error('%s has been failed: %s', worker_name, err)
> +        log.error('%s has been failed: %s', func_name, err)
>           if func == module[func_name] then
> -            goto reload_loop
> +            goto restart_loop
>           end
>           -- There is a chance that error was raised during reload
>           -- (or caused by reload). Perform reload in case function
>           -- has been changed.
> -        log.error('%s: reloadable function %s has been changed',
> -                  worker_name, func_name)
> +        log.error('reloadable function %s has been changed', func_name)
>       end
> -    log.info('%s is reloaded, restarting', worker_name)
> +    log.info('module is reloaded, restarting')
>       -- luajit drops this frame if next function is called in
>       -- return statement.
> -    return M.reloadable_fiber_f(module, func_name, worker_name)
> +    return M.reloadable_fiber_main_loop(module, func_name)
> +end
> +
> +--
> +-- Create a new fiber which runs a function in a loop. This loop
> +-- is aware of reload mechanism and it loads a new version of the
> +-- function in that case.
> +-- To handle module reload and run new version of a function
> +-- in the module, the function should just return.
> +-- @param fiber_name Name of a new fiber. E.g. "vshard.rebalancer".
> +-- @param module Module which can be reloaded.
> +-- @param func_name Name of a function to be executed in the
> +--        module.
> +-- @retval New fiber.
> +--
> +local function reloadable_fiber_create(fiber_name, module, func_name)
> +    assert(type(fiber_name) == 'string')
> +    local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name)
> +    xfiber:name(fiber_name)
> +    return xfiber
>   end
> 
>   --
> @@ -98,7 +110,7 @@ local function generate_self_checker(obj_name, func_name, mt, func)
>   end
> 
>   -- Update latest versions of function
> -M.reloadable_fiber_f = reloadable_fiber_f
> +M.reloadable_fiber_main_loop = reloadable_fiber_main_loop
> 
>   local function sync_task(delay, task, ...)
>       if delay then
> @@ -121,7 +133,7 @@ end
> 
>   return {
>       tuple_extract_key = tuple_extract_key,
> -    reloadable_fiber_f = reloadable_fiber_f,
> +    reloadable_fiber_create = reloadable_fiber_create,
>       generate_self_checker = generate_self_checker,
>       async_task = async_task,
>       internal = M,
> 
> 




More information about the Tarantool-patches mailing list