[PATCH v2 11/11] vinyl: introduce quota consumer priorities

Vladimir Davydov vdavydov.dev at gmail.com
Fri Sep 28 20:40:09 MSK 2018


Currently, we only limit quota consumption rate so that writers won't
hit the hard limit before memory dump is complete. However, it isn't
enough, because we also need to consider compaction: if it doesn't keep
up with dumps, read and space amplification will grow uncontrollably.

The problem is compaction may be a quota consumer by itself, as it may
generate deferred DELETE statements for secondary indexes. We can't
ignore quota completely there, because if we do, we may hit the memory
limit and stall all writers, which is unacceptable, but we do want to
ignore the rate limit imposed to make sure that compaction keeps up with
dumps, otherwise compaction won't benefit from such a throttling.

To tackle this problem, this patch introduces quota consumer priorities.
Now a quota object maintains one rate limit and one wait queue per each
transaction priority. Rate limit i is used by a consumer with priority
prio if and only if prio <= i. Whenever a consumer has to be throttled,
it is put to sleep to the wait queue corresponding to its priority.
When quota is replenished, we pick the oldest consumer among all that
may be woken up. This ensures fairness.

For now, there are only two priorities - one for transactions and one
for compaction jobs. Transactions have the lowest priority (0) and are
throttled for two purposes: first, to ensure that the hard memory limit
won't be hit before memory dump is complete (memory-based throttling),
and second, to keep compaction progress in sync with dumps (disk-based
throttling). Compaction jobs have a higher priority (1) and ignore
disk-based throttling, but still respect memory-based throttling.

Note, the patch doesn't implement the logic of disk-based throttling in
the regulator module. It is still left for future work.

Part of #1862
---
 src/box/vinyl.c        | 30 +++++++++-------
 src/box/vy_quota.c     | 96 +++++++++++++++++++++++++++++++++++---------------
 src/box/vy_quota.h     | 88 +++++++++++++++++++++++++++++++++++++--------
 src/box/vy_regulator.c | 11 +++---
 4 files changed, 167 insertions(+), 58 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index c3d95777..615321ee 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2337,7 +2337,8 @@ vinyl_engine_prepare(struct engine *engine, struct txn *txn)
 	 * the transaction to be sent to read view or aborted, we call
 	 * it before checking for conflicts.
 	 */
-	if (vy_quota_use(&env->quota, tx->write_size, timeout) != 0)
+	if (vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			 tx->write_size, timeout) != 0)
 		return -1;
 
 	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
@@ -2346,8 +2347,8 @@ vinyl_engine_prepare(struct engine *engine, struct txn *txn)
 
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_adjust(&env->quota, tx->write_size,
-			mem_used_after - mem_used_before);
+	vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX,
+			tx->write_size, mem_used_after - mem_used_before);
 	vy_regulator_check_watermark(&env->regulator);
 	return rc;
 }
@@ -2372,7 +2373,8 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
 	/* We can't abort the transaction at this point, use force. */
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			   mem_used_after - mem_used_before);
 	vy_regulator_check_watermark(&env->regulator);
 
 	txn->engine_tx = NULL;
@@ -3165,7 +3167,8 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
 	 * quota accounting.
 	 */
 	size_t reserved = tx->write_size;
-	if (vy_quota_use(&env->quota, reserved, TIMEOUT_INFINITY) != 0)
+	if (vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			 reserved, TIMEOUT_INFINITY) != 0)
 		unreachable();
 
 	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
@@ -3184,7 +3187,7 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
 	size_t used = mem_used_after - mem_used_before;
-	vy_quota_adjust(&env->quota, reserved, used);
+	vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX, reserved, used);
 	vy_regulator_check_watermark(&env->regulator);
 	return rc;
 }
@@ -3505,7 +3508,7 @@ vy_squash_process(struct vy_squash *squash)
 		 * so there's no need in invalidating the cache.
 		 */
 		vy_mem_commit_stmt(mem, region_stmt);
-		vy_quota_force_use(&env->quota,
+		vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
 				   mem_used_after - mem_used_before);
 		vy_regulator_check_watermark(&env->regulator);
 	}
@@ -3981,9 +3984,10 @@ vy_build_insert_tuple(struct vy_env *env, struct vy_lsm *lsm,
 	/* Consume memory quota. Throttle if it is exceeded. */
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			   mem_used_after - mem_used_before);
 	vy_regulator_check_watermark(&env->regulator);
-	vy_quota_wait(&env->quota);
+	vy_quota_wait(&env->quota, VY_QUOTA_CONSUMER_TX);
 	return rc;
 }
 
@@ -4109,7 +4113,8 @@ vy_build_recover(struct vy_env *env, struct vy_lsm *lsm, struct vy_lsm *pk)
 
 	mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			   mem_used_after - mem_used_before);
 	return rc;
 }
 
@@ -4325,7 +4330,7 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	 */
 	struct vy_env *env = vy_env(space->engine);
 	if (is_first_statement)
-		vy_quota_wait(&env->quota);
+		vy_quota_wait(&env->quota, VY_QUOTA_CONSUMER_COMPACTION);
 
 	/* Create the deferred DELETE statement. */
 	struct vy_lsm *pk = vy_lsm(space->index[0]);
@@ -4412,7 +4417,8 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	}
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_COMPACTION,
+			   mem_used_after - mem_used_before);
 	vy_regulator_check_watermark(&env->regulator);
 
 	tuple_unref(delete);
diff --git a/src/box/vy_quota.c b/src/box/vy_quota.c
index ceac4878..1588b3dc 100644
--- a/src/box/vy_quota.c
+++ b/src/box/vy_quota.c
@@ -52,18 +52,29 @@
 static const double VY_QUOTA_TIMER_PERIOD = 0.1;
 
 /**
+ * Iterate over rate limit states that are enforced for a consumer
+ * with the given priority.
+ */
+#define vy_quota_for_each_rate_limit(quota, rl, prio) \
+	for (struct vy_rate_limit *rl = &(quota)->rate_limit[prio]; \
+	     rl - (quota)->rate_limit < vy_quota_consumer_prio_MAX; rl++)
+
+/**
  * Return true if the requested amount of memory may be consumed
  * right now, false if consumers have to wait.
  */
 static inline bool
-vy_quota_may_use(struct vy_quota *q, size_t size)
+vy_quota_may_use(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+		 size_t size)
 {
 	if (!q->is_enabled)
 		return true;
 	if (q->used + size > q->limit)
 		return false;
-	if (!vy_rate_limit_may_use(&q->rate_limit))
-		return false;
+	vy_quota_for_each_rate_limit(q, rl, prio) {
+		if (!vy_rate_limit_may_use(rl))
+			return false;
+	}
 	return true;
 }
 
@@ -71,10 +82,12 @@ vy_quota_may_use(struct vy_quota *q, size_t size)
  * Consume the given amount of memory without checking the limit.
  */
 static inline void
-vy_quota_do_use(struct vy_quota *q, size_t size)
+vy_quota_do_use(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+		size_t size)
 {
 	q->used += size;
-	vy_rate_limit_use(&q->rate_limit, size);
+	vy_quota_for_each_rate_limit(q, rl, prio)
+		vy_rate_limit_use(rl, size);
 }
 
 /**
@@ -82,11 +95,13 @@ vy_quota_do_use(struct vy_quota *q, size_t size)
  * This function is an exact opposite of vy_quota_do_use().
  */
 static inline void
-vy_quota_do_unuse(struct vy_quota *q, size_t size)
+vy_quota_do_unuse(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+		  size_t size)
 {
 	assert(q->used >= size);
 	q->used -= size;
-	vy_rate_limit_unuse(&q->rate_limit, size);
+	vy_quota_for_each_rate_limit(q, rl, prio)
+		vy_rate_limit_unuse(rl, size);
 }
 
 /**
@@ -106,17 +121,33 @@ vy_quota_check_limit(struct vy_quota *q)
 static void
 vy_quota_signal(struct vy_quota *q)
 {
-	if (!rlist_empty(&q->wait_queue)) {
+	/*
+	 * Wake up a consumer that has waited most no matter
+	 * whether it's high or low priority. This assures that
+	 * high priority consumers don't uncontrollably throttle
+	 * low priority ones.
+	 */
+	struct vy_quota_wait_node *oldest = NULL;
+	for (int i = 0; i < vy_quota_consumer_prio_MAX; i++) {
+		struct rlist *wq = &q->wait_queue[i];
+		if (rlist_empty(wq))
+			continue;
+
 		struct vy_quota_wait_node *n;
-		n = rlist_first_entry(&q->wait_queue,
-				      struct vy_quota_wait_node, in_wait_queue);
+		n = rlist_first_entry(wq, struct vy_quota_wait_node,
+				      in_wait_queue);
 		/*
 		 * No need in waking up a consumer if it will have
 		 * to go back to sleep immediately.
 		 */
-		if (vy_quota_may_use(q, n->size))
-			fiber_wakeup(n->fiber);
+		if (!vy_quota_may_use(q, i, n->size))
+			continue;
+
+		if (oldest == NULL || oldest->timestamp > n->timestamp)
+			oldest = n;
 	}
+	if (oldest != NULL)
+		fiber_wakeup(oldest->fiber);
 }
 
 static void
@@ -127,7 +158,8 @@ vy_quota_timer_cb(ev_loop *loop, ev_timer *timer, int events)
 
 	struct vy_quota *q = timer->data;
 
-	vy_rate_limit_refill(&q->rate_limit, VY_QUOTA_TIMER_PERIOD);
+	for (int i = 0; i < vy_quota_consumer_prio_MAX; i++)
+		vy_rate_limit_refill(&q->rate_limit[i], VY_QUOTA_TIMER_PERIOD);
 	vy_quota_signal(q);
 }
 
@@ -140,8 +172,11 @@ vy_quota_create(struct vy_quota *q, size_t limit,
 	q->used = 0;
 	q->too_long_threshold = TIMEOUT_INFINITY;
 	q->quota_exceeded_cb = quota_exceeded_cb;
-	rlist_create(&q->wait_queue);
-	vy_rate_limit_create(&q->rate_limit);
+	q->wait_timestamp = 0;
+	for (int i = 0; i < vy_quota_consumer_prio_MAX; i++) {
+		rlist_create(&q->wait_queue[i]);
+		vy_rate_limit_create(&q->rate_limit[i]);
+	}
 	ev_timer_init(&q->timer, vy_quota_timer_cb, 0, VY_QUOTA_TIMER_PERIOD);
 	q->timer.data = q;
 }
@@ -170,15 +205,17 @@ vy_quota_set_limit(struct vy_quota *q, size_t limit)
 }
 
 void
-vy_quota_set_rate_limit(struct vy_quota *q, size_t rate)
+vy_quota_set_rate_limit(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+			size_t rate)
 {
-	vy_rate_limit_set(&q->rate_limit, rate);
+	vy_rate_limit_set(&q->rate_limit[prio], rate);
 }
 
 void
-vy_quota_force_use(struct vy_quota *q, size_t size)
+vy_quota_force_use(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+		   size_t size)
 {
-	vy_quota_do_use(q, size);
+	vy_quota_do_use(q, prio, size);
 	vy_quota_check_limit(q);
 }
 
@@ -195,10 +232,11 @@ vy_quota_release(struct vy_quota *q, size_t size)
 }
 
 int
-vy_quota_use(struct vy_quota *q, size_t size, double timeout)
+vy_quota_use(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+	     size_t size, double timeout)
 {
-	if (vy_quota_may_use(q, size)) {
-		vy_quota_do_use(q, size);
+	if (vy_quota_may_use(q, prio, size)) {
+		vy_quota_do_use(q, prio, size);
 		return 0;
 	}
 
@@ -218,8 +256,9 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 	struct vy_quota_wait_node wait_node = {
 		.fiber = fiber(),
 		.size = size,
+		.timestamp = ++q->wait_timestamp,
 	};
-	rlist_add_tail_entry(&q->wait_queue, &wait_node, in_wait_queue);
+	rlist_add_tail_entry(&q->wait_queue[prio], &wait_node, in_wait_queue);
 
 	do {
 		/*
@@ -239,7 +278,7 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 			diag_set(ClientError, ER_VY_QUOTA_TIMEOUT);
 			return -1;
 		}
-	} while (!vy_quota_may_use(q, size));
+	} while (!vy_quota_may_use(q, prio, size));
 
 	rlist_del_entry(&wait_node, in_wait_queue);
 
@@ -249,7 +288,7 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 			 "for too long: %.3f sec", size, wait_time);
 	}
 
-	vy_quota_do_use(q, size);
+	vy_quota_do_use(q, prio, size);
 	/*
 	 * Blocked consumers are awaken one by one to preserve
 	 * the order they were put to sleep. It's a responsibility
@@ -261,14 +300,15 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 }
 
 void
-vy_quota_adjust(struct vy_quota *q, size_t reserved, size_t used)
+vy_quota_adjust(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+		size_t reserved, size_t used)
 {
 	if (reserved > used) {
-		vy_quota_do_unuse(q, reserved - used);
+		vy_quota_do_unuse(q, prio, reserved - used);
 		vy_quota_signal(q);
 	}
 	if (reserved < used) {
-		vy_quota_do_use(q, used - reserved);
+		vy_quota_do_use(q, prio, used - reserved);
 		vy_quota_check_limit(q);
 	}
 }
diff --git a/src/box/vy_quota.h b/src/box/vy_quota.h
index 79755e89..f1c3fb90 100644
--- a/src/box/vy_quota.h
+++ b/src/box/vy_quota.h
@@ -110,6 +110,43 @@ vy_rate_limit_refill(struct vy_rate_limit *rl, double time)
 typedef void
 (*vy_quota_exceeded_f)(struct vy_quota *quota);
 
+/**
+ * Quota consumer priority. Determines how a consumer will be
+ * rate limited. See also vy_quota::rate_limit.
+ */
+enum vy_quota_consumer_prio {
+	/**
+	 * Transaction processor priority.
+	 *
+	 * Transaction throttling pursues two goals. First, it is
+	 * capping memory consumption rate so that the hard memory
+	 * limit will not be hit before memory dump has completed
+	 * (memory-based throttling). Second, we must make sure
+	 * that compaction jobs keep up with dumps to keep the read
+	 * amplification within bounds (disk-based throttling).
+	 * Transactions ought to respect them both.
+	 */
+	VY_QUOTA_CONSUMER_TX = 0,
+	/**
+	 * Compaction job priority.
+	 *
+	 * Compaction jobs may need some quota too, because they
+	 * may generate deferred DELETEs for secondary indexes.
+	 * Apparently, we must not impose the rate limit that
+	 * is supposed to speed up compaction on them, however
+	 * they still have to respect memory-based throttling to
+	 * avoid long stalls.
+	 */
+	VY_QUOTA_CONSUMER_COMPACTION = 1,
+	/**
+	 * A convenience shortcut for setting the rate limit for
+	 * all kinds of consumers.
+	 */
+	VY_QUOTA_CONSUMER_ALL = VY_QUOTA_CONSUMER_COMPACTION,
+
+	vy_quota_consumer_prio_MAX,
+};
+
 struct vy_quota_wait_node {
 	/** Link in vy_quota::wait_queue. */
 	struct rlist in_wait_queue;
@@ -117,6 +154,11 @@ struct vy_quota_wait_node {
 	struct fiber *fiber;
 	/** Amount of requested memory. */
 	size_t size;
+	/**
+	 * Timestamp assigned to this fiber when it was put to
+	 * sleep, see vy_quota::wait_timestamp for more details.
+	 */
+	int64_t timestamp;
 };
 
 /**
@@ -144,13 +186,27 @@ struct vy_quota {
 	 */
 	vy_quota_exceeded_f quota_exceeded_cb;
 	/**
-	 * Queue of consumers waiting for quota, linked by
-	 * vy_quota_wait_node::state. Newcomers are added
-	 * to the tail.
+	 * Monotonically growing timestamp assigned to consumers
+	 * waiting for quota. It is used for balancing wakeups
+	 * among wait queues: if two fibers from different wait
+	 * queues may proceed, the one with the lowest timestamp
+	 * will be picked.
+	 *
+	 * See also vy_quota_wait_node::timestamp.
+	 */
+	int64_t wait_timestamp;
+	/**
+	 * Queue of consumers waiting for quota, one per each
+	 * consumer priority, linked by vy_quota_wait_node::state.
+	 * Newcomers are added to the tail.
+	 */
+	struct rlist wait_queue[vy_quota_consumer_prio_MAX];
+	/**
+	 * Rate limit state, one per each consumer priority.
+	 * A rate limit is enforced if and only if the consumer
+	 * priority is less than or equal to its index.
 	 */
-	struct rlist wait_queue;
-	/** Rate limit state. */
-	struct vy_rate_limit rate_limit;
+	struct vy_rate_limit rate_limit[vy_quota_consumer_prio_MAX];
 	/**
 	 * Periodic timer that is used for refilling the rate
 	 * limit value.
@@ -188,18 +244,20 @@ void
 vy_quota_set_limit(struct vy_quota *q, size_t limit);
 
 /**
- * Set the max rate at which quota may be consumed,
- * in bytes per second.
+ * Set the rate limit for consumers with priority less than or
+ * equal to @prio, in bytes per second.
  */
 void
-vy_quota_set_rate_limit(struct vy_quota *q, size_t rate);
+vy_quota_set_rate_limit(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+			size_t rate);
 
 /**
  * Consume @size bytes of memory. In contrast to vy_quota_use()
  * this function does not throttle the caller.
  */
 void
-vy_quota_force_use(struct vy_quota *q, size_t size);
+vy_quota_force_use(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+		   size_t size);
 
 /**
  * Release @size bytes of memory.
@@ -242,7 +300,8 @@ vy_quota_release(struct vy_quota *q, size_t size);
  * account while estimating the size of a memory allocation.
  */
 int
-vy_quota_use(struct vy_quota *q, size_t size, double timeout);
+vy_quota_use(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+	     size_t size, double timeout);
 
 /**
  * Adjust quota after allocating memory.
@@ -253,15 +312,16 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout);
  * See also vy_quota_use().
  */
 void
-vy_quota_adjust(struct vy_quota *q, size_t reserved, size_t used);
+vy_quota_adjust(struct vy_quota *q, enum vy_quota_consumer_prio prio,
+		size_t reserved, size_t used);
 
 /**
  * Block the caller until the quota is not exceeded.
  */
 static inline void
-vy_quota_wait(struct vy_quota *q)
+vy_quota_wait(struct vy_quota *q, enum vy_quota_consumer_prio prio)
 {
-	vy_quota_use(q, 0, TIMEOUT_INFINITY);
+	vy_quota_use(q, prio, 0, TIMEOUT_INFINITY);
 }
 
 #if defined(__cplusplus)
diff --git a/src/box/vy_regulator.c b/src/box/vy_regulator.c
index 1e106fc4..da35cc74 100644
--- a/src/box/vy_regulator.c
+++ b/src/box/vy_regulator.c
@@ -93,7 +93,7 @@ vy_regulator_trigger_dump(struct vy_regulator *regulator)
 	size_t mem_used = quota->used;
 	size_t max_write_rate = mem_left * regulator->dump_bw / (mem_used + 1);
 	max_write_rate = MIN(max_write_rate, regulator->dump_bw);
-	vy_quota_set_rate_limit(quota, max_write_rate);
+	vy_quota_set_rate_limit(quota, VY_QUOTA_CONSUMER_ALL, max_write_rate);
 }
 
 static void
@@ -190,7 +190,8 @@ void
 vy_regulator_start(struct vy_regulator *regulator)
 {
 	ev_timer_start(loop(), &regulator->timer);
-	vy_quota_set_rate_limit(regulator->quota, regulator->dump_bw);
+	vy_quota_set_rate_limit(regulator->quota, VY_QUOTA_CONSUMER_ALL,
+				regulator->dump_bw);
 }
 
 void
@@ -253,7 +254,8 @@ vy_regulator_dump_complete(struct vy_regulator *regulator,
 	 * limit to the dump bandwidth rather than disabling it
 	 * completely.
 	 */
-	vy_quota_set_rate_limit(regulator->quota, regulator->dump_bw);
+	vy_quota_set_rate_limit(regulator->quota, VY_QUOTA_CONSUMER_ALL,
+				regulator->dump_bw);
 }
 
 void
@@ -261,5 +263,6 @@ vy_regulator_reset_dump_bw(struct vy_regulator *regulator, size_t max)
 {
 	histogram_reset(regulator->dump_bw_hist);
 	regulator->dump_bw = MIN(VY_DUMP_BW_DEFAULT, max);
-	vy_quota_set_rate_limit(regulator->quota, regulator->dump_bw);
+	vy_quota_set_rate_limit(regulator->quota, VY_QUOTA_CONSUMER_ALL,
+				regulator->dump_bw);
 }
-- 
2.11.0




More information about the Tarantool-patches mailing list