Date: Mon, 23 Apr 2018 19:44:50 +0300
From: Vladimir Davydov
Subject: Re: [PATCH 6/8] netbox: introduce fiber-async API
Message-ID: <20180423164450.yd7o7deawhjxbt7f@esperanza>
In-Reply-To: <49a50d32a154959aa786ec2a85a4f74792d7ae09.1523903144.git.v.shpilevoy@tarantool.org>
To: Vladislav Shpilevoy
Cc: tarantool-patches@freelists.org

On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> Currently, any netbox call blocks the caller fiber until a result
> is read from the socket or the timeout expires. To use it
> asynchronously, it is necessary to create a fiber per request.
> Sometimes this is undesirable, for example, when RPS is very high
> (about 100k) and latency is about 1 second, or when it is
> necessary to send multiple requests in parallel and then collect
> the responses (map-reduce).
>
> The patch introduces a new option for all netbox requests:
> is_async. With this option, any netbox method returns a 'future'
> object immediately (though it still yields for a moment).
>
> Using a future object, a user can check whether the request is
> finalized, get a result or an error, wait with a timeout, or
> discard the response.
>
> Example of is_async usage:
> future = conn:call(func, {params}, {..., is_async = true})
> -- Do some work ...
> if not future:is_ready() then
>     result, err = future:wait_result(timeout)
> end
> -- Or:
> result, err = future:result()
>
> future:result() and future:wait_result() return either an error
> or a response in the same format as the sync versions of the
> called methods.
>
> Part of #3107
> ---
>  src/box/lua/net_box.lua   | 159 ++++++++++++--
>  test/box/net.box.result   | 519 +++++++++++++++++++++++++++++++++++++++++++++-
>  test/box/net.box.test.lua | 186 ++++++++++++++++-
>  3 files changed, 836 insertions(+), 28 deletions(-)
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 3868cdf1c..96f528963 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
>      local last_error
>      local state_cond = fiber.cond() -- signaled when the state changes
>
> -    -- requests: requests currently 'in flight', keyed by a request id;
> -    -- value refs are weak hence if a client dies unexpectedly,
> -    -- GC cleans the mess. Client submits a request and waits on state_cond.
> -    -- If the reponse arrives within the timeout, the worker wakes
> -    -- client fiber explicitly. Otherwize, wait on state_cond completes and
> -    -- the client reports E_TIMEOUT.
> +    -- Async requests currently 'in flight', keyed by a request
> +    -- id. Value refs are weak, hence if a client dies
> +    -- unexpectedly, GC cleans the mess. Client either submits a
> +    -- request and waits on state_cond, OR makes an async request
> +    -- and does not block until a response is received. If the
> +    -- request is not async and the response arrives within the
> +    -- timeout, the worker wakes client fiber explicitly.
> +    -- Otherwise, wait on state_cond completes and the client
> +    -- reports E_TIMEOUT.
> +    -- An async request cannot time out by itself. Instead, the
> +    -- user must decide when they no longer want to wait for
> +    -- the response.
>      local requests = setmetatable({}, { __mode = 'v' })
>      local next_request_id = 1
>
> @@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
>      local send_buf = buffer.ibuf(buffer.READAHEAD)
>      local recv_buf = buffer.ibuf(buffer.READAHEAD)
>
> +    local function wakeup_client(client)
> +        if client and client:status() ~= 'dead' then
> +            client:wakeup()
> +        end
> +    end
> +
> +    --
> +    -- Async request metamethods.
> +    --
> +    local request_index = {}
> +    --
> +    -- When an async request is finalized (successfully or with
> +    -- an error), its 'id' field is nullified by the response
> +    -- dispatcher.
> +    --
> +    function request_index:is_ready()
> +        return self.id == nil or worker_fiber == nil
> +    end
> +    --
> +    -- When a request is finished, a result can be obtained from
> +    -- the future object at any time.
> +    -- @retval result, nil Success, the response is returned.
> +    -- @retval nil, error Error occurred.
> +    --
> +    function request_index:result()
> +        if self.errno then
> +            return nil, box.error.new({code = self.errno,
> +                                       reason = self.response})
> +        elseif not self.id then
> +            return self.response
> +        elseif not worker_fiber then
> +            return nil, box.error.new(E_NO_CONNECTION)
> +        else
> +            return nil, box.error.new(box.error.PROC_LUA,
> +                                      'Response is not ready')
> +        end
> +    end
> +    --
> +    -- Wait at most timeout seconds for a response or an error.
> +    -- @param timeout Max seconds to wait.
> +    -- @retval result, nil Success, the response is returned.
> +    -- @retval nil, error Error occurred.
> +    --
> +    function request_index:wait_result(timeout)
> +        if timeout then
> +            if type(timeout) ~= 'number' or timeout < 0 then
> +                error('Usage: future:wait_result(timeout)')
> +            end
> +        else
> +            timeout = TIMEOUT_INFINITY
> +        end
> +        if not self:is_ready() then
> +            -- When a response is ready before timeout, the
> +            -- waiting client is woken up spuriously.
> +            local old_client = self.client
> +            self.client = fiber.self()
> +            while timeout > 0 and not self:is_ready() do
> +                local ts = fiber.clock()
> +                state_cond:wait(timeout)
> +                timeout = timeout - (fiber.clock() - ts)
> +            end
> +            self.client = old_client
> +            if not self:is_ready() then
> +                return nil, box.error.new(E_TIMEOUT)
> +            end
> +            -- It is possible that multiple fibers are waiting for
> +            -- a result. In such a case the first one to get it must
> +            -- wake up the previous waiting client. That one wakes
> +            -- up another, which wakes up a third one, etc.
> +            wakeup_client(old_client)

This is rather difficult to understand, IMO. Can we use a fiber.cond
instead?

> +        end
> +        return self:result()
> +    end
> +    --
> +    -- Make a connection forget about the response. When it is
> +    -- received, it will be ignored.
> +    --
> +    function request_index:discard()
> +        if self.id then
> +            requests[self.id] = nil
> +            self.id = nil
> +            self.errno = box.error.PROC_LUA
> +            self.response = 'Response is discarded'
> +        end
> +    end
> +
> +    local request_mt = { __index = request_index }
> +
>      -- STATE SWITCHING --
>      local function set_state(new_state, new_errno, new_error)
>          state = new_state
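To make the fiber.cond suggestion above concrete, here is a minimal sketch in plain Lua, assuming coroutines stand in for Tarantool fibers. Cond is a hypothetical stand-in for fiber.cond that supports only wait() and broadcast() (no timeout), and Future, complete() and the 'pong' payload are illustrative names, not net.box API. The idea it shows: if each request owns its own condition variable, the response dispatcher wakes every waiter with a single broadcast, so wait_result() no longer has to save and restore self.client or hand the wakeup from one waiter to the next.

```lua
-- Sketch only: plain Lua coroutines stand in for Tarantool fibers.
-- Cond is a hypothetical stand-in for fiber.cond, without timeouts.
local Cond = {}
Cond.__index = Cond

function Cond.new()
    return setmetatable({waiters = {}}, Cond)
end

-- Suspend the calling coroutine until broadcast() is called.
function Cond:wait()
    local co = coroutine.running()
    table.insert(self.waiters, co)
    coroutine.yield()
end

-- Resume every coroutine currently blocked in wait().
function Cond:broadcast()
    local waiters = self.waiters
    self.waiters = {}
    for _, co in ipairs(waiters) do
        assert(coroutine.resume(co))
    end
end

-- Hypothetical future: one condition variable per request.
local Future = {}
Future.__index = Future

function Future.new()
    return setmetatable({cond = Cond.new(), ready = false}, Future)
end

function Future:is_ready()
    return self.ready
end

function Future:wait_result()
    -- Re-check readiness after every wakeup, as one would with fiber.cond.
    while not self:is_ready() do
        self.cond:wait()
    end
    return self.response
end

-- Would be called by the (hypothetical) response dispatcher.
function Future:complete(response)
    self.response = response
    self.ready = true
    self.cond:broadcast()
end

-- Three "fibers" wait on the same future.
local future = Future.new()
local results = {}
for i = 1, 3 do
    local co = coroutine.create(function()
        results[i] = future:wait_result()
    end)
    assert(coroutine.resume(co)) -- runs until the coroutine blocks in wait()
end
assert(#results == 0)

-- A single broadcast wakes all waiters; no hand-off chain is needed.
future:complete('pong')
```

In the real transport the timeout would still matter: wait_result() would pass its remaining time to the cond's wait(timeout), and the dispatcher would broadcast on the request's cond when it finalizes the request. Neither part is modeled in this sketch.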