From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH 6/8] netbox: introduce fiber-async API Date: Mon, 16 Apr 2018 21:39:16 +0300 Message-Id: <49a50d32a154959aa786ec2a85a4f74792d7ae09.1523903144.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com List-ID: Currently any netbox call blocks the caller fiber until a result is read from a socket or the timeout expires. To use it asynchronously it is necessary to create a fiber per request. Sometimes that is unwanted - for example, if RPS is very high (about 100k) and latency is about 1 second, or when it is necessary to send multiple requests in parallel and then collect the responses (map-reduce). The patch introduces a new option for all netbox requests: is_async. With this option any called netbox method immediately returns a 'future' object (it still yields for a moment). Using the future object a user can check whether the request is finalized, get a result or an error, wait for the result with a timeout, or discard the response. Example of is_async usage: future = conn:call(func, {params}, {..., is_async = true}) -- Do some work ... if not future:is_ready() then result, err = future:wait_result(timeout) end -- Or: result, error = future:result() Both future:result() and future:wait_result() return either an error or a response in the same format as the sync versions of the called methods. 
Part of #3107 --- src/box/lua/net_box.lua | 159 ++++++++++++-- test/box/net.box.result | 519 +++++++++++++++++++++++++++++++++++++++++++++- test/box/net.box.test.lua | 186 ++++++++++++++++- 3 files changed, 836 insertions(+), 28 deletions(-) diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 3868cdf1c..96f528963 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -214,12 +214,18 @@ local function create_transport(host, port, user, password, callback, local last_error local state_cond = fiber.cond() -- signaled when the state changes - -- requests: requests currently 'in flight', keyed by a request id; - -- value refs are weak hence if a client dies unexpectedly, - -- GC cleans the mess. Client submits a request and waits on state_cond. - -- If the reponse arrives within the timeout, the worker wakes - -- client fiber explicitly. Otherwize, wait on state_cond completes and - -- the client reports E_TIMEOUT. + -- Async requests currently 'in flight', keyed by a request + -- id. Value refs are weak hence if a client dies + -- unexpectedly, GC cleans the mess. Client either submits a + -- request and waits on state_cond, OR makes an async request + -- and does not block until a response is received. If the + -- request is not async and the reponse arrives within the + -- timeout, the worker wakes client fiber explicitly. + -- Otherwize, wait on state_cond completes and the client + -- reports E_TIMEOUT. + -- Async request can not be timed out completely. Instead a + -- user must decide when he does not want to wait for + -- response anymore. 
local requests = setmetatable({}, { __mode = 'v' }) local next_request_id = 1 @@ -227,6 +233,94 @@ local function create_transport(host, port, user, password, callback, local send_buf = buffer.ibuf(buffer.READAHEAD) local recv_buf = buffer.ibuf(buffer.READAHEAD) + local function wakeup_client(client) + if client and client:status() ~= 'dead' then + client:wakeup() + end + end + + -- + -- Async request metamethods. + -- + local request_index = {} + -- + -- When an async request is finalized (with ok or error - no + -- matter), its 'id' field is nullified by a response + -- dispatcher. + -- + function request_index:is_ready() + return self.id == nil or worker_fiber == nil + end + -- + -- When a request is finished, a result can be got from a + -- future object anytime. + -- @retval result, nil Success, the response is returned. + -- @retval nil, error Error occured. + -- + function request_index:result() + if self.errno then + return nil, box.error.new({code = self.errno, + reason = self.response}) + elseif not self.id then + return self.response + elseif not worker_fiber then + return nil, box.error.new(E_NO_CONNECTION) + else + return nil, box.error.new(box.error.PROC_LUA, + 'Response is not ready') + end + end + -- + -- Wait for a response or error max timeout seconds. + -- @param timeout Max seconds to wait. + -- @retval result, nil Success, the response is returned. + -- @retval nil, error Error occured. + -- + function request_index:wait_result(timeout) + if timeout then + if type(timeout) ~= 'number' or timeout < 0 then + error('Usage: future:wait_result(timeout)') + end + else + timeout = TIMEOUT_INFINITY + end + if not self:is_ready() then + -- When a response is ready before timeout, the + -- waiting client is waked up spuriously. 
+ local old_client = self.client + self.client = fiber.self() + while timeout > 0 and not self:is_ready() do + local ts = fiber.clock() + state_cond:wait(timeout) + timeout = timeout - (fiber.clock() - ts) + end + self.client = old_client + if not self:is_ready() then + return nil, box.error.new(E_TIMEOUT) + end + -- It is possible that multiple fibers are waiting for + -- a result. In such a case a first, who got it, must + -- wakeup the previous waiting client. This one wakes + -- up another. Another wakes up third one, etc. + wakeup_client(old_client) + end + return self:result() + end + -- + -- Make a connection forget about the response. When it will + -- be received, it will be ignored. + -- + function request_index:discard() + if self.id then + requests[self.id] = nil + self.id = nil + self.errno = box.error.PROC_LUA + self.response = 'Response is discarded' + end + end + + local request_mt = { __index = request_index } + -- STATE SWITCHING -- local function set_state(new_state, new_errno, new_error) state = new_state @@ -236,6 +330,7 @@ local function create_transport(host, port, user, password, callback, state_cond:broadcast() if state == 'error' or state == 'error_reconnect' then for _, request in pairs(requests) do + request.id = nil request.errno = new_errno request.response = new_error end @@ -315,12 +410,16 @@ local function create_transport(host, port, user, password, callback, end end - -- REQUEST/RESPONSE -- - local function perform_request(timeout, buffer, method, schema_version, ...) + -- + -- Send a request and do not wait for response. + -- @retval nil, error Error occured. + -- @retval not nil Future object. + -- + local function perform_async_request(buffer, method, schema_version, ...) 
if state ~= 'active' then - return last_errno or E_NO_CONNECTION, last_error + return nil, box.error.new({code = last_errno or E_NO_CONNECTION, + reason = last_error}) end - local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY) -- alert worker to notify it of the queued outgoing data; -- if the buffer wasn't empty, assume the worker was already alerted if send_buf:size() == 0 then @@ -329,12 +428,27 @@ local function create_transport(host, port, user, password, callback, local id = next_request_id method_encoder[method](send_buf, id, schema_version, ...) next_request_id = next_id(id) - local request = table_new(0, 6) -- reserve space for 6 keys - request.client = fiber_self() + local request = setmetatable(table_new(0, 7), request_mt) request.method = method request.schema_version = schema_version request.buffer = buffer + request.id = id requests[id] = request + return request + end + + -- + -- Send a request and wait for response. + -- + local function perform_request(timeout, buffer, method, schema_version, ...) + local request, err = + perform_async_request(buffer, method, schema_version, ...) 
+ if not request then + return last_errno or E_NO_CONNECTION, last_error + end + request.client = fiber_self() + local id = request.id + local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY) repeat local timeout = max(0, deadline - fiber_clock()) if not state_cond:wait(timeout) then @@ -345,12 +459,6 @@ local function create_transport(host, port, user, password, callback, return request.errno, request.response end - local function wakeup_client(client) - if client:status() ~= 'dead' then - client:wakeup() - end - end - local function dispatch_response_iproto(hdr, body_rpos, body_end) local id = hdr[IPROTO_SYNC_KEY] local request = requests[id] @@ -358,6 +466,7 @@ local function create_transport(host, port, user, password, callback, return end requests[id] = nil + request.id = nil local status = hdr[IPROTO_STATUS_KEY] local body, body_end_check @@ -607,7 +716,8 @@ local function create_transport(host, port, user, password, callback, stop = stop, start = start, wait_state = wait_state, - perform_request = perform_request + perform_request = perform_request, + perform_async_request = perform_async_request, } end @@ -864,8 +974,12 @@ function remote_methods:wait_connected(timeout) end function remote_methods:_request(method, opts, ...) - local this_fiber = fiber_self() local transport = self._transport + local buffer = opts and opts.buffer + if opts and opts.is_async then + return transport.perform_async_request(buffer, method, 0, ...) + end + local this_fiber = fiber_self() local perform_request = transport.perform_request local wait_state = transport.wait_state local deadline = nil @@ -877,7 +991,6 @@ function remote_methods:_request(method, opts, ...) 
-- @deprecated since 1.7.4 deadline = self._deadlines[this_fiber] end - local buffer = opts and opts.buffer local err, res repeat local timeout = deadline and max(0, deadline - fiber_clock()) @@ -928,7 +1041,7 @@ function remote_methods:call(func_name, args, opts) check_call_args(args) args = args or {} local res = self:_request('call_17', opts, tostring(func_name), args) - if type(res) ~= 'table' then + if type(res) ~= 'table' or opts and opts.is_async then return res end return unpack(res) @@ -945,7 +1058,7 @@ function remote_methods:eval(code, args, opts) check_eval_args(args) args = args or {} local res = self:_request('eval', opts, code, args) - if type(res) ~= 'table' then + if type(res) ~= 'table' or opts and opts.is_async then return res end return unpack(res) diff --git a/test/box/net.box.result b/test/box/net.box.result index 6a3713fc0..aaa421ec6 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -2475,9 +2475,6 @@ box.internal.collation.drop('test') space:drop() --- ... -box.schema.user.revoke('guest', 'read,write,execute', 'universe') ---- -... c.state --- - closed @@ -2485,3 +2482,519 @@ c.state c = nil --- ... +-- +-- gh-3107: fiber-async netbox. +-- +f = nil +--- +... +function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end +--- +... +s = box.schema.create_space('test') +--- +... +pk = s:create_index('pk') +--- +... +s:replace{1} +--- +- [1] +... +s:replace{2} +--- +- [2] +... +s:replace{3} +--- +- [3] +... +s:replace{4} +--- +- [4] +... +c = net:connect(box.cfg.listen) +--- +... +-- +-- Check long connections, multiple wait_result(). +-- +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +--- +... +future:result() +--- +- null +- Response is not ready +... +future:is_ready() +--- +- false +... +future:wait_result(0.01) +--- +- null +- Timeout exceeded +... +f:wakeup() +--- +... +ret = future:wait_result(0.01) +--- +... +future:is_ready() +--- +- true +... 
+future:wait_result(0.01) +--- +- [1, 2, 3] +... +ret +--- +- [1, 2, 3] +... +_, err = pcall(future.wait_result, future, true) +--- +... +err:find('Usage') ~= nil +--- +- true +... +_, err = pcall(future.wait_result, future, '100') +--- +... +err:find('Usage') ~= nil +--- +- true +... +-- +-- Check infinity timeout. +-- +ret = nil +--- +... +_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end) +--- +... +f:wakeup() +--- +... +while not ret do fiber.sleep(0.01) end +--- +... +ret +--- +- [1, 2, 3] +... +future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true}) +--- +... +future:result() +--- +- null +- Response is not ready +... +future:wait_result(0.01) +--- +- null +- Timeout exceeded +... +f:wakeup() +--- +... +future:wait_result(0.01) +--- +- [1, 2, 3] +... +-- +-- Ensure the request is garbage collected both if is not used and +-- if is. +-- +gc_test = setmetatable({}, {__mode = 'v'}) +--- +... +gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true}) +--- +... +gc_test.future ~= nil +--- +- true +... +collectgarbage() +--- +- 0 +... +gc_test +--- +- [] +... +f:wakeup() +--- +... +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +--- +... +collectgarbage() +--- +- 0 +... +future ~= nil +--- +- true +... +f:wakeup() +--- +... +future:wait_result(1000) +--- +- [1, 2, 3] +... +collectgarbage() +--- +- 0 +... +future ~= nil +--- +- true +... +gc_test.future = future +--- +... +future = nil +--- +... +collectgarbage() +--- +- 0 +... +gc_test +--- +- [] +... +-- +-- Ensure a request can be finalized from non-caller fibers. +-- +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +--- +... +ret = {} +--- +... +count = 0 +--- +... +for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end +--- +... +future:wait_result(0.01) +--- +- null +- Timeout exceeded +... +f:wakeup() +--- +... 
+while count ~= 10 do fiber.sleep(0.1) end +--- +... +ret +--- +- - &0 [1, 2, 3] + - *0 + - *0 + - *0 + - *0 + - *0 + - *0 + - *0 + - *0 + - *0 +... +-- +-- Test space methods. +-- +future = c.space.test:select({1}, {is_async = true}) +--- +... +ret = future:wait_result(100) +--- +... +ret +--- +- - [1] +... +type(ret[1]) +--- +- cdata +... +future = c.space.test:insert({5}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- [5] +... +s:get{5} +--- +- [5] +... +future = c.space.test:replace({6}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- [6] +... +s:get{6} +--- +- [6] +... +future = c.space.test:delete({6}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- [6] +... +s:get{6} +--- +... +future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- [5, 5] +... +s:get{5} +--- +- [5, 5] +... +future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- null +... +s:get{5} +--- +- [5, 6] +... +future = c.space.test:get({5}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- [5, 6] +... +-- +-- Test index methods. +-- +future = c.space.test.index.pk:select({1}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- - [1] +... +future = c.space.test.index.pk:get({2}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- [2] +... +future = c.space.test.index.pk:min({}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- [1] +... +future = c.space.test.index.pk:max({}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- [5, 6] +... +future = c.space.test.index.pk:count({3}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- 1 +... +future = c.space.test.index.pk:delete({3}, {is_async = true}) +--- +... +future:wait_result(100) +--- +- [3] +... +s:get{3} +--- +... +future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true}) +--- +... 
+future:wait_result(100) +--- +- [4, 6] +... +s:get{4} +--- +- [4, 6] +... +-- +-- Test async errors. +-- +future = c.space.test:insert({1}, {is_async = true}) +--- +... +future:wait_result() +--- +- null +- Duplicate key exists in unique index 'pk' in space 'test' +... +future:result() +--- +- null +- Duplicate key exists in unique index 'pk' in space 'test' +... +-- +-- Test discard. +-- +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +--- +... +future:discard() +--- +... +f:wakeup() +--- +... +future:result() +--- +- null +- Response is discarded +... +future:wait_result(100) +--- +- null +- Response is discarded +... +-- +-- Test closed connection. +-- +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +--- +... +f:wakeup() +--- +... +future:wait_result(100) +--- +- [1, 2, 3] +... +future2 = c:call('long_function', {1, 2, 3}, {is_async = true}) +--- +... +c:close() +--- +... +future2:wait_result(100) +--- +- null +- Connection is not established +... +future2:result() +--- +- null +- Connection is not established +... +future2:discard() +--- +... +-- Already successful result must be available. +future:wait_result(100) +--- +- [1, 2, 3] +... +future:result() +--- +- [1, 2, 3] +... +future:is_ready() +--- +- true +... +-- +-- Test reconnect. +-- +c = net:connect(box.cfg.listen, {reconnect_after = 0.01}) +--- +... +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +--- +... +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +--- +... +while not c:is_connected() do fiber.sleep(0.01) end +--- +... +future:wait_result(100) +--- +- null +- Peer closed +... +future:result() +--- +- null +- Peer closed +... +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +--- +... +f:wakeup() +--- +... +future:wait_result(100) +--- +- [1, 2, 3] +... +-- +-- Test raw response getting. +-- +ibuf = require('buffer').ibuf() +--- +... 
+future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf}) +--- +... +f:wakeup() +--- +... +future:wait_result(100) +--- +- 10 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- {48: [1, 2, 3]} +... +c:close() +--- +... +s:drop() +--- +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') +--- +... diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index 576b5cfea..82c538fbe 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -1004,8 +1004,190 @@ c.space.test.index.sk.parts c:close() box.internal.collation.drop('test') space:drop() - -box.schema.user.revoke('guest', 'read,write,execute', 'universe') c.state c = nil +-- +-- gh-3107: fiber-async netbox. +-- +f = nil +function long_function(...) f = fiber.self() fiber.sleep(1000000) return ... end +s = box.schema.create_space('test') +pk = s:create_index('pk') +s:replace{1} +s:replace{2} +s:replace{3} +s:replace{4} +c = net:connect(box.cfg.listen) +-- +-- Check long connections, multiple wait_result(). +-- +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +future:result() +future:is_ready() +future:wait_result(0.01) +f:wakeup() +ret = future:wait_result(0.01) +future:is_ready() +future:wait_result(0.01) +ret + +_, err = pcall(future.wait_result, future, true) +err:find('Usage') ~= nil +_, err = pcall(future.wait_result, future, '100') +err:find('Usage') ~= nil + +-- +-- Check infinity timeout. +-- +ret = nil +_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end) +f:wakeup() +while not ret do fiber.sleep(0.01) end +ret + +future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true}) +future:result() +future:wait_result(0.01) +f:wakeup() +future:wait_result(0.01) + +-- +-- Ensure the request is garbage collected both if is not used and +-- if is. 
+-- +gc_test = setmetatable({}, {__mode = 'v'}) +gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true}) +gc_test.future ~= nil +collectgarbage() +gc_test +f:wakeup() + +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +collectgarbage() +future ~= nil +f:wakeup() +future:wait_result(1000) +collectgarbage() +future ~= nil +gc_test.future = future +future = nil +collectgarbage() +gc_test + +-- +-- Ensure a request can be finalized from non-caller fibers. +-- +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +ret = {} +count = 0 +for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end +future:wait_result(0.01) +f:wakeup() +while count ~= 10 do fiber.sleep(0.1) end +ret + +-- +-- Test space methods. +-- +future = c.space.test:select({1}, {is_async = true}) +ret = future:wait_result(100) +ret +type(ret[1]) +future = c.space.test:insert({5}, {is_async = true}) +future:wait_result(100) +s:get{5} +future = c.space.test:replace({6}, {is_async = true}) +future:wait_result(100) +s:get{6} +future = c.space.test:delete({6}, {is_async = true}) +future:wait_result(100) +s:get{6} +future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true}) +future:wait_result(100) +s:get{5} +future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true}) +future:wait_result(100) +s:get{5} +future = c.space.test:get({5}, {is_async = true}) +future:wait_result(100) + +-- +-- Test index methods. 
+-- +future = c.space.test.index.pk:select({1}, {is_async = true}) +future:wait_result(100) +future = c.space.test.index.pk:get({2}, {is_async = true}) +future:wait_result(100) +future = c.space.test.index.pk:min({}, {is_async = true}) +future:wait_result(100) +future = c.space.test.index.pk:max({}, {is_async = true}) +future:wait_result(100) +future = c.space.test.index.pk:count({3}, {is_async = true}) +future:wait_result(100) +future = c.space.test.index.pk:delete({3}, {is_async = true}) +future:wait_result(100) +s:get{3} +future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true}) +future:wait_result(100) +s:get{4} + +-- +-- Test async errors. +-- +future = c.space.test:insert({1}, {is_async = true}) +future:wait_result() +future:result() + +-- +-- Test discard. +-- +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +future:discard() +f:wakeup() +future:result() +future:wait_result(100) + +-- +-- Test closed connection. +-- +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +f:wakeup() +future:wait_result(100) +future2 = c:call('long_function', {1, 2, 3}, {is_async = true}) +c:close() +future2:wait_result(100) +future2:result() +future2:discard() +-- Already successful result must be available. +future:wait_result(100) +future:result() +future:is_ready() + +-- +-- Test reconnect. +-- +c = net:connect(box.cfg.listen, {reconnect_after = 0.01}) +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +while not c:is_connected() do fiber.sleep(0.01) end +future:wait_result(100) +future:result() +future = c:call('long_function', {1, 2, 3}, {is_async = true}) +f:wakeup() +future:wait_result(100) + +-- +-- Test raw response getting. 
+-- +ibuf = require('buffer').ibuf() +future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf}) +f:wakeup() +future:wait_result(100) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + +c:close() +s:drop() + +box.schema.user.revoke('guest', 'read,write,execute', 'universe') -- 2.15.1 (Apple Git-101)