[tarantool-patches] Re: [PATCH 3/3] Introduce destroy module feature
Alex Khatskevich
avkhatskevich at tarantool.org
Tue Jun 26 00:54:46 MSK 2018
>> --- a/test/lua_libs/util.lua
>> +++ b/test/lua_libs/util.lua
>> @@ -69,9 +69,43 @@ local function wait_master(test_run, replicaset, master)
>> log.info('Slaves are connected to a master "%s"', master)
>> end
>> +function vshard_fiber_list()
>> + -- Flush jit traces to prevent them from
>> + -- keeping their upvalues in memory.
>> + jit.flush()
>> + collectgarbage()
>> + -- Give the fibers time to clean themselves up.
>> + fiber.sleep(0.05)
>
> 1. Why do you need this sleep? As far as I know
> collectgarbage just cleans all the garbage in the same
> fiber with no yields or async tasks.
netbox: if `collectgarbage()` has collected a connection, its fiber needs
time to wake up after fiber:cancel(), process the cancellation and exit.
Other modules may show the same behavior.
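A minimal sketch of the scenario (assumes tarantool's `fiber` module; the timings are illustrative, not part of the patch):

```lua
-- Sketch: why collectgarbage() alone is not enough.
-- When the GC collects a net.box connection, its worker fiber is
-- cancelled, but the cancellation is only delivered at the fiber's
-- next yield point. The fiber must be scheduled once more to observe
-- the cancel and exit, hence the short sleep after the GC cycle.
local fiber = require('fiber')

local f = fiber.create(function()
    while true do
        fiber.sleep(0.01) -- yield point where the cancel is delivered
    end
end)

f:cancel()
-- Immediately after cancel() the fiber may still appear in
-- fiber.info(); it disappears only after being scheduled again.
fiber.sleep(0.05)
```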
>>
>> + local add = true
>> + for _, pattern in pairs(non_vshard_patterns) do
>> + if fib.name:match(pattern) then
>> + add = false
>> + break
>> + end
>> + end
>> + if add then
>> + table.insert(names, fib.name)
>> + end
>> + end
>> + table.sort(names)
>
> 2. Sort of an array just does nothing, it is not?
Fibers are returned in an unpredictable order.
If one uses this function just to immediately print the fiber list, he/she
expects the return value to be consistent over time.
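The point can be shown with plain Lua tables (the fiber names here are just sample values mimicking `fiber.info()` output):

```lua
-- fiber.info() is keyed by fiber id, so traversal order is
-- unspecified; two calls may list the same fibers differently.
local run1 = {'vshard.failover', 'discovery_fiber'}
local run2 = {'discovery_fiber', 'vshard.failover'}
table.sort(run1)
table.sort(run2)
-- After sorting, both runs yield an identical, reproducible list.
assert(run1[1] == run2[1] and run1[2] == run2[2])
```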
>
>> diff --git a/test/router/destroy.result b/test/router/destroy.result
>> + sleep(0.05)
>
> 3. Maybe fiber.sleep? Process sleep looks weird here.
>> +---
>> +...
>> diff --git a/test/storage/destroy.result b/test/storage/destroy.result
>> + sleep(0.05)
> 4. Same.
Fixed, thanks
>> + end
>> +end;
>> +---
>> +...
>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 21093e5..da8e49e 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -33,6 +33,27 @@ if not M then
>> }
>> end
>> +--
>> +-- Destroy router module.
>> +--
>> +local function destroy()
>> + local MODULE_INTERNALS = '__module_vshard_router'
>> + assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
>> + local bg_fibers = {
>> + 'failover_fiber',
>> + 'discovery_fiber',
>> + }
>
> 5. Please, just inline this cycle in 2 lines. Anyway on a new
> fiber it is necessary to add a new name to bg_fibers.
done
>
>> + for _, fib_name in pairs(bg_fibers) do
>> + if M[fib_name] then
>> + M[fib_name]:cancel()
>> + M[fib_name] = nil
>> + end
>> + end
>> + vshard.router.internal = nil
>> + rawset(_G, MODULE_INTERNALS, nil)
>> + M = nil
>
> 6. I do not like, that recfg now looks like
>
> destroy
> package.loaded = nil
> cfg
>
> It should not be necessary to nullify the package. Today in
> the public chat a customer asked about
>
> box.cfg{}
> box.stop()
> box.cfg{}
> box.stop()
>
> So there is no place for package.loaded. Most of people
> even do not know about this thing existence.
Reimplemented; waiting for feedback.
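For context, `package.loaded` is Lua's module cache: clearing an entry makes the next `require` re-run the module body, which is why the old recfg sequence needed it. A plain-Lua sketch with a hypothetical `demo_mod`:

```lua
-- package.loaded caches a module's return value; clearing the
-- entry forces require() to execute the module body again.
local count = 0
package.preload['demo_mod'] = function()
    count = count + 1
    return {loads = count}
end

local m1 = require('demo_mod')
local m2 = require('demo_mod')       -- cached, body not re-run
assert(m1 == m2 and count == 1)

package.loaded['demo_mod'] = nil
local m3 = require('demo_mod')       -- body executed again
assert(count == 2 and m3 ~= m1)
```

A destroy()/cfg() pair that works without touching this cache spares users from knowing about it at all.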
>
>> +end
>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>> index 059e705..ac37163 100644
>> --- a/vshard/storage/init.lua
>> +++ b/vshard/storage/init.lua
>> @@ -207,6 +207,43 @@ local function on_master_enable(...)
>> end
>> end
>> +--------------------------------------------------------------------------------
>> +-- Destroy
>> +--------------------------------------------------------------------------------
>>
>> +
>> +--
>> +-- Destroy storage module.
>> +--
>> +local function destroy()
>> + local MODULE_INTERNALS = '__module_vshard_storage'
>> + assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
>> + box.space._bucket:drop()
> 7. When this instance is master for another instance (it will be so
> when we return to master-master topology), this drop will be replicated
> to active instances. Please, forbid destroy when the instance has active
> relays.
This feature is under development...
>> + for _, name in ipairs(storage_api) do
>> + box.schema.func.drop(name)
>> + end
>> + local bg_fibers = {
>> + 'recovery_fiber',
>> + 'collect_bucket_garbage_fiber',
>> + 'rebalancer_applier_fiber',
>> + 'rebalancer_fiber',
>> + }
>> + for _, fib_name in pairs(bg_fibers) do
>> + if M[fib_name] then
>> + M[fib_name]:cancel()
>> + M[fib_name] = nil
>> + end
>> + end
>> + local box_cfg = table.deepcopy(box.cfg)
>> + box_cfg.replication = nil
>> + box_cfg.read_only = nil
>
> 8. Assign nil to table is like removal of the key. So here actually
> you just recall empty box.cfg{}. To turn off the replication you
> should pass empty replication array explicitly. To turn off the
> read_only you should pass read_only = false explicitly.
Thanks
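Point 8 in plain Lua: assigning nil removes the key entirely, so the copied config simply loses the option instead of overriding it (illustrative table standing in for a `table.deepcopy(box.cfg)` result):

```lua
-- A deep copy of box.cfg is an ordinary table; setting a key to
-- nil deletes it, so box.cfg(copy) sees no 'replication' option
-- at all and keeps the currently active value.
local box_cfg = {replication = {'localhost:3301'}, read_only = true}
box_cfg.replication = nil   -- key removed, option left unchanged
box_cfg.read_only = nil
assert(box_cfg.replication == nil and next(box_cfg) == nil)

-- To actually turn the options off, pass explicit values:
box_cfg.replication = {}
box_cfg.read_only = false
```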
>> + box.snapshot()
>
> 9. Why?
Deleted.
Full diff:
commit b0cddc2a3ba30d3ecf57e71ce8d095bf2f093a5e
Author: AKhatskevich <avkhatskevich at tarantool.org>
Date: Wed Jun 20 00:31:25 2018 +0300
Introduce destroy module feature
Introduce functions:
* vshard.router.destroy()
* vshard.storage.destroy()
Those functions:
* close connections
* stop background fibers
* delete vshard spaces
* delete vshard functions
* delete `once` metadata
After the destroy, the module can be configured as if it had just been loaded.
Extra changes:
* introduce fiber_list function which returns names of non-tarantool
fibers
* introduce update_M function, which updates M (module internals) with
values defined in the module
Closes #121
diff --git a/test/lua_libs/util.lua b/test/lua_libs/util.lua
index f2d3b48..ce0ea67 100644
--- a/test/lua_libs/util.lua
+++ b/test/lua_libs/util.lua
@@ -69,9 +69,43 @@ local function wait_master(test_run, replicaset, master)
log.info('Slaves are connected to a master "%s"', master)
end
+function vshard_fiber_list()
+ -- Flush jit traces to prevent them from
+ -- keeping their upvalues in memory.
+ jit.flush()
+ collectgarbage()
+ -- Give the fibers time to clean themselves up.
+ fiber.sleep(0.05)
+ local fibers = fiber.info()
+ local non_vshard_patterns = {
+ '^console',
+ 'feedback_daemon$',
+ '^checkpoint_daemon$',
+ '^main$',
+ '^memtx%.gc$',
+ '^vinyl%.scheduler$',
+ }
+ local names = {}
+ for _, fib in pairs(fibers) do
+ local add = true
+ for _, pattern in pairs(non_vshard_patterns) do
+ if fib.name:match(pattern) then
+ add = false
+ break
+ end
+ end
+ if add then
+ table.insert(names, fib.name)
+ end
+ end
+ table.sort(names)
+ return names
+end;
+
return {
check_error = check_error,
shuffle_masters = shuffle_masters,
collect_timeouts = collect_timeouts,
wait_master = wait_master,
+ vshard_fiber_list = vshard_fiber_list,
}
diff --git a/test/router/destroy.result b/test/router/destroy.result
new file mode 100644
index 0000000..9284ef6
--- /dev/null
+++ b/test/router/destroy.result
@@ -0,0 +1,105 @@
+test_run = require('test_run').new()
+---
+...
+netbox = require('net.box')
+---
+...
+fiber = require('fiber')
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'storage')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'storage')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+---
+- true
+...
+test_run:cmd("start server router_1")
+---
+- true
+...
+_ = test_run:cmd("switch router_1")
+---
+...
+util = require('util')
+---
+...
+fiber = require('fiber')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function wait_fibers_exit()
+ while #util.vshard_fiber_list() > 0 do
+ fiber.sleep(0.05)
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Validate the destroy function via fiber_list.
+-- Net.box fibers are deleted after replica objects are collected by GC.
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3301 (net.box)
+ - 127.0.0.1:3302 (net.box)
+ - 127.0.0.1:3303 (net.box)
+ - 127.0.0.1:3304 (net.box)
+ - discovery_fiber
+ - vshard.failover
+...
+vshard.router.destroy()
+---
+...
+wait_fibers_exit()
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3301 (net.box)
+ - 127.0.0.1:3302 (net.box)
+ - 127.0.0.1:3303 (net.box)
+ - 127.0.0.1:3304 (net.box)
+ - discovery_fiber
+ - vshard.failover
+...
+_ = test_run:cmd("switch default")
+---
+...
+test_run:cmd('stop server router_1')
+---
+- true
+...
+test_run:cmd('cleanup server router_1')
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
diff --git a/test/router/destroy.test.lua b/test/router/destroy.test.lua
new file mode 100644
index 0000000..caf4d8e
--- /dev/null
+++ b/test/router/destroy.test.lua
@@ -0,0 +1,38 @@
+test_run = require('test_run').new()
+netbox = require('net.box')
+fiber = require('fiber')
+
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+test_run:cmd("create server router_1 with script='router/router_1.lua'")
+test_run:cmd("start server router_1")
+
+_ = test_run:cmd("switch router_1")
+util = require('util')
+fiber = require('fiber')
+test_run:cmd("setopt delimiter ';'")
+function wait_fibers_exit()
+ while #util.vshard_fiber_list() > 0 do
+ fiber.sleep(0.05)
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Validate the destroy function via fiber_list.
+-- Net.box fibers are deleted after replica objects are collected by GC.
+util.vshard_fiber_list()
+vshard.router.destroy()
+wait_fibers_exit()
+vshard.router.cfg(cfg)
+util.vshard_fiber_list()
+
+_ = test_run:cmd("switch default")
+test_run:cmd('stop server router_1')
+test_run:cmd('cleanup server router_1')
+test_run:drop_cluster(REPLICASET_2)
diff --git a/test/storage/destroy.result b/test/storage/destroy.result
new file mode 100644
index 0000000..180fc0b
--- /dev/null
+++ b/test/storage/destroy.result
@@ -0,0 +1,129 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'storage')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'storage')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+_ = test_run:cmd("switch storage_1_a")
+---
+...
+util = require('util')
+---
+...
+fiber = require('fiber')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function wait_fibers_exit()
+ while #util.vshard_fiber_list() > 0 do
+ fiber.sleep(0.05)
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Storage is configured.
+-- Establish net.box connection.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs:callro('echo', {'some data'})
+---
+- some data
+- null
+- null
+...
+rs = nil
+---
+...
+-- Validate the destroy function via fiber_list.
+-- Net.box fibers are deleted after replica objects are collected by GC.
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3303 (net.box)
+ - vshard.gc
+ - vshard.recovery
+...
+box.schema.user.exists('storage') == true
+---
+- true
+...
+box.space._bucket ~= nil
+---
+- true
+...
+-- Destroy storage.
+vshard.storage.destroy()
+---
+...
+wait_fibers_exit()
+---
+...
+box.space._bucket == nil
+---
+- true
+...
+-- Reconfigure storage.
+-- gh-52: Allow using an existing user.
+box.schema.user.exists('storage') == true
+---
+- true
+...
+vshard.storage.cfg(cfg, names['storage_1_a'])
+---
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs:callro('echo', {'some data'})
+---
+- some data
+- null
+- null
+...
+rs = nil
+---
+...
+box.space._bucket ~= nil
+---
+- true
+...
+util.vshard_fiber_list()
+---
+- - 127.0.0.1:3303 (net.box)
+ - vshard.gc
+ - vshard.recovery
+...
+_ = test_run:cmd("switch default")
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
diff --git a/test/storage/destroy.test.lua b/test/storage/destroy.test.lua
new file mode 100644
index 0000000..38e7c0a
--- /dev/null
+++ b/test/storage/destroy.test.lua
@@ -0,0 +1,50 @@
+test_run = require('test_run').new()
+
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+
+_ = test_run:cmd("switch storage_1_a")
+util = require('util')
+fiber = require('fiber')
+test_run:cmd("setopt delimiter ';'")
+function wait_fibers_exit()
+ while #util.vshard_fiber_list() > 0 do
+ fiber.sleep(0.05)
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Storage is configured.
+-- Establish net.box connection.
+_, rs = next(vshard.storage.internal.replicasets)
+rs:callro('echo', {'some data'})
+rs = nil
+-- Validate the destroy function via fiber_list.
+-- Net.box fibers are deleted after replica objects are collected by GC.
+util.vshard_fiber_list()
+box.schema.user.exists('storage') == true
+box.space._bucket ~= nil
+-- Destroy storage.
+vshard.storage.destroy()
+wait_fibers_exit()
+box.space._bucket == nil
+
+-- Reconfigure storage.
+-- gh-52: Allow using an existing user.
+box.schema.user.exists('storage') == true
+vshard.storage.cfg(cfg, names['storage_1_a'])
+_, rs = next(vshard.storage.internal.replicasets)
+rs:callro('echo', {'some data'})
+rs = nil
+box.space._bucket ~= nil
+util.vshard_fiber_list()
+
+_ = test_run:cmd("switch default")
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 21093e5..e56c38a 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -7,30 +7,49 @@ local lhash = require('vshard.hash')
local lreplicaset = require('vshard.replicaset')
local util = require('vshard.util')
+local MODULE_SKELETON = {
+ errinj = {
+ ERRINJ_FAILOVER_CHANGE_CFG = false,
+ ERRINJ_RELOAD = false,
+ },
+ -- Bucket map cache.
+ route_map = {},
+ -- All known replicasets used for bucket re-balancing
+ replicasets = nil,
+ -- Fiber to maintain replica connections.
+ failover_fiber = nil,
+ -- Fiber to discovery buckets in background.
+ discovery_fiber = nil,
+ -- Bucket count stored on all replicasets.
+ total_bucket_count = 0,
+ -- If true, then discovery fiber starts to call
+ -- collectgarbage() periodically.
+ collect_lua_garbage = nil,
+ -- This counter is used to restart background fibers with
+ -- new reloaded code.
+ module_version = 0,
+}
+
local M = rawget(_G, '__module_vshard_router')
-if not M then
- M = {
- errinj = {
- ERRINJ_FAILOVER_CHANGE_CFG = false,
- ERRINJ_RELOAD = false,
- },
- -- Bucket map cache.
- route_map = {},
- -- All known replicasets used for bucket re-balancing
- replicasets = nil,
- -- Fiber to maintain replica connections.
- failover_fiber = nil,
- -- Fiber to discovery buckets in background.
- discovery_fiber = nil,
- -- Bucket count stored on all replicasets.
- total_bucket_count = 0,
- -- If true, then discovery fiber starts to call
- -- collectgarbage() periodically.
- collect_lua_garbage = nil,
- -- This counter is used to restart background fibers with
- -- new reloaded code.
- module_version = 0,
- }
+local update_M
+
+--
+-- Destroy router module.
+--
+local function destroy()
+ local MODULE_INTERNALS = '__module_vshard_router'
+ assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
+ -- Cancel background fibers.
+ for _, fib_name in pairs({
+ 'failover_fiber',
+ 'discovery_fiber',
+ }) do
+ if M[fib_name] then
+ M[fib_name]:cancel()
+ end
+ end
+ util.override_table(M, MODULE_SKELETON)
+ update_M()
end
-- Set a replicaset by container of a bucket.
@@ -758,10 +777,6 @@ local function router_sync(timeout)
end
end
-if M.errinj.ERRINJ_RELOAD then
- error('Error injection: reload')
-end
-
--------------------------------------------------------------------------------
-- Module definition
--------------------------------------------------------------------------------
@@ -769,15 +784,28 @@ end
-- About functions, saved in M, and reloading see comment in
-- storage/init.lua.
--
-M.discovery_f = discovery_f
-M.failover_f = failover_f
+
+--
+-- Store module-defined values to M.
+--
+update_M = function()
+ M.discovery_f = discovery_f
+ M.failover_f = failover_f
+end
if not rawget(_G, '__module_vshard_router') then
+ M = table.deepcopy(MODULE_SKELETON)
+ update_M()
rawset(_G, '__module_vshard_router', M)
else
+ M = rawget(_G, '__module_vshard_router')
M.module_version = M.module_version + 1
end
+if M.errinj.ERRINJ_RELOAD then
+ error('Error injection: reload')
+end
+
return {
cfg = router_cfg;
info = router_info;
@@ -792,6 +820,7 @@ return {
sync = router_sync;
bootstrap = cluster_bootstrap;
bucket_discovery = bucket_discovery;
+ destroy = destroy,
discovery_wakeup = discovery_wakeup;
internal = M;
module_version = function() return M.module_version end;
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index c80bfbf..0d3cc24 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -9,89 +9,88 @@ local lcfg = require('vshard.cfg')
local lreplicaset = require('vshard.replicaset')
local trigger = require('internal.trigger')
-local M = rawget(_G, '__module_vshard_storage')
-if not M then
+--
+-- The module is loaded for the first time.
+--
+local MODULE_SKELETON = {
+ ---------------- Common module attributes ----------------
--
- -- The module is loaded for the first time.
+ -- All known replicasets used for bucket re-balancing.
+ -- See format in replicaset.lua.
--
- M = {
- ---------------- Common module attributes ----------------
- --
- -- All known replicasets used for bucket re-balancing.
- -- See format in replicaset.lua.
- --
- replicasets = nil,
- -- Triggers on master switch event. They are called right
- -- before the event occurs.
- _on_master_enable = trigger.new('_on_master_enable'),
- _on_master_disable = trigger.new('_on_master_disable'),
- -- Index which is a trigger to shard its space by numbers in
- -- this index. It must have at first part either unsigned,
- -- or integer or number type and be not nullable. Values in
- -- this part are considered as bucket identifiers.
- shard_index = nil,
- -- Bucket count stored on all replicasets.
- total_bucket_count = 0,
- errinj = {
- ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
- ERRINJ_RELOAD = false,
- ERRINJ_CFG_DELAY = false,
- ERRINJ_LONG_RECEIVE = false,
- },
- -- This counter is used to restart background fibers with
- -- new reloaded code.
- module_version = 0,
- --
- -- Timeout to wait sync with slaves. Used on master
- -- demotion or on a manual sync() call.
- --
- sync_timeout = consts.DEFAULT_SYNC_TIMEOUT,
- -- References to a parent replicaset and self in it.
- this_replicaset = nil,
- this_replica = nil,
-
- ------------------- Garbage collection -------------------
- -- Fiber to remove garbage buckets data.
- collect_bucket_garbage_fiber = nil,
- -- Do buckets garbage collection once per this time.
- collect_bucket_garbage_interval = nil,
- -- If true, then bucket garbage collection fiber starts to
- -- call collectgarbage() periodically.
- collect_lua_garbage = nil,
-
- -------------------- Bucket recovery ---------------------
- -- Bucket identifiers which are not active and are not being
- -- sent - their status is unknown. Their state must be checked
- -- periodically in recovery fiber.
- buckets_to_recovery = {},
- buckets_to_recovery_count = 0,
- recovery_fiber = nil,
-
- ----------------------- Rebalancer -----------------------
- -- Fiber to rebalance a cluster.
- rebalancer_fiber = nil,
- -- Fiber which applies routes one by one. Its presense and
- -- active status means that the rebalancing is in progress
- -- now on the current node.
- rebalancer_applier_fiber = nil,
- -- Internal flag to activate and deactivate rebalancer. Mostly
- -- for tests.
- is_rebalancer_active = true,
- -- Maximal allowed percent deviation of bucket count on a
- -- replicaset from etalon bucket count.
- rebalancer_disbalance_threshold = 0,
- -- Maximal bucket count that can be received by a single
- -- replicaset simultaneously.
- rebalancer_max_receiving = 0,
- -- Identifier of a bucket that rebalancer is sending now,
- -- or else 0. If a bucket has state SENDING, but its id is
- -- not stored here, it means, that its sending was
- -- interrupted, for example by restart of an instance, and
- -- a destination replicaset must drop already received
- -- data.
- rebalancer_sending_bucket = 0,
- }
-end
+ replicasets = nil,
+ -- Triggers on master switch event. They are called right
+ -- before the event occurs.
+ _on_master_enable = trigger.new('_on_master_enable'),
+ _on_master_disable = trigger.new('_on_master_disable'),
+ -- Index which is a trigger to shard its space by numbers in
+ -- this index. It must have at first part either unsigned,
+ -- or integer or number type and be not nullable. Values in
+ -- this part are considered as bucket identifiers.
+ shard_index = nil,
+ -- Bucket count stored on all replicasets.
+ total_bucket_count = 0,
+ errinj = {
+ ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
+ ERRINJ_RELOAD = false,
+ ERRINJ_CFG_DELAY = false,
+ ERRINJ_LONG_RECEIVE = false,
+ },
+ -- This counter is used to restart background fibers with
+ -- new reloaded code.
+ module_version = 0,
+ --
+ -- Timeout to wait sync with slaves. Used on master
+ -- demotion or on a manual sync() call.
+ --
+ sync_timeout = consts.DEFAULT_SYNC_TIMEOUT,
+ -- References to a parent replicaset and self in it.
+ this_replicaset = nil,
+ this_replica = nil,
+
+ ------------------- Garbage collection -------------------
+ -- Fiber to remove garbage buckets data.
+ collect_bucket_garbage_fiber = nil,
+ -- Do buckets garbage collection once per this time.
+ collect_bucket_garbage_interval = nil,
+ -- If true, then bucket garbage collection fiber starts to
+ -- call collectgarbage() periodically.
+ collect_lua_garbage = nil,
+
+ -------------------- Bucket recovery ---------------------
+ -- Bucket identifiers which are not active and are not being
+ -- sent - their status is unknown. Their state must be checked
+ -- periodically in recovery fiber.
+ buckets_to_recovery = {},
+ buckets_to_recovery_count = 0,
+ recovery_fiber = nil,
+
+ ----------------------- Rebalancer -----------------------
+ -- Fiber to rebalance a cluster.
+ rebalancer_fiber = nil,
+ -- Fiber which applies routes one by one. Its presense and
+ -- active status means that the rebalancing is in progress
+ -- now on the current node.
+ rebalancer_applier_fiber = nil,
+ -- Internal flag to activate and deactivate rebalancer. Mostly
+ -- for tests.
+ is_rebalancer_active = true,
+ -- Maximal allowed percent deviation of bucket count on a
+ -- replicaset from etalon bucket count.
+ rebalancer_disbalance_threshold = 0,
+ -- Maximal bucket count that can be received by a single
+ -- replicaset simultaneously.
+ rebalancer_max_receiving = 0,
+ -- Identifier of a bucket that rebalancer is sending now,
+ -- or else 0. If a bucket has state SENDING, but its id is
+ -- not stored here, it means, that its sending was
+ -- interrupted, for example by restart of an instance, and
+ -- a destination replicaset must drop already received
+ -- data.
+ rebalancer_sending_bucket = 0,
+}
+local M
+local update_M
--
-- Check if this replicaset is locked. It means be invisible for
@@ -138,6 +137,22 @@ end
--------------------------------------------------------------------------------
-- Schema
--------------------------------------------------------------------------------
+local storage_api = {
+ 'vshard.storage.sync',
+ 'vshard.storage.call',
+ 'vshard.storage.bucket_force_create',
+ 'vshard.storage.bucket_force_drop',
+ 'vshard.storage.bucket_collect',
+ 'vshard.storage.bucket_send',
+ 'vshard.storage.bucket_recv',
+ 'vshard.storage.bucket_stat',
+ 'vshard.storage.buckets_count',
+ 'vshard.storage.buckets_info',
+ 'vshard.storage.buckets_discovery',
+ 'vshard.storage.rebalancer_request_state',
+ 'vshard.storage.rebalancer_apply_routes',
+}
+
local function storage_schema_v1(username, password)
log.info("Initializing schema")
box.schema.user.create(username, {
@@ -157,22 +172,6 @@ local function storage_schema_v1(username, password)
bucket:create_index('pk', {parts = {'id'}})
bucket:create_index('status', {parts = {'status'}, unique = false})
- local storage_api = {
- 'vshard.storage.sync',
- 'vshard.storage.call',
- 'vshard.storage.bucket_force_create',
- 'vshard.storage.bucket_force_drop',
- 'vshard.storage.bucket_collect',
- 'vshard.storage.bucket_send',
- 'vshard.storage.bucket_recv',
- 'vshard.storage.bucket_stat',
- 'vshard.storage.buckets_count',
- 'vshard.storage.buckets_info',
- 'vshard.storage.buckets_discovery',
- 'vshard.storage.rebalancer_request_state',
- 'vshard.storage.rebalancer_apply_routes',
- }
-
for _, name in ipairs(storage_api) do
box.schema.func.create(name, {setuid = true})
box.schema.user.grant(username, 'execute', 'function', name)
@@ -203,6 +202,40 @@ local function on_master_enable(...)
end
end
+--------------------------------------------------------------------------------
+-- Destroy
+--------------------------------------------------------------------------------
+
+--
+-- Destroy storage module.
+--
+local function destroy()
+ local MODULE_INTERNALS = '__module_vshard_storage'
+ assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
+ box.space._bucket:drop()
+ for _, name in ipairs(storage_api) do
+ box.schema.func.drop(name)
+ end
+ -- Cancel background fibers.
+ for _, fib_name in pairs({
+ 'recovery_fiber',
+ 'collect_bucket_garbage_fiber',
+ 'rebalancer_applier_fiber',
+ 'rebalancer_fiber',
+ }) do
+ if M[fib_name] then
+ M[fib_name]:cancel()
+ end
+ end
+ local box_cfg = table.deepcopy(box.cfg)
+ box_cfg.replication = {}
+ box_cfg.read_only = false
+ box.cfg(box_cfg)
+ box.space._schema:delete{'oncevshard:storage:1'}
+ util.override_table(M, MODULE_SKELETON)
+ update_M()
+end
+
--------------------------------------------------------------------------------
-- Recovery
--------------------------------------------------------------------------------
@@ -594,10 +627,6 @@ local function bucket_collect_internal(bucket_id)
return data
end
-if M.errinj.ERRINJ_RELOAD then
- error('Error injection: reload')
-end
-
--
-- Collect content of ACTIVE bucket.
--
@@ -1808,27 +1837,37 @@ end
-- restarted (or is restarted from M.background_f, which is not
-- changed) and continues use old func1 and func2.
--
-M.recovery_f = recovery_f
-M.collect_garbage_f = collect_garbage_f
-M.rebalancer_f = rebalancer_f
--
--- These functions are saved in M not for atomic reload, but for
--- unit testing.
+-- Store module-defined values to M.
--
-M.find_garbage_bucket = find_garbage_bucket
-M.collect_garbage_step = collect_garbage_step
-M.collect_garbage_f = collect_garbage_f
-M.rebalancer_build_routes = rebalancer_build_routes
-M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
-M.cached_find_sharded_spaces = find_sharded_spaces
+update_M = function()
+ M.recovery_f = recovery_f
+ M.collect_garbage_f = collect_garbage_f
+ M.rebalancer_f = rebalancer_f
+ -- These functions are saved in M not for atomic reload, but for
+ -- unit testing.
+ M.find_garbage_bucket = find_garbage_bucket
+ M.collect_garbage_step = collect_garbage_step
+ M.collect_garbage_f = collect_garbage_f
+ M.rebalancer_build_routes = rebalancer_build_routes
+ M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
+ M.cached_find_sharded_spaces = find_sharded_spaces
+end
if not rawget(_G, '__module_vshard_storage') then
+ M = table.deepcopy(MODULE_SKELETON)
+ update_M()
rawset(_G, '__module_vshard_storage', M)
else
+ M = rawget(_G, '__module_vshard_storage')
M.module_version = M.module_version + 1
end
+if M.errinj.ERRINJ_RELOAD then
+ error('Error injection: reload')
+end
+
return {
sync = sync,
bucket_force_create = bucket_force_create,
@@ -1840,6 +1879,7 @@ return {
bucket_pin = bucket_pin,
bucket_unpin = bucket_unpin,
bucket_delete_garbage = bucket_delete_garbage,
+ destroy = destroy,
garbage_collector_wakeup = garbage_collector_wakeup,
rebalancer_wakeup = rebalancer_wakeup,
rebalancer_apply_routes = rebalancer_apply_routes,
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..d57cf44 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -75,7 +75,22 @@ local function generate_self_checker(obj_name, func_name, mt, func)
end
end
+--
+-- Replace internals of a table with a template.
+-- @param tbl Table to be updated.
+-- @param template Internals of the `tbl` after the call.
+--
+local function override_table(tbl, template)
+ for k, _ in pairs(tbl) do
+ tbl[k] = nil
+ end
+ for k, v in pairs(template) do
+ tbl[k] = table.deepcopy(v)
+ end
+end
+
return {
+ override_table = override_table,
tuple_extract_key = tuple_extract_key,
reloadable_fiber_f = reloadable_fiber_f,
generate_self_checker = generate_self_checker,