[PATCH 09/11] vinyl: implement basic transaction throttling

Vladimir Davydov vdavydov.dev at gmail.com
Thu Sep 20 12:34:14 MSK 2018


If the rate at which transactions are ready to write to the database is
greater than the dump bandwidth, memory will get depleted before the
last dump is complete and all newer transactions will have to wait,
which may take seconds or even minutes:

  W> waited for 555 bytes of vinyl memory quota for too long: 15.750 sec

This patch implements basic transaction throttling that is supposed
to help avoid unpredictably long stalls. Now the transaction write
rate is always limited by the observed dump bandwidth, because it
doesn't make sense to consume memory at a greater rate than it can be
freed. On top of that, when a dump begins, we estimate the amount of
time it is going to take and limit the transaction write rate
accordingly, so that memory isn't depleted before the dump completes.

Note that this patch doesn't take compaction into account when setting
the rate limit, so compaction threads may still fail to keep up with
dumps, increasing read amplification. This will be addressed later.

Part of #1862
---
 src/box/vy_quota.c           |  76 +++++++++++++++++++++++++++++++-
 src/box/vy_quota.h           |  21 ++++++++-
 test/vinyl/suite.ini         |   2 +-
 test/vinyl/throttle.result   | 102 +++++++++++++++++++++++++++++++++++++++++++
 test/vinyl/throttle.test.lua |  54 +++++++++++++++++++++++
 5 files changed, 252 insertions(+), 3 deletions(-)
 create mode 100644 test/vinyl/throttle.result
 create mode 100644 test/vinyl/throttle.test.lua

diff --git a/src/box/vy_quota.c b/src/box/vy_quota.c
index def05aa2..18cf4f35 100644
--- a/src/box/vy_quota.c
+++ b/src/box/vy_quota.c
@@ -67,6 +67,61 @@ static const size_t VY_DEFAULT_DUMP_BANDWIDTH = 10 * 1024 * 1024;
  */
 enum { VY_DUMP_BANDWIDTH_PCT = 10 };

+/**
+ * Lower bound on the transaction write rate limit, in bytes
+ * per second, so that a very low computed limit (e.g. zero
+ * when no quota is left) doesn't stop transactions altogether.
+ */
+enum { VY_QUOTA_RATE_LIMIT_MIN = 100 * 1024 };
+
+/**
+ * Set the rate limit, in bytes per second, clamping it to
+ * VY_QUOTA_RATE_LIMIT_MIN.
+ */
+static inline void
+vy_quota_rl_set(struct vy_quota_rl *rl, size_t rate)
+{
+	rl->rate = MAX(rate, (size_t)VY_QUOTA_RATE_LIMIT_MIN);
+}
+
+/**
+ * Return true if the limiter budget is positive. The size of
+ * the pending request is not checked, so a single request may
+ * overdraw the budget and drive it negative.
+ */
+static inline bool
+vy_quota_rl_may_use(struct vy_quota_rl *rl)
+{
+	return rl->value > 0;
+}
+
+/** Charge @size bytes against the limiter budget. */
+static inline void
+vy_quota_rl_use(struct vy_quota_rl *rl, size_t size)
+{
+	rl->value -= (ssize_t)size;
+}
+
+/** Give back @size bytes charged by vy_quota_rl_use(). */
+static inline void
+vy_quota_rl_unuse(struct vy_quota_rl *rl, size_t size)
+{
+	rl->value += (ssize_t)size;
+}
+
+/**
+ * Replenish the budget by rate * VY_QUOTA_UPDATE_INTERVAL
+ * bytes, capping it so that bursts stay bounded.
+ */
+static inline void
+vy_quota_rl_refill(struct vy_quota_rl *rl)
+{
+	ssize_t size = rl->rate * VY_QUOTA_UPDATE_INTERVAL;
+	rl->value += size;
+	/* Allow bursts up to twice the per-refill amount. */
+	rl->value = MIN(rl->value, size * 2);
+}
+
 /**
  * Trigger memory dump unless it is already in progress.
  */
@@ -78,6 +112,28 @@ vy_quota_trigger_dump(struct vy_quota *q)
 		return;
 	if (q->trigger_dump_cb(q) != 0)
 		return;
+
+	/*
+	 * To avoid unpredictably long stalls, we must limit
+	 * the write rate when a dump is in progress so that
+	 * we don't hit the hard limit before the dump has
+	 * completed, i.e.
+	 *
+	 *   quota_left       dump_size
+	 *   ---------- >= --------------
+	 *    use_rate     dump_bandwidth
+	 *
+	 * where dump_size is approximated by q->used.
+	 *
+	 * quota_left * dump_bw may overflow size_t for large
+	 * memory limits, so compute in floating point and
+	 * narrow only once the result is below rl.rate.
+	 */
+	size_t quota_left = q->used < q->limit ? q->limit - q->used : 0;
+	double max_use_rate = (double)quota_left / (q->used + 1) * q->dump_bw;
+	if (q->rl.rate > max_use_rate)
+		vy_quota_rl_set(&q->rl, (size_t)max_use_rate);
+
 	q->dump_in_progress = true;
 }

@@ -124,6 +174,12 @@ vy_quota_timer_cb(ev_loop *loop, ev_timer *timer, int events)
 	q->watermark = ((double)q->limit * q->dump_bw /
 			(q->dump_bw + q->use_rate + 1));
 	vy_quota_check_watermark(q);
+
+	/*
+	 * Replenish quota and wake up throttled fibers, if any.
+	 */
+	vy_quota_rl_refill(&q->rl);
+	fiber_cond_signal(&q->cond);
 }
 
 int
@@ -170,6 +226,8 @@ vy_quota_enable(struct vy_quota *q)
 	assert(!q->is_enabled);
 	q->is_enabled = true;
 	q->use_curr = 0;
+	vy_quota_rl_set(&q->rl, q->dump_bw);
+	vy_quota_rl_refill(&q->rl);
 	ev_timer_start(loop(), &q->timer);
 	vy_quota_check_watermark(q);
 }
@@ -227,6 +285,16 @@ vy_quota_dump(struct vy_quota *q, size_t size, double duration)
 		q->dump_bw = histogram_percentile_lower(q->dump_bw_hist,
 							VY_DUMP_BANDWIDTH_PCT);
 	}
+
+	/*
+	 * Reset the rate limit.
+	 *
+	 * It doesn't make sense to allow to consume memory at
+	 * a higher rate than it can be dumped so we set the rate
+	 * limit to the dump bandwidth rather than disabling it
+	 * completely.
+	 */
+	vy_quota_rl_set(&q->rl, q->dump_bw);
 }
 
 void
@@ -241,6 +309,7 @@ vy_quota_force_use(struct vy_quota *q, size_t size)
 {
 	q->used += size;
 	q->use_curr += size;
+	vy_quota_rl_use(&q->rl, size);
 	vy_quota_check_watermark(q);
 }
 
@@ -253,7 +322,11 @@ vy_quota_may_use(struct vy_quota *q, size_t size)
 {
 	if (!q->is_enabled)
 		return true;
-	return q->used + size <= q->limit;
+	if (q->used + size > q->limit)
+		return false;
+	if (!vy_quota_rl_may_use(&q->rl))
+		return false;
+	return true;
 }
 
 int
@@ -293,6 +366,7 @@ vy_quota_unuse(struct vy_quota *q, size_t size)
 	q->used -= size;
 	/* use_curr could have been reset on timeout. */
 	q->use_curr -= MIN(q->use_curr, size);
+	vy_quota_rl_unuse(&q->rl, size);
 	fiber_cond_signal(&q->cond);
 }
 
diff --git a/src/box/vy_quota.h b/src/box/vy_quota.h
index 793a4430..3070ff82 100644
--- a/src/box/vy_quota.h
+++ b/src/box/vy_quota.h
@@ -47,6 +47,20 @@ typedef int
 (*vy_quota_trigger_dump_f)(struct vy_quota *quota);
 
 /**
+ * Rate limiter state.
+ */
+struct vy_quota_rl {
+	/** Max allowed use rate, in bytes per second. */
+	size_t rate;
+	/**
+	 * Quota transactions may still consume. Signed: it may
+	 * go negative when a request overdraws it. Refilled by
+	 * the timer callback in accordance with the configured rate.
+	 */
+	ssize_t value;
+};
+
+/**
  * Quota used for accounting and limiting memory consumption
  * in the vinyl engine. It is NOT multi-threading safe.
  */
@@ -76,7 +90,10 @@ struct vy_quota {
 	 * there is no quota left.
 	 */
 	struct fiber_cond cond;
-	/** Timer for updating quota watermark. */
+	/**
+	 * Timer used for updating average use rate, calculating
+	 * quota watermark and refilling rate limit value.
+	 */
 	ev_timer timer;
 	/**
 	 * Amount of quota used since the last
@@ -89,6 +106,8 @@ struct vy_quota {
 	 * moving average of use_curr.
 	 */
 	size_t use_rate;
+	/** Rate limiter. */
+	struct vy_quota_rl rl;
 	/**
 	 * Called when quota is consumed if used >= watermark.
 	 * It is supposed to trigger memory dump and return 0
diff --git a/test/vinyl/suite.ini b/test/vinyl/suite.ini
index b9dae380..785bc63d 100644
--- a/test/vinyl/suite.ini
+++ b/test/vinyl/suite.ini
@@ -6,5 +6,5 @@ release_disabled = errinj.test.lua errinj_gc.test.lua errinj_vylog.test.lua part
 config = suite.cfg
 lua_libs = suite.lua stress.lua large.lua txn_proxy.lua ../box/lua/utils.lua
 use_unix_sockets = True
-long_run = stress.test.lua large.test.lua write_iterator_rand.test.lua dump_stress.test.lua select_consistency.test.lua
+long_run = stress.test.lua large.test.lua write_iterator_rand.test.lua dump_stress.test.lua select_consistency.test.lua throttle.test.lua
 is_parallel = False
diff --git a/test/vinyl/throttle.result b/test/vinyl/throttle.result
new file mode 100644
index 00000000..7e84e496
--- /dev/null
+++ b/test/vinyl/throttle.result
@@ -0,0 +1,102 @@
+--
+-- Basic test for transaction throttling.
+--
+-- It checks that write transactions aren't stalled for long
+-- due to hitting the memory limit, but instead are throttled
+-- in advance.
+--
+test_run = require('test_run').new()
+---
+...
+test_run:cmd("create server test with script='vinyl/low_quota.lua'")
+---
+- true
+...
+test_run:cmd(string.format("start server test with args='%d'", 32 * 1024 * 1024))
+---
+- true
+...
+test_run:cmd('switch test')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+digest = require('digest')
+---
+...
+box.cfg{snap_io_rate_limit = 4}
+---
+...
+FIBER_COUNT = 5
+---
+...
+TUPLE_SIZE = 1000
+---
+...
+TX_TUPLE_COUNT = 10
+---
+...
+TX_SIZE = TUPLE_SIZE * TX_TUPLE_COUNT
+---
+...
+TX_COUNT = math.ceil(box.cfg.vinyl_memory / (TX_SIZE * FIBER_COUNT))
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('primary', {parts = {1, 'unsigned', 2, 'unsigned', 3, 'unsigned'}})
+---
+...
+latency = 0
+---
+...
+c = fiber.channel(FIBER_COUNT)
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i = 1, FIBER_COUNT do
+    fiber.create(function()
+        for j = 1, TX_COUNT do
+            local t1 = fiber.time()
+            box.begin()
+            for k = 1, TX_TUPLE_COUNT do
+                s:replace{i, j, k, digest.urandom(TUPLE_SIZE)}
+            end
+            box.commit()
+            local t2 = fiber.time()
+            latency = math.max(latency, t2 - t1)
+        end
+        c:put(true)
+    end)
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+for i = 1, FIBER_COUNT do c:get() end
+---
+...
+latency < 0.2 or latency
+---
+- true
+...
+test_run:cmd('switch default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
+...
diff --git a/test/vinyl/throttle.test.lua b/test/vinyl/throttle.test.lua
new file mode 100644
index 00000000..130c8fcd
--- /dev/null
+++ b/test/vinyl/throttle.test.lua
@@ -0,0 +1,54 @@
+--
+-- Basic test for transaction throttling.
+--
+-- It checks that write transactions aren't stalled for long
+-- due to hitting the memory limit, but instead are throttled
+-- in advance.
+--
+test_run = require('test_run').new()
+test_run:cmd("create server test with script='vinyl/low_quota.lua'")
+test_run:cmd(string.format("start server test with args='%d'", 32 * 1024 * 1024))
+test_run:cmd('switch test')
+
+fiber = require('fiber')
+digest = require('digest')
+
+box.cfg{snap_io_rate_limit = 4}
+
+FIBER_COUNT = 5
+TUPLE_SIZE = 1000
+TX_TUPLE_COUNT = 10
+TX_SIZE = TUPLE_SIZE * TX_TUPLE_COUNT
+TX_COUNT = math.ceil(box.cfg.vinyl_memory / (TX_SIZE * FIBER_COUNT))
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('primary', {parts = {1, 'unsigned', 2, 'unsigned', 3, 'unsigned'}})
+
+latency = 0
+c = fiber.channel(FIBER_COUNT)
+
+test_run:cmd("setopt delimiter ';'")
+for i = 1, FIBER_COUNT do
+    fiber.create(function()
+        for j = 1, TX_COUNT do
+            local t1 = fiber.time()
+            box.begin()
+            for k = 1, TX_TUPLE_COUNT do
+                s:replace{i, j, k, digest.urandom(TUPLE_SIZE)}
+            end
+            box.commit()
+            local t2 = fiber.time()
+            latency = math.max(latency, t2 - t1)
+        end
+        c:put(true)
+    end)
+end;
+test_run:cmd("setopt delimiter ''");
+
+for i = 1, FIBER_COUNT do c:get() end
+
+latency < 0.2 or latency
+
+test_run:cmd('switch default')
+test_run:cmd("stop server test")
+test_run:cmd("cleanup server test")
-- 
2.11.0




More information about the Tarantool-patches mailing list