[tarantool-patches] [PATCH vshard 3/7] storage: rework recovery
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Mar 28 00:24:10 MSK 2018
Currently the RECEIVING bucket state is effectively invisible, since
it is set inside a transaction and reset to ACTIVE inside the same
transaction. But this will no longer hold after #73. So take into
account in the recovery function and in state checking that
RECEIVING buckets exist. The most significant change is that
a bucket can be set to ACTIVE if it is SENDING on one
replicaset and RECEIVING on another one, but the rebalancer is not
sending it now.
Part of #73
Do some refactoring to simplify error processing.
---
test/storage/recovery.result | 85 ++++++++++++----
test/storage/recovery.test.lua | 38 +++++--
vshard/router/init.lua | 45 +++++----
vshard/storage/init.lua | 222 +++++++++++++++++++++--------------------
4 files changed, 230 insertions(+), 160 deletions(-)
diff --git a/test/storage/recovery.result b/test/storage/recovery.result
index d4fe6e4..fe58c4b 100644
--- a/test/storage/recovery.result
+++ b/test/storage/recovery.result
@@ -22,7 +22,7 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
---
...
-test_run:cmd("switch storage_1_a")
+test_run:switch("storage_1_a")
---
- true
...
@@ -38,10 +38,6 @@ rs2_uuid = replicasets[2]
_bucket = box.space._bucket
---
...
-_bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
----
-- [1, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
-...
_bucket:replace{2, vshard.consts.BUCKET.SENDING, rs2_uuid}
---
- [2, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
@@ -50,7 +46,7 @@ _bucket:replace{3, vshard.consts.BUCKET.RECEIVING, rs2_uuid}
---
- [3, 'receiving', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
...
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
---
- true
...
@@ -60,10 +56,6 @@ _bucket = box.space._bucket
rs1_uuid = replicasets[1]
---
...
-_bucket:replace{1, vshard.consts.BUCKET.RECEIVING, rs1_uuid}
----
-- [1, 'receiving', 'cbf06940-0790-498b-948d-042b62cf3d29']
-...
_bucket:replace{2, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
---
- [2, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29']
@@ -72,6 +64,9 @@ _bucket:replace{3, vshard.consts.BUCKET.SENDING, rs1_uuid}
---
- [3, 'sending', 'cbf06940-0790-498b-948d-042b62cf3d29']
...
+vshard.storage.internal.rebalancer_sending_bucket = 3
+---
+...
test_run:cmd('stop server storage_1_a')
---
- true
@@ -80,7 +75,7 @@ test_run:cmd('start server storage_1_a')
---
- true
...
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
---
- true
...
@@ -95,13 +90,64 @@ _bucket = box.space._bucket
...
_bucket:select{}
---
-- - [1, 'active', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
- - [2, 'garbage', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
+- - [2, 'garbage', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
+ - [3, 'receiving', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+_bucket:select{}
+---
+- - [2, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29']
+ - [3, 'sending', 'cbf06940-0790-498b-948d-042b62cf3d29']
+...
+test_run:switch('storage_1_a')
+---
+- true
...
while _bucket:count() ~= 1 do fiber.sleep(0.1) end
---
...
--
+-- Test a case, when a bucket is sending on one replicaset,
+-- receiving on another one, but there is no rebalancing.
+--
+test_run:cmd('stop server storage_2_a')
+---
+- true
+...
+test_run:cmd('start server storage_2_a')
+---
+- true
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+vshard.storage.recovery_wakeup()
+---
+...
+fiber = require('fiber')
+---
+...
+_bucket = box.space._bucket
+---
+...
+while _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) ~= 2 do fiber.sleep(0.1) end
+---
+...
+test_run:switch('storage_1_a')
+---
+- true
+...
+vshard.storage.recovery_wakeup()
+---
+...
+while _bucket:count() ~= 0 do fiber.sleep(0.1) end
+---
+...
+--
-- Test a case, when a destination is down. The recovery fiber
-- must restore buckets, when the destination is up.
--
@@ -112,15 +158,18 @@ _bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
---
- [1, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
...
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
---
- true
...
+rs1_uuid = replicasets[1]
+---
+...
_bucket:replace{1, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
---
- [1, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29']
...
-test_run:cmd('switch default')
+test_run:switch('default')
---
- true
...
@@ -136,7 +185,7 @@ test_run:cmd('start server storage_1_a')
---
- true
...
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
---
- true
...
@@ -168,7 +217,7 @@ _bucket:select{}
---
- []
...
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
---
- true
...
@@ -230,7 +279,7 @@ fiber = require('fiber')
while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
---
...
-test_run:cmd("switch default")
+test_run:switch("default")
---
- true
...
diff --git a/test/storage/recovery.test.lua b/test/storage/recovery.test.lua
index b3ad269..8cfdc34 100644
--- a/test/storage/recovery.test.lua
+++ b/test/storage/recovery.test.lua
@@ -8,7 +8,7 @@ util = require('util')
util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
-test_run:cmd("switch storage_1_a")
+test_run:switch("storage_1_a")
vshard.storage.rebalancer_disable()
rs2_uuid = replicasets[2]
@@ -17,39 +17,57 @@ rs2_uuid = replicasets[2]
-- must be garbaged on bootstrap.
_bucket = box.space._bucket
-_bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
_bucket:replace{2, vshard.consts.BUCKET.SENDING, rs2_uuid}
_bucket:replace{3, vshard.consts.BUCKET.RECEIVING, rs2_uuid}
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
_bucket = box.space._bucket
rs1_uuid = replicasets[1]
-_bucket:replace{1, vshard.consts.BUCKET.RECEIVING, rs1_uuid}
_bucket:replace{2, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
_bucket:replace{3, vshard.consts.BUCKET.SENDING, rs1_uuid}
+vshard.storage.internal.rebalancer_sending_bucket = 3
test_run:cmd('stop server storage_1_a')
test_run:cmd('start server storage_1_a')
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
fiber = require('fiber')
vshard.storage.recovery_wakeup()
_bucket = box.space._bucket
_bucket:select{}
+test_run:switch('storage_2_a')
+_bucket:select{}
+test_run:switch('storage_1_a')
while _bucket:count() ~= 1 do fiber.sleep(0.1) end
+--
+-- Test a case, when a bucket is sending on one replicaset,
+-- receiving on another one, but there is no rebalancing.
+--
+test_run:cmd('stop server storage_2_a')
+test_run:cmd('start server storage_2_a')
+test_run:switch('storage_2_a')
+vshard.storage.recovery_wakeup()
+fiber = require('fiber')
+_bucket = box.space._bucket
+while _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) ~= 2 do fiber.sleep(0.1) end
+test_run:switch('storage_1_a')
+vshard.storage.recovery_wakeup()
+while _bucket:count() ~= 0 do fiber.sleep(0.1) end
+
--
-- Test a case, when a destination is down. The recovery fiber
-- must restore buckets, when the destination is up.
--
rs2_uuid = replicasets[2]
_bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
+rs1_uuid = replicasets[1]
_bucket:replace{1, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
-test_run:cmd('switch default')
+test_run:switch('default')
test_run:cmd('stop server storage_2_a')
test_run:cmd('stop server storage_1_a')
test_run:cmd('start server storage_1_a')
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
_bucket = box.space._bucket
_bucket:select{}
for i = 1, 10 do vshard.storage.recovery_wakeup() end
@@ -59,7 +77,7 @@ fiber = require('fiber')
while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
_bucket:select{}
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
_bucket = box.space._bucket
_bucket:select{}
@@ -80,7 +98,7 @@ _bucket = box.space._bucket
fiber = require('fiber')
while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
-test_run:cmd("switch default")
+test_run:switch("default")
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index e990c69..9780b15 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -67,29 +67,29 @@ local function bucket_discovery(bucket_id)
end
log.verbose("Discovering bucket %d", bucket_id)
+ local last_err = nil
local unreachable_uuid = nil
- local is_transfer_in_progress = false
- for _, replicaset in pairs(M.replicasets) do
- local stat, err = replicaset:callrw('vshard.storage.bucket_stat',
- {bucket_id})
- if stat then
- if stat.status == consts.BUCKET.ACTIVE or
- stat.status == consts.BUCKET.SENDING then
- log.info("Discovered bucket %d on %s", bucket_id, replicaset)
- bucket_set(bucket_id, replicaset)
- return replicaset
- elseif stat.status == consts.BUCKET.RECEIVING then
- is_transfer_in_progress = true
- end
+ for uuid, replicaset in pairs(M.replicasets) do
+ local _, err =
+ replicaset:callrw('vshard.storage.bucket_stat', {bucket_id})
+ if err == nil then
+ bucket_set(bucket_id, replicaset)
+ return replicaset
elseif err.code ~= lerror.code.WRONG_BUCKET then
- unreachable_uuid = replicaset.uuid
+ last_err = err
+ unreachable_uuid = uuid
end
end
- local errcode = nil
- if unreachable_uuid then
- errcode = lerror.code.UNREACHABLE_REPLICASET
- elseif is_transfer_in_progress then
- errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
+ local err = nil
+ if last_err then
+ if last_err.type == 'ClientError' and
+ last_err.code == box.error.NO_CONNECTION then
+ err = lerror.vshard(lerror.code.UNREACHABLE_REPLICASET,
+ {bucket_id = bucket_id,
+ unreachable_uuid = unreachable_uuid})
+ else
+ err = lerror.make(last_err)
+ end
else
-- All replicasets were scanned, but a bucket was not
-- found anywhere, so most likely it does not exist. It
@@ -97,11 +97,12 @@ local function bucket_discovery(bucket_id)
-- bucket was found to be RECEIVING on one replicaset, and
-- was not found on other replicasets (it was sent during
-- discovery).
- errcode = lerror.code.NO_ROUTE_TO_BUCKET
+ err = lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET,
+ {bucket_id = bucket_id,
+ unreachable_uuid = unreachable_uuid})
end
- return nil, lerror.vshard(errcode, {bucket_id = bucket_id,
- unreachable_uuid = unreachable_uuid})
+ return nil, err
end
-- Resolve bucket id to replicaset uuid
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 6cbeb4b..471d26a 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -100,6 +100,16 @@ local function is_this_replicaset_locked()
return M.this_replicaset and M.this_replicaset.lock
end
+--
+-- Check if @a bucket is garbage. It is true for
+-- * sent buckets;
+-- * buckets explicitly marked to be a garbage.
+--
+local function bucket_is_garbage(bucket)
+ return bucket.status == consts.BUCKET.SENT or
+ bucket.status == consts.BUCKET.GARBAGE
+end
+
--------------------------------------------------------------------------------
-- Schema
--------------------------------------------------------------------------------
@@ -168,6 +178,37 @@ end
-- Recovery
--------------------------------------------------------------------------------
+--
+-- Check if a rebalancing is in progress. It is true, if the node
+-- applies routes received from a rebalancer node in the special
+-- fiber.
+--
+local function rebalancing_is_in_progress()
+ local f = M.rebalancer_applier_fiber
+ return f ~= nil and f:status() ~= 'dead'
+end
+
+--
+-- Check if a local bucket can be deleted.
+--
+local function recovery_local_bucket_is_garbage(local_bucket, remote_bucket)
+ return remote_bucket and remote_bucket.status == consts.BUCKET.ACTIVE
+end
+
+--
+-- Check if a local bucket can become active.
+--
+local function recovery_local_bucket_is_active(local_bucket, remote_bucket)
+ if not remote_bucket or bucket_is_garbage(remote_bucket) then
+ return true
+ end
+ if remote_bucket.status == consts.BUCKET.RECEIVING and
+ local_bucket.status == consts.BUCKET.SENDING then
+ return M.rebalancer_sending_bucket ~= local_bucket.id
+ end
+ return false
+end
+
--
-- Check status of each bucket scheduled for recovery. Resolve
-- status where possible.
@@ -186,13 +227,23 @@ local function recovery_step()
local _bucket = box.space._bucket
local new_count = 0
local is_empty = true
+ --
+ -- If a rebalancer route applier fiber had exited with error
+ -- during bucket sending, then it might not have managed to
+ -- reset the currently sending bucket.
+ --
+ if not rebalancing_is_in_progress() and
+ M.rebalancer_sending_bucket ~= 0 then
+ M.rebalancer_sending_bucket = 0
+ end
for bucket_id, _ in pairs(M.buckets_to_recovery) do
if is_empty then
log.info('Starting buckets recovery step')
end
is_empty = false
local bucket = _bucket:get{bucket_id}
- if not bucket or bucket.status ~= consts.BUCKET.SENDING then
+ if not bucket or (bucket.status ~= consts.BUCKET.SENDING and
+ bucket.status ~= consts.BUCKET.RECEIVING) then
-- Possibly, a bucket was deleted or recovered by
-- an admin. Or recovery_f started not after
-- bootstrap, but after master change - in such a case
@@ -209,27 +260,21 @@ local function recovery_step()
end
local remote_bucket, err =
destination:callrw('vshard.storage.bucket_stat', {bucket_id})
- -- If a bucket is not found with WRONG_BUCKET errcode,
- -- then it either does not exist on destination (possibly,
- -- the bucket is garbage collected), or is in garbage
- -- state (GC is in progress). In both cases it can be
- -- restored here as active.
+ -- Check if it is not a bucket error, and this result
+ -- cannot be used to recover anything. Try later.
if not remote_bucket and (not err or err.type ~= 'ShardingError' or
- err.code ~= lerror.code.WRONG_BUCKET) then
- -- We can ignore other replicasets, because a bucket
- -- could not be sent from destination further, on
- -- another replicaset. It is guaranteed by rebalancer
- -- algorithm, which is stopped, if there are 'sending'
- -- buckets. And the current bucket is exactly
- -- 'sending'.
+ err.code ~= lerror.code.WRONG_BUCKET) then
new_count = new_count + 1
goto continue
end
- table.insert(recovered, bucket_id)
- if remote_bucket and remote_bucket.status == consts.BUCKET.ACTIVE then
+ if recovery_local_bucket_is_garbage(bucket, remote_bucket) then
+ table.insert(recovered, bucket_id)
table.insert(garbage, bucket_id)
- else
+ elseif recovery_local_bucket_is_active(bucket, remote_bucket) then
+ table.insert(recovered, bucket_id)
table.insert(active, bucket_id)
+ else
+ new_count = new_count + 1
end
::continue::
end
@@ -252,24 +297,6 @@ local function recovery_step()
M.buckets_to_recovery_count = new_count
end
---
--- Make all 'receiving' buckets be 'garbage'. The procedure is
--- called on instance start to garbage collect buckets, whose
--- transmition was interrupted by the server down.
---
-local function recovery_garbage_receiving_buckets()
- local _bucket = box.space._bucket
- local receiving_buckets =
- _bucket.index.status:select{consts.BUCKET.RECEIVING}
- if #receiving_buckets > 0 then
- box.begin()
- for _, bucket in pairs(receiving_buckets) do
- _bucket:update({bucket.id}, {{'=', 2, consts.BUCKET.GARBAGE}})
- end
- box.commit()
- end
-end
-
--
-- Infinite function to resolve status of buckets, whose 'sending'
-- has failed due to tarantool or network problems. Restarts on
@@ -282,9 +309,11 @@ end
local function recovery_f(module_version)
lfiber.name('vshard.recovery')
local _bucket = box.space._bucket
- local sending_buckets = _bucket.index.status:select{consts.BUCKET.SENDING}
M.buckets_to_recovery = {}
- for _, bucket in pairs(sending_buckets) do
+ for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do
+ M.buckets_to_recovery[bucket.id] = true
+ end
+ for _, bucket in _bucket.index.status:pairs({consts.BUCKET.RECEIVING}) do
M.buckets_to_recovery[bucket.id] = true
end
-- Interrupt recovery if a module has been reloaded. Perhaps,
@@ -354,31 +383,24 @@ end
-- Buckets
--------------------------------------------------------------------------------
---
--- Check if @a bucket is garbage. It is true for
--- * sent buckets;
--- * buckets explicitly marked to be a garbage.
---
-local function bucket_is_garbage(bucket)
- return bucket.status == consts.BUCKET.SENT or
- bucket.status == consts.BUCKET.GARBAGE
-end
-
--
-- Check that an action of a specified mode can be applied to a
-- bucket.
-- @param bucket_id Bucket identifier.
-- @param mode 'Read' or 'write' mode.
--
--- @retval true Bucket can accept an action of a specified mode.
--- @retval nil, error object Bucket can not accept the action.
+-- @retval bucket Bucket that can accept an action of a specified
+-- mode.
+-- @retval bucket and error object Bucket that can not accept the
+-- action, and a reason why.
--
local function bucket_check_state(bucket_id, mode)
assert(type(bucket_id) == 'number')
assert(mode == 'read' or mode == 'write')
local bucket = box.space._bucket:get({bucket_id})
local errcode = nil
- if bucket == nil or bucket_is_garbage(bucket) then
+ if bucket == nil or bucket_is_garbage(bucket) or
+ bucket.status == consts.BUCKET.RECEIVING then
errcode = lerror.code.WRONG_BUCKET
elseif (bucket.status == consts.BUCKET.SENDING and mode ~= 'read') then
errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
@@ -388,13 +410,13 @@ local function bucket_check_state(bucket_id, mode)
end
if errcode ~= nil then
local dest = bucket and bucket.destination or nil
- return nil, lerror.vshard(errcode, {bucket_id = bucket_id,
- destination = dest})
+ return bucket, lerror.vshard(errcode, {bucket_id = bucket_id,
+ destination = dest})
end
assert(bucket.status == consts.BUCKET.ACTIVE or
bucket.status == consts.BUCKET.SENDING and mode == 'read')
- return true
+ return bucket, nil
end
--
@@ -404,18 +426,8 @@ local function bucket_stat(bucket_id)
if type(bucket_id) ~= 'number' then
error('Usage: bucket_stat(bucket_id)')
end
- local bucket = box.space._bucket:get({bucket_id})
-
- if not bucket or bucket_is_garbage(bucket) then
- return nil, lerror.vshard(lerror.code.WRONG_BUCKET,
- {bucket_id = bucket_id})
- else
- return {
- id = bucket.id;
- status = bucket.status;
- destination = bucket.destination;
- }
- end
+ local stat, err = bucket_check_state(bucket_id, 'read')
+ return stat and stat:tomap(), err
end
--
@@ -470,6 +482,7 @@ local function bucket_recv(bucket_id, from, data)
end
box.begin()
+ M.buckets_to_recovery[bucket_id] = true
bucket = box.space._bucket:insert({bucket_id, consts.BUCKET.RECEIVING,
from})
-- Fill spaces with data
@@ -482,6 +495,7 @@ local function bucket_recv(bucket_id, from, data)
-- https://github.com/tarantool/tarantool/issues/3031
local _, boxerror = pcall(box.error, box.error.NO_SUCH_SPACE,
space_id)
+ M.buckets_to_recovery[bucket_id] = nil
return nil, lerror.box(boxerror)
end
for _, tuple in ipairs(space_data) do
@@ -493,6 +507,7 @@ local function bucket_recv(bucket_id, from, data)
bucket = box.space._bucket:replace({bucket_id, consts.BUCKET.ACTIVE})
box.commit()
+ M.buckets_to_recovery[bucket_id] = nil
return true
end
@@ -545,7 +560,7 @@ local function bucket_collect(bucket_id)
end
local status, err = bucket_check_state(bucket_id, 'read')
- if not status then
+ if err then
return nil, err
end
return bucket_collect_internal(bucket_id)
@@ -602,8 +617,6 @@ local function local_on_master_disable()
log.info("Resigned from the replicaset master role")
end
-local collect_garbage_f
-
--
-- The only thing, that must be done to abort a master promotion
-- is a set read_only back to true.
@@ -629,7 +642,6 @@ end
local function local_on_master_enable()
box.cfg({read_only = false})
M._on_master_enable:run()
- recovery_garbage_receiving_buckets()
-- Start background process to collect garbage.
M.collect_bucket_garbage_fiber =
lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
@@ -651,7 +663,7 @@ local function bucket_send(bucket_id, destination)
end
local status, err = bucket_check_state(bucket_id, 'write')
- if not status then
+ if err then
return nil, err
end
local replicaset = M.replicasets[destination]
@@ -672,9 +684,8 @@ local function bucket_send(bucket_id, destination)
M.buckets_to_recovery[bucket_id] = true
box.space._bucket:replace({bucket_id, consts.BUCKET.SENDING, destination})
- local status, err =
- replicaset:callrw('vshard.storage.bucket_recv',
- {bucket_id, box.info.cluster.uuid, data})
+ status, err = replicaset:callrw('vshard.storage.bucket_recv',
+ {bucket_id, box.info.cluster.uuid, data})
if not status then
if err.type == 'ShardingError' then
-- Rollback bucket state.
@@ -1142,16 +1153,6 @@ local function rebalancer_apply_routes_f(routes)
log.info('Rebalancer routes are applied')
end
---
--- Check if a rebalancing is in progress. It is true, if the node
--- applies routes received from a rebalancer node in the special
--- fiber.
---
-local function rebalancing_is_in_progress()
- local f = M.rebalancer_applier_fiber
- return f ~= nil and f:status() ~= 'dead'
-end
-
--
-- Apply routes table of type: {
-- dst_uuid = number, -- Bucket count to send.
@@ -1337,8 +1338,8 @@ local function storage_call(bucket_id, mode, name, args)
error('Unknown mode: '..tostring(mode))
end
- local ok, err = bucket_check_state(bucket_id, mode)
- if not ok then
+ local status, err = bucket_check_state(bucket_id, mode)
+ if err then
return nil, err
end
-- TODO: implement box.call()
@@ -1715,30 +1716,31 @@ else
end
return {
- sync = sync;
- bucket_force_create = bucket_force_create;
- bucket_force_drop = bucket_force_drop;
- bucket_collect = bucket_collect;
- bucket_recv = bucket_recv;
- bucket_send = bucket_send;
- bucket_stat = bucket_stat;
- bucket_delete_garbage = bucket_delete_garbage;
- garbage_collector_wakeup = garbage_collector_wakeup;
- rebalancer_wakeup = rebalancer_wakeup;
- rebalancer_apply_routes = rebalancer_apply_routes;
- rebalancer_disable = rebalancer_disable;
- rebalancer_enable = rebalancer_enable;
- is_locked = is_this_replicaset_locked;
- recovery_wakeup = recovery_wakeup;
- call = storage_call;
- cfg = storage_cfg;
- info = storage_info;
- buckets_info = storage_buckets_info;
- buckets_count = storage_buckets_count;
- buckets_discovery = buckets_discovery;
- rebalancer_request_state = rebalancer_request_state;
- internal = M;
- on_master_enable = on_master_enable;
- on_master_disable = on_master_disable;
- module_version = function() return M.module_version end;
+ sync = sync,
+ bucket_force_create = bucket_force_create,
+ bucket_force_drop = bucket_force_drop,
+ bucket_collect = bucket_collect,
+ bucket_recv = bucket_recv,
+ bucket_send = bucket_send,
+ bucket_stat = bucket_stat,
+ bucket_delete_garbage = bucket_delete_garbage,
+ garbage_collector_wakeup = garbage_collector_wakeup,
+ rebalancer_wakeup = rebalancer_wakeup,
+ rebalancer_apply_routes = rebalancer_apply_routes,
+ rebalancer_disable = rebalancer_disable,
+ rebalancer_enable = rebalancer_enable,
+ is_locked = is_this_replicaset_locked,
+ rebalancing_is_in_progress = rebalancing_is_in_progress,
+ recovery_wakeup = recovery_wakeup,
+ call = storage_call,
+ cfg = storage_cfg,
+ info = storage_info,
+ buckets_info = storage_buckets_info,
+ buckets_count = storage_buckets_count,
+ buckets_discovery = buckets_discovery,
+ rebalancer_request_state = rebalancer_request_state,
+ internal = M,
+ on_master_enable = on_master_enable,
+ on_master_disable = on_master_disable,
+ module_version = function() return M.module_version end,
}
--
2.14.3 (Apple Git-98)
More information about the Tarantool-patches
mailing list