[PATCH 11/11] vinyl: split quota consumption rate limit into soft and hard

Vladimir Davydov vdavydov.dev at gmail.com
Thu Sep 20 12:34:16 MSK 2018


Currently, we only limit the quota consumption rate so that writers
won't hit the hard limit before a memory dump is complete. However, this
isn't enough, because we also need to consider compaction: if it doesn't
keep up with dumps, read and space amplification will grow
uncontrollably.

The problem is that compaction may be a quota consumer itself, as it may
generate deferred DELETE statements for secondary indexes. We can't
ignore the quota completely there: if we did, we could hit the memory
limit and stall all writers, which is unacceptable. However, we do want
compaction to ignore the rate limit that is imposed to make sure that
compaction keeps up with dumps, otherwise compaction won't benefit from
such throttling.

So this patch splits the rate limit in two, soft and hard, and
introduces the concept of quota consumer priorities. There are two
priorities, low and high. High priority consumers (compaction) respect
only the hard limit while low priority consumers (the rest of the world)
observe both limits. Currently, both limits are always equal, but when
compaction-aware throttling is introduced, the hard limit will be set
solely to make sure that writers won't stall due to pending dump while
the soft limit will also depend on the compaction debt.

The tricky part here is waking up throttled consumers. We have to
maintain two wait queues to be able to wake up high priority consumers
even when the soft rate limit is exceeded and hence all low priority
consumers have to wait. At the same time, we must preserve fairness
between the two queues to make sure that high priority consumers don't
uncontrollably throttle low priority ones. In other words, if the soft
rate limit isn't breached, the two queues should work like one. To
resolve this issue, we assign monotonically growing timestamps to
throttled fibers and always wake up the one with the lowest timestamp
(i.e. the one that has waited the longest), no matter whether it's low
or high priority.
---
 src/box/vinyl.c    |  31 ++++++++++------
 src/box/vy_quota.c | 106 +++++++++++++++++++++++++++++++++++++++--------------
 src/box/vy_quota.h |  53 +++++++++++++++++++++++----
 3 files changed, 145 insertions(+), 45 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 4c1e860d..2c74df89 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2346,7 +2346,8 @@ vinyl_engine_prepare(struct engine *engine, struct txn *txn)
 	 * the transaction to be sent to read view or aborted, we call
 	 * it before checking for conflicts.
 	 */
-	if (vy_quota_use(&env->quota, tx->write_size, timeout) != 0) {
+	if (vy_quota_use(&env->quota, tx->write_size,
+			 VY_QUOTA_CONSUMER_DEFAULT, timeout) != 0) {
 		diag_set(ClientError, ER_VY_QUOTA_TIMEOUT);
 		return -1;
 	}
@@ -2358,7 +2359,8 @@ vinyl_engine_prepare(struct engine *engine, struct txn *txn)
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
 	vy_quota_adjust(&env->quota, tx->write_size,
-			mem_used_after - mem_used_before);
+			mem_used_after - mem_used_before,
+			VY_QUOTA_CONSUMER_DEFAULT);
 	return rc;
 }
 
@@ -2382,7 +2384,8 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
 	/* We can't abort the transaction at this point, use force. */
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before,
+			   VY_QUOTA_CONSUMER_DEFAULT);
 
 	txn->engine_tx = NULL;
 	if (!txn->is_autocommit)
@@ -3160,7 +3163,8 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
 	 * quota accounting.
 	 */
 	size_t reserved = tx->write_size;
-	if (vy_quota_use(&env->quota, reserved, TIMEOUT_INFINITY) != 0)
+	if (vy_quota_use(&env->quota, reserved, VY_QUOTA_CONSUMER_DEFAULT,
+			 TIMEOUT_INFINITY) != 0)
 		unreachable();
 
 	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
@@ -3179,7 +3183,8 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
 	size_t used = mem_used_after - mem_used_before;
-	vy_quota_adjust(&env->quota, reserved, used);
+	vy_quota_adjust(&env->quota, reserved, used,
+			VY_QUOTA_CONSUMER_DEFAULT);
 	return rc;
 }
 
@@ -3500,7 +3505,8 @@ vy_squash_process(struct vy_squash *squash)
 		 */
 		vy_mem_commit_stmt(mem, region_stmt);
 		vy_quota_force_use(&env->quota,
-				   mem_used_after - mem_used_before);
+				   mem_used_after - mem_used_before,
+				   VY_QUOTA_CONSUMER_DEFAULT);
 	}
 	return rc;
 }
@@ -3974,8 +3980,9 @@ vy_build_insert_tuple(struct vy_env *env, struct vy_lsm *lsm,
 	/* Consume memory quota. Throttle if it is exceeded. */
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
-	vy_quota_wait(&env->quota);
+	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before,
+			   VY_QUOTA_CONSUMER_DEFAULT);
+	vy_quota_wait(&env->quota, VY_QUOTA_CONSUMER_DEFAULT);
 	return rc;
 }
 
@@ -4101,7 +4108,8 @@ vy_build_recover(struct vy_env *env, struct vy_lsm *lsm, struct vy_lsm *pk)
 
 	mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before,
+			   VY_QUOTA_CONSUMER_DEFAULT);
 	return rc;
 }
 
@@ -4317,7 +4325,7 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	 */
 	struct vy_env *env = vy_env(space->engine);
 	if (is_first_statement)
-		vy_quota_wait(&env->quota);
+		vy_quota_wait(&env->quota, VY_QUOTA_CONSUMER_DEFAULT);
 
 	/* Create the deferred DELETE statement. */
 	struct vy_lsm *pk = vy_lsm(space->index[0]);
@@ -4404,7 +4412,8 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	}
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before,
+			   VY_QUOTA_CONSUMER_DEFAULT);
 
 	tuple_unref(delete);
 	if (rc != 0)
diff --git a/src/box/vy_quota.c b/src/box/vy_quota.c
index 3374aacc..8e40ab88 100644
--- a/src/box/vy_quota.c
+++ b/src/box/vy_quota.c
@@ -125,8 +125,10 @@ vy_quota_trigger_dump(struct vy_quota *q)
 	 */
 	size_t quota_left = q->used < q->limit ? q->limit - q->used : 0;
 	size_t max_use_rate = quota_left * q->dump_bw / (q->used + 1);
-	if (q->rl.rate > max_use_rate)
-		vy_quota_rl_set(&q->rl, max_use_rate);
+	if (q->rl.soft.rate > max_use_rate)
+		vy_quota_rl_set(&q->rl.soft, max_use_rate);
+	if (q->rl.hard.rate > max_use_rate)
+		vy_quota_rl_set(&q->rl.hard, max_use_rate);
 
 	q->dump_in_progress = true;
 }
@@ -147,13 +149,45 @@ vy_quota_check_watermark(struct vy_quota *q)
 static void
 vy_quota_signal(struct vy_quota *q)
 {
-	if (!rlist_empty(&q->wait_queue)) {
-		struct vy_quota_wait_node *n;
-		n = rlist_first_entry(&q->wait_queue,
-				      struct vy_quota_wait_node,
-				      in_wait_queue);
-		fiber_wakeup(n->fiber);
+	/*
+	 * If hard rate limit is exceeded, neither low nor high
+	 * priority consumers may proceed so no point in waking
+	 * anyone.
+	 */
+	if (!vy_quota_rl_may_use(&q->rl.hard))
+		return;
+
+	struct vy_quota_wait_node *n1 = NULL;
+	struct vy_quota_wait_node *n2 = NULL;
+
+	/*
+	 * Wake up low priority consumers only if soft rate limit
+	 * is not exceeded.
+	 */
+	if (vy_quota_rl_may_use(&q->rl.soft) &&
+	    !rlist_empty(&q->wait_queue[VY_QUOTA_CONSUMER_LO])) {
+		n1 = rlist_first_entry(&q->wait_queue[VY_QUOTA_CONSUMER_LO],
+				       struct vy_quota_wait_node,
+				       in_wait_queue);
 	}
+	if (!rlist_empty(&q->wait_queue[VY_QUOTA_CONSUMER_HI])) {
+		n2 = rlist_first_entry(&q->wait_queue[VY_QUOTA_CONSUMER_HI],
+				       struct vy_quota_wait_node,
+				       in_wait_queue);
+	}
+
+	int64_t ts1 = n1 != NULL ? n1->timestamp : INT64_MAX;
+	int64_t ts2 = n2 != NULL ? n2->timestamp : INT64_MAX;
+
+	/*
+	 * Wake up a consumer that have waited most no matter
+	 * whether it's high or low priority. This assures that
+	 * high priority consumers don't uncontrollably throttle
+	 * low priority ones.
+	 */
+	struct vy_quota_wait_node *n = ts1 < ts2 ? n1 : n2;
+	if (n != NULL)
+		fiber_wakeup(n->fiber);
 }
 
 static void
@@ -193,7 +227,8 @@ vy_quota_timer_cb(ev_loop *loop, ev_timer *timer, int events)
 	/*
 	 * Replenish quota and wake up throttled fibers, if any.
 	 */
-	vy_quota_rl_refill(&q->rl);
+	vy_quota_rl_refill(&q->rl.soft);
+	vy_quota_rl_refill(&q->rl.hard);
 	vy_quota_signal(q);
 }
 
@@ -228,7 +263,8 @@ vy_quota_create(struct vy_quota *q, size_t limit,
 	q->too_long_threshold = TIMEOUT_INFINITY;
 	q->dump_bw = VY_DEFAULT_DUMP_BANDWIDTH;
 	q->trigger_dump_cb = trigger_dump_cb;
-	rlist_create(&q->wait_queue);
+	rlist_create(&q->wait_queue[VY_QUOTA_CONSUMER_LO]);
+	rlist_create(&q->wait_queue[VY_QUOTA_CONSUMER_HI]);
 	ev_timer_init(&q->timer, vy_quota_timer_cb, 0,
 		      VY_QUOTA_UPDATE_INTERVAL);
 	q->timer.data = q;
@@ -241,8 +277,10 @@ vy_quota_enable(struct vy_quota *q)
 	assert(!q->is_enabled);
 	q->is_enabled = true;
 	q->use_curr = 0;
-	vy_quota_rl_set(&q->rl, q->dump_bw);
-	vy_quota_rl_refill(&q->rl);
+	vy_quota_rl_set(&q->rl.soft, q->dump_bw);
+	vy_quota_rl_set(&q->rl.hard, q->dump_bw);
+	vy_quota_rl_refill(&q->rl.soft);
+	vy_quota_rl_refill(&q->rl.hard);
 	ev_timer_start(loop(), &q->timer);
 	vy_quota_check_watermark(q);
 }
@@ -307,7 +345,8 @@ vy_quota_dump(struct vy_quota *q, size_t size, double duration)
 	 * limit to the dump bandwidth rather than disabling it
 	 * completely.
 	 */
-	vy_quota_rl_set(&q->rl, q->dump_bw);
+	vy_quota_rl_set(&q->rl.soft, q->dump_bw);
+	vy_quota_rl_set(&q->rl.hard, q->dump_bw);
 }
 
 void
@@ -318,11 +357,14 @@ vy_quota_reset_dump_bandwidth(struct vy_quota *q, size_t max)
 }
 
 void
-vy_quota_force_use(struct vy_quota *q, size_t size)
+vy_quota_force_use(struct vy_quota *q, size_t size,
+		   enum vy_quota_consumer_prio prio)
 {
 	q->used += size;
 	q->use_curr += size;
-	vy_quota_rl_use(&q->rl, size);
+	vy_quota_rl_use(&q->rl.hard, size);
+	if (prio == VY_QUOTA_CONSUMER_LO)
+		vy_quota_rl_use(&q->rl.soft, size);
 	vy_quota_check_watermark(q);
 }
 
@@ -331,21 +373,26 @@ vy_quota_force_use(struct vy_quota *q, size_t size)
  * or false if consumers have to wait.
  */
 static bool
-vy_quota_may_use(struct vy_quota *q, size_t size)
+vy_quota_may_use(struct vy_quota *q, size_t size,
+		 enum vy_quota_consumer_prio prio)
 {
 	if (!q->is_enabled)
 		return true;
 	if (q->used + size > q->limit)
 		return false;
-	if (!vy_quota_rl_may_use(&q->rl))
+	if (!vy_quota_rl_may_use(&q->rl.hard))
+		return false;
+	if (prio == VY_QUOTA_CONSUMER_LO &&
+	    !vy_quota_rl_may_use(&q->rl.soft))
 		return false;
 	return true;
 }
 
 int
-vy_quota_use(struct vy_quota *q, size_t size, double timeout)
+vy_quota_use(struct vy_quota *q, size_t size,
+	     enum vy_quota_consumer_prio prio, double timeout)
 {
-	if (!vy_quota_may_use(q, size)) {
+	if (!vy_quota_may_use(q, size, prio)) {
 		/* Wait for quota. */
 		double start_time = ev_monotonic_now(loop());
 		double deadline = start_time + timeout;
@@ -362,12 +409,13 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 
 			struct vy_quota_wait_node node = {
 				.fiber = fiber(),
+				.timestamp = ++q->wait_timestamp,
 			};
-			rlist_add_tail_entry(&q->wait_queue,
+			rlist_add_tail_entry(&q->wait_queue[prio],
 					     &node, in_wait_queue);
 			fiber_yield_timeout(deadline - now);
 			rlist_del_entry(&node, in_wait_queue);
-		} while (!vy_quota_may_use(q, size));
+		} while (!vy_quota_may_use(q, size, prio));
 
 		double wait_time = ev_monotonic_now(loop()) - start_time;
 		if (wait_time > q->too_long_threshold) {
@@ -380,26 +428,30 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 		 */
 		vy_quota_signal(q);
 	}
-	vy_quota_force_use(q, size);
+	vy_quota_force_use(q, size, prio);
 	return 0;
 }
 
 static void
-vy_quota_unuse(struct vy_quota *q, size_t size)
+vy_quota_unuse(struct vy_quota *q, size_t size,
+	       enum vy_quota_consumer_prio prio)
 {
 	assert(q->used >= size);
 	q->used -= size;
 	/* use_curr could have been reset on timeout. */
 	q->use_curr -= MIN(q->use_curr, size);
-	vy_quota_rl_unuse(&q->rl, size);
+	vy_quota_rl_unuse(&q->rl.hard, size);
+	if (prio == VY_QUOTA_CONSUMER_LO)
+		vy_quota_rl_unuse(&q->rl.soft, size);
 	vy_quota_signal(q);
 }
 
 void
-vy_quota_adjust(struct vy_quota *q, size_t reserved, size_t used)
+vy_quota_adjust(struct vy_quota *q, size_t reserved, size_t used,
+		enum vy_quota_consumer_prio prio)
 {
 	if (reserved > used)
-		vy_quota_unuse(q, reserved - used);
+		vy_quota_unuse(q, reserved - used, prio);
 	if (reserved < used)
-		vy_quota_force_use(q, used - reserved);
+		vy_quota_force_use(q, used - reserved, prio);
 }
diff --git a/src/box/vy_quota.h b/src/box/vy_quota.h
index f53bb8a9..31fd0d54 100644
--- a/src/box/vy_quota.h
+++ b/src/box/vy_quota.h
@@ -33,6 +33,7 @@
 
 #include <stdbool.h>
 #include <stddef.h>
+#include <stdint.h>
 #include <small/rlist.h>
 #include <tarantool_ev.h>
 
@@ -44,6 +45,18 @@ struct vy_quota;
 struct histogram;
 struct fiber;
 
+/**
+ * Quota consumer priority. Determines how a consumer will be
+ * rate limited, see vy_quota::rl.
+ */
+enum vy_quota_consumer_prio {
+	VY_QUOTA_CONSUMER_LO = 0,
+	VY_QUOTA_CONSUMER_HI = 1,
+	vy_quota_consumer_prio_MAX,
+
+	VY_QUOTA_CONSUMER_DEFAULT = VY_QUOTA_CONSUMER_LO,
+};
+
 typedef int
 (*vy_quota_trigger_dump_f)(struct vy_quota *quota);
 
@@ -66,6 +79,13 @@ struct vy_quota_wait_node {
 	struct rlist in_wait_queue;
 	/** Fiber waiting for quota. */
 	struct fiber *fiber;
+	/**
+	 * Timestamp assigned to this fiber when it was
+	 * put to sleep (see vy_quota::wait_timestamp).
+	 * It is used to balance wake-ups between low and
+	 * high priority queues.
+	 */
+	int64_t timestamp;
 };
 
 /**
@@ -98,7 +118,12 @@ struct vy_quota {
 	 * vy_quota_wait_node::state. Newcomers are added
 	 * to the tail.
 	 */
-	struct rlist wait_queue;
+	struct rlist wait_queue[vy_quota_consumer_prio_MAX];
+	/**
+	 * Monotonically growing timestamp assigned to fibers
+	 * waiting for quota, see vy_quota_wait_node::timestamp.
+	 */
+	int64_t wait_timestamp;
 	/**
 	 * Timer used for updating average use rate, calculating
 	 * quota watermark and refilling rate limit value.
@@ -116,7 +141,18 @@ struct vy_quota {
 	 */
 	size_t use_rate;
 	/** Rate limiter. */
-	struct vy_quota_rl rl;
+	struct {
+		/**
+		 * Soft rate limit, bypassed by high priority
+		 * consumers.
+		 */
+		struct vy_quota_rl soft;
+		/**
+		 * Hard rate limit, observed by both low and
+		 * high priority consumers.
+		 */
+		struct vy_quota_rl hard;
+	} rl;
 	/**
 	 * Called when quota is consumed if used >= watermark.
 	 * It is supposed to trigger memory dump and return 0
@@ -201,7 +237,8 @@ vy_quota_reset_dump_bandwidth(struct vy_quota *q, size_t max);
  * this function does not throttle the caller.
  */
 void
-vy_quota_force_use(struct vy_quota *q, size_t size);
+vy_quota_force_use(struct vy_quota *q, size_t size,
+		   enum vy_quota_consumer_prio prio);
 
 /**
  * Try to consume @size bytes of memory, throttle the caller
@@ -238,7 +275,8 @@ vy_quota_force_use(struct vy_quota *q, size_t size);
  * account while estimating the size of a memory allocation.
  */
 int
-vy_quota_use(struct vy_quota *q, size_t size, double timeout);
+vy_quota_use(struct vy_quota *q, size_t size,
+	     enum vy_quota_consumer_prio prio, double timeout);
 
 /**
  * Adjust quota after allocating memory.
@@ -249,15 +287,16 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout);
  * See also vy_quota_use().
  */
 void
-vy_quota_adjust(struct vy_quota *q, size_t reserved, size_t used);
+vy_quota_adjust(struct vy_quota *q, size_t reserved, size_t used,
+		enum vy_quota_consumer_prio prio);
 
 /**
  * Block the caller until the quota is not exceeded.
  */
 static inline void
-vy_quota_wait(struct vy_quota *q)
+vy_quota_wait(struct vy_quota *q, enum vy_quota_consumer_prio prio)
 {
-	vy_quota_use(q, 0, TIMEOUT_INFINITY);
+	vy_quota_use(q, 0, prio, TIMEOUT_INFINITY);
 }
 
 #if defined(__cplusplus)
-- 
2.11.0




More information about the Tarantool-patches mailing list