[patches] [PATCH vshard 2/2] Enable automatic periodic lua garbage collection
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri Feb 23 15:54:08 MSK 2018
When a vshard router or storage runs for a long time, its
background fibers (bucket GC, recovery, etc.) can generate a lot
of Lua garbage.
Allow specifying the garbage_lua_collect_interval option in the
config to enable periodic Lua garbage collection.
Also allow setting garbage_collect_interval for the bucket
garbage collector.
Closes #77
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
test/router/garbage_collector.result | 101 ++++++++++++++++++++++++++++++++
test/router/garbage_collector.test.lua | 34 +++++++++++
test/router/reload.result | 3 +-
test/router/reload.test.lua | 3 +-
test/storage/garbage_collector.result | 94 +++++++++++++++++++++++++++++
test/storage/garbage_collector.test.lua | 38 ++++++++++++
test/storage/reload.result | 3 +-
test/storage/reload.test.lua | 3 +-
test/unit/config.result | 57 ++++++++++++++++++
test/unit/config.test.lua | 21 +++++++
test/unit/garbage.result | 3 +
test/unit/garbage.test.lua | 1 +
vshard/cfg.lua | 14 +++++
vshard/consts.lua | 2 +-
vshard/router/init.lua | 19 ++++++
vshard/storage/init.lua | 42 +++++++++++--
vshard/util.lua | 23 +++++++-
17 files changed, 449 insertions(+), 12 deletions(-)
create mode 100644 test/router/garbage_collector.result
create mode 100644 test/router/garbage_collector.test.lua
diff --git a/test/router/garbage_collector.result b/test/router/garbage_collector.result
new file mode 100644
index 0000000..5e6ea3a
--- /dev/null
+++ b/test/router/garbage_collector.result
@@ -0,0 +1,101 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'router')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'router')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+---
+- true
+...
+test_run:cmd("start server router_1")
+---
+- true
+...
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('router_1')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+cfg.garbage_lua_collect_interval = 0.5
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+while not test_run:grep_log('router_1', 'Start Lua GC') do fiber.sleep(0.1) end
+---
+...
+a = setmetatable({}, {__mode = 'v'})
+---
+...
+a.k = {b = 100}
+---
+...
+fiber.sleep(0.7)
+---
+...
+a.k
+---
+- null
+...
+cfg.garbage_lua_collect_interval = nil
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+while not test_run:grep_log('router_1', 'Stop Lua GC') do fiber.sleep(0.1) end
+---
+...
+a.k = {b = 100}
+---
+...
+fiber.sleep(0.7)
+---
+...
+a.k
+---
+- b: 100
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:cmd("stop server router_1")
+---
+- true
+...
+test_run:cmd("cleanup server router_1")
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
diff --git a/test/router/garbage_collector.test.lua b/test/router/garbage_collector.test.lua
new file mode 100644
index 0000000..be18974
--- /dev/null
+++ b/test/router/garbage_collector.test.lua
@@ -0,0 +1,34 @@
+test_run = require('test_run').new()
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+test_run:create_cluster(REPLICASET_1, 'router')
+test_run:create_cluster(REPLICASET_2, 'router')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+test_run:cmd("start server router_1")
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('router_1')
+fiber = require('fiber')
+cfg.garbage_lua_collect_interval = 0.5
+vshard.router.cfg(cfg)
+while not test_run:grep_log('router_1', 'Start Lua GC') do fiber.sleep(0.1) end
+a = setmetatable({}, {__mode = 'v'})
+a.k = {b = 100}
+fiber.sleep(0.7)
+a.k
+cfg.garbage_lua_collect_interval = nil
+vshard.router.cfg(cfg)
+while not test_run:grep_log('router_1', 'Stop Lua GC') do fiber.sleep(0.1) end
+a.k = {b = 100}
+fiber.sleep(0.7)
+a.k
+
+test_run:switch("default")
+test_run:cmd("stop server router_1")
+test_run:cmd("cleanup server router_1")
+test_run:drop_cluster(REPLICASET_1)
+test_run:drop_cluster(REPLICASET_2)
diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..d89ecde 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -65,7 +65,8 @@ test_run:cmd("setopt delimiter ';'")
...
function check_reloaded()
for k, v in pairs(old_internal) do
- if v == vshard.router.internal[k] then
+ if v == vshard.router.internal[k] and
+ k ~= 'collect_lua_garbage_f' then
return k
end
end
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..64ec205 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -26,7 +26,8 @@ vshard.router.module_version()
test_run:cmd("setopt delimiter ';'")
function check_reloaded()
for k, v in pairs(old_internal) do
- if v == vshard.router.internal[k] then
+ if v == vshard.router.internal[k] and
+ k ~= 'collect_lua_garbage_f' then
return k
end
end
diff --git a/test/storage/garbage_collector.result b/test/storage/garbage_collector.result
index 5d5558c..5c9c50f 100644
--- a/test/storage/garbage_collector.result
+++ b/test/storage/garbage_collector.result
@@ -110,6 +110,100 @@ customer:select{}
---
- - [10, 1, 'user1']
...
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('storage_1_a')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+log = require('log')
+---
+...
+cfg.garbage_lua_collect_interval = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+while not test_run:grep_log('storage_1_a', 'Start Lua GC') do fiber.sleep(0.1) end
+---
+...
+-- Ensure that the GC is not started if it already exists.
+log.info(string.rep('a', 1000))
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+fiber.sleep(0.5)
+---
+...
+test_run:grep_log('storage_1_a', 'Start Lua GC', 1000)
+---
+- null
+...
+-- Create a weak reference to a able {b = 100} - it must be
+-- deleted on the next GC.
+a = setmetatable({}, {__mode = 'v'})
+---
+...
+a.k = {b = 100}
+---
+...
+-- Wait until Lua GC deletes a.k.
+fiber.sleep(0.7)
+---
+...
+a.k
+---
+- null
+...
+cfg.garbage_lua_collect_interval = nil
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+while not test_run:grep_log('storage_1_a', 'Stop Lua GC') do fiber.sleep(0.1) end
+---
+...
+a.k = {b = 100}
+---
+...
+fiber.sleep(0.7)
+---
+...
+a.k ~= nil
+---
+- true
+...
+-- Ensure the Lua garbage collector is stoped when the master
+-- becomes a slave.
+log.info(string.rep('a', 1000))
+---
+...
+cfg.garbage_lua_collect_interval = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+while not test_run:grep_log('storage_1_a', 'Start Lua GC', 1000) do fiber.sleep(0.1) end
+---
+...
+cfg.sharding[replicasets[1]].replicas[names.storage_1_a].master = false
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+while not test_run:grep_log('storage_1_a', 'Stop Lua GC', 1000) do fiber.sleep(0.1) end
+---
+...
test_run:switch('default')
---
- true
diff --git a/test/storage/garbage_collector.test.lua b/test/storage/garbage_collector.test.lua
index 1795296..26d6943 100644
--- a/test/storage/garbage_collector.test.lua
+++ b/test/storage/garbage_collector.test.lua
@@ -41,6 +41,44 @@ test_run:switch('storage_1_b')
while box.space._bucket:get{3} ~= nil do fiber.sleep(0.1) end
customer:select{}
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('storage_1_a')
+fiber = require('fiber')
+log = require('log')
+cfg.garbage_lua_collect_interval = 0.5
+vshard.storage.cfg(cfg, names.storage_1_a)
+while not test_run:grep_log('storage_1_a', 'Start Lua GC') do fiber.sleep(0.1) end
+-- Ensure that the GC is not started if it already exists.
+log.info(string.rep('a', 1000))
+vshard.storage.cfg(cfg, names.storage_1_a)
+fiber.sleep(0.5)
+test_run:grep_log('storage_1_a', 'Start Lua GC', 1000)
+-- Create a weak reference to a able {b = 100} - it must be
+-- deleted on the next GC.
+a = setmetatable({}, {__mode = 'v'})
+a.k = {b = 100}
+-- Wait until Lua GC deletes a.k.
+fiber.sleep(0.7)
+a.k
+cfg.garbage_lua_collect_interval = nil
+vshard.storage.cfg(cfg, names.storage_1_a)
+while not test_run:grep_log('storage_1_a', 'Stop Lua GC') do fiber.sleep(0.1) end
+a.k = {b = 100}
+fiber.sleep(0.7)
+a.k ~= nil
+
+-- Ensure the Lua garbage collector is stoped when the master
+-- becomes a slave.
+log.info(string.rep('a', 1000))
+cfg.garbage_lua_collect_interval = 0.5
+vshard.storage.cfg(cfg, names.storage_1_a)
+while not test_run:grep_log('storage_1_a', 'Start Lua GC', 1000) do fiber.sleep(0.1) end
+cfg.sharding[replicasets[1]].replicas[names.storage_1_a].master = false
+vshard.storage.cfg(cfg, names.storage_1_a)
+while not test_run:grep_log('storage_1_a', 'Stop Lua GC', 1000) do fiber.sleep(0.1) end
+
test_run:switch('default')
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..fcdddd6 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -62,7 +62,8 @@ test_run:cmd("setopt delimiter ';'")
...
function check_reloaded()
for k, v in pairs(old_internal) do
- if v == vshard.storage.internal[k] then
+ if v == vshard.storage.internal[k] and
+ k ~= 'collect_lua_garbage_f' then
return k
end
end
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..f3f8ec9 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -25,7 +25,8 @@ while test_run:grep_log('storage_2_a', 'The cluster is balanced ok') == nil do v
test_run:cmd("setopt delimiter ';'")
function check_reloaded()
for k, v in pairs(old_internal) do
- if v == vshard.storage.internal[k] then
+ if v == vshard.storage.internal[k] and
+ k ~= 'collect_lua_garbage_f' then
return k
end
end
diff --git a/test/unit/config.result b/test/unit/config.result
index e3d3cd9..f304bbb 100644
--- a/test/unit/config.result
+++ b/test/unit/config.result
@@ -370,3 +370,60 @@ cfg.shard_index = 'vbucket'
lcfg.check(cfg)
---
...
+--
+-- gh-77: garbage collection options.
+--
+cfg.garbage_collect_interval = 'str'
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = 0
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = -1
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = 100.5
+---
+...
+lcfg.check(cfg)
+---
+...
+cfg.garbage_lua_collect_interval = 'str'
+---
+...
+check(cfg)
+---
+- Garbage Lua collect interval must be positive number
+...
+cfg.garbage_lua_collect_interval = 0
+---
+...
+check(cfg)
+---
+- Garbage Lua collect interval must be positive number
+...
+cfg.garbage_lua_collect_interval = -1
+---
+...
+check(cfg)
+---
+- Garbage Lua collect interval must be positive number
+...
+cfg.garbage_lua_collect_interval = 100.5
+---
+...
+lcfg.check(cfg)
+---
+...
diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua
index add16bd..9f02dc4 100644
--- a/test/unit/config.test.lua
+++ b/test/unit/config.test.lua
@@ -147,3 +147,24 @@ cfg.shard_index = ''
check(cfg)
cfg.shard_index = 'vbucket'
lcfg.check(cfg)
+
+--
+-- gh-77: garbage collection options.
+--
+cfg.garbage_collect_interval = 'str'
+check(cfg)
+cfg.garbage_collect_interval = 0
+check(cfg)
+cfg.garbage_collect_interval = -1
+check(cfg)
+cfg.garbage_collect_interval = 100.5
+lcfg.check(cfg)
+
+cfg.garbage_lua_collect_interval = 'str'
+check(cfg)
+cfg.garbage_lua_collect_interval = 0
+check(cfg)
+cfg.garbage_lua_collect_interval = -1
+check(cfg)
+cfg.garbage_lua_collect_interval = 100.5
+lcfg.check(cfg)
diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 14e5721..83d7b9f 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -27,6 +27,9 @@ test_run:cmd("setopt delimiter ''");
vshard.storage.internal.shard_index = 'bucket_id'
---
...
+vshard.storage.internal.garbage_collect_interval = vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+---
+...
--
-- Find nothing if no bucket_id anywhere, or there is no index
-- by it, or bucket_id is not unsigned.
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index 3db44f8..0a5aca9 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -13,6 +13,7 @@ end;
test_run:cmd("setopt delimiter ''");
vshard.storage.internal.shard_index = 'bucket_id'
+vshard.storage.internal.garbage_collect_interval = vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
--
-- Find nothing if no bucket_id anywhere, or there is no index
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 8d9ccbb..0828fa6 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -124,6 +124,18 @@ local function cfg_check(shard_cfg)
'positive integer')
end
end
+ if shard_cfg.garbage_collect_interval ~= nil then
+ local t = shard_cfg.garbage_collect_interval
+ if type(t) ~= 'number' or t <= 0 then
+ error('Garbage collect interval must be positive number')
+ end
+ end
+ if shard_cfg.garbage_lua_collect_interval ~= nil then
+ local t = shard_cfg.garbage_lua_collect_interval
+ if type(t) ~= 'number' or t <= 0 then
+ error('Garbage Lua collect interval must be positive number')
+ end
+ end
local uuids = {}
local uris = {}
for replicaset_uuid, replicaset in pairs(shard_cfg.sharding) do
@@ -156,6 +168,8 @@ local function prepare_for_box_cfg(cfg)
cfg.rebalancer_disbalance_threshold = nil
cfg.rebalancer_max_receiving = nil
cfg.shard_index = nil
+ cfg.garbage_collect_interval = nil
+ cfg.garbage_lua_collect_interval = nil
end
return {
diff --git a/vshard/consts.lua b/vshard/consts.lua
index bb215c3..9edbfad 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -32,7 +32,7 @@ return {
FAILOVER_DOWN_TIMEOUT = 1;
SYNC_TIMEOUT = 1;
RECONNECT_TIMEOUT = 0.5;
- GARBAGE_COLLECT_INTERVAL = 0.5;
+ DEFAULT_GARBAGE_COLLECT_INTERVAL = 0.5;
RECOVERY_INTERVAL = 5;
DISCOVERY_INTERVAL = 10;
}
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index f7e84a1..0053c77 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -24,6 +24,10 @@ if not M then
discovery_fiber = nil,
-- Bucket count stored on all replicasets.
total_bucket_count = 0,
+ -- Fiber to remove lua garbage.
+ garbage_lua_collect_fiber = nil,
+ -- Do Lua garbage collection once per this interval.
+ garbage_lua_collect_interval = nil,
-- This counter is used to restart background fibers with
-- new reloaded code.
module_version = 0,
@@ -485,6 +489,7 @@ local function router_cfg(cfg)
-- TODO: update existing route map in-place
M.route_map = {}
M.total_bucket_count = cfg.bucket_count or consts.DEFAULT_BUCKET_COUNT
+ M.garbage_lua_collect_interval = cfg.garbage_lua_collect_interval
lcfg.prepare_for_box_cfg(cfg)
-- Force net.box connection on cfg()
for _, replicaset in pairs(new_replicasets) do
@@ -507,6 +512,19 @@ local function router_cfg(cfg)
log.info('Start discovery fiber')
lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
end
+    -- Stop the Lua GC only when its interval is unset, not on every cfg().
+    local gc_fiber = M.garbage_lua_collect_fiber
+    if not M.garbage_lua_collect_interval and gc_fiber then
+        gc_fiber:cancel()
+        M.garbage_lua_collect_fiber = nil
+        log.info('Stop Lua GC')
+    elseif M.garbage_lua_collect_interval and
+           (not gc_fiber or gc_fiber:status() == 'dead') then
+        M.garbage_lua_collect_fiber =
+            lfiber.create(util.reloadable_fiber_f, M, 'collect_lua_garbage_f',
+                          'Lua garbage collector', M)
+        log.info('Start Lua GC')
+    end
if old_replicasets then
lreplicaset.destroy(old_replicasets)
end
@@ -775,6 +793,7 @@ end
--
M.discovery_f = discovery_f
M.failover_f = failover_f
+M.collect_lua_garbage_f = util.collect_lua_garbage_f
if not rawget(_G, '__module_vshard_router') then
rawset(_G, '__module_vshard_router', M)
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index cb7efef..264aa95 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -22,6 +22,8 @@ if not M then
replicasets = nil,
-- Fiber to remove garbage buckets data.
garbage_collect_fiber = nil,
+ -- Fiber to remove lua garbage.
+ garbage_lua_collect_fiber = nil,
-- Bucket identifiers which are not active and are not being
-- sent - their status is unknown. Their state must be checked
-- periodically in recovery fiber.
@@ -54,6 +56,10 @@ if not M then
-- Maximal bucket count that can be received by a single
-- replicaset simultaneously.
rebalancer_max_receiving = 0,
+ -- Do buckets garbage collection once per this time.
+ garbage_collect_interval = nil,
+ -- Do Lua garbage collection once per this interval.
+ garbage_lua_collect_interval = nil,
errinj = {
ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
ERRINJ_RELOAD = false,
@@ -793,7 +799,7 @@ end
--
-- Garbage collector. Works on masters. The garbage collector
--- wakes up once per GARBAGE_COLLECT_INTERVAL seconds.
+-- wakes up once per specified time.
-- After wakeup it checks follows the plan:
-- 1) Check if _bucket has changed. If not, then sleep again;
-- 2) Scan user spaces for not existing, sent and garbage buckets,
@@ -847,7 +853,7 @@ function collect_garbage_f(module_version)
end
if not status then
log.error('Error during garbage collection step: %s', err)
- lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
+ lfiber.sleep(M.garbage_collect_interval)
goto continue
end
status, empty_sent_buckets = pcall(collect_garbage_update_bucket)
@@ -858,7 +864,7 @@ function collect_garbage_f(module_version)
log.error('Error during empty buckets processing: %s',
empty_sent_buckets)
control.bucket_generation = control.bucket_generation + 1
- lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
+ lfiber.sleep(M.garbage_collect_interval)
goto continue
end
end
@@ -878,7 +884,7 @@ function collect_garbage_f(module_version)
buckets_for_redirect_ts = lfiber.time()
end
end
- lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
+ lfiber.sleep(M.garbage_collect_interval)
::continue::
end
end
@@ -1323,6 +1329,9 @@ local function storage_cfg(cfg, this_replica_uuid)
M.rebalancer_max_receiving = cfg.rebalancer_max_receiving or
consts.DEFAULT_REBALANCER_MAX_RECEIVING
M.shard_index = cfg.shard_index or 'bucket_id'
+ M.garbage_collect_interval = cfg.garbage_collect_interval or
+ consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+ M.garbage_lua_collect_interval = cfg.garbage_lua_collect_interval
lcfg.prepare_for_box_cfg(cfg)
box.cfg(cfg)
@@ -1356,6 +1365,30 @@ local function storage_cfg(cfg, this_replica_uuid)
M.rebalancer_fiber:cancel()
M.rebalancer_fiber = nil
end
+
+    -- Enable periodic Lua garbage collection. It is done here and not
+    -- in the master enable trigger, because the option is dynamic: it
+    -- can be turned on or off without switching the master role. A
+    -- stored fiber that is already dead is replaced with a new one.
+    if is_master and M.garbage_lua_collect_interval and
+       (not M.garbage_lua_collect_fiber or
+        -- status() must be called as a method (':') to query this fiber.
+        M.garbage_lua_collect_fiber:status() == 'dead') then
+        M.garbage_lua_collect_fiber =
+            lfiber.create(util.reloadable_fiber_f, M, 'collect_lua_garbage_f',
+                          'Lua garbage collector', M)
+        log.info('Start Lua GC')
+    end
+ -- Cancel periodic Lua garbage collection if the option was
+ -- removed from the config, or if the master role is gone.
+ if M.garbage_lua_collect_fiber and
+ (not M.garbage_lua_collect_interval or
+ (was_master and not is_master)) then
+
+ M.garbage_lua_collect_fiber:cancel()
+ M.garbage_lua_collect_fiber = nil
+ log.info('Stop Lua GC')
+ end
if old_replicasets then
lreplicaset.destroy(old_replicasets)
end
@@ -1568,6 +1601,7 @@ M.find_sharded_spaces = find_sharded_spaces
M.find_garbage_bucket = find_garbage_bucket
M.collect_garbage_step = collect_garbage_step
M.collect_garbage_f = collect_garbage_f
+M.collect_lua_garbage_f = util.collect_lua_garbage_f
M.rebalancer_build_routes = rebalancer_build_routes
M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
diff --git a/vshard/util.lua b/vshard/util.lua
index f2feec1..4d95b5b 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -1,5 +1,6 @@
-- vshard.util
-log = require('log')
+local log = require('log')
+local fiber = require('fiber')
--
-- Extract parts of a tuple.
@@ -17,6 +18,19 @@ local function tuple_extract_key(tuple, parts)
return key
end
+--
+-- Background function for periodic Lua garbage collection. Both
+-- router and storage have several infinite background fibers which
+-- generate garbage, and this function periodically cleans it up.
+--
+local function collect_lua_garbage_f(module_version, M)
+ fiber.name('vshard.lua_gc')
+ while module_version == M.module_version do
+ collectgarbage()
+ fiber.sleep(M.garbage_lua_collect_interval)
+ end
+end
+
--
-- Wrapper to run @a func in infinite loop and restart it on the
-- module reload. This function CAN NOT BE AUTORELOADED. To update
@@ -34,13 +48,15 @@ end
-- "Rebalancer".
-- @param M Module which can reload.
--
-local function reloadable_fiber_f(M, func_name, worker_name)
+local function reloadable_fiber_f(M, func_name, worker_name, ...)
while true do
- local ok, err = pcall(M[func_name], M.module_version)
+ local ok, err = pcall(M[func_name], M.module_version, ...)
if not ok then
log.error('%s has been failed: %s', worker_name, err)
+ fiber.yield()
else
log.info('%s has been reloaded', worker_name)
+ fiber.yield()
end
end
end
@@ -48,4 +64,5 @@ end
return {
tuple_extract_key = tuple_extract_key,
reloadable_fiber_f = reloadable_fiber_f,
+ collect_lua_garbage_f = collect_lua_garbage_f,
}
--
2.14.3 (Apple Git-98)
More information about the Tarantool-patches
mailing list