[tarantool-patches] [PATCH vshard 1/7] rebalancer: allow to lock a replicaset from rebalancing

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Mar 28 00:24:08 MSK 2018


A locked replicaset can neither receive new buckets nor send its
own. In effect, it and its buckets do not participate in
rebalancing at all, and the non-locked replicasets are rebalanced
among themselves as if the locked one did not exist.

For example, consider a cluster:
replicaset1: locked, weight = 1, bucket count = 1500
replicaset2:         weight = 1, bucket count = 1500

When replicaset3 with weight = 2 is added, only replicaset2 and
replicaset3 participate in rebalancing. The 1500 buckets that are
not locked are split between them according to their weights (1:2):
replicaset1: locked, weight = 1, bucket count = 1500
replicaset2:         weight = 1, bucket count = 500
replicaset3:         weight = 2, bucket count = 1000

The lock is useful, for example, to keep some test data on a
particular replicaset, or to keep on one replicaset special data
that must be stored together.

Part of #71
---
 test/rebalancer/box_1_a.lua                      |   2 +-
 test/rebalancer/rebalancer_lock_and_pin.result   | 315 +++++++++++++++++++++++
 test/rebalancer/rebalancer_lock_and_pin.test.lua | 139 ++++++++++
 vshard/cfg.lua                                   |   3 +-
 vshard/error.lua                                 |  12 +-
 vshard/replicaset.lua                            |   3 +-
 vshard/router/init.lua                           |   1 +
 vshard/storage/init.lua                          |  55 +++-
 8 files changed, 509 insertions(+), 21 deletions(-)
 create mode 100644 test/rebalancer/rebalancer_lock_and_pin.result
 create mode 100644 test/rebalancer/rebalancer_lock_and_pin.test.lua

diff --git a/test/rebalancer/box_1_a.lua b/test/rebalancer/box_1_a.lua
index d670293..8fddcf0 100644
--- a/test/rebalancer/box_1_a.lua
+++ b/test/rebalancer/box_1_a.lua
@@ -3,7 +3,7 @@
 require('strict').on()
 local fio = require('fio')
 local NAME = fio.basename(arg[0], '.lua')
-local log = require('log')
+log = require('log')
 require('console').listen(os.getenv('ADMIN'))
 fiber = require('fiber')
 
diff --git a/test/rebalancer/rebalancer_lock_and_pin.result b/test/rebalancer/rebalancer_lock_and_pin.result
new file mode 100644
index 0000000..b61cc84
--- /dev/null
+++ b/test/rebalancer/rebalancer_lock_and_pin.result
@@ -0,0 +1,315 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'box_1_a', 'box_1_b' }
+---
+...
+REPLICASET_2 = { 'box_2_a', 'box_2_b' }
+---
+...
+REPLICASET_3 = { 'box_3_a', 'box_3_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'rebalancer')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'rebalancer')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'box_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'box_2_a')
+---
+...
+--
+-- A replicaset can be locked. Locked replicaset can neither
+-- receive new buckets nor send own ones during rebalancing.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1501, 1500)
+---
+- true
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1, 1500)
+---
+- true
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+--
+-- Check that a weight = 0 will not do anything with a locked
+-- replicaset. Moreover, this cluster is considered to be balanced
+-- ok.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 0
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 0
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+vshard.storage.is_locked()
+---
+- true
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+--
+-- Check that a locked replicaset not only blocks bucket sending,
+-- but blocks receiving as well.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.weight = 2
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.weight = 2
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+--
+-- Vshard ensures that if a replicaset is locked, then it will not
+-- allow to change its bucket set even if a rebalancer does not
+-- know about a lock yet. For example, a locked replicaset could
+-- be reconfigured a bit earlier.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.lock = true
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('Replicaset is locked', test_run)
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.lock = true
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+--
+-- Check that when a new replicaset is added, buckets are spreaded
+-- on non-locked replicasets as if locked replicasets and buckets
+-- do not exist.
+--
+test_run:switch('default')
+---
+- true
+...
+test_run:create_cluster(REPLICASET_3, 'rebalancer')
+---
+...
+util.wait_master(test_run, REPLICASET_3, 'box_3_a')
+---
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+-- Return default configuration.
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+add_replicaset()
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+add_replicaset()
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 500
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1000
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_3)
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
diff --git a/test/rebalancer/rebalancer_lock_and_pin.test.lua b/test/rebalancer/rebalancer_lock_and_pin.test.lua
new file mode 100644
index 0000000..d0f2163
--- /dev/null
+++ b/test/rebalancer/rebalancer_lock_and_pin.test.lua
@@ -0,0 +1,139 @@
+test_run = require('test_run').new()
+
+REPLICASET_1 = { 'box_1_a', 'box_1_b' }
+REPLICASET_2 = { 'box_2_a', 'box_2_b' }
+REPLICASET_3 = { 'box_3_a', 'box_3_b' }
+
+test_run:create_cluster(REPLICASET_1, 'rebalancer')
+test_run:create_cluster(REPLICASET_2, 'rebalancer')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'box_1_a')
+util.wait_master(test_run, REPLICASET_2, 'box_2_a')
+
+--
+-- A replicaset can be locked. Locked replicaset can neither
+-- receive new buckets nor send own ones during rebalancing.
+--
+
+test_run:switch('box_2_a')
+vshard.storage.bucket_force_create(1501, 1500)
+
+test_run:switch('box_1_a')
+vshard.storage.bucket_force_create(1, 1500)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+
+--
+-- Check that a weight = 0 will not do anything with a locked
+-- replicaset. Moreover, this cluster is considered to be balanced
+-- ok.
+--
+test_run:switch('box_2_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 0
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 0
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+vshard.storage.is_locked()
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+--
+-- Check that a locked replicaset not only blocks bucket sending,
+-- but blocks receiving as well.
+--
+test_run:switch('box_2_a')
+rs1_cfg.weight = 2
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.weight = 2
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+--
+-- Vshard ensures that if a replicaset is locked, then it will not
+-- allow to change its bucket set even if a rebalancer does not
+-- know about a lock yet. For example, a locked replicaset could
+-- be reconfigured a bit earlier.
+--
+test_run:switch('box_2_a')
+rs1_cfg.lock = false
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.lock = true
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.lock = false
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('Replicaset is locked', test_run)
+
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.lock = true
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+
+--
+-- Check that when a new replicaset is added, buckets are spreaded
+-- on non-locked replicasets as if locked replicasets and buckets
+-- do not exist.
+--
+
+test_run:switch('default')
+test_run:create_cluster(REPLICASET_3, 'rebalancer')
+util.wait_master(test_run, REPLICASET_3, 'box_3_a')
+
+test_run:switch('box_2_a')
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+-- Return default configuration.
+rs2_cfg.lock = false
+rs2_cfg.weight = 0.5
+add_replicaset()
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_3_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.weight = 0.5
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 0.5
+add_replicaset()
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+test_run:switch('box_2_a')
+vshard.storage.info().bucket.active
+
+test_run:switch('box_3_a')
+vshard.storage.info().bucket.active
+
+test_run:cmd("switch default")
+test_run:drop_cluster(REPLICASET_3)
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 9860676..c30dbf6 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -105,7 +105,8 @@ local replicaset_template = {
     {'weight', {
         type = 'non-negative number', name = 'Weight', is_optional = true,
         default = 1,
-    }}
+    }},
+    {'lock', {type = 'boolean', name = 'Lock', is_optional = true}},
 }
 
 --
diff --git a/vshard/error.lua b/vshard/error.lua
index 10977fa..8d26d11 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -19,8 +19,10 @@ end
 local function vshard_error(code, args, msg)
     local ret = setmetatable({type = 'ShardingError', code = code,
                               message = msg}, {__tostring = json.encode})
-    for k, v in pairs(args) do
-        ret[k] = v
+    if args then
+        for k, v in pairs(args) do
+            ret[k] = v
+        end
     end
     return ret
 end
@@ -36,13 +38,14 @@ local function make_error(e)
     elseif type(e) == 'string' then
         local ok, err = pcall(box.error, box.error.PROC_LUA, e)
         return box_error(err)
+    elseif type(e) == 'table' then
+        return setmetatable(e, {__tostring = json.encode})
     else
         return e
     end
 end
 
 local error_code = {
-    -- Error codes. Some of them are used for alerts too.
     WRONG_BUCKET = 1,
     NON_MASTER = 2,
     BUCKET_ALREADY_EXISTS = 3,
@@ -53,8 +56,6 @@ local error_code = {
     UNREACHABLE_REPLICASET = 8,
     NO_ROUTE_TO_BUCKET = 9,
     NON_EMPTY = 10,
-
-    -- Alert codes.
     UNREACHABLE_MASTER = 11,
     OUT_OF_SYNC = 12,
     HIGH_REPLICATION_LAG = 13,
@@ -63,6 +64,7 @@ local error_code = {
     INVALID_REBALANCING = 16,
     SUBOPTIMAL_REPLICA = 17,
     UNKNOWN_BUCKETS = 18,
+    REPLICASET_IS_LOCKED = 19,
 }
 
 local error_message_template = {
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 559d305..724b1b3 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -456,6 +456,7 @@ local function buildall(sharding_cfg, old_replicasets)
             uuid = replicaset_uuid,
             weight = replicaset.weight,
             bucket_count = 0,
+            lock = replicaset.lock,
         }, replicaset_mt)
         local priority_list = {}
         for replica_uuid, replica in pairs(replicaset.replicas) do
@@ -511,8 +512,6 @@ local function buildall(sharding_cfg, old_replicasets)
         new_replicaset.priority_list = priority_list
         new_replicasets[replicaset_uuid] = new_replicaset
     end
-    cluster_calculate_ethalon_balance(new_replicasets,
-                                      sharding_cfg.bucket_count)
     return new_replicasets
 end
 
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index c04aebc..e990c69 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -520,6 +520,7 @@ local function cluster_bootstrap()
                                       'Cluster is already bootstrapped')
         end
     end
+    lreplicaset.calculate_ethalon_balance(M.replicasets, M.total_bucket_count)
     local bucket_id = 1
     for uuid, replicaset in pairs(M.replicasets) do
         if replicaset.ethalon_bucket_count > 0 then
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index f68ea35..405585d 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -40,7 +40,14 @@ if not M then
         -- This counter is used to restart background fibers with
         -- new reloaded code.
         module_version = 0,
+        --
+        -- Timeout to wait sync with slaves. Used on master
+        -- demotion or on a manual sync() call.
+        --
         sync_timeout = consts.DEFAULT_SYNC_TIMEOUT,
+        -- References to a parent replicaset and self in it.
+        this_replicaset = nil,
+        this_replica = nil,
 
         ------------------- Garbage collection -------------------
         -- Fiber to remove garbage buckets data.
@@ -78,6 +85,14 @@ if not M then
     }
 end
 
+--
+-- Check if this replicaset is locked. A locked replicaset is
+-- invisible to the rebalancer.
+--
+local function is_this_replicaset_locked()
+    return M.this_replicaset and M.this_replicaset.lock
+end
+
 --------------------------------------------------------------------------------
 -- Schema
 --------------------------------------------------------------------------------
@@ -1135,6 +1150,10 @@ end
 -- }. Is used by a rebalancer.
 --
 local function rebalancer_apply_routes(routes)
+    if is_this_replicaset_locked() then
+        return false, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED, nil,
+                                    "Replicaset is locked");
+    end
     assert(not rebalancing_is_in_progress())
     -- Can not apply routes here because of gh-946 in tarantool
     -- about problems with long polling. Apply routes in a fiber.
@@ -1151,6 +1170,7 @@ end
 --
 local function rebalancer_download_states()
     local replicasets = {}
+    local total_bucket_locked_count = 0
     local total_bucket_active_count = 0
     for uuid, replicaset in pairs(M.replicasets) do
         local bucket_active_count =
@@ -1158,19 +1178,23 @@ local function rebalancer_download_states()
         if bucket_active_count == nil then
             return
         end
-        total_bucket_active_count = total_bucket_active_count +
-                                    bucket_active_count
-        replicasets[uuid] = {bucket_count = bucket_active_count,
-                             weight = replicaset.weight or 1,
-                             ethalon_bucket_count =
-                                replicaset.ethalon_bucket_count}
-    end
-    if total_bucket_active_count == M.total_bucket_count then
-        return replicasets
+        if replicaset.lock then
+            total_bucket_locked_count =
+                total_bucket_locked_count + bucket_active_count
+        else
+            total_bucket_active_count =
+                total_bucket_active_count + bucket_active_count
+            replicasets[uuid] = {bucket_count = bucket_active_count,
+                                 weight = replicaset.weight}
+        end
+    end
+    local sum = total_bucket_active_count + total_bucket_locked_count
+    if sum == M.total_bucket_count then
+        return replicasets, total_bucket_active_count
     else
         log.info('Total active bucket count is not equal to total. '..
                  'Possibly a boostrap is not finished yet. Expected %d, but '..
-                 'found %d', M.total_bucket_count, total_bucket_active_count)
+                 'found %d', M.total_bucket_count, sum)
     end
 end
 
@@ -1185,7 +1209,8 @@ local function rebalancer_f(module_version)
             log.info('Rebalancer is disabled. Sleep')
             lfiber.sleep(consts.REBALANCER_IDLE_INTERVAL)
         end
-        local status, replicasets = pcall(rebalancer_download_states)
+        local status, replicasets, total_bucket_active_count =
+            pcall(rebalancer_download_states)
         if M.module_version ~= module_version then
             return
         end
@@ -1198,6 +1223,8 @@ local function rebalancer_f(module_version)
             lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
             goto continue
         end
+        lreplicaset.calculate_ethalon_balance(replicasets,
+                                              total_bucket_active_count)
         local max_disbalance =
             rebalancer_calculate_metrics(replicasets,
                                          M.rebalancer_max_receiving)
@@ -1224,7 +1251,7 @@ local function rebalancer_f(module_version)
                           {src_routes})
             if not status then
                 log.error('Error during routes appying on "%s": %s. '..
-                          'Try rebalance later', rs, err)
+                          'Try rebalance later', rs, lerror.make(err))
                 lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
                 goto continue
             end
@@ -1558,6 +1585,9 @@ local function storage_info()
         state.replication.status = 'slave'
     end
 
+    if is_this_replicaset_locked() then
+        state.bucket.lock = true
+    end
     state.bucket.total = box.space._bucket.index.pk:count()
     state.bucket.active = box.space._bucket.index.status:count({consts.BUCKET.ACTIVE})
     state.bucket.garbage = box.space._bucket.index.status:count({consts.BUCKET.SENT})
@@ -1689,6 +1719,7 @@ return {
     rebalancer_apply_routes = rebalancer_apply_routes;
     rebalancer_disable = rebalancer_disable;
     rebalancer_enable = rebalancer_enable;
+    is_locked = is_this_replicaset_locked;
     recovery_wakeup = recovery_wakeup;
     call = storage_call;
     cfg = storage_cfg;
-- 
2.14.3 (Apple Git-98)





More information about the Tarantool-patches mailing list