[patches] [fiber 1/1] fiber: Introduce fiber.join() related methods
Georgy Kirichenko
georgy at tarantool.org
Fri Feb 16 11:14:44 MSK 2018
You are on a right way but something should be fixed
1. Try to use existing fiber_join instead of introduce a new one. There isn't a
big deal to erase a current fiber diagnostic.
2. It would be nice to return from a join a pair ov values: true/false and
result/error like a pcall does
3. I think we should select a better name instead of FIBER_RESULT_KEY
On Thursday, February 15, 2018 2:49:33 PM MSK imarkov wrote:
> From: Ilya <markovilya197 at gmail.com>
>
> Introduce two functions
> * fiber.new() - create fiber, schedules it into the
> ready queue but doesn't call it and doesn't yield.
> Signature of the method is the same as for fiber.create
> * fiber.join() - waits until the specified fiber finishes
> its execution and returns result or error. Applicable only
> to joinable fibers.
> * fiber.set_joinable() - sets the fiber joinable flag
>
> Closes #1397
> ---
> src/fiber.c | 16 ++++--
> src/fiber.h | 12 +++-
> src/lua/fiber.c | 113 +++++++++++++++++++++++++++++++++-----
> test/app/fiber.result | 142
> ++++++++++++++++++++++++++++++++++++++++++++++++ test/app/fiber.test.lua |
> 72 ++++++++++++++++++++++++
> 5 files changed, 333 insertions(+), 22 deletions(-)
>
> diff --git a/src/fiber.c b/src/fiber.c
> index 9415739..cd3263d 100644
> --- a/src/fiber.c
> +++ b/src/fiber.c
> @@ -148,8 +148,6 @@ fiber_attr_getstacksize(struct fiber_attr *fiber_attr)
> fiber_attr_default.stack_size;
> }
>
> -static void
> -fiber_recycle(struct fiber *fiber);
>
> static void
> fiber_destroy(struct cord *cord, struct fiber *f);
> @@ -376,8 +374,8 @@ fiber_reschedule(void)
> fiber_yield();
> }
>
> -int
> -fiber_join(struct fiber *fiber)
> +void
> +fiber_join_wait(struct fiber *fiber)
> {
> assert(fiber->flags & FIBER_IS_JOINABLE);
>
> @@ -386,8 +384,14 @@ fiber_join(struct fiber *fiber)
> fiber_yield();
> }
> assert(fiber_is_dead(fiber));
> - /* Move exception to the caller */
> +}
> +
> +int
> +fiber_join(struct fiber *fiber)
> +{
> + fiber_join_wait(fiber);
> int ret = fiber->f_ret;
> + /* Move exception to the caller */
> if (ret != 0) {
> assert(!diag_is_empty(&fiber->diag));
> diag_move(&fiber->diag, &fiber()->diag);
> @@ -598,7 +602,7 @@ fiber_reset(struct fiber *fiber)
> }
>
> /** Destroy an active fiber and prepare it for reuse. */
> -static void
> +void
> fiber_recycle(struct fiber *fiber)
> {
> /* no exceptions are leaking */
> diff --git a/src/fiber.h b/src/fiber.h
> index 94b3f44..72caaf9 100644
> --- a/src/fiber.h
> +++ b/src/fiber.h
> @@ -105,7 +105,8 @@ enum fiber_key {
> /** User global privilege and authentication token */
> FIBER_KEY_USER = 3,
> FIBER_KEY_MSG = 4,
> - FIBER_KEY_MAX = 5
> + FIBER_KEY_RESULT = 5,
> + FIBER_KEY_MAX = 6
> };
>
> /** \cond public */
> @@ -625,6 +626,14 @@ fiber_c_invoke(fiber_func f, va_list ap)
> return f(ap);
> }
>
> +/** Destroy an active fiber and prepare it for reuse. */
> +void
> +fiber_recycle(struct fiber *fiber);
> +
> +/** Wait until fiber is finished. Applicable only to joinable fibers */
> +void
> +fiber_join_wait(struct fiber *fiber);
> +
> #if defined(__cplusplus)
> } /* extern "C" */
>
> @@ -670,6 +679,7 @@ fiber_cxx_invoke(fiber_func f, va_list ap)
>
> #endif /* defined(__cplusplus) */
>
> +
> static inline void *
> region_aligned_alloc_cb(void *ctx, size_t size)
> {
> diff --git a/src/lua/fiber.c b/src/lua/fiber.c
> index 83b5825..54b0341 100644
> --- a/src/lua/fiber.c
> +++ b/src/lua/fiber.c
> @@ -291,13 +291,14 @@ lbox_fiber_info(struct lua_State *L)
> }
>
> static int
> -lua_fiber_run_f(va_list ap)
> +lua_fiber_run_f(MAYBE_UNUSED va_list ap)
> {
> int result;
> - int coro_ref = va_arg(ap, int);
> - struct lua_State *L = va_arg(ap, struct lua_State *);
> -
> - result = luaT_call(L, lua_gettop(L) - 1, 0);
> + struct lua_State *L = (struct lua_State *)
> + fiber_get_key(fiber(), FIBER_KEY_RESULT);
> + int coro_ref = lua_tointeger(L, -1);
> + lua_pop(L, 1);
> + result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
>
> /* Destroy local storage */
> int storage_ref = (int)(intptr_t)
> @@ -309,17 +310,11 @@ lua_fiber_run_f(va_list ap)
> }
>
> /**
> - * Create, resume and detach a fiber
> - * given the function and its arguments.
> + * Utility function for fiber.create and fiber.new
> */
> -static int
> -lbox_fiber_create(struct lua_State *L)
> +static struct fiber *
> +fiber_create(struct lua_State *L)
> {
> - if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> - luaL_error(L, "fiber.create(function, ...): bad arguments");
> - if (fiber_checkstack())
> - luaL_error(L, "fiber.create(): out of fiber stack");
> -
> struct lua_State *child_L = lua_newthread(L);
> int coro_ref = luaL_ref(L, LUA_REGISTRYINDEX);
>
> @@ -333,7 +328,45 @@ lbox_fiber_create(struct lua_State *L)
> lua_xmove(L, child_L, lua_gettop(L));
> /* XXX: 'fiber' is leaked if this throws a Lua error. */
> lbox_pushfiber(L, f->fid);
> - fiber_start(f, coro_ref, child_L);
> + /* Pass coro_ref via lua stack so that we don't have to pass it
> + * as an argument of fiber_run function.
> + * No function will work with child_L until the function is called.
> + * At that time we can pop coro_ref from stack
> + */
> + lua_pushinteger(child_L, coro_ref);
> + fiber_set_key(f, FIBER_KEY_RESULT, child_L);
> + return f;
> +}
> +
> +/**
> + * Create, resume and detach a fiber
> + * given the function and its arguments.
> + */
> +static int
> +lbox_fiber_create(struct lua_State *L)
> +{
> + if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> + luaL_error(L, "fiber.create(function, ...): bad arguments");
> + if (fiber_checkstack())
> + luaL_error(L, "fiber.create(): out of fiber stack");
> + struct fiber *f = fiber_create(L);
> + fiber_start(f);
> + return 1;
> +}
> +
> +/**
> + * Create a fiber, schedule it for execution, but not invoke yet
> + */
> +static int
> +lbox_fiber_new(struct lua_State *L)
> +{
> + if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> + luaL_error(L, "fiber.new(function, ...): bad arguments");
> + if (fiber_checkstack())
> + luaL_error(L, "fiber.new(): out of fiber stack");
> +
> + struct fiber *f = fiber_create(L);
> + fiber_wakeup(f);
> return 1;
> }
>
> @@ -567,6 +600,51 @@ lbox_fiber_wakeup(struct lua_State *L)
> return 0;
> }
>
> +static int
> +lbox_fiber_join(struct lua_State *L)
> +{
> + struct fiber *fiber = lbox_checkfiber(L, 1);
> + fiber_join_wait(fiber);
> + bool fiber_was_cancelled = fiber->flags & FIBER_IS_CANCELLED;
> + struct error *e = NULL;
> + if (fiber->f_ret != 0 && !fiber_was_cancelled) {
> + /*
> + * We do not want to spoil the diag of the current
> + * fiber so not calling luaT_error().
> + */
> + assert(!diag_is_empty(&fiber->diag));
> + e = diag_last_error(&fiber->diag);
> + error_ref(e);
> + luaT_pusherror(L, e);
> + diag_clear(&fiber->diag);
> + }
> + struct lua_State *child_L = fiber_get_key(fiber, FIBER_KEY_RESULT);
> + int num_ret = 0;
> + if (child_L != NULL) {
> + num_ret = lua_gettop(child_L);
> + lua_xmove(child_L, L, num_ret);
> + }
> + fiber_recycle(fiber);
> + if (e != NULL) {
> + error_unref(e);
> + lua_error(L);
> + }
> + return num_ret;
> +}
> +
> +static int
> +lbox_fiber_set_joinable(struct lua_State *L)
> +{
> +
> + if (lua_gettop(L) != 2) {
> + luaL_error(L, "fiber.set_joinable(id, yesno): bad arguments");
> + }
> + struct fiber *fiber = lbox_checkfiber(L, 1);
> + bool yesno = lua_toboolean(L, 2);
> + fiber_set_joinable(fiber, yesno);
> + return 0;
> +}
> +
> static const struct luaL_Reg lbox_fiber_meta [] = {
> {"id", lbox_fiber_id},
> {"name", lbox_fiber_name},
> @@ -575,6 +653,8 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
> {"testcancel", lbox_fiber_testcancel},
> {"__serialize", lbox_fiber_serialize},
> {"__tostring", lbox_fiber_tostring},
> + {"join", lbox_fiber_join},
> + {"set_joinable", lbox_fiber_set_joinable},
> {"wakeup", lbox_fiber_wakeup},
> {"__index", lbox_fiber_index},
> {NULL, NULL}
> @@ -589,9 +669,12 @@ static const struct luaL_Reg fiberlib[] = {
> {"find", lbox_fiber_find},
> {"kill", lbox_fiber_cancel},
> {"wakeup", lbox_fiber_wakeup},
> + {"join", lbox_fiber_join},
> + {"set_joinable", lbox_fiber_set_joinable},
> {"cancel", lbox_fiber_cancel},
> {"testcancel", lbox_fiber_testcancel},
> {"create", lbox_fiber_create},
> + {"new", lbox_fiber_new},
> {"status", lbox_fiber_status},
> {"name", lbox_fiber_name},
> {NULL, NULL}
> diff --git a/test/app/fiber.result b/test/app/fiber.result
> index 08d38ef..3ecdbd0 100644
> --- a/test/app/fiber.result
> +++ b/test/app/fiber.result
> @@ -975,6 +975,148 @@ session_type
> session_type = nil
> ---
> ...
> +-- gh-1397 fiber.new, fiber.join
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
> +---
> +...
> +function test1()
> + f = fiber.new(err)
> + f:set_joinable(true)
> + local e = f:join()
> + local e2 = f:join()
> + return e, e2
> +end;
> +---
> +...
> +test1();
> +---
> +- error: Illegal parameters, oh my
> +...
> +flag = false;
> +---
> +...
> +function test2()
> + f = fiber.new(function() flag = true end)
> + fiber.set_joinable(f, true)
> + fiber.join(f)
> +end;
> +---
> +...
> +test2();
> +---
> +...
> +flag;
> +---
> +- true
> +...
> +function test3()
> + f = fiber.new(function() return "hello" end)
> + fiber.set_joinable(f, true)
> + return fiber.join(f)
> +end;
> +---
> +...
> +test3();
> +---
> +- hello
> +...
> +function test4()
> + f = fiber.new(function (i) return i + 1 end, 1)
> + fiber.set_joinable(f, true)
> + return f:join()
> +end;
> +---
> +...
> +test4();
> +---
> +- 2
> +...
> +function test5()
> + f = fiber.new(function() end)
> + f:set_joinable(true)
> + return f, f:status()
> +end;
> +---
> +...
> +local status;
> +---
> +...
> +f, status = test5();
> +---
> +...
> +status;
> +---
> +- suspended
> +...
> +f:status();
> +---
> +- suspended
> +...
> +f:join();
> +---
> +...
> +f:status();
> +---
> +- dead
> +...
> +function test6()
> + f = fiber.new(function() end)
> + f:set_joinable(true)
> + f:set_joinable(false)
> + return f, f:status()
> +end;
> +---
> +...
> +f, status = test6();
> +---
> +...
> +status;
> +---
> +- suspended
> +...
> +f:status();
> +---
> +- dead
> +...
> +-- test side fiber in transaction
> +s = box.schema.space.create("test");
> +---
> +...
> +_ = s:create_index("prim", {parts={1, 'number'}});
> +---
> +...
> +flag = false;
> +---
> +...
> +function test7(i)
> + box.begin()
> + s:put{i}
> + fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
> + box.rollback()
> +end;
> +---
> +...
> +f = fiber.create(test7, 1);
> +---
> +...
> +while flag ~= true do fiber.sleep(0.001) end;
> +---
> +...
> +s:select{};
> +---
> +- - [2]
> +...
> +s:drop();
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> fiber = nil
> ---
> ...
> diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
> index ad3da09..b509f4a 100644
> --- a/test/app/fiber.test.lua
> +++ b/test/app/fiber.test.lua
> @@ -402,6 +402,78 @@ _ = fiber.create(fn1)
> session_type
> session_type = nil
>
> +-- gh-1397 fiber.new, fiber.join
> +test_run:cmd("setopt delimiter ';'")
> +function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
> +function test1()
> + f = fiber.new(err)
> + f:set_joinable(true)
> + local e = f:join()
> + local e2 = f:join()
> + return e, e2
> +end;
> +test1();
> +
> +flag = false;
> +function test2()
> + f = fiber.new(function() flag = true end)
> + fiber.set_joinable(f, true)
> + fiber.join(f)
> +end;
> +test2();
> +flag;
> +
> +function test3()
> + f = fiber.new(function() return "hello" end)
> + fiber.set_joinable(f, true)
> + return fiber.join(f)
> +end;
> +test3();
> +
> +function test4()
> + f = fiber.new(function (i) return i + 1 end, 1)
> + fiber.set_joinable(f, true)
> + return f:join()
> +end;
> +test4();
> +
> +function test5()
> + f = fiber.new(function() end)
> + f:set_joinable(true)
> + return f, f:status()
> +end;
> +local status;
> +f, status = test5();
> +status;
> +f:status();
> +f:join();
> +f:status();
> +
> +function test6()
> + f = fiber.new(function() end)
> + f:set_joinable(true)
> + f:set_joinable(false)
> + return f, f:status()
> +end;
> +f, status = test6();
> +status;
> +f:status();
> +
> +-- test side fiber in transaction
> +s = box.schema.space.create("test");
> +_ = s:create_index("prim", {parts={1, 'number'}});
> +flag = false;
> +function test7(i)
> + box.begin()
> + s:put{i}
> + fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
> + box.rollback()
> +end;
> +f = fiber.create(test7, 1);
> +while flag ~= true do fiber.sleep(0.001) end;
> +s:select{};
> +s:drop();
> +test_run:cmd("setopt delimiter ''");
> fiber = nil
>
> --
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20180216/88ed337b/attachment.sig>
More information about the Tarantool-patches
mailing list