[Tarantool-patches] [PATCH 7/9] gc: introduce reactive garbage collector

Oleg Babin olegrok at tarantool.org
Wed Feb 10 12:00:02 MSK 2021


Thanks for your patch.

I see you've introduced some new parameters: "LUA_CHUNK_SIZE" and
"GC_BACKOFF_INTERVAL".

I think it would be better to describe them in the commit message, to
make it clearer how the new algorithm works.

I see that you didn't update the comment above the "gc_bucket_f"
function. Is it still relevant?

In general, the patch LGTM.


On 10/02/2021 02:46, Vladislav Shpilevoy wrote:
> Garbage collector is a fiber on a master node which deletes
> GARBAGE and SENT buckets along with their data.
>
> It was proactive. It used to wake up with a constant period to
> find and delete the needed buckets.
>
> But this won't work with the future feature called 'map-reduce'.
> Map-reduce as a preparation stage will need to ensure that all
> buckets on a storage are readable and writable. With the current
> GC algorithm if a bucket is sent, it won't be deleted for the next
> 5 seconds by default. During this time all new map-reduce requests
> can't execute.
>
> This is not acceptable. Neither is waking up the GC fiber too
> frequently, because it would waste TX thread time.
>
> The patch makes GC fiber wake up not by a timeout but on events
> happening with _bucket space. GC fiber sleeps on a condition
> variable which is signaled when _bucket is changed.
>
> Once GC sees work to do, it won't sleep until it is done. It will
> only yield.
>
> This makes GC delete SENT and GARBAGE buckets as soon as possible
> reducing the waiting time for the incoming map-reduce requests.
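
A minimal sketch of the scheme described above, not the code from the
patch. It assumes Tarantool's fiber module (fiber.cond, fiber.yield,
fiber.testcancel). The table M, the 'garbage' list and the function
names are hypothetical stand-ins; only the idea of a generation counter
plus a condition variable that is broadcast on change comes from the
patch (see bucket_generation and bucket_generation_cond in the tests).

```lua
-- Runs under Tarantool. Everything except the fiber API is illustrative.
local fiber = require('fiber')

local M = {
    generation = 0,                 -- bumped on every change of the state
    generation_cond = fiber.cond(), -- signaled together with the bump
    garbage = {},                   -- stand-in for GARBAGE/SENT buckets
    collected = 0,
}

-- Producer side: roughly what an on_replace trigger on _bucket would do.
local function on_change(id)
    table.insert(M.garbage, id)
    M.generation = M.generation + 1
    M.generation_cond:broadcast()
end

-- Consumer side: sleep until the generation moves, then work until done.
local function gc_f()
    local seen = -1 -- differs from any real generation, forces a first pass
    while true do
        while M.generation == seen do
            fiber.testcancel()       -- stop cleanly when cancelled
            M.generation_cond:wait() -- no timeout: wake up only on events
        end
        seen = M.generation
        while #M.garbage > 0 do
            table.remove(M.garbage)
            M.collected = M.collected + 1
            fiber.yield()            -- let other fibers run, do not sleep
        end
    end
end
```

Comparing the generation before each wait is what keeps a change made
during a yield from being lost: the fiber goes back to sleep only if
nothing happened since the last pass started.
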
>
> Needed for #147
>
> @TarantoolBot document
> Title: VShard: deprecate cfg option 'collect_bucket_garbage_interval'
> It was used to specify the interval between bucket garbage
> collection steps. It was needed because garbage collection in
> vshard was proactive. It didn't react to newly appeared garbage
> buckets immediately.
>
> Since 0.1.17 garbage collection is reactive. It starts
> working with garbage buckets immediately as they appear, and
> sleeps the rest of the time. The option is not used now and does
> not affect the behaviour of anything.
>
> I suppose it can be deleted from the documentation. Or left with
> a big label 'deprecated' + the explanation above.
>
> An attempt to use the option does not cause an error, but logs a
> warning.
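
One possible shape of "warn, but do not fail" for a deprecated option,
as a plain Lua sketch. This is an assumption about the approach, not
what vshard/cfg.lua does: DEPRECATED, check_deprecated and the warn
callback are hypothetical names, and the warning text is made up.

```lua
-- Hypothetical table: option name -> short reason shown in the warning.
local DEPRECATED = {
    collect_bucket_garbage_interval = 'garbage collection is reactive now',
}

-- Report deprecated options through 'warn' and return cfg untouched.
-- Never raises: a deprecated option must not break configuration.
local function check_deprecated(cfg, deprecated, warn)
    warn = warn or print
    for name, reason in pairs(deprecated) do
        if cfg[name] ~= nil then
            warn(('Option "%s" is deprecated: %s'):format(name, reason))
        end
    end
    return cfg
end
```

In a real storage the callback would presumably be a logger call rather
than print.
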
> ---
>   test/lua_libs/storage_template.lua        |   1 -
>   test/misc/reconfigure.result              |  10 -
>   test/misc/reconfigure.test.lua            |   3 -
>   test/rebalancer/bucket_ref.result         |  12 --
>   test/rebalancer/bucket_ref.test.lua       |   3 -
>   test/rebalancer/errinj.result             |  11 --
>   test/rebalancer/errinj.test.lua           |   5 -
>   test/rebalancer/receiving_bucket.result   |   8 -
>   test/rebalancer/receiving_bucket.test.lua |   1 -
>   test/reload_evolution/storage.result      |   2 +-
>   test/router/reroute_wrong_bucket.result   |   8 +-
>   test/router/reroute_wrong_bucket.test.lua |   4 +-
>   test/storage/recovery.result              |   3 +-
>   test/storage/storage.result               |  10 +-
>   test/storage/storage.test.lua             |   1 +
>   test/unit/config.result                   |  35 +---
>   test/unit/config.test.lua                 |  16 +-
>   test/unit/garbage.result                  | 106 ++++++----
>   test/unit/garbage.test.lua                |  47 +++--
>   test/unit/garbage_errinj.result           | 223 ----------------------
>   test/unit/garbage_errinj.test.lua         |  73 -------
>   vshard/cfg.lua                            |   4 +-
>   vshard/consts.lua                         |   5 +-
>   vshard/storage/init.lua                   | 207 ++++++++++----------
>   vshard/storage/reload_evolution.lua       |   8 +
>   25 files changed, 233 insertions(+), 573 deletions(-)
>   delete mode 100644 test/unit/garbage_errinj.result
>   delete mode 100644 test/unit/garbage_errinj.test.lua
>
> diff --git a/test/lua_libs/storage_template.lua b/test/lua_libs/storage_template.lua
> index 21409bd..8df89f6 100644
> --- a/test/lua_libs/storage_template.lua
> +++ b/test/lua_libs/storage_template.lua
> @@ -172,6 +172,5 @@ function wait_bucket_is_collected(id)
>               return true
>           end
>           vshard.storage.recovery_wakeup()
> -        vshard.storage.garbage_collector_wakeup()
>       end)
>   end
> diff --git a/test/misc/reconfigure.result b/test/misc/reconfigure.result
> index 168be5d..3b34841 100644
> --- a/test/misc/reconfigure.result
> +++ b/test/misc/reconfigure.result
> @@ -83,9 +83,6 @@ cfg.collect_lua_garbage = true
>   cfg.rebalancer_max_receiving = 1000
>   ---
>   ...
> -cfg.collect_bucket_garbage_interval = 100
> ----
> -...
>   cfg.invalid_option = 'kek'
>   ---
>   ...
> @@ -105,10 +102,6 @@ vshard.storage.internal.rebalancer_max_receiving ~= 1000
>   ---
>   - true
>   ...
> -vshard.storage.internal.collect_bucket_garbage_interval ~= 100
> ----
> -- true
> -...
>   cfg.sync_timeout = nil
>   ---
>   ...
> @@ -118,9 +111,6 @@ cfg.collect_lua_garbage = nil
>   cfg.rebalancer_max_receiving = nil
>   ---
>   ...
> -cfg.collect_bucket_garbage_interval = nil
> ----
> -...
>   cfg.invalid_option = nil
>   ---
>   ...
> diff --git a/test/misc/reconfigure.test.lua b/test/misc/reconfigure.test.lua
> index e891010..348628c 100644
> --- a/test/misc/reconfigure.test.lua
> +++ b/test/misc/reconfigure.test.lua
> @@ -33,17 +33,14 @@ vshard.storage.internal.sync_timeout
>   cfg.sync_timeout = 100
>   cfg.collect_lua_garbage = true
>   cfg.rebalancer_max_receiving = 1000
> -cfg.collect_bucket_garbage_interval = 100
>   cfg.invalid_option = 'kek'
>   vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a)
>   not vshard.storage.internal.collect_lua_garbage
>   vshard.storage.internal.sync_timeout
>   vshard.storage.internal.rebalancer_max_receiving ~= 1000
> -vshard.storage.internal.collect_bucket_garbage_interval ~= 100
>   cfg.sync_timeout = nil
>   cfg.collect_lua_garbage = nil
>   cfg.rebalancer_max_receiving = nil
> -cfg.collect_bucket_garbage_interval = nil
>   cfg.invalid_option = nil
>   
>   --
> diff --git a/test/rebalancer/bucket_ref.result b/test/rebalancer/bucket_ref.result
> index b8fc7ff..9df7480 100644
> --- a/test/rebalancer/bucket_ref.result
> +++ b/test/rebalancer/bucket_ref.result
> @@ -184,9 +184,6 @@ vshard.storage.bucket_unref(1, 'read')
>   - true
>   ...
>   -- Force GC to take an RO lock on the bucket now.
> -vshard.storage.garbage_collector_wakeup()
> ----
> -...
>   vshard.storage.buckets_info(1)
>   ---
>   - 1:
> @@ -203,7 +200,6 @@ while true do
>       if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then
>           break
>       end
> -    vshard.storage.garbage_collector_wakeup()
>       fiber.sleep(0.01)
>   end;
>   ---
> @@ -235,14 +231,6 @@ finish_refs = true
>   while f1:status() ~= 'dead' do fiber.sleep(0.01) end
>   ---
>   ...
> -vshard.storage.buckets_info(1)
> ----
> -- 1:
> -    status: garbage
> -    ro_lock: true
> -    destination: <replicaset_2>
> -    id: 1
> -...
>   wait_bucket_is_collected(1)
>   ---
>   ...
> diff --git a/test/rebalancer/bucket_ref.test.lua b/test/rebalancer/bucket_ref.test.lua
> index 213ced3..1b032ff 100644
> --- a/test/rebalancer/bucket_ref.test.lua
> +++ b/test/rebalancer/bucket_ref.test.lua
> @@ -56,7 +56,6 @@ vshard.storage.bucket_unref(1, 'write') -- Error, no refs.
>   vshard.storage.bucket_ref(1, 'read')
>   vshard.storage.bucket_unref(1, 'read')
>   -- Force GC to take an RO lock on the bucket now.
> -vshard.storage.garbage_collector_wakeup()
>   vshard.storage.buckets_info(1)
>   _ = test_run:cmd("setopt delimiter ';'")
>   while true do
> @@ -64,7 +63,6 @@ while true do
>       if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then
>           break
>       end
> -    vshard.storage.garbage_collector_wakeup()
>       fiber.sleep(0.01)
>   end;
>   _ = test_run:cmd("setopt delimiter ''");
> @@ -72,7 +70,6 @@ vshard.storage.buckets_info(1)
>   vshard.storage.bucket_refro(1)
>   finish_refs = true
>   while f1:status() ~= 'dead' do fiber.sleep(0.01) end
> -vshard.storage.buckets_info(1)
>   wait_bucket_is_collected(1)
>   _ = test_run:switch('box_2_a')
>   vshard.storage.buckets_info(1)
> diff --git a/test/rebalancer/errinj.result b/test/rebalancer/errinj.result
> index e50eb72..0ddb1c9 100644
> --- a/test/rebalancer/errinj.result
> +++ b/test/rebalancer/errinj.result
> @@ -226,17 +226,6 @@ ret2, err2
>   - true
>   - null
>   ...
> -_bucket:get{35}
> ----
> -- [35, 'sent', '<replicaset_2>']
> -...
> -_bucket:get{36}
> ----
> -- [36, 'sent', '<replicaset_2>']
> -...
> --- Buckets became 'active' on box_2_a, but still are sending on
> --- box_1_a. Wait until it is marked as garbage on box_1_a by the
> --- recovery fiber.
>   wait_bucket_is_collected(35)
>   ---
>   ...
> diff --git a/test/rebalancer/errinj.test.lua b/test/rebalancer/errinj.test.lua
> index 2cc4a69..a60f3d7 100644
> --- a/test/rebalancer/errinj.test.lua
> +++ b/test/rebalancer/errinj.test.lua
> @@ -102,11 +102,6 @@ _ = test_run:switch('box_1_a')
>   while f1:status() ~= 'dead' or f2:status() ~= 'dead' do fiber.sleep(0.001) end
>   ret1, err1
>   ret2, err2
> -_bucket:get{35}
> -_bucket:get{36}
> --- Buckets became 'active' on box_2_a, but still are sending on
> --- box_1_a. Wait until it is marked as garbage on box_1_a by the
> --- recovery fiber.
>   wait_bucket_is_collected(35)
>   wait_bucket_is_collected(36)
>   _ = test_run:switch('box_2_a')
> diff --git a/test/rebalancer/receiving_bucket.result b/test/rebalancer/receiving_bucket.result
> index 7d3612b..ad93445 100644
> --- a/test/rebalancer/receiving_bucket.result
> +++ b/test/rebalancer/receiving_bucket.result
> @@ -366,14 +366,6 @@ vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3})
>   ---
>   - true
>   ...
> -vshard.storage.buckets_info(1)
> ----
> -- 1:
> -    status: sent
> -    ro_lock: true
> -    destination: <replicaset_1>
> -    id: 1
> -...
>   wait_bucket_is_collected(1)
>   ---
>   ...
> diff --git a/test/rebalancer/receiving_bucket.test.lua b/test/rebalancer/receiving_bucket.test.lua
> index 24534b3..2cf6382 100644
> --- a/test/rebalancer/receiving_bucket.test.lua
> +++ b/test/rebalancer/receiving_bucket.test.lua
> @@ -136,7 +136,6 @@ box.space.test3:select{100}
>   -- Now the bucket is unreferenced and can be transferred.
>   _ = test_run:switch('box_2_a')
>   vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3})
> -vshard.storage.buckets_info(1)
>   wait_bucket_is_collected(1)
>   vshard.storage.buckets_info(1)
>   _ = test_run:switch('box_1_a')
> diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result
> index 753687f..9d30a04 100644
> --- a/test/reload_evolution/storage.result
> +++ b/test/reload_evolution/storage.result
> @@ -92,7 +92,7 @@ test_run:grep_log('storage_2_a', 'vshard.storage.reload_evolution: upgraded to')
>   ...
>   vshard.storage.internal.reload_version
>   ---
> -- 2
> +- 3
>   ...
>   --
>   -- gh-237: should be only one trigger. During gh-237 the trigger installation
> diff --git a/test/router/reroute_wrong_bucket.result b/test/router/reroute_wrong_bucket.result
> index 049bdef..ac340eb 100644
> --- a/test/router/reroute_wrong_bucket.result
> +++ b/test/router/reroute_wrong_bucket.result
> @@ -37,7 +37,7 @@ test_run:switch('storage_1_a')
>   ---
>   - true
>   ...
> -cfg.collect_bucket_garbage_interval = 100
> +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
>   ---
>   ...
>   vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a)
> @@ -53,7 +53,7 @@ test_run:switch('storage_2_a')
>   ---
>   - true
>   ...
> -cfg.collect_bucket_garbage_interval = 100
> +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
>   ---
>   ...
>   vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a)
> @@ -202,12 +202,12 @@ test_run:grep_log('router_1', 'please update configuration')
>   err
>   ---
>   - bucket_id: 100
> -  reason: write is prohibited
> +  reason: Not found
>     code: 1
>     destination: ac522f65-aa94-4134-9f64-51ee384f1a54
>     type: ShardingError
>     name: WRONG_BUCKET
> -  message: 'Cannot perform action with bucket 100, reason: write is prohibited'
> +  message: 'Cannot perform action with bucket 100, reason: Not found'
>   ...
>   --
>   -- Now try again, but update configuration during call(). It must
> diff --git a/test/router/reroute_wrong_bucket.test.lua b/test/router/reroute_wrong_bucket.test.lua
> index 9e6e804..207aac3 100644
> --- a/test/router/reroute_wrong_bucket.test.lua
> +++ b/test/router/reroute_wrong_bucket.test.lua
> @@ -11,13 +11,13 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
>   test_run:cmd('create server router_1 with script="router/router_1.lua"')
>   test_run:cmd('start server router_1')
>   test_run:switch('storage_1_a')
> -cfg.collect_bucket_garbage_interval = 100
> +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
>   vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a)
>   vshard.storage.rebalancer_disable()
>   for i = 1, 100 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end
>   
>   test_run:switch('storage_2_a')
> -cfg.collect_bucket_garbage_interval = 100
> +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
>   vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a)
>   vshard.storage.rebalancer_disable()
>   for i = 101, 200 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end
> diff --git a/test/storage/recovery.result b/test/storage/recovery.result
> index f833fe7..8ccb0b9 100644
> --- a/test/storage/recovery.result
> +++ b/test/storage/recovery.result
> @@ -79,8 +79,7 @@ _bucket = box.space._bucket
>   ...
>   _bucket:select{}
>   ---
> -- - [2, 'garbage', '<replicaset_2>']
> -  - [3, 'garbage', '<replicaset_2>']
> +- []
>   ...
>   _ = test_run:switch('storage_2_a')
>   ---
> diff --git a/test/storage/storage.result b/test/storage/storage.result
> index 424bc4c..0550ad1 100644
> --- a/test/storage/storage.result
> +++ b/test/storage/storage.result
> @@ -547,6 +547,9 @@ vshard.storage.bucket_send(1, util.replicasets[2])
>   ---
>   - true
>   ...
> +wait_bucket_is_collected(1)
> +---
> +...
>   _ = test_run:switch("storage_2_a")
>   ---
>   ...
> @@ -567,12 +570,7 @@ _ = test_run:switch("storage_1_a")
>   ...
>   vshard.storage.buckets_info()
>   ---
> -- 1:
> -    status: sent
> -    ro_lock: true
> -    destination: <replicaset_2>
> -    id: 1
> -  2:
> +- 2:
>       status: active
>       id: 2
>   ...
> diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua
> index d631b51..d8fbd94 100644
> --- a/test/storage/storage.test.lua
> +++ b/test/storage/storage.test.lua
> @@ -136,6 +136,7 @@ vshard.storage.bucket_send(1, util.replicasets[1])
>   
>   -- Successful transfer.
>   vshard.storage.bucket_send(1, util.replicasets[2])
> +wait_bucket_is_collected(1)
>   _ = test_run:switch("storage_2_a")
>   vshard.storage.buckets_info()
>   _ = test_run:switch("storage_1_a")
> diff --git a/test/unit/config.result b/test/unit/config.result
> index dfd0219..e0b2482 100644
> --- a/test/unit/config.result
> +++ b/test/unit/config.result
> @@ -428,33 +428,6 @@ _ = lcfg.check(cfg)
>   --
>   -- gh-77: garbage collection options.
>   --
> -cfg.collect_bucket_garbage_interval = 'str'
> ----
> -...
> -check(cfg)
> ----
> -- Garbage bucket collect interval must be positive number
> -...
> -cfg.collect_bucket_garbage_interval = 0
> ----
> -...
> -check(cfg)
> ----
> -- Garbage bucket collect interval must be positive number
> -...
> -cfg.collect_bucket_garbage_interval = -1
> ----
> -...
> -check(cfg)
> ----
> -- Garbage bucket collect interval must be positive number
> -...
> -cfg.collect_bucket_garbage_interval = 100.5
> ----
> -...
> -_ = lcfg.check(cfg)
> ----
> -...
>   cfg.collect_lua_garbage = 100
>   ---
>   ...
> @@ -615,6 +588,12 @@ lcfg.check(cfg).rebalancer_max_sending
>   cfg.rebalancer_max_sending = nil
>   ---
>   ...
> -cfg.sharding = nil
> +--
> +-- Deprecated option does not break anything.
> +--
> +cfg.collect_bucket_garbage_interval = 100
> +---
> +...
> +_ = lcfg.check(cfg)
>   ---
>   ...
> diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua
> index ada43db..a1c9f07 100644
> --- a/test/unit/config.test.lua
> +++ b/test/unit/config.test.lua
> @@ -175,15 +175,6 @@ _ = lcfg.check(cfg)
>   --
>   -- gh-77: garbage collection options.
>   --
> -cfg.collect_bucket_garbage_interval = 'str'
> -check(cfg)
> -cfg.collect_bucket_garbage_interval = 0
> -check(cfg)
> -cfg.collect_bucket_garbage_interval = -1
> -check(cfg)
> -cfg.collect_bucket_garbage_interval = 100.5
> -_ = lcfg.check(cfg)
> -
>   cfg.collect_lua_garbage = 100
>   check(cfg)
>   cfg.collect_lua_garbage = true
> @@ -244,4 +235,9 @@ util.check_error(lcfg.check, cfg)
>   cfg.rebalancer_max_sending = 15
>   lcfg.check(cfg).rebalancer_max_sending
>   cfg.rebalancer_max_sending = nil
> -cfg.sharding = nil
> +
> +--
> +-- Deprecated option does not break anything.
> +--
> +cfg.collect_bucket_garbage_interval = 100
> +_ = lcfg.check(cfg)
> diff --git a/test/unit/garbage.result b/test/unit/garbage.result
> index 74d9ccf..a530496 100644
> --- a/test/unit/garbage.result
> +++ b/test/unit/garbage.result
> @@ -31,9 +31,6 @@ test_run:cmd("setopt delimiter ''");
>   vshard.storage.internal.shard_index = 'bucket_id'
>   ---
>   ...
> -vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
> ----
> -...
>   --
>   -- Find nothing if no bucket_id anywhere, or there is no index
>   -- by it, or bucket_id is not unsigned.
> @@ -151,6 +148,9 @@ format[1] = {name = 'id', type = 'unsigned'}
>   format[2] = {name = 'status', type = 'string'}
>   ---
>   ...
> +format[3] = {name = 'destination', type = 'string', is_nullable = true}
> +---
> +...
>   _bucket = box.schema.create_space('_bucket', {format = format})
>   ---
>   ...
> @@ -172,22 +172,6 @@ _bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
>   ---
>   - [3, 'active']
>   ...
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> ----
> -- [4, 'sent']
> -...
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [5, 'garbage']
> -...
> -_bucket:replace{6, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [6, 'garbage']
> -...
> -_bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [200, 'garbage']
> -...
>   s = box.schema.create_space('test', {engine = engine})
>   ---
>   ...
> @@ -213,7 +197,7 @@ s:replace{4, 2}
>   ---
>   - [4, 2]
>   ...
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> +gc_bucket_drop = vshard.storage.internal.gc_bucket_drop
>   ---
>   ...
>   s2 = box.schema.create_space('test2', {engine = engine})
> @@ -249,6 +233,10 @@ function fill_spaces_with_garbage()
>       s2:replace{6, 4}
>       s2:replace{7, 5}
>       s2:replace{7, 6}
> +    _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'}
> +    _bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> +    _bucket:replace{6, vshard.consts.BUCKET.GARBAGE, 'destination2'}
> +    _bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
>   end;
>   ---
>   ...
> @@ -267,12 +255,22 @@ fill_spaces_with_garbage()
>   ---
>   - 1107
>   ...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> +route_map = {}
> +---
> +...
> +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
>   ---
> -- - 5
> -  - 6
> -  - 200
>   - true
> +- null
> +...
> +route_map
> +---
> +- - null
> +  - null
> +  - null
> +  - null
> +  - null
> +  - destination2
>   ...
>   #s2:select{}
>   ---
> @@ -282,10 +280,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
>   ---
>   - 7
>   ...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> +route_map = {}
> +---
> +...
> +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
>   ---
> -- - 4
>   - true
> +- null
> +...
> +route_map
> +---
> +- - null
> +  - null
> +  - null
> +  - destination1
>   ...
>   s2:select{}
>   ---
> @@ -303,17 +311,22 @@ s:select{}
>     - [6, 100]
>   ...
>   -- Nothing deleted - update collected generation.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> +route_map = {}
> +---
> +...
> +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
>   ---
> -- - 5
> -  - 6
> -  - 200
>   - true
> +- null
>   ...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
>   ---
> -- - 4
>   - true
> +- null
> +...
> +route_map
> +---
> +- []
>   ...
>   #s2:select{}
>   ---
> @@ -329,15 +342,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
>   fill_spaces_with_garbage()
>   ---
>   ...
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> +_ = _bucket:on_replace(function()                                               \
> +    local gen = vshard.storage.internal.bucket_generation                       \
> +    vshard.storage.internal.bucket_generation = gen + 1                         \
> +    vshard.storage.internal.bucket_generation_cond:broadcast()                  \
> +end)
>   ---
>   ...
>   f = fiber.create(vshard.storage.internal.gc_bucket_f)
>   ---
>   ...
>   -- Wait until garbage collection is finished.
> -while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end)
>   ---
> +- true
>   ...
>   s:select{}
>   ---
> @@ -360,7 +378,6 @@ _bucket:select{}
>   - - [1, 'active']
>     - [2, 'receiving']
>     - [3, 'active']
> -  - [4, 'sent']
>   ...
>   --
>   -- Test deletion of 'sent' buckets after a specified timeout.
> @@ -370,8 +387,9 @@ _bucket:replace{2, vshard.consts.BUCKET.SENT}
>   - [2, 'sent']
>   ...
>   -- Wait deletion after a while.
> -while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{2} end)
>   ---
> +- true
>   ...
>   _bucket:select{}
>   ---
> @@ -410,8 +428,9 @@ _bucket:replace{4, vshard.consts.BUCKET.SENT}
>   ---
>   - [4, 'sent']
>   ...
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
>   ---
> +- true
>   ...
>   --
>   -- Test WAL errors during deletion from _bucket.
> @@ -434,11 +453,14 @@ s:replace{6, 4}
>   ---
>   - [6, 4]
>   ...
> -while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_log('default', 'Error during garbage collection step',            \
> +                  65536, 10)
>   ---
> +- Error during garbage collection step
>   ...
> -while #sk:select{4} ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return #sk:select{4} == 0 end)
>   ---
> +- true
>   ...
>   s:select{}
>   ---
> @@ -454,8 +476,9 @@ _bucket:select{}
>   _ = _bucket:on_replace(nil, rollback_on_delete)
>   ---
>   ...
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
>   ---
> +- true
>   ...
>   f:cancel()
>   ---
> @@ -562,8 +585,9 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i,
>   f = fiber.create(vshard.storage.internal.gc_bucket_f)
>   ---
>   ...
> -while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return _bucket:count() == 0 end)
>   ---
> +- true
>   ...
>   _bucket:select{}
>   ---
> diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
> index 30079fa..250afb0 100644
> --- a/test/unit/garbage.test.lua
> +++ b/test/unit/garbage.test.lua
> @@ -15,7 +15,6 @@ end;
>   test_run:cmd("setopt delimiter ''");
>   
>   vshard.storage.internal.shard_index = 'bucket_id'
> -vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
>   
>   --
>   -- Find nothing if no bucket_id anywhere, or there is no index
> @@ -75,16 +74,13 @@ s:drop()
>   format = {}
>   format[1] = {name = 'id', type = 'unsigned'}
>   format[2] = {name = 'status', type = 'string'}
> +format[3] = {name = 'destination', type = 'string', is_nullable = true}
>   _bucket = box.schema.create_space('_bucket', {format = format})
>   _ = _bucket:create_index('pk')
>   _ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
>   _bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
>   _bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
>   _bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> -_bucket:replace{6, vshard.consts.BUCKET.GARBAGE}
> -_bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
>   
>   s = box.schema.create_space('test', {engine = engine})
>   pk = s:create_index('pk')
> @@ -94,7 +90,7 @@ s:replace{2, 1}
>   s:replace{3, 2}
>   s:replace{4, 2}
>   
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> +gc_bucket_drop = vshard.storage.internal.gc_bucket_drop
>   s2 = box.schema.create_space('test2', {engine = engine})
>   pk2 = s2:create_index('pk')
>   sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> @@ -114,6 +110,10 @@ function fill_spaces_with_garbage()
>       s2:replace{6, 4}
>       s2:replace{7, 5}
>       s2:replace{7, 6}
> +    _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'}
> +    _bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> +    _bucket:replace{6, vshard.consts.BUCKET.GARBAGE, 'destination2'}
> +    _bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
>   end;
>   test_run:cmd("setopt delimiter ''");
>   
> @@ -121,15 +121,21 @@ fill_spaces_with_garbage()
>   
>   #s2:select{}
>   #s:select{}
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> +route_map = {}
> +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
> +route_map
>   #s2:select{}
>   #s:select{}
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> +route_map = {}
> +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
> +route_map
>   s2:select{}
>   s:select{}
>   -- Nothing deleted - update collected generation.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> +route_map = {}
> +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
> +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
> +route_map
>   #s2:select{}
>   #s:select{}
>   
> @@ -137,10 +143,14 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
>   -- Test continuous garbage collection via background fiber.
>   --
>   fill_spaces_with_garbage()
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> +_ = _bucket:on_replace(function()                                               \
> +    local gen = vshard.storage.internal.bucket_generation                       \
> +    vshard.storage.internal.bucket_generation = gen + 1                         \
> +    vshard.storage.internal.bucket_generation_cond:broadcast()                  \
> +end)
>   f = fiber.create(vshard.storage.internal.gc_bucket_f)
>   -- Wait until garbage collection is finished.
> -while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end)
>   s:select{}
>   s2:select{}
>   -- Check garbage bucket is deleted by background fiber.
> @@ -150,7 +160,7 @@ _bucket:select{}
>   --
>   _bucket:replace{2, vshard.consts.BUCKET.SENT}
>   -- Wait deletion after a while.
> -while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{2} end)
>   _bucket:select{}
>   s:select{}
>   s2:select{}
> @@ -162,7 +172,7 @@ _bucket:replace{4, vshard.consts.BUCKET.ACTIVE}
>   s:replace{5, 4}
>   s:replace{6, 4}
>   _bucket:replace{4, vshard.consts.BUCKET.SENT}
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
>   
>   --
>   -- Test WAL errors during deletion from _bucket.
> @@ -172,12 +182,13 @@ _ = _bucket:on_replace(rollback_on_delete)
>   _bucket:replace{4, vshard.consts.BUCKET.SENT}
>   s:replace{5, 4}
>   s:replace{6, 4}
> -while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> -while #sk:select{4} ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_log('default', 'Error during garbage collection step',            \
> +                  65536, 10)
> +test_run:wait_cond(function() return #sk:select{4} == 0 end)
>   s:select{}
>   _bucket:select{}
>   _ = _bucket:on_replace(nil, rollback_on_delete)
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
>   
>   f:cancel()
>   
> @@ -220,7 +231,7 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i,
>   #s:select{}
>   #s2:select{}
>   f = fiber.create(vshard.storage.internal.gc_bucket_f)
> -while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return _bucket:count() == 0 end)
>   _bucket:select{}
>   s:select{}
>   s2:select{}
> diff --git a/test/unit/garbage_errinj.result b/test/unit/garbage_errinj.result
> deleted file mode 100644
> index 92c8039..0000000
> --- a/test/unit/garbage_errinj.result
> +++ /dev/null
> @@ -1,223 +0,0 @@
> -test_run = require('test_run').new()
> ----
> -...
> -vshard = require('vshard')
> ----
> -...
> -fiber = require('fiber')
> ----
> -...
> -engine = test_run:get_cfg('engine')
> ----
> -...
> -vshard.storage.internal.shard_index = 'bucket_id'
> ----
> -...
> -format = {}
> ----
> -...
> -format[1] = {name = 'id', type = 'unsigned'}
> ----
> -...
> -format[2] = {name = 'status', type = 'string', is_nullable = true}
> ----
> -...
> -_bucket = box.schema.create_space('_bucket', {format = format})
> ----
> -...
> -_ = _bucket:create_index('pk')
> ----
> -...
> -_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
> ----
> -...
> -_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
> ----
> -- [1, 'active']
> -...
> -_bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
> ----
> -- [2, 'receiving']
> -...
> -_bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
> ----
> -- [3, 'active']
> -...
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> ----
> -- [4, 'sent']
> -...
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [5, 'garbage']
> -...
> -s = box.schema.create_space('test', {engine = engine})
> ----
> -...
> -pk = s:create_index('pk')
> ----
> -...
> -sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> ----
> -...
> -s:replace{1, 1}
> ----
> -- [1, 1]
> -...
> -s:replace{2, 1}
> ----
> -- [2, 1]
> -...
> -s:replace{3, 2}
> ----
> -- [3, 2]
> -...
> -s:replace{4, 2}
> ----
> -- [4, 2]
> -...
> -s:replace{5, 100}
> ----
> -- [5, 100]
> -...
> -s:replace{6, 100}
> ----
> -- [6, 100]
> -...
> -s:replace{7, 4}
> ----
> -- [7, 4]
> -...
> -s:replace{8, 5}
> ----
> -- [8, 5]
> -...
> -s2 = box.schema.create_space('test2', {engine = engine})
> ----
> -...
> -pk2 = s2:create_index('pk')
> ----
> -...
> -sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> ----
> -...
> -s2:replace{1, 1}
> ----
> -- [1, 1]
> -...
> -s2:replace{3, 3}
> ----
> -- [3, 3]
> -...
> -for i = 7, 1107 do s:replace{i, 200} end
> ----
> -...
> -s2:replace{4, 200}
> ----
> -- [4, 200]
> -...
> -s2:replace{5, 100}
> ----
> -- [5, 100]
> -...
> -s2:replace{5, 300}
> ----
> -- [5, 300]
> -...
> -s2:replace{6, 4}
> ----
> -- [6, 4]
> -...
> -s2:replace{7, 5}
> ----
> -- [7, 5]
> -...
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> ----
> -...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> ----
> -- - 4
> -- true
> -...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> ----
> -- - 5
> -- true
> -...
> ---
> --- Test _bucket generation change during garbage buckets search.
> ---
> -s:truncate()
> ----
> -...
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> ----
> -...
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true
> ----
> -...
> -f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end)
> ----
> -...
> -_bucket:replace{4, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [4, 'garbage']
> -...
> -s:replace{5, 4}
> ----
> -- [5, 4]
> -...
> -s:replace{6, 4}
> ----
> -- [6, 4]
> -...
> -#s:select{}
> ----
> -- 2
> -...
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false
> ----
> -...
> -while f:status() ~= 'dead' do fiber.sleep(0.1) end
> ----
> -...
> --- Nothing is deleted - _bucket:replace() has changed _bucket
> --- generation during search of garbage buckets.
> -#s:select{}
> ----
> -- 2
> -...
> -_bucket:select{4}
> ----
> -- - [4, 'garbage']
> -...
> --- Next step deletes garbage ok.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> ----
> -- []
> -- true
> -...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> ----
> -- - 4
> -  - 5
> -- true
> -...
> -#s:select{}
> ----
> -- 0
> -...
> -_bucket:delete{4}
> ----
> -- [4, 'garbage']
> -...
> -s2:drop()
> ----
> -...
> -s:drop()
> ----
> -...
> -_bucket:drop()
> ----
> -...
> diff --git a/test/unit/garbage_errinj.test.lua b/test/unit/garbage_errinj.test.lua
> deleted file mode 100644
> index 31184b9..0000000
> --- a/test/unit/garbage_errinj.test.lua
> +++ /dev/null
> @@ -1,73 +0,0 @@
> -test_run = require('test_run').new()
> -vshard = require('vshard')
> -fiber = require('fiber')
> -
> -engine = test_run:get_cfg('engine')
> -vshard.storage.internal.shard_index = 'bucket_id'
> -
> -format = {}
> -format[1] = {name = 'id', type = 'unsigned'}
> -format[2] = {name = 'status', type = 'string', is_nullable = true}
> -_bucket = box.schema.create_space('_bucket', {format = format})
> -_ = _bucket:create_index('pk')
> -_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
> -_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
> -_bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
> -_bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> -
> -s = box.schema.create_space('test', {engine = engine})
> -pk = s:create_index('pk')
> -sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> -s:replace{1, 1}
> -s:replace{2, 1}
> -s:replace{3, 2}
> -s:replace{4, 2}
> -s:replace{5, 100}
> -s:replace{6, 100}
> -s:replace{7, 4}
> -s:replace{8, 5}
> -
> -s2 = box.schema.create_space('test2', {engine = engine})
> -pk2 = s2:create_index('pk')
> -sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> -s2:replace{1, 1}
> -s2:replace{3, 3}
> -for i = 7, 1107 do s:replace{i, 200} end
> -s2:replace{4, 200}
> -s2:replace{5, 100}
> -s2:replace{5, 300}
> -s2:replace{6, 4}
> -s2:replace{7, 5}
> -
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> -
> ---
> --- Test _bucket generation change during garbage buckets search.
> ---
> -s:truncate()
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true
> -f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end)
> -_bucket:replace{4, vshard.consts.BUCKET.GARBAGE}
> -s:replace{5, 4}
> -s:replace{6, 4}
> -#s:select{}
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false
> -while f:status() ~= 'dead' do fiber.sleep(0.1) end
> --- Nothing is deleted - _bucket:replace() has changed _bucket
> --- generation during search of garbage buckets.
> -#s:select{}
> -_bucket:select{4}
> --- Next step deletes garbage ok.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> -#s:select{}
> -_bucket:delete{4}
> -
> -s2:drop()
> -s:drop()
> -_bucket:drop()
> diff --git a/vshard/cfg.lua b/vshard/cfg.lua
> index 28c3400..1345058 100644
> --- a/vshard/cfg.lua
> +++ b/vshard/cfg.lua
> @@ -245,9 +245,7 @@ local cfg_template = {
>           max = consts.REBALANCER_MAX_SENDING_MAX
>       },
>       collect_bucket_garbage_interval = {
> -        type = 'positive number', name = 'Garbage bucket collect interval',
> -        is_optional = true,
> -        default = consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
> +        name = 'Garbage bucket collect interval', is_deprecated = true,
>       },
>       collect_lua_garbage = {
>           type = 'boolean', name = 'Garbage Lua collect necessity',
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 8c2a8b0..3f1585a 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -23,6 +23,7 @@ return {
>       DEFAULT_BUCKET_COUNT = 3000;
>       BUCKET_SENT_GARBAGE_DELAY = 0.5;
>       BUCKET_CHUNK_SIZE = 1000;
> +    LUA_CHUNK_SIZE = 100000,
>       DEFAULT_REBALANCER_DISBALANCE_THRESHOLD = 1;
>       REBALANCER_IDLE_INTERVAL = 60 * 60;
>       REBALANCER_WORK_INTERVAL = 10;
> @@ -37,7 +38,7 @@ return {
>       DEFAULT_FAILOVER_PING_TIMEOUT = 5;
>       DEFAULT_SYNC_TIMEOUT = 1;
>       RECONNECT_TIMEOUT = 0.5;
> -    DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
> +    GC_BACKOFF_INTERVAL = 5,
>       RECOVERY_INTERVAL = 5;
>       COLLECT_LUA_GARBAGE_INTERVAL = 100;
>   
> @@ -45,4 +46,6 @@ return {
>       DISCOVERY_WORK_INTERVAL = 1,
>       DISCOVERY_WORK_STEP = 0.01,
>       DISCOVERY_TIMEOUT = 10,
> +
> +    TIMEOUT_INFINITY = 500 * 365 * 86400,
>   }
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 298df71..31a6fc7 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -69,7 +69,6 @@ if not M then
>           total_bucket_count = 0,
>           errinj = {
>               ERRINJ_CFG = false,
> -            ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
>               ERRINJ_RELOAD = false,
>               ERRINJ_CFG_DELAY = false,
>               ERRINJ_LONG_RECEIVE = false,
> @@ -96,6 +95,8 @@ if not M then
>           -- detect that _bucket was not changed between yields.
>           --
>           bucket_generation = 0,
> +        -- Condition variable fired on generation update.
> +        bucket_generation_cond = lfiber.cond(),
>           --
>           -- Reference to the function used as on_replace trigger on
>           -- _bucket space. It is used to replace the trigger with
> @@ -107,12 +108,14 @@ if not M then
>           -- replace the old function is to keep its reference.
>           --
>           bucket_on_replace = nil,
> +        -- Redirects for recently sent buckets. They are kept for a while to
> +        -- help routers to find a new location for sent and deleted buckets
> +        -- without whole cluster scan.
> +        route_map = {},
>   
>           ------------------- Garbage collection -------------------
>           -- Fiber to remove garbage buckets data.
>           collect_bucket_garbage_fiber = nil,
> -        -- Do buckets garbage collection once per this time.
> -        collect_bucket_garbage_interval = nil,
>           -- Boolean lua_gc state (create periodic gc task).
>           collect_lua_garbage = nil,
>   
> @@ -173,6 +176,7 @@ end
>   --
>   local function bucket_generation_increment()
>       M.bucket_generation = M.bucket_generation + 1
> +    M.bucket_generation_cond:broadcast()
>   end
>   
>   --
> @@ -758,8 +762,9 @@ local function bucket_check_state(bucket_id, mode)
>       else
>           return bucket
>       end
> +    local dst = bucket and bucket.destination or M.route_map[bucket_id]
>       return bucket, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, reason,
> -                                 bucket and bucket.destination)
> +                                 dst)
>   end
>   
>   --
> @@ -804,11 +809,23 @@ end
>   --
>   local function bucket_unrefro(bucket_id)
>       local ref = M.bucket_refs[bucket_id]
> -    if not ref or ref.ro == 0 then
> +    local count = ref and ref.ro or 0
> +    if count == 0 then
>           return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id,
>                                     "no refs", nil)
>       end
> -    ref.ro = ref.ro - 1
> +    if count == 1 then
> +        ref.ro = 0
> +        if ref.ro_lock then
> +            -- Garbage collector is waiting for the bucket if RO
> +            -- is locked. Let it know it has one more bucket to
> +            -- collect. It relies on generation, so its increment
> +            -- is enough.
> +            bucket_generation_increment()
> +        end
> +        return true
> +    end
> +    ref.ro = count - 1
>       return true
>   end
>   
> @@ -1479,79 +1496,44 @@ local function gc_bucket_in_space(space, bucket_id, status)
>   end
>   
>   --
> --- Remove tuples from buckets of a specified type.
> --- @param type Type of buckets to gc.
> --- @retval List of ids of empty buckets of the type.
> +-- Drop buckets with the given status along with their data in all spaces.
> +-- @param status Status of target buckets.
> +-- @param route_map Destinations of deleted buckets are saved into this table.
>   --
> -local function gc_bucket_step_by_type(type)
> -    local sharded_spaces = find_sharded_spaces()
> -    local empty_buckets = {}
> +local function gc_bucket_drop_xc(status, route_map)
>       local limit = consts.BUCKET_CHUNK_SIZE
> -    local is_all_collected = true
> -    for _, bucket in box.space._bucket.index.status:pairs(type) do
> -        local bucket_id = bucket.id
> -        local ref = M.bucket_refs[bucket_id]
> +    local _bucket = box.space._bucket
> +    local sharded_spaces = find_sharded_spaces()
> +    for _, b in _bucket.index.status:pairs(status) do
> +        local id = b.id
> +        local ref = M.bucket_refs[id]
>           if ref then
>               assert(ref.rw == 0)
>               if ref.ro ~= 0 then
>                   ref.ro_lock = true
> -                is_all_collected = false
>                   goto continue
>               end
> -            M.bucket_refs[bucket_id] = nil
> +            M.bucket_refs[id] = nil
>           end
>           for _, space in pairs(sharded_spaces) do
> -            gc_bucket_in_space_xc(space, bucket_id, type)
> +            gc_bucket_in_space_xc(space, id, status)
>               limit = limit - 1
>               if limit == 0 then
>                   lfiber.sleep(0)
>                   limit = consts.BUCKET_CHUNK_SIZE
>               end
>           end
> -        table.insert(empty_buckets, bucket.id)
> -::continue::
> +        route_map[id] = b.destination
> +        _bucket:delete{id}
> +    ::continue::
>       end
> -    return empty_buckets, is_all_collected
> -end
> -
> ---
> --- Drop buckets with ids in the list.
> --- @param bucket_ids Bucket ids to drop.
> --- @param status Expected bucket status.
> ---
> -local function gc_bucket_drop_xc(bucket_ids, status)
> -    if #bucket_ids == 0 then
> -        return
> -    end
> -    local limit = consts.BUCKET_CHUNK_SIZE
> -    box.begin()
> -    local _bucket = box.space._bucket
> -    for _, id in pairs(bucket_ids) do
> -        local bucket_exists = _bucket:get{id} ~= nil
> -        local b = _bucket:get{id}
> -        if b then
> -            if b.status ~= status then
> -                return error(string.format('Bucket %d status is changed. Was '..
> -                                           '%s, became %s', id, status,
> -                                           b.status))
> -            end
> -            _bucket:delete{id}
> -        end
> -        limit = limit - 1
> -        if limit == 0 then
> -            box.commit()
> -            box.begin()
> -            limit = consts.BUCKET_CHUNK_SIZE
> -        end
> -    end
> -    box.commit()
>   end
>   
>   --
>   -- Exception safe version of gc_bucket_drop_xc.
>   --
> -local function gc_bucket_drop(bucket_ids, status)
> -    local status, err = pcall(gc_bucket_drop_xc, bucket_ids, status)
> +local function gc_bucket_drop(status, route_map)
> +    local status, err = pcall(gc_bucket_drop_xc, status, route_map)
>       if not status then
>           box.rollback()
>       end
> @@ -1578,65 +1560,75 @@ function gc_bucket_f()
>       -- generation == bucket generation. In such a case the fiber
>       -- does nothing until next _bucket change.
>       local bucket_generation_collected = -1
> -    -- Empty sent buckets are collected into an array. After a
> -    -- specified time interval the buckets are deleted both from
> -    -- this array and from _bucket space.
> -    local buckets_for_redirect = {}
> -    local buckets_for_redirect_ts = clock()
> -    -- Empty sent buckets, updated after each step, and when
> -    -- buckets_for_redirect is deleted, it gets empty_sent_buckets
> -    -- for next deletion.
> -    local empty_garbage_buckets, empty_sent_buckets, status, err
> +    local bucket_generation_current = M.bucket_generation
> +    -- Deleted buckets are saved into a route map to redirect routers if they
> +    -- didn't discover new location of the buckets yet. However route map does
> +    -- not grow infinitely. Otherwise it would end up storing redirects for all
> +    -- buckets in the cluster. Which could also be outdated.
> +    -- Garbage collector periodically drops old routes from the map. For that it
> +    -- remembers state of route map in one moment, and after a while clears the
> +    -- remembered routes from the global route map.
> +    local route_map = M.route_map
> +    local route_map_old = {}
> +    local route_map_deadline = 0
> +    local status, err
>       while M.module_version == module_version do
> -        -- Check if no changes in buckets configuration.
> -        if bucket_generation_collected ~= M.bucket_generation then
> -            local bucket_generation = M.bucket_generation
> -            local is_sent_collected, is_garbage_collected
> -            status, empty_garbage_buckets, is_garbage_collected =
> -                pcall(gc_bucket_step_by_type, consts.BUCKET.GARBAGE)
> -            if not status then
> -                err = empty_garbage_buckets
> -                goto check_error
> -            end
> -            status, empty_sent_buckets, is_sent_collected =
> -                pcall(gc_bucket_step_by_type, consts.BUCKET.SENT)
> -            if not status then
> -                err = empty_sent_buckets
> -                goto check_error
> +        if bucket_generation_collected ~= bucket_generation_current then
> +            status, err = gc_bucket_drop(consts.BUCKET.GARBAGE, route_map)
> +            if status then
> +                status, err = gc_bucket_drop(consts.BUCKET.SENT, route_map)
>               end
> -            status, err = gc_bucket_drop(empty_garbage_buckets,
> -                                         consts.BUCKET.GARBAGE)
> -::check_error::
>               if not status then
>                   box.rollback()
>                   log.error('Error during garbage collection step: %s', err)
> -                goto continue
> +            else
> +                -- Don't use global generation. During the collection it could
> +                -- already change. Instead, remember the generation known before
> +                -- the collection has started.
> +                -- Since the collection also changes the generation, it makes
> +                -- the GC happen always at least twice. But typically on the
> +                -- second iteration it should not find any buckets to collect,
> +                -- and then the collected generation matches the global one.
> +                bucket_generation_collected = bucket_generation_current
>               end
> -            if is_sent_collected and is_garbage_collected then
> -                bucket_generation_collected = bucket_generation
> +        else
> +            status = true
> +        end
> +
> +        local sleep_time = route_map_deadline - clock()
> +        if sleep_time <= 0 then
> +            local chunk = consts.LUA_CHUNK_SIZE
> +            util.table_minus_yield(route_map, route_map_old, chunk)
> +            route_map_old = util.table_copy_yield(route_map, chunk)
> +            if next(route_map_old) then
> +                sleep_time = consts.BUCKET_SENT_GARBAGE_DELAY
> +            else
> +                sleep_time = consts.TIMEOUT_INFINITY
>               end
> +            route_map_deadline = clock() + sleep_time
>           end
> +        bucket_generation_current = M.bucket_generation
>   
> -        if clock() - buckets_for_redirect_ts >=
> -           consts.BUCKET_SENT_GARBAGE_DELAY then
> -            status, err = gc_bucket_drop(buckets_for_redirect,
> -                                         consts.BUCKET.SENT)
> -            if not status then
> -                buckets_for_redirect = {}
> -                empty_sent_buckets = {}
> -                bucket_generation_collected = -1
> -                log.error('Error during deletion of empty sent buckets: %s',
> -                          err)
> -            elseif M.module_version ~= module_version then
> -                return
> +        if bucket_generation_current ~= bucket_generation_collected then
> +            -- Generation was changed during collection. Or *by* collection.
> +            if status then
> +                -- Retry immediately. If the generation was changed by the
> +                -- collection itself, it will notice it next iteration, and go
> +                -- to proper sleep.
> +                sleep_time = 0
>               else
> -                buckets_for_redirect = empty_sent_buckets or {}
> -                empty_sent_buckets = nil
> -                buckets_for_redirect_ts = clock()
> +                -- An error happened during the collection. Does not make sense
> +                -- to retry on each iteration of the event loop. The most likely
> +                -- errors are either a WAL error or a transaction abort - both
> +                -- look like an issue in the user's code and can't be fixed
> +                -- quickly anyway. Backoff.
> +                sleep_time = consts.GC_BACKOFF_INTERVAL
>               end
>           end
> -::continue::
> -        lfiber.sleep(M.collect_bucket_garbage_interval)
> +
> +        if M.module_version == module_version then
> +            M.bucket_generation_cond:wait(sleep_time)
> +        end
>       end
>   end
>   
> @@ -2421,8 +2413,6 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
>           vshard_cfg.rebalancer_disbalance_threshold
>       M.rebalancer_receiving_quota = vshard_cfg.rebalancer_max_receiving
>       M.shard_index = vshard_cfg.shard_index
> -    M.collect_bucket_garbage_interval =
> -        vshard_cfg.collect_bucket_garbage_interval
>       M.collect_lua_garbage = vshard_cfg.collect_lua_garbage
>       M.rebalancer_worker_count = vshard_cfg.rebalancer_max_sending
>       M.current_cfg = cfg
> @@ -2676,6 +2666,9 @@ else
>           storage_cfg(M.current_cfg, M.this_replica.uuid, true)
>       end
>       M.module_version = M.module_version + 1
> +    -- Background fibers could sleep waiting for bucket changes.
> +    -- Let them know it is time to reload.
> +    bucket_generation_increment()
>   end
>   
>   M.recovery_f = recovery_f
> @@ -2686,7 +2679,7 @@ M.gc_bucket_f = gc_bucket_f
>   -- These functions are saved in M not for atomic reload, but for
>   -- unit testing.
>   --
> -M.gc_bucket_step_by_type = gc_bucket_step_by_type
> +M.gc_bucket_drop = gc_bucket_drop
>   M.rebalancer_build_routes = rebalancer_build_routes
>   M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
>   M.cached_find_sharded_spaces = find_sharded_spaces
> diff --git a/vshard/storage/reload_evolution.lua b/vshard/storage/reload_evolution.lua
> index f38af74..484f499 100644
> --- a/vshard/storage/reload_evolution.lua
> +++ b/vshard/storage/reload_evolution.lua
> @@ -4,6 +4,7 @@
>   -- in a commit.
>   --
>   local log = require('log')
> +local fiber = require('fiber')
>   
>   --
>   -- Array of upgrade functions.
> @@ -25,6 +26,13 @@ migrations[#migrations + 1] = function(M)
>       end
>   end
>   
> +migrations[#migrations + 1] = function(M)
> +    if not M.route_map then
> +        M.bucket_generation_cond = fiber.cond()
> +        M.route_map = {}
> +    end
> +end
> +
>   --
>   -- Perform an update based on a version stored in `M` (internals).
>   -- @param M Old module internals which should be updated.
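
For anyone following the route map aging in gc_bucket_f above: the comment says the collector remembers the state of the route map at one moment and later clears the remembered routes from the global map. Below is a minimal plain-Lua sketch of that two-step idea. The helpers table_minus and table_copy are hypothetical stand-ins for util.table_minus_yield and util.table_copy_yield, which are not part of this diff. The sketch assumes a route is dropped only if its value has not changed since the snapshot, and it leaves out the chunking and the yields. route_map_age and the replicaset names are invented for illustration.

```lua
-- Hypothetical stand-in for util.table_minus_yield: remove from dst
-- every key whose value is still the same as in src.
local function table_minus(dst, src)
    for k, v in pairs(src) do
        if dst[k] == v then
            dst[k] = nil
        end
    end
    return dst
end

-- Hypothetical stand-in for util.table_copy_yield: shallow copy.
local function table_copy(src)
    local res = {}
    for k, v in pairs(src) do
        res[k] = v
    end
    return res
end

-- One aging step (hypothetical name). Routes already present in the
-- previous snapshot are dropped, the survivors become the new snapshot.
local function route_map_age(route_map, route_map_old)
    table_minus(route_map, route_map_old)
    return table_copy(route_map)
end

local route_map = {}
local route_map_old = {}

-- Bucket 4 was sent; its redirect is saved by the collector.
route_map[4] = 'replicaset_b'
-- First step: nothing was remembered yet, so the route survives
-- and is only recorded in the snapshot.
route_map_old = route_map_age(route_map, route_map_old)

-- Bucket 5 was sent between two aging steps.
route_map[5] = 'replicaset_c'
-- Second step: route 4 is in the snapshot and gets dropped,
-- route 5 is new and survives for one more interval.
route_map_old = route_map_age(route_map, route_map_old)
```

With this scheme each redirect lives for at least one and at most two aging intervals, and an empty snapshot tells the caller that there is nothing left to expire, which matches the next(route_map_old) check in the patch.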

