From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp16.mail.ru (smtp16.mail.ru [94.100.176.153]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id F40F64696C3 for ; Sat, 2 May 2020 23:12:03 +0300 (MSK) References: <80bacfc685233ad047f6a80ddadd72b8903eae5b.1588292014.git.v.shpilevoy@tarantool.org> From: Vladislav Shpilevoy Message-ID: <4ee74ac5-55e0-9a90-91d0-69e9470d9cb3@tarantool.org> Date: Sat, 2 May 2020 22:12:01 +0200 MIME-Version: 1.0 In-Reply-To: Content-Type: text/plain; charset="utf-8" Content-Language: en-US Content-Transfer-Encoding: 8bit Subject: Re: [Tarantool-patches] [PATCH vshard 6/7] router: make discovery smoother in a big cluster List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Oleg Babin , tarantool-patches@dev.tarantool.org Thanks for the review! On 01/05/2020 19:01, Oleg Babin wrote: > Thanks for your patch! See my comments below. > > On 01/05/2020 03:16, Vladislav Shpilevoy wrote: >> Router does discovery once per 10 seconds. Discovery sends a >> request to each replicaset to download all pinned and active >> buckets from there. When there are millions of buckets, that >> becomes a long operation taking seconds, during which the storage >> is unresponsive. >> >> The patch makes discovery work step by step, downloading not more >> than 1000 buckets at a time. That gives the storage time to >> process other requests. >> >> Moreover, discovery now has some kind of 'state'. For each >> replicaset it keeps an iterator which is moved by 1k buckets on >> every successfully discovered bucket batch. It means, that if on a >> replicaset with 1 000 000 buckets discovery fails after 999 999 >> buckets are already discovered, it won't start from 0. It will >> retry from the old position. > > Could you provide a test for such case? No problem. ==================== diff --git a/test/router/router2.result b/test/router/router2.result index 556f749..0ad5a21 100644 --- a/test/router/router2.result +++ b/test/router/router2.result @@ -123,6 +123,66 @@ f1:status(), f2, f3:status(), f4:status(), f5, f6:status() | - suspended | ... +-- Errored discovery continued successfully after errors are gone. +vshard.router.bootstrap() + | --- + | - true + | ... +vshard.router.discovery_set('off') + | --- + | ... +vshard.router._route_map_clear() + | --- + | ... + +-- Discovery requests 2 and 4 will fail on storages. +util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}}, \ + 'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4') + | --- + | ... + +vshard.router.info().bucket.unknown + | --- + | - 3000 + | ... +vshard.router.discovery_set('on') + | --- + | ... +function continue_discovery() \ + local res = vshard.router.info().bucket.unknown == 0 \ + if not res then \ + vshard.router.discovery_wakeup() \ + end \ + return res \ +end + | --- + | ... +test_run:wait_cond(continue_discovery) + | --- + | - true + | ... +vshard.router.info().bucket.unknown + | --- + | - 0 + | ... + +-- Discovery injections should be reset meaning they were returned +-- needed number of times. +_ = test_run:switch('storage_1_a') + | --- + | ... +vshard.storage.internal.errinj.ERRINJ_DISCOVERY + | --- + | - 0 + | ... +_ = test_run:switch('storage_2_a') + | --- + | ... +vshard.storage.internal.errinj.ERRINJ_DISCOVERY + | --- + | - 0 + | ... + _ = test_run:switch("default") | --- | ... 
diff --git a/test/router/router2.test.lua b/test/router/router2.test.lua
index 33f4d3e..13fa73f 100644
--- a/test/router/router2.test.lua
+++ b/test/router/router2.test.lua
@@ -43,6 +43,34 @@ vshard.router.static.discovery_fiber:status()
 
 f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
 
+-- Errored discovery continued successfully after errors are gone.
+vshard.router.bootstrap()
+vshard.router.discovery_set('off')
+vshard.router._route_map_clear()
+
+-- Discovery requests 2 and 4 will fail on storages.
+util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}}, \
+               'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
+
+vshard.router.info().bucket.unknown
+vshard.router.discovery_set('on')
+function continue_discovery()                                  \
+    local res = vshard.router.info().bucket.unknown == 0       \
+    if not res then                                             \
+        vshard.router.discovery_wakeup()                        \
+    end                                                         \
+    return res                                                  \
+end
+test_run:wait_cond(continue_discovery)
+vshard.router.info().bucket.unknown
+
+-- Discovery injections should be reset meaning they were returned
+-- needed number of times.
+_ = test_run:switch('storage_1_a')
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+_ = test_run:switch('storage_2_a')
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+
 _ = test_run:switch("default")
 _ = test_run:cmd("stop server router_1")
 _ = test_run:cmd("cleanup server router_1")
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 0050b96..c6a78fe 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -75,6 +75,7 @@ if not M then
         ERRINJ_RECEIVE_PARTIALLY = false,
         ERRINJ_NO_RECOVERY = false,
         ERRINJ_UPGRADE = false,
+        ERRINJ_DISCOVERY = false,
     },
     -- This counter is used to restart background fibers with
     -- new reloaded code.
@@ -1118,6 +1119,17 @@ local function buckets_discovery_extended(opts)
     local active = consts.BUCKET.ACTIVE
     local pinned = consts.BUCKET.PINNED
     local next_from
+    local errcnt = M.errinj.ERRINJ_DISCOVERY
+    if errcnt then
+        if errcnt > 0 then
+            M.errinj.ERRINJ_DISCOVERY = errcnt - 1
+            if errcnt % 2 == 0 then
+                box.error(box.error.INJECTION, 'discovery')
+            end
+        else
+            M.errinj.ERRINJ_DISCOVERY = false
+        end
+    end
     -- No way to select by {status, id}, because there are two
     -- statuses to select. A router would need to maintain a
     -- separate iterator for each status it wants to get. This may
====================

I couldn't come up with a sane test of whether the router continues
discovery from exactly the same place where it got the error. I would need
to insert some extraordinarily weird injections into the router to collect
statistics of that kind. But the test at least shows that an error does not
prevent full discovery eventually.

>> However, still there is space for improvement. Discovery could
>> avoid downloading anything after all is downloaded, if it could
>> somehow see, if bucket space is not changed. Unfortunately it is
>> not so easy, since bucket generation (version of _bucket space)
>> is not persisted. So after instance restart it is always equal to
>> bucket count.
>
> Is there an issue for that?

I was hoping you wouldn't ask :D

I don't really see how that optimization could be implemented in a simple
and sane way. Looks like a storage would need to send a bucket generation
+ some timestamp to protect from the case when, after a restart, the
bucket set is different but the generation is the same.

I was going to wait until somebody explicitly asks for it, in case
'discovery_mode = "once"' turns out to be not enough. But ok, I was caught.
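For the record, a rough sketch of how that could look. It is purely
hypothetical: the response fields below and M.discovery_epoch do not exist,
and M.bucket_generation is assumed to be the in-memory counter bumped on
every _bucket change.

====================
-- Hypothetical sketch only, nothing below is in vshard today.
-- Storage side: attach a generation and an instance 'epoch' to the
-- discovery response. The epoch (e.g. taken at instance start) protects
-- from the case when the bucket set changed across a restart while the
-- non-persisted generation happens to be the same.
local function buckets_discovery_with_generation(opts)
    local resp = buckets_discovery_extended(opts)
    resp.generation = M.bucket_generation
    resp.epoch = M.discovery_epoch
    return resp
end

-- Router side: a pass over a replicaset could be skipped when both values
-- match the ones remembered from the previous pass.
local function is_pass_needed(iter, resp)
    return iter.epoch ~= resp.epoch or iter.generation ~= resp.generation
end
====================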
Here is the issue: https://github.com/tarantool/vshard/issues/238 >> diff --git a/vshard/router/init.lua b/vshard/router/init.lua >> index 6d88153..26ea85b 100644 >> --- a/vshard/router/init.lua >> +++ b/vshard/router/init.lua >> @@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then >>   discovery_f = function(router) >>       local module_version = M.module_version >>       assert(router.discovery_mode == 'on') >> +    local iterators = {} >> +    local opts = {is_async = true} >> +    local mode >>       while module_version == M.module_version do >> -        while not next(router.replicasets) do >> -            lfiber.sleep(consts.DISCOVERY_INTERVAL) >> -        end >> -        if module_version ~= M.module_version then >> -            return >> -        end >>           -- Just typical map reduce - send request to each >> -        -- replicaset in parallel, and collect responses. >> -        local pending = {} >> -        local opts = {is_async = true} >> -        local args = {} >> -        for rs_uuid, replicaset in pairs(router.replicasets) do >> +        -- replicaset in parallel, and collect responses. Many >> +        -- requests probably will be needed for each replicaset. >> +        -- >> +        -- Step 1: create missing iterators, in case this is a >> +        -- first discovery iteration, or some replicasets were >> +        -- added after the router is started. >> +        for rs_uuid in pairs(router.replicasets) do >> +            local iter = iterators[rs_uuid] >> +            if not iter then >> +                iterators[rs_uuid] = { >> +                    args = {{from = 1}}, >> +                    future = nil, >> +                } >> +            end >> +        end >> +        -- Step 2: map stage - send parallel requests for every >> +        -- iterator, prune orphan iterators whose replicasets were >> +        -- removed. >> +        for rs_uuid, iter in pairs(iterators) do >> +            local replicaset = router.replicasets[rs_uuid] >> +            if not replicaset then >> +                log.warn('Replicaset %s was removed during discovery', rs_uuid) >> +                iterators[rs_uuid] = nil >> +                goto continue >> +            end >>               local future, err = >> -                replicaset:callro('vshard.storage.buckets_discovery', >> -                                  args, opts) >> +                replicaset:callro('vshard.storage.buckets_discovery', iter.args, >> +                                  opts) >>               if not future then >> -                log.warn('Error during discovery %s: %s', rs_uuid, err) >> -            else >> -                pending[rs_uuid] = future >> +                log.warn('Error during discovery %s, retry will be done '.. >> +                         'later: %s', rs_uuid, err) >> +                goto continue >> +            end >> +            iter.future = future >> +            -- Don't spam many requests at once. Give >> +            -- storages time to handle them and other >> +            -- requests. >> +            lfiber.sleep(consts.DISCOVERY_WORK_STEP) >> +            if module_version ~= M.module_version then >> +                return >>               end > > > Is it possible to place such checks to some "common" places. At start/end of each iteration or between steps. This looks strange and a bit unobvous to do such checks after each timeout. Purpose is to stop the fiber as soon as possible in case a reload happened. 
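For readers following along, here is a minimal standalone sketch of that
pattern (simplified names, not the actual vshard code): the fiber remembers
the module version it was started with and re-checks it after every yield
or sleep; a mismatch means the module was reloaded and the old fiber must
exit as soon as possible so only the new code keeps running.

====================
local lfiber = require('fiber')

-- Minimal sketch, assuming M is a module-local table which survives
-- reload and M.module_version is bumped by the reload machinery.
local function reloadable_worker_f(M, do_step)
    local module_version = M.module_version
    while module_version == M.module_version do
        do_step()
        -- do_step() may yield for a long time. A reload could have
        -- happened meanwhile, so re-check right away instead of
        -- continuing the loop with the old code.
        if module_version ~= M.module_version then
            return
        end
        lfiber.sleep(1)
    end
end
====================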
So moving it to a common place is not really possible. It would make the
fiber work longer with the old code after a reload. But if we keep the
checks, I still need to do them after/before any long yield.

>> +            ::continue::
>>           end
>> -
>> -        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
>> -        for rs_uuid, p in pairs(pending) do
>> +        -- Step 3: reduce stage - collect responses, restart
>> +        -- iterators which reached the end.
>> +        for rs_uuid, iter in pairs(iterators) do
>>               lfiber.yield()
>> -            local timeout = deadline - lfiber.clock()
>> -            local buckets, err = p:wait_result(timeout)
>> -            while M.errinj.ERRINJ_LONG_DISCOVERY do
>> -                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
>> -                lfiber.sleep(0.01)
>> +            local future = iter.future
>> +            if not future then
>> +                goto continue
>>               end
>> -            local replicaset = router.replicasets[rs_uuid]
>> -            if not buckets then
>> -                p:discard()
>> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
>> -            elseif module_version ~= M.module_version then
>> +            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
>> +            if module_version ~= M.module_version then
>
> Does it have sense to call "discard" before "return"?

Yes. Discard removes the entry from netbox's internal table of requests
waiting for a result. I don't know for how long the storage won't respond
to that request, so it is better to free the memory now. The entry is not
removed automatically when the error was a timeout.

>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>> index 73c6740..0050b96 100644
>> --- a/vshard/storage/init.lua
>> +++ b/vshard/storage/init.lua
>> @@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id)
>>       return data
>>   end
>>
>> +-- Discovery used by routers. It returns limited number of
>> +-- buckets to avoid stalls when _bucket is huge.
>> +local function buckets_discovery_extended(opts)
>> +    local limit = consts.BUCKET_CHUNK_SIZE
>> +    local buckets = table.new(limit, 0)
>> +    local active = consts.BUCKET.ACTIVE
>> +    local pinned = consts.BUCKET.PINNED
>> +    local next_from
>> +    -- No way to select by {status, id}, because there are two
>> +    -- statuses to select. A router would need to maintain a
>> +    -- separate iterator for each status it wants to get. This may
>> +    -- be implemented in future. But _bucket space anyway 99% of
>> +    -- time contains only active and pinned buckets. So there is
>> +    -- no big benefit in optimizing that. Perhaps a compound index
>> +    -- {status, id} could help too.
>> +    for _, bucket in box.space._bucket:pairs({opts.from},
>> +                                             {iterator = box.index.GE}) do
>> +        local status = bucket.status
>> +        if status == active or status == pinned then
>> +            table.insert(buckets, bucket.id)
>> +        end
>> +        limit = limit - 1
>
> It's a bit strange after the words about "99% of time", I propose to move
> the limit decrease under the if condition.

But still there is this 1%, when buckets start moving. When the bucket
count is in the millions, rebalancing is likely to move thousands of
buckets, probably from one range. For example, buckets from 1 to 10k can
be moved. A small simulation below puts numbers on it.
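Here is a small standalone simulation (plain Lua, not vshard code; the
statuses and the chunk size of 1000 are assumptions for the example). With
the unconditional decrement one request touches at most one chunk of
tuples; decrementing the limit only for active/pinned buckets would make a
single request crawl through the whole moved range.

====================
-- Standalone simulation of the chunked _bucket scan, not vshard code.
local CHUNK = 1000

local function scan(statuses, from, decrement_always)
    local scanned, limit = 0, CHUNK
    for id = from, #statuses do
        scanned = scanned + 1
        if decrement_always or statuses[id] == 'active' then
            limit = limit - 1
        end
        if limit == 0 then
            break
        end
    end
    return scanned
end

-- Buckets 1..10000 are being moved away (SENT/GARBAGE), the rest are
-- active.
local statuses = {}
for i = 1, 20000 do
    statuses[i] = i <= 10000 and 'garbage' or 'active'
end
print(scan(statuses, 1, true))  -- 1000 tuples touched by one request
print(scan(statuses, 1, false)) -- 11000 tuples touched by one request
====================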
If a discovery request arrives at that moment, it will be stuck for a long
time iterating over the sent and garbage buckets. Rebalancing is a heavy
thing, and such additional load would make the cluster feel even worse.
The 99% here is rather about it not being a problem to sometimes return an
empty bucket set, not about allowing perf problems in the other 1%.

While working on your comments I found that I had added the sleep in a
wrong place in aggressive discovery. Fixed below:

====================
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 26ea85b..28437e3 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -358,11 +358,14 @@ discovery_f = function(router)
                     mode = 'idle'
                 end
                 lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
-            elseif mode ~= 'aggressive' then
-                log.info('Start aggressive discovery, %s buckets are unknown. '..
-                         'Discovery works with %s seconds interval',
-                         unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL)
-                mode = 'aggressive'
+            else
+                if mode ~= 'aggressive' then
+                    log.info('Start aggressive discovery, %s buckets are '..
+                             'unknown. Discovery works with %s seconds '..
+                             'interval', unknown_bucket_count,
+                             consts.DISCOVERY_WORK_INTERVAL)
+                    mode = 'aggressive'
+                end
                 lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
                 break
             end
====================

Since the changes are big, below is the whole new commit:

====================
router: make discovery smoother in a big cluster

Router does discovery once per 10 seconds. Discovery sends a
request to each replicaset to download all pinned and active
buckets from there. When there are millions of buckets, that
becomes a long operation taking seconds, during which the storage
is unresponsive.

The patch makes discovery work step by step, downloading not more
than 1000 buckets at a time. That gives the storage time to
process other requests.

Moreover, discovery now has some kind of 'state'. For each
replicaset it keeps an iterator which is moved by 1k buckets on
every successfully discovered bucket batch. It means, that if on a
replicaset with 1 000 000 buckets discovery fails after 999 999
buckets are already discovered, it won't start from 0. It will
retry from the old position.

However, still there is space for improvement. Discovery could
avoid downloading anything after all is downloaded, if it could
somehow see, if bucket space is not changed. Unfortunately it is
not so easy, since bucket generation (version of _bucket space)
is not persisted. So after instance restart it is always equal to
bucket count.

Part of #210

diff --git a/test/router/reload.result b/test/router/reload.result
index 3ba900a..8fe99ba 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -44,7 +44,10 @@ vshard.router.bootstrap()
 while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
 ---
 ...
-while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+---
+...
+while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 ---
 ...
-- diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 5ed5690..abcbc09 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -15,7 +15,8 @@ fiber = require('fiber') vshard.router.bootstrap() while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end -- -- Gh-72: allow reload. Test simple reload, error during diff --git a/test/router/router.result b/test/router/router.result index df7be4a..b2efd6d 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -943,9 +943,9 @@ calculate_known_buckets() --- - 3000 ... -test_run:grep_log('router_1', 'was 1, became 1500') +test_run:grep_log('router_1', 'was 1, became 1000') --- -- was 1, became 1500 +- was 1, became 1000 ... info = vshard.router.info() --- diff --git a/test/router/router.test.lua b/test/router/router.test.lua index 97dce49..154310b 100644 --- a/test/router/router.test.lua +++ b/test/router/router.test.lua @@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active) _ = test_run:switch('router_1') wait_discovery() calculate_known_buckets() -test_run:grep_log('router_1', 'was 1, became 1500') +test_run:grep_log('router_1', 'was 1, became 1000') info = vshard.router.info() info.bucket info.alerts diff --git a/test/router/router2.result b/test/router/router2.result index 556f749..0ad5a21 100644 --- a/test/router/router2.result +++ b/test/router/router2.result @@ -123,6 +123,66 @@ f1:status(), f2, f3:status(), f4:status(), f5, f6:status() | - suspended | ... +-- Errored discovery continued successfully after errors are gone. +vshard.router.bootstrap() + | --- + | - true + | ... +vshard.router.discovery_set('off') + | --- + | ... +vshard.router._route_map_clear() + | --- + | ... + +-- Discovery requests 2 and 4 will fail on storages. +util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}}, \ + 'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4') + | --- + | ... + +vshard.router.info().bucket.unknown + | --- + | - 3000 + | ... +vshard.router.discovery_set('on') + | --- + | ... +function continue_discovery() \ + local res = vshard.router.info().bucket.unknown == 0 \ + if not res then \ + vshard.router.discovery_wakeup() \ + end \ + return res \ +end + | --- + | ... +test_run:wait_cond(continue_discovery) + | --- + | - true + | ... +vshard.router.info().bucket.unknown + | --- + | - 0 + | ... + +-- Discovery injections should be reset meaning they were returned +-- needed number of times. +_ = test_run:switch('storage_1_a') + | --- + | ... +vshard.storage.internal.errinj.ERRINJ_DISCOVERY + | --- + | - 0 + | ... +_ = test_run:switch('storage_2_a') + | --- + | ... +vshard.storage.internal.errinj.ERRINJ_DISCOVERY + | --- + | - 0 + | ... + _ = test_run:switch("default") | --- | ... diff --git a/test/router/router2.test.lua b/test/router/router2.test.lua index 33f4d3e..ef05f8c 100644 --- a/test/router/router2.test.lua +++ b/test/router/router2.test.lua @@ -43,6 +43,34 @@ vshard.router.static.discovery_fiber:status() f1:status(), f2, f3:status(), f4:status(), f5, f6:status() +-- Errored discovery continued successfully after errors are gone. 
+vshard.router.bootstrap() +vshard.router.discovery_set('off') +vshard.router._route_map_clear() + +-- Discovery requests 2 and 4 will fail on storages. +util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}}, \ + 'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4') + +vshard.router.info().bucket.unknown +vshard.router.discovery_set('on') +function continue_discovery() \ + local res = vshard.router.info().bucket.unknown == 0 \ + if not res then \ + vshard.router.discovery_wakeup() \ + end \ + return res \ +end +test_run:wait_cond(continue_discovery) +vshard.router.info().bucket.unknown + +-- Discovery injections should be reset meaning they were returned +-- needed number of times. +_ = test_run:switch('storage_1_a') +vshard.storage.internal.errinj.ERRINJ_DISCOVERY +_ = test_run:switch('storage_2_a') +vshard.storage.internal.errinj.ERRINJ_DISCOVERY + _ = test_run:switch("default") _ = test_run:cmd("stop server router_1") _ = test_run:cmd("cleanup server router_1") diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result index 56db9e8..92353c3 100644 --- a/test/router/wrong_config.result +++ b/test/router/wrong_config.result @@ -45,7 +45,10 @@ cfg.bucket_count = 1000 r = vshard.router.new('gh-179', cfg) --- ... -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end +while r:info().bucket.available_rw ~= 3000 do \ + r:discovery_wakeup() \ + fiber.sleep(0.1) \ +end --- ... i = r:info() diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua index 62ef30d..174b373 100644 --- a/test/router/wrong_config.test.lua +++ b/test/router/wrong_config.test.lua @@ -18,7 +18,10 @@ vshard.router.bootstrap() -- cfg.bucket_count = 1000 r = vshard.router.new('gh-179', cfg) -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end +while r:info().bucket.available_rw ~= 3000 do \ + r:discovery_wakeup() \ + fiber.sleep(0.1) \ +end i = r:info() i.bucket i.alerts diff --git a/vshard/consts.lua b/vshard/consts.lua index 5391c0f..a6a8c1b 100644 --- a/vshard/consts.lua +++ b/vshard/consts.lua @@ -40,5 +40,8 @@ return { DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5; RECOVERY_INTERVAL = 5; COLLECT_LUA_GARBAGE_INTERVAL = 100; - DISCOVERY_INTERVAL = 10; + DISCOVERY_IDLE_INTERVAL = 10, + DISCOVERY_WORK_INTERVAL = 1, + DISCOVERY_WORK_STEP = 0.01, + DISCOVERY_TIMEOUT = 10, } diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 6d88153..28437e3 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -250,50 +250,130 @@ if util.version_is_at_least(1, 10, 0) then discovery_f = function(router) local module_version = M.module_version assert(router.discovery_mode == 'on') + local iterators = {} + local opts = {is_async = true} + local mode while module_version == M.module_version do - while not next(router.replicasets) do - lfiber.sleep(consts.DISCOVERY_INTERVAL) - end - if module_version ~= M.module_version then - return - end -- Just typical map reduce - send request to each - -- replicaset in parallel, and collect responses. - local pending = {} - local opts = {is_async = true} - local args = {} - for rs_uuid, replicaset in pairs(router.replicasets) do + -- replicaset in parallel, and collect responses. Many + -- requests probably will be needed for each replicaset. 
+ -- + -- Step 1: create missing iterators, in case this is a + -- first discovery iteration, or some replicasets were + -- added after the router is started. + for rs_uuid in pairs(router.replicasets) do + local iter = iterators[rs_uuid] + if not iter then + iterators[rs_uuid] = { + args = {{from = 1}}, + future = nil, + } + end + end + -- Step 2: map stage - send parallel requests for every + -- iterator, prune orphan iterators whose replicasets were + -- removed. + for rs_uuid, iter in pairs(iterators) do + local replicaset = router.replicasets[rs_uuid] + if not replicaset then + log.warn('Replicaset %s was removed during discovery', rs_uuid) + iterators[rs_uuid] = nil + goto continue + end local future, err = - replicaset:callro('vshard.storage.buckets_discovery', - args, opts) + replicaset:callro('vshard.storage.buckets_discovery', iter.args, + opts) if not future then - log.warn('Error during discovery %s: %s', rs_uuid, err) - else - pending[rs_uuid] = future + log.warn('Error during discovery %s, retry will be done '.. + 'later: %s', rs_uuid, err) + goto continue end + iter.future = future + -- Don't spam many requests at once. Give + -- storages time to handle them and other + -- requests. + lfiber.sleep(consts.DISCOVERY_WORK_STEP) + if module_version ~= M.module_version then + return + end + ::continue:: end - - local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL - for rs_uuid, p in pairs(pending) do + -- Step 3: reduce stage - collect responses, restart + -- iterators which reached the end. + for rs_uuid, iter in pairs(iterators) do lfiber.yield() - local timeout = deadline - lfiber.clock() - local buckets, err = p:wait_result(timeout) - while M.errinj.ERRINJ_LONG_DISCOVERY do - M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting' - lfiber.sleep(0.01) + local future = iter.future + if not future then + goto continue end - local replicaset = router.replicasets[rs_uuid] - if not buckets then - p:discard() - log.warn('Error during discovery %s: %s', rs_uuid, err) - elseif module_version ~= M.module_version then + local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT) + if module_version ~= M.module_version then return - elseif replicaset then - discovery_handle_buckets(router, replicaset, buckets[1]) end + if not result then + future:discard() + log.warn('Error during discovery %s, retry will be done '.. + 'later: %s', rs_uuid, err) + goto continue + end + local replicaset = router.replicasets[rs_uuid] + if not replicaset then + iterators[rs_uuid] = nil + log.warn('Replicaset %s was removed during discovery', rs_uuid) + goto continue + end + result = result[1] + -- Buckets are returned as plain array by storages + -- using old vshard version. But if .buckets is set, + -- this is a new storage. + discovery_handle_buckets(router, replicaset, + result.buckets or result) + local discovery_args = iter.args[1] + discovery_args.from = result.next_from + if not result.next_from then + -- Nil next_from means no more buckets to get. + -- Restart the iterator. + iterators[rs_uuid] = nil + end + ::continue:: end - - lfiber.sleep(deadline - lfiber.clock()) + local unknown_bucket_count + repeat + unknown_bucket_count = + router.total_bucket_count - router.known_bucket_count + if unknown_bucket_count == 0 then + if mode ~= 'idle' then + log.info('Discovery enters idle mode, all buckets are '.. + 'known. Discovery works with %s seconds '.. 
+ 'interval now', consts.DISCOVERY_IDLE_INTERVAL) + mode = 'idle' + end + lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL) + elseif not next(router.replicasets) then + if mode ~= 'idle' then + log.info('Discovery enters idle mode because '.. + 'configuration does not have replicasets. '.. + 'Retries will happen with %s seconds interval', + consts.DISCOVERY_IDLE_INTERVAL) + mode = 'idle' + end + lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL) + else + if mode ~= 'aggressive' then + log.info('Start aggressive discovery, %s buckets are '.. + 'unknown. Discovery works with %s seconds '.. + 'interval', unknown_bucket_count, + consts.DISCOVERY_WORK_INTERVAL) + mode = 'aggressive' + end + lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL) + break + end + while M.errinj.ERRINJ_LONG_DISCOVERY do + M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting' + lfiber.sleep(0.01) + end + until next(router.replicasets) end end @@ -355,9 +435,9 @@ local function discovery_set(router, new_mode) router.discovery_mode = new_mode if router.discovery_fiber ~= nil then pcall(router.discovery_fiber.cancel, router.discovery_fiber) + router.discovery_fiber = nil end if new_mode == 'off' then - router.discovery_fiber = nil return end router.discovery_fiber = util.reloadable_fiber_create( diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 73c6740..c6a78fe 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -75,6 +75,7 @@ if not M then ERRINJ_RECEIVE_PARTIALLY = false, ERRINJ_NO_RECOVERY = false, ERRINJ_UPGRADE = false, + ERRINJ_DISCOVERY = false, }, -- This counter is used to restart background fibers with -- new reloaded code. @@ -1110,10 +1111,58 @@ local function bucket_collect(bucket_id) return data end +-- Discovery used by routers. It returns limited number of +-- buckets to avoid stalls when _bucket is huge. +local function buckets_discovery_extended(opts) + local limit = consts.BUCKET_CHUNK_SIZE + local buckets = table.new(limit, 0) + local active = consts.BUCKET.ACTIVE + local pinned = consts.BUCKET.PINNED + local next_from + local errcnt = M.errinj.ERRINJ_DISCOVERY + if errcnt then + if errcnt > 0 then + M.errinj.ERRINJ_DISCOVERY = errcnt - 1 + if errcnt % 2 == 0 then + box.error(box.error.INJECTION, 'discovery') + end + else + M.errinj.ERRINJ_DISCOVERY = false + end + end + -- No way to select by {status, id}, because there are two + -- statuses to select. A router would need to maintain a + -- separate iterator for each status it wants to get. This may + -- be implemented in future. But _bucket space anyway 99% of + -- time contains only active and pinned buckets. So there is + -- no big benefit in optimizing that. Perhaps a compound index + -- {status, id} could help too. + for _, bucket in box.space._bucket:pairs({opts.from}, + {iterator = box.index.GE}) do + local status = bucket.status + if status == active or status == pinned then + table.insert(buckets, bucket.id) + end + limit = limit - 1 + if limit == 0 then + next_from = bucket.id + 1 + break + end + end + -- Buckets list can even be empty, if all buckets in the + -- scanned chunk are not active/pinned. But next_from still + -- should be returned. So as the router could request more. + return {buckets = buckets, next_from = next_from} +end + -- -- Collect array of active bucket identifiers for discovery. -- -local function buckets_discovery() +local function buckets_discovery(opts) + if opts then + -- Private method. Is not documented intentionally. 
+ return buckets_discovery_extended(opts) + end local ret = {} local status = box.space._bucket.index.status for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do