From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 1494926EC6 for ; Wed, 1 Aug 2018 07:54:47 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id Ey8jp_9xSKAI for ; Wed, 1 Aug 2018 07:54:47 -0400 (EDT) Received: from smtp42.i.mail.ru (smtp42.i.mail.ru [94.100.177.102]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 4F5CA21E4A for ; Wed, 1 Aug 2018 07:54:46 -0400 (EDT) Subject: [tarantool-patches] Re: [PATCH 2/4] Refactor reloadable fiber References: <03fadaa0-c6a1-bc63-69b6-ccdccc155848@tarantool.org> <4fdb6747-6283-7823-e24e-caf1b709ab04@tarantool.org> From: Vladislav Shpilevoy Message-ID: Date: Wed, 1 Aug 2018 14:54:42 +0300 MIME-Version: 1.0 In-Reply-To: <4fdb6747-6283-7823-e24e-caf1b709ab04@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Language: en-US Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org, Alex Khatskevich Thanks for the patch! Pushed into the master. On 31/07/2018 14:30, Alex Khatskevich wrote: > full diff > > commit 8afc94e2bb4ec880815e7f2154607d9f23a88e27 > Author: AKhatskevich > Date:   Thu Jul 26 19:42:14 2018 +0300 > >     Refactor reloadable fiber > >     Reloadable fiber changes: >     * renamed reloadable_fiber_f -> reloadable_fiber >     * reloadable_fiber creates fiber and gives name on its own >     * worker name replaced with a fiber name in logs >     * added extra data argument, which is passed to a function > > diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result > index 88cbaae..3df8d4b 100644 > --- a/test/rebalancer/rebalancer.result > +++ b/test/rebalancer/rebalancer.result > @@ -418,7 +418,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) >  fiber = require('fiber') >  --- >  ... > -while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end > +while not test_run:grep_log('box_2_a', "rebalancer_f has been started") do fiber.sleep(0.1) end >  --- >  ... >  while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end > diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua > index 01f2061..0cfe9a9 100644 > --- a/test/rebalancer/rebalancer.test.lua > +++ b/test/rebalancer/rebalancer.test.lua > @@ -195,7 +195,7 @@ switch_rs1_master() >  vshard.storage.cfg(cfg, names.replica_uuid.box_2_b) > >  fiber = require('fiber') > -while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end > +while not test_run:grep_log('box_2_a', "rebalancer_f has been started") do fiber.sleep(0.1) end >  while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end > >  -- > diff --git a/test/router/reload.result b/test/router/reload.result > index 88122aa..f0badc3 100644 > --- a/test/router/reload.result > +++ b/test/router/reload.result > @@ -116,10 +116,10 @@ vshard.router.module_version() >  check_reloaded() >  --- >  ... > -while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end > +while test_run:grep_log('router_1', 'failover_f has been started') == nil do fiber.sleep(0.1) end >  --- >  ... > -while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end > +while test_run:grep_log('router_1', 'discovery_f has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end >  --- >  ... >  check_reloaded() > diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua > index 01b7163..528222a 100644 > --- a/test/router/reload.test.lua > +++ b/test/router/reload.test.lua > @@ -59,8 +59,8 @@ vshard.router.module_version() > >  check_reloaded() > > -while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end > -while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end > +while test_run:grep_log('router_1', 'failover_f has been started') == nil do fiber.sleep(0.1) end > +while test_run:grep_log('router_1', 'discovery_f has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end > >  check_reloaded() > > diff --git a/test/storage/reload.result b/test/storage/reload.result > index b91b622..c354dba 100644 > --- a/test/storage/reload.result > +++ b/test/storage/reload.result > @@ -113,13 +113,13 @@ vshard.storage.module_version() >  check_reloaded() >  --- >  ... > -while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end > +while test_run:grep_log('storage_2_a', 'collect_garbage_f has been started') == nil do fiber.sleep(0.1) end >  --- >  ... > -while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end > +while test_run:grep_log('storage_2_a', 'recovery_f has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end >  --- >  ... > -while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end > +while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end >  --- >  ... >  check_reloaded() > diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua > index 9140299..a8df1df 100644 > --- a/test/storage/reload.test.lua > +++ b/test/storage/reload.test.lua > @@ -59,9 +59,9 @@ vshard.storage.module_version() > >  check_reloaded() > > -while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end > -while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end > -while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end > +while test_run:grep_log('storage_2_a', 'collect_garbage_f has been started') == nil do fiber.sleep(0.1) end > +while test_run:grep_log('storage_2_a', 'recovery_f has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end > +while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end > >  check_reloaded() > > diff --git a/test/unit/garbage.result b/test/unit/garbage.result > index 0da8ee1..a352bd8 100644 > --- a/test/unit/garbage.result > +++ b/test/unit/garbage.result > @@ -343,7 +343,7 @@ control.bucket_generation_collected >  collect_f = vshard.storage.internal.collect_garbage_f >  --- >  ... > -f = fiber.create(collect_f, vshard.storage.module_version()) > +f = fiber.create(collect_f) >  --- >  ... >  fill_spaces_with_garbage() > diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua > index db7821f..80d37e7 100644 > --- a/test/unit/garbage.test.lua > +++ b/test/unit/garbage.test.lua > @@ -149,7 +149,7 @@ control.bucket_generation_collected >  -- Test continuous garbage collection via background fiber. >  -- >  collect_f = vshard.storage.internal.collect_garbage_f > -f = fiber.create(collect_f, vshard.storage.module_version()) > +f = fiber.create(collect_f) >  fill_spaces_with_garbage() >  -- Wait until garbage collection is finished. >  while #s2:select{} ~= 2 do fiber.sleep(0.1) end > diff --git a/test/unit/util.result b/test/unit/util.result > index 30906d1..096e36f 100644 > --- a/test/unit/util.result > +++ b/test/unit/util.result > @@ -34,22 +34,22 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end >  fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end >  --- >  ... > -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') > +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function') >  --- >  ... > -while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end > +while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end >  --- >  ... >  fib:cancel() >  --- >  ... > -test_run:grep_log('default', 'Worker_name is reloaded, restarting') > +test_run:grep_log('default', 'module is reloaded, restarting') >  --- > -- Worker_name is reloaded, restarting > +- module is reloaded, restarting >  ... > -test_run:grep_log('default', 'Worker_name has been started') > +test_run:grep_log('default', 'reloadable_function has been started') >  --- > -- Worker_name has been started > +- reloadable_function has been started >  ... >  log.info(string.rep('a', 1000)) >  --- > @@ -58,16 +58,16 @@ log.info(string.rep('a', 1000)) >  fake_M.reloadable_function = function () fiber.sleep(0.01); return true end >  --- >  ... > -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') > +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function') >  --- >  ... > -while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end > +while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end >  --- >  ... > -fib:cancel() > +test_run:grep_log('default', 'reloadable_function has been started', 1000) >  --- > +- reloadable_function has been started >  ... > -test_run:grep_log('default', 'Worker_name has been started', 1000) > +fib:cancel() >  --- > -- Worker_name has been started >  ... > diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua > index 131274c..5f39e06 100644 > --- a/test/unit/util.test.lua > +++ b/test/unit/util.test.lua > @@ -14,16 +14,16 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end >  -- Check autoreload on function change during failure. >  fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end > > -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') > -while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end > +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function') > +while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end >  fib:cancel() > -test_run:grep_log('default', 'Worker_name is reloaded, restarting') > -test_run:grep_log('default', 'Worker_name has been started') > +test_run:grep_log('default', 'module is reloaded, restarting') > +test_run:grep_log('default', 'reloadable_function has been started') >  log.info(string.rep('a', 1000)) > >  -- Check reload feature. >  fake_M.reloadable_function = function () fiber.sleep(0.01); return true end > -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') > -while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end > +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function') > +while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end > +test_run:grep_log('default', 'reloadable_function has been started', 1000) >  fib:cancel() > -test_run:grep_log('default', 'Worker_name has been started', 1000) > diff --git a/vshard/router/init.lua b/vshard/router/init.lua > index 1a0ed2f..4cb19fd 100644 > --- a/vshard/router/init.lua > +++ b/vshard/router/init.lua > @@ -149,9 +149,8 @@ end >  -- Background fiber to perform discovery. It periodically scans >  -- replicasets one by one and updates route_map. >  -- > -local function discovery_f(module_version) > -    lfiber.name('discovery_fiber') > -    M.discovery_fiber = lfiber.self() > +local function discovery_f() > +    local module_version = M.module_version >      local iterations_until_lua_gc = >          consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL >      while module_version == M.module_version do > @@ -459,9 +458,8 @@ end >  -- tries to reconnect to the best replica. When the connection is >  -- established, it replaces the original replica. >  -- > -local function failover_f(module_version) > -    lfiber.name('vshard.failover') > -    M.failover_fiber = lfiber.self() > +local function failover_f() > +    local module_version = M.module_version >      local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT, >                                   consts.FAILOVER_DOWN_TIMEOUT) >      -- This flag is used to avoid logging like: > @@ -545,10 +543,12 @@ local function router_cfg(cfg) >          M.route_map[bucket] = M.replicasets[rs.uuid] >      end >      if M.failover_fiber == nil then > -        lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover') > +        M.failover_fiber = util.reloadable_fiber_create( > +            'vshard.failover', M, 'failover_f') >      end >      if M.discovery_fiber == nil then > -        lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery') > +        M.discovery_fiber = util.reloadable_fiber_create( > +            'vshard.discovery', M, 'discovery_f') >      end >      -- Destroy connections, not used in a new configuration. >      collectgarbage() > diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua > index c1df0e6..5ec6898 100644 > --- a/vshard/storage/init.lua > +++ b/vshard/storage/init.lua > @@ -352,10 +352,10 @@ end >  -- @param module_version Module version, on which the current >  --        function had been started. If the actual module version >  --        appears to be changed, then stop recovery. It is > ---        restarted in reloadable_fiber_f(). > +--        restarted in reloadable_fiber. >  -- > -local function recovery_f(module_version) > -    lfiber.name('vshard.recovery') > +local function recovery_f() > +    local module_version = M.module_version >      local _bucket = box.space._bucket >      M.buckets_to_recovery = {} >      for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do > @@ -728,10 +728,9 @@ local function local_on_master_enable() >      M._on_master_enable:run() >      -- Start background process to collect garbage. >      M.collect_bucket_garbage_fiber = > -        lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f', > -                      'Garbage collector') > +        util.reloadable_fiber_create('vshard.gc', M, 'collect_garbage_f') >      M.recovery_fiber = > -        lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery') > +        util.reloadable_fiber_create('vshard.recovery', M, 'recovery_f') >      -- TODO: check current status >      log.info("Took on replicaset master role") >  end > @@ -1024,10 +1023,10 @@ end >  -- @param module_version Module version, on which the current >  --        function had been started. If the actual module version >  --        appears to be changed, then stop GC. It is restarted > ---        in reloadable_fiber_f(). > +--        in reloadable_fiber. >  -- > -function collect_garbage_f(module_version) > -    lfiber.name('vshard.gc') > +function collect_garbage_f() > +    local module_version = M.module_version >      -- Collector controller. Changes of _bucket increments >      -- bucket generation. Garbage collector has its own bucket >      -- generation which is <= actual. Garbage collection is > @@ -1351,8 +1350,8 @@ end >  -- Background rebalancer. Works on a storage which has the >  -- smallest replicaset uuid and which is master. >  -- > -local function rebalancer_f(module_version) > -    lfiber.name('vshard.rebalancer') > +local function rebalancer_f() > +    local module_version = M.module_version >      while module_version == M.module_version do >          while not M.is_rebalancer_active do >              log.info('Rebalancer is disabled. Sleep') > @@ -1642,8 +1641,8 @@ local function storage_cfg(cfg, this_replica_uuid) > >      if min_master == this_replica then >          if not M.rebalancer_fiber then > -            M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M, > -                                               'rebalancer_f', 'Rebalancer') > +            M.rebalancer_fiber = util.reloadable_fiber_create( > +                'vshard.rebalancer', M, 'rebalancer_f') >          else >              log.info('Wakeup rebalancer') >              -- Configuration had changed. Time to rebalance. > diff --git a/vshard/util.lua b/vshard/util.lua > index fb875ce..ea676ff 100644 > --- a/vshard/util.lua > +++ b/vshard/util.lua > @@ -10,7 +10,7 @@ if not M then >      -- >      M = { >          -- Latest versions of functions. > -        reloadable_fiber_f = nil, > +        reloadable_fiber_main_loop = nil, >      } >      rawset(_G, MODULE_INTERNALS, M) >  end > @@ -34,40 +34,52 @@ end >  -- >  -- Wrapper to run a func in infinite loop and restart it on >  -- errors and module reload. > --- To handle module reload and run new version of a function > --- in the module, the function should just return. > --- > --- @param module Module which can be reloaded. > --- @param func_name Name of a function to be executed in the > ---        module. > --- @param worker_name Name of the reloadable background subsystem. > ---        For example: "Garbage Collector", "Recovery", "Discovery", > ---        "Rebalancer". Used only for an activity logging. > +-- This loop executes the latest version of itself in case of > +-- reload of that module. > +-- See description of parameters in `reloadable_fiber_create`. >  -- > -local function reloadable_fiber_f(module, func_name, worker_name) > -    log.info('%s has been started', worker_name) > +local function reloadable_fiber_main_loop(module, func_name) > +    log.info('%s has been started', func_name) >      local func = module[func_name] > -::reload_loop:: > -    local ok, err = pcall(func, module.module_version) > -    -- yield serves two pursoses: > +::restart_loop:: > +    local ok, err = pcall(func) > +    -- yield serves two purposes: >      --  * makes this fiber cancellable >      --  * prevents 100% cpu consumption >      fiber.yield() >      if not ok then > -        log.error('%s has been failed: %s', worker_name, err) > +        log.error('%s has been failed: %s', func_name, err) >          if func == module[func_name] then > -            goto reload_loop > +            goto restart_loop >          end >          -- There is a chance that error was raised during reload >          -- (or caused by reload). Perform reload in case function >          -- has been changed. > -        log.error('%s: reloadable function %s has been changed', > -                  worker_name, func_name) > +        log.error('reloadable function %s has been changed', func_name) >      end > -    log.info('%s is reloaded, restarting', worker_name) > +    log.info('module is reloaded, restarting') >      -- luajit drops this frame if next function is called in >      -- return statement. > -    return M.reloadable_fiber_f(module, func_name, worker_name) > +    return M.reloadable_fiber_main_loop(module, func_name) > +end > + > +-- > +-- Create a new fiber which runs a function in a loop. This loop > +-- is aware of reload mechanism and it loads a new version of the > +-- function in that case. > +-- To handle module reload and run new version of a function > +-- in the module, the function should just return. > +-- @param fiber_name Name of a new fiber. E.g. "vshard.rebalancer". > +-- @param module Module which can be reloaded. > +-- @param func_name Name of a function to be executed in the > +--        module. > +-- @retval New fiber. > +-- > +local function reloadable_fiber_create(fiber_name, module, func_name) > +    assert(type(fiber_name) == 'string') > +    local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name) > +    xfiber:name(fiber_name) > +    return xfiber >  end > >  -- > @@ -98,7 +110,7 @@ local function generate_self_checker(obj_name, func_name, mt, func) >  end > >  -- Update latest versions of function > -M.reloadable_fiber_f = reloadable_fiber_f > +M.reloadable_fiber_main_loop = reloadable_fiber_main_loop > >  local function sync_task(delay, task, ...) >      if delay then > @@ -121,7 +133,7 @@ end > >  return { >      tuple_extract_key = tuple_extract_key, > -    reloadable_fiber_f = reloadable_fiber_f, > +    reloadable_fiber_create = reloadable_fiber_create, >      generate_self_checker = generate_self_checker, >      async_task = async_task, >      internal = M, > >