[Tarantool-patches] [PATCH 7/7] net.box: add interactive transaction support in net.box
Vladimir Davydov
vdavydov at tarantool.org
Fri Aug 6 17:04:54 MSK 2021
On Thu, Aug 05, 2021 at 09:17:45PM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
>
> Implement `begin`, `commit` and `rollback` methods for the stream object
> in `net.box`, which begin, commit and roll back a transaction,
> respectively.
>
> Closes #5860
>
> @TarantoolBot document
> Title: add interactive transaction support in net.box
> Implement `begin`, `commit` and `rollback` methods for the stream object
> in `net.box`, which begin, commit and roll back a transaction,
> respectively. There are now multiple ways to begin, commit and roll back
> a transaction from `net.box`: using the corresponding stream methods, using
> the `call` or `eval` methods, or using the `execute` method with SQL
> transaction syntax. Users can mix these methods, for example, start a
> transaction using `stream:begin()` and commit it using
> `stream:call('box.commit')` or `stream:execute('COMMIT')`.
> Simple example of using interactive transactions via iproto from net.box:
> ```lua
> stream = conn:stream()

This should be `conn:new_stream()`.

> space = stream.space.test
> space_not_from_stream = conn.space.test
>
> stream:begin()
> space:replace({1})
> -- returns the previously inserted tuple, because the request
> -- belongs to the transaction.
> space:select({})
> -- empty select, because this select doesn't belong to the
> -- transaction
> space_not_from_stream:select({})
> stream:call('box.commit')
> -- now the transaction is committed, so all requests
> -- return the tuple.
> ```
> More examples of using streams can be found in
> gh-5860-implement-streams-in-iproto.test.lua
> ---
> .../gh-5860-implement-streams-in-iproto.md | 28 +
> src/box/lua/net_box.c | 51 +-
> src/box/lua/net_box.lua | 50 +-
> test/box/stream.result | 3036 +++++++++++++++++
> test/box/stream.test.lua | 1201 +++++++
> 5 files changed, 4358 insertions(+), 8 deletions(-)
> create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
>
> diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
> new file mode 100644
> index 000000000..d0f1359dd
> --- /dev/null
> +++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
> @@ -0,0 +1,28 @@
> +## feature/core
> +
> +* Streams and interactive transactions over streams are implemented
> + in iproto. A stream is associated with its ID, which is unique within
> + one connection. All requests with the same non-zero stream ID belong to
> + the same stream. All requests in a stream are processed synchronously:
> + the execution of the next request does not start until the previous one
> + is completed. If a request has zero stream ID, it does not belong to a
> + stream and is processed in the old way.
> + In `net.box`, a stream is an object above a connection that has the same
> + methods, but allows executing requests sequentially. The ID is generated
> + on the client side in two ways: automatically or manually. User can

There's no manual id generation anymore. Please update.

> + choose either of the two methods, but cannot mix them. If a user writes
> + their own connector and wants to use streams, they must transmit
> + stream_id over the iproto protocol.
> + The main purpose of streams is transactions via iproto. Each stream
> + can start its own transaction, so streams allow multiplexing several
> + transactions over one connection. There are multiple ways to begin,
> + commit and roll back a transaction: using the corresponding stream
> + methods, using the `call` or `eval` methods, or using the `execute` method
> + with SQL transaction syntax. Users can mix these methods, for example,
> + start a transaction using `stream:begin()` and commit it using
> + `stream:call('box.commit')` or `stream:execute('COMMIT')`.
> + If any request fails during the transaction, it does not affect the other
> + requests in the transaction. If a disconnect occurs while a stream has an
> + active transaction, that transaction is rolled back unless it was
> + committed before that moment.
> +
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index bf6a89e15..199d78127 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -70,8 +70,11 @@ local M_GET = 13
> local M_MIN = 14
> local M_MAX = 15
> local M_COUNT = 16
> +local M_BEGIN = 17
> +local M_COMMIT = 18
> +local M_ROLLBACK = 19
> -- Injects raw data into connection. Used by console and tests.
> -local M_INJECT = 17
> +local M_INJECT = 20
>
> ffi.cdef[[
> struct error *
> @@ -1167,16 +1170,52 @@ local function check_eval_args(args)
> end
> end
>
> +local function nothing_or_data(value)
> +    if value ~= nil then
> +        return value
> +    end
> +end
> +
> function stream_methods:new_stream()
>     check_remote_arg(self, 'stream')
>     box.error(E_PROC_LUA, "Unsupported for stream");
> end
>
> +function stream_methods:begin(opts)
> +    check_remote_arg(self, 'begin')
> +    local res = self:_request(M_BEGIN, opts, nil, self._stream_id)
> +    if type(res) ~= 'table' or opts and opts.is_async then
> +        return nothing_or_data(res)
> +    end
> +    return unpack(res)
> +end
> +
> +function stream_methods:commit(opts)
> +    check_remote_arg(self, 'commit')
> +    local res = self:_request(M_COMMIT, opts, nil, self._stream_id)
> +    if type(res) ~= 'table' or opts and opts.is_async then
> +        return nothing_or_data(res)
> +    end
> +    return unpack(res)
> +end
> +
> +function stream_methods:rollback(opts)
> +    check_remote_arg(self, 'rollback')
> +    local res = self:_request(M_ROLLBACK, opts, nil, self._stream_id)
> +    if type(res) ~= 'table' or opts and opts.is_async then
> +        return nothing_or_data(res)
> +    end
> +    return unpack(res)
> +end
> +
> +

In sync mode, BEGIN/COMMIT/ROLLBACK either return nothing or fail with
an error, which is raised in _request. In async mode they return a
future, so you need neither unpack() nor nothing_or_data():

    function stream_methods:rollback(opts)
        check_remote_arg(self, 'rollback')
        local res = self:_request(M_ROLLBACK, opts, nil, self._stream_id)
        if opts and opts.is_async then
            return res
        end
    end
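
To check the control flow of the simplified methods in isolation, here is
a minimal sketch that runs in plain Lua without a server. The `stream`
table, the stubbed `_request` and the `make_tx_method` helper are
assumptions made for illustration only; they are not the real net.box
code. The stub only mimics the behaviour described above: nothing in sync
mode, a future-like table in async mode.

```lua
-- Method codes as defined in the patch.
local M_BEGIN, M_COMMIT, M_ROLLBACK = 17, 18, 19

-- Hypothetical stand-in for a net.box stream object.
local stream = {_stream_id = 1}

-- Stub for _request (an assumption, not the real implementation):
-- async mode returns a future-like table, sync mode returns nothing.
-- The real _request would raise an error on failure.
function stream:_request(method, opts, format, stream_id)
    if opts and opts.is_async then
        return {method = method, stream_id = stream_id}
    end
end

-- Hypothetical helper: builds a method with the simplified body
-- (check_remote_arg is omitted because it is not needed for the sketch).
local function make_tx_method(method)
    return function(self, opts)
        local res = self:_request(method, opts, nil, self._stream_id)
        if opts and opts.is_async then
            return res
        end
    end
end

stream.begin = make_tx_method(M_BEGIN)
stream.commit = make_tx_method(M_COMMIT)
stream.rollback = make_tx_method(M_ROLLBACK)

-- Sync mode: the call produces no return values.
print(select('#', stream:begin()))

-- Async mode: the future is passed through untouched.
local fut = stream:commit({is_async = true})
print(fut.method, fut.stream_id)
```

With this shape there is no code path left on which `unpack()` or
`nothing_or_data()` would do anything useful.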