[Tarantool-patches] [PATCH 7/9] gc: introduce reactive garbage collector

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Feb 10 02:46:13 MSK 2021


Garbage collector is a fiber on a master node which deletes
GARBAGE and SENT buckets along with their data.

It was proactive. It used to wake up with a constant period to
find and delete the needed buckets.

But this won't work with the future feature called 'map-reduce'.
As a preparation stage, map-reduce will need to ensure that all
buckets on a storage are readable and writable. With the current
GC algorithm, if a bucket is sent, it won't be deleted for the next
5 seconds by default. During this time no new map-reduce request
can execute.

This is not acceptable. Neither is waking the GC fiber up too
frequently, because it would waste TX thread time.

The patch makes the GC fiber wake up not by a timeout but by
events happening with the _bucket space. The GC fiber sleeps on a
condition variable which is signaled when _bucket is changed.

Once GC sees work to do, it won't sleep until it is done. It will
only yield.

This makes GC delete SENT and GARBAGE buckets as soon as possible
reducing the waiting time for the incoming map-reduce requests.

Needed for #147

@TarantoolBot document
Title: VShard: deprecate cfg option 'collect_bucket_garbage_interval'
It was used to specify the interval between bucket garbage
collection steps. It was needed because garbage collection in
vshard was proactive. It didn't react to newly appeared garbage
buckets immediately.

Since 0.1.17 garbage collection is reactive. It starts working
with garbage buckets as soon as they appear, and sleeps the rest
of the time. The option is not used now and does not affect the
behaviour of anything.

I suppose it can be deleted from the documentation. Or left with
a big label 'deprecated' + the explanation above.

An attempt to use the option does not cause an error, but logs a
warning.
---
 test/lua_libs/storage_template.lua        |   1 -
 test/misc/reconfigure.result              |  10 -
 test/misc/reconfigure.test.lua            |   3 -
 test/rebalancer/bucket_ref.result         |  12 --
 test/rebalancer/bucket_ref.test.lua       |   3 -
 test/rebalancer/errinj.result             |  11 --
 test/rebalancer/errinj.test.lua           |   5 -
 test/rebalancer/receiving_bucket.result   |   8 -
 test/rebalancer/receiving_bucket.test.lua |   1 -
 test/reload_evolution/storage.result      |   2 +-
 test/router/reroute_wrong_bucket.result   |   8 +-
 test/router/reroute_wrong_bucket.test.lua |   4 +-
 test/storage/recovery.result              |   3 +-
 test/storage/storage.result               |  10 +-
 test/storage/storage.test.lua             |   1 +
 test/unit/config.result                   |  35 +---
 test/unit/config.test.lua                 |  16 +-
 test/unit/garbage.result                  | 106 ++++++----
 test/unit/garbage.test.lua                |  47 +++--
 test/unit/garbage_errinj.result           | 223 ----------------------
 test/unit/garbage_errinj.test.lua         |  73 -------
 vshard/cfg.lua                            |   4 +-
 vshard/consts.lua                         |   5 +-
 vshard/storage/init.lua                   | 207 ++++++++++----------
 vshard/storage/reload_evolution.lua       |   8 +
 25 files changed, 233 insertions(+), 573 deletions(-)
 delete mode 100644 test/unit/garbage_errinj.result
 delete mode 100644 test/unit/garbage_errinj.test.lua

diff --git a/test/lua_libs/storage_template.lua b/test/lua_libs/storage_template.lua
index 21409bd..8df89f6 100644
--- a/test/lua_libs/storage_template.lua
+++ b/test/lua_libs/storage_template.lua
@@ -172,6 +172,5 @@ function wait_bucket_is_collected(id)
             return true
         end
         vshard.storage.recovery_wakeup()
-        vshard.storage.garbage_collector_wakeup()
     end)
 end
diff --git a/test/misc/reconfigure.result b/test/misc/reconfigure.result
index 168be5d..3b34841 100644
--- a/test/misc/reconfigure.result
+++ b/test/misc/reconfigure.result
@@ -83,9 +83,6 @@ cfg.collect_lua_garbage = true
 cfg.rebalancer_max_receiving = 1000
 ---
 ...
-cfg.collect_bucket_garbage_interval = 100
----
-...
 cfg.invalid_option = 'kek'
 ---
 ...
@@ -105,10 +102,6 @@ vshard.storage.internal.rebalancer_max_receiving ~= 1000
 ---
 - true
 ...
-vshard.storage.internal.collect_bucket_garbage_interval ~= 100
----
-- true
-...
 cfg.sync_timeout = nil
 ---
 ...
@@ -118,9 +111,6 @@ cfg.collect_lua_garbage = nil
 cfg.rebalancer_max_receiving = nil
 ---
 ...
-cfg.collect_bucket_garbage_interval = nil
----
-...
 cfg.invalid_option = nil
 ---
 ...
diff --git a/test/misc/reconfigure.test.lua b/test/misc/reconfigure.test.lua
index e891010..348628c 100644
--- a/test/misc/reconfigure.test.lua
+++ b/test/misc/reconfigure.test.lua
@@ -33,17 +33,14 @@ vshard.storage.internal.sync_timeout
 cfg.sync_timeout = 100
 cfg.collect_lua_garbage = true
 cfg.rebalancer_max_receiving = 1000
-cfg.collect_bucket_garbage_interval = 100
 cfg.invalid_option = 'kek'
 vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a)
 not vshard.storage.internal.collect_lua_garbage
 vshard.storage.internal.sync_timeout
 vshard.storage.internal.rebalancer_max_receiving ~= 1000
-vshard.storage.internal.collect_bucket_garbage_interval ~= 100
 cfg.sync_timeout = nil
 cfg.collect_lua_garbage = nil
 cfg.rebalancer_max_receiving = nil
-cfg.collect_bucket_garbage_interval = nil
 cfg.invalid_option = nil
 
 --
diff --git a/test/rebalancer/bucket_ref.result b/test/rebalancer/bucket_ref.result
index b8fc7ff..9df7480 100644
--- a/test/rebalancer/bucket_ref.result
+++ b/test/rebalancer/bucket_ref.result
@@ -184,9 +184,6 @@ vshard.storage.bucket_unref(1, 'read')
 - true
 ...
 -- Force GC to take an RO lock on the bucket now.
-vshard.storage.garbage_collector_wakeup()
----
-...
 vshard.storage.buckets_info(1)
 ---
 - 1:
@@ -203,7 +200,6 @@ while true do
     if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then
         break
     end
-    vshard.storage.garbage_collector_wakeup()
     fiber.sleep(0.01)
 end;
 ---
@@ -235,14 +231,6 @@ finish_refs = true
 while f1:status() ~= 'dead' do fiber.sleep(0.01) end
 ---
 ...
-vshard.storage.buckets_info(1)
----
-- 1:
-    status: garbage
-    ro_lock: true
-    destination: <replicaset_2>
-    id: 1
-...
 wait_bucket_is_collected(1)
 ---
 ...
diff --git a/test/rebalancer/bucket_ref.test.lua b/test/rebalancer/bucket_ref.test.lua
index 213ced3..1b032ff 100644
--- a/test/rebalancer/bucket_ref.test.lua
+++ b/test/rebalancer/bucket_ref.test.lua
@@ -56,7 +56,6 @@ vshard.storage.bucket_unref(1, 'write') -- Error, no refs.
 vshard.storage.bucket_ref(1, 'read')
 vshard.storage.bucket_unref(1, 'read')
 -- Force GC to take an RO lock on the bucket now.
-vshard.storage.garbage_collector_wakeup()
 vshard.storage.buckets_info(1)
 _ = test_run:cmd("setopt delimiter ';'")
 while true do
@@ -64,7 +63,6 @@ while true do
     if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then
         break
     end
-    vshard.storage.garbage_collector_wakeup()
     fiber.sleep(0.01)
 end;
 _ = test_run:cmd("setopt delimiter ''");
@@ -72,7 +70,6 @@ vshard.storage.buckets_info(1)
 vshard.storage.bucket_refro(1)
 finish_refs = true
 while f1:status() ~= 'dead' do fiber.sleep(0.01) end
-vshard.storage.buckets_info(1)
 wait_bucket_is_collected(1)
 _ = test_run:switch('box_2_a')
 vshard.storage.buckets_info(1)
diff --git a/test/rebalancer/errinj.result b/test/rebalancer/errinj.result
index e50eb72..0ddb1c9 100644
--- a/test/rebalancer/errinj.result
+++ b/test/rebalancer/errinj.result
@@ -226,17 +226,6 @@ ret2, err2
 - true
 - null
 ...
-_bucket:get{35}
----
-- [35, 'sent', '<replicaset_2>']
-...
-_bucket:get{36}
----
-- [36, 'sent', '<replicaset_2>']
-...
--- Buckets became 'active' on box_2_a, but still are sending on
--- box_1_a. Wait until it is marked as garbage on box_1_a by the
--- recovery fiber.
 wait_bucket_is_collected(35)
 ---
 ...
diff --git a/test/rebalancer/errinj.test.lua b/test/rebalancer/errinj.test.lua
index 2cc4a69..a60f3d7 100644
--- a/test/rebalancer/errinj.test.lua
+++ b/test/rebalancer/errinj.test.lua
@@ -102,11 +102,6 @@ _ = test_run:switch('box_1_a')
 while f1:status() ~= 'dead' or f2:status() ~= 'dead' do fiber.sleep(0.001) end
 ret1, err1
 ret2, err2
-_bucket:get{35}
-_bucket:get{36}
--- Buckets became 'active' on box_2_a, but still are sending on
--- box_1_a. Wait until it is marked as garbage on box_1_a by the
--- recovery fiber.
 wait_bucket_is_collected(35)
 wait_bucket_is_collected(36)
 _ = test_run:switch('box_2_a')
diff --git a/test/rebalancer/receiving_bucket.result b/test/rebalancer/receiving_bucket.result
index 7d3612b..ad93445 100644
--- a/test/rebalancer/receiving_bucket.result
+++ b/test/rebalancer/receiving_bucket.result
@@ -366,14 +366,6 @@ vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3})
 ---
 - true
 ...
-vshard.storage.buckets_info(1)
----
-- 1:
-    status: sent
-    ro_lock: true
-    destination: <replicaset_1>
-    id: 1
-...
 wait_bucket_is_collected(1)
 ---
 ...
diff --git a/test/rebalancer/receiving_bucket.test.lua b/test/rebalancer/receiving_bucket.test.lua
index 24534b3..2cf6382 100644
--- a/test/rebalancer/receiving_bucket.test.lua
+++ b/test/rebalancer/receiving_bucket.test.lua
@@ -136,7 +136,6 @@ box.space.test3:select{100}
 -- Now the bucket is unreferenced and can be transferred.
 _ = test_run:switch('box_2_a')
 vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3})
-vshard.storage.buckets_info(1)
 wait_bucket_is_collected(1)
 vshard.storage.buckets_info(1)
 _ = test_run:switch('box_1_a')
diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result
index 753687f..9d30a04 100644
--- a/test/reload_evolution/storage.result
+++ b/test/reload_evolution/storage.result
@@ -92,7 +92,7 @@ test_run:grep_log('storage_2_a', 'vshard.storage.reload_evolution: upgraded to')
 ...
 vshard.storage.internal.reload_version
 ---
-- 2
+- 3
 ...
 --
 -- gh-237: should be only one trigger. During gh-237 the trigger installation
diff --git a/test/router/reroute_wrong_bucket.result b/test/router/reroute_wrong_bucket.result
index 049bdef..ac340eb 100644
--- a/test/router/reroute_wrong_bucket.result
+++ b/test/router/reroute_wrong_bucket.result
@@ -37,7 +37,7 @@ test_run:switch('storage_1_a')
 ---
 - true
 ...
-cfg.collect_bucket_garbage_interval = 100
+vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
 ---
 ...
 vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a)
@@ -53,7 +53,7 @@ test_run:switch('storage_2_a')
 ---
 - true
 ...
-cfg.collect_bucket_garbage_interval = 100
+vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
 ---
 ...
 vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a)
@@ -202,12 +202,12 @@ test_run:grep_log('router_1', 'please update configuration')
 err
 ---
 - bucket_id: 100
-  reason: write is prohibited
+  reason: Not found
   code: 1
   destination: ac522f65-aa94-4134-9f64-51ee384f1a54
   type: ShardingError
   name: WRONG_BUCKET
-  message: 'Cannot perform action with bucket 100, reason: write is prohibited'
+  message: 'Cannot perform action with bucket 100, reason: Not found'
 ...
 --
 -- Now try again, but update configuration during call(). It must
diff --git a/test/router/reroute_wrong_bucket.test.lua b/test/router/reroute_wrong_bucket.test.lua
index 9e6e804..207aac3 100644
--- a/test/router/reroute_wrong_bucket.test.lua
+++ b/test/router/reroute_wrong_bucket.test.lua
@@ -11,13 +11,13 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
 test_run:cmd('create server router_1 with script="router/router_1.lua"')
 test_run:cmd('start server router_1')
 test_run:switch('storage_1_a')
-cfg.collect_bucket_garbage_interval = 100
+vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
 vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a)
 vshard.storage.rebalancer_disable()
 for i = 1, 100 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end
 
 test_run:switch('storage_2_a')
-cfg.collect_bucket_garbage_interval = 100
+vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
 vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a)
 vshard.storage.rebalancer_disable()
 for i = 101, 200 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end
diff --git a/test/storage/recovery.result b/test/storage/recovery.result
index f833fe7..8ccb0b9 100644
--- a/test/storage/recovery.result
+++ b/test/storage/recovery.result
@@ -79,8 +79,7 @@ _bucket = box.space._bucket
 ...
 _bucket:select{}
 ---
-- - [2, 'garbage', '<replicaset_2>']
-  - [3, 'garbage', '<replicaset_2>']
+- []
 ...
 _ = test_run:switch('storage_2_a')
 ---
diff --git a/test/storage/storage.result b/test/storage/storage.result
index 424bc4c..0550ad1 100644
--- a/test/storage/storage.result
+++ b/test/storage/storage.result
@@ -547,6 +547,9 @@ vshard.storage.bucket_send(1, util.replicasets[2])
 ---
 - true
 ...
+wait_bucket_is_collected(1)
+---
+...
 _ = test_run:switch("storage_2_a")
 ---
 ...
@@ -567,12 +570,7 @@ _ = test_run:switch("storage_1_a")
 ...
 vshard.storage.buckets_info()
 ---
-- 1:
-    status: sent
-    ro_lock: true
-    destination: <replicaset_2>
-    id: 1
-  2:
+- 2:
     status: active
     id: 2
 ...
diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua
index d631b51..d8fbd94 100644
--- a/test/storage/storage.test.lua
+++ b/test/storage/storage.test.lua
@@ -136,6 +136,7 @@ vshard.storage.bucket_send(1, util.replicasets[1])
 
 -- Successful transfer.
 vshard.storage.bucket_send(1, util.replicasets[2])
+wait_bucket_is_collected(1)
 _ = test_run:switch("storage_2_a")
 vshard.storage.buckets_info()
 _ = test_run:switch("storage_1_a")
diff --git a/test/unit/config.result b/test/unit/config.result
index dfd0219..e0b2482 100644
--- a/test/unit/config.result
+++ b/test/unit/config.result
@@ -428,33 +428,6 @@ _ = lcfg.check(cfg)
 --
 -- gh-77: garbage collection options.
 --
-cfg.collect_bucket_garbage_interval = 'str'
----
-...
-check(cfg)
----
-- Garbage bucket collect interval must be positive number
-...
-cfg.collect_bucket_garbage_interval = 0
----
-...
-check(cfg)
----
-- Garbage bucket collect interval must be positive number
-...
-cfg.collect_bucket_garbage_interval = -1
----
-...
-check(cfg)
----
-- Garbage bucket collect interval must be positive number
-...
-cfg.collect_bucket_garbage_interval = 100.5
----
-...
-_ = lcfg.check(cfg)
----
-...
 cfg.collect_lua_garbage = 100
 ---
 ...
@@ -615,6 +588,12 @@ lcfg.check(cfg).rebalancer_max_sending
 cfg.rebalancer_max_sending = nil
 ---
 ...
-cfg.sharding = nil
+--
+-- Deprecated option does not break anything.
+--
+cfg.collect_bucket_garbage_interval = 100
+---
+...
+_ = lcfg.check(cfg)
 ---
 ...
diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua
index ada43db..a1c9f07 100644
--- a/test/unit/config.test.lua
+++ b/test/unit/config.test.lua
@@ -175,15 +175,6 @@ _ = lcfg.check(cfg)
 --
 -- gh-77: garbage collection options.
 --
-cfg.collect_bucket_garbage_interval = 'str'
-check(cfg)
-cfg.collect_bucket_garbage_interval = 0
-check(cfg)
-cfg.collect_bucket_garbage_interval = -1
-check(cfg)
-cfg.collect_bucket_garbage_interval = 100.5
-_ = lcfg.check(cfg)
-
 cfg.collect_lua_garbage = 100
 check(cfg)
 cfg.collect_lua_garbage = true
@@ -244,4 +235,9 @@ util.check_error(lcfg.check, cfg)
 cfg.rebalancer_max_sending = 15
 lcfg.check(cfg).rebalancer_max_sending
 cfg.rebalancer_max_sending = nil
-cfg.sharding = nil
+
+--
+-- Deprecated option does not break anything.
+--
+cfg.collect_bucket_garbage_interval = 100
+_ = lcfg.check(cfg)
diff --git a/test/unit/garbage.result b/test/unit/garbage.result
index 74d9ccf..a530496 100644
--- a/test/unit/garbage.result
+++ b/test/unit/garbage.result
@@ -31,9 +31,6 @@ test_run:cmd("setopt delimiter ''");
 vshard.storage.internal.shard_index = 'bucket_id'
 ---
 ...
-vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
----
-...
 --
 -- Find nothing if no bucket_id anywhere, or there is no index
 -- by it, or bucket_id is not unsigned.
@@ -151,6 +148,9 @@ format[1] = {name = 'id', type = 'unsigned'}
 format[2] = {name = 'status', type = 'string'}
 ---
 ...
+format[3] = {name = 'destination', type = 'string', is_nullable = true}
+---
+...
 _bucket = box.schema.create_space('_bucket', {format = format})
 ---
 ...
@@ -172,22 +172,6 @@ _bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
 ---
 - [3, 'active']
 ...
-_bucket:replace{4, vshard.consts.BUCKET.SENT}
----
-- [4, 'sent']
-...
-_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
----
-- [5, 'garbage']
-...
-_bucket:replace{6, vshard.consts.BUCKET.GARBAGE}
----
-- [6, 'garbage']
-...
-_bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
----
-- [200, 'garbage']
-...
 s = box.schema.create_space('test', {engine = engine})
 ---
 ...
@@ -213,7 +197,7 @@ s:replace{4, 2}
 ---
 - [4, 2]
 ...
-gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
+gc_bucket_drop = vshard.storage.internal.gc_bucket_drop
 ---
 ...
 s2 = box.schema.create_space('test2', {engine = engine})
@@ -249,6 +233,10 @@ function fill_spaces_with_garbage()
     s2:replace{6, 4}
     s2:replace{7, 5}
     s2:replace{7, 6}
+    _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'}
+    _bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
+    _bucket:replace{6, vshard.consts.BUCKET.GARBAGE, 'destination2'}
+    _bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
 end;
 ---
 ...
@@ -267,12 +255,22 @@ fill_spaces_with_garbage()
 ---
 - 1107
 ...
-gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
+route_map = {}
+---
+...
+gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
 ---
-- - 5
-  - 6
-  - 200
 - true
+- null
+...
+route_map
+---
+- - null
+  - null
+  - null
+  - null
+  - null
+  - destination2
 ...
 #s2:select{}
 ---
@@ -282,10 +280,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
 ---
 - 7
 ...
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
+route_map = {}
+---
+...
+gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
 ---
-- - 4
 - true
+- null
+...
+route_map
+---
+- - null
+  - null
+  - null
+  - destination1
 ...
 s2:select{}
 ---
@@ -303,17 +311,22 @@ s:select{}
   - [6, 100]
 ...
 -- Nothing deleted - update collected generation.
-gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
+route_map = {}
+---
+...
+gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
 ---
-- - 5
-  - 6
-  - 200
 - true
+- null
 ...
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
+gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
 ---
-- - 4
 - true
+- null
+...
+route_map
+---
+- []
 ...
 #s2:select{}
 ---
@@ -329,15 +342,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
 fill_spaces_with_garbage()
 ---
 ...
-_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
+_ = _bucket:on_replace(function()                                               \
+    local gen = vshard.storage.internal.bucket_generation                       \
+    vshard.storage.internal.bucket_generation = gen + 1                         \
+    vshard.storage.internal.bucket_generation_cond:broadcast()                  \
+end)
 ---
 ...
 f = fiber.create(vshard.storage.internal.gc_bucket_f)
 ---
 ...
 -- Wait until garbage collection is finished.
-while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end)
 ---
+- true
 ...
 s:select{}
 ---
@@ -360,7 +378,6 @@ _bucket:select{}
 - - [1, 'active']
   - [2, 'receiving']
   - [3, 'active']
-  - [4, 'sent']
 ...
 --
 -- Test deletion of 'sent' buckets after a specified timeout.
@@ -370,8 +387,9 @@ _bucket:replace{2, vshard.consts.BUCKET.SENT}
 - [2, 'sent']
 ...
 -- Wait deletion after a while.
-while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return not _bucket:get{2} end)
 ---
+- true
 ...
 _bucket:select{}
 ---
@@ -410,8 +428,9 @@ _bucket:replace{4, vshard.consts.BUCKET.SENT}
 ---
 - [4, 'sent']
 ...
-while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return not _bucket:get{4} end)
 ---
+- true
 ...
 --
 -- Test WAL errors during deletion from _bucket.
@@ -434,11 +453,14 @@ s:replace{6, 4}
 ---
 - [6, 4]
 ...
-while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_log('default', 'Error during garbage collection step',            \
+                  65536, 10)
 ---
+- Error during garbage collection step
 ...
-while #sk:select{4} ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return #sk:select{4} == 0 end)
 ---
+- true
 ...
 s:select{}
 ---
@@ -454,8 +476,9 @@ _bucket:select{}
 _ = _bucket:on_replace(nil, rollback_on_delete)
 ---
 ...
-while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return not _bucket:get{4} end)
 ---
+- true
 ...
 f:cancel()
 ---
@@ -562,8 +585,9 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i,
 f = fiber.create(vshard.storage.internal.gc_bucket_f)
 ---
 ...
-while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return _bucket:count() == 0 end)
 ---
+- true
 ...
 _bucket:select{}
 ---
diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
index 30079fa..250afb0 100644
--- a/test/unit/garbage.test.lua
+++ b/test/unit/garbage.test.lua
@@ -15,7 +15,6 @@ end;
 test_run:cmd("setopt delimiter ''");
 
 vshard.storage.internal.shard_index = 'bucket_id'
-vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
 
 --
 -- Find nothing if no bucket_id anywhere, or there is no index
@@ -75,16 +74,13 @@ s:drop()
 format = {}
 format[1] = {name = 'id', type = 'unsigned'}
 format[2] = {name = 'status', type = 'string'}
+format[3] = {name = 'destination', type = 'string', is_nullable = true}
 _bucket = box.schema.create_space('_bucket', {format = format})
 _ = _bucket:create_index('pk')
 _ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
 _bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
 _bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
 _bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
-_bucket:replace{4, vshard.consts.BUCKET.SENT}
-_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
-_bucket:replace{6, vshard.consts.BUCKET.GARBAGE}
-_bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
 
 s = box.schema.create_space('test', {engine = engine})
 pk = s:create_index('pk')
@@ -94,7 +90,7 @@ s:replace{2, 1}
 s:replace{3, 2}
 s:replace{4, 2}
 
-gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
+gc_bucket_drop = vshard.storage.internal.gc_bucket_drop
 s2 = box.schema.create_space('test2', {engine = engine})
 pk2 = s2:create_index('pk')
 sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
@@ -114,6 +110,10 @@ function fill_spaces_with_garbage()
     s2:replace{6, 4}
     s2:replace{7, 5}
     s2:replace{7, 6}
+    _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'}
+    _bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
+    _bucket:replace{6, vshard.consts.BUCKET.GARBAGE, 'destination2'}
+    _bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
 end;
 test_run:cmd("setopt delimiter ''");
 
@@ -121,15 +121,21 @@ fill_spaces_with_garbage()
 
 #s2:select{}
 #s:select{}
-gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
+route_map = {}
+gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
+route_map
 #s2:select{}
 #s:select{}
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
+route_map = {}
+gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
+route_map
 s2:select{}
 s:select{}
 -- Nothing deleted - update collected generation.
-gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
+route_map = {}
+gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
+gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
+route_map
 #s2:select{}
 #s:select{}
 
@@ -137,10 +143,14 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
 -- Test continuous garbage collection via background fiber.
 --
 fill_spaces_with_garbage()
-_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
+_ = _bucket:on_replace(function()                                               \
+    local gen = vshard.storage.internal.bucket_generation                       \
+    vshard.storage.internal.bucket_generation = gen + 1                         \
+    vshard.storage.internal.bucket_generation_cond:broadcast()                  \
+end)
 f = fiber.create(vshard.storage.internal.gc_bucket_f)
 -- Wait until garbage collection is finished.
-while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end)
 s:select{}
 s2:select{}
 -- Check garbage bucket is deleted by background fiber.
@@ -150,7 +160,7 @@ _bucket:select{}
 --
 _bucket:replace{2, vshard.consts.BUCKET.SENT}
 -- Wait deletion after a while.
-while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return not _bucket:get{2} end)
 _bucket:select{}
 s:select{}
 s2:select{}
@@ -162,7 +172,7 @@ _bucket:replace{4, vshard.consts.BUCKET.ACTIVE}
 s:replace{5, 4}
 s:replace{6, 4}
 _bucket:replace{4, vshard.consts.BUCKET.SENT}
-while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return not _bucket:get{4} end)
 
 --
 -- Test WAL errors during deletion from _bucket.
@@ -172,12 +182,13 @@ _ = _bucket:on_replace(rollback_on_delete)
 _bucket:replace{4, vshard.consts.BUCKET.SENT}
 s:replace{5, 4}
 s:replace{6, 4}
-while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
-while #sk:select{4} ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_log('default', 'Error during garbage collection step',            \
+                  65536, 10)
+test_run:wait_cond(function() return #sk:select{4} == 0 end)
 s:select{}
 _bucket:select{}
 _ = _bucket:on_replace(nil, rollback_on_delete)
-while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return not _bucket:get{4} end)
 
 f:cancel()
 
@@ -220,7 +231,7 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i,
 #s:select{}
 #s2:select{}
 f = fiber.create(vshard.storage.internal.gc_bucket_f)
-while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
+test_run:wait_cond(function() return _bucket:count() == 0 end)
 _bucket:select{}
 s:select{}
 s2:select{}
diff --git a/test/unit/garbage_errinj.result b/test/unit/garbage_errinj.result
deleted file mode 100644
index 92c8039..0000000
--- a/test/unit/garbage_errinj.result
+++ /dev/null
@@ -1,223 +0,0 @@
-test_run = require('test_run').new()
----
-...
-vshard = require('vshard')
----
-...
-fiber = require('fiber')
----
-...
-engine = test_run:get_cfg('engine')
----
-...
-vshard.storage.internal.shard_index = 'bucket_id'
----
-...
-format = {}
----
-...
-format[1] = {name = 'id', type = 'unsigned'}
----
-...
-format[2] = {name = 'status', type = 'string', is_nullable = true}
----
-...
-_bucket = box.schema.create_space('_bucket', {format = format})
----
-...
-_ = _bucket:create_index('pk')
----
-...
-_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
----
-...
-_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
----
-- [1, 'active']
-...
-_bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
----
-- [2, 'receiving']
-...
-_bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
----
-- [3, 'active']
-...
-_bucket:replace{4, vshard.consts.BUCKET.SENT}
----
-- [4, 'sent']
-...
-_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
----
-- [5, 'garbage']
-...
-s = box.schema.create_space('test', {engine = engine})
----
-...
-pk = s:create_index('pk')
----
-...
-sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
----
-...
-s:replace{1, 1}
----
-- [1, 1]
-...
-s:replace{2, 1}
----
-- [2, 1]
-...
-s:replace{3, 2}
----
-- [3, 2]
-...
-s:replace{4, 2}
----
-- [4, 2]
-...
-s:replace{5, 100}
----
-- [5, 100]
-...
-s:replace{6, 100}
----
-- [6, 100]
-...
-s:replace{7, 4}
----
-- [7, 4]
-...
-s:replace{8, 5}
----
-- [8, 5]
-...
-s2 = box.schema.create_space('test2', {engine = engine})
----
-...
-pk2 = s2:create_index('pk')
----
-...
-sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
----
-...
-s2:replace{1, 1}
----
-- [1, 1]
-...
-s2:replace{3, 3}
----
-- [3, 3]
-...
-for i = 7, 1107 do s:replace{i, 200} end
----
-...
-s2:replace{4, 200}
----
-- [4, 200]
-...
-s2:replace{5, 100}
----
-- [5, 100]
-...
-s2:replace{5, 300}
----
-- [5, 300]
-...
-s2:replace{6, 4}
----
-- [6, 4]
-...
-s2:replace{7, 5}
----
-- [7, 5]
-...
-gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
----
-...
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
----
-- - 4
-- true
-...
-gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
----
-- - 5
-- true
-...
---
--- Test _bucket generation change during garbage buckets search.
---
-s:truncate()
----
-...
-_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
----
-...
-vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true
----
-...
-f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end)
----
-...
-_bucket:replace{4, vshard.consts.BUCKET.GARBAGE}
----
-- [4, 'garbage']
-...
-s:replace{5, 4}
----
-- [5, 4]
-...
-s:replace{6, 4}
----
-- [6, 4]
-...
-#s:select{}
----
-- 2
-...
-vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false
----
-...
-while f:status() ~= 'dead' do fiber.sleep(0.1) end
----
-...
--- Nothing is deleted - _bucket:replace() has changed _bucket
--- generation during search of garbage buckets.
-#s:select{}
----
-- 2
-...
-_bucket:select{4}
----
-- - [4, 'garbage']
-...
--- Next step deletes garbage ok.
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
----
-- []
-- true
-...
-gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
----
-- - 4
-  - 5
-- true
-...
-#s:select{}
----
-- 0
-...
-_bucket:delete{4}
----
-- [4, 'garbage']
-...
-s2:drop()
----
-...
-s:drop()
----
-...
-_bucket:drop()
----
-...
diff --git a/test/unit/garbage_errinj.test.lua b/test/unit/garbage_errinj.test.lua
deleted file mode 100644
index 31184b9..0000000
--- a/test/unit/garbage_errinj.test.lua
+++ /dev/null
@@ -1,73 +0,0 @@
-test_run = require('test_run').new()
-vshard = require('vshard')
-fiber = require('fiber')
-
-engine = test_run:get_cfg('engine')
-vshard.storage.internal.shard_index = 'bucket_id'
-
-format = {}
-format[1] = {name = 'id', type = 'unsigned'}
-format[2] = {name = 'status', type = 'string', is_nullable = true}
-_bucket = box.schema.create_space('_bucket', {format = format})
-_ = _bucket:create_index('pk')
-_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
-_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
-_bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
-_bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
-_bucket:replace{4, vshard.consts.BUCKET.SENT}
-_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
-
-s = box.schema.create_space('test', {engine = engine})
-pk = s:create_index('pk')
-sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
-s:replace{1, 1}
-s:replace{2, 1}
-s:replace{3, 2}
-s:replace{4, 2}
-s:replace{5, 100}
-s:replace{6, 100}
-s:replace{7, 4}
-s:replace{8, 5}
-
-s2 = box.schema.create_space('test2', {engine = engine})
-pk2 = s2:create_index('pk')
-sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
-s2:replace{1, 1}
-s2:replace{3, 3}
-for i = 7, 1107 do s:replace{i, 200} end
-s2:replace{4, 200}
-s2:replace{5, 100}
-s2:replace{5, 300}
-s2:replace{6, 4}
-s2:replace{7, 5}
-
-gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
-gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
-
---
--- Test _bucket generation change during garbage buckets search.
---
-s:truncate()
-_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
-vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true
-f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end)
-_bucket:replace{4, vshard.consts.BUCKET.GARBAGE}
-s:replace{5, 4}
-s:replace{6, 4}
-#s:select{}
-vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false
-while f:status() ~= 'dead' do fiber.sleep(0.1) end
--- Nothing is deleted - _bucket:replace() has changed _bucket
--- generation during search of garbage buckets.
-#s:select{}
-_bucket:select{4}
--- Next step deletes garbage ok.
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
-gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
-#s:select{}
-_bucket:delete{4}
-
-s2:drop()
-s:drop()
-_bucket:drop()
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 28c3400..1345058 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -245,9 +245,7 @@ local cfg_template = {
         max = consts.REBALANCER_MAX_SENDING_MAX
     },
     collect_bucket_garbage_interval = {
-        type = 'positive number', name = 'Garbage bucket collect interval',
-        is_optional = true,
-        default = consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
+        name = 'Garbage bucket collect interval', is_deprecated = true,
     },
     collect_lua_garbage = {
         type = 'boolean', name = 'Garbage Lua collect necessity',
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 8c2a8b0..3f1585a 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -23,6 +23,7 @@ return {
     DEFAULT_BUCKET_COUNT = 3000;
     BUCKET_SENT_GARBAGE_DELAY = 0.5;
     BUCKET_CHUNK_SIZE = 1000;
+    LUA_CHUNK_SIZE = 100000,
     DEFAULT_REBALANCER_DISBALANCE_THRESHOLD = 1;
     REBALANCER_IDLE_INTERVAL = 60 * 60;
     REBALANCER_WORK_INTERVAL = 10;
@@ -37,7 +38,7 @@ return {
     DEFAULT_FAILOVER_PING_TIMEOUT = 5;
     DEFAULT_SYNC_TIMEOUT = 1;
     RECONNECT_TIMEOUT = 0.5;
-    DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
+    GC_BACKOFF_INTERVAL = 5,
     RECOVERY_INTERVAL = 5;
     COLLECT_LUA_GARBAGE_INTERVAL = 100;
 
@@ -45,4 +46,6 @@ return {
     DISCOVERY_WORK_INTERVAL = 1,
     DISCOVERY_WORK_STEP = 0.01,
     DISCOVERY_TIMEOUT = 10,
+
+    TIMEOUT_INFINITY = 500 * 365 * 86400,
 }
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 298df71..31a6fc7 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -69,7 +69,6 @@ if not M then
         total_bucket_count = 0,
         errinj = {
             ERRINJ_CFG = false,
-            ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
             ERRINJ_RELOAD = false,
             ERRINJ_CFG_DELAY = false,
             ERRINJ_LONG_RECEIVE = false,
@@ -96,6 +95,8 @@ if not M then
         -- detect that _bucket was not changed between yields.
         --
         bucket_generation = 0,
+        -- Condition variable fired on generation update.
+        bucket_generation_cond = lfiber.cond(),
         --
         -- Reference to the function used as on_replace trigger on
         -- _bucket space. It is used to replace the trigger with
@@ -107,12 +108,14 @@ if not M then
         -- replace the old function is to keep its reference.
         --
         bucket_on_replace = nil,
+        -- Redirects for recently sent buckets. They are kept for a while to
+        -- help routers to find a new location for sent and deleted buckets
+        -- without whole cluster scan.
+        route_map = {},
 
         ------------------- Garbage collection -------------------
         -- Fiber to remove garbage buckets data.
         collect_bucket_garbage_fiber = nil,
-        -- Do buckets garbage collection once per this time.
-        collect_bucket_garbage_interval = nil,
         -- Boolean lua_gc state (create periodic gc task).
         collect_lua_garbage = nil,
 
@@ -173,6 +176,7 @@ end
 --
 local function bucket_generation_increment()
     M.bucket_generation = M.bucket_generation + 1
+    M.bucket_generation_cond:broadcast()
 end
 
 --
@@ -758,8 +762,9 @@ local function bucket_check_state(bucket_id, mode)
     else
         return bucket
     end
+    local dst = bucket and bucket.destination or M.route_map[bucket_id]
     return bucket, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, reason,
-                                 bucket and bucket.destination)
+                                 dst)
 end
 
 --
@@ -804,11 +809,23 @@ end
 --
 local function bucket_unrefro(bucket_id)
     local ref = M.bucket_refs[bucket_id]
-    if not ref or ref.ro == 0 then
+    local count = ref and ref.ro or 0
+    if count == 0 then
         return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id,
                                   "no refs", nil)
     end
-    ref.ro = ref.ro - 1
+    if count == 1 then
+        ref.ro = 0
+        if ref.ro_lock then
+            -- Garbage collector is waiting for the bucket if RO
+            -- is locked. Let it know it has one more bucket to
+            -- collect. It relies on generation, so its increment
+            -- is enough.
+            bucket_generation_increment()
+        end
+        return true
+    end
+    ref.ro = count - 1
     return true
 end
 
@@ -1479,79 +1496,44 @@ local function gc_bucket_in_space(space, bucket_id, status)
 end
 
 --
--- Remove tuples from buckets of a specified type.
--- @param type Type of buckets to gc.
--- @retval List of ids of empty buckets of the type.
+-- Drop buckets with the given status along with their data in all spaces.
+-- @param status Status of target buckets.
+-- @param route_map Destinations of deleted buckets are saved into this table.
 --
-local function gc_bucket_step_by_type(type)
-    local sharded_spaces = find_sharded_spaces()
-    local empty_buckets = {}
+local function gc_bucket_drop_xc(status, route_map)
     local limit = consts.BUCKET_CHUNK_SIZE
-    local is_all_collected = true
-    for _, bucket in box.space._bucket.index.status:pairs(type) do
-        local bucket_id = bucket.id
-        local ref = M.bucket_refs[bucket_id]
+    local _bucket = box.space._bucket
+    local sharded_spaces = find_sharded_spaces()
+    for _, b in _bucket.index.status:pairs(status) do
+        local id = b.id
+        local ref = M.bucket_refs[id]
         if ref then
             assert(ref.rw == 0)
             if ref.ro ~= 0 then
                 ref.ro_lock = true
-                is_all_collected = false
                 goto continue
             end
-            M.bucket_refs[bucket_id] = nil
+            M.bucket_refs[id] = nil
         end
         for _, space in pairs(sharded_spaces) do
-            gc_bucket_in_space_xc(space, bucket_id, type)
+            gc_bucket_in_space_xc(space, id, status)
             limit = limit - 1
             if limit == 0 then
                 lfiber.sleep(0)
                 limit = consts.BUCKET_CHUNK_SIZE
             end
         end
-        table.insert(empty_buckets, bucket.id)
-::continue::
+        route_map[id] = b.destination
+        _bucket:delete{id}
+    ::continue::
     end
-    return empty_buckets, is_all_collected
-end
-
---
--- Drop buckets with ids in the list.
--- @param bucket_ids Bucket ids to drop.
--- @param status Expected bucket status.
---
-local function gc_bucket_drop_xc(bucket_ids, status)
-    if #bucket_ids == 0 then
-        return
-    end
-    local limit = consts.BUCKET_CHUNK_SIZE
-    box.begin()
-    local _bucket = box.space._bucket
-    for _, id in pairs(bucket_ids) do
-        local bucket_exists = _bucket:get{id} ~= nil
-        local b = _bucket:get{id}
-        if b then
-            if b.status ~= status then
-                return error(string.format('Bucket %d status is changed. Was '..
-                                           '%s, became %s', id, status,
-                                           b.status))
-            end
-            _bucket:delete{id}
-        end
-        limit = limit - 1
-        if limit == 0 then
-            box.commit()
-            box.begin()
-            limit = consts.BUCKET_CHUNK_SIZE
-        end
-    end
-    box.commit()
 end
 
 --
 -- Exception safe version of gc_bucket_drop_xc.
 --
-local function gc_bucket_drop(bucket_ids, status)
-    local status, err = pcall(gc_bucket_drop_xc, bucket_ids, status)
+local function gc_bucket_drop(status, route_map)
+    local status, err = pcall(gc_bucket_drop_xc, status, route_map)
     if not status then
         box.rollback()
     end
@@ -1578,65 +1560,75 @@ function gc_bucket_f()
     -- generation == bucket generation. In such a case the fiber
     -- does nothing until next _bucket change.
     local bucket_generation_collected = -1
-    -- Empty sent buckets are collected into an array. After a
-    -- specified time interval the buckets are deleted both from
-    -- this array and from _bucket space.
-    local buckets_for_redirect = {}
-    local buckets_for_redirect_ts = clock()
-    -- Empty sent buckets, updated after each step, and when
-    -- buckets_for_redirect is deleted, it gets empty_sent_buckets
-    -- for next deletion.
-    local empty_garbage_buckets, empty_sent_buckets, status, err
+    local bucket_generation_current = M.bucket_generation
+    -- Deleted buckets are saved into a route map to redirect routers if they
+    -- didn't discover new location of the buckets yet. However route map does
+    -- not grow infinitely. Otherwise it would end up storing redirects for all
+    -- buckets in the cluster, which could also be outdated.
+    -- Garbage collector periodically drops old routes from the map. For that it
+    -- remembers state of route map in one moment, and after a while clears the
+    -- remembered routes from the global route map.
+    local route_map = M.route_map
+    local route_map_old = {}
+    local route_map_deadline = 0
+    local status, err
     while M.module_version == module_version do
-        -- Check if no changes in buckets configuration.
-        if bucket_generation_collected ~= M.bucket_generation then
-            local bucket_generation = M.bucket_generation
-            local is_sent_collected, is_garbage_collected
-            status, empty_garbage_buckets, is_garbage_collected =
-                pcall(gc_bucket_step_by_type, consts.BUCKET.GARBAGE)
-            if not status then
-                err = empty_garbage_buckets
-                goto check_error
-            end
-            status, empty_sent_buckets, is_sent_collected =
-                pcall(gc_bucket_step_by_type, consts.BUCKET.SENT)
-            if not status then
-                err = empty_sent_buckets
-                goto check_error
+        if bucket_generation_collected ~= bucket_generation_current then
+            status, err = gc_bucket_drop(consts.BUCKET.GARBAGE, route_map)
+            if status then
+                status, err = gc_bucket_drop(consts.BUCKET.SENT, route_map)
             end
-            status, err = gc_bucket_drop(empty_garbage_buckets,
-                                         consts.BUCKET.GARBAGE)
-::check_error::
             if not status then
                 box.rollback()
                 log.error('Error during garbage collection step: %s', err)
-                goto continue
+            else
+                -- Don't use global generation. During the collection it could
+                -- already change. Instead, remember the generation known before
+                -- the collection has started.
+                -- Since the collection also changes the generation, it makes
+                -- the GC always run at least twice. But typically on the
+                -- second iteration it should not find any buckets to collect,
+                -- and then the collected generation matches the global one.
+                bucket_generation_collected = bucket_generation_current
             end
-            if is_sent_collected and is_garbage_collected then
-                bucket_generation_collected = bucket_generation
+        else
+            status = true
+        end
+
+        local sleep_time = route_map_deadline - clock()
+        if sleep_time <= 0 then
+            local chunk = consts.LUA_CHUNK_SIZE
+            util.table_minus_yield(route_map, route_map_old, chunk)
+            route_map_old = util.table_copy_yield(route_map, chunk)
+            if next(route_map_old) then
+                sleep_time = consts.BUCKET_SENT_GARBAGE_DELAY
+            else
+                sleep_time = consts.TIMEOUT_INFINITY
             end
+            route_map_deadline = clock() + sleep_time
         end
+        bucket_generation_current = M.bucket_generation
 
-        if clock() - buckets_for_redirect_ts >=
-           consts.BUCKET_SENT_GARBAGE_DELAY then
-            status, err = gc_bucket_drop(buckets_for_redirect,
-                                         consts.BUCKET.SENT)
-            if not status then
-                buckets_for_redirect = {}
-                empty_sent_buckets = {}
-                bucket_generation_collected = -1
-                log.error('Error during deletion of empty sent buckets: %s',
-                          err)
-            elseif M.module_version ~= module_version then
-                return
+        if bucket_generation_current ~= bucket_generation_collected then
+            -- Generation was changed during collection. Or *by* collection.
+            if status then
+                -- Retry immediately. If the generation was changed by the
+                -- collection itself, it will notice it next iteration, and go
+                -- to proper sleep.
+                sleep_time = 0
             else
-                buckets_for_redirect = empty_sent_buckets or {}
-                empty_sent_buckets = nil
-                buckets_for_redirect_ts = clock()
+                -- An error happened during the collection. Does not make sense
+                -- to retry on each iteration of the event loop. The most likely
+                -- errors are either a WAL error or a transaction abort - both
+                -- look like an issue in the user's code and can't be fixed
+                -- quickly anyway. Backoff.
+                sleep_time = consts.GC_BACKOFF_INTERVAL
             end
         end
-::continue::
-        lfiber.sleep(M.collect_bucket_garbage_interval)
+
+        if M.module_version == module_version then
+            M.bucket_generation_cond:wait(sleep_time)
+        end
     end
 end
 
@@ -2421,8 +2413,6 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
         vshard_cfg.rebalancer_disbalance_threshold
     M.rebalancer_receiving_quota = vshard_cfg.rebalancer_max_receiving
     M.shard_index = vshard_cfg.shard_index
-    M.collect_bucket_garbage_interval =
-        vshard_cfg.collect_bucket_garbage_interval
     M.collect_lua_garbage = vshard_cfg.collect_lua_garbage
     M.rebalancer_worker_count = vshard_cfg.rebalancer_max_sending
     M.current_cfg = cfg
@@ -2676,6 +2666,9 @@ else
         storage_cfg(M.current_cfg, M.this_replica.uuid, true)
     end
     M.module_version = M.module_version + 1
+    -- Background fibers could sleep waiting for bucket changes.
+    -- Let them know it is time to reload.
+    bucket_generation_increment()
 end
 
 M.recovery_f = recovery_f
@@ -2686,7 +2679,7 @@ M.gc_bucket_f = gc_bucket_f
 -- These functions are saved in M not for atomic reload, but for
 -- unit testing.
 --
-M.gc_bucket_step_by_type = gc_bucket_step_by_type
+M.gc_bucket_drop = gc_bucket_drop
 M.rebalancer_build_routes = rebalancer_build_routes
 M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
 M.cached_find_sharded_spaces = find_sharded_spaces
diff --git a/vshard/storage/reload_evolution.lua b/vshard/storage/reload_evolution.lua
index f38af74..484f499 100644
--- a/vshard/storage/reload_evolution.lua
+++ b/vshard/storage/reload_evolution.lua
@@ -4,6 +4,7 @@
 -- in a commit.
 --
 local log = require('log')
+local fiber = require('fiber')
 
 --
 -- Array of upgrade functions.
@@ -25,6 +26,13 @@ migrations[#migrations + 1] = function(M)
     end
 end
 
+migrations[#migrations + 1] = function(M)
+    if not M.route_map then
+        M.bucket_generation_cond = fiber.cond()
+        M.route_map = {}
+    end
+end
+
 --
 -- Perform an update based on a version stored in `M` (internals).
 -- @param M Old module internals which should be updated.
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list