From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp1.mail.ru (smtp1.mail.ru [94.100.179.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id A279B469719 for ; Tue, 3 Mar 2020 02:29:57 +0300 (MSK) From: Vladislav Shpilevoy Date: Tue, 3 Mar 2020 00:29:51 +0100 Message-Id: <136873bd559abc466dc2761006f7954fea0c1a68.1583191602.git.v.shpilevoy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v2 1/3] fiber: introduce schedule_task() internal function List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, imun@tarantool.org, korablev@tarantool.org fiber._internal.schedule_task() is an API for a singleton fiber worker object. It serves for non-urgent, delayed execution of functions. Its main purpose is to schedule execution of a function that is going to yield from a context where a yield is not allowed, such as an FFI object's GC callback. It will be used by SWIM and by fio, whose destruction yields, but they need to use an ffi.gc hook, where a yield is not allowed. Part of #4727 --- src/lua/fiber.c | 15 +++++++++ src/lua/fiber.lua | 60 ++++++++++++++++++++++++++++++++++ test/app/fiber.result | 72 +++++++++++++++++++++++++++++++++++++++++ test/app/fiber.test.lua | 41 +++++++++++++++++++++++ 4 files changed, 188 insertions(+) diff --git a/src/lua/fiber.c b/src/lua/fiber.c index 575a020d0..ddf827ab6 100644 --- a/src/lua/fiber.c +++ b/src/lua/fiber.c @@ -828,6 +828,19 @@ lbox_fiber_set_joinable(struct lua_State *L) return 0; } +/** + * Alternative to fiber.sleep(infinite) which does not participate + * in an event loop at all until an explicit wakeup. This is less + * overhead. Useful for fibers sleeping most of the time. 
+ */ +static int +lbox_fiber_sleep_infinite(struct lua_State *L) +{ + (void) L; + fiber_yield(); + return 0; +} + static const struct luaL_Reg lbox_fiber_meta [] = { {"id", lbox_fiber_id}, {"name", lbox_fiber_name}, @@ -865,6 +878,8 @@ static const struct luaL_Reg fiberlib[] = { {"new", lbox_fiber_new}, {"status", lbox_fiber_status}, {"name", lbox_fiber_name}, + /* Internal functions, to hide in fiber.lua. */ + {"sleep_infinite", lbox_fiber_sleep_infinite}, {NULL, NULL} }; diff --git a/src/lua/fiber.lua b/src/lua/fiber.lua index 8712ee0d6..d0b765b60 100644 --- a/src/lua/fiber.lua +++ b/src/lua/fiber.lua @@ -34,4 +34,64 @@ fiber.time = fiber_time fiber.time64 = fiber_time64 fiber.clock = fiber_clock fiber.clock64 = fiber_clock64 + +local sleep_infinite = fiber.sleep_infinite +fiber.sleep_infinite = nil + +local worker_next_task = nil +local worker_last_task = nil +local worker_fiber = nil + +-- +-- Worker is a singleton fiber for not urgent delayed execution of +-- functions. Main purpose - schedule execution of a function, +-- which is going to yield, from a context, where a yield is not +-- allowed. Such as an FFI object's GC callback. +-- +local function worker_f() + local task + while true do + while true do + task = worker_next_task + if task then + break + end + sleep_infinite() + end + worker_next_task = task.next + task.f(task.arg) + fiber.sleep(0) + end +end + +local function worker_safe_f() + pcall(worker_f) + -- This fiber is probably canceled and now is not able to + -- sleep, create a new one. + worker_fiber = fiber.new(worker_safe_f) +end + +worker_fiber = fiber.new(worker_safe_f) + +local function worker_schedule_task(f, arg) + local task = {f = f, arg = arg} + if not worker_next_task then + worker_next_task = task + else + worker_last_task.next = task + end + worker_last_task = task + worker_fiber:wakeup() +end + +-- Start from '_' to hide it from auto completion. 
+fiber._internal = fiber._internal or {} +fiber._internal.schedule_task = worker_schedule_task + +setmetatable(fiber, {__serialize = function(self) + local res = table.copy(self) + res._internal = nil + return setmetatable(res, {}) +end}) + return fiber diff --git a/test/app/fiber.result b/test/app/fiber.result index 6d9604ad8..bd60e1483 100644 --- a/test/app/fiber.result +++ b/test/app/fiber.result @@ -1561,6 +1561,78 @@ fiber.top() --- - error: fiber.top() is disabled. Enable it with fiber.top_enable() first ... +-- +-- fiber._internal.schedule_task() - API for internal usage for +-- delayed execution of a function. +-- +glob_arg = {} +--- +... +count = 0 +--- +... +function task_f(arg) \ + count = count + 1 \ + table.insert(glob_arg, arg) \ + arg = arg + 1 \ + if arg <= 3 then \ + fiber._internal.schedule_task(task_f, arg) \ + else \ + fiber.self():cancel() \ + error('Worker is broken') \ + end \ +end +--- +... +for i = 1, 3 do \ + local csw1 = fiber.info()[fiber.id()].csw \ + fiber._internal.schedule_task(task_f, i) \ + local csw2 = fiber.info()[fiber.id()].csw \ + assert(csw1 == csw2 and csw1 ~= nil) \ +end +--- +... +old_count = count +--- +... +test_run:wait_cond(function() \ + fiber.yield() \ + if count == old_count then \ + return true \ + end \ + old_count = count \ +end) +--- +- true +... +glob_arg +--- +- - 1 + - 2 + - 3 + - 2 + - 3 + - 3 +... +count +--- +- 6 +... +-- Ensure, that after all tasks are finished, the worker didn't +-- stuck somewhere. +glob_arg = nil +--- +... +fiber._internal.schedule_task(function(arg) glob_arg = arg end, 100) +--- +... +fiber.yield() +--- +... +glob_arg +--- +- 100 +... 
-- cleanup test_run:cmd("clear filter") --- diff --git a/test/app/fiber.test.lua b/test/app/fiber.test.lua index 6df210d9c..f10782d1f 100644 --- a/test/app/fiber.test.lua +++ b/test/app/fiber.test.lua @@ -688,6 +688,47 @@ tbl.time > 0 fiber.top_disable() fiber.top() +-- +-- fiber._internal.schedule_task() - API for internal usage for +-- delayed execution of a function. +-- +glob_arg = {} +count = 0 +function task_f(arg) \ + count = count + 1 \ + table.insert(glob_arg, arg) \ + arg = arg + 1 \ + if arg <= 3 then \ + fiber._internal.schedule_task(task_f, arg) \ + else \ + fiber.self():cancel() \ + error('Worker is broken') \ + end \ +end +for i = 1, 3 do \ + local csw1 = fiber.info()[fiber.id()].csw \ + fiber._internal.schedule_task(task_f, i) \ + local csw2 = fiber.info()[fiber.id()].csw \ + assert(csw1 == csw2 and csw1 ~= nil) \ +end +old_count = count +test_run:wait_cond(function() \ + fiber.yield() \ + if count == old_count then \ + return true \ + end \ + old_count = count \ +end) +glob_arg +count + +-- Ensure, that after all tasks are finished, the worker didn't +-- stuck somewhere. +glob_arg = nil +fiber._internal.schedule_task(function(arg) glob_arg = arg end, 100) +fiber.yield() +glob_arg + -- cleanup test_run:cmd("clear filter") -- 2.21.1 (Apple Git-122.3)