[PATCH v2 6/8] vinyl: keep track of compaction queue length

Vladimir Davydov vdavydov.dev at gmail.com
Sun Sep 16 20:06:49 MSK 2018


Currently, there's no way to figure out whether compaction keeps up
with dumps, even though this is essential for implementing transaction
throttling. This patch adds a metric that is supposed to help answer
this question: the compaction queue size. It is calculated per range
and per LSM tree as the total size of slices awaiting compaction.
We update the metric along with the compaction priority of a range, in
vy_range_update_compact_priority(), and account it to an LSM tree in
vy_lsm_acct_range(). For now, the new metric is reported only on a
per-index basis, in index.stat() under disk.compact.queue.
---
 src/box/vinyl.c            |  1 +
 src/box/vy_lsm.c           |  6 +++
 src/box/vy_lsm.h           | 16 +++++++-
 src/box/vy_range.c         | 13 +++++--
 src/box/vy_range.h         |  2 +
 src/box/vy_scheduler.c     |  9 ++++-
 src/box/vy_stat.h          |  2 +
 src/errinj.h               |  1 +
 test/box/errinj.result     |  4 +-
 test/vinyl/errinj.result   | 94 ++++++++++++++++++++++++++++++++++++++++++++++
 test/vinyl/errinj.test.lua | 27 +++++++++++++
 test/vinyl/info.result     | 10 +++++
 12 files changed, 177 insertions(+), 8 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 2c479836..02a2b69d 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -396,6 +396,7 @@ vinyl_index_stat(struct index *index, struct info_handler *h)
 	info_append_int(h, "count", stat->disk.compact.count);
 	vy_info_append_disk_stmt_counter(h, "in", &stat->disk.compact.in);
 	vy_info_append_disk_stmt_counter(h, "out", &stat->disk.compact.out);
+	vy_info_append_disk_stmt_counter(h, "queue", &stat->disk.compact.queue);
 	info_table_end(h); /* compact */
 	info_append_int(h, "index_size", lsm->page_index_size);
 	info_append_int(h, "bloom_size", lsm->bloom_size);
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index a1d4aa80..6b9d0e6d 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -735,12 +735,16 @@ void
 vy_lsm_acct_range(struct vy_lsm *lsm, struct vy_range *range)
 {
 	histogram_collect(lsm->run_hist, range->slice_count);
+	vy_disk_stmt_counter_add(&lsm->stat.disk.compact.queue,
+				 &range->compact_queue);
 }
 
 void
 vy_lsm_unacct_range(struct vy_lsm *lsm, struct vy_range *range)
 {
 	histogram_discard(lsm->run_hist, range->slice_count);
+	vy_disk_stmt_counter_sub(&lsm->stat.disk.compact.queue,
+				 &range->compact_queue);
 }
 
 int
@@ -1155,8 +1159,10 @@ vy_lsm_force_compaction(struct vy_lsm *lsm)
 
 	vy_range_tree_ifirst(lsm->tree, &it);
 	while ((range = vy_range_tree_inext(&it)) != NULL) {
+		vy_lsm_unacct_range(lsm, range);
 		range->needs_compaction = true;
 		vy_range_update_compact_priority(range, &lsm->opts);
+		vy_lsm_acct_range(lsm, range);
 	}
 
 	vy_range_heap_update_all(&lsm->range_heap);
diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h
index 6917d475..ba2feeef 100644
--- a/src/box/vy_lsm.h
+++ b/src/box/vy_lsm.h
@@ -436,11 +436,23 @@ vy_lsm_add_range(struct vy_lsm *lsm, struct vy_range *range);
 void
 vy_lsm_remove_range(struct vy_lsm *lsm, struct vy_range *range);
 
-/** Account a range to the run histogram of an LSM tree. */
+/**
+ * Account a range in an LSM tree.
+ *
+ * This function updates the following LSM tree statistics:
+ *  - vy_lsm::run_hist after a slice is added to or removed from
+ *    a range of the LSM tree.
+ *  - vy_lsm::stat::disk::compact::queue after compaction priority
+ *    of a range is updated.
+ */
 void
 vy_lsm_acct_range(struct vy_lsm *lsm, struct vy_range *range);
 
-/** Unaccount a range from the run histogram of an LSM tree. */
+/**
+ * Unaccount a range in an LSM tree.
+ *
+ * This function undoes the effect of vy_lsm_acct_range().
+ */
 void
 vy_lsm_unacct_range(struct vy_lsm *lsm, struct vy_range *range);
 
diff --git a/src/box/vy_range.c b/src/box/vy_range.c
index ddcd2ed3..4495ecd4 100644
--- a/src/box/vy_range.c
+++ b/src/box/vy_range.c
@@ -292,19 +292,24 @@ vy_range_update_compact_priority(struct vy_range *range,
 	assert(opts->run_count_per_level > 0);
 	assert(opts->run_size_ratio > 1);
 
+	range->compact_priority = 0;
+	vy_disk_stmt_counter_reset(&range->compact_queue);
+
 	if (range->slice_count <= 1) {
 		/* Nothing to compact. */
-		range->compact_priority = 0;
 		range->needs_compaction = false;
 		return;
 	}
+
 	if (range->needs_compaction) {
 		range->compact_priority = range->slice_count;
+		range->compact_queue = range->count;
 		return;
 	}
 
-	range->compact_priority = 0;
-
+	/* Total number of statements in checked runs. */
+	struct vy_disk_stmt_counter total_stmt_count;
+	vy_disk_stmt_counter_reset(&total_stmt_count);
 	/* Total number of checked runs. */
 	uint32_t total_run_count = 0;
 	/* The total size of runs checked so far. */
@@ -333,6 +338,7 @@ vy_range_update_compact_priority(struct vy_range *range,
 		total_size += size;
 		level_run_count++;
 		total_run_count++;
+		vy_disk_stmt_counter_add(&total_stmt_count, &slice->count);
 		while (size > target_run_size) {
 			/*
 			 * The run size exceeds the threshold
@@ -370,6 +376,7 @@ vy_range_update_compact_priority(struct vy_range *range,
 			 * this level and upper levels.
 			 */
 			range->compact_priority = total_run_count;
+			range->compact_queue = total_stmt_count;
 			est_new_run_size = total_size;
 		}
 	}
diff --git a/src/box/vy_range.h b/src/box/vy_range.h
index 2ca19a1c..426854ff 100644
--- a/src/box/vy_range.h
+++ b/src/box/vy_range.h
@@ -106,6 +106,8 @@ struct vy_range {
 	 * how we  decide how many runs to compact next time.
 	 */
 	int compact_priority;
+	/** Number of statements that need to be compacted. */
+	struct vy_disk_stmt_counter compact_queue;
 	/**
 	 * If this flag is set, the range must be scheduled for
 	 * major compaction, i.e. its compact_priority must be
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index dd1e88d2..e4afeafd 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -1201,8 +1201,8 @@ vy_task_dump_complete(struct vy_task *task)
 		slice = new_slices[i];
 		vy_lsm_unacct_range(lsm, range);
 		vy_range_add_slice(range, slice);
-		vy_lsm_acct_range(lsm, range);
 		vy_range_update_compact_priority(range, &lsm->opts);
+		vy_lsm_acct_range(lsm, range);
 		if (!vy_range_is_scheduled(range))
 			vy_range_heap_update(&lsm->range_heap,
 					     &range->heap_node);
@@ -1428,6 +1428,11 @@ err:
 static int
 vy_task_compact_execute(struct vy_task *task)
 {
+	struct errinj *errinj = errinj(ERRINJ_VY_COMPACTION_DELAY, ERRINJ_BOOL);
+	if (errinj != NULL && errinj->bparam) {
+		while (errinj->bparam)
+			fiber_sleep(0.01);
+	}
 	return vy_task_write_run(task);
 }
 
@@ -1551,8 +1556,8 @@ vy_task_compact_complete(struct vy_task *task)
 	}
 	range->n_compactions++;
 	range->version++;
-	vy_lsm_acct_range(lsm, range);
 	vy_range_update_compact_priority(range, &lsm->opts);
+	vy_lsm_acct_range(lsm, range);
 	lsm->stat.disk.compact.count++;
 
 	/*
diff --git a/src/box/vy_stat.h b/src/box/vy_stat.h
index 83d3b8f9..c094d414 100644
--- a/src/box/vy_stat.h
+++ b/src/box/vy_stat.h
@@ -149,6 +149,8 @@ struct vy_lsm_stat {
 			struct vy_disk_stmt_counter in;
 			/** Number of output statements. */
 			struct vy_disk_stmt_counter out;
+			/** Number of statements awaiting compaction. */
+			struct vy_disk_stmt_counter queue;
 		} compact;
 	} disk;
 	/** TX write set statistics. */
diff --git a/src/errinj.h b/src/errinj.h
index b6d4a4c9..84a1fbb5 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -120,6 +120,7 @@ struct errinj {
 	_(ERRINJ_VY_INDEX_FILE_RENAME, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT, {.iparam = -1}) \
+	_(ERRINJ_VY_COMPACTION_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 8dae7614..81087900 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -58,6 +58,8 @@ errinj.info()
     state: 0
   ERRINJ_XLOG_META:
     state: false
+  ERRINJ_SNAP_COMMIT_DELAY:
+    state: false
   ERRINJ_WAL_BREAK_LSN:
     state: -1
   ERRINJ_WAL_WRITE_DISK:
@@ -74,7 +76,7 @@ errinj.info()
     state: false
   ERRINJ_RELAY_FINAL_JOIN:
     state: false
-  ERRINJ_SNAP_COMMIT_DELAY:
+  ERRINJ_VY_COMPACTION_DELAY:
     state: false
   ERRINJ_RELAY_FINAL_SLEEP:
     state: false
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 17e4dc8c..cc2287d2 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -2118,3 +2118,97 @@ s:select()
 s:drop()
 ---
 ...
+--
+-- Check disk.compact.queue stat.
+--
+test_run:cmd("push filter 'bytes_compressed: .*' to 'bytes_compressed: <bytes_compressed>'")
+---
+- true
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+i = s:create_index('pk', {run_count_per_level = 2})
+---
+...
+function dump() for i = 1, 10 do s:replace{i} end box.snapshot() end
+---
+...
+dump()
+---
+...
+dump()
+---
+...
+i:stat().disk.compact.queue -- none
+---
+- bytes_compressed: <bytes_compressed>
+  pages: 0
+  rows: 0
+  bytes: 0
+...
+errinj.set('ERRINJ_VY_COMPACTION_DELAY', true)
+---
+- ok
+...
+dump()
+---
+...
+i:stat().disk.compact.queue -- 30 statements
+---
+- bytes_compressed: <bytes_compressed>
+  pages: 3
+  rows: 30
+  bytes: 471
+...
+dump()
+---
+...
+i:stat().disk.compact.queue -- 40 statements
+---
+- bytes_compressed: <bytes_compressed>
+  pages: 4
+  rows: 40
+  bytes: 628
+...
+dump()
+---
+...
+i:stat().disk.compact.queue -- 50 statements
+---
+- bytes_compressed: <bytes_compressed>
+  pages: 5
+  rows: 50
+  bytes: 785
+...
+box.stat.reset() -- doesn't affect queue size
+---
+...
+i:stat().disk.compact.queue -- 50 statements
+---
+- bytes_compressed: <bytes_compressed>
+  pages: 5
+  rows: 50
+  bytes: 785
+...
+errinj.set('ERRINJ_VY_COMPACTION_DELAY', false)
+---
+- ok
+...
+while i:stat().disk.compact.count < 2 do fiber.sleep(0.01) end
+---
+...
+i:stat().disk.compact.queue -- none
+---
+- bytes_compressed: <bytes_compressed>
+  pages: 0
+  rows: 0
+  bytes: 0
+...
+s:drop()
+---
+...
+test_run:cmd("clear filter")
+---
+- true
+...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 1b02c01c..148662d8 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -850,3 +850,30 @@ fiber.sleep(0)
 s:create_index('sk', {parts = {2, 'unsigned'}})
 s:select()
 s:drop()
+
+--
+-- Check disk.compact.queue stat.
+--
+test_run:cmd("push filter 'bytes_compressed: .*' to 'bytes_compressed: <bytes_compressed>'")
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+i = s:create_index('pk', {run_count_per_level = 2})
+function dump() for i = 1, 10 do s:replace{i} end box.snapshot() end
+dump()
+dump()
+i:stat().disk.compact.queue -- none
+errinj.set('ERRINJ_VY_COMPACTION_DELAY', true)
+dump()
+i:stat().disk.compact.queue -- 30 statements
+dump()
+i:stat().disk.compact.queue -- 40 statements
+dump()
+i:stat().disk.compact.queue -- 50 statements
+box.stat.reset() -- doesn't affect queue size
+i:stat().disk.compact.queue -- 50 statements
+errinj.set('ERRINJ_VY_COMPACTION_DELAY', false)
+while i:stat().disk.compact.count < 2 do fiber.sleep(0.01) end
+i:stat().disk.compact.queue -- none
+s:drop()
+
+test_run:cmd("clear filter")
diff --git a/test/vinyl/info.result b/test/vinyl/info.result
index 3d7108cc..d13806de 100644
--- a/test/vinyl/info.result
+++ b/test/vinyl/info.result
@@ -171,6 +171,11 @@ istat()
         rows: 0
         bytes: 0
       count: 0
+      queue:
+        bytes_compressed: 0
+        pages: 0
+        rows: 0
+        bytes: 0
       out:
         bytes_compressed: 0
         pages: 0
@@ -983,6 +988,11 @@ istat()
         rows: 0
         bytes: 0
       count: 0
+      queue:
+        bytes_compressed: <bytes_compressed>
+        pages: 0
+        rows: 0
+        bytes: 0
       out:
         bytes_compressed: <bytes_compressed>
         pages: 0
-- 
2.11.0




More information about the Tarantool-patches mailing list