From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 8F4A026490 for ; Sat, 9 Jun 2018 13:47:33 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id jEyx2uVvwlpt for ; Sat, 9 Jun 2018 13:47:33 -0400 (EDT) Received: from smtp42.i.mail.ru (smtp42.i.mail.ru [94.100.177.102]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 1882C2636C for ; Sat, 9 Jun 2018 13:47:33 -0400 (EDT) From: AKhatskevich Subject: [tarantool-patches] [PATCH 2/2] Complete module reload Date: Sat, 9 Jun 2018 20:47:16 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org, v.shpilevoy@tarantool.org In case one needs to upgrade vshard to a new version, this commit introduces a reload mechanism which allows doing that for a noticeable variety of possible changes (between the two versions). Reload process: * reload all vshard modules * create new `replicaset` and `replica` objects * reuse old netbox connections in new replica objects if possible * update router/storage.internal table * after `reload_deprecate_timeout` expires, disable old instances of `replicaset` and `replica` objects Reload works for modules: * vshard.router * vshard.storage If the reload process fails, the old router/storage module continues working properly. 
Extra changes: * add `util.async_task` method, which runs a function after a delay * delete the replicaset:rebind_connections method, as it is replaced by `rebind_replicasets`, which updates all replicasets at once * introduce `module_reloading` for distinguishing reload from reconfigure * introduce MODULE_INTERNALS, which stores the name of the module's internal data table in the global namespace Closes #112 --- test/router/reload.result | 109 +++++++++++++++++++++++++++++++++++++++++++ test/router/reload.test.lua | 41 ++++++++++++++++ test/router/router.result | 3 +- test/storage/reload.result | 61 ++++++++++++++++++++++++ test/storage/reload.test.lua | 24 ++++++++++ vshard/replicaset.lua | 91 +++++++++++++++++++++++++++++------- vshard/router/init.lua | 40 ++++++++++++---- vshard/storage/init.lua | 44 ++++++++++++----- vshard/util.lua | 20 ++++++++ 9 files changed, 392 insertions(+), 41 deletions(-) diff --git a/test/router/reload.result b/test/router/reload.result index 19a9ead..083475f 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -174,6 +174,115 @@ vshard.router.module_version() check_reloaded() --- ... +-- +-- Deprecate old replicaset and replica objets. +-- +rs = vshard.router.route(1) +--- +... +_ = rs:callro('echo', {'some_data'}) +--- +... +package.loaded["vshard.router"] = nil +--- +... +_ = require('vshard.router') +--- +... +-- Make sure async task (deprecator) has had cpu time. +fiber.sleep(0.005) +--- +... +ok, err = pcall(rs.callro, rs, 'echo', {'some_data'}) +--- +... +err:match('Object replicaset is outdated.*') +--- +- Object replicaset is outdated. Use new instance +... +vshard.router = require('vshard.router') +--- +... +rs = vshard.router.route(1) +--- +... +_ = rs:callro('echo', {'some_data'}) +--- +... +-- Test deprecate timeout. +vshard.router.internal.reload_deprecate_timeout = 0.3 +--- +... +package.loaded["vshard.router"] = nil +--- +... +_ = require('vshard.router') +--- +... 
+vshard.router.internal.reload_deprecate_timeout = nil +--- +... +vshard.router = require('vshard.router') +--- +... +rs_new = vshard.router.route(1) +--- +... +_ = rs_new:callro('echo', {'some_data'}) +--- +... +_ = rs:callro('echo', {'some_data'}) +--- +... +fiber.sleep(0.2) +--- +... +_ = rs:callro('echo', {'some_data'}) +--- +... +fiber.sleep(0.2) +--- +... +ok, err = pcall(rs.callro, rs, 'echo', {'some_data'}) +--- +... +err:match('Object replicaset is outdated.*') +--- +- Object replicaset is outdated. Use new instance +... +_ = rs_new:callro('echo', {'some_data'}) +--- +... +-- Error during reconfigure process. +_ = vshard.router.route(1):callro('echo', {'some_data'}) +--- +... +vshard.router.internal.errinj.ERRINJ_CFG = true +--- +... +old_internal = table.copy(vshard.router.internal) +--- +... +package.loaded["vshard.router"] = nil +--- +... +_, err = pcall(require, 'vshard.router') +--- +... +err:match('Error injection:.*') +--- +- 'Error injection: cfg' +... +vshard.router.internal.errinj.ERRINJ_CFG = false +--- +... +util.has_same_fields(old_internal, vshard.router.internal) +--- +- true +... +_ = vshard.router.route(1):callro('echo', {'some_data'}) +--- +... test_run:switch('default') --- - true diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 6e21b74..94cd8cc 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -86,6 +86,47 @@ _ = require('vshard.router') vshard.router.module_version() check_reloaded() +-- +-- Deprecate old replicaset and replica objets. +-- +rs = vshard.router.route(1) +_ = rs:callro('echo', {'some_data'}) +package.loaded["vshard.router"] = nil +_ = require('vshard.router') +-- Make sure async task (deprecator) has had cpu time. +fiber.sleep(0.005) +ok, err = pcall(rs.callro, rs, 'echo', {'some_data'}) +err:match('Object replicaset is outdated.*') +vshard.router = require('vshard.router') +rs = vshard.router.route(1) +_ = rs:callro('echo', {'some_data'}) +-- Test deprecate timeout. 
+vshard.router.internal.reload_deprecate_timeout = 0.3 +package.loaded["vshard.router"] = nil +_ = require('vshard.router') +vshard.router.internal.reload_deprecate_timeout = nil +vshard.router = require('vshard.router') +rs_new = vshard.router.route(1) +_ = rs_new:callro('echo', {'some_data'}) +_ = rs:callro('echo', {'some_data'}) +fiber.sleep(0.2) +_ = rs:callro('echo', {'some_data'}) +fiber.sleep(0.2) +ok, err = pcall(rs.callro, rs, 'echo', {'some_data'}) +err:match('Object replicaset is outdated.*') +_ = rs_new:callro('echo', {'some_data'}) + +-- Error during reconfigure process. +_ = vshard.router.route(1):callro('echo', {'some_data'}) +vshard.router.internal.errinj.ERRINJ_CFG = true +old_internal = table.copy(vshard.router.internal) +package.loaded["vshard.router"] = nil +_, err = pcall(require, 'vshard.router') +err:match('Error injection:.*') +vshard.router.internal.errinj.ERRINJ_CFG = false +util.has_same_fields(old_internal, vshard.router.internal) +_ = vshard.router.route(1):callro('echo', {'some_data'}) + test_run:switch('default') test_run:cmd('stop server router_1') test_run:cmd('cleanup server router_1') diff --git a/test/router/router.result b/test/router/router.result index 3ebab5d..71e156c 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -1024,11 +1024,10 @@ error_messages - - Use replicaset:callro(...) instead of replicaset.callro(...) - Use replicaset:connect_master(...) instead of replicaset.connect_master(...) - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...) - - Use replicaset:rebind_connections(...) instead of replicaset.rebind_connections(...) - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...) - Use replicaset:call(...) instead of replicaset.call(...) - - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...) - Use replicaset:connect(...) instead of replicaset.connect(...) 
+ - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...) - Use replicaset:callrw(...) instead of replicaset.callrw(...) - Use replicaset:connect_all(...) instead of replicaset.connect_all(...) ... diff --git a/test/storage/reload.result b/test/storage/reload.result index f689cf4..b7070e5 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -174,6 +174,67 @@ vshard.storage.module_version() check_reloaded() --- ... +-- +-- Deprecate old replicaset and replica objets. +-- +_, rs = next(vshard.storage.internal.replicasets) +--- +... +package.loaded["vshard.storage"] = nil +--- +... +_ = require('vshard.storage') +--- +... +ok, err = pcall(rs.callro, rs, 'echo', {'some_data'}) +--- +... +err:match('Object replicaset is outdated.*') +--- +- Object replicaset is outdated. Use new instance +... +_, rs = next(vshard.storage.internal.replicasets) +--- +... +_ = rs.callro(rs, 'echo', {'some_data'}) +--- +... +-- Error during reload process. +_, rs = next(vshard.storage.internal.replicasets) +--- +... +_ = rs:callro('echo', {'some_data'}) +--- +... +vshard.storage.internal.errinj.ERRINJ_CFG = true +--- +... +old_internal = table.copy(vshard.storage.internal) +--- +... +package.loaded["vshard.storage"] = nil +--- +... +_, err = pcall(require, 'vshard.storage') +--- +... +err:match('Error injection:.*') +--- +- 'Error injection: cfg' +... +vshard.storage.internal.errinj.ERRINJ_CFG = false +--- +... +util.has_same_fields(old_internal, vshard.storage.internal) +--- +- true +... +_, rs = next(vshard.storage.internal.replicasets) +--- +... +_ = rs:callro('echo', {'some_data'}) +--- +... 
test_run:switch('default') --- - true diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 6e19a92..bf8e803 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -87,6 +87,30 @@ _ = require('vshard.storage') vshard.storage.module_version() check_reloaded() +-- +-- Deprecate old replicaset and replica objets. +-- +_, rs = next(vshard.storage.internal.replicasets) +package.loaded["vshard.storage"] = nil +_ = require('vshard.storage') +ok, err = pcall(rs.callro, rs, 'echo', {'some_data'}) +err:match('Object replicaset is outdated.*') +_, rs = next(vshard.storage.internal.replicasets) +_ = rs.callro(rs, 'echo', {'some_data'}) + +-- Error during reload process. +_, rs = next(vshard.storage.internal.replicasets) +_ = rs:callro('echo', {'some_data'}) +vshard.storage.internal.errinj.ERRINJ_CFG = true +old_internal = table.copy(vshard.storage.internal) +package.loaded["vshard.storage"] = nil +_, err = pcall(require, 'vshard.storage') +err:match('Error injection:.*') +vshard.storage.internal.errinj.ERRINJ_CFG = false +util.has_same_fields(old_internal, vshard.storage.internal) +_, rs = next(vshard.storage.internal.replicasets) +_ = rs:callro('echo', {'some_data'}) + test_run:switch('default') test_run:drop_cluster(REPLICASET_2) test_run:drop_cluster(REPLICASET_1) diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index 99f59aa..48053a0 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -48,7 +48,8 @@ local lerror = require('vshard.error') local fiber = require('fiber') local luri = require('uri') local ffi = require('ffi') -local gsc = require('vshard.util').generate_self_checker +local util = require('vshard.util') +local gsc = util.generate_self_checker -- -- on_connect() trigger for net.box @@ -338,26 +339,79 @@ local function replicaset_tostring(replicaset) end -- --- Rebind connections of old replicas to new ones. +-- Deprecate old objects in case of reload. 
+-- +local function clean_old_objects(old_replicaset_mt, old_replica_mt, + old_replicas) + local function get_deprecated_warning(obj_name) + return function(...) + local finfo = debug.getinfo(2) + local file = finfo.short_src + local name = finfo.name + local line = finfo.currentline + local err_fmt = '%s.%s:%s: Object %s is outdated. Use new instance' + error(string.format(err_fmt, file, name, line, obj_name)) + end + end + local function replace_mt_methods(mt, method) + if not mt then + return + end + for name, _ in pairs(mt.__index) do + mt.__index[name] = method + end + end + replace_mt_methods(old_replicaset_mt, get_deprecated_warning('replicaset')) + replace_mt_methods(old_replica_mt, get_deprecated_warning('replica')) + for _, replica in pairs(old_replicas) do + replica.conn = nil + end + log.info('Old replicaset and replica objects are cleaned up.') +end + -- -local function replicaset_rebind_connections(replicaset) - for _, replica in pairs(replicaset.replicas) do - local old_replica = replica.old_replica - if old_replica then - local conn = old_replica.conn - replica.conn = conn - replica.down_ts = old_replica.down_ts - replica.net_timeout = old_replica.net_timeout - replica.net_sequential_ok = old_replica.net_sequential_ok - replica.net_sequential_fail = old_replica.net_sequential_fail - if conn then - conn.replica = replica - conn.replicaset = replicaset - old_replica.conn = nil +-- Copy netbox conections from old replica objects to new ones. +-- +local function rebind_replicasets(replicasets, deprecate, deprecate_timeout) + -- Collect data to deprecate old objects. 
+ local old_replicaset_mt = nil + local old_replica_mt = nil + local old_replicas = {} + for _, replicaset in pairs(replicasets) do + if replicaset.old_replicaset then + local old_mt = getmetatable(replicaset.old_replicaset) + assert(old_replicaset_mt == nil or old_replicaset_mt == old_mt) + assert(not deprecate or + old_replicaset_mt ~= getmetatable(replicaset)) + old_replicaset_mt = old_mt + replicaset.old_replicaset = nil + end + for _, replica in pairs(replicaset.replicas) do + local old_replica = replica.old_replica + if old_replica then + local old_mt = getmetatable(old_replica) + assert(old_replica_mt == nil or old_replica_mt == old_mt) + assert(not deprecate or old_replica_mt ~= getmetatable(replica)) + old_replica_mt = old_mt + table.insert(old_replicas, old_replica) + local conn = old_replica.conn + replica.conn = conn + replica.down_ts = old_replica.down_ts + replica.net_timeout = old_replica.net_timeout + replica.net_sequential_ok = old_replica.net_sequential_ok + replica.net_sequential_fail = old_replica.net_sequential_fail + if conn then + conn.replica = replica + conn.replicaset = replicaset + end + replica.old_replica = nil end - replica.old_replica = nil end end + if deprecate then + util.async_task(deprecate_timeout, clean_old_objects, old_replicaset_mt, + old_replica_mt, old_replicas) + end end -- @@ -369,7 +423,6 @@ local replicaset_mt = { connect_master = replicaset_connect_master; connect_all = replicaset_connect_all; connect_replica = replicaset_connect_to_replica; - rebind_connections = replicaset_rebind_connections; down_replica_priority = replicaset_down_replica_priority; up_replica_priority = replicaset_up_replica_priority; call = replicaset_master_call; @@ -523,6 +576,7 @@ local function buildall(sharding_cfg, old_replicasets) weight = replicaset.weight, bucket_count = 0, lock = replicaset.lock, + old_replicaset = old_replicaset, }, replicaset_mt) local priority_list = {} for replica_uuid, replica in pairs(replicaset.replicas) do @@ 
-596,4 +650,5 @@ return { buildall = buildall, calculate_etalon_balance = cluster_calculate_etalon_balance, wait_masters_connect = wait_masters_connect, + rebind_replicasets = rebind_replicasets, } diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 1dee80c..c075436 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -1,5 +1,17 @@ local log = require('log') local lfiber = require('fiber') + +local MODULE_INTERNALS = '__module_vshard_router' +-- Reload requirements, in case this module is reloaded manually. +if rawget(_G, MODULE_INTERNALS) then + local vshard_modules = { + 'vshard.consts', 'vshard.error', 'vshard.cfg', + 'vshard.hash', 'vshard.replicaset', 'vshard.util', + } + for _, module in pairs(vshard_modules) do + package.loaded[module] = nil + end +end local consts = require('vshard.consts') local lerror = require('vshard.error') local lcfg = require('vshard.cfg') @@ -7,7 +19,7 @@ local lhash = require('vshard.hash') local lreplicaset = require('vshard.replicaset') local util = require('vshard.util') -local M = rawget(_G, '__module_vshard_router') +local M = rawget(_G, MODULE_INTERNALS) if not M then M = { errinj = { @@ -17,6 +29,8 @@ if not M then }, -- Bucket map cache. route_map = {}, + -- Time to deprecate old objects on reload. + reload_deprecate_timeout = nil, -- All known replicasets used for bucket re-balancing replicasets = nil, -- Fiber to maintain replica connections. @@ -129,6 +143,9 @@ local function discovery_f(module_version) consts.COLLECT_LUA_GARBAGE_INTERVAL / consts.DISCOVERY_INTERVAL while module_version == M.module_version do for _, replicaset in pairs(M.replicasets) do + if module_version ~= M.module_version then + return + end local active_buckets, err = replicaset:callro('vshard.storage.buckets_discovery', {}, {timeout = 2}) @@ -457,8 +474,11 @@ end -- Configuration -------------------------------------------------------------------------------- +-- Distinguish reload from reconfigure. 
+local module_reloading = true local function router_cfg(cfg) cfg = lcfg.check(cfg) + local current_cfg = table.deepcopy(cfg) if not M.replicasets then log.info('Starting router configuration') else @@ -482,15 +502,16 @@ local function router_cfg(cfg) end M.total_bucket_count = total_bucket_count M.collect_lua_garbage = collect_lua_garbage + M.current_cfg = current_cfg -- TODO: update existing route map in-place M.route_map = {} M.replicasets = new_replicasets -- Move connections from an old configuration to a new one. -- It must be done with no yields to prevent usage both of not -- fully moved old replicasets, and not fully built new ones. - for _, replicaset in pairs(new_replicasets) do - replicaset:rebind_connections() - end + lreplicaset.rebind_replicasets(new_replicasets, module_reloading, + M.reload_deprecate_timeout) + module_reloading = false -- Now the new replicasets are fully built. Can establish -- connections and yield. for _, replicaset in pairs(new_replicasets) do @@ -776,15 +797,16 @@ end -- About functions, saved in M, and reloading see comment in -- storage/init.lua. -- -M.discovery_f = discovery_f -M.failover_f = failover_f - -if not rawget(_G, '__module_vshard_router') then - rawset(_G, '__module_vshard_router', M) +if not rawget(_G, MODULE_INTERNALS) then + rawset(_G, MODULE_INTERNALS, M) else + router_cfg(M.current_cfg) M.module_version = M.module_version + 1 end +M.discovery_f = discovery_f +M.failover_f = failover_f + return { cfg = router_cfg; info = router_info; diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 879c7c4..b570821 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -2,14 +2,27 @@ local log = require('log') local luri = require('uri') local lfiber = require('fiber') local netbox = require('net.box') -- for net.box:self() +local trigger = require('internal.trigger') + +local MODULE_INTERNALS = '__module_vshard_storage' +-- Reload requirements, in case this module is reloaded manually. 
+if rawget(_G, MODULE_INTERNALS) then + local vshard_modules = { + 'vshard.consts', 'vshard.error', 'vshard.cfg', + 'vshard.replicaset', 'vshard.util', + } + for _, module in pairs(vshard_modules) do + package.loaded[module] = nil + end +end local consts = require('vshard.consts') local lerror = require('vshard.error') -local util = require('vshard.util') local lcfg = require('vshard.cfg') local lreplicaset = require('vshard.replicaset') -local trigger = require('internal.trigger') +local util = require('vshard.util') -local M = rawget(_G, '__module_vshard_storage') + +local M = rawget(_G, MODULE_INTERNALS) if not M then -- -- The module is loaded for the first time. @@ -21,6 +34,8 @@ if not M then -- See format in replicaset.lua. -- replicasets = nil, + -- Time to deprecate old objects on reload. + reload_deprecate_timeout = nil, -- Triggers on master switch event. They are called right -- before the event occurs. _on_master_enable = trigger.new('_on_master_enable'), @@ -1445,11 +1460,14 @@ end -------------------------------------------------------------------------------- -- Configuration -------------------------------------------------------------------------------- +-- Distinguish reload from reconfigure. 
+local module_reloading = true local function storage_cfg(cfg, this_replica_uuid) if this_replica_uuid == nil then error('Usage: cfg(configuration, this_replica_uuid)') end cfg = lcfg.check(cfg) + local current_cfg = table.deepcopy(cfg) if cfg.weights or cfg.zone then error('Weights and zone are not allowed for storage configuration') end @@ -1572,9 +1590,8 @@ local function storage_cfg(cfg, this_replica_uuid) box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password) M.replicasets = new_replicasets - for _, replicaset in pairs(new_replicasets) do - replicaset:rebind_connections() - end + lreplicaset.rebind_replicasets(new_replicasets, module_reloading) + module_reloading = false M.this_replicaset = this_replicaset M.this_replica = this_replica M.total_bucket_count = total_bucket_count @@ -1583,6 +1600,7 @@ local function storage_cfg(cfg, this_replica_uuid) M.shard_index = shard_index M.collect_bucket_garbage_interval = collect_bucket_garbage_interval M.collect_lua_garbage = collect_lua_garbage + M.current_cfg = current_cfg if was_master and not is_master then local_on_master_disable() @@ -1813,6 +1831,14 @@ end -- restarted (or is restarted from M.background_f, which is not -- changed) and continues use old func1 and func2. 
-- + +if not rawget(_G, MODULE_INTERNALS) then + rawset(_G, MODULE_INTERNALS, M) +else + storage_cfg(M.current_cfg, M.this_replica.uuid) + M.module_version = M.module_version + 1 +end + M.recovery_f = recovery_f M.collect_garbage_f = collect_garbage_f M.rebalancer_f = rebalancer_f @@ -1828,12 +1854,6 @@ M.rebalancer_build_routes = rebalancer_build_routes M.rebalancer_calculate_metrics = rebalancer_calculate_metrics M.cached_find_sharded_spaces = find_sharded_spaces -if not rawget(_G, '__module_vshard_storage') then - rawset(_G, '__module_vshard_storage', M) -else - M.module_version = M.module_version + 1 -end - return { sync = sync, bucket_force_create = bucket_force_create, diff --git a/vshard/util.lua b/vshard/util.lua index bb71318..f5bd78c 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -75,8 +75,28 @@ local function generate_self_checker(obj_name, func_name, mt, func) end end + +-- +-- Run a function without interrupting current fiber. +-- @param delay Delay in seconds before the task should be +-- executed. +-- @param task Function to be executed. +-- @param ... Arguments which would be passed to the `task`. +-- +local function async_task(delay, task, ...) + assert(delay == nil or type(delay) == 'number') + local function wait_and_call(...) + if delay then + fiber.sleep(delay) + end + task(...) + end + fiber.create(wait_and_call, ...) +end + return { tuple_extract_key = tuple_extract_key, reloadable_fiber_f = reloadable_fiber_f, generate_self_checker = generate_self_checker, + async_task = async_task, } -- 2.14.1