Date: Fri, 6 Aug 2021 15:03:26 +0300
From: Vladimir Davydov via Tarantool-patches
Reply-To: Vladimir Davydov
To: mechanik20051988
Cc: v.shpilevoy@tarantool.org, tarantool-patches@dev.tarantool.org, mechanik20051988
Subject: Re: [Tarantool-patches] [PATCH 5/7] net.box: add stream support to net.box
Message-ID: <20210806120326.bdf5o3otxhneodq3@esperanza>
References: <6ec4b53e9bb3ea0d1d5873d47646ece3c93df6b4.1628184138.git.mechanik20.05.1988@gmail.com>
In-Reply-To: <6ec4b53e9bb3ea0d1d5873d47646ece3c93df6b4.1628184138.git.mechanik20.05.1988@gmail.com>
List-Id: Tarantool development patches

On Thu, Aug 05, 2021 at 09:17:43PM +0300, mechanik20051988 wrote:
> From: mechanik20051988
>
> Add stream support to `net.box`. In "net.box", stream
> is an object over connection that has the same methods,
> but all requests from it sends with non-zero stream ID.
> Since there can be a lot of streams, we do not copy the
> spaces from the connection to the stream immediately when
> creating a stream, but do it only when we first access space.
> Also, when updating the schema, we update the spaces only for
> those streams that have had at least one access to the space.
>
> Part of #5860
>
> @TarantoolBot document
> Title: stream support was added to net.box
> In "net.box", stream is an object over connection that
> has the same methods, but all requests from it sends
> with non-zero stream ID.

I see some differences between a stream and a connection while we want
them to be completely interchangeable.

1. They are serialized differently:

tarantool> c
---
- peer_uuid: 140674d0-38e7-4fbd-9a54-7133dc9151af
  schema_version: 95
  protocol: Binary
  state: active
  peer_version_id: 133376
  port: '3301'
...

tarantool> s
---
- &0
  _schema_version: 95
  commit: 'function: 0x414d32e0'
  begin: 'function: 0x414d3278'
  rollback: 'function: 0x414d3390'
  space:
    _space: []
    _stream: *0
  _stream_id: 1
  new_stream: 'function: 0x414d31f8'
  _conn:
    peer_uuid: 140674d0-38e7-4fbd-9a54-7133dc9151af
    schema_version: 95
    protocol: Binary
    state: active
    peer_version_id: 133376
    port: '3301'
...

2. When typed into a console, c.space lists all spaces while s.space
   shows this:

tarantool> s.space
---
- &0
  _space: []
  _stream: &1
    _schema_version: 95
    commit: 'function: 0x414d32e0'
    begin: 'function: 0x414d3278'
    rollback: 'function: 0x414d3390'
    space: *0
    _stream_id: 1
    new_stream: 'function: 0x414d31f8'
    _conn:
      peer_uuid: 140674d0-38e7-4fbd-9a54-7133dc9151af
      schema_version: 95
      protocol: Binary
      state: active
      peer_version_id: 133376
      port: '3301'
...

There are other similar differences in serialization.
For example, compare s.space._vfunc and c.space._vfunc or
s.space._vfunc.index and c.space._vfunc.index.

3. c.space._space refers to the corresponding system space while
   s.space._space doesn't.

4. Autocompletion doesn't work for s.space. and s.space._vfunc. and
   s.space._vfunc.index.

Some of them (most notably 3) can and must be fixed. I'm not so sure
about autocompletion though. Please check.

Also, I think that stream_id should be visible (without a leading
underscore) when you dump a stream to the console.

> Stream ID is generated on the client side in two ways:
> automatically or manually.

Manual stream ids are not supported anymore. Please update the comment.

> User can choose any of two methods, but can not mix them.
> Simple example of stream creation using net.box:
> ```lua
> -- automatically generated stream_id
> stream = conn:stream()

Now it's called conn:new_stream().

> -- manually chosen stream_id
> stream = conn:stream(1)
> ```
> ---
>  src/box/lua/net_box.c | 101 ++--
>  src/box/lua/net_box.lua | 191 ++++--
>  test/box/access.result | 6 +-
>  test/box/access.test.lua | 6 +-
>  ...net.box_console_connections_gh-2677.result | 2 +-
>  ...t.box_console_connections_gh-2677.test.lua | 2 +-
>  .../net.box_incorrect_iterator_gh-841.result | 4 +-
>  ...net.box_incorrect_iterator_gh-841.test.lua | 4 +-
>  test/box/net.box_iproto_hangs_gh-3464.result | 2 +-
>  .../box/net.box_iproto_hangs_gh-3464.test.lua | 2 +-
>  .../net.box_long-poll_input_gh-3400.result | 8 +-
>  .../net.box_long-poll_input_gh-3400.test.lua | 8 +-
>  test/box/stream.lua | 13 +
>  test/box/stream.result | 553 ++++++++++++++++++
>  test/box/stream.test.lua | 207 +++++++
>  test/box/suite.ini | 2 +-
>  16 files changed, 1011 insertions(+), 100 deletions(-)
>  create mode 100644 test/box/stream.lua
>  create mode 100644 test/box/stream.result
>  create mode 100644 test/box/stream.test.lua
>
> diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
> index 82efc483d..ec850cd9f 100644
> --- a/src/box/lua/net_box.c
> +++ b/src/box/lua/net_box.c
> @@ -128,20 +132,21 @@ netbox_encode_request(struct mpstream *stream, size_t initial_size)
>
>  static void
>  netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream,
> -                   uint64_t sync)
> +                   uint64_t sync, uint64_t stream_id)
>  {
>      (void)L;
>      (void)idx;
> -    size_t svp = netbox_prepare_request(stream, sync, IPROTO_PING);
> +    size_t svp = netbox_prepare_request(stream, sync,
> +                                        IPROTO_PING, stream_id);
>      netbox_encode_request(stream, svp);
>  }
>
>  static int
>  netbox_encode_auth(lua_State *L)
>  {
> -    if (lua_gettop(L) < 5) {
> +    if (lua_gettop(L) < 6) {
>          return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, "
> -                          "user, password, greeting)");
> +                          "stream_id, user, password, greeting)");

IPROTO_AUTH shouldn't support streams so this is not necessary.
Looking at net_box.lua, you always pass nil for stream_id anyway.
Please revert this part.

>      }
>      struct ibuf *ibuf = (struct ibuf *)lua_topointer(L, 1);
>      uint64_t sync = luaL_touint64(L, 2);
> @@ -149,14 +154,14 @@ netbox_encode_auth(lua_State *L)
>      struct mpstream stream;
>      mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
>                    luamp_error, L);
> -    size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH);
> +    size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH, 0);
>
>      size_t user_len;
> -    const char *user = lua_tolstring(L, 3, &user_len);
> +    const char *user = lua_tolstring(L, 4, &user_len);
>      size_t password_len;
> -    const char *password = lua_tolstring(L, 4, &password_len);
> +    const char *password = lua_tolstring(L, 5, &password_len);
>      size_t salt_len;
> -    const char *salt = lua_tolstring(L, 5, &salt_len);
> +    const char *salt = lua_tolstring(L, 6, &salt_len);
>      if (salt_len < SCRAMBLE_SIZE)
>          return luaL_error(L, "Invalid salt");
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 9af6028eb..bf6a89e15 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -930,6 +930,8 @@ local function remote_serialize(self)
>      }
>  end
>
> +local stream_methods = {}
> +

This table is not really needed. Please remove.

>  local remote_methods = {}
>  local remote_mt = {
>      __index = remote_methods, __serialize = remote_serialize,
> @@ -942,6 +944,90 @@ local console_mt = {
>      __metatable = false
>  }
>
> +local stream_index_mt = {
> +    __index = function(self, key)
> +        return self._src[key]
> +    end
> +}
> +
> +-- Create stream space index, which is same as connection space
> +-- index, but have non zero stream ID.
> +local function stream_wrap_index(stream_id, src)
> +    return setmetatable({
> +        _stream_id = stream_id,
> +        _src = src,
> +    }, stream_index_mt)
> +end
> +
> +-- Metatable for stream space indexes. When stream space being
> +-- created there are no indexes in it. When accessing the space
> +-- index, we look for corresponding space index in corresponding
> +-- connection space. If it is found we create same index for the
> +-- stream space but with corresponding stream ID. We do not need
> +-- to compare stream _schema_version and connection schema_version,
> +-- because all access to index is carried out through it's space.
> +-- So we update schema_version when we access space.
> +local stream_indexes_mt = {
> +    __index = function(self, key)
> +        local _space = self._space
> +        local src = _space._src.index[key]
> +        if not src then
> +            return nil
> +        end
> +        local res = stream_wrap_index(_space._stream_id, src)
> +        self[key] = res
> +        return res
> +    end
> +}
> +
> +local stream_space_mt = {
> +    __index = function(self, key)
> +        return self._src[key]
> +    end
> +}
> +
> +-- Create stream space, which is same as connection space,
> +-- but have non zero stream ID.
> +local function stream_wrap_space(stream, src)
> +    local res = setmetatable({
> +        _stream_id = stream._stream_id,
> +        _src = src,
> +        index = setmetatable({
> +            _space = nil,
> +        }, stream_indexes_mt)
> +    }, stream_space_mt)
> +    res.index._space = res
> +    return res
> +end
> +
> +-- Metatable for stream spaces. When stream being created there
> +-- are no spaces in it. When user try to access some space in
> +-- stream, we first of all compare _schema_version of stream with
> +-- schema_version from connection and if they are not equal, we
> +-- clear stream space cache and update it's schema_version. Then
> +-- we look for corresponding space in the connection. If it is
> +-- found we create same space for the stream but with corresponding
> +-- stream ID.
> +local stream_spaces_mt = {
> +    __index = function(self, key)
> +        local stream = self._stream
> +        if stream._schema_version ~= stream._conn.schema_version then
> +            stream._schema_version = stream._conn.schema_version
> +            self._space = {}
> +        end
> +        if self._space[key] then
> +            return self._space[key]
> +        end
> +        local src = stream._conn.space[key]
> +        if not src then
> +            return nil
> +        end
> +        local res = stream_wrap_space(stream, src)
> +        self._space[key] = res
> +        return res
> +    end
> +}
> +
>  local space_metatable, index_metatable
>
>  local function new_sm(host, port, opts, connection, greeting)
> @@ -1021,6 +1107,9 @@ local function new_sm(host, port, opts, connection, greeting)
>      if opts.wait_connected ~= false then
>          remote._transport.wait_state('active', tonumber(opts.wait_connected))
>      end
> +    -- Last stream ID used for this connection
> +    remote._last_stream_id = 0
> +    remote._streams = setmetatable({}, {__mode = 'v'})

This table is never read from. Please remove.

>      return remote
>  end
>
> @@ -1078,6 +1167,29 @@ local function check_eval_args(args)
>      end
>  end
>
> +function stream_methods:new_stream()

Can be inlined.

> +    check_remote_arg(self, 'stream')

                              'new_stream'

> +    box.error(E_PROC_LUA, "Unsupported for stream");

Why? Let's create a new stream here:

  return self._conn:new_stream()

> +end
> +
> +function remote_methods:new_stream()
> +    check_remote_arg(self, 'stream')

                              'new_stream'

> +    self._last_stream_id = self._last_stream_id + 1
> +    local stream = setmetatable({
> +        new_stream = stream_methods.new_stream,
> +        _stream_id = self._last_stream_id,
> +        space = setmetatable({
> +            _space = {},
> +            _stream = nil,
> +        }, stream_spaces_mt),
> +        _conn = self,
> +        _schema_version = self.schema_version,
> +    }, { __index = self })
> +    stream.space._stream = stream
> +    self._streams[self._last_stream_id] = stream
> +    return stream
> +end
> +
>  function remote_methods:close()
>      check_remote_arg(self, 'close')
>      self._transport.stop()
> @@ -1108,7 +1220,7 @@ function remote_methods:wait_connected(timeout)
>      return self._transport.wait_state('active', timeout)
>  end
>
> -function remote_methods:_request(method, opts, format, ...)
> +function remote_methods:_request(method, opts, format, stream_id, ...)
>      local transport = self._transport
>      local on_push, on_push_ctx, buffer, skip_header, deadline
>      -- Extract options, set defaults, check if the request is
> @@ -1123,7 +1235,7 @@ function remote_methods:_request(method, opts, format, ...)
>              local res, err =
>                  transport.perform_async_request(buffer, skip_header, method,
>                                                  table.insert, {}, format,
> -                                                ...)
> +                                                stream_id, ...)
>              if err then
>                  box.error(err)
>              end
> @@ -1145,7 +1257,7 @@ function remote_methods:_request(method, opts, format, ...)
>      end
>      local res, err = transport.perform_request(timeout, buffer, skip_header,
>                                                 method, on_push, on_push_ctx,
> -                                               format, ...)
> +                                               format, stream_id, ...)
>      if err then
>          box.error(err)
>      end
> @@ -1161,7 +1273,7 @@ end
>
>  function remote_methods:ping(opts)
>      check_remote_arg(self, 'ping')
> -    return (pcall(self._request, self, M_PING, opts))
> +    return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
>  end
>
>  function remote_methods:reload_schema()
> @@ -1172,14 +1284,16 @@ end
>  -- @deprecated since 1.7.4
>  function remote_methods:call_16(func_name, ...)
>      check_remote_arg(self, 'call')
> -    return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
> +    return (self:_request(M_CALL_16, nil, nil, self._stream_id,
> +                          tostring(func_name), {...}))
>  end
>
>  function remote_methods:call(func_name, args, opts)
>      check_remote_arg(self, 'call')
>      check_call_args(args)
>      args = args or {}
> -    local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
> +    local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
> +                              tostring(func_name), args)

Rather than passing self._stream_id through all the net.box methods,
can't you simply set it in _request?
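
To make the last question concrete, here is a rough sketch in plain Lua
(no Tarantool needed) of what reading the stream ID inside _request could
look like. It is not the net.box code: perform_request and new_conn are
hypothetical stand-ins for the transport and the connection constructor,
and treating a missing _stream_id as 0 is an assumption based on the patch
encoding IPROTO_AUTH with stream ID 0.

```lua
-- Hypothetical stand-in for the net.box method table; not the real code.
local remote_methods = {}

-- Hypothetical transport: just records what would be encoded.
local function perform_request(method, stream_id, ...)
    return {method = method, stream_id = stream_id, args = {...}}
end

-- _request reads the stream ID from self, so callers never pass it.
-- A plain connection has no _stream_id field and falls back to 0
-- (assumption: 0 means "no stream").
function remote_methods:_request(method, ...)
    return perform_request(method, self._stream_id or 0, ...)
end

-- Unchanged caller: no stream_id argument is threaded through.
function remote_methods:call(func_name, args)
    return self:_request('call', tostring(func_name), args or {})
end

function remote_methods:new_stream()
    -- When called on a stream, delegate to the underlying connection.
    local conn = self._conn or self
    conn._last_stream_id = conn._last_stream_id + 1
    -- The stream inherits every method from the connection via __index.
    return setmetatable({
        _stream_id = conn._last_stream_id,
        _conn = conn,
    }, {__index = conn})
end

-- Hypothetical constructor for the mock connection.
local function new_conn()
    return setmetatable({_last_stream_id = 0}, {__index = remote_methods})
end

local conn = new_conn()
local stream = conn:new_stream()
local r1 = conn:call('f')    -- sent with stream ID 0
local r2 = stream:call('f')  -- sent with the stream's own ID
```

With this shape, ping, call, call_16 and the rest keep their current
signatures, and stream:new_stream() hands out a fresh stream from the same
connection instead of raising an error.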