Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: mechanik20051988 <mechanik20051988@tarantool.org>
Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org,
	mechanik20051988 <mechanik20.05.1988@gmail.com>
Subject: Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box
Date: Wed, 11 Aug 2021 14:52:05 +0300	[thread overview]
Message-ID: <20210811115205.hkhbui3zzeu63544@esperanza> (raw)
In-Reply-To: <56db4eea5c868187e151658427e1a6d65736771a.1628671235.git.mechanik20051988@tarantool.org>

On Wed, Aug 11, 2021 at 11:56:56AM +0300, mechanik20051988 wrote:
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 8f5671c15..3dffc245f 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -635,6 +749,28 @@ local function check_eval_args(args)
>      end
>  end
>  
> +local function new_stream(stream)

Let's rename this function to stream_new_stream to emphasize that
it's a stream method.

> +    check_remote_arg(stream, 'new_stream')
> +    return stream._conn:new_stream()
> +end
> +
> +function remote_methods:new_stream()
> +    check_remote_arg(self, 'new_stream')
> +    self._last_stream_id = self._last_stream_id + 1
> +    local stream = setmetatable({
> +        new_stream = new_stream,
> +        _stream_id = self._last_stream_id,
> +        space = setmetatable({
> +            _stream_space_cache = {},
> +            _stream = nil,
> +        }, stream_spaces_mt),
> +        _conn = self,
> +        _schema_version = self.schema_version,
> +    }, { __index = self, __serialize = stream_serialize })
> +    stream.space._stream = stream
> +    return stream
> +end
> +
>  function remote_methods:close()
>      check_remote_arg(self, 'close')
>      self._transport.stop()
> @@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout)
>      return self._transport.wait_state('active', timeout)
>  end
>  
> -function remote_methods:_request(method, opts, format, ...)
> +function remote_methods:_request(method, opts, format, stream_id, ...)
>      local transport = self._transport
>      local on_push, on_push_ctx, buffer, skip_header, deadline
>      -- Extract options, set defaults, check if the request is
> @@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...)
>              local res, err =
>                  transport.perform_async_request(buffer, skip_header, method,
>                                                  table.insert, {}, format,
> -                                                ...)
> +                                                stream_id, ...)
>              if err then
>                  box.error(err)
>              end
> @@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...)
>      end
>      local res, err = transport.perform_request(timeout, buffer, skip_header,
>                                                 method, on_push, on_push_ctx,
> -                                               format, ...)
> +                                               format, stream_id, ...)

Please use self._stream_id here instead of adding a stream_id argument
to _request. Then none of the functions below will need to pass it.

>      if err then
>          box.error(err)
>      end
> @@ -718,7 +854,7 @@ end
>  
>  function remote_methods:ping(opts)
>      check_remote_arg(self, 'ping')
> -    return (pcall(self._request, self, M_PING, opts))
> +    return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
>  end
>  
>  function remote_methods:reload_schema()
> @@ -729,14 +865,16 @@ end
>  -- @deprecated since 1.7.4
>  function remote_methods:call_16(func_name, ...)
>      check_remote_arg(self, 'call')
> -    return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
> +    return (self:_request(M_CALL_16, nil, nil, self._stream_id,
> +                          tostring(func_name), {...}))
>  end
>  
>  function remote_methods:call(func_name, args, opts)
>      check_remote_arg(self, 'call')
>      check_call_args(args)
>      args = args or {}
> -    local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
> +    local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
> +                              tostring(func_name), args)
>      if type(res) ~= 'table' or opts and opts.is_async then
>          return res
>      end
> @@ -746,14 +884,15 @@ end
>  -- @deprecated since 1.7.4
>  function remote_methods:eval_16(code, ...)
>      check_remote_arg(self, 'eval')
> -    return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
> +    return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
> +                                 code, {...})))
>  end
>  
>  function remote_methods:eval(code, args, opts)
>      check_remote_arg(self, 'eval')
>      check_eval_args(args)
>      args = args or {}
> -    local res = self:_request(M_EVAL, opts, nil, code, args)
> +    local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
>      if type(res) ~= 'table' or opts and opts.is_async then
>          return res
>      end
> @@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
>      if sql_opts ~= nil then
>          box.error(box.error.UNSUPPORTED, "execute", "options")
>      end
> -    return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
> -                         sql_opts or {})
> +    return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
> +                         query, parameters or {}, sql_opts or {})
>  end
>  
>  function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
> @@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
>      if sql_opts ~= nil then
>          box.error(box.error.UNSUPPORTED, "prepare", "options")
>      end
> -    return self:_request(M_PREPARE, netbox_opts, nil, query)
> +    return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
>  end
>  
>  function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
> @@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
>      if sql_opts ~= nil then
>          box.error(box.error.UNSUPPORTED, "unprepare", "options")
>      end
> -    return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
> -                         sql_opts or {})
> +    return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
> +                         query, parameters or {}, sql_opts or {})
>  end
>  
>  function remote_methods:wait_state(state, timeout)
> @@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout)
>      end
>      if self.protocol == 'Binary' then
>          local loader = 'return require("console").eval(...)'
> -        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
> +        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
>                        {line})
>      else
>          assert(self.protocol == 'Lua console')
> -        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
> +        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
>                        line..'$EOF$\n')
>      end
>      if err then
> @@ -951,14 +1090,14 @@ space_metatable = function(remote)
>  
>      function methods:insert(tuple, opts)
>          check_space_arg(self, 'insert')
> -        return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
> -                               tuple)
> +        return remote:_request(M_INSERT, opts, self._format_cdata,
> +                               self._stream_id, self.id, tuple)
>      end
>  
>      function methods:replace(tuple, opts)
>          check_space_arg(self, 'replace')
> -        return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
> -                               tuple)
> +        return remote:_request(M_REPLACE, opts, self._format_cdata,
> +                               self._stream_id, self.id, tuple)
>      end
>  
>      function methods:select(key, opts)
> @@ -978,7 +1117,8 @@ space_metatable = function(remote)
>  
>      function methods:upsert(key, oplist, opts)
>          check_space_arg(self, 'upsert')
> -        return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
> +        return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
> +                                               self._stream_id, self.id,
>                                                 key, oplist))
>      end
>  
> @@ -1009,8 +1149,8 @@ index_metatable = function(remote)
>          local offset = tonumber(opts and opts.offset) or 0
>          local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
>          return (remote:_request(M_SELECT, opts, self.space._format_cdata,
> -                                self.space.id, self.id, iterator, offset,
> -                                limit, key))
> +                                self._stream_id, self.space.id, self.id,
> +                                iterator, offset, limit, key))
>      end
>  
>      function methods:get(key, opts)
> @@ -1020,6 +1160,7 @@ index_metatable = function(remote)
>          end
>          return nothing_or_data(remote:_request(M_GET, opts,
>                                                 self.space._format_cdata,
> +                                               self._stream_id,
>                                                 self.space.id, self.id,
>                                                 box.index.EQ, 0, 2, key))
>      end
> @@ -1031,6 +1172,7 @@ index_metatable = function(remote)
>          end
>          return nothing_or_data(remote:_request(M_MIN, opts,
>                                                 self.space._format_cdata,
> +                                               self._stream_id,
>                                                 self.space.id, self.id,
>                                                 box.index.GE, 0, 1, key))
>      end
> @@ -1042,6 +1184,7 @@ index_metatable = function(remote)
>          end
>          return nothing_or_data(remote:_request(M_MAX, opts,
>                                                 self.space._format_cdata,
> +                                               self._stream_id,
>                                                 self.space.id, self.id,
>                                                 box.index.LE, 0, 1, key))
>      end
> @@ -1053,22 +1196,24 @@ index_metatable = function(remote)
>          end
>          local code = string.format('box.space.%s.index.%s:count',
>                                     self.space.name, self.name)
> -        return remote:_request(M_COUNT, opts, nil, code, { key, opts })
> +        return remote:_request(M_COUNT, opts, nil, self._stream_id,
> +                               code, { key, opts })
>      end
>  
>      function methods:delete(key, opts)
>          check_index_arg(self, 'delete')
>          return nothing_or_data(remote:_request(M_DELETE, opts,
>                                                 self.space._format_cdata,
> -                                               self.space.id, self.id, key))
> +                                               self._stream_id, self.space.id,
> +                                               self.id, key))
>      end
>  
>      function methods:update(key, oplist, opts)
>          check_index_arg(self, 'update')
>          return nothing_or_data(remote:_request(M_UPDATE, opts,
>                                                 self.space._format_cdata,
> -                                               self.space.id, self.id, key,
> -                                               oplist))
> +                                               self._stream_id, self.space.id,
> +                                               self.id, key, oplist))
>      end
>  
>      return { __index = methods, __metatable = false }

  reply	other threads:[~2021-08-11 11:52 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-08-11  8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
2021-08-11  8:56 ` [Tarantool-patches] [PATCH v3 1/8] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches
2021-08-11  8:56 ` [Tarantool-patches] [PATCH v3 2/8] iproto: clear request::header for client requests mechanik20051988 via Tarantool-patches
2021-08-11  8:56 ` [Tarantool-patches] [PATCH v3 3/8] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches
2021-08-11  8:56 ` [Tarantool-patches] [PATCH v3 4/8] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches
2021-08-11  8:56 ` [Tarantool-patches] [PATCH v3 5/8] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches
2021-08-11 11:30   ` Vladimir Davydov via Tarantool-patches
2021-08-11  8:56 ` [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
2021-08-11 11:52   ` Vladimir Davydov via Tarantool-patches [this message]
2021-08-11 12:09   ` Vladimir Davydov via Tarantool-patches
2021-08-11  8:56 ` [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
2021-08-11 12:39   ` Vladimir Davydov via Tarantool-patches
2021-08-11  8:56 ` [Tarantool-patches] [PATCH v3 8/8] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
2021-08-11 12:47   ` Vladimir Davydov via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210811115205.hkhbui3zzeu63544@esperanza \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=mechanik20.05.1988@gmail.com \
    --cc=mechanik20051988@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --cc=vdavydov@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox