[PATCH v3 1/4] vinyl: introduce quota consumer types
    Vladimir Davydov 
    vdavydov.dev at gmail.com
       
    Tue Feb 12 20:26:53 MSK 2019
    
    
  
Currently, we only limit quota consumption rate so that writers won't
hit the hard limit before memory dump is complete. However, it isn't
enough, because we also need to consider compaction: if it doesn't keep
up with dumps, read and space amplification will grow uncontrollably.
The problem is compaction may be a quota consumer by itself, as it may
generate deferred DELETE statements for secondary indexes. We can't
ignore quota completely there, because if we do, we may hit the memory
limit and stall all writers, which is unacceptable, but we do want to
ignore the rate limit imposed to make sure that compaction keeps up with
dumps, otherwise compaction won't benefit from such a throttling.
To tackle this problem, this patch introduces the concept of quota
consumer types and resources. Now vy_quota maintains one rate limit per
each resource and one wait queue per each consumer type. There are two
types of consumers, compaction jobs and usual transactions, and there
are two resources managed by vy_quota, disk and memory. Memory-based
rate limit ensures that transactions won't hit the hard memory limit and
stall before memory dump is complete. It is respected by all types of
consumers. Disk-based rate limit is supposed to be set when compaction
doesn't keep up with dumps. It is only used by usual transactions and
ignored by compaction jobs.
Since now there are two wait queues, we need to balance wakeups between
them in case consumers in both queues are ready to proceed. To ensure
there's no starvation, we maintain a monotonically growing counter and
assign its value to each consumer put to slip (ticket). We use it to
wake up the consumer that has waited most when both queues are ready.
Note, the patch doesn't implement the logic of disk-based throttling in
the regulator module. It is still left for future work.
Needed for #3721
---
 src/box/vinyl.c        |  30 ++++++-----
 src/box/vy_quota.c     | 133 ++++++++++++++++++++++++++++++++++++++-----------
 src/box/vy_quota.h     |  91 +++++++++++++++++++++++++++------
 src/box/vy_regulator.c |  12 +++--
 4 files changed, 208 insertions(+), 58 deletions(-)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 2815a7fd..22fe135b 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2347,7 +2347,8 @@ vinyl_engine_prepare(struct engine *engine, struct txn *txn)
 	 * the transaction to be sent to read view or aborted, we call
 	 * it before checking for conflicts.
 	 */
-	if (vy_quota_use(&env->quota, tx->write_size, timeout) != 0)
+	if (vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			 tx->write_size, timeout) != 0)
 		return -1;
 
 	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
@@ -2356,8 +2357,8 @@ vinyl_engine_prepare(struct engine *engine, struct txn *txn)
 
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_adjust(&env->quota, tx->write_size,
-			mem_used_after - mem_used_before);
+	vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX,
+			tx->write_size, mem_used_after - mem_used_before);
 	vy_regulator_check_dump_watermark(&env->regulator);
 	return rc;
 }
@@ -2382,7 +2383,8 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn)
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
 	/* We can't abort the transaction at this point, use force. */
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			   mem_used_after - mem_used_before);
 	vy_regulator_check_dump_watermark(&env->regulator);
 
 	txn->engine_tx = NULL;
@@ -3194,7 +3196,8 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
 	 * quota accounting.
 	 */
 	size_t reserved = tx->write_size;
-	if (vy_quota_use(&env->quota, reserved, TIMEOUT_INFINITY) != 0)
+	if (vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			 reserved, TIMEOUT_INFINITY) != 0)
 		unreachable();
 
 	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
@@ -3213,7 +3216,7 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
 	size_t used = mem_used_after - mem_used_before;
-	vy_quota_adjust(&env->quota, reserved, used);
+	vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX, reserved, used);
 	vy_regulator_check_dump_watermark(&env->regulator);
 	return rc;
 }
@@ -3534,7 +3537,7 @@ vy_squash_process(struct vy_squash *squash)
 		 * so there's no need in invalidating the cache.
 		 */
 		vy_mem_commit_stmt(mem, region_stmt);
-		vy_quota_force_use(&env->quota,
+		vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
 				   mem_used_after - mem_used_before);
 		vy_regulator_check_dump_watermark(&env->regulator);
 	}
@@ -4010,9 +4013,10 @@ vy_build_insert_tuple(struct vy_env *env, struct vy_lsm *lsm,
 	/* Consume memory quota. Throttle if it is exceeded. */
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			   mem_used_after - mem_used_before);
 	vy_regulator_check_dump_watermark(&env->regulator);
-	vy_quota_wait(&env->quota);
+	vy_quota_wait(&env->quota, VY_QUOTA_CONSUMER_TX);
 	return rc;
 }
 
@@ -4138,7 +4142,8 @@ vy_build_recover(struct vy_env *env, struct vy_lsm *lsm, struct vy_lsm *pk)
 
 	mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_TX,
+			   mem_used_after - mem_used_before);
 	return rc;
 }
 
@@ -4354,7 +4359,7 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	 */
 	struct vy_env *env = vy_env(space->engine);
 	if (is_first_statement)
-		vy_quota_wait(&env->quota);
+		vy_quota_wait(&env->quota, VY_QUOTA_CONSUMER_COMPACTION);
 
 	/* Create the deferred DELETE statement. */
 	struct vy_lsm *pk = vy_lsm(space->index[0]);
@@ -4441,7 +4446,8 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	}
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
-	vy_quota_force_use(&env->quota, mem_used_after - mem_used_before);
+	vy_quota_force_use(&env->quota, VY_QUOTA_CONSUMER_COMPACTION,
+			   mem_used_after - mem_used_before);
 	vy_regulator_check_dump_watermark(&env->regulator);
 
 	tuple_unref(delete);
diff --git a/src/box/vy_quota.c b/src/box/vy_quota.c
index 07cd5856..dc452adb 100644
--- a/src/box/vy_quota.c
+++ b/src/box/vy_quota.c
@@ -52,6 +52,45 @@
 static const double VY_QUOTA_TIMER_PERIOD = 0.1;
 
 /**
+ * Bit mask of resources used by a particular consumer type.
+ */
+static unsigned
+vy_quota_consumer_resource_map[] = {
+	/**
+	 * Transaction throttling pursues two goals. First, it is
+	 * capping memory consumption rate so that the hard memory
+	 * limit will not be hit before memory dump has completed
+	 * (memory-based throttling). Second, we must make sure
+	 * that compaction jobs keep up with dumps to keep read and
+	 * space amplification within bounds (disk-based throttling).
+	 * Transactions ought to respect them both.
+	 */
+	[VY_QUOTA_CONSUMER_TX] = (1 << VY_QUOTA_RESOURCE_DISK) |
+				 (1 << VY_QUOTA_RESOURCE_MEMORY),
+	/**
+	 * Compaction jobs may need some quota too, because they
+	 * may generate deferred DELETEs for secondary indexes.
+	 * Apparently, we must not impose the rate limit that is
+	 * supposed to speed up compaction on them (disk-based),
+	 * however they still have to respect memory-based throttling
+	 * to avoid long stalls.
+	 */
+	[VY_QUOTA_CONSUMER_COMPACTION] = (1 << VY_QUOTA_RESOURCE_MEMORY),
+};
+
+/**
+ * Check if the rate limit corresponding to resource @resource_type
+ * should be applied to a consumer of type @consumer_type.
+ */
+static inline bool
+vy_rate_limit_is_applicable(enum vy_quota_consumer_type consumer_type,
+			    enum vy_quota_resource_type resource_type)
+{
+	return (vy_quota_consumer_resource_map[consumer_type] &
+					(1 << resource_type)) != 0;
+}
+
+/**
  * Return true if the requested amount of memory may be consumed
  * right now, false if consumers have to wait.
  *
@@ -60,7 +99,8 @@ static const double VY_QUOTA_TIMER_PERIOD = 0.1;
  * it can start memory reclaim immediately.
  */
 static inline bool
-vy_quota_may_use(struct vy_quota *q, size_t size)
+vy_quota_may_use(struct vy_quota *q, enum vy_quota_consumer_type type,
+		 size_t size)
 {
 	if (!q->is_enabled)
 		return true;
@@ -68,8 +108,12 @@ vy_quota_may_use(struct vy_quota *q, size_t size)
 		q->quota_exceeded_cb(q);
 		return false;
 	}
-	if (!vy_rate_limit_may_use(&q->rate_limit))
-		return false;
+	for (int i = 0; i < vy_quota_resource_type_MAX; i++) {
+		struct vy_rate_limit *rl = &q->rate_limit[i];
+		if (vy_rate_limit_is_applicable(type, i) &&
+		    !vy_rate_limit_may_use(rl))
+			return false;
+	}
 	return true;
 }
 
@@ -77,10 +121,15 @@ vy_quota_may_use(struct vy_quota *q, size_t size)
  * Consume the given amount of memory without checking the limit.
  */
 static inline void
-vy_quota_do_use(struct vy_quota *q, size_t size)
+vy_quota_do_use(struct vy_quota *q, enum vy_quota_consumer_type type,
+		size_t size)
 {
 	q->used += size;
-	vy_rate_limit_use(&q->rate_limit, size);
+	for (int i = 0; i < vy_quota_resource_type_MAX; i++) {
+		struct vy_rate_limit *rl = &q->rate_limit[i];
+		if (vy_rate_limit_is_applicable(type, i))
+			vy_rate_limit_use(rl, size);
+	}
 }
 
 /**
@@ -88,11 +137,16 @@ vy_quota_do_use(struct vy_quota *q, size_t size)
  * This function is an exact opposite of vy_quota_do_use().
  */
 static inline void
-vy_quota_do_unuse(struct vy_quota *q, size_t size)
+vy_quota_do_unuse(struct vy_quota *q, enum vy_quota_consumer_type type,
+		  size_t size)
 {
 	assert(q->used >= size);
 	q->used -= size;
-	vy_rate_limit_unuse(&q->rate_limit, size);
+	for (int i = 0; i < vy_quota_resource_type_MAX; i++) {
+		struct vy_rate_limit *rl = &q->rate_limit[i];
+		if (vy_rate_limit_is_applicable(type, i))
+			vy_rate_limit_unuse(rl, size);
+	}
 }
 
 /**
@@ -112,17 +166,31 @@ vy_quota_check_limit(struct vy_quota *q)
 static void
 vy_quota_signal(struct vy_quota *q)
 {
-	if (!rlist_empty(&q->wait_queue)) {
+	/*
+	 * To prevent starvation, wake up a consumer that has
+	 * waited most irrespective of its type.
+	 */
+	struct vy_quota_wait_node *oldest = NULL;
+	for (int i = 0; i < vy_quota_consumer_type_MAX; i++) {
+		struct rlist *wq = &q->wait_queue[i];
+		if (rlist_empty(wq))
+			continue;
+
 		struct vy_quota_wait_node *n;
-		n = rlist_first_entry(&q->wait_queue,
-				      struct vy_quota_wait_node, in_wait_queue);
+		n = rlist_first_entry(wq, struct vy_quota_wait_node,
+				      in_wait_queue);
 		/*
 		 * No need in waking up a consumer if it will have
 		 * to go back to sleep immediately.
 		 */
-		if (vy_quota_may_use(q, n->size))
-			fiber_wakeup(n->fiber);
+		if (!vy_quota_may_use(q, i, n->size))
+			continue;
+
+		if (oldest == NULL || oldest->ticket > n->ticket)
+			oldest = n;
 	}
+	if (oldest != NULL)
+		fiber_wakeup(oldest->fiber);
 }
 
 static void
@@ -133,7 +201,8 @@ vy_quota_timer_cb(ev_loop *loop, ev_timer *timer, int events)
 
 	struct vy_quota *q = timer->data;
 
-	vy_rate_limit_refill(&q->rate_limit, VY_QUOTA_TIMER_PERIOD);
+	for (int i = 0; i < vy_quota_resource_type_MAX; i++)
+		vy_rate_limit_refill(&q->rate_limit[i], VY_QUOTA_TIMER_PERIOD);
 	vy_quota_signal(q);
 }
 
@@ -146,8 +215,11 @@ vy_quota_create(struct vy_quota *q, size_t limit,
 	q->used = 0;
 	q->too_long_threshold = TIMEOUT_INFINITY;
 	q->quota_exceeded_cb = quota_exceeded_cb;
-	rlist_create(&q->wait_queue);
-	vy_rate_limit_create(&q->rate_limit);
+	q->wait_ticket = 0;
+	for (int i = 0; i < vy_quota_consumer_type_MAX; i++)
+		rlist_create(&q->wait_queue[i]);
+	for (int i = 0; i < vy_quota_resource_type_MAX; i++)
+		vy_rate_limit_create(&q->rate_limit[i]);
 	ev_timer_init(&q->timer, vy_quota_timer_cb, 0, VY_QUOTA_TIMER_PERIOD);
 	q->timer.data = q;
 }
@@ -176,15 +248,17 @@ vy_quota_set_limit(struct vy_quota *q, size_t limit)
 }
 
 void
-vy_quota_set_rate_limit(struct vy_quota *q, size_t rate)
+vy_quota_set_rate_limit(struct vy_quota *q, enum vy_quota_resource_type type,
+			size_t rate)
 {
-	vy_rate_limit_set(&q->rate_limit, rate);
+	vy_rate_limit_set(&q->rate_limit[type], rate);
 }
 
 void
-vy_quota_force_use(struct vy_quota *q, size_t size)
+vy_quota_force_use(struct vy_quota *q, enum vy_quota_consumer_type type,
+		   size_t size)
 {
-	vy_quota_do_use(q, size);
+	vy_quota_do_use(q, type, size);
 	vy_quota_check_limit(q);
 }
 
@@ -201,7 +275,8 @@ vy_quota_release(struct vy_quota *q, size_t size)
 }
 
 int
-vy_quota_use(struct vy_quota *q, size_t size, double timeout)
+vy_quota_use(struct vy_quota *q, enum vy_quota_consumer_type type,
+	     size_t size, double timeout)
 {
 	/*
 	 * Fail early if the configured memory limit never allows
@@ -212,8 +287,8 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 		return -1;
 	}
 
-	if (vy_quota_may_use(q, size)) {
-		vy_quota_do_use(q, size);
+	if (vy_quota_may_use(q, type, size)) {
+		vy_quota_do_use(q, type, size);
 		return 0;
 	}
 
@@ -224,8 +299,9 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 	struct vy_quota_wait_node wait_node = {
 		.fiber = fiber(),
 		.size = size,
+		.ticket = ++q->wait_ticket,
 	};
-	rlist_add_tail_entry(&q->wait_queue, &wait_node, in_wait_queue);
+	rlist_add_tail_entry(&q->wait_queue[type], &wait_node, in_wait_queue);
 
 	do {
 		double now = ev_monotonic_now(loop());
@@ -235,7 +311,7 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 			diag_set(ClientError, ER_VY_QUOTA_TIMEOUT);
 			return -1;
 		}
-	} while (!vy_quota_may_use(q, size));
+	} while (!vy_quota_may_use(q, type, size));
 
 	rlist_del_entry(&wait_node, in_wait_queue);
 
@@ -246,7 +322,7 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 				     wait_time);
 	}
 
-	vy_quota_do_use(q, size);
+	vy_quota_do_use(q, type, size);
 	/*
 	 * Blocked consumers are awaken one by one to preserve
 	 * the order they were put to sleep. It's a responsibility
@@ -258,14 +334,15 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout)
 }
 
 void
-vy_quota_adjust(struct vy_quota *q, size_t reserved, size_t used)
+vy_quota_adjust(struct vy_quota *q, enum vy_quota_consumer_type type,
+		size_t reserved, size_t used)
 {
 	if (reserved > used) {
-		vy_quota_do_unuse(q, reserved - used);
+		vy_quota_do_unuse(q, type, reserved - used);
 		vy_quota_signal(q);
 	}
 	if (reserved < used) {
-		vy_quota_do_use(q, used - reserved);
+		vy_quota_do_use(q, type, used - reserved);
 		vy_quota_check_limit(q);
 	}
 }
diff --git a/src/box/vy_quota.h b/src/box/vy_quota.h
index 79755e89..d90922b2 100644
--- a/src/box/vy_quota.h
+++ b/src/box/vy_quota.h
@@ -110,6 +110,50 @@ vy_rate_limit_refill(struct vy_rate_limit *rl, double time)
 typedef void
 (*vy_quota_exceeded_f)(struct vy_quota *quota);
 
+/**
+ * Apart from memory usage accounting and limiting, vy_quota is
+ * responsible for consumption rate limiting (aka throttling).
+ * There are multiple rate limits, each of which is associated
+ * with a particular resource type. Different kinds of consumers
+ * respect different limits. The following enumeration defines
+ * the resource types for which vy_quota enables throttling.
+ *
+ * See also vy_quota_consumer_resource_map.
+ */
+enum vy_quota_resource_type {
+	/**
+	 * The goal of disk-based throttling is to keep LSM trees
+	 * in a good shape so that read and space amplification
+	 * stay within bounds. It is enabled when compaction does
+	 * not keep up with dumps.
+	 */
+	VY_QUOTA_RESOURCE_DISK = 0,
+	/**
+	 * Memory-based throttling is needed to avoid long stalls
+	 * caused by hitting the hard memory limit. It is set so
+	 * that by the time the hard limit is hit, the last memory
+	 * dump will have completed.
+	 */
+	VY_QUOTA_RESOURCE_MEMORY = 1,
+
+	vy_quota_resource_type_MAX,
+};
+
+/**
+ * Quota consumer type determines how a quota consumer will be
+ * rate limited.
+ *
+ * See also vy_quota_consumer_resource_map.
+ */
+enum vy_quota_consumer_type {
+	/** Transaction processor. */
+	VY_QUOTA_CONSUMER_TX = 0,
+	/** Compaction job. */
+	VY_QUOTA_CONSUMER_COMPACTION = 1,
+
+	vy_quota_consumer_type_MAX,
+};
+
 struct vy_quota_wait_node {
 	/** Link in vy_quota::wait_queue. */
 	struct rlist in_wait_queue;
@@ -117,6 +161,11 @@ struct vy_quota_wait_node {
 	struct fiber *fiber;
 	/** Amount of requested memory. */
 	size_t size;
+	/**
+	 * Ticket assigned to this fiber when it was put to
+	 * sleep, see vy_quota::wait_ticket for more details.
+	 */
+	int64_t ticket;
 };
 
 /**
@@ -144,13 +193,23 @@ struct vy_quota {
 	 */
 	vy_quota_exceeded_f quota_exceeded_cb;
 	/**
-	 * Queue of consumers waiting for quota, linked by
-	 * vy_quota_wait_node::state. Newcomers are added
-	 * to the tail.
+	 * Monotonically growing counter assigned to consumers
+	 * waiting for quota. It is used for balancing wakeups
+	 * among wait queues: if two fibers from different wait
+	 * queues may proceed, the one with the lowest ticket
+	 * will be picked.
+	 *
+	 * See also vy_quota_wait_node::ticket.
 	 */
-	struct rlist wait_queue;
-	/** Rate limit state. */
-	struct vy_rate_limit rate_limit;
+	int64_t wait_ticket;
+	/**
+	 * Queue of consumers waiting for quota, one per each
+	 * consumer type, linked by vy_quota_wait_node::state.
+	 * Newcomers are added to the tail.
+	 */
+	struct rlist wait_queue[vy_quota_consumer_type_MAX];
+	/** Rate limit state, one per each resource type. */
+	struct vy_rate_limit rate_limit[vy_quota_resource_type_MAX];
 	/**
 	 * Periodic timer that is used for refilling the rate
 	 * limit value.
@@ -188,18 +247,20 @@ void
 vy_quota_set_limit(struct vy_quota *q, size_t limit);
 
 /**
- * Set the max rate at which quota may be consumed,
- * in bytes per second.
+ * Set the rate limit corresponding to the resource of the given
+ * type. The rate limit is given in bytes per second.
  */
 void
-vy_quota_set_rate_limit(struct vy_quota *q, size_t rate);
+vy_quota_set_rate_limit(struct vy_quota *q, enum vy_quota_resource_type type,
+			size_t rate);
 
 /**
  * Consume @size bytes of memory. In contrast to vy_quota_use()
  * this function does not throttle the caller.
  */
 void
-vy_quota_force_use(struct vy_quota *q, size_t size);
+vy_quota_force_use(struct vy_quota *q, enum vy_quota_consumer_type type,
+		   size_t size);
 
 /**
  * Release @size bytes of memory.
@@ -242,7 +303,8 @@ vy_quota_release(struct vy_quota *q, size_t size);
  * account while estimating the size of a memory allocation.
  */
 int
-vy_quota_use(struct vy_quota *q, size_t size, double timeout);
+vy_quota_use(struct vy_quota *q, enum vy_quota_consumer_type type,
+	     size_t size, double timeout);
 
 /**
  * Adjust quota after allocating memory.
@@ -253,15 +315,16 @@ vy_quota_use(struct vy_quota *q, size_t size, double timeout);
  * See also vy_quota_use().
  */
 void
-vy_quota_adjust(struct vy_quota *q, size_t reserved, size_t used);
+vy_quota_adjust(struct vy_quota *q, enum vy_quota_consumer_type type,
+		size_t reserved, size_t used);
 
 /**
  * Block the caller until the quota is not exceeded.
  */
 static inline void
-vy_quota_wait(struct vy_quota *q)
+vy_quota_wait(struct vy_quota *q, enum vy_quota_consumer_type type)
 {
-	vy_quota_use(q, 0, TIMEOUT_INFINITY);
+	vy_quota_use(q, type, 0, TIMEOUT_INFINITY);
 }
 
 #if defined(__cplusplus)
diff --git a/src/box/vy_regulator.c b/src/box/vy_regulator.c
index 2e09b93c..e14b01aa 100644
--- a/src/box/vy_regulator.c
+++ b/src/box/vy_regulator.c
@@ -101,7 +101,8 @@ vy_regulator_trigger_dump(struct vy_regulator *regulator)
 	size_t max_write_rate = (double)mem_left / (mem_used + 1) *
 					regulator->dump_bandwidth;
 	max_write_rate = MIN(max_write_rate, regulator->dump_bandwidth);
-	vy_quota_set_rate_limit(quota, max_write_rate);
+	vy_quota_set_rate_limit(quota, VY_QUOTA_RESOURCE_MEMORY,
+				max_write_rate);
 }
 
 static void
@@ -202,7 +203,8 @@ void
 vy_regulator_start(struct vy_regulator *regulator)
 {
 	regulator->quota_used_last = regulator->quota->used;
-	vy_quota_set_rate_limit(regulator->quota, regulator->dump_bandwidth);
+	vy_quota_set_rate_limit(regulator->quota, VY_QUOTA_RESOURCE_MEMORY,
+				regulator->dump_bandwidth);
 	ev_timer_start(loop(), ®ulator->timer);
 }
 
@@ -253,7 +255,8 @@ vy_regulator_dump_complete(struct vy_regulator *regulator,
 	 * limit to the dump bandwidth rather than disabling it
 	 * completely.
 	 */
-	vy_quota_set_rate_limit(regulator->quota, regulator->dump_bandwidth);
+	vy_quota_set_rate_limit(regulator->quota, VY_QUOTA_RESOURCE_MEMORY,
+				regulator->dump_bandwidth);
 }
 
 void
@@ -263,5 +266,6 @@ vy_regulator_reset_dump_bandwidth(struct vy_regulator *regulator, size_t max)
 	regulator->dump_bandwidth = VY_DUMP_BANDWIDTH_DEFAULT;
 	if (max > 0 && regulator->dump_bandwidth > max)
 		regulator->dump_bandwidth = max;
-	vy_quota_set_rate_limit(regulator->quota, regulator->dump_bandwidth);
+	vy_quota_set_rate_limit(regulator->quota, VY_QUOTA_RESOURCE_MEMORY,
+				regulator->dump_bandwidth);
 }
-- 
2.11.0
    
    
More information about the Tarantool-patches
mailing list