[PATCH 6/8] netbox: introduce fiber-async API
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Mon Apr 16 21:39:16 MSK 2018
Now any netbox call blocks a caller-fiber until a result is read
from a socket, or time is out. To use it asynchronously it is
necessary to create a fiber per request. Sometimes it is
unwanted - for example if RPS is very high (for example, about
100k), and latency is about 1 second. Or when it is neccessary
to send multiple requests in paralles and then collect responses
(map-reduce).
The patch introduces a new option for all netbox requests:
is_async. With this option any called netbox method returns
immediately (but still yields for a moment) a 'future' object.
By a future object a user can check if the request is finalized,
get a result or error, wait for a timeout, discard a response.
Example of is_async usage:
future = conn:call(func, {params}, {..., is_async = true})
-- Do some work ...
if not future.is_ready() then
result, err = future:wait_result(timeout)
end
-- Or:
result, error = future:result()
A future:result() and :wait_result() returns either an error or
a response in the same format, as the sync versions of the called
methods.
Part of #3107
---
src/box/lua/net_box.lua | 159 ++++++++++++--
test/box/net.box.result | 519 +++++++++++++++++++++++++++++++++++++++++++++-
test/box/net.box.test.lua | 186 ++++++++++++++++-
3 files changed, 836 insertions(+), 28 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3868cdf1c..96f528963 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback,
local last_error
local state_cond = fiber.cond() -- signaled when the state changes
- -- requests: requests currently 'in flight', keyed by a request id;
- -- value refs are weak hence if a client dies unexpectedly,
- -- GC cleans the mess. Client submits a request and waits on state_cond.
- -- If the reponse arrives within the timeout, the worker wakes
- -- client fiber explicitly. Otherwize, wait on state_cond completes and
- -- the client reports E_TIMEOUT.
+ -- Async requests currently 'in flight', keyed by a request
+ -- id. Value refs are weak hence if a client dies
+ -- unexpectedly, GC cleans the mess. Client either submits a
+ -- request and waits on state_cond, OR makes an async request
+ -- and does not block until a response is received. If the
+ -- request is not async and the reponse arrives within the
+ -- timeout, the worker wakes client fiber explicitly.
+ -- Otherwize, wait on state_cond completes and the client
+ -- reports E_TIMEOUT.
+ -- Async request can not be timed out completely. Instead a
+ -- user must decide when he does not want to wait for
+ -- response anymore.
local requests = setmetatable({}, { __mode = 'v' })
local next_request_id = 1
@@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback,
local send_buf = buffer.ibuf(buffer.READAHEAD)
local recv_buf = buffer.ibuf(buffer.READAHEAD)
+ local function wakeup_client(client)
+ if client and client:status() ~= 'dead' then
+ client:wakeup()
+ end
+ end
+
+ --
+ -- Async request metamethods.
+ --
+ local request_index = {}
+ --
+ -- When an async request is finalized (with ok or error - no
+ -- matter), its 'id' field is nullified by a response
+ -- dispatcher.
+ --
+ function request_index:is_ready()
+ return self.id == nil or worker_fiber == nil
+ end
+ --
+ -- When a request is finished, a result can be got from a
+ -- future object anytime.
+ -- @retval result, nil Success, the response is returned.
+ -- @retval nil, error Error occured.
+ --
+ function request_index:result()
+ if self.errno then
+ return nil, box.error.new({code = self.errno,
+ reason = self.response})
+ elseif not self.id then
+ return self.response
+ elseif not worker_fiber then
+ return nil, box.error.new(E_NO_CONNECTION)
+ else
+ return nil, box.error.new(box.error.PROC_LUA,
+ 'Response is not ready')
+ end
+ end
+ --
+ -- Wait for a response or error max timeout seconds.
+ -- @param timeout Max seconds to wait.
+ -- @retval result, nil Success, the response is returned.
+ -- @retval nil, error Error occured.
+ --
+ function request_index:wait_result(timeout)
+ if timeout then
+ if type(timeout) ~= 'number' or timeout < 0 then
+ error('Usage: future:wait_result(timeout)')
+ end
+ else
+ timeout = TIMEOUT_INFINITY
+ end
+ if not self:is_ready() then
+ -- When a response is ready before timeout, the
+ -- waiting client is waked up spuriously.
+ local old_client = self.client
+ self.client = fiber.self()
+ while timeout > 0 and not self:is_ready() do
+ local ts = fiber.clock()
+ state_cond:wait(timeout)
+ timeout = timeout - (fiber.clock() - ts)
+ end
+ self.client = old_client
+ if not self:is_ready() then
+ return nil, box.error.new(E_TIMEOUT)
+ end
+ -- It is possible that multiple fibers are waiting for
+ -- a result. In such a case a first, who got it, must
+ -- wakeup the previous waiting client. This one wakes
+ -- up another. Another wakes up third one, etc.
+ wakeup_client(old_client)
+ end
+ return self:result()
+ end
+ --
+ -- Make a connection forget about the response. When it will
+ -- be received, it will be ignored.
+ --
+ function request_index:discard()
+ if self.id then
+ requests[self.id] = nil
+ self.id = nil
+ self.errno = box.error.PROC_LUA
+ self.response = 'Response is discarded'
+ end
+ end
+
+ local request_mt = { __index = request_index }
+
-- STATE SWITCHING --
local function set_state(new_state, new_errno, new_error)
state = new_state
@@ -236,6 +330,7 @@ local function create_transport(host, port, user, password, callback,
state_cond:broadcast()
if state == 'error' or state == 'error_reconnect' then
for _, request in pairs(requests) do
+ request.id = nil
request.errno = new_errno
request.response = new_error
end
@@ -315,12 +410,16 @@ local function create_transport(host, port, user, password, callback,
end
end
- -- REQUEST/RESPONSE --
- local function perform_request(timeout, buffer, method, schema_version, ...)
+ --
+ -- Send a request and do not wait for response.
+ -- @retval nil, error Error occured.
+ -- @retval not nil Future object.
+ --
+ local function perform_async_request(buffer, method, schema_version, ...)
if state ~= 'active' then
- return last_errno or E_NO_CONNECTION, last_error
+ return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
+ reason = last_error})
end
- local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
-- alert worker to notify it of the queued outgoing data;
-- if the buffer wasn't empty, assume the worker was already alerted
if send_buf:size() == 0 then
@@ -329,12 +428,27 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
- local request = table_new(0, 6) -- reserve space for 6 keys
- request.client = fiber_self()
+ local request = setmetatable(table_new(0, 7), request_mt)
request.method = method
request.schema_version = schema_version
request.buffer = buffer
+ request.id = id
requests[id] = request
+ return request
+ end
+
+ --
+ -- Send a request and wait for response.
+ --
+ local function perform_request(timeout, buffer, method, schema_version, ...)
+ local request, err =
+ perform_async_request(buffer, method, schema_version, ...)
+ if not request then
+ return last_errno or E_NO_CONNECTION, last_error
+ end
+ request.client = fiber_self()
+ local id = request.id
+ local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
repeat
local timeout = max(0, deadline - fiber_clock())
if not state_cond:wait(timeout) then
@@ -345,12 +459,6 @@ local function create_transport(host, port, user, password, callback,
return request.errno, request.response
end
- local function wakeup_client(client)
- if client:status() ~= 'dead' then
- client:wakeup()
- end
- end
-
local function dispatch_response_iproto(hdr, body_rpos, body_end)
local id = hdr[IPROTO_SYNC_KEY]
local request = requests[id]
@@ -358,6 +466,7 @@ local function create_transport(host, port, user, password, callback,
return
end
requests[id] = nil
+ request.id = nil
local status = hdr[IPROTO_STATUS_KEY]
local body, body_end_check
@@ -607,7 +716,8 @@ local function create_transport(host, port, user, password, callback,
stop = stop,
start = start,
wait_state = wait_state,
- perform_request = perform_request
+ perform_request = perform_request,
+ perform_async_request = perform_async_request,
}
end
@@ -864,8 +974,12 @@ function remote_methods:wait_connected(timeout)
end
function remote_methods:_request(method, opts, ...)
- local this_fiber = fiber_self()
local transport = self._transport
+ local buffer = opts and opts.buffer
+ if opts and opts.is_async then
+ return transport.perform_async_request(buffer, method, 0, ...)
+ end
+ local this_fiber = fiber_self()
local perform_request = transport.perform_request
local wait_state = transport.wait_state
local deadline = nil
@@ -877,7 +991,6 @@ function remote_methods:_request(method, opts, ...)
-- @deprecated since 1.7.4
deadline = self._deadlines[this_fiber]
end
- local buffer = opts and opts.buffer
local err, res
repeat
local timeout = deadline and max(0, deadline - fiber_clock())
@@ -928,7 +1041,7 @@ function remote_methods:call(func_name, args, opts)
check_call_args(args)
args = args or {}
local res = self:_request('call_17', opts, tostring(func_name), args)
- if type(res) ~= 'table' then
+ if type(res) ~= 'table' or opts and opts.is_async then
return res
end
return unpack(res)
@@ -945,7 +1058,7 @@ function remote_methods:eval(code, args, opts)
check_eval_args(args)
args = args or {}
local res = self:_request('eval', opts, code, args)
- if type(res) ~= 'table' then
+ if type(res) ~= 'table' or opts and opts.is_async then
return res
end
return unpack(res)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 6a3713fc0..aaa421ec6 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -2475,9 +2475,6 @@ box.internal.collation.drop('test')
space:drop()
---
...
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
----
-...
c.state
---
- closed
@@ -2485,3 +2482,519 @@ c.state
c = nil
---
...
+--
+-- gh-3107: fiber-async netbox.
+--
+f = nil
+---
+...
+function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end
+---
+...
+s = box.schema.create_space('test')
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+s:replace{2}
+---
+- [2]
+...
+s:replace{3}
+---
+- [3]
+...
+s:replace{4}
+---
+- [4]
+...
+c = net:connect(box.cfg.listen)
+---
+...
+--
+-- Check long connections, multiple wait_result().
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+future:result()
+---
+- null
+- Response is not ready
+...
+future:is_ready()
+---
+- false
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+ret = future:wait_result(0.01)
+---
+...
+future:is_ready()
+---
+- true
+...
+future:wait_result(0.01)
+---
+- [1, 2, 3]
+...
+ret
+---
+- [1, 2, 3]
+...
+_, err = pcall(future.wait_result, future, true)
+---
+...
+err:find('Usage') ~= nil
+---
+- true
+...
+_, err = pcall(future.wait_result, future, '100')
+---
+...
+err:find('Usage') ~= nil
+---
+- true
+...
+--
+-- Check infinity timeout.
+--
+ret = nil
+---
+...
+_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
+---
+...
+f:wakeup()
+---
+...
+while not ret do fiber.sleep(0.01) end
+---
+...
+ret
+---
+- [1, 2, 3]
+...
+future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
+---
+...
+future:result()
+---
+- null
+- Response is not ready
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+future:wait_result(0.01)
+---
+- [1, 2, 3]
+...
+--
+-- Ensure the request is garbage collected both if is not used and
+-- if is.
+--
+gc_test = setmetatable({}, {__mode = 'v'})
+---
+...
+gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+gc_test.future ~= nil
+---
+- true
+...
+collectgarbage()
+---
+- 0
+...
+gc_test
+---
+- []
+...
+f:wakeup()
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+collectgarbage()
+---
+- 0
+...
+future ~= nil
+---
+- true
+...
+f:wakeup()
+---
+...
+future:wait_result(1000)
+---
+- [1, 2, 3]
+...
+collectgarbage()
+---
+- 0
+...
+future ~= nil
+---
+- true
+...
+gc_test.future = future
+---
+...
+future = nil
+---
+...
+collectgarbage()
+---
+- 0
+...
+gc_test
+---
+- []
+...
+--
+-- Ensure a request can be finalized from non-caller fibers.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+ret = {}
+---
+...
+count = 0
+---
+...
+for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
+---
+...
+future:wait_result(0.01)
+---
+- null
+- Timeout exceeded
+...
+f:wakeup()
+---
+...
+while count ~= 10 do fiber.sleep(0.1) end
+---
+...
+ret
+---
+- - &0 [1, 2, 3]
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+ - *0
+...
+--
+-- Test space methods.
+--
+future = c.space.test:select({1}, {is_async = true})
+---
+...
+ret = future:wait_result(100)
+---
+...
+ret
+---
+- - [1]
+...
+type(ret[1])
+---
+- cdata
+...
+future = c.space.test:insert({5}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5]
+...
+s:get{5}
+---
+- [5]
+...
+future = c.space.test:replace({6}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [6]
+...
+s:get{6}
+---
+- [6]
+...
+future = c.space.test:delete({6}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [6]
+...
+s:get{6}
+---
+...
+future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 5]
+...
+s:get{5}
+---
+- [5, 5]
+...
+future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- null
+...
+s:get{5}
+---
+- [5, 6]
+...
+future = c.space.test:get({5}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 6]
+...
+--
+-- Test index methods.
+--
+future = c.space.test.index.pk:select({1}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- - [1]
+...
+future = c.space.test.index.pk:get({2}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [2]
+...
+future = c.space.test.index.pk:min({}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [1]
+...
+future = c.space.test.index.pk:max({}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [5, 6]
+...
+future = c.space.test.index.pk:count({3}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- 1
+...
+future = c.space.test.index.pk:delete({3}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [3]
+...
+s:get{3}
+---
+...
+future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
+---
+...
+future:wait_result(100)
+---
+- [4, 6]
+...
+s:get{4}
+---
+- [4, 6]
+...
+--
+-- Test async errors.
+--
+future = c.space.test:insert({1}, {is_async = true})
+---
+...
+future:wait_result()
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+future:result()
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+--
+-- Test discard.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+future:discard()
+---
+...
+f:wakeup()
+---
+...
+future:result()
+---
+- null
+- Response is discarded
+...
+future:wait_result(100)
+---
+- null
+- Response is discarded
+...
+--
+-- Test closed connection.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+c:close()
+---
+...
+future2:wait_result(100)
+---
+- null
+- Connection is not established
+...
+future2:result()
+---
+- null
+- Connection is not established
+...
+future2:discard()
+---
+...
+-- Already successful result must be available.
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+future:result()
+---
+- [1, 2, 3]
+...
+future:is_ready()
+---
+- true
+...
+--
+-- Test reconnect.
+--
+c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+---
+...
+while not c:is_connected() do fiber.sleep(0.01) end
+---
+...
+future:wait_result(100)
+---
+- null
+- Peer closed
+...
+future:result()
+---
+- null
+- Peer closed
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- [1, 2, 3]
+...
+--
+-- Test raw response getting.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
+---
+...
+f:wakeup()
+---
+...
+future:wait_result(100)
+---
+- 10
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- {48: [1, 2, 3]}
+...
+c:close()
+---
+...
+s:drop()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 576b5cfea..82c538fbe 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -1004,8 +1004,190 @@ c.space.test.index.sk.parts
c:close()
box.internal.collation.drop('test')
space:drop()
-
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
c.state
c = nil
+--
+-- gh-3107: fiber-async netbox.
+--
+f = nil
+function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+s:replace{2}
+s:replace{3}
+s:replace{4}
+c = net:connect(box.cfg.listen)
+--
+-- Check long connections, multiple wait_result().
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+future:result()
+future:is_ready()
+future:wait_result(0.01)
+f:wakeup()
+ret = future:wait_result(0.01)
+future:is_ready()
+future:wait_result(0.01)
+ret
+
+_, err = pcall(future.wait_result, future, true)
+err:find('Usage') ~= nil
+_, err = pcall(future.wait_result, future, '100')
+err:find('Usage') ~= nil
+
+--
+-- Check infinity timeout.
+--
+ret = nil
+_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
+f:wakeup()
+while not ret do fiber.sleep(0.01) end
+ret
+
+future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
+future:result()
+future:wait_result(0.01)
+f:wakeup()
+future:wait_result(0.01)
+
+--
+-- Ensure the request is garbage collected both if is not used and
+-- if is.
+--
+gc_test = setmetatable({}, {__mode = 'v'})
+gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
+gc_test.future ~= nil
+collectgarbage()
+gc_test
+f:wakeup()
+
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+collectgarbage()
+future ~= nil
+f:wakeup()
+future:wait_result(1000)
+collectgarbage()
+future ~= nil
+gc_test.future = future
+future = nil
+collectgarbage()
+gc_test
+
+--
+-- Ensure a request can be finalized from non-caller fibers.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+ret = {}
+count = 0
+for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
+future:wait_result(0.01)
+f:wakeup()
+while count ~= 10 do fiber.sleep(0.1) end
+ret
+
+--
+-- Test space methods.
+--
+future = c.space.test:select({1}, {is_async = true})
+ret = future:wait_result(100)
+ret
+type(ret[1])
+future = c.space.test:insert({5}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:replace({6}, {is_async = true})
+future:wait_result(100)
+s:get{6}
+future = c.space.test:delete({6}, {is_async = true})
+future:wait_result(100)
+s:get{6}
+future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
+future:wait_result(100)
+s:get{5}
+future = c.space.test:get({5}, {is_async = true})
+future:wait_result(100)
+
+--
+-- Test index methods.
+--
+future = c.space.test.index.pk:select({1}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:get({2}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:min({}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:max({}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:count({3}, {is_async = true})
+future:wait_result(100)
+future = c.space.test.index.pk:delete({3}, {is_async = true})
+future:wait_result(100)
+s:get{3}
+future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
+future:wait_result(100)
+s:get{4}
+
+--
+-- Test async errors.
+--
+future = c.space.test:insert({1}, {is_async = true})
+future:wait_result()
+future:result()
+
+--
+-- Test discard.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+future:discard()
+f:wakeup()
+future:result()
+future:wait_result(100)
+
+--
+-- Test closed connection.
+--
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+f:wakeup()
+future:wait_result(100)
+future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
+c:close()
+future2:wait_result(100)
+future2:result()
+future2:discard()
+-- Already successful result must be available.
+future:wait_result(100)
+future:result()
+future:is_ready()
+
+--
+-- Test reconnect.
+--
+c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+while not c:is_connected() do fiber.sleep(0.01) end
+future:wait_result(100)
+future:result()
+future = c:call('long_function', {1, 2, 3}, {is_async = true})
+f:wakeup()
+future:wait_result(100)
+
+--
+-- Test raw response getting.
+--
+ibuf = require('buffer').ibuf()
+future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
+f:wakeup()
+future:wait_result(100)
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
+c:close()
+s:drop()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
--
2.15.1 (Apple Git-101)
More information about the Tarantool-patches
mailing list