[patches] [fiber 1/1] fiber: Introduce fiber.join() related methods

Georgy Kirichenko georgy at tarantool.org
Fri Feb 16 11:14:44 MSK 2018


You are on a right way but something should be fixed

1. Try to use existing fiber_join instead of introduce a new one. There isn't a 
big deal to erase a current fiber diagnostic.
2. It would be nice to return from a join a pair ov values: true/false and 
result/error like a pcall does
3. I think we should select a better name instead of FIBER_RESULT_KEY


On Thursday, February 15, 2018 2:49:33 PM MSK imarkov wrote:
> From: Ilya <markovilya197 at gmail.com>
> 
> Introduce two functions
> * fiber.new() - create fiber, schedules it into the
> ready queue but doesn't call it and doesn't yield.
> Signature of the method is the same as for fiber.create
> * fiber.join() - waits until the specified fiber finishes
> its execution and returns result or error. Applicable only
> to joinable fibers.
> * fiber.set_joinable() - sets the fiber joinable flag
> 
> Closes #1397
> ---
>  src/fiber.c             |  16 ++++--
>  src/fiber.h             |  12 +++-
>  src/lua/fiber.c         | 113 +++++++++++++++++++++++++++++++++-----
>  test/app/fiber.result   | 142
> ++++++++++++++++++++++++++++++++++++++++++++++++ test/app/fiber.test.lua | 
> 72 ++++++++++++++++++++++++
>  5 files changed, 333 insertions(+), 22 deletions(-)
> 
> diff --git a/src/fiber.c b/src/fiber.c
> index 9415739..cd3263d 100644
> --- a/src/fiber.c
> +++ b/src/fiber.c
> @@ -148,8 +148,6 @@ fiber_attr_getstacksize(struct fiber_attr *fiber_attr)
>  				    fiber_attr_default.stack_size;
>  }
> 
> -static void
> -fiber_recycle(struct fiber *fiber);
> 
>  static void
>  fiber_destroy(struct cord *cord, struct fiber *f);
> @@ -376,8 +374,8 @@ fiber_reschedule(void)
>  	fiber_yield();
>  }
> 
> -int
> -fiber_join(struct fiber *fiber)
> +void
> +fiber_join_wait(struct fiber *fiber)
>  {
>  	assert(fiber->flags & FIBER_IS_JOINABLE);
> 
> @@ -386,8 +384,14 @@ fiber_join(struct fiber *fiber)
>  		fiber_yield();
>  	}
>  	assert(fiber_is_dead(fiber));
> -	/* Move exception to the caller */
> +}
> +
> +int
> +fiber_join(struct fiber *fiber)
> +{
> +	fiber_join_wait(fiber);
>  	int ret = fiber->f_ret;
> +	/* Move exception to the caller */
>  	if (ret != 0) {
>  		assert(!diag_is_empty(&fiber->diag));
>  		diag_move(&fiber->diag, &fiber()->diag);
> @@ -598,7 +602,7 @@ fiber_reset(struct fiber *fiber)
>  }
> 
>  /** Destroy an active fiber and prepare it for reuse. */
> -static void
> +void
>  fiber_recycle(struct fiber *fiber)
>  {
>  	/* no exceptions are leaking */
> diff --git a/src/fiber.h b/src/fiber.h
> index 94b3f44..72caaf9 100644
> --- a/src/fiber.h
> +++ b/src/fiber.h
> @@ -105,7 +105,8 @@ enum fiber_key {
>  	/** User global privilege and authentication token */
>  	FIBER_KEY_USER = 3,
>  	FIBER_KEY_MSG = 4,
> -	FIBER_KEY_MAX = 5
> +	FIBER_KEY_RESULT = 5,
> +	FIBER_KEY_MAX = 6
>  };
> 
>  /** \cond public */
> @@ -625,6 +626,14 @@ fiber_c_invoke(fiber_func f, va_list ap)
>  	return f(ap);
>  }
> 
> +/** Destroy an active fiber and prepare it for reuse. */
> +void
> +fiber_recycle(struct fiber *fiber);
> +
> +/** Wait until fiber is finished. Applicable only to joinable fibers */
> +void
> +fiber_join_wait(struct fiber *fiber);
> +
>  #if defined(__cplusplus)
>  } /* extern "C" */
> 
> @@ -670,6 +679,7 @@ fiber_cxx_invoke(fiber_func f, va_list ap)
> 
>  #endif /* defined(__cplusplus) */
> 
> +
>  static inline void *
>  region_aligned_alloc_cb(void *ctx, size_t size)
>  {
> diff --git a/src/lua/fiber.c b/src/lua/fiber.c
> index 83b5825..54b0341 100644
> --- a/src/lua/fiber.c
> +++ b/src/lua/fiber.c
> @@ -291,13 +291,14 @@ lbox_fiber_info(struct lua_State *L)
>  }
> 
>  static int
> -lua_fiber_run_f(va_list ap)
> +lua_fiber_run_f(MAYBE_UNUSED va_list ap)
>  {
>  	int result;
> -	int coro_ref = va_arg(ap, int);
> -	struct lua_State *L = va_arg(ap, struct lua_State *);
> -
> -	result = luaT_call(L, lua_gettop(L) - 1, 0);
> +	struct lua_State *L = (struct lua_State *)
> +		fiber_get_key(fiber(), FIBER_KEY_RESULT);
> +	int coro_ref = lua_tointeger(L, -1);
> +	lua_pop(L, 1);
> +	result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
> 
>  	/* Destroy local storage */
>  	int storage_ref = (int)(intptr_t)
> @@ -309,17 +310,11 @@ lua_fiber_run_f(va_list ap)
>  }
> 
>  /**
> - * Create, resume and detach a fiber
> - * given the function and its arguments.
> + * Utility function for fiber.create and fiber.new
>   */
> -static int
> -lbox_fiber_create(struct lua_State *L)
> +static struct fiber *
> +fiber_create(struct lua_State *L)
>  {
> -	if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> -		luaL_error(L, "fiber.create(function, ...): bad arguments");
> -	if (fiber_checkstack())
> -		luaL_error(L, "fiber.create(): out of fiber stack");
> -
>  	struct lua_State *child_L = lua_newthread(L);
>  	int coro_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> 
> @@ -333,7 +328,45 @@ lbox_fiber_create(struct lua_State *L)
>  	lua_xmove(L, child_L, lua_gettop(L));
>  	/* XXX: 'fiber' is leaked if this throws a Lua error. */
>  	lbox_pushfiber(L, f->fid);
> -	fiber_start(f, coro_ref, child_L);
> +	/* Pass coro_ref via lua stack so that we don't have to pass it
> +	 * as an argument of fiber_run function.
> +	 * No function will work with child_L until the function is called.
> +	 * At that time we can pop coro_ref from stack
> +	 */
> +	lua_pushinteger(child_L, coro_ref);
> +	fiber_set_key(f, FIBER_KEY_RESULT, child_L);
> +	return f;
> +}
> +
> +/**
> + * Create, resume and detach a fiber
> + * given the function and its arguments.
> + */
> +static int
> +lbox_fiber_create(struct lua_State *L)
> +{
> +	if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> +		luaL_error(L, "fiber.create(function, ...): bad arguments");
> +	if (fiber_checkstack())
> +		luaL_error(L, "fiber.create(): out of fiber stack");
> +	struct fiber *f = fiber_create(L);
> +	fiber_start(f);
> +	return 1;
> +}
> +
> +/**
> + * Create a fiber, schedule it for execution, but not invoke yet
> + */
> +static int
> +lbox_fiber_new(struct lua_State *L)
> +{
> +	if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> +		luaL_error(L, "fiber.new(function, ...): bad arguments");
> +	if (fiber_checkstack())
> +		luaL_error(L, "fiber.new(): out of fiber stack");
> +
> +	struct fiber *f = fiber_create(L);
> +	fiber_wakeup(f);
>  	return 1;
>  }
> 
> @@ -567,6 +600,51 @@ lbox_fiber_wakeup(struct lua_State *L)
>  	return 0;
>  }
> 
> +static int
> +lbox_fiber_join(struct lua_State *L)
> +{
> +	struct fiber *fiber = lbox_checkfiber(L, 1);
> +	fiber_join_wait(fiber);
> +	bool fiber_was_cancelled = fiber->flags & FIBER_IS_CANCELLED;
> +	struct error *e = NULL;
> +	if (fiber->f_ret != 0 && !fiber_was_cancelled) {
> +		/*
> +		 * We do not want to spoil the diag of the current
> +		 * fiber so not calling luaT_error().
> +		 */
> +		assert(!diag_is_empty(&fiber->diag));
> +		e = diag_last_error(&fiber->diag);
> +		error_ref(e);
> +		luaT_pusherror(L, e);
> +		diag_clear(&fiber->diag);
> +	}
> +	struct lua_State *child_L = fiber_get_key(fiber, FIBER_KEY_RESULT);
> +	int num_ret = 0;
> +	if (child_L != NULL) {
> +		num_ret = lua_gettop(child_L);
> +		lua_xmove(child_L, L, num_ret);
> +	}
> +	fiber_recycle(fiber);
> +	if (e != NULL) {
> +		error_unref(e);
> +		lua_error(L);
> +	}
> +	return num_ret;
> +}
> +
> +static int
> +lbox_fiber_set_joinable(struct lua_State *L)
> +{
> +
> +	if (lua_gettop(L) != 2) {
> +		luaL_error(L, "fiber.set_joinable(id, yesno): bad arguments");
> +	}
> +	struct fiber *fiber = lbox_checkfiber(L, 1);
> +	bool yesno = lua_toboolean(L, 2);
> +	fiber_set_joinable(fiber, yesno);
> +	return 0;
> +}
> +
>  static const struct luaL_Reg lbox_fiber_meta [] = {
>  	{"id", lbox_fiber_id},
>  	{"name", lbox_fiber_name},
> @@ -575,6 +653,8 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
>  	{"testcancel", lbox_fiber_testcancel},
>  	{"__serialize", lbox_fiber_serialize},
>  	{"__tostring", lbox_fiber_tostring},
> +	{"join", lbox_fiber_join},
> +	{"set_joinable", lbox_fiber_set_joinable},
>  	{"wakeup", lbox_fiber_wakeup},
>  	{"__index", lbox_fiber_index},
>  	{NULL, NULL}
> @@ -589,9 +669,12 @@ static const struct luaL_Reg fiberlib[] = {
>  	{"find", lbox_fiber_find},
>  	{"kill", lbox_fiber_cancel},
>  	{"wakeup", lbox_fiber_wakeup},
> +	{"join", lbox_fiber_join},
> +	{"set_joinable", lbox_fiber_set_joinable},
>  	{"cancel", lbox_fiber_cancel},
>  	{"testcancel", lbox_fiber_testcancel},
>  	{"create", lbox_fiber_create},
> +	{"new", lbox_fiber_new},
>  	{"status", lbox_fiber_status},
>  	{"name", lbox_fiber_name},
>  	{NULL, NULL}
> diff --git a/test/app/fiber.result b/test/app/fiber.result
> index 08d38ef..3ecdbd0 100644
> --- a/test/app/fiber.result
> +++ b/test/app/fiber.result
> @@ -975,6 +975,148 @@ session_type
>  session_type = nil
>  ---
>  ...
> +-- gh-1397 fiber.new, fiber.join
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
> +---
> +...
> +function test1()
> +    f = fiber.new(err)
> +    f:set_joinable(true)
> +    local e = f:join()
> +    local e2 = f:join()
> +    return e, e2
> +end;
> +---
> +...
> +test1();
> +---
> +- error: Illegal parameters, oh my
> +...
> +flag = false;
> +---
> +...
> +function test2()
> +    f = fiber.new(function() flag = true  end)
> +    fiber.set_joinable(f, true)
> +    fiber.join(f)
> +end;
> +---
> +...
> +test2();
> +---
> +...
> +flag;
> +---
> +- true
> +...
> +function test3()
> +    f = fiber.new(function() return "hello" end)
> +    fiber.set_joinable(f, true)
> +    return fiber.join(f)
> +end;
> +---
> +...
> +test3();
> +---
> +- hello
> +...
> +function test4()
> +    f = fiber.new(function (i) return i + 1  end, 1)
> +    fiber.set_joinable(f, true)
> +    return f:join()
> +end;
> +---
> +...
> +test4();
> +---
> +- 2
> +...
> +function test5()
> +    f = fiber.new(function() end)
> +    f:set_joinable(true)
> +    return f, f:status()
> +end;
> +---
> +...
> +local status;
> +---
> +...
> +f, status = test5();
> +---
> +...
> +status;
> +---
> +- suspended
> +...
> +f:status();
> +---
> +- suspended
> +...
> +f:join();
> +---
> +...
> +f:status();
> +---
> +- dead
> +...
> +function test6()
> +    f = fiber.new(function() end)
> +    f:set_joinable(true)
> +    f:set_joinable(false)
> +    return f, f:status()
> +end;
> +---
> +...
> +f, status = test6();
> +---
> +...
> +status;
> +---
> +- suspended
> +...
> +f:status();
> +---
> +- dead
> +...
> +-- test side fiber in transaction
> +s = box.schema.space.create("test");
> +---
> +...
> +_ = s:create_index("prim", {parts={1, 'number'}});
> +---
> +...
> +flag = false;
> +---
> +...
> +function test7(i)
> +    box.begin()
> +    s:put{i}
> +    fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
> +    box.rollback()
> +end;
> +---
> +...
> +f = fiber.create(test7, 1);
> +---
> +...
> +while flag ~= true do fiber.sleep(0.001) end;
> +---
> +...
> +s:select{};
> +---
> +- - [2]
> +...
> +s:drop();
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
>  fiber = nil
>  ---
>  ...
> diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
> index ad3da09..b509f4a 100644
> --- a/test/app/fiber.test.lua
> +++ b/test/app/fiber.test.lua
> @@ -402,6 +402,78 @@ _ = fiber.create(fn1)
>  session_type
>  session_type = nil
> 
> +-- gh-1397 fiber.new, fiber.join
> +test_run:cmd("setopt delimiter ';'")
> +function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
> +function test1()
> +    f = fiber.new(err)
> +    f:set_joinable(true)
> +    local e = f:join()
> +    local e2 = f:join()
> +    return e, e2
> +end;
> +test1();
> +
> +flag = false;
> +function test2()
> +    f = fiber.new(function() flag = true  end)
> +    fiber.set_joinable(f, true)
> +    fiber.join(f)
> +end;
> +test2();
> +flag;
> +
> +function test3()
> +    f = fiber.new(function() return "hello" end)
> +    fiber.set_joinable(f, true)
> +    return fiber.join(f)
> +end;
> +test3();
> +
> +function test4()
> +    f = fiber.new(function (i) return i + 1  end, 1)
> +    fiber.set_joinable(f, true)
> +    return f:join()
> +end;
> +test4();
> +
> +function test5()
> +    f = fiber.new(function() end)
> +    f:set_joinable(true)
> +    return f, f:status()
> +end;
> +local status;
> +f, status = test5();
> +status;
> +f:status();
> +f:join();
> +f:status();
> +
> +function test6()
> +    f = fiber.new(function() end)
> +    f:set_joinable(true)
> +    f:set_joinable(false)
> +    return f, f:status()
> +end;
> +f, status = test6();
> +status;
> +f:status();
> +
> +-- test side fiber in transaction
> +s = box.schema.space.create("test");
> +_ = s:create_index("prim", {parts={1, 'number'}});
> +flag = false;
> +function test7(i)
> +    box.begin()
> +    s:put{i}
> +    fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
> +    box.rollback()
> +end;
> +f = fiber.create(test7, 1);
> +while flag ~= true do fiber.sleep(0.001) end;
> +s:select{};
> +s:drop();
> +test_run:cmd("setopt delimiter ''");
>  fiber = nil
> 
>  --

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20180216/88ed337b/attachment.sig>


More information about the Tarantool-patches mailing list