[tarantool-patches] [PATCH vshard 1/7] rebalancer: allow to lock a replicaset from rebalancing
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Mar 28 00:24:08 MSK 2018
Locked replicaset can neither receive new buckets nor send its
own ones. Actually, it and its buckets do not participate in
rebalancing, and non-locked replicasets are rebalanced
independently.
For example, consider a cluster:
replicaset1: locked, weight = 1, bucket count = 1500
replicaset2: weight = 1, bucket count = 1500
When a replicaset3 is added, only rs3 and rs2 participate in
rebalancing (respecting their weights):
replicaset1: locked, weight = 1, bucket count = 1500
replicaset2: weight = 1, bucket count = 500
replicaset3: weight = 2, bucket count = 1000
The lock is useful, for example, to hold some test data on a
particular replicaset, or to store special data on a replicaset
that must be kept together.
Part of #71
---
test/rebalancer/box_1_a.lua | 2 +-
test/rebalancer/rebalancer_lock_and_pin.result | 315 +++++++++++++++++++++++
test/rebalancer/rebalancer_lock_and_pin.test.lua | 139 ++++++++++
vshard/cfg.lua | 3 +-
vshard/error.lua | 12 +-
vshard/replicaset.lua | 3 +-
vshard/router/init.lua | 1 +
vshard/storage/init.lua | 55 +++-
8 files changed, 509 insertions(+), 21 deletions(-)
create mode 100644 test/rebalancer/rebalancer_lock_and_pin.result
create mode 100644 test/rebalancer/rebalancer_lock_and_pin.test.lua
diff --git a/test/rebalancer/box_1_a.lua b/test/rebalancer/box_1_a.lua
index d670293..8fddcf0 100644
--- a/test/rebalancer/box_1_a.lua
+++ b/test/rebalancer/box_1_a.lua
@@ -3,7 +3,7 @@
require('strict').on()
local fio = require('fio')
local NAME = fio.basename(arg[0], '.lua')
-local log = require('log')
+log = require('log')
require('console').listen(os.getenv('ADMIN'))
fiber = require('fiber')
diff --git a/test/rebalancer/rebalancer_lock_and_pin.result b/test/rebalancer/rebalancer_lock_and_pin.result
new file mode 100644
index 0000000..b61cc84
--- /dev/null
+++ b/test/rebalancer/rebalancer_lock_and_pin.result
@@ -0,0 +1,315 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'box_1_a', 'box_1_b' }
+---
+...
+REPLICASET_2 = { 'box_2_a', 'box_2_b' }
+---
+...
+REPLICASET_3 = { 'box_3_a', 'box_3_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'rebalancer')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'rebalancer')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'box_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'box_2_a')
+---
+...
+--
+-- A replicaset can be locked. Locked replicaset can neither
+-- receive new buckets nor send own ones during rebalancing.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1501, 1500)
+---
+- true
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1, 1500)
+---
+- true
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+--
+-- Check that a weight = 0 will not do anything with a locked
+-- replicaset. Moreover, this cluster is considered to be balanced
+-- ok.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 0
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 0
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+vshard.storage.is_locked()
+---
+- true
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+--
+-- Check that a locked replicaset not only blocks bucket sending,
+-- but blocks receiving as well.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.weight = 2
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.weight = 2
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+--
+-- Vshard ensures that if a replicaset is locked, then it will not
+-- allow to change its bucket set even if a rebalancer does not
+-- know about a lock yet. For example, a locked replicaset could
+-- be reconfigured a bit earlier.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.lock = true
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('Replicaset is locked', test_run)
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.lock = true
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+--
+-- Check that when a new replicaset is added, buckets are spread
+-- on non-locked replicasets as if locked replicasets and buckets
+-- do not exist.
+--
+test_run:switch('default')
+---
+- true
+...
+test_run:create_cluster(REPLICASET_3, 'rebalancer')
+---
+...
+util.wait_master(test_run, REPLICASET_3, 'box_3_a')
+---
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+-- Return default configuration.
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+add_replicaset()
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+add_replicaset()
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 500
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1000
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_3)
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
diff --git a/test/rebalancer/rebalancer_lock_and_pin.test.lua b/test/rebalancer/rebalancer_lock_and_pin.test.lua
new file mode 100644
index 0000000..d0f2163
--- /dev/null
+++ b/test/rebalancer/rebalancer_lock_and_pin.test.lua
@@ -0,0 +1,139 @@
+test_run = require('test_run').new()
+
+REPLICASET_1 = { 'box_1_a', 'box_1_b' }
+REPLICASET_2 = { 'box_2_a', 'box_2_b' }
+REPLICASET_3 = { 'box_3_a', 'box_3_b' }
+
+test_run:create_cluster(REPLICASET_1, 'rebalancer')
+test_run:create_cluster(REPLICASET_2, 'rebalancer')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'box_1_a')
+util.wait_master(test_run, REPLICASET_2, 'box_2_a')
+
+--
+-- A replicaset can be locked. Locked replicaset can neither
+-- receive new buckets nor send own ones during rebalancing.
+--
+
+test_run:switch('box_2_a')
+vshard.storage.bucket_force_create(1501, 1500)
+
+test_run:switch('box_1_a')
+vshard.storage.bucket_force_create(1, 1500)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+
+--
+-- Check that a weight = 0 will not do anything with a locked
+-- replicaset. Moreover, this cluster is considered to be balanced
+-- ok.
+--
+test_run:switch('box_2_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 0
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 0
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+vshard.storage.is_locked()
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+--
+-- Check that a locked replicaset not only blocks bucket sending,
+-- but blocks receiving as well.
+--
+test_run:switch('box_2_a')
+rs1_cfg.weight = 2
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.weight = 2
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+--
+-- Vshard ensures that if a replicaset is locked, then it will not
+-- allow to change its bucket set even if a rebalancer does not
+-- know about a lock yet. For example, a locked replicaset could
+-- be reconfigured a bit earlier.
+--
+test_run:switch('box_2_a')
+rs1_cfg.lock = false
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.lock = true
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.lock = false
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('Replicaset is locked', test_run)
+
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.lock = true
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+
+--
+-- Check that when a new replicaset is added, buckets are spread
+-- on non-locked replicasets as if locked replicasets and buckets
+-- do not exist.
+--
+
+test_run:switch('default')
+test_run:create_cluster(REPLICASET_3, 'rebalancer')
+util.wait_master(test_run, REPLICASET_3, 'box_3_a')
+
+test_run:switch('box_2_a')
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+-- Return default configuration.
+rs2_cfg.lock = false
+rs2_cfg.weight = 0.5
+add_replicaset()
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_3_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.weight = 0.5
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 0.5
+add_replicaset()
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+test_run:switch('box_2_a')
+vshard.storage.info().bucket.active
+
+test_run:switch('box_3_a')
+vshard.storage.info().bucket.active
+
+test_run:cmd("switch default")
+test_run:drop_cluster(REPLICASET_3)
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 9860676..c30dbf6 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -105,7 +105,8 @@ local replicaset_template = {
{'weight', {
type = 'non-negative number', name = 'Weight', is_optional = true,
default = 1,
- }}
+ }},
+ {'lock', {type = 'boolean', name = 'Lock', is_optional = true}},
}
--
diff --git a/vshard/error.lua b/vshard/error.lua
index 10977fa..8d26d11 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -19,8 +19,10 @@ end
local function vshard_error(code, args, msg)
local ret = setmetatable({type = 'ShardingError', code = code,
message = msg}, {__tostring = json.encode})
- for k, v in pairs(args) do
- ret[k] = v
+ if args then
+ for k, v in pairs(args) do
+ ret[k] = v
+ end
end
return ret
end
@@ -36,13 +38,14 @@ local function make_error(e)
elseif type(e) == 'string' then
local ok, err = pcall(box.error, box.error.PROC_LUA, e)
return box_error(err)
+ elseif type(e) == 'table' then
+ return setmetatable(e, {__tostring = json.encode})
else
return e
end
end
local error_code = {
- -- Error codes. Some of them are used for alerts too.
WRONG_BUCKET = 1,
NON_MASTER = 2,
BUCKET_ALREADY_EXISTS = 3,
@@ -53,8 +56,6 @@ local error_code = {
UNREACHABLE_REPLICASET = 8,
NO_ROUTE_TO_BUCKET = 9,
NON_EMPTY = 10,
-
- -- Alert codes.
UNREACHABLE_MASTER = 11,
OUT_OF_SYNC = 12,
HIGH_REPLICATION_LAG = 13,
@@ -63,6 +64,7 @@ local error_code = {
INVALID_REBALANCING = 16,
SUBOPTIMAL_REPLICA = 17,
UNKNOWN_BUCKETS = 18,
+ REPLICASET_IS_LOCKED = 19,
}
local error_message_template = {
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 559d305..724b1b3 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -456,6 +456,7 @@ local function buildall(sharding_cfg, old_replicasets)
uuid = replicaset_uuid,
weight = replicaset.weight,
bucket_count = 0,
+ lock = replicaset.lock,
}, replicaset_mt)
local priority_list = {}
for replica_uuid, replica in pairs(replicaset.replicas) do
@@ -511,8 +512,6 @@ local function buildall(sharding_cfg, old_replicasets)
new_replicaset.priority_list = priority_list
new_replicasets[replicaset_uuid] = new_replicaset
end
- cluster_calculate_ethalon_balance(new_replicasets,
- sharding_cfg.bucket_count)
return new_replicasets
end
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index c04aebc..e990c69 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -520,6 +520,7 @@ local function cluster_bootstrap()
'Cluster is already bootstrapped')
end
end
+ lreplicaset.calculate_ethalon_balance(M.replicasets, M.total_bucket_count)
local bucket_id = 1
for uuid, replicaset in pairs(M.replicasets) do
if replicaset.ethalon_bucket_count > 0 then
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index f68ea35..405585d 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -40,7 +40,14 @@ if not M then
-- This counter is used to restart background fibers with
-- new reloaded code.
module_version = 0,
+ --
+ -- Timeout to wait sync with slaves. Used on master
+ -- demotion or on a manual sync() call.
+ --
sync_timeout = consts.DEFAULT_SYNC_TIMEOUT,
+ -- References to a parent replicaset and self in it.
+ this_replicaset = nil,
+ this_replica = nil,
------------------- Garbage collection -------------------
-- Fiber to remove garbage buckets data.
@@ -78,6 +85,14 @@ if not M then
}
end
+--
+-- Check if this replicaset is locked. A locked replicaset is
+-- invisible to the rebalancer.
+--
+local function is_this_replicaset_locked()
+ return M.this_replicaset and M.this_replicaset.lock
+end
+
--------------------------------------------------------------------------------
-- Schema
--------------------------------------------------------------------------------
@@ -1135,6 +1150,10 @@ end
-- }. Is used by a rebalancer.
--
local function rebalancer_apply_routes(routes)
+ if is_this_replicaset_locked() then
+ return false, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED, nil,
+ "Replicaset is locked");
+ end
assert(not rebalancing_is_in_progress())
-- Can not apply routes here because of gh-946 in tarantool
-- about problems with long polling. Apply routes in a fiber.
@@ -1151,6 +1170,7 @@ end
--
local function rebalancer_download_states()
local replicasets = {}
+ local total_bucket_locked_count = 0
local total_bucket_active_count = 0
for uuid, replicaset in pairs(M.replicasets) do
local bucket_active_count =
@@ -1158,19 +1178,23 @@ local function rebalancer_download_states()
if bucket_active_count == nil then
return
end
- total_bucket_active_count = total_bucket_active_count +
- bucket_active_count
- replicasets[uuid] = {bucket_count = bucket_active_count,
- weight = replicaset.weight or 1,
- ethalon_bucket_count =
- replicaset.ethalon_bucket_count}
- end
- if total_bucket_active_count == M.total_bucket_count then
- return replicasets
+ if replicaset.lock then
+ total_bucket_locked_count =
+ total_bucket_locked_count + bucket_active_count
+ else
+ total_bucket_active_count =
+ total_bucket_active_count + bucket_active_count
+ replicasets[uuid] = {bucket_count = bucket_active_count,
+ weight = replicaset.weight}
+ end
+ end
+ local sum = total_bucket_active_count + total_bucket_locked_count
+ if sum == M.total_bucket_count then
+ return replicasets, total_bucket_active_count
else
log.info('Total active bucket count is not equal to total. '..
'Possibly a boostrap is not finished yet. Expected %d, but '..
- 'found %d', M.total_bucket_count, total_bucket_active_count)
+ 'found %d', M.total_bucket_count, sum)
end
end
@@ -1185,7 +1209,8 @@ local function rebalancer_f(module_version)
log.info('Rebalancer is disabled. Sleep')
lfiber.sleep(consts.REBALANCER_IDLE_INTERVAL)
end
- local status, replicasets = pcall(rebalancer_download_states)
+ local status, replicasets, total_bucket_active_count =
+ pcall(rebalancer_download_states)
if M.module_version ~= module_version then
return
end
@@ -1198,6 +1223,8 @@ local function rebalancer_f(module_version)
lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
goto continue
end
+ lreplicaset.calculate_ethalon_balance(replicasets,
+ total_bucket_active_count)
local max_disbalance =
rebalancer_calculate_metrics(replicasets,
M.rebalancer_max_receiving)
@@ -1224,7 +1251,7 @@ local function rebalancer_f(module_version)
{src_routes})
if not status then
log.error('Error during routes appying on "%s": %s. '..
- 'Try rebalance later', rs, err)
+ 'Try rebalance later', rs, lerror.make(err))
lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
goto continue
end
@@ -1558,6 +1585,9 @@ local function storage_info()
state.replication.status = 'slave'
end
+ if is_this_replicaset_locked() then
+ state.bucket.lock = true
+ end
state.bucket.total = box.space._bucket.index.pk:count()
state.bucket.active = box.space._bucket.index.status:count({consts.BUCKET.ACTIVE})
state.bucket.garbage = box.space._bucket.index.status:count({consts.BUCKET.SENT})
@@ -1689,6 +1719,7 @@ return {
rebalancer_apply_routes = rebalancer_apply_routes;
rebalancer_disable = rebalancer_disable;
rebalancer_enable = rebalancer_enable;
+ is_locked = is_this_replicaset_locked;
recovery_wakeup = recovery_wakeup;
call = storage_call;
cfg = storage_cfg;
--
2.14.3 (Apple Git-98)
More information about the Tarantool-patches
mailing list