[Tarantool-patches] [PATCH 2/2] fiber: destroy fiber.storage created by iproto
Igor Munkin
imun at tarantool.org
Thu Dec 12 03:00:47 MSK 2019
Vlad,
Thanks for the patch!
Sorry, I needed some time to get familiar with all this machinery, so the
review was postponed a bit, and meanwhile you've managed to prepare a new
patch addressing Kostja's remarks. Therefore, I decided to move all my
notes to the new patch instead of splitting them between two replies.
This patch LGTM in general, but please consider several minor comments I
left below.
On 11.12.19, Vladislav Shpilevoy wrote:
<snipped>
> =========================================================================
>
> The whole new patch:
>
> =========================================================================
>
> fiber: destroy fiber.storage created by iproto
>
> Fiber.storage was not deleted when created in a fiber started from
> the thread pool used by IProto requests. The problem was that
> fiber.storage was created and deleted in Lua land only, assuming
> that only Lua-born fibers could have it. But in fact any fiber can
> create a Lua storage. Including the ones used to serve IProto
> requests.
>
> Not deletion of the storage led to a possibility of meeting a
> non-empty fiber.storage in the beginning of an iproto request, and
> not deletion of the memory caught by the storage until its
> explicit nullification.
>
> Now the storage destructor works for any fiber, which managed to
> create the storage. The destructor unrefs and nullifies the
> storage.
>
> For destructor purposes the fiber.on_stop triggers were reworked.
> Now they are on_cleanup triggers, and can be called multiple times
> during fiber's lifetime. After every request done by that fiber.
>
> Closes #4662
> Closes #3462
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index c39b8e7bf..600a21ac8 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -1306,6 +1306,12 @@ static void
> tx_fiber_init(struct session *session, uint64_t sync)
> {
> struct fiber *f = fiber();
> + /*
> + * There should not be any not executed
> + * destructors from a previous request
> + * executed in that fiber.
> + */
> + assert(rlist_empty(&f->on_cleanup));
> f->storage.net.sync = sync;
> /*
> * We do not cleanup fiber keys at the end of each request.
> @@ -1321,6 +1327,17 @@ tx_fiber_init(struct session *session, uint64_t sync)
> fiber_set_user(f, &session->credentials);
> }
>
> +/**
> + * Cleanup current fiber after a request is
> + * executed to make it possible to reuse the fiber
> + * for a next request.
> + */
> +static inline void
> +tx_fiber_cleanup()
> +{
> + fiber_cleanup(fiber());
> +}
> +
> static void
> tx_process_disconnect(struct cmsg *m)
> {
> @@ -1331,6 +1348,7 @@ tx_process_disconnect(struct cmsg *m)
> if (! rlist_empty(&session_on_disconnect)) {
> tx_fiber_init(con->session, 0);
> session_run_on_disconnect_triggers(con->session);
> + tx_fiber_cleanup();
> }
> }
> }
> @@ -1486,6 +1504,7 @@ tx_reply_iproto_error(struct cmsg *m)
> iproto_reply_error(out, diag_last_error(&msg->diag),
> msg->header.sync, ::schema_version);
> iproto_wpos_create(&msg->wpos, out);
> + tx_fiber_cleanup();
> }
>
> /** Inject a short delay on tx request processing for testing. */
> @@ -1519,9 +1538,11 @@ tx_process1(struct cmsg *m)
> iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
> tuple != 0);
> iproto_wpos_create(&msg->wpos, out);
> - return;
> + goto end;
> error:
> tx_reply_error(msg);
> +end:
> + tx_fiber_cleanup();
> }
>
> static void
> @@ -1562,9 +1583,11 @@ tx_process_select(struct cmsg *m)
> iproto_reply_select(out, &svp, msg->header.sync,
> ::schema_version, count);
> iproto_wpos_create(&msg->wpos, out);
> - return;
> + goto end;
> error:
> tx_reply_error(msg);
> +end:
> + tx_fiber_cleanup();
> }
>
> static int
> @@ -1652,9 +1675,11 @@ tx_process_call(struct cmsg *m)
> iproto_reply_select(out, &svp, msg->header.sync,
> ::schema_version, count);
> iproto_wpos_create(&msg->wpos, out);
> - return;
> + goto end;
> error:
> tx_reply_error(msg);
> +end:
> + tx_fiber_cleanup();
> }
>
> static void
> @@ -1695,9 +1720,11 @@ tx_process_misc(struct cmsg *m)
> } catch (Exception *e) {
> tx_reply_error(msg);
> }
> - return;
> + goto end;
> error:
> tx_reply_error(msg);
> +end:
> + tx_fiber_cleanup();
> }
>
> static void
> @@ -1711,8 +1738,6 @@ tx_process_sql(struct cmsg *m)
> const char *sql;
> uint32_t len;
>
> - tx_fiber_init(msg->connection->session, msg->header.sync);
> -
> if (tx_check_schema(msg->header.schema_version))
> goto error;
> assert(msg->header.type == IPROTO_EXECUTE);
> @@ -1746,9 +1771,11 @@ tx_process_sql(struct cmsg *m)
> port_destroy(&port);
> iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
> iproto_wpos_create(&msg->wpos, out);
> - return;
> + goto end;
> error:
> tx_reply_error(msg);
> +end:
> + tx_fiber_cleanup();
> }
>
> static void
> @@ -1781,11 +1808,12 @@ tx_process_join_subscribe(struct cmsg *m)
> unreachable();
> }
> } catch (SocketError *e) {
> - return; /* don't write error response to prevent SIGPIPE */
> + /* don't write error response to prevent SIGPIPE */
> } catch (Exception *e) {
> iproto_write_error(con->input.fd, e, ::schema_version,
> msg->header.sync);
> }
> + tx_fiber_cleanup();
> }
>
> static void
> @@ -1891,6 +1919,7 @@ tx_process_connect(struct cmsg *m)
> tx_reply_error(msg);
> msg->close_connection = true;
> }
> + tx_fiber_cleanup();
> }
>
> /**
> diff --git a/src/box/session.cc b/src/box/session.cc
> index 461d1cf25..09e988148 100644
> --- a/src/box/session.cc
> +++ b/src/box/session.cc
> @@ -80,8 +80,12 @@ sid_max()
> return sid_max;
> }
>
> +/**
> + * Destroy session of a background fiber when the
> + * fiber is getting destroyed.
> + */
> static int
> -session_on_stop(struct trigger *trigger, void * /* event */)
> +session_on_fiber_cleanup(struct trigger *trigger, void * /* event */)
> {
> /*
> * Remove on_stop trigger from the fiber, otherwise the
> @@ -167,11 +171,10 @@ session_create_on_demand()
> struct session *s = session_create(SESSION_TYPE_BACKGROUND);
> if (s == NULL)
> return NULL;
> - s->fiber_on_stop = {
> - RLIST_LINK_INITIALIZER, session_on_stop, NULL, NULL
> - };
> - /* Add a trigger to destroy session on fiber stop */
> - trigger_add(&fiber()->on_stop, &s->fiber_on_stop);
> + trigger_create(&s->fiber_on_cleanup, session_on_fiber_cleanup,
> + NULL, NULL);
> + /* Add a trigger to destroy session on fiber cleanup. */
> + trigger_add(&fiber()->on_cleanup, &s->fiber_on_cleanup);
> credentials_reset(&s->credentials, admin_user);
> fiber_set_session(fiber(), s);
> fiber_set_user(fiber(), &s->credentials);
> diff --git a/src/box/session.h b/src/box/session.h
> index eff3d7a67..710e4919b 100644
> --- a/src/box/session.h
> +++ b/src/box/session.h
> @@ -103,8 +103,11 @@ struct session {
> union session_meta meta;
> /** Session user id and global grants */
> struct credentials credentials;
> - /** Trigger for fiber on_stop to cleanup created on-demand session */
> - struct trigger fiber_on_stop;
> + /**
> + * Trigger for fiber on_cleanup to cleanup created
> + * on-demand session.
> + */
> + struct trigger fiber_on_cleanup;
> };
>
> struct session_vtab {
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 963ec8eeb..7d549a6a9 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -41,7 +41,7 @@ double too_long_threshold;
> static struct stailq txn_cache = {NULL, &txn_cache.first};
>
> static int
> -txn_on_stop(struct trigger *trigger, void *event);
> +txn_on_fiber_cleanup(struct trigger *trigger, void *event);
>
> static int
> txn_on_yield(struct trigger *trigger, void *event);
> @@ -226,8 +226,9 @@ txn_begin()
> txn->fiber = NULL;
> fiber_set_txn(fiber(), txn);
> /* fiber_on_yield is initialized by engine on demand */
> - trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> - trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
> + trigger_create(&txn->fiber_on_cleanup, txn_on_fiber_cleanup,
> + NULL, NULL);
> + trigger_add(&fiber()->on_cleanup, &txn->fiber_on_cleanup);
> /*
> * By default all transactions may yield.
> * It's a responsibility of an engine to disable yields
> @@ -550,7 +551,7 @@ txn_prepare(struct txn *txn)
> if (engine_prepare(txn->engine, txn) != 0)
> return -1;
> }
> - trigger_clear(&txn->fiber_on_stop);
> + trigger_clear(&txn->fiber_on_cleanup);
> if (!txn_has_flag(txn, TXN_CAN_YIELD))
> trigger_clear(&txn->fiber_on_yield);
> return 0;
> @@ -618,7 +619,7 @@ void
> txn_rollback(struct txn *txn)
> {
> assert(txn == in_txn());
> - trigger_clear(&txn->fiber_on_stop);
> + trigger_clear(&txn->fiber_on_cleanup);
> if (!txn_has_flag(txn, TXN_CAN_YIELD))
> trigger_clear(&txn->fiber_on_yield);
> txn->signature = -1;
> @@ -840,7 +841,7 @@ txn_savepoint_release(struct txn_savepoint *svp)
> }
>
> static int
> -txn_on_stop(struct trigger *trigger, void *event)
> +txn_on_fiber_cleanup(struct trigger *trigger, void *event)
> {
> (void) trigger;
> (void) event;
> diff --git a/src/box/txn.h b/src/box/txn.h
> index da12feebf..755c464f0 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -207,10 +207,11 @@ struct txn {
> */
> struct trigger fiber_on_yield;
> /**
> - * Trigger on fiber stop, to rollback transaction
> - * in case a fiber stops (all engines).
> + * Trigger on fiber cleanup, to rollback transaction
> + * in case a fiber is getting reset/recycled/destroyed
> + * (all engines).
> */
> - struct trigger fiber_on_stop;
> + struct trigger fiber_on_cleanup;
> /** Commit and rollback triggers. */
> struct rlist on_commit, on_rollback;
> /**
> diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c
> index 00ae8cded..d89c640aa 100644
> --- a/src/lib/core/fiber.c
> +++ b/src/lib/core/fiber.c
> @@ -325,6 +325,20 @@ fiber_attr_getstacksize(struct fiber_attr *fiber_attr)
> fiber_attr_default.stack_size;
> }
>
> +void
> +fiber_cleanup(struct fiber *f)
> +{
> + if (trigger_run(&f->on_cleanup, f) != 0)
> + panic("Fiber cleanup can't fail");
> + /*
> + * All cleanup triggers are supposed to
> + * remove themselves. So as no to waste
> + * time on that here, and to make them all
> + * work uniformly.
> + */
> + assert(rlist_empty(&f->on_cleanup));
> +}
> +
> static void
> fiber_recycle(struct fiber *fiber);
>
> @@ -787,7 +801,7 @@ static void
> fiber_reset(struct fiber *fiber)
> {
> rlist_create(&fiber->on_yield);
> - rlist_create(&fiber->on_stop);
> + rlist_create(&fiber->on_cleanup);
> fiber->flags = FIBER_DEFAULT_FLAGS;
> #if ENABLE_FIBER_TOP
> clock_stat_reset(&fiber->clock_stat);
> @@ -856,8 +870,7 @@ fiber_loop(MAYBE_UNUSED void *data)
> assert(f != fiber);
> fiber_wakeup(f);
> }
> - if (! rlist_empty(&fiber->on_stop))
> - trigger_run(&fiber->on_stop, fiber);
> + fiber_cleanup(fiber);
I see you dropped the `if` above, and fiber_cleanup() doesn't have a
corresponding check inside. Is this removal intentional?
Minor: there is a mix of tabs and spaces in the whitespace above. Feel
free to ignore this remark, since the indentation was broken before your
changes. I ran `git blame` on several lines, and it looks more like an
odd code style than an editor problem. So you just implicitly continue
the mix.
> /* reset pending wakeups */
> rlist_del(&fiber->state);
> if (! (fiber->flags & FIBER_IS_JOINABLE))
> @@ -1161,7 +1174,7 @@ fiber_destroy(struct cord *cord, struct fiber *f)
> assert(f != &cord->sched);
>
> trigger_destroy(&f->on_yield);
> - trigger_destroy(&f->on_stop);
> + trigger_destroy(&f->on_cleanup);
> rlist_del(&f->state);
> rlist_del(&f->link);
> region_destroy(&f->gc);
> @@ -1525,8 +1538,8 @@ cord_cojoin(struct cord *cord)
> int
> break_ev_loop_f(struct trigger *trigger, void *event)
> {
> - (void) trigger;
> (void) event;
> + trigger_clear(trigger);
> ev_break(loop(), EVBREAK_ALL);
> return 0;
> }
> @@ -1555,7 +1568,7 @@ cord_costart_thread_func(void *arg)
> * Got to be in a trigger, to break the loop even
> * in case of an exception.
> */
> - trigger_add(&f->on_stop, &break_ev_loop);
> + trigger_add(&f->on_cleanup, &break_ev_loop);
> fiber_set_joinable(f, true);
> fiber_start(f, ctx.arg);
> if (!fiber_is_dead(f)) {
> diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
> index c5b975513..21fff8f88 100644
> --- a/src/lib/core/fiber.h
> +++ b/src/lib/core/fiber.h
> @@ -458,8 +458,17 @@ struct fiber {
>
> /** Triggers invoked before this fiber yields. Must not throw. */
> struct rlist on_yield;
> - /** Triggers invoked before this fiber stops. Must not throw. */
> - struct rlist on_stop;
> + /**
> + * Triggers invoked before this fiber is
> + * stopped/reset/recycled/destroyed/reused. In other
> + * words, each time when the fiber has finished execution
> + * of a request.
> + * In particular, for fibers not from fiber pool the
> + * cleanup is done before destroy and death.
> + * Pooled fibers are cleaned up after each request, even
> + * if they are never destroyed.
> + */
> + struct rlist on_cleanup;
Minor: both trigger lists above have the "Must not throw" note. Does the
introduced list respect this rule? If so, please adjust the comment.
> /**
> * The list of fibers awaiting for this fiber's timely
> * (or untimely) death.
> @@ -506,6 +515,10 @@ struct fiber {
> char name[FIBER_NAME_MAX + 1];
> };
>
> +/** Invoke cleanup triggers and delete them. */
> +void
> +fiber_cleanup(struct fiber *f);
> +
> struct cord_on_exit;
>
> /**
> diff --git a/src/lua/fiber.c b/src/lua/fiber.c
> index a7b75f9bf..4d2828f1c 100644
> --- a/src/lua/fiber.c
> +++ b/src/lua/fiber.c
> @@ -441,11 +441,6 @@ lua_fiber_run_f(MAYBE_UNUSED va_list ap)
> int coro_ref = lua_tointeger(L, -1);
> lua_pop(L, 1);
> result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
> -
> - /* Destroy local storage */
> - int storage_ref = f->storage.lua.ref;
> - if (storage_ref > 0)
> - luaL_unref(tarantool_L, LUA_REGISTRYINDEX, storage_ref);
> /*
> * If fiber is not joinable
> * We can unref child stack here,
> @@ -606,12 +601,43 @@ lbox_fiber_name(struct lua_State *L)
> }
> }
>
> +/**
> + * Trigger invoked when the fiber has finished execution of its
> + * current request. Only purpose - delete storage.lua.ref keeping
> + * a reference of Lua fiber.storage object. Unlike Lua stack,
> + * Lua fiber storage may be created not only for fibers born from
> + * Lua land. For example, an IProto request may execute a Lua
> + * function, which can create the storage. Trigger guarantees,
> + * that even for non-Lua fibers the Lua storage is destroyed.
> + */
> +static int
> +lbox_fiber_on_cleanup(struct trigger *trigger, void *event)
> +{
> + struct fiber *f = (struct fiber *) event;
> + int storage_ref = f->storage.lua.ref;
> + assert(storage_ref > 0);
> + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, storage_ref);
> + f->storage.lua.ref = 0;
Zero is a special "ref" value: slot 0 of the table is where the free
list of references is stored, so 0 can never be returned by luaL_ref.
Please consider using LUA_NOREF for such cases instead. It is provided by
Lua, and luaL_unref does nothing for it (as well as for LUA_REFNIL).
> + trigger_clear(trigger);
> + free(trigger);
> + return 0;
> +}
> +
> static int
> lbox_fiber_storage(struct lua_State *L)
> {
> struct fiber *f = lbox_checkfiber(L, 1);
> int storage_ref = f->storage.lua.ref;
> if (storage_ref <= 0) {
> + struct trigger *t = (struct trigger *)
> + malloc(sizeof(*t));
> + if (t == NULL) {
> + diag_set(OutOfMemory, sizeof(*t), "malloc", "t");
> + return luaT_error(L);
> + }
> + trigger_create(t, lbox_fiber_on_cleanup, NULL,
> + (trigger_f0) free);
> + trigger_add(&f->on_cleanup, t);
> lua_newtable(L); /* create local storage on demand */
> storage_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> f->storage.lua.ref = storage_ref;
> diff --git a/test/app/gh-4662-fiber-storage-leak.result b/test/app/gh-4662-fiber-storage-leak.result
> new file mode 100644
> index 000000000..4ade017a4
> --- /dev/null
> +++ b/test/app/gh-4662-fiber-storage-leak.result
> @@ -0,0 +1,88 @@
> +-- test-run result file version 2
> +fiber = require('fiber')
> + | ---
> + | ...
> +netbox = require('net.box')
> + | ---
> + | ...
> +
> +--
> +-- gh-4662: fiber.storage was not deleted when created in a fiber
> +-- started from the thread pool used by IProto requests. The
> +-- problem was that fiber.storage was created and deleted in Lua
> +-- land only, assuming that only Lua-born fibers could have it.
> +-- But in fact any fiber can create a Lua storage. Including the
> +-- ones used to serve IProto requests.
> +-- The test checks if fiber.storage is really deleted, and is not
> +-- shared between requests.
> +--
> +
> +box.schema.user.grant('guest', 'execute', 'universe')
> + | ---
> + | ...
> +storage = nil
> + | ---
> + | ...
> +i = 0
> + | ---
> + | ...
> +weak_table = setmetatable({}, {__mode = 'v'})
> + | ---
> + | ...
> +object = {'object'}
> + | ---
> + | ...
> +weak_table.object = object
> + | ---
> + | ...
> +function ref_object_in_fiber() \
> + storage = fiber.self().storage \
> + assert(next(storage) == nil) \
> + i = i + 1 \
> + fiber.self().storage.key = i \
> + fiber.self().storage.object = object \
> +end
> + | ---
> + | ...
> +
> +c = netbox.connect(box.cfg.listen)
> + | ---
> + | ...
> +c:call('ref_object_in_fiber') c:call('ref_object_in_fiber')
> + | ---
> + | ...
> +storage
> + | ---
> + | - key: 2
> + | object:
> + | - object
> + | ...
> +i
> + | ---
> + | - 2
> + | ...
> +object = nil
> + | ---
> + | ...
> +storage = nil
> + | ---
> + | ...
> +collectgarbage('collect')
> + | ---
> + | - 0
> + | ...
> +-- The weak table should be empty, because the only two hard
> +-- references were in the fibers used to serve
> +-- ref_object_in_fiber() requests. And their storages should be
> +-- cleaned up.
> +weak_table
> + | ---
> + | - []
> + | ...
> +
> +c:close()
> + | ---
> + | ...
> +box.schema.user.revoke('guest', 'execute', 'universe')
> + | ---
> + | ...
> diff --git a/test/app/gh-4662-fiber-storage-leak.test.lua b/test/app/gh-4662-fiber-storage-leak.test.lua
> new file mode 100644
> index 000000000..25acf5679
> --- /dev/null
> +++ b/test/app/gh-4662-fiber-storage-leak.test.lua
> @@ -0,0 +1,43 @@
> +fiber = require('fiber')
> +netbox = require('net.box')
> +
> +--
> +-- gh-4662: fiber.storage was not deleted when created in a fiber
> +-- started from the thread pool used by IProto requests. The
> +-- problem was that fiber.storage was created and deleted in Lua
> +-- land only, assuming that only Lua-born fibers could have it.
> +-- But in fact any fiber can create a Lua storage. Including the
> +-- ones used to serve IProto requests.
> +-- The test checks if fiber.storage is really deleted, and is not
> +-- shared between requests.
> +--
> +
> +box.schema.user.grant('guest', 'execute', 'universe')
> +storage = nil
> +i = 0
> +weak_table = setmetatable({}, {__mode = 'v'})
> +object = {'object'}
> +weak_table.object = object
> +function ref_object_in_fiber() \
> + storage = fiber.self().storage \
> + assert(next(storage) == nil) \
> + i = i + 1 \
> + fiber.self().storage.key = i \
> + fiber.self().storage.object = object \
> +end
> +
> +c = netbox.connect(box.cfg.listen)
> +c:call('ref_object_in_fiber') c:call('ref_object_in_fiber')
> +storage
> +i
> +object = nil
> +storage = nil
> +collectgarbage('collect')
> +-- The weak table should be empty, because the only two hard
> +-- references were in the fibers used to serve
> +-- ref_object_in_fiber() requests. And their storages should be
> +-- cleaned up.
> +weak_table
> +
> +c:close()
> +box.schema.user.revoke('guest', 'execute', 'universe')
>
> =========================================================================
--
Best regards,
IM
More information about the Tarantool-patches
mailing list