From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 249C827271 for ; Tue, 31 Jul 2018 07:30:42 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id T8xZek2JGUIi for ; Tue, 31 Jul 2018 07:30:42 -0400 (EDT) Received: from smtp29.i.mail.ru (smtp29.i.mail.ru [94.100.177.89]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 9B76B267B0 for ; Tue, 31 Jul 2018 07:30:41 -0400 (EDT) Received: from [185.6.245.156] (port=48917 helo=[100.96.166.235]) by smtp29.i.mail.ru with esmtpa (envelope-from ) id 1fkSrQ-0005GU-AS for tarantool-patches@freelists.org; Tue, 31 Jul 2018 14:30:40 +0300 Subject: [tarantool-patches] Re: [PATCH 2/4] Refactor reloadable fiber References: <03fadaa0-c6a1-bc63-69b6-ccdccc155848@tarantool.org> From: Alex Khatskevich Message-ID: <4fdb6747-6283-7823-e24e-caf1b709ab04@tarantool.org> Date: Tue, 31 Jul 2018 14:30:40 +0300 MIME-Version: 1.0 In-Reply-To: <03fadaa0-c6a1-bc63-69b6-ccdccc155848@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-US Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org full diff commit 8afc94e2bb4ec880815e7f2154607d9f23a88e27 Author: AKhatskevich Date:   Thu Jul 26 19:42:14 2018 +0300     Refactor reloadable fiber     Reloadable fiber changes:     * renamed reloadable_fiber_f -> reloadable_fiber     * reloadable_fiber creates fiber and gives name on its own     * worker name replaced with a fiber name in logs     * added extra data argument, which is passed to a function diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result index 88cbaae..3df8d4b 100644 --- a/test/rebalancer/rebalancer.result +++ b/test/rebalancer/rebalancer.result @@ -418,7 +418,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)  fiber = require('fiber')  ---  ... -while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end +while not test_run:grep_log('box_2_a', "rebalancer_f has been started") do fiber.sleep(0.1) end  ---  ...  while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua index 01f2061..0cfe9a9 100644 --- a/test/rebalancer/rebalancer.test.lua +++ b/test/rebalancer/rebalancer.test.lua @@ -195,7 +195,7 @@ switch_rs1_master()  vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)  fiber = require('fiber') -while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end +while not test_run:grep_log('box_2_a', "rebalancer_f has been started") do fiber.sleep(0.1) end  while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end  -- diff --git a/test/router/reload.result b/test/router/reload.result index 88122aa..f0badc3 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -116,10 +116,10 @@ vshard.router.module_version()  check_reloaded()  ---  ... -while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'failover_f has been started') == nil do fiber.sleep(0.1) end  ---  ... -while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'discovery_f has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end  ---  ...  check_reloaded() diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 01b7163..528222a 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -59,8 +59,8 @@ vshard.router.module_version()  check_reloaded() -while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end -while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'failover_f has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('router_1', 'discovery_f has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end  check_reloaded() diff --git a/test/storage/reload.result b/test/storage/reload.result index b91b622..c354dba 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -113,13 +113,13 @@ vshard.storage.module_version()  check_reloaded()  ---  ... -while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'collect_garbage_f has been started') == nil do fiber.sleep(0.1) end  ---  ... -while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'recovery_f has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end  ---  ... -while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end  ---  ...  check_reloaded() diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 9140299..a8df1df 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -59,9 +59,9 @@ vshard.storage.module_version()  check_reloaded() -while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end -while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end -while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end +while test_run:grep_log('storage_2_a', 'collect_garbage_f has been started') == nil do fiber.sleep(0.1) end +while test_run:grep_log('storage_2_a', 'recovery_f has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end +while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end  check_reloaded() diff --git a/test/unit/garbage.result b/test/unit/garbage.result index 0da8ee1..a352bd8 100644 --- a/test/unit/garbage.result +++ b/test/unit/garbage.result @@ -343,7 +343,7 @@ control.bucket_generation_collected  collect_f = vshard.storage.internal.collect_garbage_f  ---  ... -f = fiber.create(collect_f, vshard.storage.module_version()) +f = fiber.create(collect_f)  ---  ...  fill_spaces_with_garbage() diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua index db7821f..80d37e7 100644 --- a/test/unit/garbage.test.lua +++ b/test/unit/garbage.test.lua @@ -149,7 +149,7 @@ control.bucket_generation_collected  -- Test continuous garbage collection via background fiber.  --  collect_f = vshard.storage.internal.collect_garbage_f -f = fiber.create(collect_f, vshard.storage.module_version()) +f = fiber.create(collect_f)  fill_spaces_with_garbage()  -- Wait until garbage collection is finished.  while #s2:select{} ~= 2 do fiber.sleep(0.1) end diff --git a/test/unit/util.result b/test/unit/util.result index 30906d1..096e36f 100644 --- a/test/unit/util.result +++ b/test/unit/util.result @@ -34,22 +34,22 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end  fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end  ---  ... -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')  ---  ... -while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end +while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end  ---  ...  fib:cancel()  ---  ... -test_run:grep_log('default', 'Worker_name is reloaded, restarting') +test_run:grep_log('default', 'module is reloaded, restarting')  --- -- Worker_name is reloaded, restarting +- module is reloaded, restarting  ... -test_run:grep_log('default', 'Worker_name has been started') +test_run:grep_log('default', 'reloadable_function has been started')  --- -- Worker_name has been started +- reloadable_function has been started  ...  log.info(string.rep('a', 1000))  --- @@ -58,16 +58,16 @@ log.info(string.rep('a', 1000))  fake_M.reloadable_function = function () fiber.sleep(0.01); return true end  ---  ... -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function')  ---  ... -while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end +while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end  ---  ... -fib:cancel() +test_run:grep_log('default', 'reloadable_function has been started', 1000)  --- +- reloadable_function has been started  ... -test_run:grep_log('default', 'Worker_name has been started', 1000) +fib:cancel()  --- -- Worker_name has been started  ... diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua index 131274c..5f39e06 100644 --- a/test/unit/util.test.lua +++ b/test/unit/util.test.lua @@ -14,16 +14,16 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end  -- Check autoreload on function change during failure.  fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') -while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function') +while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end  fib:cancel() -test_run:grep_log('default', 'Worker_name is reloaded, restarting') -test_run:grep_log('default', 'Worker_name has been started') +test_run:grep_log('default', 'module is reloaded, restarting') +test_run:grep_log('default', 'reloadable_function has been started')  log.info(string.rep('a', 1000))  -- Check reload feature.  fake_M.reloadable_function = function () fiber.sleep(0.01); return true end -fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name') -while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end +fib = util.reloadable_fiber_create('Worker_name', fake_M, 'reloadable_function') +while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end +test_run:grep_log('default', 'reloadable_function has been started', 1000)  fib:cancel() -test_run:grep_log('default', 'Worker_name has been started', 1000) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 1a0ed2f..4cb19fd 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -149,9 +149,8 @@ end  -- Background fiber to perform discovery. It periodically scans  -- replicasets one by one and updates route_map.  -- -local function discovery_f(module_version) -    lfiber.name('discovery_fiber') -    M.discovery_fiber = lfiber.self() +local function discovery_f() +    local module_version = M.module_version      local iterations_until_lua_gc =          consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL      while module_version == M.module_version do @@ -459,9 +458,8 @@ end  -- tries to reconnect to the best replica. When the connection is  -- established, it replaces the original replica.  -- -local function failover_f(module_version) -    lfiber.name('vshard.failover') -    M.failover_fiber = lfiber.self() +local function failover_f() +    local module_version = M.module_version      local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT,                                   consts.FAILOVER_DOWN_TIMEOUT)      -- This flag is used to avoid logging like: @@ -545,10 +543,12 @@ local function router_cfg(cfg)          M.route_map[bucket] = M.replicasets[rs.uuid]      end      if M.failover_fiber == nil then -        lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover') +        M.failover_fiber = util.reloadable_fiber_create( +            'vshard.failover', M, 'failover_f')      end      if M.discovery_fiber == nil then -        lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery') +        M.discovery_fiber = util.reloadable_fiber_create( +            'vshard.discovery', M, 'discovery_f')      end      -- Destroy connections, not used in a new configuration.      collectgarbage() diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index c1df0e6..5ec6898 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -352,10 +352,10 @@ end  -- @param module_version Module version, on which the current  --        function had been started. If the actual module version  --        appears to be changed, then stop recovery. It is ---        restarted in reloadable_fiber_f(). +--        restarted in reloadable_fiber.  -- -local function recovery_f(module_version) -    lfiber.name('vshard.recovery') +local function recovery_f() +    local module_version = M.module_version      local _bucket = box.space._bucket      M.buckets_to_recovery = {}      for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do @@ -728,10 +728,9 @@ local function local_on_master_enable()      M._on_master_enable:run()      -- Start background process to collect garbage.      M.collect_bucket_garbage_fiber = -        lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f', -                      'Garbage collector') +        util.reloadable_fiber_create('vshard.gc', M, 'collect_garbage_f')      M.recovery_fiber = -        lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery') +        util.reloadable_fiber_create('vshard.recovery', M, 'recovery_f')      -- TODO: check current status      log.info("Took on replicaset master role")  end @@ -1024,10 +1023,10 @@ end  -- @param module_version Module version, on which the current  --        function had been started. If the actual module version  --        appears to be changed, then stop GC. It is restarted ---        in reloadable_fiber_f(). +--        in reloadable_fiber.  -- -function collect_garbage_f(module_version) -    lfiber.name('vshard.gc') +function collect_garbage_f() +    local module_version = M.module_version      -- Collector controller. Changes of _bucket increments      -- bucket generation. Garbage collector has its own bucket      -- generation which is <= actual. Garbage collection is @@ -1351,8 +1350,8 @@ end  -- Background rebalancer. Works on a storage which has the  -- smallest replicaset uuid and which is master.  -- -local function rebalancer_f(module_version) -    lfiber.name('vshard.rebalancer') +local function rebalancer_f() +    local module_version = M.module_version      while module_version == M.module_version do          while not M.is_rebalancer_active do              log.info('Rebalancer is disabled. Sleep') @@ -1642,8 +1641,8 @@ local function storage_cfg(cfg, this_replica_uuid)      if min_master == this_replica then          if not M.rebalancer_fiber then -            M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M, -                                               'rebalancer_f', 'Rebalancer') +            M.rebalancer_fiber = util.reloadable_fiber_create( +                'vshard.rebalancer', M, 'rebalancer_f')          else              log.info('Wakeup rebalancer')              -- Configuration had changed. Time to rebalance. diff --git a/vshard/util.lua b/vshard/util.lua index fb875ce..ea676ff 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -10,7 +10,7 @@ if not M then      --      M = {          -- Latest versions of functions. -        reloadable_fiber_f = nil, +        reloadable_fiber_main_loop = nil,      }      rawset(_G, MODULE_INTERNALS, M)  end @@ -34,40 +34,52 @@ end  --  -- Wrapper to run a func in infinite loop and restart it on  -- errors and module reload. --- To handle module reload and run new version of a function --- in the module, the function should just return. --- --- @param module Module which can be reloaded. --- @param func_name Name of a function to be executed in the ---        module. --- @param worker_name Name of the reloadable background subsystem. ---        For example: "Garbage Collector", "Recovery", "Discovery", ---        "Rebalancer". Used only for an activity logging. +-- This loop executes the latest version of itself in case of +-- reload of that module. +-- See description of parameters in `reloadable_fiber_create`.  -- -local function reloadable_fiber_f(module, func_name, worker_name) -    log.info('%s has been started', worker_name) +local function reloadable_fiber_main_loop(module, func_name) +    log.info('%s has been started', func_name)      local func = module[func_name] -::reload_loop:: -    local ok, err = pcall(func, module.module_version) -    -- yield serves two pursoses: +::restart_loop:: +    local ok, err = pcall(func) +    -- yield serves two purposes:      --  * makes this fiber cancellable      --  * prevents 100% cpu consumption      fiber.yield()      if not ok then -        log.error('%s has been failed: %s', worker_name, err) +        log.error('%s has been failed: %s', func_name, err)          if func == module[func_name] then -            goto reload_loop +            goto restart_loop          end          -- There is a chance that error was raised during reload          -- (or caused by reload). Perform reload in case function          -- has been changed. -        log.error('%s: reloadable function %s has been changed', -                  worker_name, func_name) +        log.error('reloadable function %s has been changed', func_name)      end -    log.info('%s is reloaded, restarting', worker_name) +    log.info('module is reloaded, restarting')      -- luajit drops this frame if next function is called in      -- return statement. -    return M.reloadable_fiber_f(module, func_name, worker_name) +    return M.reloadable_fiber_main_loop(module, func_name) +end + +-- +-- Create a new fiber which runs a function in a loop. This loop +-- is aware of reload mechanism and it loads a new version of the +-- function in that case. +-- To handle module reload and run new version of a function +-- in the module, the function should just return. +-- @param fiber_name Name of a new fiber. E.g. "vshard.rebalancer". +-- @param module Module which can be reloaded. +-- @param func_name Name of a function to be executed in the +--        module. +-- @retval New fiber. +-- +local function reloadable_fiber_create(fiber_name, module, func_name) +    assert(type(fiber_name) == 'string') +    local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name) +    xfiber:name(fiber_name) +    return xfiber  end  -- @@ -98,7 +110,7 @@ local function generate_self_checker(obj_name, func_name, mt, func)  end  -- Update latest versions of function -M.reloadable_fiber_f = reloadable_fiber_f +M.reloadable_fiber_main_loop = reloadable_fiber_main_loop  local function sync_task(delay, task, ...)      if delay then @@ -121,7 +133,7 @@ end  return {      tuple_extract_key = tuple_extract_key, -    reloadable_fiber_f = reloadable_fiber_f, +    reloadable_fiber_create = reloadable_fiber_create,      generate_self_checker = generate_self_checker,      async_task = async_task,      internal = M,