From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 08DE82881B for ; Wed, 8 Aug 2018 10:04:59 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id 3INh97lJoaMZ for ; Wed, 8 Aug 2018 10:04:58 -0400 (EDT) Received: from smtp49.i.mail.ru (smtp49.i.mail.ru [94.100.177.109]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 2F9B223F43 for ; Wed, 8 Aug 2018 10:04:58 -0400 (EDT) Subject: [tarantool-patches] Re: [PATCH 3/3] Introduce multiple routers feature References: <30ab88fc-fba0-11d8-254c-385e59caead7@tarantool.org> <9a0a958a-5f66-2aa5-83de-e2d6f55cbd71@tarantool.org> <9109c71a-0313-82c8-02db-3e4dd8c833cd@tarantool.org> <875d8175-1655-25d3-1ed2-469ca0df596e@tarantool.org> From: Alex Khatskevich Message-ID: <6f750fd3-4c6e-cfe0-add9-ee482329d893@tarantool.org> Date: Wed, 8 Aug 2018 17:04:55 +0300 MIME-Version: 1.0 In-Reply-To: <875d8175-1655-25d3-1ed2-469ca0df596e@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-US Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: Vladislav Shpilevoy , tarantool-patches@freelists.org On 08.08.2018 15:28, Vladislav Shpilevoy wrote: > Thanks for the fixes! > > 1. Please, rebase on the master. I've failed to do it > easy. > Done > 2. Please, adding a new commit send it to the same thread. > I am talking about "Fix: do not update route map in place". 
> > Since you've not sent it, I review it here. > > 2.1. At first, please, prefix the commit title with a > subsystem name the patch is for. Here it is not "Fix: ", > but "router: ". > > 2.2. We know a new route map size before rebuild - it is > equal to the total bucket count. So it can be allocated > once via table.new(total_bucket_count, 0). It allows to > avoid reallocs. > > I've fixed both remarks and pushed the commit into the > master. > Thanks >> +local function router_new(name, cfg) >> +    if type(name) ~= 'string' or type(cfg) ~= 'table' then >> +           error('Wrong argument type. Usage: >> vshard.router.new(name, cfg).') >> +    end >> +    if M.routers[name] then >> +        return nil, lerror.vshard(lerror.code.ROUTER_ALREADY_EXISTS, >> name) >> +    end >> +    local router = table.deepcopy(ROUTER_TEMPLATE) >> +    setmetatable(router, router_mt) >> +    router.name = name >> +    M.routers[name] = router >> +    router_cfg(router, cfg) > > 3. router_cfg can raise an error from box.cfg. So on an error lets > catch it, > remove the router from M.routers and rethrow the error. Done > > In other things the patch LGTM. Please, fix the comments above and I will > push it. Thank you for working on this! full diff commit 5cc3991487b6b212ef1c35880963c020e443200e Author: AKhatskevich Date:   Thu Jul 26 16:17:25 2018 +0300     Introduce multiple routers feature     Key points:     * Old `vshard.router.some_method()` api is preserved.     * Add `vshard.router.new(name, cfg)` method which returns a new router.     * Each router has its own:       1. name       2. background fibers       3. attributes (route_map, replicasets, outdate_delay...)     * Module reload reloads all configured routers.     * `cfg` reconfigures a single router.     * All routers share the same box configuration. The last passed config       overrides the global box config.     * Multiple router instances can be connected to the same cluster.     * By now, a router cannot be destroyed. 
    Extra changes:     * Add `data` parameter to `reloadable_fiber_create` function.     Closes #130 diff --git a/test/failover/failover.result b/test/failover/failover.result index 73a4250..50410ad 100644 --- a/test/failover/failover.result +++ b/test/failover/failover.result @@ -174,7 +174,7 @@ test_run:switch('router_1')  ---  - true  ... -rs1 = vshard.router.internal.replicasets[rs_uuid[1]] +rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]]  ---  ...  while not rs1.replica_up_ts do fiber.sleep(0.1) end diff --git a/test/failover/failover.test.lua b/test/failover/failover.test.lua index 6e06314..44c8b6d 100644 --- a/test/failover/failover.test.lua +++ b/test/failover/failover.test.lua @@ -74,7 +74,7 @@ echo_count  -- Ensure that replica_up_ts is updated periodically.  test_run:switch('router_1') -rs1 = vshard.router.internal.replicasets[rs_uuid[1]] +rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]]  while not rs1.replica_up_ts do fiber.sleep(0.1) end  old_up_ts = rs1.replica_up_ts  while rs1.replica_up_ts == old_up_ts do fiber.sleep(0.1) end diff --git a/test/failover/failover_errinj.result b/test/failover/failover_errinj.result index 3b6d986..484a1e3 100644 --- a/test/failover/failover_errinj.result +++ b/test/failover/failover_errinj.result @@ -49,7 +49,7 @@ vshard.router.cfg(cfg)  -- Check that already run failover step is restarted on  -- configuration change (if some replicasets are removed from  -- config). -rs1 = vshard.router.internal.replicasets[rs_uuid[1]] +rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]]  ---  ...  
while not rs1.replica or not rs1.replica.conn:is_connected() do fiber.sleep(0.1) end diff --git a/test/failover/failover_errinj.test.lua b/test/failover/failover_errinj.test.lua index b4d2d35..14228de 100644 --- a/test/failover/failover_errinj.test.lua +++ b/test/failover/failover_errinj.test.lua @@ -20,7 +20,7 @@ vshard.router.cfg(cfg)  -- Check that already run failover step is restarted on  -- configuration change (if some replicasets are removed from  -- config). -rs1 = vshard.router.internal.replicasets[rs_uuid[1]] +rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]]  while not rs1.replica or not rs1.replica.conn:is_connected() do fiber.sleep(0.1) end  vshard.router.internal.errinj.ERRINJ_FAILOVER_CHANGE_CFG = true  wait_state('Configuration has changed, restart ') diff --git a/test/failover/router_1.lua b/test/failover/router_1.lua index d71209b..664a6c6 100644 --- a/test/failover/router_1.lua +++ b/test/failover/router_1.lua @@ -42,7 +42,7 @@ end  function priority_order()      local ret = {}      for _, uuid in pairs(rs_uuid) do -        local rs = vshard.router.internal.replicasets[uuid] +        local rs = vshard.router.internal.static_router.replicasets[uuid]          local sorted = {}          for _, replica in pairs(rs.priority_list) do              local z diff --git a/test/misc/reconfigure.result b/test/misc/reconfigure.result index c7960b3..311f749 100644 --- a/test/misc/reconfigure.result +++ b/test/misc/reconfigure.result @@ -250,7 +250,7 @@ test_run:switch('router_1')  -- Ensure that in a case of error router internals are not  -- changed.  -- -not vshard.router.internal.collect_lua_garbage +not vshard.router.internal.static_router.collect_lua_garbage  ---  - true  ... @@ -264,7 +264,7 @@ vshard.router.cfg(cfg)  ---  - error: 'Incorrect value for option ''invalid_option'': unexpected option'  ... -not vshard.router.internal.collect_lua_garbage +not vshard.router.internal.static_router.collect_lua_garbage  ---  - true  ... 
diff --git a/test/misc/reconfigure.test.lua b/test/misc/reconfigure.test.lua index 25dc2ca..298b9b0 100644 --- a/test/misc/reconfigure.test.lua +++ b/test/misc/reconfigure.test.lua @@ -99,11 +99,11 @@ test_run:switch('router_1')  -- Ensure that in a case of error router internals are not  -- changed.  -- -not vshard.router.internal.collect_lua_garbage +not vshard.router.internal.static_router.collect_lua_garbage  cfg.collect_lua_garbage = true  cfg.invalid_option = 'kek'  vshard.router.cfg(cfg) -not vshard.router.internal.collect_lua_garbage +not vshard.router.internal.static_router.collect_lua_garbage  cfg.invalid_option = nil  cfg.collect_lua_garbage = nil  vshard.router.cfg(cfg) diff --git a/test/multiple_routers/configs.lua b/test/multiple_routers/configs.lua new file mode 100644 index 0000000..a6ce33c --- /dev/null +++ b/test/multiple_routers/configs.lua @@ -0,0 +1,81 @@ +names = { +    storage_1_1_a = '32a2d4b8-f146-44ed-9d51-2436507efdf8', +    storage_1_1_b = 'c1c849b1-641d-40b8-9283-bcfe73d46270', +    storage_1_2_a = '04e677ed-c7ba-47e0-a67f-b5100cfa86af', +    storage_1_2_b = 'c7a979ee-9263-4a38-84a5-2fb6a0a32684', +    storage_2_1_a = '88dc03f0-23fb-4f05-b462-e29186542864', +    storage_2_1_b = '4230b711-f5c4-4131-bf98-88cd43a16901', +    storage_2_2_a = '6b1eefbc-1e2e-410e-84ff-44c572ea9916', +    storage_2_2_b = 'be74419a-1e56-4ba4-97e9-6b18710f63c5', +} + +rs_1_1 = 'dd208fb8-8b90-49bc-8393-6b3a99da7c52' +rs_1_2 = 'af9cfe88-2091-4613-a877-a623776c5c0e' +rs_2_1 = '9ca8ee15-ae18-4f31-9385-4859f89ce73f' +rs_2_2 = '007f5f58-b654-4125-8441-a71866fb62b5' + +local cfg_1 = {} +cfg_1.sharding = { +    [rs_1_1] = { +        replicas = { +            [names.storage_1_1_a] = { +                uri = 'storage:storage@127.0.0.1:3301', +                name = 'storage_1_1_a', +                master = true, +            }, +            [names.storage_1_1_b] = { +                uri = 'storage:storage@127.0.0.1:3302', +                name = 'storage_1_1_b', +        
    }, +        } +    }, +    [rs_1_2] = { +        replicas = { +            [names.storage_1_2_a] = { +                uri = 'storage:storage@127.0.0.1:3303', +                name = 'storage_1_2_a', +                master = true, +            }, +            [names.storage_1_2_b] = { +                uri = 'storage:storage@127.0.0.1:3304', +                name = 'storage_1_2_b', +            }, +        } +    }, +} + + +local cfg_2 = {} +cfg_2.sharding = { +    [rs_2_1] = { +        replicas = { +            [names.storage_2_1_a] = { +                uri = 'storage:storage@127.0.0.1:3305', +                name = 'storage_2_1_a', +                master = true, +            }, +            [names.storage_2_1_b] = { +                uri = 'storage:storage@127.0.0.1:3306', +                name = 'storage_2_1_b', +            }, +        } +    }, +    [rs_2_2] = { +        replicas = { +            [names.storage_2_2_a] = { +                uri = 'storage:storage@127.0.0.1:3307', +                name = 'storage_2_2_a', +                master = true, +            }, +            [names.storage_2_2_b] = { +                uri = 'storage:storage@127.0.0.1:3308', +                name = 'storage_2_2_b', +            }, +        } +    }, +} + +return { +    cfg_1 = cfg_1, +    cfg_2 = cfg_2, +} diff --git a/test/multiple_routers/multiple_routers.result b/test/multiple_routers/multiple_routers.result new file mode 100644 index 0000000..5b85e1c --- /dev/null +++ b/test/multiple_routers/multiple_routers.result @@ -0,0 +1,301 @@ +test_run = require('test_run').new() +--- +... +REPLICASET_1_1 = { 'storage_1_1_a', 'storage_1_1_b' } +--- +... +REPLICASET_1_2 = { 'storage_1_2_a', 'storage_1_2_b' } +--- +... +REPLICASET_2_1 = { 'storage_2_1_a', 'storage_2_1_b' } +--- +... +REPLICASET_2_2 = { 'storage_2_2_a', 'storage_2_2_b' } +--- +... +test_run:create_cluster(REPLICASET_1_1, 'multiple_routers') +--- +... 
+test_run:create_cluster(REPLICASET_1_2, 'multiple_routers') +--- +... +test_run:create_cluster(REPLICASET_2_1, 'multiple_routers') +--- +... +test_run:create_cluster(REPLICASET_2_2, 'multiple_routers') +--- +... +util = require('lua_libs.util') +--- +... +util.wait_master(test_run, REPLICASET_1_1, 'storage_1_1_a') +--- +... +util.wait_master(test_run, REPLICASET_1_2, 'storage_1_2_a') +--- +... +util.wait_master(test_run, REPLICASET_2_1, 'storage_2_1_a') +--- +... +util.wait_master(test_run, REPLICASET_2_2, 'storage_2_2_a') +--- +... +test_run:cmd("create server router_1 with script='multiple_routers/router_1.lua'") +--- +- true +... +test_run:cmd("start server router_1") +--- +- true +... +-- Configure default (static) router. +_ = test_run:cmd("switch router_1") +--- +... +vshard.router.cfg(configs.cfg_1) +--- +... +vshard.router.bootstrap() +--- +- true +... +_ = test_run:cmd("switch storage_1_2_a") +--- +... +wait_rebalancer_state('The cluster is balanced ok', test_run) +--- +... +_ = test_run:cmd("switch router_1") +--- +... +vshard.router.call(1, 'write', 'do_replace', {{1, 1}}) +--- +- true +... +vshard.router.call(1, 'read', 'do_select', {1}) +--- +- [[1, 1]] +... +-- Test that static router is just a router object under the hood. +static_router = vshard.router.internal.static_router +--- +... +static_router:route(1) == vshard.router.route(1) +--- +- true +... +-- Configure extra router. +router_2 = vshard.router.new('router_2', configs.cfg_2) +--- +... +router_2:bootstrap() +--- +- true +... +_ = test_run:cmd("switch storage_2_2_a") +--- +... +wait_rebalancer_state('The cluster is balanced ok', test_run) +--- +... +_ = test_run:cmd("switch router_1") +--- +... +router_2:call(1, 'write', 'do_replace', {{2, 2}}) +--- +- true +... +router_2:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +-- Check that router_2 and static router serves different clusters. +#router_2:call(1, 'read', 'do_select', {1}) == 0 +--- +- true +... 
+-- Create several routers to the same cluster. +routers = {} +--- +... +for i = 3, 10 do routers[i] = vshard.router.new('router_' .. i, configs.cfg_2) end +--- +... +routers[3]:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +-- Check that they have their own background fibers. +fiber_names = {} +--- +... +for i = 2, 10 do fiber_names['vshard.failover.router_' .. i] = true; fiber_names['vshard.discovery.router_' .. i] = true; end +--- +... +next(fiber_names) ~= nil +--- +- true +... +fiber = require('fiber') +--- +... +for _, xfiber in pairs(fiber.info()) do fiber_names[xfiber.name] = nil end +--- +... +next(fiber_names) == nil +--- +- true +... +-- Reconfigure one of routers do not affect the others. +routers[3]:cfg(configs.cfg_1) +--- +... +routers[3]:call(1, 'read', 'do_select', {1}) +--- +- [[1, 1]] +... +#routers[3]:call(1, 'read', 'do_select', {2}) == 0 +--- +- true +... +#routers[4]:call(1, 'read', 'do_select', {1}) == 0 +--- +- true +... +routers[4]:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +routers[3]:cfg(configs.cfg_2) +--- +... +-- Try to create router with the same name. +util = require('lua_libs.util') +--- +... +util.check_error(vshard.router.new, 'router_2', configs.cfg_2) +--- +- null +- type: ShardingError +  code: 21 +  name: ROUTER_ALREADY_EXISTS +  message: Router with name router_2 already exists +... +-- Reload router module. +_, old_rs_1 = next(vshard.router.internal.static_router.replicasets) +--- +... +_, old_rs_2 = next(router_2.replicasets) +--- +... +package.loaded['vshard.router'] = nil +--- +... +vshard.router = require('vshard.router') +--- +... +while not old_rs_1.is_outdated do fiber.sleep(0.01) end +--- +... +while not old_rs_2.is_outdated do fiber.sleep(0.01) end +--- +... +vshard.router.call(1, 'read', 'do_select', {1}) +--- +- [[1, 1]] +... +router_2:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +routers[5]:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +-- Check lua_gc counter. 
+lua_gc = require('vshard.lua_gc') +--- +... +vshard.router.internal.collect_lua_garbage_cnt == 0 +--- +- true +... +lua_gc.internal.bg_fiber == nil +--- +- true +... +configs.cfg_2.collect_lua_garbage = true +--- +... +routers[5]:cfg(configs.cfg_2) +--- +... +lua_gc.internal.bg_fiber ~= nil +--- +- true +... +routers[7]:cfg(configs.cfg_2) +--- +... +lua_gc.internal.bg_fiber ~= nil +--- +- true +... +vshard.router.internal.collect_lua_garbage_cnt == 2 +--- +- true +... +package.loaded['vshard.router'] = nil +--- +... +vshard.router = require('vshard.router') +--- +... +vshard.router.internal.collect_lua_garbage_cnt == 2 +--- +- true +... +configs.cfg_2.collect_lua_garbage = nil +--- +... +routers[5]:cfg(configs.cfg_2) +--- +... +lua_gc.internal.bg_fiber ~= nil +--- +- true +... +routers[7]:cfg(configs.cfg_2) +--- +... +vshard.router.internal.collect_lua_garbage_cnt == 0 +--- +- true +... +lua_gc.internal.bg_fiber == nil +--- +- true +... +_ = test_run:cmd("switch default") +--- +... +test_run:cmd("stop server router_1") +--- +- true +... +test_run:cmd("cleanup server router_1") +--- +- true +... +test_run:drop_cluster(REPLICASET_1_1) +--- +... +test_run:drop_cluster(REPLICASET_1_2) +--- +... +test_run:drop_cluster(REPLICASET_2_1) +--- +... +test_run:drop_cluster(REPLICASET_2_2) +--- +... 
diff --git a/test/multiple_routers/multiple_routers.test.lua b/test/multiple_routers/multiple_routers.test.lua new file mode 100644 index 0000000..ec3c7f7 --- /dev/null +++ b/test/multiple_routers/multiple_routers.test.lua @@ -0,0 +1,109 @@ +test_run = require('test_run').new() + +REPLICASET_1_1 = { 'storage_1_1_a', 'storage_1_1_b' } +REPLICASET_1_2 = { 'storage_1_2_a', 'storage_1_2_b' } +REPLICASET_2_1 = { 'storage_2_1_a', 'storage_2_1_b' } +REPLICASET_2_2 = { 'storage_2_2_a', 'storage_2_2_b' } + +test_run:create_cluster(REPLICASET_1_1, 'multiple_routers') +test_run:create_cluster(REPLICASET_1_2, 'multiple_routers') +test_run:create_cluster(REPLICASET_2_1, 'multiple_routers') +test_run:create_cluster(REPLICASET_2_2, 'multiple_routers') +util = require('lua_libs.util') +util.wait_master(test_run, REPLICASET_1_1, 'storage_1_1_a') +util.wait_master(test_run, REPLICASET_1_2, 'storage_1_2_a') +util.wait_master(test_run, REPLICASET_2_1, 'storage_2_1_a') +util.wait_master(test_run, REPLICASET_2_2, 'storage_2_2_a') + +test_run:cmd("create server router_1 with script='multiple_routers/router_1.lua'") +test_run:cmd("start server router_1") + +-- Configure default (static) router. +_ = test_run:cmd("switch router_1") +vshard.router.cfg(configs.cfg_1) +vshard.router.bootstrap() +_ = test_run:cmd("switch storage_1_2_a") +wait_rebalancer_state('The cluster is balanced ok', test_run) +_ = test_run:cmd("switch router_1") + +vshard.router.call(1, 'write', 'do_replace', {{1, 1}}) +vshard.router.call(1, 'read', 'do_select', {1}) + +-- Test that static router is just a router object under the hood. +static_router = vshard.router.internal.static_router +static_router:route(1) == vshard.router.route(1) + +-- Configure extra router. 
+router_2 = vshard.router.new('router_2', configs.cfg_2) +router_2:bootstrap() +_ = test_run:cmd("switch storage_2_2_a") +wait_rebalancer_state('The cluster is balanced ok', test_run) +_ = test_run:cmd("switch router_1") + +router_2:call(1, 'write', 'do_replace', {{2, 2}}) +router_2:call(1, 'read', 'do_select', {2}) +-- Check that router_2 and static router serves different clusters. +#router_2:call(1, 'read', 'do_select', {1}) == 0 + +-- Create several routers to the same cluster. +routers = {} +for i = 3, 10 do routers[i] = vshard.router.new('router_' .. i, configs.cfg_2) end +routers[3]:call(1, 'read', 'do_select', {2}) +-- Check that they have their own background fibers. +fiber_names = {} +for i = 2, 10 do fiber_names['vshard.failover.router_' .. i] = true; fiber_names['vshard.discovery.router_' .. i] = true; end +next(fiber_names) ~= nil +fiber = require('fiber') +for _, xfiber in pairs(fiber.info()) do fiber_names[xfiber.name] = nil end +next(fiber_names) == nil + +-- Reconfigure one of routers do not affect the others. +routers[3]:cfg(configs.cfg_1) +routers[3]:call(1, 'read', 'do_select', {1}) +#routers[3]:call(1, 'read', 'do_select', {2}) == 0 +#routers[4]:call(1, 'read', 'do_select', {1}) == 0 +routers[4]:call(1, 'read', 'do_select', {2}) +routers[3]:cfg(configs.cfg_2) + +-- Try to create router with the same name. +util = require('lua_libs.util') +util.check_error(vshard.router.new, 'router_2', configs.cfg_2) + +-- Reload router module. +_, old_rs_1 = next(vshard.router.internal.static_router.replicasets) +_, old_rs_2 = next(router_2.replicasets) +package.loaded['vshard.router'] = nil +vshard.router = require('vshard.router') +while not old_rs_1.is_outdated do fiber.sleep(0.01) end +while not old_rs_2.is_outdated do fiber.sleep(0.01) end +vshard.router.call(1, 'read', 'do_select', {1}) +router_2:call(1, 'read', 'do_select', {2}) +routers[5]:call(1, 'read', 'do_select', {2}) + +-- Check lua_gc counter. 
+lua_gc = require('vshard.lua_gc') +vshard.router.internal.collect_lua_garbage_cnt == 0 +lua_gc.internal.bg_fiber == nil +configs.cfg_2.collect_lua_garbage = true +routers[5]:cfg(configs.cfg_2) +lua_gc.internal.bg_fiber ~= nil +routers[7]:cfg(configs.cfg_2) +lua_gc.internal.bg_fiber ~= nil +vshard.router.internal.collect_lua_garbage_cnt == 2 +package.loaded['vshard.router'] = nil +vshard.router = require('vshard.router') +vshard.router.internal.collect_lua_garbage_cnt == 2 +configs.cfg_2.collect_lua_garbage = nil +routers[5]:cfg(configs.cfg_2) +lua_gc.internal.bg_fiber ~= nil +routers[7]:cfg(configs.cfg_2) +vshard.router.internal.collect_lua_garbage_cnt == 0 +lua_gc.internal.bg_fiber == nil + +_ = test_run:cmd("switch default") +test_run:cmd("stop server router_1") +test_run:cmd("cleanup server router_1") +test_run:drop_cluster(REPLICASET_1_1) +test_run:drop_cluster(REPLICASET_1_2) +test_run:drop_cluster(REPLICASET_2_1) +test_run:drop_cluster(REPLICASET_2_2) diff --git a/test/multiple_routers/router_1.lua b/test/multiple_routers/router_1.lua new file mode 100644 index 0000000..2e9ea91 --- /dev/null +++ b/test/multiple_routers/router_1.lua @@ -0,0 +1,15 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +-- Get instance name +local fio = require('fio') +local NAME = fio.basename(arg[0], '.lua') + +require('console').listen(os.getenv('ADMIN')) + +configs = require('configs') + +-- Start the database with sharding +vshard = require('vshard') +box.cfg{} diff --git a/test/multiple_routers/storage_1_1_a.lua b/test/multiple_routers/storage_1_1_a.lua new file mode 100644 index 0000000..b44a97a --- /dev/null +++ b/test/multiple_routers/storage_1_1_a.lua @@ -0,0 +1,23 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +-- Get instance name. +local fio = require('fio') +NAME = fio.basename(arg[0], '.lua') + +require('console').listen(os.getenv('ADMIN')) + +-- Fetch config for the cluster of the instance. 
+if NAME:sub(9,9) == '1' then +    cfg = require('configs').cfg_1 +else +    cfg = require('configs').cfg_2 +end + +-- Start the database with sharding. +vshard = require('vshard') +vshard.storage.cfg(cfg, names[NAME]) + +-- Bootstrap storage. +require('lua_libs.bootstrap') diff --git a/test/multiple_routers/storage_1_1_b.lua b/test/multiple_routers/storage_1_1_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_1_1_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_1_2_a.lua b/test/multiple_routers/storage_1_2_a.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_1_2_a.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_1_2_b.lua b/test/multiple_routers/storage_1_2_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_1_2_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_1_a.lua b/test/multiple_routers/storage_2_1_a.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_1_a.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_1_b.lua b/test/multiple_routers/storage_2_1_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_1_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_2_a.lua b/test/multiple_routers/storage_2_2_a.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_2_a.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_2_b.lua b/test/multiple_routers/storage_2_2_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ 
b/test/multiple_routers/storage_2_2_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/suite.ini b/test/multiple_routers/suite.ini new file mode 100644 index 0000000..d2d4470 --- /dev/null +++ b/test/multiple_routers/suite.ini @@ -0,0 +1,6 @@ +[default] +core = tarantool +description = Multiple routers tests +script = test.lua +is_parallel = False +lua_libs = ../lua_libs configs.lua diff --git a/test/multiple_routers/test.lua b/test/multiple_routers/test.lua new file mode 100644 index 0000000..cb7c1ee --- /dev/null +++ b/test/multiple_routers/test.lua @@ -0,0 +1,9 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +box.cfg{ +    listen              = os.getenv("LISTEN"), +} + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/router/exponential_timeout.result b/test/router/exponential_timeout.result index fb54d0f..6748b64 100644 --- a/test/router/exponential_timeout.result +++ b/test/router/exponential_timeout.result @@ -37,10 +37,10 @@ test_run:cmd('switch router_1')  util = require('util')  ---  ... -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]  ---  ... -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]  ---  ...  
util.collect_timeouts(rs1) diff --git a/test/router/exponential_timeout.test.lua b/test/router/exponential_timeout.test.lua index 3ec0b8c..75d85bf 100644 --- a/test/router/exponential_timeout.test.lua +++ b/test/router/exponential_timeout.test.lua @@ -13,8 +13,8 @@ test_run:cmd("start server router_1")  test_run:cmd('switch router_1')  util = require('util') -rs1 = vshard.router.internal.replicasets[replicasets[1]] -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]  util.collect_timeouts(rs1)  util.collect_timeouts(rs2) diff --git a/test/router/reconnect_to_master.result b/test/router/reconnect_to_master.result index 5e678ce..d502723 100644 --- a/test/router/reconnect_to_master.result +++ b/test/router/reconnect_to_master.result @@ -76,7 +76,7 @@ _ = test_run:cmd('stop server storage_1_a')  _ = test_run:switch('router_1')  ---  ... -reps = vshard.router.internal.replicasets +reps = vshard.router.internal.static_router.replicasets  ---  ...  test_run:cmd("setopt delimiter ';'") @@ -95,7 +95,7 @@ end;  ...  function count_known_buckets()      local known_buckets = 0 -    for _, id in pairs(vshard.router.internal.route_map) do +    for _, id in pairs(vshard.router.internal.static_router.route_map) do          known_buckets = known_buckets + 1      end      return known_buckets @@ -127,7 +127,7 @@ is_disconnected()  fiber = require('fiber')  ---  ... -while vshard.router.internal.replicasets[replicasets[1]].replica == nil do fiber.sleep(0.1) end +while vshard.router.internal.static_router.replicasets[replicasets[1]].replica == nil do fiber.sleep(0.1) end  ---  ...  
vshard.router.info() diff --git a/test/router/reconnect_to_master.test.lua b/test/router/reconnect_to_master.test.lua index 39ba90e..8820fa7 100644 --- a/test/router/reconnect_to_master.test.lua +++ b/test/router/reconnect_to_master.test.lua @@ -34,7 +34,7 @@ _ = test_run:cmd('stop server storage_1_a')  _ = test_run:switch('router_1') -reps = vshard.router.internal.replicasets +reps = vshard.router.internal.static_router.replicasets  test_run:cmd("setopt delimiter ';'")  function is_disconnected()      for i, rep in pairs(reps) do @@ -46,7 +46,7 @@ function is_disconnected()  end;  function count_known_buckets()      local known_buckets = 0 -    for _, id in pairs(vshard.router.internal.route_map) do +    for _, id in pairs(vshard.router.internal.static_router.route_map) do          known_buckets = known_buckets + 1      end      return known_buckets @@ -63,7 +63,7 @@ is_disconnected()  -- Wait until replica is connected to test alerts on unavailable  -- master.  fiber = require('fiber') -while vshard.router.internal.replicasets[replicasets[1]].replica == nil do fiber.sleep(0.1) end +while vshard.router.internal.static_router.replicasets[replicasets[1]].replica == nil do fiber.sleep(0.1) end  vshard.router.info()  -- Return master. diff --git a/test/router/reload.result b/test/router/reload.result index f0badc3..98e8e71 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -229,7 +229,7 @@ vshard.router.cfg(cfg)  cfg.connection_outdate_delay = old_connection_delay  ---  ... -vshard.router.internal.connection_outdate_delay = nil +vshard.router.internal.static_router.connection_outdate_delay = nil  ---  ...  
rs_new = vshard.router.route(1) diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 528222a..293cb26 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -104,7 +104,7 @@ old_connection_delay = cfg.connection_outdate_delay  cfg.connection_outdate_delay = 0.3  vshard.router.cfg(cfg)  cfg.connection_outdate_delay = old_connection_delay -vshard.router.internal.connection_outdate_delay = nil +vshard.router.internal.static_router.connection_outdate_delay = nil  rs_new = vshard.router.route(1)  rs_old = rs  _, replica_old = next(rs_old.replicas) diff --git a/test/router/reroute_wrong_bucket.result b/test/router/reroute_wrong_bucket.result index 7f2a494..989dc79 100644 --- a/test/router/reroute_wrong_bucket.result +++ b/test/router/reroute_wrong_bucket.result @@ -98,7 +98,7 @@ vshard.router.call(100, 'read', 'customer_lookup', {1}, {timeout = 100})  ---  - {'accounts': [], 'customer_id': 1, 'name': 'name'}  ... -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]  ---  ...  vshard.router.call(100, 'write', 'customer_add', {{customer_id = 2, bucket_id = 100, name = 'name2', accounts = {}}}, {timeout = 100}) @@ -146,13 +146,13 @@ test_run:switch('router_1')  ...  -- Emulate a situation, when a replicaset_2 while is unknown for  -- router, but is already known for storages. -save_rs2 = vshard.router.internal.replicasets[replicasets[2]] +save_rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]  ---  ... -vshard.router.internal.replicasets[replicasets[2]] = nil +vshard.router.internal.static_router.replicasets[replicasets[2]] = nil  ---  ... 
-vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]  ---  ...  fiber = require('fiber') @@ -207,7 +207,7 @@ err  require('log').info(string.rep('a', 1000))  ---  ... -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]  ---  ...  call_retval = nil @@ -219,7 +219,7 @@ f = fiber.create(do_call, 100)  while not test_run:grep_log('router_1', 'please update configuration', 1000) do fiber.sleep(0.1) end  ---  ... -vshard.router.internal.replicasets[replicasets[2]] = save_rs2 +vshard.router.internal.static_router.replicasets[replicasets[2]] = save_rs2  ---  ...  while not call_retval do fiber.sleep(0.1) end diff --git a/test/router/reroute_wrong_bucket.test.lua b/test/router/reroute_wrong_bucket.test.lua index 03384d1..a00f941 100644 --- a/test/router/reroute_wrong_bucket.test.lua +++ b/test/router/reroute_wrong_bucket.test.lua @@ -35,7 +35,7 @@ customer_add({customer_id = 1, bucket_id = 100, name = 'name', accounts = {}})  test_run:switch('router_1')  vshard.router.call(100, 'read', 'customer_lookup', {1}, {timeout = 100}) -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]  vshard.router.call(100, 'write', 'customer_add', {{customer_id = 2, bucket_id = 100, name = 'name2', accounts = {}}}, {timeout = 100})  -- Create cycle. @@ -55,9 +55,9 @@ box.space._bucket:replace({100, vshard.consts.BUCKET.SENT, replicasets[2]})  test_run:switch('router_1')  -- Emulate a situation, when a replicaset_2 while is unknown for  -- router, but is already known for storages. 
-save_rs2 = vshard.router.internal.replicasets[replicasets[2]] -vshard.router.internal.replicasets[replicasets[2]] = nil -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +save_rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]] +vshard.router.internal.static_router.replicasets[replicasets[2]] = nil +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]  fiber = require('fiber')  call_retval = nil @@ -84,11 +84,11 @@ err  -- detect it and end with ok.  --  require('log').info(string.rep('a', 1000)) -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]  call_retval = nil  f = fiber.create(do_call, 100)  while not test_run:grep_log('router_1', 'please update configuration', 1000) do fiber.sleep(0.1) end -vshard.router.internal.replicasets[replicasets[2]] = save_rs2 +vshard.router.internal.static_router.replicasets[replicasets[2]] = save_rs2  while not call_retval do fiber.sleep(0.1) end  call_retval  vshard.router.call(100, 'read', 'customer_lookup', {3}, {timeout = 1}) diff --git a/test/router/retry_reads.result b/test/router/retry_reads.result index 64b0ff3..b803ae3 100644 --- a/test/router/retry_reads.result +++ b/test/router/retry_reads.result @@ -37,7 +37,7 @@ test_run:cmd('switch router_1')  util = require('util')  ---  ... -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]  ---  ...  
min_timeout = vshard.consts.CALL_TIMEOUT_MIN diff --git a/test/router/retry_reads.test.lua b/test/router/retry_reads.test.lua index 2fb2fc7..510e961 100644 --- a/test/router/retry_reads.test.lua +++ b/test/router/retry_reads.test.lua @@ -13,7 +13,7 @@ test_run:cmd("start server router_1")  test_run:cmd('switch router_1')  util = require('util') -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]  min_timeout = vshard.consts.CALL_TIMEOUT_MIN  -- diff --git a/test/router/router.result b/test/router/router.result index 45394e1..ceaf672 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -70,10 +70,10 @@ test_run:grep_log('router_1', 'connected to ')  ---  - 'connected to '  ... -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]  ---  ... -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]  ---  ...  fiber = require('fiber') @@ -95,7 +95,7 @@ rs2.replica == rs2.master  -- Part of gh-76: on reconfiguration do not recreate connections  -- to replicas, that are kept in a new configuration.  -- -old_replicasets = vshard.router.internal.replicasets +old_replicasets = vshard.router.internal.static_router.replicasets  ---  ...  old_connections = {} @@ -127,17 +127,17 @@ connection_count == 4  vshard.router.cfg(cfg)  ---  ... -new_replicasets = vshard.router.internal.replicasets +new_replicasets = vshard.router.internal.static_router.replicasets  ---  ...  old_replicasets ~= new_replicasets  ---  - true  ... -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]  ---  ... -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]  ---  ...  
while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end @@ -225,7 +225,7 @@ vshard.router.bootstrap()  --  -- gh-108: negative bucket count on discovery.  -- -vshard.router.internal.route_map = {} +vshard.router.internal.static_router.route_map = {}  ---  ...  rets = {} @@ -456,7 +456,7 @@ conn.state  rs_uuid = ''  ---  ... -rs = vshard.router.internal.replicasets[rs_uuid] +rs = vshard.router.internal.static_router.replicasets[rs_uuid]  ---  ...  master = rs.master @@ -605,7 +605,7 @@ vshard.router.info()  ...  -- Remove replica and master connections to trigger alert  -- UNREACHABLE_REPLICASET. -rs = vshard.router.internal.replicasets[replicasets[1]] +rs = vshard.router.internal.static_router.replicasets[replicasets[1]]  ---  ...  master_conn = rs.master.conn @@ -749,7 +749,7 @@ test_run:cmd("setopt delimiter ';'")  ...  function calculate_known_buckets()      local known_buckets = 0 -    for _, rs in pairs(vshard.router.internal.route_map) do +    for _, rs in pairs(vshard.router.internal.static_router.route_map) do          known_buckets = known_buckets + 1      end      return known_buckets @@ -851,10 +851,10 @@ test_run:cmd("setopt delimiter ';'")  - true  ...  for i = 1, 100 do -    local rs = vshard.router.internal.route_map[i] +    local rs = vshard.router.internal.static_router.route_map[i]      assert(rs)      rs.bucket_count = rs.bucket_count - 1 -    vshard.router.internal.route_map[i] = nil +    vshard.router.internal.static_router.route_map[i] = nil  end;  ---  ... @@ -999,7 +999,7 @@ vshard.router.sync(100500)  -- object method like this: object.method() instead of  -- object:method(), an appropriate help-error returns.  -- -_, replicaset = next(vshard.router.internal.replicasets) +_, replicaset = next(vshard.router.internal.static_router.replicasets)  ---  ...  error_messages = {} @@ -1069,7 +1069,7 @@ test_run:cmd("setopt delimiter ';'")  ---  - true  ... 
-for bucket, rs in pairs(vshard.router.internal.route_map) do +for bucket, rs in pairs(vshard.router.internal.static_router.route_map) do      bucket_to_old_rs[bucket] = rs      bucket_cnt = bucket_cnt + 1  end; @@ -1084,7 +1084,7 @@ vshard.router.cfg(cfg);  ...  for bucket, old_rs in pairs(bucket_to_old_rs) do      local old_uuid = old_rs.uuid -    local rs = vshard.router.internal.route_map[bucket] +    local rs = vshard.router.internal.static_router.route_map[bucket]      if not rs or not old_uuid == rs.uuid then          error("Bucket lost during reconfigure.")      end @@ -1111,7 +1111,7 @@ end;  vshard.router.cfg(cfg);  ---  ... -vshard.router.internal.route_map = {}; +vshard.router.internal.static_router.route_map = {};  ---  ...  vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false; @@ -1119,7 +1119,7 @@ vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false;  ...  -- Do discovery iteration. Upload buckets from the  -- first replicaset. -while not next(vshard.router.internal.route_map) do +while not next(vshard.router.internal.static_router.route_map) do      vshard.router.discovery_wakeup()      fiber.sleep(0.01)  end; @@ -1128,12 +1128,12 @@ end;  new_replicasets = {};  ---  ... -for _, rs in pairs(vshard.router.internal.replicasets) do +for _, rs in pairs(vshard.router.internal.static_router.replicasets) do      new_replicasets[rs] = true  end;  ---  ... -_, rs = next(vshard.router.internal.route_map); +_, rs = next(vshard.router.internal.static_router.route_map);  ---  ...  new_replicasets[rs] == true; @@ -1185,6 +1185,17 @@ vshard.router.route(1):callro('echo', {'some_data'})  - null  - null  ... +-- Multiple routers: check that static router can be used as an +-- object. +static_router = vshard.router.internal.static_router +--- +... +static_router:route(1):callro('echo', {'some_data'}) +--- +- some_data +- null +- null +...  _ = test_run:cmd("switch default")  ---  ... 
diff --git a/test/router/router.test.lua b/test/router/router.test.lua index df2f381..d7588f7 100644 --- a/test/router/router.test.lua +++ b/test/router/router.test.lua @@ -27,8 +27,8 @@ util = require('util')  -- gh-24: log all connnect/disconnect events.  test_run:grep_log('router_1', 'connected to ') -rs1 = vshard.router.internal.replicasets[replicasets[1]] -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]  fiber = require('fiber')  while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end  -- With no zones the nearest server is master. @@ -39,7 +39,7 @@ rs2.replica == rs2.master  -- Part of gh-76: on reconfiguration do not recreate connections  -- to replicas, that are kept in a new configuration.  -- -old_replicasets = vshard.router.internal.replicasets +old_replicasets = vshard.router.internal.static_router.replicasets  old_connections = {}  connection_count = 0  test_run:cmd("setopt delimiter ';'") @@ -52,10 +52,10 @@ end;  test_run:cmd("setopt delimiter ''");  connection_count == 4  vshard.router.cfg(cfg) -new_replicasets = vshard.router.internal.replicasets +new_replicasets = vshard.router.internal.static_router.replicasets  old_replicasets ~= new_replicasets -rs1 = vshard.router.internal.replicasets[replicasets[1]] -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]  while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end  vshard.router.discovery_wakeup()  -- Check that netbox connections are the same. @@ -91,7 +91,7 @@ vshard.router.bootstrap()  --  -- gh-108: negative bucket count on discovery.  
-- -vshard.router.internal.route_map = {} +vshard.router.internal.static_router.route_map = {}  rets = {}  function do_echo() table.insert(rets, vshard.router.callro(1, 'echo', {1})) end  f1 = fiber.create(do_echo) f2 = fiber.create(do_echo) @@ -153,7 +153,7 @@ conn = vshard.router.route(1).master.conn  conn.state  -- Test missing master.  rs_uuid = 'ac522f65-aa94-4134-9f64-51ee384f1a54' -rs = vshard.router.internal.replicasets[rs_uuid] +rs = vshard.router.internal.static_router.replicasets[rs_uuid]  master = rs.master  rs.master = nil  vshard.router.route(1).master @@ -223,7 +223,7 @@ vshard.router.info()  -- Remove replica and master connections to trigger alert  -- UNREACHABLE_REPLICASET. -rs = vshard.router.internal.replicasets[replicasets[1]] +rs = vshard.router.internal.static_router.replicasets[replicasets[1]]  master_conn = rs.master.conn  replica_conn = rs.replica.conn  rs.master.conn = nil @@ -261,7 +261,7 @@ util.check_error(vshard.router.buckets_info, 123, '456')  test_run:cmd("setopt delimiter ';'")  function calculate_known_buckets()      local known_buckets = 0 -    for _, rs in pairs(vshard.router.internal.route_map) do +    for _, rs in pairs(vshard.router.internal.static_router.route_map) do          known_buckets = known_buckets + 1      end      return known_buckets @@ -301,10 +301,10 @@ test_run:switch('router_1')  --  test_run:cmd("setopt delimiter ';'")  for i = 1, 100 do -    local rs = vshard.router.internal.route_map[i] +    local rs = vshard.router.internal.static_router.route_map[i]      assert(rs)      rs.bucket_count = rs.bucket_count - 1 -    vshard.router.internal.route_map[i] = nil +    vshard.router.internal.static_router.route_map[i] = nil  end;  test_run:cmd("setopt delimiter ''");  calculate_known_buckets() @@ -367,7 +367,7 @@ vshard.router.sync(100500)  -- object method like this: object.method() instead of  -- object:method(), an appropriate help-error returns.  
-- -_, replicaset = next(vshard.router.internal.replicasets) +_, replicaset = next(vshard.router.internal.static_router.replicasets)  error_messages = {}  test_run:cmd("setopt delimiter ';'") @@ -395,7 +395,7 @@ error_messages  bucket_to_old_rs = {}  bucket_cnt = 0  test_run:cmd("setopt delimiter ';'") -for bucket, rs in pairs(vshard.router.internal.route_map) do +for bucket, rs in pairs(vshard.router.internal.static_router.route_map) do      bucket_to_old_rs[bucket] = rs      bucket_cnt = bucket_cnt + 1  end; @@ -403,7 +403,7 @@ bucket_cnt;  vshard.router.cfg(cfg);  for bucket, old_rs in pairs(bucket_to_old_rs) do      local old_uuid = old_rs.uuid -    local rs = vshard.router.internal.route_map[bucket] +    local rs = vshard.router.internal.static_router.route_map[bucket]      if not rs or not old_uuid == rs.uuid then          error("Bucket lost during reconfigure.")      end @@ -423,19 +423,19 @@ while vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY ~= 'waiting' do      fiber.sleep(0.02)  end;  vshard.router.cfg(cfg); -vshard.router.internal.route_map = {}; +vshard.router.internal.static_router.route_map = {};  vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false;  -- Do discovery iteration. Upload buckets from the  -- first replicaset. 
-while not next(vshard.router.internal.route_map) do +while not next(vshard.router.internal.static_router.route_map) do      vshard.router.discovery_wakeup()      fiber.sleep(0.01)  end;  new_replicasets = {}; -for _, rs in pairs(vshard.router.internal.replicasets) do +for _, rs in pairs(vshard.router.internal.static_router.replicasets) do      new_replicasets[rs] = true  end; -_, rs = next(vshard.router.internal.route_map); +_, rs = next(vshard.router.internal.static_router.route_map);  new_replicasets[rs] == true;  test_run:cmd("setopt delimiter ''"); @@ -453,6 +453,11 @@ vshard.router.internal.errinj.ERRINJ_CFG = false  util.has_same_fields(old_internal, vshard.router.internal)  vshard.router.route(1):callro('echo', {'some_data'}) +-- Multiple routers: check that static router can be used as an +-- object. +static_router = vshard.router.internal.static_router +static_router:route(1):callro('echo', {'some_data'}) +  _ = test_run:cmd("switch default")  test_run:drop_cluster(REPLICASET_2) diff --git a/vshard/error.lua b/vshard/error.lua index f79107b..da92b58 100644 --- a/vshard/error.lua +++ b/vshard/error.lua @@ -105,7 +105,12 @@ local error_message_template = {          name = 'OBJECT_IS_OUTDATED',          msg = 'Object is outdated after module reload/reconfigure. ' ..                'Use new instance.' -    } +    }, +    [21] = { +        name = 'ROUTER_ALREADY_EXISTS', +        msg = 'Router with name %s already exists', +        args = {'name'}, +    },  }  -- diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 69cd37c..7ab2145 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -26,14 +26,33 @@ local M = rawget(_G, MODULE_INTERNALS)  if not M then      M = {          ---------------- Common module attributes ---------------- -        -- The last passed configuration. 
-        current_cfg = nil,          errinj = {              ERRINJ_CFG = false,              ERRINJ_FAILOVER_CHANGE_CFG = false,              ERRINJ_RELOAD = false,              ERRINJ_LONG_DISCOVERY = false,          }, +        -- Dictionary, key is router name, value is a router. +        routers = {}, +        -- Router object which can be accessed by old api: +        -- e.g. vshard.router.call(...) +        static_router = nil, +        -- This counter is used to restart background fibers with +        -- new reloaded code. +        module_version = 0, +        -- Number of router which require collecting lua garbage. +        collect_lua_garbage_cnt = 0, +    } +end + +-- +-- Router object attributes. +-- +local ROUTER_TEMPLATE = { +        -- Name of router. +        name = nil, +        -- The last passed configuration. +        current_cfg = nil,          -- Time to outdate old objects on reload.          connection_outdate_delay = nil,          -- Bucket map cache. @@ -48,38 +67,60 @@ if not M then          total_bucket_count = 0,          -- Boolean lua_gc state (create periodic gc task).          collect_lua_garbage = nil, -        -- This counter is used to restart background fibers with -        -- new reloaded code. -        module_version = 0, -    } -end +} + +local STATIC_ROUTER_NAME = '_static_router'  -- Set a bucket to a replicaset. -local function bucket_set(bucket_id, rs_uuid) -    local replicaset = M.replicasets[rs_uuid] +local function bucket_set(router, bucket_id, rs_uuid) +    local replicaset = router.replicasets[rs_uuid]      -- It is technically possible to delete a replicaset at the      -- same time when route to the bucket is discovered.      
if not replicaset then          return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, bucket_id)      end -    local old_replicaset = M.route_map[bucket_id] +    local old_replicaset = router.route_map[bucket_id]      if old_replicaset ~= replicaset then          if old_replicaset then              old_replicaset.bucket_count = old_replicaset.bucket_count - 1          end          replicaset.bucket_count = replicaset.bucket_count + 1      end -    M.route_map[bucket_id] = replicaset +    router.route_map[bucket_id] = replicaset      return replicaset  end  -- Remove a bucket from the cache. -local function bucket_reset(bucket_id) -    local replicaset = M.route_map[bucket_id] +local function bucket_reset(router, bucket_id) +    local replicaset = router.route_map[bucket_id]      if replicaset then          replicaset.bucket_count = replicaset.bucket_count - 1      end -    M.route_map[bucket_id] = nil +    router.route_map[bucket_id] = nil +end + +-------------------------------------------------------------------------------- +-- Helpers +-------------------------------------------------------------------------------- + +-- +-- Increase/decrease number of routers which require to collect +-- a lua garbage and change state of the `lua_gc` fiber. 
+-- + +local function lua_gc_cnt_inc() +    M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt + 1 +    if M.collect_lua_garbage_cnt == 1 then +        lua_gc.set_state(true, consts.COLLECT_LUA_GARBAGE_INTERVAL) +    end +end + +local function lua_gc_cnt_dec() +    M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt - 1 +    assert(M.collect_lua_garbage_cnt >= 0) +    if M.collect_lua_garbage_cnt == 0 then +        lua_gc.set_state(false, consts.COLLECT_LUA_GARBAGE_INTERVAL) +    end  end  -------------------------------------------------------------------------------- @@ -87,8 +128,8 @@ end  --------------------------------------------------------------------------------  -- Search bucket in whole cluster -local function bucket_discovery(bucket_id) -    local replicaset = M.route_map[bucket_id] +local function bucket_discovery(router, bucket_id) +    local replicaset = router.route_map[bucket_id]      if replicaset ~= nil then          return replicaset      end @@ -96,11 +137,11 @@ local function bucket_discovery(bucket_id)      log.verbose("Discovering bucket %d", bucket_id)      local last_err = nil      local unreachable_uuid = nil -    for uuid, replicaset in pairs(M.replicasets) do +    for uuid, replicaset in pairs(router.replicasets) do          local _, err =              replicaset:callrw('vshard.storage.bucket_stat', {bucket_id})          if err == nil then -            return bucket_set(bucket_id, replicaset.uuid) +            return bucket_set(router, bucket_id, replicaset.uuid)          elseif err.code ~= lerror.code.WRONG_BUCKET then              last_err = err              unreachable_uuid = uuid @@ -129,14 +170,14 @@ local function bucket_discovery(bucket_id)  end  -- Resolve bucket id to replicaset uuid -local function bucket_resolve(bucket_id) +local function bucket_resolve(router, bucket_id)      local replicaset, err -    local replicaset = M.route_map[bucket_id] +    local replicaset = router.route_map[bucket_id]      if replicaset ~= 
nil then          return replicaset      end      -- Replicaset removed from cluster, perform discovery -    replicaset, err = bucket_discovery(bucket_id) +    replicaset, err = bucket_discovery(router, bucket_id)      if replicaset == nil then          return nil, err      end @@ -147,14 +188,14 @@ end  -- Background fiber to perform discovery. It periodically scans  -- replicasets one by one and updates route_map.  -- -local function discovery_f() +local function discovery_f(router)      local module_version = M.module_version      while module_version == M.module_version do -        while not next(M.replicasets) do +        while not next(router.replicasets) do              lfiber.sleep(consts.DISCOVERY_INTERVAL)          end -        local old_replicasets = M.replicasets -        for rs_uuid, replicaset in pairs(M.replicasets) do +        local old_replicasets = router.replicasets +        for rs_uuid, replicaset in pairs(router.replicasets) do              local active_buckets, err = replicaset:callro('vshard.storage.buckets_discovery', {},                                    {timeout = 2}) @@ -164,7 +205,7 @@ local function discovery_f()              end              -- Renew replicasets object captured by the for loop              -- in case of reconfigure and reload events. 
-            if M.replicasets ~= old_replicasets then +            if router.replicasets ~= old_replicasets then                  break              end              if not active_buckets then @@ -177,11 +218,11 @@ local function discovery_f()                  end                  replicaset.bucket_count = #active_buckets                  for _, bucket_id in pairs(active_buckets) do -                    local old_rs = M.route_map[bucket_id] +                    local old_rs = router.route_map[bucket_id]                      if old_rs and old_rs ~= replicaset then                          old_rs.bucket_count = old_rs.bucket_count - 1                      end -                    M.route_map[bucket_id] = replicaset +                    router.route_map[bucket_id] = replicaset                  end              end              lfiber.sleep(consts.DISCOVERY_INTERVAL) @@ -192,9 +233,9 @@ end  --  -- Immediately wakeup discovery fiber if exists.  -- -local function discovery_wakeup() -    if M.discovery_fiber then -        M.discovery_fiber:wakeup() +local function discovery_wakeup(router) +    if router.discovery_fiber then +        router.discovery_fiber:wakeup()      end  end @@ -206,7 +247,7 @@ end  -- Function will restart operation after wrong bucket response until timeout  -- is reached  -- -local function router_call(bucket_id, mode, func, args, opts) +local function router_call(router, bucket_id, mode, func, args, opts)      if opts and (type(opts) ~= 'table' or                   (opts.timeout and type(opts.timeout) ~= 'number')) then          error('Usage: call(bucket_id, mode, func, args, opts)') @@ -214,7 +255,7 @@ local function router_call(bucket_id, mode, func, args, opts)      local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN      local replicaset, err      local tend = lfiber.time() + timeout -    if bucket_id > M.total_bucket_count or bucket_id <= 0 then +    if bucket_id > router.total_bucket_count or bucket_id <= 0 then          
error('Bucket is unreachable: bucket id is out of range')      end      local call @@ -224,7 +265,7 @@ local function router_call(bucket_id, mode, func, args, opts)          call = 'callrw'      end      repeat -        replicaset, err = bucket_resolve(bucket_id) +        replicaset, err = bucket_resolve(router, bucket_id)          if replicaset then  ::replicaset_is_found::              local storage_call_status, call_status, call_error = @@ -240,9 +281,9 @@ local function router_call(bucket_id, mode, func, args, opts)              end              err = call_status              if err.code == lerror.code.WRONG_BUCKET then -                bucket_reset(bucket_id) +                bucket_reset(router, bucket_id)                  if err.destination then -                    replicaset = M.replicasets[err.destination] +                    replicaset = router.replicasets[err.destination]                      if not replicaset then                          log.warn('Replicaset "%s" was not found, but received'..                                   ' from storage as destination - please '.. @@ -254,13 +295,14 @@ local function router_call(bucket_id, mode, func, args, opts)                          -- but already is executed on storages.                          
while lfiber.time() <= tend do                              lfiber.sleep(0.05) -                            replicaset = M.replicasets[err.destination] +                            replicaset = router.replicasets[err.destination]                              if replicaset then                                  goto replicaset_is_found                              end                          end                      else -                        replicaset = bucket_set(bucket_id, replicaset.uuid) +                        replicaset = bucket_set(router, bucket_id, +                                                replicaset.uuid)                          lfiber.yield()                          -- Protect against infinite cycle in a                          -- case of broken cluster, when a bucket @@ -277,7 +319,7 @@ local function router_call(bucket_id, mode, func, args, opts)                  -- is not timeout - these requests are repeated in                  -- any case on client, if error.                  assert(mode == 'write') -                bucket_reset(bucket_id) +                bucket_reset(router, bucket_id)                  return nil, err              elseif err.code == lerror.code.NON_MASTER then                  -- Same, as above - do not wait and repeat. @@ -303,12 +345,12 @@ end  --  -- Wrappers for router_call with preset mode.  -- -local function router_callro(bucket_id, ...) -    return router_call(bucket_id, 'read', ...) +local function router_callro(router, bucket_id, ...) +    return router_call(router, bucket_id, 'read', ...)  end -local function router_callrw(bucket_id, ...) -    return router_call(bucket_id, 'write', ...) +local function router_callrw(router, bucket_id, ...) +    return router_call(router, bucket_id, 'write', ...)  end  -- @@ -316,27 +358,27 @@ end  -- @param bucket_id Bucket identifier.  -- @retval Netbox connection.  
-- -local function router_route(bucket_id) +local function router_route(router, bucket_id)      if type(bucket_id) ~= 'number' then          error('Usage: router.route(bucket_id)')      end -    return bucket_resolve(bucket_id) +    return bucket_resolve(router, bucket_id)  end  --  -- Return map of all replicasets.  -- @retval See self.replicasets map.  -- -local function router_routeall() -    return M.replicasets +local function router_routeall(router) +    return router.replicasets  end  --------------------------------------------------------------------------------  -- Failover  -------------------------------------------------------------------------------- -local function failover_ping_round() -    for _, replicaset in pairs(M.replicasets) do +local function failover_ping_round(router) +    for _, replicaset in pairs(router.replicasets) do          local replica = replicaset.replica          if replica ~= nil and replica.conn ~= nil and             replica.down_ts == nil then @@ -379,10 +421,10 @@ end  -- Collect UUIDs of replicasets, priority of whose replica  -- connections must be updated.  -- -local function failover_collect_to_update() +local function failover_collect_to_update(router)      local ts = lfiber.time()      local uuid_to_update = {} -    for uuid, rs in pairs(M.replicasets) do +    for uuid, rs in pairs(router.replicasets) do          if failover_need_down_priority(rs, ts) or             failover_need_up_priority(rs, ts) then              table.insert(uuid_to_update, uuid) @@ -397,16 +439,16 @@ end  -- disconnected replicas.  -- @retval true A replica of an replicaset has been changed.  
-- -local function failover_step() -    failover_ping_round() -    local uuid_to_update = failover_collect_to_update() +local function failover_step(router) +    failover_ping_round(router) +    local uuid_to_update = failover_collect_to_update(router)      if #uuid_to_update == 0 then          return false      end      local curr_ts = lfiber.time()      local replica_is_changed = false      for _, uuid in pairs(uuid_to_update) do -        local rs = M.replicasets[uuid] +        local rs = router.replicasets[uuid]          if M.errinj.ERRINJ_FAILOVER_CHANGE_CFG then              rs = nil              M.errinj.ERRINJ_FAILOVER_CHANGE_CFG = false @@ -448,7 +490,7 @@ end  -- tries to reconnect to the best replica. When the connection is  -- established, it replaces the original replica.  -- -local function failover_f() +local function failover_f(router)      local module_version = M.module_version      local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT,                                   consts.FAILOVER_DOWN_TIMEOUT) @@ -458,7 +500,7 @@ local function failover_f()      local prev_was_ok = false      while module_version == M.module_version do  ::continue:: -        local ok, replica_is_changed = pcall(failover_step) +        local ok, replica_is_changed = pcall(failover_step, router)          if not ok then              log.error('Error during failovering: %s',                        lerror.make(replica_is_changed)) @@ -485,8 +527,8 @@ end  -- Configuration  -------------------------------------------------------------------------------- -local function router_cfg(cfg, is_reload) -    cfg = lcfg.check(cfg, M.current_cfg) +local function router_cfg(router, cfg, is_reload) +    cfg = lcfg.check(cfg, router.current_cfg)      local vshard_cfg, box_cfg = lcfg.split(cfg)      if not M.replicasets then          log.info('Starting router configuration') @@ -511,45 +553,49 @@ local function router_cfg(cfg, is_reload)      -- Move connections from an old configuration to a 
new one.      -- It must be done with no yields to prevent usage both of not      -- fully moved old replicasets, and not fully built new ones. -    lreplicaset.rebind_replicasets(new_replicasets, M.replicasets) +    lreplicaset.rebind_replicasets(new_replicasets, router.replicasets)      -- Now the new replicasets are fully built. Can establish      -- connections and yield.      for _, replicaset in pairs(new_replicasets) do          replicaset:connect_all()      end +    -- Change state of lua GC. +    if vshard_cfg.collect_lua_garbage and not router.collect_lua_garbage then +        lua_gc_cnt_inc() +    elseif not vshard_cfg.collect_lua_garbage and +       router.collect_lua_garbage then +        lua_gc_cnt_dec() +    end      lreplicaset.wait_masters_connect(new_replicasets) -    lreplicaset.outdate_replicasets(M.replicasets, +    lreplicaset.outdate_replicasets(router.replicasets, vshard_cfg.connection_outdate_delay) -    M.connection_outdate_delay = vshard_cfg.connection_outdate_delay -    M.total_bucket_count = vshard_cfg.bucket_count -    M.collect_lua_garbage = vshard_cfg.collect_lua_garbage -    M.current_cfg = cfg -    M.replicasets = new_replicasets -    local old_route_map = M.route_map -    M.route_map = table_new(M.total_bucket_count, 0) +    router.connection_outdate_delay = vshard_cfg.connection_outdate_delay +    router.total_bucket_count = vshard_cfg.bucket_count +    router.collect_lua_garbage = vshard_cfg.collect_lua_garbage +    router.current_cfg = cfg +    router.replicasets = new_replicasets +    local old_route_map = router.route_map +    router.route_map = table_new(router.total_bucket_count, 0)      for bucket, rs in pairs(old_route_map) do -        M.route_map[bucket] = M.replicasets[rs.uuid] +        router.route_map[bucket] = router.replicasets[rs.uuid]      end -    if M.failover_fiber == nil then -        M.failover_fiber = util.reloadable_fiber_create('vshard.failover', M, - 'failover_f') +    if router.failover_fiber == nil then 
+        router.failover_fiber = util.reloadable_fiber_create( +            'vshard.failover.' .. router.name, M, 'failover_f', router)      end -    if M.discovery_fiber == nil then -        M.discovery_fiber = util.reloadable_fiber_create('vshard.discovery', M, - 'discovery_f') +    if router.discovery_fiber == nil then +        router.discovery_fiber = util.reloadable_fiber_create( +            'vshard.discovery.' .. router.name, M, 'discovery_f', router)      end -    lua_gc.set_state(M.collect_lua_garbage, consts.COLLECT_LUA_GARBAGE_INTERVAL) -    -- Destroy connections, not used in a new configuration. -    collectgarbage()  end  --------------------------------------------------------------------------------  -- Bootstrap  -------------------------------------------------------------------------------- -local function cluster_bootstrap() +local function cluster_bootstrap(router)      local replicasets = {} -    for uuid, replicaset in pairs(M.replicasets) do +    for uuid, replicaset in pairs(router.replicasets) do          table.insert(replicasets, replicaset)          local count, err = replicaset:callrw('vshard.storage.buckets_count',                                               {}) @@ -560,9 +606,10 @@ local function cluster_bootstrap()              return nil, lerror.vshard(lerror.code.NON_EMPTY)          end      end -    lreplicaset.calculate_etalon_balance(M.replicasets, M.total_bucket_count) +    lreplicaset.calculate_etalon_balance(router.replicasets, +                                         router.total_bucket_count)      local bucket_id = 1 -    for uuid, replicaset in pairs(M.replicasets) do +    for uuid, replicaset in pairs(router.replicasets) do          if replicaset.etalon_bucket_count > 0 then              local ok, err = replicaset:callrw('vshard.storage.bucket_force_create', @@ -618,7 +665,7 @@ local function replicaset_instance_info(replicaset, name, alerts, errcolor,      return info, consts.STATUS.GREEN  end -local function 
router_info() +local function router_info(router)      local state = {          replicasets = {},          bucket = { @@ -632,7 +679,7 @@ local function router_info()      }      local bucket_info = state.bucket      local known_bucket_count = 0 -    for rs_uuid, replicaset in pairs(M.replicasets) do +    for rs_uuid, replicaset in pairs(router.replicasets) do          -- Replicaset info parameters:          -- * master instance info;          -- * replica instance info; @@ -720,7 +767,7 @@ local function router_info()          -- If a bucket is unreachable, then replicaset is          -- unreachable too and color already is red.      end -    bucket_info.unknown = M.total_bucket_count - known_bucket_count +    bucket_info.unknown = router.total_bucket_count - known_bucket_count      if bucket_info.unknown > 0 then          state.status = math.max(state.status, consts.STATUS.YELLOW)          table.insert(state.alerts, lerror.alert(lerror.code.UNKNOWN_BUCKETS, @@ -737,13 +784,13 @@ end  -- @param limit Maximal bucket count in output.  -- @retval Map of type {bucket_id = 'unknown'/replicaset_uuid}.  -- -local function router_buckets_info(offset, limit) +local function router_buckets_info(router, offset, limit)      if offset ~= nil and type(offset) ~= 'number' or         limit ~= nil and type(limit) ~= 'number' then          error('Usage: buckets_info(offset, limit)')      end      offset = offset or 0 -    limit = limit or M.total_bucket_count +    limit = limit or router.total_bucket_count      local ret = {}      -- Use one string memory for all unknown buckets.      local available_rw = 'available_rw' @@ -752,9 +799,9 @@ local function router_buckets_info(offset, limit)      local unreachable = 'unreachable'      -- Collect limit.      
local first = math.max(1, offset + 1) -    local last = math.min(offset + limit, M.total_bucket_count) +    local last = math.min(offset + limit, router.total_bucket_count)      for bucket_id = first, last do -        local rs = M.route_map[bucket_id] +        local rs = router.route_map[bucket_id]          if rs then              if rs.master and rs.master:is_connected() then                  ret[bucket_id] = {uuid = rs.uuid, status = available_rw} @@ -774,22 +821,22 @@ end  -- Other  -------------------------------------------------------------------------------- -local function router_bucket_id(key) +local function router_bucket_id(router, key)      if key == nil then          error("Usage: vshard.router.bucket_id(key)")      end -    return lhash.key_hash(key) % M.total_bucket_count + 1 +    return lhash.key_hash(key) % router.total_bucket_count + 1  end -local function router_bucket_count() -    return M.total_bucket_count +local function router_bucket_count(router) +    return router.total_bucket_count  end -local function router_sync(timeout) +local function router_sync(router, timeout)      if timeout ~= nil and type(timeout) ~= 'number' then          error('Usage: vshard.router.sync([timeout: number])')      end -    for rs_uuid, replicaset in pairs(M.replicasets) do +    for rs_uuid, replicaset in pairs(router.replicasets) do          local status, err = replicaset:callrw('vshard.storage.sync', {timeout})          if not status then              -- Add information about replicaset @@ -803,6 +850,94 @@ if M.errinj.ERRINJ_RELOAD then      error('Error injection: reload')  end +-------------------------------------------------------------------------------- +-- Managing router instances +-------------------------------------------------------------------------------- + +local function cfg_reconfigure(router, cfg) +    return router_cfg(router, cfg, false) +end + +local router_mt = { +    __index = { +        cfg = cfg_reconfigure; +        info = 
router_info; +        buckets_info = router_buckets_info; +        call = router_call; +        callro = router_callro; +        callrw = router_callrw; +        route = router_route; +        routeall = router_routeall; +        bucket_id = router_bucket_id; +        bucket_count = router_bucket_count; +        sync = router_sync; +        bootstrap = cluster_bootstrap; +        bucket_discovery = bucket_discovery; +        discovery_wakeup = discovery_wakeup; +    } +} + +-- Table which represents this module. +local module = {} + +-- This metatable forwards calls made on the module to the static_router. +local module_mt = {__index = {}} +for method_name, method in pairs(router_mt.__index) do +    module_mt.__index[method_name] = function(...) +        return method(M.static_router, ...) +    end +end + +local function export_static_router_attributes() +    setmetatable(module, module_mt) +end + +-- +-- Create a new instance of router. +-- @param name Name of a new router. +-- @param cfg Configuration for `router_cfg`. +-- @retval Router instance. +-- @retval Nil and error object. +-- +local function router_new(name, cfg) +    if type(name) ~= 'string' or type(cfg) ~= 'table' then +           error('Wrong argument type. Usage: vshard.router.new(name, cfg).') +    end +    if M.routers[name] then +        return nil, lerror.vshard(lerror.code.ROUTER_ALREADY_EXISTS, name) +    end +    local router = table.deepcopy(ROUTER_TEMPLATE) +    setmetatable(router, router_mt) +    router.name = name +    M.routers[name] = router +    local ok, err = pcall(router_cfg, router, cfg) +    if not ok then +        M.routers[name] = nil +        error(err) +    end +    return router +end + +-- +-- Wrapper around the `router_new` API, which allows using the old +-- static `vshard.router.cfg()` API. +-- +local function legacy_cfg(cfg) +    if M.static_router then +        -- Reconfigure. +        router_cfg(M.static_router, cfg, false) +    else +        -- Create new static instance. 
+        local router, err = router_new(STATIC_ROUTER_NAME, cfg) +        if router then +            M.static_router = router +            export_static_router_attributes() +        else +            return nil, err +        end +    end +end +  --------------------------------------------------------------------------------  -- Module definition  -------------------------------------------------------------------------------- @@ -813,28 +948,23 @@ end  if not rawget(_G, MODULE_INTERNALS) then      rawset(_G, MODULE_INTERNALS, M)  else -    router_cfg(M.current_cfg, true) +    for _, router in pairs(M.routers) do +        router_cfg(router, router.current_cfg, true) +        setmetatable(router, router_mt) +    end +    if M.static_router then +        export_static_router_attributes() +    end      M.module_version = M.module_version + 1  end  M.discovery_f = discovery_f  M.failover_f = failover_f +M.router_mt = router_mt -return { -    cfg = function(cfg) return router_cfg(cfg, false) end; -    info = router_info; -    buckets_info = router_buckets_info; -    call = router_call; -    callro = router_callro; -    callrw = router_callrw; -    route = router_route; -    routeall = router_routeall; -    bucket_id = router_bucket_id; -    bucket_count = router_bucket_count; -    sync = router_sync; -    bootstrap = cluster_bootstrap; -    bucket_discovery = bucket_discovery; -    discovery_wakeup = discovery_wakeup; -    internal = M; -    module_version = function() return M.module_version end; -} +module.cfg = legacy_cfg +module.new = router_new +module.internal = M +module.module_version = function() return M.module_version end + +return module diff --git a/vshard/util.lua b/vshard/util.lua index 37abe2b..3afaa61 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -38,11 +38,11 @@ end  -- reload of that module.  -- See description of parameters in `reloadable_fiber_create`.  
-- -local function reloadable_fiber_main_loop(module, func_name) +local function reloadable_fiber_main_loop(module, func_name, data)      log.info('%s has been started', func_name)      local func = module[func_name]  ::restart_loop:: -    local ok, err = pcall(func) +    local ok, err = pcall(func, data)      -- yield serves two purposes:      --  * makes this fiber cancellable      --  * prevents 100% cpu consumption @@ -60,7 +60,7 @@ local function reloadable_fiber_main_loop(module, func_name)      log.info('module is reloaded, restarting')      -- luajit drops this frame if next function is called in      -- return statement. -    return M.reloadable_fiber_main_loop(module, func_name) +    return M.reloadable_fiber_main_loop(module, func_name, data)  end  -- @@ -74,11 +74,13 @@ end  -- @param module Module which can be reloaded.  -- @param func_name Name of a function to be executed in the  --        module. +-- @param data Data to be passed to the specified function.  -- @retval New fiber.  -- -local function reloadable_fiber_create(fiber_name, module, func_name) +local function reloadable_fiber_create(fiber_name, module, func_name, data)      assert(type(fiber_name) == 'string') -    local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name) +    local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name, +                                data)      xfiber:name(fiber_name)      return xfiber  end