[patches] [PATCH vshard 3/4] Allow auto lua GC and customize bucket garbage collector

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Mon Feb 26 13:08:55 MSK 2018


When a vshard instance runs for a long time, it accumulates
a lot of Lua garbage produced by background fibers such as bucket
GC, discovery, recovery and rebalancer.

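Add a collect_lua_garbage configuration option that enables
automatic periodic Lua garbage collection for users who do not want
to do it themselves. Also make the bucket garbage collector interval
configurable via the garbage_collect_interval option.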

Closes #77

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 test/router/garbage_collector.result    | 98 +++++++++++++++++++++++++++++++++
 test/router/garbage_collector.test.lua  | 33 +++++++++++
 test/storage/garbage_collector.result   | 54 ++++++++++++++++++
 test/storage/garbage_collector.test.lua | 22 ++++++++
 test/unit/config.result                 | 49 +++++++++++++++++
 test/unit/config.test.lua               | 19 +++++++
 test/unit/garbage.result                |  3 +
 test/unit/garbage.test.lua              |  1 +
 vshard/cfg.lua                          | 12 ++++
 vshard/consts.lua                       |  3 +-
 vshard/router/init.lua                  | 12 ++++
 vshard/storage/init.lua                 | 37 +++++++++++--
 12 files changed, 336 insertions(+), 7 deletions(-)
 create mode 100644 test/router/garbage_collector.result
 create mode 100644 test/router/garbage_collector.test.lua

diff --git a/test/router/garbage_collector.result b/test/router/garbage_collector.result
new file mode 100644
index 0000000..3c2a4f1
--- /dev/null
+++ b/test/router/garbage_collector.result
@@ -0,0 +1,98 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'router')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'router')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+---
+- true
+...
+test_run:cmd("start server router_1")
+---
+- true
+...
+--
+-- gh-77: collect_lua_garbage makes the router discovery fiber run Lua GC.
+--
+test_run:switch('router_1')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+cfg.collect_lua_garbage = true
+---
+...
+iters = vshard.consts.COLLECT_LUA_GARBAGE_INTERVAL / vshard.consts.DISCOVERY_INTERVAL
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+a = setmetatable({}, {__mode = 'v'})
+---
+...
+a.k = {b = 100}
+---
+...
+for i = 1, iters + 1 do vshard.router.discovery_wakeup() fiber.sleep(0.01) end
+---
+...
+a.k
+---
+- null
+...
+cfg.collect_lua_garbage = false
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+a.k = {b = 100}
+---
+...
+for i = 1, iters + 1 do vshard.router.discovery_wakeup() fiber.sleep(0.01) end
+---
+...
+a.k ~= nil
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:cmd("stop server router_1")
+---
+- true
+...
+test_run:cmd("cleanup server router_1")
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
diff --git a/test/router/garbage_collector.test.lua b/test/router/garbage_collector.test.lua
new file mode 100644
index 0000000..b3411cd
--- /dev/null
+++ b/test/router/garbage_collector.test.lua
@@ -0,0 +1,33 @@
+test_run = require('test_run').new()
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+test_run:create_cluster(REPLICASET_1, 'router')
+test_run:create_cluster(REPLICASET_2, 'router')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+test_run:cmd("start server router_1")
+--
+-- gh-77: collect_lua_garbage makes the router discovery fiber run Lua GC.
+--
+test_run:switch('router_1')
+fiber = require('fiber')
+cfg.collect_lua_garbage = true
+iters = vshard.consts.COLLECT_LUA_GARBAGE_INTERVAL / vshard.consts.DISCOVERY_INTERVAL
+vshard.router.cfg(cfg)
+a = setmetatable({}, {__mode = 'v'})
+a.k = {b = 100}
+for i = 1, iters + 1 do vshard.router.discovery_wakeup() fiber.sleep(0.01) end
+a.k
+cfg.collect_lua_garbage = false
+vshard.router.cfg(cfg)
+a.k = {b = 100}
+for i = 1, iters + 1 do vshard.router.discovery_wakeup() fiber.sleep(0.01) end
+a.k ~= nil
+
+test_run:switch("default")
+test_run:cmd("stop server router_1")
+test_run:cmd("cleanup server router_1")
+test_run:drop_cluster(REPLICASET_1)
+test_run:drop_cluster(REPLICASET_2)
diff --git a/test/storage/garbage_collector.result b/test/storage/garbage_collector.result
index 5d5558c..81c4ac1 100644
--- a/test/storage/garbage_collector.result
+++ b/test/storage/garbage_collector.result
@@ -110,6 +110,60 @@ customer:select{}
 ---
 - - [10, 1, 'user1']
 ...
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('storage_1_a')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+log = require('log')
+---
+...
+cfg.collect_lua_garbage = true
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+-- Create a weak reference to a table {b = 100} - it must be
+-- deleted on the next Lua GC run.
+a = setmetatable({}, {__mode = 'v'})
+---
+...
+a.k = {b = 100}
+---
+...
+iters = vshard.consts.COLLECT_LUA_GARBAGE_INTERVAL / vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+---
+...
+-- Wait until Lua GC deletes a.k.
+for i = 1, iters + 1 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end
+---
+...
+a.k
+---
+- null
+...
+cfg.collect_lua_garbage = false
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+a.k = {b = 100}
+---
+...
+for i = 1, iters + 1 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end
+---
+...
+a.k ~= nil
+---
+- true
+...
 test_run:switch('default')
 ---
 - true
diff --git a/test/storage/garbage_collector.test.lua b/test/storage/garbage_collector.test.lua
index 1795296..4b46f79 100644
--- a/test/storage/garbage_collector.test.lua
+++ b/test/storage/garbage_collector.test.lua
@@ -41,6 +41,28 @@ test_run:switch('storage_1_b')
 while box.space._bucket:get{3} ~= nil do fiber.sleep(0.1) end
 customer:select{}

+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('storage_1_a')
+fiber = require('fiber')
+log = require('log')
+cfg.collect_lua_garbage = true
+vshard.storage.cfg(cfg, names.storage_1_a)
+-- Create a weak reference to a table {b = 100} - it must be
+-- deleted on the next Lua GC run.
+a = setmetatable({}, {__mode = 'v'})
+a.k = {b = 100}
+iters = vshard.consts.COLLECT_LUA_GARBAGE_INTERVAL / vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+-- Wait until Lua GC deletes a.k.
+for i = 1, iters + 1 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end
+a.k
+cfg.collect_lua_garbage = false
+vshard.storage.cfg(cfg, names.storage_1_a)
+a.k = {b = 100}
+for i = 1, iters + 1 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end
+a.k ~= nil
+
 test_run:switch('default')
 test_run:drop_cluster(REPLICASET_2)
 test_run:drop_cluster(REPLICASET_1)
diff --git a/test/unit/config.result b/test/unit/config.result
index e3d3cd9..33b1094 100644
--- a/test/unit/config.result
+++ b/test/unit/config.result
@@ -370,3 +370,52 @@ cfg.shard_index = 'vbucket'
 lcfg.check(cfg)
 ---
 ...
+--
+-- gh-77: validation of garbage collection options.
+--
+cfg.garbage_collect_interval = 'str'
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = 0
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = -1
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = 100.5
+---
+...
+lcfg.check(cfg)
+---
+...
+cfg.collect_lua_garbage = 100
+---
+...
+check(cfg)
+---
+- Collect Lua garbage must be either true or false
+...
+cfg.collect_lua_garbage = true
+---
+...
+lcfg.check(cfg)
+---
+...
+cfg.collect_lua_garbage = false
+---
+...
+lcfg.check(cfg)
+---
+...
diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua
index add16bd..333d165 100644
--- a/test/unit/config.test.lua
+++ b/test/unit/config.test.lua
@@ -147,3 +147,22 @@ cfg.shard_index = ''
 check(cfg)
 cfg.shard_index = 'vbucket'
 lcfg.check(cfg)
+
+--
+-- gh-77: validation of garbage collection options.
+--
+cfg.garbage_collect_interval = 'str'
+check(cfg)
+cfg.garbage_collect_interval = 0
+check(cfg)
+cfg.garbage_collect_interval = -1
+check(cfg)
+cfg.garbage_collect_interval = 100.5
+lcfg.check(cfg)
+
+cfg.collect_lua_garbage = 100
+check(cfg)
+cfg.collect_lua_garbage = true
+lcfg.check(cfg)
+cfg.collect_lua_garbage = false
+lcfg.check(cfg)
diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 14e5721..83d7b9f 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -27,6 +27,9 @@ test_run:cmd("setopt delimiter ''");
 vshard.storage.internal.shard_index = 'bucket_id'
 ---
 ...
+vshard.storage.internal.garbage_collect_interval = vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+---
+...
 --
 -- Find nothing if no bucket_id anywhere, or there is no index
 -- by it, or bucket_id is not unsigned.
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index 3db44f8..0a5aca9 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -13,6 +13,7 @@ end;
 test_run:cmd("setopt delimiter ''");

 vshard.storage.internal.shard_index = 'bucket_id'
+vshard.storage.internal.garbage_collect_interval = vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL

 --
 -- Find nothing if no bucket_id anywhere, or there is no index
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 8d9ccbb..7df68fc 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -124,6 +124,16 @@ local function cfg_check(shard_cfg)
                   'positive integer')
         end
     end
+    if shard_cfg.garbage_collect_interval ~= nil then
+        local t = shard_cfg.garbage_collect_interval
+        if type(t) ~= 'number' or not (t > 0) then -- rejects NaN too
+            error('Garbage collect interval must be positive number')
+        end
+    end
+    if shard_cfg.collect_lua_garbage ~= nil and
+       type(shard_cfg.collect_lua_garbage) ~= 'boolean' then
+        error('Collect Lua garbage must be either true or false')
+    end
     local uuids = {}
     local uris = {}
     for replicaset_uuid, replicaset in pairs(shard_cfg.sharding) do
@@ -156,6 +166,8 @@ local function prepare_for_box_cfg(cfg)
     cfg.rebalancer_disbalance_threshold = nil
     cfg.rebalancer_max_receiving = nil
     cfg.shard_index = nil
+    cfg.garbage_collect_interval = nil
+    cfg.collect_lua_garbage = nil
 end

 return {
diff --git a/vshard/consts.lua b/vshard/consts.lua
index bb215c3..ff6f6b0 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -32,7 +32,8 @@ return {
     FAILOVER_DOWN_TIMEOUT = 1;
     SYNC_TIMEOUT = 1;
     RECONNECT_TIMEOUT = 0.5;
-    GARBAGE_COLLECT_INTERVAL = 0.5;
+    DEFAULT_GARBAGE_COLLECT_INTERVAL = 0.5; -- if cfg omits garbage_collect_interval
     RECOVERY_INTERVAL = 5;
+    COLLECT_LUA_GARBAGE_INTERVAL = 100; -- seconds between forced Lua GC runs
     DISCOVERY_INTERVAL = 10;
 }
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index f7e84a1..abad12c 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -24,6 +24,9 @@ if not M then
         discovery_fiber = nil,
         -- Bucket count stored on all replicasets.
         total_bucket_count = 0,
+        -- If true, the discovery fiber calls collectgarbage()
+        -- about once per COLLECT_LUA_GARBAGE_INTERVAL seconds.
+        collect_lua_garbage = nil,
         -- This counter is used to restart background fibers with
         -- new reloaded code.
         module_version = 0,
@@ -123,6 +126,8 @@ end
 local function discovery_f(module_version)
     lfiber.name('discovery_fiber')
     M.discovery_fiber = lfiber.self()
+    local iterations_until_lua_gc =
+        consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
     while module_version == M.module_version do
         for _, replicaset in pairs(M.replicasets) do
             local active_buckets, err =
@@ -141,6 +146,12 @@ local function discovery_f(module_version)
                     M.route_map[bucket_id] = replicaset
                 end
             end
+            iterations_until_lua_gc = iterations_until_lua_gc - 1
+            if M.collect_lua_garbage and iterations_until_lua_gc <= 0 then
+                iterations_until_lua_gc =
+                    consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
+                collectgarbage()
+            end
             lfiber.sleep(consts.DISCOVERY_INTERVAL)
         end
     end
@@ -485,6 +496,7 @@ local function router_cfg(cfg)
     -- TODO: update existing route map in-place
     M.route_map = {}
     M.total_bucket_count = cfg.bucket_count or consts.DEFAULT_BUCKET_COUNT
+    M.collect_lua_garbage = cfg.collect_lua_garbage
     lcfg.prepare_for_box_cfg(cfg)
     -- Force net.box connection on cfg()
     for _, replicaset in pairs(new_replicasets) do
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index cb7efef..499523a 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -48,12 +48,17 @@ if not M then
         shard_index = nil,
         -- Bucket count stored on all replicasets.
         total_bucket_count = 0,
+        -- If true, then bucket garbage collection fiber starts to
+        -- call collectgarbage() periodically.
+        collect_lua_garbage = nil,
         -- Maximal allowed percent deviation of bucket count on a
         -- replicaset from ethalon bucket count.
         rebalancer_disbalance_threshold = 0,
         -- Maximal bucket count that can be received by a single
         -- replicaset simultaneously.
         rebalancer_max_receiving = 0,
+        -- Bucket garbage collector wakeup period, in seconds.
+        garbage_collect_interval = nil,
         errinj = {
             ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
             ERRINJ_RELOAD = false,
@@ -793,7 +798,7 @@ end

 --
 -- Garbage collector. Works on masters. The garbage collector
--- wakes up once per GARBAGE_COLLECT_INTERVAL seconds.
+-- wakes up once per M.garbage_collect_interval seconds.
 -- After wakeup it checks follows the plan:
 -- 1) Check if _bucket has changed. If not, then sleep again;
 -- 2) Scan user spaces for not existing, sent and garbage buckets,
@@ -837,6 +842,8 @@ function collect_garbage_f(module_version)
     -- buckets_for_redirect is deleted, it gets empty_sent_buckets
     -- for next deletion.
     local empty_sent_buckets = {}
+    local iterations_until_lua_gc =
+        consts.COLLECT_LUA_GARBAGE_INTERVAL / M.garbage_collect_interval

     while M.module_version == module_version do
         -- Check if no changes in buckets configuration.
@@ -847,7 +854,6 @@ function collect_garbage_f(module_version)
             end
             if not status then
                 log.error('Error during garbage collection step: %s', err)
-                lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
                 goto continue
             end
             status, empty_sent_buckets = pcall(collect_garbage_update_bucket)
@@ -858,12 +864,11 @@ function collect_garbage_f(module_version)
                 log.error('Error during empty buckets processing: %s',
                           empty_sent_buckets)
                 control.bucket_generation = control.bucket_generation + 1
-                lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
                 goto continue
             end
         end
-        local duration = lfiber.time() - buckets_for_redirect_ts
-        if duration >= consts.BUCKET_SENT_GARBAGE_DELAY then
+        if lfiber.time() - buckets_for_redirect_ts >=
+           consts.BUCKET_SENT_GARBAGE_DELAY then
             local status, err = pcall(collect_garbage_drop_redirects,
                                       buckets_for_redirect)
             if M.module_version ~= module_version then
@@ -878,8 +883,23 @@ function collect_garbage_f(module_version)
                 buckets_for_redirect_ts = lfiber.time()
             end
         end
-        lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
 ::continue::
+        iterations_until_lua_gc = iterations_until_lua_gc - 1
+        if iterations_until_lua_gc <= 0 and M.collect_lua_garbage then
+            iterations_until_lua_gc =
+                consts.COLLECT_LUA_GARBAGE_INTERVAL / M.garbage_collect_interval
+            collectgarbage()
+        end
+        lfiber.sleep(M.garbage_collect_interval)
+    end
+end
+
+--
+-- Wake up the bucket garbage collector fiber, if it exists.
+--
+local function garbage_collector_wakeup()
+    if M.garbage_collect_fiber then
+        M.garbage_collect_fiber:wakeup()
     end
 end

@@ -1323,6 +1343,9 @@ local function storage_cfg(cfg, this_replica_uuid)
     M.rebalancer_max_receiving = cfg.rebalancer_max_receiving or
                                     consts.DEFAULT_REBALANCER_MAX_RECEIVING
     M.shard_index = cfg.shard_index or 'bucket_id'
+    M.garbage_collect_interval = cfg.garbage_collect_interval or
+                                 consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+    M.collect_lua_garbage = cfg.collect_lua_garbage
     lcfg.prepare_for_box_cfg(cfg)

     box.cfg(cfg)
@@ -1356,6 +1379,7 @@ local function storage_cfg(cfg, this_replica_uuid)
         M.rebalancer_fiber:cancel()
         M.rebalancer_fiber = nil
     end
+
     if old_replicasets then
         lreplicaset.destroy(old_replicasets)
     end
@@ -1586,6 +1610,7 @@ return {
     bucket_send = bucket_send;
     bucket_stat = bucket_stat;
     bucket_delete_garbage = bucket_delete_garbage;
+    garbage_collector_wakeup = garbage_collector_wakeup;
     rebalancer_wakeup = rebalancer_wakeup;
     rebalancer_apply_routes = rebalancer_apply_routes;
     rebalancer_disable = rebalancer_disable;
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list