[PATCH v3 5/7] net.box: add skip_header option to use with buffer

Alexander Turenko alexander.turenko at tarantool.org
Wed Apr 10 18:21:23 MSK 2019


Needed for #3276.

@TarantoolBot document
Title: net.box: skip_header option

This option instructs net.box to skip the {[IPROTO_DATA_KEY] = ...}
wrapper when it writes a response into a user-provided buffer (the option
takes effect only together with the buffer option). This may be needed to
pass the buffer to a C function that expects a specific msgpack input.

See src/box/lua/net_box.lua for the implementation and
test/box/net.box.test.lua for usage examples.
---
 src/box/lua/net_box.lua   |  46 +++++---
 test/box/net.box.result   | 222 +++++++++++++++++++++++++++++++++++++-
 test/box/net.box.test.lua |  86 ++++++++++++++-
 3 files changed, 328 insertions(+), 26 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index b3139a3f5..c6ed3e138 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -15,6 +15,7 @@ local max           = math.max
 local fiber_clock   = fiber.clock
 local fiber_self    = fiber.self
 local decode        = msgpack.decode_unchecked
+local decode_map    = msgpack.decode_map
 
 local table_new           = require('table.new')
 local check_iterator_type = box.internal.check_iterator_type
@@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback,
     -- @retval nil, error Error occured.
     -- @retval not nil Future object.
     --
-    local function perform_async_request(buffer, method, on_push, on_push_ctx,
-                                         ...)
+    local function perform_async_request(buffer, skip_header, method, on_push,
+                                         on_push_ctx, ...)
         if state ~= 'active' and state ~= 'fetch_schema' then
             return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
                                        reason = last_error})
@@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback,
         local id = next_request_id
         method_encoder[method](send_buf, id, ...)
         next_request_id = next_id(id)
-        -- Request in most cases has maximum 8 members:
-        -- method, buffer, id, cond, errno, response, on_push,
-        -- on_push_ctx.
-        local request = setmetatable(table_new(0, 8), request_mt)
+        -- In most cases a request has at most 9 members:
+        -- method, buffer, skip_header, id, cond, errno, response,
+        -- on_push, on_push_ctx.
+        local request = setmetatable(table_new(0, 9), request_mt)
         request.method = method
         request.buffer = buffer
+        request.skip_header = skip_header
         request.id = id
         request.cond = fiber.cond()
         requests[id] = request
@@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback,
     -- @retval nil, error Error occured.
     -- @retval not nil Response object.
     --
-    local function perform_request(timeout, buffer, method, on_push,
-                                   on_push_ctx, ...)
+    local function perform_request(timeout, buffer, skip_header, method,
+                                   on_push, on_push_ctx, ...)
         local request, err =
-            perform_async_request(buffer, method, on_push, on_push_ctx, ...)
+            perform_async_request(buffer, skip_header, method, on_push,
+                                  on_push_ctx, ...)
         if not request then
             return nil, err
         end
@@ -551,6 +554,15 @@ local function create_transport(host, port, user, password, callback,
         if buffer ~= nil then
             -- Copy xrow.body to user-provided buffer
             local body_len = body_end - body_rpos
+            if request.skip_header then
+                -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
+                local map_len, key
+                map_len, body_rpos = decode_map(body_rpos, body_len)
+                assert(map_len == 1)
+                key, body_rpos = decode(body_rpos)
+                assert(key == IPROTO_DATA_KEY)
+                body_len = body_end - body_rpos
+            end
             local wpos = buffer:alloc(body_len)
             ffi.copy(wpos, body_rpos, body_len)
             body_len = tonumber(body_len)
@@ -1047,18 +1059,19 @@ end
 
 function remote_methods:_request(method, opts, ...)
     local transport = self._transport
-    local on_push, on_push_ctx, buffer, deadline
+    local on_push, on_push_ctx, buffer, skip_header, deadline
     -- Extract options, set defaults, check if the request is
     -- async.
     if opts then
         buffer = opts.buffer
+        skip_header = opts.skip_header
         if opts.is_async then
             if opts.on_push or opts.on_push_ctx then
                 error('To handle pushes in an async request use future:pairs()')
             end
             local res, err =
-                transport.perform_async_request(buffer, method, table.insert,
-                                                {}, ...)
+                transport.perform_async_request(buffer, skip_header, method,
+                                                table.insert, {}, ...)
             if err then
                 box.error(err)
             end
@@ -1084,8 +1097,9 @@ function remote_methods:_request(method, opts, ...)
         transport.wait_state('active', timeout)
         timeout = deadline and max(0, deadline - fiber_clock())
     end
-    local res, err = transport.perform_request(timeout, buffer, method,
-                                               on_push, on_push_ctx, ...)
+    local res, err = transport.perform_request(timeout, buffer, skip_header,
+                                               method, on_push, on_push_ctx,
+                                               ...)
     if err then
         box.error(err)
     end
@@ -1288,10 +1302,10 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
+        res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line})
     else
         assert(self.protocol == 'Lua console')
-        res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
+        res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n')
     end
     if err then
         box.error(err)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index f71699818..8ef3de808 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
                             offset, limit, key)
     return ret
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 ---
 ...
@@ -1598,6 +1598,18 @@ result
 ---
 - {48: [[2]]}
 ...
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[2]]
+...
 -- insert
 c.space.test:insert({3}, {buffer = ibuf})
 ---
@@ -1610,6 +1622,21 @@ result
 ---
 - {48: [[3]]}
 ...
+-- insert + skip_header
+_ = space:delete({3})
+---
+...
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
 -- update
 c.space.test:update({3}, {}, {buffer = ibuf})
 ---
@@ -1633,6 +1660,29 @@ result
 ---
 - {48: [[3]]}
 ...
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
 -- upsert
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 ---
@@ -1645,6 +1695,18 @@ result
 ---
 - {48: []}
 ...
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- delete
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 ---
@@ -1657,6 +1719,18 @@ result
 ---
 - {48: []}
 ...
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- select
 c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
 ---
@@ -1669,6 +1743,18 @@ result
 ---
 - {48: [[3], [2], [1, 'hello']]}
 ...
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+---
+- 17
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3], [2], [1, 'hello']]
+...
 -- select
 len = c.space.test:select({}, {buffer = ibuf})
 ---
@@ -1692,6 +1778,29 @@ result
 ---
 - {48: [[1, 'hello'], [2], [3], [4]]}
 ...
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+---
+...
+ibuf.rpos + len == ibuf.wpos
+---
+- true
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+ibuf.rpos == ibuf.wpos
+---
+- true
+...
+len
+---
+- 19
+...
+result
+---
+- [[1, 'hello'], [2], [3], [4]]
+...
 -- call
 c:call("echo", {1, 2, 3}, {buffer = ibuf})
 ---
@@ -1726,6 +1835,40 @@ result
 ---
 - {48: []}
 ...
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- eval
 c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
 ---
@@ -1760,6 +1903,75 @@ result
 ---
 - {48: []}
 ...
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
 -- unsupported methods
 c.space.test:get({1}, { buffer = ibuf})
 ---
@@ -2596,7 +2808,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 ---
 ...
 c.state
@@ -3237,7 +3449,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
 future = c:call('long_function', {1, 2, 3}, {is_async = true})
 ---
 ...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 ---
 ...
 while not c:is_connected() do fiber.sleep(0.01) end
@@ -3372,7 +3584,7 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
@@ -3391,7 +3603,7 @@ c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
 ---
 ...
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
 ---
 - null
 - Peer closed
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index beb33c24f..5ff2975ac 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
                             offset, limit, key)
     return ret
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 
 LISTEN = require('uri').parse(box.cfg.listen)
@@ -626,11 +626,22 @@ c.space.test:replace({2}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- insert
 c.space.test:insert({3}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- insert + skip_header
+_ = space:delete({3})
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- update
 c.space.test:update({3}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -639,21 +650,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- upsert
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- delete
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- select
 c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- select
 len = c.space.test:select({}, {buffer = ibuf})
 ibuf.rpos + len == ibuf.wpos
@@ -662,6 +696,14 @@ ibuf.rpos == ibuf.wpos
 len
 result
 
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+ibuf.rpos + len == ibuf.wpos
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+ibuf.rpos == ibuf.wpos
+len
+result
+
 -- call
 c:call("echo", {1, 2, 3}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -673,6 +715,17 @@ c:call("echo", nil, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- eval
 c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -684,6 +737,29 @@ c:eval("echo(...)", nil, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- unsupported methods
 c.space.test:get({1}, { buffer = ibuf})
 c.space.test.index.primary:min({}, { buffer = ibuf})
@@ -1074,7 +1150,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 c.state
 while not c:is_connected() do fiber.sleep(0.01) end
 c:ping()
@@ -1307,7 +1383,7 @@ finalize_long()
 --
 c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
 future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 while not c:is_connected() do fiber.sleep(0.01) end
 finalize_long()
 future:wait_result(100)
@@ -1364,7 +1440,7 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 c:close()
 
@@ -1374,7 +1450,7 @@ c:close()
 --
 c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
 c:close()
 test_run:grep_log('default', 'too big packet size in the header') ~= nil
 
-- 
2.20.1




More information about the Tarantool-patches mailing list