From: Vladimir Davydov via Tarantool-patches <tarantool-patches@dev.tarantool.org> To: mechanik20051988 <mechanik20051988@tarantool.org> Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org, mechanik20051988 <mechanik20.05.1988@gmail.com> Subject: Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box Date: Wed, 11 Aug 2021 14:52:05 +0300 [thread overview] Message-ID: <20210811115205.hkhbui3zzeu63544@esperanza> (raw) In-Reply-To: <56db4eea5c868187e151658427e1a6d65736771a.1628671235.git.mechanik20051988@tarantool.org> On Wed, Aug 11, 2021 at 11:56:56AM +0300, mechanik20051988 wrote: > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua > index 8f5671c15..3dffc245f 100644 > --- a/src/box/lua/net_box.lua > +++ b/src/box/lua/net_box.lua > @@ -635,6 +749,28 @@ local function check_eval_args(args) > end > end > > +local function new_stream(stream) Let's rename this function to stream_new_stream to emphasize that it's a stream method. > + check_remote_arg(stream, 'new_stream') > + return stream._conn:new_stream() > +end > + > +function remote_methods:new_stream() > + check_remote_arg(self, 'new_stream') > + self._last_stream_id = self._last_stream_id + 1 > + local stream = setmetatable({ > + new_stream = new_stream, > + _stream_id = self._last_stream_id, > + space = setmetatable({ > + _stream_space_cache = {}, > + _stream = nil, > + }, stream_spaces_mt), > + _conn = self, > + _schema_version = self.schema_version, > + }, { __index = self, __serialize = stream_serialize }) > + stream.space._stream = stream > + return stream > +end > + > function remote_methods:close() > check_remote_arg(self, 'close') > self._transport.stop() > @@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout) > return self._transport.wait_state('active', timeout) > end > > -function remote_methods:_request(method, opts, format, ...) > +function remote_methods:_request(method, opts, format, stream_id, ...) 
> local transport = self._transport > local on_push, on_push_ctx, buffer, skip_header, deadline > -- Extract options, set defaults, check if the request is > @@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...) > local res, err = > transport.perform_async_request(buffer, skip_header, method, > table.insert, {}, format, > - ...) > + stream_id, ...) > if err then > box.error(err) > end > @@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...) > end > local res, err = transport.perform_request(timeout, buffer, skip_header, > method, on_push, on_push_ctx, > - format, ...) > + format, stream_id, ...) Please use self._stream_id here. Then you won't need to pass stream_id from all the functions below. > if err then > box.error(err) > end > @@ -718,7 +854,7 @@ end > > function remote_methods:ping(opts) > check_remote_arg(self, 'ping') > - return (pcall(self._request, self, M_PING, opts)) > + return (pcall(self._request, self, M_PING, opts, nil, self._stream_id)) > end > > function remote_methods:reload_schema() > @@ -729,14 +865,16 @@ end > -- @deprecated since 1.7.4 > function remote_methods:call_16(func_name, ...) > check_remote_arg(self, 'call') > - return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...})) > + return (self:_request(M_CALL_16, nil, nil, self._stream_id, > + tostring(func_name), {...})) > end > > function remote_methods:call(func_name, args, opts) > check_remote_arg(self, 'call') > check_call_args(args) > args = args or {} > - local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args) > + local res = self:_request(M_CALL_17, opts, nil, self._stream_id, > + tostring(func_name), args) > if type(res) ~= 'table' or opts and opts.is_async then > return res > end > @@ -746,14 +884,15 @@ end > -- @deprecated since 1.7.4 > function remote_methods:eval_16(code, ...) 
> check_remote_arg(self, 'eval') > - return unpack((self:_request(M_EVAL, nil, nil, code, {...}))) > + return unpack((self:_request(M_EVAL, nil, nil, self._stream_id, > + code, {...}))) > end > > function remote_methods:eval(code, args, opts) > check_remote_arg(self, 'eval') > check_eval_args(args) > args = args or {} > - local res = self:_request(M_EVAL, opts, nil, code, args) > + local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args) > if type(res) ~= 'table' or opts and opts.is_async then > return res > end > @@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts) > if sql_opts ~= nil then > box.error(box.error.UNSUPPORTED, "execute", "options") > end > - return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {}, > - sql_opts or {}) > + return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id, > + query, parameters or {}, sql_opts or {}) > end > > function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args > @@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua > if sql_opts ~= nil then > box.error(box.error.UNSUPPORTED, "prepare", "options") > end > - return self:_request(M_PREPARE, netbox_opts, nil, query) > + return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query) > end > > function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) > @@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) > if sql_opts ~= nil then > box.error(box.error.UNSUPPORTED, "unprepare", "options") > end > - return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {}, > - sql_opts or {}) > + return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id, > + query, parameters or {}, sql_opts or {}) > end > > function remote_methods:wait_state(state, timeout) > @@ -927,11 +1066,11 @@ function console_methods:eval(line, 
timeout) > end > if self.protocol == 'Binary' then > local loader = 'return require("console").eval(...)' > - res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader, > + res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader, > {line}) > else > assert(self.protocol == 'Lua console') > - res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, > + res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil, > line..'$EOF$\n') > end > if err then > @@ -951,14 +1090,14 @@ space_metatable = function(remote) > > function methods:insert(tuple, opts) > check_space_arg(self, 'insert') > - return remote:_request(M_INSERT, opts, self._format_cdata, self.id, > - tuple) > + return remote:_request(M_INSERT, opts, self._format_cdata, > + self._stream_id, self.id, tuple) > end > > function methods:replace(tuple, opts) > check_space_arg(self, 'replace') > - return remote:_request(M_REPLACE, opts, self._format_cdata, self.id, > - tuple) > + return remote:_request(M_REPLACE, opts, self._format_cdata, > + self._stream_id, self.id, tuple) > end > > function methods:select(key, opts) > @@ -978,7 +1117,8 @@ space_metatable = function(remote) > > function methods:upsert(key, oplist, opts) > check_space_arg(self, 'upsert') > - return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id, > + return nothing_or_data(remote:_request(M_UPSERT, opts, nil, > + self._stream_id, self.id, > key, oplist)) > end > > @@ -1009,8 +1149,8 @@ index_metatable = function(remote) > local offset = tonumber(opts and opts.offset) or 0 > local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF > return (remote:_request(M_SELECT, opts, self.space._format_cdata, > - self.space.id, self.id, iterator, offset, > - limit, key)) > + self._stream_id, self.space.id, self.id, > + iterator, offset, limit, key)) > end > > function methods:get(key, opts) > @@ -1020,6 +1160,7 @@ index_metatable = function(remote) > end > return nothing_or_data(remote:_request(M_GET, opts, > 
self.space._format_cdata, > + self._stream_id, > self.space.id, self.id, > box.index.EQ, 0, 2, key)) > end > @@ -1031,6 +1172,7 @@ index_metatable = function(remote) > end > return nothing_or_data(remote:_request(M_MIN, opts, > self.space._format_cdata, > + self._stream_id, > self.space.id, self.id, > box.index.GE, 0, 1, key)) > end > @@ -1042,6 +1184,7 @@ index_metatable = function(remote) > end > return nothing_or_data(remote:_request(M_MAX, opts, > self.space._format_cdata, > + self._stream_id, > self.space.id, self.id, > box.index.LE, 0, 1, key)) > end > @@ -1053,22 +1196,24 @@ index_metatable = function(remote) > end > local code = string.format('box.space.%s.index.%s:count', > self.space.name, self.name) > - return remote:_request(M_COUNT, opts, nil, code, { key, opts }) > + return remote:_request(M_COUNT, opts, nil, self._stream_id, > + code, { key, opts }) > end > > function methods:delete(key, opts) > check_index_arg(self, 'delete') > return nothing_or_data(remote:_request(M_DELETE, opts, > self.space._format_cdata, > - self.space.id, self.id, key)) > + self._stream_id, self.space.id, > + self.id, key)) > end > > function methods:update(key, oplist, opts) > check_index_arg(self, 'update') > return nothing_or_data(remote:_request(M_UPDATE, opts, > self.space._format_cdata, > - self.space.id, self.id, key, > - oplist)) > + self._stream_id, self.space.id, > + self.id, key, oplist)) > end > > return { __index = methods, __metatable = false }
next prev parent reply other threads:[~2021-08-11 11:52 UTC|newest] Thread overview: 14+ messages / expand[flat|nested] mbox.gz Atom feed top 2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 1/8] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 2/8] iproto: clear request::header for client requests mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 3/8] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 4/8] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 5/8] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches 2021-08-11 11:30 ` Vladimir Davydov via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches 2021-08-11 11:52 ` Vladimir Davydov via Tarantool-patches [this message] 2021-08-11 12:09 ` Vladimir Davydov via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches 2021-08-11 12:39 ` Vladimir Davydov via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 8/8] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches 2021-08-11 12:47 ` Vladimir Davydov via Tarantool-patches
Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210811115205.hkhbui3zzeu63544@esperanza \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=mechanik20.05.1988@gmail.com \
    --cc=mechanik20051988@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --cc=vdavydov@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.