[tarantool-patches] Re: [PATCH 3/3] Introduce multiple routers feature
Alex Khatskevich
avkhatskevich at tarantool.org
Tue Aug 7 16:18:25 MSK 2018
On 06.08.2018 20:03, Vladislav Shpilevoy wrote:
> Thanks for the patch! See 8 comments below.
>
> 1. You did not send a full diff. There are tests only. (In
> this email I pasted it myself.) Please send a full diff
> next time.
Ok, sorry.
>
>>>> + if M.routers[name] then
>>>> + return nil, string.format('Router with name %s already
>>>> exists', name)
>>>> + end
>>>> + local router = table.deepcopy(ROUTER_TEMPLATE)
>>>> + setmetatable(router, router_mt)
>>>> + router.name = name
>>>> + M.routers[name] = router
>>>> + if name == STATIC_ROUTER_NAME then
>>>> + M.static_router = router
>>>> + export_static_router_attributes()
>>>> + end
>>>
>>> 9. This check can be removed if you move
>>> export_static_router_attributes call into legacy_cfg.
>> But due to this if, the static router can be configured by
>> `vshard.router.new(static_router_name)`.
>
> 2. It is not ok. A user should not use any internal names like
> _static_router to configure and get it. Please add a new member
> vshard.router.static that references the static one. Until cfg
> is called it is nil.
Fixed.
For now, a user can create the static router only by calling
`vshard.router.cfg()`.
>
>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 3e127cb..62fdcda 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -25,14 +25,32 @@ local M = rawget(_G, MODULE_INTERNALS)
>> if not M then
>> M = {
>> ---------------- Common module attributes ----------------
>> - -- The last passed configuration.
>> - current_cfg = nil,
>> errinj = {
>> ERRINJ_CFG = false,
>> ERRINJ_FAILOVER_CHANGE_CFG = false,
>> ERRINJ_RELOAD = false,
>> ERRINJ_LONG_DISCOVERY = false,
>> },
>> + -- Dictionary, key is router name, value is a router.
>> + routers = {},
>> + -- Router object which can be accessed by old api:
>> + -- e.g. vshard.router.call(...)
>> + static_router = nil,
>> + -- This counter is used to restart background fibers with
>> + -- new reloaded code.
>> + module_version = 0,
>> + collect_lua_garbage_cnt = 0,
>
> 3. A comment?
added
-- Number of routers which require collecting Lua garbage.
>
>> + }
>> +end
>> +
>> +--
>> +-- Router object attributes.
>> +--
>> +local ROUTER_TEMPLATE = {
>> + -- Name of router.
>> + name = nil,
>> + -- The last passed configuration.
>> + current_cfg = nil,
>> -- Time to outdate old objects on reload.
>> connection_outdate_delay = nil,
>> -- Bucket map cache.> @@ -488,8 +505,20 @@ end
>> -- Configuration
>> --------------------------------------------------------------------------------
>>
>>
>> -local function router_cfg(cfg)
>> - local vshard_cfg, box_cfg = lcfg.check(cfg, M.current_cfg)
>> +local function change_lua_gc_cnt(val)
>
> 4. The same.
fixed
>
>> + assert(M.collect_lua_garbage_cnt >= 0)
>> + local prev_cnt = M.collect_lua_garbage_cnt
>> + M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt + val
>> + if prev_cnt == 0 and M.collect_lua_garbage_cnt > 0 then
>> + lua_gc.set_state(true, consts.COLLECT_LUA_GARBAGE_INTERVAL)
>> + end
>> + if prev_cnt > 0 and M.collect_lua_garbage_cnt == 0 then
>> + lua_gc.set_state(false, consts.COLLECT_LUA_GARBAGE_INTERVAL)
>> + end
>
> 5. You always know the concrete val in the caller: 1 or -1. I think
> it would look simpler if you split this function into separate inc
> and dec ones. The former checks for prev == 0 and new > 0, the latter
> checks for prev > 0 and new == 0. It is not needed to check both
> each time.
changed
>
>> +end
>> +
>> +local function router_cfg(router, cfg)
>> + local vshard_cfg, box_cfg = lcfg.check(cfg, router.current_cfg)
>> if not M.replicasets then
>> log.info('Starting router configuration')
>> else
>> @@ -803,6 +839,77 @@ if M.errinj.ERRINJ_RELOAD then
>> error('Error injection: reload')
>> end
>>
>> +--------------------------------------------------------------------------------
>>
>> +-- Managing router instances
>> +--------------------------------------------------------------------------------
>>
>> +
>> +local function cfg_reconfigure(router, cfg)
>> + return router_cfg(router, cfg)
>> +end
>> +
>> +local router_mt = {
>> + __index = {
>> + cfg = cfg_reconfigure;
>> + info = router_info;
>> + buckets_info = router_buckets_info;
>> + call = router_call;
>> + callro = router_callro;
>> + callrw = router_callrw;
>> + route = router_route;
>> + routeall = router_routeall;
>> + bucket_id = router_bucket_id;
>> + bucket_count = router_bucket_count;
>> + sync = router_sync;
>> + bootstrap = cluster_bootstrap;
>> + bucket_discovery = bucket_discovery;
>> + discovery_wakeup = discovery_wakeup;
>> + }
>> +}
>> +
>> +-- Table which represents this module.
>> +local module = {}
>> +
>> +-- This metatable bypasses calls to a module to the static_router.
>> +local module_mt = {__index = {}}
>> +for method_name, method in pairs(router_mt.__index) do
>> + module_mt.__index[method_name] = function(...)
>> + return method(M.static_router, ...)
>> + end
>> +end
>> +
>> +local function export_static_router_attributes()
>> + setmetatable(module, module_mt)
>> +end
>> +
>> +local function router_new(name, cfg)
>
> 6. A comment?
added
>
> 7. This function should not check for router_name == static one.
> It just creates a new router and returns it. The caller should set
> it into M.routers or M.static_router if needed. For a user you
> expose not this function but a wrapper that calls router_new and
> sets M.routers.
fixed.
>
>> + if type(name) ~= 'string' or type(cfg) ~= 'table' then
>> + error('Wrong argument type. Usage:
>> vshard.router.new(name, cfg).')
>> + end
>> + if M.routers[name] then
>> + return nil, string.format('Router with name %s already
>> exists', name)
>> + end
>> + local router = table.deepcopy(ROUTER_TEMPLATE)
>> + setmetatable(router, router_mt)
>> + router.name = name
>> + M.routers[name] = router
>> + if name == STATIC_ROUTER_NAME then
>> + M.static_router = router
>> + export_static_router_attributes()
>> + end
>> + router_cfg(router, cfg)
>> + return router
>> +end
>> +
>> @@ -813,28 +920,23 @@ end
>> if not rawget(_G, MODULE_INTERNALS) then
>> rawset(_G, MODULE_INTERNALS, M)
>> else
>> - router_cfg(M.current_cfg)
>> + for _, router in pairs(M.routers) do
>> + router_cfg(router, router.current_cfg)
>> + setmetatable(router, router_mt)
>> + end
>> M.module_version = M.module_version + 1
>> end
>>
>> M.discovery_f = discovery_f
>> M.failover_f = failover_f
>> +M.router_mt = router_mt
>> +if M.static_router then
>> + export_static_router_attributes()
>> +end
>
> 8. This is possible on reload only and can be moved into
> the if above to the reload case processing.
Fixed.
Here is the full diff:
commit 87b6dc044de177e159dbe24f07abf3f98839ccff
Author: AKhatskevich <avkhatskevich at tarantool.org>
Date: Thu Jul 26 16:17:25 2018 +0300
Introduce multiple routers feature
Key points:
* Old `vshard.router.some_method()` API is preserved.
* Add `vshard.router.new(name, cfg)` method which returns a new router.
* Each router has its own:
1. name
2. background fibers
3. attributes (route_map, replicasets, outdate_delay...)
* Module reload reloads all configured routers.
* `cfg` reconfigures a single router.
* All routers share the same box configuration. The last passed config
overrides the global box config.
* Multiple router instances can be connected to the same cluster.
* For now, a router cannot be destroyed.
Extra changes:
* Add `data` parameter to `reloadable_fiber_create` function.
Closes #130
diff --git a/test/failover/failover.result b/test/failover/failover.result
index 73a4250..50410ad 100644
--- a/test/failover/failover.result
+++ b/test/failover/failover.result
@@ -174,7 +174,7 @@ test_run:switch('router_1')
---
- true
...
-rs1 = vshard.router.internal.replicasets[rs_uuid[1]]
+rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]]
---
...
while not rs1.replica_up_ts do fiber.sleep(0.1) end
diff --git a/test/failover/failover.test.lua
b/test/failover/failover.test.lua
index 6e06314..44c8b6d 100644
--- a/test/failover/failover.test.lua
+++ b/test/failover/failover.test.lua
@@ -74,7 +74,7 @@ echo_count
-- Ensure that replica_up_ts is updated periodically.
test_run:switch('router_1')
-rs1 = vshard.router.internal.replicasets[rs_uuid[1]]
+rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]]
while not rs1.replica_up_ts do fiber.sleep(0.1) end
old_up_ts = rs1.replica_up_ts
while rs1.replica_up_ts == old_up_ts do fiber.sleep(0.1) end
diff --git a/test/failover/failover_errinj.result
b/test/failover/failover_errinj.result
index 3b6d986..484a1e3 100644
--- a/test/failover/failover_errinj.result
+++ b/test/failover/failover_errinj.result
@@ -49,7 +49,7 @@ vshard.router.cfg(cfg)
-- Check that already run failover step is restarted on
-- configuration change (if some replicasets are removed from
-- config).
-rs1 = vshard.router.internal.replicasets[rs_uuid[1]]
+rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]]
---
...
while not rs1.replica or not rs1.replica.conn:is_connected() do
fiber.sleep(0.1) end
diff --git a/test/failover/failover_errinj.test.lua
b/test/failover/failover_errinj.test.lua
index b4d2d35..14228de 100644
--- a/test/failover/failover_errinj.test.lua
+++ b/test/failover/failover_errinj.test.lua
@@ -20,7 +20,7 @@ vshard.router.cfg(cfg)
-- Check that already run failover step is restarted on
-- configuration change (if some replicasets are removed from
-- config).
-rs1 = vshard.router.internal.replicasets[rs_uuid[1]]
+rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]]
while not rs1.replica or not rs1.replica.conn:is_connected() do
fiber.sleep(0.1) end
vshard.router.internal.errinj.ERRINJ_FAILOVER_CHANGE_CFG = true
wait_state('Configuration has changed, restart ')
diff --git a/test/failover/router_1.lua b/test/failover/router_1.lua
index d71209b..664a6c6 100644
--- a/test/failover/router_1.lua
+++ b/test/failover/router_1.lua
@@ -42,7 +42,7 @@ end
function priority_order()
local ret = {}
for _, uuid in pairs(rs_uuid) do
- local rs = vshard.router.internal.replicasets[uuid]
+ local rs = vshard.router.internal.static_router.replicasets[uuid]
local sorted = {}
for _, replica in pairs(rs.priority_list) do
local z
diff --git a/test/misc/reconfigure.result b/test/misc/reconfigure.result
index c7960b3..311f749 100644
--- a/test/misc/reconfigure.result
+++ b/test/misc/reconfigure.result
@@ -250,7 +250,7 @@ test_run:switch('router_1')
-- Ensure that in a case of error router internals are not
-- changed.
--
-not vshard.router.internal.collect_lua_garbage
+not vshard.router.internal.static_router.collect_lua_garbage
---
- true
...
@@ -264,7 +264,7 @@ vshard.router.cfg(cfg)
---
- error: 'Incorrect value for option ''invalid_option'': unexpected
option'
...
-not vshard.router.internal.collect_lua_garbage
+not vshard.router.internal.static_router.collect_lua_garbage
---
- true
...
diff --git a/test/misc/reconfigure.test.lua b/test/misc/reconfigure.test.lua
index 25dc2ca..298b9b0 100644
--- a/test/misc/reconfigure.test.lua
+++ b/test/misc/reconfigure.test.lua
@@ -99,11 +99,11 @@ test_run:switch('router_1')
-- Ensure that in a case of error router internals are not
-- changed.
--
-not vshard.router.internal.collect_lua_garbage
+not vshard.router.internal.static_router.collect_lua_garbage
cfg.collect_lua_garbage = true
cfg.invalid_option = 'kek'
vshard.router.cfg(cfg)
-not vshard.router.internal.collect_lua_garbage
+not vshard.router.internal.static_router.collect_lua_garbage
cfg.invalid_option = nil
cfg.collect_lua_garbage = nil
vshard.router.cfg(cfg)
diff --git a/test/multiple_routers/configs.lua
b/test/multiple_routers/configs.lua
new file mode 100644
index 0000000..a6ce33c
--- /dev/null
+++ b/test/multiple_routers/configs.lua
@@ -0,0 +1,81 @@
+names = {
+ storage_1_1_a = '32a2d4b8-f146-44ed-9d51-2436507efdf8',
+ storage_1_1_b = 'c1c849b1-641d-40b8-9283-bcfe73d46270',
+ storage_1_2_a = '04e677ed-c7ba-47e0-a67f-b5100cfa86af',
+ storage_1_2_b = 'c7a979ee-9263-4a38-84a5-2fb6a0a32684',
+ storage_2_1_a = '88dc03f0-23fb-4f05-b462-e29186542864',
+ storage_2_1_b = '4230b711-f5c4-4131-bf98-88cd43a16901',
+ storage_2_2_a = '6b1eefbc-1e2e-410e-84ff-44c572ea9916',
+ storage_2_2_b = 'be74419a-1e56-4ba4-97e9-6b18710f63c5',
+}
+
+rs_1_1 = 'dd208fb8-8b90-49bc-8393-6b3a99da7c52'
+rs_1_2 = 'af9cfe88-2091-4613-a877-a623776c5c0e'
+rs_2_1 = '9ca8ee15-ae18-4f31-9385-4859f89ce73f'
+rs_2_2 = '007f5f58-b654-4125-8441-a71866fb62b5'
+
+local cfg_1 = {}
+cfg_1.sharding = {
+ [rs_1_1] = {
+ replicas = {
+ [names.storage_1_1_a] = {
+ uri = 'storage:storage at 127.0.0.1:3301',
+ name = 'storage_1_1_a',
+ master = true,
+ },
+ [names.storage_1_1_b] = {
+ uri = 'storage:storage at 127.0.0.1:3302',
+ name = 'storage_1_1_b',
+ },
+ }
+ },
+ [rs_1_2] = {
+ replicas = {
+ [names.storage_1_2_a] = {
+ uri = 'storage:storage at 127.0.0.1:3303',
+ name = 'storage_1_2_a',
+ master = true,
+ },
+ [names.storage_1_2_b] = {
+ uri = 'storage:storage at 127.0.0.1:3304',
+ name = 'storage_1_2_b',
+ },
+ }
+ },
+}
+
+
+local cfg_2 = {}
+cfg_2.sharding = {
+ [rs_2_1] = {
+ replicas = {
+ [names.storage_2_1_a] = {
+ uri = 'storage:storage at 127.0.0.1:3305',
+ name = 'storage_2_1_a',
+ master = true,
+ },
+ [names.storage_2_1_b] = {
+ uri = 'storage:storage at 127.0.0.1:3306',
+ name = 'storage_2_1_b',
+ },
+ }
+ },
+ [rs_2_2] = {
+ replicas = {
+ [names.storage_2_2_a] = {
+ uri = 'storage:storage at 127.0.0.1:3307',
+ name = 'storage_2_2_a',
+ master = true,
+ },
+ [names.storage_2_2_b] = {
+ uri = 'storage:storage at 127.0.0.1:3308',
+ name = 'storage_2_2_b',
+ },
+ }
+ },
+}
+
+return {
+ cfg_1 = cfg_1,
+ cfg_2 = cfg_2,
+}
diff --git a/test/multiple_routers/multiple_routers.result
b/test/multiple_routers/multiple_routers.result
new file mode 100644
index 0000000..5b85e1c
--- /dev/null
+++ b/test/multiple_routers/multiple_routers.result
@@ -0,0 +1,301 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1_1 = { 'storage_1_1_a', 'storage_1_1_b' }
+---
+...
+REPLICASET_1_2 = { 'storage_1_2_a', 'storage_1_2_b' }
+---
+...
+REPLICASET_2_1 = { 'storage_2_1_a', 'storage_2_1_b' }
+---
+...
+REPLICASET_2_2 = { 'storage_2_2_a', 'storage_2_2_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1_1, 'multiple_routers')
+---
+...
+test_run:create_cluster(REPLICASET_1_2, 'multiple_routers')
+---
+...
+test_run:create_cluster(REPLICASET_2_1, 'multiple_routers')
+---
+...
+test_run:create_cluster(REPLICASET_2_2, 'multiple_routers')
+---
+...
+util = require('lua_libs.util')
+---
+...
+util.wait_master(test_run, REPLICASET_1_1, 'storage_1_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_1_2, 'storage_1_2_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2_1, 'storage_2_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2_2, 'storage_2_2_a')
+---
+...
+test_run:cmd("create server router_1 with
script='multiple_routers/router_1.lua'")
+---
+- true
+...
+test_run:cmd("start server router_1")
+---
+- true
+...
+-- Configure default (static) router.
+_ = test_run:cmd("switch router_1")
+---
+...
+vshard.router.cfg(configs.cfg_1)
+---
+...
+vshard.router.bootstrap()
+---
+- true
+...
+_ = test_run:cmd("switch storage_1_2_a")
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+_ = test_run:cmd("switch router_1")
+---
+...
+vshard.router.call(1, 'write', 'do_replace', {{1, 1}})
+---
+- true
+...
+vshard.router.call(1, 'read', 'do_select', {1})
+---
+- [[1, 1]]
+...
+-- Test that static router is just a router object under the hood.
+static_router = vshard.router.internal.static_router
+---
+...
+static_router:route(1) == vshard.router.route(1)
+---
+- true
+...
+-- Configure extra router.
+router_2 = vshard.router.new('router_2', configs.cfg_2)
+---
+...
+router_2:bootstrap()
+---
+- true
+...
+_ = test_run:cmd("switch storage_2_2_a")
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+_ = test_run:cmd("switch router_1")
+---
+...
+router_2:call(1, 'write', 'do_replace', {{2, 2}})
+---
+- true
+...
+router_2:call(1, 'read', 'do_select', {2})
+---
+- [[2, 2]]
+...
+-- Check that router_2 and static router serves different clusters.
+#router_2:call(1, 'read', 'do_select', {1}) == 0
+---
+- true
+...
+-- Create several routers to the same cluster.
+routers = {}
+---
+...
+for i = 3, 10 do routers[i] = vshard.router.new('router_' .. i,
configs.cfg_2) end
+---
+...
+routers[3]:call(1, 'read', 'do_select', {2})
+---
+- [[2, 2]]
+...
+-- Check that they have their own background fibers.
+fiber_names = {}
+---
+...
+for i = 2, 10 do fiber_names['vshard.failover.router_' .. i] = true;
fiber_names['vshard.discovery.router_' .. i] = true; end
+---
+...
+next(fiber_names) ~= nil
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+for _, xfiber in pairs(fiber.info()) do fiber_names[xfiber.name] = nil end
+---
+...
+next(fiber_names) == nil
+---
+- true
+...
+-- Reconfigure one of routers do not affect the others.
+routers[3]:cfg(configs.cfg_1)
+---
+...
+routers[3]:call(1, 'read', 'do_select', {1})
+---
+- [[1, 1]]
+...
+#routers[3]:call(1, 'read', 'do_select', {2}) == 0
+---
+- true
+...
+#routers[4]:call(1, 'read', 'do_select', {1}) == 0
+---
+- true
+...
+routers[4]:call(1, 'read', 'do_select', {2})
+---
+- [[2, 2]]
+...
+routers[3]:cfg(configs.cfg_2)
+---
+...
+-- Try to create router with the same name.
+util = require('lua_libs.util')
+---
+...
+util.check_error(vshard.router.new, 'router_2', configs.cfg_2)
+---
+- null
+- type: ShardingError
+ code: 21
+ name: ROUTER_ALREADY_EXISTS
+ message: Router with name router_2 already exists
+...
+-- Reload router module.
+_, old_rs_1 = next(vshard.router.internal.static_router.replicasets)
+---
+...
+_, old_rs_2 = next(router_2.replicasets)
+---
+...
+package.loaded['vshard.router'] = nil
+---
+...
+vshard.router = require('vshard.router')
+---
+...
+while not old_rs_1.is_outdated do fiber.sleep(0.01) end
+---
+...
+while not old_rs_2.is_outdated do fiber.sleep(0.01) end
+---
+...
+vshard.router.call(1, 'read', 'do_select', {1})
+---
+- [[1, 1]]
+...
+router_2:call(1, 'read', 'do_select', {2})
+---
+- [[2, 2]]
+...
+routers[5]:call(1, 'read', 'do_select', {2})
+---
+- [[2, 2]]
+...
+-- Check lua_gc counter.
+lua_gc = require('vshard.lua_gc')
+---
+...
+vshard.router.internal.collect_lua_garbage_cnt == 0
+---
+- true
+...
+lua_gc.internal.bg_fiber == nil
+---
+- true
+...
+configs.cfg_2.collect_lua_garbage = true
+---
+...
+routers[5]:cfg(configs.cfg_2)
+---
+...
+lua_gc.internal.bg_fiber ~= nil
+---
+- true
+...
+routers[7]:cfg(configs.cfg_2)
+---
+...
+lua_gc.internal.bg_fiber ~= nil
+---
+- true
+...
+vshard.router.internal.collect_lua_garbage_cnt == 2
+---
+- true
+...
+package.loaded['vshard.router'] = nil
+---
+...
+vshard.router = require('vshard.router')
+---
+...
+vshard.router.internal.collect_lua_garbage_cnt == 2
+---
+- true
+...
+configs.cfg_2.collect_lua_garbage = nil
+---
+...
+routers[5]:cfg(configs.cfg_2)
+---
+...
+lua_gc.internal.bg_fiber ~= nil
+---
+- true
+...
+routers[7]:cfg(configs.cfg_2)
+---
+...
+vshard.router.internal.collect_lua_garbage_cnt == 0
+---
+- true
+...
+lua_gc.internal.bg_fiber == nil
+---
+- true
+...
+_ = test_run:cmd("switch default")
+---
+...
+test_run:cmd("stop server router_1")
+---
+- true
+...
+test_run:cmd("cleanup server router_1")
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_1_1)
+---
+...
+test_run:drop_cluster(REPLICASET_1_2)
+---
+...
+test_run:drop_cluster(REPLICASET_2_1)
+---
+...
+test_run:drop_cluster(REPLICASET_2_2)
+---
+...
diff --git a/test/multiple_routers/multiple_routers.test.lua
b/test/multiple_routers/multiple_routers.test.lua
new file mode 100644
index 0000000..ec3c7f7
--- /dev/null
+++ b/test/multiple_routers/multiple_routers.test.lua
@@ -0,0 +1,109 @@
+test_run = require('test_run').new()
+
+REPLICASET_1_1 = { 'storage_1_1_a', 'storage_1_1_b' }
+REPLICASET_1_2 = { 'storage_1_2_a', 'storage_1_2_b' }
+REPLICASET_2_1 = { 'storage_2_1_a', 'storage_2_1_b' }
+REPLICASET_2_2 = { 'storage_2_2_a', 'storage_2_2_b' }
+
+test_run:create_cluster(REPLICASET_1_1, 'multiple_routers')
+test_run:create_cluster(REPLICASET_1_2, 'multiple_routers')
+test_run:create_cluster(REPLICASET_2_1, 'multiple_routers')
+test_run:create_cluster(REPLICASET_2_2, 'multiple_routers')
+util = require('lua_libs.util')
+util.wait_master(test_run, REPLICASET_1_1, 'storage_1_1_a')
+util.wait_master(test_run, REPLICASET_1_2, 'storage_1_2_a')
+util.wait_master(test_run, REPLICASET_2_1, 'storage_2_1_a')
+util.wait_master(test_run, REPLICASET_2_2, 'storage_2_2_a')
+
+test_run:cmd("create server router_1 with
script='multiple_routers/router_1.lua'")
+test_run:cmd("start server router_1")
+
+-- Configure default (static) router.
+_ = test_run:cmd("switch router_1")
+vshard.router.cfg(configs.cfg_1)
+vshard.router.bootstrap()
+_ = test_run:cmd("switch storage_1_2_a")
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+_ = test_run:cmd("switch router_1")
+
+vshard.router.call(1, 'write', 'do_replace', {{1, 1}})
+vshard.router.call(1, 'read', 'do_select', {1})
+
+-- Test that static router is just a router object under the hood.
+static_router = vshard.router.internal.static_router
+static_router:route(1) == vshard.router.route(1)
+
+-- Configure extra router.
+router_2 = vshard.router.new('router_2', configs.cfg_2)
+router_2:bootstrap()
+_ = test_run:cmd("switch storage_2_2_a")
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+_ = test_run:cmd("switch router_1")
+
+router_2:call(1, 'write', 'do_replace', {{2, 2}})
+router_2:call(1, 'read', 'do_select', {2})
+-- Check that router_2 and static router serves different clusters.
+#router_2:call(1, 'read', 'do_select', {1}) == 0
+
+-- Create several routers to the same cluster.
+routers = {}
+for i = 3, 10 do routers[i] = vshard.router.new('router_' .. i,
configs.cfg_2) end
+routers[3]:call(1, 'read', 'do_select', {2})
+-- Check that they have their own background fibers.
+fiber_names = {}
+for i = 2, 10 do fiber_names['vshard.failover.router_' .. i] = true;
fiber_names['vshard.discovery.router_' .. i] = true; end
+next(fiber_names) ~= nil
+fiber = require('fiber')
+for _, xfiber in pairs(fiber.info()) do fiber_names[xfiber.name] = nil end
+next(fiber_names) == nil
+
+-- Reconfigure one of routers do not affect the others.
+routers[3]:cfg(configs.cfg_1)
+routers[3]:call(1, 'read', 'do_select', {1})
+#routers[3]:call(1, 'read', 'do_select', {2}) == 0
+#routers[4]:call(1, 'read', 'do_select', {1}) == 0
+routers[4]:call(1, 'read', 'do_select', {2})
+routers[3]:cfg(configs.cfg_2)
+
+-- Try to create router with the same name.
+util = require('lua_libs.util')
+util.check_error(vshard.router.new, 'router_2', configs.cfg_2)
+
+-- Reload router module.
+_, old_rs_1 = next(vshard.router.internal.static_router.replicasets)
+_, old_rs_2 = next(router_2.replicasets)
+package.loaded['vshard.router'] = nil
+vshard.router = require('vshard.router')
+while not old_rs_1.is_outdated do fiber.sleep(0.01) end
+while not old_rs_2.is_outdated do fiber.sleep(0.01) end
+vshard.router.call(1, 'read', 'do_select', {1})
+router_2:call(1, 'read', 'do_select', {2})
+routers[5]:call(1, 'read', 'do_select', {2})
+
+-- Check lua_gc counter.
+lua_gc = require('vshard.lua_gc')
+vshard.router.internal.collect_lua_garbage_cnt == 0
+lua_gc.internal.bg_fiber == nil
+configs.cfg_2.collect_lua_garbage = true
+routers[5]:cfg(configs.cfg_2)
+lua_gc.internal.bg_fiber ~= nil
+routers[7]:cfg(configs.cfg_2)
+lua_gc.internal.bg_fiber ~= nil
+vshard.router.internal.collect_lua_garbage_cnt == 2
+package.loaded['vshard.router'] = nil
+vshard.router = require('vshard.router')
+vshard.router.internal.collect_lua_garbage_cnt == 2
+configs.cfg_2.collect_lua_garbage = nil
+routers[5]:cfg(configs.cfg_2)
+lua_gc.internal.bg_fiber ~= nil
+routers[7]:cfg(configs.cfg_2)
+vshard.router.internal.collect_lua_garbage_cnt == 0
+lua_gc.internal.bg_fiber == nil
+
+_ = test_run:cmd("switch default")
+test_run:cmd("stop server router_1")
+test_run:cmd("cleanup server router_1")
+test_run:drop_cluster(REPLICASET_1_1)
+test_run:drop_cluster(REPLICASET_1_2)
+test_run:drop_cluster(REPLICASET_2_1)
+test_run:drop_cluster(REPLICASET_2_2)
diff --git a/test/multiple_routers/router_1.lua
b/test/multiple_routers/router_1.lua
new file mode 100644
index 0000000..2e9ea91
--- /dev/null
+++ b/test/multiple_routers/router_1.lua
@@ -0,0 +1,15 @@
+#!/usr/bin/env tarantool
+
+require('strict').on()
+
+-- Get instance name
+local fio = require('fio')
+local NAME = fio.basename(arg[0], '.lua')
+
+require('console').listen(os.getenv('ADMIN'))
+
+configs = require('configs')
+
+-- Start the database with sharding
+vshard = require('vshard')
+box.cfg{}
diff --git a/test/multiple_routers/storage_1_1_a.lua
b/test/multiple_routers/storage_1_1_a.lua
new file mode 100644
index 0000000..b44a97a
--- /dev/null
+++ b/test/multiple_routers/storage_1_1_a.lua
@@ -0,0 +1,23 @@
+#!/usr/bin/env tarantool
+
+require('strict').on()
+
+-- Get instance name.
+local fio = require('fio')
+NAME = fio.basename(arg[0], '.lua')
+
+require('console').listen(os.getenv('ADMIN'))
+
+-- Fetch config for the cluster of the instance.
+if NAME:sub(9,9) == '1' then
+ cfg = require('configs').cfg_1
+else
+ cfg = require('configs').cfg_2
+end
+
+-- Start the database with sharding.
+vshard = require('vshard')
+vshard.storage.cfg(cfg, names[NAME])
+
+-- Bootstrap storage.
+require('lua_libs.bootstrap')
diff --git a/test/multiple_routers/storage_1_1_b.lua
b/test/multiple_routers/storage_1_1_b.lua
new file mode 120000
index 0000000..76d196b
--- /dev/null
+++ b/test/multiple_routers/storage_1_1_b.lua
@@ -0,0 +1 @@
+storage_1_1_a.lua
\ No newline at end of file
diff --git a/test/multiple_routers/storage_1_2_a.lua
b/test/multiple_routers/storage_1_2_a.lua
new file mode 120000
index 0000000..76d196b
--- /dev/null
+++ b/test/multiple_routers/storage_1_2_a.lua
@@ -0,0 +1 @@
+storage_1_1_a.lua
\ No newline at end of file
diff --git a/test/multiple_routers/storage_1_2_b.lua
b/test/multiple_routers/storage_1_2_b.lua
new file mode 120000
index 0000000..76d196b
--- /dev/null
+++ b/test/multiple_routers/storage_1_2_b.lua
@@ -0,0 +1 @@
+storage_1_1_a.lua
\ No newline at end of file
diff --git a/test/multiple_routers/storage_2_1_a.lua
b/test/multiple_routers/storage_2_1_a.lua
new file mode 120000
index 0000000..76d196b
--- /dev/null
+++ b/test/multiple_routers/storage_2_1_a.lua
@@ -0,0 +1 @@
+storage_1_1_a.lua
\ No newline at end of file
diff --git a/test/multiple_routers/storage_2_1_b.lua
b/test/multiple_routers/storage_2_1_b.lua
new file mode 120000
index 0000000..76d196b
--- /dev/null
+++ b/test/multiple_routers/storage_2_1_b.lua
@@ -0,0 +1 @@
+storage_1_1_a.lua
\ No newline at end of file
diff --git a/test/multiple_routers/storage_2_2_a.lua
b/test/multiple_routers/storage_2_2_a.lua
new file mode 120000
index 0000000..76d196b
--- /dev/null
+++ b/test/multiple_routers/storage_2_2_a.lua
@@ -0,0 +1 @@
+storage_1_1_a.lua
\ No newline at end of file
diff --git a/test/multiple_routers/storage_2_2_b.lua
b/test/multiple_routers/storage_2_2_b.lua
new file mode 120000
index 0000000..76d196b
--- /dev/null
+++ b/test/multiple_routers/storage_2_2_b.lua
@@ -0,0 +1 @@
+storage_1_1_a.lua
\ No newline at end of file
diff --git a/test/multiple_routers/suite.ini
b/test/multiple_routers/suite.ini
new file mode 100644
index 0000000..d2d4470
--- /dev/null
+++ b/test/multiple_routers/suite.ini
@@ -0,0 +1,6 @@
+[default]
+core = tarantool
+description = Multiple routers tests
+script = test.lua
+is_parallel = False
+lua_libs = ../lua_libs configs.lua
diff --git a/test/multiple_routers/test.lua b/test/multiple_routers/test.lua
new file mode 100644
index 0000000..cb7c1ee
--- /dev/null
+++ b/test/multiple_routers/test.lua
@@ -0,0 +1,9 @@
+#!/usr/bin/env tarantool
+
+require('strict').on()
+
+box.cfg{
+ listen = os.getenv("LISTEN"),
+}
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/router/exponential_timeout.result
b/test/router/exponential_timeout.result
index fb54d0f..6748b64 100644
--- a/test/router/exponential_timeout.result
+++ b/test/router/exponential_timeout.result
@@ -37,10 +37,10 @@ test_run:cmd('switch router_1')
util = require('util')
---
...
-rs1 = vshard.router.internal.replicasets[replicasets[1]]
+rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]
---
...
-rs2 = vshard.router.internal.replicasets[replicasets[2]]
+rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]
---
...
util.collect_timeouts(rs1)
diff --git a/test/router/exponential_timeout.test.lua
b/test/router/exponential_timeout.test.lua
index 3ec0b8c..75d85bf 100644
--- a/test/router/exponential_timeout.test.lua
+++ b/test/router/exponential_timeout.test.lua
@@ -13,8 +13,8 @@ test_run:cmd("start server router_1")
test_run:cmd('switch router_1')
util = require('util')
-rs1 = vshard.router.internal.replicasets[replicasets[1]]
-rs2 = vshard.router.internal.replicasets[replicasets[2]]
+rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]
+rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]
util.collect_timeouts(rs1)
util.collect_timeouts(rs2)
diff --git a/test/router/reconnect_to_master.result
b/test/router/reconnect_to_master.result
index 5e678ce..d502723 100644
--- a/test/router/reconnect_to_master.result
+++ b/test/router/reconnect_to_master.result
@@ -76,7 +76,7 @@ _ = test_run:cmd('stop server storage_1_a')
_ = test_run:switch('router_1')
---
...
-reps = vshard.router.internal.replicasets
+reps = vshard.router.internal.static_router.replicasets
---
...
test_run:cmd("setopt delimiter ';'")
@@ -95,7 +95,7 @@ end;
...
function count_known_buckets()
local known_buckets = 0
- for _, id in pairs(vshard.router.internal.route_map) do
+ for _, id in pairs(vshard.router.internal.static_router.route_map) do
known_buckets = known_buckets + 1
end
return known_buckets
@@ -127,7 +127,7 @@ is_disconnected()
fiber = require('fiber')
---
...
-while vshard.router.internal.replicasets[replicasets[1]].replica == nil
do fiber.sleep(0.1) end
+while
vshard.router.internal.static_router.replicasets[replicasets[1]].replica
== nil do fiber.sleep(0.1) end
---
...
vshard.router.info()
diff --git a/test/router/reconnect_to_master.test.lua
b/test/router/reconnect_to_master.test.lua
index 39ba90e..8820fa7 100644
--- a/test/router/reconnect_to_master.test.lua
+++ b/test/router/reconnect_to_master.test.lua
@@ -34,7 +34,7 @@ _ = test_run:cmd('stop server storage_1_a')
_ = test_run:switch('router_1')
-reps = vshard.router.internal.replicasets
+reps = vshard.router.internal.static_router.replicasets
test_run:cmd("setopt delimiter ';'")
function is_disconnected()
for i, rep in pairs(reps) do
@@ -46,7 +46,7 @@ function is_disconnected()
end;
function count_known_buckets()
local known_buckets = 0
- for _, id in pairs(vshard.router.internal.route_map) do
+ for _, id in pairs(vshard.router.internal.static_router.route_map) do
known_buckets = known_buckets + 1
end
return known_buckets
@@ -63,7 +63,7 @@ is_disconnected()
-- Wait until replica is connected to test alerts on unavailable
-- master.
fiber = require('fiber')
-while vshard.router.internal.replicasets[replicasets[1]].replica == nil
do fiber.sleep(0.1) end
+while
vshard.router.internal.static_router.replicasets[replicasets[1]].replica
== nil do fiber.sleep(0.1) end
vshard.router.info()
-- Return master.
diff --git a/test/router/reload.result b/test/router/reload.result
index f0badc3..98e8e71 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -229,7 +229,7 @@ vshard.router.cfg(cfg)
cfg.connection_outdate_delay = old_connection_delay
---
...
-vshard.router.internal.connection_outdate_delay = nil
+vshard.router.internal.static_router.connection_outdate_delay = nil
---
...
rs_new = vshard.router.route(1)
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 528222a..293cb26 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -104,7 +104,7 @@ old_connection_delay = cfg.connection_outdate_delay
cfg.connection_outdate_delay = 0.3
vshard.router.cfg(cfg)
cfg.connection_outdate_delay = old_connection_delay
-vshard.router.internal.connection_outdate_delay = nil
+vshard.router.internal.static_router.connection_outdate_delay = nil
rs_new = vshard.router.route(1)
rs_old = rs
_, replica_old = next(rs_old.replicas)
diff --git a/test/router/reroute_wrong_bucket.result b/test/router/reroute_wrong_bucket.result
index 7f2a494..989dc79 100644
--- a/test/router/reroute_wrong_bucket.result
+++ b/test/router/reroute_wrong_bucket.result
@@ -98,7 +98,7 @@ vshard.router.call(100, 'read', 'customer_lookup', {1}, {timeout = 100})
---
- {'accounts': [], 'customer_id': 1, 'name': 'name'}
...
-vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]]
+vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]
---
...
vshard.router.call(100, 'write', 'customer_add', {{customer_id = 2, bucket_id = 100, name = 'name2', accounts = {}}}, {timeout = 100})
@@ -146,13 +146,13 @@ test_run:switch('router_1')
...
-- Emulate a situation, when a replicaset_2 while is unknown for
-- router, but is already known for storages.
-save_rs2 = vshard.router.internal.replicasets[replicasets[2]]
+save_rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]
---
...
-vshard.router.internal.replicasets[replicasets[2]] = nil
+vshard.router.internal.static_router.replicasets[replicasets[2]] = nil
---
...
-vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]]
+vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]
---
...
fiber = require('fiber')
@@ -207,7 +207,7 @@ err
require('log').info(string.rep('a', 1000))
---
...
-vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]]
+vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]
---
...
call_retval = nil
@@ -219,7 +219,7 @@ f = fiber.create(do_call, 100)
while not test_run:grep_log('router_1', 'please update configuration', 1000) do fiber.sleep(0.1) end
---
...
-vshard.router.internal.replicasets[replicasets[2]] = save_rs2
+vshard.router.internal.static_router.replicasets[replicasets[2]] = save_rs2
---
...
while not call_retval do fiber.sleep(0.1) end
diff --git a/test/router/reroute_wrong_bucket.test.lua b/test/router/reroute_wrong_bucket.test.lua
index 03384d1..a00f941 100644
--- a/test/router/reroute_wrong_bucket.test.lua
+++ b/test/router/reroute_wrong_bucket.test.lua
@@ -35,7 +35,7 @@ customer_add({customer_id = 1, bucket_id = 100, name = 'name', accounts = {}})
test_run:switch('router_1')
vshard.router.call(100, 'read', 'customer_lookup', {1}, {timeout = 100})
-vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]]
+vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]
vshard.router.call(100, 'write', 'customer_add', {{customer_id = 2, bucket_id = 100, name = 'name2', accounts = {}}}, {timeout = 100})
-- Create cycle.
@@ -55,9 +55,9 @@ box.space._bucket:replace({100, vshard.consts.BUCKET.SENT, replicasets[2]})
test_run:switch('router_1')
-- Emulate a situation, when a replicaset_2 while is unknown for
-- router, but is already known for storages.
-save_rs2 = vshard.router.internal.replicasets[replicasets[2]]
-vshard.router.internal.replicasets[replicasets[2]] = nil
-vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]]
+save_rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]
+vshard.router.internal.static_router.replicasets[replicasets[2]] = nil
+vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]
fiber = require('fiber')
call_retval = nil
@@ -84,11 +84,11 @@ err
-- detect it and end with ok.
--
require('log').info(string.rep('a', 1000))
-vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]]
+vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]]
call_retval = nil
f = fiber.create(do_call, 100)
while not test_run:grep_log('router_1', 'please update configuration', 1000) do fiber.sleep(0.1) end
-vshard.router.internal.replicasets[replicasets[2]] = save_rs2
+vshard.router.internal.static_router.replicasets[replicasets[2]] = save_rs2
while not call_retval do fiber.sleep(0.1) end
call_retval
vshard.router.call(100, 'read', 'customer_lookup', {3}, {timeout = 1})
diff --git a/test/router/retry_reads.result b/test/router/retry_reads.result
index 64b0ff3..b803ae3 100644
--- a/test/router/retry_reads.result
+++ b/test/router/retry_reads.result
@@ -37,7 +37,7 @@ test_run:cmd('switch router_1')
util = require('util')
---
...
-rs1 = vshard.router.internal.replicasets[replicasets[1]]
+rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]
---
...
min_timeout = vshard.consts.CALL_TIMEOUT_MIN
diff --git a/test/router/retry_reads.test.lua b/test/router/retry_reads.test.lua
index 2fb2fc7..510e961 100644
--- a/test/router/retry_reads.test.lua
+++ b/test/router/retry_reads.test.lua
@@ -13,7 +13,7 @@ test_run:cmd("start server router_1")
test_run:cmd('switch router_1')
util = require('util')
-rs1 = vshard.router.internal.replicasets[replicasets[1]]
+rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]
min_timeout = vshard.consts.CALL_TIMEOUT_MIN
--
diff --git a/test/router/router.result b/test/router/router.result
index 45394e1..ceaf672 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -70,10 +70,10 @@ test_run:grep_log('router_1', 'connected to ')
---
- 'connected to '
...
-rs1 = vshard.router.internal.replicasets[replicasets[1]]
+rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]
---
...
-rs2 = vshard.router.internal.replicasets[replicasets[2]]
+rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]
---
...
fiber = require('fiber')
@@ -95,7 +95,7 @@ rs2.replica == rs2.master
-- Part of gh-76: on reconfiguration do not recreate connections
-- to replicas, that are kept in a new configuration.
--
-old_replicasets = vshard.router.internal.replicasets
+old_replicasets = vshard.router.internal.static_router.replicasets
---
...
old_connections = {}
@@ -127,17 +127,17 @@ connection_count == 4
vshard.router.cfg(cfg)
---
...
-new_replicasets = vshard.router.internal.replicasets
+new_replicasets = vshard.router.internal.static_router.replicasets
---
...
old_replicasets ~= new_replicasets
---
- true
...
-rs1 = vshard.router.internal.replicasets[replicasets[1]]
+rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]
---
...
-rs2 = vshard.router.internal.replicasets[replicasets[2]]
+rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]
---
...
while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end
@@ -225,7 +225,7 @@ vshard.router.bootstrap()
--
-- gh-108: negative bucket count on discovery.
--
-vshard.router.internal.route_map = {}
+vshard.router.internal.static_router.route_map = {}
---
...
rets = {}
@@ -456,7 +456,7 @@ conn.state
rs_uuid = '<replicaset_2>'
---
...
-rs = vshard.router.internal.replicasets[rs_uuid]
+rs = vshard.router.internal.static_router.replicasets[rs_uuid]
---
...
master = rs.master
@@ -605,7 +605,7 @@ vshard.router.info()
...
-- Remove replica and master connections to trigger alert
-- UNREACHABLE_REPLICASET.
-rs = vshard.router.internal.replicasets[replicasets[1]]
+rs = vshard.router.internal.static_router.replicasets[replicasets[1]]
---
...
master_conn = rs.master.conn
@@ -749,7 +749,7 @@ test_run:cmd("setopt delimiter ';'")
...
function calculate_known_buckets()
local known_buckets = 0
- for _, rs in pairs(vshard.router.internal.route_map) do
+ for _, rs in pairs(vshard.router.internal.static_router.route_map) do
known_buckets = known_buckets + 1
end
return known_buckets
@@ -851,10 +851,10 @@ test_run:cmd("setopt delimiter ';'")
- true
...
for i = 1, 100 do
- local rs = vshard.router.internal.route_map[i]
+ local rs = vshard.router.internal.static_router.route_map[i]
assert(rs)
rs.bucket_count = rs.bucket_count - 1
- vshard.router.internal.route_map[i] = nil
+ vshard.router.internal.static_router.route_map[i] = nil
end;
---
...
@@ -999,7 +999,7 @@ vshard.router.sync(100500)
-- object method like this: object.method() instead of
-- object:method(), an appropriate help-error returns.
--
-_, replicaset = next(vshard.router.internal.replicasets)
+_, replicaset = next(vshard.router.internal.static_router.replicasets)
---
...
error_messages = {}
@@ -1069,7 +1069,7 @@ test_run:cmd("setopt delimiter ';'")
---
- true
...
-for bucket, rs in pairs(vshard.router.internal.route_map) do
+for bucket, rs in pairs(vshard.router.internal.static_router.route_map) do
bucket_to_old_rs[bucket] = rs
bucket_cnt = bucket_cnt + 1
end;
@@ -1084,7 +1084,7 @@ vshard.router.cfg(cfg);
...
for bucket, old_rs in pairs(bucket_to_old_rs) do
local old_uuid = old_rs.uuid
- local rs = vshard.router.internal.route_map[bucket]
+ local rs = vshard.router.internal.static_router.route_map[bucket]
if not rs or not old_uuid == rs.uuid then
error("Bucket lost during reconfigure.")
end
@@ -1111,7 +1111,7 @@ end;
vshard.router.cfg(cfg);
---
...
-vshard.router.internal.route_map = {};
+vshard.router.internal.static_router.route_map = {};
---
...
vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false;
@@ -1119,7 +1119,7 @@ vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false;
...
-- Do discovery iteration. Upload buckets from the
-- first replicaset.
-while not next(vshard.router.internal.route_map) do
+while not next(vshard.router.internal.static_router.route_map) do
vshard.router.discovery_wakeup()
fiber.sleep(0.01)
end;
@@ -1128,12 +1128,12 @@ end;
new_replicasets = {};
---
...
-for _, rs in pairs(vshard.router.internal.replicasets) do
+for _, rs in pairs(vshard.router.internal.static_router.replicasets) do
new_replicasets[rs] = true
end;
---
...
-_, rs = next(vshard.router.internal.route_map);
+_, rs = next(vshard.router.internal.static_router.route_map);
---
...
new_replicasets[rs] == true;
@@ -1185,6 +1185,17 @@ vshard.router.route(1):callro('echo', {'some_data'})
- null
- null
...
+-- Multiple routers: check that static router can be used as an
+-- object.
+static_router = vshard.router.internal.static_router
+---
+...
+static_router:route(1):callro('echo', {'some_data'})
+---
+- some_data
+- null
+- null
+...
_ = test_run:cmd("switch default")
---
...
diff --git a/test/router/router.test.lua b/test/router/router.test.lua
index df2f381..d7588f7 100644
--- a/test/router/router.test.lua
+++ b/test/router/router.test.lua
@@ -27,8 +27,8 @@ util = require('util')
-- gh-24: log all connnect/disconnect events.
test_run:grep_log('router_1', 'connected to ')
-rs1 = vshard.router.internal.replicasets[replicasets[1]]
-rs2 = vshard.router.internal.replicasets[replicasets[2]]
+rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]
+rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]
fiber = require('fiber')
while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end
-- With no zones the nearest server is master.
@@ -39,7 +39,7 @@ rs2.replica == rs2.master
-- Part of gh-76: on reconfiguration do not recreate connections
-- to replicas, that are kept in a new configuration.
--
-old_replicasets = vshard.router.internal.replicasets
+old_replicasets = vshard.router.internal.static_router.replicasets
old_connections = {}
connection_count = 0
test_run:cmd("setopt delimiter ';'")
@@ -52,10 +52,10 @@ end;
test_run:cmd("setopt delimiter ''");
connection_count == 4
vshard.router.cfg(cfg)
-new_replicasets = vshard.router.internal.replicasets
+new_replicasets = vshard.router.internal.static_router.replicasets
old_replicasets ~= new_replicasets
-rs1 = vshard.router.internal.replicasets[replicasets[1]]
-rs2 = vshard.router.internal.replicasets[replicasets[2]]
+rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]]
+rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]]
while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end
vshard.router.discovery_wakeup()
-- Check that netbox connections are the same.
@@ -91,7 +91,7 @@ vshard.router.bootstrap()
--
-- gh-108: negative bucket count on discovery.
--
-vshard.router.internal.route_map = {}
+vshard.router.internal.static_router.route_map = {}
rets = {}
function do_echo() table.insert(rets, vshard.router.callro(1, 'echo', {1})) end
f1 = fiber.create(do_echo) f2 = fiber.create(do_echo)
@@ -153,7 +153,7 @@ conn = vshard.router.route(1).master.conn
conn.state
-- Test missing master.
rs_uuid = 'ac522f65-aa94-4134-9f64-51ee384f1a54'
-rs = vshard.router.internal.replicasets[rs_uuid]
+rs = vshard.router.internal.static_router.replicasets[rs_uuid]
master = rs.master
rs.master = nil
vshard.router.route(1).master
@@ -223,7 +223,7 @@ vshard.router.info()
-- Remove replica and master connections to trigger alert
-- UNREACHABLE_REPLICASET.
-rs = vshard.router.internal.replicasets[replicasets[1]]
+rs = vshard.router.internal.static_router.replicasets[replicasets[1]]
master_conn = rs.master.conn
replica_conn = rs.replica.conn
rs.master.conn = nil
@@ -261,7 +261,7 @@ util.check_error(vshard.router.buckets_info, 123, '456')
test_run:cmd("setopt delimiter ';'")
function calculate_known_buckets()
local known_buckets = 0
- for _, rs in pairs(vshard.router.internal.route_map) do
+ for _, rs in pairs(vshard.router.internal.static_router.route_map) do
known_buckets = known_buckets + 1
end
return known_buckets
@@ -301,10 +301,10 @@ test_run:switch('router_1')
--
test_run:cmd("setopt delimiter ';'")
for i = 1, 100 do
- local rs = vshard.router.internal.route_map[i]
+ local rs = vshard.router.internal.static_router.route_map[i]
assert(rs)
rs.bucket_count = rs.bucket_count - 1
- vshard.router.internal.route_map[i] = nil
+ vshard.router.internal.static_router.route_map[i] = nil
end;
test_run:cmd("setopt delimiter ''");
calculate_known_buckets()
@@ -367,7 +367,7 @@ vshard.router.sync(100500)
-- object method like this: object.method() instead of
-- object:method(), an appropriate help-error returns.
--
-_, replicaset = next(vshard.router.internal.replicasets)
+_, replicaset = next(vshard.router.internal.static_router.replicasets)
error_messages = {}
test_run:cmd("setopt delimiter ';'")
@@ -395,7 +395,7 @@ error_messages
bucket_to_old_rs = {}
bucket_cnt = 0
test_run:cmd("setopt delimiter ';'")
-for bucket, rs in pairs(vshard.router.internal.route_map) do
+for bucket, rs in pairs(vshard.router.internal.static_router.route_map) do
bucket_to_old_rs[bucket] = rs
bucket_cnt = bucket_cnt + 1
end;
@@ -403,7 +403,7 @@ bucket_cnt;
vshard.router.cfg(cfg);
for bucket, old_rs in pairs(bucket_to_old_rs) do
local old_uuid = old_rs.uuid
- local rs = vshard.router.internal.route_map[bucket]
+ local rs = vshard.router.internal.static_router.route_map[bucket]
if not rs or not old_uuid == rs.uuid then
error("Bucket lost during reconfigure.")
end
@@ -423,19 +423,19 @@ while vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY ~= 'waiting' do
fiber.sleep(0.02)
end;
vshard.router.cfg(cfg);
-vshard.router.internal.route_map = {};
+vshard.router.internal.static_router.route_map = {};
vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false;
-- Do discovery iteration. Upload buckets from the
-- first replicaset.
-while not next(vshard.router.internal.route_map) do
+while not next(vshard.router.internal.static_router.route_map) do
vshard.router.discovery_wakeup()
fiber.sleep(0.01)
end;
new_replicasets = {};
-for _, rs in pairs(vshard.router.internal.replicasets) do
+for _, rs in pairs(vshard.router.internal.static_router.replicasets) do
new_replicasets[rs] = true
end;
-_, rs = next(vshard.router.internal.route_map);
+_, rs = next(vshard.router.internal.static_router.route_map);
new_replicasets[rs] == true;
test_run:cmd("setopt delimiter ''");
@@ -453,6 +453,11 @@ vshard.router.internal.errinj.ERRINJ_CFG = false
util.has_same_fields(old_internal, vshard.router.internal)
vshard.router.route(1):callro('echo', {'some_data'})
+-- Multiple routers: check that static router can be used as an
+-- object.
+static_router = vshard.router.internal.static_router
+static_router:route(1):callro('echo', {'some_data'})
+
_ = test_run:cmd("switch default")
test_run:drop_cluster(REPLICASET_2)
diff --git a/vshard/error.lua b/vshard/error.lua
index f79107b..da92b58 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -105,7 +105,12 @@ local error_message_template = {
name = 'OBJECT_IS_OUTDATED',
msg = 'Object is outdated after module reload/reconfigure. ' ..
'Use new instance.'
- }
+ },
+ [21] = {
+ name = 'ROUTER_ALREADY_EXISTS',
+ msg = 'Router with name %s already exists',
+ args = {'name'},
+ },
}
--
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 59c25a0..b31f7dc 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -25,14 +25,33 @@ local M = rawget(_G, MODULE_INTERNALS)
if not M then
M = {
---------------- Common module attributes ----------------
- -- The last passed configuration.
- current_cfg = nil,
errinj = {
ERRINJ_CFG = false,
ERRINJ_FAILOVER_CHANGE_CFG = false,
ERRINJ_RELOAD = false,
ERRINJ_LONG_DISCOVERY = false,
},
+ -- Dictionary, key is router name, value is a router.
+ routers = {},
+ -- Router object which can be accessed by old api:
+ -- e.g. vshard.router.call(...)
+ static_router = nil,
+ -- This counter is used to restart background fibers with
+ -- new reloaded code.
+ module_version = 0,
+ -- Number of router which require collecting lua garbage.
+ collect_lua_garbage_cnt = 0,
+ }
+end
+
+--
+-- Router object attributes.
+--
+local ROUTER_TEMPLATE = {
+ -- Name of router.
+ name = nil,
+ -- The last passed configuration.
+ current_cfg = nil,
-- Time to outdate old objects on reload.
connection_outdate_delay = nil,
-- Bucket map cache.
@@ -47,38 +66,60 @@ if not M then
total_bucket_count = 0,
-- Boolean lua_gc state (create periodic gc task).
collect_lua_garbage = nil,
- -- This counter is used to restart background fibers with
- -- new reloaded code.
- module_version = 0,
- }
-end
+}
+
+local STATIC_ROUTER_NAME = '_static_router'
-- Set a bucket to a replicaset.
-local function bucket_set(bucket_id, rs_uuid)
- local replicaset = M.replicasets[rs_uuid]
+local function bucket_set(router, bucket_id, rs_uuid)
+ local replicaset = router.replicasets[rs_uuid]
-- It is technically possible to delete a replicaset at the
-- same time when route to the bucket is discovered.
if not replicaset then
return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET,
bucket_id)
end
- local old_replicaset = M.route_map[bucket_id]
+ local old_replicaset = router.route_map[bucket_id]
if old_replicaset ~= replicaset then
if old_replicaset then
old_replicaset.bucket_count = old_replicaset.bucket_count - 1
end
replicaset.bucket_count = replicaset.bucket_count + 1
end
- M.route_map[bucket_id] = replicaset
+ router.route_map[bucket_id] = replicaset
return replicaset
end
-- Remove a bucket from the cache.
-local function bucket_reset(bucket_id)
- local replicaset = M.route_map[bucket_id]
+local function bucket_reset(router, bucket_id)
+ local replicaset = router.route_map[bucket_id]
if replicaset then
replicaset.bucket_count = replicaset.bucket_count - 1
end
- M.route_map[bucket_id] = nil
+ router.route_map[bucket_id] = nil
+end
+
+--------------------------------------------------------------------------------
+-- Helpers
+--------------------------------------------------------------------------------
+
+--
+-- Increase/decrease number of routers which require to collect
+-- a lua garbage and change state of the `lua_gc` fiber.
+--
+
+local function lua_gc_cnt_inc()
+ M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt + 1
+ if M.collect_lua_garbage_cnt == 1 then
+ lua_gc.set_state(true, consts.COLLECT_LUA_GARBAGE_INTERVAL)
+ end
+end
+
+local function lua_gc_cnt_dec()
+ M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt - 1
+ assert(M.collect_lua_garbage_cnt >= 0)
+ if M.collect_lua_garbage_cnt == 0 then
+ lua_gc.set_state(false, consts.COLLECT_LUA_GARBAGE_INTERVAL)
+ end
end
--------------------------------------------------------------------------------
@@ -86,8 +127,8 @@ end
--------------------------------------------------------------------------------
-- Search bucket in whole cluster
-local function bucket_discovery(bucket_id)
- local replicaset = M.route_map[bucket_id]
+local function bucket_discovery(router, bucket_id)
+ local replicaset = router.route_map[bucket_id]
if replicaset ~= nil then
return replicaset
end
@@ -95,11 +136,11 @@ local function bucket_discovery(bucket_id)
log.verbose("Discovering bucket %d", bucket_id)
local last_err = nil
local unreachable_uuid = nil
- for uuid, replicaset in pairs(M.replicasets) do
+ for uuid, replicaset in pairs(router.replicasets) do
local _, err =
replicaset:callrw('vshard.storage.bucket_stat', {bucket_id})
if err == nil then
- return bucket_set(bucket_id, replicaset.uuid)
+ return bucket_set(router, bucket_id, replicaset.uuid)
elseif err.code ~= lerror.code.WRONG_BUCKET then
last_err = err
unreachable_uuid = uuid
@@ -128,14 +169,14 @@ local function bucket_discovery(bucket_id)
end
-- Resolve bucket id to replicaset uuid
-local function bucket_resolve(bucket_id)
+local function bucket_resolve(router, bucket_id)
local replicaset, err
- local replicaset = M.route_map[bucket_id]
+ local replicaset = router.route_map[bucket_id]
if replicaset ~= nil then
return replicaset
end
-- Replicaset removed from cluster, perform discovery
- replicaset, err = bucket_discovery(bucket_id)
+ replicaset, err = bucket_discovery(router, bucket_id)
if replicaset == nil then
return nil, err
end
@@ -146,14 +187,14 @@ end
-- Background fiber to perform discovery. It periodically scans
-- replicasets one by one and updates route_map.
--
-local function discovery_f()
+local function discovery_f(router)
local module_version = M.module_version
while module_version == M.module_version do
- while not next(M.replicasets) do
+ while not next(router.replicasets) do
lfiber.sleep(consts.DISCOVERY_INTERVAL)
end
- local old_replicasets = M.replicasets
- for rs_uuid, replicaset in pairs(M.replicasets) do
+ local old_replicasets = router.replicasets
+ for rs_uuid, replicaset in pairs(router.replicasets) do
local active_buckets, err =
replicaset:callro('vshard.storage.buckets_discovery', {},
{timeout = 2})
@@ -163,7 +204,7 @@ local function discovery_f()
end
-- Renew replicasets object captured by the for loop
-- in case of reconfigure and reload events.
- if M.replicasets ~= old_replicasets then
+ if router.replicasets ~= old_replicasets then
break
end
if not active_buckets then
@@ -176,11 +217,11 @@ local function discovery_f()
end
replicaset.bucket_count = #active_buckets
for _, bucket_id in pairs(active_buckets) do
- local old_rs = M.route_map[bucket_id]
+ local old_rs = router.route_map[bucket_id]
if old_rs and old_rs ~= replicaset then
old_rs.bucket_count = old_rs.bucket_count - 1
end
- M.route_map[bucket_id] = replicaset
+ router.route_map[bucket_id] = replicaset
end
end
lfiber.sleep(consts.DISCOVERY_INTERVAL)
@@ -191,9 +232,9 @@ end
--
-- Immediately wakeup discovery fiber if exists.
--
-local function discovery_wakeup()
- if M.discovery_fiber then
- M.discovery_fiber:wakeup()
+local function discovery_wakeup(router)
+ if router.discovery_fiber then
+ router.discovery_fiber:wakeup()
end
end
@@ -205,7 +246,7 @@ end
-- Function will restart operation after wrong bucket response until timeout
-- is reached
--
-local function router_call(bucket_id, mode, func, args, opts)
+local function router_call(router, bucket_id, mode, func, args, opts)
if opts and (type(opts) ~= 'table' or
(opts.timeout and type(opts.timeout) ~= 'number')) then
error('Usage: call(bucket_id, mode, func, args, opts)')
@@ -213,7 +254,7 @@ local function router_call(bucket_id, mode, func, args, opts)
local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
local replicaset, err
local tend = lfiber.time() + timeout
- if bucket_id > M.total_bucket_count or bucket_id <= 0 then
+ if bucket_id > router.total_bucket_count or bucket_id <= 0 then
error('Bucket is unreachable: bucket id is out of range')
end
local call
@@ -223,7 +264,7 @@ local function router_call(bucket_id, mode, func, args, opts)
call = 'callrw'
end
repeat
- replicaset, err = bucket_resolve(bucket_id)
+ replicaset, err = bucket_resolve(router, bucket_id)
if replicaset then
::replicaset_is_found::
local storage_call_status, call_status, call_error =
@@ -239,9 +280,9 @@ local function router_call(bucket_id, mode, func, args, opts)
end
err = call_status
if err.code == lerror.code.WRONG_BUCKET then
- bucket_reset(bucket_id)
+ bucket_reset(router, bucket_id)
if err.destination then
- replicaset = M.replicasets[err.destination]
+ replicaset = router.replicasets[err.destination]
if not replicaset then
log.warn('Replicaset "%s" was not found, but received'..
' from storage as destination - please '..
@@ -253,13 +294,14 @@ local function router_call(bucket_id, mode, func, args, opts)
-- but already is executed on storages.
while lfiber.time() <= tend do
lfiber.sleep(0.05)
- replicaset = M.replicasets[err.destination]
+ replicaset = router.replicasets[err.destination]
if replicaset then
goto replicaset_is_found
end
end
else
- replicaset = bucket_set(bucket_id, replicaset.uuid)
+ replicaset = bucket_set(router, bucket_id,
+ replicaset.uuid)
lfiber.yield()
-- Protect against infinite cycle in a
-- case of broken cluster, when a bucket
@@ -276,7 +318,7 @@ local function router_call(bucket_id, mode, func, args, opts)
-- is not timeout - these requests are repeated in
-- any case on client, if error.
assert(mode == 'write')
- bucket_reset(bucket_id)
+ bucket_reset(router, bucket_id)
return nil, err
elseif err.code == lerror.code.NON_MASTER then
-- Same, as above - do not wait and repeat.
@@ -302,12 +344,12 @@ end
--
-- Wrappers for router_call with preset mode.
--
-local function router_callro(bucket_id, ...)
- return router_call(bucket_id, 'read', ...)
+local function router_callro(router, bucket_id, ...)
+ return router_call(router, bucket_id, 'read', ...)
end
-local function router_callrw(bucket_id, ...)
- return router_call(bucket_id, 'write', ...)
+local function router_callrw(router, bucket_id, ...)
+ return router_call(router, bucket_id, 'write', ...)
end
--
@@ -315,27 +357,27 @@ end
-- @param bucket_id Bucket identifier.
-- @retval Netbox connection.
--
-local function router_route(bucket_id)
+local function router_route(router, bucket_id)
if type(bucket_id) ~= 'number' then
error('Usage: router.route(bucket_id)')
end
- return bucket_resolve(bucket_id)
+ return bucket_resolve(router, bucket_id)
end
--
-- Return map of all replicasets.
-- @retval See self.replicasets map.
--
-local function router_routeall()
- return M.replicasets
+local function router_routeall(router)
+ return router.replicasets
end
--------------------------------------------------------------------------------
-- Failover
--------------------------------------------------------------------------------
-local function failover_ping_round()
- for _, replicaset in pairs(M.replicasets) do
+local function failover_ping_round(router)
+ for _, replicaset in pairs(router.replicasets) do
local replica = replicaset.replica
if replica ~= nil and replica.conn ~= nil and
replica.down_ts == nil then
@@ -378,10 +420,10 @@ end
-- Collect UUIDs of replicasets, priority of whose replica
-- connections must be updated.
--
-local function failover_collect_to_update()
+local function failover_collect_to_update(router)
local ts = lfiber.time()
local uuid_to_update = {}
- for uuid, rs in pairs(M.replicasets) do
+ for uuid, rs in pairs(router.replicasets) do
if failover_need_down_priority(rs, ts) or
failover_need_up_priority(rs, ts) then
table.insert(uuid_to_update, uuid)
@@ -396,16 +438,16 @@ end
-- disconnected replicas.
-- @retval true A replica of an replicaset has been changed.
--
-local function failover_step()
- failover_ping_round()
- local uuid_to_update = failover_collect_to_update()
+local function failover_step(router)
+ failover_ping_round(router)
+ local uuid_to_update = failover_collect_to_update(router)
if #uuid_to_update == 0 then
return false
end
local curr_ts = lfiber.time()
local replica_is_changed = false
for _, uuid in pairs(uuid_to_update) do
- local rs = M.replicasets[uuid]
+ local rs = router.replicasets[uuid]
if M.errinj.ERRINJ_FAILOVER_CHANGE_CFG then
rs = nil
M.errinj.ERRINJ_FAILOVER_CHANGE_CFG = false
@@ -447,7 +489,7 @@ end
-- tries to reconnect to the best replica. When the connection is
-- established, it replaces the original replica.
--
-local function failover_f()
+local function failover_f(router)
local module_version = M.module_version
local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT,
consts.FAILOVER_DOWN_TIMEOUT)
@@ -457,7 +499,7 @@ local function failover_f()
local prev_was_ok = false
while module_version == M.module_version do
::continue::
- local ok, replica_is_changed = pcall(failover_step)
+ local ok, replica_is_changed = pcall(failover_step, router)
if not ok then
log.error('Error during failovering: %s',
lerror.make(replica_is_changed))
@@ -484,8 +526,8 @@ end
-- Configuration
--------------------------------------------------------------------------------
-local function router_cfg(cfg, is_reload)
- cfg = lcfg.check(cfg, M.current_cfg)
+local function router_cfg(router, cfg, is_reload)
+ cfg = lcfg.check(cfg, router.current_cfg)
local vshard_cfg, box_cfg = lcfg.split(cfg)
if not M.replicasets then
log.info('Starting router configuration')
@@ -511,41 +553,47 @@ local function router_cfg(cfg, is_reload)
-- Move connections from an old configuration to a new one.
-- It must be done with no yields to prevent usage both of not
-- fully moved old replicasets, and not fully built new ones.
- lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
+ lreplicaset.rebind_replicasets(new_replicasets, router.replicasets)
-- Now the new replicasets are fully built. Can establish
-- connections and yield.
for _, replicaset in pairs(new_replicasets) do
replicaset:connect_all()
end
+ -- Change state of lua GC.
+ if vshard_cfg.collect_lua_garbage and not router.collect_lua_garbage then
+ lua_gc_cnt_inc()
+ elseif not vshard_cfg.collect_lua_garbage and
+ router.collect_lua_garbage then
+ lua_gc_cnt_dec()
+ end
lreplicaset.wait_masters_connect(new_replicasets)
- lreplicaset.outdate_replicasets(M.replicasets,
+ lreplicaset.outdate_replicasets(router.replicasets,
vshard_cfg.connection_outdate_delay)
- M.connection_outdate_delay = vshard_cfg.connection_outdate_delay
- M.total_bucket_count = total_bucket_count
- M.collect_lua_garbage = vshard_cfg.collect_lua_garbage
- M.current_cfg = cfg
- M.replicasets = new_replicasets
- for bucket, rs in pairs(M.route_map) do
- M.route_map[bucket] = M.replicasets[rs.uuid]
- end
- if M.failover_fiber == nil then
- M.failover_fiber = util.reloadable_fiber_create('vshard.failover', M,
- 'failover_f')
+ router.connection_outdate_delay = vshard_cfg.connection_outdate_delay
+ router.total_bucket_count = total_bucket_count
+ router.collect_lua_garbage = vshard_cfg.collect_lua_garbage
+ router.current_cfg = cfg
+ router.replicasets = new_replicasets
+ for bucket, rs in pairs(router.route_map) do
+ router.route_map[bucket] = router.replicasets[rs.uuid]
+ end
+ if router.failover_fiber == nil then
+ router.failover_fiber = util.reloadable_fiber_create(
+ 'vshard.failover.' .. router.name, M, 'failover_f', router)
+ end
+ if router.discovery_fiber == nil then
+ router.discovery_fiber = util.reloadable_fiber_create(
+ 'vshard.discovery.' .. router.name, M, 'discovery_f', router)
end
- if M.discovery_fiber == nil then
- M.discovery_fiber = util.reloadable_fiber_create('vshard.discovery', M,
- 'discovery_f')
- end
- lua_gc.set_state(M.collect_lua_garbage, consts.COLLECT_LUA_GARBAGE_INTERVAL)
end
--------------------------------------------------------------------------------
-- Bootstrap
--------------------------------------------------------------------------------
-local function cluster_bootstrap()
+local function cluster_bootstrap(router)
local replicasets = {}
- for uuid, replicaset in pairs(M.replicasets) do
+ for uuid, replicaset in pairs(router.replicasets) do
table.insert(replicasets, replicaset)
local count, err =
replicaset:callrw('vshard.storage.buckets_count',
{})
@@ -556,9 +604,10 @@ local function cluster_bootstrap()
return nil, lerror.vshard(lerror.code.NON_EMPTY)
end
end
- lreplicaset.calculate_etalon_balance(M.replicasets, M.total_bucket_count)
+ lreplicaset.calculate_etalon_balance(router.replicasets,
+ router.total_bucket_count)
local bucket_id = 1
- for uuid, replicaset in pairs(M.replicasets) do
+ for uuid, replicaset in pairs(router.replicasets) do
if replicaset.etalon_bucket_count > 0 then
local ok, err =
replicaset:callrw('vshard.storage.bucket_force_create',
@@ -614,7 +663,7 @@ local function replicaset_instance_info(replicaset, name, alerts, errcolor,
return info, consts.STATUS.GREEN
end
-local function router_info()
+local function router_info(router)
local state = {
replicasets = {},
bucket = {
@@ -628,7 +677,7 @@ local function router_info()
}
local bucket_info = state.bucket
local known_bucket_count = 0
- for rs_uuid, replicaset in pairs(M.replicasets) do
+ for rs_uuid, replicaset in pairs(router.replicasets) do
-- Replicaset info parameters:
-- * master instance info;
-- * replica instance info;
@@ -716,7 +765,7 @@ local function router_info()
-- If a bucket is unreachable, then replicaset is
-- unreachable too and color already is red.
end
- bucket_info.unknown = M.total_bucket_count - known_bucket_count
+ bucket_info.unknown = router.total_bucket_count - known_bucket_count
if bucket_info.unknown > 0 then
state.status = math.max(state.status, consts.STATUS.YELLOW)
table.insert(state.alerts, lerror.alert(lerror.code.UNKNOWN_BUCKETS,
@@ -733,13 +782,13 @@ end
-- @param limit Maximal bucket count in output.
-- @retval Map of type {bucket_id = 'unknown'/replicaset_uuid}.
--
-local function router_buckets_info(offset, limit)
+local function router_buckets_info(router, offset, limit)
if offset ~= nil and type(offset) ~= 'number' or
limit ~= nil and type(limit) ~= 'number' then
error('Usage: buckets_info(offset, limit)')
end
offset = offset or 0
- limit = limit or M.total_bucket_count
+ limit = limit or router.total_bucket_count
local ret = {}
-- Use one string memory for all unknown buckets.
local available_rw = 'available_rw'
@@ -748,9 +797,9 @@ local function router_buckets_info(offset, limit)
local unreachable = 'unreachable'
-- Collect limit.
local first = math.max(1, offset + 1)
- local last = math.min(offset + limit, M.total_bucket_count)
+ local last = math.min(offset + limit, router.total_bucket_count)
for bucket_id = first, last do
- local rs = M.route_map[bucket_id]
+ local rs = router.route_map[bucket_id]
if rs then
if rs.master and rs.master:is_connected() then
ret[bucket_id] = {uuid = rs.uuid, status = available_rw}
@@ -770,22 +819,22 @@ end
-- Other
--------------------------------------------------------------------------------
-local function router_bucket_id(key)
+local function router_bucket_id(router, key)
if key == nil then
error("Usage: vshard.router.bucket_id(key)")
end
- return lhash.key_hash(key) % M.total_bucket_count + 1
+ return lhash.key_hash(key) % router.total_bucket_count + 1
end
-local function router_bucket_count()
- return M.total_bucket_count
+local function router_bucket_count(router)
+ return router.total_bucket_count
end
-local function router_sync(timeout)
+local function router_sync(router, timeout)
if timeout ~= nil and type(timeout) ~= 'number' then
error('Usage: vshard.router.sync([timeout: number])')
end
- for rs_uuid, replicaset in pairs(M.replicasets) do
+ for rs_uuid, replicaset in pairs(router.replicasets) do
local status, err = replicaset:callrw('vshard.storage.sync', {timeout})
if not status then
-- Add information about replicaset
@@ -799,6 +848,90 @@ if M.errinj.ERRINJ_RELOAD then
error('Error injection: reload')
end
+--------------------------------------------------------------------------------
+-- Managing router instances
+--------------------------------------------------------------------------------
+
+local function cfg_reconfigure(router, cfg)
+ return router_cfg(router, cfg, false)
+end
+
+local router_mt = {
+ __index = {
+ cfg = cfg_reconfigure;
+ info = router_info;
+ buckets_info = router_buckets_info;
+ call = router_call;
+ callro = router_callro;
+ callrw = router_callrw;
+ route = router_route;
+ routeall = router_routeall;
+ bucket_id = router_bucket_id;
+ bucket_count = router_bucket_count;
+ sync = router_sync;
+ bootstrap = cluster_bootstrap;
+ bucket_discovery = bucket_discovery;
+ discovery_wakeup = discovery_wakeup;
+ }
+}
+
+-- Table which represents this module.
+local module = {}
+
+-- This metatable bypasses calls to a module to the static_router.
+local module_mt = {__index = {}}
+for method_name, method in pairs(router_mt.__index) do
+ module_mt.__index[method_name] = function(...)
+ return method(M.static_router, ...)
+ end
+end
+
+local function export_static_router_attributes()
+ setmetatable(module, module_mt)
+end
+
+--
+-- Create a new instance of router.
+-- @param name Name of a new router.
+-- @param cfg Configuration for `router_cfg`.
+-- @retval Router instance.
+-- @retval Nil and error object.
+--
+local function router_new(name, cfg)
+ if type(name) ~= 'string' or type(cfg) ~= 'table' then
+ error('Wrong argument type. Usage: vshard.router.new(name, cfg).')
+ end
+ if M.routers[name] then
+ return nil, lerror.vshard(lerror.code.ROUTER_ALREADY_EXISTS, name)
+ end
+ local router = table.deepcopy(ROUTER_TEMPLATE)
+ setmetatable(router, router_mt)
+ router.name = name
+ M.routers[name] = router
+ router_cfg(router, cfg)
+ return router
+end
+
+--
+-- Wrapper around a `router_new` API, which allow to use old
+-- static `vshard.router.cfg()` API.
+--
+local function legacy_cfg(cfg)
+ if M.static_router then
+ -- Reconfigure.
+ router_cfg(M.static_router, cfg, false)
+ else
+ -- Create new static instance.
+ local router, err = router_new(STATIC_ROUTER_NAME, cfg)
+ if router then
+ M.static_router = router
+ export_static_router_attributes()
+ else
+ return nil, err
+ end
+ end
+end
+
--------------------------------------------------------------------------------
-- Module definition
--------------------------------------------------------------------------------
@@ -809,28 +942,23 @@ end
if not rawget(_G, MODULE_INTERNALS) then
rawset(_G, MODULE_INTERNALS, M)
else
- router_cfg(M.current_cfg, true)
+ for _, router in pairs(M.routers) do
+ router_cfg(router, router.current_cfg, true)
+ setmetatable(router, router_mt)
+ end
+ if M.static_router then
+ export_static_router_attributes()
+ end
M.module_version = M.module_version + 1
end
M.discovery_f = discovery_f
M.failover_f = failover_f
+M.router_mt = router_mt
-return {
- cfg = function(cfg) return router_cfg(cfg, false) end;
- info = router_info;
- buckets_info = router_buckets_info;
- call = router_call;
- callro = router_callro;
- callrw = router_callrw;
- route = router_route;
- routeall = router_routeall;
- bucket_id = router_bucket_id;
- bucket_count = router_bucket_count;
- sync = router_sync;
- bootstrap = cluster_bootstrap;
- bucket_discovery = bucket_discovery;
- discovery_wakeup = discovery_wakeup;
- internal = M;
- module_version = function() return M.module_version end;
-}
+module.cfg = legacy_cfg
+module.new = router_new
+module.internal = M
+module.module_version = function() return M.module_version end
+
+return module
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 0593edf..63aa96f 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1632,8 +1632,6 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
M.rebalancer_max_receiving = rebalancer_max_receiving
M.shard_index = shard_index
M.collect_bucket_garbage_interval = collect_bucket_garbage_interval
- M.collect_lua_garbage = collect_lua_garbage
- M.current_cfg = cfg
M.collect_lua_garbage = vshard_cfg.collect_lua_garbage
M.current_cfg = cfg
diff --git a/vshard/util.lua b/vshard/util.lua
index 37abe2b..3afaa61 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -38,11 +38,11 @@ end
-- reload of that module.
-- See description of parameters in `reloadable_fiber_create`.
--
-local function reloadable_fiber_main_loop(module, func_name)
+local function reloadable_fiber_main_loop(module, func_name, data)
log.info('%s has been started', func_name)
local func = module[func_name]
::restart_loop::
- local ok, err = pcall(func)
+ local ok, err = pcall(func, data)
-- yield serves two purposes:
-- * makes this fiber cancellable
-- * prevents 100% cpu consumption
@@ -60,7 +60,7 @@ local function reloadable_fiber_main_loop(module, func_name)
log.info('module is reloaded, restarting')
-- luajit drops this frame if next function is called in
-- return statement.
- return M.reloadable_fiber_main_loop(module, func_name)
+ return M.reloadable_fiber_main_loop(module, func_name, data)
end
--
@@ -74,11 +74,13 @@ end
-- @param module Module which can be reloaded.
-- @param func_name Name of a function to be executed in the
-- module.
+-- @param data Data to be passed to the specified function.
-- @retval New fiber.
--
-local function reloadable_fiber_create(fiber_name, module, func_name)
+local function reloadable_fiber_create(fiber_name, module, func_name, data)
assert(type(fiber_name) == 'string')
- local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name)
+ local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name,
+ data)
xfiber:name(fiber_name)
return xfiber
end
More information about the Tarantool-patches mailing list