From: Oleg Babin
To: Vladislav Shpilevoy, tarantool-patches@dev.tarantool.org
Date: Fri, 1 May 2020 20:01:49 +0300
In-Reply-To: <80bacfc685233ad047f6a80ddadd72b8903eae5b.1588292014.git.v.shpilevoy@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH vshard 6/7] router: make discovery smoother in a big cluster

Thanks for your patch! See my comments below.

On 01/05/2020 03:16, Vladislav Shpilevoy wrote:
> Router does discovery once per 10 seconds. Discovery sends a
> request to each replicaset to download all pinned and active
> buckets from there. When there are millions of buckets, that
> becomes a long operation taking seconds, during which the storage
> is unresponsive.
>
> The patch makes discovery work step by step, downloading not more
> than 1000 buckets at a time. That gives the storage time to
> process other requests.
>
> Moreover, discovery now has some kind of 'state'. For each
> replicaset it keeps an iterator which is moved by 1k buckets on
> every successfully discovered bucket batch. It means that if on a
> replicaset with 1 000 000 buckets discovery fails after 999 999
> buckets are already discovered, it won't start from 0. It will
> retry from the old position.

Could you provide a test for such a case?
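By the way, just to check that I read the patch right: the whole
per-replicaset discovery state is this small table (the field names are
from the patch, the comments are mine):

    -- Discovery iterator of one replicaset. `from` is moved
    -- forward only after a batch is successfully handled, so a
    -- failed batch is simply retried from the old position.
    iterators[rs_uuid] = {
        -- Argument list for vshard.storage.buckets_discovery().
        args = {{from = 1}},
        -- In-flight async request, set on the map stage.
        future = nil,
    }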
> However, there is still space for improvement. Discovery could
> avoid downloading anything after all is downloaded, if it could
> somehow see that the bucket space has not changed. Unfortunately it
> is not so easy, since bucket generation (version of _bucket space)
> is not persisted. So after instance restart it is always equal to
> bucket count.

Is there an issue for that?
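If not, it is probably worth filing one. Just to illustrate the
direction, a hypothetical sketch (not in the patch, the _schema key
name is invented): the generation could be bumped and persisted by a
_bucket trigger, so that after restart it would not reset to the bucket
count:

    -- Hypothetical: persist bucket generation across restarts.
    local function bucket_generation_bump()
        local t = box.space._schema:get({'vshard_bucket_generation'})
        local gen = t and t[2] or 0
        box.space._schema:replace({'vshard_bucket_generation', gen + 1})
    end
    box.space._bucket:on_replace(bucket_generation_bump)

Then a router could first ask a storage for its generation and skip
the download when nothing changed.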
> Part of #210
> ---
>  test/router/reload.result         |   5 +-
>  test/router/reload.test.lua       |   3 +-
>  test/router/router.result         |   4 +-
>  test/router/router.test.lua       |   2 +-
>  test/router/wrong_config.result   |   5 +-
>  test/router/wrong_config.test.lua |   5 +-
>  vshard/consts.lua                 |   5 +-
>  vshard/router/init.lua            | 145 +++++++++++++++++++++++-------
>  vshard/storage/init.lua           |  39 +++++++-
>  9 files changed, 170 insertions(+), 43 deletions(-)
>
> diff --git a/test/router/reload.result b/test/router/reload.result
> index 3ba900a..8fe99ba 100644
> --- a/test/router/reload.result
> +++ b/test/router/reload.result
> @@ -44,7 +44,10 @@ vshard.router.bootstrap()
>  while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
>  ---
>  ...
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +---
> +...
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>  ---
>  ...
>  --
> diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua
> index 5ed5690..abcbc09 100644
> --- a/test/router/reload.test.lua
> +++ b/test/router/reload.test.lua
> @@ -15,7 +15,8 @@ fiber = require('fiber')
>  vshard.router.bootstrap()
>
>  while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end
> -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
> +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end
>
>  --
>  -- Gh-72: allow reload. Test simple reload, error during
> diff --git a/test/router/router.result b/test/router/router.result
> index df7be4a..b2efd6d 100644
> --- a/test/router/router.result
> +++ b/test/router/router.result
> @@ -943,9 +943,9 @@ calculate_known_buckets()
>  ---
>  - 3000
>  ...
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
>  ---
> -- was 1, became 1500
> +- was 1, became 1000
>  ...
>  info = vshard.router.info()
>  ---
> diff --git a/test/router/router.test.lua b/test/router/router.test.lua
> index 97dce49..154310b 100644
> --- a/test/router/router.test.lua
> +++ b/test/router/router.test.lua
> @@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active)
>  _ = test_run:switch('router_1')
>  wait_discovery()
>  calculate_known_buckets()
> -test_run:grep_log('router_1', 'was 1, became 1500')
> +test_run:grep_log('router_1', 'was 1, became 1000')
>  info = vshard.router.info()
>  info.bucket
>  info.alerts
> diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result
> index 56db9e8..92353c3 100644
> --- a/test/router/wrong_config.result
> +++ b/test/router/wrong_config.result
> @@ -45,7 +45,10 @@ cfg.bucket_count = 1000
>  r = vshard.router.new('gh-179', cfg)
>  ---
>  ...
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do \
> +    r:discovery_wakeup() \
> +    fiber.sleep(0.1) \
> +end
>  ---
>  ...
>  i = r:info()
> diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua
> index 62ef30d..174b373 100644
> --- a/test/router/wrong_config.test.lua
> +++ b/test/router/wrong_config.test.lua
> @@ -18,7 +18,10 @@ vshard.router.bootstrap()
>  --
>  cfg.bucket_count = 1000
>  r = vshard.router.new('gh-179', cfg)
> -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end
> +while r:info().bucket.available_rw ~= 3000 do \
> +    r:discovery_wakeup() \
> +    fiber.sleep(0.1) \
> +end
>  i = r:info()
>  i.bucket
>  i.alerts
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 5391c0f..a6a8c1b 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -40,5 +40,8 @@ return {
>      DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5;
>      RECOVERY_INTERVAL = 5;
>      COLLECT_LUA_GARBAGE_INTERVAL = 100;
> -    DISCOVERY_INTERVAL = 10;
> +    DISCOVERY_IDLE_INTERVAL = 10,
> +    DISCOVERY_WORK_INTERVAL = 1,
> +    DISCOVERY_WORK_STEP = 0.01,
> +    DISCOVERY_TIMEOUT = 10,
> }
> diff --git a/vshard/router/init.lua b/vshard/router/init.lua
> index 6d88153..26ea85b 100644
> --- a/vshard/router/init.lua
> +++ b/vshard/router/init.lua
> @@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then
>  discovery_f = function(router)
>      local module_version = M.module_version
>      assert(router.discovery_mode == 'on')
> +    local iterators = {}
> +    local opts = {is_async = true}
> +    local mode
>      while module_version == M.module_version do
> -        while not next(router.replicasets) do
> -            lfiber.sleep(consts.DISCOVERY_INTERVAL)
> -        end
> -        if module_version ~= M.module_version then
> -            return
> -        end
>          -- Just typical map reduce - send request to each
> -        -- replicaset in parallel, and collect responses.
> -        local pending = {}
> -        local opts = {is_async = true}
> -        local args = {}
> -        for rs_uuid, replicaset in pairs(router.replicasets) do
> +        -- replicaset in parallel, and collect responses. Many
> +        -- requests will probably be needed for each replicaset.
> +        --
> +        -- Step 1: create missing iterators, in case this is the
> +        -- first discovery iteration, or some replicasets were
> +        -- added after the router was started.
> +        for rs_uuid in pairs(router.replicasets) do
> +            local iter = iterators[rs_uuid]
> +            if not iter then
> +                iterators[rs_uuid] = {
> +                    args = {{from = 1}},
> +                    future = nil,
> +                }
> +            end
> +        end
> +        -- Step 2: map stage - send parallel requests for every
> +        -- iterator, prune orphan iterators whose replicasets were
> +        -- removed.
> +        for rs_uuid, iter in pairs(iterators) do
> +            local replicaset = router.replicasets[rs_uuid]
> +            if not replicaset then
> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
> +                iterators[rs_uuid] = nil
> +                goto continue
> +            end
>              local future, err =
> -                replicaset:callro('vshard.storage.buckets_discovery',
> -                                  args, opts)
> +                replicaset:callro('vshard.storage.buckets_discovery', iter.args,
> +                                  opts)
>              if not future then
> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
> -            else
> -                pending[rs_uuid] = future
> +                log.warn('Error during discovery %s, retry will be done '..
> +                         'later: %s', rs_uuid, err)
> +                goto continue
> +            end
> +            iter.future = future
> +            -- Don't spam many requests at once. Give
> +            -- storages time to handle them and other
> +            -- requests.
> +            lfiber.sleep(consts.DISCOVERY_WORK_STEP)
> +            if module_version ~= M.module_version then
> +                return
>              end

Is it possible to place such checks in some "common" places, e.g. at
the start/end of each iteration or between the steps? It looks strange
and a bit unobvious to do such checks after each timeout.
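A rough sketch of what I mean (the step helpers are invented, not from
the patch; only `module_version` and `M` are real names):

    discovery_f = function(router)
        local module_version = M.module_version
        -- True when the module was reloaded and this fiber must
        -- stop.
        local function is_outdated()
            return module_version ~= M.module_version
        end
        while not is_outdated() do
            discovery_map_step(router)     -- no reload checks inside
            if is_outdated() then return end
            discovery_reduce_step(router)
            if is_outdated() then return end
            discovery_sleep_step(router)
        end
    end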
> +            ::continue::
>          end
> -
> -        local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL
> -        for rs_uuid, p in pairs(pending) do
> +        -- Step 3: reduce stage - collect responses, restart
> +        -- iterators which reached the end.
> +        for rs_uuid, iter in pairs(iterators) do
>              lfiber.yield()
> -            local timeout = deadline - lfiber.clock()
> -            local buckets, err = p:wait_result(timeout)
> -            while M.errinj.ERRINJ_LONG_DISCOVERY do
> -                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> -                lfiber.sleep(0.01)
> +            local future = iter.future
> +            if not future then
> +                goto continue
>              end
> -            local replicaset = router.replicasets[rs_uuid]
> -            if not buckets then
> -                p:discard()
> -                log.warn('Error during discovery %s: %s', rs_uuid, err)
> -            elseif module_version ~= M.module_version then
> +            local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
> +            if module_version ~= M.module_version then

Does it make sense to call "discard" before "return"?
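I.e. something like this (all names are from the patch):

    local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT)
    if module_version ~= M.module_version then
        future:discard()
        return
    end

Otherwise it seems the abandoned request stays registered until its
timeout fires.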
>                  return
> -            elseif replicaset then
> -                discovery_handle_buckets(router, replicaset, buckets[1])
>              end
> +            if not result then
> +                future:discard()
> +                log.warn('Error during discovery %s, retry will be done '..
> +                         'later: %s', rs_uuid, err)
> +                goto continue
> +            end
> +            local replicaset = router.replicasets[rs_uuid]
> +            if not replicaset then
> +                iterators[rs_uuid] = nil
> +                log.warn('Replicaset %s was removed during discovery', rs_uuid)
> +                goto continue
> +            end
> +            result = result[1]
> +            -- Buckets are returned as a plain array by storages
> +            -- using an old vshard version. But if .buckets is
> +            -- set, this is a new storage.
> +            discovery_handle_buckets(router, replicaset,
> +                                     result.buckets or result)
> +            local discovery_args = iter.args[1]
> +            discovery_args.from = result.next_from
> +            if not result.next_from then
> +                -- Nil next_from means no more buckets to get.
> +                -- Restart the iterator.
> +                iterators[rs_uuid] = nil
> +            end
> +            ::continue::
>          end
> -
> -        lfiber.sleep(deadline - lfiber.clock())
> +        local unknown_bucket_count
> +        repeat
> +            unknown_bucket_count =
> +                router.total_bucket_count - router.known_bucket_count
> +            if unknown_bucket_count == 0 then
> +                if mode ~= 'idle' then
> +                    log.info('Discovery enters idle mode, all buckets are '..
> +                             'known. Discovery works with %s seconds '..
> +                             'interval now', consts.DISCOVERY_IDLE_INTERVAL)
> +                    mode = 'idle'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> +            elseif not next(router.replicasets) then
> +                if mode ~= 'idle' then
> +                    log.info('Discovery enters idle mode because '..
> +                             'configuration does not have replicasets. '..
> +                             'Retries will happen with %s seconds interval',
> +                             consts.DISCOVERY_IDLE_INTERVAL)
> +                    mode = 'idle'
> +                end
> +                lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL)
> +            elseif mode ~= 'aggressive' then
> +                log.info('Start aggressive discovery, %s buckets are unknown. '..
> +                         'Discovery works with %s seconds interval',
> +                         unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL)
> +                mode = 'aggressive'
> +                lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL)
> +                break
> +            end
> +            while M.errinj.ERRINJ_LONG_DISCOVERY do
> +                M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting'
> +                lfiber.sleep(0.01)
> +            end
> +        until next(router.replicasets)
>      end
>  end
>
> @@ -355,9 +432,9 @@ local function discovery_set(router, new_mode)
>      router.discovery_mode = new_mode
>      if router.discovery_fiber ~= nil then
>          pcall(router.discovery_fiber.cancel, router.discovery_fiber)
> +        router.discovery_fiber = nil
>      end
>      if new_mode == 'off' then
> -        router.discovery_fiber = nil
>          return
>      end
>      router.discovery_fiber = util.reloadable_fiber_create(
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 73c6740..0050b96 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id)
>      return data
>  end
>
> +-- Discovery used by routers. It returns a limited number of
> +-- buckets to avoid stalls when _bucket is huge.
> +local function buckets_discovery_extended(opts)
> +    local limit = consts.BUCKET_CHUNK_SIZE
> +    local buckets = table.new(limit, 0)
> +    local active = consts.BUCKET.ACTIVE
> +    local pinned = consts.BUCKET.PINNED
> +    local next_from
> +    -- No way to select by {status, id}, because there are two
> +    -- statuses to select. A router would need to maintain a
> +    -- separate iterator for each status it wants to get. This may
> +    -- be implemented in future. But the _bucket space anyway 99%
> +    -- of the time contains only active and pinned buckets. So
> +    -- there is no big benefit in optimizing that. Perhaps a
> +    -- compound index {status, id} could help too.
> +    for _, bucket in box.space._bucket:pairs({opts.from},
> +                                             {iterator = box.index.GE}) do
> +        local status = bucket.status
> +        if status == active or status == pinned then
> +            table.insert(buckets, bucket.id)
> +        end
> +        limit = limit - 1

It's a bit strange after the words about "99% of the time". I propose
to move the limit decrease under the if condition.

> +        if limit == 0 then
> +            next_from = bucket.id + 1
> +            break
> +        end
> +    end
> +    -- The buckets list can even be empty, if all buckets in the
> +    -- scanned chunk are not active/pinned. But next_from still
> +    -- should be returned, so the router can request more.
> +    return {buckets = buckets, next_from = next_from}
> +end
> +
>  --
>  -- Collect array of active bucket identifiers for discovery.
>  --
> -local function buckets_discovery()
> +local function buckets_discovery(opts)
> +    if opts then
> +        -- Private method. Is not documented intentionally.
> +        return buckets_discovery_extended(opts)
> +    end
>      local ret = {}
>      local status = box.space._bucket.index.status
>      for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do
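To be clear about the limit decrease above, I mean roughly this (all
names are from the patch; only buckets which are actually collected
count against the chunk size):

    for _, bucket in box.space._bucket:pairs({opts.from},
                                             {iterator = box.index.GE}) do
        local status = bucket.status
        if status == active or status == pinned then
            table.insert(buckets, bucket.id)
            limit = limit - 1
            if limit == 0 then
                next_from = bucket.id + 1
                break
            end
        end
    end

Since 99% of the time the space contains only active and pinned
buckets, the scan length stays practically the same.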