[patches] [fiber 1/1] fiber: Introduce fiber.join() related methods
imarkov
imarkov at tarantool.org
Tue Feb 20 18:18:24 MSK 2018
From: Ilya <markovilya197 at gmail.com>
Introduce two functions
* fiber.new() - create fiber, schedules it into the
ready queue but doesn't call it and doesn't yield.
Signature of the method is the same as for fiber.create
* fiber.join() - waits until the specified fiber finishes
its execution and returns result or error. Applicable only
to joinable fibers.
* fiber.set_joinable() - sets the fiber joinable flag
Closes #1397
---
src/fiber.c | 2 +
src/fiber.h | 5 +-
src/lua/fiber.c | 128 +++++++++++++++++++++++++++++++-----
test/app/fiber.result | 168 ++++++++++++++++++++++++++++++++++++++++++++++++
test/app/fiber.test.lua | 82 +++++++++++++++++++++++
5 files changed, 368 insertions(+), 17 deletions(-)
diff --git a/src/fiber.c b/src/fiber.c
index 9415739..9933eeb 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -383,6 +383,8 @@ fiber_join(struct fiber *fiber)
if (! fiber_is_dead(fiber)) {
rlist_add_tail_entry(&fiber->wake, fiber(), state);
+ }
+ while (!fiber_is_dead(fiber)) {
fiber_yield();
}
assert(fiber_is_dead(fiber));
diff --git a/src/fiber.h b/src/fiber.h
index 94b3f44..dcba245 100644
--- a/src/fiber.h
+++ b/src/fiber.h
@@ -105,7 +105,9 @@ enum fiber_key {
/** User global privilege and authentication token */
FIBER_KEY_USER = 3,
FIBER_KEY_MSG = 4,
- FIBER_KEY_MAX = 5
+ /** Storage for lua stack */
+ FIBER_KEY_LUA_STACK = 5,
+ FIBER_KEY_MAX = 6
};
/** \cond public */
@@ -670,6 +672,7 @@ fiber_cxx_invoke(fiber_func f, va_list ap)
#endif /* defined(__cplusplus) */
+
static inline void *
region_aligned_alloc_cb(void *ctx, size_t size)
{
diff --git a/src/lua/fiber.c b/src/lua/fiber.c
index 83b5825..312edd3 100644
--- a/src/lua/fiber.c
+++ b/src/lua/fiber.c
@@ -291,35 +291,39 @@ lbox_fiber_info(struct lua_State *L)
}
static int
-lua_fiber_run_f(va_list ap)
+lua_fiber_run_f(MAYBE_UNUSED va_list ap)
{
int result;
- int coro_ref = va_arg(ap, int);
- struct lua_State *L = va_arg(ap, struct lua_State *);
-
- result = luaT_call(L, lua_gettop(L) - 1, 0);
+ struct lua_State *L = (struct lua_State *)
+ fiber_get_key(fiber(), FIBER_KEY_LUA_STACK);
+ int coro_ref = lua_tointeger(L, -1);
+ lua_pop(L, 1);
+ result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
/* Destroy local storage */
int storage_ref = (int)(intptr_t)
fiber_get_key(fiber(), FIBER_KEY_LUA_STORAGE);
if (storage_ref > 0)
luaL_unref(L, LUA_REGISTRYINDEX, storage_ref);
- luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
+ /*
+ * If fiber is not joinable
+ * We can unref child stack here,
+ * otherwise we have to unref child stack in join
+ */
+ if (fiber()->flags & FIBER_IS_JOINABLE)
+ lua_pushinteger(L, coro_ref);
+ else
+ luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
+
return result;
}
/**
- * Create, resume and detach a fiber
- * given the function and its arguments.
+ * Utility function for fiber.create and fiber.new
*/
-static int
-lbox_fiber_create(struct lua_State *L)
+static struct fiber *
+fiber_create(struct lua_State *L)
{
- if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
- luaL_error(L, "fiber.create(function, ...): bad arguments");
- if (fiber_checkstack())
- luaL_error(L, "fiber.create(): out of fiber stack");
-
struct lua_State *child_L = lua_newthread(L);
int coro_ref = luaL_ref(L, LUA_REGISTRYINDEX);
@@ -333,7 +337,45 @@ lbox_fiber_create(struct lua_State *L)
lua_xmove(L, child_L, lua_gettop(L));
/* XXX: 'fiber' is leaked if this throws a Lua error. */
lbox_pushfiber(L, f->fid);
- fiber_start(f, coro_ref, child_L);
+ /* Pass coro_ref via lua stack so that we don't have to pass it
+ * as an argument of fiber_run function.
+ * No function will work with child_L until the function is called.
+ * At that time we can pop coro_ref from stack
+ */
+ lua_pushinteger(child_L, coro_ref);
+ fiber_set_key(f, FIBER_KEY_LUA_STACK, child_L);
+ return f;
+}
+
+/**
+ * Create, resume and detach a fiber
+ * given the function and its arguments.
+ */
+static int
+lbox_fiber_create(struct lua_State *L)
+{
+ if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
+ luaL_error(L, "fiber.create(function, ...): bad arguments");
+ if (fiber_checkstack())
+ luaL_error(L, "fiber.create(): out of fiber stack");
+ struct fiber *f = fiber_create(L);
+ fiber_start(f);
+ return 1;
+}
+
+/**
+ * Create a fiber, schedule it for execution, but not invoke yet
+ */
+static int
+lbox_fiber_new(struct lua_State *L)
+{
+ if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
+ luaL_error(L, "fiber.new(function, ...): bad arguments");
+ if (fiber_checkstack())
+ luaL_error(L, "fiber.new(): out of fiber stack");
+
+ struct fiber *f = fiber_create(L);
+ fiber_wakeup(f);
return 1;
}
@@ -567,6 +609,55 @@ lbox_fiber_wakeup(struct lua_State *L)
return 0;
}
+static int
+lbox_fiber_join(struct lua_State *L)
+{
+ struct fiber *fiber = lbox_checkfiber(L, 1);
+ struct lua_State *child_L = fiber_get_key(fiber, FIBER_KEY_LUA_STACK);
+ fiber_join(fiber);
+ struct error *e = NULL;
+ int num_ret = 0;
+ int coro_ref = 0;
+ if (child_L != NULL) {
+ coro_ref = lua_tointeger(child_L, -1);
+ lua_pop(child_L, 1);
+ }
+ if (fiber->f_ret != 0) {
+ /*
+ * After fiber_join the error of fiber being joined was moved to
+ * current fiber diag so we have to get it from there.
+ */
+ assert(!diag_is_empty(&fiber()->diag));
+ e = diag_last_error(&fiber()->diag);
+ lua_pushboolean(L, false);
+ luaT_pusherror(L, e);
+ diag_clear(&fiber()->diag);
+ num_ret = 1;
+ } else {
+ lua_pushboolean(L, true);
+ if (child_L != NULL) {
+ num_ret = lua_gettop(child_L);
+ lua_xmove(child_L, L, num_ret);
+ }
+ }
+ if (child_L != NULL)
+ luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
+ return num_ret + 1;
+}
+
+static int
+lbox_fiber_set_joinable(struct lua_State *L)
+{
+
+ if (lua_gettop(L) != 2) {
+ luaL_error(L, "fiber.set_joinable(id, yesno): bad arguments");
+ }
+ struct fiber *fiber = lbox_checkfiber(L, 1);
+ bool yesno = lua_toboolean(L, 2);
+ fiber_set_joinable(fiber, yesno);
+ return 0;
+}
+
static const struct luaL_Reg lbox_fiber_meta [] = {
{"id", lbox_fiber_id},
{"name", lbox_fiber_name},
@@ -575,6 +666,8 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
{"testcancel", lbox_fiber_testcancel},
{"__serialize", lbox_fiber_serialize},
{"__tostring", lbox_fiber_tostring},
+ {"join", lbox_fiber_join},
+ {"set_joinable", lbox_fiber_set_joinable},
{"wakeup", lbox_fiber_wakeup},
{"__index", lbox_fiber_index},
{NULL, NULL}
@@ -589,9 +682,12 @@ static const struct luaL_Reg fiberlib[] = {
{"find", lbox_fiber_find},
{"kill", lbox_fiber_cancel},
{"wakeup", lbox_fiber_wakeup},
+ {"join", lbox_fiber_join},
+ {"set_joinable", lbox_fiber_set_joinable},
{"cancel", lbox_fiber_cancel},
{"testcancel", lbox_fiber_testcancel},
{"create", lbox_fiber_create},
+ {"new", lbox_fiber_new},
{"status", lbox_fiber_status},
{"name", lbox_fiber_name},
{NULL, NULL}
diff --git a/test/app/fiber.result b/test/app/fiber.result
index 08d38ef..6e81c30 100644
--- a/test/app/fiber.result
+++ b/test/app/fiber.result
@@ -975,6 +975,174 @@ session_type
session_type = nil
---
...
+-- gh-1397 fiber.new, fiber.join
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
+---
+...
+function test1()
+ f = fiber.new(err)
+ f:set_joinable(true)
+ local st, e = f:join()
+ return st, e
+end;
+---
+...
+st, e = test1();
+---
+...
+st;
+---
+- false
+...
+e:unpack();
+---
+- type: ClientError
+ code: 1
+ message: Illegal parameters, oh my
+ trace:
+ - file: '[string "function err() box.error(box.error.ILLEGAL_PA..."]'
+ line: 1
+...
+flag = false;
+---
+...
+function test2()
+ f = fiber.new(function() flag = true end)
+ fiber.set_joinable(f, true)
+ fiber.join(f)
+end;
+---
+...
+test2();
+---
+...
+flag;
+---
+- true
+...
+function test3()
+ f = fiber.new(function() return "hello" end)
+ fiber.set_joinable(f, true)
+ return fiber.join(f)
+end;
+---
+...
+test3();
+---
+- true
+- hello
+...
+function test4()
+ f = fiber.new(function (i) return i + 1 end, 1)
+ fiber.set_joinable(f, true)
+ return f:join()
+end;
+---
+...
+test4();
+---
+- true
+- 2
+...
+function test_double_join()
+ f = fiber.new(function (i) return i + 1 end, 1)
+ fiber.set_joinable(f, true)
+ f:join()
+ return f:join()
+end;
+---
+...
+test_double_join();
+---
+- error: the fiber is dead
+...
+function test5()
+ f = fiber.new(function() end)
+ f:set_joinable(true)
+ return f, f:status()
+end;
+---
+...
+local status;
+---
+...
+f, status = test5();
+---
+...
+status;
+---
+- suspended
+...
+f:status();
+---
+- suspended
+...
+f:join();
+---
+- true
+...
+f:status();
+---
+- dead
+...
+function test6()
+ f = fiber.new(function() end)
+ f:set_joinable(true)
+ f:set_joinable(false)
+ return f, f:status()
+end;
+---
+...
+f, status = test6();
+---
+...
+status;
+---
+- suspended
+...
+f:status();
+---
+- dead
+...
+-- test side fiber in transaction
+s = box.schema.space.create("test");
+---
+...
+_ = s:create_index("prim", {parts={1, 'number'}});
+---
+...
+flag = false;
+---
+...
+function test7(i)
+ box.begin()
+ s:put{i}
+ fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
+ box.rollback()
+end;
+---
+...
+f = fiber.create(test7, 1);
+---
+...
+while flag ~= true do fiber.sleep(0.001) end;
+---
+...
+s:select{};
+---
+- - [2]
+...
+s:drop();
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
fiber = nil
---
...
diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
index ad3da09..b7ca8d4 100644
--- a/test/app/fiber.test.lua
+++ b/test/app/fiber.test.lua
@@ -402,6 +402,88 @@ _ = fiber.create(fn1)
session_type
session_type = nil
+-- gh-1397 fiber.new, fiber.join
+test_run:cmd("setopt delimiter ';'")
+function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
+function test1()
+ f = fiber.new(err)
+ f:set_joinable(true)
+ local st, e = f:join()
+ return st, e
+end;
+st, e = test1();
+st;
+e:unpack();
+
+flag = false;
+function test2()
+ f = fiber.new(function() flag = true end)
+ fiber.set_joinable(f, true)
+ fiber.join(f)
+end;
+test2();
+flag;
+
+function test3()
+ f = fiber.new(function() return "hello" end)
+ fiber.set_joinable(f, true)
+ return fiber.join(f)
+end;
+test3();
+
+function test4()
+ f = fiber.new(function (i) return i + 1 end, 1)
+ fiber.set_joinable(f, true)
+ return f:join()
+end;
+test4();
+
+function test_double_join()
+ f = fiber.new(function (i) return i + 1 end, 1)
+ fiber.set_joinable(f, true)
+ f:join()
+ return f:join()
+end;
+test_double_join();
+
+
+function test5()
+ f = fiber.new(function() end)
+ f:set_joinable(true)
+ return f, f:status()
+end;
+local status;
+f, status = test5();
+status;
+f:status();
+f:join();
+f:status();
+
+function test6()
+ f = fiber.new(function() end)
+ f:set_joinable(true)
+ f:set_joinable(false)
+ return f, f:status()
+end;
+f, status = test6();
+status;
+f:status();
+
+-- test side fiber in transaction
+s = box.schema.space.create("test");
+_ = s:create_index("prim", {parts={1, 'number'}});
+flag = false;
+function test7(i)
+ box.begin()
+ s:put{i}
+ fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
+ box.rollback()
+end;
+f = fiber.create(test7, 1);
+while flag ~= true do fiber.sleep(0.001) end;
+s:select{};
+s:drop();
+test_run:cmd("setopt delimiter ''");
fiber = nil
--
--
2.7.4
More information about the Tarantool-patches
mailing list