From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH v2 10/11] vinyl: implement basic transaction throttling Date: Fri, 28 Sep 2018 20:40:08 +0300 Message-Id: <5993d650afef7cfb24d2953ed67311d2528d255f.1538155645.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: If the rate at which transactions are ready to write to the database is greater than the dump bandwidth, memory will get depleted before the previously scheduled dump is complete and all newer transactions will have to wait, which may take seconds or even minutes: W> waited for 555 bytes of vinyl memory quota for too long: 15.750 sec This patch set implements basic transaction throttling that is supposed to help avoid unpredictably long stalls. Now the transaction write rate is always capped by the observed dump bandwidth, because it doesn't make sense to consume memory at a greater rate than it can be freed. On top of that, when a dump begins, we estimate the amount of time it is going to take and limit the transaction write rate accordingly. Note, this patch doesn't take into account compaction when setting the rate limit so compaction threads may still fail to keep up with dumps, increasing the read amplification. It will be addressed later. Part of #1862 --- src/box/vy_quota.c | 44 ++++++++++++++++++- src/box/vy_quota.h | 78 +++++++++++++++++++++++++++++++++ src/box/vy_regulator.c | 30 +++++++++++++ test/vinyl/suite.ini | 2 +- test/vinyl/throttle.result | 102 +++++++++++++++++++++++++++++++++++++++++++ test/vinyl/throttle.test.lua | 54 +++++++++++++++++++++++ 6 files changed, 307 insertions(+), 3 deletions(-) create mode 100644 test/vinyl/throttle.result create mode 100644 test/vinyl/throttle.test.lua diff --git a/src/box/vy_quota.c b/src/box/vy_quota.c index 4b3527b4..ceac4878 100644 --- a/src/box/vy_quota.c +++ b/src/box/vy_quota.c @@ -43,6 +43,15 @@ #include "trivia/util.h" /** + * Quota timer period, in seconds. + * + * The timer is used for replenishing the rate limit value so + * its period defines how long throttled transactions will wait. + * Therefore use a relatively small period. + */ +static const double VY_QUOTA_TIMER_PERIOD = 0.1; + +/** * Return true if the requested amount of memory may be consumed * right now, false if consumers have to wait. */ @@ -53,6 +62,8 @@ vy_quota_may_use(struct vy_quota *q, size_t size) return true; if (q->used + size > q->limit) return false; + if (!vy_rate_limit_may_use(&q->rate_limit)) + return false; return true; } @@ -63,6 +74,7 @@ static inline void vy_quota_do_use(struct vy_quota *q, size_t size) { q->used += size; + vy_rate_limit_use(&q->rate_limit, size); } /** @@ -74,6 +86,7 @@ vy_quota_do_unuse(struct vy_quota *q, size_t size) { assert(q->used >= size); q->used -= size; + vy_rate_limit_unuse(&q->rate_limit, size); } /** @@ -106,6 +119,18 @@ vy_quota_signal(struct vy_quota *q) } } +static void +vy_quota_timer_cb(ev_loop *loop, ev_timer *timer, int events) +{ + (void)loop; + (void)events; + + struct vy_quota *q = timer->data; + + vy_rate_limit_refill(&q->rate_limit, VY_QUOTA_TIMER_PERIOD); + vy_quota_signal(q); +} + void vy_quota_create(struct vy_quota *q, size_t limit, vy_quota_exceeded_f quota_exceeded_cb) @@ -116,6 +141,9 @@ vy_quota_create(struct vy_quota *q, size_t limit, q->too_long_threshold = TIMEOUT_INFINITY; q->quota_exceeded_cb = quota_exceeded_cb; rlist_create(&q->wait_queue); + vy_rate_limit_create(&q->rate_limit); + ev_timer_init(&q->timer, vy_quota_timer_cb, 0, VY_QUOTA_TIMER_PERIOD); + q->timer.data = q; } void @@ -123,13 +151,14 @@ vy_quota_enable(struct vy_quota *q) { assert(!q->is_enabled); q->is_enabled = true; + ev_timer_start(loop(), &q->timer); vy_quota_check_limit(q); } void vy_quota_destroy(struct vy_quota *q) { - (void)q; + ev_timer_stop(loop(), &q->timer); } void @@ -141,6 +170,12 @@ vy_quota_set_limit(struct vy_quota *q, size_t limit) } void +vy_quota_set_rate_limit(struct vy_quota *q, size_t rate) +{ + vy_rate_limit_set(&q->rate_limit, rate); +} + +void vy_quota_force_use(struct vy_quota *q, size_t size) { vy_quota_do_use(q, size); @@ -150,7 +185,12 @@ vy_quota_force_use(struct vy_quota *q, size_t size) void vy_quota_release(struct vy_quota *q, size_t size) { - vy_quota_do_unuse(q, size); + /* + * Don't use vy_quota_do_unuse(), because it affects + * the rate limit state. + */ + assert(q->used >= size); + q->used -= size; vy_quota_signal(q); } diff --git a/src/box/vy_quota.h b/src/box/vy_quota.h index f249512b..79755e89 100644 --- a/src/box/vy_quota.h +++ b/src/box/vy_quota.h @@ -31,11 +31,14 @@ * SUCH DAMAGE. */ +#include #include #include #include #include +#include "trivia/util.h" + #if defined(__cplusplus) extern "C" { #endif /* defined(__cplusplus) */ @@ -43,6 +46,67 @@ extern "C" { struct fiber; struct vy_quota; +/** Rate limit state. */ +struct vy_rate_limit { + /** Max allowed rate, per second. */ + size_t rate; + /** Current quota. */ + ssize_t value; +}; + +/** Initialize a rate limit state. */ +static inline void +vy_rate_limit_create(struct vy_rate_limit *rl) +{ + rl->rate = SIZE_MAX; + rl->value = SSIZE_MAX; +} + +/** Set rate limit. */ +static inline void +vy_rate_limit_set(struct vy_rate_limit *rl, size_t rate) +{ + rl->rate = rate; +} + +/** + * Return true if quota may be consumed without exceeding + * the configured rate limit. + */ +static inline bool +vy_rate_limit_may_use(struct vy_rate_limit *rl) +{ + return rl->value > 0; +} + +/** Consume the given amount of quota. */ +static inline void +vy_rate_limit_use(struct vy_rate_limit *rl, size_t size) +{ + rl->value -= size; +} + +/** Release the given amount of quota. */ +static inline void +vy_rate_limit_unuse(struct vy_rate_limit *rl, size_t size) +{ + rl->value += size; +} + +/** + * Replenish quota by the amount accumulated for the given + * time interval. + */ +static inline void +vy_rate_limit_refill(struct vy_rate_limit *rl, double time) +{ + double size = rl->rate * time; + double value = rl->value + size; + /* Allow bursts up to 2x rate. */ + value = MIN(value, size * 2); + rl->value = MIN(value, SSIZE_MAX); +} + typedef void (*vy_quota_exceeded_f)(struct vy_quota *quota); @@ -85,6 +149,13 @@ struct vy_quota { * to the tail. */ struct rlist wait_queue; + /** Rate limit state. */ + struct vy_rate_limit rate_limit; + /** + * Periodic timer that is used for refilling the rate + * limit value. + */ + ev_timer timer; }; /** @@ -117,6 +188,13 @@ void vy_quota_set_limit(struct vy_quota *q, size_t limit); /** + * Set the max rate at which quota may be consumed, + * in bytes per second. + */ +void +vy_quota_set_rate_limit(struct vy_quota *q, size_t rate); + +/** * Consume @size bytes of memory. In contrast to vy_quota_use() * this function does not throttle the caller. */ diff --git a/src/box/vy_regulator.c b/src/box/vy_regulator.c index 682777fc..1e106fc4 100644 --- a/src/box/vy_regulator.c +++ b/src/box/vy_regulator.c @@ -76,6 +76,24 @@ vy_regulator_trigger_dump(struct vy_regulator *regulator) return; regulator->dump_in_progress = true; + + /* + * To avoid unpredictably long stalls, we must limit + * the write rate when a dump is in progress so that + * we don't hit the hard limit before the dump has + * completed, i.e. + * + * mem_left mem_used + * ---------- >= -------------- + * write_rate dump_bandwidth + */ + struct vy_quota *quota = regulator->quota; + size_t mem_left = (quota->used < quota->limit ? + quota->limit - quota->used : 0); + size_t mem_used = quota->used; + size_t max_write_rate = mem_left * regulator->dump_bw / (mem_used + 1); + max_write_rate = MIN(max_write_rate, regulator->dump_bw); + vy_quota_set_rate_limit(quota, max_write_rate); } static void @@ -172,6 +190,7 @@ void vy_regulator_start(struct vy_regulator *regulator) { ev_timer_start(loop(), ®ulator->timer); + vy_quota_set_rate_limit(regulator->quota, regulator->dump_bw); } void @@ -225,6 +244,16 @@ vy_regulator_dump_complete(struct vy_regulator *regulator, regulator->dump_bw = histogram_percentile_lower( regulator->dump_bw_hist, VY_DUMP_BW_PCT); } + + /* + * Reset the rate limit. + * + * It doesn't make sense to allow to consume memory at + * a higher rate than it can be dumped so we set the rate + * limit to the dump bandwidth rather than disabling it + * completely. + */ + vy_quota_set_rate_limit(regulator->quota, regulator->dump_bw); } void @@ -232,4 +261,5 @@ vy_regulator_reset_dump_bw(struct vy_regulator *regulator, size_t max) { histogram_reset(regulator->dump_bw_hist); regulator->dump_bw = MIN(VY_DUMP_BW_DEFAULT, max); + vy_quota_set_rate_limit(regulator->quota, regulator->dump_bw); } diff --git a/test/vinyl/suite.ini b/test/vinyl/suite.ini index b9dae380..785bc63d 100644 --- a/test/vinyl/suite.ini +++ b/test/vinyl/suite.ini @@ -6,5 +6,5 @@ release_disabled = errinj.test.lua errinj_gc.test.lua errinj_vylog.test.lua part config = suite.cfg lua_libs = suite.lua stress.lua large.lua txn_proxy.lua ../box/lua/utils.lua use_unix_sockets = True -long_run = stress.test.lua large.test.lua write_iterator_rand.test.lua dump_stress.test.lua select_consistency.test.lua +long_run = stress.test.lua large.test.lua write_iterator_rand.test.lua dump_stress.test.lua select_consistency.test.lua throttle.test.lua is_parallel = False diff --git a/test/vinyl/throttle.result b/test/vinyl/throttle.result new file mode 100644 index 00000000..628ee8a2 --- /dev/null +++ b/test/vinyl/throttle.result @@ -0,0 +1,102 @@ +-- +-- Basic test for transaction throttling. +-- +-- It checks that write transactions aren't stalled for long +-- due to hitting the memory limit, but instead are throttled +-- in advance. +-- +test_run = require('test_run').new() +--- +... +test_run:cmd("create server test with script='vinyl/low_quota.lua'") +--- +- true +... +test_run:cmd(string.format("start server test with args='%d'", 32 * 1024 * 1024)) +--- +- true +... +test_run:cmd('switch test') +--- +- true +... +fiber = require('fiber') +--- +... +digest = require('digest') +--- +... +box.cfg{snap_io_rate_limit = 8} +--- +... +FIBER_COUNT = 5 +--- +... +TUPLE_SIZE = 1000 +--- +... +TX_TUPLE_COUNT = 10 +--- +... +TX_SIZE = TUPLE_SIZE * TX_TUPLE_COUNT +--- +... +TX_COUNT = math.ceil(box.cfg.vinyl_memory / (TX_SIZE * FIBER_COUNT)) +--- +... +s = box.schema.space.create('test', {engine = 'vinyl'}) +--- +... +_ = s:create_index('primary', {parts = {1, 'unsigned', 2, 'unsigned', 3, 'unsigned'}}) +--- +... +latency = 0 +--- +... +c = fiber.channel(FIBER_COUNT) +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +for i = 1, FIBER_COUNT do + fiber.create(function() + for j = 1, TX_COUNT do + local t1 = fiber.time() + box.begin() + for k = 1, TX_TUPLE_COUNT do + s:replace{i, j, k, digest.urandom(TUPLE_SIZE)} + end + box.commit() + local t2 = fiber.time() + latency = math.max(latency, t2 - t1) + end + c:put(true) + end) +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +for i = 1, FIBER_COUNT do c:get() end +--- +... +latency < 0.2 or latency +--- +- true +... +test_run:cmd('switch default') +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +test_run:cmd("cleanup server test") +--- +- true +... diff --git a/test/vinyl/throttle.test.lua b/test/vinyl/throttle.test.lua new file mode 100644 index 00000000..06548a08 --- /dev/null +++ b/test/vinyl/throttle.test.lua @@ -0,0 +1,54 @@ +-- +-- Basic test for transaction throttling. +-- +-- It checks that write transactions aren't stalled for long +-- due to hitting the memory limit, but instead are throttled +-- in advance. +-- +test_run = require('test_run').new() +test_run:cmd("create server test with script='vinyl/low_quota.lua'") +test_run:cmd(string.format("start server test with args='%d'", 32 * 1024 * 1024)) +test_run:cmd('switch test') + +fiber = require('fiber') +digest = require('digest') + +box.cfg{snap_io_rate_limit = 8} + +FIBER_COUNT = 5 +TUPLE_SIZE = 1000 +TX_TUPLE_COUNT = 10 +TX_SIZE = TUPLE_SIZE * TX_TUPLE_COUNT +TX_COUNT = math.ceil(box.cfg.vinyl_memory / (TX_SIZE * FIBER_COUNT)) + +s = box.schema.space.create('test', {engine = 'vinyl'}) +_ = s:create_index('primary', {parts = {1, 'unsigned', 2, 'unsigned', 3, 'unsigned'}}) + +latency = 0 +c = fiber.channel(FIBER_COUNT) + +test_run:cmd("setopt delimiter ';'") +for i = 1, FIBER_COUNT do + fiber.create(function() + for j = 1, TX_COUNT do + local t1 = fiber.time() + box.begin() + for k = 1, TX_TUPLE_COUNT do + s:replace{i, j, k, digest.urandom(TUPLE_SIZE)} + end + box.commit() + local t2 = fiber.time() + latency = math.max(latency, t2 - t1) + end + c:put(true) + end) +end; +test_run:cmd("setopt delimiter ''"); + +for i = 1, FIBER_COUNT do c:get() end + +latency < 0.2 or latency + +test_run:cmd('switch default') +test_run:cmd("stop server test") +test_run:cmd("cleanup server test") -- 2.11.0