From: Alex Khatskevich <avkhatskevich@tarantool.org>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>,
    tarantool-patches@freelists.org
Subject: [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber
Date: Tue, 26 Jun 2018 15:50:43 +0300
Message-ID: <cd6d8c28-3d27-9769-d2da-caca1dcc3617@tarantool.org>
In-Reply-To: <0c6f7167-d61d-010e-9ba5-5c08244a5bb9@tarantool.org>

Sorry, I forgot to put the new patch here.

commit 4ddab3c47c5963c3138701d89ba3091d5da9848a
Author: AKhatskevich <avkhatskevich@tarantool.org>
Date:   Thu Jun 14 14:03:07 2018 +0300

    Reload reloadable fiber

    Fixed a problem:
    The `reloadable_fiber_f` was running an infinite while loop and
    preventing the whole module from being reloaded.
    This behavior is fixed by calling the new version of
    `reloadable_fiber_f` in a return statement instead of the while
    loop. Note: a function call in a return statement is a proper
    tail call in Lua, so it does not grow the stack.

    Extra changes:
     * move responsibility for writing the "started" message from
       the caller to the reloadable fiber

    Closes #116

diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result
index 22100fe..6e3aa81 100644
--- a/test/rebalancer/rebalancer.result
+++ b/test/rebalancer/rebalancer.result
@@ -424,7 +424,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
 fiber = require('fiber')
 ---
 ...
-while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
 ---
 ...
 while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua
index 3327f30..922357a 100644
--- a/test/rebalancer/rebalancer.test.lua
+++ b/test/rebalancer/rebalancer.test.lua
@@ -197,7 +197,7 @@ switch_rs1_master()
 vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
 
 fiber = require('fiber')
-while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
 while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
 
 --
diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..47f3c2e 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -116,10 +116,10 @@ vshard.router.module_version()
 check_reloaded()
 ---
 ...
-while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
 ---
 ...
-while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 ---
 ...
 check_reloaded()
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..af2939d 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -59,8 +59,8 @@ vshard.router.module_version()
 
 check_reloaded()
 
-while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 
 check_reloaded()
 
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..531d984 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -113,13 +113,13 @@ vshard.storage.module_version()
 check_reloaded()
 ---
 ...
-while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
 ---
 ...
-while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
 ---
 ...
-while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
 ---
 ...
 check_reloaded()
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..64c3a60 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -59,9 +59,9 @@ vshard.storage.module_version()
 
 check_reloaded()
 
-while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
-while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
 
 check_reloaded()
 
diff --git a/test/unit/util.result b/test/unit/util.result
new file mode 100644
index 0000000..dbfcce9
--- /dev/null
+++ b/test/unit/util.result
@@ -0,0 +1,72 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+log = require('log')
+---
+...
+test_util = require('util')
+---
+...
+util = require('vshard.util')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+fake_M = {
+    reloadable_func = nil,
+    module_version = 1,
+};
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Check autoreload on function change during failure.
+fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
+---
+...
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+---
+...
+fib:cancel()
+---
+...
+test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has changed')
+---
+- 'Worker_name: reloadable function reloadable_function has changed'
+...
+test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+---
+- Worker_name is reloaded, restarting
+...
+test_run:grep_log('default', 'Worker_name has been started')
+---
+- Worker_name has been started
+...
+log.info(string.rep('a', 1000))
+---
+...
+-- Check reload feature.
+fake_M.reloadable_function = function () return true end
+---
+...
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+---
+...
+fib:cancel()
+---
+...
+test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
+---
+- Worker_name is reloaded, restarting
+...
+test_run:grep_log('default', 'Worker_name has been started', 1000)
+---
+- Worker_name has been started
+...
diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
new file mode 100644
index 0000000..094cae9
--- /dev/null
+++ b/test/unit/util.test.lua
@@ -0,0 +1,29 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+log = require('log')
+test_util = require('util')
+util = require('vshard.util')
+
+test_run:cmd("setopt delimiter ';'")
+fake_M = {
+    reloadable_func = nil,
+    module_version = 1,
+};
+test_run:cmd("setopt delimiter ''");
+
+-- Check autoreload on function change during failure.
+fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
+
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib:cancel()
+test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has changed')
+test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+test_run:grep_log('default', 'Worker_name has been started')
+log.info(string.rep('a', 1000))
+
+-- Check reload feature.
+fake_M.reloadable_function = function () return true end
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib:cancel()
+test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
+test_run:grep_log('default', 'Worker_name has been started', 1000)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 21093e5..9e18b27 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -491,11 +491,9 @@ local function router_cfg(cfg)
     end
     lreplicaset.wait_masters_connect(new_replicasets)
     if M.failover_fiber == nil then
-        log.info('Start failover fiber')
         lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
     end
     if M.discovery_fiber == nil then
-        log.info('Start discovery fiber')
         lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
     end
     -- Destroy connections, not used in a new configuration.
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 57076e1..300751f 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -692,10 +692,8 @@ local function local_on_master_enable()
     M.collect_bucket_garbage_fiber =
         lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
                       'Garbage collector')
-    log.info("GC is started")
     M.recovery_fiber =
         lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
-    log.info('Recovery is started')
     -- TODO: check current status
     log.info("Took on replicaset master role")
 end
@@ -1585,7 +1583,6 @@ local function storage_cfg(cfg, this_replica_uuid)
 
     if min_master == this_replica then
         if not M.rebalancer_fiber then
-            log.info('Run rebalancer')
             M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
                                                'rebalancer_f', 'Rebalancer')
         else
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..920f53e 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -2,6 +2,19 @@
 local log = require('log')
 local fiber = require('fiber')
 
+local MODULE_INTERNALS = '__module_vshard_util'
+local M = rawget(_G, MODULE_INTERNALS)
+if not M then
+    --
+    -- The module is loaded for the first time.
+    --
+    M = {
+        -- Latest versions of functions.
+        reloadable_fiber_f = nil,
+    }
+    rawset(_G, MODULE_INTERNALS, M)
+end
+
 --
 -- Extract parts of a tuple.
 -- @param tuple Tuple to extract a key from.
@@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts)
 end
 
 --
--- Wrapper to run @a func in infinite loop and restart it on the
--- module reload. This function CAN NOT BE AUTORELOADED. To update
--- it you must manualy stop all fibers, run by this function, do
--- reload, and then restart all stopped fibers. This can be done,
--- for example, by calling vshard.storage/router.cfg() again with
--- the same config as earlier.
+-- Wrapper to run a func in infinite loop and restart it on
+-- errors and module reload.
+-- To handle module reload and run new version of a function
+-- in the module, the function should just return.
 --
--- @param func Reloadable function to run. It must accept current
---        module version as an argument, and interrupt itself,
---        when it is changed.
--- @param worker_name Name of the function. Usual infinite fiber
---        represents a background subsystem, which has a name. For
---        example: "Garbage Collector", "Recovery", "Discovery",
---        "Rebalancer".
--- @param M Module which can reload.
+-- @param module Module which can be reloaded.
+-- @param func_name Name of a function to be executed in the
+--        module.
+-- @param worker_name Name of the reloadable background subsystem.
+--        For example: "Garbage Collector", "Recovery", "Discovery",
+--        "Rebalancer". Used only for an activity logging.
 --
-local function reloadable_fiber_f(M, func_name, worker_name)
-    while true do
-        local ok, err = pcall(M[func_name], M.module_version)
-        if not ok then
-            log.error('%s has been failed: %s', worker_name, err)
-            fiber.yield()
-        else
-            log.info('%s has been reloaded', worker_name)
-            fiber.yield()
+local function reloadable_fiber_f(module, func_name, worker_name)
+    log.info('%s has been started', worker_name)
+    local func = module[func_name]
+    local ok, err = pcall(func, module.module_version)
+    if not ok then
+        log.error('%s has been failed: %s', worker_name, err)
+        if func ~= module[func_name] then
+            log.warn('%s: reloadable function %s has changed',
+                     worker_name, func_name)
         end
     end
+    -- yield serves two purposes:
+    --  * makes this fiber cancellable
+    --  * prevents 100% cpu consumption
+    fiber.yield()
+    log.info('%s is reloaded, restarting', worker_name)
+    -- luajit drops this frame if next function is called in
+    -- return statement.
+    return M.reloadable_fiber_f(module, func_name, worker_name)
 end
 
 --
@@ -75,8 +92,12 @@ local function generate_self_checker(obj_name, func_name, mt, func)
     end
 end
 
+-- Update latest versions of function
+M.reloadable_fiber_f = reloadable_fiber_f
+
 return {
     tuple_extract_key = tuple_extract_key,
     reloadable_fiber_f = reloadable_fiber_f,
     generate_self_checker = generate_self_checker,
+    internal = M,
 }
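
The reload mechanism in the patch rests on two things: a table kept in `_G` that always points at the latest version of the function, and a proper tail call through that table so the stack does not grow. The following is only a minimal standalone sketch of that idea, assuming plain Lua 5.1 or LuaJIT without Tarantool. The names `DEMO_KEY`, `registry`, `make_worker`, the `on_iteration` hook, and `count_down` are hypothetical and do not appear in the patch.

```lua
-- Hypothetical global key; the patch uses '__module_vshard_util'.
local DEMO_KEY = '__demo_reload_registry'

-- Reuse the registry if the chunk was loaded before, create it otherwise.
local registry = rawget(_G, DEMO_KEY)
if not registry then
    registry = { worker = nil }
    rawset(_G, DEMO_KEY, registry)
end

-- Build one "version" of a worker. on_iteration is a hypothetical hook
-- that lets this demo simulate a module reload in the middle of a run.
local function make_worker(version, on_iteration)
    return function(remaining, seen)
        seen[#seen + 1] = version
        if on_iteration then
            on_iteration(remaining)
        end
        if remaining == 0 then
            return seen
        end
        -- Proper tail call through the registry: the current frame is
        -- dropped, and whatever version is registered now runs next.
        return registry.worker(remaining - 1, seen)
    end
end

local worker_v2 = make_worker('v2')
local worker_v1 = make_worker('v1', function(remaining)
    -- Simulate a reload during the first iteration.
    if remaining == 3 then
        registry.worker = worker_v2
    end
end)

registry.worker = worker_v1
local seen = registry.worker(3, {})
print(table.concat(seen, ','))  -- prints v1,v2,v2,v2

-- A tail-recursive countdown reaches depth 1000000 because each call
-- replaces the previous frame instead of stacking on top of it.
local function count_down(n)
    if n == 0 then
        return 'done'
    end
    return count_down(n - 1)
end
print(count_down(1000000))  -- prints done
```

The old version runs exactly once after the swap is registered, which mirrors why the patched `reloadable_fiber_f` re-enters through `M.reloadable_fiber_f` rather than calling itself by its local name.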