[Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box
Vladimir Davydov
vdavydov at tarantool.org
Wed Aug 11 14:52:05 MSK 2021
On Wed, Aug 11, 2021 at 11:56:56AM +0300, mechanik20051988 wrote:
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 8f5671c15..3dffc245f 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -635,6 +749,28 @@ local function check_eval_args(args)
> end
> end
>
> +local function new_stream(stream)
Let's rename this function to stream_new_stream to emphasize that
it's a stream method.
> + check_remote_arg(stream, 'new_stream')
> + return stream._conn:new_stream()
> +end
> +
> +function remote_methods:new_stream()
> + check_remote_arg(self, 'new_stream')
> + self._last_stream_id = self._last_stream_id + 1
> + local stream = setmetatable({
> + new_stream = new_stream,
> + _stream_id = self._last_stream_id,
> + space = setmetatable({
> + _stream_space_cache = {},
> + _stream = nil,
> + }, stream_spaces_mt),
> + _conn = self,
> + _schema_version = self.schema_version,
> + }, { __index = self, __serialize = stream_serialize })
> + stream.space._stream = stream
> + return stream
> +end
> +
> function remote_methods:close()
> check_remote_arg(self, 'close')
> self._transport.stop()
> @@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout)
> return self._transport.wait_state('active', timeout)
> end
>
> -function remote_methods:_request(method, opts, format, ...)
> +function remote_methods:_request(method, opts, format, stream_id, ...)
> local transport = self._transport
> local on_push, on_push_ctx, buffer, skip_header, deadline
> -- Extract options, set defaults, check if the request is
> @@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...)
> local res, err =
> transport.perform_async_request(buffer, skip_header, method,
> table.insert, {}, format,
> - ...)
> + stream_id, ...)
> if err then
> box.error(err)
> end
> @@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...)
> end
> local res, err = transport.perform_request(timeout, buffer, skip_header,
> method, on_push, on_push_ctx,
> - format, ...)
> + format, stream_id, ...)
Please use self._stream_id here. Then you won't need to pass stream_id
from all the functions below.
> if err then
> box.error(err)
> end
> @@ -718,7 +854,7 @@ end
>
> function remote_methods:ping(opts)
> check_remote_arg(self, 'ping')
> - return (pcall(self._request, self, M_PING, opts))
> + return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
> end
>
> function remote_methods:reload_schema()
> @@ -729,14 +865,16 @@ end
> -- @deprecated since 1.7.4
> function remote_methods:call_16(func_name, ...)
> check_remote_arg(self, 'call')
> - return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
> + return (self:_request(M_CALL_16, nil, nil, self._stream_id,
> + tostring(func_name), {...}))
> end
>
> function remote_methods:call(func_name, args, opts)
> check_remote_arg(self, 'call')
> check_call_args(args)
> args = args or {}
> - local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
> + local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
> + tostring(func_name), args)
> if type(res) ~= 'table' or opts and opts.is_async then
> return res
> end
> @@ -746,14 +884,15 @@ end
> -- @deprecated since 1.7.4
> function remote_methods:eval_16(code, ...)
> check_remote_arg(self, 'eval')
> - return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
> + return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
> + code, {...})))
> end
>
> function remote_methods:eval(code, args, opts)
> check_remote_arg(self, 'eval')
> check_eval_args(args)
> args = args or {}
> - local res = self:_request(M_EVAL, opts, nil, code, args)
> + local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
> if type(res) ~= 'table' or opts and opts.is_async then
> return res
> end
> @@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
> if sql_opts ~= nil then
> box.error(box.error.UNSUPPORTED, "execute", "options")
> end
> - return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
> - sql_opts or {})
> + return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
> + query, parameters or {}, sql_opts or {})
> end
>
> function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
> @@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
> if sql_opts ~= nil then
> box.error(box.error.UNSUPPORTED, "prepare", "options")
> end
> - return self:_request(M_PREPARE, netbox_opts, nil, query)
> + return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
> end
>
> function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
> @@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
> if sql_opts ~= nil then
> box.error(box.error.UNSUPPORTED, "unprepare", "options")
> end
> - return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
> - sql_opts or {})
> + return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
> + query, parameters or {}, sql_opts or {})
> end
>
> function remote_methods:wait_state(state, timeout)
> @@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout)
> end
> if self.protocol == 'Binary' then
> local loader = 'return require("console").eval(...)'
> - res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
> + res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
> {line})
> else
> assert(self.protocol == 'Lua console')
> - res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
> + res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
> line..'$EOF$\n')
> end
> if err then
> @@ -951,14 +1090,14 @@ space_metatable = function(remote)
>
> function methods:insert(tuple, opts)
> check_space_arg(self, 'insert')
> - return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
> - tuple)
> + return remote:_request(M_INSERT, opts, self._format_cdata,
> + self._stream_id, self.id, tuple)
> end
>
> function methods:replace(tuple, opts)
> check_space_arg(self, 'replace')
> - return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
> - tuple)
> + return remote:_request(M_REPLACE, opts, self._format_cdata,
> + self._stream_id, self.id, tuple)
> end
>
> function methods:select(key, opts)
> @@ -978,7 +1117,8 @@ space_metatable = function(remote)
>
> function methods:upsert(key, oplist, opts)
> check_space_arg(self, 'upsert')
> - return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
> + return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
> + self._stream_id, self.id,
> key, oplist))
> end
>
> @@ -1009,8 +1149,8 @@ index_metatable = function(remote)
> local offset = tonumber(opts and opts.offset) or 0
> local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
> return (remote:_request(M_SELECT, opts, self.space._format_cdata,
> - self.space.id, self.id, iterator, offset,
> - limit, key))
> + self._stream_id, self.space.id, self.id,
> + iterator, offset, limit, key))
> end
>
> function methods:get(key, opts)
> @@ -1020,6 +1160,7 @@ index_metatable = function(remote)
> end
> return nothing_or_data(remote:_request(M_GET, opts,
> self.space._format_cdata,
> + self._stream_id,
> self.space.id, self.id,
> box.index.EQ, 0, 2, key))
> end
> @@ -1031,6 +1172,7 @@ index_metatable = function(remote)
> end
> return nothing_or_data(remote:_request(M_MIN, opts,
> self.space._format_cdata,
> + self._stream_id,
> self.space.id, self.id,
> box.index.GE, 0, 1, key))
> end
> @@ -1042,6 +1184,7 @@ index_metatable = function(remote)
> end
> return nothing_or_data(remote:_request(M_MAX, opts,
> self.space._format_cdata,
> + self._stream_id,
> self.space.id, self.id,
> box.index.LE, 0, 1, key))
> end
> @@ -1053,22 +1196,24 @@ index_metatable = function(remote)
> end
> local code = string.format('box.space.%s.index.%s:count',
> self.space.name, self.name)
> - return remote:_request(M_COUNT, opts, nil, code, { key, opts })
> + return remote:_request(M_COUNT, opts, nil, self._stream_id,
> + code, { key, opts })
> end
>
> function methods:delete(key, opts)
> check_index_arg(self, 'delete')
> return nothing_or_data(remote:_request(M_DELETE, opts,
> self.space._format_cdata,
> - self.space.id, self.id, key))
> + self._stream_id, self.space.id,
> + self.id, key))
> end
>
> function methods:update(key, oplist, opts)
> check_index_arg(self, 'update')
> return nothing_or_data(remote:_request(M_UPDATE, opts,
> self.space._format_cdata,
> - self.space.id, self.id, key,
> - oplist))
> + self._stream_id, self.space.id,
> + self.id, key, oplist))
> end
>
> return { __index = methods, __metatable = false }
More information about the Tarantool-patches
mailing list