[tarantool-patches] [PATCH vshard 2/7] rebalancer: remember the currently sending bucket id

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Mar 28 00:24:09 MSK 2018


It is needed for recovery: if a bucket is RECEIVING on one
replicaset and SENDING on another, but the source replicaset
knows that the rebalancer is not sending it right now, then this
bucket must be recovered. Its partially received data must be
deleted on the destination replicaset, and its state must be set
back to ACTIVE on the source replicaset.
---
 vshard/storage/init.lua | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 405585d..6cbeb4b 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -82,6 +82,13 @@ if not M then
         -- Maximal bucket count that can be received by a single
         -- replicaset simultaneously.
         rebalancer_max_receiving = 0,
+        -- Identifier of the bucket that the rebalancer is sending
+        -- now, or 0 if there is none. If a bucket has state SENDING
+        -- but its id is not stored here, then its sending was
+        -- interrupted, for example by a restart of the instance,
+        -- and the destination replicaset must drop the already
+        -- received data.
+        rebalancer_sending_bucket = 0,
     }
 end
 
@@ -1109,15 +1116,17 @@ local function rebalancer_apply_routes_f(routes)
     -- guarantee that an event loop does not contain events
     -- between this fiber and its creator.
     M.rebalancer_applier_fiber = lfiber.self()
+    M.rebalancer_sending_bucket = 0
     local active_buckets = _status:select{consts.BUCKET.ACTIVE}
     local i = 1
     for dst_uuid, bucket_count in pairs(routes) do
         assert(i + bucket_count - 1 <= #active_buckets)
-        log.info('Send %d buckets to %s', bucket_count,
-                 M.replicasets[dst_uuid])
+        log.info('Send %d buckets to %s', bucket_count, M.replicasets[dst_uuid])
         for j = i, i + bucket_count - 1 do
-            local status, ret = pcall(bucket_send, active_buckets[j].id,
-                                      dst_uuid)
+            local bucket_id = active_buckets[j].id
+            M.rebalancer_sending_bucket = bucket_id
+            local status, ret = pcall(bucket_send, bucket_id, dst_uuid)
+            M.rebalancer_sending_bucket = 0
             if not status or ret ~= true then
                 if not status then
                     log.error('Error during rebalancer routes applying: %s',
-- 
2.14.3 (Apple Git-98)





More information about the Tarantool-patches mailing list