[Tarantool-patches] [PATCH 7/7] net.box: add interactive transaction support in net.box
mechanik20051988
mechanik20051988 at tarantool.org
Thu Aug 5 21:17:45 MSK 2021
From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which allow to begin, commit and roll back a transaction,
respectively.
Closes #5860
@TarantoolBot document
Title: add interactive transaction support in net.box
Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which allow to begin, commit and roll back a transaction,
respectively. Now there are multiple ways to begin, commit and roll back
a transaction from `net.box`: using the appropriate stream methods, using
the `call` or `eval` methods, or using the `execute` method with SQL
transaction syntax. The user can mix these methods, for example, start a
transaction using `stream:begin()` and commit it using
`stream:call('box.commit')` or `stream:execute('COMMIT')`.
Simple example of using interactive transactions via iproto from net.box:
```lua
stream = conn:new_stream()
space = stream.space.test
space_not_from_stream = conn.space.test
stream:begin()
space:replace({1})
-- returns the previously inserted tuple, because this request
-- belongs to the transaction.
space:select({})
-- empty select, because this select doesn't belong to
-- the transaction
space_not_from_stream:select({})
stream:call('box.commit')
-- now the transaction is committed, so all requests
-- return the tuple.
```
More examples of using streams can be found in
gh-5860-implement-streams-in-iproto.test.lua
---
.../gh-5860-implement-streams-in-iproto.md | 28 +
src/box/lua/net_box.c | 51 +-
src/box/lua/net_box.lua | 50 +-
test/box/stream.result | 3036 +++++++++++++++++
test/box/stream.test.lua | 1201 +++++++
5 files changed, 4358 insertions(+), 8 deletions(-)
create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
new file mode 100644
index 000000000..d0f1359dd
--- /dev/null
+++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
@@ -0,0 +1,28 @@
+## feature/core
+
+* Streams and interactive transactions over streams are implemented
+ in iproto. A stream is associated with its ID, which is unique within
+ one connection. All requests with the same non-zero stream ID belong to
+ the same stream. All requests in a stream are processed synchronously:
+ the execution of the next request will not start until the previous one
+ is completed. If a request has zero stream ID, it does not belong to a
+ stream and is processed in the old way.
+ In `net.box`, a stream is an object above a connection that has the same
+ methods, but allows executing requests sequentially. The ID is generated
+ on the client side in two ways: automatically or manually. The user can
+ choose either of the two methods, but cannot mix them. If users write
+ their own connector and want to use streams, they must transmit stream_id
+ over the iproto protocol.
+ The main purpose of streams is transactions via iproto. Each stream
+ can start its own transaction, so streams allow multiplexing several
+ transactions over one connection. There are multiple ways to begin,
+ commit and roll back a transaction: using the appropriate stream methods,
+ using the `call` or `eval` methods, or using the `execute` method with SQL
+ transaction syntax. The user can mix these methods, for example, start a
+ transaction using `stream:begin()` and commit it using
+ `stream:call('box.commit')` or `stream:execute('COMMIT')`.
+ If any request fails during the transaction, it will not affect the other
+ requests in the transaction. If a disconnect occurs while there is an
+ active transaction in a stream, this transaction will be rolled back,
+ unless it has been committed before that moment.
+
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index ec850cd9f..e7b95ba84 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -70,7 +70,10 @@ enum netbox_method {
NETBOX_MIN = 14,
NETBOX_MAX = 15,
NETBOX_COUNT = 16,
- NETBOX_INJECT = 17,
+ NETBOX_BEGIN = 17,
+ NETBOX_COMMIT = 18,
+ NETBOX_ROLLBACK = 19,
+ NETBOX_INJECT = 20,
netbox_method_MAX
};
@@ -620,6 +623,46 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
netbox_encode_prepare(L, idx, stream, sync, stream_id);
}
+/**
+ * Encode a transaction control request of the given type, which must be
+ * one of IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT or
+ * IPROTO_TRANSACTION_ROLLBACK. Nothing is written between
+ * netbox_prepare_request() and netbox_encode_request(), so no
+ * request-specific fields are added here. The L and idx arguments are
+ * not used.
+ */
+static inline void
+netbox_encode_txn(lua_State *L, enum iproto_type type, int idx,
+		  struct mpstream *stream, uint64_t sync,
+		  uint64_t stream_id)
+{
+	assert(type == IPROTO_TRANSACTION_BEGIN ||
+	       type == IPROTO_TRANSACTION_COMMIT ||
+	       type == IPROTO_TRANSACTION_ROLLBACK);
+	(void)idx;
+	(void)L;
+	size_t request_svp = netbox_prepare_request(stream, sync, type,
+						    stream_id);
+	netbox_encode_request(stream, request_svp);
+}
+
+/**
+ * Encode an IPROTO_TRANSACTION_BEGIN request. Thin wrapper around
+ * netbox_encode_txn() that has the common encoder signature, so it can
+ * be stored in the dispatch table of netbox_encode_method().
+ */
+static void
+netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
+		    uint64_t sync, uint64_t stream_id)
+{
+	/* Plain call: a void function must not return an expression. */
+	netbox_encode_txn(L, IPROTO_TRANSACTION_BEGIN, idx, stream, sync,
+			  stream_id);
+}
+
+/**
+ * Encode an IPROTO_TRANSACTION_COMMIT request. Thin wrapper around
+ * netbox_encode_txn() that has the common encoder signature, so it can
+ * be stored in the dispatch table of netbox_encode_method().
+ */
+static void
+netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
+		     uint64_t sync, uint64_t stream_id)
+{
+	/* Plain call: a void function must not return an expression. */
+	netbox_encode_txn(L, IPROTO_TRANSACTION_COMMIT, idx, stream, sync,
+			  stream_id);
+}
+
+/**
+ * Encode an IPROTO_TRANSACTION_ROLLBACK request. Thin wrapper around
+ * netbox_encode_txn() that has the common encoder signature, so it can
+ * be stored in the dispatch table of netbox_encode_method().
+ */
+static void
+netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
+		       uint64_t sync, uint64_t stream_id)
+{
+	/* Plain call: a void function must not return an expression. */
+	netbox_encode_txn(L, IPROTO_TRANSACTION_ROLLBACK, idx, stream, sync,
+			  stream_id);
+}
+
static void
netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
uint64_t sync, uint64_t stream_id)
@@ -667,6 +710,9 @@ netbox_encode_method(struct lua_State *L)
[NETBOX_MIN] = netbox_encode_select,
[NETBOX_MAX] = netbox_encode_select,
[NETBOX_COUNT] = netbox_encode_call,
+ [NETBOX_BEGIN] = netbox_encode_begin,
+ [NETBOX_COMMIT] = netbox_encode_commit,
+ [NETBOX_ROLLBACK] = netbox_encode_rollback,
[NETBOX_INJECT] = netbox_encode_inject,
};
enum netbox_method method = lua_tointeger(L, 1);
@@ -1047,6 +1093,9 @@ netbox_decode_method(struct lua_State *L)
[NETBOX_MIN] = netbox_decode_tuple,
[NETBOX_MAX] = netbox_decode_tuple,
[NETBOX_COUNT] = netbox_decode_value,
+ [NETBOX_BEGIN] = netbox_decode_nil,
+ [NETBOX_COMMIT] = netbox_decode_nil,
+ [NETBOX_ROLLBACK] = netbox_decode_nil,
[NETBOX_INJECT] = netbox_decode_table,
};
enum netbox_method method = lua_tointeger(L, 1);
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index bf6a89e15..199d78127 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -70,8 +70,11 @@ local M_GET = 13
local M_MIN = 14
local M_MAX = 15
local M_COUNT = 16
+local M_BEGIN = 17
+local M_COMMIT = 18
+local M_ROLLBACK = 19
-- Injects raw data into connection. Used by console and tests.
-local M_INJECT = 17
+local M_INJECT = 20
ffi.cdef[[
struct error *
@@ -1167,16 +1170,52 @@ local function check_eval_args(args)
end
end
+-- Returns `value` unchanged when it is not nil; otherwise returns no
+-- values at all (rather than an explicit nil).
+local function nothing_or_data(value)
+    if value == nil then
+        return
+    end
+    return value
+end
+
function stream_methods:new_stream()
check_remote_arg(self, 'stream')
box.error(E_PROC_LUA, "Unsupported for stream");
end
+-- Sends a transaction control request (`method` is M_BEGIN, M_COMMIT or
+-- M_ROLLBACK) tagged with the stream's ID and shapes the result: a
+-- non-table result, or any result of a request made with
+-- `opts.is_async`, is passed through nothing_or_data(); otherwise the
+-- result table is unpacked into multiple return values.
+local function stream_txn_request(stream, method, opts)
+    local res = stream:_request(method, opts, nil, stream._stream_id)
+    if type(res) ~= 'table' or opts and opts.is_async then
+        return nothing_or_data(res)
+    end
+    return unpack(res)
+end
+
+-- Begins a transaction in the stream.
+function stream_methods:begin(opts)
+    check_remote_arg(self, 'begin')
+    return stream_txn_request(self, M_BEGIN, opts)
+end
+
+-- Commits the transaction of the stream.
+function stream_methods:commit(opts)
+    check_remote_arg(self, 'commit')
+    return stream_txn_request(self, M_COMMIT, opts)
+end
+
+-- Rolls back the transaction of the stream.
+function stream_methods:rollback(opts)
+    check_remote_arg(self, 'rollback')
+    return stream_txn_request(self, M_ROLLBACK, opts)
+end
+
function remote_methods:new_stream()
check_remote_arg(self, 'stream')
self._last_stream_id = self._last_stream_id + 1
local stream = setmetatable({
new_stream = stream_methods.new_stream,
+ begin = stream_methods.begin,
+ commit = stream_methods.commit,
+ rollback = stream_methods.rollback,
_stream_id = self._last_stream_id,
space = setmetatable({
_space = {},
@@ -1498,12 +1537,6 @@ function console_methods:eval(line, timeout)
return res[1] or res
end
-local function nothing_or_data(value)
- if value ~= nil then
- return value
- end
-end
-
space_metatable = function(remote)
local methods = {}
@@ -1662,6 +1695,9 @@ local this_module = {
min = M_MIN,
max = M_MAX,
count = M_COUNT,
+ begin = M_BEGIN,
+ commit = M_COMMIT,
+ rollback = M_ROLLBACK,
inject = M_INJECT,
}
}
diff --git a/test/box/stream.result b/test/box/stream.result
index bfcf6c6be..609ce8100 100644
--- a/test/box/stream.result
+++ b/test/box/stream.result
@@ -3,9 +3,15 @@
net_box = require('net.box')
| ---
| ...
+json = require('json')
+ | ---
+ | ...
fiber = require('fiber')
| ---
| ...
+msgpack = require('msgpack')
+ | ---
+ | ...
test_run = require('test_run').new()
| ---
| ...
@@ -95,6 +101,145 @@ stream:new_stream()
| ---
| - error: Unsupported for stream
| ...
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+ | ---
+ | ...
+conn_2 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1_1 = conn_1:new_stream()
+ | ---
+ | ...
+stream_1_2 = conn_1:new_stream()
+ | ---
+ | ...
+stream_2 = conn_2:new_stream()
+ | ---
+ | ...
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+ | ---
+ | ...
+stream_1_1:rollback()
+ | ---
+ | ...
+
+stream_1_1:begin()
+ | ---
+ | ...
+-- Error unable to start second transaction in one stream
+stream_1_1:begin()
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+ | ---
+ | ...
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+ | ---
+ | ...
+box.commit()
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+stream_1_1:commit()
+ | ---
+ | ...
+stream_1_2:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+ | ---
+ | - error: Unable to process this type (14) of requests out of stream
+ | ...
+conn:_request(net_box._method.commit, nil, nil, nil)
+ | ---
+ | - error: Unable to process this type (15) of requests out of stream
+ | ...
+conn:_request(net_box._method.rollback, nil, nil, nil)
+ | ---
+ | - error: Unable to process this type (16) of requests out of stream
+ | ...
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+ | ---
+ | ...
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+ | ---
+ | ...
+IPROTO_REQUEST_TYPE = 0x00
+ | ---
+ | ...
+IPROTO_SYNC = 0x01
+ | ---
+ | ...
+IPROTO_AUTH = 7
+ | ---
+ | ...
+IPROTO_STREAM_ID = 0x0a
+ | ---
+ | ...
+next_request_id = 9
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+header = msgpack.encode({
+ [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+ [IPROTO_SYNC] = next_request_id,
+ [IPROTO_STREAM_ID] = 1,
+});
+ | ---
+ | ...
+body = msgpack.encode({nil});
+ | ---
+ | ...
+size = msgpack.encode(header:len() + body:len());
+ | ---
+ | ...
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+ nil, nil, nil, nil,
+ size .. header .. body);
+ | ---
+ | - null
+ | - Unable to process this type (7) of requests in stream
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
conn:close()
| ---
| ...
@@ -543,6 +688,2897 @@ test_run:cmd("stop server test")
| - true
| ...
+-- Second argument (false is a value for memtx_use_mvcc_engine option)
+-- Server start without active transaction manager, so all transaction
+-- fails because of yeild!
+test_run:cmd("start server test with args='10, false'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+ | ---
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+space:replace({1})
+ | ---
+ | - [1]
+ | ...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+ | ---
+ | - []
+ | ...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+ | ---
+ | - []
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:commit()
+ | ---
+ | - error: Transaction has been aborted by a fiber yield
+ | ...
+-- Select is empty, transaction was aborted
+space:select{}
+ | ---
+ | - []
+ | ...
+-- Check that after failed transaction commit we able to start next
+-- transaction (it's strange check, but it's necessary because it was
+-- bug with it)
+stream:begin()
+ | ---
+ | ...
+stream:ping()
+ | ---
+ | - true
+ | ...
+stream:commit()
+ | ---
+ | ...
+-- Same checks for `call` end `eval` functions.
+stream:call('box.begin')
+ | ---
+ | ...
+stream:call('s:replace', {{1}})
+ | ---
+ | - [1]
+ | ...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+ | ---
+ | - []
+ | ...
+stream:call('s:select', {})
+ | ---
+ | - []
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+ | ---
+ | - error: Transaction has been aborted by a fiber yield
+ | ...
+-- Select is empty, transaction was aborted
+space:select{}
+ | ---
+ | - []
+ | ...
+
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+stream:call('s:replace', {{1}})
+ | ---
+ | - [1]
+ | ...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+ | ---
+ | - []
+ | ...
+stream:call('s:select', {})
+ | ---
+ | - []
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+ | ---
+ | - error: Transaction has been aborted by a fiber yield
+ | ...
+-- Select is empty, transaction was aborted
+space:select{}
+ | ---
+ | - []
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+s:drop()
+ | ---
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+stream:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fails,
+-- as we can see before!
+
+-- Second argument (true is a value for memtx_use_mvcc_engine option)
+-- Same test case as previous but server start with active transaction
+-- manager. Also check vinyl, because it's behaviour is same.
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+-- Spaces getting from connection, not from stream has no stream_id
+-- and not belongs to stream
+space_1_no_stream = conn.space.test_1
+ | ---
+ | ...
+space_2_no_stream = conn.space.test_2
+ | ---
+ | ...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and to vinyl:
+-- behaviour is same!
+stream_1:begin()
+ | ---
+ | ...
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+ | ---
+ | - []
+ | ...
+space_2_no_stream:select{}
+ | ---
+ | - []
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+ | ---
+ | - - [1]
+ | ...
+space_2:select({})
+ | ---
+ | - - [1]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Commit was successful, transaction can yeild with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yeild also.
+stream_1:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+ | ---
+ | - - [1]
+ | ...
+space_2:select{}
+ | ---
+ | - - [1]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+ | ---
+ | - - [1]
+ | ...
+s2:select()
+ | ---
+ | - - [1]
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_1_2 = stream_2.space.test_1
+ | ---
+ | ...
+space_2_1 = stream_1.space.test_2
+ | ---
+ | ...
+space_2_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+
+-- Simple read/write conflict.
+space_1_1:select({1})
+ | ---
+ | - []
+ | ...
+space_1_2:select({1})
+ | ---
+ | - []
+ | ...
+space_1_1:replace({1, 1})
+ | ---
+ | - [1, 1]
+ | ...
+space_1_2:replace({1, 2})
+ | ---
+ | - [1, 2]
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+-- This transaction fails, because of conflict
+stream_2:commit()
+ | ---
+ | - error: Transaction has been aborted by conflict
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+-- Here we must accept [1, 1]
+space_1_1:select({})
+ | ---
+ | - - [1, 1]
+ | ...
+space_1_2:select({})
+ | ---
+ | - - [1, 1]
+ | ...
+
+-- Same test for vinyl sapce
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+space_2_1:select({1})
+ | ---
+ | - []
+ | ...
+space_2_2:select({1})
+ | ---
+ | - []
+ | ...
+space_2_1:replace({1, 1})
+ | ---
+ | - [1, 1]
+ | ...
+space_2_2:replace({1, 2})
+ | ---
+ | - [1, 2]
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+-- This transaction fails, because of conflict
+stream_2:commit()
+ | ---
+ | - error: Transaction has been aborted by conflict
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+-- Here we must accept [1, 1]
+space_2_1:select({})
+ | ---
+ | - - [1, 1]
+ | ...
+space_2_2:select({})
+ | ---
+ | - - [1, 1]
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+ | ---
+ | - - [1, 1]
+ | ...
+s2:select()
+ | ---
+ | - - [1, 1]
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+
+-- Test rollback for memtx space
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+ | ---
+ | - - [1]
+ | ...
+stream_1:rollback()
+ | ---
+ | ...
+-- Select is empty, transaction rollback
+space_1:select({})
+ | ---
+ | - []
+ | ...
+
+-- Test rollback for vinyl space
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+ | ---
+ | - - [1]
+ | ...
+stream_2:rollback()
+ | ---
+ | ...
+-- Select is empty, transaction rollback
+space_2:select({})
+ | ---
+ | - []
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+
+-- This is simple test is necessary because i have a bug
+-- with halting stream after rollback
+stream_1:begin()
+ | ---
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both select are empty, because transaction rollback
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_1:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Select return two previously inserted tuples
+space_1:select({})
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_2:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Select return two previously inserted tuples
+space_2:select({})
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+conn:close()
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Empty selects, transaction was rollback
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+-- Two empty selects
+space_1:select({})
+ | ---
+ | - []
+ | ...
+space_2:select({})
+ | ---
+ | - []
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both select are empty, because transaction rollback
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_1:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Select return two previously inserted tuples
+space_1:select({})
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_2:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Select return two previously inserted tuples
+space_2:select({})
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+for i = 1, 1000 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+fiber.sleep(0)
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+-- Select was empty, transaction rollbacked
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Same test, but now we check that if `commit` was received by the
+-- server before the connection was closed, it is processed successfully.
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+_ = stream_1:commit({is_async = true})
+ | ---
+ | ...
+_ = stream_2:commit({is_async = true})
+ | ---
+ | ...
+fiber.sleep(0)
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Check that no streams or messages are left
+-- undeleted after the connection was closed
+errinj = box.error.injection
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+-- Select returns tuples from [1] to [100],
+-- because the transaction was committed
+rc1 = s1:select()
+ | ---
+ | ...
+rc2 = s2:select()
+ | ---
+ | ...
+assert(#rc1)
+ | ---
+ | - 100
+ | ...
+assert(#rc2)
+ | ---
+ | - 100
+ | ...
+s1:truncate()
+ | ---
+ | ...
+s2:truncate()
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+-- We can begin new transactions with the same stream_id, because
+-- the previous streams were destroyed when the connection was closed.
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+-- Two empty selects
+space_1:select({})
+ | ---
+ | - []
+ | ...
+space_2:select({})
+ | ---
+ | - []
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both selects are empty: the spaces were truncated above
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. While working on the
+-- patch, I saw that all requests in a stream come with
+-- header->is_commit == true, so if we are in a transaction
+-- in a stream we should set this value to false, otherwise
+-- recovery fails in `wal_stream_apply_dml_row` because
+-- of an LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+
+stream_1:begin()
+ | ---
+ | ...
+stream_2:begin()
+ | ---
+ | ...
+space_1:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_1:replace({2})
+ | ---
+ | - [2]
+ | ...
+space_2:replace({1})
+ | ---
+ | - [1]
+ | ...
+space_2:replace({2})
+ | ---
+ | - [2]
+ | ...
+stream_1:commit()
+ | ---
+ | ...
+stream_2:commit()
+ | ---
+ | ...
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Here we get two tuples, commit was successful
+s1:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+-- Here we get two tuples, commit was successful
+s2:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+-- Check that no streams or messages are left
+-- after both transactions were committed
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("start server test with args='1, true'")
+ | ---
+ | - true
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+box.space.test_1:drop()
+ | ---
+ | ...
+box.space.test_2:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Same transactions checks for async mode
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+space_1 = stream_1.space.test_1
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_2 = stream_2.space.test_2
+ | ---
+ | ...
+
+memtx_futures = {}
+ | ---
+ | ...
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+ | ---
+ | ...
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+ | ---
+ | ...
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+ | ---
+ | ...
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+ | ---
+ | ...
+
+vinyl_futures = {}
+ | ---
+ | ...
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+ | ---
+ | ...
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+ | ---
+ | ...
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+ | ---
+ | ...
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s1:select()
+ | ---
+ | - []
+ | ...
+s2:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+ | ---
+ | ...
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+ | ---
+ | ...
+
+memtx_results = wait_and_return_results(memtx_futures)
+ | ---
+ | ...
+vinyl_results = wait_and_return_results(vinyl_futures)
+ | ---
+ | ...
+-- If begin was successful it return nil
+assert(not memtx_results["begin"])
+ | ---
+ | - true
+ | ...
+assert(not vinyl_results["begin"])
+ | ---
+ | - true
+ | ...
+-- [1]
+assert(memtx_results["replace"])
+ | ---
+ | - [1]
+ | ...
+assert(vinyl_results["replace"])
+ | ---
+ | - [1]
+ | ...
+-- [2]
+assert(memtx_results["insert"])
+ | ---
+ | - [2]
+ | ...
+assert(vinyl_results["insert"])
+ | ---
+ | - [2]
+ | ...
+-- [1] [2]
+assert(memtx_results["select"])
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+assert(vinyl_results["select"])
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+-- If commit was successful it return nil
+assert(not memtx_results["commit"])
+ | ---
+ | - true
+ | ...
+assert(not vinyl_results["commit"])
+ | ---
+ | - true
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+s2:select()
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Check conflict resolution in stream transactions.
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s1:create_index('primary')
+ | ---
+ | ...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+ | ---
+ | ...
+_ = s2:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn:new_stream()
+ | ---
+ | ...
+stream_2 = conn:new_stream()
+ | ---
+ | ...
+space_1_1 = stream_1.space.test_1
+ | ---
+ | ...
+space_1_2 = stream_2.space.test_1
+ | ---
+ | ...
+space_2_1 = stream_1.space.test_2
+ | ---
+ | ...
+space_2_2 = stream_2.space.test_2
+ | ---
+ | ...
+
+futures_1 = {}
+ | ---
+ | ...
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+ | ---
+ | ...
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+ | ---
+ | ...
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+ | ---
+ | ...
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+ | ---
+ | ...
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+ | ---
+ | ...
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+ | ---
+ | ...
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+ | ---
+ | ...
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+ | ---
+ | ...
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+ | ---
+ | ...
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+ | ---
+ | ...
+
+results_1 = wait_and_return_results(futures_1)
+ | ---
+ | ...
+-- Successful begin return nil
+assert(not results_1["begin_1"])
+ | ---
+ | - true
+ | ...
+assert(not results_1["begin_2"])
+ | ---
+ | - true
+ | ...
+-- []
+assert(not results_1["select_1_1"][1])
+ | ---
+ | - true
+ | ...
+assert(not results_1["select_1_2"][1])
+ | ---
+ | - true
+ | ...
+-- [1]
+assert(results_1["replace_1_1"][1])
+ | ---
+ | - 1
+ | ...
+-- [1]
+assert(results_1["replace_1_1"][2])
+ | ---
+ | - 1
+ | ...
+-- [1]
+assert(results_1["replace_1_2"][1])
+ | ---
+ | - 1
+ | ...
+-- [2]
+assert(results_1["replace_1_2"][2])
+ | ---
+ | - 2
+ | ...
+-- Successful commit return nil
+assert(not results_1["commit_1"])
+ | ---
+ | - true
+ | ...
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+ | ---
+ | - Transaction has been aborted by conflict
+ | ...
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+ | ---
+ | - [1, 1]
+ | ...
+-- commit_1 could have ended before commit_2, so
+-- here we can get either an empty select or [1, 1]
+-- for results_1["select_1_2_A"][1]
+
+futures_2 = {}
+ | ---
+ | ...
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+ | ---
+ | ...
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+ | ---
+ | ...
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+ | ---
+ | ...
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+ | ---
+ | ...
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+ | ---
+ | ...
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+ | ---
+ | ...
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+ | ---
+ | ...
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+ | ---
+ | ...
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+ | ---
+ | ...
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+ | ---
+ | ...
+
+results_2 = wait_and_return_results(futures_2)
+ | ---
+ | ...
+-- Successful begin return nil
+assert(not results_2["begin_1"])
+ | ---
+ | - true
+ | ...
+assert(not results_2["begin_2"])
+ | ---
+ | - true
+ | ...
+-- []
+assert(not results_2["select_2_1"][1])
+ | ---
+ | - true
+ | ...
+assert(not results_2["select_2_2"][1])
+ | ---
+ | - true
+ | ...
+-- [1]
+assert(results_2["replace_2_1"][1])
+ | ---
+ | - 1
+ | ...
+-- [1]
+assert(results_2["replace_2_1"][2])
+ | ---
+ | - 1
+ | ...
+-- [1]
+assert(results_2["replace_2_2"][1])
+ | ---
+ | - 1
+ | ...
+-- [2]
+assert(results_2["replace_2_2"][2])
+ | ---
+ | - 2
+ | ...
+-- Successful commit return nil
+assert(not results_2["commit_1"])
+ | ---
+ | - true
+ | ...
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+ | ---
+ | - Transaction has been aborted by conflict
+ | ...
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+ | ---
+ | - [1, 1]
+ | ...
+-- commit_1 could have ended before commit_2, so
+-- here we can get either an empty select or [1, 1]
+-- for results_2["select_2_2_A"][1]
+
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+ | ---
+ | - - [1, 1]
+ | ...
+s2:select()
+ | ---
+ | - - [1, 1]
+ | ...
+s1:drop()
+ | ---
+ | ...
+s2:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+function ping() return "pong" end
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+space_no_stream = conn.space.test
+ | ---
+ | ...
+
+-- successful begin using stream:call
+stream:call('box.begin')
+ | ---
+ | ...
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+stream:call('ping')
+ | ---
+ | - pong
+ | ...
+stream:eval('ping()')
+ | ---
+ | ...
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+stream:eval('box.begin()')
+ | ---
+ | - error: 'Operation is not permitted when there is an active transaction '
+ | ...
+-- successful commit using stream:call
+stream:call('box.commit')
+ | ---
+ | ...
+
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+ | ---
+ | ...
+space:replace({1})
+ | ---
+ | - [1]
+ | ...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+ | ---
+ | - []
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+ | ---
+ | - - [1]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+--Successful commit using stream:execute
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_no_stream:select{}
+ | ---
+ | - - [1]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select return tuple, because transaction was successful
+s:select()
+ | ---
+ | - - [1]
+ | ...
+s:delete{1}
+ | ---
+ | - [1]
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+-- Check rollback using stream:call
+stream:begin()
+ | ---
+ | ...
+space:replace({2})
+ | ---
+ | - [2]
+ | ...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+ | ---
+ | - []
+ | ...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+ | ---
+ | - - [2]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Select is empty, transaction was not commited
+s:select()
+ | ---
+ | - []
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+--Successful rollback using stream:call
+stream:call('box.rollback')
+ | ---
+ | ...
+-- Empty selects transaction rollbacked
+space:select({})
+ | ---
+ | - []
+ | ...
+space_no_stream:select{}
+ | ---
+ | - []
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Empty select transaction rollbacked
+s:select()
+ | ---
+ | - []
+ | ...
+s:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Simple test which demonstrates that a stream is immediately
+-- destroyed when it has no messages being processed and
+-- no active transaction.
+
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+for i = 1, 10 do space:replace{i} end
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- All messages were processed, so the stream object was immediately
+-- deleted, because there is no active transaction.
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+s:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- Transaction tests for sql iproto requests.
+-- All these functions are copied from sql/ddl.test.lua,
+-- except that they check sql transactions in streams.
+test_run:cmd("setopt delimiter '$'")
+ | ---
+ | - true
+ | ...
+function execute_sql_string(stream, sql_string)
+ if stream then
+ stream:execute(sql_string)
+ else
+ box.execute(sql_string)
+ end
+end$
+ | ---
+ | ...
+function execute_sql_string_and_return_result(stream, sql_string)
+ if stream then
+ return pcall(stream.execute, stream, sql_string)
+ else
+ return box.execute(sql_string)
+ end
+end$
+ | ---
+ | ...
+function monster_ddl(stream)
+ local _, err1, err2, err3, err4, err5, err6
+ local stream_or_box = stream or box
+ execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER);]])
+ execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER UNIQUE,
+ CONSTRAINT ck1
+ CHECK(b < 100));]])
+
+ execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+ execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+ execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+ KEY, a INTEGER);]])
+
+ execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+ execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+ CHECK(b > 0);]])
+
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+ RENAME TO t1;]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+ ck2 CHECK(a > 0);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+ INTEGER PRIMARY KEY);]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+
+ execute_sql_string(stream, [[CREATE TABLE
+ trigger_catcher(id INTEGER PRIMARY
+ KEY AUTOINCREMENT);]])
+
+ execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+ execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+ t1 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+ t2 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+ ON t1(a, b);]])
+
+ execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+ _, err5 =
+ execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+ _, err6 =
+ execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+ t_does_not_exist;]])
+
+ execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+ return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+ err5, err6}
+end$
+ | ---
+ | ...
+function monster_ddl_cmp_res(res1, res2)
+ if json.encode(res1) == json.encode(res2) then
+ return true
+ end
+ return res1, res2
+end$
+ | ---
+ | ...
+function monster_ddl_is_clean(stream)
+ local stream_or_box = stream or box
+ assert(stream_or_box.space.T1 == nil)
+ assert(stream_or_box.space.T2 == nil)
+ assert(stream_or_box.space._trigger:count() == 0)
+ assert(stream_or_box.space._fk_constraint:count() == 0)
+ assert(stream_or_box.space._ck_constraint:count() == 0)
+ assert(stream_or_box.space.T_RENAMED == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+ | ---
+ | ...
+function monster_ddl_check(stream)
+ local _, err1, err2, err3, err4, res
+ local stream_or_box = stream or box
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES (1, 1, 101)]])
+ execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES(2, 2, 1)]])
+ _, err3 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, 20, 1)]])
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, -1, 1)]])
+ execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+ if not stream then
+ assert(stream_or_box.space.T_RENAMED ~= nil)
+ assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+ res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ else
+ _, res =
+ execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ end
+ return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+ err3, err4, res}
+end$
+ | ---
+ | ...
+function monster_ddl_clear(stream)
+ execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''")$
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("start server test with args='10, true'")
+ | ---
+ | - true
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter '$'")
+ | ---
+ | - true
+ | ...
+function monster_ddl_is_clean()
+ if not (box.space.T1 == nil) or
+ not (box.space.T2 == nil) or
+ not (box.space._trigger:count() == 0) or
+ not (box.space._fk_constraint:count() == 0) or
+ not (box.space._ck_constraint:count() == 0) or
+ not (box.space.T_RENAMED == nil) or
+ not (box.space.T_TO_RENAME == nil) then
+ return false
+ end
+ return true
+end$
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''")$
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+
+-- No txn.
+true_ddl_res = monster_ddl()
+ | ---
+ | ...
+true_ddl_res
+ | ---
+ | - - 'Finished ok, errors in the middle: '
+ | - Space 'T1' already exists
+ | - Space 'T1' already exists
+ | - Space 'T3' does not exist
+ | - Index 'T1A' already exists in space 'T1'
+ | - 'Failed to execute SQL statement: can not truncate space ''T2'' because other
+ | objects depend on it'
+ | - Space 'T_DOES_NOT_EXIST' does not exist
+ | ...
+
+true_check_res = monster_ddl_check()
+ | ---
+ | ...
+true_check_res
+ | ---
+ | - - 'Finished ok, errors and trigger catcher content: '
+ | - 'Check constraint failed ''CK1'': b < 100'
+ | - Duplicate key exists in unique index "unique_unnamed_T2_2" in space "T2" with
+ | old tuple - [1, 1, 1] and new tuple - [2, 2, 1]
+ | - 'Failed to execute SQL statement: FOREIGN KEY constraint failed'
+ | - 'Check constraint failed ''CK2'': a > 0'
+ | - metadata:
+ | - name: ID
+ | type: integer
+ | rows:
+ | - [1]
+ | ...
+
+monster_ddl_clear()
+ | ---
+ | ...
+monster_ddl_is_clean()
+ | ---
+ | ...
+
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+ | ---
+ | ...
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+ddl_res = monster_ddl(stream)
+ | ---
+ | ...
+monster_ddl_clear(stream)
+ | ---
+ | ...
+stream:call('monster_ddl_is_clean')
+ | ---
+ | - true
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+ | ---
+ | - true
+ | ...
+
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+ddl_res = monster_ddl(stream)
+ | ---
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+ | ---
+ | - true
+ | ...
+
+check_res = monster_ddl_check(stream)
+ | ---
+ | ...
+monster_ddl_cmp_res(check_res, true_check_res)
+ | ---
+ | - true
+ | ...
+
+monster_ddl_clear(stream)
+ | ---
+ | ...
+stream:call('monster_ddl_is_clean')
+ | ---
+ | - true
+ | ...
+
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+ | ---
+ | ...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+ | ---
+ | - true
+ | ...
+
+check_res = monster_ddl_check(stream)
+ | ---
+ | ...
+monster_ddl_cmp_res(check_res, true_check_res)
+ | ---
+ | - true
+ | ...
+
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_clear(stream)
+ | ---
+ | ...
+stream:call('monster_ddl_is_clean')
+ | ---
+ | - true
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+ddl_res = monster_ddl(stream)
+ | ---
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+ | ---
+ | - true
+ | ...
+
+check_res = monster_ddl_check(stream)
+ | ---
+ | ...
+monster_ddl_cmp_res(check_res, true_check_res)
+ | ---
+ | - true
+ | ...
+
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+monster_ddl_clear(stream)
+ | ---
+ | ...
+stream:call('monster_ddl_is_clean')
+ | ---
+ | - true
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- All messages were processed, so the stream object was immediately
+-- deleted, because there is no active transaction.
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+ | ---
+ | - row_count: 1
+ | ...
+-- reload schema
+stream:ping()
+ | ---
+ | - true
+ | ...
+space = stream.space.TEST
+ | ---
+ | ...
+assert(space ~= nil)
+ | ---
+ | - true
+ | ...
+stream:execute('START TRANSACTION')
+ | ---
+ | - row_count: 0
+ | ...
+space:replace{1, 2, '3'}
+ | ---
+ | - [1, 2, '3']
+ | ...
+space:select()
+ | ---
+ | - - [1, 2, '3']
+ | ...
+-- select is empty, because transaction was not commited
+conn.space.TEST:select()
+ | ---
+ | - []
+ | ...
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+ | ---
+ | ...
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+ | ---
+ | ...
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+ | ---
+ | - true
+ | ...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+ | ---
+ | - metadata:
+ | - name: ID
+ | type: integer
+ | - name: A
+ | type: number
+ | - name: B
+ | type: string
+ | rows:
+ | - [1, 2, '3']
+ | ...
+-- empty select, transaction was not commited
+conn:execute(conn_pr.stmt_id, {1, 2})
+ | ---
+ | - metadata:
+ | - name: ID
+ | type: integer
+ | - name: A
+ | type: number
+ | - name: B
+ | type: string
+ | rows: []
+ | ...
+stream:execute('COMMIT')
+ | ---
+ | - row_count: 0
+ | ...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+ | ---
+ | - metadata:
+ | - name: ID
+ | type: integer
+ | - name: A
+ | type: number
+ | - name: B
+ | type: string
+ | rows:
+ | - [1, 2, '3']
+ | ...
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+ | ---
+ | - metadata:
+ | - name: ID
+ | type: integer
+ | - name: A
+ | type: number
+ | - name: B
+ | type: string
+ | rows:
+ | - [1, 2, '3']
+ | ...
+stream:unprepare(stream_pr.stmt_id)
+ | ---
+ | - null
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+ | ---
+ | - - [1, 2, '3']
+ | ...
+box.space.TEST:drop()
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
test_run:cmd("cleanup server test")
| ---
| - true
diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua
index 190f17d8e..b8bd2d327 100644
--- a/test/box/stream.test.lua
+++ b/test/box/stream.test.lua
@@ -1,6 +1,8 @@
-- This test checks streams iplementation in iproto (gh-5860).
net_box = require('net.box')
+json = require('json')
fiber = require('fiber')
+msgpack = require('msgpack')
test_run = require('test_run').new()
test_run:cmd("create server test with script='box/stream.lua'")
@@ -45,6 +47,62 @@ conn = net_box.connect(server_addr)
stream = conn:new_stream()
-- Unsupported for stream
stream:new_stream()
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+conn_2 = net_box.connect(server_addr)
+stream_1_1 = conn_1:new_stream()
+stream_1_2 = conn_1:new_stream()
+stream_2 = conn_2:new_stream()
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+stream_1_1:rollback()
+
+stream_1_1:begin()
+-- Error: unable to start a second transaction in one stream
+stream_1_1:begin()
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+test_run:switch("test")
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+box.commit()
+test_run:switch("default")
+stream_1_1:commit()
+stream_1_2:commit()
+stream_2:commit()
+
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+conn:_request(net_box._method.commit, nil, nil, nil)
+conn:_request(net_box._method.rollback, nil, nil, nil)
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+IPROTO_REQUEST_TYPE = 0x00
+IPROTO_SYNC = 0x01
+IPROTO_AUTH = 7
+IPROTO_STREAM_ID = 0x0a
+next_request_id = 9
+test_run:cmd("setopt delimiter ';'")
+header = msgpack.encode({
+ [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+ [IPROTO_SYNC] = next_request_id,
+ [IPROTO_STREAM_ID] = 1,
+});
+body = msgpack.encode({nil});
+size = msgpack.encode(header:len() + body:len());
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+ nil, nil, nil, nil,
+ size .. header .. body);
+test_run:cmd("setopt delimiter ''");
conn:close()
-- Check that spaces in stream object updates, during reload_schema
@@ -203,5 +261,1148 @@ assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
test_run:switch("default")
test_run:cmd("stop server test")
+-- The second argument (false) is the value of the memtx_use_mvcc_engine
+-- option. The server starts without an active transaction manager, so all
+-- transactions fail because of yield!
+test_run:cmd("start server test with args='10, false'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+
+-- Check synchronous stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+test_run:switch('default')
+space:replace({1})
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Commit fails, transaction yielded with memtx_use_mvcc_engine = false
+stream:commit()
+-- Select is empty, transaction was aborted
+space:select{}
+-- Check that after a failed transaction commit we are able to start the
+-- next transaction (a strange check, but it is necessary because there
+-- used to be a bug here)
+stream:begin()
+stream:ping()
+stream:commit()
+-- Same checks for `call` and `eval` functions.
+stream:call('box.begin')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Commit fails, transaction yielded with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+-- Select is empty, transaction was aborted
+space:select{}
+
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Commit fails, transaction yielded with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+-- Select is empty, transaction was aborted
+space:select{}
+
+test_run:switch('test')
+s:drop()
+-- Check that there are no streams and messages, which
+-- was not deleted
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+stream:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fail,
+-- as we have seen above!
+
+-- Second argument (true) is the value of the memtx_use_mvcc_engine option.
+-- Same test case as the previous one, but the server starts with an active
+-- transaction manager. Also check vinyl, because its behaviour is the same.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s1:create_index('primary')
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- Spaces obtained from the connection rather than from a stream have
+-- no stream_id and do not belong to any stream
+space_1_no_stream = conn.space.test_1
+space_2_no_stream = conn.space.test_2
+-- Check synchronous stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and for vinyl:
+-- the behaviour is the same!
+stream_1:begin()
+space_1:replace({1})
+stream_2:begin()
+space_2:replace({1})
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+test_run:switch('default')
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+space_2_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+space_2:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s1:select()
+s2:select()
+test_run:switch('default')
+-- Commit was successful, transaction can yield with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yield also.
+stream_1:commit()
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+space_2:select{}
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Simple read/write conflict.
+space_1_1:select({1})
+space_1_2:select({1})
+space_1_1:replace({1, 1})
+space_1_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_1_1:select({})
+space_1_2:select({})
+
+-- Same test for vinyl space
+stream_1:begin()
+stream_2:begin()
+space_2_1:select({1})
+space_2_2:select({1})
+space_2_1:replace({1, 1})
+space_2_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_2_1:select({})
+space_2_2:select({})
+
+test_run:switch('test')
+-- Both selects return tuple [1, 1], transaction committed
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Test rollback for memtx space
+space_1:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+stream_1:rollback()
+-- Select is empty, transaction rollback
+space_1:select({})
+
+-- Test rollback for vinyl space
+space_2:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+stream_2:rollback()
+-- Select is empty, transaction rollback
+space_2:select({})
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- This simple test is necessary because I had a bug
+-- with the stream halting after rollback
+stream_1:begin()
+stream_1:commit()
+stream_2:begin()
+stream_2:commit()
+conn:close()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+conn:close()
+
+test_run:switch("test")
+-- Empty selects, transaction was rolled back
+s1:select()
+s2:select()
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with same stream_id, because
+-- previous one was rolled back and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+for i = 1, 1000 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Selects are empty, transactions were rolled back
+s1:select()
+s2:select()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Same test, but now we check that if `commit` was received
+-- by the server before the connection closed, it is processed successfully.
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+test_run:cmd("setopt delimiter ';'")
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+_ = stream_1:commit({is_async = true})
+_ = stream_2:commit({is_async = true})
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select return tuples from [1] to [100],
+-- transaction was committed
+rc1 = s1:select()
+rc2 = s2:select()
+assert(#rc1)
+assert(#rc2)
+s1:truncate()
+s2:truncate()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with same stream_id, because
+-- previous ones were committed and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both selects are empty, because the spaces were truncated above
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. While working on the
+-- patch, I saw that all requests in a stream come with
+-- header->is_commit == true, so if we are in a transaction
+-- in a stream we should set this value to false, otherwise
+-- during recovery `wal_stream_apply_dml_row` fails because
+-- of an LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+
+stream_1:begin()
+stream_2:begin()
+space_1:replace({1})
+space_1:replace({2})
+space_2:replace({1})
+space_2:replace({2})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+s1:select{}
+-- Here we get two tuples, commit was successful
+s2:select{}
+-- Check that there are no streams and messages which
+-- were not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+test_run:cmd("start server test with args='1, true'")
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+box.space.test_1:drop()
+box.space.test_2:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
+-- Same transactions checks for async mode
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+space_1 = stream_1.space.test_1
+stream_2 = conn:new_stream()
+space_2 = stream_2.space.test_2
+
+memtx_futures = {}
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+
+vinyl_futures = {}
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s1:select()
+s2:select()
+test_run:switch('default')
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+
+memtx_results = wait_and_return_results(memtx_futures)
+vinyl_results = wait_and_return_results(vinyl_futures)
+-- If begin was successful it return nil
+assert(not memtx_results["begin"])
+assert(not vinyl_results["begin"])
+-- [1]
+assert(memtx_results["replace"])
+assert(vinyl_results["replace"])
+-- [2]
+assert(memtx_results["insert"])
+assert(vinyl_results["insert"])
+-- [1] [2]
+assert(memtx_results["select"])
+assert(vinyl_results["select"])
+-- If commit was successful it return nil
+assert(not memtx_results["commit"])
+assert(not vinyl_results["commit"])
+
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+
+futures_1 = {}
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+
+results_1 = wait_and_return_results(futures_1)
+-- Successful begin return nil
+assert(not results_1["begin_1"])
+assert(not results_1["begin_2"])
+-- []
+assert(not results_1["select_1_1"][1])
+assert(not results_1["select_1_2"][1])
+-- [1]
+assert(results_1["replace_1_1"][1])
+-- [1]
+assert(results_1["replace_1_1"][2])
+-- [1]
+assert(results_1["replace_1_2"][1])
+-- [2]
+assert(results_1["replace_1_2"][2])
+-- Successful commit return nil
+assert(not results_1["commit_1"])
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_1_2_A"][1]
+
+futures_2 = {}
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+
+results_2 = wait_and_return_results(futures_2)
+-- Successful begin return nil
+assert(not results_2["begin_1"])
+assert(not results_2["begin_2"])
+-- []
+assert(not results_2["select_2_1"][1])
+assert(not results_2["select_2_2"][1])
+-- [1]
+assert(results_2["replace_2_1"][1])
+-- [1]
+assert(results_2["replace_2_1"][2])
+-- [1]
+assert(results_2["replace_2_2"][1])
+-- [2]
+assert(results_2["replace_2_2"][2])
+-- Successful commit return nil
+assert(not results_2["commit_1"])
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_2["select_2_2_A"][1]
+
+test_run:switch('test')
+-- Both selects return tuple [1, 1], transaction committed
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+function ping() return "pong" end
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+space_no_stream = conn.space.test
+
+-- successful begin using stream:call
+stream:call('box.begin')
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+stream:call('ping')
+stream:eval('ping()')
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+stream:eval('box.begin()')
+-- successful commit using stream:call
+stream:call('box.commit')
+
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+space:replace({1})
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Successful commit using stream:execute
+stream:execute('COMMIT')
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_no_stream:select{}
+test_run:switch("test")
+-- Select return tuple, because transaction was successful
+s:select()
+s:delete{1}
+test_run:switch('default')
+-- Check rollback using stream:call
+stream:begin()
+space:replace({2})
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Successful rollback using stream:call
+stream:call('box.rollback')
+-- Empty selects, transaction was rolled back
+space:select({})
+space_no_stream:select{}
+test_run:switch("test")
+-- Empty select, transaction was rolled back
+s:select()
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Simple test which demonstrates that a stream is immediately
+-- destroyed when there are no messages being processed in it and
+-- no active transaction.
+
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+for i = 1, 10 do space:replace{i} end
+test_run:switch("test")
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+s:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Transaction tests for sql iproto requests.
+-- All these functions are copied from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+function execute_sql_string(stream, sql_string) -- run one SQL statement, ignoring its result
+ if stream then
+ stream:execute(sql_string) -- a stream was given: execute the statement through it
+ else
+ box.execute(sql_string) -- no stream: execute via box.execute instead
+ end
+end$
+function execute_sql_string_and_return_result(stream, sql_string) -- run one SQL statement and return the outcome
+ if stream then
+ return pcall(stream.execute, stream, sql_string) -- protected call: status flag first, then result or error
+ else -- NOTE(review): this branch returns box.execute's values directly, shaped differently from pcall's - confirm callers expect that
+ return box.execute(sql_string)
+ end
+end$
+function monster_ddl(stream)
+ local _, err1, err2, err3, err4, err5, err6
+ local stream_or_box = stream or box
+ execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER);]])
+ execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER UNIQUE,
+ CONSTRAINT ck1
+ CHECK(b < 100));]])
+
+ execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+ execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+ execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+ KEY, a INTEGER);]])
+
+ execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+ execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+ CHECK(b > 0);]])
+
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+ RENAME TO t1;]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+ ck2 CHECK(a > 0);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+ INTEGER PRIMARY KEY);]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+
+ execute_sql_string(stream, [[CREATE TABLE
+ trigger_catcher(id INTEGER PRIMARY
+ KEY AUTOINCREMENT);]])
+
+ execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+ execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+ t1 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+ t2 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+ ON t1(a, b);]])
+
+ execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+ _, err5 =
+ execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+ _, err6 =
+ execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+ t_does_not_exist;]])
+
+ execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+ return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+ err5, err6}
+end$
+function monster_ddl_cmp_res(res1, res2)
+ if json.encode(res1) == json.encode(res2) then
+ return true
+ end
+ return res1, res2
+end$
+function monster_ddl_is_clean(stream)
+ local stream_or_box = stream or box
+ assert(stream_or_box.space.T1 == nil)
+ assert(stream_or_box.space.T2 == nil)
+ assert(stream_or_box.space._trigger:count() == 0)
+ assert(stream_or_box.space._fk_constraint:count() == 0)
+ assert(stream_or_box.space._ck_constraint:count() == 0)
+ assert(stream_or_box.space.T_RENAMED == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+function monster_ddl_check(stream)
+ local _, err1, err2, err3, err4, res
+ local stream_or_box = stream or box
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES (1, 1, 101)]])
+ execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES(2, 2, 1)]])
+ _, err3 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, 20, 1)]])
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, -1, 1)]])
+ execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+ if not stream then
+ assert(stream_or_box.space.T_RENAMED ~= nil)
+ assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+ res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ else
+ _, res =
+ execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ end
+ return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+ err3, err4, res}
+end$
+function monster_ddl_clear(stream)
+ execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+test_run:cmd("setopt delimiter ''")$
+
+test_run:cmd("start server test with args='10, true'")
+test_run:switch('test')
+test_run:cmd("setopt delimiter '$'")
+function monster_ddl_is_clean()
+ if not (box.space.T1 == nil) or
+ not (box.space.T2 == nil) or
+ not (box.space._trigger:count() == 0) or
+ not (box.space._fk_constraint:count() == 0) or
+ not (box.space._ck_constraint:count() == 0) or
+ not (box.space.T_RENAMED == nil) or
+ not (box.space.T_TO_RENAME == nil) then
+ return false
+ end
+ return true
+end$
+test_run:cmd("setopt delimiter ''")$
+test_run:switch('default')
+
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+
+-- No txn.
+true_ddl_res = monster_ddl()
+true_ddl_res
+
+true_check_res = monster_ddl_check()
+true_check_res
+
+monster_ddl_clear()
+monster_ddl_is_clean()
+
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+test_run:switch("test")
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+-- reload schema
+stream:ping()
+space = stream.space.TEST
+assert(space ~= nil)
+stream:execute('START TRANSACTION')
+space:replace{1, 2, '3'}
+space:select()
+-- select is empty, because the transaction was not committed
+conn.space.TEST:select()
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- empty select, the transaction was not committed
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:execute('COMMIT')
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:unprepare(stream_pr.stmt_id)
+conn:close()
+test_run:switch('test')
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+box.space.TEST:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
test_run:cmd("cleanup server test")
test_run:cmd("delete server test")
--
2.20.1
More information about the Tarantool-patches
mailing list