[Tarantool-patches] [PATCH v4 9/9] net.box: add interactive transaction support in net.box

mechanik20051988 mechanik20051988 at tarantool.org
Thu Aug 12 12:50:46 MSK 2021


From: mechanik20051988 <mechanik20.05.1988 at gmail.com>

Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which allow beginning, committing and rolling back
a transaction, respectively.

Closes #5860

@TarantoolBot document
Title: add interactive transaction support in net.box
Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which allow beginning, committing and rolling back
a transaction, respectively. There are now multiple ways to begin, commit
and roll back a transaction from `net.box`: using the appropriate stream
methods, the `call` or `eval` methods, or the `execute` method with SQL
transaction syntax. Users can mix these methods: for example, start
a transaction with `stream:begin()` and commit it with
`stream:call('box.commit')` or `stream:execute('COMMIT')`.
Simple example of using interactive transactions via iproto from net.box:
```lua
stream = conn:new_stream()
space = stream.space.test
space_not_from_stream = conn.space.test

stream:begin()
space:replace({1})
-- returns the previously inserted tuple, because the request
-- belongs to the transaction.
space:select({})
-- empty select, because this select doesn't belong to the
-- transaction
space_not_from_stream:select({})
stream:call('box.commit')
-- now the transaction is committed, so all requests
-- return the tuple.
```
More examples of using streams can be found in
gh-5860-implement-streams-in-iproto.test.lua
---
 .../gh-5860-implement-streams-in-iproto.md    |   26 +
 src/box/lua/net_box.c                         |   49 +-
 src/box/lua/net_box.lua                       |   35 +-
 ...ox_iproto_transactions_over_streams.result | 3009 +++++++++++++++++
 ..._iproto_transactions_over_streams.test.lua | 1238 +++++++
 test/box/suite.ini                            |    2 +-
 6 files changed, 4356 insertions(+), 3 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
 create mode 100644 test/box/net.box_iproto_transactions_over_streams.result
 create mode 100644 test/box/net.box_iproto_transactions_over_streams.test.lua

diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
new file mode 100644
index 000000000..8a8eec3e7
--- /dev/null
+++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
@@ -0,0 +1,26 @@
+## feature/core
+
+* Streams and interactive transactions over streams are implemented
+  in iproto. A stream is associated with its ID, which is unique within
+  one connection. All requests with the same non-zero stream ID belong
+  to the same stream. All requests in a stream are processed
+  synchronously: the execution of the next request will not start until
+  the previous one is completed. If a request has a zero stream ID, it
+  does not belong to any stream and is processed in the old way.
+  In `net.box`, a stream is an object above a connection that has the same
+  methods, but allows executing requests sequentially. The ID is generated
+  on the client side automatically. Users who write their own connector and
+  want to use streams must transmit stream_id over the iproto protocol.
+  The main purpose of streams is transactions via iproto. Each stream
+  can start its own transaction, so streams allow multiplexing several
+  transactions over one connection. There are multiple ways to begin,
+  commit and roll back a transaction: using the appropriate stream methods,
+  the `call` or `eval` methods, or the `execute` method with SQL transaction
+  syntax. Users can mix these methods: for example, start a transaction with
+  `stream:begin()` and commit it with `stream:call('box.commit')`
+  or `stream:execute('COMMIT')`.
+  If any request fails during the transaction, it will not affect the other
+  requests in the transaction. If a disconnect occurs while there is an active
+  transaction in a stream, the transaction will be rolled back unless it has
+  been committed before that moment.
+
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 3bc49af23..229dec590 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -75,7 +75,10 @@ enum netbox_method {
 	NETBOX_MIN         = 14,
 	NETBOX_MAX         = 15,
 	NETBOX_COUNT       = 16,
-	NETBOX_INJECT      = 17,
+	NETBOX_BEGIN       = 17,
+	NETBOX_COMMIT      = 18,
+	NETBOX_ROLLBACK    = 19,
+	NETBOX_INJECT      = 20,
 	netbox_method_MAX
 };
 
@@ -916,6 +919,44 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
 	netbox_encode_prepare(L, idx, stream, sync, stream_id);
 }
 
+static inline void
+netbox_encode_txn(lua_State *L, enum iproto_type type, int idx,
+		  struct mpstream *stream, uint64_t sync,
+		  uint64_t stream_id)
+{
+	(void)L;
+	(void) idx;
+	assert(type == IPROTO_BEGIN ||
+	       type == IPROTO_COMMIT ||
+	       type == IPROTO_ROLLBACK);
+	size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
+	netbox_end_encode(stream, svp);
+}
+
+static void
+netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
+		    uint64_t sync, uint64_t stream_id)
+{
+	return netbox_encode_txn(L, IPROTO_BEGIN, idx, stream,
+				 sync, stream_id);
+}
+
+static void
+netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
+		     uint64_t sync, uint64_t stream_id)
+{
+	return netbox_encode_txn(L, IPROTO_COMMIT, idx, stream,
+				 sync, stream_id);
+}
+
+static void
+netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
+		       uint64_t sync, uint64_t stream_id)
+{
+	return netbox_encode_txn(L, IPROTO_ROLLBACK, idx, stream,
+				 sync, stream_id);
+}
+
 static void
 netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
 		     uint64_t sync, uint64_t stream_id)
@@ -959,6 +1000,9 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
 		[NETBOX_MIN]		= netbox_encode_select,
 		[NETBOX_MAX]		= netbox_encode_select,
 		[NETBOX_COUNT]		= netbox_encode_call,
+		[NETBOX_BEGIN]          = netbox_encode_begin,
+		[NETBOX_COMMIT]         = netbox_encode_commit,
+		[NETBOX_ROLLBACK]       = netbox_encode_rollback,
 		[NETBOX_INJECT]		= netbox_encode_inject,
 	};
 	struct mpstream stream;
@@ -1330,6 +1374,9 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method,
 		[NETBOX_MIN]		= netbox_decode_tuple,
 		[NETBOX_MAX]		= netbox_decode_tuple,
 		[NETBOX_COUNT]		= netbox_decode_value,
+		[NETBOX_BEGIN]          = netbox_decode_nil,
+		[NETBOX_COMMIT]         = netbox_decode_nil,
+		[NETBOX_ROLLBACK]       = netbox_decode_nil,
 		[NETBOX_INJECT]		= netbox_decode_table,
 	};
 	method_decoder[method](L, data, data_end, format);
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 8d707fb26..f203b203e 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -51,8 +51,11 @@ local M_GET         = 13
 local M_MIN         = 14
 local M_MAX         = 15
 local M_COUNT       = 16
+local M_BEGIN       = 17
+local M_COMMIT      = 18
+local M_ROLLBACK    = 19
 -- Injects raw data into connection. Used by console and tests.
-local M_INJECT      = 17
+local M_INJECT      = 20
 
 -- utility tables
 local is_final_state         = {closed = 1, error = 1}
@@ -754,11 +757,38 @@ local function stream_new_stream(stream)
     return stream._conn:new_stream()
 end
 
+local function stream_begin(stream, opts)
+    check_remote_arg(stream, 'begin')
+    local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
+    if opts and opts.is_async then
+        return res
+    end
+end
+
+local function stream_commit(stream, opts)
+    check_remote_arg(stream, 'commit')
+    local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id)
+    if opts and opts.is_async then
+        return res
+    end
+end
+
+local function stream_rollback(stream, opts)
+    check_remote_arg(stream, 'rollback')
+    local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id)
+    if opts and opts.is_async then
+        return res
+    end
+end
+
 function remote_methods:new_stream()
     check_remote_arg(self, 'new_stream')
     self._last_stream_id = self._last_stream_id + 1
     local stream = setmetatable({
         new_stream = stream_new_stream,
+        begin = stream_begin,
+        commit = stream_commit,
+        rollback = stream_rollback,
         _stream_id = self._last_stream_id,
         space = setmetatable({
             _stream_space_cache = {},
@@ -1243,6 +1273,9 @@ local this_module = {
         min         = M_MIN,
         max         = M_MAX,
         count       = M_COUNT,
+        begin       = M_BEGIN,
+        commit      = M_COMMIT,
+        rollback    = M_ROLLBACK,
         inject      = M_INJECT,
     }
 }
diff --git a/test/box/net.box_iproto_transactions_over_streams.result b/test/box/net.box_iproto_transactions_over_streams.result
new file mode 100644
index 000000000..c2167e760
--- /dev/null
+++ b/test/box/net.box_iproto_transactions_over_streams.result
@@ -0,0 +1,3009 @@
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+---
+...
+json = require('json')
+---
+...
+fiber = require('fiber')
+---
+...
+msgpack = require('msgpack')
+---
+...
+test_run = require('test_run').new()
+---
+...
+test_run:cmd("create server test with script='box/iproto_streams.lua'")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+---
+...
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+test_run:cmd("start server test with args='1'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+---
+...
+conn_2 = net_box.connect(server_addr)
+---
+...
+stream_1_1 = conn_1:new_stream()
+---
+...
+stream_1_2 = conn_1:new_stream()
+---
+...
+stream_2 = conn_2:new_stream()
+---
+...
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+---
+...
+stream_1_1:rollback()
+---
+...
+stream_1_1:begin()
+---
+...
+-- Error unable to start second transaction in one stream
+stream_1_1:begin()
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+---
+...
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+---
+...
+box.commit()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+stream_1_1:commit()
+---
+...
+stream_1_2:commit()
+---
+...
+stream_2:commit()
+---
+...
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+---
+- error: Unable to process BEGIN request out of stream
+...
+conn:_request(net_box._method.commit, nil, nil, nil)
+---
+- error: Unable to process COMMIT request out of stream
+...
+conn:_request(net_box._method.rollback, nil, nil, nil)
+---
+- error: Unable to process ROLLBACK request out of stream
+...
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+---
+...
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+---
+...
+IPROTO_REQUEST_TYPE       = 0x00
+---
+...
+IPROTO_SYNC               = 0x01
+---
+...
+IPROTO_AUTH               = 7
+---
+...
+IPROTO_STREAM_ID          = 0x0a
+---
+...
+next_request_id           = 9
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+header = msgpack.encode({
+    [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+    [IPROTO_SYNC] = next_request_id,
+    [IPROTO_STREAM_ID] = 1,
+});
+---
+...
+body = msgpack.encode({nil});
+---
+...
+size = msgpack.encode(header:len() + body:len());
+---
+...
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+                                nil, nil, nil, nil,
+                                size .. header .. body);
+---
+- null
+- Unable to process AUTH request in stream
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+conn:close()
+---
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Second argument (false is a value for memtx_use_mvcc_engine option)
+-- Server start without active transaction manager, so all transaction
+-- fails because of yeild!
+test_run:cmd("start server test with args='10, false'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+space:replace({1})
+---
+- [1]
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+---
+- []
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:commit()
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+-- Check that after failed transaction commit we able to start next
+-- transaction (it's strange check, but it's necessary because it was
+-- bug with it)
+stream:begin()
+---
+...
+stream:ping()
+---
+- true
+...
+stream:commit()
+---
+...
+-- Same checks for `call` end `eval` functions.
+stream:call('box.begin')
+---
+...
+stream:call('s:replace', {{1}})
+---
+- [1]
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+stream:call('s:select', {})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+stream:call('s:replace', {{1}})
+---
+- [1]
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+stream:call('s:select', {})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+test_run:switch('test')
+---
+- true
+...
+s:drop()
+---
+...
+-- Check that there are no streams and messages, which
+-- was not deleted
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+stream:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fails,
+-- as we can see before!
+-- Second argument (true is a value for memtx_use_mvcc_engine option)
+-- Same test case as previous but server start with active transaction
+-- manager. Also check vinyl, because it's behaviour is same.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- Spaces getting from connection, not from stream has no stream_id
+-- and not belongs to stream
+space_1_no_stream = conn.space.test_1
+---
+...
+space_2_no_stream = conn.space.test_2
+---
+...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and to vinyl:
+-- behaviour is same!
+stream_1:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+stream_2:begin()
+---
+...
+space_2:replace({1})
+---
+- [1]
+...
+test_run:switch('test')
+---
+- true
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+---
+- []
+...
+space_2_no_stream:select{}
+---
+- []
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+---
+- - [1]
+...
+space_2:select({})
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit was successful, transaction can yeild with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yeild also.
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+---
+- - [1]
+...
+space_2:select{}
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+---
+- - [1]
+...
+s2:select()
+---
+- - [1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1_1 = stream_1.space.test_1
+---
+...
+space_1_2 = stream_2.space.test_1
+---
+...
+space_2_1 = stream_1.space.test_2
+---
+...
+space_2_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Simple read/write conflict.
+space_1_1:select({1})
+---
+- []
+...
+space_1_2:select({1})
+---
+- []
+...
+space_1_1:replace({1, 1})
+---
+- [1, 1]
+...
+space_1_2:replace({1, 2})
+---
+- [1, 2]
+...
+stream_1:commit()
+---
+...
+-- This transaction fails, because of conflict
+stream_2:commit()
+---
+- error: Transaction has been aborted by conflict
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Here we must accept [1, 1]
+space_1_1:select({})
+---
+- - [1, 1]
+...
+space_1_2:select({})
+---
+- - [1, 1]
+...
+-- Same test for vinyl sapce
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_2_1:select({1})
+---
+- []
+...
+space_2_2:select({1})
+---
+- []
+...
+space_2_1:replace({1, 1})
+---
+- [1, 1]
+...
+space_2_2:replace({1, 2})
+---
+- [1, 2]
+...
+stream_1:commit()
+---
+...
+-- This transaction fails, because of conflict
+stream_2:commit()
+---
+- error: Transaction has been aborted by conflict
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Here we must accept [1, 1]
+space_2_1:select({})
+---
+- - [1, 1]
+...
+space_2_2:select({})
+---
+- - [1, 1]
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+---
+- - [1, 1]
+...
+s2:select()
+---
+- - [1, 1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Test rollback for memtx space
+space_1:replace({1})
+---
+- [1]
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+---
+- - [1]
+...
+stream_1:rollback()
+---
+...
+-- Select is empty, transaction rollback
+space_1:select({})
+---
+- []
+...
+-- Test rollback for vinyl space
+space_2:replace({1})
+---
+- [1]
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+---
+- - [1]
+...
+stream_2:rollback()
+---
+...
+-- Select is empty, transaction rollback
+space_2:select({})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- This is simple test is necessary because i have a bug
+-- with halting stream after rollback
+stream_1:begin()
+---
+...
+stream_1:commit()
+---
+...
+stream_2:begin()
+---
+...
+stream_2:commit()
+---
+...
+conn:close()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select are empty, because transaction rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_1:select({})
+---
+- - [1]
+  - [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_2:select({})
+---
+- - [1]
+  - [2]
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Empty selects, transaction was rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Reconnect
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Two empty selects
+space_1:select({})
+---
+- []
+...
+space_2:select({})
+---
+- []
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select are empty, because transaction rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_1:select({})
+---
+- - [1]
+  - [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_2:select({})
+---
+- - [1]
+  - [2]
+...
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i = 1, 1000 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+fiber.sleep(0)
+---
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+---
+- true
+...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+---
+- true
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Select was empty, transaction rollbacked
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Same test, but now we check that if `commit` was received
+-- by server before connection closed, we processed it successful.
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = stream_1:commit({is_async = true})
+---
+...
+_ = stream_2:commit({is_async = true})
+---
+...
+fiber.sleep(0)
+---
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+---
+- true
+...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+---
+- true
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Select returns tuples from [1] to [100],
+-- transaction was committed
+rc1 = s1:select()
+---
+...
+rc2 = s2:select()
+---
+...
+assert(#rc1)
+---
+- 100
+...
+assert(#rc2)
+---
+- 100
+...
+s1:truncate()
+---
+...
+s2:truncate()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Reconnect
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- We can begin new transactions with same stream_id, because
+-- the previous streams were destroyed when the old connection closed.
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Two empty selects
+space_1:select({})
+---
+- []
+...
+space_2:select({})
+---
+- []
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both selects are empty, the committed transactions contained only reads
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. While working on the
+-- patch, I saw that all requests in a stream come with
+-- header->is_commit == true, so if we are in a transaction
+-- in a stream we should set this value to false, otherwise
+-- during recovery `wal_stream_apply_dml_row` fails because
+-- of an LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Here we get two tuples, commit was successful
+s1:select{}
+---
+- - [1]
+  - [2]
+...
+-- Here we get two tuples, commit was successful
+s2:select{}
+---
+- - [1]
+  - [2]
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("start server test with args='1, true'")
+---
+- true
+...
+test_run:switch('test')
+---
+- true
+...
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+---
+- - [1]
+  - [2]
+...
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+---
+- - [1]
+  - [2]
+...
+box.space.test_1:drop()
+---
+...
+box.space.test_2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Same transactions checks for async mode
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream_1 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+memtx_futures = {}
+---
+...
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+---
+...
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+---
+...
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+---
+...
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+---
+...
+vinyl_futures = {}
+---
+...
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+---
+...
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+---
+...
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+---
+...
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+---
+...
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+---
+...
+memtx_results = wait_and_return_results(memtx_futures)
+---
+...
+vinyl_results = wait_and_return_results(vinyl_futures)
+---
+...
+-- If begin was successful it return nil
+assert(not memtx_results["begin"])
+---
+- true
+...
+assert(not vinyl_results["begin"])
+---
+- true
+...
+-- [1]
+assert(memtx_results["replace"])
+---
+- [1]
+...
+assert(vinyl_results["replace"])
+---
+- [1]
+...
+-- [2]
+assert(memtx_results["insert"])
+---
+- [2]
+...
+assert(vinyl_results["insert"])
+---
+- [2]
+...
+-- [1] [2]
+assert(memtx_results["select"])
+---
+- - [1]
+  - [2]
+...
+assert(vinyl_results["select"])
+---
+- - [1]
+  - [2]
+...
+-- If commit was successful it return nil
+assert(not memtx_results["commit"])
+---
+- true
+...
+assert(not vinyl_results["commit"])
+---
+- true
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+---
+- - [1]
+  - [2]
+...
+s2:select()
+---
+- - [1]
+  - [2]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check conflict resolution in stream transactions.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1_1 = stream_1.space.test_1
+---
+...
+space_1_2 = stream_2.space.test_1
+---
+...
+space_2_1 = stream_1.space.test_2
+---
+...
+space_2_2 = stream_2.space.test_2
+---
+...
+futures_1 = {}
+---
+...
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+---
+...
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+---
+...
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+---
+...
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+---
+...
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+---
+...
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+---
+...
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+---
+...
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+---
+...
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+---
+...
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+---
+...
+results_1 = wait_and_return_results(futures_1)
+---
+...
+-- Successful begin return nil
+assert(not results_1["begin_1"])
+---
+- true
+...
+assert(not results_1["begin_2"])
+---
+- true
+...
+-- []
+assert(not results_1["select_1_1"][1])
+---
+- true
+...
+assert(not results_1["select_1_2"][1])
+---
+- true
+...
+-- [1]
+assert(results_1["replace_1_1"][1])
+---
+- 1
+...
+-- [1]
+assert(results_1["replace_1_1"][2])
+---
+- 1
+...
+-- [1]
+assert(results_1["replace_1_2"][1])
+---
+- 1
+...
+-- [2]
+assert(results_1["replace_1_2"][2])
+---
+- 2
+...
+-- Successful commit return nil
+assert(not results_1["commit_1"])
+---
+- true
+...
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+---
+- Transaction has been aborted by conflict
+...
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+---
+- [1, 1]
+...
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_1_2_A"][1]
+futures_2 = {}
+---
+...
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+---
+...
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+---
+...
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+---
+...
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+---
+...
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+---
+...
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+---
+...
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+---
+...
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+---
+...
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+---
+...
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+---
+...
+results_2 = wait_and_return_results(futures_2)
+---
+...
+-- Successful begin return nil
+assert(not results_2["begin_1"])
+---
+- true
+...
+assert(not results_2["begin_2"])
+---
+- true
+...
+-- []
+assert(not results_2["select_2_1"][1])
+---
+- true
+...
+assert(not results_2["select_2_2"][1])
+---
+- true
+...
+-- [1]
+assert(results_2["replace_2_1"][1])
+---
+- 1
+...
+-- [1]
+assert(results_2["replace_2_1"][2])
+---
+- 1
+...
+-- [1]
+assert(results_2["replace_2_2"][1])
+---
+- 1
+...
+-- [2]
+assert(results_2["replace_2_2"][2])
+---
+- 2
+...
+-- Successful commit return nil
+assert(not results_2["commit_1"])
+---
+- true
+...
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+---
+- Transaction has been aborted by conflict
+...
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+---
+- [1, 1]
+...
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_2["select_2_2_A"][1]
+test_run:switch('test')
+---
+- true
+...
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+---
+- - [1, 1]
+...
+s2:select()
+---
+- - [1, 1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+function ping() return "pong" end
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+space_no_stream = conn.space.test
+---
+...
+-- successful begin using stream:call
+stream:call('box.begin')
+---
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+stream:call('ping')
+---
+- pong
+...
+stream:eval('ping()')
+---
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+stream:eval('box.begin()')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- successful commit using stream:call
+stream:call('box.commit')
+---
+...
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+---
+...
+space:replace({1})
+---
+- [1]
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+---
+- []
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+--Successful commit using stream:execute
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_no_stream:select{}
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select return tuple, because transaction was successful
+s:select()
+---
+- - [1]
+...
+s:delete{1}
+---
+- [1]
+...
+test_run:switch('default')
+---
+- true
+...
+-- Check rollback using stream:call
+stream:begin()
+---
+...
+space:replace({2})
+---
+- [2]
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+---
+- []
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+---
+- - [2]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+--Successful rollback using stream:call
+stream:call('box.rollback')
+---
+...
+-- Empty selects transaction rollbacked
+space:select({})
+---
+- []
+...
+space_no_stream:select{}
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Empty select transaction rollbacked
+s:select()
+---
+- []
+...
+s:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Simple test which demonstrates that a stream is immediately
+-- destroyed when there are no messages being processed in it and
+-- no active transaction.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+for i = 1, 10 do space:replace{i} end
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- All messages was processed, so stream object was immediately
+-- deleted, because no active transaction started.
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+s:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Transaction tests for sql iproto requests.
+-- All these functions are copy-pasted from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+---
+- true
+...
+-- Run an SQL statement and discard whatever the call returns.
+-- With a stream the statement goes through stream:execute();
+-- when stream is nil/false it goes through box.execute().
+function execute_sql_string(stream, sql_string)
+    if stream then
+        stream:execute(sql_string)
+    else
+        box.execute(sql_string)
+    end
+end$
+---
+...
+-- Run an SQL statement and hand the outcome back to the caller.
+-- With a stream the call is wrapped in pcall(), so the returned
+-- values are pcall's: a status flag followed by the result or the
+-- error. Without a stream the return values of box.execute() are
+-- passed through unchanged. Callers that collect errors read the
+-- second returned value.
+function execute_sql_string_and_return_result(stream, sql_string)
+    if stream then
+        return pcall(stream.execute, stream, sql_string)
+    else
+        return box.execute(sql_string)
+    end
+end$
+---
+...
+-- Run a fixed sequence of SQL DDL statements (tables, indexes,
+-- constraints, renames, triggers, truncates) through the given
+-- stream, or through box.execute() when no stream is passed.
+-- Six of the statements have their second returned value captured
+-- as err1..err6 (the recorded output shows these as error messages).
+-- Returns a table: a fixed header string followed by err1..err6.
+-- NOTE(review): the local stream_or_box is assigned but never read
+-- in this function.
+function monster_ddl(stream)
+    local _, err1, err2, err3, err4, err5, err6
+    local stream_or_box = stream or box
+    execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER);]])
+    execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER UNIQUE,
+                                                 CONSTRAINT ck1
+                                                 CHECK(b < 100));]])
+
+    execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+    execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+    execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+                                                          KEY, a INTEGER);]])
+
+    execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+    execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+                                 CHECK(b > 0);]])
+
+    _, err1 =
+        execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+                                                       RENAME TO t1;]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+                                 ck2 CHECK(a > 0);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+                                                       INTEGER PRIMARY KEY);]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+
+    execute_sql_string(stream, [[CREATE TABLE
+                                 trigger_catcher(id INTEGER PRIMARY
+                                                 KEY AUTOINCREMENT);]])
+
+    execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+    execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+                                 t1 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+                                 t2 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+                                                       ON t1(a, b);]])
+
+    execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+    _, err5 =
+        execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+    _, err6 =
+        execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+                                                       t_does_not_exist;]])
+
+    execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+    return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+            err5, err6}
+end$
+---
+...
+-- Compare two results by their JSON encodings. Returns true when
+-- the encoded strings are equal; otherwise returns both inputs
+-- (presumably so the mismatch is visible in the test output).
+function monster_ddl_cmp_res(res1, res2)
+    if json.encode(res1) == json.encode(res2) then
+        return true
+    end
+    return res1, res2
+end$
+---
+...
+-- Assert that nothing from the monster_ddl scenario is visible:
+-- spaces T1, T2, T_RENAMED and T_TO_RENAME are absent and the
+-- _trigger, _fk_constraint and _ck_constraint spaces are empty.
+-- Spaces are looked up through stream.space when a stream is
+-- given, otherwise through box.space. Raises via assert() on the
+-- first failed check; returns nothing.
+function monster_ddl_is_clean(stream)
+    local stream_or_box = stream or box
+    assert(stream_or_box.space.T1 == nil)
+    assert(stream_or_box.space.T2 == nil)
+    assert(stream_or_box.space._trigger:count() == 0)
+    assert(stream_or_box.space._fk_constraint:count() == 0)
+    assert(stream_or_box.space._ck_constraint:count() == 0)
+    assert(stream_or_box.space.T_RENAMED == nil)
+    assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+---
+...
+-- Exercise the schema built by monster_ddl() with a series of
+-- INSERTs. Four of them have their second returned value captured
+-- as err1..err4 (the recorded output shows these as constraint and
+-- duplicate-key errors). Finally reads trigger_catcher into res.
+-- Returns a table: a fixed header string, err1..err4, res.
+function monster_ddl_check(stream)
+    local _, err1, err2, err3, err4, res
+    local stream_or_box = stream or box
+    _, err1 =
+       execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                      VALUES (1, 1, 101)]])
+    execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                       VALUES(2, 2, 1)]])
+    _, err3 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, 20, 1)]])
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, -1, 1)]])
+    execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+    -- Without a stream, stream_or_box is box: the rename is verified
+    -- through box.space and res is the first value of box.execute().
+    if not stream then
+        assert(stream_or_box.space.T_RENAMED ~= nil)
+        assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+        assert(stream_or_box.space.T_TO_RENAME == nil)
+        res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                             trigger_catcher]])
+    else
+        -- With a stream the helper returns pcall()'s values, so the
+        -- payload is the second one.
+        _, res =
+            execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                           trigger_catcher]])
+    end
+    return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+            err3, err4, res}
+end$
+---
+...
+-- Drop the objects left by monster_ddl(): trigger t1t, table
+-- trigger_catcher, constraint fk1 on t1, then tables t2, t1 and
+-- t_renamed. Every statement except the ALTER TABLE uses IF EXISTS.
+-- Statements go through the stream, or box.execute() without one.
+function monster_ddl_clear(stream)
+    execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+---
+...
+test_run:cmd("setopt delimiter ''")$
+---
+- true
+...
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+test_run:switch('test')
+---
+- true
+...
+test_run:cmd("setopt delimiter '$'")
+---
+- true
+...
+-- Variant of monster_ddl_is_clean defined after switching to the
+-- 'test' instance. Checks box.space directly: returns false if any
+-- of T1, T2, T_RENAMED, T_TO_RENAME exists or if _trigger,
+-- _fk_constraint or _ck_constraint is non-empty; true otherwise.
+function monster_ddl_is_clean()
+    if not (box.space.T1 == nil) or
+       not (box.space.T2 == nil) or
+       not (box.space._trigger:count() == 0) or
+       not (box.space._fk_constraint:count() == 0) or
+       not (box.space._ck_constraint:count() == 0) or
+       not (box.space.T_RENAMED == nil) or
+       not (box.space.T_TO_RENAME == nil) then
+           return false
+    end
+    return true
+end$
+---
+...
+test_run:cmd("setopt delimiter ''")$
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream = conn:new_stream()
+---
+...
+-- No txn.
+true_ddl_res = monster_ddl()
+---
+...
+true_ddl_res
+---
+- - 'Finished ok, errors in the middle: '
+  - Space 'T1' already exists
+  - Space 'T1' already exists
+  - Space 'T3' does not exist
+  - Index 'T1A' already exists in space 'T1'
+  - 'Failed to execute SQL statement: can not truncate space ''T2'' because other
+    objects depend on it'
+  - Space 'T_DOES_NOT_EXIST' does not exist
+...
+true_check_res = monster_ddl_check()
+---
+...
+true_check_res
+---
+- - 'Finished ok, errors and trigger catcher content: '
+  - 'Check constraint failed ''CK1'': b < 100'
+  - Duplicate key exists in unique index "unique_unnamed_T2_2" in space "T2" with
+    old tuple - [1, 1, 1] and new tuple - [2, 2, 1]
+  - 'Failed to execute SQL statement: FOREIGN KEY constraint failed'
+  - 'Check constraint failed ''CK2'': a > 0'
+  - metadata:
+    - name: ID
+      type: integer
+    rows:
+    - [1]
+...
+monster_ddl_clear()
+---
+...
+monster_ddl_is_clean()
+---
+...
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+---
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+---
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+test_run:switch("test")
+---
+- true
+...
+-- All messages was processed, so stream object was immediately
+-- deleted, because no active transaction started.
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+---
+- row_count: 1
+...
+-- reload schema
+stream:ping()
+---
+- true
+...
+space = stream.space.TEST
+---
+...
+assert(space ~= nil)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+space:replace{1, 2, '3'}
+---
+- [1, 2, '3']
+...
+space:select()
+---
+- - [1, 2, '3']
+...
+-- select is empty, because transaction was not commited
+conn.space.TEST:select()
+---
+- []
+...
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+---
+...
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+---
+...
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+---
+- true
+...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+---
+- metadata:
+  - name: ID
+    type: integer
+  - name: A
+    type: number
+  - name: B
+    type: string
+  rows:
+  - [1, 2, '3']
+...
+-- empty select, transaction was not commited
+conn:execute(conn_pr.stmt_id, {1, 2})
+---
+- metadata:
+  - name: ID
+    type: integer
+  - name: A
+    type: number
+  - name: B
+    type: string
+  rows: []
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+---
+- metadata:
+  - name: ID
+    type: integer
+  - name: A
+    type: number
+  - name: B
+    type: string
+  rows:
+  - [1, 2, '3']
+...
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+---
+- metadata:
+  - name: ID
+    type: integer
+  - name: A
+    type: number
+  - name: B
+    type: string
+  rows:
+  - [1, 2, '3']
+...
+stream:unprepare(stream_pr.stmt_id)
+---
+- null
+...
+conn:close()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+---
+- - [1, 2, '3']
+...
+box.space.TEST:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
+...
+test_run:cmd("delete server test")
+---
+- true
+...
diff --git a/test/box/net.box_iproto_transactions_over_streams.test.lua b/test/box/net.box_iproto_transactions_over_streams.test.lua
new file mode 100644
index 000000000..094c451a9
--- /dev/null
+++ b/test/box/net.box_iproto_transactions_over_streams.test.lua
@@ -0,0 +1,1238 @@
+-- This test checks streams implementation in iproto (gh-5860).
+net_box = require('net.box')
+json = require('json')
+fiber = require('fiber')
+msgpack = require('msgpack')
+test_run = require('test_run').new()
+
+test_run:cmd("create server test with script='box/iproto_streams.lua'")
+
+test_run:cmd("setopt delimiter ';'")
+function get_current_connection_count()
+    -- Evaluate box.stat.net() via test_run and take element [1] of the reply.
+    local net_stat = test_run:cmd("eval test 'return box.stat.net()'")[1]
+    assert(net_stat)
+    local connections = net_stat.CONNECTIONS
+    assert(connections)
+    return connections.current
+end;
+function wait_and_return_results(futures)
+    -- Wait on every future and store, under the future's own key, the error
+    -- when wait_result() returns one and the result otherwise (a nil result
+    -- without an error leaves that key unset).
+    local collected = {}
+    for key, fut in pairs(futures) do
+        local value, failure = fut:wait_result()
+        collected[key] = failure or value
+    end
+    return collected
+end;
+test_run:cmd("setopt delimiter ''");
+
+test_run:cmd("start server test with args='1'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+conn_2 = net_box.connect(server_addr)
+stream_1_1 = conn_1:new_stream()
+stream_1_2 = conn_1:new_stream()
+stream_2 = conn_2:new_stream()
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+stream_1_1:rollback()
+
+stream_1_1:begin()
+-- Error: unable to start a second transaction in one stream
+stream_1_1:begin()
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+test_run:switch("test")
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+box.commit()
+test_run:switch("default")
+stream_1_1:commit()
+stream_1_2:commit()
+stream_2:commit()
+
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+conn:_request(net_box._method.commit, nil, nil, nil)
+conn:_request(net_box._method.rollback, nil, nil, nil)
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+IPROTO_REQUEST_TYPE       = 0x00
+IPROTO_SYNC               = 0x01
+IPROTO_AUTH               = 7
+IPROTO_STREAM_ID          = 0x0a
+next_request_id           = 9
+test_run:cmd("setopt delimiter ';'")
+header = msgpack.encode({
+    [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+    [IPROTO_SYNC] = next_request_id,
+    [IPROTO_STREAM_ID] = 1,
+});
+body = msgpack.encode({nil});
+size = msgpack.encode(header:len() + body:len());
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+                                nil, nil, nil, nil,
+                                size .. header .. body);
+test_run:cmd("setopt delimiter ''");
+conn:close()
+test_run:cmd("stop server test")
+
+-- The second argument (false) is the value of the memtx_use_mvcc_engine
+-- option. The server starts without an active transaction manager, so all
+-- transactions fail because of a yield!
+test_run:cmd("start server test with args='10, false'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+
+-- Check synchronous stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+test_run:switch('default')
+space:replace({1})
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Commit fails: the transaction yielded with memtx_use_mvcc_engine = false
+stream:commit()
+-- Select is empty, transaction was aborted
+space:select{}
+-- Check that after a failed transaction commit we are able to start the
+-- next transaction (a strange check, but a necessary one: there used to
+-- be a bug here)
+stream:begin()
+stream:ping()
+stream:commit()
+-- Same checks for the `call` and `eval` functions.
+stream:call('box.begin')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Commit fails: the transaction yielded with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+-- Select is empty, transaction was aborted
+space:select{}
+
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Commit fails: the transaction yielded with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+-- Select is empty, transaction was aborted
+space:select{}
+
+test_run:switch('test')
+s:drop()
+-- Check that there are no streams and messages, which
+-- was not deleted
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+stream:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fail,
+-- as we have seen above!
+
+-- Second argument (true) is the value of the memtx_use_mvcc_engine option.
+-- Same test case as the previous one, but the server starts with an active
+-- transaction manager. Also check vinyl, because its behaviour is the same.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s1:create_index('primary')
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- Spaces obtained from the connection, not from a stream, have no stream_id
+-- and do not belong to any stream
+space_1_no_stream = conn.space.test_1
+space_2_no_stream = conn.space.test_2
+-- Check synchronous stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and for vinyl:
+-- the behaviour is the same!
+stream_1:begin()
+space_1:replace({1})
+stream_2:begin()
+space_2:replace({1})
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+test_run:switch('default')
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+space_2_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+space_2:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s1:select()
+s2:select()
+test_run:switch('default')
+-- Commit was successful, transaction can yield with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yield also.
+stream_1:commit()
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+space_2:select{}
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Simple read/write conflict.
+space_1_1:select({1})
+space_1_2:select({1})
+space_1_1:replace({1, 1})
+space_1_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_1_1:select({})
+space_1_2:select({})
+
+-- Same test for vinyl space
+stream_1:begin()
+stream_2:begin()
+space_2_1:select({1})
+space_2_2:select({1})
+space_2_1:replace({1, 1})
+space_2_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_2_1:select({})
+space_2_2:select({})
+
+test_run:switch('test')
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Test rollback for memtx space
+space_1:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+stream_1:rollback()
+-- Select is empty, transaction rollback
+space_1:select({})
+
+-- Test rollback for vinyl space
+space_2:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+stream_2:rollback()
+-- Select is empty, transaction rollback
+space_2:select({})
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- This simple test is necessary because I had a bug
+-- with halting stream after rollback
+stream_1:begin()
+stream_1:commit()
+stream_2:begin()
+stream_2:commit()
+conn:close()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+conn:close()
+
+test_run:switch("test")
+-- Empty selects, transaction was rollback
+s1:select()
+s2:select()
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+for i = 1, 1000 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select was empty, transaction rollbacked
+s1:select()
+s2:select()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Same test, but now we check that if `commit` was received
+-- by server before connection closed, we processed it successful.
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+test_run:cmd("setopt delimiter ';'")
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+_ = stream_1:commit({is_async = true})
+_ = stream_2:commit({is_async = true})
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select return tuples from [1] to [100],
+-- transaction was commit
+rc1 = s1:select()
+rc2 = s2:select()
+assert(#rc1)
+assert(#rc2)
+s1:truncate()
+s2:truncate()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both selects are empty: spaces were truncated, nothing written since
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. During my work on the
+-- patch, I saw that all requests in a stream come with
+-- header->is_commit == true, so if we are in a transaction
+-- in a stream we should set this value to false, otherwise
+-- during recovery `wal_stream_apply_dml_row` fails because
+-- of an LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+
+stream_1:begin()
+stream_2:begin()
+space_1:replace({1})
+space_1:replace({2})
+space_2:replace({1})
+space_2:replace({2})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+s1:select{}
+-- Here we get two tuples, commit was successful
+s2:select{}
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+test_run:cmd("start server test with args='1, true'")
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+box.space.test_1:drop()
+box.space.test_2:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
+-- Same transactions checks for async mode
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+space_1 = stream_1.space.test_1
+stream_2 = conn:new_stream()
+space_2 = stream_2.space.test_2
+
+memtx_futures = {}
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+
+vinyl_futures = {}
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s1:select()
+s2:select()
+test_run:switch('default')
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+
+memtx_results = wait_and_return_results(memtx_futures)
+vinyl_results = wait_and_return_results(vinyl_futures)
+-- If begin was successful it return nil
+assert(not memtx_results["begin"])
+assert(not vinyl_results["begin"])
+-- [1]
+assert(memtx_results["replace"])
+assert(vinyl_results["replace"])
+-- [2]
+assert(memtx_results["insert"])
+assert(vinyl_results["insert"])
+-- [1] [2]
+assert(memtx_results["select"])
+assert(vinyl_results["select"])
+-- If commit was successful it return nil
+assert(not memtx_results["commit"])
+assert(not vinyl_results["commit"])
+
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+
+futures_1 = {}
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+
+results_1 = wait_and_return_results(futures_1)
+-- Successful begin return nil
+assert(not results_1["begin_1"])
+assert(not results_1["begin_2"])
+-- []
+assert(not results_1["select_1_1"][1])
+assert(not results_1["select_1_2"][1])
+-- [1]
+assert(results_1["replace_1_1"][1])
+-- [1]
+assert(results_1["replace_1_1"][2])
+-- [1]
+assert(results_1["replace_1_2"][1])
+-- [2]
+assert(results_1["replace_1_2"][2])
+-- Successful commit return nil
+assert(not results_1["commit_1"])
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_1_2_A"][1]
+
+futures_2 = {}
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+
+results_2 = wait_and_return_results(futures_2)
+-- Successful begin return nil
+assert(not results_2["begin_1"])
+assert(not results_2["begin_2"])
+-- []
+assert(not results_2["select_2_1"][1])
+assert(not results_2["select_2_2"][1])
+-- [1]
+assert(results_2["replace_2_1"][1])
+-- [1]
+assert(results_2["replace_2_1"][2])
+-- [1]
+assert(results_2["replace_2_2"][1])
+-- [2]
+assert(results_2["replace_2_2"][2])
+-- Successful commit return nil
+assert(not results_2["commit_1"])
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_2["select_2_2_A"][1]
+
+test_run:switch('test')
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+function ping() return "pong" end
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+space_no_stream = conn.space.test
+
+-- successful begin using stream:call
+stream:call('box.begin')
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+stream:call('ping')
+stream:eval('ping()')
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+stream:eval('box.begin()')
+-- successful commit using stream:call
+stream:call('box.commit')
+
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+space:replace({1})
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Successful commit using stream:execute
+stream:execute('COMMIT')
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_no_stream:select{}
+test_run:switch("test")
+-- Select return tuple, because transaction was successful
+s:select()
+s:delete{1}
+test_run:switch('default')
+-- Check rollback using stream:call
+stream:begin()
+space:replace({2})
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Successful rollback using stream:call
+stream:call('box.rollback')
+-- Empty selects transaction rollbacked
+space:select({})
+space_no_stream:select{}
+test_run:switch("test")
+-- Empty select transaction rollbacked
+s:select()
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Simple test which demonstrates that a stream is immediately
+-- destroyed when there are no messages being processed in it and
+-- no active transaction.
+
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+for i = 1, 10 do space:replace{i} end
+test_run:switch("test")
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+s:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Transaction tests for sql iproto requests.
+-- All these functions are copy-pasted from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+function execute_sql_string(stream, sql_string)
+    -- Run the statement and drop whatever it returns: over the stream when
+    -- one is given, otherwise locally through box.execute(). An error raised
+    -- by stream:execute() is not caught here.
+    if not stream then box.execute(sql_string) return end
+    stream:execute(sql_string)
+end$
+function execute_sql_string_and_return_result(stream, sql_string)
+    if stream then
+        return pcall(stream.execute, stream, sql_string)
+    else
+        return box.execute(sql_string)
+    end
+end$
+function monster_ddl(stream)
+    local _, err1, err2, err3, err4, err5, err6
+    local stream_or_box = stream or box
+    execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER);]])
+    execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER UNIQUE,
+                                                 CONSTRAINT ck1
+                                                 CHECK(b < 100));]])
+
+    execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+    execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+    execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+                                                          KEY, a INTEGER);]])
+
+    execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+    execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+                                 CHECK(b > 0);]])
+
+    _, err1 =
+        execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+                                                       RENAME TO t1;]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+                                 ck2 CHECK(a > 0);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+                                                       INTEGER PRIMARY KEY);]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+
+    execute_sql_string(stream, [[CREATE TABLE
+                                 trigger_catcher(id INTEGER PRIMARY
+                                                 KEY AUTOINCREMENT);]])
+
+    execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+    execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+                                 t1 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+                                 t2 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+                                                       ON t1(a, b);]])
+
+    execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+    _, err5 =
+        execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+    _, err6 =
+        execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+                                                       t_does_not_exist;]])
+
+    execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+    return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+            err5, err6}
+end$
+function monster_ddl_cmp_res(res1, res2)
+    if json.encode(res1) == json.encode(res2) then
+        return true
+    end
+    return res1, res2
+end$
+function monster_ddl_is_clean(stream)
+    local stream_or_box = stream or box
+    assert(stream_or_box.space.T1 == nil)
+    assert(stream_or_box.space.T2 == nil)
+    assert(stream_or_box.space._trigger:count() == 0)
+    assert(stream_or_box.space._fk_constraint:count() == 0)
+    assert(stream_or_box.space._ck_constraint:count() == 0)
+    assert(stream_or_box.space.T_RENAMED == nil)
+    assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+function monster_ddl_check(stream)
+    local _, err1, err2, err3, err4, res
+    local stream_or_box = stream or box
+    _, err1 =
+       execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                      VALUES (1, 1, 101)]])
+    execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                       VALUES(2, 2, 1)]])
+    _, err3 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, 20, 1)]])
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, -1, 1)]])
+    execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+    if not stream then
+        assert(stream_or_box.space.T_RENAMED ~= nil)
+        assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+        assert(stream_or_box.space.T_TO_RENAME == nil)
+        res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                             trigger_catcher]])
+    else
+        _, res =
+            execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                           trigger_catcher]])
+    end
+    return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+            err3, err4, res}
+end$
+function monster_ddl_clear(stream)
+    execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+test_run:cmd("setopt delimiter ''")$
+
+test_run:cmd("start server test with args='10, true'")
+test_run:switch('test')
+test_run:cmd("setopt delimiter '$'")
+function monster_ddl_is_clean()
+    if not (box.space.T1 == nil) or
+       not (box.space.T2 == nil) or
+       not (box.space._trigger:count() == 0) or
+       not (box.space._fk_constraint:count() == 0) or
+       not (box.space._ck_constraint:count() == 0) or
+       not (box.space.T_RENAMED == nil) or
+       not (box.space.T_TO_RENAME == nil) then
+           return false
+    end
+    return true
+end$
+test_run:cmd("setopt delimiter ''")$
+test_run:switch('default')
+
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+
+-- No txn.
+true_ddl_res = monster_ddl()
+true_ddl_res
+
+true_check_res = monster_ddl_check()
+true_check_res
+
+monster_ddl_clear()
+monster_ddl_is_clean()
+
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+test_run:switch("test")
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+-- reload schema
+stream:ping()
+space = stream.space.TEST
+assert(space ~= nil)
+stream:execute('START TRANSACTION')
+space:replace{1, 2, '3'}
+space:select()
+-- select is empty, because transaction was not committed
+conn.space.TEST:select()
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- empty select, transaction was not committed
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:execute('COMMIT')
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:unprepare(stream_pr.stmt_id)
+conn:close()
+test_run:switch('test')
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+box.space.TEST:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/suite.ini b/test/box/suite.ini
index 637766cdd..369354eda 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
 disabled = rtree_errinj.test.lua tuple_bench.test.lua
 long_run = huge_field_map_long.test.lua
 config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua net.box_iproto_streams.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua net.box_iproto_streams.test.lua net.box_iproto_transactions_over_streams.test.lua
 lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
 use_unix_sockets = True
 use_unix_sockets_iproto = True
-- 
2.20.1



More information about the Tarantool-patches mailing list