[patches] [PATCH vshard 2/2] storage: allow storage reload

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Feb 20 01:45:47 MSK 2018


Part of #72

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
---
 test/storage/reload.result   | 190 ++++++++++++++++++++
 test/storage/reload.test.lua |  93 ++++++++++
 test/unit/garbage.result     |   2 +-
 test/unit/garbage.test.lua   |   2 +-
 vshard/storage/init.lua      | 412 +++++++++++++++++++++++++------------------
 vshard/util.lua              |  32 +++-
 6 files changed, 561 insertions(+), 170 deletions(-)
 create mode 100644 test/storage/reload.result
 create mode 100644 test/storage/reload.test.lua

diff --git a/test/storage/reload.result b/test/storage/reload.result
new file mode 100644
index 0000000..ee37383
--- /dev/null
+++ b/test/storage/reload.result
@@ -0,0 +1,190 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+---
+...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'storage')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'storage')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+---
+...
+test_run:switch('storage_1_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1, vshard.consts.DEFAULT_BUCKET_COUNT / 2)
+---
+- true
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+vshard.storage.bucket_force_create(vshard.consts.DEFAULT_BUCKET_COUNT / 2 + 1, vshard.consts.DEFAULT_BUCKET_COUNT / 2)
+---
+- true
+...
+--
+-- Gh-72: allow reload. Test simple reload, error during
+-- reloading, ensure the fibers are restarted on reload.
+--
+assert(rawget(_G, '__module_vshard_storage') ~= nil)
+---
+- true
+...
+vshard.storage.module_version()
+---
+- 0
+...
+while test_run:grep_log('storage_2_a', 'The cluster is balanced ok') == nil do vshard.storage.rebalancer_wakeup() fiber.sleep(0.1) end
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function check_reloaded()
+	for k, v in pairs(old_internal) do
+		if v == vshard.storage.internal[k] then
+			return k
+		end
+	end
+end;
+---
+...
+function check_not_reloaded()
+	for k, v in pairs(old_internal) do
+		if v ~= vshard.storage.internal[k] then
+			return k
+		end
+	end
+end;
+---
+...
+function copy_functions(t)
+	local ret = {}
+	for k, v in pairs(t) do
+		if type(v) == 'function' then
+			ret[k] = v
+		end
+	end
+	return ret
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+--
+-- Simple reload. All functions are reloaded and they have
+-- another points in vshard.storage.internal.
+--
+old_internal = copy_functions(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+vshard.storage.module_version()
+---
+- 1
+...
+check_reloaded()
+---
+...
+while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
+---
+...
+while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+---
+...
+while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+---
+...
+check_reloaded()
+---
+...
+--
+-- Error during reload - in such a case no function can be
+-- updated. Reload is atomic.
+--
+vshard.storage.internal.errinj.ERRINJ_RELOAD = true
+---
+...
+old_internal = copy_functions(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+util = require('util')
+---
+...
+util.check_error(require, 'vshard.storage')
+---
+- 'Error injection: reload'
+...
+check_not_reloaded()
+---
+...
+vshard.storage.module_version()
+---
+- 1
+...
+--
+-- A next reload is ok, and all functions are updated.
+--
+vshard.storage.internal.errinj.ERRINJ_RELOAD = false
+---
+...
+old_internal = copy_functions(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+vshard.storage.module_version()
+---
+- 2
+...
+check_reloaded()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
+test_run:cmd('clear filter')
+---
+- true
+...
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
new file mode 100644
index 0000000..50a402c
--- /dev/null
+++ b/test/storage/reload.test.lua
@@ -0,0 +1,93 @@
+test_run = require('test_run').new()
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+
+test_run:switch('storage_1_a')
+vshard.storage.bucket_force_create(1, vshard.consts.DEFAULT_BUCKET_COUNT / 2)
+test_run:switch('storage_2_a')
+fiber = require('fiber')
+vshard.storage.bucket_force_create(vshard.consts.DEFAULT_BUCKET_COUNT / 2 + 1, vshard.consts.DEFAULT_BUCKET_COUNT / 2)
+
+--
+-- Gh-72: allow reload. Test simple reload, error during
+-- reloading, ensure the fibers are restarted on reload.
+--
+
+assert(rawget(_G, '__module_vshard_storage') ~= nil)
+
+vshard.storage.module_version()
+while test_run:grep_log('storage_2_a', 'The cluster is balanced ok') == nil do vshard.storage.rebalancer_wakeup() fiber.sleep(0.1) end
+test_run:cmd("setopt delimiter ';'")
+function check_reloaded()
+	for k, v in pairs(old_internal) do
+		if v == vshard.storage.internal[k] then
+			return k
+		end
+	end
+end;
+function check_not_reloaded()
+	for k, v in pairs(old_internal) do
+		if v ~= vshard.storage.internal[k] then
+			return k
+		end
+	end
+end;
+function copy_functions(t)
+	local ret = {}
+	for k, v in pairs(t) do
+		if type(v) == 'function' then
+			ret[k] = v
+		end
+	end
+	return ret
+end;
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Simple reload. All functions are reloaded and they have
+-- another points in vshard.storage.internal.
+--
+old_internal = copy_functions(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+vshard.storage.module_version()
+
+check_reloaded()
+
+while test_run:grep_log('storage_2_a', 'Garbage collector has been reloaded') == nil do fiber.sleep(0.1) end
+while test_run:grep_log('storage_2_a', 'Recovery has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.recovery_wakeup() end
+while test_run:grep_log('storage_2_a', 'Rebalancer has been reloaded') == nil do fiber.sleep(0.1) vshard.storage.rebalancer_wakeup() end
+
+check_reloaded()
+
+--
+-- Error during reload - in such a case no function can be
+-- updated. Reload is atomic.
+--
+vshard.storage.internal.errinj.ERRINJ_RELOAD = true
+old_internal = copy_functions(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+util = require('util')
+util.check_error(require, 'vshard.storage')
+check_not_reloaded()
+vshard.storage.module_version()
+
+--
+-- A next reload is ok, and all functions are updated.
+--
+vshard.storage.internal.errinj.ERRINJ_RELOAD = false
+old_internal = copy_functions(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+vshard.storage.module_version()
+check_reloaded()
+
+test_run:switch('default')
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
+test_run:cmd('clear filter')
diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 3474aad..14e5721 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -317,7 +317,7 @@ control.bucket_generation_collected
 collect_f = vshard.storage.internal.collect_garbage_f
 ---
 ...
-f = fiber.create(collect_f)
+f = fiber.create(collect_f, vshard.storage.module_version())
 ---
 ...
 fill_spaces_with_garbage()
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index af80983..3db44f8 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -136,7 +136,7 @@ control.bucket_generation_collected
 -- Test continuous garbage collection via background fiber.
 --
 collect_f = vshard.storage.internal.collect_garbage_f
-f = fiber.create(collect_f)
+f = fiber.create(collect_f, vshard.storage.module_version())
 fill_spaces_with_garbage()
 -- Wait until garbage collection is finished.
 while #s2:select{} ~= 2 do fiber.sleep(0.1) end
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 3182ddc..25ee449 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -9,47 +9,60 @@ local lcfg = require('vshard.cfg')
 local lreplicaset = require('vshard.replicaset')
 local trigger = require('internal.trigger')
 
-local total_bucket_count = 0
-local rebalancer_disbalance_threshold = 0
-local rebalancer_max_receiving = 0
-
--- Internal state
-local self = {
+local M = rawget(_G, '__module_vshard_storage')
+if not M then
     --
-    -- All known replicasets used for bucket re-balancing.
-    -- See format in replicaset.lua.
+    -- The module is loaded for the first time.
     --
-    replicasets = nil,
-    -- Fiber to remove garbage buckets data.
-    garbage_collect_fiber = nil,
-    -- Bucket identifiers which are not active and are not being
-    -- sent - their status is unknown. Their state must be checked
-    -- periodically in recovery fiber.
-    buckets_to_recovery = {},
-    buckets_to_recovery_count = 0,
-    recovery_fiber = nil,
-    -- Fiber to rebalance a cluster.
-    rebalancer_fiber = nil,
-    -- Fiber which applies routes one by one. Its presense and
-    -- active status means that the rebalancing is in progress
-    -- now on the current node.
-    rebalancer_applier_fiber = nil,
-    -- Internal flag to activate and deactivate rebalancer. Mostly
-    -- for tests.
-    is_rebalancer_active = true,
-    -- Triggers on master switch event. They are called right
-    -- before the event occurs.
-    on_master_enable = trigger.new('on_master_enable'),
-    on_master_disable = trigger.new('on_master_disable'),
-    -- Index which is a trigger to shard its space by numbers in
-    -- this index. It must have at first part either unsigned,
-    -- or integer or number type and be not nullable. Values in
-    -- this part are considered as bucket identifiers.
-    shard_index = nil,
-    errinj = {
-        ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
+    M = {
+        --
+        -- All known replicasets used for bucket re-balancing.
+        -- See format in replicaset.lua.
+        --
+        replicasets = nil,
+        -- Fiber to remove garbage buckets data.
+        garbage_collect_fiber = nil,
+        -- Bucket identifiers which are not active and are not being
+        -- sent - their status is unknown. Their state must be checked
+        -- periodically in recovery fiber.
+        buckets_to_recovery = {},
+        buckets_to_recovery_count = 0,
+        recovery_fiber = nil,
+        -- Fiber to rebalance a cluster.
+        rebalancer_fiber = nil,
+        -- Fiber which applies routes one by one. Its presence and
+        -- active status means that the rebalancing is in progress
+        -- now on the current node.
+        rebalancer_applier_fiber = nil,
+        -- Internal flag to activate and deactivate rebalancer. Mostly
+        -- for tests.
+        is_rebalancer_active = true,
+        -- Triggers on master switch event. They are called right
+        -- before the event occurs.
+        _on_master_enable = trigger.new('_on_master_enable'),
+        _on_master_disable = trigger.new('_on_master_disable'),
+        -- Index which is a trigger to shard its space by numbers in
+        -- this index. It must have at first part either unsigned,
+        -- or integer or number type and be not nullable. Values in
+        -- this part are considered as bucket identifiers.
+        shard_index = nil,
+        -- Bucket count stored on all replicasets.
+        total_bucket_count = 0,
+        -- Maximal allowed percentage deviation of the bucket count on
+        -- a replicaset from its ethalon (reference) bucket count.
+        rebalancer_disbalance_threshold = 0,
+        -- Maximal bucket count that can be received by a single
+        -- replicaset simultaneously.
+        rebalancer_max_receiving = 0,
+        errinj = {
+            ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
+            ERRINJ_RELOAD = false,
+        },
+        -- This counter is used to restart background fibers with
+        -- new reloaded code.
+        module_version = 0,
     }
-}
+end
 
 --------------------------------------------------------------------------------
 -- Schema
@@ -94,24 +107,24 @@ local function storage_schema_v1(username, password)
 end
 
 local function this_is_master()
-    return self.this_replicaset and self.this_replicaset.master and
-           self.this_replica == self.this_replicaset.master
+    return M.this_replicaset and M.this_replicaset.master and
+           M.this_replica == M.this_replicaset.master
 end
 
 local function on_master_disable(...)
-    self.on_master_disable(...)
+    M._on_master_disable(...)
     -- If a trigger is set after storage.cfg(), then notify an
     -- user, that the current instance is not master.
-    if #{...} == 1 and not this_is_master() then
-        self.on_master_disable:run()
+    if #{...} == 1 and not M.this_is_master() then
+        M._on_master_disable:run()
     end
 end
 
 local function on_master_enable(...)
-    self.on_master_enable(...)
+    M._on_master_enable(...)
     -- Same as above, but notify, that the instance is master.
-    if #{...} == 1 and this_is_master() then
-        self.on_master_enable:run()
+    if #{...} == 1 and M.this_is_master() then
+        M._on_master_enable:run()
     end
 end
 
@@ -137,7 +150,7 @@ local function recovery_step()
     local _bucket = box.space._bucket
     local new_count = 0
     local is_empty = true
-    for bucket_id, _ in pairs(self.buckets_to_recovery) do
+    for bucket_id, _ in pairs(M.buckets_to_recovery) do
         if is_empty then
             log.info('Starting buckets recovery step')
         end
@@ -151,7 +164,7 @@ local function recovery_step()
             table.insert(recovered, bucket_id)
             goto continue
         end
-        local destination = self.replicasets[bucket.destination]
+        local destination = M.replicasets[bucket.destination]
         if not destination or not destination.master then
             -- No replicaset master for a bucket. Wait until it
             -- appears.
@@ -198,9 +211,9 @@ local function recovery_step()
         box.commit()
     end
     for _, id in pairs(recovered) do
-        self.buckets_to_recovery[id] = nil
+        M.buckets_to_recovery[id] = nil
     end
-    self.buckets_to_recovery_count = new_count
+    M.buckets_to_recovery_count = new_count
 end
 
 --
@@ -222,19 +235,26 @@ local function recovery_garbage_receiving_buckets()
 end
 
 --
--- Background fiber to resolve status of buckets, whose 'sending'
--- has failed due to tarantool or network problems.
+-- Infinite function to resolve status of buckets, whose 'sending'
+-- has failed due to tarantool or network problems. Restarts on
+-- reload.
+-- @param module_version Module version with which the current
+--        function was started. If the actual module version
+--        turns out to have changed, then stop recovery. It is
+--        restarted in reloadable_fiber_f().
 --
-local function recovery_f()
+local function recovery_f(module_version)
     lfiber.name('vshard.recovery')
     local _bucket = box.space._bucket
     local sending_buckets = _bucket.index.status:select{consts.BUCKET.SENDING}
-    self.buckets_to_recovery = {}
+    M.buckets_to_recovery = {}
     for _, bucket in pairs(sending_buckets) do
-        self.buckets_to_recovery[bucket.id] = true
+        M.buckets_to_recovery[bucket.id] = true
     end
-    while true do
-        local ok, err = pcall(recovery_step)
+    -- Interrupt recovery if the module has been reloaded. Perhaps
+    -- a bug was found, and the reload fixes it.
+    while module_version == M.module_version do
+        local ok, err = pcall(M.recovery_step)
         if not ok then
             log.error('Error during buckets recovery: %s', err)
         end
@@ -246,8 +266,8 @@ end
 -- Immediately wakeup recovery fiber, if exists.
 --
 local function recovery_wakeup()
-    if self.recovery_fiber then
-        self.recovery_fiber:wakeup()
+    if M.recovery_fiber then
+        M.recovery_fiber:wakeup()
     end
 end
 
@@ -322,12 +342,12 @@ local function bucket_check_state(bucket_id, mode)
     assert(mode == 'read' or mode == 'write')
     local bucket = box.space._bucket:get({bucket_id})
     local errcode = nil
-    if bucket == nil or bucket_is_garbage(bucket) then
+    if bucket == nil or M.bucket_is_garbage(bucket) then
         errcode = lerror.code.WRONG_BUCKET
     elseif (bucket.status == consts.BUCKET.SENDING and mode ~= 'read') then
         errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
     elseif bucket.status == consts.BUCKET.ACTIVE and mode ~= 'read' and
-           self.this_replicaset.master ~= self.this_replica then
+           M.this_replicaset.master ~= M.this_replica then
         errcode = lerror.code.NON_MASTER
     end
     if errcode ~= nil then
@@ -350,7 +370,7 @@ local function bucket_stat(bucket_id)
     end
     local bucket = box.space._bucket:get({bucket_id})
 
-    if not bucket or bucket_is_garbage(bucket) then
+    if not bucket or M.bucket_is_garbage(bucket) then
         return nil, lerror.vshard(lerror.code.WRONG_BUCKET,
                                   {bucket_id = bucket_id})
     else
@@ -446,7 +466,7 @@ end
 --
 local function find_sharded_spaces()
     local spaces = {}
-    local idx = self.shard_index
+    local idx = M.shard_index
     for k, space in pairs(box.space) do
         if type(k) == 'number' and space.index[idx] ~= nil then
             local parts = space.index[idx].parts
@@ -466,8 +486,8 @@ end
 --
 local function bucket_collect_internal(bucket_id)
     local data = {}
-    local spaces = find_sharded_spaces()
-    local idx = self.shard_index
+    local spaces = M.find_sharded_spaces()
+    local idx = M.shard_index
     for k, space in pairs(spaces) do
         assert(space.index[idx] ~= nil)
         local space_data = space.index[idx]:select({bucket_id})
@@ -476,6 +496,10 @@ local function bucket_collect_internal(bucket_id)
     return data
 end
 
+if M.errinj.ERRINJ_RELOAD then
+    error('Error injection: reload')
+end
+
 --
 -- Collect content of ACTIVE bucket.
 --
@@ -484,11 +508,11 @@ local function bucket_collect(bucket_id)
         error('Usage: bucket_collect(bucket_id)')
     end
 
-    local status, err = bucket_check_state(bucket_id, 'read')
+    local status, err = M.bucket_check_state(bucket_id, 'read')
     if not status then
         return nil, err
     end
-    return bucket_collect_internal(bucket_id)
+    return M.bucket_collect_internal(bucket_id)
 end
 
 --
@@ -507,23 +531,23 @@ end
 -- This function executes when a master role is removed from local
 -- instance during configuration
 --
-local function local_master_disable()
-    self.on_master_disable:run()
+local function local_on_master_disable()
+    M._on_master_disable:run()
     box.cfg({read_only = true})
     log.verbose("Resigning from the replicaset master role...")
     -- Stop garbage collecting
-    if self.garbage_collect_fiber ~= nil then
-        self.garbage_collect_fiber:cancel()
-        self.garbage_collect_fiber = nil
+    if M.garbage_collect_fiber ~= nil then
+        M.garbage_collect_fiber:cancel()
+        M.garbage_collect_fiber = nil
         log.info("GC stopped")
     end
-    if self.recovery_fiber ~= nil then
-        self.recovery_fiber:cancel()
-        self.recovery_fiber = nil
+    if M.recovery_fiber ~= nil then
+        M.recovery_fiber:cancel()
+        M.recovery_fiber = nil
         log.info('Recovery stopped')
     end
     -- Wait until replicas are synchronized before one another become a new master
-    sync(consts.SYNC_TIMEOUT)
+    M.sync(consts.SYNC_TIMEOUT)
     log.info("Resigned from the replicaset master role")
 end
 
@@ -533,15 +557,18 @@ local collect_garbage_f
 -- This function executes whan a master role is added to local
 -- instance during configuration
 --
-local function local_master_enable()
+local function local_on_master_enable()
     box.cfg({read_only = false})
-    self.on_master_enable:run()
+    M._on_master_enable:run()
     log.verbose("Taking on replicaset master role...")
-    recovery_garbage_receiving_buckets()
+    M.recovery_garbage_receiving_buckets()
     -- Start background process to collect garbage.
-    self.garbage_collect_fiber = lfiber.create(collect_garbage_f)
+    M.garbage_collect_fiber =
+        lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
+                      'Garbage collector')
     log.info("GC is started")
-    self.recovery_fiber = lfiber.create(recovery_f)
+    M.recovery_fiber =
+        lfiber.create(util.reloadable_fiber_f, M, 'recovery_f', 'Recovery')
     log.info('Recovery is started')
     -- TODO: check current status
     log.info("Took on replicaset master role")
@@ -555,11 +582,11 @@ local function bucket_send(bucket_id, destination)
         error('Usage: bucket_send(bucket_id, destination)')
     end
 
-    local status, err = bucket_check_state(bucket_id, 'write')
+    local status, err = M.bucket_check_state(bucket_id, 'write')
     if not status then
         return nil, err
     end
-    local replicaset = self.replicasets[destination]
+    local replicaset = M.replicasets[destination]
     if replicaset == nil then
         return nil, lerror.vshard(lerror.code.NO_SUCH_REPLICASET,
                                   {replicaset_uuid = destination})
@@ -571,10 +598,10 @@ local function bucket_send(bucket_id, destination)
                                    replicaset_uuid = replicaset_uuid})
     end
 
-    local data = bucket_collect_internal(bucket_id)
+    local data = M.bucket_collect_internal(bucket_id)
     -- In a case of OOM or exception below the recovery fiber must
     -- handle the 'sending' bucket.
-    self.buckets_to_recovery[bucket_id] = true
+    M.buckets_to_recovery[bucket_id] = true
     box.space._bucket:replace({bucket_id, consts.BUCKET.SENDING, destination})
 
     local status, err =
@@ -584,12 +611,12 @@ local function bucket_send(bucket_id, destination)
         if err.type == 'ShardingError' then
             -- Rollback bucket state.
             box.space._bucket:replace({bucket_id, consts.BUCKET.ACTIVE})
-            self.buckets_to_recovery[bucket_id] = nil
+            M.buckets_to_recovery[bucket_id] = nil
         end
         return status, err
     end
     box.space._bucket:replace({bucket_id, consts.BUCKET.SENT, destination})
-    self.buckets_to_recovery[bucket_id] = nil
+    M.buckets_to_recovery[bucket_id] = nil
 
     return true
 end
@@ -621,7 +648,7 @@ local function find_garbage_bucket(bucket_index, control)
         t = box.space._bucket:get({bucket_id})
         -- If a bucket is stored in _bucket and is garbage - the
         -- result is found.
-        if t == nil or bucket_is_garbage(t) then
+        if t == nil or M.bucket_is_garbage(t) then
             return bucket_id
         end
         -- The found bucket is not garbage - continue search
@@ -629,8 +656,8 @@ local function find_garbage_bucket(bucket_index, control)
         curr_bucket = bucket_id + 1
         iterations = iterations + 1
         if iterations % 1000 == 0 or
-           self.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY then
-            while self.ERRINJ_BUCKET_FIND_GARBAGE_DELAY do
+           M.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY then
+            while M.ERRINJ_BUCKET_FIND_GARBAGE_DELAY do
                 lfiber.sleep(0.1)
             end
             -- Do not occupy 100% CPU.
@@ -678,15 +705,15 @@ local function collect_garbage_step(control)
     -- during garbage collection some buckets are removed and
     -- their tuples are in new spaces, then bucket_generation is
     -- incremented and such spaces are cleaned up on a next step.
-    local sharded_spaces = find_sharded_spaces()
+    local sharded_spaces = M.find_sharded_spaces()
     -- For each space:
     -- 1) Find garbage bucket. If not found, go to a next space;
     -- 2) Delete all its tuples;
     -- 3) Go to 1.
     for _, space in pairs(sharded_spaces) do
-        local bucket_index = space.index[self.shard_index]
+        local bucket_index = space.index[M.shard_index]
         while true do
-            local garbage_bucket = find_garbage_bucket(bucket_index, control)
+            local garbage_bucket = M.find_garbage_bucket(bucket_index, control)
             -- Stop the step, if a generation has changed.
             if bucket_generation ~= control.bucket_generation then
                 log.info('Interrupt garbage collection step')
@@ -695,7 +722,8 @@ local function collect_garbage_step(control)
             if garbage_bucket == nil then
                 break
             end
-            collect_garbage_bucket_in_space(space, bucket_index, garbage_bucket)
+            M.collect_garbage_bucket_in_space(space, bucket_index,
+                                              garbage_bucket)
             if bucket_generation ~= control.bucket_generation then
                 log.info('Interrupt garbage collection step')
                 return
@@ -766,8 +794,8 @@ local function collect_garbage_update_bucket()
 end
 
 --
--- Background garbage collector. Works on masters. The garbage
--- collector wakeups once per GARBAGE_COLLECT_INTERVAL seconds.
+-- Garbage collector. Works on masters. The garbage collector
+-- wakes up once per GARBAGE_COLLECT_INTERVAL seconds.
 -- After wakeup it checks follows the plan:
 -- 1) Check if _bucket has changed. If not, then sleep again;
 -- 2) Scan user spaces for not existing, sent and garbage buckets,
@@ -778,7 +806,12 @@ end
 -- 5) Sleep, go to (1).
 -- For each step detains see comments in code.
 --
-function collect_garbage_f()
+-- @param module_version Module version with which the current
+--        function was started. If the actual module version
+--        turns out to have changed, then stop GC. It is restarted
+--        in reloadable_fiber_f().
+--
+function collect_garbage_f(module_version)
     lfiber.name('vshard.gc')
     -- Collector controller. Changes of _bucket increments
     -- bucket generation. Garbage collector has its own bucket
@@ -807,17 +840,22 @@ function collect_garbage_f()
     -- for next deletion.
     local empty_sent_buckets = {}
 
-    while true do
-::continue::
+    while M.module_version == module_version do
         -- Check if no changes in buckets configuration.
         if control.bucket_generation_collected ~= control.bucket_generation then
-            local status, err = pcall(collect_garbage_step, control)
+            local status, err = pcall(M.collect_garbage_step, control)
+            if M.module_version ~= module_version then
+                return
+            end
             if not status then
                 log.error('Error during garbage collection step: %s', err)
                 lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
                 goto continue
             end
-            status, empty_sent_buckets = pcall(collect_garbage_update_bucket)
+            status, empty_sent_buckets = pcall(M.collect_garbage_update_bucket)
+            if M.module_version ~= module_version then
+                return
+            end
             if not status then
                 log.error('Error during empty buckets processing: %s',
                           empty_sent_buckets)
@@ -828,8 +866,11 @@ function collect_garbage_f()
         end
         local duration = lfiber.time() - buckets_for_redirect_ts
         if duration >= consts.BUCKET_SENT_GARBAGE_DELAY then
-            local status, err = pcall(collect_garbage_drop_redirects,
+            local status, err = pcall(M.collect_garbage_drop_redirects,
                                       buckets_for_redirect)
+            if M.module_version ~= module_version then
+                return
+            end
             if not status then
                 log.error('Error during deletion of empty sent buckets: %s',
                           err)
@@ -840,6 +881,7 @@ function collect_garbage_f()
             end
         end
         lfiber.sleep(consts.GARBAGE_COLLECT_INTERVAL)
+::continue::
     end
 end
 
@@ -857,14 +899,14 @@ local function bucket_delete_garbage(bucket_id, opts)
     end
     opts = opts or {}
     local bucket = box.space._bucket:get({bucket_id})
-    if bucket ~= nil and not bucket_is_garbage(bucket) and not opts.force then
+    if bucket ~= nil and not M.bucket_is_garbage(bucket) and not opts.force then
         error('Can not delete not garbage bucket. Use "{force=true}" to '..
               'ignore this attention')
     end
-    local sharded_spaces = find_sharded_spaces()
-    local idx = self.shard_index
+    local sharded_spaces = M.find_sharded_spaces()
+    local idx = M.shard_index
     for _, space in pairs(sharded_spaces) do
-        collect_garbage_bucket_in_space(space, space.index[idx], bucket_id)
+        M.collect_garbage_bucket_in_space(space, space.index[idx], bucket_id)
     end
 end
 
@@ -972,8 +1014,8 @@ local function rebalancer_build_routes(replicasets)
     local bucket_routes = {}
     for uuid, replicaset in pairs(replicasets) do
         if replicaset.needed > 0 then
-            rebalancer_take_buckets_from_pool(bucket_pool, bucket_routes, uuid,
-                                              replicaset.needed)
+            M.rebalancer_take_buckets_from_pool(bucket_pool, bucket_routes,
+                                                uuid, replicaset.needed)
         end
     end
     return bucket_routes
@@ -992,15 +1034,15 @@ local function rebalancer_apply_routes_f(routes)
     -- var = fiber.create(), because when it yields, we have no
     -- guarantee that an event loop does not contain events
     -- between this fiber and its creator.
-    self.rebalancer_applier_fiber = lfiber.self()
+    M.rebalancer_applier_fiber = lfiber.self()
     local active_buckets = _status:select{consts.BUCKET.ACTIVE}
     local i = 1
     for dst_uuid, bucket_count in pairs(routes) do
         assert(i + bucket_count - 1 <= #active_buckets)
         log.info('Send %d buckets to %s', bucket_count,
-                 self.replicasets[dst_uuid])
+                 M.replicasets[dst_uuid])
         for j = i, i + bucket_count - 1 do
-            local status, ret = pcall(bucket_send, active_buckets[j].id,
+            local status, ret = pcall(M.bucket_send, active_buckets[j].id,
                                       dst_uuid)
             if not status or ret ~= true then
                 if not status then
@@ -1023,7 +1065,7 @@ end
 -- fiber.
 --
 local function rebalancing_is_in_progress()
-    local f = self.rebalancer_applier_fiber
+    local f = M.rebalancer_applier_fiber
     return f ~= nil and f:status() ~= 'dead'
 end
 
@@ -1034,10 +1076,10 @@ end
 -- }. Is used by a rebalancer.
 --
 local function rebalancer_apply_routes(routes)
-    assert(not rebalancing_is_in_progress())
+    assert(not M.rebalancing_is_in_progress())
     -- Can not apply routes here because of gh-946 in tarantool
     -- about problems with long polling. Apply routes in a fiber.
-    lfiber.create(rebalancer_apply_routes_f, routes)
+    lfiber.create(M.rebalancer_apply_routes_f, routes)
     return true
 end
 
@@ -1051,7 +1093,7 @@ end
 local function rebalancer_download_states()
     local replicasets = {}
     local total_bucket_active_count = 0
-    for uuid, replicaset in pairs(self.replicasets) do
+    for uuid, replicaset in pairs(M.replicasets) do
         local bucket_active_count =
             replicaset:callrw('vshard.storage.rebalancer_request_state', {})
         if bucket_active_count == nil then
@@ -1064,12 +1106,12 @@ local function rebalancer_download_states()
                              ethalon_bucket_count =
                                 replicaset.ethalon_bucket_count}
     end
-    if total_bucket_active_count == total_bucket_count then
+    if total_bucket_active_count == M.total_bucket_count then
         return replicasets
     else
         log.info('Total active bucket count is not equal to total. '..
                  'Possibly a boostrap is not finished yet. Expected %d, but '..
-                 'found %d', total_bucket_count, total_bucket_active_count)
+                 'found %d', M.total_bucket_count, total_bucket_active_count)
     end
 end
 
@@ -1077,15 +1119,17 @@ end
 -- Background rebalancer. Works on a storage which has the
 -- smallest replicaset uuid and which is master.
 --
-local function rebalancer_f()
+local function rebalancer_f(module_version)
     lfiber.name('vshard.rebalancer')
-    while true do
-::continue::
-        while not self.is_rebalancer_active do
+    while module_version == M.module_version do
+        while not M.is_rebalancer_active do
             log.info('Rebalancer is disabled. Sleep')
             lfiber.sleep(consts.REBALANCER_IDLE_INTERVAL)
         end
-        local status, replicasets = pcall(rebalancer_download_states)
+        local status, replicasets = pcall(M.rebalancer_download_states)
+        if M.module_version ~= module_version then
+            return
+        end
         if not status or replicasets == nil then
             if not status then
                 log.error('Error during downloading rebalancer states: %s',
@@ -1096,14 +1140,15 @@ local function rebalancer_f()
             goto continue
         end
         local max_disbalance =
-            rebalancer_calculate_metrics(replicasets, rebalancer_max_receiving)
-        if max_disbalance <= rebalancer_disbalance_threshold then
+            M.rebalancer_calculate_metrics(replicasets,
+                                           M.rebalancer_max_receiving)
+        if max_disbalance <= M.rebalancer_disbalance_threshold then
             log.info('The cluster is balanced ok. Schedule next rebalancing '..
                      'after %f seconds', consts.REBALANCER_IDLE_INTERVAL)
             lfiber.sleep(consts.REBALANCER_IDLE_INTERVAL)
             goto continue
         end
-        local routes = rebalancer_build_routes(replicasets)
+        local routes = M.rebalancer_build_routes(replicasets)
         -- Routes table can not be empty. If it had been empty,
         -- then max_disbalance would have been calculated
         -- incorrectly.
@@ -1114,7 +1159,7 @@ local function rebalancer_f()
         end
         assert(not is_empty)
         for src_uuid, src_routes in pairs(routes) do
-            local rs = self.replicasets[src_uuid]
+            local rs = M.replicasets[src_uuid]
             local status, err =
                 rs:callrw('vshard.storage.rebalancer_apply_routes',
                           {src_routes})
@@ -1128,6 +1173,7 @@ local function rebalancer_f()
         log.info('Rebalance routes are sent. Schedule next wakeup after '..
                  '%f seconds', consts.REBALANCER_WORK_INTERVAL)
         lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
+::continue::
     end
 end
 
@@ -1138,7 +1184,7 @@ end
 -- @retval     nil Not SENT or not ACTIVE buckets were found.
 --
 local function rebalancer_request_state()
-    if not self.is_rebalancer_active or rebalancing_is_in_progress() then
+    if not M.is_rebalancer_active or M.rebalancing_is_in_progress() then
         return
     end
     local _bucket = box.space._bucket
@@ -1161,8 +1207,8 @@ end
 -- node.
 --
 local function rebalancer_wakeup()
-    if self.rebalancer_fiber ~= nil then
-        self.rebalancer_fiber:wakeup()
+    if M.rebalancer_fiber ~= nil then
+        M.rebalancer_fiber:wakeup()
     end
 end
 
@@ -1172,10 +1218,10 @@ end
 -- not sends its state to rebalancer.
 --
 local function rebalancer_disable()
-    self.is_rebalancer_active = false
+    M.is_rebalancer_active = false
 end
 local function rebalancer_enable()
-    self.is_rebalancer_active = true
+    M.is_rebalancer_active = true
 end
 
 --------------------------------------------------------------------------------
@@ -1196,7 +1242,7 @@ local function storage_call(bucket_id, mode, name, args)
         error('Unknown mode: '..tostring(mode))
     end
 
-    local ok, err = bucket_check_state(bucket_id, mode)
+    local ok, err = M.bucket_check_state(bucket_id, mode)
     if not ok then
         return nil, err
     end
@@ -1216,15 +1262,15 @@ local function storage_cfg(cfg, this_replica_uuid)
     if cfg.weights or cfg.zone then
         error('Weights and zone are not allowed for storage configuration')
     end
-    local old_replicasets = self.replicasets
+    local old_replicasets = M.replicasets
     if old_replicasets then
         log.info("Starting reconfiguration of replica %s", this_replica_uuid)
     else
         log.info("Starting configuration of replica %s", this_replica_uuid)
     end
 
-    local was_master = self.this_replicaset ~= nil and
-                       self.this_replicaset.master == self.this_replica
+    local was_master = M.this_replicaset ~= nil and
+                       M.this_replicaset.master == M.this_replica
 
     local this_replicaset
     local this_replica
@@ -1256,8 +1302,8 @@ local function storage_cfg(cfg, this_replica_uuid)
     -- disabled and there are triggers on master disable, then
     -- they would not be able to modify anything, if 'read_only'
     -- had been set here. 'Read_only' is set in
-    -- local_master_disable after triggers and is unset in
-    -- local_master_enable before triggers.
+    -- local_on_master_disable after triggers and is unset in
+    -- local_on_master_enable before triggers.
     --
     -- If a master role of the replica is not changed, then
     -- 'read_only' can be set right here.
@@ -1272,44 +1318,45 @@ local function storage_cfg(cfg, this_replica_uuid)
     end
     cfg.instance_uuid = this_replica.uuid
     cfg.replicaset_uuid = this_replicaset.uuid
-    total_bucket_count = cfg.bucket_count or consts.DEFAULT_BUCKET_COUNT
-    rebalancer_disbalance_threshold =
+    M.total_bucket_count = cfg.bucket_count or consts.DEFAULT_BUCKET_COUNT
+    M.rebalancer_disbalance_threshold =
         cfg.rebalancer_disbalance_threshold or
         consts.DEFAULT_REBALANCER_DISBALANCE_THRESHOLD
-    rebalancer_max_receiving = cfg.rebalancer_max_receiving or
-                               consts.DEFAULT_REBALANCER_MAX_RECEIVING
-    self.shard_index = cfg.shard_index or 'bucket_id'
+    M.rebalancer_max_receiving = cfg.rebalancer_max_receiving or
+                                    consts.DEFAULT_REBALANCER_MAX_RECEIVING
+    M.shard_index = cfg.shard_index or 'bucket_id'
     lcfg.prepare_for_box_cfg(cfg)
 
     box.cfg(cfg)
     log.info("Box has been configured")
-    self.replicasets = new_replicasets
-    self.this_replicaset = this_replicaset
-    self.this_replica = this_replica
+    M.replicasets = new_replicasets
+    M.this_replicaset = this_replicaset
+    M.this_replica = this_replica
     local uri = luri.parse(this_replica.uri)
     box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
 
     if was_master and not is_master then
-        local_master_disable()
+        M.local_on_master_disable()
     end
 
     if not was_master and is_master then
-        local_master_enable()
+        M.local_on_master_enable()
     end
 
     if min_master == this_replica then
-        if not self.rebalancer_fiber then
+        if not M.rebalancer_fiber then
             log.info('Run rebalancer')
-            self.rebalancer_fiber = lfiber.create(rebalancer_f)
+            M.rebalancer_fiber = lfiber.create(util.reloadable_fiber_f, M,
+                                               'rebalancer_f', 'Rebalancer')
         else
             log.info('Wakeup rebalancer')
             -- Configuration had changed. Time to rebalance.
-            self.rebalancer_fiber:wakeup()
+            M.rebalancer_fiber:wakeup()
         end
-    elseif self.rebalancer_fiber then
+    elseif M.rebalancer_fiber then
         log.info('Rebalancer location has changed to %s', min_master)
-        self.rebalancer_fiber:cancel()
-        self.rebalancer_fiber = nil
+        M.rebalancer_fiber:cancel()
+        M.rebalancer_fiber = nil
     end
     if old_replicasets then
         lreplicaset.destroy(old_replicasets)
@@ -1347,13 +1394,13 @@ local function storage_info()
     }
     local code = lerror.code
     local alert = lerror.alert
-    local this_uuid = self.this_replicaset.uuid
-    local this_master = self.this_replicaset.master
+    local this_uuid = M.this_replicaset.uuid
+    local this_master = M.this_replicaset.master
     if this_master == nil then
         table.insert(state.alerts, alert(code.MISSING_MASTER, this_uuid))
         state.status = math.max(state.status, consts.STATUS.ORANGE)
     end
-    if this_master and this_master ~= self.this_replica then
+    if this_master and this_master ~= M.this_replica then
         for id, replica in pairs(box.info.replication) do
             if replica.uuid ~= this_master.uuid then
                 goto cont
@@ -1393,7 +1440,7 @@ local function storage_info()
         state.replication.status = 'master'
         local redundancy = 0
         for id, replica in pairs(box.info.replication) do
-            if replica.uuid ~= self.this_replica.uuid then
+            if replica.uuid ~= M.this_replica.uuid then
                 if replica.downstream == nil then
                     table.insert(state.alerts, alert(code.UNREACHABLE_REPLICA,
                                                      replica.uuid))
@@ -1430,7 +1477,7 @@ local function storage_info()
     end
 
     local ireplicasets = {}
-    for uuid, replicaset in pairs(self.replicasets) do
+    for uuid, replicaset in pairs(M.replicasets) do
         local master = replicaset.master
         if not master then
             ireplicasets[uuid] = {uuid = uuid, master = 'missing'}
@@ -1453,14 +1500,44 @@ end
 --------------------------------------------------------------------------------
 -- Module definition
 --------------------------------------------------------------------------------
-
-self.find_sharded_spaces = find_sharded_spaces
-self.find_garbage_bucket = find_garbage_bucket
-self.collect_garbage_step = collect_garbage_step
-self.collect_garbage_f = collect_garbage_f
-
-self.rebalancer_build_routes = rebalancer_build_routes
-self.rebalancer_calculate_metrics = rebalancer_calculate_metrics
+--
+-- Put here all functions that are used inside vshard.storage, for
+-- atomic reload. If the reload fails above, the interpreter does
+-- not get here, and M is not changed.
+-- If a function is used only from outside, there is no need to store
+-- it in M. It is reloaded atomically due to the return below.
+--
+M.this_is_master = this_is_master
+M.recovery_step = recovery_step
+M.recovery_garbage_receiving_buckets = recovery_garbage_receiving_buckets
+M.recovery_f = recovery_f
+M.sync = sync
+M.bucket_is_garbage = bucket_is_garbage
+M.bucket_check_state = bucket_check_state
+M.find_sharded_spaces = find_sharded_spaces
+M.bucket_collect_internal = bucket_collect_internal
+M.local_on_master_disable = local_on_master_disable
+M.local_on_master_enable = local_on_master_enable
+M.bucket_send = bucket_send
+M.find_garbage_bucket = find_garbage_bucket
+M.collect_garbage_bucket_in_space = collect_garbage_bucket_in_space
+M.collect_garbage_step = collect_garbage_step
+M.collect_garbage_drop_redirects = collect_garbage_drop_redirects
+M.collect_garbage_update_bucket = collect_garbage_update_bucket
+M.collect_garbage_f = collect_garbage_f
+M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
+M.rebalancer_take_buckets_from_pool = rebalancer_take_buckets_from_pool
+M.rebalancer_build_routes = rebalancer_build_routes
+M.rebalancer_apply_routes_f = rebalancer_apply_routes_f
+M.rebalancing_is_in_progress = rebalancing_is_in_progress
+M.rebalancer_download_states = rebalancer_download_states
+M.rebalancer_f = rebalancer_f
+
+if not rawget(_G, '__module_vshard_storage') then
+    rawset(_G, '__module_vshard_storage', M)
+else
+    M.module_version = M.module_version + 1
+end
 
 return {
     sync = sync;
@@ -1483,7 +1560,8 @@ return {
     buckets_count = storage_buckets_count;
     buckets_discovery = buckets_discovery;
     rebalancer_request_state = rebalancer_request_state;
-    internal = self;
+    internal = M;
     on_master_enable = on_master_enable;
     on_master_disable = on_master_disable;
+    module_version = function() return M.module_version end;
 }
diff --git a/vshard/util.lua b/vshard/util.lua
index 9b56a78..f2feec1 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -1,4 +1,5 @@
 -- vshard.util
+local log = require('log')
 
 --
 -- Extract parts of a tuple.
@@ -16,6 +17,35 @@ local function tuple_extract_key(tuple, parts)
     return key
 end
 
+--
+-- Wrapper to run M[@a func_name] in an infinite loop and restart
+-- it on the module reload. This function CAN NOT BE AUTORELOADED.
+-- To update it you must manually stop all fibers run by this
+-- function, do reload, and then restart all stopped fibers. This
+-- can be done, for example, by calling vshard.storage/router.cfg()
+-- again with the same config as earlier.
+--
+-- @param M Module which can reload; M[func_name] is re-read each run.
+-- @param func_name Name of a reloadable function stored in M. It
+--        must accept current module version as an argument, and
+--        interrupt itself, when it is changed.
+-- @param worker_name Name of the worker, used in log messages.
+--        Usually an infinite fiber is a background subsystem
+--        with a name: "Garbage Collector", "Recovery",
+--        "Discovery", "Rebalancer".
+-- NOTE(review): no yield/cancel check between pcall retries - confirm.
+local function reloadable_fiber_f(M, func_name, worker_name)
+    while true do
+        local ok, err = pcall(M[func_name], M.module_version)
+        if not ok then
+            log.error('%s has been failed: %s', worker_name, err)
+        else
+            log.info('%s has been reloaded', worker_name)
+        end
+    end
+end
+
 return {
-    tuple_extract_key = tuple_extract_key
+    tuple_extract_key = tuple_extract_key,
+    reloadable_fiber_f = reloadable_fiber_f,
 }
-- 
2.14.3 (Apple Git-98)




More information about the Tarantool-patches mailing list