[patches] [fiber 1/1] fiber: Introduce fiber.join() related methods

Georgy Kirichenko georgy at tarantool.org
Tue Feb 20 19:54:09 MSK 2018


Seems to be Ok

On Tuesday, February 20, 2018 6:18:24 PM MSK imarkov wrote:
> From: Ilya <markovilya197 at gmail.com>
> 
> Introduce three functions:
> * fiber.new() - creates a fiber and schedules it into the
> ready queue, but doesn't call it and doesn't yield.
> The signature of the method is the same as for fiber.create().
> * fiber.join() - waits until the specified fiber finishes
> its execution and returns its result or error. Applicable only
> to joinable fibers.
> * fiber.set_joinable() - sets the fiber's joinable flag.
> 
> Closes #1397
> ---
>  src/fiber.c             |   2 +
>  src/fiber.h             |   5 +-
>  src/lua/fiber.c         | 128 +++++++++++++++++++++++++++++++-----
>  test/app/fiber.result   | 168 ++++++++++++++++++++++++++++++++++++++++++++++++
>  test/app/fiber.test.lua |  82 +++++++++++++++++++++++
>  5 files changed, 368 insertions(+), 17 deletions(-)
> 
> diff --git a/src/fiber.c b/src/fiber.c
> index 9415739..9933eeb 100644
> --- a/src/fiber.c
> +++ b/src/fiber.c
> @@ -383,6 +383,8 @@ fiber_join(struct fiber *fiber)
> 
>  	if (! fiber_is_dead(fiber)) {
>  		rlist_add_tail_entry(&fiber->wake, fiber(), state);
> +	}
> +	while (!fiber_is_dead(fiber)) {
>  		fiber_yield();
>  	}
>  	assert(fiber_is_dead(fiber));
> diff --git a/src/fiber.h b/src/fiber.h
> index 94b3f44..dcba245 100644
> --- a/src/fiber.h
> +++ b/src/fiber.h
> @@ -105,7 +105,9 @@ enum fiber_key {
>  	/** User global privilege and authentication token */
>  	FIBER_KEY_USER = 3,
>  	FIBER_KEY_MSG = 4,
> -	FIBER_KEY_MAX = 5
> +	/** Storage for lua stack */
> +	FIBER_KEY_LUA_STACK = 5,
> +	FIBER_KEY_MAX = 6
>  };
> 
>  /** \cond public */
> @@ -670,6 +672,7 @@ fiber_cxx_invoke(fiber_func f, va_list ap)
> 
>  #endif /* defined(__cplusplus) */
> 
> +
>  static inline void *
>  region_aligned_alloc_cb(void *ctx, size_t size)
>  {
> diff --git a/src/lua/fiber.c b/src/lua/fiber.c
> index 83b5825..312edd3 100644
> --- a/src/lua/fiber.c
> +++ b/src/lua/fiber.c
> @@ -291,35 +291,39 @@ lbox_fiber_info(struct lua_State *L)
>  }
> 
>  static int
> -lua_fiber_run_f(va_list ap)
> +lua_fiber_run_f(MAYBE_UNUSED va_list ap)
>  {
>  	int result;
> -	int coro_ref = va_arg(ap, int);
> -	struct lua_State *L = va_arg(ap, struct lua_State *);
> -
> -	result = luaT_call(L, lua_gettop(L) - 1, 0);
> +	struct lua_State *L = (struct lua_State *)
> +		fiber_get_key(fiber(), FIBER_KEY_LUA_STACK);
> +	int coro_ref = lua_tointeger(L, -1);
> +	lua_pop(L, 1);
> +	result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
> 
>  	/* Destroy local storage */
>  	int storage_ref = (int)(intptr_t)
>  		fiber_get_key(fiber(), FIBER_KEY_LUA_STORAGE);
>  	if (storage_ref > 0)
>  		luaL_unref(L, LUA_REGISTRYINDEX, storage_ref);
> -	luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
> +	/*
> +	 * If fiber is not joinable
> +	 * We can unref child stack here,
> +	 * otherwise we have to unref child stack in join
> +	 */
> +	if (fiber()->flags & FIBER_IS_JOINABLE)
> +		lua_pushinteger(L, coro_ref);
> +	else
> +		luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
> +
>  	return result;
>  }
> 
>  /**
> - * Create, resume and detach a fiber
> - * given the function and its arguments.
> + * Utility function for fiber.create and fiber.new
>   */
> -static int
> -lbox_fiber_create(struct lua_State *L)
> +static struct fiber *
> +fiber_create(struct lua_State *L)
>  {
> -	if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> -		luaL_error(L, "fiber.create(function, ...): bad arguments");
> -	if (fiber_checkstack())
> -		luaL_error(L, "fiber.create(): out of fiber stack");
> -
>  	struct lua_State *child_L = lua_newthread(L);
>  	int coro_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> 
> @@ -333,7 +337,45 @@ lbox_fiber_create(struct lua_State *L)
>  	lua_xmove(L, child_L, lua_gettop(L));
>  	/* XXX: 'fiber' is leaked if this throws a Lua error. */
>  	lbox_pushfiber(L, f->fid);
> -	fiber_start(f, coro_ref, child_L);
> +	/* Pass coro_ref via lua stack so that we don't have to pass it
> +	 * as an argument of fiber_run function.
> +	 * No function will work with child_L until the function is called.
> +	 * At that time we can pop coro_ref from stack
> +	 */
> +	lua_pushinteger(child_L, coro_ref);
> +	fiber_set_key(f, FIBER_KEY_LUA_STACK, child_L);
> +	return f;
> +}
> +
> +/**
> + * Create, resume and detach a fiber
> + * given the function and its arguments.
> + */
> +static int
> +lbox_fiber_create(struct lua_State *L)
> +{
> +	if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> +		luaL_error(L, "fiber.create(function, ...): bad arguments");
> +	if (fiber_checkstack())
> +		luaL_error(L, "fiber.create(): out of fiber stack");
> +	struct fiber *f = fiber_create(L);
> +	fiber_start(f);
> +	return 1;
> +}
> +
> +/**
> + * Create a fiber, schedule it for execution, but not invoke yet
> + */
> +static int
> +lbox_fiber_new(struct lua_State *L)
> +{
> +	if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> +		luaL_error(L, "fiber.new(function, ...): bad arguments");
> +	if (fiber_checkstack())
> +		luaL_error(L, "fiber.new(): out of fiber stack");
> +
> +	struct fiber *f = fiber_create(L);
> +	fiber_wakeup(f);
>  	return 1;
>  }
> 
> @@ -567,6 +609,55 @@ lbox_fiber_wakeup(struct lua_State *L)
>  	return 0;
>  }
> 
> +static int
> +lbox_fiber_join(struct lua_State *L)
> +{
> +	struct fiber *fiber = lbox_checkfiber(L, 1);
> +	struct lua_State *child_L = fiber_get_key(fiber, FIBER_KEY_LUA_STACK);
> +	fiber_join(fiber);
> +	struct error *e = NULL;
> +	int num_ret = 0;
> +	int coro_ref = 0;
> +	if (child_L != NULL) {
> +		coro_ref = lua_tointeger(child_L, -1);
> +		lua_pop(child_L, 1);
> +	}
> +	if (fiber->f_ret != 0) {
> +		/*
> +		 * After fiber_join the error of fiber being joined was moved to
> +		 * current fiber diag so we have to get it from there.
> +		 */
> +		assert(!diag_is_empty(&fiber()->diag));
> +		e = diag_last_error(&fiber()->diag);
> +		lua_pushboolean(L, false);
> +		luaT_pusherror(L, e);
> +		diag_clear(&fiber()->diag);
> +		num_ret = 1;
> +	} else {
> +		lua_pushboolean(L, true);
> +		if (child_L != NULL) {
> +			num_ret = lua_gettop(child_L);
> +			lua_xmove(child_L, L, num_ret);
> +		}
> +	}
> +	if (child_L != NULL)
> +		luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
> +	return num_ret + 1;
> +}
> +
> +static int
> +lbox_fiber_set_joinable(struct lua_State *L)
> +{
> +
> +	if (lua_gettop(L) != 2) {
> +		luaL_error(L, "fiber.set_joinable(id, yesno): bad arguments");
> +	}
> +	struct fiber *fiber = lbox_checkfiber(L, 1);
> +	bool yesno = lua_toboolean(L, 2);
> +	fiber_set_joinable(fiber, yesno);
> +	return 0;
> +}
> +
>  static const struct luaL_Reg lbox_fiber_meta [] = {
>  	{"id", lbox_fiber_id},
>  	{"name", lbox_fiber_name},
> @@ -575,6 +666,8 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
>  	{"testcancel", lbox_fiber_testcancel},
>  	{"__serialize", lbox_fiber_serialize},
>  	{"__tostring", lbox_fiber_tostring},
> +	{"join", lbox_fiber_join},
> +	{"set_joinable", lbox_fiber_set_joinable},
>  	{"wakeup", lbox_fiber_wakeup},
>  	{"__index", lbox_fiber_index},
>  	{NULL, NULL}
> @@ -589,9 +682,12 @@ static const struct luaL_Reg fiberlib[] = {
>  	{"find", lbox_fiber_find},
>  	{"kill", lbox_fiber_cancel},
>  	{"wakeup", lbox_fiber_wakeup},
> +	{"join", lbox_fiber_join},
> +	{"set_joinable", lbox_fiber_set_joinable},
>  	{"cancel", lbox_fiber_cancel},
>  	{"testcancel", lbox_fiber_testcancel},
>  	{"create", lbox_fiber_create},
> +	{"new", lbox_fiber_new},
>  	{"status", lbox_fiber_status},
>  	{"name", lbox_fiber_name},
>  	{NULL, NULL}
> diff --git a/test/app/fiber.result b/test/app/fiber.result
> index 08d38ef..6e81c30 100644
> --- a/test/app/fiber.result
> +++ b/test/app/fiber.result
> @@ -975,6 +975,174 @@ session_type
>  session_type = nil
>  ---
>  ...
> +-- gh-1397 fiber.new, fiber.join
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
> +---
> +...
> +function test1()
> +    f = fiber.new(err)
> +    f:set_joinable(true)
> +    local st, e = f:join()
> +    return st, e
> +end;
> +---
> +...
> +st, e = test1();
> +---
> +...
> +st;
> +---
> +- false
> +...
> +e:unpack();
> +---
> +- type: ClientError
> +  code: 1
> +  message: Illegal parameters, oh my
> +  trace:
> +  - file: '[string "function err() box.error(box.error.ILLEGAL_PA..."]'
> +    line: 1
> +...
> +flag = false;
> +---
> +...
> +function test2()
> +    f = fiber.new(function() flag = true  end)
> +    fiber.set_joinable(f, true)
> +    fiber.join(f)
> +end;
> +---
> +...
> +test2();
> +---
> +...
> +flag;
> +---
> +- true
> +...
> +function test3()
> +    f = fiber.new(function() return "hello" end)
> +    fiber.set_joinable(f, true)
> +    return fiber.join(f)
> +end;
> +---
> +...
> +test3();
> +---
> +- true
> +- hello
> +...
> +function test4()
> +    f = fiber.new(function (i) return i + 1  end, 1)
> +    fiber.set_joinable(f, true)
> +    return f:join()
> +end;
> +---
> +...
> +test4();
> +---
> +- true
> +- 2
> +...
> +function test_double_join()
> +    f = fiber.new(function (i) return i + 1  end, 1)
> +    fiber.set_joinable(f, true)
> +    f:join()
> +    return f:join()
> +end;
> +---
> +...
> +test_double_join();
> +---
> +- error: the fiber is dead
> +...
> +function test5()
> +    f = fiber.new(function() end)
> +    f:set_joinable(true)
> +    return f, f:status()
> +end;
> +---
> +...
> +local status;
> +---
> +...
> +f, status = test5();
> +---
> +...
> +status;
> +---
> +- suspended
> +...
> +f:status();
> +---
> +- suspended
> +...
> +f:join();
> +---
> +- true
> +...
> +f:status();
> +---
> +- dead
> +...
> +function test6()
> +    f = fiber.new(function() end)
> +    f:set_joinable(true)
> +    f:set_joinable(false)
> +    return f, f:status()
> +end;
> +---
> +...
> +f, status = test6();
> +---
> +...
> +status;
> +---
> +- suspended
> +...
> +f:status();
> +---
> +- dead
> +...
> +-- test side fiber in transaction
> +s = box.schema.space.create("test");
> +---
> +...
> +_ = s:create_index("prim", {parts={1, 'number'}});
> +---
> +...
> +flag = false;
> +---
> +...
> +function test7(i)
> +    box.begin()
> +    s:put{i}
> +    fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
> +    box.rollback()
> +end;
> +---
> +...
> +f = fiber.create(test7, 1);
> +---
> +...
> +while flag ~= true do fiber.sleep(0.001) end;
> +---
> +...
> +s:select{};
> +---
> +- - [2]
> +...
> +s:drop();
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
>  fiber = nil
>  ---
>  ...
> diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
> index ad3da09..b7ca8d4 100644
> --- a/test/app/fiber.test.lua
> +++ b/test/app/fiber.test.lua
> @@ -402,6 +402,88 @@ _ = fiber.create(fn1)
>  session_type
>  session_type = nil
> 
> +-- gh-1397 fiber.new, fiber.join
> +test_run:cmd("setopt delimiter ';'")
> +function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
> +function test1()
> +    f = fiber.new(err)
> +    f:set_joinable(true)
> +    local st, e = f:join()
> +    return st, e
> +end;
> +st, e = test1();
> +st;
> +e:unpack();
> +
> +flag = false;
> +function test2()
> +    f = fiber.new(function() flag = true  end)
> +    fiber.set_joinable(f, true)
> +    fiber.join(f)
> +end;
> +test2();
> +flag;
> +
> +function test3()
> +    f = fiber.new(function() return "hello" end)
> +    fiber.set_joinable(f, true)
> +    return fiber.join(f)
> +end;
> +test3();
> +
> +function test4()
> +    f = fiber.new(function (i) return i + 1  end, 1)
> +    fiber.set_joinable(f, true)
> +    return f:join()
> +end;
> +test4();
> +
> +function test_double_join()
> +    f = fiber.new(function (i) return i + 1  end, 1)
> +    fiber.set_joinable(f, true)
> +    f:join()
> +    return f:join()
> +end;
> +test_double_join();
> +
> +
> +function test5()
> +    f = fiber.new(function() end)
> +    f:set_joinable(true)
> +    return f, f:status()
> +end;
> +local status;
> +f, status = test5();
> +status;
> +f:status();
> +f:join();
> +f:status();
> +
> +function test6()
> +    f = fiber.new(function() end)
> +    f:set_joinable(true)
> +    f:set_joinable(false)
> +    return f, f:status()
> +end;
> +f, status = test6();
> +status;
> +f:status();
> +
> +-- test side fiber in transaction
> +s = box.schema.space.create("test");
> +_ = s:create_index("prim", {parts={1, 'number'}});
> +flag = false;
> +function test7(i)
> +    box.begin()
> +    s:put{i}
> +    fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
> +    box.rollback()
> +end;
> +f = fiber.create(test7, 1);
> +while flag ~= true do fiber.sleep(0.001) end;
> +s:select{};
> +s:drop();
> +test_run:cmd("setopt delimiter ''");
>  fiber = nil
> 
>  --

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20180220/ada0e786/attachment.sig>


More information about the Tarantool-patches mailing list