Tarantool development patches archive
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@dev.tarantool.org, imun@tarantool.org,
	korablev@tarantool.org
Subject: [Tarantool-patches] [PATCH v2 1/3] fiber: introduce schedule_task() internal function
Date: Tue,  3 Mar 2020 00:29:51 +0100
Message-ID: <136873bd559abc466dc2761006f7954fea0c1a68.1583191602.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1583191602.git.v.shpilevoy@tarantool.org>

fiber._internal.schedule_task() is an API for a singleton fiber
worker object. It is meant for non-urgent, delayed execution of
functions. Its main purpose is to schedule execution of a function
that is going to yield from a context where a yield is not
allowed, such as an FFI object's GC callback.

It will be used by SWIM and by fio: their destruction yields, but
they have to do it from an ffi.gc hook, where a yield is not
allowed.

Part of #4727
---
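A minimal usage sketch of the scenario described in the commit message,
assuming it runs inside Tarantool with this patch applied. The names
demo_handle, demo_destroy, demo_gc and demo_new are hypothetical and exist
only for illustration; they are not part of the patch. The GC hook itself
never yields, it only enqueues the yielding destructor for the worker fiber.

```lua
local ffi = require('ffi')
local fiber = require('fiber')

-- Hypothetical handle type, declared only for this illustration.
ffi.cdef[[
    struct demo_handle { int id; };
]]

-- Records which handles were destroyed, to make the effect observable.
local destroyed = {}

-- Hypothetical destructor. It yields, so it must not be called
-- directly from an ffi.gc hook.
local function demo_destroy(id)
    fiber.sleep(0)
    table.insert(destroyed, id)
end

-- GC hook. It must not yield, so it only schedules the real work.
-- A plain number is passed instead of the cdata being finalized.
local function demo_gc(handle)
    fiber._internal.schedule_task(demo_destroy, handle.id)
end

local function demo_new(id)
    return ffi.gc(ffi.new('struct demo_handle', {id = id}), demo_gc)
end

-- Usage: drop the only reference and let the collector run the hook.
local h = demo_new(1)
h = nil
collectgarbage()
-- Yield so that the worker fiber gets a chance to run the task.
fiber.sleep(0.1)
```

When exactly the hook fires depends on the garbage collector, so the sketch
makes no claim about the contents of `destroyed` at any given point.
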
 src/lua/fiber.c         | 15 +++++++++
 src/lua/fiber.lua       | 60 ++++++++++++++++++++++++++++++++++
 test/app/fiber.result   | 72 +++++++++++++++++++++++++++++++++++++++++
 test/app/fiber.test.lua | 41 +++++++++++++++++++++++
 4 files changed, 188 insertions(+)

diff --git a/src/lua/fiber.c b/src/lua/fiber.c
index 575a020d0..ddf827ab6 100644
--- a/src/lua/fiber.c
+++ b/src/lua/fiber.c
@@ -828,6 +828,19 @@ lbox_fiber_set_joinable(struct lua_State *L)
 	return 0;
 }
 
+/**
+ * Alternative to fiber.sleep(infinite) that does not participate
+ * in the event loop at all until an explicit wakeup, so it has
+ * less overhead. Useful for fibers that sleep most of the time.
+ */
+static int
+lbox_fiber_sleep_infinite(struct lua_State *L)
+{
+	(void) L;
+	fiber_yield();
+	return 0;
+}
+
 static const struct luaL_Reg lbox_fiber_meta [] = {
 	{"id", lbox_fiber_id},
 	{"name", lbox_fiber_name},
@@ -865,6 +878,8 @@ static const struct luaL_Reg fiberlib[] = {
 	{"new", lbox_fiber_new},
 	{"status", lbox_fiber_status},
 	{"name", lbox_fiber_name},
+	/* Internal functions, to hide in fiber.lua. */
+	{"sleep_infinite", lbox_fiber_sleep_infinite},
 	{NULL, NULL}
 };
 
diff --git a/src/lua/fiber.lua b/src/lua/fiber.lua
index 8712ee0d6..d0b765b60 100644
--- a/src/lua/fiber.lua
+++ b/src/lua/fiber.lua
@@ -34,4 +34,64 @@ fiber.time = fiber_time
 fiber.time64 = fiber_time64
 fiber.clock = fiber_clock
 fiber.clock64 = fiber_clock64
+
+local sleep_infinite = fiber.sleep_infinite
+fiber.sleep_infinite = nil
+
+local worker_next_task = nil
+local worker_last_task = nil
+local worker_fiber = nil
+
+--
+-- Worker is a singleton fiber for non-urgent, delayed execution
+-- of functions. Its main purpose is to schedule execution of a
+-- function that is going to yield from a context where a yield
+-- is not allowed, such as an FFI object's GC callback.
+--
+local function worker_f()
+    local task
+    while true do
+        while true do
+            task = worker_next_task
+            if task then
+                break
+            end
+            sleep_infinite()
+        end
+        worker_next_task = task.next
+        task.f(task.arg)
+        fiber.sleep(0)
+    end
+end
+
+local function worker_safe_f()
+    pcall(worker_f)
+    -- This fiber was probably canceled and can no longer sleep,
+    -- so create a new one.
+    worker_fiber = fiber.new(worker_safe_f)
+end
+
+worker_fiber = fiber.new(worker_safe_f)
+
+local function worker_schedule_task(f, arg)
+    local task = {f = f, arg = arg}
+    if not worker_next_task then
+        worker_next_task = task
+    else
+        worker_last_task.next = task
+    end
+    worker_last_task = task
+    worker_fiber:wakeup()
+end
+
+-- Start with '_' to hide it from autocompletion.
+fiber._internal = fiber._internal or {}
+fiber._internal.schedule_task = worker_schedule_task
+
+setmetatable(fiber, {__serialize = function(self)
+    local res = table.copy(self)
+    res._internal = nil
+    return setmetatable(res, {})
+end})
+
 return fiber
diff --git a/test/app/fiber.result b/test/app/fiber.result
index 6d9604ad8..bd60e1483 100644
--- a/test/app/fiber.result
+++ b/test/app/fiber.result
@@ -1561,6 +1561,78 @@ fiber.top()
 ---
 - error: fiber.top() is disabled. Enable it with fiber.top_enable() first
 ...
+--
+-- fiber._internal.schedule_task() - API for internal usage for
+-- delayed execution of a function.
+--
+glob_arg = {}
+---
+...
+count = 0
+---
+...
+function task_f(arg)                                                            \
+    count = count + 1                                                           \
+    table.insert(glob_arg, arg)                                                 \
+    arg = arg + 1                                                               \
+    if arg <= 3 then                                                            \
+        fiber._internal.schedule_task(task_f, arg)                              \
+    else                                                                        \
+        fiber.self():cancel()                                                   \
+        error('Worker is broken')                                               \
+    end                                                                         \
+end
+---
+...
+for i = 1, 3 do                                                                 \
+    local csw1 = fiber.info()[fiber.id()].csw                                   \
+    fiber._internal.schedule_task(task_f, i)                                    \
+    local csw2 = fiber.info()[fiber.id()].csw                                   \
+    assert(csw1 == csw2 and csw1 ~= nil)                                        \
+end
+---
+...
+old_count = count
+---
+...
+test_run:wait_cond(function()                                                   \
+    fiber.yield()                                                               \
+    if count == old_count then                                                  \
+        return true                                                             \
+    end                                                                         \
+    old_count = count                                                           \
+end)
+---
+- true
+...
+glob_arg
+---
+- - 1
+  - 2
+  - 3
+  - 2
+  - 3
+  - 3
+...
+count
+---
+- 6
+...
+-- Ensure, that after all tasks are finished, the worker didn't
+-- stuck somewhere.
+glob_arg = nil
+---
+...
+fiber._internal.schedule_task(function(arg) glob_arg = arg end, 100)
+---
+...
+fiber.yield()
+---
+...
+glob_arg
+---
+- 100
+...
 -- cleanup
 test_run:cmd("clear filter")
 ---
diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua
index 6df210d9c..f10782d1f 100644
--- a/test/app/fiber.test.lua
+++ b/test/app/fiber.test.lua
@@ -688,6 +688,47 @@ tbl.time > 0
 fiber.top_disable()
 fiber.top()
 
+--
+-- fiber._internal.schedule_task() - API for internal usage for
+-- delayed execution of a function.
+--
+glob_arg = {}
+count = 0
+function task_f(arg)                                                            \
+    count = count + 1                                                           \
+    table.insert(glob_arg, arg)                                                 \
+    arg = arg + 1                                                               \
+    if arg <= 3 then                                                            \
+        fiber._internal.schedule_task(task_f, arg)                              \
+    else                                                                        \
+        fiber.self():cancel()                                                   \
+        error('Worker is broken')                                               \
+    end                                                                         \
+end
+for i = 1, 3 do                                                                 \
+    local csw1 = fiber.info()[fiber.id()].csw                                   \
+    fiber._internal.schedule_task(task_f, i)                                    \
+    local csw2 = fiber.info()[fiber.id()].csw                                   \
+    assert(csw1 == csw2 and csw1 ~= nil)                                        \
+end
+old_count = count
+test_run:wait_cond(function()                                                   \
+    fiber.yield()                                                               \
+    if count == old_count then                                                  \
+        return true                                                             \
+    end                                                                         \
+    old_count = count                                                           \
+end)
+glob_arg
+count
+
+-- Ensure, that after all tasks are finished, the worker didn't
+-- stuck somewhere.
+glob_arg = nil
+fiber._internal.schedule_task(function(arg) glob_arg = arg end, 100)
+fiber.yield()
+glob_arg
+
 -- cleanup
 test_run:cmd("clear filter")
 
-- 
2.21.1 (Apple Git-122.3)

Thread overview: 16+ messages
2020-03-02 23:29 [Tarantool-patches] [PATCH v2 0/3] fio: close unused descriptors automatically Vladislav Shpilevoy
2020-03-02 23:29 ` Vladislav Shpilevoy [this message]
2020-03-19 14:52   ` [Tarantool-patches] [PATCH v2 1/3] fiber: introduce schedule_task() internal function Igor Munkin
2020-03-20  0:00     ` Vladislav Shpilevoy
2020-03-20 10:48       ` Igor Munkin
2020-03-02 23:29 ` [Tarantool-patches] [PATCH v2 2/3] fio: close unused descriptors automatically Vladislav Shpilevoy
2020-03-19 14:53   ` Igor Munkin
2020-03-19 22:53     ` Igor Munkin
2020-03-20  0:00       ` Vladislav Shpilevoy
2020-03-20 10:48         ` Igor Munkin
2020-03-20 21:28           ` Vladislav Shpilevoy
2020-03-20 21:28             ` Igor Munkin
2020-03-02 23:29 ` [Tarantool-patches] [PATCH v2 3/3] swim: use fiber._internal.schedule_task() for GC Vladislav Shpilevoy
2020-03-19 14:53   ` Igor Munkin
2020-03-26  1:08 ` [Tarantool-patches] [PATCH v2 0/3] fio: close unused descriptors automatically Nikita Pettik
2020-03-26 12:56 ` Kirill Yukhin
