[Tarantool-patches] [PATCH vshard 6/7] router: make discovery smoother in a big cluster

Oleg Babin olegrok at tarantool.org
Mon May 4 17:26:56 MSK 2020


Thanks for the answers and changes! LGTM.
I left my answers and one nit on the new patch diff.

On 02/05/2020 23:12, Vladislav Shpilevoy wrote:
> Thanks for the review!
> 
> On 01/05/2020 19:01, Oleg Babin wrote:
>> Thanks for your patch! See my comments below.
>>
>> On 01/05/2020 03:16, Vladislav Shpilevoy wrote:
>>> Router does discovery once per 10 seconds. Discovery sends a
>>> request to each replicaset to download all pinned and active
>>> buckets from there. When there are millions of buckets, that
>>> becomes a long operation taking seconds, during which the storage
>>> is unresponsive.
>>>
>>> The patch makes discovery work step by step, downloading not more
>>> than 1000 buckets at a time. That gives the storage time to
>>> process other requests.
>>>
>>> Moreover, discovery now has some kind of 'state'. For each
>>> replicaset it keeps an iterator which is moved by 1k buckets on
>>> every successfully discovered bucket batch. It means that if on a
>>> replicaset with 1 000 000 buckets discovery fails after 999 999
>>> buckets are already discovered, it won't start from 0. It will
>>> retry from the old position.
>>
>> Could you provide a test for such case?
> 
> No problem.
> ...
> I couldn't come up with a sane test of whether the router continues
> discovery from exactly the same place where it got an error. I would
> need to insert some extraordinarily weird injections into the router
> to collect statistics of that kind. But the test at least shows that
> an error does not prevent full discovery eventually.
> 

Thanks, I believe it's enough.

>>> However, there is still space for improvement. Discovery could
>>> avoid downloading anything after everything is downloaded, if it
>>> could somehow see that the bucket space has not changed.
>>> Unfortunately it is not so easy, since the bucket generation
>>> (the version of the _bucket space) is not persisted. So after an
>>> instance restart it is always equal to the bucket count.
>>
>> Is there an issue for that?
> 
> I was hoping you wouldn't ask :D
> 
> I don't really see how that optimization could be implemented in a
> simple way. Looks like a storage would need to send a bucket
> generation + some timestamp to protect from the case when, after a
> restart, the bucket set is different but the generation is the same.
> 
> I was going to wait until somebody explicitly asks for it, in case
> 'discovery_mode = "once"' turns out not to be enough.
> 
> But ok, I was caught. Here is the issue:
> https://github.com/tarantool/vshard/issues/238
> 

Hah, I don't know, but when I read this part of the commit message I 
thought about a trigger on the "_bucket" space that could update some 
"generation" value, maybe in the "_schema" space. I don't know whether 
that would be an appropriate way to implement this feature, but I asked 
to make sure this proposal won't be lost.
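
Just to illustrate the idea (a rough, untested sketch; the 
'bucket_generation' key name is made up):

    local function on_bucket_replace(old_tuple, new_tuple)
        -- _schema is persistent, so this counter would survive
        -- restarts, unlike the in-memory bucket generation.
        local key = 'bucket_generation'
        local t = box.space._schema:get{key}
        box.space._schema:replace{key, (t and t[2] or 0) + 1}
    end

    box.space._bucket:on_replace(on_bucket_replace)

A storage could then report the persisted generation to routers, and 
discovery could skip downloading when it has not changed.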

>>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>>> index 6d88153..26ea85b 100644
>>> --- a/vshard/router/init.lua
>>> +++ b/vshard/router/init.lua
>>> @@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then
>>>    discovery_f = function(router)
>>>        local module_version = M.module_version
>>>        assert(router.discovery_mode == 'on')
>>> +    local iterators = {}
>>> +    local opts = {is_async = true}
>>> +    local mode
>>>        while module_version == M.module_version do
>>> -        while not next(router.replicasets) do
>>> -            lfiber.sleep(consts.DISCOVERY_INTERVAL)
>>> -        end
>>> -        if module_version ~= M.module_version then
>>> -            return
>>> -        end
>>>            -- Just typical map reduce - send request to each
>>> -        -- replicaset in parallel, and collect responses.
>>> -        local pending = {}
>>> -        local opts = {is_async = true}
>>> -        local args = {}
>>> -        for rs_uuid, replicaset in pairs(router.replicasets) do
>>> +        -- replicaset in parallel, and collect responses. Many
>>> +        -- requests probably will be needed for each replicaset.
>>> +        --
>>> +        -- Step 1: create missing iterators, in case this is a
>>> +        -- first discovery iteration, or some replicasets were
>>> +        -- added after the router is started.
>>> +        for rs_uuid in pairs(router.replicasets) do
>>> +            local iter = iterators[rs_uuid]
>>> +            if not iter then
>>> +                iterators[rs_uuid] = {
>>> +                    args = {{from = 1}},
>>> +                    future = nil,
>>> +                }
>>> +            end
>>> +        end
>>> +        -- Step 2: map stage - send parallel requests for every
>>> +        -- iterator, prune orphan iterators whose replicasets were
>>> +        -- removed.
>>> +        for rs_uuid, iter in pairs(iterators) do
>>> +            local replicaset = router.replicasets[rs_uuid]
>>> +            if not replicaset then
>>> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
>>> +                iterators[rs_uuid] = nil
>>> +                goto continue
>>> +            end
>>>                local future, err =
>>> -                replicaset:callro('vshard.storage.buckets_discovery',
>>> -                                  args, opts)
>>> +                replicaset:callro('vshard.storage.buckets_discovery', iter.args,
>>> +                                  opts)
>>>                if not future then
>>> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
>>> -            else
>>> -                pending[rs_uuid] = future
>>> +                log.warn('Error during discovery %s, retry will be done '..
>>> +                         'later: %s', rs_uuid, err)
>>> +                goto continue
>>> +            end
>>> +            iter.future = future
>>> +            -- Don't spam many requests at once. Give
>>> +            -- storages time to handle them and other
>>> +            -- requests.
>>> +            lfiber.sleep(consts.DISCOVERY_WORK_STEP)
>>> +            if module_version ~= M.module_version then
>>> +                return
>>>                end
>>
>>
>> Is it possible to place such checks in some "common" places, e.g. at the start/end of each iteration or between steps? It looks strange and a bit unobvious to do such checks after each timeout.
> 
> The purpose is to stop the fiber as soon as possible in case a reload
> happened. So moving it to a common place is not really possible.
> It would make the fiber work longer with the old code after a
> reload. But if we keep the checks, I still need to do them
> after/before any long yield.
> 

My main concern is that such checks could be easily missed in the 
future. But I agree that we should leave this function as early as 
possible.
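
For example, the sleep + reload check pair could live in one tiny 
helper, so a future change cannot forget the check (just a sketch, the 
helper name is made up):

    -- Returns true if the module was reloaded during the sleep,
    -- meaning the caller must leave the old fiber immediately.
    local function sleep_and_check_reload(timeout, module_version)
        lfiber.sleep(timeout)
        return module_version ~= M.module_version
    end

    -- Usage inside discovery_f:
    if sleep_and_check_reload(consts.DISCOVERY_WORK_STEP,
                              module_version) then
        return
    end

But that is just an idea, not a requirement for this patch.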

>>> +            ::continue::
>>>            end
>>> -
>>> -        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
>>> -        for rs_uuid, p in pairs(pending) do
>>> +        -- Step 3: reduce stage - collect responses, restart
>>> +        -- iterators which reached the end.
>>> +        for rs_uuid, iter in pairs(iterators) do
>>>                lfiber.yield()
>>> -            local timeout = deadline - lfiber.clock()
>>> -            local buckets, err = p:wait_result(timeout)
>>> -            while M.errinj.ERRINJ_LONG_DISCOVERY do
>>> -                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
>>> -                lfiber.sleep(0.01)
>>> +            local future = iter.future
>>> +            if not future then
>>> +                goto continue
>>>                end
>>> -            local replicaset = router.replicasets[rs_uuid]
>>> -            if not buckets then
>>> -                p:discard()
>>> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
>>> -            elseif module_version ~= M.module_version then
>>> +            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
>>> +            if module_version ~= M.module_version then
>>
>> Does it have sense to call "discard" before "return"?
> 
> Yes. Discard removes the entry from netbox's internal table of requests
> waiting for a result. I don't know for how long the storage won't
> respond to that request, so it is better to free the memory now. The
> entry is not removed automatically in case of a timeout error.
> 
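Got it. So the safe pattern is roughly what the patch below does:

    local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
    if not result then
        -- On a timeout netbox keeps the request registered until a
        -- reply arrives, so drop it explicitly to free the memory.
        future:discard()
    end
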
>>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>>> index 73c6740..0050b96 100644
>>> --- a/vshard/storage/init.lua
>>> +++ b/vshard/storage/init.lua
>>> @@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id)
>>>        return data
>>>    end
>>>    +-- Discovery used by routers. It returns limited number of
>>> +-- buckets to avoid stalls when _bucket is huge.
>>> +local function buckets_discovery_extended(opts)
>>> +    local limit = consts.BUCKET_CHUNK_SIZE
>>> +    local buckets = table.new(limit, 0)
>>> +    local active = consts.BUCKET.ACTIVE
>>> +    local pinned = consts.BUCKET.PINNED
>>> +    local next_from
>>> +    -- No way to select by {status, id}, because there are two
>>> +    -- statuses to select. A router would need to maintain a
>>> +    -- separate iterator for each status it wants to get. This may
>>> +    -- be implemented in future. But _bucket space anyway 99% of
>>> +    -- time contains only active and pinned buckets. So there is
>>> +    -- no big benefit in optimizing that. Perhaps a compound index
>>> +    -- {status, id} could help too.
>>> +    for _, bucket in box.space._bucket:pairs({opts.from},
>>> +                                             {iterator = box.index.GE}) do
>>> +        local status = bucket.status
>>> +        if status == active or status == pinned then
>>> +            table.insert(buckets, bucket.id)
>>> +        end
>>> +        limit = limit - 1
>>
>> It's a bit strange after the words about "99% of the time"; I propose to move the limit decrease under the if condition.
> 
> But there is still this 1%, when buckets start moving. When the bucket
> count is in the millions, rebalancing is likely to move thousands of
> buckets, probably from one range. For example, buckets from 1 to 10k
> can be moved. If a discovery request arrives at that moment, it will
> be stuck for a long time iterating over the sent and garbage buckets.
> 
> Rebalancing is a heavy thing, and such additional load would make the
> cluster feel even worse.
> 
> The 99% here is rather about it not being a problem to sometimes
> return an empty bucket set, not about allowing perf problems in the
> other 1%.
> 

Agree

> 
> While working on your comments I found that during aggressive
> discovery I added a sleep in the wrong place. Fixed below:
> 
> ====================
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 26ea85b..28437e3 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -358,11 +358,14 @@ discovery_f = function(router)
>                       mode = 'idle'
>                   end
>                   lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> -            elseif mode ~= 'aggressive' then
> -                log.info('Start aggressive discovery, %s buckets are unknown. '..
> -                         'Discovery works with %s seconds interval',
> -                         unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL)
> -                mode = 'aggressive'
> +            else
> +                if mode ~= 'aggressive' then
> +                    log.info('Start aggressive discovery, %s buckets are '..
> +                             'unknown. Discovery works with %s seconds '..
> +                             'interval', unknown_bucket_count,
> +                             consts.DISCOVERY_WORK_INTERVAL)
> +                    mode = 'aggressive'
> +                end
>                   lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
>                   break
>               end
> 
> ====================
> 
> Since the changes are big, below is the whole new commit:
> 
> ====================
> 
>      router: make discovery smoother in a big cluster
>      
>      Router does discovery once per 10 seconds. Discovery sends a
>      request to each replicaset to download all pinned and active
>      buckets from there. When there are millions of buckets, that
>      becomes a long operation taking seconds, during which the storage
>      is unresponsive.
>      
>      The patch makes discovery work step by step, downloading not more
>      than 1000 buckets at a time. That gives the storage time to
>      process other requests.
>      
>      Moreover, discovery now has some kind of 'state'. For each
>      replicaset it keeps an iterator which is moved by 1k buckets on
>      every successfully discovered bucket batch. It means that if on a
>      replicaset with 1 000 000 buckets discovery fails after 999 999
>      buckets are already discovered, it won't start from 0. It will
>      retry from the old position.
>      
>      However, there is still space for improvement. Discovery could
>      avoid downloading anything after everything is downloaded, if it
>      could somehow see that the bucket space has not changed.
>      Unfortunately it is not so easy, since the bucket generation
>      (the version of the _bucket space) is not persisted. So after an
>      instance restart it is always equal to the bucket count.
>      
>      Part of #210
> 
> diff --git a/test/router/reload.result b/test/router/reload.result
> index 3ba900a..8fe99ba 100644
> --- a/test/router/reload.result
> +++ b/test/router/reload.result
> @@ -44,7 +44,10 @@ vshard.router.bootstrap()
>   while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
>   ---
>   ...
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +---
> +...
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>   ---
>   ...
>   --
> diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
> index 5ed5690..abcbc09 100644
> --- a/test/router/reload.test.lua
> +++ b/test/router/reload.test.lua
> @@ -15,7 +15,8 @@ fiber = require('fiber')
>   vshard.router.bootstrap()
>   
>   while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>   
>   --
>   -- Gh-72: allow reload. Test simple reload, error during
> diff --git a/test/router/router.result b/test/router/router.result
> index df7be4a..b2efd6d 100644
> --- a/test/router/router.result
> +++ b/test/router/router.result
> @@ -943,9 +943,9 @@ calculate_known_buckets()
>   ---
>   - 3000
>   ...
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
>   ---
> -- was 1, became 1500
> +- was 1, became 1000
>   ...
>   info = vshard.router.info()
>   ---
> diff --git a/test/router/router.test.lua b/test/router/router.test.lua
> index 97dce49..154310b 100644
> --- a/test/router/router.test.lua
> +++ b/test/router/router.test.lua
> @@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active)
>   _ = test_run:switch('router_1')
>   wait_discovery()
>   calculate_known_buckets()
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
>   info = vshard.router.info()
>   info.bucket
>   info.alerts
> diff --git a/test/router/router2.result b/test/router/router2.result
> index 556f749..0ad5a21 100644
> --- a/test/router/router2.result
> +++ b/test/router/router2.result
> @@ -123,6 +123,66 @@ f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
>    | - suspended
>    | ...
>   
> +-- Errored discovery continued successfully after errors are gone.
> +vshard.router.bootstrap()
> + | ---
> + | - true
> + | ...
> +vshard.router.discovery_set('off')
> + | ---
> + | ...
> +vshard.router._route_map_clear()
> + | ---
> + | ...
> +
> +-- Discovery requests 2 and 4 will fail on storages.
> +util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}},                    \
> +               'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
> + | ---
> + | ...
> +
> +vshard.router.info().bucket.unknown
> + | ---
> + | - 3000
> + | ...
> +vshard.router.discovery_set('on')
> + | ---
> + | ...
> +function continue_discovery()                                                   \
> +    local res = vshard.router.info().bucket.unknown == 0                        \
> +    if not res then                                                             \
> +        vshard.router.discovery_wakeup()                                        \
> +    end                                                                         \
> +    return res                                                                  \
> +end
> + | ---
> + | ...
> +test_run:wait_cond(continue_discovery)
> + | ---
> + | - true
> + | ...
> +vshard.router.info().bucket.unknown
> + | ---
> + | - 0
> + | ...
> +
> +-- Discovery injections should be reset meaning they were returned
> +-- needed number of times.
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> + | ---
> + | - 0
> + | ...
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> + | ---
> + | - 0
> + | ...
> +
>   _ = test_run:switch("default")
>    | ---
>    | ...
> diff --git a/test/router/router2.test.lua b/test/router/router2.test.lua
> index 33f4d3e..ef05f8c 100644
> --- a/test/router/router2.test.lua
> +++ b/test/router/router2.test.lua
> @@ -43,6 +43,34 @@ vshard.router.static.discovery_fiber:status()
>   
>   f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
>   
> +-- Errored discovery continued successfully after errors are gone.
> +vshard.router.bootstrap()
> +vshard.router.discovery_set('off')
> +vshard.router._route_map_clear()
> +
> +-- Discovery requests 2 and 4 will fail on storages.
> +util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}},                    \
> +               'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
> +
> +vshard.router.info().bucket.unknown
> +vshard.router.discovery_set('on')
> +function continue_discovery()                                                   \
> +    local res = vshard.router.info().bucket.unknown == 0                        \
> +    if not res then                                                             \
> +        vshard.router.discovery_wakeup()                                        \
> +    end                                                                         \
> +    return res                                                                  \
> +end
> +test_run:wait_cond(continue_discovery)
> +vshard.router.info().bucket.unknown
> +
> +-- Discovery injections should be reset meaning they were returned
> +-- needed number of times.
> +_ = test_run:switch('storage_1_a')
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> +_ = test_run:switch('storage_2_a')
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> +
>   _ = test_run:switch("default")
>   _ = test_run:cmd("stop server router_1")
>   _ = test_run:cmd("cleanup server router_1")
> diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result
> index 56db9e8..92353c3 100644
> --- a/test/router/wrong_config.result
> +++ b/test/router/wrong_config.result
> @@ -45,7 +45,10 @@ cfg.bucket_count = 1000
>   r = vshard.router.new('gh-179', cfg)
>   ---
>   ...
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do                                   \
> +    r:discovery_wakeup()                                                        \
> +    fiber.sleep(0.1)                                                            \
> +end
>   ---
>   ...
>   i = r:info()
> diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua
> index 62ef30d..174b373 100644
> --- a/test/router/wrong_config.test.lua
> +++ b/test/router/wrong_config.test.lua
> @@ -18,7 +18,10 @@ vshard.router.bootstrap()
>   --
>   cfg.bucket_count = 1000
>   r = vshard.router.new('gh-179', cfg)
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do                                   \
> +    r:discovery_wakeup()                                                        \
> +    fiber.sleep(0.1)                                                            \
> +end
>   i = r:info()
>   i.bucket
>   i.alerts
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 5391c0f..a6a8c1b 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -40,5 +40,8 @@ return {
>       DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
>       RECOVERY_INTERVAL = 5;
>       COLLECT_LUA_GARBAGE_INTERVAL = 100;
> -    DISCOVERY_INTERVAL = 10;
> +    DISCOVERY_IDLE_INTERVAL = 10,
> +    DISCOVERY_WORK_INTERVAL = 1,
> +    DISCOVERY_WORK_STEP = 0.01,
> +    DISCOVERY_TIMEOUT = 10,
>   }

Nit: here commas and semicolons are mixed. I think we should keep it 
in a consistent state: only commas or only semicolons.
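
I.e. something like this, with commas everywhere:

    DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5,
    RECOVERY_INTERVAL = 5,
    COLLECT_LUA_GARBAGE_INTERVAL = 100,
    DISCOVERY_IDLE_INTERVAL = 10,
    DISCOVERY_WORK_INTERVAL = 1,
    DISCOVERY_WORK_STEP = 0.01,
    DISCOVERY_TIMEOUT = 10,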

> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 6d88153..28437e3 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -250,50 +250,130 @@ if util.version_is_at_least(1, 10, 0) then
>   discovery_f = function(router)
>       local module_version = M.module_version
>       assert(router.discovery_mode == 'on')
> +    local iterators = {}
> +    local opts = {is_async = true}
> +    local mode
>       while module_version == M.module_version do
> -        while not next(router.replicasets) do
> -            lfiber.sleep(consts.DISCOVERY_INTERVAL)
> -        end
> -        if module_version ~= M.module_version then
> -            return
> -        end
>           -- Just typical map reduce - send request to each
> -        -- replicaset in parallel, and collect responses.
> -        local pending = {}
> -        local opts = {is_async = true}
> -        local args = {}
> -        for rs_uuid, replicaset in pairs(router.replicasets) do
> +        -- replicaset in parallel, and collect responses. Many
> +        -- requests probably will be needed for each replicaset.
> +        --
> +        -- Step 1: create missing iterators, in case this is a
> +        -- first discovery iteration, or some replicasets were
> +        -- added after the router is started.
> +        for rs_uuid in pairs(router.replicasets) do
> +            local iter = iterators[rs_uuid]
> +            if not iter then
> +                iterators[rs_uuid] = {
> +                    args = {{from = 1}},
> +                    future = nil,
> +                }
> +            end
> +        end
> +        -- Step 2: map stage - send parallel requests for every
> +        -- iterator, prune orphan iterators whose replicasets were
> +        -- removed.
> +        for rs_uuid, iter in pairs(iterators) do
> +            local replicaset = router.replicasets[rs_uuid]
> +            if not replicaset then
> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
> +                iterators[rs_uuid] = nil
> +                goto continue
> +            end
>               local future, err =
> -                replicaset:callro('vshard.storage.buckets_discovery',
> -                                  args, opts)
> +                replicaset:callro('vshard.storage.buckets_discovery', iter.args,
> +                                  opts)
>               if not future then
> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
> -            else
> -                pending[rs_uuid] = future
> +                log.warn('Error during discovery %s, retry will be done '..
> +                         'later: %s', rs_uuid, err)
> +                goto continue
>               end
> +            iter.future = future
> +            -- Don't spam many requests at once. Give
> +            -- storages time to handle them and other
> +            -- requests.
> +            lfiber.sleep(consts.DISCOVERY_WORK_STEP)
> +            if module_version ~= M.module_version then
> +                return
> +            end
> +            ::continue::
>           end
> -
> -        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
> -        for rs_uuid, p in pairs(pending) do
> +        -- Step 3: reduce stage - collect responses, restart
> +        -- iterators which reached the end.
> +        for rs_uuid, iter in pairs(iterators) do
>               lfiber.yield()
> -            local timeout = deadline - lfiber.clock()
> -            local buckets, err = p:wait_result(timeout)
> -            while M.errinj.ERRINJ_LONG_DISCOVERY do
> -                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> -                lfiber.sleep(0.01)
> +            local future = iter.future
> +            if not future then
> +                goto continue
>               end
> -            local replicaset = router.replicasets[rs_uuid]
> -            if not buckets then
> -                p:discard()
> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
> -            elseif module_version ~= M.module_version then
> +            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
> +            if module_version ~= M.module_version then
>                   return
> -            elseif replicaset then
> -                discovery_handle_buckets(router, replicaset, buckets[1])
>               end
> +            if not result then
> +                future:discard()
> +                log.warn('Error during discovery %s, retry will be done '..
> +                         'later: %s', rs_uuid, err)
> +                goto continue
> +            end
> +            local replicaset = router.replicasets[rs_uuid]
> +            if not replicaset then
> +                iterators[rs_uuid] = nil
> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
> +                goto continue
> +            end
> +            result = result[1]
> +            -- Buckets are returned as plain array by storages
> +            -- using old vshard version. But if .buckets is set,
> +            -- this is a new storage.
> +            discovery_handle_buckets(router, replicaset,
> +                                     result.buckets or result)
> +            local discovery_args = iter.args[1]
> +            discovery_args.from = result.next_from
> +            if not result.next_from then
> +                -- Nil next_from means no more buckets to get.
> +                -- Restart the iterator.
> +                iterators[rs_uuid] = nil
> +            end
> +            ::continue::
>           end
> -
> -        lfiber.sleep(deadline - lfiber.clock())
> +        local unknown_bucket_count
> +        repeat
> +            unknown_bucket_count =
> +                router.total_bucket_count - router.known_bucket_count
> +            if unknown_bucket_count == 0 then
> +                if mode ~= 'idle' then
> +                    log.info('Discovery enters idle mode, all buckets are '..
> +                             'known. Discovery works with %s seconds '..
> +                             'interval now', consts.DISCOVERY_IDLE_INTERVAL)
> +                    mode = 'idle'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> +            elseif not next(router.replicasets) then
> +                if mode ~= 'idle' then
> +                    log.info('Discovery enters idle mode because '..
> +                             'configuration does not have replicasets. '..
> +                             'Retries will happen with %s seconds interval',
> +                             consts.DISCOVERY_IDLE_INTERVAL)
> +                    mode = 'idle'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> +            else
> +                if mode ~= 'aggressive' then
> +                    log.info('Start aggressive discovery, %s buckets are '..
> +                             'unknown. Discovery works with %s seconds '..
> +                             'interval', unknown_bucket_count,
> +                             consts.DISCOVERY_WORK_INTERVAL)
> +                    mode = 'aggressive'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
> +                break
> +            end
> +            while M.errinj.ERRINJ_LONG_DISCOVERY do
> +                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> +                lfiber.sleep(0.01)
> +            end
> +        until next(router.replicasets)
>       end
>   end
>   
> @@ -355,9 +435,9 @@ local function discovery_set(router, new_mode)
>       router.discovery_mode = new_mode
>       if router.discovery_fiber ~= nil then
>           pcall(router.discovery_fiber.cancel, router.discovery_fiber)
> +        router.discovery_fiber = nil
>       end
>       if new_mode == 'off' then
> -        router.discovery_fiber = nil
>           return
>       end
>       router.discovery_fiber = util.reloadable_fiber_create(
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 73c6740..c6a78fe 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -75,6 +75,7 @@ if not M then
>               ERRINJ_RECEIVE_PARTIALLY = false,
>               ERRINJ_NO_RECOVERY = false,
>               ERRINJ_UPGRADE = false,
> +            ERRINJ_DISCOVERY = false,
>           },
>           -- This counter is used to restart background fibers with
>           -- new reloaded code.
> @@ -1110,10 +1111,58 @@ local function bucket_collect(bucket_id)
>       return data
>   end
>   
> +-- Discovery used by routers. It returns limited number of
> +-- buckets to avoid stalls when _bucket is huge.
> +local function buckets_discovery_extended(opts)
> +    local limit = consts.BUCKET_CHUNK_SIZE
> +    local buckets = table.new(limit, 0)
> +    local active = consts.BUCKET.ACTIVE
> +    local pinned = consts.BUCKET.PINNED
> +    local next_from
> +    local errcnt = M.errinj.ERRINJ_DISCOVERY
> +    if errcnt then
> +        if errcnt > 0 then
> +            M.errinj.ERRINJ_DISCOVERY = errcnt - 1
> +            if errcnt % 2 == 0 then
> +                box.error(box.error.INJECTION, 'discovery')
> +            end
> +        else
> +            M.errinj.ERRINJ_DISCOVERY = false
> +        end
> +    end
> +    -- No way to select by {status, id}, because there are two
> +    -- statuses to select. A router would need to maintain a
> +    -- separate iterator for each status it wants to get. This may
> +    -- be implemented in future. But _bucket space anyway 99% of
> +    -- time contains only active and pinned buckets. So there is
> +    -- no big benefit in optimizing that. Perhaps a compound index
> +    -- {status, id} could help too.
> +    for _, bucket in box.space._bucket:pairs({opts.from},
> +                                             {iterator = box.index.GE}) do
> +        local status = bucket.status
> +        if status == active or status == pinned then
> +            table.insert(buckets, bucket.id)
> +        end
> +        limit = limit - 1
> +        if limit == 0 then
> +            next_from = bucket.id + 1
> +            break
> +        end
> +    end
> +    -- Buckets list can even be empty, if all buckets in the
> +    -- scanned chunk are not active/pinned. But next_from still
> +    -- should be returned, so that the router can request more.
> +    return {buckets = buckets, next_from = next_from}
> +end
> +
>   --
>   -- Collect array of active bucket identifiers for discovery.
>   --
> -local function buckets_discovery()
> +local function buckets_discovery(opts)
> +    if opts then
> +        -- Private method. Is not documented intentionally.
> +        return buckets_discovery_extended(opts)
> +    end
>       local ret = {}
>       local status = box.space._bucket.index.status
>       for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do
> 

