[tarantool-patches] Re: [PATCH v2 2/2] lua: add fiber.top() listing fiber cpu consumption

Georgy Kirichenko georgy at tarantool.org
Tue Oct 15 11:02:56 MSK 2019


Great job, thanks.
The only question I have is about ARM and 32-bit x86: would we like to
support them, or could we just disable fiber.top on these architectures
using defines?
@kyukhin
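
To illustrate the define-based option, here is a minimal sketch. It assumes
only x86-64 is treated as supported; `ENABLE_FIBER_TOP`,
`fiber_top_is_supported()` and `fiber_top_clock()` are hypothetical names,
not part of the patch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical feature macro (not in the patch): 1 only on targets
 * where we assume __rdtscp() is available.
 */
#if defined(__x86_64__)
#include <x86intrin.h> /* __rdtscp() */
#define ENABLE_FIBER_TOP 1
#else
#define ENABLE_FIBER_TOP 0
#endif

/* Report whether timestamp-counter accounting is compiled in. */
static inline bool
fiber_top_is_supported(void)
{
	return ENABLE_FIBER_TOP != 0;
}

/*
 * Read the timestamp counter and the id of the core it was read on.
 * On unsupported targets return 0 and zero *cpu_id, so that callers
 * always compute a zero delta.
 */
static inline uint64_t
fiber_top_clock(uint32_t *cpu_id)
{
	assert(cpu_id != NULL);
#if ENABLE_FIBER_TOP
	return __rdtscp(cpu_id);
#else
	*cpu_id = 0;
	return 0;
#endif
}
```

With something like this, fiber.top_enable() could raise an error when
`fiber_top_is_supported()` is false instead of breaking the build.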

On Tuesday, October 8, 2019 6:03:56 PM MSK Serge Petrenko wrote:
> Implement a new function in Lua fiber library: top(). It returns a table
> of alive fibers (including the scheduler). Each table entry has two
> fields: average cpu consumption, which is calculated with exponential
> moving average over event loop iterations, and current cpu consumption,
> which shows fiber's cpu usage over the last event loop iteration.
> The patch relies on CPU timestamp counter to measure each fiber's time
> share.
> 
> Closes #2694
> 
> @TarantoolBot document
> Title: fiber: new function `fiber.top()`
> 
> `fiber.top()` returns a table of all alive fibers and lists their cpu
> consumption. Let's take a look at the example:
> ```
> tarantool> fiber.top()
> ---
> - 1:
>     cpu average (%): 10.779696493982
>     cpu instant (%): 10.435256168573
>   115:
>     cpu average (%): 5.4571279061075
>     cpu instant (%): 5.9653973440576
>   120:
>     cpu average (%): 21.944382148464
>     cpu instant (%): 23.849021825646
>   116:
>     cpu average (%): 8.6603872318158
>     cpu instant (%): 9.6812031335093
>   119:
>     cpu average (%): 21.933168871944
>     cpu instant (%): 20.007540530351
>   cpu misses: 0
>   118:
>     cpu average (%): 19.342901995963
>     cpu instant (%): 16.932679820703
>   117:
>     cpu average (%): 11.549674814981
>     cpu instant (%): 13.128901177161
> ...
> ```
> In the table above, keys are fiber ids, plus a single 'cpu misses' key
> which indicates the number of times the tx thread was rescheduled on a
> different cpu core (more on that below).
> The two metrics available for each fiber are:
> 1) cpu instant (per cent), which indicates the share of time the fiber
> was executing during the previous event loop iteration;
> 2) cpu average (per cent), which is calculated as an exponential moving
> average of `cpu instant` values over all previous event loop iterations.
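
To make the averaging concrete, here is a small sketch of the 1/16 update
rule that the patch implements in `clock_diff_accumulate()`. `ema_step()`
and `ema_fold()` are illustrative names, not part of the patch:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Same update rule as the patch: new = delta / 16 + 15 * acc / 16. */
static uint64_t
ema_step(uint64_t acc, uint64_t delta)
{
	if (acc > 0)
		return delta / 16 + 15 * acc / 16;
	return delta; /* The first sample seeds the average. */
}

/* Fold a series of per-iteration clock deltas into one average. */
static uint64_t
ema_fold(const uint64_t *deltas, size_t count)
{
	assert(deltas != NULL || count == 0);
	uint64_t acc = 0;
	for (size_t i = 0; i < count; i++)
		acc = ema_step(acc, deltas[i]);
	return acc;
}
```

Feeding the deltas 1600, 1600, 3200 gives 1700: the first sample seeds the
average, the second leaves it unchanged, and the third moves it 1/16 of the
way towards 3200.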
> 
> More info on the `cpu misses` field returned by `fiber.top()`:
> `cpu misses` indicates the number of times the tx thread detected it was
> rescheduled on a different cpu core during the last event loop
> iteration.
> fiber.top() uses the cpu timestamp counter to measure each fiber's
> execution time. However, each cpu core may have its own counter value
> (you can only rely on counter deltas if both measurements were taken on
> the same core, otherwise the delta may even be negative).
> When the tx thread is rescheduled to a different cpu core, tarantool just
> assumes the cpu delta was zero for the last measurement. This lowers the
> precision of our computations, so the bigger the `cpu misses` value, the
> lower the precision of fiber.top() results.
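
The miss handling described above can be sketched in isolation.
`struct clock_state` and `clock_state_sample()` are hypothetical stand-ins
for the cord fields and the `clock_set_on_csw()` macro; samples are passed
in explicitly rather than read with `__rdtscp()`:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the cord fields used by the patch. */
struct clock_state {
	uint64_t clock_last;      /* counter value at the previous sample */
	uint64_t clock_delta;     /* cycles accumulated in this iteration */
	uint32_t cpu_id_last;     /* core the previous sample was taken on */
	uint32_t cpu_miss_count;  /* core changes seen in this iteration */
};

/*
 * Account one (clock, cpu_id) sample. A delta is trusted only when
 * both samples come from the same core; otherwise it is treated as
 * zero and a miss is recorded.
 */
static void
clock_state_sample(struct clock_state *s, uint64_t clock, uint32_t cpu_id)
{
	assert(s != NULL);
	if (cpu_id == s->cpu_id_last) {
		s->clock_delta += clock - s->clock_last;
	} else {
		s->cpu_id_last = cpu_id;
		s->cpu_miss_count++;
	}
	s->clock_last = clock;
}
```

Starting from clock 100 on core 0, the samples (150, core 0), (40, core 1),
(70, core 1) accumulate 50 + 0 + 30 = 80 cycles and one miss. The cycles
spent around the migration are simply lost, which is the precision loss
mentioned above.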
> ---
>  src/lib/core/fiber.c    | 134 ++++++++++++++++++++++++++++++++++++++++
>  src/lib/core/fiber.h    |  34 ++++++++++
>  src/lua/fiber.c         |  56 +++++++++++++++++
>  test/app/fiber.result   |  78 +++++++++++++++++++++++
>  test/app/fiber.test.lua |  31 ++++++++++
>  5 files changed, 333 insertions(+)
> 
> diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c
> index b813c1739..6973fd678 100644
> --- a/src/lib/core/fiber.c
> +++ b/src/lib/core/fiber.c
> @@ -37,6 +37,7 @@
>  #include <stdlib.h>
>  #include <string.h>
>  #include <pmatomic.h>
> +#include <x86intrin.h> /* __rdtscp() */
> 
>  #include "assoc.h"
>  #include "memory.h"
> @@ -82,6 +83,28 @@ static int (*fiber_invoke)(fiber_func f, va_list ap);
>  	err;								\
>  })
> 
> +/**
> + * An action performed each time a context switch happens.
> + * Used to count each fiber's processing time.
> + * This is a macro rather than a function, since it is used
> + * in scheduler too.
> + */
> +#define clock_set_on_csw(caller)					\
> +({									\
> +	uint64_t clock;							\
> +	uint32_t cpu_id;						\
> +	clock = __rdtscp(&cpu_id);					\
> +									\
> +	if (cpu_id == cord()->cpu_id_last) {				\
> +		(caller)->clock_delta += clock - cord()->clock_last;	\
> +		cord()->clock_delta += clock - cord()->clock_last;	\
> +	} else {							\
> +		cord()->cpu_id_last = cpu_id;				\
> +		cord()->cpu_miss_count++;				\
> +	}								\
> +	cord()->clock_last = clock;					\
> +})
> +
>  /*
>   * Defines a handler to be executed on exit from cord's thread func,
>   * accessible via cord()->on_exit (normally NULL). It is used to
> @@ -107,6 +130,8 @@ pthread_t main_thread_id;
>  static size_t page_size;
>  static int stack_direction;
> 
> +static bool fiber_top_enabled = false;
> +
>  enum {
>  	/* The minimum allowable fiber stack size in bytes */
>  	FIBER_STACK_SIZE_MINIMAL = 16384,
> @@ -246,6 +271,7 @@ fiber_call(struct fiber *callee)
>  	/** By convention, these triggers must not throw. */
>  	if (! rlist_empty(&caller->on_yield))
>  		trigger_run(&caller->on_yield, NULL);
> +	clock_set_on_csw(caller);
>  	callee->caller = caller;
>  	callee->flags |= FIBER_IS_READY;
>  	caller->flags |= FIBER_IS_READY;
> @@ -474,6 +500,7 @@ fiber_yield(void)
>  	/** By convention, these triggers must not throw. */
>  	if (! rlist_empty(&caller->on_yield))
>  		trigger_run(&caller->on_yield, NULL);
> +	clock_set_on_csw(caller);
> 
>  	assert(callee->flags & FIBER_IS_READY || callee == &cord->sched);
>  	assert(! (callee->flags & FIBER_IS_DEAD));
> @@ -584,6 +611,7 @@ fiber_schedule_list(struct rlist *list)
>  	}
>  	last->caller = fiber();
>  	assert(fiber() == &cord()->sched);
> +	clock_set_on_csw(fiber());
>  	fiber_call_impl(first);
>  }
> 
> @@ -980,6 +1008,8 @@ fiber_new_ex(const char *name, const struct fiber_attr *fiber_attr,
>  	}
> 
>  	fiber->f = f;
> +	fiber->clock_acc = 0;
> +	fiber->clock_delta = 0;
>  	/* Excluding reserved range */
>  	if (++cord->max_fid < FIBER_ID_MAX_RESERVED)
>  		cord->max_fid = FIBER_ID_MAX_RESERVED + 1;
> @@ -1044,6 +1074,103 @@ fiber_destroy_all(struct cord *cord)
>  						      struct fiber, link));
>  }
> 
> +static void
> +loop_on_iteration_start(ev_loop *loop, ev_check *watcher, int revents)
> +{
> +	(void) loop;
> +	(void) watcher;
> +	(void) revents;
> +
> +	cord()->clock_last = __rdtscp(&cord()->cpu_id_last);
> +	cord()->cpu_miss_count = 0;
> +}
> +
> +
> +/**
> + * Calculate the exponential moving average for the clock deltas
> + * per loop iteration. The coefficient is 1/16.
> + */
> +static inline uint64_t
> +clock_diff_accumulate(uint64_t acc, uint64_t delta)
> +{
> +	if (acc > 0) {
> +		return delta / 16 + 15 * acc / 16;
> +	} else {
> +		return delta;
> +	}
> +}
> +
> +static void
> +loop_on_iteration_end(ev_loop *loop, ev_prepare *watcher, int revents)
> +{
> +	(void) loop;
> +	(void) watcher;
> +	(void) revents;
> +	struct fiber *fiber;
> +	assert(fiber() == &cord()->sched);
> +
> +	/*
> +	 * Record the scheduler's latest clock change, even though
> +	 * it's not a context switch, but an event loop iteration
> +	 * end.
> +	 */
> +	clock_set_on_csw(&cord()->sched);
> +
> +	cord()->cpu_miss_count_last = cord()->cpu_miss_count;
> +	cord()->cpu_miss_count = 0;
> +
> +	cord()->clock_acc = clock_diff_accumulate(cord()->clock_acc, cord()->clock_delta);
> +	cord()->clock_delta_last = cord()->clock_delta;
> +	cord()->clock_delta = 0;
> +
> +	cord()->sched.clock_acc = clock_diff_accumulate(cord()->sched.clock_acc, cord()->sched.clock_delta);
> +	cord()->sched.clock_delta_last = cord()->sched.clock_delta;
> +	cord()->sched.clock_delta = 0;
> +
> +	rlist_foreach_entry(fiber, &cord()->alive, link) {
> +		fiber->clock_acc = clock_diff_accumulate(fiber->clock_acc, fiber->clock_delta);
> +		fiber->clock_delta_last = fiber->clock_delta;
> +		fiber->clock_delta = 0;
> +	}
> +}
> +
> +static inline void
> +fiber_top_init()
> +{
> +	ev_prepare_init(&cord()->prepare_event, loop_on_iteration_end);
> +	ev_check_init(&cord()->check_event, loop_on_iteration_start);
> +}
> +
> +bool
> +fiber_top_is_enabled()
> +{
> +	return fiber_top_enabled;
> +}
> +
> +inline void
> +fiber_top_enable()
> +{
> +	if (!fiber_top_enabled) {
> +		ev_prepare_start(cord()->loop, &cord()->prepare_event);
> +		ev_check_start(cord()->loop, &cord()->check_event);
> +		fiber_top_enabled = true;
> +
> +		cord()->clock_acc = 0;
> +		cord()->cpu_miss_count_last = 0;
> +		cord()->clock_delta_last = 0;
> +	}
> +}
> +
> +inline void
> +fiber_top_disable()
> +{
> +	if (fiber_top_enabled) {
> +		ev_prepare_stop(cord()->loop, &cord()->prepare_event);
> +		ev_check_stop(cord()->loop, &cord()->check_event);
> +		fiber_top_enabled = false;
> +	}
> +}
> +
>  void
>  cord_create(struct cord *cord, const char *name)
>  {
> @@ -1077,6 +1204,13 @@ cord_create(struct cord *cord, const char *name)
>  	ev_async_init(&cord->wakeup_event, fiber_schedule_wakeup);
> 
>  	ev_idle_init(&cord->idle_event, fiber_schedule_idle);
> +
> +	/* fiber.top() currently works only for the main thread. */
> +	if (cord_is_main()) {
> +		fiber_top_init();
> +		fiber_top_enable();
> +	}
> +
>  	cord_set_name(name);
> 
>  #if ENABLE_ASAN
> diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
> index fb168e25e..e56a2aa35 100644
> --- a/src/lib/core/fiber.h
> +++ b/src/lib/core/fiber.h
> @@ -386,6 +386,16 @@ struct fiber {
>  	uint32_t fid;
>  	/** Fiber flags */
>  	uint32_t flags;
> +	/**
> +	 * Accumulated clock value calculated using exponential
> +	 * moving average.
> +	 */
> +	uint64_t clock_acc;
> +	/**
> +	 * Clock delta calculated on previous event loop iteration.
> +	 */
> +	uint64_t clock_delta_last;
> +	uint64_t clock_delta;
>  	/** Link in cord->alive or cord->dead list. */
>  	struct rlist link;
>  	/** Link in cord->ready list. */
> @@ -457,6 +467,13 @@ struct cord {
>  	 * reserved.
>  	 */
>  	uint32_t max_fid;
> +	uint64_t clock_acc;
> +	uint64_t clock_delta;
> +	uint64_t clock_delta_last;
> +	uint64_t clock_last;
> +	uint32_t cpu_id_last;
> +	uint32_t cpu_miss_count;
> +	uint32_t cpu_miss_count_last;
>  	pthread_t id;
>  	const struct cord_on_exit *on_exit;
>  	/** A helper hash to map id -> fiber. */
> @@ -482,6 +499,14 @@ struct cord {
>  	 * is no 1 ms delay in case of zero sleep timeout.
>  	 */
>  	ev_idle idle_event;
> +	/** An event triggered on every event loop iteration start. */
> +	ev_check check_event;
> +	/**
> +	 * An event triggered on every event loop iteration end.
> +	 * Just like the event above it is used in per-fiber cpu
> +	 * time calculations.
> +	 */
> +	ev_prepare prepare_event;
>  	/** A memory cache for (struct fiber) */
>  	struct mempool fiber_mempool;
>  	/** A runtime slab cache for general use in this cord. */
> @@ -625,6 +650,15 @@ typedef int (*fiber_stat_cb)(struct fiber *f, void *ctx);
>  int
>  fiber_stat(fiber_stat_cb cb, void *cb_ctx);
> 
> +bool
> +fiber_top_is_enabled();
> +
> +void
> +fiber_top_enable();
> +
> +void
> +fiber_top_disable();
> +
>  /** Useful for C unit tests */
>  static inline int
>  fiber_c_invoke(fiber_func f, va_list ap)
> diff --git a/src/lua/fiber.c b/src/lua/fiber.c
> index 336be60a2..d5d9d5573 100644
> --- a/src/lua/fiber.c
> +++ b/src/lua/fiber.c
> @@ -319,6 +319,59 @@ lbox_fiber_statof_nobt(struct fiber *f, void *cb_ctx)
>  	return lbox_fiber_statof(f, cb_ctx, false);
>  }
> 
> +static int
> +lbox_fiber_top_entry(struct fiber *f, void *cb_ctx)
> +{
> +	struct lua_State *L = (struct lua_State *) cb_ctx;
> +
> +	lua_pushinteger(L, f->fid);
> +	lua_newtable(L);
> +
> +	lua_pushliteral(L, "cpu average (%)");
> +	lua_pushnumber(L, f->clock_acc / (double)cord()->clock_acc * 100);
> +	lua_settable(L, -3);
> +	lua_pushliteral(L, "cpu instant (%)");
> +	lua_pushnumber(L, f->clock_delta_last / (double)cord()->clock_delta_last * 100);
> +	lua_settable(L, -3);
> +	lua_settable(L, -3);
> +
> +	return 0;
> +}
> +
> +static int
> +lbox_fiber_top(struct lua_State *L)
> +{
> +	if (!fiber_top_is_enabled()) {
> +		luaL_error(L, "fiber.top() is disabled. enable it with"
> +			      " fiber.top_enable() first");
> +	}
> +	lua_newtable(L);
> +	lua_pushliteral(L, "cpu misses");
> +	lua_pushnumber(L, cord()->cpu_miss_count_last);
> +	lua_settable(L, -3);
> +
> +	lbox_fiber_top_entry(&cord()->sched, L);
> +	fiber_stat(lbox_fiber_top_entry, L);
> +
> +	return 1;
> +}
> +
> +static int
> +lbox_fiber_top_enable(struct lua_State *L)
> +{
> +	(void) L;
> +	fiber_top_enable();
> +	return 0;
> +}
> +
> +static int
> +lbox_fiber_top_disable(struct lua_State *L)
> +{
> +	(void) L;
> +	fiber_top_disable();
> +	return 0;
> +}
> +
>  /**
>   * Return fiber statistics.
>   */
> @@ -741,6 +794,9 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
> 
>  static const struct luaL_Reg fiberlib[] = {
>  	{"info", lbox_fiber_info},
> +	{"top", lbox_fiber_top},
> +	{"top_enable", lbox_fiber_top_enable},
> +	{"top_disable", lbox_fiber_top_disable},
>  	{"sleep", lbox_fiber_sleep},
>  	{"yield", lbox_fiber_yield},
>  	{"self", lbox_fiber_self},
> diff --git a/test/app/fiber.result b/test/app/fiber.result
> index 94e690f6c..6974cebc8 100644
> --- a/test/app/fiber.result
> +++ b/test/app/fiber.result
> @@ -1462,6 +1462,84 @@ fiber.join(fiber.self())
>  ---
>  - error: the fiber is not joinable
>  ...
> +sum = 0
> +---
> +...
> +-- gh-2694 fiber.top()
> +a = fiber.top()
> +---
> +...
> +-- scheduler is present in fiber.top()
> +a[1] ~= nil
> +---
> +- true
> +...
> +type(a["cpu misses"]) == 'number'
> +---
> +- true
> +...
> +sum_inst = 0
> +---
> +...
> +sum_avg = 0
> +---
> +...
> +test_run:cmd('setopt delimiter ";"')
> +---
> +- true
> +...
> +for k, v in pairs(a) do
> +    if type(v) == 'table' then
> +        sum_inst = sum_inst + v["cpu instant (%)"]
> +        sum_avg = sum_avg + v["cpu average (%)"]
> +    end
> +end;
> +---
> +...
> +test_run:cmd('setopt delimiter ""');
> +---
> +- true
> +...
> +sum_inst
> +---
> +- 100
> +...
> +-- not exact due to accumulated integer division errors
> +sum_avg > 99 and sum_avg < 101 or sum_avg
> +---
> +- true
> +...
> +tbl = nil
> +---
> +...
> +f = fiber.new(function() for i = 1,1000 do end fiber.yield() tbl = fiber.top()[fiber.self().id()] end)
> +---
> +...
> +while f:status() ~= 'dead' do fiber.sleep(0.01) end
> +---
> +...
> +tbl["cpu average (%)"] > 0
> +---
> +- true
> +...
> +tbl["cpu instant (%)"] > 0
> +---
> +- true
> +...
> +fiber.top_disable()
> +---
> +...
> +fiber.top()
> +---
> +- error: fiber.top() is disabled. enable it with fiber.top_enable() first
> +...
> +fiber.top_enable()
> +---
> +...
> +type(fiber.top())
> +---
> +- table
> +...
>  -- cleanup
>  test_run:cmd("clear filter")
>  ---
> diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
> index bb8c24990..279c40ce1 100644
> --- a/test/app/fiber.test.lua
> +++ b/test/app/fiber.test.lua
> @@ -629,6 +629,37 @@ while f:status() ~= 'dead' do fiber.sleep(0.01) end
>  --
>  fiber.join(fiber.self())
> 
> +sum = 0
> +
> +-- gh-2694 fiber.top()
> +a = fiber.top()
> +-- scheduler is present in fiber.top()
> +a[1] ~= nil
> +type(a["cpu misses"]) == 'number'
> +sum_inst = 0
> +sum_avg = 0
> +test_run:cmd('setopt delimiter ";"')
> +for k, v in pairs(a) do
> +    if type(v) == 'table' then
> +        sum_inst = sum_inst + v["cpu instant (%)"]
> +        sum_avg = sum_avg + v["cpu average (%)"]
> +    end
> +end;
> +test_run:cmd('setopt delimiter ""');
> +sum_inst
> +-- not exact due to accumulated integer division errors
> +sum_avg > 99 and sum_avg < 101 or sum_avg
> +tbl = nil
> +f = fiber.new(function() for i = 1,1000 do end fiber.yield() tbl = fiber.top()[fiber.self().id()] end)
> +while f:status() ~= 'dead' do fiber.sleep(0.01) end
> +tbl["cpu average (%)"] > 0
> +tbl["cpu instant (%)"] > 0
> +
> +fiber.top_disable()
> +fiber.top()
> +fiber.top_enable()
> +type(fiber.top())
> +
>  -- cleanup
>  test_run:cmd("clear filter")
