[Tarantool-patches] [PATCH 8/9] recovery: introduce reactive recovery

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Feb 10 02:46:14 MSK 2021


Recovery is a fiber on a master node which tries to resolve
SENDING/RECEIVING buckets into GARBAGE or ACTIVE, in case they are
stuck. Usually it happens due to a conflict on the receiving side,
or if a restart happens during bucket send.

Recovery was proactive. It used to wakeup with a constant period
to find and resolve the needed buckets.

But this won't work with the future feature called 'map-reduce'.
Map-reduce as a preparation stage will need to ensure that all
buckets on a storage are readable and writable. With the current
recovery algorithm if a bucket is broken, it won't be recovered
for the next 5 seconds by default. During this time all new
map-reduce requests can't execute.

This is not acceptable. As well as too frequent wakeup of recovery
fiber because it would waste TX thread time.

The patch makes recovery fiber wakeup not by a timeout but by
events happening with _bucket space. Recovery fiber sleeps on a
condition variable which is signaled when _bucket is changed.

This is very similar to the reactive GC feature in a previous
commit.

It is worth mentioning that the backoff happens not only when a
bucket couldn't be recovered (its transfer is still in progress,
for example), but also when a network error happened and recovery
couldn't check state of the bucket on the other storage.

It would be a useless busy loop to retry network errors
immediately after their appearance. Recovery uses a backoff
interval for them as well.

Needed for #147
---
 test/router/router.result             | 22 ++++++++---
 test/router/router.test.lua           | 13 ++++++-
 test/storage/recovery.result          |  8 ++++
 test/storage/recovery.test.lua        |  5 +++
 test/storage/recovery_errinj.result   | 16 +++++++-
 test/storage/recovery_errinj.test.lua |  9 ++++-
 vshard/consts.lua                     |  2 +-
 vshard/storage/init.lua               | 54 +++++++++++++++++++++++----
 8 files changed, 110 insertions(+), 19 deletions(-)

diff --git a/test/router/router.result b/test/router/router.result
index b2efd6d..3c1d073 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -312,6 +312,11 @@ replicaset, err = vshard.router.bucket_discovery(2); return err == nil or err
 _ = test_run:switch('storage_2_a')
 ---
 ...
+-- Pause recovery. It is too aggressive, and the test needs to see buckets in
+-- their intermediate states.
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
+---
+...
 box.space._bucket:replace({1, vshard.consts.BUCKET.SENDING, util.replicasets[1]})
 ---
 - [1, 'sending', '<replicaset_1>']
@@ -319,6 +324,9 @@ box.space._bucket:replace({1, vshard.consts.BUCKET.SENDING, util.replicasets[1]}
 _ = test_run:switch('storage_1_a')
 ---
 ...
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
+---
+...
 box.space._bucket:replace({1, vshard.consts.BUCKET.RECEIVING, util.replicasets[2]})
 ---
 - [1, 'receiving', '<replicaset_2>']
@@ -342,19 +350,21 @@ util.check_error(vshard.router.call, 1, 'write', 'echo', {123})
   name: TRANSFER_IS_IN_PROGRESS
   message: Bucket 1 is transferring to replicaset <replicaset_1>
 ...
-_ = test_run:switch('storage_2_a')
+_ = test_run:switch('storage_1_a')
+---
+...
+box.space._bucket:delete({1})
 ---
+- [1, 'receiving', '<replicaset_2>']
 ...
-box.space._bucket:replace({1, vshard.consts.BUCKET.ACTIVE})
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
 ---
-- [1, 'active']
 ...
-_ = test_run:switch('storage_1_a')
+_ = test_run:switch('storage_2_a')
 ---
 ...
-box.space._bucket:delete({1})
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
 ---
-- [1, 'receiving', '<replicaset_2>']
 ...
 _ = test_run:switch('router_1')
 ---
diff --git a/test/router/router.test.lua b/test/router/router.test.lua
index 154310b..aa3eb3b 100644
--- a/test/router/router.test.lua
+++ b/test/router/router.test.lua
@@ -114,19 +114,28 @@ replicaset, err = vshard.router.bucket_discovery(1); return err == nil or err
 replicaset, err = vshard.router.bucket_discovery(2); return err == nil or err
 
 _ = test_run:switch('storage_2_a')
+-- Pause recovery. It is too aggressive, and the test needs to see buckets in
+-- their intermediate states.
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
 box.space._bucket:replace({1, vshard.consts.BUCKET.SENDING, util.replicasets[1]})
+
 _ = test_run:switch('storage_1_a')
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
 box.space._bucket:replace({1, vshard.consts.BUCKET.RECEIVING, util.replicasets[2]})
+
 _ = test_run:switch('router_1')
 -- Ok to read sending bucket.
 vshard.router.call(1, 'read', 'echo', {123})
 -- Not ok to write sending bucket.
 util.check_error(vshard.router.call, 1, 'write', 'echo', {123})
 
-_ = test_run:switch('storage_2_a')
-box.space._bucket:replace({1, vshard.consts.BUCKET.ACTIVE})
 _ = test_run:switch('storage_1_a')
 box.space._bucket:delete({1})
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
+
+_ = test_run:switch('storage_2_a')
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
+
 _ = test_run:switch('router_1')
 
 -- Check unavailability of master of a replicaset.
diff --git a/test/storage/recovery.result b/test/storage/recovery.result
index 8ccb0b9..fa92bca 100644
--- a/test/storage/recovery.result
+++ b/test/storage/recovery.result
@@ -28,12 +28,20 @@ util.push_rs_filters(test_run)
 _ = test_run:switch("storage_2_a")
 ---
 ...
+-- Pause until restart. Otherwise recovery does its job too fast and does not
+-- allow to simulate the intermediate state.
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
+---
+...
 vshard.storage.rebalancer_disable()
 ---
 ...
 _ = test_run:switch("storage_1_a")
 ---
 ...
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
+---
+...
 -- Create buckets sending to rs2 and restart - recovery must
 -- garbage some of them and activate others. Receiving buckets
 -- must be garbaged on bootstrap.
diff --git a/test/storage/recovery.test.lua b/test/storage/recovery.test.lua
index a0651e8..93cec68 100644
--- a/test/storage/recovery.test.lua
+++ b/test/storage/recovery.test.lua
@@ -10,8 +10,13 @@ util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 util.push_rs_filters(test_run)
 
 _ = test_run:switch("storage_2_a")
+-- Pause until restart. Otherwise recovery does its job too fast and does not
+-- allow to simulate the intermediate state.
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
 vshard.storage.rebalancer_disable()
+
 _ = test_run:switch("storage_1_a")
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
 
 -- Create buckets sending to rs2 and restart - recovery must
 -- garbage some of them and activate others. Receiving buckets
diff --git a/test/storage/recovery_errinj.result b/test/storage/recovery_errinj.result
index 3e9a9bf..8c178d5 100644
--- a/test/storage/recovery_errinj.result
+++ b/test/storage/recovery_errinj.result
@@ -35,9 +35,17 @@ _ = test_run:switch('storage_2_a')
 vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
 ---
 ...
+-- Pause recovery. Otherwise it does its job too fast and does not allow to
+-- simulate the intermediate state.
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
+---
+...
 _ = test_run:switch('storage_1_a')
 ---
 ...
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
+---
+...
 _bucket = box.space._bucket
 ---
 ...
@@ -76,10 +84,16 @@ _bucket:get{1}
 ---
 - [1, 'active']
 ...
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
+---
+...
 _ = test_run:switch('storage_1_a')
 ---
 ...
-while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
+---
+...
+wait_bucket_is_collected(1)
 ---
 ...
 _ = test_run:switch("default")
diff --git a/test/storage/recovery_errinj.test.lua b/test/storage/recovery_errinj.test.lua
index 8c1a9d2..c730560 100644
--- a/test/storage/recovery_errinj.test.lua
+++ b/test/storage/recovery_errinj.test.lua
@@ -14,7 +14,12 @@ util.push_rs_filters(test_run)
 --
 _ = test_run:switch('storage_2_a')
 vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
+-- Pause recovery. Otherwise it does its job too fast and does not allow to
+-- simulate the intermediate state.
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
+
 _ = test_run:switch('storage_1_a')
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
 _bucket = box.space._bucket
 _bucket:replace{1, vshard.consts.BUCKET.ACTIVE, util.replicasets[2]}
 ret, err = vshard.storage.bucket_send(1, util.replicasets[2], {timeout = 0.1})
@@ -27,9 +32,11 @@ vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
 _bucket = box.space._bucket
 while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do fiber.sleep(0.01) end
 _bucket:get{1}
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
 
 _ = test_run:switch('storage_1_a')
-while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
+vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
+wait_bucket_is_collected(1)
 
 _ = test_run:switch("default")
 test_run:drop_cluster(REPLICASET_2)
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 3f1585a..cf3f422 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -39,7 +39,7 @@ return {
     DEFAULT_SYNC_TIMEOUT = 1;
     RECONNECT_TIMEOUT = 0.5;
     GC_BACKOFF_INTERVAL = 5,
-    RECOVERY_INTERVAL = 5;
+    RECOVERY_BACKOFF_INTERVAL = 5,
     COLLECT_LUA_GARBAGE_INTERVAL = 100;
 
     DISCOVERY_IDLE_INTERVAL = 10,
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 31a6fc7..85f5024 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -634,13 +634,16 @@ end
 -- Infinite function to resolve status of buckets, whose 'sending'
 -- has failed due to tarantool or network problems. Restarts on
 -- reload.
--- @param module_version Module version, on which the current
---        function had been started. If the actual module version
---        appears to be changed, then stop recovery. It is
---        restarted in reloadable_fiber.
 --
 local function recovery_f()
     local module_version = M.module_version
+    -- Changes of _bucket increments bucket generation. Recovery has its own
+    -- bucket generation which is <= actual. Recovery is finished, when its
+    -- generation == bucket generation. In such a case the fiber does nothing
+    -- until next _bucket change.
+    local bucket_generation_recovered = -1
+    local bucket_generation_current = M.bucket_generation
+    local ok, sleep_time, is_all_recovered, total, recovered
     -- Interrupt recovery if a module has been reloaded. Perhaps,
     -- there was found a bug, and reload fixes it.
     while module_version == M.module_version do
@@ -648,22 +651,57 @@ local function recovery_f()
             lfiber.yield()
             goto continue
         end
-        local ok, total, recovered = pcall(recovery_step_by_type,
-                                           consts.BUCKET.SENDING)
+        is_all_recovered = true
+        if bucket_generation_recovered == bucket_generation_current then
+            goto sleep
+        end
+
+        ok, total, recovered = pcall(recovery_step_by_type,
+                                     consts.BUCKET.SENDING)
         if not ok then
+            is_all_recovered = false
             log.error('Error during sending buckets recovery: %s', total)
+        elseif total ~= recovered then
+            is_all_recovered = false
         end
+
         ok, total, recovered = pcall(recovery_step_by_type,
                                      consts.BUCKET.RECEIVING)
         if not ok then
+            is_all_recovered = false
             log.error('Error during receiving buckets recovery: %s', total)
         elseif total == 0 then
             bucket_receiving_quota_reset()
         else
             bucket_receiving_quota_add(recovered)
+            if total ~= recovered then
+                is_all_recovered = false
+            end
+        end
+
+    ::sleep::
+        if not is_all_recovered then
+            bucket_generation_recovered = -1
+        else
+            bucket_generation_recovered = bucket_generation_current
+        end
+        bucket_generation_current = M.bucket_generation
+
+        if not is_all_recovered then
+            -- One option - some buckets are not broken. Their transmission is
+            -- still in progress. Don't need to retry immediately. Another
+            -- option - network errors when tried to repair the buckets. Also no
+            -- need to retry often. It won't help.
+            sleep_time = consts.RECOVERY_BACKOFF_INTERVAL
+        elseif bucket_generation_recovered ~= bucket_generation_current then
+            sleep_time = 0
+        else
+            sleep_time = consts.TIMEOUT_INFINITY
+        end
+        if module_version == M.module_version then
+            M.bucket_generation_cond:wait(sleep_time)
         end
-        lfiber.sleep(consts.RECOVERY_INTERVAL)
-        ::continue::
+    ::continue::
     end
 end
 
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list