From: Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Oleg Babin <olegrok@tarantool.org>, tarantool-patches@dev.tarantool.org, yaroslav.dynnikov@tarantool.org
Subject: Re: [Tarantool-patches] [PATCH 7/9] gc: introduce reactive garbage collector
Date: Wed, 10 Feb 2021 23:35:55 +0100
Message-ID: <edbf5fc1-b206-a28e-34d8-fc70bf63f704@tarantool.org>
In-Reply-To: <68b5d6a6-1fb4-913d-61ac-e3062683e325@tarantool.org>

Thanks for the review!

On 10.02.2021 10:00, Oleg Babin wrote:
> Thanks for your patch.
>
> As I see you've introduced some new parameters: "LUA_CHUNK_SIZE" and "GC_BACKOFF_INTERVAL".

I decided not to go into too much detail and not to describe private
constants in the commit message. GC_BACKOFF_INTERVAL is explained in
the place where it is used. LUA_CHUNK_SIZE is quite obvious if you
look at its usage.

> I think it's better to describe them in commit message to understand more clear how new algorithm.

These constants are not really relevant to the algorithm's core idea.
It does not matter much for the reactive GC concept whether I yield in
the table utility functions, or whether I have a backoff timeout. These
could be considered 'optimizations' or 'amendments' - small details not
worth mentioning in the commit message.

> I see that you didn't update comment above "gc_bucket_f" function. Is it still relevant?

No, it is irrelevant now, thanks for noticing. Here is the diff:

====================
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 99f92a0..1ea8069 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1543,14 +1543,16 @@ local function gc_bucket_drop(status, route_map)
 end
 
 --
---- Garbage collector. Works on masters. The garbage collector
---- wakes up once per specified time.
+-- Garbage collector. Works on masters. The garbage collector wakes up when
+-- state of any bucket changes.
 -- After wakeup it follows the plan:
--- 1) Check if _bucket has changed. If not, then sleep again;
--- 2) Scan user spaces for sent and garbage buckets, delete
--- garbage data in batches of limited size;
--- 3) Delete GARBAGE buckets from _bucket immediately, and
--- schedule SENT buckets for deletion after a timeout;
+-- 1) Check if state of any bucket has really changed. If not, then sleep again;
+-- 2) Delete all GARBAGE and SENT buckets along with their data in chunks of
+-- limited size.
+-- 3) Bucket destinations are saved into a global route_map to reroute incoming
+-- requests from routers in case they didn't notice the buckets being moved.
+-- The saved routes are scheduled for deletion after a timeout, which is
+-- checked on each iteration of this loop.
 -- 4) Sleep, go to (1).
 -- For each step details see comments in the code.
 --
====================

The full new patch below.

====================
gc: introduce reactive garbage collector

The garbage collector is a fiber on a master node which deletes GARBAGE
and SENT buckets along with their data.

It was proactive. It used to wake up with a constant period to find and
delete the needed buckets.

But this won't work with the future feature called 'map-reduce'.
Map-reduce as a preparation stage will need to ensure that all buckets
on a storage are readable and writable. With the current GC algorithm,
if a bucket is sent, it won't be deleted for the next 5 seconds by
default. During this time all new map-reduce requests can't execute.
This is not acceptable. Nor is a too frequent wakeup of the GC fiber,
because it would waste TX thread time.

The patch makes the GC fiber wake up not on a timeout, but on events
happening with the _bucket space. The GC fiber sleeps on a condition
variable which is signaled when _bucket is changed.

Once GC sees work to do, it won't sleep until it is done. It will only
yield.

This makes GC delete SENT and GARBAGE buckets as soon as possible,
reducing the waiting time for the incoming map-reduce requests.
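For the review's convenience, the wakeup scheme described above can be
sketched like this. This is a simplified illustration, not part of the
patch: it borrows the names bucket_generation, bucket_generation_cond,
and gc_bucket_f from the actual code, fiber.cond() is Tarantool's
condition variable constructor, and the snippet needs the Tarantool
runtime to run.

```lua
-- Simplified sketch of the reactive GC loop. The real gc_bucket_f
-- additionally deletes data in chunks and backs off on errors.
local fiber = require('fiber')

local M = {
    bucket_generation = 0,
    bucket_generation_cond = fiber.cond(),
}

-- _bucket:on_replace trigger: bump the generation and wake up GC.
local function on_bucket_replace()
    M.bucket_generation = M.bucket_generation + 1
    M.bucket_generation_cond:broadcast()
end

local function gc_bucket_f()
    local collected_generation = M.bucket_generation - 1
    while true do
        if collected_generation == M.bucket_generation then
            -- No changes since the last pass - sleep until the
            -- on_replace trigger signals the condition variable.
            M.bucket_generation_cond:wait()
        end
        local gen = M.bucket_generation
        -- ... drop GARBAGE and SENT buckets and their data here,
        -- yielding between chunks, but never sleeping ...
        collected_generation = gen
    end
end
```

The key property is that the loop never sleeps while there is work:
wait() is entered only when the generation seen at the start of the
last pass is still current.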
Needed for #147

@TarantoolBot document
Title: VShard: deprecate cfg option 'collect_bucket_garbage_interval'

It was used to specify the interval between bucket garbage collection
steps. It was needed because garbage collection in vshard was
proactive. It didn't react to newly appeared garbage buckets
immediately.

Since 0.1.17 garbage collection has become reactive. It starts
processing garbage buckets immediately as they appear, and sleeps the
rest of the time. The option is not used anymore and does not affect
anything.

I suppose it can be deleted from the documentation. Or left with a big
label 'deprecated' + the explanation above. An attempt to use the
option does not cause an error, but logs a warning.

diff --git a/test/lua_libs/storage_template.lua b/test/lua_libs/storage_template.lua
index 21409bd..8df89f6 100644
--- a/test/lua_libs/storage_template.lua
+++ b/test/lua_libs/storage_template.lua
@@ -172,6 +172,5 @@ function wait_bucket_is_collected(id)
             return true
         end
         vshard.storage.recovery_wakeup()
-        vshard.storage.garbage_collector_wakeup()
     end)
 end
diff --git a/test/misc/reconfigure.result b/test/misc/reconfigure.result
index 168be5d..3b34841 100644
--- a/test/misc/reconfigure.result
+++ b/test/misc/reconfigure.result
@@ -83,9 +83,6 @@ cfg.collect_lua_garbage = true
 cfg.rebalancer_max_receiving = 1000
 ---
 ...
-cfg.collect_bucket_garbage_interval = 100
----
-...
 cfg.invalid_option = 'kek'
 ---
 ...
@@ -105,10 +102,6 @@ vshard.storage.internal.rebalancer_max_receiving ~= 1000
 ---
 - true
 ...
-vshard.storage.internal.collect_bucket_garbage_interval ~= 100
----
-- true
-...
 cfg.sync_timeout = nil
 ---
 ...
@@ -118,9 +111,6 @@ cfg.collect_lua_garbage = nil
 cfg.rebalancer_max_receiving = nil
 ---
 ...
-cfg.collect_bucket_garbage_interval = nil
----
-...
 cfg.invalid_option = nil
 ---
 ...
diff --git a/test/misc/reconfigure.test.lua b/test/misc/reconfigure.test.lua index e891010..348628c 100644 --- a/test/misc/reconfigure.test.lua +++ b/test/misc/reconfigure.test.lua @@ -33,17 +33,14 @@ vshard.storage.internal.sync_timeout cfg.sync_timeout = 100 cfg.collect_lua_garbage = true cfg.rebalancer_max_receiving = 1000 -cfg.collect_bucket_garbage_interval = 100 cfg.invalid_option = 'kek' vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a) not vshard.storage.internal.collect_lua_garbage vshard.storage.internal.sync_timeout vshard.storage.internal.rebalancer_max_receiving ~= 1000 -vshard.storage.internal.collect_bucket_garbage_interval ~= 100 cfg.sync_timeout = nil cfg.collect_lua_garbage = nil cfg.rebalancer_max_receiving = nil -cfg.collect_bucket_garbage_interval = nil cfg.invalid_option = nil -- diff --git a/test/rebalancer/bucket_ref.result b/test/rebalancer/bucket_ref.result index b8fc7ff..9df7480 100644 --- a/test/rebalancer/bucket_ref.result +++ b/test/rebalancer/bucket_ref.result @@ -184,9 +184,6 @@ vshard.storage.bucket_unref(1, 'read') - true ... -- Force GC to take an RO lock on the bucket now. -vshard.storage.garbage_collector_wakeup() ---- -... vshard.storage.buckets_info(1) --- - 1: @@ -203,7 +200,6 @@ while true do if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then break end - vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end; --- @@ -235,14 +231,6 @@ finish_refs = true while f1:status() ~= 'dead' do fiber.sleep(0.01) end --- ... -vshard.storage.buckets_info(1) ---- -- 1: - status: garbage - ro_lock: true - destination: <replicaset_2> - id: 1 -... wait_bucket_is_collected(1) --- ... diff --git a/test/rebalancer/bucket_ref.test.lua b/test/rebalancer/bucket_ref.test.lua index 213ced3..1b032ff 100644 --- a/test/rebalancer/bucket_ref.test.lua +++ b/test/rebalancer/bucket_ref.test.lua @@ -56,7 +56,6 @@ vshard.storage.bucket_unref(1, 'write') -- Error, no refs. 
vshard.storage.bucket_ref(1, 'read') vshard.storage.bucket_unref(1, 'read') -- Force GC to take an RO lock on the bucket now. -vshard.storage.garbage_collector_wakeup() vshard.storage.buckets_info(1) _ = test_run:cmd("setopt delimiter ';'") while true do @@ -64,7 +63,6 @@ while true do if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then break end - vshard.storage.garbage_collector_wakeup() fiber.sleep(0.01) end; _ = test_run:cmd("setopt delimiter ''"); @@ -72,7 +70,6 @@ vshard.storage.buckets_info(1) vshard.storage.bucket_refro(1) finish_refs = true while f1:status() ~= 'dead' do fiber.sleep(0.01) end -vshard.storage.buckets_info(1) wait_bucket_is_collected(1) _ = test_run:switch('box_2_a') vshard.storage.buckets_info(1) diff --git a/test/rebalancer/errinj.result b/test/rebalancer/errinj.result index e50eb72..0ddb1c9 100644 --- a/test/rebalancer/errinj.result +++ b/test/rebalancer/errinj.result @@ -226,17 +226,6 @@ ret2, err2 - true - null ... -_bucket:get{35} ---- -- [35, 'sent', '<replicaset_2>'] -... -_bucket:get{36} ---- -- [36, 'sent', '<replicaset_2>'] -... --- Buckets became 'active' on box_2_a, but still are sending on --- box_1_a. Wait until it is marked as garbage on box_1_a by the --- recovery fiber. wait_bucket_is_collected(35) --- ... diff --git a/test/rebalancer/errinj.test.lua b/test/rebalancer/errinj.test.lua index 2cc4a69..a60f3d7 100644 --- a/test/rebalancer/errinj.test.lua +++ b/test/rebalancer/errinj.test.lua @@ -102,11 +102,6 @@ _ = test_run:switch('box_1_a') while f1:status() ~= 'dead' or f2:status() ~= 'dead' do fiber.sleep(0.001) end ret1, err1 ret2, err2 -_bucket:get{35} -_bucket:get{36} --- Buckets became 'active' on box_2_a, but still are sending on --- box_1_a. Wait until it is marked as garbage on box_1_a by the --- recovery fiber. 
wait_bucket_is_collected(35) wait_bucket_is_collected(36) _ = test_run:switch('box_2_a') diff --git a/test/rebalancer/receiving_bucket.result b/test/rebalancer/receiving_bucket.result index 7d3612b..ad93445 100644 --- a/test/rebalancer/receiving_bucket.result +++ b/test/rebalancer/receiving_bucket.result @@ -366,14 +366,6 @@ vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3}) --- - true ... -vshard.storage.buckets_info(1) ---- -- 1: - status: sent - ro_lock: true - destination: <replicaset_1> - id: 1 -... wait_bucket_is_collected(1) --- ... diff --git a/test/rebalancer/receiving_bucket.test.lua b/test/rebalancer/receiving_bucket.test.lua index 24534b3..2cf6382 100644 --- a/test/rebalancer/receiving_bucket.test.lua +++ b/test/rebalancer/receiving_bucket.test.lua @@ -136,7 +136,6 @@ box.space.test3:select{100} -- Now the bucket is unreferenced and can be transferred. _ = test_run:switch('box_2_a') vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3}) -vshard.storage.buckets_info(1) wait_bucket_is_collected(1) vshard.storage.buckets_info(1) _ = test_run:switch('box_1_a') diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result index 753687f..9d30a04 100644 --- a/test/reload_evolution/storage.result +++ b/test/reload_evolution/storage.result @@ -92,7 +92,7 @@ test_run:grep_log('storage_2_a', 'vshard.storage.reload_evolution: upgraded to') ... vshard.storage.internal.reload_version --- -- 2 +- 3 ... -- -- gh-237: should be only one trigger. During gh-237 the trigger installation diff --git a/test/router/reroute_wrong_bucket.result b/test/router/reroute_wrong_bucket.result index 049bdef..ac340eb 100644 --- a/test/router/reroute_wrong_bucket.result +++ b/test/router/reroute_wrong_bucket.result @@ -37,7 +37,7 @@ test_run:switch('storage_1_a') --- - true ... -cfg.collect_bucket_garbage_interval = 100 +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100 --- ... 
vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a) @@ -53,7 +53,7 @@ test_run:switch('storage_2_a') --- - true ... -cfg.collect_bucket_garbage_interval = 100 +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100 --- ... vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a) @@ -202,12 +202,12 @@ test_run:grep_log('router_1', 'please update configuration') err --- - bucket_id: 100 - reason: write is prohibited + reason: Not found code: 1 destination: ac522f65-aa94-4134-9f64-51ee384f1a54 type: ShardingError name: WRONG_BUCKET - message: 'Cannot perform action with bucket 100, reason: write is prohibited' + message: 'Cannot perform action with bucket 100, reason: Not found' ... -- -- Now try again, but update configuration during call(). It must diff --git a/test/router/reroute_wrong_bucket.test.lua b/test/router/reroute_wrong_bucket.test.lua index 9e6e804..207aac3 100644 --- a/test/router/reroute_wrong_bucket.test.lua +++ b/test/router/reroute_wrong_bucket.test.lua @@ -11,13 +11,13 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt test_run:cmd('create server router_1 with script="router/router_1.lua"') test_run:cmd('start server router_1') test_run:switch('storage_1_a') -cfg.collect_bucket_garbage_interval = 100 +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100 vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a) vshard.storage.rebalancer_disable() for i = 1, 100 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end test_run:switch('storage_2_a') -cfg.collect_bucket_garbage_interval = 100 +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100 vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a) vshard.storage.rebalancer_disable() for i = 101, 200 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end diff --git a/test/storage/recovery.result b/test/storage/recovery.result index f833fe7..8ccb0b9 100644 --- a/test/storage/recovery.result +++ b/test/storage/recovery.result @@ -79,8 +79,7 @@ _bucket = box.space._bucket ... 
_bucket:select{} --- -- - [2, 'garbage', '<replicaset_2>'] - - [3, 'garbage', '<replicaset_2>'] +- [] ... _ = test_run:switch('storage_2_a') --- diff --git a/test/storage/storage.result b/test/storage/storage.result index 424bc4c..0550ad1 100644 --- a/test/storage/storage.result +++ b/test/storage/storage.result @@ -547,6 +547,9 @@ vshard.storage.bucket_send(1, util.replicasets[2]) --- - true ... +wait_bucket_is_collected(1) +--- +... _ = test_run:switch("storage_2_a") --- ... @@ -567,12 +570,7 @@ _ = test_run:switch("storage_1_a") ... vshard.storage.buckets_info() --- -- 1: - status: sent - ro_lock: true - destination: <replicaset_2> - id: 1 - 2: +- 2: status: active id: 2 ... diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua index d631b51..d8fbd94 100644 --- a/test/storage/storage.test.lua +++ b/test/storage/storage.test.lua @@ -136,6 +136,7 @@ vshard.storage.bucket_send(1, util.replicasets[1]) -- Successful transfer. vshard.storage.bucket_send(1, util.replicasets[2]) +wait_bucket_is_collected(1) _ = test_run:switch("storage_2_a") vshard.storage.buckets_info() _ = test_run:switch("storage_1_a") diff --git a/test/unit/config.result b/test/unit/config.result index dfd0219..e0b2482 100644 --- a/test/unit/config.result +++ b/test/unit/config.result @@ -428,33 +428,6 @@ _ = lcfg.check(cfg) -- -- gh-77: garbage collection options. -- -cfg.collect_bucket_garbage_interval = 'str' ---- -... -check(cfg) ---- -- Garbage bucket collect interval must be positive number -... -cfg.collect_bucket_garbage_interval = 0 ---- -... -check(cfg) ---- -- Garbage bucket collect interval must be positive number -... -cfg.collect_bucket_garbage_interval = -1 ---- -... -check(cfg) ---- -- Garbage bucket collect interval must be positive number -... -cfg.collect_bucket_garbage_interval = 100.5 ---- -... -_ = lcfg.check(cfg) ---- -... cfg.collect_lua_garbage = 100 --- ... 
@@ -615,6 +588,12 @@ lcfg.check(cfg).rebalancer_max_sending cfg.rebalancer_max_sending = nil --- ... -cfg.sharding = nil +-- +-- Deprecated option does not break anything. +-- +cfg.collect_bucket_garbage_interval = 100 +--- +... +_ = lcfg.check(cfg) --- ... diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua index ada43db..a1c9f07 100644 --- a/test/unit/config.test.lua +++ b/test/unit/config.test.lua @@ -175,15 +175,6 @@ _ = lcfg.check(cfg) -- -- gh-77: garbage collection options. -- -cfg.collect_bucket_garbage_interval = 'str' -check(cfg) -cfg.collect_bucket_garbage_interval = 0 -check(cfg) -cfg.collect_bucket_garbage_interval = -1 -check(cfg) -cfg.collect_bucket_garbage_interval = 100.5 -_ = lcfg.check(cfg) - cfg.collect_lua_garbage = 100 check(cfg) cfg.collect_lua_garbage = true @@ -244,4 +235,9 @@ util.check_error(lcfg.check, cfg) cfg.rebalancer_max_sending = 15 lcfg.check(cfg).rebalancer_max_sending cfg.rebalancer_max_sending = nil -cfg.sharding = nil + +-- +-- Deprecated option does not break anything. +-- +cfg.collect_bucket_garbage_interval = 100 +_ = lcfg.check(cfg) diff --git a/test/unit/garbage.result b/test/unit/garbage.result index 74d9ccf..a530496 100644 --- a/test/unit/garbage.result +++ b/test/unit/garbage.result @@ -31,9 +31,6 @@ test_run:cmd("setopt delimiter ''"); vshard.storage.internal.shard_index = 'bucket_id' --- ... -vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL ---- -... -- -- Find nothing if no bucket_id anywhere, or there is no index -- by it, or bucket_id is not unsigned. @@ -151,6 +148,9 @@ format[1] = {name = 'id', type = 'unsigned'} format[2] = {name = 'status', type = 'string'} --- ... +format[3] = {name = 'destination', type = 'string', is_nullable = true} +--- +... _bucket = box.schema.create_space('_bucket', {format = format}) --- ... @@ -172,22 +172,6 @@ _bucket:replace{3, vshard.consts.BUCKET.ACTIVE} --- - [3, 'active'] ... 
-_bucket:replace{4, vshard.consts.BUCKET.SENT} ---- -- [4, 'sent'] -... -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE} ---- -- [5, 'garbage'] -... -_bucket:replace{6, vshard.consts.BUCKET.GARBAGE} ---- -- [6, 'garbage'] -... -_bucket:replace{200, vshard.consts.BUCKET.GARBAGE} ---- -- [200, 'garbage'] -... s = box.schema.create_space('test', {engine = engine}) --- ... @@ -213,7 +197,7 @@ s:replace{4, 2} --- - [4, 2] ... -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type +gc_bucket_drop = vshard.storage.internal.gc_bucket_drop --- ... s2 = box.schema.create_space('test2', {engine = engine}) @@ -249,6 +233,10 @@ function fill_spaces_with_garbage() s2:replace{6, 4} s2:replace{7, 5} s2:replace{7, 6} + _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'} + _bucket:replace{5, vshard.consts.BUCKET.GARBAGE} + _bucket:replace{6, vshard.consts.BUCKET.GARBAGE, 'destination2'} + _bucket:replace{200, vshard.consts.BUCKET.GARBAGE} end; --- ... @@ -267,12 +255,22 @@ fill_spaces_with_garbage() --- - 1107 ... -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) +route_map = {} +--- +... +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map) --- -- - 5 - - 6 - - 200 - true +- null +... +route_map +--- +- - null + - null + - null + - null + - null + - destination2 ... #s2:select{} --- @@ -282,10 +280,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) --- - 7 ... -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) +route_map = {} +--- +... +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map) --- -- - 4 - true +- null +... +route_map +--- +- - null + - null + - null + - destination1 ... s2:select{} --- @@ -303,17 +311,22 @@ s:select{} - [6, 100] ... -- Nothing deleted - update collected generation. -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) +route_map = {} +--- +... +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map) --- -- - 5 - - 6 - - 200 - true +- null ... 
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map) --- -- - 4 - true +- null +... +route_map +--- +- [] ... #s2:select{} --- @@ -329,15 +342,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) fill_spaces_with_garbage() --- ... -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end) +_ = _bucket:on_replace(function() \ + local gen = vshard.storage.internal.bucket_generation \ + vshard.storage.internal.bucket_generation = gen + 1 \ + vshard.storage.internal.bucket_generation_cond:broadcast() \ +end) --- ... f = fiber.create(vshard.storage.internal.gc_bucket_f) --- ... -- Wait until garbage collection is finished. -while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end) --- +- true ... s:select{} --- @@ -360,7 +378,6 @@ _bucket:select{} - - [1, 'active'] - [2, 'receiving'] - [3, 'active'] - - [4, 'sent'] ... -- -- Test deletion of 'sent' buckets after a specified timeout. @@ -370,8 +387,9 @@ _bucket:replace{2, vshard.consts.BUCKET.SENT} - [2, 'sent'] ... -- Wait deletion after a while. -while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return not _bucket:get{2} end) --- +- true ... _bucket:select{} --- @@ -410,8 +428,9 @@ _bucket:replace{4, vshard.consts.BUCKET.SENT} --- - [4, 'sent'] ... -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return not _bucket:get{4} end) --- +- true ... -- -- Test WAL errors during deletion from _bucket. @@ -434,11 +453,14 @@ s:replace{6, 4} --- - [6, 4] ... 
-while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_log('default', 'Error during garbage collection step', \ + 65536, 10) --- +- Error during garbage collection step ... -while #sk:select{4} ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return #sk:select{4} == 0 end) --- +- true ... s:select{} --- @@ -454,8 +476,9 @@ _bucket:select{} _ = _bucket:on_replace(nil, rollback_on_delete) --- ... -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return not _bucket:get{4} end) --- +- true ... f:cancel() --- @@ -562,8 +585,9 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i, f = fiber.create(vshard.storage.internal.gc_bucket_f) --- ... -while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return _bucket:count() == 0 end) --- +- true ... 
_bucket:select{} --- diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua index 30079fa..250afb0 100644 --- a/test/unit/garbage.test.lua +++ b/test/unit/garbage.test.lua @@ -15,7 +15,6 @@ end; test_run:cmd("setopt delimiter ''"); vshard.storage.internal.shard_index = 'bucket_id' -vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL -- -- Find nothing if no bucket_id anywhere, or there is no index @@ -75,16 +74,13 @@ s:drop() format = {} format[1] = {name = 'id', type = 'unsigned'} format[2] = {name = 'status', type = 'string'} +format[3] = {name = 'destination', type = 'string', is_nullable = true} _bucket = box.schema.create_space('_bucket', {format = format}) _ = _bucket:create_index('pk') _ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false}) _bucket:replace{1, vshard.consts.BUCKET.ACTIVE} _bucket:replace{2, vshard.consts.BUCKET.RECEIVING} _bucket:replace{3, vshard.consts.BUCKET.ACTIVE} -_bucket:replace{4, vshard.consts.BUCKET.SENT} -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE} -_bucket:replace{6, vshard.consts.BUCKET.GARBAGE} -_bucket:replace{200, vshard.consts.BUCKET.GARBAGE} s = box.schema.create_space('test', {engine = engine}) pk = s:create_index('pk') @@ -94,7 +90,7 @@ s:replace{2, 1} s:replace{3, 2} s:replace{4, 2} -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type +gc_bucket_drop = vshard.storage.internal.gc_bucket_drop s2 = box.schema.create_space('test2', {engine = engine}) pk2 = s2:create_index('pk') sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false}) @@ -114,6 +110,10 @@ function fill_spaces_with_garbage() s2:replace{6, 4} s2:replace{7, 5} s2:replace{7, 6} + _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'} + _bucket:replace{5, vshard.consts.BUCKET.GARBAGE} + _bucket:replace{6, vshard.consts.BUCKET.GARBAGE, 'destination2'} + _bucket:replace{200, vshard.consts.BUCKET.GARBAGE} end; 
test_run:cmd("setopt delimiter ''"); @@ -121,15 +121,21 @@ fill_spaces_with_garbage() #s2:select{} #s:select{} -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) +route_map = {} +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map) +route_map #s2:select{} #s:select{} -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) +route_map = {} +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map) +route_map s2:select{} s:select{} -- Nothing deleted - update collected generation. -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) +route_map = {} +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map) +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map) +route_map #s2:select{} #s:select{} @@ -137,10 +143,14 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) -- Test continuous garbage collection via background fiber. -- fill_spaces_with_garbage() -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end) +_ = _bucket:on_replace(function() \ + local gen = vshard.storage.internal.bucket_generation \ + vshard.storage.internal.bucket_generation = gen + 1 \ + vshard.storage.internal.bucket_generation_cond:broadcast() \ +end) f = fiber.create(vshard.storage.internal.gc_bucket_f) -- Wait until garbage collection is finished. -while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end) s:select{} s2:select{} -- Check garbage bucket is deleted by background fiber. @@ -150,7 +160,7 @@ _bucket:select{} -- _bucket:replace{2, vshard.consts.BUCKET.SENT} -- Wait deletion after a while. 
-while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return not _bucket:get{2} end) _bucket:select{} s:select{} s2:select{} @@ -162,7 +172,7 @@ _bucket:replace{4, vshard.consts.BUCKET.ACTIVE} s:replace{5, 4} s:replace{6, 4} _bucket:replace{4, vshard.consts.BUCKET.SENT} -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return not _bucket:get{4} end) -- -- Test WAL errors during deletion from _bucket. @@ -172,12 +182,13 @@ _ = _bucket:on_replace(rollback_on_delete) _bucket:replace{4, vshard.consts.BUCKET.SENT} s:replace{5, 4} s:replace{6, 4} -while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end -while #sk:select{4} ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_log('default', 'Error during garbage collection step', \ + 65536, 10) +test_run:wait_cond(function() return #sk:select{4} == 0 end) s:select{} _bucket:select{} _ = _bucket:on_replace(nil, rollback_on_delete) -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return not _bucket:get{4} end) f:cancel() @@ -220,7 +231,7 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i, #s:select{} #s2:select{} f = fiber.create(vshard.storage.internal.gc_bucket_f) -while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end +test_run:wait_cond(function() return _bucket:count() == 0 end) _bucket:select{} s:select{} s2:select{} diff --git a/test/unit/garbage_errinj.result b/test/unit/garbage_errinj.result deleted file mode 100644 index 92c8039..0000000 --- a/test/unit/garbage_errinj.result +++ /dev/null @@ -1,223 +0,0 @@ -test_run = require('test_run').new() ---- -... 
-vshard = require('vshard') ---- -... -fiber = require('fiber') ---- -... -engine = test_run:get_cfg('engine') ---- -... -vshard.storage.internal.shard_index = 'bucket_id' ---- -... -format = {} ---- -... -format[1] = {name = 'id', type = 'unsigned'} ---- -... -format[2] = {name = 'status', type = 'string', is_nullable = true} ---- -... -_bucket = box.schema.create_space('_bucket', {format = format}) ---- -... -_ = _bucket:create_index('pk') ---- -... -_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false}) ---- -... -_bucket:replace{1, vshard.consts.BUCKET.ACTIVE} ---- -- [1, 'active'] -... -_bucket:replace{2, vshard.consts.BUCKET.RECEIVING} ---- -- [2, 'receiving'] -... -_bucket:replace{3, vshard.consts.BUCKET.ACTIVE} ---- -- [3, 'active'] -... -_bucket:replace{4, vshard.consts.BUCKET.SENT} ---- -- [4, 'sent'] -... -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE} ---- -- [5, 'garbage'] -... -s = box.schema.create_space('test', {engine = engine}) ---- -... -pk = s:create_index('pk') ---- -... -sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false}) ---- -... -s:replace{1, 1} ---- -- [1, 1] -... -s:replace{2, 1} ---- -- [2, 1] -... -s:replace{3, 2} ---- -- [3, 2] -... -s:replace{4, 2} ---- -- [4, 2] -... -s:replace{5, 100} ---- -- [5, 100] -... -s:replace{6, 100} ---- -- [6, 100] -... -s:replace{7, 4} ---- -- [7, 4] -... -s:replace{8, 5} ---- -- [8, 5] -... -s2 = box.schema.create_space('test2', {engine = engine}) ---- -... -pk2 = s2:create_index('pk') ---- -... -sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false}) ---- -... -s2:replace{1, 1} ---- -- [1, 1] -... -s2:replace{3, 3} ---- -- [3, 3] -... -for i = 7, 1107 do s:replace{i, 200} end ---- -... -s2:replace{4, 200} ---- -- [4, 200] -... -s2:replace{5, 100} ---- -- [5, 100] -... -s2:replace{5, 300} ---- -- [5, 300] -... -s2:replace{6, 4} ---- -- [6, 4] -... -s2:replace{7, 5} ---- -- [7, 5] -... 
-gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type ---- -... -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) ---- -- - 4 -- true -... -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) ---- -- - 5 -- true -... --- --- Test _bucket generation change during garbage buckets search. --- -s:truncate() ---- -... -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end) ---- -... -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true ---- -... -f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end) ---- -... -_bucket:replace{4, vshard.consts.BUCKET.GARBAGE} ---- -- [4, 'garbage'] -... -s:replace{5, 4} ---- -- [5, 4] -... -s:replace{6, 4} ---- -- [6, 4] -... -#s:select{} ---- -- 2 -... -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false ---- -... -while f:status() ~= 'dead' do fiber.sleep(0.1) end ---- -... --- Nothing is deleted - _bucket:replace() has changed _bucket --- generation during search of garbage buckets. -#s:select{} ---- -- 2 -... -_bucket:select{4} ---- -- - [4, 'garbage'] -... --- Next step deletes garbage ok. -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) ---- -- [] -- true -... -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) ---- -- - 4 - - 5 -- true -... -#s:select{} ---- -- 0 -... -_bucket:delete{4} ---- -- [4, 'garbage'] -... -s2:drop() ---- -... -s:drop() ---- -... -_bucket:drop() ---- -... 
diff --git a/test/unit/garbage_errinj.test.lua b/test/unit/garbage_errinj.test.lua deleted file mode 100644 index 31184b9..0000000 --- a/test/unit/garbage_errinj.test.lua +++ /dev/null @@ -1,73 +0,0 @@ -test_run = require('test_run').new() -vshard = require('vshard') -fiber = require('fiber') - -engine = test_run:get_cfg('engine') -vshard.storage.internal.shard_index = 'bucket_id' - -format = {} -format[1] = {name = 'id', type = 'unsigned'} -format[2] = {name = 'status', type = 'string', is_nullable = true} -_bucket = box.schema.create_space('_bucket', {format = format}) -_ = _bucket:create_index('pk') -_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false}) -_bucket:replace{1, vshard.consts.BUCKET.ACTIVE} -_bucket:replace{2, vshard.consts.BUCKET.RECEIVING} -_bucket:replace{3, vshard.consts.BUCKET.ACTIVE} -_bucket:replace{4, vshard.consts.BUCKET.SENT} -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE} - -s = box.schema.create_space('test', {engine = engine}) -pk = s:create_index('pk') -sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false}) -s:replace{1, 1} -s:replace{2, 1} -s:replace{3, 2} -s:replace{4, 2} -s:replace{5, 100} -s:replace{6, 100} -s:replace{7, 4} -s:replace{8, 5} - -s2 = box.schema.create_space('test2', {engine = engine}) -pk2 = s2:create_index('pk') -sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false}) -s2:replace{1, 1} -s2:replace{3, 3} -for i = 7, 1107 do s:replace{i, 200} end -s2:replace{4, 200} -s2:replace{5, 100} -s2:replace{5, 300} -s2:replace{6, 4} -s2:replace{7, 5} - -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) - --- --- Test _bucket generation change during garbage buckets search. 
---
-s:truncate()
-_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
-vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true
-f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end)
-_bucket:replace{4, vshard.consts.BUCKET.GARBAGE}
-s:replace{5, 4}
-s:replace{6, 4}
-#s:select{}
-vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false
-while f:status() ~= 'dead' do fiber.sleep(0.1) end
--- Nothing is deleted - _bucket:replace() has changed _bucket
--- generation during search of garbage buckets.
-#s:select{}
-_bucket:select{4}
--- Next step deletes garbage ok.
-gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
-gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
-#s:select{}
-_bucket:delete{4}
-
-s2:drop()
-s:drop()
-_bucket:drop()
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index f7d5dbc..63d5414 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -251,9 +251,8 @@ local cfg_template = {
         max = consts.REBALANCER_MAX_SENDING_MAX
     },
     collect_bucket_garbage_interval = {
-        type = 'positive number', name = 'Garbage bucket collect interval',
-        is_optional = true,
-        default = consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
+        name = 'Garbage bucket collect interval', is_deprecated = true,
+        reason = 'Has no effect anymore'
     },
     collect_lua_garbage = {
         type = 'boolean', name = 'Garbage Lua collect necessity',
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 8c2a8b0..3f1585a 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -23,6 +23,7 @@ return {
     DEFAULT_BUCKET_COUNT = 3000;
     BUCKET_SENT_GARBAGE_DELAY = 0.5;
     BUCKET_CHUNK_SIZE = 1000;
+    LUA_CHUNK_SIZE = 100000,
     DEFAULT_REBALANCER_DISBALANCE_THRESHOLD = 1;
     REBALANCER_IDLE_INTERVAL = 60 * 60;
     REBALANCER_WORK_INTERVAL = 10;
@@ -37,7 +38,7 @@ return {
     DEFAULT_FAILOVER_PING_TIMEOUT = 5;
     DEFAULT_SYNC_TIMEOUT = 1;
     RECONNECT_TIMEOUT = 0.5;
-    DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
+    GC_BACKOFF_INTERVAL = 5,
     RECOVERY_INTERVAL = 5;
     COLLECT_LUA_GARBAGE_INTERVAL = 100;
@@ -45,4 +46,6 @@ return {
     DISCOVERY_WORK_INTERVAL = 1,
     DISCOVERY_WORK_STEP = 0.01,
     DISCOVERY_TIMEOUT = 10,
+
+    TIMEOUT_INFINITY = 500 * 365 * 86400,
 }
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index adf1c20..1ea8069 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -69,7 +69,6 @@ if not M then
         total_bucket_count = 0,
         errinj = {
             ERRINJ_CFG = false,
-            ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
             ERRINJ_RELOAD = false,
             ERRINJ_CFG_DELAY = false,
             ERRINJ_LONG_RECEIVE = false,
@@ -96,6 +95,8 @@ if not M then
         -- detect that _bucket was not changed between yields.
         --
         bucket_generation = 0,
+        -- Condition variable fired on generation update.
+        bucket_generation_cond = lfiber.cond(),
         --
         -- Reference to the function used as on_replace trigger on
         -- _bucket space. It is used to replace the trigger with
@@ -107,12 +108,14 @@ if not M then
         -- replace the old function is to keep its reference.
         --
         bucket_on_replace = nil,
+        -- Redirects for recently sent buckets. They are kept for a while to
+        -- help routers to find a new location for sent and deleted buckets
+        -- without whole cluster scan.
+        route_map = {},
         ------------------- Garbage collection -------------------
         -- Fiber to remove garbage buckets data.
         collect_bucket_garbage_fiber = nil,
-        -- Do buckets garbage collection once per this time.
-        collect_bucket_garbage_interval = nil,
         -- Boolean lua_gc state (create periodic gc task).
         collect_lua_garbage = nil,
@@ -173,6 +176,7 @@ end
 --
 local function bucket_generation_increment()
     M.bucket_generation = M.bucket_generation + 1
+    M.bucket_generation_cond:broadcast()
 end
 
 --
@@ -758,8 +762,9 @@ local function bucket_check_state(bucket_id, mode)
     else
         return bucket
     end
+    local dst = bucket and bucket.destination or M.route_map[bucket_id]
     return bucket, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, reason,
-                                 bucket and bucket.destination)
+                                 dst)
 end
 
 --
@@ -804,11 +809,23 @@ end
 --
 local function bucket_unrefro(bucket_id)
     local ref = M.bucket_refs[bucket_id]
-    if not ref or ref.ro == 0 then
+    local count = ref and ref.ro or 0
+    if count == 0 then
         return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id,
                                   "no refs", nil)
     end
-    ref.ro = ref.ro - 1
+    if count == 1 then
+        ref.ro = 0
+        if ref.ro_lock then
+            -- Garbage collector is waiting for the bucket if RO
+            -- is locked. Let it know it has one more bucket to
+            -- collect. It relies on generation, so its increment
+            -- is enough.
+            bucket_generation_increment()
+        end
+        return true
+    end
+    ref.ro = count - 1
     return true
 end
 
@@ -1481,79 +1498,44 @@ local function gc_bucket_in_space(space, bucket_id, status)
 end
 
 --
--- Remove tuples from buckets of a specified type.
--- @param type Type of buckets to gc.
--- @retval List of ids of empty buckets of the type.
+-- Drop buckets with the given status along with their data in all spaces.
+-- @param status Status of target buckets.
+-- @param route_map Destinations of deleted buckets are saved into this table.
 --
-local function gc_bucket_step_by_type(type)
-    local sharded_spaces = find_sharded_spaces()
-    local empty_buckets = {}
+local function gc_bucket_drop_xc(status, route_map)
     local limit = consts.BUCKET_CHUNK_SIZE
-    local is_all_collected = true
-    for _, bucket in box.space._bucket.index.status:pairs(type) do
-        local bucket_id = bucket.id
-        local ref = M.bucket_refs[bucket_id]
+    local _bucket = box.space._bucket
+    local sharded_spaces = find_sharded_spaces()
+    for _, b in _bucket.index.status:pairs(status) do
+        local id = b.id
+        local ref = M.bucket_refs[id]
         if ref then
             assert(ref.rw == 0)
             if ref.ro ~= 0 then
                 ref.ro_lock = true
-                is_all_collected = false
                 goto continue
             end
-            M.bucket_refs[bucket_id] = nil
+            M.bucket_refs[id] = nil
         end
         for _, space in pairs(sharded_spaces) do
-            gc_bucket_in_space_xc(space, bucket_id, type)
+            gc_bucket_in_space_xc(space, id, status)
             limit = limit - 1
             if limit == 0 then
                 lfiber.sleep(0)
                 limit = consts.BUCKET_CHUNK_SIZE
             end
         end
-        table.insert(empty_buckets, bucket.id)
-::continue::
+        route_map[id] = b.destination
+        _bucket:delete{id}
+    ::continue::
     end
-    return empty_buckets, is_all_collected
-end
-
---
--- Drop buckets with ids in the list.
--- @param bucket_ids Bucket ids to drop.
--- @param status Expected bucket status.
---
-local function gc_bucket_drop_xc(bucket_ids, status)
-    if #bucket_ids == 0 then
-        return
-    end
-    local limit = consts.BUCKET_CHUNK_SIZE
-    box.begin()
-    local _bucket = box.space._bucket
-    for _, id in pairs(bucket_ids) do
-        local bucket_exists = _bucket:get{id} ~= nil
-        local b = _bucket:get{id}
-        if b then
-            if b.status ~= status then
-                return error(string.format('Bucket %d status is changed. Was '..
-                                           '%s, became %s', id, status,
-                                           b.status))
-            end
-            _bucket:delete{id}
-        end
-        limit = limit - 1
-        if limit == 0 then
-            box.commit()
-            box.begin()
-            limit = consts.BUCKET_CHUNK_SIZE
-        end
-    end
-    box.commit()
 end
 
 --
 -- Exception safe version of gc_bucket_drop_xc.
 --
-local function gc_bucket_drop(bucket_ids, status)
-    local status, err = pcall(gc_bucket_drop_xc, bucket_ids, status)
+local function gc_bucket_drop(status, route_map)
+    local status, err = pcall(gc_bucket_drop_xc, status, route_map)
     if not status then
         box.rollback()
     end
@@ -1561,14 +1543,16 @@ local function gc_bucket_drop(bucket_ids, status)
 end
 
 --
--- Garbage collector. Works on masters. The garbage collector
--- wakes up once per specified time.
+-- Garbage collector. Works on masters. The garbage collector wakes up when
+-- state of any bucket changes.
 -- After wakeup it follows the plan:
--- 1) Check if _bucket has changed. If not, then sleep again;
--- 2) Scan user spaces for sent and garbage buckets, delete
---    garbage data in batches of limited size;
--- 3) Delete GARBAGE buckets from _bucket immediately, and
---    schedule SENT buckets for deletion after a timeout;
+-- 1) Check if state of any bucket has really changed. If not, then sleep again;
+-- 2) Delete all GARBAGE and SENT buckets along with their data in chunks of
+--    limited size.
+-- 3) Bucket destinations are saved into a global route_map to reroute incoming
+--    requests from routers in case they didn't notice the buckets being moved.
+--    The saved routes are scheduled for deletion after a timeout, which is
+--    checked on each iteration of this loop.
 -- 4) Sleep, go to (1).
 -- For each step details see comments in the code.
 --
@@ -1580,65 +1564,75 @@ function gc_bucket_f()
     -- generation == bucket generation. In such a case the fiber
     -- does nothing until next _bucket change.
     local bucket_generation_collected = -1
-    -- Empty sent buckets are collected into an array. After a
-    -- specified time interval the buckets are deleted both from
-    -- this array and from _bucket space.
-    local buckets_for_redirect = {}
-    local buckets_for_redirect_ts = fiber_clock()
-    -- Empty sent buckets, updated after each step, and when
-    -- buckets_for_redirect is deleted, it gets empty_sent_buckets
-    -- for next deletion.
-    local empty_garbage_buckets, empty_sent_buckets, status, err
+    local bucket_generation_current = M.bucket_generation
+    -- Deleted buckets are saved into a route map to redirect routers if they
+    -- didn't discover new location of the buckets yet. However route map does
+    -- not grow infinitely. Otherwise it would end up storing redirects for all
+    -- buckets in the cluster. Which could also be outdated.
+    -- Garbage collector periodically drops old routes from the map. For that it
+    -- remembers state of route map in one moment, and after a while clears the
+    -- remembered routes from the global route map.
+    local route_map = M.route_map
+    local route_map_old = {}
+    local route_map_deadline = 0
+    local status, err
     while M.module_version == module_version do
-        -- Check if no changes in buckets configuration.
-        if bucket_generation_collected ~= M.bucket_generation then
-            local bucket_generation = M.bucket_generation
-            local is_sent_collected, is_garbage_collected
-            status, empty_garbage_buckets, is_garbage_collected =
-                pcall(gc_bucket_step_by_type, consts.BUCKET.GARBAGE)
-            if not status then
-                err = empty_garbage_buckets
-                goto check_error
-            end
-            status, empty_sent_buckets, is_sent_collected =
-                pcall(gc_bucket_step_by_type, consts.BUCKET.SENT)
-            if not status then
-                err = empty_sent_buckets
-                goto check_error
+        if bucket_generation_collected ~= bucket_generation_current then
+            status, err = gc_bucket_drop(consts.BUCKET.GARBAGE, route_map)
+            if status then
+                status, err = gc_bucket_drop(consts.BUCKET.SENT, route_map)
             end
-            status, err = gc_bucket_drop(empty_garbage_buckets,
-                                         consts.BUCKET.GARBAGE)
-::check_error::
             if not status then
                 box.rollback()
                 log.error('Error during garbage collection step: %s', err)
-                goto continue
+            else
+                -- Don't use global generation. During the collection it could
+                -- already change. Instead, remember the generation known before
+                -- the collection has started.
+                -- Since the collection also changes the generation, it makes
+                -- the GC happen always at least twice. But typically on the
+                -- second iteration it should not find any buckets to collect,
+                -- and then the collected generation matches the global one.
+                bucket_generation_collected = bucket_generation_current
             end
-            if is_sent_collected and is_garbage_collected then
-                bucket_generation_collected = bucket_generation
+        else
+            status = true
+        end
+
+        local sleep_time = route_map_deadline - fiber_clock()
+        if sleep_time <= 0 then
+            local chunk = consts.LUA_CHUNK_SIZE
+            util.table_minus_yield(route_map, route_map_old, chunk)
+            route_map_old = util.table_copy_yield(route_map, chunk)
+            if next(route_map_old) then
+                sleep_time = consts.BUCKET_SENT_GARBAGE_DELAY
+            else
+                sleep_time = consts.TIMEOUT_INFINITY
             end
+            route_map_deadline = fiber_clock() + sleep_time
         end
+        bucket_generation_current = M.bucket_generation
 
-        if fiber_clock() - buckets_for_redirect_ts >=
-           consts.BUCKET_SENT_GARBAGE_DELAY then
-            status, err = gc_bucket_drop(buckets_for_redirect,
-                                         consts.BUCKET.SENT)
-            if not status then
-                buckets_for_redirect = {}
-                empty_sent_buckets = {}
-                bucket_generation_collected = -1
-                log.error('Error during deletion of empty sent buckets: %s',
-                          err)
-            elseif M.module_version ~= module_version then
-                return
+        if bucket_generation_current ~= bucket_generation_collected then
+            -- Generation was changed during collection. Or *by* collection.
+            if status then
+                -- Retry immediately. If the generation was changed by the
+                -- collection itself, it will notice it next iteration, and go
+                -- to proper sleep.
+                sleep_time = 0
             else
-                buckets_for_redirect = empty_sent_buckets or {}
-                empty_sent_buckets = nil
-                buckets_for_redirect_ts = fiber_clock()
+                -- An error happened during the collection. Does not make sense
+                -- to retry on each iteration of the event loop. The most likely
+                -- errors are either a WAL error or a transaction abort - both
+                -- look like an issue in the user's code and can't be fixed
+                -- quickly anyway. Backoff.
+                sleep_time = consts.GC_BACKOFF_INTERVAL
             end
         end
-::continue::
-        lfiber.sleep(M.collect_bucket_garbage_interval)
+
+        if M.module_version == module_version then
+            M.bucket_generation_cond:wait(sleep_time)
+        end
     end
 end
 
@@ -2423,8 +2417,6 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
         vshard_cfg.rebalancer_disbalance_threshold
     M.rebalancer_receiving_quota = vshard_cfg.rebalancer_max_receiving
     M.shard_index = vshard_cfg.shard_index
-    M.collect_bucket_garbage_interval =
-        vshard_cfg.collect_bucket_garbage_interval
     M.collect_lua_garbage = vshard_cfg.collect_lua_garbage
     M.rebalancer_worker_count = vshard_cfg.rebalancer_max_sending
     M.current_cfg = cfg
@@ -2678,6 +2670,9 @@ else
         storage_cfg(M.current_cfg, M.this_replica.uuid, true)
     end
     M.module_version = M.module_version + 1
+    -- Background fibers could sleep waiting for bucket changes.
+    -- Let them know it is time to reload.
+    bucket_generation_increment()
 end
 
 M.recovery_f = recovery_f
@@ -2688,7 +2683,7 @@ M.gc_bucket_f = gc_bucket_f
 -- These functions are saved in M not for atomic reload, but for
 -- unit testing.
 --
-M.gc_bucket_step_by_type = gc_bucket_step_by_type
+M.gc_bucket_drop = gc_bucket_drop
 M.rebalancer_build_routes = rebalancer_build_routes
 M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
 M.cached_find_sharded_spaces = find_sharded_spaces
diff --git a/vshard/storage/reload_evolution.lua b/vshard/storage/reload_evolution.lua
index f38af74..484f499 100644
--- a/vshard/storage/reload_evolution.lua
+++ b/vshard/storage/reload_evolution.lua
@@ -4,6 +4,7 @@
 -- in a commit.
 --
 local log = require('log')
+local fiber = require('fiber')
 
 --
 -- Array of upgrade functions.
@@ -25,6 +26,13 @@ migrations[#migrations + 1] = function(M)
     end
 end
 
+migrations[#migrations + 1] = function(M)
+    if not M.route_map then
+        M.bucket_generation_cond = fiber.cond()
+        M.route_map = {}
+    end
+end
+
 --
 -- Perform an update based on a version stored in `M` (internals).
-- @param M Old module internals which should be updated.
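The core of the patch is the shape of the new gc_bucket_f() loop: a generation counter bumped on every _bucket change, a condition variable the fiber sleeps on, and a "collect until the observed generation stops moving" rule. That shape can be hard to see inside a diff, so here is a minimal language-agnostic sketch (in Python, not part of the patch; the class and all names are illustrative only):

```python
import threading

class ReactiveGC:
    """Sketch of the reactive GC idea: sleep on a condition variable,
    wake on bucket changes, and keep collecting until the generation
    observed before collection matches the last collected one."""

    def __init__(self):
        self.generation = 0             # analog of M.bucket_generation
        self.cond = threading.Condition()  # analog of bucket_generation_cond
        self.collected = -1             # generation of the last full collection
        self.garbage = set()            # bucket ids pending deletion
        self.deleted = []               # what GC has dropped (for the demo)

    def on_bucket_change(self, bucket_id):
        # Analog of bucket_generation_increment(): bump and broadcast,
        # so the sleeping GC fiber/thread wakes up immediately.
        with self.cond:
            self.garbage.add(bucket_id)
            self.generation += 1
            self.cond.notify_all()

    def step(self):
        # One iteration of the GC loop. Collect only if the generation
        # moved since the last successful collection; otherwise the real
        # code would go back to cond:wait().
        with self.cond:
            current = self.generation   # snapshot *before* collecting
            if self.collected == current:
                return False            # nothing changed, would sleep
            self.deleted.extend(sorted(self.garbage))
            self.garbage.clear()
            self.collected = current    # a racing change forces a retry
            return True

gc = ReactiveGC()
gc.on_bucket_change(4)
gc.on_bucket_change(5)
print(gc.step(), gc.deleted)   # True [4, 5]
print(gc.step())               # False
```

The key design point mirrored here is that `step()` remembers the generation seen before collecting: if a bucket changes mid-collection, the snapshot no longer matches and the next iteration retries instead of sleeping, which is exactly why the patch notes the GC runs "at least twice" per burst of changes.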
Thread overview: 36+ messages
2021-02-09 23:46 [Tarantool-patches] [PATCH 0/9] VShard Map-Reduce, part 1, preparations Vladislav Shpilevoy via Tarantool-patches
2021-02-09 23:46 ` [Tarantool-patches] [PATCH 1/9] rlist: move rlist to a new module Vladislav Shpilevoy via Tarantool-patches
2021-02-10  8:57   ` Oleg Babin via Tarantool-patches
2021-02-11  6:50     ` Oleg Babin via Tarantool-patches
2021-02-12  0:09       ` Vladislav Shpilevoy via Tarantool-patches
2021-02-09 23:46 ` [Tarantool-patches] [PATCH 2/9] Use fiber.clock() instead of .time() everywhere Vladislav Shpilevoy via Tarantool-patches
2021-02-10  8:57   ` Oleg Babin via Tarantool-patches
2021-02-10 22:33     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-09 23:46 ` [Tarantool-patches] [PATCH 3/9] test: introduce a helper to wait for bucket GC Vladislav Shpilevoy via Tarantool-patches
2021-02-10  8:57   ` Oleg Babin via Tarantool-patches
2021-02-10 22:33     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-11  6:50       ` Oleg Babin via Tarantool-patches
2021-02-09 23:46 ` [Tarantool-patches] [PATCH 4/9] storage: bucket_recv() should check rs lock Vladislav Shpilevoy via Tarantool-patches
2021-02-10  8:59   ` Oleg Babin via Tarantool-patches
2021-02-09 23:46 ` [Tarantool-patches] [PATCH 5/9] util: introduce yielding table functions Vladislav Shpilevoy via Tarantool-patches
2021-02-10  8:59   ` Oleg Babin via Tarantool-patches
2021-02-10 22:34     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-11  6:50       ` Oleg Babin via Tarantool-patches
2021-02-09 23:46 ` [Tarantool-patches] [PATCH 6/9] cfg: introduce 'deprecated option' feature Vladislav Shpilevoy via Tarantool-patches
2021-02-10  8:59   ` Oleg Babin via Tarantool-patches
2021-02-10 22:34     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-11  6:50       ` Oleg Babin via Tarantool-patches
2021-02-09 23:46 ` [Tarantool-patches] [PATCH 7/9] gc: introduce reactive garbage collector Vladislav Shpilevoy via Tarantool-patches
2021-02-10  9:00   ` Oleg Babin via Tarantool-patches
2021-02-10 22:35     ` Vladislav Shpilevoy via Tarantool-patches [this message]
2021-02-11  6:50       ` Oleg Babin via Tarantool-patches
2021-02-09 23:46 ` [Tarantool-patches] [PATCH 8/9] recovery: introduce reactive recovery Vladislav Shpilevoy via Tarantool-patches
2021-02-10  9:00   ` Oleg Babin via Tarantool-patches
2021-02-09 23:46 ` [Tarantool-patches] [PATCH 9/9] util: introduce binary heap data structure Vladislav Shpilevoy via Tarantool-patches
2021-02-10  9:01   ` Oleg Babin via Tarantool-patches
2021-02-10 22:36     ` Vladislav Shpilevoy via Tarantool-patches
2021-02-11  6:51       ` Oleg Babin via Tarantool-patches
2021-02-12  0:09         ` Vladislav Shpilevoy via Tarantool-patches
2021-03-05 22:03 ` Vladislav Shpilevoy via Tarantool-patches
2021-02-09 23:51 ` [Tarantool-patches] [PATCH 0/9] VShard Map-Reduce, part 1, preparations Vladislav Shpilevoy via Tarantool-patches
2021-02-12 11:02 ` Oleg Babin via Tarantool-patches