[tarantool-patches] Re: [PATCH v2 2/2] lua: add fiber.top() listing fiber cpu consumption
Georgy Kirichenko
georgy at tarantool.org
Tue Oct 15 11:02:56 MSK 2019
Great job, thanks.
The only question I have is about ARM and 32-bit x86: would we like to
support them, or could we just disable fiber.top on these architectures
using defines?
@kyukhin
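A minimal sketch of what a define-based switch could look like. The macro
FIBER_TOP_SUPPORTED and the helper fiber_top_supported() are hypothetical
names, not part of the patch; __x86_64__ is the usual GCC/Clang predefined
macro. Whether 32-bit x86 should be enabled is the open question, so the
sketch turns the feature on for x86-64 only.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical feature macro (not in the patch): compile fiber.top()
 * in only on architectures where the timestamp counter is used.
 */
#if defined(__x86_64__)
#define FIBER_TOP_SUPPORTED 1
#else
#define FIBER_TOP_SUPPORTED 0
#endif

/* Hypothetical helper reporting whether the feature was compiled in. */
static inline bool
fiber_top_supported(void)
{
	return FIBER_TOP_SUPPORTED != 0;
}
```

The include of x86intrin.h and the clock_set_on_csw() calls would then
presumably sit under the same condition.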
On Tuesday, October 8, 2019 6:03:56 PM MSK Serge Petrenko wrote:
> Implement a new function in Lua fiber library: top(). It returns a table
> of alive fibers (including the scheduler). Each table entry has two
> fields: average cpu consumption, which is calculated with exponential
> moving average over event loop iterations, and current cpu consumption,
> which shows fiber's cpu usage over the last event loop iteration.
> The patch relies on CPU timestamp counter to measure each fiber's time
> share.
>
> Closes #2694
>
> @TarantoolBot document
> Title: fiber: new function `fiber.top()`
>
> `fiber.top()` returns a table of all alive fibers and lists their cpu
> consumption. Let's take a look at the example:
> ```
> tarantool> fiber.top()
> ---
> - 1:
>     cpu average (%): 10.779696493982
>     cpu instant (%): 10.435256168573
>   115:
>     cpu average (%): 5.4571279061075
>     cpu instant (%): 5.9653973440576
>   120:
>     cpu average (%): 21.944382148464
>     cpu instant (%): 23.849021825646
>   116:
>     cpu average (%): 8.6603872318158
>     cpu instant (%): 9.6812031335093
>   119:
>     cpu average (%): 21.933168871944
>     cpu instant (%): 20.007540530351
>   cpu misses: 0
>   118:
>     cpu average (%): 19.342901995963
>     cpu instant (%): 16.932679820703
>   117:
>     cpu average (%): 11.549674814981
>     cpu instant (%): 13.128901177161
> ...
> ```
> In the table above, keys are fiber ids, plus a single 'cpu misses' key,
> which indicates the number of times the tx thread was rescheduled on a
> different cpu core (more on that later).
> The two metrics available for each fiber are:
> 1) cpu instant (per cent),
> which indicates the share of time fiber was executing during the
> previous event loop iteration
> 2) cpu average (per cent), which is calculated as an exponential moving
> average of `cpu instant` values over all previous event loop iterations.
>
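The averaging described above can be sketched in isolation.
clock_diff_accumulate() mirrors the helper from the patch (coefficient
1/16); ema_run() is a hypothetical driver added only for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Same formula as in the patch: new = delta / 16 + 15 * acc / 16. */
static inline uint64_t
clock_diff_accumulate(uint64_t acc, uint64_t delta)
{
	if (acc > 0)
		return delta / 16 + 15 * acc / 16;
	/* A zero accumulator is seeded with the sample itself. */
	return delta;
}

/* Hypothetical driver: fold a series of per-iteration deltas. */
static uint64_t
ema_run(const uint64_t *deltas, size_t n)
{
	uint64_t acc = 0;
	for (size_t i = 0; i < n; i++)
		acc = clock_diff_accumulate(acc, deltas[i]);
	return acc;
}
```

With deltas of 1600 and then 3200 cycles the average moves from 1600 to
1700, i.e. one sixteenth of the way toward the new sample.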
> More info on `cpu misses` field returned by `fiber.top()`:
> `cpu misses` indicates the amount of times tx thread detected it was
> rescheduled on a different cpu core during the last event loop
> iteration.
> fiber.top() uses cpu timestamp counter to measure each fiber's execution
> time. However, each cpu core may have its own counter value (you can
> only rely on counter deltas if both measurements were taken on the same
> core, otherwise the delta may even get negative).
> When tx thread is rescheduled to a different cpu core, tarantool just
> assumes cpu delta was zero for the last measurement. This lowers the
> precision of our computations, so the bigger the `cpu misses` value, the
> lower the precision of fiber.top() results.
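The accounting rule described above, in a self-contained form. The struct
and function names are hypothetical, and the timestamp and core id are
passed in as plain arguments instead of being read with __rdtscp(), so the
sketch runs on any architecture.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the per-cord counters from the patch. */
struct clock_state {
	uint64_t clock_last;     /* timestamp of the previous sample */
	uint64_t clock_delta;    /* cycles accounted so far */
	uint32_t cpu_id_last;    /* core the previous sample came from */
	uint32_t cpu_miss_count; /* samples dropped due to a core change */
};

/*
 * Account one (timestamp, core id) sample. A delta is trusted only
 * when both samples were taken on the same core; otherwise it is
 * treated as zero and a miss is recorded.
 */
static void
clock_account(struct clock_state *s, uint64_t clock, uint32_t cpu_id)
{
	if (cpu_id == s->cpu_id_last) {
		s->clock_delta += clock - s->clock_last;
	} else {
		s->cpu_id_last = cpu_id;
		s->cpu_miss_count++;
	}
	s->clock_last = clock;
}
```

A sample taken on a new core contributes nothing to clock_delta even if
its timestamp is lower than the previous one, which is the case the
commit message warns about.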
> ---
> src/lib/core/fiber.c | 134 ++++++++++++++++++++++++++++++++++++++++
> src/lib/core/fiber.h | 34 ++++++++++
> src/lua/fiber.c | 56 +++++++++++++++++
> test/app/fiber.result | 78 +++++++++++++++++++++++
> test/app/fiber.test.lua | 31 ++++++++++
> 5 files changed, 333 insertions(+)
>
> diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c
> index b813c1739..6973fd678 100644
> --- a/src/lib/core/fiber.c
> +++ b/src/lib/core/fiber.c
> @@ -37,6 +37,7 @@
> #include <stdlib.h>
> #include <string.h>
> #include <pmatomic.h>
> +#include <x86intrin.h> /* __rdtscp() */
>
> #include "assoc.h"
> #include "memory.h"
> @@ -82,6 +83,28 @@ static int (*fiber_invoke)(fiber_func f, va_list ap);
> err; \
> })
>
> +/**
> + * An action performed each time a context switch happens.
> + * Used to count each fiber's processing time.
> + * This is a macro rather than a function, since it is used
> + * in scheduler too.
> + */
> +#define clock_set_on_csw(caller) \
> +({ \
> + uint64_t clock; \
> + uint32_t cpu_id; \
> + clock = __rdtscp(&cpu_id); \
> + \
> + if (cpu_id == cord()->cpu_id_last) { \
> + (caller)->clock_delta += clock - cord()->clock_last; \
> + cord()->clock_delta += clock - cord()->clock_last; \
> + } else { \
> + cord()->cpu_id_last = cpu_id; \
> + cord()->cpu_miss_count++; \
> + } \
> + cord()->clock_last = clock; \
> +})
> +
> /*
> * Defines a handler to be executed on exit from cord's thread func,
> * accessible via cord()->on_exit (normally NULL). It is used to
> @@ -107,6 +130,8 @@ pthread_t main_thread_id;
> static size_t page_size;
> static int stack_direction;
>
> +static bool fiber_top_enabled = false;
> +
> enum {
> /* The minimum allowable fiber stack size in bytes */
> FIBER_STACK_SIZE_MINIMAL = 16384,
> @@ -246,6 +271,7 @@ fiber_call(struct fiber *callee)
> /** By convention, these triggers must not throw. */
> if (! rlist_empty(&caller->on_yield))
> trigger_run(&caller->on_yield, NULL);
> + clock_set_on_csw(caller);
> callee->caller = caller;
> callee->flags |= FIBER_IS_READY;
> caller->flags |= FIBER_IS_READY;
> @@ -474,6 +500,7 @@ fiber_yield(void)
> /** By convention, these triggers must not throw. */
> if (! rlist_empty(&caller->on_yield))
> trigger_run(&caller->on_yield, NULL);
> + clock_set_on_csw(caller);
>
> assert(callee->flags & FIBER_IS_READY || callee == &cord->sched);
> assert(! (callee->flags & FIBER_IS_DEAD));
> @@ -584,6 +611,7 @@ fiber_schedule_list(struct rlist *list)
> }
> last->caller = fiber();
> assert(fiber() == &cord()->sched);
> + clock_set_on_csw(fiber());
> fiber_call_impl(first);
> }
>
> @@ -980,6 +1008,8 @@ fiber_new_ex(const char *name, const struct fiber_attr *fiber_attr,
> }
>
> fiber->f = f;
> + fiber->clock_acc = 0;
> + fiber->clock_delta = 0;
> /* Excluding reserved range */
> if (++cord->max_fid < FIBER_ID_MAX_RESERVED)
> cord->max_fid = FIBER_ID_MAX_RESERVED + 1;
> @@ -1044,6 +1074,103 @@ fiber_destroy_all(struct cord *cord)
> struct fiber, link));
> }
>
> +static void
> +loop_on_iteration_start(ev_loop *loop, ev_check *watcher, int revents)
> +{
> + (void) loop;
> + (void) watcher;
> + (void) revents;
> +
> + cord()->clock_last = __rdtscp(&cord()->cpu_id_last);
> + cord()->cpu_miss_count = 0;
> +}
> +
> +
> +/**
> + * Calculate the exponential moving average for the clock deltas
> + * per loop iteration. The coefficient is 1/16.
> + */
> +static inline uint64_t
> +clock_diff_accumulate(uint64_t acc, uint64_t delta)
> +{
> + if (acc > 0) {
> + return delta / 16 + 15 * acc / 16;
> + } else {
> + return delta;
> + }
> +}
> +
> +static void
> +loop_on_iteration_end(ev_loop *loop, ev_prepare *watcher, int revents)
> +{
> + (void) loop;
> + (void) watcher;
> + (void) revents;
> + struct fiber *fiber;
> + assert(fiber() == &cord()->sched);
> +
> + /*
> + * Record the scheduler's latest clock change, even though
> + * it's not a context switch, but an event loop iteration
> + * end.
> + */
> + clock_set_on_csw(&cord()->sched);
> +
> + cord()->cpu_miss_count_last = cord()->cpu_miss_count;
> + cord()->cpu_miss_count = 0;
> +
> + cord()->clock_acc = clock_diff_accumulate(cord()->clock_acc, cord()->clock_delta);
> + cord()->clock_delta_last = cord()->clock_delta;
> + cord()->clock_delta = 0;
> +
> + cord()->sched.clock_acc = clock_diff_accumulate(cord()->sched.clock_acc, cord()->sched.clock_delta);
> + cord()->sched.clock_delta_last = cord()->sched.clock_delta;
> + cord()->sched.clock_delta = 0;
> +
> + rlist_foreach_entry(fiber, &cord()->alive, link) {
> + fiber->clock_acc = clock_diff_accumulate(fiber->clock_acc, fiber->clock_delta);
> + fiber->clock_delta_last = fiber->clock_delta;
> + fiber->clock_delta = 0;
> + }
> +}
> +
> +static inline void
> +fiber_top_init()
> +{
> + ev_prepare_init(&cord()->prepare_event, loop_on_iteration_end);
> + ev_check_init(&cord()->check_event, loop_on_iteration_start);
> +}
> +
> +bool
> +fiber_top_is_enabled()
> +{
> + return fiber_top_enabled;
> +}
> +
> +inline void
> +fiber_top_enable()
> +{
> + if (!fiber_top_enabled) {
> + ev_prepare_start(cord()->loop, &cord()->prepare_event);
> + ev_check_start(cord()->loop, &cord()->check_event);
> + fiber_top_enabled = true;
> +
> + cord()->clock_acc = 0;
> + cord()->cpu_miss_count_last = 0;
> + cord()->clock_delta_last = 0;
> + }
> +}
> +
> +inline void
> +fiber_top_disable()
> +{
> + if (fiber_top_enabled) {
> + ev_prepare_stop(cord()->loop, &cord()->prepare_event);
> + ev_check_stop(cord()->loop, &cord()->check_event);
> + fiber_top_enabled = false;
> + }
> +}
> +
> void
> cord_create(struct cord *cord, const char *name)
> {
> @@ -1077,6 +1204,13 @@ cord_create(struct cord *cord, const char *name)
> ev_async_init(&cord->wakeup_event, fiber_schedule_wakeup);
>
> ev_idle_init(&cord->idle_event, fiber_schedule_idle);
> +
> + /* fiber.top() currently works only for the main thread. */
> + if (cord_is_main()) {
> + fiber_top_init();
> + fiber_top_enable();
> + }
> +
> cord_set_name(name);
>
> #if ENABLE_ASAN
> diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
> index fb168e25e..e56a2aa35 100644
> --- a/src/lib/core/fiber.h
> +++ b/src/lib/core/fiber.h
> @@ -386,6 +386,16 @@ struct fiber {
> uint32_t fid;
> /** Fiber flags */
> uint32_t flags;
> + /**
> + * Accumulated clock value calculated using exponential
> + * moving average.
> + */
> + uint64_t clock_acc;
> + /**
> + * Clock delta calculated on previous event loop iteration.
> + */
> + uint64_t clock_delta_last;
> + uint64_t clock_delta;
> /** Link in cord->alive or cord->dead list. */
> struct rlist link;
> /** Link in cord->ready list. */
> @@ -457,6 +467,13 @@ struct cord {
> * reserved.
> */
> uint32_t max_fid;
> + uint64_t clock_acc;
> + uint64_t clock_delta;
> + uint64_t clock_delta_last;
> + uint64_t clock_last;
> + uint32_t cpu_id_last;
> + uint32_t cpu_miss_count;
> + uint32_t cpu_miss_count_last;
> pthread_t id;
> const struct cord_on_exit *on_exit;
> /** A helper hash to map id -> fiber. */
> @@ -482,6 +499,14 @@ struct cord {
> * is no 1 ms delay in case of zero sleep timeout.
> */
> ev_idle idle_event;
> + /** An event triggered on every event loop iteration start. */
> + ev_check check_event;
> + /**
> + * An event triggered on every event loop iteration end.
> + * Just like the event above it is used in per-fiber cpu
> + * time calculations.
> + */
> + ev_prepare prepare_event;
> /** A memory cache for (struct fiber) */
> struct mempool fiber_mempool;
> /** A runtime slab cache for general use in this cord. */
> @@ -625,6 +650,15 @@ typedef int (*fiber_stat_cb)(struct fiber *f, void *ctx);
> int
> fiber_stat(fiber_stat_cb cb, void *cb_ctx);
>
> +bool
> +fiber_top_is_enabled();
> +
> +void
> +fiber_top_enable();
> +
> +void
> +fiber_top_disable();
> +
> /** Useful for C unit tests */
> static inline int
> fiber_c_invoke(fiber_func f, va_list ap)
> diff --git a/src/lua/fiber.c b/src/lua/fiber.c
> index 336be60a2..d5d9d5573 100644
> --- a/src/lua/fiber.c
> +++ b/src/lua/fiber.c
> @@ -319,6 +319,59 @@ lbox_fiber_statof_nobt(struct fiber *f, void *cb_ctx)
> return lbox_fiber_statof(f, cb_ctx, false);
> }
>
> +static int
> +lbox_fiber_top_entry(struct fiber *f, void *cb_ctx)
> +{
> + struct lua_State *L = (struct lua_State *) cb_ctx;
> +
> + lua_pushinteger(L, f->fid);
> + lua_newtable(L);
> +
> + lua_pushliteral(L, "cpu average (%)");
> + lua_pushnumber(L, f->clock_acc / (double)cord()->clock_acc * 100);
> + lua_settable(L, -3);
> + lua_pushliteral(L, "cpu instant (%)");
> + lua_pushnumber(L, f->clock_delta_last / (double)cord()->clock_delta_last * 100);
> + lua_settable(L, -3);
> + lua_settable(L, -3);
> +
> + return 0;
> +}
> +
> +static int
> +lbox_fiber_top(struct lua_State *L)
> +{
> + if (!fiber_top_is_enabled()) {
> + luaL_error(L, "fiber.top() is disabled. enable it with"
> + " fiber.top_enable() first");
> + }
> + lua_newtable(L);
> + lua_pushliteral(L, "cpu misses");
> + lua_pushnumber(L, cord()->cpu_miss_count_last);
> + lua_settable(L, -3);
> +
> + lbox_fiber_top_entry(&cord()->sched, L);
> + fiber_stat(lbox_fiber_top_entry, L);
> +
> + return 1;
> +}
> +
> +static int
> +lbox_fiber_top_enable(struct lua_State *L)
> +{
> + (void) L;
> + fiber_top_enable();
> + return 0;
> +}
> +
> +static int
> +lbox_fiber_top_disable(struct lua_State *L)
> +{
> + (void) L;
> + fiber_top_disable();
> + return 0;
> +}
> +
> /**
> * Return fiber statistics.
> */
> @@ -741,6 +794,9 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
>
> static const struct luaL_Reg fiberlib[] = {
> {"info", lbox_fiber_info},
> + {"top", lbox_fiber_top},
> + {"top_enable", lbox_fiber_top_enable},
> + {"top_disable", lbox_fiber_top_disable},
> {"sleep", lbox_fiber_sleep},
> {"yield", lbox_fiber_yield},
> {"self", lbox_fiber_self},
> diff --git a/test/app/fiber.result b/test/app/fiber.result
> index 94e690f6c..6974cebc8 100644
> --- a/test/app/fiber.result
> +++ b/test/app/fiber.result
> @@ -1462,6 +1462,84 @@ fiber.join(fiber.self())
> ---
> - error: the fiber is not joinable
> ...
> +sum = 0
> +---
> +...
> +-- gh-2694 fiber.top()
> +a = fiber.top()
> +---
> +...
> +-- scheduler is present in fiber.top()
> +a[1] ~= nil
> +---
> +- true
> +...
> +type(a["cpu misses"]) == 'number'
> +---
> +- true
> +...
> +sum_inst = 0
> +---
> +...
> +sum_avg = 0
> +---
> +...
> +test_run:cmd('setopt delimiter ";"')
> +---
> +- true
> +...
> +for k, v in pairs(a) do
> + if type(v) == 'table' then
> + sum_inst = sum_inst + v["cpu instant (%)"]
> + sum_avg = sum_avg + v["cpu average (%)"]
> + end
> +end;
> +---
> +...
> +test_run:cmd('setopt delimiter ""');
> +---
> +- true
> +...
> +sum_inst
> +---
> +- 100
> +...
> +-- not exact due to accumulated integer division errors
> +sum_avg > 99 and sum_avg < 101 or sum_avg
> +---
> +- true
> +...
> +tbl = nil
> +---
> +...
> +f = fiber.new(function() for i = 1,1000 do end fiber.yield() tbl = fiber.top()[fiber.self().id()] end)
> +---
> +...
> +while f:status() ~= 'dead' do fiber.sleep(0.01) end
> +---
> +...
> +tbl["cpu average (%)"] > 0
> +---
> +- true
> +...
> +tbl["cpu instant (%)"] > 0
> +---
> +- true
> +...
> +fiber.top_disable()
> +---
> +...
> +fiber.top()
> +---
> +- error: fiber.top() is disabled. enable it with fiber.top_enable() first
> +...
> +fiber.top_enable()
> +---
> +...
> +type(fiber.top())
> +---
> +- table
> +...
> -- cleanup
> test_run:cmd("clear filter")
> ---
> diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
> index bb8c24990..279c40ce1 100644
> --- a/test/app/fiber.test.lua
> +++ b/test/app/fiber.test.lua
> @@ -629,6 +629,37 @@ while f:status() ~= 'dead' do fiber.sleep(0.01) end
> --
> fiber.join(fiber.self())
>
> +sum = 0
> +
> +-- gh-2694 fiber.top()
> +a = fiber.top()
> +-- scheduler is present in fiber.top()
> +a[1] ~= nil
> +type(a["cpu misses"]) == 'number'
> +sum_inst = 0
> +sum_avg = 0
> +test_run:cmd('setopt delimiter ";"')
> +for k, v in pairs(a) do
> + if type(v) == 'table' then
> + sum_inst = sum_inst + v["cpu instant (%)"]
> + sum_avg = sum_avg + v["cpu average (%)"]
> + end
> +end;
> +test_run:cmd('setopt delimiter ""');
> +sum_inst
> +-- not exact due to accumulated integer division errors
> +sum_avg > 99 and sum_avg < 101 or sum_avg
> +tbl = nil
> +f = fiber.new(function() for i = 1,1000 do end fiber.yield() tbl = fiber.top()[fiber.self().id()] end)
> +while f:status() ~= 'dead' do fiber.sleep(0.01) end
> +tbl["cpu average (%)"] > 0
> +tbl["cpu instant (%)"] > 0
> +
> +fiber.top_disable()
> +fiber.top()
> +fiber.top_enable()
> +type(fiber.top())
> +
> -- cleanup
> test_run:cmd("clear filter")
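For reference, the percentages pushed in lbox_fiber_top_entry() are the
ratio of a fiber's counter to the cord's total. A sketch of that
computation as a standalone helper follows; clock_share_percent() is a
hypothetical name, and the zero-total guard is an assumption added here,
not something the patch does.

```c
#include <assert.h>
#include <stdint.h>

/* Share of `part` in `total`, in per cent. */
static double
clock_share_percent(uint64_t part, uint64_t total)
{
	/* Assumption: report 0 rather than divide by a zero total. */
	if (total == 0)
		return 0.0;
	return part / (double)total * 100;
}
```

A zero total may be possible right after fiber_top_enable() resets the
cord counters, before the first loop iteration completes.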