[patches] [fiber 1/1] fiber: Introduce fiber.join() related methods
Georgy Kirichenko
georgy at tarantool.org
Tue Feb 20 19:54:09 MSK 2018
Seems to be Ok
On Tuesday, February 20, 2018 6:18:24 PM MSK imarkov wrote:
> From: Ilya <markovilya197 at gmail.com>
>
> Introduce three functions:
> * fiber.new() - creates a fiber and schedules it into the
> ready queue, but doesn't call it and doesn't yield.
> The signature of the method is the same as for fiber.create().
> * fiber.join() - waits until the specified fiber finishes
> its execution and returns its result or error. Applicable only
> to joinable fibers.
> * fiber.set_joinable() - sets the fiber's joinable flag.
>
> Closes #1397
> ---
> src/fiber.c | 2 +
> src/fiber.h | 5 +-
> src/lua/fiber.c | 128 +++++++++++++++++++++++++++++++-----
> test/app/fiber.result | 168 ++++++++++++++++++++++++++++++++++++++++++++++++
> test/app/fiber.test.lua | 82 +++++++++++++++++++++++
> 5 files changed, 368 insertions(+), 17 deletions(-)
>
> diff --git a/src/fiber.c b/src/fiber.c
> index 9415739..9933eeb 100644
> --- a/src/fiber.c
> +++ b/src/fiber.c
> @@ -383,6 +383,8 @@ fiber_join(struct fiber *fiber)
>
> if (! fiber_is_dead(fiber)) {
> rlist_add_tail_entry(&fiber->wake, fiber(), state);
> + }
> + while (!fiber_is_dead(fiber)) {
> fiber_yield();
> }
> assert(fiber_is_dead(fiber));
> diff --git a/src/fiber.h b/src/fiber.h
> index 94b3f44..dcba245 100644
> --- a/src/fiber.h
> +++ b/src/fiber.h
> @@ -105,7 +105,9 @@ enum fiber_key {
> /** User global privilege and authentication token */
> FIBER_KEY_USER = 3,
> FIBER_KEY_MSG = 4,
> - FIBER_KEY_MAX = 5
> + /** Storage for lua stack */
> + FIBER_KEY_LUA_STACK = 5,
> + FIBER_KEY_MAX = 6
> };
>
> /** \cond public */
> @@ -670,6 +672,7 @@ fiber_cxx_invoke(fiber_func f, va_list ap)
>
> #endif /* defined(__cplusplus) */
>
> +
> static inline void *
> region_aligned_alloc_cb(void *ctx, size_t size)
> {
> diff --git a/src/lua/fiber.c b/src/lua/fiber.c
> index 83b5825..312edd3 100644
> --- a/src/lua/fiber.c
> +++ b/src/lua/fiber.c
> @@ -291,35 +291,39 @@ lbox_fiber_info(struct lua_State *L)
> }
>
> static int
> -lua_fiber_run_f(va_list ap)
> +lua_fiber_run_f(MAYBE_UNUSED va_list ap)
> {
> int result;
> - int coro_ref = va_arg(ap, int);
> - struct lua_State *L = va_arg(ap, struct lua_State *);
> -
> - result = luaT_call(L, lua_gettop(L) - 1, 0);
> + struct lua_State *L = (struct lua_State *)
> + fiber_get_key(fiber(), FIBER_KEY_LUA_STACK);
> + int coro_ref = lua_tointeger(L, -1);
> + lua_pop(L, 1);
> + result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
>
> /* Destroy local storage */
> int storage_ref = (int)(intptr_t)
> fiber_get_key(fiber(), FIBER_KEY_LUA_STORAGE);
> if (storage_ref > 0)
> luaL_unref(L, LUA_REGISTRYINDEX, storage_ref);
> - luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
> + /*
> + * If the fiber is not joinable,
> + * we can unref the child stack here;
> + * otherwise we have to unref the child stack in join.
> + */
> + if (fiber()->flags & FIBER_IS_JOINABLE)
> + lua_pushinteger(L, coro_ref);
> + else
> + luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
> +
> return result;
> }
>
> /**
> - * Create, resume and detach a fiber
> - * given the function and its arguments.
> + * Utility function for fiber.create and fiber.new
> */
> -static int
> -lbox_fiber_create(struct lua_State *L)
> +static struct fiber *
> +fiber_create(struct lua_State *L)
> {
> - if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> - luaL_error(L, "fiber.create(function, ...): bad arguments");
> - if (fiber_checkstack())
> - luaL_error(L, "fiber.create(): out of fiber stack");
> -
> struct lua_State *child_L = lua_newthread(L);
> int coro_ref = luaL_ref(L, LUA_REGISTRYINDEX);
>
> @@ -333,7 +337,45 @@ lbox_fiber_create(struct lua_State *L)
> lua_xmove(L, child_L, lua_gettop(L));
> /* XXX: 'fiber' is leaked if this throws a Lua error. */
> lbox_pushfiber(L, f->fid);
> - fiber_start(f, coro_ref, child_L);
> + /* Pass coro_ref via lua stack so that we don't have to pass it
> + * as an argument of fiber_run function.
> + * No function will work with child_L until the function is called.
> + * At that time we can pop coro_ref from stack
> + */
> + lua_pushinteger(child_L, coro_ref);
> + fiber_set_key(f, FIBER_KEY_LUA_STACK, child_L);
> + return f;
> +}
> +
> +/**
> + * Create, resume and detach a fiber
> + * given the function and its arguments.
> + */
> +static int
> +lbox_fiber_create(struct lua_State *L)
> +{
> + if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> + luaL_error(L, "fiber.create(function, ...): bad arguments");
> + if (fiber_checkstack())
> + luaL_error(L, "fiber.create(): out of fiber stack");
> + struct fiber *f = fiber_create(L);
> + fiber_start(f);
> + return 1;
> +}
> +
> +/**
> + * Create a fiber and schedule it for execution, but do not invoke it yet.
> + */
> +static int
> +lbox_fiber_new(struct lua_State *L)
> +{
> + if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
> + luaL_error(L, "fiber.new(function, ...): bad arguments");
> + if (fiber_checkstack())
> + luaL_error(L, "fiber.new(): out of fiber stack");
> +
> + struct fiber *f = fiber_create(L);
> + fiber_wakeup(f);
> return 1;
> }
>
> @@ -567,6 +609,55 @@ lbox_fiber_wakeup(struct lua_State *L)
> return 0;
> }
>
> +static int
> +lbox_fiber_join(struct lua_State *L)
> +{
> + struct fiber *fiber = lbox_checkfiber(L, 1);
> + struct lua_State *child_L = fiber_get_key(fiber, FIBER_KEY_LUA_STACK);
> + fiber_join(fiber);
> + struct error *e = NULL;
> + int num_ret = 0;
> + int coro_ref = 0;
> + if (child_L != NULL) {
> + coro_ref = lua_tointeger(child_L, -1);
> + lua_pop(child_L, 1);
> + }
> + if (fiber->f_ret != 0) {
> + /*
> + * After fiber_join the error of fiber being joined was moved to
> + * current fiber diag so we have to get it from there.
> + */
> + assert(!diag_is_empty(&fiber()->diag));
> + e = diag_last_error(&fiber()->diag);
> + lua_pushboolean(L, false);
> + luaT_pusherror(L, e);
> + diag_clear(&fiber()->diag);
> + num_ret = 1;
> + } else {
> + lua_pushboolean(L, true);
> + if (child_L != NULL) {
> + num_ret = lua_gettop(child_L);
> + lua_xmove(child_L, L, num_ret);
> + }
> + }
> + if (child_L != NULL)
> + luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
> + return num_ret + 1;
> +}
> +
> +static int
> +lbox_fiber_set_joinable(struct lua_State *L)
> +{
> +
> + if (lua_gettop(L) != 2) {
> + luaL_error(L, "fiber.set_joinable(id, yesno): bad arguments");
> + }
> + struct fiber *fiber = lbox_checkfiber(L, 1);
> + bool yesno = lua_toboolean(L, 2);
> + fiber_set_joinable(fiber, yesno);
> + return 0;
> +}
> +
> static const struct luaL_Reg lbox_fiber_meta [] = {
> {"id", lbox_fiber_id},
> {"name", lbox_fiber_name},
> @@ -575,6 +666,8 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
> {"testcancel", lbox_fiber_testcancel},
> {"__serialize", lbox_fiber_serialize},
> {"__tostring", lbox_fiber_tostring},
> + {"join", lbox_fiber_join},
> + {"set_joinable", lbox_fiber_set_joinable},
> {"wakeup", lbox_fiber_wakeup},
> {"__index", lbox_fiber_index},
> {NULL, NULL}
> @@ -589,9 +682,12 @@ static const struct luaL_Reg fiberlib[] = {
> {"find", lbox_fiber_find},
> {"kill", lbox_fiber_cancel},
> {"wakeup", lbox_fiber_wakeup},
> + {"join", lbox_fiber_join},
> + {"set_joinable", lbox_fiber_set_joinable},
> {"cancel", lbox_fiber_cancel},
> {"testcancel", lbox_fiber_testcancel},
> {"create", lbox_fiber_create},
> + {"new", lbox_fiber_new},
> {"status", lbox_fiber_status},
> {"name", lbox_fiber_name},
> {NULL, NULL}
> diff --git a/test/app/fiber.result b/test/app/fiber.result
> index 08d38ef..6e81c30 100644
> --- a/test/app/fiber.result
> +++ b/test/app/fiber.result
> @@ -975,6 +975,174 @@ session_type
> session_type = nil
> ---
> ...
> +-- gh-1397 fiber.new, fiber.join
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
> +---
> +...
> +function test1()
> + f = fiber.new(err)
> + f:set_joinable(true)
> + local st, e = f:join()
> + return st, e
> +end;
> +---
> +...
> +st, e = test1();
> +---
> +...
> +st;
> +---
> +- false
> +...
> +e:unpack();
> +---
> +- type: ClientError
> + code: 1
> + message: Illegal parameters, oh my
> + trace:
> + - file: '[string "function err() box.error(box.error.ILLEGAL_PA..."]'
> + line: 1
> +...
> +flag = false;
> +---
> +...
> +function test2()
> + f = fiber.new(function() flag = true end)
> + fiber.set_joinable(f, true)
> + fiber.join(f)
> +end;
> +---
> +...
> +test2();
> +---
> +...
> +flag;
> +---
> +- true
> +...
> +function test3()
> + f = fiber.new(function() return "hello" end)
> + fiber.set_joinable(f, true)
> + return fiber.join(f)
> +end;
> +---
> +...
> +test3();
> +---
> +- true
> +- hello
> +...
> +function test4()
> + f = fiber.new(function (i) return i + 1 end, 1)
> + fiber.set_joinable(f, true)
> + return f:join()
> +end;
> +---
> +...
> +test4();
> +---
> +- true
> +- 2
> +...
> +function test_double_join()
> + f = fiber.new(function (i) return i + 1 end, 1)
> + fiber.set_joinable(f, true)
> + f:join()
> + return f:join()
> +end;
> +---
> +...
> +test_double_join();
> +---
> +- error: the fiber is dead
> +...
> +function test5()
> + f = fiber.new(function() end)
> + f:set_joinable(true)
> + return f, f:status()
> +end;
> +---
> +...
> +local status;
> +---
> +...
> +f, status = test5();
> +---
> +...
> +status;
> +---
> +- suspended
> +...
> +f:status();
> +---
> +- suspended
> +...
> +f:join();
> +---
> +- true
> +...
> +f:status();
> +---
> +- dead
> +...
> +function test6()
> + f = fiber.new(function() end)
> + f:set_joinable(true)
> + f:set_joinable(false)
> + return f, f:status()
> +end;
> +---
> +...
> +f, status = test6();
> +---
> +...
> +status;
> +---
> +- suspended
> +...
> +f:status();
> +---
> +- dead
> +...
> +-- test side fiber in transaction
> +s = box.schema.space.create("test");
> +---
> +...
> +_ = s:create_index("prim", {parts={1, 'number'}});
> +---
> +...
> +flag = false;
> +---
> +...
> +function test7(i)
> + box.begin()
> + s:put{i}
> + fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
> + box.rollback()
> +end;
> +---
> +...
> +f = fiber.create(test7, 1);
> +---
> +...
> +while flag ~= true do fiber.sleep(0.001) end;
> +---
> +...
> +s:select{};
> +---
> +- - [2]
> +...
> +s:drop();
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> fiber = nil
> ---
> ...
> diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
> index ad3da09..b7ca8d4 100644
> --- a/test/app/fiber.test.lua
> +++ b/test/app/fiber.test.lua
> @@ -402,6 +402,88 @@ _ = fiber.create(fn1)
> session_type
> session_type = nil
>
> +-- gh-1397 fiber.new, fiber.join
> +test_run:cmd("setopt delimiter ';'")
> +function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
> +function test1()
> + f = fiber.new(err)
> + f:set_joinable(true)
> + local st, e = f:join()
> + return st, e
> +end;
> +st, e = test1();
> +st;
> +e:unpack();
> +
> +flag = false;
> +function test2()
> + f = fiber.new(function() flag = true end)
> + fiber.set_joinable(f, true)
> + fiber.join(f)
> +end;
> +test2();
> +flag;
> +
> +function test3()
> + f = fiber.new(function() return "hello" end)
> + fiber.set_joinable(f, true)
> + return fiber.join(f)
> +end;
> +test3();
> +
> +function test4()
> + f = fiber.new(function (i) return i + 1 end, 1)
> + fiber.set_joinable(f, true)
> + return f:join()
> +end;
> +test4();
> +
> +function test_double_join()
> + f = fiber.new(function (i) return i + 1 end, 1)
> + fiber.set_joinable(f, true)
> + f:join()
> + return f:join()
> +end;
> +test_double_join();
> +
> +
> +function test5()
> + f = fiber.new(function() end)
> + f:set_joinable(true)
> + return f, f:status()
> +end;
> +local status;
> +f, status = test5();
> +status;
> +f:status();
> +f:join();
> +f:status();
> +
> +function test6()
> + f = fiber.new(function() end)
> + f:set_joinable(true)
> + f:set_joinable(false)
> + return f, f:status()
> +end;
> +f, status = test6();
> +status;
> +f:status();
> +
> +-- test side fiber in transaction
> +s = box.schema.space.create("test");
> +_ = s:create_index("prim", {parts={1, 'number'}});
> +flag = false;
> +function test7(i)
> + box.begin()
> + s:put{i}
> + fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
> + box.rollback()
> +end;
> +f = fiber.create(test7, 1);
> +while flag ~= true do fiber.sleep(0.001) end;
> +s:select{};
> +s:drop();
> +test_run:cmd("setopt delimiter ''");
> fiber = nil
>
> --
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20180220/ada0e786/attachment.sig>
More information about the Tarantool-patches
mailing list