From: Alex Khatskevich <avkhatskevich@tarantool.org>
To: tarantool-patches@freelists.org
Subject: [tarantool-patches] Re: [PATCH 2/4] Refactor reloadable fiber
Date: Tue, 31 Jul 2018 14:30:40 +0300
Message-ID: <4fdb6747-6283-7823-e24e-caf1b709ab04@tarantool.org>
In-Reply-To: <03fadaa0-c6a1-bc63-69b6-ccdccc155848@tarantool.org>

full diff

commit 8afc94e2bb4ec880815e7f2154607d9f23a88e27
Author: AKhatskevich <avkhatskevich@tarantool.org>
Date:   Thu Jul 26 19:42:14 2018 +0300

    Refactor reloadable fiber

    Reloadable fiber changes:
    * renamed reloadable_fiber_f -> reloadable_fiber
    * reloadable_fiber creates fiber and gives name on its own
    * worker name replaced with a fiber name in logs
    * added extra data argument, which is passed to a function

diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result
index 88cbaae..3df8d4b 100644
--- a/test/rebalancer/rebalancer.result
+++ b/test/rebalancer/rebalancer.result
@@ -418,7 +418,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
 fiber = require('fiber')
 ---
 ...
-while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "rebalancer_f has been started") do fiber.sleep(0.1) end
 ---
 ...
 while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua
index 01f2061..0cfe9a9 100644
--- a/test/rebalancer/rebalancer.test.lua
+++ b/test/rebalancer/rebalancer.test.lua
@@ -195,7 +195,7 @@ switch_rs1_master()
 vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
 
 fiber = require('fiber')
-while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "rebalancer_f has been started") do fiber.sleep(0.1) end
 while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
 
 --
diff --git a/test/router/reload.result b/test/router/reload.result
index 88122aa..f0badc3 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -116,10 +116,10 @@ vshard.router.module_version()
 check_reloaded()
 ---
 ...
-while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'failover_f has been started') == nil do fiber.sleep(0.1) end
 ---
 ...
-while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'discovery_f has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 ---
 ...
 check_reloaded()
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 01b7163..528222a 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -59,8 +59,8 @@ vshard.router.module_version()
 
 check_reloaded()
 
-while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'failover_f has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'discovery_f has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 
 check_reloaded()
 
diff --git a/test/storage/reload.result b/test/storage/reload.result
index b91b622..c354dba 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -113,13 +113,13 @@ vshard.storage.module_version()
 check_reloaded()
 ---
 ...
-while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'collect_garbage_f has been started') == nil do fiber.sleep(0.1) end
 ---
 ...
-while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'recovery_f has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
 ---
 ...
-while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
 ---
 ...
 check_reloaded()
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 9140299..a8df1df 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -59,9 +59,9 @@ vshard.storage.module_version()
 
 check_reloaded()
 
-while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
-while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'collect_garbage_f has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'recovery_f has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
 
 check_reloaded()
 
diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 0da8ee1..a352bd8 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -343,7 +343,7 @@ control.bucket_generation_collected
 collect_f = vshard.storage.internal.collect_garbage_f
 ---
 ...
-f = fiber.create(collect_f, vshard.storage.module_version())
+f = fiber.create(collect_f)
 ---
 ...
 fill_spaces_with_garbage()
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index db7821f..80d37e7 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -149,7 +149,7 @@ control.bucket_generation_collected
 -- Test continuous garbage collection via background fiber.
 --
 collect_f = vshard.storage.internal.collect_garbage_f
-f = fiber.create(collect_f, vshard.storage.module_version())
+f = fiber.create(collect_f)
 fill_spaces_with_garbage()
 -- Wait until garbage collection is finished.
 while #s2:select{} ~= 2 do fiber.sleep(0.1) end
diff --git a/test/unit/util.result b/test/unit/util.result
index 30906d1..096e36f 100644
--- a/test/unit/util.result
+++ b/test/unit/util.result
@@ -34,22 +34,22 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end
 fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end
 ---
 ...
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')
 ---
 ...
-while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
+while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
 ---
 ...
 fib:cancel()
 ---
 ...
-test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+test_run:grep_log('default', 'module is reloaded, restarting')
 ---
-- Worker_name is reloaded, restarting
+- module is reloaded, restarting
 ...
-test_run:grep_log('default', 'Worker_name has been started')
+test_run:grep_log('default', 'reloadable_function has been started')
 ---
-- Worker_name has been started
+- reloadable_function has been started
 ...
 log.info(string.rep('a', 1000))
 ---
@@ -58,16 +58,16 @@ log.info(string.rep('a', 1000))
 fake_M.reloadable_function = function () fiber.sleep(0.01); return true end
 ---
 ...
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')
 ---
 ...
-while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end
+while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end
 ---
 ...
-fib:cancel()
+test_run:grep_log('default', 'reloadable_function has been started', 1000)
 ---
+- reloadable_function has been started
 ...
-test_run:grep_log('default', 'Worker_name has been started', 1000)
+fib:cancel()
 ---
-- Worker_name has been started
 ...
diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
index 131274c..5f39e06 100644
--- a/test/unit/util.test.lua
+++ b/test/unit/util.test.lua
@@ -14,16 +14,16 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end
 
 -- Check autoreload on function change during failure.
 fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
-while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
+fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')
+while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
 fib:cancel()
-test_run:grep_log('default', 'Worker_name is reloaded, restarting')
-test_run:grep_log('default', 'Worker_name has been started')
+test_run:grep_log('default', 'module is reloaded, restarting')
+test_run:grep_log('default', 'reloadable_function has been started')
 log.info(string.rep('a', 1000))
 
 -- Check reload feature.
 fake_M.reloadable_function = function () fiber.sleep(0.01); return true end
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
-while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end
+fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')
+while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end
+test_run:grep_log('default', 'reloadable_function has been started', 1000)
 fib:cancel()
-test_run:grep_log('default', 'Worker_name has been started', 1000)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 1a0ed2f..4cb19fd 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -149,9 +149,8 @@ end
 -- Background fiber to perform discovery. It periodically scans
 -- replicasets one by one and updates route_map.
 --
-local function discovery_f(module_version)
-    lfiber.name('discovery_fiber')
-    M.discovery_fiber = lfiber.self()
+local function discovery_f()
+    local module_version = M.module_version
     local iterations_until_lua_gc =
         consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
     while module_version == M.module_version do
@@ -459,9 +458,8 @@ end
 -- tries to reconnect to the best replica. When the connection is
 -- established, it replaces the original replica.
 --
-local function failover_f(module_version)
-    lfiber.name('vshard.failover')
-    M.failover_fiber = lfiber.self()
+local function failover_f()
+    local module_version = M.module_version
     local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT,
                                  consts.FAILOVER_DOWN_TIMEOUT)
     -- This flag is used to avoid logging like:
@@ -545,10 +543,12 @@ local function router_cfg(cfg)
         M.route_map[bucket] = M.replicasets[rs.uuid]
     end
     if M.failover_fiber == nil then
-        lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
+        M.failover_fiber = util.reloadable_fiber_create(
+            'vshard.failover', M, 'failover_f')
     end
     if M.discovery_fiber == nil then
-        lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
+        M.discovery_fiber = util.reloadable_fiber_create(
+            'vshard.discovery', M, 'discovery_f')
     end
     -- Destroy connections, not used in a new configuration.
     collectgarbage()
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index c1df0e6..5ec6898 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -352,10 +352,10 @@ end
 -- @param module_version Module version, on which the current
 -- function had been started. If the actual module version
 -- appears to be changed, then stop recovery. It is
--- restarted in reloadable_fiber_f().
+-- restarted in reloadable_fiber.
 --
-local function recovery_f(module_version)
-    lfiber.name('vshard.recovery')
+local function recovery_f()
+    local module_version = M.module_version
     local _bucket = box.space._bucket
     M.buckets_to_recovery = {}
     for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do
@@ -728,10 +728,9 @@ local function local_on_master_enable()
     M._on_master_enable:run()
     -- Start background process to collect garbage.
     M.collect_bucket_garbage_fiber =
-        lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
-                      'Garbage collector')
+        util.reloadable_fiber_create('vshard.gc', M, 'collect_garbage_f')
     M.recovery_fiber =
-        lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
+        util.reloadable_fiber_create('vshard.recovery', M, 'recovery_f')
     -- TODO: check current status
     log.info("Took on replicaset master role")
 end
@@ -1024,10 +1023,10 @@ end
 -- @param module_version Module version, on which the current
 -- function had been started. If the actual module version
 -- appears to be changed, then stop GC. It is restarted
--- in reloadable_fiber_f().
+-- in reloadable_fiber.
 --
-function collect_garbage_f(module_version)
-    lfiber.name('vshard.gc')
+function collect_garbage_f()
+    local module_version = M.module_version
     -- Collector controller. Changes of _bucket increments
     -- bucket generation. Garbage collector has its own bucket
     -- generation which is <= actual. Garbage collection is
@@ -1351,8 +1350,8 @@ end
 -- Background rebalancer. Works on a storage which has the
 -- smallest replicaset uuid and which is master.
 --
-local function rebalancer_f(module_version)
-    lfiber.name('vshard.rebalancer')
+local function rebalancer_f()
+    local module_version = M.module_version
     while module_version == M.module_version do
         while not M.is_rebalancer_active do
             log.info('Rebalancer is disabled. Sleep')
@@ -1642,8 +1641,8 @@ local function storage_cfg(cfg, this_replica_uuid)
 
     if min_master == this_replica then
         if not M.rebalancer_fiber then
-            M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
-                                               'rebalancer_f', 'Rebalancer')
+            M.rebalancer_fiber = util.reloadable_fiber_create(
+                'vshard.rebalancer', M, 'rebalancer_f')
         else
             log.info('Wakeup rebalancer')
             -- Configuration had changed. Time to rebalance.
diff --git a/vshard/util.lua b/vshard/util.lua
index fb875ce..ea676ff 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -10,7 +10,7 @@ if not M then
     --
     M = {
         -- Latest versions of functions.
-        reloadable_fiber_f = nil,
+        reloadable_fiber_main_loop = nil,
     }
     rawset(_G, MODULE_INTERNALS, M)
 end
@@ -34,40 +34,52 @@ end
 --
 -- Wrapper to run a func in infinite loop and restart it on
 -- errors and module reload.
--- To handle module reload and run new version of a function
--- in the module, the function should just return.
---
--- @param module Module which can be reloaded.
--- @param func_name Name of a function to be executed in the
--- module.
--- @param worker_name Name of the reloadable background subsystem.
--- For example: "Garbage Collector", "Recovery", "Discovery",
--- "Rebalancer". Used only for an activity logging.
+-- This loop executes the latest version of itself in case of
+-- reload of that module.
+-- See description of parameters in `reloadable_fiber_create`.
 --
-local function reloadable_fiber_f(module, func_name, worker_name)
-    log.info('%s has been started', worker_name)
+local function reloadable_fiber_main_loop(module, func_name)
+    log.info('%s has been started', func_name)
     local func = module[func_name]
-::reload_loop::
-    local ok, err = pcall(func, module.module_version)
-    -- yield serves two pursoses:
+::restart_loop::
+    local ok, err = pcall(func)
+    -- yield serves two purposes:
     -- * makes this fiber cancellable
     -- * prevents 100% cpu consumption
     fiber.yield()
     if not ok then
-        log.error('%s has been failed: %s', worker_name, err)
+        log.error('%s has been failed: %s', func_name, err)
         if func == module[func_name] then
-            goto reload_loop
+            goto restart_loop
         end
         -- There is a chance that error was raised during reload
         -- (or caused by reload). Perform reload in case function
         -- has been changed.
-        log.error('%s: reloadable function %s has been changed',
-                  worker_name, func_name)
+        log.error('reloadable function %s has been changed', func_name)
     end
-    log.info('%s is reloaded, restarting', worker_name)
+    log.info('module is reloaded, restarting')
     -- luajit drops this frame if next function is called in
     -- return statement.
-    return M.reloadable_fiber_f(module, func_name, worker_name)
+    return M.reloadable_fiber_main_loop(module, func_name)
+end
+
+--
+-- Create a new fiber which runs a function in a loop. This loop
+-- is aware of reload mechanism and it loads a new version of the
+-- function in that case.
+-- To handle module reload and run new version of a function
+-- in the module, the function should just return.
+-- @param fiber_name Name of a new fiber. E.g. "vshard.rebalancer".
+-- @param module Module which can be reloaded.
+-- @param func_name Name of a function to be executed in the
+-- module.
+-- @retval New fiber.
+--
+local function reloadable_fiber_create(fiber_name, module, func_name)
+    assert(type(fiber_name) == 'string')
+    local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name)
+    xfiber:name(fiber_name)
+    return xfiber
 end
 
 --
@@ -98,7 +110,7 @@ local function generate_self_checker(obj_name, func_name, mt, func)
 end
 
 -- Update latest versions of function
-M.reloadable_fiber_f = reloadable_fiber_f
+M.reloadable_fiber_main_loop = reloadable_fiber_main_loop
 
 local function sync_task(delay, task, ...)
     if delay then
@@ -121,7 +133,7 @@ end
 
 return {
     tuple_extract_key = tuple_extract_key,
-    reloadable_fiber_f = reloadable_fiber_f,
+    reloadable_fiber_create = reloadable_fiber_create,
     generate_self_checker = generate_self_checker,
     async_task = async_task,
     internal = M,
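The contract stated in the reloadable_fiber_create comment (on reload the worker "should just return", and the loop then restarts the latest version) can be tried without Tarantool. What follows is a rough plain-Lua sketch, not the vshard implementation: there are no fibers, so fiber.yield() and fiber naming are left out, logging goes to an in-memory table, and worker, sleep(), log_info() and the `budget` counter that bounds the otherwise infinite loop are hypothetical names introduced only for illustration.

```lua
-- Plain-Lua sketch of the loop in reloadable_fiber_main_loop().
-- Assumptions (not vshard API): no fibers, so fiber.yield() is omitted;
-- log.info()/log.error() are replaced by an in-memory log; the
-- hypothetical `budget` table bounds the otherwise infinite loop.

local log = {}
local function log_info(fmt, ...)
    log[#log + 1] = string.format(fmt, ...)
end

local function reloadable_main_loop(module, func_name, budget)
    log_info('%s has been started', func_name)
    local func = module[func_name]
    while true do
        if budget.runs == 0 then
            return
        end
        budget.runs = budget.runs - 1
        -- As in the patch, the worker gets no arguments and reads
        -- module.module_version on its own.
        local ok, err = pcall(func)
        if ok then
            break
        end
        log_info('%s has been failed: %s', func_name, tostring(err))
        if func ~= module[func_name] then
            log_info('reloadable function %s has been changed', func_name)
            break
        end
        -- Same version failed again: retry it (the patch uses goto).
    end
    log_info('module is reloaded, restarting')
    -- Tail call, so restarts do not grow the stack. The patch calls
    -- M.reloadable_fiber_main_loop here to pick up a reloaded loop.
    return reloadable_main_loop(module, func_name, budget)
end

-- Hypothetical module in the style of vshard's M table.
local worker = { module_version = 1 }
local ticks = 0

-- Hypothetical stand-in for lfiber.sleep(): every third tick
-- simulates a module reload by bumping module_version.
local function sleep()
    ticks = ticks + 1
    if ticks % 3 == 0 then
        worker.module_version = worker.module_version + 1
    end
end

-- Worker in the style of rebalancer_f(): remember the version it was
-- started on and just return once the module has been reloaded.
function worker.worker_f()
    local module_version = worker.module_version
    while module_version == worker.module_version do
        sleep()
    end
end

reloadable_main_loop(worker, 'worker_f', { runs = 2 })
for _, line in ipairs(log) do
    print(line)
end
```

With a budget of two runs this prints "worker_f has been started" three times, alternating with "module is reloaded, restarting". The point mirrored from the patch is that the worker no longer receives a version argument: it snapshots module_version itself and simply returns when the value changes, leaving the restart to the loop.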