[tarantool-patches] Re: [PATCH 2/4] Refactor reloadable fiber

Alex Khatskevich avkhatskevich at tarantool.org
Tue Jul 31 14:30:40 MSK 2018


full diff

commit 8afc94e2bb4ec880815e7f2154607d9f23a88e27
Author: AKhatskevich <avkhatskevich at tarantool.org>
Date:   Thu Jul 26 19:42:14 2018 +0300

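     Refactor reloadable fiber

     Reloadable fiber changes:
     * split reloadable_fiber_f into reloadable_fiber_create (public:
       creates the fiber and gives it a name on its own) and
       reloadable_fiber_main_loop (internal restart/reload loop)
     * worker name in logs replaced with the name of the executed
       function (e.g. "rebalancer_f has been started")
     * module_version is no longer passed to the background function;
       each background function reads M.module_version on start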

diff --git a/test/rebalancer/rebalancer.result 
b/test/rebalancer/rebalancer.result
index 88cbaae..3df8d4b 100644
--- a/test/rebalancer/rebalancer.result
+++ b/test/rebalancer/rebalancer.result
@@ -418,7 +418,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
  fiber = require('fiber')
  ---
  ...
-while not test_run:grep_log('box_2_a', "Rebalancer has been started") 
do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "rebalancer_f has been started") 
do fiber.sleep(0.1) end
  ---
  ...
  while not test_run:grep_log('box_1_a', "Rebalancer location has 
changed") do fiber.sleep(0.1) end
diff --git a/test/rebalancer/rebalancer.test.lua 
b/test/rebalancer/rebalancer.test.lua
index 01f2061..0cfe9a9 100644
--- a/test/rebalancer/rebalancer.test.lua
+++ b/test/rebalancer/rebalancer.test.lua
@@ -195,7 +195,7 @@ switch_rs1_master()
  vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)

  fiber = require('fiber')
-while not test_run:grep_log('box_2_a', "Rebalancer has been started") 
do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "rebalancer_f has been started") 
do fiber.sleep(0.1) end
  while not test_run:grep_log('box_1_a', "Rebalancer location has 
changed") do fiber.sleep(0.1) end

  --
diff --git a/test/router/reload.result b/test/router/reload.result
index 88122aa..f0badc3 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -116,10 +116,10 @@ vshard.router.module_version()
  check_reloaded()
  ---
  ...
-while test_run:grep_log('router_1', 'Failover has been started') == nil 
do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'failover_f has been started') == 
nil do fiber.sleep(0.1) end
  ---
  ...
-while test_run:grep_log('router_1', 'Discovery has been started') == 
nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'discovery_f has been started') == 
nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
  ---
  ...
  check_reloaded()
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 01b7163..528222a 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -59,8 +59,8 @@ vshard.router.module_version()

  check_reloaded()

-while test_run:grep_log('router_1', 'Failover has been started') == nil 
do fiber.sleep(0.1) end
-while test_run:grep_log('router_1', 'Discovery has been started') == 
nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'failover_f has been started') == 
nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'discovery_f has been started') == 
nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end

  check_reloaded()

diff --git a/test/storage/reload.result b/test/storage/reload.result
index b91b622..c354dba 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -113,13 +113,13 @@ vshard.storage.module_version()
  check_reloaded()
  ---
  ...
-while test_run:grep_log('storage_2_a', 'Garbage collector has been 
started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'collect_garbage_f has been 
started') == nil do fiber.sleep(0.1) end
  ---
  ...
-while test_run:grep_log('storage_2_a', 'Recovery has been started') == 
nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'recovery_f has been started') 
== nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
  ---
  ...
-while test_run:grep_log('storage_2_a', 'Rebalancer has been started') 
== nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') 
== nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
  ---
  ...
  check_reloaded()
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 9140299..a8df1df 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -59,9 +59,9 @@ vshard.storage.module_version()

  check_reloaded()

-while test_run:grep_log('storage_2_a', 'Garbage collector has been 
started') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('storage_2_a', 'Recovery has been started') == 
nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
-while test_run:grep_log('storage_2_a', 'Rebalancer has been started') 
== nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'collect_garbage_f has been 
started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'recovery_f has been started') 
== nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'rebalancer_f has been started') 
== nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end

  check_reloaded()

diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 0da8ee1..a352bd8 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -343,7 +343,7 @@ control.bucket_generation_collected
  collect_f = vshard.storage.internal.collect_garbage_f
  ---
  ...
-f = fiber.create(collect_f, vshard.storage.module_version())
+f = fiber.create(collect_f)
  ---
  ...
  fill_spaces_with_garbage()
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index db7821f..80d37e7 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -149,7 +149,7 @@ control.bucket_generation_collected
  -- Test continuous garbage collection via background fiber.
  --
  collect_f = vshard.storage.internal.collect_garbage_f
-f = fiber.create(collect_f, vshard.storage.module_version())
+f = fiber.create(collect_f)
  fill_spaces_with_garbage()
  -- Wait until garbage collection is finished.
  while #s2:select{} ~= 2 do fiber.sleep(0.1) end
diff --git a/test/unit/util.result b/test/unit/util.result
index 30906d1..096e36f 100644
--- a/test/unit/util.result
+++ b/test/unit/util.result
@@ -34,22 +34,22 @@ function slow_fail() fiber.sleep(0.01) error('Error 
happened.') end
  fake_M.reloadable_function = function () fake_M.reloadable_function = 
slow_fail; slow_fail() end
  ---
  ...
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 
'reloadable_function', 'Worker_name')
+fib = util.reloadable_fiber_create('Worker_name', fake_M, 
'reloadable_function')
  ---
  ...
-while not test_run:grep_log('default', 'Worker_name: reloadable 
function reloadable_function has been changed') do fiber.sleep(0.01); end
+while not test_run:grep_log('default', 'reloadable function 
reloadable_function has been changed') do fiber.sleep(0.01); end
  ---
  ...
  fib:cancel()
  ---
  ...
-test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+test_run:grep_log('default', 'module is reloaded, restarting')
  ---
-- Worker_name is reloaded, restarting
+- module is reloaded, restarting
  ...
-test_run:grep_log('default', 'Worker_name has been started')
+test_run:grep_log('default', 'reloadable_function has been started')
  ---
-- Worker_name has been started
+- reloadable_function has been started
  ...
  log.info(string.rep('a', 1000))
  ---
@@ -58,16 +58,16 @@ log.info(string.rep('a', 1000))
  fake_M.reloadable_function = function () fiber.sleep(0.01); return 
true end
  ---
  ...
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 
'reloadable_function', 'Worker_name')
+fib = util.reloadable_fiber_create('Worker_name', fake_M, 
'reloadable_function')
  ---
  ...
-while not test_run:grep_log('default', 'Worker_name is reloaded, 
restarting') do fiber.sleep(0.01) end
+while not test_run:grep_log('default', 'module is reloaded, 
restarting') do fiber.sleep(0.01) end
  ---
  ...
-fib:cancel()
+test_run:grep_log('default', 'reloadable_function has been started', 1000)
  ---
+- reloadable_function has been started
  ...
-test_run:grep_log('default', 'Worker_name has been started', 1000)
+fib:cancel()
  ---
-- Worker_name has been started
  ...
diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
index 131274c..5f39e06 100644
--- a/test/unit/util.test.lua
+++ b/test/unit/util.test.lua
@@ -14,16 +14,16 @@ function slow_fail() fiber.sleep(0.01) error('Error 
happened.') end
  -- Check autoreload on function change during failure.
  fake_M.reloadable_function = function () fake_M.reloadable_function = 
slow_fail; slow_fail() end

-fib = fiber.create(util.reloadable_fiber_f, fake_M, 
'reloadable_function', 'Worker_name')
-while not test_run:grep_log('default', 'Worker_name: reloadable 
function reloadable_function has been changed') do fiber.sleep(0.01); end
+fib = util.reloadable_fiber_create('Worker_name', fake_M, 
'reloadable_function')
+while not test_run:grep_log('default', 'reloadable function 
reloadable_function has been changed') do fiber.sleep(0.01); end
  fib:cancel()
-test_run:grep_log('default', 'Worker_name is reloaded, restarting')
-test_run:grep_log('default', 'Worker_name has been started')
+test_run:grep_log('default', 'module is reloaded, restarting')
+test_run:grep_log('default', 'reloadable_function has been started')
  log.info(string.rep('a', 1000))

  -- Check reload feature.
  fake_M.reloadable_function = function () fiber.sleep(0.01); return 
true end
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 
'reloadable_function', 'Worker_name')
-while not test_run:grep_log('default', 'Worker_name is reloaded, 
restarting') do fiber.sleep(0.01) end
+fib = util.reloadable_fiber_create('Worker_name', fake_M, 
'reloadable_function')
+while not test_run:grep_log('default', 'module is reloaded, 
restarting') do fiber.sleep(0.01) end
+test_run:grep_log('default', 'reloadable_function has been started', 1000)
  fib:cancel()
-test_run:grep_log('default', 'Worker_name has been started', 1000)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 1a0ed2f..4cb19fd 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -149,9 +149,8 @@ end
  -- Background fiber to perform discovery. It periodically scans
  -- replicasets one by one and updates route_map.
  --
-local function discovery_f(module_version)
-    lfiber.name('discovery_fiber')
-    M.discovery_fiber = lfiber.self()
+local function discovery_f()
+    local module_version = M.module_version
      local iterations_until_lua_gc =
          consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
      while module_version == M.module_version do
@@ -459,9 +458,8 @@ end
  -- tries to reconnect to the best replica. When the connection is
  -- established, it replaces the original replica.
  --
-local function failover_f(module_version)
-    lfiber.name('vshard.failover')
-    M.failover_fiber = lfiber.self()
+local function failover_f()
+    local module_version = M.module_version
      local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT,
                                   consts.FAILOVER_DOWN_TIMEOUT)
      -- This flag is used to avoid logging like:
@@ -545,10 +543,12 @@ local function router_cfg(cfg)
          M.route_map[bucket] = M.replicasets[rs.uuid]
      end
      if M.failover_fiber == nil then
-        lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
+        M.failover_fiber = util.reloadable_fiber_create(
+            'vshard.failover', M, 'failover_f')
      end
      if M.discovery_fiber == nil then
-        lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 
'Discovery')
+        M.discovery_fiber = util.reloadable_fiber_create(
+            'vshard.discovery', M, 'discovery_f')
      end
      -- Destroy connections, not used in a new configuration.
      collectgarbage()
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index c1df0e6..5ec6898 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -352,10 +352,10 @@ end
  -- @param module_version Module version, on which the current
  --        function had been started. If the actual module version
  --        appears to be changed, then stop recovery. It is
---        restarted in reloadable_fiber_f().
+--        restarted in reloadable_fiber.
  --
-local function recovery_f(module_version)
-    lfiber.name('vshard.recovery')
+local function recovery_f()
+    local module_version = M.module_version
      local _bucket = box.space._bucket
      M.buckets_to_recovery = {}
      for _, bucket in 
_bucket.index.status:pairs({consts.BUCKET.SENDING}) do
@@ -728,10 +728,9 @@ local function local_on_master_enable()
      M._on_master_enable:run()
      -- Start background process to collect garbage.
      M.collect_bucket_garbage_fiber =
-        lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
-                      'Garbage collector')
+        util.reloadable_fiber_create('vshard.gc', M, 'collect_garbage_f')
      M.recovery_fiber =
-        lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
+        util.reloadable_fiber_create('vshard.recovery', M, 'recovery_f')
      -- TODO: check current status
      log.info("Took on replicaset master role")
  end
@@ -1024,10 +1023,10 @@ end
  -- @param module_version Module version, on which the current
  --        function had been started. If the actual module version
  --        appears to be changed, then stop GC. It is restarted
---        in reloadable_fiber_f().
+--        in reloadable_fiber.
  --
-function collect_garbage_f(module_version)
-    lfiber.name('vshard.gc')
+function collect_garbage_f()
+    local module_version = M.module_version
      -- Collector controller. Changes of _bucket increments
      -- bucket generation. Garbage collector has its own bucket
      -- generation which is <= actual. Garbage collection is
@@ -1351,8 +1350,8 @@ end
  -- Background rebalancer. Works on a storage which has the
  -- smallest replicaset uuid and which is master.
  --
-local function rebalancer_f(module_version)
-    lfiber.name('vshard.rebalancer')
+local function rebalancer_f()
+    local module_version = M.module_version
      while module_version == M.module_version do
          while not M.is_rebalancer_active do
              log.info('Rebalancer is disabled. Sleep')
@@ -1642,8 +1641,8 @@ local function storage_cfg(cfg, this_replica_uuid)

      if min_master == this_replica then
          if not M.rebalancer_fiber then
-            M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
-                                               'rebalancer_f', 
'Rebalancer')
+            M.rebalancer_fiber = util.reloadable_fiber_create(
+                'vshard.rebalancer', M, 'rebalancer_f')
          else
              log.info('Wakeup rebalancer')
              -- Configuration had changed. Time to rebalance.
diff --git a/vshard/util.lua b/vshard/util.lua
index fb875ce..ea676ff 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -10,7 +10,7 @@ if not M then
      --
      M = {
          -- Latest versions of functions.
-        reloadable_fiber_f = nil,
+        reloadable_fiber_main_loop = nil,
      }
      rawset(_G, MODULE_INTERNALS, M)
  end
@@ -34,40 +34,52 @@ end
  --
  -- Wrapper to run a func in infinite loop and restart it on
  -- errors and module reload.
--- To handle module reload and run new version of a function
--- in the module, the function should just return.
---
--- @param module Module which can be reloaded.
--- @param func_name Name of a function to be executed in the
---        module.
--- @param worker_name Name of the reloadable background subsystem.
---        For example: "Garbage Collector", "Recovery", "Discovery",
---        "Rebalancer". Used only for an activity logging.
+-- This loop executes the latest version of itself in case of
+-- reload of that module.
+-- See description of parameters in `reloadable_fiber_create`.
  --
-local function reloadable_fiber_f(module, func_name, worker_name)
-    log.info('%s has been started', worker_name)
+local function reloadable_fiber_main_loop(module, func_name)
+    log.info('%s has been started', func_name)
      local func = module[func_name]
-::reload_loop::
-    local ok, err = pcall(func, module.module_version)
-    -- yield serves two pursoses:
+::restart_loop::
+    local ok, err = pcall(func)
+    -- yield serves two purposes:
      --  * makes this fiber cancellable
      --  * prevents 100% cpu consumption
      fiber.yield()
      if not ok then
-        log.error('%s has been failed: %s', worker_name, err)
+        log.error('%s has been failed: %s', func_name, err)
          if func == module[func_name] then
-            goto reload_loop
+            goto restart_loop
          end
          -- There is a chance that error was raised during reload
          -- (or caused by reload). Perform reload in case function
          -- has been changed.
-        log.error('%s: reloadable function %s has been changed',
-                  worker_name, func_name)
+        log.error('reloadable function %s has been changed', func_name)
      end
-    log.info('%s is reloaded, restarting', worker_name)
+    log.info('module is reloaded, restarting')
      -- luajit drops this frame if next function is called in
      -- return statement.
-    return M.reloadable_fiber_f(module, func_name, worker_name)
+    return M.reloadable_fiber_main_loop(module, func_name)
+end
+
+--
+-- Create a new fiber which runs a function in a loop. This loop
+-- is aware of reload mechanism and it loads a new version of the
+-- function in that case.
+-- To handle module reload and run new version of a function
+-- in the module, the function should just return.
+-- @param fiber_name Name of a new fiber. E.g. "vshard.rebalancer".
+-- @param module Module which can be reloaded.
+-- @param func_name Name of a function to be executed in the
+--        module.
+-- @retval New fiber.
+--
+local function reloadable_fiber_create(fiber_name, module, func_name)
+    assert(type(fiber_name) == 'string')
+    local xfiber = fiber.create(reloadable_fiber_main_loop, module, 
func_name)
+    xfiber:name(fiber_name)
+    return xfiber
  end

  --
@@ -98,7 +110,7 @@ local function generate_self_checker(obj_name, 
func_name, mt, func)
  end

  -- Update latest versions of function
-M.reloadable_fiber_f = reloadable_fiber_f
+M.reloadable_fiber_main_loop = reloadable_fiber_main_loop

  local function sync_task(delay, task, ...)
      if delay then
@@ -121,7 +133,7 @@ end

  return {
      tuple_extract_key = tuple_extract_key,
-    reloadable_fiber_f = reloadable_fiber_f,
+    reloadable_fiber_create = reloadable_fiber_create,
      generate_self_checker = generate_self_checker,
      async_task = async_task,
      internal = M,





More information about the Tarantool-patches mailing list