* [tarantool-patches] [PATCH 0/2][vshard] Vshard safe reload
@ 2018-06-09 17:47 AKhatskevich
2018-06-09 17:47 ` [tarantool-patches] [PATCH 1/2] Add test on error during reconfigure AKhatskevich
` (2 more replies)
0 siblings, 3 replies; 14+ messages in thread
From: AKhatskevich @ 2018-06-09 17:47 UTC (permalink / raw)
To: tarantool-patches, v.shpilevoy
Branch: https://github.com/tarantool/vshard/tree/kh/gh-112-reload-mt-2
Issue: https://github.com/tarantool/vshard/issues/112
Two commits:
* Add test on error during reconfigure
* Complete module reload
reloads entire vshard, deprecating old objects
test/lua_libs/util.lua | 16 +++++++
test/router/reload.result | 109 ++++++++++++++++++++++++++++++++++++++++++
test/router/reload.test.lua | 41 ++++++++++++++++
test/router/router.result | 30 +++++++++++-
test/router/router.test.lua | 10 ++++
test/storage/reload.result | 61 +++++++++++++++++++++++
test/storage/reload.test.lua | 24 ++++++++++
test/storage/storage.result | 33 +++++++++++++
test/storage/storage.test.lua | 12 +++++
vshard/replicaset.lua | 91 ++++++++++++++++++++++++++++-------
vshard/router/init.lua | 47 ++++++++++++++----
vshard/storage/init.lua | 53 +++++++++++++++-----
vshard/util.lua | 20 ++++++++
13 files changed, 506 insertions(+), 41 deletions(-)
--
2.14.1
* [tarantool-patches] [PATCH 1/2] Add test on error during reconfigure
2018-06-09 17:47 [tarantool-patches] [PATCH 0/2][vshard] Vshard safe reload AKhatskevich
@ 2018-06-09 17:47 ` AKhatskevich
2018-06-17 19:46 ` [tarantool-patches] " Vladislav Shpilevoy
2018-06-09 17:47 ` [tarantool-patches] [PATCH 2/2] Complete module reload AKhatskevich
2018-06-17 19:39 ` [tarantool-patches] Re: [PATCH 0/2][vshard] Vshard safe reload Vladislav Shpilevoy
2 siblings, 1 reply; 14+ messages in thread
From: AKhatskevich @ 2018-06-09 17:47 UTC (permalink / raw)
To: tarantool-patches, v.shpilevoy
In case the reconfigure process fails, the node should continue
to work properly.
---
test/lua_libs/util.lua | 16 ++++++++++++++++
test/router/router.result | 27 +++++++++++++++++++++++++++
test/router/router.test.lua | 10 ++++++++++
test/storage/storage.result | 33 +++++++++++++++++++++++++++++++++
test/storage/storage.test.lua | 12 ++++++++++++
vshard/router/init.lua | 7 +++++++
vshard/storage/init.lua | 9 +++++++++
7 files changed, 114 insertions(+)
diff --git a/test/lua_libs/util.lua b/test/lua_libs/util.lua
index f2d3b48..aeb2342 100644
--- a/test/lua_libs/util.lua
+++ b/test/lua_libs/util.lua
@@ -69,9 +69,25 @@ local function wait_master(test_run, replicaset, master)
log.info('Slaves are connected to a master "%s"', master)
end
+-- Check that data has at least all the fields of an etalon.
+local function has_same_fields(etalon, data)
+ assert(type(etalon) == 'table' and type(data) == 'table')
+ local diff = {}
+ for k, v in pairs(etalon) do
+ if v ~= data[k] then
+ table.insert(diff, k)
+ end
+ end
+ if #diff > 0 then
+ return false, diff
+ end
+ return true
+end
+
return {
check_error = check_error,
shuffle_masters = shuffle_masters,
collect_timeouts = collect_timeouts,
wait_master = wait_master,
+ has_same_fields = has_same_fields,
}
diff --git a/test/router/router.result b/test/router/router.result
index 2ee1bff..3ebab5d 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1057,6 +1057,33 @@ error_messages
- - Use replica:is_connected(...) instead of replica.is_connected(...)
- Use replica:safe_uri(...) instead of replica.safe_uri(...)
...
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
+vshard.router.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.router.internal)
+---
+...
+_, err = pcall(vshard.router.cfg, cfg)
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.router.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.router.internal)
+---
+- true
+...
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
_ = test_run:cmd("switch default")
---
...
diff --git a/test/router/router.test.lua b/test/router/router.test.lua
index fae8e24..afcdb9d 100644
--- a/test/router/router.test.lua
+++ b/test/router/router.test.lua
@@ -389,6 +389,16 @@ end;
test_run:cmd("setopt delimiter ''");
error_messages
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+vshard.router.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.router.internal)
+_, err = pcall(vshard.router.cfg, cfg)
+err:match('Error injection:.*')
+vshard.router.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.router.internal)
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+
_ = test_run:cmd("switch default")
test_run:drop_cluster(REPLICASET_2)
diff --git a/test/storage/storage.result b/test/storage/storage.result
index d0bf792..8d88bf4 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -720,6 +720,39 @@ test_run:cmd("setopt delimiter ''");
---
- true
...
+-- Error during reconfigure process.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.storage.internal)
+---
+...
+_, err = pcall(vshard.storage.cfg, cfg, names.storage_1_a)
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.storage.internal)
+---
+- true
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
_ = test_run:cmd("switch default")
---
...
diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua
index f4bbf0e..d215db6 100644
--- a/test/storage/storage.test.lua
+++ b/test/storage/storage.test.lua
@@ -177,6 +177,18 @@ for _, new_replicaset in pairs(new_replicasets) do
end;
test_run:cmd("setopt delimiter ''");
+-- Error during reconfigure process.
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.storage.internal)
+_, err = pcall(vshard.storage.cfg, cfg, names.storage_1_a)
+err:match('Error injection:.*')
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.storage.internal)
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+
_ = test_run:cmd("switch default")
test_run:drop_cluster(REPLICASET_2)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 21093e5..1dee80c 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -11,6 +11,7 @@ local M = rawget(_G, '__module_vshard_router')
if not M then
M = {
errinj = {
+ ERRINJ_CFG = false,
ERRINJ_FAILOVER_CHANGE_CFG = false,
ERRINJ_RELOAD = false,
},
@@ -473,6 +474,12 @@ local function router_cfg(cfg)
end
box.cfg(cfg)
log.info("Box has been configured")
+ -- It is considered that all possible errors during cfg
+ -- process occur only before this place.
+ -- This check should be placed as late as possible.
+ if M.errinj.ERRINJ_CFG then
+ error('Error injection: cfg')
+ end
M.total_bucket_count = total_bucket_count
M.collect_lua_garbage = collect_lua_garbage
-- TODO: update existing route map in-place
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 57076e1..879c7c4 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -33,6 +33,7 @@ if not M then
-- Bucket count stored on all replicasets.
total_bucket_count = 0,
errinj = {
+ ERRINJ_CFG = false,
ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
ERRINJ_RELOAD = false,
ERRINJ_CFG_DELAY = false,
@@ -1527,6 +1528,14 @@ local function storage_cfg(cfg, this_replica_uuid)
local shard_index = cfg.shard_index
local collect_bucket_garbage_interval = cfg.collect_bucket_garbage_interval
local collect_lua_garbage = cfg.collect_lua_garbage
+
+ -- It is considered that all possible errors during cfg
+ -- process occur only before this place.
+ -- This check should be placed as late as possible.
+ if M.errinj.ERRINJ_CFG then
+ error('Error injection: cfg')
+ end
+
--
-- Sync timeout is a special case - it must be updated before
-- all other options to allow a user to demote a master with
--
2.14.1
* [tarantool-patches] [PATCH 2/2] Complete module reload
2018-06-09 17:47 [tarantool-patches] [PATCH 0/2][vshard] Vshard safe reload AKhatskevich
2018-06-09 17:47 ` [tarantool-patches] [PATCH 1/2] Add test on error during reconfigure AKhatskevich
@ 2018-06-09 17:47 ` AKhatskevich
2018-06-17 19:46 ` [tarantool-patches] " Vladislav Shpilevoy
2018-06-17 19:39 ` [tarantool-patches] Re: [PATCH 0/2][vshard] Vshard safe reload Vladislav Shpilevoy
2 siblings, 1 reply; 14+ messages in thread
From: AKhatskevich @ 2018-06-09 17:47 UTC (permalink / raw)
To: tarantool-patches, v.shpilevoy
In case one needs to upgrade vshard to a new version, this commit
introduces a reload mechanism which allows doing that for a
wide variety of possible changes (between the two versions).
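A short usage sketch (the exact flow is shown by the new reload tests):
    -- Reload vshard.router on a running router instance.
    package.loaded['vshard.router'] = nil
    vshard.router = require('vshard.router')
    -- The new module instance picks up its state from the global
    -- module table and reuses the old netbox connections. After
    -- reload_deprecate_timeout the old replicaset/replica objects
    -- start reporting that they are outdated.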
Reload process:
* reload all vshard modules
* create new `replicaset` and `replica` objects
* reuse old netbox connections in new replica objects if
possible
* update router/storage.internal table
* after a `reload_deprecate_timeout` disable old instances of
`replicaset` and `replica` objects
Reload works for modules:
* vshard.router
* vshard.storage
In case the reload process fails, the old router/storage module continues
working properly.
Extra changes:
* add `util.async_task` method, which runs a function after a
delay
* delete replicaset:rebind_connections method as it is replaced
with `rebind_replicasets` which updates all replicasets at once
* introduce `module_reloading` for distinguishing reloaf from
reconfigure
* introduce MODULE_INTERNALS which stores name of the module
internal data in the global namespace
Closes #112
---
test/router/reload.result | 109 +++++++++++++++++++++++++++++++++++++++++++
test/router/reload.test.lua | 41 ++++++++++++++++
test/router/router.result | 3 +-
test/storage/reload.result | 61 ++++++++++++++++++++++++
test/storage/reload.test.lua | 24 ++++++++++
vshard/replicaset.lua | 91 +++++++++++++++++++++++++++++-------
vshard/router/init.lua | 40 ++++++++++++----
vshard/storage/init.lua | 44 ++++++++++++-----
vshard/util.lua | 20 ++++++++
9 files changed, 392 insertions(+), 41 deletions(-)
diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..083475f 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -174,6 +174,115 @@ vshard.router.module_version()
check_reloaded()
---
...
+--
+-- Deprecate old replicaset and replica objects.
+--
+rs = vshard.router.route(1)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_ = require('vshard.router')
+---
+...
+-- Make sure async task (deprecator) has had cpu time.
+fiber.sleep(0.005)
+---
+...
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+---
+...
+err:match('Object replicaset is outdated.*')
+---
+- Object replicaset is outdated. Use new instance
+...
+vshard.router = require('vshard.router')
+---
+...
+rs = vshard.router.route(1)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+-- Test deprecate timeout.
+vshard.router.internal.reload_deprecate_timeout = 0.3
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_ = require('vshard.router')
+---
+...
+vshard.router.internal.reload_deprecate_timeout = nil
+---
+...
+vshard.router = require('vshard.router')
+---
+...
+rs_new = vshard.router.route(1)
+---
+...
+_ = rs_new:callro('echo', {'some_data'})
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+fiber.sleep(0.2)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+fiber.sleep(0.2)
+---
+...
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+---
+...
+err:match('Object replicaset is outdated.*')
+---
+- Object replicaset is outdated. Use new instance
+...
+_ = rs_new:callro('echo', {'some_data'})
+---
+...
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
+vshard.router.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.router.internal)
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_, err = pcall(require, 'vshard.router')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.router.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.router.internal)
+---
+- true
+...
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
test_run:switch('default')
---
- true
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..94cd8cc 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -86,6 +86,47 @@ _ = require('vshard.router')
vshard.router.module_version()
check_reloaded()
+--
+-- Deprecate old replicaset and replica objects.
+--
+rs = vshard.router.route(1)
+_ = rs:callro('echo', {'some_data'})
+package.loaded["vshard.router"] = nil
+_ = require('vshard.router')
+-- Make sure async task (deprecator) has had cpu time.
+fiber.sleep(0.005)
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+vshard.router = require('vshard.router')
+rs = vshard.router.route(1)
+_ = rs:callro('echo', {'some_data'})
+-- Test deprecate timeout.
+vshard.router.internal.reload_deprecate_timeout = 0.3
+package.loaded["vshard.router"] = nil
+_ = require('vshard.router')
+vshard.router.internal.reload_deprecate_timeout = nil
+vshard.router = require('vshard.router')
+rs_new = vshard.router.route(1)
+_ = rs_new:callro('echo', {'some_data'})
+_ = rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+_ = rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+_ = rs_new:callro('echo', {'some_data'})
+
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+vshard.router.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.router.internal)
+package.loaded["vshard.router"] = nil
+_, err = pcall(require, 'vshard.router')
+err:match('Error injection:.*')
+vshard.router.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.router.internal)
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+
test_run:switch('default')
test_run:cmd('stop server router_1')
test_run:cmd('cleanup server router_1')
diff --git a/test/router/router.result b/test/router/router.result
index 3ebab5d..71e156c 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1024,11 +1024,10 @@ error_messages
- - Use replicaset:callro(...) instead of replicaset.callro(...)
- Use replicaset:connect_master(...) instead of replicaset.connect_master(...)
- Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
- - Use replicaset:rebind_connections(...) instead of replicaset.rebind_connections(...)
- Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)
- Use replicaset:call(...) instead of replicaset.call(...)
- - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
- Use replicaset:connect(...) instead of replicaset.connect(...)
+ - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
- Use replicaset:callrw(...) instead of replicaset.callrw(...)
- Use replicaset:connect_all(...) instead of replicaset.connect_all(...)
...
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..b7070e5 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -174,6 +174,67 @@ vshard.storage.module_version()
check_reloaded()
---
...
+--
+-- Deprecate old replicaset and replica objects.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+---
+...
+err:match('Object replicaset is outdated.*')
+---
+- Object replicaset is outdated. Use new instance
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs.callro(rs, 'echo', {'some_data'})
+---
+...
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_, err = pcall(require, 'vshard.storage')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.storage.internal)
+---
+- true
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
test_run:switch('default')
---
- true
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..bf8e803 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -87,6 +87,30 @@ _ = require('vshard.storage')
vshard.storage.module_version()
check_reloaded()
+--
+-- Deprecate old replicaset and replica objects.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+ok, err = pcall(rs.callro, rs, 'echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs.callro(rs, 'echo', {'some_data'})
+
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_, err = pcall(require, 'vshard.storage')
+err:match('Error injection:.*')
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.storage.internal)
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+
test_run:switch('default')
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 99f59aa..48053a0 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -48,7 +48,8 @@ local lerror = require('vshard.error')
local fiber = require('fiber')
local luri = require('uri')
local ffi = require('ffi')
-local gsc = require('vshard.util').generate_self_checker
+local util = require('vshard.util')
+local gsc = util.generate_self_checker
--
-- on_connect() trigger for net.box
@@ -338,26 +339,79 @@ local function replicaset_tostring(replicaset)
end
--
--- Rebind connections of old replicas to new ones.
+-- Deprecate old objects in case of reload.
+--
+local function clean_old_objects(old_replicaset_mt, old_replica_mt,
+ old_replicas)
+ local function get_deprecated_warning(obj_name)
+ return function(...)
+ local finfo = debug.getinfo(2)
+ local file = finfo.short_src
+ local name = finfo.name
+ local line = finfo.currentline
+ local err_fmt = '%s.%s:%s: Object %s is outdated. Use new instance'
+ error(string.format(err_fmt, file, name, line, obj_name))
+ end
+ end
+ local function replace_mt_methods(mt, method)
+ if not mt then
+ return
+ end
+ for name, _ in pairs(mt.__index) do
+ mt.__index[name] = method
+ end
+ end
+ replace_mt_methods(old_replicaset_mt, get_deprecated_warning('replicaset'))
+ replace_mt_methods(old_replica_mt, get_deprecated_warning('replica'))
+ for _, replica in pairs(old_replicas) do
+ replica.conn = nil
+ end
+ log.info('Old replicaset and replica objects are cleaned up.')
+end
+
--
-local function replicaset_rebind_connections(replicaset)
- for _, replica in pairs(replicaset.replicas) do
- local old_replica = replica.old_replica
- if old_replica then
- local conn = old_replica.conn
- replica.conn = conn
- replica.down_ts = old_replica.down_ts
- replica.net_timeout = old_replica.net_timeout
- replica.net_sequential_ok = old_replica.net_sequential_ok
- replica.net_sequential_fail = old_replica.net_sequential_fail
- if conn then
- conn.replica = replica
- conn.replicaset = replicaset
- old_replica.conn = nil
+-- Copy netbox connections from old replica objects to new ones.
+--
+local function rebind_replicasets(replicasets, deprecate, deprecate_timeout)
+ -- Collect data to deprecate old objects.
+ local old_replicaset_mt = nil
+ local old_replica_mt = nil
+ local old_replicas = {}
+ for _, replicaset in pairs(replicasets) do
+ if replicaset.old_replicaset then
+ local old_mt = getmetatable(replicaset.old_replicaset)
+ assert(old_replicaset_mt == nil or old_replicaset_mt == old_mt)
+ assert(not deprecate or
+ old_replicaset_mt ~= getmetatable(replicaset))
+ old_replicaset_mt = old_mt
+ replicaset.old_replicaset = nil
+ end
+ for _, replica in pairs(replicaset.replicas) do
+ local old_replica = replica.old_replica
+ if old_replica then
+ local old_mt = getmetatable(old_replica)
+ assert(old_replica_mt == nil or old_replica_mt == old_mt)
+ assert(not deprecate or old_replica_mt ~= getmetatable(replica))
+ old_replica_mt = old_mt
+ table.insert(old_replicas, old_replica)
+ local conn = old_replica.conn
+ replica.conn = conn
+ replica.down_ts = old_replica.down_ts
+ replica.net_timeout = old_replica.net_timeout
+ replica.net_sequential_ok = old_replica.net_sequential_ok
+ replica.net_sequential_fail = old_replica.net_sequential_fail
+ if conn then
+ conn.replica = replica
+ conn.replicaset = replicaset
+ end
+ replica.old_replica = nil
end
- replica.old_replica = nil
end
end
+ if deprecate then
+ util.async_task(deprecate_timeout, clean_old_objects, old_replicaset_mt,
+ old_replica_mt, old_replicas)
+ end
end
--
@@ -369,7 +423,6 @@ local replicaset_mt = {
connect_master = replicaset_connect_master;
connect_all = replicaset_connect_all;
connect_replica = replicaset_connect_to_replica;
- rebind_connections = replicaset_rebind_connections;
down_replica_priority = replicaset_down_replica_priority;
up_replica_priority = replicaset_up_replica_priority;
call = replicaset_master_call;
@@ -523,6 +576,7 @@ local function buildall(sharding_cfg, old_replicasets)
weight = replicaset.weight,
bucket_count = 0,
lock = replicaset.lock,
+ old_replicaset = old_replicaset,
}, replicaset_mt)
local priority_list = {}
for replica_uuid, replica in pairs(replicaset.replicas) do
@@ -596,4 +650,5 @@ return {
buildall = buildall,
calculate_etalon_balance = cluster_calculate_etalon_balance,
wait_masters_connect = wait_masters_connect,
+ rebind_replicasets = rebind_replicasets,
}
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 1dee80c..c075436 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -1,5 +1,17 @@
local log = require('log')
local lfiber = require('fiber')
+
+local MODULE_INTERNALS = '__module_vshard_router'
+-- Reload requirements, in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+ local vshard_modules = {
+ 'vshard.consts', 'vshard.error', 'vshard.cfg',
+ 'vshard.hash', 'vshard.replicaset', 'vshard.util',
+ }
+ for _, module in pairs(vshard_modules) do
+ package.loaded[module] = nil
+ end
+end
local consts = require('vshard.consts')
local lerror = require('vshard.error')
local lcfg = require('vshard.cfg')
@@ -7,7 +19,7 @@ local lhash = require('vshard.hash')
local lreplicaset = require('vshard.replicaset')
local util = require('vshard.util')
-local M = rawget(_G, '__module_vshard_router')
+local M = rawget(_G, MODULE_INTERNALS)
if not M then
M = {
errinj = {
@@ -17,6 +29,8 @@ if not M then
},
-- Bucket map cache.
route_map = {},
+ -- Time to deprecate old objects on reload.
+ reload_deprecate_timeout = nil,
-- All known replicasets used for bucket re-balancing
replicasets = nil,
-- Fiber to maintain replica connections.
@@ -129,6 +143,9 @@ local function discovery_f(module_version)
consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
while module_version == M.module_version do
for _, replicaset in pairs(M.replicasets) do
+ if module_version ~= M.module_version then
+ return
+ end
local active_buckets, err =
replicaset:callro('vshard.storage.buckets_discovery', {},
{timeout = 2})
@@ -457,8 +474,11 @@ end
-- Configuration
--------------------------------------------------------------------------------
+-- Distinguish reload from reconfigure.
+local module_reloading = true
local function router_cfg(cfg)
cfg = lcfg.check(cfg)
+ local current_cfg = table.deepcopy(cfg)
if not M.replicasets then
log.info('Starting router configuration')
else
@@ -482,15 +502,16 @@ local function router_cfg(cfg)
end
M.total_bucket_count = total_bucket_count
M.collect_lua_garbage = collect_lua_garbage
+ M.current_cfg = current_cfg
-- TODO: update existing route map in-place
M.route_map = {}
M.replicasets = new_replicasets
-- Move connections from an old configuration to a new one.
-- It must be done with no yields to prevent usage both of not
-- fully moved old replicasets, and not fully built new ones.
- for _, replicaset in pairs(new_replicasets) do
- replicaset:rebind_connections()
- end
+ lreplicaset.rebind_replicasets(new_replicasets, module_reloading,
+ M.reload_deprecate_timeout)
+ module_reloading = false
-- Now the new replicasets are fully built. Can establish
-- connections and yield.
for _, replicaset in pairs(new_replicasets) do
@@ -776,15 +797,16 @@ end
-- About functions, saved in M, and reloading see comment in
-- storage/init.lua.
--
-M.discovery_f = discovery_f
-M.failover_f = failover_f
-
-if not rawget(_G, '__module_vshard_router') then
- rawset(_G, '__module_vshard_router', M)
+if not rawget(_G, MODULE_INTERNALS) then
+ rawset(_G, MODULE_INTERNALS, M)
else
+ router_cfg(M.current_cfg)
M.module_version = M.module_version + 1
end
+M.discovery_f = discovery_f
+M.failover_f = failover_f
+
return {
cfg = router_cfg;
info = router_info;
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 879c7c4..b570821 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -2,14 +2,27 @@ local log = require('log')
local luri = require('uri')
local lfiber = require('fiber')
local netbox = require('net.box') -- for net.box:self()
+local trigger = require('internal.trigger')
+
+local MODULE_INTERNALS = '__module_vshard_storage'
+-- Reload requirements, in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+ local vshard_modules = {
+ 'vshard.consts', 'vshard.error', 'vshard.cfg',
+ 'vshard.replicaset', 'vshard.util',
+ }
+ for _, module in pairs(vshard_modules) do
+ package.loaded[module] = nil
+ end
+end
local consts = require('vshard.consts')
local lerror = require('vshard.error')
-local util = require('vshard.util')
local lcfg = require('vshard.cfg')
local lreplicaset = require('vshard.replicaset')
-local trigger = require('internal.trigger')
+local util = require('vshard.util')
-local M = rawget(_G, '__module_vshard_storage')
+
+local M = rawget(_G, MODULE_INTERNALS)
if not M then
--
-- The module is loaded for the first time.
@@ -21,6 +34,8 @@ if not M then
-- See format in replicaset.lua.
--
replicasets = nil,
+ -- Time to deprecate old objects on reload.
+ reload_deprecate_timeout = nil,
-- Triggers on master switch event. They are called right
-- before the event occurs.
_on_master_enable = trigger.new('_on_master_enable'),
@@ -1445,11 +1460,14 @@ end
--------------------------------------------------------------------------------
-- Configuration
--------------------------------------------------------------------------------
+-- Distinguish reload from reconfigure.
+local module_reloading = true
local function storage_cfg(cfg, this_replica_uuid)
if this_replica_uuid == nil then
error('Usage: cfg(configuration, this_replica_uuid)')
end
cfg = lcfg.check(cfg)
+ local current_cfg = table.deepcopy(cfg)
if cfg.weights or cfg.zone then
error('Weights and zone are not allowed for storage configuration')
end
@@ -1572,9 +1590,8 @@ local function storage_cfg(cfg, this_replica_uuid)
box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)
M.replicasets = new_replicasets
- for _, replicaset in pairs(new_replicasets) do
- replicaset:rebind_connections()
- end
+ lreplicaset.rebind_replicasets(new_replicasets, module_reloading)
+ module_reloading = false
M.this_replicaset = this_replicaset
M.this_replica = this_replica
M.total_bucket_count = total_bucket_count
@@ -1583,6 +1600,7 @@ local function storage_cfg(cfg, this_replica_uuid)
M.shard_index = shard_index
M.collect_bucket_garbage_interval = collect_bucket_garbage_interval
M.collect_lua_garbage = collect_lua_garbage
+ M.current_cfg = current_cfg
if was_master and not is_master then
local_on_master_disable()
@@ -1813,6 +1831,14 @@ end
-- restarted (or is restarted from M.background_f, which is not
-- changed) and continues use old func1 and func2.
--
+
+if not rawget(_G, MODULE_INTERNALS) then
+ rawset(_G, MODULE_INTERNALS, M)
+else
+ storage_cfg(M.current_cfg, M.this_replica.uuid)
+ M.module_version = M.module_version + 1
+end
+
M.recovery_f = recovery_f
M.collect_garbage_f = collect_garbage_f
M.rebalancer_f = rebalancer_f
@@ -1828,12 +1854,6 @@ M.rebalancer_build_routes = rebalancer_build_routes
M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
M.cached_find_sharded_spaces = find_sharded_spaces
-if not rawget(_G, '__module_vshard_storage') then
- rawset(_G, '__module_vshard_storage', M)
-else
- M.module_version = M.module_version + 1
-end
-
return {
sync = sync,
bucket_force_create = bucket_force_create,
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..f5bd78c 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -75,8 +75,28 @@ local function generate_self_checker(obj_name, func_name, mt, func)
end
end
+
+--
+-- Run a function without interrupting current fiber.
+-- @param delay Delay in seconds before the task should be
+-- executed.
+-- @param task Function to be executed.
+-- @param ... Arguments which would be passed to the `task`.
+--
+local function async_task(delay, task, ...)
+ assert(delay == nil or type(delay) == 'number')
+ local function wait_and_call(...)
+ if delay then
+ fiber.sleep(delay)
+ end
+ task(...)
+ end
+ fiber.create(wait_and_call, ...)
+end
+
return {
tuple_extract_key = tuple_extract_key,
reloadable_fiber_f = reloadable_fiber_f,
generate_self_checker = generate_self_checker,
+ async_task = async_task,
}
--
2.14.1
* [tarantool-patches] Re: [PATCH 0/2][vshard] Vshard safe reload
2018-06-09 17:47 [tarantool-patches] [PATCH 0/2][vshard] Vshard safe reload AKhatskevich
2018-06-09 17:47 ` [tarantool-patches] [PATCH 1/2] Add test on error during reconfigure AKhatskevich
2018-06-09 17:47 ` [tarantool-patches] [PATCH 2/2] Complete module reload AKhatskevich
@ 2018-06-17 19:39 ` Vladislav Shpilevoy
2018-06-19 7:20 ` Alex Khatskevich
2 siblings, 1 reply; 14+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-17 19:39 UTC (permalink / raw)
To: tarantool-patches, AKhatskevich
Hello. Thanks for the patches!
Please, describe here the point of the patchset.
It might not be obvious to an outside reader.
On 09/06/2018 20:47, AKhatskevich wrote:
> Branch: https://github.com/tarantool/vshard/tree/kh/gh-112-reload-mt-2
> Issue: https://github.com/tarantool/vshard/issues/112
>
> Two commits:
> * Add test on error during reconfigure
> * Complete module reload
> reloads entire vshard, deprecating old objects
>
> test/lua_libs/util.lua | 16 +++++++
> test/router/reload.result | 109 ++++++++++++++++++++++++++++++++++++++++++
> test/router/reload.test.lua | 41 ++++++++++++++++
> test/router/router.result | 30 +++++++++++-
> test/router/router.test.lua | 10 ++++
> test/storage/reload.result | 61 +++++++++++++++++++++++
> test/storage/reload.test.lua | 24 ++++++++++
> test/storage/storage.result | 33 +++++++++++++
> test/storage/storage.test.lua | 12 +++++
> vshard/replicaset.lua | 91 ++++++++++++++++++++++++++++-------
> vshard/router/init.lua | 47 ++++++++++++++----
> vshard/storage/init.lua | 53 +++++++++++++++-----
> vshard/util.lua | 20 ++++++++
> 13 files changed, 506 insertions(+), 41 deletions(-)
>
* [tarantool-patches] Re: [PATCH 2/2] Complete module reload
2018-06-09 17:47 ` [tarantool-patches] [PATCH 2/2] Complete module reload AKhatskevich
@ 2018-06-17 19:46 ` Vladislav Shpilevoy
2018-06-19 7:09 ` Alex Khatskevich
0 siblings, 1 reply; 14+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-17 19:46 UTC (permalink / raw)
To: AKhatskevich, tarantool-patches
Thanks for the patch! See 11 comments below.
On 09/06/2018 20:47, AKhatskevich wrote:
> In case one needs to upgrade vshard to a new version, this commit
> introduces a reload mechanism which allows doing that for a
> wide variety of possible changes (between the two versions).
Nitpicking: it does not introduce the reload mechanism. This commit
just improves the existing one.
Maybe we should place a manual on how to reload here?
A short example.
>
> Reload process:
> * reload all vshard modules
> * create new `replicaset` and `replica` objects
> * reuse old netbox connections in new replica objects if
> possible
> * update router/storage.internal table
> * after a `reload_deprecate_timeout` disable old instances of
> `replicaset` and `replica` objects
>
> Reload works for modules:
> * vshard.router
> * vshard.storage
>
> In case the reload process fails, the old router/storage module continues
> working properly.
>
> Extra changes:
> * add `util.async_task` method, which runs a function after a
> delay
> * delete replicaset:rebind_connections method as it is replaced
> with `rebind_replicasets` which updates all replicasets at once
> * introduce `module_reloading` for distinguishing reloaf from
1. reloaf -> reload.
> reconfigure
> * introduce MODULE_INTERNALS which stores name of the module
> internal data in the global namespace
>
> Closes #112
> ---
> test/router/reload.result | 109 +++++++++++++++++++++++++++++++++++++++++++
> test/router/reload.test.lua | 41 ++++++++++++++++
> test/router/router.result | 3 +-
> test/storage/reload.result | 61 ++++++++++++++++++++++++
> test/storage/reload.test.lua | 24 ++++++++++
> vshard/replicaset.lua | 91 +++++++++++++++++++++++++++++-------
> vshard/router/init.lua | 40 ++++++++++++----
> vshard/storage/init.lua | 44 ++++++++++++-----
> vshard/util.lua | 20 ++++++++
> 9 files changed, 392 insertions(+), 41 deletions(-)
>
I will review the tests later, after we finish with the code.
> diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
> index 99f59aa..48053a0 100644
> --- a/vshard/replicaset.lua
> +++ b/vshard/replicaset.lua
> @@ -48,7 +48,8 @@ local lerror = require('vshard.error')
> local fiber = require('fiber')
> local luri = require('uri')
> local ffi = require('ffi')
> -local gsc = require('vshard.util').generate_self_checker
> +local util = require('vshard.util')
> +local gsc = util.generate_self_checker
>
> --
> -- on_connect() trigger for net.box
> @@ -338,26 +339,79 @@ local function replicaset_tostring(replicaset)
> end
>
> --
> --- Rebind connections of old replicas to new ones.
> +-- Deprecate old objects in case of reload.
> +--
> +local function clean_old_objects(old_replicaset_mt, old_replica_mt,
> + old_replicas)
> + local function get_deprecated_warning(obj_name)
> + return function(...)
> + local finfo = debug.getinfo(2)
> + local file = finfo.short_src
> + local name = finfo.name
> + local line = finfo.currentline
> + local err_fmt = '%s.%s:%s: Object %s is outdated. Use new instance'
> + error(string.format(err_fmt, file, name, line, obj_name))
> + end
> + end
2. Too complex.
2.1 Why couldn't you just declare this function outside of clean_old_objects()?
It does not depend on any upvalue here. And it is parsed every time
clean_old_objects is called, slowing it down.
2.2. Too complex error message. Lets just throw error that object is
outdated.
> + local function replace_mt_methods(mt, method)
> + if not mt then
> + return
> + end
> + for name, _ in pairs(mt.__index) do
> + mt.__index[name] = method
> + end
> + end
> + replace_mt_methods(old_replicaset_mt, get_deprecated_warning('replicaset'))
> + replace_mt_methods(old_replica_mt, get_deprecated_warning('replica'))
3. Why does replace_mt_methods need the old mt? Why not just replace the whole
old __index?
I thought you would do something like this:
getmetatable(old_object).__index = function()
return function() error('The object is outdated') end
end
Any attempt to call a method in an old object leads to
throwing an error.
It allows to remove both replace_mt_methods, get_deprecated_warning.
And one more remark: our convention is to throw only OOM and misusage
errors. Here we should return two values: nil, error_object. Lets
add a new error code for this object.
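For example (a sketch; the error table format and the OBJECT_IS_OUTDATED
code name are only a suggestion here, not existing vshard.error API):
    -- Any method call on an outdated object returns nil, err
    -- instead of doing real work.
    local function outdate_object(obj, obj_name)
        local err = {
            type = 'ShardingError',
            code = 'OBJECT_IS_OUTDATED',
            message = 'Object ' .. obj_name .. ' is outdated',
        }
        getmetatable(obj).__index = function()
            return function() return nil, err end
        end
    end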
> + for _, replica in pairs(old_replicas) do
> + replica.conn = nil
> + end
> + log.info('Old replicaset and replica objects are cleaned up.')
> +end
> +
> --
> -local function replicaset_rebind_connections(replicaset)
> - for _, replica in pairs(replicaset.replicas) do
> - local old_replica = replica.old_replica
> - if old_replica then
> - local conn = old_replica.conn
> - replica.conn = conn
> - replica.down_ts = old_replica.down_ts
> - replica.net_timeout = old_replica.net_timeout
> - replica.net_sequential_ok = old_replica.net_sequential_ok
> - replica.net_sequential_fail = old_replica.net_sequential_fail
> - if conn then
> - conn.replica = replica
> - conn.replicaset = replicaset
> - old_replica.conn = nil
> +-- Copy netbox connections from old replica objects to new ones.
> +--
> +local function rebind_replicasets(replicasets, deprecate, deprecate_timeout)
4. Lets always deprecate old objects. Reconfigure is like reload called from API.
And remove 'deprecate' parameter.
On reconfiguration deprecating is even more useful because if during the
reconfiguration a cluster topology is changed, then some of buckets can be
transferred making old routes (and thus old replicaset objects that
had been got by router.route() call) outdated.
> + -- Collect data to deprecate old objects.
> + local old_replicaset_mt = nil
> + local old_replica_mt = nil
> + local old_replicas = {}
> + for _, replicaset in pairs(replicasets) do
> + if replicaset.old_replicaset then
> + local old_mt = getmetatable(replicaset.old_replicaset)
> + assert(old_replicaset_mt == nil or old_replicaset_mt == old_mt)
> + assert(not deprecate or
> + old_replicaset_mt ~= getmetatable(replicaset))
> + old_replicaset_mt = old_mt
> + replicaset.old_replicaset = nil
> + end
> + for _, replica in pairs(replicaset.replicas) do
> + local old_replica = replica.old_replica
> + if old_replica then
> + local old_mt = getmetatable(old_replica)
> + assert(old_replica_mt == nil or old_replica_mt == old_mt)
> + assert(not deprecate or old_replica_mt ~= getmetatable(replica))
> + old_replica_mt = old_mt
> + table.insert(old_replicas, old_replica)
> + local conn = old_replica.conn
> + replica.conn = conn
> + replica.down_ts = old_replica.down_ts
> + replica.net_timeout = old_replica.net_timeout
> + replica.net_sequential_ok = old_replica.net_sequential_ok
> + replica.net_sequential_fail = old_replica.net_sequential_fail
> + if conn then
> + conn.replica = replica
> + conn.replicaset = replicaset
> + end
> + replica.old_replica = nil
> end
> - replica.old_replica = nil
> end
> end
> + if deprecate then
> + util.async_task(deprecate_timeout, clean_old_objects, old_replicaset_mt,
> + old_replica_mt, old_replicas)
5. Here you have lost replicas and replicasets that had been removed from
the configuration. Not updated, but exactly removed. You iterate over the new
replicasets only, which have no links to the removed ones.
You should not deprecate old objects via links in new ones. You should have
two different methods: one to schedule old objects deprecation and one to rebind
connections. Looks like rebind_connections should be brought back.
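I mean a split roughly like this (a sketch, the names and signatures are
approximate):
    -- Rebinding walks the new replicasets and pulls connections in.
    local function rebind_replicasets(new_replicasets, old_replicasets)
        for uuid, new_rs in pairs(new_replicasets) do
            local old_rs = old_replicasets and old_replicasets[uuid]
            if old_rs then
                new_rs:rebind_connections(old_rs)
            end
        end
    end
    -- Deprecation walks the old replicasets, so the ones removed
    -- from the configuration are not lost.
    local function outdate_replicasets(old_replicasets, delay)
        util.async_task(delay, clean_old_objects, old_replicasets)
    end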
> + end
> end
>
> --
> @@ -523,6 +576,7 @@ local function buildall(sharding_cfg, old_replicasets)
> weight = replicaset.weight,
> bucket_count = 0,
> lock = replicaset.lock,
> + old_replicaset = old_replicaset,
6. You should not have this link.
> }, replicaset_mt)
> local priority_list = {}
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 1dee80c..c075436 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -129,6 +143,9 @@ local function discovery_f(module_version)
> consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL
> while module_version == M.module_version do
> for _, replicaset in pairs(M.replicasets) do
> + if module_version ~= M.module_version then
> + return
> + end
7. As I told you verbally, this check should be done after callro.
I can reload the module during callro(), and an old replicaset object
will fill the route map. Can you please extract this 3-line fix into a
separate commit with a test?
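I.e. something like this (a sketch of the loop body):
    local active_buckets, err =
        replicaset:callro('vshard.storage.buckets_discovery', {},
                          {timeout = 2})
    -- callro() yields; a reload could have happened meanwhile,
    -- so re-check the version before touching the route map.
    if module_version ~= M.module_version then
        return
    end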
> local active_buckets, err =
> replicaset:callro('vshard.storage.buckets_discovery', {},
> {timeout = 2})
> @@ -457,8 +474,11 @@ end
> -- Configuration
> --------------------------------------------------------------------------------
>
> +-- Distinguish reload from reconfigure.
> +local module_reloading = true
> local function router_cfg(cfg)
> cfg = lcfg.check(cfg)
> + local current_cfg = table.deepcopy(cfg)
8. cfg is already deepcopy of the original cfg.
> if not M.replicasets then
> log.info('Starting router configuration')
> else
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 879c7c4..b570821 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -2,14 +2,27 @@ local log = require('log')
> local luri = require('uri')
> local lfiber = require('fiber')
> local netbox = require('net.box') -- for net.box:self()
> +local trigger = require('internal.trigger')
> +
> +local MODULE_INTERNALS = '__module_vshard_storage'
> +-- Reload requirements, in case this module is reloaded manually.
> +if rawget(_G, MODULE_INTERNALS) then
> + local vshard_modules = {
> + 'vshard.consts', 'vshard.error', 'vshard.cfg',
> + 'vshard.replicaset', 'vshard.util',
> + }
> + for _, module in pairs(vshard_modules) do
> + package.loaded[module] = nil
> + end
> +end
> local consts = require('vshard.consts')
> local lerror = require('vshard.error')
> -local util = require('vshard.util')
> local lcfg = require('vshard.cfg')
> local lreplicaset = require('vshard.replicaset')
> -local trigger = require('internal.trigger')
> +local util = require('vshard.util')
>
> -local M = rawget(_G, '__module_vshard_storage')
> +
9. Extra empty line.
> +local M = rawget(_G, MODULE_INTERNALS)
> if not M then
> --
> -- The module is loaded for the first time.
> @@ -21,6 +34,8 @@ if not M then
> -- See format in replicaset.lua.
> --
> replicasets = nil,
> + -- Time to deprecate old objects on reload.
> + reload_deprecate_timeout = nil,
10. Looks like this thing is always 'nil' until somebody
manually sets it via storage/router.internal. It is not good.
Please, move it to the configuration and create a default value.
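Something like this (a sketch; the option and constant names are only a
suggestion):
    -- In router_cfg()/storage_cfg(): take the value from the config
    -- with a default instead of reading M directly.
    local reload_deprecate_timeout = cfg.reload_deprecate_timeout or
        consts.DEFAULT_RELOAD_DEPRECATE_TIMEOUT
    M.reload_deprecate_timeout = reload_deprecate_timeout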
> -- Triggers on master switch event. They are called right
> -- before the event occurs.
> _on_master_enable = trigger.new('_on_master_enable'),
> diff --git a/vshard/util.lua b/vshard/util.lua
> index bb71318..f5bd78c 100644
> --- a/vshard/util.lua
> +++ b/vshard/util.lua
> @@ -75,8 +75,28 @@ local function generate_self_checker(obj_name, func_name, mt, func)
> end
> end
>
> +
> +--
> +-- Run a function without interrupting current fiber.
> +-- @param delay Delay in seconds before the task should be
> +-- executed.
> +-- @param task Function to be executed.
> +-- @param ... Arguments which would be passed to the `task`.
> +--
> +local function async_task(delay, task, ...)
> + assert(delay == nil or type(delay) == 'number')
> + local function wait_and_call(...)
> + if delay then
> + fiber.sleep(delay)
> + end
> + task(...)
> + end
11. Please, avoid closures when possible. Every time the function is
called the closure is parsed again.
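E.g. (a sketch):
    -- A module-level helper instead of a closure created on every
    -- async_task() call.
    local function sleep_and_call(delay, task, ...)
        if delay then
            fiber.sleep(delay)
        end
        return task(...)
    end
    local function async_task(delay, task, ...)
        assert(delay == nil or type(delay) == 'number')
        fiber.create(sleep_and_call, delay, task, ...)
    end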
> + fiber.create(wait_and_call, ...)
> +end
> +
> return {
> tuple_extract_key = tuple_extract_key,
> reloadable_fiber_f = reloadable_fiber_f,
> generate_self_checker = generate_self_checker,
> + async_task = async_task,
> }
>
* [tarantool-patches] Re: [PATCH 1/2] Add test on error during reconfigure
2018-06-09 17:47 ` [tarantool-patches] [PATCH 1/2] Add test on error during reconfigure AKhatskevich
@ 2018-06-17 19:46 ` Vladislav Shpilevoy
2018-06-18 10:10 ` Alex Khatskevich
0 siblings, 1 reply; 14+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-17 19:46 UTC (permalink / raw)
To: tarantool-patches, AKhatskevich
Thanks for the patch! See 4 comments below.
On 09/06/2018 20:47, AKhatskevich wrote:
> In case the reconfigure process fails, the node should continue
> to work properly.
> ---
> test/lua_libs/util.lua | 16 ++++++++++++++++
> test/router/router.result | 27 +++++++++++++++++++++++++++
> test/router/router.test.lua | 10 ++++++++++
> test/storage/storage.result | 33 +++++++++++++++++++++++++++++++++
> test/storage/storage.test.lua | 12 ++++++++++++
> vshard/router/init.lua | 7 +++++++
> vshard/storage/init.lua | 9 +++++++++
> 7 files changed, 114 insertions(+)
>
> diff --git a/test/router/router.result b/test/router/router.result
> index 2ee1bff..3ebab5d 100644
> --- a/test/router/router.result
> +++ b/test/router/router.result
> @@ -1057,6 +1057,33 @@ error_messages
> - - Use replica:is_connected(...) instead of replica.is_connected(...)
> - Use replica:safe_uri(...) instead of replica.safe_uri(...)
> ...
> +-- Error during reconfigure process.
> +_ = vshard.router.route(1):callro('echo', {'some_data'})
1. Why do you need this call here? It does not output
anything.
> +---
> +...
> +vshard.router.internal.errinj.ERRINJ_CFG = true
> +---
> +...
> +old_internal = table.copy(vshard.router.internal)
> +---
> +...
> +_, err = pcall(vshard.router.cfg, cfg)
> +---
> +...
> +err:match('Error injection:.*')
> +---
> +- 'Error injection: cfg'
> +...
> +vshard.router.internal.errinj.ERRINJ_CFG = false
> +---
> +...
> +util.has_same_fields(old_internal, vshard.router.internal)
> +---
> +- true
> +...
> +_ = vshard.router.route(1):callro('echo', {'some_data'})
2. Same. Maybe this call should output something? This
test would not fail if callro failed. Call/ro/rw do not
throw exceptions but return a pair: result and error if
occurred.
> +---
> +...
> _ = test_run:cmd("switch default")
> ---
> ...
> diff --git a/test/storage/storage.result b/test/storage/storage.result
> index d0bf792..8d88bf4 100644
> --- a/test/storage/storage.result
> +++ b/test/storage/storage.result
> @@ -720,6 +720,39 @@ test_run:cmd("setopt delimiter ''");
> ---
> - true
> ...
> +-- Error during reconfigure process.
> +_, rs = next(vshard.storage.internal.replicasets)
> +---
> +...
> +_ = rs:callro('echo', {'some_data'})
3. Same. Here and at the end of the test.
> +---
> +...
> +vshard.storage.internal.errinj.ERRINJ_CFG = true
> +---
> +...
> +old_internal = table.copy(vshard.storage.internal)
> +---
> +...
> +_, err = pcall(vshard.storage.cfg, cfg, names.storage_1_a)
> +---
> +...
> +err:match('Error injection:.*')
> +---
> +- 'Error injection: cfg'
> +...
> +vshard.storage.internal.errinj.ERRINJ_CFG = false
> +---
> +...
> +util.has_same_fields(old_internal, vshard.storage.internal)
> +---
> +- true
> +...
> +_, rs = next(vshard.storage.internal.replicasets)
> +---
> +...
> +_ = rs:callro('echo', {'some_data'})
> +---
> +...
> _ = test_run:cmd("switch default")
> ---
> ...
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 21093e5..1dee80c 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -473,6 +474,12 @@ local function router_cfg(cfg)
> end
> box.cfg(cfg)
> log.info("Box has been configured")
> + -- It is considered that all possible errors during cfg
> + -- process occur only before this place.
> + -- This check should be placed as late as possible.
> + if M.errinj.ERRINJ_CFG then
> + error('Error injection: cfg')
> + end
4. I think you should place this injection before box.cfg.
All the cfg() code relies on the fact that box.cfg is atomic.
And if box.cfg is ok, then storage/router.cfg finishes with
no errors. An error is possible only before box.cfg and in
box.cfg. You may think of box.cfg as a commit, and the code before
box.cfg as a prepare.
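I.e. (a sketch of the placement):
    -- The injection fires in the 'prepare' part, before the
    -- atomic box.cfg 'commit'.
    if M.errinj.ERRINJ_CFG then
        error('Error injection: cfg')
    end
    box.cfg(cfg)
    log.info("Box has been configured")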
> M.total_bucket_count = total_bucket_count
> M.collect_lua_garbage = collect_lua_garbage
> -- TODO: update existing route map in-place
* [tarantool-patches] Re: [PATCH 1/2] Add test on error during reconfigure
2018-06-17 19:46 ` [tarantool-patches] " Vladislav Shpilevoy
@ 2018-06-18 10:10 ` Alex Khatskevich
2018-06-18 10:48 ` Vladislav Shpilevoy
0 siblings, 1 reply; 14+ messages in thread
From: Alex Khatskevich @ 2018-06-18 10:10 UTC (permalink / raw)
To: Vladislav Shpilevoy, tarantool-patches
>> +-- Error during reconfigure process.
>> +_ = vshard.router.route(1):callro('echo', {'some_data'})
>
> 1. Why do you need this call here? It does not output
> anything.
Because the next call checks a raised error.
This line ensures that an error occurred only after
`vshard.router.internal.errinj.ERRINJ_CFG = true`
just by not failing.
I did it to prevent test.result flooding.
>> +---
>> +...
>> +vshard.router.internal.errinj.ERRINJ_CFG = true
>> +---
>> +...
>> +old_internal = table.copy(vshard.router.internal)
>> +---
>> +...
>> +_, err = pcall(vshard.router.cfg, cfg)
>> +---
> 2. Same. Maybe this call should output something? This
> test would not fail if callro failed. Call/ro/rw do not
> throw exceptions but return a pair: result and error if
> occurred.
The error I test here returns as a raised exception.
> 3. Same. Here and at the end of the test.
Same
>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 21093e5..1dee80c 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -473,6 +474,12 @@ local function router_cfg(cfg)
>> end
>> box.cfg(cfg)
>> log.info("Box has been configured")
>> + -- It is considered that all possible errors during cfg
>> + -- process occur only before this place.
>> + -- This check should be placed as late as possible.
>> + if M.errinj.ERRINJ_CFG then
>> + error('Error injection: cfg')
>> + end
>
> 4. I think you should place this injection before box.cfg.
> All the cfg() code relies on the fact that box.cfg is atomic.
> And if box.cfg is ok, then storage/router.cfg finishes with
> no errors. An error is possible only before box.cfg and in
> box.cfg. You may think of box.cfg as a commit, and the code before
> box.cfg as a prepare.
In this case it does not matter, because it is just an errinj. However, I
wanted to show here
that the error should be raised as late as possible. It can help in the
future, if someone adds code after `box.cfg`. I prefer the errinj to be
here and will move it only if you insist.
* [tarantool-patches] Re: [PATCH 1/2] Add test on error during reconfigure
2018-06-18 10:10 ` Alex Khatskevich
@ 2018-06-18 10:48 ` Vladislav Shpilevoy
2018-06-19 10:36 ` Alex Khatskevich
0 siblings, 1 reply; 14+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-18 10:48 UTC (permalink / raw)
To: Alex Khatskevich, tarantool-patches
Hello. Thanks for the fixes!
On 18/06/2018 13:10, Alex Khatskevich wrote:
>
>>> +-- Error during reconfigure process.
>>> +_ = vshard.router.route(1):callro('echo', {'some_data'})
>>
>> 1. Why do you need this call here? It does not output
>> anything.
> Because the next call checks a raised error.
callro never raises (except OOM and usage errors). So if an error
occurs here, you will not see it. Same in other places below.
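To make a failure visible, the test could check the returned pair instead,
e.g. (a sketch):
    res, err = vshard.router.route(1):callro('echo', {'some_data'})
    res == 'some_data' or err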
> This line ensures that an error occurred only after
> `vshard.router.internal.errinj.ERRINJ_CFG = true`
> just by not failing.
>
> I did it to prevent test.result flooding.
>>> +---
>>> +...
>>> +vshard.router.internal.errinj.ERRINJ_CFG = true
>>> +---
>>> +...
>>> +old_internal = table.copy(vshard.router.internal)
>>> +---
>>> +...
>>> +_, err = pcall(vshard.router.cfg, cfg)
>>> +---
>> 2. Same. Maybe this call should output something? This
>> test would not fail if callro failed. Call/ro/rw do not
>> throw exceptions but return a pair: result and error if
>> occurred.
> The error I test here returns as a raised exception.
>> 3. Same. Here and at the end of the test.
> Same
>>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>>> index 21093e5..1dee80c 100644
>>> --- a/vshard/router/init.lua
>>> +++ b/vshard/router/init.lua
>>> @@ -473,6 +474,12 @@ local function router_cfg(cfg)
>>> end
>>> box.cfg(cfg)
>>> log.info("Box has been configured")
>>> + -- It is considered that all possible errors during cfg
>>> + -- process occur only before this place.
>>> + -- This check should be placed as late as possible.
>>> + if M.errinj.ERRINJ_CFG then
>>> + error('Error injection: cfg')
>>> + end
>>
>> 4. I think you should place this injection before box.cfg.
>> All the cfg() code relies on the fact that box.cfg is atomic.
>> And if box.cfg is ok, then storage/router.cfg finishes with
>> no errors. An error is possible only before box.cfg and in
>> box.cfg. You may think of box.cfg as a commit, and the code before
>> box.cfg as a prepare.
> In this case it does not matter, because it is just an errinj. However, I wanted to show here
> that the error should be raised as late as possible. It can help in the future, if someone adds code after `box.cfg`. I prefer the errinj to be here and will move it only if you insist.
I understand, but no error is possible here after box.cfg{}. Please, move this
injection one line up.
* [tarantool-patches] Re: [PATCH 2/2] Complete module reload
2018-06-17 19:46 ` [tarantool-patches] " Vladislav Shpilevoy
@ 2018-06-19 7:09 ` Alex Khatskevich
2018-06-19 9:00 ` Vladislav Shpilevoy
0 siblings, 1 reply; 14+ messages in thread
From: Alex Khatskevich @ 2018-06-19 7:09 UTC (permalink / raw)
To: Vladislav Shpilevoy, tarantool-patches
On 17.06.2018 22:46, Vladislav Shpilevoy wrote:
> Thanks for the patch! See 11 comments below.
>
> On 09/06/2018 20:47, AKhatskevich wrote:
>> In case one needs to upgrade vshard to a new version, this commit
>> introduces a reload mechanism which allows doing that for a
>> wide variety of possible changes (between the two versions).
>
> Nitpicking: it does not introduce the reload mechanism. This commit
> just improves the existing one.
>
> Maybe we should place a manual on how to reload here?
> A short example.
added
>
>>
>> Reload process:
>> * reload all vshard modules
>> * create new `replicaset` and `replica` objects
>> * reuse old netbox connections in new replica objects if
>> possible
>> * update router/storage.internal table
>> * after a `reload_deprecate_timeout` disable old instances of
>> `replicaset` and `replica` objects
>>
>> Reload works for modules:
>> * vshard.router
>> * vshard.storage
>>
>> In case the reload process fails, the old router/storage module continues
>> working properly.
>>
>> Extra changes:
>> * add `util.async_task` method, which runs a function after a
>> delay
>> * delete replicaset:rebind_connections method as it is replaced
>> with `rebind_replicasets` which updates all replicasets at once
>> * introduce `module_reloading` for distinguishing reloaf from
>
> 1. reloaf -> reload.
fixed
>
>> reconfigure
>> * introduce MODULE_INTERNALS which stores name of the module
>> internal data in the global namespace
>>
>> Closes #112
>> ---
>> test/router/reload.result | 109
>> +++++++++++++++++++++++++++++++++++++++++++
>> test/router/reload.test.lua | 41 ++++++++++++++++
>> test/router/router.result | 3 +-
>> test/storage/reload.result | 61 ++++++++++++++++++++++++
>> test/storage/reload.test.lua | 24 ++++++++++
>> vshard/replicaset.lua | 91
>> +++++++++++++++++++++++++++++-------
>> vshard/router/init.lua | 40 ++++++++++++----
>> vshard/storage/init.lua | 44 ++++++++++++-----
>> vshard/util.lua | 20 ++++++++
>> 9 files changed, 392 insertions(+), 41 deletions(-)
>>
>
> I will review the tests later, after we finish with the code.
>
>> diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
>> index 99f59aa..48053a0 100644
>> --- a/vshard/replicaset.lua
>> +++ b/vshard/replicaset.lua
>> @@ -48,7 +48,8 @@ local lerror = require('vshard.error')
>> local fiber = require('fiber')
>> local luri = require('uri')
>> local ffi = require('ffi')
>> -local gsc = require('vshard.util').generate_self_checker
>> +local util = require('vshard.util')
>> +local gsc = util.generate_self_checker
>> --
>> -- on_connect() trigger for net.box
>> @@ -338,26 +339,79 @@ local function replicaset_tostring(replicaset)
>> end
>> --
>> --- Rebind connections of old replicas to new ones.
>> +-- Deprecate old objects in case of reload.
>> +--
>> +local function clean_old_objects(old_replicaset_mt, old_replica_mt,
>> + old_replicas)
>> + local function get_deprecated_warning(obj_name)
>> + return function(...)
>> + local finfo = debug.getinfo(2)
>> + local file = finfo.short_src
>> + local name = finfo.name
>> + local line = finfo.currentline
>> + local err_fmt = '%s.%s:%s: Object %s is outdated. Use
>> new instance'
>> + error(string.format(err_fmt, file, name, line, obj_name))
>> + end
>> + end
> 2.1 Why could not you just declare this function out of
> clean_old_object()?
> It does not depend on any upvalue here. And it is parsed every time when
> clean_old_objects is called slowing it down.
This function is called only on vshard reload, and the pattern does not slow
that process down noticeably.
However, it makes the code much more readable.
Looking through the module, you do not lose your attention on a huge
collection of different functions. These helper functions are used only
inside the `clean_old_objects` function, and that fact is emphasized.
>
> 2.2. Too complex error message. Lets just throw error that object is
> outdated.
I have changed the API: it now returns a simple vshard error instead of
raising an exception.
> 3. Why replace_mt_methods needs old mt? Why not just replace the whole
> old
> __index?
>
> I thought you would do something like this:
>
> getmetatable(old_object).__index = function()
> return function() error('The object is outdated') end
> end
>
> Any attempt to call a method in an old object leads to
> throwing an error.
>
> It allows to remove both replace_mt_methods, get_deprecated_warning.
>
> And one more remark: our convention is to throw only OOM and misusage
> errors. Here we should return two values: nil, error_object. Lets
> add a new error code for this object.
Fixed
>
>> + for _, replica in pairs(old_replicas) do
>> + replica.conn = nil
>> + end
>> + log.info('Old replicaset and replica objects are cleaned up.')
>> +end
>> +
>> --
>> -local function replicaset_rebind_connections(replicaset)
>> - for _, replica in pairs(replicaset.replicas) do
>> - local old_replica = replica.old_replica
>> - if old_replica then
>> - local conn = old_replica.conn
>> - replica.conn = conn
>> - replica.down_ts = old_replica.down_ts
>> - replica.net_timeout = old_replica.net_timeout
>> - replica.net_sequential_ok = old_replica.net_sequential_ok
>> - replica.net_sequential_fail =
>> old_replica.net_sequential_fail
>> - if conn then
>> - conn.replica = replica
>> - conn.replicaset = replicaset
>> - old_replica.conn = nil
>> +-- Copy netbox conections from old replica objects to new ones.
>> +--
>> +local function rebind_replicasets(replicasets, deprecate,
>> deprecate_timeout)
>
> 4. Lets always deprecate old objects. Reconfigure is like reload
> called from API.
> And remove 'deprecate' parameter.
>
> On reconfiguration deprecating is even more useful because if during the
> reconfiguration a cluster topology is changed, then some of buckets
> can be
> transferred making old routes (and thus old replicaset objects that
> had been got by router.route() call) outdated.
Discussed verbally. The main reason to deprecate old objects is that we
have to keep track of all objects to reload them and to re-establish
connections in the future.
Fixed.
>
>> + -- Collect data to deprecate old objects.
>> + local old_replicaset_mt = nil
>> + local old_replica_mt = nil
>> + local old_replicas = {}
>> + for _, replicaset in pairs(replicasets) do
>> + if replicaset.old_replicaset then
>> + local old_mt = getmetatable(replicaset.old_replicaset)
>> + assert(old_replicaset_mt == nil or old_replicaset_mt ==
>> old_mt)
>> + assert(not deprecate or
>> + old_replicaset_mt ~= getmetatable(replicaset))
>> + old_replicaset_mt = old_mt
>> + replicaset.old_replicaset = nil
>> + end
>> + for _, replica in pairs(replicaset.replicas) do
>> + local old_replica = replica.old_replica
>> + if old_replica then
>> + local old_mt = getmetatable(old_replica)
>> + assert(old_replica_mt == nil or old_replica_mt ==
>> old_mt)
>> + assert(not deprecate or old_replica_mt ~=
>> getmetatable(replica))
>> + old_replica_mt = old_mt
>> + table.insert(old_replicas, old_replica)
>> + local conn = old_replica.conn
>> + replica.conn = conn
>> + replica.down_ts = old_replica.down_ts
>> + replica.net_timeout = old_replica.net_timeout
>> + replica.net_sequential_ok =
>> old_replica.net_sequential_ok
>> + replica.net_sequential_fail =
>> old_replica.net_sequential_fail
>> + if conn then
>> + conn.replica = replica
>> + conn.replicaset = replicaset
>> + end
>> + replica.old_replica = nil
>> end
>> - replica.old_replica = nil
>> end
>> end
>> + if deprecate then
>> + util.async_task(deprecate_timeout, clean_old_objects,
>> old_replicaset_mt,
>> + old_replica_mt, old_replicas)
>
> 5. Here you have lost replicas and replicasets that had been removed from
> the configuration. Not updated, but exactly removed. You iterate over new
> replicasets only, that has not links to removed ones.
>
> You should not deprecate old objects via links in new ones. You should
> have
> two different methods: to schedule old objects deprecation and to rebind
> connections. Looks, like rebind_connections should be returned.
I did not like the old version of the method because it had to be called for
each replicaset.
I want neither to create that many fibers for deprecation nor to pass the
fiber between calls.
To fix the issue I had to write a `copy_metatables()` function to distinguish
the metatables of old and new objects.
Fixed.
>
>> + end
>> end
>> --
>> @@ -523,6 +576,7 @@ local function buildall(sharding_cfg,
>> old_replicasets)
>> weight = replicaset.weight,
>> bucket_count = 0,
>> lock = replicaset.lock,
>> + old_replicaset = old_replicaset,
>
> 6. You should not have this link.
old_replicaset deleted
old_replica deleted
(this functionality is moved to `rebind_replicasets`)
>
>> }, replicaset_mt)
>> local priority_list = {}
>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 1dee80c..c075436 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -129,6 +143,9 @@ local function discovery_f(module_version)
>> consts.COLLECT_LUA_GARBAGE_INTERVAL /
>> consts.DISCOVERY_INTERVAL
>> while module_version == M.module_version do
>> for _, replicaset in pairs(M.replicasets) do
>> + if module_version ~= M.module_version then
>> + return
>> + end
>
> 7. As I told you verbally, this check should be done after callro.
> I can reload the module during callro(), and old replicaset object
> will fill the route map. Can you please extract this 3-line fix in a
> separate commit with a test?
Yes. I have done it in another commit (along with gh-117:
https://github.com/tarantool/vshard/commit/fa97ea478a9e4d85237eeb2bb8ccab0067d4914d).
So I will just delete it from this patchset and leave it for gh-117.
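
For reference, the placement discussed above is roughly this (a sketch of the
discovery_f loop; the actual fix went into the separate gh-117 commit):

    local active_buckets, err =
        replicaset:callro('vshard.storage.buckets_discovery', {}, {timeout = 2})
    -- callro() yields, so the module could have been reloaded meanwhile;
    -- re-check the version only after the call returns.
    if module_version ~= M.module_version then
        return
    end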
>
>> local active_buckets, err =
>> replicaset:callro('vshard.storage.buckets_discovery', {},
>> {timeout = 2})
>> @@ -457,8 +474,11 @@ end
>> -- Configuration
>> --------------------------------------------------------------------------------
>> +-- Distinguish reload from reconfigure.
>> +local module_reloading = true
>> local function router_cfg(cfg)
>> cfg = lcfg.check(cfg)
>> + local current_cfg = table.deepcopy(cfg)
>
> 8. cfg is already deepcopy of the original cfg.
Yes, however there is `lcfg.remove_non_box_options(cfg)`.
Possibly, for extensibility, it is better to make a copy of cfg just
for box.cfg{} before calling `remove_non_box_options`.
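
Something like this (a sketch; the local name box_cfg is illustrative and not
from the patch):

    local box_cfg = table.deepcopy(cfg)
    lcfg.remove_non_box_options(box_cfg)
    box.cfg(box_cfg)
    -- The original cfg keeps the vshard-only options, e.g.
    -- connection_outdate_delay, and can be stored in M.current_cfg as is.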
>
>> if not M.replicasets then
>> log.info('Starting router configuration')
>> else
>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>> index 879c7c4..b570821 100644
>> --- a/vshard/storage/init.lua
>> +++ b/vshard/storage/init.lua
>> @@ -2,14 +2,27 @@ local log = require('log')
>> local luri = require('uri')
>> local lfiber = require('fiber')
>> local netbox = require('net.box') -- for net.box:self()
>> +local trigger = require('internal.trigger')
>> +
>> +local MODULE_INTERNALS = '__module_vshard_storage'
>> +-- Reload requirements, in case this module is reloaded manually.
>> +if rawget(_G, MODULE_INTERNALS) then
>> + local vshard_modules = {
>> + 'vshard.consts', 'vshard.error', 'vshard.cfg',
>> + 'vshard.replicaset', 'vshard.util',
>> + }
>> + for _, module in pairs(vshard_modules) do
>> + package.loaded[module] = nil
>> + end
>> +end
>> local consts = require('vshard.consts')
>> local lerror = require('vshard.error')
>> -local util = require('vshard.util')
>> local lcfg = require('vshard.cfg')
>> local lreplicaset = require('vshard.replicaset')
>> -local trigger = require('internal.trigger')
>> +local util = require('vshard.util')
>> -local M = rawget(_G, '__module_vshard_storage')
>> +
>
> 9. Extra empty line.
fixed
>> +local M = rawget(_G, MODULE_INTERNALS)
>> if not M then
>> --
>> -- The module is loaded for the first time.
>> @@ -21,6 +34,8 @@ if not M then
>> -- See format in replicaset.lua.
>> --
>> replicasets = nil,
>> + -- Time to deprecate old objects on reload.
>> + reload_deprecate_timeout = nil,
>
> 10. Looks like this thing is always 'nil' until somebody manually sets it
> via storage/router.internal. It is not good.
> Please, move it to configuration and create a default value.
Fixed
>
>> -- Triggers on master switch event. They are called right
>> -- before the event occurs.
>> _on_master_enable = trigger.new('_on_master_enable'),
>> diff --git a/vshard/util.lua b/vshard/util.lua
>> index bb71318..f5bd78c 100644
>> --- a/vshard/util.lua
>> +++ b/vshard/util.lua
>> @@ -75,8 +75,28 @@ local function generate_self_checker(obj_name,
>> func_name, mt, func)
>> end
>> end
>> +
>> +--
>> +-- Run a function without interrupting current fiber.
>> +-- @param delay Delay in seconds before the task should be
>> +-- executed.
>> +-- @param task Function to be executed.
>> +-- @param ... Arguments which would be passed to the `task`.
>> +--
>> +local function async_task(delay, task, ...)
>> + assert(delay == nil or type(delay) == 'number')
>> + local function wait_and_call(...)
>> + if delay then
>> + fiber.sleep(delay)
>> + end
>> + task(...)
>> + end
>
> 11. Please, avoid closures when possible. Every time the function is
> called the closure is parsed again.
As in the previous case, I did it to increase readability.
Fixed (a separate sync_task function was created).
I have also renamed Deprecate -> Outdate.
Here is a new diff:
commit fd6d22cbeef28097199b36e34bc7bf33f659f40a
Author: AKhatskevich <avkhatskevich@tarantool.org>
Date: Sat Jun 9 17:23:40 2018 +0300
Complete module reload
In case one needs to upgrade vshard to a new version, this commit
improves the reload mechanism to allow doing that for a wider variety of
possible changes (between two versions).
Changes:
* introduce cfg option `connection_outdate_delay`
* improve reload mechanism
* add `util.async_task` method, which runs a function after a
delay
* delete replicaset:rebind_connections method as it is replaced
with `rebind_replicasets` which updates all replicasets at once
Reload mechanism:
* reload all vshard modules
* create new `replicaset` and `replica` objects
* reuse old netbox connections in new replica objects if
possible
* update router/storage.internal table
* after a `connection_outdate_delay` disable old instances of
`replicaset` and `replica` objects
Reload works for modules:
* vshard.router
* vshard.storage
Here is the module reload algorithm:
* old vshard is working
* delete old vshard src
* install new vshard
* call: package.loaded['vshard.router'] = nil
* call: old_router = vshard.router -- Save working router copy.
* call: vshard.router = require('vshard.router')
* if require fails: continue using old_router
* if require succeeds: use vshard.router
In case the reload process fails, the old router/storage module and the old
replicaset and replica objects continue working properly. If reload succeeds,
all old objects will be deprecated.
Extra changes:
* introduce MODULE_INTERNALS which stores name of the module
internal data in the global namespace
Closes #112
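
A minimal application-side illustration of the sequence above (the helper name
and the pcall-based error handling are illustrative only, not part of the
patch):

    local function reload_vshard_router()
        local old_router = vshard.router          -- keep a working copy
        package.loaded['vshard.router'] = nil     -- force a fresh require
        local ok, new_router = pcall(require, 'vshard.router')
        if ok then
            vshard.router = new_router            -- switch to the new version
        else
            vshard.router = old_router            -- require failed: keep the old one
        end
        return ok
    end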
diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..6517934 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -174,6 +174,143 @@ vshard.router.module_version()
check_reloaded()
---
...
+--
+-- Outdate old replicaset and replica objects.
+--
+rs = vshard.router.route(1)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_ = require('vshard.router')
+---
+...
+-- Make sure outdate async task has had cpu time.
+fiber.sleep(0.005)
+---
+...
+rs.callro(rs, 'echo', {'some_data'})
+---
+- null
+- type: ShardingError
+ name: OBJECT_IS_OUTDATED
+ message: Object is outdated after module reload/reconfigure. Use new
instance.
+ code: 20
+...
+vshard.router = require('vshard.router')
+---
+...
+rs = vshard.router.route(1)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Test `connection_outdate_delay`.
+old_connection_delay = cfg.connection_outdate_delay
+---
+...
+cfg.connection_outdate_delay = 0.3
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+cfg.connection_outdate_delay = old_connection_delay
+---
+...
+vshard.router.internal.connection_outdate_delay = nil
+---
+...
+vshard.router = require('vshard.router')
+---
+...
+rs_new = vshard.router.route(1)
+---
+...
+rs_new:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+fiber.sleep(0.2)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+fiber.sleep(0.2)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- null
+- type: ShardingError
+ name: OBJECT_IS_OUTDATED
+ message: Object is outdated after module reload/reconfigure. Use new
instance.
+ code: 20
+...
+err:match('Object replicaset is outdated.*')
+---
+- error: '[string "return err:match(''Object replicaset is
outdat..."]:1: variable
+ ''err'' is not declared'
+...
+rs_new:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
+vshard.router.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.router.internal)
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_, err = pcall(require, 'vshard.router')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.router.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.router.internal)
+---
+- true
+...
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
test_run:switch('default')
---
- true
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..9b73870 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -86,6 +86,47 @@ _ = require('vshard.router')
vshard.router.module_version()
check_reloaded()
+--
+-- Outdate old replicaset and replica objects.
+--
+rs = vshard.router.route(1)
+rs:callro('echo', {'some_data'})
+package.loaded["vshard.router"] = nil
+_ = require('vshard.router')
+-- Make sure outdate async task has had cpu time.
+fiber.sleep(0.005)
+rs.callro(rs, 'echo', {'some_data'})
+vshard.router = require('vshard.router')
+rs = vshard.router.route(1)
+rs:callro('echo', {'some_data'})
+-- Test `connection_outdate_delay`.
+old_connection_delay = cfg.connection_outdate_delay
+cfg.connection_outdate_delay = 0.3
+vshard.router.cfg(cfg)
+cfg.connection_outdate_delay = old_connection_delay
+vshard.router.internal.connection_outdate_delay = nil
+vshard.router = require('vshard.router')
+rs_new = vshard.router.route(1)
+rs_new:callro('echo', {'some_data'})
+rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+rs:callro('echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+rs_new:callro('echo', {'some_data'})
+
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+vshard.router.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.router.internal)
+package.loaded["vshard.router"] = nil
+_, err = pcall(require, 'vshard.router')
+err:match('Error injection:.*')
+vshard.router.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.router.internal)
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+
test_run:switch('default')
test_run:cmd('stop server router_1')
test_run:cmd('cleanup server router_1')
diff --git a/test/router/router.result b/test/router/router.result
index 3ebab5d..71e156c 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1024,11 +1024,10 @@ error_messages
- - Use replicaset:callro(...) instead of replicaset.callro(...)
- Use replicaset:connect_master(...) instead of
replicaset.connect_master(...)
- Use replicaset:connect_replica(...) instead of
replicaset.connect_replica(...)
- - Use replicaset:rebind_connections(...) instead of
replicaset.rebind_connections(...)
- Use replicaset:down_replica_priority(...) instead of
replicaset.down_replica_priority(...)
- Use replicaset:call(...) instead of replicaset.call(...)
- - Use replicaset:up_replica_priority(...) instead of
replicaset.up_replica_priority(...)
- Use replicaset:connect(...) instead of replicaset.connect(...)
+ - Use replicaset:up_replica_priority(...) instead of
replicaset.up_replica_priority(...)
- Use replicaset:callrw(...) instead of replicaset.callrw(...)
- Use replicaset:connect_all(...) instead of replicaset.connect_all(...)
...
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..8eec80f 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -174,6 +174,74 @@ vshard.storage.module_version()
check_reloaded()
---
...
+--
+-- Outdate old replicaset and replica objects.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+rs.callro(rs, 'echo', {'some_data'})
+---
+- null
+- type: ShardingError
+ name: OBJECT_IS_OUTDATED
+ message: Object is outdated after module reload/reconfigure. Use new
instance.
+ code: 20
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs.callro(rs, 'echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_, err = pcall(require, 'vshard.storage')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.storage.internal)
+---
+- true
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
test_run:switch('default')
---
- true
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..7d870ff 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -87,6 +87,29 @@ _ = require('vshard.storage')
vshard.storage.module_version()
check_reloaded()
+--
+-- Outdate old replicaset and replica objects.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+rs.callro(rs, 'echo', {'some_data'})
+_, rs = next(vshard.storage.internal.replicasets)
+rs.callro(rs, 'echo', {'some_data'})
+
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+rs:callro('echo', {'some_data'})
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_, err = pcall(require, 'vshard.storage')
+err:match('Error injection:.*')
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.storage.internal)
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+
test_run:switch('default')
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index f5db4c0..5703867 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -218,6 +218,10 @@ local cfg_template = {
type = 'non-negative number', name = 'Sync timeout',
is_optional = true,
default = consts.DEFAULT_SYNC_TIMEOUT
}},
+ {'connection_outdate_delay', {
+ type = 'non-negative number', name = 'Object outdate timeout',
+ is_optional = true, default = nil
+ }},
}
--
@@ -247,6 +251,8 @@ local function remove_non_box_options(cfg)
cfg.collect_bucket_garbage_interval = nil
cfg.collect_lua_garbage = nil
cfg.sync_timeout = nil
+ cfg.sync_timeout = nil
+ cfg.connection_outdate_delay = nil
end
return {
diff --git a/vshard/error.lua b/vshard/error.lua
index cf2f9d2..9df2c53 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -100,6 +100,11 @@ local error_message_template = {
[19] = {
name = 'REPLICASET_IS_LOCKED',
msg = 'Replicaset is locked'
+ },
+ [20] = {
+ name = 'OBJECT_IS_OUTDATED',
+ msg = 'Object is outdated after module reload/reconfigure. ' ..
+ 'Use new instance.'
}
}
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 99f59aa..4e7bc10 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -48,7 +48,8 @@ local lerror = require('vshard.error')
local fiber = require('fiber')
local luri = require('uri')
local ffi = require('ffi')
-local gsc = require('vshard.util').generate_self_checker
+local util = require('vshard.util')
+local gsc = util.generate_self_checker
--
-- on_connect() trigger for net.box
@@ -338,26 +339,80 @@ local function replicaset_tostring(replicaset)
end
--
--- Rebind connections of old replicas to new ones.
+-- Outdate old objects in case of reload/reconfigure.
--
-local function replicaset_rebind_connections(replicaset)
- for _, replica in pairs(replicaset.replicas) do
- local old_replica = replica.old_replica
- if old_replica then
- local conn = old_replica.conn
- replica.conn = conn
- replica.down_ts = old_replica.down_ts
- replica.net_timeout = old_replica.net_timeout
- replica.net_sequential_ok = old_replica.net_sequential_ok
- replica.net_sequential_fail = old_replica.net_sequential_fail
- if conn then
- conn.replica = replica
- conn.replicaset = replicaset
- old_replica.conn = nil
+local function cleanup_old_objects(old_replicaset_mt, old_replica_mt,
+ old_replicas)
+ local function get_outdated_warning()
+ return function(...)
+ return nil, lerror.vshard(lerror.code.OBJECT_IS_OUTDATED)
+ end
+ end
+ old_replicaset_mt.__index = get_outdated_warning
+ -- Leave replica_mt unchanged. All its methods are safe.
+ for _, replica in pairs(old_replicas) do
+ replica.conn = nil
+ end
+ log.info('Old replicaset and replica objects are outdated.')
+end
+
+--
+-- Initiate outdating process, which cancels connections and
+-- spoils object methods.
+--
+local function outdate_replicasets(replicasets, outdate_delay)
+ -- Collect data to outdate old objects.
+ local old_replicaset_mt = nil
+ local old_replica_mt = nil
+ local old_replicas = {}
+ for _, replicaset in pairs(replicasets) do
+ local old_mt = getmetatable(replicaset)
+ assert(old_replicaset_mt == nil or old_replicaset_mt == old_mt)
+ old_replicaset_mt = old_mt
+ for _, replica in pairs(replicaset.replicas) do
+ table.insert(old_replicas, replica)
+ local old_mt = getmetatable(replica)
+ assert(old_replica_mt == nil or old_replica_mt == old_mt)
+ old_replica_mt = old_mt
+ end
+ end
+ util.async_task(outdate_delay, cleanup_old_objects, old_replicaset_mt,
+ old_replica_mt, old_replicas)
+end
+
+
+--
+-- Copy netbox connections from old replica objects to new ones
+-- and outdate old objects.
+-- @param replicasets New replicasets
+-- @param old_replicasets Replicasets and replicas to be outdated.
+-- @param outdate_delay Number of seconds; delay to outdate
+-- old objects.
+--
+local function rebind_replicasets(replicasets, old_replicasets,
outdate_delay)
+ for replicaset_uuid, replicaset in pairs(replicasets) do
+ local old_replicaset = old_replicasets and
+ old_replicasets[replicaset_uuid]
+ for replica_uuid, replica in pairs(replicaset.replicas) do
+ local old_replica = old_replicaset and
+ old_replicaset.replicas[replica_uuid]
+ if old_replica then
+ local conn = old_replica.conn
+ replica.conn = conn
+ replica.down_ts = old_replica.down_ts
+ replica.net_timeout = old_replica.net_timeout
+ replica.net_sequential_ok = old_replica.net_sequential_ok
+ replica.net_sequential_fail =
old_replica.net_sequential_fail
+ if conn then
+ conn.replica = replica
+ conn.replicaset = replicaset
+ end
end
- replica.old_replica = nil
end
end
+ if old_replicasets then
+ outdate_replicasets(old_replicasets, outdate_delay)
+ end
end
--
@@ -369,7 +424,6 @@ local replicaset_mt = {
connect_master = replicaset_connect_master;
connect_all = replicaset_connect_all;
connect_replica = replicaset_connect_to_replica;
- rebind_connections = replicaset_rebind_connections;
down_replica_priority = replicaset_down_replica_priority;
up_replica_priority = replicaset_up_replica_priority;
call = replicaset_master_call;
@@ -378,14 +432,6 @@ local replicaset_mt = {
};
__tostring = replicaset_tostring;
}
---
--- Wrap self methods with a sanity checker.
---
-local index = {}
-for name, func in pairs(replicaset_mt.__index) do
- index[name] = gsc("replicaset", name, replicaset_mt, func)
-end
-replicaset_mt.__index = index
local replica_mt = {
__index = {
@@ -406,11 +452,30 @@ local replica_mt = {
end
end,
}
-index = {}
-for name, func in pairs(replica_mt.__index) do
- index[name] = gsc("replica", name, replica_mt, func)
+
+--
+-- Get new copy of replicaset and replica meta tables.
+-- It is important to have distinct copies of meta tables to
+-- be able to outdate only old objects.
+--
+local function copy_metatables()
+ --
+ -- Wrap self methods with a sanity checker.
+ --
+ local replicaset_mt_copy = table.copy(replicaset_mt)
+ local replica_mt_copy = table.copy(replica_mt)
+ local index = {}
+ for name, func in pairs(replicaset_mt_copy.__index) do
+ index[name] = gsc("replicaset", name, replicaset_mt_copy, func)
+ end
+ replicaset_mt_copy.__index = index
+ index = {}
+ for name, func in pairs(replica_mt_copy.__index) do
+ index[name] = gsc("replica", name, replica_mt_copy, func)
+ end
+ replica_mt_copy.__index = index
+ return replicaset_mt_copy, replica_mt_copy
end
-replica_mt.__index = index
--
-- Calculate for each replicaset its etalon bucket count.
@@ -514,20 +579,17 @@ local function buildall(sharding_cfg, old_replicasets)
zone_weights = {}
end
local curr_ts = fiber.time()
+ local replicaset_mt_copy, replica_mt_copy = copy_metatables()
for replicaset_uuid, replicaset in pairs(sharding_cfg.sharding) do
- local old_replicaset = old_replicasets and
- old_replicasets[replicaset_uuid]
local new_replicaset = setmetatable({
replicas = {},
uuid = replicaset_uuid,
weight = replicaset.weight,
bucket_count = 0,
lock = replicaset.lock,
- }, replicaset_mt)
+ }, replicaset_mt_copy)
local priority_list = {}
for replica_uuid, replica in pairs(replicaset.replicas) do
- local old_replica = old_replicaset and
- old_replicaset.replicas[replica_uuid]
-- The old replica is saved in the new object to
-- rebind its connection at the end of a
-- router/storage reconfiguration.
@@ -535,8 +597,8 @@ local function buildall(sharding_cfg, old_replicasets)
uri = replica.uri, name = replica.name, uuid =
replica_uuid,
zone = replica.zone, net_timeout =
consts.CALL_TIMEOUT_MIN,
net_sequential_ok = 0, net_sequential_fail = 0,
- down_ts = curr_ts, old_replica = old_replica,
- }, replica_mt)
+ down_ts = curr_ts,
+ }, replica_mt_copy)
new_replicaset.replicas[replica_uuid] = new_replica
if replica.master then
new_replicaset.master = new_replica
@@ -596,4 +658,6 @@ return {
buildall = buildall,
calculate_etalon_balance = cluster_calculate_etalon_balance,
wait_masters_connect = wait_masters_connect,
+ rebind_replicasets = rebind_replicasets,
+ outdate_replicasets = outdate_replicasets,
}
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 1dee80c..d70d060 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -1,5 +1,17 @@
local log = require('log')
local lfiber = require('fiber')
+
+local MODULE_INTERNALS = '__module_vshard_router'
+-- Reload requirements, in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+ local vshard_modules = {
+ 'vshard.consts', 'vshard.error', 'vshard.cfg',
+ 'vshard.hash', 'vshard.replicaset', 'vshard.util',
+ }
+ for _, module in pairs(vshard_modules) do
+ package.loaded[module] = nil
+ end
+end
local consts = require('vshard.consts')
local lerror = require('vshard.error')
local lcfg = require('vshard.cfg')
@@ -7,7 +19,7 @@ local lhash = require('vshard.hash')
local lreplicaset = require('vshard.replicaset')
local util = require('vshard.util')
-local M = rawget(_G, '__module_vshard_router')
+local M = rawget(_G, MODULE_INTERNALS)
if not M then
M = {
errinj = {
@@ -15,6 +27,8 @@ if not M then
ERRINJ_FAILOVER_CHANGE_CFG = false,
ERRINJ_RELOAD = false,
},
+ -- Time to outdate old objects on reload.
+ connection_outdate_delay = nil,
-- Bucket map cache.
route_map = {},
-- All known replicasets used for bucket re-balancing
@@ -459,6 +473,7 @@ end
local function router_cfg(cfg)
cfg = lcfg.check(cfg)
+ local current_cfg = table.deepcopy(cfg)
if not M.replicasets then
log.info('Starting router configuration')
else
@@ -480,17 +495,18 @@ local function router_cfg(cfg)
if M.errinj.ERRINJ_CFG then
error('Error injection: cfg')
end
+ M.connection_outdate_delay = current_cfg.connection_outdate_delay
M.total_bucket_count = total_bucket_count
M.collect_lua_garbage = collect_lua_garbage
+ M.current_cfg = current_cfg
-- TODO: update existing route map in-place
M.route_map = {}
- M.replicasets = new_replicasets
-- Move connections from an old configuration to a new one.
-- It must be done with no yields to prevent usage both of not
-- fully moved old replicasets, and not fully built new ones.
- for _, replicaset in pairs(new_replicasets) do
- replicaset:rebind_connections()
- end
+ lreplicaset.rebind_replicasets(new_replicasets, M.replicasets,
+ M.connection_outdate_delay)
+ M.replicasets = new_replicasets
-- Now the new replicasets are fully built. Can establish
-- connections and yield.
for _, replicaset in pairs(new_replicasets) do
@@ -776,15 +792,16 @@ end
-- About functions, saved in M, and reloading see comment in
-- storage/init.lua.
--
-M.discovery_f = discovery_f
-M.failover_f = failover_f
-
-if not rawget(_G, '__module_vshard_router') then
- rawset(_G, '__module_vshard_router', M)
+if not rawget(_G, MODULE_INTERNALS) then
+ rawset(_G, MODULE_INTERNALS, M)
else
+ router_cfg(M.current_cfg)
M.module_version = M.module_version + 1
end
+M.discovery_f = discovery_f
+M.failover_f = failover_f
+
return {
cfg = router_cfg;
info = router_info;
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 879c7c4..50347b0 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -2,14 +2,26 @@ local log = require('log')
local luri = require('uri')
local lfiber = require('fiber')
local netbox = require('net.box') -- for net.box:self()
+local trigger = require('internal.trigger')
+
+local MODULE_INTERNALS = '__module_vshard_storage'
+-- Reload requirements, in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+ local vshard_modules = {
+ 'vshard.consts', 'vshard.error', 'vshard.cfg',
+ 'vshard.replicaset', 'vshard.util',
+ }
+ for _, module in pairs(vshard_modules) do
+ package.loaded[module] = nil
+ end
+end
local consts = require('vshard.consts')
local lerror = require('vshard.error')
-local util = require('vshard.util')
local lcfg = require('vshard.cfg')
local lreplicaset = require('vshard.replicaset')
-local trigger = require('internal.trigger')
+local util = require('vshard.util')
-local M = rawget(_G, '__module_vshard_storage')
+local M = rawget(_G, MODULE_INTERNALS)
if not M then
--
-- The module is loaded for the first time.
@@ -21,6 +33,8 @@ if not M then
-- See format in replicaset.lua.
--
replicasets = nil,
+ -- Time to deprecate old objects on reload.
+ reload_deprecate_timeout = nil,
-- Triggers on master switch event. They are called right
-- before the event occurs.
_on_master_enable = trigger.new('_on_master_enable'),
@@ -1450,6 +1464,7 @@ local function storage_cfg(cfg, this_replica_uuid)
error('Usage: cfg(configuration, this_replica_uuid)')
end
cfg = lcfg.check(cfg)
+ local current_cfg = table.deepcopy(cfg)
if cfg.weights or cfg.zone then
error('Weights and zone are not allowed for storage
configuration')
end
@@ -1571,10 +1586,8 @@ local function storage_cfg(cfg, this_replica_uuid)
local uri = luri.parse(this_replica.uri)
box.once("vshard:storage:1", storage_schema_v1, uri.login,
uri.password)
+ lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
M.replicasets = new_replicasets
- for _, replicaset in pairs(new_replicasets) do
- replicaset:rebind_connections()
- end
M.this_replicaset = this_replicaset
M.this_replica = this_replica
M.total_bucket_count = total_bucket_count
@@ -1583,6 +1596,7 @@ local function storage_cfg(cfg, this_replica_uuid)
M.shard_index = shard_index
M.collect_bucket_garbage_interval = collect_bucket_garbage_interval
M.collect_lua_garbage = collect_lua_garbage
+ M.current_cfg = current_cfg
if was_master and not is_master then
local_on_master_disable()
@@ -1813,6 +1827,14 @@ end
-- restarted (or is restarted from M.background_f, which is not
-- changed) and continues use old func1 and func2.
--
+
+if not rawget(_G, MODULE_INTERNALS) then
+ rawset(_G, MODULE_INTERNALS, M)
+else
+ storage_cfg(M.current_cfg, M.this_replica.uuid)
+ M.module_version = M.module_version + 1
+end
+
M.recovery_f = recovery_f
M.collect_garbage_f = collect_garbage_f
M.rebalancer_f = rebalancer_f
@@ -1828,12 +1850,6 @@ M.rebalancer_build_routes = rebalancer_build_routes
M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
M.cached_find_sharded_spaces = find_sharded_spaces
-if not rawget(_G, '__module_vshard_storage') then
- rawset(_G, '__module_vshard_storage', M)
-else
- M.module_version = M.module_version + 1
-end
-
return {
sync = sync,
bucket_force_create = bucket_force_create,
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..4b859cc 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -75,8 +75,28 @@ local function generate_self_checker(obj_name,
func_name, mt, func)
end
end
+local function sync_task(delay, task, ...)
+ if delay then
+ fiber.sleep(delay)
+ end
+ task(...)
+end
+
+--
+-- Run a function without interrupting current fiber.
+-- @param delay Delay in seconds before the task should be
+-- executed.
+-- @param task Function to be executed.
+-- @param ... Arguments which would be passed to the `task`.
+--
+local function async_task(delay, task, ...)
+ assert(delay == nil or type(delay) == 'number')
+ fiber.create(sync_task, delay, task, ...)
+end
+
return {
tuple_extract_key = tuple_extract_key,
reloadable_fiber_f = reloadable_fiber_f,
generate_self_checker = generate_self_checker,
+ async_task = async_task,
}
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] Re: [PATCH 0/2][vshard] Vshard safe reload
2018-06-17 19:39 ` [tarantool-patches] Re: [PATCH 0/2][vshard] Vshard safe reload Vladislav Shpilevoy
@ 2018-06-19 7:20 ` Alex Khatskevich
0 siblings, 0 replies; 14+ messages in thread
From: Alex Khatskevich @ 2018-06-19 7:20 UTC (permalink / raw)
To: Vladislav Shpilevoy, tarantool-patches
> Please, describe here the point of the patchset.
> It could be not obvious for a witness.
>
>> Branch: https://github.com/tarantool/vshard/tree/kh/gh-112-reload-mt-2
>> Issue: https://github.com/tarantool/vshard/issues/112
>>
>> Two commits:
>> * Add test on error during reconfigure
>> * Complete module reload
>> reloads entire vshard, deprecating old objects
The aim of this patchset is to give a user the ability to update the Vshard
version without restarting a server, even in case some relatively noticeable
changes happened in Vshard.
Vshard will:
* reuse old connections
* outdate old user-visible objects (replica, replicaset) with a specified
delay
* reuse route map (after #117)
* reload background fibers
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] Re: [PATCH 2/2] Complete module reload
2018-06-19 7:09 ` Alex Khatskevich
@ 2018-06-19 9:00 ` Vladislav Shpilevoy
2018-06-19 11:49 ` Alex Khatskevich
0 siblings, 1 reply; 14+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-19 9:00 UTC (permalink / raw)
To: Alex Khatskevich, tarantool-patches
Hello. Thanks for the fixes! See 10 comments below and
the commit with some new fixes on the branch.
> diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
> index 99f59aa..4e7bc10 100644
> --- a/vshard/replicaset.lua
> +++ b/vshard/replicaset.lua
> @@ -338,26 +339,80 @@ local function replicaset_tostring(replicaset)
> end
>
> --
> --- Rebind connections of old replicas to new ones.
> +-- Outdate old objects in case of reload/reconfigure.
> --
> -local function replicaset_rebind_connections(replicaset)
> - for _, replica in pairs(replicaset.replicas) do
> - local old_replica = replica.old_replica
> - if old_replica then
> - local conn = old_replica.conn
> - replica.conn = conn
> - replica.down_ts = old_replica.down_ts
> - replica.net_timeout = old_replica.net_timeout
> - replica.net_sequential_ok = old_replica.net_sequential_ok
> - replica.net_sequential_fail = old_replica.net_sequential_fail
> - if conn then
> - conn.replica = replica
> - conn.replicaset = replicaset
> - old_replica.conn = nil
> +local function cleanup_old_objects(old_replicaset_mt, old_replica_mt,
> + old_replicas)
> + local function get_outdated_warning()
> + return function(...)
> + return nil, lerror.vshard(lerror.code.OBJECT_IS_OUTDATED)
> + end
> + end
1. Please, declare this outdated_mt outside of this function, in a module-local
variable, like it is done with the ordinary replica/replicaset mt. Not just
__index, but the entire mt. See comment 3 for why.
2. You did not outdate replica metatables. How did your tests pass then?
> + old_replicaset_mt.__index = get_outdated_warning
> + -- Leave replica_mt unchanged. All its methods are safe.
> + for _, replica in pairs(old_replicas) do
> + replica.conn = nil
> + end
> + log.info('Old replicaset and replica objects are outdated.')
> +end
> +
> +--
> +-- Initiate outdating process, which cancels connections and
> +-- spoils object methods.
> +--
> +local function outdate_replicasets(replicasets, outdate_delay)
> + -- Collect data to outdate old objects.
> + local old_replicaset_mt = nil
> + local old_replica_mt = nil
> + local old_replicas = {}
> + for _, replicaset in pairs(replicasets) do
> + local old_mt = getmetatable(replicaset)
> + assert(old_replicaset_mt == nil or old_replicaset_mt == old_mt)
> + old_replicaset_mt = old_mt
> + for _, replica in pairs(replicaset.replicas) do
> + table.insert(old_replicas, replica)
> + local old_mt = getmetatable(replica)
> + assert(old_replica_mt == nil or old_replica_mt == old_mt)
> + old_replica_mt = old_mt
> + end
> + end
> + util.async_task(outdate_delay, cleanup_old_objects, old_replicaset_mt,
> + old_replica_mt, old_replicas)
3. You do not need to search for old_replica_mt and old_replicaset_mt here.
You do not need them at all. Just declare a special object_outdate_mt and set
this metatable on the old objects. Not just __index, but the entire metatable.
So you can get rid of this old mt search and the old mt parameters.
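
In other words, a single module-local table would do (a sketch of the
suggestion; the names are illustrative):

    local function outdated_warning(...)
        return nil, lerror.vshard(lerror.code.OBJECT_IS_OUTDATED)
    end

    local outdated_replicaset_mt = {
        -- Any method access on an outdated object resolves to the same
        -- stub returning a vshard error.
        __index = function(self, key)
            return outdated_warning
        end,
    }
    -- Outdating then is just: setmetatable(old_replicaset, outdated_replicaset_mt)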
> +end
> +
> +
> +--
> +-- Copy netbox conections from old replica objects to new ones
> +-- and outdate old objects.
> +-- @param replicasets New replicasets
> +-- @param old_replicasets Replicasets and replicas to be outdated.
> +-- @param outdate_delay Number of seconds; delay to outdate
> +-- old objects.
> +--
> +local function rebind_replicasets(replicasets, old_replicasets, outdate_delay)
> + for replicaset_uuid, replicaset in pairs(replicasets) do
> + local old_replicaset = old_replicasets and
> + old_replicasets[replicaset_uuid]
> + for replica_uuid, replica in pairs(replicaset.replicas) do
> + local old_replica = old_replicaset and
> + old_replicaset.replicas[replica_uuid]
> + if old_replica then
> + local conn = old_replica.conn
> + replica.conn = conn
> + replica.down_ts = old_replica.down_ts
> + replica.net_timeout = old_replica.net_timeout
> + replica.net_sequential_ok = old_replica.net_sequential_ok
> + replica.net_sequential_fail = old_replica.net_sequential_fail
> + if conn then
> + conn.replica = replica
> + conn.replicaset = replicaset
> + end
> end
> - replica.old_replica = nil
> end
> end
> + if old_replicasets then
> + outdate_replicasets(old_replicasets, outdate_delay)
4. Please, name it schedule_connections_outdate. The current name is
confusing: it could be thought to deprecate the objects now, not after a
timeout.
> + end
> end
> @@ -406,11 +452,30 @@ local replica_mt = {
> end
> end,
> }
> -index = {}
> -for name, func in pairs(replica_mt.__index) do
> - index[name] = gsc("replica", name, replica_mt, func)
> +
> +--
> +-- Get new copy of replicaset and replica meta tables.
> +-- It is important to have distinct copies of meta tables to
> +-- be able to outdate only old objects.
> +--
> +local function copy_metatables()
5. Please, remove. This function is harmful and slow. You do not
need to copy metatables. Use my recommendations above and think.
> + --
> + -- Wrap self methods with a sanity checker.
> + --
> + local replicaset_mt_copy = table.copy(replicaset_mt)
> + local replica_mt_copy = table.copy(replica_mt)
> + local index = {}
> + for name, func in pairs(replicaset_mt_copy.__index) do
> + index[name] = gsc("replicaset", name, replicaset_mt_copy, func)
> + end
> + replicaset_mt_copy.__index = index
> + index = {}
> + for name, func in pairs(replica_mt_copy.__index) do
> + index[name] = gsc("replica", name, replica_mt_copy, func)
> + end
> + replica_mt_copy.__index = index
> + return replicaset_mt_copy, replica_mt_copy
> end
> -replica_mt.__index = index
>
> @@ -514,20 +579,17 @@ local function buildall(sharding_cfg, old_replicasets)
6. Unused parameter.
> zone_weights = {}
> end
> @@ -535,8 +597,8 @@ local function buildall(sharding_cfg, old_replicasets)
> uri = replica.uri, name = replica.name, uuid = replica_uuid,
> zone = replica.zone, net_timeout = consts.CALL_TIMEOUT_MIN,
> net_sequential_ok = 0, net_sequential_fail = 0,
> - down_ts = curr_ts, old_replica = old_replica,
> - }, replica_mt)
> + down_ts = curr_ts,
> + }, replica_mt_copy)
> new_replicaset.replicas[replica_uuid] = new_replica
> if replica.master then
> new_replicaset.master = new_replica
> @@ -596,4 +658,6 @@ return {
> buildall = buildall,
> calculate_etalon_balance = cluster_calculate_etalon_balance,
> wait_masters_connect = wait_masters_connect,
> + rebind_replicasets = rebind_replicasets,
> + outdate_replicasets = outdate_replicasets,
7. Why do you need to export this function?
> }
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 1dee80c..d70d060 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -459,6 +473,7 @@ end
>
> local function router_cfg(cfg)
> cfg = lcfg.check(cfg)
> + local current_cfg = table.deepcopy(cfg)
8. Ok, I understand why you copy it, but please,
write a comment about it.
> if not M.replicasets then
> log.info('Starting router configuration')
> else
> @@ -776,15 +792,16 @@ end
> -- About functions, saved in M, and reloading see comment in
> -- storage/init.lua.
> --
> -M.discovery_f = discovery_f
> -M.failover_f = failover_f
> -
> -if not rawget(_G, '__module_vshard_router') then
> - rawset(_G, '__module_vshard_router', M)
> +if not rawget(_G, MODULE_INTERNALS) then
> + rawset(_G, MODULE_INTERNALS, M)
> else
> + router_cfg(M.current_cfg)
> M.module_version = M.module_version + 1
> end
>
> +M.discovery_f = discovery_f
> +M.failover_f = failover_f
9. Why did you move these lines?
> +
> return {
> cfg = router_cfg;
> info = router_info;
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 879c7c4..50347b0 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -21,6 +33,8 @@ if not M then
> -- See format in replicaset.lua.
> --
> replicasets = nil,
> + -- Time to deprecate old objects on reload.
> + reload_deprecate_timeout = nil,
10. Why is it 'reload_deprecate_timeout' here, but 'connection_outdate_delay'
in the router?
> -- Triggers on master switch event. They are called right
> -- before the event occurs.
> _on_master_enable = trigger.new('_on_master_enable'),
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] Re: [PATCH 1/2] Add test on error during reconfigure
2018-06-18 10:48 ` Vladislav Shpilevoy
@ 2018-06-19 10:36 ` Alex Khatskevich
0 siblings, 0 replies; 14+ messages in thread
From: Alex Khatskevich @ 2018-06-19 10:36 UTC (permalink / raw)
To: Vladislav Shpilevoy, tarantool-patches
> callro never raises (except OOM and usage error). So if here
> an error occurs, you will not see it. Same in other places below.
Fixed by removing `_ =`
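
For example, in the new tests the difference is just (a sketch):

    -- Before: an error returned by callro() is silently dropped.
    _ = vshard.router.route(1):callro('echo', {'some_data'})
    -- After: the console prints every return value, so an error becomes visible.
    vshard.router.route(1):callro('echo', {'some_data'})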
>
> I understand, but any error is impossible here after box.cfg{}.
> Please, move this
> injection one line up.
>
Fixed
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] Re: [PATCH 2/2] Complete module reload
2018-06-19 9:00 ` Vladislav Shpilevoy
@ 2018-06-19 11:49 ` Alex Khatskevich
2018-06-19 11:57 ` Vladislav Shpilevoy
0 siblings, 1 reply; 14+ messages in thread
From: Alex Khatskevich @ 2018-06-19 11:49 UTC (permalink / raw)
To: Vladislav Shpilevoy, tarantool-patches
> Hello. Thanks for the fixes! See 10 comments below and
> the commit with some new fixes on the branch.
your fixes are squashed
>
> 1. Please, declare this outdated_mt out of this function in a module
> local variable like it is done with ordinal replica/replicaset mt. Not
> just
> __index, but entire mt. See comment 3 why.
>
Ok, I like this approach.
> 2. You did not outdate replica metatables. How did your tests pass then?
Added a test for outdating a replica object and added an `outdated = true`
attribute to the old replicaset and replica objects.
>
> 3. You do not need to search old_replica_mt and old_replicaset_mt
> here. You do not
> need them at all. Just declare special object_outdate_mt and set this
> metatable to
> the old objects. Not just __index, but entire metatable. So you can
> get rid of
> this old mts search, old mt parameters.
>
Ok, I like this approach.
>> + if old_replicasets then
>> + outdate_replicasets(old_replicasets, outdate_delay)
>
> 4. Please, name it schedule_connections_outdate. Now the name
> confuses. It could be thought to deprecate the objects now, not
> after timeout.
I have deleted the intermediate function, so now it looks like this:
    util.async_task(outdate_delay, outdate_replicasets, old_replicasets)
And I suppose renaming is redundant.
>> +local function copy_metatables()
>
> 5. Please, remove. This function is harmful and slow. You do not
> need to copy metatables. Use my recommendations above and think.
Done
>
>>
>> @@ -514,20 +579,17 @@ local function buildall(sharding_cfg,
>> old_replicasets)
>
> 6. Unused parameter.
Deleted
>
>> + outdate_replicasets = outdate_replicasets,
>
> 7. Why do you need to export this function?
Typo. Fixed
>
>> + local current_cfg = table.deepcopy(cfg)
>
> 8. Ok, I understand why do you copy it, but please,
> write a comment about it.
I decided to make a copy just for box.cfg and use `cfg` where it is needed
instead. It makes more sense to me and does not require extra comments.
>> -M.discovery_f = discovery_f
>> -M.failover_f = failover_f
>> -
>> -if not rawget(_G, '__module_vshard_router') then
>> - rawset(_G, '__module_vshard_router', M)
>> +if not rawget(_G, MODULE_INTERNALS) then
>> + rawset(_G, MODULE_INTERNALS, M)
>> else
>> + router_cfg(M.current_cfg)
>> M.module_version = M.module_version + 1
>> end
>>
>> +M.discovery_f = discovery_f
>> +M.failover_f = failover_f
>
> 9. Why did you move these lines?
>
Because the M table should not be changed in case of a
`router_cfg(M.current_cfg)` failure.
>> +
>> return {
>> cfg = router_cfg;
>> info = router_info;
>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>> index 879c7c4..50347b0 100644
>> --- a/vshard/storage/init.lua
>> +++ b/vshard/storage/init.lua
>> @@ -21,6 +33,8 @@ if not M then
>> -- See format in replicaset.lua.
>> --
>> replicasets = nil,
>> + -- Time to deprecate old objects on reload.
>> + reload_deprecate_timeout = nil,
>
> 10. Why here is 'reload_deprecate_timeout', but
> 'connection_outdate_delay'
> in router?
Typo. The storage does not have such a parameter at all. Deleted.
^ permalink raw reply [flat|nested] 14+ messages in thread
* [tarantool-patches] Re: [PATCH 2/2] Complete module reload
2018-06-19 11:49 ` Alex Khatskevich
@ 2018-06-19 11:57 ` Vladislav Shpilevoy
0 siblings, 0 replies; 14+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-19 11:57 UTC (permalink / raw)
To: Alex Khatskevich, tarantool-patches
>>> +M.discovery_f = discovery_f
>>> +M.failover_f = failover_f
>>
>> 9. Why did you move these lines?
>>
> Because M table should not be changed in case of
> `router_cfg(M.current_cfg)`
> failure
But now router_cfg on reload starts the old M.failover_f and
old M.discovery_f, does it not? I think that if cfg fails on
reload, then the caller should just return to the old
version.
^ permalink raw reply [flat|nested] 14+ messages in thread
end of thread, other threads:[~2018-06-19 11:57 UTC | newest]
Thread overview: 14+ messages
2018-06-09 17:47 [tarantool-patches] [PATCH 0/2][vshard] Vshard safe reload AKhatskevich
2018-06-09 17:47 ` [tarantool-patches] [PATCH 1/2] Add test on error during reconfigure AKhatskevich
2018-06-17 19:46 ` [tarantool-patches] " Vladislav Shpilevoy
2018-06-18 10:10 ` Alex Khatskevich
2018-06-18 10:48 ` Vladislav Shpilevoy
2018-06-19 10:36 ` Alex Khatskevich
2018-06-09 17:47 ` [tarantool-patches] [PATCH 2/2] Complete module reload AKhatskevich
2018-06-17 19:46 ` [tarantool-patches] " Vladislav Shpilevoy
2018-06-19 7:09 ` Alex Khatskevich
2018-06-19 9:00 ` Vladislav Shpilevoy
2018-06-19 11:49 ` Alex Khatskevich
2018-06-19 11:57 ` Vladislav Shpilevoy
2018-06-17 19:39 ` [tarantool-patches] Re: [PATCH 0/2][vshard] Vshard safe reload Vladislav Shpilevoy
2018-06-19 7:20 ` Alex Khatskevich