From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id F18B324113 for ; Mon, 25 Jun 2018 17:54:53 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id yGULeqxhSV4i for ; Mon, 25 Jun 2018 17:54:53 -0400 (EDT) Received: from smtp15.mail.ru (smtp15.mail.ru [94.100.176.133]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id E20B023E18 for ; Mon, 25 Jun 2018 17:54:52 -0400 (EDT) Subject: [tarantool-patches] Re: [PATCH 3/3] Introduce destroy module feature References: From: Alex Khatskevich Message-ID: <1ebeee48-5204-f1c4-0775-bc12276bfcdf@tarantool.org> Date: Tue, 26 Jun 2018 00:54:46 +0300 MIME-Version: 1.0 In-Reply-To: Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-US Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: Vladislav Shpilevoy , tarantool-patches@freelists.org >> --- a/test/lua_libs/util.lua >> +++ b/test/lua_libs/util.lua >> @@ -69,9 +69,43 @@ local function wait_master(test_run, replicaset, >> master) >>       log.info('Slaves are connected to a master "%s"', master) >>   end >>   +function vshard_fiber_list() >> +    -- Flush jit traces to prevent them from >> +    -- keeping its upvalues in memory. >> +    jit.flush() >> +    collectgarbage() >> +    -- Give a fiber time to clean itself. >> +    fiber.sleep(0.05) > > 1. Why do you need this sleep? 
> As far as I know
> collectgarbage just cleans all the garbage in the same
> fiber with no yields or async tasks.

netbox: if `collectgarbage` has collected a connection, the fiber needs
time to wake up after fiber:cancel() to process the cancellation and
exit. Other modules may behave the same way.

>>
>> +        local add = true
>> +        for _, pattern in pairs(non_vshard_patterns) do
>> +            if fib.name:match(pattern) then
>> +                add = false
>> +                break
>> +            end
>> +        end
>> +        if add then
>> +            table.insert(names, fib.name)
>> +        end
>> +    end
>> +    table.sort(names)
>
> 2. Sort of an array just does nothing, it is not?

Fibers are returned in an unpredictable order. If one uses this function
just to print a fiber list immediately, the return value is expected to
be consistent over time.

>
>> diff --git a/test/router/destroy.result b/test/router/destroy.result
>> +        sleep(0.05)
>
> 3. Maybe fiber.sleep? Process sleep looks weird here.

>> +---
>> diff --git a/test/storage/destroy.result
>> b/test/storage/destroy.result
>> +        sleep(0.05)
>
> 4. Same.

Fixed, thanks.

>> +    end
>> +end;
>> +---
>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 21093e5..da8e49e 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -33,6 +33,27 @@ if not M then
>>       }
>>   end
>> +--
>> +-- Destroy router module.
>> +--
>> +local function destroy()
>> +    local MODULE_INTERNALS = '__module_vshard_router'
>> +    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
>> +    local bg_fibers = {
>> +        'failover_fiber',
>> +        'discovery_fiber',
>> +    }
>
> 5. Please, just inline this cycle in 2 lines. Anyway on a new
> fiber it is necessary to add a new name to bg_fibers.
Done.

>
>> +    for _, fib_name in pairs(bg_fibers) do
>> +        if M[fib_name] then
>> +            M[fib_name]:cancel()
>> +            M[fib_name] = nil
>> +        end
>> +    end
>> +    vshard.router.internal = nil
>> +    rawset(_G, MODULE_INTERNALS, nil)
>> +    M = nil
>
> 6. I do not like, that recfg now looks like
>
> destroy
> package.loaded = nil
> cfg
>
> It should not be necessary to nullify the package. Today in
> the public chat a customer asked about
>
> box.cfg{}
> box.stop()
> box.cfg{}
> box.stop()
>
> So there is no place for package.loaded. Most of people
> even do not know about this thing existence.

Reimplemented; waiting for feedback.

>
>> +end
>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>> index 059e705..ac37163 100644
>> --- a/vshard/storage/init.lua
>> +++ b/vshard/storage/init.lua
>> @@ -207,6 +207,43 @@ local function on_master_enable(...)
>>       end
>>   end
>> +--------------------------------------------------------------------------------
>> +-- Destroy
>> +--------------------------------------------------------------------------------
>> +
>> +--
>> +-- Destroy storage module.
>> +--
>> +local function destroy()
>> +    local MODULE_INTERNALS = '__module_vshard_storage'
>> +    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed')
>> +    box.space._bucket:drop()
>
> 7. When this instance is master for another instance (it will be so
> when we return to master-master topology), this drop will be replicated
> to active instances. Please, forbid destroy when the instance has active
> relays.

This feature is under development...
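Once relays are in place, the guard could be shaped like the sketch
below. Note it is not part of this patch: the `downstream` entry and its
`status` field in `box.info.replication` depend on the Tarantool
version, so every name here is an assumption.

```lua
-- Hypothetical guard (not in this patch): refuse to destroy the
-- storage while this instance is relaying to live replicas, so the
-- _bucket drop is not replicated to active instances.
local function check_no_active_relays()
    for _, replica in pairs(box.info.replication) do
        -- 'downstream' is assumed to describe an outgoing relay.
        local downstream = replica.downstream
        if downstream ~= nil and downstream.status ~= 'stopped' then
            error('Can not destroy storage: relay to replica ' ..
                  tostring(replica.uuid) .. ' is active')
        end
    end
end
```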
>> +    for _, name in ipairs(storage_api) do
>> +        box.schema.func.drop(name)
>> +    end
>> +    local bg_fibers = {
>> +        'recovery_fiber',
>> +        'collect_bucket_garbage_fiber',
>> +        'rebalancer_applier_fiber',
>> +        'rebalancer_fiber',
>> +    }
>> +    for _, fib_name in pairs(bg_fibers) do
>> +        if M[fib_name] then
>> +            M[fib_name]:cancel()
>> +            M[fib_name] = nil
>> +        end
>> +    end
>> +    local box_cfg = table.deepcopy(box.cfg)
>> +    box_cfg.replication = nil
>> +    box_cfg.read_only = nil
>
> 8. Assign nil to table is like removal of the key. So here actually
> you just recall empty box.cfg{}. To turn off the replication you
> should pass empty replication array explicitly. To turn off the
> read_only you should pass read_only = false explicitly.

Thanks.

>> +    box.snapshot()
>
> 9. Why?

Deleted.

Full diff:

commit b0cddc2a3ba30d3ecf57e71ce8d095bf2f093a5e
Author: AKhatskevich
Date:   Wed Jun 20 00:31:25 2018 +0300

    Introduce destroy module feature

    Introduce functions:
     * vshard.router.destroy()
     * vshard.storage.destroy()

    Those functions:
     * close connections
     * stop background fibers
     * delete vshard spaces
     * delete vshard functions
     * delete `once` metadata

    After the destroy, the module can be configured as if it was just
    loaded.
Extra changes:      * introduce fiber_list function which returns names of non-tarantool        fibers      * introduce update_M function, which updates M (module internals) with        values defined in the module     Closes #121 diff --git a/test/lua_libs/util.lua b/test/lua_libs/util.lua index f2d3b48..ce0ea67 100644 --- a/test/lua_libs/util.lua +++ b/test/lua_libs/util.lua @@ -69,9 +69,43 @@ local function wait_master(test_run, replicaset, master)      log.info('Slaves are connected to a master "%s"', master)  end +function vshard_fiber_list() +    -- Flush jit traces to prevent them from +    -- keeping its upvalues in memory. +    jit.flush() +    collectgarbage() +    -- Give a fiber time to clean itself. +    fiber.sleep(0.05) +    local fibers = fiber.info() +    local non_vshard_patterns = { +        '^console', +        'feedback_daemon$', +        '^checkpoint_daemon$', +        '^main$', +        '^memtx%.gc$', +        '^vinyl%.scheduler$', +    } +    local names = {} +    for _, fib in pairs(fibers) do +        local add = true +        for _, pattern in pairs(non_vshard_patterns) do +            if fib.name:match(pattern) then +                add = false +                break +            end +        end +        if add then +            table.insert(names, fib.name) +        end +    end +    table.sort(names) +    return names +end; +  return {      check_error = check_error,      shuffle_masters = shuffle_masters,      collect_timeouts = collect_timeouts,      wait_master = wait_master, +    vshard_fiber_list = vshard_fiber_list,  } diff --git a/test/router/destroy.result b/test/router/destroy.result new file mode 100644 index 0000000..9284ef6 --- /dev/null +++ b/test/router/destroy.result @@ -0,0 +1,105 @@ +test_run = require('test_run').new() +--- +... +netbox = require('net.box') +--- +... +fiber = require('fiber') +--- +... +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } +--- +... 
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } +--- +... +test_run:create_cluster(REPLICASET_1, 'storage') +--- +... +test_run:create_cluster(REPLICASET_2, 'storage') +--- +... +util = require('util') +--- +... +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') +--- +... +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') +--- +... +test_run:cmd("create server router_1 with script='router/router_1.lua'") +--- +- true +... +test_run:cmd("start server router_1") +--- +- true +... +_ = test_run:cmd("switch router_1") +--- +... +util = require('util') +--- +... +fiber = require('fiber') +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function wait_fibers_exit() +    while #util.vshard_fiber_list() > 0 do +        fiber.sleep(0.05) +    end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Validate destroy finction by fiber_list. +-- Netbox fibers are deleted after replicas by GC. +util.vshard_fiber_list() +--- +- - 127.0.0.1:3301 (net.box) +  - 127.0.0.1:3302 (net.box) +  - 127.0.0.1:3303 (net.box) +  - 127.0.0.1:3304 (net.box) +  - discovery_fiber +  - vshard.failover +... +vshard.router.destroy() +--- +... +wait_fibers_exit() +--- +... +vshard.router.cfg(cfg) +--- +... +util.vshard_fiber_list() +--- +- - 127.0.0.1:3301 (net.box) +  - 127.0.0.1:3302 (net.box) +  - 127.0.0.1:3303 (net.box) +  - 127.0.0.1:3304 (net.box) +  - discovery_fiber +  - vshard.failover +... +_ = test_run:cmd("switch default") +--- +... +test_run:cmd('stop server router_1') +--- +- true +... +test_run:cmd('cleanup server router_1') +--- +- true +... +test_run:drop_cluster(REPLICASET_2) +--- +... 
diff --git a/test/router/destroy.test.lua b/test/router/destroy.test.lua new file mode 100644 index 0000000..caf4d8e --- /dev/null +++ b/test/router/destroy.test.lua @@ -0,0 +1,38 @@ +test_run = require('test_run').new() +netbox = require('net.box') +fiber = require('fiber') + +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } + +test_run:create_cluster(REPLICASET_1, 'storage') +test_run:create_cluster(REPLICASET_2, 'storage') +util = require('util') +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') +test_run:cmd("create server router_1 with script='router/router_1.lua'") +test_run:cmd("start server router_1") + +_ = test_run:cmd("switch router_1") +util = require('util') +fiber = require('fiber') +test_run:cmd("setopt delimiter ';'") +function wait_fibers_exit() +    while #util.vshard_fiber_list() > 0 do +        fiber.sleep(0.05) +    end +end; +test_run:cmd("setopt delimiter ''"); + +-- Validate destroy finction by fiber_list. +-- Netbox fibers are deleted after replicas by GC. +util.vshard_fiber_list() +vshard.router.destroy() +wait_fibers_exit() +vshard.router.cfg(cfg) +util.vshard_fiber_list() + +_ = test_run:cmd("switch default") +test_run:cmd('stop server router_1') +test_run:cmd('cleanup server router_1') +test_run:drop_cluster(REPLICASET_2) diff --git a/test/storage/destroy.result b/test/storage/destroy.result new file mode 100644 index 0000000..180fc0b --- /dev/null +++ b/test/storage/destroy.result @@ -0,0 +1,129 @@ +test_run = require('test_run').new() +--- +... +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } +--- +... +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } +--- +... +test_run:create_cluster(REPLICASET_1, 'storage') +--- +... +test_run:create_cluster(REPLICASET_2, 'storage') +--- +... +util = require('util') +--- +... +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') +--- +... 
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a') +--- +... +_ = test_run:cmd("switch storage_1_a") +--- +... +util = require('util') +--- +... +fiber = require('fiber') +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function wait_fibers_exit() +    while #util.vshard_fiber_list() > 0 do +        fiber.sleep(0.05) +    end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Storage is configured. +-- Establish net.box connection. +_, rs = next(vshard.storage.internal.replicasets) +--- +... +rs:callro('echo', {'some data'}) +--- +- some data +- null +- null +... +rs = nil +--- +... +-- Validate destroy finction by fiber_list. +-- Netbox fibers are deleted after replicas by GC. +util.vshard_fiber_list() +--- +- - 127.0.0.1:3303 (net.box) +  - vshard.gc +  - vshard.recovery +... +box.schema.user.exists('storage') == true +--- +- true +... +box.space._bucket ~= nil +--- +- true +... +-- Destroy storage. +vshard.storage.destroy() +--- +... +wait_fibers_exit() +--- +... +box.space._bucket == nil +--- +- true +... +-- Reconfigure storage. +-- gh-52: Allow use existing user. +box.schema.user.exists('storage') == true +--- +- true +... +vshard.storage.cfg(cfg, names['storage_1_a']) +--- +... +_, rs = next(vshard.storage.internal.replicasets) +--- +... +rs:callro('echo', {'some data'}) +--- +- some data +- null +- null +... +rs = nil +--- +... +box.space._bucket ~= nil +--- +- true +... +util.vshard_fiber_list() +--- +- - 127.0.0.1:3303 (net.box) +  - vshard.gc +  - vshard.recovery +... +_ = test_run:cmd("switch default") +--- +... +test_run:drop_cluster(REPLICASET_2) +--- +... +test_run:drop_cluster(REPLICASET_1) +--- +... 
diff --git a/test/storage/destroy.test.lua b/test/storage/destroy.test.lua new file mode 100644 index 0000000..38e7c0a --- /dev/null +++ b/test/storage/destroy.test.lua @@ -0,0 +1,50 @@ +test_run = require('test_run').new() + +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } + +test_run:create_cluster(REPLICASET_1, 'storage') +test_run:create_cluster(REPLICASET_2, 'storage') +util = require('util') +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') + +_ = test_run:cmd("switch storage_1_a") +util = require('util') +fiber = require('fiber') +test_run:cmd("setopt delimiter ';'") +function wait_fibers_exit() +    while #util.vshard_fiber_list() > 0 do +        fiber.sleep(0.05) +    end +end; +test_run:cmd("setopt delimiter ''"); + +-- Storage is configured. +-- Establish net.box connection. +_, rs = next(vshard.storage.internal.replicasets) +rs:callro('echo', {'some data'}) +rs = nil +-- Validate destroy finction by fiber_list. +-- Netbox fibers are deleted after replicas by GC. +util.vshard_fiber_list() +box.schema.user.exists('storage') == true +box.space._bucket ~= nil +-- Destroy storage. +vshard.storage.destroy() +wait_fibers_exit() +box.space._bucket == nil + +-- Reconfigure storage. +-- gh-52: Allow use existing user. 
+box.schema.user.exists('storage') == true +vshard.storage.cfg(cfg, names['storage_1_a']) +_, rs = next(vshard.storage.internal.replicasets) +rs:callro('echo', {'some data'}) +rs = nil +box.space._bucket ~= nil +util.vshard_fiber_list() + +_ = test_run:cmd("switch default") +test_run:drop_cluster(REPLICASET_2) +test_run:drop_cluster(REPLICASET_1) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 21093e5..e56c38a 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -7,30 +7,49 @@ local lhash = require('vshard.hash')  local lreplicaset = require('vshard.replicaset')  local util = require('vshard.util') +local MODULE_SKELETON = { +    errinj = { +        ERRINJ_FAILOVER_CHANGE_CFG = false, +        ERRINJ_RELOAD = false, +    }, +    -- Bucket map cache. +    route_map = {}, +    -- All known replicasets used for bucket re-balancing +    replicasets = nil, +    -- Fiber to maintain replica connections. +    failover_fiber = nil, +    -- Fiber to discovery buckets in background. +    discovery_fiber = nil, +    -- Bucket count stored on all replicasets. +    total_bucket_count = 0, +    -- If true, then discovery fiber starts to call +    -- collectgarbage() periodically. +    collect_lua_garbage = nil, +    -- This counter is used to restart background fibers with +    -- new reloaded code. +    module_version = 0, +} +  local M = rawget(_G, '__module_vshard_router') -if not M then -    M = { -        errinj = { -            ERRINJ_FAILOVER_CHANGE_CFG = false, -            ERRINJ_RELOAD = false, -        }, -        -- Bucket map cache. -        route_map = {}, -        -- All known replicasets used for bucket re-balancing -        replicasets = nil, -        -- Fiber to maintain replica connections. -        failover_fiber = nil, -        -- Fiber to discovery buckets in background. -        discovery_fiber = nil, -        -- Bucket count stored on all replicasets. 
-        total_bucket_count = 0, -        -- If true, then discovery fiber starts to call -        -- collectgarbage() periodically. -        collect_lua_garbage = nil, -        -- This counter is used to restart background fibers with -        -- new reloaded code. -        module_version = 0, -    } +local update_M + +-- +-- Destroy router module. +-- +local function destroy() +    local MODULE_INTERNALS = '__module_vshard_router' +    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed') +    -- Cancel background fibers. +    for _, fib_name in pairs({ +        'failover_fiber', +        'discovery_fiber', +    }) do +        if M[fib_name] then +            M[fib_name]:cancel() +        end +    end +    util.override_table(M, MODULE_SKELETON) +    update_M()  end  -- Set a replicaset by container of a bucket. @@ -758,10 +777,6 @@ local function router_sync(timeout)      end  end -if M.errinj.ERRINJ_RELOAD then -    error('Error injection: reload') -end -  --------------------------------------------------------------------------------  -- Module definition  -------------------------------------------------------------------------------- @@ -769,15 +784,28 @@ end  -- About functions, saved in M, and reloading see comment in  -- storage/init.lua.  -- -M.discovery_f = discovery_f -M.failover_f = failover_f + +-- +-- Store module-definde values to M. 
+-- +update_M = function() +    M.discovery_f = discovery_f +    M.failover_f = failover_f +end  if not rawget(_G, '__module_vshard_router') then +    M = table.deepcopy(MODULE_SKELETON) +    update_M()      rawset(_G, '__module_vshard_router', M)  else +    M = rawget(_G, '__module_vshard_router')      M.module_version = M.module_version + 1  end +if M.errinj.ERRINJ_RELOAD then +    error('Error injection: reload') +end +  return {      cfg = router_cfg;      info = router_info; @@ -792,6 +820,7 @@ return {      sync = router_sync;      bootstrap = cluster_bootstrap;      bucket_discovery = bucket_discovery; +    destroy = destroy,      discovery_wakeup = discovery_wakeup;      internal = M;      module_version = function() return M.module_version end; diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index c80bfbf..0d3cc24 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -9,89 +9,88 @@ local lcfg = require('vshard.cfg')  local lreplicaset = require('vshard.replicaset')  local trigger = require('internal.trigger') -local M = rawget(_G, '__module_vshard_storage') -if not M then +-- +-- The module is loaded for the first time. +-- +local MODULE_SKELETON = { +    ---------------- Common module attributes ----------------      -- -    -- The module is loaded for the first time. +    -- All known replicasets used for bucket re-balancing. +    -- See format in replicaset.lua.      -- -    M = { -        ---------------- Common module attributes ---------------- -        -- -        -- All known replicasets used for bucket re-balancing. -        -- See format in replicaset.lua. -        -- -        replicasets = nil, -        -- Triggers on master switch event. They are called right -        -- before the event occurs. -        _on_master_enable = trigger.new('_on_master_enable'), -        _on_master_disable = trigger.new('_on_master_disable'), -        -- Index which is a trigger to shard its space by numbers in -        -- this index. 
It must have at first part either unsigned, -        -- or integer or number type and be not nullable. Values in -        -- this part are considered as bucket identifiers. -        shard_index = nil, -        -- Bucket count stored on all replicasets. -        total_bucket_count = 0, -        errinj = { -            ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false, -            ERRINJ_RELOAD = false, -            ERRINJ_CFG_DELAY = false, -            ERRINJ_LONG_RECEIVE = false, -        }, -        -- This counter is used to restart background fibers with -        -- new reloaded code. -        module_version = 0, -        -- -        -- Timeout to wait sync with slaves. Used on master -        -- demotion or on a manual sync() call. -        -- -        sync_timeout = consts.DEFAULT_SYNC_TIMEOUT, -        -- References to a parent replicaset and self in it. -        this_replicaset = nil, -        this_replica = nil, - -        ------------------- Garbage collection ------------------- -        -- Fiber to remove garbage buckets data. -        collect_bucket_garbage_fiber = nil, -        -- Do buckets garbage collection once per this time. -        collect_bucket_garbage_interval = nil, -        -- If true, then bucket garbage collection fiber starts to -        -- call collectgarbage() periodically. -        collect_lua_garbage = nil, - -        -------------------- Bucket recovery --------------------- -        -- Bucket identifiers which are not active and are not being -        -- sent - their status is unknown. Their state must be checked -        -- periodically in recovery fiber. -        buckets_to_recovery = {}, -        buckets_to_recovery_count = 0, -        recovery_fiber = nil, - -        ----------------------- Rebalancer ----------------------- -        -- Fiber to rebalance a cluster. -        rebalancer_fiber = nil, -        -- Fiber which applies routes one by one. 
Its presense and -        -- active status means that the rebalancing is in progress -        -- now on the current node. -        rebalancer_applier_fiber = nil, -        -- Internal flag to activate and deactivate rebalancer. Mostly -        -- for tests. -        is_rebalancer_active = true, -        -- Maximal allowed percent deviation of bucket count on a -        -- replicaset from etalon bucket count. -        rebalancer_disbalance_threshold = 0, -        -- Maximal bucket count that can be received by a single -        -- replicaset simultaneously. -        rebalancer_max_receiving = 0, -        -- Identifier of a bucket that rebalancer is sending now, -        -- or else 0. If a bucket has state SENDING, but its id is -        -- not stored here, it means, that its sending was -        -- interrupted, for example by restart of an instance, and -        -- a destination replicaset must drop already received -        -- data. -        rebalancer_sending_bucket = 0, -    } -end +    replicasets = nil, +    -- Triggers on master switch event. They are called right +    -- before the event occurs. +    _on_master_enable = trigger.new('_on_master_enable'), +    _on_master_disable = trigger.new('_on_master_disable'), +    -- Index which is a trigger to shard its space by numbers in +    -- this index. It must have at first part either unsigned, +    -- or integer or number type and be not nullable. Values in +    -- this part are considered as bucket identifiers. +    shard_index = nil, +    -- Bucket count stored on all replicasets. +    total_bucket_count = 0, +    errinj = { +        ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false, +        ERRINJ_RELOAD = false, +        ERRINJ_CFG_DELAY = false, +        ERRINJ_LONG_RECEIVE = false, +    }, +    -- This counter is used to restart background fibers with +    -- new reloaded code. +    module_version = 0, +    -- +    -- Timeout to wait sync with slaves. Used on master +    -- demotion or on a manual sync() call. 
+    -- +    sync_timeout = consts.DEFAULT_SYNC_TIMEOUT, +    -- References to a parent replicaset and self in it. +    this_replicaset = nil, +    this_replica = nil, + +    ------------------- Garbage collection ------------------- +    -- Fiber to remove garbage buckets data. +    collect_bucket_garbage_fiber = nil, +    -- Do buckets garbage collection once per this time. +    collect_bucket_garbage_interval = nil, +    -- If true, then bucket garbage collection fiber starts to +    -- call collectgarbage() periodically. +    collect_lua_garbage = nil, + +    -------------------- Bucket recovery --------------------- +    -- Bucket identifiers which are not active and are not being +    -- sent - their status is unknown. Their state must be checked +    -- periodically in recovery fiber. +    buckets_to_recovery = {}, +    buckets_to_recovery_count = 0, +    recovery_fiber = nil, + +    ----------------------- Rebalancer ----------------------- +    -- Fiber to rebalance a cluster. +    rebalancer_fiber = nil, +    -- Fiber which applies routes one by one. Its presense and +    -- active status means that the rebalancing is in progress +    -- now on the current node. +    rebalancer_applier_fiber = nil, +    -- Internal flag to activate and deactivate rebalancer. Mostly +    -- for tests. +    is_rebalancer_active = true, +    -- Maximal allowed percent deviation of bucket count on a +    -- replicaset from etalon bucket count. +    rebalancer_disbalance_threshold = 0, +    -- Maximal bucket count that can be received by a single +    -- replicaset simultaneously. +    rebalancer_max_receiving = 0, +    -- Identifier of a bucket that rebalancer is sending now, +    -- or else 0. If a bucket has state SENDING, but its id is +    -- not stored here, it means, that its sending was +    -- interrupted, for example by restart of an instance, and +    -- a destination replicaset must drop already received +    -- data. 
+    rebalancer_sending_bucket = 0, +} +local M +local update_M  --  -- Check if this replicaset is locked. It means be invisible for @@ -138,6 +137,22 @@ end  --------------------------------------------------------------------------------  -- Schema  -------------------------------------------------------------------------------- +local storage_api = { +    'vshard.storage.sync', +    'vshard.storage.call', +    'vshard.storage.bucket_force_create', +    'vshard.storage.bucket_force_drop', +    'vshard.storage.bucket_collect', +    'vshard.storage.bucket_send', +    'vshard.storage.bucket_recv', +    'vshard.storage.bucket_stat', +    'vshard.storage.buckets_count', +    'vshard.storage.buckets_info', +    'vshard.storage.buckets_discovery', +    'vshard.storage.rebalancer_request_state', +    'vshard.storage.rebalancer_apply_routes', +} +  local function storage_schema_v1(username, password)      log.info("Initializing schema")      box.schema.user.create(username, { @@ -157,22 +172,6 @@ local function storage_schema_v1(username, password)      bucket:create_index('pk', {parts = {'id'}})      bucket:create_index('status', {parts = {'status'}, unique = false}) -    local storage_api = { -        'vshard.storage.sync', -        'vshard.storage.call', -        'vshard.storage.bucket_force_create', -        'vshard.storage.bucket_force_drop', -        'vshard.storage.bucket_collect', -        'vshard.storage.bucket_send', -        'vshard.storage.bucket_recv', -        'vshard.storage.bucket_stat', -        'vshard.storage.buckets_count', -        'vshard.storage.buckets_info', -        'vshard.storage.buckets_discovery', -        'vshard.storage.rebalancer_request_state', -        'vshard.storage.rebalancer_apply_routes', -    } -      for _, name in ipairs(storage_api) do          box.schema.func.create(name, {setuid = true})          box.schema.user.grant(username, 'execute', 'function', name) @@ -203,6 +202,40 @@ local function on_master_enable(...)      
end  end +-------------------------------------------------------------------------------- +-- Destroy +-------------------------------------------------------------------------------- + +-- +-- Destroy storage module. +-- +local function destroy() +    local MODULE_INTERNALS = '__module_vshard_storage' +    assert(rawget(_G, MODULE_INTERNALS), 'Already destroyed') +    box.space._bucket:drop() +    for _, name in ipairs(storage_api) do +        box.schema.func.drop(name) +    end +    -- Cancel background fibers. +    for _, fib_name in pairs({ +        'recovery_fiber', +        'collect_bucket_garbage_fiber', +        'rebalancer_applier_fiber', +        'rebalancer_fiber', +    }) do +        if M[fib_name] then +            M[fib_name]:cancel() +        end +    end +    local box_cfg = table.deepcopy(box.cfg) +    box_cfg.replication = {} +    box_cfg.read_only = false +    box.cfg(box_cfg) +    box.space._schema:delete{'oncevshard:storage:1'} +    util.override_table(M, MODULE_SKELETON) +    update_M() +end +  --------------------------------------------------------------------------------  -- Recovery  -------------------------------------------------------------------------------- @@ -594,10 +627,6 @@ local function bucket_collect_internal(bucket_id)      return data  end -if M.errinj.ERRINJ_RELOAD then -    error('Error injection: reload') -end -  --  -- Collect content of ACTIVE bucket.  -- @@ -1808,27 +1837,37 @@ end  -- restarted (or is restarted from M.background_f, which is not  -- changed) and continues use old func1 and func2.  -- -M.recovery_f = recovery_f -M.collect_garbage_f = collect_garbage_f -M.rebalancer_f = rebalancer_f  -- --- These functions are saved in M not for atomic reload, but for --- unit testing. +-- Store module-definde values to M.  
-- -M.find_garbage_bucket = find_garbage_bucket -M.collect_garbage_step = collect_garbage_step -M.collect_garbage_f = collect_garbage_f -M.rebalancer_build_routes = rebalancer_build_routes -M.rebalancer_calculate_metrics = rebalancer_calculate_metrics -M.cached_find_sharded_spaces = find_sharded_spaces +update_M = function() +    M.recovery_f = recovery_f +    M.collect_garbage_f = collect_garbage_f +    M.rebalancer_f = rebalancer_f +    -- These functions are saved in M not for atomic reload, but for +    -- unit testing. +    M.find_garbage_bucket = find_garbage_bucket +    M.collect_garbage_step = collect_garbage_step +    M.collect_garbage_f = collect_garbage_f +    M.rebalancer_build_routes = rebalancer_build_routes +    M.rebalancer_calculate_metrics = rebalancer_calculate_metrics +    M.cached_find_sharded_spaces = find_sharded_spaces +end  if not rawget(_G, '__module_vshard_storage') then +    M = table.deepcopy(MODULE_SKELETON) +    update_M()      rawset(_G, '__module_vshard_storage', M)  else +    M = rawget(_G, '__module_vshard_storage')      M.module_version = M.module_version + 1  end +if M.errinj.ERRINJ_RELOAD then +    error('Error injection: reload') +end +  return {      sync = sync,      bucket_force_create = bucket_force_create, @@ -1840,6 +1879,7 @@ return {      bucket_pin = bucket_pin,      bucket_unpin = bucket_unpin,      bucket_delete_garbage = bucket_delete_garbage, +    destroy = destroy,      garbage_collector_wakeup = garbage_collector_wakeup,      rebalancer_wakeup = rebalancer_wakeup,      rebalancer_apply_routes = rebalancer_apply_routes, diff --git a/vshard/util.lua b/vshard/util.lua index bb71318..d57cf44 100644 --- a/vshard/util.lua +++ b/vshard/util.lua @@ -75,7 +75,22 @@ local function generate_self_checker(obj_name, func_name, mt, func)      end  end +-- +-- Replace internals of a table with a template. +-- @param tbl Table to be updated. +-- @param template Internals of the `tbl` after the call.
+-- +local function override_table(tbl, template) +    for k, _ in pairs(tbl) do +        tbl[k] = nil +    end +    for k, v in pairs(template) do +        tbl[k] = table.deepcopy(v) +    end +end +  return { +    override_table = override_table,      tuple_extract_key = tuple_extract_key,      reloadable_fiber_f = reloadable_fiber_f,      generate_self_checker = generate_self_checker,
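
To make the intended lifecycle explicit for reviewers, here is a usage
sketch (`cfg` and the instance name are placeholders taken from the test
harness, not fixed API values):

```lua
-- With destroy(), reconfiguration no longer requires nullifying
-- package.loaded (see comment 6): the module returns to its
-- just-loaded state and cfg() can be called again.
local vshard = require('vshard')

vshard.storage.cfg(cfg, instance_name)  -- initial configuration
-- ... the instance serves requests ...
vshard.storage.destroy()                -- fibers cancelled, _bucket and
                                        -- vshard functions dropped
vshard.storage.cfg(cfg, instance_name)  -- reconfigure the same module
```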