* [tarantool-patches] [PATCH][vshard] Reload reloadable fiber @ 2018-06-14 11:42 AKhatskevich 2018-06-14 11:58 ` AKhatskevich 2018-06-21 12:04 ` [tarantool-patches] " Vladislav Shpilevoy 0 siblings, 2 replies; 11+ messages in thread From: AKhatskevich @ 2018-06-14 11:42 UTC (permalink / raw) To: v.shpilevoy, tarantool-patches Fixed a problem: The `reloadable_fiber_f` was running an infinite where loop and preventing the whole module from being reloaded. This behavior is fixed by calling new version of `reloadable_fiber_f` in a return statement instead of the where loop. Note: calling a function in a return statement doesn't increase a stack size. Closes #116 --- Branch: https://github.com/tarantool/vshard/tree/kh/gh-116-reloadable Issue: https://github.com/tarantool/vshard/issues/116 test/router/reload.result | 4 +-- test/router/reload.test.lua | 4 +-- test/storage/reload.result | 6 ++-- test/storage/reload.test.lua | 6 ++-- vshard/util.lua | 78 +++++++++++++++++++++++++++++++------------- 5 files changed, 65 insertions(+), 33 deletions(-) diff --git a/test/router/reload.result b/test/router/reload.result index 19a9ead..47f3c2e 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -116,10 +116,10 @@ vshard.router.module_version() check_reloaded() --- ... -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end --- ... -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end --- ... 
check_reloaded() diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 6e21b74..af2939d 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -59,8 +59,8 @@ vshard.router.module_version() check_reloaded() -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end check_reloaded() diff --git a/test/storage/reload.result b/test/storage/reload.result index f689cf4..531d984 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -113,13 +113,13 @@ vshard.storage.module_version() check_reloaded() --- ... -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end --- ... -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end --- ... -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end --- ... 
check_reloaded() diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 6e19a92..64c3a60 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -59,9 +59,9 @@ vshard.storage.module_version() check_reloaded() -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end check_reloaded() diff --git a/vshard/util.lua b/vshard/util.lua index bb71318..fa51701 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -2,6 +2,24 @@ local log = require('log') local fiber = require('fiber') +local MODULE_INTERNALS = '__module_vshard_util' +local M = rawget(_G, MODULE_INTERNALS) +if not M then + -- + -- The module is loaded for the first time. + -- + M = { + -- Latest versions of functions. + reloadable_fiber_f = nil, + errinj = { + RELOADABLE_STACK_MAX = nil, + RELOADABLE_EXIT = nil, + } + + } + rawset(_G, MODULE_INTERNALS, M) +end + -- -- Extract parts of a tuple. -- @param tuple Tuple to extract a key from. @@ -19,33 +37,43 @@ local function tuple_extract_key(tuple, parts) end -- --- Wrapper to run @a func in infinite loop and restart it on the --- module reload. This function CAN NOT BE AUTORELOADED. To update --- it you must manualy stop all fibers, run by this function, do --- reload, and then restart all stopped fibers. 
This can be done, --- for example, by calling vshard.storage/router.cfg() again with --- the same config as earlier. +-- Wrapper to run a func in infinite loop and restart it on +-- errors and module reload. +-- To handle module reload and run new version of a function +-- in the module, the function should just return. -- --- @param func Reloadable function to run. It must accept current --- module version as an argument, and interrupt itself, --- when it is changed. --- @param worker_name Name of the function. Usual infinite fiber --- represents a background subsystem, which has a name. For --- example: "Garbage Collector", "Recovery", "Discovery", --- "Rebalancer". --- @param M Module which can reload. +-- @param module Module which can be reloaded. +-- @param func_name Name of a function to be executed in the +-- module. +-- @param worker_name Name of the reloadable background subsystem. +-- For example: "Garbage Collector", "Recovery", "Discovery", +-- "Rebalancer". Used only for an activity logging. -- -local function reloadable_fiber_f(M, func_name, worker_name) - while true do - local ok, err = pcall(M[func_name], M.module_version) - if not ok then - log.error('%s has been failed: %s', worker_name, err) - fiber.yield() - else - log.info('%s has been reloaded', worker_name) - fiber.yield() +local function reloadable_fiber_f(module, func_name, worker_name) + log.info('%s has been started', worker_name) + local func = module[func_name] + local ok, err = pcall(func, module.module_version) + if not ok then + log.error('%s has been failed: %s', worker_name, err) + if func ~= module[func_name] then + log.warn('%s reloadable function %s has changed', + worker_name, func_name) end end + fiber.yield() + log.info('%s is reloading', worker_name) + if M.errinj.RELOADABLE_EXIT then + return + end + -- luajit drops this frame if next function is called in + -- return statement. 
+ if M.errinj.RELOADABLE_STACK_MAX then + M.errinj.RELOADABLE_STACK_MAX = M.errinj.RELOADABLE_STACK_MAX - 1 + if M.errinj.RELOADABLE_STACK_MAX == 0 then + return + end + end + return M.reloadable_fiber_f(module, func_name, worker_name) end -- @@ -75,8 +103,12 @@ local function generate_self_checker(obj_name, func_name, mt, func) end end +-- Update latest versions of function +M.reloadable_fiber_f = reloadable_fiber_f + return { tuple_extract_key = tuple_extract_key, reloadable_fiber_f = reloadable_fiber_f, generate_self_checker = generate_self_checker, + internal = M, } -- 2.14.1 ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] [PATCH][vshard] Reload reloadable fiber 2018-06-14 11:42 [tarantool-patches] [PATCH][vshard] Reload reloadable fiber AKhatskevich @ 2018-06-14 11:58 ` AKhatskevich 2018-06-21 12:04 ` [tarantool-patches] " Vladislav Shpilevoy 1 sibling, 0 replies; 11+ messages in thread From: AKhatskevich @ 2018-06-14 11:58 UTC (permalink / raw) To: v.shpilevoy, tarantool-patches Fixed a problem: The `reloadable_fiber_f` was running an infinite where loop and preventing the whole module from being reloaded. This behavior is fixed by calling new version of `reloadable_fiber_f` in a return statement instead of the where loop. Note: calling a function in a return statement doesn't increase a stack size. Closes #116 --- Branch: https://github.com/tarantool/vshard/tree/kh/gh-116-reloadable Issue: https://github.com/tarantool/vshard/issues/116 Change: * Add tests. test/router/reload.result | 4 +- test/router/reload.test.lua | 4 +- test/storage/reload.result | 6 +-- test/storage/reload.test.lua | 6 +-- test/unit/util.result | 107 +++++++++++++++++++++++++++++++++++++++++++ test/unit/util.test.lua | 50 ++++++++++++++++++++ vshard/util.lua | 78 +++++++++++++++++++++---------- 7 files changed, 222 insertions(+), 33 deletions(-) create mode 100644 test/unit/util.result create mode 100644 test/unit/util.test.lua diff --git a/test/router/reload.result b/test/router/reload.result index 19a9ead..47f3c2e 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -116,10 +116,10 @@ vshard.router.module_version() check_reloaded() --- ... -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end --- ... 
-while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end --- ... check_reloaded() diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 6e21b74..af2939d 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -59,8 +59,8 @@ vshard.router.module_version() check_reloaded() -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end check_reloaded() diff --git a/test/storage/reload.result b/test/storage/reload.result index f689cf4..531d984 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -113,13 +113,13 @@ vshard.storage.module_version() check_reloaded() --- ... -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end --- ... -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end --- ... 
-while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end --- ... check_reloaded() diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 6e19a92..64c3a60 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -59,9 +59,9 @@ vshard.storage.module_version() check_reloaded() -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end check_reloaded() diff --git a/test/unit/util.result b/test/unit/util.result new file mode 100644 index 0000000..ea9edfa --- /dev/null +++ b/test/unit/util.result @@ -0,0 +1,107 @@ +test_run = require('test_run').new() +--- +... +util = require('vshard.util') +--- +... +test_util = require('util') +--- +... +log = require('log') +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +fake_M = { + reloadable_func = nil, + module_version = 1, +}; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Check autoreload on function change during failure. +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end +--- +... 
+util.internal.errinj.RELOADABLE_FUNCTION_CHANGED = true +--- +... +util.internal.errinj.RELOADABLE_EXIT = true +--- +... +util.reloadable_fiber_f(fake_M, 'reloadable_function', 'Worker_name') +--- +... +test_run:grep_log('default', 'reloadable function reloadable_function has changed') +--- +- reloadable function reloadable_function has changed +... +test_run:grep_log('default', 'is reloading') +--- +- is reloading +... +util.internal.errinj.RELOADABLE_FUNCTION_CHANGED = nil +--- +... +log.info(string.rep('a', 1000)) +--- +... +-- Check reload feature. +fake_M.reloadable_function = function () return true end +--- +... +util.reloadable_fiber_f(fake_M, 'reloadable_function', 'Worker_name') +--- +... +test_run:grep_log('default', 'is reloading', 1000) +--- +- is reloading +... +stack_size_log = {} +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +fake_M.reloadable_function = function () + table.insert(stack_size_log, #debug.traceback()) +end +test_run:cmd("setopt delimiter ''"); +--- +... +-- Chack that stack size is not increased by recursive call of +-- `reloadable_fiber_f`. +util.internal.errinj.RELOADABLE_EXIT = nil +--- +... +util.internal.errinj.RELOADABLE_STACK_MAX = 10 +--- +... +util.reloadable_fiber_f(fake_M, 'reloadable_function', 'Worker_name') +--- +... +#stack_size_log +--- +- 10 +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +for _, frame_size in pairs(stack_size_log) do + if stack_size_log[1] ~= frame_size then + error("Stack grows.") + end +end +test_run:cmd("setopt delimiter ''"); +--- +... +util.internal.errinj.RELOADABLE_STACK_MAX = nil +--- +... 
diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua new file mode 100644 index 0000000..fa0aff2 --- /dev/null +++ b/test/unit/util.test.lua @@ -0,0 +1,50 @@ +test_run = require('test_run').new() +util = require('vshard.util') +test_util = require('util') +log = require('log') + +test_run:cmd("setopt delimiter ';'") +fake_M = { + reloadable_func = nil, + module_version = 1, +}; +test_run:cmd("setopt delimiter ''"); + +-- Check autoreload on function change during failure. +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end + +util.internal.errinj.RELOADABLE_FUNCTION_CHANGED = true +util.internal.errinj.RELOADABLE_EXIT = true + +util.reloadable_fiber_f(fake_M, 'reloadable_function', 'Worker_name') +test_run:grep_log('default', 'reloadable function reloadable_function has changed') +test_run:grep_log('default', 'is reloading') +util.internal.errinj.RELOADABLE_FUNCTION_CHANGED = nil +log.info(string.rep('a', 1000)) + +-- Check reload feature. +fake_M.reloadable_function = function () return true end +util.reloadable_fiber_f(fake_M, 'reloadable_function', 'Worker_name') +test_run:grep_log('default', 'is reloading', 1000) + +stack_size_log = {} +test_run:cmd("setopt delimiter ';'") +fake_M.reloadable_function = function () + table.insert(stack_size_log, #debug.traceback()) +end +test_run:cmd("setopt delimiter ''"); + +-- Chack that stack size is not increased by recursive call of +-- `reloadable_fiber_f`. 
+util.internal.errinj.RELOADABLE_EXIT = nil +util.internal.errinj.RELOADABLE_STACK_MAX = 10 +util.reloadable_fiber_f(fake_M, 'reloadable_function', 'Worker_name') +#stack_size_log +test_run:cmd("setopt delimiter ';'") +for _, frame_size in pairs(stack_size_log) do + if stack_size_log[1] ~= frame_size then + error("Stack grows.") + end +end +test_run:cmd("setopt delimiter ''"); +util.internal.errinj.RELOADABLE_STACK_MAX = nil diff --git a/vshard/util.lua b/vshard/util.lua index bb71318..fa51701 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -2,6 +2,24 @@ local log = require('log') local fiber = require('fiber') +local MODULE_INTERNALS = '__module_vshard_util' +local M = rawget(_G, MODULE_INTERNALS) +if not M then + -- + -- The module is loaded for the first time. + -- + M = { + -- Latest versions of functions. + reloadable_fiber_f = nil, + errinj = { + RELOADABLE_STACK_MAX = nil, + RELOADABLE_EXIT = nil, + } + + } + rawset(_G, MODULE_INTERNALS, M) +end + -- -- Extract parts of a tuple. -- @param tuple Tuple to extract a key from. @@ -19,33 +37,43 @@ local function tuple_extract_key(tuple, parts) end -- --- Wrapper to run @a func in infinite loop and restart it on the --- module reload. This function CAN NOT BE AUTORELOADED. To update --- it you must manualy stop all fibers, run by this function, do --- reload, and then restart all stopped fibers. This can be done, --- for example, by calling vshard.storage/router.cfg() again with --- the same config as earlier. +-- Wrapper to run a func in infinite loop and restart it on +-- errors and module reload. +-- To handle module reload and run new version of a function +-- in the module, the function should just return. -- --- @param func Reloadable function to run. It must accept current --- module version as an argument, and interrupt itself, --- when it is changed. --- @param worker_name Name of the function. Usual infinite fiber --- represents a background subsystem, which has a name. 
For --- example: "Garbage Collector", "Recovery", "Discovery", --- "Rebalancer". --- @param M Module which can reload. +-- @param module Module which can be reloaded. +-- @param func_name Name of a function to be executed in the +-- module. +-- @param worker_name Name of the reloadable background subsystem. +-- For example: "Garbage Collector", "Recovery", "Discovery", +-- "Rebalancer". Used only for an activity logging. -- -local function reloadable_fiber_f(M, func_name, worker_name) - while true do - local ok, err = pcall(M[func_name], M.module_version) - if not ok then - log.error('%s has been failed: %s', worker_name, err) - fiber.yield() - else - log.info('%s has been reloaded', worker_name) - fiber.yield() +local function reloadable_fiber_f(module, func_name, worker_name) + log.info('%s has been started', worker_name) + local func = module[func_name] + local ok, err = pcall(func, module.module_version) + if not ok then + log.error('%s has been failed: %s', worker_name, err) + if func ~= module[func_name] then + log.warn('%s reloadable function %s has changed', + worker_name, func_name) end end + fiber.yield() + log.info('%s is reloading', worker_name) + if M.errinj.RELOADABLE_EXIT then + return + end + -- luajit drops this frame if next function is called in + -- return statement. + if M.errinj.RELOADABLE_STACK_MAX then + M.errinj.RELOADABLE_STACK_MAX = M.errinj.RELOADABLE_STACK_MAX - 1 + if M.errinj.RELOADABLE_STACK_MAX == 0 then + return + end + end + return M.reloadable_fiber_f(module, func_name, worker_name) end -- @@ -75,8 +103,12 @@ local function generate_self_checker(obj_name, func_name, mt, func) end end +-- Update latest versions of function +M.reloadable_fiber_f = reloadable_fiber_f + return { tuple_extract_key = tuple_extract_key, reloadable_fiber_f = reloadable_fiber_f, generate_self_checker = generate_self_checker, + internal = M, } -- 2.14.1 ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber 2018-06-14 11:42 [tarantool-patches] [PATCH][vshard] Reload reloadable fiber AKhatskevich 2018-06-14 11:58 ` AKhatskevich @ 2018-06-21 12:04 ` Vladislav Shpilevoy 2018-06-26 12:43 ` Alex Khatskevich 1 sibling, 1 reply; 11+ messages in thread From: Vladislav Shpilevoy @ 2018-06-21 12:04 UTC (permalink / raw) To: AKhatskevich, tarantool-patches Hello. Thanks for the patch! See my 6 comments below. On 14/06/2018 14:42, AKhatskevich wrote: > Fixed a problem: > The `reloadable_fiber_f` was running an infinite where loop and 1. What is a 'where loop'? > preventing the whole module from being reloaded. > > This behavior is fixed by calling new version of `reloadable_fiber_f` in > a return statement instead of the where loop. Note: calling a function > in a return statement doesn't increase a stack size. > > Closes #116 > --- > Branch: https://github.com/tarantool/vshard/tree/kh/gh-116-reloadable > Issue: https://github.com/tarantool/vshard/issues/116 > > test/router/reload.result | 4 +-- > test/router/reload.test.lua | 4 +-- > test/storage/reload.result | 6 ++-- > test/storage/reload.test.lua | 6 ++-- > vshard/util.lua | 78 +++++++++++++++++++++++++++++++------------- > 5 files changed, 65 insertions(+), 33 deletions(-) > > diff --git a/test/router/reload.result b/test/router/reload.result > index 19a9ead..47f3c2e 100644 > --- a/test/router/reload.result > +++ b/test/router/reload.result > @@ -116,10 +116,10 @@ vshard.router.module_version() > check_reloaded() > --- > ... > -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end > +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end 2. Why? Please, leave the old message. Router already writes that failover is started in router_cfg. In other places the same. 
> diff --git a/vshard/util.lua b/vshard/util.lua > index bb71318..fa51701 100644 > --- a/vshard/util.lua > +++ b/vshard/util.lua > @@ -2,6 +2,24 @@ > local log = require('log') > local fiber = require('fiber') > > +local MODULE_INTERNALS = '__module_vshard_util' > +local M = rawget(_G, MODULE_INTERNALS) > +if not M then > + -- > + -- The module is loaded for the first time. > + -- > + M = { > + -- Latest versions of functions. > + reloadable_fiber_f = nil, > + errinj = { > + RELOADABLE_STACK_MAX = nil, 3. What is the point of this error injection? It tests Lua, not VShard, as I think. And it takes too many lines in the reloadable fiber function complicating its understanding. So lets remove. > + RELOADABLE_EXIT = nil, > + } > + > + } > + rawset(_G, MODULE_INTERNALS, M) > +end > + > @@ -19,33 +37,43 @@ local function tuple_extract_key(tuple, parts) > end > > -- > --- Wrapper to run @a func in infinite loop and restart it on the > --- module reload. This function CAN NOT BE AUTORELOADED. To update > --- it you must manualy stop all fibers, run by this function, do > --- reload, and then restart all stopped fibers. This can be done, > --- for example, by calling vshard.storage/router.cfg() again with > --- the same config as earlier. > +-- Wrapper to run a func in infinite loop and restart it on > +-- errors and module reload. > +-- To handle module reload and run new version of a function > +-- in the module, the function should just return. > -- > --- @param func Reloadable function to run. It must accept current > --- module version as an argument, and interrupt itself, > --- when it is changed. > --- @param worker_name Name of the function. Usual infinite fiber > --- represents a background subsystem, which has a name. For > --- example: "Garbage Collector", "Recovery", "Discovery", > --- "Rebalancer". > --- @param M Module which can reload. > +-- @param module Module which can be reloaded. 
> +-- @param func_name Name of a function to be executed in the > +-- module. > +-- @param worker_name Name of the reloadable background subsystem. > +-- For example: "Garbage Collector", "Recovery", "Discovery", > +-- "Rebalancer". Used only for an activity logging. > -- > -local function reloadable_fiber_f(M, func_name, worker_name) > - while true do > - local ok, err = pcall(M[func_name], M.module_version) > - if not ok then > - log.error('%s has been failed: %s', worker_name, err) > - fiber.yield() > - else > - log.info('%s has been reloaded', worker_name) > - fiber.yield() > +local function reloadable_fiber_f(module, func_name, worker_name) > + log.info('%s has been started', worker_name) > + local func = module[func_name] > + local ok, err = pcall(func, module.module_version) > + if not ok then > + log.error('%s has been failed: %s', worker_name, err) > + if func ~= module[func_name] then > + log.warn('%s reloadable function %s has changed', > + worker_name, func_name) > end > end > + fiber.yield() > + log.info('%s is reloading', worker_name) > + if M.errinj.RELOADABLE_EXIT then > + return 4. How is this error possible? There are no lines in reloadable_fiber_f that can terminate the fiber. 5. Now on any reload I see two messages: started reloading started reloading But actually the fiber is started once. Please, return the old messages. > diff --git a/test/unit/util.result b/test/unit/util.result > new file mode 100644 > index 0000000..ea9edfa > --- /dev/null > +++ b/test/unit/util.result > @@ -0,0 +1,107 @@ > +test_run = require('test_run').new() > +--- > +... > +util = require('vshard.util') > +--- > +... > +test_util = require('util') 6. Unused variable? > +--- > +... > +log = require('log') > +--- > +... ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber 2018-06-21 12:04 ` [tarantool-patches] " Vladislav Shpilevoy @ 2018-06-26 12:43 ` Alex Khatskevich 2018-06-26 12:50 ` Alex Khatskevich 0 siblings, 1 reply; 11+ messages in thread From: Alex Khatskevich @ 2018-06-26 12:43 UTC (permalink / raw) To: Vladislav Shpilevoy, tarantool-patches >> Fixed a problem: >> The `reloadable_fiber_f` was running an infinite where loop and > > 1. What is a 'where loop'? where -> while >> -while test_run:grep_log('router_1', 'Failover has been reloaded') == >> nil do fiber.sleep(0.1) end >> +while test_run:grep_log('router_1', 'Failover has been started') == >> nil do fiber.sleep(0.1) end > > 2. Why? Please, leave the old message. Router already writes that > failover is started in router_cfg. In other places the same. Discussed verbally. After this commit, it is the reloadable fiber's responsibility to write the "started" message, which looks like 'Worker_name has been started'. The reloaded message looks like 'Worker_name is reloaded, restarting'. >> + -- >> + -- The module is loaded for the first time. >> + -- >> + M = { >> + -- Latest versions of functions. >> + reloadable_fiber_f = nil, >> + errinj = { >> + RELOADABLE_STACK_MAX = nil, > > 3. What is the point of this error injection? It > tests Lua, not VShard, as I think. And it takes > too many lines in the reloadable fiber function > complicating its understanding. So lets remove. deleted >> +-- @param module Module which can be reloaded. >> +-- @param func_name Name of a function to be executed in the >> +-- module. >> +-- @param worker_name Name of the reloadable background subsystem. >> +-- For example: "Garbage Collector", "Recovery", "Discovery", >> +-- "Rebalancer". Used only for an activity logging. 
>> -- >> -local function reloadable_fiber_f(M, func_name, worker_name) >> - while true do >> - local ok, err = pcall(M[func_name], M.module_version) >> - if not ok then >> - log.error('%s has been failed: %s', worker_name, err) >> - fiber.yield() >> - else >> - log.info('%s has been reloaded', worker_name) >> - fiber.yield() >> +local function reloadable_fiber_f(module, func_name, worker_name) >> + log.info('%s has been started', worker_name) >> + local func = module[func_name] >> + local ok, err = pcall(func, module.module_version) >> + if not ok then >> + log.error('%s has been failed: %s', worker_name, err) >> + if func ~= module[func_name] then >> + log.warn('%s reloadable function %s has changed', >> + worker_name, func_name) >> end >> end >> + fiber.yield() >> + log.info('%s is reloading', worker_name) >> + if M.errinj.RELOADABLE_EXIT then >> + return > > 4. How is this error possible? There are no lines in reloadable_fiber_f > that can terminate the fiber. deleted > > 5. Now on any reload I see two messages: > > started > reloading > started > reloading > > But actually the fiber is started once. Please, return the old > messages. > discussed verbally >> +++ b/test/unit/util.result >> @@ -0,0 +1,107 @@ >> +test_run = require('test_run').new() >> +--- >> +... >> +util = require('vshard.util') >> +--- >> +... >> +test_util = require('util') > > 6. Unused variable? deleted ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber 2018-06-26 12:43 ` Alex Khatskevich @ 2018-06-26 12:50 ` Alex Khatskevich 2018-06-27 9:54 ` Vladislav Shpilevoy 0 siblings, 1 reply; 11+ messages in thread From: Alex Khatskevich @ 2018-06-26 12:50 UTC (permalink / raw) To: Vladislav Shpilevoy, tarantool-patches Sorry, I forgot to put the new patch here commit 4ddab3c47c5963c3138701d89ba3091d5da9848a Author: AKhatskevich <avkhatskevich@tarantool.org> Date: Thu Jun 14 14:03:07 2018 +0300 Reload reloadable fiber Fixed a problem: The `reloadable_fiber_f` was running an infinite while loop and preventing the whole module from being reloaded. This behavior is fixed by calling new version of `reloadable_fiber_f` in a return statement instead of the while loop. Note: calling a function in a return statement doesn't increase a stack size. Extra changes: * move the responsibility for writing the "started" message from the caller to the reloadable fiber Closes #116 diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result index 22100fe..6e3aa81 100644 --- a/test/rebalancer/rebalancer.result +++ b/test/rebalancer/rebalancer.result @@ -424,7 +424,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) fiber = require('fiber') --- ... -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end --- ... 
while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua index 3327f30..922357a 100644 --- a/test/rebalancer/rebalancer.test.lua +++ b/test/rebalancer/rebalancer.test.lua @@ -197,7 +197,7 @@ switch_rs1_master() vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) fiber = require('fiber') -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end -- diff --git a/test/router/reload.result b/test/router/reload.result index 19a9ead..47f3c2e 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -116,10 +116,10 @@ vshard.router.module_version() check_reloaded() --- ... -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end --- ... -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end --- ... 
check_reloaded() diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 6e21b74..af2939d 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -59,8 +59,8 @@ vshard.router.module_version() check_reloaded() -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end check_reloaded() diff --git a/test/storage/reload.result b/test/storage/reload.result index f689cf4..531d984 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -113,13 +113,13 @@ vshard.storage.module_version() check_reloaded() --- ... -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end --- ... -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end --- ... -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end --- ... 
check_reloaded() diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 6e19a92..64c3a60 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -59,9 +59,9 @@ vshard.storage.module_version() check_reloaded() -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end check_reloaded() diff --git a/test/unit/util.result b/test/unit/util.result new file mode 100644 index 0000000..dbfcce9 --- /dev/null +++ b/test/unit/util.result @@ -0,0 +1,72 @@ +test_run = require('test_run').new() +--- +... +fiber = require('fiber') +--- +... +log = require('log') +--- +... +test_util = require('util') +--- +... +util = require('vshard.util') +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +fake_M = { + reloadable_func = nil, + module_version = 1, +}; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Check autoreload on function change during failure. +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end +--- +... +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +--- +... +fib:cancel() +--- +... 
+test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has changed') +--- +- 'Worker_name: reloadable function reloadable_function has changed' +... +test_run:grep_log('default', 'Worker_name is reloaded, restarting') +--- +- Worker_name is reloaded, restarting +... +test_run:grep_log('default', 'Worker_name has been started') +--- +- Worker_name has been started +... +log.info(string.rep('a', 1000)) +--- +... +-- Check reload feature. +fake_M.reloadable_function = function () return true end +--- +... +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +--- +... +fib:cancel() +--- +... +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000) +--- +- Worker_name is reloaded, restarting +... +test_run:grep_log('default', 'Worker_name has been started', 1000) +--- +- Worker_name has been started +... diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua new file mode 100644 index 0000000..094cae9 --- /dev/null +++ b/test/unit/util.test.lua @@ -0,0 +1,29 @@ +test_run = require('test_run').new() +fiber = require('fiber') +log = require('log') +test_util = require('util') +util = require('vshard.util') + +test_run:cmd("setopt delimiter ';'") +fake_M = { + reloadable_func = nil, + module_version = 1, +}; +test_run:cmd("setopt delimiter ''"); + +-- Check autoreload on function change during failure. +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end + +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +fib:cancel() +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has changed') +test_run:grep_log('default', 'Worker_name is reloaded, restarting') +test_run:grep_log('default', 'Worker_name has been started') +log.info(string.rep('a', 1000)) + +-- Check reload feature. 
+fake_M.reloadable_function = function () return true end +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +fib:cancel() +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000) +test_run:grep_log('default', 'Worker_name has been started', 1000) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 21093e5..9e18b27 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -491,11 +491,9 @@ local function router_cfg(cfg) end lreplicaset.wait_masters_connect(new_replicasets) if M.failover_fiber == nil then - log.info('Start failover fiber') lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover') end if M.discovery_fiber == nil then - log.info('Start discovery fiber') lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery') end -- Destroy connections, not used in a new configuration. diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 57076e1..300751f 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -692,10 +692,8 @@ local function local_on_master_enable() M.collect_bucket_garbage_fiber = lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f', 'Garbage collector') - log.info("GC is started") M.recovery_fiber = lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery') - log.info('Recovery is started') -- TODO: check current status log.info("Took on replicaset master role") end @@ -1585,7 +1583,6 @@ local function storage_cfg(cfg, this_replica_uuid) if min_master == this_replica then if not M.rebalancer_fiber then - log.info('Run rebalancer') M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M, 'rebalancer_f', 'Rebalancer') else diff --git a/vshard/util.lua b/vshard/util.lua index bb71318..920f53e 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -2,6 +2,19 @@ local log = require('log') local fiber = require('fiber') +local MODULE_INTERNALS = '__module_vshard_util' +local M = rawget(_G, 
MODULE_INTERNALS) +if not M then + -- + -- The module is loaded for the first time. + -- + M = { + -- Latest versions of functions. + reloadable_fiber_f = nil, + } + rawset(_G, MODULE_INTERNALS, M) +end + -- -- Extract parts of a tuple. -- @param tuple Tuple to extract a key from. @@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts) end -- --- Wrapper to run @a func in infinite loop and restart it on the --- module reload. This function CAN NOT BE AUTORELOADED. To update --- it you must manualy stop all fibers, run by this function, do --- reload, and then restart all stopped fibers. This can be done, --- for example, by calling vshard.storage/router.cfg() again with --- the same config as earlier. +-- Wrapper to run a func in infinite loop and restart it on +-- errors and module reload. +-- To handle module reload and run new version of a function +-- in the module, the function should just return. -- --- @param func Reloadable function to run. It must accept current --- module version as an argument, and interrupt itself, --- when it is changed. --- @param worker_name Name of the function. Usual infinite fiber --- represents a background subsystem, which has a name. For --- example: "Garbage Collector", "Recovery", "Discovery", --- "Rebalancer". --- @param M Module which can reload. +-- @param module Module which can be reloaded. +-- @param func_name Name of a function to be executed in the +-- module. +-- @param worker_name Name of the reloadable background subsystem. +-- For example: "Garbage Collector", "Recovery", "Discovery", +-- "Rebalancer". Used only for an activity logging. 
-- -local function reloadable_fiber_f(M, func_name, worker_name) - while true do - local ok, err = pcall(M[func_name], M.module_version) - if not ok then - log.error('%s has been failed: %s', worker_name, err) - fiber.yield() - else - log.info('%s has been reloaded', worker_name) - fiber.yield() +local function reloadable_fiber_f(module, func_name, worker_name) + log.info('%s has been started', worker_name) + local func = module[func_name] + local ok, err = pcall(func, module.module_version) + if not ok then + log.error('%s has been failed: %s', worker_name, err) + if func ~= module[func_name] then + log.warn('%s: reloadable function %s has changed', + worker_name, func_name) end end + -- yield serves two pursoses: + -- * makes this fiber cancellable + -- * prevents 100% cpu consumption + fiber.yield() + log.info('%s is reloaded, restarting', worker_name) + -- luajit drops this frame if next function is called in + -- return statement. + return M.reloadable_fiber_f(module, func_name, worker_name) end -- @@ -75,8 +92,12 @@ local function generate_self_checker(obj_name, func_name, mt, func) end end +-- Update latest versions of function +M.reloadable_fiber_f = reloadable_fiber_f + return { tuple_extract_key = tuple_extract_key, reloadable_fiber_f = reloadable_fiber_f, generate_self_checker = generate_self_checker, + internal = M, } ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber 2018-06-26 12:50 ` Alex Khatskevich @ 2018-06-27 9:54 ` Vladislav Shpilevoy 2018-06-27 10:44 ` Alex Khatskevich 0 siblings, 1 reply; 11+ messages in thread From: Vladislav Shpilevoy @ 2018-06-27 9:54 UTC (permalink / raw) To: tarantool-patches, AKhatskevich Hello. Thanks for the fixes! See 4 comments below. On 26/06/2018 15:50, Alex Khatskevich wrote: > Sorry, I forgot to put the new patch here > > commit 4ddab3c47c5963c3138701d89ba3091d5da9848a > Author: AKhatskevich <avkhatskevich@tarantool.org> > Date: Thu Jun 14 14:03:07 2018 +0300 > > Reload reloadable fiber > > Fixed a problem: > The `reloadable_fiber_f` was running an infinite while loop and > preventing the whole module from being reloaded. > > This behavior is fixed by calling new version of `reloadable_fiber_f` in > a return statement instead of the where loop. Note: calling a function 1. Again 'where loop'. > in a return statement doesn't increase a stack size. > > Extra changes: > * transfer write "started" message responsibility from caller to > reloadable fiber > > Closes #116 > > diff --git a/test/unit/util.result b/test/unit/util.result > new file mode 100644 > index 0000000..dbfcce9 > --- /dev/null > +++ b/test/unit/util.result > @@ -0,0 +1,72 @@ > +test_run = require('test_run').new() > +--- > +... > +fiber = require('fiber') > +--- > +... > +log = require('log') > +--- > +... > +test_util = require('util') > +--- > +... > +util = require('vshard.util') > +--- > +... > +test_run:cmd("setopt delimiter ';'") > +--- > +- true > +... > +fake_M = { > + reloadable_func = nil, > + module_version = 1, > +}; > +--- > +... > +test_run:cmd("setopt delimiter ''"); > +--- > +- true > +... > +-- Check autoreload on function change during failure. > +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end > +--- > +... 
> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') > +--- > +... > +fib:cancel() > +--- > +... > +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has changed') 2. 'Has been' changed? > +--- > +- 'Worker_name: reloadable function reloadable_function has changed' > +... > diff --git a/vshard/util.lua b/vshard/util.lua > index bb71318..920f53e 100644 > --- a/vshard/util.lua > +++ b/vshard/util.lua > @@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts) > +local function reloadable_fiber_f(module, func_name, worker_name) > + log.info('%s has been started', worker_name) > + local func = module[func_name] > + local ok, err = pcall(func, module.module_version) > + if not ok then > + log.error('%s has been failed: %s', worker_name, err) > + if func ~= module[func_name] then > + log.warn('%s: reloadable function %s has changed', > + worker_name, func_name) 3. Why is these check and warn here, after an error? > end > end > + -- yield serves two pursoses: > + -- * makes this fiber cancellable > + -- * prevents 100% cpu consumption > + fiber.yield() > + log.info('%s is reloaded, restarting', worker_name) 4. Why do you print 'is reloaded' even if it just thrown an error and the function is not changed actually? > + -- luajit drops this frame if next function is called in > + -- return statement. > + return M.reloadable_fiber_f(module, func_name, worker_name) > end ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber 2018-06-27 9:54 ` Vladislav Shpilevoy @ 2018-06-27 10:44 ` Alex Khatskevich 2018-06-27 11:34 ` Vladislav Shpilevoy 0 siblings, 1 reply; 11+ messages in thread From: Alex Khatskevich @ 2018-06-27 10:44 UTC (permalink / raw) To: Vladislav Shpilevoy, tarantool-patches On 27.06.2018 12:54, Vladislav Shpilevoy wrote: > Hello. Thanks for the fixes! See 4 comments below. > > On 26/06/2018 15:50, Alex Khatskevich wrote: >> Sorry, I forgot to put the new patch here >> >> commit 4ddab3c47c5963c3138701d89ba3091d5da9848a >> Author: AKhatskevich <avkhatskevich@tarantool.org> >> Date: Thu Jun 14 14:03:07 2018 +0300 >> >> Reload reloadable fiber >> >> Fixed a problem: >> The `reloadable_fiber_f` was running an infinite while loop and >> preventing the whole module from being reloaded. >> >> This behavior is fixed by calling new version of >> `reloadable_fiber_f` in >> a return statement instead of the where loop. Note: calling a >> function > > 1. Again 'where loop'. > Sorry >> in a return statement doesn't increase a stack size. >> >> Extra changes: >> * transfer write "started" message responsibility from caller to >> reloadable fiber >> >> Closes #116 >> >> diff --git a/test/unit/util.result b/test/unit/util.result >> new file mode 100644 >> index 0000000..dbfcce9 >> --- /dev/null >> +++ b/test/unit/util.result >> @@ -0,0 +1,72 @@ >> +test_run = require('test_run').new() >> +--- >> +... >> +fiber = require('fiber') >> +--- >> +... >> +log = require('log') >> +--- >> +... >> +test_util = require('util') >> +--- >> +... >> +util = require('vshard.util') >> +--- >> +... >> +test_run:cmd("setopt delimiter ';'") >> +--- >> +- true >> +... >> +fake_M = { >> + reloadable_func = nil, >> + module_version = 1, >> +}; >> +--- >> +... >> +test_run:cmd("setopt delimiter ''"); >> +--- >> +- true >> +... >> +-- Check autoreload on function change during failure. 
>> +fake_M.reloadable_function = function () fake_M.reloadable_function >> = 42; error('Error happened.') end >> +--- >> +... >> +fib = fiber.create(util.reloadable_fiber_f, fake_M, >> 'reloadable_function', 'Worker_name') >> +--- >> +... >> +fib:cancel() >> +--- >> +... >> +test_run:grep_log('default', 'Worker_name: reloadable function >> reloadable_function has changed') > > 2. 'Has been' changed? I have changed it, but there are two notes: 1. Both are correct https://english.stackexchange.com/questions/97243/has-been-changed-or-has-changed 2. vshard uses both options: a. "Configuration has changed, restart" b. "Rebalancer location has changed" > >> +--- >> +- 'Worker_name: reloadable function reloadable_function has changed' >> +... >> diff --git a/vshard/util.lua b/vshard/util.lua >> index bb71318..920f53e 100644 >> --- a/vshard/util.lua >> +++ b/vshard/util.lua >> @@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts) >> +local function reloadable_fiber_f(module, func_name, worker_name) >> + log.info('%s has been started', worker_name) >> + local func = module[func_name] >> + local ok, err = pcall(func, module.module_version) >> + if not ok then >> + log.error('%s has been failed: %s', worker_name, err) >> + if func ~= module[func_name] then >> + log.warn('%s: reloadable function %s has changed', >> + worker_name, func_name) > > 3. Why is these check and warn here, after an error? Changed: 1. warn -> error 2. Comment added: -- There is a chance that error was raised during reload -- (or caused by reload). Perform reload in case function -- has been changed. > >> end >> end >> + -- yield serves two pursoses: >> + -- * makes this fiber cancellable >> + -- * prevents 100% cpu consumption >> + fiber.yield() >> + log.info('%s is reloaded, restarting', worker_name) > > 4. Why do you print 'is reloaded' even if it just thrown an error > and the function is not changed actually?
Fixed by goto > >> + -- luajit drops this frame if next function is called in >> + -- return statement. >> + return M.reloadable_fiber_f(module, func_name, worker_name) >> end Full diff: commit d098bf0930935b029fbca3f4fd78211dd5022fb1 Author: AKhatskevich <avkhatskevich@tarantool.org> Date: Thu Jun 14 14:03:07 2018 +0300 Reload reloadable fiber Fixed a problem: The `reloadable_fiber_f` was running an infinite while loop and preventing the whole module from being reloaded. This behavior is fixed by calling new version of `reloadable_fiber_f` in a return statement instead of the while loop. Note: calling a function in a return statement doesn't increase a stack size. Extra changes: * transfer write "started" message responsibility from caller to reloadable fiber Closes #116 diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result index 22100fe..6e3aa81 100644 --- a/test/rebalancer/rebalancer.result +++ b/test/rebalancer/rebalancer.result @@ -424,7 +424,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) fiber = require('fiber') --- ... -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end --- ... 
while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua index 3327f30..922357a 100644 --- a/test/rebalancer/rebalancer.test.lua +++ b/test/rebalancer/rebalancer.test.lua @@ -197,7 +197,7 @@ switch_rs1_master() vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) fiber = require('fiber') -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end -- diff --git a/test/router/reload.result b/test/router/reload.result index 19a9ead..47f3c2e 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -116,10 +116,10 @@ vshard.router.module_version() check_reloaded() --- ... -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end --- ... -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end --- ... 
check_reloaded() diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 6e21b74..af2939d 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -59,8 +59,8 @@ vshard.router.module_version() check_reloaded() -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end check_reloaded() diff --git a/test/storage/reload.result b/test/storage/reload.result index f689cf4..531d984 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -113,13 +113,13 @@ vshard.storage.module_version() check_reloaded() --- ... -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end --- ... -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end --- ... -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end --- ... 
check_reloaded() diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 6e19a92..64c3a60 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -59,9 +59,9 @@ vshard.storage.module_version() check_reloaded() -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end check_reloaded() diff --git a/test/unit/util.result b/test/unit/util.result new file mode 100644 index 0000000..a7b176d --- /dev/null +++ b/test/unit/util.result @@ -0,0 +1,72 @@ +test_run = require('test_run').new() +--- +... +fiber = require('fiber') +--- +... +log = require('log') +--- +... +test_util = require('util') +--- +... +util = require('vshard.util') +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +fake_M = { + reloadable_func = nil, + module_version = 1, +}; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Check autoreload on function change during failure. +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end +--- +... +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +--- +... +fib:cancel() +--- +... 
+test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') +--- +- 'Worker_name: reloadable function reloadable_function has been changed' +... +test_run:grep_log('default', 'Worker_name is reloaded, restarting') +--- +- Worker_name is reloaded, restarting +... +test_run:grep_log('default', 'Worker_name has been started') +--- +- Worker_name has been started +... +log.info(string.rep('a', 1000)) +--- +... +-- Check reload feature. +fake_M.reloadable_function = function () return true end +--- +... +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +--- +... +fib:cancel() +--- +... +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000) +--- +- Worker_name is reloaded, restarting +... +test_run:grep_log('default', 'Worker_name has been started', 1000) +--- +- Worker_name has been started +... diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua new file mode 100644 index 0000000..9359378 --- /dev/null +++ b/test/unit/util.test.lua @@ -0,0 +1,29 @@ +test_run = require('test_run').new() +fiber = require('fiber') +log = require('log') +test_util = require('util') +util = require('vshard.util') + +test_run:cmd("setopt delimiter ';'") +fake_M = { + reloadable_func = nil, + module_version = 1, +}; +test_run:cmd("setopt delimiter ''"); + +-- Check autoreload on function change during failure. +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end + +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +fib:cancel() +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') +test_run:grep_log('default', 'Worker_name is reloaded, restarting') +test_run:grep_log('default', 'Worker_name has been started') +log.info(string.rep('a', 1000)) + +-- Check reload feature. 
+fake_M.reloadable_function = function () return true end +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +fib:cancel() +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000) +test_run:grep_log('default', 'Worker_name has been started', 1000) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 21093e5..9e18b27 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -491,11 +491,9 @@ local function router_cfg(cfg) end lreplicaset.wait_masters_connect(new_replicasets) if M.failover_fiber == nil then - log.info('Start failover fiber') lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover') end if M.discovery_fiber == nil then - log.info('Start discovery fiber') lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery') end -- Destroy connections, not used in a new configuration. diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 57076e1..300751f 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -692,10 +692,8 @@ local function local_on_master_enable() M.collect_bucket_garbage_fiber = lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f', 'Garbage collector') - log.info("GC is started") M.recovery_fiber = lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery') - log.info('Recovery is started') -- TODO: check current status log.info("Took on replicaset master role") end @@ -1585,7 +1583,6 @@ local function storage_cfg(cfg, this_replica_uuid) if min_master == this_replica then if not M.rebalancer_fiber then - log.info('Run rebalancer') M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M, 'rebalancer_f', 'Rebalancer') else diff --git a/vshard/util.lua b/vshard/util.lua index bb71318..ce79930 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -2,6 +2,19 @@ local log = require('log') local fiber = require('fiber') +local MODULE_INTERNALS = '__module_vshard_util' +local M = rawget(_G, 
MODULE_INTERNALS) +if not M then + -- + -- The module is loaded for the first time. + -- + M = { + -- Latest versions of functions. + reloadable_fiber_f = nil, + } + rawset(_G, MODULE_INTERNALS, M) +end + -- -- Extract parts of a tuple. -- @param tuple Tuple to extract a key from. @@ -19,33 +32,42 @@ local function tuple_extract_key(tuple, parts) end -- --- Wrapper to run @a func in infinite loop and restart it on the --- module reload. This function CAN NOT BE AUTORELOADED. To update --- it you must manualy stop all fibers, run by this function, do --- reload, and then restart all stopped fibers. This can be done, --- for example, by calling vshard.storage/router.cfg() again with --- the same config as earlier. +-- Wrapper to run a func in infinite loop and restart it on +-- errors and module reload. +-- To handle module reload and run new version of a function +-- in the module, the function should just return. -- --- @param func Reloadable function to run. It must accept current --- module version as an argument, and interrupt itself, --- when it is changed. --- @param worker_name Name of the function. Usual infinite fiber --- represents a background subsystem, which has a name. For --- example: "Garbage Collector", "Recovery", "Discovery", --- "Rebalancer". --- @param M Module which can reload. +-- @param module Module which can be reloaded. +-- @param func_name Name of a function to be executed in the +-- module. +-- @param worker_name Name of the reloadable background subsystem. +-- For example: "Garbage Collector", "Recovery", "Discovery", +-- "Rebalancer". Used only for an activity logging. 
-- -local function reloadable_fiber_f(M, func_name, worker_name) - while true do - local ok, err = pcall(M[func_name], M.module_version) - if not ok then - log.error('%s has been failed: %s', worker_name, err) - fiber.yield() - else - log.info('%s has been reloaded', worker_name) - fiber.yield() +local function reloadable_fiber_f(module, func_name, worker_name) + log.info('%s has been started', worker_name) + local func = module[func_name] +::reload_loop:: + local ok, err = pcall(func, module.module_version) + -- yield serves two pursoses: + -- * makes this fiber cancellable + -- * prevents 100% cpu consumption + fiber.yield() + if not ok then + log.error('%s has been failed: %s', worker_name, err) + if func == module[func_name] then + goto reload_loop end + -- There is a chance that error was raised during reload + -- (or caused by reload). Perform reload in case function + -- has been changed. + log.error('%s: reloadable function %s has been changed', + worker_name, func_name) end + log.info('%s is reloaded, restarting', worker_name) + -- luajit drops this frame if next function is called in + -- return statement. + return M.reloadable_fiber_f(module, func_name, worker_name) end -- @@ -75,8 +97,12 @@ local function generate_self_checker(obj_name, func_name, mt, func) end end +-- Update latest versions of function +M.reloadable_fiber_f = reloadable_fiber_f + return { tuple_extract_key = tuple_extract_key, reloadable_fiber_f = reloadable_fiber_f, generate_self_checker = generate_self_checker, + internal = M, } ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber 2018-06-27 10:44 ` Alex Khatskevich @ 2018-06-27 11:34 ` Vladislav Shpilevoy 2018-06-27 11:45 ` Alex Khatskevich 0 siblings, 1 reply; 11+ messages in thread From: Vladislav Shpilevoy @ 2018-06-27 11:34 UTC (permalink / raw) To: Alex Khatskevich, tarantool-patches Please, rebase on the latest master branch. When I do, the tests fail. On 27/06/2018 13:44, Alex Khatskevich wrote: > > > On 27.06.2018 12:54, Vladislav Shpilevoy wrote: >> Hello. Thanks for the fixes! See 4 comments below. >> >> On 26/06/2018 15:50, Alex Khatskevich wrote: >>> Sorry, I forgot to put the new patch here >>> >>> commit 4ddab3c47c5963c3138701d89ba3091d5da9848a >>> Author: AKhatskevich <avkhatskevich@tarantool.org> >>> Date: Thu Jun 14 14:03:07 2018 +0300 >>> >>> Reload reloadable fiber >>> >>> Fixed a problem: >>> The `reloadable_fiber_f` was running an infinite while loop and >>> preventing the whole module from being reloaded. >>> >>> This behavior is fixed by calling new version of `reloadable_fiber_f` in >>> a return statement instead of the where loop. Note: calling a function >> >> 1. Again 'where loop'. >> > Sorry >>> in a return statement doesn't increase a stack size. >>> >>> Extra changes: >>> * transfer write "started" message responsibility from caller to >>> reloadable fiber >>> >>> Closes #116 >>> >>> diff --git a/test/unit/util.result b/test/unit/util.result >>> new file mode 100644 >>> index 0000000..dbfcce9 >>> --- /dev/null >>> +++ b/test/unit/util.result >>> @@ -0,0 +1,72 @@ >>> +test_run = require('test_run').new() >>> +--- >>> +... >>> +fiber = require('fiber') >>> +--- >>> +... >>> +log = require('log') >>> +--- >>> +... >>> +test_util = require('util') >>> +--- >>> +... >>> +util = require('vshard.util') >>> +--- >>> +... >>> +test_run:cmd("setopt delimiter ';'") >>> +--- >>> +- true >>> +... >>> +fake_M = { >>> + reloadable_func = nil, >>> + module_version = 1, >>> +}; >>> +--- >>> +... 
>>> +test_run:cmd("setopt delimiter ''"); >>> +--- >>> +- true >>> +... >>> +-- Check autoreload on function change during failure. >>> +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end >>> +--- >>> +... >>> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') >>> +--- >>> +... >>> +fib:cancel() >>> +--- >>> +... >>> +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has changed') >> >> 2. 'Has been' changed? > I has changed it, but there are two notes: > 1. Both are correct https://english.stackexchange.com/questions/97243/has-been-changed-or-has-changed > 2. vshard uses both options: > a. "Configuration has changed, restart" > b. "Rebalancer location has changed" >> >>> +--- >>> +- 'Worker_name: reloadable function reloadable_function has changed' >>> +... >>> diff --git a/vshard/util.lua b/vshard/util.lua >>> index bb71318..920f53e 100644 >>> --- a/vshard/util.lua >>> +++ b/vshard/util.lua >>> @@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts) >>> +local function reloadable_fiber_f(module, func_name, worker_name) >>> + log.info('%s has been started', worker_name) >>> + local func = module[func_name] >>> + local ok, err = pcall(func, module.module_version) >>> + if not ok then >>> + log.error('%s has been failed: %s', worker_name, err) >>> + if func ~= module[func_name] then >>> + log.warn('%s: reloadable function %s has changed', >>> + worker_name, func_name) >> >> 3. Why is these check and warn here, after an error? > Changed: > 1. warn -> error > 2. Comment added: > -- There is a chance that error was raised during reload > -- (or caused by reload). Perform reload in case function > -- has been changed. >> >>> end >>> end >>> + -- yield serves two pursoses: >>> + -- * makes this fiber cancellable >>> + -- * prevents 100% cpu consumption >>> + fiber.yield() >>> + log.info('%s is reloaded, restarting', worker_name) >> >> 4. 
Why do you print 'is reloaded' even if it just thrown an error >> and the function is not changed actually? > > Fixed by goto >> >>> + -- luajit drops this frame if next function is called in >>> + -- return statement. >>> + return M.reloadable_fiber_f(module, func_name, worker_name) >>> end > > Full diff: > > commit d098bf0930935b029fbca3f4fd78211dd5022fb1 > Author: AKhatskevich <avkhatskevich@tarantool.org> > Date: Thu Jun 14 14:03:07 2018 +0300 > > Reload reloadable fiber > > Fixed a problem: > The `reloadable_fiber_f` was running an infinite while loop and > preventing the whole module from being reloaded. > > This behavior is fixed by calling new version of `reloadable_fiber_f` in > a return statement instead of the while loop. Note: calling a function > in a return statement doesn't increase a stack size. > > Extra changes: > * transfer write "started" message responsibility from caller to > reloadable fiber > > Closes #116 > > diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result > index 22100fe..6e3aa81 100644 > --- a/test/rebalancer/rebalancer.result > +++ b/test/rebalancer/rebalancer.result > @@ -424,7 +424,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) > fiber = require('fiber') > --- > ... > -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end > +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end > --- > ... 
> while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end > diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua > index 3327f30..922357a 100644 > --- a/test/rebalancer/rebalancer.test.lua > +++ b/test/rebalancer/rebalancer.test.lua > @@ -197,7 +197,7 @@ switch_rs1_master() > vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) > > fiber = require('fiber') > -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end > +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end > while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end > > -- > diff --git a/test/router/reload.result b/test/router/reload.result > index 19a9ead..47f3c2e 100644 > --- a/test/router/reload.result > +++ b/test/router/reload.result > @@ -116,10 +116,10 @@ vshard.router.module_version() > check_reloaded() > --- > ... > -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end > +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end > --- > ... > -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end > +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end > --- > ... 
> check_reloaded() > diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua > index 6e21b74..af2939d 100644 > --- a/test/router/reload.test.lua > +++ b/test/router/reload.test.lua > @@ -59,8 +59,8 @@ vshard.router.module_version() > > check_reloaded() > > -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end > -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end > +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end > +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end > > check_reloaded() > > diff --git a/test/storage/reload.result b/test/storage/reload.result > index f689cf4..531d984 100644 > --- a/test/storage/reload.result > +++ b/test/storage/reload.result > @@ -113,13 +113,13 @@ vshard.storage.module_version() > check_reloaded() > --- > ... > -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end > +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end > --- > ... > -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end > +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end > --- > ... > -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end > +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end > --- > ... 
> check_reloaded() > diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua > index 6e19a92..64c3a60 100644 > --- a/test/storage/reload.test.lua > +++ b/test/storage/reload.test.lua > @@ -59,9 +59,9 @@ vshard.storage.module_version() > > check_reloaded() > > -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end > -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end > -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end > +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end > +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end > +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end > > check_reloaded() > > diff --git a/test/unit/util.result b/test/unit/util.result > new file mode 100644 > index 0000000..a7b176d > --- /dev/null > +++ b/test/unit/util.result > @@ -0,0 +1,72 @@ > +test_run = require('test_run').new() > +--- > +... > +fiber = require('fiber') > +--- > +... > +log = require('log') > +--- > +... > +test_util = require('util') > +--- > +... > +util = require('vshard.util') > +--- > +... > +test_run:cmd("setopt delimiter ';'") > +--- > +- true > +... > +fake_M = { > + reloadable_func = nil, > + module_version = 1, > +}; > +--- > +... > +test_run:cmd("setopt delimiter ''"); > +--- > +- true > +... > +-- Check autoreload on function change during failure. > +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end > +--- > +... > +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') > +--- > +... > +fib:cancel() > +--- > +... 
> +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') > +--- > +- 'Worker_name: reloadable function reloadable_function has been changed' > +... > +test_run:grep_log('default', 'Worker_name is reloaded, restarting') > +--- > +- Worker_name is reloaded, restarting > +... > +test_run:grep_log('default', 'Worker_name has been started') > +--- > +- Worker_name has been started > +... > +log.info(string.rep('a', 1000)) > +--- > +... > +-- Check reload feature. > +fake_M.reloadable_function = function () return true end > +--- > +... > +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') > +--- > +... > +fib:cancel() > +--- > +... > +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000) > +--- > +- Worker_name is reloaded, restarting > +... > +test_run:grep_log('default', 'Worker_name has been started', 1000) > +--- > +- Worker_name has been started > +... > diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua > new file mode 100644 > index 0000000..9359378 > --- /dev/null > +++ b/test/unit/util.test.lua > @@ -0,0 +1,29 @@ > +test_run = require('test_run').new() > +fiber = require('fiber') > +log = require('log') > +test_util = require('util') > +util = require('vshard.util') > + > +test_run:cmd("setopt delimiter ';'") > +fake_M = { > + reloadable_func = nil, > + module_version = 1, > +}; > +test_run:cmd("setopt delimiter ''"); > + > +-- Check autoreload on function change during failure. 
> +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end > + > +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') > +fib:cancel() > +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') > +test_run:grep_log('default', 'Worker_name is reloaded, restarting') > +test_run:grep_log('default', 'Worker_name has been started') > +log.info(string.rep('a', 1000)) > + > +-- Check reload feature. > +fake_M.reloadable_function = function () return true end > +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') > +fib:cancel() > +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000) > +test_run:grep_log('default', 'Worker_name has been started', 1000) > diff --git a/vshard/router/init.lua b/vshard/router/init.lua > index 21093e5..9e18b27 100644 > --- a/vshard/router/init.lua > +++ b/vshard/router/init.lua > @@ -491,11 +491,9 @@ local function router_cfg(cfg) > end > lreplicaset.wait_masters_connect(new_replicasets) > if M.failover_fiber == nil then > - log.info('Start failover fiber') > lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover') > end > if M.discovery_fiber == nil then > - log.info('Start discovery fiber') > lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery') > end > -- Destroy connections, not used in a new configuration. 
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua > index 57076e1..300751f 100644 > --- a/vshard/storage/init.lua > +++ b/vshard/storage/init.lua > @@ -692,10 +692,8 @@ local function local_on_master_enable() > M.collect_bucket_garbage_fiber = > lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f', > 'Garbage collector') > - log.info("GC is started") > M.recovery_fiber = > lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery') > - log.info('Recovery is started') > -- TODO: check current status > log.info("Took on replicaset master role") > end > @@ -1585,7 +1583,6 @@ local function storage_cfg(cfg, this_replica_uuid) > > if min_master == this_replica then > if not M.rebalancer_fiber then > - log.info('Run rebalancer') > M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M, > 'rebalancer_f', 'Rebalancer') > else > diff --git a/vshard/util.lua b/vshard/util.lua > index bb71318..ce79930 100644 > --- a/vshard/util.lua > +++ b/vshard/util.lua > @@ -2,6 +2,19 @@ > local log = require('log') > local fiber = require('fiber') > > +local MODULE_INTERNALS = '__module_vshard_util' > +local M = rawget(_G, MODULE_INTERNALS) > +if not M then > + -- > + -- The module is loaded for the first time. > + -- > + M = { > + -- Latest versions of functions. > + reloadable_fiber_f = nil, > + } > + rawset(_G, MODULE_INTERNALS, M) > +end > + > -- > -- Extract parts of a tuple. > -- @param tuple Tuple to extract a key from. > @@ -19,33 +32,42 @@ local function tuple_extract_key(tuple, parts) > end > > -- > --- Wrapper to run @a func in infinite loop and restart it on the > --- module reload. This function CAN NOT BE AUTORELOADED. To update > --- it you must manualy stop all fibers, run by this function, do > --- reload, and then restart all stopped fibers. This can be done, > --- for example, by calling vshard.storage/router.cfg() again with > --- the same config as earlier. 
> +-- Wrapper to run a func in infinite loop and restart it on > +-- errors and module reload. > +-- To handle module reload and run new version of a function > +-- in the module, the function should just return. > -- > --- @param func Reloadable function to run. It must accept current > --- module version as an argument, and interrupt itself, > --- when it is changed. > --- @param worker_name Name of the function. Usual infinite fiber > --- represents a background subsystem, which has a name. For > --- example: "Garbage Collector", "Recovery", "Discovery", > --- "Rebalancer". > --- @param M Module which can reload. > +-- @param module Module which can be reloaded. > +-- @param func_name Name of a function to be executed in the > +-- module. > +-- @param worker_name Name of the reloadable background subsystem. > +-- For example: "Garbage Collector", "Recovery", "Discovery", > +-- "Rebalancer". Used only for an activity logging. > -- > -local function reloadable_fiber_f(M, func_name, worker_name) > - while true do > - local ok, err = pcall(M[func_name], M.module_version) > - if not ok then > - log.error('%s has been failed: %s', worker_name, err) > - fiber.yield() > - else > - log.info('%s has been reloaded', worker_name) > - fiber.yield() > +local function reloadable_fiber_f(module, func_name, worker_name) > + log.info('%s has been started', worker_name) > + local func = module[func_name] > +::reload_loop:: > + local ok, err = pcall(func, module.module_version) > + -- yield serves two pursoses: > + -- * makes this fiber cancellable > + -- * prevents 100% cpu consumption > + fiber.yield() > + if not ok then > + log.error('%s has been failed: %s', worker_name, err) > + if func == module[func_name] then > + goto reload_loop > end > + -- There is a chance that error was raised during reload > + -- (or caused by reload). Perform reload in case function > + -- has been changed. 
> + log.error('%s: reloadable function %s has been changed', > + worker_name, func_name) > end > + log.info('%s is reloaded, restarting', worker_name) > + -- luajit drops this frame if next function is called in > + -- return statement. > + return M.reloadable_fiber_f(module, func_name, worker_name) > end > > -- > @@ -75,8 +97,12 @@ local function generate_self_checker(obj_name, func_name, mt, func) > end > end > > +-- Update latest versions of function > +M.reloadable_fiber_f = reloadable_fiber_f > + > return { > tuple_extract_key = tuple_extract_key, > reloadable_fiber_f = reloadable_fiber_f, > generate_self_checker = generate_self_checker, > + internal = M, > } > ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber 2018-06-27 11:34 ` Vladislav Shpilevoy @ 2018-06-27 11:45 ` Alex Khatskevich 2018-06-27 11:53 ` Vladislav Shpilevoy 0 siblings, 1 reply; 11+ messages in thread From: Alex Khatskevich @ 2018-06-27 11:45 UTC (permalink / raw) To: Vladislav Shpilevoy, tarantool-patches I have rebased and pushed, but... None of the tests fail. As far as I know, there are a couple of flaky tests in vshard. On 27.06.2018 14:34, Vladislav Shpilevoy wrote: > Please, rebase on the latest master branch. When I do, > the tests fail. ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber 2018-06-27 11:45 ` Alex Khatskevich @ 2018-06-27 11:53 ` Vladislav Shpilevoy 2018-06-27 13:11 ` Vladislav Shpilevoy 0 siblings, 1 reply; 11+ messages in thread From: Vladislav Shpilevoy @ 2018-06-27 11:53 UTC (permalink / raw) To: Alex Khatskevich, tarantool-patches On 27/06/2018 14:45, Alex Khatskevich wrote: > I have rebased and pushed, but... > > Non of the tests fails. > > As far as I know, there is a couple of blinking tests in vshard. VShard now has no unstable tests. I got an error on your new test, and even with no rebase it appears sometimes: ====================================================================================== WORKR TEST PARAMS RESULT --------------------------------------------------------------------------------- [001] unit/util.test.lua [ fail ] [001] [001] Test failed! Result content mismatch: [001] --- unit/util.result Wed Jun 27 14:46:34 2018 [001] +++ unit/util.reject Wed Jun 27 14:52:17 2018 [001] @@ -39,11 +39,11 @@ [001] ... [001] test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') [001] --- [001] -- 'Worker_name: reloadable function reloadable_function has been changed' [001] +- null [001] ... [001] test_run:grep_log('default', 'Worker_name is reloaded, restarting') [001] --- [001] -- Worker_name is reloaded, restarting [001] +- null [001] ... [001] test_run:grep_log('default', 'Worker_name has been started') [001] --- [001] I think it is because you cancel the fiber too early in the test. Please, repair. > > > On 27.06.2018 14:34, Vladislav Shpilevoy wrote: >> Please, rebase on the latest master branch. When I do, >> the tests fail. ^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber 2018-06-27 11:53 ` Vladislav Shpilevoy @ 2018-06-27 13:11 ` Vladislav Shpilevoy 0 siblings, 0 replies; 11+ messages in thread From: Vladislav Shpilevoy @ 2018-06-27 13:11 UTC (permalink / raw) To: Alex Khatskevich, tarantool-patches Thanks for the fixes! The patch is pushed. On 27/06/2018 14:53, Vladislav Shpilevoy wrote: > > > On 27/06/2018 14:45, Alex Khatskevich wrote: >> I have rebased and pushed, but... >> >> Non of the tests fails. >> >> As far as I know, there is a couple of blinking tests in vshard. > > VShard now has no unstable tests. I got error on your new test, and > even with no rebase it appears sometimes: > > > ====================================================================================== > WORKR TEST PARAMS RESULT > --------------------------------------------------------------------------------- > [001] unit/util.test.lua [ fail ] > [001] > [001] Test failed! Result content mismatch: > [001] --- unit/util.result Wed Jun 27 14:46:34 2018 > [001] +++ unit/util.reject Wed Jun 27 14:52:17 2018 > [001] @@ -39,11 +39,11 @@ > [001] ... > [001] test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') > [001] --- > [001] -- 'Worker_name: reloadable function reloadable_function has been changed' > [001] +- null > [001] ... > [001] test_run:grep_log('default', 'Worker_name is reloaded, restarting') > [001] --- > [001] -- Worker_name is reloaded, restarting > [001] +- null > [001] ... > [001] test_run:grep_log('default', 'Worker_name has been started') > [001] --- > [001] > > I think it is because you cancel fiber too early in the test. Please, repair. > >> >> >> On 27.06.2018 14:34, Vladislav Shpilevoy wrote: >>> Please, rebase on the latest master branch. When I do, >>> the tests fail. > ^ permalink raw reply [flat|nested] 11+ messages in thread
end of thread, other threads:[~2018-06-27 13:11 UTC | newest] Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed) -- links below jump to the message on this page -- 2018-06-14 11:42 [tarantool-patches] [PATCH][vshard] Reload reloadable fiber AKhatskevich 2018-06-14 11:58 ` AKhatskevich 2018-06-21 12:04 ` [tarantool-patches] " Vladislav Shpilevoy 2018-06-26 12:43 ` Alex Khatskevich 2018-06-26 12:50 ` Alex Khatskevich 2018-06-27 9:54 ` Vladislav Shpilevoy 2018-06-27 10:44 ` Alex Khatskevich 2018-06-27 11:34 ` Vladislav Shpilevoy 2018-06-27 11:45 ` Alex Khatskevich 2018-06-27 11:53 ` Vladislav Shpilevoy 2018-06-27 13:11 ` Vladislav Shpilevoy
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox