[PATCH 6/8] netbox: introduce fiber-async API
Vladimir Davydov
vdavydov.dev at gmail.com
Mon Apr 23 19:44:50 MSK 2018
On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> Currently, any netbox call blocks the caller fiber until a result
> is read from the socket or the timeout expires. To use netbox
> asynchronously, one has to create a fiber per request. Sometimes
> this is undesirable, for example when RPS is very high (about
> 100k) and latency is about 1 second, or when it is necessary to
> send multiple requests in parallel and then collect the responses
> (map-reduce).
>
> The patch introduces a new option for all netbox requests:
> is_async. With this option, any netbox method returns a 'future'
> object immediately (though it still yields for a moment).
>
> Using a future object, a user can check whether the request is
> finalized, get its result or error, wait for it with a timeout,
> or discard the response.
>
> Example of is_async usage:
> local future = conn:call(func, {params}, {..., is_async = true})
> -- Do some work ...
> if not future:is_ready() then
>     result, err = future:wait_result(timeout)
> end
> -- Or:
> result, err = future:result()
>
> future:result() and future:wait_result() return either an error
> or a response in the same format as the sync versions of the
> called methods.
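To illustrate the map-reduce case without a Tarantool instance, here is a rough plain-Lua model of the future object. make_future() and complete() are hypothetical stand-ins for the transport and the response dispatcher; only the method names is_ready() and result() and the (result, error) return convention follow the patch. Both are methods, so they are called with ':'.

```lua
-- Hypothetical in-memory model of netbox futures (no Tarantool needed).
-- make_future() and complete() are NOT netbox API; they stand in for
-- the transport. Only is_ready()/result() mirror the patch.
local future_mt = {}
future_mt.__index = future_mt

function future_mt:is_ready()
    return self.done
end

function future_mt:result()
    if not self.done then
        return nil, 'Response is not ready'
    end
    if self.err then
        return nil, self.err
    end
    return self.response
end

local function make_future()
    return setmetatable({ done = false }, future_mt)
end

-- Simulates the response dispatcher finalizing a request.
local function complete(future, response, err)
    future.done = true
    future.response = response
    future.err = err
end

-- Map: issue all requests first, without blocking on any of them.
local futures = {}
for i = 1, 3 do
    futures[i] = make_future()
end

-- Responses arrive in arbitrary order; one of them is an error.
complete(futures[2], 20)
complete(futures[3], nil, 'ER_PROC_LUA')
complete(futures[1], 10)

-- Reduce: collect results, counting failures separately.
local sum, failures = 0, 0
for _, f in ipairs(futures) do
    local res, err = f:result()
    if err then
        failures = failures + 1
    else
        sum = sum + res
    end
end
```

In real netbox code the "map" step would be several conn:call(..., {is_async = true}) calls and the "reduce" step would use wait_result(timeout) rather than result().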
>
> Part of #3107
> ---
> src/box/lua/net_box.lua | 159 ++++++++++++--
> test/box/net.box.result | 519 +++++++++++++++++++++++++++++++++++++++++++++-
> test/box/net.box.test.lua | 186 ++++++++++++++++-
> 3 files changed, 836 insertions(+), 28 deletions(-)
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 3868cdf1c..96f528963 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
> local last_error
> local state_cond = fiber.cond() -- signaled when the state changes
>
> - -- requests: requests currently 'in flight', keyed by a request id;
> - -- value refs are weak hence if a client dies unexpectedly,
> - -- GC cleans the mess. Client submits a request and waits on state_cond.
> - -- If the reponse arrives within the timeout, the worker wakes
> - -- client fiber explicitly. Otherwize, wait on state_cond completes and
> - -- the client reports E_TIMEOUT.
> + -- Requests currently 'in flight', keyed by a request id.
> + -- Value refs are weak, hence if a client dies unexpectedly,
> + -- GC cleans up the mess. A client either submits a request
> + -- and waits on state_cond, OR makes an async request and
> + -- does not block until a response is received. If the
> + -- request is not async and the response arrives within the
> + -- timeout, the worker wakes the client fiber explicitly.
> + -- Otherwise, the wait on state_cond completes and the
> + -- client reports E_TIMEOUT.
> + -- An async request cannot time out by itself. Instead, the
> + -- user must decide when to stop waiting for the response.
> local requests = setmetatable({}, { __mode = 'v' })
> local next_request_id = 1
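A small plain-Lua demonstration of why a weak-valued requests table lets GC clean up after a client that dies: submit() is a hypothetical helper, and a client "dying" is modelled by dropping the only strong reference to its request. When weak entries are cleared is up to the collector, so a full collectgarbage() cycle is forced here.

```lua
-- Requests in flight, keyed by request id. Values are weak: once no
-- client holds the request object, the GC may drop the entry.
local requests = setmetatable({}, { __mode = 'v' })
local next_request_id = 1

-- Hypothetical helper: registers a request and returns it to the caller.
local function submit(body)
    local id = next_request_id
    next_request_id = next_request_id + 1
    local request = { id = id, body = body }
    requests[id] = request
    return request
end

local function count_in_flight()
    local n = 0
    for _ in pairs(requests) do
        n = n + 1
    end
    return n
end

local kept = submit('select')   -- live client: keeps a strong reference
submit('insert')                -- "dead" client: result is dropped
collectgarbage()                -- full cycle clears unreferenced weak values
collectgarbage()
```

The table itself never has to be cleaned explicitly for abandoned requests; only requests still referenced by some client survive.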
>
> @@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
> local send_buf = buffer.ibuf(buffer.READAHEAD)
> local recv_buf = buffer.ibuf(buffer.READAHEAD)
>
> + local function wakeup_client(client)
> + if client and client:status() ~= 'dead' then
> + client:wakeup()
> + end
> + end
> +
> + --
> + -- Async request metamethods.
> + --
> + local request_index = {}
> + --
> + -- When an async request is finalized (successfully or with
> + -- an error), its 'id' field is set to nil by the response
> + -- dispatcher.
> + --
> + function request_index:is_ready()
> + return self.id == nil or worker_fiber == nil
> + end
> + --
> + -- When a request is finished, its result can be retrieved
> + -- from the future object at any time.
> + -- @retval result, nil Success, the response is returned.
> + -- @retval nil, error Error occurred.
> + --
> + function request_index:result()
> + if self.errno then
> + return nil, box.error.new({code = self.errno,
> + reason = self.response})
> + elseif not self.id then
> + return self.response
> + elseif not worker_fiber then
> + return nil, box.error.new(E_NO_CONNECTION)
> + else
> + return nil, box.error.new(box.error.PROC_LUA,
> + 'Response is not ready')
> + end
> + end
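The order of checks in is_ready()/result() is easy to get wrong, so here is a plain-Lua model of the same decision ladder. Errors are plain tables instead of box.error objects, the transport.connected flag stands in for worker_fiber ~= nil, and the error code strings and new_request() helper are hypothetical.

```lua
-- Plain-Lua model of the patch's is_ready()/result() ladder.
-- Error codes are hypothetical strings, not box.error constants.
local ER_PROC_LUA, ER_NO_CONNECTION = 'ER_PROC_LUA', 'ER_NO_CONNECTION'

local request_index = {}
local request_mt = { __index = request_index }

function request_index:is_ready()
    -- Finalized (id cleared) or the connection is gone.
    return self.id == nil or not self.transport.connected
end

function request_index:result()
    if self.errno then
        -- Error response, or a discarded request.
        return nil, { code = self.errno, reason = self.response }
    elseif not self.id then
        -- Finalized successfully.
        return self.response
    elseif not self.transport.connected then
        return nil, { code = ER_NO_CONNECTION }
    else
        return nil, { code = ER_PROC_LUA, reason = 'Response is not ready' }
    end
end

-- Hypothetical constructor: a request bound to a transport.
local function new_request(transport, id)
    return setmetatable({ id = id, transport = transport }, request_mt)
end
```

errno is checked before id because a finalized request also has id == nil; checking id first would return a nil "success" for error responses.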
> + --
> + -- Wait at most timeout seconds for a response or an error.
> + -- @param timeout Max seconds to wait.
> + -- @retval result, nil Success, the response is returned.
> + -- @retval nil, error Error occurred.
> + --
> + function request_index:wait_result(timeout)
> + if timeout then
> + if type(timeout) ~= 'number' or timeout < 0 then
> + error('Usage: future:wait_result(timeout)')
> + end
> + else
> + timeout = TIMEOUT_INFINITY
> + end
> + if not self:is_ready() then
> + -- If the response arrives before the timeout, the
> + -- waiting client is woken up spuriously.
> + local old_client = self.client
> + self.client = fiber.self()
> + while timeout > 0 and not self:is_ready() do
> + local ts = fiber.clock()
> + state_cond:wait(timeout)
> + timeout = timeout - (fiber.clock() - ts)
> + end
> + self.client = old_client
> + if not self:is_ready() then
> + return nil, box.error.new(E_TIMEOUT)
> + end
> + -- It is possible that multiple fibers are waiting for
> + -- a result. In such a case, the first one to get it must
> + -- wake up the previously waiting client, which in turn
> + -- wakes up the one before it, and so on.
> + wakeup_client(old_client)
This is rather difficult to understand IMO. Can we use a fiber.cond
instead?
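A rough model of that idea, using plain Lua coroutines instead of real fibers: each request gets its own condition variable, and the dispatcher broadcasts on it once, so no waiter has to remember and wake another. Cond, dispatch() and the per-request cond field are hypothetical; in Tarantool the cond would come from fiber.cond() (which has wait(), signal() and broadcast()), and the timeout argument of wait() is not modelled here.

```lua
-- Toy condition variable over coroutines, standing in for fiber.cond().
local Cond = {}
Cond.__index = Cond

function Cond.new()
    return setmetatable({ waiters = {} }, Cond)
end

-- Park the current coroutine until broadcast() is called.
function Cond:wait()
    local co = coroutine.running()
    table.insert(self.waiters, co)
    coroutine.yield()
end

-- Resume every parked coroutine exactly once.
function Cond:broadcast()
    local waiters = self.waiters
    self.waiters = {}
    for _, co in ipairs(waiters) do
        assert(coroutine.resume(co))
    end
end

local request = { id = 1, cond = Cond.new() }

local function wait_result(req)
    -- Re-check the state after every wakeup, as with any cond.
    while req.id ~= nil do
        req.cond:wait()
    end
    return req.response
end

-- Simulated response dispatcher: finalize, then a single broadcast.
local function dispatch(req, response)
    req.response = response
    req.id = nil
    req.cond:broadcast()
end

local results = {}
for i = 1, 3 do
    local co = coroutine.create(function()
        results[i] = wait_result(request)
    end)
    assert(coroutine.resume(co))  -- runs until it parks on the cond
end
dispatch(request, 'pong')
```

With a broadcast, the waiting side no longer needs the self.client / old_client bookkeeping.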
> + end
> + return self:result()
> + end
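The timeout bookkeeping in wait_result() charges every wakeup, real or spurious, against the remaining budget. The deterministic model below shows that loop with a fake clock; clock(), cond_wait() and the wakeup schedule are hypothetical stand-ins for fiber.clock() and state_cond:wait().

```lua
-- Fake clock and a fake cond wait that advances it.
local now = 0
local function clock() return now end

-- Hypothetical schedule: wakeups happen every 0.5 s, and the response
-- becomes ready on the third wakeup.
local wakeups = { 0.5, 0.5, 0.5 }
local ready_after = 3
local woken = 0

local function cond_wait(timeout)
    woken = woken + 1
    local step = wakeups[woken] or timeout
    if step > timeout then step = timeout end  -- never sleep past the budget
    now = now + step
end

local function is_ready() return woken >= ready_after end

-- Same loop shape as the patch: subtract elapsed time after each wait.
local function wait_result(timeout)
    while timeout > 0 and not is_ready() do
        local ts = clock()
        cond_wait(timeout)
        timeout = timeout - (clock() - ts)
    end
    if not is_ready() then
        return nil, 'timeout'
    end
    return 'response'
end
```

With a 10 s budget the response arrives after 1.5 s; with a 1 s budget the loop gives up after two wakeups.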
> + --
> + -- Make the connection forget about the response. When the
> + -- response is received, it will be ignored.
> + --
> + function request_index:discard()
> + if self.id then
> + requests[self.id] = nil
> + self.id = nil
> + self.errno = box.error.PROC_LUA
> + self.response = 'Response is discarded'
> + end
> + end
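A plain-Lua model of what discard() relies on: once the request is unlinked from the in-flight table, a late response finds no entry and is dropped. dispatch() is a hypothetical stand-in for the transport's response dispatcher, and the error code is a plain string rather than box.error.PROC_LUA.

```lua
-- In-flight requests keyed by id (weakness omitted for clarity).
local requests = {}

local request_index = {}
local request_mt = { __index = request_index }

function request_index:discard()
    if self.id then
        requests[self.id] = nil
        self.id = nil
        self.errno = 'ER_PROC_LUA'
        self.response = 'Response is discarded'
    end
end

-- Hypothetical dispatcher: deliver a response only to a known id.
local function dispatch(id, response)
    local request = requests[id]
    if request == nil then
        return false          -- unknown or discarded id: ignore
    end
    requests[id] = nil
    request.id = nil
    request.response = response
    return true
end

local req = setmetatable({ id = 7 }, request_mt)
requests[7] = req
req:discard()
local delivered = dispatch(7, 'late reply')
```

Because discard() checks self.id first, calling it again, or on an already finalized request, is a no-op.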
> +
> + local request_mt = { __index = request_index }
> +
> -- STATE SWITCHING --
> local function set_state(new_state, new_errno, new_error)
> state = new_state