Tarantool development patches archive
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: georgy@tarantool.org, Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: [tarantool-patches] [PATCH vshard 6/7] storage: open public API to pin/unpin buckets
Date: Wed, 28 Mar 2018 00:24:13 +0300
Message-ID: <327c7467df295dcf34ea86b8735a303797a50546.1522185711.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1522185711.git.v.shpilevoy@tarantool.org>

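Make bucket_pin() and bucket_unpin() part of the public
vshard.storage API. Both take a numeric bucket identifier and
return true on success, or nil and an error object when the
bucket is in a state that forbids the operation (sending,
receiving, sent or garbage). Both calls are idempotent: pinning
an already pinned bucket and unpinning an active one succeed
and change nothing.

A minimal usage sketch on a storage node (bucket id 1 is
illustrative only):

    -- Pin the bucket to this replicaset: the rebalancer will
    -- not send it away even if that breaks the perfect balance.
    local ok, err = vshard.storage.bucket_pin(1)
    assert(ok, err)
    -- Allow the bucket to be rebalanced again.
    ok, err = vshard.storage.bucket_unpin(1)
    assert(ok, err)
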
Closes #71
---
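Reviewer note: in vshard.storage.info() the pinned buckets are
reported both in bucket.pinned and as part of bucket.active,
because pinned buckets stay writable. For example, after the new
test pins 800 buckets on replicaset 1 and lowers its weight, the
expected counters on that node look like this (a sketch, values
taken from the test below):

    vshard.storage.info().bucket
    -- active: 800 (pinned buckets are counted as active)
    -- pinned: 800
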
 test/rebalancer/rebalancer_lock_and_pin.result    | 188 ++++++++++++++++++++++
 test/rebalancer/rebalancer_lock_and_pin.test.lua  |  85 ++++++++++
 test/rebalancer/restart_during_rebalancing.result |   4 +
 test/storage/storage.result                       |   7 +
 vshard/storage/init.lua                           |  57 ++++++-
 5 files changed, 339 insertions(+), 2 deletions(-)

diff --git a/test/rebalancer/rebalancer_lock_and_pin.result b/test/rebalancer/rebalancer_lock_and_pin.result
index b61cc84..95a160a 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.result
+++ b/test/rebalancer/rebalancer_lock_and_pin.result
@@ -300,6 +300,194 @@ vshard.storage.info().bucket.active
 ---
 - 1000
 ...
+--
+-- Test bucket pinning. First, return to the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+vshard.storage.info().bucket.active
+---
+- 1000
+...
+status = box.space._bucket.index.status
+---
+...
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+---
+...
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+-- Test that buckets in other states can not be pinned or unpinned.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+---
+...
+--
+-- Now pin some buckets and create an imbalance such that the
+-- rebalancer can not reach the perfect balance of the
+-- cluster.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+---
+...
+status:count({vshard.consts.BUCKET.PINNED})
+---
+- 800
+...
+rs1_cfg.weight = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+-- The perfect balance is now 200-400-400, but 800 buckets are
+-- pinned on replicaset 1, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 800
+...
+info.pinned
+---
+- 800
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
 test_run:cmd("switch default")
 ---
 - true
diff --git a/test/rebalancer/rebalancer_lock_and_pin.test.lua b/test/rebalancer/rebalancer_lock_and_pin.test.lua
index d0f2163..afef135 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.test.lua
+++ b/test/rebalancer/rebalancer_lock_and_pin.test.lua
@@ -133,6 +133,91 @@ vshard.storage.info().bucket.active
 test_run:switch('box_3_a')
 vshard.storage.info().bucket.active
 
+--
+-- Test bucket pinning. First, return to the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+test_run:switch('box_3_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+test_run:switch('box_1_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+vshard.storage.info().bucket.active
+status = box.space._bucket.index.status
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that buckets in other states can not be pinned or unpinned.
+test_run:cmd("setopt delimiter ';'")
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Now pin some buckets and create an imbalance such that the
+-- rebalancer can not reach the perfect balance of the
+-- cluster.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+status:count({vshard.consts.BUCKET.PINNED})
+rs1_cfg.weight = 0.5
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+-- The perfect balance is now 200-400-400, but 800 buckets are
+-- pinned on replicaset 1, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+info.active
+info.pinned
+test_run:switch('box_2_a')
+vshard.storage.info().bucket.active
+
+test_run:switch('box_3_a')
+vshard.storage.info().bucket.active
+
 test_run:cmd("switch default")
 test_run:drop_cluster(REPLICASET_3)
 test_run:drop_cluster(REPLICASET_2)
diff --git a/test/rebalancer/restart_during_rebalancing.result b/test/rebalancer/restart_during_rebalancing.result
index 083f4ff..d2b55df 100644
--- a/test/rebalancer/restart_during_rebalancing.result
+++ b/test/rebalancer/restart_during_rebalancing.result
@@ -284,6 +284,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -304,6 +305,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -324,6 +326,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -344,6 +347,7 @@ vshard.storage.info().bucket
   active: 750
   total: 750
   garbage: 0
+  pinned: 0
   sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
diff --git a/test/storage/storage.result b/test/storage/storage.result
index 537c85b..acf8da8 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -197,6 +197,7 @@ vshard.storage.info()
     active: 0
     total: 0
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
@@ -287,6 +288,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 0
   replication:
@@ -317,6 +319,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 1
   replication:
@@ -363,6 +366,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
@@ -394,6 +398,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 3
   replication:
@@ -426,6 +431,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 0
   replication:
@@ -457,6 +463,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 99fbf7b..9f6f804 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -105,7 +105,8 @@ end
 -- buckets can accept 'read' too.
 --
 local function bucket_is_writable(bucket)
-    return bucket.status == consts.BUCKET.ACTIVE
+    return bucket.status == consts.BUCKET.ACTIVE or
+           bucket.status == consts.BUCKET.PINNED
 end
 
 --
@@ -730,6 +731,54 @@ local function bucket_send(bucket_id, destination)
     return true
 end
 
+--
+-- Pin a bucket to a replicaset. A pinned bucket can not be sent
+-- even if it breaks the cluster balance.
+-- @param bucket_id Bucket identifier to pin.
+-- @retval true A bucket is pinned.
+-- @retval nil, err A bucket can not be pinned. @A err is the
+--         reason why.
+--
+local function bucket_pin(bucket_id)
+    if type(bucket_id) ~= 'number' then
+        error('Usage: bucket_pin(bucket_id)')
+    end
+    local bucket, err = bucket_check_state(bucket_id, 'write')
+    if err then
+        return nil, err
+    end
+    assert(bucket)
+    if bucket.status ~= consts.BUCKET.PINNED then
+        assert(bucket.status == consts.BUCKET.ACTIVE)
+        box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.PINNED}})
+    end
+    return true
+end
+
+--
+-- Return a pinned bucket back to the active state.
+-- @param bucket_id Bucket identifier to unpin.
+-- @retval true A bucket is unpinned.
+-- @retval nil, err A bucket can not be unpinned. @A err is the
+--         reason why.
+--
+local function bucket_unpin(bucket_id)
+    if type(bucket_id) ~= 'number' then
+        error('Usage: bucket_unpin(bucket_id)')
+    end
+    local bucket, err = bucket_check_state(bucket_id, 'write')
+    if err then
+        return nil, err
+    end
+    assert(bucket)
+    if bucket.status == consts.BUCKET.PINNED then
+        box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.ACTIVE}})
+    else
+        assert(bucket.status == consts.BUCKET.ACTIVE)
+    end
+    return true
+end
+
 --------------------------------------------------------------------------------
 -- Garbage collector
 --------------------------------------------------------------------------------
@@ -1633,11 +1682,13 @@ local function storage_info()
         state.bucket.lock = true
     end
     local status = box.space._bucket.index.status
+    local pinned = status:count({consts.BUCKET.PINNED})
     state.bucket.total = box.space._bucket:count()
-    state.bucket.active = status:count({consts.BUCKET.ACTIVE})
+    state.bucket.active = status:count({consts.BUCKET.ACTIVE}) + pinned
     state.bucket.garbage = status:count({consts.BUCKET.SENT})
     state.bucket.receiving = status:count({consts.BUCKET.RECEIVING})
     state.bucket.sending = status:count({consts.BUCKET.SENDING})
+    state.bucket.pinned = pinned
     if state.bucket.receiving ~= 0 and state.bucket.sending ~= 0 then
         --
         --Some buckets are receiving and some buckets are sending at same time,
@@ -1758,6 +1809,8 @@ return {
     bucket_recv = bucket_recv,
     bucket_send = bucket_send,
     bucket_stat = bucket_stat,
+    bucket_pin = bucket_pin,
+    bucket_unpin = bucket_unpin,
     bucket_delete_garbage = bucket_delete_garbage,
     garbage_collector_wakeup = garbage_collector_wakeup,
     rebalancer_wakeup = rebalancer_wakeup,
-- 
2.14.3 (Apple Git-98)

Thread overview: 9+ messages
2018-03-27 21:24 [tarantool-patches] [PATCH vshard 0/7] Replicaset lock and bucket pin Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 1/7] rebalancer: allow to lock a replicaset from rebalancing Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 2/7] rebalancer: remember the currently sending bucket id Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 3/7] storage: rework recovery Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 4/7] storage: wrap bucket status checks into functions Vladislav Shpilevoy
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 5/7] rebalancer: introduce pinned bucket concept into rebalancer algo Vladislav Shpilevoy
2018-03-27 21:24 ` Vladislav Shpilevoy [this message]
2018-03-27 21:24 ` [tarantool-patches] [PATCH vshard 7/7] rfc: add RFC for replicaset lock and bucket pin Vladislav Shpilevoy
2018-03-30  4:15 ` [tarantool-patches] Re: [PATCH vshard 0/7] Replicaset " Georgy Kirichenko
