Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin
@ 2018-03-27 21:24 Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 1/7] rebalancer: allow to lock a replicaset from rebalancing Vladislav Shpilevoy
                   ` (7 more replies)
  0 siblings, 8 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-27 21:24 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, Vladislav Shpilevoy

Branch: http://github.com/tarantool/tarantool/tree/gh-71-bucket-pin-lock
Issue: https://github.com/tarantool/vshard/issues/71

Replicaset lock makes it invisible for the rebalancer - a locked
replicaset can neither receive new buckets nor send its own.
Bucket pin blocks this concrete bucket sending - it will stay on a
replicaset to which pinned, until it is unpinned. Pinning all
replicaset buckets is not the same as replicaset locking - even if
you pin all buckets, the non-locked replicaset still can receive
new buckets.

Replicaset lock allows, for example, to separate a replicaset for
testsing from production replicasets. Or to preserve some
application metadata, that must not be sharded for a while. Bucket
pin allows the same, but in the smaller scope.

Difference between replicaset lock and all buckets pinning is
motivated by ability to button-up an entire replicaset.

Mostly locked and pinned buckets affect the rebalancing algorithm,
which must ignore locked replicasets, and take pinned buckets into
account, attempting to reach the best possible balance. It is not
a trivial task, because a user can pin to a replicaset so many
buckets, that a perfect balance is unreachable.

Vladislav Shpilevoy (7):
  rebalancer: allow to lock a replicaset from rebalancing
  rebalancer: remember the currently sending bucket id
  storage: rework recovery
  storage: wrap bucket status checks into functions
  rebalancer: introduce pinned bucket concept into rebalancer algo
  storage: open public API to pin/unpin buckets
  rfc: add RFC for replicaset lock and bucket pin

 docs/RFC/replicaset_lock_and_bucket.md            |  84 ++++
 test/rebalancer/box_1_a.lua                       |   2 +-
 test/rebalancer/rebalancer_lock_and_pin.result    | 503 ++++++++++++++++++++++
 test/rebalancer/rebalancer_lock_and_pin.test.lua  | 224 ++++++++++
 test/rebalancer/restart_during_rebalancing.result |   4 +
 test/storage/recovery.result                      |  85 +++-
 test/storage/recovery.test.lua                    |  38 +-
 test/storage/storage.result                       |   7 +
 test/unit/rebalancer.result                       | 333 +++++++++++++-
 test/unit/rebalancer.test.lua                     |  79 ++++
 vshard/cfg.lua                                    |   3 +-
 vshard/consts.lua                                 |   1 +
 vshard/error.lua                                  |  12 +-
 vshard/replicaset.lua                             | 105 +++--
 vshard/router/init.lua                            |  46 +-
 vshard/storage/init.lua                           | 418 +++++++++++-------
 16 files changed, 1713 insertions(+), 231 deletions(-)
 create mode 100644 docs/RFC/replicaset_lock_and_bucket.md
 create mode 100644 test/rebalancer/rebalancer_lock_and_pin.result
 create mode 100644 test/rebalancer/rebalancer_lock_and_pin.test.lua

-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [tarantool-patches] [PATCH vshard 1/7] rebalancer: allow to lock a replicaset from rebalancing
  2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
@ 2018-03-27 21:24 ` Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 2/7] rebalancer: remember the currently sending bucket id Vladislav Shpilevoy
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-27 21:24 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, Vladislav Shpilevoy

Locked replicaset can neither receive new buckets nor send its
own ones. Actually, it and its buckets do not participate in
rebalancing, and non-locked replicasets are rebalanced
independently.

For example, consider a cluster:
replicaset1: locked, weight = 1, bucket count = 1500
replicaset2:         weight = 1, bucket count = 1500

When a replicaset3 is added, only rs3 and rs2 participate in
rebalancing (respecting their weights):
replicaset1: locked, weight = 1, bucket count = 1500
replicaset2:         weight = 1, bucket_count = 500
replicaset3:         weight = 2, bucket_count = 1000

The lock is useful for example to hold some test data on a
particular replicaset, or to store on the replicaset a special
data, that must be stored together.

Part of #71
---
 test/rebalancer/box_1_a.lua                      |   2 +-
 test/rebalancer/rebalancer_lock_and_pin.result   | 315 +++++++++++++++++++++++
 test/rebalancer/rebalancer_lock_and_pin.test.lua | 139 ++++++++++
 vshard/cfg.lua                                   |   3 +-
 vshard/error.lua                                 |  12 +-
 vshard/replicaset.lua                            |   3 +-
 vshard/router/init.lua                           |   1 +
 vshard/storage/init.lua                          |  55 +++-
 8 files changed, 509 insertions(+), 21 deletions(-)
 create mode 100644 test/rebalancer/rebalancer_lock_and_pin.result
 create mode 100644 test/rebalancer/rebalancer_lock_and_pin.test.lua

diff --git a/test/rebalancer/box_1_a.lua b/test/rebalancer/box_1_a.lua
index d670293..8fddcf0 100644
--- a/test/rebalancer/box_1_a.lua
+++ b/test/rebalancer/box_1_a.lua
@@ -3,7 +3,7 @@
 require('strict').on()
 local fio = require('fio')
 local NAME = fio.basename(arg[0], '.lua')
-local log = require('log')
+log = require('log')
 require('console').listen(os.getenv('ADMIN'))
 fiber = require('fiber')
 
diff --git a/test/rebalancer/rebalancer_lock_and_pin.result b/test/rebalancer/rebalancer_lock_and_pin.result
new file mode 100644
index 0000000..b61cc84
--- /dev/null
+++ b/test/rebalancer/rebalancer_lock_and_pin.result
@@ -0,0 +1,315 @@
+test_run = require('test_run').new()
+---
+...
+REPLICASET_1 = { 'box_1_a', 'box_1_b' }
+---
+...
+REPLICASET_2 = { 'box_2_a', 'box_2_b' }
+---
+...
+REPLICASET_3 = { 'box_3_a', 'box_3_b' }
+---
+...
+test_run:create_cluster(REPLICASET_1, 'rebalancer')
+---
+...
+test_run:create_cluster(REPLICASET_2, 'rebalancer')
+---
+...
+util = require('util')
+---
+...
+util.wait_master(test_run, REPLICASET_1, 'box_1_a')
+---
+...
+util.wait_master(test_run, REPLICASET_2, 'box_2_a')
+---
+...
+--
+-- A replicaset can be locked. Locked replicaset can neither
+-- receive new buckets nor send own ones during rebalancing.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1501, 1500)
+---
+- true
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+vshard.storage.bucket_force_create(1, 1500)
+---
+- true
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+--
+-- Check that a weight = 0 will not do anything with a locked
+-- replicaset. Moreover, this cluster is considered to be balanced
+-- ok.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 0
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 0
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+vshard.storage.is_locked()
+---
+- true
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+--
+-- Check that a locked replicaset not only blocks bucket sending,
+-- but blocks receiving as well.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.weight = 2
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.weight = 2
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+--
+-- Vshard ensures that if a replicaset is locked, then it will not
+-- allow to change its bucket set even if a rebalancer does not
+-- know about a lock yet. For example, a locked replicaset could
+-- be reconfigured a bit earlier.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.lock = true
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('Replicaset is locked', test_run)
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.lock = true
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+--
+-- Check that when a new replicaset is added, buckets are spreaded
+-- on non-locked replicasets as if locked replicasets and buckets
+-- do not exist.
+--
+test_run:switch('default')
+---
+- true
+...
+test_run:create_cluster(REPLICASET_3, 'rebalancer')
+---
+...
+util.wait_master(test_run, REPLICASET_3, 'box_3_a')
+---
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+-- Return default configuration.
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+add_replicaset()
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+---
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = true
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 0.5
+---
+...
+add_replicaset()
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 1500
+...
+info.lock
+---
+- true
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 500
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1000
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:drop_cluster(REPLICASET_3)
+---
+...
+test_run:drop_cluster(REPLICASET_2)
+---
+...
+test_run:drop_cluster(REPLICASET_1)
+---
+...
diff --git a/test/rebalancer/rebalancer_lock_and_pin.test.lua b/test/rebalancer/rebalancer_lock_and_pin.test.lua
new file mode 100644
index 0000000..d0f2163
--- /dev/null
+++ b/test/rebalancer/rebalancer_lock_and_pin.test.lua
@@ -0,0 +1,139 @@
+test_run = require('test_run').new()
+
+REPLICASET_1 = { 'box_1_a', 'box_1_b' }
+REPLICASET_2 = { 'box_2_a', 'box_2_b' }
+REPLICASET_3 = { 'box_3_a', 'box_3_b' }
+
+test_run:create_cluster(REPLICASET_1, 'rebalancer')
+test_run:create_cluster(REPLICASET_2, 'rebalancer')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'box_1_a')
+util.wait_master(test_run, REPLICASET_2, 'box_2_a')
+
+--
+-- A replicaset can be locked. Locked replicaset can neither
+-- receive new buckets nor send own ones during rebalancing.
+--
+
+test_run:switch('box_2_a')
+vshard.storage.bucket_force_create(1501, 1500)
+
+test_run:switch('box_1_a')
+vshard.storage.bucket_force_create(1, 1500)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+
+--
+-- Check that a weight = 0 will not do anything with a locked
+-- replicaset. Moreover, this cluster is considered to be balanced
+-- ok.
+--
+test_run:switch('box_2_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 0
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 0
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+vshard.storage.is_locked()
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+--
+-- Check that a locked replicaset not only blocks bucket sending,
+-- but blocks receiving as well.
+--
+test_run:switch('box_2_a')
+rs1_cfg.weight = 2
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.weight = 2
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+--
+-- Vshard ensures that if a replicaset is locked, then it will not
+-- allow to change its bucket set even if a rebalancer does not
+-- know about a lock yet. For example, a locked replicaset could
+-- be reconfigured a bit earlier.
+--
+test_run:switch('box_2_a')
+rs1_cfg.lock = false
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.lock = true
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.lock = false
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('Replicaset is locked', test_run)
+
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.lock = true
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+
+--
+-- Check that when a new replicaset is added, buckets are spreaded
+-- on non-locked replicasets as if locked replicasets and buckets
+-- do not exist.
+--
+
+test_run:switch('default')
+test_run:create_cluster(REPLICASET_3, 'rebalancer')
+util.wait_master(test_run, REPLICASET_3, 'box_3_a')
+
+test_run:switch('box_2_a')
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+-- Return default configuration.
+rs2_cfg.lock = false
+rs2_cfg.weight = 0.5
+add_replicaset()
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+
+test_run:switch('box_3_a')
+rs1_cfg = cfg.sharding[names.rs_uuid[1]]
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+rs2_cfg = cfg.sharding[names.rs_uuid[2]]
+rs2_cfg.weight = 0.5
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+
+test_run:switch('box_1_a')
+rs1_cfg.lock = true
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 0.5
+add_replicaset()
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+info = vshard.storage.info().bucket
+info.active
+info.lock
+
+test_run:switch('box_2_a')
+vshard.storage.info().bucket.active
+
+test_run:switch('box_3_a')
+vshard.storage.info().bucket.active
+
+test_run:cmd("switch default")
+test_run:drop_cluster(REPLICASET_3)
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 9860676..c30dbf6 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -105,7 +105,8 @@ local replicaset_template = {
     {'weight', {
         type = 'non-negative number', name = 'Weight', is_optional = true,
         default = 1,
-    }}
+    }},
+    {'lock', {type = 'boolean', name = 'Lock', is_optional = true}},
 }
 
 --
diff --git a/vshard/error.lua b/vshard/error.lua
index 10977fa..8d26d11 100644
--- a/vshard/error.lua
+++ b/vshard/error.lua
@@ -19,8 +19,10 @@ end
 local function vshard_error(code, args, msg)
     local ret = setmetatable({type = 'ShardingError', code = code,
                               message = msg}, {__tostring = json.encode})
-    for k, v in pairs(args) do
-        ret[k] = v
+    if args then
+        for k, v in pairs(args) do
+            ret[k] = v
+        end
     end
     return ret
 end
@@ -36,13 +38,14 @@ local function make_error(e)
     elseif type(e) == 'string' then
         local ok, err = pcall(box.error, box.error.PROC_LUA, e)
         return box_error(err)
+    elseif type(e) == 'table' then
+        return setmetatable(e, {__tostring = json.encode})
     else
         return e
     end
 end
 
 local error_code = {
-    -- Error codes. Some of them are used for alerts too.
     WRONG_BUCKET = 1,
     NON_MASTER = 2,
     BUCKET_ALREADY_EXISTS = 3,
@@ -53,8 +56,6 @@ local error_code = {
     UNREACHABLE_REPLICASET = 8,
     NO_ROUTE_TO_BUCKET = 9,
     NON_EMPTY = 10,
-
-    -- Alert codes.
     UNREACHABLE_MASTER = 11,
     OUT_OF_SYNC = 12,
     HIGH_REPLICATION_LAG = 13,
@@ -63,6 +64,7 @@ local error_code = {
     INVALID_REBALANCING = 16,
     SUBOPTIMAL_REPLICA = 17,
     UNKNOWN_BUCKETS = 18,
+    REPLICASET_IS_LOCKED = 19,
 }
 
 local error_message_template = {
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 559d305..724b1b3 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -456,6 +456,7 @@ local function buildall(sharding_cfg, old_replicasets)
             uuid = replicaset_uuid,
             weight = replicaset.weight,
             bucket_count = 0,
+            lock = replicaset.lock,
         }, replicaset_mt)
         local priority_list = {}
         for replica_uuid, replica in pairs(replicaset.replicas) do
@@ -511,8 +512,6 @@ local function buildall(sharding_cfg, old_replicasets)
         new_replicaset.priority_list = priority_list
         new_replicasets[replicaset_uuid] = new_replicaset
     end
-    cluster_calculate_ethalon_balance(new_replicasets,
-                                      sharding_cfg.bucket_count)
     return new_replicasets
 end
 
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index c04aebc..e990c69 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -520,6 +520,7 @@ local function cluster_bootstrap()
                                       'Cluster is already bootstrapped')
         end
     end
+    lreplicaset.calculate_ethalon_balance(M.replicasets, M.total_bucket_count)
     local bucket_id = 1
     for uuid, replicaset in pairs(M.replicasets) do
         if replicaset.ethalon_bucket_count > 0 then
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index f68ea35..405585d 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -40,7 +40,14 @@ if not M then
         -- This counter is used to restart background fibers with
         -- new reloaded code.
         module_version = 0,
+        --
+        -- Timeout to wait sync with slaves. Used on master
+        -- demotion or on a manual sync() call.
+        --
         sync_timeout = consts.DEFAULT_SYNC_TIMEOUT,
+        -- References to a parent replicaset and self in it.
+        this_replicaset = nil,
+        this_replica = nil,
 
         ------------------- Garbage collection -------------------
         -- Fiber to remove garbage buckets data.
@@ -78,6 +85,14 @@ if not M then
     }
 end
 
+--
+-- Check if this replicaset is locked. It means be invisible for
+-- the rebalancer.
+--
+local function is_this_replicaset_locked()
+    return M.this_replicaset and M.this_replicaset.lock
+end
+
 --------------------------------------------------------------------------------
 -- Schema
 --------------------------------------------------------------------------------
@@ -1135,6 +1150,10 @@ end
 -- }. Is used by a rebalancer.
 --
 local function rebalancer_apply_routes(routes)
+    if is_this_replicaset_locked() then
+        return false, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED, nil,
+                                    "Replicaset is locked");
+    end
     assert(not rebalancing_is_in_progress())
     -- Can not apply routes here because of gh-946 in tarantool
     -- about problems with long polling. Apply routes in a fiber.
@@ -1151,6 +1170,7 @@ end
 --
 local function rebalancer_download_states()
     local replicasets = {}
+    local total_bucket_locked_count = 0
     local total_bucket_active_count = 0
     for uuid, replicaset in pairs(M.replicasets) do
         local bucket_active_count =
@@ -1158,19 +1178,23 @@ local function rebalancer_download_states()
         if bucket_active_count == nil then
             return
         end
-        total_bucket_active_count = total_bucket_active_count +
-                                    bucket_active_count
-        replicasets[uuid] = {bucket_count = bucket_active_count,
-                             weight = replicaset.weight or 1,
-                             ethalon_bucket_count =
-                                replicaset.ethalon_bucket_count}
-    end
-    if total_bucket_active_count == M.total_bucket_count then
-        return replicasets
+        if replicaset.lock then
+            total_bucket_locked_count =
+                total_bucket_locked_count + bucket_active_count
+        else
+            total_bucket_active_count =
+                total_bucket_active_count + bucket_active_count
+            replicasets[uuid] = {bucket_count = bucket_active_count,
+                                 weight = replicaset.weight}
+        end
+    end
+    local sum = total_bucket_active_count + total_bucket_locked_count
+    if sum == M.total_bucket_count then
+        return replicasets, total_bucket_active_count
     else
         log.info('Total active bucket count is not equal to total. '..
                  'Possibly a boostrap is not finished yet. Expected %d, but '..
-                 'found %d', M.total_bucket_count, total_bucket_active_count)
+                 'found %d', M.total_bucket_count, sum)
     end
 end
 
@@ -1185,7 +1209,8 @@ local function rebalancer_f(module_version)
             log.info('Rebalancer is disabled. Sleep')
             lfiber.sleep(consts.REBALANCER_IDLE_INTERVAL)
         end
-        local status, replicasets = pcall(rebalancer_download_states)
+        local status, replicasets, total_bucket_active_count =
+            pcall(rebalancer_download_states)
         if M.module_version ~= module_version then
             return
         end
@@ -1198,6 +1223,8 @@ local function rebalancer_f(module_version)
             lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
             goto continue
         end
+        lreplicaset.calculate_ethalon_balance(replicasets,
+                                              total_bucket_active_count)
         local max_disbalance =
             rebalancer_calculate_metrics(replicasets,
                                          M.rebalancer_max_receiving)
@@ -1224,7 +1251,7 @@ local function rebalancer_f(module_version)
                           {src_routes})
             if not status then
                 log.error('Error during routes appying on "%s": %s. '..
-                          'Try rebalance later', rs, err)
+                          'Try rebalance later', rs, lerror.make(err))
                 lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
                 goto continue
             end
@@ -1558,6 +1585,9 @@ local function storage_info()
         state.replication.status = 'slave'
     end
 
+    if is_this_replicaset_locked() then
+        state.bucket.lock = true
+    end
     state.bucket.total = box.space._bucket.index.pk:count()
     state.bucket.active = box.space._bucket.index.status:count({consts.BUCKET.ACTIVE})
     state.bucket.garbage = box.space._bucket.index.status:count({consts.BUCKET.SENT})
@@ -1689,6 +1719,7 @@ return {
     rebalancer_apply_routes = rebalancer_apply_routes;
     rebalancer_disable = rebalancer_disable;
     rebalancer_enable = rebalancer_enable;
+    is_locked = is_this_replicaset_locked;
     recovery_wakeup = recovery_wakeup;
     call = storage_call;
     cfg = storage_cfg;
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [tarantool-patches] [PATCH vshard 2/7] rebalancer: remember the currently sending bucket id
  2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 1/7] rebalancer: allow to lock a replicaset from rebalancing Vladislav Shpilevoy
@ 2018-03-27 21:24 ` Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 3/7] storage: rework recovery Vladislav Shpilevoy
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-27 21:24 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, Vladislav Shpilevoy

If is needed for recovery - if a bucket is RECEIVING on one
replicaset, and SENDING on another, but on the source replicaset
it is known, that the rebalancer does not send it now, then this
bucket must be recovered. Its partially received data must be
deleted on a destination replicaset, and its state must be ACTIVE
on a source replicaset.
---
 vshard/storage/init.lua | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 405585d..6cbeb4b 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -82,6 +82,13 @@ if not M then
         -- Maximal bucket count that can be received by a single
         -- replicaset simultaneously.
         rebalancer_max_receiving = 0,
+        -- Identifier of a bucket that rebalancer is sending now,
+        -- or else 0. If a bucket has state SENDING, but its id is
+        -- not stored here, it means, that its sending was
+        -- interrupted, for example by restart of an instance, and
+        -- a destination replicaset must drop already received
+        -- data.
+        rebalancer_sending_bucket = 0,
     }
 end
 
@@ -1109,15 +1116,17 @@ local function rebalancer_apply_routes_f(routes)
     -- guarantee that an event loop does not contain events
     -- between this fiber and its creator.
     M.rebalancer_applier_fiber = lfiber.self()
+    M.rebalancer_sending_bucket = 0
     local active_buckets = _status:select{consts.BUCKET.ACTIVE}
     local i = 1
     for dst_uuid, bucket_count in pairs(routes) do
         assert(i + bucket_count - 1 <= #active_buckets)
-        log.info('Send %d buckets to %s', bucket_count,
-                 M.replicasets[dst_uuid])
+        log.info('Send %d buckets to %s', bucket_count, M.replicasets[dst_uuid])
         for j = i, i + bucket_count - 1 do
-            local status, ret = pcall(bucket_send, active_buckets[j].id,
-                                      dst_uuid)
+            local bucket_id = active_buckets[j].id
+            M.rebalancer_sending_bucket = bucket_id
+            local status, ret = pcall(bucket_send, bucket_id, dst_uuid)
+            M.rebalancer_sending_bucket = 0
             if not status or ret ~= true then
                 if not status then
                     log.error('Error during rebalancer routes applying: %s',
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [tarantool-patches] [PATCH vshard 3/7] storage: rework recovery
  2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 1/7] rebalancer: allow to lock a replicaset from rebalancing Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 2/7] rebalancer: remember the currently sending bucket id Vladislav Shpilevoy
@ 2018-03-27 21:24 ` Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 4/7] storage: wrap bucket status checks into functions Vladislav Shpilevoy
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-27 21:24 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, Vladislav Shpilevoy

Now RECEIVING bucket state is invisible actually, since it is
set inside a transaction, and reset to ACTIVE inside the same
transaction. But it will be wrong after #73. So take into
account in the recovery function and in state checking, that
RECEIVING buckets exist. The most significant change is that
a bucket can be set to ACTIVE, if it is SENDING on one
replicaset, RECEIVING on another one, but the rebalancer does not
sending it now.

Part of #73

Do some refactoring to simplify error processing.
---
 test/storage/recovery.result   |  85 ++++++++++++----
 test/storage/recovery.test.lua |  38 +++++--
 vshard/router/init.lua         |  45 +++++----
 vshard/storage/init.lua        | 222 +++++++++++++++++++++--------------------
 4 files changed, 230 insertions(+), 160 deletions(-)

diff --git a/test/storage/recovery.result b/test/storage/recovery.result
index d4fe6e4..fe58c4b 100644
--- a/test/storage/recovery.result
+++ b/test/storage/recovery.result
@@ -22,7 +22,7 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 ---
 ...
-test_run:cmd("switch storage_1_a")
+test_run:switch("storage_1_a")
 ---
 - true
 ...
@@ -38,10 +38,6 @@ rs2_uuid = replicasets[2]
 _bucket = box.space._bucket
 ---
 ...
-_bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
----
-- [1, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
-...
 _bucket:replace{2, vshard.consts.BUCKET.SENDING, rs2_uuid}
 ---
 - [2, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
@@ -50,7 +46,7 @@ _bucket:replace{3, vshard.consts.BUCKET.RECEIVING, rs2_uuid}
 ---
 - [3, 'receiving', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
 ...
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 ---
 - true
 ...
@@ -60,10 +56,6 @@ _bucket = box.space._bucket
 rs1_uuid = replicasets[1]
 ---
 ...
-_bucket:replace{1, vshard.consts.BUCKET.RECEIVING, rs1_uuid}
----
-- [1, 'receiving', 'cbf06940-0790-498b-948d-042b62cf3d29']
-...
 _bucket:replace{2, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
 ---
 - [2, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29']
@@ -72,6 +64,9 @@ _bucket:replace{3, vshard.consts.BUCKET.SENDING, rs1_uuid}
 ---
 - [3, 'sending', 'cbf06940-0790-498b-948d-042b62cf3d29']
 ...
+vshard.storage.internal.rebalancer_sending_bucket = 3
+---
+...
 test_run:cmd('stop server storage_1_a')
 ---
 - true
@@ -80,7 +75,7 @@ test_run:cmd('start server storage_1_a')
 ---
 - true
 ...
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
 ---
 - true
 ...
@@ -95,13 +90,64 @@ _bucket = box.space._bucket
 ...
 _bucket:select{}
 ---
-- - [1, 'active', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
-  - [2, 'garbage', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
+- - [2, 'garbage', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
+  - [3, 'receiving', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+_bucket:select{}
+---
+- - [2, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29']
+  - [3, 'sending', 'cbf06940-0790-498b-948d-042b62cf3d29']
+...
+test_run:switch('storage_1_a')
+---
+- true
 ...
 while _bucket:count() ~= 1 do fiber.sleep(0.1) end
 ---
 ...
 --
+-- Test a case, when a bucket is sending on one replicaset,
+-- receiving on another one, but there is no rebalancing.
+--
+test_run:cmd('stop server storage_2_a')
+---
+- true
+...
+test_run:cmd('start server storage_2_a')
+---
+- true
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+vshard.storage.recovery_wakeup()
+---
+...
+fiber = require('fiber')
+---
+...
+_bucket = box.space._bucket
+---
+...
+while _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) ~= 2 do fiber.sleep(0.1) end
+---
+...
+test_run:switch('storage_1_a')
+---
+- true
+...
+vshard.storage.recovery_wakeup()
+---
+...
+while _bucket:count() ~= 0 do fiber.sleep(0.1) end
+---
+...
+--
 -- Test a case, when a destination is down. The recovery fiber
 -- must restore buckets, when the destination is up.
 --
@@ -112,15 +158,18 @@ _bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
 ---
 - [1, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
 ...
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 ---
 - true
 ...
+rs1_uuid = replicasets[1]
+---
+...
 _bucket:replace{1, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
 ---
 - [1, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29']
 ...
-test_run:cmd('switch default')
+test_run:switch('default')
 ---
 - true
 ...
@@ -136,7 +185,7 @@ test_run:cmd('start server storage_1_a')
 ---
 - true
 ...
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
 ---
 - true
 ...
@@ -168,7 +217,7 @@ _bucket:select{}
 ---
 - []
 ...
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 ---
 - true
 ...
@@ -230,7 +279,7 @@ fiber = require('fiber')
 while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
 ---
 ...
-test_run:cmd("switch default")
+test_run:switch("default")
 ---
 - true
 ...
diff --git a/test/storage/recovery.test.lua b/test/storage/recovery.test.lua
index b3ad269..8cfdc34 100644
--- a/test/storage/recovery.test.lua
+++ b/test/storage/recovery.test.lua
@@ -8,7 +8,7 @@ util = require('util')
 util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 
-test_run:cmd("switch storage_1_a")
+test_run:switch("storage_1_a")
 vshard.storage.rebalancer_disable()
 
 rs2_uuid = replicasets[2]
@@ -17,39 +17,57 @@ rs2_uuid = replicasets[2]
 -- must be garbaged on bootstrap.
 _bucket = box.space._bucket
 
-_bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
 _bucket:replace{2, vshard.consts.BUCKET.SENDING, rs2_uuid}
 _bucket:replace{3, vshard.consts.BUCKET.RECEIVING, rs2_uuid}
 
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 _bucket = box.space._bucket
 rs1_uuid = replicasets[1]
-_bucket:replace{1, vshard.consts.BUCKET.RECEIVING, rs1_uuid}
 _bucket:replace{2, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
 _bucket:replace{3, vshard.consts.BUCKET.SENDING, rs1_uuid}
+vshard.storage.internal.rebalancer_sending_bucket = 3
 
 test_run:cmd('stop server storage_1_a')
 test_run:cmd('start server storage_1_a')
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
 fiber = require('fiber')
 vshard.storage.recovery_wakeup()
 _bucket = box.space._bucket
 _bucket:select{}
+test_run:switch('storage_2_a')
+_bucket:select{}
+test_run:switch('storage_1_a')
 while _bucket:count() ~= 1 do fiber.sleep(0.1) end
 
+--
+-- Test a case, when a bucket is sending on one replicaset,
+-- receiving on another one, but there is no rebalancing.
+--
+test_run:cmd('stop server storage_2_a')
+test_run:cmd('start server storage_2_a')
+test_run:switch('storage_2_a')
+vshard.storage.recovery_wakeup()
+fiber = require('fiber')
+_bucket = box.space._bucket
+while _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) ~= 2 do fiber.sleep(0.1) end
+test_run:switch('storage_1_a')
+vshard.storage.recovery_wakeup()
+while _bucket:count() ~= 0 do fiber.sleep(0.1) end
+
 --
 -- Test a case, when a destination is down. The recovery fiber
 -- must restore buckets, when the destination is up.
 --
 rs2_uuid = replicasets[2]
 _bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
+rs1_uuid = replicasets[1]
 _bucket:replace{1, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
-test_run:cmd('switch default')
+test_run:switch('default')
 test_run:cmd('stop server storage_2_a')
 test_run:cmd('stop server storage_1_a')
 test_run:cmd('start server storage_1_a')
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
 _bucket = box.space._bucket
 _bucket:select{}
 for i = 1, 10 do vshard.storage.recovery_wakeup() end
@@ -59,7 +77,7 @@ fiber = require('fiber')
 while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
 _bucket:select{}
 
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 _bucket = box.space._bucket
 _bucket:select{}
 
@@ -80,7 +98,7 @@ _bucket = box.space._bucket
 fiber = require('fiber')
 while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
 
-test_run:cmd("switch default")
+test_run:switch("default")
 
 test_run:drop_cluster(REPLICASET_2)
 test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index e990c69..9780b15 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -67,29 +67,29 @@ local function bucket_discovery(bucket_id)
     end
 
     log.verbose("Discovering bucket %d", bucket_id)
+    local last_err = nil
     local unreachable_uuid = nil
-    local is_transfer_in_progress = false
-    for _, replicaset in pairs(M.replicasets) do
-        local stat, err = replicaset:callrw('vshard.storage.bucket_stat',
-                                             {bucket_id})
-        if stat then
-            if stat.status == consts.BUCKET.ACTIVE or
-               stat.status == consts.BUCKET.SENDING then
-                log.info("Discovered bucket %d on %s", bucket_id, replicaset)
-                bucket_set(bucket_id, replicaset)
-                return replicaset
-            elseif stat.status == consts.BUCKET.RECEIVING then
-                is_transfer_in_progress = true
-            end
+    for uuid, replicaset in pairs(M.replicasets) do
+        local _, err =
+            replicaset:callrw('vshard.storage.bucket_stat', {bucket_id})
+        if err == nil then
+            bucket_set(bucket_id, replicaset)
+            return replicaset
         elseif err.code ~= lerror.code.WRONG_BUCKET then
-            unreachable_uuid = replicaset.uuid
+            last_err = err
+            unreachable_uuid = uuid
         end
     end
-    local errcode = nil
-    if unreachable_uuid then
-        errcode = lerror.code.UNREACHABLE_REPLICASET
-    elseif is_transfer_in_progress then
-        errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
+    local err = nil
+    if last_err then
+        if last_err.type == 'ClientError' and
+           last_err.code == box.error.NO_CONNECTION then
+            err = lerror.vshard(lerror.code.UNREACHABLE_REPLICASET,
+                                {bucket_id = bucket_id,
+                                 unreachable_uuid = unreachable_uuid})
+        else
+            err = lerror.make(last_err)
+        end
     else
         -- All replicasets were scanned, but a bucket was not
         -- found anywhere, so most likely it does not exist. It
@@ -97,11 +97,12 @@ local function bucket_discovery(bucket_id)
         -- bucket was found to be RECEIVING on one replicaset, and
         -- was not found on other replicasets (it was sent during
         -- discovery).
-        errcode = lerror.code.NO_ROUTE_TO_BUCKET
+        err = lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET,
+                            {bucket_id = bucket_id,
+                             unreachable_uuid = unreachable_uuid})
     end
 
-    return nil, lerror.vshard(errcode, {bucket_id = bucket_id,
-                                        unreachable_uuid = unreachable_uuid})
+    return nil, err
 end
 
 -- Resolve bucket id to replicaset uuid
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 6cbeb4b..471d26a 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -100,6 +100,16 @@ local function is_this_replicaset_locked()
     return M.this_replicaset and M.this_replicaset.lock
 end
 
+--
+-- Check if @a bucket is garbage. It is true for
+-- * sent buckets;
+-- * buckets explicitly marked to be a garbage.
+--
+local function bucket_is_garbage(bucket)
+    return bucket.status == consts.BUCKET.SENT or
+           bucket.status == consts.BUCKET.GARBAGE
+end
+
 --------------------------------------------------------------------------------
 -- Schema
 --------------------------------------------------------------------------------
@@ -168,6 +178,37 @@ end
 -- Recovery
 --------------------------------------------------------------------------------
 
+--
+-- Check if a rebalancing is in progress. It is true, if the node
+-- applies routes received from a rebalancer node in the special
+-- fiber.
+--
+local function rebalancing_is_in_progress()
+    local f = M.rebalancer_applier_fiber
+    return f ~= nil and f:status() ~= 'dead'
+end
+
+--
+-- Check if a local bucket can be deleted.
+--
+local function recovery_local_bucket_is_garbage(local_bucket, remote_bucket)
+    return remote_bucket and remote_bucket.status == consts.BUCKET.ACTIVE
+end
+
+--
+-- Check if a local bucket can become active.
+--
+local function recovery_local_bucket_is_active(local_bucket, remote_bucket)
+    if not remote_bucket or bucket_is_garbage(remote_bucket) then
+        return true
+    end
+    if remote_bucket.status == consts.BUCKET.RECEIVING and
+       local_bucket.status == consts.BUCKET.SENDING then
+        return M.rebalancer_sending_bucket ~= local_bucket.id
+    end
+    return false
+end
+
 --
 -- Check status of each bucket scheduled for recovery. Resolve
 -- status where possible.
@@ -186,13 +227,23 @@ local function recovery_step()
     local _bucket = box.space._bucket
     local new_count = 0
     local is_empty = true
+    --
+    -- If a rebalancer route applier fiber had exited with error
+    -- during bucket sending, then it might did not manage to
+    -- reset currently sending bucket.
+    --
+    if not rebalancing_is_in_progress() and
+       M.rebalancer_sending_bucket ~= 0 then
+        M.rebalancer_sending_bucket = 0
+    end
     for bucket_id, _ in pairs(M.buckets_to_recovery) do
         if is_empty then
             log.info('Starting buckets recovery step')
         end
         is_empty = false
         local bucket = _bucket:get{bucket_id}
-        if not bucket or bucket.status ~= consts.BUCKET.SENDING then
+        if not bucket or (bucket.status ~= consts.BUCKET.SENDING and
+                          bucket.status ~= consts.BUCKET.RECEIVING) then
             -- Possibly, a bucket was deleted or recovered by
             -- an admin. Or recovery_f started not after
             -- bootstrap, but after master change - in such a case
@@ -209,27 +260,21 @@ local function recovery_step()
         end
         local remote_bucket, err =
             destination:callrw('vshard.storage.bucket_stat', {bucket_id})
-        -- If a bucket is not found with WRONG_BUCKET errcode,
-        -- then it either does not exist on destination (possibly,
-        -- the bucket is garbage collected), or is in garbage
-        -- state (GC is in progress). In both cases it can be
-        -- restored here as active.
+        -- Check if it is not a bucket error, and this result can
+        -- not be used to recovery anything. Try later.
         if not remote_bucket and (not err or err.type ~= 'ShardingError' or
-           err.code ~= lerror.code.WRONG_BUCKET) then
-            -- We can ignore other replicasets, because a bucket
-            -- could not be sent from destination further, on
-            -- another replicaset. It is guaranteed by rebalancer
-            -- algorithm, which is stopped, if there are 'sending'
-            -- buckets. And the current bucket is exactly
-            -- 'sending'.
+                                  err.code ~= lerror.code.WRONG_BUCKET) then
             new_count = new_count + 1
             goto continue
         end
-        table.insert(recovered, bucket_id)
-        if remote_bucket and remote_bucket.status == consts.BUCKET.ACTIVE then
+        if recovery_local_bucket_is_garbage(bucket, remote_bucket) then
+            table.insert(recovered, bucket_id)
             table.insert(garbage, bucket_id)
-        else
+        elseif recovery_local_bucket_is_active(bucket, remote_bucket) then
+            table.insert(recovered, bucket_id)
             table.insert(active, bucket_id)
+        else
+            new_count = new_count + 1
         end
 ::continue::
     end
@@ -252,24 +297,6 @@ local function recovery_step()
     M.buckets_to_recovery_count = new_count
 end
 
---
--- Make all 'receiving' buckets be 'garbage'. The procedure is
--- called on instance start to garbage collect buckets, whose
--- transmition was interrupted by the server down.
---
-local function recovery_garbage_receiving_buckets()
-    local _bucket = box.space._bucket
-    local receiving_buckets =
-        _bucket.index.status:select{consts.BUCKET.RECEIVING}
-    if #receiving_buckets > 0 then
-        box.begin()
-        for _, bucket in pairs(receiving_buckets) do
-            _bucket:update({bucket.id}, {{'=', 2, consts.BUCKET.GARBAGE}})
-        end
-        box.commit()
-    end
-end
-
 --
 -- Infinite function to resolve status of buckets, whose 'sending'
 -- has failed due to tarantool or network problems. Restarts on
@@ -282,9 +309,11 @@ end
 local function recovery_f(module_version)
     lfiber.name('vshard.recovery')
     local _bucket = box.space._bucket
-    local sending_buckets = _bucket.index.status:select{consts.BUCKET.SENDING}
     M.buckets_to_recovery = {}
-    for _, bucket in pairs(sending_buckets) do
+    for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do
+        M.buckets_to_recovery[bucket.id] = true
+    end
+    for _, bucket in _bucket.index.status:pairs({consts.BUCKET.RECEIVING}) do
         M.buckets_to_recovery[bucket.id] = true
     end
     -- Interrupt recovery if a module has been reloaded. Perhaps,
@@ -354,31 +383,24 @@ end
 -- Buckets
 --------------------------------------------------------------------------------
 
---
--- Check if @a bucket is garbage. It is true for
--- * sent buckets;
--- * buckets explicitly marked to be a garbage.
---
-local function bucket_is_garbage(bucket)
-    return bucket.status == consts.BUCKET.SENT or
-           bucket.status == consts.BUCKET.GARBAGE
-end
-
 --
 -- Check that an action of a specified mode can be applied to a
 -- bucket.
 -- @param bucket_id Bucket identifier.
 -- @param mode 'Read' or 'write' mode.
 --
--- @retval true Bucket can accept an action of a specified mode.
--- @retval nil, error object Bucket can not accept the action.
+-- @retval bucket Bucket that can accept an action of a specified
+--         mode.
+-- @retval bucket and error object Bucket that can not accept the
+--         action, and a reason why.
 --
 local function bucket_check_state(bucket_id, mode)
     assert(type(bucket_id) == 'number')
     assert(mode == 'read' or mode == 'write')
     local bucket = box.space._bucket:get({bucket_id})
     local errcode = nil
-    if bucket == nil or bucket_is_garbage(bucket) then
+    if bucket == nil or bucket_is_garbage(bucket) or
+       bucket.status == consts.BUCKET.RECEIVING then
         errcode = lerror.code.WRONG_BUCKET
     elseif (bucket.status == consts.BUCKET.SENDING and mode ~= 'read') then
         errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
@@ -388,13 +410,13 @@ local function bucket_check_state(bucket_id, mode)
     end
     if errcode ~= nil then
         local dest = bucket and bucket.destination or nil
-        return nil, lerror.vshard(errcode, {bucket_id = bucket_id,
-                                            destination = dest})
+        return bucket, lerror.vshard(errcode, {bucket_id = bucket_id,
+                                               destination = dest})
     end
 
     assert(bucket.status == consts.BUCKET.ACTIVE or
            bucket.status == consts.BUCKET.SENDING and mode == 'read')
-    return true
+    return bucket, nil
 end
 
 --
@@ -404,18 +426,8 @@ local function bucket_stat(bucket_id)
     if type(bucket_id) ~= 'number' then
         error('Usage: bucket_stat(bucket_id)')
     end
-    local bucket = box.space._bucket:get({bucket_id})
-
-    if not bucket or bucket_is_garbage(bucket) then
-        return nil, lerror.vshard(lerror.code.WRONG_BUCKET,
-                                  {bucket_id = bucket_id})
-    else
-        return {
-            id = bucket.id;
-            status = bucket.status;
-            destination = bucket.destination;
-        }
-    end
+    local stat, err = bucket_check_state(bucket_id, 'read')
+    return stat and stat:tomap(), err
 end
 
 --
@@ -470,6 +482,7 @@ local function bucket_recv(bucket_id, from, data)
     end
 
     box.begin()
+    M.buckets_to_recovery[bucket_id] = true
     bucket = box.space._bucket:insert({bucket_id, consts.BUCKET.RECEIVING,
                                        from})
     -- Fill spaces with data
@@ -482,6 +495,7 @@ local function bucket_recv(bucket_id, from, data)
             -- https://github.com/tarantool/tarantool/issues/3031
             local _, boxerror = pcall(box.error, box.error.NO_SUCH_SPACE,
                                       space_id)
+            M.buckets_to_recovery[bucket_id] = nil
             return nil, lerror.box(boxerror)
         end
         for _, tuple in ipairs(space_data) do
@@ -493,6 +507,7 @@ local function bucket_recv(bucket_id, from, data)
     bucket = box.space._bucket:replace({bucket_id, consts.BUCKET.ACTIVE})
 
     box.commit()
+    M.buckets_to_recovery[bucket_id] = nil
     return true
 end
 
@@ -545,7 +560,7 @@ local function bucket_collect(bucket_id)
     end
 
     local status, err = bucket_check_state(bucket_id, 'read')
-    if not status then
+    if err then
         return nil, err
     end
     return bucket_collect_internal(bucket_id)
@@ -602,8 +617,6 @@ local function local_on_master_disable()
     log.info("Resigned from the replicaset master role")
 end
 
-local collect_garbage_f
-
 --
 -- The only thing, that must be done to abort a master promotion
 -- is a set read_only back to true.
@@ -629,7 +642,6 @@ end
 local function local_on_master_enable()
     box.cfg({read_only = false})
     M._on_master_enable:run()
-    recovery_garbage_receiving_buckets()
     -- Start background process to collect garbage.
     M.collect_bucket_garbage_fiber =
         lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
@@ -651,7 +663,7 @@ local function bucket_send(bucket_id, destination)
     end
 
     local status, err = bucket_check_state(bucket_id, 'write')
-    if not status then
+    if err then
         return nil, err
     end
     local replicaset = M.replicasets[destination]
@@ -672,9 +684,8 @@ local function bucket_send(bucket_id, destination)
     M.buckets_to_recovery[bucket_id] = true
     box.space._bucket:replace({bucket_id, consts.BUCKET.SENDING, destination})
 
-    local status, err =
-        replicaset:callrw('vshard.storage.bucket_recv',
-                           {bucket_id, box.info.cluster.uuid, data})
+    status, err = replicaset:callrw('vshard.storage.bucket_recv',
+                                    {bucket_id, box.info.cluster.uuid, data})
     if not status then
         if err.type == 'ShardingError' then
             -- Rollback bucket state.
@@ -1142,16 +1153,6 @@ local function rebalancer_apply_routes_f(routes)
     log.info('Rebalancer routes are applied')
 end
 
---
--- Check if a rebalancing is in progress. It is true, if the node
--- applies routes received from a rebalancer node in the special
--- fiber.
---
-local function rebalancing_is_in_progress()
-    local f = M.rebalancer_applier_fiber
-    return f ~= nil and f:status() ~= 'dead'
-end
-
 --
 -- Apply routes table of type: {
 --     dst_uuid = number, -- Bucket count to send.
@@ -1337,8 +1338,8 @@ local function storage_call(bucket_id, mode, name, args)
         error('Unknown mode: '..tostring(mode))
     end
 
-    local ok, err = bucket_check_state(bucket_id, mode)
-    if not ok then
+    local status, err = bucket_check_state(bucket_id, mode)
+    if err then
         return nil, err
     end
     -- TODO: implement box.call()
@@ -1715,30 +1716,31 @@ else
 end
 
 return {
-    sync = sync;
-    bucket_force_create = bucket_force_create;
-    bucket_force_drop = bucket_force_drop;
-    bucket_collect = bucket_collect;
-    bucket_recv = bucket_recv;
-    bucket_send = bucket_send;
-    bucket_stat = bucket_stat;
-    bucket_delete_garbage = bucket_delete_garbage;
-    garbage_collector_wakeup = garbage_collector_wakeup;
-    rebalancer_wakeup = rebalancer_wakeup;
-    rebalancer_apply_routes = rebalancer_apply_routes;
-    rebalancer_disable = rebalancer_disable;
-    rebalancer_enable = rebalancer_enable;
-    is_locked = is_this_replicaset_locked;
-    recovery_wakeup = recovery_wakeup;
-    call = storage_call;
-    cfg = storage_cfg;
-    info = storage_info;
-    buckets_info = storage_buckets_info;
-    buckets_count = storage_buckets_count;
-    buckets_discovery = buckets_discovery;
-    rebalancer_request_state = rebalancer_request_state;
-    internal = M;
-    on_master_enable = on_master_enable;
-    on_master_disable = on_master_disable;
-    module_version = function() return M.module_version end;
+    sync = sync,
+    bucket_force_create = bucket_force_create,
+    bucket_force_drop = bucket_force_drop,
+    bucket_collect = bucket_collect,
+    bucket_recv = bucket_recv,
+    bucket_send = bucket_send,
+    bucket_stat = bucket_stat,
+    bucket_delete_garbage = bucket_delete_garbage,
+    garbage_collector_wakeup = garbage_collector_wakeup,
+    rebalancer_wakeup = rebalancer_wakeup,
+    rebalancer_apply_routes = rebalancer_apply_routes,
+    rebalancer_disable = rebalancer_disable,
+    rebalancer_enable = rebalancer_enable,
+    is_locked = is_this_replicaset_locked,
+    rebalancing_is_in_progress = rebalancing_is_in_progress,
+    recovery_wakeup = recovery_wakeup,
+    call = storage_call,
+    cfg = storage_cfg,
+    info = storage_info,
+    buckets_info = storage_buckets_info,
+    buckets_count = storage_buckets_count,
+    buckets_discovery = buckets_discovery,
+    rebalancer_request_state = rebalancer_request_state,
+    internal = M,
+    on_master_enable = on_master_enable,
+    on_master_disable = on_master_disable,
+    module_version = function() return M.module_version end,
 }
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [tarantool-patches] [PATCH vshard 4/7] storage: wrap bucket status checks into functions
  2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
                   ` (2 preceding siblings ...)
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 3/7] storage: rework recovery Vladislav Shpilevoy
@ 2018-03-27 21:24 ` Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 5/7] rebalancer: introduce pinned bucket concept into rebalancer algo Vladislav Shpilevoy
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-27 21:24 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, Vladislav Shpilevoy

It is hard to update all places, where statuses are used, when
a new status is added.
---
 vshard/storage/init.lua | 79 ++++++++++++++++++++++++++++++++++---------------
 1 file changed, 55 insertions(+), 24 deletions(-)

diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 471d26a..fe8a40f 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -100,6 +100,29 @@ local function is_this_replicaset_locked()
     return M.this_replicaset and M.this_replicaset.lock
 end
 
+--
+-- Check if @a bucket can accept 'write' requests. Writable
+-- buckets can accept 'read' too.
+--
+local function bucket_is_writable(bucket)
+    return bucket.status == consts.BUCKET.ACTIVE
+end
+
+--
+-- Check if @a bucket can accept 'read' requests.
+--
+local function bucket_is_readable(bucket)
+    return bucket_is_writable(bucket) or bucket.status == consts.BUCKET.SENDING
+end
+
+--
+-- Check if a bucket is sending or receiving.
+--
+local function bucket_is_transfer_in_progress(bucket)
+    return bucket.status == consts.BUCKET.SENDING or
+           bucket.status == consts.BUCKET.RECEIVING
+end
+
 --
 -- Check if @a bucket is garbage. It is true for
 -- * sent buckets;
@@ -192,7 +215,7 @@ end
 -- Check if a local bucket can be deleted.
 --
 local function recovery_local_bucket_is_garbage(local_bucket, remote_bucket)
-    return remote_bucket and remote_bucket.status == consts.BUCKET.ACTIVE
+    return remote_bucket and bucket_is_writable(remote_bucket)
 end
 
 --
@@ -242,8 +265,7 @@ local function recovery_step()
         end
         is_empty = false
         local bucket = _bucket:get{bucket_id}
-        if not bucket or (bucket.status ~= consts.BUCKET.SENDING and
-                          bucket.status ~= consts.BUCKET.RECEIVING) then
+        if not bucket or not bucket_is_transfer_in_progress(bucket) then
             -- Possibly, a bucket was deleted or recovered by
             -- an admin. Or recovery_f started not after
             -- bootstrap, but after master change - in such a case
@@ -399,24 +421,32 @@ local function bucket_check_state(bucket_id, mode)
     assert(mode == 'read' or mode == 'write')
     local bucket = box.space._bucket:get({bucket_id})
     local errcode = nil
-    if bucket == nil or bucket_is_garbage(bucket) or
-       bucket.status == consts.BUCKET.RECEIVING then
+    if not bucket then
         errcode = lerror.code.WRONG_BUCKET
-    elseif (bucket.status == consts.BUCKET.SENDING and mode ~= 'read') then
-        errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
-    elseif bucket.status == consts.BUCKET.ACTIVE and mode ~= 'read' and
-           M.this_replicaset.master ~= M.this_replica then
+        goto finish
+    elseif mode == 'read' then
+        if not bucket_is_readable(bucket) then
+            errcode = lerror.code.WRONG_BUCKET
+            goto finish
+        end
+    elseif not bucket_is_writable(bucket) then
+        if bucket_is_transfer_in_progress(bucket) then
+            errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
+        else
+            errcode = lerror.code.WRONG_BUCKET
+        end
+        goto finish
+    elseif M.this_replicaset.master ~= M.this_replica then
         errcode = lerror.code.NON_MASTER
-    end
-    if errcode ~= nil then
-        local dest = bucket and bucket.destination or nil
-        return bucket, lerror.vshard(errcode, {bucket_id = bucket_id,
-                                               destination = dest})
-    end
-
-    assert(bucket.status == consts.BUCKET.ACTIVE or
-           bucket.status == consts.BUCKET.SENDING and mode == 'read')
-    return bucket, nil
+        goto finish
+    end
+    assert(not errcode)
+    assert(mode == 'read' and bucket_is_readable(bucket) or
+           mode == 'write' and bucket_is_writable(bucket))
+::finish::
+    return bucket, errcode and
+           lerror.vshard(errcode, {bucket_id = bucket_id,
+                                   destination = bucket and bucket.destination})
 end
 
 --
@@ -1598,11 +1628,12 @@ local function storage_info()
     if is_this_replicaset_locked() then
         state.bucket.lock = true
     end
-    state.bucket.total = box.space._bucket.index.pk:count()
-    state.bucket.active = box.space._bucket.index.status:count({consts.BUCKET.ACTIVE})
-    state.bucket.garbage = box.space._bucket.index.status:count({consts.BUCKET.SENT})
-    state.bucket.receiving = box.space._bucket.index.status:count({consts.BUCKET.RECEIVING})
-    state.bucket.sending = box.space._bucket.index.status:count({consts.BUCKET.SENDING})
+    local status = box.space._bucket.index.status
+    state.bucket.total = box.space._bucket:count()
+    state.bucket.active = status:count({consts.BUCKET.ACTIVE})
+    state.bucket.garbage = status:count({consts.BUCKET.SENT})
+    state.bucket.receiving = status:count({consts.BUCKET.RECEIVING})
+    state.bucket.sending = status:count({consts.BUCKET.SENDING})
     if state.bucket.receiving ~= 0 and state.bucket.sending ~= 0 then
         --
         --Some buckets are receiving and some buckets are sending at same time,
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [tarantool-patches] [PATCH vshard 5/7] rebalancer: introduce pinned bucket concept into rebalancer algo
  2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
                   ` (3 preceding siblings ...)
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 4/7] storage: wrap bucket status checks into functions Vladislav Shpilevoy
@ 2018-03-27 21:24 ` Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 6/7] storage: open public API to pin/unpin buckets Vladislav Shpilevoy
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-27 21:24 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, Vladislav Shpilevoy

Pinned bucket is the bucket, that can not be sent out of its
replicaset. Taking pinned buckets into account changes rebalancer
algorithm, since now on some replicasets the perfect balance can
not be reached.

Iterative algorithm is used to learn the best balance in a
cluster. On each step it calculates perfect bucket count for each
replicaset. If this count can not be satisfied due to pinned
buckets, the algorithm does best effort to get the perfect
balance. This is done via ignoring of replicasets disbalanced via
pinning, and their pinned buckets. After that a new balance is
calculated. And it can happen, that it can not be satisfied too.
It is possible, because ignoring of pinned buckets in
overpopulated replicasets leads to decrease of perfect bucket
count in other replicasets, and a new values can become less that
their pinned bucket count.

Part of #71
---
 test/unit/rebalancer.result   | 333 +++++++++++++++++++++++++++++++++++++++++-
 test/unit/rebalancer.test.lua |  79 ++++++++++
 vshard/consts.lua             |   1 +
 vshard/replicaset.lua         | 102 +++++++++----
 vshard/storage/init.lua       |  22 +--
 5 files changed, 500 insertions(+), 37 deletions(-)

diff --git a/test/unit/rebalancer.result b/test/unit/rebalancer.result
index af909eb..ff7fcfc 100644
--- a/test/unit/rebalancer.result
+++ b/test/unit/rebalancer.result
@@ -369,7 +369,8 @@ _bucket:replace{3, consts.BUCKET.SENT}
 ...
 get_state()
 ---
-- 2
+- bucket_active_count: 2
+  bucket_pinned_count: 0
 ...
 _bucket:replace{1, consts.BUCKET.RECEIVING}
 ---
@@ -643,6 +644,336 @@ replicasets
     ethalon_bucket_count: 34
     bucket_count: 25
 ...
+--
+-- gh-71: allow to pin buckets. A pinned bucket can not be sent
+-- out of its replicaset even to satisfy perfect balance.
+--
+-- For this case the rebalancer does best effort balance. The
+-- perfect balance here is unrechable, since on each replicaset
+-- too many buckets are pinned.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+replicasets = {
+	uuid1 = {bucket_count = 33, pinned_count = 26, weight = 1},
+	uuid2 = {bucket_count = 33, pinned_count = 24, weight = 1},
+	uuid3 = {bucket_count = 34, pinned_count = 30, weight = 1},
+	uuid4 = {bucket_count = 0, weight = 1},
+};
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+calc_ethalon(replicasets, 100)
+---
+...
+replicasets
+---
+- uuid4:
+    weight: 1
+    ethalon_bucket_count: 20
+    bucket_count: 0
+  uuid1:
+    bucket_count: 33
+    ethalon_bucket_count: 26
+    ignore_disbalance: true
+    weight: 1
+    pinned_count: 26
+  uuid3:
+    bucket_count: 34
+    ethalon_bucket_count: 30
+    ignore_disbalance: true
+    weight: 1
+    pinned_count: 30
+  uuid2:
+    bucket_count: 33
+    ethalon_bucket_count: 24
+    ignore_disbalance: true
+    weight: 1
+    pinned_count: 24
+...
+calc_metrics(replicasets, consts.DEFAULT_REBALANCER_MAX_RECEIVING)
+---
+- 100
+...
+replicasets
+---
+- uuid4:
+    needed: 20
+    weight: 1
+    ethalon_bucket_count: 20
+    bucket_count: 0
+  uuid1:
+    bucket_count: 33
+    ethalon_bucket_count: 26
+    ignore_disbalance: true
+    needed: -7
+    weight: 1
+    pinned_count: 26
+  uuid3:
+    bucket_count: 34
+    ethalon_bucket_count: 30
+    ignore_disbalance: true
+    needed: -4
+    weight: 1
+    pinned_count: 30
+  uuid2:
+    bucket_count: 33
+    ethalon_bucket_count: 24
+    ignore_disbalance: true
+    needed: -9
+    weight: 1
+    pinned_count: 24
+...
+--
+-- Here the disbalance is ok for the replicaset with uuid1 only -
+-- other replicasets have pinned buckets too, but not enough to
+-- break the balance: buckets are moved ok to uuid4.
+--
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+replicasets = {
+	uuid1 = {bucket_count = 33, pinned_count = 30, weight = 1},
+	uuid2 = {bucket_count = 33, pinned_count = 10, weight = 1},
+	uuid3 = {bucket_count = 34, pinned_count = 15, weight = 1},
+	uuid4 = {bucket_count = 0, weight = 1},
+};
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+calc_ethalon(replicasets, 100)
+---
+...
+replicasets
+---
+- uuid4:
+    weight: 1
+    ethalon_bucket_count: 23
+    bucket_count: 0
+  uuid1:
+    bucket_count: 33
+    ethalon_bucket_count: 30
+    ignore_disbalance: true
+    weight: 1
+    pinned_count: 30
+  uuid3:
+    ethalon_bucket_count: 23
+    bucket_count: 34
+    pinned_count: 15
+    weight: 1
+  uuid2:
+    ethalon_bucket_count: 24
+    bucket_count: 33
+    pinned_count: 10
+    weight: 1
+...
+calc_metrics(replicasets, consts.DEFAULT_REBALANCER_MAX_RECEIVING)
+---
+- 100
+...
+replicasets
+---
+- uuid4:
+    needed: 23
+    weight: 1
+    ethalon_bucket_count: 23
+    bucket_count: 0
+  uuid1:
+    bucket_count: 33
+    ethalon_bucket_count: 30
+    ignore_disbalance: true
+    needed: -3
+    weight: 1
+    pinned_count: 30
+  uuid3:
+    bucket_count: 34
+    ethalon_bucket_count: 23
+    needed: -11
+    weight: 1
+    pinned_count: 15
+  uuid2:
+    bucket_count: 33
+    ethalon_bucket_count: 24
+    needed: -9
+    weight: 1
+    pinned_count: 10
+...
+--
+-- Non-locked replicaset with any pinned bucket count can receive
+-- more buckets, if the rebalancer decides it is the best balance.
+--
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+replicasets = {
+	uuid1 = {bucket_count = 30, pinned_count = 25, weight = 0},
+	uuid2 = {bucket_count = 25, pinned_count = 25, weight = 1},
+	uuid3 = {bucket_count = 25, pinned_count = 25, weight = 1},
+	uuid4 = {bucket_count = 20, weight = 0},
+};
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+calc_ethalon(replicasets, 100)
+---
+...
+replicasets
+---
+- uuid4:
+    weight: 0
+    ethalon_bucket_count: 0
+    bucket_count: 20
+  uuid1:
+    bucket_count: 30
+    ethalon_bucket_count: 25
+    ignore_disbalance: true
+    weight: 0
+    pinned_count: 25
+  uuid3:
+    ethalon_bucket_count: 37
+    bucket_count: 25
+    pinned_count: 25
+    weight: 1
+  uuid2:
+    ethalon_bucket_count: 38
+    bucket_count: 25
+    pinned_count: 25
+    weight: 1
+...
+calc_metrics(replicasets, consts.DEFAULT_REBALANCER_MAX_RECEIVING)
+---
+- inf
+...
+replicasets
+---
+- uuid4:
+    needed: -20
+    weight: 0
+    ethalon_bucket_count: 0
+    bucket_count: 20
+  uuid1:
+    bucket_count: 30
+    ethalon_bucket_count: 25
+    ignore_disbalance: true
+    needed: -5
+    weight: 0
+    pinned_count: 25
+  uuid3:
+    bucket_count: 25
+    ethalon_bucket_count: 37
+    needed: 12
+    weight: 1
+    pinned_count: 25
+  uuid2:
+    bucket_count: 25
+    ethalon_bucket_count: 38
+    needed: 13
+    weight: 1
+    pinned_count: 25
+...
+--
+-- Check that the rebalancer can calculate a complex case, when a
+-- perfect balance is learned in several steps of the algorithm.
+-- Here on the first step it is calculated, that each replicaset
+-- must contain 25 buckets. But UUID1 can not satisfy it, so it
+-- is ignored. On the next step there are 100 - 30 pinned buckets
+-- from UUID1 = 70 buckets, and 3 replicasets. A new perfect
+-- balance is 23-23-24. But it can not be satisfied too - UUID2
+-- has 25 pinned buckets, so it is ignored. On the third step
+-- there are 70 - 25 = 45 buckets and 2 replicasets. A new perfect
+-- balance is 22-23. But it is unreachable too, because UUID3 has
+-- 24 pinned buckets. So only UUID4 is not ignored, and it
+-- receives all non-pinned buckets: 45 - 24 = 21.
+--
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+replicasets = {
+	uuid1 = {bucket_count = 33, pinned_count = 30, weight = 1},
+	uuid2 = {bucket_count = 33, pinned_count = 25, weight = 1},
+	uuid3 = {bucket_count = 34, pinned_count = 24, weight = 1},
+	uuid4 = {bucket_count = 0, weight = 1},
+};
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+calc_ethalon(replicasets, 100)
+---
+...
+replicasets
+---
+- uuid4:
+    weight: 1
+    ethalon_bucket_count: 21
+    bucket_count: 0
+  uuid1:
+    bucket_count: 33
+    ethalon_bucket_count: 30
+    ignore_disbalance: true
+    weight: 1
+    pinned_count: 30
+  uuid3:
+    bucket_count: 34
+    ethalon_bucket_count: 24
+    ignore_disbalance: true
+    weight: 1
+    pinned_count: 24
+  uuid2:
+    bucket_count: 33
+    ethalon_bucket_count: 25
+    ignore_disbalance: true
+    weight: 1
+    pinned_count: 25
+...
+calc_metrics(replicasets, consts.DEFAULT_REBALANCER_MAX_RECEIVING)
+---
+- 100
+...
+replicasets
+---
+- uuid4:
+    needed: 21
+    weight: 1
+    ethalon_bucket_count: 21
+    bucket_count: 0
+  uuid1:
+    bucket_count: 33
+    ethalon_bucket_count: 30
+    ignore_disbalance: true
+    needed: -3
+    weight: 1
+    pinned_count: 30
+  uuid3:
+    bucket_count: 34
+    ethalon_bucket_count: 24
+    ignore_disbalance: true
+    needed: -10
+    weight: 1
+    pinned_count: 24
+  uuid2:
+    bucket_count: 33
+    ethalon_bucket_count: 25
+    ignore_disbalance: true
+    needed: -8
+    weight: 1
+    pinned_count: 25
+...
 _bucket:drop()
 ---
 ...
diff --git a/test/unit/rebalancer.test.lua b/test/unit/rebalancer.test.lua
index d22116b..71411bb 100644
--- a/test/unit/rebalancer.test.lua
+++ b/test/unit/rebalancer.test.lua
@@ -162,4 +162,83 @@ test_run:cmd("setopt delimiter ''");
 calc_ethalon(replicasets, 100)
 replicasets
 
+--
+-- gh-71: allow to pin buckets. A pinned bucket can not be sent
+-- out of its replicaset even to satisfy perfect balance.
+--
+-- For this case the rebalancer does best effort balance. The
+-- perfect balance here is unrechable, since on each replicaset
+-- too many buckets are pinned.
+test_run:cmd("setopt delimiter ';'")
+replicasets = {
+	uuid1 = {bucket_count = 33, pinned_count = 26, weight = 1},
+	uuid2 = {bucket_count = 33, pinned_count = 24, weight = 1},
+	uuid3 = {bucket_count = 34, pinned_count = 30, weight = 1},
+	uuid4 = {bucket_count = 0, weight = 1},
+};
+test_run:cmd("setopt delimiter ''");
+calc_ethalon(replicasets, 100)
+replicasets
+calc_metrics(replicasets, consts.DEFAULT_REBALANCER_MAX_RECEIVING)
+replicasets
+--
+-- Here the disbalance is ok for the replicaset with uuid1 only -
+-- other replicasets have pinned buckets too, but not enough to
+-- break the balance: buckets are moved ok to uuid4.
+--
+test_run:cmd("setopt delimiter ';'")
+replicasets = {
+	uuid1 = {bucket_count = 33, pinned_count = 30, weight = 1},
+	uuid2 = {bucket_count = 33, pinned_count = 10, weight = 1},
+	uuid3 = {bucket_count = 34, pinned_count = 15, weight = 1},
+	uuid4 = {bucket_count = 0, weight = 1},
+};
+test_run:cmd("setopt delimiter ''");
+calc_ethalon(replicasets, 100)
+replicasets
+calc_metrics(replicasets, consts.DEFAULT_REBALANCER_MAX_RECEIVING)
+replicasets
+--
+-- Non-locked replicaset with any pinned bucket count can receive
+-- more buckets, if the rebalancer decides it is the best balance.
+--
+test_run:cmd("setopt delimiter ';'")
+replicasets = {
+	uuid1 = {bucket_count = 30, pinned_count = 25, weight = 0},
+	uuid2 = {bucket_count = 25, pinned_count = 25, weight = 1},
+	uuid3 = {bucket_count = 25, pinned_count = 25, weight = 1},
+	uuid4 = {bucket_count = 20, weight = 0},
+};
+test_run:cmd("setopt delimiter ''");
+calc_ethalon(replicasets, 100)
+replicasets
+calc_metrics(replicasets, consts.DEFAULT_REBALANCER_MAX_RECEIVING)
+replicasets
+--
+-- Check that the rebalancer can calculate a complex case, when a
+-- perfect balance is learned in several steps of the algorithm.
+-- Here on the first step it is calculated, that each replicaset
+-- must contain 25 buckets. But UUID1 can not satisfy it, so it
+-- is ignored. On the next step there are 100 - 30 pinned buckets
+-- from UUID1 = 70 buckets, and 3 replicasets. A new perfect
+-- balance is 23-23-24. But it can not be satisfied too - UUID2
+-- has 25 pinned buckets, so it is ignored. On the third step
+-- there are 70 - 25 = 45 buckets and 2 replicasets. A new perfect
+-- balance is 22-23. But it is unreachable too, because UUID3 has
+-- 24 pinned buckets. So only UUID4 is not ignored, and it
+-- receives all non-pinned buckets: 45 - 24 = 21.
+--
+test_run:cmd("setopt delimiter ';'")
+replicasets = {
+	uuid1 = {bucket_count = 33, pinned_count = 30, weight = 1},
+	uuid2 = {bucket_count = 33, pinned_count = 25, weight = 1},
+	uuid3 = {bucket_count = 34, pinned_count = 24, weight = 1},
+	uuid4 = {bucket_count = 0, weight = 1},
+};
+test_run:cmd("setopt delimiter ''");
+calc_ethalon(replicasets, 100)
+replicasets
+calc_metrics(replicasets, consts.DEFAULT_REBALANCER_MAX_RECEIVING)
+replicasets
+
 _bucket:drop()
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 1a0e4ab..db0daf4 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -2,6 +2,7 @@ return {
     -- Bucket FSM
     BUCKET = {
         ACTIVE = 'active',
+        PINNED = 'pinned',
         SENDING = 'sending',
         SENT = 'sent',
         RECEIVING = 'receiving',
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 724b1b3..6a888d7 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -396,42 +396,90 @@ local replica_mt = {
 
 --
 -- Calculate for each replicaset its ethalon bucket count.
+-- Iterative algorithm is used to learn the best balance in a
+-- cluster. On each step it calculates perfect bucket count for
+-- each replicaset. If this count can not be satisfied due to
+-- pinned buckets, the algorithm does best effort to get the
+-- perfect balance. This is done via ignoring of replicasets
+-- disbalanced via pinning, and their pinned buckets. After that a
+-- new balance is calculated. And it can happen, that it can not
+-- be satisfied too. It is possible, because ignoring of pinned
+-- buckets in overpopulated replicasets leads to decrease of
+-- perfect bucket count in other replicasets, and a new values can
+-- become less that their pinned bucket count.
+--
+-- On each step the algorithm either is finished, or ignores at
+-- least one new overpopulated replicaset, so it has complexity
+-- O(N^2), where N - replicaset count.
 --
 local function cluster_calculate_ethalon_balance(replicasets, bucket_count)
+    local is_balance_found = false
     local weight_sum = 0
+    local step_count = 0
+    local replicaset_count = 0
     for _, replicaset in pairs(replicasets) do
         weight_sum = weight_sum + replicaset.weight
+        replicaset_count = replicaset_count + 1
     end
-    assert(weight_sum > 0)
-    local bucket_per_weight = bucket_count / weight_sum
-    local buckets_calculated = 0
-    for _, replicaset in pairs(replicasets) do
-        replicaset.ethalon_bucket_count =
-            math.ceil(replicaset.weight * bucket_per_weight)
-        buckets_calculated =
-            buckets_calculated + replicaset.ethalon_bucket_count
-    end
-    if buckets_calculated == bucket_count then
-        return
-    end
-    -- A situation is possible, when bucket_per_weight is not
-    -- integer. Lets spread this disbalance over cluster to
-    -- make for any replicaset pair
-    -- |replicaset_1 - replicaset_2| <= 1 - this difference is
-    -- admissible.
-    local buckets_rest = buckets_calculated - bucket_count
-    for _, replicaset in pairs(replicasets) do
-        local ceil = math.ceil(replicaset.weight * bucket_per_weight)
-        local floor = math.floor(replicaset.weight * bucket_per_weight)
-        if replicaset.ethalon_bucket_count > 0 and ceil ~= floor then
-            replicaset.ethalon_bucket_count = replicaset.ethalon_bucket_count - 1
-            buckets_rest = buckets_rest - 1
-            if buckets_rest == 0 then
-                return
+    while not is_balance_found do
+        step_count = step_count + 1
+        assert(weight_sum > 0)
+        local bucket_per_weight = bucket_count / weight_sum
+        local buckets_calculated = 0
+        for _, replicaset in pairs(replicasets) do
+            if not replicaset.ignore_disbalance then
+                replicaset.ethalon_bucket_count =
+                    math.ceil(replicaset.weight * bucket_per_weight)
+                buckets_calculated =
+                    buckets_calculated + replicaset.ethalon_bucket_count
             end
         end
+        local buckets_rest = buckets_calculated - bucket_count
+        is_balance_found = true
+        for _, replicaset in pairs(replicasets) do
+            if not replicaset.ignore_disbalance then
+                -- A situation is possible, when bucket_per_weight
+                -- is not integer. Lets spread this disbalance
+                -- over the cluster.
+                if buckets_rest > 0 then
+                    local n = replicaset.weight * bucket_per_weight
+                    local ceil = math.ceil(n)
+                    local floor = math.floor(n)
+                    if replicaset.ethalon_bucket_count > 0 and ceil ~= floor then
+                        replicaset.ethalon_bucket_count =
+                            replicaset.ethalon_bucket_count - 1
+                        buckets_rest = buckets_rest - 1
+                    end
+                end
+                --
+                -- Search for incorrigible disbalance due to
+                -- pinned buckets.
+                --
+                local pinned = replicaset.pinned_count
+                if pinned and replicaset.ethalon_bucket_count < pinned then
+                    -- This replicaset can not send out enough
+                    -- buckets to reach a balance. So do the best
+                    -- effort balance by sending from the
+                    -- replicaset though non-pinned buckets. This
+                    -- replicaset and its pinned buckets does not
+                    -- participate in the next steps of balance
+                    -- calculation.
+                    is_balance_found = false
+                    bucket_count = bucket_count - replicaset.pinned_count
+                    replicaset.ethalon_bucket_count = replicaset.pinned_count
+                    replicaset.ignore_disbalance = true
+                    weight_sum = weight_sum - replicaset.weight
+                end
+            end
+        end
+        assert(buckets_rest == 0)
+        if step_count > replicaset_count then
+            -- This can happed only because of a bug in this
+            -- algorithm. But it occupies 100% of transaction
+            -- thread, so check step count explicitly.
+            return error('PANIC: the rebalancer is broken')
+        end
     end
-    assert(buckets_rest == 0)
 end
 
 --
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index fe8a40f..99fbf7b 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1213,19 +1213,20 @@ local function rebalancer_download_states()
     local total_bucket_locked_count = 0
     local total_bucket_active_count = 0
     for uuid, replicaset in pairs(M.replicasets) do
-        local bucket_active_count =
+        local state =
             replicaset:callrw('vshard.storage.rebalancer_request_state', {})
-        if bucket_active_count == nil then
+        if state == nil then
             return
         end
+        local bucket_count = state.bucket_active_count +
+                             state.bucket_pinned_count
         if replicaset.lock then
-            total_bucket_locked_count =
-                total_bucket_locked_count + bucket_active_count
+            total_bucket_locked_count = total_bucket_locked_count + bucket_count
         else
-            total_bucket_active_count =
-                total_bucket_active_count + bucket_active_count
-            replicasets[uuid] = {bucket_count = bucket_active_count,
-                                 weight = replicaset.weight}
+            total_bucket_active_count = total_bucket_active_count + bucket_count
+            replicasets[uuid] = {bucket_count = bucket_count,
+                                 weight = replicaset.weight,
+                                 pinned_count = state.bucket_pinned_count}
         end
     end
     local sum = total_bucket_active_count + total_bucket_locked_count
@@ -1325,7 +1326,10 @@ local function rebalancer_request_state()
         return
     end
     local bucket_count = _bucket:count()
-    return status_index:count({consts.BUCKET.ACTIVE})
+    return {
+        bucket_active_count = status_index:count({consts.BUCKET.ACTIVE}),
+        bucket_pinned_count = status_index:count({consts.BUCKET.PINNED}),
+    }
 end
 
 --
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [tarantool-patches] [PATCH vshard 6/7] storage: open public API to pin/unpin buckets
  2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
                   ` (4 preceding siblings ...)
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 5/7] rebalancer: introduce pinned bucket concept into rebalancer algo Vladislav Shpilevoy
@ 2018-03-27 21:24 ` Vladislav Shpilevoy
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 7/7] rfc: add RFC for replicaset lock and bucket pin Vladislav Shpilevoy
  2018-03-30  4:15 ` [tarantool-patches] Re: [PATCH vshard 0/7] Replicaset " Georgy Kirichenko
  7 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-27 21:24 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, Vladislav Shpilevoy

Closes #71
---
 test/rebalancer/rebalancer_lock_and_pin.result    | 188 ++++++++++++++++++++++
 test/rebalancer/rebalancer_lock_and_pin.test.lua  |  85 ++++++++++
 test/rebalancer/restart_during_rebalancing.result |   4 +
 test/storage/storage.result                       |   7 +
 vshard/storage/init.lua                           |  57 ++++++-
 5 files changed, 339 insertions(+), 2 deletions(-)

diff --git a/test/rebalancer/rebalancer_lock_and_pin.result b/test/rebalancer/rebalancer_lock_and_pin.result
index b61cc84..95a160a 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.result
+++ b/test/rebalancer/rebalancer_lock_and_pin.result
@@ -300,6 +300,194 @@ vshard.storage.info().bucket.active
 ---
 - 1000
 ...
+--
+-- Test bucket pinning. At first, return to the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+vshard.storage.info().bucket.active
+---
+- 1000
+...
+status = box.space._bucket.index.status
+---
+...
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+---
+...
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+-- Test that can not pin other buckets.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+---
+...
+--
+-- Now pin some buckets and create such disbalance, that the
+-- rebalancer will face with unreachability of the perfect
+-- balance.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+---
+...
+status:count({vshard.consts.BUCKET.PINNED})
+---
+- 800
+...
+rs1_cfg.weight = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+-- The perfect balance is now 200-400-400, but on the replicaset 1
+-- 800 buckets are pinned, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 800
+...
+info.pinned
+---
+- 800
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
 test_run:cmd("switch default")
 ---
 - true
diff --git a/test/rebalancer/rebalancer_lock_and_pin.test.lua b/test/rebalancer/rebalancer_lock_and_pin.test.lua
index d0f2163..afef135 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.test.lua
+++ b/test/rebalancer/rebalancer_lock_and_pin.test.lua
@@ -133,6 +133,91 @@ vshard.storage.info().bucket.active
 test_run:switch('box_3_a')
 vshard.storage.info().bucket.active
 
+--
+-- Test bucket pinning. At first, return to the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+test_run:switch('box_3_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+test_run:switch('box_1_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+vshard.storage.info().bucket.active
+status = box.space._bucket.index.status
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that can not pin other buckets.
+test_run:cmd("setopt delimiter ';'")
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Now pin some buckets and create such disbalance, that the
+-- rebalancer will face with unreachability of the perfect
+-- balance.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+status:count({vshard.consts.BUCKET.PINNED})
+rs1_cfg.weight = 0.5
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+-- The perfect balance is now 200-400-400, but on the replicaset 1
+-- 800 buckets are pinned, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+info.active
+info.pinned
+test_run:switch('box_2_a')
+vshard.storage.info().bucket.active
+
+test_run:switch('box_3_a')
+vshard.storage.info().bucket.active
+
 test_run:cmd("switch default")
 test_run:drop_cluster(REPLICASET_3)
 test_run:drop_cluster(REPLICASET_2)
diff --git a/test/rebalancer/restart_during_rebalancing.result b/test/rebalancer/restart_during_rebalancing.result
index 083f4ff..d2b55df 100644
--- a/test/rebalancer/restart_during_rebalancing.result
+++ b/test/rebalancer/restart_during_rebalancing.result
@@ -284,6 +284,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -304,6 +305,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -324,6 +326,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -344,6 +347,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
diff --git a/test/storage/storage.result b/test/storage/storage.result
index 537c85b..acf8da8 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -197,6 +197,7 @@ vshard.storage.info()
     active: 0
     total: 0
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
@@ -287,6 +288,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 0
   replication:
@@ -317,6 +319,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 1
   replication:
@@ -363,6 +366,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
@@ -394,6 +398,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 3
   replication:
@@ -426,6 +431,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 0
   replication:
@@ -457,6 +463,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 99fbf7b..9f6f804 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -105,7 +105,8 @@ end
 -- buckets can accept 'read' too.
 --
 local function bucket_is_writable(bucket)
-    return bucket.status == consts.BUCKET.ACTIVE
+    return bucket.status == consts.BUCKET.ACTIVE or
+           bucket.status == consts.BUCKET.PINNED
 end
 
 --
@@ -730,6 +731,54 @@ local function bucket_send(bucket_id, destination)
     return true
 end
 
+--
+-- Pin a bucket to a replicaset. Pinned bucket can not be sent
+-- even if is breaks the cluster balance.
+-- @param bucket_id Bucket identifier to pin.
+-- @retval true A bucket is pinned.
+-- @retval nil, err A bucket can not be pinned. @A err is the
+--         reason why.
+--
+local function bucket_pin(bucket_id)
+    if type(bucket_id) ~= 'number' then
+        error('Usage: bucket_pin(bucket_id)')
+    end
+    local bucket, err = bucket_check_state(bucket_id, 'write')
+    if err then
+        return nil, err
+    end
+    assert(bucket)
+    if bucket.status ~= consts.BUCKET.PINNED then
+        assert(bucket.status == consts.BUCKET.ACTIVE)
+        box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.PINNED}})
+    end
+    return true
+end
+
+--
+-- Return a pinned bucket back into active state.
+-- @param bucket_id Bucket identifier to unpin.
+-- @retval true A bucket is unpinned.
+-- @retval nil, err A bucket can not be unpinned. @A err is the
+--         reason why.
+--
+local function bucket_unpin(bucket_id)
+    if type(bucket_id) ~= 'number' then
+        error('Usage: bucket_unpin(bucket_id)')
+    end
+    local bucket, err = bucket_check_state(bucket_id, 'write')
+    if err then
+        return nil, err
+    end
+    assert(bucket)
+    if bucket.status == consts.BUCKET.PINNED then
+        box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.ACTIVE}})
+    else
+        assert(bucket.status == consts.BUCKET.ACTIVE)
+    end
+    return true
+end
+
 --------------------------------------------------------------------------------
 -- Garbage collector
 --------------------------------------------------------------------------------
@@ -1633,11 +1682,13 @@ local function storage_info()
         state.bucket.lock = true
     end
     local status = box.space._bucket.index.status
+    local pinned = status:count({consts.BUCKET.PINNED})
     state.bucket.total = box.space._bucket:count()
-    state.bucket.active = status:count({consts.BUCKET.ACTIVE})
+    state.bucket.active = status:count({consts.BUCKET.ACTIVE}) + pinned
     state.bucket.garbage = status:count({consts.BUCKET.SENT})
     state.bucket.receiving = status:count({consts.BUCKET.RECEIVING})
     state.bucket.sending = status:count({consts.BUCKET.SENDING})
+    state.bucket.pinned = pinned
     if state.bucket.receiving ~= 0 and state.bucket.sending ~= 0 then
         --
         --Some buckets are receiving and some buckets are sending at same time,
@@ -1758,6 +1809,8 @@ return {
     bucket_recv = bucket_recv,
     bucket_send = bucket_send,
     bucket_stat = bucket_stat,
+    bucket_pin = bucket_pin,
+    bucket_unpin = bucket_unpin,
     bucket_delete_garbage = bucket_delete_garbage,
     garbage_collector_wakeup = garbage_collector_wakeup,
     rebalancer_wakeup = rebalancer_wakeup,
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [tarantool-patches] [PATCH vshard 7/7] rfc: add RFC for replicaset lock and bucket pin
  2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
                   ` (5 preceding siblings ...)
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 6/7] storage: open public API to pin/unpin buckets Vladislav Shpilevoy
@ 2018-03-27 21:24 ` Vladislav Shpilevoy
  2018-03-30  4:15 ` [tarantool-patches] Re: [PATCH vshard 0/7] Replicaset " Georgy Kirichenko
  7 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-27 21:24 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, Vladislav Shpilevoy

---
 docs/RFC/replicaset_lock_and_bucket.md | 84 ++++++++++++++++++++++++++++++++++
 1 file changed, 84 insertions(+)
 create mode 100644 docs/RFC/replicaset_lock_and_bucket.md

diff --git a/docs/RFC/replicaset_lock_and_bucket.md b/docs/RFC/replicaset_lock_and_bucket.md
new file mode 100644
index 0000000..1bf39c9
--- /dev/null
+++ b/docs/RFC/replicaset_lock_and_bucket.md
@@ -0,0 +1,84 @@
+# Replicaset lock and bucket pin
+
+* **Status**: In progress
+* **Start date**: 19-03-2018
+* **Authors**: Vladislav Shpilevoy @Gerold103 <v.shpilevoy@tarantool.org>
+* **Issues**: [#71](https://github.com/tarantool/vshard/issues/71)
+
+## Summary
+
+Replicaset lock makes it invisible for the rebalancer - a locked replicaset can neither receive new buckets nor send its own.
+Bucket pin blocks this concrete bucket sending - it will stay on a replicaset to which pinned, until it is unpinned.
+Pinning all replicaset buckets is not the same as replicaset locking - even if you pin all buckets, the **non-locked replicaset still can receive new buckets**.
+
+## Background and motivation
+
+Replicaset lock allows, for example, to separate a replicaset for testsing from production replicasets. Or to preserve some application metadata, that must not be sharded for a while. Bucket pin allows the same, but in the smaller scope.
+
+Difference between replicaset lock and all buckets pinning is motivated by ability to button-up an entire replicaset.
+
+Mostly locked and pinned buckets affect the rebalancing algorithm, which must ignore locked replicasets, and take pinned buckets into account, attempting to reach the best possible balance. It is not a trivial task, because a user can pin to a replicaset so many buckets, that a perfect balance is unreachable. For example, see the cluster (assume all weights are equal to 1):
+```
+rs - replicaset
+
+rs1: bucket_count = 150
+rs2: bucket_count = 150, pinned_count 120
+
+Add a replicaset:
+
+rs1: bucket_count = 150
+rs2: bucket_count = 150, pinned_count 120
+rs3: bucket_count = 0
+```
+Here the perfect balance is `100 - 100 - 100` that is impossible, since the replicaset rs2 have 120 pinned buckets. So the best reachable balance here is the following:
+```
+rs1: bucket_count = 90
+rs2: bucket_count = 120, pinned_count 120
+rs3: bucket_count = 90
+```
+
+Here the rebalancer moved from rs2 as many buckets as could to decrease disbalance. At the same time it respected equal weights of rs1 and rs3.
+
+## Detailed design
+
+The algorithms of respecting locks and pins are completely different despite of the similar functionality, and are considered separately.
+
+### Replicaset lock and rebalancing
+
+When a replicaset is locked, it simply does not participate in rebalancing. It means, that even if its perfect bucket count is not equal to an actual one, this disbalance can not be fixed due to lock. When the rebalancer detects that one of replicasets appears to be locked, it recalculates perfect bucket count of non-locked replicasets as if the locked replicaset and its buckets does not exist.
+
+### Bucket pin and rebalancing
+
+It is the much more complex case, that splits the rebalancing algorithm in the several steps:
+
+1. The rebalancer calculates perfect bucket count as if all buckets are not pinned. Then it looks at each replicaset and compares its new perfect bucket count against pinned bucket count. If the pinned one is less - it is ok. A non-locked replicaset (on this step all locked replicasets already are filtered out) with pinned buckets can receive new ones.
+
+2. If perfect bucket count is less, than pinned one, this disbalance can not be fixed - the rebalancer can not move pinned buckets out of this replicaset. In such a case perfect bucket count of this replicasets are set to the exactly pinned one. These replicasets are not considered by the rebalancer then, and their pinned count is subtracted from a total bucket count. Here the rebalancer tries to move out of such replicasets as many buckets as possible.
+
+3. The described procecure is restarted from the step 1 with new total bucket count and with replicasets, those perfect bucket count >= pinned one, until it appears, that on all replicasets perfect bucket count >= pinned one.
+
+Pseudocode:
+```
+function cluster_calculate_perfect_balance(replicasets, bucket_count)
+	-- spread buckets over still considered replicasets using weights --
+end;
+
+cluster = <all of non-locked replicasets>;
+bucket_count = <total bucket count in the cluster>;
+can_reach_balance = false
+while not can_reach_balance do
+	can_reach_balance = true
+	cluster_calculate_perfect_balance(cluster, bucket_count);
+	foreach replicaset in cluster do
+		if replicaset.perfect_bucket_count <
+		   replicaset.pinned_bucket_count then
+			can_reach_balance = false
+			bucket_count -= replicaset.pinned_bucket_count;
+			replicaset.perfect_bucket_count =
+				replicaset.pinned_bucket_count;
+		end;
+	end;
+end;
+cluster_calculate_perfect_balance(cluster, bucket_count);
+```
+Complexity of the algorithm is `O(N^2)`, where `N` is replicaset count. On each step it either finishes the calculation, or ignores at least one new replicaset, which is overpopulated due to pinned buckets, and updates perfect bucket count of others.
-- 
2.14.3 (Apple Git-98)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [tarantool-patches] Re: [PATCH vshard 0/7] Replicaset lock and bucket pin
  2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
                   ` (6 preceding siblings ...)
  2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 7/7] rfc: add RFC for replicaset lock and bucket pin Vladislav Shpilevoy
@ 2018-03-30  4:15 ` Georgy Kirichenko
  7 siblings, 0 replies; 9+ messages in thread
From: Georgy Kirichenko @ 2018-03-30  4:15 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Vladislav Shpilevoy

[-- Attachment #1: Type: text/plain, Size: 3049 bytes --]

Seems to be Ok

On Wednesday, March 28, 2018 12:24:07 AM MSK Vladislav Shpilevoy wrote:
> Branch: http://github.com/tarantool/tarantool/tree/gh-71-bucket-pin-lock
> Issue: https://github.com/tarantool/vshard/issues/71
> 
> Replicaset lock makes it invisible for the rebalancer - a locked
> replicaset can neither receive new buckets nor send its own.
> Bucket pin blocks this concrete bucket sending - it will stay on a
> replicaset to which pinned, until it is unpinned. Pinning all
> replicaset buckets is not the same as replicaset locking - even if
> you pin all buckets, the non-locked replicaset still can receive
> new buckets.
> 
> Replicaset lock allows, for example, to separate a replicaset for
> testsing from production replicasets. Or to preserve some
> application metadata, that must not be sharded for a while. Bucket
> pin allows the same, but in the smaller scope.
> 
> Difference between replicaset lock and all buckets pinning is
> motivated by ability to button-up an entire replicaset.
> 
> Mostly locked and pinned buckets affect the rebalancing algorithm,
> which must ignore locked replicasets, and take pinned buckets into
> account, attempting to reach the best possible balance. It is not
> a trivial task, because a user can pin to a replicaset so many
> buckets, that a perfect balance is unreachable.
> 
> Vladislav Shpilevoy (7):
>   rebalancer: allow to lock a replicaset from rebalancing
>   rebalancer: remember the currently sending bucket id
>   storage: rework recovery
>   storage: wrap bucket status checks into functions
>   rebalancer: introduce pinned bucket concept into rebalancer algo
>   storage: open public API to pin/unpin buckets
>   rfc: add RFC for replicaset lock and bucket pin
> 
>  docs/RFC/replicaset_lock_and_bucket.md            |  84 ++++
>  test/rebalancer/box_1_a.lua                       |   2 +-
>  test/rebalancer/rebalancer_lock_and_pin.result    | 503
> ++++++++++++++++++++++ test/rebalancer/rebalancer_lock_and_pin.test.lua  |
> 224 ++++++++++ test/rebalancer/restart_during_rebalancing.result |   4 +
>  test/storage/recovery.result                      |  85 +++-
>  test/storage/recovery.test.lua                    |  38 +-
>  test/storage/storage.result                       |   7 +
>  test/unit/rebalancer.result                       | 333 +++++++++++++-
>  test/unit/rebalancer.test.lua                     |  79 ++++
>  vshard/cfg.lua                                    |   3 +-
>  vshard/consts.lua                                 |   1 +
>  vshard/error.lua                                  |  12 +-
>  vshard/replicaset.lua                             | 105 +++--
>  vshard/router/init.lua                            |  46 +-
>  vshard/storage/init.lua                           | 418 +++++++++++-------
>  16 files changed, 1713 insertions(+), 231 deletions(-)
>  create mode 100644 docs/RFC/replicaset_lock_and_bucket.md
>  create mode 100644 test/rebalancer/rebalancer_lock_and_pin.result
>  create mode 100644 test/rebalancer/rebalancer_lock_and_pin.test.lua


[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2018-03-30  4:15 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 1/7] rebalancer: allow to lock a replicaset from rebalancing Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 2/7] rebalancer: remember the currently sending bucket id Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 3/7] storage: rework recovery Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 4/7] storage: wrap bucket status checks into functions Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 5/7] rebalancer: introduce pinned bucket concept into rebalancer algo Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 6/7] storage: open public API to pin/unpin buckets Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 7/7] rfc: add RFC for replicaset lock and bucket pin Vladislav Shpilevoy
2018-03-30  4:15 ` [tarantool-patches] Re: [PATCH vshard 0/7] Replicaset " Georgy Kirichenko

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox