[PATCH v2 5/6] net.box: add helpers to decode msgpack headers

Alexander Turenko alexander.turenko at tarantool.org
Fri Feb 1 18:11:42 MSK 2019


Splitted this patch to two ones:

* lua: add non-recursive msgpack decoding functions
* net.box: add skip_header option to use with buffer

I attached them at end of the email.

WBR, Alexander Turenko.

On Thu, Jan 10, 2019 at 08:29:33PM +0300, Vladimir Davydov wrote:
> On Wed, Jan 09, 2019 at 11:20:13PM +0300, Alexander Turenko wrote:
> > Needed for #3276.
> > 
> > @TarantoolBot document
> > Title: net.box: helpers to decode msgpack headers
> > 
> > They allow to skip iproto packet and msgpack array headers and pass raw
> > msgpack data to some other function, say, merger.
> > 
> > Contracts:
> > 
> > ```
> > net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)
> >     -> new_rpos
> >     -> nil, err_msg
> 
> I'd prefer if this was done right in net.box.select or whatever function
> writing the response to ibuf. Yes, this is going to break backward
> compatibility, but IMO it's OK for 2.1 - I doubt anybody have used this
> weird high perf API anyway.

1. This will break tarantool/shard.
2. Hey, Guido thinks it is okay to break compatibility btw Python 2 and
   Python 3 and it seems that Python 2 is in use ten years or like so.

I can do it under a separate option: skip_iproto_header or skip_header.
It is not about a packet header, but part of body, however I have no
better variants.

> > msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, [, arr_len])
> >     -> new_rpos, arr_len
> >     -> nil, err_msg
> 
> This seems to be OK, although I'm not sure if we really need to check
> the length in this function. Looks like we will definitely need it
> because of net.box.call, which wraps function return value in an array.
> Not sure about the name either, because it doesn't just checks the
> msgpack - it decodes it, but can't come up with anything substantially
> better. May be, msgpack.decode_array?

Re check length: the reason was to simplify user's code, but ok, it will
not much more complex if we'll factor this check out. Like so (except
from the merger's commit message):

```
conn:call('batch_select', <...>, {buffer = buf, skip_header = true})
local len, _
len, buf.rpos = msgpack.decode_array(buf.rpos, buf:size())
assert(len == 1)
_, buf.rpos = msgpack.decode_array(buf.rpos, buf:size())
```

Re name: now I understood: decode_unchecked() is like mp_decode(),
decode() is like mp_check() + mp_decode(). So it worth to rename it to
decode_array(). Done.

Also I changed order of return values to match msgpack.decode() (before
it matches msgpack.ibuf_decode()).

> > ```
> > 
> > Below the example with msgpack.decode() as the function that need raw
> > msgpack data. It is just to illustrate the approach, there is no sense
> > to skip iproto/array headers manually in Lua and then decode the rest in
> > Lua. But it worth when the raw msgpack data is subject to process in a C
> > module.
> > 
> > ```lua
> > local function single_select(space, ...)
> >     return box.space[space]:select(...)
> > end
> > 
> > local function batch_select(spaces, ...)
> >     local res = {}
> >     for _, space in ipairs(spaces) do
> >         table.insert(res, box.space[space]:select(...))
> >     end
> >     return res
> > end
> > 
> > _G.single_select = single_select
> > _G.batch_select = batch_select
> > 
> > local res
> > 
> > local buf = buffer.ibuf()
> > conn.space.s:select(nil, {buffer = buf})
> > -- check and skip iproto_data header
> > buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos))
> > -- check that we really got data from :select() as result
> > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
> > -- check that the buffer ends
> > assert(buf.rpos == buf.wpos)
> > 
> > buf:recycle()
> > conn:call('single_select', {'s'}, {buffer = buf})
> > -- check and skip the iproto_data header
> > buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos))
> > -- check and skip the array around return values
> > buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, 1))
> > -- check that we really got data from :select() as result
> > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
> > -- check that the buffer ends
> > assert(buf.rpos == buf.wpos)
> > 
> > buf:recycle()
> > local spaces = {'s', 't'}
> > conn:call('batch_select', {spaces}, {buffer = buf})
> > -- check and skip the iproto_data header
> > buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos))
> > -- check and skip the array around return values
> > buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, 1))
> > -- check and skip the array header before the first select result
> > buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, #spaces))
> > -- check that we really got data from s:select() as result
> > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
> > -- t:select() data
> > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
> > -- check that the buffer ends
> > assert(buf.rpos == buf.wpos)
> > ```
> > ---
> >  src/box/lua/net_box.c         |  49 +++++++++++
> >  src/box/lua/net_box.lua       |   1 +
> >  src/lua/msgpack.c             |  66 ++++++++++++++
> >  test/app-tap/msgpack.test.lua | 157 +++++++++++++++++++++++++++++++++-
> >  test/box/net.box.result       |  58 +++++++++++++
> >  test/box/net.box.test.lua     |  26 ++++++
> >  6 files changed, 356 insertions(+), 1 deletion(-)
> > 
> > diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
> > index c7063d9c8..d71f33768 100644
> > --- a/src/box/lua/net_box.c
> > +++ b/src/box/lua/net_box.c
> > @@ -51,6 +51,9 @@
> >  
> >  #define cfg luaL_msgpack_default
> >  
> > +static uint32_t CTID_CHAR_PTR;
> > +static uint32_t CTID_CONST_CHAR_PTR;
> > +
> >  static inline size_t
> >  netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type)
> >  {
> > @@ -745,9 +748,54 @@ netbox_decode_execute(struct lua_State *L)
> >  	return 2;
> >  }
> >  
> > +/**
> > + * net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)
> > + *     -> new_rpos
> > + *     -> nil, err_msg
> > + */
> > +int
> > +netbox_check_iproto_data(struct lua_State *L)
> 
> Instead of adding this function to net_box.c, I'd rather try to add
> msgpack helpers for decoding a map, similar to msgpack.check_array added
> by your patch, and use them in net_box.lua.

Done.

We discussed that we should add such helpers for all types like nil,
bool, number, string, maybe bin. I think we can reuse recursive
msgpack.decode() if we expect a scalar value.

> > +{
> > +	uint32_t ctypeid;
> > +	const char *data = *(const char **) luaL_checkcdata(L, 1, &ctypeid);
> > +	if (ctypeid != CTID_CHAR_PTR && ctypeid != CTID_CONST_CHAR_PTR)
> > +		return luaL_error(L,
> > +			"net_box.check_iproto_data: 'char *' or "
> > +			"'const char *' expected");
> > +
> > +	if (!lua_isnumber(L, 2))
> > +		return luaL_error(L, "net_box.check_iproto_data: number "
> > +				  "expected as 2nd argument");
> > +	const char *end = data + lua_tointeger(L, 2);
> > +
> > +	int ok = data < end &&
> > +		mp_typeof(*data) == MP_MAP &&
> > +		mp_check_map(data, end) <= 0 &&
> > +		mp_decode_map(&data) == 1 &&
> > +		data < end &&
> > +		mp_typeof(*data) == MP_UINT &&
> > +		mp_check_uint(data, end) <= 0 &&
> > +		mp_decode_uint(&data) == IPROTO_DATA;
> > +
> > +	if (!ok) {
> > +		lua_pushnil(L);
> > +		lua_pushstring(L,
> > +			"net_box.check_iproto_data: wrong iproto data packet");
> > +		return 2;
> > +	}
> > +
> > +	*(const char **) luaL_pushcdata(L, ctypeid) = data;
> > +	return 1;
> > +}
> > +
> >  int
> >  luaopen_net_box(struct lua_State *L)
> >  {
> > +	CTID_CHAR_PTR = luaL_ctypeid(L, "char *");
> > +	assert(CTID_CHAR_PTR != 0);
> > +	CTID_CONST_CHAR_PTR = luaL_ctypeid(L, "const char *");
> > +	assert(CTID_CONST_CHAR_PTR != 0);
> > +
> >  	static const luaL_Reg net_box_lib[] = {
> >  		{ "encode_ping",    netbox_encode_ping },
> >  		{ "encode_call_16", netbox_encode_call_16 },
> > @@ -765,6 +813,7 @@ luaopen_net_box(struct lua_State *L)
> >  		{ "communicate",    netbox_communicate },
> >  		{ "decode_select",  netbox_decode_select },
> >  		{ "decode_execute", netbox_decode_execute },
> > +		{ "check_iproto_data", netbox_check_iproto_data },
> >  		{ NULL, NULL}
> >  	};
> >  	/* luaL_register_module polutes _G */
> > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> > index 2bf772aa8..0a38efa5a 100644
> > --- a/src/box/lua/net_box.lua
> > +++ b/src/box/lua/net_box.lua
> > @@ -1424,6 +1424,7 @@ local this_module = {
> >      new = connect, -- Tarantool < 1.7.1 compatibility,
> >      wrap = wrap,
> >      establish_connection = establish_connection,
> > +    check_iproto_data = internal.check_iproto_data,
> >  }
> >  
> >  function this_module.timeout(timeout, ...)
> > diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
> > index b47006038..fca440660 100644
> > --- a/src/lua/msgpack.c
> > +++ b/src/lua/msgpack.c
> > @@ -51,6 +51,7 @@ luamp_error(void *error_ctx)
> >  }
> >  
> >  static uint32_t CTID_CHAR_PTR;
> > +static uint32_t CTID_CONST_CHAR_PTR;
> >  static uint32_t CTID_STRUCT_IBUF;
> >  
> >  struct luaL_serializer *luaL_msgpack_default = NULL;
> > @@ -418,6 +419,68 @@ lua_ibuf_msgpack_decode(lua_State *L)
> >  	return 2;
> >  }
> >  
> > +/**
> > + * msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, [, arr_len])
> > + *     -> new_rpos, arr_len
> > + *     -> nil, err_msg
> > + */
> > +static int
> > +lua_check_array(lua_State *L)
> > +{
> > +	uint32_t ctypeid;
> > +	const char *data = *(const char **) luaL_checkcdata(L, 1, &ctypeid);
> > +	if (ctypeid != CTID_CHAR_PTR && ctypeid != CTID_CONST_CHAR_PTR)
> 
> Hm, msgpack.decode doesn't care about CTID_CONST_CHAR_PTR. Why should we?

It looks natural to support a const pointer where we allow non-const
one. But I don't have an example where we can obtain 'const char *'
buffer with msgpack in Lua (w/o ffi.cast()). Msgpackffi returns 'const
unsigned char *', but it is the bug and should be fixed in
https://github.com/tarantool/tarantool/issues/3926

> > +		return luaL_error(L, "msgpack.check_array: 'char *' or "
> > +				  "'const char *' expected");
> > +
> > +	if (!lua_isnumber(L, 2))
> > +		return luaL_error(L, "msgpack.check_array: number expected as "
> > +				  "2nd argument");
> > +	const char *end = data + lua_tointeger(L, 2);
> > +
> > +	if (!lua_isnoneornil(L, 3) && !lua_isnumber(L, 3))
> > +		return luaL_error(L, "msgpack.check_array: number or nil "
> > +				  "expected as 3rd argument");
> 
> Why not simply luaL_checkinteger?

We can separatelly check lua_gettop() and use luaL_checkinteger(). It
looks shorter, now I see. Fixed.

> > +
> > +	static const char *end_of_buffer_msg = "msgpack.check_array: "
> > +		"unexpected end of buffer";
> 
> No point to make this variable static.

Ok. But now I removed it.

> > +
> > +	if (data >= end) {
> > +		lua_pushnil(L);
> > +		lua_pushstring(L, end_of_buffer_msg);
> 
> msgpack.decode throws an error when it fails to decode msgpack data, so
> I think this function should throw too.

Or Lua code style states we should report errors with `nil, err`. But
this aspect is more about external modules as I see. It is quite unclear
what is the best option for built-in modules.

If one likely want to handle an error in Lua the `nil, err` approach
looks better. As far as I know at least some of our commercial projects
primarily use this approach and have to wrap many functions with pcall.
Don't sure how much the overhead is.

But anyway other msgpack functions just raise an error and it seems the
new functions should have similar contract.

Changed.

> > +		return 2;
> > +	}
> > +
> > +	if (mp_typeof(*data) != MP_ARRAY) {
> > +		lua_pushnil(L);
> > +		lua_pushstring(L, "msgpack.check_array: wrong array header");
> > +		return 2;
> > +	}
> > +
> > +	if (mp_check_array(data, end) > 0) {
> > +		lua_pushnil(L);
> > +		lua_pushstring(L, end_of_buffer_msg);
> > +		return 2;
> > +	}
> > +
> > +	uint32_t len = mp_decode_array(&data);
> > +
> > +	if (!lua_isnoneornil(L, 3)) {
> > +		uint32_t exp_len = (uint32_t) lua_tointeger(L, 3);
> 
> IMO it would be better if you set exp_len when you checked the arguments
> (using luaL_checkinteger).

Expected length was removed from the function as you suggested.

> > +		if (len != exp_len) {
> > +			lua_pushnil(L);
> > +			lua_pushfstring(L, "msgpack.check_array: expected "
> > +					"array of length %d, got length %d",
> > +					len, exp_len);
> > +			return 2;
> > +		}
> > +	}
> > +
> > +	*(const char **) luaL_pushcdata(L, ctypeid) = data;
> > +	lua_pushinteger(L, len);
> > +	return 2;
> > +}
> > +
> >  static int
> >  lua_msgpack_new(lua_State *L);
> >  
> > @@ -426,6 +489,7 @@ static const luaL_Reg msgpacklib[] = {
> >  	{ "decode", lua_msgpack_decode },
> >  	{ "decode_unchecked", lua_msgpack_decode_unchecked },
> >  	{ "ibuf_decode", lua_ibuf_msgpack_decode },
> > +	{ "check_array", lua_check_array },
> >  	{ "new", lua_msgpack_new },
> >  	{ NULL, NULL }
> >  };
> > @@ -447,6 +511,8 @@ luaopen_msgpack(lua_State *L)
> >  	assert(CTID_STRUCT_IBUF != 0);
> >  	CTID_CHAR_PTR = luaL_ctypeid(L, "char *");
> >  	assert(CTID_CHAR_PTR != 0);
> > +	CTID_CONST_CHAR_PTR = luaL_ctypeid(L, "const char *");
> > +	assert(CTID_CONST_CHAR_PTR != 0);
> >  	luaL_msgpack_default = luaL_newserializer(L, "msgpack", msgpacklib);
> >  	return 1;
> >  }

----

commit 8c820dff279734d79e26591dcb771f7c6ab13639
Author: Alexander Turenko <alexander.turenko at tarantool.org>
Date:   Thu Jan 31 01:45:22 2019 +0300

    lua: add non-recursive msgpack decoding functions
    
    Needed for #3276.
    
    @TarantoolBot document
    Title: Non-recursive msgpack decoding functions
    
    Contracts:
    
    ```
    msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
    msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
    ```
    
    These functions are intended to be used with a msgpack buffer received
    from net.box. A user may want to skip {[IPROTO_DATA_KEY] = ...} wrapper
    and an array header before pass the buffer to decode in some C function.
    
    See https://github.com/tarantool/tarantool/issues/2195 for more
    information re this net.box's API.
    
    Consider merger's docbot comment for usage examples.

diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index b47006038..92a9efd25 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -418,6 +418,84 @@ lua_ibuf_msgpack_decode(lua_State *L)
 	return 2;
 }
 
+/**
+ * Verify and set arguments: data and size.
+ *
+ * Always return 0. In case of any fail raise a Lua error.
+ */
+static int
+verify_decode_args(lua_State *L, const char *func_name, const char **data_p,
+		   ptrdiff_t *size_p)
+{
+	/* Verify arguments count. */
+	if (lua_gettop(L) != 2)
+		return luaL_error(L, "Usage: %s(ptr, size)", func_name);
+
+	/* Verify ptr type. */
+	uint32_t ctypeid;
+	const char *data = *(char **) luaL_checkcdata(L, 1, &ctypeid);
+	if (ctypeid != CTID_CHAR_PTR)
+		return luaL_error(L, "%s: 'char *' expected", func_name);
+
+	/* Verify size type and value. */
+	ptrdiff_t size = (ptrdiff_t) luaL_checkinteger(L, 2);
+	if (size <= 0)
+		return luaL_error(L, "%s: non-positive size", func_name);
+
+	*data_p = data;
+	*size_p = size;
+
+	return 0;
+}
+
+/**
+ * msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
+ */
+static int
+lua_decode_array(lua_State *L)
+{
+	const char *func_name = "msgpack.decode_array";
+	const char *data;
+	ptrdiff_t size;
+	verify_decode_args(L, func_name, &data, &size);
+
+	if (mp_typeof(*data) != MP_ARRAY)
+		return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+	if (mp_check_array(data, data + size) > 0)
+		return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+	uint32_t len = mp_decode_array(&data);
+
+	lua_pushinteger(L, len);
+	*(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+	return 2;
+}
+
+/**
+ * msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
+ */
+static int
+lua_decode_map(lua_State *L)
+{
+	const char *func_name = "msgpack.decode_map";
+	const char *data;
+	ptrdiff_t size;
+	verify_decode_args(L, func_name, &data, &size);
+
+	if (mp_typeof(*data) != MP_MAP)
+		return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+	if (mp_check_map(data, data + size) > 0)
+		return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+	uint32_t len = mp_decode_map(&data);
+
+	lua_pushinteger(L, len);
+	*(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+	return 2;
+}
+
 static int
 lua_msgpack_new(lua_State *L);
 
@@ -426,6 +504,8 @@ static const luaL_Reg msgpacklib[] = {
 	{ "decode", lua_msgpack_decode },
 	{ "decode_unchecked", lua_msgpack_decode_unchecked },
 	{ "ibuf_decode", lua_ibuf_msgpack_decode },
+	{ "decode_array", lua_decode_array },
+	{ "decode_map", lua_decode_map },
 	{ "new", lua_msgpack_new },
 	{ NULL, NULL }
 };
diff --git a/test/app-tap/msgpack.test.lua b/test/app-tap/msgpack.test.lua
index 0e1692ad9..ee215dfb1 100755
--- a/test/app-tap/msgpack.test.lua
+++ b/test/app-tap/msgpack.test.lua
@@ -49,9 +49,186 @@ local function test_misc(test, s)
     test:ok(not st and e:match("null"), "null ibuf")
 end
 
+local function test_decode_array_map(test, s)
+    local ffi = require('ffi')
+
+    local usage_err = 'Usage: msgpack%.decode_[^_(]+%(ptr, size%)'
+    local end_of_buffer_err = 'msgpack%.decode_[^_]+: unexpected end of buffer'
+    local non_positive_size_err = 'msgpack.decode_[^_]+: non%-positive size'
+
+    local decode_cases = {
+        {
+            'fixarray',
+            func = s.decode_array,
+            data = ffi.cast('char *', '\x94'),
+            size = 1,
+            exp_len = 4,
+            exp_rewind = 1,
+        },
+        {
+            'array 16',
+            func = s.decode_array,
+            data = ffi.cast('char *', '\xdc\x00\x04'),
+            size = 3,
+            exp_len = 4,
+            exp_rewind = 3,
+        },
+        {
+            'array 32',
+            func = s.decode_array,
+            data = ffi.cast('char *', '\xdd\x00\x00\x00\x04'),
+            size = 5,
+            exp_len = 4,
+            exp_rewind = 5,
+        },
+        {
+            'truncated array 16',
+            func = s.decode_array,
+            data = ffi.cast('char *', '\xdc\x00'),
+            size = 2,
+            exp_err = end_of_buffer_err,
+        },
+        {
+            'truncated array 32',
+            func = s.decode_array,
+            data = ffi.cast('char *', '\xdd\x00\x00\x00'),
+            size = 4,
+            exp_err = end_of_buffer_err,
+        },
+        {
+            'fixmap',
+            func = s.decode_map,
+            data = ffi.cast('char *', '\x84'),
+            size = 1,
+            exp_len = 4,
+            exp_rewind = 1,
+        },
+        {
+            'map 16',
+            func = s.decode_map,
+            data = ffi.cast('char *', '\xde\x00\x04'),
+            size = 3,
+            exp_len = 4,
+            exp_rewind = 3,
+        },
+        {
+            'array 32',
+            func = s.decode_map,
+            data = ffi.cast('char *', '\xdf\x00\x00\x00\x04'),
+            size = 5,
+            exp_len = 4,
+            exp_rewind = 5,
+        },
+        {
+            'truncated map 16',
+            func = s.decode_map,
+            data = ffi.cast('char *', '\xde\x00'),
+            size = 2,
+            exp_err = end_of_buffer_err,
+        },
+        {
+            'truncated map 32',
+            func = s.decode_map,
+            data = ffi.cast('char *', '\xdf\x00\x00\x00'),
+            size = 4,
+            exp_err = end_of_buffer_err,
+        },
+    }
+
+    local bad_api_cases = {
+        {
+            'wrong msgpack type',
+            data = ffi.cast('char *', '\xc0'),
+            size = 1,
+            exp_err = 'msgpack.decode_[^_]+: unexpected msgpack type',
+        },
+        {
+            'zero size buffer',
+            data = ffi.cast('char *', ''),
+            size = 0,
+            exp_err = non_positive_size_err,
+        },
+        {
+            'negative size buffer',
+            data = ffi.cast('char *', ''),
+            size = -1,
+            exp_err = non_positive_size_err,
+        },
+        {
+            'size is nil',
+            data = ffi.cast('char *', ''),
+            size = nil,
+            exp_err = 'bad argument',
+        },
+        {
+            'no arguments',
+            args = {},
+            exp_err = usage_err,
+        },
+        {
+            'one argument',
+            args = {ffi.cast('char *', '')},
+            exp_err = usage_err,
+        },
+        {
+            'data is nil',
+            args = {nil, 1},
+            exp_err = 'expected cdata as 1 argument',
+        },
+        {
+            'data is not cdata',
+            args = {1, 1},
+            exp_err = 'expected cdata as 1 argument',
+        },
+        {
+            'data with wrong cdata type',
+            args = {box.tuple.new(), 1},
+            exp_err = "msgpack.decode_[^_]+: 'char %*' expected",
+        },
+        {
+            'size has wrong type',
+            args = {ffi.cast('char *', ''), 'eee'},
+            exp_err = 'bad argument',
+        },
+    }
+
+    test:plan(#decode_cases + 2 * #bad_api_cases)
+
+    -- Decode cases.
+    for _, case in ipairs(decode_cases) do
+        if case.exp_err ~= nil then
+            local ok, err = pcall(case.func, case.data, case.size)
+            local description = ('bad; %s'):format(case[1])
+            test:ok(ok == false and err:match(case.exp_err), description)
+        else
+            local len, new_buf = case.func(case.data, case.size)
+            local rewind = new_buf - case.data
+            local description = ('good; %s'):format(case[1])
+            test:is_deeply({len, rewind}, {case.exp_len, case.exp_rewind},
+                description)
+        end
+    end
+
+    -- Bad api usage cases.
+    for _, func_name in ipairs({'decode_array', 'decode_map'}) do
+        for _, case in ipairs(bad_api_cases) do
+            local ok, err
+            if case.args ~= nil then
+                local args_len = table.maxn(case.args)
+                ok, err = pcall(s[func_name], unpack(case.args, 1, args_len))
+            else
+                ok, err = pcall(s[func_name], case.data, case.size)
+            end
+            local description = ('%s bad api usage; %s'):format(func_name,
+                                                                case[1])
+            test:ok(ok == false and err:match(case.exp_err), description)
+        end
+    end
+end
+
 tap.test("msgpack", function(test)
     local serializer = require('msgpack')
-    test:plan(10)
+    test:plan(11)
     test:test("unsigned", common.test_unsigned, serializer)
     test:test("signed", common.test_signed, serializer)
     test:test("double", common.test_double, serializer)
@@ -62,4 +239,5 @@ tap.test("msgpack", function(test)
     test:test("ucdata", common.test_ucdata, serializer)
     test:test("offsets", test_offsets, serializer)
     test:test("misc", test_misc, serializer)
+    test:test("decode_array_map", test_decode_array_map, serializer)
 end)

----

commit 3868d5c2551c893f16bd05c79d4d52a564c6a833
Author: Alexander Turenko <alexander.turenko at tarantool.org>
Date:   Thu Jan 31 01:59:18 2019 +0300

    net.box: add skip_header option to use with buffer
    
    Needed for #3276.
    
    @TarantoolBot document
    Title: net.box: skip_header option
    
    This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper
    from a buffer. This may be needed to pass this buffer to some C function
    when it expects some specific msgpack input.
    
    See src/box/lua/net_box.lua for examples. Also consider merger's docbot
    comment for more examples.

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 2bf772aa8..53c93cafb 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -15,6 +15,7 @@ local max           = math.max
 local fiber_clock   = fiber.clock
 local fiber_self    = fiber.self
 local decode        = msgpack.decode_unchecked
+local decode_map    = msgpack.decode_map
 
 local table_new           = require('table.new')
 local check_iterator_type = box.internal.check_iterator_type
@@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback,
     -- @retval nil, error Error occured.
     -- @retval not nil Future object.
     --
-    local function perform_async_request(buffer, method, on_push, on_push_ctx,
-                                         ...)
+    local function perform_async_request(buffer, skip_header, method, on_push,
+                                         on_push_ctx, ...)
         if state ~= 'active' and state ~= 'fetch_schema' then
             return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
                                        reason = last_error})
@@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback,
         local id = next_request_id
         method_encoder[method](send_buf, id, ...)
         next_request_id = next_id(id)
-        -- Request in most cases has maximum 8 members:
-        -- method, buffer, id, cond, errno, response, on_push,
-        -- on_push_ctx.
-        local request = setmetatable(table_new(0, 8), request_mt)
+        -- Request in most cases has maximum 9 members:
+        -- method, buffer, skip_header, id, cond, errno, response,
+        -- on_push, on_push_ctx.
+        local request = setmetatable(table_new(0, 9), request_mt)
         request.method = method
         request.buffer = buffer
+        request.skip_header = skip_header
         request.id = id
         request.cond = fiber.cond()
         requests[id] = request
@@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback,
     -- @retval nil, error Error occured.
     -- @retval not nil Response object.
     --
-    local function perform_request(timeout, buffer, method, on_push,
-                                   on_push_ctx, ...)
+    local function perform_request(timeout, buffer, skip_header, method,
+                                   on_push, on_push_ctx, ...)
         local request, err =
-            perform_async_request(buffer, method, on_push, on_push_ctx, ...)
+            perform_async_request(buffer, skip_header, method, on_push,
+                                  on_push_ctx, ...)
         if not request then
             return nil, err
         end
@@ -554,6 +557,15 @@ local function create_transport(host, port, user, password, callback,
             local wpos = buffer:alloc(body_len)
             ffi.copy(wpos, body_rpos, body_len)
             body_len = tonumber(body_len)
+            if request.skip_header then
+                -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
+                local map_len, key
+                map_len, buffer.rpos = decode_map(buffer.rpos, buffer:size())
+                assert(map_len == 1)
+                key, buffer.rpos = decode(buffer.rpos)
+                assert(key == IPROTO_DATA_KEY)
+                body_len = buffer:size()
+            end
             if status == IPROTO_OK_KEY then
                 request.response = body_len
                 requests[id] = nil
@@ -1047,17 +1059,18 @@ end
 
 function remote_methods:_request(method, opts, ...)
     local transport = self._transport
-    local on_push, on_push_ctx, buffer, deadline
+    local on_push, on_push_ctx, buffer, skip_header, deadline
     -- Extract options, set defaults, check if the request is
     -- async.
     if opts then
         buffer = opts.buffer
+        skip_header = opts.skip_header
         if opts.is_async then
             if opts.on_push or opts.on_push_ctx then
                 error('To handle pushes in an async request use future:pairs()')
             end
-            return transport.perform_async_request(buffer, method, table.insert,
-                                                   {}, ...)
+            return transport.perform_async_request(buffer, skip_header, method,
+                                                   table.insert, {}, ...)
         end
         if opts.timeout then
             -- conn.space:request(, { timeout = timeout })
@@ -1079,8 +1092,9 @@ function remote_methods:_request(method, opts, ...)
         transport.wait_state('active', timeout)
         timeout = deadline and max(0, deadline - fiber_clock())
     end
-    local res, err = transport.perform_request(timeout, buffer, method,
-                                               on_push, on_push_ctx, ...)
+    local res, err = transport.perform_request(timeout, buffer, skip_header,
+                                               method, on_push, on_push_ctx,
+                                               ...)
     if err then
         box.error(err)
     end
@@ -1283,10 +1297,10 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
+        res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line})
     else
         assert(self.protocol == 'Lua console')
-        res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
+        res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n')
     end
     if err then
         box.error(err)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 2b5a84646..71d0e0a50 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
                             offset, limit, key)
     return ret
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 ---
 ...
@@ -1573,6 +1573,18 @@ result
 ---
 - {48: [[2]]}
 ...
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[2]]
+...
 -- insert
 c.space.test:insert({3}, {buffer = ibuf})
 ---
@@ -1585,6 +1597,21 @@ result
 ---
 - {48: [[3]]}
 ...
+-- insert + skip_header
+_ = space:delete({3})
+---
+...
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
 -- update
 c.space.test:update({3}, {}, {buffer = ibuf})
 ---
@@ -1608,6 +1635,29 @@ result
 ---
 - {48: [[3]]}
 ...
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
 -- upsert
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 ---
@@ -1620,6 +1670,18 @@ result
 ---
 - {48: []}
 ...
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- delete
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 ---
@@ -1632,6 +1694,18 @@ result
 ---
 - {48: []}
 ...
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- select
 c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
 ---
@@ -1644,6 +1718,18 @@ result
 ---
 - {48: [[3], [2], [1, 'hello']]}
 ...
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+---
+- 17
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3], [2], [1, 'hello']]
+...
 -- select
 len = c.space.test:select({}, {buffer = ibuf})
 ---
@@ -1667,6 +1753,29 @@ result
 ---
 - {48: [[1, 'hello'], [2], [3], [4]]}
 ...
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+---
+...
+ibuf.rpos + len == ibuf.wpos
+---
+- true
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+ibuf.rpos == ibuf.wpos
+---
+- true
+...
+len
+---
+- 19
+...
+result
+---
+- [[1, 'hello'], [2], [3], [4]]
+...
 -- call
 c:call("echo", {1, 2, 3}, {buffer = ibuf})
 ---
@@ -1701,6 +1810,40 @@ result
 ---
 - {48: []}
 ...
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- eval
 c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
 ---
@@ -1735,6 +1878,40 @@ result
 ---
 - {48: []}
 ...
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
 -- unsupported methods
 c.space.test:get({1}, { buffer = ibuf})
 ---
@@ -2571,7 +2748,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 ---
 ...
 c.state
@@ -3205,7 +3382,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
 future = c:call('long_function', {1, 2, 3}, {is_async = true})
 ---
 ...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 ---
 ...
 while not c:is_connected() do fiber.sleep(0.01) end
@@ -3340,7 +3517,7 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
@@ -3359,7 +3536,7 @@ c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
 ---
 ...
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
 ---
 - null
 - Peer closed
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 96d822820..48cc7147d 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
                             offset, limit, key)
     return ret
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 
 LISTEN = require('uri').parse(box.cfg.listen)
@@ -615,11 +615,22 @@ c.space.test:replace({2}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- insert
 c.space.test:insert({3}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- insert + skip_header
+_ = space:delete({3})
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- update
 c.space.test:update({3}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -628,21 +639,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- upsert
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- delete
 c.space.test:upsert({4}, {}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- select
 c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- select
 len = c.space.test:select({}, {buffer = ibuf})
 ibuf.rpos + len == ibuf.wpos
@@ -651,6 +685,14 @@ ibuf.rpos == ibuf.wpos
 len
 result
 
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+ibuf.rpos + len == ibuf.wpos
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+ibuf.rpos == ibuf.wpos
+len
+result
+
 -- call
 c:call("echo", {1, 2, 3}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -662,6 +704,17 @@ c:call("echo", nil, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- eval
 c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -673,6 +726,17 @@ c:eval("echo(...)", nil, {buffer = ibuf})
 result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
 result
 
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
 -- unsupported methods
 c.space.test:get({1}, { buffer = ibuf})
 c.space.test.index.primary:min({}, { buffer = ibuf})
@@ -1063,7 +1127,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 c.state
 while not c:is_connected() do fiber.sleep(0.01) end
 c:ping()
@@ -1291,7 +1355,7 @@ finalize_long()
 --
 c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
 future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 while not c:is_connected() do fiber.sleep(0.01) end
 finalize_long()
 future:wait_result(100)
@@ -1348,7 +1412,7 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 c:close()
 
@@ -1358,7 +1422,7 @@ c:close()
 --
 c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
 c:close()
 test_run:grep_log('default', 'too big packet size in the header') ~= nil



More information about the Tarantool-patches mailing list