[patches] [PATCH vshard 3/4] Allow auto lua GC and customize bucket garbage collector
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Mon Feb 26 13:08:55 MSK 2018
When a vshard instance runs for a long time, it accumulates
a lot of Lua garbage produced by background fibers such as bucket
GC, discovery, recovery, and rebalancer.
Allow enabling automatic periodic Lua garbage collection in the
configuration (collect_lua_garbage) for users who do not want to
do it themselves. Also allow customizing the bucket garbage
collector wakeup interval (garbage_collect_interval).
Closes #77
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
test/router/garbage_collector.result | 98 +++++++++++++++++++++++++++++++++
test/router/garbage_collector.test.lua | 33 +++++++++++
test/storage/garbage_collector.result | 54 ++++++++++++++++++
test/storage/garbage_collector.test.lua | 22 ++++++++
test/unit/config.result | 49 +++++++++++++++++
test/unit/config.test.lua | 19 +++++++
test/unit/garbage.result | 3 +
test/unit/garbage.test.lua | 1 +
vshard/cfg.lua | 12 ++++
vshard/consts.lua | 3 +-
vshard/router/init.lua | 12 ++++
vshard/storage/init.lua | 37 +++++++++++--
12 files changed, 336 insertions(+), 7 deletions(-)
create mode 100644 test/router/garbage_collector.result
create mode 100644 test/router/garbage_collector.test.lua
diff --git a/test/router/garbage_collector.result b/test/router/garbage_collector.result
new file mode 100644
index 0000000..3c2a4f1
--- /dev/null
+++ b/test/router/garbage_collector.result
@@ -0,0 +1,98 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'router')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'router')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+---
+- true
+...
+test_run:cmd("start server router_1")
+---
+- true
+...
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('router_1')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+cfg.collect_lua_garbage = true
+---
+...
+iters = vshard.consts.COLLECT_LUA_GARBAGE_INTERVAL / vshard.consts.DISCOVERY_INTERVAL
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+a = setmetatable({}, {__mode = 'v'})
+---
+...
+a.k = {b = 100}
+---
+...
+for i = 1, iters + 1 do vshard.router.discovery_wakeup() fiber.sleep(0.01) end
+---
+...
+a.k
+---
+- null
+...
+cfg.collect_lua_garbage = false
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+a.k = {b = 100}
+---
+...
+for i = 1, iters + 1 do vshard.router.discovery_wakeup() fiber.sleep(0.01) end
+---
+...
+a.k ~= nil
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:cmd("stop server router_1")
+---
+- true
+...
+test_run:cmd("cleanup server router_1")
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
diff --git a/test/router/garbage_collector.test.lua b/test/router/garbage_collector.test.lua
new file mode 100644
index 0000000..b3411cd
--- /dev/null
+++ b/test/router/garbage_collector.test.lua
@@ -0,0 +1,33 @@
+test_run = require('test_run').new()
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+test_run:create_cluster(REPLICASET_1, 'router')
+test_run:create_cluster(REPLICASET_2, 'router')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+test_run:cmd("start server router_1")
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('router_1')
+fiber = require('fiber')
+cfg.collect_lua_garbage = true
+iters = vshard.consts.COLLECT_LUA_GARBAGE_INTERVAL / vshard.consts.DISCOVERY_INTERVAL
+vshard.router.cfg(cfg)
+a = setmetatable({}, {__mode = 'v'})
+a.k = {b = 100}
+for i = 1, iters + 1 do vshard.router.discovery_wakeup() fiber.sleep(0.01) end
+a.k
+cfg.collect_lua_garbage = false
+vshard.router.cfg(cfg)
+a.k = {b = 100}
+for i = 1, iters + 1 do vshard.router.discovery_wakeup() fiber.sleep(0.01) end
+a.k ~= nil
+
+test_run:switch("default")
+test_run:cmd("stop server router_1")
+test_run:cmd("cleanup server router_1")
+test_run:drop_cluster(REPLICASET_1)
+test_run:drop_cluster(REPLICASET_2)
diff --git a/test/storage/garbage_collector.result b/test/storage/garbage_collector.result
index 5d5558c..81c4ac1 100644
--- a/test/storage/garbage_collector.result
+++ b/test/storage/garbage_collector.result
@@ -110,6 +110,60 @@ customer:select{}
---
- - [10, 1, 'user1']
...
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('storage_1_a')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+log = require('log')
+---
+...
+cfg.collect_lua_garbage = true
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+-- Create a weak reference to a able {b = 100} - it must be
+-- deleted on the next GC.
+a = setmetatable({}, {__mode = 'v'})
+---
+...
+a.k = {b = 100}
+---
+...
+iters = vshard.consts.COLLECT_LUA_GARBAGE_INTERVAL / vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+---
+...
+-- Wait until Lua GC deletes a.k.
+for i = 1, iters + 1 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end
+---
+...
+a.k
+---
+- null
+...
+cfg.collect_lua_garbage = false
+---
+...
+vshard.storage.cfg(cfg, names.storage_1_a)
+---
+...
+a.k = {b = 100}
+---
+...
+for i = 1, iters + 1 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end
+---
+...
+a.k ~= nil
+---
+- true
+...
test_run:switch('default')
---
- true
diff --git a/test/storage/garbage_collector.test.lua b/test/storage/garbage_collector.test.lua
index 1795296..4b46f79 100644
--- a/test/storage/garbage_collector.test.lua
+++ b/test/storage/garbage_collector.test.lua
@@ -41,6 +41,28 @@ test_run:switch('storage_1_b')
while box.space._bucket:get{3} ~= nil do fiber.sleep(0.1) end
customer:select{}
+--
+-- gh-77: garbage collection options and Lua garbage collection.
+--
+test_run:switch('storage_1_a')
+fiber = require('fiber')
+log = require('log')
+cfg.collect_lua_garbage = true
+vshard.storage.cfg(cfg, names.storage_1_a)
+-- Create a weak reference to a able {b = 100} - it must be
+-- deleted on the next GC.
+a = setmetatable({}, {__mode = 'v'})
+a.k = {b = 100}
+iters = vshard.consts.COLLECT_LUA_GARBAGE_INTERVAL / vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+-- Wait until Lua GC deletes a.k.
+for i = 1, iters + 1 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end
+a.k
+cfg.collect_lua_garbage = false
+vshard.storage.cfg(cfg, names.storage_1_a)
+a.k = {b = 100}
+for i = 1, iters + 1 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end
+a.k ~= nil
+
test_run:switch('default')
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
diff --git a/test/unit/config.result b/test/unit/config.result
index e3d3cd9..33b1094 100644
--- a/test/unit/config.result
+++ b/test/unit/config.result
@@ -370,3 +370,52 @@ cfg.shard_index = 'vbucket'
lcfg.check(cfg)
---
...
+--
+-- gh-77: garbage collection options.
+--
+cfg.garbage_collect_interval = 'str'
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = 0
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = -1
+---
+...
+check(cfg)
+---
+- Garbage collect interval must be positive number
+...
+cfg.garbage_collect_interval = 100.5
+---
+...
+lcfg.check(cfg)
+---
+...
+cfg.collect_lua_garbage = 100
+---
+...
+check(cfg)
+---
+- Collect Lua garbage must be either true or false
+...
+cfg.collect_lua_garbage = true
+---
+...
+lcfg.check(cfg)
+---
+...
+cfg.collect_lua_garbage = false
+---
+...
+lcfg.check(cfg)
+---
+...
diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua
index add16bd..333d165 100644
--- a/test/unit/config.test.lua
+++ b/test/unit/config.test.lua
@@ -147,3 +147,22 @@ cfg.shard_index = ''
check(cfg)
cfg.shard_index = 'vbucket'
lcfg.check(cfg)
+
+--
+-- gh-77: garbage collection options.
+--
+cfg.garbage_collect_interval = 'str'
+check(cfg)
+cfg.garbage_collect_interval = 0
+check(cfg)
+cfg.garbage_collect_interval = -1
+check(cfg)
+cfg.garbage_collect_interval = 100.5
+lcfg.check(cfg)
+
+cfg.collect_lua_garbage = 100
+check(cfg)
+cfg.collect_lua_garbage = true
+lcfg.check(cfg)
+cfg.collect_lua_garbage = false
+lcfg.check(cfg)
diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 14e5721..83d7b9f 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -27,6 +27,9 @@ test_run:cmd("setopt delimiter ''");
vshard.storage.internal.shard_index = 'bucket_id'
---
...
+vshard.storage.internal.garbage_collect_interval = vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+---
+...
--
-- Find nothing if no bucket_id anywhere, or there is no index
-- by it, or bucket_id is not unsigned.
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index 3db44f8..0a5aca9 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -13,6 +13,7 @@ end;
test_run:cmd("setopt delimiter ''");
vshard.storage.internal.shard_index = 'bucket_id'
+vshard.storage.internal.garbage_collect_interval = vshard.consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
--
-- Find nothing if no bucket_id anywhere, or there is no index
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 8d9ccbb..7df68fc 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -124,6 +124,16 @@ local function cfg_check(shard_cfg)
'positive integer')
end
end
+    if shard_cfg.garbage_collect_interval ~= nil then
+        local t = shard_cfg.garbage_collect_interval
+        if type(t) ~= 'number' or not (t > 0) then -- Rejects NaN too.
+            error('Garbage collect interval must be positive number')
+        end
+    end
+ if shard_cfg.collect_lua_garbage ~= nil and
+ type(shard_cfg.collect_lua_garbage) ~= 'boolean' then
+ error('Collect Lua garbage must be either true or false')
+ end
local uuids = {}
local uris = {}
for replicaset_uuid, replicaset in pairs(shard_cfg.sharding) do
@@ -156,6 +166,8 @@ local function prepare_for_box_cfg(cfg)
cfg.rebalancer_disbalance_threshold = nil
cfg.rebalancer_max_receiving = nil
cfg.shard_index = nil
+ cfg.garbage_collect_interval = nil
+ cfg.collect_lua_garbage = nil
end
return {
diff --git a/vshard/consts.lua b/vshard/consts.lua
index bb215c3..ff6f6b0 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -32,7 +32,8 @@ return {
FAILOVER_DOWN_TIMEOUT = 1;
SYNC_TIMEOUT = 1;
RECONNECT_TIMEOUT = 0.5;
- GARBAGE_COLLECT_INTERVAL = 0.5;
+ DEFAULT_GARBAGE_COLLECT_INTERVAL = 0.5;
RECOVERY_INTERVAL = 5;
+ COLLECT_LUA_GARBAGE_INTERVAL = 100;
DISCOVERY_INTERVAL = 10;
}
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index f7e84a1..abad12c 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -24,6 +24,9 @@ if not M then
discovery_fiber = nil,
-- Bucket count stored on all replicasets.
total_bucket_count = 0,
+ -- If true, then discovery fiber starts to call
+ -- collectgarbage() periodically.
+ collect_lua_garbage = nil,
-- This counter is used to restart background fibers with
-- new reloaded code.
module_version = 0,
@@ -123,6 +126,8 @@ end
local function discovery_f(module_version)
lfiber.name('discovery_fiber')
M.discovery_fiber = lfiber.self()
+ local iterations_until_lua_gc =
+ consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
while module_version == M.module_version do
for _, replicaset in pairs(M.replicasets) do
local active_buckets, err =
@@ -141,6 +146,12 @@ local function discovery_f(module_version)
M.route_map[bucket_id] = replicaset
end
end
+        iterations_until_lua_gc = iterations_until_lua_gc - 1
+        if iterations_until_lua_gc <= 0 then -- Reset even if Lua GC is off.
+            iterations_until_lua_gc =
+                consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
+            if M.collect_lua_garbage then collectgarbage() end
+        end
lfiber.sleep(consts.DISCOVERY_INTERVAL)
end
end
@@ -485,6 +496,7 @@ local function router_cfg(cfg)
-- TODO: update existing route map in-place
M.route_map = {}
M.total_bucket_count = cfg.bucket_count or consts.DEFAULT_BUCKET_COUNT
+ M.collect_lua_garbage = cfg.collect_lua_garbage
lcfg.prepare_for_box_cfg(cfg)
-- Force net.box connection on cfg()
for _, replicaset in pairs(new_replicasets) do
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index cb7efef..499523a 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -48,12 +48,17 @@ if not M then
shard_index = nil,
-- Bucket count stored on all replicasets.
total_bucket_count = 0,
+ -- If true, then bucket garbage collection fiber starts to
+ -- call collectgarbage() periodically.
+ collect_lua_garbage = nil,
-- Maximal allowed percent deviation of bucket count on a
-- replicaset from ethalon bucket count.
rebalancer_disbalance_threshold = 0,
-- Maximal bucket count that can be received by a single
-- replicaset simultaneously.
rebalancer_max_receiving = 0,
+ -- Do buckets garbage collection once per this time.
+ garbage_collect_interval = nil,
errinj = {
ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
ERRINJ_RELOAD = false,
@@ -793,7 +798,7 @@ end
--
-- Garbage collector. Works on masters. The garbage collector
--- wakes up once per GARBAGE_COLLECT_INTERVAL seconds.
+-- wakes up once per specified time.
-- After wakeup it checks follows the plan:
-- 1) Check if _bucket has changed. If not, then sleep again;
-- 2) Scan user spaces for not existing, sent and garbage buckets,
@@ -837,6 +842,8 @@ function collect_garbage_f(module_version)
-- buckets_for_redirect is deleted, it gets empty_sent_buckets
-- for next deletion.
local empty_sent_buckets = {}
+ local iterations_until_lua_gc =
+ consts.COLLECT_LUA_GARBAGE_INTERVAL / M.garbage_collect_interval
while M.module_version == module_version do
-- Check if no changes in buckets configuration.
@@ -847,7 +854,6 @@ function collect_garbage_f(module_version)
end
if not status then
log.error('Error during garbage collection step: %s', err)
- lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
goto continue
end
status, empty_sent_buckets = pcall(collect_garbage_update_bucket)
@@ -858,12 +864,11 @@ function collect_garbage_f(module_version)
log.error('Error during empty buckets processing: %s',
empty_sent_buckets)
control.bucket_generation = control.bucket_generation + 1
- lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
goto continue
end
end
- local duration = lfiber.time() - buckets_for_redirect_ts
- if duration >= consts.BUCKET_SENT_GARBAGE_DELAY then
+ if lfiber.time() - buckets_for_redirect_ts >=
+ consts.BUCKET_SENT_GARBAGE_DELAY then
local status, err = pcall(collect_garbage_drop_redirects,
buckets_for_redirect)
if M.module_version ~= module_version then
@@ -878,8 +883,23 @@ function collect_garbage_f(module_version)
buckets_for_redirect_ts = lfiber.time()
end
end
- lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
::continue::
+        iterations_until_lua_gc = iterations_until_lua_gc - 1
+        if iterations_until_lua_gc <= 0 then -- '<=': ratio may be fractional.
+            iterations_until_lua_gc =
+                consts.COLLECT_LUA_GARBAGE_INTERVAL / M.garbage_collect_interval
+            if M.collect_lua_garbage then collectgarbage() end
+        end
+ lfiber.sleep(M.garbage_collect_interval)
+ end
+end
+
+--
+-- Wake up the bucket garbage collector fiber immediately, if it has been created.
+--
+local function garbage_collector_wakeup()
+    if M.garbage_collect_fiber then
+        M.garbage_collect_fiber:wakeup()
end
end
@@ -1323,6 +1343,9 @@ local function storage_cfg(cfg, this_replica_uuid)
M.rebalancer_max_receiving = cfg.rebalancer_max_receiving or
consts.DEFAULT_REBALANCER_MAX_RECEIVING
M.shard_index = cfg.shard_index or 'bucket_id'
+ M.garbage_collect_interval = cfg.garbage_collect_interval or
+ consts.DEFAULT_GARBAGE_COLLECT_INTERVAL
+ M.collect_lua_garbage = cfg.collect_lua_garbage
lcfg.prepare_for_box_cfg(cfg)
box.cfg(cfg)
@@ -1356,6 +1379,7 @@ local function storage_cfg(cfg, this_replica_uuid)
M.rebalancer_fiber:cancel()
M.rebalancer_fiber = nil
end
+
if old_replicasets then
lreplicaset.destroy(old_replicasets)
end
@@ -1586,6 +1610,7 @@ return {
bucket_send = bucket_send;
bucket_stat = bucket_stat;
bucket_delete_garbage = bucket_delete_garbage;
+ garbage_collector_wakeup = garbage_collector_wakeup;
rebalancer_wakeup = rebalancer_wakeup;
rebalancer_apply_routes = rebalancer_apply_routes;
rebalancer_disable = rebalancer_disable;
--
2.14.3 (Apple Git-98)
More information about the Tarantool-patches
mailing list