[Tarantool-patches] [PATCH vshard 6/7] router: make discovery smoother in a big cluster
Oleg Babin
olegrok at tarantool.org
Mon May 4 17:26:56 MSK 2020
Thanks for the answers and changes! LGTM.
I left my answers and one nit on the new patch diff.
On 02/05/2020 23:12, Vladislav Shpilevoy wrote:
> Thanks for the review!
>
> On 01/05/2020 19:01, Oleg Babin wrote:
>> Thanks for your patch! See my comments below.
>>
>> On 01/05/2020 03:16, Vladislav Shpilevoy wrote:
>>> Router does discovery once per 10 seconds. Discovery sends a
>>> request to each replicaset to download all pinned and active
>>> buckets from there. When there are millions of buckets, that
>>> becomes a long operation taking seconds, during which the storage
>>> is unresponsive.
>>>
>>> The patch makes discovery work step by step, downloading not more
>>> than 1000 buckets at a time. That gives the storage time to
>>> process other requests.
>>>
>>> Moreover, discovery now has some kind of 'state'. For each
>>> replicaset it keeps an iterator which is moved by 1k buckets on
>>> every successfully discovered bucket batch. It means that if, on a
>>> replicaset with 1 000 000 buckets, discovery fails after 999 999
>>> buckets are already discovered, it won't start from 0. It will
>>> retry from the old position.
>>
>> Could you provide a test for such a case?
>
> No problem.
> ...
> I couldn't come up with a sane test on whether router continues
> discovery from exactly the same place where it got an error. I would
> need to insert some extraordinary weird injections into router to
> collect statistics of that kind. But the test at least shows, that an
> error does not prevent full discovery eventually.
>
Thanks, I believe it's enough.
>>> However, there is still room for improvement. Discovery could
>>> avoid downloading anything after everything is downloaded, if it
>>> could somehow see whether the bucket space has changed.
>>> Unfortunately that is not so easy, since the bucket generation
>>> (version of the _bucket space) is not persisted. So after an
>>> instance restart it is always equal to the bucket count.
>>
>> Is there an issue for that?
>
> I was hoping you wouldn't ask :D
>
> I don't really see how that optimization could be implemented in a sane
> way in terms of simplicity. Looks like a storage would need to send
> a bucket generation + some timestamp to protect from the case when
> after a restart the bucket set is different, but the generation is the same.
>
> I was going to wait until somebody explicitly asks for it, if
> 'discovery_mode = "once"' won't be enough.
>
> But ok, I was caught. Here is the issue:
> https://github.com/tarantool/vshard/issues/238
>
Hah, I don't know, but when I read this part of the commit message I
thought about a trigger on the "_bucket" space that could update some
"generation" value, maybe in the "_schema" space. I don't know whether it
would be an appropriate way to implement this feature, but I asked to make
sure that this proposal won't be lost.
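
To make the idea a bit more concrete, here is a minimal sketch of what I
had in mind (the 'vshard_bucket_generation' key name is made up, and I
haven't checked how this interacts with transactions or recovery):

    -- Hypothetical: persist a bucket generation, so discovery could
    -- skip re-downloading when nothing changed since the last scan.
    local function bucket_generation_bump()
        local t = box.space._schema:get{'vshard_bucket_generation'}
        local gen = t ~= nil and t[2] or 0
        box.space._schema:replace{'vshard_bucket_generation', gen + 1}
    end

    -- Bump it on every change of the _bucket space.
    box.space._bucket:on_replace(bucket_generation_bump)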
>>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>>> index 6d88153..26ea85b 100644
>>> --- a/vshard/router/init.lua
>>> +++ b/vshard/router/init.lua
>>> @@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then
>>> discovery_f = function(router)
>>> local module_version = M.module_version
>>> assert(router.discovery_mode == 'on')
>>> + local iterators = {}
>>> + local opts = {is_async = true}
>>> + local mode
>>> while module_version == M.module_version do
>>> - while not next(router.replicasets) do
>>> - lfiber.sleep(consts.DISCOVERY_INTERVAL)
>>> - end
>>> - if module_version ~= M.module_version then
>>> - return
>>> - end
>>> -- Just typical map reduce - send request to each
>>> - -- replicaset in parallel, and collect responses.
>>> - local pending = {}
>>> - local opts = {is_async = true}
>>> - local args = {}
>>> - for rs_uuid, replicaset in pairs(router.replicasets) do
>>> + -- replicaset in parallel, and collect responses. Many
>>> + -- requests probably will be needed for each replicaset.
>>> + --
>>> + -- Step 1: create missing iterators, in case this is a
>>> + -- first discovery iteration, or some replicasets were
>>> + -- added after the router is started.
>>> + for rs_uuid in pairs(router.replicasets) do
>>> + local iter = iterators[rs_uuid]
>>> + if not iter then
>>> + iterators[rs_uuid] = {
>>> + args = {{from = 1}},
>>> + future = nil,
>>> + }
>>> + end
>>> + end
>>> + -- Step 2: map stage - send parallel requests for every
>>> + -- iterator, prune orphan iterators whose replicasets were
>>> + -- removed.
>>> + for rs_uuid, iter in pairs(iterators) do
>>> + local replicaset = router.replicasets[rs_uuid]
>>> + if not replicaset then
>>> + log.warn('Replicaset %s was removed during discovery', rs_uuid)
>>> + iterators[rs_uuid] = nil
>>> + goto continue
>>> + end
>>> local future, err =
>>> - replicaset:callro('vshard.storage.buckets_discovery',
>>> - args, opts)
>>> + replicaset:callro('vshard.storage.buckets_discovery', iter.args,
>>> + opts)
>>> if not future then
>>> - log.warn('Error during discovery %s: %s', rs_uuid, err)
>>> - else
>>> - pending[rs_uuid] = future
>>> + log.warn('Error during discovery %s, retry will be done '..
>>> + 'later: %s', rs_uuid, err)
>>> + goto continue
>>> + end
>>> + iter.future = future
>>> + -- Don't spam many requests at once. Give
>>> + -- storages time to handle them and other
>>> + -- requests.
>>> + lfiber.sleep(consts.DISCOVERY_WORK_STEP)
>>> + if module_version ~= M.module_version then
>>> + return
>>> end
>>
>>
>> Is it possible to place such checks in some "common" places, at the start/end of each iteration or between steps? It looks strange and a bit unobvious to do such checks after each timeout.
>
> The purpose is to stop the fiber as soon as possible in case a reload
> happened. So moving it to a common place is not really possible.
> It would make it work longer with the old code after reload. But
> if we keep the checks, I still need to do them after/before any
> long yield.
>
My main concern is that such checks could be easily missed in the future.
But I agree that we should leave this function as early as possible.
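
Just to illustrate the concern, something like a tiny helper would make
the check harder to forget (only a sketch, the helper name is made up):

    -- Hypothetical helper: sleep and tell whether the module was
    -- reloaded meanwhile, so the caller can return immediately.
    local function sleep_and_check_reload(timeout, module_version)
        lfiber.sleep(timeout)
        return M.module_version ~= module_version
    end

    -- Usage inside the loops:
    -- if sleep_and_check_reload(consts.DISCOVERY_WORK_STEP, module_version) then
    --     return
    -- end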
>>> + ::continue::
>>> end
>>> -
>>> - local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
>>> - for rs_uuid, p in pairs(pending) do
>>> + -- Step 3: reduce stage - collect responses, restart
>>> + -- iterators which reached the end.
>>> + for rs_uuid, iter in pairs(iterators) do
>>> lfiber.yield()
>>> - local timeout = deadline - lfiber.clock()
>>> - local buckets, err = p:wait_result(timeout)
>>> - while M.errinj.ERRINJ_LONG_DISCOVERY do
>>> - M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
>>> - lfiber.sleep(0.01)
>>> + local future = iter.future
>>> + if not future then
>>> + goto continue
>>> end
>>> - local replicaset = router.replicasets[rs_uuid]
>>> - if not buckets then
>>> - p:discard()
>>> - log.warn('Error during discovery %s: %s', rs_uuid, err)
>>> - elseif module_version ~= M.module_version then
>>> + local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
>>> + if module_version ~= M.module_version then
>>
>> Does it make sense to call "discard" before "return"?
>
> Yes. Discard removes the entry from netbox's internal table of requests
> waiting for a result. I don't know for how long the storage won't respond
> to it, so it is better to free the memory now. The entry is not removed
> automatically in case of a timeout error.
>
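Understood. For the record, the pattern being discussed looks roughly like
this (a sketch on top of the is_async API used in the patch, not the exact
patch code):

    local future, err = replicaset:callro('vshard.storage.buckets_discovery',
                                          iter.args, {is_async = true})
    if future then
        local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
        if not result then
            -- The request entry stays in netbox's table until the
            -- storage answers, so free it explicitly on timeout.
            future:discard()
        end
    end
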
>>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>>> index 73c6740..0050b96 100644
>>> --- a/vshard/storage/init.lua
>>> +++ b/vshard/storage/init.lua
>>> @@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id)
>>> return data
>>> end
>>> +-- Discovery used by routers. It returns limited number of
>>> +-- buckets to avoid stalls when _bucket is huge.
>>> +local function buckets_discovery_extended(opts)
>>> + local limit = consts.BUCKET_CHUNK_SIZE
>>> + local buckets = table.new(limit, 0)
>>> + local active = consts.BUCKET.ACTIVE
>>> + local pinned = consts.BUCKET.PINNED
>>> + local next_from
>>> + -- No way to select by {status, id}, because there are two
>>> + -- statuses to select. A router would need to maintain a
>>> + -- separate iterator for each status it wants to get. This may
>>> + -- be implemented in future. But _bucket space anyway 99% of
>>> + -- time contains only active and pinned buckets. So there is
>>> + -- no big benefit in optimizing that. Perhaps a compound index
>>> + -- {status, id} could help too.
>>> + for _, bucket in box.space._bucket:pairs({opts.from},
>>> + {iterator = box.index.GE}) do
>>> + local status = bucket.status
>>> + if status == active or status == pinned then
>>> + table.insert(buckets, bucket.id)
>>> + end
>>> + limit = limit - 1
>>
>> It's a bit strange after the words about "99% of time"; I propose to move the limit decrease under the if condition.
>
> But still there is this 1%, when buckets start moving. When the bucket
> count is in the millions, rebalancing is likely to move thousands of
> buckets, likely from one range. For example, buckets from 1 to 10k can
> be moved. If a discovery request arrives at that moment, it will get
> stuck for a long time iterating over the sent and garbage buckets.
>
> Rebalancing is a heavy thing, and such additional load would make the
> cluster feel even worse.
>
> The 99% here is rather about the fact that it is not a problem to
> sometimes return an empty bucket set. Not about allowing perf problems
> in the other 1%.
>
Agree
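
To spell the scenario out with numbers, here is a toy illustration (plain
Lua, no vshard code) of how many tuples one discovery call would scan right
after buckets 1..10000 were moved away, depending on where the limit is
decremented:

    local CHUNK = 1000
    local statuses = {}
    for i = 1, 11000 do
        -- Buckets 1..10000 were just sent away, the rest are active.
        statuses[i] = i <= 10000 and 'garbage' or 'active'
    end

    local function scanned_tuples(decrement_always)
        local limit, scanned = CHUNK, 0
        for i = 1, #statuses do
            scanned = scanned + 1
            if decrement_always or statuses[i] == 'active' then
                limit = limit - 1
            end
            if limit == 0 then
                break
            end
        end
        return scanned
    end

    print(scanned_tuples(true))  -- 1000: one chunk, then return next_from.
    print(scanned_tuples(false)) -- 11000: the whole garbage range first.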
>
> While working on your comments I found that during aggressive discovery
> I added the sleep in a wrong place. Fixed below:
>
> ====================
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 26ea85b..28437e3 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -358,11 +358,14 @@ discovery_f = function(router)
> mode = 'idle'
> end
> lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> - elseif mode ~= 'aggressive' then
> - log.info('Start aggressive discovery, %s buckets are unknown. '..
> - 'Discovery works with %s seconds interval',
> - unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL)
> - mode = 'aggressive'
> + else
> + if mode ~= 'aggressive' then
> + log.info('Start aggressive discovery, %s buckets are '..
> + 'unknown. Discovery works with %s seconds '..
> + 'interval', unknown_bucket_count,
> + consts.DISCOVERY_WORK_INTERVAL)
> + mode = 'aggressive'
> + end
> lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
> break
> end
>
> ====================
>
> Since the changes are big, below is the whole new commit:
>
> ====================
>
> router: make discovery smoother in a big cluster
>
> Router does discovery once per 10 seconds. Discovery sends a
> request to each replicaset to download all pinned and active
> buckets from there. When there are millions of buckets, that
> becomes a long operation taking seconds, during which the storage
> is unresponsive.
>
> The patch makes discovery work step by step, downloading not more
> than 1000 buckets at a time. That gives the storage time to
> process other requests.
>
> Moreover, discovery now has some kind of 'state'. For each
> replicaset it keeps an iterator which is moved by 1k buckets on
> every successfully discovered bucket batch. It means that if, on a
> replicaset with 1 000 000 buckets, discovery fails after 999 999
> buckets are already discovered, it won't start from 0. It will
> retry from the old position.
>
> However, there is still room for improvement. Discovery could
> avoid downloading anything after everything is downloaded, if it
> could somehow see whether the bucket space has changed.
> Unfortunately that is not so easy, since the bucket generation
> (version of the _bucket space) is not persisted. So after an
> instance restart it is always equal to the bucket count.
>
> Part of #210
>
> diff --git a/test/router/reload.result b/test/router/reload.result
> index 3ba900a..8fe99ba 100644
> --- a/test/router/reload.result
> +++ b/test/router/reload.result
> @@ -44,7 +44,10 @@ vshard.router.bootstrap()
> while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
> ---
> ...
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +---
> +...
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> ---
> ...
> --
> diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
> index 5ed5690..abcbc09 100644
> --- a/test/router/reload.test.lua
> +++ b/test/router/reload.test.lua
> @@ -15,7 +15,8 @@ fiber = require('fiber')
> vshard.router.bootstrap()
>
> while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>
> --
> -- Gh-72: allow reload. Test simple reload, error during
> diff --git a/test/router/router.result b/test/router/router.result
> index df7be4a..b2efd6d 100644
> --- a/test/router/router.result
> +++ b/test/router/router.result
> @@ -943,9 +943,9 @@ calculate_known_buckets()
> ---
> - 3000
> ...
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
> ---
> -- was 1, became 1500
> +- was 1, became 1000
> ...
> info = vshard.router.info()
> ---
> diff --git a/test/router/router.test.lua b/test/router/router.test.lua
> index 97dce49..154310b 100644
> --- a/test/router/router.test.lua
> +++ b/test/router/router.test.lua
> @@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active)
> _ = test_run:switch('router_1')
> wait_discovery()
> calculate_known_buckets()
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
> info = vshard.router.info()
> info.bucket
> info.alerts
> diff --git a/test/router/router2.result b/test/router/router2.result
> index 556f749..0ad5a21 100644
> --- a/test/router/router2.result
> +++ b/test/router/router2.result
> @@ -123,6 +123,66 @@ f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
> | - suspended
> | ...
>
> +-- Errored discovery continued successfully after errors are gone.
> +vshard.router.bootstrap()
> + | ---
> + | - true
> + | ...
> +vshard.router.discovery_set('off')
> + | ---
> + | ...
> +vshard.router._route_map_clear()
> + | ---
> + | ...
> +
> +-- Discovery requests 2 and 4 will fail on storages.
> +util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}}, \
> + 'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
> + | ---
> + | ...
> +
> +vshard.router.info().bucket.unknown
> + | ---
> + | - 3000
> + | ...
> +vshard.router.discovery_set('on')
> + | ---
> + | ...
> +function continue_discovery() \
> + local res = vshard.router.info().bucket.unknown == 0 \
> + if not res then \
> + vshard.router.discovery_wakeup() \
> + end \
> + return res \
> +end
> + | ---
> + | ...
> +test_run:wait_cond(continue_discovery)
> + | ---
> + | - true
> + | ...
> +vshard.router.info().bucket.unknown
> + | ---
> + | - 0
> + | ...
> +
> +-- Discovery injections should be reset meaning they were returned
> +-- needed number of times.
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> + | ---
> + | - 0
> + | ...
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> + | ---
> + | - 0
> + | ...
> +
> _ = test_run:switch("default")
> | ---
> | ...
> diff --git a/test/router/router2.test.lua b/test/router/router2.test.lua
> index 33f4d3e..ef05f8c 100644
> --- a/test/router/router2.test.lua
> +++ b/test/router/router2.test.lua
> @@ -43,6 +43,34 @@ vshard.router.static.discovery_fiber:status()
>
> f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
>
> +-- Errored discovery continued successfully after errors are gone.
> +vshard.router.bootstrap()
> +vshard.router.discovery_set('off')
> +vshard.router._route_map_clear()
> +
> +-- Discovery requests 2 and 4 will fail on storages.
> +util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}}, \
> + 'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
> +
> +vshard.router.info().bucket.unknown
> +vshard.router.discovery_set('on')
> +function continue_discovery() \
> + local res = vshard.router.info().bucket.unknown == 0 \
> + if not res then \
> + vshard.router.discovery_wakeup() \
> + end \
> + return res \
> +end
> +test_run:wait_cond(continue_discovery)
> +vshard.router.info().bucket.unknown
> +
> +-- Discovery injections should be reset meaning they were returned
> +-- needed number of times.
> +_ = test_run:switch('storage_1_a')
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> +_ = test_run:switch('storage_2_a')
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> +
> _ = test_run:switch("default")
> _ = test_run:cmd("stop server router_1")
> _ = test_run:cmd("cleanup server router_1")
> diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result
> index 56db9e8..92353c3 100644
> --- a/test/router/wrong_config.result
> +++ b/test/router/wrong_config.result
> @@ -45,7 +45,10 @@ cfg.bucket_count = 1000
> r = vshard.router.new('gh-179', cfg)
> ---
> ...
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do \
> + r:discovery_wakeup() \
> + fiber.sleep(0.1) \
> +end
> ---
> ...
> i = r:info()
> diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua
> index 62ef30d..174b373 100644
> --- a/test/router/wrong_config.test.lua
> +++ b/test/router/wrong_config.test.lua
> @@ -18,7 +18,10 @@ vshard.router.bootstrap()
> --
> cfg.bucket_count = 1000
> r = vshard.router.new('gh-179', cfg)
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do \
> + r:discovery_wakeup() \
> + fiber.sleep(0.1) \
> +end
> i = r:info()
> i.bucket
> i.alerts
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 5391c0f..a6a8c1b 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -40,5 +40,8 @@ return {
> DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
> RECOVERY_INTERVAL = 5;
> COLLECT_LUA_GARBAGE_INTERVAL = 100;
> - DISCOVERY_INTERVAL = 10;
> + DISCOVERY_IDLE_INTERVAL = 10,
> + DISCOVERY_WORK_INTERVAL = 1,
> + DISCOVERY_WORK_STEP = 0.01,
> + DISCOVERY_TIMEOUT = 10,
> }
Nit: here commas and semicolons are mixed. I think we should keep it
consistent - only commas or only semicolons.
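I.e. either keep the old semicolon style for the new constants:

    DISCOVERY_IDLE_INTERVAL = 10;
    DISCOVERY_WORK_INTERVAL = 1;
    DISCOVERY_WORK_STEP = 0.01;
    DISCOVERY_TIMEOUT = 10;

or switch the whole table to commas.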
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 6d88153..28437e3 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -250,50 +250,130 @@ if util.version_is_at_least(1, 10, 0) then
> discovery_f = function(router)
> local module_version = M.module_version
> assert(router.discovery_mode == 'on')
> + local iterators = {}
> + local opts = {is_async = true}
> + local mode
> while module_version == M.module_version do
> - while not next(router.replicasets) do
> - lfiber.sleep(consts.DISCOVERY_INTERVAL)
> - end
> - if module_version ~= M.module_version then
> - return
> - end
> -- Just typical map reduce - send request to each
> - -- replicaset in parallel, and collect responses.
> - local pending = {}
> - local opts = {is_async = true}
> - local args = {}
> - for rs_uuid, replicaset in pairs(router.replicasets) do
> + -- replicaset in parallel, and collect responses. Many
> + -- requests probably will be needed for each replicaset.
> + --
> + -- Step 1: create missing iterators, in case this is a
> + -- first discovery iteration, or some replicasets were
> + -- added after the router is started.
> + for rs_uuid in pairs(router.replicasets) do
> + local iter = iterators[rs_uuid]
> + if not iter then
> + iterators[rs_uuid] = {
> + args = {{from = 1}},
> + future = nil,
> + }
> + end
> + end
> + -- Step 2: map stage - send parallel requests for every
> + -- iterator, prune orphan iterators whose replicasets were
> + -- removed.
> + for rs_uuid, iter in pairs(iterators) do
> + local replicaset = router.replicasets[rs_uuid]
> + if not replicaset then
> + log.warn('Replicaset %s was removed during discovery', rs_uuid)
> + iterators[rs_uuid] = nil
> + goto continue
> + end
> local future, err =
> - replicaset:callro('vshard.storage.buckets_discovery',
> - args, opts)
> + replicaset:callro('vshard.storage.buckets_discovery', iter.args,
> + opts)
> if not future then
> - log.warn('Error during discovery %s: %s', rs_uuid, err)
> - else
> - pending[rs_uuid] = future
> + log.warn('Error during discovery %s, retry will be done '..
> + 'later: %s', rs_uuid, err)
> + goto continue
> end
> + iter.future = future
> + -- Don't spam many requests at once. Give
> + -- storages time to handle them and other
> + -- requests.
> + lfiber.sleep(consts.DISCOVERY_WORK_STEP)
> + if module_version ~= M.module_version then
> + return
> + end
> + ::continue::
> end
> -
> - local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
> - for rs_uuid, p in pairs(pending) do
> + -- Step 3: reduce stage - collect responses, restart
> + -- iterators which reached the end.
> + for rs_uuid, iter in pairs(iterators) do
> lfiber.yield()
> - local timeout = deadline - lfiber.clock()
> - local buckets, err = p:wait_result(timeout)
> - while M.errinj.ERRINJ_LONG_DISCOVERY do
> - M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> - lfiber.sleep(0.01)
> + local future = iter.future
> + if not future then
> + goto continue
> end
> - local replicaset = router.replicasets[rs_uuid]
> - if not buckets then
> - p:discard()
> - log.warn('Error during discovery %s: %s', rs_uuid, err)
> - elseif module_version ~= M.module_version then
> + local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
> + if module_version ~= M.module_version then
> return
> - elseif replicaset then
> - discovery_handle_buckets(router, replicaset, buckets[1])
> end
> + if not result then
> + future:discard()
> + log.warn('Error during discovery %s, retry will be done '..
> + 'later: %s', rs_uuid, err)
> + goto continue
> + end
> + local replicaset = router.replicasets[rs_uuid]
> + if not replicaset then
> + iterators[rs_uuid] = nil
> + log.warn('Replicaset %s was removed during discovery', rs_uuid)
> + goto continue
> + end
> + result = result[1]
> + -- Buckets are returned as plain array by storages
> + -- using old vshard version. But if .buckets is set,
> + -- this is a new storage.
> + discovery_handle_buckets(router, replicaset,
> + result.buckets or result)
> + local discovery_args = iter.args[1]
> + discovery_args.from = result.next_from
> + if not result.next_from then
> + -- Nil next_from means no more buckets to get.
> + -- Restart the iterator.
> + iterators[rs_uuid] = nil
> + end
> + ::continue::
> end
> -
> - lfiber.sleep(deadline - lfiber.clock())
> + local unknown_bucket_count
> + repeat
> + unknown_bucket_count =
> + router.total_bucket_count - router.known_bucket_count
> + if unknown_bucket_count == 0 then
> + if mode ~= 'idle' then
> + log.info('Discovery enters idle mode, all buckets are '..
> + 'known. Discovery works with %s seconds '..
> + 'interval now', consts.DISCOVERY_IDLE_INTERVAL)
> + mode = 'idle'
> + end
> + lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> + elseif not next(router.replicasets) then
> + if mode ~= 'idle' then
> + log.info('Discovery enters idle mode because '..
> + 'configuration does not have replicasets. '..
> + 'Retries will happen with %s seconds interval',
> + consts.DISCOVERY_IDLE_INTERVAL)
> + mode = 'idle'
> + end
> + lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> + else
> + if mode ~= 'aggressive' then
> + log.info('Start aggressive discovery, %s buckets are '..
> + 'unknown. Discovery works with %s seconds '..
> + 'interval', unknown_bucket_count,
> + consts.DISCOVERY_WORK_INTERVAL)
> + mode = 'aggressive'
> + end
> + lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
> + break
> + end
> + while M.errinj.ERRINJ_LONG_DISCOVERY do
> + M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> + lfiber.sleep(0.01)
> + end
> + until next(router.replicasets)
> end
> end
>
> @@ -355,9 +435,9 @@ local function discovery_set(router, new_mode)
> router.discovery_mode = new_mode
> if router.discovery_fiber ~= nil then
> pcall(router.discovery_fiber.cancel, router.discovery_fiber)
> + router.discovery_fiber = nil
> end
> if new_mode == 'off' then
> - router.discovery_fiber = nil
> return
> end
> router.discovery_fiber = util.reloadable_fiber_create(
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 73c6740..c6a78fe 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -75,6 +75,7 @@ if not M then
> ERRINJ_RECEIVE_PARTIALLY = false,
> ERRINJ_NO_RECOVERY = false,
> ERRINJ_UPGRADE = false,
> + ERRINJ_DISCOVERY = false,
> },
> -- This counter is used to restart background fibers with
> -- new reloaded code.
> @@ -1110,10 +1111,58 @@ local function bucket_collect(bucket_id)
> return data
> end
>
> +-- Discovery used by routers. It returns limited number of
> +-- buckets to avoid stalls when _bucket is huge.
> +local function buckets_discovery_extended(opts)
> + local limit = consts.BUCKET_CHUNK_SIZE
> + local buckets = table.new(limit, 0)
> + local active = consts.BUCKET.ACTIVE
> + local pinned = consts.BUCKET.PINNED
> + local next_from
> + local errcnt = M.errinj.ERRINJ_DISCOVERY
> + if errcnt then
> + if errcnt > 0 then
> + M.errinj.ERRINJ_DISCOVERY = errcnt - 1
> + if errcnt % 2 == 0 then
> + box.error(box.error.INJECTION, 'discovery')
> + end
> + else
> + M.errinj.ERRINJ_DISCOVERY = false
> + end
> + end
> + -- No way to select by {status, id}, because there are two
> + -- statuses to select. A router would need to maintain a
> + -- separate iterator for each status it wants to get. This may
> + -- be implemented in future. But _bucket space anyway 99% of
> + -- time contains only active and pinned buckets. So there is
> + -- no big benefit in optimizing that. Perhaps a compound index
> + -- {status, id} could help too.
> + for _, bucket in box.space._bucket:pairs({opts.from},
> + {iterator = box.index.GE}) do
> + local status = bucket.status
> + if status == active or status == pinned then
> + table.insert(buckets, bucket.id)
> + end
> + limit = limit - 1
> + if limit == 0 then
> + next_from = bucket.id + 1
> + break
> + end
> + end
> + -- Buckets list can even be empty, if all buckets in the
> + -- scanned chunk are not active/pinned. But next_from still
> + -- should be returned. So as the router could request more.
> + return {buckets = buckets, next_from = next_from}
> +end
> +
> --
> -- Collect array of active bucket identifiers for discovery.
> --
> -local function buckets_discovery()
> +local function buckets_discovery(opts)
> + if opts then
> + -- Private method. Is not documented intentionally.
> + return buckets_discovery_extended(opts)
> + end
> local ret = {}
> local status = box.space._bucket.index.status
> for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do
>