[tarantool-patches] Re: [PATCH 2/2] Complete module reload

Alex Khatskevich avkhatskevich at tarantool.org
Tue Jun 19 10:09:47 MSK 2018


On 17.06.2018 22:46, Vladislav Shpilevoy wrote:
> Thanks for the patch! See 11 comments below.
>
> On 09/06/2018 20:47, AKhatskevich wrote:
>> In case one need to upgrade vshard to a new version, this commit
>> introduces a reload mechanism which allows to do that for a
>> noticeable variety of possible changes (between the two versions).
>
> Nitpicking: it does not introduce reload mechanism. This commit
> just improves the thing.
>
> Maybe should we place here a manual how to reload?
> A short example.
added
>
>>
>> Reload process:
>>   * reload all vshard modules
>>   * create new `replicaset` and `replica` objects
>>   * reuse old netbox connections in new replica objects if
>>     possible
>>   * update router/storage.internal table
>>   * after a `reload_deprecate_timeout` disable old instances of
>>     `replicaset` and `replica` objects
>>
>> Reload works for modules:
>>   * vshard.router
>>   * vshard.storage
>>
>> In case reload process fails, old router/storage module continue
>> working properly.
>>
>> Extra changes:
>>   * add `util.async_task` method, which runs a function after a
>>     delay
>>   * delete replicaset:rebind_connections method as it is replaced
>>     with `rebind_replicasets` which updates all replicasets at once
>>   * introduce `module_reloading` for distinguishing reloaf from
>
> 1. reloaf -> reload.
fixed
>
>>     reconfigure
>> * introduce MODULE_INTERNALS which stores name of the module
>>    internal data in the global namespace
>>
>> Closes #112
>> ---
>>   test/router/reload.result    | 109 
>> +++++++++++++++++++++++++++++++++++++++++++
>>   test/router/reload.test.lua  |  41 ++++++++++++++++
>>   test/router/router.result    |   3 +-
>>   test/storage/reload.result   |  61 ++++++++++++++++++++++++
>>   test/storage/reload.test.lua |  24 ++++++++++
>>   vshard/replicaset.lua        |  91 
>> +++++++++++++++++++++++++++++-------
>>   vshard/router/init.lua       |  40 ++++++++++++----
>>   vshard/storage/init.lua      |  44 ++++++++++++-----
>>   vshard/util.lua              |  20 ++++++++
>>   9 files changed, 392 insertions(+), 41 deletions(-)
>>
>
> I will review the tests later, after we finish with the code.
>
>> diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
>> index 99f59aa..48053a0 100644
>> --- a/vshard/replicaset.lua
>> +++ b/vshard/replicaset.lua
>> @@ -48,7 +48,8 @@ local lerror = require('vshard.error')
>>   local fiber = require('fiber')
>>   local luri = require('uri')
>>   local ffi = require('ffi')
>> -local gsc = require('vshard.util').generate_self_checker
>> +local util = require('vshard.util')
>> +local gsc = util.generate_self_checker
>>     --
>>   -- on_connect() trigger for net.box
>> @@ -338,26 +339,79 @@ local function replicaset_tostring(replicaset)
>>   end
>>     --
>> --- Rebind connections of old replicas to new ones.
>> +-- Deprecate old objects in case of reload.
>> +--
>> +local function clean_old_objects(old_replicaset_mt, old_replica_mt,
>> +                                 old_replicas)
>> +    local function get_deprecated_warning(obj_name)
>> +        return function(...)
>> +            local finfo = debug.getinfo(2)
>> +            local file = finfo.short_src
>> +            local name = finfo.name
>> +            local line = finfo.currentline
>> +            local err_fmt = '%s.%s:%s: Object %s is outdated. Use 
>> new instance'
>> +            error(string.format(err_fmt, file, name, line, obj_name))
>> +        end
>> +    end
> 2.1 Why could not you just declare this function out of 
> clean_old_object()?
> It does not depend on any upvalue here. And it is parsed every time when
> clean_old_objects is called slowing it down.
This function is called only on vshard reload, so the pattern does not 
slow that process down noticeably.
However, it makes the code much more readable.
Looking through the module, you do not lose your attention on a huge 
collection of different functions. Those helper functions are used only 
inside of the `clean_old_objects` function, and that fact is emphasized.
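(For the record, "parsed every time" is slightly imprecise: the function body is compiled once when the outer chunk is loaded; what is created per call is a new closure object. A plain-Lua illustration, with a hypothetical `make_handler` standing in for `get_deprecated_warning`:)

```lua
-- Each call of the enclosing function builds a fresh closure object,
-- even though the function body itself was compiled only once.
local function make_handler(obj_name)
    return function()
        return nil, ('Object %s is outdated'):format(obj_name)
    end
end

local h1 = make_handler('replicaset')
local h2 = make_handler('replicaset')
print(h1 == h2)          -- false: two distinct closure objects
local _, msg = h1()
print(msg)               -- Object replicaset is outdated
```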
>
> 2.2. Too complex error message. Lets just throw error that object is
> outdated.
I have changed the API: a simple vshard error is now returned instead of 
raising an exception.
> 3. Why replace_mt_methods needs old mt? Why not just replace the whole 
> old
> __index?
>
> I thought you would do something like this:
>
> getmetatable(old_object).__index = function()
>     return function() error('The object is outdated') end
> end
>
> Any attempt to call a method in an old object leads to
> throwing an error.
>
> It allows to remove both replace_mt_methods, get_deprecated_warning.
>
> And one more remark: our convention is to throw only OOM and misusage
> errors. Here we should return two values: nil, error_object. Lets
> add a new error code for this object.
Fixed
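The combined fix (the `__index` replacement suggested above plus returning `nil, error` instead of throwing) can be sketched in plain Lua; `lerror.vshard(lerror.code.OBJECT_IS_OUTDATED)` is replaced here by a literal table so the snippet runs outside Tarantool:

```lua
-- Stand-in for lerror.vshard(lerror.code.OBJECT_IS_OUTDATED).
local outdated_err = {
    type = 'ShardingError',
    name = 'OBJECT_IS_OUTDATED',
    message = 'Object is outdated after module reload/reconfigure.',
}

local mt = {__index = {callro = function() return 'some_data' end}}
local rs = setmetatable({}, mt)

-- On reload: any method lookup on an old object now yields a stub
-- that returns nil plus the error object (no exception is raised).
mt.__index = function()
    return function() return nil, outdated_err end
end

local res, err = rs:callro('echo')
-- res is nil, err carries the OBJECT_IS_OUTDATED code.
```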
>
>> +    for _, replica in pairs(old_replicas) do
>> +        replica.conn = nil
>> +    end
>> +    log.info('Old replicaset and replica objects are cleaned up.')
>> +end
>> +
>>   --
>> -local function replicaset_rebind_connections(replicaset)
>> -    for _, replica in pairs(replicaset.replicas) do
>> -        local old_replica = replica.old_replica
>> -        if old_replica then
>> -            local conn = old_replica.conn
>> -            replica.conn = conn
>> -            replica.down_ts = old_replica.down_ts
>> -            replica.net_timeout = old_replica.net_timeout
>> -            replica.net_sequential_ok = old_replica.net_sequential_ok
>> -            replica.net_sequential_fail = 
>> old_replica.net_sequential_fail
>> -            if conn then
>> -                conn.replica = replica
>> -                conn.replicaset = replicaset
>> -                old_replica.conn = nil
>> +-- Copy netbox conections from old replica objects to new ones.
>> +--
>> +local function rebind_replicasets(replicasets, deprecate, 
>> deprecate_timeout)
>
> 4. Lets always deprecate old objects. Reconfigure is like reload 
> called from API.
> And remove 'deprecate' parameter.
>
> On reconfiguration deprecating is even more useful because if during the
> reconfiguration a cluster topology is changed, then some of buckets 
> can be
> transferred making old routes (and thus old replicaset objects that
> had been got by router.route() call) outdated.
Discussed verbally. The main reason to deprecate old objects is that we 
have to keep track of
all objects to reload them and to re-establish connections in the future.
Fixed.
>
>> +    -- Collect data to deprecate old objects.
>> +    local old_replicaset_mt = nil
>> +    local old_replica_mt = nil
>> +    local old_replicas = {}
>> +    for _, replicaset in pairs(replicasets) do
>> +        if replicaset.old_replicaset then
>> +            local old_mt = getmetatable(replicaset.old_replicaset)
>> +            assert(old_replicaset_mt == nil or old_replicaset_mt == 
>> old_mt)
>> +            assert(not deprecate or
>> +                   old_replicaset_mt ~= getmetatable(replicaset))
>> +            old_replicaset_mt = old_mt
>> +            replicaset.old_replicaset = nil
>> +        end
>> +        for _, replica in pairs(replicaset.replicas) do
>> +            local old_replica = replica.old_replica
>> +            if old_replica then
>> +                local old_mt = getmetatable(old_replica)
>> +                assert(old_replica_mt == nil or old_replica_mt == 
>> old_mt)
>> +                assert(not deprecate or old_replica_mt ~= 
>> getmetatable(replica))
>> +                old_replica_mt = old_mt
>> +                table.insert(old_replicas, old_replica)
>> +                local conn = old_replica.conn
>> +                replica.conn = conn
>> +                replica.down_ts = old_replica.down_ts
>> +                replica.net_timeout = old_replica.net_timeout
>> +                replica.net_sequential_ok = 
>> old_replica.net_sequential_ok
>> +                replica.net_sequential_fail = 
>> old_replica.net_sequential_fail
>> +                if conn then
>> +                    conn.replica = replica
>> +                    conn.replicaset = replicaset
>> +                end
>> +                replica.old_replica = nil
>>               end
>> -            replica.old_replica = nil
>>           end
>>       end
>> +    if deprecate then
>> +        util.async_task(deprecate_timeout, clean_old_objects, 
>> old_replicaset_mt,
>> +                        old_replica_mt, old_replicas)
>
> 5. Here you have lost replicas and replicasets that had been removed from
> the configuration. Not updated, but exactly removed. You iterate over new
> replicasets only, that has not links to removed ones.
>
> You should not deprecate old objects via links in new ones. You should 
> have
> two different methods: to schedule old objects deprecation and to rebind
> connections. Looks, like rebind_connections should be returned.
I did not like the old version of the method because it had to be called 
for each replicaset.
I want neither to create a fiber per replicaset for the deprecation nor 
to pass the fiber between calls.

To fix the issue I had to write a `copy_metatables()` function to 
distinguish the metatables of old and new
objects.
Fixed.
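A minimal sketch of why distinct metatable copies matter (the `shallow_copy` helper stands in for Tarantool's `table.copy`): outdating mutates `__index`, which must hit only the previous generation of objects:

```lua
-- Stand-in for Tarantool's table.copy.
local function shallow_copy(t)
    local c = {}
    for k, v in pairs(t) do c[k] = v end
    return c
end

local base_mt = {__index = {callro = function() return 'ok' end}}

-- Each (re)configuration builds objects with its own metatable copy.
local mt_old = shallow_copy(base_mt)
local mt_new = shallow_copy(base_mt)
local old_rs = setmetatable({}, mt_old)
local new_rs = setmetatable({}, mt_new)

-- Outdate only the old generation: new objects keep working.
mt_old.__index = function()
    return function() return nil, 'outdated' end
end
```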
>
>> +    end
>>   end
>>     --
>> @@ -523,6 +576,7 @@ local function buildall(sharding_cfg, 
>> old_replicasets)
>>               weight = replicaset.weight,
>>               bucket_count = 0,
>>               lock = replicaset.lock,
>> +            old_replicaset = old_replicaset,
>
> 6. You should not have this link.
old_replicaset deleted
old_replica deleted
(this functionality is moved to `rebind_replicasets`)
>
>>           }, replicaset_mt)
>>           local priority_list = {}
>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 1dee80c..c075436 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -129,6 +143,9 @@ local function discovery_f(module_version)
>>           consts.COLLECT_LUA_GARBAGE_INTERVAL / 
>> consts.DISCOVERY_INTERVAL
>>       while module_version == M.module_version do
>>           for _, replicaset in pairs(M.replicasets) do
>> +            if module_version ~= M.module_version then
>> +                return
>> +            end
>
> 7. As I told you verbally, this check should be done after callro.
> I can reload the module during callro(), and old replicaset object
> will fill the route map. Can you please extract this 3-line fix in a
> separate commit with a test?
Yes. I have done it in the other commit
(
along with gh-117
https://github.com/tarantool/vshard/commit/fa97ea478a9e4d85237eeb2bb8ccab0067d4914d
)
So, I would just delete it from this patchset and leave it for gh-117.
>
>>               local active_buckets, err =
>> replicaset:callro('vshard.storage.buckets_discovery', {},
>>                                     {timeout = 2})
>> @@ -457,8 +474,11 @@ end
>>   -- Configuration
>> --------------------------------------------------------------------------------
>>   +-- Distinguish reload from reconfigure.
>> +local module_reloading = true
>>   local function router_cfg(cfg)
>>       cfg = lcfg.check(cfg)
>> +    local current_cfg = table.deepcopy(cfg)
>
> 8. cfg is already deepcopy of the original cfg.
Yes, however there is `lcfg.remove_non_box_options(cfg)`, which mutates 
the table.
Possibly, for extensibility, it is better to make a cfg copy just
for box.cfg{} before calling `remove_non_box_options`.
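A sketch of that point in plain Lua, with a simplified stand-in for `remove_non_box_options` (a shallow copy stands in for `table.deepcopy`): the function mutates the table it receives, so the vshard-level options must be copied out before the stripped table is fed to box.cfg{}:

```lua
-- Simplified remove_non_box_options: strips vshard-only options
-- so the rest can be passed to box.cfg{}.
local function remove_non_box_options(cfg)
    cfg.sharding = nil
    cfg.connection_outdate_delay = nil
end

local cfg = {
    listen = 3301,                  -- box option, stays
    sharding = {},                  -- vshard option, stripped
    connection_outdate_delay = 0.5, -- vshard option, stripped
}

-- Save a copy before the destructive call (deepcopy in real code).
local current_cfg = {}
for k, v in pairs(cfg) do current_cfg[k] = v end

remove_non_box_options(cfg)
-- cfg now holds only box options; current_cfg keeps everything.
```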
>
>>       if not M.replicasets then
>>           log.info('Starting router configuration')
>>       else
>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>> index 879c7c4..b570821 100644
>> --- a/vshard/storage/init.lua
>> +++ b/vshard/storage/init.lua
>> @@ -2,14 +2,27 @@ local log = require('log')
>>   local luri = require('uri')
>>   local lfiber = require('fiber')
>>   local netbox = require('net.box') -- for net.box:self()
>> +local trigger = require('internal.trigger')
>> +
>> +local MODULE_INTERNALS = '__module_vshard_storage'
>> +-- Reload requirements, in case this module is reloaded manually.
>> +if rawget(_G, MODULE_INTERNALS) then
>> +    local vshard_modules = {
>> +        'vshard.consts', 'vshard.error', 'vshard.cfg',
>> +        'vshard.replicaset', 'vshard.util',
>> +    }
>> +    for _, module in pairs(vshard_modules) do
>> +        package.loaded[module] = nil
>> +    end
>> +end
>>   local consts = require('vshard.consts')
>>   local lerror = require('vshard.error')
>> -local util = require('vshard.util')
>>   local lcfg = require('vshard.cfg')
>>   local lreplicaset = require('vshard.replicaset')
>> -local trigger = require('internal.trigger')
>> +local util = require('vshard.util')
>>   -local M = rawget(_G, '__module_vshard_storage')
>> +
>
> 9. Extra empty line.
fixed
>> +local M = rawget(_G, MODULE_INTERNALS)
>>   if not M then
>>       --
>>       -- The module is loaded for the first time.
>> @@ -21,6 +34,8 @@ if not M then
>>           -- See format in replicaset.lua.
>>           --
>>           replicasets = nil,
>> +        -- Time to deprecate old objects on reload.
>> +        reload_deprecate_timeout = nil,
>
> 10. Looks like this thing is always 'nil' until somebody does
> not manually set it via storage/router.internal. It is not good.
> Please, move it to configuration and create a default value.
Fixed
>
>>           -- Triggers on master switch event. They are called right
>>           -- before the event occurs.
>>           _on_master_enable = trigger.new('_on_master_enable'),
>> diff --git a/vshard/util.lua b/vshard/util.lua
>> index bb71318..f5bd78c 100644
>> --- a/vshard/util.lua
>> +++ b/vshard/util.lua
>> @@ -75,8 +75,28 @@ local function generate_self_checker(obj_name, 
>> func_name, mt, func)
>>       end
>>   end
>>   +
>> +--
>> +-- Run a function without interrupting current fiber.
>> +-- @param delay Delay in seconds before the task should be
>> +--        executed.
>> +-- @param task Function to be executed.
>> +-- @param ... Arguments which would be passed to the `task`.
>> +--
>> +local function async_task(delay, task, ...)
>> +    assert(delay == nil or type(delay) == 'number')
>> +    local function wait_and_call(...)
>> +        if delay then
>> +            fiber.sleep(delay)
>> +        end
>> +        task(...)
>> +    end
>
> 11. Please, avoid closures when possible. Every time the function is
> called the closure is parsed again.
As in the previous case, I did it to increase readability.
Fixed (`sync_task` created).
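A sketch of the fixed shape, with the worker hoisted to module level so no closure is built per call. The `pcall(require, 'fiber')` guard and the synchronous fallback are only here so the snippet runs outside Tarantool:

```lua
local has_fiber, fiber = pcall(require, 'fiber')

-- Worker hoisted out of async_task (comment 11): compiled once,
-- no per-call closure.
local function sync_task(delay, task, ...)
    if delay then
        fiber.sleep(delay)
    end
    task(...)
end

local function async_task(delay, task, ...)
    assert(delay == nil or type(delay) == 'number')
    if has_fiber then
        fiber.create(sync_task, delay, task, ...)
    else
        -- Fallback for this standalone sketch only.
        task(...)
    end
end
```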

I also have renamed Deprecate -> Outdate.

Here is a new diff:

commit fd6d22cbeef28097199b36e34bc7bf33f659f40a
Author: AKhatskevich <avkhatskevich at tarantool.org>
Date:   Sat Jun 9 17:23:40 2018 +0300

     Complete module reload

     In case one needs to upgrade vshard to a new version, this commit
     improves the reload mechanism to allow doing that for a wider variety
     of possible changes (between two versions).

     Changes:
      * introduce cfg option `connection_outdate_delay`
      * improve reload mechanism
      * add `util.async_task` method, which runs a function after a
        delay
      * delete replicaset:rebind_connections method as it is replaced
        with `rebind_replicasets` which updates all replicasets at once

     Reload mechanism:
      * reload all vshard modules
      * create new `replicaset` and `replica` objects
      * reuse old netbox connections in new replica objects if
        possible
      * update router/storage.internal table
      * after a `connection_outdate_delay` disable old instances of
        `replicaset` and `replica` objects

     Reload works for modules:
      * vshard.router
      * vshard.storage

     Here is a module reload algorithm:
      * old vshard is working
      * delete old vshard src
      * install new vshard
      * call: package.loaded['vshard.router'] = nil
      * call: old_router = vshard.router -- Save working router copy.
      * call: vshard.router = require('vshard.router')
      * if require fails: continue using old_router
      * if require succeeds: use vshard.router

     In case the reload process fails, the old router/storage module,
     replicaset and replica objects continue working properly. If reload
     succeeds, all old objects are outdated.

     Extra changes:
      * introduce MODULE_INTERNALS which stores name of the module
        internal data in the global namespace

     Closes #112

diff --git a/test/router/reload.result b/test/router/reload.result
index 19a9ead..6517934 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -174,6 +174,143 @@ vshard.router.module_version()
  check_reloaded()
  ---
  ...
+--
+-- Outdate old replicaset and replica objects.
+--
+rs = vshard.router.route(1)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_ = require('vshard.router')
+---
+...
+-- Make sure outdate async task has had cpu time.
+fiber.sleep(0.005)
+---
+...
+rs.callro(rs, 'echo', {'some_data'})
+---
+- null
+- type: ShardingError
+  name: OBJECT_IS_OUTDATED
+  message: Object is outdated after module reload/reconfigure. Use new instance.
+  code: 20
+...
+vshard.router = require('vshard.router')
+---
+...
+rs = vshard.router.route(1)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Test `connection_outdate_delay`.
+old_connection_delay = cfg.connection_outdate_delay
+---
+...
+cfg.connection_outdate_delay = 0.3
+---
+...
+vshard.router.cfg(cfg)
+---
+...
+cfg.connection_outdate_delay = old_connection_delay
+---
+...
+vshard.router.internal.connection_outdate_delay = nil
+---
+...
+vshard.router = require('vshard.router')
+---
+...
+rs_new = vshard.router.route(1)
+---
+...
+rs_new:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+fiber.sleep(0.2)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+fiber.sleep(0.2)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- null
+- type: ShardingError
+  name: OBJECT_IS_OUTDATED
+  message: Object is outdated after module reload/reconfigure. Use new instance.
+  code: 20
+...
+err:match('Object replicaset is outdated.*')
+---
+- error: '[string "return err:match(''Object replicaset is outdat..."]:1: variable
+    ''err'' is not declared'
+...
+rs_new:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
+vshard.router.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.router.internal)
+---
+...
+package.loaded["vshard.router"] = nil
+---
+...
+_, err = pcall(require, 'vshard.router')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.router.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.router.internal)
+---
+- true
+...
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+---
+...
  test_run:switch('default')
  ---
  - true
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 6e21b74..9b73870 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -86,6 +86,47 @@ _ = require('vshard.router')
  vshard.router.module_version()
  check_reloaded()

+--
+-- Outdate old replicaset and replica objects.
+--
+rs = vshard.router.route(1)
+rs:callro('echo', {'some_data'})
+package.loaded["vshard.router"] = nil
+_ = require('vshard.router')
+-- Make sure outdate async task has had cpu time.
+fiber.sleep(0.005)
+rs.callro(rs, 'echo', {'some_data'})
+vshard.router = require('vshard.router')
+rs = vshard.router.route(1)
+rs:callro('echo', {'some_data'})
+-- Test `connection_outdate_delay`.
+old_connection_delay = cfg.connection_outdate_delay
+cfg.connection_outdate_delay = 0.3
+vshard.router.cfg(cfg)
+cfg.connection_outdate_delay = old_connection_delay
+vshard.router.internal.connection_outdate_delay = nil
+vshard.router = require('vshard.router')
+rs_new = vshard.router.route(1)
+rs_new:callro('echo', {'some_data'})
+rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+rs:callro('echo', {'some_data'})
+fiber.sleep(0.2)
+rs:callro('echo', {'some_data'})
+err:match('Object replicaset is outdated.*')
+rs_new:callro('echo', {'some_data'})
+
+-- Error during reconfigure process.
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+vshard.router.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.router.internal)
+package.loaded["vshard.router"] = nil
+_, err = pcall(require, 'vshard.router')
+err:match('Error injection:.*')
+vshard.router.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.router.internal)
+_ = vshard.router.route(1):callro('echo', {'some_data'})
+
  test_run:switch('default')
  test_run:cmd('stop server router_1')
  test_run:cmd('cleanup server router_1')
diff --git a/test/router/router.result b/test/router/router.result
index 3ebab5d..71e156c 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1024,11 +1024,10 @@ error_messages
  - - Use replicaset:callro(...) instead of replicaset.callro(...)
   - Use replicaset:connect_master(...) instead of replicaset.connect_master(...)
   - Use replicaset:connect_replica(...) instead of replicaset.connect_replica(...)
-  - Use replicaset:rebind_connections(...) instead of replicaset.rebind_connections(...)
   - Use replicaset:down_replica_priority(...) instead of replicaset.down_replica_priority(...)
   - Use replicaset:call(...) instead of replicaset.call(...)
-  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
   - Use replicaset:connect(...) instead of replicaset.connect(...)
+  - Use replicaset:up_replica_priority(...) instead of replicaset.up_replica_priority(...)
   - Use replicaset:callrw(...) instead of replicaset.callrw(...)
   - Use replicaset:connect_all(...) instead of replicaset.connect_all(...)
  ...
diff --git a/test/storage/reload.result b/test/storage/reload.result
index f689cf4..8eec80f 100644
--- a/test/storage/reload.result
+++ b/test/storage/reload.result
@@ -174,6 +174,74 @@ vshard.storage.module_version()
  check_reloaded()
  ---
  ...
+--
+-- Outdate old replicaset and replica objects.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_ = require('vshard.storage')
+---
+...
+rs.callro(rs, 'echo', {'some_data'})
+---
+- null
+- type: ShardingError
+  name: OBJECT_IS_OUTDATED
+  message: Object is outdated after module reload/reconfigure. Use new instance.
+  code: 20
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs.callro(rs, 'echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+rs:callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+---
+...
+old_internal = table.copy(vshard.storage.internal)
+---
+...
+package.loaded["vshard.storage"] = nil
+---
+...
+_, err = pcall(require, 'vshard.storage')
+---
+...
+err:match('Error injection:.*')
+---
+- 'Error injection: cfg'
+...
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+---
+...
+util.has_same_fields(old_internal, vshard.storage.internal)
+---
+- true
+...
+_, rs = next(vshard.storage.internal.replicasets)
+---
+...
+_ = rs:callro('echo', {'some_data'})
+---
+...
  test_run:switch('default')
  ---
  - true
diff --git a/test/storage/reload.test.lua b/test/storage/reload.test.lua
index 6e19a92..7d870ff 100644
--- a/test/storage/reload.test.lua
+++ b/test/storage/reload.test.lua
@@ -87,6 +87,29 @@ _ = require('vshard.storage')
  vshard.storage.module_version()
  check_reloaded()

+--
+-- Outdate old replicaset and replica objects.
+--
+_, rs = next(vshard.storage.internal.replicasets)
+package.loaded["vshard.storage"] = nil
+_ = require('vshard.storage')
+rs.callro(rs, 'echo', {'some_data'})
+_, rs = next(vshard.storage.internal.replicasets)
+rs.callro(rs, 'echo', {'some_data'})
+
+-- Error during reload process.
+_, rs = next(vshard.storage.internal.replicasets)
+rs:callro('echo', {'some_data'})
+vshard.storage.internal.errinj.ERRINJ_CFG = true
+old_internal = table.copy(vshard.storage.internal)
+package.loaded["vshard.storage"] = nil
+_, err = pcall(require, 'vshard.storage')
+err:match('Error injection:.*')
+vshard.storage.internal.errinj.ERRINJ_CFG = false
+util.has_same_fields(old_internal, vshard.storage.internal)
+_, rs = next(vshard.storage.internal.replicasets)
+_ = rs:callro('echo', {'some_data'})
+
  test_run:switch('default')
  test_run:drop_cluster(REPLICASET_2)
  test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index f5db4c0..5703867 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -218,6 +218,10 @@ local cfg_template = {
         type = 'non-negative number', name = 'Sync timeout', is_optional = true,
          default = consts.DEFAULT_SYNC_TIMEOUT
      }},
+    {'connection_outdate_delay', {
+        type = 'non-negative number', name = 'Object outdate timeout',
+        is_optional = true, default = nil
+    }},
  }

  --
@@ -247,6 +251,8 @@ local function remove_non_box_options(cfg)
      cfg.collect_bucket_garbage_interval = nil
      cfg.collect_lua_garbage = nil
      cfg.sync_timeout = nil
+    cfg.connection_outdate_delay = nil
  end

  return {
diff --git a/vshard/error.lua b/vshard/error.lua
index cf2f9d2..9df2c53 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -100,6 +100,11 @@ local error_message_template = {
      [19] = {
          name = 'REPLICASET_IS_LOCKED',
          msg = 'Replicaset is locked'
+    },
+    [20] = {
+        name = 'OBJECT_IS_OUTDATED',
+        msg = 'Object is outdated after module reload/reconfigure. ' ..
+            'Use new instance.'
      }
  }

diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 99f59aa..4e7bc10 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -48,7 +48,8 @@ local lerror = require('vshard.error')
  local fiber = require('fiber')
  local luri = require('uri')
  local ffi = require('ffi')
-local gsc = require('vshard.util').generate_self_checker
+local util = require('vshard.util')
+local gsc = util.generate_self_checker

  --
  -- on_connect() trigger for net.box
@@ -338,26 +339,80 @@ local function replicaset_tostring(replicaset)
  end

  --
--- Rebind connections of old replicas to new ones.
+-- Outdate old objects in case of reload/reconfigure.
  --
-local function replicaset_rebind_connections(replicaset)
-    for _, replica in pairs(replicaset.replicas) do
-        local old_replica = replica.old_replica
-        if old_replica then
-            local conn = old_replica.conn
-            replica.conn = conn
-            replica.down_ts = old_replica.down_ts
-            replica.net_timeout = old_replica.net_timeout
-            replica.net_sequential_ok = old_replica.net_sequential_ok
-            replica.net_sequential_fail = old_replica.net_sequential_fail
-            if conn then
-                conn.replica = replica
-                conn.replicaset = replicaset
-                old_replica.conn = nil
+local function cleanup_old_objects(old_replicaset_mt, old_replica_mt,
+                                   old_replicas)
+    local function get_outdated_warning()
+        return function(...)
+            return nil, lerror.vshard(lerror.code.OBJECT_IS_OUTDATED)
+        end
+    end
+    old_replicaset_mt.__index = get_outdated_warning
+    -- Leave replica_mt unchanged. All its methods are safe.
+    for _, replica in pairs(old_replicas) do
+        replica.conn = nil
+    end
+    log.info('Old replicaset and replica objects are outdated.')
+end
+
+--
+-- Initiate outdating process, which cancels connections and
+-- spoils object methods.
+--
+local function outdate_replicasets(replicasets, outdate_delay)
+    -- Collect data to outdate old objects.
+    local old_replicaset_mt = nil
+    local old_replica_mt = nil
+    local old_replicas = {}
+    for _, replicaset in pairs(replicasets) do
+        local old_mt = getmetatable(replicaset)
+        assert(old_replicaset_mt == nil or old_replicaset_mt == old_mt)
+        old_replicaset_mt = old_mt
+        for _, replica in pairs(replicaset.replicas) do
+            table.insert(old_replicas, replica)
+            local old_mt = getmetatable(replica)
+            assert(old_replica_mt == nil or old_replica_mt == old_mt)
+            old_replica_mt = old_mt
+        end
+    end
+    util.async_task(outdate_delay, cleanup_old_objects, old_replicaset_mt,
+                    old_replica_mt, old_replicas)
+end
+
+--
+-- Copy netbox connections from old replica objects to new ones
+-- and outdate old objects.
+-- @param replicasets New replicasets
+-- @param old_replicasets Replicasets and replicas to be outdated.
+-- @param outdate_delay Number of seconds; delay to outdate
+--        old objects.
+--
+local function rebind_replicasets(replicasets, old_replicasets, outdate_delay)
+    for replicaset_uuid, replicaset in pairs(replicasets) do
+        local old_replicaset = old_replicasets and
+                               old_replicasets[replicaset_uuid]
+        for replica_uuid, replica in pairs(replicaset.replicas) do
+            local old_replica = old_replicaset and
+                                old_replicaset.replicas[replica_uuid]
+            if old_replica then
+                local conn = old_replica.conn
+                replica.conn = conn
+                replica.down_ts = old_replica.down_ts
+                replica.net_timeout = old_replica.net_timeout
+                replica.net_sequential_ok = old_replica.net_sequential_ok
+                replica.net_sequential_fail = old_replica.net_sequential_fail
+                if conn then
+                    conn.replica = replica
+                    conn.replicaset = replicaset
+                end
              end
-            replica.old_replica = nil
          end
      end
+    if old_replicasets then
+        outdate_replicasets(old_replicasets, outdate_delay)
+    end
  end

  --
@@ -369,7 +424,6 @@ local replicaset_mt = {
          connect_master = replicaset_connect_master;
          connect_all = replicaset_connect_all;
          connect_replica = replicaset_connect_to_replica;
-        rebind_connections = replicaset_rebind_connections;
          down_replica_priority = replicaset_down_replica_priority;
          up_replica_priority = replicaset_up_replica_priority;
          call = replicaset_master_call;
@@ -378,14 +432,6 @@ local replicaset_mt = {
      };
      __tostring = replicaset_tostring;
  }
---
--- Wrap self methods with a sanity checker.
---
-local index = {}
-for name, func in pairs(replicaset_mt.__index) do
-    index[name] = gsc("replicaset", name, replicaset_mt, func)
-end
-replicaset_mt.__index = index

  local replica_mt = {
      __index = {
@@ -406,11 +452,30 @@ local replica_mt = {
          end
      end,
  }
-index = {}
-for name, func in pairs(replica_mt.__index) do
-    index[name] = gsc("replica", name, replica_mt, func)
+
+--
+-- Get new copy of replicaset and replica meta tables.
+-- It is important to have distinct copies of meta tables to
+-- be able to outdate only old objects.
+--
+local function copy_metatables()
+    --
+    -- Wrap self methods with a sanity checker.
+    --
+    local replicaset_mt_copy = table.copy(replicaset_mt)
+    local replica_mt_copy = table.copy(replica_mt)
+    local index = {}
+    for name, func in pairs(replicaset_mt_copy.__index) do
+        index[name] = gsc("replicaset", name, replicaset_mt_copy, func)
+    end
+    replicaset_mt_copy.__index = index
+    index = {}
+    for name, func in pairs(replica_mt_copy.__index) do
+        index[name] = gsc("replica", name, replica_mt_copy, func)
+    end
+    replica_mt_copy.__index = index
+    return replicaset_mt_copy, replica_mt_copy
  end
-replica_mt.__index = index

  --
  -- Calculate for each replicaset its etalon bucket count.
@@ -514,20 +579,17 @@ local function buildall(sharding_cfg, old_replicasets)
          zone_weights = {}
      end
      local curr_ts = fiber.time()
+    local replicaset_mt_copy, replica_mt_copy = copy_metatables()
      for replicaset_uuid, replicaset in pairs(sharding_cfg.sharding) do
-        local old_replicaset = old_replicasets and
-                               old_replicasets[replicaset_uuid]
          local new_replicaset = setmetatable({
              replicas = {},
              uuid = replicaset_uuid,
              weight = replicaset.weight,
              bucket_count = 0,
              lock = replicaset.lock,
-        }, replicaset_mt)
+        }, replicaset_mt_copy)
          local priority_list = {}
          for replica_uuid, replica in pairs(replicaset.replicas) do
-            local old_replica = old_replicaset and
-                                old_replicaset.replicas[replica_uuid]
              -- The old replica is saved in the new object to
              -- rebind its connection at the end of a
              -- router/storage reconfiguration.
@@ -535,8 +597,8 @@ local function buildall(sharding_cfg, old_replicasets)
                  uri = replica.uri, name = replica.name, uuid = replica_uuid,
                  zone = replica.zone, net_timeout = consts.CALL_TIMEOUT_MIN,
                  net_sequential_ok = 0, net_sequential_fail = 0,
-                down_ts = curr_ts, old_replica = old_replica,
-            }, replica_mt)
+                down_ts = curr_ts,
+            }, replica_mt_copy)
              new_replicaset.replicas[replica_uuid] = new_replica
              if replica.master then
                  new_replicaset.master = new_replica
@@ -596,4 +658,6 @@ return {
      buildall = buildall,
      calculate_etalon_balance = cluster_calculate_etalon_balance,
      wait_masters_connect = wait_masters_connect,
+    rebind_replicasets = rebind_replicasets,
+    outdate_replicasets = outdate_replicasets,
  }
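The distinct-metatable trick used by `copy_metatables` above can be illustrated with a minimal standalone sketch (the names `build`, `ping`, and `base_mt` are hypothetical, not vshard code): because each generation of objects gets its own metatable copy, the old generation can be disabled in place without touching the new one.

```lua
-- Base metatable shared as a template; never assigned directly.
local base_mt = {__index = {ping = function() return 'pong' end}}

-- Build an object with its own private copy of the metatable.
local function build(mt)
    local mt_copy = {__index = {}}
    for name, func in pairs(mt.__index) do
        mt_copy.__index[name] = func
    end
    return setmetatable({}, mt_copy), mt_copy
end

local old_obj, old_mt = build(base_mt)
local new_obj = build(base_mt)

-- Outdate only the old generation: replace its methods in place.
for name in pairs(old_mt.__index) do
    old_mt.__index[name] = function()
        error('the object is outdated')
    end
end

assert(new_obj:ping() == 'pong')          -- new generation still works
assert(not pcall(old_obj.ping, old_obj))  -- old generation is disabled
```

In the patch itself the outdating additionally happens after `outdate_delay`, via `util.async_task`.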
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 1dee80c..d70d060 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -1,5 +1,17 @@
  local log = require('log')
  local lfiber = require('fiber')
+
+local MODULE_INTERNALS = '__module_vshard_router'
+-- Reload the required modules in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+    local vshard_modules = {
+        'vshard.consts', 'vshard.error', 'vshard.cfg',
+        'vshard.hash', 'vshard.replicaset', 'vshard.util',
+    }
+    for _, module in pairs(vshard_modules) do
+        package.loaded[module] = nil
+    end
+end
  local consts = require('vshard.consts')
  local lerror = require('vshard.error')
  local lcfg = require('vshard.cfg')
@@ -7,7 +19,7 @@ local lhash = require('vshard.hash')
  local lreplicaset = require('vshard.replicaset')
  local util = require('vshard.util')

-local M = rawget(_G, '__module_vshard_router')
+local M = rawget(_G, MODULE_INTERNALS)
  if not M then
      M = {
          errinj = {
@@ -15,6 +27,8 @@ if not M then
              ERRINJ_FAILOVER_CHANGE_CFG = false,
              ERRINJ_RELOAD = false,
          },
+        -- Time to outdate old objects on reload.
+        connection_outdate_delay = nil,
          -- Bucket map cache.
          route_map = {},
          -- All known replicasets used for bucket re-balancing
@@ -459,6 +473,7 @@ end

  local function router_cfg(cfg)
      cfg = lcfg.check(cfg)
+    local current_cfg = table.deepcopy(cfg)
      if not M.replicasets then
          log.info('Starting router configuration')
      else
@@ -480,17 +495,18 @@ local function router_cfg(cfg)
      if M.errinj.ERRINJ_CFG then
          error('Error injection: cfg')
      end
+    M.connection_outdate_delay = current_cfg.connection_outdate_delay
      M.total_bucket_count = total_bucket_count
      M.collect_lua_garbage = collect_lua_garbage
+    M.current_cfg = current_cfg
      -- TODO: update existing route map in-place
      M.route_map = {}
-    M.replicasets = new_replicasets
      -- Move connections from an old configuration to a new one.
      -- It must be done with no yields to prevent usage both of not
      -- fully moved old replicasets, and not fully built new ones.
-    for _, replicaset in pairs(new_replicasets) do
-        replicaset:rebind_connections()
-    end
+    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets,
+                                   M.connection_outdate_delay)
+    M.replicasets = new_replicasets
      -- Now the new replicasets are fully built. Can establish
      -- connections and yield.
      for _, replicaset in pairs(new_replicasets) do
@@ -776,15 +792,16 @@ end
  -- About functions, saved in M, and reloading see comment in
  -- storage/init.lua.
  --
-M.discovery_f = discovery_f
-M.failover_f = failover_f
-
-if not rawget(_G, '__module_vshard_router') then
-    rawset(_G, '__module_vshard_router', M)
+if not rawget(_G, MODULE_INTERNALS) then
+    rawset(_G, MODULE_INTERNALS, M)
  else
+    router_cfg(M.current_cfg)
      M.module_version = M.module_version + 1
  end

+M.discovery_f = discovery_f
+M.failover_f = failover_f
+
  return {
      cfg = router_cfg;
      info = router_info;
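The bootstrap sequence added to router/init.lua above follows one reload pattern: persistent state lives in `_G` under `MODULE_INTERNALS`, and on a manual reload the module evicts its submodules from `package.loaded` so that `require()` re-reads their new code. A minimal standalone sketch (the `example.*` module names and fields are hypothetical):

```lua
local MODULE_INTERNALS = '__module_example'

if rawget(_G, MODULE_INTERNALS) then
    -- Manual reload: force the dependencies to be re-required.
    for _, mod in pairs({'example.consts', 'example.util'}) do
        package.loaded[mod] = nil
    end
end

local M = rawget(_G, MODULE_INTERNALS)
if not M then
    -- First load: create the state and publish it globally.
    M = {module_version = 1, current_cfg = nil}
    rawset(_G, MODULE_INTERNALS, M)
else
    -- Reload: the state survives, only the version grows. The
    -- real modules also re-apply M.current_cfg at this point.
    M.module_version = M.module_version + 1
end
```

Since `M` is fetched from `_G` rather than recreated, netbox connections and background fibers referenced from it survive the reload.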
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 879c7c4..50347b0 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -2,14 +2,26 @@ local log = require('log')
  local luri = require('uri')
  local lfiber = require('fiber')
  local netbox = require('net.box') -- for net.box:self()
+local trigger = require('internal.trigger')
+
+local MODULE_INTERNALS = '__module_vshard_storage'
+-- Reload the required modules in case this module is reloaded manually.
+if rawget(_G, MODULE_INTERNALS) then
+    local vshard_modules = {
+        'vshard.consts', 'vshard.error', 'vshard.cfg',
+        'vshard.replicaset', 'vshard.util',
+    }
+    for _, module in pairs(vshard_modules) do
+        package.loaded[module] = nil
+    end
+end
  local consts = require('vshard.consts')
  local lerror = require('vshard.error')
-local util = require('vshard.util')
  local lcfg = require('vshard.cfg')
  local lreplicaset = require('vshard.replicaset')
-local trigger = require('internal.trigger')
+local util = require('vshard.util')

-local M = rawget(_G, '__module_vshard_storage')
+local M = rawget(_G, MODULE_INTERNALS)
  if not M then
      --
      -- The module is loaded for the first time.
@@ -21,6 +33,8 @@ if not M then
          -- See format in replicaset.lua.
          --
          replicasets = nil,
+        -- Time to deprecate old objects on reload.
+        reload_deprecate_timeout = nil,
          -- Triggers on master switch event. They are called right
          -- before the event occurs.
          _on_master_enable = trigger.new('_on_master_enable'),
@@ -1450,6 +1464,7 @@ local function storage_cfg(cfg, this_replica_uuid)
          error('Usage: cfg(configuration, this_replica_uuid)')
      end
      cfg = lcfg.check(cfg)
+    local current_cfg = table.deepcopy(cfg)
      if cfg.weights or cfg.zone then
        error('Weights and zone are not allowed for storage configuration')
      end
@@ -1571,10 +1586,8 @@ local function storage_cfg(cfg, this_replica_uuid)
      local uri = luri.parse(this_replica.uri)
      box.once("vshard:storage:1", storage_schema_v1, uri.login, uri.password)

+    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
      M.replicasets = new_replicasets
-    for _, replicaset in pairs(new_replicasets) do
-        replicaset:rebind_connections()
-    end
      M.this_replicaset = this_replicaset
      M.this_replica = this_replica
      M.total_bucket_count = total_bucket_count
@@ -1583,6 +1596,7 @@ local function storage_cfg(cfg, this_replica_uuid)
      M.shard_index = shard_index
      M.collect_bucket_garbage_interval = collect_bucket_garbage_interval
      M.collect_lua_garbage = collect_lua_garbage
+    M.current_cfg = current_cfg

      if was_master and not is_master then
          local_on_master_disable()
@@ -1813,6 +1827,14 @@ end
  -- restarted (or is restarted from M.background_f, which is not
  -- changed) and continues use old func1 and func2.
  --
+
+if not rawget(_G, MODULE_INTERNALS) then
+    rawset(_G, MODULE_INTERNALS, M)
+else
+    storage_cfg(M.current_cfg, M.this_replica.uuid)
+    M.module_version = M.module_version + 1
+end
+
  M.recovery_f = recovery_f
  M.collect_garbage_f = collect_garbage_f
  M.rebalancer_f = rebalancer_f
@@ -1828,12 +1850,6 @@ M.rebalancer_build_routes = rebalancer_build_routes
  M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
  M.cached_find_sharded_spaces = find_sharded_spaces

-if not rawget(_G, '__module_vshard_storage') then
-    rawset(_G, '__module_vshard_storage', M)
-else
-    M.module_version = M.module_version + 1
-end
-
  return {
      sync = sync,
      bucket_force_create = bucket_force_create,
diff --git a/vshard/util.lua b/vshard/util.lua
index bb71318..4b859cc 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
  @@ -75,8 +75,28 @@ local function generate_self_checker(obj_name, func_name, mt, func)
      end
  end

+local function sync_task(delay, task, ...)
+    if delay then
+        fiber.sleep(delay)
+    end
+    task(...)
+end
+
+--
+-- Run a function without interrupting the current fiber.
+-- @param delay Delay in seconds before the task should be
+--        executed.
+-- @param task Function to be executed.
+-- @param ... Arguments that will be passed to the `task`.
+--
+local function async_task(delay, task, ...)
+    assert(delay == nil or type(delay) == 'number')
+    fiber.create(sync_task, delay, task, ...)
+end
+
  return {
      tuple_extract_key = tuple_extract_key,
      reloadable_fiber_f = reloadable_fiber_f,
      generate_self_checker = generate_self_checker,
+    async_task = async_task,
  }
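The `async_task` helper added above can be exercised outside Tarantool with a plain-Lua sketch in which `fiber` is stubbed (the real helper spawns a new Tarantool fiber and sleeps `delay` seconds in it, so the caller never blocks):

```lua
-- Stub of Tarantool's fiber module for illustration only:
-- create() runs the function inline, sleep() is a no-op.
local fiber = {
    create = function(f, ...) return f(...) end,
    sleep = function(delay) end,
}

local function sync_task(delay, task, ...)
    if delay then
        fiber.sleep(delay)
    end
    task(...)
end

local function async_task(delay, task, ...)
    assert(delay == nil or type(delay) == 'number')
    fiber.create(sync_task, delay, task, ...)
end

local log = {}
async_task(0.1, function(msg) table.insert(log, msg) end, 'done')
assert(log[1] == 'done')
```

In the patch this is what defers `outdate_replicasets` by `connection_outdate_delay` after a reload.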