[patches] [fiber 1/1] fiber: Introduce fiber.join() related methods

imarkov imarkov at tarantool.org
Thu Feb 15 14:49:33 MSK 2018


From: Ilya <markovilya197 at gmail.com>

Introduce three functions:
* fiber.new() - creates a fiber and schedules it into the
ready queue, but doesn't call it and doesn't yield.
The signature is the same as for fiber.create().
* fiber.join() - waits until the specified fiber finishes
its execution and returns its result or raises its error.
Applicable only to joinable fibers.
* fiber.set_joinable() - sets the fiber's joinable flag.

Closes #1397
---
 src/fiber.c             |  16 ++++--
 src/fiber.h             |  12 +++-
 src/lua/fiber.c         | 113 +++++++++++++++++++++++++++++++++-----
 test/app/fiber.result   | 142 ++++++++++++++++++++++++++++++++++++++++++++++++
 test/app/fiber.test.lua |  72 ++++++++++++++++++++++++
 5 files changed, 333 insertions(+), 22 deletions(-)

diff --git a/src/fiber.c b/src/fiber.c
index 9415739..cd3263d 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -148,8 +148,6 @@ fiber_attr_getstacksize(struct fiber_attr *fiber_attr)
 				    fiber_attr_default.stack_size;
 }
 
-static void
-fiber_recycle(struct fiber *fiber);
 
 static void
 fiber_destroy(struct cord *cord, struct fiber *f);
@@ -376,8 +374,8 @@ fiber_reschedule(void)
 	fiber_yield();
 }
 
-int
-fiber_join(struct fiber *fiber)
+void
+fiber_join_wait(struct fiber *fiber)
 {
 	assert(fiber->flags & FIBER_IS_JOINABLE);
 
@@ -386,8 +384,14 @@ fiber_join(struct fiber *fiber)
 		fiber_yield();
 	}
 	assert(fiber_is_dead(fiber));
-	/* Move exception to the caller */
+}
+
+int
+fiber_join(struct fiber *fiber)
+{
+	fiber_join_wait(fiber);
 	int ret = fiber->f_ret;
+	/* Move exception to the caller */
 	if (ret != 0) {
 		assert(!diag_is_empty(&fiber->diag));
 		diag_move(&fiber->diag, &fiber()->diag);
@@ -598,7 +602,7 @@ fiber_reset(struct fiber *fiber)
 }
 
 /** Destroy an active fiber and prepare it for reuse. */
-static void
+void
 fiber_recycle(struct fiber *fiber)
 {
 	/* no exceptions are leaking */
diff --git a/src/fiber.h b/src/fiber.h
index 94b3f44..72caaf9 100644
--- a/src/fiber.h
+++ b/src/fiber.h
@@ -105,7 +105,8 @@ enum fiber_key {
 	/** User global privilege and authentication token */
 	FIBER_KEY_USER = 3,
 	FIBER_KEY_MSG = 4,
-	FIBER_KEY_MAX = 5
+	FIBER_KEY_RESULT = 5,
+	FIBER_KEY_MAX = 6
 };
 
 /** \cond public */
@@ -625,6 +626,14 @@ fiber_c_invoke(fiber_func f, va_list ap)
 	return f(ap);
 }
 
+/** Destroy an active fiber and prepare it for reuse. */
+void
+fiber_recycle(struct fiber *fiber);
+
+/** Wait until the fiber has finished. Applicable only to joinable fibers. */
+void
+fiber_join_wait(struct fiber *fiber);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 
@@ -670,6 +679,7 @@ fiber_cxx_invoke(fiber_func f, va_list ap)
 
 #endif /* defined(__cplusplus) */
 
+
 static inline void *
 region_aligned_alloc_cb(void *ctx, size_t size)
 {
diff --git a/src/lua/fiber.c b/src/lua/fiber.c
index 83b5825..54b0341 100644
--- a/src/lua/fiber.c
+++ b/src/lua/fiber.c
@@ -291,13 +291,14 @@ lbox_fiber_info(struct lua_State *L)
 }
 
 static int
-lua_fiber_run_f(va_list ap)
+lua_fiber_run_f(MAYBE_UNUSED va_list ap)
 {
 	int result;
-	int coro_ref = va_arg(ap, int);
-	struct lua_State *L = va_arg(ap, struct lua_State *);
-
-	result = luaT_call(L, lua_gettop(L) - 1, 0);
+	struct lua_State *L = (struct lua_State *)
+		fiber_get_key(fiber(), FIBER_KEY_RESULT);
+	int coro_ref = lua_tointeger(L, -1);
+	lua_pop(L, 1);
+	result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);
 
 	/* Destroy local storage */
 	int storage_ref = (int)(intptr_t)
@@ -309,17 +310,11 @@ lua_fiber_run_f(va_list ap)
 }
 
 /**
- * Create, resume and detach a fiber
- * given the function and its arguments.
+ * Utility function for fiber.create and fiber.new
  */
-static int
-lbox_fiber_create(struct lua_State *L)
+static struct fiber *
+fiber_create(struct lua_State *L)
 {
-	if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
-		luaL_error(L, "fiber.create(function, ...): bad arguments");
-	if (fiber_checkstack())
-		luaL_error(L, "fiber.create(): out of fiber stack");
-
 	struct lua_State *child_L = lua_newthread(L);
 	int coro_ref = luaL_ref(L, LUA_REGISTRYINDEX);
 
@@ -333,7 +328,45 @@ lbox_fiber_create(struct lua_State *L)
 	lua_xmove(L, child_L, lua_gettop(L));
 	/* XXX: 'fiber' is leaked if this throws a Lua error. */
 	lbox_pushfiber(L, f->fid);
-	fiber_start(f, coro_ref, child_L);
+	/* Pass coro_ref via the Lua stack so that we don't have to pass
+	 * it as an argument of the fiber run function. Nothing else
+	 * works with child_L until that function is called; it then
+	 * pops coro_ref from the stack before calling the user function.
+	 */
+	lua_pushinteger(child_L, coro_ref);
+	fiber_set_key(f, FIBER_KEY_RESULT, child_L);
+	return f;
+}
+
+/**
+ * Create, resume and detach a fiber
+ * given the function and its arguments.
+ */
+static int
+lbox_fiber_create(struct lua_State *L)
+{
+	if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
+		luaL_error(L, "fiber.create(function, ...): bad arguments");
+	if (fiber_checkstack())
+		luaL_error(L, "fiber.create(): out of fiber stack");
+	struct fiber *f = fiber_create(L);
+	fiber_start(f);
+	return 1;
+}
+
+/**
+ * Create a fiber and schedule it for execution, but do not invoke it yet.
+ */
+static int
+lbox_fiber_new(struct lua_State *L)
+{
+	if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
+		luaL_error(L, "fiber.new(function, ...): bad arguments");
+	if (fiber_checkstack())
+		luaL_error(L, "fiber.new(): out of fiber stack");
+
+	struct fiber *f = fiber_create(L);
+	fiber_wakeup(f);
 	return 1;
 }
 
@@ -567,6 +600,51 @@ lbox_fiber_wakeup(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_fiber_join(struct lua_State *L)
+{
+	struct fiber *fiber = lbox_checkfiber(L, 1);
+	fiber_join_wait(fiber);
+	bool fiber_was_cancelled = fiber->flags & FIBER_IS_CANCELLED;
+	struct error *e = NULL;
+	if (fiber->f_ret != 0 && !fiber_was_cancelled) {
+		/*
+		 * We do not want to spoil the diag of the current
+		 * fiber so not calling luaT_error().
+		 */
+		assert(!diag_is_empty(&fiber->diag));
+		e = diag_last_error(&fiber->diag);
+		error_ref(e);
+		luaT_pusherror(L, e);
+		diag_clear(&fiber->diag);
+	}
+	struct lua_State *child_L = fiber_get_key(fiber, FIBER_KEY_RESULT);
+	int num_ret = 0;
+	if (child_L != NULL) {
+		num_ret = lua_gettop(child_L);
+		lua_xmove(child_L, L, num_ret);
+	}
+	fiber_recycle(fiber);
+	if (e != NULL) {
+		error_unref(e);
+		lua_error(L);
+	}
+	return num_ret;
+}
+
+static int
+lbox_fiber_set_joinable(struct lua_State *L)
+{
+
+	if (lua_gettop(L) != 2) {
+		luaL_error(L, "fiber.set_joinable(id, yesno): bad arguments");
+	}
+	struct fiber *fiber = lbox_checkfiber(L, 1);
+	bool yesno = lua_toboolean(L, 2);
+	fiber_set_joinable(fiber, yesno);
+	return 0;
+}
+
 static const struct luaL_Reg lbox_fiber_meta [] = {
 	{"id", lbox_fiber_id},
 	{"name", lbox_fiber_name},
@@ -575,6 +653,8 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
 	{"testcancel", lbox_fiber_testcancel},
 	{"__serialize", lbox_fiber_serialize},
 	{"__tostring", lbox_fiber_tostring},
+	{"join", lbox_fiber_join},
+	{"set_joinable", lbox_fiber_set_joinable},
 	{"wakeup", lbox_fiber_wakeup},
 	{"__index", lbox_fiber_index},
 	{NULL, NULL}
@@ -589,9 +669,12 @@ static const struct luaL_Reg fiberlib[] = {
 	{"find", lbox_fiber_find},
 	{"kill", lbox_fiber_cancel},
 	{"wakeup", lbox_fiber_wakeup},
+	{"join", lbox_fiber_join},
+	{"set_joinable", lbox_fiber_set_joinable},
 	{"cancel", lbox_fiber_cancel},
 	{"testcancel", lbox_fiber_testcancel},
 	{"create", lbox_fiber_create},
+	{"new", lbox_fiber_new},
 	{"status", lbox_fiber_status},
 	{"name", lbox_fiber_name},
 	{NULL, NULL}
diff --git a/test/app/fiber.result b/test/app/fiber.result
index 08d38ef..3ecdbd0 100644
--- a/test/app/fiber.result
+++ b/test/app/fiber.result
@@ -975,6 +975,148 @@ session_type
 session_type = nil
 ---
 ...
+-- gh-1397 fiber.new, fiber.join
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
+---
+...
+function test1()
+    f = fiber.new(err)
+    f:set_joinable(true)
+    local e = f:join()
+    local e2 = f:join()
+    return e, e2
+end;
+---
+...
+test1();
+---
+- error: Illegal parameters, oh my
+...
+flag = false;
+---
+...
+function test2()
+    f = fiber.new(function() flag = true  end)
+    fiber.set_joinable(f, true)
+    fiber.join(f)
+end;
+---
+...
+test2();
+---
+...
+flag;
+---
+- true
+...
+function test3()
+    f = fiber.new(function() return "hello" end)
+    fiber.set_joinable(f, true)
+    return fiber.join(f)
+end;
+---
+...
+test3();
+---
+- hello
+...
+function test4()
+    f = fiber.new(function (i) return i + 1  end, 1)
+    fiber.set_joinable(f, true)
+    return f:join()
+end;
+---
+...
+test4();
+---
+- 2
+...
+function test5()
+    f = fiber.new(function() end)
+    f:set_joinable(true)
+    return f, f:status()
+end;
+---
+...
+local status;
+---
+...
+f, status = test5();
+---
+...
+status;
+---
+- suspended
+...
+f:status();
+---
+- suspended
+...
+f:join();
+---
+...
+f:status();
+---
+- dead
+...
+function test6()
+    f = fiber.new(function() end)
+    f:set_joinable(true)
+    f:set_joinable(false)
+    return f, f:status()
+end;
+---
+...
+f, status = test6();
+---
+...
+status;
+---
+- suspended
+...
+f:status();
+---
+- dead
+...
+-- test side fiber in transaction
+s = box.schema.space.create("test");
+---
+...
+_ = s:create_index("prim", {parts={1, 'number'}});
+---
+...
+flag = false;
+---
+...
+function test7(i)
+    box.begin()
+    s:put{i}
+    fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
+    box.rollback()
+end;
+---
+...
+f = fiber.create(test7, 1);
+---
+...
+while flag ~= true do fiber.sleep(0.001) end;
+---
+...
+s:select{};
+---
+- - [2]
+...
+s:drop();
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
 fiber = nil
 ---
 ...
diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
index ad3da09..b509f4a 100644
--- a/test/app/fiber.test.lua
+++ b/test/app/fiber.test.lua
@@ -402,6 +402,78 @@ _ = fiber.create(fn1)
 session_type
 session_type = nil
 
+-- gh-1397 fiber.new, fiber.join
+test_run:cmd("setopt delimiter ';'")
+function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
+function test1()
+    f = fiber.new(err)
+    f:set_joinable(true)
+    local e = f:join()
+    local e2 = f:join()
+    return e, e2
+end;
+test1();
+
+flag = false;
+function test2()
+    f = fiber.new(function() flag = true  end)
+    fiber.set_joinable(f, true)
+    fiber.join(f)
+end;
+test2();
+flag;
+
+function test3()
+    f = fiber.new(function() return "hello" end)
+    fiber.set_joinable(f, true)
+    return fiber.join(f)
+end;
+test3();
+
+function test4()
+    f = fiber.new(function (i) return i + 1  end, 1)
+    fiber.set_joinable(f, true)
+    return f:join()
+end;
+test4();
+
+function test5()
+    f = fiber.new(function() end)
+    f:set_joinable(true)
+    return f, f:status()
+end;
+local status;
+f, status = test5();
+status;
+f:status();
+f:join();
+f:status();
+
+function test6()
+    f = fiber.new(function() end)
+    f:set_joinable(true)
+    f:set_joinable(false)
+    return f, f:status()
+end;
+f, status = test6();
+status;
+f:status();
+
+-- test side fiber in transaction
+s = box.schema.space.create("test");
+_ = s:create_index("prim", {parts={1, 'number'}});
+flag = false;
+function test7(i)
+    box.begin()
+    s:put{i}
+    fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
+    box.rollback()
+end;
+f = fiber.create(test7, 1);
+while flag ~= true do fiber.sleep(0.001) end;
+s:select{};
+s:drop();
+test_run:cmd("setopt delimiter ''");
 fiber = nil
 
 --
-- 
2.7.4




More information about the Tarantool-patches mailing list