From: Alexander Turenko <alexander.turenko@tarantool.org> To: Vladimir Davydov <vdavydov.dev@gmail.com> Cc: tarantool-patches@freelists.org Subject: Re: [PATCH v2 5/6] net.box: add helpers to decode msgpack headers Date: Fri, 1 Feb 2019 18:11:42 +0300 [thread overview] Message-ID: <20190201151141.pwiyaqzoqt42zt5m@tkn_work_nb> (raw) In-Reply-To: <20190110172933.255ikbqspmcjmqaj@esperanza> Splitted this patch to two ones: * lua: add non-recursive msgpack decoding functions * net.box: add skip_header option to use with buffer I attached them at end of the email. WBR, Alexander Turenko. On Thu, Jan 10, 2019 at 08:29:33PM +0300, Vladimir Davydov wrote: > On Wed, Jan 09, 2019 at 11:20:13PM +0300, Alexander Turenko wrote: > > Needed for #3276. > > > > @TarantoolBot document > > Title: net.box: helpers to decode msgpack headers > > > > They allow to skip iproto packet and msgpack array headers and pass raw > > msgpack data to some other function, say, merger. > > > > Contracts: > > > > ``` > > net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos) > > -> new_rpos > > -> nil, err_msg > > I'd prefer if this was done right in net.box.select or whatever function > writing the response to ibuf. Yes, this is going to break backward > compatibility, but IMO it's OK for 2.1 - I doubt anybody have used this > weird high perf API anyway. 1. This will break tarantool/shard. 2. Hey, Guido thinks it is okay to break compatibility btw Python 2 and Python 3 and it seems that Python 2 is in use ten years or like so. I can do it under a separate option: skip_iproto_header or skip_header. It is not about a packet header, but part of body, however I have no better variants. > > msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, [, arr_len]) > > -> new_rpos, arr_len > > -> nil, err_msg > > This seems to be OK, although I'm not sure if we really need to check > the length in this function. Looks like we will definitely need it > because of net.box.call, which wraps function return value in an array. > Not sure about the name either, because it doesn't just checks the > msgpack - it decodes it, but can't come up with anything substantially > better. May be, msgpack.decode_array? Re check length: the reason was to simplify user's code, but ok, it will not much more complex if we'll factor this check out. Like so (except from the merger's commit message): ``` conn:call('batch_select', <...>, {buffer = buf, skip_header = true}) local len, _ len, buf.rpos = msgpack.decode_array(buf.rpos, buf:size()) assert(len == 1) _, buf.rpos = msgpack.decode_array(buf.rpos, buf:size()) ``` Re name: now I understood: decode_unchecked() is like mp_decode(), decode() is like mp_check() + mp_decode(). So it worth to rename it to decode_array(). Done. Also I changed order of return values to match msgpack.decode() (before it matches msgpack.ibuf_decode()). > > ``` > > > > Below the example with msgpack.decode() as the function that need raw > > msgpack data. It is just to illustrate the approach, there is no sense > > to skip iproto/array headers manually in Lua and then decode the rest in > > Lua. But it worth when the raw msgpack data is subject to process in a C > > module. > > > > ```lua > > local function single_select(space, ...) > > return box.space[space]:select(...) > > end > > > > local function batch_select(spaces, ...) > > local res = {} > > for _, space in ipairs(spaces) do > > table.insert(res, box.space[space]:select(...)) > > end > > return res > > end > > > > _G.single_select = single_select > > _G.batch_select = batch_select > > > > local res > > > > local buf = buffer.ibuf() > > conn.space.s:select(nil, {buffer = buf}) > > -- check and skip iproto_data header > > buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)) > > -- check that we really got data from :select() as result > > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos) > > -- check that the buffer ends > > assert(buf.rpos == buf.wpos) > > > > buf:recycle() > > conn:call('single_select', {'s'}, {buffer = buf}) > > -- check and skip the iproto_data header > > buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)) > > -- check and skip the array around return values > > buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, 1)) > > -- check that we really got data from :select() as result > > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos) > > -- check that the buffer ends > > assert(buf.rpos == buf.wpos) > > > > buf:recycle() > > local spaces = {'s', 't'} > > conn:call('batch_select', {spaces}, {buffer = buf}) > > -- check and skip the iproto_data header > > buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)) > > -- check and skip the array around return values > > buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, 1)) > > -- check and skip the array header before the first select result > > buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, #spaces)) > > -- check that we really got data from s:select() as result > > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos) > > -- t:select() data > > res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos) > > -- check that the buffer ends > > assert(buf.rpos == buf.wpos) > > ``` > > --- > > src/box/lua/net_box.c | 49 +++++++++++ > > src/box/lua/net_box.lua | 1 + > > src/lua/msgpack.c | 66 ++++++++++++++ > > test/app-tap/msgpack.test.lua | 157 +++++++++++++++++++++++++++++++++- > > test/box/net.box.result | 58 +++++++++++++ > > test/box/net.box.test.lua | 26 ++++++ > > 6 files changed, 356 insertions(+), 1 deletion(-) > > > > diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c > > index c7063d9c8..d71f33768 100644 > > --- a/src/box/lua/net_box.c > > +++ b/src/box/lua/net_box.c > > @@ -51,6 +51,9 @@ > > > > #define cfg luaL_msgpack_default > > > > +static uint32_t CTID_CHAR_PTR; > > +static uint32_t CTID_CONST_CHAR_PTR; > > + > > static inline size_t > > netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type) > > { > > @@ -745,9 +748,54 @@ netbox_decode_execute(struct lua_State *L) > > return 2; > > } > > > > +/** > > + * net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos) > > + * -> new_rpos > > + * -> nil, err_msg > > + */ > > +int > > +netbox_check_iproto_data(struct lua_State *L) > > Instead of adding this function to net_box.c, I'd rather try to add > msgpack helpers for decoding a map, similar to msgpack.check_array added > by your patch, and use them in net_box.lua. Done. We discussed that we should add such helpers for all types like nil, bool, number, string, maybe bin. I think we can reuse recursive msgpack.decode() if we expect a scalar value. > > +{ > > + uint32_t ctypeid; > > + const char *data = *(const char **) luaL_checkcdata(L, 1, &ctypeid); > > + if (ctypeid != CTID_CHAR_PTR && ctypeid != CTID_CONST_CHAR_PTR) > > + return luaL_error(L, > > + "net_box.check_iproto_data: 'char *' or " > > + "'const char *' expected"); > > + > > + if (!lua_isnumber(L, 2)) > > + return luaL_error(L, "net_box.check_iproto_data: number " > > + "expected as 2nd argument"); > > + const char *end = data + lua_tointeger(L, 2); > > + > > + int ok = data < end && > > + mp_typeof(*data) == MP_MAP && > > + mp_check_map(data, end) <= 0 && > > + mp_decode_map(&data) == 1 && > > + data < end && > > + mp_typeof(*data) == MP_UINT && > > + mp_check_uint(data, end) <= 0 && > > + mp_decode_uint(&data) == IPROTO_DATA; > > + > > + if (!ok) { > > + lua_pushnil(L); > > + lua_pushstring(L, > > + "net_box.check_iproto_data: wrong iproto data packet"); > > + return 2; > > + } > > + > > + *(const char **) luaL_pushcdata(L, ctypeid) = data; > > + return 1; > > +} > > + > > int > > luaopen_net_box(struct lua_State *L) > > { > > + CTID_CHAR_PTR = luaL_ctypeid(L, "char *"); > > + assert(CTID_CHAR_PTR != 0); > > + CTID_CONST_CHAR_PTR = luaL_ctypeid(L, "const char *"); > > + assert(CTID_CONST_CHAR_PTR != 0); > > + > > static const luaL_Reg net_box_lib[] = { > > { "encode_ping", netbox_encode_ping }, > > { "encode_call_16", netbox_encode_call_16 }, > > @@ -765,6 +813,7 @@ luaopen_net_box(struct lua_State *L) > > { "communicate", netbox_communicate }, > > { "decode_select", netbox_decode_select }, > > { "decode_execute", netbox_decode_execute }, > > + { "check_iproto_data", netbox_check_iproto_data }, > > { NULL, NULL} > > }; > > /* luaL_register_module polutes _G */ > > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua > > index 2bf772aa8..0a38efa5a 100644 > > --- a/src/box/lua/net_box.lua > > +++ b/src/box/lua/net_box.lua > > @@ -1424,6 +1424,7 @@ local this_module = { > > new = connect, -- Tarantool < 1.7.1 compatibility, > > wrap = wrap, > > establish_connection = establish_connection, > > + check_iproto_data = internal.check_iproto_data, > > } > > > > function this_module.timeout(timeout, ...) > > diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c > > index b47006038..fca440660 100644 > > --- a/src/lua/msgpack.c > > +++ b/src/lua/msgpack.c > > @@ -51,6 +51,7 @@ luamp_error(void *error_ctx) > > } > > > > static uint32_t CTID_CHAR_PTR; > > +static uint32_t CTID_CONST_CHAR_PTR; > > static uint32_t CTID_STRUCT_IBUF; > > > > struct luaL_serializer *luaL_msgpack_default = NULL; > > @@ -418,6 +419,68 @@ lua_ibuf_msgpack_decode(lua_State *L) > > return 2; > > } > > > > +/** > > + * msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, [, arr_len]) > > + * -> new_rpos, arr_len > > + * -> nil, err_msg > > + */ > > +static int > > +lua_check_array(lua_State *L) > > +{ > > + uint32_t ctypeid; > > + const char *data = *(const char **) luaL_checkcdata(L, 1, &ctypeid); > > + if (ctypeid != CTID_CHAR_PTR && ctypeid != CTID_CONST_CHAR_PTR) > > Hm, msgpack.decode doesn't care about CTID_CONST_CHAR_PTR. Why should we? It looks natural to support a const pointer where we allow non-const one. But I don't have an example where we can obtain 'const char *' buffer with msgpack in Lua (w/o ffi.cast()). Msgpackffi returns 'const unsigned char *', but it is the bug and should be fixed in https://github.com/tarantool/tarantool/issues/3926 > > + return luaL_error(L, "msgpack.check_array: 'char *' or " > > + "'const char *' expected"); > > + > > + if (!lua_isnumber(L, 2)) > > + return luaL_error(L, "msgpack.check_array: number expected as " > > + "2nd argument"); > > + const char *end = data + lua_tointeger(L, 2); > > + > > + if (!lua_isnoneornil(L, 3) && !lua_isnumber(L, 3)) > > + return luaL_error(L, "msgpack.check_array: number or nil " > > + "expected as 3rd argument"); > > Why not simply luaL_checkinteger? We can separatelly check lua_gettop() and use luaL_checkinteger(). It looks shorter, now I see. Fixed. > > + > > + static const char *end_of_buffer_msg = "msgpack.check_array: " > > + "unexpected end of buffer"; > > No point to make this variable static. Ok. But now I removed it. > > + > > + if (data >= end) { > > + lua_pushnil(L); > > + lua_pushstring(L, end_of_buffer_msg); > > msgpack.decode throws an error when it fails to decode msgpack data, so > I think this function should throw too. Or Lua code style states we should report errors with `nil, err`. But this aspect is more about external modules as I see. It is quite unclear what is the best option for built-in modules. If one likely want to handle an error in Lua the `nil, err` approach looks better. As far as I know at least some of our commercial projects primarily use this approach and have to wrap many functions with pcall. Don't sure how much the overhead is. But anyway other msgpack functions just raise an error and it seems the new functions should have similar contract. Changed. > > + return 2; > > + } > > + > > + if (mp_typeof(*data) != MP_ARRAY) { > > + lua_pushnil(L); > > + lua_pushstring(L, "msgpack.check_array: wrong array header"); > > + return 2; > > + } > > + > > + if (mp_check_array(data, end) > 0) { > > + lua_pushnil(L); > > + lua_pushstring(L, end_of_buffer_msg); > > + return 2; > > + } > > + > > + uint32_t len = mp_decode_array(&data); > > + > > + if (!lua_isnoneornil(L, 3)) { > > + uint32_t exp_len = (uint32_t) lua_tointeger(L, 3); > > IMO it would be better if you set exp_len when you checked the arguments > (using luaL_checkinteger). Expected length was removed from the function as you suggested. > > + if (len != exp_len) { > > + lua_pushnil(L); > > + lua_pushfstring(L, "msgpack.check_array: expected " > > + "array of length %d, got length %d", > > + len, exp_len); > > + return 2; > > + } > > + } > > + > > + *(const char **) luaL_pushcdata(L, ctypeid) = data; > > + lua_pushinteger(L, len); > > + return 2; > > +} > > + > > static int > > lua_msgpack_new(lua_State *L); > > > > @@ -426,6 +489,7 @@ static const luaL_Reg msgpacklib[] = { > > { "decode", lua_msgpack_decode }, > > { "decode_unchecked", lua_msgpack_decode_unchecked }, > > { "ibuf_decode", lua_ibuf_msgpack_decode }, > > + { "check_array", lua_check_array }, > > { "new", lua_msgpack_new }, > > { NULL, NULL } > > }; > > @@ -447,6 +511,8 @@ luaopen_msgpack(lua_State *L) > > assert(CTID_STRUCT_IBUF != 0); > > CTID_CHAR_PTR = luaL_ctypeid(L, "char *"); > > assert(CTID_CHAR_PTR != 0); > > + CTID_CONST_CHAR_PTR = luaL_ctypeid(L, "const char *"); > > + assert(CTID_CONST_CHAR_PTR != 0); > > luaL_msgpack_default = luaL_newserializer(L, "msgpack", msgpacklib); > > return 1; > > } ---- commit 8c820dff279734d79e26591dcb771f7c6ab13639 Author: Alexander Turenko <alexander.turenko@tarantool.org> Date: Thu Jan 31 01:45:22 2019 +0300 lua: add non-recursive msgpack decoding functions Needed for #3276. @TarantoolBot document Title: Non-recursive msgpack decoding functions Contracts: ``` msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos ``` These functions are intended to be used with a msgpack buffer received from net.box. A user may want to skip {[IPROTO_DATA_KEY] = ...} wrapper and an array header before pass the buffer to decode in some C function. See https://github.com/tarantool/tarantool/issues/2195 for more information re this net.box's API. Consider merger's docbot comment for usage examples. diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c index b47006038..92a9efd25 100644 --- a/src/lua/msgpack.c +++ b/src/lua/msgpack.c @@ -418,6 +418,84 @@ lua_ibuf_msgpack_decode(lua_State *L) return 2; } +/** + * Verify and set arguments: data and size. + * + * Always return 0. In case of any fail raise a Lua error. + */ +static int +verify_decode_args(lua_State *L, const char *func_name, const char **data_p, + ptrdiff_t *size_p) +{ + /* Verify arguments count. */ + if (lua_gettop(L) != 2) + return luaL_error(L, "Usage: %s(ptr, size)", func_name); + + /* Verify ptr type. */ + uint32_t ctypeid; + const char *data = *(char **) luaL_checkcdata(L, 1, &ctypeid); + if (ctypeid != CTID_CHAR_PTR) + return luaL_error(L, "%s: 'char *' expected", func_name); + + /* Verify size type and value. */ + ptrdiff_t size = (ptrdiff_t) luaL_checkinteger(L, 2); + if (size <= 0) + return luaL_error(L, "%s: non-positive size", func_name); + + *data_p = data; + *size_p = size; + + return 0; +} + +/** + * msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos + */ +static int +lua_decode_array(lua_State *L) +{ + const char *func_name = "msgpack.decode_array"; + const char *data; + ptrdiff_t size; + verify_decode_args(L, func_name, &data, &size); + + if (mp_typeof(*data) != MP_ARRAY) + return luaL_error(L, "%s: unexpected msgpack type", func_name); + + if (mp_check_array(data, data + size) > 0) + return luaL_error(L, "%s: unexpected end of buffer", func_name); + + uint32_t len = mp_decode_array(&data); + + lua_pushinteger(L, len); + *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data; + return 2; +} + +/** + * msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos + */ +static int +lua_decode_map(lua_State *L) +{ + const char *func_name = "msgpack.decode_map"; + const char *data; + ptrdiff_t size; + verify_decode_args(L, func_name, &data, &size); + + if (mp_typeof(*data) != MP_MAP) + return luaL_error(L, "%s: unexpected msgpack type", func_name); + + if (mp_check_map(data, data + size) > 0) + return luaL_error(L, "%s: unexpected end of buffer", func_name); + + uint32_t len = mp_decode_map(&data); + + lua_pushinteger(L, len); + *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data; + return 2; +} + static int lua_msgpack_new(lua_State *L); @@ -426,6 +504,8 @@ static const luaL_Reg msgpacklib[] = { { "decode", lua_msgpack_decode }, { "decode_unchecked", lua_msgpack_decode_unchecked }, { "ibuf_decode", lua_ibuf_msgpack_decode }, + { "decode_array", lua_decode_array }, + { "decode_map", lua_decode_map }, { "new", lua_msgpack_new }, { NULL, NULL } }; diff --git a/test/app-tap/msgpack.test.lua b/test/app-tap/msgpack.test.lua index 0e1692ad9..ee215dfb1 100755 --- a/test/app-tap/msgpack.test.lua +++ b/test/app-tap/msgpack.test.lua @@ -49,9 +49,186 @@ local function test_misc(test, s) test:ok(not st and e:match("null"), "null ibuf") end +local function test_decode_array_map(test, s) + local ffi = require('ffi') + + local usage_err = 'Usage: msgpack%.decode_[^_(]+%(ptr, size%)' + local end_of_buffer_err = 'msgpack%.decode_[^_]+: unexpected end of buffer' + local non_positive_size_err = 'msgpack.decode_[^_]+: non%-positive size' + + local decode_cases = { + { + 'fixarray', + func = s.decode_array, + data = ffi.cast('char *', '\x94'), + size = 1, + exp_len = 4, + exp_rewind = 1, + }, + { + 'array 16', + func = s.decode_array, + data = ffi.cast('char *', '\xdc\x00\x04'), + size = 3, + exp_len = 4, + exp_rewind = 3, + }, + { + 'array 32', + func = s.decode_array, + data = ffi.cast('char *', '\xdd\x00\x00\x00\x04'), + size = 5, + exp_len = 4, + exp_rewind = 5, + }, + { + 'truncated array 16', + func = s.decode_array, + data = ffi.cast('char *', '\xdc\x00'), + size = 2, + exp_err = end_of_buffer_err, + }, + { + 'truncated array 32', + func = s.decode_array, + data = ffi.cast('char *', '\xdd\x00\x00\x00'), + size = 4, + exp_err = end_of_buffer_err, + }, + { + 'fixmap', + func = s.decode_map, + data = ffi.cast('char *', '\x84'), + size = 1, + exp_len = 4, + exp_rewind = 1, + }, + { + 'map 16', + func = s.decode_map, + data = ffi.cast('char *', '\xde\x00\x04'), + size = 3, + exp_len = 4, + exp_rewind = 3, + }, + { + 'array 32', + func = s.decode_map, + data = ffi.cast('char *', '\xdf\x00\x00\x00\x04'), + size = 5, + exp_len = 4, + exp_rewind = 5, + }, + { + 'truncated map 16', + func = s.decode_map, + data = ffi.cast('char *', '\xde\x00'), + size = 2, + exp_err = end_of_buffer_err, + }, + { + 'truncated map 32', + func = s.decode_map, + data = ffi.cast('char *', '\xdf\x00\x00\x00'), + size = 4, + exp_err = end_of_buffer_err, + }, + } + + local bad_api_cases = { + { + 'wrong msgpack type', + data = ffi.cast('char *', '\xc0'), + size = 1, + exp_err = 'msgpack.decode_[^_]+: unexpected msgpack type', + }, + { + 'zero size buffer', + data = ffi.cast('char *', ''), + size = 0, + exp_err = non_positive_size_err, + }, + { + 'negative size buffer', + data = ffi.cast('char *', ''), + size = -1, + exp_err = non_positive_size_err, + }, + { + 'size is nil', + data = ffi.cast('char *', ''), + size = nil, + exp_err = 'bad argument', + }, + { + 'no arguments', + args = {}, + exp_err = usage_err, + }, + { + 'one argument', + args = {ffi.cast('char *', '')}, + exp_err = usage_err, + }, + { + 'data is nil', + args = {nil, 1}, + exp_err = 'expected cdata as 1 argument', + }, + { + 'data is not cdata', + args = {1, 1}, + exp_err = 'expected cdata as 1 argument', + }, + { + 'data with wrong cdata type', + args = {box.tuple.new(), 1}, + exp_err = "msgpack.decode_[^_]+: 'char %*' expected", + }, + { + 'size has wrong type', + args = {ffi.cast('char *', ''), 'eee'}, + exp_err = 'bad argument', + }, + } + + test:plan(#decode_cases + 2 * #bad_api_cases) + + -- Decode cases. + for _, case in ipairs(decode_cases) do + if case.exp_err ~= nil then + local ok, err = pcall(case.func, case.data, case.size) + local description = ('bad; %s'):format(case[1]) + test:ok(ok == false and err:match(case.exp_err), description) + else + local len, new_buf = case.func(case.data, case.size) + local rewind = new_buf - case.data + local description = ('good; %s'):format(case[1]) + test:is_deeply({len, rewind}, {case.exp_len, case.exp_rewind}, + description) + end + end + + -- Bad api usage cases. + for _, func_name in ipairs({'decode_array', 'decode_map'}) do + for _, case in ipairs(bad_api_cases) do + local ok, err + if case.args ~= nil then + local args_len = table.maxn(case.args) + ok, err = pcall(s[func_name], unpack(case.args, 1, args_len)) + else + ok, err = pcall(s[func_name], case.data, case.size) + end + local description = ('%s bad api usage; %s'):format(func_name, + case[1]) + test:ok(ok == false and err:match(case.exp_err), description) + end + end +end + tap.test("msgpack", function(test) local serializer = require('msgpack') - test:plan(10) + test:plan(11) test:test("unsigned", common.test_unsigned, serializer) test:test("signed", common.test_signed, serializer) test:test("double", common.test_double, serializer) @@ -62,4 +239,5 @@ tap.test("msgpack", function(test) test:test("ucdata", common.test_ucdata, serializer) test:test("offsets", test_offsets, serializer) test:test("misc", test_misc, serializer) + test:test("decode_array_map", test_decode_array_map, serializer) end) ---- commit 3868d5c2551c893f16bd05c79d4d52a564c6a833 Author: Alexander Turenko <alexander.turenko@tarantool.org> Date: Thu Jan 31 01:59:18 2019 +0300 net.box: add skip_header option to use with buffer Needed for #3276. @TarantoolBot document Title: net.box: skip_header option This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper from a buffer. This may be needed to pass this buffer to some C function when it expects some specific msgpack input. See src/box/lua/net_box.lua for examples. Also consider merger's docbot comment for more examples. diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 2bf772aa8..53c93cafb 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -15,6 +15,7 @@ local max = math.max local fiber_clock = fiber.clock local fiber_self = fiber.self local decode = msgpack.decode_unchecked +local decode_map = msgpack.decode_map local table_new = require('table.new') local check_iterator_type = box.internal.check_iterator_type @@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback, -- @retval nil, error Error occured. -- @retval not nil Future object. -- - local function perform_async_request(buffer, method, on_push, on_push_ctx, - ...) + local function perform_async_request(buffer, skip_header, method, on_push, + on_push_ctx, ...) if state ~= 'active' and state ~= 'fetch_schema' then return nil, box.error.new({code = last_errno or E_NO_CONNECTION, reason = last_error}) @@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback, local id = next_request_id method_encoder[method](send_buf, id, ...) next_request_id = next_id(id) - -- Request in most cases has maximum 8 members: - -- method, buffer, id, cond, errno, response, on_push, - -- on_push_ctx. - local request = setmetatable(table_new(0, 8), request_mt) + -- Request in most cases has maximum 9 members: + -- method, buffer, skip_header, id, cond, errno, response, + -- on_push, on_push_ctx. + local request = setmetatable(table_new(0, 9), request_mt) request.method = method request.buffer = buffer + request.skip_header = skip_header request.id = id request.cond = fiber.cond() requests[id] = request @@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback, -- @retval nil, error Error occured. -- @retval not nil Response object. -- - local function perform_request(timeout, buffer, method, on_push, - on_push_ctx, ...) + local function perform_request(timeout, buffer, skip_header, method, + on_push, on_push_ctx, ...) local request, err = - perform_async_request(buffer, method, on_push, on_push_ctx, ...) + perform_async_request(buffer, skip_header, method, on_push, + on_push_ctx, ...) if not request then return nil, err end @@ -554,6 +557,15 @@ local function create_transport(host, port, user, password, callback, local wpos = buffer:alloc(body_len) ffi.copy(wpos, body_rpos, body_len) body_len = tonumber(body_len) + if request.skip_header then + -- Skip {[IPROTO_DATA_KEY] = ...} wrapper. + local map_len, key + map_len, buffer.rpos = decode_map(buffer.rpos, buffer:size()) + assert(map_len == 1) + key, buffer.rpos = decode(buffer.rpos) + assert(key == IPROTO_DATA_KEY) + body_len = buffer:size() + end if status == IPROTO_OK_KEY then request.response = body_len requests[id] = nil @@ -1047,17 +1059,18 @@ end function remote_methods:_request(method, opts, ...) local transport = self._transport - local on_push, on_push_ctx, buffer, deadline + local on_push, on_push_ctx, buffer, skip_header, deadline -- Extract options, set defaults, check if the request is -- async. if opts then buffer = opts.buffer + skip_header = opts.skip_header if opts.is_async then if opts.on_push or opts.on_push_ctx then error('To handle pushes in an async request use future:pairs()') end - return transport.perform_async_request(buffer, method, table.insert, - {}, ...) + return transport.perform_async_request(buffer, skip_header, method, + table.insert, {}, ...) end if opts.timeout then -- conn.space:request(, { timeout = timeout }) @@ -1079,8 +1092,9 @@ function remote_methods:_request(method, opts, ...) transport.wait_state('active', timeout) timeout = deadline and max(0, deadline - fiber_clock()) end - local res, err = transport.perform_request(timeout, buffer, method, - on_push, on_push_ctx, ...) + local res, err = transport.perform_request(timeout, buffer, skip_header, + method, on_push, on_push_ctx, + ...) if err then box.error(err) end @@ -1283,10 +1297,10 @@ function console_methods:eval(line, timeout) end if self.protocol == 'Binary' then local loader = 'return require("console").eval(...)' - res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line}) + res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line}) else assert(self.protocol == 'Lua console') - res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n') + res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n') end if err then box.error(err) diff --git a/test/box/net.box.result b/test/box/net.box.result index 2b5a84646..71d0e0a50 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) offset, limit, key) return ret end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); --- ... @@ -1573,6 +1573,18 @@ result --- - {48: [[2]]} ... +-- replace + skip_header +c.space.test:replace({2}, {buffer = ibuf, skip_header = true}) +--- +- 7 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[2]] +... -- insert c.space.test:insert({3}, {buffer = ibuf}) --- @@ -1585,6 +1597,21 @@ result --- - {48: [[3]]} ... +-- insert + skip_header +_ = space:delete({3}) +--- +... +c.space.test:insert({3}, {buffer = ibuf, skip_header = true}) +--- +- 7 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[3]] +... -- update c.space.test:update({3}, {}, {buffer = ibuf}) --- @@ -1608,6 +1635,29 @@ result --- - {48: [[3]]} ... +-- update + skip_header +c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true}) +--- +- 7 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[3]] +... +c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true}) +--- +- 7 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[3]] +... -- upsert c.space.test:upsert({4}, {}, {buffer = ibuf}) --- @@ -1620,6 +1670,18 @@ result --- - {48: []} ... +-- upsert + skip_header +c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... -- delete c.space.test:upsert({4}, {}, {buffer = ibuf}) --- @@ -1632,6 +1694,18 @@ result --- - {48: []} ... +-- delete + skip_header +c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... -- select c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf}) --- @@ -1644,6 +1718,18 @@ result --- - {48: [[3], [2], [1, 'hello']]} ... +-- select + skip_header +c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true}) +--- +- 17 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [[3], [2], [1, 'hello']] +... -- select len = c.space.test:select({}, {buffer = ibuf}) --- @@ -1667,6 +1753,29 @@ result --- - {48: [[1, 'hello'], [2], [3], [4]]} ... +-- select + skip_header +len = c.space.test:select({}, {buffer = ibuf, skip_header = true}) +--- +... +ibuf.rpos + len == ibuf.wpos +--- +- true +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +ibuf.rpos == ibuf.wpos +--- +- true +... +len +--- +- 19 +... +result +--- +- [[1, 'hello'], [2], [3], [4]] +... -- call c:call("echo", {1, 2, 3}, {buffer = ibuf}) --- @@ -1701,6 +1810,40 @@ result --- - {48: []} ... +-- call + skip_header +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +--- +- 8 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [1, 2, 3] +... +c:call("echo", {}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... +c:call("echo", nil, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... -- eval c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf}) --- @@ -1735,6 +1878,40 @@ result --- - {48: []} ... +-- eval + skip_header +c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... +c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... +c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true}) +--- +- 5 +... +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +--- +... +result +--- +- [] +... -- unsupported methods c.space.test:get({1}, { buffer = ibuf}) --- @@ -2571,7 +2748,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') --- ... c.state @@ -3205,7 +3382,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01}) future = c:call('long_function', {1, 2, 3}, {is_async = true}) --- ... -_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') --- ... while not c:is_connected() do fiber.sleep(0.01) end @@ -3340,7 +3517,7 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. -- -f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') --- ... while f:status() ~= 'dead' do fiber.sleep(0.01) end @@ -3359,7 +3536,7 @@ c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' --- ... -c._transport.perform_request(nil, nil, 'inject', nil, nil, data) +c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data) --- - null - Peer closed diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index 96d822820..48cc7147d 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) offset, limit, key) return ret end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); LISTEN = require('uri').parse(box.cfg.listen) @@ -615,11 +615,22 @@ c.space.test:replace({2}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- replace + skip_header +c.space.test:replace({2}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- insert c.space.test:insert({3}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- insert + skip_header +_ = space:delete({3}) +c.space.test:insert({3}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- update c.space.test:update({3}, {}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) @@ -628,21 +639,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- update + skip_header +c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- upsert c.space.test:upsert({4}, {}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- upsert + skip_header +c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- delete c.space.test:upsert({4}, {}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- delete + skip_header +c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- select c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- select + skip_header +c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- select len = c.space.test:select({}, {buffer = ibuf}) ibuf.rpos + len == ibuf.wpos @@ -651,6 +685,14 @@ ibuf.rpos == ibuf.wpos len result +-- select + skip_header +len = c.space.test:select({}, {buffer = ibuf, skip_header = true}) +ibuf.rpos + len == ibuf.wpos +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +ibuf.rpos == ibuf.wpos +len +result + -- call c:call("echo", {1, 2, 3}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) @@ -662,6 +704,17 @@ c:call("echo", nil, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- call + skip_header +c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c:call("echo", {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c:call("echo", nil, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- eval c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) @@ -673,6 +726,17 @@ c:eval("echo(...)", nil, {buffer = ibuf}) result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) result +-- eval + skip_header +c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result +c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true}) +result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) +result + -- unsupported methods c.space.test:get({1}, { buffer = ibuf}) c.space.test.index.primary:min({}, { buffer = ibuf}) @@ -1063,7 +1127,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') c.state while not c:is_connected() do fiber.sleep(0.01) end c:ping() @@ -1291,7 +1355,7 @@ finalize_long() -- c = net:connect(box.cfg.listen, {reconnect_after = 0.01}) future = c:call('long_function', {1, 2, 3}, {is_async = true}) -_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') while not c:is_connected() do fiber.sleep(0.01) end finalize_long() future:wait_result(100) @@ -1348,7 +1412,7 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. -- -f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') while f:status() ~= 'dead' do fiber.sleep(0.01) end c:close() @@ -1358,7 +1422,7 @@ c:close() -- c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' -c._transport.perform_request(nil, nil, 'inject', nil, nil, data) +c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data) c:close() test_run:grep_log('default', 'too big packet size in the header') ~= nil
next prev parent reply other threads:[~2019-02-01 15:11 UTC|newest] Thread overview: 28+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-01-09 20:20 [PATCH v2 0/6] Merger Alexander Turenko 2019-01-09 20:20 ` [PATCH v2 1/6] Add luaL_iscallable with support of cdata metatype Alexander Turenko 2019-01-10 12:21 ` Vladimir Davydov 2019-01-09 20:20 ` [PATCH v2 2/6] Add functions to ease using Lua iterators from C Alexander Turenko 2019-01-10 12:29 ` Vladimir Davydov 2019-01-15 23:26 ` Alexander Turenko 2019-01-16 8:18 ` Vladimir Davydov 2019-01-16 11:40 ` Alexander Turenko 2019-01-16 12:20 ` Vladimir Davydov 2019-01-17 1:20 ` Alexander Turenko 2019-01-28 18:17 ` Alexander Turenko 2019-03-01 4:04 ` Alexander Turenko 2019-01-09 20:20 ` [PATCH v2 3/6] lua: add luaT_newtuple() Alexander Turenko 2019-01-10 12:44 ` Vladimir Davydov 2019-01-18 21:58 ` Alexander Turenko 2019-01-23 16:12 ` Vladimir Davydov 2019-01-28 16:51 ` Alexander Turenko 2019-03-01 4:08 ` Alexander Turenko 2019-01-09 20:20 ` [PATCH v2 4/6] lua: add luaT_new_key_def() Alexander Turenko 2019-01-10 13:07 ` Vladimir Davydov 2019-01-29 18:52 ` Alexander Turenko 2019-01-30 10:58 ` Alexander Turenko 2019-03-01 4:10 ` Alexander Turenko 2019-01-09 20:20 ` [PATCH v2 5/6] net.box: add helpers to decode msgpack headers Alexander Turenko 2019-01-10 17:29 ` Vladimir Davydov 2019-02-01 15:11 ` Alexander Turenko [this message] 2019-03-05 12:00 ` Alexander Turenko 2019-01-09 20:20 ` [PATCH v2 6/6] Add merger for tuple streams Alexander Turenko
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20190201151141.pwiyaqzoqt42zt5m@tkn_work_nb \ --to=alexander.turenko@tarantool.org \ --cc=tarantool-patches@freelists.org \ --cc=vdavydov.dev@gmail.com \ --subject='Re: [PATCH v2 5/6] net.box: add helpers to decode msgpack headers' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox