[tarantool-patches] [PATCH vshard 3/7] storage: rework recovery

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Mar 28 00:24:10 MSK 2018


Currently the RECEIVING bucket state is effectively invisible,
since it is set inside a transaction and reset to ACTIVE inside
the same transaction. But this will no longer be true after #73.
So take into account, in the recovery function and in state
checking, that RECEIVING buckets exist. The most significant
change is that a bucket can be set to ACTIVE if it is SENDING on
one replicaset and RECEIVING on another one, but the rebalancer
is not sending it right now.

Part of #73

Do some refactoring to simplify error processing.
---
 test/storage/recovery.result   |  85 ++++++++++++----
 test/storage/recovery.test.lua |  38 +++++--
 vshard/router/init.lua         |  45 +++++----
 vshard/storage/init.lua        | 222 +++++++++++++++++++++--------------------
 4 files changed, 230 insertions(+), 160 deletions(-)

diff --git a/test/storage/recovery.result b/test/storage/recovery.result
index d4fe6e4..fe58c4b 100644
--- a/test/storage/recovery.result
+++ b/test/storage/recovery.result
@@ -22,7 +22,7 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 ---
 ...
-test_run:cmd("switch storage_1_a")
+test_run:switch("storage_1_a")
 ---
 - true
 ...
@@ -38,10 +38,6 @@ rs2_uuid = replicasets[2]
 _bucket = box.space._bucket
 ---
 ...
-_bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
----
-- [1, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
-...
 _bucket:replace{2, vshard.consts.BUCKET.SENDING, rs2_uuid}
 ---
 - [2, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
@@ -50,7 +46,7 @@ _bucket:replace{3, vshard.consts.BUCKET.RECEIVING, rs2_uuid}
 ---
 - [3, 'receiving', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
 ...
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 ---
 - true
 ...
@@ -60,10 +56,6 @@ _bucket = box.space._bucket
 rs1_uuid = replicasets[1]
 ---
 ...
-_bucket:replace{1, vshard.consts.BUCKET.RECEIVING, rs1_uuid}
----
-- [1, 'receiving', 'cbf06940-0790-498b-948d-042b62cf3d29']
-...
 _bucket:replace{2, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
 ---
 - [2, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29']
@@ -72,6 +64,9 @@ _bucket:replace{3, vshard.consts.BUCKET.SENDING, rs1_uuid}
 ---
 - [3, 'sending', 'cbf06940-0790-498b-948d-042b62cf3d29']
 ...
+vshard.storage.internal.rebalancer_sending_bucket = 3
+---
+...
 test_run:cmd('stop server storage_1_a')
 ---
 - true
@@ -80,7 +75,7 @@ test_run:cmd('start server storage_1_a')
 ---
 - true
 ...
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
 ---
 - true
 ...
@@ -95,13 +90,64 @@ _bucket = box.space._bucket
 ...
 _bucket:select{}
 ---
-- - [1, 'active', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
-  - [2, 'garbage', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
+- - [2, 'garbage', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
+  - [3, 'receiving', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+_bucket:select{}
+---
+- - [2, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29']
+  - [3, 'sending', 'cbf06940-0790-498b-948d-042b62cf3d29']
+...
+test_run:switch('storage_1_a')
+---
+- true
 ...
 while _bucket:count() ~= 1 do fiber.sleep(0.1) end
 ---
 ...
 --
+-- Test a case, when a bucket is sending on one replicaset,
+-- receiving on another one, but there is no rebalancing.
+--
+test_run:cmd('stop server storage_2_a')
+---
+- true
+...
+test_run:cmd('start server storage_2_a')
+---
+- true
+...
+test_run:switch('storage_2_a')
+---
+- true
+...
+vshard.storage.recovery_wakeup()
+---
+...
+fiber = require('fiber')
+---
+...
+_bucket = box.space._bucket
+---
+...
+while _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) ~= 2 do fiber.sleep(0.1) end
+---
+...
+test_run:switch('storage_1_a')
+---
+- true
+...
+vshard.storage.recovery_wakeup()
+---
+...
+while _bucket:count() ~= 0 do fiber.sleep(0.1) end
+---
+...
+--
 -- Test a case, when a destination is down. The recovery fiber
 -- must restore buckets, when the destination is up.
 --
@@ -112,15 +158,18 @@ _bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
 ---
 - [1, 'sending', 'ac522f65-aa94-4134-9f64-51ee384f1a54']
 ...
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 ---
 - true
 ...
+rs1_uuid = replicasets[1]
+---
+...
 _bucket:replace{1, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
 ---
 - [1, 'active', 'cbf06940-0790-498b-948d-042b62cf3d29']
 ...
-test_run:cmd('switch default')
+test_run:switch('default')
 ---
 - true
 ...
@@ -136,7 +185,7 @@ test_run:cmd('start server storage_1_a')
 ---
 - true
 ...
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
 ---
 - true
 ...
@@ -168,7 +217,7 @@ _bucket:select{}
 ---
 - []
 ...
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 ---
 - true
 ...
@@ -230,7 +279,7 @@ fiber = require('fiber')
 while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
 ---
 ...
-test_run:cmd("switch default")
+test_run:switch("default")
 ---
 - true
 ...
diff --git a/test/storage/recovery.test.lua b/test/storage/recovery.test.lua
index b3ad269..8cfdc34 100644
--- a/test/storage/recovery.test.lua
+++ b/test/storage/recovery.test.lua
@@ -8,7 +8,7 @@ util = require('util')
 util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 
-test_run:cmd("switch storage_1_a")
+test_run:switch("storage_1_a")
 vshard.storage.rebalancer_disable()
 
 rs2_uuid = replicasets[2]
@@ -17,39 +17,57 @@ rs2_uuid = replicasets[2]
 -- must be garbaged on bootstrap.
 _bucket = box.space._bucket
 
-_bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
 _bucket:replace{2, vshard.consts.BUCKET.SENDING, rs2_uuid}
 _bucket:replace{3, vshard.consts.BUCKET.RECEIVING, rs2_uuid}
 
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 _bucket = box.space._bucket
 rs1_uuid = replicasets[1]
-_bucket:replace{1, vshard.consts.BUCKET.RECEIVING, rs1_uuid}
 _bucket:replace{2, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
 _bucket:replace{3, vshard.consts.BUCKET.SENDING, rs1_uuid}
+vshard.storage.internal.rebalancer_sending_bucket = 3
 
 test_run:cmd('stop server storage_1_a')
 test_run:cmd('start server storage_1_a')
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
 fiber = require('fiber')
 vshard.storage.recovery_wakeup()
 _bucket = box.space._bucket
 _bucket:select{}
+test_run:switch('storage_2_a')
+_bucket:select{}
+test_run:switch('storage_1_a')
 while _bucket:count() ~= 1 do fiber.sleep(0.1) end
 
+--
+-- Test a case, when a bucket is sending on one replicaset,
+-- receiving on another one, but there is no rebalancing.
+--
+test_run:cmd('stop server storage_2_a')
+test_run:cmd('start server storage_2_a')
+test_run:switch('storage_2_a')
+vshard.storage.recovery_wakeup()
+fiber = require('fiber')
+_bucket = box.space._bucket
+while _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE}) ~= 2 do fiber.sleep(0.1) end
+test_run:switch('storage_1_a')
+vshard.storage.recovery_wakeup()
+while _bucket:count() ~= 0 do fiber.sleep(0.1) end
+
 --
 -- Test a case, when a destination is down. The recovery fiber
 -- must restore buckets, when the destination is up.
 --
 rs2_uuid = replicasets[2]
 _bucket:replace{1, vshard.consts.BUCKET.SENDING, rs2_uuid}
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
+rs1_uuid = replicasets[1]
 _bucket:replace{1, vshard.consts.BUCKET.ACTIVE, rs1_uuid}
-test_run:cmd('switch default')
+test_run:switch('default')
 test_run:cmd('stop server storage_2_a')
 test_run:cmd('stop server storage_1_a')
 test_run:cmd('start server storage_1_a')
-test_run:cmd('switch storage_1_a')
+test_run:switch('storage_1_a')
 _bucket = box.space._bucket
 _bucket:select{}
 for i = 1, 10 do vshard.storage.recovery_wakeup() end
@@ -59,7 +77,7 @@ fiber = require('fiber')
 while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
 _bucket:select{}
 
-test_run:cmd('switch storage_2_a')
+test_run:switch('storage_2_a')
 _bucket = box.space._bucket
 _bucket:select{}
 
@@ -80,7 +98,7 @@ _bucket = box.space._bucket
 fiber = require('fiber')
 while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
 
-test_run:cmd("switch default")
+test_run:switch("default")
 
 test_run:drop_cluster(REPLICASET_2)
 test_run:drop_cluster(REPLICASET_1)
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index e990c69..9780b15 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -67,29 +67,29 @@ local function bucket_discovery(bucket_id)
     end
 
     log.verbose("Discovering bucket %d", bucket_id)
+    local last_err = nil
     local unreachable_uuid = nil
-    local is_transfer_in_progress = false
-    for _, replicaset in pairs(M.replicasets) do
-        local stat, err = replicaset:callrw('vshard.storage.bucket_stat',
-                                             {bucket_id})
-        if stat then
-            if stat.status == consts.BUCKET.ACTIVE or
-               stat.status == consts.BUCKET.SENDING then
-                log.info("Discovered bucket %d on %s", bucket_id, replicaset)
-                bucket_set(bucket_id, replicaset)
-                return replicaset
-            elseif stat.status == consts.BUCKET.RECEIVING then
-                is_transfer_in_progress = true
-            end
+    for uuid, replicaset in pairs(M.replicasets) do
+        local _, err =
+            replicaset:callrw('vshard.storage.bucket_stat', {bucket_id})
+        if err == nil then
+            bucket_set(bucket_id, replicaset)
+            return replicaset
         elseif err.code ~= lerror.code.WRONG_BUCKET then
-            unreachable_uuid = replicaset.uuid
+            last_err = err
+            unreachable_uuid = uuid
         end
     end
-    local errcode = nil
-    if unreachable_uuid then
-        errcode = lerror.code.UNREACHABLE_REPLICASET
-    elseif is_transfer_in_progress then
-        errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
+    local err = nil
+    if last_err then
+        if last_err.type == 'ClientError' and
+           last_err.code == box.error.NO_CONNECTION then
+            err = lerror.vshard(lerror.code.UNREACHABLE_REPLICASET,
+                                {bucket_id = bucket_id,
+                                 unreachable_uuid = unreachable_uuid})
+        else
+            err = lerror.make(last_err)
+        end
     else
         -- All replicasets were scanned, but a bucket was not
         -- found anywhere, so most likely it does not exist. It
@@ -97,11 +97,12 @@ local function bucket_discovery(bucket_id)
         -- bucket was found to be RECEIVING on one replicaset, and
         -- was not found on other replicasets (it was sent during
         -- discovery).
-        errcode = lerror.code.NO_ROUTE_TO_BUCKET
+        err = lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET,
+                            {bucket_id = bucket_id,
+                             unreachable_uuid = unreachable_uuid})
     end
 
-    return nil, lerror.vshard(errcode, {bucket_id = bucket_id,
-                                        unreachable_uuid = unreachable_uuid})
+    return nil, err
 end
 
 -- Resolve bucket id to replicaset uuid
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 6cbeb4b..471d26a 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -100,6 +100,16 @@ local function is_this_replicaset_locked()
     return M.this_replicaset and M.this_replicaset.lock
 end
 
+--
+-- Check if @a bucket is garbage. It is true for
+-- * sent buckets;
+-- * buckets explicitly marked to be a garbage.
+--
+local function bucket_is_garbage(bucket)
+    return bucket.status == consts.BUCKET.SENT or
+           bucket.status == consts.BUCKET.GARBAGE
+end
+
 --------------------------------------------------------------------------------
 -- Schema
 --------------------------------------------------------------------------------
@@ -168,6 +178,37 @@ end
 -- Recovery
 --------------------------------------------------------------------------------
 
+--
+-- Check if a rebalancing is in progress. It is true, if the node
+-- applies routes received from a rebalancer node in the special
+-- fiber.
+--
+local function rebalancing_is_in_progress()
+    local f = M.rebalancer_applier_fiber
+    return f ~= nil and f:status() ~= 'dead'
+end
+
+--
+-- Check if a local bucket can be deleted.
+--
+local function recovery_local_bucket_is_garbage(local_bucket, remote_bucket)
+    return remote_bucket and remote_bucket.status == consts.BUCKET.ACTIVE
+end
+
+--
+-- Check if a local bucket can become active.
+--
+local function recovery_local_bucket_is_active(local_bucket, remote_bucket)
+    if not remote_bucket or bucket_is_garbage(remote_bucket) then
+        return true
+    end
+    if remote_bucket.status == consts.BUCKET.RECEIVING and
+       local_bucket.status == consts.BUCKET.SENDING then
+        return M.rebalancer_sending_bucket ~= local_bucket.id
+    end
+    return false
+end
+
 --
 -- Check status of each bucket scheduled for recovery. Resolve
 -- status where possible.
@@ -186,13 +227,23 @@ local function recovery_step()
     local _bucket = box.space._bucket
     local new_count = 0
     local is_empty = true
+    --
+    -- If a rebalancer route applier fiber exited with an error
+    -- during bucket sending, then it might not have managed to
+    -- reset the currently sending bucket.
+    --
+    if not rebalancing_is_in_progress() and
+       M.rebalancer_sending_bucket ~= 0 then
+        M.rebalancer_sending_bucket = 0
+    end
     for bucket_id, _ in pairs(M.buckets_to_recovery) do
         if is_empty then
             log.info('Starting buckets recovery step')
         end
         is_empty = false
         local bucket = _bucket:get{bucket_id}
-        if not bucket or bucket.status ~= consts.BUCKET.SENDING then
+        if not bucket or (bucket.status ~= consts.BUCKET.SENDING and
+                          bucket.status ~= consts.BUCKET.RECEIVING) then
             -- Possibly, a bucket was deleted or recovered by
             -- an admin. Or recovery_f started not after
             -- bootstrap, but after master change - in such a case
@@ -209,27 +260,21 @@ local function recovery_step()
         end
         local remote_bucket, err =
             destination:callrw('vshard.storage.bucket_stat', {bucket_id})
-        -- If a bucket is not found with WRONG_BUCKET errcode,
-        -- then it either does not exist on destination (possibly,
-        -- the bucket is garbage collected), or is in garbage
-        -- state (GC is in progress). In both cases it can be
-        -- restored here as active.
+        -- If it is not a bucket error, then this result can not
+        -- be used to recover anything. Try later.
         if not remote_bucket and (not err or err.type ~= 'ShardingError' or
-           err.code ~= lerror.code.WRONG_BUCKET) then
-            -- We can ignore other replicasets, because a bucket
-            -- could not be sent from destination further, on
-            -- another replicaset. It is guaranteed by rebalancer
-            -- algorithm, which is stopped, if there are 'sending'
-            -- buckets. And the current bucket is exactly
-            -- 'sending'.
+                                  err.code ~= lerror.code.WRONG_BUCKET) then
             new_count = new_count + 1
             goto continue
         end
-        table.insert(recovered, bucket_id)
-        if remote_bucket and remote_bucket.status == consts.BUCKET.ACTIVE then
+        if recovery_local_bucket_is_garbage(bucket, remote_bucket) then
+            table.insert(recovered, bucket_id)
             table.insert(garbage, bucket_id)
-        else
+        elseif recovery_local_bucket_is_active(bucket, remote_bucket) then
+            table.insert(recovered, bucket_id)
             table.insert(active, bucket_id)
+        else
+            new_count = new_count + 1
         end
 ::continue::
     end
@@ -252,24 +297,6 @@ local function recovery_step()
     M.buckets_to_recovery_count = new_count
 end
 
---
--- Make all 'receiving' buckets be 'garbage'. The procedure is
--- called on instance start to garbage collect buckets, whose
--- transmition was interrupted by the server down.
---
-local function recovery_garbage_receiving_buckets()
-    local _bucket = box.space._bucket
-    local receiving_buckets =
-        _bucket.index.status:select{consts.BUCKET.RECEIVING}
-    if #receiving_buckets > 0 then
-        box.begin()
-        for _, bucket in pairs(receiving_buckets) do
-            _bucket:update({bucket.id}, {{'=', 2, consts.BUCKET.GARBAGE}})
-        end
-        box.commit()
-    end
-end
-
 --
 -- Infinite function to resolve status of buckets, whose 'sending'
 -- has failed due to tarantool or network problems. Restarts on
@@ -282,9 +309,11 @@ end
 local function recovery_f(module_version)
     lfiber.name('vshard.recovery')
     local _bucket = box.space._bucket
-    local sending_buckets = _bucket.index.status:select{consts.BUCKET.SENDING}
     M.buckets_to_recovery = {}
-    for _, bucket in pairs(sending_buckets) do
+    for _, bucket in _bucket.index.status:pairs({consts.BUCKET.SENDING}) do
+        M.buckets_to_recovery[bucket.id] = true
+    end
+    for _, bucket in _bucket.index.status:pairs({consts.BUCKET.RECEIVING}) do
         M.buckets_to_recovery[bucket.id] = true
     end
     -- Interrupt recovery if a module has been reloaded. Perhaps,
@@ -354,31 +383,24 @@ end
 -- Buckets
 --------------------------------------------------------------------------------
 
---
--- Check if @a bucket is garbage. It is true for
--- * sent buckets;
--- * buckets explicitly marked to be a garbage.
---
-local function bucket_is_garbage(bucket)
-    return bucket.status == consts.BUCKET.SENT or
-           bucket.status == consts.BUCKET.GARBAGE
-end
-
 --
 -- Check that an action of a specified mode can be applied to a
 -- bucket.
 -- @param bucket_id Bucket identifier.
 -- @param mode 'Read' or 'write' mode.
 --
--- @retval true Bucket can accept an action of a specified mode.
--- @retval nil, error object Bucket can not accept the action.
+-- @retval bucket Bucket that can accept an action of a specified
+--         mode.
+-- @retval bucket or nil, error object Bucket (nil if not found)
+--         that can not accept the action, and a reason why.
 --
 local function bucket_check_state(bucket_id, mode)
     assert(type(bucket_id) == 'number')
     assert(mode == 'read' or mode == 'write')
     local bucket = box.space._bucket:get({bucket_id})
     local errcode = nil
-    if bucket == nil or bucket_is_garbage(bucket) then
+    if bucket == nil or bucket_is_garbage(bucket) or
+       bucket.status == consts.BUCKET.RECEIVING then
         errcode = lerror.code.WRONG_BUCKET
     elseif (bucket.status == consts.BUCKET.SENDING and mode ~= 'read') then
         errcode = lerror.code.TRANSFER_IS_IN_PROGRESS
@@ -388,13 +410,13 @@ local function bucket_check_state(bucket_id, mode)
     end
     if errcode ~= nil then
         local dest = bucket and bucket.destination or nil
-        return nil, lerror.vshard(errcode, {bucket_id = bucket_id,
-                                            destination = dest})
+        return bucket, lerror.vshard(errcode, {bucket_id = bucket_id,
+                                               destination = dest})
     end
 
     assert(bucket.status == consts.BUCKET.ACTIVE or
            bucket.status == consts.BUCKET.SENDING and mode == 'read')
-    return true
+    return bucket, nil
 end
 
 --
@@ -404,18 +426,8 @@ local function bucket_stat(bucket_id)
     if type(bucket_id) ~= 'number' then
         error('Usage: bucket_stat(bucket_id)')
     end
-    local bucket = box.space._bucket:get({bucket_id})
-
-    if not bucket or bucket_is_garbage(bucket) then
-        return nil, lerror.vshard(lerror.code.WRONG_BUCKET,
-                                  {bucket_id = bucket_id})
-    else
-        return {
-            id = bucket.id;
-            status = bucket.status;
-            destination = bucket.destination;
-        }
-    end
+    local stat, err = bucket_check_state(bucket_id, 'read')
+    return stat and stat:tomap(), err
 end
 
 --
@@ -470,6 +482,7 @@ local function bucket_recv(bucket_id, from, data)
     end
 
     box.begin()
+    M.buckets_to_recovery[bucket_id] = true
     bucket = box.space._bucket:insert({bucket_id, consts.BUCKET.RECEIVING,
                                        from})
     -- Fill spaces with data
@@ -482,6 +495,7 @@ local function bucket_recv(bucket_id, from, data)
             -- https://github.com/tarantool/tarantool/issues/3031
             local _, boxerror = pcall(box.error, box.error.NO_SUCH_SPACE,
                                       space_id)
+            M.buckets_to_recovery[bucket_id] = nil
             return nil, lerror.box(boxerror)
         end
         for _, tuple in ipairs(space_data) do
@@ -493,6 +507,7 @@ local function bucket_recv(bucket_id, from, data)
     bucket = box.space._bucket:replace({bucket_id, consts.BUCKET.ACTIVE})
 
     box.commit()
+    M.buckets_to_recovery[bucket_id] = nil
     return true
 end
 
@@ -545,7 +560,7 @@ local function bucket_collect(bucket_id)
     end
 
     local status, err = bucket_check_state(bucket_id, 'read')
-    if not status then
+    if err then
         return nil, err
     end
     return bucket_collect_internal(bucket_id)
@@ -602,8 +617,6 @@ local function local_on_master_disable()
     log.info("Resigned from the replicaset master role")
 end
 
-local collect_garbage_f
-
 --
 -- The only thing, that must be done to abort a master promotion
 -- is a set read_only back to true.
@@ -629,7 +642,6 @@ end
 local function local_on_master_enable()
     box.cfg({read_only = false})
     M._on_master_enable:run()
-    recovery_garbage_receiving_buckets()
     -- Start background process to collect garbage.
     M.collect_bucket_garbage_fiber =
         lfiber.create(util.reloadable_fiber_f, M, 'collect_garbage_f',
@@ -651,7 +663,7 @@ local function bucket_send(bucket_id, destination)
     end
 
     local status, err = bucket_check_state(bucket_id, 'write')
-    if not status then
+    if err then
         return nil, err
     end
     local replicaset = M.replicasets[destination]
@@ -672,9 +684,8 @@ local function bucket_send(bucket_id, destination)
     M.buckets_to_recovery[bucket_id] = true
     box.space._bucket:replace({bucket_id, consts.BUCKET.SENDING, destination})
 
-    local status, err =
-        replicaset:callrw('vshard.storage.bucket_recv',
-                           {bucket_id, box.info.cluster.uuid, data})
+    status, err = replicaset:callrw('vshard.storage.bucket_recv',
+                                    {bucket_id, box.info.cluster.uuid, data})
     if not status then
         if err.type == 'ShardingError' then
             -- Rollback bucket state.
@@ -1142,16 +1153,6 @@ local function rebalancer_apply_routes_f(routes)
     log.info('Rebalancer routes are applied')
 end
 
---
--- Check if a rebalancing is in progress. It is true, if the node
--- applies routes received from a rebalancer node in the special
--- fiber.
---
-local function rebalancing_is_in_progress()
-    local f = M.rebalancer_applier_fiber
-    return f ~= nil and f:status() ~= 'dead'
-end
-
 --
 -- Apply routes table of type: {
 --     dst_uuid = number, -- Bucket count to send.
@@ -1337,8 +1338,8 @@ local function storage_call(bucket_id, mode, name, args)
         error('Unknown mode: '..tostring(mode))
     end
 
-    local ok, err = bucket_check_state(bucket_id, mode)
-    if not ok then
+    local status, err = bucket_check_state(bucket_id, mode)
+    if err then
         return nil, err
     end
     -- TODO: implement box.call()
@@ -1715,30 +1716,31 @@ else
 end
 
 return {
-    sync = sync;
-    bucket_force_create = bucket_force_create;
-    bucket_force_drop = bucket_force_drop;
-    bucket_collect = bucket_collect;
-    bucket_recv = bucket_recv;
-    bucket_send = bucket_send;
-    bucket_stat = bucket_stat;
-    bucket_delete_garbage = bucket_delete_garbage;
-    garbage_collector_wakeup = garbage_collector_wakeup;
-    rebalancer_wakeup = rebalancer_wakeup;
-    rebalancer_apply_routes = rebalancer_apply_routes;
-    rebalancer_disable = rebalancer_disable;
-    rebalancer_enable = rebalancer_enable;
-    is_locked = is_this_replicaset_locked;
-    recovery_wakeup = recovery_wakeup;
-    call = storage_call;
-    cfg = storage_cfg;
-    info = storage_info;
-    buckets_info = storage_buckets_info;
-    buckets_count = storage_buckets_count;
-    buckets_discovery = buckets_discovery;
-    rebalancer_request_state = rebalancer_request_state;
-    internal = M;
-    on_master_enable = on_master_enable;
-    on_master_disable = on_master_disable;
-    module_version = function() return M.module_version end;
+    sync = sync,
+    bucket_force_create = bucket_force_create,
+    bucket_force_drop = bucket_force_drop,
+    bucket_collect = bucket_collect,
+    bucket_recv = bucket_recv,
+    bucket_send = bucket_send,
+    bucket_stat = bucket_stat,
+    bucket_delete_garbage = bucket_delete_garbage,
+    garbage_collector_wakeup = garbage_collector_wakeup,
+    rebalancer_wakeup = rebalancer_wakeup,
+    rebalancer_apply_routes = rebalancer_apply_routes,
+    rebalancer_disable = rebalancer_disable,
+    rebalancer_enable = rebalancer_enable,
+    is_locked = is_this_replicaset_locked,
+    rebalancing_is_in_progress = rebalancing_is_in_progress,
+    recovery_wakeup = recovery_wakeup,
+    call = storage_call,
+    cfg = storage_cfg,
+    info = storage_info,
+    buckets_info = storage_buckets_info,
+    buckets_count = storage_buckets_count,
+    buckets_discovery = buckets_discovery,
+    rebalancer_request_state = rebalancer_request_state,
+    internal = M,
+    on_master_enable = on_master_enable,
+    on_master_disable = on_master_disable,
+    module_version = function() return M.module_version end,
 }
-- 
2.14.3 (Apple Git-98)





More information about the Tarantool-patches mailing list