From: Oleg Babin via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>,
	tarantool-patches@dev.tarantool.org,
	yaroslav.dynnikov@tarantool.org
Subject: Re: [Tarantool-patches] [PATCH vshard 10/11] sched: introduce vshard.storage.sched module
Date: Wed, 24 Feb 2021 13:28:08 +0300	[thread overview]
Message-ID: <5e011bc2-426b-e59a-f165-fef74f998b18@tarantool.org> (raw)
In-Reply-To: <02df9747667f254ee5734cfafa681eead566f212.1614039039.git.v.shpilevoy@tarantool.org>

Thanks for your patch. This is a brief review - I hope to take another
look at it later.

Consider the 2 comments below.


On 23.02.2021 03:15, Vladislav Shpilevoy wrote:
> The 'vshard.storage.sched' module ensures that two incompatible
> operations - storage refs and bucket moves - share storage time
> fairly.
>
> Storage refs are going to be used by the map-reduce API to preserve
> data consistency while map requests are in progress on all
> storages.
>
> This means storage refs will be used as commonly as bucket refs and
> should not block the rebalancer. However, it is hard not to block
> the rebalancer forever if there are always refs on the storage.
>
> With bucket refs it was easy - one bucket being temporarily blocked
> is not a big deal. So the rebalancer always has a higher priority
> than bucket refs, and it still does not block requests for the other
> buckets nor read-only requests on the subject bucket.
>
> With storage refs, giving the rebalancer a higher priority would
> make map-reduce requests die in the entire cluster for the whole
> duration of rebalancing, which can be as long as hours or even days.
> That wouldn't be acceptable.
>
> The new module vshard.storage.sched shares time between moves and
> storage refs fairly. Both get time to execute, in proportions
> configured by the user. The right proportions depend on how big the
> buckets are and how long the map-reduce requests are expected to
> take. Typically, the longer a request is, the smaller its quota
> should be.
>
> The patch introduces new storage options to configure the
> scheduling.
>
> Part of #147
>
> @TarantoolBot document
> Title: vshard.storage.cfg new options - sched_ref_quota and sched_move_quota
>
> There are new options for `vshard.storage.cfg`: `sched_ref_quota`
> and `sched_move_quota`. The options control how much time should
> be given to storage refs and bucket moves - two incompatible but
> important operations.
>
> Storage refs are used by the router's map-reduce API. Each map-reduce
> call creates storage refs on all storages to prevent data migration
> on them for the duration of the map execution.
>
> Bucket moves are used by the rebalancer. Obviously, they are
> incompatible with the storage refs.
>
> If vshard always preferred one operation to the other, it would lead
> to starvation of one of them. For example, if storage refs were
> preferred, rebalancing might simply never work, since under constant
> map-reduce load there would always be refs. If bucket moves were
> preferred, storage refs (and therefore map-reduce) would stop for
> the entire rebalancing time, which can be quite long (hours, days).
>
> The new options control how much time is given to each operation.
>
> `sched_ref_quota` tells how many storage refs (and therefore
> map-reduce requests) can be executed on the storage in a row, if
> there are pending bucket moves, before new refs are blocked to let
> the moves work. The default value is 300.
>
> `sched_move_quota` controls the same, but vice versa: how many
> bucket moves can be done in a row if there are pending refs. The
> default value is 1.
>
> Map-reduce requests are expected to be much shorter than bucket
> moves, so storage refs by default have a higher quota.
>
> Here is an example of how it works. Assume map-reduces start. They
> execute one after another, 150 requests in a row. Now the rebalancer
> wakes up and wants to move some buckets. It stands in a queue and
> waits for the storage refs to be gone.
>
> But the ref quota is not reached yet, so the storage can still
> execute 150 more map-reduces even with the queued bucket moves;
> only then are new refs blocked, and the moves start.
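>
> Below is a minimal sketch of setting the new options (the values are
> just the defaults; the rest of the regular storage configuration and
> the `instance_uuid` argument are assumed to be defined as usual):
>
>     local vshard = require('vshard')
>     vshard.storage.cfg({
>         -- Up to 300 refs in a row while bucket moves are pending.
>         sched_ref_quota = 300,
>         -- Only 1 bucket move in a row while refs are pending.
>         sched_move_quota = 1,
>         -- ... the usual sharding/storage options go here ...
>     }, instance_uuid)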
> ---
>   test/reload_evolution/storage.result |   2 +-
>   test/storage/ref.result              |  19 +-
>   test/storage/ref.test.lua            |   9 +-
>   test/storage/scheduler.result        | 410 ++++++++++++++++++++
>   test/storage/scheduler.test.lua      | 178 +++++++++
>   test/unit-tap/ref.test.lua           |   7 +-
>   test/unit-tap/scheduler.test.lua     | 555 +++++++++++++++++++++++++++
>   test/unit/config.result              |  59 +++
>   test/unit/config.test.lua            |  23 ++
>   vshard/cfg.lua                       |   8 +
>   vshard/consts.lua                    |   5 +
>   vshard/storage/CMakeLists.txt        |   2 +-
>   vshard/storage/init.lua              |  54 ++-
>   vshard/storage/ref.lua               |  30 +-
>   vshard/storage/sched.lua             | 231 +++++++++++
>   15 files changed, 1567 insertions(+), 25 deletions(-)
>   create mode 100644 test/storage/scheduler.result
>   create mode 100644 test/storage/scheduler.test.lua
>   create mode 100755 test/unit-tap/scheduler.test.lua
>   create mode 100644 vshard/storage/sched.lua
>
> diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result
> index c4a0cdd..77010a2 100644
> --- a/test/reload_evolution/storage.result
> +++ b/test/reload_evolution/storage.result
> @@ -258,7 +258,7 @@ ok, err = vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2],
>   ...
>   assert(not ok and err.message)
>   ---
> -- Storage is referenced
> +- Timeout exceeded
>   ...
>   lref.del(0, 0)
>   ---
> diff --git a/test/storage/ref.result b/test/storage/ref.result
> index d5f4166..59f07f4 100644
> --- a/test/storage/ref.result
> +++ b/test/storage/ref.result
> @@ -84,18 +84,22 @@ big_timeout = 1000000
>   small_timeout = 0.001
>    | ---
>    | ...
> +
> +timeout = 0.01
> + | ---
> + | ...
>   lref.add(rid, sid, big_timeout)
>    | ---
>    | - true
>    | ...
>   -- Send fails.
>   ok, err = vshard.storage.bucket_send(1, util.replicasets[2],                    \
> -                                     {timeout = big_timeout})
> +                                     {timeout = timeout})
>    | ---
>    | ...
>   assert(not ok and err.message)
>    | ---
> - | - Storage is referenced
> + | - Timeout exceeded
>    | ...
>   lref.use(rid, sid)
>    | ---
> @@ -103,12 +107,12 @@ lref.use(rid, sid)
>    | ...
>   -- Still fails - use only makes ref undead until it is deleted explicitly.
>   ok, err = vshard.storage.bucket_send(1, util.replicasets[2],                    \
> -                                     {timeout = big_timeout})
> +                                     {timeout = timeout})
>    | ---
>    | ...
>   assert(not ok and err.message)
>    | ---
> - | - Storage is referenced
> + | - Timeout exceeded
>    | ...
>   
>   _ = test_run:switch('storage_2_a')
> @@ -118,13 +122,16 @@ _ = test_run:switch('storage_2_a')
>   big_timeout = 1000000
>    | ---
>    | ...
> +timeout = 0.01
> + | ---
> + | ...
>   ok, err = vshard.storage.bucket_send(1501, util.replicasets[1],                 \
> -                                     {timeout = big_timeout})
> +                                     {timeout = timeout})
>    | ---
>    | ...
>   assert(not ok and err.message)
>    | ---
> - | - Storage is referenced
> + | - Timeout exceeded
>    | ...
>   
>   --
> diff --git a/test/storage/ref.test.lua b/test/storage/ref.test.lua
> index b34a294..24303e2 100644
> --- a/test/storage/ref.test.lua
> +++ b/test/storage/ref.test.lua
> @@ -35,22 +35,25 @@ sid = 0
>   rid = 0
>   big_timeout = 1000000
>   small_timeout = 0.001
> +
> +timeout = 0.01
>   lref.add(rid, sid, big_timeout)
>   -- Send fails.
>   ok, err = vshard.storage.bucket_send(1, util.replicasets[2],                    \
> -                                     {timeout = big_timeout})
> +                                     {timeout = timeout})
>   assert(not ok and err.message)
>   lref.use(rid, sid)
>   -- Still fails - use only makes ref undead until it is deleted explicitly.
>   ok, err = vshard.storage.bucket_send(1, util.replicasets[2],                    \
> -                                     {timeout = big_timeout})
> +                                     {timeout = timeout})
>   assert(not ok and err.message)
>   
>   _ = test_run:switch('storage_2_a')
>   -- Receive (from another replicaset) also fails.
>   big_timeout = 1000000
> +timeout = 0.01
>   ok, err = vshard.storage.bucket_send(1501, util.replicasets[1],                 \
> -                                     {timeout = big_timeout})
> +                                     {timeout = timeout})
>   assert(not ok and err.message)
>   
>   --
> diff --git a/test/storage/scheduler.result b/test/storage/scheduler.result
> new file mode 100644
> index 0000000..0f53e42
> --- /dev/null
> +++ b/test/storage/scheduler.result
> @@ -0,0 +1,410 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
> + | ---
> + | ...
> +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
> + | ---
> + | ...
> +
> +test_run:create_cluster(REPLICASET_1, 'storage')
> + | ---
> + | ...
> +test_run:create_cluster(REPLICASET_2, 'storage')
> + | ---
> + | ...
> +util = require('util')
> + | ---
> + | ...
> +util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
> + | ---
> + | ...
> +util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
> + | ---
> + | ...
> +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
> + | ---
> + | ...
> +util.push_rs_filters(test_run)
> + | ---
> + | ...
> +
> +--
> +-- gh-147: scheduler helps to share time fairly between incompatible but
> +-- necessary operations - storage refs and bucket moves. Refs are used for the
> +-- consistent map-reduce feature, when the whole cluster can be scanned
> +-- without fear that some data may slip past the requests because of the
> +-- rebalancer.
> +--
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +
> +vshard.storage.rebalancer_disable()
> + | ---
> + | ...
> +vshard.storage.bucket_force_create(1, 1500)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +vshard.storage.rebalancer_disable()
> + | ---
> + | ...
> +vshard.storage.bucket_force_create(1501, 1500)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +--
> +-- Bucket_send() uses the scheduler.
> +--
> +lsched = require('vshard.storage.sched')
> + | ---
> + | ...
> +assert(lsched.move_strike == 0)
> + | ---
> + | - true
> + | ...
> +assert(lsched.move_count == 0)
> + | ---
> + | - true
> + | ...
> +big_timeout = 1000000
> + | ---
> + | ...
> +big_timeout_opts = {timeout = big_timeout}
> + | ---
> + | ...
> +vshard.storage.bucket_send(1, util.replicasets[2], big_timeout_opts)
> + | ---
> + | - true
> + | ...
> +assert(lsched.move_strike == 1)
> + | ---
> + | - true
> + | ...
> +assert(lsched.move_count == 0)
> + | ---
> + | - true
> + | ...
> +wait_bucket_is_collected(1)
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +lsched = require('vshard.storage.sched')
> + | ---
> + | ...
> +--
> +-- Bucket_recv() uses the scheduler.
> +--
> +assert(lsched.move_strike == 1)
> + | ---
> + | - true
> + | ...
> +assert(lsched.move_count == 0)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- When move is in progress, it is properly accounted.
> +--
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +big_timeout = 1000000
> + | ---
> + | ...
> +big_timeout_opts = {timeout = big_timeout}
> + | ---
> + | ...
> +ok, err = nil
> + | ---
> + | ...
> +assert(lsched.move_strike == 1)
> + | ---
> + | - true
> + | ...
> +_ = fiber.create(function()                                                     \
> +    ok, err = vshard.storage.bucket_send(1, util.replicasets[1],                \
> +                                         big_timeout_opts)                      \
> +end)
> + | ---
> + | ...
> +-- Strike increase does not mean the move finished. It means it was successfully
> +-- scheduled.
> +assert(lsched.move_strike == 2)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return lsched.move_strike == 2 end)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Ref is not allowed during move.
> +--
> +small_timeout = 0.000001
> + | ---
> + | ...
> +lref = require('vshard.storage.ref')
> + | ---
> + | ...
> +ok, err = lref.add(0, 0, small_timeout)
> + | ---
> + | ...
> +assert(not ok)
> + | ---
> + | - true
> + | ...
> +err.message
> + | ---
> + | - Timeout exceeded
> + | ...
> +-- Queue it to wait until the move is done.
> +ok, err = nil
> + | ---
> + | ...
> +_ = fiber.create(function() ok, err = lref.add(0, 0, big_timeout) end)
> + | ---
> + | ...
> +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return ok or err end)
> + | ---
> + | - true
> + | ...
> +ok, err
> + | ---
> + | - true
> + | - null
> + | ...
> +assert(lsched.move_count == 0)
> + | ---
> + | - true
> + | ...
> +wait_bucket_is_collected(1)
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +test_run:wait_cond(function() return ok or err end)
> + | ---
> + | - true
> + | ...
> +ok, err
> + | ---
> + | - true
> + | - null
> + | ...
> +assert(lsched.move_count == 0)
> + | ---
> + | - true
> + | ...
> +assert(lsched.ref_count == 1)
> + | ---
> + | - true
> + | ...
> +lref.del(0, 0)
> + | ---
> + | - true
> + | ...
> +assert(lsched.ref_count == 0)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Refs can't block sends infinitely. The scheduler must be fair and share time
> +-- between ref/move.
> +--
> +do_refs = true
> + | ---
> + | ...
> +ref_worker_count = 10
> + | ---
> + | ...
> +function ref_worker()                                                           \
> +    while do_refs do                                                            \
> +        lref.add(0, 0, big_timeout)                                             \
> +        fiber.sleep(small_timeout)                                              \
> +        lref.del(0, 0)                                                          \
> +    end                                                                         \
> +    ref_worker_count = ref_worker_count - 1                                     \
> +end
> + | ---
> + | ...
> +-- Simulate many fibers doing something with a ref being kept.
> +for i = 1, ref_worker_count do fiber.create(ref_worker) end
> + | ---
> + | ...
> +assert(lref.count > 0)
> + | ---
> + | - true
> + | ...
> +assert(lsched.ref_count > 0)
> + | ---
> + | - true
> + | ...
> +-- Ensure it passes with default opts (which are greatly unfair to moves).
> +-- This is important: moves are expected to be much longer than refs and must
> +-- not happen too often while ref load is in progress. But they still should
> +-- eventually be processed.
> +bucket_count = 100
> + | ---
> + | ...
> +bucket_id = 1
> + | ---
> + | ...
> +bucket_worker_count = 5
> + | ---
> + | ...
> +function bucket_worker()                                                        \
> +    while bucket_id <= bucket_count do                                          \
> +        local id = bucket_id                                                    \
> +        bucket_id = bucket_id + 1                                               \
> +        assert(vshard.storage.bucket_send(id, util.replicasets[2]))             \
> +    end                                                                         \
> +    bucket_worker_count = bucket_worker_count - 1                               \
> +end
> + | ---
> + | ...
> +-- Simulate many rebalancer fibers like when max_sending is increased.
> +for i = 1, bucket_worker_count do fiber.create(bucket_worker) end
> + | ---
> + | ...
> +test_run:wait_cond(function() return bucket_worker_count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +do_refs = false
> + | ---
> + | ...
> +test_run:wait_cond(function() return ref_worker_count == 0 end)
> + | ---
> + | - true
> + | ...
> +assert(lref.count == 0)
> + | ---
> + | - true
> + | ...
> +assert(lsched.ref_count == 0)
> + | ---
> + | - true
> + | ...
> +
> +for i = 1, bucket_count do wait_bucket_is_collected(i) end
> + | ---
> + | ...
> +
> +--
> +-- Refs can't block recvs infinitely.
> +--
> +do_refs = true
> + | ---
> + | ...
> +for i = 1, ref_worker_count do fiber.create(ref_worker) end
> + | ---
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +bucket_count = 100
> + | ---
> + | ...
> +bucket_id = 1
> + | ---
> + | ...
> +bucket_worker_count = 5
> + | ---
> + | ...
> +function bucket_worker()                                                        \
> +    while bucket_id <= bucket_count do                                          \
> +        local id = bucket_id                                                    \
> +        bucket_id = bucket_id + 1                                               \
> +        assert(vshard.storage.bucket_send(id, util.replicasets[1]))             \
> +    end                                                                         \
> +    bucket_worker_count = bucket_worker_count - 1                               \
> +end
> + | ---
> + | ...
> +for i = 1, bucket_worker_count do fiber.create(bucket_worker) end
> + | ---
> + | ...
> +test_run:wait_cond(function() return bucket_worker_count == 0 end)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_1_a')
> + | ---
> + | ...
> +do_refs = false
> + | ---
> + | ...
> +test_run:wait_cond(function() return ref_worker_count == 0 end)
> + | ---
> + | - true
> + | ...
> +assert(lref.count == 0)
> + | ---
> + | - true
> + | ...
> +assert(lsched.ref_count == 0)
> + | ---
> + | - true
> + | ...
> +
> +_ = test_run:switch('storage_2_a')
> + | ---
> + | ...
> +for i = 1, bucket_count do wait_bucket_is_collected(i) end
> + | ---
> + | ...
> +
> +_ = test_run:switch("default")
> + | ---
> + | ...
> +test_run:drop_cluster(REPLICASET_2)
> + | ---
> + | ...
> +test_run:drop_cluster(REPLICASET_1)
> + | ---
> + | ...
> +_ = test_run:cmd('clear filter')
> + | ---
> + | ...
> diff --git a/test/storage/scheduler.test.lua b/test/storage/scheduler.test.lua
> new file mode 100644
> index 0000000..8628f0e
> --- /dev/null
> +++ b/test/storage/scheduler.test.lua
> @@ -0,0 +1,178 @@
> +test_run = require('test_run').new()
> +REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
> +REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
> +
> +test_run:create_cluster(REPLICASET_1, 'storage')
> +test_run:create_cluster(REPLICASET_2, 'storage')
> +util = require('util')
> +util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
> +util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
> +util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
> +util.push_rs_filters(test_run)
> +
> +--
> +-- gh-147: scheduler helps to share time fairly between incompatible but
> +-- necessary operations - storage refs and bucket moves. Refs are used for the
> +-- consistent map-reduce feature, when the whole cluster can be scanned
> +-- without fear that some data may slip past the requests because of the
> +-- rebalancer.
> +--
> +
> +_ = test_run:switch('storage_1_a')
> +
> +vshard.storage.rebalancer_disable()
> +vshard.storage.bucket_force_create(1, 1500)
> +
> +_ = test_run:switch('storage_2_a')
> +vshard.storage.rebalancer_disable()
> +vshard.storage.bucket_force_create(1501, 1500)
> +
> +_ = test_run:switch('storage_1_a')
> +--
> +-- Bucket_send() uses the scheduler.
> +--
> +lsched = require('vshard.storage.sched')
> +assert(lsched.move_strike == 0)
> +assert(lsched.move_count == 0)
> +big_timeout = 1000000
> +big_timeout_opts = {timeout = big_timeout}
> +vshard.storage.bucket_send(1, util.replicasets[2], big_timeout_opts)
> +assert(lsched.move_strike == 1)
> +assert(lsched.move_count == 0)
> +wait_bucket_is_collected(1)
> +
> +_ = test_run:switch('storage_2_a')
> +lsched = require('vshard.storage.sched')
> +--
> +-- Bucket_recv() uses the scheduler.
> +--
> +assert(lsched.move_strike == 1)
> +assert(lsched.move_count == 0)
> +
> +--
> +-- When move is in progress, it is properly accounted.
> +--
> +_ = test_run:switch('storage_1_a')
> +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
> +
> +_ = test_run:switch('storage_2_a')
> +big_timeout = 1000000
> +big_timeout_opts = {timeout = big_timeout}
> +ok, err = nil
> +assert(lsched.move_strike == 1)
> +_ = fiber.create(function()                                                     \
> +    ok, err = vshard.storage.bucket_send(1, util.replicasets[1],                \
> +                                         big_timeout_opts)                      \
> +end)
> +-- Strike increase does not mean the move finished. It means it was successfully
> +-- scheduled.
> +assert(lsched.move_strike == 2)
> +
> +_ = test_run:switch('storage_1_a')
> +test_run:wait_cond(function() return lsched.move_strike == 2 end)
> +
> +--
> +-- Ref is not allowed during move.
> +--
> +small_timeout = 0.000001
> +lref = require('vshard.storage.ref')
> +ok, err = lref.add(0, 0, small_timeout)
> +assert(not ok)
> +err.message
> +-- Queue it to wait until the move is done.
> +ok, err = nil
> +_ = fiber.create(function() ok, err = lref.add(0, 0, big_timeout) end)
> +vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
> +
> +_ = test_run:switch('storage_2_a')
> +test_run:wait_cond(function() return ok or err end)
> +ok, err
> +assert(lsched.move_count == 0)
> +wait_bucket_is_collected(1)
> +
> +_ = test_run:switch('storage_1_a')
> +test_run:wait_cond(function() return ok or err end)
> +ok, err
> +assert(lsched.move_count == 0)
> +assert(lsched.ref_count == 1)
> +lref.del(0, 0)
> +assert(lsched.ref_count == 0)
> +
> +--
> +-- Refs can't block sends infinitely. The scheduler must be fair and share time
> +-- between ref/move.
> +--
> +do_refs = true
> +ref_worker_count = 10
> +function ref_worker()                                                           \
> +    while do_refs do                                                            \
> +        lref.add(0, 0, big_timeout)                                             \
> +        fiber.sleep(small_timeout)                                              \
> +        lref.del(0, 0)                                                          \
> +    end                                                                         \
> +    ref_worker_count = ref_worker_count - 1                                     \
> +end
> +-- Simulate many fibers doing something with a ref being kept.
> +for i = 1, ref_worker_count do fiber.create(ref_worker) end
> +assert(lref.count > 0)
> +assert(lsched.ref_count > 0)
> +-- Ensure it passes with default opts (which are greatly unfair to moves).
> +-- This is important: moves are expected to be much longer than refs and must
> +-- not happen too often while ref load is in progress. But they still should
> +-- eventually be processed.
> +bucket_count = 100
> +bucket_id = 1
> +bucket_worker_count = 5
> +function bucket_worker()                                                        \
> +    while bucket_id <= bucket_count do                                          \
> +        local id = bucket_id                                                    \
> +        bucket_id = bucket_id + 1                                               \
> +        assert(vshard.storage.bucket_send(id, util.replicasets[2]))             \
> +    end                                                                         \
> +    bucket_worker_count = bucket_worker_count - 1                               \
> +end
> +-- Simulate many rebalancer fibers like when max_sending is increased.
> +for i = 1, bucket_worker_count do fiber.create(bucket_worker) end
> +test_run:wait_cond(function() return bucket_worker_count == 0 end)
> +
> +do_refs = false
> +test_run:wait_cond(function() return ref_worker_count == 0 end)
> +assert(lref.count == 0)
> +assert(lsched.ref_count == 0)
> +
> +for i = 1, bucket_count do wait_bucket_is_collected(i) end
> +
> +--
> +-- Refs can't block recvs infinitely.
> +--
> +do_refs = true
> +for i = 1, ref_worker_count do fiber.create(ref_worker) end
> +
> +_ = test_run:switch('storage_2_a')
> +bucket_count = 100
> +bucket_id = 1
> +bucket_worker_count = 5
> +function bucket_worker()                                                        \
> +    while bucket_id <= bucket_count do                                          \
> +        local id = bucket_id                                                    \
> +        bucket_id = bucket_id + 1                                               \
> +        assert(vshard.storage.bucket_send(id, util.replicasets[1]))             \
> +    end                                                                         \
> +    bucket_worker_count = bucket_worker_count - 1                               \
> +end
> +for i = 1, bucket_worker_count do fiber.create(bucket_worker) end
> +test_run:wait_cond(function() return bucket_worker_count == 0 end)
> +
> +_ = test_run:switch('storage_1_a')
> +do_refs = false
> +test_run:wait_cond(function() return ref_worker_count == 0 end)
> +assert(lref.count == 0)
> +assert(lsched.ref_count == 0)
> +
> +_ = test_run:switch('storage_2_a')
> +for i = 1, bucket_count do wait_bucket_is_collected(i) end
> +
> +_ = test_run:switch("default")
> +test_run:drop_cluster(REPLICASET_2)
> +test_run:drop_cluster(REPLICASET_1)
> +_ = test_run:cmd('clear filter')
> diff --git a/test/unit-tap/ref.test.lua b/test/unit-tap/ref.test.lua
> index d987a63..ba95eee 100755
> --- a/test/unit-tap/ref.test.lua
> +++ b/test/unit-tap/ref.test.lua
> @@ -5,6 +5,7 @@ local test = tap.test('cfg')
>   local fiber = require('fiber')
>   local lregistry = require('vshard.registry')
>   local lref = require('vshard.storage.ref')
> +require('vshard.storage.sched')
>   
>   local big_timeout = 1000000
>   local small_timeout = 0.000001
> @@ -19,9 +20,11 @@ local sid3 = 2
>   --
>   
>   --
> --- Refs used storage API to get bucket space state and wait on its changes. But
> --- not important for these unit tests.
> +-- Refs use the storage API to get bucket space state and wait for its
> +-- changes, and the scheduler API to sync with bucket moves. Neither is
> +-- important for these unit tests.
>   --
> +
>   local function bucket_are_all_rw()
>       return true
>   end
> diff --git a/test/unit-tap/scheduler.test.lua b/test/unit-tap/scheduler.test.lua
> new file mode 100755
> index 0000000..0af4f5e
> --- /dev/null
> +++ b/test/unit-tap/scheduler.test.lua
> @@ -0,0 +1,555 @@
> +#!/usr/bin/env tarantool
> +
> +local fiber = require('fiber')
> +local tap = require('tap')
> +local test = tap.test('cfg')
> +local lregistry = require('vshard.registry')
> +local lref = require('vshard.storage.ref')
> +local lsched = require('vshard.storage.sched')
> +
> +local big_timeout = 1000000
> +local small_timeout = 0.000001
> +
> +--
> +-- gh-147: scheduler helps to share time fairly between incompatible but
> +-- necessary operations - storage refs and bucket moves. Refs are used for the
> +-- consistent map-reduce feature, when the whole cluster can be scanned
> +-- without fear that some data may slip past the requests because of the
> +-- rebalancer.
> +--
> +
> +box.cfg{
> +    log = 'log.txt'
> +}
> +-- io.write = function(...) require('log').info(...) end
> +
> +--
> +-- Storage registry is used by the ref module. The ref module is used in the
> +-- tests in order to ensure the scheduler performs ref garbage collection.
> +--
> +local function bucket_are_all_rw()
> +    return true
> +end
> +
> +lregistry.storage = {
> +    bucket_are_all_rw = bucket_are_all_rw,
> +}
> +
> +local function fiber_csw()
> +    return fiber.info()[fiber.self():id()].csw
> +end
> +
> +local function fiber_set_joinable()
> +    fiber.self():set_joinable(true)
> +end
> +
> +local function test_basic(test)
> +    test:plan(32)
> +
> +    local ref_strike = lsched.ref_strike
> +    --
> +    -- Simplest possible test - start and end a ref.
> +    --
> +    test:is(lsched.ref_start(big_timeout), big_timeout, 'start ref')
> +    test:is(lsched.ref_count, 1, '1 ref')
> +    test:is(lsched.ref_strike, ref_strike + 1, '+1 ref in a row')
> +    lsched.ref_end(1)
> +    test:is(lsched.ref_count, 0, '0 refs after end')
> +    test:is(lsched.ref_strike, ref_strike + 1, 'strike is kept')
> +
> +    lsched.ref_start(big_timeout)
> +    lsched.ref_end(1)
> +    test:is(lsched.ref_strike, ref_strike + 2, 'strike grows')
> +    test:is(lsched.ref_count, 0, 'count does not')
> +
> +    --
> +    -- Move ends ref strike.
> +    --
> +    test:is(lsched.move_start(big_timeout), big_timeout, 'start move')
> +    test:is(lsched.move_count, 1, '1 move')
> +    test:is(lsched.move_strike, 1, '+1 move strike')
> +    test:is(lsched.ref_strike, 0, 'ref strike is interrupted')
> +
> +    --
> +    -- Ref times out if there is a move in progress.
> +    --
> +    local ok, err = lsched.ref_start(small_timeout)
> +    test:ok(not ok and err, 'ref fails')
> +    test:is(lsched.move_count, 1, 'still 1 move')
> +    test:is(lsched.move_strike, 1, 'still 1 move strike')
> +    test:is(lsched.ref_count, 0, 'could not add ref')
> +    test:is(lsched.ref_queue, 0, 'empty ref queue')
> +
> +    --
> +    -- Ref succeeds when move ends.
> +    --
> +    local f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.ref_start(big_timeout)
> +    end)
> +    fiber.sleep(small_timeout)
> +    lsched.move_end(1)
> +    local new_timeout
> +    ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout < big_timeout, 'correct timeout')
> +    test:is(lsched.move_count, 0, 'no moves')
> +    test:is(lsched.move_strike, 0, 'move strike ends')
> +    test:is(lsched.ref_count, 1, '+1 ref')
> +    test:is(lsched.ref_strike, 1, '+1 ref strike')
> +
> +    --
> +    -- Move succeeds when ref ends.
> +    --
> +    f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.move_start(big_timeout)
> +    end)
> +    fiber.sleep(small_timeout)
> +    lsched.ref_end(1)
> +    ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout < big_timeout, 'correct timeout')
> +    test:is(lsched.ref_count, 0, 'no refs')
> +    test:is(lsched.ref_strike, 0, 'ref strike ends')
> +    test:is(lsched.move_count, 1, '+1 move')
> +    test:is(lsched.move_strike, 1, '+1 move strike')
> +    lsched.move_end(1)
> +
> +    --
> +    -- Move times out when there is a ref.
> +    --
> +    test:is(lsched.ref_start(big_timeout), big_timeout, '+ ref')
> +    ok, err = lsched.move_start(small_timeout)
> +    test:ok(not ok and err, 'move fails')
> +    test:is(lsched.ref_count, 1, 'still 1 ref')
> +    test:is(lsched.ref_strike, 1, 'still 1 ref strike')
> +    test:is(lsched.move_count, 0, 'could not add move')
> +    test:is(lsched.move_queue, 0, 'empty move queue')
> +    lsched.ref_end(1)
> +end
> +
> +local function test_negative_timeout(test)
> +    test:plan(12)
> +
> +    --
> +    -- Move works even with negative timeout if no refs.
> +    --
> +    test:is(lsched.move_start(-1), -1, 'timeout does not matter if no refs')
> +    test:is(lsched.move_count, 1, '+1 move')
> +
> +    --
> +    -- Ref fails immediately if timeout negative and has moves.
> +    --
> +    local csw = fiber_csw()
> +    local ok, err = lsched.ref_start(-1)
> +    test:ok(not ok and err, 'ref fails')
> +    test:is(csw, fiber_csw(), 'no yields')
> +    test:is(lsched.ref_count, 0, 'no refs')
> +    test:is(lsched.ref_queue, 0, 'no ref queue')
> +
> +    --
> +    -- Ref works even with negative timeout if no moves.
> +    --
> +    lsched.move_end(1)
> +    test:is(lsched.ref_start(-1), -1, 'timeout does not matter if no moves')
> +    test:is(lsched.ref_count, 1, '+1 ref')
> +
> +    --
> +    -- Move fails immediately if timeout is negative and has refs.
> +    --
> +    csw = fiber_csw()
> +    ok, err = lsched.move_start(-1)
> +    test:ok(not ok and err, 'move fails')
> +    test:is(csw, fiber_csw(), 'no yields')
> +    test:is(lsched.move_count, 0, 'no moves')
> +    test:is(lsched.move_queue, 0, 'no move queue')
> +    lsched.ref_end(1)
> +end
> +
> +local function test_move_gc_ref(test)
> +    test:plan(10)
> +
> +    --
> +    -- Move deletes expired refs if it may help to start the move.
> +    --
> +    for sid = 1, 10 do
> +        for rid = 1, 5 do
> +            lref.add(rid, sid, small_timeout)
> +        end
> +    end
> +    test:is(lsched.ref_count, 50, 'refs are in progress')
> +    local ok, err = lsched.move_start(-1)
> +    test:ok(not ok and err, 'move without timeout failed')
> +
> +    fiber.sleep(small_timeout)
> +    test:is(lsched.move_start(-1), -1, 'succeeds even with negative timeout')
> +    test:is(lsched.ref_count, 0, 'all refs are expired and deleted')
> +    test:is(lref.count, 0, 'ref module knows about it')
> +    test:is(lsched.move_count, 1, 'move is started')
> +    lsched.move_end(1)
> +
> +    --
> +    -- May need more than 1 GC step.
> +    --
> +    for rid = 1, 5 do
> +        lref.add(0, rid, small_timeout)
> +    end
> +    for rid = 1, 5 do
> +        lref.add(1, rid, small_timeout * 100)
> +    end
> +    local new_timeout = lsched.move_start(big_timeout)
> +    test:ok(new_timeout < big_timeout, 'succeeds by doing 2 gc steps')
> +    test:is(lsched.ref_count, 0, 'all refs are expired and deleted')
> +    test:is(lref.count, 0, 'ref module knows about it')
> +    test:is(lsched.move_count, 1, 'move is started')
> +    lsched.move_end(1)
> +end
> +
> +local function test_ref_strike(test)
> +    test:plan(10)
> +
> +    local quota = lsched.ref_quota
> +    --
> +    -- Strike should stop new refs if they exceed the quota and there is a
> +    -- pending move.
> +    --
> +    -- End ref strike if there was one.
> +    lsched.move_start(small_timeout)
> +    lsched.move_end(1)
> +    -- Ref strike starts.
> +    assert(lsched.ref_start(small_timeout))
> +
> +    local f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.move_start(big_timeout)
> +    end)
> +    test:is(lsched.move_queue, 1, 'move is queued')
> +    --
> +    -- New refs should work only until quota is reached, because there is a
> +    -- pending move.
> +    --
> +    for i = 1, quota - 1 do
> +        assert(lsched.ref_start(small_timeout))
> +    end
> +    local ok, err = lsched.ref_start(small_timeout)
> +    test:ok(not ok and err, 'too long strike with move queue not empty')
> +    test:is(lsched.ref_strike, quota, 'max strike is reached')
> +    -- Even if the number of current refs decreases, new ones are still not
> +    -- accepted, because there were too many in a row while a new move was
> +    -- waiting.
> +    lsched.ref_end(1)
> +    ok, err = lsched.ref_start(small_timeout)
> +    test:ok(not ok and err, 'still too long strike after one unref')
> +    test:is(lsched.ref_strike, quota, 'strike is unchanged')
> +
> +    lsched.ref_end(quota - 1)
> +    local new_timeout
> +    ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout < big_timeout, 'move succeeded')
> +    test:is(lsched.move_count, 1, '+1 move')
> +    test:is(lsched.move_strike, 1, '+1 move strike')
> +    test:is(lsched.ref_count, 0, 'no refs')
> +    test:is(lsched.ref_strike, 0, 'no ref strike')
> +    lsched.move_end(1)
> +end
> +
> +local function test_move_strike(test)
> +    test:plan(10)
> +
> +    local quota = lsched.move_quota
> +    --
> +    -- Strike should stop new moves if they exceed the quota and there is a
> +    -- pending ref.
> +    --
> +    -- End move strike if there was one.
> +    lsched.ref_start(small_timeout)
> +    lsched.ref_end(1)
> +    -- Move strike starts.
> +    assert(lsched.move_start(small_timeout))
> +
> +    local f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.ref_start(big_timeout)
> +    end)
> +    test:is(lsched.ref_queue, 1, 'ref is queued')
> +    --
> +    -- New moves should work only until quota is reached, because there is a
> +    -- pending ref.
> +    --
> +    for i = 1, quota - 1 do
> +        assert(lsched.move_start(small_timeout))
> +    end
> +    local ok, err = lsched.move_start(small_timeout)
> +    test:ok(not ok and err, 'too long strike with ref queue not empty')
> +    test:is(lsched.move_strike, quota, 'max strike is reached')
> +    -- Even if the number of current moves decreases, new ones are still not
> +    -- accepted, because there were too many in a row while a new ref was
> +    -- waiting.
> +    lsched.move_end(1)
> +    ok, err = lsched.move_start(small_timeout)
> +    test:ok(not ok and err, 'still too long strike after one move end')
> +    test:is(lsched.move_strike, quota, 'strike is unchanged')
> +
> +    lsched.move_end(quota - 1)
> +    local new_timeout
> +    ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout < big_timeout, 'ref succeeded')
> +    test:is(lsched.ref_count, 1, '+1 ref')
> +    test:is(lsched.ref_strike, 1, '+1 ref strike')
> +    test:is(lsched.move_count, 0, 'no moves')
> +    test:is(lsched.move_strike, 0, 'no move strike')
> +    lsched.ref_end(1)
> +end
> +
> +local function test_ref_increase_quota(test)
> +    test:plan(4)
> +
> +    local quota = lsched.ref_quota
> +    --
> +    -- Increasing the ref quota allows doing more refs even if there are
> +    -- pending moves.
> +    --
> +    -- End ref strike if there was one.
> +    lsched.move_start(big_timeout)
> +    lsched.move_end(1)
> +    -- Fill the quota.
> +    for _ = 1, quota do
> +        assert(lsched.ref_start(big_timeout))
> +    end
> +    -- Start move to block new refs by quota.
> +    local f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.move_start(big_timeout)
> +    end)
> +    test:ok(not lsched.ref_start(small_timeout), 'can not add ref - full quota')
> +
> +    lsched.cfg({sched_ref_quota = quota + 1})
> +    test:ok(lsched.ref_start(small_timeout), 'now can add - quota is extended')
> +
> +    -- Decrease quota - should not accept new refs again.
> +    lsched.cfg{sched_ref_quota = quota}
> +    test:ok(not lsched.ref_start(small_timeout), 'full quota again')
> +
> +    lsched.ref_end(quota + 1)
> +    local ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout < big_timeout, 'move started')
> +    lsched.move_end(1)
> +end
> +
> +local function test_move_increase_quota(test)
> +    test:plan(4)
> +
> +    local quota = lsched.move_quota
> +    --
> +    -- Increasing the move quota allows doing more moves even if there are
> +    -- pending refs.
> +    --
> +    -- End move strike if there was one.
> +    lsched.ref_start(big_timeout)
> +    lsched.ref_end(1)
> +    -- Fill the quota.
> +    for _ = 1, quota do
> +        assert(lsched.move_start(big_timeout))
> +    end
> +    -- Start ref to block new moves by quota.
> +    local f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.ref_start(big_timeout)
> +    end)
> +    test:ok(not lsched.move_start(small_timeout), 'can not add move - full quota')
> +
> +    lsched.cfg({sched_move_quota = quota + 1})
> +    test:ok(lsched.move_start(small_timeout), 'now can add - quota is extended')
> +
> +    -- Decrease quota - should not accept new moves again.
> +    lsched.cfg{sched_move_quota = quota}
> +    test:ok(not lsched.move_start(small_timeout), 'full quota again')
> +
> +    lsched.move_end(quota + 1)
> +    local ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout < big_timeout, 'ref started')
> +    lsched.ref_end(1)
> +end
> +
> +local function test_ref_decrease_quota(test)
> +    test:plan(4)
> +
> +    local old_quota = lsched.ref_quota
> +    --
> +    -- Quota decrease should not affect any existing operations or break
> +    -- anything.
> +    --
> +    lsched.cfg({sched_ref_quota = 10})
> +    for _ = 1, 5 do
> +        assert(lsched.ref_start(big_timeout))
> +    end
> +    test:is(lsched.ref_count, 5, 'started refs below quota')
> +
> +    local f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.move_start(big_timeout)
> +    end)
> +    test:ok(lsched.ref_start(big_timeout), 'another ref after move queued')
> +
> +    lsched.cfg({sched_ref_quota = 2})
> +    test:ok(not lsched.ref_start(small_timeout), 'quota decreased - can not '..
> +            'start ref')
> +
> +    lsched.ref_end(6)
> +    local ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout, 'move is started')
> +    lsched.move_end(1)
> +
> +    lsched.cfg({sched_ref_quota = old_quota})
> +end
> +
> +local function test_move_decrease_quota(test)
> +    test:plan(4)
> +
> +    local old_quota = lsched.move_quota
> +    --
> +    -- Quota decrease should not affect any existing operations or break
> +    -- anything.
> +    --
> +    lsched.cfg({sched_move_quota = 10})
> +    for _ = 1, 5 do
> +        assert(lsched.move_start(big_timeout))
> +    end
> +    test:is(lsched.move_count, 5, 'started moves below quota')
> +
> +    local f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.ref_start(big_timeout)
> +    end)
> +    test:ok(lsched.move_start(big_timeout), 'another move after ref queued')
> +
> +    lsched.cfg({sched_move_quota = 2})
> +    test:ok(not lsched.move_start(small_timeout), 'quota decreased - can not '..
> +            'start move')
> +
> +    lsched.move_end(6)
> +    local ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout, 'ref is started')
> +    lsched.ref_end(1)
> +
> +    lsched.cfg({sched_move_quota = old_quota})
> +end
> +
> +local function test_ref_zero_quota(test)
> +    test:plan(6)
> +
> +    local old_quota = lsched.ref_quota
> +    --
> +    -- Zero quota is a valid value. Moreover, it is special: it means the
> +    -- zero-quota operation should always be paused in favor of the other
> +    -- operation.
> +    --
> +    lsched.cfg({sched_ref_quota = 0})
> +    test:ok(lsched.ref_start(big_timeout), 'started ref with 0 quota')
> +
> +    local f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.move_start(big_timeout)
> +    end)
> +    test:ok(not lsched.ref_start(small_timeout), 'can not add more refs if '..
> +            'move is queued - quota 0')
> +
> +    lsched.ref_end(1)
> +    local ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout, 'move is started')
> +
> +    -- Ensure a ref never starts if there are always moves and the quota is 0.
> +    f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.ref_start(big_timeout)
> +    end)
> +    local move_count = lsched.move_quota + 3
> +    -- Start from 2 to account for the already existing move.
> +    for _ = 2, move_count do
> +        -- Start one new move.
> +        assert(lsched.move_start(big_timeout))
> +        -- Start second new move.
> +        assert(lsched.move_start(big_timeout))
> +        -- End first move.
> +        lsched.move_end(1)
> +        -- As a result the moves are always interleaved - no time for refs
> +        -- at all.
> +    end
> +    test:is(lsched.move_count, move_count, 'moves exceed quota')
> +    test:ok(lsched.move_strike > move_count, 'strike is not interrupted')
> +
> +    lsched.move_end(move_count)
> +    ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout, 'ref finally started')
> +    lsched.ref_end(1)
> +
> +    lsched.cfg({sched_ref_quota = old_quota})
> +end
> +
> +local function test_move_zero_quota(test)
> +    test:plan(6)
> +
> +    local old_quota = lsched.move_quota
> +    --
> +    -- Zero quota is a valid value. Moreover, it is special: it means the
> +    -- zero-quota operation should always be paused in favor of the other
> +    -- operation.
> +    --
> +    lsched.cfg({sched_move_quota = 0})
> +    test:ok(lsched.move_start(big_timeout), 'started move with 0 quota')
> +
> +    local f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.ref_start(big_timeout)
> +    end)
> +    test:ok(not lsched.move_start(small_timeout), 'can not add more moves if '..
> +            'ref is queued - quota 0')
> +
> +    lsched.move_end(1)
> +    local ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout, 'ref is started')
> +
> +    -- Ensure a move never starts if there are always refs and the quota is 0.
> +    f = fiber.create(function()
> +        fiber_set_joinable()
> +        return lsched.move_start(big_timeout)
> +    end)
> +    local ref_count = lsched.ref_quota + 3
> +    -- Start from 2 to account for the already existing ref.
> +    for _ = 2, ref_count do
> +        -- Start one new ref.
> +        assert(lsched.ref_start(big_timeout))
> +        -- Start second new ref.
> +        assert(lsched.ref_start(big_timeout))
> +        -- End first ref.
> +        lsched.ref_end(1)
> +        -- As a result the refs are always interleaved - no time for moves
> +        -- at all.
> +    end
> +    test:is(lsched.ref_count, ref_count, 'refs exceed quota')
> +    test:ok(lsched.ref_strike > ref_count, 'strike is not interrupted')
> +
> +    lsched.ref_end(ref_count)
> +    ok, new_timeout = f:join()
> +    test:ok(ok and new_timeout, 'move finally started')
> +    lsched.move_end(1)
> +
> +    lsched.cfg({sched_move_quota = old_quota})
> +end
> +
> +test:plan(11)
> +
> +-- Change the default values. The move quota is 1 by default, which would
> +-- reduce the number of possible tests. The ref quota is decreased to speed
> +-- the tests up.
> +lsched.cfg({sched_ref_quota = 10, sched_move_quota = 5})
> +
> +test:test('basic', test_basic)
> +test:test('negative timeout', test_negative_timeout)
> +test:test('ref gc', test_move_gc_ref)
> +test:test('ref strike', test_ref_strike)
> +test:test('move strike', test_move_strike)
> +test:test('ref add quota', test_ref_increase_quota)
> +test:test('move add quota', test_move_increase_quota)
> +test:test('ref decrease quota', test_ref_decrease_quota)
> +test:test('move decrease quota', test_move_decrease_quota)
> +test:test('ref zero quota', test_ref_zero_quota)
> +test:test('move zero quota', test_move_zero_quota)
> +
> +os.exit(test:check() and 0 or 1)
> diff --git a/test/unit/config.result b/test/unit/config.result
> index e0b2482..9df3bf1 100644
> --- a/test/unit/config.result
> +++ b/test/unit/config.result
> @@ -597,3 +597,62 @@ cfg.collect_bucket_garbage_interval = 100
>   _ = lcfg.check(cfg)
>   ---
>   ...
> +--
> +-- gh-147: router map-reduce. It adds scheduler options on the storage.
> +--
> +cfg.sched_ref_quota = 100
> +---
> +...
> +_ = lcfg.check(cfg)
> +---
> +...
> +cfg.sched_ref_quota = 1
> +---
> +...
> +_ = lcfg.check(cfg)
> +---
> +...
> +cfg.sched_ref_quota = 0
> +---
> +...
> +_ = lcfg.check(cfg)
> +---
> +...
> +cfg.sched_ref_quota = -1
> +---
> +...
> +util.check_error(lcfg.check, cfg)
> +---
> +- Scheduler storage ref quota must be non-negative number
> +...
> +cfg.sched_ref_quota = nil
> +---
> +...
> +cfg.sched_move_quota = 100
> +---
> +...
> +_ = lcfg.check(cfg)
> +---
> +...
> +cfg.sched_move_quota = 1
> +---
> +...
> +_ = lcfg.check(cfg)
> +---
> +...
> +cfg.sched_move_quota = 0
> +---
> +...
> +_ = lcfg.check(cfg)
> +---
> +...
> +cfg.sched_move_quota = -1
> +---
> +...
> +util.check_error(lcfg.check, cfg)
> +---
> +- Scheduler bucket move quota must be non-negative number
> +...
> +cfg.sched_move_quota = nil
> +---
> +...
> diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua
> index a1c9f07..473e460 100644
> --- a/test/unit/config.test.lua
> +++ b/test/unit/config.test.lua
> @@ -241,3 +241,26 @@ cfg.rebalancer_max_sending = nil
>   --
>   cfg.collect_bucket_garbage_interval = 100
>   _ = lcfg.check(cfg)
> +
> +--
> +-- gh-147: router map-reduce. It adds scheduler options on the storage.
> +--
> +cfg.sched_ref_quota = 100
> +_ = lcfg.check(cfg)
> +cfg.sched_ref_quota = 1
> +_ = lcfg.check(cfg)
> +cfg.sched_ref_quota = 0
> +_ = lcfg.check(cfg)
> +cfg.sched_ref_quota = -1
> +util.check_error(lcfg.check, cfg)
> +cfg.sched_ref_quota = nil
> +
> +cfg.sched_move_quota = 100
> +_ = lcfg.check(cfg)
> +cfg.sched_move_quota = 1
> +_ = lcfg.check(cfg)
> +cfg.sched_move_quota = 0
> +_ = lcfg.check(cfg)
> +cfg.sched_move_quota = -1
> +util.check_error(lcfg.check, cfg)
> +cfg.sched_move_quota = nil
> diff --git a/vshard/cfg.lua b/vshard/cfg.lua
> index 63d5414..30f8794 100644
> --- a/vshard/cfg.lua
> +++ b/vshard/cfg.lua
> @@ -274,6 +274,14 @@ local cfg_template = {
>           type = 'string', name = 'Discovery mode: on, off, once',
>           is_optional = true, default = 'on', check = check_discovery_mode
>       },
> +    sched_ref_quota = {
> +        name = 'Scheduler storage ref quota', type = 'non-negative number',
> +        is_optional = true, default = consts.DEFAULT_SCHED_REF_QUOTA
> +    },
> +    sched_move_quota = {
> +        name = 'Scheduler bucket move quota', type = 'non-negative number',
> +        is_optional = true, default = consts.DEFAULT_SCHED_MOVE_QUOTA
> +    },
>   }
>   
>   --
> diff --git a/vshard/consts.lua b/vshard/consts.lua
> index 0ffe0e2..47a893b 100644
> --- a/vshard/consts.lua
> +++ b/vshard/consts.lua
> @@ -41,6 +41,11 @@ return {
>       GC_BACKOFF_INTERVAL = 5,
>       RECOVERY_BACKOFF_INTERVAL = 5,
>       COLLECT_LUA_GARBAGE_INTERVAL = 100;
> +    DEFAULT_BUCKET_SEND_TIMEOUT = 10,
> +    DEFAULT_BUCKET_RECV_TIMEOUT = 10,
> +
> +    DEFAULT_SCHED_REF_QUOTA = 300,
> +    DEFAULT_SCHED_MOVE_QUOTA = 1,
>   
>       DISCOVERY_IDLE_INTERVAL = 10,
>       DISCOVERY_WORK_INTERVAL = 1,
> diff --git a/vshard/storage/CMakeLists.txt b/vshard/storage/CMakeLists.txt
> index 7c1e97d..396664a 100644
> --- a/vshard/storage/CMakeLists.txt
> +++ b/vshard/storage/CMakeLists.txt
> @@ -1,2 +1,2 @@
> -install(FILES init.lua reload_evolution.lua ref.lua
> +install(FILES init.lua reload_evolution.lua ref.lua sched.lua
>           DESTINATION ${TARANTOOL_INSTALL_LUADIR}/vshard/storage)
> diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
> index 2957f48..31f668f 100644
> --- a/vshard/storage/init.lua
> +++ b/vshard/storage/init.lua
> @@ -17,7 +17,7 @@ if rawget(_G, MODULE_INTERNALS) then
>           'vshard.replicaset', 'vshard.util',
>           'vshard.storage.reload_evolution',
>           'vshard.lua_gc', 'vshard.rlist', 'vshard.registry',
> -        'vshard.heap', 'vshard.storage.ref',
> +        'vshard.heap', 'vshard.storage.ref', 'vshard.storage.sched',
>       }
>       for _, module in pairs(vshard_modules) do
>           package.loaded[module] = nil
> @@ -32,6 +32,7 @@ local util = require('vshard.util')
>   local lua_gc = require('vshard.lua_gc')
>   local lregistry = require('vshard.registry')
>   local lref = require('vshard.storage.ref')
> +local lsched = require('vshard.storage.sched')
>   local reload_evolution = require('vshard.storage.reload_evolution')
>   local fiber_cond_wait = util.fiber_cond_wait
>   local bucket_ref_new
> @@ -1142,16 +1143,33 @@ local function bucket_recv_xc(bucket_id, from, data, opts)
>               return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, msg,
>                                         from)
>           end
> -        if lref.count > 0 then
> -            return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
> -        end
>           if is_this_replicaset_locked() then
>               return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
>           end
>           if not bucket_receiving_quota_add(-1) then
>               return nil, lerror.vshard(lerror.code.TOO_MANY_RECEIVING)
>           end
> -        _bucket:insert({bucket_id, recvg, from})
> +        local timeout = opts and opts.timeout or
> +                        consts.DEFAULT_BUCKET_SEND_TIMEOUT
> +        local ok, err = lsched.move_start(timeout)
> +        if not ok then
> +            return nil, err
> +        end
> +        assert(lref.count == 0)
> +        -- The move is scheduled only for the duration of the _bucket
> +        -- update. The reason is that one bucket_send() calls bucket_recv()
> +        -- on the remote storage multiple times. If the latter scheduled new
> +        -- moves on each call, the scheduler could block it in favor of refs
> +        -- right in the middle of bucket_send().
> +        -- It would lead to a deadlock, because refs won't be able to start -
> +        -- the bucket won't be writable.
> +        -- This way still provides fair scheduling, but does not have the
> +        -- described issue.
> +        ok, err = pcall(_bucket.insert, _bucket, {bucket_id, recvg, from})
> +        lsched.move_end(1)
> +        if not ok then
> +            return nil, lerror.make(err)
> +        end
>       elseif b.status ~= recvg then
>           local msg = string.format("bucket state is changed: was receiving, "..
>                                     "became %s", b.status)
> @@ -1434,7 +1452,7 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
>       ref.rw_lock = true
>       exception_guard.ref = ref
>       exception_guard.drop_rw_lock = true
> -    local timeout = opts and opts.timeout or 10
> +    local timeout = opts and opts.timeout or consts.DEFAULT_BUCKET_SEND_TIMEOUT
>       local deadline = fiber_clock() + timeout
>       while ref.rw ~= 0 do
>           timeout = deadline - fiber_clock()
> @@ -1446,9 +1464,6 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
>   
>       local _bucket = box.space._bucket
>       local bucket = _bucket:get({bucket_id})
> -    if lref.count > 0 then
> -        return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
> -    end
>       if is_this_replicaset_locked() then
>           return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
>       end
> @@ -1468,7 +1483,25 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
>       local idx = M.shard_index
>       local bucket_generation = M.bucket_generation
>       local sendg = consts.BUCKET.SENDING
> -    _bucket:replace({bucket_id, sendg, destination})
> +
> +    local ok, err = lsched.move_start(timeout)
> +    if not ok then
> +        return nil, err
> +    end
> +    assert(lref.count == 0)
> +    -- The move is scheduled only for the time of the _bucket update
> +    -- because:
> +    --
> +    -- * it is consistent with bucket_recv() (see its comments);
> +    --
> +    -- * it gives the same effect as if the move stayed in the scheduler for
> +    --   the whole bucket_send() time, because refs won't be able to start
> +    --   anyway - the bucket is not writable.
> +    ok, err = pcall(_bucket.replace, _bucket, {bucket_id, sendg, destination})
> +    lsched.move_end(1)
> +    if not ok then
> +        return nil, lerror.make(err)
> +    end
> +
>       -- From this moment the bucket is SENDING. Such a status is
>       -- even stronger than the lock.
>       ref.rw_lock = false
> @@ -2542,6 +2575,7 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
>           M.bucket_on_replace = bucket_generation_increment
>       end
>   
> +    lsched.cfg(vshard_cfg)
>       lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
>       lreplicaset.outdate_replicasets(M.replicasets)
>       M.replicasets = new_replicasets
> diff --git a/vshard/storage/ref.lua b/vshard/storage/ref.lua
> index 7589cb9..2daad6b 100644
> --- a/vshard/storage/ref.lua
> +++ b/vshard/storage/ref.lua
> @@ -33,6 +33,7 @@ local lregistry = require('vshard.registry')
>   local fiber_clock = lfiber.clock
>   local fiber_yield = lfiber.yield
>   local DEADLINE_INFINITY = lconsts.DEADLINE_INFINITY
> +local TIMEOUT_INFINITY = lconsts.TIMEOUT_INFINITY
>   local LUA_CHUNK_SIZE = lconsts.LUA_CHUNK_SIZE
>   
>   --
> @@ -88,6 +89,7 @@ local function ref_session_new(sid)
>       -- Cache global session storages as upvalues to save on M indexing.
>       local global_heap = M.session_heap
>       local global_map = M.session_map
> +    local sched = lregistry.storage_sched
>   
>       local function ref_session_discount(self, del_count)
>           local new_count = M.count - del_count
> @@ -97,6 +99,8 @@ local function ref_session_new(sid)
>           new_count = count - del_count
>           assert(new_count >= 0)
>           count = new_count
> +
> +        sched.ref_end(del_count)
>       end
>   
>       local function ref_session_update_deadline(self)
> @@ -310,10 +314,17 @@ local function ref_add(rid, sid, timeout)
>       local deadline = now + timeout
>       local ok, err, session
>       local storage = lregistry.storage
> +    local sched = lregistry.storage_sched
> +
> +    timeout, err = sched.ref_start(timeout)
> +    if not timeout then
> +        return nil, err
> +    end
> +
>       while not storage.bucket_are_all_rw() do
>           ok, err = storage.bucket_generation_wait(timeout)
>           if not ok then
> -            return nil, err
> +            goto fail_sched
>           end
>           now = fiber_clock()
>           timeout = deadline - now
> @@ -322,7 +333,13 @@ local function ref_add(rid, sid, timeout)
>       if not session then
>           session = ref_session_new(sid)
>       end
> -    return session:add(rid, deadline, now)
> +    ok, err = session:add(rid, deadline, now)
> +    if ok then
> +        return true
> +    end
> +::fail_sched::
> +    sched.ref_end(1)
> +    return nil, err
>   end
>   
>   local function ref_use(rid, sid)
> @@ -341,6 +358,14 @@ local function ref_del(rid, sid)
>       return session:del(rid)
>   end
>   
> +local function ref_next_deadline()
> +    local session = M.session_heap:top()
> +    if not session then
> +        return fiber_clock() + TIMEOUT_INFINITY
> +    end

Does it make sense? fiber_clock() + TIMEOUT_INFINITY is still infinity, so 
the addition changes nothing.
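
If the intention is just "no deadline", it seems enough to return the already
imported DEADLINE_INFINITY, without the clock call. A minimal sketch of what
I mean:

    local function ref_next_deadline()
        -- No sessions - no deadline to wait for.
        local session = M.session_heap:top()
        return session and session.deadline or DEADLINE_INFINITY
    end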


> +    return session.deadline
> +end
> +
>   local function ref_kill_session(sid)
>       local session = M.session_map[sid]
>       if session then
> @@ -366,6 +391,7 @@ M.add = ref_add
>   M.use = ref_use
>   M.cfg = ref_cfg
>   M.kill = ref_kill_session
> +M.next_deadline = ref_next_deadline
>   lregistry.storage_ref = M
>   
>   return M
> diff --git a/vshard/storage/sched.lua b/vshard/storage/sched.lua
> new file mode 100644
> index 0000000..0ac71f4
> --- /dev/null
> +++ b/vshard/storage/sched.lua
> @@ -0,0 +1,231 @@
> +--
> +-- Scheduler module ensures fair time sharing between incompatible operations:
> +-- storage refs and bucket moves.
> +-- Storage ref is supposed to prevent all bucket moves and provide safe
> +-- environment for all kinds of possible requests on entire dataset of all
> +-- spaces stored on the instance.
> +-- Bucket move, on the contrary, wants to make a part of the dataset not usable
> +-- temporarily.
> +-- Without a scheduler it would be possible to always keep at least one ref on
> +-- the storage and block bucket moves forever. Or vice versa - during
> +-- rebalancing block all incoming refs for the entire time of data migration,
> +-- essentially making map-reduce not usable since it heavily depends on refs.
> +--
> +-- The scheduler divides storage time between refs and moves so both of them can
> +-- execute without blocking each other. Division proportions depend on the
> +-- configuration settings.
> +--
> +-- The idea of non-blocking is based on quotas and strikes. Move and ref both have
> +-- quotas. When one op executes more than quota requests in a row (makes a
> +-- strike) while the other op has queued requests, the first op stops accepting
> +-- new requests until the other op executes.
> +--
> +
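
Side note: it may be worth to spell the quota logic out on a tiny example in
the docs. Say, sched_ref_quota = 2 (the number is mine, just for
illustration) and a move gets queued under a constant stream of refs: two
more refs still start and ref_strike grows to 2; the next ref sees the queued
move with the quota exhausted and waits; the move then executes, resets
ref_strike to 0, and refs are let in again.
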
> +local MODULE_INTERNALS = '__module_vshard_storage_sched'
> +-- Update this when changing behaviour of anything in the file, to be able to reload.
> +local MODULE_VERSION = 1
> +
> +local lfiber = require('fiber')
> +local lerror = require('vshard.error')
> +local lconsts = require('vshard.consts')
> +local lregistry = require('vshard.registry')
> +local lutil = require('vshard.util')
> +local fiber_clock = lfiber.clock
> +local fiber_cond_wait = lutil.fiber_cond_wait
> +local fiber_is_self_canceled = lutil.fiber_is_self_canceled
> +
> +local M = rawget(_G, MODULE_INTERNALS)
> +if not M then
> +    M = {
> +        ---------------- Common module attributes ----------------
> +        module_version = MODULE_VERSION,
> +        -- Scheduler condition is signaled every time anything significant
> +        -- happens - the count of an operation type drops to 0, a quota is
> +        -- increased, etc.
> +        cond = lfiber.cond(),
> +
> +        -------------------------- Refs --------------------------
> +        -- Number of ref requests waiting for start.
> +        ref_queue = 0,
> +        -- Number of ref requests being executed. It is the same as ref's module
> +        -- counter, but is duplicated here for the sake of isolation and
> +        -- symmetry with moves.
> +        ref_count = 0,
> +        -- Number of ref requests executed in a row. When it becomes bigger
> +        -- than the quota, any next queued move blocks new refs.
> +        ref_strike = 0,
> +        ref_quota = lconsts.DEFAULT_SCHED_REF_QUOTA,
> +
> +        ------------------------- Moves --------------------------
> +        -- Number of move requests waiting for start.
> +        move_queue = 0,
> +        -- Number of move requests being executed.
> +        move_count = 0,
> +        -- Number of move requests executed in a row. When it becomes bigger
> +        -- than the quota, any next queued ref blocks new moves.
> +        move_strike = 0,
> +        move_quota = lconsts.DEFAULT_SCHED_MOVE_QUOTA,
> +    }
> +else
> +    return M
> +end
> +
> +local function sched_wait_anything(timeout)
> +    return fiber_cond_wait(M.cond, timeout)
> +end
> +
> +--
> +-- Return the remaining timeout in case there was a yield. This helps the
> +-- caller to avoid an extra clock get if there were no yields.
> +--
> +local function sched_ref_start(timeout)
> +    local deadline = fiber_clock() + timeout

Let's compute the deadline after the fast-path check to eliminate an excess
fiber_clock() call.

Also there are several similar places below. Please fix them as well.
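
Something like this (just a sketch; the declaration has to stay on top -
otherwise 'goto success' would jump into the scope of a new local):

    local function sched_ref_start(timeout)
        local deadline, ok, err
        -- Fast-path. Moves are extremely rare. No clock call is needed then.
        if M.move_count == 0 and M.move_queue == 0 then
            goto success
        end
        deadline = fiber_clock() + timeout
        -- ... the rest of the function is unchanged.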

> +    local ok, err
> +    -- Fast-path. Moves are extremely rare. No need then to inc-dec the ref
> +    -- queue nor to enter the retry loop.
> +    if M.move_count == 0 and M.move_queue == 0 then
> +        goto success
> +    end
> +
> +    M.ref_queue = M.ref_queue + 1
> +
> +::retry::
> +    if M.move_count > 0 then
> +        goto wait_and_retry
> +    end
> +    -- Even if the move count is zero, the time usage must be kept fair. The
> +    -- exception is when the moves have no quota at all - then it is fine to
> +    -- ignore them indefinitely, until all refs end voluntarily.
> +    if M.move_queue > 0 and M.ref_strike >= M.ref_quota and
> +       M.move_quota > 0 then
> +        goto wait_and_retry
> +    end
> +
> +    M.ref_queue = M.ref_queue - 1
> +
> +::success::
> +    M.ref_count = M.ref_count + 1
> +    M.ref_strike = M.ref_strike + 1
> +    M.move_strike = 0
> +    do return timeout end
> +
> +::wait_and_retry::
> +    ok, err = sched_wait_anything(timeout)
> +    if not ok then
> +        M.ref_queue = M.ref_queue - 1
> +        return nil, err
> +    end
> +    timeout = deadline - fiber_clock()
> +    goto retry
> +end
> +
> +local function sched_ref_end(count)
> +    count = M.ref_count - count
> +    M.ref_count = count
> +    if count == 0 and M.move_queue > 0 then
> +        M.cond:broadcast()
> +    end
> +end
> +
> +--
> +-- Return the remaining timeout in case there was a yield. This helps the
> +-- caller to avoid an extra clock get if there were no yields.
> +--
> +local function sched_move_start(timeout)
> +    local deadline = fiber_clock() + timeout
> +    local ok, err, ref_deadline
> +    local lref = lregistry.storage_ref
> +    -- Fast-path. Refs are not extremely rare *when used*. But they are not
> +    -- expected to be used in a lot of installations. So most of the time the
> +    -- moves should work right away.
> +    if M.ref_count == 0 and M.ref_queue == 0 then
> +        goto success
> +    end
> +
> +    M.move_queue = M.move_queue + 1
> +
> +::retry::
> +    if M.ref_count > 0 then
> +        ref_deadline = lref.next_deadline()
> +        if ref_deadline < deadline then
> +            timeout = ref_deadline - fiber_clock()
> +        end
> +        ok, err = sched_wait_anything(timeout)
> +        timeout = deadline - fiber_clock()
> +        if ok then
> +            goto retry
> +        end
> +        if fiber_is_self_canceled() then
> +            goto fail
> +        end
> +        -- Even if the timeout has expired already (or was 0 from the
> +        -- beginning), it is still possible the move can be started if all the
> +        -- present refs are expired too and can be collected.
> +        lref.gc()
> +        -- GC could yield - need to refetch the clock again.
> +        timeout = deadline - fiber_clock()
> +        if M.ref_count > 0 then
> +            if timeout < 0 then
> +                goto fail
> +            end
> +            goto retry
> +        end
> +    end
> +
> +    if M.ref_queue > 0 and M.move_strike >= M.move_quota and
> +       M.ref_quota > 0 then
> +        ok, err = sched_wait_anything(timeout)
> +        if not ok then
> +            goto fail
> +        end
> +        timeout = deadline - fiber_clock()
> +        goto retry
> +    end
> +
> +    M.move_queue = M.move_queue - 1
> +
> +::success::
> +    M.move_count = M.move_count + 1
> +    M.move_strike = M.move_strike + 1
> +    M.ref_strike = 0
> +    do return timeout end
> +
> +::fail::
> +    M.move_queue = M.move_queue - 1
> +    return nil, err
> +end
> +
> +local function sched_move_end(count)
> +    count = M.move_count - count
> +    M.move_count = count
> +    if count == 0 and M.ref_queue > 0 then
> +        M.cond:broadcast()
> +    end
> +end
> +
> +local function sched_cfg(cfg)
> +    local new_ref_quota = cfg.sched_ref_quota
> +    local new_move_quota = cfg.sched_move_quota
> +
> +    if new_ref_quota then
> +        if new_ref_quota > M.ref_quota then
> +            M.cond:broadcast()
> +        end
> +        M.ref_quota = new_ref_quota
> +    end
> +    if new_move_quota then
> +        if new_move_quota > M.move_quota then
> +            M.cond:broadcast()
> +        end
> +        M.move_quota = new_move_quota
> +    end
> +end
> +
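
Maybe also show in the docs how the new options are set - something like this
(a sketch, the values are arbitrary and the rest of the cfg is elided):

    vshard.storage.cfg({
        sched_ref_quota = 300,
        sched_move_quota = 1,
        -- ... the usual sharding configuration goes here.
    }, box.info.uuid)
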
> +M.ref_start = sched_ref_start
> +M.ref_end = sched_ref_end
> +M.move_start = sched_move_start
> +M.move_end = sched_move_end
> +M.cfg = sched_cfg
> +lregistry.storage_sched = M
> +
> +return M
