Tarantool development patches archive
From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Cc: tarantool-patches@freelists.org
Subject: Re: [PATCH 6/8] netbox: introduce fiber-async API
Date: Mon, 23 Apr 2018 19:44:50 +0300
Message-ID: <20180423164450.yd7o7deawhjxbt7f@esperanza>
In-Reply-To: <49a50d32a154959aa786ec2a85a4f74792d7ae09.1523903144.git.v.shpilevoy@tarantool.org>

On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
> Now any netbox call blocks a caller-fiber until a result is read
> from a socket, or time is out. To use it asynchronously it is
> necessary to create a fiber per request. Sometimes it is
> unwanted - for example if RPS is very high (for example, about
> 100k), and latency is about 1 second. Or when it is necessary
> to send multiple requests in parallel and then collect responses
> (map-reduce).
> 
> The patch introduces a new option for all netbox requests:
> is_async. With this option any called netbox method returns
> immediately (but still yields for a moment) a 'future' object.
> 
> By a future object a user can check if the request is finalized,
> get a result or error, wait for a timeout, discard a response.
> 
> Example of is_async usage:
> future = conn:call(func, {params}, {..., is_async = true})
> -- Do some work ...
> if not future:is_ready() then
>     result, err = future:wait_result(timeout)
> end
> -- Or:
> result, error = future:result()
> 
> future:result() and future:wait_result() return either an error
> or a response in the same format as the sync versions of the
> called methods.
> 
> Part of #3107
> ---
>  src/box/lua/net_box.lua   | 159 ++++++++++++--
>  test/box/net.box.result   | 519 +++++++++++++++++++++++++++++++++++++++++++++-
>  test/box/net.box.test.lua | 186 ++++++++++++++++-
>  3 files changed, 836 insertions(+), 28 deletions(-)
> 
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 3868cdf1c..96f528963 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
>      local last_error
>      local state_cond       = fiber.cond() -- signaled when the state changes
>  
> -    -- requests: requests currently 'in flight', keyed by a request id;
> -    -- value refs are weak hence if a client dies unexpectedly,
> -    -- GC cleans the mess. Client submits a request and waits on state_cond.
> -    -- If the reponse arrives within the timeout, the worker wakes
> -    -- client fiber explicitly. Otherwize, wait on state_cond completes and
> -    -- the client reports E_TIMEOUT.
> +    -- Async requests currently 'in flight', keyed by a request
> +    -- id. Value refs are weak hence if a client dies
> +    -- unexpectedly, GC cleans the mess. Client either submits a
> +    -- request and waits on state_cond, OR makes an async request
> +    -- and does not block until a response is received. If the
> +    -- request is not async and the response arrives within the
> +    -- timeout, the worker wakes client fiber explicitly.
> +    -- Otherwise, wait on state_cond completes and the client
> +    -- reports E_TIMEOUT.
> +    -- Async request can not be timed out completely. Instead a
> +    -- user must decide when he does not want to wait for
> +    -- response anymore.
>      local requests         = setmetatable({}, { __mode = 'v' })
>      local next_request_id  = 1
>  
> @@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
>      local send_buf         = buffer.ibuf(buffer.READAHEAD)
>      local recv_buf         = buffer.ibuf(buffer.READAHEAD)
>  
> +    local function wakeup_client(client)
> +        if client and client:status() ~= 'dead' then
> +            client:wakeup()
> +        end
> +    end
> +
> +    --
> +    -- Async request metamethods.
> +    --
> +    local request_index = {}
> +    --
> +    -- When an async request is finalized (with ok or error - no
> +    -- matter), its 'id' field is nullified by a response
> +    -- dispatcher.
> +    --
> +    function request_index:is_ready()
> +        return self.id == nil or worker_fiber == nil
> +    end
> +    --
> +    -- When a request is finished, a result can be got from a
> +    -- future object anytime.
> +    -- @retval result, nil Success, the response is returned.
> +    -- @retval nil, error Error occurred.
> +    --
> +    function request_index:result()
> +        if self.errno then
> +            return nil, box.error.new({code = self.errno,
> +                                       reason = self.response})
> +        elseif not self.id then
> +            return self.response
> +        elseif not worker_fiber then
> +            return nil, box.error.new(E_NO_CONNECTION)
> +        else
> +            return nil, box.error.new(box.error.PROC_LUA,
> +                                      'Response is not ready')
> +        end
> +    end
> +    --
> +    -- Wait for a response or error max timeout seconds.
> +    -- @param timeout Max seconds to wait.
> +    -- @retval result, nil Success, the response is returned.
> +    -- @retval nil, error Error occurred.
> +    --
> +    function request_index:wait_result(timeout)
> +        if timeout then
> +            if type(timeout) ~= 'number' or timeout < 0 then
> +                error('Usage: future:wait_result(timeout)')
> +            end
> +        else
> +            timeout = TIMEOUT_INFINITY
> +        end
> +        if not self:is_ready() then
> +            -- When a response is ready before timeout, the
> +            -- waiting client is waked up spuriously.
> +            local old_client = self.client
> +            self.client = fiber.self()
> +            while timeout > 0 and not self:is_ready() do
> +                local ts = fiber.clock()
> +                state_cond:wait(timeout)
> +                timeout = timeout - (fiber.clock() - ts)
> +            end
> +            self.client = old_client
> +            if not self:is_ready() then
> +                return nil, box.error.new(E_TIMEOUT)
> +            end
> +            -- It is possible that multiple fibers are waiting for
> +            -- a result. In such a case a first, who got it, must
> +            -- wakeup the previous waiting client. This one wakes
> +            -- up another. Another wakes up third one, etc.
> +            wakeup_client(old_client)

This chain of wakeups is rather difficult to understand IMO. Can we use
a fiber.cond instead?
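
To illustrate the suggestion, here is a minimal sketch of a per-request
fiber.cond, assuming the Tarantool runtime. It is not taken from the patch
or from this thread: new_request() and finish() are hypothetical names
standing in for the request constructor and the response dispatcher, and a
plain 'timeout' string replaces box.error.new(E_TIMEOUT) to keep the
example self-contained.

```lua
-- Sketch only: requires the Tarantool runtime for the 'fiber' module.
local fiber = require('fiber')

local request_index = {}
local request_mt = { __index = request_index }

-- Hypothetical constructor: every request owns its own condition variable.
local function new_request(id)
    return setmetatable({ id = id, cond = fiber.cond() }, request_mt)
end

-- As in the patch, a request is ready once its id has been cleared.
function request_index:is_ready()
    return self.id == nil
end

-- Hypothetical dispatcher hook: store the response and wake all waiters.
function request_index:finish(response)
    self.id = nil
    self.response = response
    -- One broadcast replaces the chain of client-to-client wakeups.
    self.cond:broadcast()
end

function request_index:wait_result(timeout)
    local deadline = fiber.clock() + timeout
    while not self:is_ready() do
        local left = deadline - fiber.clock()
        -- cond:wait() returns false when the timeout expires.
        if left <= 0 or not self.cond:wait(left) then
            break
        end
    end
    if not self:is_ready() then
        return nil, 'timeout'
    end
    return self.response
end
```

With this shape any number of fibers can call wait_result() on the same
request: each one sleeps on the request's own cond, and a single
broadcast() from the dispatcher wakes them all, so no waiter has to
remember and wake a previous client.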

> +        end
> +        return self:result()
> +    end
> +    --
> +    -- Make a connection forget about the response. When it will
> +    -- be received, it will be ignored.
> +    --
> +    function request_index:discard()
> +        if self.id then
> +            requests[self.id] = nil
> +            self.id = nil
> +            self.errno = box.error.PROC_LUA
> +            self.response = 'Response is discarded'
> +        end
> +    end
> +
> +    local request_mt = { __index = request_index }
> +
>      -- STATE SWITCHING --
>      local function set_state(new_state, new_errno, new_error)
>          state = new_state
