From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 718DB282A8 for ; Mon, 30 Jul 2018 04:56:24 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id oQxwXbNqV8BU for ; Mon, 30 Jul 2018 04:56:24 -0400 (EDT) Received: from smtp56.i.mail.ru (smtp56.i.mail.ru [217.69.128.36]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id D9E512829D for ; Mon, 30 Jul 2018 04:56:23 -0400 (EDT) From: AKhatskevich Subject: [tarantool-patches] [PATCH 2/4] Refactor reloadable fiber Date: Mon, 30 Jul 2018 11:56:04 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: v.shpilevoy@tarantool.org, tarantool-patches@freelists.org Reloadable fiber changes: * renamed reloadable_fiber_f -> reloadable_fiber * reloadable_fiber now creates the fiber and names it itself * the worker name is replaced with the fiber name in logs * added an extra `data` argument, which is passed to the reloadable function * worker functions no longer receive module_version as an argument; each reads M.module_version when it starts --- test/rebalancer/rebalancer.result | 2 +- test/rebalancer/rebalancer.test.lua | 2 +- test/router/reload.result | 4 +-- test/router/reload.test.lua | 4 +-- test/storage/reload.result | 6 ++-- test/storage/reload.test.lua | 6 ++-- test/unit/garbage.result | 2 +- test/unit/garbage.test.lua | 2 +- test/unit/util.result | 20 ++++--------- test/unit/util.test.lua | 12 ++++---- vshard/router/init.lua | 16 +++++----- vshard/storage/init.lua | 21 +++++++------ vshard/util.lua | 59
+++++++++++++++++++++---------------- 13 files changed, 76 insertions(+), 80 deletions(-) diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result index 88cbaae..bf9e63b 100644 --- a/test/rebalancer/rebalancer.result +++ b/test/rebalancer/rebalancer.result @@ -418,7 +418,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) fiber = require('fiber') --- ... -while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end +while not test_run:grep_log('box_2_a', "vshard.rebalancer has been started") do fiber.sleep(0.1) end --- ... while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua index 01f2061..24c2706 100644 --- a/test/rebalancer/rebalancer.test.lua +++ b/test/rebalancer/rebalancer.test.lua @@ -195,7 +195,7 @@ switch_rs1_master() vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) fiber = require('fiber') -while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end +while not test_run:grep_log('box_2_a', "vshard.rebalancer has been started") do fiber.sleep(0.1) end while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end -- diff --git a/test/router/reload.result b/test/router/reload.result index 88122aa..28b5004 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -116,10 +116,10 @@ vshard.router.module_version() check_reloaded() --- ... -while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'vshard.failover has been started') == nil do fiber.sleep(0.1) end --- ... 
-while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'vshard.discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end --- ... check_reloaded() diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 01b7163..d15986f 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -59,8 +59,8 @@ vshard.router.module_version() check_reloaded() -while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end -while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'vshard.failover has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'vshard.discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end check_reloaded() diff --git a/test/storage/reload.result b/test/storage/reload.result index b91b622..898a18f 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -113,13 +113,13 @@ vshard.storage.module_version() check_reloaded() --- ... -while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'vshard.gc has been started') == nil do fiber.sleep(0.1) end --- ... -while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'vshard.recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end --- ... 
-while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'vshard.rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end --- ... check_reloaded() diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 9140299..a2e8241 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -59,9 +59,9 @@ vshard.storage.module_version() check_reloaded() -while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end -while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end -while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'vshard.gc has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'vshard.recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'vshard.rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end check_reloaded() diff --git a/test/unit/garbage.result b/test/unit/garbage.result index 0da8ee1..a352bd8 100644 --- a/test/unit/garbage.result +++ b/test/unit/garbage.result @@ -343,7 +343,7 @@ control.bucket_generation_collected collect_f = vshard.storage.internal.collect_garbage_f --- ... -f = fiber.create(collect_f, vshard.storage.module_version()) +f = fiber.create(collect_f) --- ... 
fill_spaces_with_garbage() diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua index db7821f..80d37e7 100644 --- a/test/unit/garbage.test.lua +++ b/test/unit/garbage.test.lua @@ -149,7 +149,7 @@ control.bucket_generation_collected -- Test continuous garbage collection via background fiber. -- collect_f = vshard.storage.internal.collect_garbage_f -f = fiber.create(collect_f, vshard.storage.module_version()) +f = fiber.create(collect_f) fill_spaces_with_garbage() -- Wait until garbage collection is finished. while #s2:select{} ~= 2 do fiber.sleep(0.1) end diff --git a/test/unit/util.result b/test/unit/util.result index 30906d1..56b863e 100644 --- a/test/unit/util.result +++ b/test/unit/util.result @@ -34,22 +34,18 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end --- ... -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +fib = util.reloadable_fiber('Worker_name', fake_M, 'reloadable_function') --- ... -while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end +while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end --- ... fib:cancel() --- ... -test_run:grep_log('default', 'Worker_name is reloaded, restarting') +test_run:grep_log('default', 'module is reloaded, restarting') --- -- Worker_name is reloaded, restarting -... -test_run:grep_log('default', 'Worker_name has been started') ---- -- Worker_name has been started +- module is reloaded, restarting ... log.info(string.rep('a', 1000)) --- @@ -58,16 +54,12 @@ log.info(string.rep('a', 1000)) fake_M.reloadable_function = function () fiber.sleep(0.01); return true end --- ... 
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +fib = util.reloadable_fiber('Worker_name', fake_M, 'reloadable_function') --- ... -while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end +while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end --- ... fib:cancel() --- ... -test_run:grep_log('default', 'Worker_name has been started', 1000) ---- -- Worker_name has been started -... diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua index 131274c..b26bb51 100644 --- a/test/unit/util.test.lua +++ b/test/unit/util.test.lua @@ -14,16 +14,14 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end -- Check autoreload on function change during failure. fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') -while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end +fib = util.reloadable_fiber('Worker_name', fake_M, 'reloadable_function') +while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end fib:cancel() -test_run:grep_log('default', 'Worker_name is reloaded, restarting') -test_run:grep_log('default', 'Worker_name has been started') +test_run:grep_log('default', 'module is reloaded, restarting') log.info(string.rep('a', 1000)) -- Check reload feature. 
fake_M.reloadable_function = function () fiber.sleep(0.01); return true end -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') -while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end +fib = util.reloadable_fiber('Worker_name', fake_M, 'reloadable_function') +while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end fib:cancel() -test_run:grep_log('default', 'Worker_name has been started', 1000) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 1a0ed2f..2df3446 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -149,9 +149,8 @@ end -- Background fiber to perform discovery. It periodically scans -- replicasets one by one and updates route_map. -- -local function discovery_f(module_version) - lfiber.name('discovery_fiber') - M.discovery_fiber = lfiber.self() +local function discovery_f() + local module_version = M.module_version local iterations_until_lua_gc = consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL while module_version == M.module_version do @@ -459,9 +458,8 @@ end -- tries to reconnect to the best replica. When the connection is -- established, it replaces the original replica. 
-- -local function failover_f(module_version) - lfiber.name('vshard.failover') - M.failover_fiber = lfiber.self() +local function failover_f() + local module_version = M.module_version local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT, consts.FAILOVER_DOWN_TIMEOUT) -- This flag is used to avoid logging like: @@ -545,10 +543,12 @@ local function router_cfg(cfg) M.route_map[bucket] = M.replicasets[rs.uuid] end if M.failover_fiber == nil then - lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover') + M.failover_fiber = util.reloadable_fiber('vshard.failover', M, + 'failover_f') end if M.discovery_fiber == nil then - lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery') + M.discovery_fiber = util.reloadable_fiber('vshard.discovery', M, + 'discovery_f') end -- Destroy connections, not used in a new configuration. collectgarbage() diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index c1df0e6..8ca81f6 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -354,8 +354,8 @@ end -- appears to be changed, then stop recovery. It is -- restarted in reloadable_fiber_f(). -- -local function recovery_f(module_version) - lfiber.name('vshard.recovery') +local function recovery_f() + local module_version = M.module_version local _bucket = box.space._bucket M.buckets_to_recovery = {} for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do @@ -728,10 +728,9 @@ local function local_on_master_enable() M._on_master_enable:run() -- Start background process to collect garbage. 
M.collect_bucket_garbage_fiber = - lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f', - 'Garbage collector') + util.reloadable_fiber('vshard.gc', M, 'collect_garbage_f') M.recovery_fiber = - lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery') + util.reloadable_fiber('vshard.recovery', M, 'recovery_f') -- TODO: check current status log.info("Took on replicaset master role") end @@ -1026,8 +1025,8 @@ end -- appears to be changed, then stop GC. It is restarted -- in reloadable_fiber_f(). -- -function collect_garbage_f(module_version) - lfiber.name('vshard.gc') +function collect_garbage_f() + local module_version = M.module_version -- Collector controller. Changes of _bucket increments -- bucket generation. Garbage collector has its own bucket -- generation which is <= actual. Garbage collection is @@ -1351,8 +1350,8 @@ end -- Background rebalancer. Works on a storage which has the -- smallest replicaset uuid and which is master. -- -local function rebalancer_f(module_version) - lfiber.name('vshard.rebalancer') +local function rebalancer_f() + local module_version = M.module_version while module_version == M.module_version do while not M.is_rebalancer_active do log.info('Rebalancer is disabled. Sleep') @@ -1642,8 +1641,8 @@ local function storage_cfg(cfg, this_replica_uuid) if min_master == this_replica then if not M.rebalancer_fiber then - M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M, - 'rebalancer_f', 'Rebalancer') + M.rebalancer_fiber = util.reloadable_fiber('vshard.rebalancer', M, + 'rebalancer_f') else log.info('Wakeup rebalancer') -- Configuration had changed. Time to rebalance. diff --git a/vshard/util.lua b/vshard/util.lua index fb875ce..1319acc 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -10,7 +10,7 @@ if not M then -- M = { -- Latest versions of functions. 
- reloadable_fiber_f = nil, + reloadable_fiber_main_loop = nil, } rawset(_G, MODULE_INTERNALS, M) end @@ -31,6 +31,30 @@ local function tuple_extract_key(tuple, parts) return key end +local function reloadable_fiber_main_loop(module, func_name, data) + local func = module[func_name] +::restart_loop:: + local ok, err = pcall(func, data) + -- yield serves two purposes: + -- * makes this fiber cancellable + -- * prevents 100% cpu consumption + fiber.yield() + if not ok then + log.error('%s has been failed: %s', func_name, err) + if func == module[func_name] then + goto restart_loop + end + -- There is a chance that error was raised during reload + -- (or caused by reload). Perform reload in case function + -- has been changed. + log.error('reloadable function %s has been changed', func_name) + end + log.info('module is reloaded, restarting') + -- luajit drops this frame if next function is called in + -- return statement. + return M.reloadable_fiber_main_loop(module, func_name, data) +end + -- -- Wrapper to run a func in infinite loop and restart it on -- errors and module reload. @@ -44,30 +68,13 @@ end -- For example: "Garbage Collector", "Recovery", "Discovery", -- "Rebalancer". Used only for an activity logging. -- -local function reloadable_fiber_f(module, func_name, worker_name) +local function reloadable_fiber(worker_name, module, func_name, data) + assert(type(worker_name) == 'string') + local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name, + data) log.info('%s has been started', worker_name) - local func = module[func_name] -::reload_loop:: - local ok, err = pcall(func, module.module_version) - -- yield serves two pursoses: - -- * makes this fiber cancellable - -- * prevents 100% cpu consumption - fiber.yield() - if not ok then - log.error('%s has been failed: %s', worker_name, err) - if func == module[func_name] then - goto reload_loop - end - -- There is a chance that error was raised during reload - -- (or caused by reload). 
Perform reload in case function - -- has been changed. - log.error('%s: reloadable function %s has been changed', - worker_name, func_name) - end - log.info('%s is reloaded, restarting', worker_name) - -- luajit drops this frame if next function is called in - -- return statement. - return M.reloadable_fiber_f(module, func_name, worker_name) + xfiber:name(worker_name) + return xfiber end -- @@ -98,7 +105,7 @@ local function generate_self_checker(obj_name, func_name, mt, func) end -- Update latest versions of function -M.reloadable_fiber_f = reloadable_fiber_f +M.reloadable_fiber_main_loop = reloadable_fiber_main_loop local function sync_task(delay, task, ...) if delay then @@ -121,7 +128,7 @@ end return { tuple_extract_key = tuple_extract_key, - reloadable_fiber_f = reloadable_fiber_f, + reloadable_fiber = reloadable_fiber, generate_self_checker = generate_self_checker, async_task = async_task, internal = M, -- 2.14.1