[Tarantool-patches] [PATCH v3] lua: add fiber.top() listing fiber cpu consumption

Serge Petrenko sergepetrenko at tarantool.org
Fri Nov 1 17:05:23 MSK 2019


Implement a new function in the Lua fiber library: top(). It returns a
table of alive fibers (including the scheduler). Each table entry has
three fields:
- average cpu consumption, calculated as an exponential moving average
  over event loop iterations;
- instant cpu consumption, which shows the fiber's cpu usage over the
  last event loop iteration;
- the cpu time the fiber has consumed while fiber.top() was enabled.
The patch relies on the CPU timestamp counter to measure each fiber's
time share.

Closes #2694

@TarantoolBot document
Title: fiber: new function `fiber.top()`

`fiber.top()` returns a table of all alive fibers and lists their cpu
consumption. Let's take a look at the example:
```
tarantool> fiber.top()
---
- 104/lua:
    instant: 18.433514726042
    time: 0.677505865
    average: 21.98826143184
  103/lua:
    instant: 19.131392015951
    time: 0.689521917
    average: 20.807772656431
  107/lua:
    instant: 18.624600174469
    time: 0.681585168
    average: 17.78194117452
  101/on_shutdown:
    instant: 0
    time: 0
    average: 0
  105/lua:
    instant: 18.562289702156
    time: 0.682085309
    average: 15.513811055476
  106/lua:
    instant: 18.441822789017
    time: 0.677320271
    average: 15.427595583115
  102/interactive:
    instant: 0
    time: 0.000367182
    average: 0
  cpu misses: 0
  1/sched:
    instant: 6.8063805923649
    time: 0.253035056
    average: 8.3479789103691
...

```
In the table above keys are strings containing fiber ids and names
(the only exception is a single 'cpu misses' key which indicates the
number of times tx thread was rescheduled on a different cpu core.
More on that later).
The three metrics available for each fiber are:
1) instant (per cent),
which indicates the share of time fiber was executing during the
previous event loop iteration
2) average (per cent), which is calculated as an exponential moving
average of `instant` values over all previous event loop iterations.
3) time (seconds), which estimates how much cpu time each fiber spent
processing during its lifetime.

More info on `cpu misses` field returned by `fiber.top()`:
`cpu misses` indicates the number of times the tx thread detected it was
rescheduled on a different cpu core during the last event loop
iteration.
fiber.top() uses cpu timestamp counter to measure each fiber's execution
time. However, each cpu core may have its own counter value (you can
only rely on counter deltas if both measurements were taken on the same
core, otherwise the delta may even get negative).
When the tx thread is rescheduled to a different cpu core, tarantool just
assumes the cpu delta was zero for the latest measurement. This lowers the
precision of our computations, so the bigger the `cpu misses` value, the
lower the precision of fiber.top() results.

fiber.top() doesn't work on the ARM architecture at the moment.

Please note that enabling fiber.top() slows down fiber switching by
about 15 per cent, so it is disabled by default.
To enable it, issue `fiber.top_enable()`.
You can disable it again after you finish debugging by calling
`fiber.top_disable()`.
A "time" entry is also added to each fiber's output in fiber.info()
(it duplicates the "time" entry from fiber.top()).
Note that "time" is only counted while fiber.top() is enabled.
---
https://github.com/tarantool/tarantool/issues/2694
https://github.com/tarantool/tarantool/tree/sp/gh-2694-fiber-top

Changes in v3: 
  - disable fiber.top() by default, throw away the 
    erroneous swim test fix from the first patch.
  - improve readability: start indexing fibers by id/name
    instead of plain id. 
  - various review fixes requested by @Gerold103
  - add fiber cpu time accounting: new field in
    fiber.info() for each fiber: "time". Same field
    in fiber.top() for each fiber.

Changes in v2: 
 - fixed unit/swim test hang in the 
   first patch
 - refactoring in second patch


 src/lib/core/fiber.c    | 194 +++++++++++++++++++++++++++++++++++++++-
 src/lib/core/fiber.h    |  70 +++++++++++++++
 src/lua/fiber.c         |  70 +++++++++++++++
 test/app/fiber.result   |  85 ++++++++++++++++++
 test/app/fiber.test.lua |  42 +++++++++
 5 files changed, 459 insertions(+), 2 deletions(-)

diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c
index 93f22ae68..52888cc64 100644
--- a/src/lib/core/fiber.c
+++ b/src/lib/core/fiber.c
@@ -42,6 +42,10 @@
 #include "memory.h"
 #include "trigger.h"
 
+#if ENABLE_FIBER_TOP
+#include <x86intrin.h> /* __rdtscp() */
+#endif /* ENABLE_FIBER_TOP */
+
 #include "third_party/valgrind/memcheck.h"
 
 static int (*fiber_invoke)(fiber_func f, va_list ap);
@@ -82,6 +86,43 @@ static int (*fiber_invoke)(fiber_func f, va_list ap);
 	err;								\
 })
 
+#if ENABLE_FIBER_TOP
+static __thread bool fiber_top_enabled = false;
+
+/**
+ * An action performed each time a context switch happens.
+ * Counts the switch in caller->csw and, while fiber.top() is
+ * enabled, charges the clock ticks elapsed since
+ * cord()->clock_last to both the caller and the cord. If the
+ * thread has moved to another cpu, the counter values are not
+ * comparable, so the delta is dropped and a cpu miss is counted.
+ */
+static inline void
+clock_set_on_csw(struct fiber *caller)
+{
+	caller->csw++;
+	if (!fiber_top_enabled)
+		return;
+
+	uint64_t clock;
+	uint32_t cpu_id;
+	clock = __rdtscp(&cpu_id);
+
+	if (cpu_id == cord()->cpu_id_last) {
+		caller->clock_delta += clock - cord()->clock_last;
+		cord()->clock_delta += clock - cord()->clock_last;
+	} else {
+		cord()->cpu_id_last = cpu_id;
+		cord()->cpu_miss_count++;
+	}
+	cord()->clock_last = clock;
+}
+
+#else
+/* Without fiber top support only count the context switch. */
+#define clock_set_on_csw(caller) ((caller)->csw++)
+#endif /* ENABLE_FIBER_TOP */
+
 /*
  * Defines a handler to be executed on exit from cord's thread func,
  * accessible via cord()->on_exit (normally NULL). It is used to
@@ -227,7 +263,6 @@ fiber_call_impl(struct fiber *callee)
 	cord->fiber = callee;
 
 	callee->flags &= ~FIBER_IS_READY;
-	callee->csw++;
 	ASAN_START_SWITCH_FIBER(asan_state, 1,
 				callee->stack,
 				callee->stack_size);
@@ -246,6 +281,7 @@ fiber_call(struct fiber *callee)
 	/** By convention, these triggers must not throw. */
 	if (! rlist_empty(&caller->on_yield))
 		trigger_run(&caller->on_yield, NULL);
+	clock_set_on_csw(caller); /* Account this switch to the caller. */
 	callee->caller = caller;
 	callee->flags |= FIBER_IS_READY;
 	caller->flags |= FIBER_IS_READY;
@@ -474,11 +510,11 @@ fiber_yield(void)
 	/** By convention, these triggers must not throw. */
 	if (! rlist_empty(&caller->on_yield))
 		trigger_run(&caller->on_yield, NULL);
+	clock_set_on_csw(caller); /* Account this switch to the caller. */
 
 	assert(callee->flags & FIBER_IS_READY || callee == &cord->sched);
 	assert(! (callee->flags & FIBER_IS_DEAD));
 	cord->fiber = callee;
-	callee->csw++;
 	callee->flags &= ~FIBER_IS_READY;
 	ASAN_START_SWITCH_FIBER(asan_state,
 				(caller->flags & FIBER_IS_DEAD) == 0,
@@ -584,6 +620,7 @@ fiber_schedule_list(struct rlist *list)
 	}
 	last->caller = fiber();
 	assert(fiber() == &cord()->sched);
+	clock_set_on_csw(fiber()); /* The scheduler is the caller here. */
 	fiber_call_impl(first);
 }
 
@@ -656,6 +693,13 @@ fiber_reset(struct fiber *fiber)
 	rlist_create(&fiber->on_yield);
 	rlist_create(&fiber->on_stop);
 	fiber->flags = FIBER_DEFAULT_FLAGS;
+#if ENABLE_FIBER_TOP
+	/* A reused fiber must not inherit its predecessor's stats. */
+	fiber->cputime = 0;
+	fiber->clock_acc = 0;
+	fiber->clock_delta = 0;
+	fiber->clock_delta_last = 0;
+#endif /* ENABLE_FIBER_TOP */
 }
 
 /** Destroy an active fiber and prepare it for reuse. */
@@ -1044,6 +1086,185 @@ fiber_destroy_all(struct cord *cord)
 						      struct fiber, link));
 }
 
+#if ENABLE_FIBER_TOP
+/**
+ * Get this thread's cpu time with 1 / FIBER_TIME_RES second
+ * resolution. We want to measure thread cpu time to calculate
+ * each fiber's cpu time, so don't use libev's ev_now() or
+ * ev_time() since they use either monotonic or realtime
+ * system clocks.
+ *
+ * @param[out] cputime Thread cpu time, untouched on failure.
+ * @retval 0 Success.
+ * @retval -1 clock_gettime() failed.
+ */
+static inline int
+thread_cputime_get(uint64_t *cputime)
+{
+	struct timespec ts;
+	if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts) != 0) {
+		say_debug("clock_gettime(): failed to get this "
+			  "thread's cpu time.");
+		return -1;
+	}
+	*cputime = (uint64_t) ts.tv_sec * FIBER_TIME_RES + ts.tv_nsec;
+	return 0;
+}
+
+/**
+ * Start a new measurement period: remember the current clock,
+ * cpu id and thread cpu time.
+ */
+static void
+loop_on_iteration_start(ev_loop *loop, ev_check *watcher, int revents)
+{
+	(void) loop;
+	(void) watcher;
+	(void) revents;
+
+	cord()->clock_last = __rdtscp(&cord()->cpu_id_last);
+	cord()->cpu_miss_count = 0;
+	thread_cputime_get(&cord()->cputime_last);
+}
+
+/**
+ * Calculate the exponential moving average for the clock deltas
+ * per loop iteration. The coefficient is 1/16. An empty (zero)
+ * accumulator is seeded with the first delta.
+ */
+static inline uint64_t
+clock_diff_accumulate(uint64_t acc, uint64_t delta)
+{
+	if (acc > 0)
+		return delta / 16 + 15 * acc / 16;
+	return delta;
+}
+
+/**
+ * Fold the clock ticks @a f consumed during the event loop
+ * iteration which has just ended into its statistics and
+ * prepare it for the next iteration.
+ */
+static inline void
+fiber_clock_update(struct fiber *f, double nsec_per_clock)
+{
+	f->clock_acc = clock_diff_accumulate(f->clock_acc, f->clock_delta);
+	f->clock_delta_last = f->clock_delta;
+	f->cputime += f->clock_delta * nsec_per_clock;
+	f->clock_delta = 0;
+}
+
+/** Forget the per-iteration clock statistics of @a f. */
+static inline void
+fiber_clock_reset(struct fiber *f)
+{
+	f->clock_acc = 0;
+	f->clock_delta = 0;
+	f->clock_delta_last = 0;
+}
+
+/**
+ * Finish a measurement period: update the cord's and each
+ * fiber's clock averages, last deltas and cpu time.
+ */
+static void
+loop_on_iteration_end(ev_loop *loop, ev_prepare *watcher, int revents)
+{
+	(void) loop;
+	(void) watcher;
+	(void) revents;
+	assert(fiber() == &cord()->sched);
+
+	/*
+	 * Record the scheduler's latest clock change, even though
+	 * it's not a context switch, but an event loop iteration
+	 * end.
+	 */
+	clock_set_on_csw(&cord()->sched);
+
+	cord()->cpu_miss_count_last = cord()->cpu_miss_count;
+	cord()->cpu_miss_count = 0;
+
+	/*
+	 * Convert clock ticks to nanoseconds using the thread cpu
+	 * time spent during this iteration. If it is unknown,
+	 * fiber cpu times are left unchanged.
+	 */
+	double nsec_per_clock = 0;
+	uint64_t cputime;
+	if (thread_cputime_get(&cputime) == 0 && cord()->clock_delta > 0) {
+		assert(cputime >= cord()->cputime_last);
+		nsec_per_clock = (double) (cputime - cord()->cputime_last) /
+				 cord()->clock_delta;
+	}
+
+	cord()->clock_acc = clock_diff_accumulate(cord()->clock_acc,
+						  cord()->clock_delta);
+	cord()->clock_delta_last = cord()->clock_delta;
+	cord()->clock_delta = 0;
+
+	fiber_clock_update(&cord()->sched, nsec_per_clock);
+	struct fiber *f;
+	rlist_foreach_entry(f, &cord()->alive, link) {
+		fiber_clock_update(f, nsec_per_clock);
+	}
+}
+
+static inline void
+fiber_top_init(void)
+{
+	ev_prepare_init(&cord()->prepare_event, loop_on_iteration_end);
+	ev_check_init(&cord()->check_event, loop_on_iteration_start);
+}
+
+bool
+fiber_top_is_enabled(void)
+{
+	return fiber_top_enabled;
+}
+
+void
+fiber_top_enable(void)
+{
+	if (fiber_top_enabled)
+		return;
+	ev_prepare_start(cord()->loop, &cord()->prepare_event);
+	ev_check_start(cord()->loop, &cord()->check_event);
+	fiber_top_enabled = true;
+
+	/*
+	 * fiber.top() may be enabled in the middle of an event
+	 * loop iteration, before loop_on_iteration_start() had a
+	 * chance to run. Set up the reference points here, so
+	 * that the first deltas are measured from this moment,
+	 * and drop statistics left from a previous enabled period
+	 * so that the cord and fiber averages stay consistent.
+	 */
+	cord()->clock_acc = 0;
+	cord()->clock_delta = 0;
+	cord()->clock_delta_last = 0;
+	cord()->cpu_miss_count = 0;
+	cord()->cpu_miss_count_last = 0;
+	fiber_clock_reset(&cord()->sched);
+	struct fiber *f;
+	rlist_foreach_entry(f, &cord()->alive, link) {
+		fiber_clock_reset(f);
+	}
+	cord()->clock_last = __rdtscp(&cord()->cpu_id_last);
+	thread_cputime_get(&cord()->cputime_last);
+}
+
+void
+fiber_top_disable(void)
+{
+	if (!fiber_top_enabled)
+		return;
+	ev_prepare_stop(cord()->loop, &cord()->prepare_event);
+	ev_check_stop(cord()->loop, &cord()->check_event);
+	fiber_top_enabled = false;
+}
+#endif /* ENABLE_FIBER_TOP */
+
 void
 cord_create(struct cord *cord, const char *name)
 {
@@ -1077,6 +1260,13 @@ cord_create(struct cord *cord, const char *name)
 	ev_async_init(&cord->wakeup_event, fiber_schedule_wakeup);
 
 	ev_idle_init(&cord->idle_event, fiber_schedule_idle);
+
+#if ENABLE_FIBER_TOP
+	/* fiber.top() currently works only for the main thread. */
+	if (cord_is_main()) {
+		fiber_top_init(); /* Init the check/prepare watchers. */
+	}
+#endif /* ENABLE_FIBER_TOP */
 	cord_set_name(name);
 
 #if ENABLE_ASAN
diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
index fb168e25e..ab313a050 100644
--- a/src/lib/core/fiber.h
+++ b/src/lib/core/fiber.h
@@ -46,6 +46,23 @@
 
 #include <third_party/coro/coro.h>
 
+/*
+ * Fiber top relies on the rdtscp instruction, which exists
+ * only on x86 and x86_64. It doesn't work on other processors
+ * (e.g. ARM) at the moment, because we haven't chosen an
+ * alternative to rdtsc there.
+ */
+#if defined(__x86_64__) || defined(__i386__)
+#define ENABLE_FIBER_TOP 1
+#else
+#define ENABLE_FIBER_TOP 0
+#endif
+
+#if ENABLE_FIBER_TOP
+/* A fiber reports used up CPU time with nanosecond resolution. */
+#define FIBER_TIME_RES 1000000000
+#endif /* ENABLE_FIBER_TOP */
+
 #if defined(__cplusplus)
 extern "C" {
 #endif /* defined(__cplusplus) */
@@ -386,6 +401,23 @@ struct fiber {
 	uint32_t fid;
 	/** Fiber flags */
 	uint32_t flags;
+#if ENABLE_FIBER_TOP
+	/**
+	 * Accumulated clock value calculated using exponential
+	 * moving average of clock_delta over loop iterations.
+	 */
+	uint64_t clock_acc;
+	/**
+	 * Total processor time this fiber has spent while fiber.top()
+	 * was enabled, with 1 / FIBER_TIME_RES second precision.
+	 */
+	uint64_t cputime;
+	/**
+	 * Clock delta calculated on previous event loop iteration.
+	 */
+	uint64_t clock_delta_last;
+	uint64_t clock_delta; /* Ticks spent in the current iteration. */
+#endif /* ENABLE_FIBER_TOP */
 	/** Link in cord->alive or cord->dead list. */
 	struct rlist link;
 	/** Link in cord->ready list. */
@@ -457,6 +489,23 @@ struct cord {
 	 * reserved.
 	 */
 	uint32_t max_fid;
+#if ENABLE_FIBER_TOP
+	uint64_t clock_acc; /* Moving average of clock_delta. */
+	uint64_t clock_delta; /* Ticks of the current iteration. */
+	uint64_t clock_delta_last; /* clock_delta of the last iteration. */
+	uint64_t clock_last; /* Counter value at the last measurement. */
+	uint32_t cpu_id_last; /* Cpu the last counter value came from. */
+	uint32_t cpu_miss_count; /* Cpu changes in this iteration. */
+	uint32_t cpu_miss_count_last; /* Cpu changes in last iteration. */
+	/**
+	 * This thread's CPU time at the beginning of event loop
+	 * iteration. Used to calculate how much cpu time has
+	 * each loop iteration consumed and update fiber cpu
+	 * times proportionally. The resolution is
+	 * 1 / FIBER_TIME_RES seconds.
+	 */
+	uint64_t cputime_last;
+#endif /* ENABLE_FIBER_TOP */
 	pthread_t id;
 	const struct cord_on_exit *on_exit;
 	/** A helper hash to map id -> fiber. */
@@ -482,6 +531,16 @@ struct cord {
 	 * is no 1 ms delay in case of zero sleep timeout.
 	 */
 	ev_idle idle_event;
+#if ENABLE_FIBER_TOP
+	/** Fires at every loop iteration start (fiber.top()). */
+	ev_check check_event;
+	/**
+	 * Fires at every event loop iteration end. Together with
+	 * check_event it delimits the interval over which per-fiber
+	 * cpu time is calculated.
+	 */
+	ev_prepare prepare_event;
+#endif /* ENABLE_FIBER_TOP */
 	/** A memory cache for (struct fiber) */
 	struct mempool fiber_mempool;
 	/** A runtime slab cache for general use in this cord. */
@@ -625,6 +684,20 @@ typedef int (*fiber_stat_cb)(struct fiber *f, void *ctx);
 int
 fiber_stat(fiber_stat_cb cb, void *cb_ctx);
 
+#if ENABLE_FIBER_TOP
+/** Check whether fiber cpu time accounting is enabled. */
+bool
+fiber_top_is_enabled(void);
+
+/** Start fiber cpu time accounting (main cord only). */
+void
+fiber_top_enable(void);
+
+/** Stop fiber cpu time accounting. */
+void
+fiber_top_disable(void);
+#endif /* ENABLE_FIBER_TOP */
+
 /** Useful for C unit tests */
 static inline int
 fiber_c_invoke(fiber_func f, va_list ap)
diff --git a/src/lua/fiber.c b/src/lua/fiber.c
index 124908a05..a030e444d 100644
--- a/src/lua/fiber.c
+++ b/src/lua/fiber.c
@@ -276,6 +276,13 @@ lbox_fiber_statof(struct fiber *f, void *cb_ctx, bool backtrace)
 	lua_pushnumber(L, f->csw);
 	lua_settable(L, -3);
 
+#if ENABLE_FIBER_TOP
+	/* Cpu time, counted only while fiber.top() is enabled. */
+	lua_pushliteral(L, "time");
+	lua_pushnumber(L, f->cputime / (double) FIBER_TIME_RES);
+	lua_settable(L, -3);
+#endif /* ENABLE_FIBER_TOP */
+
 	lua_pushliteral(L, "memory");
 	lua_newtable(L);
 	lua_pushstring(L, "used");
@@ -319,6 +323,94 @@ lbox_fiber_statof_nobt(struct fiber *f, void *cb_ctx)
 	return lbox_fiber_statof(f, cb_ctx, false);
 }
 
+#if ENABLE_FIBER_TOP
+/**
+ * Return @a part / @a total in per cent, or 0 when @a total
+ * is 0 (e.g. no event loop iteration has ended yet since
+ * fiber.top() was enabled).
+ */
+static inline double
+fiber_top_percent(uint64_t part, uint64_t total)
+{
+	if (total == 0)
+		return 0;
+	return part / (double) total * 100;
+}
+
+/**
+ * fiber_stat() callback: add an "<id>/<name>" entry with the
+ * fiber's average, instant and time metrics to the table on
+ * top of the Lua stack.
+ */
+static int
+lbox_fiber_top_entry(struct fiber *f, void *cb_ctx)
+{
+	struct lua_State *L = (struct lua_State *) cb_ctx;
+	char name_buf[64];
+
+	snprintf(name_buf, sizeof(name_buf), "%u/%s", f->fid, f->name);
+	lua_pushstring(L, name_buf);
+
+	lua_newtable(L);
+
+	lua_pushliteral(L, "average");
+	lua_pushnumber(L, fiber_top_percent(f->clock_acc,
+					    cord()->clock_acc));
+	lua_settable(L, -3);
+	lua_pushliteral(L, "instant");
+	lua_pushnumber(L, fiber_top_percent(f->clock_delta_last,
+					    cord()->clock_delta_last));
+	lua_settable(L, -3);
+	lua_pushliteral(L, "time");
+	lua_pushnumber(L, f->cputime / (double) FIBER_TIME_RES);
+	lua_settable(L, -3);
+	lua_settable(L, -3);
+
+	return 0;
+}
+
+/**
+ * fiber.top(): return a table with cpu consumption of every
+ * alive fiber (including the scheduler) and the number of cpu
+ * misses during the last event loop iteration.
+ */
+static int
+lbox_fiber_top(struct lua_State *L)
+{
+	if (!fiber_top_is_enabled()) {
+		return luaL_error(L, "fiber.top() is disabled. Enable it with"
+				  " fiber.top_enable() first");
+	}
+	lua_newtable(L);
+	lua_pushliteral(L, "cpu misses");
+	lua_pushnumber(L, cord()->cpu_miss_count_last);
+	lua_settable(L, -3);
+
+	lbox_fiber_top_entry(&cord()->sched, L);
+	fiber_stat(lbox_fiber_top_entry, L);
+
+	return 1;
+}
+
+/** fiber.top_enable(): start collecting fiber.top() data. */
+static int
+lbox_fiber_top_enable(struct lua_State *L)
+{
+	(void) L;
+	fiber_top_enable();
+	return 0;
+}
+
+/** fiber.top_disable(): stop collecting fiber.top() data. */
+static int
+lbox_fiber_top_disable(struct lua_State *L)
+{
+	(void) L;
+	fiber_top_disable();
+	return 0;
+}
+#endif /* ENABLE_FIBER_TOP */
+
 /**
  * Return fiber statistics.
  */
@@ -743,6 +808,11 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
 
 static const struct luaL_Reg fiberlib[] = {
 	{"info", lbox_fiber_info},
+#if ENABLE_FIBER_TOP /* Not available on every architecture. */
+	{"top", lbox_fiber_top},
+	{"top_enable", lbox_fiber_top_enable},
+	{"top_disable", lbox_fiber_top_disable},
+#endif /* ENABLE_FIBER_TOP */
 	{"sleep", lbox_fiber_sleep},
 	{"yield", lbox_fiber_yield},
 	{"self", lbox_fiber_self},
diff --git a/test/app/fiber.result b/test/app/fiber.result
index 3c6115a33..3b9e5da9a 100644
--- a/test/app/fiber.result
+++ b/test/app/fiber.result
@@ -1462,6 +1462,88 @@ fiber.join(fiber.self())
 ---
 - error: the fiber is not joinable
 ...
+-- gh-2694 fiber.top()
+fiber.top_enable()
+---
+...
+a = fiber.top()
+---
+...
+type(a)
+---
+- table
+...
+-- scheduler is present in fiber.top()
+-- and is indexed by name
+a["1/sched"] ~= nil
+---
+- true
+...
+type(a["cpu misses"]) == 'number'
+---
+- true
+...
+sum_inst = 0
+---
+...
+sum_avg = 0
+---
+...
+-- update table to make sure
+-- a full event loop iteration
+-- has ended
+a = fiber.top()
+---
+...
+for k, v in pairs(a) do\
+    if type(v) == 'table' then\
+        sum_inst = sum_inst + v["instant"]\
+        sum_avg = sum_avg + v["average"]\
+    end\
+end
+---
+...
+sum_inst
+---
+- 100
+...
+-- not exact due to accumulated integer division errors
+sum_avg > 99 and sum_avg < 101 or sum_avg
+---
+- true
+...
+tbl = nil
+---
+...
+f = fiber.new(function()\
+    for i = 1,1000 do end\
+    fiber.yield()\
+    tbl = fiber.top()[fiber.self():id()..'/'..fiber.self():name()]\
+end)
+---
+...
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+---
+...
+tbl["average"] > 0
+---
+- true
+...
+tbl["instant"] > 0
+---
+- true
+...
+tbl["time"] > 0
+---
+- true
+...
+fiber.top_disable()
+---
+...
+fiber.top()
+---
+- error: fiber.top() is disabled. Enable it with fiber.top_enable() first
+...
 -- cleanup
 test_run:cmd("clear filter")
 ---
diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
index c5647b8f2..ce1f55e8d 100644
--- a/test/app/fiber.test.lua
+++ b/test/app/fiber.test.lua
@@ -629,6 +629,46 @@ while f:status() ~= 'dead' do fiber.sleep(0.01) end
 --
 fiber.join(fiber.self())
 
+-- gh-2694 fiber.top()
+fiber.top_enable()
+
+a = fiber.top()
+type(a)
+-- scheduler is present in fiber.top()
+-- and is indexed by name
+a["1/sched"] ~= nil
+type(a["cpu misses"]) == 'number'
+sum_inst = 0
+sum_avg = 0
+
+-- update table to make sure
+-- a full event loop iteration
+-- has ended
+a = fiber.top()
+for k, v in pairs(a) do\
+    if type(v) == 'table' then\
+        sum_inst = sum_inst + v["instant"]\
+        sum_avg = sum_avg + v["average"]\
+    end\
+end
+
+sum_inst
+-- not exact due to accumulated integer division errors
+sum_avg > 99 and sum_avg < 101 or sum_avg
+tbl = nil
+f = fiber.new(function()\
+    for i = 1,1000 do end\
+    fiber.yield()\
+    tbl = fiber.top()[fiber.self():id()..'/'..fiber.self():name()]\
+end)
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+tbl["average"] > 0
+tbl["instant"] > 0
+tbl["time"] > 0
+
+fiber.top_disable()
+fiber.top()
+
 -- cleanup
 test_run:cmd("clear filter")
 
-- 
2.21.0 (Apple Git-122)



More information about the Tarantool-patches mailing list