[patches] [PATCH vshard 2/2] Enable automatic periodic lua garbage collection

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Feb 23 15:54:08 MSK 2018


When a vshard router or storage runs for a long time, its
background fibers (bucket GC, recovery, etc.) can generate a lot
of Lua garbage.

Add the config option garbage_lua_collect_interval to enable
periodic Lua garbage collection with the given interval.

Also allow setting garbage_collect_interval for the bucket
garbage collector.

Closes #77

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 test/router/garbage_collector.result    | 101 ++++++++++++++++++++++++++++++++
 test/router/garbage_collector.test.lua  |  34 +++++++++++
 test/router/reload.result               |   3 +-
 test/router/reload.test.lua             |   3 +-
 test/storage/garbage_collector.result   |  94 +++++++++++++++++++++++++++++
 test/storage/garbage_collector.test.lua |  38 ++++++++++++
 test/storage/reload.result              |   3 +-
 test/storage/reload.test.lua            |   3 +-
 test/unit/config.result                 |  57 ++++++++++++++++++
 test/unit/config.test.lua               |  21 +++++++
 test/unit/garbage.result                |   3 +
 test/unit/garbage.test.lua              |   1 +
 vshard/cfg.lua                          |  14 +++++
 vshard/consts.lua                       |   2 +-
 vshard/router/init.lua                  |  19 ++++++
 vshard/storage/init.lua                 |  42 +++++++++++--
 vshard/util.lua                         |  23 +++++++-
 17 files changed, 449 insertions(+), 12 deletions(-)
 create mode 100644 test/router/garbage_collector.result
 create mode 100644 test/router/garbage_collector.test.lua

diff --git a/test/router/garbage_collector.result b/test/router/garbage_collector.result
new file mode 100644
index 0000000..5e6ea3a
--- /dev/null
+++ b/test/router/garbage_collector.result
@@ -0,0 +1,101 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'router')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'router')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+---
+- true
+...
+test_run:cmd("start server router_1")
+---
+- true
+...
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('router_1')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+cfg.garbage_lua_collect_interval = 0.5
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+while not test_run:grep_log('router_1', 'Start Lua GC') do fiber.sleep(0.1) end
+---
+...
+a = setmetatable({}, {__mode = 'v'})
+---
+...
+a.k = {b = 100}
+---
+...
+fiber.sleep(0.7)
+---
+...
+a.k
+---
+- null
+...
+cfg.garbage_lua_collect_interval = nil
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+while not test_run:grep_log('router_1', 'Stop Lua GC') do fiber.sleep(0.1) end
+---
+...
+a.k = {b = 100}
+---
+...
+fiber.sleep(0.7)
+---
+...
+a.k
+---
+- b: 100
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:cmd("stop server router_1")
+---
+- true
+...
+test_run:cmd("cleanup server router_1")
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
diff --git a/test/router/garbage_collector.test.lua b/test/router/garbage_collector.test.lua
new file mode 100644
index 0000000..be18974
--- /dev/null
+++ b/test/router/garbage_collector.test.lua
@@ -0,0 +1,34 @@
+test_run = require('test_run').new()
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+test_run:create_cluster(REPLICASET_1, 'router')
+test_run:create_cluster(REPLICASET_2, 'router')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+test_run:cmd("start server router_1")
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('router_1')
+fiber = require('fiber')
+cfg.garbage_lua_collect_interval = 0.5
+vshard.router.cfg(cfg)
+while not test_run:grep_log('router_1', 'Start Lua GC') do fiber.sleep(0.1) end
+a = setmetatable({}, {__mode = 'v'})
+a.k = {b = 100}
+fiber.sleep(0.7)
+a.k
+cfg.garbage_lua_collect_interval = nil
+vshard.router.cfg(cfg)
+while not test_run:grep_log('router_1', 'Stop Lua GC') do fiber.sleep(0.1) end
+a.k = {b = 100}
+fiber.sleep(0.7)
+a.k
+
+test_run:switch("default")
+test_run:cmd("stop server router_1")
+test_run:cmd("cleanup server router_1")
+test_run:drop_cluster(REPLICASET_1)
+test_run:drop_cluster(REPLICASET_2)
diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..d89ecde 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -65,7 +65,8 @@ test_run:cmd("setopt delimiter ';'")
 ...
 function check_reloaded()
 	for k, v in pairs(old_internal) do
-		if v == vshard.router.internal[k] then
+		if v == vshard.router.internal[k] and
+		   k ~= 'collect_lua_garbage_f' then
 			return k
 		end
 	end
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..64ec205 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -26,7 +26,8 @@ vshard.router.module_version()
 test_run:cmd("setopt delimiter ';'")
 function check_reloaded()
 	for k, v in pairs(old_internal) do
-		if v == vshard.router.internal[k] then
+		if v == vshard.router.internal[k] and
+		   k ~= 'collect_lua_garbage_f' then
 			return k
 		end
 	end
diff --git a/test/storage/garbage_collector.result b/test/storage/garbage_collector.result
index 5d5558c..5c9c50f 100644
--- a/test/storage/garbage_collector.result
+++ b/test/storage/garbage_collector.result
@@ -110,6 +110,100 @@ customer:select{}
 ---
 - - [10, 1, 'user1']
 ...
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('storage_1_a')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+log = require('log')
+---
+...
+cfg.garbage_lua_collect_interval = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+while not test_run:grep_log('storage_1_a', 'Start Lua GC') do fiber.sleep(0.1) end
+---
+...
+-- Ensure that the GC is not started if it already exists.
+log.info(string.rep('a', 1000))
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+fiber.sleep(0.5)
+---
+...
+test_run:grep_log('storage_1_a', 'Start Lua GC', 1000)
+---
+- null
+...
+-- Create a weak reference to a able {b = 100} - it must be
+-- deleted on the next GC.
+a = setmetatable({}, {__mode = 'v'})
+---
+...
+a.k = {b = 100}
+---
+...
+-- Wait until Lua GC deletes a.k.
+fiber.sleep(0.7)
+---
+...
+a.k
+---
+- null
+...
+cfg.garbage_lua_collect_interval = nil
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+while not test_run:grep_log('storage_1_a', 'Stop Lua GC') do fiber.sleep(0.1) end
+---
+...
+a.k = {b = 100}
+---
+...
+fiber.sleep(0.7)
+---
+...
+a.k ~= nil
+---
+- true
+...
+-- Ensure the Lua garbage collector is stoped when the master
+-- becomes a slave.
+log.info(string.rep('a', 1000))
+---
+...
+cfg.garbage_lua_collect_interval = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+while not test_run:grep_log('storage_1_a', 'Start Lua GC', 1000) do fiber.sleep(0.1) end
+---
+...
+cfg.sharding[replicasets[1]].replicas[names.storage_1_a].master = false
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+while not test_run:grep_log('storage_1_a', 'Stop Lua GC', 1000) do fiber.sleep(0.1) end
+---
+...
 test_run:switch('default')
 ---
 - true
diff --git a/test/storage/garbage_collector.test.lua b/test/storage/garbage_collector.test.lua
index 1795296..26d6943 100644
--- a/test/storage/garbage_collector.test.lua
+++ b/test/storage/garbage_collector.test.lua
@@ -41,6 +41,44 @@ test_run:switch('storage_1_b')
 while box.space._bucket:get{3} ~= nil do fiber.sleep(0.1) end
 customer:select{}
 
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('storage_1_a')
+fiber = require('fiber')
+log = require('log')
+cfg.garbage_lua_collect_interval = 0.5
+vshard.storage.cfg(cfg, names.storage_1_a)
+while not test_run:grep_log('storage_1_a', 'Start Lua GC') do fiber.sleep(0.1) end
+-- Ensure that the GC is not started if it already exists.
+log.info(string.rep('a', 1000))
+vshard.storage.cfg(cfg, names.storage_1_a)
+fiber.sleep(0.5)
+test_run:grep_log('storage_1_a', 'Start Lua GC', 1000)
+-- Create a weak reference to a able {b = 100} - it must be
+-- deleted on the next GC.
+a = setmetatable({}, {__mode = 'v'})
+a.k = {b = 100}
+-- Wait until Lua GC deletes a.k.
+fiber.sleep(0.7)
+a.k
+cfg.garbage_lua_collect_interval = nil
+vshard.storage.cfg(cfg, names.storage_1_a)
+while not test_run:grep_log('storage_1_a', 'Stop Lua GC') do fiber.sleep(0.1) end
+a.k = {b = 100}
+fiber.sleep(0.7)
+a.k ~= nil
+
+-- Ensure the Lua garbage collector is stoped when the master
+-- becomes a slave.
+log.info(string.rep('a', 1000))
+cfg.garbage_lua_collect_interval = 0.5
+vshard.storage.cfg(cfg, names.storage_1_a)
+while not test_run:grep_log('storage_1_a', 'Start Lua GC', 1000) do fiber.sleep(0.1) end
+cfg.sharding[replicasets[1]].replicas[names.storage_1_a].master = false
+vshard.storage.cfg(cfg, names.storage_1_a)
+while not test_run:grep_log('storage_1_a', 'Stop Lua GC', 1000) do fiber.sleep(0.1) end
+
 test_run:switch('default')
 test_run:drop_cluster(REPLICASET_2)
 test_run:drop_cluster(REPLICASET_1)
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..fcdddd6 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -62,7 +62,8 @@ test_run:cmd("setopt delimiter ';'")
 ...
 function check_reloaded()
 	for k, v in pairs(old_internal) do
-		if v == vshard.storage.internal[k] then
+		if v == vshard.storage.internal[k] and
+		   k ~= 'collect_lua_garbage_f' then
 			return k
 		end
 	end
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..f3f8ec9 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -25,7 +25,8 @@ while test_run:grep_log('storage_2_a', 'The cluster is balanced ok') == nil do v
 test_run:cmd("setopt delimiter ';'")
 function check_reloaded()
 	for k, v in pairs(old_internal) do
-		if v == vshard.storage.internal[k] then
+		if v == vshard.storage.internal[k] and
+		   k ~= 'collect_lua_garbage_f' then
 			return k
 		end
 	end
diff --git a/test/unit/config.result b/test/unit/config.result
index e3d3cd9..f304bbb 100644
--- a/test/unit/config.result
+++ b/test/unit/config.result
@@ -370,3 +370,60 @@ cfg.shard_index = 'vbucket'
 lcfg.check(cfg)
 ---
 ...
+--
+-- gh-77: garbage collection options.
+--
+cfg.garbage_collect_interval = 'str'
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = 0
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = -1
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = 100.5
+---
+...
+lcfg.check(cfg)
+---
+...
+cfg.garbage_lua_collect_interval = 'str'
+---
+...
+check(cfg)
+---
+- Garbage Lua collect interval must be positive number
+...
+cfg.garbage_lua_collect_interval = 0
+---
+...
+check(cfg)
+---
+- Garbage Lua collect interval must be positive number
+...
+cfg.garbage_lua_collect_interval = -1
+---
+...
+check(cfg)
+---
+- Garbage Lua collect interval must be positive number
+...
+cfg.garbage_lua_collect_interval = 100.5
+---
+...
+lcfg.check(cfg)
+---
+...
diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua
index add16bd..9f02dc4 100644
--- a/test/unit/config.test.lua
+++ b/test/unit/config.test.lua
@@ -147,3 +147,24 @@ cfg.shard_index = ''
 check(cfg)
 cfg.shard_index = 'vbucket'
 lcfg.check(cfg)
+
+--
+-- gh-77: garbage collection options.
+--
+cfg.garbage_collect_interval = 'str'
+check(cfg)
+cfg.garbage_collect_interval = 0
+check(cfg)
+cfg.garbage_collect_interval = -1
+check(cfg)
+cfg.garbage_collect_interval = 100.5
+lcfg.check(cfg)
+
+cfg.garbage_lua_collect_interval = 'str'
+check(cfg)
+cfg.garbage_lua_collect_interval = 0
+check(cfg)
+cfg.garbage_lua_collect_interval = -1
+check(cfg)
+cfg.garbage_lua_collect_interval = 100.5
+lcfg.check(cfg)
diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 14e5721..83d7b9f 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -27,6 +27,9 @@ test_run:cmd("setopt delimiter ''");
 vshard.storage.internal.shard_index = 'bucket_id'
 ---
 ...
+vshard.storage.internal.garbage_collect_interval = vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+---
+...
 --
 -- Find nothing if no bucket_id anywhere, or there is no index
 -- by it, or bucket_id is not unsigned.
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index 3db44f8..0a5aca9 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -13,6 +13,7 @@ end;
 test_run:cmd("setopt delimiter ''");
 
 vshard.storage.internal.shard_index = 'bucket_id'
+vshard.storage.internal.garbage_collect_interval = vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
 
 --
 -- Find nothing if no bucket_id anywhere, or there is no index
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 8d9ccbb..0828fa6 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -124,6 +124,18 @@ local function cfg_check(shard_cfg)
                   'positive integer')
         end
     end
+    if shard_cfg.garbage_collect_interval ~= nil then
+        local t = shard_cfg.garbage_collect_interval
+        if type(t) ~= 'number' or t <= 0 then
+            error('Garbage collect interval must be positive number')
+        end
+    end
+    if shard_cfg.garbage_lua_collect_interval ~= nil then
+        local t = shard_cfg.garbage_lua_collect_interval
+        if type(t) ~= 'number' or t <= 0 then
+            error('Garbage Lua collect interval must be positive number')
+        end
+    end
     local uuids = {}
     local uris = {}
     for replicaset_uuid, replicaset in pairs(shard_cfg.sharding) do
@@ -156,6 +168,8 @@ local function prepare_for_box_cfg(cfg)
     cfg.rebalancer_disbalance_threshold = nil
     cfg.rebalancer_max_receiving = nil
     cfg.shard_index = nil
+    cfg.garbage_collect_interval = nil
+    cfg.garbage_lua_collect_interval = nil
 end
 
 return {
diff --git a/vshard/consts.lua b/vshard/consts.lua
index bb215c3..9edbfad 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -32,7 +32,7 @@ return {
     FAILOVER_DOWN_TIMEOUT = 1;
     SYNC_TIMEOUT = 1;
     RECONNECT_TIMEOUT = 0.5;
-    GARBAGE_COLLECT_INTERVAL = 0.5;
+    DEFAULT_GARBAGE_COLLECT_INTERVAL = 0.5;
     RECOVERY_INTERVAL = 5;
     DISCOVERY_INTERVAL = 10;
 }
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index f7e84a1..0053c77 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -24,6 +24,10 @@ if not M then
         discovery_fiber = nil,
         -- Bucket count stored on all replicasets.
         total_bucket_count = 0,
+        -- Fiber to remove lua garbage.
+        garbage_lua_collect_fiber = nil,
+        -- Do lua garbage collection once per this time.
+        garbage_lua_collect_interval = nil,
         -- This counter is used to restart background fibers with
         -- new reloaded code.
         module_version = 0,
@@ -485,6 +489,7 @@ local function router_cfg(cfg)
     -- TODO: update existing route map in-place
     M.route_map = {}
     M.total_bucket_count = cfg.bucket_count or consts.DEFAULT_BUCKET_COUNT
+    M.garbage_lua_collect_interval = cfg.garbage_lua_collect_interval
     lcfg.prepare_for_box_cfg(cfg)
     -- Force net.box connection on cfg()
     for _, replicaset in pairs(new_replicasets) do
@@ -507,6 +512,19 @@ local function router_cfg(cfg)
         log.info('Start discovery fiber')
         lfiber.create(util.reloadable_fiber_f, M, 'discovery_f', 'Discovery')
     end
+    -- Start Lua GC once; stop it only when the option is removed.
+    if M.garbage_lua_collect_interval and
+       (not M.garbage_lua_collect_fiber or
+        M.garbage_lua_collect_fiber:status() == 'dead') then
+        M.garbage_lua_collect_fiber =
+            lfiber.create(util.reloadable_fiber_f, M, 'collect_lua_garbage_f',
+                          'Lua garbage collector', M)
+        log.info('Start Lua GC')
+    elseif M.garbage_lua_collect_fiber and not M.garbage_lua_collect_interval then
+        M.garbage_lua_collect_fiber:cancel()
+        M.garbage_lua_collect_fiber = nil
+        log.info('Stop Lua GC')
+    end
     if old_replicasets then
         lreplicaset.destroy(old_replicasets)
     end
@@ -775,6 +793,7 @@ end
 --
 M.discovery_f = discovery_f
 M.failover_f = failover_f
+M.collect_lua_garbage_f = util.collect_lua_garbage_f
 
 if not rawget(_G, '__module_vshard_router') then
     rawset(_G, '__module_vshard_router', M)
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index cb7efef..264aa95 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -22,6 +22,8 @@ if not M then
         replicasets = nil,
         -- Fiber to remove garbage buckets data.
         garbage_collect_fiber = nil,
+        -- Fiber to remove lua garbage.
+        garbage_lua_collect_fiber = nil,
         -- Bucket identifiers which are not active and are not being
         -- sent - their status is unknown. Their state must be checked
         -- periodically in recovery fiber.
@@ -54,6 +56,10 @@ if not M then
         -- Maximal bucket count that can be received by a single
         -- replicaset simultaneously.
         rebalancer_max_receiving = 0,
+        -- Do buckets garbage collection once per this time.
+        garbage_collect_interval = nil,
+        -- Do lua garbage collection once per this time.
+        garbage_lua_collect_interval = nil,
         errinj = {
             ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
             ERRINJ_RELOAD = false,
@@ -793,7 +799,7 @@ end
 
 --
 -- Garbage collector. Works on masters. The garbage collector
--- wakes up once per GARBAGE_COLLECT_INTERVAL seconds.
+-- wakes up once per specified time.
 -- After wakeup it checks follows the plan:
 -- 1) Check if _bucket has changed. If not, then sleep again;
 -- 2) Scan user spaces for not existing, sent and garbage buckets,
@@ -847,7 +853,7 @@ function collect_garbage_f(module_version)
             end
             if not status then
                 log.error('Error during garbage collection step: %s', err)
-                lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
+                lfiber.sleep(M.garbage_collect_interval)
                 goto continue
             end
             status, empty_sent_buckets = pcall(collect_garbage_update_bucket)
@@ -858,7 +864,7 @@ function collect_garbage_f(module_version)
                 log.error('Error during empty buckets processing: %s',
                           empty_sent_buckets)
                 control.bucket_generation = control.bucket_generation + 1
-                lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
+                lfiber.sleep(M.garbage_collect_interval)
                 goto continue
             end
         end
@@ -878,7 +884,7 @@ function collect_garbage_f(module_version)
                 buckets_for_redirect_ts = lfiber.time()
             end
         end
-        lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
+        lfiber.sleep(M.garbage_collect_interval)
 ::continue::
     end
 end
@@ -1323,6 +1329,9 @@ local function storage_cfg(cfg, this_replica_uuid)
     M.rebalancer_max_receiving = cfg.rebalancer_max_receiving or
                                     consts.DEFAULT_REBALANCER_MAX_RECEIVING
     M.shard_index = cfg.shard_index or 'bucket_id'
+    M.garbage_collect_interval = cfg.garbage_collect_interval or
+                                 consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+    M.garbage_lua_collect_interval = cfg.garbage_lua_collect_interval
     lcfg.prepare_for_box_cfg(cfg)
 
     box.cfg(cfg)
@@ -1356,6 +1365,30 @@ local function storage_cfg(cfg, this_replica_uuid)
         M.rebalancer_fiber:cancel()
         M.rebalancer_fiber = nil
     end
+
+    -- Enable periodic Lua garbage collection. It is not done in
+    -- master enable trigger, because this option is dynamic. The
+    -- Lua garbage collector can be turned on or off with no
+    -- switching master role.
+    if is_master and M.garbage_lua_collect_interval and
+       (not M.garbage_lua_collect_fiber or
+        M.garbage_lua_collect_fiber:status() == 'dead') then
+
+        M.garbage_lua_collect_fiber =
+            lfiber.create(util.reloadable_fiber_f, M, 'collect_lua_garbage_f',
+                          'Lua garbage collector', M)
+        log.info('Start Lua GC')
+    end
+    -- Cancel periodic Lua garbage collection if the option was
+    -- removed from the config, or if the master role is gone.
+    if M.garbage_lua_collect_fiber and
+       (not M.garbage_lua_collect_interval or
+        (was_master and not is_master)) then
+
+        M.garbage_lua_collect_fiber:cancel()
+        M.garbage_lua_collect_fiber = nil
+        log.info('Stop Lua GC')
+    end
     if old_replicasets then
         lreplicaset.destroy(old_replicasets)
     end
@@ -1568,6 +1601,7 @@ M.find_sharded_spaces = find_sharded_spaces
 M.find_garbage_bucket = find_garbage_bucket
 M.collect_garbage_step = collect_garbage_step
 M.collect_garbage_f = collect_garbage_f
+M.collect_lua_garbage_f = util.collect_lua_garbage_f
 M.rebalancer_build_routes = rebalancer_build_routes
 M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
 
diff --git a/vshard/util.lua b/vshard/util.lua
index f2feec1..4d95b5b 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -1,5 +1,6 @@
 -- vshard.util
-log = require('log')
+local log = require('log')
+local fiber = require('fiber')
 
 --
 -- Extract parts of a tuple.
@@ -17,6 +18,19 @@ local function tuple_extract_key(tuple, parts)
     return key
 end
 
+--
+-- Background function for periodic Lua garbage collection. Both
+-- router and storage have several infinite background fibers which
+-- generate garbage, and this function periodically cleans it up.
+--
+local function collect_lua_garbage_f(module_version, M)
+    fiber.name('vshard.lua_gc')
+    while module_version == M.module_version do
+        collectgarbage()
+        fiber.sleep(M.garbage_lua_collect_interval)
+    end
+end
+
 --
 -- Wrapper to run @a func in infinite loop and restart it on the
 -- module reload. This function CAN NOT BE AUTORELOADED. To update
@@ -34,13 +48,15 @@ end
 --        "Rebalancer".
 -- @param M Module which can reload.
 --
-local function reloadable_fiber_f(M, func_name, worker_name)
+local function reloadable_fiber_f(M, func_name, worker_name, ...)
     while true do
-        local ok, err = pcall(M[func_name], M.module_version)
+        local ok, err = pcall(M[func_name], M.module_version, ...)
         if not ok then
             log.error('%s has been failed: %s', worker_name, err)
+            fiber.yield()
         else
             log.info('%s has been reloaded', worker_name)
+            fiber.yield()
         end
     end
 end
@@ -48,4 +64,5 @@ end
 return {
     tuple_extract_key = tuple_extract_key,
     reloadable_fiber_f = reloadable_fiber_f,
+    collect_lua_garbage_f = collect_lua_garbage_f,
 }
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list