[Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box

Vladimir Davydov vdavydov at tarantool.org
Wed Aug 11 14:52:05 MSK 2021


On Wed, Aug 11, 2021 at 11:56:56AM +0300, mechanik20051988 wrote:
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 8f5671c15..3dffc245f 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -635,6 +749,28 @@ local function check_eval_args(args)
>      end
>  end
>  
> +local function new_stream(stream)

Let's rename this function to stream_new_stream to emphasize that
it's a stream method.

> +    check_remote_arg(stream, 'new_stream')
> +    return stream._conn:new_stream()
> +end
> +
> +function remote_methods:new_stream()
> +    check_remote_arg(self, 'new_stream')
> +    self._last_stream_id = self._last_stream_id + 1
> +    local stream = setmetatable({
> +        new_stream = new_stream,
> +        _stream_id = self._last_stream_id,
> +        space = setmetatable({
> +            _stream_space_cache = {},
> +            _stream = nil,
> +        }, stream_spaces_mt),
> +        _conn = self,
> +        _schema_version = self.schema_version,
> +    }, { __index = self, __serialize = stream_serialize })
> +    stream.space._stream = stream
> +    return stream
> +end
> +
>  function remote_methods:close()
>      check_remote_arg(self, 'close')
>      self._transport.stop()
> @@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout)
>      return self._transport.wait_state('active', timeout)
>  end
>  
> -function remote_methods:_request(method, opts, format, ...)
> +function remote_methods:_request(method, opts, format, stream_id, ...)
>      local transport = self._transport
>      local on_push, on_push_ctx, buffer, skip_header, deadline
>      -- Extract options, set defaults, check if the request is
> @@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...)
>              local res, err =
>                  transport.perform_async_request(buffer, skip_header, method,
>                                                  table.insert, {}, format,
> -                                                ...)
> +                                                stream_id, ...)
>              if err then
>                  box.error(err)
>              end
> @@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...)
>      end
>      local res, err = transport.perform_request(timeout, buffer, skip_header,
>                                                 method, on_push, on_push_ctx,
> -                                               format, ...)
> +                                               format, stream_id, ...)

Please use self._stream_id here. Then you won't need to pass stream_id
from all the functions below.

>      if err then
>          box.error(err)
>      end
> @@ -718,7 +854,7 @@ end
>  
>  function remote_methods:ping(opts)
>      check_remote_arg(self, 'ping')
> -    return (pcall(self._request, self, M_PING, opts))
> +    return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
>  end
>  
>  function remote_methods:reload_schema()
> @@ -729,14 +865,16 @@ end
>  -- @deprecated since 1.7.4
>  function remote_methods:call_16(func_name, ...)
>      check_remote_arg(self, 'call')
> -    return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
> +    return (self:_request(M_CALL_16, nil, nil, self._stream_id,
> +                          tostring(func_name), {...}))
>  end
>  
>  function remote_methods:call(func_name, args, opts)
>      check_remote_arg(self, 'call')
>      check_call_args(args)
>      args = args or {}
> -    local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
> +    local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
> +                              tostring(func_name), args)
>      if type(res) ~= 'table' or opts and opts.is_async then
>          return res
>      end
> @@ -746,14 +884,15 @@ end
>  -- @deprecated since 1.7.4
>  function remote_methods:eval_16(code, ...)
>      check_remote_arg(self, 'eval')
> -    return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
> +    return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
> +                                 code, {...})))
>  end
>  
>  function remote_methods:eval(code, args, opts)
>      check_remote_arg(self, 'eval')
>      check_eval_args(args)
>      args = args or {}
> -    local res = self:_request(M_EVAL, opts, nil, code, args)
> +    local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
>      if type(res) ~= 'table' or opts and opts.is_async then
>          return res
>      end
> @@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
>      if sql_opts ~= nil then
>          box.error(box.error.UNSUPPORTED, "execute", "options")
>      end
> -    return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
> -                         sql_opts or {})
> +    return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
> +                         query, parameters or {}, sql_opts or {})
>  end
>  
>  function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
> @@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
>      if sql_opts ~= nil then
>          box.error(box.error.UNSUPPORTED, "prepare", "options")
>      end
> -    return self:_request(M_PREPARE, netbox_opts, nil, query)
> +    return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
>  end
>  
>  function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
> @@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
>      if sql_opts ~= nil then
>          box.error(box.error.UNSUPPORTED, "unprepare", "options")
>      end
> -    return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
> -                         sql_opts or {})
> +    return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
> +                         query, parameters or {}, sql_opts or {})
>  end
>  
>  function remote_methods:wait_state(state, timeout)
> @@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout)
>      end
>      if self.protocol == 'Binary' then
>          local loader = 'return require("console").eval(...)'
> -        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
> +        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
>                        {line})
>      else
>          assert(self.protocol == 'Lua console')
> -        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
> +        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
>                        line..'$EOF$\n')
>      end
>      if err then
> @@ -951,14 +1090,14 @@ space_metatable = function(remote)
>  
>      function methods:insert(tuple, opts)
>          check_space_arg(self, 'insert')
> -        return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
> -                               tuple)
> +        return remote:_request(M_INSERT, opts, self._format_cdata,
> +                               self._stream_id, self.id, tuple)
>      end
>  
>      function methods:replace(tuple, opts)
>          check_space_arg(self, 'replace')
> -        return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
> -                               tuple)
> +        return remote:_request(M_REPLACE, opts, self._format_cdata,
> +                               self._stream_id, self.id, tuple)
>      end
>  
>      function methods:select(key, opts)
> @@ -978,7 +1117,8 @@ space_metatable = function(remote)
>  
>      function methods:upsert(key, oplist, opts)
>          check_space_arg(self, 'upsert')
> -        return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
> +        return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
> +                                               self._stream_id, self.id,
>                                                 key, oplist))
>      end
>  
> @@ -1009,8 +1149,8 @@ index_metatable = function(remote)
>          local offset = tonumber(opts and opts.offset) or 0
>          local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
>          return (remote:_request(M_SELECT, opts, self.space._format_cdata,
> -                                self.space.id, self.id, iterator, offset,
> -                                limit, key))
> +                                self._stream_id, self.space.id, self.id,
> +                                iterator, offset, limit, key))
>      end
>  
>      function methods:get(key, opts)
> @@ -1020,6 +1160,7 @@ index_metatable = function(remote)
>          end
>          return nothing_or_data(remote:_request(M_GET, opts,
>                                                 self.space._format_cdata,
> +                                               self._stream_id,
>                                                 self.space.id, self.id,
>                                                 box.index.EQ, 0, 2, key))
>      end
> @@ -1031,6 +1172,7 @@ index_metatable = function(remote)
>          end
>          return nothing_or_data(remote:_request(M_MIN, opts,
>                                                 self.space._format_cdata,
> +                                               self._stream_id,
>                                                 self.space.id, self.id,
>                                                 box.index.GE, 0, 1, key))
>      end
> @@ -1042,6 +1184,7 @@ index_metatable = function(remote)
>          end
>          return nothing_or_data(remote:_request(M_MAX, opts,
>                                                 self.space._format_cdata,
> +                                               self._stream_id,
>                                                 self.space.id, self.id,
>                                                 box.index.LE, 0, 1, key))
>      end
> @@ -1053,22 +1196,24 @@ index_metatable = function(remote)
>          end
>          local code = string.format('box.space.%s.index.%s:count',
>                                     self.space.name, self.name)
> -        return remote:_request(M_COUNT, opts, nil, code, { key, opts })
> +        return remote:_request(M_COUNT, opts, nil, self._stream_id,
> +                               code, { key, opts })
>      end
>  
>      function methods:delete(key, opts)
>          check_index_arg(self, 'delete')
>          return nothing_or_data(remote:_request(M_DELETE, opts,
>                                                 self.space._format_cdata,
> -                                               self.space.id, self.id, key))
> +                                               self._stream_id, self.space.id,
> +                                               self.id, key))
>      end
>  
>      function methods:update(key, oplist, opts)
>          check_index_arg(self, 'update')
>          return nothing_or_data(remote:_request(M_UPDATE, opts,
>                                                 self.space._format_cdata,
> -                                               self.space.id, self.id, key,
> -                                               oplist))
> +                                               self._stream_id, self.space.id,
> +                                               self.id, key, oplist))
>      end
>  
>      return { __index = methods, __metatable = false }


More information about the Tarantool-patches mailing list