[PATCH v2 5/6] net.box: add helpers to decode msgpack headers
Alexander Turenko
alexander.turenko at tarantool.org
Fri Feb 1 18:11:42 MSK 2019
Splitted this patch to two ones:
* lua: add non-recursive msgpack decoding functions
* net.box: add skip_header option to use with buffer
I attached them at end of the email.
WBR, Alexander Turenko.
On Thu, Jan 10, 2019 at 08:29:33PM +0300, Vladimir Davydov wrote:
> On Wed, Jan 09, 2019 at 11:20:13PM +0300, Alexander Turenko wrote:
> > Needed for #3276.
> >
> > @TarantoolBot document
> > Title: net.box: helpers to decode msgpack headers
> >
> > They allow to skip iproto packet and msgpack array headers and pass raw
> > msgpack data to some other function, say, merger.
> >
> > Contracts:
> >
> > ```
> > net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)
> > -> new_rpos
> > -> nil, err_msg
>
> I'd prefer if this was done right in net.box.select or whatever function
> writing the response to ibuf. Yes, this is going to break backward
> compatibility, but IMO it's OK for 2.1 - I doubt anybody have used this
> weird high perf API anyway.
1. This will break tarantool/shard.
2. Hey, Guido thinks it is okay to break compatibility btw Python 2 and
Python 3 and it seems that Python 2 is in use ten years or like so.
I can do it under a separate option: skip_iproto_header or skip_header.
It is not about a packet header, but part of body, however I have no
better variants.
> > msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, [, arr_len])
> > -> new_rpos, arr_len
> > -> nil, err_msg
>
> This seems to be OK, although I'm not sure if we really need to check
> the length in this function. Looks like we will definitely need it
> because of net.box.call, which wraps function return value in an array.
> Not sure about the name either, because it doesn't just checks the
> msgpack - it decodes it, but can't come up with anything substantially
> better. May be, msgpack.decode_array?
Re check length: the reason was to simplify user's code, but ok, it will
not much more complex if we'll factor this check out. Like so (except
from the merger's commit message):
```
conn:call('batch_select', <...>, {buffer = buf, skip_header = true})
local len, _
len, buf.rpos = msgpack.decode_array(buf.rpos, buf:size())
assert(len == 1)
_, buf.rpos = msgpack.decode_array(buf.rpos, buf:size())
```
Re name: now I understood: decode_unchecked() is like mp_decode(),
decode() is like mp_check() + mp_decode(). So it worth to rename it to
decode_array(). Done.
Also I changed order of return values to match msgpack.decode() (before
it matches msgpack.ibuf_decode()).
> > ```
> >
> > Below the example with msgpack.decode() as the function that need raw
> > msgpack data. It is just to illustrate the approach, there is no sense
> > to skip iproto/array headers manually in Lua and then decode the rest in
> > Lua. But it worth when the raw msgpack data is subject to process in a C
> > module.
> >
> > ```lua
> > local function single_select(space, ...)
> > return box.space[space]:select(...)
> > end
> >
> > local function batch_select(spaces, ...)
> > local res = {}
> > for _, space in ipairs(spaces) do
> > table.insert(res, box.space[space]:select(...))
> > end
> > return res
> > end
> >
> > _G.single_select = single_select
> > _G.batch_select = batch_select
> >
> > local res
> >
> > local buf = buffer.ibuf()
> > conn.space.s:select(nil, {buffer = buf})
> > -- check and skip iproto_data header
> > buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos))
> > -- check that we really got data from :select() as result
> > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
> > -- check that the buffer ends
> > assert(buf.rpos == buf.wpos)
> >
> > buf:recycle()
> > conn:call('single_select', {'s'}, {buffer = buf})
> > -- check and skip the iproto_data header
> > buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos))
> > -- check and skip the array around return values
> > buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, 1))
> > -- check that we really got data from :select() as result
> > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
> > -- check that the buffer ends
> > assert(buf.rpos == buf.wpos)
> >
> > buf:recycle()
> > local spaces = {'s', 't'}
> > conn:call('batch_select', {spaces}, {buffer = buf})
> > -- check and skip the iproto_data header
> > buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos))
> > -- check and skip the array around return values
> > buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, 1))
> > -- check and skip the array header before the first select result
> > buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, #spaces))
> > -- check that we really got data from s:select() as result
> > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
> > -- t:select() data
> > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
> > -- check that the buffer ends
> > assert(buf.rpos == buf.wpos)
> > ```
> > ---
> > src/box/lua/net_box.c | 49 +++++++++++
> > src/box/lua/net_box.lua | 1 +
> > src/lua/msgpack.c | 66 ++++++++++++++
> > test/app-tap/msgpack.test.lua | 157 +++++++++++++++++++++++++++++++++-
> > test/box/net.box.result | 58 +++++++++++++
> > test/box/net.box.test.lua | 26 ++++++
> > 6 files changed, 356 insertions(+), 1 deletion(-)
> >
> > diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
> > index c7063d9c8..d71f33768 100644
> > --- a/src/box/lua/net_box.c
> > +++ b/src/box/lua/net_box.c
> > @@ -51,6 +51,9 @@
> >
> > #define cfg luaL_msgpack_default
> >
> > +static uint32_t CTID_CHAR_PTR;
> > +static uint32_t CTID_CONST_CHAR_PTR;
> > +
> > static inline size_t
> > netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type)
> > {
> > @@ -745,9 +748,54 @@ netbox_decode_execute(struct lua_State *L)
> > return 2;
> > }
> >
> > +/**
> > + * net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)
> > + * -> new_rpos
> > + * -> nil, err_msg
> > + */
> > +int
> > +netbox_check_iproto_data(struct lua_State *L)
>
> Instead of adding this function to net_box.c, I'd rather try to add
> msgpack helpers for decoding a map, similar to msgpack.check_array added
> by your patch, and use them in net_box.lua.
Done.
We discussed that we should add such helpers for all types like nil,
bool, number, string, maybe bin. I think we can reuse recursive
msgpack.decode() if we expect a scalar value.
> > +{
> > + uint32_t ctypeid;
> > + const char *data = *(const char **) luaL_checkcdata(L, 1, &ctypeid);
> > + if (ctypeid != CTID_CHAR_PTR && ctypeid != CTID_CONST_CHAR_PTR)
> > + return luaL_error(L,
> > + "net_box.check_iproto_data: 'char *' or "
> > + "'const char *' expected");
> > +
> > + if (!lua_isnumber(L, 2))
> > + return luaL_error(L, "net_box.check_iproto_data: number "
> > + "expected as 2nd argument");
> > + const char *end = data + lua_tointeger(L, 2);
> > +
> > + int ok = data < end &&
> > + mp_typeof(*data) == MP_MAP &&
> > + mp_check_map(data, end) <= 0 &&
> > + mp_decode_map(&data) == 1 &&
> > + data < end &&
> > + mp_typeof(*data) == MP_UINT &&
> > + mp_check_uint(data, end) <= 0 &&
> > + mp_decode_uint(&data) == IPROTO_DATA;
> > +
> > + if (!ok) {
> > + lua_pushnil(L);
> > + lua_pushstring(L,
> > + "net_box.check_iproto_data: wrong iproto data packet");
> > + return 2;
> > + }
> > +
> > + *(const char **) luaL_pushcdata(L, ctypeid) = data;
> > + return 1;
> > +}
> > +
> > int
> > luaopen_net_box(struct lua_State *L)
> > {
> > + CTID_CHAR_PTR = luaL_ctypeid(L, "char *");
> > + assert(CTID_CHAR_PTR != 0);
> > + CTID_CONST_CHAR_PTR = luaL_ctypeid(L, "const char *");
> > + assert(CTID_CONST_CHAR_PTR != 0);
> > +
> > static const luaL_Reg net_box_lib[] = {
> > { "encode_ping", netbox_encode_ping },
> > { "encode_call_16", netbox_encode_call_16 },
> > @@ -765,6 +813,7 @@ luaopen_net_box(struct lua_State *L)
> > { "communicate", netbox_communicate },
> > { "decode_select", netbox_decode_select },
> > { "decode_execute", netbox_decode_execute },
> > + { "check_iproto_data", netbox_check_iproto_data },
> > { NULL, NULL}
> > };
> > /* luaL_register_module polutes _G */
> > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> > index 2bf772aa8..0a38efa5a 100644
> > --- a/src/box/lua/net_box.lua
> > +++ b/src/box/lua/net_box.lua
> > @@ -1424,6 +1424,7 @@ local this_module = {
> > new = connect, -- Tarantool < 1.7.1 compatibility,
> > wrap = wrap,
> > establish_connection = establish_connection,
> > + check_iproto_data = internal.check_iproto_data,
> > }
> >
> > function this_module.timeout(timeout, ...)
> > diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
> > index b47006038..fca440660 100644
> > --- a/src/lua/msgpack.c
> > +++ b/src/lua/msgpack.c
> > @@ -51,6 +51,7 @@ luamp_error(void *error_ctx)
> > }
> >
> > static uint32_t CTID_CHAR_PTR;
> > +static uint32_t CTID_CONST_CHAR_PTR;
> > static uint32_t CTID_STRUCT_IBUF;
> >
> > struct luaL_serializer *luaL_msgpack_default = NULL;
> > @@ -418,6 +419,68 @@ lua_ibuf_msgpack_decode(lua_State *L)
> > return 2;
> > }
> >
> > +/**
> > + * msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, [, arr_len])
> > + * -> new_rpos, arr_len
> > + * -> nil, err_msg
> > + */
> > +static int
> > +lua_check_array(lua_State *L)
> > +{
> > + uint32_t ctypeid;
> > + const char *data = *(const char **) luaL_checkcdata(L, 1, &ctypeid);
> > + if (ctypeid != CTID_CHAR_PTR && ctypeid != CTID_CONST_CHAR_PTR)
>
> Hm, msgpack.decode doesn't care about CTID_CONST_CHAR_PTR. Why should we?
It looks natural to support a const pointer where we allow non-const
one. But I don't have an example where we can obtain 'const char *'
buffer with msgpack in Lua (w/o ffi.cast()). Msgpackffi returns 'const
unsigned char *', but it is the bug and should be fixed in
https://github.com/tarantool/tarantool/issues/3926
> > + return luaL_error(L, "msgpack.check_array: 'char *' or "
> > + "'const char *' expected");
> > +
> > + if (!lua_isnumber(L, 2))
> > + return luaL_error(L, "msgpack.check_array: number expected as "
> > + "2nd argument");
> > + const char *end = data + lua_tointeger(L, 2);
> > +
> > + if (!lua_isnoneornil(L, 3) && !lua_isnumber(L, 3))
> > + return luaL_error(L, "msgpack.check_array: number or nil "
> > + "expected as 3rd argument");
>
> Why not simply luaL_checkinteger?
We can separatelly check lua_gettop() and use luaL_checkinteger(). It
looks shorter, now I see. Fixed.
> > +
> > + static const char *end_of_buffer_msg = "msgpack.check_array: "
> > + "unexpected end of buffer";
>
> No point to make this variable static.
Ok. But now I removed it.
> > +
> > + if (data >= end) {
> > + lua_pushnil(L);
> > + lua_pushstring(L, end_of_buffer_msg);
>
> msgpack.decode throws an error when it fails to decode msgpack data, so
> I think this function should throw too.
Or Lua code style states we should report errors with `nil, err`. But
this aspect is more about external modules as I see. It is quite unclear
what is the best option for built-in modules.
If one likely want to handle an error in Lua the `nil, err` approach
looks better. As far as I know at least some of our commercial projects
primarily use this approach and have to wrap many functions with pcall.
Don't sure how much the overhead is.
But anyway other msgpack functions just raise an error and it seems the
new functions should have similar contract.
Changed.
> > + return 2;
> > + }
> > +
> > + if (mp_typeof(*data) != MP_ARRAY) {
> > + lua_pushnil(L);
> > + lua_pushstring(L, "msgpack.check_array: wrong array header");
> > + return 2;
> > + }
> > +
> > + if (mp_check_array(data, end) > 0) {
> > + lua_pushnil(L);
> > + lua_pushstring(L, end_of_buffer_msg);
> > + return 2;
> > + }
> > +
> > + uint32_t len = mp_decode_array(&data);
> > +
> > + if (!lua_isnoneornil(L, 3)) {
> > + uint32_t exp_len = (uint32_t) lua_tointeger(L, 3);
>
> IMO it would be better if you set exp_len when you checked the arguments
> (using luaL_checkinteger).
Expected length was removed from the function as you suggested.
> > + if (len != exp_len) {
> > + lua_pushnil(L);
> > + lua_pushfstring(L, "msgpack.check_array: expected "
> > + "array of length %d, got length %d",
> > + len, exp_len);
> > + return 2;
> > + }
> > + }
> > +
> > + *(const char **) luaL_pushcdata(L, ctypeid) = data;
> > + lua_pushinteger(L, len);
> > + return 2;
> > +}
> > +
> > static int
> > lua_msgpack_new(lua_State *L);
> >
> > @@ -426,6 +489,7 @@ static const luaL_Reg msgpacklib[] = {
> > { "decode", lua_msgpack_decode },
> > { "decode_unchecked", lua_msgpack_decode_unchecked },
> > { "ibuf_decode", lua_ibuf_msgpack_decode },
> > + { "check_array", lua_check_array },
> > { "new", lua_msgpack_new },
> > { NULL, NULL }
> > };
> > @@ -447,6 +511,8 @@ luaopen_msgpack(lua_State *L)
> > assert(CTID_STRUCT_IBUF != 0);
> > CTID_CHAR_PTR = luaL_ctypeid(L, "char *");
> > assert(CTID_CHAR_PTR != 0);
> > + CTID_CONST_CHAR_PTR = luaL_ctypeid(L, "const char *");
> > + assert(CTID_CONST_CHAR_PTR != 0);
> > luaL_msgpack_default = luaL_newserializer(L, "msgpack", msgpacklib);
> > return 1;
> > }
----
commit 8c820dff279734d79e26591dcb771f7c6ab13639
Author: Alexander Turenko <alexander.turenko at tarantool.org>
Date: Thu Jan 31 01:45:22 2019 +0300
lua: add non-recursive msgpack decoding functions
Needed for #3276.
@TarantoolBot document
Title: Non-recursive msgpack decoding functions
Contracts:
```
msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
```
These functions are intended to be used with a msgpack buffer received
from net.box. A user may want to skip {[IPROTO_DATA_KEY] = ...} wrapper
and an array header before pass the buffer to decode in some C function.
See https://github.com/tarantool/tarantool/issues/2195 for more
information re this net.box's API.
Consider merger's docbot comment for usage examples.
diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index b47006038..92a9efd25 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -418,6 +418,84 @@ lua_ibuf_msgpack_decode(lua_State *L)
return 2;
}
+/**
+ * Verify and set arguments: data and size.
+ *
+ * Always return 0. In case of any fail raise a Lua error.
+ */
+static int
+verify_decode_args(lua_State *L, const char *func_name, const char **data_p,
+ ptrdiff_t *size_p)
+{
+ /* Verify arguments count. */
+ if (lua_gettop(L) != 2)
+ return luaL_error(L, "Usage: %s(ptr, size)", func_name);
+
+ /* Verify ptr type. */
+ uint32_t ctypeid;
+ const char *data = *(char **) luaL_checkcdata(L, 1, &ctypeid);
+ if (ctypeid != CTID_CHAR_PTR)
+ return luaL_error(L, "%s: 'char *' expected", func_name);
+
+ /* Verify size type and value. */
+ ptrdiff_t size = (ptrdiff_t) luaL_checkinteger(L, 2);
+ if (size <= 0)
+ return luaL_error(L, "%s: non-positive size", func_name);
+
+ *data_p = data;
+ *size_p = size;
+
+ return 0;
+}
+
+/**
+ * msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
+ */
+static int
+lua_decode_array(lua_State *L)
+{
+ const char *func_name = "msgpack.decode_array";
+ const char *data;
+ ptrdiff_t size;
+ verify_decode_args(L, func_name, &data, &size);
+
+ if (mp_typeof(*data) != MP_ARRAY)
+ return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+ if (mp_check_array(data, data + size) > 0)
+ return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+ uint32_t len = mp_decode_array(&data);
+
+ lua_pushinteger(L, len);
+ *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+ return 2;
+}
+
+/**
+ * msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
+ */
+static int
+lua_decode_map(lua_State *L)
+{
+ const char *func_name = "msgpack.decode_map";
+ const char *data;
+ ptrdiff_t size;
+ verify_decode_args(L, func_name, &data, &size);
+
+ if (mp_typeof(*data) != MP_MAP)
+ return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+ if (mp_check_map(data, data + size) > 0)
+ return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+ uint32_t len = mp_decode_map(&data);
+
+ lua_pushinteger(L, len);
+ *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+ return 2;
+}
+
static int
lua_msgpack_new(lua_State *L);
@@ -426,6 +504,8 @@ static const luaL_Reg msgpacklib[] = {
{ "decode", lua_msgpack_decode },
{ "decode_unchecked", lua_msgpack_decode_unchecked },
{ "ibuf_decode", lua_ibuf_msgpack_decode },
+ { "decode_array", lua_decode_array },
+ { "decode_map", lua_decode_map },
{ "new", lua_msgpack_new },
{ NULL, NULL }
};
diff --git a/test/app-tap/msgpack.test.lua b/test/app-tap/msgpack.test.lua
index 0e1692ad9..ee215dfb1 100755
--- a/test/app-tap/msgpack.test.lua
+++ b/test/app-tap/msgpack.test.lua
@@ -49,9 +49,186 @@ local function test_misc(test, s)
test:ok(not st and e:match("null"), "null ibuf")
end
+local function test_decode_array_map(test, s)
+ local ffi = require('ffi')
+
+ local usage_err = 'Usage: msgpack%.decode_[^_(]+%(ptr, size%)'
+ local end_of_buffer_err = 'msgpack%.decode_[^_]+: unexpected end of buffer'
+ local non_positive_size_err = 'msgpack.decode_[^_]+: non%-positive size'
+
+ local decode_cases = {
+ {
+ 'fixarray',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\x94'),
+ size = 1,
+ exp_len = 4,
+ exp_rewind = 1,
+ },
+ {
+ 'array 16',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdc\x00\x04'),
+ size = 3,
+ exp_len = 4,
+ exp_rewind = 3,
+ },
+ {
+ 'array 32',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdd\x00\x00\x00\x04'),
+ size = 5,
+ exp_len = 4,
+ exp_rewind = 5,
+ },
+ {
+ 'truncated array 16',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdc\x00'),
+ size = 2,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'truncated array 32',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdd\x00\x00\x00'),
+ size = 4,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'fixmap',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\x84'),
+ size = 1,
+ exp_len = 4,
+ exp_rewind = 1,
+ },
+ {
+ 'map 16',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xde\x00\x04'),
+ size = 3,
+ exp_len = 4,
+ exp_rewind = 3,
+ },
+ {
+ 'array 32',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xdf\x00\x00\x00\x04'),
+ size = 5,
+ exp_len = 4,
+ exp_rewind = 5,
+ },
+ {
+ 'truncated map 16',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xde\x00'),
+ size = 2,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'truncated map 32',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xdf\x00\x00\x00'),
+ size = 4,
+ exp_err = end_of_buffer_err,
+ },
+ }
+
+ local bad_api_cases = {
+ {
+ 'wrong msgpack type',
+ data = ffi.cast('char *', '\xc0'),
+ size = 1,
+ exp_err = 'msgpack.decode_[^_]+: unexpected msgpack type',
+ },
+ {
+ 'zero size buffer',
+ data = ffi.cast('char *', ''),
+ size = 0,
+ exp_err = non_positive_size_err,
+ },
+ {
+ 'negative size buffer',
+ data = ffi.cast('char *', ''),
+ size = -1,
+ exp_err = non_positive_size_err,
+ },
+ {
+ 'size is nil',
+ data = ffi.cast('char *', ''),
+ size = nil,
+ exp_err = 'bad argument',
+ },
+ {
+ 'no arguments',
+ args = {},
+ exp_err = usage_err,
+ },
+ {
+ 'one argument',
+ args = {ffi.cast('char *', '')},
+ exp_err = usage_err,
+ },
+ {
+ 'data is nil',
+ args = {nil, 1},
+ exp_err = 'expected cdata as 1 argument',
+ },
+ {
+ 'data is not cdata',
+ args = {1, 1},
+ exp_err = 'expected cdata as 1 argument',
+ },
+ {
+ 'data with wrong cdata type',
+ args = {box.tuple.new(), 1},
+ exp_err = "msgpack.decode_[^_]+: 'char %*' expected",
+ },
+ {
+ 'size has wrong type',
+ args = {ffi.cast('char *', ''), 'eee'},
+ exp_err = 'bad argument',
+ },
+ }
+
+ test:plan(#decode_cases + 2 * #bad_api_cases)
+
+ -- Decode cases.
+ for _, case in ipairs(decode_cases) do
+ if case.exp_err ~= nil then
+ local ok, err = pcall(case.func, case.data, case.size)
+ local description = ('bad; %s'):format(case[1])
+ test:ok(ok == false and err:match(case.exp_err), description)
+ else
+ local len, new_buf = case.func(case.data, case.size)
+ local rewind = new_buf - case.data
+ local description = ('good; %s'):format(case[1])
+ test:is_deeply({len, rewind}, {case.exp_len, case.exp_rewind},
+ description)
+ end
+ end
+
+ -- Bad api usage cases.
+ for _, func_name in ipairs({'decode_array', 'decode_map'}) do
+ for _, case in ipairs(bad_api_cases) do
+ local ok, err
+ if case.args ~= nil then
+ local args_len = table.maxn(case.args)
+ ok, err = pcall(s[func_name], unpack(case.args, 1, args_len))
+ else
+ ok, err = pcall(s[func_name], case.data, case.size)
+ end
+ local description = ('%s bad api usage; %s'):format(func_name,
+ case[1])
+ test:ok(ok == false and err:match(case.exp_err), description)
+ end
+ end
+end
+
tap.test("msgpack", function(test)
local serializer = require('msgpack')
- test:plan(10)
+ test:plan(11)
test:test("unsigned", common.test_unsigned, serializer)
test:test("signed", common.test_signed, serializer)
test:test("double", common.test_double, serializer)
@@ -62,4 +239,5 @@ tap.test("msgpack", function(test)
test:test("ucdata", common.test_ucdata, serializer)
test:test("offsets", test_offsets, serializer)
test:test("misc", test_misc, serializer)
+ test:test("decode_array_map", test_decode_array_map, serializer)
end)
----
commit 3868d5c2551c893f16bd05c79d4d52a564c6a833
Author: Alexander Turenko <alexander.turenko at tarantool.org>
Date: Thu Jan 31 01:59:18 2019 +0300
net.box: add skip_header option to use with buffer
Needed for #3276.
@TarantoolBot document
Title: net.box: skip_header option
This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper
from a buffer. This may be needed to pass this buffer to some C function
when it expects some specific msgpack input.
See src/box/lua/net_box.lua for examples. Also consider merger's docbot
comment for more examples.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 2bf772aa8..53c93cafb 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -15,6 +15,7 @@ local max = math.max
local fiber_clock = fiber.clock
local fiber_self = fiber.self
local decode = msgpack.decode_unchecked
+local decode_map = msgpack.decode_map
local table_new = require('table.new')
local check_iterator_type = box.internal.check_iterator_type
@@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Future object.
--
- local function perform_async_request(buffer, method, on_push, on_push_ctx,
- ...)
+ local function perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, ...)
if state ~= 'active' and state ~= 'fetch_schema' then
return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
reason = last_error})
@@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, ...)
next_request_id = next_id(id)
- -- Request in most cases has maximum 8 members:
- -- method, buffer, id, cond, errno, response, on_push,
- -- on_push_ctx.
- local request = setmetatable(table_new(0, 8), request_mt)
+ -- Request in most cases has maximum 9 members:
+ -- method, buffer, skip_header, id, cond, errno, response,
+ -- on_push, on_push_ctx.
+ local request = setmetatable(table_new(0, 9), request_mt)
request.method = method
request.buffer = buffer
+ request.skip_header = skip_header
request.id = id
request.cond = fiber.cond()
requests[id] = request
@@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Response object.
--
- local function perform_request(timeout, buffer, method, on_push,
- on_push_ctx, ...)
+ local function perform_request(timeout, buffer, skip_header, method,
+ on_push, on_push_ctx, ...)
local request, err =
- perform_async_request(buffer, method, on_push, on_push_ctx, ...)
+ perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, ...)
if not request then
return nil, err
end
@@ -554,6 +557,15 @@ local function create_transport(host, port, user, password, callback,
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
body_len = tonumber(body_len)
+ if request.skip_header then
+ -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
+ local map_len, key
+ map_len, buffer.rpos = decode_map(buffer.rpos, buffer:size())
+ assert(map_len == 1)
+ key, buffer.rpos = decode(buffer.rpos)
+ assert(key == IPROTO_DATA_KEY)
+ body_len = buffer:size()
+ end
if status == IPROTO_OK_KEY then
request.response = body_len
requests[id] = nil
@@ -1047,17 +1059,18 @@ end
function remote_methods:_request(method, opts, ...)
local transport = self._transport
- local on_push, on_push_ctx, buffer, deadline
+ local on_push, on_push_ctx, buffer, skip_header, deadline
-- Extract options, set defaults, check if the request is
-- async.
if opts then
buffer = opts.buffer
+ skip_header = opts.skip_header
if opts.is_async then
if opts.on_push or opts.on_push_ctx then
error('To handle pushes in an async request use future:pairs()')
end
- return transport.perform_async_request(buffer, method, table.insert,
- {}, ...)
+ return transport.perform_async_request(buffer, skip_header, method,
+ table.insert, {}, ...)
end
if opts.timeout then
-- conn.space:request(, { timeout = timeout })
@@ -1079,8 +1092,9 @@ function remote_methods:_request(method, opts, ...)
transport.wait_state('active', timeout)
timeout = deadline and max(0, deadline - fiber_clock())
end
- local res, err = transport.perform_request(timeout, buffer, method,
- on_push, on_push_ctx, ...)
+ local res, err = transport.perform_request(timeout, buffer, skip_header,
+ method, on_push, on_push_ctx,
+ ...)
if err then
box.error(err)
end
@@ -1283,10 +1297,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
+ res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line})
else
assert(self.protocol == 'Lua console')
- res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
+ res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n')
end
if err then
box.error(err)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 2b5a84646..71d0e0a50 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
@@ -1573,6 +1573,18 @@ result
---
- {48: [[2]]}
...
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[2]]
+...
-- insert
c.space.test:insert({3}, {buffer = ibuf})
---
@@ -1585,6 +1597,21 @@ result
---
- {48: [[3]]}
...
+-- insert + skip_header
+_ = space:delete({3})
+---
+...
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
---
@@ -1608,6 +1635,29 @@ result
---
- {48: [[3]]}
...
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1620,6 +1670,18 @@ result
---
- {48: []}
...
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1632,6 +1694,18 @@ result
---
- {48: []}
...
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
---
@@ -1644,6 +1718,18 @@ result
---
- {48: [[3], [2], [1, 'hello']]}
...
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+---
+- 17
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3], [2], [1, 'hello']]
+...
-- select
len = c.space.test:select({}, {buffer = ibuf})
---
@@ -1667,6 +1753,29 @@ result
---
- {48: [[1, 'hello'], [2], [3], [4]]}
...
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+---
+...
+ibuf.rpos + len == ibuf.wpos
+---
+- true
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+ibuf.rpos == ibuf.wpos
+---
+- true
+...
+len
+---
+- 19
+...
+result
+---
+- [[1, 'hello'], [2], [3], [4]]
+...
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
---
@@ -1701,6 +1810,40 @@ result
---
- {48: []}
...
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
---
@@ -1735,6 +1878,40 @@ result
---
- {48: []}
...
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
---
@@ -2571,7 +2748,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
c.state
@@ -3205,7 +3382,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
@@ -3340,7 +3517,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while f:status() ~= 'dead' do fiber.sleep(0.01) end
@@ -3359,7 +3536,7 @@ c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
---
...
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
---
- null
- Peer closed
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 96d822820..48cc7147d 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
@@ -615,11 +615,22 @@ c.space.test:replace({2}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- insert
c.space.test:insert({3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- insert + skip_header
+_ = space:delete({3})
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -628,21 +639,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
len = c.space.test:select({}, {buffer = ibuf})
ibuf.rpos + len == ibuf.wpos
@@ -651,6 +685,14 @@ ibuf.rpos == ibuf.wpos
len
result
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+ibuf.rpos + len == ibuf.wpos
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+ibuf.rpos == ibuf.wpos
+len
+result
+
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -662,6 +704,17 @@ c:call("echo", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -673,6 +726,17 @@ c:eval("echo(...)", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
c.space.test.index.primary:min({}, { buffer = ibuf})
@@ -1063,7 +1127,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
c.state
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
@@ -1291,7 +1355,7 @@ finalize_long()
--
c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
finalize_long()
future:wait_result(100)
@@ -1348,7 +1412,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while f:status() ~= 'dead' do fiber.sleep(0.01) end
c:close()
@@ -1358,7 +1422,7 @@ c:close()
--
c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
c:close()
test_run:grep_log('default', 'too big packet size in the header') ~= nil
More information about the Tarantool-patches
mailing list