Subject: [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber
From: Alex Khatskevich
To: Vladislav Shpilevoy, tarantool-patches@freelists.org
Date: Wed, 27 Jun 2018 13:44:10 +0300
Message-ID: <51114d53-806d-5c65-3424-7a965a434a98@tarantool.org>
In-Reply-To: <527ad322-e091-dcf6-ad74-5de5a4fcc6ac@tarantool.org>

On 27.06.2018 12:54, Vladislav Shpilevoy wrote:
> Hello. Thanks for the fixes! See 4 comments below.
>
> On 26/06/2018 15:50, Alex Khatskevich wrote:
>> Sorry, I forgot to put the new patch here
>>
>> commit 4ddab3c47c5963c3138701d89ba3091d5da9848a
>> Author: AKhatskevich
>> Date:   Thu Jun 14 14:03:07 2018 +0300
>>
>>      Reload reloadable fiber
>>
>>      Fixed a problem:
>>      The `reloadable_fiber_f` was running an infinite while loop and
>>      preventing the whole module from being reloaded.
>>
>>      This behavior is fixed by calling new version of
>> `reloadable_fiber_f` in
>>      a return statement instead of the where loop.  Note: calling a
>> function
>
> 1. Again 'where loop'.
>
Sorry
>>      in a return statement doesn't increase a stack size.
>>
>>      Extra changes:
>>       * transfer write "started" message responsibility from caller to
>>         reloadable fiber
>>
>>      Closes #116
>>
>> diff --git a/test/unit/util.result b/test/unit/util.result
>> new file mode 100644
>> index 0000000..dbfcce9
>> --- /dev/null
>> +++ b/test/unit/util.result
>> @@ -0,0 +1,72 @@
>> +test_run = require('test_run').new()
>> +---
>> +...
>> +fiber = require('fiber')
>> +---
>> +...
>> +log = require('log')
>> +---
>> +...
>> +test_util = require('util')
>> +---
>> +...
>> +util = require('vshard.util')
>> +---
>> +...
>> +test_run:cmd("setopt delimiter ';'")
>> +---
>> +- true
>> +...
>> +fake_M = {
>> +    reloadable_func = nil,
>> +    module_version = 1,
>> +};
>> +---
>> +...
>> +test_run:cmd("setopt delimiter ''");
>> +---
>> +- true
>> +...
>> +-- Check autoreload on function change during failure.
>> +fake_M.reloadable_function = function () fake_M.reloadable_function
>> = 42; error('Error happened.') end
>> +---
>> +...
>> +fib = fiber.create(util.reloadable_fiber_f, fake_M,
>> 'reloadable_function', 'Worker_name')
>> +---
>> +...
>> +fib:cancel()
>> +---
>> +...
>> +test_run:grep_log('default', 'Worker_name: reloadable function
>> reloadable_function has changed')
>
> 2. 'Has been' changed?
I have changed it, but there are two notes:
1. Both are correct
   https://english.stackexchange.com/questions/97243/has-been-changed-or-has-changed
2. vshard uses both options:
   a. "Configuration has changed, restart"
   b. "Rebalancer location has changed"
>
>> +---
>> +- 'Worker_name: reloadable function reloadable_function has changed'
>> +...
>> diff --git a/vshard/util.lua b/vshard/util.lua
>> index bb71318..920f53e 100644
>> --- a/vshard/util.lua
>> +++ b/vshard/util.lua
>> @@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts)
>> +local function reloadable_fiber_f(module, func_name, worker_name)
>> +    log.info('%s has been started', worker_name)
>> +    local func = module[func_name]
>> +    local ok, err = pcall(func, module.module_version)
>> +    if not ok then
>> +        log.error('%s has been failed: %s', worker_name, err)
>> +        if func ~= module[func_name] then
>> +            log.warn('%s: reloadable function %s has changed',
>> +                        worker_name, func_name)
>
> 3. Why is these check and warn here, after an error?
Changed:
1. warn -> error
2. Comment added:
        -- There is a chance that error was raised during reload
        -- (or caused by reload). Perform reload in case function
        -- has been changed.
>
>>           end
>>       end
>> +    -- yield serves two pursoses:
>> +    --  * makes this fiber cancellable
>> +    --  * prevents 100% cpu consumption
>> +    fiber.yield()
>> +    log.info('%s is reloaded, restarting', worker_name)
>
> 4. Why do you print 'is reloaded' even if it just thrown an error
> and the function is not changed actually?
Fixed by goto
>
>> +    -- luajit drops this frame if next function is called in
>> +    -- return statement.
>> +    return M.reloadable_fiber_f(module, func_name, worker_name)
>>   end

Full diff:

commit d098bf0930935b029fbca3f4fd78211dd5022fb1
Author: AKhatskevich
Date:   Thu Jun 14 14:03:07 2018 +0300

    Reload reloadable fiber

    Fixed a problem:
    The `reloadable_fiber_f` was running an infinite while loop and
    preventing the whole module from being reloaded.

    This behavior is fixed by calling new version of `reloadable_fiber_f` in
    a return statement instead of the while loop. Note: calling a function
    in a return statement doesn't increase a stack size.

    Extra changes:
     * transfer write "started" message responsibility from caller to
       reloadable fiber

    Closes #116

diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result
index 22100fe..6e3aa81 100644
--- a/test/rebalancer/rebalancer.result
+++ b/test/rebalancer/rebalancer.result
@@ -424,7 +424,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
 fiber = require('fiber')
 ---
 ...
-while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
 ---
 ...
 while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua
index 3327f30..922357a 100644
--- a/test/rebalancer/rebalancer.test.lua
+++ b/test/rebalancer/rebalancer.test.lua
@@ -197,7 +197,7 @@ switch_rs1_master()
 vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
 fiber = require('fiber')
-while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
 while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
 --
diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..47f3c2e 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -116,10 +116,10 @@ vshard.router.module_version()
 check_reloaded()
 ---
 ...
-while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
 ---
 ...
-while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 ---
 ...
 check_reloaded()
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..af2939d 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -59,8 +59,8 @@ vshard.router.module_version()
 check_reloaded()
-while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 check_reloaded()
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..531d984 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -113,13 +113,13 @@ vshard.storage.module_version()
 check_reloaded()
 ---
 ...
-while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
 ---
 ...
-while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
 ---
 ...
-while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
 ---
 ...
 check_reloaded()
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..64c3a60 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -59,9 +59,9 @@ vshard.storage.module_version()
 check_reloaded()
-while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
-while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
 check_reloaded()
diff --git a/test/unit/util.result b/test/unit/util.result
new file mode 100644
index 0000000..a7b176d
--- /dev/null
+++ b/test/unit/util.result
@@ -0,0 +1,72 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+log = require('log')
+---
+...
+test_util = require('util')
+---
+...
+util = require('vshard.util')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+fake_M = {
+    reloadable_func = nil,
+    module_version = 1,
+};
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Check autoreload on function change during failure.
+fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
+---
+...
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+---
+...
+fib:cancel()
+---
+...
+test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed')
+---
+- 'Worker_name: reloadable function reloadable_function has been changed'
+...
+test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+---
+- Worker_name is reloaded, restarting
+...
+test_run:grep_log('default', 'Worker_name has been started')
+---
+- Worker_name has been started
+...
+log.info(string.rep('a', 1000))
+---
+...
+-- Check reload feature.
+fake_M.reloadable_function = function () return true end
+---
+...
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+---
+...
+fib:cancel()
+---
+...
+test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
+---
+- Worker_name is reloaded, restarting
+...
+test_run:grep_log('default', 'Worker_name has been started', 1000)
+---
+- Worker_name has been started
+...
diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
new file mode 100644
index 0000000..9359378
--- /dev/null
+++ b/test/unit/util.test.lua
@@ -0,0 +1,29 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+log = require('log')
+test_util = require('util')
+util = require('vshard.util')
+
+test_run:cmd("setopt delimiter ';'")
+fake_M = {
+    reloadable_func = nil,
+    module_version = 1,
+};
+test_run:cmd("setopt delimiter ''");
+
+-- Check autoreload on function change during failure.
+fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
+
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib:cancel()
+test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed')
+test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+test_run:grep_log('default', 'Worker_name has been started')
+log.info(string.rep('a', 1000))
+
+-- Check reload feature.
+fake_M.reloadable_function = function () return true end
+fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib:cancel()
+test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
+test_run:grep_log('default', 'Worker_name has been started', 1000)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 21093e5..9e18b27 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -491,11 +491,9 @@ local function router_cfg(cfg)
     end
     lreplicaset.wait_masters_connect(new_replicasets)
     if M.failover_fiber == nil then
-        log.info('Start failover fiber')
         lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
     end
     if M.discovery_fiber == nil then
-        log.info('Start discovery fiber')
         lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
     end
     -- Destroy connections, not used in a new configuration.
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 57076e1..300751f 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -692,10 +692,8 @@ local function local_on_master_enable()
     M.collect_bucket_garbage_fiber =
         lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
                       'Garbage collector')
-    log.info("GC is started")
     M.recovery_fiber =
         lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
-    log.info('Recovery is started')
     -- TODO: check current status
     log.info("Took on replicaset master role")
 end
@@ -1585,7 +1583,6 @@ local function storage_cfg(cfg, this_replica_uuid)
     if min_master == this_replica then
         if not M.rebalancer_fiber then
-            log.info('Run rebalancer')
             M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M, 'rebalancer_f', 'Rebalancer')
         else
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..ce79930 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -2,6 +2,19 @@
 local log = require('log')
 local fiber = require('fiber')
+local MODULE_INTERNALS = '__module_vshard_util'
+local M = rawget(_G, MODULE_INTERNALS)
+if not M then
+    --
+    -- The module is loaded for the first time.
+    --
+    M = {
+        -- Latest versions of functions.
+        reloadable_fiber_f = nil,
+    }
+    rawset(_G, MODULE_INTERNALS, M)
+end
+
 --
 -- Extract parts of a tuple.
 -- @param tuple Tuple to extract a key from.
@@ -19,33 +32,42 @@ local function tuple_extract_key(tuple, parts)
 end
 --
--- Wrapper to run @a func in infinite loop and restart it on the
--- module reload. This function CAN NOT BE AUTORELOADED. To update
--- it you must manualy stop all fibers, run by this function, do
--- reload, and then restart all stopped fibers. This can be done,
--- for example, by calling vshard.storage/router.cfg() again with
--- the same config as earlier.
+-- Wrapper to run a func in infinite loop and restart it on
+-- errors and module reload.
+-- To handle module reload and run new version of a function
+-- in the module, the function should just return.
 --
--- @param func Reloadable function to run. It must accept current
---        module version as an argument, and interrupt itself,
---        when it is changed.
--- @param worker_name Name of the function. Usual infinite fiber
---        represents a background subsystem, which has a name. For
---        example: "Garbage Collector", "Recovery", "Discovery",
---        "Rebalancer".
--- @param M Module which can reload.
+-- @param module Module which can be reloaded.
+-- @param func_name Name of a function to be executed in the
+--        module.
+-- @param worker_name Name of the reloadable background subsystem.
+--        For example: "Garbage Collector", "Recovery", "Discovery",
+--        "Rebalancer". Used only for an activity logging.
 --
-local function reloadable_fiber_f(M, func_name, worker_name)
-    while true do
-        local ok, err = pcall(M[func_name], M.module_version)
-        if not ok then
-            log.error('%s has been failed: %s', worker_name, err)
-            fiber.yield()
-        else
-            log.info('%s has been reloaded', worker_name)
-            fiber.yield()
+local function reloadable_fiber_f(module, func_name, worker_name)
+    log.info('%s has been started', worker_name)
+    local func = module[func_name]
+::reload_loop::
+    local ok, err = pcall(func, module.module_version)
+    -- yield serves two purposes:
+    --  * makes this fiber cancellable
+    --  * prevents 100% cpu consumption
+    fiber.yield()
+    if not ok then
+        log.error('%s has been failed: %s', worker_name, err)
+        if func == module[func_name] then
+            goto reload_loop
         end
+        -- There is a chance that error was raised during reload
+        -- (or caused by reload). Perform reload in case function
+        -- has been changed.
+        log.error('%s: reloadable function %s has been changed',
+                  worker_name, func_name)
     end
+    log.info('%s is reloaded, restarting', worker_name)
+    -- luajit drops this frame if next function is called in
+    -- return statement.
+    return M.reloadable_fiber_f(module, func_name, worker_name)
 end
 --
@@ -75,8 +97,12 @@ local function generate_self_checker(obj_name, func_name, mt, func)
     end
 end
+-- Update latest versions of function
+M.reloadable_fiber_f = reloadable_fiber_f
+
 return {
     tuple_extract_key = tuple_extract_key,
     reloadable_fiber_f = reloadable_fiber_f,
     generate_self_checker = generate_self_checker,
+    internal = M,
 }
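
A side note on the tail-call claim in the commit message. Below is a
minimal sketch in plain Lua (no Tarantool needed) of why a call placed
directly in a return statement can repeat indefinitely without growing
the stack. The helpers `countdown` and `depth` are made up for this
illustration and are not part of vshard. The depth of 1000000 is an
arbitrary value, assumed to be larger than the default stack limit of
common Lua implementations.

```lua
-- Hypothetical helpers for illustration only; not part of vshard.

-- Tail call: `return f(args)` reuses the current frame, so the
-- recursion depth is not limited by the stack size.
local function countdown(n)
    if n == 0 then
        return 'done'
    end
    return countdown(n - 1)
end

-- Not a tail call: the addition runs after the inner call returns,
-- so every level has to keep its own frame alive.
local function depth(n)
    if n == 0 then
        return 0
    end
    return 1 + depth(n - 1)
end

-- Assumed to exceed the default stack limit.
local N = 1000000

print(countdown(N))
-- pcall turns the expected stack overflow into a false status.
print(pcall(depth, N))
```

The patch relies on the first form: `reloadable_fiber_f` ends with
`return M.reloadable_fiber_f(module, func_name, worker_name)`, so each
restart replaces the current frame instead of nesting a new one.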