From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from smtp54.i.mail.ru (smtp54.i.mail.ru [217.69.128.34])
 (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
 (No client certificate requested)
 by dev.tarantool.org (Postfix) with ESMTPS id DEC11469711
 for ; Mon, 4 May 2020 17:26:59 +0300 (MSK)
From: Oleg Babin
References: <80bacfc685233ad047f6a80ddadd72b8903eae5b.1588292014.git.v.shpilevoy@tarantool.org>
 <4ee74ac5-55e0-9a90-91d0-69e9470d9cb3@tarantool.org>
Message-ID: <2bb5c86a-04c4-96a9-ab3e-75b96d43b1d9@tarantool.org>
Date: Mon, 4 May 2020 17:26:56 +0300
MIME-Version: 1.0
In-Reply-To: <4ee74ac5-55e0-9a90-91d0-69e9470d9cb3@tarantool.org>
Content-Type: text/plain; charset="utf-8"; format="flowed"
Content-Language: en-GB
Content-Transfer-Encoding: 8bit
Subject: Re: [Tarantool-patches] [PATCH vshard 6/7] router: make discovery
 smoother in a big cluster
List-Id: Tarantool development patches
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
To: Vladislav Shpilevoy ,
 tarantool-patches@dev.tarantool.org

Thanks for the answers and changes! LGTM. I left my answers and one
nit on the new patch diff.

On 02/05/2020 23:12, Vladislav Shpilevoy wrote:
> Thanks for the review!
>
> On 01/05/2020 19:01, Oleg Babin wrote:
>> Thanks for your patch! See my comments below.
>>
>> On 01/05/2020 03:16, Vladislav Shpilevoy wrote:
>>> Router does discovery once per 10 seconds. Discovery sends a
>>> request to each replicaset to download all pinned and active
>>> buckets from there. When there are millions of buckets, that
>>> becomes a long operation taking seconds, during which the storage
>>> is unresponsive.
>>>
>>> The patch makes discovery work step by step, downloading not more
>>> than 1000 buckets at a time. That gives the storage time to
>>> process other requests.
>>>
>>> Moreover, discovery now has some kind of 'state'. For each
>>> replicaset it keeps an iterator which is moved by 1k buckets on
>>> every successfully discovered bucket batch. It means that if on a
>>> replicaset with 1 000 000 buckets discovery fails after 999 999
>>> buckets are already discovered, it won't start from 0. It will
>>> retry from the old position.
>>
>> Could you provide a test for such a case?
>
> No problem.
> ...
> I couldn't come up with a sane test of whether the router continues
> discovery from exactly the same place where it got an error. I would
> need to insert some extraordinarily weird injections into the router
> to collect statistics of that kind. But the test at least shows that
> an error does not prevent full discovery eventually.
>

Thanks, I believe it's enough.

>>> However, there is still room for improvement. Discovery could
>>> avoid downloading anything after everything is downloaded, if it
>>> could somehow see whether the bucket space has changed.
>>> Unfortunately it is not so easy, since the bucket generation
>>> (version of the _bucket space) is not persisted. So after an
>>> instance restart it is always equal to the bucket count.
>>
>> Is there an issue for that?
>
> I was hoping you wouldn't ask :D
>
> I don't really see how that optimization could be implemented in a
> sane way in terms of simplicity. Looks like a storage would need to
> send a bucket generation + some timestamp to protect from the case
> when after a restart the bucket set is different, but the generation
> is the same.
>
> I was going to wait until somebody explicitly asks for it, if
> 'discovery_mode = "once"' won't be enough.
>
> But ok, I was caught. Here is the issue:
> https://github.com/tarantool/vshard/issues/238
>

Hah, I don't know, but when I read this part of the commit message I
thought about a trigger on the "_bucket" space that could update some
"generation" value, maybe in the "_schema" space. I don't know whether
that would be an appropriate way to implement this feature, but I asked
to make sure that this proposal won't get lost.
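Roughly something like this - an untested sketch just to record the
idea (the 'bucket_generation' key in _schema and the trigger wiring are
my assumptions, nothing from the patch):

local function bucket_generation_bump()
    -- _schema is a persisted key-value system space, so unlike the
    -- in-memory bucket generation this counter survives a restart.
    local row = box.space._schema:get({'bucket_generation'})
    local gen = row ~= nil and row[2] or 0
    box.space._schema:replace({'bucket_generation', gen + 1})
end

-- Bump the persisted generation on every change of _bucket.
box.space._bucket:on_replace(bucket_generation_bump)

As you say above, the counter alone would not be enough - a timestamp
or something similar would still be needed for the case when the
bucket set differs but the generation happens to match.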
>>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>>> index 6d88153..26ea85b 100644
>>> --- a/vshard/router/init.lua
>>> +++ b/vshard/router/init.lua
>>> @@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then
>>>  discovery_f = function(router)
>>>      local module_version = M.module_version
>>>      assert(router.discovery_mode == 'on')
>>> +    local iterators = {}
>>> +    local opts = {is_async = true}
>>> +    local mode
>>>      while module_version == M.module_version do
>>> -        while not next(router.replicasets) do
>>> -            lfiber.sleep(consts.DISCOVERY_INTERVAL)
>>> -        end
>>> -        if module_version ~= M.module_version then
>>> -            return
>>> -        end
>>>          -- Just typical map reduce - send request to each
>>> -        -- replicaset in parallel, and collect responses.
>>> -        local pending = {}
>>> -        local opts = {is_async = true}
>>> -        local args = {}
>>> -        for rs_uuid, replicaset in pairs(router.replicasets) do
>>> +        -- replicaset in parallel, and collect responses. Many
>>> +        -- requests probably will be needed for each replicaset.
>>> +        --
>>> +        -- Step 1: create missing iterators, in case this is a
>>> +        -- first discovery iteration, or some replicasets were
>>> +        -- added after the router is started.
>>> +        for rs_uuid in pairs(router.replicasets) do
>>> +            local iter = iterators[rs_uuid]
>>> +            if not iter then
>>> +                iterators[rs_uuid] = {
>>> +                    args = {{from = 1}},
>>> +                    future = nil,
>>> +                }
>>> +            end
>>> +        end
>>> +        -- Step 2: map stage - send parallel requests for every
>>> +        -- iterator, prune orphan iterators whose replicasets were
>>> +        -- removed.
>>> +        for rs_uuid, iter in pairs(iterators) do
>>> +            local replicaset = router.replicasets[rs_uuid]
>>> +            if not replicaset then
>>> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
>>> +                iterators[rs_uuid] = nil
>>> +                goto continue
>>> +            end
>>>              local future, err =
>>> -                replicaset:callro('vshard.storage.buckets_discovery',
>>> -                                  args, opts)
>>> +                replicaset:callro('vshard.storage.buckets_discovery', iter.args,
>>> +                                  opts)
>>>              if not future then
>>> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
>>> -            else
>>> -                pending[rs_uuid] = future
>>> +                log.warn('Error during discovery %s, retry will be done '..
>>> +                         'later: %s', rs_uuid, err)
>>> +                goto continue
>>> +            end
>>> +            iter.future = future
>>> +            -- Don't spam many requests at once. Give
>>> +            -- storages time to handle them and other
>>> +            -- requests.
>>> +            lfiber.sleep(consts.DISCOVERY_WORK_STEP)
>>> +            if module_version ~= M.module_version then
>>> +                return
>>>              end
>>
>> Is it possible to place such checks in some "common" place, e.g. at
>> the start/end of each iteration or between steps? It looks strange
>> and a bit unobvious to do such checks after each timeout.
>
> The purpose is to stop the fiber as soon as possible in case a reload
> happened. So moving it to a common place is not really possible: it
> would make the fiber work longer with the old code after a reload.
> And if we keep the checks, I still need to do them after/before any
> long yield.
>

My main concern is that such things could be easily missed in the
future. But I agree that we should leave this function as early as
possible.
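To make the pattern explicit for readers of the thread, here is how I
read it, reduced to a standalone sketch (M, do_work() and WORK_STEP are
placeholders, not names from the patch):

local lfiber = require('fiber')

-- Placeholders standing in for the real module state and work:
local M = {module_version = 1}
local WORK_STEP = 0.01
local function do_work() end

local function reloadable_f()
    -- M.module_version is bumped when the module is reloaded. The
    -- fiber compares its saved copy against it after every yield
    -- point, so the old code stops almost immediately and a fiber
    -- running the new code takes over.
    local module_version = M.module_version
    while module_version == M.module_version do
        do_work()
        lfiber.sleep(WORK_STEP)
        if module_version ~= M.module_version then
            return
        end
    end
end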
>>> +            ::continue::
>>>          end
>>> -
>>> -        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
>>> -        for rs_uuid, p in pairs(pending) do
>>> +        -- Step 3: reduce stage - collect responses, restart
>>> +        -- iterators which reached the end.
>>> +        for rs_uuid, iter in pairs(iterators) do
>>>              lfiber.yield()
>>> -            local timeout = deadline - lfiber.clock()
>>> -            local buckets, err = p:wait_result(timeout)
>>> -            while M.errinj.ERRINJ_LONG_DISCOVERY do
>>> -                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
>>> -                lfiber.sleep(0.01)
>>> +            local future = iter.future
>>> +            if not future then
>>> +                goto continue
>>>              end
>>> -            local replicaset = router.replicasets[rs_uuid]
>>> -            if not buckets then
>>> -                p:discard()
>>> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
>>> -            elseif module_version ~= M.module_version then
>>> +            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
>>> +            if module_version ~= M.module_version then
>>
>> Does it make sense to call "discard" before "return"?
>
> Yes. Discard removes the entry from netbox's internal table of
> requests waiting for a result. I don't know for how long the storage
> won't respond to that, so it is better to free the memory now. The
> entry is not removed automatically in case it was a timeout error.
>
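For context, the future lifecycle being discussed, as a self-contained
net.box sketch (the URI, arguments, and timeout here are arbitrary):

local netbox = require('net.box')

local conn = netbox.connect('localhost:3301')
local future = conn:call('vshard.storage.buckets_discovery',
                         {{from = 1}}, {is_async = true})
-- wait_result() returns nil + an error on timeout, but the request
-- stays registered inside the connection, still waiting for a reply.
local result, err = future:wait_result(10)
if result == nil then
    -- Drop the request explicitly, so that the late response does
    -- not keep the entry in memory whenever it finally arrives.
    future:discard()
end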
>>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>>> index 73c6740..0050b96 100644
>>> --- a/vshard/storage/init.lua
>>> +++ b/vshard/storage/init.lua
>>> @@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id)
>>>      return data
>>>  end
>>>
>>> +-- Discovery used by routers. It returns limited number of
>>> +-- buckets to avoid stalls when _bucket is huge.
>>> +local function buckets_discovery_extended(opts)
>>> +    local limit = consts.BUCKET_CHUNK_SIZE
>>> +    local buckets = table.new(limit, 0)
>>> +    local active = consts.BUCKET.ACTIVE
>>> +    local pinned = consts.BUCKET.PINNED
>>> +    local next_from
>>> +    -- No way to select by {status, id}, because there are two
>>> +    -- statuses to select. A router would need to maintain a
>>> +    -- separate iterator for each status it wants to get. This may
>>> +    -- be implemented in future. But _bucket space anyway 99% of
>>> +    -- time contains only active and pinned buckets. So there is
>>> +    -- no big benefit in optimizing that. Perhaps a compound index
>>> +    -- {status, id} could help too.
>>> +    for _, bucket in box.space._bucket:pairs({opts.from},
>>> +                                             {iterator = box.index.GE}) do
>>> +        local status = bucket.status
>>> +        if status == active or status == pinned then
>>> +            table.insert(buckets, bucket.id)
>>> +        end
>>> +        limit = limit - 1
>>
>> It's a bit strange after the words about "99% of time"; I propose to
>> move the limit decrease under the if condition.
>
> But still there is this 1%, when buckets start moving. When the
> bucket count is in the millions, rebalancing is likely to move
> thousands of buckets, likely from one range. For example, buckets
> from 1 to 10k can be moved. If a discovery request arrives at that
> moment, it will be stuck for a long time iterating over the sent and
> garbage buckets.
>
> Rebalancing is a heavy thing, and such additional load would make the
> cluster feel even worse.
>
> The 99% here is rather about it not being a problem to sometimes
> return an empty bucket set. Not about allowing perf problems in the
> other 1%.
>

Agree

>
> While working on your comments I found that I had added the sleep in
> a wrong place during aggressive discovery. Fixed below:
>
> ====================
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 26ea85b..28437e3 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -358,11 +358,14 @@ discovery_f = function(router)
>                      mode = 'idle'
>                  end
>                  lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> -            elseif mode ~= 'aggressive' then
> -                log.info('Start aggressive discovery, %s buckets are unknown. '..
> -                         'Discovery works with %s seconds interval',
> -                         unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL)
> -                mode = 'aggressive'
> +            else
> +                if mode ~= 'aggressive' then
> +                    log.info('Start aggressive discovery, %s buckets are '..
> +                             'unknown. Discovery works with %s seconds '..
> +                             'interval', unknown_bucket_count,
> +                             consts.DISCOVERY_WORK_INTERVAL)
> +                    mode = 'aggressive'
> +                end
>                  lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
>                  break
>              end
>
> ====================
>
> Since the changes are big, below is the whole new commit:
>
> ====================
>
> router: make discovery smoother in a big cluster
>
> Router does discovery once per 10 seconds. Discovery sends a
> request to each replicaset to download all pinned and active
> buckets from there. When there are millions of buckets, that
> becomes a long operation taking seconds, during which the storage
> is unresponsive.
>
> The patch makes discovery work step by step, downloading not more
> than 1000 buckets at a time. That gives the storage time to
> process other requests.
>
> Moreover, discovery now has some kind of 'state'. For each
> replicaset it keeps an iterator which is moved by 1k buckets on
> every successfully discovered bucket batch. It means that if on a
> replicaset with 1 000 000 buckets discovery fails after 999 999
> buckets are already discovered, it won't start from 0. It will
> retry from the old position.
>
> However, there is still room for improvement. Discovery could
> avoid downloading anything after everything is downloaded, if it
> could somehow see whether the bucket space has changed.
> Unfortunately it is not so easy, since the bucket generation
> (version of the _bucket space) is not persisted. So after an
> instance restart it is always equal to the bucket count.
>
> Part of #210
>
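A side note for readers of the thread: stripped of the diff context,
the new paging contract between a router and a storage boils down to
this loop (just a sketch - the URI is arbitrary, and the real router
goes through replicaset:callro() instead of a raw connection):

local netbox = require('net.box')

local conn = netbox.connect('localhost:3301')
local known = {}
local from = 1
repeat
    local result = conn:call('vshard.storage.buckets_discovery',
                             {{from = from}})
    -- An old storage returns a plain array of bucket ids, a new
    -- one returns {buckets = ..., next_from = ...}.
    for _, id in ipairs(result.buckets or result) do
        known[id] = true
    end
    -- next_from == nil means the end of _bucket was reached.
    from = result.next_from
until not from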
> diff --git a/test/router/reload.result b/test/router/reload.result
> index 3ba900a..8fe99ba 100644
> --- a/test/router/reload.result
> +++ b/test/router/reload.result
> @@ -44,7 +44,10 @@ vshard.router.bootstrap()
>  while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
>  ---
>  ...
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +---
> +...
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>  ---
>  ...
>  --
> diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
> index 5ed5690..abcbc09 100644
> --- a/test/router/reload.test.lua
> +++ b/test/router/reload.test.lua
> @@ -15,7 +15,8 @@ fiber = require('fiber')
>  vshard.router.bootstrap()
>
>  while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>
>  --
>  -- Gh-72: allow reload. Test simple reload, error during
> diff --git a/test/router/router.result b/test/router/router.result
> index df7be4a..b2efd6d 100644
> --- a/test/router/router.result
> +++ b/test/router/router.result
> @@ -943,9 +943,9 @@ calculate_known_buckets()
>  ---
>  - 3000
>  ...
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
>  ---
> -- was 1, became 1500
> +- was 1, became 1000
>  ...
>  info = vshard.router.info()
>  ---
> diff --git a/test/router/router.test.lua b/test/router/router.test.lua
> index 97dce49..154310b 100644
> --- a/test/router/router.test.lua
> +++ b/test/router/router.test.lua
> @@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active)
>  _ = test_run:switch('router_1')
>  wait_discovery()
>  calculate_known_buckets()
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
>  info = vshard.router.info()
>  info.bucket
>  info.alerts
> diff --git a/test/router/router2.result b/test/router/router2.result
> index 556f749..0ad5a21 100644
> --- a/test/router/router2.result
> +++ b/test/router/router2.result
> @@ -123,6 +123,66 @@ f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
>   | - suspended
>   | ...
>
> +-- Errored discovery continued successfully after errors are gone.
> +vshard.router.bootstrap()
> + | ---
> + | - true
> + | ...
> +vshard.router.discovery_set('off')
> + | ---
> + | ...
> +vshard.router._route_map_clear()
> + | ---
> + | ...
> +
> +-- Discovery requests 2 and 4 will fail on storages.
> +util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}}, \
> +               'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
> + | ---
> + | ...
> +
> +vshard.router.info().bucket.unknown
> + | ---
> + | - 3000
> + | ...
> +vshard.router.discovery_set('on')
> + | ---
> + | ...
> +function continue_discovery() \
> +    local res = vshard.router.info().bucket.unknown == 0 \
> +    if not res then \
> +        vshard.router.discovery_wakeup() \
> +    end \
> +    return res \
> +end
> + | ---
> + | ...
> +test_run:wait_cond(continue_discovery)
> + | ---
> + | - true
> + | ...
> +vshard.router.info().bucket.unknown
> + | ---
> + | - 0
> + | ...
> +
> +-- Discovery injections should be reset meaning they were returned
> +-- needed number of times.
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> + | ---
> + | - 0
> + | ...
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> + | ---
> + | - 0
> + | ...
> +
>  _ = test_run:switch("default")
>   | ---
>   | ...
> diff --git a/test/router/router2.test.lua b/test/router/router2.test.lua
> index 33f4d3e..ef05f8c 100644
> --- a/test/router/router2.test.lua
> +++ b/test/router/router2.test.lua
> @@ -43,6 +43,34 @@ vshard.router.static.discovery_fiber:status()
>
>  f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
>
> +-- Errored discovery continued successfully after errors are gone.
> +vshard.router.bootstrap()
> +vshard.router.discovery_set('off')
> +vshard.router._route_map_clear()
> +
> +-- Discovery requests 2 and 4 will fail on storages.
> +util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}}, \
> +               'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
> +
> +vshard.router.info().bucket.unknown
> +vshard.router.discovery_set('on')
> +function continue_discovery() \
> +    local res = vshard.router.info().bucket.unknown == 0 \
> +    if not res then \
> +        vshard.router.discovery_wakeup() \
> +    end \
> +    return res \
> +end
> +test_run:wait_cond(continue_discovery)
> +vshard.router.info().bucket.unknown
> +
> +-- Discovery injections should be reset meaning they were returned
> +-- needed number of times.
> +_ = test_run:switch('storage_1_a')
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> +_ = test_run:switch('storage_2_a')
> +vshard.storage.internal.errinj.ERRINJ_DISCOVERY
> +
>  _ = test_run:switch("default")
>  _ = test_run:cmd("stop server router_1")
>  _ = test_run:cmd("cleanup server router_1")
> diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result
> index 56db9e8..92353c3 100644
> --- a/test/router/wrong_config.result
> +++ b/test/router/wrong_config.result
> @@ -45,7 +45,10 @@ cfg.bucket_count = 1000
>  r = vshard.router.new('gh-179', cfg)
>  ---
>  ...
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do \
> +    r:discovery_wakeup() \
> +    fiber.sleep(0.1) \
> +end
>  ---
>  ...
>  i = r:info()
> diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua
> index 62ef30d..174b373 100644
> --- a/test/router/wrong_config.test.lua
> +++ b/test/router/wrong_config.test.lua
> @@ -18,7 +18,10 @@ vshard.router.bootstrap()
>  --
>  cfg.bucket_count = 1000
>  r = vshard.router.new('gh-179', cfg)
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do \
> +    r:discovery_wakeup() \
> +    fiber.sleep(0.1) \
> +end
>  i = r:info()
>  i.bucket
>  i.alerts
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 5391c0f..a6a8c1b 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -40,5 +40,8 @@ return {
>      DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
>      RECOVERY_INTERVAL = 5;
>      COLLECT_LUA_GARBAGE_INTERVAL = 100;
> -    DISCOVERY_INTERVAL = 10;
> +    DISCOVERY_IDLE_INTERVAL = 10,
> +    DISCOVERY_WORK_INTERVAL = 1,
> +    DISCOVERY_WORK_STEP = 0.01,
> +    DISCOVERY_TIMEOUT = 10,
> }

Nit: commas and semicolons are mixed here. I think we should keep them
in a consistent state - only commas or only semicolons.

> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 6d88153..28437e3 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -250,50 +250,130 @@ if util.version_is_at_least(1, 10, 0) then
>  discovery_f = function(router)
>      local module_version = M.module_version
>      assert(router.discovery_mode == 'on')
> +    local iterators = {}
> +    local opts = {is_async = true}
> +    local mode
>      while module_version == M.module_version do
> -        while not next(router.replicasets) do
> -            lfiber.sleep(consts.DISCOVERY_INTERVAL)
> -        end
> -        if module_version ~= M.module_version then
> -            return
> -        end
>          -- Just typical map reduce - send request to each
> -        -- replicaset in parallel, and collect responses.
> -        local pending = {}
> -        local opts = {is_async = true}
> -        local args = {}
> -        for rs_uuid, replicaset in pairs(router.replicasets) do
> +        -- replicaset in parallel, and collect responses. Many
> +        -- requests probably will be needed for each replicaset.
> +        --
> +        -- Step 1: create missing iterators, in case this is a
> +        -- first discovery iteration, or some replicasets were
> +        -- added after the router is started.
> +        for rs_uuid in pairs(router.replicasets) do
> +            local iter = iterators[rs_uuid]
> +            if not iter then
> +                iterators[rs_uuid] = {
> +                    args = {{from = 1}},
> +                    future = nil,
> +                }
> +            end
> +        end
> +        -- Step 2: map stage - send parallel requests for every
> +        -- iterator, prune orphan iterators whose replicasets were
> +        -- removed.
> +        for rs_uuid, iter in pairs(iterators) do
> +            local replicaset = router.replicasets[rs_uuid]
> +            if not replicaset then
> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
> +                iterators[rs_uuid] = nil
> +                goto continue
> +            end
>              local future, err =
> -                replicaset:callro('vshard.storage.buckets_discovery',
> -                                  args, opts)
> +                replicaset:callro('vshard.storage.buckets_discovery', iter.args,
> +                                  opts)
>              if not future then
> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
> -            else
> -                pending[rs_uuid] = future
> +                log.warn('Error during discovery %s, retry will be done '..
> +                         'later: %s', rs_uuid, err)
> +                goto continue
> +            end
> +            iter.future = future
> +            -- Don't spam many requests at once. Give
> +            -- storages time to handle them and other
> +            -- requests.
> +            lfiber.sleep(consts.DISCOVERY_WORK_STEP)
> +            if module_version ~= M.module_version then
> +                return
> +            end
> +            ::continue::
>          end
> -
> -        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
> -        for rs_uuid, p in pairs(pending) do
> +        -- Step 3: reduce stage - collect responses, restart
> +        -- iterators which reached the end.
> +        for rs_uuid, iter in pairs(iterators) do
>              lfiber.yield()
> -            local timeout = deadline - lfiber.clock()
> -            local buckets, err = p:wait_result(timeout)
> -            while M.errinj.ERRINJ_LONG_DISCOVERY do
> -                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> -                lfiber.sleep(0.01)
> +            local future = iter.future
> +            if not future then
> +                goto continue
>              end
> -            local replicaset = router.replicasets[rs_uuid]
> -            if not buckets then
> -                p:discard()
> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
> -            elseif module_version ~= M.module_version then
> +            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
> +            if module_version ~= M.module_version then
>                  return
> -            elseif replicaset then
> -                discovery_handle_buckets(router, replicaset, buckets[1])
>              end
> +            if not result then
> +                future:discard()
> +                log.warn('Error during discovery %s, retry will be done '..
> +                         'later: %s', rs_uuid, err)
> +                goto continue
> +            end
> +            local replicaset = router.replicasets[rs_uuid]
> +            if not replicaset then
> +                iterators[rs_uuid] = nil
> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
> +                goto continue
> +            end
> +            result = result[1]
> +            -- Buckets are returned as plain array by storages
> +            -- using old vshard version. But if .buckets is set,
> +            -- this is a new storage.
> +            discovery_handle_buckets(router, replicaset,
> +                                     result.buckets or result)
> +            local discovery_args = iter.args[1]
> +            discovery_args.from = result.next_from
> +            if not result.next_from then
> +                -- Nil next_from means no more buckets to get.
> +                -- Restart the iterator.
> +                iterators[rs_uuid] = nil
> +            end
> +            ::continue::
>          end
> -
> -        lfiber.sleep(deadline - lfiber.clock())
> +        local unknown_bucket_count
> +        repeat
> +            unknown_bucket_count =
> +                router.total_bucket_count - router.known_bucket_count
> +            if unknown_bucket_count == 0 then
> +                if mode ~= 'idle' then
> +                    log.info('Discovery enters idle mode, all buckets are '..
> +                             'known. Discovery works with %s seconds '..
> +                             'interval now', consts.DISCOVERY_IDLE_INTERVAL)
> +                    mode = 'idle'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> +            elseif not next(router.replicasets) then
> +                if mode ~= 'idle' then
> +                    log.info('Discovery enters idle mode because '..
> +                             'configuration does not have replicasets. '..
> +                             'Retries will happen with %s seconds interval',
> +                             consts.DISCOVERY_IDLE_INTERVAL)
> +                    mode = 'idle'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> +            else
> +                if mode ~= 'aggressive' then
> +                    log.info('Start aggressive discovery, %s buckets are '..
> +                             'unknown. Discovery works with %s seconds '..
> +                             'interval', unknown_bucket_count,
> +                             consts.DISCOVERY_WORK_INTERVAL)
> +                    mode = 'aggressive'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
> +                break
> +            end
> +            while M.errinj.ERRINJ_LONG_DISCOVERY do
> +                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> +                lfiber.sleep(0.01)
> +            end
> +        until next(router.replicasets)
>      end
>  end
>
> @@ -355,9 +435,9 @@ local function discovery_set(router, new_mode)
>      router.discovery_mode = new_mode
>      if router.discovery_fiber ~= nil then
>          pcall(router.discovery_fiber.cancel, router.discovery_fiber)
> +        router.discovery_fiber = nil
>      end
>      if new_mode == 'off' then
> -        router.discovery_fiber = nil
>          return
>      end
>      router.discovery_fiber = util.reloadable_fiber_create(
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 73c6740..c6a78fe 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -75,6 +75,7 @@ if not M then
>              ERRINJ_RECEIVE_PARTIALLY = false,
>              ERRINJ_NO_RECOVERY = false,
>              ERRINJ_UPGRADE = false,
> +            ERRINJ_DISCOVERY = false,
>          },
>          -- This counter is used to restart background fibers with
>          -- new reloaded code.
> @@ -1110,10 +1111,58 @@ local function bucket_collect(bucket_id)
>      return data
>  end
>
> +-- Discovery used by routers. It returns limited number of
> +-- buckets to avoid stalls when _bucket is huge.
> +local function buckets_discovery_extended(opts)
> +    local limit = consts.BUCKET_CHUNK_SIZE
> +    local buckets = table.new(limit, 0)
> +    local active = consts.BUCKET.ACTIVE
> +    local pinned = consts.BUCKET.PINNED
> +    local next_from
> +    local errcnt = M.errinj.ERRINJ_DISCOVERY
> +    if errcnt then
> +        if errcnt > 0 then
> +            M.errinj.ERRINJ_DISCOVERY = errcnt - 1
> +            if errcnt % 2 == 0 then
> +                box.error(box.error.INJECTION, 'discovery')
> +            end
> +        else
> +            M.errinj.ERRINJ_DISCOVERY = false
> +        end
> +    end
> +    -- No way to select by {status, id}, because there are two
> +    -- statuses to select. A router would need to maintain a
> +    -- separate iterator for each status it wants to get. This may
> +    -- be implemented in future. But _bucket space anyway 99% of
> +    -- time contains only active and pinned buckets. So there is
> +    -- no big benefit in optimizing that. Perhaps a compound index
> +    -- {status, id} could help too.
> +    for _, bucket in box.space._bucket:pairs({opts.from},
> +                                             {iterator = box.index.GE}) do
> +        local status = bucket.status
> +        if status == active or status == pinned then
> +            table.insert(buckets, bucket.id)
> +        end
> +        limit = limit - 1
> +        if limit == 0 then
> +            next_from = bucket.id + 1
> +            break
> +        end
> +    end
> +    -- Buckets list can even be empty, if all buckets in the
> +    -- scanned chunk are not active/pinned. But next_from still
> +    -- should be returned. So as the router could request more.
> +    return {buckets = buckets, next_from = next_from}
> +end
> +
>  --
>  -- Collect array of active bucket identifiers for discovery.
>  --
> -local function buckets_discovery()
> +local function buckets_discovery(opts)
> +    if opts then
> +        -- Private method. Is not documented intentionally.
> +        return buckets_discovery_extended(opts)
> +    end
>      local ret = {}
>      local status = box.space._bucket.index.status
>      for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do
>