[patches] [PATCH vshard 2/2] storage: allow storage reload
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Tue Feb 20 01:45:47 MSK 2018
Part of #72
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
test/storage/reload.result | 190 ++++++++++++++++++++
test/storage/reload.test.lua | 93 ++++++++++
test/unit/garbage.result | 2 +-
test/unit/garbage.test.lua | 2 +-
vshard/storage/init.lua | 412 +++++++++++++++++++++++++------------------
vshard/util.lua | 32 +++-
6 files changed, 561 insertions(+), 170 deletions(-)
create mode 100644 test/storage/reload.result
create mode 100644 test/storage/reload.test.lua
diff --git a/test/storage/reload.result b/test/storage/reload.result
new file mode 100644
index 0000000..ee37383
--- /dev/null
+++ b/test/storage/reload.result
@@ -0,0 +1,190 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'storage')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'storage')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+test_run:switch('storage_1_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1, vshard.consts.DEFAULT_BUCKET_COUNT / 2)
+---
+- true
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+vshard.storage.bucket_force_create(vshard.consts.DEFAULT_BUCKET_COUNT / 2 + 1, vshard.consts.DEFAULT_BUCKET_COUNT / 2)
+---
+- true
+...
+--
+-- Gh-72: allow reload. Test simple reload, error during
+-- reloading, ensure the fibers are restarted on reload.
+--
+assert(rawget(_G, '__module_vshard_storage') ~= nil)
+---
+- true
+...
+vshard.storage.module_version()
+---
+- 0
+...
+while test_run:grep_log('storage_2_a', 'The cluster is balanced ok') == nil do vshard.storage.rebalancer_wakeup() fiber.sleep(0.1) end
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function check_reloaded()
+ for k, v in pairs(old_internal) do
+ if v == vshard.storage.internal[k] then
+ return k
+ end
+ end
+end;
+---
+...
+function check_not_reloaded()
+ for k, v in pairs(old_internal) do
+ if v ~= vshard.storage.internal[k] then
+ return k
+ end
+ end
+end;
+---
+...
+function copy_functions(t)
+ local ret = {}
+ for k, v in pairs(t) do
+ if type(v) == 'function' then
+ ret[k] = v
+ end
+ end
+ return ret
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+--
+-- Simple reload. All functions are reloaded and they have
+-- another points in vshard.storage.internal.
+--
+old_internal = copy_functions(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+vshard.storage.module_version()
+---
+- 1
+...
+check_reloaded()
+---
+...
+while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
+---
+...
+while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+---
+...
+while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+---
+...
+check_reloaded()
+---
+...
+--
+-- Error during reload - in such a case no function can be
+-- updated. Reload is atomic.
+--
+vshard.storage.internal.errinj.ERRINJ_RELOAD = true
+---
+...
+old_internal = copy_functions(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+util = require('util')
+---
+...
+util.check_error(require, 'vshard.storage')
+---
+- 'Error injection: reload'
+...
+check_not_reloaded()
+---
+...
+vshard.storage.module_version()
+---
+- 1
+...
+--
+-- A next reload is ok, and all functions are updated.
+--
+vshard.storage.internal.errinj.ERRINJ_RELOAD = false
+---
+...
+old_internal = copy_functions(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+vshard.storage.module_version()
+---
+- 2
+...
+check_reloaded()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
+test_run:cmd('clear filter')
+---
+- true
+...
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
new file mode 100644
index 0000000..50a402c
--- /dev/null
+++ b/test/storage/reload.test.lua
@@ -0,0 +1,93 @@
+test_run = require('test_run').new()
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+
+test_run:switch('storage_1_a')
+vshard.storage.bucket_force_create(1, vshard.consts.DEFAULT_BUCKET_COUNT / 2)
+test_run:switch('storage_2_a')
+fiber = require('fiber')
+vshard.storage.bucket_force_create(vshard.consts.DEFAULT_BUCKET_COUNT / 2 + 1, vshard.consts.DEFAULT_BUCKET_COUNT / 2)
+
+--
+-- Gh-72: allow reload. Test simple reload, error during
+-- reloading, ensure the fibers are restarted on reload.
+--
+
+assert(rawget(_G, '__module_vshard_storage') ~= nil)
+
+vshard.storage.module_version()
+while test_run:grep_log('storage_2_a', 'The cluster is balanced ok') == nil do vshard.storage.rebalancer_wakeup() fiber.sleep(0.1) end
+test_run:cmd("setopt delimiter ';'")
+function check_reloaded()
+ for k, v in pairs(old_internal) do
+ if v == vshard.storage.internal[k] then
+ return k
+ end
+ end
+end;
+function check_not_reloaded()
+ for k, v in pairs(old_internal) do
+ if v ~= vshard.storage.internal[k] then
+ return k
+ end
+ end
+end;
+function copy_functions(t)
+ local ret = {}
+ for k, v in pairs(t) do
+ if type(v) == 'function' then
+ ret[k] = v
+ end
+ end
+ return ret
+end;
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Simple reload. All functions are reloaded and they have
+-- another points in vshard.storage.internal.
+--
+old_internal = copy_functions(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+vshard.storage.module_version()
+
+check_reloaded()
+
+while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+
+check_reloaded()
+
+--
+-- Error during reload - in such a case no function can be
+-- updated. Reload is atomic.
+--
+vshard.storage.internal.errinj.ERRINJ_RELOAD = true
+old_internal = copy_functions(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+util = require('util')
+util.check_error(require, 'vshard.storage')
+check_not_reloaded()
+vshard.storage.module_version()
+
+--
+-- A next reload is ok, and all functions are updated.
+--
+vshard.storage.internal.errinj.ERRINJ_RELOAD = false
+old_internal = copy_functions(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+vshard.storage.module_version()
+check_reloaded()
+
+test_run:switch('default')
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
+test_run:cmd('clear filter')
diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 3474aad..14e5721 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -317,7 +317,7 @@ control.bucket_generation_collected
collect_f = vshard.storage.internal.collect_garbage_f
---
...
-f = fiber.create(collect_f)
+f = fiber.create(collect_f, vshard.storage.module_version())
---
...
fill_spaces_with_garbage()
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index af80983..3db44f8 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -136,7 +136,7 @@ control.bucket_generation_collected
-- Test continuous garbage collection via background fiber.
--
collect_f = vshard.storage.internal.collect_garbage_f
-f = fiber.create(collect_f)
+f = fiber.create(collect_f, vshard.storage.module_version())
fill_spaces_with_garbage()
-- Wait until garbage collection is finished.
while #s2:select{} ~= 2 do fiber.sleep(0.1) end
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 3182ddc..25ee449 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -9,47 +9,60 @@ local lcfg = require('vshard.cfg')
local lreplicaset = require('vshard.replicaset')
local trigger = require('internal.trigger')
-local total_bucket_count = 0
-local rebalancer_disbalance_threshold = 0
-local rebalancer_max_receiving = 0
-
--- Internal state
-local self = {
+local M = rawget(_G, '__module_vshard_storage')
+if not M then
--
- -- All known replicasets used for bucket re-balancing.
- -- See format in replicaset.lua.
+ -- The module is loaded for the first time.
--
- replicasets = nil,
- -- Fiber to remove garbage buckets data.
- garbage_collect_fiber = nil,
- -- Bucket identifiers which are not active and are not being
- -- sent - their status is unknown. Their state must be checked
- -- periodically in recovery fiber.
- buckets_to_recovery = {},
- buckets_to_recovery_count = 0,
- recovery_fiber = nil,
- -- Fiber to rebalance a cluster.
- rebalancer_fiber = nil,
- -- Fiber which applies routes one by one. Its presense and
- -- active status means that the rebalancing is in progress
- -- now on the current node.
- rebalancer_applier_fiber = nil,
- -- Internal flag to activate and deactivate rebalancer. Mostly
- -- for tests.
- is_rebalancer_active = true,
- -- Triggers on master switch event. They are called right
- -- before the event occurs.
- on_master_enable = trigger.new('on_master_enable'),
- on_master_disable = trigger.new('on_master_disable'),
- -- Index which is a trigger to shard its space by numbers in
- -- this index. It must have at first part either unsigned,
- -- or integer or number type and be not nullable. Values in
- -- this part are considered as bucket identifiers.
- shard_index = nil,
- errinj = {
- ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
+ M = {
+ --
+ -- All known replicasets used for bucket re-balancing.
+ -- See format in replicaset.lua.
+ --
+ replicasets = nil,
+ -- Fiber to remove garbage buckets data.
+ garbage_collect_fiber = nil,
+ -- Bucket identifiers which are not active and are not being
+ -- sent - their status is unknown. Their state must be checked
+ -- periodically in recovery fiber.
+ buckets_to_recovery = {},
+ buckets_to_recovery_count = 0,
+ recovery_fiber = nil,
+ -- Fiber to rebalance a cluster.
+ rebalancer_fiber = nil,
+ -- Fiber which applies routes one by one. Its presence and
+ -- active status means that the rebalancing is in progress
+ -- now on the current node.
+ rebalancer_applier_fiber = nil,
+ -- Internal flag to activate and deactivate rebalancer. Mostly
+ -- for tests.
+ is_rebalancer_active = true,
+ -- Triggers on master switch event. They are called right
+ -- before the event occurs.
+ _on_master_enable = trigger.new('_on_master_enable'),
+ _on_master_disable = trigger.new('_on_master_disable'),
+ -- Index which is a trigger to shard its space by numbers in
+ -- this index. It must have at first part either unsigned,
+ -- or integer or number type and be not nullable. Values in
+ -- this part are considered as bucket identifiers.
+ shard_index = nil,
+ -- Bucket count stored on all replicasets.
+ total_bucket_count = 0,
+ -- Maximal allowed percent deviation of bucket count on a
+ -- replicaset from ethalon bucket count.
+ rebalancer_disbalance_threshold = 0,
+ -- Maximal bucket count that can be received by a single
+ -- replicaset simultaneously.
+ rebalancer_max_receiving = 0,
+ errinj = {
+ ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
+ ERRINJ_RELOAD = false,
+ },
+ -- This counter is used to restart background fibers with
+ -- new reloaded code.
+ module_version = 0,
}
-}
+end
--------------------------------------------------------------------------------
-- Schema
@@ -94,24 +107,24 @@ local function storage_schema_v1(username, password)
end
local function this_is_master()
- return self.this_replicaset and self.this_replicaset.master and
- self.this_replica == self.this_replicaset.master
+ return M.this_replicaset and M.this_replicaset.master and
+ M.this_replica == M.this_replicaset.master
end
local function on_master_disable(...)
- self.on_master_disable(...)
+ M._on_master_disable(...)
-- If a trigger is set after storage.cfg(), then notify an
-- user, that the current instance is not master.
- if #{...} == 1 and not this_is_master() then
- self.on_master_disable:run()
+ if #{...} == 1 and not M.this_is_master() then
+ M._on_master_disable:run()
end
end
local function on_master_enable(...)
- self.on_master_enable(...)
+ M._on_master_enable(...)
-- Same as above, but notify, that the instance is master.
- if #{...} == 1 and this_is_master() then
- self.on_master_enable:run()
+ if #{...} == 1 and M.this_is_master() then
+ M._on_master_enable:run()
end
end
@@ -137,7 +150,7 @@ local function recovery_step()
local _bucket = box.space._bucket
local new_count = 0
local is_empty = true
- for bucket_id, _ in pairs(self.buckets_to_recovery) do
+ for bucket_id, _ in pairs(M.buckets_to_recovery) do
if is_empty then
log.info('Starting buckets recovery step')
end
@@ -151,7 +164,7 @@ local function recovery_step()
table.insert(recovered, bucket_id)
goto continue
end
- local destination = self.replicasets[bucket.destination]
+ local destination = M.replicasets[bucket.destination]
if not destination or not destination.master then
-- No replicaset master for a bucket. Wait until it
-- appears.
@@ -198,9 +211,9 @@ local function recovery_step()
box.commit()
end
for _, id in pairs(recovered) do
- self.buckets_to_recovery[id] = nil
+ M.buckets_to_recovery[id] = nil
end
- self.buckets_to_recovery_count = new_count
+ M.buckets_to_recovery_count = new_count
end
--
@@ -222,19 +235,26 @@ local function recovery_garbage_receiving_buckets()
end
--
--- Background fiber to resolve status of buckets, whose 'sending'
--- has failed due to tarantool or network problems.
+-- Infinite function to resolve status of buckets, whose 'sending'
+-- has failed due to tarantool or network problems. Restarts on
+-- reload.
+-- @param module_version Module version, on which the current
+-- function had been started. If the actual module version
+-- appears to be changed, then stop recovery. It is
+-- restarted in reloadable_fiber_f().
--
-local function recovery_f()
+local function recovery_f(module_version)
lfiber.name('vshard.recovery')
local _bucket = box.space._bucket
local sending_buckets = _bucket.index.status:select{consts.BUCKET.SENDING}
- self.buckets_to_recovery = {}
+ M.buckets_to_recovery = {}
for _, bucket in pairs(sending_buckets) do
- self.buckets_to_recovery[bucket.id] = true
+ M.buckets_to_recovery[bucket.id] = true
end
- while true do
- local ok, err = pcall(recovery_step)
+ -- Interrupt recovery if the module has been reloaded. Perhaps
+ -- a bug was found, and the reload fixes it.
+ while module_version == M.module_version do
+ local ok, err = pcall(M.recovery_step)
if not ok then
log.error('Error during buckets recovery: %s', err)
end
@@ -246,8 +266,8 @@ end
-- Immediately wakeup recovery fiber, if exists.
--
local function recovery_wakeup()
- if self.recovery_fiber then
- self.recovery_fiber:wakeup()
+ if M.recovery_fiber then
+ M.recovery_fiber:wakeup()
end
end
@@ -322,12 +342,12 @@ local function bucket_check_state(bucket_id, mode)
assert(mode == 'read' or mode == 'write')
local bucket = box.space._bucket:get({bucket_id})
local errcode = nil
- if bucket == nil or bucket_is_garbage(bucket) then
+ if bucket == nil or M.bucket_is_garbage(bucket) then
errcode = lerror.code.WRONG_BUCKET
elseif (bucket.status == consts.BUCKET.SENDING and mode ~= 'read') then
errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
elseif bucket.status == consts.BUCKET.ACTIVE and mode ~= 'read' and
- self.this_replicaset.master ~= self.this_replica then
+ M.this_replicaset.master ~= M.this_replica then
errcode = lerror.code.NON_MASTER
end
if errcode ~= nil then
@@ -350,7 +370,7 @@ local function bucket_stat(bucket_id)
end
local bucket = box.space._bucket:get({bucket_id})
- if not bucket or bucket_is_garbage(bucket) then
+ if not bucket or M.bucket_is_garbage(bucket) then
return nil, lerror.vshard(lerror.code.WRONG_BUCKET,
{bucket_id = bucket_id})
else
@@ -446,7 +466,7 @@ end
--
local function find_sharded_spaces()
local spaces = {}
- local idx = self.shard_index
+ local idx = M.shard_index
for k, space in pairs(box.space) do
if type(k) == 'number' and space.index[idx] ~= nil then
local parts = space.index[idx].parts
@@ -466,8 +486,8 @@ end
--
local function bucket_collect_internal(bucket_id)
local data = {}
- local spaces = find_sharded_spaces()
- local idx = self.shard_index
+ local spaces = M.find_sharded_spaces()
+ local idx = M.shard_index
for k, space in pairs(spaces) do
assert(space.index[idx] ~= nil)
local space_data = space.index[idx]:select({bucket_id})
@@ -476,6 +496,10 @@ local function bucket_collect_internal(bucket_id)
return data
end
+if M.errinj.ERRINJ_RELOAD then
+ error('Error injection: reload')
+end
+
--
-- Collect content of ACTIVE bucket.
--
@@ -484,11 +508,11 @@ local function bucket_collect(bucket_id)
error('Usage: bucket_collect(bucket_id)')
end
- local status, err = bucket_check_state(bucket_id, 'read')
+ local status, err = M.bucket_check_state(bucket_id, 'read')
if not status then
return nil, err
end
- return bucket_collect_internal(bucket_id)
+ return M.bucket_collect_internal(bucket_id)
end
--
@@ -507,23 +531,23 @@ end
-- This function executes when a master role is removed from local
-- instance during configuration
--
-local function local_master_disable()
- self.on_master_disable:run()
+local function local_on_master_disable()
+ M._on_master_disable:run()
box.cfg({read_only = true})
log.verbose("Resigning from the replicaset master role...")
-- Stop garbage collecting
- if self.garbage_collect_fiber ~= nil then
- self.garbage_collect_fiber:cancel()
- self.garbage_collect_fiber = nil
+ if M.garbage_collect_fiber ~= nil then
+ M.garbage_collect_fiber:cancel()
+ M.garbage_collect_fiber = nil
log.info("GC stopped")
end
- if self.recovery_fiber ~= nil then
- self.recovery_fiber:cancel()
- self.recovery_fiber = nil
+ if M.recovery_fiber ~= nil then
+ M.recovery_fiber:cancel()
+ M.recovery_fiber = nil
log.info('Recovery stopped')
end
-- Wait until replicas are synchronized before one another become a new master
- sync(consts.SYNC_TIMEOUT)
+ M.sync(consts.SYNC_TIMEOUT)
log.info("Resigned from the replicaset master role")
end
@@ -533,15 +557,18 @@ local collect_garbage_f
-- This function executes whan a master role is added to local
-- instance during configuration
--
-local function local_master_enable()
+local function local_on_master_enable()
box.cfg({read_only = false})
- self.on_master_enable:run()
+ M._on_master_enable:run()
log.verbose("Taking on replicaset master role...")
- recovery_garbage_receiving_buckets()
+ M.recovery_garbage_receiving_buckets()
-- Start background process to collect garbage.
- self.garbage_collect_fiber = lfiber.create(collect_garbage_f)
+ M.garbage_collect_fiber =
+ lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
+ 'Garbage collector')
log.info("GC is started")
- self.recovery_fiber = lfiber.create(recovery_f)
+ M.recovery_fiber =
+ lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
log.info('Recovery is started')
-- TODO: check current status
log.info("Took on replicaset master role")
@@ -555,11 +582,11 @@ local function bucket_send(bucket_id, destination)
error('Usage: bucket_send(bucket_id, destination)')
end
- local status, err = bucket_check_state(bucket_id, 'write')
+ local status, err = M.bucket_check_state(bucket_id, 'write')
if not status then
return nil, err
end
- local replicaset = self.replicasets[destination]
+ local replicaset = M.replicasets[destination]
if replicaset == nil then
return nil, lerror.vshard(lerror.code.NO_SUCH_REPLICASET,
{replicaset_uuid = destination})
@@ -571,10 +598,10 @@ local function bucket_send(bucket_id, destination)
replicaset_uuid = replicaset_uuid})
end
- local data = bucket_collect_internal(bucket_id)
+ local data = M.bucket_collect_internal(bucket_id)
-- In a case of OOM or exception below the recovery fiber must
-- handle the 'sending' bucket.
- self.buckets_to_recovery[bucket_id] = true
+ M.buckets_to_recovery[bucket_id] = true
box.space._bucket:replace({bucket_id, consts.BUCKET.SENDING, destination})
local status, err =
@@ -584,12 +611,12 @@ local function bucket_send(bucket_id, destination)
if err.type == 'ShardingError' then
-- Rollback bucket state.
box.space._bucket:replace({bucket_id, consts.BUCKET.ACTIVE})
- self.buckets_to_recovery[bucket_id] = nil
+ M.buckets_to_recovery[bucket_id] = nil
end
return status, err
end
box.space._bucket:replace({bucket_id, consts.BUCKET.SENT, destination})
- self.buckets_to_recovery[bucket_id] = nil
+ M.buckets_to_recovery[bucket_id] = nil
return true
end
@@ -621,7 +648,7 @@ local function find_garbage_bucket(bucket_index, control)
t = box.space._bucket:get({bucket_id})
-- If a bucket is stored in _bucket and is garbage - the
-- result is found.
- if t == nil or bucket_is_garbage(t) then
+ if t == nil or M.bucket_is_garbage(t) then
return bucket_id
end
-- The found bucket is not garbage - continue search
@@ -629,8 +656,8 @@ local function find_garbage_bucket(bucket_index, control)
curr_bucket = bucket_id + 1
iterations = iterations + 1
if iterations % 1000 == 0 or
- self.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY then
- while self.ERRINJ_BUCKET_FIND_GARBAGE_DELAY do
+ M.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY then
+ while M.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY do
lfiber.sleep(0.1)
end
-- Do not occupy 100% CPU.
@@ -678,15 +705,15 @@ local function collect_garbage_step(control)
-- during garbage collection some buckets are removed and
-- their tuples are in new spaces, then bucket_generation is
-- incremented and such spaces are cleaned up on a next step.
- local sharded_spaces = find_sharded_spaces()
+ local sharded_spaces = M.find_sharded_spaces()
-- For each space:
-- 1) Find garbage bucket. If not found, go to a next space;
-- 2) Delete all its tuples;
-- 3) Go to 1.
for _, space in pairs(sharded_spaces) do
- local bucket_index = space.index[self.shard_index]
+ local bucket_index = space.index[M.shard_index]
while true do
- local garbage_bucket = find_garbage_bucket(bucket_index, control)
+ local garbage_bucket = M.find_garbage_bucket(bucket_index, control)
-- Stop the step, if a generation has changed.
if bucket_generation ~= control.bucket_generation then
log.info('Interrupt garbage collection step')
@@ -695,7 +722,8 @@ local function collect_garbage_step(control)
if garbage_bucket == nil then
break
end
- collect_garbage_bucket_in_space(space, bucket_index, garbage_bucket)
+ M.collect_garbage_bucket_in_space(space, bucket_index,
+ garbage_bucket)
if bucket_generation ~= control.bucket_generation then
log.info('Interrupt garbage collection step')
return
@@ -766,8 +794,8 @@ local function collect_garbage_update_bucket()
end
--
--- Background garbage collector. Works on masters. The garbage
--- collector wakeups once per GARBAGE_COLLECT_INTERVAL seconds.
+-- Garbage collector. Works on masters. The garbage collector
+-- wakes up once per GARBAGE_COLLECT_INTERVAL seconds.
-- After wakeup it checks follows the plan:
-- 1) Check if _bucket has changed. If not, then sleep again;
-- 2) Scan user spaces for not existing, sent and garbage buckets,
@@ -778,7 +806,12 @@ end
-- 5) Sleep, go to (1).
-- For each step detains see comments in code.
--
-function collect_garbage_f()
+-- @param module_version Module version, on which the current
+-- function had been started. If the actual module version
+-- appears to be changed, then stop GC. It is restarted
+-- in reloadable_fiber_f().
+--
+function collect_garbage_f(module_version)
lfiber.name('vshard.gc')
-- Collector controller. Changes of _bucket increments
-- bucket generation. Garbage collector has its own bucket
@@ -807,17 +840,22 @@ function collect_garbage_f()
-- for next deletion.
local empty_sent_buckets = {}
- while true do
-::continue::
+ while M.module_version == module_version do
-- Check if no changes in buckets configuration.
if control.bucket_generation_collected ~= control.bucket_generation then
- local status, err = pcall(collect_garbage_step, control)
+ local status, err = pcall(M.collect_garbage_step, control)
+ if M.module_version ~= module_version then
+ return
+ end
if not status then
log.error('Error during garbage collection step: %s', err)
lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
goto continue
end
- status, empty_sent_buckets = pcall(collect_garbage_update_bucket)
+ status, empty_sent_buckets = pcall(M.collect_garbage_update_bucket)
+ if M.module_version ~= module_version then
+ return
+ end
if not status then
log.error('Error during empty buckets processing: %s',
empty_sent_buckets)
@@ -828,8 +866,11 @@ function collect_garbage_f()
end
local duration = lfiber.time() - buckets_for_redirect_ts
if duration >= consts.BUCKET_SENT_GARBAGE_DELAY then
- local status, err = pcall(collect_garbage_drop_redirects,
+ local status, err = pcall(M.collect_garbage_drop_redirects,
buckets_for_redirect)
+ if M.module_version ~= module_version then
+ return
+ end
if not status then
log.error('Error during deletion of empty sent buckets: %s',
err)
@@ -840,6 +881,7 @@ function collect_garbage_f()
end
end
lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
+::continue::
end
end
@@ -857,14 +899,14 @@ local function bucket_delete_garbage(bucket_id, opts)
end
opts = opts or {}
local bucket = box.space._bucket:get({bucket_id})
- if bucket ~= nil and not bucket_is_garbage(bucket) and not opts.force then
+ if bucket ~= nil and not M.bucket_is_garbage(bucket) and not opts.force then
error('Can not delete not garbage bucket. Use "{force=true}" to '..
'ignore this attention')
end
- local sharded_spaces = find_sharded_spaces()
- local idx = self.shard_index
+ local sharded_spaces = M.find_sharded_spaces()
+ local idx = M.shard_index
for _, space in pairs(sharded_spaces) do
- collect_garbage_bucket_in_space(space, space.index[idx], bucket_id)
+ M.collect_garbage_bucket_in_space(space, space.index[idx], bucket_id)
end
end
@@ -972,8 +1014,8 @@ local function rebalancer_build_routes(replicasets)
local bucket_routes = {}
for uuid, replicaset in pairs(replicasets) do
if replicaset.needed > 0 then
- rebalancer_take_buckets_from_pool(bucket_pool, bucket_routes, uuid,
- replicaset.needed)
+ M.rebalancer_take_buckets_from_pool(bucket_pool, bucket_routes,
+ uuid, replicaset.needed)
end
end
return bucket_routes
@@ -992,15 +1034,15 @@ local function rebalancer_apply_routes_f(routes)
-- var = fiber.create(), because when it yields, we have no
-- guarantee that an event loop does not contain events
-- between this fiber and its creator.
- self.rebalancer_applier_fiber = lfiber.self()
+ M.rebalancer_applier_fiber = lfiber.self()
local active_buckets = _status:select{consts.BUCKET.ACTIVE}
local i = 1
for dst_uuid, bucket_count in pairs(routes) do
assert(i + bucket_count - 1 <= #active_buckets)
log.info('Send %d buckets to %s', bucket_count,
- self.replicasets[dst_uuid])
+ M.replicasets[dst_uuid])
for j = i, i + bucket_count - 1 do
- local status, ret = pcall(bucket_send, active_buckets[j].id,
+ local status, ret = pcall(M.bucket_send, active_buckets[j].id,
dst_uuid)
if not status or ret ~= true then
if not status then
@@ -1023,7 +1065,7 @@ end
-- fiber.
--
local function rebalancing_is_in_progress()
- local f = self.rebalancer_applier_fiber
+ local f = M.rebalancer_applier_fiber
return f ~= nil and f:status() ~= 'dead'
end
@@ -1034,10 +1076,10 @@ end
-- }. Is used by a rebalancer.
--
local function rebalancer_apply_routes(routes)
- assert(not rebalancing_is_in_progress())
+ assert(not M.rebalancing_is_in_progress())
-- Can not apply routes here because of gh-946 in tarantool
-- about problems with long polling. Apply routes in a fiber.
- lfiber.create(rebalancer_apply_routes_f, routes)
+ lfiber.create(M.rebalancer_apply_routes_f, routes)
return true
end
@@ -1051,7 +1093,7 @@ end
local function rebalancer_download_states()
local replicasets = {}
local total_bucket_active_count = 0
- for uuid, replicaset in pairs(self.replicasets) do
+ for uuid, replicaset in pairs(M.replicasets) do
local bucket_active_count =
replicaset:callrw('vshard.storage.rebalancer_request_state', {})
if bucket_active_count == nil then
@@ -1064,12 +1106,12 @@ local function rebalancer_download_states()
ethalon_bucket_count =
replicaset.ethalon_bucket_count}
end
- if total_bucket_active_count == total_bucket_count then
+ if total_bucket_active_count == M.total_bucket_count then
return replicasets
else
log.info('Total active bucket count is not equal to total. '..
'Possibly a boostrap is not finished yet. Expected %d, but '..
- 'found %d', total_bucket_count, total_bucket_active_count)
+ 'found %d', M.total_bucket_count, total_bucket_active_count)
end
end
@@ -1077,15 +1119,17 @@ end
-- Background rebalancer. Works on a storage which has the
-- smallest replicaset uuid and which is master.
--
-local function rebalancer_f()
+local function rebalancer_f(module_version)
lfiber.name('vshard.rebalancer')
- while true do
-::continue::
- while not self.is_rebalancer_active do
+ while module_version == M.module_version do
+ while not M.is_rebalancer_active do
log.info('Rebalancer is disabled. Sleep')
lfiber.sleep(consts.REBALANCER_IDLE_INTERVAL)
end
- local status, replicasets = pcall(rebalancer_download_states)
+ local status, replicasets = pcall(M.rebalancer_download_states)
+ if M.module_version ~= module_version then
+ return
+ end
if not status or replicasets == nil then
if not status then
log.error('Error during downloading rebalancer states: %s',
@@ -1096,14 +1140,15 @@ local function rebalancer_f()
goto continue
end
local max_disbalance =
- rebalancer_calculate_metrics(replicasets, rebalancer_max_receiving)
- if max_disbalance <= rebalancer_disbalance_threshold then
+ M.rebalancer_calculate_metrics(replicasets,
+ M.rebalancer_max_receiving)
+ if max_disbalance <= M.rebalancer_disbalance_threshold then
log.info('The cluster is balanced ok. Schedule next rebalancing '..
'after %f seconds', consts.REBALANCER_IDLE_INTERVAL)
lfiber.sleep(consts.REBALANCER_IDLE_INTERVAL)
goto continue
end
- local routes = rebalancer_build_routes(replicasets)
+ local routes = M.rebalancer_build_routes(replicasets)
-- Routes table can not be empty. If it had been empty,
-- then max_disbalance would have been calculated
-- incorrectly.
@@ -1114,7 +1159,7 @@ local function rebalancer_f()
end
assert(not is_empty)
for src_uuid, src_routes in pairs(routes) do
- local rs = self.replicasets[src_uuid]
+ local rs = M.replicasets[src_uuid]
local status, err =
rs:callrw('vshard.storage.rebalancer_apply_routes',
{src_routes})
@@ -1128,6 +1173,7 @@ local function rebalancer_f()
log.info('Rebalance routes are sent. Schedule next wakeup after '..
'%f seconds', consts.REBALANCER_WORK_INTERVAL)
lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
+::continue::
end
end
@@ -1138,7 +1184,7 @@ end
-- @retval nil Not SENT or not ACTIVE buckets were found.
--
local function rebalancer_request_state()
- if not self.is_rebalancer_active or rebalancing_is_in_progress() then
+ if not M.is_rebalancer_active or M.rebalancing_is_in_progress() then
return
end
local _bucket = box.space._bucket
@@ -1161,8 +1207,8 @@ end
-- node.
--
local function rebalancer_wakeup()
- if self.rebalancer_fiber ~= nil then
- self.rebalancer_fiber:wakeup()
+ if M.rebalancer_fiber ~= nil then
+ M.rebalancer_fiber:wakeup()
end
end
@@ -1172,10 +1218,10 @@ end
-- not sends its state to rebalancer.
--
local function rebalancer_disable()
- self.is_rebalancer_active = false
+ M.is_rebalancer_active = false
end
local function rebalancer_enable()
- self.is_rebalancer_active = true
+ M.is_rebalancer_active = true
end
--------------------------------------------------------------------------------
@@ -1196,7 +1242,7 @@ local function storage_call(bucket_id, mode, name, args)
error('Unknown mode: '..tostring(mode))
end
- local ok, err = bucket_check_state(bucket_id, mode)
+ local ok, err = M.bucket_check_state(bucket_id, mode)
if not ok then
return nil, err
end
@@ -1216,15 +1262,15 @@ local function storage_cfg(cfg, this_replica_uuid)
if cfg.weights or cfg.zone then
error('Weights and zone are not allowed for storage configuration')
end
- local old_replicasets = self.replicasets
+ local old_replicasets = M.replicasets
if old_replicasets then
log.info("Starting reconfiguration of replica %s", this_replica_uuid)
else
log.info("Starting configuration of replica %s", this_replica_uuid)
end
- local was_master = self.this_replicaset ~= nil and
- self.this_replicaset.master == self.this_replica
+ local was_master = M.this_replicaset ~= nil and
+ M.this_replicaset.master == M.this_replica
local this_replicaset
local this_replica
@@ -1256,8 +1302,8 @@ local function storage_cfg(cfg, this_replica_uuid)
-- disabled and there are triggers on master disable, then
-- they would not be able to modify anything, if 'read_only'
-- had been set here. 'Read_only' is set in
- -- local_master_disable after triggers and is unset in
- -- local_master_enable before triggers.
+ -- local_on_master_disable after triggers and is unset in
+ -- local_on_master_enable before triggers.
--
-- If a master role of the replica is not changed, then
-- 'read_only' can be set right here.
@@ -1272,44 +1318,45 @@ local function storage_cfg(cfg, this_replica_uuid)
end
cfg.instance_uuid = this_replica.uuid
cfg.replicaset_uuid = this_replicaset.uuid
- total_bucket_count = cfg.bucket_count or consts.DEFAULT_BUCKET_COUNT
- rebalancer_disbalance_threshold =
+ M.total_bucket_count = cfg.bucket_count or consts.DEFAULT_BUCKET_COUNT
+ M.rebalancer_disbalance_threshold =
cfg.rebalancer_disbalance_threshold or
consts.DEFAULT_REBALANCER_DISBALANCE_THRESHOLD
- rebalancer_max_receiving = cfg.rebalancer_max_receiving or
- consts.DEFAULT_REBALANCER_MAX_RECEIVING
- self.shard_index = cfg.shard_index or 'bucket_id'
+ M.rebalancer_max_receiving = cfg.rebalancer_max_receiving or
+ consts.DEFAULT_REBALANCER_MAX_RECEIVING
+ M.shard_index = cfg.shard_index or 'bucket_id'
lcfg.prepare_for_box_cfg(cfg)
box.cfg(cfg)
log.info("Box has been configured")
- self.replicasets = new_replicasets
- self.this_replicaset = this_replicaset
- self.this_replica = this_replica
+ M.replicasets = new_replicasets
+ M.this_replicaset = this_replicaset
+ M.this_replica = this_replica
local uri = luri.parse(this_replica.uri)
box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
if was_master and not is_master then
- local_master_disable()
+ M.local_on_master_disable()
end
if not was_master and is_master then
- local_master_enable()
+ M.local_on_master_enable()
end
if min_master == this_replica then
- if not self.rebalancer_fiber then
+ if not M.rebalancer_fiber then
log.info('Run rebalancer')
- self.rebalancer_fiber = lfiber.create(rebalancer_f)
+ M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
+ 'rebalancer_f', 'Rebalancer')
else
log.info('Wakeup rebalancer')
-- Configuration had changed. Time to rebalance.
- self.rebalancer_fiber:wakeup()
+ M.rebalancer_fiber:wakeup()
end
- elseif self.rebalancer_fiber then
+ elseif M.rebalancer_fiber then
log.info('Rebalancer location has changed to %s', min_master)
- self.rebalancer_fiber:cancel()
- self.rebalancer_fiber = nil
+ M.rebalancer_fiber:cancel()
+ M.rebalancer_fiber = nil
end
if old_replicasets then
lreplicaset.destroy(old_replicasets)
@@ -1347,13 +1394,13 @@ local function storage_info()
}
local code = lerror.code
local alert = lerror.alert
- local this_uuid = self.this_replicaset.uuid
- local this_master = self.this_replicaset.master
+ local this_uuid = M.this_replicaset.uuid
+ local this_master = M.this_replicaset.master
if this_master == nil then
table.insert(state.alerts, alert(code.MISSING_MASTER, this_uuid))
state.status = math.max(state.status, consts.STATUS.ORANGE)
end
- if this_master and this_master ~= self.this_replica then
+ if this_master and this_master ~= M.this_replica then
for id, replica in pairs(box.info.replication) do
if replica.uuid ~= this_master.uuid then
goto cont
@@ -1393,7 +1440,7 @@ local function storage_info()
state.replication.status = 'master'
local redundancy = 0
for id, replica in pairs(box.info.replication) do
- if replica.uuid ~= self.this_replica.uuid then
+ if replica.uuid ~= M.this_replica.uuid then
if replica.downstream == nil then
table.insert(state.alerts, alert(code.UNREACHABLE_REPLICA,
replica.uuid))
@@ -1430,7 +1477,7 @@ local function storage_info()
end
local ireplicasets = {}
- for uuid, replicaset in pairs(self.replicasets) do
+ for uuid, replicaset in pairs(M.replicasets) do
local master = replicaset.master
if not master then
ireplicasets[uuid] = {uuid = uuid, master = 'missing'}
@@ -1453,14 +1500,44 @@ end
--------------------------------------------------------------------------------
-- Module definition
--------------------------------------------------------------------------------
-
-self.find_sharded_spaces = find_sharded_spaces
-self.find_garbage_bucket = find_garbage_bucket
-self.collect_garbage_step = collect_garbage_step
-self.collect_garbage_f = collect_garbage_f
-
-self.rebalancer_build_routes = rebalancer_build_routes
-self.rebalancer_calculate_metrics = rebalancer_calculate_metrics
+--
+-- Put here all functions that are used inside vshard.storage for
+-- atomic reload. If the reload fails above, the interpreter does
+-- not reach this point, and M is not changed.
+-- If a function is used only outside, there is no need to store
+-- it in M. It is reloaded atomically due to the return below.
+--
+M.this_is_master = this_is_master
+M.recovery_step = recovery_step
+M.recovery_garbage_receiving_buckets = recovery_garbage_receiving_buckets
+M.recovery_f = recovery_f
+M.sync = sync
+M.bucket_is_garbage = bucket_is_garbage
+M.bucket_check_state = bucket_check_state
+M.find_sharded_spaces = find_sharded_spaces
+M.bucket_collect_internal = bucket_collect_internal
+M.local_on_master_disable = local_on_master_disable
+M.local_on_master_enable = local_on_master_enable
+M.bucket_send = bucket_send
+M.find_garbage_bucket = find_garbage_bucket
+M.collect_garbage_bucket_in_space = collect_garbage_bucket_in_space
+M.collect_garbage_step = collect_garbage_step
+M.collect_garbage_drop_redirects = collect_garbage_drop_redirects
+M.collect_garbage_update_bucket = collect_garbage_update_bucket
+M.collect_garbage_f = collect_garbage_f
+M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
+M.rebalancer_take_buckets_from_pool = rebalancer_take_buckets_from_pool
+M.rebalancer_build_routes = rebalancer_build_routes
+M.rebalancer_apply_routes_f = rebalancer_apply_routes_f
+M.rebalancing_is_in_progress = rebalancing_is_in_progress
+M.rebalancer_download_states = rebalancer_download_states
+M.rebalancer_f = rebalancer_f
+
+if not rawget(_G, '__module_vshard_storage') then
+ rawset(_G, '__module_vshard_storage', M)
+else
+ M.module_version = M.module_version + 1
+end
return {
sync = sync;
@@ -1483,7 +1560,8 @@ return {
buckets_count = storage_buckets_count;
buckets_discovery = buckets_discovery;
rebalancer_request_state = rebalancer_request_state;
- internal = self;
+ internal = M;
on_master_enable = on_master_enable;
on_master_disable = on_master_disable;
+ module_version = function() return M.module_version end;
}
diff --git a/vshard/util.lua b/vshard/util.lua
index 9b56a78..f2feec1 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -1,4 +1,5 @@
 -- vshard.util
+-- Use a local to avoid leaking 'log' into the global environment.
+local log = require('log')
@@ -1,4 +1,6 @@
-- vshard.util
+local log = require('log')
--
-- Extract parts of a tuple.
@@ -16,6 +17,35 @@ local function tuple_extract_key(tuple, parts)
return key
end
+--
+-- Wrapper to run @a func in an infinite loop and restart it on
+-- module reload. This function CAN NOT BE AUTORELOADED. To update
+-- it you must manually stop all fibers run by this function, do
+-- the reload, and then restart all stopped fibers. This can be
+-- done, for example, by calling vshard.storage/router.cfg() again
+-- with the same config as earlier.
+--
+-- @param func Reloadable function to run. It must accept current
+-- module version as an argument, and interrupt itself,
+-- when it is changed.
+-- @param worker_name Name of the worker. Usually an infinite
+-- fiber represents a background subsystem, which has a name. For
+-- example: "Garbage Collector", "Recovery", "Discovery",
+-- "Rebalancer".
+-- @param M Module which can reload.
+--
+local function reloadable_fiber_f(M, func_name, worker_name)
+ while true do
+ local ok, err = pcall(M[func_name], M.module_version)
+ if not ok then
+ log.error('%s has been failed: %s', worker_name, err)
+ else
+ log.info('%s has been reloaded', worker_name)
+ end
+ end
+end
+
return {
- tuple_extract_key = tuple_extract_key
+ tuple_extract_key = tuple_extract_key,
+ reloadable_fiber_f = reloadable_fiber_f,
}
--
2.14.3 (Apple Git-98)
More information about the Tarantool-patches
mailing list