[PATCH v2 10/11] vinyl: implement basic transaction throttling

Vladimir Davydov vdavydov.dev at gmail.com
Fri Sep 28 20:40:08 MSK 2018


If the rate at which transactions are ready to write to the database is
greater than the dump bandwidth, memory will get depleted before the
previously scheduled dump is complete and all newer transactions will
have to wait, which may take seconds or even minutes:

  W> waited for 555 bytes of vinyl memory quota for too long: 15.750 sec

This patch implements basic transaction throttling, which is intended
to help avoid unpredictably long stalls. Now the transaction write rate
is always capped by the observed dump bandwidth, because it doesn't make
sense to consume memory at a greater rate than it can be freed. On top
of that, when a dump begins, we estimate how long it is going to take
and limit the transaction write rate accordingly.

Note, this patch doesn't take compaction into account when setting the
rate limit, so compaction threads may still fail to keep up with dumps,
increasing read amplification. This will be addressed later.

Part of #1862
---
 src/box/vy_quota.c           |  44 ++++++++++++++++++-
 src/box/vy_quota.h           |  78 +++++++++++++++++++++++++++++++++
 src/box/vy_regulator.c       |  30 +++++++++++++
 test/vinyl/suite.ini         |   2 +-
 test/vinyl/throttle.result   | 102 +++++++++++++++++++++++++++++++++++++++++++
 test/vinyl/throttle.test.lua |  54 +++++++++++++++++++++++
 6 files changed, 307 insertions(+), 3 deletions(-)
 create mode 100644 test/vinyl/throttle.result
 create mode 100644 test/vinyl/throttle.test.lua

diff --git a/src/box/vy_quota.c b/src/box/vy_quota.c
index 4b3527b4..ceac4878 100644
--- a/src/box/vy_quota.c
+++ b/src/box/vy_quota.c
@@ -43,6 +43,15 @@
 #include "trivia/util.h"
 
 /**
+ * Quota timer period, in seconds.
+ *
+ * The timer is used for replenishing the rate limit value so
+ * its period defines how long throttled transactions will wait.
+ * Therefore use a relatively small period.
+ */
+static const double VY_QUOTA_TIMER_PERIOD = 0.1;
+
+/**
  * Return true if the requested amount of memory may be consumed
  * right now, false if consumers have to wait.
  */
@@ -53,6 +62,8 @@ vy_quota_may_use(struct vy_quota *q, size_t size)
 		return true;
 	if (q->used + size > q->limit)
 		return false;
+	if (!vy_rate_limit_may_use(&q->rate_limit))
+		return false;
 	return true;
 }
 
@@ -63,6 +74,7 @@ static inline void
 vy_quota_do_use(struct vy_quota *q, size_t size)
 {
 	q->used += size;
+	vy_rate_limit_use(&q->rate_limit, size);
 }
 
 /**
@@ -74,6 +86,7 @@ vy_quota_do_unuse(struct vy_quota *q, size_t size)
 {
 	assert(q->used >= size);
 	q->used -= size;
+	vy_rate_limit_unuse(&q->rate_limit, size);
 }
 
 /**
@@ -106,6 +119,18 @@ vy_quota_signal(struct vy_quota *q)
 	}
 }
 
+/** Timer callback: refill the rate limit and signal the quota. */
+static void
+vy_quota_timer_cb(ev_loop *loop, ev_timer *timer, int events)
+{
+	(void)loop;
+	(void)events;
+	struct vy_quota *quota = timer->data;
+	/* The timer repeats every VY_QUOTA_TIMER_PERIOD seconds. */
+	vy_rate_limit_refill(&quota->rate_limit, VY_QUOTA_TIMER_PERIOD);
+	vy_quota_signal(quota);
+}
+
 void
 vy_quota_create(struct vy_quota *q, size_t limit,
 		vy_quota_exceeded_f quota_exceeded_cb)
@@ -116,6 +141,9 @@ vy_quota_create(struct vy_quota *q, size_t limit,
 	q->too_long_threshold = TIMEOUT_INFINITY;
 	q->quota_exceeded_cb = quota_exceeded_cb;
 	rlist_create(&q->wait_queue);
+	vy_rate_limit_create(&q->rate_limit);
+	ev_timer_init(&q->timer, vy_quota_timer_cb, 0, VY_QUOTA_TIMER_PERIOD);
+	q->timer.data = q;
 }
 
 void
@@ -123,13 +151,14 @@ vy_quota_enable(struct vy_quota *q)
 {
 	assert(!q->is_enabled);
 	q->is_enabled = true;
+	ev_timer_start(loop(), &q->timer);
 	vy_quota_check_limit(q);
 }
 
 void
 vy_quota_destroy(struct vy_quota *q)
 {
-	(void)q;
+	ev_timer_stop(loop(), &q->timer);
 }
 
 void
@@ -141,6 +170,12 @@ vy_quota_set_limit(struct vy_quota *q, size_t limit)
 }
 
 void
+vy_quota_set_rate_limit(struct vy_quota *q, size_t rate)
+{
+	vy_rate_limit_set(&q->rate_limit, rate);
+}
+
+void
 vy_quota_force_use(struct vy_quota *q, size_t size)
 {
 	vy_quota_do_use(q, size);
@@ -150,7 +185,12 @@ vy_quota_force_use(struct vy_quota *q, size_t size)
 void
 vy_quota_release(struct vy_quota *q, size_t size)
 {
-	vy_quota_do_unuse(q, size);
+	/*
+	 * Don't use vy_quota_do_unuse(), because it would also
+	 * give @size back to the rate limit state.
+	 */
+	assert(q->used >= size);
+	q->used -= size;
 	vy_quota_signal(q);
 }
 
diff --git a/src/box/vy_quota.h b/src/box/vy_quota.h
index f249512b..79755e89 100644
--- a/src/box/vy_quota.h
+++ b/src/box/vy_quota.h
@@ -31,11 +31,14 @@
  * SUCH DAMAGE.
  */
 
+#include <limits.h>
 #include <stdbool.h>
 #include <stddef.h>
 #include <small/rlist.h>
 #include <tarantool_ev.h>
 
+#include "trivia/util.h"
+
 #if defined(__cplusplus)
 extern "C" {
 #endif /* defined(__cplusplus) */
@@ -43,6 +46,67 @@ extern "C" {
 struct fiber;
 struct vy_quota;
 
+/** Rate limit state. */
+struct vy_rate_limit {
+	/** Max allowed rate, in bytes per second. */
+	size_t rate;
+	/** Current quota, in bytes; may drop below zero. */
+	ssize_t value;
+};
+
+/** Initialize a rate limit state with effectively no limit. */
+static inline void
+vy_rate_limit_create(struct vy_rate_limit *rl)
+{
+	*rl = (struct vy_rate_limit){ .rate = SIZE_MAX,
+				      .value = SSIZE_MAX };
+}
+
+/** Set the max rate, bytes per second; applied on the next refill. */
+static inline void
+vy_rate_limit_set(struct vy_rate_limit *rl, size_t rate)
+{
+	rl->rate = rate;
+}
+
+/**
+ * Return true if quota may be consumed without exceeding
+ * the configured rate limit, i.e. some quota is still left.
+ */
+static inline bool
+vy_rate_limit_may_use(const struct vy_rate_limit *rl)
+{
+	return rl->value > 0;
+}
+
+/** Consume the given amount of quota; the value may go negative. */
+static inline void
+vy_rate_limit_use(struct vy_rate_limit *rl, size_t size)
+{
+	rl->value -= size;
+}
+
+/** Release the given amount of quota, undoing vy_rate_limit_use(). */
+static inline void
+vy_rate_limit_unuse(struct vy_rate_limit *rl, size_t size)
+{
+	rl->value += size;
+}
+
+/**
+ * Replenish quota by the amount accumulated for the given
+ * time interval, allowing bursts up to twice that amount.
+ */
+static inline void
+vy_rate_limit_refill(struct vy_rate_limit *rl, double time)
+{
+	double size = rl->rate * time;
+	double value = MIN(rl->value + size, size * 2);
+	/* (double)SSIZE_MAX may round past SSIZE_MAX: clamp first. */
+	rl->value = value < (double)SSIZE_MAX ? (ssize_t)value :
+						 SSIZE_MAX;
+}
+
 typedef void
 (*vy_quota_exceeded_f)(struct vy_quota *quota);
 
@@ -85,6 +149,13 @@ struct vy_quota {
 	 * to the tail.
 	 */
 	struct rlist wait_queue;
+	/** Rate limit state. */
+	struct vy_rate_limit rate_limit;
+	/**
+	 * Periodic timer that is used for refilling the rate
+	 * limit value.
+	 */
+	ev_timer timer;
 };
 
 /**
@@ -117,6 +188,13 @@ void
 vy_quota_set_limit(struct vy_quota *q, size_t limit);
 
 /**
+ * Set the max rate at which quota may be consumed,
+ * in bytes per second.
+ */
+void
+vy_quota_set_rate_limit(struct vy_quota *q, size_t rate);
+
+/**
  * Consume @size bytes of memory. In contrast to vy_quota_use()
  * this function does not throttle the caller.
  */
diff --git a/src/box/vy_regulator.c b/src/box/vy_regulator.c
index 682777fc..1e106fc4 100644
--- a/src/box/vy_regulator.c
+++ b/src/box/vy_regulator.c
@@ -76,6 +76,24 @@ vy_regulator_trigger_dump(struct vy_regulator *regulator)
 		return;
 
 	regulator->dump_in_progress = true;
+
+	/*
+	 * To avoid unpredictably long stalls, we must limit
+	 * the write rate when a dump is in progress so that
+	 * we don't hit the hard limit before the dump has
+	 * completed, i.e.
+	 *
+	 *    mem_left        mem_used
+	 *   ---------- >= --------------
+	 *   write_rate    dump_bandwidth
+	 */
+	struct vy_quota *quota = regulator->quota;
+	size_t mem_left = (quota->used < quota->limit ?
+			   quota->limit - quota->used : 0);
+	size_t mem_used = quota->used;
+	size_t max_write_rate = mem_left * regulator->dump_bw / (mem_used + 1);
+	max_write_rate = MIN(max_write_rate, regulator->dump_bw);
+	vy_quota_set_rate_limit(quota, max_write_rate);
 }
 
 static void
@@ -172,6 +190,7 @@ void
 vy_regulator_start(struct vy_regulator *regulator)
 {
 	ev_timer_start(loop(), &regulator->timer);
+	vy_quota_set_rate_limit(regulator->quota, regulator->dump_bw);
 }
 
 void
@@ -225,6 +244,16 @@ vy_regulator_dump_complete(struct vy_regulator *regulator,
 		regulator->dump_bw = histogram_percentile_lower(
 			regulator->dump_bw_hist, VY_DUMP_BW_PCT);
 	}
+
+	/*
+	 * Reset the rate limit.
+	 *
+	 * It doesn't make sense to allow to consume memory at
+	 * a higher rate than it can be dumped so we set the rate
+	 * limit to the dump bandwidth rather than disabling it
+	 * completely.
+	 */
+	vy_quota_set_rate_limit(regulator->quota, regulator->dump_bw);
 }
 
 void
@@ -232,4 +261,5 @@ vy_regulator_reset_dump_bw(struct vy_regulator *regulator, size_t max)
 {
 	histogram_reset(regulator->dump_bw_hist);
 	regulator->dump_bw = MIN(VY_DUMP_BW_DEFAULT, max);
+	vy_quota_set_rate_limit(regulator->quota, regulator->dump_bw);
 }
diff --git a/test/vinyl/suite.ini b/test/vinyl/suite.ini
index b9dae380..785bc63d 100644
--- a/test/vinyl/suite.ini
+++ b/test/vinyl/suite.ini
@@ -6,5 +6,5 @@ release_disabled = errinj.test.lua errinj_gc.test.lua errinj_vylog.test.lua part
 config = suite.cfg
 lua_libs = suite.lua stress.lua large.lua txn_proxy.lua ../box/lua/utils.lua
 use_unix_sockets = True
-long_run = stress.test.lua large.test.lua write_iterator_rand.test.lua dump_stress.test.lua select_consistency.test.lua
+long_run = stress.test.lua large.test.lua write_iterator_rand.test.lua dump_stress.test.lua select_consistency.test.lua throttle.test.lua
 is_parallel = False
diff --git a/test/vinyl/throttle.result b/test/vinyl/throttle.result
new file mode 100644
index 00000000..628ee8a2
--- /dev/null
+++ b/test/vinyl/throttle.result
@@ -0,0 +1,102 @@
+--
+-- Basic test for transaction throttling.
+--
+-- It checks that write transactions aren't stalled for long
+-- due to hitting the memory limit, but instead are throttled
+-- in advance.
+--
+test_run = require('test_run').new()
+---
+...
+test_run:cmd("create server test with script='vinyl/low_quota.lua'")
+---
+- true
+...
+test_run:cmd(string.format("start server test with args='%d'", 32 * 1024 * 1024))
+---
+- true
+...
+test_run:cmd('switch test')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+digest = require('digest')
+---
+...
+box.cfg{snap_io_rate_limit = 8}
+---
+...
+FIBER_COUNT = 5
+---
+...
+TUPLE_SIZE = 1000
+---
+...
+TX_TUPLE_COUNT = 10
+---
+...
+TX_SIZE = TUPLE_SIZE * TX_TUPLE_COUNT
+---
+...
+TX_COUNT = math.ceil(box.cfg.vinyl_memory / (TX_SIZE * FIBER_COUNT))
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('primary', {parts = {1, 'unsigned', 2, 'unsigned', 3, 'unsigned'}})
+---
+...
+latency = 0
+---
+...
+c = fiber.channel(FIBER_COUNT)
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i = 1, FIBER_COUNT do
+    fiber.create(function()
+        for j = 1, TX_COUNT do
+            local t1 = fiber.time()
+            box.begin()
+            for k = 1, TX_TUPLE_COUNT do
+                s:replace{i, j, k, digest.urandom(TUPLE_SIZE)}
+            end
+            box.commit()
+            local t2 = fiber.time()
+            latency = math.max(latency, t2 - t1)
+        end
+        c:put(true)
+    end)
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+for i = 1, FIBER_COUNT do c:get() end
+---
+...
+latency < 0.2 or latency
+---
+- true
+...
+test_run:cmd('switch default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
+...
diff --git a/test/vinyl/throttle.test.lua b/test/vinyl/throttle.test.lua
new file mode 100644
index 00000000..06548a08
--- /dev/null
+++ b/test/vinyl/throttle.test.lua
@@ -0,0 +1,54 @@
+--
+-- Basic test for transaction throttling.
+--
+-- It checks that write transactions aren't stalled for long
+-- due to hitting the memory limit, but instead are throttled
+-- in advance.
+--
+test_run = require('test_run').new()
+test_run:cmd("create server test with script='vinyl/low_quota.lua'")
+test_run:cmd(string.format("start server test with args='%d'", 32 * 1024 * 1024))
+test_run:cmd('switch test')
+
+fiber = require('fiber')
+digest = require('digest')
+
+box.cfg{snap_io_rate_limit = 8}
+
+FIBER_COUNT = 5
+TUPLE_SIZE = 1000
+TX_TUPLE_COUNT = 10
+TX_SIZE = TUPLE_SIZE * TX_TUPLE_COUNT
+TX_COUNT = math.ceil(box.cfg.vinyl_memory / (TX_SIZE * FIBER_COUNT))
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('primary', {parts = {1, 'unsigned', 2, 'unsigned', 3, 'unsigned'}})
+
+latency = 0
+c = fiber.channel(FIBER_COUNT)
+
+test_run:cmd("setopt delimiter ';'")
+for i = 1, FIBER_COUNT do
+    fiber.create(function()
+        for j = 1, TX_COUNT do
+            local t1 = fiber.time()
+            box.begin()
+            for k = 1, TX_TUPLE_COUNT do
+                s:replace{i, j, k, digest.urandom(TUPLE_SIZE)}
+            end
+            box.commit()
+            local t2 = fiber.time()
+            latency = math.max(latency, t2 - t1)
+        end
+        c:put(true)
+    end)
+end;
+test_run:cmd("setopt delimiter ''");
+
+for i = 1, FIBER_COUNT do c:get() end
+
+latency < 0.2 or latency
+
+test_run:cmd('switch default')
+test_run:cmd("stop server test")
+test_run:cmd("cleanup server test")
-- 
2.11.0




More information about the Tarantool-patches mailing list