[Tarantool-patches] [PATCH vshard 6/7] router: make discovery smoother in a big cluster

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri May 1 03:16:33 MSK 2020


Router does discovery once per 10 seconds. Discovery sends a
request to each replicaset to download all pinned and active
buckets from there. When there are millions of buckets, that
becomes a long operation taking seconds, during which the storage
is unresponsive.

The patch makes discovery work step by step, downloading not more
than 1000 buckets at a time. That gives the storage time to
process other requests.

Moreover, discovery now has some kind of 'state'. For each
replicaset it keeps an iterator which is moved by 1k buckets on
every successfully discovered bucket batch. It means, that if on a
replicaset with 1 000 000 buckets discovery fails after 999 999
buckets are already discovered, it won't start from 0. It will
retry from the old position.

However, still there is space for improvement. Discovery could
avoid downloading anything after all is downloaded, if it could
somehow see, if bucket space is not changed. Unfortunately it is
not so easy, since bucket generation (version of _bucket space)
is not persisted. So after instance restart it is always equal to
bucket count.

Part of #210
---
 test/router/reload.result         |   5 +-
 test/router/reload.test.lua       |   3 +-
 test/router/router.result         |   4 +-
 test/router/router.test.lua       |   2 +-
 test/router/wrong_config.result   |   5 +-
 test/router/wrong_config.test.lua |   5 +-
 vshard/consts.lua                 |   5 +-
 vshard/router/init.lua            | 145 +++++++++++++++++++++++-------
 vshard/storage/init.lua           |  39 +++++++-
 9 files changed, 170 insertions(+), 43 deletions(-)

diff --git a/test/router/reload.result b/test/router/reload.result
index 3ba900a..8fe99ba 100644
--- a/test/router/reload.result
+++ b/test/router/reload.result
@@ -44,7 +44,10 @@ vshard.router.bootstrap()
 while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
 ---
 ...
-while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+---
+...
+while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 ---
 ...
 --
diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
index 5ed5690..abcbc09 100644
--- a/test/router/reload.test.lua
+++ b/test/router/reload.test.lua
@@ -15,7 +15,8 @@ fiber = require('fiber')
 vshard.router.bootstrap()
 
 while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
-while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
+while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
 
 --
 -- Gh-72: allow reload. Test simple reload, error during
diff --git a/test/router/router.result b/test/router/router.result
index df7be4a..b2efd6d 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -943,9 +943,9 @@ calculate_known_buckets()
 ---
 - 3000
 ...
-test_run:grep_log('router_1', 'was 1, became 1500')
+test_run:grep_log('router_1', 'was 1, became 1000')
 ---
-- was 1, became 1500
+- was 1, became 1000
 ...
 info = vshard.router.info()
 ---
diff --git a/test/router/router.test.lua b/test/router/router.test.lua
index 97dce49..154310b 100644
--- a/test/router/router.test.lua
+++ b/test/router/router.test.lua
@@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active)
 _ = test_run:switch('router_1')
 wait_discovery()
 calculate_known_buckets()
-test_run:grep_log('router_1', 'was 1, became 1500')
+test_run:grep_log('router_1', 'was 1, became 1000')
 info = vshard.router.info()
 info.bucket
 info.alerts
diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result
index 56db9e8..92353c3 100644
--- a/test/router/wrong_config.result
+++ b/test/router/wrong_config.result
@@ -45,7 +45,10 @@ cfg.bucket_count = 1000
 r = vshard.router.new('gh-179', cfg)
 ---
 ...
-while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
+while r:info().bucket.available_rw ~= 3000 do                                   \
+    r:discovery_wakeup()                                                        \
+    fiber.sleep(0.1)                                                            \
+end
 ---
 ...
 i = r:info()
diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua
index 62ef30d..174b373 100644
--- a/test/router/wrong_config.test.lua
+++ b/test/router/wrong_config.test.lua
@@ -18,7 +18,10 @@ vshard.router.bootstrap()
 --
 cfg.bucket_count = 1000
 r = vshard.router.new('gh-179', cfg)
-while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
+while r:info().bucket.available_rw ~= 3000 do                                   \
+    r:discovery_wakeup()                                                        \
+    fiber.sleep(0.1)                                                            \
+end
 i = r:info()
 i.bucket
 i.alerts
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 5391c0f..a6a8c1b 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -40,5 +40,8 @@ return {
     DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
     RECOVERY_INTERVAL = 5;
     COLLECT_LUA_GARBAGE_INTERVAL = 100;
-    DISCOVERY_INTERVAL = 10;
+    DISCOVERY_IDLE_INTERVAL = 10,
+    DISCOVERY_WORK_INTERVAL = 1,
+    DISCOVERY_WORK_STEP = 0.01,
+    DISCOVERY_TIMEOUT = 10,
 }
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 6d88153..26ea85b 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then
 discovery_f = function(router)
     local module_version = M.module_version
     assert(router.discovery_mode == 'on')
+    local iterators = {}
+    local opts = {is_async = true}
+    local mode
     while module_version == M.module_version do
-        while not next(router.replicasets) do
-            lfiber.sleep(consts.DISCOVERY_INTERVAL)
-        end
-        if module_version ~= M.module_version then
-            return
-        end
         -- Just typical map reduce - send request to each
-        -- replicaset in parallel, and collect responses.
-        local pending = {}
-        local opts = {is_async = true}
-        local args = {}
-        for rs_uuid, replicaset in pairs(router.replicasets) do
+        -- replicaset in parallel, and collect responses. Many
+        -- requests probably will be needed for each replicaset.
+        --
+        -- Step 1: create missing iterators, in case this is a
+        -- first discovery iteration, or some replicasets were
+        -- added after the router is started.
+        for rs_uuid in pairs(router.replicasets) do
+            local iter = iterators[rs_uuid]
+            if not iter then
+                iterators[rs_uuid] = {
+                    args = {{from = 1}},
+                    future = nil,
+                }
+            end
+        end
+        -- Step 2: map stage - send parallel requests for every
+        -- iterator, prune orphan iterators whose replicasets were
+        -- removed.
+        for rs_uuid, iter in pairs(iterators) do
+            local replicaset = router.replicasets[rs_uuid]
+            if not replicaset then
+                log.warn('Replicaset %s was removed during discovery', rs_uuid)
+                iterators[rs_uuid] = nil
+                goto continue
+            end
             local future, err =
-                replicaset:callro('vshard.storage.buckets_discovery',
-                                  args, opts)
+                replicaset:callro('vshard.storage.buckets_discovery', iter.args,
+                                  opts)
             if not future then
-                log.warn('Error during discovery %s: %s', rs_uuid, err)
-            else
-                pending[rs_uuid] = future
+                log.warn('Error during discovery %s, retry will be done '..
+                         'later: %s', rs_uuid, err)
+                goto continue
+            end
+            iter.future = future
+            -- Don't spam many requests at once. Give
+            -- storages time to handle them and other
+            -- requests.
+            lfiber.sleep(consts.DISCOVERY_WORK_STEP)
+            if module_version ~= M.module_version then
+                return
             end
+            ::continue::
         end
-
-        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
-        for rs_uuid, p in pairs(pending) do
+        -- Step 3: reduce stage - collect responses, restart
+        -- iterators which reached the end.
+        for rs_uuid, iter in pairs(iterators) do
             lfiber.yield()
-            local timeout = deadline - lfiber.clock()
-            local buckets, err = p:wait_result(timeout)
-            while M.errinj.ERRINJ_LONG_DISCOVERY do
-                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
-                lfiber.sleep(0.01)
+            local future = iter.future
+            if not future then
+                goto continue
             end
-            local replicaset = router.replicasets[rs_uuid]
-            if not buckets then
-                p:discard()
-                log.warn('Error during discovery %s: %s', rs_uuid, err)
-            elseif module_version ~= M.module_version then
+            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
+            if module_version ~= M.module_version then
                 return
-            elseif replicaset then
-                discovery_handle_buckets(router, replicaset, buckets[1])
             end
+            if not result then
+                future:discard()
+                log.warn('Error during discovery %s, retry will be done '..
+                         'later: %s', rs_uuid, err)
+                goto continue
+            end
+            local replicaset = router.replicasets[rs_uuid]
+            if not replicaset then
+                iterators[rs_uuid] = nil
+                log.warn('Replicaset %s was removed during discovery', rs_uuid)
+                goto continue
+            end
+            result = result[1]
+            -- Buckets are returned as plain array by storages
+            -- using old vshard version. But if .buckets is set,
+            -- this is a new storage.
+            discovery_handle_buckets(router, replicaset,
+                                     result.buckets or result)
+            local discovery_args = iter.args[1]
+            discovery_args.from = result.next_from
+            if not result.next_from then
+                -- Nil next_from means no more buckets to get.
+                -- Restart the iterator.
+                iterators[rs_uuid] = nil
+            end
+            ::continue::
         end
-
-        lfiber.sleep(deadline - lfiber.clock())
+        local unknown_bucket_count
+        repeat
+            unknown_bucket_count =
+                router.total_bucket_count - router.known_bucket_count
+            if unknown_bucket_count == 0 then
+                if mode ~= 'idle' then
+                    log.info('Discovery enters idle mode, all buckets are '..
+                             'known. Discovery works with %s seconds '..
+                             'interval now', consts.DISCOVERY_IDLE_INTERVAL)
+                    mode = 'idle'
+                end
+                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
+            elseif not next(router.replicasets) then
+                if mode ~= 'idle' then
+                    log.info('Discovery enters idle mode because '..
+                             'configuration does not have replicasets. '..
+                             'Retries will happen with %s seconds interval',
+                             consts.DISCOVERY_IDLE_INTERVAL)
+                    mode = 'idle'
+                end
+                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
+            elseif mode ~= 'aggressive' then
+                log.info('Start aggressive discovery, %s buckets are unknown. '..
+                         'Discovery works with %s seconds interval',
+                         unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL)
+                mode = 'aggressive'
+                lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
+                break
+            end
+            while M.errinj.ERRINJ_LONG_DISCOVERY do
+                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
+                lfiber.sleep(0.01)
+            end
+        until next(router.replicasets)
     end
 end
 
@@ -355,9 +432,9 @@ local function discovery_set(router, new_mode)
     router.discovery_mode = new_mode
     if router.discovery_fiber ~= nil then
         pcall(router.discovery_fiber.cancel, router.discovery_fiber)
+        router.discovery_fiber = nil
     end
     if new_mode == 'off' then
-        router.discovery_fiber = nil
         return
     end
     router.discovery_fiber = util.reloadable_fiber_create(
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 73c6740..0050b96 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id)
     return data
 end
 
+-- Discovery used by routers. It returns limited number of
+-- buckets to avoid stalls when _bucket is huge.
+local function buckets_discovery_extended(opts)
+    local limit = consts.BUCKET_CHUNK_SIZE
+    local buckets = table.new(limit, 0)
+    local active = consts.BUCKET.ACTIVE
+    local pinned = consts.BUCKET.PINNED
+    local next_from
+    -- No way to select by {status, id}, because there are two
+    -- statuses to select. A router would need to maintain a
+    -- separate iterator for each status it wants to get. This may
+    -- be implemented in future. But _bucket space anyway 99% of
+    -- time contains only active and pinned buckets. So there is
+    -- no big benefit in optimizing that. Perhaps a compound index
+    -- {status, id} could help too.
+    for _, bucket in box.space._bucket:pairs({opts.from},
+                                             {iterator = box.index.GE}) do
+        local status = bucket.status
+        if status == active or status == pinned then
+            table.insert(buckets, bucket.id)
+        end
+        limit = limit - 1
+        if limit == 0 then
+            next_from = bucket.id + 1
+            break
+        end
+    end
+    -- Buckets list can even be empty, if all buckets in the
+    -- scanned chunk are not active/pinned. But next_from still
+    -- should be returned. So as the router could request more.
+    return {buckets = buckets, next_from = next_from}
+end
+
 --
 -- Collect array of active bucket identifiers for discovery.
 --
-local function buckets_discovery()
+local function buckets_discovery(opts)
+    if opts then
+        -- Private method. Is not documented intentionally.
+        return buckets_discovery_extended(opts)
+    end
     local ret = {}
     local status = box.space._bucket.index.status
     for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list