[Tarantool-patches] [PATCH 7/7] net.box: add interactive transaction support in net.box

Vladimir Davydov vdavydov at tarantool.org
Fri Aug 6 17:04:54 MSK 2021


On Thu, Aug 05, 2021 at 09:17:45PM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
> 
> Implement `begin`, `commit` and `rollback` methods for the stream object
> in `net.box`, which begin, commit and roll back a transaction,
> respectively.
> 
> Closes #5860
> 
> @TarantoolBot document
> Title: add interactive transaction support in net.box
> Implement `begin`, `commit` and `rollback` methods for the stream object
> in `net.box`, which begin, commit and roll back a transaction,
> respectively. There are now multiple ways to begin, commit and roll back
> a transaction from `net.box`: using the corresponding stream methods, using
> the `call` or `eval` methods, or using the `execute` method with SQL
> transaction syntax. These methods can be mixed: for example, a transaction
> can be started with `stream:begin()` and committed with
> `stream:call('box.commit')` or `stream:execute('COMMIT')`.
> A simple example of using interactive transactions via iproto from net.box:
> ```lua
> stream = conn:stream()

This should be conn:new_stream().

> space = stream.space.test
> space_not_from_stream = conn.space.test
> 
> stream:begin()
> space:replace({1})
> -- returns the previously inserted tuple, because the request
> -- belongs to the transaction.
> space:select({})
> -- empty select, because this select doesn't belong to the
> -- transaction
> space_not_from_stream:select({})
> stream:call('box.commit')
> -- now the transaction is committed, so all requests
> -- return the tuple.
> ```
> More examples of using streams can be found in
> gh-5860-implement-streams-in-iproto.test.lua
> ---
>  .../gh-5860-implement-streams-in-iproto.md    |   28 +
>  src/box/lua/net_box.c                         |   51 +-
>  src/box/lua/net_box.lua                       |   50 +-
>  test/box/stream.result                        | 3036 +++++++++++++++++
>  test/box/stream.test.lua                      | 1201 +++++++
>  5 files changed, 4358 insertions(+), 8 deletions(-)
>  create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
> 
> diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
> new file mode 100644
> index 000000000..d0f1359dd
> --- /dev/null
> +++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
> @@ -0,0 +1,28 @@
> +## feature/core
> +
> +* Streams and interactive transactions over streams are implemented
> +  in iproto. A stream is associated with its ID, which is unique within
> +  one connection. All requests with the same non-zero stream ID belong to
> +  the same stream. All requests in a stream are processed synchronously:
> +  the execution of the next request does not start until the previous one
> +  is completed. A request with a zero stream ID does not belong to any
> +  stream and is processed in the old way.
> +  In `net.box`, stream is an object above connection that has the same
> +  methods, but allows to execute requests sequentially. ID is generated
> +  on the client side in two ways: automatically or manually. User can

There's no manual id generation anymore. Please update.

> +  choose any of two methods, but can not mix them. If user writes his
> +  own connector and wants to use streams, he must transmit stream_id over
> +  the iproto protocol.
> +  The main purpose of streams is transactions via iproto. Each stream
> +  can start its own transaction, so streams allow multiplexing several
> +  transactions over one connection. There are multiple ways to begin,
> +  commit and roll back a transaction: using the corresponding stream
> +  methods, using the `call` or `eval` methods, or using the `execute`
> +  method with SQL transaction syntax. These methods can be mixed: for
> +  example, a transaction can be started with `stream:begin()` and committed
> +  with `stream:call('box.commit')` or `stream:execute('COMMIT')`.
> +  If any request fails during the transaction, it will not affect the other
> +  requests in the transaction. If a disconnect occurs while a stream has an
> +  active transaction, the transaction is rolled back unless it was
> +  committed before the disconnect.
> +
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index bf6a89e15..199d78127 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -70,8 +70,11 @@ local M_GET         = 13
>  local M_MIN         = 14
>  local M_MAX         = 15
>  local M_COUNT       = 16
> +local M_BEGIN       = 17
> +local M_COMMIT      = 18
> +local M_ROLLBACK    = 19
>  -- Injects raw data into connection. Used by console and tests.
> -local M_INJECT      = 17
> +local M_INJECT      = 20
>  
>  ffi.cdef[[
>  struct error *
> @@ -1167,16 +1170,52 @@ local function check_eval_args(args)
>      end
>  end
>  
> +local function nothing_or_data(value)
> +    if value ~= nil then
> +        return value
> +    end
> +end
> +
>  function stream_methods:new_stream()
>      check_remote_arg(self, 'stream')
>      box.error(E_PROC_LUA, "Unsupported for stream");
>  end
>  
> +function stream_methods:begin(opts)
> +    check_remote_arg(self, 'begin')
> +    local res = self:_request(M_BEGIN, opts, nil, self._stream_id)
> +    if type(res) ~= 'table' or opts and opts.is_async then
> +        return nothing_or_data(res)
> +    end
> +    return unpack(res)
> +end
> +
> +function stream_methods:commit(opts)
> +    check_remote_arg(self, 'commit')
> +    local res = self:_request(M_COMMIT, opts, nil, self._stream_id)
> +    if type(res) ~= 'table' or opts and opts.is_async then
> +        return nothing_or_data(res)
> +    end
> +    return unpack(res)
> +end
> +
> +function stream_methods:rollback(opts)
> +    check_remote_arg(self, 'rollback')
> +    local res = self:_request(M_ROLLBACK, opts, nil, self._stream_id)
> +    if type(res) ~= 'table' or opts and opts.is_async then
> +        return nothing_or_data(res)
> +    end
> +    return unpack(res)
> +end
> +

In sync mode, BEGIN/COMMIT/ROLLBACK either fail with an error, which is
raised in _request, or return nothing. In async mode they return a future,
so you don't need unpack() or nothing_or_data().

function stream_methods:rollback(opts)
    check_remote_arg(self, 'rollback')
    local res = self:_request(M_ROLLBACK, opts, nil, self._stream_id)
    if opts and opts.is_async then
        return res
    end
end
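
To illustrate the point, here is a minimal self-contained sketch that runs
in plain Lua without Tarantool. The stub _request and the helpers
make_stub_stream and define_tx_method are hypothetical stand-ins, not
net.box code. The stub only imitates the behaviour described above: nothing
is returned in sync mode, and a future-like table is returned in async mode.

```lua
-- Method codes as introduced by the patch.
local M_BEGIN, M_COMMIT, M_ROLLBACK = 17, 18, 19

-- Hypothetical stand-in for a net.box stream object.
local function make_stub_stream()
    local stream = {_stream_id = 1, log = {}}
    -- Stub _request: records the call, returns a future-like table in
    -- async mode and nothing in sync mode (the real one would raise
    -- an error here on failure).
    function stream:_request(method, opts, format, stream_id)
        table.insert(self.log, method)
        if opts and opts.is_async then
            return {method = method, stream_id = stream_id}
        end
    end
    return stream
end

-- Hypothetical helper: all three methods share the same body, which
-- forwards the future in async mode and returns nothing otherwise.
local function define_tx_method(stream, name, method)
    stream[name] = function(self, opts)
        local res = self:_request(method, opts, nil, self._stream_id)
        if opts and opts.is_async then
            return res
        end
    end
end

local stream = make_stub_stream()
define_tx_method(stream, 'begin', M_BEGIN)
define_tx_method(stream, 'commit', M_COMMIT)
define_tx_method(stream, 'rollback', M_ROLLBACK)

-- Sync call: count the returned values.
local sync_results = select('#', stream:begin())
-- Async call: the future-like table is passed through unchanged.
local future = stream:rollback({is_async = true})
```

Whether a shared helper is worth it in net_box.lua is a matter of taste;
the point is only that neither unpack() nor nothing_or_data() is needed.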

