[patches] [fiber 1/1] fiber: Introduce fiber.join() related methods
imarkov
imarkov at tarantool.org
Thu Feb 15 14:49:33 MSK 2018
From: Ilya <markovilya197 at gmail.com>
Introduce three functions:
* fiber.new() - creates a fiber and schedules it into the
ready queue, but doesn't call it and doesn't yield.
The signature of the method is the same as for fiber.create().
* fiber.join() - waits until the specified fiber finishes
its execution, then returns the fiber's results or raises
the error it failed with. Applicable only to joinable fibers.
* fiber.set_joinable() - sets the fiber joinable flag
Closes #1397
---
src/fiber.c | 16 ++++--
src/fiber.h | 12 +++-
src/lua/fiber.c | 113 +++++++++++++++++++++++++++++++++-----
test/app/fiber.result | 142 ++++++++++++++++++++++++++++++++++++++++++++++++
test/app/fiber.test.lua | 72 ++++++++++++++++++++++++
5 files changed, 333 insertions(+), 22 deletions(-)
diff --git a/src/fiber.c b/src/fiber.c
index 9415739..cd3263d 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -148,8 +148,6 @@ fiber_attr_getstacksize(struct fiber_attr *fiber_attr)
fiber_attr_default.stack_size;
}
-static void
-fiber_recycle(struct fiber *fiber);
static void
fiber_destroy(struct cord *cord, struct fiber *f);
@@ -376,8 +374,8 @@ fiber_reschedule(void)
fiber_yield();
}
-int
-fiber_join(struct fiber *fiber)
+void
+fiber_join_wait(struct fiber *fiber)
{
assert(fiber->flags & FIBER_IS_JOINABLE);
@@ -386,8 +384,14 @@ fiber_join(struct fiber *fiber)
fiber_yield();
}
assert(fiber_is_dead(fiber));
- /* Move exception to the caller */
+}
+
+int
+fiber_join(struct fiber *fiber)
+{
+ fiber_join_wait(fiber);
int ret = fiber->f_ret;
+ /* Move exception to the caller */
if (ret != 0) {
assert(!diag_is_empty(&fiber->diag));
diag_move(&fiber->diag, &fiber()->diag);
@@ -598,7 +602,7 @@ fiber_reset(struct fiber *fiber)
}
/** Destroy an active fiber and prepare it for reuse. */
-static void
+void
fiber_recycle(struct fiber *fiber)
{
/* no exceptions are leaking */
diff --git a/src/fiber.h b/src/fiber.h
index 94b3f44..72caaf9 100644
--- a/src/fiber.h
+++ b/src/fiber.h
@@ -105,7 +105,8 @@ enum fiber_key {
/** User global privilege and authentication token */
FIBER_KEY_USER = 3,
FIBER_KEY_MSG = 4,
- FIBER_KEY_MAX = 5
+ FIBER_KEY_RESULT = 5,
+ FIBER_KEY_MAX = 6
};
/** \cond public */
@@ -625,6 +626,14 @@ fiber_c_invoke(fiber_func f, va_list ap)
return f(ap);
}
+/** Destroy an active fiber and prepare it for reuse. */
+void
+fiber_recycle(struct fiber *fiber);
+
+/** Wait until fiber is finished. Applicable only to joinable fibers */
+void
+fiber_join_wait(struct fiber *fiber);
+
#if defined(__cplusplus)
} /* extern "C" */
@@ -670,6 +679,7 @@ fiber_cxx_invoke(fiber_func f, va_list ap)
#endif /* defined(__cplusplus) */
+
static inline void *
region_aligned_alloc_cb(void *ctx, size_t size)
{
diff --git a/src/lua/fiber.c b/src/lua/fiber.c
index 83b5825..54b0341 100644
--- a/src/lua/fiber.c
+++ b/src/lua/fiber.c
@@ -291,13 +291,14 @@ lbox_fiber_info(struct lua_State *L)
}
static int
-lua_fiber_run_f(va_list ap)
+lua_fiber_run_f(MAYBE_UNUSED va_list ap)
{
int result;
- int coro_ref = va_arg(ap, int);
- struct lua_State *L = va_arg(ap, struct lua_State *);
-
- result = luaT_call(L, lua_gettop(L) - 1, 0);
+ struct lua_State *L = (struct lua_State *)
+ fiber_get_key(fiber(), FIBER_KEY_RESULT);
+ int coro_ref = lua_tointeger(L, -1);
+ lua_pop(L, 1);
+ result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
/* Destroy local storage */
int storage_ref = (int)(intptr_t)
@@ -309,17 +310,11 @@ lua_fiber_run_f(va_list ap)
}
/**
- * Create, resume and detach a fiber
- * given the function and its arguments.
+ * Utility function for fiber.create and fiber.new
*/
-static int
-lbox_fiber_create(struct lua_State *L)
+static struct fiber *
+fiber_create(struct lua_State *L)
{
- if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
- luaL_error(L, "fiber.create(function, ...): bad arguments");
- if (fiber_checkstack())
- luaL_error(L, "fiber.create(): out of fiber stack");
-
struct lua_State *child_L = lua_newthread(L);
int coro_ref = luaL_ref(L, LUA_REGISTRYINDEX);
@@ -333,7 +328,45 @@ lbox_fiber_create(struct lua_State *L)
lua_xmove(L, child_L, lua_gettop(L));
/* XXX: 'fiber' is leaked if this throws a Lua error. */
lbox_pushfiber(L, f->fid);
- fiber_start(f, coro_ref, child_L);
+ /* Pass coro_ref via lua stack so that we don't have to pass it
+ * as an argument of fiber_run function.
+ * No function will work with child_L until the function is called.
+ * At that time we can pop coro_ref from stack
+ */
+ lua_pushinteger(child_L, coro_ref);
+ fiber_set_key(f, FIBER_KEY_RESULT, child_L);
+ return f;
+}
+
+/**
+ * Create, resume and detach a fiber
+ * given the function and its arguments.
+ */
+static int
+lbox_fiber_create(struct lua_State *L)
+{
+ if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
+ luaL_error(L, "fiber.create(function, ...): bad arguments");
+ if (fiber_checkstack())
+ luaL_error(L, "fiber.create(): out of fiber stack");
+ struct fiber *f = fiber_create(L);
+ fiber_start(f);
+ return 1;
+}
+
+/**
+ * Create a fiber, schedule it for execution, but not invoke yet
+ */
+static int
+lbox_fiber_new(struct lua_State *L)
+{
+ if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
+ luaL_error(L, "fiber.new(function, ...): bad arguments");
+ if (fiber_checkstack())
+ luaL_error(L, "fiber.new(): out of fiber stack");
+
+ struct fiber *f = fiber_create(L);
+ fiber_wakeup(f);
return 1;
}
@@ -567,6 +600,51 @@ lbox_fiber_wakeup(struct lua_State *L)
return 0;
}
+static int
+lbox_fiber_join(struct lua_State *L)
+{
+ struct fiber *fiber = lbox_checkfiber(L, 1);
+ fiber_join_wait(fiber);
+ bool fiber_was_cancelled = fiber->flags & FIBER_IS_CANCELLED;
+ struct error *e = NULL;
+ if (fiber->f_ret != 0 && !fiber_was_cancelled) {
+ /*
+ * We do not want to spoil the diag of the current
+ * fiber so not calling luaT_error().
+ */
+ assert(!diag_is_empty(&fiber->diag));
+ e = diag_last_error(&fiber->diag);
+ error_ref(e);
+ luaT_pusherror(L, e);
+ diag_clear(&fiber->diag);
+ }
+ struct lua_State *child_L = fiber_get_key(fiber, FIBER_KEY_RESULT);
+ int num_ret = 0;
+ if (child_L != NULL) {
+ num_ret = lua_gettop(child_L);
+ lua_xmove(child_L, L, num_ret);
+ }
+ fiber_recycle(fiber);
+ if (e != NULL) {
+ error_unref(e);
+ lua_error(L);
+ }
+ return num_ret;
+}
+
+static int
+lbox_fiber_set_joinable(struct lua_State *L)
+{
+
+ if (lua_gettop(L) != 2) {
+ luaL_error(L, "fiber.set_joinable(id, yesno): bad arguments");
+ }
+ struct fiber *fiber = lbox_checkfiber(L, 1);
+ bool yesno = lua_toboolean(L, 2);
+ fiber_set_joinable(fiber, yesno);
+ return 0;
+}
+
static const struct luaL_Reg lbox_fiber_meta [] = {
{"id", lbox_fiber_id},
{"name", lbox_fiber_name},
@@ -575,6 +653,8 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
{"testcancel", lbox_fiber_testcancel},
{"__serialize", lbox_fiber_serialize},
{"__tostring", lbox_fiber_tostring},
+ {"join", lbox_fiber_join},
+ {"set_joinable", lbox_fiber_set_joinable},
{"wakeup", lbox_fiber_wakeup},
{"__index", lbox_fiber_index},
{NULL, NULL}
@@ -589,9 +669,12 @@ static const struct luaL_Reg fiberlib[] = {
{"find", lbox_fiber_find},
{"kill", lbox_fiber_cancel},
{"wakeup", lbox_fiber_wakeup},
+ {"join", lbox_fiber_join},
+ {"set_joinable", lbox_fiber_set_joinable},
{"cancel", lbox_fiber_cancel},
{"testcancel", lbox_fiber_testcancel},
{"create", lbox_fiber_create},
+ {"new", lbox_fiber_new},
{"status", lbox_fiber_status},
{"name", lbox_fiber_name},
{NULL, NULL}
diff --git a/test/app/fiber.result b/test/app/fiber.result
index 08d38ef..3ecdbd0 100644
--- a/test/app/fiber.result
+++ b/test/app/fiber.result
@@ -975,6 +975,148 @@ session_type
session_type = nil
---
...
+-- gh-1397 fiber.new, fiber.join
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
+---
+...
+function test1()
+ f = fiber.new(err)
+ f:set_joinable(true)
+ local e = f:join()
+ local e2 = f:join()
+ return e, e2
+end;
+---
+...
+test1();
+---
+- error: Illegal parameters, oh my
+...
+flag = false;
+---
+...
+function test2()
+ f = fiber.new(function() flag = true end)
+ fiber.set_joinable(f, true)
+ fiber.join(f)
+end;
+---
+...
+test2();
+---
+...
+flag;
+---
+- true
+...
+function test3()
+ f = fiber.new(function() return "hello" end)
+ fiber.set_joinable(f, true)
+ return fiber.join(f)
+end;
+---
+...
+test3();
+---
+- hello
+...
+function test4()
+ f = fiber.new(function (i) return i + 1 end, 1)
+ fiber.set_joinable(f, true)
+ return f:join()
+end;
+---
+...
+test4();
+---
+- 2
+...
+function test5()
+ f = fiber.new(function() end)
+ f:set_joinable(true)
+ return f, f:status()
+end;
+---
+...
+local status;
+---
+...
+f, status = test5();
+---
+...
+status;
+---
+- suspended
+...
+f:status();
+---
+- suspended
+...
+f:join();
+---
+...
+f:status();
+---
+- dead
+...
+function test6()
+ f = fiber.new(function() end)
+ f:set_joinable(true)
+ f:set_joinable(false)
+ return f, f:status()
+end;
+---
+...
+f, status = test6();
+---
+...
+status;
+---
+- suspended
+...
+f:status();
+---
+- dead
+...
+-- test side fiber in transaction
+s = box.schema.space.create("test");
+---
+...
+_ = s:create_index("prim", {parts={1, 'number'}});
+---
+...
+flag = false;
+---
+...
+function test7(i)
+ box.begin()
+ s:put{i}
+ fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
+ box.rollback()
+end;
+---
+...
+f = fiber.create(test7, 1);
+---
+...
+while flag ~= true do fiber.sleep(0.001) end;
+---
+...
+s:select{};
+---
+- - [2]
+...
+s:drop();
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
fiber = nil
---
...
diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
index ad3da09..b509f4a 100644
--- a/test/app/fiber.test.lua
+++ b/test/app/fiber.test.lua
@@ -402,6 +402,78 @@ _ = fiber.create(fn1)
session_type
session_type = nil
+-- gh-1397 fiber.new, fiber.join
+test_run:cmd("setopt delimiter ';'")
+function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
+function test1()
+ f = fiber.new(err)
+ f:set_joinable(true)
+ local e = f:join()
+ local e2 = f:join()
+ return e, e2
+end;
+test1();
+
+flag = false;
+function test2()
+ f = fiber.new(function() flag = true end)
+ fiber.set_joinable(f, true)
+ fiber.join(f)
+end;
+test2();
+flag;
+
+function test3()
+ f = fiber.new(function() return "hello" end)
+ fiber.set_joinable(f, true)
+ return fiber.join(f)
+end;
+test3();
+
+function test4()
+ f = fiber.new(function (i) return i + 1 end, 1)
+ fiber.set_joinable(f, true)
+ return f:join()
+end;
+test4();
+
+function test5()
+ f = fiber.new(function() end)
+ f:set_joinable(true)
+ return f, f:status()
+end;
+local status;
+f, status = test5();
+status;
+f:status();
+f:join();
+f:status();
+
+function test6()
+ f = fiber.new(function() end)
+ f:set_joinable(true)
+ f:set_joinable(false)
+ return f, f:status()
+end;
+f, status = test6();
+status;
+f:status();
+
+-- test side fiber in transaction
+s = box.schema.space.create("test");
+_ = s:create_index("prim", {parts={1, 'number'}});
+flag = false;
+function test7(i)
+ box.begin()
+ s:put{i}
+ fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
+ box.rollback()
+end;
+f = fiber.create(test7, 1);
+while flag ~= true do fiber.sleep(0.001) end;
+s:select{};
+s:drop();
+test_run:cmd("setopt delimiter ''");
fiber = nil
--
--
2.7.4
More information about the Tarantool-patches
mailing list