Tarantool development patches archive
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: vdavydov.dev@gmail.com
Subject: [PATCH 6/8] netbox: introduce fiber-async API
Date: Mon, 16 Apr 2018 21:39:16 +0300
Message-ID: <49a50d32a154959aa786ec2a85a4f74792d7ae09.1523903144.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1523903144.git.v.shpilevoy@tarantool.org>

Currently any netbox call blocks the calling fiber until a result
is read from the socket or the timeout expires. To use netbox
asynchronously it is necessary to create a fiber per request.
Sometimes this is unwanted: for example, when RPS is very high
(about 100k) and latency is about 1 second, or when it is
necessary to send multiple requests in parallel and then collect
the responses (map-reduce).

The patch introduces a new option for all netbox requests:
is_async. With this option a netbox method returns a 'future'
object immediately (though it still yields for a moment).

Using a future object, a user can check whether the request is
finalized, get its result or error, wait for the result with a
timeout, or discard the response.

Example of is_async usage:
future = conn:call(func, {params}, {..., is_async = true})
-- Do some work ...
if not future:is_ready() then
    result, err = future:wait_result(timeout)
end
-- Or:
result, err = future:result()
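
A possible way to combine wait_result() and discard(), as a
sketch only: call_or_discard is a hypothetical helper that is not
part of this patch, and conn is assumed to be an already
connected netbox connection.

```lua
-- Sketch only: call_or_discard is a hypothetical helper, and
-- `conn` is assumed to be a connected net.box connection.
local function call_or_discard(conn, func, args, timeout)
    local future = conn:call(func, args, {is_async = true})
    if future == nil then
        -- Assumption: nil means the request could not be sent.
        return nil, 'request was not sent'
    end
    local result, err = future:wait_result(timeout)
    if err ~= nil and not future:is_ready() then
        -- Still in flight after the timeout: make the connection
        -- forget the request, so a late response is ignored.
        future:discard()
    end
    return result, err
end
```

The is_ready() check only skips discard() for requests that are
already finalized; calling discard() on such a request should be
a no-op anyway.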

future:result() and future:wait_result() return either an error
or a response in the same format as the sync versions of the
called methods.
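
The map-reduce case mentioned above could look roughly like the
sketch below. It is an illustration under assumptions, not code
from this patch: map_reduce is a hypothetical helper, conns is
assumed to be a list of connected netbox connections, and func is
assumed to be defined on every peer.

```lua
-- Sketch only: map_reduce is a hypothetical helper; `conns` is
-- assumed to be a list of connected net.box connections.
local function map_reduce(conns, func, args, timeout)
    local futures = {}
    -- Map: send every request without waiting for responses.
    for i, conn in ipairs(conns) do
        local future, err = conn:call(func, args, {is_async = true})
        if future == nil then
            -- Give up: discard the requests that were already sent.
            for j = 1, i - 1 do
                futures[j]:discard()
            end
            return nil, err or 'request was not sent'
        end
        futures[i] = future
    end
    -- Reduce: collect the responses one by one. The requests run
    -- in parallel, but each wait gets the full timeout, so the
    -- worst case is #conns * timeout.
    local results = {}
    for i, future in ipairs(futures) do
        local res, err = future:wait_result(timeout)
        if err ~= nil then
            -- Discard the requests nobody is going to wait for.
            for j = i + 1, #futures do
                futures[j]:discard()
            end
            return nil, err
        end
        results[i] = res
    end
    return results
end
```

The error is detected via err rather than via a nil result,
because some methods (upsert, for example) return nothing on
success.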

Part of #3107
---
 src/box/lua/net_box.lua   | 159 ++++++++++++--
 test/box/net.box.result   | 519 +++++++++++++++++++++++++++++++++++++++++++++-
 test/box/net.box.test.lua | 186 ++++++++++++++++-
 3 files changed, 836 insertions(+), 28 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3868cdf1c..96f528963 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
     local last_error
     local state_cond       = fiber.cond() -- signaled when the state changes
 
-    -- requests: requests currently 'in flight', keyed by a request id;
-    -- value refs are weak hence if a client dies unexpectedly,
-    -- GC cleans the mess. Client submits a request and waits on state_cond.
-    -- If the reponse arrives within the timeout, the worker wakes
-    -- client fiber explicitly. Otherwize, wait on state_cond completes and
-    -- the client reports E_TIMEOUT.
+    -- Async requests currently 'in flight', keyed by a request
+    -- id. Value refs are weak, hence if a client dies
+    -- unexpectedly, GC cleans up the mess. A client either
+    -- submits a request and waits on state_cond, or makes an
+    -- async request and does not block until a response is
+    -- received. If the request is not async and the response
+    -- arrives within the timeout, the worker wakes the client
+    -- fiber explicitly. Otherwise, the wait on state_cond
+    -- completes and the client reports E_TIMEOUT.
+    -- An async request cannot time out by itself. Instead, the
+    -- user must decide when to stop waiting for the
+    -- response.
     local requests         = setmetatable({}, { __mode = 'v' })
     local next_request_id  = 1
 
@@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
     local send_buf         = buffer.ibuf(buffer.READAHEAD)
     local recv_buf         = buffer.ibuf(buffer.READAHEAD)
 
+    local function wakeup_client(client)
+        if client and client:status() ~= 'dead' then
+            client:wakeup()
+        end
+    end
+
+    --
+    -- Async request metamethods.
+    --
+    local request_index = {}
+    --
+    -- When an async request is finalized (with ok or error - no
+    -- matter), its 'id' field is nullified by a response
+    -- dispatcher.
+    --
+    function request_index:is_ready()
+        return self.id == nil or worker_fiber == nil
+    end
+    --
+    -- When a request is finished, its result can be fetched
+    -- from the future object at any time.
+    -- @retval result, nil Success, the response is returned.
+    -- @retval nil, error An error occurred.
+    --
+    function request_index:result()
+        if self.errno then
+            return nil, box.error.new({code = self.errno,
+                                       reason = self.response})
+        elseif not self.id then
+            return self.response
+        elseif not worker_fiber then
+            return nil, box.error.new(E_NO_CONNECTION)
+        else
+            return nil, box.error.new(box.error.PROC_LUA,
+                                      'Response is not ready')
+        end
+    end
+    --
+    -- Wait for a response or error for at most timeout seconds.
+    -- @param timeout Max seconds to wait.
+    -- @retval result, nil Success, the response is returned.
+    -- @retval nil, error An error occurred.
+    --
+    function request_index:wait_result(timeout)
+        if timeout then
+            if type(timeout) ~= 'number' or timeout < 0 then
+                error('Usage: future:wait_result(timeout)')
+            end
+        else
+            timeout = TIMEOUT_INFINITY
+        end
+        if not self:is_ready() then
+            -- When a response is ready before timeout, the
+            -- waiting client is woken up spuriously.
+            local old_client = self.client
+            self.client = fiber.self()
+            while timeout > 0 and not self:is_ready() do
+                local ts = fiber.clock()
+                state_cond:wait(timeout)
+                timeout = timeout - (fiber.clock() - ts)
+            end
+            self.client = old_client
+            if not self:is_ready() then
+                return nil, box.error.new(E_TIMEOUT)
+            end
+            -- It is possible that multiple fibers are waiting for
+            -- a result. In such a case the first one to get it
+            -- must wake up the previous waiting client. That one
+            -- wakes up another, which wakes up a third, etc.
+            wakeup_client(old_client)
+        end
+        return self:result()
+    end
+    --
+    -- Make the connection forget about the response. When it
+    -- is received, it is ignored.
+    --
+    function request_index:discard()
+        if self.id then
+            requests[self.id] = nil
+            self.id = nil
+            self.errno = box.error.PROC_LUA
+            self.response = 'Response is discarded'
+        end
+    end
+
+    local request_mt = { __index = request_index }
+
     -- STATE SWITCHING --
     local function set_state(new_state, new_errno, new_error)
         state = new_state
@@ -236,6 +330,7 @@ local function create_transport(host, port, user, password, callback,
         state_cond:broadcast()
         if state == 'error' or state == 'error_reconnect' then
             for _, request in pairs(requests) do
+                request.id = nil
                 request.errno = new_errno
                 request.response = new_error
             end
@@ -315,12 +410,16 @@ local function create_transport(host, port, user, password, callback,
         end
     end
 
-    -- REQUEST/RESPONSE --
-    local function perform_request(timeout, buffer, method, schema_version, ...)
+    --
+    -- Send a request and do not wait for a response.
+    -- @retval nil, error An error occurred.
+    -- @retval not nil Future object.
+    --
+    local function perform_async_request(buffer, method, schema_version, ...)
         if state ~= 'active' then
-            return last_errno or E_NO_CONNECTION, last_error
+            return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
+                                       reason = last_error})
         end
-        local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
         -- alert worker to notify it of the queued outgoing data;
         -- if the buffer wasn't empty, assume the worker was already alerted
         if send_buf:size() == 0 then
@@ -329,12 +428,27 @@ local function create_transport(host, port, user, password, callback,
         local id = next_request_id
         method_encoder[method](send_buf, id, schema_version, ...)
         next_request_id = next_id(id)
-        local request = table_new(0, 6) -- reserve space for 6 keys
-        request.client = fiber_self()
+        local request = setmetatable(table_new(0, 7), request_mt)
         request.method = method
         request.schema_version = schema_version
         request.buffer = buffer
+        request.id = id
         requests[id] = request
+        return request
+    end
+
+    --
+    -- Send a request and wait for response.
+    --
+    local function perform_request(timeout, buffer, method, schema_version, ...)
+        local request, err =
+            perform_async_request(buffer, method, schema_version, ...)
+        if not request then
+            return last_errno or E_NO_CONNECTION, last_error
+        end
+        request.client = fiber_self()
+        local id = request.id
+        local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
         repeat
             local timeout = max(0, deadline - fiber_clock())
             if not state_cond:wait(timeout) then
@@ -345,12 +459,6 @@ local function create_transport(host, port, user, password, callback,
         return request.errno, request.response
     end
 
-    local function wakeup_client(client)
-        if client:status() ~= 'dead' then
-            client:wakeup()
-        end
-    end
-
     local function dispatch_response_iproto(hdr, body_rpos, body_end)
         local id = hdr[IPROTO_SYNC_KEY]
         local request = requests[id]
@@ -358,6 +466,7 @@ local function create_transport(host, port, user, password, callback,
             return
         end
         requests[id] = nil
+        request.id = nil
         local status = hdr[IPROTO_STATUS_KEY]
         local body, body_end_check
 
@@ -607,7 +716,8 @@ local function create_transport(host, port, user, password, callback,
         stop            = stop,
         start           = start,
         wait_state      = wait_state,
-        perform_request = perform_request
+        perform_request = perform_request,
+        perform_async_request = perform_async_request,
     }
 end
 
@@ -864,8 +974,12 @@ function remote_methods:wait_connected(timeout)
 end
 
 function remote_methods:_request(method, opts, ...)
-    local this_fiber = fiber_self()
     local transport = self._transport
+    local buffer = opts and opts.buffer
+    if opts and opts.is_async then
+        return transport.perform_async_request(buffer, method, 0, ...)
+    end
+    local this_fiber = fiber_self()
     local perform_request = transport.perform_request
     local wait_state = transport.wait_state
     local deadline = nil
@@ -877,7 +991,6 @@ function remote_methods:_request(method, opts, ...)
         -- @deprecated since 1.7.4
         deadline = self._deadlines[this_fiber]
     end
-    local buffer = opts and opts.buffer
     local err, res
     repeat
         local timeout = deadline and max(0, deadline - fiber_clock())
@@ -928,7 +1041,7 @@ function remote_methods:call(func_name, args, opts)
     check_call_args(args)
     args = args or {}
     local res = self:_request('call_17', opts, tostring(func_name), args)
-    if type(res) ~= 'table' then
+    if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
     return unpack(res)
@@ -945,7 +1058,7 @@ function remote_methods:eval(code, args, opts)
     check_eval_args(args)
     args = args or {}
     local res = self:_request('eval', opts, code, args)
-    if type(res) ~= 'table' then
+    if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
     return unpack(res)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 6a3713fc0..aaa421ec6 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -2475,9 +2475,6 @@ box.internal.collation.drop('test')
 space:drop()
 ---
 ...
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
----
-...
 c.state
 ---
 - closed
@@ -2485,3 +2482,519 @@ c.state
 c = nil
 ---
 ...
+--
+-- gh-3107: fiber-async netbox.
+--
+f = nil
+---
+...
+function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end
+---
+...
+s = box.schema.create_space('test')
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+s:replace{2}
+---
+- [2]
+...
+s:replace{3}
+---
+- [3]
+...
+s:replace{4}
+---
+- [4]
+...
+c = net:connect(box.cfg.listen)
+---
+...
+--
+-- Check long connections, multiple wait_result().
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+future:result()
+---
+- null
+- Response is not ready
+...
+future:is_ready()
+---
+- false
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+ret = future:wait_result(0.01)
+---
+...
+future:is_ready()
+---
+- true
+...
+future:wait_result(0.01)
+---
+- [1, 2, 3]
+...
+ret
+---
+- [1, 2, 3]
+...
+_, err = pcall(future.wait_result, future, true)
+---
+...
+err:find('Usage') ~= nil
+---
+- true
+...
+_, err = pcall(future.wait_result, future, '100')
+---
+...
+err:find('Usage') ~= nil
+---
+- true
+...
+--
+-- Check infinity timeout.
+--
+ret = nil
+---
+...
+_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
+---
+...
+f:wakeup()
+---
+...
+while not ret do fiber.sleep(0.01) end
+---
+...
+ret
+---
+- [1, 2, 3]
+...
+future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
+---
+...
+future:result()
+---
+- null
+- Response is not ready
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+future:wait_result(0.01)
+---
+- [1, 2, 3]
+...
+--
+-- Ensure the request is garbage collected both if is not used and
+-- if is.
+--
+gc_test = setmetatable({}, {__mode = 'v'})
+---
+...
+gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+gc_test.future ~= nil
+---
+- true
+...
+collectgarbage()
+---
+- 0
+...
+gc_test
+---
+- []
+...
+f:wakeup()
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+collectgarbage()
+---
+- 0
+...
+future ~= nil
+---
+- true
+...
+f:wakeup()
+---
+...
+future:wait_result(1000)
+---
+- [1, 2, 3]
+...
+collectgarbage()
+---
+- 0
+...
+future ~= nil
+---
+- true
+...
+gc_test.future = future
+---
+...
+future = nil
+---
+...
+collectgarbage()
+---
+- 0
+...
+gc_test
+---
+- []
+...
+--
+-- Ensure a request can be finalized from non-caller fibers.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+ret = {}
+---
+...
+count = 0
+---
+...
+for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
+---
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+while count ~= 10 do fiber.sleep(0.1) end
+---
+...
+ret
+---
+- - &0 [1, 2, 3]
+  - *0
+  - *0
+  - *0
+  - *0
+  - *0
+  - *0
+  - *0
+  - *0
+  - *0
+...
+--
+-- Test space methods.
+--
+future = c.space.test:select({1}, {is_async = true})
+---
+...
+ret = future:wait_result(100)
+---
+...
+ret
+---
+- - [1]
+...
+type(ret[1])
+---
+- cdata
+...
+future = c.space.test:insert({5}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5]
+...
+s:get{5}
+---
+- [5]
+...
+future = c.space.test:replace({6}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [6]
+...
+s:get{6}
+---
+- [6]
+...
+future = c.space.test:delete({6}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [6]
+...
+s:get{6}
+---
+...
+future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 5]
+...
+s:get{5}
+---
+- [5, 5]
+...
+future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- null
+...
+s:get{5}
+---
+- [5, 6]
+...
+future = c.space.test:get({5}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 6]
+...
+--
+-- Test index methods.
+--
+future = c.space.test.index.pk:select({1}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- - [1]
+...
+future = c.space.test.index.pk:get({2}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [2]
+...
+future = c.space.test.index.pk:min({}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [1]
+...
+future = c.space.test.index.pk:max({}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 6]
+...
+future = c.space.test.index.pk:count({3}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- 1
+...
+future = c.space.test.index.pk:delete({3}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [3]
+...
+s:get{3}
+---
+...
+future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [4, 6]
+...
+s:get{4}
+---
+- [4, 6]
+...
+--
+-- Test async errors.
+--
+future = c.space.test:insert({1}, {is_async = true})
+---
+...
+future:wait_result()
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+future:result()
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+--
+-- Test discard.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+future:discard()
+---
+...
+f:wakeup()
+---
+...
+future:result()
+---
+- null
+- Response is discarded
+...
+future:wait_result(100)
+---
+- null
+- Response is discarded
+...
+--
+-- Test closed connection.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+c:close()
+---
+...
+future2:wait_result(100)
+---
+- null
+- Connection is not established
+...
+future2:result()
+---
+- null
+- Connection is not established
+...
+future2:discard()
+---
+...
+-- Already successful result must be available.
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+future:result()
+---
+- [1, 2, 3]
+...
+future:is_ready()
+---
+- true
+...
+--
+-- Test reconnect.
+--
+c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+---
+...
+while not c:is_connected() do fiber.sleep(0.01) end
+---
+...
+future:wait_result(100)
+---
+- null
+- Peer closed
+...
+future:result()
+---
+- null
+- Peer closed
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+--
+-- Test raw response getting.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- 10
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- {48: [1, 2, 3]}
+...
+c:close()
+---
+...
+s:drop()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 576b5cfea..82c538fbe 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -1004,8 +1004,190 @@ c.space.test.index.sk.parts
 c:close()
 box.internal.collation.drop('test')
 space:drop()
-
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 c.state
 c = nil
 
+--
+-- gh-3107: fiber-async netbox.
+--
+f = nil
+function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+s:replace{2}
+s:replace{3}
+s:replace{4}
+c = net:connect(box.cfg.listen)
+--
+-- Check long connections, multiple wait_result().
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+future:result()
+future:is_ready()
+future:wait_result(0.01)
+f:wakeup()
+ret = future:wait_result(0.01)
+future:is_ready()
+future:wait_result(0.01)
+ret
+
+_, err = pcall(future.wait_result, future, true)
+err:find('Usage') ~= nil
+_, err = pcall(future.wait_result, future, '100')
+err:find('Usage') ~= nil
+
+--
+-- Check infinity timeout.
+--
+ret = nil
+_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
+f:wakeup()
+while not ret do fiber.sleep(0.01) end
+ret
+
+future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
+future:result()
+future:wait_result(0.01)
+f:wakeup()
+future:wait_result(0.01)
+
+--
+-- Ensure the request is garbage collected both if is not used and
+-- if is.
+--
+gc_test = setmetatable({}, {__mode = 'v'})
+gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
+gc_test.future ~= nil
+collectgarbage()
+gc_test
+f:wakeup()
+
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+collectgarbage()
+future ~= nil
+f:wakeup()
+future:wait_result(1000)
+collectgarbage()
+future ~= nil
+gc_test.future = future
+future = nil
+collectgarbage()
+gc_test
+
+--
+-- Ensure a request can be finalized from non-caller fibers.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+ret = {}
+count = 0
+for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
+future:wait_result(0.01)
+f:wakeup()
+while count ~= 10 do fiber.sleep(0.1) end
+ret
+
+--
+-- Test space methods.
+--
+future = c.space.test:select({1}, {is_async = true})
+ret = future:wait_result(100)
+ret
+type(ret[1])
+future = c.space.test:insert({5}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:replace({6}, {is_async = true})
+future:wait_result(100)
+s:get{6}
+future = c.space.test:delete({6}, {is_async = true})
+future:wait_result(100)
+s:get{6}
+future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:get({5}, {is_async = true})
+future:wait_result(100)
+
+--
+-- Test index methods.
+--
+future = c.space.test.index.pk:select({1}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:get({2}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:min({}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:max({}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:count({3}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:delete({3}, {is_async = true})
+future:wait_result(100)
+s:get{3}
+future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
+future:wait_result(100)
+s:get{4}
+
+--
+-- Test async errors.
+--
+future = c.space.test:insert({1}, {is_async = true})
+future:wait_result()
+future:result()
+
+--
+-- Test discard.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+future:discard()
+f:wakeup()
+future:result()
+future:wait_result(100)
+
+--
+-- Test closed connection.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+f:wakeup()
+future:wait_result(100)
+future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
+c:close()
+future2:wait_result(100)
+future2:result()
+future2:discard()
+-- Already successful result must be available.
+future:wait_result(100)
+future:result()
+future:is_ready()
+
+--
+-- Test reconnect.
+--
+c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+while not c:is_connected() do fiber.sleep(0.01) end
+future:wait_result(100)
+future:result()
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+f:wakeup()
+future:wait_result(100)
+
+--
+-- Test raw response getting.
+--
+ibuf = require('buffer').ibuf()
+future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
+f:wakeup()
+future:wait_result(100)
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
+c:close()
+s:drop()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-- 
2.15.1 (Apple Git-101)

