[tarantool-patches] [PATCH] lua: add fiber.top() listing fiber cpu consumption

Serge Petrenko sergepetrenko at tarantool.org
Thu Oct 3 23:22:37 MSK 2019


Implement a new function in Lua fiber library: top(). It returns a table
of alive fibers (including the scheduler). Each table entry has two
fields: average cpu consumption, which is calculated with exponential
moving average over event loop iterations, and current cpu consumption,
which shows fiber's cpu usage over the last event loop iteration.
The patch relies on CPU timestamp counter to measure each fiber's time
share.

Closes #2694

@TarantoolBot document
Title: fiber: new function `fiber.top()`

`fiber.top()` returns a table of all alive fibers and lists their cpu
consumption. Let's take a look at the example:
```
tarantool> fiber.top()
---
- 1:
    cpu average (%): 10.779696493982
    cpu instant (%): 10.435256168573
  115:
    cpu average (%): 5.4571279061075
    cpu instant (%): 5.9653973440576
  120:
    cpu average (%): 21.944382148464
    cpu instant (%): 23.849021825646
  116:
    cpu average (%): 8.6603872318158
    cpu instant (%): 9.6812031335093
  119:
    cpu average (%): 21.933168871944
    cpu instant (%): 20.007540530351
  cpu misses: 0
  118:
    cpu average (%): 19.342901995963
    cpu instant (%): 16.932679820703
  117:
    cpu average (%): 11.549674814981
    cpu instant (%): 13.128901177161
...
```
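In the table above the keys are fiber ids, plus a single 'cpu misses'
key which indicates the number of times the tx thread was rescheduled
on a different cpu core during the last event loop iteration. The time
interval in which such a reschedule is detected is not accounted to any
fiber.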
The two metrics available for each fiber are:
1) cpu instant (per cent), which indicates the share of time the fiber
was executing during the previous event loop iteration;
2) cpu average (per cent), which is calculated as an exponential moving
average of `cpu instant` values over all previous event loop iterations.
---
https://github.com/tarantool/tarantool/issues/2694
https://github.com/tarantool/tarantool/compare/sp/gh-2694-fiber-top

 src/lib/core/fiber.c    | 134 ++++++++++++++++++++++++++++++++++++++++
 src/lib/core/fiber.h    |  34 ++++++++++
 src/lua/fiber.c         |  56 +++++++++++++++++
 test/app/fiber.result   |  78 +++++++++++++++++++++++
 test/app/fiber.test.lua |  31 ++++++++++
 5 files changed, 333 insertions(+)

diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c
index ce90f930c..919b1705d 100644
--- a/src/lib/core/fiber.c
+++ b/src/lib/core/fiber.c
@@ -37,6 +37,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <pmatomic.h>
+#include <x86intrin.h> /* __rdtscp() */
 
 #include "assoc.h"
 #include "memory.h"
@@ -82,6 +83,28 @@ static int (*fiber_invoke)(fiber_func f, va_list ap);
 	err;								\
 })
 
+/**
+ * Account the clock ticks elapsed since the previous measurement
+ * to @a caller and to the cord as a whole, then restart the
+ * measurement. Performed each time a context switch happens.
+ *
+ * The timestamp counter is read together with the value
+ * __rdtscp() stores through its argument, which is treated here
+ * as the id of the CPU the thread is running on. If that id
+ * differs from the one remembered in the cord, the elapsed
+ * interval is not accounted to anyone: the new id is remembered
+ * and the cord's cpu miss counter is incremented instead.
+ *
+ * This is a macro rather than a function, since it is used
+ * in scheduler too.
+ */
+#define clock_set_on_csw(caller)					\
+({									\
+	struct cord *csw_cord = cord();					\
+	uint32_t csw_cpu_id;						\
+	uint64_t csw_now = __rdtscp(&csw_cpu_id);			\
+									\
+	if (csw_cpu_id != csw_cord->cpu_id_last) {			\
+		csw_cord->cpu_id_last = csw_cpu_id;			\
+		csw_cord->cpu_miss_count++;				\
+	} else {							\
+		uint64_t csw_elapsed = csw_now - csw_cord->clock_last;	\
+		(caller)->clock_delta += csw_elapsed;			\
+		csw_cord->clock_delta += csw_elapsed;			\
+	}								\
+	csw_cord->clock_last = csw_now;					\
+})
+
 /*
  * Defines a handler to be executed on exit from cord's thread func,
  * accessible via cord()->on_exit (normally NULL). It is used to
@@ -107,6 +130,8 @@ pthread_t main_thread_id;
 static size_t page_size;
 static int stack_direction;
 
+/**
+ * Whether fiber.top() statistics collection is switched on.
+ * Set by fiber_top_enable(), cleared by fiber_top_disable() and
+ * reported by fiber_top_is_enabled().
+ */
+static bool fiber_top_enabled = false;
+
 enum {
 	/* The minimum allowable fiber stack size in bytes */
 	FIBER_STACK_SIZE_MINIMAL = 16384,
@@ -246,6 +271,7 @@ fiber_call(struct fiber *callee)
 	/** By convention, these triggers must not throw. */
 	if (! rlist_empty(&caller->on_yield))
 		trigger_run(&caller->on_yield, NULL);
+	clock_set_on_csw(caller);
 	callee->caller = caller;
 	callee->flags |= FIBER_IS_READY;
 	caller->flags |= FIBER_IS_READY;
@@ -474,6 +500,7 @@ fiber_yield(void)
 	/** By convention, these triggers must not throw. */
 	if (! rlist_empty(&caller->on_yield))
 		trigger_run(&caller->on_yield, NULL);
+	clock_set_on_csw(caller);
 
 	assert(callee->flags & FIBER_IS_READY || callee == &cord->sched);
 	assert(! (callee->flags & FIBER_IS_DEAD));
@@ -584,6 +611,7 @@ fiber_schedule_list(struct rlist *list)
 	}
 	last->caller = fiber();
 	assert(fiber() == &cord()->sched);
+	clock_set_on_csw(fiber());
 	fiber_call_impl(first);
 }
 
@@ -974,6 +1002,8 @@ fiber_new_ex(const char *name, const struct fiber_attr *fiber_attr,
 	}
 
 	fiber->f = f;
+	fiber->clock_acc = 0;
+	fiber->clock_delta = 0;
 	/* Excluding reserved range */
 	if (++cord->max_fid < FIBER_ID_MAX_RESERVED)
 		cord->max_fid = FIBER_ID_MAX_RESERVED + 1;
@@ -1038,6 +1068,103 @@ fiber_destroy_all(struct cord *cord)
 						      struct fiber, link));
 }
 
+/**
+ * ev_check callback installed on the cord's check_event.
+ * Restarts the clock measurement: remembers the current
+ * timestamp counter value together with the CPU id in the cord
+ * and zeroes the cord's cpu miss counter.
+ */
+static void
+loop_on_iteration_start(ev_loop *loop, ev_check *watcher, int revents)
+{
+	(void) loop;
+	(void) watcher;
+	(void) revents;
+
+	struct cord *cur = cord();
+	cur->cpu_miss_count = 0;
+	cur->clock_last = __rdtscp(&cur->cpu_id_last);
+}
+
+
+/**
+ * Fold one more per-iteration clock delta into an exponential
+ * moving average. The coefficient is 1/16: the new sample
+ * contributes 1/16 and the previous average 15/16, each term
+ * being rounded down by integer division.
+ *
+ * @param acc   average accumulated so far, 0 means "no value yet".
+ * @param delta clock delta of the latest iteration.
+ * @return the updated average; equals @a delta when @a acc is 0.
+ */
+static inline uint64_t
+clock_diff_accumulate(uint64_t acc, uint64_t delta)
+{
+	/* Nothing accumulated yet: seed the average with the sample. */
+	if (acc == 0)
+		return delta;
+	return delta / 16 + 15 * acc / 16;
+}
+
+/**
+ * ev_prepare callback installed on the cord's prepare_event.
+ * Closes the current measurement interval on behalf of the
+ * scheduler fiber and then, for the cord, for the scheduler and
+ * for every fiber in the cord's alive list: updates the moving
+ * average with the collected clock delta, saves the delta as
+ * the "last iteration" value and zeroes it for the next
+ * iteration. The cpu miss counter is saved and zeroed likewise.
+ */
+static void
+loop_on_iteration_end(ev_loop *loop, ev_prepare *watcher, int revents)
+{
+	(void) loop;
+	(void) watcher;
+	(void) revents;
+
+	struct cord *cur = cord();
+	struct fiber *sched = &cur->sched;
+	struct fiber *f;
+	assert(fiber() == sched);
+
+	/*
+	 * Record the scheduler's latest clock change, even though
+	 * it's not a context switch, but an event loop iteration
+	 * end.
+	 */
+	clock_set_on_csw(sched);
+
+	cur->cpu_miss_count_last = cur->cpu_miss_count;
+	cur->cpu_miss_count = 0;
+
+	/* Cord-wide totals. */
+	cur->clock_acc = clock_diff_accumulate(cur->clock_acc,
+					       cur->clock_delta);
+	cur->clock_delta_last = cur->clock_delta;
+	cur->clock_delta = 0;
+
+	/* The scheduler is handled separately from the alive list. */
+	sched->clock_acc = clock_diff_accumulate(sched->clock_acc,
+						 sched->clock_delta);
+	sched->clock_delta_last = sched->clock_delta;
+	sched->clock_delta = 0;
+
+	rlist_foreach_entry(f, &cur->alive, link) {
+		f->clock_acc = clock_diff_accumulate(f->clock_acc,
+						     f->clock_delta);
+		f->clock_delta_last = f->clock_delta;
+		f->clock_delta = 0;
+	}
+}
+
+/**
+ * Initialize the current cord's prepare and check watchers with
+ * the callbacks collecting fiber.top() statistics. The watchers
+ * are not started here, see fiber_top_enable().
+ */
+static inline void
+fiber_top_init(void)
+{
+	ev_prepare_init(&cord()->prepare_event, loop_on_iteration_end);
+	ev_check_init(&cord()->check_event, loop_on_iteration_start);
+}
+
+/**
+ * Tell whether fiber.top() statistics collection is currently
+ * switched on.
+ */
+bool
+fiber_top_is_enabled(void)
+{
+	return fiber_top_enabled;
+}
+
+/**
+ * Switch fiber.top() statistics collection on: start the
+ * current cord's prepare and check watchers on its event loop
+ * and zero the cord-wide values reported by fiber.top(). Does
+ * nothing when the collection is already on.
+ *
+ * The function is called from other translation units, hence
+ * it is a plain external function rather than an inline one.
+ */
+void
+fiber_top_enable(void)
+{
+	if (fiber_top_enabled)
+		return;
+
+	struct cord *cur = cord();
+	ev_prepare_start(cur->loop, &cur->prepare_event);
+	ev_check_start(cur->loop, &cur->check_event);
+	fiber_top_enabled = true;
+
+	cur->clock_acc = 0;
+	cur->cpu_miss_count_last = 0;
+	cur->clock_delta_last = 0;
+}
+
+/**
+ * Switch fiber.top() statistics collection off: stop the
+ * current cord's prepare and check watchers. Does nothing when
+ * the collection is already off.
+ *
+ * The function is called from other translation units, hence
+ * it is a plain external function rather than an inline one.
+ */
+void
+fiber_top_disable(void)
+{
+	if (!fiber_top_enabled)
+		return;
+
+	struct cord *cur = cord();
+	ev_prepare_stop(cur->loop, &cur->prepare_event);
+	ev_check_stop(cur->loop, &cur->check_event);
+	fiber_top_enabled = false;
+}
+
 void
 cord_create(struct cord *cord, const char *name)
 {
@@ -1071,6 +1198,13 @@ cord_create(struct cord *cord, const char *name)
 	ev_async_init(&cord->wakeup_event, fiber_schedule_wakeup);
 
 	ev_idle_init(&cord->idle_event, fiber_schedule_idle);
+
+	/* fiber.top() currently works only for the main thread. */
+	if (cord_is_main()) {
+		fiber_top_init();
+		fiber_top_enable();
+	}
+
 	cord_set_name(name);
 
 #if ENABLE_ASAN
diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
index fb168e25e..e56a2aa35 100644
--- a/src/lib/core/fiber.h
+++ b/src/lib/core/fiber.h
@@ -386,6 +386,16 @@ struct fiber {
 	uint32_t fid;
 	/** Fiber flags */
 	uint32_t flags;
+	/**
+	 * Accumulated clock value calculated using exponential
+	 * moving average.
+	 */
+	uint64_t clock_acc;
+	/**
+	 * Clock delta calculated on previous event loop iteration.
+	 */
+	uint64_t clock_delta_last;
+	uint64_t clock_delta;
 	/** Link in cord->alive or cord->dead list. */
 	struct rlist link;
 	/** Link in cord->ready list. */
@@ -457,6 +467,13 @@ struct cord {
 	 * reserved.
 	 */
 	uint32_t max_fid;
+	uint64_t clock_acc;
+	uint64_t clock_delta;
+	uint64_t clock_delta_last;
+	uint64_t clock_last;
+	uint32_t cpu_id_last;
+	uint32_t cpu_miss_count;
+	uint32_t cpu_miss_count_last;
 	pthread_t id;
 	const struct cord_on_exit *on_exit;
 	/** A helper hash to map id -> fiber. */
@@ -482,6 +499,14 @@ struct cord {
 	 * is no 1 ms delay in case of zero sleep timeout.
 	 */
 	ev_idle idle_event;
+	/** An event triggered on every event loop iteration start. */
+	ev_check check_event;
+	/**
+	 * An event triggered on every event loop iteration end.
+	 * Just like the event above it is used in per-fiber cpu
+	 * time calculations.
+	 */
+	ev_prepare prepare_event;
 	/** A memory cache for (struct fiber) */
 	struct mempool fiber_mempool;
 	/** A runtime slab cache for general use in this cord. */
@@ -625,6 +650,15 @@ typedef int (*fiber_stat_cb)(struct fiber *f, void *ctx);
 int
 fiber_stat(fiber_stat_cb cb, void *cb_ctx);
 
+/** Return true if fiber.top() statistics collection is on. */
+bool
+fiber_top_is_enabled(void);
+
+/**
+ * Switch fiber.top() statistics collection on. No-op if it is
+ * already on.
+ */
+void
+fiber_top_enable(void);
+
+/**
+ * Switch fiber.top() statistics collection off. No-op if it is
+ * already off.
+ */
+void
+fiber_top_disable(void);
+
 /** Useful for C unit tests */
 static inline int
 fiber_c_invoke(fiber_func f, va_list ap)
diff --git a/src/lua/fiber.c b/src/lua/fiber.c
index 336be60a2..d5d9d5573 100644
--- a/src/lua/fiber.c
+++ b/src/lua/fiber.c
@@ -319,6 +319,59 @@ lbox_fiber_statof_nobt(struct fiber *f, void *cb_ctx)
 	return lbox_fiber_statof(f, cb_ctx, false);
 }
 
+/**
+ * Express @a part as a percentage of @a total. A zero @a total
+ * (nothing has been measured yet, e.g. right after the
+ * statistics were enabled) yields 0 instead of NaN or infinity.
+ */
+static inline double
+lbox_fiber_top_percent(uint64_t part, uint64_t total)
+{
+	if (total == 0)
+		return 0;
+	return part / (double)total * 100;
+}
+
+/**
+ * Add fiber.top() entry for fiber @a f to the table on top of
+ * the Lua stack. The entry is stored under the fiber id and is
+ * a table with two fields, "cpu average (%)" and
+ * "cpu instant (%)": the fiber's share in the cord's averaged
+ * clock value and in the cord's last iteration clock delta.
+ *
+ * @param f      fiber to describe.
+ * @param cb_ctx Lua state with the result table on top of its
+ *               stack; the stack is left as it was found.
+ * @return always 0.
+ */
+static int
+lbox_fiber_top_entry(struct fiber *f, void *cb_ctx)
+{
+	struct lua_State *L = (struct lua_State *) cb_ctx;
+	struct cord *cur = cord();
+
+	lua_pushinteger(L, f->fid);
+	lua_newtable(L);
+
+	lua_pushliteral(L, "cpu average (%)");
+	lua_pushnumber(L, lbox_fiber_top_percent(f->clock_acc,
+						 cur->clock_acc));
+	lua_settable(L, -3);
+	lua_pushliteral(L, "cpu instant (%)");
+	lua_pushnumber(L, lbox_fiber_top_percent(f->clock_delta_last,
+						 cur->clock_delta_last));
+	lua_settable(L, -3);
+	/* result[fid] = entry */
+	lua_settable(L, -3);
+
+	return 0;
+}
+
+/**
+ * fiber.top(): return a table holding the "cpu misses" counter
+ * of the last event loop iteration and one entry per fiber,
+ * the scheduler included, keyed by fiber id. Raises a Lua error
+ * if the statistics collection is disabled.
+ */
+static int
+lbox_fiber_top(struct lua_State *L)
+{
+	if (!fiber_top_is_enabled()) {
+		return luaL_error(L, "fiber.top() is disabled. enable it with"
+				     " fiber.top_enable() first");
+	}
+	lua_newtable(L);
+	lua_pushnumber(L, cord()->cpu_miss_count_last);
+	lua_setfield(L, -2, "cpu misses");
+
+	/* The scheduler is added explicitly, before the fiber_stat() walk. */
+	lbox_fiber_top_entry(&cord()->sched, L);
+	fiber_stat(lbox_fiber_top_entry, L);
+
+	return 1;
+}
+
+/**
+ * fiber.top_enable(): switch fiber.top() statistics collection
+ * on. Takes no arguments and returns nothing to Lua.
+ */
+static int
+lbox_fiber_top_enable(struct lua_State *L)
+{
+	fiber_top_enable();
+	(void) L;
+	return 0;
+}
+
+/**
+ * fiber.top_disable(): switch fiber.top() statistics collection
+ * off. Takes no arguments and returns nothing to Lua.
+ */
+static int
+lbox_fiber_top_disable(struct lua_State *L)
+{
+	fiber_top_disable();
+	(void) L;
+	return 0;
+}
+
 /**
  * Return fiber statistics.
  */
@@ -741,6 +794,9 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
 
 static const struct luaL_Reg fiberlib[] = {
 	{"info", lbox_fiber_info},
+	{"top", lbox_fiber_top},
+	{"top_enable", lbox_fiber_top_enable},
+	{"top_disable", lbox_fiber_top_disable},
 	{"sleep", lbox_fiber_sleep},
 	{"yield", lbox_fiber_yield},
 	{"self", lbox_fiber_self},
diff --git a/test/app/fiber.result b/test/app/fiber.result
index 94e690f6c..6974cebc8 100644
--- a/test/app/fiber.result
+++ b/test/app/fiber.result
@@ -1462,6 +1462,84 @@ fiber.join(fiber.self())
 ---
 - error: the fiber is not joinable
 ...
+sum = 0
+---
+...
+-- gh-2694 fiber.top()
+a = fiber.top()
+---
+...
+-- scheduler is present in fiber.top()
+a[1] ~= nil
+---
+- true
+...
+type(a["cpu misses"]) == 'number'
+---
+- true
+...
+sum_inst = 0
+---
+...
+sum_avg = 0
+---
+...
+test_run:cmd('setopt delimiter ";"')
+---
+- true
+...
+for k, v in pairs(a) do
+    if type(v) == 'table' then
+        sum_inst = sum_inst + v["cpu instant (%)"]
+        sum_avg = sum_avg + v["cpu average (%)"]
+    end
+end;
+---
+...
+test_run:cmd('setopt delimiter ""');
+---
+- true
+...
+sum_inst
+---
+- 100
+...
+-- not exact due to accumulated integer division errors
+sum_avg > 99 and sum_avg < 101 or sum_avg
+---
+- true
+...
+tbl = nil
+---
+...
+f = fiber.new(function() for i = 1,1000 do end fiber.yield() tbl = fiber.top()[fiber.self().id()] end)
+---
+...
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+---
+...
+tbl["cpu average (%)"] > 0
+---
+- true
+...
+tbl["cpu instant (%)"] > 0
+---
+- true
+...
+fiber.top_disable()
+---
+...
+fiber.top()
+---
+- error: fiber.top() is disabled. enable it with fiber.top_enable() first
+...
+fiber.top_enable()
+---
+...
+type(fiber.top())
+---
+- table
+...
 -- cleanup
 test_run:cmd("clear filter")
 ---
diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
index bb8c24990..279c40ce1 100644
--- a/test/app/fiber.test.lua
+++ b/test/app/fiber.test.lua
@@ -629,6 +629,37 @@ while f:status() ~= 'dead' do fiber.sleep(0.01) end
 --
 fiber.join(fiber.self())
 
+sum = 0
+
+-- gh-2694 fiber.top()
+a = fiber.top()
+-- scheduler is present in fiber.top()
+a[1] ~= nil
+type(a["cpu misses"]) == 'number'
+sum_inst = 0
+sum_avg = 0
+test_run:cmd('setopt delimiter ";"')
+for k, v in pairs(a) do
+    if type(v) == 'table' then
+        sum_inst = sum_inst + v["cpu instant (%)"]
+        sum_avg = sum_avg + v["cpu average (%)"]
+    end
+end;
+test_run:cmd('setopt delimiter ""');
+sum_inst
+-- not exact due to accumulated integer division errors
+sum_avg > 99 and sum_avg < 101 or sum_avg
+tbl = nil
+f = fiber.new(function() for i = 1,1000 do end fiber.yield() tbl = fiber.top()[fiber.self().id()] end)
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+tbl["cpu average (%)"] > 0
+tbl["cpu instant (%)"] > 0
+
+fiber.top_disable()
+fiber.top()
+fiber.top_enable()
+type(fiber.top())
+
 -- cleanup
 test_run:cmd("clear filter")
 
-- 
2.21.0 (Apple Git-122)





More information about the Tarantool-patches mailing list