From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 4E7A627841 for ; Tue, 31 Jul 2018 12:25:44 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id xe9GvyxgHqF6 for ; Tue, 31 Jul 2018 12:25:44 -0400 (EDT) Received: from smtp44.i.mail.ru (smtp44.i.mail.ru [94.100.177.104]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 5687727816 for ; Tue, 31 Jul 2018 12:25:43 -0400 (EDT) From: AKhatskevich Subject: [tarantool-patches] [PATCH 3/3] Introduce multiple routers feature Date: Tue, 31 Jul 2018 19:25:28 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: v.shpilevoy@tarantool.org, tarantool-patches@freelists.org Key points: * Old `vshard.router.some_method()` API is preserved. * Add `vshard.router.new(name, cfg)` method which returns a new router. * Each router has its own: 1. name 2. background fibers 3. attributes (route_map, replicasets, outdate_delay...) * Module reload reloads all configured routers. * `cfg` reconfigures a single router. * All routers share the same box configuration. The last passed config overrides the global config. * Multiple router instances can be connected to the same cluster. * For now, a router cannot be destroyed. Extra changes: * Add `data` parameter to `reloadable_fiber_create` function. 
Closes #130 --- test/multiple_routers/configs.lua | 81 ++++++ test/multiple_routers/multiple_routers.result | 226 ++++++++++++++++ test/multiple_routers/multiple_routers.test.lua | 85 ++++++ test/multiple_routers/router_1.lua | 15 ++ test/multiple_routers/storage_1_1_a.lua | 23 ++ test/multiple_routers/storage_1_1_b.lua | 1 + test/multiple_routers/storage_1_2_a.lua | 1 + test/multiple_routers/storage_1_2_b.lua | 1 + test/multiple_routers/storage_2_1_a.lua | 1 + test/multiple_routers/storage_2_1_b.lua | 1 + test/multiple_routers/storage_2_2_a.lua | 1 + test/multiple_routers/storage_2_2_b.lua | 1 + test/multiple_routers/suite.ini | 6 + test/multiple_routers/test.lua | 9 + test/router/router.result | 4 +- test/router/router.test.lua | 4 +- vshard/router/init.lua | 341 ++++++++++++++++-------- vshard/util.lua | 12 +- 18 files changed, 690 insertions(+), 123 deletions(-) create mode 100644 test/multiple_routers/configs.lua create mode 100644 test/multiple_routers/multiple_routers.result create mode 100644 test/multiple_routers/multiple_routers.test.lua create mode 100644 test/multiple_routers/router_1.lua create mode 100644 test/multiple_routers/storage_1_1_a.lua create mode 120000 test/multiple_routers/storage_1_1_b.lua create mode 120000 test/multiple_routers/storage_1_2_a.lua create mode 120000 test/multiple_routers/storage_1_2_b.lua create mode 120000 test/multiple_routers/storage_2_1_a.lua create mode 120000 test/multiple_routers/storage_2_1_b.lua create mode 120000 test/multiple_routers/storage_2_2_a.lua create mode 120000 test/multiple_routers/storage_2_2_b.lua create mode 100644 test/multiple_routers/suite.ini create mode 100644 test/multiple_routers/test.lua diff --git a/test/multiple_routers/configs.lua b/test/multiple_routers/configs.lua new file mode 100644 index 0000000..a6ce33c --- /dev/null +++ b/test/multiple_routers/configs.lua @@ -0,0 +1,81 @@ +names = { + storage_1_1_a = '32a2d4b8-f146-44ed-9d51-2436507efdf8', + storage_1_1_b = 
'c1c849b1-641d-40b8-9283-bcfe73d46270', + storage_1_2_a = '04e677ed-c7ba-47e0-a67f-b5100cfa86af', + storage_1_2_b = 'c7a979ee-9263-4a38-84a5-2fb6a0a32684', + storage_2_1_a = '88dc03f0-23fb-4f05-b462-e29186542864', + storage_2_1_b = '4230b711-f5c4-4131-bf98-88cd43a16901', + storage_2_2_a = '6b1eefbc-1e2e-410e-84ff-44c572ea9916', + storage_2_2_b = 'be74419a-1e56-4ba4-97e9-6b18710f63c5', +} + +rs_1_1 = 'dd208fb8-8b90-49bc-8393-6b3a99da7c52' +rs_1_2 = 'af9cfe88-2091-4613-a877-a623776c5c0e' +rs_2_1 = '9ca8ee15-ae18-4f31-9385-4859f89ce73f' +rs_2_2 = '007f5f58-b654-4125-8441-a71866fb62b5' + +local cfg_1 = {} +cfg_1.sharding = { + [rs_1_1] = { + replicas = { + [names.storage_1_1_a] = { + uri = 'storage:storage@127.0.0.1:3301', + name = 'storage_1_1_a', + master = true, + }, + [names.storage_1_1_b] = { + uri = 'storage:storage@127.0.0.1:3302', + name = 'storage_1_1_b', + }, + } + }, + [rs_1_2] = { + replicas = { + [names.storage_1_2_a] = { + uri = 'storage:storage@127.0.0.1:3303', + name = 'storage_1_2_a', + master = true, + }, + [names.storage_1_2_b] = { + uri = 'storage:storage@127.0.0.1:3304', + name = 'storage_1_2_b', + }, + } + }, +} + + +local cfg_2 = {} +cfg_2.sharding = { + [rs_2_1] = { + replicas = { + [names.storage_2_1_a] = { + uri = 'storage:storage@127.0.0.1:3305', + name = 'storage_2_1_a', + master = true, + }, + [names.storage_2_1_b] = { + uri = 'storage:storage@127.0.0.1:3306', + name = 'storage_2_1_b', + }, + } + }, + [rs_2_2] = { + replicas = { + [names.storage_2_2_a] = { + uri = 'storage:storage@127.0.0.1:3307', + name = 'storage_2_2_a', + master = true, + }, + [names.storage_2_2_b] = { + uri = 'storage:storage@127.0.0.1:3308', + name = 'storage_2_2_b', + }, + } + }, +} + +return { + cfg_1 = cfg_1, + cfg_2 = cfg_2, +} diff --git a/test/multiple_routers/multiple_routers.result b/test/multiple_routers/multiple_routers.result new file mode 100644 index 0000000..33f4034 --- /dev/null +++ b/test/multiple_routers/multiple_routers.result @@ -0,0 +1,226 @@ 
+test_run = require('test_run').new() +--- +... +REPLICASET_1_1 = { 'storage_1_1_a', 'storage_1_1_b' } +--- +... +REPLICASET_1_2 = { 'storage_1_2_a', 'storage_1_2_b' } +--- +... +REPLICASET_2_1 = { 'storage_2_1_a', 'storage_2_1_b' } +--- +... +REPLICASET_2_2 = { 'storage_2_2_a', 'storage_2_2_b' } +--- +... +test_run:create_cluster(REPLICASET_1_1, 'multiple_routers') +--- +... +test_run:create_cluster(REPLICASET_1_2, 'multiple_routers') +--- +... +test_run:create_cluster(REPLICASET_2_1, 'multiple_routers') +--- +... +test_run:create_cluster(REPLICASET_2_2, 'multiple_routers') +--- +... +util = require('lua_libs.util') +--- +... +util.wait_master(test_run, REPLICASET_1_1, 'storage_1_1_a') +--- +... +util.wait_master(test_run, REPLICASET_1_2, 'storage_1_2_a') +--- +... +util.wait_master(test_run, REPLICASET_2_1, 'storage_2_1_a') +--- +... +util.wait_master(test_run, REPLICASET_2_2, 'storage_2_2_a') +--- +... +test_run:cmd("create server router_1 with script='multiple_routers/router_1.lua'") +--- +- true +... +test_run:cmd("start server router_1") +--- +- true +... +-- Configure default (static) router. +_ = test_run:cmd("switch router_1") +--- +... +vshard.router.cfg(configs.cfg_1) +--- +... +vshard.router.bootstrap() +--- +- true +... +_ = test_run:cmd("switch storage_1_2_a") +--- +... +wait_rebalancer_state('The cluster is balanced ok', test_run) +--- +... +_ = test_run:cmd("switch router_1") +--- +... +vshard.router.call(1, 'write', 'do_replace', {{1, 1}}) +--- +- true +... +vshard.router.call(1, 'read', 'do_select', {1}) +--- +- [[1, 1]] +... +-- Configure extra router. +router_2 = vshard.router.new('router_2', configs.cfg_2) +--- +... +router_2:bootstrap() +--- +- true +... +_ = test_run:cmd("switch storage_2_2_a") +--- +... +wait_rebalancer_state('The cluster is balanced ok', test_run) +--- +... +_ = test_run:cmd("switch router_1") +--- +... +router_2:call(1, 'write', 'do_replace', {{2, 2}}) +--- +- true +... 
+router_2:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +-- Check that router_2 and static router serves different clusters. +#router_2:call(1, 'read', 'do_select', {1}) == 0 +--- +- true +... +-- Create several routers to the same cluster. +routers = {} +--- +... +for i = 3, 10 do routers[i] = vshard.router.new('router_' .. i, configs.cfg_2) end +--- +... +routers[3]:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +-- Check that they have their own background fibers. +fiber_names = {} +--- +... +for i = 2, 10 do fiber_names['vshard.failover.router_' .. i] = true; fiber_names['vshard.discovery.router_' .. i] = true; end +--- +... +next(fiber_names) ~= nil +--- +- true +... +fiber = require('fiber') +--- +... +for _, xfiber in pairs(fiber.info()) do fiber_names[xfiber.name] = nil end +--- +... +next(fiber_names) == nil +--- +- true +... +-- Reconfigure one of routers do not affect the others. +routers[3]:cfg(configs.cfg_1) +--- +... +routers[3]:call(1, 'read', 'do_select', {1}) +--- +- [[1, 1]] +... +#routers[3]:call(1, 'read', 'do_select', {2}) == 0 +--- +- true +... +#routers[4]:call(1, 'read', 'do_select', {1}) == 0 +--- +- true +... +routers[4]:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +routers[3]:cfg(configs.cfg_2) +--- +... +-- Try to create router with the same name. +util = require('lua_libs.util') +--- +... +util.check_error(vshard.router.new, 'router_2', configs.cfg_2) +--- +- null +- Router with name router_2 already exists +... +-- Reload router module. +_, old_rs_1 = next(vshard.router.internal.static_router.replicasets) +--- +... +_, old_rs_2 = next(router_2.replicasets) +--- +... +package.loaded['vshard.router'] = nil +--- +... +vshard.router = require('vshard.router') +--- +... +while not old_rs_1.is_outdated do fiber.sleep(0.01) end +--- +... +while not old_rs_2.is_outdated do fiber.sleep(0.01) end +--- +... +vshard.router.call(1, 'read', 'do_select', {1}) +--- +- [[1, 1]] +... 
+router_2:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +routers[5]:call(1, 'read', 'do_select', {2}) +--- +- [[2, 2]] +... +_ = test_run:cmd("switch default") +--- +... +test_run:cmd("stop server router_1") +--- +- true +... +test_run:cmd("cleanup server router_1") +--- +- true +... +test_run:drop_cluster(REPLICASET_1_1) +--- +... +test_run:drop_cluster(REPLICASET_1_2) +--- +... +test_run:drop_cluster(REPLICASET_2_1) +--- +... +test_run:drop_cluster(REPLICASET_2_2) +--- +... diff --git a/test/multiple_routers/multiple_routers.test.lua b/test/multiple_routers/multiple_routers.test.lua new file mode 100644 index 0000000..6d470e1 --- /dev/null +++ b/test/multiple_routers/multiple_routers.test.lua @@ -0,0 +1,85 @@ +test_run = require('test_run').new() + +REPLICASET_1_1 = { 'storage_1_1_a', 'storage_1_1_b' } +REPLICASET_1_2 = { 'storage_1_2_a', 'storage_1_2_b' } +REPLICASET_2_1 = { 'storage_2_1_a', 'storage_2_1_b' } +REPLICASET_2_2 = { 'storage_2_2_a', 'storage_2_2_b' } + +test_run:create_cluster(REPLICASET_1_1, 'multiple_routers') +test_run:create_cluster(REPLICASET_1_2, 'multiple_routers') +test_run:create_cluster(REPLICASET_2_1, 'multiple_routers') +test_run:create_cluster(REPLICASET_2_2, 'multiple_routers') +util = require('lua_libs.util') +util.wait_master(test_run, REPLICASET_1_1, 'storage_1_1_a') +util.wait_master(test_run, REPLICASET_1_2, 'storage_1_2_a') +util.wait_master(test_run, REPLICASET_2_1, 'storage_2_1_a') +util.wait_master(test_run, REPLICASET_2_2, 'storage_2_2_a') + +test_run:cmd("create server router_1 with script='multiple_routers/router_1.lua'") +test_run:cmd("start server router_1") + +-- Configure default (static) router. 
+_ = test_run:cmd("switch router_1") +vshard.router.cfg(configs.cfg_1) +vshard.router.bootstrap() +_ = test_run:cmd("switch storage_1_2_a") +wait_rebalancer_state('The cluster is balanced ok', test_run) +_ = test_run:cmd("switch router_1") + +vshard.router.call(1, 'write', 'do_replace', {{1, 1}}) +vshard.router.call(1, 'read', 'do_select', {1}) + +-- Configure extra router. +router_2 = vshard.router.new('router_2', configs.cfg_2) +router_2:bootstrap() +_ = test_run:cmd("switch storage_2_2_a") +wait_rebalancer_state('The cluster is balanced ok', test_run) +_ = test_run:cmd("switch router_1") + +router_2:call(1, 'write', 'do_replace', {{2, 2}}) +router_2:call(1, 'read', 'do_select', {2}) +-- Check that router_2 and static router serves different clusters. +#router_2:call(1, 'read', 'do_select', {1}) == 0 + +-- Create several routers to the same cluster. +routers = {} +for i = 3, 10 do routers[i] = vshard.router.new('router_' .. i, configs.cfg_2) end +routers[3]:call(1, 'read', 'do_select', {2}) +-- Check that they have their own background fibers. +fiber_names = {} +for i = 2, 10 do fiber_names['vshard.failover.router_' .. i] = true; fiber_names['vshard.discovery.router_' .. i] = true; end +next(fiber_names) ~= nil +fiber = require('fiber') +for _, xfiber in pairs(fiber.info()) do fiber_names[xfiber.name] = nil end +next(fiber_names) == nil + +-- Reconfigure one of routers do not affect the others. +routers[3]:cfg(configs.cfg_1) +routers[3]:call(1, 'read', 'do_select', {1}) +#routers[3]:call(1, 'read', 'do_select', {2}) == 0 +#routers[4]:call(1, 'read', 'do_select', {1}) == 0 +routers[4]:call(1, 'read', 'do_select', {2}) +routers[3]:cfg(configs.cfg_2) + +-- Try to create router with the same name. +util = require('lua_libs.util') +util.check_error(vshard.router.new, 'router_2', configs.cfg_2) + +-- Reload router module. 
+_, old_rs_1 = next(vshard.router.internal.static_router.replicasets) +_, old_rs_2 = next(router_2.replicasets) +package.loaded['vshard.router'] = nil +vshard.router = require('vshard.router') +while not old_rs_1.is_outdated do fiber.sleep(0.01) end +while not old_rs_2.is_outdated do fiber.sleep(0.01) end +vshard.router.call(1, 'read', 'do_select', {1}) +router_2:call(1, 'read', 'do_select', {2}) +routers[5]:call(1, 'read', 'do_select', {2}) + +_ = test_run:cmd("switch default") +test_run:cmd("stop server router_1") +test_run:cmd("cleanup server router_1") +test_run:drop_cluster(REPLICASET_1_1) +test_run:drop_cluster(REPLICASET_1_2) +test_run:drop_cluster(REPLICASET_2_1) +test_run:drop_cluster(REPLICASET_2_2) diff --git a/test/multiple_routers/router_1.lua b/test/multiple_routers/router_1.lua new file mode 100644 index 0000000..2e9ea91 --- /dev/null +++ b/test/multiple_routers/router_1.lua @@ -0,0 +1,15 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +-- Get instance name +local fio = require('fio') +local NAME = fio.basename(arg[0], '.lua') + +require('console').listen(os.getenv('ADMIN')) + +configs = require('configs') + +-- Start the database with sharding +vshard = require('vshard') +box.cfg{} diff --git a/test/multiple_routers/storage_1_1_a.lua b/test/multiple_routers/storage_1_1_a.lua new file mode 100644 index 0000000..b44a97a --- /dev/null +++ b/test/multiple_routers/storage_1_1_a.lua @@ -0,0 +1,23 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +-- Get instance name. +local fio = require('fio') +NAME = fio.basename(arg[0], '.lua') + +require('console').listen(os.getenv('ADMIN')) + +-- Fetch config for the cluster of the instance. +if NAME:sub(9,9) == '1' then + cfg = require('configs').cfg_1 +else + cfg = require('configs').cfg_2 +end + +-- Start the database with sharding. +vshard = require('vshard') +vshard.storage.cfg(cfg, names[NAME]) + +-- Bootstrap storage. 
+require('lua_libs.bootstrap') diff --git a/test/multiple_routers/storage_1_1_b.lua b/test/multiple_routers/storage_1_1_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_1_1_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_1_2_a.lua b/test/multiple_routers/storage_1_2_a.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_1_2_a.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_1_2_b.lua b/test/multiple_routers/storage_1_2_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_1_2_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_1_a.lua b/test/multiple_routers/storage_2_1_a.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_1_a.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_1_b.lua b/test/multiple_routers/storage_2_1_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_1_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_2_a.lua b/test/multiple_routers/storage_2_2_a.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_2_a.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/storage_2_2_b.lua b/test/multiple_routers/storage_2_2_b.lua new file mode 120000 index 0000000..76d196b --- /dev/null +++ b/test/multiple_routers/storage_2_2_b.lua @@ -0,0 +1 @@ +storage_1_1_a.lua \ No newline at end of file diff --git a/test/multiple_routers/suite.ini b/test/multiple_routers/suite.ini new file mode 100644 index 0000000..d2d4470 --- /dev/null +++ 
b/test/multiple_routers/suite.ini @@ -0,0 +1,6 @@ +[default] +core = tarantool +description = Multiple routers tests +script = test.lua +is_parallel = False +lua_libs = ../lua_libs configs.lua diff --git a/test/multiple_routers/test.lua b/test/multiple_routers/test.lua new file mode 100644 index 0000000..cb7c1ee --- /dev/null +++ b/test/multiple_routers/test.lua @@ -0,0 +1,9 @@ +#!/usr/bin/env tarantool + +require('strict').on() + +box.cfg{ + listen = os.getenv("LISTEN"), +} + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/router/router.result b/test/router/router.result index 45394e1..f123ab9 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -225,7 +225,7 @@ vshard.router.bootstrap() -- -- gh-108: negative bucket count on discovery. -- -vshard.router.internal.route_map = {} +vshard.router.internal.static_router.route_map = {} --- ... rets = {} @@ -1111,7 +1111,7 @@ end; vshard.router.cfg(cfg); --- ... -vshard.router.internal.route_map = {}; +vshard.router.internal.static_router.route_map = {}; --- ... vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false; diff --git a/test/router/router.test.lua b/test/router/router.test.lua index df2f381..a421d0c 100644 --- a/test/router/router.test.lua +++ b/test/router/router.test.lua @@ -91,7 +91,7 @@ vshard.router.bootstrap() -- -- gh-108: negative bucket count on discovery. -- -vshard.router.internal.route_map = {} +vshard.router.internal.static_router.route_map = {} rets = {} function do_echo() table.insert(rets, vshard.router.callro(1, 'echo', {1})) end f1 = fiber.create(do_echo) f2 = fiber.create(do_echo) @@ -423,7 +423,7 @@ while vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY ~= 'waiting' do fiber.sleep(0.02) end; vshard.router.cfg(cfg); -vshard.router.internal.route_map = {}; +vshard.router.internal.static_router.route_map = {}; vshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false; -- Do discovery iteration. Upload buckets from the -- first replicaset. 
diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 3e127cb..7569baf 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -25,14 +25,31 @@ local M = rawget(_G, MODULE_INTERNALS) if not M then M = { ---------------- Common module attributes ---------------- - -- The last passed configuration. - current_cfg = nil, errinj = { ERRINJ_CFG = false, ERRINJ_FAILOVER_CHANGE_CFG = false, ERRINJ_RELOAD = false, ERRINJ_LONG_DISCOVERY = false, }, + -- Dictionary, key is router name, value is a router. + routers = {}, + -- Router object which can be accessed by old api: + -- e.g. vshard.router.call(...) + static_router = nil, + -- This counter is used to restart background fibers with + -- new reloaded code. + module_version = 0, + } +end + +-- +-- Router object attributes. +-- +local ROUTER_TEMPLATE = { + -- Name of router. + name = nil, + -- The last passed configuration. + current_cfg = nil, -- Time to outdate old objects on reload. connection_outdate_delay = nil, -- Bucket map cache. @@ -47,38 +64,36 @@ if not M then total_bucket_count = 0, -- Boolean lua_gc state (create periodic gc task). collect_lua_garbage = nil, - -- This counter is used to restart background fibers with - -- new reloaded code. - module_version = 0, - } -end +} + +local STATIC_ROUTER_NAME = 'static_router' -- Set a bucket to a replicaset. -local function bucket_set(bucket_id, rs_uuid) - local replicaset = M.replicasets[rs_uuid] +local function bucket_set(router, bucket_id, rs_uuid) + local replicaset = router.replicasets[rs_uuid] -- It is technically possible to delete a replicaset at the -- same time when route to the bucket is discovered. 
if not replicaset then return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, bucket_id) end - local old_replicaset = M.route_map[bucket_id] + local old_replicaset = router.route_map[bucket_id] if old_replicaset ~= replicaset then if old_replicaset then old_replicaset.bucket_count = old_replicaset.bucket_count - 1 end replicaset.bucket_count = replicaset.bucket_count + 1 end - M.route_map[bucket_id] = replicaset + router.route_map[bucket_id] = replicaset return replicaset end -- Remove a bucket from the cache. -local function bucket_reset(bucket_id) - local replicaset = M.route_map[bucket_id] +local function bucket_reset(router, bucket_id) + local replicaset = router.route_map[bucket_id] if replicaset then replicaset.bucket_count = replicaset.bucket_count - 1 end - M.route_map[bucket_id] = nil + router.route_map[bucket_id] = nil end -------------------------------------------------------------------------------- @@ -86,8 +101,8 @@ end -------------------------------------------------------------------------------- -- Search bucket in whole cluster -local function bucket_discovery(bucket_id) - local replicaset = M.route_map[bucket_id] +local function bucket_discovery(router, bucket_id) + local replicaset = router.route_map[bucket_id] if replicaset ~= nil then return replicaset end @@ -95,14 +110,14 @@ local function bucket_discovery(bucket_id) log.verbose("Discovering bucket %d", bucket_id) local last_err = nil local unreachable_uuid = nil - for uuid, _ in pairs(M.replicasets) do + for uuid, _ in pairs(router.replicasets) do -- Handle reload/reconfigure. 
- replicaset = M.replicasets[uuid] + replicaset = router.replicasets[uuid] if replicaset then local _, err = replicaset:callrw('vshard.storage.bucket_stat', {bucket_id}) if err == nil then - return bucket_set(bucket_id, replicaset.uuid) + return bucket_set(router, bucket_id, replicaset.uuid) elseif err.code ~= lerror.code.WRONG_BUCKET then last_err = err unreachable_uuid = uuid @@ -132,14 +147,14 @@ local function bucket_discovery(bucket_id) end -- Resolve bucket id to replicaset uuid -local function bucket_resolve(bucket_id) +local function bucket_resolve(router, bucket_id) local replicaset, err - local replicaset = M.route_map[bucket_id] + local replicaset = router.route_map[bucket_id] if replicaset ~= nil then return replicaset end -- Replicaset removed from cluster, perform discovery - replicaset, err = bucket_discovery(bucket_id) + replicaset, err = bucket_discovery(router, bucket_id) if replicaset == nil then return nil, err end @@ -150,14 +165,14 @@ end -- Background fiber to perform discovery. It periodically scans -- replicasets one by one and updates route_map. -- -local function discovery_f() +local function discovery_f(router) local module_version = M.module_version while module_version == M.module_version do - while not next(M.replicasets) do + while not next(router.replicasets) do lfiber.sleep(consts.DISCOVERY_INTERVAL) end - local old_replicasets = M.replicasets - for rs_uuid, replicaset in pairs(M.replicasets) do + local old_replicasets = router.replicasets + for rs_uuid, replicaset in pairs(router.replicasets) do local active_buckets, err = replicaset:callro('vshard.storage.buckets_discovery', {}, {timeout = 2}) @@ -167,7 +182,7 @@ local function discovery_f() end -- Renew replicasets object captured by the for loop -- in case of reconfigure and reload events. 
- if M.replicasets ~= old_replicasets then + if router.replicasets ~= old_replicasets then break end if not active_buckets then @@ -180,11 +195,11 @@ local function discovery_f() end replicaset.bucket_count = #active_buckets for _, bucket_id in pairs(active_buckets) do - local old_rs = M.route_map[bucket_id] + local old_rs = router.route_map[bucket_id] if old_rs and old_rs ~= replicaset then old_rs.bucket_count = old_rs.bucket_count - 1 end - M.route_map[bucket_id] = replicaset + router.route_map[bucket_id] = replicaset end end lfiber.sleep(consts.DISCOVERY_INTERVAL) @@ -195,9 +210,9 @@ end -- -- Immediately wakeup discovery fiber if exists. -- -local function discovery_wakeup() - if M.discovery_fiber then - M.discovery_fiber:wakeup() +local function discovery_wakeup(router) + if router.discovery_fiber then + router.discovery_fiber:wakeup() end end @@ -209,7 +224,7 @@ end -- Function will restart operation after wrong bucket response until timeout -- is reached -- -local function router_call(bucket_id, mode, func, args, opts) +local function router_call(router, bucket_id, mode, func, args, opts) if opts and (type(opts) ~= 'table' or (opts.timeout and type(opts.timeout) ~= 'number')) then error('Usage: call(bucket_id, mode, func, args, opts)') @@ -217,7 +232,7 @@ local function router_call(bucket_id, mode, func, args, opts) local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN local replicaset, err local tend = lfiber.time() + timeout - if bucket_id > M.total_bucket_count or bucket_id <= 0 then + if bucket_id > router.total_bucket_count or bucket_id <= 0 then error('Bucket is unreachable: bucket id is out of range') end local call @@ -227,7 +242,7 @@ local function router_call(bucket_id, mode, func, args, opts) call = 'callrw' end repeat - replicaset, err = bucket_resolve(bucket_id) + replicaset, err = bucket_resolve(router, bucket_id) if replicaset then ::replicaset_is_found:: local storage_call_status, call_status, call_error = @@ -243,9 +258,9 @@ local 
function router_call(bucket_id, mode, func, args, opts) end err = call_status if err.code == lerror.code.WRONG_BUCKET then - bucket_reset(bucket_id) + bucket_reset(router, bucket_id) if err.destination then - replicaset = M.replicasets[err.destination] + replicaset = router.replicasets[err.destination] if not replicaset then log.warn('Replicaset "%s" was not found, but received'.. ' from storage as destination - please '.. @@ -257,13 +272,13 @@ local function router_call(bucket_id, mode, func, args, opts) -- but already is executed on storages. while lfiber.time() <= tend do lfiber.sleep(0.05) - replicaset = M.replicasets[err.destination] + replicaset = router.replicasets[err.destination] if replicaset then goto replicaset_is_found end end else - replicaset = bucket_set(bucket_id, replicaset.uuid) + replicaset = bucket_set(router, bucket_id, replicaset.uuid) lfiber.yield() -- Protect against infinite cycle in a -- case of broken cluster, when a bucket @@ -280,7 +295,7 @@ local function router_call(bucket_id, mode, func, args, opts) -- is not timeout - these requests are repeated in -- any case on client, if error. assert(mode == 'write') - bucket_reset(bucket_id) + bucket_reset(router, bucket_id) return nil, err elseif err.code == lerror.code.NON_MASTER then -- Same, as above - do not wait and repeat. @@ -306,12 +321,12 @@ end -- -- Wrappers for router_call with preset mode. -- -local function router_callro(bucket_id, ...) - return router_call(bucket_id, 'read', ...) +local function router_callro(router, bucket_id, ...) + return router_call(router, bucket_id, 'read', ...) end -local function router_callrw(bucket_id, ...) - return router_call(bucket_id, 'write', ...) +local function router_callrw(router, bucket_id, ...) + return router_call(router, bucket_id, 'write', ...) end -- @@ -319,27 +334,27 @@ end -- @param bucket_id Bucket identifier. -- @retval Netbox connection. 
-- -local function router_route(bucket_id) +local function router_route(router, bucket_id) if type(bucket_id) ~= 'number' then error('Usage: router.route(bucket_id)') end - return bucket_resolve(bucket_id) + return bucket_resolve(router, bucket_id) end -- -- Return map of all replicasets. -- @retval See self.replicasets map. -- -local function router_routeall() - return M.replicasets +local function router_routeall(router) + return router.replicasets end -------------------------------------------------------------------------------- -- Failover -------------------------------------------------------------------------------- -local function failover_ping_round() - for _, replicaset in pairs(M.replicasets) do +local function failover_ping_round(router) + for _, replicaset in pairs(router.replicasets) do local replica = replicaset.replica if replica ~= nil and replica.conn ~= nil and replica.down_ts == nil then @@ -382,10 +397,10 @@ end -- Collect UUIDs of replicasets, priority of whose replica -- connections must be updated. -- -local function failover_collect_to_update() +local function failover_collect_to_update(router) local ts = lfiber.time() local uuid_to_update = {} - for uuid, rs in pairs(M.replicasets) do + for uuid, rs in pairs(router.replicasets) do if failover_need_down_priority(rs, ts) or failover_need_up_priority(rs, ts) then table.insert(uuid_to_update, uuid) @@ -400,16 +415,16 @@ end -- disconnected replicas. -- @retval true A replica of an replicaset has been changed. 
-- -local function failover_step() - failover_ping_round() - local uuid_to_update = failover_collect_to_update() +local function failover_step(router) + failover_ping_round(router) + local uuid_to_update = failover_collect_to_update(router) if #uuid_to_update == 0 then return false end local curr_ts = lfiber.time() local replica_is_changed = false for _, uuid in pairs(uuid_to_update) do - local rs = M.replicasets[uuid] + local rs = router.replicasets[uuid] if M.errinj.ERRINJ_FAILOVER_CHANGE_CFG then rs = nil M.errinj.ERRINJ_FAILOVER_CHANGE_CFG = false @@ -451,7 +466,7 @@ end -- tries to reconnect to the best replica. When the connection is -- established, it replaces the original replica. -- -local function failover_f() +local function failover_f(router) local module_version = M.module_version local min_timeout = math.min(consts.FAILOVER_UP_TIMEOUT, consts.FAILOVER_DOWN_TIMEOUT) @@ -461,7 +476,7 @@ local function failover_f() local prev_was_ok = false while module_version == M.module_version do ::continue:: - local ok, replica_is_changed = pcall(failover_step) + local ok, replica_is_changed = pcall(failover_step, router) if not ok then log.error('Error during failovering: %s', lerror.make(replica_is_changed)) @@ -488,9 +503,14 @@ end -- Configuration -------------------------------------------------------------------------------- -local function router_cfg(cfg) - local vshard_cfg, box_cfg = lcfg.check(cfg, M.current_cfg) - if not M.replicasets then +-- Types of configuration. +CFG_NEW = 'new' +CFG_RELOAD = 'reload' +CFG_RECONFIGURE = 'reconfigure' + +local function router_cfg(router, cfg, cfg_type) + local vshard_cfg, box_cfg = lcfg.check(cfg, router.current_cfg) + if cfg_type == CFG_NEW then log.info('Starting router configuration') else log.info('Starting router reconfiguration') @@ -512,44 +532,53 @@ local function router_cfg(cfg) -- Move connections from an old configuration to a new one. 
-- It must be done with no yields to prevent usage both of not -- fully moved old replicasets, and not fully built new ones. - lreplicaset.rebind_replicasets(new_replicasets, M.replicasets) + lreplicaset.rebind_replicasets(new_replicasets, router.replicasets) -- Now the new replicasets are fully built. Can establish -- connections and yield. for _, replicaset in pairs(new_replicasets) do replicaset:connect_all() end lreplicaset.wait_masters_connect(new_replicasets) - lreplicaset.outdate_replicasets(M.replicasets, + lreplicaset.outdate_replicasets(router.replicasets, vshard_cfg.connection_outdate_delay) - M.connection_outdate_delay = vshard_cfg.connection_outdate_delay - M.total_bucket_count = total_bucket_count - M.collect_lua_garbage = vshard_cfg.collect_lua_garbage - M.current_cfg = vshard_cfg - M.replicasets = new_replicasets + router.connection_outdate_delay = vshard_cfg.connection_outdate_delay + router.total_bucket_count = total_bucket_count + router.collect_lua_garbage = vshard_cfg.collect_lua_garbage + router.current_cfg = vshard_cfg + router.replicasets = new_replicasets -- Update existing route map in-place. - local old_route_map = M.route_map - M.route_map = {} + local old_route_map = router.route_map + router.route_map = {} for bucket, rs in pairs(old_route_map) do - M.route_map[bucket] = M.replicasets[rs.uuid] + router.route_map[bucket] = router.replicasets[rs.uuid] end - if M.failover_fiber == nil then - M.failover_fiber = util.reloadable_fiber_create( - 'vshard.failover', M, 'failover_f') + if router.failover_fiber == nil then + router.failover_fiber = util.reloadable_fiber_create( + 'vshard.failover.' .. router.name, M, 'failover_f', router) end - if M.discovery_fiber == nil then - M.discovery_fiber = util.reloadable_fiber_create( - 'vshard.discovery', M, 'discovery_f') + if router.discovery_fiber == nil then + router.discovery_fiber = util.reloadable_fiber_create( + 'vshard.discovery.' .. 
router.name, M, 'discovery_f', router) end - lua_gc.set_state(M.collect_lua_garbage, consts.COLLECT_LUA_GARBAGE_INTERVAL) +end + +local function updage_lua_gc_state() + local lua_gc = false + for _, xrouter in pairs(M.routers) do + if xrouter.collect_lua_garbage then + lua_gc = true + end + end + lua_gc.set_state(lua_gc, consts.COLLECT_LUA_GARBAGE_INTERVAL) end -------------------------------------------------------------------------------- -- Bootstrap -------------------------------------------------------------------------------- -local function cluster_bootstrap() +local function cluster_bootstrap(router) local replicasets = {} - for uuid, replicaset in pairs(M.replicasets) do + for uuid, replicaset in pairs(router.replicasets) do table.insert(replicasets, replicaset) local count, err = replicaset:callrw('vshard.storage.buckets_count', {}) @@ -560,9 +589,10 @@ local function cluster_bootstrap() return nil, lerror.vshard(lerror.code.NON_EMPTY) end end - lreplicaset.calculate_etalon_balance(M.replicasets, M.total_bucket_count) + lreplicaset.calculate_etalon_balance(router.replicasets, + router.total_bucket_count) local bucket_id = 1 - for uuid, replicaset in pairs(M.replicasets) do + for uuid, replicaset in pairs(router.replicasets) do if replicaset.etalon_bucket_count > 0 then local ok, err = replicaset:callrw('vshard.storage.bucket_force_create', @@ -618,7 +648,7 @@ local function replicaset_instance_info(replicaset, name, alerts, errcolor, return info, consts.STATUS.GREEN end -local function router_info() +local function router_info(router) local state = { replicasets = {}, bucket = { @@ -632,7 +662,7 @@ local function router_info() } local bucket_info = state.bucket local known_bucket_count = 0 - for rs_uuid, replicaset in pairs(M.replicasets) do + for rs_uuid, replicaset in pairs(router.replicasets) do -- Replicaset info parameters: -- * master instance info; -- * replica instance info; @@ -720,7 +750,7 @@ local function router_info() -- If a bucket is 
unreachable, then replicaset is -- unreachable too and color already is red. end - bucket_info.unknown = M.total_bucket_count - known_bucket_count + bucket_info.unknown = router.total_bucket_count - known_bucket_count if bucket_info.unknown > 0 then state.status = math.max(state.status, consts.STATUS.YELLOW) table.insert(state.alerts, lerror.alert(lerror.code.UNKNOWN_BUCKETS, @@ -737,13 +767,13 @@ end -- @param limit Maximal bucket count in output. -- @retval Map of type {bucket_id = 'unknown'/replicaset_uuid}. -- -local function router_buckets_info(offset, limit) +local function router_buckets_info(router, offset, limit) if offset ~= nil and type(offset) ~= 'number' or limit ~= nil and type(limit) ~= 'number' then error('Usage: buckets_info(offset, limit)') end offset = offset or 0 - limit = limit or M.total_bucket_count + limit = limit or router.total_bucket_count local ret = {} -- Use one string memory for all unknown buckets. local available_rw = 'available_rw' @@ -752,9 +782,9 @@ local function router_buckets_info(offset, limit) local unreachable = 'unreachable' -- Collect limit. 
local first = math.max(1, offset + 1) - local last = math.min(offset + limit, M.total_bucket_count) + local last = math.min(offset + limit, router.total_bucket_count) for bucket_id = first, last do - local rs = M.route_map[bucket_id] + local rs = router.route_map[bucket_id] if rs then if rs.master and rs.master:is_connected() then ret[bucket_id] = {uuid = rs.uuid, status = available_rw} @@ -774,22 +804,22 @@ end -- Other -------------------------------------------------------------------------------- -local function router_bucket_id(key) +local function router_bucket_id(router, key) if key == nil then error("Usage: vshard.router.bucket_id(key)") end - return lhash.key_hash(key) % M.total_bucket_count + 1 + return lhash.key_hash(key) % router.total_bucket_count + 1 end -local function router_bucket_count() - return M.total_bucket_count +local function router_bucket_count(router) + return router.total_bucket_count end -local function router_sync(timeout) +local function router_sync(router, timeout) if timeout ~= nil and type(timeout) ~= 'number' then error('Usage: vshard.router.sync([timeout: number])') end - for rs_uuid, replicaset in pairs(M.replicasets) do + for rs_uuid, replicaset in pairs(router.replicasets) do local status, err = replicaset:callrw('vshard.storage.sync', {timeout}) if not status then -- Add information about replicaset @@ -803,6 +833,93 @@ if M.errinj.ERRINJ_RELOAD then error('Error injection: reload') end +-------------------------------------------------------------------------------- +-- Managing router instances +-------------------------------------------------------------------------------- + +local function cfg_reconfigure(router, cfg) + return router_cfg(router, cfg, CFG_RECONFIGURE) +end + +local router_mt = { + __index = { + cfg = cfg_reconfigure; + info = router_info; + buckets_info = router_buckets_info; + call = router_call; + callro = router_callro; + callrw = router_callrw; + route = router_route; + routeall = router_routeall; + 
bucket_id = router_bucket_id; + bucket_count = router_bucket_count; + sync = router_sync; + bootstrap = cluster_bootstrap; + bucket_discovery = bucket_discovery; + discovery_wakeup = discovery_wakeup; + } +} + +-- Table which represents this module. +local module = {} + +local function export_static_router_attributes() + -- This metatable forwards calls to a module to the static_router. + local module_mt = {__index = {}} + for method_name, method in pairs(router_mt.__index) do + module_mt.__index[method_name] = function(...) + if M.static_router then + return method(M.static_router, ...) + else + error('Static router is not configured') + end + end + end + setmetatable(module, module_mt) + -- Make static_router attributes accessible from + -- vshard.router.internal. + local M_static_router_attributes = { + name = true, + replicasets = true, + route_map = true, + total_bucket_count = true, + } + setmetatable(M, { + __index = function(M, key) + return M.static_router[key] + end + }) +end + +local function router_new(name, cfg) + assert(type(name) == 'string' and type(cfg) == 'table', + 'Wrong argument type. Usage: vshard.router.new(name, cfg).') + if M.routers[name] then + return nil, string.format('Router with name %s already exists', name) + end + local router = table.deepcopy(ROUTER_TEMPLATE) + setmetatable(router, router_mt) + router.name = name + M.routers[name] = router + if name == STATIC_ROUTER_NAME then + M.static_router = router + export_static_router_attributes() + end + router_cfg(router, cfg, CFG_NEW) + updage_lua_gc_state() + return router +end + +local function legacy_cfg(cfg) + if M.static_router then + -- Reconfigure. + router_cfg(M.static_router, cfg, CFG_RECONFIGURE) + else + -- Create new static instance. 
+ router_new(STATIC_ROUTER_NAME, cfg) + end +end + -------------------------------------------------------------------------------- -- Module definition -------------------------------------------------------------------------------- @@ -813,28 +930,24 @@ end if not rawget(_G, MODULE_INTERNALS) then rawset(_G, MODULE_INTERNALS, M) else - router_cfg(M.current_cfg) + for _, router in pairs(M.routers) do + router_cfg(router, router.current_cfg, CFG_RELOAD) + setmetatable(router, router_mt) + end + updage_lua_gc_state() M.module_version = M.module_version + 1 end M.discovery_f = discovery_f M.failover_f = failover_f +M.router_mt = router_mt +if M.static_router then + export_static_router_attributes() +end -return { - cfg = router_cfg; - info = router_info; - buckets_info = router_buckets_info; - call = router_call; - callro = router_callro; - callrw = router_callrw; - route = router_route; - routeall = router_routeall; - bucket_id = router_bucket_id; - bucket_count = router_bucket_count; - sync = router_sync; - bootstrap = cluster_bootstrap; - bucket_discovery = bucket_discovery; - discovery_wakeup = discovery_wakeup; - internal = M; - module_version = function() return M.module_version end; -} +module.cfg = legacy_cfg +module.new = router_new +module.internal = M +module.module_version = function() return M.module_version end + +return module diff --git a/vshard/util.lua b/vshard/util.lua index ea676ff..852e8a3 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -38,11 +38,11 @@ end -- reload of that module. -- See description of parameters in `reloadable_fiber_create`. 
-- -local function reloadable_fiber_main_loop(module, func_name) +local function reloadable_fiber_main_loop(module, func_name, data) log.info('%s has been started', func_name) local func = module[func_name] ::restart_loop:: - local ok, err = pcall(func) + local ok, err = pcall(func, data) -- yield serves two purposes: -- * makes this fiber cancellable -- * prevents 100% cpu consumption @@ -60,7 +60,7 @@ local function reloadable_fiber_main_loop(module, func_name) log.info('module is reloaded, restarting') -- luajit drops this frame if next function is called in -- return statement. - return M.reloadable_fiber_main_loop(module, func_name) + return M.reloadable_fiber_main_loop(module, func_name, data) end -- @@ -73,11 +73,13 @@ end -- @param module Module which can be reloaded. -- @param func_name Name of a function to be executed in the -- module. +-- @param data Data to be passed to the specified function. -- @retval New fiber. -- -local function reloadable_fiber_create(fiber_name, module, func_name) +local function reloadable_fiber_create(fiber_name, module, func_name, data) assert(type(fiber_name) == 'string') - local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name) + local xfiber = fiber.create(reloadable_fiber_main_loop, module, func_name, + data) xfiber:name(fiber_name) return xfiber end -- 2.14.1