[Tarantool-patches] [PATCH 2/2] fiber: destroy fiber.storage created by iproto

Igor Munkin imun at tarantool.org
Thu Dec 12 03:00:47 MSK 2019


Vlad,

Thanks for the patch!

Sorry, I needed some time to get into all this machinery, so the review
was postponed a bit, and meanwhile you've managed to prepare a new patch
addressing Kostja's remarks. Therefore, I decided to move all my notes
to the new patch instead of splitting them across two replies.

This patch LGTM in general, but please consider several minor comments I
left below.

On 11.12.19, Vladislav Shpilevoy wrote:

<snipped>

> =========================================================================
> 
> The whole new patch:
> 
> =========================================================================
> 
>     fiber: destroy fiber.storage created by iproto
>     
>     Fiber.storage was not deleted when created in a fiber started from
>     the thread pool used by IProto requests. The problem was that
>     fiber.storage was created and deleted in Lua land only, assuming
>     that only Lua-born fibers could have it. But in fact any fiber can
>     create a Lua storage. Including the ones used to serve IProto
>     requests.
>     
>     Not deletion of the storage led to a possibility of meeting a
>     non-empty fiber.storage in the beginning of an iproto request, and
>     not deletion of the memory caught by the storage until its
>     explicit nullification.
>     
>     Now the storage destructor works for any fiber, which managed to
>     create the storage. The destructor unrefs and nullifies the
>     storage.
>     
>     For destructor purposes the fiber.on_stop triggers were reworked.
>     Now they are on_cleanup triggers, and can be called multiple times
>     during fiber's lifetime. After every request done by that fiber.
>     
>     Closes #4662
>     Closes #3462
> 
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index c39b8e7bf..600a21ac8 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -1306,6 +1306,12 @@ static void
>  tx_fiber_init(struct session *session, uint64_t sync)
>  {
>  	struct fiber *f = fiber();
> +	/*
> +	 * There should not be any not executed
> +	 * destructors from a previous request
> +	 * executed in that fiber.
> +	 */
> +	assert(rlist_empty(&f->on_cleanup));
>  	f->storage.net.sync = sync;
>  	/*
>  	 * We do not cleanup fiber keys at the end of each request.
> @@ -1321,6 +1327,17 @@ tx_fiber_init(struct session *session, uint64_t sync)
>  	fiber_set_user(f, &session->credentials);
>  }
>  
> +/**
> + * Cleanup current fiber after a request is
> + * executed to make it possible to reuse the fiber
> + * for a next request.
> + */
> +static inline void
> +tx_fiber_cleanup()
> +{
> +	fiber_cleanup(fiber());
> +}
> +
>  static void
>  tx_process_disconnect(struct cmsg *m)
>  {
> @@ -1331,6 +1348,7 @@ tx_process_disconnect(struct cmsg *m)
>  		if (! rlist_empty(&session_on_disconnect)) {
>  			tx_fiber_init(con->session, 0);
>  			session_run_on_disconnect_triggers(con->session);
> +			tx_fiber_cleanup();
>  		}
>  	}
>  }
> @@ -1486,6 +1504,7 @@ tx_reply_iproto_error(struct cmsg *m)
>  	iproto_reply_error(out, diag_last_error(&msg->diag),
>  			   msg->header.sync, ::schema_version);
>  	iproto_wpos_create(&msg->wpos, out);
> +	tx_fiber_cleanup();
>  }
>  
>  /** Inject a short delay on tx request processing for testing. */
> @@ -1519,9 +1538,11 @@ tx_process1(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
>  			    tuple != 0);
>  	iproto_wpos_create(&msg->wpos, out);
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>  
>  static void
> @@ -1562,9 +1583,11 @@ tx_process_select(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync,
>  			    ::schema_version, count);
>  	iproto_wpos_create(&msg->wpos, out);
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>  
>  static int
> @@ -1652,9 +1675,11 @@ tx_process_call(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync,
>  			    ::schema_version, count);
>  	iproto_wpos_create(&msg->wpos, out);
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>  
>  static void
> @@ -1695,9 +1720,11 @@ tx_process_misc(struct cmsg *m)
>  	} catch (Exception *e) {
>  		tx_reply_error(msg);
>  	}
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>  
>  static void
> @@ -1711,8 +1738,6 @@ tx_process_sql(struct cmsg *m)
>  	const char *sql;
>  	uint32_t len;
>  
> -	tx_fiber_init(msg->connection->session, msg->header.sync);
> -
>  	if (tx_check_schema(msg->header.schema_version))
>  		goto error;
>  	assert(msg->header.type == IPROTO_EXECUTE);
> @@ -1746,9 +1771,11 @@ tx_process_sql(struct cmsg *m)
>  	port_destroy(&port);
>  	iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
>  	iproto_wpos_create(&msg->wpos, out);
> -	return;
> +	goto end;
>  error:
>  	tx_reply_error(msg);
> +end:
> +	tx_fiber_cleanup();
>  }
>  
>  static void
> @@ -1781,11 +1808,12 @@ tx_process_join_subscribe(struct cmsg *m)
>  			unreachable();
>  		}
>  	} catch (SocketError *e) {
> -		return; /* don't write error response to prevent SIGPIPE */
> +		/* don't write error response to prevent SIGPIPE */
>  	} catch (Exception *e) {
>  		iproto_write_error(con->input.fd, e, ::schema_version,
>  				   msg->header.sync);
>  	}
> +	tx_fiber_cleanup();
>  }
>  
>  static void
> @@ -1891,6 +1919,7 @@ tx_process_connect(struct cmsg *m)
>  		tx_reply_error(msg);
>  		msg->close_connection = true;
>  	}
> +	tx_fiber_cleanup();
>  }
>  
>  /**
> diff --git a/src/box/session.cc b/src/box/session.cc
> index 461d1cf25..09e988148 100644
> --- a/src/box/session.cc
> +++ b/src/box/session.cc
> @@ -80,8 +80,12 @@ sid_max()
>  	return sid_max;
>  }
>  
> +/**
> + * Destroy session of a background fiber when the
> + * fiber is getting destroyed.
> + */
>  static int
> -session_on_stop(struct trigger *trigger, void * /* event */)
> +session_on_fiber_cleanup(struct trigger *trigger, void * /* event */)
>  {
>  	/*
>  	 * Remove on_stop trigger from the fiber, otherwise the
> @@ -167,11 +171,10 @@ session_create_on_demand()
>  	struct session *s = session_create(SESSION_TYPE_BACKGROUND);
>  	if (s == NULL)
>  		return NULL;
> -	s->fiber_on_stop = {
> -		RLIST_LINK_INITIALIZER, session_on_stop, NULL, NULL
> -	};
> -	/* Add a trigger to destroy session on fiber stop */
> -	trigger_add(&fiber()->on_stop, &s->fiber_on_stop);
> +	trigger_create(&s->fiber_on_cleanup, session_on_fiber_cleanup,
> +		       NULL, NULL);
> +	/* Add a trigger to destroy session on fiber cleanup. */
> +	trigger_add(&fiber()->on_cleanup, &s->fiber_on_cleanup);
>  	credentials_reset(&s->credentials, admin_user);
>  	fiber_set_session(fiber(), s);
>  	fiber_set_user(fiber(), &s->credentials);
> diff --git a/src/box/session.h b/src/box/session.h
> index eff3d7a67..710e4919b 100644
> --- a/src/box/session.h
> +++ b/src/box/session.h
> @@ -103,8 +103,11 @@ struct session {
>  	union session_meta meta;
>  	/** Session user id and global grants */
>  	struct credentials credentials;
> -	/** Trigger for fiber on_stop to cleanup created on-demand session */
> -	struct trigger fiber_on_stop;
> +	/**
> +	 * Trigger for fiber on_cleanup to cleanup created
> +	 * on-demand session.
> +	 */
> +	struct trigger fiber_on_cleanup;
>  };
>  
>  struct session_vtab {
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 963ec8eeb..7d549a6a9 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -41,7 +41,7 @@ double too_long_threshold;
>  static struct stailq txn_cache = {NULL, &txn_cache.first};
>  
>  static int
> -txn_on_stop(struct trigger *trigger, void *event);
> +txn_on_fiber_cleanup(struct trigger *trigger, void *event);
>  
>  static int
>  txn_on_yield(struct trigger *trigger, void *event);
> @@ -226,8 +226,9 @@ txn_begin()
>  	txn->fiber = NULL;
>  	fiber_set_txn(fiber(), txn);
>  	/* fiber_on_yield is initialized by engine on demand */
> -	trigger_create(&txn->fiber_on_stop, txn_on_stop, NULL, NULL);
> -	trigger_add(&fiber()->on_stop, &txn->fiber_on_stop);
> +	trigger_create(&txn->fiber_on_cleanup, txn_on_fiber_cleanup,
> +		       NULL, NULL);
> +	trigger_add(&fiber()->on_cleanup, &txn->fiber_on_cleanup);
>  	/*
>  	 * By default all transactions may yield.
>  	 * It's a responsibility of an engine to disable yields
> @@ -550,7 +551,7 @@ txn_prepare(struct txn *txn)
>  		if (engine_prepare(txn->engine, txn) != 0)
>  			return -1;
>  	}
> -	trigger_clear(&txn->fiber_on_stop);
> +	trigger_clear(&txn->fiber_on_cleanup);
>  	if (!txn_has_flag(txn, TXN_CAN_YIELD))
>  		trigger_clear(&txn->fiber_on_yield);
>  	return 0;
> @@ -618,7 +619,7 @@ void
>  txn_rollback(struct txn *txn)
>  {
>  	assert(txn == in_txn());
> -	trigger_clear(&txn->fiber_on_stop);
> +	trigger_clear(&txn->fiber_on_cleanup);
>  	if (!txn_has_flag(txn, TXN_CAN_YIELD))
>  		trigger_clear(&txn->fiber_on_yield);
>  	txn->signature = -1;
> @@ -840,7 +841,7 @@ txn_savepoint_release(struct txn_savepoint *svp)
>  }
>  
>  static int
> -txn_on_stop(struct trigger *trigger, void *event)
> +txn_on_fiber_cleanup(struct trigger *trigger, void *event)
>  {
>  	(void) trigger;
>  	(void) event;
> diff --git a/src/box/txn.h b/src/box/txn.h
> index da12feebf..755c464f0 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -207,10 +207,11 @@ struct txn {
>  	 */
>  	struct trigger fiber_on_yield;
>  	/**
> -	 * Trigger on fiber stop, to rollback transaction
> -	 * in case a fiber stops (all engines).
> +	 * Trigger on fiber cleanup, to rollback transaction
> +	 * in case a fiber is getting reset/recycled/destroyed
> +	 * (all engines).
>  	 */
> -	struct trigger fiber_on_stop;
> +	struct trigger fiber_on_cleanup;
>  	/** Commit and rollback triggers. */
>  	struct rlist on_commit, on_rollback;
>  	/**
> diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c
> index 00ae8cded..d89c640aa 100644
> --- a/src/lib/core/fiber.c
> +++ b/src/lib/core/fiber.c
> @@ -325,6 +325,20 @@ fiber_attr_getstacksize(struct fiber_attr *fiber_attr)
>  				    fiber_attr_default.stack_size;
>  }
>  
> +void
> +fiber_cleanup(struct fiber *f)
> +{
> +	if (trigger_run(&f->on_cleanup, f) != 0)
> +		panic("Fiber cleanup can't fail");
> +	/*
> +	 * All cleanup triggers are supposed to
> +	 * remove themselves. So as no to waste
> +	 * time on that here, and to make them all
> +	 * work uniformly.
> +	 */
> +	assert(rlist_empty(&f->on_cleanup));
> +}
> +
>  static void
>  fiber_recycle(struct fiber *fiber);
>  
> @@ -787,7 +801,7 @@ static void
>  fiber_reset(struct fiber *fiber)
>  {
>  	rlist_create(&fiber->on_yield);
> -	rlist_create(&fiber->on_stop);
> +	rlist_create(&fiber->on_cleanup);
>  	fiber->flags = FIBER_DEFAULT_FLAGS;
>  #if ENABLE_FIBER_TOP
>  	clock_stat_reset(&fiber->clock_stat);
> @@ -856,8 +870,7 @@ fiber_loop(MAYBE_UNUSED void *data)
>  		       assert(f != fiber);
>  		       fiber_wakeup(f);
>  	        }
> -		if (! rlist_empty(&fiber->on_stop))
> -			trigger_run(&fiber->on_stop, fiber);
> +	        fiber_cleanup(fiber);

I see you dropped the rlist_empty() check above, and fiber_cleanup
doesn't have a corresponding one inside. Is this removal intentional?

Minor: the whitespace above is a mess. Feel free to ignore this remark,
since the indentation was broken before your changes. I ran blame on
several lines, and it looks more like a ridiculous code style than an
editor problem. So your change just implicitly continues the mix.

>  		/* reset pending wakeups */
>  		rlist_del(&fiber->state);
>  		if (! (fiber->flags & FIBER_IS_JOINABLE))
> @@ -1161,7 +1174,7 @@ fiber_destroy(struct cord *cord, struct fiber *f)
>  	assert(f != &cord->sched);
>  
>  	trigger_destroy(&f->on_yield);
> -	trigger_destroy(&f->on_stop);
> +	trigger_destroy(&f->on_cleanup);
>  	rlist_del(&f->state);
>  	rlist_del(&f->link);
>  	region_destroy(&f->gc);
> @@ -1525,8 +1538,8 @@ cord_cojoin(struct cord *cord)
>  int
>  break_ev_loop_f(struct trigger *trigger, void *event)
>  {
> -	(void) trigger;
>  	(void) event;
> +	trigger_clear(trigger);
>  	ev_break(loop(), EVBREAK_ALL);
>  	return 0;
>  }
> @@ -1555,7 +1568,7 @@ cord_costart_thread_func(void *arg)
>  	 * Got to be in a trigger, to break the loop even
>  	 * in case of an exception.
>  	 */
> -	trigger_add(&f->on_stop, &break_ev_loop);
> +	trigger_add(&f->on_cleanup, &break_ev_loop);
>  	fiber_set_joinable(f, true);
>  	fiber_start(f, ctx.arg);
>  	if (!fiber_is_dead(f)) {
> diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
> index c5b975513..21fff8f88 100644
> --- a/src/lib/core/fiber.h
> +++ b/src/lib/core/fiber.h
> @@ -458,8 +458,17 @@ struct fiber {
>  
>  	/** Triggers invoked before this fiber yields. Must not throw. */
>  	struct rlist on_yield;
> -	/** Triggers invoked before this fiber stops.  Must not throw. */
> -	struct rlist on_stop;
> +	/**
> +	 * Triggers invoked before this fiber is
> +	 * stopped/reset/recycled/destroyed/reused. In other
> +	 * words, each time when the fiber has finished execution
> +	 * of a request.
> +	 * In particular, for fibers not from fiber pool the
> +	 * cleanup is done before destroy and death.
> +	 * Pooled fibers are cleaned up after each request, even
> +	 * if they are never destroyed.
> +	 */
> +	struct rlist on_cleanup;

Minor: both the on_yield list above and the on_stop list being replaced
carry the "Must not throw" note. Does the introduced list respect this
rule? If yes, please adjust the comment accordingly.

>  	/**
>  	 * The list of fibers awaiting for this fiber's timely
>  	 * (or untimely) death.
> @@ -506,6 +515,10 @@ struct fiber {
>  	char name[FIBER_NAME_MAX + 1];
>  };
>  
> +/** Invoke cleanup triggers and delete them. */
> +void
> +fiber_cleanup(struct fiber *f);
> +
>  struct cord_on_exit;
>  
>  /**
> diff --git a/src/lua/fiber.c b/src/lua/fiber.c
> index a7b75f9bf..4d2828f1c 100644
> --- a/src/lua/fiber.c
> +++ b/src/lua/fiber.c
> @@ -441,11 +441,6 @@ lua_fiber_run_f(MAYBE_UNUSED va_list ap)
>  	int coro_ref = lua_tointeger(L, -1);
>  	lua_pop(L, 1);
>  	result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
> -
> -	/* Destroy local storage */
> -	int storage_ref = f->storage.lua.ref;
> -	if (storage_ref > 0)
> -		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, storage_ref);
>  	/*
>  	 * If fiber is not joinable
>  	 * We can unref child stack here,
> @@ -606,12 +601,43 @@ lbox_fiber_name(struct lua_State *L)
>  	}
>  }
>  
> +/**
> + * Trigger invoked when the fiber has finished execution of its
> + * current request. Only purpose - delete storage.lua.ref keeping
> + * a reference of Lua fiber.storage object. Unlike Lua stack,
> + * Lua fiber storage may be created not only for fibers born from
> + * Lua land. For example, an IProto request may execute a Lua
> + * function, which can create the storage. Trigger guarantees,
> + * that even for non-Lua fibers the Lua storage is destroyed.
> + */
> +static int
> +lbox_fiber_on_cleanup(struct trigger *trigger, void *event)
> +{
> +	struct fiber *f = (struct fiber *) event;
> +	int storage_ref = f->storage.lua.ref;
> +	assert(storage_ref > 0);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, storage_ref);
> +	f->storage.lua.ref = 0;

Note that 0 is a special "ref" (the table slot where the head of the
free refs list is stored), and this value can never be returned by
luaL_ref. Please consider using LUA_NOREF for such cases, since it is
provided by Lua itself and luaL_unref does nothing for it, as well as
for LUA_REFNIL.

> +	trigger_clear(trigger);
> +	free(trigger);
> +	return 0;
> +}
> +
>  static int
>  lbox_fiber_storage(struct lua_State *L)
>  {
>  	struct fiber *f = lbox_checkfiber(L, 1);
>  	int storage_ref = f->storage.lua.ref;
>  	if (storage_ref <= 0) {
> +		struct trigger *t = (struct trigger *)
> +			malloc(sizeof(*t));
> +		if (t == NULL) {
> +			diag_set(OutOfMemory, sizeof(*t), "malloc", "t");
> +			return luaT_error(L);
> +		}
> +		trigger_create(t, lbox_fiber_on_cleanup, NULL,
> +			       (trigger_f0) free);
> +		trigger_add(&f->on_cleanup, t);
>  		lua_newtable(L); /* create local storage on demand */
>  		storage_ref = luaL_ref(L, LUA_REGISTRYINDEX);
>  		f->storage.lua.ref = storage_ref;
> diff --git a/test/app/gh-4662-fiber-storage-leak.result b/test/app/gh-4662-fiber-storage-leak.result
> new file mode 100644
> index 000000000..4ade017a4
> --- /dev/null
> +++ b/test/app/gh-4662-fiber-storage-leak.result
> @@ -0,0 +1,88 @@
> +-- test-run result file version 2
> +fiber = require('fiber')
> + | ---
> + | ...
> +netbox = require('net.box')
> + | ---
> + | ...
> +
> +--
> +-- gh-4662: fiber.storage was not deleted when created in a fiber
> +-- started from the thread pool used by IProto requests. The
> +-- problem was that fiber.storage was created and deleted in Lua
> +-- land only, assuming that only Lua-born fibers could have it.
> +-- But in fact any fiber can create a Lua storage. Including the
> +-- ones used to serve IProto requests.
> +-- The test checks if fiber.storage is really deleted, and is not
> +-- shared between requests.
> +--
> +
> +box.schema.user.grant('guest', 'execute', 'universe')
> + | ---
> + | ...
> +storage = nil
> + | ---
> + | ...
> +i = 0
> + | ---
> + | ...
> +weak_table = setmetatable({}, {__mode = 'v'})
> + | ---
> + | ...
> +object = {'object'}
> + | ---
> + | ...
> +weak_table.object = object
> + | ---
> + | ...
> +function ref_object_in_fiber()                  \
> +    storage = fiber.self().storage              \
> +    assert(next(storage) == nil)                \
> +    i = i + 1                                   \
> +    fiber.self().storage.key = i                \
> +    fiber.self().storage.object = object        \
> +end
> + | ---
> + | ...
> +
> +c = netbox.connect(box.cfg.listen)
> + | ---
> + | ...
> +c:call('ref_object_in_fiber') c:call('ref_object_in_fiber')
> + | ---
> + | ...
> +storage
> + | ---
> + | - key: 2
> + |   object:
> + |   - object
> + | ...
> +i
> + | ---
> + | - 2
> + | ...
> +object = nil
> + | ---
> + | ...
> +storage = nil
> + | ---
> + | ...
> +collectgarbage('collect')
> + | ---
> + | - 0
> + | ...
> +-- The weak table should be empty, because the only two hard
> +-- references were in the fibers used to serve
> +-- ref_object_in_fiber() requests. And their storages should be
> +-- cleaned up.
> +weak_table
> + | ---
> + | - []
> + | ...
> +
> +c:close()
> + | ---
> + | ...
> +box.schema.user.revoke('guest', 'execute', 'universe')
> + | ---
> + | ...
> diff --git a/test/app/gh-4662-fiber-storage-leak.test.lua b/test/app/gh-4662-fiber-storage-leak.test.lua
> new file mode 100644
> index 000000000..25acf5679
> --- /dev/null
> +++ b/test/app/gh-4662-fiber-storage-leak.test.lua
> @@ -0,0 +1,43 @@
> +fiber = require('fiber')
> +netbox = require('net.box')
> +
> +--
> +-- gh-4662: fiber.storage was not deleted when created in a fiber
> +-- started from the thread pool used by IProto requests. The
> +-- problem was that fiber.storage was created and deleted in Lua
> +-- land only, assuming that only Lua-born fibers could have it.
> +-- But in fact any fiber can create a Lua storage. Including the
> +-- ones used to serve IProto requests.
> +-- The test checks if fiber.storage is really deleted, and is not
> +-- shared between requests.
> +--
> +
> +box.schema.user.grant('guest', 'execute', 'universe')
> +storage = nil
> +i = 0
> +weak_table = setmetatable({}, {__mode = 'v'})
> +object = {'object'}
> +weak_table.object = object
> +function ref_object_in_fiber()                  \
> +    storage = fiber.self().storage              \
> +    assert(next(storage) == nil)                \
> +    i = i + 1                                   \
> +    fiber.self().storage.key = i                \
> +    fiber.self().storage.object = object        \
> +end
> +
> +c = netbox.connect(box.cfg.listen)
> +c:call('ref_object_in_fiber') c:call('ref_object_in_fiber')
> +storage
> +i
> +object = nil
> +storage = nil
> +collectgarbage('collect')
> +-- The weak table should be empty, because the only two hard
> +-- references were in the fibers used to serve
> +-- ref_object_in_fiber() requests. And their storages should be
> +-- cleaned up.
> +weak_table
> +
> +c:close()
> +box.schema.user.revoke('guest', 'execute', 'universe')
> 
> =========================================================================

-- 
Best regards,
IM


More information about the Tarantool-patches mailing list