[tarantool-patches] [PATCH 2/4] Refactor reloadable fiber
AKhatskevich
avkhatskevich at tarantool.org
Mon Jul 30 11:56:04 MSK 2018
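Reloadable fiber changes:
* rename reloadable_fiber_f -> reloadable_fiber
* reloadable_fiber now creates the fiber, names it itself and
  returns it
* log messages no longer include the worker name: the worker is
  identified by the fiber name instead
* add an extra data argument, which is passed to the function
* worker functions no longer take module_version as an argument;
  they read M.module_version when they start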
---
test/rebalancer/rebalancer.result | 2 +-
test/rebalancer/rebalancer.test.lua | 2 +-
test/router/reload.result | 4 +--
test/router/reload.test.lua | 4 +--
test/storage/reload.result | 6 ++--
test/storage/reload.test.lua | 6 ++--
test/unit/garbage.result | 2 +-
test/unit/garbage.test.lua | 2 +-
test/unit/util.result | 20 ++++---------
test/unit/util.test.lua | 12 ++++----
vshard/router/init.lua | 16 +++++-----
vshard/storage/init.lua | 21 +++++++------
vshard/util.lua | 59 +++++++++++++++++++++----------------
13 files changed, 76 insertions(+), 80 deletions(-)
diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result
index 88cbaae..bf9e63b 100644
--- a/test/rebalancer/rebalancer.result
+++ b/test/rebalancer/rebalancer.result
@@ -418,7 +418,7 @@ vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
fiber = require('fiber')
---
...
-while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "vshard.rebalancer has been started") do fiber.sleep(0.1) end
---
...
while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua
index 01f2061..24c2706 100644
--- a/test/rebalancer/rebalancer.test.lua
+++ b/test/rebalancer/rebalancer.test.lua
@@ -195,7 +195,7 @@ switch_rs1_master()
vshard.storage.cfg(cfg, names.replica_uuid.box_2_b)
fiber = require('fiber')
-while not test_run:grep_log('box_2_a', "Rebalancer has been started") do fiber.sleep(0.1) end
+while not test_run:grep_log('box_2_a', "vshard.rebalancer has been started") do fiber.sleep(0.1) end
while not test_run:grep_log('box_1_a', "Rebalancer location has changed") do fiber.sleep(0.1) end
--
diff --git a/test/router/reload.result b/test/router/reload.result
index 88122aa..28b5004 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -116,10 +116,10 @@ vshard.router.module_version()
check_reloaded()
---
...
-while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'vshard.failover has been started') == nil do fiber.sleep(0.1) end
---
...
-while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'vshard.discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
---
...
check_reloaded()
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 01b7163..d15986f 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -59,8 +59,8 @@ vshard.router.module_version()
check_reloaded()
-while test_run:grep_log('router_1', 'Failover has been started') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('router_1', 'Discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'vshard.failover has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('router_1', 'vshard.discovery has been started') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
check_reloaded()
diff --git a/test/storage/reload.result b/test/storage/reload.result
index b91b622..898a18f 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -113,13 +113,13 @@ vshard.storage.module_version()
check_reloaded()
---
...
-while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'vshard.gc has been started') == nil do fiber.sleep(0.1) end
---
...
-while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'vshard.recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
---
...
-while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'vshard.rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
---
...
check_reloaded()
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 9140299..a2e8241 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -59,9 +59,9 @@ vshard.storage.module_version()
check_reloaded()
-while test_run:grep_log('storage_2_a', 'Garbage collector has been started') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('storage_2_a', 'Recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
-while test_run:grep_log('storage_2_a', 'Rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+while test_run:grep_log('storage_2_a', 'vshard.gc has been started') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'vshard.recovery has been started') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'vshard.rebalancer has been started') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
check_reloaded()
diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 0da8ee1..a352bd8 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -343,7 +343,7 @@ control.bucket_generation_collected
collect_f = vshard.storage.internal.collect_garbage_f
---
...
-f = fiber.create(collect_f, vshard.storage.module_version())
+f = fiber.create(collect_f)
---
...
fill_spaces_with_garbage()
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index db7821f..80d37e7 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -149,7 +149,7 @@ control.bucket_generation_collected
-- Test continuous garbage collection via background fiber.
--
collect_f = vshard.storage.internal.collect_garbage_f
-f = fiber.create(collect_f, vshard.storage.module_version())
+f = fiber.create(collect_f)
fill_spaces_with_garbage()
-- Wait until garbage collection is finished.
while #s2:select{} ~= 2 do fiber.sleep(0.1) end
diff --git a/test/unit/util.result b/test/unit/util.result
index 30906d1..56b863e 100644
--- a/test/unit/util.result
+++ b/test/unit/util.result
@@ -34,22 +34,18 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end
fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end
---
...
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib = util.reloadable_fiber('Worker_name', fake_M, 'reloadable_function')
---
...
-while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
+while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
---
...
fib:cancel()
---
...
-test_run:grep_log('default', 'Worker_name is reloaded, restarting')
+test_run:grep_log('default', 'module is reloaded, restarting')
---
-- Worker_name is reloaded, restarting
-...
-test_run:grep_log('default', 'Worker_name has been started')
----
-- Worker_name has been started
+- module is reloaded, restarting
...
log.info(string.rep('a', 1000))
---
@@ -58,16 +54,12 @@ log.info(string.rep('a', 1000))
fake_M.reloadable_function = function () fiber.sleep(0.01); return true end
---
...
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
+fib = util.reloadable_fiber('Worker_name', fake_M, 'reloadable_function')
---
...
-while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end
+while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end
---
...
fib:cancel()
---
...
-test_run:grep_log('default', 'Worker_name has been started', 1000)
----
-- Worker_name has been started
-...
diff --git a/test/unit/util.test.lua b/test/unit/util.test.lua
index 131274c..b26bb51 100644
--- a/test/unit/util.test.lua
+++ b/test/unit/util.test.lua
@@ -14,16 +14,14 @@ function slow_fail() fiber.sleep(0.01) error('Error happened.') end
-- Check autoreload on function change during failure.
fake_M.reloadable_function = function () fake_M.reloadable_function = slow_fail; slow_fail() end
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
-while not test_run:grep_log('default', 'Worker_name: reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
+fib = util.reloadable_fiber('Worker_name', fake_M, 'reloadable_function')
+while not test_run:grep_log('default', 'reloadable function reloadable_function has been changed') do fiber.sleep(0.01); end
fib:cancel()
-test_run:grep_log('default', 'Worker_name is reloaded, restarting')
-test_run:grep_log('default', 'Worker_name has been started')
+test_run:grep_log('default', 'module is reloaded, restarting')
log.info(string.rep('a', 1000))
-- Check reload feature.
fake_M.reloadable_function = function () fiber.sleep(0.01); return true end
-fib = fiber.create(util.reloadable_fiber_f, fake_M, 'reloadable_function', 'Worker_name')
-while not test_run:grep_log('default', 'Worker_name is reloaded, restarting') do fiber.sleep(0.01) end
+fib = util.reloadable_fiber('Worker_name', fake_M, 'reloadable_function')
+while not test_run:grep_log('default', 'module is reloaded, restarting') do fiber.sleep(0.01) end
fib:cancel()
-test_run:grep_log('default', 'Worker_name has been started', 1000)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 1a0ed2f..2df3446 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -149,9 +149,8 @@ end
-- Background fiber to perform discovery. It periodically scans
-- replicasets one by one and updates route_map.
--
-local function discovery_f(module_version)
- lfiber.name('discovery_fiber')
- M.discovery_fiber = lfiber.self()
+local function discovery_f()
+ local module_version = M.module_version
local iterations_until_lua_gc =
consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
while module_version == M.module_version do
@@ -459,9 +458,8 @@ end
-- tries to reconnect to the best replica. When the connection is
-- established, it replaces the original replica.
--
-local function failover_f(module_version)
- lfiber.name('vshard.failover')
- M.failover_fiber = lfiber.self()
+local function failover_f()
+ local module_version = M.module_version
local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT,
consts.FAILOVER_DOWN_TIMEOUT)
-- This flag is used to avoid logging like:
@@ -545,10 +543,12 @@ local function router_cfg(cfg)
M.route_map[bucket] = M.replicasets[rs.uuid]
end
if M.failover_fiber == nil then
- lfiber.create(util.reloadable_fiber_f, M, 'failover_f', 'Failover')
+ M.failover_fiber = util.reloadable_fiber('vshard.failover', M,
+ 'failover_f')
end
if M.discovery_fiber == nil then
- lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
+ M.discovery_fiber = util.reloadable_fiber('vshard.discovery', M,
+ 'discovery_f')
end
-- Destroy connections, not used in a new configuration.
collectgarbage()
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index c1df0e6..8ca81f6 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -354,8 +354,8 @@ end
-- appears to be changed, then stop recovery. It is
--- restarted in reloadable_fiber_f().
+-- restarted in reloadable_fiber_main_loop().
--
-local function recovery_f(module_version)
- lfiber.name('vshard.recovery')
+local function recovery_f()
+ local module_version = M.module_version
local _bucket = box.space._bucket
M.buckets_to_recovery = {}
for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do
@@ -728,10 +728,9 @@ local function local_on_master_enable()
M._on_master_enable:run()
-- Start background process to collect garbage.
M.collect_bucket_garbage_fiber =
- lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
- 'Garbage collector')
+ util.reloadable_fiber('vshard.gc', M, 'collect_garbage_f')
M.recovery_fiber =
- lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
+ util.reloadable_fiber('vshard.recovery', M, 'recovery_f')
-- TODO: check current status
log.info("Took on replicaset master role")
end
@@ -1026,8 +1025,8 @@ end
-- appears to be changed, then stop GC. It is restarted
--- in reloadable_fiber_f().
+-- in reloadable_fiber_main_loop().
--
-function collect_garbage_f(module_version)
- lfiber.name('vshard.gc')
+function collect_garbage_f()
+ local module_version = M.module_version
-- Collector controller. Changes of _bucket increments
-- bucket generation. Garbage collector has its own bucket
-- generation which is <= actual. Garbage collection is
@@ -1351,8 +1350,8 @@ end
-- Background rebalancer. Works on a storage which has the
-- smallest replicaset uuid and which is master.
--
-local function rebalancer_f(module_version)
- lfiber.name('vshard.rebalancer')
+local function rebalancer_f()
+ local module_version = M.module_version
while module_version == M.module_version do
while not M.is_rebalancer_active do
log.info('Rebalancer is disabled. Sleep')
@@ -1642,8 +1641,8 @@ local function storage_cfg(cfg, this_replica_uuid)
if min_master == this_replica then
if not M.rebalancer_fiber then
- M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
- 'rebalancer_f', 'Rebalancer')
+ M.rebalancer_fiber = util.reloadable_fiber('vshard.rebalancer', M,
+ 'rebalancer_f')
else
log.info('Wakeup rebalancer')
-- Configuration had changed. Time to rebalance.
diff --git a/vshard/util.lua b/vshard/util.lua
index fb875ce..1319acc 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -10,7 +10,7 @@ if not M then
--
M = {
-- Latest versions of functions.
- reloadable_fiber_f = nil,
+ reloadable_fiber_main_loop = nil,
}
rawset(_G, MODULE_INTERNALS, M)
end
@@ -31,6 +31,30 @@ local function tuple_extract_key(tuple, parts)
return key
end
+local function reloadable_fiber_main_loop(module, func_name, data)
+ local func = module[func_name]
+::restart_loop::
+ local ok, err = pcall(func, data)
+ -- yield serves two purposes:
+ -- * makes this fiber cancellable
+ -- * prevents 100% cpu consumption
+ fiber.yield()
+ if not ok then
+ log.error('%s has been failed: %s', func_name, err)
+ if func == module[func_name] then
+ goto restart_loop
+ end
+ -- The error may have been raised during (or caused by) a
+ -- reload, and the function has been replaced: fall through
+ -- and restart with the new version.
+ log.error('reloadable function %s has been changed', func_name)
+ end
+ log.info('module is reloaded, restarting')
+ -- Restart via M to run the latest version of this loop. It is a
+ -- tail call, so luajit drops this frame and the stack does not grow.
+ return M.reloadable_fiber_main_loop(module, func_name, data)
+end
+
--
-- Wrapper to run a func in infinite loop and restart it on
-- errors and module reload.
@@ -44,30 +68,21 @@ end
--- For example: "Garbage Collector", "Recovery", "Discovery",
--- "Rebalancer". Used only for an activity logging.
+-- It is set as the name of the created fiber, so it identifies
+-- the worker in log lines. For example: "vshard.gc",
+-- "vshard.recovery", "vshard.discovery", "vshard.rebalancer".
+-- @param data Value passed as the only argument to the function
+--        on every (re)start.
+-- @retval Created fiber.
 --
-local function reloadable_fiber_f(module, func_name, worker_name)
+local function reloadable_fiber(worker_name, module, func_name, data)
+    assert(type(worker_name) == 'string')
+    -- fiber.create() runs the new fiber immediately, up to its first
+    -- yield. Name it from inside, so that even the worker's earliest
+    -- log lines carry worker_name instead of the default fiber name.
+    local xfiber = fiber.create(function()
+        fiber.name(worker_name)
+        return reloadable_fiber_main_loop(module, func_name, data)
+    end)
     log.info('%s has been started', worker_name)
-    local func = module[func_name]
-::reload_loop::
-    local ok, err = pcall(func, module.module_version)
-    -- yield serves two pursoses:
-    -- * makes this fiber cancellable
-    -- * prevents 100% cpu consumption
-    fiber.yield()
-    if not ok then
-        log.error('%s has been failed: %s', worker_name, err)
-        if func == module[func_name] then
-            goto reload_loop
-        end
-        -- There is a chance that error was raised during reload
-        -- (or caused by reload). Perform reload in case function
-        -- has been changed.
-        log.error('%s: reloadable function %s has been changed',
-                  worker_name, func_name)
-    end
-    log.info('%s is reloaded, restarting', worker_name)
-    -- luajit drops this frame if next function is called in
-    -- return statement.
-    return M.reloadable_fiber_f(module, func_name, worker_name)
+    return xfiber
 end
 
--
@@ -98,7 +105,7 @@ local function generate_self_checker(obj_name, func_name, mt, func)
end
-- Update latest versions of function
-M.reloadable_fiber_f = reloadable_fiber_f
+M.reloadable_fiber_main_loop = reloadable_fiber_main_loop
local function sync_task(delay, task, ...)
if delay then
@@ -121,7 +128,7 @@ end
return {
tuple_extract_key = tuple_extract_key,
- reloadable_fiber_f = reloadable_fiber_f,
+ reloadable_fiber = reloadable_fiber,
generate_self_checker = generate_self_checker,
async_task = async_task,
internal = M,
--
2.14.1
More information about the Tarantool-patches
mailing list