[tarantool-patches] Re: [PATCH][vshard] Reload reloadable fiber
Alex Khatskevich
avkhatskevich at tarantool.org
Tue Jun 26 15:50:43 MSK 2018
Sorry, I forgot to put the new patch here
commit 4ddab3c47c5963c3138701d89ba3091d5da9848a
Author: AKhatskevich <avkhatskevich at tarantool.org>
Date: Thu Jun 14 14:03:07 2018 +0300
Reload reloadable fiber
Fixed a problem:
The `reloadable_fiber_f` was running an infinite while loop and
preventing the whole module from being reloaded.
This behavior is fixed by calling new version of
`reloadable_fiber_f` in
a return statement instead of the where loop. Note: calling a function
in a return statement doesn't increase a stack size.
Extra changes:
* transfer write "started" message responsibility from caller to
reloadable fiber
Closes #116
diff --git a/test/rebalancer/rebalancer.result
b/test/rebalancer/rebalancer.result
index 22100fe..6e3aa81 100644
--- a/test/rebalancer/rebalancer.result
+++ b/test/rebalancer/rebalancer.result
@@ -424,7 +424,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
fiber = require('fiber')
---
...
-while not test_run:grep_log('box_2_a', "Run rebalancer") do
fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "Rebalancer has been started")
do fiber.sleep(0.1) end
---
...
while not test_run:grep_log('box_1_a', "Rebalancer location has
changed") do fiber.sleep(0.1) end
diff --git a/test/rebalancer/rebalancer.test.lua
b/test/rebalancer/rebalancer.test.lua
index 3327f30..922357a 100644
--- a/test/rebalancer/rebalancer.test.lua
+++ b/test/rebalancer/rebalancer.test.lua
@@ -197,7 +197,7 @@ switch_rs1_master()
vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
fiber = require('fiber')
-while not test_run:grep_log('box_2_a', "Run rebalancer") do
fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "Rebalancer has been started")
do fiber.sleep(0.1) end
while not test_run:grep_log('box_1_a', "Rebalancer location has
changed") do fiber.sleep(0.1) end
--
diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..47f3c2e 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -116,10 +116,10 @@ vshard.router.module_version()
check_reloaded()
---
...
-while test_run:grep_log('router_1', 'Failover has been reloaded') ==
nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'Failover has been started') == nil
do fiber.sleep(0.1) end
---
...
-while test_run:grep_log('router_1', 'Discovery has been reloaded') ==
nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'Discovery has been started') ==
nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
---
...
check_reloaded()
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..af2939d 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -59,8 +59,8 @@ vshard.router.module_version()
check_reloaded()
-while test_run:grep_log('router_1', 'Failover has been reloaded') ==
nil do fiber.sleep(0.1) end
-while test_run:grep_log('router_1', 'Discovery has been reloaded') ==
nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'Failover has been started') == nil
do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'Discovery has been started') ==
nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
check_reloaded()
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..531d984 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -113,13 +113,13 @@ vshard.storage.module_version()
check_reloaded()
---
...
-while test_run:grep_log('storage_2_a', 'Garbage collector has been
reloaded') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Garbage collector has been
started') == nil do fiber.sleep(0.1) end
---
...
-while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') ==
nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Recovery has been started') ==
nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
---
...
-while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded')
== nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been started')
== nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
---
...
check_reloaded()
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..64c3a60 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -59,9 +59,9 @@ vshard.storage.module_version()
check_reloaded()
-while test_run:grep_log('storage_2_a', 'Garbage collector has been
reloaded') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') ==
nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
-while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded')
== nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'Garbage collector has been
started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Recovery has been started') ==
nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been started')
== nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
check_reloaded()
diff --git a/test/unit/util.result b/test/unit/util.result
new file mode 100644
index 0000000..dbfcce9
--- /dev/null
+++ b/test/unit/util.result
@@ -0,0 +1,72 @@
+test_run = require('test_run').new()
+---
+...
+fiber = require('fiber')
+---
+...
+log = require('log')
+---
+...
+test_util = require('util')
+---
+...
+util = require('vshard.util')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+fake_M = {
+ reloadable_func = nil,
+ module_version = 1,
+};
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Check autoreload on function change during failure.
+fake_M.reloadable_function = function () fake_M.reloadable_function =
42; error('Error happened.') end
+---
+...
+fib = fiber.create(util.reloadable_fiber_f, fake_M,
'reloadable_function', 'Worker_name')
+---
+...
+fib:cancel()
+---
+...
+test_run:grep_log('default', 'Worker_name: reloadable function
reloadable_function has changed')
+---
+- 'Worker_name: reloadable function reloadable_function has changed'
+...
+test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+---
+- Worker_name is reloaded, restarting
+...
+test_run:grep_log('default', 'Worker_name has been started')
+---
+- Worker_name has been started
+...
+log.info(string.rep('a', 1000))
+---
+...
+-- Check reload feature.
+fake_M.reloadable_function = function () return true end
+---
+...
+fib = fiber.create(util.reloadable_fiber_f, fake_M,
'reloadable_function', 'Worker_name')
+---
+...
+fib:cancel()
+---
+...
+test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
+---
+- Worker_name is reloaded, restarting
+...
+test_run:grep_log('default', 'Worker_name has been started', 1000)
+---
+- Worker_name has been started
+...
diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
new file mode 100644
index 0000000..094cae9
--- /dev/null
+++ b/test/unit/util.test.lua
@@ -0,0 +1,29 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+log = require('log')
+test_util = require('util')
+util = require('vshard.util')
+
+test_run:cmd("setopt delimiter ';'")
+fake_M = {
+ reloadable_func = nil,
+ module_version = 1,
+};
+test_run:cmd("setopt delimiter ''");
+
+-- Check autoreload on function change during failure.
+fake_M.reloadable_function = function () fake_M.reloadable_function =
42; error('Error happened.') end
+
+fib = fiber.create(util.reloadable_fiber_f, fake_M,
'reloadable_function', 'Worker_name')
+fib:cancel()
+test_run:grep_log('default', 'Worker_name: reloadable function
reloadable_function has changed')
+test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+test_run:grep_log('default', 'Worker_name has been started')
+log.info(string.rep('a', 1000))
+
+-- Check reload feature.
+fake_M.reloadable_function = function () return true end
+fib = fiber.create(util.reloadable_fiber_f, fake_M,
'reloadable_function', 'Worker_name')
+fib:cancel()
+test_run:grep_log('default', 'Worker_name is reloaded, restarting', 1000)
+test_run:grep_log('default', 'Worker_name has been started', 1000)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 21093e5..9e18b27 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -491,11 +491,9 @@ local function router_cfg(cfg)
end
lreplicaset.wait_masters_connect(new_replicasets)
if M.failover_fiber == nil then
- log.info('Start failover fiber')
lfiber.create(util.reloadable_fiber_f, M, 'failover_f',
'Failover')
end
if M.discovery_fiber == nil then
- log.info('Start discovery fiber')
lfiber.create(util.reloadable_fiber_f, M, 'discovery_f',
'Discovery')
end
-- Destroy connections, not used in a new configuration.
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 57076e1..300751f 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -692,10 +692,8 @@ local function local_on_master_enable()
M.collect_bucket_garbage_fiber =
lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
'Garbage collector')
- log.info("GC is started")
M.recovery_fiber =
lfiber.create(util.reloadable_fiber_f, M, 'recovery_f',
'Recovery')
- log.info('Recovery is started')
-- TODO: check current status
log.info("Took on replicaset master role")
end
@@ -1585,7 +1583,6 @@ local function storage_cfg(cfg, this_replica_uuid)
if min_master == this_replica then
if not M.rebalancer_fiber then
- log.info('Run rebalancer')
M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
'rebalancer_f', 'Rebalancer')
else
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..920f53e 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -2,6 +2,19 @@
local log = require('log')
local fiber = require('fiber')
+local MODULE_INTERNALS = '__module_vshard_util'
+local M = rawget(_G, MODULE_INTERNALS)
+if not M then
+ --
+ -- The module is loaded for the first time.
+ --
+ M = {
+ -- Latest versions of functions.
+ reloadable_fiber_f = nil,
+ }
+ rawset(_G, MODULE_INTERNALS, M)
+end
+
--
-- Extract parts of a tuple.
-- @param tuple Tuple to extract a key from.
@@ -19,33 +32,37 @@ local function tuple_extract_key(tuple, parts)
end
--
--- Wrapper to run @a func in infinite loop and restart it on the
--- module reload. This function CAN NOT BE AUTORELOADED. To update
--- it you must manualy stop all fibers, run by this function, do
--- reload, and then restart all stopped fibers. This can be done,
--- for example, by calling vshard.storage/router.cfg() again with
--- the same config as earlier.
+-- Wrapper to run a func in infinite loop and restart it on
+-- errors and module reload.
+-- To handle module reload and run new version of a function
+-- in the module, the function should just return.
--
--- @param func Reloadable function to run. It must accept current
--- module version as an argument, and interrupt itself,
--- when it is changed.
--- @param worker_name Name of the function. Usual infinite fiber
--- represents a background subsystem, which has a name. For
--- example: "Garbage Collector", "Recovery", "Discovery",
--- "Rebalancer".
--- @param M Module which can reload.
+-- @param module Module which can be reloaded.
+-- @param func_name Name of a function to be executed in the
+-- module.
+-- @param worker_name Name of the reloadable background subsystem.
+-- For example: "Garbage Collector", "Recovery", "Discovery",
+-- "Rebalancer". Used only for an activity logging.
--
-local function reloadable_fiber_f(M, func_name, worker_name)
- while true do
- local ok, err = pcall(M[func_name], M.module_version)
- if not ok then
- log.error('%s has been failed: %s', worker_name, err)
- fiber.yield()
- else
- log.info('%s has been reloaded', worker_name)
- fiber.yield()
+local function reloadable_fiber_f(module, func_name, worker_name)
+ log.info('%s has been started', worker_name)
+ local func = module[func_name]
+ local ok, err = pcall(func, module.module_version)
+ if not ok then
+ log.error('%s has been failed: %s', worker_name, err)
+ if func ~= module[func_name] then
+ log.warn('%s: reloadable function %s has changed',
+ worker_name, func_name)
end
end
+ -- yield serves two pursoses:
+ -- * makes this fiber cancellable
+ -- * prevents 100% cpu consumption
+ fiber.yield()
+ log.info('%s is reloaded, restarting', worker_name)
+ -- luajit drops this frame if next function is called in
+ -- return statement.
+ return M.reloadable_fiber_f(module, func_name, worker_name)
end
--
@@ -75,8 +92,12 @@ local function generate_self_checker(obj_name,
func_name, mt, func)
end
end
+-- Update latest versions of function
+M.reloadable_fiber_f = reloadable_fiber_f
+
return {
tuple_extract_key = tuple_extract_key,
reloadable_fiber_f = reloadable_fiber_f,
generate_self_checker = generate_self_checker,
+ internal = M,
}
More information about the Tarantool-patches
mailing list