[tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Jun 27 14:34:17 MSK 2018


Please rebase on the latest master branch. When I do,
the tests fail.

On 27/06/2018 13:44, Alex Khatskevich wrote:
> 
> 
> On 27.06.2018 12:54, Vladislav Shpilevoy wrote:
>> Hello. Thanks for the fixes! See 4 comments below.
>>
>> On 26/06/2018 15:50, Alex Khatskevich wrote:
>>> Sorry, I forgot to attach the new patch here.
>>>
>>> commit 4ddab3c47c5963c3138701d89ba3091d5da9848a
>>> Author: AKhatskevich <avkhatskevich at tarantool.org>
>>> Date:   Thu Jun 14 14:03:07 2018 +0300
>>>
>>>      Reload reloadable fiber
>>>
>>>      Fixed a problem:
>>>      The `reloadable_fiber_f` was running an infinite while loop and
>>>      preventing the whole module from being reloaded.
>>>
>>>      This behavior is fixed by calling new version of `reloadable_fiber_f` in
>>>      a return statement instead of the where loop.  Note: calling a function
>>
>> 1. Again 'where loop'.
>>
> Sorry
>>>      in a return statement doesn't increase a stack size.
>>>
>>>      Extra changes:
>>>       * transfer write "started" message responsibility from caller to
>>>         reloadable fiber
>>>
>>>      Closes #116
>>>
>>> diff --git a/test/unit/util.result b/test/unit/util.result
>>> new file mode 100644
>>> index 0000000..dbfcce9
>>> --- /dev/null
>>> +++ b/test/unit/util.result
>>> @@ -0,0 +1,72 @@
>>> +test_run = require('test_run').new()
>>> +---
>>> +...
>>> +fiber = require('fiber')
>>> +---
>>> +...
>>> +log = require('log')
>>> +---
>>> +...
>>> +test_util = require('util')
>>> +---
>>> +...
>>> +util = require('vshard.util')
>>> +---
>>> +...
>>> +test_run:cmd("setopt delimiter ';'")
>>> +---
>>> +- true
>>> +...
>>> +fake_M = {
>>> +    reloadable_func = nil,
>>> +    module_version = 1,
>>> +};
>>> +---
>>> +...
>>> +test_run:cmd("setopt delimiter ''");
>>> +---
>>> +- true
>>> +...
>>> +-- Check autoreload on function change during failure.
>>> +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
>>> +---
>>> +...
>>> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
>>> +---
>>> +...
>>> +fib:cancel()
>>> +---
>>> +...
>>> +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has changed')
>>
>> 2. 'Has been' changed?
> I have changed it, but there are two notes:
> 1. Both are correct: https://english.stackexchange.com/questions/97243/has-been-changed-or-has-changed
> 2. vshard uses both options:
>     a. "Configuration has changed, restart"
>     b. "Rebalancer location has changed"
>>
>>> +---
>>> +- 'Worker_name: reloadable function reloadable_function has changed'
>>> +...
>>> diff --git a/vshard/util.lua b/vshard/util.lua
>>> index bb71318..920f53e 100644
>>> --- a/vshard/util.lua
>>> +++ b/vshard/util.lua
>>> @@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts)
>>> +local function reloadable_fiber_f(module, func_name, worker_name)
>>> +    log.info('%s has been started', worker_name)
>>> +    local func = module[func_name]
>>> +    local ok, err = pcall(func, module.module_version)
>>> +    if not ok then
>>> +        log.error('%s has been failed: %s', worker_name, err)
>>> +        if func ~= module[func_name] then
>>> +            log.warn('%s: reloadable function %s has changed',
>>> +                        worker_name, func_name)
>>
>> 3. Why are this check and warning here, after an error?
> Changed:
> 1. warn -> error
> 2. Comment added:
>          -- There is a chance that the error was raised during reload
>          -- (or caused by reload). Perform reload in case the function
>          -- has been changed.
>>
>>>           end
>>>       end
>>> +    -- yield serves two purposes:
>>> +    --  * makes this fiber cancellable
>>> +    --  * prevents 100% cpu consumption
>>> +    fiber.yield()
>>> +    log.info('%s is reloaded, restarting', worker_name)
>>
>> 4. Why do you print 'is reloaded' even if it has just thrown an error
>> and the function has not actually changed?
> 
> Fixed with goto.
>>
>>> +    -- LuaJIT drops this frame if the next function is called in
>>> +    -- a return statement.
>>> +    return M.reloadable_fiber_f(module, func_name, worker_name)
>>>   end
> 
> Full diff:
> 
> commit d098bf0930935b029fbca3f4fd78211dd5022fb1
> Author: AKhatskevich <avkhatskevich at tarantool.org>
> Date:   Thu Jun 14 14:03:07 2018 +0300
> 
>      Reload reloadable fiber
> 
>      Fixed a problem:
>      The `reloadable_fiber_f` was running an infinite while loop and
>      preventing the whole module from being reloaded.
> 
>      This behavior is fixed by calling the new version of `reloadable_fiber_f`
>      in a return statement instead of the while loop. Note: calling a
>      function in a return statement (a proper tail call) doesn't increase
>      the stack size.
> 
>      Extra changes:
>       * move responsibility for writing the "started" message from the
>         caller to the reloadable fiber
> 
>      Closes #116
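The commit message relies on Lua's proper tail calls. A minimal standalone sketch of why restarting through `return M.reloadable_fiber_f(...)` does not accumulate stack frames, while ordinary recursion would. The helpers `count_down` and `sum_down` are hypothetical and exist only for this illustration; they are not part of vshard.

```lua
-- count_down restarts itself through a proper tail call: the current
-- frame is replaced, so deep "restarting" does not grow the stack.
local function count_down(n)
    if n == 0 then
        return 'done'
    end
    return count_down(n - 1)
end

-- sum_down is NOT a tail call: the addition runs after the inner call
-- returns, so every level has to keep its frame alive.
local function sum_down(n)
    if n == 0 then
        return 0
    end
    return 1 + sum_down(n - 1)
end

print(count_down(1000000)) -- done
-- The non-tail version is expected to fail with a "stack overflow"
-- error, which pcall catches and reports as ok == false.
local ok = pcall(sum_down, 1000000)
print(ok)
```

The same property is what lets a fiber restart itself indefinitely on every module reload without its stack growing.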
> 
> diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result
> index 22100fe..6e3aa81 100644
> --- a/test/rebalancer/rebalancer.result
> +++ b/test/rebalancer/rebalancer.result
> @@ -424,7 +424,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
>   fiber = require('fiber')
>   ---
>   ...
> -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
> +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
>   ---
>   ...
>   while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
> diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua
> index 3327f30..922357a 100644
> --- a/test/rebalancer/rebalancer.test.lua
> +++ b/test/rebalancer/rebalancer.test.lua
> @@ -197,7 +197,7 @@ switch_rs1_master()
>   vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
> 
>   fiber = require('fiber')
> -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
> +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
>   while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
> 
>   --
> diff --git a/test/router/reload.result b/test/router/reload.result
> index 19a9ead..47f3c2e 100644
> --- a/test/router/reload.result
> +++ b/test/router/reload.result
> @@ -116,10 +116,10 @@ vshard.router.module_version()
>   check_reloaded()
>   ---
>   ...
> -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
>   ---
>   ...
> -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>   ---
>   ...
>   check_reloaded()
> diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
> index 6e21b74..af2939d 100644
> --- a/test/router/reload.test.lua
> +++ b/test/router/reload.test.lua
> @@ -59,8 +59,8 @@ vshard.router.module_version()
> 
>   check_reloaded()
> 
> -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> 
>   check_reloaded()
> 
> diff --git a/test/storage/reload.result b/test/storage/reload.result
> index f689cf4..531d984 100644
> --- a/test/storage/reload.result
> +++ b/test/storage/reload.result
> @@ -113,13 +113,13 @@ vshard.storage.module_version()
>   check_reloaded()
>   ---
>   ...
> -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
>   ---
>   ...
> -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
> +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
>   ---
>   ...
> -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
> +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
>   ---
>   ...
>   check_reloaded()
> diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
> index 6e19a92..64c3a60 100644
> --- a/test/storage/reload.test.lua
> +++ b/test/storage/reload.test.lua
> @@ -59,9 +59,9 @@ vshard.storage.module_version()
> 
>   check_reloaded()
> 
> -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
> -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
> +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
> +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
> 
>   check_reloaded()
> 
> diff --git a/test/unit/util.result b/test/unit/util.result
> new file mode 100644
> index 0000000..a7b176d
> --- /dev/null
> +++ b/test/unit/util.result
> @@ -0,0 +1,72 @@
> +test_run = require('test_run').new()
> +---
> +...
> +fiber = require('fiber')
> +---
> +...
> +log = require('log')
> +---
> +...
> +test_util = require('util')
> +---
> +...
> +util = require('vshard.util')
> +---
> +...
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +fake_M = {
> +    reloadable_func = nil,
> +    module_version = 1,
> +};
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> +-- Check autoreload on function change during failure.
> +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
> +---
> +...
> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +---
> +...
> +fib:cancel()
> +---
> +...
> +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed')
> +---
> +- 'Worker_name: reloadable function reloadable_function has been changed'
> +...
> +test_run:grep_log('default', 'Worker_name is reloaded, restarting')
> +---
> +- Worker_name is reloaded, restarting
> +...
> +test_run:grep_log('default', 'Worker_name has been started')
> +---
> +- Worker_name has been started
> +...
> +log.info(string.rep('a', 1000))
> +---
> +...
> +-- Check reload feature.
> +fake_M.reloadable_function = function () return true end
> +---
> +...
> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +---
> +...
> +fib:cancel()
> +---
> +...
> +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
> +---
> +- Worker_name is reloaded, restarting
> +...
> +test_run:grep_log('default', 'Worker_name has been started', 1000)
> +---
> +- Worker_name has been started
> +...
> diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
> new file mode 100644
> index 0000000..9359378
> --- /dev/null
> +++ b/test/unit/util.test.lua
> @@ -0,0 +1,29 @@
> +test_run = require('test_run').new()
> +fiber = require('fiber')
> +log = require('log')
> +test_util = require('util')
> +util = require('vshard.util')
> +
> +test_run:cmd("setopt delimiter ';'")
> +fake_M = {
> +    reloadable_func = nil,
> +    module_version = 1,
> +};
> +test_run:cmd("setopt delimiter ''");
> +
> +-- Check autoreload on function change during failure.
> +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
> +
> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +fib:cancel()
> +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed')
> +test_run:grep_log('default', 'Worker_name is reloaded, restarting')
> +test_run:grep_log('default', 'Worker_name has been started')
> +log.info(string.rep('a', 1000))
> +
> +-- Check reload feature.
> +fake_M.reloadable_function = function () return true end
> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +fib:cancel()
> +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
> +test_run:grep_log('default', 'Worker_name has been started', 1000)
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 21093e5..9e18b27 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -491,11 +491,9 @@ local function router_cfg(cfg)
>       end
>       lreplicaset.wait_masters_connect(new_replicasets)
>       if M.failover_fiber == nil then
> -        log.info('Start failover fiber')
>           lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
>       end
>       if M.discovery_fiber == nil then
> -        log.info('Start discovery fiber')
>           lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
>       end
>       -- Destroy connections, not used in a new configuration.
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 57076e1..300751f 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -692,10 +692,8 @@ local function local_on_master_enable()
>       M.collect_bucket_garbage_fiber =
>           lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
>                         'Garbage collector')
> -    log.info("GC is started")
>       M.recovery_fiber =
>           lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
> -    log.info('Recovery is started')
>       -- TODO: check current status
>       log.info("Took on replicaset master role")
>   end
> @@ -1585,7 +1583,6 @@ local function storage_cfg(cfg, this_replica_uuid)
> 
>       if min_master == this_replica then
>           if not M.rebalancer_fiber then
> -            log.info('Run rebalancer')
>               M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
>                                                  'rebalancer_f', 'Rebalancer')
>           else
> diff --git a/vshard/util.lua b/vshard/util.lua
> index bb71318..ce79930 100644
> --- a/vshard/util.lua
> +++ b/vshard/util.lua
> @@ -2,6 +2,19 @@
>   local log = require('log')
>   local fiber = require('fiber')
> 
> +local MODULE_INTERNALS = '__module_vshard_util'
> +local M = rawget(_G, MODULE_INTERNALS)
> +if not M then
> +    --
> +    -- The module is loaded for the first time.
> +    --
> +    M = {
> +        -- Latest versions of functions.
> +        reloadable_fiber_f = nil,
> +    }
> +    rawset(_G, MODULE_INTERNALS, M)
> +end
> +
>   --
>   -- Extract parts of a tuple.
>   -- @param tuple Tuple to extract a key from.
> @@ -19,33 +32,42 @@ local function tuple_extract_key(tuple, parts)
>   end
> 
>   --
> --- Wrapper to run @a func in infinite loop and restart it on the
> --- module reload. This function CAN NOT BE AUTORELOADED. To update
> --- it you must manualy stop all fibers, run by this function, do
> --- reload, and then restart all stopped fibers. This can be done,
> --- for example, by calling vshard.storage/router.cfg() again with
> --- the same config as earlier.
> +-- Wrapper to run a function in an infinite loop and restart it
> +-- on errors and on module reload.
> +-- To handle a module reload and run the new version of a
> +-- function in the module, the function should just return.
>   --
> --- @param func Reloadable function to run. It must accept current
> ---        module version as an argument, and interrupt itself,
> ---        when it is changed.
> --- @param worker_name Name of the function. Usual infinite fiber
> ---        represents a background subsystem, which has a name. For
> ---        example: "Garbage Collector", "Recovery", "Discovery",
> ---        "Rebalancer".
> --- @param M Module which can reload.
> +-- @param module Module which can be reloaded.
> +-- @param func_name Name of a function to be executed in the
> +--        module.
> +-- @param worker_name Name of the reloadable background subsystem.
> +--        For example: "Garbage Collector", "Recovery", "Discovery",
> +--        "Rebalancer". Used only for activity logging.
>   --
> -local function reloadable_fiber_f(M, func_name, worker_name)
> -    while true do
> -        local ok, err = pcall(M[func_name], M.module_version)
> -        if not ok then
> -            log.error('%s has been failed: %s', worker_name, err)
> -            fiber.yield()
> -        else
> -            log.info('%s has been reloaded', worker_name)
> -            fiber.yield()
> +local function reloadable_fiber_f(module, func_name, worker_name)
> +    log.info('%s has been started', worker_name)
> +    local func = module[func_name]
> +::reload_loop::
> +    local ok, err = pcall(func, module.module_version)
> +    -- yield serves two purposes:
> +    --  * makes this fiber cancellable
> +    --  * prevents 100% cpu consumption
> +    fiber.yield()
> +    if not ok then
> +        log.error('%s has been failed: %s', worker_name, err)
> +        if func == module[func_name] then
> +            goto reload_loop
>           end
> +        -- There is a chance that the error was raised during reload
> +        -- (or caused by reload). Perform reload in case the function
> +        -- has been changed.
> +        log.error('%s: reloadable function %s has been changed',
> +                  worker_name, func_name)
>       end
> +    log.info('%s is reloaded, restarting', worker_name)
> +    -- LuaJIT drops this frame if the next function is called in
> +    -- a return statement.
> +    return M.reloadable_fiber_f(module, func_name, worker_name)
>   end
> 
>   --
> @@ -75,8 +97,12 @@ local function generate_self_checker(obj_name, func_name, mt, func)
>       end
>   end
> 
> +-- Update the latest versions of functions.
> +M.reloadable_fiber_f = reloadable_fiber_f
> +
>   return {
>       tuple_extract_key = tuple_extract_key,
>       reloadable_fiber_f = reloadable_fiber_f,
>       generate_self_checker = generate_self_checker,
> +    internal = M,
>   }
> 
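For readers without a Tarantool instance at hand, a minimal plain-Lua sketch of how the patched wrapper hands control to a reloaded version of itself. It assumes coroutines in place of fibers (`coroutine.yield()` standing in for `fiber.yield()`), and the hypothetical `load_util()` stands in for re-executing `vshard/util.lua` during a reload. The `repeat ... until` loop is a restructuring of the patch's `goto reload_loop` with the same control flow, not code from the patch; logging is reduced to recording "started" events.

```lua
-- Hypothetical key playing the role of '__module_vshard_util'.
local MODULE_INTERNALS = '__module_example_util'

-- Hypothetical stand-in for (re)executing the util module body.
local function load_util(tag, events)
    -- Internals survive "reloads" because they live in _G, not in the
    -- freshly executed module body.
    local M = rawget(_G, MODULE_INTERNALS)
    if not M then
        M = {}
        rawset(_G, MODULE_INTERNALS, M)
    end

    local function reloadable_fiber_f(module, func_name)
        table.insert(events, tag .. ': started')
        local func = module[func_name]
        local ok
        -- Retry while the worker keeps failing and was not replaced.
        repeat
            ok = pcall(func, module.module_version)
            coroutine.yield() -- like fiber.yield(): let others run
        until ok or func ~= module[func_name]
        -- Proper tail call to the newest wrapper published in M.
        return M.reloadable_fiber_f(module, func_name)
    end

    M.reloadable_fiber_f = reloadable_fiber_f
    return { reloadable_fiber_f = reloadable_fiber_f }
end

-- Usage: start a worker with version 1, "reload", let it restart.
local events = {}
local util_v1 = load_util('v1', events)
local fake_M = { module_version = 1, worker = function() return true end }
local co = coroutine.create(util_v1.reloadable_fiber_f)
assert(coroutine.resume(co, fake_M, 'worker')) -- runs up to first yield
load_util('v2', events)                          -- reload the util module
assert(coroutine.resume(co))                     -- worker returned: restart
print(table.concat(events, ', '))                -- v1: started, v2: started
```

The design point is that the restart goes through `M.reloadable_fiber_f` rather than the local `reloadable_fiber_f`: a tail call to the local name would keep running the old closure forever, which is exactly what prevented the module from being reloaded before this patch.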



