[tarantool-patches] [PATCH vshard 6/7] storage: open public API to pin/unpin buckets
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Mar 28 00:24:13 MSK 2018
Closes #71
---
test/rebalancer/rebalancer_lock_and_pin.result | 188 ++++++++++++++++++++++
test/rebalancer/rebalancer_lock_and_pin.test.lua | 85 ++++++++++
test/rebalancer/restart_during_rebalancing.result | 4 +
test/storage/storage.result | 7 +
vshard/storage/init.lua | 57 ++++++-
5 files changed, 339 insertions(+), 2 deletions(-)
diff --git a/test/rebalancer/rebalancer_lock_and_pin.result b/test/rebalancer/rebalancer_lock_and_pin.result
index b61cc84..95a160a 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.result
+++ b/test/rebalancer/rebalancer_lock_and_pin.result
@@ -300,6 +300,194 @@ vshard.storage.info().bucket.active
---
- 1000
...
+--
+-- Test bucket pinning. At first, return to the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+vshard.storage.info().bucket.active
+---
+- 1000
+...
+status = box.space._bucket.index.status
+---
+...
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+---
+...
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+-- Test that can not pin other buckets.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+---
+...
+--
+-- Now pin some buckets and create such disbalance, that the
+-- rebalancer will face with unreachability of the perfect
+-- balance.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+---
+...
+status:count({vshard.consts.BUCKET.PINNED})
+---
+- 800
+...
+rs1_cfg.weight = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+-- The perfect balance is now 200-400-400, but on the replicaset 1
+-- 800 buckets are pinned, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 800
+...
+info.pinned
+---
+- 800
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
test_run:cmd("switch default")
---
- true
diff --git a/test/rebalancer/rebalancer_lock_and_pin.test.lua b/test/rebalancer/rebalancer_lock_and_pin.test.lua
index d0f2163..afef135 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.test.lua
+++ b/test/rebalancer/rebalancer_lock_and_pin.test.lua
@@ -133,6 +133,91 @@ vshard.storage.info().bucket.active
test_run:switch('box_3_a')
vshard.storage.info().bucket.active
+--
+-- Test bucket pinning. At first, return to the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+test_run:switch('box_3_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+test_run:switch('box_1_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+vshard.storage.info().bucket.active
+status = box.space._bucket.index.status
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that can not pin other buckets.
+test_run:cmd("setopt delimiter ';'")
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Now pin some buckets and create such disbalance, that the
+-- rebalancer will face with unreachability of the perfect
+-- balance.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+status:count({vshard.consts.BUCKET.PINNED})
+rs1_cfg.weight = 0.5
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+-- The perfect balance is now 200-400-400, but on the replicaset 1
+-- 800 buckets are pinned, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+info.active
+info.pinned
+test_run:switch('box_2_a')
+vshard.storage.info().bucket.active
+
+test_run:switch('box_3_a')
+vshard.storage.info().bucket.active
+
test_run:cmd("switch default")
test_run:drop_cluster(REPLICASET_3)
test_run:drop_cluster(REPLICASET_2)
diff --git a/test/rebalancer/restart_during_rebalancing.result b/test/rebalancer/restart_during_rebalancing.result
index 083f4ff..d2b55df 100644
--- a/test/rebalancer/restart_during_rebalancing.result
+++ b/test/rebalancer/restart_during_rebalancing.result
@@ -284,6 +284,7 @@ vshard.storage.info().bucket
active: 750
total: 750
garbage: 0
+ pinned: 0
sending: 0
...
vshard.storage.internal.buckets_to_recovery
@@ -304,6 +305,7 @@ vshard.storage.info().bucket
active: 750
total: 750
garbage: 0
+ pinned: 0
sending: 0
...
vshard.storage.internal.buckets_to_recovery
@@ -324,6 +326,7 @@ vshard.storage.info().bucket
active: 750
total: 750
garbage: 0
+ pinned: 0
sending: 0
...
vshard.storage.internal.buckets_to_recovery
@@ -344,6 +347,7 @@ vshard.storage.info().bucket
active: 750
total: 750
garbage: 0
+ pinned: 0
sending: 0
...
vshard.storage.internal.buckets_to_recovery
diff --git a/test/storage/storage.result b/test/storage/storage.result
index 537c85b..acf8da8 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -197,6 +197,7 @@ vshard.storage.info()
active: 0
total: 0
garbage: 0
+ pinned: 0
sending: 0
status: 2
replication:
@@ -287,6 +288,7 @@ vshard.storage.info()
active: 2
total: 2
garbage: 0
+ pinned: 0
sending: 0
status: 0
replication:
@@ -317,6 +319,7 @@ vshard.storage.info()
active: 2
total: 2
garbage: 0
+ pinned: 0
sending: 0
status: 1
replication:
@@ -363,6 +366,7 @@ vshard.storage.info()
active: 2
total: 2
garbage: 0
+ pinned: 0
sending: 0
status: 2
replication:
@@ -394,6 +398,7 @@ vshard.storage.info()
active: 2
total: 2
garbage: 0
+ pinned: 0
sending: 0
status: 3
replication:
@@ -426,6 +431,7 @@ vshard.storage.info()
active: 2
total: 2
garbage: 0
+ pinned: 0
sending: 0
status: 0
replication:
@@ -457,6 +463,7 @@ vshard.storage.info()
active: 2
total: 2
garbage: 0
+ pinned: 0
sending: 0
status: 2
replication:
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 99fbf7b..9f6f804 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -105,7 +105,8 @@ end
-- buckets can accept 'read' too.
--
local function bucket_is_writable(bucket)
- return bucket.status == consts.BUCKET.ACTIVE
+ return bucket.status == consts.BUCKET.ACTIVE or
+ bucket.status == consts.BUCKET.PINNED
end
--
@@ -730,6 +731,54 @@ local function bucket_send(bucket_id, destination)
return true
end
+--
+-- Pin a bucket to a replicaset. Pinned bucket can not be sent
+-- even if is breaks the cluster balance.
+-- @param bucket_id Bucket identifier to pin.
+-- @retval true A bucket is pinned.
+-- @retval nil, err A bucket can not be pinned. @A err is the
+-- reason why.
+--
+local function bucket_pin(bucket_id)
+ if type(bucket_id) ~= 'number' then
+ error('Usage: bucket_pin(bucket_id)')
+ end
+ local bucket, err = bucket_check_state(bucket_id, 'write')
+ if err then
+ return nil, err
+ end
+ assert(bucket)
+ if bucket.status ~= consts.BUCKET.PINNED then
+ assert(bucket.status == consts.BUCKET.ACTIVE)
+ box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.PINNED}})
+ end
+ return true
+end
+
+--
+-- Return a pinned bucket back into active state.
+-- @param bucket_id Bucket identifier to unpin.
+-- @retval true A bucket is unpinned.
+-- @retval nil, err A bucket can not be unpinned. @A err is the
+-- reason why.
+--
+local function bucket_unpin(bucket_id)
+ if type(bucket_id) ~= 'number' then
+ error('Usage: bucket_unpin(bucket_id)')
+ end
+ local bucket, err = bucket_check_state(bucket_id, 'write')
+ if err then
+ return nil, err
+ end
+ assert(bucket)
+ if bucket.status == consts.BUCKET.PINNED then
+ box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.ACTIVE}})
+ else
+ assert(bucket.status == consts.BUCKET.ACTIVE)
+ end
+ return true
+end
+
--------------------------------------------------------------------------------
-- Garbage collector
--------------------------------------------------------------------------------
@@ -1633,11 +1682,13 @@ local function storage_info()
state.bucket.lock = true
end
local status = box.space._bucket.index.status
+ local pinned = status:count({consts.BUCKET.PINNED})
state.bucket.total = box.space._bucket:count()
- state.bucket.active = status:count({consts.BUCKET.ACTIVE})
+ state.bucket.active = status:count({consts.BUCKET.ACTIVE}) + pinned
state.bucket.garbage = status:count({consts.BUCKET.SENT})
state.bucket.receiving = status:count({consts.BUCKET.RECEIVING})
state.bucket.sending = status:count({consts.BUCKET.SENDING})
+ state.bucket.pinned = pinned
if state.bucket.receiving ~= 0 and state.bucket.sending ~= 0 then
--
--Some buckets are receiving and some buckets are sending at same time,
@@ -1758,6 +1809,8 @@ return {
bucket_recv = bucket_recv,
bucket_send = bucket_send,
bucket_stat = bucket_stat,
+ bucket_pin = bucket_pin,
+ bucket_unpin = bucket_unpin,
bucket_delete_garbage = bucket_delete_garbage,
garbage_collector_wakeup = garbage_collector_wakeup,
rebalancer_wakeup = rebalancer_wakeup,
--
2.14.3 (Apple Git-98)
More information about the Tarantool-patches
mailing list