From mboxrd@z Thu Jan  1 00:00:00 1970
From: Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH vshard 1/7] rebalancer: allow locking a replicaset from rebalancing
Date: Wed, 28 Mar 2018 00:24:08 +0300
To: tarantool-patches@freelists.org
Cc: georgy@tarantool.org, Vladislav Shpilevoy

A locked replicaset can neither receive new buckets nor send its
own. In effect, it and its buckets do not participate in
rebalancing at all, and the non-locked replicasets are rebalanced
independently of it. For example, consider a cluster:

    replicaset1: locked, weight = 1, bucket_count = 1500
    replicaset2:         weight = 1, bucket_count = 1500

When replicaset3 is added, only rs2 and rs3 participate in
rebalancing (respecting their weights):

    replicaset1: locked, weight = 1, bucket_count = 1500
    replicaset2:         weight = 1, bucket_count = 500
    replicaset3:         weight = 2, bucket_count = 1000

The lock is useful, for example, to keep test data on a particular
replicaset, or to store special data that must reside together on
one replicaset.

Part of #71
---
 test/rebalancer/box_1_a.lua                      |   2 +-
 test/rebalancer/rebalancer_lock_and_pin.result   | 315 +++++++++++++++++++++++
 test/rebalancer/rebalancer_lock_and_pin.test.lua | 139 ++++++++++
 vshard/cfg.lua                                   |   3 +-
 vshard/error.lua                                 |  12 +-
 vshard/replicaset.lua                            |   3 +-
 vshard/router/init.lua                           |   1 +
 vshard/storage/init.lua                          |  55 +++-
 8 files changed, 509 insertions(+), 21 deletions(-)
 create mode 100644 test/rebalancer/rebalancer_lock_and_pin.result
 create mode 100644 test/rebalancer/rebalancer_lock_and_pin.test.lua

diff --git a/test/rebalancer/box_1_a.lua b/test/rebalancer/box_1_a.lua
index d670293..8fddcf0 100644
--- a/test/rebalancer/box_1_a.lua
+++ b/test/rebalancer/box_1_a.lua
@@ -3,7 +3,7 @@
 require('strict').on()
 local fio = require('fio')
 local NAME = fio.basename(arg[0], '.lua')
-local log = require('log')
+log = require('log')
 
 require('console').listen(os.getenv('ADMIN'))
 fiber = require('fiber')
diff --git a/test/rebalancer/rebalancer_lock_and_pin.result b/test/rebalancer/rebalancer_lock_and_pin.result
new file mode 100644
index 0000000..b61cc84
--- /dev/null
+++ b/test/rebalancer/rebalancer_lock_and_pin.result
@@ -0,0 +1,315 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'box_1_a', 'box_1_b' }
+---
+...
+REPLICASET_2 = { 'box_2_a', 'box_2_b' }
+---
+...
+REPLICASET_3 = { 'box_3_a', 'box_3_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'rebalancer')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'rebalancer')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'box_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'box_2_a')
+---
+...
+--
+-- A replicaset can be locked. A locked replicaset can neither
+-- receive new buckets nor send its own during rebalancing.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1501, 1500)
+---
+- true
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1, 1500)
+---
+- true
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+--
+-- Check that setting weight = 0 does nothing to a locked
+-- replicaset. Moreover, this cluster is considered to be balanced
+-- ok.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 0
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 0
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+vshard.storage.is_locked()
+---
+- true
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+--
+-- Check that a locked replicaset not only blocks bucket sending,
+-- but blocks receiving as well.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.weight = 2
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.weight = 2
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+--
+-- Vshard ensures that a locked replicaset does not allow changing
+-- its bucket set even if the rebalancer does not know about the
+-- lock yet. For example, a locked replicaset could have been
+-- reconfigured a bit earlier.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.lock = true
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('Replicaset is locked', test_run)
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.lock = true
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+--
+-- Check that when a new replicaset is added, buckets are spread
+-- across non-locked replicasets as if locked replicasets and
+-- their buckets do not exist.
+--
+test_run:switch('default')
+---
+- true
+...
+test_run:create_cluster(REPLICASET_3, 'rebalancer')
+---
+...
+util.wait_master(test_run, REPLICASET_3, 'box_3_a')
+---
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+-- Return default configuration.
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+add_replicaset()
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+add_replicaset()
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 500
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1000
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_3)
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
diff --git a/test/rebalancer/rebalancer_lock_and_pin.test.lua b/test/rebalancer/rebalancer_lock_and_pin.test.lua
new file mode 100644
index 0000000..d0f2163
--- /dev/null
+++ b/test/rebalancer/rebalancer_lock_and_pin.test.lua
@@ -0,0 +1,139 @@
+test_run = require('test_run').new()
+
+REPLICASET_1 = { 'box_1_a', 'box_1_b' }
+REPLICASET_2 = { 'box_2_a', 'box_2_b' }
+REPLICASET_3 = { 'box_3_a', 'box_3_b' }
+
+test_run:create_cluster(REPLICASET_1, 'rebalancer')
+test_run:create_cluster(REPLICASET_2, 'rebalancer')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'box_1_a')
+util.wait_master(test_run, REPLICASET_2, 'box_2_a')
+
+--
+-- A replicaset can be locked. A locked replicaset can neither
+-- receive new buckets nor send its own during rebalancing.
+--
+
+test_run:switch('box_2_a')
+vshard.storage.bucket_force_create(1501, 1500)
+
+test_run:switch('box_1_a')
+vshard.storage.bucket_force_create(1, 1500)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+
+--
+-- Check that setting weight = 0 does nothing to a locked
+-- replicaset. Moreover, this cluster is considered to be balanced
+-- ok.
+--
+test_run:switch('box_2_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 0
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 0
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+vshard.storage.is_locked()
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+--
+-- Check that a locked replicaset not only blocks bucket sending,
+-- but blocks receiving as well.
+--
+test_run:switch('box_2_a')
+rs1_cfg.weight = 2
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.weight = 2
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+--
+-- Vshard ensures that a locked replicaset does not allow changing
+-- its bucket set even if the rebalancer does not know about the
+-- lock yet. For example, a locked replicaset could have been
+-- reconfigured a bit earlier.
+--
+test_run:switch('box_2_a')
+rs1_cfg.lock = false
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.lock = true
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.lock = false
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('Replicaset is locked', test_run)
+
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.lock = true
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+
+--
+-- Check that when a new replicaset is added, buckets are spread
+-- across non-locked replicasets as if locked replicasets and
+-- their buckets do not exist.
+--
+
+test_run:switch('default')
+test_run:create_cluster(REPLICASET_3, 'rebalancer')
+util.wait_master(test_run, REPLICASET_3, 'box_3_a')
+
+test_run:switch('box_2_a')
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+-- Return default configuration.
+rs2_cfg.lock = false
+rs2_cfg.weight = 0.5
+add_replicaset()
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_3_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.weight = 0.5
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 0.5
+add_replicaset()
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+test_run:switch('box_2_a')
+vshard.storage.info().bucket.active
+
+test_run:switch('box_3_a')
+vshard.storage.info().bucket.active
+
+test_run:cmd("switch default")
+test_run:drop_cluster(REPLICASET_3)
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 9860676..c30dbf6 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -105,7 +105,8 @@ local replicaset_template = {
     {'weight', {
         type = 'non-negative number', name = 'Weight',
         is_optional = true, default = 1,
-    }}
+    }},
+    {'lock', {type = 'boolean', name = 'Lock', is_optional = true}},
 }
 
 --
diff --git a/vshard/error.lua b/vshard/error.lua
index 10977fa..8d26d11 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -19,8 +19,10 @@ end
 local function vshard_error(code, args, msg)
     local ret = setmetatable({type = 'ShardingError', code = code,
                               message = msg}, {__tostring = json.encode})
-    for k, v in pairs(args) do
-        ret[k] = v
+    if args then
+        for k, v in pairs(args) do
+            ret[k] = v
+        end
     end
     return ret
 end
@@ -36,13 +38,14 @@ local function make_error(e)
     elseif type(e) == 'string' then
         local ok, err = pcall(box.error, box.error.PROC_LUA, e)
         return box_error(err)
+    elseif type(e) == 'table' then
+        return setmetatable(e, {__tostring = json.encode})
     else
         return e
     end
 end
 
 local error_code = {
-    -- Error codes. Some of them are used for alerts too.
     WRONG_BUCKET = 1,
     NON_MASTER = 2,
     BUCKET_ALREADY_EXISTS = 3,
@@ -53,8 +56,6 @@ local error_code = {
     UNREACHABLE_REPLICASET = 8,
     NO_ROUTE_TO_BUCKET = 9,
     NON_EMPTY = 10,
-
-    -- Alert codes.
     UNREACHABLE_MASTER = 11,
     OUT_OF_SYNC = 12,
     HIGH_REPLICATION_LAG = 13,
@@ -63,6 +64,7 @@ local error_code = {
     INVALID_REBALANCING = 16,
     SUBOPTIMAL_REPLICA = 17,
     UNKNOWN_BUCKETS = 18,
+    REPLICASET_IS_LOCKED = 19,
 }
 
 local error_message_template = {
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 559d305..724b1b3 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -456,6 +456,7 @@ local function buildall(sharding_cfg, old_replicasets)
             uuid = replicaset_uuid,
             weight = replicaset.weight,
             bucket_count = 0,
+            lock = replicaset.lock,
         }, replicaset_mt)
         local priority_list = {}
         for replica_uuid, replica in pairs(replicaset.replicas) do
@@ -511,8 +512,6 @@ local function buildall(sharding_cfg, old_replicasets)
         new_replicaset.priority_list = priority_list
         new_replicasets[replicaset_uuid] = new_replicaset
     end
-    cluster_calculate_ethalon_balance(new_replicasets,
-                                      sharding_cfg.bucket_count)
     return new_replicasets
 end
 
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index c04aebc..e990c69 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -520,6 +520,7 @@ local function cluster_bootstrap()
                                'Cluster is already bootstrapped')
         end
     end
+    lreplicaset.calculate_ethalon_balance(M.replicasets, M.total_bucket_count)
     local bucket_id = 1
     for uuid, replicaset in pairs(M.replicasets) do
         if replicaset.ethalon_bucket_count > 0 then
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index f68ea35..405585d 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -40,7 +40,14 @@ if not M then
         -- This counter is used to restart background fibers with
         -- new reloaded code.
         module_version = 0,
+        --
+        -- Timeout to wait for sync with slaves. Used on master
+        -- demotion or on a manual sync() call.
+        --
         sync_timeout = consts.DEFAULT_SYNC_TIMEOUT,
+        -- References to the parent replicaset and to self in it.
+        this_replicaset = nil,
+        this_replica = nil,
 
         ------------------- Garbage collection -------------------
         -- Fiber to remove garbage buckets data.
@@ -78,6 +85,14 @@ if not M then
     }
 end
 
+--
+-- Check if this replicaset is locked. A locked replicaset is
+-- invisible to the rebalancer.
+--
+local function is_this_replicaset_locked()
+    return M.this_replicaset and M.this_replicaset.lock
+end
+
 --------------------------------------------------------------------------------
 -- Schema
 --------------------------------------------------------------------------------
@@ -1135,6 +1150,10 @@ end
 -- }. Is used by a rebalancer.
 --
 local function rebalancer_apply_routes(routes)
+    if is_this_replicaset_locked() then
+        return false, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED, nil,
+                                    "Replicaset is locked")
+    end
     assert(not rebalancing_is_in_progress())
     -- Can not apply routes here because of gh-946 in tarantool
     -- about problems with long polling. Apply routes in a fiber.
@@ -1151,6 +1170,7 @@ end
 --
 local function rebalancer_download_states()
     local replicasets = {}
+    local total_bucket_locked_count = 0
     local total_bucket_active_count = 0
     for uuid, replicaset in pairs(M.replicasets) do
         local bucket_active_count =
@@ -1158,19 +1178,23 @@ local function rebalancer_download_states()
         if bucket_active_count == nil then
             return
         end
-        total_bucket_active_count = total_bucket_active_count +
-                                    bucket_active_count
-        replicasets[uuid] = {bucket_count = bucket_active_count,
-                             weight = replicaset.weight or 1,
-                             ethalon_bucket_count =
-                             replicaset.ethalon_bucket_count}
-    end
-    if total_bucket_active_count == M.total_bucket_count then
-        return replicasets
+        if replicaset.lock then
+            total_bucket_locked_count =
+                total_bucket_locked_count + bucket_active_count
+        else
+            total_bucket_active_count =
+                total_bucket_active_count + bucket_active_count
+            replicasets[uuid] = {bucket_count = bucket_active_count,
+                                 weight = replicaset.weight}
+        end
+    end
+    local sum = total_bucket_active_count + total_bucket_locked_count
+    if sum == M.total_bucket_count then
+        return replicasets, total_bucket_active_count
     else
         log.info('Total active bucket count is not equal to total. '..
                  'Possibly a boostrap is not finished yet. Expected %d, but '..
-                 'found %d', M.total_bucket_count, total_bucket_active_count)
+                 'found %d', M.total_bucket_count, sum)
     end
 end
@@ -1185,7 +1209,8 @@ local function rebalancer_f(module_version)
             log.info('Rebalancer is disabled. Sleep')
             lfiber.sleep(consts.REBALANCER_IDLE_INTERVAL)
         end
-        local status, replicasets = pcall(rebalancer_download_states)
+        local status, replicasets, total_bucket_active_count =
+            pcall(rebalancer_download_states)
         if M.module_version ~= module_version then
             return
         end
@@ -1198,6 +1223,8 @@ local function rebalancer_f(module_version)
             lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
             goto continue
         end
+        lreplicaset.calculate_ethalon_balance(replicasets,
+                                              total_bucket_active_count)
         local max_disbalance =
             rebalancer_calculate_metrics(replicasets,
                                          M.rebalancer_max_receiving)
@@ -1224,7 +1251,7 @@ local function rebalancer_f(module_version)
                                          {src_routes})
         if not status then
             log.error('Error during routes appying on "%s": %s. '..
-                      'Try rebalance later', rs, err)
+                      'Try rebalance later', rs, lerror.make(err))
             lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
             goto continue
         end
@@ -1558,6 +1585,9 @@ local function storage_info()
             state.replication.status = 'slave'
         end
     end
 
+    if is_this_replicaset_locked() then
+        state.bucket.lock = true
+    end
     state.bucket.total = box.space._bucket.index.pk:count()
     state.bucket.active = box.space._bucket.index.status:count({consts.BUCKET.ACTIVE})
     state.bucket.garbage = box.space._bucket.index.status:count({consts.BUCKET.SENT})
@@ -1689,6 +1719,7 @@ return {
     rebalancer_apply_routes = rebalancer_apply_routes;
     rebalancer_disable = rebalancer_disable;
     rebalancer_enable = rebalancer_enable;
+    is_locked = is_this_replicaset_locked;
     recovery_wakeup = recovery_wakeup;
     call = storage_call;
     cfg = storage_cfg;
-- 
2.14.3 (Apple Git-98)
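
For illustration, a minimal sketch of how the new 'lock' option is consumed
from a user's configuration, based on the option added to vshard/cfg.lua and
the calls exercised in the tests above. The UUIDs, URI, and instance name are
hypothetical placeholders, not part of the patch:

    -- A sketch, not part of the patch: one replicaset excluded
    -- from rebalancing via the new 'lock' option.
    local cfg = {
        bucket_count = 3000,
        sharding = {
            ['<replicaset_uuid>'] = {
                replicas = {
                    ['<replica_uuid>'] = {
                        uri = 'storage:storage@127.0.0.1:3301',
                        name = 'storage_1_a',
                        master = true,
                    },
                },
                weight = 1,
                -- New option: this replicaset neither sends nor
                -- receives buckets during rebalancing.
                lock = true,
            },
        },
    }
    vshard.storage.cfg(cfg, '<replica_uuid>')
    -- The lock is then visible in the storage API:
    vshard.storage.is_locked()          -- true
    vshard.storage.info().bucket.lock   -- true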