[tarantool-patches] [PATCH vshard 6/7] storage: open public API to pin/unpin buckets

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Mar 28 00:24:13 MSK 2018


Closes #71
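
For illustration, a minimal usage sketch of the new calls on a
storage node, assuming a console session and an arbitrary resident
bucket (the bucket id below is hypothetical):

    -- Pin a bucket to this replicaset so the rebalancer can not
    -- send it away, then release it again.
    local bucket_id = 1
    -- Returns true on success, or nil and an error for buckets in
    -- a non-writable state (SENDING, RECEIVING, SENT, GARBAGE).
    local ok, err = vshard.storage.bucket_pin(bucket_id)
    assert(ok, err)
    -- A pinned bucket stays writable and is counted both in
    -- info().bucket.active and in the new info().bucket.pinned.
    assert(vshard.storage.info().bucket.pinned >= 1)
    -- bucket_unpin() returns the bucket to the plain ACTIVE state.
    ok, err = vshard.storage.bucket_unpin(bucket_id)
    assert(ok, err)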
---
 test/rebalancer/rebalancer_lock_and_pin.result    | 188 ++++++++++++++++++++++
 test/rebalancer/rebalancer_lock_and_pin.test.lua  |  85 ++++++++++
 test/rebalancer/restart_during_rebalancing.result |   4 +
 test/storage/storage.result                       |   7 +
 vshard/storage/init.lua                           |  57 ++++++-
 5 files changed, 339 insertions(+), 2 deletions(-)

diff --git a/test/rebalancer/rebalancer_lock_and_pin.result b/test/rebalancer/rebalancer_lock_and_pin.result
index b61cc84..95a160a 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.result
+++ b/test/rebalancer/rebalancer_lock_and_pin.result
@@ -300,6 +300,194 @@ vshard.storage.info().bucket.active
 ---
 - 1000
 ...
+--
+-- Test bucket pinning. First, restore the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+vshard.storage.info().bucket.active
+---
+- 1000
+...
+status = box.space._bucket.index.status
+---
+...
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+---
+...
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+-- Test that buckets in other states can not be pinned or unpinned.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+---
+...
+--
+-- Now pin some buckets and create an imbalance such that the
+-- rebalancer can not reach the perfect balance.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+---
+...
+status:count({vshard.consts.BUCKET.PINNED})
+---
+- 800
+...
+rs1_cfg.weight = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+-- The perfect balance is now 600-1200-1200, but 800 buckets are
+-- pinned on replicaset 1, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 800
+...
+info.pinned
+---
+- 800
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
 test_run:cmd("switch default")
 ---
 - true
diff --git a/test/rebalancer/rebalancer_lock_and_pin.test.lua b/test/rebalancer/rebalancer_lock_and_pin.test.lua
index d0f2163..afef135 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.test.lua
+++ b/test/rebalancer/rebalancer_lock_and_pin.test.lua
@@ -133,6 +133,91 @@ vshard.storage.info().bucket.active
 test_run:switch('box_3_a')
 vshard.storage.info().bucket.active
 
+--
+-- Test bucket pinning. First, restore the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+test_run:switch('box_3_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+test_run:switch('box_1_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+vshard.storage.info().bucket.active
+status = box.space._bucket.index.status
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that buckets in other states can not be pinned or unpinned.
+test_run:cmd("setopt delimiter ';'")
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Now pin some buckets and create an imbalance such that the
+-- rebalancer can not reach the perfect balance.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+status:count({vshard.consts.BUCKET.PINNED})
+rs1_cfg.weight = 0.5
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+-- The perfect balance is now 600-1200-1200, but 800 buckets are
+-- pinned on replicaset 1, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+info.active
+info.pinned
+test_run:switch('box_2_a')
+vshard.storage.info().bucket.active
+
+test_run:switch('box_3_a')
+vshard.storage.info().bucket.active
+
 test_run:cmd("switch default")
 test_run:drop_cluster(REPLICASET_3)
 test_run:drop_cluster(REPLICASET_2)
diff --git a/test/rebalancer/restart_during_rebalancing.result b/test/rebalancer/restart_during_rebalancing.result
index 083f4ff..d2b55df 100644
--- a/test/rebalancer/restart_during_rebalancing.result
+++ b/test/rebalancer/restart_during_rebalancing.result
@@ -284,6 +284,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -304,6 +305,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -324,6 +326,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -344,6 +347,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
diff --git a/test/storage/storage.result b/test/storage/storage.result
index 537c85b..acf8da8 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -197,6 +197,7 @@ vshard.storage.info()
     active: 0
     total: 0
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
@@ -287,6 +288,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 0
   replication:
@@ -317,6 +319,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 1
   replication:
@@ -363,6 +366,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
@@ -394,6 +398,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 3
   replication:
@@ -426,6 +431,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 0
   replication:
@@ -457,6 +463,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 99fbf7b..9f6f804 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -105,7 +105,8 @@ end
 -- buckets can accept 'read' too.
 --
 local function bucket_is_writable(bucket)
-    return bucket.status == consts.BUCKET.ACTIVE
+    return bucket.status == consts.BUCKET.ACTIVE or
+           bucket.status == consts.BUCKET.PINNED
 end
 
 --
@@ -730,6 +731,54 @@ local function bucket_send(bucket_id, destination)
     return true
 end
 
+--
+-- Pin a bucket to a replicaset. A pinned bucket can not be sent
+-- even if it breaks the cluster balance.
+-- @param bucket_id Bucket identifier to pin.
+-- @retval true A bucket is pinned.
+-- @retval nil, err A bucket can not be pinned. @A err is the
+--         reason why.
+--
+local function bucket_pin(bucket_id)
+    if type(bucket_id) ~= 'number' then
+        error('Usage: bucket_pin(bucket_id)')
+    end
+    local bucket, err = bucket_check_state(bucket_id, 'write')
+    if err then
+        return nil, err
+    end
+    assert(bucket)
+    if bucket.status ~= consts.BUCKET.PINNED then
+        assert(bucket.status == consts.BUCKET.ACTIVE)
+        box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.PINNED}})
+    end
+    return true
+end
+
+--
+-- Return a pinned bucket back to the active state.
+-- @param bucket_id Bucket identifier to unpin.
+-- @retval true A bucket is unpinned.
+-- @retval nil, err A bucket can not be unpinned. @A err is the
+--         reason why.
+--
+local function bucket_unpin(bucket_id)
+    if type(bucket_id) ~= 'number' then
+        error('Usage: bucket_unpin(bucket_id)')
+    end
+    local bucket, err = bucket_check_state(bucket_id, 'write')
+    if err then
+        return nil, err
+    end
+    assert(bucket)
+    if bucket.status == consts.BUCKET.PINNED then
+        box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.ACTIVE}})
+    else
+        assert(bucket.status == consts.BUCKET.ACTIVE)
+    end
+    return true
+end
+
 --------------------------------------------------------------------------------
 -- Garbage collector
 --------------------------------------------------------------------------------
@@ -1633,11 +1682,13 @@ local function storage_info()
         state.bucket.lock = true
     end
     local status = box.space._bucket.index.status
+    local pinned = status:count({consts.BUCKET.PINNED})
     state.bucket.total = box.space._bucket:count()
-    state.bucket.active = status:count({consts.BUCKET.ACTIVE})
+    state.bucket.active = status:count({consts.BUCKET.ACTIVE}) + pinned
     state.bucket.garbage = status:count({consts.BUCKET.SENT})
     state.bucket.receiving = status:count({consts.BUCKET.RECEIVING})
     state.bucket.sending = status:count({consts.BUCKET.SENDING})
+    state.bucket.pinned = pinned
     if state.bucket.receiving ~= 0 and state.bucket.sending ~= 0 then
         --
         -- Some buckets are receiving and some buckets are sending at the same time,
@@ -1758,6 +1809,8 @@ return {
     bucket_recv = bucket_recv,
     bucket_send = bucket_send,
     bucket_stat = bucket_stat,
+    bucket_pin = bucket_pin,
+    bucket_unpin = bucket_unpin,
     bucket_delete_garbage = bucket_delete_garbage,
     garbage_collector_wakeup = garbage_collector_wakeup,
     rebalancer_wakeup = rebalancer_wakeup,
-- 
2.14.3 (Apple Git-98)
