[tarantool-patches] [PATCH 2/2] Complete module reload
AKhatskevich
avkhatskevich at tarantool.org
Sat Jun 9 20:47:16 MSK 2018
In case one needs to upgrade vshard to a new version, this commit
introduces a reload mechanism which allows doing that for a
wide variety of possible changes (between the two versions).
Reload process:
* reload all vshard modules
* create new `replicaset` and `replica` objects
* reuse old netbox connections in new replica objects if
possible
* update router/storage.internal table
* after a `reload_deprecate_timeout` disable old instances of
`replicaset` and `replica` objects
Reload works for modules:
* vshard.router
* vshard.storage
If the reload process fails, the old router/storage module
continues working properly.
Extra changes:
* add `util.async_task` method, which runs a function after a
delay
* delete replicaset:rebind_connections method as it is replaced
with `rebind_replicasets` which updates all replicasets at once
* introduce `module_reloading` for distinguishing reload from
reconfigure
* introduce MODULE_INTERNALS which stores the name of the module's
internal data in the global namespace
Closes #112
---
test/router/reload.result | 109 +++++++++++++++++++++++++++++++++++++++++++
test/router/reload.test.lua | 41 ++++++++++++++++
test/router/router.result | 3 +-
test/storage/reload.result | 61 ++++++++++++++++++++++++
test/storage/reload.test.lua | 24 ++++++++++
vshard/replicaset.lua | 91 +++++++++++++++++++++++++++++-------
vshard/router/init.lua | 40 ++++++++++++----
vshard/storage/init.lua | 44 ++++++++++++-----
vshard/util.lua | 20 ++++++++
9 files changed, 392 insertions(+), 41 deletions(-)
diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..083475f 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -174,6 +174,115 @@ vshard.router.module_version()
check_reloaded()
---
...
+--
+-- Deprecate old replicaset and replica objets.
+--
+rs = vshard.router.route(1)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_ = require('vshard.router')
+---
+...
+-- Make sure async task (deprecator) has had cpu time.
+fiber.sleep(0.005)
+---
+...
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+---
+...
+err:match('Object replicaset is outdated.*')
+---
+- Object replicaset is outdated. Use new instance
+...
+vshard.router = require('vshard.router')
+---
+...
+rs = vshard.router.route(1)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+-- Test deprecate timeout.
+vshard.router.internal.reload_deprecate_timeout = 0.3
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_ = require('vshard.router')
+---
+...
+vshard.router.internal.reload_deprecate_timeout = nil
+---
+...
+vshard.router = require('vshard.router')
+---
+...
+rs_new = vshard.router.route(1)
+---
+...
+_ = rs_new:callro('echo', {'some_data'})
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+fiber.sleep(0.2)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+fiber.sleep(0.2)
+---
+...
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+---
+...
+err:match('Object replicaset is outdated.*')
+---
+- Object replicaset is outdated. Use new instance
+...
+_ = rs_new:callro('echo', {'some_data'})
+---
+...
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
+vshard.router.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.router.internal)
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_, err = pcall(require, 'vshard.router')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.router.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.router.internal)
+---
+- true
+...
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
test_run:switch('default')
---
- true
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..94cd8cc 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -86,6 +86,47 @@ _ = require('vshard.router')
vshard.router.module_version()
check_reloaded()
+--
+-- Deprecate old replicaset and replica objets.
+--
+rs = vshard.router.route(1)
+_ = rs:callro('echo', {'some_data'})
+package.loaded["vshard.router"] = nil
+_ = require('vshard.router')
+-- Make sure async task (deprecator) has had cpu time.
+fiber.sleep(0.005)
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+vshard.router = require('vshard.router')
+rs = vshard.router.route(1)
+_ = rs:callro('echo', {'some_data'})
+-- Test deprecate timeout.
+vshard.router.internal.reload_deprecate_timeout = 0.3
+package.loaded["vshard.router"] = nil
+_ = require('vshard.router')
+vshard.router.internal.reload_deprecate_timeout = nil
+vshard.router = require('vshard.router')
+rs_new = vshard.router.route(1)
+_ = rs_new:callro('echo', {'some_data'})
+_ = rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+_ = rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+_ = rs_new:callro('echo', {'some_data'})
+
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+vshard.router.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.router.internal)
+package.loaded["vshard.router"] = nil
+_, err = pcall(require, 'vshard.router')
+err:match('Error injection:.*')
+vshard.router.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.router.internal)
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+
test_run:switch('default')
test_run:cmd('stop server router_1')
test_run:cmd('cleanup server router_1')
diff --git a/test/router/router.result b/test/router/router.result
index 3ebab5d..71e156c 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1024,11 +1024,10 @@ error_messages
- - Use replicaset:callro(...) instead of replicaset.callro(...)
- Use replicaset:connect_master(...) instead of replicaset.connect_master(...)
- Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
- - Use replicaset:rebind_connections(...) instead of replicaset.rebind_connections(...)
- Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)
- Use replicaset:call(...) instead of replicaset.call(...)
- - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
- Use replicaset:connect(...) instead of replicaset.connect(...)
+ - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
- Use replicaset:callrw(...) instead of replicaset.callrw(...)
- Use replicaset:connect_all(...) instead of replicaset.connect_all(...)
...
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..b7070e5 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -174,6 +174,67 @@ vshard.storage.module_version()
check_reloaded()
---
...
+--
+-- Deprecate old replicaset and replica objets.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+---
+...
+err:match('Object replicaset is outdated.*')
+---
+- Object replicaset is outdated. Use new instance
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs.callro(rs, 'echo', {'some_data'})
+---
+...
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_, err = pcall(require, 'vshard.storage')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.storage.internal)
+---
+- true
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
test_run:switch('default')
---
- true
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..bf8e803 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -87,6 +87,30 @@ _ = require('vshard.storage')
vshard.storage.module_version()
check_reloaded()
+--
+-- Deprecate old replicaset and replica objets.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs.callro(rs, 'echo', {'some_data'})
+
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_, err = pcall(require, 'vshard.storage')
+err:match('Error injection:.*')
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.storage.internal)
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+
test_run:switch('default')
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 99f59aa..48053a0 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -48,7 +48,8 @@ local lerror = require('vshard.error')
local fiber = require('fiber')
local luri = require('uri')
local ffi = require('ffi')
-local gsc = require('vshard.util').generate_self_checker
+local util = require('vshard.util')
+local gsc = util.generate_self_checker
--
-- on_connect() trigger for net.box
@@ -338,26 +339,79 @@ local function replicaset_tostring(replicaset)
end
--
--- Rebind connections of old replicas to new ones.
+-- Deprecate old objects in case of reload: replace every method of
+-- the old replicaset and replica metatables (a nil metatable is
+-- skipped) with a stub raising "Object <name> is outdated", and drop
+-- the netbox connections held by the old replicas.
+--
+local function clean_old_objects(old_replicaset_mt, old_replica_mt,
+                                 old_replicas)
+    -- Build the stub. It reports the location of the code which
+    -- called the deprecated method (getinfo level 2 is the caller).
+    local function get_deprecated_warning(obj_name)
+        return function(...)
+            local finfo = debug.getinfo(2)
+            local file = finfo.short_src
+            -- Name of the calling function; may be nil.
+            local name = finfo.name
+            local line = finfo.currentline
+            local err_fmt = '%s.%s:%s: Object %s is outdated. Use new instance'
+            error(string.format(err_fmt, file, name, line, obj_name))
+        end
+    end
+    -- Overwrite each existing method in place. Assigning to keys
+    -- which already exist is allowed during a pairs() traversal.
+    local function replace_mt_methods(mt, method)
+        if not mt then
+            return
+        end
+        for name, _ in pairs(mt.__index) do
+            mt.__index[name] = method
+        end
+    end
+    replace_mt_methods(old_replicaset_mt, get_deprecated_warning('replicaset'))
+    replace_mt_methods(old_replica_mt, get_deprecated_warning('replica'))
+    for _, replica in pairs(old_replicas) do
+        -- rebind_replicasets() has handed this conn to the new replica.
+        replica.conn = nil
+    end
+    log.info('Old replicaset and replica objects are cleaned up.')
+end
+
--
-local function replicaset_rebind_connections(replicaset)
- for _, replica in pairs(replicaset.replicas) do
- local old_replica = replica.old_replica
- if old_replica then
- local conn = old_replica.conn
- replica.conn = conn
- replica.down_ts = old_replica.down_ts
- replica.net_timeout = old_replica.net_timeout
- replica.net_sequential_ok = old_replica.net_sequential_ok
- replica.net_sequential_fail = old_replica.net_sequential_fail
- if conn then
- conn.replica = replica
- conn.replicaset = replicaset
- old_replica.conn = nil
+-- Copy netbox connections from old replica objects to new ones.
+-- If `deprecate` is set (module reload), schedule disabling of the
+-- old replicaset and replica objects after `deprecate_timeout`
+-- seconds (no delay when it is nil).
+--
+local function rebind_replicasets(replicasets, deprecate, deprecate_timeout)
+    -- Collect data to deprecate old objects.
+    local old_replicaset_mt = nil
+    local old_replica_mt = nil
+    local old_replicas = {}
+    for _, replicaset in pairs(replicasets) do
+        if replicaset.old_replicaset then
+            local old_mt = getmetatable(replicaset.old_replicaset)
+            assert(old_replicaset_mt == nil or old_replicaset_mt == old_mt)
+            -- After a reload old objects must use the old metatable.
+            assert(not deprecate or old_mt ~= getmetatable(replicaset))
+            old_replicaset_mt = old_mt
+            replicaset.old_replicaset = nil
+        end
+        for _, replica in pairs(replicaset.replicas) do
+            local old_replica = replica.old_replica
+            if old_replica then
+                local old_mt = getmetatable(old_replica)
+                assert(old_replica_mt == nil or old_replica_mt == old_mt)
+                assert(not deprecate or old_mt ~= getmetatable(replica))
+                old_replica_mt = old_mt
+                table.insert(old_replicas, old_replica)
+                local conn = old_replica.conn
+                replica.conn = conn
+                replica.down_ts = old_replica.down_ts
+                replica.net_timeout = old_replica.net_timeout
+                replica.net_sequential_ok = old_replica.net_sequential_ok
+                replica.net_sequential_fail = old_replica.net_sequential_fail
+                if conn then
+                    conn.replica = replica
+                    conn.replicaset = replicaset
+                end
+                replica.old_replica = nil
            end
-            replica.old_replica = nil
        end
    end
+    -- Skip when there are no old objects (e.g. the first cfg call).
+    if deprecate and (old_replicaset_mt ~= nil or old_replica_mt ~= nil) then
+        util.async_task(deprecate_timeout, clean_old_objects, old_replicaset_mt,
+                        old_replica_mt, old_replicas)
+    end
end
--
@@ -369,7 +423,6 @@ local replicaset_mt = {
connect_master = replicaset_connect_master;
connect_all = replicaset_connect_all;
connect_replica = replicaset_connect_to_replica;
- rebind_connections = replicaset_rebind_connections;
down_replica_priority = replicaset_down_replica_priority;
up_replica_priority = replicaset_up_replica_priority;
call = replicaset_master_call;
@@ -523,6 +576,7 @@ local function buildall(sharding_cfg, old_replicasets)
weight = replicaset.weight,
bucket_count = 0,
lock = replicaset.lock,
+ old_replicaset = old_replicaset,
}, replicaset_mt)
local priority_list = {}
for replica_uuid, replica in pairs(replicaset.replicas) do
@@ -596,4 +650,5 @@ return {
buildall = buildall,
calculate_etalon_balance = cluster_calculate_etalon_balance,
wait_masters_connect = wait_masters_connect,
+ rebind_replicasets = rebind_replicasets,
}
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 1dee80c..c075436 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -1,5 +1,17 @@
local log = require('log')
local lfiber = require('fiber')
+
+local MODULE_INTERNALS = '__module_vshard_router'
+-- On manual reload, unload the vshard dependencies as well, so the
+-- require() calls below load their new versions too.
+if rawget(_G, MODULE_INTERNALS) then
+    local vshard_modules = {
+        'vshard.consts', 'vshard.error', 'vshard.cfg',
+        'vshard.hash', 'vshard.replicaset', 'vshard.util',
+    }
+    for _, module_name in ipairs(vshard_modules) do
+        package.loaded[module_name] = nil
+    end
+end
local consts = require('vshard.consts')
local lerror = require('vshard.error')
local lcfg = require('vshard.cfg')
@@ -7,7 +19,7 @@ local lhash = require('vshard.hash')
local lreplicaset = require('vshard.replicaset')
local util = require('vshard.util')
-local M = rawget(_G, '__module_vshard_router')
+local M = rawget(_G, MODULE_INTERNALS)
if not M then
M = {
errinj = {
@@ -17,6 +29,8 @@ if not M then
},
-- Bucket map cache.
route_map = {},
+ -- Time to deprecate old objects on reload.
+ reload_deprecate_timeout = nil,
-- All known replicasets used for bucket re-balancing
replicasets = nil,
-- Fiber to maintain replica connections.
@@ -129,6 +143,9 @@ local function discovery_f(module_version)
consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
while module_version == M.module_version do
for _, replicaset in pairs(M.replicasets) do
+ if module_version ~= M.module_version then
+ return
+ end
local active_buckets, err =
replicaset:callro('vshard.storage.buckets_discovery', {},
{timeout = 2})
@@ -457,8 +474,11 @@ end
-- Configuration
--------------------------------------------------------------------------------
+-- Distinguish reload from reconfigure.
+local module_reloading = true
local function router_cfg(cfg)
cfg = lcfg.check(cfg)
+ local current_cfg = table.deepcopy(cfg)
if not M.replicasets then
log.info('Starting router configuration')
else
@@ -482,15 +502,16 @@ local function router_cfg(cfg)
end
M.total_bucket_count = total_bucket_count
M.collect_lua_garbage = collect_lua_garbage
+ M.current_cfg = current_cfg
-- TODO: update existing route map in-place
M.route_map = {}
M.replicasets = new_replicasets
-- Move connections from an old configuration to a new one.
-- It must be done with no yields to prevent usage both of not
-- fully moved old replicasets, and not fully built new ones.
- for _, replicaset in pairs(new_replicasets) do
- replicaset:rebind_connections()
- end
+ lreplicaset.rebind_replicasets(new_replicasets, module_reloading,
+ M.reload_deprecate_timeout)
+ module_reloading = false
-- Now the new replicasets are fully built. Can establish
-- connections and yield.
for _, replicaset in pairs(new_replicasets) do
@@ -776,15 +797,16 @@ end
-- About functions, saved in M, and reloading see comment in
-- storage/init.lua.
--
-M.discovery_f = discovery_f
-M.failover_f = failover_f
-
-if not rawget(_G, '__module_vshard_router') then
- rawset(_G, '__module_vshard_router', M)
+if not rawget(_G, MODULE_INTERNALS) then
+    rawset(_G, MODULE_INTERNALS, M)
else
+    -- Reload: re-apply the saved config, if the router was configured.
+    -- M.current_cfg is nil until the first successful router_cfg().
+    if M.current_cfg then
+        router_cfg(M.current_cfg)
+    end
    M.module_version = M.module_version + 1
end
+M.discovery_f = discovery_f
+M.failover_f = failover_f
+
return {
cfg = router_cfg;
info = router_info;
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 879c7c4..b570821 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -2,14 +2,27 @@ local log = require('log')
local luri = require('uri')
local lfiber = require('fiber')
local netbox = require('net.box') -- for net.box:self()
+local trigger = require('internal.trigger')
+
+local MODULE_INTERNALS = '__module_vshard_storage'
+-- On manual reload, unload the vshard dependencies as well, so the
+-- require() calls below load their new versions too.
+if rawget(_G, MODULE_INTERNALS) then
+    local vshard_modules = {
+        'vshard.consts', 'vshard.error', 'vshard.cfg',
+        'vshard.replicaset', 'vshard.util',
+    }
+    for _, module_name in ipairs(vshard_modules) do
+        package.loaded[module_name] = nil
+    end
+end
local consts = require('vshard.consts')
local lerror = require('vshard.error')
-local util = require('vshard.util')
local lcfg = require('vshard.cfg')
local lreplicaset = require('vshard.replicaset')
-local trigger = require('internal.trigger')
+local util = require('vshard.util')
-local M = rawget(_G, '__module_vshard_storage')
+
+local M = rawget(_G, MODULE_INTERNALS)
if not M then
--
-- The module is loaded for the first time.
@@ -21,6 +34,8 @@ if not M then
-- See format in replicaset.lua.
--
replicasets = nil,
+ -- Time to deprecate old objects on reload.
+ reload_deprecate_timeout = nil,
-- Triggers on master switch event. They are called right
-- before the event occurs.
_on_master_enable = trigger.new('_on_master_enable'),
@@ -1445,11 +1460,14 @@ end
--------------------------------------------------------------------------------
-- Configuration
--------------------------------------------------------------------------------
+-- Distinguish reload from reconfigure.
+local module_reloading = true
local function storage_cfg(cfg, this_replica_uuid)
if this_replica_uuid == nil then
error('Usage: cfg(configuration, this_replica_uuid)')
end
cfg = lcfg.check(cfg)
+ local current_cfg = table.deepcopy(cfg)
if cfg.weights or cfg.zone then
error('Weights and zone are not allowed for storage configuration')
end
@@ -1572,9 +1590,8 @@ local function storage_cfg(cfg, this_replica_uuid)
box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
M.replicasets = new_replicasets
- for _, replicaset in pairs(new_replicasets) do
- replicaset:rebind_connections()
- end
+ lreplicaset.rebind_replicasets(new_replicasets, module_reloading)
+ module_reloading = false
M.this_replicaset = this_replicaset
M.this_replica = this_replica
M.total_bucket_count = total_bucket_count
@@ -1583,6 +1600,7 @@ local function storage_cfg(cfg, this_replica_uuid)
M.shard_index = shard_index
M.collect_bucket_garbage_interval = collect_bucket_garbage_interval
M.collect_lua_garbage = collect_lua_garbage
+ M.current_cfg = current_cfg
if was_master and not is_master then
local_on_master_disable()
@@ -1813,6 +1831,14 @@ end
-- restarted (or is restarted from M.background_f, which is not
-- changed) and continues use old func1 and func2.
--
+
+if not rawget(_G, MODULE_INTERNALS) then
+    rawset(_G, MODULE_INTERNALS, M)
+else
+    -- Reload: re-apply the saved config, if the storage was configured.
+    -- M.current_cfg and M.this_replica are set by storage_cfg() only.
+    if M.current_cfg then
+        storage_cfg(M.current_cfg, M.this_replica.uuid)
+    end
+    M.module_version = M.module_version + 1
+end
+
M.recovery_f = recovery_f
M.collect_garbage_f = collect_garbage_f
M.rebalancer_f = rebalancer_f
@@ -1828,12 +1854,6 @@ M.rebalancer_build_routes = rebalancer_build_routes
M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
M.cached_find_sharded_spaces = find_sharded_spaces
-if not rawget(_G, '__module_vshard_storage') then
- rawset(_G, '__module_vshard_storage', M)
-else
- M.module_version = M.module_version + 1
-end
-
return {
sync = sync,
bucket_force_create = bucket_force_create,
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..f5bd78c 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -75,8 +75,28 @@ local function generate_self_checker(obj_name, func_name, mt, func)
end
end
+
+--
+-- Run `task(...)` in a new fiber, after `delay` seconds if given.
+-- Note: fiber.create() starts the new fiber immediately, so with no
+-- delay the task runs up to its first yield before this returns.
+-- @param delay Delay in seconds before the task is executed, or nil.
+-- @param task Function to be executed.
+-- @param ... Arguments which would be passed to the `task`.
+-- @return The created fiber, e.g. to cancel a pending task.
+--
+local function async_task(delay, task, ...)
+    assert(delay == nil or type(delay) == 'number')
+    local function wait_and_call(...)
+        if delay then
+            fiber.sleep(delay)
+        end
+        task(...)
+    end
+    return fiber.create(wait_and_call, ...)
+end
+
return {
tuple_extract_key = tuple_extract_key,
reloadable_fiber_f = reloadable_fiber_f,
generate_self_checker = generate_self_checker,
+ async_task = async_task,
}
--
2.14.1
More information about the Tarantool-patches
mailing list