[tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Mon Apr 23 21:59:29 MSK 2018
Hello. Thanks for review!
On 23/04/2018 19:44, Vladimir Davydov wrote:
> On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
>> Now any netbox call blocks the caller fiber until a result is read
>> from a socket or the timeout expires. To use it asynchronously it is
>> necessary to create a fiber per request. Sometimes it is
>> unwanted - for example if RPS is very high (for example, about
>> 100k), and latency is about 1 second. Or when it is necessary
>> to send multiple requests in parallel and then collect responses
>> (map-reduce).
>>
>> The patch introduces a new option for all netbox requests:
>> is_async. With this option any called netbox method returns
>> immediately (but still yields for a moment) a 'future' object.
>>
>> By a future object a user can check if the request is finalized,
>> get a result or error, wait for a timeout, discard a response.
>>
>> Example of is_async usage:
>> future = conn:call(func, {params}, {..., is_async = true})
>> -- Do some work ...
>> if not future:is_ready() then
>> result, err = future:wait_result(timeout)
>> end
>> -- Or:
>> result, err = future:result()
>>
>> future:result() and future:wait_result() return either an error
>> or a response in the same format as the sync versions of the
>> called methods.
>>
>> Part of #3107
>> + -- It is possible that multiple fibers are waiting for
>> + -- a result. In such a case a first, who got it, must
>> + -- wakeup the previous waiting client. This one wakes
>> + -- up another. Another wakes up third one, etc.
>> + wakeup_client(old_client)
>
> This is rather difficult to understand IMO. Can we use a fiber.cond
> instead?
Sure, we can. Done on the branch.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 1f4828a7e..9bbc047d5 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -233,12 +233,6 @@ local function create_transport(host, port, user, password, callback,
local send_buf = buffer.ibuf(buffer.READAHEAD)
local recv_buf = buffer.ibuf(buffer.READAHEAD)
- local function wakeup_client(client)
- if client and client:status() ~= 'dead' then
- client:wakeup()
- end
- end
-
--
-- Async request metamethods.
--
@@ -287,22 +281,14 @@ local function create_transport(host, port, user, password, callback,
if not self:is_ready() then
-- When a response is ready before timeout, the
-- waiting client is waked up prematurely.
- local old_client = self.client
- self.client = fiber.self()
while timeout > 0 and not self:is_ready() do
local ts = fiber.clock()
- state_cond:wait(timeout)
+ self.cond:wait(timeout)
timeout = timeout - (fiber.clock() - ts)
end
- self.client = old_client
if not self:is_ready() then
return nil, box.error.new(E_TIMEOUT)
end
- -- It is possible that multiple fibers are waiting for
- -- a result. In such a case a first, who got it, must
- -- wakeup the previous waiting client. This one wakes
- -- up another. Another wakes up third one, etc.
- wakeup_client(old_client)
end
return self:result()
end
@@ -333,6 +319,7 @@ local function create_transport(host, port, user, password, callback,
request.id = nil
request.errno = new_errno
request.response = new_error
+ request.cond:broadcast()
end
requests = {}
end
@@ -428,11 +415,15 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
+ -- Request has maximum 7 members:
+ -- method, schema_version, buffer, id, cond, errno,
+ -- response.
local request = setmetatable(table_new(0, 7), request_mt)
request.method = method
request.schema_version = schema_version
request.buffer = buffer
request.id = id
+ request.cond = fiber.cond()
requests[id] = request
return request
end
@@ -468,7 +459,7 @@ local function create_transport(host, port, user, password, callback,
assert(body_end == body_end_check, "invalid xrow length")
request.errno = band(status, IPROTO_ERRNO_MASK)
request.response = body[IPROTO_ERROR_KEY]
- wakeup_client(request.client)
+ request.cond:broadcast()
return
end
@@ -479,7 +470,7 @@ local function create_transport(host, port, user, password, callback,
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
request.response = tonumber(body_len)
- wakeup_client(request.client)
+ request.cond:broadcast()
return
end
@@ -490,7 +481,7 @@ local function create_transport(host, port, user, password, callback,
request.response, request.errno =
method_decoder[request.method](body[IPROTO_DATA_KEY])
end
- wakeup_client(request.client)
+ request.cond:broadcast()
end
local function new_request_id()
@@ -601,7 +592,7 @@ local function create_transport(host, port, user, password, callback,
request.id = nil
requests[rid] = nil
request.response = response
- wakeup_client(request.client)
+ request.cond:broadcast()
return console_sm(next_id(rid))
end
end
More information about the Tarantool-patches
mailing list