[Tarantool-patches] [PATCH vshard 10/11] sched: introduce vshard.storage.sched module

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Feb 23 03:15:38 MSK 2021


'vshard.storage.sched' module ensures that two incompatible
operations share storage time fairly - storage refs and bucket
moves.

Storage refs are going to be used by map-reduce API to preserve
data consistency while map requests are in progress on all
storages.

It means storage refs will be used as commonly as bucket refs,
and should not block the rebalancer. However it is hard not to
block the rebalancer forever if there are always refs on the
storage.

With bucket refs it was easy - one bucket temporary block is not a
big deal. So rebalancer always has higher prio than bucket refs,
and it still does not block requests for the other buckets +
read-only requests on the subject bucket.

With storage refs having rebalancer with a higher prio would make
map-reduce requests die in the entire cluster for the whole time
of rebalancing, which can be as long as hours or even days. It
wouldn't be acceptable.

The new module vshard.storage.sched shares time between moves and
storeage refs fairly. They both get time to execute with
proportions configures by user. The proportions depend on how
big is a bucket, how long the map-reduce requests are expected to
be. The longer is a request, the less quota it should be given,
typically.

The patch introduces new storage options to configure the
scheduling.

Part of #147

@TarantoolBot document
Title: vshard.storage.cfg new options - sched_ref_quota and sched_move_quota

There are new options for `vshard.storage.cfg`: `sched_ref_quota`
and `sched_move_quota`. The options control how much time should
be given to storage refs and bucket moves - two incompatible but
important operations.

Storage refs are used by router's map-reduce API. Each map-reduce
call creates storage refs on all storages to prevent data
migration on them for the map execution.

Bucket moves are used by the rebalancer. Obviously, they are
incompatible with the storage refs.

If vshard would prefer one operation to another always, it would
lead to starvation of one of them. For example, if storage refs
would be prefered, rebalancing could just never work if there are
always refs under constant map-reduce load. If bucket moves would
be prefered, storage refs (and therefore map-reduce) would stop
for the entire rebalancing time which can be quite long (hours,
days).

To control how much time to give to which operation the new
options serve.

`sched_ref_quota` tells how many storage refs (therefore
map-reduce requests) can be executed on the storage in a row if
there are pending bucket moves, before they are blocked to let the
moves work. Default value is 300.

`sched_move_quota` controls the same, but vice-versa: how many
bucket moves can be done in a row if there are pending refs.
Default value is 1.

Map-reduce requests are expected to be much shorter than bucket
moves, so storage refs by default have a higher quota.

This is how it works on an example. Assume map-reduces start.
They execute one after another, 150 requests in a row. Now the
rebalancer wakes up and wants to move some buckets. He stands into
a queue and waits for the storage refs to be gone.

But the ref quota is not reached yet, so the storage still can
execute +150 map-reduces even with the queued bucket moves until
new refs are blocked, and the moves start.
---
 test/reload_evolution/storage.result |   2 +-
 test/storage/ref.result              |  19 +-
 test/storage/ref.test.lua            |   9 +-
 test/storage/scheduler.result        | 410 ++++++++++++++++++++
 test/storage/scheduler.test.lua      | 178 +++++++++
 test/unit-tap/ref.test.lua           |   7 +-
 test/unit-tap/scheduler.test.lua     | 555 +++++++++++++++++++++++++++
 test/unit/config.result              |  59 +++
 test/unit/config.test.lua            |  23 ++
 vshard/cfg.lua                       |   8 +
 vshard/consts.lua                    |   5 +
 vshard/storage/CMakeLists.txt        |   2 +-
 vshard/storage/init.lua              |  54 ++-
 vshard/storage/ref.lua               |  30 +-
 vshard/storage/sched.lua             | 231 +++++++++++
 15 files changed, 1567 insertions(+), 25 deletions(-)
 create mode 100644 test/storage/scheduler.result
 create mode 100644 test/storage/scheduler.test.lua
 create mode 100755 test/unit-tap/scheduler.test.lua
 create mode 100644 vshard/storage/sched.lua

diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result
index c4a0cdd..77010a2 100644
--- a/test/reload_evolution/storage.result
+++ b/test/reload_evolution/storage.result
@@ -258,7 +258,7 @@ ok, err = vshard.storage.bucket_send(bucket_id_to_move, util.replicasets[2],
 ...
 assert(not ok and err.message)
 ---
-- Storage is referenced
+- Timeout exceeded
 ...
 lref.del(0, 0)
 ---
diff --git a/test/storage/ref.result b/test/storage/ref.result
index d5f4166..59f07f4 100644
--- a/test/storage/ref.result
+++ b/test/storage/ref.result
@@ -84,18 +84,22 @@ big_timeout = 1000000
 small_timeout = 0.001
  | ---
  | ...
+
+timeout = 0.01
+ | ---
+ | ...
 lref.add(rid, sid, big_timeout)
  | ---
  | - true
  | ...
 -- Send fails.
 ok, err = vshard.storage.bucket_send(1, util.replicasets[2],                    \
-                                     {timeout = big_timeout})
+                                     {timeout = timeout})
  | ---
  | ...
 assert(not ok and err.message)
  | ---
- | - Storage is referenced
+ | - Timeout exceeded
  | ...
 lref.use(rid, sid)
  | ---
@@ -103,12 +107,12 @@ lref.use(rid, sid)
  | ...
 -- Still fails - use only makes ref undead until it is deleted explicitly.
 ok, err = vshard.storage.bucket_send(1, util.replicasets[2],                    \
-                                     {timeout = big_timeout})
+                                     {timeout = timeout})
  | ---
  | ...
 assert(not ok and err.message)
  | ---
- | - Storage is referenced
+ | - Timeout exceeded
  | ...
 
 _ = test_run:switch('storage_2_a')
@@ -118,13 +122,16 @@ _ = test_run:switch('storage_2_a')
 big_timeout = 1000000
  | ---
  | ...
+timeout = 0.01
+ | ---
+ | ...
 ok, err = vshard.storage.bucket_send(1501, util.replicasets[1],                 \
-                                     {timeout = big_timeout})
+                                     {timeout = timeout})
  | ---
  | ...
 assert(not ok and err.message)
  | ---
- | - Storage is referenced
+ | - Timeout exceeded
  | ...
 
 --
diff --git a/test/storage/ref.test.lua b/test/storage/ref.test.lua
index b34a294..24303e2 100644
--- a/test/storage/ref.test.lua
+++ b/test/storage/ref.test.lua
@@ -35,22 +35,25 @@ sid = 0
 rid = 0
 big_timeout = 1000000
 small_timeout = 0.001
+
+timeout = 0.01
 lref.add(rid, sid, big_timeout)
 -- Send fails.
 ok, err = vshard.storage.bucket_send(1, util.replicasets[2],                    \
-                                     {timeout = big_timeout})
+                                     {timeout = timeout})
 assert(not ok and err.message)
 lref.use(rid, sid)
 -- Still fails - use only makes ref undead until it is deleted explicitly.
 ok, err = vshard.storage.bucket_send(1, util.replicasets[2],                    \
-                                     {timeout = big_timeout})
+                                     {timeout = timeout})
 assert(not ok and err.message)
 
 _ = test_run:switch('storage_2_a')
 -- Receive (from another replicaset) also fails.
 big_timeout = 1000000
+timeout = 0.01
 ok, err = vshard.storage.bucket_send(1501, util.replicasets[1],                 \
-                                     {timeout = big_timeout})
+                                     {timeout = timeout})
 assert(not ok and err.message)
 
 --
diff --git a/test/storage/scheduler.result b/test/storage/scheduler.result
new file mode 100644
index 0000000..0f53e42
--- /dev/null
+++ b/test/storage/scheduler.result
@@ -0,0 +1,410 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+ | ---
+ | ...
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+ | ---
+ | ...
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+ | ---
+ | ...
+test_run:create_cluster(REPLICASET_2, 'storage')
+ | ---
+ | ...
+util = require('util')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+ | ---
+ | ...
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+ | ---
+ | ...
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
+ | ---
+ | ...
+util.push_rs_filters(test_run)
+ | ---
+ | ...
+
+--
+-- gh-147: scheduler helps to share time fairly between incompatible but
+-- necessary operations - storage refs and bucket moves. Refs are used for the
+-- consistent map-reduce feature when the whole cluster can be scanned without
+-- being afraid that some data may slip through requests on behalf of the
+-- rebalancer.
+--
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+
+vshard.storage.rebalancer_disable()
+ | ---
+ | ...
+vshard.storage.bucket_force_create(1, 1500)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+vshard.storage.rebalancer_disable()
+ | ---
+ | ...
+vshard.storage.bucket_force_create(1501, 1500)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+--
+-- Bucket_send() uses the scheduler.
+--
+lsched = require('vshard.storage.sched')
+ | ---
+ | ...
+assert(lsched.move_strike == 0)
+ | ---
+ | - true
+ | ...
+assert(lsched.move_count == 0)
+ | ---
+ | - true
+ | ...
+big_timeout = 1000000
+ | ---
+ | ...
+big_timeout_opts = {timeout = big_timeout}
+ | ---
+ | ...
+vshard.storage.bucket_send(1, util.replicasets[2], big_timeout_opts)
+ | ---
+ | - true
+ | ...
+assert(lsched.move_strike == 1)
+ | ---
+ | - true
+ | ...
+assert(lsched.move_count == 0)
+ | ---
+ | - true
+ | ...
+wait_bucket_is_collected(1)
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+lsched = require('vshard.storage.sched')
+ | ---
+ | ...
+--
+-- Bucket_recv() uses the scheduler.
+--
+assert(lsched.move_strike == 1)
+ | ---
+ | - true
+ | ...
+assert(lsched.move_count == 0)
+ | ---
+ | - true
+ | ...
+
+--
+-- When move is in progress, it is properly accounted.
+--
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+big_timeout = 1000000
+ | ---
+ | ...
+big_timeout_opts = {timeout = big_timeout}
+ | ---
+ | ...
+ok, err = nil
+ | ---
+ | ...
+assert(lsched.move_strike == 1)
+ | ---
+ | - true
+ | ...
+_ = fiber.create(function()                                                     \
+    ok, err = vshard.storage.bucket_send(1, util.replicasets[1],                \
+                                         big_timeout_opts)                      \
+end)
+ | ---
+ | ...
+-- Strike increase does not mean the move finished. It means it was successfully
+-- scheduled.
+assert(lsched.move_strike == 2)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return lsched.move_strike == 2 end)
+ | ---
+ | - true
+ | ...
+
+--
+-- Ref is not allowed during move.
+--
+small_timeout = 0.000001
+ | ---
+ | ...
+lref = require('vshard.storage.ref')
+ | ---
+ | ...
+ok, err = lref.add(0, 0, small_timeout)
+ | ---
+ | ...
+assert(not ok)
+ | ---
+ | - true
+ | ...
+err.message
+ | ---
+ | - Timeout exceeded
+ | ...
+-- Put it to wait until move is done.
+ok, err = nil
+ | ---
+ | ...
+_ = fiber.create(function() ok, err = lref.add(0, 0, big_timeout) end)
+ | ---
+ | ...
+vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return ok or err end)
+ | ---
+ | - true
+ | ...
+ok, err
+ | ---
+ | - true
+ | - null
+ | ...
+assert(lsched.move_count == 0)
+ | ---
+ | - true
+ | ...
+wait_bucket_is_collected(1)
+ | ---
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+test_run:wait_cond(function() return ok or err end)
+ | ---
+ | - true
+ | ...
+ok, err
+ | ---
+ | - true
+ | - null
+ | ...
+assert(lsched.move_count == 0)
+ | ---
+ | - true
+ | ...
+assert(lsched.ref_count == 1)
+ | ---
+ | - true
+ | ...
+lref.del(0, 0)
+ | ---
+ | - true
+ | ...
+assert(lsched.ref_count == 0)
+ | ---
+ | - true
+ | ...
+
+--
+-- Refs can't block sends infinitely. The scheduler must be fair and share time
+-- between ref/move.
+--
+do_refs = true
+ | ---
+ | ...
+ref_worker_count = 10
+ | ---
+ | ...
+function ref_worker()                                                           \
+    while do_refs do                                                            \
+        lref.add(0, 0, big_timeout)                                             \
+        fiber.sleep(small_timeout)                                              \
+        lref.del(0, 0)                                                          \
+    end                                                                         \
+    ref_worker_count = ref_worker_count - 1                                     \
+end
+ | ---
+ | ...
+-- Simulate many fibers doing something with a ref being kept.
+for i = 1, ref_worker_count do fiber.create(ref_worker) end
+ | ---
+ | ...
+assert(lref.count > 0)
+ | ---
+ | - true
+ | ...
+assert(lsched.ref_count > 0)
+ | ---
+ | - true
+ | ...
+-- Ensure it passes with default opts (when move is in great unfairness). It is
+-- important. Because moves are expected to be much longer than refs, and must
+-- not happen too often with ref load in progress. But still should eventually
+-- be processed.
+bucket_count = 100
+ | ---
+ | ...
+bucket_id = 1
+ | ---
+ | ...
+bucket_worker_count = 5
+ | ---
+ | ...
+function bucket_worker()                                                        \
+    while bucket_id <= bucket_count do                                          \
+        local id = bucket_id                                                    \
+        bucket_id = bucket_id + 1                                               \
+        assert(vshard.storage.bucket_send(id, util.replicasets[2]))             \
+    end                                                                         \
+    bucket_worker_count = bucket_worker_count - 1                               \
+end
+ | ---
+ | ...
+-- Simulate many rebalancer fibers like when max_sending is increased.
+for i = 1, bucket_worker_count do fiber.create(bucket_worker) end
+ | ---
+ | ...
+test_run:wait_cond(function() return bucket_worker_count == 0 end)
+ | ---
+ | - true
+ | ...
+
+do_refs = false
+ | ---
+ | ...
+test_run:wait_cond(function() return ref_worker_count == 0 end)
+ | ---
+ | - true
+ | ...
+assert(lref.count == 0)
+ | ---
+ | - true
+ | ...
+assert(lsched.ref_count == 0)
+ | ---
+ | - true
+ | ...
+
+for i = 1, bucket_count do wait_bucket_is_collected(i) end
+ | ---
+ | ...
+
+--
+-- Refs can't block recvs infinitely.
+--
+do_refs = true
+ | ---
+ | ...
+for i = 1, ref_worker_count do fiber.create(ref_worker) end
+ | ---
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+bucket_count = 100
+ | ---
+ | ...
+bucket_id = 1
+ | ---
+ | ...
+bucket_worker_count = 5
+ | ---
+ | ...
+function bucket_worker()                                                        \
+    while bucket_id <= bucket_count do                                          \
+        local id = bucket_id                                                    \
+        bucket_id = bucket_id + 1                                               \
+        assert(vshard.storage.bucket_send(id, util.replicasets[1]))             \
+    end                                                                         \
+    bucket_worker_count = bucket_worker_count - 1                               \
+end
+ | ---
+ | ...
+for i = 1, bucket_worker_count do fiber.create(bucket_worker) end
+ | ---
+ | ...
+test_run:wait_cond(function() return bucket_worker_count == 0 end)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_1_a')
+ | ---
+ | ...
+do_refs = false
+ | ---
+ | ...
+test_run:wait_cond(function() return ref_worker_count == 0 end)
+ | ---
+ | - true
+ | ...
+assert(lref.count == 0)
+ | ---
+ | - true
+ | ...
+assert(lsched.ref_count == 0)
+ | ---
+ | - true
+ | ...
+
+_ = test_run:switch('storage_2_a')
+ | ---
+ | ...
+for i = 1, bucket_count do wait_bucket_is_collected(i) end
+ | ---
+ | ...
+
+_ = test_run:switch("default")
+ | ---
+ | ...
+test_run:drop_cluster(REPLICASET_2)
+ | ---
+ | ...
+test_run:drop_cluster(REPLICASET_1)
+ | ---
+ | ...
+_ = test_run:cmd('clear filter')
+ | ---
+ | ...
diff --git a/test/storage/scheduler.test.lua b/test/storage/scheduler.test.lua
new file mode 100644
index 0000000..8628f0e
--- /dev/null
+++ b/test/storage/scheduler.test.lua
@@ -0,0 +1,178 @@
+test_run = require('test_run').new()
+REPLICASET_1 = { 'storage_1_a', 'storage_1_b' }
+REPLICASET_2 = { 'storage_2_a', 'storage_2_b' }
+
+test_run:create_cluster(REPLICASET_1, 'storage')
+test_run:create_cluster(REPLICASET_2, 'storage')
+util = require('util')
+util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
+util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
+util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage()')
+util.push_rs_filters(test_run)
+
+--
+-- gh-147: scheduler helps to share time fairly between incompatible but
+-- necessary operations - storage refs and bucket moves. Refs are used for the
+-- consistent map-reduce feature when the whole cluster can be scanned without
+-- being afraid that some data may slip through requests on behalf of the
+-- rebalancer.
+--
+
+_ = test_run:switch('storage_1_a')
+
+vshard.storage.rebalancer_disable()
+vshard.storage.bucket_force_create(1, 1500)
+
+_ = test_run:switch('storage_2_a')
+vshard.storage.rebalancer_disable()
+vshard.storage.bucket_force_create(1501, 1500)
+
+_ = test_run:switch('storage_1_a')
+--
+-- Bucket_send() uses the scheduler.
+--
+lsched = require('vshard.storage.sched')
+assert(lsched.move_strike == 0)
+assert(lsched.move_count == 0)
+big_timeout = 1000000
+big_timeout_opts = {timeout = big_timeout}
+vshard.storage.bucket_send(1, util.replicasets[2], big_timeout_opts)
+assert(lsched.move_strike == 1)
+assert(lsched.move_count == 0)
+wait_bucket_is_collected(1)
+
+_ = test_run:switch('storage_2_a')
+lsched = require('vshard.storage.sched')
+--
+-- Bucket_recv() uses the scheduler.
+--
+assert(lsched.move_strike == 1)
+assert(lsched.move_count == 0)
+
+--
+-- When move is in progress, it is properly accounted.
+--
+_ = test_run:switch('storage_1_a')
+vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
+
+_ = test_run:switch('storage_2_a')
+big_timeout = 1000000
+big_timeout_opts = {timeout = big_timeout}
+ok, err = nil
+assert(lsched.move_strike == 1)
+_ = fiber.create(function()                                                     \
+    ok, err = vshard.storage.bucket_send(1, util.replicasets[1],                \
+                                         big_timeout_opts)                      \
+end)
+-- Strike increase does not mean the move finished. It means it was successfully
+-- scheduled.
+assert(lsched.move_strike == 2)
+
+_ = test_run:switch('storage_1_a')
+test_run:wait_cond(function() return lsched.move_strike == 2 end)
+
+--
+-- Ref is not allowed during move.
+--
+small_timeout = 0.000001
+lref = require('vshard.storage.ref')
+ok, err = lref.add(0, 0, small_timeout)
+assert(not ok)
+err.message
+-- Put it to wait until move is done.
+ok, err = nil
+_ = fiber.create(function() ok, err = lref.add(0, 0, big_timeout) end)
+vshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
+
+_ = test_run:switch('storage_2_a')
+test_run:wait_cond(function() return ok or err end)
+ok, err
+assert(lsched.move_count == 0)
+wait_bucket_is_collected(1)
+
+_ = test_run:switch('storage_1_a')
+test_run:wait_cond(function() return ok or err end)
+ok, err
+assert(lsched.move_count == 0)
+assert(lsched.ref_count == 1)
+lref.del(0, 0)
+assert(lsched.ref_count == 0)
+
+--
+-- Refs can't block sends infinitely. The scheduler must be fair and share time
+-- between ref/move.
+--
+do_refs = true
+ref_worker_count = 10
+function ref_worker()                                                           \
+    while do_refs do                                                            \
+        lref.add(0, 0, big_timeout)                                             \
+        fiber.sleep(small_timeout)                                              \
+        lref.del(0, 0)                                                          \
+    end                                                                         \
+    ref_worker_count = ref_worker_count - 1                                     \
+end
+-- Simulate many fibers doing something with a ref being kept.
+for i = 1, ref_worker_count do fiber.create(ref_worker) end
+assert(lref.count > 0)
+assert(lsched.ref_count > 0)
+-- Ensure it passes with default opts (when move is in great unfairness). It is
+-- important. Because moves are expected to be much longer than refs, and must
+-- not happen too often with ref load in progress. But still should eventually
+-- be processed.
+bucket_count = 100
+bucket_id = 1
+bucket_worker_count = 5
+function bucket_worker()                                                        \
+    while bucket_id <= bucket_count do                                          \
+        local id = bucket_id                                                    \
+        bucket_id = bucket_id + 1                                               \
+        assert(vshard.storage.bucket_send(id, util.replicasets[2]))             \
+    end                                                                         \
+    bucket_worker_count = bucket_worker_count - 1                               \
+end
+-- Simulate many rebalancer fibers like when max_sending is increased.
+for i = 1, bucket_worker_count do fiber.create(bucket_worker) end
+test_run:wait_cond(function() return bucket_worker_count == 0 end)
+
+do_refs = false
+test_run:wait_cond(function() return ref_worker_count == 0 end)
+assert(lref.count == 0)
+assert(lsched.ref_count == 0)
+
+for i = 1, bucket_count do wait_bucket_is_collected(i) end
+
+--
+-- Refs can't block recvs infinitely.
+--
+do_refs = true
+for i = 1, ref_worker_count do fiber.create(ref_worker) end
+
+_ = test_run:switch('storage_2_a')
+bucket_count = 100
+bucket_id = 1
+bucket_worker_count = 5
+function bucket_worker()                                                        \
+    while bucket_id <= bucket_count do                                          \
+        local id = bucket_id                                                    \
+        bucket_id = bucket_id + 1                                               \
+        assert(vshard.storage.bucket_send(id, util.replicasets[1]))             \
+    end                                                                         \
+    bucket_worker_count = bucket_worker_count - 1                               \
+end
+for i = 1, bucket_worker_count do fiber.create(bucket_worker) end
+test_run:wait_cond(function() return bucket_worker_count == 0 end)
+
+_ = test_run:switch('storage_1_a')
+do_refs = false
+test_run:wait_cond(function() return ref_worker_count == 0 end)
+assert(lref.count == 0)
+assert(lsched.ref_count == 0)
+
+_ = test_run:switch('storage_2_a')
+for i = 1, bucket_count do wait_bucket_is_collected(i) end
+
+_ = test_run:switch("default")
+test_run:drop_cluster(REPLICASET_2)
+test_run:drop_cluster(REPLICASET_1)
+_ = test_run:cmd('clear filter')
diff --git a/test/unit-tap/ref.test.lua b/test/unit-tap/ref.test.lua
index d987a63..ba95eee 100755
--- a/test/unit-tap/ref.test.lua
+++ b/test/unit-tap/ref.test.lua
@@ -5,6 +5,7 @@ local test = tap.test('cfg')
 local fiber = require('fiber')
 local lregistry = require('vshard.registry')
 local lref = require('vshard.storage.ref')
+require('vshard.storage.sched')
 
 local big_timeout = 1000000
 local small_timeout = 0.000001
@@ -19,9 +20,11 @@ local sid3 = 2
 --
 
 --
--- Refs used storage API to get bucket space state and wait on its changes. But
--- not important for these unit tests.
+-- Refs use storage API to get bucket space state and wait on its changes. And
+-- scheduler API to sync with bucket moves. But not important for these unit
+-- tests.
 --
+
 local function bucket_are_all_rw()
     return true
 end
diff --git a/test/unit-tap/scheduler.test.lua b/test/unit-tap/scheduler.test.lua
new file mode 100755
index 0000000..0af4f5e
--- /dev/null
+++ b/test/unit-tap/scheduler.test.lua
@@ -0,0 +1,555 @@
+#!/usr/bin/env tarantool
+
+local fiber = require('fiber')
+local tap = require('tap')
+local test = tap.test('cfg')
+local lregistry = require('vshard.registry')
+local lref = require('vshard.storage.ref')
+local lsched = require('vshard.storage.sched')
+
+local big_timeout = 1000000
+local small_timeout = 0.000001
+
+--
+-- gh-147: scheduler helps to share time fairly between incompatible but
+-- necessary operations - storage refs and bucket moves. Refs are used for the
+-- consistent map-reduce feature when the whole cluster can be scanned without
+-- being afraid that some data may slip through requests on behalf of the
+-- rebalancer.
+--
+
+box.cfg{
+    log = 'log.txt'
+}
+-- io.write = function(...) require('log').info(...) end
+
+--
+-- Storage registry is used by the ref module. The ref module is used in the
+-- tests in order to ensure the scheduler performs ref garbage collection.
+--
+local function bucket_are_all_rw()
+    return true
+end
+
+lregistry.storage = {
+    bucket_are_all_rw = bucket_are_all_rw,
+}
+
+local function fiber_csw()
+    return fiber.info()[fiber.self():id()].csw
+end
+
+local function fiber_set_joinable()
+    fiber.self():set_joinable(true)
+end
+
+local function test_basic(test)
+    test:plan(32)
+
+    local ref_strike = lsched.ref_strike
+    --
+    -- Simplest possible test - start and end a ref.
+    --
+    test:is(lsched.ref_start(big_timeout), big_timeout, 'start ref')
+    test:is(lsched.ref_count, 1, '1 ref')
+    test:is(lsched.ref_strike, ref_strike + 1, '+1 ref in a row')
+    lsched.ref_end(1)
+    test:is(lsched.ref_count, 0, '0 refs after end')
+    test:is(lsched.ref_strike, ref_strike + 1, 'strike is kept')
+
+    lsched.ref_start(big_timeout)
+    lsched.ref_end(1)
+    test:is(lsched.ref_strike, ref_strike + 2, 'strike grows')
+    test:is(lsched.ref_count, 0, 'count does not')
+
+    --
+    -- Move ends ref strike.
+    --
+    test:is(lsched.move_start(big_timeout), big_timeout, 'start move')
+    test:is(lsched.move_count, 1, '1 move')
+    test:is(lsched.move_strike, 1, '+1 move strike')
+    test:is(lsched.ref_strike, 0, 'ref strike is interrupted')
+
+    --
+    -- Ref times out if there is a move in progress.
+    --
+    local ok, err = lsched.ref_start(small_timeout)
+    test:ok(not ok and err, 'ref fails')
+    test:is(lsched.move_count, 1, 'still 1 move')
+    test:is(lsched.move_strike, 1, 'still 1 move strike')
+    test:is(lsched.ref_count, 0, 'could not add ref')
+    test:is(lsched.ref_queue, 0, 'empty ref queue')
+
+    --
+    -- Ref succeeds when move ends.
+    --
+    local f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.ref_start(big_timeout)
+    end)
+    fiber.sleep(small_timeout)
+    lsched.move_end(1)
+    local new_timeout
+    ok, new_timeout = f:join()
+    test:ok(ok and new_timeout < big_timeout, 'correct timeout')
+    test:is(lsched.move_count, 0, 'no moves')
+    test:is(lsched.move_strike, 0, 'move strike ends')
+    test:is(lsched.ref_count, 1, '+1 ref')
+    test:is(lsched.ref_strike, 1, '+1 ref strike')
+
+    --
+    -- Move succeeds when ref ends.
+    --
+    f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.move_start(big_timeout)
+    end)
+    fiber.sleep(small_timeout)
+    lsched.ref_end(1)
+    ok, new_timeout = f:join()
+    test:ok(ok and new_timeout < big_timeout, 'correct timeout')
+    test:is(lsched.ref_count, 0, 'no refs')
+    test:is(lsched.ref_strike, 0, 'ref strike ends')
+    test:is(lsched.move_count, 1, '+1 move')
+    test:is(lsched.move_strike, 1, '+1 move strike')
+    lsched.move_end(1)
+
+    --
+    -- Move times out when there is a ref.
+    --
+    test:is(lsched.ref_start(big_timeout), big_timeout, '+ ref')
+    ok, err = lsched.move_start(small_timeout)
+    test:ok(not ok and err, 'move fails')
+    test:is(lsched.ref_count, 1, 'still 1 ref')
+    test:is(lsched.ref_strike, 1, 'still 1 ref strike')
+    test:is(lsched.move_count, 0, 'could not add move')
+    test:is(lsched.move_queue, 0, 'empty move queue')
+    lsched.ref_end(1)
+end
+
+local function test_negative_timeout(test)
+    test:plan(12)
+
+    --
+    -- Move works even with negative timeout if no refs.
+    --
+    test:is(lsched.move_start(-1), -1, 'timeout does not matter if no refs')
+    test:is(lsched.move_count, 1, '+1 move')
+
+    --
+    -- Ref fails immediately if timeout negative and has moves.
+    --
+    local csw = fiber_csw()
+    local ok, err = lsched.ref_start(-1)
+    test:ok(not ok and err, 'ref fails')
+    test:is(csw, fiber_csw(), 'no yields')
+    test:is(lsched.ref_count, 0, 'no refs')
+    test:is(lsched.ref_queue, 0, 'no ref queue')
+
+    --
+    -- Ref works even with negative timeout if no moves.
+    --
+    lsched.move_end(1)
+    test:is(lsched.ref_start(-1), -1, 'timeout does not matter if no moves')
+    test:is(lsched.ref_count, 1, '+1 ref')
+
+    --
+    -- Move fails immediately if timeout is negative and has refs.
+    --
+    csw = fiber_csw()
+    ok, err = lsched.move_start(-1)
+    test:ok(not ok and err, 'move fails')
+    test:is(csw, fiber_csw(), 'no yields')
+    test:is(lsched.move_count, 0, 'no moves')
+    test:is(lsched.move_queue, 0, 'no move queue')
+    lsched.ref_end(1)
+end
+
+local function test_move_gc_ref(test)
+    test:plan(10)
+
+    --
+    -- Move deletes expired refs if it may help to start the move.
+    --
+    for sid = 1, 10 do
+        for rid = 1, 5 do
+            lref.add(rid, sid, small_timeout)
+        end
+    end
+    test:is(lsched.ref_count, 50, 'refs are in progress')
+    local ok, err = lsched.move_start(-1)
+    test:ok(not ok and err, 'move without timeout failed')
+
+    fiber.sleep(small_timeout)
+    test:is(lsched.move_start(-1), -1, 'succeeds even with negative timeout')
+    test:is(lsched.ref_count, 0, 'all refs are expired and deleted')
+    test:is(lref.count, 0, 'ref module knows about it')
+    test:is(lsched.move_count, 1, 'move is started')
+    lsched.move_end(1)
+
+    --
+    -- May need more than 1 GC step.
+    --
+    for rid = 1, 5 do
+        lref.add(0, rid, small_timeout)
+    end
+    for rid = 1, 5 do
+        lref.add(1, rid, small_timeout * 100)
+    end
+    local new_timeout = lsched.move_start(big_timeout)
+    test:ok(new_timeout < big_timeout, 'succeeds by doing 2 gc steps')
+    test:is(lsched.ref_count, 0, 'all refs are expired and deleted')
+    test:is(lref.count, 0, 'ref module knows about it')
+    test:is(lsched.move_count, 1, 'move is started')
+    lsched.move_end(1)
+end
+
+local function test_ref_strike(test)
+    test:plan(10)
+
+    local quota = lsched.ref_quota
+    --
+    -- Strike should stop new refs if they exceed the quota and there is a
+    -- pending move.
+    --
+    -- End ref strike if there was one.
+    lsched.move_start(small_timeout)
+    lsched.move_end(1)
+    -- Ref strike starts.
+    assert(lsched.ref_start(small_timeout))
+
+    local f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.move_start(big_timeout)
+    end)
+    test:is(lsched.move_queue, 1, 'move is queued')
+    --
+    -- New refs should work only until quota is reached, because there is a
+    -- pending move.
+    --
+    for i = 1, quota - 1 do
+        assert(lsched.ref_start(small_timeout))
+    end
+    local ok, err = lsched.ref_start(small_timeout)
+    test:ok(not ok and err, 'too long strike with move queue not empty')
+    test:is(lsched.ref_strike, quota, 'max strike is reached')
+    -- Even if number of current refs decreases, new still are not accepted.
+    -- Because there was too many in a row while a new move was waiting.
+    lsched.ref_end(1)
+    ok, err = lsched.ref_start(small_timeout)
+    test:ok(not ok and err, 'still too long strike after one unref')
+    test:is(lsched.ref_strike, quota, 'strike is unchanged')
+
+    lsched.ref_end(quota - 1)
+    local new_timeout
+    ok, new_timeout = f:join()
+    test:ok(ok and new_timeout < big_timeout, 'move succeeded')
+    test:is(lsched.move_count, 1, '+1 move')
+    test:is(lsched.move_strike, 1, '+1 move strike')
+    test:is(lsched.ref_count, 0, 'no refs')
+    test:is(lsched.ref_strike, 0, 'no ref strike')
+    lsched.move_end(1)
+end
+
+local function test_move_strike(test)
+    test:plan(10)
+
+    local quota = lsched.move_quota
+    --
+    -- Strike should stop new moves if they exceed the quota and there is a
+    -- pending ref.
+    --
+    -- End move strike if there was one.
+    lsched.ref_start(small_timeout)
+    lsched.ref_end(1)
+    -- Move strike starts.
+    assert(lsched.move_start(small_timeout))
+
+    local f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.ref_start(big_timeout)
+    end)
+    test:is(lsched.ref_queue, 1, 'ref is queued')
+    --
+    -- New moves should work only until quota is reached, because there is a
+    -- pending ref.
+    --
+    for i = 1, quota - 1 do
+        assert(lsched.move_start(small_timeout))
+    end
+    local ok, err = lsched.move_start(small_timeout)
+    test:ok(not ok and err, 'too long strike with ref queue not empty')
+    test:is(lsched.move_strike, quota, 'max strike is reached')
+    -- Even if number of current moves decreases, new still are not accepted.
+    -- Because there was too many in a row while a new ref was waiting.
+    lsched.move_end(1)
+    ok, err = lsched.move_start(small_timeout)
+    test:ok(not ok and err, 'still too long strike after one move end')
+    test:is(lsched.move_strike, quota, 'strike is unchanged')
+
+    lsched.move_end(quota - 1)
+    local new_timeout
+    ok, new_timeout = f:join()
+    test:ok(ok and new_timeout < big_timeout, 'ref succeeded')
+    test:is(lsched.ref_count, 1, '+1 ref')
+    test:is(lsched.ref_strike, 1, '+1 ref strike')
+    test:is(lsched.move_count, 0, 'no moves')
+    test:is(lsched.move_strike, 0, 'no move strike')
+    lsched.ref_end(1)
+end
+
+local function test_ref_increase_quota(test)
+    test:plan(4)
+
+    local quota = lsched.ref_quota
+    --
+    -- Ref quota increase allows to do more refs even if there are pending
+    -- moves.
+    --
+    -- End ref strike if there was one.
+    lsched.move_start(big_timeout)
+    lsched.move_end(1)
+    -- Fill the quota.
+    for _ = 1, quota do
+        assert(lsched.ref_start(big_timeout))
+    end
+    -- Start move to block new refs by quota.
+    local f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.move_start(big_timeout)
+    end)
+    test:ok(not lsched.ref_start(small_timeout), 'can not add ref - full quota')
+
+    lsched.cfg({sched_ref_quota = quota + 1})
+    test:ok(lsched.ref_start(small_timeout), 'now can add - quota is extended')
+
+    -- Decrease quota - should not accept new refs again.
+    lsched.cfg{sched_ref_quota = quota}
+    test:ok(not lsched.ref_start(small_timeout), 'full quota again')
+
+    lsched.ref_end(quota + 1)
+    local ok, new_timeout = f:join()
+    test:ok(ok and new_timeout < big_timeout, 'move started')
+    lsched.move_end(1)
+end
+
+local function test_move_increase_quota(test)
+    test:plan(4)
+
+    local quota = lsched.move_quota
+    --
+    -- Move quota increase allows to do more moves even if there are pending
+    -- refs.
+    --
+    -- End move strike if there was one.
+    lsched.ref_start(big_timeout)
+    lsched.ref_end(1)
+    -- Fill the quota.
+    for _ = 1, quota do
+        assert(lsched.move_start(big_timeout))
+    end
+    -- Start ref to block new moves by quota.
+    local f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.ref_start(big_timeout)
+    end)
+    test:ok(not lsched.move_start(small_timeout), 'can not add move - full quota')
+
+    lsched.cfg({sched_move_quota = quota + 1})
+    test:ok(lsched.move_start(small_timeout), 'now can add - quota is extended')
+
+    -- Decrease quota - should not accept new moves again.
+    lsched.cfg{sched_move_quota = quota}
+    test:ok(not lsched.move_start(small_timeout), 'full quota again')
+
+    lsched.move_end(quota + 1)
+    local ok, new_timeout = f:join()
+    test:ok(ok and new_timeout < big_timeout, 'ref started')
+    lsched.ref_end(1)
+end
+
+local function test_ref_decrease_quota(test)
+    test:plan(4)
+
+    local old_quota = lsched.ref_quota
+    --
+    -- Quota decrease should not affect any existing operations or break
+    -- anything.
+    --
+    lsched.cfg({sched_ref_quota = 10})
+    for _ = 1, 5 do
+        assert(lsched.ref_start(big_timeout))
+    end
+    test:is(lsched.ref_count, 5, 'started refs below quota')
+
+    local f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.move_start(big_timeout)
+    end)
+    test:ok(lsched.ref_start(big_timeout), 'another ref after move queued')
+
+    lsched.cfg({sched_ref_quota = 2})
+    test:ok(not lsched.ref_start(small_timeout), 'quota decreased - can not '..
+            'start ref')
+
+    lsched.ref_end(6)
+    local ok, new_timeout = f:join()
+    test:ok(ok and new_timeout, 'move is started')
+    lsched.move_end(1)
+
+    lsched.cfg({sched_ref_quota = old_quota})
+end
+
+local function test_move_decrease_quota(test)
+    test:plan(4)
+
+    local old_quota = lsched.move_quota
+    --
+    -- Quota decrease should not affect any existing operations or break
+    -- anything.
+    --
+    lsched.cfg({sched_move_quota = 10})
+    for _ = 1, 5 do
+        assert(lsched.move_start(big_timeout))
+    end
+    test:is(lsched.move_count, 5, 'started moves below quota')
+
+    local f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.ref_start(big_timeout)
+    end)
+    test:ok(lsched.move_start(big_timeout), 'another move after ref queued')
+
+    lsched.cfg({sched_move_quota = 2})
+    test:ok(not lsched.move_start(small_timeout), 'quota decreased - can not '..
+            'start move')
+
+    lsched.move_end(6)
+    local ok, new_timeout = f:join()
+    test:ok(ok and new_timeout, 'ref is started')
+    lsched.ref_end(1)
+
+    lsched.cfg({sched_move_quota = old_quota})
+end
+
+local function test_ref_zero_quota(test)
+    test:plan(6)
+
+    local old_quota = lsched.ref_quota
+    --
+    -- Zero quota is a valid value. Moreover, it is special. It means the
+    -- 0-quoted operation should always be paused in favor of the other
+    -- operation.
+    --
+    lsched.cfg({sched_ref_quota = 0})
+    test:ok(lsched.ref_start(big_timeout), 'started ref with 0 quota')
+
+    local f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.move_start(big_timeout)
+    end)
+    test:ok(not lsched.ref_start(small_timeout), 'can not add more refs if '..
+            'move is queued - quota 0')
+
+    lsched.ref_end(1)
+    local ok, new_timeout = f:join()
+    test:ok(ok and new_timeout, 'move is started')
+
+    -- Ensure ref never starts if there are always moves, when quota is 0.
+    f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.ref_start(big_timeout)
+    end)
+    local move_count = lsched.move_quota + 3
+    -- Start from 2 to account the already existing move.
+    for _ = 2, move_count do
+        -- Start one new move.
+        assert(lsched.move_start(big_timeout))
+        -- Start second new move.
+        assert(lsched.move_start(big_timeout))
+        -- End first move.
+        lsched.move_end(1)
+        -- In result the moves are always interleaving - no time for refs at
+        -- all.
+    end
+    test:is(lsched.move_count, move_count, 'moves exceed quota')
+    test:ok(lsched.move_strike > move_count, 'strike is not interrupted')
+
+    lsched.move_end(move_count)
+    ok, new_timeout = f:join()
+    test:ok(ok and new_timeout, 'ref finally started')
+    lsched.ref_end(1)
+
+    lsched.cfg({sched_ref_quota = old_quota})
+end
+
+local function test_move_zero_quota(test)
+    test:plan(6)
+
+    local old_quota = lsched.move_quota
+    --
+    -- Zero quota is a valid value. Moreover, it is special. It means the
+    -- 0-quoted operation should always be paused in favor of the other
+    -- operation.
+    --
+    lsched.cfg({sched_move_quota = 0})
+    test:ok(lsched.move_start(big_timeout), 'started move with 0 quota')
+
+    local f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.ref_start(big_timeout)
+    end)
+    test:ok(not lsched.move_start(small_timeout), 'can not add more moves if '..
+            'ref is queued - quota 0')
+
+    lsched.move_end(1)
+    local ok, new_timeout = f:join()
+    test:ok(ok and new_timeout, 'ref is started')
+
+    -- Ensure move never starts if there are always refs, when quota is 0.
+    f = fiber.create(function()
+        fiber_set_joinable()
+        return lsched.move_start(big_timeout)
+    end)
+    local ref_count = lsched.ref_quota + 3
+    -- Start from 2 to account the already existing ref.
+    for _ = 2, ref_count do
+        -- Start one new ref.
+        assert(lsched.ref_start(big_timeout))
+        -- Start second new ref.
+        assert(lsched.ref_start(big_timeout))
+        -- End first ref.
+        lsched.ref_end(1)
+        -- In result the refs are always interleaving - no time for moves at
+        -- all.
+    end
+    test:is(lsched.ref_count, ref_count, 'refs exceed quota')
+    test:ok(lsched.ref_strike > ref_count, 'strike is not interrupted')
+
+    lsched.ref_end(ref_count)
+    ok, new_timeout = f:join()
+    test:ok(ok and new_timeout, 'move finally started')
+    lsched.move_end(1)
+
+    lsched.cfg({sched_move_quota = old_quota})
+end
+
+test:plan(11)
+
+-- Change default values. Move is 1 by default, which would reduce the number of
+-- possible tests. Ref is decreased to speed the tests up.
+lsched.cfg({sched_ref_quota = 10, sched_move_quota = 5})
+
+test:test('basic', test_basic)
+test:test('negative timeout', test_negative_timeout)
+test:test('ref gc', test_move_gc_ref)
+test:test('ref strike', test_ref_strike)
+test:test('move strike', test_move_strike)
+test:test('ref add quota', test_ref_increase_quota)
+test:test('move add quota', test_move_increase_quota)
+test:test('ref decrease quota', test_ref_decrease_quota)
+test:test('move decrease quota', test_move_decrease_quota)
+test:test('ref zero quota', test_ref_zero_quota)
+test:test('move zero quota', test_move_zero_quota)
+
+os.exit(test:check() and 0 or 1)
diff --git a/test/unit/config.result b/test/unit/config.result
index e0b2482..9df3bf1 100644
--- a/test/unit/config.result
+++ b/test/unit/config.result
@@ -597,3 +597,62 @@ cfg.collect_bucket_garbage_interval = 100
 _ = lcfg.check(cfg)
 ---
 ...
+--
+-- gh-147: router map-reduce. It adds scheduler options on the storage.
+--
+cfg.sched_ref_quota = 100
+---
+...
+_ = lcfg.check(cfg)
+---
+...
+cfg.sched_ref_quota = 1
+---
+...
+_ = lcfg.check(cfg)
+---
+...
+cfg.sched_ref_quota = 0
+---
+...
+_ = lcfg.check(cfg)
+---
+...
+cfg.sched_ref_quota = -1
+---
+...
+util.check_error(lcfg.check, cfg)
+---
+- Scheduler storage ref quota must be non-negative number
+...
+cfg.sched_ref_quota = nil
+---
+...
+cfg.sched_move_quota = 100
+---
+...
+_ = lcfg.check(cfg)
+---
+...
+cfg.sched_move_quota = 1
+---
+...
+_ = lcfg.check(cfg)
+---
+...
+cfg.sched_move_quota = 0
+---
+...
+_ = lcfg.check(cfg)
+---
+...
+cfg.sched_move_quota = -1
+---
+...
+util.check_error(lcfg.check, cfg)
+---
+- Scheduler bucket move quota must be non-negative number
+...
+cfg.sched_move_quota = nil
+---
+...
diff --git a/test/unit/config.test.lua b/test/unit/config.test.lua
index a1c9f07..473e460 100644
--- a/test/unit/config.test.lua
+++ b/test/unit/config.test.lua
@@ -241,3 +241,26 @@ cfg.rebalancer_max_sending = nil
 --
 cfg.collect_bucket_garbage_interval = 100
 _ = lcfg.check(cfg)
+
+--
+-- gh-147: router map-reduce. It adds scheduler options on the storage.
+--
+cfg.sched_ref_quota = 100
+_ = lcfg.check(cfg)
+cfg.sched_ref_quota = 1
+_ = lcfg.check(cfg)
+cfg.sched_ref_quota = 0
+_ = lcfg.check(cfg)
+cfg.sched_ref_quota = -1
+util.check_error(lcfg.check, cfg)
+cfg.sched_ref_quota = nil
+
+cfg.sched_move_quota = 100
+_ = lcfg.check(cfg)
+cfg.sched_move_quota = 1
+_ = lcfg.check(cfg)
+cfg.sched_move_quota = 0
+_ = lcfg.check(cfg)
+cfg.sched_move_quota = -1
+util.check_error(lcfg.check, cfg)
+cfg.sched_move_quota = nil
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 63d5414..30f8794 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -274,6 +274,14 @@ local cfg_template = {
         type = 'string', name = 'Discovery mode: on, off, once',
         is_optional = true, default = 'on', check = check_discovery_mode
     },
+    sched_ref_quota = {
+        name = 'Scheduler storage ref quota', type = 'non-negative number',
+        is_optional = true, default = consts.DEFAULT_SCHED_REF_QUOTA
+    },
+    sched_move_quota = {
+        name = 'Scheduler bucket move quota', type = 'non-negative number',
+        is_optional = true, default = consts.DEFAULT_SCHED_MOVE_QUOTA
+    },
 }
 
 --
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 0ffe0e2..47a893b 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -41,6 +41,11 @@ return {
     GC_BACKOFF_INTERVAL = 5,
     RECOVERY_BACKOFF_INTERVAL = 5,
     COLLECT_LUA_GARBAGE_INTERVAL = 100;
+    DEFAULT_BUCKET_SEND_TIMEOUT = 10,
+    DEFAULT_BUCKET_RECV_TIMEOUT = 10,
+
+    DEFAULT_SCHED_REF_QUOTA = 300,
+    DEFAULT_SCHED_MOVE_QUOTA = 1,
 
     DISCOVERY_IDLE_INTERVAL = 10,
     DISCOVERY_WORK_INTERVAL = 1,
diff --git a/vshard/storage/CMakeLists.txt b/vshard/storage/CMakeLists.txt
index 7c1e97d..396664a 100644
--- a/vshard/storage/CMakeLists.txt
+++ b/vshard/storage/CMakeLists.txt
@@ -1,2 +1,2 @@
-install(FILES init.lua reload_evolution.lua ref.lua
+install(FILES init.lua reload_evolution.lua ref.lua sched.lua
         DESTINATION ${TARANTOOL_INSTALL_LUADIR}/vshard/storage)
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 2957f48..31f668f 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -17,7 +17,7 @@ if rawget(_G, MODULE_INTERNALS) then
         'vshard.replicaset', 'vshard.util',
         'vshard.storage.reload_evolution',
         'vshard.lua_gc', 'vshard.rlist', 'vshard.registry',
-        'vshard.heap', 'vshard.storage.ref',
+        'vshard.heap', 'vshard.storage.ref', 'vshard.storage.sched',
     }
     for _, module in pairs(vshard_modules) do
         package.loaded[module] = nil
@@ -32,6 +32,7 @@ local util = require('vshard.util')
 local lua_gc = require('vshard.lua_gc')
 local lregistry = require('vshard.registry')
 local lref = require('vshard.storage.ref')
+local lsched = require('vshard.storage.sched')
 local reload_evolution = require('vshard.storage.reload_evolution')
 local fiber_cond_wait = util.fiber_cond_wait
 local bucket_ref_new
@@ -1142,16 +1143,33 @@ local function bucket_recv_xc(bucket_id, from, data, opts)
             return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bucket_id, msg,
                                       from)
         end
-        if lref.count > 0 then
-            return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
-        end
         if is_this_replicaset_locked() then
             return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
         end
         if not bucket_receiving_quota_add(-1) then
             return nil, lerror.vshard(lerror.code.TOO_MANY_RECEIVING)
         end
-        _bucket:insert({bucket_id, recvg, from})
+        local timeout = opts and opts.timeout or
+                        consts.DEFAULT_BUCKET_SEND_TIMEOUT
+        local ok, err = lsched.move_start(timeout)
+        if not ok then
+            return nil, err
+        end
+        assert(lref.count == 0)
+        -- Move schedule is done only for the time of _bucket update.
+        -- The reason is that one bucket_send() calls bucket_recv() on the
+        -- remote storage multiple times. If the latter would schedule new moves
+        -- on each call, it could happen that the scheduler would block it in
+        -- favor of refs right in the middle of bucket_send().
+        -- It would lead to a deadlock, because refs won't be able to start -
+        -- the bucket won't be writable.
+        -- This way still provides fair scheduling, but does not have the
+        -- described issue.
+        ok, err = pcall(_bucket.insert, _bucket, {bucket_id, recvg, from})
+        lsched.move_end(1)
+        if not ok then
+            return nil, lerror.make(err)
+        end
     elseif b.status ~= recvg then
         local msg = string.format("bucket state is changed: was receiving, "..
                                   "became %s", b.status)
@@ -1434,7 +1452,7 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
     ref.rw_lock = true
     exception_guard.ref = ref
     exception_guard.drop_rw_lock = true
-    local timeout = opts and opts.timeout or 10
+    local timeout = opts and opts.timeout or consts.DEFAULT_BUCKET_SEND_TIMEOUT
     local deadline = fiber_clock() + timeout
     while ref.rw ~= 0 do
         timeout = deadline - fiber_clock()
@@ -1446,9 +1464,6 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
 
     local _bucket = box.space._bucket
     local bucket = _bucket:get({bucket_id})
-    if lref.count > 0 then
-        return nil, lerror.vshard(lerror.code.STORAGE_IS_REFERENCED)
-    end
     if is_this_replicaset_locked() then
         return nil, lerror.vshard(lerror.code.REPLICASET_IS_LOCKED)
     end
@@ -1468,7 +1483,25 @@ local function bucket_send_xc(bucket_id, destination, opts, exception_guard)
     local idx = M.shard_index
     local bucket_generation = M.bucket_generation
     local sendg = consts.BUCKET.SENDING
-    _bucket:replace({bucket_id, sendg, destination})
+
+    local ok, err = lsched.move_start(timeout)
+    if not ok then
+        return nil, err
+    end
+    assert(lref.count == 0)
+    -- Move is scheduled only for the time of _bucket update because:
+    --
+    -- * it is consistent with bucket_recv() (see its comments);
+    --
+    -- * gives the same effect as if move was in the scheduler for the whole
+    --   bucket_send() time, because refs won't be able to start anyway - the
+    --   bucket is not writable.
+    ok, err = pcall(_bucket.replace, _bucket, {bucket_id, sendg, destination})
+    lsched.move_end(1)
+    if not ok then
+        return nil, lerror.make(err)
+    end
+
     -- From this moment the bucket is SENDING. Such a status is
     -- even stronger than the lock.
     ref.rw_lock = false
@@ -2542,6 +2575,7 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
         M.bucket_on_replace = bucket_generation_increment
     end
 
+    lsched.cfg(vshard_cfg)
     lreplicaset.rebind_replicasets(new_replicasets, M.replicasets)
     lreplicaset.outdate_replicasets(M.replicasets)
     M.replicasets = new_replicasets
diff --git a/vshard/storage/ref.lua b/vshard/storage/ref.lua
index 7589cb9..2daad6b 100644
--- a/vshard/storage/ref.lua
+++ b/vshard/storage/ref.lua
@@ -33,6 +33,7 @@ local lregistry = require('vshard.registry')
 local fiber_clock = lfiber.clock
 local fiber_yield = lfiber.yield
 local DEADLINE_INFINITY = lconsts.DEADLINE_INFINITY
+local TIMEOUT_INFINITY = lconsts.TIMEOUT_INFINITY
 local LUA_CHUNK_SIZE = lconsts.LUA_CHUNK_SIZE
 
 --
@@ -88,6 +89,7 @@ local function ref_session_new(sid)
     -- Cache global session storages as upvalues to save on M indexing.
     local global_heap = M.session_heap
     local global_map = M.session_map
+    local sched = lregistry.storage_sched
 
     local function ref_session_discount(self, del_count)
         local new_count = M.count - del_count
@@ -97,6 +99,8 @@ local function ref_session_new(sid)
         new_count = count - del_count
         assert(new_count >= 0)
         count = new_count
+
+        sched.ref_end(del_count)
     end
 
     local function ref_session_update_deadline(self)
@@ -310,10 +314,17 @@ local function ref_add(rid, sid, timeout)
     local deadline = now + timeout
     local ok, err, session
     local storage = lregistry.storage
+    local sched = lregistry.storage_sched
+
+    timeout, err = sched.ref_start(timeout)
+    if not timeout then
+        return nil, err
+    end
+
     while not storage.bucket_are_all_rw() do
         ok, err = storage.bucket_generation_wait(timeout)
         if not ok then
-            return nil, err
+            goto fail_sched
         end
         now = fiber_clock()
         timeout = deadline - now
@@ -322,7 +333,13 @@ local function ref_add(rid, sid, timeout)
     if not session then
         session = ref_session_new(sid)
     end
-    return session:add(rid, deadline, now)
+    ok, err = session:add(rid, deadline, now)
+    if ok then
+        return true
+    end
+::fail_sched::
+    sched.ref_end(1)
+    return nil, err
 end
 
 local function ref_use(rid, sid)
@@ -341,6 +358,14 @@ local function ref_del(rid, sid)
     return session:del(rid)
 end
 
+local function ref_next_deadline()
+    local session = M.session_heap:top()
+    if not session then
+        return fiber_clock() + TIMEOUT_INFINITY
+    end
+    return session.deadline
+end
+
 local function ref_kill_session(sid)
     local session = M.session_map[sid]
     if session then
@@ -366,6 +391,7 @@ M.add = ref_add
 M.use = ref_use
 M.cfg = ref_cfg
 M.kill = ref_kill_session
+M.next_deadline = ref_next_deadline
 lregistry.storage_ref = M
 
 return M
diff --git a/vshard/storage/sched.lua b/vshard/storage/sched.lua
new file mode 100644
index 0000000..0ac71f4
--- /dev/null
+++ b/vshard/storage/sched.lua
@@ -0,0 +1,231 @@
+--
+-- Scheduler module ensures fair time sharing between incompatible operations:
+-- storage refs and bucket moves.
+-- Storage ref is supposed to prevent all bucket moves and provide safe
+-- environment for all kinds of possible requests on entire dataset of all
+-- spaces stored on the instance.
+-- Bucket move, on the contrary, wants to make a part of the dataset not usable
+-- temporary.
+-- Without a scheduler it would be possible to always keep at least one ref on
+-- the storage and block bucket moves forever. Or vice versa - during
+-- rebalancing block all incoming refs for the entire time of data migration,
+-- essentially making map-reduce not usable since it heavily depends on refs.
+--
+-- The schedule divides storage time between refs and moves so both of them can
+-- execute without blocking each other. Division proportions depend on the
+-- configuration settings.
+--
+-- Idea of non-blockage is based on quotas and strikes. Move and ref both have
+-- quotas. When one op executes more than quota requests in a row (makes a
+-- strike) while the other op has queued requests, the first op stops accepting
+-- new requests until the other op executes.
+--
+
+local MODULE_INTERNALS = '__module_vshard_storage_sched'
+-- Update when change behaviour of anything in the file, to be able to reload.
+local MODULE_VERSION = 1
+
+local lfiber = require('fiber')
+local lerror = require('vshard.error')
+local lconsts = require('vshard.consts')
+local lregistry = require('vshard.registry')
+local lutil = require('vshard.util')
+local fiber_clock = lfiber.clock
+local fiber_cond_wait = lutil.fiber_cond_wait
+local fiber_is_self_canceled = lutil.fiber_is_self_canceled
+
+local M = rawget(_G, MODULE_INTERNALS)
+if not M then
+    M = {
+        ---------------- Common module attributes ----------------
+        module_version = MODULE_VERSION,
+        -- Scheduler condition is signaled every time anything significant
+        -- happens - count of an operation type drops to 0, or quota increased,
+        -- etc.
+        cond = lfiber.cond(),
+
+        -------------------------- Refs --------------------------
+        -- Number of ref requests waiting for start.
+        ref_queue = 0,
+        -- Number of ref requests being executed. It is the same as ref's module
+        -- counter, but is duplicated here for the sake of isolation and
+        -- symmetry with moves.
+        ref_count = 0,
+        -- Number of ref requests executed in a row. When becomes bigger than
+        -- quota, any next queued move blocks new refs.
+        ref_strike = 0,
+        ref_quota = lconsts.DEFAULT_SCHED_REF_QUOTA,
+
+        ------------------------- Moves --------------------------
+        -- Number of move requests waiting for start.
+        move_queue = 0,
+        -- Number of move requests being executed.
+        move_count = 0,
+        -- Number of move requests executed in a row. When becomes bigger than
+        -- quota, any next queued ref blocks new moves.
+        move_strike = 0,
+        move_quota = lconsts.DEFAULT_SCHED_MOVE_QUOTA,
+    }
+else
+    return M
+end
+
+local function sched_wait_anything(timeout)
+    return fiber_cond_wait(M.cond, timeout)
+end
+
+--
+-- Return the remaining timeout in case there was a yield. This helps to save
+-- current clock get in the caller code if there were no yields.
+--
+local function sched_ref_start(timeout)
+    local deadline = fiber_clock() + timeout
+    local ok, err
+    -- Fast-path. Moves are extremely rare. No need to inc-dec the ref queue
+    -- then nor try to start some loops.
+    if M.move_count == 0 and M.move_queue == 0 then
+        goto success
+    end
+
+    M.ref_queue = M.ref_queue + 1
+
+::retry::
+    if M.move_count > 0 then
+        goto wait_and_retry
+    end
+    -- Even if move count is zero, must ensure the time usage is fair. Does not
+    -- matter in case the moves have no quota at all. That allows to ignore them
+    -- infinitely until all refs end voluntarily.
+    if M.move_queue > 0 and M.ref_strike >= M.ref_quota and
+       M.move_quota > 0 then
+        goto wait_and_retry
+    end
+
+    M.ref_queue = M.ref_queue - 1
+
+::success::
+    M.ref_count = M.ref_count + 1
+    M.ref_strike = M.ref_strike + 1
+    M.move_strike = 0
+    do return timeout end
+
+::wait_and_retry::
+    ok, err = sched_wait_anything(timeout)
+    if not ok then
+        M.ref_queue = M.ref_queue - 1
+        return nil, err
+    end
+    timeout = deadline - fiber_clock()
+    goto retry
+end
+
+local function sched_ref_end(count)
+    count = M.ref_count - count
+    M.ref_count = count
+    if count == 0 and M.move_queue > 0 then
+        M.cond:broadcast()
+    end
+end
+
+--
+-- Return the remaining timeout in case there was a yield. This helps to save
+-- current clock get in the caller code if there were no yields.
+--
+local function sched_move_start(timeout)
+    local deadline = fiber_clock() + timeout
+    local ok, err, ref_deadline
+    local lref = lregistry.storage_ref
+    -- Fast-path. Refs are not extremely rare *when used*. But they are not
+    -- expected to be used in a lot of installations. So most of the times the
+    -- moves should work right away.
+    if M.ref_count == 0 and M.ref_queue == 0 then
+        goto success
+    end
+
+    M.move_queue = M.move_queue + 1
+
+::retry::
+    if M.ref_count > 0 then
+        ref_deadline = lref.next_deadline()
+        if ref_deadline < deadline then
+            timeout = ref_deadline - fiber_clock()
+        end
+        ok, err = sched_wait_anything(timeout)
+        timeout = deadline - fiber_clock()
+        if ok then
+            goto retry
+        end
+        if fiber_is_self_canceled() then
+            goto fail
+        end
+        -- Even if the timeout has expired already (or was 0 from the
+        -- beginning), it is still possible the move can be started if all the
+        -- present refs are expired too and can be collected.
+        lref.gc()
+        -- GC could yield - need to refetch the clock again.
+        timeout = deadline - fiber_clock()
+        if M.ref_count > 0 then
+            if timeout < 0 then
+                goto fail
+            end
+            goto retry
+        end
+    end
+
+    if M.ref_queue > 0 and M.move_strike >= M.move_quota and
+       M.ref_quota > 0 then
+        ok, err = sched_wait_anything(timeout)
+        if not ok then
+            goto fail
+        end
+        timeout = deadline - fiber_clock()
+        goto retry
+    end
+
+    M.move_queue = M.move_queue - 1
+
+::success::
+    M.move_count = M.move_count + 1
+    M.move_strike = M.move_strike + 1
+    M.ref_strike = 0
+    do return timeout end
+
+::fail::
+    M.move_queue = M.move_queue - 1
+    return nil, err
+end
+
+local function sched_move_end(count)
+    count = M.move_count - count
+    M.move_count = count
+    if count == 0 and M.ref_queue > 0 then
+        M.cond:broadcast()
+    end
+end
+
+local function sched_cfg(cfg)
+    local new_ref_quota = cfg.sched_ref_quota
+    local new_move_quota = cfg.sched_move_quota
+
+    if new_ref_quota then
+        if new_ref_quota > M.ref_quota then
+            M.cond:broadcast()
+        end
+        M.ref_quota = new_ref_quota
+    end
+    if new_move_quota then
+        if new_move_quota > M.move_quota then
+            M.cond:broadcast()
+        end
+        M.move_quota = new_move_quota
+    end
+end
+
+M.ref_start = sched_ref_start
+M.ref_end = sched_ref_end
+M.move_start = sched_move_start
+M.move_end = sched_move_end
+M.cfg = sched_cfg
+lregistry.storage_sched = M
+
+return M
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list