From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 346772120C for ; Tue, 19 Jun 2018 03:09:58 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id ey1x3bfoC4fd for ; Tue, 19 Jun 2018 03:09:58 -0400 (EDT) Received: from smtp44.i.mail.ru (smtp44.i.mail.ru [94.100.177.104]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 3322121083 for ; Tue, 19 Jun 2018 03:09:57 -0400 (EDT) Subject: [tarantool-patches] Re: [PATCH 2/2] Complete module reload References: <991c6880-9d5e-3f51-9003-a095984dc30d@tarantool.org> From: Alex Khatskevich Message-ID: Date: Tue, 19 Jun 2018 10:09:47 +0300 MIME-Version: 1.0 In-Reply-To: <991c6880-9d5e-3f51-9003-a095984dc30d@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-US Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: Vladislav Shpilevoy , tarantool-patches@freelists.org On 17.06.2018 22:46, Vladislav Shpilevoy wrote: > Thanks for the patch! See 11 comments below. > > On 09/06/2018 20:47, AKhatskevich wrote: >> In case one need to upgrade vshard to a new version, this commit >> introduces a reload mechanism which allows to do that for a >> noticeable variety of possible changes (between the two versions). > > Nitpicking: it does not introduce reload mechanism. This commit > just improves the thing. > > Maybe should we place here a manual how to reload? > A short example. 
added > >> >> Reload process: >>   * reload all vshard modules >>   * create new `replicaset` and `replica` objects >>   * reuse old netbox connections in new replica objects if >>     possible >>   * update router/storage.internal table >>   * after a `reload_deprecate_timeout` disable old instances of >>     `replicaset` and `replica` objects >> >> Reload works for modules: >>   * vshard.router >>   * vshard.storage >> >> In case reload process fails, old router/storage module continue >> working properly. >> >> Extra changes: >>   * add `util.async_task` method, which runs a function after a >>     delay >>   * delete replicaset:rebind_connections method as it is replaced >>     with `rebind_replicasets` which updates all replicasets at once >>   * introduce `module_reloading` for distinguishing reloaf from > > 1. reloaf -> reload. fixed > >>     reconfigure >> * introduce MODULE_INTERNALS which stores name of the module >>    internal data in the global namespace >> >> Closes #112 >> --- >>   test/router/reload.result    | 109 >> +++++++++++++++++++++++++++++++++++++++++++ >>   test/router/reload.test.lua  |  41 ++++++++++++++++ >>   test/router/router.result    |   3 +- >>   test/storage/reload.result   |  61 ++++++++++++++++++++++++ >>   test/storage/reload.test.lua |  24 ++++++++++ >>   vshard/replicaset.lua        |  91 >> +++++++++++++++++++++++++++++------- >>   vshard/router/init.lua       |  40 ++++++++++++---- >>   vshard/storage/init.lua      |  44 ++++++++++++----- >>   vshard/util.lua              |  20 ++++++++ >>   9 files changed, 392 insertions(+), 41 deletions(-) >> > > I will review the tests later, after we finish with the code. 
> >> diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua >> index 99f59aa..48053a0 100644 >> --- a/vshard/replicaset.lua >> +++ b/vshard/replicaset.lua >> @@ -48,7 +48,8 @@ local lerror = require('vshard.error') >>   local fiber = require('fiber') >>   local luri = require('uri') >>   local ffi = require('ffi') >> -local gsc = require('vshard.util').generate_self_checker >> +local util = require('vshard.util') >> +local gsc = util.generate_self_checker >>     -- >>   -- on_connect() trigger for net.box >> @@ -338,26 +339,79 @@ local function replicaset_tostring(replicaset) >>   end >>     -- >> --- Rebind connections of old replicas to new ones. >> +-- Deprecate old objects in case of reload. >> +-- >> +local function clean_old_objects(old_replicaset_mt, old_replica_mt, >> +                                 old_replicas) >> +    local function get_deprecated_warning(obj_name) >> +        return function(...) >> +            local finfo = debug.getinfo(2) >> +            local file = finfo.short_src >> +            local name = finfo.name >> +            local line = finfo.currentline >> +            local err_fmt = '%s.%s:%s: Object %s is outdated. Use >> new instance' >> +            error(string.format(err_fmt, file, name, line, obj_name)) >> +        end >> +    end > 2.1 Why could not you just declare this function out of > clean_old_object()? > It does not depend on any upvalue here. And it is parsed every time when > clean_old_objects is called slowing it down. This function is called only on vshard reload and the pattern does not slow down that process noticeably. However it makes the code much more readable. Looking through the module you do not lose your attention on a huge collection of different functions. Those helper-functions are used only inside of the `clean_old_objects` function and that fact is emphasized. > > 2.2. Too complex error message. Lets just throw error that object is > outdated. I have changed the API. 
Returning simple vshard error instead of raising an exception. > 3. Why replace_mt_methods needs old mt? Why not just replace the whole > old > __index? > > I thought you would do something like this: > > getmetatable(old_object).__index = function() >     return function() error('The object is outdated') end > end > > Any attempt to call a method in an old object leads to > throwing an error. > > It allows to remove both replace_mt_methods, get_deprecated_warning. > > And one more remark: our convention is to throw only OOM and misusage > errors. Here we should return two values: nil, error_object. Lets > add a new error code for this object. Fixed > >> +    for _, replica in pairs(old_replicas) do >> +        replica.conn = nil >> +    end >> +    log.info('Old replicaset and replica objects are cleaned up.') >> +end >> + >>   -- >> -local function replicaset_rebind_connections(replicaset) >> -    for _, replica in pairs(replicaset.replicas) do >> -        local old_replica = replica.old_replica >> -        if old_replica then >> -            local conn = old_replica.conn >> -            replica.conn = conn >> -            replica.down_ts = old_replica.down_ts >> -            replica.net_timeout = old_replica.net_timeout >> -            replica.net_sequential_ok = old_replica.net_sequential_ok >> -            replica.net_sequential_fail = >> old_replica.net_sequential_fail >> -            if conn then >> -                conn.replica = replica >> -                conn.replicaset = replicaset >> -                old_replica.conn = nil >> +-- Copy netbox conections from old replica objects to new ones. >> +-- >> +local function rebind_replicasets(replicasets, deprecate, >> deprecate_timeout) > > 4. Lets always deprecate old objects. Reconfigure is like reload > called from API. > And remove 'deprecate' parameter. 
> > On reconfiguration deprecating is even more useful because if during the > reconfiguration a cluster topology is changed, then some of buckets > can be > transferred making old routes (and thus old replicaset objects that > had been got by router.route() call) outdated. Discussed verbally. The main reason to deprecate old objects is that we have to keep track of all objects to reload them and to re-establish connections in the future. Fixed. > >> +    -- Collect data to deprecate old objects. >> +    local old_replicaset_mt = nil >> +    local old_replica_mt = nil >> +    local old_replicas = {} >> +    for _, replicaset in pairs(replicasets) do >> +        if replicaset.old_replicaset then >> +            local old_mt = getmetatable(replicaset.old_replicaset) >> +            assert(old_replicaset_mt == nil or old_replicaset_mt == >> old_mt) >> +            assert(not deprecate or >> +                   old_replicaset_mt ~= getmetatable(replicaset)) >> +            old_replicaset_mt = old_mt >> +            replicaset.old_replicaset = nil >> +        end >> +        for _, replica in pairs(replicaset.replicas) do >> +            local old_replica = replica.old_replica >> +            if old_replica then >> +                local old_mt = getmetatable(old_replica) >> +                assert(old_replica_mt == nil or old_replica_mt == >> old_mt) >> +                assert(not deprecate or old_replica_mt ~= >> getmetatable(replica)) >> +                old_replica_mt = old_mt >> +                table.insert(old_replicas, old_replica) >> +                local conn = old_replica.conn >> +                replica.conn = conn >> +                replica.down_ts = old_replica.down_ts >> +                replica.net_timeout = old_replica.net_timeout >> +                replica.net_sequential_ok = >> old_replica.net_sequential_ok >> +                replica.net_sequential_fail = >> old_replica.net_sequential_fail >> +                if conn then >> +                    
conn.replica = replica >> +                    conn.replicaset = replicaset >> +                end >> +                replica.old_replica = nil >>               end >> -            replica.old_replica = nil >>           end >>       end >> +    if deprecate then >> +        util.async_task(deprecate_timeout, clean_old_objects, >> old_replicaset_mt, >> +                        old_replica_mt, old_replicas) > > 5. Here you have lost replicas and replicasets that had been removed from > the configuration. Not updated, but exactly removed. You iterate over new > replicasets only, that has not links to removed ones. > > You should not deprecate old objects via links in new ones. You should > have > two different methods: to schedule old objects deprecation and to rebind > connections. Looks, like rebind_connections should be returned. I did not like the old version of the method because it had to be called for each replicaset. I wanted neither to create as many fibers for deprecation nor to pass the fiber between calls. To fix the issue I had to write function `copy_metatables()` to distinguish metatables of old and new objects. Fixed. > >> +    end >>   end >>     -- >> @@ -523,6 +576,7 @@ local function buildall(sharding_cfg, >> old_replicasets) >>               weight = replicaset.weight, >>               bucket_count = 0, >>               lock = replicaset.lock, >> +            old_replicaset = old_replicaset, > > 6. You should not have this link. 
old_replicaset deleted old_replica deleted (this functionality is moved to `rebind_replicasets`) > >>           }, replicaset_mt) >>           local priority_list = {} >> diff --git a/vshard/router/init.lua b/vshard/router/init.lua >> index 1dee80c..c075436 100644 >> --- a/vshard/router/init.lua >> +++ b/vshard/router/init.lua >> @@ -129,6 +143,9 @@ local function discovery_f(module_version) >>           consts.COLLECT_LUA_GARBAGE_INTERVAL / >> consts.DISCOVERY_INTERVAL >>       while module_version == M.module_version do >>           for _, replicaset in pairs(M.replicasets) do >> +            if module_version ~= M.module_version then >> +                return >> +            end > > 7. As I told you verbally, this check should be done after callro. > I can reload the module during callro(), and old replicaset object > will fill the route map. Can you please extract this 3-line fix in a > separate commit with a test? Yes. I have done it in the other commit ( along with gh-117 https://github.com/tarantool/vshard/commit/fa97ea478a9e4d85237eeb2bb8ccab0067d4914d ) So, I would jut delete it from this patchset and leave for gh-117. > >>               local active_buckets, err = >> replicaset:callro('vshard.storage.buckets_discovery', {}, >>                                     {timeout = 2}) >> @@ -457,8 +474,11 @@ end >>   -- Configuration >> -------------------------------------------------------------------------------- >>   +-- Distinguish reload from reconfigure. >> +local module_reloading = true >>   local function router_cfg(cfg) >>       cfg = lcfg.check(cfg) >> +    local current_cfg = table.deepcopy(cfg) > > 8. cfg is already deepcopy of the original cfg. Yes, however there is a `lcfg.remove_non_box_options(cfg)`. Possibly, for extensibility it is better to make a cfg copy just for box.cfg{} before calling `remove_non_box_options`. 
> >>       if not M.replicasets then >>           log.info('Starting router configuration') >>       else >> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua >> index 879c7c4..b570821 100644 >> --- a/vshard/storage/init.lua >> +++ b/vshard/storage/init.lua >> @@ -2,14 +2,27 @@ local log = require('log') >>   local luri = require('uri') >>   local lfiber = require('fiber') >>   local netbox = require('net.box') -- for net.box:self() >> +local trigger = require('internal.trigger') >> + >> +local MODULE_INTERNALS = '__module_vshard_storage' >> +-- Reload requirements, in case this module is reloaded manually. >> +if rawget(_G, MODULE_INTERNALS) then >> +    local vshard_modules = { >> +        'vshard.consts', 'vshard.error', 'vshard.cfg', >> +        'vshard.replicaset', 'vshard.util', >> +    } >> +    for _, module in pairs(vshard_modules) do >> +        package.loaded[module] = nil >> +    end >> +end >>   local consts = require('vshard.consts') >>   local lerror = require('vshard.error') >> -local util = require('vshard.util') >>   local lcfg = require('vshard.cfg') >>   local lreplicaset = require('vshard.replicaset') >> -local trigger = require('internal.trigger') >> +local util = require('vshard.util') >>   -local M = rawget(_G, '__module_vshard_storage') >> + > > 9. Extra empty line. fixed >> +local M = rawget(_G, MODULE_INTERNALS) >>   if not M then >>       -- >>       -- The module is loaded for the first time. >> @@ -21,6 +34,8 @@ if not M then >>           -- See format in replicaset.lua. >>           -- >>           replicasets = nil, >> +        -- Time to deprecate old objects on reload. >> +        reload_deprecate_timeout = nil, > > 10. Looks like this thing is always 'nil' until somebody does > not manually set it via storage/router.internal. It is not good. > Please, move it to configuration and create a default value. Fixed > >>           -- Triggers on master switch event. 
They are called right >>           -- before the event occurs. >>           _on_master_enable = trigger.new('_on_master_enable'), >> diff --git a/vshard/util.lua b/vshard/util.lua >> index bb71318..f5bd78c 100644 >> --- a/vshard/util.lua >> +++ b/vshard/util.lua >> @@ -75,8 +75,28 @@ local function generate_self_checker(obj_name, >> func_name, mt, func) >>       end >>   end >>   + >> +-- >> +-- Run a function without interrupting current fiber. >> +-- @param delay Delay in seconds before the task should be >> +--        executed. >> +-- @param task Function to be executed. >> +-- @param ... Arguments which would be passed to the `task`. >> +-- >> +local function async_task(delay, task, ...) >> +    assert(delay == nil or type(delay) == 'number') >> +    local function wait_and_call(...) >> +        if delay then >> +            fiber.sleep(delay) >> +        end >> +        task(...) >> +    end > > 11. Please, avoid closures when possible. Every time the function is > called the closure is parsed again. As in the previous case. I did it to increase readability. Fixed. (sync_task created) I also have renamed Deprecate -> Outdate. Here is a new diff: commit fd6d22cbeef28097199b36e34bc7bf33f659f40a Author: AKhatskevich Date:   Sat Jun 9 17:23:40 2018 +0300     Complete module reload     In case one need to upgrade vshard to a new version, this commit     improves reload mechanism to allow to do that for a wider variety of     possible changes (between two versions).     
Changes:      * introduce cfg option `connection_outdate_delay`      * improve reload mechanism      * add `util.async_task` method, which runs a function after a        delay      * delete replicaset:rebind_connections method as it is replaced        with `rebind_replicasets` which updates all replicasets at once     Reload mechanism:      * reload all vshard modules      * create new `replicaset` and `replica` objects      * reuse old netbox connections in new replica objects if        possible      * update router/storage.internal table      * after a `connection_outdate_delay` disable old instances of        `replicaset` and `replica` objects     Reload works for modules:      * vshard.router      * vshard.storage     Here is a module reload algorihtm:      * old vshard is working      * delete old vshard src      * install new vshard      * call: package.loaded['vshard.router'] = nil      * call: old_router = vshard.router -- Save working router copy.      * call: vshard.router = require('vshard.router')      * if require fails: continue using old_router      * if require succeeds: use vshard.router     In case reload process fails, old router/storage module, replicaset and     replica objects continue working properly. If reload succeeds, all old     objects would be deprecated.     Extra changes:      * introduce MODULE_INTERNALS which stores name of the module        internal data in the global namespace     Closes #112 diff --git a/test/router/reload.result b/test/router/reload.result index 19a9ead..6517934 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -174,6 +174,143 @@ vshard.router.module_version()  check_reloaded()  ---  ... +-- +-- Outdate old replicaset and replica objets. +-- +rs = vshard.router.route(1) +--- +... +rs:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +package.loaded["vshard.router"] = nil +--- +... +_ = require('vshard.router') +--- +... 
+-- Make sure outdate async task has had cpu time. +fiber.sleep(0.005) +--- +... +rs.callro(rs, 'echo', {'some_data'}) +--- +- null +- type: ShardingError +  name: OBJECT_IS_OUTDATED +  message: Object is outdated after module reload/reconfigure. Use new instance. +  code: 20 +... +vshard.router = require('vshard.router') +--- +... +rs = vshard.router.route(1) +--- +... +rs:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +-- Test `connection_outdate_delay`. +old_connection_delay = cfg.connection_outdate_delay +--- +... +cfg.connection_outdate_delay = 0.3 +--- +... +vshard.router.cfg(cfg) +--- +... +cfg.connection_outdate_delay = old_connection_delay +--- +... +vshard.router.internal.connection_outdate_delay = nil +--- +... +vshard.router = require('vshard.router') +--- +... +rs_new = vshard.router.route(1) +--- +... +rs_new:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +rs:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +fiber.sleep(0.2) +--- +... +rs:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +fiber.sleep(0.2) +--- +... +rs:callro('echo', {'some_data'}) +--- +- null +- type: ShardingError +  name: OBJECT_IS_OUTDATED +  message: Object is outdated after module reload/reconfigure. Use new instance. +  code: 20 +... +err:match('Object replicaset is outdated.*') +--- +- error: '[string "return err:match(''Object replicaset is outdat..."]:1: variable +    ''err'' is not declared' +... +rs_new:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +-- Error during reconfigure process. +_ = vshard.router.route(1):callro('echo', {'some_data'}) +--- +... +vshard.router.internal.errinj.ERRINJ_CFG = true +--- +... +old_internal = table.copy(vshard.router.internal) +--- +... +package.loaded["vshard.router"] = nil +--- +... +_, err = pcall(require, 'vshard.router') +--- +... +err:match('Error injection:.*') +--- +- 'Error injection: cfg' +... 
+vshard.router.internal.errinj.ERRINJ_CFG = false +--- +... +util.has_same_fields(old_internal, vshard.router.internal) +--- +- true +... +_ = vshard.router.route(1):callro('echo', {'some_data'}) +--- +...  test_run:switch('default')  ---  - true diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 6e21b74..9b73870 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -86,6 +86,47 @@ _ = require('vshard.router')  vshard.router.module_version()  check_reloaded() +-- +-- Outdate old replicaset and replica objets. +-- +rs = vshard.router.route(1) +rs:callro('echo', {'some_data'}) +package.loaded["vshard.router"] = nil +_ = require('vshard.router') +-- Make sure outdate async task has had cpu time. +fiber.sleep(0.005) +rs.callro(rs, 'echo', {'some_data'}) +vshard.router = require('vshard.router') +rs = vshard.router.route(1) +rs:callro('echo', {'some_data'}) +-- Test `connection_outdate_delay`. +old_connection_delay = cfg.connection_outdate_delay +cfg.connection_outdate_delay = 0.3 +vshard.router.cfg(cfg) +cfg.connection_outdate_delay = old_connection_delay +vshard.router.internal.connection_outdate_delay = nil +vshard.router = require('vshard.router') +rs_new = vshard.router.route(1) +rs_new:callro('echo', {'some_data'}) +rs:callro('echo', {'some_data'}) +fiber.sleep(0.2) +rs:callro('echo', {'some_data'}) +fiber.sleep(0.2) +rs:callro('echo', {'some_data'}) +err:match('Object replicaset is outdated.*') +rs_new:callro('echo', {'some_data'}) + +-- Error during reconfigure process. 
+_ = vshard.router.route(1):callro('echo', {'some_data'}) +vshard.router.internal.errinj.ERRINJ_CFG = true +old_internal = table.copy(vshard.router.internal) +package.loaded["vshard.router"] = nil +_, err = pcall(require, 'vshard.router') +err:match('Error injection:.*') +vshard.router.internal.errinj.ERRINJ_CFG = false +util.has_same_fields(old_internal, vshard.router.internal) +_ = vshard.router.route(1):callro('echo', {'some_data'}) +  test_run:switch('default')  test_run:cmd('stop server router_1')  test_run:cmd('cleanup server router_1') diff --git a/test/router/router.result b/test/router/router.result index 3ebab5d..71e156c 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -1024,11 +1024,10 @@ error_messages  - - Use replicaset:callro(...) instead of replicaset.callro(...)    - Use replicaset:connect_master(...) instead of replicaset.connect_master(...)    - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...) -  - Use replicaset:rebind_connections(...) instead of replicaset.rebind_connections(...)    - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)    - Use replicaset:call(...) instead of replicaset.call(...) -  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)    - Use replicaset:connect(...) instead of replicaset.connect(...) +  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)    - Use replicaset:callrw(...) instead of replicaset.callrw(...)    - Use replicaset:connect_all(...) instead of replicaset.connect_all(...)  ... diff --git a/test/storage/reload.result b/test/storage/reload.result index f689cf4..8eec80f 100644 --- a/test/storage/reload.result +++ b/test/storage/reload.result @@ -174,6 +174,74 @@ vshard.storage.module_version()  check_reloaded()  ---  ... +-- +-- Outdate old replicaset and replica objets. +-- +_, rs = next(vshard.storage.internal.replicasets) +--- +... 
+package.loaded["vshard.storage"] = nil +--- +... +_ = require('vshard.storage') +--- +... +rs.callro(rs, 'echo', {'some_data'}) +--- +- null +- type: ShardingError +  name: OBJECT_IS_OUTDATED +  message: Object is outdated after module reload/reconfigure. Use new instance. +  code: 20 +... +_, rs = next(vshard.storage.internal.replicasets) +--- +... +rs.callro(rs, 'echo', {'some_data'}) +--- +- some_data +- null +- null +... +-- Error during reload process. +_, rs = next(vshard.storage.internal.replicasets) +--- +... +rs:callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... +vshard.storage.internal.errinj.ERRINJ_CFG = true +--- +... +old_internal = table.copy(vshard.storage.internal) +--- +... +package.loaded["vshard.storage"] = nil +--- +... +_, err = pcall(require, 'vshard.storage') +--- +... +err:match('Error injection:.*') +--- +- 'Error injection: cfg' +... +vshard.storage.internal.errinj.ERRINJ_CFG = false +--- +... +util.has_same_fields(old_internal, vshard.storage.internal) +--- +- true +... +_, rs = next(vshard.storage.internal.replicasets) +--- +... +_ = rs:callro('echo', {'some_data'}) +--- +...  test_run:switch('default')  ---  - true diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua index 6e19a92..7d870ff 100644 --- a/test/storage/reload.test.lua +++ b/test/storage/reload.test.lua @@ -87,6 +87,29 @@ _ = require('vshard.storage')  vshard.storage.module_version()  check_reloaded() +-- +-- Outdate old replicaset and replica objets. +-- +_, rs = next(vshard.storage.internal.replicasets) +package.loaded["vshard.storage"] = nil +_ = require('vshard.storage') +rs.callro(rs, 'echo', {'some_data'}) +_, rs = next(vshard.storage.internal.replicasets) +rs.callro(rs, 'echo', {'some_data'}) + +-- Error during reload process. 
+_, rs = next(vshard.storage.internal.replicasets) +rs:callro('echo', {'some_data'}) +vshard.storage.internal.errinj.ERRINJ_CFG = true +old_internal = table.copy(vshard.storage.internal) +package.loaded["vshard.storage"] = nil +_, err = pcall(require, 'vshard.storage') +err:match('Error injection:.*') +vshard.storage.internal.errinj.ERRINJ_CFG = false +util.has_same_fields(old_internal, vshard.storage.internal) +_, rs = next(vshard.storage.internal.replicasets) +_ = rs:callro('echo', {'some_data'}) +  test_run:switch('default')  test_run:drop_cluster(REPLICASET_2)  test_run:drop_cluster(REPLICASET_1) diff --git a/vshard/cfg.lua b/vshard/cfg.lua index f5db4c0..5703867 100644 --- a/vshard/cfg.lua +++ b/vshard/cfg.lua @@ -218,6 +218,10 @@ local cfg_template = {          type = 'non-negative number', name = 'Sync timeout', is_optional = true,          default = consts.DEFAULT_SYNC_TIMEOUT      }}, +    {'connection_outdate_delay', { +        type = 'non-negative number', name = 'Object outdate timeout', +        is_optional = true, default = nil +    }},  }  -- @@ -247,6 +251,8 @@ local function remove_non_box_options(cfg)      cfg.collect_bucket_garbage_interval = nil      cfg.collect_lua_garbage = nil      cfg.sync_timeout = nil +    cfg.sync_timeout = nil +    cfg.connection_outdate_delay = nil  end  return { diff --git a/vshard/error.lua b/vshard/error.lua index cf2f9d2..9df2c53 100644 --- a/vshard/error.lua +++ b/vshard/error.lua @@ -100,6 +100,11 @@ local error_message_template = {      [19] = {          name = 'REPLICASET_IS_LOCKED',          msg = 'Replicaset is locked' +    }, +    [20] = { +        name = 'OBJECT_IS_OUTDATED', +        msg = 'Object is outdated after module reload/reconfigure. ' .. +            'Use new instance.'      
}  } diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index 99f59aa..4e7bc10 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -48,7 +48,8 @@ local lerror = require('vshard.error')  local fiber = require('fiber')  local luri = require('uri')  local ffi = require('ffi') -local gsc = require('vshard.util').generate_self_checker +local util = require('vshard.util') +local gsc = util.generate_self_checker  --  -- on_connect() trigger for net.box @@ -338,26 +339,80 @@ local function replicaset_tostring(replicaset)  end  -- --- Rebind connections of old replicas to new ones. +-- Outdate old objects in case of reload/reconfigure.  -- -local function replicaset_rebind_connections(replicaset) -    for _, replica in pairs(replicaset.replicas) do -        local old_replica = replica.old_replica -        if old_replica then -            local conn = old_replica.conn -            replica.conn = conn -            replica.down_ts = old_replica.down_ts -            replica.net_timeout = old_replica.net_timeout -            replica.net_sequential_ok = old_replica.net_sequential_ok -            replica.net_sequential_fail = old_replica.net_sequential_fail -            if conn then -                conn.replica = replica -                conn.replicaset = replicaset -                old_replica.conn = nil +local function cleanup_old_objects(old_replicaset_mt, old_replica_mt, +                                 old_replicas) +    local function get_outdated_warning() +        return function(...) +            return nil, lerror.vshard(lerror.code.OBJECT_IS_OUTDATED) +        end +    end +    old_replicaset_mt.__index = get_outdated_warning +    -- Leave replica_mt unchanged. All its methods are safe. +    for _, replica in pairs(old_replicas) do +        replica.conn = nil +    end +    log.info('Old replicaset and replica objects are outdated.') +end + +-- +-- Initiate outdating process, which cancels connections and +-- spoils object methods. 
+-- +local function outdate_replicasets(replicasets, outdate_delay) +    -- Collect data to outdate old objects. +    local old_replicaset_mt = nil +    local old_replica_mt = nil +    local old_replicas = {} +    for _, replicaset in pairs(replicasets) do +        local old_mt = getmetatable(replicaset) +        assert(old_replicaset_mt == nil or old_replicaset_mt == old_mt) +        old_replicaset_mt = old_mt +        for _, replica in pairs(replicaset.replicas) do +            table.insert(old_replicas, replica) +            local old_mt = getmetatable(replica) +            assert(old_replica_mt == nil or old_replica_mt == old_mt) +            old_replica_mt = old_mt +        end +    end +    util.async_task(outdate_delay, cleanup_old_objects, old_replicaset_mt, +                    old_replica_mt, old_replicas) +end + + +-- +-- Copy netbox conections from old replica objects to new ones +-- and outdate old objects. +-- @param replicasets New replicasets +-- @param old_replicasets Replicasets and replicas to be outdated. +-- @param outdate_delay Number of seconds; delay to outdate +--        old objects. 
+-- +local function rebind_replicasets(replicasets, old_replicasets, outdate_delay) +    for replicaset_uuid, replicaset in pairs(replicasets) do +        local old_replicaset = old_replicasets and + old_replicasets[replicaset_uuid] +        for replica_uuid, replica in pairs(replicaset.replicas) do +            local old_replica = old_replicaset and + old_replicaset.replicas[replica_uuid] +            if old_replica then +                local conn = old_replica.conn +                replica.conn = conn +                replica.down_ts = old_replica.down_ts +                replica.net_timeout = old_replica.net_timeout +                replica.net_sequential_ok = old_replica.net_sequential_ok +                replica.net_sequential_fail = old_replica.net_sequential_fail +                if conn then +                    conn.replica = replica +                    conn.replicaset = replicaset +                end              end -            replica.old_replica = nil          end      end +    if old_replicasets then +        outdate_replicasets(old_replicasets, outdate_delay) +    end  end  -- @@ -369,7 +424,6 @@ local replicaset_mt = {          connect_master = replicaset_connect_master;          connect_all = replicaset_connect_all;          connect_replica = replicaset_connect_to_replica; -        rebind_connections = replicaset_rebind_connections;          down_replica_priority = replicaset_down_replica_priority;          up_replica_priority = replicaset_up_replica_priority;          call = replicaset_master_call; @@ -378,14 +432,6 @@ local replicaset_mt = {      };      __tostring = replicaset_tostring;  } --- --- Wrap self methods with a sanity checker. 
--- -local index = {} -for name, func in pairs(replicaset_mt.__index) do -    index[name] = gsc("replicaset", name, replicaset_mt, func) -end -replicaset_mt.__index = index  local replica_mt = {      __index = { @@ -406,11 +452,30 @@ local replica_mt = {          end      end,  } -index = {} -for name, func in pairs(replica_mt.__index) do -    index[name] = gsc("replica", name, replica_mt, func) + +-- +-- Get new copy of replicaset and replica meta tables. +-- It is important to have distinct copies of meta tables to +-- be able to outdate only old objects. +-- +local function copy_metatables() +    -- +    -- Wrap self methods with a sanity checker. +    -- +    local replicaset_mt_copy = table.copy(replicaset_mt) +    local replica_mt_copy = table.copy(replica_mt) +    local index = {} +    for name, func in pairs(replicaset_mt_copy.__index) do +        index[name] = gsc("replicaset", name, replicaset_mt_copy, func) +    end +    replicaset_mt_copy.__index = index +    index = {} +    for name, func in pairs(replica_mt_copy.__index) do +        index[name] = gsc("replica", name, replica_mt_copy, func) +    end +    replica_mt_copy.__index = index +    return replicaset_mt_copy, replica_mt_copy  end -replica_mt.__index = index  --  -- Calculate for each replicaset its etalon bucket count. 
@@ -514,20 +579,17 @@ local function buildall(sharding_cfg, old_replicasets)          zone_weights = {}      end      local curr_ts = fiber.time() +    local replicaset_mt_copy, replica_mt_copy = copy_metatables()      for replicaset_uuid, replicaset in pairs(sharding_cfg.sharding) do -        local old_replicaset = old_replicasets and - old_replicasets[replicaset_uuid]          local new_replicaset = setmetatable({              replicas = {},              uuid = replicaset_uuid,              weight = replicaset.weight,              bucket_count = 0,              lock = replicaset.lock, -        }, replicaset_mt) +        }, replicaset_mt_copy)          local priority_list = {}          for replica_uuid, replica in pairs(replicaset.replicas) do -            local old_replica = old_replicaset and - old_replicaset.replicas[replica_uuid]              -- The old replica is saved in the new object to              -- rebind its connection at the end of a              -- router/storage reconfiguration. 
@@ -535,8 +597,8 @@ local function buildall(sharding_cfg, old_replicasets)                  uri = replica.uri, name = replica.name, uuid = replica_uuid,                  zone = replica.zone, net_timeout = consts.CALL_TIMEOUT_MIN,                  net_sequential_ok = 0, net_sequential_fail = 0, -                down_ts = curr_ts, old_replica = old_replica, -            }, replica_mt) +                down_ts = curr_ts, +            }, replica_mt_copy)              new_replicaset.replicas[replica_uuid] = new_replica              if replica.master then                  new_replicaset.master = new_replica @@ -596,4 +658,6 @@ return {      buildall = buildall,      calculate_etalon_balance = cluster_calculate_etalon_balance,      wait_masters_connect = wait_masters_connect, +    rebind_replicasets = rebind_replicasets, +    outdate_replicasets = outdate_replicasets,  } diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 1dee80c..d70d060 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -1,5 +1,17 @@  local log = require('log')  local lfiber = require('fiber') + +local MODULE_INTERNALS = '__module_vshard_router' +-- Reload requirements, in case this module is reloaded manually. 
+if rawget(_G, MODULE_INTERNALS) then +    local vshard_modules = { +        'vshard.consts', 'vshard.error', 'vshard.cfg', +        'vshard.hash', 'vshard.replicaset', 'vshard.util', +    } +    for _, module in pairs(vshard_modules) do +        package.loaded[module] = nil +    end +end  local consts = require('vshard.consts')  local lerror = require('vshard.error')  local lcfg = require('vshard.cfg') @@ -7,7 +19,7 @@ local lhash = require('vshard.hash')  local lreplicaset = require('vshard.replicaset')  local util = require('vshard.util') -local M = rawget(_G, '__module_vshard_router') +local M = rawget(_G, MODULE_INTERNALS)  if not M then      M = {          errinj = { @@ -15,6 +27,8 @@ if not M then              ERRINJ_FAILOVER_CHANGE_CFG = false,              ERRINJ_RELOAD = false,          }, +        -- Time to outdate old objects on reload. +        connection_outdate_delay = nil,          -- Bucket map cache.          route_map = {},          -- All known replicasets used for bucket re-balancing @@ -459,6 +473,7 @@ end  local function router_cfg(cfg)      cfg = lcfg.check(cfg) +    local current_cfg = table.deepcopy(cfg)      if not M.replicasets then          log.info('Starting router configuration')      else @@ -480,17 +495,18 @@ local function router_cfg(cfg)      if M.errinj.ERRINJ_CFG then          error('Error injection: cfg')      end +    M.connection_outdate_delay = current_cfg.connection_outdate_delay      M.total_bucket_count = total_bucket_count      M.collect_lua_garbage = collect_lua_garbage +    M.current_cfg = current_cfg      -- TODO: update existing route map in-place      M.route_map = {} -    M.replicasets = new_replicasets      -- Move connections from an old configuration to a new one.      -- It must be done with no yields to prevent usage both of not      -- fully moved old replicasets, and not fully built new ones. 
-    for _, replicaset in pairs(new_replicasets) do -        replicaset:rebind_connections() -    end +    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets, + M.connection_outdate_delay) +    M.replicasets = new_replicasets      -- Now the new replicasets are fully built. Can establish      -- connections and yield.      for _, replicaset in pairs(new_replicasets) do @@ -776,15 +792,16 @@ end  -- About functions, saved in M, and reloading see comment in  -- storage/init.lua.  -- -M.discovery_f = discovery_f -M.failover_f = failover_f - -if not rawget(_G, '__module_vshard_router') then -    rawset(_G, '__module_vshard_router', M) +if not rawget(_G, MODULE_INTERNALS) then +    rawset(_G, MODULE_INTERNALS, M)  else +    router_cfg(M.current_cfg)      M.module_version = M.module_version + 1  end +M.discovery_f = discovery_f +M.failover_f = failover_f +  return {      cfg = router_cfg;      info = router_info; diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 879c7c4..50347b0 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -2,14 +2,26 @@ local log = require('log')  local luri = require('uri')  local lfiber = require('fiber')  local netbox = require('net.box') -- for net.box:self() +local trigger = require('internal.trigger') + +local MODULE_INTERNALS = '__module_vshard_storage' +-- Reload requirements, in case this module is reloaded manually. 
+if rawget(_G, MODULE_INTERNALS) then +    local vshard_modules = { +        'vshard.consts', 'vshard.error', 'vshard.cfg', +        'vshard.replicaset', 'vshard.util', +    } +    for _, module in pairs(vshard_modules) do +        package.loaded[module] = nil +    end +end  local consts = require('vshard.consts')  local lerror = require('vshard.error') -local util = require('vshard.util')  local lcfg = require('vshard.cfg')  local lreplicaset = require('vshard.replicaset') -local trigger = require('internal.trigger') +local util = require('vshard.util') -local M = rawget(_G, '__module_vshard_storage') +local M = rawget(_G, MODULE_INTERNALS)  if not M then      --      -- The module is loaded for the first time. @@ -21,6 +33,8 @@ if not M then          -- See format in replicaset.lua.          --          replicasets = nil, +        -- Time to deprecate old objects on reload. +        reload_deprecate_timeout = nil,          -- Triggers on master switch event. They are called right          -- before the event occurs.          
_on_master_enable = trigger.new('_on_master_enable'), @@ -1450,6 +1464,7 @@ local function storage_cfg(cfg, this_replica_uuid)          error('Usage: cfg(configuration, this_replica_uuid)')      end      cfg = lcfg.check(cfg) +    local current_cfg = table.deepcopy(cfg)      if cfg.weights or cfg.zone then          error('Weights and zone are not allowed for storage configuration')      end @@ -1571,10 +1586,8 @@ local function storage_cfg(cfg, this_replica_uuid)      local uri = luri.parse(this_replica.uri)      box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password) +    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)      M.replicasets = new_replicasets -    for _, replicaset in pairs(new_replicasets) do -        replicaset:rebind_connections() -    end      M.this_replicaset = this_replicaset      M.this_replica = this_replica      M.total_bucket_count = total_bucket_count @@ -1583,6 +1596,7 @@ local function storage_cfg(cfg, this_replica_uuid)      M.shard_index = shard_index      M.collect_bucket_garbage_interval = collect_bucket_garbage_interval      M.collect_lua_garbage = collect_lua_garbage +    M.current_cfg = current_cfg      if was_master and not is_master then          local_on_master_disable() @@ -1813,6 +1827,14 @@ end  -- restarted (or is restarted from M.background_f, which is not  -- changed) and continues use old func1 and func2.  
-- + +if not rawget(_G, MODULE_INTERNALS) then +    rawset(_G, MODULE_INTERNALS, M) +else +    storage_cfg(M.current_cfg, M.this_replica.uuid) +    M.module_version = M.module_version + 1 +end +  M.recovery_f = recovery_f  M.collect_garbage_f = collect_garbage_f  M.rebalancer_f = rebalancer_f @@ -1828,12 +1850,6 @@ M.rebalancer_build_routes = rebalancer_build_routes  M.rebalancer_calculate_metrics = rebalancer_calculate_metrics  M.cached_find_sharded_spaces = find_sharded_spaces -if not rawget(_G, '__module_vshard_storage') then -    rawset(_G, '__module_vshard_storage', M) -else -    M.module_version = M.module_version + 1 -end -  return {      sync = sync,      bucket_force_create = bucket_force_create, diff --git a/vshard/util.lua b/vshard/util.lua index bb71318..4b859cc 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -75,8 +75,28 @@ local function generate_self_checker(obj_name, func_name, mt, func)      end  end +local function sync_task(delay, task, ...) +    if delay then +        fiber.sleep(delay) +    end +    task(...) +end + +-- +-- Run a function without interrupting current fiber. +-- @param delay Delay in seconds before the task should be +--        executed. +-- @param task Function to be executed. +-- @param ... Arguments which would be passed to the `task`. +-- +local function async_task(delay, task, ...) +    assert(delay == nil or type(delay) == 'number') +    fiber.create(sync_task, delay, task, ...) +end +  return {      tuple_extract_key = tuple_extract_key,      reloadable_fiber_f = reloadable_fiber_f,      generate_self_checker = generate_self_checker, +    async_task = async_task,  }