[Tarantool-patches] [PATCH vshard 6/7] router: make discovery smoother in a big cluster

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat May 2 23:12:01 MSK 2020


Thanks for the review!

On 01/05/2020 19:01, Oleg Babin wrote:
> Thanks for your patch! See my comments below.
> 
> On 01/05/2020 03:16, Vladislav Shpilevoy wrote:
>> Router does discovery once per 10 seconds. Discovery sends a
>> request to each replicaset to download all pinned and active
>> buckets from there. When there are millions of buckets, that
>> becomes a long operation taking seconds, during which the storage
>> is unresponsive.
>>
>> The patch makes discovery work step by step, downloading not more
>> than 1000 buckets at a time. That gives the storage time to
>> process other requests.
>>
>> Moreover, discovery now has some kind of 'state'. For each
>> replicaset it keeps an iterator which is moved by 1k buckets on
>> every successfully discovered bucket batch. It means, that if on a
>> replicaset with 1 000 000 buckets discovery fails after 999 999
>> buckets are already discovered, it won't start from 0. It will
>> retry from the old position.
> 
> Could you provide a test for such case?

No problem.

====================
diff --git a/test/router/router2.result b/test/router/router2.result
index 556f749..0ad5a21 100644
--- a/test/router/router2.result
+++ b/test/router/router2.result
@@ -123,6 +123,66 @@ f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
  | - suspended
  | ...
 
+-- Errored discovery continued successfully after errors are gone.
+vshard.router.bootstrap()
+ | ---
+ | - true
+ | ...
+vshard.router.discovery_set('off')
+ | ---
+ | ...
+vshard.router._route_map_clear()
+ | ---
+ | ...
+
+-- Discovery requests 2 and 4 will fail on storages.
+util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}},                    \
+               'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
+ | ---
+ | ...
+
+vshard.router.info().bucket.unknown
+ | ---
+ | - 3000
+ | ...
+vshard.router.discovery_set('on')
+ | ---
+ | ...
+function continue_discovery()                                                   \
+    local res = vshard.router.info().bucket.unknown == 0                        \
+    if not res then                                                             \
+        vshard.router.discovery_wakeup()                                        \
+    end                                                                         \
+    return res                                                                  \
+end
+ | ---
+ | ...
+test_run:wait_cond(continue_discovery)
+ | ---
+ | - true
+ | ...
+vshard.router.info().bucket.unknown
+ | ---
+ | - 0
+ | ...
+
+-- Discovery injections should be reset meaning they were returned
+-- needed number of times.
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+ | ---
+ | - 0
+ | ...
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+ | ---
+ | - 0
+ | ...
+
 _ = test_run:switch("default")
  | ---
  | ...
diff --git a/test/router/router2.test.lua b/test/router/router2.test.lua
index 33f4d3e..13fa73f 100644
--- a/test/router/router2.test.lua
+++ b/test/router/router2.test.lua
@@ -43,6 +43,34 @@ vshard.router.static.discovery_fiber:status()
 
 f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
 
+-- Errored discovery continued successfully after errors are gone.
+vshard.router.bootstrap()
+vshard.router.discovery_set('off')
+vshard.router._route_map_clear()
+
+-- Discovery requests 2 and 4 will fail on storages.
+util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}},                    \
+               'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
+
+vshard.router.info().bucket.unknown
+vshard.router.discovery_set('on')
+function continue_discovery()                                                   \
+    local res = vshard.router.info().bucket.unknown == 0                        \
+    if not res then                                                             \
+        vshard.router.discovery_wakeup()                                        \
+    end                                                                         \
+    return res                                                                  \
+end
+test_run:wait_cond(continue_discovery)
+vshard.router.info().bucket.unknown
+
+-- Discovery injections should be reset meaning they were returned
+-- needed number of times.
+_ = test_run:switch('storage_1_a')
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+_ = test_run:switch('storage_2_a')
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+
 _ = test_run:switch("default")
 _ = test_run:cmd("stop server router_1")
 _ = test_run:cmd("cleanup server router_1")
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 0050b96..c6a78fe 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -75,6 +75,7 @@ if not M then
             ERRINJ_RECEIVE_PARTIALLY = false,
             ERRINJ_NO_RECOVERY = false,
             ERRINJ_UPGRADE = false,
+            ERRINJ_DISCOVERY = false,
         },
         -- This counter is used to restart background fibers with
         -- new reloaded code.
@@ -1118,6 +1119,17 @@ local function buckets_discovery_extended(opts)
     local active = consts.BUCKET.ACTIVE
     local pinned = consts.BUCKET.PINNED
     local next_from
+    local errcnt = M.errinj.ERRINJ_DISCOVERY
+    if errcnt then
+        if errcnt > 0 then
+            M.errinj.ERRINJ_DISCOVERY = errcnt - 1
+            if errcnt % 2 == 0 then
+                box.error(box.error.INJECTION, 'discovery')
+            end
+        else
+            M.errinj.ERRINJ_DISCOVERY = false
+        end
+    end
     -- No way to select by {status, id}, because there are two
     -- statuses to select. A router would need to maintain a
     -- separate iterator for each status it wants to get. This may

====================

I couldn't come up with a sane test on whether the router continues
discovery from exactly the same place where it got an error. I would
need to insert some extraordinarily weird injections into the router to
collect statistics of that kind. But the test at least shows that an
error does not prevent full discovery eventually.

>> However, still there is space for improvement. Discovery could
>> avoid downloading anything after all is downloaded, if it could
>> somehow see, if bucket space is not changed. Unfortunately it is
>> not so easy, since bucket generation (version of _bucket space)
>> is not persisted. So after instance restart it is always equal to
>> bucket count.
> 
> Is there an issue for that?

I was hoping you wouldn't ask :D

I don't really see how that optimization could be implemented in a sane
way in terms of simplicity. Looks like a storage would need to send
a bucket generation + some timestamp to protect from the case when
after restart the bucket set is different, but the generation is the same.

I was going to wait until somebody explicitly asks for it, if
'discovery_mode = "once"' won't be enough.

But ok, I was caught. Here is the issue:
https://github.com/tarantool/vshard/issues/238

>> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
>> index 6d88153..26ea85b 100644
>> --- a/vshard/router/init.lua
>> +++ b/vshard/router/init.lua
>> @@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then
>>   discovery_f = function(router)
>>       local module_version = M.module_version
>>       assert(router.discovery_mode == 'on')
>> +    local iterators = {}
>> +    local opts = {is_async = true}
>> +    local mode
>>       while module_version == M.module_version do
>> -        while not next(router.replicasets) do
>> -            lfiber.sleep(consts.DISCOVERY_INTERVAL)
>> -        end
>> -        if module_version ~= M.module_version then
>> -            return
>> -        end
>>           -- Just typical map reduce - send request to each
>> -        -- replicaset in parallel, and collect responses.
>> -        local pending = {}
>> -        local opts = {is_async = true}
>> -        local args = {}
>> -        for rs_uuid, replicaset in pairs(router.replicasets) do
>> +        -- replicaset in parallel, and collect responses. Many
>> +        -- requests probably will be needed for each replicaset.
>> +        --
>> +        -- Step 1: create missing iterators, in case this is a
>> +        -- first discovery iteration, or some replicasets were
>> +        -- added after the router is started.
>> +        for rs_uuid in pairs(router.replicasets) do
>> +            local iter = iterators[rs_uuid]
>> +            if not iter then
>> +                iterators[rs_uuid] = {
>> +                    args = {{from = 1}},
>> +                    future = nil,
>> +                }
>> +            end
>> +        end
>> +        -- Step 2: map stage - send parallel requests for every
>> +        -- iterator, prune orphan iterators whose replicasets were
>> +        -- removed.
>> +        for rs_uuid, iter in pairs(iterators) do
>> +            local replicaset = router.replicasets[rs_uuid]
>> +            if not replicaset then
>> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
>> +                iterators[rs_uuid] = nil
>> +                goto continue
>> +            end
>>               local future, err =
>> -                replicaset:callro('vshard.storage.buckets_discovery',
>> -                                  args, opts)
>> +                replicaset:callro('vshard.storage.buckets_discovery', iter.args,
>> +                                  opts)
>>               if not future then
>> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
>> -            else
>> -                pending[rs_uuid] = future
>> +                log.warn('Error during discovery %s, retry will be done '..
>> +                         'later: %s', rs_uuid, err)
>> +                goto continue
>> +            end
>> +            iter.future = future
>> +            -- Don't spam many requests at once. Give
>> +            -- storages time to handle them and other
>> +            -- requests.
>> +            lfiber.sleep(consts.DISCOVERY_WORK_STEP)
>> +            if module_version ~= M.module_version then
>> +                return
>>               end
> 
> 
> Is it possible to place such checks to some "common" places. At start/end of each iteration or between steps. This looks strange and a bit unobvous to do such checks after each timeout.

The purpose is to stop the fiber as soon as possible in case a reload
happened. So moving it to a common place is not really possible.
It would make it work longer with the old code after reload. But
if we keep the checks, I still need to do them after/before any
long yield.

>> +            ::continue::
>>           end
>> -
>> -        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
>> -        for rs_uuid, p in pairs(pending) do
>> +        -- Step 3: reduce stage - collect responses, restart
>> +        -- iterators which reached the end.
>> +        for rs_uuid, iter in pairs(iterators) do
>>               lfiber.yield()
>> -            local timeout = deadline - lfiber.clock()
>> -            local buckets, err = p:wait_result(timeout)
>> -            while M.errinj.ERRINJ_LONG_DISCOVERY do
>> -                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
>> -                lfiber.sleep(0.01)
>> +            local future = iter.future
>> +            if not future then
>> +                goto continue
>>               end
>> -            local replicaset = router.replicasets[rs_uuid]
>> -            if not buckets then
>> -                p:discard()
>> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
>> -            elseif module_version ~= M.module_version then
>> +            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
>> +            if module_version ~= M.module_version then
> 
> Does it have sence to call "discard" before "return"?

Yes. Discard removes the entry from netbox's internal table of requests
waiting for a result. I don't know how long the storage will take to
respond to it, so it is better to free the memory now. The entry is not
removed automatically in case it was a timeout error.

>> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
>> index 73c6740..0050b96 100644
>> --- a/vshard/storage/init.lua
>> +++ b/vshard/storage/init.lua
>> @@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id)
>>       return data
>>   end
>>   +-- Discovery used by routers. It returns limited number of
>> +-- buckets to avoid stalls when _bucket is huge.
>> +local function buckets_discovery_extended(opts)
>> +    local limit = consts.BUCKET_CHUNK_SIZE
>> +    local buckets = table.new(limit, 0)
>> +    local active = consts.BUCKET.ACTIVE
>> +    local pinned = consts.BUCKET.PINNED
>> +    local next_from
>> +    -- No way to select by {status, id}, because there are two
>> +    -- statuses to select. A router would need to maintain a
>> +    -- separate iterator for each status it wants to get. This may
>> +    -- be implemented in future. But _bucket space anyway 99% of
>> +    -- time contains only active and pinned buckets. So there is
>> +    -- no big benefit in optimizing that. Perhaps a compound index
>> +    -- {status, id} could help too.
>> +    for _, bucket in box.space._bucket:pairs({opts.from},
>> +                                             {iterator = box.index.GE}) do
>> +        local status = bucket.status
>> +        if status == active or status == pinned then
>> +            table.insert(buckets, bucket.id)
>> +        end
>> +        limit = limit - 1
> 
> It's a bit strange after words about "99% time", I propose to move limit decrease under if condition.

But still there is this 1%, when buckets start moving. When the bucket count
is in the millions, rebalancing is likely to move thousands of buckets, likely
from one range. For example, buckets from 1 to 10k can be moved. If a
discovery request arrives at that moment, it will get stuck for a long time
iterating over the sent and garbage buckets.

Rebalancing is a heavy thing, and such additional load would make the
cluster feel even worse.

The 99% here rather means that it is not a problem to sometimes return an
empty bucket set. It is not about allowing perf problems in the other 1%.


While working on your comments I found that in aggressive discovery mode I
had added the sleep in the wrong place. Fixed below:

====================
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 26ea85b..28437e3 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -358,11 +358,14 @@ discovery_f = function(router)
                     mode = 'idle'
                 end
                 lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
-            elseif mode ~= 'aggressive' then
-                log.info('Start aggressive discovery, %s buckets are unknown. '..
-                         'Discovery works with %s seconds interval',
-                         unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL)
-                mode = 'aggressive'
+            else
+                if mode ~= 'aggressive' then
+                    log.info('Start aggressive discovery, %s buckets are '..
+                             'unknown. Discovery works with %s seconds '..
+                             'interval', unknown_bucket_count,
+                             consts.DISCOVERY_WORK_INTERVAL)
+                    mode = 'aggressive'
+                end
                 lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
                 break
             end

====================

Since the changes are big, below is the whole new commit:

====================

    router: make discovery smoother in a big cluster
    
    Router does discovery once per 10 seconds. Discovery sends a
    request to each replicaset to download all pinned and active
    buckets from there. When there are millions of buckets, that
    becomes a long operation taking seconds, during which the storage
    is unresponsive.
    
    The patch makes discovery work step by step, downloading not more
    than 1000 buckets at a time. That gives the storage time to
    process other requests.
    
    Moreover, discovery now has some kind of 'state'. For each
    replicaset it keeps an iterator which is moved by 1k buckets on
    every successfully discovered bucket batch. It means, that if on a
    replicaset with 1 000 000 buckets discovery fails after 999 999
    buckets are already discovered, it won't start from 0. It will
    retry from the old position.
    
    However, still there is space for improvement. Discovery could
    avoid downloading anything after all is downloaded, if it could
    somehow see, if bucket space is not changed. Unfortunately it is
    not so easy, since bucket generation (version of _bucket space)
    is not persisted. So after instance restart it is always equal to
    bucket count.
    
    Part of #210

diff --git a/test/router/reload.result b/test/router/reload.result
index 3ba900a..8fe99ba 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -44,7 +44,10 @@ vshard.router.bootstrap()
 while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
 ---
 ...
-while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+---
+...
+while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 ---
 ...
 --
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 5ed5690..abcbc09 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -15,7 +15,8 @@ fiber = require('fiber')
 vshard.router.bootstrap()
 
 while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 
 --
 -- Gh-72: allow reload. Test simple reload, error during
diff --git a/test/router/router.result b/test/router/router.result
index df7be4a..b2efd6d 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -943,9 +943,9 @@ calculate_known_buckets()
 ---
 - 3000
 ...
-test_run:grep_log('router_1', 'was 1, became 1500')
+test_run:grep_log('router_1', 'was 1, became 1000')
 ---
-- was 1, became 1500
+- was 1, became 1000
 ...
 info = vshard.router.info()
 ---
diff --git a/test/router/router.test.lua b/test/router/router.test.lua
index 97dce49..154310b 100644
--- a/test/router/router.test.lua
+++ b/test/router/router.test.lua
@@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active)
 _ = test_run:switch('router_1')
 wait_discovery()
 calculate_known_buckets()
-test_run:grep_log('router_1', 'was 1, became 1500')
+test_run:grep_log('router_1', 'was 1, became 1000')
 info = vshard.router.info()
 info.bucket
 info.alerts
diff --git a/test/router/router2.result b/test/router/router2.result
index 556f749..0ad5a21 100644
--- a/test/router/router2.result
+++ b/test/router/router2.result
@@ -123,6 +123,66 @@ f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
  | - suspended
  | ...
 
+-- Errored discovery continued successfully after errors are gone.
+vshard.router.bootstrap()
+ | ---
+ | - true
+ | ...
+vshard.router.discovery_set('off')
+ | ---
+ | ...
+vshard.router._route_map_clear()
+ | ---
+ | ...
+
+-- Discovery requests 2 and 4 will fail on storages.
+util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}},                    \
+               'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
+ | ---
+ | ...
+
+vshard.router.info().bucket.unknown
+ | ---
+ | - 3000
+ | ...
+vshard.router.discovery_set('on')
+ | ---
+ | ...
+function continue_discovery()                                                   \
+    local res = vshard.router.info().bucket.unknown == 0                        \
+    if not res then                                                             \
+        vshard.router.discovery_wakeup()                                        \
+    end                                                                         \
+    return res                                                                  \
+end
+ | ---
+ | ...
+test_run:wait_cond(continue_discovery)
+ | ---
+ | - true
+ | ...
+vshard.router.info().bucket.unknown
+ | ---
+ | - 0
+ | ...
+
+-- Discovery injections should be reset meaning they were returned
+-- needed number of times.
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+ | ---
+ | - 0
+ | ...
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+ | ---
+ | - 0
+ | ...
+
 _ = test_run:switch("default")
  | ---
  | ...
diff --git a/test/router/router2.test.lua b/test/router/router2.test.lua
index 33f4d3e..ef05f8c 100644
--- a/test/router/router2.test.lua
+++ b/test/router/router2.test.lua
@@ -43,6 +43,34 @@ vshard.router.static.discovery_fiber:status()
 
 f1:status(), f2, f3:status(), f4:status(), f5, f6:status()
 
+-- Errored discovery continued successfully after errors are gone.
+vshard.router.bootstrap()
+vshard.router.discovery_set('off')
+vshard.router._route_map_clear()
+
+-- Discovery requests 2 and 4 will fail on storages.
+util.map_evals(test_run, {{'storage_1_a'}, {'storage_2_a'}},                    \
+               'vshard.storage.internal.errinj.ERRINJ_DISCOVERY = 4')
+
+vshard.router.info().bucket.unknown
+vshard.router.discovery_set('on')
+function continue_discovery()                                                   \
+    local res = vshard.router.info().bucket.unknown == 0                        \
+    if not res then                                                             \
+        vshard.router.discovery_wakeup()                                        \
+    end                                                                         \
+    return res                                                                  \
+end
+test_run:wait_cond(continue_discovery)
+vshard.router.info().bucket.unknown
+
+-- Discovery injections should be reset meaning they were returned
+-- needed number of times.
+_ = test_run:switch('storage_1_a')
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+_ = test_run:switch('storage_2_a')
+vshard.storage.internal.errinj.ERRINJ_DISCOVERY
+
 _ = test_run:switch("default")
 _ = test_run:cmd("stop server router_1")
 _ = test_run:cmd("cleanup server router_1")
diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result
index 56db9e8..92353c3 100644
--- a/test/router/wrong_config.result
+++ b/test/router/wrong_config.result
@@ -45,7 +45,10 @@ cfg.bucket_count = 1000
 r = vshard.router.new('gh-179', cfg)
 ---
 ...
-while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
+while r:info().bucket.available_rw ~= 3000 do                                   \
+    r:discovery_wakeup()                                                        \
+    fiber.sleep(0.1)                                                            \
+end
 ---
 ...
 i = r:info()
diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua
index 62ef30d..174b373 100644
--- a/test/router/wrong_config.test.lua
+++ b/test/router/wrong_config.test.lua
@@ -18,7 +18,10 @@ vshard.router.bootstrap()
 --
 cfg.bucket_count = 1000
 r = vshard.router.new('gh-179', cfg)
-while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
+while r:info().bucket.available_rw ~= 3000 do                                   \
+    r:discovery_wakeup()                                                        \
+    fiber.sleep(0.1)                                                            \
+end
 i = r:info()
 i.bucket
 i.alerts
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 5391c0f..a6a8c1b 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -40,5 +40,8 @@ return {
     DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
     RECOVERY_INTERVAL = 5;
     COLLECT_LUA_GARBAGE_INTERVAL = 100;
-    DISCOVERY_INTERVAL = 10;
+    DISCOVERY_IDLE_INTERVAL = 10,
+    DISCOVERY_WORK_INTERVAL = 1,
+    DISCOVERY_WORK_STEP = 0.01,
+    DISCOVERY_TIMEOUT = 10,
 }
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 6d88153..28437e3 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -250,50 +250,130 @@ if util.version_is_at_least(1, 10, 0) then
 discovery_f = function(router)
     local module_version = M.module_version
     assert(router.discovery_mode == 'on')
+    local iterators = {}
+    local opts = {is_async = true}
+    local mode
     while module_version == M.module_version do
-        while not next(router.replicasets) do
-            lfiber.sleep(consts.DISCOVERY_INTERVAL)
-        end
-        if module_version ~= M.module_version then
-            return
-        end
         -- Just typical map reduce - send request to each
-        -- replicaset in parallel, and collect responses.
-        local pending = {}
-        local opts = {is_async = true}
-        local args = {}
-        for rs_uuid, replicaset in pairs(router.replicasets) do
+        -- replicaset in parallel, and collect responses. Many
+        -- requests probably will be needed for each replicaset.
+        --
+        -- Step 1: create missing iterators, in case this is a
+        -- first discovery iteration, or some replicasets were
+        -- added after the router is started.
+        for rs_uuid in pairs(router.replicasets) do
+            local iter = iterators[rs_uuid]
+            if not iter then
+                iterators[rs_uuid] = {
+                    args = {{from = 1}},
+                    future = nil,
+                }
+            end
+        end
+        -- Step 2: map stage - send parallel requests for every
+        -- iterator, prune orphan iterators whose replicasets were
+        -- removed.
+        for rs_uuid, iter in pairs(iterators) do
+            local replicaset = router.replicasets[rs_uuid]
+            if not replicaset then
+                log.warn('Replicaset %s was removed during discovery', rs_uuid)
+                iterators[rs_uuid] = nil
+                goto continue
+            end
             local future, err =
-                replicaset:callro('vshard.storage.buckets_discovery',
-                                  args, opts)
+                replicaset:callro('vshard.storage.buckets_discovery', iter.args,
+                                  opts)
             if not future then
-                log.warn('Error during discovery %s: %s', rs_uuid, err)
-            else
-                pending[rs_uuid] = future
+                log.warn('Error during discovery %s, retry will be done '..
+                         'later: %s', rs_uuid, err)
+                goto continue
             end
+            iter.future = future
+            -- Don't spam many requests at once. Give
+            -- storages time to handle them and other
+            -- requests.
+            lfiber.sleep(consts.DISCOVERY_WORK_STEP)
+            if module_version ~= M.module_version then
+                return
+            end
+            ::continue::
         end
-
-        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
-        for rs_uuid, p in pairs(pending) do
+        -- Step 3: reduce stage - collect responses, restart
+        -- iterators which reached the end.
+        for rs_uuid, iter in pairs(iterators) do
             lfiber.yield()
-            local timeout = deadline - lfiber.clock()
-            local buckets, err = p:wait_result(timeout)
-            while M.errinj.ERRINJ_LONG_DISCOVERY do
-                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
-                lfiber.sleep(0.01)
+            local future = iter.future
+            if not future then
+                goto continue
             end
-            local replicaset = router.replicasets[rs_uuid]
-            if not buckets then
-                p:discard()
-                log.warn('Error during discovery %s: %s', rs_uuid, err)
-            elseif module_version ~= M.module_version then
+            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
+            if module_version ~= M.module_version then
                 return
-            elseif replicaset then
-                discovery_handle_buckets(router, replicaset, buckets[1])
             end
+            if not result then
+                future:discard()
+                log.warn('Error during discovery %s, retry will be done '..
+                         'later: %s', rs_uuid, err)
+                goto continue
+            end
+            local replicaset = router.replicasets[rs_uuid]
+            if not replicaset then
+                iterators[rs_uuid] = nil
+                log.warn('Replicaset %s was removed during discovery', rs_uuid)
+                goto continue
+            end
+            result = result[1]
+            -- Buckets are returned as plain array by storages
+            -- using old vshard version. But if .buckets is set,
+            -- this is a new storage.
+            discovery_handle_buckets(router, replicaset,
+                                     result.buckets or result)
+            local discovery_args = iter.args[1]
+            discovery_args.from = result.next_from
+            if not result.next_from then
+                -- Nil next_from means no more buckets to get.
+                -- Restart the iterator.
+                iterators[rs_uuid] = nil
+            end
+            ::continue::
         end
-
-        lfiber.sleep(deadline - lfiber.clock())
+        local unknown_bucket_count
+        repeat
+            unknown_bucket_count =
+                router.total_bucket_count - router.known_bucket_count
+            if unknown_bucket_count == 0 then
+                if mode ~= 'idle' then
+                    log.info('Discovery enters idle mode, all buckets are '..
+                             'known. Discovery works with %s seconds '..
+                             'interval now', consts.DISCOVERY_IDLE_INTERVAL)
+                    mode = 'idle'
+                end
+                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
+            elseif not next(router.replicasets) then
+                if mode ~= 'idle' then
+                    log.info('Discovery enters idle mode because '..
+                             'configuration does not have replicasets. '..
+                             'Retries will happen with %s seconds interval',
+                             consts.DISCOVERY_IDLE_INTERVAL)
+                    mode = 'idle'
+                end
+                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
+            else
+                if mode ~= 'aggressive' then
+                    log.info('Start aggressive discovery, %s buckets are '..
+                             'unknown. Discovery works with %s seconds '..
+                             'interval', unknown_bucket_count,
+                             consts.DISCOVERY_WORK_INTERVAL)
+                    mode = 'aggressive'
+                end
+                lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
+                break
+            end
+            while M.errinj.ERRINJ_LONG_DISCOVERY do
+                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
+                lfiber.sleep(0.01)
+            end
+        until next(router.replicasets)
     end
 end
 
@@ -355,9 +435,9 @@ local function discovery_set(router, new_mode)
     router.discovery_mode = new_mode
     if router.discovery_fiber ~= nil then
         pcall(router.discovery_fiber.cancel, router.discovery_fiber)
+        router.discovery_fiber = nil
     end
     if new_mode == 'off' then
-        router.discovery_fiber = nil
         return
     end
     router.discovery_fiber = util.reloadable_fiber_create(
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 73c6740..c6a78fe 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -75,6 +75,7 @@ if not M then
             ERRINJ_RECEIVE_PARTIALLY = false,
             ERRINJ_NO_RECOVERY = false,
             ERRINJ_UPGRADE = false,
+            ERRINJ_DISCOVERY = false,
         },
         -- This counter is used to restart background fibers with
         -- new reloaded code.
@@ -1110,10 +1111,58 @@ local function bucket_collect(bucket_id)
     return data
 end
 
+-- Discovery used by routers. It returns limited number of
+-- buckets to avoid stalls when _bucket is huge.
+local function buckets_discovery_extended(opts)
+    local limit = consts.BUCKET_CHUNK_SIZE
+    local buckets = table.new(limit, 0)
+    local active = consts.BUCKET.ACTIVE
+    local pinned = consts.BUCKET.PINNED
+    local next_from
+    local errcnt = M.errinj.ERRINJ_DISCOVERY
+    if errcnt then
+        if errcnt > 0 then
+            M.errinj.ERRINJ_DISCOVERY = errcnt - 1
+            if errcnt % 2 == 0 then
+                box.error(box.error.INJECTION, 'discovery')
+            end
+        else
+            M.errinj.ERRINJ_DISCOVERY = false
+        end
+    end
+    -- No way to select by {status, id}, because there are two
+    -- statuses to select. A router would need to maintain a
+    -- separate iterator for each status it wants to get. This may
+    -- be implemented in future. But _bucket space anyway 99% of
+    -- time contains only active and pinned buckets. So there is
+    -- no big benefit in optimizing that. Perhaps a compound index
+    -- {status, id} could help too.
+    for _, bucket in box.space._bucket:pairs({opts.from},
+                                             {iterator = box.index.GE}) do
+        local status = bucket.status
+        if status == active or status == pinned then
+            table.insert(buckets, bucket.id)
+        end
+        limit = limit - 1
+        if limit == 0 then
+            next_from = bucket.id + 1
+            break
+        end
+    end
+    -- Buckets list can even be empty, if all buckets in the
+    -- scanned chunk are not active/pinned. But next_from still
+    -- should be returned. So as the router could request more.
+    return {buckets = buckets, next_from = next_from}
+end
+
 --
 -- Collect array of active bucket identifiers for discovery.
 --
-local function buckets_discovery()
+local function buckets_discovery(opts)
+    if opts then
+        -- Private method. Is not documented intentionally.
+        return buckets_discovery_extended(opts)
+    end
     local ret = {}
     local status = box.space._bucket.index.status
     for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do


More information about the Tarantool-patches mailing list