Date: Thu, 12 Dec 2019 03:00:47 +0300
From: Igor Munkin
Message-ID: <20191212000047.GL1214@tarantool.org>
References: <20191210083258.GD21413@atlas> <2c8fe897-9a9d-849d-463e-5fadff982b8c@tarantool.org> <20191211070830.GA5953@atlas>
Subject: Re: [Tarantool-patches] [PATCH 2/2] fiber: destroy fiber.storage created by iproto
To: Vladislav Shpilevoy
Cc: tarantool-patches@dev.tarantool.org

Vlad,

Thanks for the patch! Sorry for the delay: I needed some time to get
familiar with all this machinery, and in the meantime you have prepared
a new patch addressing Kostja's remarks. So I decided to move all my
notes to the new patch instead of splitting them across two replies.

The patch LGTM in general, but please consider the several minor
comments I left below.

On 11.12.19, Vladislav Shpilevoy wrote:

> =========================================================================
>
> The whole new patch:
>
> =========================================================================
>
> fiber: destroy fiber.storage created by iproto
>
> Fiber.storage was not deleted when created in a fiber started from
> the thread pool used by IProto requests. The problem was that
> fiber.storage was created and deleted in Lua land only, assuming
> that only Lua-born fibers could have it. But in fact any fiber can
> create a Lua storage. Including the ones used to serve IProto
> requests.
>
> Not deletion of the storage led to a possibility of meeting a
> non-empty fiber.storage in the beginning of an iproto request, and
> not deletion of the memory caught by the storage until its
> explicit nullification.
>
> Now the storage destructor works for any fiber, which managed to
> create the storage. The destructor unrefs and nullifies the
> storage.
>
> For destructor purposes the fiber.on_stop triggers were reworked.
> Now they are on_cleanup triggers, and can be called multiple times
> during fiber's lifetime. After every request done by that fiber.
>
> Closes #4662
> Closes #3462
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index c39b8e7bf..600a21ac8 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -1306,6 +1306,12 @@ static void
>  tx_fiber_init(struct session *session, uint64_t sync)
>  {
>  	struct fiber *f = fiber();
> +	/*
> +	 * There should not be any not executed
> +	 * destructors from a previous request
> +	 * executed in that fiber.
> +	 */
> +	assert(rlist_empty(&f->on_cleanup));
>  	f->storage.net.sync = sync;
>  	/*
>  	 * We do not cleanup fiber keys at the end of each request.
> @@ -1321,6 +1327,17 @@ tx_fiber_init(struct session *session, uint64_t sync)
>  	fiber_set_user(f, &session->credentials);
>  }
>
> +/**
> + * Cleanup current fiber after a request is
> + * executed to make it possible to reuse the fiber
> + * for a next request.
> + */
> +static inline void
> +tx_fiber_cleanup()
> +{
> +	fiber_cleanup(fiber());
> +}
> +
>  static void
>  tx_process_disconnect(struct cmsg *m)
>  {
> @@ -1331,6 +1348,7 @@ tx_process_disconnect(struct cmsg *m)
>  		if (! rlist_empty(&session_on_disconnect)) {
>  			tx_fiber_init(con->session, 0);
>  			session_run_on_disconnect_triggers(con->session);
> +			tx_fiber_cleanup();
>  		}
>  	}
>  }
> @@ -1486,6 +1504,7 @@ tx_reply_iproto_error(struct cmsg *m)
>  	iproto_reply_error(out, diag_last_error(&msg->diag),
>  			   msg->header.sync, ::schema_version);
>  	iproto_wpos_create(&msg->wpos, out);
> +	tx_fiber_cleanup();
>  }
>
>  /** Inject a short delay on tx request processing for testing. */
> @@ -1519,9 +1538,11 @@ tx_process1(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
>  			    tuple != 0);
>  	iproto_wpos_create(&msg->wpos, out);
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>
>  static void
> @@ -1562,9 +1583,11 @@ tx_process_select(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync,
>  			    ::schema_version, count);
>  	iproto_wpos_create(&msg->wpos, out);
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>
>  static int
> @@ -1652,9 +1675,11 @@ tx_process_call(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync,
>  			    ::schema_version, count);
>  	iproto_wpos_create(&msg->wpos, out);
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>
>  static void
> @@ -1695,9 +1720,11 @@ tx_process_misc(struct cmsg *m)
>  	} catch (Exception *e) {
>  		tx_reply_error(msg);
>  	}
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>
>  static void
> @@ -1711,8 +1738,6 @@ tx_process_sql(struct cmsg *m)
>  	const char *sql;
>  	uint32_t len;
>
> -	tx_fiber_init(msg->connection->session, msg->header.sync);
> -
>  	if (tx_check_schema(msg->header.schema_version))
>  		goto error;
>  	assert(msg->header.type == IPROTO_EXECUTE);
> @@ -1746,9 +1771,11 @@ tx_process_sql(struct cmsg *m)
>  	port_destroy(&port);
>  	iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
>  	iproto_wpos_create(&msg->wpos, out);
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>
>  static void
> @@ -1781,11 +1808,12 @@ tx_process_join_subscribe(struct cmsg *m)
>  			unreachable();
>  		}
>  	} catch (SocketError *e) {
> -		return; /* don't write error response to prevent SIGPIPE */
> +		/* don't write error response to prevent SIGPIPE */
>  	} catch (Exception *e) {
>  		iproto_write_error(con->input.fd, e, ::schema_version,
>  				   msg->header.sync);
>  	}
> +	tx_fiber_cleanup();
>  }
>
>  static void
> @@ -1891,6 +1919,7 @@ tx_process_connect(struct cmsg *m)
>  		tx_reply_error(msg);
>  		msg->close_connection = true;
>  	}
> +	tx_fiber_cleanup();
>  }
>
>  /**
> diff --git a/src/box/session.cc b/src/box/session.cc
> index 461d1cf25..09e988148 100644
> --- a/src/box/session.cc
> +++ b/src/box/session.cc
> @@ -80,8 +80,12 @@ sid_max()
>  	return sid_max;
>  }
>
> +/**
> + * Destroy session of a background fiber when the
> + * fiber is getting destroyed.
> + */
>  static int
> -session_on_stop(struct trigger *trigger, void * /* event */)
> +session_on_fiber_cleanup(struct trigger *trigger, void * /* event */)
>  {
>  	/*
>  	 * Remove on_stop trigger from the fiber, otherwise the
> @@ -167,11 +171,10 @@ session_create_on_demand()
>  	struct session *s = session_create(SESSION_TYPE_BACKGROUND);
>  	if (s == NULL)
>  		return NULL;
> -	s->fiber_on_stop = {
> -		RLIST_LINK_INITIALIZER, session_on_stop, NULL, NULL
> -	};
> -	/* Add a trigger to destroy session on fiber stop */
> -	trigger_add(&fiber()->on_stop, &s->fiber_on_stop);
> +	trigger_create(&s->fiber_on_cleanup, session_on_fiber_cleanup,
> +		       NULL, NULL);
> +	/* Add a trigger to destroy session on fiber cleanup. */
> +	trigger_add(&fiber()->on_cleanup, &s->fiber_on_cleanup);
>  	credentials_reset(&s->credentials, admin_user);
>  	fiber_set_session(fiber(), s);
>  	fiber_set_user(fiber(), &s->credentials);
> diff --git a/src/box/session.h b/src/box/session.h
> index eff3d7a67..710e4919b 100644
> --- a/src/box/session.h
> +++ b/src/box/session.h
> @@ -103,8 +103,11 @@ struct session {
>  	union session_meta meta;
>  	/** Session user id and global grants */
>  	struct credentials credentials;
> -	/** Trigger for fiber on_stop to cleanup created on-demand session */
> -	struct trigger fiber_on_stop;
> +	/**
> +	 * Trigger for fiber on_cleanup to cleanup created
> +	 * on-demand session.
> +	 */
> +	struct trigger fiber_on_cleanup;
>  };
>
>  struct session_vtab {
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 963ec8eeb..7d549a6a9 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -41,7 +41,7 @@ double too_long_threshold;
>  static struct stailq txn_cache = {NULL, &txn_cache.first};
>
>  static int
> -txn_on_stop(struct trigger *trigger, void *event);
> +txn_on_fiber_cleanup(struct trigger *trigger, void *event);
>
>  static int
>  txn_on_yield(struct trigger *trigger, void *event);
> @@ -226,8 +226,9 @@ txn_begin()
>  	txn->fiber = NULL;
>  	fiber_set_txn(fiber(), txn);
>  	/* fiber_on_yield is initialized by engine on demand */
> -	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> -	trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
> +	trigger_create(&txn->fiber_on_cleanup, txn_on_fiber_cleanup,
> +		       NULL, NULL);
> +	trigger_add(&fiber()->on_cleanup, &txn->fiber_on_cleanup);
>  	/*
>  	 * By default all transactions may yield.
>  	 * It's a responsibility of an engine to disable yields
> @@ -550,7 +551,7 @@ txn_prepare(struct txn *txn)
>  		if (engine_prepare(txn->engine, txn) != 0)
>  			return -1;
>  	}
> -	trigger_clear(&txn->fiber_on_stop);
> +	trigger_clear(&txn->fiber_on_cleanup);
>  	if (!txn_has_flag(txn, TXN_CAN_YIELD))
>  		trigger_clear(&txn->fiber_on_yield);
>  	return 0;
> @@ -618,7 +619,7 @@ void
>  txn_rollback(struct txn *txn)
>  {
>  	assert(txn == in_txn());
> -	trigger_clear(&txn->fiber_on_stop);
> +	trigger_clear(&txn->fiber_on_cleanup);
>  	if (!txn_has_flag(txn, TXN_CAN_YIELD))
>  		trigger_clear(&txn->fiber_on_yield);
>  	txn->signature = -1;
> @@ -840,7 +841,7 @@ txn_savepoint_release(struct txn_savepoint *svp)
>  }
>
>  static int
> -txn_on_stop(struct trigger *trigger, void *event)
> +txn_on_fiber_cleanup(struct trigger *trigger, void *event)
>  {
>  	(void) trigger;
>  	(void) event;
> diff --git a/src/box/txn.h b/src/box/txn.h
> index da12feebf..755c464f0 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -207,10 +207,11 @@ struct txn {
>  	 */
>  	struct trigger fiber_on_yield;
>  	/**
> -	 * Trigger on fiber stop, to rollback transaction
> -	 * in case a fiber stops (all engines).
> +	 * Trigger on fiber cleanup, to rollback transaction
> +	 * in case a fiber is getting reset/recycled/destroyed
> +	 * (all engines).
>  	 */
> -	struct trigger fiber_on_stop;
> +	struct trigger fiber_on_cleanup;
>  	/** Commit and rollback triggers. */
>  	struct rlist on_commit, on_rollback;
>  	/**
> diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c
> index 00ae8cded..d89c640aa 100644
> --- a/src/lib/core/fiber.c
> +++ b/src/lib/core/fiber.c
> @@ -325,6 +325,20 @@ fiber_attr_getstacksize(struct fiber_attr *fiber_attr)
>  				    fiber_attr_default.stack_size;
>  }
>
> +void
> +fiber_cleanup(struct fiber *f)
> +{
> +	if (trigger_run(&f->on_cleanup, f) != 0)
> +		panic("Fiber cleanup can't fail");
> +	/*
> +	 * All cleanup triggers are supposed to
> +	 * remove themselves. So as no to waste
> +	 * time on that here, and to make them all
> +	 * work uniformly.
> +	 */
> +	assert(rlist_empty(&f->on_cleanup));
> +}
> +
>  static void
>  fiber_recycle(struct fiber *fiber);
>
> @@ -787,7 +801,7 @@ static void
>  fiber_reset(struct fiber *fiber)
>  {
>  	rlist_create(&fiber->on_yield);
> -	rlist_create(&fiber->on_stop);
> +	rlist_create(&fiber->on_cleanup);
>  	fiber->flags = FIBER_DEFAULT_FLAGS;
>  #if ENABLE_FIBER_TOP
>  	clock_stat_reset(&fiber->clock_stat);
> @@ -856,8 +870,7 @@ fiber_loop(MAYBE_UNUSED void *data)
>  			assert(f != fiber);
>  			fiber_wakeup(f);
>  		}
> -		if (! rlist_empty(&fiber->on_stop))
> -			trigger_run(&fiber->on_stop, fiber);
> +		fiber_cleanup(fiber);

I see you dropped the rlist_empty() check above, and fiber_cleanup() has
no corresponding check inside. Is this removal intentional?

Minor: the whitespace above is a mess. Feel free to ignore this remark,
since the indentation was broken before your changes. I ran blame on
several lines, and it looks more like a weird code style than an editor
problem, so your change just implicitly continues the mix.

>  		/* reset pending wakeups */
>  		rlist_del(&fiber->state);
>  		if (! (fiber->flags & FIBER_IS_JOINABLE))
> @@ -1161,7 +1174,7 @@ fiber_destroy(struct cord *cord, struct fiber *f)
>  	assert(f != &cord->sched);
>
>  	trigger_destroy(&f->on_yield);
> -	trigger_destroy(&f->on_stop);
> +	trigger_destroy(&f->on_cleanup);
>  	rlist_del(&f->state);
>  	rlist_del(&f->link);
>  	region_destroy(&f->gc);
> @@ -1525,8 +1538,8 @@ cord_cojoin(struct cord *cord)
>  int
>  break_ev_loop_f(struct trigger *trigger, void *event)
>  {
> -	(void) trigger;
>  	(void) event;
> +	trigger_clear(trigger);
>  	ev_break(loop(), EVBREAK_ALL);
>  	return 0;
>  }
> @@ -1555,7 +1568,7 @@ cord_costart_thread_func(void *arg)
>  	 * Got to be in a trigger, to break the loop even
>  	 * in case of an exception.
>  	 */
> -	trigger_add(&f->on_stop, &break_ev_loop);
> +	trigger_add(&f->on_cleanup, &break_ev_loop);
>  	fiber_set_joinable(f, true);
>  	fiber_start(f, ctx.arg);
>  	if (!fiber_is_dead(f)) {
> diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
> index c5b975513..21fff8f88 100644
> --- a/src/lib/core/fiber.h
> +++ b/src/lib/core/fiber.h
> @@ -458,8 +458,17 @@ struct fiber {
>
>  	/** Triggers invoked before this fiber yields. Must not throw. */
>  	struct rlist on_yield;
> -	/** Triggers invoked before this fiber stops. Must not throw. */
> -	struct rlist on_stop;
> +	/**
> +	 * Triggers invoked before this fiber is
> +	 * stopped/reset/recycled/destroyed/reused. In other
> +	 * words, each time when the fiber has finished execution
> +	 * of a request.
> +	 * In particular, for fibers not from fiber pool the
> +	 * cleanup is done before destroy and death.
> +	 * Pooled fibers are cleaned up after each request, even
> +	 * if they are never destroyed.
> +	 */
> +	struct rlist on_cleanup;

Minor: both trigger lists above (on_yield and the removed on_stop) carry
the "Must not throw" note. Does the new list respect this rule? If so,
please adjust the comment accordingly.

>  	/**
>  	 * The list of fibers awaiting for this fiber's timely
>  	 * (or untimely) death.
> @@ -506,6 +515,10 @@ struct fiber {
>  	char name[FIBER_NAME_MAX + 1];
>  };
>
> +/** Invoke cleanup triggers and delete them. */
> +void
> +fiber_cleanup(struct fiber *f);
> +
>  struct cord_on_exit;
>
>  /**
> diff --git a/src/lua/fiber.c b/src/lua/fiber.c
> index a7b75f9bf..4d2828f1c 100644
> --- a/src/lua/fiber.c
> +++ b/src/lua/fiber.c
> @@ -441,11 +441,6 @@ lua_fiber_run_f(MAYBE_UNUSED va_list ap)
>  	int coro_ref = lua_tointeger(L, -1);
>  	lua_pop(L, 1);
>  	result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
> -
> -	/* Destroy local storage */
> -	int storage_ref = f->storage.lua.ref;
> -	if (storage_ref > 0)
> -		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, storage_ref);
>  	/*
>  	 * If fiber is not joinable
>  	 * We can unref child stack here,
> @@ -606,12 +601,43 @@ lbox_fiber_name(struct lua_State *L)
>  	}
>  }
>
> +/**
> + * Trigger invoked when the fiber has finished execution of its
> + * current request. Only purpose - delete storage.lua.ref keeping
> + * a reference of Lua fiber.storage object. Unlike Lua stack,
> + * Lua fiber storage may be created not only for fibers born from
> + * Lua land. For example, an IProto request may execute a Lua
> + * function, which can create the storage. Trigger guarantees,
> + * that even for non-Lua fibers the Lua storage is destroyed.
> + */
> +static int
> +lbox_fiber_on_cleanup(struct trigger *trigger, void *event)
> +{
> +	struct fiber *f = (struct fiber *) event;
> +	int storage_ref = f->storage.lua.ref;
> +	assert(storage_ref > 0);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, storage_ref);
> +	f->storage.lua.ref = 0;

Although 0 is a special "ref" (the table slot where the head of the list
of free refs is stored) and can never be returned by luaL_ref, please
consider using LUA_NOREF for such cases: it is provided by Lua, and
luaL_unref does nothing for it, as well as for LUA_REFNIL.
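
To illustrate the suggestion, here is a minimal self-contained sketch.
It does not link against Lua: LUA_NOREF and LUA_REFNIL are redefined
with the values from lauxlib.h, while toy_ref(), toy_unref() and struct
toy_storage are hypothetical stand-ins for luaL_ref(), luaL_unref() and
f->storage.lua. The model only reproduces the fact that luaL_unref()
ignores negative refs. It is not a proposed change to the patch, just
the shape of the idea:

```c
#include <assert.h>

/* Same values as in lauxlib.h; redefined so the sketch builds without Lua. */
#define LUA_NOREF (-2)
#define LUA_REFNIL (-1)

/* Hypothetical stand-in for the registry: only counts live references. */
static int toy_live_refs = 0;
static int toy_next_ref = 1;

/* Stand-in for luaL_ref(): hands out positive refs only. */
static int
toy_ref(void)
{
	toy_live_refs++;
	return toy_next_ref++;
}

/* Stand-in for luaL_unref(): like the real one, ignores negative refs. */
static void
toy_unref(int ref)
{
	if (ref < 0)
		return;
	toy_live_refs--;
}

/* Hypothetical counterpart of f->storage.lua in the patch. */
struct toy_storage {
	int ref;
};

/* Assumption: the field is set to LUA_NOREF when the fiber is created. */
static void
toy_storage_init(struct toy_storage *s)
{
	s->ref = LUA_NOREF;
}

/* Create the storage on demand, as lbox_fiber_storage() does. */
static int
toy_storage_get(struct toy_storage *s)
{
	if (s->ref == LUA_NOREF)
		s->ref = toy_ref();
	return s->ref;
}

/* Cleanup can unref unconditionally: LUA_NOREF makes it a no-op. */
static void
toy_storage_cleanup(struct toy_storage *s)
{
	toy_unref(s->ref);
	s->ref = LUA_NOREF;
}
```

With such a sentinel the "no storage yet" check becomes an explicit
comparison with LUA_NOREF instead of relying on 0, and a repeated
cleanup stays harmless. The catch, under the assumption above, is that
the field has to be initialized explicitly rather than left zeroed.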
> +	trigger_clear(trigger);
> +	free(trigger);
> +	return 0;
> +}
> +
>  static int
>  lbox_fiber_storage(struct lua_State *L)
>  {
>  	struct fiber *f = lbox_checkfiber(L, 1);
>  	int storage_ref = f->storage.lua.ref;
>  	if (storage_ref <= 0) {
> +		struct trigger *t = (struct trigger *)
> +			malloc(sizeof(*t));
> +		if (t == NULL) {
> +			diag_set(OutOfMemory, sizeof(*t), "malloc", "t");
> +			return luaT_error(L);
> +		}
> +		trigger_create(t, lbox_fiber_on_cleanup, NULL,
> +			       (trigger_f0) free);
> +		trigger_add(&f->on_cleanup, t);
>  		lua_newtable(L); /* create local storage on demand */
>  		storage_ref = luaL_ref(L, LUA_REGISTRYINDEX);
>  		f->storage.lua.ref = storage_ref;
> diff --git a/test/app/gh-4662-fiber-storage-leak.result b/test/app/gh-4662-fiber-storage-leak.result
> new file mode 100644
> index 000000000..4ade017a4
> --- /dev/null
> +++ b/test/app/gh-4662-fiber-storage-leak.result
> @@ -0,0 +1,88 @@
> +-- test-run result file version 2
> +fiber = require('fiber')
> + | ---
> + | ...
> +netbox = require('net.box')
> + | ---
> + | ...
> +
> +--
> +-- gh-4662: fiber.storage was not deleted when created in a fiber
> +-- started from the thread pool used by IProto requests. The
> +-- problem was that fiber.storage was created and deleted in Lua
> +-- land only, assuming that only Lua-born fibers could have it.
> +-- But in fact any fiber can create a Lua storage. Including the
> +-- ones used to serve IProto requests.
> +-- The test checks if fiber.storage is really deleted, and is not
> +-- shared between requests.
> +--
> +
> +box.schema.user.grant('guest', 'execute', 'universe')
> + | ---
> + | ...
> +storage = nil
> + | ---
> + | ...
> +i = 0
> + | ---
> + | ...
> +weak_table = setmetatable({}, {__mode = 'v'})
> + | ---
> + | ...
> +object = {'object'}
> + | ---
> + | ...
> +weak_table.object = object
> + | ---
> + | ...
> +function ref_object_in_fiber() \
> +    storage = fiber.self().storage \
> +    assert(next(storage) == nil) \
> +    i = i + 1 \
> +    fiber.self().storage.key = i \
> +    fiber.self().storage.object = object \
> +end
> + | ---
> + | ...
> +
> +c = netbox.connect(box.cfg.listen)
> + | ---
> + | ...
> +c:call('ref_object_in_fiber') c:call('ref_object_in_fiber')
> + | ---
> + | ...
> +storage
> + | ---
> + | - key: 2
> + |   object:
> + |   - object
> + | ...
> +i
> + | ---
> + | - 2
> + | ...
> +object = nil
> + | ---
> + | ...
> +storage = nil
> + | ---
> + | ...
> +collectgarbage('collect')
> + | ---
> + | - 0
> + | ...
> +-- The weak table should be empty, because the only two hard
> +-- references were in the fibers used to serve
> +-- ref_object_in_fiber() requests. And their storages should be
> +-- cleaned up.
> +weak_table
> + | ---
> + | - []
> + | ...
> +
> +c:close()
> + | ---
> + | ...
> +box.schema.user.revoke('guest', 'execute', 'universe')
> + | ---
> + | ...
> diff --git a/test/app/gh-4662-fiber-storage-leak.test.lua b/test/app/gh-4662-fiber-storage-leak.test.lua
> new file mode 100644
> index 000000000..25acf5679
> --- /dev/null
> +++ b/test/app/gh-4662-fiber-storage-leak.test.lua
> @@ -0,0 +1,43 @@
> +fiber = require('fiber')
> +netbox = require('net.box')
> +
> +--
> +-- gh-4662: fiber.storage was not deleted when created in a fiber
> +-- started from the thread pool used by IProto requests. The
> +-- problem was that fiber.storage was created and deleted in Lua
> +-- land only, assuming that only Lua-born fibers could have it.
> +-- But in fact any fiber can create a Lua storage. Including the
> +-- ones used to serve IProto requests.
> +-- The test checks if fiber.storage is really deleted, and is not
> +-- shared between requests.
> +--
> +
> +box.schema.user.grant('guest', 'execute', 'universe')
> +storage = nil
> +i = 0
> +weak_table = setmetatable({}, {__mode = 'v'})
> +object = {'object'}
> +weak_table.object = object
> +function ref_object_in_fiber() \
> +    storage = fiber.self().storage \
> +    assert(next(storage) == nil) \
> +    i = i + 1 \
> +    fiber.self().storage.key = i \
> +    fiber.self().storage.object = object \
> +end
> +
> +c = netbox.connect(box.cfg.listen)
> +c:call('ref_object_in_fiber') c:call('ref_object_in_fiber')
> +storage
> +i
> +object = nil
> +storage = nil
> +collectgarbage('collect')
> +-- The weak table should be empty, because the only two hard
> +-- references were in the fibers used to serve
> +-- ref_object_in_fiber() requests. And their storages should be
> +-- cleaned up.
> +weak_table
> +
> +c:close()
> +box.schema.user.revoke('guest', 'execute', 'universe')
>
> =========================================================================

--
Best regards,
IM