Tarantool development patches archive
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: Vladimir Davydov <vdavydov.dev@gmail.com>
Cc: tarantool-patches@freelists.org
Subject: Re: [tarantool-patches] Re: [PATCH 6/8] netbox: introduce fiber-async API
Date: Mon, 23 Apr 2018 21:59:29 +0300
Message-ID: <feb6d53e-9c5b-efc3-032a-d20a126d44e2@tarantool.org>
In-Reply-To: <20180423164450.yd7o7deawhjxbt7f@esperanza>

Hello. Thanks for the review!

On 23/04/2018 19:44, Vladimir Davydov wrote:
> On Mon, Apr 16, 2018 at 09:39:16PM +0300, Vladislav Shpilevoy wrote:
>> Now any netbox call blocks the caller fiber until a result is read
>> from a socket or the timeout expires. To use netbox asynchronously
>> it is necessary to create a fiber per request. Sometimes this is
>> unwanted, for example when RPS is very high (about 100k) and
>> latency is about 1 second, or when it is necessary to send
>> multiple requests in parallel and then collect the responses
>> (map-reduce).
>>
>> The patch introduces a new option for all netbox requests:
>> is_async. With this option any called netbox method returns
>> a 'future' object immediately (though it still yields for a moment).
>>
>> Using a future object, a user can check whether the request is
>> finalized, get a result or an error, wait with a timeout, or
>> discard a response.
>>
>> Example of is_async usage:
>> future = conn:call(func, {params}, {..., is_async = true})
>> -- Do some work ...
>> if not future:is_ready() then
>>      result, err = future:wait_result(timeout)
>> end
>> -- Or:
>> result, err = future:result()
>>
>> future:result() and :wait_result() return either an error or
>> a response in the same format as the sync versions of the called
>> methods.
>>
>> Part of #3107
>> +            -- It is possible that multiple fibers are waiting for
>> +            -- a result. In such a case a first, who got it, must
>> +            -- wakeup the previous waiting client. This one wakes
>> +            -- up another. Another wakes up third one, etc.
>> +            wakeup_client(old_client)
> 
> This is rather difficult to understand IMO. Can we use a fiber.cond
> instead?

Sure, we can. Done on the branch.

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 1f4828a7e..9bbc047d5 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -233,12 +233,6 @@ local function create_transport(host, port, user, password, callback,
      local send_buf         = buffer.ibuf(buffer.READAHEAD)
      local recv_buf         = buffer.ibuf(buffer.READAHEAD)
  
-    local function wakeup_client(client)
-        if client and client:status() ~= 'dead' then
-            client:wakeup()
-        end
-    end
-
      --
      -- Async request metamethods.
      --
@@ -287,22 +281,14 @@ local function create_transport(host, port, user, password, callback,
          if not self:is_ready() then
              -- When a response is ready before timeout, the
              -- waiting client is waked up prematurely.
-            local old_client = self.client
-            self.client = fiber.self()
              while timeout > 0 and not self:is_ready() do
                  local ts = fiber.clock()
-                state_cond:wait(timeout)
+                self.cond:wait(timeout)
                  timeout = timeout - (fiber.clock() - ts)
              end
-            self.client = old_client
              if not self:is_ready() then
                  return nil, box.error.new(E_TIMEOUT)
              end
-            -- It is possible that multiple fibers are waiting for
-            -- a result. In such a case a first, who got it, must
-            -- wakeup the previous waiting client. This one wakes
-            -- up another. Another wakes up third one, etc.
-            wakeup_client(old_client)
          end
          return self:result()
      end
@@ -333,6 +319,7 @@ local function create_transport(host, port, user, password, callback,
                  request.id = nil
                  request.errno = new_errno
                  request.response = new_error
+                request.cond:broadcast()
              end
              requests = {}
          end
@@ -428,11 +415,15 @@ local function create_transport(host, port, user, password, callback,
          local id = next_request_id
          method_encoder[method](send_buf, id, schema_version, ...)
          next_request_id = next_id(id)
+        -- Request has maximum 7 members:
+        -- method, schema_version, buffer, id, cond, errno,
+        -- response.
          local request = setmetatable(table_new(0, 7), request_mt)
          request.method = method
          request.schema_version = schema_version
          request.buffer = buffer
          request.id = id
+        request.cond = fiber.cond()
          requests[id] = request
          return request
      end
@@ -468,7 +459,7 @@ local function create_transport(host, port, user, password, callback,
              assert(body_end == body_end_check, "invalid xrow length")
              request.errno = band(status, IPROTO_ERRNO_MASK)
              request.response = body[IPROTO_ERROR_KEY]
-            wakeup_client(request.client)
+            request.cond:broadcast()
              return
          end
  
@@ -479,7 +470,7 @@ local function create_transport(host, port, user, password, callback,
              local wpos = buffer:alloc(body_len)
              ffi.copy(wpos, body_rpos, body_len)
              request.response = tonumber(body_len)
-            wakeup_client(request.client)
+            request.cond:broadcast()
              return
          end
  
@@ -490,7 +481,7 @@ local function create_transport(host, port, user, password, callback,
              request.response, request.errno =
                  method_decoder[request.method](body[IPROTO_DATA_KEY])
          end
-        wakeup_client(request.client)
+        request.cond:broadcast()
      end
  
      local function new_request_id()
@@ -601,7 +592,7 @@ local function create_transport(host, port, user, password, callback,
              request.id = nil
              requests[rid] = nil
              request.response = response
-            wakeup_client(request.client)
+            request.cond:broadcast()
              return console_sm(next_id(rid))
          end
      end
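
For illustration, the pattern the diff switches to can be sketched in
isolation: each request owns a fiber.cond, waiters loop on cond:wait()
with the remaining timeout, and whoever sets the response calls
cond:broadcast() to wake all waiters at once. This is only a sketch that
assumes the Tarantool runtime; new_request, wait_result and complete are
hypothetical helpers, not the functions from net_box.lua, and the plain
'timeout' string stands in for the real error object.

```lua
-- Sketch only: helper names are hypothetical, not taken from net_box.lua.
local fiber = require('fiber')

-- A request carries its own condition variable.
local function new_request()
    return {cond = fiber.cond(), response = nil}
end

-- Wait until the response is set or the timeout expires.
-- The loop re-checks the predicate after every wakeup.
local function wait_result(request, timeout)
    while timeout > 0 and request.response == nil do
        local ts = fiber.clock()
        request.cond:wait(timeout)
        timeout = timeout - (fiber.clock() - ts)
    end
    if request.response == nil then
        return nil, 'timeout'
    end
    return request.response
end

-- Store the response and wake every fiber waiting on this request.
local function complete(request, response)
    request.response = response
    request.cond:broadcast()
end

-- Three fibers wait for the same request; one broadcast wakes them all.
local request = new_request()
local results = {}
local waiters = {}
for i = 1, 3 do
    local f = fiber.create(function()
        results[i] = wait_result(request, 1)
    end)
    f:set_joinable(true)
    waiters[i] = f
end
complete(request, 'ok')
for _, f in ipairs(waiters) do
    f:join()
end
```

With a per-request cond there is no chain of fibers waking each other,
so no waiter needs to know about any other waiter.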
