[PATCH v3 5/7] net.box: add skip_header option to use with buffer
Alexander Turenko
alexander.turenko at tarantool.org
Wed Apr 10 18:21:23 MSK 2019
Needed for #3276.
@TarantoolBot document
Title: net.box: skip_header option
This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper
from a buffer. This may be needed to pass this buffer to some C function
when it expects some specific msgpack input.
See src/box/lua/net_box.lua for examples.
---
src/box/lua/net_box.lua | 46 +++++---
test/box/net.box.result | 222 +++++++++++++++++++++++++++++++++++++-
test/box/net.box.test.lua | 86 ++++++++++++++-
3 files changed, 328 insertions(+), 26 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index b3139a3f5..c6ed3e138 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -15,6 +15,7 @@ local max = math.max
local fiber_clock = fiber.clock
local fiber_self = fiber.self
local decode = msgpack.decode_unchecked
+local decode_map = msgpack.decode_map
local table_new = require('table.new')
local check_iterator_type = box.internal.check_iterator_type
@@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Future object.
--
- local function perform_async_request(buffer, method, on_push, on_push_ctx,
- ...)
+ local function perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, ...)
if state ~= 'active' and state ~= 'fetch_schema' then
return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
reason = last_error})
@@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, ...)
next_request_id = next_id(id)
- -- Request in most cases has maximum 8 members:
- -- method, buffer, id, cond, errno, response, on_push,
- -- on_push_ctx.
- local request = setmetatable(table_new(0, 8), request_mt)
+ -- Request in most cases has maximum 9 members:
+ -- method, buffer, skip_header, id, cond, errno, response,
+ -- on_push, on_push_ctx.
+ local request = setmetatable(table_new(0, 9), request_mt)
request.method = method
request.buffer = buffer
+ request.skip_header = skip_header
request.id = id
request.cond = fiber.cond()
requests[id] = request
@@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Response object.
--
- local function perform_request(timeout, buffer, method, on_push,
- on_push_ctx, ...)
+ local function perform_request(timeout, buffer, skip_header, method,
+ on_push, on_push_ctx, ...)
local request, err =
- perform_async_request(buffer, method, on_push, on_push_ctx, ...)
+ perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, ...)
if not request then
return nil, err
end
@@ -551,6 +554,15 @@ local function create_transport(host, port, user, password, callback,
if buffer ~= nil then
-- Copy xrow.body to user-provided buffer
local body_len = body_end - body_rpos
+ if request.skip_header then
+ -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
+ local map_len, key
+ map_len, body_rpos = decode_map(body_rpos, body_len)
+ assert(map_len == 1)
+ key, body_rpos = decode(body_rpos)
+ assert(key == IPROTO_DATA_KEY)
+ body_len = body_end - body_rpos
+ end
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
body_len = tonumber(body_len)
@@ -1047,18 +1059,19 @@ end
function remote_methods:_request(method, opts, ...)
local transport = self._transport
- local on_push, on_push_ctx, buffer, deadline
+ local on_push, on_push_ctx, buffer, skip_header, deadline
-- Extract options, set defaults, check if the request is
-- async.
if opts then
buffer = opts.buffer
+ skip_header = opts.skip_header
if opts.is_async then
if opts.on_push or opts.on_push_ctx then
error('To handle pushes in an async request use future:pairs()')
end
local res, err =
- transport.perform_async_request(buffer, method, table.insert,
- {}, ...)
+ transport.perform_async_request(buffer, skip_header, method,
+ table.insert, {}, ...)
if err then
box.error(err)
end
@@ -1084,8 +1097,9 @@ function remote_methods:_request(method, opts, ...)
transport.wait_state('active', timeout)
timeout = deadline and max(0, deadline - fiber_clock())
end
- local res, err = transport.perform_request(timeout, buffer, method,
- on_push, on_push_ctx, ...)
+ local res, err = transport.perform_request(timeout, buffer, skip_header,
+ method, on_push, on_push_ctx,
+ ...)
if err then
box.error(err)
end
@@ -1288,10 +1302,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
+ res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line})
else
assert(self.protocol == 'Lua console')
- res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
+ res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n')
end
if err then
box.error(err)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index f71699818..8ef3de808 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
@@ -1598,6 +1598,18 @@ result
---
- {48: [[2]]}
...
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[2]]
+...
-- insert
c.space.test:insert({3}, {buffer = ibuf})
---
@@ -1610,6 +1622,21 @@ result
---
- {48: [[3]]}
...
+-- insert + skip_header
+_ = space:delete({3})
+---
+...
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
---
@@ -1633,6 +1660,29 @@ result
---
- {48: [[3]]}
...
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1645,6 +1695,18 @@ result
---
- {48: []}
...
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1657,6 +1719,18 @@ result
---
- {48: []}
...
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
---
@@ -1669,6 +1743,18 @@ result
---
- {48: [[3], [2], [1, 'hello']]}
...
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+---
+- 17
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3], [2], [1, 'hello']]
+...
-- select
len = c.space.test:select({}, {buffer = ibuf})
---
@@ -1692,6 +1778,29 @@ result
---
- {48: [[1, 'hello'], [2], [3], [4]]}
...
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+---
+...
+ibuf.rpos + len == ibuf.wpos
+---
+- true
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+ibuf.rpos == ibuf.wpos
+---
+- true
+...
+len
+---
+- 19
+...
+result
+---
+- [[1, 'hello'], [2], [3], [4]]
+...
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
---
@@ -1726,6 +1835,40 @@ result
---
- {48: []}
...
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
---
@@ -1760,6 +1903,75 @@ result
---
- {48: []}
...
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
---
@@ -2596,7 +2808,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
c.state
@@ -3237,7 +3449,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
@@ -3372,7 +3584,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while f:status() ~= 'dead' do fiber.sleep(0.01) end
@@ -3391,7 +3603,7 @@ c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
---
...
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
---
- null
- Peer closed
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index beb33c24f..5ff2975ac 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
@@ -626,11 +626,22 @@ c.space.test:replace({2}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- insert
c.space.test:insert({3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- insert + skip_header
+_ = space:delete({3})
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -639,21 +650,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
len = c.space.test:select({}, {buffer = ibuf})
ibuf.rpos + len == ibuf.wpos
@@ -662,6 +696,14 @@ ibuf.rpos == ibuf.wpos
len
result
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+ibuf.rpos + len == ibuf.wpos
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+ibuf.rpos == ibuf.wpos
+len
+result
+
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -673,6 +715,17 @@ c:call("echo", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -684,6 +737,29 @@ c:eval("echo(...)", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
c.space.test.index.primary:min({}, { buffer = ibuf})
@@ -1074,7 +1150,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
c.state
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
@@ -1307,7 +1383,7 @@ finalize_long()
--
c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
finalize_long()
future:wait_result(100)
@@ -1364,7 +1440,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while f:status() ~= 'dead' do fiber.sleep(0.01) end
c:close()
@@ -1374,7 +1450,7 @@ c:close()
--
c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
c:close()
test_run:grep_log('default', 'too big packet size in the header') ~= nil
--
2.20.1
More information about the Tarantool-patches
mailing list