From mboxrd@z Thu Jan 1 00:00:00 1970
To: Vladislav Shpilevoy, tarantool-patches@dev.tarantool.org, yaroslav.dynnikov@tarantool.org
References: <657bade6ef0ecf12b77c1a037d8326552f761002.1612914070.git.v.shpilevoy@tarantool.org> <68b5d6a6-1fb4-913d-61ac-e3062683e325@tarantool.org>
Date: Thu, 11 Feb 2021 09:50:58 +0300
Content-Type: text/plain; charset=utf-8; format=flowed
Subject: Re: [Tarantool-patches] [PATCH 7/9] gc: introduce reactive garbage collector
List-Id: Tarantool development patches
From: Oleg Babin via Tarantool-patches
Reply-To: Oleg Babin

Thanks for your fixes! LGTM.

On 11/02/2021 01:35, Vladislav Shpilevoy wrote:
> Thanks for the review!
>
> On 10.02.2021 10:00, Oleg Babin wrote:
>> Thanks for your patch.
>>
>> As I can see, you've introduced some new parameters: "LUA_CHUNK_SIZE"
>> and "GC_BACKOFF_INTERVAL".
> I decided not to go into too much detail and not to describe private
> constants in the commit message. GC_BACKOFF_INTERVAL is explained
> in the place where it is used. LUA_CHUNK_SIZE is quite obvious if
> you look at its usage.
>
>> I think it's better to describe them in the commit message to make it
>> clearer how the new algorithm works.
> These constants are not super relevant to the algorithm's core
> idea. It does not matter much for the reactive GC concept whether I
> yield in table utility functions, or whether I have a backoff timeout.
> These could be considered 'optimizations' or 'amendments'. I would
> consider them small details not worth mentioning in the commit
> message.
>
>> I see that you didn't update the comment above the "gc_bucket_f"
>> function. Is it still relevant?
> No, it is irrelevant, thanks for noticing. Here is the diff:
>
> ====================
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 99f92a0..1ea8069 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -1543,14 +1543,16 @@ local function gc_bucket_drop(status, route_map)
>  end
>  
>  --
> --- Garbage collector. Works on masters. The garbage collector
> --- wakes up once per specified time.
> +-- Garbage collector. Works on masters. The garbage collector wakes up when
> +-- state of any bucket changes.
>  -- After wakeup it follows the plan:
> --- 1) Check if _bucket has changed. If not, then sleep again;
> --- 2) Scan user spaces for sent and garbage buckets, delete
> ---    garbage data in batches of limited size;
> --- 3) Delete GARBAGE buckets from _bucket immediately, and
> ---    schedule SENT buckets for deletion after a timeout;
> +-- 1) Check if state of any bucket has really changed. If not, then sleep again;
> +-- 2) Delete all GARBAGE and SENT buckets along with their data in chunks of
> +--    limited size.
> +-- 3) Bucket destinations are saved into a global route_map to reroute incoming
> +--    requests from routers in case they didn't notice the buckets being moved.
> +--    The saved routes are scheduled for deletion after a timeout, which is
> +--    checked on each iteration of this loop.
>  -- 4) Sleep, go to (1).
>  -- For each step details see comments in the code.
>  --
> ====================
>
> The full new patch below.
>
> ====================
> gc: introduce reactive garbage collector
>
> Garbage collector is a fiber on a master node which deletes
> GARBAGE and SENT buckets along with their data.
>
> It was proactive. It used to wake up with a constant period to
> find and delete the needed buckets.
>
> But this won't work with the future feature called 'map-reduce'.
> Map-reduce as a preparation stage will need to ensure that all
> buckets on a storage are readable and writable. With the current
> GC algorithm, if a bucket is sent, it won't be deleted for the next
> 5 seconds by default. During this time all new map-reduce requests
> can't execute.
>
> This is not acceptable, and neither is a too frequent wakeup of the
> GC fiber, because it would waste TX thread time.
>
> The patch makes the GC fiber wake up not on a timeout but on events
> happening with the _bucket space. The GC fiber sleeps on a condition
> variable which is signaled when _bucket is changed.
>
> Once GC sees work to do, it won't sleep until it is done. It will
> only yield.
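[Editor's note: the event-driven wakeup described above can be sketched roughly as below. This is a simplified illustration of the idea, not the actual vshard code; `gc_step` is a hypothetical placeholder for the chunked SENT/GARBAGE deletion, while `bucket_generation`, the condition variable, and the `_bucket` on_replace trigger mirror names visible in the patch.]

```lua
local fiber = require('fiber')

-- Bumped by a trigger on the _bucket space; the condition variable
-- is what the GC fiber sleeps on between events.
local bucket_generation = 0
local bucket_generation_cond = fiber.cond()

-- Installed via _bucket:on_replace(): every change to _bucket bumps
-- the generation and wakes up whoever waits on the condvar.
local function on_bucket_replace()
    bucket_generation = bucket_generation + 1
    bucket_generation_cond:broadcast()
end

-- GC fiber body: instead of sleeping for a constant interval, sleep
-- until the generation changes, then work until nothing is left.
local function gc_bucket_f()
    local checked_generation = 0
    while true do
        if checked_generation == bucket_generation then
            -- No new events - sleep until _bucket changes.
            bucket_generation_cond:wait()
        else
            -- Remember the generation before the step: if _bucket
            -- changes during the step, the loop runs again.
            local generation = bucket_generation
            gc_step() -- Hypothetical: drop SENT/GARBAGE data in chunks.
            checked_generation = generation
        end
    end
end
```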
>
> This makes GC delete SENT and GARBAGE buckets as soon as possible,
> reducing the waiting time for the incoming map-reduce requests.
>
> Needed for #147
>
> @TarantoolBot document
> Title: VShard: deprecate cfg option 'collect_bucket_garbage_interval'
> It was used to specify the interval between bucket garbage
> collection steps. It was needed because garbage collection in
> vshard was proactive. It didn't react to newly appeared garbage
> buckets immediately.
>
> Since 0.1.17, garbage collection has become reactive. It starts
> working on garbage buckets immediately as they appear, and
> sleeps the rest of the time. The option is not used now and does
> not affect any behaviour.
>
> I suppose it can be deleted from the documentation. Or left with
> a big 'deprecated' label + the explanation above.
>
> An attempt to use the option does not cause an error, but logs a
> warning.
>
> diff --git a/test/lua_libs/storage_template.lua b/test/lua_libs/storage_template.lua
> index 21409bd..8df89f6 100644
> --- a/test/lua_libs/storage_template.lua
> +++ b/test/lua_libs/storage_template.lua
> @@ -172,6 +172,5 @@ function wait_bucket_is_collected(id)
>          return true
>      end
>      vshard.storage.recovery_wakeup()
> -    vshard.storage.garbage_collector_wakeup()
>  end)
>  end
> diff --git a/test/misc/reconfigure.result b/test/misc/reconfigure.result
> index 168be5d..3b34841 100644
> --- a/test/misc/reconfigure.result
> +++ b/test/misc/reconfigure.result
> @@ -83,9 +83,6 @@ cfg.collect_lua_garbage = true
>  cfg.rebalancer_max_receiving = 1000
>  ---
>  ...
> -cfg.collect_bucket_garbage_interval = 100
> ----
> -...
>  cfg.invalid_option = 'kek'
>  ---
>  ...
> @@ -105,10 +102,6 @@ vshard.storage.internal.rebalancer_max_receiving ~= 1000
>  ---
>  - true
>  ...
> -vshard.storage.internal.collect_bucket_garbage_interval ~= 100
> ----
> -- true
> -...
>  cfg.sync_timeout = nil
>  ---
>  ...
> @@ -118,9 +111,6 @@ cfg.collect_lua_garbage = nil
>  ---
>  ...
>  cfg.rebalancer_max_receiving = nil
>  ---
>  ...
> -cfg.collect_bucket_garbage_interval = nil > ---- > -... > cfg.invalid_option = nil > --- > ... > diff --git a/test/misc/reconfigure.test.lua b/test/misc/reconfigure.test.lua > index e891010..348628c 100644 > --- a/test/misc/reconfigure.test.lua > +++ b/test/misc/reconfigure.test.lua > @@ -33,17 +33,14 @@ vshard.storage.internal.sync_timeout > cfg.sync_timeout = 100 > cfg.collect_lua_garbage = true > cfg.rebalancer_max_receiving = 1000 > -cfg.collect_bucket_garbage_interval = 100 > cfg.invalid_option = 'kek' > vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a) > not vshard.storage.internal.collect_lua_garbage > vshard.storage.internal.sync_timeout > vshard.storage.internal.rebalancer_max_receiving ~= 1000 > -vshard.storage.internal.collect_bucket_garbage_interval ~= 100 > cfg.sync_timeout = nil > cfg.collect_lua_garbage = nil > cfg.rebalancer_max_receiving = nil > -cfg.collect_bucket_garbage_interval = nil > cfg.invalid_option = nil > > -- > diff --git a/test/rebalancer/bucket_ref.result b/test/rebalancer/bucket_ref.result > index b8fc7ff..9df7480 100644 > --- a/test/rebalancer/bucket_ref.result > +++ b/test/rebalancer/bucket_ref.result > @@ -184,9 +184,6 @@ vshard.storage.bucket_unref(1, 'read') > - true > ... > -- Force GC to take an RO lock on the bucket now. > -vshard.storage.garbage_collector_wakeup() > ---- > -... > vshard.storage.buckets_info(1) > --- > - 1: > @@ -203,7 +200,6 @@ while true do > if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then > break > end > - vshard.storage.garbage_collector_wakeup() > fiber.sleep(0.01) > end; > --- > @@ -235,14 +231,6 @@ finish_refs = true > while f1:status() ~= 'dead' do fiber.sleep(0.01) end > --- > ... > -vshard.storage.buckets_info(1) > ---- > -- 1: > - status: garbage > - ro_lock: true > - destination: > - id: 1 > -... > wait_bucket_is_collected(1) > --- > ... 
> diff --git a/test/rebalancer/bucket_ref.test.lua b/test/rebalancer/bucket_ref.test.lua > index 213ced3..1b032ff 100644 > --- a/test/rebalancer/bucket_ref.test.lua > +++ b/test/rebalancer/bucket_ref.test.lua > @@ -56,7 +56,6 @@ vshard.storage.bucket_unref(1, 'write') -- Error, no refs. > vshard.storage.bucket_ref(1, 'read') > vshard.storage.bucket_unref(1, 'read') > -- Force GC to take an RO lock on the bucket now. > -vshard.storage.garbage_collector_wakeup() > vshard.storage.buckets_info(1) > _ = test_run:cmd("setopt delimiter ';'") > while true do > @@ -64,7 +63,6 @@ while true do > if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then > break > end > - vshard.storage.garbage_collector_wakeup() > fiber.sleep(0.01) > end; > _ = test_run:cmd("setopt delimiter ''"); > @@ -72,7 +70,6 @@ vshard.storage.buckets_info(1) > vshard.storage.bucket_refro(1) > finish_refs = true > while f1:status() ~= 'dead' do fiber.sleep(0.01) end > -vshard.storage.buckets_info(1) > wait_bucket_is_collected(1) > _ = test_run:switch('box_2_a') > vshard.storage.buckets_info(1) > diff --git a/test/rebalancer/errinj.result b/test/rebalancer/errinj.result > index e50eb72..0ddb1c9 100644 > --- a/test/rebalancer/errinj.result > +++ b/test/rebalancer/errinj.result > @@ -226,17 +226,6 @@ ret2, err2 > - true > - null > ... > -_bucket:get{35} > ---- > -- [35, 'sent', ''] > -... > -_bucket:get{36} > ---- > -- [36, 'sent', ''] > -... > --- Buckets became 'active' on box_2_a, but still are sending on > --- box_1_a. Wait until it is marked as garbage on box_1_a by the > --- recovery fiber. > wait_bucket_is_collected(35) > --- > ... 
> diff --git a/test/rebalancer/errinj.test.lua b/test/rebalancer/errinj.test.lua > index 2cc4a69..a60f3d7 100644 > --- a/test/rebalancer/errinj.test.lua > +++ b/test/rebalancer/errinj.test.lua > @@ -102,11 +102,6 @@ _ = test_run:switch('box_1_a') > while f1:status() ~= 'dead' or f2:status() ~= 'dead' do fiber.sleep(0.001) end > ret1, err1 > ret2, err2 > -_bucket:get{35} > -_bucket:get{36} > --- Buckets became 'active' on box_2_a, but still are sending on > --- box_1_a. Wait until it is marked as garbage on box_1_a by the > --- recovery fiber. > wait_bucket_is_collected(35) > wait_bucket_is_collected(36) > _ = test_run:switch('box_2_a') > diff --git a/test/rebalancer/receiving_bucket.result b/test/rebalancer/receiving_bucket.result > index 7d3612b..ad93445 100644 > --- a/test/rebalancer/receiving_bucket.result > +++ b/test/rebalancer/receiving_bucket.result > @@ -366,14 +366,6 @@ vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3}) > --- > - true > ... > -vshard.storage.buckets_info(1) > ---- > -- 1: > - status: sent > - ro_lock: true > - destination: > - id: 1 > -... > wait_bucket_is_collected(1) > --- > ... > diff --git a/test/rebalancer/receiving_bucket.test.lua b/test/rebalancer/receiving_bucket.test.lua > index 24534b3..2cf6382 100644 > --- a/test/rebalancer/receiving_bucket.test.lua > +++ b/test/rebalancer/receiving_bucket.test.lua > @@ -136,7 +136,6 @@ box.space.test3:select{100} > -- Now the bucket is unreferenced and can be transferred. 
> _ = test_run:switch('box_2_a') > vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3}) > -vshard.storage.buckets_info(1) > wait_bucket_is_collected(1) > vshard.storage.buckets_info(1) > _ = test_run:switch('box_1_a') > diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result > index 753687f..9d30a04 100644 > --- a/test/reload_evolution/storage.result > +++ b/test/reload_evolution/storage.result > @@ -92,7 +92,7 @@ test_run:grep_log('storage_2_a', 'vshard.storage.reload_evolution: upgraded to') > ... > vshard.storage.internal.reload_version > --- > -- 2 > +- 3 > ... > -- > -- gh-237: should be only one trigger. During gh-237 the trigger installation > diff --git a/test/router/reroute_wrong_bucket.result b/test/router/reroute_wrong_bucket.result > index 049bdef..ac340eb 100644 > --- a/test/router/reroute_wrong_bucket.result > +++ b/test/router/reroute_wrong_bucket.result > @@ -37,7 +37,7 @@ test_run:switch('storage_1_a') > --- > - true > ... > -cfg.collect_bucket_garbage_interval = 100 > +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100 > --- > ... > vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a) > @@ -53,7 +53,7 @@ test_run:switch('storage_2_a') > --- > - true > ... > -cfg.collect_bucket_garbage_interval = 100 > +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100 > --- > ... > vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a) > @@ -202,12 +202,12 @@ test_run:grep_log('router_1', 'please update configuration') > err > --- > - bucket_id: 100 > - reason: write is prohibited > + reason: Not found > code: 1 > destination: ac522f65-aa94-4134-9f64-51ee384f1a54 > type: ShardingError > name: WRONG_BUCKET > - message: 'Cannot perform action with bucket 100, reason: write is prohibited' > + message: 'Cannot perform action with bucket 100, reason: Not found' > ... > -- > -- Now try again, but update configuration during call(). 
It must > diff --git a/test/router/reroute_wrong_bucket.test.lua b/test/router/reroute_wrong_bucket.test.lua > index 9e6e804..207aac3 100644 > --- a/test/router/reroute_wrong_bucket.test.lua > +++ b/test/router/reroute_wrong_bucket.test.lua > @@ -11,13 +11,13 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt > test_run:cmd('create server router_1 with script="router/router_1.lua"') > test_run:cmd('start server router_1') > test_run:switch('storage_1_a') > -cfg.collect_bucket_garbage_interval = 100 > +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100 > vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a) > vshard.storage.rebalancer_disable() > for i = 1, 100 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end > > test_run:switch('storage_2_a') > -cfg.collect_bucket_garbage_interval = 100 > +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100 > vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a) > vshard.storage.rebalancer_disable() > for i = 101, 200 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end > diff --git a/test/storage/recovery.result b/test/storage/recovery.result > index f833fe7..8ccb0b9 100644 > --- a/test/storage/recovery.result > +++ b/test/storage/recovery.result > @@ -79,8 +79,7 @@ _bucket = box.space._bucket > ... > _bucket:select{} > --- > -- - [2, 'garbage', ''] > - - [3, 'garbage', ''] > +- [] > ... > _ = test_run:switch('storage_2_a') > --- > diff --git a/test/storage/storage.result b/test/storage/storage.result > index 424bc4c..0550ad1 100644 > --- a/test/storage/storage.result > +++ b/test/storage/storage.result > @@ -547,6 +547,9 @@ vshard.storage.bucket_send(1, util.replicasets[2]) > --- > - true > ... > +wait_bucket_is_collected(1) > +--- > +... > _ = test_run:switch("storage_2_a") > --- > ... > @@ -567,12 +570,7 @@ _ = test_run:switch("storage_1_a") > ... 
> vshard.storage.buckets_info() > --- > -- 1: > - status: sent > - ro_lock: true > - destination: > - id: 1 > - 2: > +- 2: > status: active > id: 2 > ... > diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua > index d631b51..d8fbd94 100644 > --- a/test/storage/storage.test.lua > +++ b/test/storage/storage.test.lua > @@ -136,6 +136,7 @@ vshard.storage.bucket_send(1, util.replicasets[1]) > > -- Successful transfer. > vshard.storage.bucket_send(1, util.replicasets[2]) > +wait_bucket_is_collected(1) > _ = test_run:switch("storage_2_a") > vshard.storage.buckets_info() > _ = test_run:switch("storage_1_a") > diff --git a/test/unit/config.result b/test/unit/config.result > index dfd0219..e0b2482 100644 > --- a/test/unit/config.result > +++ b/test/unit/config.result > @@ -428,33 +428,6 @@ _ = lcfg.check(cfg) > -- > -- gh-77: garbage collection options. > -- > -cfg.collect_bucket_garbage_interval = 'str' > ---- > -... > -check(cfg) > ---- > -- Garbage bucket collect interval must be positive number > -... > -cfg.collect_bucket_garbage_interval = 0 > ---- > -... > -check(cfg) > ---- > -- Garbage bucket collect interval must be positive number > -... > -cfg.collect_bucket_garbage_interval = -1 > ---- > -... > -check(cfg) > ---- > -- Garbage bucket collect interval must be positive number > -... > -cfg.collect_bucket_garbage_interval = 100.5 > ---- > -... > -_ = lcfg.check(cfg) > ---- > -... > cfg.collect_lua_garbage = 100 > --- > ... > @@ -615,6 +588,12 @@ lcfg.check(cfg).rebalancer_max_sending > cfg.rebalancer_max_sending = nil > --- > ... > -cfg.sharding = nil > +-- > +-- Deprecated option does not break anything. > +-- > +cfg.collect_bucket_garbage_interval = 100 > +--- > +... > +_ = lcfg.check(cfg) > --- > ... 
> diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua > index ada43db..a1c9f07 100644 > --- a/test/unit/config.test.lua > +++ b/test/unit/config.test.lua > @@ -175,15 +175,6 @@ _ = lcfg.check(cfg) > -- > -- gh-77: garbage collection options. > -- > -cfg.collect_bucket_garbage_interval = 'str' > -check(cfg) > -cfg.collect_bucket_garbage_interval = 0 > -check(cfg) > -cfg.collect_bucket_garbage_interval = -1 > -check(cfg) > -cfg.collect_bucket_garbage_interval = 100.5 > -_ = lcfg.check(cfg) > - > cfg.collect_lua_garbage = 100 > check(cfg) > cfg.collect_lua_garbage = true > @@ -244,4 +235,9 @@ util.check_error(lcfg.check, cfg) > cfg.rebalancer_max_sending = 15 > lcfg.check(cfg).rebalancer_max_sending > cfg.rebalancer_max_sending = nil > -cfg.sharding = nil > + > +-- > +-- Deprecated option does not break anything. > +-- > +cfg.collect_bucket_garbage_interval = 100 > +_ = lcfg.check(cfg) > diff --git a/test/unit/garbage.result b/test/unit/garbage.result > index 74d9ccf..a530496 100644 > --- a/test/unit/garbage.result > +++ b/test/unit/garbage.result > @@ -31,9 +31,6 @@ test_run:cmd("setopt delimiter ''"); > vshard.storage.internal.shard_index = 'bucket_id' > --- > ... > -vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL > ---- > -... > -- > -- Find nothing if no bucket_id anywhere, or there is no index > -- by it, or bucket_id is not unsigned. > @@ -151,6 +148,9 @@ format[1] = {name = 'id', type = 'unsigned'} > format[2] = {name = 'status', type = 'string'} > --- > ... > +format[3] = {name = 'destination', type = 'string', is_nullable = true} > +--- > +... > _bucket = box.schema.create_space('_bucket', {format = format}) > --- > ... > @@ -172,22 +172,6 @@ _bucket:replace{3, vshard.consts.BUCKET.ACTIVE} > --- > - [3, 'active'] > ... > -_bucket:replace{4, vshard.consts.BUCKET.SENT} > ---- > -- [4, 'sent'] > -... > -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE} > ---- > -- [5, 'garbage'] > -... 
> -_bucket:replace{6, vshard.consts.BUCKET.GARBAGE} > ---- > -- [6, 'garbage'] > -... > -_bucket:replace{200, vshard.consts.BUCKET.GARBAGE} > ---- > -- [200, 'garbage'] > -... > s = box.schema.create_space('test', {engine = engine}) > --- > ... > @@ -213,7 +197,7 @@ s:replace{4, 2} > --- > - [4, 2] > ... > -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type > +gc_bucket_drop = vshard.storage.internal.gc_bucket_drop > --- > ... > s2 = box.schema.create_space('test2', {engine = engine}) > @@ -249,6 +233,10 @@ function fill_spaces_with_garbage() > s2:replace{6, 4} > s2:replace{7, 5} > s2:replace{7, 6} > + _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'} > + _bucket:replace{5, vshard.consts.BUCKET.GARBAGE} > + _bucket:replace{6, vshard.consts.BUCKET.GARBAGE, 'destination2'} > + _bucket:replace{200, vshard.consts.BUCKET.GARBAGE} > end; > --- > ... > @@ -267,12 +255,22 @@ fill_spaces_with_garbage() > --- > - 1107 > ... > -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) > +route_map = {} > +--- > +... > +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map) > --- > -- - 5 > - - 6 > - - 200 > - true > +- null > +... > +route_map > +--- > +- - null > + - null > + - null > + - null > + - null > + - destination2 > ... > #s2:select{} > --- > @@ -282,10 +280,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) > --- > - 7 > ... > -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) > +route_map = {} > +--- > +... > +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map) > --- > -- - 4 > - true > +- null > +... > +route_map > +--- > +- - null > + - null > + - null > + - destination1 > ... > s2:select{} > --- > @@ -303,17 +311,22 @@ s:select{} > - [6, 100] > ... > -- Nothing deleted - update collected generation. > -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) > +route_map = {} > +--- > +... > +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map) > --- > -- - 5 > - - 6 > - - 200 > - true > +- null > ... 
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) > +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map) > --- > -- - 4 > - true > +- null > +... > +route_map > +--- > +- [] > ... > #s2:select{} > --- > @@ -329,15 +342,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) > fill_spaces_with_garbage() > --- > ... > -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end) > +_ = _bucket:on_replace(function() \ > + local gen = vshard.storage.internal.bucket_generation \ > + vshard.storage.internal.bucket_generation = gen + 1 \ > + vshard.storage.internal.bucket_generation_cond:broadcast() \ > +end) > --- > ... > f = fiber.create(vshard.storage.internal.gc_bucket_f) > --- > ... > -- Wait until garbage collection is finished. > -while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end > +test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end) > --- > +- true > ... > s:select{} > --- > @@ -360,7 +378,6 @@ _bucket:select{} > - - [1, 'active'] > - [2, 'receiving'] > - [3, 'active'] > - - [4, 'sent'] > ... > -- > -- Test deletion of 'sent' buckets after a specified timeout. > @@ -370,8 +387,9 @@ _bucket:replace{2, vshard.consts.BUCKET.SENT} > - [2, 'sent'] > ... > -- Wait deletion after a while. > -while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end > +test_run:wait_cond(function() return not _bucket:get{2} end) > --- > +- true > ... > _bucket:select{} > --- > @@ -410,8 +428,9 @@ _bucket:replace{4, vshard.consts.BUCKET.SENT} > --- > - [4, 'sent'] > ... > -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end > +test_run:wait_cond(function() return not _bucket:get{4} end) > --- > +- true > ... > -- > -- Test WAL errors during deletion from _bucket. > @@ -434,11 +453,14 @@ s:replace{6, 4} > --- > - [6, 4] > ... 
> -while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end > +test_run:wait_log('default', 'Error during garbage collection step', \ > + 65536, 10) > --- > +- Error during garbage collection step > ... > -while #sk:select{4} ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end > +test_run:wait_cond(function() return #sk:select{4} == 0 end) > --- > +- true > ... > s:select{} > --- > @@ -454,8 +476,9 @@ _bucket:select{} > _ = _bucket:on_replace(nil, rollback_on_delete) > --- > ... > -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end > +test_run:wait_cond(function() return not _bucket:get{4} end) > --- > +- true > ... > f:cancel() > --- > @@ -562,8 +585,9 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i, > f = fiber.create(vshard.storage.internal.gc_bucket_f) > --- > ... > -while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end > +test_run:wait_cond(function() return _bucket:count() == 0 end) > --- > +- true > ... 
> _bucket:select{} > --- > diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua > index 30079fa..250afb0 100644 > --- a/test/unit/garbage.test.lua > +++ b/test/unit/garbage.test.lua > @@ -15,7 +15,6 @@ end; > test_run:cmd("setopt delimiter ''"); > > vshard.storage.internal.shard_index = 'bucket_id' > -vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL > > -- > -- Find nothing if no bucket_id anywhere, or there is no index > @@ -75,16 +74,13 @@ s:drop() > format = {} > format[1] = {name = 'id', type = 'unsigned'} > format[2] = {name = 'status', type = 'string'} > +format[3] = {name = 'destination', type = 'string', is_nullable = true} > _bucket = box.schema.create_space('_bucket', {format = format}) > _ = _bucket:create_index('pk') > _ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false}) > _bucket:replace{1, vshard.consts.BUCKET.ACTIVE} > _bucket:replace{2, vshard.consts.BUCKET.RECEIVING} > _bucket:replace{3, vshard.consts.BUCKET.ACTIVE} > -_bucket:replace{4, vshard.consts.BUCKET.SENT} > -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE} > -_bucket:replace{6, vshard.consts.BUCKET.GARBAGE} > -_bucket:replace{200, vshard.consts.BUCKET.GARBAGE} > > s = box.schema.create_space('test', {engine = engine}) > pk = s:create_index('pk') > @@ -94,7 +90,7 @@ s:replace{2, 1} > s:replace{3, 2} > s:replace{4, 2} > > -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type > +gc_bucket_drop = vshard.storage.internal.gc_bucket_drop > s2 = box.schema.create_space('test2', {engine = engine}) > pk2 = s2:create_index('pk') > sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false}) > @@ -114,6 +110,10 @@ function fill_spaces_with_garbage() > s2:replace{6, 4} > s2:replace{7, 5} > s2:replace{7, 6} > + _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'} > + _bucket:replace{5, vshard.consts.BUCKET.GARBAGE} > + _bucket:replace{6, 
vshard.consts.BUCKET.GARBAGE, 'destination2'} > + _bucket:replace{200, vshard.consts.BUCKET.GARBAGE} > end; > test_run:cmd("setopt delimiter ''"); > > @@ -121,15 +121,21 @@ fill_spaces_with_garbage() > > #s2:select{} > #s:select{} > -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) > +route_map = {} > +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map) > +route_map > #s2:select{} > #s:select{} > -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) > +route_map = {} > +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map) > +route_map > s2:select{} > s:select{} > -- Nothing deleted - update collected generation. > -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) > -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) > +route_map = {} > +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map) > +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map) > +route_map > #s2:select{} > #s:select{} > > @@ -137,10 +143,14 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) > -- Test continuous garbage collection via background fiber. > -- > fill_spaces_with_garbage() > -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end) > +_ = _bucket:on_replace(function() \ > + local gen = vshard.storage.internal.bucket_generation \ > + vshard.storage.internal.bucket_generation = gen + 1 \ > + vshard.storage.internal.bucket_generation_cond:broadcast() \ > +end) > f = fiber.create(vshard.storage.internal.gc_bucket_f) > -- Wait until garbage collection is finished. > -while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end > +test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end) > s:select{} > s2:select{} > -- Check garbage bucket is deleted by background fiber. > @@ -150,7 +160,7 @@ _bucket:select{} > -- > _bucket:replace{2, vshard.consts.BUCKET.SENT} > -- Wait deletion after a while. 
> -while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{2} end)
> _bucket:select{}
> s:select{}
> s2:select{}
> @@ -162,7 +172,7 @@ _bucket:replace{4, vshard.consts.BUCKET.ACTIVE}
> s:replace{5, 4}
> s:replace{6, 4}
> _bucket:replace{4, vshard.consts.BUCKET.SENT}
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
>
> --
> -- Test WAL errors during deletion from _bucket.
> --
> @@ -172,12 +182,13 @@ _ = _bucket:on_replace(rollback_on_delete)
> _bucket:replace{4, vshard.consts.BUCKET.SENT}
> s:replace{5, 4}
> s:replace{6, 4}
> -while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_log('default', 'Error during garbage collection step', \
> + 65536, 10)
> +test_run:wait_cond(function() return #sk:select{4} == 0 end)
> s:select{}
> _bucket:select{}
> _ = _bucket:on_replace(nil, rollback_on_delete)
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
>
> f:cancel()
>
> @@ -220,7 +231,7 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i,
> #s:select{}
> #s2:select{}
> f = fiber.create(vshard.storage.internal.gc_bucket_f)
> -while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return _bucket:count() == 0 end)
> _bucket:select{}
> s:select{}
> s2:select{}
> diff --git a/test/unit/garbage_errinj.result b/test/unit/garbage_errinj.result
> deleted file mode 100644
> index 92c8039..0000000
> --- a/test/unit/garbage_errinj.result
> +++ /dev/null
> @@ -1,223 +0,0 @@
> -test_run = require('test_run').new()
> ----
> -...
> -vshard = require('vshard')
> ----
> -...
> -fiber = require('fiber')
> ----
> -...
> -engine = test_run:get_cfg('engine')
> ----
> -...
> -vshard.storage.internal.shard_index = 'bucket_id'
> ----
> -...
> -format = {}
> ----
> -...
> -format[1] = {name = 'id', type = 'unsigned'}
> ----
> -...
> -format[2] = {name = 'status', type = 'string', is_nullable = true}
> ----
> -...
> -_bucket = box.schema.create_space('_bucket', {format = format})
> ----
> -...
> -_ = _bucket:create_index('pk')
> ----
> -...
> -_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
> ----
> -...
> -_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
> ----
> -- [1, 'active']
> -...
> -_bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
> ----
> -- [2, 'receiving']
> -...
> -_bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
> ----
> -- [3, 'active']
> -...
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> ----
> -- [4, 'sent']
> -...
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [5, 'garbage']
> -...
> -s = box.schema.create_space('test', {engine = engine})
> ----
> -...
> -pk = s:create_index('pk')
> ----
> -...
> -sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> ----
> -...
> -s:replace{1, 1}
> ----
> -- [1, 1]
> -...
> -s:replace{2, 1}
> ----
> -- [2, 1]
> -...
> -s:replace{3, 2}
> ----
> -- [3, 2]
> -...
> -s:replace{4, 2}
> ----
> -- [4, 2]
> -...
> -s:replace{5, 100}
> ----
> -- [5, 100]
> -...
> -s:replace{6, 100}
> ----
> -- [6, 100]
> -...
> -s:replace{7, 4}
> ----
> -- [7, 4]
> -...
> -s:replace{8, 5}
> ----
> -- [8, 5]
> -...
> -s2 = box.schema.create_space('test2', {engine = engine})
> ----
> -...
> -pk2 = s2:create_index('pk')
> ----
> -...
> -sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> ----
> -...
> -s2:replace{1, 1}
> ----
> -- [1, 1]
> -...
> -s2:replace{3, 3}
> ----
> -- [3, 3]
> -...
> -for i = 7, 1107 do s:replace{i, 200} end
> ----
> -...
> -s2:replace{4, 200}
> ----
> -- [4, 200]
> -...
> -s2:replace{5, 100}
> ----
> -- [5, 100]
> -...
> -s2:replace{5, 300}
> ----
> -- [5, 300]
> -...
> -s2:replace{6, 4}
> ----
> -- [6, 4]
> -...
> -s2:replace{7, 5}
> ----
> -- [7, 5]
> -...
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> ----
> -...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> ----
> -- - 4
> -- true
> -...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> ----
> -- - 5
> -- true
> -...
> ---
> --- Test _bucket generation change during garbage buckets search.
> ---
> -s:truncate()
> ----
> -...
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> ----
> -...
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true
> ----
> -...
> -f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end)
> ----
> -...
> -_bucket:replace{4, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [4, 'garbage']
> -...
> -s:replace{5, 4}
> ----
> -- [5, 4]
> -...
> -s:replace{6, 4}
> ----
> -- [6, 4]
> -...
> -#s:select{}
> ----
> -- 2
> -...
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false
> ----
> -...
> -while f:status() ~= 'dead' do fiber.sleep(0.1) end
> ----
> -...
> --- Nothing is deleted - _bucket:replace() has changed _bucket
> --- generation during search of garbage buckets.
> -#s:select{}
> ----
> -- 2
> -...
> -_bucket:select{4}
> ----
> -- - [4, 'garbage']
> -...
> --- Next step deletes garbage ok.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> ----
> -- []
> -- true
> -...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> ----
> -- - 4
> - - 5
> -- true
> -...
> -#s:select{}
> ----
> -- 0
> -...
> -_bucket:delete{4}
> ----
> -- [4, 'garbage']
> -...
> -s2:drop()
> ----
> -...
> -s:drop()
> ----
> -...
> -_bucket:drop()
> ----
> -...
> diff --git a/test/unit/garbage_errinj.test.lua b/test/unit/garbage_errinj.test.lua
> deleted file mode 100644
> index 31184b9..0000000
> --- a/test/unit/garbage_errinj.test.lua
> +++ /dev/null
> @@ -1,73 +0,0 @@
> -test_run = require('test_run').new()
> -vshard = require('vshard')
> -fiber = require('fiber')
> -
> -engine = test_run:get_cfg('engine')
> -vshard.storage.internal.shard_index = 'bucket_id'
> -
> -format = {}
> -format[1] = {name = 'id', type = 'unsigned'}
> -format[2] = {name = 'status', type = 'string', is_nullable = true}
> -_bucket = box.schema.create_space('_bucket', {format = format})
> -_ = _bucket:create_index('pk')
> -_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
> -_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
> -_bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
> -_bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> -
> -s = box.schema.create_space('test', {engine = engine})
> -pk = s:create_index('pk')
> -sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> -s:replace{1, 1}
> -s:replace{2, 1}
> -s:replace{3, 2}
> -s:replace{4, 2}
> -s:replace{5, 100}
> -s:replace{6, 100}
> -s:replace{7, 4}
> -s:replace{8, 5}
> -
> -s2 = box.schema.create_space('test2', {engine = engine})
> -pk2 = s2:create_index('pk')
> -sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> -s2:replace{1, 1}
> -s2:replace{3, 3}
> -for i = 7, 1107 do s:replace{i, 200} end
> -s2:replace{4, 200}
> -s2:replace{5, 100}
> -s2:replace{5, 300}
> -s2:replace{6, 4}
> -s2:replace{7, 5}
> -
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> -
> ---
> --- Test _bucket generation change during garbage buckets search.
> ---
> -s:truncate()
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true
> -f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end)
> -_bucket:replace{4, vshard.consts.BUCKET.GARBAGE}
> -s:replace{5, 4}
> -s:replace{6, 4}
> -#s:select{}
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false
> -while f:status() ~= 'dead' do fiber.sleep(0.1) end
> --- Nothing is deleted - _bucket:replace() has changed _bucket
> --- generation during search of garbage buckets.
> -#s:select{}
> -_bucket:select{4}
> --- Next step deletes garbage ok.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> -#s:select{}
> -_bucket:delete{4}
> -
> -s2:drop()
> -s:drop()
> -_bucket:drop()
> diff --git a/vshard/cfg.lua b/vshard/cfg.lua
> index f7d5dbc..63d5414 100644
> --- a/vshard/cfg.lua
> +++ b/vshard/cfg.lua
> @@ -251,9 +251,8 @@ local cfg_template = {
> max = consts.REBALANCER_MAX_SENDING_MAX
> },
> collect_bucket_garbage_interval = {
> - type = 'positive number', name = 'Garbage bucket collect interval',
> - is_optional = true,
> - default = consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
> + name = 'Garbage bucket collect interval', is_deprecated = true,
> + reason = 'Has no effect anymore'
> },
> collect_lua_garbage = {
> type = 'boolean', name = 'Garbage Lua collect necessity',
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 8c2a8b0..3f1585a 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -23,6 +23,7 @@ return {
> DEFAULT_BUCKET_COUNT = 3000;
> BUCKET_SENT_GARBAGE_DELAY = 0.5;
> BUCKET_CHUNK_SIZE = 1000;
> + LUA_CHUNK_SIZE = 100000,
> DEFAULT_REBALANCER_DISBALANCE_THRESHOLD = 1;
> REBALANCER_IDLE_INTERVAL = 60 * 60;
> REBALANCER_WORK_INTERVAL = 10;
> @@ -37,7 +38,7 @@ return {
> DEFAULT_FAILOVER_PING_TIMEOUT = 5;
> DEFAULT_SYNC_TIMEOUT = 1;
> RECONNECT_TIMEOUT = 0.5;
> - DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
> + GC_BACKOFF_INTERVAL = 5,
> RECOVERY_INTERVAL = 5;
> COLLECT_LUA_GARBAGE_INTERVAL = 100;
>
> @@ -45,4 +46,6 @@ return {
> DISCOVERY_WORK_INTERVAL = 1,
> DISCOVERY_WORK_STEP = 0.01,
> DISCOVERY_TIMEOUT = 10,
> +
> + TIMEOUT_INFINITY = 500 * 365 * 86400,
> }
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index adf1c20..1ea8069 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -69,7 +69,6 @@ if not M then
> total_bucket_count = 0,
> errinj = {
> ERRINJ_CFG = false,
> - ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
> ERRINJ_RELOAD = false,
> ERRINJ_CFG_DELAY = false,
> ERRINJ_LONG_RECEIVE = false,
> @@ -96,6 +95,8 @@ if not M then
> -- detect that _bucket was not changed between yields.
> --
> bucket_generation = 0,
> + -- Condition variable fired on generation update.
> + bucket_generation_cond = lfiber.cond(),
> --
> -- Reference to the function used as on_replace trigger on
> -- _bucket space. It is used to replace the trigger with
> @@ -107,12 +108,14 @@ if not M then
> -- replace the old function is to keep its reference.
> --
> bucket_on_replace = nil,
> + -- Redirects for recently sent buckets. They are kept for a while to
> + -- help routers find a new location for the sent and deleted buckets
> + -- without a whole-cluster scan.
> + route_map = {},
>
> ------------------- Garbage collection -------------------
> -- Fiber to remove garbage buckets data.
> collect_bucket_garbage_fiber = nil,
> - -- Do buckets garbage collection once per this time.
> - collect_bucket_garbage_interval = nil,
> -- Boolean lua_gc state (create periodic gc task).
> collect_lua_garbage = nil,
>
> @@ -173,6 +176,7 @@ end
> --
> local function bucket_generation_increment()
> M.bucket_generation = M.bucket_generation + 1
> + M.bucket_generation_cond:broadcast()
> end
>
> --
> @@ -758,8 +762,9 @@ local function bucket_check_state(bucket_id, mode)
> else
> return bucket
> end
> + local dst = bucket and bucket.destination or M.route_map[bucket_id]
> return bucket, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, reason,
> - bucket and bucket.destination)
> + dst)
> end
>
> --
> @@ -804,11 +809,23 @@ end
> --
> local function bucket_unrefro(bucket_id)
> local ref = M.bucket_refs[bucket_id]
> - if not ref or ref.ro == 0 then
> + local count = ref and ref.ro or 0
> + if count == 0 then
> return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id,
> "no refs", nil)
> end
> - ref.ro = ref.ro - 1
> + if count == 1 then
> + ref.ro = 0
> + if ref.ro_lock then
> + -- Garbage collector is waiting for the bucket if RO
> + -- is locked. Let it know it has one more bucket to
> + -- collect. It relies on the generation, so its
> + -- increment is enough.
> + bucket_generation_increment()
> + end
> + return true
> + end
> + ref.ro = count - 1
> return true
> end
>
> @@ -1481,79 +1498,44 @@ local function gc_bucket_in_space(space, bucket_id, status)
> end
>
> --
> --- Remove tuples from buckets of a specified type.
> --- @param type Type of buckets to gc.
> --- @retval List of ids of empty buckets of the type.
> +-- Drop buckets with the given status along with their data in all spaces.
> +-- @param status Status of target buckets.
> +-- @param route_map Destinations of deleted buckets are saved into this table.
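Given the new signature, a caller collects the redirects itself, the way the updated unit test above does. A hedged sketch of the expected usage (the status constants and `gc_bucket_drop` are from this patch; the surrounding logging code is illustrative):

```lua
-- Illustrative usage of the new gc_bucket_drop(status, route_map) API.
-- After the call, route_map maps each dropped bucket id to its
-- destination replicaset, so routers can be redirected without a full
-- cluster scan.
local route_map = {}
local ok, err = gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
if ok then
    for id, dst in pairs(route_map) do
        -- E.g. bucket 200 could map to 'destination2', as in the test.
        log.info('bucket %s was sent to %s', id, dst)
    end
else
    log.error('GC step failed: %s', err)
end
```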
> --
> -local function gc_bucket_step_by_type(type)
> - local sharded_spaces = find_sharded_spaces()
> - local empty_buckets = {}
> +local function gc_bucket_drop_xc(status, route_map)
> local limit = consts.BUCKET_CHUNK_SIZE
> - local is_all_collected = true
> - for _, bucket in box.space._bucket.index.status:pairs(type) do
> - local bucket_id = bucket.id
> - local ref = M.bucket_refs[bucket_id]
> + local _bucket = box.space._bucket
> + local sharded_spaces = find_sharded_spaces()
> + for _, b in _bucket.index.status:pairs(status) do
> + local id = b.id
> + local ref = M.bucket_refs[id]
> if ref then
> assert(ref.rw == 0)
> if ref.ro ~= 0 then
> ref.ro_lock = true
> - is_all_collected = false
> goto continue
> end
> - M.bucket_refs[bucket_id] = nil
> + M.bucket_refs[id] = nil
> end
> for _, space in pairs(sharded_spaces) do
> - gc_bucket_in_space_xc(space, bucket_id, type)
> + gc_bucket_in_space_xc(space, id, status)
> limit = limit - 1
> if limit == 0 then
> lfiber.sleep(0)
> limit = consts.BUCKET_CHUNK_SIZE
> end
> end
> - table.insert(empty_buckets, bucket.id)
> -::continue::
> + route_map[id] = b.destination
> + _bucket:delete{id}
> + ::continue::
> end
> - return empty_buckets, is_all_collected
> -end
> -
> ---
> --- Drop buckets with ids in the list.
> --- @param bucket_ids Bucket ids to drop.
> --- @param status Expected bucket status.
> ---
> -local function gc_bucket_drop_xc(bucket_ids, status)
> - if #bucket_ids == 0 then
> - return
> - end
> - local limit = consts.BUCKET_CHUNK_SIZE
> - box.begin()
> - local _bucket = box.space._bucket
> - for _, id in pairs(bucket_ids) do
> - local bucket_exists = _bucket:get{id} ~= nil
> - local b = _bucket:get{id}
> - if b then
> - if b.status ~= status then
> - return error(string.format('Bucket %d status is changed. Was '..
> - '%s, became %s', id, status,
> - b.status))
> - end
> - _bucket:delete{id}
> - end
> - limit = limit - 1
> - if limit == 0 then
> - box.commit()
> - box.begin()
> - limit = consts.BUCKET_CHUNK_SIZE
> - end
> - end
> - box.commit()
> end
>
> --
> -- Exception safe version of gc_bucket_drop_xc.
> --
> -local function gc_bucket_drop(bucket_ids, status)
> - local status, err = pcall(gc_bucket_drop_xc, bucket_ids, status)
> +local function gc_bucket_drop(status, route_map)
> + local status, err = pcall(gc_bucket_drop_xc, status, route_map)
> if not status then
> box.rollback()
> end
> @@ -1561,14 +1543,16 @@ local function gc_bucket_drop(bucket_ids, status)
> end
>
> --
> --- Garbage collector. Works on masters. The garbage collector
> --- wakes up once per specified time.
> +-- Garbage collector. Works on masters. The garbage collector wakes up when
> +-- state of any bucket changes.
> -- After wakeup it follows the plan:
> --- 1) Check if _bucket has changed. If not, then sleep again;
> --- 2) Scan user spaces for sent and garbage buckets, delete
> --- garbage data in batches of limited size;
> --- 3) Delete GARBAGE buckets from _bucket immediately, and
> --- schedule SENT buckets for deletion after a timeout;
> +-- 1) Check if state of any bucket has really changed. If not, then sleep again;
> +-- 2) Delete all GARBAGE and SENT buckets along with their data in chunks of
> +-- limited size.
> +-- 3) Bucket destinations are saved into a global route_map to reroute incoming
> +-- requests from routers in case they didn't notice the buckets being moved.
> +-- The saved routes are scheduled for deletion after a timeout, which is
> +-- checked on each iteration of this loop.
> -- 4) Sleep, go to (1).
> -- For each step details see comments in the code.
> --
> @@ -1580,65 +1564,75 @@ function gc_bucket_f()
> -- generation == bucket generation. In such a case the fiber
> -- does nothing until next _bucket change.
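The "reactive" wakeup described above boils down to a classic condition-variable loop. A simplified sketch of the new fiber's shape, with the route-map aging and error backoff stripped out (all names are from this patch; the simplification itself is mine):

```lua
-- Simplified shape of the reworked gc_bucket_f() loop: instead of a
-- fixed-interval fiber.sleep(), the fiber sleeps on a condition variable
-- that bucket_generation_increment() broadcasts on every _bucket change.
local collected = -1
while M.module_version == module_version do
    local current = M.bucket_generation
    if collected ~= current then
        -- Drop garbage/sent buckets and record their destinations.
        gc_bucket_drop(consts.BUCKET.GARBAGE, M.route_map)
        gc_bucket_drop(consts.BUCKET.SENT, M.route_map)
        collected = current
    end
    -- Returns early if broadcast() fires; otherwise times out.
    M.bucket_generation_cond:wait(consts.TIMEOUT_INFINITY)
end
```

The broadcast on reload (see the `bucket_generation_increment()` call added at the end of the module below) is what prevents the fiber from sleeping through a module upgrade.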
> local bucket_generation_collected = -1
> - -- Empty sent buckets are collected into an array. After a
> - -- specified time interval the buckets are deleted both from
> - -- this array and from _bucket space.
> - local buckets_for_redirect = {}
> - local buckets_for_redirect_ts = fiber_clock()
> - -- Empty sent buckets, updated after each step, and when
> - -- buckets_for_redirect is deleted, it gets empty_sent_buckets
> - -- for next deletion.
> - local empty_garbage_buckets, empty_sent_buckets, status, err
> + local bucket_generation_current = M.bucket_generation
> + -- Deleted buckets are saved into a route map to redirect routers if they
> + -- didn't discover the new location of the buckets yet. However, the route
> + -- map does not grow infinitely. Otherwise it would end up storing redirects
> + -- for all buckets in the cluster, which could also become outdated.
> + -- The garbage collector periodically drops old routes from the map. For
> + -- that it remembers the state of the route map at one moment, and after a
> + -- while clears the remembered routes from the global route map.
> + local route_map = M.route_map
> + local route_map_old = {}
> + local route_map_deadline = 0
> + local status, err
> while M.module_version == module_version do
> - -- Check if no changes in buckets configuration.
> - if bucket_generation_collected ~= M.bucket_generation then
> - local bucket_generation = M.bucket_generation
> - local is_sent_collected, is_garbage_collected
> - status, empty_garbage_buckets, is_garbage_collected =
> - pcall(gc_bucket_step_by_type, consts.BUCKET.GARBAGE)
> - if not status then
> - err = empty_garbage_buckets
> - goto check_error
> - end
> - status, empty_sent_buckets, is_sent_collected =
> - pcall(gc_bucket_step_by_type, consts.BUCKET.SENT)
> - if not status then
> - err = empty_sent_buckets
> - goto check_error
> + if bucket_generation_collected ~= bucket_generation_current then
> + status, err = gc_bucket_drop(consts.BUCKET.GARBAGE, route_map)
> + if status then
> + status, err = gc_bucket_drop(consts.BUCKET.SENT, route_map)
> end
> - status, err = gc_bucket_drop(empty_garbage_buckets,
> - consts.BUCKET.GARBAGE)
> -::check_error::
> if not status then
> box.rollback()
> log.error('Error during garbage collection step: %s', err)
> - goto continue
> + else
> + -- Don't use the global generation. During the collection it
> + -- could have already changed. Instead, remember the generation
> + -- known before the collection started.
> + -- Since the collection also changes the generation, it makes
> + -- the GC happen always at least twice. But typically on the
> + -- second iteration it should not find any buckets to collect,
> + -- and then the collected generation matches the global one.
> + bucket_generation_collected = bucket_generation_current
> end
> - if is_sent_collected and is_garbage_collected then
> - bucket_generation_collected = bucket_generation
> + else
> + status = true
> + end
> +
> + local sleep_time = route_map_deadline - fiber_clock()
> + if sleep_time <= 0 then
> + local chunk = consts.LUA_CHUNK_SIZE
> + util.table_minus_yield(route_map, route_map_old, chunk)
> + route_map_old = util.table_copy_yield(route_map, chunk)
> + if next(route_map_old) then
> + sleep_time = consts.BUCKET_SENT_GARBAGE_DELAY
> + else
> + sleep_time = consts.TIMEOUT_INFINITY
> end
> + route_map_deadline = fiber_clock() + sleep_time
> end
> + bucket_generation_current = M.bucket_generation
>
> - if fiber_clock() - buckets_for_redirect_ts >=
> - consts.BUCKET_SENT_GARBAGE_DELAY then
> - status, err = gc_bucket_drop(buckets_for_redirect,
> - consts.BUCKET.SENT)
> - if not status then
> - buckets_for_redirect = {}
> - empty_sent_buckets = {}
> - bucket_generation_collected = -1
> - log.error('Error during deletion of empty sent buckets: %s',
> - err)
> - elseif M.module_version ~= module_version then
> - return
> + if bucket_generation_current ~= bucket_generation_collected then
> + -- Generation was changed during collection. Or *by* collection.
> + if status then
> + -- Retry immediately. If the generation was changed by the
> + -- collection itself, it will notice it next iteration, and go
> + -- to proper sleep.
> + sleep_time = 0
> else
> - buckets_for_redirect = empty_sent_buckets or {}
> - empty_sent_buckets = nil
> - buckets_for_redirect_ts = fiber_clock()
> + -- An error happened during the collection. It does not make
> + -- sense to retry on each iteration of the event loop. The most
> + -- likely errors are either a WAL error or a transaction abort,
> + -- both of which look like an issue in the user's code and can't
> + -- be fixed quickly anyway. Back off.
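The two-table route-map aging above is effectively a generational eviction scheme: an entry survives between one and two delay intervals. A toy model of one aging pass, using plain loops in place of the chunked, yielding `util.table_minus_yield`/`util.table_copy_yield` helpers (the function name is mine; the scheme is from this patch):

```lua
-- Toy model of one route-map aging pass: entries snapshotted on the
-- previous pass are evicted, and the survivors become the snapshot for
-- the next pass. The real code does this in yielding chunks of
-- LUA_CHUNK_SIZE to avoid blocking the event loop on huge maps.
local function age_route_map(route_map, route_map_old)
    for id in pairs(route_map_old) do
        route_map[id] = nil          -- drop routes older than one pass
    end
    local snapshot = {}
    for id, dst in pairs(route_map) do
        snapshot[id] = dst           -- remember survivors for eviction next pass
    end
    return snapshot                  -- becomes the new route_map_old
end
```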
> + sleep_time = consts.GC_BACKOFF_INTERVAL
> end
> end
> -::continue::
> - lfiber.sleep(M.collect_bucket_garbage_interval)
> +
> + if M.module_version == module_version then
> + M.bucket_generation_cond:wait(sleep_time)
> + end
> end
> end
>
> @@ -2423,8 +2417,6 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
> vshard_cfg.rebalancer_disbalance_threshold
> M.rebalancer_receiving_quota = vshard_cfg.rebalancer_max_receiving
> M.shard_index = vshard_cfg.shard_index
> - M.collect_bucket_garbage_interval =
> - vshard_cfg.collect_bucket_garbage_interval
> M.collect_lua_garbage = vshard_cfg.collect_lua_garbage
> M.rebalancer_worker_count = vshard_cfg.rebalancer_max_sending
> M.current_cfg = cfg
> @@ -2678,6 +2670,9 @@ else
> storage_cfg(M.current_cfg, M.this_replica.uuid, true)
> end
> M.module_version = M.module_version + 1
> + -- Background fibers could sleep waiting for bucket changes.
> + -- Let them know it is time to reload.
> + bucket_generation_increment()
> end
>
> M.recovery_f = recovery_f
> @@ -2688,7 +2683,7 @@ M.gc_bucket_f = gc_bucket_f
> -- These functions are saved in M not for atomic reload, but for
> -- unit testing.
> --
> -M.gc_bucket_step_by_type = gc_bucket_step_by_type
> +M.gc_bucket_drop = gc_bucket_drop
> M.rebalancer_build_routes = rebalancer_build_routes
> M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
> M.cached_find_sharded_spaces = find_sharded_spaces
> diff --git a/vshard/storage/reload_evolution.lua b/vshard/storage/reload_evolution.lua
> index f38af74..484f499 100644
> --- a/vshard/storage/reload_evolution.lua
> +++ b/vshard/storage/reload_evolution.lua
> @@ -4,6 +4,7 @@
> -- in a commit.
> --
> local log = require('log')
> +local fiber = require('fiber')
>
> --
> -- Array of upgrade functions.
> @@ -25,6 +26,13 @@ migrations[#migrations + 1] = function(M)
> end
> end
>
> +migrations[#migrations + 1] = function(M)
> + if not M.route_map then
> + M.bucket_generation_cond = fiber.cond()
> + M.route_map = {}
> + end
> +end
> +
> --
> -- Perform an update based on a version stored in `M` (internals).
> -- @param M Old module internals which should be updated.
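The migration above guards on `M.route_map` so that running it against already-upgraded internals is a no-op. A small sketch of that idempotency property, using a plain table as a stand-in for the module internals (the migration body is from this patch; the test scaffolding is illustrative):

```lua
-- Illustrative check of the migration's guard: applying it twice must
-- not reset an already-populated route_map.
local fiber = require('fiber')

local M = {}                         -- stand-in for old module internals
local function migration(M)
    if not M.route_map then
        M.bucket_generation_cond = fiber.cond()
        M.route_map = {}
    end
end

migration(M)                         -- first reload: fields created
M.route_map[1] = 'destination1'
migration(M)                         -- second reload: guard keeps the map
assert(M.route_map[1] == 'destination1')
```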