[Tarantool-patches] [PATCH 7/7] net.box: add interactive transaction support in net.box

mechanik20051988 mechanik20051988 at tarantool.org
Thu Aug 5 21:17:45 MSK 2021


From: mechanik20051988 <mechanik20.05.1988 at gmail.com>

Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which allow the user to begin, commit and roll back a
transaction, respectively.

Closes #5860

@TarantoolBot document
Title: add interactive transaction support in net.box
Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which allow the user to begin, commit and roll back a
transaction, respectively. There are now multiple ways to begin, commit
and roll back a transaction from `net.box`: using the corresponding stream
methods, using the `call` or `eval` methods, or using the `execute` method
with SQL transaction syntax. The user can mix these methods, for example,
start a transaction using `stream:begin()` and commit it using
`stream:call('box.commit')` or `stream:execute('COMMIT')`.
Simple example of using interactive transactions via iproto from net.box:
```lua
stream = conn:new_stream()
space = stream.space.test
space_not_from_stream = conn.space.test

stream:begin()
space:replace({1})
-- Returns the previously inserted tuple, because this request
-- belongs to the transaction.
space:select({})
-- Empty select, because this request doesn't belong to the
-- transaction.
space_not_from_stream:select({})
stream:call('box.commit')
-- Now the transaction is committed, so all requests
-- return the tuple.
```
More examples of using streams can be found in
gh-5860-implement-streams-in-iproto.test.lua
---
 .../gh-5860-implement-streams-in-iproto.md    |   28 +
 src/box/lua/net_box.c                         |   51 +-
 src/box/lua/net_box.lua                       |   50 +-
 test/box/stream.result                        | 3036 +++++++++++++++++
 test/box/stream.test.lua                      | 1201 +++++++
 5 files changed, 4358 insertions(+), 8 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md

diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
new file mode 100644
index 000000000..d0f1359dd
--- /dev/null
+++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
@@ -0,0 +1,28 @@
+## feature/core
+
+* Streams and interactive transactions over streams are implemented
+  in iproto. A stream is associated with its ID, which is unique within
+  one connection. All requests with the same non-zero stream ID belong to
+  the same stream. All requests in a stream are processed synchronously:
+  the execution of the next request will not start until the previous one
+  is completed. If a request has a zero stream ID, it does not belong to
+  a stream and is processed in the old way.
+  In `net.box`, a stream is an object above the connection that has the
+  same methods, but allows executing requests sequentially. The ID is
+  generated on the client side in one of two ways: automatically or
+  manually. The user can choose either method, but cannot mix them. If
+  users write their own connector and want to use streams, they must
+  transmit stream_id over the iproto protocol.
+  The main purpose of streams is transactions via iproto. Each stream
+  can start its own transaction, so streams allow multiplexing several
+  transactions over one connection. There are multiple ways to begin,
+  commit and roll back a transaction: using the corresponding stream
+  methods, using the `call` or `eval` methods, or using the `execute`
+  method with SQL transaction syntax. The user can mix these methods, for
+  example, start a transaction using `stream:begin()` and commit it using
+  `stream:call('box.commit')` or `stream:execute('COMMIT')`.
+  If any request fails during the transaction, it will not affect the other
+  requests in the transaction. If a disconnect occurs while there is an active
+  transaction in a stream, this transaction will be rolled back if it has not
+  been committed before that moment.
+
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index ec850cd9f..e7b95ba84 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -70,7 +70,10 @@ enum netbox_method {
 	NETBOX_MIN         = 14,
 	NETBOX_MAX         = 15,
 	NETBOX_COUNT       = 16,
-	NETBOX_INJECT      = 17,
+	NETBOX_BEGIN       = 17,
+	NETBOX_COMMIT      = 18,
+	NETBOX_ROLLBACK    = 19,
+	NETBOX_INJECT      = 20,
 	netbox_method_MAX
 };
 
@@ -620,6 +623,46 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
 	netbox_encode_prepare(L, idx, stream, sync, stream_id);
 }
 
+static inline void
+netbox_encode_txn(lua_State *L, enum iproto_type type, int idx,
+		  struct mpstream *stream, uint64_t sync,
+		  uint64_t stream_id)
+{
+	(void)L;
+	(void) idx;
+	assert(type == IPROTO_TRANSACTION_BEGIN ||
+	       type == IPROTO_TRANSACTION_COMMIT ||
+	       type == IPROTO_TRANSACTION_ROLLBACK);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    type, stream_id);
+
+	netbox_encode_request(stream, svp);
+}
+
+static void
+netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
+		    uint64_t sync, uint64_t stream_id)
+{
+	return netbox_encode_txn(L, IPROTO_TRANSACTION_BEGIN, idx, stream,
+				 sync, stream_id);
+}
+
+static void
+netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
+		     uint64_t sync, uint64_t stream_id)
+{
+	return netbox_encode_txn(L, IPROTO_TRANSACTION_COMMIT, idx, stream,
+				 sync, stream_id);
+}
+
+static void
+netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
+		       uint64_t sync, uint64_t stream_id)
+{
+	return netbox_encode_txn(L, IPROTO_TRANSACTION_ROLLBACK, idx, stream,
+				 sync, stream_id);
+}
+
 static void
 netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
 		     uint64_t sync, uint64_t stream_id)
@@ -667,6 +710,9 @@ netbox_encode_method(struct lua_State *L)
 		[NETBOX_MIN]		= netbox_encode_select,
 		[NETBOX_MAX]		= netbox_encode_select,
 		[NETBOX_COUNT]		= netbox_encode_call,
+		[NETBOX_BEGIN]          = netbox_encode_begin,
+		[NETBOX_COMMIT]         = netbox_encode_commit,
+		[NETBOX_ROLLBACK]       = netbox_encode_rollback,
 		[NETBOX_INJECT]		= netbox_encode_inject,
 	};
 	enum netbox_method method = lua_tointeger(L, 1);
@@ -1047,6 +1093,9 @@ netbox_decode_method(struct lua_State *L)
 		[NETBOX_MIN]		= netbox_decode_tuple,
 		[NETBOX_MAX]		= netbox_decode_tuple,
 		[NETBOX_COUNT]		= netbox_decode_value,
+		[NETBOX_BEGIN]          = netbox_decode_nil,
+		[NETBOX_COMMIT]         = netbox_decode_nil,
+		[NETBOX_ROLLBACK]       = netbox_decode_nil,
 		[NETBOX_INJECT]		= netbox_decode_table,
 	};
 	enum netbox_method method = lua_tointeger(L, 1);
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index bf6a89e15..199d78127 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -70,8 +70,11 @@ local M_GET         = 13
 local M_MIN         = 14
 local M_MAX         = 15
 local M_COUNT       = 16
+local M_BEGIN       = 17
+local M_COMMIT      = 18
+local M_ROLLBACK    = 19
 -- Injects raw data into connection. Used by console and tests.
-local M_INJECT      = 17
+local M_INJECT      = 20
 
 ffi.cdef[[
 struct error *
@@ -1167,16 +1170,52 @@ local function check_eval_args(args)
     end
 end
 
+local function nothing_or_data(value)
+    if value ~= nil then
+        return value
+    end
+end
+
 function stream_methods:new_stream()
     check_remote_arg(self, 'stream')
     box.error(E_PROC_LUA, "Unsupported for stream");
 end
 
+function stream_methods:begin(opts)
+    check_remote_arg(self, 'begin')
+    local res = self:_request(M_BEGIN, opts, nil, self._stream_id)
+    if type(res) ~= 'table' or opts and opts.is_async then
+        return nothing_or_data(res)
+    end
+    return unpack(res)
+end
+
+function stream_methods:commit(opts)
+    check_remote_arg(self, 'commit')
+    local res = self:_request(M_COMMIT, opts, nil, self._stream_id)
+    if type(res) ~= 'table' or opts and opts.is_async then
+        return nothing_or_data(res)
+    end
+    return unpack(res)
+end
+
+function stream_methods:rollback(opts)
+    check_remote_arg(self, 'rollback')
+    local res = self:_request(M_ROLLBACK, opts, nil, self._stream_id)
+    if type(res) ~= 'table' or opts and opts.is_async then
+        return nothing_or_data(res)
+    end
+    return unpack(res)
+end
+
 function remote_methods:new_stream()
     check_remote_arg(self, 'stream')
     self._last_stream_id = self._last_stream_id + 1
     local stream = setmetatable({
         new_stream = stream_methods.new_stream,
+        begin = stream_methods.begin,
+        commit = stream_methods.commit,
+        rollback = stream_methods.rollback,
         _stream_id = self._last_stream_id,
         space = setmetatable({
             _space = {},
@@ -1498,12 +1537,6 @@ function console_methods:eval(line, timeout)
     return res[1] or res
 end
 
-local function nothing_or_data(value)
-    if value ~= nil then
-        return value
-    end
-end
-
 space_metatable = function(remote)
     local methods = {}
 
@@ -1662,6 +1695,9 @@ local this_module = {
         min         = M_MIN,
         max         = M_MAX,
         count       = M_COUNT,
+        begin       = M_BEGIN,
+        commit      = M_COMMIT,
+        rollback    = M_ROLLBACK,
         inject      = M_INJECT,
     }
 }
diff --git a/test/box/stream.result b/test/box/stream.result
index bfcf6c6be..609ce8100 100644
--- a/test/box/stream.result
+++ b/test/box/stream.result
@@ -3,9 +3,15 @@
 net_box = require('net.box')
  | ---
  | ...
+json = require('json')
+ | ---
+ | ...
 fiber = require('fiber')
  | ---
  | ...
+msgpack = require('msgpack')
+ | ---
+ | ...
 test_run = require('test_run').new()
  | ---
  | ...
@@ -95,6 +101,145 @@ stream:new_stream()
  | ---
  | - error: Unsupported for stream
  | ...
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+ | ---
+ | ...
+conn_2 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1_1 = conn_1:new_stream()
+ | ---
+ | ...
+stream_1_2 = conn_1:new_stream()
+ | ---
+ | ...
+stream_2 = conn_2:new_stream()
+ | ---
+ | ...
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+ | ---
+ | ...
+stream_1_1:rollback()
+ | ---
+ | ...
+
+stream_1_1:begin()
+ | ---
+ | ...
+-- Error unable to start second transaction in one stream
+stream_1_1:begin()
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+ | ---
+ | ...
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+ | ---
+ | ...
+box.commit()
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+stream_1_1:commit()
+ | ---
+ | ...
+stream_1_2:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+ | ---
+ | - error: Unable to process this type (14) of requests out of stream
+ | ...
+conn:_request(net_box._method.commit, nil, nil, nil)
+ | ---
+ | - error: Unable to process this type (15) of requests out of stream
+ | ...
+conn:_request(net_box._method.rollback, nil, nil, nil)
+ | ---
+ | - error: Unable to process this type (16) of requests out of stream
+ | ...
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+ | ---
+ | ...
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+ | ---
+ | ...
+IPROTO_REQUEST_TYPE       = 0x00
+ | ---
+ | ...
+IPROTO_SYNC               = 0x01
+ | ---
+ | ...
+IPROTO_AUTH               = 7
+ | ---
+ | ...
+IPROTO_STREAM_ID          = 0x0a
+ | ---
+ | ...
+next_request_id           = 9
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+header = msgpack.encode({
+    [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+    [IPROTO_SYNC] = next_request_id,
+    [IPROTO_STREAM_ID] = 1,
+});
+ | ---
+ | ...
+body = msgpack.encode({nil});
+ | ---
+ | ...
+size = msgpack.encode(header:len() + body:len());
+ | ---
+ | ...
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+                                nil, nil, nil, nil,
+                                size .. header .. body);
+ | ---
+ | - null
+ | - Unable to process this type (7) of requests in stream
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
 conn:close()
  | ---
  | ...
@@ -543,6 +688,2897 @@ test_run:cmd("stop server test")
  | - true
  | ...
 
+-- Second argument (false is a value for memtx_use_mvcc_engine option)
+-- Server start without active transaction manager, so all transaction
+-- fails because of yeild!
+test_run:cmd("start server test with args='10, false'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+ | ---
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+space:replace({1})
+ | ---
+ | - [1]
+ | ...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+ | ---
+ | - []
+ | ...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+ | ---
+ | - []
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:commit()
+ | ---
+ | - error: Transaction has been aborted by a fiber yield
+ | ...
+-- Select is empty, transaction was aborted
+space:select{}
+ | ---
+ | - []
+ | ...
+-- Check that after failed transaction commit we able to start next
+-- transaction (it's strange check, but it's necessary because it was
+-- bug with it)
+stream:begin()
+ | ---
+ | ...
+stream:ping()
+ | ---
+ | - true
+ | ...
+stream:commit()
+ | ---
+ | ...
+-- Same checks for `call` end `eval` functions.
+stream:call('box.begin')
+ | ---
+ | ...
+stream:call('s:replace', {{1}})
+ | ---
+ | - [1]
+ | ...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+ | ---
+ | - []
+ | ...
+stream:call('s:select', {})
+ | ---
+ | - []
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+ | ---
+ | - error: Transaction has been aborted by a fiber yield
+ | ...
+-- Select is empty, transaction was aborted
+space:select{}
+ | ---
+ | - []
+ | ...
+
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+stream:call('s:replace', {{1}})
+ | ---
+ | - [1]
+ | ...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+ | ---
+ | - []
+ | ...
+stream:call('s:select', {})
+ | ---
+ | - []
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+ | ---
+ | - error: Transaction has been aborted by a fiber yield
+ | ...
+-- Select is empty, transaction was aborted
+space:select{}
+ | ---
+ | - []
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+s:drop()
+ | ---
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+stream:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fails,
+-- as we can see before!
+
+-- Second argument (true is a value for memtx_use_mvcc_engine option)
+-- Same test case as previous but server start with active transaction
+-- manager. Also check vinyl, because it's behaviour is same.
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+-- Spaces getting from connection, not from stream has no stream_id
+-- and not belongs to stream
+space_1_no_stream = conn.space.test_1
+ | ---
+ | ...
+space_2_no_stream = conn.space.test_2
+ | ---
+ | ...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and to vinyl:
+-- behaviour is same!
+stream_1:begin()
+ | ---
+ | ...
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+ | ---
+ | - []
+ | ...
+space_2_no_stream:select{}
+ | ---
+ | - []
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+ | ---
+ | - - [1]
+ | ...
+space_2:select({})
+ | ---
+ | - - [1]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Commit was successful, transaction can yeild with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yeild also.
+stream_1:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+ | ---
+ | - - [1]
+ | ...
+space_2:select{}
+ | ---
+ | - - [1]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+ | ---
+ | - - [1]
+ | ...
+s2:select()
+ | ---
+ | - - [1]
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_1_2 = stream_2.space.test_1
+ | ---
+ | ...
+space_2_1 = stream_1.space.test_2
+ | ---
+ | ...
+space_2_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+
+-- Simple read/write conflict.
+space_1_1:select({1})
+ | ---
+ | - []
+ | ...
+space_1_2:select({1})
+ | ---
+ | - []
+ | ...
+space_1_1:replace({1, 1})
+ | ---
+ | - [1, 1]
+ | ...
+space_1_2:replace({1, 2})
+ | ---
+ | - [1, 2]
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+-- This transaction fails, because of conflict
+stream_2:commit()
+ | ---
+ | - error: Transaction has been aborted by conflict
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+-- Here we must accept [1, 1]
+space_1_1:select({})
+ | ---
+ | - - [1, 1]
+ | ...
+space_1_2:select({})
+ | ---
+ | - - [1, 1]
+ | ...
+
+-- Same test for vinyl sapce
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+space_2_1:select({1})
+ | ---
+ | - []
+ | ...
+space_2_2:select({1})
+ | ---
+ | - []
+ | ...
+space_2_1:replace({1, 1})
+ | ---
+ | - [1, 1]
+ | ...
+space_2_2:replace({1, 2})
+ | ---
+ | - [1, 2]
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+-- This transaction fails, because of conflict
+stream_2:commit()
+ | ---
+ | - error: Transaction has been aborted by conflict
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+-- Here we must accept [1, 1]
+space_2_1:select({})
+ | ---
+ | - - [1, 1]
+ | ...
+space_2_2:select({})
+ | ---
+ | - - [1, 1]
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+ | ---
+ | - - [1, 1]
+ | ...
+s2:select()
+ | ---
+ | - - [1, 1]
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+
+-- Test rollback for memtx space
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+ | ---
+ | - - [1]
+ | ...
+stream_1:rollback()
+ | ---
+ | ...
+-- Select is empty, transaction rollback
+space_1:select({})
+ | ---
+ | - []
+ | ...
+
+-- Test rollback for vinyl space
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+ | ---
+ | - - [1]
+ | ...
+stream_2:rollback()
+ | ---
+ | ...
+-- Select is empty, transaction rollback
+space_2:select({})
+ | ---
+ | - []
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+
+-- This is simple test is necessary because i have a bug
+-- with halting stream after rollback
+stream_1:begin()
+ | ---
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both select are empty, because transaction rollback
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_1:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Select return two previously inserted tuples
+space_1:select({})
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_2:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Select return two previously inserted tuples
+space_2:select({})
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+conn:close()
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Empty selects, transaction was rollback
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+-- Two empty selects
+space_1:select({})
+ | ---
+ | - []
+ | ...
+space_2:select({})
+ | ---
+ | - []
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both select are empty, because transaction rollback
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_1:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Select return two previously inserted tuples
+space_1:select({})
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_2:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Select return two previously inserted tuples
+space_2:select({})
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+for i = 1, 1000 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+fiber.sleep(0)
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that no streams or messages are left
+-- undeleted after the connection is closed
+errinj = box.error.injection
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+-- Both selects are empty: the transactions were rolled back
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Same test, but now we check that if `commit` is received by the
+-- server before the connection is closed, it is processed successfully.
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+_ = stream_1:commit({is_async = true})
+ | ---
+ | ...
+_ = stream_2:commit({is_async = true})
+ | ---
+ | ...
+fiber.sleep(0)
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that no streams or messages are left
+-- undeleted after the connection is closed
+errinj = box.error.injection
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+-- Selects return tuples from [1] to [100]:
+-- both transactions were committed
+rc1 = s1:select()
+ | ---
+ | ...
+rc2 = s2:select()
+ | ---
+ | ...
+assert(#rc1)
+ | ---
+ | - 100
+ | ...
+assert(#rc2)
+ | ---
+ | - 100
+ | ...
+s1:truncate()
+ | ---
+ | ...
+s2:truncate()
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+-- We can begin new transactions with the same stream_id, because
+-- the previous ones were completed and their streams destroyed.
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+-- Two empty selects
+space_1:select({})
+ | ---
+ | - []
+ | ...
+space_2:select({})
+ | ---
+ | - []
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both selects are empty: spaces were truncated, nothing was written since
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. While working on the
+-- patch, I saw that all requests in a stream come with
+-- header->is_commit == true, so if we are in a transaction
+-- in a stream we should set this value to false, otherwise
+-- `wal_stream_apply_dml_row` fails during recovery because
+-- of an LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_1:replace({2})
+ | ---
+ | - [2]
+ | ...
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_2:replace({2})
+ | ---
+ | - [2]
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Here we get two tuples, commit was successful
+s1:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+-- Here we get two tuples, commit was successful
+s2:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+-- Check that no streams or messages remain once both
+-- transactions are committed (connection is still open)
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("start server test with args='1, true'")
+ | ---
+ | - true
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+box.space.test_1:drop()
+ | ---
+ | ...
+box.space.test_2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Same transactions checks for async mode
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+
+memtx_futures = {}
+ | ---
+ | ...
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+ | ---
+ | ...
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+ | ---
+ | ...
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+ | ---
+ | ...
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+ | ---
+ | ...
+
+vinyl_futures = {}
+ | ---
+ | ...
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+ | ---
+ | ...
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+ | ---
+ | ...
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+ | ---
+ | ...
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+ | ---
+ | ...
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+ | ---
+ | ...
+
+memtx_results = wait_and_return_results(memtx_futures)
+ | ---
+ | ...
+vinyl_results = wait_and_return_results(vinyl_futures)
+ | ---
+ | ...
+-- If begin was successful it return nil
+assert(not memtx_results["begin"])
+ | ---
+ | - true
+ | ...
+assert(not vinyl_results["begin"])
+ | ---
+ | - true
+ | ...
+-- [1]
+assert(memtx_results["replace"])
+ | ---
+ | - [1]
+ | ...
+assert(vinyl_results["replace"])
+ | ---
+ | - [1]
+ | ...
+-- [2]
+assert(memtx_results["insert"])
+ | ---
+ | - [2]
+ | ...
+assert(vinyl_results["insert"])
+ | ---
+ | - [2]
+ | ...
+-- [1] [2]
+assert(memtx_results["select"])
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+assert(vinyl_results["select"])
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+-- If commit was successful it return nil
+assert(not memtx_results["commit"])
+ | ---
+ | - true
+ | ...
+assert(not vinyl_results["commit"])
+ | ---
+ | - true
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+s2:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check conflict resolution in stream transactions.
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_1_2 = stream_2.space.test_1
+ | ---
+ | ...
+space_2_1 = stream_1.space.test_2
+ | ---
+ | ...
+space_2_2 = stream_2.space.test_2
+ | ---
+ | ...
+
+futures_1 = {}
+ | ---
+ | ...
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+ | ---
+ | ...
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+ | ---
+ | ...
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+ | ---
+ | ...
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+ | ---
+ | ...
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+ | ---
+ | ...
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+ | ---
+ | ...
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+ | ---
+ | ...
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+ | ---
+ | ...
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+ | ---
+ | ...
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+ | ---
+ | ...
+
+results_1 = wait_and_return_results(futures_1)
+ | ---
+ | ...
+-- Successful begin return nil
+assert(not results_1["begin_1"])
+ | ---
+ | - true
+ | ...
+assert(not results_1["begin_2"])
+ | ---
+ | - true
+ | ...
+-- []
+assert(not results_1["select_1_1"][1])
+ | ---
+ | - true
+ | ...
+assert(not results_1["select_1_2"][1])
+ | ---
+ | - true
+ | ...
+-- [1]
+assert(results_1["replace_1_1"][1])
+ | ---
+ | - 1
+ | ...
+-- [1]
+assert(results_1["replace_1_1"][2])
+ | ---
+ | - 1
+ | ...
+-- [1]
+assert(results_1["replace_1_2"][1])
+ | ---
+ | - 1
+ | ...
+-- [2]
+assert(results_1["replace_1_2"][2])
+ | ---
+ | - 2
+ | ...
+-- Successful commit return nil
+assert(not results_1["commit_1"])
+ | ---
+ | - true
+ | ...
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+ | ---
+ | - Transaction has been aborted by conflict
+ | ...
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+ | ---
+ | - [1, 1]
+ | ...
+-- commit_1 could have ended before commit_2, so
+-- here we can get either an empty select or [1, 1]
+-- for results_1["select_1_2_A"][1]
+
+futures_2 = {}
+ | ---
+ | ...
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+ | ---
+ | ...
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+ | ---
+ | ...
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+ | ---
+ | ...
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+ | ---
+ | ...
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+ | ---
+ | ...
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+ | ---
+ | ...
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+ | ---
+ | ...
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+ | ---
+ | ...
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+ | ---
+ | ...
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+ | ---
+ | ...
+
+results_2 = wait_and_return_results(futures_2)
+ | ---
+ | ...
+-- Successful begin return nil
+assert(not results_2["begin_1"])
+ | ---
+ | - true
+ | ...
+assert(not results_2["begin_2"])
+ | ---
+ | - true
+ | ...
+-- []
+assert(not results_2["select_2_1"][1])
+ | ---
+ | - true
+ | ...
+assert(not results_2["select_2_2"][1])
+ | ---
+ | - true
+ | ...
+-- [1]
+assert(results_2["replace_2_1"][1])
+ | ---
+ | - 1
+ | ...
+-- [1]
+assert(results_2["replace_2_1"][2])
+ | ---
+ | - 1
+ | ...
+-- [1]
+assert(results_2["replace_2_2"][1])
+ | ---
+ | - 1
+ | ...
+-- [2]
+assert(results_2["replace_2_2"][2])
+ | ---
+ | - 2
+ | ...
+-- Successful commit return nil
+assert(not results_2["commit_1"])
+ | ---
+ | - true
+ | ...
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+ | ---
+ | - Transaction has been aborted by conflict
+ | ...
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+ | ---
+ | - [1, 1]
+ | ...
+-- commit_1 could have ended before commit_2, so
+-- here we can get either an empty select or [1, 1]
+-- for results_2["select_2_2_A"][1]
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both selects return tuple [1, 1], transactions committed
+s1:select()
+ | ---
+ | - - [1, 1]
+ | ...
+s2:select()
+ | ---
+ | - - [1, 1]
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+function ping() return "pong" end
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+space_no_stream = conn.space.test
+ | ---
+ | ...
+
+-- successful begin using stream:call
+stream:call('box.begin')
+ | ---
+ | ...
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+stream:call('ping')
+ | ---
+ | - pong
+ | ...
+stream:eval('ping()')
+ | ---
+ | ...
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+stream:eval('box.begin()')
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+-- successful commit using stream:call
+stream:call('box.commit')
+ | ---
+ | ...
+
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+ | ---
+ | ...
+space:replace({1})
+ | ---
+ | - [1]
+ | ...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+ | ---
+ | - []
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+ | ---
+ | - - [1]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+--Successful commit using stream:execute
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_no_stream:select{}
+ | ---
+ | - - [1]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select return tuple, because transaction was successful
+s:select()
+ | ---
+ | - - [1]
+ | ...
+s:delete{1}
+ | ---
+ | - [1]
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Check rollback using stream:call
+stream:begin()
+ | ---
+ | ...
+space:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+ | ---
+ | - []
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+ | ---
+ | - - [2]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+--Successful rollback using stream:call
+stream:call('box.rollback')
+ | ---
+ | ...
+-- Empty selects transaction rollbacked
+space:select({})
+ | ---
+ | - []
+ | ...
+space_no_stream:select{}
+ | ---
+ | - []
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Empty select transaction rollbacked
+s:select()
+ | ---
+ | - []
+ | ...
+s:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Simple test which demonstrates that a stream is immediately
+-- destroyed when there are no messages being processed in it
+-- and no active transaction.
+
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+for i = 1, 10 do space:replace{i} end
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no transaction was started.
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+s:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Transaction tests for sql iproto requests.
+-- All these functions are copied from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+ | ---
+ | - true
+ | ...
+function execute_sql_string(stream, sql_string)
+    if stream then
+        stream:execute(sql_string)
+    else
+        box.execute(sql_string)
+    end
+end$
+ | ---
+ | ...
+function execute_sql_string_and_return_result(stream, sql_string)
+    if stream then
+        return pcall(stream.execute, stream, sql_string)
+    else
+        return box.execute(sql_string)
+    end
+end$
+ | ---
+ | ...
+function monster_ddl(stream)
+    local _, err1, err2, err3, err4, err5, err6
+    local stream_or_box = stream or box
+    execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER);]])
+    execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER UNIQUE,
+                                                 CONSTRAINT ck1
+                                                 CHECK(b < 100));]])
+
+    execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+    execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+    execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+                                                          KEY, a INTEGER);]])
+
+    execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+    execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+                                 CHECK(b > 0);]])
+
+    _, err1 =
+        execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+                                                       RENAME TO t1;]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+                                 ck2 CHECK(a > 0);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+                                                       INTEGER PRIMARY KEY);]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+
+    execute_sql_string(stream, [[CREATE TABLE
+                                 trigger_catcher(id INTEGER PRIMARY
+                                                 KEY AUTOINCREMENT);]])
+
+    execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+    execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+                                 t1 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+                                 t2 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+                                                       ON t1(a, b);]])
+
+    execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+    _, err5 =
+        execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+    _, err6 =
+        execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+                                                       t_does_not_exist;]])
+
+    execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+    return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+            err5, err6}
+end$
+ | ---
+ | ...
+function monster_ddl_cmp_res(res1, res2)
+    if json.encode(res1) == json.encode(res2) then
+        return true
+    end
+    return res1, res2
+end$
+ | ---
+ | ...
+function monster_ddl_is_clean(stream)
+    local stream_or_box = stream or box
+    assert(stream_or_box.space.T1 == nil)
+    assert(stream_or_box.space.T2 == nil)
+    assert(stream_or_box.space._trigger:count() == 0)
+    assert(stream_or_box.space._fk_constraint:count() == 0)
+    assert(stream_or_box.space._ck_constraint:count() == 0)
+    assert(stream_or_box.space.T_RENAMED == nil)
+    assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+ | ---
+ | ...
+function monster_ddl_check(stream)
+    local _, err1, err2, err3, err4, res
+    local stream_or_box = stream or box
+    _, err1 =
+       execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                      VALUES (1, 1, 101)]])
+    execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                       VALUES(2, 2, 1)]])
+    _, err3 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, 20, 1)]])
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, -1, 1)]])
+    execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+    if not stream then
+        assert(stream_or_box.space.T_RENAMED ~= nil)
+        assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+        assert(stream_or_box.space.T_TO_RENAME == nil)
+        res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                             trigger_catcher]])
+    else
+        _, res =
+            execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                           trigger_catcher]])
+    end
+    return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+            err3, err4, res}
+end$
+ | ---
+ | ...
+function monster_ddl_clear(stream)
+    execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''")$
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter '$'")
+ | ---
+ | - true
+ | ...
+function monster_ddl_is_clean()
+    if not (box.space.T1 == nil) or
+       not (box.space.T2 == nil) or
+       not (box.space._trigger:count() == 0) or
+       not (box.space._fk_constraint:count() == 0) or
+       not (box.space._ck_constraint:count() == 0) or
+       not (box.space.T_RENAMED == nil) or
+       not (box.space.T_TO_RENAME == nil) then
+           return false
+    end
+    return true
+end$
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''")$
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+
+-- No txn.
+true_ddl_res = monster_ddl()
+ | ---
+ | ...
+true_ddl_res
+ | ---
+ | - - 'Finished ok, errors in the middle: '
+ |   - Space 'T1' already exists
+ |   - Space 'T1' already exists
+ |   - Space 'T3' does not exist
+ |   - Index 'T1A' already exists in space 'T1'
+ |   - 'Failed to execute SQL statement: can not truncate space ''T2'' because other
+ |     objects depend on it'
+ |   - Space 'T_DOES_NOT_EXIST' does not exist
+ | ...
+
+true_check_res = monster_ddl_check()
+ | ---
+ | ...
+true_check_res
+ | ---
+ | - - 'Finished ok, errors and trigger catcher content: '
+ |   - 'Check constraint failed ''CK1'': b < 100'
+ |   - Duplicate key exists in unique index "unique_unnamed_T2_2" in space "T2" with
+ |     old tuple - [1, 1, 1] and new tuple - [2, 2, 1]
+ |   - 'Failed to execute SQL statement: FOREIGN KEY constraint failed'
+ |   - 'Check constraint failed ''CK2'': a > 0'
+ |   - metadata:
+ |     - name: ID
+ |       type: integer
+ |     rows:
+ |     - [1]
+ | ...
+
+monster_ddl_clear()
+ | ---
+ | ...
+monster_ddl_is_clean()
+ | ---
+ | ...
+
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+ | ---
+ | ...
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+ddl_res = monster_ddl(stream)
+ | ---
+ | ...
+monster_ddl_clear(stream)
+ | ---
+ | ...
+stream:call('monster_ddl_is_clean')
+ | ---
+ | - true
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+ | ---
+ | - true
+ | ...
+
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+ddl_res = monster_ddl(stream)
+ | ---
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+ | ---
+ | - true
+ | ...
+
+check_res = monster_ddl_check(stream)
+ | ---
+ | ...
+monster_ddl_cmp_res(check_res, true_check_res)
+ | ---
+ | - true
+ | ...
+
+monster_ddl_clear(stream)
+ | ---
+ | ...
+stream:call('monster_ddl_is_clean')
+ | ---
+ | - true
+ | ...
+
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+ | ---
+ | ...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+ | ---
+ | - true
+ | ...
+
+check_res = monster_ddl_check(stream)
+ | ---
+ | ...
+monster_ddl_cmp_res(check_res, true_check_res)
+ | ---
+ | - true
+ | ...
+
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_clear(stream)
+ | ---
+ | ...
+stream:call('monster_ddl_is_clean')
+ | ---
+ | - true
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+ddl_res = monster_ddl(stream)
+ | ---
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+ | ---
+ | - true
+ | ...
+
+check_res = monster_ddl_check(stream)
+ | ---
+ | ...
+monster_ddl_cmp_res(check_res, true_check_res)
+ | ---
+ | - true
+ | ...
+
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_clear(stream)
+ | ---
+ | ...
+stream:call('monster_ddl_is_clean')
+ | ---
+ | - true
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- All messages was processed, so stream object was immediately
+-- deleted, because no active transaction started.
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+ | ---
+ | - row_count: 1
+ | ...
+-- reload schema
+stream:ping()
+ | ---
+ | - true
+ | ...
+space = stream.space.TEST
+ | ---
+ | ...
+assert(space ~= nil)
+ | ---
+ | - true
+ | ...
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+space:replace{1, 2, '3'}
+ | ---
+ | - [1, 2, '3']
+ | ...
+space:select()
+ | ---
+ | - - [1, 2, '3']
+ | ...
+-- select is empty, because transaction was not commited
+conn.space.TEST:select()
+ | ---
+ | - []
+ | ...
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+ | ---
+ | ...
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+ | ---
+ | ...
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+ | ---
+ | - true
+ | ...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+ | ---
+ | - metadata:
+ |   - name: ID
+ |     type: integer
+ |   - name: A
+ |     type: number
+ |   - name: B
+ |     type: string
+ |   rows:
+ |   - [1, 2, '3']
+ | ...
+-- empty select, transaction was not commited
+conn:execute(conn_pr.stmt_id, {1, 2})
+ | ---
+ | - metadata:
+ |   - name: ID
+ |     type: integer
+ |   - name: A
+ |     type: number
+ |   - name: B
+ |     type: string
+ |   rows: []
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+ | ---
+ | - metadata:
+ |   - name: ID
+ |     type: integer
+ |   - name: A
+ |     type: number
+ |   - name: B
+ |     type: string
+ |   rows:
+ |   - [1, 2, '3']
+ | ...
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+ | ---
+ | - metadata:
+ |   - name: ID
+ |     type: integer
+ |   - name: A
+ |     type: number
+ |   - name: B
+ |     type: string
+ |   rows:
+ |   - [1, 2, '3']
+ | ...
+stream:unprepare(stream_pr.stmt_id)
+ | ---
+ | - null
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+ | ---
+ | - - [1, 2, '3']
+ | ...
+box.space.TEST:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
 test_run:cmd("cleanup server test")
  | ---
  | - true
diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua
index 190f17d8e..b8bd2d327 100644
--- a/test/box/stream.test.lua
+++ b/test/box/stream.test.lua
@@ -1,6 +1,8 @@
 -- This test checks streams iplementation in iproto (gh-5860).
 net_box = require('net.box')
+json = require('json')
 fiber = require('fiber')
+msgpack = require('msgpack')
 test_run = require('test_run').new()
 
 test_run:cmd("create server test with script='box/stream.lua'")
@@ -45,6 +47,62 @@ conn = net_box.connect(server_addr)
 stream = conn:new_stream()
 -- Unsupported for stream
 stream:new_stream()
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+conn_2 = net_box.connect(server_addr)
+stream_1_1 = conn_1:new_stream()
+stream_1_2 = conn_1:new_stream()
+stream_2 = conn_2:new_stream()
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+stream_1_1:rollback()
+
+stream_1_1:begin()
+-- Error unable to start second transaction in one stream
+stream_1_1:begin()
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+test_run:switch("test")
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+box.commit()
+test_run:switch("default")
+stream_1_1:commit()
+stream_1_2:commit()
+stream_2:commit()
+
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+conn:_request(net_box._method.commit, nil, nil, nil)
+conn:_request(net_box._method.rollback, nil, nil, nil)
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+IPROTO_REQUEST_TYPE       = 0x00
+IPROTO_SYNC               = 0x01
+IPROTO_AUTH               = 7
+IPROTO_STREAM_ID          = 0x0a
+next_request_id           = 9
+test_run:cmd("setopt delimiter ';'")
+header = msgpack.encode({
+    [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+    [IPROTO_SYNC] = next_request_id,
+    [IPROTO_STREAM_ID] = 1,
+});
+body = msgpack.encode({nil});
+size = msgpack.encode(header:len() + body:len());
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+                                nil, nil, nil, nil,
+                                size .. header .. body);
+test_run:cmd("setopt delimiter ''");
 conn:close()
 
 -- Check that spaces in stream object updates, during reload_schema
@@ -203,5 +261,1148 @@ assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
 test_run:switch("default")
 test_run:cmd("stop server test")
 
+-- Second argument (false) is the value of the memtx_use_mvcc_engine option.
+-- The server starts without an active transaction manager, so all
+-- transactions fail because of yield!
+test_run:cmd("start server test with args='10, false'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+
+-- Check synchronous stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+test_run:switch('default')
+space:replace({1})
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Commit fails: the transaction yielded with memtx_use_mvcc_engine = false
+stream:commit()
+-- Select is empty, transaction was aborted
+space:select{}
+-- Check that after a failed transaction commit we are able to start the
+-- next transaction (a strange check, but it is necessary because there
+-- was a bug with it)
+stream:begin()
+stream:ping()
+stream:commit()
+-- Same checks for `call` and `eval` functions.
+stream:call('box.begin')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Commit fails: the transaction yielded with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+-- Select is empty, transaction was aborted
+space:select{}
+
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Commit fails: the transaction yielded with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+-- Select is empty, transaction was aborted
+space:select{}
+
+test_run:switch('test')
+s:drop()
+-- Check that there are no streams and messages, which
+-- was not deleted
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+stream:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fail,
+-- as we have seen above!
+
+-- Second argument (true) is the value of the memtx_use_mvcc_engine option.
+-- Same test case as the previous one, but the server starts with an active
+-- transaction manager. Also check vinyl, because its behaviour is the same.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s1:create_index('primary')
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- Spaces obtained from the connection, not from a stream, have no stream_id
+-- and do not belong to any stream
+space_1_no_stream = conn.space.test_1
+space_2_no_stream = conn.space.test_2
+-- Check synchronous stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and to vinyl:
+-- behaviour is same!
+stream_1:begin()
+space_1:replace({1})
+stream_2:begin()
+space_2:replace({1})
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+test_run:switch('default')
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+space_2_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+space_2:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s1:select()
+s2:select()
+test_run:switch('default')
+-- Commit was successful, transaction can yield with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yield also.
+stream_1:commit()
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+space_2:select{}
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Simple read/write conflict.
+space_1_1:select({1})
+space_1_2:select({1})
+space_1_1:replace({1, 1})
+space_1_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_1_1:select({})
+space_1_2:select({})
+
+-- Same test for vinyl space
+stream_1:begin()
+stream_2:begin()
+space_2_1:select({1})
+space_2_2:select({1})
+space_2_1:replace({1, 1})
+space_2_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_2_1:select({})
+space_2_2:select({})
+
+test_run:switch('test')
+-- Both select return tuple [1, 1], transaction committed
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Test rollback for memtx space
+space_1:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+stream_1:rollback()
+-- Select is empty, transaction rollback
+space_1:select({})
+
+-- Test rollback for vinyl space
+space_2:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+stream_2:rollback()
+-- Select is empty, transaction rollback
+space_2:select({})
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- This simple test is necessary because I had a bug
+-- with the stream halting after rollback
+stream_1:begin()
+stream_1:commit()
+stream_2:begin()
+stream_2:commit()
+conn:close()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+conn:close()
+
+test_run:switch("test")
+-- Empty selects, transaction was rollback
+s1:select()
+s2:select()
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with the same stream_id, because
+-- the previous ones were rolled back and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+for i = 1, 1000 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select is empty, the transaction was rolled back
+s1:select()
+s2:select()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Same test, but now we check that if `commit` was received by the
+-- server before the connection was closed, it is processed successfully.
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+test_run:cmd("setopt delimiter ';'")
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+_ = stream_1:commit({is_async = true})
+_ = stream_2:commit({is_async = true})
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select returns tuples from [1] to [100],
+-- the transaction was committed
+rc1 = s1:select()
+rc2 = s2:select()
+assert(#rc1)
+assert(#rc2)
+s1:truncate()
+s2:truncate()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with the same stream_id, because
+-- the previous ones were rolled back and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. During my work on the
+-- patch, I saw that all requests in a stream came with
+-- header->is_commit == true, so if we are in a transaction
+-- in a stream we should set this value to false, otherwise
+-- `wal_stream_apply_dml_row` fails during recovery because
+-- of an LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+
+stream_1:begin()
+stream_2:begin()
+space_1:replace({1})
+space_1:replace({2})
+space_2:replace({1})
+space_2:replace({2})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+s1:select{}
+-- Here we get two tuples, commit was successful
+s2:select{}
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+test_run:cmd("start server test with args='1, true'")
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+box.space.test_1:drop()
+box.space.test_2:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
+-- Same transactions checks for async mode
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+space_1 = stream_1.space.test_1
+stream_2 = conn:new_stream()
+space_2 = stream_2.space.test_2
+
+memtx_futures = {}
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+
+vinyl_futures = {}
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s1:select()
+s2:select()
+test_run:switch('default')
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+
+memtx_results = wait_and_return_results(memtx_futures)
+vinyl_results = wait_and_return_results(vinyl_futures)
+-- If begin was successful it return nil
+assert(not memtx_results["begin"])
+assert(not vinyl_results["begin"])
+-- [1]
+assert(memtx_results["replace"])
+assert(vinyl_results["replace"])
+-- [2]
+assert(memtx_results["insert"])
+assert(vinyl_results["insert"])
+-- [1] [2]
+assert(memtx_results["select"])
+assert(vinyl_results["select"])
+-- If commit was successful it return nil
+assert(not memtx_results["commit"])
+assert(not vinyl_results["commit"])
+
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+
+futures_1 = {}
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+
+results_1 = wait_and_return_results(futures_1)
+-- Successful begin return nil
+assert(not results_1["begin_1"])
+assert(not results_1["begin_2"])
+-- []
+assert(not results_1["select_1_1"][1])
+assert(not results_1["select_1_2"][1])
+-- [1]
+assert(results_1["replace_1_1"][1])
+-- [1]
+assert(results_1["replace_1_1"][2])
+-- [1]
+assert(results_1["replace_1_2"][1])
+-- [2]
+assert(results_1["replace_1_2"][2])
+-- Successful commit return nil
+assert(not results_1["commit_1"])
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_1_2_A"][1]
+
+futures_2 = {}
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+
+results_2 = wait_and_return_results(futures_2)
+-- Successful begin return nil
+assert(not results_2["begin_1"])
+assert(not results_2["begin_2"])
+-- []
+assert(not results_2["select_2_1"][1])
+assert(not results_2["select_2_2"][1])
+-- [1]
+assert(results_2["replace_2_1"][1])
+-- [1]
+assert(results_2["replace_2_1"][2])
+-- [1]
+assert(results_2["replace_2_2"][1])
+-- [2]
+assert(results_2["replace_2_2"][2])
+-- Successful commit return nil
+assert(not results_2["commit_1"])
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_2["select_2_2_A"][1]
+
+test_run:switch('test')
+-- Both selects return tuple [1, 1], transaction was committed
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+function ping() return "pong" end
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+space_no_stream = conn.space.test
+
+-- successful begin using stream:call
+stream:call('box.begin')
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+stream:call('ping')
+stream:eval('ping()')
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+stream:eval('box.begin()')
+-- successful commit using stream:call
+stream:call('box.commit')
+
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+space:replace({1})
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+--Successful commit using stream:execute
+stream:execute('COMMIT')
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_no_stream:select{}
+test_run:switch("test")
+-- Select return tuple, because transaction was successful
+s:select()
+s:delete{1}
+test_run:switch('default')
+-- Check rollback using stream:call
+stream:begin()
+space:replace({2})
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+--Successful rollback using stream:call
+stream:call('box.rollback')
+-- Both selects are empty, transaction was rolled back
+space:select({})
+space_no_stream:select{}
+test_run:switch("test")
+-- Select is empty, transaction was rolled back
+s:select()
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Simple test which demonstrates that a stream is destroyed
+-- immediately when it has no messages being processed and
+-- no active transaction.
+
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+for i = 1, 10 do space:replace{i} end
+test_run:switch("test")
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+s:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Transaction tests for sql iproto requests.
+-- All these functions are copy-pasted from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+-- Run one SQL statement and discard whatever it returns.
+-- With a stream the statement is sent through stream:execute(),
+-- otherwise it is run through box.execute().
+function execute_sql_string(stream, sql_string)
+    if not stream then
+        box.execute(sql_string)
+        return
+    end
+    stream:execute(sql_string)
+end$
+-- Run one SQL statement and hand its outcome back to the caller.
+-- With a stream the call is wrapped in pcall(), so the caller gets
+-- pcall's status first, followed by the result or the raised error.
+-- Without a stream the return values of box.execute() are passed
+-- through unchanged, so the meaning of the first returned value
+-- differs between the two modes.
+function execute_sql_string_and_return_result(stream, sql_string)
+    if not stream then
+        return box.execute(sql_string)
+    end
+    return pcall(stream.execute, stream, sql_string)
+end$
+-- Run a long sequence of DDL statements (tables, indexes,
+-- constraints, triggers, renames, truncates) either through the
+-- given stream or, when stream is nil, through box.execute().
+-- Statements run via execute_sql_string_and_return_result() have
+-- their second returned value captured in err1..err6 instead of
+-- being checked here.
+-- Returns a table: a marker string followed by the captured values.
+function monster_ddl(stream)
+    local _, err1, err2, err3, err4, err5, err6
+    execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER);]])
+    execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER UNIQUE,
+                                                 CONSTRAINT ck1
+                                                 CHECK(b < 100));]])
+
+    execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+    execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+    execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+                                                          KEY, a INTEGER);]])
+
+    execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+    execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+                                 CHECK(b > 0);]])
+
+    -- Rename onto the name of the t1 table created above; the
+    -- outcome is captured.
+    _, err1 =
+        execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+                                                       RENAME TO t1;]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+                                 ck2 CHECK(a > 0);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+    -- Second CREATE TABLE for t1; the outcome is captured.
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+                                                       INTEGER PRIMARY KEY);]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+
+    execute_sql_string(stream, [[CREATE TABLE
+                                 trigger_catcher(id INTEGER PRIMARY
+                                                 KEY AUTOINCREMENT);]])
+
+    execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+    execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+                                 t1 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    -- t3 is never created in this function; the outcome is captured.
+    _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+                                 t2 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    -- An index named t1a was already created on t1 above; the
+    -- outcome is captured.
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+                                                       ON t1(a, b);]])
+
+    execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+    -- NOTE(review): presumably rejected because fk1 on t1 references
+    -- t2 - confirm against the recorded result.
+    _, err5 =
+        execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+    _, err6 =
+        execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+                                                       t_does_not_exist;]])
+
+    execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+    return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+            err5, err6}
+end$
+-- Compare two result tables by their JSON encoding.
+-- Returns true when the encodings are equal; otherwise returns
+-- both arguments so that the difference shows up in the output.
+function monster_ddl_cmp_res(res1, res2)
+    local same = json.encode(res1) == json.encode(res2)
+    if not same then
+        return res1, res2
+    end
+    return true
+end$
+-- Assert that none of the T1, T2, T_RENAMED and T_TO_RENAME spaces
+-- exist and that _trigger, _fk_constraint and _ck_constraint are
+-- empty. Spaces are looked up via stream.space when a stream is
+-- given and via box.space otherwise.
+function monster_ddl_is_clean(stream)
+    local src = stream or box
+    assert(src.space.T1 == nil)
+    assert(src.space.T2 == nil)
+    assert(src.space._trigger:count() == 0)
+    assert(src.space._fk_constraint:count() == 0)
+    assert(src.space._ck_constraint:count() == 0)
+    assert(src.space.T_RENAMED == nil)
+    assert(src.space.T_TO_RENAME == nil)
+end$
+-- Run a series of INSERTs against t1 and t2, capturing the second
+-- returned value of the ones run via
+-- execute_sql_string_and_return_result(), then read the content
+-- of trigger_catcher.
+-- Without a stream the renamed space is additionally checked
+-- through box.space before trigger_catcher is read.
+-- Returns a table: a marker string, the four captured values and
+-- the trigger_catcher select result.
+function monster_ddl_check(stream)
+    local _, err1, err2, err3, err4, res
+    _, err1 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                      VALUES (1, 1, 101)]])
+    execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                       VALUES(2, 2, 1)]])
+    _, err3 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, 20, 1)]])
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, -1, 1)]])
+    execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+    if stream then
+        -- Stream mode goes through pcall(): the select result is the
+        -- second returned value.
+        _, res =
+            execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                           trigger_catcher]])
+    else
+        assert(box.space.T_RENAMED ~= nil)
+        assert(box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+        assert(box.space.T_TO_RENAME == nil)
+        -- Local mode: the first value returned by box.execute() is kept.
+        res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                             trigger_catcher]])
+    end
+    return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+            err3, err4, res}
+end$
+-- Drop the t1t trigger, the fk1 constraint on t1 and the tables
+-- trigger_catcher, t2, t1 and t_renamed. Statements are run in
+-- order through execute_sql_string().
+function monster_ddl_clear(stream)
+    local cleanup = {
+        'DROP TRIGGER IF EXISTS t1t;',
+        'DROP TABLE IF EXISTS trigger_catcher;',
+        'ALTER TABLE t1 DROP CONSTRAINT fk1;',
+        'DROP TABLE IF EXISTS t2',
+        'DROP TABLE IF EXISTS t1',
+        'DROP TABLE IF EXISTS t_renamed',
+    }
+    for _, sql in ipairs(cleanup) do
+        execute_sql_string(stream, sql)
+    end
+end$
+test_run:cmd("setopt delimiter ''")$
+
+test_run:cmd("start server test with args='10, true'")
+test_run:switch('test')
+test_run:cmd("setopt delimiter '$'")
+-- Variant defined on the 'test' server.
+-- Returns true when none of the T1, T2, T_RENAMED and T_TO_RENAME
+-- spaces exist and _trigger, _fk_constraint and _ck_constraint are
+-- empty; returns false otherwise.
+function monster_ddl_is_clean()
+    return box.space.T1 == nil and
+           box.space.T2 == nil and
+           box.space._trigger:count() == 0 and
+           box.space._fk_constraint:count() == 0 and
+           box.space._ck_constraint:count() == 0 and
+           box.space.T_RENAMED == nil and
+           box.space.T_TO_RENAME == nil
+end$
+test_run:cmd("setopt delimiter ''")$
+test_run:switch('default')
+
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+
+-- No txn.
+true_ddl_res = monster_ddl()
+true_ddl_res
+
+true_check_res = monster_ddl_check()
+true_check_res
+
+monster_ddl_clear()
+monster_ddl_is_clean()
+
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+test_run:switch("test")
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+-- reload schema
+stream:ping()
+space = stream.space.TEST
+assert(space ~= nil)
+stream:execute('START TRANSACTION')
+space:replace{1, 2, '3'}
+space:select()
+-- select is empty, because transaction was not committed
+conn.space.TEST:select()
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- empty select, transaction was not committed
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:execute('COMMIT')
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:unprepare(stream_pr.stmt_id)
+conn:close()
+test_run:switch('test')
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+box.space.TEST:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
 test_run:cmd("cleanup server test")
 test_run:cmd("delete server test")
-- 
2.20.1



More information about the Tarantool-patches mailing list