[Tarantool-patches] [PATCH v2 1/3] fiber: introduce schedule_task() internal function

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Mar 3 02:29:51 MSK 2020


fiber._internal.schedule_task() is an API for a singleton fiber
worker object. It is used for non-urgent delayed execution of
functions. Its main purpose is to schedule execution of a function
that is going to yield from a context where a yield is not
allowed, such as an FFI object's GC callback.

It will be used by SWIM and by fio: their destruction yields, but
they have to do it in an ffi.gc hook, where a yield is not allowed.

Part of #4727
---
 src/lua/fiber.c         | 15 +++++++++
 src/lua/fiber.lua       | 60 ++++++++++++++++++++++++++++++++++
 test/app/fiber.result   | 72 +++++++++++++++++++++++++++++++++++++++++
 test/app/fiber.test.lua | 41 +++++++++++++++++++++++
 4 files changed, 188 insertions(+)

diff --git a/src/lua/fiber.c b/src/lua/fiber.c
index 575a020d0..ddf827ab6 100644
--- a/src/lua/fiber.c
+++ b/src/lua/fiber.c
@@ -828,6 +828,19 @@ lbox_fiber_set_joinable(struct lua_State *L)
 	return 0;
 }
 
+/**
+ * Alternative to fiber.sleep(infinite): a plain fiber_yield(),
+ * which does not participate in the event loop at all until an
+ * explicit wakeup. Cheaper for fibers sleeping most of the time.
+ */
+static int
+lbox_fiber_sleep_infinite(struct lua_State *L)
+{
+	(void) L; /* No arguments are read, nothing is returned to Lua. */
+	fiber_yield();
+	return 0;
+}
+
 static const struct luaL_Reg lbox_fiber_meta [] = {
 	{"id", lbox_fiber_id},
 	{"name", lbox_fiber_name},
@@ -865,6 +878,8 @@ static const struct luaL_Reg fiberlib[] = {
 	{"new", lbox_fiber_new},
 	{"status", lbox_fiber_status},
 	{"name", lbox_fiber_name},
+	/* Internal functions, to hide in fiber.lua. */
+	{"sleep_infinite", lbox_fiber_sleep_infinite},
 	{NULL, NULL}
 };
 
diff --git a/src/lua/fiber.lua b/src/lua/fiber.lua
index 8712ee0d6..d0b765b60 100644
--- a/src/lua/fiber.lua
+++ b/src/lua/fiber.lua
@@ -34,4 +34,64 @@ fiber.time = fiber_time
 fiber.time64 = fiber_time64
 fiber.clock = fiber_clock
 fiber.clock64 = fiber_clock64
+
+local sleep_infinite = fiber.sleep_infinite
+fiber.sleep_infinite = nil -- Internal only: drop it from the module table.
+
+local worker_next_task = nil -- Head of the task list: executed next.
+local worker_last_task = nil -- Tail of the task list: new tasks go after it.
+local worker_fiber = nil -- The current worker fiber object.
+
+--
+-- Worker is a singleton fiber for non-urgent delayed execution
+-- of functions. Its main purpose is to schedule execution of a
+-- function that is going to yield from a context where a yield
+-- is not allowed, such as an FFI object's GC callback.
+--
+local function worker_f()
+    local task
+    while true do
+        while true do
+            task = worker_next_task
+            if task then
+                break
+            end
+            sleep_infinite() -- The list is empty, wait for a wakeup.
+        end
+        worker_next_task = task.next -- Pop the task before running it.
+        task.f(task.arg)
+        fiber.sleep(0)
+    end
+end
+
+local function worker_safe_f()
+    pcall(worker_f)
+    -- worker_f() never returns normally. This fiber is probably
+    -- canceled and can not sleep anymore, so create a new one.
+    worker_fiber = fiber.new(worker_safe_f)
+end
+
+worker_fiber = fiber.new(worker_safe_f)
+
+local function worker_schedule_task(f, arg)
+    local task = {f = f, arg = arg}
+    if not worker_next_task then
+        worker_next_task = task -- The list is empty: task becomes the head.
+    else
+        worker_last_task.next = task -- Append after the current tail.
+    end
+    worker_last_task = task
+    worker_fiber:wakeup() -- The caller does not yield here (see the test).
+end
+
+-- The name starts with '_' to hide it from auto completion.
+fiber._internal = fiber._internal or {}
+fiber._internal.schedule_task = worker_schedule_task
+
+setmetatable(fiber, {__serialize = function(self)
+    local res = table.copy(self)
+    res._internal = nil -- Serialize a copy of the module without internals.
+    return setmetatable(res, {})
+end})
+
 return fiber
diff --git a/test/app/fiber.result b/test/app/fiber.result
index 6d9604ad8..bd60e1483 100644
--- a/test/app/fiber.result
+++ b/test/app/fiber.result
@@ -1561,6 +1561,78 @@ fiber.top()
 ---
 - error: fiber.top() is disabled. Enable it with fiber.top_enable() first
 ...
+--
+-- fiber._internal.schedule_task() - API for internal usage for
+-- delayed execution of a function.
+--
+glob_arg = {}
+---
+...
+count = 0
+---
+...
+function task_f(arg)                                                            \
+    count = count + 1                                                           \
+    table.insert(glob_arg, arg)                                                 \
+    arg = arg + 1                                                               \
+    if arg <= 3 then                                                            \
+        fiber._internal.schedule_task(task_f, arg)                              \
+    else                                                                        \
+        fiber.self():cancel()                                                   \
+        error('Worker is broken')                                               \
+    end                                                                         \
+end
+---
+...
+for i = 1, 3 do                                                                 \
+    local csw1 = fiber.info()[fiber.id()].csw                                   \
+    fiber._internal.schedule_task(task_f, i)                                    \
+    local csw2 = fiber.info()[fiber.id()].csw                                   \
+    assert(csw1 == csw2 and csw1 ~= nil)                                        \
+end
+---
+...
+old_count = count
+---
+...
+test_run:wait_cond(function()                                                   \
+    fiber.yield()                                                               \
+    if count == old_count then                                                  \
+        return true                                                             \
+    end                                                                         \
+    old_count = count                                                           \
+end)
+---
+- true
+...
+glob_arg
+---
+- - 1
+  - 2
+  - 3
+  - 2
+  - 3
+  - 3
+...
+count
+---
+- 6
+...
+-- Ensure, that after all tasks are finished, the worker didn't
+-- stuck somewhere.
+glob_arg = nil
+---
+...
+fiber._internal.schedule_task(function(arg) glob_arg = arg end, 100)
+---
+...
+fiber.yield()
+---
+...
+glob_arg
+---
+- 100
+...
 -- cleanup
 test_run:cmd("clear filter")
 ---
diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
index 6df210d9c..f10782d1f 100644
--- a/test/app/fiber.test.lua
+++ b/test/app/fiber.test.lua
@@ -688,6 +688,47 @@ tbl.time > 0
 fiber.top_disable()
 fiber.top()
 
+--
+-- fiber._internal.schedule_task() - API for internal usage for
+-- delayed execution of a function.
+--
+glob_arg = {}
+count = 0
+function task_f(arg)                                                            \
+    count = count + 1                                                           \
+    table.insert(glob_arg, arg)                                                 \
+    arg = arg + 1                                                               \
+    if arg <= 3 then                                                            \
+        fiber._internal.schedule_task(task_f, arg)                              \
+    else                                                                        \
+        fiber.self():cancel()                                                   \
+        error('Worker is broken')                                               \
+    end                                                                         \
+end
+for i = 1, 3 do                                                                 \
+    local csw1 = fiber.info()[fiber.id()].csw                                   \
+    fiber._internal.schedule_task(task_f, i)                                    \
+    local csw2 = fiber.info()[fiber.id()].csw                                   \
+    assert(csw1 == csw2 and csw1 ~= nil)                                        \
+end
+old_count = count
+test_run:wait_cond(function()                                                   \
+    fiber.yield()                                                               \
+    if count == old_count then                                                  \
+        return true                                                             \
+    end                                                                         \
+    old_count = count                                                           \
+end)
+glob_arg
+count
+
+-- Ensure, that after all tasks are finished, the worker didn't
+-- stuck somewhere.
+glob_arg = nil
+fiber._internal.schedule_task(function(arg) glob_arg = arg end, 100)
+fiber.yield()
+glob_arg
+
 -- cleanup
 test_run:cmd("clear filter")
 
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list