[Tarantool-patches] [PATCH 8/9] recovery: introduce reactive recovery

Oleg Babin olegrok at tarantool.org
Wed Feb 10 12:00:31 MSK 2021


Thanks for your patch. LGTM.

On 10/02/2021 02:46, Vladislav Shpilevoy wrote:
> Recovery is a fiber on a master node which tries to resolve
> SENDING/RECEIVING buckets into GARBAGE or ACTIVE, in case they are
> stuck. Usually it happens due to a conflict on the receiving side,
> or if a restart happens during bucket send.
>
> Recovery was proactive. It used to wakeup with a constant period
> to find and resolve the needed buckets.
>
> But this won't work with the future feature called 'map-reduce'.
> Map-reduce as a preparation stage will need to ensure that all
> buckets on a storage are readable and writable. With the current
> recovery algorithm if a bucket is broken, it won't be recovered
> for the next 5 seconds by default. During this time all new
> map-reduce requests can't execute.
>
> This is not acceptable. As well as too frequent wakeup of recovery
> fiber because it would waste TX thread time.
>
> The patch makes recovery fiber wakeup not by a timeout but by
> events happening with _bucket space. Recovery fiber sleeps on a
> condition variable which is signaled when _bucket is changed.
>
> This is very similar to the reactive GC feature in a previous
> commit.
>
> It is worth mentioning that the backoff happens not only when a
> bucket couldn't be recovered (its transfer is still in progress,
> for example), but also when a network error happened and recovery
> couldn't check state of the bucket on the other storage.
>
> It would be a useless busy loop to retry network errors
> immediately after their appearance. Recovery uses a backoff
> interval for them as well.
>
> Needed for #147
> ---
>   test/router/router.result             | 22 ++++++++---
>   test/router/router.test.lua           | 13 ++++++-
>   test/storage/recovery.result          |  8 ++++
>   test/storage/recovery.test.lua        |  5 +++
>   test/storage/recovery_errinj.result   | 16 +++++++-
>   test/storage/recovery_errinj.test.lua |  9 ++++-
>   vshard/consts.lua                     |  2 +-
>   vshard/storage/init.lua               | 54 +++++++++++++++++++++++----
>   8 files changed, 110 insertions(+), 19 deletions(-)
>
> diff --git a/test/router/router.result b/test/router/router.result
> index b2efd6d..3c1d073 100644
> --- a/test/router/router.result
> +++ b/test/router/router.result
> @@ -312,6 +312,11 @@ replicaset, err = vshard.router.bucket_discovery(2); return err == nil or err
>   _ = test_run:switch('storage_2_a')
>   ---
>   ...
> +-- Pause recovery. It is too aggressive, and the test needs to see buckets in
> +-- their intermediate states.
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
> +---
> +...
>   box.space._bucket:replace({1, vshard.consts.BUCKET.SENDING, util.replicasets[1]})
>   ---
>   - [1, 'sending', '<replicaset_1>']
> @@ -319,6 +324,9 @@ box.space._bucket:replace({1, vshard.consts.BUCKET.SENDING, util.replicasets[1]}
>   _ = test_run:switch('storage_1_a')
>   ---
>   ...
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
> +---
> +...
>   box.space._bucket:replace({1, vshard.consts.BUCKET.RECEIVING, util.replicasets[2]})
>   ---
>   - [1, 'receiving', '<replicaset_2>']
> @@ -342,19 +350,21 @@ util.check_error(vshard.router.call, 1, 'write', 'echo', {123})
>     name: TRANSFER_IS_IN_PROGRESS
>     message: Bucket 1 is transferring to replicaset <replicaset_1>
>   ...
> -_ = test_run:switch('storage_2_a')
> +_ = test_run:switch('storage_1_a')
> +---
> +...
> +box.space._bucket:delete({1})
>   ---
> +- [1, 'receiving', '<replicaset_2>']
>   ...
> -box.space._bucket:replace({1, vshard.consts.BUCKET.ACTIVE})
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
>   ---
> -- [1, 'active']
>   ...
> -_ = test_run:switch('storage_1_a')
> +_ = test_run:switch('storage_2_a')
>   ---
>   ...
> -box.space._bucket:delete({1})
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
>   ---
> -- [1, 'receiving', '<replicaset_2>']
>   ...
>   _ = test_run:switch('router_1')
>   ---
> diff --git a/test/router/router.test.lua b/test/router/router.test.lua
> index 154310b..aa3eb3b 100644
> --- a/test/router/router.test.lua
> +++ b/test/router/router.test.lua
> @@ -114,19 +114,28 @@ replicaset, err = vshard.router.bucket_discovery(1); return err == nil or err
>   replicaset, err = vshard.router.bucket_discovery(2); return err == nil or err
>   
>   _ = test_run:switch('storage_2_a')
> +-- Pause recovery. It is too aggressive, and the test needs to see buckets in
> +-- their intermediate states.
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
>   box.space._bucket:replace({1, vshard.consts.BUCKET.SENDING, util.replicasets[1]})
> +
>   _ = test_run:switch('storage_1_a')
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
>   box.space._bucket:replace({1, vshard.consts.BUCKET.RECEIVING, util.replicasets[2]})
> +
>   _ = test_run:switch('router_1')
>   -- Ok to read sending bucket.
>   vshard.router.call(1, 'read', 'echo', {123})
>   -- Not ok to write sending bucket.
>   util.check_error(vshard.router.call, 1, 'write', 'echo', {123})
>   
> -_ = test_run:switch('storage_2_a')
> -box.space._bucket:replace({1, vshard.consts.BUCKET.ACTIVE})
>   _ = test_run:switch('storage_1_a')
>   box.space._bucket:delete({1})
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
> +
> +_ = test_run:switch('storage_2_a')
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
> +
>   _ = test_run:switch('router_1')
>   
>   -- Check unavailability of master of a replicaset.
> diff --git a/test/storage/recovery.result b/test/storage/recovery.result
> index 8ccb0b9..fa92bca 100644
> --- a/test/storage/recovery.result
> +++ b/test/storage/recovery.result
> @@ -28,12 +28,20 @@ util.push_rs_filters(test_run)
>   _ = test_run:switch("storage_2_a")
>   ---
>   ...
> +-- Pause until restart. Otherwise recovery does its job too fast and does not
> +-- allow to simulate the intermediate state.
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
> +---
> +...
>   vshard.storage.rebalancer_disable()
>   ---
>   ...
>   _ = test_run:switch("storage_1_a")
>   ---
>   ...
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
> +---
> +...
>   -- Create buckets sending to rs2 and restart - recovery must
>   -- garbage some of them and activate others. Receiving buckets
>   -- must be garbaged on bootstrap.
> diff --git a/test/storage/recovery.test.lua b/test/storage/recovery.test.lua
> index a0651e8..93cec68 100644
> --- a/test/storage/recovery.test.lua
> +++ b/test/storage/recovery.test.lua
> @@ -10,8 +10,13 @@ util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
>   util.push_rs_filters(test_run)
>   
>   _ = test_run:switch("storage_2_a")
> +-- Pause until restart. Otherwise recovery does its job too fast and does not
> +-- allow to simulate the intermediate state.
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
>   vshard.storage.rebalancer_disable()
> +
>   _ = test_run:switch("storage_1_a")
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
>   
>   -- Create buckets sending to rs2 and restart - recovery must
>   -- garbage some of them and activate others. Receiving buckets
> diff --git a/test/storage/recovery_errinj.result b/test/storage/recovery_errinj.result
> index 3e9a9bf..8c178d5 100644
> --- a/test/storage/recovery_errinj.result
> +++ b/test/storage/recovery_errinj.result
> @@ -35,9 +35,17 @@ _ = test_run:switch('storage_2_a')
>   vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
>   ---
>   ...
> +-- Pause recovery. Otherwise it does its job too fast and does not allow to
> +-- simulate the intermediate state.
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
> +---
> +...
>   _ = test_run:switch('storage_1_a')
>   ---
>   ...
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
> +---
> +...
>   _bucket = box.space._bucket
>   ---
>   ...
> @@ -76,10 +84,16 @@ _bucket:get{1}
>   ---
>   - [1, 'active']
>   ...
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
> +---
> +...
>   _ = test_run:switch('storage_1_a')
>   ---
>   ...
> -while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
> +---
> +...
> +wait_bucket_is_collected(1)
>   ---
>   ...
>   _ = test_run:switch("default")
> diff --git a/test/storage/recovery_errinj.test.lua b/test/storage/recovery_errinj.test.lua
> index 8c1a9d2..c730560 100644
> --- a/test/storage/recovery_errinj.test.lua
> +++ b/test/storage/recovery_errinj.test.lua
> @@ -14,7 +14,12 @@ util.push_rs_filters(test_run)
>   --
>   _ = test_run:switch('storage_2_a')
>   vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
> +-- Pause recovery. Otherwise it does its job too fast and does not allow to
> +-- simulate the intermediate state.
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
> +
>   _ = test_run:switch('storage_1_a')
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = true
>   _bucket = box.space._bucket
>   _bucket:replace{1, vshard.consts.BUCKET.ACTIVE, util.replicasets[2]}
>   ret, err = vshard.storage.bucket_send(1, util.replicasets[2], {timeout = 0.1})
> @@ -27,9 +32,11 @@ vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
>   _bucket = box.space._bucket
>   while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do fiber.sleep(0.01) end
>   _bucket:get{1}
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
>   
>   _ = test_run:switch('storage_1_a')
> -while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
> +vshard.storage.internal.errinj.ERRINJ_NO_RECOVERY = false
> +wait_bucket_is_collected(1)
>   
>   _ = test_run:switch("default")
>   test_run:drop_cluster(REPLICASET_2)
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 3f1585a..cf3f422 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -39,7 +39,7 @@ return {
>       DEFAULT_SYNC_TIMEOUT = 1;
>       RECONNECT_TIMEOUT = 0.5;
>       GC_BACKOFF_INTERVAL = 5,
> -    RECOVERY_INTERVAL = 5;
> +    RECOVERY_BACKOFF_INTERVAL = 5,
>       COLLECT_LUA_GARBAGE_INTERVAL = 100;
>   
>       DISCOVERY_IDLE_INTERVAL = 10,
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 31a6fc7..85f5024 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -634,13 +634,16 @@ end
>   -- Infinite function to resolve status of buckets, whose 'sending'
>   -- has failed due to tarantool or network problems. Restarts on
>   -- reload.
> --- @param module_version Module version, on which the current
> ---        function had been started. If the actual module version
> ---        appears to be changed, then stop recovery. It is
> ---        restarted in reloadable_fiber.
>   --
>   local function recovery_f()
>       local module_version = M.module_version
> +    -- Changes of _bucket increments bucket generation. Recovery has its own
> +    -- bucket generation which is <= actual. Recovery is finished, when its
> +    -- generation == bucket generation. In such a case the fiber does nothing
> +    -- until next _bucket change.
> +    local bucket_generation_recovered = -1
> +    local bucket_generation_current = M.bucket_generation
> +    local ok, sleep_time, is_all_recovered, total, recovered
>       -- Interrupt recovery if a module has been reloaded. Perhaps,
>       -- there was found a bug, and reload fixes it.
>       while module_version == M.module_version do
> @@ -648,22 +651,57 @@ local function recovery_f()
>               lfiber.yield()
>               goto continue
>           end
> -        local ok, total, recovered = pcall(recovery_step_by_type,
> -                                           consts.BUCKET.SENDING)
> +        is_all_recovered = true
> +        if bucket_generation_recovered == bucket_generation_current then
> +            goto sleep
> +        end
> +
> +        ok, total, recovered = pcall(recovery_step_by_type,
> +                                     consts.BUCKET.SENDING)
>           if not ok then
> +            is_all_recovered = false
>               log.error('Error during sending buckets recovery: %s', total)
> +        elseif total ~= recovered then
> +            is_all_recovered = false
>           end
> +
>           ok, total, recovered = pcall(recovery_step_by_type,
>                                        consts.BUCKET.RECEIVING)
>           if not ok then
> +            is_all_recovered = false
>               log.error('Error during receiving buckets recovery: %s', total)
>           elseif total == 0 then
>               bucket_receiving_quota_reset()
>           else
>               bucket_receiving_quota_add(recovered)
> +            if total ~= recovered then
> +                is_all_recovered = false
> +            end
> +        end
> +
> +    ::sleep::
> +        if not is_all_recovered then
> +            bucket_generation_recovered = -1
> +        else
> +            bucket_generation_recovered = bucket_generation_current
> +        end
> +        bucket_generation_current = M.bucket_generation
> +
> +        if not is_all_recovered then
> +            -- One option - some buckets are not broken. Their transmission is
> +            -- still in progress. Don't need to retry immediately. Another
> +            -- option - network errors when tried to repair the buckets. Also no
> +            -- need to retry often. It won't help.
> +            sleep_time = consts.RECOVERY_BACKOFF_INTERVAL
> +        elseif bucket_generation_recovered ~= bucket_generation_current then
> +            sleep_time = 0
> +        else
> +            sleep_time = consts.TIMEOUT_INFINITY
> +        end
> +        if module_version == M.module_version then
> +            M.bucket_generation_cond:wait(sleep_time)
>           end
> -        lfiber.sleep(consts.RECOVERY_INTERVAL)
> -        ::continue::
> +    ::continue::
>       end
>   end
>   


More information about the Tarantool-patches mailing list