Subject: [tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber
From: Vladislav Shpilevoy
To: Alex Khatskevich, tarantool-patches@freelists.org
Date: Wed, 27 Jun 2018 14:34:17 +0300
Message-ID: <43e5ead7-f6fd-6eda-dfc5-3a9bee2eab0a@tarantool.org>
In-Reply-To: <51114d53-806d-5c65-3424-7a965a434a98@tarantool.org>

Please, rebase on the latest master branch. When I do, the tests fail.

On 27/06/2018 13:44, Alex Khatskevich wrote:
>
>
> On 27.06.2018 12:54, Vladislav Shpilevoy wrote:
>> Hello. Thanks for the fixes! See 4 comments below.
>>
>> On 26/06/2018 15:50, Alex Khatskevich wrote:
>>> Sorry, I forgot to put the new patch here
>>>
>>> commit 4ddab3c47c5963c3138701d89ba3091d5da9848a
>>> Author: AKhatskevich
>>> Date:   Thu Jun 14 14:03:07 2018 +0300
>>>
>>>      Reload reloadable fiber
>>>
>>>      Fixed a problem:
>>>      The `reloadable_fiber_f` was running an infinite while loop and
>>>      preventing the whole module from being reloaded.
>>>
>>>      This behavior is fixed by calling new version of `reloadable_fiber_f` in
>>>      a return statement instead of the where loop.  Note: calling a function
>>
>> 1. Again 'where loop'.
>>
> Sorry
>>>      in a return statement doesn't increase a stack size.
>>>
>>>      Extra changes:
>>>       * transfer write "started" message responsibility from caller to
>>>         reloadable fiber
>>>
>>>      Closes #116
>>>
>>> diff --git a/test/unit/util.result b/test/unit/util.result
>>> new file mode 100644
>>> index 0000000..dbfcce9
>>> --- /dev/null
>>> +++ b/test/unit/util.result
>>> @@ -0,0 +1,72 @@
>>> +test_run = require('test_run').new()
>>> +---
>>> +...
>>> +fiber = require('fiber')
>>> +---
>>> +...
>>> +log = require('log')
>>> +---
>>> +...
>>> +test_util = require('util')
>>> +---
>>> +...
>>> +util = require('vshard.util')
>>> +---
>>> +...
>>> +test_run:cmd("setopt delimiter ';'")
>>> +---
>>> +- true
>>> +...
>>> +fake_M = {
>>> +    reloadable_func = nil,
>>> +    module_version = 1,
>>> +};
>>> +---
>>> +...
>>> +test_run:cmd("setopt delimiter ''");
>>> +---
>>> +- true
>>> +...
>>> +-- Check autoreload on function change during failure.
>>> +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
>>> +---
>>> +...
>>> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
>>> +---
>>> +...
>>> +fib:cancel()
>>> +---
>>> +...
>>> +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has changed')
>>
>> 2.
'Has been' changed?
> I have changed it, but there are two notes:
> 1. Both are correct https://english.stackexchange.com/questions/97243/has-been-changed-or-has-changed
> 2. vshard uses both options:
>    a. "Configuration has changed, restart"
>    b. "Rebalancer location has changed"
>>
>>> +---
>>> +- 'Worker_name: reloadable function reloadable_function has changed'
>>> +...
>>> diff --git a/vshard/util.lua b/vshard/util.lua
>>> index bb71318..920f53e 100644
>>> --- a/vshard/util.lua
>>> +++ b/vshard/util.lua
>>> @@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts)
>>> +local function reloadable_fiber_f(module, func_name, worker_name)
>>> +    log.info('%s has been started', worker_name)
>>> +    local func = module[func_name]
>>> +    local ok, err = pcall(func, module.module_version)
>>> +    if not ok then
>>> +        log.error('%s has been failed: %s', worker_name, err)
>>> +        if func ~= module[func_name] then
>>> +            log.warn('%s: reloadable function %s has changed',
>>> +                        worker_name, func_name)
>>
>> 3. Why are this check and warning here, after an error?
> Changed:
> 1. warn -> error
> 2. Comment added:
>         -- There is a chance that error was raised during reload
>         -- (or caused by reload). Perform reload in case function
>         -- has been changed.
>>
>>>           end
>>>       end
>>> +    -- yield serves two purposes:
>>> +    --  * makes this fiber cancellable
>>> +    --  * prevents 100% cpu consumption
>>> +    fiber.yield()
>>> +    log.info('%s is reloaded, restarting', worker_name)
>>
>> 4. Why do you print 'is reloaded' even if it has just thrown an error
>> and the function has not actually changed?
>
> Fixed by goto
>>
>>> +    -- luajit drops this frame if next function is called in
>>> +    -- return statement.
>>> +    return M.reloadable_fiber_f(module, func_name, worker_name)
>>>   end
>
> Full diff:
>
> commit d098bf0930935b029fbca3f4fd78211dd5022fb1
> Author: AKhatskevich
> Date:   Thu Jun 14 14:03:07 2018 +0300
>
>     Reload reloadable fiber
>
>     Fixed a problem:
>     The `reloadable_fiber_f` was running an infinite while loop and
>     preventing the whole module from being reloaded.
>
>     This behavior is fixed by calling new version of `reloadable_fiber_f` in
>     a return statement instead of the while loop. Note: calling a function
>     in a return statement doesn't increase a stack size.
>
>     Extra changes:
>      * transfer write "started" message responsibility from caller to
>        reloadable fiber
>
>     Closes #116
>
> diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result
> index 22100fe..6e3aa81 100644
> --- a/test/rebalancer/rebalancer.result
> +++ b/test/rebalancer/rebalancer.result
> @@ -424,7 +424,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
>  fiber = require('fiber')
>  ---
>  ...
> -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
> +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
>  ---
>  ...
>  while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
> diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua
> index 3327f30..922357a 100644
> --- a/test/rebalancer/rebalancer.test.lua
> +++ b/test/rebalancer/rebalancer.test.lua
> @@ -197,7 +197,7 @@ switch_rs1_master()
>  vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
>
>  fiber = require('fiber')
> -while not test_run:grep_log('box_2_a', "Run rebalancer") do fiber.sleep(0.1) end
> +while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
>  while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
>
>  --
> diff --git a/test/router/reload.result b/test/router/reload.result
> index 19a9ead..47f3c2e 100644
> --- a/test/router/reload.result
> +++ b/test/router/reload.result
> @@ -116,10 +116,10 @@ vshard.router.module_version()
>  check_reloaded()
>  ---
>  ...
> -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
>  ---
>  ...
> -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>  ---
>  ...
>  check_reloaded()
> diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
> index 6e21b74..af2939d 100644
> --- a/test/router/reload.test.lua
> +++ b/test/router/reload.test.lua
> @@ -59,8 +59,8 @@ vshard.router.module_version()
>
>  check_reloaded()
>
> -while test_run:grep_log('router_1', 'Failover has been reloaded') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('router_1', 'Discovery has been reloaded') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>
>  check_reloaded()
>
> diff --git a/test/storage/reload.result b/test/storage/reload.result
> index f689cf4..531d984 100644
> --- a/test/storage/reload.result
> +++ b/test/storage/reload.result
> @@ -113,13 +113,13 @@ vshard.storage.module_version()
>  check_reloaded()
>  ---
>  ...
> -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
>  ---
>  ...
> -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
> +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
>  ---
>  ...
> -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
> +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
>  ---
>  ...
>  check_reloaded()
> diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
> index 6e19a92..64c3a60 100644
> --- a/test/storage/reload.test.lua
> +++ b/test/storage/reload.test.lua
> @@ -59,9 +59,9 @@ vshard.storage.module_version()
>
>  check_reloaded()
>
> -while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
> -while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
> +while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
> +while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
> +while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
>
>  check_reloaded()
>
> diff --git a/test/unit/util.result b/test/unit/util.result
> new file mode 100644
> index 0000000..a7b176d
> --- /dev/null
> +++ b/test/unit/util.result
> @@ -0,0 +1,72 @@
> +test_run = require('test_run').new()
> +---
> +...
> +fiber = require('fiber')
> +---
> +...
> +log = require('log')
> +---
> +...
> +test_util = require('util')
> +---
> +...
> +util = require('vshard.util')
> +---
> +...
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +fake_M = {
> +    reloadable_func = nil,
> +    module_version = 1,
> +};
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> +-- Check autoreload on function change during failure.
> +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
> +---
> +...
> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +---
> +...
> +fib:cancel()
> +---
> +...
> +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed')
> +---
> +- 'Worker_name: reloadable function reloadable_function has been changed'
> +...
> +test_run:grep_log('default', 'Worker_name is reloaded, restarting')
> +---
> +- Worker_name is reloaded, restarting
> +...
> +test_run:grep_log('default', 'Worker_name has been started')
> +---
> +- Worker_name has been started
> +...
> +log.info(string.rep('a', 1000))
> +---
> +...
> +-- Check reload feature.
> +fake_M.reloadable_function = function () return true end
> +---
> +...
> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +---
> +...
> +fib:cancel()
> +---
> +...
> +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
> +---
> +- Worker_name is reloaded, restarting
> +...
> +test_run:grep_log('default', 'Worker_name has been started', 1000)
> +---
> +- Worker_name has been started
> +...
> diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
> new file mode 100644
> index 0000000..9359378
> --- /dev/null
> +++ b/test/unit/util.test.lua
> @@ -0,0 +1,29 @@
> +test_run = require('test_run').new()
> +fiber = require('fiber')
> +log = require('log')
> +test_util = require('util')
> +util = require('vshard.util')
> +
> +test_run:cmd("setopt delimiter ';'")
> +fake_M = {
> +    reloadable_func = nil,
> +    module_version = 1,
> +};
> +test_run:cmd("setopt delimiter ''");
> +
> +-- Check autoreload on function change during failure.
> +fake_M.reloadable_function = function () fake_M.reloadable_function = 42; error('Error happened.') end
> +
> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +fib:cancel()
> +test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed')
> +test_run:grep_log('default', 'Worker_name is reloaded, restarting')
> +test_run:grep_log('default', 'Worker_name has been started')
> +log.info(string.rep('a', 1000))
> +
> +-- Check reload feature.
> +fake_M.reloadable_function = function () return true end
> +fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
> +fib:cancel()
> +test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
> +test_run:grep_log('default', 'Worker_name has been started', 1000)
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 21093e5..9e18b27 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -491,11 +491,9 @@ local function router_cfg(cfg)
>      end
>      lreplicaset.wait_masters_connect(new_replicasets)
>      if M.failover_fiber == nil then
> -        log.info('Start failover fiber')
>          lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
>      end
>      if M.discovery_fiber == nil then
> -        log.info('Start discovery fiber')
>          lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
>      end
>      -- Destroy connections, not used in a new configuration.
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 57076e1..300751f 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -692,10 +692,8 @@ local function local_on_master_enable()
>      M.collect_bucket_garbage_fiber =
>          lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
>                        'Garbage collector')
> -    log.info("GC is started")
>      M.recovery_fiber =
>          lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
> -    log.info('Recovery is started')
>      -- TODO: check current status
>      log.info("Took on replicaset master role")
>  end
> @@ -1585,7 +1583,6 @@ local function storage_cfg(cfg, this_replica_uuid)
>
>      if min_master == this_replica then
>          if not M.rebalancer_fiber then
> -            log.info('Run rebalancer')
>              M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
> 'rebalancer_f', 'Rebalancer')
>          else
> diff --git a/vshard/util.lua b/vshard/util.lua
> index bb71318..ce79930 100644
> --- a/vshard/util.lua
> +++ b/vshard/util.lua
> @@ -2,6 +2,19 @@
>  local log = require('log')
>  local fiber = require('fiber')
>
> +local MODULE_INTERNALS = '__module_vshard_util'
> +local M = rawget(_G, MODULE_INTERNALS)
> +if not M then
> +    --
> +    -- The module is loaded for the first time.
> +    --
> +    M = {
> +        -- Latest versions of functions.
> +        reloadable_fiber_f = nil,
> +    }
> +    rawset(_G, MODULE_INTERNALS, M)
> +end
> +
>  --
>  -- Extract parts of a tuple.
>  -- @param tuple Tuple to extract a key from.
> @@ -19,33 +32,42 @@ local function tuple_extract_key(tuple, parts)
>  end
>
>  --
> --- Wrapper to run @a func in infinite loop and restart it on the
> --- module reload. This function CAN NOT BE AUTORELOADED. To update
> --- it you must manualy stop all fibers, run by this function, do
> --- reload, and then restart all stopped fibers.
This can be done,
> --- for example, by calling vshard.storage/router.cfg() again with
> --- the same config as earlier.
> +-- Wrapper to run a func in an infinite loop and restart it on
> +-- errors and module reload.
> +-- To handle module reload and run new version of a function
> +-- in the module, the function should just return.
>  --
> --- @param func Reloadable function to run. It must accept current
> ---        module version as an argument, and interrupt itself,
> ---        when it is changed.
> --- @param worker_name Name of the function. Usual infinite fiber
> ---        represents a background subsystem, which has a name. For
> ---        example: "Garbage Collector", "Recovery", "Discovery",
> ---        "Rebalancer".
> --- @param M Module which can reload.
> +-- @param module Module which can be reloaded.
> +-- @param func_name Name of a function to be executed in the
> +--        module.
> +-- @param worker_name Name of the reloadable background subsystem.
> +--        For example: "Garbage Collector", "Recovery", "Discovery",
> +--        "Rebalancer". Used only for activity logging.
>  --
> -local function reloadable_fiber_f(M, func_name, worker_name)
> -    while true do
> -        local ok, err = pcall(M[func_name], M.module_version)
> -        if not ok then
> -            log.error('%s has been failed: %s', worker_name, err)
> -            fiber.yield()
> -        else
> -            log.info('%s has been reloaded', worker_name)
> -            fiber.yield()
> +local function reloadable_fiber_f(module, func_name, worker_name)
> +    log.info('%s has been started', worker_name)
> +    local func = module[func_name]
> +::reload_loop::
> +    local ok, err = pcall(func, module.module_version)
> +    -- yield serves two purposes:
> +    --  * makes this fiber cancellable
> +    --  * prevents 100% cpu consumption
> +    fiber.yield()
> +    if not ok then
> +        log.error('%s has been failed: %s', worker_name, err)
> +        if func == module[func_name] then
> +            goto reload_loop
>          end
> +        -- There is a chance that error was raised during reload
> +        -- (or caused by reload). Perform reload in case function
> +        -- has been changed.
> +        log.error('%s: reloadable function %s has been changed',
> +                  worker_name, func_name)
>      end
> +    log.info('%s is reloaded, restarting', worker_name)
> +    -- luajit drops this frame if next function is called in
> +    -- return statement.
> +    return M.reloadable_fiber_f(module, func_name, worker_name)
>  end
>
>  --
> @@ -75,8 +97,12 @@ local function generate_self_checker(obj_name, func_name, mt, func)
>      end
>  end
>
> +-- Update latest versions of function
> +M.reloadable_fiber_f = reloadable_fiber_f
> +
>  return {
>      tuple_extract_key = tuple_extract_key,
>      reloadable_fiber_f = reloadable_fiber_f,
>      generate_self_checker = generate_self_checker,
> +    internal = M,
>  }
>
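
A side note on the tail call the patch relies on. Below is a minimal plain-Lua sketch, not the vshard code: it uses no Tarantool modules, and the names `registry`, `loop_v1`, `loop_v2` and `seen` are made up for illustration. It assumes only that `return f(args)` is a proper tail call, which Lua and LuaJIT guarantee. The function looks itself up through a shared table on every iteration, so a version swapped in mid-run is picked up on the next call, and a million iterations do not grow the stack.

```lua
-- Hypothetical stand-in for the table that vshard/util.lua keeps in _G.
local registry = {loop_f = nil}
-- Counts how many iterations each version has executed.
local seen = {v1 = 0, v2 = 0}

-- "New" version of the loop body, installed by the simulated reload.
local function loop_v2(n)
    seen.v2 = seen.v2 + 1
    if n == 0 then
        return 'finished by v2'
    end
    -- Proper tail call: the current frame is reused.
    return registry.loop_f(n - 1)
end

-- "Old" version of the loop body.
local function loop_v1(n)
    seen.v1 = seen.v1 + 1
    if n == 0 then
        return 'finished by v1'
    end
    -- Simulate a module reload after the third iteration.
    if seen.v1 == 3 then
        registry.loop_f = loop_v2
    end
    -- Looked up through the table, so the replaced function is called.
    return registry.loop_f(n - 1)
end

registry.loop_f = loop_v1
local result = registry.loop_f(1000000)
print(result, seen.v1, seen.v2)
```

Calling `loop_v1` directly instead of through `registry` would keep running the old version forever, which is the situation the patch is meant to fix.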