[Tarantool-patches] [PATCH vshard 6/7] router: make discovery smoother in a big cluster
Oleg Babin
olegrok at tarantool.org
Fri May 1 20:01:49 MSK 2020
Thanks for your patch! See my comments below.
On 01/05/2020 03:16, Vladislav Shpilevoy wrote:
> Router does discovery once per 10 seconds. Discovery sends a
> request to each replicaset to download all pinned and active
> buckets from there. When there are millions of buckets, that
> becomes a long operation taking seconds, during which the storage
> is unresponsive.
>
> The patch makes discovery work step by step, downloading not more
> than 1000 buckets at a time. That gives the storage time to
> process other requests.
>
> Moreover, discovery now has some kind of 'state'. For each
> replicaset it keeps an iterator which is moved by 1k buckets on
> every successfully discovered bucket batch. It means, that if on a
> replicaset with 1 000 000 buckets discovery fails after 999 999
> buckets are already discovered, it won't start from 0. It will
> retry from the old position.
Could you provide a test for such a case?
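To make the resumable-iterator idea easier to discuss, here is a purely hypothetical, self-contained model of it (plain Lua, no Tarantool internals; `buckets_discovery` and `discovery_step` here are simplified stand-ins, not the actual vshard code). The point is that the router keeps a `from` position per replicaset and only advances it on success, so a retry continues from the old position instead of restarting from bucket 1:

```lua
-- Simplified model: a storage returns at most CHUNK bucket ids per
-- call, plus the position to continue from; nil next_from means done.
local CHUNK = 3

local function buckets_discovery(storage, from)
    local buckets, next_from = {}, nil
    for i = from, #storage do
        if #buckets == CHUNK then
            next_from = i
            break
        end
        table.insert(buckets, storage[i])
    end
    return {buckets = buckets, next_from = next_from}
end

-- Router side: one discovery step. The iterator position is advanced
-- only after a successful batch; on an error the position would stay
-- unchanged and the retry continues from the same place.
local function discovery_step(iter, storage)
    local res = buckets_discovery(storage, iter.from)
    for _, id in ipairs(res.buckets) do
        iter.known[id] = true
    end
    iter.from = res.next_from or 1
    return res.next_from ~= nil
end

local storage = {1, 2, 3, 4, 5, 6, 7}
local iter = {from = 1, known = {}}
while discovery_step(iter, storage) do end
-- All 7 buckets are now known, and the iterator is reset to 1.
```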
> However, still there is space for improvement. Discovery could
> avoid downloading anything after all is downloaded, if it could
> somehow see, if bucket space is not changed. Unfortunately it is
> not so easy, since bucket generation (version of _bucket space)
> is not persisted. So after instance restart it is always equal to
> bucket count.
Is there an issue for that?
> Part of #210
> ---
> test/router/reload.result | 5 +-
> test/router/reload.test.lua | 3 +-
> test/router/router.result | 4 +-
> test/router/router.test.lua | 2 +-
> test/router/wrong_config.result | 5 +-
> test/router/wrong_config.test.lua | 5 +-
> vshard/consts.lua | 5 +-
> vshard/router/init.lua | 145 +++++++++++++++++++++++-------
> vshard/storage/init.lua | 39 +++++++-
> 9 files changed, 170 insertions(+), 43 deletions(-)
>
> diff --git a/test/router/reload.result b/test/router/reload.result
> index 3ba900a..8fe99ba 100644
> --- a/test/router/reload.result
> +++ b/test/router/reload.result
> @@ -44,7 +44,10 @@ vshard.router.bootstrap()
> while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
> ---
> ...
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +---
> +...
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> ---
> ...
> --
> diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
> index 5ed5690..abcbc09 100644
> --- a/test/router/reload.test.lua
> +++ b/test/router/reload.test.lua
> @@ -15,7 +15,8 @@ fiber = require('fiber')
> vshard.router.bootstrap()
>
> while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>
> --
> -- Gh-72: allow reload. Test simple reload, error during
> diff --git a/test/router/router.result b/test/router/router.result
> index df7be4a..b2efd6d 100644
> --- a/test/router/router.result
> +++ b/test/router/router.result
> @@ -943,9 +943,9 @@ calculate_known_buckets()
> ---
> - 3000
> ...
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
> ---
> -- was 1, became 1500
> +- was 1, became 1000
> ...
> info = vshard.router.info()
> ---
> diff --git a/test/router/router.test.lua b/test/router/router.test.lua
> index 97dce49..154310b 100644
> --- a/test/router/router.test.lua
> +++ b/test/router/router.test.lua
> @@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active)
> _ = test_run:switch('router_1')
> wait_discovery()
> calculate_known_buckets()
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
> info = vshard.router.info()
> info.bucket
> info.alerts
> diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result
> index 56db9e8..92353c3 100644
> --- a/test/router/wrong_config.result
> +++ b/test/router/wrong_config.result
> @@ -45,7 +45,10 @@ cfg.bucket_count = 1000
> r = vshard.router.new('gh-179', cfg)
> ---
> ...
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do \
> + r:discovery_wakeup() \
> + fiber.sleep(0.1) \
> +end
> ---
> ...
> i = r:info()
> diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua
> index 62ef30d..174b373 100644
> --- a/test/router/wrong_config.test.lua
> +++ b/test/router/wrong_config.test.lua
> @@ -18,7 +18,10 @@ vshard.router.bootstrap()
> --
> cfg.bucket_count = 1000
> r = vshard.router.new('gh-179', cfg)
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do \
> + r:discovery_wakeup() \
> + fiber.sleep(0.1) \
> +end
> i = r:info()
> i.bucket
> i.alerts
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 5391c0f..a6a8c1b 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -40,5 +40,8 @@ return {
> DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
> RECOVERY_INTERVAL = 5;
> COLLECT_LUA_GARBAGE_INTERVAL = 100;
> - DISCOVERY_INTERVAL = 10;
> + DISCOVERY_IDLE_INTERVAL = 10,
> + DISCOVERY_WORK_INTERVAL = 1,
> + DISCOVERY_WORK_STEP = 0.01,
> + DISCOVERY_TIMEOUT = 10,
> }
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 6d88153..26ea85b 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then
> discovery_f = function(router)
> local module_version = M.module_version
> assert(router.discovery_mode == 'on')
> + local iterators = {}
> + local opts = {is_async = true}
> + local mode
> while module_version == M.module_version do
> - while not next(router.replicasets) do
> - lfiber.sleep(consts.DISCOVERY_INTERVAL)
> - end
> - if module_version ~= M.module_version then
> - return
> - end
> -- Just typical map reduce - send request to each
> - -- replicaset in parallel, and collect responses.
> - local pending = {}
> - local opts = {is_async = true}
> - local args = {}
> - for rs_uuid, replicaset in pairs(router.replicasets) do
> + -- replicaset in parallel, and collect responses. Many
> + -- requests probably will be needed for each replicaset.
> + --
> + -- Step 1: create missing iterators, in case this is a
> + -- first discovery iteration, or some replicasets were
> + -- added after the router is started.
> + for rs_uuid in pairs(router.replicasets) do
> + local iter = iterators[rs_uuid]
> + if not iter then
> + iterators[rs_uuid] = {
> + args = {{from = 1}},
> + future = nil,
> + }
> + end
> + end
> + -- Step 2: map stage - send parallel requests for every
> + -- iterator, prune orphan iterators whose replicasets were
> + -- removed.
> + for rs_uuid, iter in pairs(iterators) do
> + local replicaset = router.replicasets[rs_uuid]
> + if not replicaset then
> + log.warn('Replicaset %s was removed during discovery', rs_uuid)
> + iterators[rs_uuid] = nil
> + goto continue
> + end
> local future, err =
> - replicaset:callro('vshard.storage.buckets_discovery',
> - args, opts)
> + replicaset:callro('vshard.storage.buckets_discovery', iter.args,
> + opts)
> if not future then
> - log.warn('Error during discovery %s: %s', rs_uuid, err)
> - else
> - pending[rs_uuid] = future
> + log.warn('Error during discovery %s, retry will be done '..
> + 'later: %s', rs_uuid, err)
> + goto continue
> + end
> + iter.future = future
> + -- Don't spam many requests at once. Give
> + -- storages time to handle them and other
> + -- requests.
> + lfiber.sleep(consts.DISCOVERY_WORK_STEP)
> + if module_version ~= M.module_version then
> + return
> end
Is it possible to place such checks in some "common" places, e.g. at the
start/end of each iteration, or between steps? It looks strange and a
bit unobvious to do such checks after each timeout.
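One possible shape of such a "common place" (just a sketch of the suggestion, with stubs standing in for the vshard internals `lfiber` and `M`) is a single closure that every pause in the discovery loop goes through, so the reload check lives in one spot instead of being scattered after each sleep:

```lua
-- Stubs standing in for vshard internals (assumptions for the sketch).
local lfiber = {sleep = function() end}
local M = {module_version = 1}

-- Capture the version once; every pause goes through this closure and
-- reports whether the fiber should keep running.
local function make_pause(module_version)
    return function(timeout)
        lfiber.sleep(timeout)
        return module_version == M.module_version
    end
end

local pause = make_pause(M.module_version)
assert(pause(0.01) == true)   -- no reload happened, keep going
M.module_version = 2          -- simulate a module reload
assert(pause(0.01) == false)  -- the caller should return now
```

The map stage would then read `if not pause(consts.DISCOVERY_WORK_STEP) then return end`, keeping the check in one obvious place.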
> + ::continue::
> end
> -
> - local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
> - for rs_uuid, p in pairs(pending) do
> + -- Step 3: reduce stage - collect responses, restart
> + -- iterators which reached the end.
> + for rs_uuid, iter in pairs(iterators) do
> lfiber.yield()
> - local timeout = deadline - lfiber.clock()
> - local buckets, err = p:wait_result(timeout)
> - while M.errinj.ERRINJ_LONG_DISCOVERY do
> - M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> - lfiber.sleep(0.01)
> + local future = iter.future
> + if not future then
> + goto continue
> end
> - local replicaset = router.replicasets[rs_uuid]
> - if not buckets then
> - p:discard()
> - log.warn('Error during discovery %s: %s', rs_uuid, err)
> - elseif module_version ~= M.module_version then
> + local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
> + if module_version ~= M.module_version then
Does it make sense to call "discard" before "return"?
> return
> - elseif replicaset then
> - discovery_handle_buckets(router, replicaset, buckets[1])
> end
> + if not result then
> + future:discard()
> + log.warn('Error during discovery %s, retry will be done '..
> + 'later: %s', rs_uuid, err)
> + goto continue
> + end
> + local replicaset = router.replicasets[rs_uuid]
> + if not replicaset then
> + iterators[rs_uuid] = nil
> + log.warn('Replicaset %s was removed during discovery', rs_uuid)
> + goto continue
> + end
> + result = result[1]
> + -- Buckets are returned as plain array by storages
> + -- using old vshard version. But if .buckets is set,
> + -- this is a new storage.
> + discovery_handle_buckets(router, replicaset,
> + result.buckets or result)
> + local discovery_args = iter.args[1]
> + discovery_args.from = result.next_from
> + if not result.next_from then
> + -- Nil next_from means no more buckets to get.
> + -- Restart the iterator.
> + iterators[rs_uuid] = nil
> + end
> + ::continue::
> end
> -
> - lfiber.sleep(deadline - lfiber.clock())
> + local unknown_bucket_count
> + repeat
> + unknown_bucket_count =
> + router.total_bucket_count - router.known_bucket_count
> + if unknown_bucket_count == 0 then
> + if mode ~= 'idle' then
> + log.info('Discovery enters idle mode, all buckets are '..
> + 'known. Discovery works with %s seconds '..
> + 'interval now', consts.DISCOVERY_IDLE_INTERVAL)
> + mode = 'idle'
> + end
> + lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> + elseif not next(router.replicasets) then
> + if mode ~= 'idle' then
> + log.info('Discovery enters idle mode because '..
> + 'configuration does not have replicasets. '..
> + 'Retries will happen with %s seconds interval',
> + consts.DISCOVERY_IDLE_INTERVAL)
> + mode = 'idle'
> + end
> + lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> + elseif mode ~= 'aggressive' then
> + log.info('Start aggressive discovery, %s buckets are unknown. '..
> + 'Discovery works with %s seconds interval',
> + unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL)
> + mode = 'aggressive'
> + lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
> + break
> + end
> + while M.errinj.ERRINJ_LONG_DISCOVERY do
> + M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> + lfiber.sleep(0.01)
> + end
> + until next(router.replicasets)
> end
> end
>
> @@ -355,9 +432,9 @@ local function discovery_set(router, new_mode)
> router.discovery_mode = new_mode
> if router.discovery_fiber ~= nil then
> pcall(router.discovery_fiber.cancel, router.discovery_fiber)
> + router.discovery_fiber = nil
> end
> if new_mode == 'off' then
> - router.discovery_fiber = nil
> return
> end
> router.discovery_fiber = util.reloadable_fiber_create(
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 73c6740..0050b96 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id)
> return data
> end
>
> +-- Discovery used by routers. It returns limited number of
> +-- buckets to avoid stalls when _bucket is huge.
> +local function buckets_discovery_extended(opts)
> + local limit = consts.BUCKET_CHUNK_SIZE
> + local buckets = table.new(limit, 0)
> + local active = consts.BUCKET.ACTIVE
> + local pinned = consts.BUCKET.PINNED
> + local next_from
> + -- No way to select by {status, id}, because there are two
> + -- statuses to select. A router would need to maintain a
> + -- separate iterator for each status it wants to get. This may
> + -- be implemented in future. But _bucket space anyway 99% of
> + -- time contains only active and pinned buckets. So there is
> + -- no big benefit in optimizing that. Perhaps a compound index
> + -- {status, id} could help too.
> + for _, bucket in box.space._bucket:pairs({opts.from},
> + {iterator = box.index.GE}) do
> + local status = bucket.status
> + if status == active or status == pinned then
> + table.insert(buckets, bucket.id)
> + end
> + limit = limit - 1
It looks a bit strange after the words about "99% of the time". I
propose to move the limit decrease under the if condition.
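For illustration, here is a self-contained sketch of the proposed variant (hypothetical `discovery_chunk` over a plain Lua array, not the patch itself), where the limit is decremented only for buckets that are actually returned. Note the trade-off: with the decrement under the condition, a long run of buckets in other statuses is scanned without reducing the limit, so one call may scan more than CHUNK_SIZE rows:

```lua
local BUCKET_CHUNK_SIZE = 3
local ACTIVE, PINNED = 'active', 'pinned'

local function discovery_chunk(rows, from)
    local limit = BUCKET_CHUNK_SIZE
    local buckets, next_from = {}, nil
    for i = from, #rows do
        local bucket = rows[i]
        if bucket.status == ACTIVE or bucket.status == PINNED then
            table.insert(buckets, bucket.id)
            -- Decrement only when a bucket is actually collected.
            limit = limit - 1
        end
        if limit == 0 then
            next_from = bucket.id + 1
            break
        end
    end
    return {buckets = buckets, next_from = next_from}
end

local rows = {
    {id = 1, status = 'active'}, {id = 2, status = 'sending'},
    {id = 3, status = 'active'}, {id = 4, status = 'pinned'},
    {id = 5, status = 'active'},
}
local res = discovery_chunk(rows, 1)
-- res.buckets is {1, 3, 4}; the 'sending' bucket did not eat the limit,
-- and res.next_from is 5.
```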
> + if limit == 0 then
> + next_from = bucket.id + 1
> + break
> + end
> + end
> + -- Buckets list can even be empty, if all buckets in the
> + -- scanned chunk are not active/pinned. But next_from still
> + -- should be returned. So as the router could request more.
> + return {buckets = buckets, next_from = next_from}
> +end
> +
> --
> -- Collect array of active bucket identifiers for discovery.
> --
> -local function buckets_discovery()
> +local function buckets_discovery(opts)
> + if opts then
> + -- Private method. Is not documented intentionally.
> + return buckets_discovery_extended(opts)
> + end
> local ret = {}
> local status = box.space._bucket.index.status
> for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do
>
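On the backward compatibility in the router (`result.buckets or result`), a tiny illustration of the two response shapes (the `extract_buckets` helper name is mine, just for the example): an old storage returns a plain array of bucket ids, a new one returns a table with `buckets` and `next_from` fields:

```lua
-- An old storage returns a plain array; a new one returns a table
-- where the array sits under the `buckets` key.
local function extract_buckets(result)
    return result.buckets or result
end

local old_style = {1, 2, 3}
local new_style = {buckets = {4, 5}, next_from = 6}
assert(extract_buckets(old_style)[1] == 1)
assert(extract_buckets(new_style)[1] == 4)
assert(new_style.next_from == 6)
```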