[Tarantool-patches] [PATCH vshard 6/7] router: make discovery smoother in a big cluster

Oleg Babin olegrok at tarantool.org
Fri May 1 20:01:49 MSK 2020


Thanks for your patch! See my comments below.

On 01/05/2020 03:16, Vladislav Shpilevoy wrote:
> The router does discovery once every 10 seconds. Discovery sends a
> request to each replicaset to download all pinned and active
> buckets from there. When there are millions of buckets, that
> becomes a long operation taking seconds, during which the storage
> is unresponsive.
> 
> The patch makes discovery work step by step, downloading not more
> than 1000 buckets at a time. That gives the storage time to
> process other requests.
> 
> Moreover, discovery now has some kind of 'state'. For each
> replicaset it keeps an iterator which is moved by 1k buckets on
> every successfully discovered bucket batch. It means that if
> discovery on a replicaset with 1 000 000 buckets fails after
> 999 999 buckets are already discovered, it won't start from 0. It
> will retry from the old position.

Could you provide a test for such a case?
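For example, something like this would do (just a rough sketch;
ERRINJ_DISCOVERY_FAIL is a hypothetical injection which would make
buckets_discovery() raise an error once):

    -- storage_1_a: make the next discovery request fail.
    vshard.storage.internal.errinj.ERRINJ_DISCOVERY_FAIL = true

    -- router_1: wait until the router hits the injected error. By that
    -- moment the first bucket chunk should already be discovered.
    while test_run:grep_log('router_1', 'Error during discovery') == nil do     \
        vshard.router.discovery_wakeup()                                        \
        fiber.sleep(0.1)                                                        \
    end

    -- storage_1_a: let discovery work again.
    vshard.storage.internal.errinj.ERRINJ_DISCOVERY_FAIL = false

    -- router_1: the next round must continue from the saved iterator
    -- position, so the log should show something like
    -- 'was 1000, became 1500' (exact numbers depend on the test cluster)
    -- instead of starting from 'was 0' again.
    while test_run:grep_log('router_1', 'was 1000, became 1500') == nil do      \
        vshard.router.discovery_wakeup()                                        \
        fiber.sleep(0.1)                                                        \
    end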


> However, there is still room for improvement. Discovery could
> avoid downloading anything after everything is downloaded, if it
> could somehow see that the bucket space has not changed.
> Unfortunately, that is not so easy, since the bucket generation
> (the version of the _bucket space) is not persisted. So after an
> instance restart it is always equal to the bucket count.

Is there an issue for that?

> Part of #210
> ---
>   test/router/reload.result         |   5 +-
>   test/router/reload.test.lua       |   3 +-
>   test/router/router.result         |   4 +-
>   test/router/router.test.lua       |   2 +-
>   test/router/wrong_config.result   |   5 +-
>   test/router/wrong_config.test.lua |   5 +-
>   vshard/consts.lua                 |   5 +-
>   vshard/router/init.lua            | 145 +++++++++++++++++++++++-------
>   vshard/storage/init.lua           |  39 +++++++-
>   9 files changed, 170 insertions(+), 43 deletions(-)
> 
> diff --git a/test/router/reload.result b/test/router/reload.result
> index 3ba900a..8fe99ba 100644
> --- a/test/router/reload.result
> +++ b/test/router/reload.result
> @@ -44,7 +44,10 @@ vshard.router.bootstrap()
>   while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
>   ---
>   ...
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +---
> +...
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>   ---
>   ...
>   --
> diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
> index 5ed5690..abcbc09 100644
> --- a/test/router/reload.test.lua
> +++ b/test/router/reload.test.lua
> @@ -15,7 +15,8 @@ fiber = require('fiber')
>   vshard.router.bootstrap()
>   
>   while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>   
>   --
>   -- Gh-72: allow reload. Test simple reload, error during
> diff --git a/test/router/router.result b/test/router/router.result
> index df7be4a..b2efd6d 100644
> --- a/test/router/router.result
> +++ b/test/router/router.result
> @@ -943,9 +943,9 @@ calculate_known_buckets()
>   ---
>   - 3000
>   ...
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
>   ---
> -- was 1, became 1500
> +- was 1, became 1000
>   ...
>   info = vshard.router.info()
>   ---
> diff --git a/test/router/router.test.lua b/test/router/router.test.lua
> index 97dce49..154310b 100644
> --- a/test/router/router.test.lua
> +++ b/test/router/router.test.lua
> @@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active)
>   _ = test_run:switch('router_1')
>   wait_discovery()
>   calculate_known_buckets()
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
>   info = vshard.router.info()
>   info.bucket
>   info.alerts
> diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result
> index 56db9e8..92353c3 100644
> --- a/test/router/wrong_config.result
> +++ b/test/router/wrong_config.result
> @@ -45,7 +45,10 @@ cfg.bucket_count = 1000
>   r = vshard.router.new('gh-179', cfg)
>   ---
>   ...
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do                                   \
> +    r:discovery_wakeup()                                                        \
> +    fiber.sleep(0.1)                                                            \
> +end
>   ---
>   ...
>   i = r:info()
> diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua
> index 62ef30d..174b373 100644
> --- a/test/router/wrong_config.test.lua
> +++ b/test/router/wrong_config.test.lua
> @@ -18,7 +18,10 @@ vshard.router.bootstrap()
>   --
>   cfg.bucket_count = 1000
>   r = vshard.router.new('gh-179', cfg)
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do                                   \
> +    r:discovery_wakeup()                                                        \
> +    fiber.sleep(0.1)                                                            \
> +end
>   i = r:info()
>   i.bucket
>   i.alerts
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 5391c0f..a6a8c1b 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -40,5 +40,8 @@ return {
>       DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
>       RECOVERY_INTERVAL = 5;
>       COLLECT_LUA_GARBAGE_INTERVAL = 100;
> -    DISCOVERY_INTERVAL = 10;
> +    DISCOVERY_IDLE_INTERVAL = 10,
> +    DISCOVERY_WORK_INTERVAL = 1,
> +    DISCOVERY_WORK_STEP = 0.01,
> +    DISCOVERY_TIMEOUT = 10,
>   }
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 6d88153..26ea85b 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then
>   discovery_f = function(router)
>       local module_version = M.module_version
>       assert(router.discovery_mode == 'on')
> +    local iterators = {}
> +    local opts = {is_async = true}
> +    local mode
>       while module_version == M.module_version do
> -        while not next(router.replicasets) do
> -            lfiber.sleep(consts.DISCOVERY_INTERVAL)
> -        end
> -        if module_version ~= M.module_version then
> -            return
> -        end
>           -- Just typical map reduce - send request to each
> -        -- replicaset in parallel, and collect responses.
> -        local pending = {}
> -        local opts = {is_async = true}
> -        local args = {}
> -        for rs_uuid, replicaset in pairs(router.replicasets) do
> +        -- replicaset in parallel, and collect responses. Many
> +        -- requests probably will be needed for each replicaset.
> +        --
> +        -- Step 1: create missing iterators, in case this is a
> +        -- first discovery iteration, or some replicasets were
> +        -- added after the router is started.
> +        for rs_uuid in pairs(router.replicasets) do
> +            local iter = iterators[rs_uuid]
> +            if not iter then
> +                iterators[rs_uuid] = {
> +                    args = {{from = 1}},
> +                    future = nil,
> +                }
> +            end
> +        end
> +        -- Step 2: map stage - send parallel requests for every
> +        -- iterator, prune orphan iterators whose replicasets were
> +        -- removed.
> +        for rs_uuid, iter in pairs(iterators) do
> +            local replicaset = router.replicasets[rs_uuid]
> +            if not replicaset then
> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
> +                iterators[rs_uuid] = nil
> +                goto continue
> +            end
>               local future, err =
> -                replicaset:callro('vshard.storage.buckets_discovery',
> -                                  args, opts)
> +                replicaset:callro('vshard.storage.buckets_discovery', iter.args,
> +                                  opts)
>               if not future then
> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
> -            else
> -                pending[rs_uuid] = future
> +                log.warn('Error during discovery %s, retry will be done '..
> +                         'later: %s', rs_uuid, err)
> +                goto continue
> +            end
> +            iter.future = future
> +            -- Don't spam many requests at once. Give
> +            -- storages time to handle them and other
> +            -- requests.
> +            lfiber.sleep(consts.DISCOVERY_WORK_STEP)
> +            if module_version ~= M.module_version then
> +                return
>               end


Is it possible to place such checks in some "common" places, e.g. at
the start/end of each iteration or between the steps? Doing such a
check after each timeout looks strange and a bit unobvious.
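For example, if the map and reduce stages were wrapped into functions
(the helper names below are only an illustration, not part of the
patch), the check would be needed only at the step boundaries:

    while module_version == M.module_version do
        -- Steps 1-2: create missing iterators and send the requests.
        discovery_send_requests(router, iterators)
        if module_version ~= M.module_version then
            return
        end
        -- Step 3: collect the responses.
        discovery_collect_responses(router, iterators)
        if module_version ~= M.module_version then
            return
        end
        -- Choose idle/aggressive mode and sleep.
        discovery_wait_next_round(router)
    end

In the worst case the old fiber would do one extra step after a reload
before noticing it, which seems acceptable.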

> +            ::continue::
>           end
> -
> -        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
> -        for rs_uuid, p in pairs(pending) do
> +        -- Step 3: reduce stage - collect responses, restart
> +        -- iterators which reached the end.
> +        for rs_uuid, iter in pairs(iterators) do
>               lfiber.yield()
> -            local timeout = deadline - lfiber.clock()
> -            local buckets, err = p:wait_result(timeout)
> -            while M.errinj.ERRINJ_LONG_DISCOVERY do
> -                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> -                lfiber.sleep(0.01)
> +            local future = iter.future
> +            if not future then
> +                goto continue
>               end
> -            local replicaset = router.replicasets[rs_uuid]
> -            if not buckets then
> -                p:discard()
> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
> -            elseif module_version ~= M.module_version then
> +            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
> +            if module_version ~= M.module_version then

Does it make sense to call "discard" before the "return"?
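I mean something like:

    local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
    if module_version ~= M.module_version then
        future:discard()
        return
    end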

>                   return
> -            elseif replicaset then
> -                discovery_handle_buckets(router, replicaset, buckets[1])
>               end
> +            if not result then
> +                future:discard()
> +                log.warn('Error during discovery %s, retry will be done '..
> +                         'later: %s', rs_uuid, err)
> +                goto continue
> +            end
> +            local replicaset = router.replicasets[rs_uuid]
> +            if not replicaset then
> +                iterators[rs_uuid] = nil
> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
> +                goto continue
> +            end
> +            result = result[1]
> +            -- Buckets are returned as plain array by storages
> +            -- using old vshard version. But if .buckets is set,
> +            -- this is a new storage.
> +            discovery_handle_buckets(router, replicaset,
> +                                     result.buckets or result)
> +            local discovery_args = iter.args[1]
> +            discovery_args.from = result.next_from
> +            if not result.next_from then
> +                -- Nil next_from means no more buckets to get.
> +                -- Restart the iterator.
> +                iterators[rs_uuid] = nil
> +            end
> +            ::continue::
>           end
> -
> -        lfiber.sleep(deadline - lfiber.clock())
> +        local unknown_bucket_count
> +        repeat
> +            unknown_bucket_count =
> +                router.total_bucket_count - router.known_bucket_count
> +            if unknown_bucket_count == 0 then
> +                if mode ~= 'idle' then
> +                    log.info('Discovery enters idle mode, all buckets are '..
> +                             'known. Discovery works with %s seconds '..
> +                             'interval now', consts.DISCOVERY_IDLE_INTERVAL)
> +                    mode = 'idle'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> +            elseif not next(router.replicasets) then
> +                if mode ~= 'idle' then
> +                    log.info('Discovery enters idle mode because '..
> +                             'configuration does not have replicasets. '..
> +                             'Retries will happen with %s seconds interval',
> +                             consts.DISCOVERY_IDLE_INTERVAL)
> +                    mode = 'idle'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> +            elseif mode ~= 'aggressive' then
> +                log.info('Start aggressive discovery, %s buckets are unknown. '..
> +                         'Discovery works with %s seconds interval',
> +                         unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL)
> +                mode = 'aggressive'
> +                lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
> +                break
> +            end
> +            while M.errinj.ERRINJ_LONG_DISCOVERY do
> +                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> +                lfiber.sleep(0.01)
> +            end
> +        until next(router.replicasets)
>       end
>   end
>   
> @@ -355,9 +432,9 @@ local function discovery_set(router, new_mode)
>       router.discovery_mode = new_mode
>       if router.discovery_fiber ~= nil then
>           pcall(router.discovery_fiber.cancel, router.discovery_fiber)
> +        router.discovery_fiber = nil
>       end
>       if new_mode == 'off' then
> -        router.discovery_fiber = nil
>           return
>       end
>       router.discovery_fiber = util.reloadable_fiber_create(
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 73c6740..0050b96 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id)
>       return data
>   end
>   
> +-- Discovery used by routers. It returns limited number of
> +-- buckets to avoid stalls when _bucket is huge.
> +local function buckets_discovery_extended(opts)
> +    local limit = consts.BUCKET_CHUNK_SIZE
> +    local buckets = table.new(limit, 0)
> +    local active = consts.BUCKET.ACTIVE
> +    local pinned = consts.BUCKET.PINNED
> +    local next_from
> +    -- No way to select by {status, id}, because there are two
> +    -- statuses to select. A router would need to maintain a
> +    -- separate iterator for each status it wants to get. This may
> +    -- be implemented in future. But _bucket space anyway 99% of
> +    -- time contains only active and pinned buckets. So there is
> +    -- no big benefit in optimizing that. Perhaps a compound index
> +    -- {status, id} could help too.
> +    for _, bucket in box.space._bucket:pairs({opts.from},
> +                                             {iterator = box.index.GE}) do
> +        local status = bucket.status
> +        if status == active or status == pinned then
> +            table.insert(buckets, bucket.id)
> +        end
> +        limit = limit - 1

It looks a bit strange after the words about "99% of the time". I
propose to move the limit decrease under the if condition.
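I.e. something like this, counting only the buckets which are actually
returned (just a sketch of the proposal):

    for _, bucket in box.space._bucket:pairs({opts.from},
                                             {iterator = box.index.GE}) do
        local status = bucket.status
        if status == active or status == pinned then
            table.insert(buckets, bucket.id)
            limit = limit - 1
        end
        if limit == 0 then
            next_from = bucket.id + 1
            break
        end
    end

If a scanned chunk consists mostly of non-active buckets, the loop
would visit more than BUCKET_CHUNK_SIZE tuples, but as the comment
above says, that should almost never happen.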

> +        if limit == 0 then
> +            next_from = bucket.id + 1
> +            break
> +        end
> +    end
> +    -- Buckets list can even be empty, if all buckets in the
> +    -- scanned chunk are not active/pinned. But next_from still
> +    -- should be returned. So as the router could request more.
> +    return {buckets = buckets, next_from = next_from}
> +end
> +
>   --
>   -- Collect array of active bucket identifiers for discovery.
>   --
> -local function buckets_discovery()
> +local function buckets_discovery(opts)
> +    if opts then
> +        -- Private method. Is not documented intentionally.
> +        return buckets_discovery_extended(opts)
> +    end
>       local ret = {}
>       local status = box.space._bucket.index.status
>       for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do
> 

