From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Cc: tarantool-patches@freelists.org
Subject: Re: [PATCH 6/8] netbox: introduce fiber-async API
Date: Mon, 23 Apr 2018 19:44:50 +0300
Message-ID: <20180423164450.yd7o7deawhjxbt7f@esperanza>
In-Reply-To: <49a50d32a154959aa786ec2a85a4f74792d7ae09.1523903144.git.v.shpilevoy@tarantool.org>

On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> Now any netbox call blocks a caller-fiber until a result is read
> from a socket, or time is out. To use it asynchronously it is
> necessary to create a fiber per request. Sometimes it is
> unwanted - for example if RPS is very high (for example, about
> 100k), and latency is about 1 second. Or when it is neccessary
> to send multiple requests in paralles and then collect responses
> (map-reduce).
>
> The patch introduces a new option for all netbox requests:
> is_async. With this option any called netbox method returns
> immediately (but still yields for a moment) a 'future' object.
>
> By a future object a user can check if the request is finalized,
> get a result or error, wait for a timeout, discard a response.
>
> Example of is_async usage:
> future = conn:call(func, {params}, {..., is_async = true})
> -- Do some work ...
> if not future.is_ready() then
>     result, err = future:wait_result(timeout)
> end
> -- Or:
> result, error = future:result()
>
> A future:result() and :wait_result() returns either an error or
> a response in the same format, as the sync versions of the called
> methods.
>
> Part of #3107
> ---
>  src/box/lua/net_box.lua   | 159 ++++++++++++--
>  test/box/net.box.result   | 519 +++++++++++++++++++++++++++++++++++++++++++++-
>  test/box/net.box.test.lua | 186 ++++++++++++++++-
>  3 files changed, 836 insertions(+), 28 deletions(-)
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 3868cdf1c..96f528963 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
>      local last_error
>      local state_cond = fiber.cond() -- signaled when the state changes
>
> -    -- requests: requests currently 'in flight', keyed by a request id;
> -    -- value refs are weak hence if a client dies unexpectedly,
> -    -- GC cleans the mess. Client submits a request and waits on state_cond.
> -    -- If the reponse arrives within the timeout, the worker wakes
> -    -- client fiber explicitly. Otherwize, wait on state_cond completes and
> -    -- the client reports E_TIMEOUT.
> +    -- Async requests currently 'in flight', keyed by a request
> +    -- id. Value refs are weak hence if a client dies
> +    -- unexpectedly, GC cleans the mess. Client either submits a
> +    -- request and waits on state_cond, OR makes an async request
> +    -- and does not block until a response is received. If the
> +    -- request is not async and the reponse arrives within the
> +    -- timeout, the worker wakes client fiber explicitly.
> +    -- Otherwize, wait on state_cond completes and the client
> +    -- reports E_TIMEOUT.
> +    -- Async request can not be timed out completely. Instead a
> +    -- user must decide when he does not want to wait for
> +    -- response anymore.
>      local requests = setmetatable({}, { __mode = 'v' })
>      local next_request_id = 1
>
> @@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
>      local send_buf = buffer.ibuf(buffer.READAHEAD)
>      local recv_buf = buffer.ibuf(buffer.READAHEAD)
>
> +    local function wakeup_client(client)
> +        if client and client:status() ~= 'dead' then
> +            client:wakeup()
> +        end
> +    end
> +
> +    --
> +    -- Async request metamethods.
> +    --
> +    local request_index = {}
> +    --
> +    -- When an async request is finalized (with ok or error - no
> +    -- matter), its 'id' field is nullified by a response
> +    -- dispatcher.
> +    --
> +    function request_index:is_ready()
> +        return self.id == nil or worker_fiber == nil
> +    end
> +    --
> +    -- When a request is finished, a result can be got from a
> +    -- future object anytime.
> +    -- @retval result, nil Success, the response is returned.
> +    -- @retval nil, error Error occured.
> +    --
> +    function request_index:result()
> +        if self.errno then
> +            return nil, box.error.new({code = self.errno,
> +                                       reason = self.response})
> +        elseif not self.id then
> +            return self.response
> +        elseif not worker_fiber then
> +            return nil, box.error.new(E_NO_CONNECTION)
> +        else
> +            return nil, box.error.new(box.error.PROC_LUA,
> +                                      'Response is not ready')
> +        end
> +    end
> +    --
> +    -- Wait for a response or error max timeout seconds.
> +    -- @param timeout Max seconds to wait.
> +    -- @retval result, nil Success, the response is returned.
> +    -- @retval nil, error Error occured.
> +    --
> +    function request_index:wait_result(timeout)
> +        if timeout then
> +            if type(timeout) ~= 'number' or timeout < 0 then
> +                error('Usage: future:wait_result(timeout)')
> +            end
> +        else
> +            timeout = TIMEOUT_INFINITY
> +        end
> +        if not self:is_ready() then
> +            -- When a response is ready before timeout, the
> +            -- waiting client is waked up spuriously.
> +            local old_client = self.client
> +            self.client = fiber.self()
> +            while timeout > 0 and not self:is_ready() do
> +                local ts = fiber.clock()
> +                state_cond:wait(timeout)
> +                timeout = timeout - (fiber.clock() - ts)
> +            end
> +            self.client = old_client
> +            if not self:is_ready() then
> +                return nil, box.error.new(E_TIMEOUT)
> +            end
> +            -- It is possible that multiple fibers are waiting for
> +            -- a result. In such a case a first, who got it, must
> +            -- wakeup the previous waiting client. This one wakes
> +            -- up another. Another wakes up third one, etc.
> +            wakeup_client(old_client)

This chain of wakeups is rather difficult to understand IMO.
Can we use a fiber.cond instead?

> +        end
> +        return self:result()
> +    end
> +    --
> +    -- Make a connection forget about the response. When it will
> +    -- be received, it will be ignored.
> +    --
> +    function request_index:discard()
> +        if self.id then
> +            requests[self.id] = nil
> +            self.id = nil
> +            self.errno = box.error.PROC_LUA
> +            self.response = 'Response is discarded'
> +        end
> +    end
> +
> +    local request_mt = { __index = request_index }
> +
>      -- STATE SWITCHING --
>      local function set_state(new_state, new_errno, new_error)
>          state = new_state
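
For illustration, the fiber.cond question above could look roughly like the sketch below. It is not taken from the patch or from any later revision. It assumes each future owns its own condition variable, and the names future_index, new_future, complete and the ready flag are hypothetical stand-ins for the patch's request object and response dispatcher. It only uses fiber.cond(), cond:wait(), cond:broadcast(), fiber.clock(), fiber.create() and fiber:join(), and is meant to be run under Tarantool.

```lua
-- Runs under Tarantool, which provides the built-in 'fiber' module.
local fiber = require('fiber')

-- Hypothetical future object: one condition variable per request
-- replaces the 'client' field and the chain of explicit wakeups.
local future_index = {}

function future_index:is_ready()
    return self.ready
end

-- Stand-in for the response dispatcher finalizing a request.
function future_index:complete(response)
    self.response = response
    self.ready = true
    -- Wake every waiting fiber at once; no waiter wakes the next.
    self.cond:broadcast()
end

function future_index:wait_result(timeout)
    local deadline = fiber.clock() + timeout
    while not self.ready do
        local left = deadline - fiber.clock()
        if left <= 0 then
            return nil, 'timed out'
        end
        -- The return value is ignored on purpose: the loop
        -- re-checks the flag and the deadline after every wakeup.
        self.cond:wait(left)
    end
    return self.response
end

local function new_future()
    return setmetatable({ready = false, cond = fiber.cond()},
                        {__index = future_index})
end

-- Three fibers wait on the same future, then one broadcast
-- releases all of them.
local future = new_future()
local waiters = {}
for i = 1, 3 do
    -- fiber.create() runs the function right away, so each
    -- waiter is already blocked in cond:wait() when it returns.
    waiters[i] = fiber.create(function()
        return future:wait_result(1)
    end)
    waiters[i]:set_joinable(true)
end
future:complete('pong')
for i = 1, 3 do
    local ok, res = waiters[i]:join()
    print(i, ok, res)
end
```

Because every waiter re-checks the ready flag after waking, spurious wakeups are harmless, and the number of fibers waiting on one future does not affect the code path.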