From: Vladimir Davydov via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: mechanik20051988 <mechanik20051988@tarantool.org>
Cc: v.shpilevoy@tarantool.org, tarantool-patches@dev.tarantool.org,
    mechanik20051988 <mechanik20.05.1988@gmail.com>
Subject: Re: [Tarantool-patches] [PATCH 5/7] net.box: add stream support to net.box
Date: Fri, 6 Aug 2021 15:03:26 +0300
Message-ID: <20210806120326.bdf5o3otxhneodq3@esperanza>
In-Reply-To: <6ec4b53e9bb3ea0d1d5873d47646ece3c93df6b4.1628184138.git.mechanik20.05.1988@gmail.com>

On Thu, Aug 05, 2021 at 09:17:43PM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988@gmail.com>
>
> Add stream support to `net.box`. In "net.box", stream
> is an object over connection that has the same methods,
> but all requests from it sends with non-zero stream ID.
> Since there can be a lot of streams, we do not copy the
> spaces from the connection to the stream immediately when
> creating a stream, but do it only when we first access space.
> Also, when updating the schema, we update the spaces only for
> those streams that have had at least one access to the space.
>
> Part of #5860
>
> @TarantoolBot document
> Title: stream support was added to net.box
> In "net.box", stream is an object over connection that
> has the same methods, but all requests from it sends
> with non-zero stream ID.

I see some differences between a stream and a connection while we want
them to be completely interchangeable.

1. They are serialized differently:

tarantool> c
---
- peer_uuid: 140674d0-38e7-4fbd-9a54-7133dc9151af
  schema_version: 95
  protocol: Binary
  state: active
  peer_version_id: 133376
  port: '3301'
...

tarantool> s
---
- &0
  _schema_version: 95
  commit: 'function: 0x414d32e0'
  begin: 'function: 0x414d3278'
  rollback: 'function: 0x414d3390'
  space:
    _space: []
    _stream: *0
  _stream_id: 1
  new_stream: 'function: 0x414d31f8'
  _conn:
    peer_uuid: 140674d0-38e7-4fbd-9a54-7133dc9151af
    schema_version: 95
    protocol: Binary
    state: active
    peer_version_id: 133376
    port: '3301'
...

2. When typed into a console, c.space lists all spaces while s.space
   shows this:

tarantool> s.space
---
- &0
  _space: []
  _stream: &1
    _schema_version: 95
    commit: 'function: 0x414d32e0'
    begin: 'function: 0x414d3278'
    rollback: 'function: 0x414d3390'
    space: *0
    _stream_id: 1
    new_stream: 'function: 0x414d31f8'
    _conn:
      peer_uuid: 140674d0-38e7-4fbd-9a54-7133dc9151af
      schema_version: 95
      protocol: Binary
      state: active
      peer_version_id: 133376
      port: '3301'
...

   There are other similar differences in serialization. For example,
   compare s.space._vfunc and c.space._vfunc or s.space._vfunc.index
   and c.space._vfunc.index.

3. c.space._space refers to the corresponding system space while
   s.space._space doesn't.

4. Autocompletion doesn't work for s.space.<TAB> and
   s.space._vfunc.<TAB> and s.space._vfunc.index.<TAB>

Some of them (most notably 3) can and must be fixed. I'm not so sure
about autocompletion though. Please check.

Also, I think that stream_id should be visible (without a leading
underscore) when you dump a stream to the console.

> Stream ID is generated on the client side in two ways:
> automatically or manually.

Manual stream ids are not supported anymore. Please update the comment.

> User can choose any of two methods, but can not mix them.
> Simple example of stream creation using net.box:
> ```lua
> -- automatically generated stream_id
> stream = conn:stream()

Now it's called conn:new_stream().

> -- manually chosen stream_id
> stream = conn:stream(1)
> ```
> ---
>  src/box/lua/net_box.c                         | 101 ++--
>  src/box/lua/net_box.lua                       | 191 ++++--
>  test/box/access.result                        |   6 +-
>  test/box/access.test.lua                      |   6 +-
>  ...net.box_console_connections_gh-2677.result |   2 +-
>  ...t.box_console_connections_gh-2677.test.lua |   2 +-
>  .../net.box_incorrect_iterator_gh-841.result  |   4 +-
>  ...net.box_incorrect_iterator_gh-841.test.lua |   4 +-
>  test/box/net.box_iproto_hangs_gh-3464.result  |   2 +-
>  .../box/net.box_iproto_hangs_gh-3464.test.lua |   2 +-
>  .../net.box_long-poll_input_gh-3400.result    |   8 +-
>  .../net.box_long-poll_input_gh-3400.test.lua  |   8 +-
>  test/box/stream.lua                           |  13 +
>  test/box/stream.result                        | 553 ++++++++++++++++++
>  test/box/stream.test.lua                      | 207 +++++++
>  test/box/suite.ini                            |   2 +-
>  16 files changed, 1011 insertions(+), 100 deletions(-)
>  create mode 100644 test/box/stream.lua
>  create mode 100644 test/box/stream.result
>  create mode 100644 test/box/stream.test.lua
>
> diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
> index 82efc483d..ec850cd9f 100644
> --- a/src/box/lua/net_box.c
> +++ b/src/box/lua/net_box.c
> @@ -128,20 +132,21 @@ netbox_encode_request(struct mpstream *stream, size_t initial_size)
>
>  static void
>  netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream,
> -                   uint64_t sync)
> +                   uint64_t sync, uint64_t stream_id)
>  {
>          (void)L;
>          (void)idx;
> -        size_t svp = netbox_prepare_request(stream, sync, IPROTO_PING);
> +        size_t svp = netbox_prepare_request(stream, sync,
> +                                            IPROTO_PING, stream_id);
>          netbox_encode_request(stream, svp);
>  }
>
>  static int
>  netbox_encode_auth(lua_State *L)
>  {
> -        if (lua_gettop(L) < 5) {
> +        if (lua_gettop(L) < 6) {
>                  return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, "
> -                                     "user, password, greeting)");
> +                                     "stream_id, user, password, greeting)");

IPROTO_AUTH shouldn't support streams so this is not necessary.
Looking at net_box.lua, you always pass nil for stream_id anyway.
Please revert this part.

>          }
>          struct ibuf *ibuf = (struct ibuf *)lua_topointer(L, 1);
>          uint64_t sync = luaL_touint64(L, 2);
> @@ -149,14 +154,14 @@ netbox_encode_auth(lua_State *L)
>          struct mpstream stream;
>          mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
>                        luamp_error, L);
> -        size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH);
> +        size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH, 0);
>
>          size_t user_len;
> -        const char *user = lua_tolstring(L, 3, &user_len);
> +        const char *user = lua_tolstring(L, 4, &user_len);
>          size_t password_len;
> -        const char *password = lua_tolstring(L, 4, &password_len);
> +        const char *password = lua_tolstring(L, 5, &password_len);
>          size_t salt_len;
> -        const char *salt = lua_tolstring(L, 5, &salt_len);
> +        const char *salt = lua_tolstring(L, 6, &salt_len);
>          if (salt_len < SCRAMBLE_SIZE)
>                  return luaL_error(L, "Invalid salt");
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 9af6028eb..bf6a89e15 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -930,6 +930,8 @@ local function remote_serialize(self)
>      }
>  end
>
> +local stream_methods = {}
> +

This table is not really needed. Please remove.

>  local remote_methods = {}
>  local remote_mt = {
>      __index = remote_methods, __serialize = remote_serialize,
> @@ -942,6 +944,90 @@ local console_mt = {
>      __metatable = false
>  }
>
> +local stream_index_mt = {
> +    __index = function(self, key)
> +        return self._src[key]
> +    end
> +}
> +
> +-- Create stream space index, which is same as connection space
> +-- index, but have non zero stream ID.
> +local function stream_wrap_index(stream_id, src)
> +    return setmetatable({
> +        _stream_id = stream_id,
> +        _src = src,
> +    }, stream_index_mt)
> +end
> +
> +-- Metatable for stream space indexes. When stream space being
> +-- created there are no indexes in it. When accessing the space
> +-- index, we look for corresponding space index in corresponding
> +-- connection space. If it is found we create same index for the
> +-- stream space but with corresponding stream ID. We do not need
> +-- to compare stream _schema_version and connection schema_version,
> +-- because all access to index is carried out through it's space.
> +-- So we update schema_version when we access space.
> +local stream_indexes_mt = {
> +    __index = function(self, key)
> +        local _space = self._space
> +        local src = _space._src.index[key]
> +        if not src then
> +            return nil
> +        end
> +        local res = stream_wrap_index(_space._stream_id, src)
> +        self[key] = res
> +        return res
> +    end
> +}
> +
> +local stream_space_mt = {
> +    __index = function(self, key)
> +        return self._src[key]
> +    end
> +}
> +
> +-- Create stream space, which is same as connection space,
> +-- but have non zero stream ID.
> +local function stream_wrap_space(stream, src)
> +    local res = setmetatable({
> +        _stream_id = stream._stream_id,
> +        _src = src,
> +        index = setmetatable({
> +            _space = nil,
> +        }, stream_indexes_mt)
> +    }, stream_space_mt)
> +    res.index._space = res
> +    return res
> +end
> +
> +-- Metatable for stream spaces. When stream being created there
> +-- are no spaces in it. When user try to access some space in
> +-- stream, we first of all compare _schema_version of stream with
> +-- schema_version from connection and if they are not equal, we
> +-- clear stream space cache and update it's schema_version. Then
> +-- we look for corresponding space in the connection. If it is
> +-- found we create same space for the stream but with corresponding
> +-- stream ID.
> +local stream_spaces_mt = {
> +    __index = function(self, key)
> +        local stream = self._stream
> +        if stream._schema_version ~= stream._conn.schema_version then
> +            stream._schema_version = stream._conn.schema_version
> +            self._space = {}
> +        end
> +        if self._space[key] then
> +            return self._space[key]
> +        end
> +        local src = stream._conn.space[key]
> +        if not src then
> +            return nil
> +        end
> +        local res = stream_wrap_space(stream, src)
> +        self._space[key] = res
> +        return res
> +    end
> +}
> +
>  local space_metatable, index_metatable
>
>  local function new_sm(host, port, opts, connection, greeting)
> @@ -1021,6 +1107,9 @@ local function new_sm(host, port, opts, connection, greeting)
>      if opts.wait_connected ~= false then
>          remote._transport.wait_state('active', tonumber(opts.wait_connected))
>      end
> +    -- Last stream ID used for this connection
> +    remote._last_stream_id = 0
> +    remote._streams = setmetatable({}, {__mode = 'v'})

This table is never read from. Please remove.

>      return remote
>  end
>
> @@ -1078,6 +1167,29 @@ local function check_eval_args(args)
>      end
>  end
>
> +function stream_methods:new_stream()

Can be inlined.

> +    check_remote_arg(self, 'stream')

'new_stream'

> +    box.error(E_PROC_LUA, "Unsupported for stream");

Why?
Let's create a new stream here:

    return self._conn:new_stream()

> +end
> +
> +function remote_methods:new_stream()
> +    check_remote_arg(self, 'stream')

'new_stream'

> +    self._last_stream_id = self._last_stream_id + 1
> +    local stream = setmetatable({
> +        new_stream = stream_methods.new_stream,
> +        _stream_id = self._last_stream_id,
> +        space = setmetatable({
> +            _space = {},
> +            _stream = nil,
> +        }, stream_spaces_mt),
> +        _conn = self,
> +        _schema_version = self.schema_version,
> +    }, { __index = self })
> +    stream.space._stream = stream
> +    self._streams[self._last_stream_id] = stream
> +    return stream
> +end
> +
>  function remote_methods:close()
>      check_remote_arg(self, 'close')
>      self._transport.stop()
> @@ -1108,7 +1220,7 @@ function remote_methods:wait_connected(timeout)
>      return self._transport.wait_state('active', timeout)
>  end
>
> -function remote_methods:_request(method, opts, format, ...)
> +function remote_methods:_request(method, opts, format, stream_id, ...)
>      local transport = self._transport
>      local on_push, on_push_ctx, buffer, skip_header, deadline
>      -- Extract options, set defaults, check if the request is
> @@ -1123,7 +1235,7 @@ function remote_methods:_request(method, opts, format, ...)
>          local res, err =
>              transport.perform_async_request(buffer, skip_header, method,
>                                              table.insert, {}, format,
> -                                            ...)
> +                                            stream_id, ...)
>          if err then
>              box.error(err)
>          end
> @@ -1145,7 +1257,7 @@ function remote_methods:_request(method, opts, format, ...)
>      end
>      local res, err = transport.perform_request(timeout, buffer, skip_header,
>                                                 method, on_push, on_push_ctx,
> -                                               format, ...)
> +                                               format, stream_id, ...)
>      if err then
>          box.error(err)
>      end
> @@ -1161,7 +1273,7 @@ end
>
>  function remote_methods:ping(opts)
>      check_remote_arg(self, 'ping')
> -    return (pcall(self._request, self, M_PING, opts))
> +    return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
>  end
>
>  function remote_methods:reload_schema()
> @@ -1172,14 +1284,16 @@ end
>  -- @deprecated since 1.7.4
>  function remote_methods:call_16(func_name, ...)
>      check_remote_arg(self, 'call')
> -    return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
> +    return (self:_request(M_CALL_16, nil, nil, self._stream_id,
> +                          tostring(func_name), {...}))
>  end
>
>  function remote_methods:call(func_name, args, opts)
>      check_remote_arg(self, 'call')
>      check_call_args(args)
>      args = args or {}
> -    local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
> +    local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
> +                              tostring(func_name), args)

Rather than passing self._stream_id through all the net.box methods,
can't you simply set it in _request?
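
Something along these lines, perhaps. This is only a rough, standalone
sketch and not the actual net.box code: the transport below is a
hypothetical stand-in that merely records what it is asked to send, and
the argument list of perform_request is simplified. The point is that a
stream is created with __index pointing at its connection, so when
_request is invoked on a stream, self is the stream and self._stream_id
is readily available. It also shows new_stream() being callable on a
stream, as suggested above.

```lua
-- Hypothetical stand-in for the net.box transport: it only records
-- the requests it was asked to send.
local sent = {}
local transport = {
    perform_request = function(method, stream_id, ...)
        table.insert(sent, {method = method, stream_id = stream_id,
                            args = {...}})
        return true
    end,
}

local remote_methods = {}

-- _request takes the stream ID from self, so no caller has to pass it.
-- For a plain connection self._stream_id is nil and 0 is sent.
function remote_methods:_request(method, ...)
    return self._transport.perform_request(method,
                                           self._stream_id or 0, ...)
end

function remote_methods:call(func_name, args)
    return self:_request('call', tostring(func_name), args or {})
end

function remote_methods:new_stream()
    -- Calling new_stream() on a stream creates a sibling stream on
    -- the same underlying connection.
    local conn = self._conn or self
    conn._last_stream_id = conn._last_stream_id + 1
    return setmetatable({
        _conn = conn,
        _stream_id = conn._last_stream_id,
    }, {__index = conn})
end

local function new_conn()
    return setmetatable({
        _transport = transport,
        _last_stream_id = 0,
    }, {__index = remote_methods})
end

local conn = new_conn()
local stream = conn:new_stream()
conn:call('f')      -- recorded with stream_id 0
stream:call('f')    -- recorded with stream_id 1
```

With this layout ping, call, eval and the rest stay exactly as they were
before the patch, and only _request knows about stream IDs.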