From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 6C5AB2BCAF for ; Tue, 27 Mar 2018 17:24:20 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id yfA-C6cfJq9S for ; Tue, 27 Mar 2018 17:24:20 -0400 (EDT) Received: from smtp43.i.mail.ru (smtp43.i.mail.ru [94.100.177.103]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id ED6F12A610 for ; Tue, 27 Mar 2018 17:24:19 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH vshard 3/7] storage: rework recovery Date: Wed, 28 Mar 2018 00:24:10 +0300 Message-Id: <1424d9c18b4dd0d9a324cb0930e39e8d57ca0160.1522185711.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: georgy@tarantool.org, Vladislav Shpilevoy Currently the RECEIVING bucket state is effectively invisible, since it is set inside a transaction and reset to ACTIVE inside the same transaction. But this will no longer hold after #73. So take into account in the recovery function and in state checking that RECEIVING buckets exist. The most significant change is that a bucket can be set to ACTIVE if it is SENDING on one replicaset and RECEIVING on another one, but the rebalancer is not sending it now. Part of #73 Do some refactoring to simplify error processing. 
--- test/storage/recovery.result | 85 ++++++++++++---- test/storage/recovery.test.lua | 38 +++++-- vshard/router/init.lua | 45 +++++---- vshard/storage/init.lua | 222 +++++++++++++++++++++-------------------- 4 files changed, 230 insertions(+), 160 deletions(-) diff --git a/test/storage/recovery.result b/test/storage/recovery.result index d4fe6e4..fe58c4b 100644 --- a/test/storage/recovery.result +++ b/test/storage/recovery.result @@ -22,7 +22,7 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a') util.wait_master(test_run, REPLICASET_2, 'storage_2_a') --- ... -test_run:cmd("switch storage_1_a") +test_run:switch("storage_1_a") --- - true ... @@ -38,10 +38,6 @@ rs2_uuid = replicasets[2] _bucket = box.space._bucket --- ... -_bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid} ---- -- [1, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54'] -... _bucket:replace{2, vshard.consts.BUCKET.SENDING, rs2_uuid} --- - [2, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54'] @@ -50,7 +46,7 @@ _bucket:replace{3, vshard.consts.BUCKET.RECEIVING, rs2_uuid} --- - [3, 'receiving', 'ac522f65-aa94-4134-9f64-51ee384f1a54'] ... -test_run:cmd('switch storage_2_a') +test_run:switch('storage_2_a') --- - true ... @@ -60,10 +56,6 @@ _bucket = box.space._bucket rs1_uuid = replicasets[1] --- ... -_bucket:replace{1, vshard.consts.BUCKET.RECEIVING, rs1_uuid} ---- -- [1, 'receiving', 'cbf06940-0790-498b-948d-042b62cf3d29'] -... _bucket:replace{2, vshard.consts.BUCKET.ACTIVE, rs1_uuid} --- - [2, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29'] @@ -72,6 +64,9 @@ _bucket:replace{3, vshard.consts.BUCKET.SENDING, rs1_uuid} --- - [3, 'sending', 'cbf06940-0790-498b-948d-042b62cf3d29'] ... +vshard.storage.internal.rebalancer_sending_bucket = 3 +--- +... test_run:cmd('stop server storage_1_a') --- - true @@ -80,7 +75,7 @@ test_run:cmd('start server storage_1_a') --- - true ... -test_run:cmd('switch storage_1_a') +test_run:switch('storage_1_a') --- - true ... 
@@ -95,13 +90,64 @@ _bucket = box.space._bucket ... _bucket:select{} --- -- - [1, 'active', 'ac522f65-aa94-4134-9f64-51ee384f1a54'] - - [2, 'garbage', 'ac522f65-aa94-4134-9f64-51ee384f1a54'] +- - [2, 'garbage', 'ac522f65-aa94-4134-9f64-51ee384f1a54'] + - [3, 'receiving', 'ac522f65-aa94-4134-9f64-51ee384f1a54'] +... +test_run:switch('storage_2_a') +--- +- true +... +_bucket:select{} +--- +- - [2, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29'] + - [3, 'sending', 'cbf06940-0790-498b-948d-042b62cf3d29'] +... +test_run:switch('storage_1_a') +--- +- true ... while _bucket:count() ~= 1 do fiber.sleep(0.1) end --- ... -- +-- Test a case, when a bucket is sending on one replicaset, +-- receiving on another one, but there is no rebalancing. +-- +test_run:cmd('stop server storage_2_a') +--- +- true +... +test_run:cmd('start server storage_2_a') +--- +- true +... +test_run:switch('storage_2_a') +--- +- true +... +vshard.storage.recovery_wakeup() +--- +... +fiber = require('fiber') +--- +... +_bucket = box.space._bucket +--- +... +while _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) ~= 2 do fiber.sleep(0.1) end +--- +... +test_run:switch('storage_1_a') +--- +- true +... +vshard.storage.recovery_wakeup() +--- +... +while _bucket:count() ~= 0 do fiber.sleep(0.1) end +--- +... +-- -- Test a case, when a destination is down. The recovery fiber -- must restore buckets, when the destination is up. -- @@ -112,15 +158,18 @@ _bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid} --- - [1, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54'] ... -test_run:cmd('switch storage_2_a') +test_run:switch('storage_2_a') --- - true ... +rs1_uuid = replicasets[1] +--- +... _bucket:replace{1, vshard.consts.BUCKET.ACTIVE, rs1_uuid} --- - [1, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29'] ... -test_run:cmd('switch default') +test_run:switch('default') --- - true ... @@ -136,7 +185,7 @@ test_run:cmd('start server storage_1_a') --- - true ... 
-test_run:cmd('switch storage_1_a') +test_run:switch('storage_1_a') --- - true ... @@ -168,7 +217,7 @@ _bucket:select{} --- - [] ... -test_run:cmd('switch storage_2_a') +test_run:switch('storage_2_a') --- - true ... @@ -230,7 +279,7 @@ fiber = require('fiber') while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end --- ... -test_run:cmd("switch default") +test_run:switch("default") --- - true ... diff --git a/test/storage/recovery.test.lua b/test/storage/recovery.test.lua index b3ad269..8cfdc34 100644 --- a/test/storage/recovery.test.lua +++ b/test/storage/recovery.test.lua @@ -8,7 +8,7 @@ util = require('util') util.wait_master(test_run, REPLICASET_1, 'storage_1_a') util.wait_master(test_run, REPLICASET_2, 'storage_2_a') -test_run:cmd("switch storage_1_a") +test_run:switch("storage_1_a") vshard.storage.rebalancer_disable() rs2_uuid = replicasets[2] @@ -17,39 +17,57 @@ rs2_uuid = replicasets[2] -- must be garbaged on bootstrap. 
_bucket = box.space._bucket -_bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid} _bucket:replace{2, vshard.consts.BUCKET.SENDING, rs2_uuid} _bucket:replace{3, vshard.consts.BUCKET.RECEIVING, rs2_uuid} -test_run:cmd('switch storage_2_a') +test_run:switch('storage_2_a') _bucket = box.space._bucket rs1_uuid = replicasets[1] -_bucket:replace{1, vshard.consts.BUCKET.RECEIVING, rs1_uuid} _bucket:replace{2, vshard.consts.BUCKET.ACTIVE, rs1_uuid} _bucket:replace{3, vshard.consts.BUCKET.SENDING, rs1_uuid} +vshard.storage.internal.rebalancer_sending_bucket = 3 test_run:cmd('stop server storage_1_a') test_run:cmd('start server storage_1_a') -test_run:cmd('switch storage_1_a') +test_run:switch('storage_1_a') fiber = require('fiber') vshard.storage.recovery_wakeup() _bucket = box.space._bucket _bucket:select{} +test_run:switch('storage_2_a') +_bucket:select{} +test_run:switch('storage_1_a') while _bucket:count() ~= 1 do fiber.sleep(0.1) end +-- +-- Test a case, when a bucket is sending on one replicaset, +-- receiving on another one, but there is no rebalancing. +-- +test_run:cmd('stop server storage_2_a') +test_run:cmd('start server storage_2_a') +test_run:switch('storage_2_a') +vshard.storage.recovery_wakeup() +fiber = require('fiber') +_bucket = box.space._bucket +while _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) ~= 2 do fiber.sleep(0.1) end +test_run:switch('storage_1_a') +vshard.storage.recovery_wakeup() +while _bucket:count() ~= 0 do fiber.sleep(0.1) end + -- -- Test a case, when a destination is down. The recovery fiber -- must restore buckets, when the destination is up. 
-- rs2_uuid = replicasets[2] _bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid} -test_run:cmd('switch storage_2_a') +test_run:switch('storage_2_a') +rs1_uuid = replicasets[1] _bucket:replace{1, vshard.consts.BUCKET.ACTIVE, rs1_uuid} -test_run:cmd('switch default') +test_run:switch('default') test_run:cmd('stop server storage_2_a') test_run:cmd('stop server storage_1_a') test_run:cmd('start server storage_1_a') -test_run:cmd('switch storage_1_a') +test_run:switch('storage_1_a') _bucket = box.space._bucket _bucket:select{} for i = 1, 10 do vshard.storage.recovery_wakeup() end @@ -59,7 +77,7 @@ fiber = require('fiber') while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end _bucket:select{} -test_run:cmd('switch storage_2_a') +test_run:switch('storage_2_a') _bucket = box.space._bucket _bucket:select{} @@ -80,7 +98,7 @@ _bucket = box.space._bucket fiber = require('fiber') while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end -test_run:cmd("switch default") +test_run:switch("default") test_run:drop_cluster(REPLICASET_2) test_run:drop_cluster(REPLICASET_1) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index e990c69..9780b15 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -67,29 +67,29 @@ local function bucket_discovery(bucket_id) end log.verbose("Discovering bucket %d", bucket_id) + local last_err = nil local unreachable_uuid = nil - local is_transfer_in_progress = false - for _, replicaset in pairs(M.replicasets) do - local stat, err = replicaset:callrw('vshard.storage.bucket_stat', - {bucket_id}) - if stat then - if stat.status == consts.BUCKET.ACTIVE or - stat.status == consts.BUCKET.SENDING then - log.info("Discovered bucket %d on %s", bucket_id, replicaset) - bucket_set(bucket_id, replicaset) - return replicaset - elseif stat.status == consts.BUCKET.RECEIVING then - is_transfer_in_progress = true - end + for uuid, replicaset in 
pairs(M.replicasets) do + local _, err = + replicaset:callrw('vshard.storage.bucket_stat', {bucket_id}) + if err == nil then + bucket_set(bucket_id, replicaset) + return replicaset elseif err.code ~= lerror.code.WRONG_BUCKET then - unreachable_uuid = replicaset.uuid + last_err = err + unreachable_uuid = uuid end end - local errcode = nil - if unreachable_uuid then - errcode = lerror.code.UNREACHABLE_REPLICASET - elseif is_transfer_in_progress then - errcode = lerror.code.TRANSFER_IS_IN_PROGRESS + local err = nil + if last_err then + if last_err.type == 'ClientError' and + last_err.code == box.error.NO_CONNECTION then + err = lerror.vshard(lerror.code.UNREACHABLE_REPLICASET, + {bucket_id = bucket_id, + unreachable_uuid = unreachable_uuid}) + else + err = lerror.make(last_err) + end else -- All replicasets were scanned, but a bucket was not -- found anywhere, so most likely it does not exist. It @@ -97,11 +97,12 @@ local function bucket_discovery(bucket_id) -- bucket was found to be RECEIVING on one replicaset, and -- was not found on other replicasets (it was sent during -- discovery). - errcode = lerror.code.NO_ROUTE_TO_BUCKET + err = lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, + {bucket_id = bucket_id, + unreachable_uuid = unreachable_uuid}) end - return nil, lerror.vshard(errcode, {bucket_id = bucket_id, - unreachable_uuid = unreachable_uuid}) + return nil, err end -- Resolve bucket id to replicaset uuid diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 6cbeb4b..471d26a 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -100,6 +100,16 @@ local function is_this_replicaset_locked() return M.this_replicaset and M.this_replicaset.lock end +-- +-- Check if @a bucket is garbage. It is true for +-- * sent buckets; +-- * buckets explicitly marked to be a garbage. 
+-- +local function bucket_is_garbage(bucket) + return bucket.status == consts.BUCKET.SENT or + bucket.status == consts.BUCKET.GARBAGE +end + -------------------------------------------------------------------------------- -- Schema -------------------------------------------------------------------------------- @@ -168,6 +178,37 @@ end -- Recovery -------------------------------------------------------------------------------- +-- +-- Check if a rebalancing is in progress. It is true, if the node +-- applies routes received from a rebalancer node in the special +-- fiber. +-- +local function rebalancing_is_in_progress() + local f = M.rebalancer_applier_fiber + return f ~= nil and f:status() ~= 'dead' +end + +-- +-- Check if a local bucket can be deleted. +-- +local function recovery_local_bucket_is_garbage(local_bucket, remote_bucket) + return remote_bucket and remote_bucket.status == consts.BUCKET.ACTIVE +end + +-- +-- Check if a local bucket can become active. +-- +local function recovery_local_bucket_is_active(local_bucket, remote_bucket) + if not remote_bucket or bucket_is_garbage(remote_bucket) then + return true + end + if remote_bucket.status == consts.BUCKET.RECEIVING and + local_bucket.status == consts.BUCKET.SENDING then + return M.rebalancer_sending_bucket ~= local_bucket.id + end + return false +end + -- -- Check status of each bucket scheduled for recovery. Resolve -- status where possible. @@ -186,13 +227,23 @@ local function recovery_step() local _bucket = box.space._bucket local new_count = 0 local is_empty = true + -- + -- If a rebalancer route applier fiber had exited with error + -- during bucket sending, then it might did not manage to + -- reset currently sending bucket. 
+ -- + if not rebalancing_is_in_progress() and + M.rebalancer_sending_bucket ~= 0 then + M.rebalancer_sending_bucket = 0 + end for bucket_id, _ in pairs(M.buckets_to_recovery) do if is_empty then log.info('Starting buckets recovery step') end is_empty = false local bucket = _bucket:get{bucket_id} - if not bucket or bucket.status ~= consts.BUCKET.SENDING then + if not bucket or (bucket.status ~= consts.BUCKET.SENDING and + bucket.status ~= consts.BUCKET.RECEIVING) then -- Possibly, a bucket was deleted or recovered by -- an admin. Or recovery_f started not after -- bootstrap, but after master change - in such a case @@ -209,27 +260,21 @@ local function recovery_step() end local remote_bucket, err = destination:callrw('vshard.storage.bucket_stat', {bucket_id}) - -- If a bucket is not found with WRONG_BUCKET errcode, - -- then it either does not exist on destination (possibly, - -- the bucket is garbage collected), or is in garbage - -- state (GC is in progress). In both cases it can be - -- restored here as active. + -- Check if it is not a bucket error, and this result can + -- not be used to recovery anything. Try later. if not remote_bucket and (not err or err.type ~= 'ShardingError' or - err.code ~= lerror.code.WRONG_BUCKET) then - -- We can ignore other replicasets, because a bucket - -- could not be sent from destination further, on - -- another replicaset. It is guaranteed by rebalancer - -- algorithm, which is stopped, if there are 'sending' - -- buckets. And the current bucket is exactly - -- 'sending'. 
+ err.code ~= lerror.code.WRONG_BUCKET) then new_count = new_count + 1 goto continue end - table.insert(recovered, bucket_id) - if remote_bucket and remote_bucket.status == consts.BUCKET.ACTIVE then + if recovery_local_bucket_is_garbage(bucket, remote_bucket) then + table.insert(recovered, bucket_id) table.insert(garbage, bucket_id) - else + elseif recovery_local_bucket_is_active(bucket, remote_bucket) then + table.insert(recovered, bucket_id) table.insert(active, bucket_id) + else + new_count = new_count + 1 end ::continue:: end @@ -252,24 +297,6 @@ local function recovery_step() M.buckets_to_recovery_count = new_count end --- --- Make all 'receiving' buckets be 'garbage'. The procedure is --- called on instance start to garbage collect buckets, whose --- transmition was interrupted by the server down. --- -local function recovery_garbage_receiving_buckets() - local _bucket = box.space._bucket - local receiving_buckets = - _bucket.index.status:select{consts.BUCKET.RECEIVING} - if #receiving_buckets > 0 then - box.begin() - for _, bucket in pairs(receiving_buckets) do - _bucket:update({bucket.id}, {{'=', 2, consts.BUCKET.GARBAGE}}) - end - box.commit() - end -end - -- -- Infinite function to resolve status of buckets, whose 'sending' -- has failed due to tarantool or network problems. Restarts on @@ -282,9 +309,11 @@ end local function recovery_f(module_version) lfiber.name('vshard.recovery') local _bucket = box.space._bucket - local sending_buckets = _bucket.index.status:select{consts.BUCKET.SENDING} M.buckets_to_recovery = {} - for _, bucket in pairs(sending_buckets) do + for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do + M.buckets_to_recovery[bucket.id] = true + end + for _, bucket in _bucket.index.status:pairs({consts.BUCKET.RECEIVING}) do M.buckets_to_recovery[bucket.id] = true end -- Interrupt recovery if a module has been reloaded. 
Perhaps, @@ -354,31 +383,24 @@ end -- Buckets -------------------------------------------------------------------------------- --- --- Check if @a bucket is garbage. It is true for --- * sent buckets; --- * buckets explicitly marked to be a garbage. --- -local function bucket_is_garbage(bucket) - return bucket.status == consts.BUCKET.SENT or - bucket.status == consts.BUCKET.GARBAGE -end - -- -- Check that an action of a specified mode can be applied to a -- bucket. -- @param bucket_id Bucket identifier. -- @param mode 'Read' or 'write' mode. -- --- @retval true Bucket can accept an action of a specified mode. --- @retval nil, error object Bucket can not accept the action. +-- @retval bucket Bucket that can accept an action of a specified +-- mode. +-- @retval bucket and error object Bucket that can not accept the +-- action, and a reason why. -- local function bucket_check_state(bucket_id, mode) assert(type(bucket_id) == 'number') assert(mode == 'read' or mode == 'write') local bucket = box.space._bucket:get({bucket_id}) local errcode = nil - if bucket == nil or bucket_is_garbage(bucket) then + if bucket == nil or bucket_is_garbage(bucket) or + bucket.status == consts.BUCKET.RECEIVING then errcode = lerror.code.WRONG_BUCKET elseif (bucket.status == consts.BUCKET.SENDING and mode ~= 'read') then errcode = lerror.code.TRANSFER_IS_IN_PROGRESS @@ -388,13 +410,13 @@ local function bucket_check_state(bucket_id, mode) end if errcode ~= nil then local dest = bucket and bucket.destination or nil - return nil, lerror.vshard(errcode, {bucket_id = bucket_id, - destination = dest}) + return bucket, lerror.vshard(errcode, {bucket_id = bucket_id, + destination = dest}) end assert(bucket.status == consts.BUCKET.ACTIVE or bucket.status == consts.BUCKET.SENDING and mode == 'read') - return true + return bucket, nil end -- @@ -404,18 +426,8 @@ local function bucket_stat(bucket_id) if type(bucket_id) ~= 'number' then error('Usage: bucket_stat(bucket_id)') end - local bucket = 
box.space._bucket:get({bucket_id}) - - if not bucket or bucket_is_garbage(bucket) then - return nil, lerror.vshard(lerror.code.WRONG_BUCKET, - {bucket_id = bucket_id}) - else - return { - id = bucket.id; - status = bucket.status; - destination = bucket.destination; - } - end + local stat, err = bucket_check_state(bucket_id, 'read') + return stat and stat:tomap(), err end -- @@ -470,6 +482,7 @@ local function bucket_recv(bucket_id, from, data) end box.begin() + M.buckets_to_recovery[bucket_id] = true bucket = box.space._bucket:insert({bucket_id, consts.BUCKET.RECEIVING, from}) -- Fill spaces with data @@ -482,6 +495,7 @@ local function bucket_recv(bucket_id, from, data) -- https://github.com/tarantool/tarantool/issues/3031 local _, boxerror = pcall(box.error, box.error.NO_SUCH_SPACE, space_id) + M.buckets_to_recovery[bucket_id] = nil return nil, lerror.box(boxerror) end for _, tuple in ipairs(space_data) do @@ -493,6 +507,7 @@ local function bucket_recv(bucket_id, from, data) bucket = box.space._bucket:replace({bucket_id, consts.BUCKET.ACTIVE}) box.commit() + M.buckets_to_recovery[bucket_id] = nil return true end @@ -545,7 +560,7 @@ local function bucket_collect(bucket_id) end local status, err = bucket_check_state(bucket_id, 'read') - if not status then + if err then return nil, err end return bucket_collect_internal(bucket_id) @@ -602,8 +617,6 @@ local function local_on_master_disable() log.info("Resigned from the replicaset master role") end -local collect_garbage_f - -- -- The only thing, that must be done to abort a master promotion -- is a set read_only back to true. @@ -629,7 +642,6 @@ end local function local_on_master_enable() box.cfg({read_only = false}) M._on_master_enable:run() - recovery_garbage_receiving_buckets() -- Start background process to collect garbage. 
M.collect_bucket_garbage_fiber = lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f', @@ -651,7 +663,7 @@ local function bucket_send(bucket_id, destination) end local status, err = bucket_check_state(bucket_id, 'write') - if not status then + if err then return nil, err end local replicaset = M.replicasets[destination] @@ -672,9 +684,8 @@ local function bucket_send(bucket_id, destination) M.buckets_to_recovery[bucket_id] = true box.space._bucket:replace({bucket_id, consts.BUCKET.SENDING, destination}) - local status, err = - replicaset:callrw('vshard.storage.bucket_recv', - {bucket_id, box.info.cluster.uuid, data}) + status, err = replicaset:callrw('vshard.storage.bucket_recv', + {bucket_id, box.info.cluster.uuid, data}) if not status then if err.type == 'ShardingError' then -- Rollback bucket state. @@ -1142,16 +1153,6 @@ local function rebalancer_apply_routes_f(routes) log.info('Rebalancer routes are applied') end --- --- Check if a rebalancing is in progress. It is true, if the node --- applies routes received from a rebalancer node in the special --- fiber. --- -local function rebalancing_is_in_progress() - local f = M.rebalancer_applier_fiber - return f ~= nil and f:status() ~= 'dead' -end - -- -- Apply routes table of type: { -- dst_uuid = number, -- Bucket count to send. 
@@ -1337,8 +1338,8 @@ local function storage_call(bucket_id, mode, name, args) error('Unknown mode: '..tostring(mode)) end - local ok, err = bucket_check_state(bucket_id, mode) - if not ok then + local status, err = bucket_check_state(bucket_id, mode) + if err then return nil, err end -- TODO: implement box.call() @@ -1715,30 +1716,31 @@ else end return { - sync = sync; - bucket_force_create = bucket_force_create; - bucket_force_drop = bucket_force_drop; - bucket_collect = bucket_collect; - bucket_recv = bucket_recv; - bucket_send = bucket_send; - bucket_stat = bucket_stat; - bucket_delete_garbage = bucket_delete_garbage; - garbage_collector_wakeup = garbage_collector_wakeup; - rebalancer_wakeup = rebalancer_wakeup; - rebalancer_apply_routes = rebalancer_apply_routes; - rebalancer_disable = rebalancer_disable; - rebalancer_enable = rebalancer_enable; - is_locked = is_this_replicaset_locked; - recovery_wakeup = recovery_wakeup; - call = storage_call; - cfg = storage_cfg; - info = storage_info; - buckets_info = storage_buckets_info; - buckets_count = storage_buckets_count; - buckets_discovery = buckets_discovery; - rebalancer_request_state = rebalancer_request_state; - internal = M; - on_master_enable = on_master_enable; - on_master_disable = on_master_disable; - module_version = function() return M.module_version end; + sync = sync, + bucket_force_create = bucket_force_create, + bucket_force_drop = bucket_force_drop, + bucket_collect = bucket_collect, + bucket_recv = bucket_recv, + bucket_send = bucket_send, + bucket_stat = bucket_stat, + bucket_delete_garbage = bucket_delete_garbage, + garbage_collector_wakeup = garbage_collector_wakeup, + rebalancer_wakeup = rebalancer_wakeup, + rebalancer_apply_routes = rebalancer_apply_routes, + rebalancer_disable = rebalancer_disable, + rebalancer_enable = rebalancer_enable, + is_locked = is_this_replicaset_locked, + rebalancing_is_in_progress = rebalancing_is_in_progress, + recovery_wakeup = recovery_wakeup, + call = 
storage_call, + cfg = storage_cfg, + info = storage_info, + buckets_info = storage_buckets_info, + buckets_count = storage_buckets_count, + buckets_discovery = buckets_discovery, + rebalancer_request_state = rebalancer_request_state, + internal = M, + on_master_enable = on_master_enable, + on_master_disable = on_master_disable, + module_version = function() return M.module_version end, } -- 2.14.3 (Apple Git-98)