From: Vladimir Davydov via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: mechanik20051988 <mechanik20051988@tarantool.org>
Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org,
mechanik20051988 <mechanik20.05.1988@gmail.com>
Subject: Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box
Date: Wed, 11 Aug 2021 14:52:05 +0300 [thread overview]
Message-ID: <20210811115205.hkhbui3zzeu63544@esperanza> (raw)
In-Reply-To: <56db4eea5c868187e151658427e1a6d65736771a.1628671235.git.mechanik20051988@tarantool.org>
On Wed, Aug 11, 2021 at 11:56:56AM +0300, mechanik20051988 wrote:
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 8f5671c15..3dffc245f 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -635,6 +749,28 @@ local function check_eval_args(args)
> end
> end
>
> +local function new_stream(stream)
Let's rename this function to stream_new_stream to emphasize that
it's a stream method.
> + check_remote_arg(stream, 'new_stream')
> + return stream._conn:new_stream()
> +end
> +
> +function remote_methods:new_stream()
> + check_remote_arg(self, 'new_stream')
> + self._last_stream_id = self._last_stream_id + 1
> + local stream = setmetatable({
> + new_stream = new_stream,
> + _stream_id = self._last_stream_id,
> + space = setmetatable({
> + _stream_space_cache = {},
> + _stream = nil,
> + }, stream_spaces_mt),
> + _conn = self,
> + _schema_version = self.schema_version,
> + }, { __index = self, __serialize = stream_serialize })
> + stream.space._stream = stream
> + return stream
> +end
> +
> function remote_methods:close()
> check_remote_arg(self, 'close')
> self._transport.stop()
> @@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout)
> return self._transport.wait_state('active', timeout)
> end
>
> -function remote_methods:_request(method, opts, format, ...)
> +function remote_methods:_request(method, opts, format, stream_id, ...)
> local transport = self._transport
> local on_push, on_push_ctx, buffer, skip_header, deadline
> -- Extract options, set defaults, check if the request is
> @@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...)
> local res, err =
> transport.perform_async_request(buffer, skip_header, method,
> table.insert, {}, format,
> - ...)
> + stream_id, ...)
> if err then
> box.error(err)
> end
> @@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...)
> end
> local res, err = transport.perform_request(timeout, buffer, skip_header,
> method, on_push, on_push_ctx,
> - format, ...)
> + format, stream_id, ...)
Please use self._stream_id here, inside _request, instead of taking
stream_id as a parameter. Then none of the functions below will need
to pass stream_id.
> if err then
> box.error(err)
> end
> @@ -718,7 +854,7 @@ end
>
> function remote_methods:ping(opts)
> check_remote_arg(self, 'ping')
> - return (pcall(self._request, self, M_PING, opts))
> + return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
> end
>
> function remote_methods:reload_schema()
> @@ -729,14 +865,16 @@ end
> -- @deprecated since 1.7.4
> function remote_methods:call_16(func_name, ...)
> check_remote_arg(self, 'call')
> - return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
> + return (self:_request(M_CALL_16, nil, nil, self._stream_id,
> + tostring(func_name), {...}))
> end
>
> function remote_methods:call(func_name, args, opts)
> check_remote_arg(self, 'call')
> check_call_args(args)
> args = args or {}
> - local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
> + local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
> + tostring(func_name), args)
> if type(res) ~= 'table' or opts and opts.is_async then
> return res
> end
> @@ -746,14 +884,15 @@ end
> -- @deprecated since 1.7.4
> function remote_methods:eval_16(code, ...)
> check_remote_arg(self, 'eval')
> - return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
> + return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
> + code, {...})))
> end
>
> function remote_methods:eval(code, args, opts)
> check_remote_arg(self, 'eval')
> check_eval_args(args)
> args = args or {}
> - local res = self:_request(M_EVAL, opts, nil, code, args)
> + local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
> if type(res) ~= 'table' or opts and opts.is_async then
> return res
> end
> @@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
> if sql_opts ~= nil then
> box.error(box.error.UNSUPPORTED, "execute", "options")
> end
> - return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
> - sql_opts or {})
> + return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
> + query, parameters or {}, sql_opts or {})
> end
>
> function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
> @@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
> if sql_opts ~= nil then
> box.error(box.error.UNSUPPORTED, "prepare", "options")
> end
> - return self:_request(M_PREPARE, netbox_opts, nil, query)
> + return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
> end
>
> function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
> @@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
> if sql_opts ~= nil then
> box.error(box.error.UNSUPPORTED, "unprepare", "options")
> end
> - return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
> - sql_opts or {})
> + return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
> + query, parameters or {}, sql_opts or {})
> end
>
> function remote_methods:wait_state(state, timeout)
> @@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout)
> end
> if self.protocol == 'Binary' then
> local loader = 'return require("console").eval(...)'
> - res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
> + res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
> {line})
> else
> assert(self.protocol == 'Lua console')
> - res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
> + res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
> line..'$EOF$\n')
> end
> if err then
> @@ -951,14 +1090,14 @@ space_metatable = function(remote)
>
> function methods:insert(tuple, opts)
> check_space_arg(self, 'insert')
> - return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
> - tuple)
> + return remote:_request(M_INSERT, opts, self._format_cdata,
> + self._stream_id, self.id, tuple)
> end
>
> function methods:replace(tuple, opts)
> check_space_arg(self, 'replace')
> - return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
> - tuple)
> + return remote:_request(M_REPLACE, opts, self._format_cdata,
> + self._stream_id, self.id, tuple)
> end
>
> function methods:select(key, opts)
> @@ -978,7 +1117,8 @@ space_metatable = function(remote)
>
> function methods:upsert(key, oplist, opts)
> check_space_arg(self, 'upsert')
> - return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
> + return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
> + self._stream_id, self.id,
> key, oplist))
> end
>
> @@ -1009,8 +1149,8 @@ index_metatable = function(remote)
> local offset = tonumber(opts and opts.offset) or 0
> local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
> return (remote:_request(M_SELECT, opts, self.space._format_cdata,
> - self.space.id, self.id, iterator, offset,
> - limit, key))
> + self._stream_id, self.space.id, self.id,
> + iterator, offset, limit, key))
> end
>
> function methods:get(key, opts)
> @@ -1020,6 +1160,7 @@ index_metatable = function(remote)
> end
> return nothing_or_data(remote:_request(M_GET, opts,
> self.space._format_cdata,
> + self._stream_id,
> self.space.id, self.id,
> box.index.EQ, 0, 2, key))
> end
> @@ -1031,6 +1172,7 @@ index_metatable = function(remote)
> end
> return nothing_or_data(remote:_request(M_MIN, opts,
> self.space._format_cdata,
> + self._stream_id,
> self.space.id, self.id,
> box.index.GE, 0, 1, key))
> end
> @@ -1042,6 +1184,7 @@ index_metatable = function(remote)
> end
> return nothing_or_data(remote:_request(M_MAX, opts,
> self.space._format_cdata,
> + self._stream_id,
> self.space.id, self.id,
> box.index.LE, 0, 1, key))
> end
> @@ -1053,22 +1196,24 @@ index_metatable = function(remote)
> end
> local code = string.format('box.space.%s.index.%s:count',
> self.space.name, self.name)
> - return remote:_request(M_COUNT, opts, nil, code, { key, opts })
> + return remote:_request(M_COUNT, opts, nil, self._stream_id,
> + code, { key, opts })
> end
>
> function methods:delete(key, opts)
> check_index_arg(self, 'delete')
> return nothing_or_data(remote:_request(M_DELETE, opts,
> self.space._format_cdata,
> - self.space.id, self.id, key))
> + self._stream_id, self.space.id,
> + self.id, key))
> end
>
> function methods:update(key, oplist, opts)
> check_index_arg(self, 'update')
> return nothing_or_data(remote:_request(M_UPDATE, opts,
> self.space._format_cdata,
> - self.space.id, self.id, key,
> - oplist))
> + self._stream_id, self.space.id,
> + self.id, key, oplist))
> end
>
> return { __index = methods, __metatable = false }
next prev parent reply other threads:[~2021-08-11 11:52 UTC|newest]
Thread overview: 14+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 1/8] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 2/8] iproto: clear request::header for client requests mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 3/8] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 4/8] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 5/8] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches
2021-08-11 11:30 ` Vladimir Davydov via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
2021-08-11 11:52 ` Vladimir Davydov via Tarantool-patches [this message]
2021-08-11 12:09 ` Vladimir Davydov via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
2021-08-11 12:39 ` Vladimir Davydov via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 8/8] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
2021-08-11 12:47 ` Vladimir Davydov via Tarantool-patches
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210811115205.hkhbui3zzeu63544@esperanza \
--to=tarantool-patches@dev.tarantool.org \
--cc=mechanik20.05.1988@gmail.com \
--cc=mechanik20051988@tarantool.org \
--cc=v.shpilevoy@tarantool.org \
--cc=vdavydov@tarantool.org \
--subject='Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box' \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.