From mboxrd@z Thu Jan 1 00:00:00 1970
From: Vladislav Shpilevoy
To: tarantool-patches@freelists.org
Cc: georgy@tarantool.org, Vladislav Shpilevoy
Subject: [tarantool-patches] [PATCH vshard 6/7] storage: open public API to pin/unpin buckets
Date: Wed, 28 Mar 2018 00:24:13 +0300
Message-Id: <327c7467df295dcf34ea86b8735a303797a50546.1522185711.git.v.shpilevoy@tarantool.org>
List-Id: tarantool-patches

A pinned bucket can not be moved by the rebalancer, even when that
makes the perfect balance unreachable. Pinned buckets stay writable
and are reported via a new 'pinned' counter in vshard.storage.info().

Closes #71
---
 test/rebalancer/rebalancer_lock_and_pin.result    | 188 ++++++++++++++++++++++
 test/rebalancer/rebalancer_lock_and_pin.test.lua  |  85 ++++++++++
 test/rebalancer/restart_during_rebalancing.result |   4 +
 test/storage/storage.result                       |   7 +
 vshard/storage/init.lua                           |  57 ++++++-
 5 files changed, 339 insertions(+), 2 deletions(-)

diff --git a/test/rebalancer/rebalancer_lock_and_pin.result b/test/rebalancer/rebalancer_lock_and_pin.result
index b61cc84..95a160a 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.result
+++ b/test/rebalancer/rebalancer_lock_and_pin.result
@@ -300,6 +300,194 @@ vshard.storage.info().bucket.active
 ---
 - 1000
 ...
+--
+-- Test bucket pinning. At first, return to the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+---
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+---
+...
+test_run:switch('box_1_a')
+---
+- true
+...
+rs1_cfg.lock = false
+---
+...
+rs1_cfg.weight = 1
+---
+...
+rs2_cfg.lock = false
+---
+...
+rs2_cfg.weight = 1
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+vshard.storage.info().bucket.active
+---
+- 1000
+...
+status = box.space._bucket.index.status
+---
+...
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+---
+...
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+vshard.storage.bucket_pin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- pinned
+...
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+vshard.storage.bucket_unpin(first_id)
+---
+- true
+...
+box.space._bucket:get{first_id}.status
+---
+- active
+...
+-- Test that a bucket in any other state can not be pinned or unpinned.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+---
+...
+--
+-- Now pin some buckets and create an imbalance such that
+-- the rebalancer becomes unable to reach the perfect
+-- balance.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+---
+...
+status:count({vshard.consts.BUCKET.PINNED})
+---
+- 800
+...
+rs1_cfg.weight = 0.5
+---
+...
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+---
+...
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+---
+...
+-- The perfect balance is now 200-400-400, but 800 buckets are
+-- pinned to replicaset 1, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+---
+...
+info.active
+---
+- 800
+...
+info.pinned
+---
+- 800
+...
+test_run:switch('box_2_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
+test_run:switch('box_3_a')
+---
+- true
+...
+vshard.storage.info().bucket.active
+---
+- 1100
+...
 test_run:cmd("switch default")
 ---
 - true
diff --git a/test/rebalancer/rebalancer_lock_and_pin.test.lua b/test/rebalancer/rebalancer_lock_and_pin.test.lua
index d0f2163..afef135 100644
--- a/test/rebalancer/rebalancer_lock_and_pin.test.lua
+++ b/test/rebalancer/rebalancer_lock_and_pin.test.lua
@@ -133,6 +133,91 @@ vshard.storage.info().bucket.active
 test_run:switch('box_3_a')
 vshard.storage.info().bucket.active
 
+--
+-- Test bucket pinning. At first, return to the default
+-- configuration.
+--
+test_run:switch('box_2_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_2_a)
+test_run:switch('box_3_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_3_a)
+test_run:switch('box_1_a')
+rs1_cfg.lock = false
+rs1_cfg.weight = 1
+rs2_cfg.lock = false
+rs2_cfg.weight = 1
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+vshard.storage.info().bucket.active
+status = box.space._bucket.index.status
+first_id = status:select(vshard.consts.BUCKET.ACTIVE, {limit = 1})[1].id
+-- Test that double pin is ok.
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_pin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that double unpin after pin is ok.
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+vshard.storage.bucket_unpin(first_id)
+box.space._bucket:get{first_id}.status
+
+-- Test that a bucket in any other state can not be pinned or unpinned.
+test_run:cmd("setopt delimiter ';'")
+box.begin()
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENDING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.SENT}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.space._bucket:update({first_id}, {{'=', 2, vshard.consts.BUCKET.GARBAGE}})
+ok, err = vshard.storage.bucket_pin(first_id)
+assert(not ok and err)
+ok, err = vshard.storage.bucket_unpin(first_id)
+assert(not ok and err)
+box.rollback()
+test_run:cmd("setopt delimiter ''");
+
+--
+-- Now pin some buckets and create an imbalance such that
+-- the rebalancer becomes unable to reach the perfect
+-- balance.
+--
+for i = 1, 800 do local ok, err = vshard.storage.bucket_pin(first_id - 1 + i) assert(ok) end
+status:count({vshard.consts.BUCKET.PINNED})
+rs1_cfg.weight = 0.5
+vshard.storage.cfg(cfg, names.replica_uuid.box_1_a)
+wait_rebalancer_state('The cluster is balanced ok', test_run)
+-- The perfect balance is now 200-400-400, but 800 buckets are
+-- pinned to replicaset 1, so the actual balance is 800-1100-1100.
+info = vshard.storage.info().bucket
+info.active
+info.pinned
+test_run:switch('box_2_a')
+vshard.storage.info().bucket.active
+
+test_run:switch('box_3_a')
+vshard.storage.info().bucket.active
+
 test_run:cmd("switch default")
 test_run:drop_cluster(REPLICASET_3)
 test_run:drop_cluster(REPLICASET_2)
diff --git a/test/rebalancer/restart_during_rebalancing.result b/test/rebalancer/restart_during_rebalancing.result
index 083f4ff..d2b55df 100644
--- a/test/rebalancer/restart_during_rebalancing.result
+++ b/test/rebalancer/restart_during_rebalancing.result
@@ -284,6 +284,7 @@ vshard.storage.info().bucket
     active: 750
     total: 750
     garbage: 0
+    pinned: 0
     sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -304,6 +305,7 @@ vshard.storage.info().bucket
     active: 750
     total: 750
     garbage: 0
+    pinned: 0
     sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -324,6 +326,7 @@ vshard.storage.info().bucket
     active: 750
     total: 750
     garbage: 0
+    pinned: 0
     sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
@@ -344,6 +347,7 @@ vshard.storage.info().bucket
     active: 750
     total: 750
     garbage: 0
+    pinned: 0
     sending: 0
 ...
 vshard.storage.internal.buckets_to_recovery
diff --git a/test/storage/storage.result b/test/storage/storage.result
index 537c85b..acf8da8 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -197,6 +197,7 @@ vshard.storage.info()
     active: 0
     total: 0
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
@@ -287,6 +288,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 0
   replication:
@@ -317,6 +319,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 1
   replication:
@@ -363,6 +366,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
@@ -394,6 +398,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 3
   replication:
@@ -426,6 +431,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 0
   replication:
@@ -457,6 +463,7 @@ vshard.storage.info()
     active: 2
     total: 2
     garbage: 0
+    pinned: 0
     sending: 0
   status: 2
   replication:
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 99fbf7b..9f6f804 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -105,7 +105,8 @@ end
 -- buckets can accept 'read' too.
 --
 local function bucket_is_writable(bucket)
-    return bucket.status == consts.BUCKET.ACTIVE
+    return bucket.status == consts.BUCKET.ACTIVE or
+           bucket.status == consts.BUCKET.PINNED
 end
 
 --
@@ -730,6 +731,54 @@ local function bucket_send(bucket_id, destination)
     return true
 end
 
+--
+-- Pin a bucket to a replicaset. A pinned bucket can not be sent
+-- even if it breaks the cluster balance.
+-- @param bucket_id Bucket identifier to pin.
+-- @retval true A bucket is pinned.
+-- @retval nil, err A bucket can not be pinned. @A err is the
+--         reason why.
+--
+local function bucket_pin(bucket_id)
+    if type(bucket_id) ~= 'number' then
+        error('Usage: bucket_pin(bucket_id)')
+    end
+    local bucket, err = bucket_check_state(bucket_id, 'write')
+    if err then
+        return nil, err
+    end
+    assert(bucket)
+    if bucket.status ~= consts.BUCKET.PINNED then
+        assert(bucket.status == consts.BUCKET.ACTIVE)
+        box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.PINNED}})
+    end
+    return true
+end
+
+--
+-- Return a pinned bucket back into the active state.
+-- @param bucket_id Bucket identifier to unpin.
+-- @retval true A bucket is unpinned.
+-- @retval nil, err A bucket can not be unpinned. @A err is the
+--         reason why.
+--
+local function bucket_unpin(bucket_id)
+    if type(bucket_id) ~= 'number' then
+        error('Usage: bucket_unpin(bucket_id)')
+    end
+    local bucket, err = bucket_check_state(bucket_id, 'write')
+    if err then
+        return nil, err
+    end
+    assert(bucket)
+    if bucket.status == consts.BUCKET.PINNED then
+        box.space._bucket:update({bucket_id}, {{'=', 2, consts.BUCKET.ACTIVE}})
+    else
+        assert(bucket.status == consts.BUCKET.ACTIVE)
+    end
+    return true
+end
+
 --------------------------------------------------------------------------------
 -- Garbage collector
 --------------------------------------------------------------------------------
@@ -1633,11 +1682,13 @@ local function storage_info()
         state.bucket.lock = true
     end
     local status = box.space._bucket.index.status
+    local pinned = status:count({consts.BUCKET.PINNED})
     state.bucket.total = box.space._bucket:count()
-    state.bucket.active = status:count({consts.BUCKET.ACTIVE})
+    state.bucket.active = status:count({consts.BUCKET.ACTIVE}) + pinned
     state.bucket.garbage = status:count({consts.BUCKET.SENT})
     state.bucket.receiving = status:count({consts.BUCKET.RECEIVING})
     state.bucket.sending = status:count({consts.BUCKET.SENDING})
+    state.bucket.pinned = pinned
     if state.bucket.receiving ~= 0 and state.bucket.sending ~= 0 then
         --
         --Some buckets are receiving and some buckets are sending at same time,
@@ -1758,6 +1809,8 @@ return {
     bucket_recv = bucket_recv,
     bucket_send = bucket_send,
     bucket_stat = bucket_stat,
+    bucket_pin = bucket_pin,
+    bucket_unpin = bucket_unpin,
     bucket_delete_garbage = bucket_delete_garbage,
     garbage_collector_wakeup = garbage_collector_wakeup,
     rebalancer_wakeup = rebalancer_wakeup,
-- 
2.14.3 (Apple Git-98)
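
For reference, a minimal usage sketch of the new calls on a storage
instance. It assumes vshard.storage.cfg() has already been executed,
and bucket_id is a hypothetical identifier of an ACTIVE bucket stored
on this replicaset:

    -- Hypothetical identifier of an ACTIVE bucket on this replicaset.
    local bucket_id = 1
    -- Pin the bucket: the rebalancer will not move it away, even if
    -- that makes the perfect balance unreachable.
    local ok, err = vshard.storage.bucket_pin(bucket_id)
    assert(ok, err)
    -- Pinned buckets stay writable and are counted both in 'active'
    -- and in the new 'pinned' counter of vshard.storage.info().
    local info = vshard.storage.info().bucket
    assert(info.pinned >= 1)
    -- Unpin: the bucket returns to the ACTIVE state and can be moved
    -- by the rebalancer again. Unpinning an active bucket is a no-op.
    ok, err = vshard.storage.bucket_unpin(bucket_id)
    assert(ok, err)

Both calls follow the usual vshard convention: true on success, or
nil plus an error object describing why the state change is refused.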