From: Alex Khatskevich <avkhatskevich@tarantool.org> To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>, tarantool-patches@freelists.org Subject: [tarantool-patches] Re: [PATCH 3/3] Introduce multiple routers feature Date: Tue, 7 Aug 2018 16:18:25 +0300 Message-ID: <e98bf0e4-6014-7947-5871-ed802a8b11eb@tarantool.org> In-Reply-To: <9109c71a-0313-82c8-02db-3e4dd8c833cd@tarantool.org> On 06.08.2018 20:03, Vladislav Shpilevoy wrote: > Thanks for the patch! See 8 comments below. > > 1. You did not send a full diff. There are tests only. (In > this email I pasted it myself). Please, send a full diff > next time. Ok, sorry. > >>>> + if M.routers[name] then >>>> + return nil, string.format('Router with name %s already >>>> exists', name) >>>> + end >>>> + local router = table.deepcopy(ROUTER_TEMPLATE) >>>> + setmetatable(router, router_mt) >>>> + router.name = name >>>> + M.routers[name] = router >>>> + if name == STATIC_ROUTER_NAME then >>>> + M.static_router = router >>>> + export_static_router_attributes() >>>> + end >>> >>> 9. This check can be removed if you move >>> export_static_router_attributes call into legacy_cfg. >> But due to this if, the static router can be configured by >> `vshard.box.new(static_router_name)`. > > 2. It is not ok. A user should not use any internal names like > _static_router to configure and get it. Please, add a new member > vshard.router.static that references the static one. Until cfg > is called it is nil. Fixed. Now a user can create a static router only by calling `vshard.router.cfg()`. > >> diff --git a/vshard/router/init.lua b/vshard/router/init.lua >> index 3e127cb..62fdcda 100644 >> --- a/vshard/router/init.lua >> +++ b/vshard/router/init.lua >> @@ -25,14 +25,32 @@ local M = rawget(_G, MODULE_INTERNALS) >> if not M then >> M = { >> ---------------- Common module attributes ---------------- >> - -- The last passed configuration. 
>> - current_cfg = nil, >> errinj = { >> ERRINJ_CFG = false, >> ERRINJ_FAILOVER_CHANGE_CFG = false, >> ERRINJ_RELOAD = false, >> ERRINJ_LONG_DISCOVERY = false, >> }, >> + -- Dictionary, key is router name, value is a router. >> + routers = {}, >> + -- Router object which can be accessed by old api: >> + -- e.g. vshard.router.call(...) >> + static_router = nil, >> + -- This counter is used to restart background fibers with >> + -- new reloaded code. >> + module_version = 0, >> + collect_lua_garbage_cnt = 0, > > 3. A comment? Added: `-- Number of routers which require collecting Lua garbage.` > >> + } >> +end >> + >> +-- >> +-- Router object attributes. >> +-- >> +local ROUTER_TEMPLATE = { >> + -- Name of router. >> + name = nil, >> + -- The last passed configuration. >> + current_cfg = nil, >> -- Time to outdate old objects on reload. >> connection_outdate_delay = nil, >> -- Bucket map cache. >> @@ -488,8 +505,20 @@ end >> -- Configuration >> -------------------------------------------------------------------------------- >> >> >> -local function router_cfg(cfg) >> - local vshard_cfg, box_cfg = lcfg.check(cfg, M.current_cfg) >> +local function change_lua_gc_cnt(val) > > 4. The same. Fixed. > >> + assert(M.collect_lua_garbage_cnt >= 0) >> + local prev_cnt = M.collect_lua_garbage_cnt >> + M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt + val >> + if prev_cnt == 0 and M.collect_lua_garbage_cnt > 0 then >> + lua_gc.set_state(true, consts.COLLECT_LUA_GARBAGE_INTERVAL) >> + end >> + if prev_cnt > 0 and M.collect_lua_garbage_cnt == 0 then >> + lua_gc.set_state(false, consts.COLLECT_LUA_GARBAGE_INTERVAL) >> + end > > 5. You know the concrete val in the caller always: 1 or -1. I think > it would look simpler if you split this function into separate inc > and dec ones. The former checks for prev == 0 and new > 0, the latter > checks for prev > 0 and new == 0. It is not needed to check both > each time. 
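For illustration, the split suggested in comment 5 could look roughly like the sketch below. It is standalone and simplified: `set_gc_state` is a stand-in for `lua_gc.set_state`, the local table `M` replaces the real module internals, and the function names `lua_gc_cnt_inc` / `lua_gc_cnt_dec` are assumptions, not necessarily the names used in the final patch.

```lua
-- Stand-ins for the module internals and for lua_gc.set_state()
-- (assumptions made for this sketch only).
local M = {collect_lua_garbage_cnt = 0}
local gc_enabled = false
local function set_gc_state(active)
    gc_enabled = active
end

-- Called when one more router starts to require Lua garbage collection.
-- Only the 0 -> 1 transition has to enable the collector.
local function lua_gc_cnt_inc()
    M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt + 1
    if M.collect_lua_garbage_cnt == 1 then
        set_gc_state(true)
    end
end

-- Called when a router stops requiring Lua garbage collection.
-- Only the 1 -> 0 transition has to disable the collector.
local function lua_gc_cnt_dec()
    M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt - 1
    assert(M.collect_lua_garbage_cnt >= 0)
    if M.collect_lua_garbage_cnt == 0 then
        set_gc_state(false)
    end
end
```

Each helper checks a single transition, so the caller no longer passes `1` or `-1` and neither function needs to look at the other direction.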
changed > >> +end >> + >> +local function router_cfg(router, cfg) >> + local vshard_cfg, box_cfg = lcfg.check(cfg, router.current_cfg) >> if not M.replicasets then >> log.info('Starting router configuration') >> else >> @@ -803,6 +839,77 @@ if M.errinj.ERRINJ_RELOAD then >> error('Error injection: reload') >> end >> >> +-------------------------------------------------------------------------------- >> >> +-- Managing router instances >> +-------------------------------------------------------------------------------- >> >> + >> +local function cfg_reconfigure(router, cfg) >> + return router_cfg(router, cfg) >> +end >> + >> +local router_mt = { >> + __index = { >> + cfg = cfg_reconfigure; >> + info = router_info; >> + buckets_info = router_buckets_info; >> + call = router_call; >> + callro = router_callro; >> + callrw = router_callrw; >> + route = router_route; >> + routeall = router_routeall; >> + bucket_id = router_bucket_id; >> + bucket_count = router_bucket_count; >> + sync = router_sync; >> + bootstrap = cluster_bootstrap; >> + bucket_discovery = bucket_discovery; >> + discovery_wakeup = discovery_wakeup; >> + } >> +} >> + >> +-- Table which represents this module. >> +local module = {} >> + >> +-- This metatable bypasses calls to a module to the static_router. >> +local module_mt = {__index = {}} >> +for method_name, method in pairs(router_mt.__index) do >> + module_mt.__index[method_name] = function(...) >> + return method(M.static_router, ...) >> + end >> +end >> + >> +local function export_static_router_attributes() >> + setmetatable(module, module_mt) >> +end >> + >> +local function router_new(name, cfg) > > 6. A comment? added > > 7. This function should not check for router_name == static one. > It just creates a new router and returns it. The caller should set > it into M.routers or M.static_router if needed. For a user you > expose not this function but a wrapper that calls router_new and > sets M.routers. fixed. 
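To make the separation from comment 7 concrete, here is a minimal standalone sketch, not the exact code of the patch. `router_new` only builds the object, while a user-facing wrapper validates arguments and registers the router. The wrapper name `router_new_public`, the simplified `M` and `router_mt`, and the way `legacy_cfg` creates the static router are all assumptions for illustration; the real code calls `router_cfg(router, cfg)` instead of just storing the config.

```lua
-- Simplified stand-ins for the module internals (assumptions).
local M = {routers = {}, static_router = nil}
local router_mt = {__index = {}}
local STATIC_ROUTER_NAME = '_static_router'

-- Only creates a router object; it does not touch M.routers
-- or M.static_router.
local function router_new(name, cfg)
    local router = setmetatable({name = name}, router_mt)
    -- The real implementation would call router_cfg(router, cfg) here.
    router.current_cfg = cfg
    return router
end

-- Hypothetical user-facing wrapper: validates the arguments and
-- registers the new router under its name.
local function router_new_public(name, cfg)
    if type(name) ~= 'string' or type(cfg) ~= 'table' then
        error('Wrong argument type. Usage: vshard.router.new(name, cfg).')
    end
    if M.routers[name] then
        return nil, string.format('Router with name %s already exists', name)
    end
    local router = router_new(name, cfg)
    M.routers[name] = router
    return router
end

-- Legacy entry point (vshard.router.cfg): the only place in this
-- sketch that creates the static router.
local function legacy_cfg(cfg)
    if M.static_router == nil then
        M.static_router = router_new(STATIC_ROUTER_NAME, cfg)
    else
        M.static_router.current_cfg = cfg
    end
end
```

With this layout the static router can appear only through `legacy_cfg`, and `router_new` stays free of any name checks.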
> >> + if type(name) ~= 'string' or type(cfg) ~= 'table' then >> + error('Wrong argument type. Usage: >> vshard.router.new(name, cfg).') >> + end >> + if M.routers[name] then >> + return nil, string.format('Router with name %s already >> exists', name) >> + end >> + local router = table.deepcopy(ROUTER_TEMPLATE) >> + setmetatable(router, router_mt) >> + router.name = name >> + M.routers[name] = router >> + if name == STATIC_ROUTER_NAME then >> + M.static_router = router >> + export_static_router_attributes() >> + end >> + router_cfg(router, cfg) >> + return router >> +end >> + >> @@ -813,28 +920,23 @@ end >> if not rawget(_G, MODULE_INTERNALS) then >> rawset(_G, MODULE_INTERNALS, M) >> else >> - router_cfg(M.current_cfg) >> + for _, router in pairs(M.routers) do >> + router_cfg(router, router.current_cfg) >> + setmetatable(router, router_mt) >> + end >> M.module_version = M.module_version + 1 >> end >> >> M.discovery_f = discovery_f >> M.failover_f = failover_f >> +M.router_mt = router_mt >> +if M.static_router then >> + export_static_router_attributes() >> +end > > 8. This is possible on reload only and can be moved into > the if above to the reload case processing. Fixed. Here is the full diff: commit 87b6dc044de177e159dbe24f07abf3f98839ccff Author: AKhatskevich <avkhatskevich@tarantool.org> Date: Thu Jul 26 16:17:25 2018 +0300 Introduce multiple routers feature Key points: * Old `vshard.router.some_method()` API is preserved. * Add `vshard.router.new(name, cfg)` method which returns a new router. * Each router has its own: 1. name 2. background fibers 3. attributes (route_map, replicasets, outdate_delay...) * Module reload reloads all configured routers. * `cfg` reconfigures a single router. * All routers share the same box configuration. The last passed config overrides the global box config. * Multiple router instances can be connected to the same cluster. * For now, a router cannot be destroyed. 
Extra changes: * Add `data` parameter to `reloadable_fiber_create` function. Closes #130 diff --git a/test/failover/failover.result b/test/failover/failover.result index 73a4250..50410ad 100644 --- a/test/failover/failover.result +++ b/test/failover/failover.result @@ -174,7 +174,7 @@ test_run:switch('router_1') --- - true ... -rs1 = vshard.router.internal.replicasets[rs_uuid[1]] +rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]] --- ... while not rs1.replica_up_ts do fiber.sleep(0.1) end diff --git a/test/failover/failover.test.lua b/test/failover/failover.test.lua index 6e06314..44c8b6d 100644 --- a/test/failover/failover.test.lua +++ b/test/failover/failover.test.lua @@ -74,7 +74,7 @@ echo_count -- Ensure that replica_up_ts is updated periodically. test_run:switch('router_1') -rs1 = vshard.router.internal.replicasets[rs_uuid[1]] +rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]] while not rs1.replica_up_ts do fiber.sleep(0.1) end old_up_ts = rs1.replica_up_ts while rs1.replica_up_ts == old_up_ts do fiber.sleep(0.1) end diff --git a/test/failover/failover_errinj.result b/test/failover/failover_errinj.result index 3b6d986..484a1e3 100644 --- a/test/failover/failover_errinj.result +++ b/test/failover/failover_errinj.result @@ -49,7 +49,7 @@ vshard.router.cfg(cfg) -- Check that already run failover step is restarted on -- configuration change (if some replicasets are removed from -- config). -rs1 = vshard.router.internal.replicasets[rs_uuid[1]] +rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]] --- ... 
while not rs1.replica or not rs1.replica.conn:is_connected() do fiber.sleep(0.1) end diff --git a/test/failover/failover_errinj.test.lua b/test/failover/failover_errinj.test.lua index b4d2d35..14228de 100644 --- a/test/failover/failover_errinj.test.lua +++ b/test/failover/failover_errinj.test.lua @@ -20,7 +20,7 @@ vshard.router.cfg(cfg) -- Check that already run failover step is restarted on -- configuration change (if some replicasets are removed from -- config). -rs1 = vshard.router.internal.replicasets[rs_uuid[1]] +rs1 = vshard.router.internal.static_router.replicasets[rs_uuid[1]] while not rs1.replica or not rs1.replica.conn:is_connected() do fiber.sleep(0.1) end vshard.router.internal.errinj.ERRINJ_FAILOVER_CHANGE_CFG = true wait_state('Configuration has changed, restart ') diff --git a/test/failover/router_1.lua b/test/failover/router_1.lua index d71209b..664a6c6 100644 --- a/test/failover/router_1.lua +++ b/test/failover/router_1.lua @@ -42,7 +42,7 @@ end function priority_order() local ret = {} for _, uuid in pairs(rs_uuid) do - local rs = vshard.router.internal.replicasets[uuid] + local rs = vshard.router.internal.static_router.replicasets[uuid] local sorted = {} for _, replica in pairs(rs.priority_list) do local z diff --git a/test/misc/reconfigure.result b/test/misc/reconfigure.result index c7960b3..311f749 100644 --- a/test/misc/reconfigure.result +++ b/test/misc/reconfigure.result @@ -250,7 +250,7 @@ test_run:switch('router_1') -- Ensure that in a case of error router internals are not -- changed. -- -not vshard.router.internal.collect_lua_garbage +not vshard.router.internal.static_router.collect_lua_garbage --- - true ... @@ -264,7 +264,7 @@ vshard.router.cfg(cfg) --- - error: 'Incorrect value for option ''invalid_option'': unexpected option' ... -not vshard.router.internal.collect_lua_garbage +not vshard.router.internal.static_router.collect_lua_garbage --- - true ... 
diff --git a/test/misc/reconfigure.test.lua b/test/misc/reconfigure.test.lua index 25dc2ca..298b9b0 100644 --- a/test/misc/reconfigure.test.lua +++ b/test/misc/reconfigure.test.lua @@ -99,11 +99,11 @@ test_run:switch('router_1') -- Ensure that in a case of error router internals are not -- changed. -- -not vshard.router.internal.collect_lua_garbage +not vshard.router.internal.static_router.collect_lua_garbage cfg.collect_lua_garbage = true cfg.invalid_option = 'kek' vshard.router.cfg(cfg) -not vshard.router.internal.collect_lua_garbage +not vshard.router.internal.static_router.collect_lua_garbage cfg.invalid_option = nil cfg.collect_lua_garbage = nil vshard.router.cfg(cfg) diff --git a/test/multiple_routers/configs.lua b/test/multiple_routers/configs.lua new file mode 100644 index 0000000..a6ce33c --- /dev/null +++ b/test/multiple_routers/configs.lua @@ -0,0 +1,81 @@ +names = { + storage_1_1_a = '32a2d4b8-f146-44ed-9d51-2436507efdf8', + storage_1_1_b = 'c1c849b1-641d-40b8-9283-bcfe73d46270', + storage_1_2_a = '04e677ed-c7ba-47e0-a67f-b5100cfa86af', + storage_1_2_b = 'c7a979ee-9263-4a38-84a5-2fb6a0a32684', + storage_2_1_a = '88dc03f0-23fb-4f05-b462-e29186542864', + storage_2_1_b = '4230b711-f5c4-4131-bf98-88cd43a16901', + storage_2_2_a = '6b1eefbc-1e2e-410e-84ff-44c572ea9916', + storage_2_2_b = 'be74419a-1e56-4ba4-97e9-6b18710f63c5', +} + +rs_1_1 = 'dd208fb8-8b90-49bc-8393-6b3a99da7c52' +rs_1_2 = 'af9cfe88-2091-4613-a877-a623776c5c0e' +rs_2_1 = '9ca8ee15-ae18-4f31-9385-4859f89ce73f' +rs_2_2 = '007f5f58-b654-4125-8441-a71866fb62b5' + +local cfg_1 = {} +cfg_1.sharding = { + [rs_1_1] = { + replicas = { + [names.storage_1_1_a] = { + uri = 'storage:storage@127.0.0.1:3301', + name = 'storage_1_1_a', + master = true, + }, + [names.storage_1_1_b] = { + uri = 'storage:storage@127.0.0.1:3302', + name = 'storage_1_1_b', + }, + } + }, + [rs_1_2] = { + replicas = { + [names.storage_1_2_a] = { + uri = 'storage:storage@127.0.0.1:3303', + name = 'storage_1_2_a', + master = true, + 
}, + [names.storage_1_2_b] = { + uri = 'storage:storage@127.0.0.1:3304', + name = 'storage_1_2_b', + }, + } + }, +} + + +local cfg_2 = {} +cfg_2.sharding = { + [rs_2_1] = { + replicas = { + [names.storage_2_1_a] = { + uri = 'storage:storage@127.0.0.1:3305', + name = 'storage_2_1_a', + master = true, + }, + [names.storage_2_1_b] = { + uri = 'storage:storage@127.0.0.1:3306', + name = 'storage_2_1_b', + }, + } + }, + [rs_2_2] = { + replicas = { + [names.storage_2_2_a] = { + uri = 'storage:storage@127.0.0.1:3307', + name = 'storage_2_2_a', + master = true, + }, + [names.storage_2_2_b] = { + uri = 'storage:storage@127.0.0.1:3308', + name = 'storage_2_2_b', + }, + } + }, +} + +return { + cfg_1 = cfg_1, + cfg_2 = cfg_2, +} diff --git a/test/multiple_routers/multiple_routers.result b/test/multiple_routers/multiple_routers.result new file mode 100644 index 0000000..5b85e1c --- /dev/null +++ b/test/multiple_routers/multiple_routers.result @@ -0,0 +1,301 @@ +test_run = require('test_run').new() +--- +... +REPLICASET_1_1 = { 'storage_1_1_a', 'storage_1_1_b' } +--- +... +REPLICASET_1_2 = { 'storage_1_2_a', 'storage_1_2_b' } +--- +... +REPLICASET_2_1 = { 'storage_2_1_a', 'storage_2_1_b' } +--- +... +REPLICASET_2_2 = { 'storage_2_2_a', 'storage_2_2_b' } +--- +... +test_run:create_cluster(REPLICASET_1_1, 'multiple_routers') +--- +... +test_run:create_cluster(REPLICASET_1_2, 'multiple_routers') +--- +... +test_run:create_cluster(REPLICASET_2_1, 'multiple_routers') +--- +... +test_run:create_cluster(REPLICASET_2_2, 'multiple_routers') +--- +... +util = require('lua_libs.util') +--- +... +util.wait_master(test_run, REPLICASET_1_1, 'storage_1_1_a') +--- +... +util.wait_master(test_run, REPLICASET_1_2, 'storage_1_2_a') +--- +... +util.wait_master(test_run, REPLICASET_2_1, 'storage_2_1_a') +--- +... +util.wait_master(test_run, REPLICASET_2_2, 'storage_2_2_a') +--- +... +test_run:cmd("create server router_1 with script='multiple_routers/router_1.lua'") +--- +- true +... 
+test_run:cmd("start server router_1") +--- +- true +... +-- Configure default (static) router. +_ = test_run:cmd("switch router_1") +--- +... +vshard.router.cfg(configs.cfg_1) +--- +... +vshard.router.bootstrap() +--- +- true +... +_ = test_run:cmd("switch storage_1_2_a") +--- +... +wait_rebalancer_state('The cluster is balanced ok', test_run) +--- +... +_ = test_run:cmd("switch router_1") +--- +... +vshard.router.call(1, 'write', 'do_replace', {{1, 1}}) +--- +- true +... +vshard.router.call(1, 'read', 'do_select', {1}) +--- +- [[1, 1]] +... +-- Test that static router is just a router object under the hood. +static_router = vshard.router.internal.static_router +--- +... +static_router:route(1) == vshard.router.route(1) +--- +- true +... +-- Configure extra router. +router_2 = vshard.router.new('router_2', configs.cfg_2) +--- +... +router_2:bootstrap() +--- +- true +... +_ = test_run:cmd("switch storage_2_2_a") +--- +... +wait_rebalancer_state('The cluster is balanced ok', test_run) +--- +... +_ = test_run:cmd("switch router_1") +--- +... +router_2:call(1, 'write', 'do_replace', {{2, 2}}) +--- +- true +... +router_2:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +-- Check that router_2 and static router serves different clusters. +#router_2:call(1, 'read', 'do_select', {1}) == 0 +--- +- true +... +-- Create several routers to the same cluster. +routers = {} +--- +... +for i = 3, 10 do routers[i] = vshard.router.new('router_' .. i, configs.cfg_2) end +--- +... +routers[3]:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +-- Check that they have their own background fibers. +fiber_names = {} +--- +... +for i = 2, 10 do fiber_names['vshard.failover.router_' .. i] = true; fiber_names['vshard.discovery.router_' .. i] = true; end +--- +... +next(fiber_names) ~= nil +--- +- true +... +fiber = require('fiber') +--- +... +for _, xfiber in pairs(fiber.info()) do fiber_names[xfiber.name] = nil end +--- +... +next(fiber_names) == nil +--- +- true +... 
+-- Reconfigure one of routers do not affect the others. +routers[3]:cfg(configs.cfg_1) +--- +... +routers[3]:call(1, 'read', 'do_select', {1}) +--- +- [[1, 1]] +... +#routers[3]:call(1, 'read', 'do_select', {2}) == 0 +--- +- true +... +#routers[4]:call(1, 'read', 'do_select', {1}) == 0 +--- +- true +... +routers[4]:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +routers[3]:cfg(configs.cfg_2) +--- +... +-- Try to create router with the same name. +util = require('lua_libs.util') +--- +... +util.check_error(vshard.router.new, 'router_2', configs.cfg_2) +--- +- null +- type: ShardingError + code: 21 + name: ROUTER_ALREADY_EXISTS + message: Router with name router_2 already exists +... +-- Reload router module. +_, old_rs_1 = next(vshard.router.internal.static_router.replicasets) +--- +... +_, old_rs_2 = next(router_2.replicasets) +--- +... +package.loaded['vshard.router'] = nil +--- +... +vshard.router = require('vshard.router') +--- +... +while not old_rs_1.is_outdated do fiber.sleep(0.01) end +--- +... +while not old_rs_2.is_outdated do fiber.sleep(0.01) end +--- +... +vshard.router.call(1, 'read', 'do_select', {1}) +--- +- [[1, 1]] +... +router_2:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +routers[5]:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +-- Check lua_gc counter. +lua_gc = require('vshard.lua_gc') +--- +... +vshard.router.internal.collect_lua_garbage_cnt == 0 +--- +- true +... +lua_gc.internal.bg_fiber == nil +--- +- true +... +configs.cfg_2.collect_lua_garbage = true +--- +... +routers[5]:cfg(configs.cfg_2) +--- +... +lua_gc.internal.bg_fiber ~= nil +--- +- true +... +routers[7]:cfg(configs.cfg_2) +--- +... +lua_gc.internal.bg_fiber ~= nil +--- +- true +... +vshard.router.internal.collect_lua_garbage_cnt == 2 +--- +- true +... +package.loaded['vshard.router'] = nil +--- +... +vshard.router = require('vshard.router') +--- +... +vshard.router.internal.collect_lua_garbage_cnt == 2 +--- +- true +... 
+configs.cfg_2.collect_lua_garbage = nil +--- +... +routers[5]:cfg(configs.cfg_2) +--- +... +lua_gc.internal.bg_fiber ~= nil +--- +- true +... +routers[7]:cfg(configs.cfg_2) +--- +... +vshard.router.internal.collect_lua_garbage_cnt == 0 +--- +- true +... +lua_gc.internal.bg_fiber == nil +--- +- true +... +_ = test_run:cmd("switch default") +--- +... +test_run:cmd("stop server router_1") +--- +- true +... +test_run:cmd("cleanup server router_1") +--- +- true +... +test_run:drop_cluster(REPLICASET_1_1) +--- +... +test_run:drop_cluster(REPLICASET_1_2) +--- +... +test_run:drop_cluster(REPLICASET_2_1) +--- +... +test_run:drop_cluster(REPLICASET_2_2) +--- +... diff --git a/test/multiple_routers/multiple_routers.test.lua b/test/multiple_routers/multiple_routers.test.lua new file mode 100644 index 0000000..ec3c7f7 --- /dev/null +++ b/test/multiple_routers/multiple_routers.test.lua @@ -0,0 +1,109 @@ +test_run = require('test_run').new() + +REPLICASET_1_1 = { 'storage_1_1_a', 'storage_1_1_b' } +REPLICASET_1_2 = { 'storage_1_2_a', 'storage_1_2_b' } +REPLICASET_2_1 = { 'storage_2_1_a', 'storage_2_1_b' } +REPLICASET_2_2 = { 'storage_2_2_a', 'storage_2_2_b' } + +test_run:create_cluster(REPLICASET_1_1, 'multiple_routers') +test_run:create_cluster(REPLICASET_1_2, 'multiple_routers') +test_run:create_cluster(REPLICASET_2_1, 'multiple_routers') +test_run:create_cluster(REPLICASET_2_2, 'multiple_routers') +util = require('lua_libs.util') +util.wait_master(test_run, REPLICASET_1_1, 'storage_1_1_a') +util.wait_master(test_run, REPLICASET_1_2, 'storage_1_2_a') +util.wait_master(test_run, REPLICASET_2_1, 'storage_2_1_a') +util.wait_master(test_run, REPLICASET_2_2, 'storage_2_2_a') + +test_run:cmd("create server router_1 with script='multiple_routers/router_1.lua'") +test_run:cmd("start server router_1") + +-- Configure default (static) router. 
+_ = test_run:cmd("switch router_1") +vshard.router.cfg(configs.cfg_1) +vshard.router.bootstrap() +_ = test_run:cmd("switch storage_1_2_a") +wait_rebalancer_state('The cluster is balanced ok', test_run) +_ = test_run:cmd("switch router_1") + +vshard.router.call(1, 'write', 'do_replace', {{1, 1}}) +vshard.router.call(1, 'read', 'do_select', {1}) + +-- Test that static router is just a router object under the hood. +static_router = vshard.router.internal.static_router +static_router:route(1) == vshard.router.route(1) + +-- Configure extra router. +router_2 = vshard.router.new('router_2', configs.cfg_2) +router_2:bootstrap() +_ = test_run:cmd("switch storage_2_2_a") +wait_rebalancer_state('The cluster is balanced ok', test_run) +_ = test_run:cmd("switch router_1") + +router_2:call(1, 'write', 'do_replace', {{2, 2}}) +router_2:call(1, 'read', 'do_select', {2}) +-- Check that router_2 and static router serves different clusters. +#router_2:call(1, 'read', 'do_select', {1}) == 0 + +-- Create several routers to the same cluster. +routers = {} +for i = 3, 10 do routers[i] = vshard.router.new('router_' .. i, configs.cfg_2) end +routers[3]:call(1, 'read', 'do_select', {2}) +-- Check that they have their own background fibers. +fiber_names = {} +for i = 2, 10 do fiber_names['vshard.failover.router_' .. i] = true; fiber_names['vshard.discovery.router_' .. i] = true; end +next(fiber_names) ~= nil +fiber = require('fiber') +for _, xfiber in pairs(fiber.info()) do fiber_names[xfiber.name] = nil end +next(fiber_names) == nil + +-- Reconfigure one of routers do not affect the others. +routers[3]:cfg(configs.cfg_1) +routers[3]:call(1, 'read', 'do_select', {1}) +#routers[3]:call(1, 'read', 'do_select', {2}) == 0 +#routers[4]:call(1, 'read', 'do_select', {1}) == 0 +routers[4]:call(1, 'read', 'do_select', {2}) +routers[3]:cfg(configs.cfg_2) + +-- Try to create router with the same name. 
+util = require('lua_libs.util') +util.check_error(vshard.router.new, 'router_2', configs.cfg_2) + +-- Reload router module. +_, old_rs_1 = next(vshard.router.internal.static_router.replicasets) +_, old_rs_2 = next(router_2.replicasets) +package.loaded['vshard.router'] = nil +vshard.router = require('vshard.router') +while not old_rs_1.is_outdated do fiber.sleep(0.01) end +while not old_rs_2.is_outdated do fiber.sleep(0.01) end +vshard.router.call(1, 'read', 'do_select', {1}) +router_2:call(1, 'read', 'do_select', {2}) +routers[5]:call(1, 'read', 'do_select', {2}) + +-- Check lua_gc counter. +lua_gc = require('vshard.lua_gc') +vshard.router.internal.collect_lua_garbage_cnt == 0 +lua_gc.internal.bg_fiber == nil +configs.cfg_2.collect_lua_garbage = true +routers[5]:cfg(configs.cfg_2) +lua_gc.internal.bg_fiber ~= nil +routers[7]:cfg(configs.cfg_2) +lua_gc.internal.bg_fiber ~= nil +vshard.router.internal.collect_lua_garbage_cnt == 2 +package.loaded['vshard.router'] = nil +vshard.router = require('vshard.router') +vshard.router.internal.collect_lua_garbage_cnt == 2 +configs.cfg_2.collect_lua_garbage = nil +routers[5]:cfg(configs.cfg_2) +lua_gc.internal.bg_fiber ~= nil +routers[7]:cfg(configs.cfg_2) +vshard.router.internal.collect_lua_garbage_cnt == 0 +lua_gc.internal.bg_fiber == nil + +_ = test_run:cmd("switch default") +test_run:cmd("stop server router_1") +test_run:cmd("cleanup server router_1") +test_run:drop_cluster(REPLICASET_1_1) +test_run:drop_cluster(REPLICASET_1_2) +test_run:drop_cluster(REPLICASET_2_1) +test_run:drop_cluster(REPLICASET_2_2) diff --git a/test/multiple_routers/router_1.lua b/test/multiple_routers/router_1.lua new file mode 100644 index 0000000..2e9ea91 --- /dev/null +++ b/test/multiple_routers/router_1.lua @@ -0,0 +1,15 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +-- Get instance name +local fio = require('fio') +local NAME = fio.basename(arg[0], '.lua') + +require('console').listen(os.getenv('ADMIN')) + +configs = 
require('configs') + +-- Start the database with sharding +vshard = require('vshard') +box.cfg{} diff --git a/test/multiple_routers/storage_1_1_a.lua b/test/multiple_routers/storage_1_1_a.lua new file mode 100644 index 0000000..b44a97a --- /dev/null +++ b/test/multiple_routers/storage_1_1_a.lua @@ -0,0 +1,23 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +-- Get instance name. +local fio = require('fio') +NAME = fio.basename(arg[0], '.lua') + +require('console').listen(os.getenv('ADMIN')) + +-- Fetch config for the cluster of the instance. +if NAME:sub(9,9) == '1' then + cfg = require('configs').cfg_1 +else + cfg = require('configs').cfg_2 +end + +-- Start the database with sharding. +vshard = require('vshard') +vshard.storage.cfg(cfg, names[NAME]) + +-- Bootstrap storage. +require('lua_libs.bootstrap') diff --git a/test/multiple_routers/storage_1_1_b.lua b/test/multiple_routers/storage_1_1_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_1_1_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_1_2_a.lua b/test/multiple_routers/storage_1_2_a.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_1_2_a.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_1_2_b.lua b/test/multiple_routers/storage_1_2_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_1_2_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_1_a.lua b/test/multiple_routers/storage_2_1_a.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_1_a.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_1_b.lua b/test/multiple_routers/storage_2_1_b.lua new file mode 120000 index 
0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_1_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_2_a.lua b/test/multiple_routers/storage_2_2_a.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_2_a.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_2_b.lua b/test/multiple_routers/storage_2_2_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_2_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/suite.ini b/test/multiple_routers/suite.ini new file mode 100644 index 0000000..d2d4470 --- /dev/null +++ b/test/multiple_routers/suite.ini @@ -0,0 +1,6 @@ +[default] +core = tarantool +description = Multiple routers tests +script = test.lua +is_parallel = False +lua_libs = ../lua_libs configs.lua diff --git a/test/multiple_routers/test.lua b/test/multiple_routers/test.lua new file mode 100644 index 0000000..cb7c1ee --- /dev/null +++ b/test/multiple_routers/test.lua @@ -0,0 +1,9 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +box.cfg{ + listen = os.getenv("LISTEN"), +} + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/router/exponential_timeout.result b/test/router/exponential_timeout.result index fb54d0f..6748b64 100644 --- a/test/router/exponential_timeout.result +++ b/test/router/exponential_timeout.result @@ -37,10 +37,10 @@ test_run:cmd('switch router_1') util = require('util') --- ... -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] --- ... -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]] --- ... 
util.collect_timeouts(rs1) diff --git a/test/router/exponential_timeout.test.lua b/test/router/exponential_timeout.test.lua index 3ec0b8c..75d85bf 100644 --- a/test/router/exponential_timeout.test.lua +++ b/test/router/exponential_timeout.test.lua @@ -13,8 +13,8 @@ test_run:cmd("start server router_1") test_run:cmd('switch router_1') util = require('util') -rs1 = vshard.router.internal.replicasets[replicasets[1]] -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]] util.collect_timeouts(rs1) util.collect_timeouts(rs2) diff --git a/test/router/reconnect_to_master.result b/test/router/reconnect_to_master.result index 5e678ce..d502723 100644 --- a/test/router/reconnect_to_master.result +++ b/test/router/reconnect_to_master.result @@ -76,7 +76,7 @@ _ = test_run:cmd('stop server storage_1_a') _ = test_run:switch('router_1') --- ... -reps = vshard.router.internal.replicasets +reps = vshard.router.internal.static_router.replicasets --- ... test_run:cmd("setopt delimiter ';'") @@ -95,7 +95,7 @@ end; ... function count_known_buckets() local known_buckets = 0 - for _, id in pairs(vshard.router.internal.route_map) do + for _, id in pairs(vshard.router.internal.static_router.route_map) do known_buckets = known_buckets + 1 end return known_buckets @@ -127,7 +127,7 @@ is_disconnected() fiber = require('fiber') --- ... -while vshard.router.internal.replicasets[replicasets[1]].replica == nil do fiber.sleep(0.1) end +while vshard.router.internal.static_router.replicasets[replicasets[1]].replica == nil do fiber.sleep(0.1) end --- ... 
vshard.router.info() diff --git a/test/router/reconnect_to_master.test.lua b/test/router/reconnect_to_master.test.lua index 39ba90e..8820fa7 100644 --- a/test/router/reconnect_to_master.test.lua +++ b/test/router/reconnect_to_master.test.lua @@ -34,7 +34,7 @@ _ = test_run:cmd('stop server storage_1_a') _ = test_run:switch('router_1') -reps = vshard.router.internal.replicasets +reps = vshard.router.internal.static_router.replicasets test_run:cmd("setopt delimiter ';'") function is_disconnected() for i, rep in pairs(reps) do @@ -46,7 +46,7 @@ function is_disconnected() end; function count_known_buckets() local known_buckets = 0 - for _, id in pairs(vshard.router.internal.route_map) do + for _, id in pairs(vshard.router.internal.static_router.route_map) do known_buckets = known_buckets + 1 end return known_buckets @@ -63,7 +63,7 @@ is_disconnected() -- Wait until replica is connected to test alerts on unavailable -- master. fiber = require('fiber') -while vshard.router.internal.replicasets[replicasets[1]].replica == nil do fiber.sleep(0.1) end +while vshard.router.internal.static_router.replicasets[replicasets[1]].replica == nil do fiber.sleep(0.1) end vshard.router.info() -- Return master. diff --git a/test/router/reload.result b/test/router/reload.result index f0badc3..98e8e71 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -229,7 +229,7 @@ vshard.router.cfg(cfg) cfg.connection_outdate_delay = old_connection_delay --- ... -vshard.router.internal.connection_outdate_delay = nil +vshard.router.internal.static_router.connection_outdate_delay = nil --- ... 
rs_new = vshard.router.route(1) diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 528222a..293cb26 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -104,7 +104,7 @@ old_connection_delay = cfg.connection_outdate_delay cfg.connection_outdate_delay = 0.3 vshard.router.cfg(cfg) cfg.connection_outdate_delay = old_connection_delay -vshard.router.internal.connection_outdate_delay = nil +vshard.router.internal.static_router.connection_outdate_delay = nil rs_new = vshard.router.route(1) rs_old = rs _, replica_old = next(rs_old.replicas) diff --git a/test/router/reroute_wrong_bucket.result b/test/router/reroute_wrong_bucket.result index 7f2a494..989dc79 100644 --- a/test/router/reroute_wrong_bucket.result +++ b/test/router/reroute_wrong_bucket.result @@ -98,7 +98,7 @@ vshard.router.call(100, 'read', 'customer_lookup', {1}, {timeout = 100}) --- - {'accounts': [], 'customer_id': 1, 'name': 'name'} ... -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]] --- ... vshard.router.call(100, 'write', 'customer_add', {{customer_id = 2, bucket_id = 100, name = 'name2', accounts = {}}}, {timeout = 100}) @@ -146,13 +146,13 @@ test_run:switch('router_1') ... -- Emulate a situation, when a replicaset_2 while is unknown for -- router, but is already known for storages. -save_rs2 = vshard.router.internal.replicasets[replicasets[2]] +save_rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]] --- ... -vshard.router.internal.replicasets[replicasets[2]] = nil +vshard.router.internal.static_router.replicasets[replicasets[2]] = nil --- ... -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]] --- ... 
fiber = require('fiber') @@ -207,7 +207,7 @@ err require('log').info(string.rep('a', 1000)) --- ... -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]] --- ... call_retval = nil @@ -219,7 +219,7 @@ f = fiber.create(do_call, 100) while not test_run:grep_log('router_1', 'please update configuration', 1000) do fiber.sleep(0.1) end --- ... -vshard.router.internal.replicasets[replicasets[2]] = save_rs2 +vshard.router.internal.static_router.replicasets[replicasets[2]] = save_rs2 --- ... while not call_retval do fiber.sleep(0.1) end diff --git a/test/router/reroute_wrong_bucket.test.lua b/test/router/reroute_wrong_bucket.test.lua index 03384d1..a00f941 100644 --- a/test/router/reroute_wrong_bucket.test.lua +++ b/test/router/reroute_wrong_bucket.test.lua @@ -35,7 +35,7 @@ customer_add({customer_id = 1, bucket_id = 100, name = 'name', accounts = {}}) test_run:switch('router_1') vshard.router.call(100, 'read', 'customer_lookup', {1}, {timeout = 100}) -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]] vshard.router.call(100, 'write', 'customer_add', {{customer_id = 2, bucket_id = 100, name = 'name2', accounts = {}}}, {timeout = 100}) -- Create cycle. @@ -55,9 +55,9 @@ box.space._bucket:replace({100, vshard.consts.BUCKET.SENT, replicasets[2]}) test_run:switch('router_1') -- Emulate a situation, when a replicaset_2 while is unknown for -- router, but is already known for storages. 
-save_rs2 = vshard.router.internal.replicasets[replicasets[2]] -vshard.router.internal.replicasets[replicasets[2]] = nil -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +save_rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]] +vshard.router.internal.static_router.replicasets[replicasets[2]] = nil +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]] fiber = require('fiber') call_retval = nil @@ -84,11 +84,11 @@ err -- detect it and end with ok. -- require('log').info(string.rep('a', 1000)) -vshard.router.internal.route_map[100] = vshard.router.internal.replicasets[replicasets[1]] +vshard.router.internal.static_router.route_map[100] = vshard.router.internal.static_router.replicasets[replicasets[1]] call_retval = nil f = fiber.create(do_call, 100) while not test_run:grep_log('router_1', 'please update configuration', 1000) do fiber.sleep(0.1) end -vshard.router.internal.replicasets[replicasets[2]] = save_rs2 +vshard.router.internal.static_router.replicasets[replicasets[2]] = save_rs2 while not call_retval do fiber.sleep(0.1) end call_retval vshard.router.call(100, 'read', 'customer_lookup', {3}, {timeout = 1}) diff --git a/test/router/retry_reads.result b/test/router/retry_reads.result index 64b0ff3..b803ae3 100644 --- a/test/router/retry_reads.result +++ b/test/router/retry_reads.result @@ -37,7 +37,7 @@ test_run:cmd('switch router_1') util = require('util') --- ... -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] --- ... 
min_timeout = vshard.consts.CALL_TIMEOUT_MIN diff --git a/test/router/retry_reads.test.lua b/test/router/retry_reads.test.lua index 2fb2fc7..510e961 100644 --- a/test/router/retry_reads.test.lua +++ b/test/router/retry_reads.test.lua @@ -13,7 +13,7 @@ test_run:cmd("start server router_1") test_run:cmd('switch router_1') util = require('util') -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] min_timeout = vshard.consts.CALL_TIMEOUT_MIN -- diff --git a/test/router/router.result b/test/router/router.result index 45394e1..ceaf672 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -70,10 +70,10 @@ test_run:grep_log('router_1', 'connected to ') --- - 'connected to ' ... -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] --- ... -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]] --- ... fiber = require('fiber') @@ -95,7 +95,7 @@ rs2.replica == rs2.master -- Part of gh-76: on reconfiguration do not recreate connections -- to replicas, that are kept in a new configuration. -- -old_replicasets = vshard.router.internal.replicasets +old_replicasets = vshard.router.internal.static_router.replicasets --- ... old_connections = {} @@ -127,17 +127,17 @@ connection_count == 4 vshard.router.cfg(cfg) --- ... -new_replicasets = vshard.router.internal.replicasets +new_replicasets = vshard.router.internal.static_router.replicasets --- ... old_replicasets ~= new_replicasets --- - true ... -rs1 = vshard.router.internal.replicasets[replicasets[1]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] --- ... -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]] --- ... 
while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end @@ -225,7 +225,7 @@ vshard.router.bootstrap() -- -- gh-108: negative bucket count on discovery. -- -vshard.router.internal.route_map = {} +vshard.router.internal.static_router.route_map = {} --- ... rets = {} @@ -456,7 +456,7 @@ conn.state rs_uuid = '<replicaset_2>' --- ... -rs = vshard.router.internal.replicasets[rs_uuid] +rs = vshard.router.internal.static_router.replicasets[rs_uuid] --- ... master = rs.master @@ -605,7 +605,7 @@ vshard.router.info() ... -- Remove replica and master connections to trigger alert -- UNREACHABLE_REPLICASET. -rs = vshard.router.internal.replicasets[replicasets[1]] +rs = vshard.router.internal.static_router.replicasets[replicasets[1]] --- ... master_conn = rs.master.conn @@ -749,7 +749,7 @@ test_run:cmd("setopt delimiter ';'") ... function calculate_known_buckets() local known_buckets = 0 - for _, rs in pairs(vshard.router.internal.route_map) do + for _, rs in pairs(vshard.router.internal.static_router.route_map) do known_buckets = known_buckets + 1 end return known_buckets @@ -851,10 +851,10 @@ test_run:cmd("setopt delimiter ';'") - true ... for i = 1, 100 do - local rs = vshard.router.internal.route_map[i] + local rs = vshard.router.internal.static_router.route_map[i] assert(rs) rs.bucket_count = rs.bucket_count - 1 - vshard.router.internal.route_map[i] = nil + vshard.router.internal.static_router.route_map[i] = nil end; --- ... @@ -999,7 +999,7 @@ vshard.router.sync(100500) -- object method like this: object.method() instead of -- object:method(), an appropriate help-error returns. -- -_, replicaset = next(vshard.router.internal.replicasets) +_, replicaset = next(vshard.router.internal.static_router.replicasets) --- ... error_messages = {} @@ -1069,7 +1069,7 @@ test_run:cmd("setopt delimiter ';'") --- - true ... 
-for bucket, rs in pairs(vshard.router.internal.route_map) do +for bucket, rs in pairs(vshard.router.internal.static_router.route_map) do bucket_to_old_rs[bucket] = rs bucket_cnt = bucket_cnt + 1 end; @@ -1084,7 +1084,7 @@ vshard.router.cfg(cfg); ... for bucket, old_rs in pairs(bucket_to_old_rs) do local old_uuid = old_rs.uuid - local rs = vshard.router.internal.route_map[bucket] + local rs = vshard.router.internal.static_router.route_map[bucket] if not rs or not old_uuid == rs.uuid then error("Bucket lost during reconfigure.") end @@ -1111,7 +1111,7 @@ end; vshard.router.cfg(cfg); --- ... -vshard.router.internal.route_map = {}; +vshard.router.internal.static_router.route_map = {}; --- ... vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false; @@ -1119,7 +1119,7 @@ vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false; ... -- Do discovery iteration. Upload buckets from the -- first replicaset. -while not next(vshard.router.internal.route_map) do +while not next(vshard.router.internal.static_router.route_map) do vshard.router.discovery_wakeup() fiber.sleep(0.01) end; @@ -1128,12 +1128,12 @@ end; new_replicasets = {}; --- ... -for _, rs in pairs(vshard.router.internal.replicasets) do +for _, rs in pairs(vshard.router.internal.static_router.replicasets) do new_replicasets[rs] = true end; --- ... -_, rs = next(vshard.router.internal.route_map); +_, rs = next(vshard.router.internal.static_router.route_map); --- ... new_replicasets[rs] == true; @@ -1185,6 +1185,17 @@ vshard.router.route(1):callro('echo', {'some_data'}) - null - null ... +-- Multiple routers: check that static router can be used as an +-- object. +static_router = vshard.router.internal.static_router +--- +... +static_router:route(1):callro('echo', {'some_data'}) +--- +- some_data +- null +- null +... _ = test_run:cmd("switch default") --- ... 
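A note on the test added above: it relies on the old function-style API and the object-style API reaching the same router. Below is a minimal sketch of that delegation in plain Lua. It is a simplified model, not the vshard implementation: `router_new`, `static_route` and the flat `route_map` contents are hypothetical names and data used only for illustration. Only the error text is taken from the patch.

```lua
-- Simplified model of "static router usable as an object".
-- Assumption: all names here are illustrative, not vshard internals.
local M = { routers = {}, static_router = nil }

local router_mt = {
    __index = {
        -- Resolve a bucket id using this router's own route map.
        route = function(router, bucket_id)
            return router.route_map[bucket_id]
        end,
    },
}

-- Create a named router; names must be unique.
local function router_new(name, route_map)
    if M.routers[name] then
        return nil, string.format('Router with name %s already exists', name)
    end
    local router = setmetatable({name = name, route_map = route_map},
                                router_mt)
    M.routers[name] = router
    return router
end

-- Old-style API: a plain function that forwards to the static router.
local function static_route(bucket_id)
    return M.static_router:route(bucket_id)
end

M.static_router = router_new('_static_router', {[1] = 'replicaset_1'})
local other = router_new('other', {[1] = 'replicaset_2'})
```

With this shape, `static_route(1)` and `M.static_router:route(1)` give the same answer, while `other:route(1)` consults its own map. That is the property the new test case checks for the real router.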
diff --git a/test/router/router.test.lua b/test/router/router.test.lua index df2f381..d7588f7 100644 --- a/test/router/router.test.lua +++ b/test/router/router.test.lua @@ -27,8 +27,8 @@ util = require('util') -- gh-24: log all connnect/disconnect events. test_run:grep_log('router_1', 'connected to ') -rs1 = vshard.router.internal.replicasets[replicasets[1]] -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]] fiber = require('fiber') while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end -- With no zones the nearest server is master. @@ -39,7 +39,7 @@ rs2.replica == rs2.master -- Part of gh-76: on reconfiguration do not recreate connections -- to replicas, that are kept in a new configuration. -- -old_replicasets = vshard.router.internal.replicasets +old_replicasets = vshard.router.internal.static_router.replicasets old_connections = {} connection_count = 0 test_run:cmd("setopt delimiter ';'") @@ -52,10 +52,10 @@ end; test_run:cmd("setopt delimiter ''"); connection_count == 4 vshard.router.cfg(cfg) -new_replicasets = vshard.router.internal.replicasets +new_replicasets = vshard.router.internal.static_router.replicasets old_replicasets ~= new_replicasets -rs1 = vshard.router.internal.replicasets[replicasets[1]] -rs2 = vshard.router.internal.replicasets[replicasets[2]] +rs1 = vshard.router.internal.static_router.replicasets[replicasets[1]] +rs2 = vshard.router.internal.static_router.replicasets[replicasets[2]] while not rs1.replica or not rs2.replica do fiber.sleep(0.1) end vshard.router.discovery_wakeup() -- Check that netbox connections are the same. @@ -91,7 +91,7 @@ vshard.router.bootstrap() -- -- gh-108: negative bucket count on discovery. 
-- -vshard.router.internal.route_map = {} +vshard.router.internal.static_router.route_map = {} rets = {} function do_echo() table.insert(rets, vshard.router.callro(1, 'echo', {1})) end f1 = fiber.create(do_echo) f2 = fiber.create(do_echo) @@ -153,7 +153,7 @@ conn = vshard.router.route(1).master.conn conn.state -- Test missing master. rs_uuid = 'ac522f65-aa94-4134-9f64-51ee384f1a54' -rs = vshard.router.internal.replicasets[rs_uuid] +rs = vshard.router.internal.static_router.replicasets[rs_uuid] master = rs.master rs.master = nil vshard.router.route(1).master @@ -223,7 +223,7 @@ vshard.router.info() -- Remove replica and master connections to trigger alert -- UNREACHABLE_REPLICASET. -rs = vshard.router.internal.replicasets[replicasets[1]] +rs = vshard.router.internal.static_router.replicasets[replicasets[1]] master_conn = rs.master.conn replica_conn = rs.replica.conn rs.master.conn = nil @@ -261,7 +261,7 @@ util.check_error(vshard.router.buckets_info, 123, '456') test_run:cmd("setopt delimiter ';'") function calculate_known_buckets() local known_buckets = 0 - for _, rs in pairs(vshard.router.internal.route_map) do + for _, rs in pairs(vshard.router.internal.static_router.route_map) do known_buckets = known_buckets + 1 end return known_buckets @@ -301,10 +301,10 @@ test_run:switch('router_1') -- test_run:cmd("setopt delimiter ';'") for i = 1, 100 do - local rs = vshard.router.internal.route_map[i] + local rs = vshard.router.internal.static_router.route_map[i] assert(rs) rs.bucket_count = rs.bucket_count - 1 - vshard.router.internal.route_map[i] = nil + vshard.router.internal.static_router.route_map[i] = nil end; test_run:cmd("setopt delimiter ''"); calculate_known_buckets() @@ -367,7 +367,7 @@ vshard.router.sync(100500) -- object method like this: object.method() instead of -- object:method(), an appropriate help-error returns. 
-- -_, replicaset = next(vshard.router.internal.replicasets) +_, replicaset = next(vshard.router.internal.static_router.replicasets) error_messages = {} test_run:cmd("setopt delimiter ';'") @@ -395,7 +395,7 @@ error_messages bucket_to_old_rs = {} bucket_cnt = 0 test_run:cmd("setopt delimiter ';'") -for bucket, rs in pairs(vshard.router.internal.route_map) do +for bucket, rs in pairs(vshard.router.internal.static_router.route_map) do bucket_to_old_rs[bucket] = rs bucket_cnt = bucket_cnt + 1 end; @@ -403,7 +403,7 @@ bucket_cnt; vshard.router.cfg(cfg); for bucket, old_rs in pairs(bucket_to_old_rs) do local old_uuid = old_rs.uuid - local rs = vshard.router.internal.route_map[bucket] + local rs = vshard.router.internal.static_router.route_map[bucket] if not rs or not old_uuid == rs.uuid then error("Bucket lost during reconfigure.") end @@ -423,19 +423,19 @@ while vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY ~= 'waiting' do fiber.sleep(0.02) end; vshard.router.cfg(cfg); -vshard.router.internal.route_map = {}; +vshard.router.internal.static_router.route_map = {}; vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false; -- Do discovery iteration. Upload buckets from the -- first replicaset. 
-while not next(vshard.router.internal.route_map) do +while not next(vshard.router.internal.static_router.route_map) do vshard.router.discovery_wakeup() fiber.sleep(0.01) end; new_replicasets = {}; -for _, rs in pairs(vshard.router.internal.replicasets) do +for _, rs in pairs(vshard.router.internal.static_router.replicasets) do new_replicasets[rs] = true end; -_, rs = next(vshard.router.internal.route_map); +_, rs = next(vshard.router.internal.static_router.route_map); new_replicasets[rs] == true; test_run:cmd("setopt delimiter ''"); @@ -453,6 +453,11 @@ vshard.router.internal.errinj.ERRINJ_CFG = false util.has_same_fields(old_internal, vshard.router.internal) vshard.router.route(1):callro('echo', {'some_data'}) +-- Multiple routers: check that static router can be used as an +-- object. +static_router = vshard.router.internal.static_router +static_router:route(1):callro('echo', {'some_data'}) + _ = test_run:cmd("switch default") test_run:drop_cluster(REPLICASET_2) diff --git a/vshard/error.lua b/vshard/error.lua index f79107b..da92b58 100644 --- a/vshard/error.lua +++ b/vshard/error.lua @@ -105,7 +105,12 @@ local error_message_template = { name = 'OBJECT_IS_OUTDATED', msg = 'Object is outdated after module reload/reconfigure. ' .. 'Use new instance.' - } + }, + [21] = { + name = 'ROUTER_ALREADY_EXISTS', + msg = 'Router with name %s already exists', + args = {'name'}, + }, } -- diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 59c25a0..b31f7dc 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -25,14 +25,33 @@ local M = rawget(_G, MODULE_INTERNALS) if not M then M = { ---------------- Common module attributes ---------------- - -- The last passed configuration. - current_cfg = nil, errinj = { ERRINJ_CFG = false, ERRINJ_FAILOVER_CHANGE_CFG = false, ERRINJ_RELOAD = false, ERRINJ_LONG_DISCOVERY = false, }, + -- Dictionary, key is router name, value is a router. 
+ routers = {}, + -- Router object which can be accessed by old api: + -- e.g. vshard.router.call(...) + static_router = nil, + -- This counter is used to restart background fibers with + -- new reloaded code. + module_version = 0, + -- Number of routers which require collecting Lua garbage. + collect_lua_garbage_cnt = 0, + } +end + +-- +-- Router object attributes. +-- +local ROUTER_TEMPLATE = { + -- Name of router. + name = nil, + -- The last passed configuration. + current_cfg = nil, -- Time to outdate old objects on reload. connection_outdate_delay = nil, -- Bucket map cache. @@ -47,38 +66,60 @@ if not M then total_bucket_count = 0, -- Boolean lua_gc state (create periodic gc task). collect_lua_garbage = nil, - -- This counter is used to restart background fibers with - -- new reloaded code. - module_version = 0, - } -end +} + +local STATIC_ROUTER_NAME = '_static_router' -- Set a bucket to a replicaset. -local function bucket_set(bucket_id, rs_uuid) - local replicaset = M.replicasets[rs_uuid] +local function bucket_set(router, bucket_id, rs_uuid) + local replicaset = router.replicasets[rs_uuid] -- It is technically possible to delete a replicaset at the -- same time when route to the bucket is discovered. if not replicaset then return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, bucket_id) end - local old_replicaset = M.route_map[bucket_id] + local old_replicaset = router.route_map[bucket_id] if old_replicaset ~= replicaset then if old_replicaset then old_replicaset.bucket_count = old_replicaset.bucket_count - 1 end replicaset.bucket_count = replicaset.bucket_count + 1 end - M.route_map[bucket_id] = replicaset + router.route_map[bucket_id] = replicaset return replicaset end -- Remove a bucket from the cache.
-local function bucket_reset(bucket_id) - local replicaset = M.route_map[bucket_id] +local function bucket_reset(router, bucket_id) + local replicaset = router.route_map[bucket_id] if replicaset then replicaset.bucket_count = replicaset.bucket_count - 1 end - M.route_map[bucket_id] = nil + router.route_map[bucket_id] = nil +end + +-------------------------------------------------------------------------------- +-- Helpers +-------------------------------------------------------------------------------- + +-- +-- Increase/decrease the number of routers which require Lua garbage +-- collection and change the state of the `lua_gc` fiber accordingly. +-- + +local function lua_gc_cnt_inc() + M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt + 1 + if M.collect_lua_garbage_cnt == 1 then + lua_gc.set_state(true, consts.COLLECT_LUA_GARBAGE_INTERVAL) + end +end + +local function lua_gc_cnt_dec() + M.collect_lua_garbage_cnt = M.collect_lua_garbage_cnt - 1 + assert(M.collect_lua_garbage_cnt >= 0) + if M.collect_lua_garbage_cnt == 0 then + lua_gc.set_state(false, consts.COLLECT_LUA_GARBAGE_INTERVAL) + end end -------------------------------------------------------------------------------- @@ -86,8 +127,8 @@ end -------------------------------------------------------------------------------- -- Search bucket in whole cluster -local function bucket_discovery(bucket_id) - local replicaset = M.route_map[bucket_id] +local function bucket_discovery(router, bucket_id) + local replicaset = router.route_map[bucket_id] if replicaset ~= nil then return replicaset end @@ -95,11 +136,11 @@ local function bucket_discovery(bucket_id) log.verbose("Discovering bucket %d", bucket_id) local last_err = nil local unreachable_uuid = nil - for uuid, replicaset in pairs(M.replicasets) do + for uuid, replicaset in pairs(router.replicasets) do local _, err = replicaset:callrw('vshard.storage.bucket_stat', {bucket_id}) if err == nil then - return bucket_set(bucket_id, replicaset.uuid) + return bucket_set(router,
bucket_id, replicaset.uuid) elseif err.code ~= lerror.code.WRONG_BUCKET then last_err = err unreachable_uuid = uuid @@ -128,14 +169,14 @@ local function bucket_discovery(bucket_id) end -- Resolve bucket id to replicaset uuid -local function bucket_resolve(bucket_id) +local function bucket_resolve(router, bucket_id) local replicaset, err - local replicaset = M.route_map[bucket_id] + local replicaset = router.route_map[bucket_id] if replicaset ~= nil then return replicaset end -- Replicaset removed from cluster, perform discovery - replicaset, err = bucket_discovery(bucket_id) + replicaset, err = bucket_discovery(router, bucket_id) if replicaset == nil then return nil, err end @@ -146,14 +187,14 @@ end -- Background fiber to perform discovery. It periodically scans -- replicasets one by one and updates route_map. -- -local function discovery_f() +local function discovery_f(router) local module_version = M.module_version while module_version == M.module_version do - while not next(M.replicasets) do + while not next(router.replicasets) do lfiber.sleep(consts.DISCOVERY_INTERVAL) end - local old_replicasets = M.replicasets - for rs_uuid, replicaset in pairs(M.replicasets) do + local old_replicasets = router.replicasets + for rs_uuid, replicaset in pairs(router.replicasets) do local active_buckets, err = replicaset:callro('vshard.storage.buckets_discovery', {}, {timeout = 2}) @@ -163,7 +204,7 @@ local function discovery_f() end -- Renew replicasets object captured by the for loop -- in case of reconfigure and reload events. 
- if M.replicasets ~= old_replicasets then + if router.replicasets ~= old_replicasets then break end if not active_buckets then @@ -176,11 +217,11 @@ local function discovery_f() end replicaset.bucket_count = #active_buckets for _, bucket_id in pairs(active_buckets) do - local old_rs = M.route_map[bucket_id] + local old_rs = router.route_map[bucket_id] if old_rs and old_rs ~= replicaset then old_rs.bucket_count = old_rs.bucket_count - 1 end - M.route_map[bucket_id] = replicaset + router.route_map[bucket_id] = replicaset end end lfiber.sleep(consts.DISCOVERY_INTERVAL) @@ -191,9 +232,9 @@ end -- -- Immediately wakeup discovery fiber if exists. -- -local function discovery_wakeup() - if M.discovery_fiber then - M.discovery_fiber:wakeup() +local function discovery_wakeup(router) + if router.discovery_fiber then + router.discovery_fiber:wakeup() end end @@ -205,7 +246,7 @@ end -- Function will restart operation after wrong bucket response until timeout -- is reached -- -local function router_call(bucket_id, mode, func, args, opts) +local function router_call(router, bucket_id, mode, func, args, opts) if opts and (type(opts) ~= 'table' or (opts.timeout and type(opts.timeout) ~= 'number')) then error('Usage: call(bucket_id, mode, func, args, opts)') @@ -213,7 +254,7 @@ local function router_call(bucket_id, mode, func, args, opts) local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN local replicaset, err local tend = lfiber.time() + timeout - if bucket_id > M.total_bucket_count or bucket_id <= 0 then + if bucket_id > router.total_bucket_count or bucket_id <= 0 then error('Bucket is unreachable: bucket id is out of range') end local call @@ -223,7 +264,7 @@ local function router_call(bucket_id, mode, func, args, opts) call = 'callrw' end repeat - replicaset, err = bucket_resolve(bucket_id) + replicaset, err = bucket_resolve(router, bucket_id) if replicaset then ::replicaset_is_found:: local storage_call_status, call_status, call_error = @@ -239,9 +280,9 @@ local 
function router_call(bucket_id, mode, func, args, opts) end err = call_status if err.code == lerror.code.WRONG_BUCKET then - bucket_reset(bucket_id) + bucket_reset(router, bucket_id) if err.destination then - replicaset = M.replicasets[err.destination] + replicaset = router.replicasets[err.destination] if not replicaset then log.warn('Replicaset "%s" was not found, but received'.. ' from storage as destination - please '.. @@ -253,13 +294,14 @@ local function router_call(bucket_id, mode, func, args, opts) -- but already is executed on storages. while lfiber.time() <= tend do lfiber.sleep(0.05) - replicaset = M.replicasets[err.destination] + replicaset = router.replicasets[err.destination] if replicaset then goto replicaset_is_found end end else - replicaset = bucket_set(bucket_id, replicaset.uuid) + replicaset = bucket_set(router, bucket_id, + replicaset.uuid) lfiber.yield() -- Protect against infinite cycle in a -- case of broken cluster, when a bucket @@ -276,7 +318,7 @@ local function router_call(bucket_id, mode, func, args, opts) -- is not timeout - these requests are repeated in -- any case on client, if error. assert(mode == 'write') - bucket_reset(bucket_id) + bucket_reset(router, bucket_id) return nil, err elseif err.code == lerror.code.NON_MASTER then -- Same, as above - do not wait and repeat. @@ -302,12 +344,12 @@ end -- -- Wrappers for router_call with preset mode. -- -local function router_callro(bucket_id, ...) - return router_call(bucket_id, 'read', ...) +local function router_callro(router, bucket_id, ...) + return router_call(router, bucket_id, 'read', ...) end -local function router_callrw(bucket_id, ...) - return router_call(bucket_id, 'write', ...) +local function router_callrw(router, bucket_id, ...) + return router_call(router, bucket_id, 'write', ...) end -- @@ -315,27 +357,27 @@ end -- @param bucket_id Bucket identifier. -- @retval Netbox connection. 
-- -local function router_route(bucket_id) +local function router_route(router, bucket_id) if type(bucket_id) ~= 'number' then error('Usage: router.route(bucket_id)') end - return bucket_resolve(bucket_id) + return bucket_resolve(router, bucket_id) end -- -- Return map of all replicasets. -- @retval See self.replicasets map. -- -local function router_routeall() - return M.replicasets +local function router_routeall(router) + return router.replicasets end -------------------------------------------------------------------------------- -- Failover -------------------------------------------------------------------------------- -local function failover_ping_round() - for _, replicaset in pairs(M.replicasets) do +local function failover_ping_round(router) + for _, replicaset in pairs(router.replicasets) do local replica = replicaset.replica if replica ~= nil and replica.conn ~= nil and replica.down_ts == nil then @@ -378,10 +420,10 @@ end -- Collect UUIDs of replicasets, priority of whose replica -- connections must be updated. -- -local function failover_collect_to_update() +local function failover_collect_to_update(router) local ts = lfiber.time() local uuid_to_update = {} - for uuid, rs in pairs(M.replicasets) do + for uuid, rs in pairs(router.replicasets) do if failover_need_down_priority(rs, ts) or failover_need_up_priority(rs, ts) then table.insert(uuid_to_update, uuid) @@ -396,16 +438,16 @@ end -- disconnected replicas. -- @retval true A replica of an replicaset has been changed. 
-- -local function failover_step() - failover_ping_round() - local uuid_to_update = failover_collect_to_update() +local function failover_step(router) + failover_ping_round(router) + local uuid_to_update = failover_collect_to_update(router) if #uuid_to_update == 0 then return false end local curr_ts = lfiber.time() local replica_is_changed = false for _, uuid in pairs(uuid_to_update) do - local rs = M.replicasets[uuid] + local rs = router.replicasets[uuid] if M.errinj.ERRINJ_FAILOVER_CHANGE_CFG then rs = nil M.errinj.ERRINJ_FAILOVER_CHANGE_CFG = false @@ -447,7 +489,7 @@ end -- tries to reconnect to the best replica. When the connection is -- established, it replaces the original replica. -- -local function failover_f() +local function failover_f(router) local module_version = M.module_version local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT, consts.FAILOVER_DOWN_TIMEOUT) @@ -457,7 +499,7 @@ local function failover_f() local prev_was_ok = false while module_version == M.module_version do ::continue:: - local ok, replica_is_changed = pcall(failover_step) + local ok, replica_is_changed = pcall(failover_step, router) if not ok then log.error('Error during failovering: %s', lerror.make(replica_is_changed)) @@ -484,8 +526,8 @@ end -- Configuration -------------------------------------------------------------------------------- -local function router_cfg(cfg, is_reload) - cfg = lcfg.check(cfg, M.current_cfg) +local function router_cfg(router, cfg, is_reload) + cfg = lcfg.check(cfg, router.current_cfg) local vshard_cfg, box_cfg = lcfg.split(cfg) - if not M.replicasets then + if not router.replicasets then log.info('Starting router configuration') @@ -511,41 +553,47 @@ local function router_cfg(cfg, is_reload) -- Move connections from an old configuration to a new one. -- It must be done with no yields to prevent usage both of not -- fully moved old replicasets, and not fully built new ones.
- lreplicaset.rebind_replicasets(new_replicasets, M.replicasets) + lreplicaset.rebind_replicasets(new_replicasets, router.replicasets) -- Now the new replicasets are fully built. Can establish -- connections and yield. for _, replicaset in pairs(new_replicasets) do replicaset:connect_all() end + -- Change state of lua GC. + if vshard_cfg.collect_lua_garbage and not router.collect_lua_garbage then + lua_gc_cnt_inc() + elseif not vshard_cfg.collect_lua_garbage and + router.collect_lua_garbage then + lua_gc_cnt_dec() + end lreplicaset.wait_masters_connect(new_replicasets) - lreplicaset.outdate_replicasets(M.replicasets, + lreplicaset.outdate_replicasets(router.replicasets, vshard_cfg.connection_outdate_delay) - M.connection_outdate_delay = vshard_cfg.connection_outdate_delay - M.total_bucket_count = total_bucket_count - M.collect_lua_garbage = vshard_cfg.collect_lua_garbage - M.current_cfg = cfg - M.replicasets = new_replicasets - for bucket, rs in pairs(M.route_map) do - M.route_map[bucket] = M.replicasets[rs.uuid] - end - if M.failover_fiber == nil then - M.failover_fiber = util.reloadable_fiber_create('vshard.failover', M, - 'failover_f') + router.connection_outdate_delay = vshard_cfg.connection_outdate_delay + router.total_bucket_count = total_bucket_count + router.collect_lua_garbage = vshard_cfg.collect_lua_garbage + router.current_cfg = cfg + router.replicasets = new_replicasets + for bucket, rs in pairs(router.route_map) do + router.route_map[bucket] = router.replicasets[rs.uuid] + end + if router.failover_fiber == nil then + router.failover_fiber = util.reloadable_fiber_create( + 'vshard.failover.' .. router.name, M, 'failover_f', router) + end + if router.discovery_fiber == nil then + router.discovery_fiber = util.reloadable_fiber_create( + 'vshard.discovery.' .. 
router.name, M, 'discovery_f', router)
     end
-    if M.discovery_fiber == nil then
-        M.discovery_fiber = util.reloadable_fiber_create('vshard.discovery', M,
-                                                         'discovery_f')
-    end
-    lua_gc.set_state(M.collect_lua_garbage, consts.COLLECT_LUA_GARBAGE_INTERVAL)
 end

 --------------------------------------------------------------------------------
 -- Bootstrap
 --------------------------------------------------------------------------------

-local function cluster_bootstrap()
+local function cluster_bootstrap(router)
     local replicasets = {}
-    for uuid, replicaset in pairs(M.replicasets) do
+    for uuid, replicaset in pairs(router.replicasets) do
         table.insert(replicasets, replicaset)
         local count, err = replicaset:callrw('vshard.storage.buckets_count',
                                              {})
@@ -556,9 +604,10 @@ local function cluster_bootstrap()
             return nil, lerror.vshard(lerror.code.NON_EMPTY)
         end
     end
-    lreplicaset.calculate_etalon_balance(M.replicasets, M.total_bucket_count)
+    lreplicaset.calculate_etalon_balance(router.replicasets,
+                                         router.total_bucket_count)
     local bucket_id = 1
-    for uuid, replicaset in pairs(M.replicasets) do
+    for uuid, replicaset in pairs(router.replicasets) do
         if replicaset.etalon_bucket_count > 0 then
             local ok, err =
                 replicaset:callrw('vshard.storage.bucket_force_create',
@@ -614,7 +663,7 @@ local function replicaset_instance_info(replicaset, name, alerts, errcolor,
     return info, consts.STATUS.GREEN
 end

-local function router_info()
+local function router_info(router)
     local state = {
         replicasets = {},
         bucket = {
@@ -628,7 +677,7 @@ local function router_info()
     }
     local bucket_info = state.bucket
     local known_bucket_count = 0
-    for rs_uuid, replicaset in pairs(M.replicasets) do
+    for rs_uuid, replicaset in pairs(router.replicasets) do
         -- Replicaset info parameters:
         -- * master instance info;
         -- * replica instance info;
@@ -716,7 +765,7 @@ local function router_info()
         -- If a bucket is unreachable, then replicaset is
         -- unreachable too and color already is red.
     end
-    bucket_info.unknown = M.total_bucket_count - known_bucket_count
+    bucket_info.unknown = router.total_bucket_count - known_bucket_count
     if bucket_info.unknown > 0 then
         state.status = math.max(state.status, consts.STATUS.YELLOW)
         table.insert(state.alerts, lerror.alert(lerror.code.UNKNOWN_BUCKETS,
@@ -733,13 +782,13 @@ end
 -- @param limit Maximal bucket count in output.
 -- @retval Map of type {bucket_id = 'unknown'/replicaset_uuid}.
 --
-local function router_buckets_info(offset, limit)
+local function router_buckets_info(router, offset, limit)
     if offset ~= nil and type(offset) ~= 'number' or
        limit ~= nil and type(limit) ~= 'number' then
         error('Usage: buckets_info(offset, limit)')
     end
     offset = offset or 0
-    limit = limit or M.total_bucket_count
+    limit = limit or router.total_bucket_count
     local ret = {}
     -- Use one string memory for all unknown buckets.
     local available_rw = 'available_rw'
@@ -748,9 +797,9 @@ local function router_buckets_info(offset, limit)
     local unreachable = 'unreachable'
     -- Collect limit.
     local first = math.max(1, offset + 1)
-    local last = math.min(offset + limit, M.total_bucket_count)
+    local last = math.min(offset + limit, router.total_bucket_count)
     for bucket_id = first, last do
-        local rs = M.route_map[bucket_id]
+        local rs = router.route_map[bucket_id]
         if rs then
             if rs.master and rs.master:is_connected() then
                 ret[bucket_id] = {uuid = rs.uuid, status = available_rw}
@@ -770,22 +819,22 @@ end
 -- Other
 --------------------------------------------------------------------------------

-local function router_bucket_id(key)
+local function router_bucket_id(router, key)
     if key == nil then
         error("Usage: vshard.router.bucket_id(key)")
     end
-    return lhash.key_hash(key) % M.total_bucket_count + 1
+    return lhash.key_hash(key) % router.total_bucket_count + 1
 end

-local function router_bucket_count()
-    return M.total_bucket_count
+local function router_bucket_count(router)
+    return router.total_bucket_count
 end

-local function router_sync(timeout)
+local function router_sync(router, timeout)
     if timeout ~= nil and type(timeout) ~= 'number' then
         error('Usage: vshard.router.sync([timeout: number])')
     end
-    for rs_uuid, replicaset in pairs(M.replicasets) do
+    for rs_uuid, replicaset in pairs(router.replicasets) do
         local status, err = replicaset:callrw('vshard.storage.sync', {timeout})
         if not status then
             -- Add information about replicaset
@@ -799,6 +848,90 @@ if M.errinj.ERRINJ_RELOAD then
     error('Error injection: reload')
 end

+--------------------------------------------------------------------------------
+-- Managing router instances
+--------------------------------------------------------------------------------
+
+local function cfg_reconfigure(router, cfg)
+    return router_cfg(router, cfg, false)
+end
+
+local router_mt = {
+    __index = {
+        cfg = cfg_reconfigure;
+        info = router_info;
+        buckets_info = router_buckets_info;
+        call = router_call;
+        callro = router_callro;
+        callrw = router_callrw;
+        route = router_route;
+        routeall = router_routeall;
+        bucket_id =
router_bucket_id;
+        bucket_count = router_bucket_count;
+        sync = router_sync;
+        bootstrap = cluster_bootstrap;
+        bucket_discovery = bucket_discovery;
+        discovery_wakeup = discovery_wakeup;
+    }
+}
+
+-- Table which represents this module.
+local module = {}
+
+-- This metatable bypasses calls to a module to the static_router.
+local module_mt = {__index = {}}
+for method_name, method in pairs(router_mt.__index) do
+    module_mt.__index[method_name] = function(...)
+        return method(M.static_router, ...)
+    end
+end
+
+local function export_static_router_attributes()
+    setmetatable(module, module_mt)
+end
+
+--
+-- Create a new instance of router.
+-- @param name Name of a new router.
+-- @param cfg Configuration for `router_cfg`.
+-- @retval Router instance.
+-- @retval Nil and error object.
+--
+local function router_new(name, cfg)
+    if type(name) ~= 'string' or type(cfg) ~= 'table' then
+        error('Wrong argument type. Usage: vshard.router.new(name, cfg).')
+    end
+    if M.routers[name] then
+        return nil, lerror.vshard(lerror.code.ROUTER_ALREADY_EXISTS, name)
+    end
+    local router = table.deepcopy(ROUTER_TEMPLATE)
+    setmetatable(router, router_mt)
+    router.name = name
+    M.routers[name] = router
+    router_cfg(router, cfg)
+    return router
+end
+
+--
+-- Wrapper around a `router_new` API, which allow to use old
+-- static `vshard.router.cfg()` API.
+--
+local function legacy_cfg(cfg)
+    if M.static_router then
+        -- Reconfigure.
+        router_cfg(M.static_router, cfg, false)
+    else
+        -- Create new static instance.
+        local router, err = router_new(STATIC_ROUTER_NAME, cfg)
+        if router then
+            M.static_router = router
+            export_static_router_attributes()
+        else
+            return nil, err
+        end
+    end
+end
+
 --------------------------------------------------------------------------------
 -- Module definition
 --------------------------------------------------------------------------------
@@ -809,28 +942,23 @@ end
 if not rawget(_G, MODULE_INTERNALS) then
     rawset(_G, MODULE_INTERNALS, M)
 else
-    router_cfg(M.current_cfg, true)
+    for _, router in pairs(M.routers) do
+        router_cfg(router, router.current_cfg, true)
+        setmetatable(router, router_mt)
+    end
+    if M.static_router then
+        export_static_router_attributes()
+    end
     M.module_version = M.module_version + 1
 end

 M.discovery_f = discovery_f
 M.failover_f = failover_f
+M.router_mt = router_mt

-return {
-    cfg = function(cfg) return router_cfg(cfg, false) end;
-    info = router_info;
-    buckets_info = router_buckets_info;
-    call = router_call;
-    callro = router_callro;
-    callrw = router_callrw;
-    route = router_route;
-    routeall = router_routeall;
-    bucket_id = router_bucket_id;
-    bucket_count = router_bucket_count;
-    sync = router_sync;
-    bootstrap = cluster_bootstrap;
-    bucket_discovery = bucket_discovery;
-    discovery_wakeup = discovery_wakeup;
-    internal = M;
-    module_version = function() return M.module_version end;
-}
+module.cfg = legacy_cfg
+module.new = router_new
+module.internal = M
+module.module_version = function() return M.module_version end
+
+return module
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 0593edf..63aa96f 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1632,8 +1632,6 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
     M.rebalancer_max_receiving = rebalancer_max_receiving
     M.shard_index = shard_index
     M.collect_bucket_garbage_interval = collect_bucket_garbage_interval
-    M.collect_lua_garbage = collect_lua_garbage
-    M.current_cfg = cfg
     M.collect_lua_garbage =
vshard_cfg.collect_lua_garbage
     M.current_cfg = cfg
diff --git a/vshard/util.lua b/vshard/util.lua
index 37abe2b..3afaa61 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -38,11 +38,11 @@ end
 -- reload of that module.
 -- See description of parameters in `reloadable_fiber_create`.
 --
-local function reloadable_fiber_main_loop(module, func_name)
+local function reloadable_fiber_main_loop(module, func_name, data)
     log.info('%s has been started', func_name)
     local func = module[func_name]
 ::restart_loop::
-    local ok, err = pcall(func)
+    local ok, err = pcall(func, data)
     -- yield serves two purposes:
     --  * makes this fiber cancellable
     --  * prevents 100% cpu consumption
@@ -60,7 +60,7 @@ local function reloadable_fiber_main_loop(module, func_name)
     log.info('module is reloaded, restarting')
     -- luajit drops this frame if next function is called in
     -- return statement.
-    return M.reloadable_fiber_main_loop(module, func_name)
+    return M.reloadable_fiber_main_loop(module, func_name, data)
 end

 --
@@ -74,11 +74,13 @@ end
 -- @param module Module which can be reloaded.
 -- @param func_name Name of a function to be executed in the
 --        module.
+-- @param data Data to be passed to the specified function.
 -- @retval New fiber.
 --
-local function reloadable_fiber_create(fiber_name, module, func_name)
+local function reloadable_fiber_create(fiber_name, module, func_name, data)
     assert(type(fiber_name) == 'string')
-    local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name)
+    local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name,
+                                data)
     xfiber:name(fiber_name)
     return xfiber
 end
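For readers of the archive: below is a minimal standalone sketch of the forwarding scheme that `module_mt` uses in the diff above, where legacy module-level calls are redirected to the static router. It is plain Lua with no Tarantool or vshard dependencies. `make_router`, the local `static_router` variable and the numeric stand-in for `lhash.key_hash()` are assumptions made only for this illustration; they are not part of the patch.

```lua
-- Illustrative sketch only: plain Lua, no Tarantool/vshard modules.
-- make_router() and the numeric hash below are hypothetical helpers.

-- Methods shared by all router objects; each takes the router first.
local router_mt = {
    __index = {
        bucket_count = function(router)
            return router.total_bucket_count
        end,
        bucket_id = function(router, key)
            -- Stand-in for lhash.key_hash(): assumes a numeric key.
            return key % router.total_bucket_count + 1
        end,
    }
}

local function make_router(name, total_bucket_count)
    local router = {name = name, total_bucket_count = total_bucket_count}
    return setmetatable(router, router_mt)
end

-- The module table forwards every method to the static router by
-- prepending it to the argument list.
local static_router = nil
local module = {}
local module_mt = {__index = {}}
for method_name, method in pairs(router_mt.__index) do
    module_mt.__index[method_name] = function(...)
        return method(static_router, ...)
    end
end

-- Before the metatable is attached the module exports no router methods.
assert(module.bucket_count == nil)

static_router = make_router('static', 3000)
setmetatable(module, module_mt)

local other = make_router('other', 100)

print(module.bucket_count())  -- legacy style: dot call, static router
print(other:bucket_count())   -- object style: colon call, named router
print(module.bucket_id(42), other:bucket_id(42))
```

In this sketch legacy callers keep the dot syntax, as in `vshard.router.call(...)`, while named routers are called with the colon syntax because their methods expect the router as the first argument.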