From: Oleg Babin via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>,
tarantool-patches@dev.tarantool.org,
yaroslav.dynnikov@tarantool.org
Subject: Re: [Tarantool-patches] [PATCH 7/9] gc: introduce reactive garbage collector
Date: Wed, 10 Feb 2021 12:00:02 +0300
Message-ID: <68b5d6a6-1fb4-913d-61ac-e3062683e325@tarantool.org>
In-Reply-To: <657bade6ef0ecf12b77c1a037d8326552f761002.1612914070.git.v.shpilevoy@tarantool.org>
Thanks for your patch.
As I see, you've introduced some new parameters: "LUA_CHUNK_SIZE" and
"GC_BACKOFF_INTERVAL".
I think it's better to describe them in the commit message, so that it
is clearer how the new algorithm works.
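For example, this is how I read the reactive scheme from the commit
message quoted below. It is only a plain-Lua model, not the code from
the patch: a coroutine stands in for the GC fiber, resuming it stands in
for signaling the condition variable, and the names `buckets`,
`generation`, `collected_generation`, `wakeups` and `bucket_replace` are
made up for illustration.

```lua
-- Plain-Lua model of a reactive collector. No Tarantool API is used;
-- all names here are hypothetical and exist only for this sketch.
local buckets = {}               -- bucket id -> status
local generation = 0             -- bumped on every change of `buckets`
local collected_generation = 0   -- last generation the collector processed
local wakeups = 0                -- how many times the collector was woken

local gc = coroutine.create(function()
    while true do
        -- Keep working until the collector has caught up with the
        -- latest change. It does not go to sleep while work remains.
        while collected_generation ~= generation do
            local seen = generation
            for id, status in pairs(buckets) do
                if status == 'sent' or status == 'garbage' then
                    -- Clearing an existing field during traversal is
                    -- allowed in Lua.
                    buckets[id] = nil
                end
            end
            collected_generation = seen
        end
        -- Nothing to do: model of sleeping on the condition variable.
        coroutine.yield()
        wakeups = wakeups + 1
    end
end)

-- Model of a change in the bucket table: bump the generation and wake
-- the collector instead of letting it poll on a timer.
local function bucket_replace(id, status)
    buckets[id] = status
    generation = generation + 1
    assert(coroutine.resume(gc))
end

-- Start the collector; it goes to sleep at once, there is no work yet.
assert(coroutine.resume(gc))

bucket_replace(1, 'active')
bucket_replace(2, 'sent')
bucket_replace(3, 'garbage')
```

If the real algorithm differs from this model, for instance in where
"LUA_CHUNK_SIZE" and "GC_BACKOFF_INTERVAL" come into play, that is
exactly what I would like to see spelled out in the commit message.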
I see that you didn't update the comment above the "gc_bucket_f"
function. Is it still relevant?
In general, the patch LGTM.
On 10/02/2021 02:46, Vladislav Shpilevoy wrote:
> Garbage collector is a fiber on a master node which deletes
> GARBAGE and SENT buckets along with their data.
>
> It was proactive. It used to wake up with a constant period to
> find and delete the needed buckets.
>
> But this won't work with the future feature called 'map-reduce'.
> Map-reduce as a preparation stage will need to ensure that all
> buckets on a storage are readable and writable. With the current
> GC algorithm if a bucket is sent, it won't be deleted for the next
> 5 seconds by default. During this time all new map-reduce requests
> can't execute.
>
> This is not acceptable. Neither is waking the GC fiber up too
> frequently, because it would waste TX thread time.
>
> The patch makes the GC fiber wake up not on a timeout but on events
> happening in the _bucket space. The GC fiber sleeps on a condition
> variable which is signaled when _bucket is changed.
>
> Once GC sees work to do, it won't sleep until it is done. It will
> only yield.
>
> This makes GC delete SENT and GARBAGE buckets as soon as possible
> reducing the waiting time for the incoming map-reduce requests.
>
> Needed for #147
>
> @TarantoolBot document
> Title: VShard: deprecate cfg option 'collect_bucket_garbage_interval'
> It was used to specify the interval between bucket garbage
> collection steps. It was needed because garbage collection in
> vshard was proactive. It didn't react to newly appeared garbage
> buckets immediately.
>
> Since 0.1.17 garbage collection is reactive. It starts working on
> garbage buckets as soon as they appear and sleeps the rest of the
> time. The option is not used now and does not affect the behaviour
> of anything.
>
> I suppose it can be deleted from the documentation. Or left with
> a big label 'deprecated' + the explanation above.
>
> An attempt to use the option does not cause an error, but logs a
> warning.
> ---
> test/lua_libs/storage_template.lua | 1 -
> test/misc/reconfigure.result | 10 -
> test/misc/reconfigure.test.lua | 3 -
> test/rebalancer/bucket_ref.result | 12 --
> test/rebalancer/bucket_ref.test.lua | 3 -
> test/rebalancer/errinj.result | 11 --
> test/rebalancer/errinj.test.lua | 5 -
> test/rebalancer/receiving_bucket.result | 8 -
> test/rebalancer/receiving_bucket.test.lua | 1 -
> test/reload_evolution/storage.result | 2 +-
> test/router/reroute_wrong_bucket.result | 8 +-
> test/router/reroute_wrong_bucket.test.lua | 4 +-
> test/storage/recovery.result | 3 +-
> test/storage/storage.result | 10 +-
> test/storage/storage.test.lua | 1 +
> test/unit/config.result | 35 +---
> test/unit/config.test.lua | 16 +-
> test/unit/garbage.result | 106 ++++++----
> test/unit/garbage.test.lua | 47 +++--
> test/unit/garbage_errinj.result | 223 ----------------------
> test/unit/garbage_errinj.test.lua | 73 -------
> vshard/cfg.lua | 4 +-
> vshard/consts.lua | 5 +-
> vshard/storage/init.lua | 207 ++++++++++----------
> vshard/storage/reload_evolution.lua | 8 +
> 25 files changed, 233 insertions(+), 573 deletions(-)
> delete mode 100644 test/unit/garbage_errinj.result
> delete mode 100644 test/unit/garbage_errinj.test.lua
>
> diff --git a/test/lua_libs/storage_template.lua b/test/lua_libs/storage_template.lua
> index 21409bd..8df89f6 100644
> --- a/test/lua_libs/storage_template.lua
> +++ b/test/lua_libs/storage_template.lua
> @@ -172,6 +172,5 @@ function wait_bucket_is_collected(id)
> return true
> end
> vshard.storage.recovery_wakeup()
> - vshard.storage.garbage_collector_wakeup()
> end)
> end
> diff --git a/test/misc/reconfigure.result b/test/misc/reconfigure.result
> index 168be5d..3b34841 100644
> --- a/test/misc/reconfigure.result
> +++ b/test/misc/reconfigure.result
> @@ -83,9 +83,6 @@ cfg.collect_lua_garbage = true
> cfg.rebalancer_max_receiving = 1000
> ---
> ...
> -cfg.collect_bucket_garbage_interval = 100
> ----
> -...
> cfg.invalid_option = 'kek'
> ---
> ...
> @@ -105,10 +102,6 @@ vshard.storage.internal.rebalancer_max_receiving ~= 1000
> ---
> - true
> ...
> -vshard.storage.internal.collect_bucket_garbage_interval ~= 100
> ----
> -- true
> -...
> cfg.sync_timeout = nil
> ---
> ...
> @@ -118,9 +111,6 @@ cfg.collect_lua_garbage = nil
> cfg.rebalancer_max_receiving = nil
> ---
> ...
> -cfg.collect_bucket_garbage_interval = nil
> ----
> -...
> cfg.invalid_option = nil
> ---
> ...
> diff --git a/test/misc/reconfigure.test.lua b/test/misc/reconfigure.test.lua
> index e891010..348628c 100644
> --- a/test/misc/reconfigure.test.lua
> +++ b/test/misc/reconfigure.test.lua
> @@ -33,17 +33,14 @@ vshard.storage.internal.sync_timeout
> cfg.sync_timeout = 100
> cfg.collect_lua_garbage = true
> cfg.rebalancer_max_receiving = 1000
> -cfg.collect_bucket_garbage_interval = 100
> cfg.invalid_option = 'kek'
> vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a)
> not vshard.storage.internal.collect_lua_garbage
> vshard.storage.internal.sync_timeout
> vshard.storage.internal.rebalancer_max_receiving ~= 1000
> -vshard.storage.internal.collect_bucket_garbage_interval ~= 100
> cfg.sync_timeout = nil
> cfg.collect_lua_garbage = nil
> cfg.rebalancer_max_receiving = nil
> -cfg.collect_bucket_garbage_interval = nil
> cfg.invalid_option = nil
>
> --
> diff --git a/test/rebalancer/bucket_ref.result b/test/rebalancer/bucket_ref.result
> index b8fc7ff..9df7480 100644
> --- a/test/rebalancer/bucket_ref.result
> +++ b/test/rebalancer/bucket_ref.result
> @@ -184,9 +184,6 @@ vshard.storage.bucket_unref(1, 'read')
> - true
> ...
> -- Force GC to take an RO lock on the bucket now.
> -vshard.storage.garbage_collector_wakeup()
> ----
> -...
> vshard.storage.buckets_info(1)
> ---
> - 1:
> @@ -203,7 +200,6 @@ while true do
> if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then
> break
> end
> - vshard.storage.garbage_collector_wakeup()
> fiber.sleep(0.01)
> end;
> ---
> @@ -235,14 +231,6 @@ finish_refs = true
> while f1:status() ~= 'dead' do fiber.sleep(0.01) end
> ---
> ...
> -vshard.storage.buckets_info(1)
> ----
> -- 1:
> - status: garbage
> - ro_lock: true
> - destination: <replicaset_2>
> - id: 1
> -...
> wait_bucket_is_collected(1)
> ---
> ...
> diff --git a/test/rebalancer/bucket_ref.test.lua b/test/rebalancer/bucket_ref.test.lua
> index 213ced3..1b032ff 100644
> --- a/test/rebalancer/bucket_ref.test.lua
> +++ b/test/rebalancer/bucket_ref.test.lua
> @@ -56,7 +56,6 @@ vshard.storage.bucket_unref(1, 'write') -- Error, no refs.
> vshard.storage.bucket_ref(1, 'read')
> vshard.storage.bucket_unref(1, 'read')
> -- Force GC to take an RO lock on the bucket now.
> -vshard.storage.garbage_collector_wakeup()
> vshard.storage.buckets_info(1)
> _ = test_run:cmd("setopt delimiter ';'")
> while true do
> @@ -64,7 +63,6 @@ while true do
> if i.status == vshard.consts.BUCKET.GARBAGE and i.ro_lock then
> break
> end
> - vshard.storage.garbage_collector_wakeup()
> fiber.sleep(0.01)
> end;
> _ = test_run:cmd("setopt delimiter ''");
> @@ -72,7 +70,6 @@ vshard.storage.buckets_info(1)
> vshard.storage.bucket_refro(1)
> finish_refs = true
> while f1:status() ~= 'dead' do fiber.sleep(0.01) end
> -vshard.storage.buckets_info(1)
> wait_bucket_is_collected(1)
> _ = test_run:switch('box_2_a')
> vshard.storage.buckets_info(1)
> diff --git a/test/rebalancer/errinj.result b/test/rebalancer/errinj.result
> index e50eb72..0ddb1c9 100644
> --- a/test/rebalancer/errinj.result
> +++ b/test/rebalancer/errinj.result
> @@ -226,17 +226,6 @@ ret2, err2
> - true
> - null
> ...
> -_bucket:get{35}
> ----
> -- [35, 'sent', '<replicaset_2>']
> -...
> -_bucket:get{36}
> ----
> -- [36, 'sent', '<replicaset_2>']
> -...
> --- Buckets became 'active' on box_2_a, but still are sending on
> --- box_1_a. Wait until it is marked as garbage on box_1_a by the
> --- recovery fiber.
> wait_bucket_is_collected(35)
> ---
> ...
> diff --git a/test/rebalancer/errinj.test.lua b/test/rebalancer/errinj.test.lua
> index 2cc4a69..a60f3d7 100644
> --- a/test/rebalancer/errinj.test.lua
> +++ b/test/rebalancer/errinj.test.lua
> @@ -102,11 +102,6 @@ _ = test_run:switch('box_1_a')
> while f1:status() ~= 'dead' or f2:status() ~= 'dead' do fiber.sleep(0.001) end
> ret1, err1
> ret2, err2
> -_bucket:get{35}
> -_bucket:get{36}
> --- Buckets became 'active' on box_2_a, but still are sending on
> --- box_1_a. Wait until it is marked as garbage on box_1_a by the
> --- recovery fiber.
> wait_bucket_is_collected(35)
> wait_bucket_is_collected(36)
> _ = test_run:switch('box_2_a')
> diff --git a/test/rebalancer/receiving_bucket.result b/test/rebalancer/receiving_bucket.result
> index 7d3612b..ad93445 100644
> --- a/test/rebalancer/receiving_bucket.result
> +++ b/test/rebalancer/receiving_bucket.result
> @@ -366,14 +366,6 @@ vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3})
> ---
> - true
> ...
> -vshard.storage.buckets_info(1)
> ----
> -- 1:
> - status: sent
> - ro_lock: true
> - destination: <replicaset_1>
> - id: 1
> -...
> wait_bucket_is_collected(1)
> ---
> ...
> diff --git a/test/rebalancer/receiving_bucket.test.lua b/test/rebalancer/receiving_bucket.test.lua
> index 24534b3..2cf6382 100644
> --- a/test/rebalancer/receiving_bucket.test.lua
> +++ b/test/rebalancer/receiving_bucket.test.lua
> @@ -136,7 +136,6 @@ box.space.test3:select{100}
> -- Now the bucket is unreferenced and can be transferred.
> _ = test_run:switch('box_2_a')
> vshard.storage.bucket_send(1, util.replicasets[1], {timeout = 0.3})
> -vshard.storage.buckets_info(1)
> wait_bucket_is_collected(1)
> vshard.storage.buckets_info(1)
> _ = test_run:switch('box_1_a')
> diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result
> index 753687f..9d30a04 100644
> --- a/test/reload_evolution/storage.result
> +++ b/test/reload_evolution/storage.result
> @@ -92,7 +92,7 @@ test_run:grep_log('storage_2_a', 'vshard.storage.reload_evolution: upgraded to')
> ...
> vshard.storage.internal.reload_version
> ---
> -- 2
> +- 3
> ...
> --
> -- gh-237: should be only one trigger. During gh-237 the trigger installation
> diff --git a/test/router/reroute_wrong_bucket.result b/test/router/reroute_wrong_bucket.result
> index 049bdef..ac340eb 100644
> --- a/test/router/reroute_wrong_bucket.result
> +++ b/test/router/reroute_wrong_bucket.result
> @@ -37,7 +37,7 @@ test_run:switch('storage_1_a')
> ---
> - true
> ...
> -cfg.collect_bucket_garbage_interval = 100
> +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
> ---
> ...
> vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a)
> @@ -53,7 +53,7 @@ test_run:switch('storage_2_a')
> ---
> - true
> ...
> -cfg.collect_bucket_garbage_interval = 100
> +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
> ---
> ...
> vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a)
> @@ -202,12 +202,12 @@ test_run:grep_log('router_1', 'please update configuration')
> err
> ---
> - bucket_id: 100
> - reason: write is prohibited
> + reason: Not found
> code: 1
> destination: ac522f65-aa94-4134-9f64-51ee384f1a54
> type: ShardingError
> name: WRONG_BUCKET
> - message: 'Cannot perform action with bucket 100, reason: write is prohibited'
> + message: 'Cannot perform action with bucket 100, reason: Not found'
> ...
> --
> -- Now try again, but update configuration during call(). It must
> diff --git a/test/router/reroute_wrong_bucket.test.lua b/test/router/reroute_wrong_bucket.test.lua
> index 9e6e804..207aac3 100644
> --- a/test/router/reroute_wrong_bucket.test.lua
> +++ b/test/router/reroute_wrong_bucket.test.lua
> @@ -11,13 +11,13 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
> test_run:cmd('create server router_1 with script="router/router_1.lua"')
> test_run:cmd('start server router_1')
> test_run:switch('storage_1_a')
> -cfg.collect_bucket_garbage_interval = 100
> +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
> vshard.storage.cfg(cfg, util.name_to_uuid.storage_1_a)
> vshard.storage.rebalancer_disable()
> for i = 1, 100 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end
>
> test_run:switch('storage_2_a')
> -cfg.collect_bucket_garbage_interval = 100
> +vshard.consts.BUCKET_SENT_GARBAGE_DELAY = 100
> vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a)
> vshard.storage.rebalancer_disable()
> for i = 101, 200 do box.space._bucket:replace{i, vshard.consts.BUCKET.ACTIVE} end
> diff --git a/test/storage/recovery.result b/test/storage/recovery.result
> index f833fe7..8ccb0b9 100644
> --- a/test/storage/recovery.result
> +++ b/test/storage/recovery.result
> @@ -79,8 +79,7 @@ _bucket = box.space._bucket
> ...
> _bucket:select{}
> ---
> -- - [2, 'garbage', '<replicaset_2>']
> - - [3, 'garbage', '<replicaset_2>']
> +- []
> ...
> _ = test_run:switch('storage_2_a')
> ---
> diff --git a/test/storage/storage.result b/test/storage/storage.result
> index 424bc4c..0550ad1 100644
> --- a/test/storage/storage.result
> +++ b/test/storage/storage.result
> @@ -547,6 +547,9 @@ vshard.storage.bucket_send(1, util.replicasets[2])
> ---
> - true
> ...
> +wait_bucket_is_collected(1)
> +---
> +...
> _ = test_run:switch("storage_2_a")
> ---
> ...
> @@ -567,12 +570,7 @@ _ = test_run:switch("storage_1_a")
> ...
> vshard.storage.buckets_info()
> ---
> -- 1:
> - status: sent
> - ro_lock: true
> - destination: <replicaset_2>
> - id: 1
> - 2:
> +- 2:
> status: active
> id: 2
> ...
> diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua
> index d631b51..d8fbd94 100644
> --- a/test/storage/storage.test.lua
> +++ b/test/storage/storage.test.lua
> @@ -136,6 +136,7 @@ vshard.storage.bucket_send(1, util.replicasets[1])
>
> -- Successful transfer.
> vshard.storage.bucket_send(1, util.replicasets[2])
> +wait_bucket_is_collected(1)
> _ = test_run:switch("storage_2_a")
> vshard.storage.buckets_info()
> _ = test_run:switch("storage_1_a")
> diff --git a/test/unit/config.result b/test/unit/config.result
> index dfd0219..e0b2482 100644
> --- a/test/unit/config.result
> +++ b/test/unit/config.result
> @@ -428,33 +428,6 @@ _ = lcfg.check(cfg)
> --
> -- gh-77: garbage collection options.
> --
> -cfg.collect_bucket_garbage_interval = 'str'
> ----
> -...
> -check(cfg)
> ----
> -- Garbage bucket collect interval must be positive number
> -...
> -cfg.collect_bucket_garbage_interval = 0
> ----
> -...
> -check(cfg)
> ----
> -- Garbage bucket collect interval must be positive number
> -...
> -cfg.collect_bucket_garbage_interval = -1
> ----
> -...
> -check(cfg)
> ----
> -- Garbage bucket collect interval must be positive number
> -...
> -cfg.collect_bucket_garbage_interval = 100.5
> ----
> -...
> -_ = lcfg.check(cfg)
> ----
> -...
> cfg.collect_lua_garbage = 100
> ---
> ...
> @@ -615,6 +588,12 @@ lcfg.check(cfg).rebalancer_max_sending
> cfg.rebalancer_max_sending = nil
> ---
> ...
> -cfg.sharding = nil
> +--
> +-- Deprecated option does not break anything.
> +--
> +cfg.collect_bucket_garbage_interval = 100
> +---
> +...
> +_ = lcfg.check(cfg)
> ---
> ...
> diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua
> index ada43db..a1c9f07 100644
> --- a/test/unit/config.test.lua
> +++ b/test/unit/config.test.lua
> @@ -175,15 +175,6 @@ _ = lcfg.check(cfg)
> --
> -- gh-77: garbage collection options.
> --
> -cfg.collect_bucket_garbage_interval = 'str'
> -check(cfg)
> -cfg.collect_bucket_garbage_interval = 0
> -check(cfg)
> -cfg.collect_bucket_garbage_interval = -1
> -check(cfg)
> -cfg.collect_bucket_garbage_interval = 100.5
> -_ = lcfg.check(cfg)
> -
> cfg.collect_lua_garbage = 100
> check(cfg)
> cfg.collect_lua_garbage = true
> @@ -244,4 +235,9 @@ util.check_error(lcfg.check, cfg)
> cfg.rebalancer_max_sending = 15
> lcfg.check(cfg).rebalancer_max_sending
> cfg.rebalancer_max_sending = nil
> -cfg.sharding = nil
> +
> +--
> +-- Deprecated option does not break anything.
> +--
> +cfg.collect_bucket_garbage_interval = 100
> +_ = lcfg.check(cfg)
> diff --git a/test/unit/garbage.result b/test/unit/garbage.result
> index 74d9ccf..a530496 100644
> --- a/test/unit/garbage.result
> +++ b/test/unit/garbage.result
> @@ -31,9 +31,6 @@ test_run:cmd("setopt delimiter ''");
> vshard.storage.internal.shard_index = 'bucket_id'
> ---
> ...
> -vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
> ----
> -...
> --
> -- Find nothing if no bucket_id anywhere, or there is no index
> -- by it, or bucket_id is not unsigned.
> @@ -151,6 +148,9 @@ format[1] = {name = 'id', type = 'unsigned'}
> format[2] = {name = 'status', type = 'string'}
> ---
> ...
> +format[3] = {name = 'destination', type = 'string', is_nullable = true}
> +---
> +...
> _bucket = box.schema.create_space('_bucket', {format = format})
> ---
> ...
> @@ -172,22 +172,6 @@ _bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
> ---
> - [3, 'active']
> ...
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> ----
> -- [4, 'sent']
> -...
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [5, 'garbage']
> -...
> -_bucket:replace{6, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [6, 'garbage']
> -...
> -_bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [200, 'garbage']
> -...
> s = box.schema.create_space('test', {engine = engine})
> ---
> ...
> @@ -213,7 +197,7 @@ s:replace{4, 2}
> ---
> - [4, 2]
> ...
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> +gc_bucket_drop = vshard.storage.internal.gc_bucket_drop
> ---
> ...
> s2 = box.schema.create_space('test2', {engine = engine})
> @@ -249,6 +233,10 @@ function fill_spaces_with_garbage()
> s2:replace{6, 4}
> s2:replace{7, 5}
> s2:replace{7, 6}
> + _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'}
> + _bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> + _bucket:replace{6, vshard.consts.BUCKET.GARBAGE, 'destination2'}
> + _bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
> end;
> ---
> ...
> @@ -267,12 +255,22 @@ fill_spaces_with_garbage()
> ---
> - 1107
> ...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> +route_map = {}
> +---
> +...
> +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
> ---
> -- - 5
> - - 6
> - - 200
> - true
> +- null
> +...
> +route_map
> +---
> +- - null
> + - null
> + - null
> + - null
> + - null
> + - destination2
> ...
> #s2:select{}
> ---
> @@ -282,10 +280,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> ---
> - 7
> ...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> +route_map = {}
> +---
> +...
> +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
> ---
> -- - 4
> - true
> +- null
> +...
> +route_map
> +---
> +- - null
> + - null
> + - null
> + - destination1
> ...
> s2:select{}
> ---
> @@ -303,17 +311,22 @@ s:select{}
> - [6, 100]
> ...
> -- Nothing deleted - update collected generation.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> +route_map = {}
> +---
> +...
> +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
> ---
> -- - 5
> - - 6
> - - 200
> - true
> +- null
> ...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
> ---
> -- - 4
> - true
> +- null
> +...
> +route_map
> +---
> +- []
> ...
> #s2:select{}
> ---
> @@ -329,15 +342,20 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> fill_spaces_with_garbage()
> ---
> ...
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> +_ = _bucket:on_replace(function() \
> + local gen = vshard.storage.internal.bucket_generation \
> + vshard.storage.internal.bucket_generation = gen + 1 \
> + vshard.storage.internal.bucket_generation_cond:broadcast() \
> +end)
> ---
> ...
> f = fiber.create(vshard.storage.internal.gc_bucket_f)
> ---
> ...
> -- Wait until garbage collection is finished.
> -while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end)
> ---
> +- true
> ...
> s:select{}
> ---
> @@ -360,7 +378,6 @@ _bucket:select{}
> - - [1, 'active']
> - [2, 'receiving']
> - [3, 'active']
> - - [4, 'sent']
> ...
> --
> -- Test deletion of 'sent' buckets after a specified timeout.
> @@ -370,8 +387,9 @@ _bucket:replace{2, vshard.consts.BUCKET.SENT}
> - [2, 'sent']
> ...
> -- Wait deletion after a while.
> -while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{2} end)
> ---
> +- true
> ...
> _bucket:select{}
> ---
> @@ -410,8 +428,9 @@ _bucket:replace{4, vshard.consts.BUCKET.SENT}
> ---
> - [4, 'sent']
> ...
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
> ---
> +- true
> ...
> --
> -- Test WAL errors during deletion from _bucket.
> @@ -434,11 +453,14 @@ s:replace{6, 4}
> ---
> - [6, 4]
> ...
> -while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_log('default', 'Error during garbage collection step', \
> + 65536, 10)
> ---
> +- Error during garbage collection step
> ...
> -while #sk:select{4} ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return #sk:select{4} == 0 end)
> ---
> +- true
> ...
> s:select{}
> ---
> @@ -454,8 +476,9 @@ _bucket:select{}
> _ = _bucket:on_replace(nil, rollback_on_delete)
> ---
> ...
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
> ---
> +- true
> ...
> f:cancel()
> ---
> @@ -562,8 +585,9 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i,
> f = fiber.create(vshard.storage.internal.gc_bucket_f)
> ---
> ...
> -while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return _bucket:count() == 0 end)
> ---
> +- true
> ...
> _bucket:select{}
> ---
> diff --git a/test/unit/garbage.test.lua b/test/unit/garbage.test.lua
> index 30079fa..250afb0 100644
> --- a/test/unit/garbage.test.lua
> +++ b/test/unit/garbage.test.lua
> @@ -15,7 +15,6 @@ end;
> test_run:cmd("setopt delimiter ''");
>
> vshard.storage.internal.shard_index = 'bucket_id'
> -vshard.storage.internal.collect_bucket_garbage_interval = vshard.consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
>
> --
> -- Find nothing if no bucket_id anywhere, or there is no index
> @@ -75,16 +74,13 @@ s:drop()
> format = {}
> format[1] = {name = 'id', type = 'unsigned'}
> format[2] = {name = 'status', type = 'string'}
> +format[3] = {name = 'destination', type = 'string', is_nullable = true}
> _bucket = box.schema.create_space('_bucket', {format = format})
> _ = _bucket:create_index('pk')
> _ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
> _bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
> _bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
> _bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> -_bucket:replace{6, vshard.consts.BUCKET.GARBAGE}
> -_bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
>
> s = box.schema.create_space('test', {engine = engine})
> pk = s:create_index('pk')
> @@ -94,7 +90,7 @@ s:replace{2, 1}
> s:replace{3, 2}
> s:replace{4, 2}
>
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> +gc_bucket_drop = vshard.storage.internal.gc_bucket_drop
> s2 = box.schema.create_space('test2', {engine = engine})
> pk2 = s2:create_index('pk')
> sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> @@ -114,6 +110,10 @@ function fill_spaces_with_garbage()
> s2:replace{6, 4}
> s2:replace{7, 5}
> s2:replace{7, 6}
> + _bucket:replace{4, vshard.consts.BUCKET.SENT, 'destination1'}
> + _bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> + _bucket:replace{6, vshard.consts.BUCKET.GARBAGE, 'destination2'}
> + _bucket:replace{200, vshard.consts.BUCKET.GARBAGE}
> end;
> test_run:cmd("setopt delimiter ''");
>
> @@ -121,15 +121,21 @@ fill_spaces_with_garbage()
>
> #s2:select{}
> #s:select{}
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> +route_map = {}
> +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
> +route_map
> #s2:select{}
> #s:select{}
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> +route_map = {}
> +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
> +route_map
> s2:select{}
> s:select{}
> -- Nothing deleted - update collected generation.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> +route_map = {}
> +gc_bucket_drop(vshard.consts.BUCKET.GARBAGE, route_map)
> +gc_bucket_drop(vshard.consts.BUCKET.SENT, route_map)
> +route_map
> #s2:select{}
> #s:select{}
>
> @@ -137,10 +143,14 @@ gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> -- Test continuous garbage collection via background fiber.
> --
> fill_spaces_with_garbage()
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> +_ = _bucket:on_replace(function() \
> + local gen = vshard.storage.internal.bucket_generation \
> + vshard.storage.internal.bucket_generation = gen + 1 \
> + vshard.storage.internal.bucket_generation_cond:broadcast() \
> +end)
> f = fiber.create(vshard.storage.internal.gc_bucket_f)
> -- Wait until garbage collection is finished.
> -while s2:count() ~= 3 or s:count() ~= 6 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return s2:count() == 3 and s:count() == 6 end)
> s:select{}
> s2:select{}
> -- Check garbage bucket is deleted by background fiber.
> @@ -150,7 +160,7 @@ _bucket:select{}
> --
> _bucket:replace{2, vshard.consts.BUCKET.SENT}
> -- Wait deletion after a while.
> -while _bucket:get{2} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{2} end)
> _bucket:select{}
> s:select{}
> s2:select{}
> @@ -162,7 +172,7 @@ _bucket:replace{4, vshard.consts.BUCKET.ACTIVE}
> s:replace{5, 4}
> s:replace{6, 4}
> _bucket:replace{4, vshard.consts.BUCKET.SENT}
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
>
> --
> -- Test WAL errors during deletion from _bucket.
> @@ -172,12 +182,13 @@ _ = _bucket:on_replace(rollback_on_delete)
> _bucket:replace{4, vshard.consts.BUCKET.SENT}
> s:replace{5, 4}
> s:replace{6, 4}
> -while not test_run:grep_log("default", "Error during deletion of empty sent buckets") do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> -while #sk:select{4} ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_log('default', 'Error during garbage collection step', \
> + 65536, 10)
> +test_run:wait_cond(function() return #sk:select{4} == 0 end)
> s:select{}
> _bucket:select{}
> _ = _bucket:on_replace(nil, rollback_on_delete)
> -while _bucket:get{4} ~= nil do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return not _bucket:get{4} end)
>
> f:cancel()
>
> @@ -220,7 +231,7 @@ for i = 1, 2000 do _bucket:replace{i, vshard.consts.BUCKET.GARBAGE} s:replace{i,
> #s:select{}
> #s2:select{}
> f = fiber.create(vshard.storage.internal.gc_bucket_f)
> -while _bucket:count() ~= 0 do vshard.storage.garbage_collector_wakeup() fiber.sleep(0.001) end
> +test_run:wait_cond(function() return _bucket:count() == 0 end)
> _bucket:select{}
> s:select{}
> s2:select{}
> diff --git a/test/unit/garbage_errinj.result b/test/unit/garbage_errinj.result
> deleted file mode 100644
> index 92c8039..0000000
> --- a/test/unit/garbage_errinj.result
> +++ /dev/null
> @@ -1,223 +0,0 @@
> -test_run = require('test_run').new()
> ----
> -...
> -vshard = require('vshard')
> ----
> -...
> -fiber = require('fiber')
> ----
> -...
> -engine = test_run:get_cfg('engine')
> ----
> -...
> -vshard.storage.internal.shard_index = 'bucket_id'
> ----
> -...
> -format = {}
> ----
> -...
> -format[1] = {name = 'id', type = 'unsigned'}
> ----
> -...
> -format[2] = {name = 'status', type = 'string', is_nullable = true}
> ----
> -...
> -_bucket = box.schema.create_space('_bucket', {format = format})
> ----
> -...
> -_ = _bucket:create_index('pk')
> ----
> -...
> -_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
> ----
> -...
> -_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
> ----
> -- [1, 'active']
> -...
> -_bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
> ----
> -- [2, 'receiving']
> -...
> -_bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
> ----
> -- [3, 'active']
> -...
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> ----
> -- [4, 'sent']
> -...
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [5, 'garbage']
> -...
> -s = box.schema.create_space('test', {engine = engine})
> ----
> -...
> -pk = s:create_index('pk')
> ----
> -...
> -sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> ----
> -...
> -s:replace{1, 1}
> ----
> -- [1, 1]
> -...
> -s:replace{2, 1}
> ----
> -- [2, 1]
> -...
> -s:replace{3, 2}
> ----
> -- [3, 2]
> -...
> -s:replace{4, 2}
> ----
> -- [4, 2]
> -...
> -s:replace{5, 100}
> ----
> -- [5, 100]
> -...
> -s:replace{6, 100}
> ----
> -- [6, 100]
> -...
> -s:replace{7, 4}
> ----
> -- [7, 4]
> -...
> -s:replace{8, 5}
> ----
> -- [8, 5]
> -...
> -s2 = box.schema.create_space('test2', {engine = engine})
> ----
> -...
> -pk2 = s2:create_index('pk')
> ----
> -...
> -sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> ----
> -...
> -s2:replace{1, 1}
> ----
> -- [1, 1]
> -...
> -s2:replace{3, 3}
> ----
> -- [3, 3]
> -...
> -for i = 7, 1107 do s:replace{i, 200} end
> ----
> -...
> -s2:replace{4, 200}
> ----
> -- [4, 200]
> -...
> -s2:replace{5, 100}
> ----
> -- [5, 100]
> -...
> -s2:replace{5, 300}
> ----
> -- [5, 300]
> -...
> -s2:replace{6, 4}
> ----
> -- [6, 4]
> -...
> -s2:replace{7, 5}
> ----
> -- [7, 5]
> -...
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> ----
> -...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> ----
> -- - 4
> -- true
> -...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> ----
> -- - 5
> -- true
> -...
> ---
> --- Test _bucket generation change during garbage buckets search.
> ---
> -s:truncate()
> ----
> -...
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> ----
> -...
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true
> ----
> -...
> -f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end)
> ----
> -...
> -_bucket:replace{4, vshard.consts.BUCKET.GARBAGE}
> ----
> -- [4, 'garbage']
> -...
> -s:replace{5, 4}
> ----
> -- [5, 4]
> -...
> -s:replace{6, 4}
> ----
> -- [6, 4]
> -...
> -#s:select{}
> ----
> -- 2
> -...
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false
> ----
> -...
> -while f:status() ~= 'dead' do fiber.sleep(0.1) end
> ----
> -...
> --- Nothing is deleted - _bucket:replace() has changed _bucket
> --- generation during search of garbage buckets.
> -#s:select{}
> ----
> -- 2
> -...
> -_bucket:select{4}
> ----
> -- - [4, 'garbage']
> -...
> --- Next step deletes garbage ok.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> ----
> -- []
> -- true
> -...
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> ----
> -- - 4
> - - 5
> -- true
> -...
> -#s:select{}
> ----
> -- 0
> -...
> -_bucket:delete{4}
> ----
> -- [4, 'garbage']
> -...
> -s2:drop()
> ----
> -...
> -s:drop()
> ----
> -...
> -_bucket:drop()
> ----
> -...
> diff --git a/test/unit/garbage_errinj.test.lua b/test/unit/garbage_errinj.test.lua
> deleted file mode 100644
> index 31184b9..0000000
> --- a/test/unit/garbage_errinj.test.lua
> +++ /dev/null
> @@ -1,73 +0,0 @@
> -test_run = require('test_run').new()
> -vshard = require('vshard')
> -fiber = require('fiber')
> -
> -engine = test_run:get_cfg('engine')
> -vshard.storage.internal.shard_index = 'bucket_id'
> -
> -format = {}
> -format[1] = {name = 'id', type = 'unsigned'}
> -format[2] = {name = 'status', type = 'string', is_nullable = true}
> -_bucket = box.schema.create_space('_bucket', {format = format})
> -_ = _bucket:create_index('pk')
> -_ = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
> -_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
> -_bucket:replace{2, vshard.consts.BUCKET.RECEIVING}
> -_bucket:replace{3, vshard.consts.BUCKET.ACTIVE}
> -_bucket:replace{4, vshard.consts.BUCKET.SENT}
> -_bucket:replace{5, vshard.consts.BUCKET.GARBAGE}
> -
> -s = box.schema.create_space('test', {engine = engine})
> -pk = s:create_index('pk')
> -sk = s:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> -s:replace{1, 1}
> -s:replace{2, 1}
> -s:replace{3, 2}
> -s:replace{4, 2}
> -s:replace{5, 100}
> -s:replace{6, 100}
> -s:replace{7, 4}
> -s:replace{8, 5}
> -
> -s2 = box.schema.create_space('test2', {engine = engine})
> -pk2 = s2:create_index('pk')
> -sk2 = s2:create_index('bucket_id', {parts = {{2, 'unsigned'}}, unique = false})
> -s2:replace{1, 1}
> -s2:replace{3, 3}
> -for i = 7, 1107 do s:replace{i, 200} end
> -s2:replace{4, 200}
> -s2:replace{5, 100}
> -s2:replace{5, 300}
> -s2:replace{6, 4}
> -s2:replace{7, 5}
> -
> -gc_bucket_step_by_type = vshard.storage.internal.gc_bucket_step_by_type
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> -
> ---
> --- Test _bucket generation change during garbage buckets search.
> ---
> -s:truncate()
> -_ = _bucket:on_replace(function() vshard.storage.internal.bucket_generation = vshard.storage.internal.bucket_generation + 1 end)
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = true
> -f = fiber.create(function() gc_bucket_step_by_type(vshard.consts.BUCKET.SENT) gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE) end)
> -_bucket:replace{4, vshard.consts.BUCKET.GARBAGE}
> -s:replace{5, 4}
> -s:replace{6, 4}
> -#s:select{}
> -vshard.storage.internal.errinj.ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false
> -while f:status() ~= 'dead' do fiber.sleep(0.1) end
> --- Nothing is deleted - _bucket:replace() has changed _bucket
> --- generation during search of garbage buckets.
> -#s:select{}
> -_bucket:select{4}
> --- Next step deletes garbage ok.
> -gc_bucket_step_by_type(vshard.consts.BUCKET.SENT)
> -gc_bucket_step_by_type(vshard.consts.BUCKET.GARBAGE)
> -#s:select{}
> -_bucket:delete{4}
> -
> -s2:drop()
> -s:drop()
> -_bucket:drop()
> diff --git a/vshard/cfg.lua b/vshard/cfg.lua
> index 28c3400..1345058 100644
> --- a/vshard/cfg.lua
> +++ b/vshard/cfg.lua
> @@ -245,9 +245,7 @@ local cfg_template = {
> max = consts.REBALANCER_MAX_SENDING_MAX
> },
> collect_bucket_garbage_interval = {
> - type = 'positive number', name = 'Garbage bucket collect interval',
> - is_optional = true,
> - default = consts.DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL
> + name = 'Garbage bucket collect interval', is_deprecated = true,
> },
> collect_lua_garbage = {
> type = 'boolean', name = 'Garbage Lua collect necessity',
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 8c2a8b0..3f1585a 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -23,6 +23,7 @@ return {
> DEFAULT_BUCKET_COUNT = 3000;
> BUCKET_SENT_GARBAGE_DELAY = 0.5;
> BUCKET_CHUNK_SIZE = 1000;
> + LUA_CHUNK_SIZE = 100000,
> DEFAULT_REBALANCER_DISBALANCE_THRESHOLD = 1;
> REBALANCER_IDLE_INTERVAL = 60 * 60;
> REBALANCER_WORK_INTERVAL = 10;
> @@ -37,7 +38,7 @@ return {
> DEFAULT_FAILOVER_PING_TIMEOUT = 5;
> DEFAULT_SYNC_TIMEOUT = 1;
> RECONNECT_TIMEOUT = 0.5;
> - DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
> + GC_BACKOFF_INTERVAL = 5,
> RECOVERY_INTERVAL = 5;
> COLLECT_LUA_GARBAGE_INTERVAL = 100;
>
> @@ -45,4 +46,6 @@ return {
> DISCOVERY_WORK_INTERVAL = 1,
> DISCOVERY_WORK_STEP = 0.01,
> DISCOVERY_TIMEOUT = 10,
> +
> + TIMEOUT_INFINITY = 500 * 365 * 86400,
> }
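The new constants are easier to follow next to the place where they are consumed. Below is a minimal plain-Lua sketch of how `gc_bucket_f` later in this patch appears to pick its sleep time. The function `gc_sleep_time` and its parameter names are hypothetical and exist only for illustration; the constant values are copied from the hunks above.

```lua
-- Values copied from the patch. The function below is an
-- illustrative model, not code from vshard.
local consts = {
    BUCKET_SENT_GARBAGE_DELAY = 0.5,
    GC_BACKOFF_INTERVAL = 5,
    TIMEOUT_INFINITY = 500 * 365 * 86400,
}

-- is_ok: the last collection step finished without an error.
-- is_generation_changed: the global bucket generation differs from
--     the generation remembered as collected.
-- time_to_deadline: route_map_deadline - clock().
-- has_old_routes: the remembered route map snapshot is not empty.
local function gc_sleep_time(is_ok, is_generation_changed,
                             time_to_deadline, has_old_routes)
    local sleep_time = time_to_deadline
    if sleep_time <= 0 then
        if has_old_routes then
            -- There are redirects to expire, wake up for them.
            sleep_time = consts.BUCKET_SENT_GARBAGE_DELAY
        else
            -- Nothing to expire, sleep until _bucket changes.
            sleep_time = consts.TIMEOUT_INFINITY
        end
    end
    if is_generation_changed then
        if is_ok then
            -- New work appeared, retry without sleeping.
            sleep_time = 0
        else
            -- The step failed, do not spin in the event loop.
            sleep_time = consts.GC_BACKOFF_INTERVAL
        end
    end
    return sleep_time
end
```

In this model `GC_BACKOFF_INTERVAL` is used only after a failed step, and `TIMEOUT_INFINITY` only when there is neither pending work nor a route to expire.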
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 298df71..31a6fc7 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -69,7 +69,6 @@ if not M then
> total_bucket_count = 0,
> errinj = {
> ERRINJ_CFG = false,
> - ERRINJ_BUCKET_FIND_GARBAGE_DELAY = false,
> ERRINJ_RELOAD = false,
> ERRINJ_CFG_DELAY = false,
> ERRINJ_LONG_RECEIVE = false,
> @@ -96,6 +95,8 @@ if not M then
> -- detect that _bucket was not changed between yields.
> --
> bucket_generation = 0,
> + -- Condition variable fired on generation update.
> + bucket_generation_cond = lfiber.cond(),
> --
> -- Reference to the function used as on_replace trigger on
> -- _bucket space. It is used to replace the trigger with
> @@ -107,12 +108,14 @@ if not M then
> -- replace the old function is to keep its reference.
> --
> bucket_on_replace = nil,
> + -- Redirects for recently sent buckets. They are kept for a while to
> + -- help routers to find a new location for sent and deleted buckets
> + -- without whole cluster scan.
> + route_map = {},
>
> ------------------- Garbage collection -------------------
> -- Fiber to remove garbage buckets data.
> collect_bucket_garbage_fiber = nil,
> - -- Do buckets garbage collection once per this time.
> - collect_bucket_garbage_interval = nil,
> -- Boolean lua_gc state (create periodic gc task).
> collect_lua_garbage = nil,
>
> @@ -173,6 +176,7 @@ end
> --
> local function bucket_generation_increment()
> M.bucket_generation = M.bucket_generation + 1
> + M.bucket_generation_cond:broadcast()
> end
>
> --
> @@ -758,8 +762,9 @@ local function bucket_check_state(bucket_id, mode)
> else
> return bucket
> end
> + local dst = bucket and bucket.destination or M.route_map[bucket_id]
> return bucket, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, reason,
> - bucket and bucket.destination)
> + dst)
> end
>
> --
> @@ -804,11 +809,23 @@ end
> --
> local function bucket_unrefro(bucket_id)
> local ref = M.bucket_refs[bucket_id]
> - if not ref or ref.ro == 0 then
> + local count = ref and ref.ro or 0
> + if count == 0 then
> return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id,
> "no refs", nil)
> end
> - ref.ro = ref.ro - 1
> + if count == 1 then
> + ref.ro = 0
> + if ref.ro_lock then
> + -- Garbage collector is waiting for the bucket if RO
> + -- is locked. Let it know it has one more bucket to
> + -- collect. It relies on generation, so its increment
> + -- is enough.
> + bucket_generation_increment()
> + end
> + return true
> + end
> + ref.ro = count - 1
> return true
> end
>
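To illustrate the hunk above: the generation is bumped only when the last RO reference of a bucket marked with `ro_lock` is released. The following is a standalone plain-Lua model under that reading. The table `M` here is a local stand-in for the module internals, the error is simplified to a string, and the condition variable broadcast is left out.

```lua
-- Local stand-in for the storage internals, not the real module.
local M = {bucket_refs = {}, bucket_generation = 0}

local function bucket_generation_increment()
    M.bucket_generation = M.bucket_generation + 1
end

local function bucket_unrefro(bucket_id)
    local ref = M.bucket_refs[bucket_id]
    local count = ref and ref.ro or 0
    if count == 0 then
        return nil, 'no refs'
    end
    ref.ro = count - 1
    if ref.ro == 0 and ref.ro_lock then
        -- GC skipped this bucket earlier because of RO refs.
        -- Changing the generation makes it look again.
        bucket_generation_increment()
    end
    return true
end

-- Bucket 4 has two readers and was already seen by GC.
M.bucket_refs[4] = {ro = 2, rw = 0, ro_lock = true}
bucket_unrefro(4)
local gen_after_first = M.bucket_generation
bucket_unrefro(4)
local gen_after_second = M.bucket_generation
```

The first unref leaves the generation untouched, the second one increments it. A bucket without `ro_lock` never changes the generation on unref.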
> @@ -1479,79 +1496,44 @@ local function gc_bucket_in_space(space, bucket_id, status)
> end
>
> --
> --- Remove tuples from buckets of a specified type.
> --- @param type Type of buckets to gc.
> --- @retval List of ids of empty buckets of the type.
> +-- Drop buckets with the given status along with their data in all spaces.
> +-- @param status Status of target buckets.
> +-- @param route_map Destinations of deleted buckets are saved into this table.
> --
> -local function gc_bucket_step_by_type(type)
> - local sharded_spaces = find_sharded_spaces()
> - local empty_buckets = {}
> +local function gc_bucket_drop_xc(status, route_map)
> local limit = consts.BUCKET_CHUNK_SIZE
> - local is_all_collected = true
> - for _, bucket in box.space._bucket.index.status:pairs(type) do
> - local bucket_id = bucket.id
> - local ref = M.bucket_refs[bucket_id]
> + local _bucket = box.space._bucket
> + local sharded_spaces = find_sharded_spaces()
> + for _, b in _bucket.index.status:pairs(status) do
> + local id = b.id
> + local ref = M.bucket_refs[id]
> if ref then
> assert(ref.rw == 0)
> if ref.ro ~= 0 then
> ref.ro_lock = true
> - is_all_collected = false
> goto continue
> end
> - M.bucket_refs[bucket_id] = nil
> + M.bucket_refs[id] = nil
> end
> for _, space in pairs(sharded_spaces) do
> - gc_bucket_in_space_xc(space, bucket_id, type)
> + gc_bucket_in_space_xc(space, id, status)
> limit = limit - 1
> if limit == 0 then
> lfiber.sleep(0)
> limit = consts.BUCKET_CHUNK_SIZE
> end
> end
> - table.insert(empty_buckets, bucket.id)
> -::continue::
> + route_map[id] = b.destination
> + _bucket:delete{id}
> + ::continue::
> end
> - return empty_buckets, is_all_collected
> -end
> -
> ---
> --- Drop buckets with ids in the list.
> --- @param bucket_ids Bucket ids to drop.
> --- @param status Expected bucket status.
> ---
> -local function gc_bucket_drop_xc(bucket_ids, status)
> - if #bucket_ids == 0 then
> - return
> - end
> - local limit = consts.BUCKET_CHUNK_SIZE
> - box.begin()
> - local _bucket = box.space._bucket
> - for _, id in pairs(bucket_ids) do
> - local bucket_exists = _bucket:get{id} ~= nil
> - local b = _bucket:get{id}
> - if b then
> - if b.status ~= status then
> - return error(string.format('Bucket %d status is changed. Was '..
> - '%s, became %s', id, status,
> - b.status))
> - end
> - _bucket:delete{id}
> - end
> - limit = limit - 1
> - if limit == 0 then
> - box.commit()
> - box.begin()
> - limit = consts.BUCKET_CHUNK_SIZE
> - end
> - end
> - box.commit()
> end
>
> --
> -- Exception safe version of gc_bucket_drop_xc.
> --
> -local function gc_bucket_drop(bucket_ids, status)
> - local status, err = pcall(gc_bucket_drop_xc, bucket_ids, status)
> +local function gc_bucket_drop(status, route_map)
> + local status, err = pcall(gc_bucket_drop_xc, status, route_map)
> if not status then
> box.rollback()
> end
> @@ -1578,65 +1560,75 @@ function gc_bucket_f()
> -- generation == bucket generation. In such a case the fiber
> -- does nothing until next _bucket change.
> local bucket_generation_collected = -1
> - -- Empty sent buckets are collected into an array. After a
> - -- specified time interval the buckets are deleted both from
> - -- this array and from _bucket space.
> - local buckets_for_redirect = {}
> - local buckets_for_redirect_ts = clock()
> - -- Empty sent buckets, updated after each step, and when
> - -- buckets_for_redirect is deleted, it gets empty_sent_buckets
> - -- for next deletion.
> - local empty_garbage_buckets, empty_sent_buckets, status, err
> + local bucket_generation_current = M.bucket_generation
> + -- Deleted buckets are saved into a route map to redirect routers if they
> + -- didn't discover new location of the buckets yet. However route map does
> + -- not grow infinitely. Otherwise it would end up storing redirects for all
> + -- buckets in the cluster. Which could also be outdated.
> + -- Garbage collector periodically drops old routes from the map. For that it
> + -- remembers state of route map in one moment, and after a while clears the
> + -- remembered routes from the global route map.
> + local route_map = M.route_map
> + local route_map_old = {}
> + local route_map_deadline = 0
> + local status, err
> while M.module_version == module_version do
> - -- Check if no changes in buckets configuration.
> - if bucket_generation_collected ~= M.bucket_generation then
> - local bucket_generation = M.bucket_generation
> - local is_sent_collected, is_garbage_collected
> - status, empty_garbage_buckets, is_garbage_collected =
> - pcall(gc_bucket_step_by_type, consts.BUCKET.GARBAGE)
> - if not status then
> - err = empty_garbage_buckets
> - goto check_error
> - end
> - status, empty_sent_buckets, is_sent_collected =
> - pcall(gc_bucket_step_by_type, consts.BUCKET.SENT)
> - if not status then
> - err = empty_sent_buckets
> - goto check_error
> + if bucket_generation_collected ~= bucket_generation_current then
> + status, err = gc_bucket_drop(consts.BUCKET.GARBAGE, route_map)
> + if status then
> + status, err = gc_bucket_drop(consts.BUCKET.SENT, route_map)
> end
> - status, err = gc_bucket_drop(empty_garbage_buckets,
> - consts.BUCKET.GARBAGE)
> -::check_error::
> if not status then
> box.rollback()
> log.error('Error during garbage collection step: %s', err)
> - goto continue
> + else
> + -- Don't use global generation. During the collection it could
> + -- already change. Instead, remember the generation known before
> + -- the collection has started.
> + -- Since the collection also changes the generation, it makes
> + -- the GC happen always at least twice. But typically on the
> + -- second iteration it should not find any buckets to collect,
> + -- and then the collected generation matches the global one.
> + bucket_generation_collected = bucket_generation_current
> end
> - if is_sent_collected and is_garbage_collected then
> - bucket_generation_collected = bucket_generation
> + else
> + status = true
> + end
> +
> + local sleep_time = route_map_deadline - clock()
> + if sleep_time <= 0 then
> + local chunk = consts.LUA_CHUNK_SIZE
> + util.table_minus_yield(route_map, route_map_old, chunk)
> + route_map_old = util.table_copy_yield(route_map, chunk)
> + if next(route_map_old) then
> + sleep_time = consts.BUCKET_SENT_GARBAGE_DELAY
> + else
> + sleep_time = consts.TIMEOUT_INFINITY
> end
> + route_map_deadline = clock() + sleep_time
> end
> + bucket_generation_current = M.bucket_generation
>
> - if clock() - buckets_for_redirect_ts >=
> - consts.BUCKET_SENT_GARBAGE_DELAY then
> - status, err = gc_bucket_drop(buckets_for_redirect,
> - consts.BUCKET.SENT)
> - if not status then
> - buckets_for_redirect = {}
> - empty_sent_buckets = {}
> - bucket_generation_collected = -1
> - log.error('Error during deletion of empty sent buckets: %s',
> - err)
> - elseif M.module_version ~= module_version then
> - return
> + if bucket_generation_current ~= bucket_generation_collected then
> + -- Generation was changed during collection. Or *by* collection.
> + if status then
> + -- Retry immediately. If the generation was changed by the
> + -- collection itself, it will notice it next iteration, and go
> + -- to proper sleep.
> + sleep_time = 0
> else
> - buckets_for_redirect = empty_sent_buckets or {}
> - empty_sent_buckets = nil
> - buckets_for_redirect_ts = clock()
> + -- An error happened during the collection. Does not make sense
> + -- to retry on each iteration of the event loop. The most likely
> + -- errors are either a WAL error or a transaction abort - both
> + -- look like an issue in the user's code and can't be fixed
> + -- quickly anyway. Backoff.
> + sleep_time = consts.GC_BACKOFF_INTERVAL
> end
> end
> -::continue::
> - lfiber.sleep(M.collect_bucket_garbage_interval)
> +
> + if M.module_version == module_version then
> + M.bucket_generation_cond:wait(sleep_time)
> + end
> end
> end
>
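The route map expiration in the hunk above can be read as a two-generation scheme: each pass drops the routes remembered on the previous pass and then remembers what is left. A minimal plain-Lua sketch follows, assuming `util.table_minus_yield` removes from the first table the pairs that are also present in the second one, and `util.table_copy_yield` makes a shallow copy. Both helpers come from another patch of this series, so the stand-ins below are assumptions, and they omit the yielding controlled by `LUA_CHUNK_SIZE`. The destination names are made up.

```lua
-- Non-yielding stand-ins for the util functions (assumed semantics).
local function table_minus(dst, src)
    for k, v in pairs(src) do
        if dst[k] == v then
            dst[k] = nil
        end
    end
    return dst
end

local function table_copy(src)
    local res = {}
    for k, v in pairs(src) do
        res[k] = v
    end
    return res
end

-- One expiration pass: forget routes seen on the previous pass,
-- return a snapshot of the routes to forget on the next pass.
local function route_map_age(route_map, route_map_old)
    table_minus(route_map, route_map_old)
    return table_copy(route_map)
end

local route_map = {}
local route_map_old = {}

route_map[4] = 'replicaset-b'
-- Pass 1: route 4 is only remembered.
route_map_old = route_map_age(route_map, route_map_old)
route_map[5] = 'replicaset-c'
-- Pass 2: route 4 is dropped, route 5 is remembered.
route_map_old = route_map_age(route_map, route_map_old)
```

With passes scheduled `BUCKET_SENT_GARBAGE_DELAY` apart, a route added between passes would live for roughly one to two such intervals before it is dropped.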
> @@ -2421,8 +2413,6 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
> vshard_cfg.rebalancer_disbalance_threshold
> M.rebalancer_receiving_quota = vshard_cfg.rebalancer_max_receiving
> M.shard_index = vshard_cfg.shard_index
> - M.collect_bucket_garbage_interval =
> - vshard_cfg.collect_bucket_garbage_interval
> M.collect_lua_garbage = vshard_cfg.collect_lua_garbage
> M.rebalancer_worker_count = vshard_cfg.rebalancer_max_sending
> M.current_cfg = cfg
> @@ -2676,6 +2666,9 @@ else
> storage_cfg(M.current_cfg, M.this_replica.uuid, true)
> end
> M.module_version = M.module_version + 1
> + -- Background fibers could sleep waiting for bucket changes.
> + -- Let them know it is time to reload.
> + bucket_generation_increment()
> end
>
> M.recovery_f = recovery_f
> @@ -2686,7 +2679,7 @@ M.gc_bucket_f = gc_bucket_f
> -- These functions are saved in M not for atomic reload, but for
> -- unit testing.
> --
> -M.gc_bucket_step_by_type = gc_bucket_step_by_type
> +M.gc_bucket_drop = gc_bucket_drop
> M.rebalancer_build_routes = rebalancer_build_routes
> M.rebalancer_calculate_metrics = rebalancer_calculate_metrics
> M.cached_find_sharded_spaces = find_sharded_spaces
> diff --git a/vshard/storage/reload_evolution.lua b/vshard/storage/reload_evolution.lua
> index f38af74..484f499 100644
> --- a/vshard/storage/reload_evolution.lua
> +++ b/vshard/storage/reload_evolution.lua
> @@ -4,6 +4,7 @@
> -- in a commit.
> --
> local log = require('log')
> +local fiber = require('fiber')
>
> --
> -- Array of upgrade functions.
> @@ -25,6 +26,13 @@ migrations[#migrations + 1] = function(M)
> end
> end
>
> +migrations[#migrations + 1] = function(M)
> + if not M.route_map then
> + M.bucket_generation_cond = fiber.cond()
> + M.route_map = {}
> + end
> +end
> +
> --
> -- Perform an update based on a version stored in `M` (internals).
> -- @param M Old module internals which should be updated.