To: Vladislav Shpilevoy, tarantool-patches@dev.tarantool.org, yaroslav.dynnikov@tarantool.org
References: <02df9747667f254ee5734cfafa681eead566f212.1614039039.git.v.shpilevoy@tarantool.org>
Message-ID: <5e011bc2-426b-e59a-f165-fef74f998b18@tarantool.org>
Date: Wed, 24 Feb 2021 13:28:08 +0300
In-Reply-To: <02df9747667f254ee5734cfafa681eead566f212.1614039039.git.v.shpilevoy@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH vshard 10/11] sched: introduce vshard.storage.sched module
From: Oleg Babin via Tarantool-patches
Reply-To: Oleg Babin

Thanks for your patch. This is a brief review - I hope to take one more look
at this patch later. Consider the 2 comments below.

On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
> The 'vshard.storage.sched' module ensures that two incompatible
> operations - storage refs and bucket moves - share storage time
> fairly.
>
> Storage refs are going to be used by the map-reduce API to preserve
> data consistency while map requests are in progress on all
> storages.
>
> It means storage refs will be used as commonly as bucket refs,
> and should not block the rebalancer. However, it is hard not to
> block the rebalancer forever if there are always refs on the
> storage.
>
> With bucket refs it was easy - one bucket temporarily blocked is
> not a big deal. So the rebalancer always has a higher priority than
> bucket refs, and it still does not block requests for the other
> buckets + read-only requests on the subject bucket.
>
> With storage refs, giving the rebalancer a higher priority would
> make map-reduce requests die in the entire cluster for the whole
> time of rebalancing, which can be as long as hours or even days.
> That wouldn't be acceptable.
>
> The new module vshard.storage.sched shares time between moves and
> storage refs fairly. Both get time to execute, with proportions
> configured by the user. The proportions depend on how big a bucket
> is and how long the map-reduce requests are expected to be.
> Typically, the longer a request is, the less quota it should be
> given.
>
> The patch introduces new storage options to configure the
> scheduling.
>
> Part of #147
>
> @TarantoolBot document
> Title: vshard.storage.cfg new options - sched_ref_quota and sched_move_quota
>
> There are new options for `vshard.storage.cfg`: `sched_ref_quota`
> and `sched_move_quota`. The options control how much time should
> be given to storage refs and bucket moves - two incompatible but
> important operations.
>
> Storage refs are used by the router's map-reduce API. Each
> map-reduce call creates storage refs on all storages to prevent
> data migration on them for the duration of the map execution.
>
> Bucket moves are used by the rebalancer. Obviously, they are
> incompatible with the storage refs.
>
> If vshard always preferred one operation to the other, it would
> lead to starvation of one of them. For example, if storage refs
> were preferred, rebalancing might simply never work, because under
> constant map-reduce load there would always be refs. If bucket
> moves were preferred, storage refs (and therefore map-reduce) would
> stop for the entire rebalancing time, which can be quite long
> (hours, days).
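As a side note for the docs: an example of how these options are set might
help readers. A minimal sketch (only the sched_* options come from this
patch; 'localcfg' is a hypothetical module returning the instance's existing
storage configuration):

    local vshard = require('vshard')

    -- Hypothetical existing storage configuration.
    local cfg = require('localcfg')
    -- Allow up to 1000 storage refs (and therefore map-reduce calls) in a
    -- row while a bucket move is waiting. The default is 300.
    cfg.sched_ref_quota = 1000
    -- Allow at most 1 bucket move in a row while refs are waiting. The
    -- default is 1.
    cfg.sched_move_quota = 1
    vshard.storage.cfg(cfg, box.info.uuid)
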
>
> The new options serve to control how much time to give to which
> operation.
>
> `sched_ref_quota` tells how many storage refs (and therefore
> map-reduce requests) can be executed on the storage in a row, if
> there are pending bucket moves, before the refs are blocked to let
> the moves work. The default value is 300.
>
> `sched_move_quota` controls the same, but vice versa: how many
> bucket moves can be done in a row if there are pending refs. The
> default value is 1.
>
> Map-reduce requests are expected to be much shorter than bucket
> moves, so storage refs by default have a higher quota.
>
> Here is how it works, using an example. Assume map-reduces start.
> They execute one after another, 150 requests in a row. Now the
> rebalancer wakes up and wants to move some buckets. It stands in
> a queue and waits for the storage refs to be gone.
>
> But the ref quota is not reached yet, so the storage can still
> execute 150 more map-reduces, even with the queued bucket moves,
> until new refs are blocked and the moves start.
> ---
>  test/reload_evolution/storage.result |   2 +-
>  test/storage/ref.result              |  19 +-
>  test/storage/ref.test.lua            |   9 +-
>  test/storage/scheduler.result        | 410 ++++++++++++++++++++
>  test/storage/scheduler.test.lua      | 178 +++++++++
>  test/unit-tap/ref.test.lua           |   7 +-
>  test/unit-tap/scheduler.test.lua     | 555 +++++++++++++++++++++++++++
>  test/unit/config.result              |  59 +++
>  test/unit/config.test.lua            |  23 ++
>  vshard/cfg.lua                       |   8 +
>  vshard/consts.lua                    |   5 +
>  vshard/storage/CMakeLists.txt        |   2 +-
>  vshard/storage/init.lua              |  54 ++-
>  vshard/storage/ref.lua               |  30 +-
>  vshard/storage/sched.lua             | 231 +++++++++++
>  15 files changed, 1567 insertions(+), 25 deletions(-)
>  create mode 100644 test/storage/scheduler.result
>  create mode 100644 test/storage/scheduler.test.lua
>  create mode 100755 test/unit-tap/scheduler.test.lua
>  create mode 100644 vshard/storage/sched.lua
>
> diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result
> index c4a0cdd..77010a2 100644
> --- a/test/reload_evolution/storage.result
> +++ b/test/reload_evolution/storage.result
> @@ -258,7 +258,7 @@ ok, err = vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2],
>  ...
>  assert(not ok and err.message)
>  ---
> -- Storage is referenced
> +- Timeout exceeded
>  ...
>  lref.del(0, 0)
>  ---
> diff --git a/test/storage/ref.result b/test/storage/ref.result
> index d5f4166..59f07f4 100644
> --- a/test/storage/ref.result
> +++ b/test/storage/ref.result
> @@ -84,18 +84,22 @@ big_timeout = 1000000
>  small_timeout = 0.001
>   | ---
>   | ...
> +
> +timeout = 0.01
> + | ---
> + | ...
>  lref.add(rid, sid, big_timeout)
>   | ---
>   | - true
>   | ...
>  -- Send fails.
>  ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
> -    {timeout = big_timeout})
> +    {timeout = timeout})
>   | ---
>   | ...
>  assert(not ok and err.message)
>   | ---
> - | - Storage is referenced
> + | - Timeout exceeded
>   | ...
>  lref.use(rid, sid)
>   | ---
> @@ -103,12 +107,12 @@ lref.use(rid, sid)
>   | ...
>  -- Still fails - use only makes ref undead until it is deleted explicitly.
>  ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \
> -    {timeout = big_timeout})
> +    {timeout = timeout})
>   | ---
>   | ...
>  assert(not ok and err.message)
>   | ---
> - | - Storage is referenced
> + | - Timeout exceeded
>   | ...
>
>  _ = test_run:switch('storage_2_a')
> @@ -118,13 +122,16 @@ _ = test_run:switch('storage_2_a')
>  big_timeout = 1000000
>   | ---
>   | ...
> +timeout = 0.01
> + | ---
> + | ...
> ok, err = vshard.storage.bucket_send(1501, util.replicasets[1], \ > - {timeout = big_timeout}) > + {timeout = timeout}) > | --- > | ... > assert(not ok and err.message) > | --- > - | - Storage is referenced > + | - Timeout exceeded > | ... > > -- > diff --git a/test/storage/ref.test.lua b/test/storage/ref.test.lua > index b34a294..24303e2 100644 > --- a/test/storage/ref.test.lua > +++ b/test/storage/ref.test.lua > @@ -35,22 +35,25 @@ sid = 0 > rid = 0 > big_timeout = 1000000 > small_timeout = 0.001 > + > +timeout = 0.01 > lref.add(rid, sid, big_timeout) > -- Send fails. > ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \ > - {timeout = big_timeout}) > + {timeout = timeout}) > assert(not ok and err.message) > lref.use(rid, sid) > -- Still fails - use only makes ref undead until it is deleted explicitly. > ok, err = vshard.storage.bucket_send(1, util.replicasets[2], \ > - {timeout = big_timeout}) > + {timeout = timeout}) > assert(not ok and err.message) > > _ = test_run:switch('storage_2_a') > -- Receive (from another replicaset) also fails. > big_timeout = 1000000 > +timeout = 0.01 > ok, err = vshard.storage.bucket_send(1501, util.replicasets[1], \ > - {timeout = big_timeout}) > + {timeout = timeout}) > assert(not ok and err.message) > > -- > diff --git a/test/storage/scheduler.result b/test/storage/scheduler.result > new file mode 100644 > index 0000000..0f53e42 > --- /dev/null > +++ b/test/storage/scheduler.result > @@ -0,0 +1,410 @@ > +-- test-run result file version 2 > +test_run = require('test_run').new() > + | --- > + | ... > +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } > + | --- > + | ... > +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } > + | --- > + | ... > + > +test_run:create_cluster(REPLICASET_1, 'storage') > + | --- > + | ... > +test_run:create_cluster(REPLICASET_2, 'storage') > + | --- > + | ... > +util = require('util') > + | --- > + | ... > +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') > + | --- > + | ... > +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') > + | --- > + | ... > +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()') > + | --- > + | ... > +util.push_rs_filters(test_run) > + | --- > + | ... > + > +-- > +-- gh-147: scheduler helps to share time fairly between incompatible but > +-- necessary operations - storage refs and bucket moves. Refs are used for the > +-- consistent map-reduce feature when the whole cluster can be scanned without > +-- being afraid that some data may slip through requests on behalf of the > +-- rebalancer. > +-- > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > + > +vshard.storage.rebalancer_disable() > + | --- > + | ... > +vshard.storage.bucket_force_create(1, 1500) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +vshard.storage.rebalancer_disable() > + | --- > + | ... > +vshard.storage.bucket_force_create(1501, 1500) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +-- > +-- Bucket_send() uses the scheduler. > +-- > +lsched = require('vshard.storage.sched') > + | --- > + | ... > +assert(lsched.move_strike == 0) > + | --- > + | - true > + | ... > +assert(lsched.move_count == 0) > + | --- > + | - true > + | ... > +big_timeout = 1000000 > + | --- > + | ... > +big_timeout_opts = {timeout = big_timeout} > + | --- > + | ... > +vshard.storage.bucket_send(1, util.replicasets[2], big_timeout_opts) > + | --- > + | - true > + | ... 
> +assert(lsched.move_strike == 1) > + | --- > + | - true > + | ... > +assert(lsched.move_count == 0) > + | --- > + | - true > + | ... > +wait_bucket_is_collected(1) > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +lsched = require('vshard.storage.sched') > + | --- > + | ... > +-- > +-- Bucket_recv() uses the scheduler. > +-- > +assert(lsched.move_strike == 1) > + | --- > + | - true > + | ... > +assert(lsched.move_count == 0) > + | --- > + | - true > + | ... > + > +-- > +-- When move is in progress, it is properly accounted. > +-- > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +big_timeout = 1000000 > + | --- > + | ... > +big_timeout_opts = {timeout = big_timeout} > + | --- > + | ... > +ok, err = nil > + | --- > + | ... > +assert(lsched.move_strike == 1) > + | --- > + | - true > + | ... > +_ = fiber.create(function() \ > + ok, err = vshard.storage.bucket_send(1, util.replicasets[1], \ > + big_timeout_opts) \ > +end) > + | --- > + | ... > +-- Strike increase does not mean the move finished. It means it was successfully > +-- scheduled. > +assert(lsched.move_strike == 2) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return lsched.move_strike == 2 end) > + | --- > + | - true > + | ... > + > +-- > +-- Ref is not allowed during move. > +-- > +small_timeout = 0.000001 > + | --- > + | ... > +lref = require('vshard.storage.ref') > + | --- > + | ... > +ok, err = lref.add(0, 0, small_timeout) > + | --- > + | ... > +assert(not ok) > + | --- > + | - true > + | ... > +err.message > + | --- > + | - Timeout exceeded > + | ... > +-- Put it to wait until move is done. > +ok, err = nil > + | --- > + | ... > +_ = fiber.create(function() ok, err = lref.add(0, 0, big_timeout) end) > + | --- > + | ... > +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +test_run:wait_cond(function() return ok or err end) > + | --- > + | - true > + | ... > +ok, err > + | --- > + | - true > + | - null > + | ... > +assert(lsched.move_count == 0) > + | --- > + | - true > + | ... > +wait_bucket_is_collected(1) > + | --- > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +test_run:wait_cond(function() return ok or err end) > + | --- > + | - true > + | ... > +ok, err > + | --- > + | - true > + | - null > + | ... > +assert(lsched.move_count == 0) > + | --- > + | - true > + | ... > +assert(lsched.ref_count == 1) > + | --- > + | - true > + | ... > +lref.del(0, 0) > + | --- > + | - true > + | ... > +assert(lsched.ref_count == 0) > + | --- > + | - true > + | ... > + > +-- > +-- Refs can't block sends infinitely. The scheduler must be fair and share time > +-- between ref/move. > +-- > +do_refs = true > + | --- > + | ... > +ref_worker_count = 10 > + | --- > + | ... > +function ref_worker() \ > + while do_refs do \ > + lref.add(0, 0, big_timeout) \ > + fiber.sleep(small_timeout) \ > + lref.del(0, 0) \ > + end \ > + ref_worker_count = ref_worker_count - 1 \ > +end > + | --- > + | ... > +-- Simulate many fibers doing something with a ref being kept. > +for i = 1, ref_worker_count do fiber.create(ref_worker) end > + | --- > + | ... > +assert(lref.count > 0) > + | --- > + | - true > + | ... 
> +assert(lsched.ref_count > 0) > + | --- > + | - true > + | ... > +-- Ensure it passes with default opts (when move is in great unfairness). It is > +-- important. Because moves are expected to be much longer than refs, and must > +-- not happen too often with ref load in progress. But still should eventually > +-- be processed. > +bucket_count = 100 > + | --- > + | ... > +bucket_id = 1 > + | --- > + | ... > +bucket_worker_count = 5 > + | --- > + | ... > +function bucket_worker() \ > + while bucket_id <= bucket_count do \ > + local id = bucket_id \ > + bucket_id = bucket_id + 1 \ > + assert(vshard.storage.bucket_send(id, util.replicasets[2])) \ > + end \ > + bucket_worker_count = bucket_worker_count - 1 \ > +end > + | --- > + | ... > +-- Simulate many rebalancer fibers like when max_sending is increased. > +for i = 1, bucket_worker_count do fiber.create(bucket_worker) end > + | --- > + | ... > +test_run:wait_cond(function() return bucket_worker_count == 0 end) > + | --- > + | - true > + | ... > + > +do_refs = false > + | --- > + | ... > +test_run:wait_cond(function() return ref_worker_count == 0 end) > + | --- > + | - true > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > +assert(lsched.ref_count == 0) > + | --- > + | - true > + | ... > + > +for i = 1, bucket_count do wait_bucket_is_collected(i) end > + | --- > + | ... > + > +-- > +-- Refs can't block recvs infinitely. > +-- > +do_refs = true > + | --- > + | ... > +for i = 1, ref_worker_count do fiber.create(ref_worker) end > + | --- > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +bucket_count = 100 > + | --- > + | ... > +bucket_id = 1 > + | --- > + | ... > +bucket_worker_count = 5 > + | --- > + | ... > +function bucket_worker() \ > + while bucket_id <= bucket_count do \ > + local id = bucket_id \ > + bucket_id = bucket_id + 1 \ > + assert(vshard.storage.bucket_send(id, util.replicasets[1])) \ > + end \ > + bucket_worker_count = bucket_worker_count - 1 \ > +end > + | --- > + | ... > +for i = 1, bucket_worker_count do fiber.create(bucket_worker) end > + | --- > + | ... > +test_run:wait_cond(function() return bucket_worker_count == 0 end) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_1_a') > + | --- > + | ... > +do_refs = false > + | --- > + | ... > +test_run:wait_cond(function() return ref_worker_count == 0 end) > + | --- > + | - true > + | ... > +assert(lref.count == 0) > + | --- > + | - true > + | ... > +assert(lsched.ref_count == 0) > + | --- > + | - true > + | ... > + > +_ = test_run:switch('storage_2_a') > + | --- > + | ... > +for i = 1, bucket_count do wait_bucket_is_collected(i) end > + | --- > + | ... > + > +_ = test_run:switch("default") > + | --- > + | ... > +test_run:drop_cluster(REPLICASET_2) > + | --- > + | ... > +test_run:drop_cluster(REPLICASET_1) > + | --- > + | ... > +_ = test_run:cmd('clear filter') > + | --- > + | ... 
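Nice test. For readers skimming it: the accept/deny rule it exercises can be
condensed into a small sketch (my own illustration of the algorithm described
in the commit message, not the module's actual code):

    -- 'op' wants to start; 'other' is the opposite operation. Both are
    -- tables of the form {count = .., queue = .., strike = .., quota = ..}.
    local function may_start(op, other)
        -- An operation never starts while the other one is running.
        if other.count > 0 then
            return false
        end
        -- It also pauses after executing 'quota' requests in a row (a
        -- "strike") while the other operation has queued waiters - unless
        -- the other operation's quota is zero and it can be ignored.
        if other.queue > 0 and op.strike >= op.quota and other.quota > 0 then
            return false
        end
        return true
    end

    -- With sched_ref_quota = 300: after 300 refs in a row, a queued bucket
    -- move gets its turn before any new ref is accepted.
    local ref  = {count = 0, queue = 0, strike = 300, quota = 300}
    local move = {count = 0, queue = 1, strike = 0,   quota = 1}
    assert(not may_start(ref, move))
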
> diff --git a/test/storage/scheduler.test.lua b/test/storage/scheduler.test.lua > new file mode 100644 > index 0000000..8628f0e > --- /dev/null > +++ b/test/storage/scheduler.test.lua > @@ -0,0 +1,178 @@ > +test_run = require('test_run').new() > +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } > +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } > + > +test_run:create_cluster(REPLICASET_1, 'storage') > +test_run:create_cluster(REPLICASET_2, 'storage') > +util = require('util') > +util.wait_master(test_run, REPLICASET_1, 'storage_1_a') > +util.wait_master(test_run, REPLICASET_2, 'storage_2_a') > +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()') > +util.push_rs_filters(test_run) > + > +-- > +-- gh-147: scheduler helps to share time fairly between incompatible but > +-- necessary operations - storage refs and bucket moves. Refs are used for the > +-- consistent map-reduce feature when the whole cluster can be scanned without > +-- being afraid that some data may slip through requests on behalf of the > +-- rebalancer. > +-- > + > +_ = test_run:switch('storage_1_a') > + > +vshard.storage.rebalancer_disable() > +vshard.storage.bucket_force_create(1, 1500) > + > +_ = test_run:switch('storage_2_a') > +vshard.storage.rebalancer_disable() > +vshard.storage.bucket_force_create(1501, 1500) > + > +_ = test_run:switch('storage_1_a') > +-- > +-- Bucket_send() uses the scheduler. > +-- > +lsched = require('vshard.storage.sched') > +assert(lsched.move_strike == 0) > +assert(lsched.move_count == 0) > +big_timeout = 1000000 > +big_timeout_opts = {timeout = big_timeout} > +vshard.storage.bucket_send(1, util.replicasets[2], big_timeout_opts) > +assert(lsched.move_strike == 1) > +assert(lsched.move_count == 0) > +wait_bucket_is_collected(1) > + > +_ = test_run:switch('storage_2_a') > +lsched = require('vshard.storage.sched') > +-- > +-- Bucket_recv() uses the scheduler. > +-- > +assert(lsched.move_strike == 1) > +assert(lsched.move_count == 0) > + > +-- > +-- When move is in progress, it is properly accounted. > +-- > +_ = test_run:switch('storage_1_a') > +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true > + > +_ = test_run:switch('storage_2_a') > +big_timeout = 1000000 > +big_timeout_opts = {timeout = big_timeout} > +ok, err = nil > +assert(lsched.move_strike == 1) > +_ = fiber.create(function() \ > + ok, err = vshard.storage.bucket_send(1, util.replicasets[1], \ > + big_timeout_opts) \ > +end) > +-- Strike increase does not mean the move finished. It means it was successfully > +-- scheduled. > +assert(lsched.move_strike == 2) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return lsched.move_strike == 2 end) > + > +-- > +-- Ref is not allowed during move. > +-- > +small_timeout = 0.000001 > +lref = require('vshard.storage.ref') > +ok, err = lref.add(0, 0, small_timeout) > +assert(not ok) > +err.message > +-- Put it to wait until move is done. 
> +ok, err = nil > +_ = fiber.create(function() ok, err = lref.add(0, 0, big_timeout) end) > +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false > + > +_ = test_run:switch('storage_2_a') > +test_run:wait_cond(function() return ok or err end) > +ok, err > +assert(lsched.move_count == 0) > +wait_bucket_is_collected(1) > + > +_ = test_run:switch('storage_1_a') > +test_run:wait_cond(function() return ok or err end) > +ok, err > +assert(lsched.move_count == 0) > +assert(lsched.ref_count == 1) > +lref.del(0, 0) > +assert(lsched.ref_count == 0) > + > +-- > +-- Refs can't block sends infinitely. The scheduler must be fair and share time > +-- between ref/move. > +-- > +do_refs = true > +ref_worker_count = 10 > +function ref_worker() \ > + while do_refs do \ > + lref.add(0, 0, big_timeout) \ > + fiber.sleep(small_timeout) \ > + lref.del(0, 0) \ > + end \ > + ref_worker_count = ref_worker_count - 1 \ > +end > +-- Simulate many fibers doing something with a ref being kept. > +for i = 1, ref_worker_count do fiber.create(ref_worker) end > +assert(lref.count > 0) > +assert(lsched.ref_count > 0) > +-- Ensure it passes with default opts (when move is in great unfairness). It is > +-- important. Because moves are expected to be much longer than refs, and must > +-- not happen too often with ref load in progress. But still should eventually > +-- be processed. > +bucket_count = 100 > +bucket_id = 1 > +bucket_worker_count = 5 > +function bucket_worker() \ > + while bucket_id <= bucket_count do \ > + local id = bucket_id \ > + bucket_id = bucket_id + 1 \ > + assert(vshard.storage.bucket_send(id, util.replicasets[2])) \ > + end \ > + bucket_worker_count = bucket_worker_count - 1 \ > +end > +-- Simulate many rebalancer fibers like when max_sending is increased. > +for i = 1, bucket_worker_count do fiber.create(bucket_worker) end > +test_run:wait_cond(function() return bucket_worker_count == 0 end) > + > +do_refs = false > +test_run:wait_cond(function() return ref_worker_count == 0 end) > +assert(lref.count == 0) > +assert(lsched.ref_count == 0) > + > +for i = 1, bucket_count do wait_bucket_is_collected(i) end > + > +-- > +-- Refs can't block recvs infinitely. 
> +-- > +do_refs = true > +for i = 1, ref_worker_count do fiber.create(ref_worker) end > + > +_ = test_run:switch('storage_2_a') > +bucket_count = 100 > +bucket_id = 1 > +bucket_worker_count = 5 > +function bucket_worker() \ > + while bucket_id <= bucket_count do \ > + local id = bucket_id \ > + bucket_id = bucket_id + 1 \ > + assert(vshard.storage.bucket_send(id, util.replicasets[1])) \ > + end \ > + bucket_worker_count = bucket_worker_count - 1 \ > +end > +for i = 1, bucket_worker_count do fiber.create(bucket_worker) end > +test_run:wait_cond(function() return bucket_worker_count == 0 end) > + > +_ = test_run:switch('storage_1_a') > +do_refs = false > +test_run:wait_cond(function() return ref_worker_count == 0 end) > +assert(lref.count == 0) > +assert(lsched.ref_count == 0) > + > +_ = test_run:switch('storage_2_a') > +for i = 1, bucket_count do wait_bucket_is_collected(i) end > + > +_ = test_run:switch("default") > +test_run:drop_cluster(REPLICASET_2) > +test_run:drop_cluster(REPLICASET_1) > +_ = test_run:cmd('clear filter') > diff --git a/test/unit-tap/ref.test.lua b/test/unit-tap/ref.test.lua > index d987a63..ba95eee 100755 > --- a/test/unit-tap/ref.test.lua > +++ b/test/unit-tap/ref.test.lua > @@ -5,6 +5,7 @@ local test = tap.test('cfg') > local fiber = require('fiber') > local lregistry = require('vshard.registry') > local lref = require('vshard.storage.ref') > +require('vshard.storage.sched') > > local big_timeout = 1000000 > local small_timeout = 0.000001 > @@ -19,9 +20,11 @@ local sid3 = 2 > -- > > -- > --- Refs used storage API to get bucket space state and wait on its changes. But > --- not important for these unit tests. > +-- Refs use storage API to get bucket space state and wait on its changes. And > +-- scheduler API to sync with bucket moves. But not important for these unit > +-- tests. > -- > + > local function bucket_are_all_rw() > return true > end > diff --git a/test/unit-tap/scheduler.test.lua b/test/unit-tap/scheduler.test.lua > new file mode 100755 > index 0000000..0af4f5e > --- /dev/null > +++ b/test/unit-tap/scheduler.test.lua > @@ -0,0 +1,555 @@ > +#!/usr/bin/env tarantool > + > +local fiber = require('fiber') > +local tap = require('tap') > +local test = tap.test('cfg') > +local lregistry = require('vshard.registry') > +local lref = require('vshard.storage.ref') > +local lsched = require('vshard.storage.sched') > + > +local big_timeout = 1000000 > +local small_timeout = 0.000001 > + > +-- > +-- gh-147: scheduler helps to share time fairly between incompatible but > +-- necessary operations - storage refs and bucket moves. Refs are used for the > +-- consistent map-reduce feature when the whole cluster can be scanned without > +-- being afraid that some data may slip through requests on behalf of the > +-- rebalancer. > +-- > + > +box.cfg{ > + log = 'log.txt' > +} > +-- io.write = function(...) require('log').info(...) end > + > +-- > +-- Storage registry is used by the ref module. The ref module is used in the > +-- tests in order to ensure the scheduler performs ref garbage collection. > +-- > +local function bucket_are_all_rw() > + return true > +end > + > +lregistry.storage = { > + bucket_are_all_rw = bucket_are_all_rw, > +} > + > +local function fiber_csw() > + return fiber.info()[fiber.self():id()].csw > +end > + > +local function fiber_set_joinable() > + fiber.self():set_joinable(true) > +end > + > +local function test_basic(test) > + test:plan(32) > + > + local ref_strike = lsched.ref_strike > + -- > + -- Simplest possible test - start and end a ref. 
> + -- > + test:is(lsched.ref_start(big_timeout), big_timeout, 'start ref') > + test:is(lsched.ref_count, 1, '1 ref') > + test:is(lsched.ref_strike, ref_strike + 1, '+1 ref in a row') > + lsched.ref_end(1) > + test:is(lsched.ref_count, 0, '0 refs after end') > + test:is(lsched.ref_strike, ref_strike + 1, 'strike is kept') > + > + lsched.ref_start(big_timeout) > + lsched.ref_end(1) > + test:is(lsched.ref_strike, ref_strike + 2, 'strike grows') > + test:is(lsched.ref_count, 0, 'count does not') > + > + -- > + -- Move ends ref strike. > + -- > + test:is(lsched.move_start(big_timeout), big_timeout, 'start move') > + test:is(lsched.move_count, 1, '1 move') > + test:is(lsched.move_strike, 1, '+1 move strike') > + test:is(lsched.ref_strike, 0, 'ref strike is interrupted') > + > + -- > + -- Ref times out if there is a move in progress. > + -- > + local ok, err = lsched.ref_start(small_timeout) > + test:ok(not ok and err, 'ref fails') > + test:is(lsched.move_count, 1, 'still 1 move') > + test:is(lsched.move_strike, 1, 'still 1 move strike') > + test:is(lsched.ref_count, 0, 'could not add ref') > + test:is(lsched.ref_queue, 0, 'empty ref queue') > + > + -- > + -- Ref succeeds when move ends. > + -- > + local f = fiber.create(function() > + fiber_set_joinable() > + return lsched.ref_start(big_timeout) > + end) > + fiber.sleep(small_timeout) > + lsched.move_end(1) > + local new_timeout > + ok, new_timeout = f:join() > + test:ok(ok and new_timeout < big_timeout, 'correct timeout') > + test:is(lsched.move_count, 0, 'no moves') > + test:is(lsched.move_strike, 0, 'move strike ends') > + test:is(lsched.ref_count, 1, '+1 ref') > + test:is(lsched.ref_strike, 1, '+1 ref strike') > + > + -- > + -- Move succeeds when ref ends. > + -- > + f = fiber.create(function() > + fiber_set_joinable() > + return lsched.move_start(big_timeout) > + end) > + fiber.sleep(small_timeout) > + lsched.ref_end(1) > + ok, new_timeout = f:join() > + test:ok(ok and new_timeout < big_timeout, 'correct timeout') > + test:is(lsched.ref_count, 0, 'no refs') > + test:is(lsched.ref_strike, 0, 'ref strike ends') > + test:is(lsched.move_count, 1, '+1 move') > + test:is(lsched.move_strike, 1, '+1 move strike') > + lsched.move_end(1) > + > + -- > + -- Move times out when there is a ref. > + -- > + test:is(lsched.ref_start(big_timeout), big_timeout, '+ ref') > + ok, err = lsched.move_start(small_timeout) > + test:ok(not ok and err, 'move fails') > + test:is(lsched.ref_count, 1, 'still 1 ref') > + test:is(lsched.ref_strike, 1, 'still 1 ref strike') > + test:is(lsched.move_count, 0, 'could not add move') > + test:is(lsched.move_queue, 0, 'empty move queue') > + lsched.ref_end(1) > +end > + > +local function test_negative_timeout(test) > + test:plan(12) > + > + -- > + -- Move works even with negative timeout if no refs. > + -- > + test:is(lsched.move_start(-1), -1, 'timeout does not matter if no refs') > + test:is(lsched.move_count, 1, '+1 move') > + > + -- > + -- Ref fails immediately if timeout negative and has moves. > + -- > + local csw = fiber_csw() > + local ok, err = lsched.ref_start(-1) > + test:ok(not ok and err, 'ref fails') > + test:is(csw, fiber_csw(), 'no yields') > + test:is(lsched.ref_count, 0, 'no refs') > + test:is(lsched.ref_queue, 0, 'no ref queue') > + > + -- > + -- Ref works even with negative timeout if no moves. 
> + -- > + lsched.move_end(1) > + test:is(lsched.ref_start(-1), -1, 'timeout does not matter if no moves') > + test:is(lsched.ref_count, 1, '+1 ref') > + > + -- > + -- Move fails immediately if timeout is negative and has refs. > + -- > + csw = fiber_csw() > + ok, err = lsched.move_start(-1) > + test:ok(not ok and err, 'move fails') > + test:is(csw, fiber_csw(), 'no yields') > + test:is(lsched.move_count, 0, 'no moves') > + test:is(lsched.move_queue, 0, 'no move queue') > + lsched.ref_end(1) > +end > + > +local function test_move_gc_ref(test) > + test:plan(10) > + > + -- > + -- Move deletes expired refs if it may help to start the move. > + -- > + for sid = 1, 10 do > + for rid = 1, 5 do > + lref.add(rid, sid, small_timeout) > + end > + end > + test:is(lsched.ref_count, 50, 'refs are in progress') > + local ok, err = lsched.move_start(-1) > + test:ok(not ok and err, 'move without timeout failed') > + > + fiber.sleep(small_timeout) > + test:is(lsched.move_start(-1), -1, 'succeeds even with negative timeout') > + test:is(lsched.ref_count, 0, 'all refs are expired and deleted') > + test:is(lref.count, 0, 'ref module knows about it') > + test:is(lsched.move_count, 1, 'move is started') > + lsched.move_end(1) > + > + -- > + -- May need more than 1 GC step. > + -- > + for rid = 1, 5 do > + lref.add(0, rid, small_timeout) > + end > + for rid = 1, 5 do > + lref.add(1, rid, small_timeout * 100) > + end > + local new_timeout = lsched.move_start(big_timeout) > + test:ok(new_timeout < big_timeout, 'succeeds by doing 2 gc steps') > + test:is(lsched.ref_count, 0, 'all refs are expired and deleted') > + test:is(lref.count, 0, 'ref module knows about it') > + test:is(lsched.move_count, 1, 'move is started') > + lsched.move_end(1) > +end > + > +local function test_ref_strike(test) > + test:plan(10) > + > + local quota = lsched.ref_quota > + -- > + -- Strike should stop new refs if they exceed the quota and there is a > + -- pending move. > + -- > + -- End ref strike if there was one. > + lsched.move_start(small_timeout) > + lsched.move_end(1) > + -- Ref strike starts. > + assert(lsched.ref_start(small_timeout)) > + > + local f = fiber.create(function() > + fiber_set_joinable() > + return lsched.move_start(big_timeout) > + end) > + test:is(lsched.move_queue, 1, 'move is queued') > + -- > + -- New refs should work only until quota is reached, because there is a > + -- pending move. > + -- > + for i = 1, quota - 1 do > + assert(lsched.ref_start(small_timeout)) > + end > + local ok, err = lsched.ref_start(small_timeout) > + test:ok(not ok and err, 'too long strike with move queue not empty') > + test:is(lsched.ref_strike, quota, 'max strike is reached') > + -- Even if number of current refs decreases, new still are not accepted. > + -- Because there was too many in a row while a new move was waiting. 
> + lsched.ref_end(1) > + ok, err = lsched.ref_start(small_timeout) > + test:ok(not ok and err, 'still too long strike after one unref') > + test:is(lsched.ref_strike, quota, 'strike is unchanged') > + > + lsched.ref_end(quota - 1) > + local new_timeout > + ok, new_timeout = f:join() > + test:ok(ok and new_timeout < big_timeout, 'move succeeded') > + test:is(lsched.move_count, 1, '+1 move') > + test:is(lsched.move_strike, 1, '+1 move strike') > + test:is(lsched.ref_count, 0, 'no refs') > + test:is(lsched.ref_strike, 0, 'no ref strike') > + lsched.move_end(1) > +end > + > +local function test_move_strike(test) > + test:plan(10) > + > + local quota = lsched.move_quota > + -- > + -- Strike should stop new moves if they exceed the quota and there is a > + -- pending ref. > + -- > + -- End move strike if there was one. > + lsched.ref_start(small_timeout) > + lsched.ref_end(1) > + -- Move strike starts. > + assert(lsched.move_start(small_timeout)) > + > + local f = fiber.create(function() > + fiber_set_joinable() > + return lsched.ref_start(big_timeout) > + end) > + test:is(lsched.ref_queue, 1, 'ref is queued') > + -- > + -- New moves should work only until quota is reached, because there is a > + -- pending ref. > + -- > + for i = 1, quota - 1 do > + assert(lsched.move_start(small_timeout)) > + end > + local ok, err = lsched.move_start(small_timeout) > + test:ok(not ok and err, 'too long strike with ref queue not empty') > + test:is(lsched.move_strike, quota, 'max strike is reached') > + -- Even if number of current moves decreases, new still are not accepted. > + -- Because there was too many in a row while a new ref was waiting. > + lsched.move_end(1) > + ok, err = lsched.move_start(small_timeout) > + test:ok(not ok and err, 'still too long strike after one move end') > + test:is(lsched.move_strike, quota, 'strike is unchanged') > + > + lsched.move_end(quota - 1) > + local new_timeout > + ok, new_timeout = f:join() > + test:ok(ok and new_timeout < big_timeout, 'ref succeeded') > + test:is(lsched.ref_count, 1, '+1 ref') > + test:is(lsched.ref_strike, 1, '+1 ref strike') > + test:is(lsched.move_count, 0, 'no moves') > + test:is(lsched.move_strike, 0, 'no move strike') > + lsched.ref_end(1) > +end > + > +local function test_ref_increase_quota(test) > + test:plan(4) > + > + local quota = lsched.ref_quota > + -- > + -- Ref quota increase allows to do more refs even if there are pending > + -- moves. > + -- > + -- End ref strike if there was one. > + lsched.move_start(big_timeout) > + lsched.move_end(1) > + -- Fill the quota. > + for _ = 1, quota do > + assert(lsched.ref_start(big_timeout)) > + end > + -- Start move to block new refs by quota. > + local f = fiber.create(function() > + fiber_set_joinable() > + return lsched.move_start(big_timeout) > + end) > + test:ok(not lsched.ref_start(small_timeout), 'can not add ref - full quota') > + > + lsched.cfg({sched_ref_quota = quota + 1}) > + test:ok(lsched.ref_start(small_timeout), 'now can add - quota is extended') > + > + -- Decrease quota - should not accept new refs again. > + lsched.cfg{sched_ref_quota = quota} > + test:ok(not lsched.ref_start(small_timeout), 'full quota again') > + > + lsched.ref_end(quota + 1) > + local ok, new_timeout = f:join() > + test:ok(ok and new_timeout < big_timeout, 'move started') > + lsched.move_end(1) > +end > + > +local function test_move_increase_quota(test) > + test:plan(4) > + > + local quota = lsched.move_quota > + -- > + -- Move quota increase allows to do more moves even if there are pending > + -- refs. 
> + -- > + -- End move strike if there was one. > + lsched.ref_start(big_timeout) > + lsched.ref_end(1) > + -- Fill the quota. > + for _ = 1, quota do > + assert(lsched.move_start(big_timeout)) > + end > + -- Start ref to block new moves by quota. > + local f = fiber.create(function() > + fiber_set_joinable() > + return lsched.ref_start(big_timeout) > + end) > + test:ok(not lsched.move_start(small_timeout), 'can not add move - full quota') > + > + lsched.cfg({sched_move_quota = quota + 1}) > + test:ok(lsched.move_start(small_timeout), 'now can add - quota is extended') > + > + -- Decrease quota - should not accept new moves again. > + lsched.cfg{sched_move_quota = quota} > + test:ok(not lsched.move_start(small_timeout), 'full quota again') > + > + lsched.move_end(quota + 1) > + local ok, new_timeout = f:join() > + test:ok(ok and new_timeout < big_timeout, 'ref started') > + lsched.ref_end(1) > +end > + > +local function test_ref_decrease_quota(test) > + test:plan(4) > + > + local old_quota = lsched.ref_quota > + -- > + -- Quota decrease should not affect any existing operations or break > + -- anything. > + -- > + lsched.cfg({sched_ref_quota = 10}) > + for _ = 1, 5 do > + assert(lsched.ref_start(big_timeout)) > + end > + test:is(lsched.ref_count, 5, 'started refs below quota') > + > + local f = fiber.create(function() > + fiber_set_joinable() > + return lsched.move_start(big_timeout) > + end) > + test:ok(lsched.ref_start(big_timeout), 'another ref after move queued') > + > + lsched.cfg({sched_ref_quota = 2}) > + test:ok(not lsched.ref_start(small_timeout), 'quota decreased - can not '.. > + 'start ref') > + > + lsched.ref_end(6) > + local ok, new_timeout = f:join() > + test:ok(ok and new_timeout, 'move is started') > + lsched.move_end(1) > + > + lsched.cfg({sched_ref_quota = old_quota}) > +end > + > +local function test_move_decrease_quota(test) > + test:plan(4) > + > + local old_quota = lsched.move_quota > + -- > + -- Quota decrease should not affect any existing operations or break > + -- anything. > + -- > + lsched.cfg({sched_move_quota = 10}) > + for _ = 1, 5 do > + assert(lsched.move_start(big_timeout)) > + end > + test:is(lsched.move_count, 5, 'started moves below quota') > + > + local f = fiber.create(function() > + fiber_set_joinable() > + return lsched.ref_start(big_timeout) > + end) > + test:ok(lsched.move_start(big_timeout), 'another move after ref queued') > + > + lsched.cfg({sched_move_quota = 2}) > + test:ok(not lsched.move_start(small_timeout), 'quota decreased - can not '.. > + 'start move') > + > + lsched.move_end(6) > + local ok, new_timeout = f:join() > + test:ok(ok and new_timeout, 'ref is started') > + lsched.ref_end(1) > + > + lsched.cfg({sched_move_quota = old_quota}) > +end > + > +local function test_ref_zero_quota(test) > + test:plan(6) > + > + local old_quota = lsched.ref_quota > + -- > + -- Zero quota is a valid value. Moreover, it is special. It means the > + -- 0-quoted operation should always be paused in favor of the other > + -- operation. > + -- > + lsched.cfg({sched_ref_quota = 0}) > + test:ok(lsched.ref_start(big_timeout), 'started ref with 0 quota') > + > + local f = fiber.create(function() > + fiber_set_joinable() > + return lsched.move_start(big_timeout) > + end) > + test:ok(not lsched.ref_start(small_timeout), 'can not add more refs if '.. 
> + 'move is queued - quota 0') > + > + lsched.ref_end(1) > + local ok, new_timeout = f:join() > + test:ok(ok and new_timeout, 'move is started') > + > + -- Ensure ref never starts if there are always moves, when quota is 0. > + f = fiber.create(function() > + fiber_set_joinable() > + return lsched.ref_start(big_timeout) > + end) > + local move_count = lsched.move_quota + 3 > + -- Start from 2 to account the already existing move. > + for _ = 2, move_count do > + -- Start one new move. > + assert(lsched.move_start(big_timeout)) > + -- Start second new move. > + assert(lsched.move_start(big_timeout)) > + -- End first move. > + lsched.move_end(1) > + -- In result the moves are always interleaving - no time for refs at > + -- all. > + end > + test:is(lsched.move_count, move_count, 'moves exceed quota') > + test:ok(lsched.move_strike > move_count, 'strike is not interrupted') > + > + lsched.move_end(move_count) > + ok, new_timeout = f:join() > + test:ok(ok and new_timeout, 'ref finally started') > + lsched.ref_end(1) > + > + lsched.cfg({sched_ref_quota = old_quota}) > +end > + > +local function test_move_zero_quota(test) > + test:plan(6) > + > + local old_quota = lsched.move_quota > + -- > + -- Zero quota is a valid value. Moreover, it is special. It means the > + -- 0-quoted operation should always be paused in favor of the other > + -- operation. > + -- > + lsched.cfg({sched_move_quota = 0}) > + test:ok(lsched.move_start(big_timeout), 'started move with 0 quota') > + > + local f = fiber.create(function() > + fiber_set_joinable() > + return lsched.ref_start(big_timeout) > + end) > + test:ok(not lsched.move_start(small_timeout), 'can not add more moves if '.. > + 'ref is queued - quota 0') > + > + lsched.move_end(1) > + local ok, new_timeout = f:join() > + test:ok(ok and new_timeout, 'ref is started') > + > + -- Ensure move never starts if there are always refs, when quota is 0. > + f = fiber.create(function() > + fiber_set_joinable() > + return lsched.move_start(big_timeout) > + end) > + local ref_count = lsched.ref_quota + 3 > + -- Start from 2 to account the already existing ref. > + for _ = 2, ref_count do > + -- Start one new ref. > + assert(lsched.ref_start(big_timeout)) > + -- Start second new ref. > + assert(lsched.ref_start(big_timeout)) > + -- End first ref. > + lsched.ref_end(1) > + -- In result the refs are always interleaving - no time for moves at > + -- all. > + end > + test:is(lsched.ref_count, ref_count, 'refs exceed quota') > + test:ok(lsched.ref_strike > ref_count, 'strike is not interrupted') > + > + lsched.ref_end(ref_count) > + ok, new_timeout = f:join() > + test:ok(ok and new_timeout, 'move finally started') > + lsched.move_end(1) > + > + lsched.cfg({sched_move_quota = old_quota}) > +end > + > +test:plan(11) > + > +-- Change default values. Move is 1 by default, which would reduce the number of > +-- possible tests. Ref is decreased to speed the tests up. 
> +lsched.cfg({sched_ref_quota = 10, sched_move_quota = 5}) > + > +test:test('basic', test_basic) > +test:test('negative timeout', test_negative_timeout) > +test:test('ref gc', test_move_gc_ref) > +test:test('ref strike', test_ref_strike) > +test:test('move strike', test_move_strike) > +test:test('ref add quota', test_ref_increase_quota) > +test:test('move add quota', test_move_increase_quota) > +test:test('ref decrease quota', test_ref_decrease_quota) > +test:test('move decrease quota', test_move_decrease_quota) > +test:test('ref zero quota', test_ref_zero_quota) > +test:test('move zero quota', test_move_zero_quota) > + > +os.exit(test:check() and 0 or 1) > diff --git a/test/unit/config.result b/test/unit/config.result > index e0b2482..9df3bf1 100644 > --- a/test/unit/config.result > +++ b/test/unit/config.result > @@ -597,3 +597,62 @@ cfg.collect_bucket_garbage_interval = 100 > _ = lcfg.check(cfg) > --- > ... > +-- > +-- gh-147: router map-reduce. It adds scheduler options on the storage. > +-- > +cfg.sched_ref_quota = 100 > +--- > +... > +_ = lcfg.check(cfg) > +--- > +... > +cfg.sched_ref_quota = 1 > +--- > +... > +_ = lcfg.check(cfg) > +--- > +... > +cfg.sched_ref_quota = 0 > +--- > +... > +_ = lcfg.check(cfg) > +--- > +... > +cfg.sched_ref_quota = -1 > +--- > +... > +util.check_error(lcfg.check, cfg) > +--- > +- Scheduler storage ref quota must be non-negative number > +... > +cfg.sched_ref_quota = nil > +--- > +... > +cfg.sched_move_quota = 100 > +--- > +... > +_ = lcfg.check(cfg) > +--- > +... > +cfg.sched_move_quota = 1 > +--- > +... > +_ = lcfg.check(cfg) > +--- > +... > +cfg.sched_move_quota = 0 > +--- > +... > +_ = lcfg.check(cfg) > +--- > +... > +cfg.sched_move_quota = -1 > +--- > +... > +util.check_error(lcfg.check, cfg) > +--- > +- Scheduler bucket move quota must be non-negative number > +... > +cfg.sched_move_quota = nil > +--- > +... > diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua > index a1c9f07..473e460 100644 > --- a/test/unit/config.test.lua > +++ b/test/unit/config.test.lua > @@ -241,3 +241,26 @@ cfg.rebalancer_max_sending = nil > -- > cfg.collect_bucket_garbage_interval = 100 > _ = lcfg.check(cfg) > + > +-- > +-- gh-147: router map-reduce. It adds scheduler options on the storage. 
> +-- > +cfg.sched_ref_quota = 100 > +_ = lcfg.check(cfg) > +cfg.sched_ref_quota = 1 > +_ = lcfg.check(cfg) > +cfg.sched_ref_quota = 0 > +_ = lcfg.check(cfg) > +cfg.sched_ref_quota = -1 > +util.check_error(lcfg.check, cfg) > +cfg.sched_ref_quota = nil > + > +cfg.sched_move_quota = 100 > +_ = lcfg.check(cfg) > +cfg.sched_move_quota = 1 > +_ = lcfg.check(cfg) > +cfg.sched_move_quota = 0 > +_ = lcfg.check(cfg) > +cfg.sched_move_quota = -1 > +util.check_error(lcfg.check, cfg) > +cfg.sched_move_quota = nil > diff --git a/vshard/cfg.lua b/vshard/cfg.lua > index 63d5414..30f8794 100644 > --- a/vshard/cfg.lua > +++ b/vshard/cfg.lua > @@ -274,6 +274,14 @@ local cfg_template = { > type = 'string', name = 'Discovery mode: on, off, once', > is_optional = true, default = 'on', check = check_discovery_mode > }, > + sched_ref_quota = { > + name = 'Scheduler storage ref quota', type = 'non-negative number', > + is_optional = true, default = consts.DEFAULT_SCHED_REF_QUOTA > + }, > + sched_move_quota = { > + name = 'Scheduler bucket move quota', type = 'non-negative number', > + is_optional = true, default = consts.DEFAULT_SCHED_MOVE_QUOTA > + }, > } > > -- > diff --git a/vshard/consts.lua b/vshard/consts.lua > index 0ffe0e2..47a893b 100644 > --- a/vshard/consts.lua > +++ b/vshard/consts.lua > @@ -41,6 +41,11 @@ return { > GC_BACKOFF_INTERVAL = 5, > RECOVERY_BACKOFF_INTERVAL = 5, > COLLECT_LUA_GARBAGE_INTERVAL = 100; > + DEFAULT_BUCKET_SEND_TIMEOUT = 10, > + DEFAULT_BUCKET_RECV_TIMEOUT = 10, > + > + DEFAULT_SCHED_REF_QUOTA = 300, > + DEFAULT_SCHED_MOVE_QUOTA = 1, > > DISCOVERY_IDLE_INTERVAL = 10, > DISCOVERY_WORK_INTERVAL = 1, > diff --git a/vshard/storage/CMakeLists.txt b/vshard/storage/CMakeLists.txt > index 7c1e97d..396664a 100644 > --- a/vshard/storage/CMakeLists.txt > +++ b/vshard/storage/CMakeLists.txt > @@ -1,2 +1,2 @@ > -install(FILES init.lua reload_evolution.lua ref.lua > +install(FILES init.lua reload_evolution.lua ref.lua sched.lua > DESTINATION ${TARANTOOL_INSTALL_LUADIR}/vshard/storage) > diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua > index 2957f48..31f668f 100644 > --- a/vshard/storage/init.lua > +++ b/vshard/storage/init.lua > @@ -17,7 +17,7 @@ if rawget(_G, MODULE_INTERNALS) then > 'vshard.replicaset', 'vshard.util', > 'vshard.storage.reload_evolution', > 'vshard.lua_gc', 'vshard.rlist', 'vshard.registry', > - 'vshard.heap', 'vshard.storage.ref', > + 'vshard.heap', 'vshard.storage.ref', 'vshard.storage.sched', > } > for _, module in pairs(vshard_modules) do > package.loaded[module] = nil > @@ -32,6 +32,7 @@ local util = require('vshard.util') > local lua_gc = require('vshard.lua_gc') > local lregistry = require('vshard.registry') > local lref = require('vshard.storage.ref') > +local lsched = require('vshard.storage.sched') > local reload_evolution = require('vshard.storage.reload_evolution') > local fiber_cond_wait = util.fiber_cond_wait > local bucket_ref_new > @@ -1142,16 +1143,33 @@ local function bucket_recv_xc(bucket_id, from, data, opts) > return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, msg, > from) > end > - if lref.count > 0 then > - return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED) > - end > if is_this_replicaset_locked() then > return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED) > end > if not bucket_receiving_quota_add(-1) then > return nil, lerror.vshard(lerror.code.TOO_MANY_RECEIVING) > end > - _bucket:insert({bucket_id, recvg, from}) > + local timeout = opts and opts.timeout or > + consts.DEFAULT_BUCKET_SEND_TIMEOUT > + local 
ok, err = lsched.move_start(timeout) > + if not ok then > + return nil, err > + end > + assert(lref.count == 0) > + -- Move schedule is done only for the time of _bucket update. > + -- The reason is that one bucket_send() calls bucket_recv() on the > + -- remote storage multiple times. If the latter would schedule new moves > + -- on each call, it could happen that the scheduler would block it in > + -- favor of refs right in the middle of bucket_send(). > + -- It would lead to a deadlock, because refs won't be able to start - > + -- the bucket won't be writable. > + -- This way still provides fair scheduling, but does not have the > + -- described issue. > + ok, err = pcall(_bucket.insert, _bucket, {bucket_id, recvg, from}) > + lsched.move_end(1) > + if not ok then > + return nil, lerror.make(err) > + end > elseif b.status ~= recvg then > local msg = string.format("bucket state is changed: was receiving, ".. > "became %s", b.status) > @@ -1434,7 +1452,7 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard) > ref.rw_lock = true > exception_guard.ref = ref > exception_guard.drop_rw_lock = true > - local timeout = opts and opts.timeout or 10 > + local timeout = opts and opts.timeout or consts.DEFAULT_BUCKET_SEND_TIMEOUT > local deadline = fiber_clock() + timeout > while ref.rw ~= 0 do > timeout = deadline - fiber_clock() > @@ -1446,9 +1464,6 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard) > > local _bucket = box.space._bucket > local bucket = _bucket:get({bucket_id}) > - if lref.count > 0 then > - return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED) > - end > if is_this_replicaset_locked() then > return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED) > end > @@ -1468,7 +1483,25 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard) > local idx = M.shard_index > local bucket_generation = M.bucket_generation > local sendg = consts.BUCKET.SENDING > - _bucket:replace({bucket_id, sendg, destination}) > + > + local ok, err = lsched.move_start(timeout) > + if not ok then > + return nil, err > + end > + assert(lref.count == 0) > + -- Move is scheduled only for the time of _bucket update because: > + -- > + -- * it is consistent with bucket_recv() (see its comments); > + -- > + -- * gives the same effect as if move was in the scheduler for the whole > + -- bucket_send() time, because refs won't be able to start anyway - the > + -- bucket is not writable. > + ok, err = pcall(_bucket.replace, _bucket, {bucket_id, sendg, destination}) > + lsched.move_end(1) > + if not ok then > + return nil, lerror.make(err) > + end > + > -- From this moment the bucket is SENDING. Such a status is > -- even stronger than the lock. 
>      ref.rw_lock = false
> @@ -2542,6 +2575,7 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
>          M.bucket_on_replace = bucket_generation_increment
>      end
>
> +    lsched.cfg(vshard_cfg)
>      lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
>      lreplicaset.outdate_replicasets(M.replicasets)
>      M.replicasets = new_replicasets
> diff --git a/vshard/storage/ref.lua b/vshard/storage/ref.lua
> index 7589cb9..2daad6b 100644
> --- a/vshard/storage/ref.lua
> +++ b/vshard/storage/ref.lua
> @@ -33,6 +33,7 @@ local lregistry = require('vshard.registry')
>  local fiber_clock = lfiber.clock
>  local fiber_yield = lfiber.yield
>  local DEADLINE_INFINITY = lconsts.DEADLINE_INFINITY
> +local TIMEOUT_INFINITY = lconsts.TIMEOUT_INFINITY
>  local LUA_CHUNK_SIZE = lconsts.LUA_CHUNK_SIZE
>
>  --
> @@ -88,6 +89,7 @@ local function ref_session_new(sid)
>      -- Cache global session storages as upvalues to save on M indexing.
>      local global_heap = M.session_heap
>      local global_map = M.session_map
> +    local sched = lregistry.storage_sched
>
>      local function ref_session_discount(self, del_count)
>          local new_count = M.count - del_count
> @@ -97,6 +99,8 @@ local function ref_session_new(sid)
>          new_count = count - del_count
>          assert(new_count >= 0)
>          count = new_count
> +
> +        sched.ref_end(del_count)
>      end
>
>      local function ref_session_update_deadline(self)
> @@ -310,10 +314,17 @@ local function ref_add(rid, sid, timeout)
>      local deadline = now + timeout
>      local ok, err, session
>      local storage = lregistry.storage
> +    local sched = lregistry.storage_sched
> +
> +    timeout, err = sched.ref_start(timeout)
> +    if not timeout then
> +        return nil, err
> +    end
> +
>      while not storage.bucket_are_all_rw() do
>          ok, err = storage.bucket_generation_wait(timeout)
>          if not ok then
> -            return nil, err
> +            goto fail_sched
>          end
>          now = fiber_clock()
>          timeout = deadline - now
> @@ -322,7 +333,13 @@ local function ref_add(rid, sid, timeout)
>      if not session then
>          session = ref_session_new(sid)
>      end
> -    return session:add(rid, deadline, now)
> +    ok, err = session:add(rid, deadline, now)
> +    if ok then
> +        return true
> +    end
> +::fail_sched::
> +    sched.ref_end(1)
> +    return nil, err
>  end
>
>  local function ref_use(rid, sid)
> @@ -341,6 +358,14 @@ local function ref_del(rid, sid)
>      return session:del(rid)
>  end
>
> +local function ref_next_deadline()
> +    local session = M.session_heap:top()
> +    if not session then
> +        return fiber_clock() + TIMEOUT_INFINITY
> +    end

Does it make sense? fiber_clock() + inf is still inf.

> +    return session.deadline
> +end
> +
>  local function ref_kill_session(sid)
>      local session = M.session_map[sid]
>      if session then
> @@ -366,6 +391,7 @@ M.add = ref_add
>  M.use = ref_use
>  M.cfg = ref_cfg
>  M.kill = ref_kill_session
> +M.next_deadline = ref_next_deadline
>  lregistry.storage_ref = M
>
>  return M
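To illustrate: since DEADLINE_INFINITY is already imported at the top of this
file, the empty-heap case could probably skip the clock call entirely. An
untested sketch of what I mean:

    local function ref_next_deadline()
        -- DEADLINE_INFINITY already compares greater than any real
        -- deadline, so no fiber_clock() arithmetic is needed here.
        local session = M.session_heap:top()
        return session and session.deadline or DEADLINE_INFINITY
    end
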
> +-- Without a scheduler it would be possible to always keep at least one ref on
> +-- the storage and block bucket moves forever. Or vice versa - during
> +-- rebalancing block all incoming refs for the entire time of data migration,
> +-- essentially making map-reduce not usable since it heavily depends on refs.
> +--
> +-- The scheduler divides storage time between refs and moves so both of them
> +-- can execute without blocking each other. Division proportions depend on the
> +-- configuration settings.
> +--
> +-- The idea of non-blockage is based on quotas and strikes. Move and ref both
> +-- have quotas. When one op executes more than quota requests in a row (makes
> +-- a strike) while the other op has queued requests, the first op stops
> +-- accepting new requests until the other op executes.
> +--
> +
> +local MODULE_INTERNALS = '__module_vshard_storage_sched'
> +-- Update this when changing the behaviour of anything in the file, to be able
> +-- to reload.
> +local MODULE_VERSION = 1
> +
> +local lfiber = require('fiber')
> +local lerror = require('vshard.error')
> +local lconsts = require('vshard.consts')
> +local lregistry = require('vshard.registry')
> +local lutil = require('vshard.util')
> +local fiber_clock = lfiber.clock
> +local fiber_cond_wait = lutil.fiber_cond_wait
> +local fiber_is_self_canceled = lutil.fiber_is_self_canceled
> +
> +local M = rawget(_G, MODULE_INTERNALS)
> +if not M then
> +    M = {
> +        ---------------- Common module attributes ----------------
> +        module_version = MODULE_VERSION,
> +        -- Scheduler condition is signaled every time anything significant
> +        -- happens - the count of an operation type drops to 0, a quota is
> +        -- increased, etc.
> +        cond = lfiber.cond(),
> +
> +        -------------------------- Refs --------------------------
> +        -- Number of ref requests waiting for start.
> +        ref_queue = 0,
> +        -- Number of ref requests being executed. It is the same as ref's
> +        -- module counter, but is duplicated here for the sake of isolation
> +        -- and symmetry with moves.
> +        ref_count = 0,
> +        -- Number of ref requests executed in a row. When it becomes bigger
> +        -- than the quota, any next queued move blocks new refs.
> +        ref_strike = 0,
> +        ref_quota = lconsts.DEFAULT_SCHED_REF_QUOTA,
> +
> +        ------------------------- Moves --------------------------
> +        -- Number of move requests waiting for start.
> +        move_queue = 0,
> +        -- Number of move requests being executed.
> +        move_count = 0,
> +        -- Number of move requests executed in a row. When it becomes bigger
> +        -- than the quota, any next queued ref blocks new moves.
> +        move_strike = 0,
> +        move_quota = lconsts.DEFAULT_SCHED_MOVE_QUOTA,
> +    }
> +else
> +    return M
> +end
> +
> +local function sched_wait_anything(timeout)
> +    return fiber_cond_wait(M.cond, timeout)
> +end
> +
> +--
> +-- Return the remaining timeout in case there was a yield. This helps to avoid
> +-- an extra clock get in the caller code if there were no yields.
> +--
> +local function sched_ref_start(timeout)
> +    local deadline = fiber_clock() + timeout

Let's do it after the fast check to eliminate an excess fiber_clock()
call. There are several similar places below - please fix them as
well.

> +    local ok, err
> +    -- Fast-path. Moves are extremely rare. No need to inc-dec the ref queue
> +    -- then, nor to start any loops.
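To be concrete about the fiber_clock() remark, I mean something like
the following. Declaring the locals before the first goto keeps the
jumps legal - Lua forbids a goto that enters the scope of a new local.
A sketch, only the head of the function is shown:

    local function sched_ref_start(timeout)
        local deadline, ok, err
        -- Fast-path. Moves are extremely rare. No need to inc-dec the ref
        -- queue then, nor to take the clock.
        if M.move_count == 0 and M.move_queue == 0 then
            goto success
        end
        deadline = fiber_clock() + timeout
        M.ref_queue = M.ref_queue + 1
        -- ... the rest is unchanged ...

The same trick should work in sched_move_start().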
> +    if M.move_count == 0 and M.move_queue == 0 then
> +        goto success
> +    end
> +
> +    M.ref_queue = M.ref_queue + 1
> +
> +::retry::
> +    if M.move_count > 0 then
> +        goto wait_and_retry
> +    end
> +    -- Even if the move count is zero, the time usage still must be kept fair.
> +    -- It does not matter when the moves have no quota at all - then it is
> +    -- fine to ignore them infinitely until all refs end voluntarily.
> +    if M.move_queue > 0 and M.ref_strike >= M.ref_quota and
> +       M.move_quota > 0 then
> +        goto wait_and_retry
> +    end
> +
> +    M.ref_queue = M.ref_queue - 1
> +
> +::success::
> +    M.ref_count = M.ref_count + 1
> +    M.ref_strike = M.ref_strike + 1
> +    M.move_strike = 0
> +    do return timeout end
> +
> +::wait_and_retry::
> +    ok, err = sched_wait_anything(timeout)
> +    if not ok then
> +        M.ref_queue = M.ref_queue - 1
> +        return nil, err
> +    end
> +    timeout = deadline - fiber_clock()
> +    goto retry
> +end
> +
> +local function sched_ref_end(count)
> +    count = M.ref_count - count
> +    M.ref_count = count
> +    if count == 0 and M.move_queue > 0 then
> +        M.cond:broadcast()
> +    end
> +end
> +
> +--
> +-- Return the remaining timeout in case there was a yield. This helps to avoid
> +-- an extra clock get in the caller code if there were no yields.
> +--
> +local function sched_move_start(timeout)
> +    local deadline = fiber_clock() + timeout
> +    local ok, err, ref_deadline
> +    local lref = lregistry.storage_ref
> +    -- Fast-path. Refs are not extremely rare *when used*. But they are not
> +    -- expected to be used in a lot of installations. So most of the time the
> +    -- moves should work right away.
> +    if M.ref_count == 0 and M.ref_queue == 0 then
> +        goto success
> +    end
> +
> +    M.move_queue = M.move_queue + 1
> +
> +::retry::
> +    if M.ref_count > 0 then
> +        ref_deadline = lref.next_deadline()
> +        if ref_deadline < deadline then
> +            timeout = ref_deadline - fiber_clock()
> +        end
> +        ok, err = sched_wait_anything(timeout)
> +        timeout = deadline - fiber_clock()
> +        if ok then
> +            goto retry
> +        end
> +        if fiber_is_self_canceled() then
> +            goto fail
> +        end
> +        -- Even if the timeout has expired already (or was 0 from the
> +        -- beginning), it is still possible the move can be started if all the
> +        -- present refs are expired too and can be collected.
> +        lref.gc()
> +        -- GC could yield - need to refetch the clock.
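A tiny readability nit for sched_ref_end() above (and sched_move_end()
below): the 'count' parameter is reassigned to mean "the new global
counter", so at the broadcast check the name no longer matches the
value. Maybe something like this - the behaviour is the same, the
assert is my addition, drop it if it is too paranoid:

    local function sched_ref_end(count)
        local new_count = M.ref_count - count
        assert(new_count >= 0)
        M.ref_count = new_count
        if new_count == 0 and M.move_queue > 0 then
            M.cond:broadcast()
        end
    end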
> +        timeout = deadline - fiber_clock()
> +        if M.ref_count > 0 then
> +            if timeout < 0 then
> +                goto fail
> +            end
> +            goto retry
> +        end
> +    end
> +
> +    if M.ref_queue > 0 and M.move_strike >= M.move_quota and
> +       M.ref_quota > 0 then
> +        ok, err = sched_wait_anything(timeout)
> +        if not ok then
> +            goto fail
> +        end
> +        timeout = deadline - fiber_clock()
> +        goto retry
> +    end
> +
> +    M.move_queue = M.move_queue - 1
> +
> +::success::
> +    M.move_count = M.move_count + 1
> +    M.move_strike = M.move_strike + 1
> +    M.ref_strike = 0
> +    do return timeout end
> +
> +::fail::
> +    M.move_queue = M.move_queue - 1
> +    return nil, err
> +end
> +
> +local function sched_move_end(count)
> +    count = M.move_count - count
> +    M.move_count = count
> +    if count == 0 and M.ref_queue > 0 then
> +        M.cond:broadcast()
> +    end
> +end
> +
> +local function sched_cfg(cfg)
> +    local new_ref_quota = cfg.sched_ref_quota
> +    local new_move_quota = cfg.sched_move_quota
> +
> +    if new_ref_quota then
> +        if new_ref_quota > M.ref_quota then
> +            M.cond:broadcast()
> +        end
> +        M.ref_quota = new_ref_quota
> +    end
> +    if new_move_quota then
> +        if new_move_quota > M.move_quota then
> +            M.cond:broadcast()
> +        end
> +        M.move_quota = new_move_quota
> +    end
> +end
> +
> +M.ref_start = sched_ref_start
> +M.ref_end = sched_ref_end
> +M.move_start = sched_move_start
> +M.move_end = sched_move_end
> +M.cfg = sched_cfg
> +lregistry.storage_sched = M
> +
> +return M
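Other than the comments above, the scheduler looks good to me. One
more thought for the docs request: a small configuration example could
help users pick values, something like this - the numbers are made up
just for illustration, the rest of the usual storage configuration is
omitted and instance_uuid is a placeholder:

    vshard.storage.cfg({
        -- Let up to 300 refs start in a row while a move is queued...
        sched_ref_quota = 300,
        -- ...then let 1 bucket move through, and so on.
        sched_move_quota = 1,
        -- ... sharding topology and other regular options go here ...
    }, instance_uuid)

And a question: are the new options validated anywhere (type,
non-negativity)? I did not find it in this patch - maybe it is done in
the cfg template.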