From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Alexander Turenko Subject: [PATCH v3 5/7] net.box: add skip_header option to use with buffer Date: Wed, 10 Apr 2019 18:21:23 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit To: Vladimir Davydov Cc: Alexander Turenko , tarantool-patches@freelists.org List-ID: Needed for #3276. @TarantoolBot document Title: net.box: skip_header option This option instructs net.box to strip the {[IPROTO_DATA_KEY] = ...} wrapper from a response before writing it to a buffer. This may be needed to pass the buffer to a C function that expects specific msgpack input. See src/box/lua/net_box.lua for examples. --- src/box/lua/net_box.lua | 46 +++++--- test/box/net.box.result | 222 +++++++++++++++++++++++++++++++++++++- test/box/net.box.test.lua | 86 ++++++++++++++- 3 files changed, 328 insertions(+), 26 deletions(-) diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index b3139a3f5..c6ed3e138 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -15,6 +15,7 @@ local max = math.max local fiber_clock = fiber.clock local fiber_self = fiber.self local decode = msgpack.decode_unchecked +local decode_map = msgpack.decode_map local table_new = require('table.new') local check_iterator_type = box.internal.check_iterator_type @@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback, -- @retval nil, error Error occured. -- @retval not nil Future object. -- - local function perform_async_request(buffer, method, on_push, on_push_ctx, - ...) + local function perform_async_request(buffer, skip_header, method, on_push, + on_push_ctx, ...) if state ~= 'active' and state ~= 'fetch_schema' then return nil, box.error.new({code = last_errno or E_NO_CONNECTION, reason = last_error}) @@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback, local id = next_request_id method_encoder[method](send_buf, id, ...) 
next_request_id = next_id(id) - -- Request in most cases has maximum 8 members: - -- method, buffer, id, cond, errno, response, on_push, - -- on_push_ctx. - local request = setmetatable(table_new(0, 8), request_mt) + -- Request in most cases has maximum 9 members: + -- method, buffer, skip_header, id, cond, errno, response, + -- on_push, on_push_ctx. + local request = setmetatable(table_new(0, 9), request_mt) request.method = method request.buffer = buffer + request.skip_header = skip_header request.id = id request.cond = fiber.cond() requests[id] = request @@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback, -- @retval nil, error Error occured. -- @retval not nil Response object. -- - local function perform_request(timeout, buffer, method, on_push, - on_push_ctx, ...) + local function perform_request(timeout, buffer, skip_header, method, + on_push, on_push_ctx, ...) local request, err = - perform_async_request(buffer, method, on_push, on_push_ctx, ...) + perform_async_request(buffer, skip_header, method, on_push, + on_push_ctx, ...) if not request then return nil, err end @@ -551,6 +554,15 @@ local function create_transport(host, port, user, password, callback, if buffer ~= nil then -- Copy xrow.body to user-provided buffer local body_len = body_end - body_rpos + if request.skip_header then + -- Skip {[IPROTO_DATA_KEY] = ...} wrapper. + local map_len, key + map_len, body_rpos = decode_map(body_rpos, body_len) + assert(map_len == 1) + key, body_rpos = decode(body_rpos) + assert(key == IPROTO_DATA_KEY) + body_len = body_end - body_rpos + end local wpos = buffer:alloc(body_len) ffi.copy(wpos, body_rpos, body_len) body_len = tonumber(body_len) @@ -1047,18 +1059,19 @@ end function remote_methods:_request(method, opts, ...) local transport = self._transport - local on_push, on_push_ctx, buffer, deadline + local on_push, on_push_ctx, buffer, skip_header, deadline -- Extract options, set defaults, check if the request is -- async. 
if opts then buffer = opts.buffer + skip_header = opts.skip_header if opts.is_async then if opts.on_push or opts.on_push_ctx then error('To handle pushes in an async request use future:pairs()') end local res, err = - transport.perform_async_request(buffer, method, table.insert, - {}, ...) + transport.perform_async_request(buffer, skip_header, method, + table.insert, {}, ...) if err then box.error(err) end @@ -1084,8 +1097,9 @@ function remote_methods:_request(method, opts, ...) transport.wait_state('active', timeout) timeout = deadline and max(0, deadline - fiber_clock()) end - local res, err = transport.perform_request(timeout, buffer, method, - on_push, on_push_ctx, ...) + local res, err = transport.perform_request(timeout, buffer, skip_header, + method, on_push, on_push_ctx, + ...) if err then box.error(err) end @@ -1288,10 +1302,10 @@ function console_methods:eval(line, timeout) end if self.protocol == 'Binary' then local loader = 'return require("console").eval(...)' - res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line}) + res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line}) else assert(self.protocol == 'Lua console') - res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n') + res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n') end if err then box.error(err) diff --git a/test/box/net.box.result b/test/box/net.box.result index f71699818..8ef3de808 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) offset, limit, key) return ret end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); --- ... @@ -1598,6 +1598,18 @@ result --- - {48: [[2]]} ... 
+-- replace + skip_header +c.space.test:replace({2}, {buffer = ibuf, skip_header = true}) +--- +- 7 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[2]] +... -- insert c.space.test:insert({3}, {buffer = ibuf}) --- @@ -1610,6 +1622,21 @@ result --- - {48: [[3]]} ... +-- insert + skip_header +_ = space:delete({3}) +--- +... +c.space.test:insert({3}, {buffer = ibuf, skip_header = true}) +--- +- 7 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[3]] +... -- update c.space.test:update({3}, {}, {buffer = ibuf}) --- @@ -1633,6 +1660,29 @@ result --- - {48: [[3]]} ... +-- update + skip_header +c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true}) +--- +- 7 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[3]] +... +c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true}) +--- +- 7 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[3]] +... -- upsert c.space.test:upsert({4}, {}, {buffer = ibuf}) --- @@ -1645,6 +1695,18 @@ result --- - {48: []} ... +-- upsert + skip_header +c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... -- delete c.space.test:upsert({4}, {}, {buffer = ibuf}) --- @@ -1657,6 +1719,18 @@ result --- - {48: []} ... +-- delete + skip_header +c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... -- select c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf}) --- @@ -1669,6 +1743,18 @@ result --- - {48: [[3], [2], [1, 'hello']]} ... +-- select + skip_header +c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true}) +--- +- 17 +... 
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[3], [2], [1, 'hello']] +... -- select len = c.space.test:select({}, {buffer = ibuf}) --- @@ -1692,6 +1778,29 @@ result --- - {48: [[1, 'hello'], [2], [3], [4]]} ... +-- select + skip_header +len = c.space.test:select({}, {buffer = ibuf, skip_header = true}) +--- +... +ibuf.rpos + len == ibuf.wpos +--- +- true +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +ibuf.rpos == ibuf.wpos +--- +- true +... +len +--- +- 19 +... +result +--- +- [[1, 'hello'], [2], [3], [4]] +... -- call c:call("echo", {1, 2, 3}, {buffer = ibuf}) --- @@ -1726,6 +1835,40 @@ result --- - {48: []} ... +-- call + skip_header +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +--- +- 8 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [1, 2, 3] +... +c:call("echo", {}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... +c:call("echo", nil, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... -- eval c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf}) --- @@ -1760,6 +1903,75 @@ result --- - {48: []} ... +-- eval + skip_header +c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... +c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... +c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... 
+-- make several request into a buffer with skip_header, then read +-- results +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +--- +- 8 +... +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +--- +- 8 +... +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +--- +- 8 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [1, 2, 3] +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [1, 2, 3] +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [1, 2, 3] +... -- unsupported methods c.space.test:get({1}, { buffer = ibuf}) --- @@ -2596,7 +2808,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') --- ... c.state @@ -3237,7 +3449,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01}) future = c:call('long_function', {1, 2, 3}, {is_async = true}) --- ... -_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') --- ... while not c:is_connected() do fiber.sleep(0.01) end @@ -3372,7 +3584,7 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. -- -f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') --- ... while f:status() ~= 'dead' do fiber.sleep(0.01) end @@ -3391,7 +3603,7 @@ c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' --- ... 
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data) +c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data) --- - null - Peer closed diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index beb33c24f..5ff2975ac 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) offset, limit, key) return ret end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); LISTEN = require('uri').parse(box.cfg.listen) @@ -626,11 +626,22 @@ c.space.test:replace({2}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- replace + skip_header +c.space.test:replace({2}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- insert c.space.test:insert({3}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- insert + skip_header +_ = space:delete({3}) +c.space.test:insert({3}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- update c.space.test:update({3}, {}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) @@ -639,21 +650,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- update + skip_header +c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- upsert c.space.test:upsert({4}, {}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result 
+-- upsert + skip_header +c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- delete c.space.test:upsert({4}, {}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- delete + skip_header +c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- select c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- select + skip_header +c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- select len = c.space.test:select({}, {buffer = ibuf}) ibuf.rpos + len == ibuf.wpos @@ -662,6 +696,14 @@ ibuf.rpos == ibuf.wpos len result +-- select + skip_header +len = c.space.test:select({}, {buffer = ibuf, skip_header = true}) +ibuf.rpos + len == ibuf.wpos +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +ibuf.rpos == ibuf.wpos +len +result + -- call c:call("echo", {1, 2, 3}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) @@ -673,6 +715,17 @@ c:call("echo", nil, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- call + skip_header +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c:call("echo", {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c:call("echo", nil, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- eval c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) @@ -684,6 +737,29 @@ c:eval("echo(...)", nil, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- 
eval + skip_header +c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + +-- make several request into a buffer with skip_header, then read +-- results +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- unsupported methods c.space.test:get({1}, { buffer = ibuf}) c.space.test.index.primary:min({}, { buffer = ibuf}) @@ -1074,7 +1150,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') c.state while not c:is_connected() do fiber.sleep(0.01) end c:ping() @@ -1307,7 +1383,7 @@ finalize_long() -- c = net:connect(box.cfg.listen, {reconnect_after = 0.01}) future = c:call('long_function', {1, 2, 3}, {is_async = true}) -_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') while not c:is_connected() do fiber.sleep(0.01) end finalize_long() future:wait_result(100) @@ -1364,7 +1440,7 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. 
-- -f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') while f:status() ~= 'dead' do fiber.sleep(0.01) end c:close() @@ -1374,7 +1450,7 @@ c:close() -- c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' -c._transport.perform_request(nil, nil, 'inject', nil, nil, data) +c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data) c:close() test_run:grep_log('default', 'too big packet size in the header') ~= nil -- 2.20.1