From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp41.i.mail.ru (smtp41.i.mail.ru [94.100.177.101]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C1E694696C9 for ; Fri, 1 May 2020 03:16:41 +0300 (MSK) From: Vladislav Shpilevoy Date: Fri, 1 May 2020 02:16:33 +0200 Message-Id: <80bacfc685233ad047f6a80ddadd72b8903eae5b.1588292014.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH vshard 6/7] router: make discovery smoother in a big cluster List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, olegrok@tarantool.org Router does discovery once per 10 seconds. Discovery sends a request to each replicaset to download all pinned and active buckets from there. When there are millions of buckets, that becomes a long operation taking seconds, during which the storage is unresponsive. The patch makes discovery work step by step, downloading not more than 1000 buckets at a time. That gives the storage time to process other requests. Moreover, discovery now has some kind of 'state'. For each replicaset it keeps an iterator which is moved by 1k buckets on every successfully discovered bucket batch. It means that if, on a replicaset with 1 000 000 buckets, discovery fails after 999 999 buckets are already discovered, it won't start from 0. It will retry from the old position. However, there is still space for improvement. Discovery could avoid downloading anything after everything is downloaded, if it could somehow detect that the bucket space has not changed. Unfortunately it is not so easy, since bucket generation (the version of the _bucket space) is not persisted. So after an instance restart it is always equal to the bucket count. 
Part of #210 --- test/router/reload.result | 5 +- test/router/reload.test.lua | 3 +- test/router/router.result | 4 +- test/router/router.test.lua | 2 +- test/router/wrong_config.result | 5 +- test/router/wrong_config.test.lua | 5 +- vshard/consts.lua | 5 +- vshard/router/init.lua | 145 +++++++++++++++++++++++------- vshard/storage/init.lua | 39 +++++++- 9 files changed, 170 insertions(+), 43 deletions(-) diff --git a/test/router/reload.result b/test/router/reload.result index 3ba900a..8fe99ba 100644 --- a/test/router/reload.result +++ b/test/router/reload.result @@ -44,7 +44,10 @@ vshard.router.bootstrap() while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end --- ... -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +--- +... +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end --- ... -- diff --git a/test/router/reload.test.lua b/test/router/reload.test.lua index 5ed5690..abcbc09 100644 --- a/test/router/reload.test.lua +++ b/test/router/reload.test.lua @@ -15,7 +15,8 @@ fiber = require('fiber') vshard.router.bootstrap() while test_run:grep_log('router_1', 'All replicas are ok') == nil do fiber.sleep(0.1) end -while test_run:grep_log('router_1', 'buckets: was 0, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'buckets: was 0, became 1000') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end +while test_run:grep_log('router_1', 'buckets: was 1000, became 1500') == nil do fiber.sleep(0.1) vshard.router.discovery_wakeup() end -- -- Gh-72: allow reload. 
Test simple reload, error during diff --git a/test/router/router.result b/test/router/router.result index df7be4a..b2efd6d 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -943,9 +943,9 @@ calculate_known_buckets() --- - 3000 ... -test_run:grep_log('router_1', 'was 1, became 1500') +test_run:grep_log('router_1', 'was 1, became 1000') --- -- was 1, became 1500 +- was 1, became 1000 ... info = vshard.router.info() --- diff --git a/test/router/router.test.lua b/test/router/router.test.lua index 97dce49..154310b 100644 --- a/test/router/router.test.lua +++ b/test/router/router.test.lua @@ -316,7 +316,7 @@ vshard.storage.bucket_pin(first_active) _ = test_run:switch('router_1') wait_discovery() calculate_known_buckets() -test_run:grep_log('router_1', 'was 1, became 1500') +test_run:grep_log('router_1', 'was 1, became 1000') info = vshard.router.info() info.bucket info.alerts diff --git a/test/router/wrong_config.result b/test/router/wrong_config.result index 56db9e8..92353c3 100644 --- a/test/router/wrong_config.result +++ b/test/router/wrong_config.result @@ -45,7 +45,10 @@ cfg.bucket_count = 1000 r = vshard.router.new('gh-179', cfg) --- ... -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end +while r:info().bucket.available_rw ~= 3000 do \ + r:discovery_wakeup() \ + fiber.sleep(0.1) \ +end --- ... 
i = r:info() diff --git a/test/router/wrong_config.test.lua b/test/router/wrong_config.test.lua index 62ef30d..174b373 100644 --- a/test/router/wrong_config.test.lua +++ b/test/router/wrong_config.test.lua @@ -18,7 +18,10 @@ vshard.router.bootstrap() -- cfg.bucket_count = 1000 r = vshard.router.new('gh-179', cfg) -while type(r:info().bucket.unknown) == 'number' and r:info().bucket.unknown > 0 do r:discovery_wakeup() fiber.sleep(0.1) end +while r:info().bucket.available_rw ~= 3000 do \ + r:discovery_wakeup() \ + fiber.sleep(0.1) \ +end i = r:info() i.bucket i.alerts diff --git a/vshard/consts.lua b/vshard/consts.lua index 5391c0f..a6a8c1b 100644 --- a/vshard/consts.lua +++ b/vshard/consts.lua @@ -40,5 +40,8 @@ return { DEFAULT_COLLECT_BUCKET_GARBAGE_INTERVAL = 0.5; RECOVERY_INTERVAL = 5; COLLECT_LUA_GARBAGE_INTERVAL = 100; - DISCOVERY_INTERVAL = 10; + DISCOVERY_IDLE_INTERVAL = 10, + DISCOVERY_WORK_INTERVAL = 1, + DISCOVERY_WORK_STEP = 0.01, + DISCOVERY_TIMEOUT = 10, } diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 6d88153..26ea85b 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -250,50 +250,127 @@ if util.version_is_at_least(1, 10, 0) then discovery_f = function(router) local module_version = M.module_version assert(router.discovery_mode == 'on') + local iterators = {} + local opts = {is_async = true} + local mode while module_version == M.module_version do - while not next(router.replicasets) do - lfiber.sleep(consts.DISCOVERY_INTERVAL) - end - if module_version ~= M.module_version then - return - end -- Just typical map reduce - send request to each - -- replicaset in parallel, and collect responses. - local pending = {} - local opts = {is_async = true} - local args = {} - for rs_uuid, replicaset in pairs(router.replicasets) do + -- replicaset in parallel, and collect responses. Many + -- requests probably will be needed for each replicaset. 
+ -- + -- Step 1: create missing iterators, in case this is a + -- first discovery iteration, or some replicasets were + -- added after the router is started. + for rs_uuid in pairs(router.replicasets) do + local iter = iterators[rs_uuid] + if not iter then + iterators[rs_uuid] = { + args = {{from = 1}}, + future = nil, + } + end + end + -- Step 2: map stage - send parallel requests for every + -- iterator, prune orphan iterators whose replicasets were + -- removed. + for rs_uuid, iter in pairs(iterators) do + local replicaset = router.replicasets[rs_uuid] + if not replicaset then + log.warn('Replicaset %s was removed during discovery', rs_uuid) + iterators[rs_uuid] = nil + goto continue + end local future, err = - replicaset:callro('vshard.storage.buckets_discovery', - args, opts) + replicaset:callro('vshard.storage.buckets_discovery', iter.args, + opts) if not future then - log.warn('Error during discovery %s: %s', rs_uuid, err) - else - pending[rs_uuid] = future + log.warn('Error during discovery %s, retry will be done '.. + 'later: %s', rs_uuid, err) + goto continue + end + iter.future = future + -- Don't spam many requests at once. Give + -- storages time to handle them and other + -- requests. + lfiber.sleep(consts.DISCOVERY_WORK_STEP) + if module_version ~= M.module_version then + return end + ::continue:: end - - local deadline = lfiber.clock() + consts.DISCOVERY_INTERVAL - for rs_uuid, p in pairs(pending) do + -- Step 3: reduce stage - collect responses, restart + -- iterators which reached the end. 
+ for rs_uuid, iter in pairs(iterators) do lfiber.yield() - local timeout = deadline - lfiber.clock() - local buckets, err = p:wait_result(timeout) - while M.errinj.ERRINJ_LONG_DISCOVERY do - M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting' - lfiber.sleep(0.01) + local future = iter.future + if not future then + goto continue end - local replicaset = router.replicasets[rs_uuid] - if not buckets then - p:discard() - log.warn('Error during discovery %s: %s', rs_uuid, err) - elseif module_version ~= M.module_version then + local result, err = future:wait_result(consts.DISCOVERY_TIMEOUT) + if module_version ~= M.module_version then return - elseif replicaset then - discovery_handle_buckets(router, replicaset, buckets[1]) end + if not result then + future:discard() + log.warn('Error during discovery %s, retry will be done '.. + 'later: %s', rs_uuid, err) + goto continue + end + local replicaset = router.replicasets[rs_uuid] + if not replicaset then + iterators[rs_uuid] = nil + log.warn('Replicaset %s was removed during discovery', rs_uuid) + goto continue + end + result = result[1] + -- Buckets are returned as plain array by storages + -- using old vshard version. But if .buckets is set, + -- this is a new storage. + discovery_handle_buckets(router, replicaset, + result.buckets or result) + local discovery_args = iter.args[1] + discovery_args.from = result.next_from + if not result.next_from then + -- Nil next_from means no more buckets to get. + -- Restart the iterator. + iterators[rs_uuid] = nil + end + ::continue:: end - - lfiber.sleep(deadline - lfiber.clock()) + local unknown_bucket_count + repeat + unknown_bucket_count = + router.total_bucket_count - router.known_bucket_count + if unknown_bucket_count == 0 then + if mode ~= 'idle' then + log.info('Discovery enters idle mode, all buckets are '.. + 'known. Discovery works with %s seconds '.. 
+ 'interval now', consts.DISCOVERY_IDLE_INTERVAL) + mode = 'idle' + end + lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL) + elseif not next(router.replicasets) then + if mode ~= 'idle' then + log.info('Discovery enters idle mode because '.. + 'configuration does not have replicasets. '.. + 'Retries will happen with %s seconds interval', + consts.DISCOVERY_IDLE_INTERVAL) + mode = 'idle' + end + lfiber.sleep(consts.DISCOVERY_IDLE_INTERVAL) + elseif mode ~= 'aggressive' then + log.info('Start aggressive discovery, %s buckets are unknown. '.. + 'Discovery works with %s seconds interval', + unknown_bucket_count, consts.DISCOVERY_WORK_INTERVAL) + mode = 'aggressive' + lfiber.sleep(consts.DISCOVERY_WORK_INTERVAL) + break + end + while M.errinj.ERRINJ_LONG_DISCOVERY do + M.errinj.ERRINJ_LONG_DISCOVERY = 'waiting' + lfiber.sleep(0.01) + end + until next(router.replicasets) end end @@ -355,9 +432,9 @@ local function discovery_set(router, new_mode) router.discovery_mode = new_mode if router.discovery_fiber ~= nil then pcall(router.discovery_fiber.cancel, router.discovery_fiber) + router.discovery_fiber = nil end if new_mode == 'off' then - router.discovery_fiber = nil return end router.discovery_fiber = util.reloadable_fiber_create( diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 73c6740..0050b96 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -1110,10 +1110,47 @@ local function bucket_collect(bucket_id) return data end +-- Discovery used by routers. It returns limited number of +-- buckets to avoid stalls when _bucket is huge. +local function buckets_discovery_extended(opts) + local limit = consts.BUCKET_CHUNK_SIZE + local buckets = table.new(limit, 0) + local active = consts.BUCKET.ACTIVE + local pinned = consts.BUCKET.PINNED + local next_from + -- No way to select by {status, id}, because there are two + -- statuses to select. A router would need to maintain a + -- separate iterator for each status it wants to get. 
This may + -- be implemented in future. But _bucket space anyway 99% of + -- time contains only active and pinned buckets. So there is + -- no big benefit in optimizing that. Perhaps a compound index + -- {status, id} could help too. + for _, bucket in box.space._bucket:pairs({opts.from}, + {iterator = box.index.GE}) do + local status = bucket.status + if status == active or status == pinned then + table.insert(buckets, bucket.id) + end + limit = limit - 1 + if limit == 0 then + next_from = bucket.id + 1 + break + end + end + -- Buckets list can even be empty, if all buckets in the + -- scanned chunk are not active/pinned. But next_from still + -- should be returned. So as the router could request more. + return {buckets = buckets, next_from = next_from} +end + -- -- Collect array of active bucket identifiers for discovery. -- -local function buckets_discovery() +local function buckets_discovery(opts) + if opts then + -- Private method. Is not documented intentionally. + return buckets_discovery_extended(opts) + end local ret = {} local status = box.space._bucket.index.status for _, bucket in status:pairs({consts.BUCKET.ACTIVE}) do -- 2.21.1 (Apple Git-122.3)