[Tarantool-patches] [PATCH v2 8/8] net.box: add interactive transaction support in net.box
mechanik20051988
mechanik20051988 at tarantool.org
Mon Aug 9 17:38:00 MSK 2021
From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
Implement `begin`, `commit` and `rollback` methods for stream object
in `net.box`, which allows to begin, commit and rollback transaction
accordingly.
Closes #5860
@TarantoolBot document
Title: add interactive transaction support in net.box
Implement `begin`, `commit` and `rollback` methods for stream object
in `net.box`, which allows to begin, commit and rollback transaction
accordingly. Now there are multiple ways to begin, commit and rollback
transaction from `net.box`: using appropriate stream methods, using 'call`
or 'eval' methods or using `execute` method with sql transaction syntax.
User can mix these methods, for example, start transaction using
`stream:begin()`, and commit transaction using `stream:call('box.commit')`
or stream:execute('COMMIT').
Simple example of using interactive transactions via iproto from net.box:
```lua
stream = conn:new_stream()
space = stream.space.test
space_not_from_stream = conn.space.test
stream:begin()
space:replace({1})
-- return previously inserted tuple, because request
-- belongs to transaction.
space:select({})
-- empty select, because select doesn't belongs to
-- transaction
space_not_from_stream:select({})
stream:call('box.commit')
-- now transaction was commited, so all requests
-- returns tuple.
```
Different examples of using streams you can find in
gh-5860-implement-streams-in-iproto.test.lua
---
.../gh-5860-implement-streams-in-iproto.md | 26 +
src/box/lua/net_box.c | 51 +-
src/box/lua/net_box.lua | 35 +-
test/box/stream.result | 3558 +++++++++++++++--
test/box/stream.test.lua | 1202 ++++++
5 files changed, 4556 insertions(+), 316 deletions(-)
create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
new file mode 100644
index 000000000..8a8eec3e7
--- /dev/null
+++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
@@ -0,0 +1,26 @@
+## feature/core
+
+* Streams and interactive transactions over streams are implemented
+ in iproto. Stream is associated with it's ID, which is unique within
+ one connection. All requests with same not zero stream ID belongs to
+ the same stream. All requests in stream processed synchronously. The
+ execution of the next request will not start until the previous one is
+ completed. If request has zero stream ID it does not belong to stream
+ and is processed in the old way.
+ In `net.box`, stream is an object above connection that has the same
+ methods, but allows to execute requests sequentially. ID is generated
+ on the client side automatically. If user writes his own connector and
+ wants to use streams, he must transmit stream_id over iproto protocol.
+ The main purpose of streams is transactions via iproto. Each stream
+ can start its own transaction, so they allows multiplexing several
+ transactions over one connection. There are multiple ways to begin,
+ commit and rollback transaction: using appropriate stream methods, using
+ `call` or `eval` methods or using `execute` method with sql transaction
+ syntax. User can mix these methods, for example, start transaction using
+ `stream:begin()`, and commit transaction using `stream:call('box.commit')`
+ or stream:execute('COMMIT').
+ If any request fails during the transaction, it will not affect the other
+ requests in the transaction. If disconnect occurs when there is some active
+ transaction in stream, this transaction will be rollbacked, if it does not
+ have time to commit before this moment.
+
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index df36e3991..43dbb6448 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -70,7 +70,10 @@ enum netbox_method {
NETBOX_MIN = 14,
NETBOX_MAX = 15,
NETBOX_COUNT = 16,
- NETBOX_INJECT = 17,
+ NETBOX_BEGIN = 17,
+ NETBOX_COMMIT = 18,
+ NETBOX_ROLLBACK = 19,
+ NETBOX_INJECT = 20,
netbox_method_MAX
};
@@ -618,6 +621,46 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
netbox_encode_prepare(L, idx, stream, sync, stream_id);
}
+static inline void
+netbox_encode_txn(lua_State *L, enum iproto_type type, int idx,
+ struct mpstream *stream, uint64_t sync,
+ uint64_t stream_id)
+{
+ (void)L;
+ (void) idx;
+ assert(type == IPROTO_TRANSACTION_BEGIN ||
+ type == IPROTO_TRANSACTION_COMMIT ||
+ type == IPROTO_TRANSACTION_ROLLBACK);
+ size_t svp = netbox_prepare_request(stream, sync,
+ type, stream_id);
+
+ netbox_encode_request(stream, svp);
+}
+
+static void
+netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
+ uint64_t sync, uint64_t stream_id)
+{
+ return netbox_encode_txn(L, IPROTO_TRANSACTION_BEGIN, idx, stream,
+ sync, stream_id);
+}
+
+static void
+netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
+ uint64_t sync, uint64_t stream_id)
+{
+ return netbox_encode_txn(L, IPROTO_TRANSACTION_COMMIT, idx, stream,
+ sync, stream_id);
+}
+
+static void
+netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
+ uint64_t sync, uint64_t stream_id)
+{
+ return netbox_encode_txn(L, IPROTO_TRANSACTION_ROLLBACK, idx, stream,
+ sync, stream_id);
+}
+
static void
netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
uint64_t sync, uint64_t stream_id)
@@ -665,6 +708,9 @@ netbox_encode_method(struct lua_State *L)
[NETBOX_MIN] = netbox_encode_select,
[NETBOX_MAX] = netbox_encode_select,
[NETBOX_COUNT] = netbox_encode_call,
+ [NETBOX_BEGIN] = netbox_encode_begin,
+ [NETBOX_COMMIT] = netbox_encode_commit,
+ [NETBOX_ROLLBACK] = netbox_encode_rollback,
[NETBOX_INJECT] = netbox_encode_inject,
};
enum netbox_method method = lua_tointeger(L, 1);
@@ -1045,6 +1091,9 @@ netbox_decode_method(struct lua_State *L)
[NETBOX_MIN] = netbox_decode_tuple,
[NETBOX_MAX] = netbox_decode_tuple,
[NETBOX_COUNT] = netbox_decode_value,
+ [NETBOX_BEGIN] = netbox_decode_nil,
+ [NETBOX_COMMIT] = netbox_decode_nil,
+ [NETBOX_ROLLBACK] = netbox_decode_nil,
[NETBOX_INJECT] = netbox_decode_table,
};
enum netbox_method method = lua_tointeger(L, 1);
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 34b396235..9e653c312 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -70,8 +70,11 @@ local M_GET = 13
local M_MIN = 14
local M_MAX = 15
local M_COUNT = 16
+local M_BEGIN = 17
+local M_COMMIT = 18
+local M_ROLLBACK = 19
-- Injects raw data into connection. Used by console and tests.
-local M_INJECT = 17
+local M_INJECT = 20
ffi.cdef[[
struct error *
@@ -1196,11 +1199,38 @@ local function new_stream(stream)
return stream._conn:new_stream()
end
+local function begin(stream, opts)
+ check_remote_arg(stream, 'begin')
+ local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
+ if opts and opts.is_async then
+ return res
+ end
+end
+
+local function commit(stream, opts)
+ check_remote_arg(stream, 'commit')
+ local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id)
+ if opts and opts.is_async then
+ return res
+ end
+end
+
+local function rollback(stream, opts)
+ check_remote_arg(stream, 'rollback')
+ local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id)
+ if opts and opts.is_async then
+ return res
+ end
+end
+
function remote_methods:new_stream()
check_remote_arg(self, 'new_stream')
self._last_stream_id = self._last_stream_id + 1
local stream = setmetatable({
new_stream = new_stream,
+ begin = begin,
+ commit = commit,
+ rollback = rollback,
_stream_id = self._last_stream_id,
space = setmetatable({
_stream_space_cache = {},
@@ -1685,6 +1715,9 @@ local this_module = {
min = M_MIN,
max = M_MAX,
count = M_COUNT,
+ begin = M_BEGIN,
+ commit = M_COMMIT,
+ rollback = M_ROLLBACK,
inject = M_INJECT,
}
}
diff --git a/test/box/stream.result b/test/box/stream.result
index 03200ecf6..95fd1ca51 100644
--- a/test/box/stream.result
+++ b/test/box/stream.result
@@ -1,24 +1,27 @@
--- test-run result file version 2
-- This test checks streams iplementation in iproto (gh-5860).
net_box = require('net.box')
- | ---
- | ...
+---
+...
+json = require('json')
+---
+...
fiber = require('fiber')
- | ---
- | ...
+---
+...
+msgpack = require('msgpack')
+---
+...
test_run = require('test_run').new()
- | ---
- | ...
-
+---
+...
test_run:cmd("create server test with script='box/stream.lua'")
- | ---
- | - true
- | ...
-
+---
+- true
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
function get_current_connection_count()
local total_net_stat_table =
test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
@@ -27,8 +30,8 @@ function get_current_connection_count()
assert(connection_stat_table)
return connection_stat_table.current
end;
- | ---
- | ...
+---
+...
function wait_and_return_results(futures)
local results = {}
for name, future in pairs(futures) do
@@ -40,446 +43,3373 @@ function wait_and_return_results(futures)
end
return results
end;
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
-
+---
+- true
+...
-- Some simple checks for new object - stream
test_run:cmd("start server test with args='1'")
- | ---
- | - true
- | ...
+---
+- true
+...
server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
- | ---
- | ...
+---
+...
conn_1 = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream_1 = conn_1:new_stream()
- | ---
- | ...
+---
+...
conn_2 = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream_2 = conn_2:new_stream()
- | ---
- | ...
+---
+...
-- Stream is a wrapper around connection, so if you close connection
-- you close stream, and vice versa.
conn_1:close()
- | ---
- | ...
+---
+...
assert(not stream_1:ping())
- | ---
- | - true
- | ...
+---
+- true
+...
stream_2:close()
- | ---
- | ...
+---
+...
assert(not conn_2:ping())
- | ---
- | - true
- | ...
+---
+- true
+...
conn = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream = conn:new_stream()
- | ---
- | ...
+---
+...
-- The new method `new_stream`, for the stream object, returns a new
-- stream object, just as in the case of connection.
_ = stream:new_stream()
- | ---
- | ...
+---
+...
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+---
+...
+conn_2 = net_box.connect(server_addr)
+---
+...
+stream_1_1 = conn_1:new_stream()
+---
+...
+stream_1_2 = conn_1:new_stream()
+---
+...
+stream_2 = conn_2:new_stream()
+---
+...
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+---
+...
+stream_1_1:rollback()
+---
+...
+stream_1_1:begin()
+---
+...
+-- Error unable to start second transaction in one stream
+stream_1_1:begin()
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+---
+...
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+---
+...
+box.commit()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+stream_1_1:commit()
+---
+...
+stream_1_2:commit()
+---
+...
+stream_2:commit()
+---
+...
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+---
+- error: Unable to process BEGIN request out of stream
+...
+conn:_request(net_box._method.commit, nil, nil, nil)
+---
+- error: Unable to process COMMIT request out of stream
+...
+conn:_request(net_box._method.rollback, nil, nil, nil)
+---
+- error: Unable to process ROLLBACK request out of stream
+...
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+---
+...
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+---
+...
+IPROTO_REQUEST_TYPE = 0x00
+---
+...
+IPROTO_SYNC = 0x01
+---
+...
+IPROTO_AUTH = 7
+---
+...
+IPROTO_STREAM_ID = 0x0a
+---
+...
+next_request_id = 9
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+header = msgpack.encode({
+ [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+ [IPROTO_SYNC] = next_request_id,
+ [IPROTO_STREAM_ID] = 1,
+});
+---
+...
+body = msgpack.encode({nil});
+---
+...
+size = msgpack.encode(header:len() + body:len());
+---
+...
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+ nil, nil, nil, nil,
+ size .. header .. body);
+---
+- null
+- Unable to process AUTH request in stream
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
conn:close()
- | ---
- | ...
-
+---
+...
-- Check that spaces in stream object updates, during reload_schema
conn = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream = conn:new_stream()
- | ---
- | ...
+---
+...
test_run:switch("test")
- | ---
- | - true
- | ...
+---
+- true
+...
-- Create one space on server
s = box.schema.space.create('test', { engine = 'memtx' })
- | ---
- | ...
+---
+...
_ = s:create_index('primary')
- | ---
- | ...
+---
+...
test_run:switch("default")
- | ---
- | - true
- | ...
+---
+- true
+...
assert(not conn.space.test)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(not stream.space.test)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(conn.schema_version == stream._schema_version)
- | ---
- | - true
- | ...
+---
+- true
+...
conn:reload_schema()
- | ---
- | ...
+---
+...
assert(conn.space.test ~= nil)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(conn.schema_version ~= stream._schema_version)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(stream.space.test ~= nil)
- | ---
- | - true
- | ...
+---
+- true
+...
-- When we touch stream.space, we compare stream._schema_version
-- and conn.schema_version if they are not equal, we clear stream
-- space cache, update it's _schema_version and load space from
-- connection to stream space cache.
assert(conn.schema_version == stream._schema_version)
- | ---
- | - true
- | ...
+---
+- true
+...
collectgarbage()
- | ---
- | - 0
- | ...
+---
+- 0
+...
collectgarbage()
- | ---
- | - 0
- | ...
+---
+- 0
+...
assert(conn.space.test ~= nil)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(stream.space.test ~= nil)
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch("test")
- | ---
- | - true
- | ...
+---
+- true
+...
s:drop()
- | ---
- | ...
+---
+...
test_run:switch("default")
- | ---
- | - true
- | ...
+---
+- true
+...
conn:reload_schema()
- | ---
- | ...
+---
+...
assert(not conn.space.test)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(not stream.space.test)
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:cmd("stop server test")
- | ---
- | - true
- | ...
-
+---
+- true
+...
-- All test works with iproto_thread count = 10
-
test_run:cmd("start server test with args='10'")
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch('test')
- | ---
- | - true
- | ...
+---
+- true
+...
fiber = require('fiber')
- | ---
- | ...
+---
+...
s = box.schema.space.create('test', { engine = 'memtx' })
- | ---
- | ...
+---
+...
_ = s:create_index('primary')
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
function replace_with_yeild(item)
fiber.sleep(0.1)
return s:replace({item})
end;
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch('default')
- | ---
- | - true
- | ...
-
+---
+- true
+...
conn = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
assert(conn:ping())
- | ---
- | - true
- | ...
+---
+- true
+...
conn_space = conn.space.test
- | ---
- | ...
+---
+...
stream = conn:new_stream()
- | ---
- | ...
+---
+...
stream_space = stream.space.test
- | ---
- | ...
-
+---
+...
-- Check that all requests in stream processed consistently
futures = {}
- | ---
- | ...
+---
+...
replace_count = 3
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
for i = 1, replace_count do
futures[string.format("replace_%d", i)] =
stream_space:replace({i}, {is_async = true})
futures[string.format("select_%d", i)] =
stream_space:select({}, {is_async = true})
end;
- | ---
- | ...
+---
+...
futures["replace_with_yeild_for_stream"] =
stream:call("replace_with_yeild",
{ replace_count + 1 }, {is_async = true});
- | ---
- | ...
+---
+...
futures["select_with_yeild_for_stream"] =
stream_space:select({}, {is_async = true});
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
+---
+- true
+...
results = wait_and_return_results(futures)
- | ---
- | ...
+---
+...
-- [1]
assert(results["select_1"])
- | ---
- | - - [1]
- | ...
+---
+- - [1]
+...
-- [1] [2]
assert(results["select_2"])
- | ---
- | - - [1]
- | - [2]
- | ...
+---
+- - [1]
+ - [2]
+...
-- [1] [2] [3]
assert(results["select_3"])
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | ...
+---
+- - [1]
+ - [2]
+ - [3]
+...
-- [1] [2] [3] [4]
-- Even yeild in replace function does not affect
-- the order of requests execution in stream
assert(results["select_with_yeild_for_stream"])
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | - [4]
- | ...
-
+---
+- - [1]
+ - [2]
+ - [3]
+ - [4]
+...
-- There is no request execution order for the connection
futures = {}
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
futures["replace_with_yeild_for_connection"] =
conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
- | ---
- | ...
+---
+...
futures["select_with_yeild_for_connection"] =
conn_space:select({}, {is_async = true});
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
+---
+- true
+...
results = wait_and_return_results(futures)
- | ---
- | ...
+---
+...
-- [1] [2] [3] [4]
-- Select will be processed earlier because of
-- yeild in `replace_with_yeild` function
assert(results["select_with_yeild_for_connection"])
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | - [4]
- | ...
-test_run:switch("test")
- | ---
- | - true
- | ...
+---
+- - [1]
+ - [2]
+ - [3]
+ - [4]
+...
+test_run:switch("test")
+---
+- true
+...
-- [1] [2] [3] [4] [5]
s:select()
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | - [4]
- | - [5]
- | ...
+---
+- - [1]
+ - [2]
+ - [3]
+ - [4]
+ - [5]
+...
errinj = box.error.injection
- | ---
- | ...
+---
+...
assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch('default')
- | ---
- | - true
- | ...
+---
+- true
+...
conn:close()
- | ---
- | ...
+---
+...
test_run:wait_cond(function () return get_current_connection_count() == 0 end)
- | ---
- | - true
- | ...
-
+---
+- true
+...
-- Check that all request will be processed
-- after connection close.
conn = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream = conn:new_stream()
- | ---
- | ...
+---
+...
space = stream.space.test
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
replace_count = 20
for i = 1, replace_count do
space:replace({i}, {is_async = true})
end;
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
+---
+- true
+...
-- Give time to send
fiber.sleep(0)
- | ---
- | ...
+---
+...
conn:close()
- | ---
- | ...
+---
+...
test_run:wait_cond(function () return get_current_connection_count() == 0 end)
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch("test")
- | ---
- | - true
- | ...
+---
+- true
+...
-- select return tuples from [1] to [20]
-- because all messages processed after
-- connection closed
s:select{}
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | - [4]
- | - [5]
- | - [6]
- | - [7]
- | - [8]
- | - [9]
- | - [10]
- | - [11]
- | - [12]
- | - [13]
- | - [14]
- | - [15]
- | - [16]
- | - [17]
- | - [18]
- | - [19]
- | - [20]
- | ...
+---
+- - [1]
+ - [2]
+ - [3]
+ - [4]
+ - [5]
+ - [6]
+ - [7]
+ - [8]
+ - [9]
+ - [10]
+ - [11]
+ - [12]
+ - [13]
+ - [14]
+ - [15]
+ - [16]
+ - [17]
+ - [18]
+ - [19]
+ - [20]
+...
+s:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Second argument (false is a value for memtx_use_mvcc_engine option)
+-- Server start without active transaction manager, so all transaction
+-- fails because of yeild!
+test_run:cmd("start server test with args='10, false'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+space:replace({1})
+---
+- [1]
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+---
+- []
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:commit()
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+-- Check that after failed transaction commit we able to start next
+-- transaction (it's strange check, but it's necessary because it was
+-- bug with it)
+stream:begin()
+---
+...
+stream:ping()
+---
+- true
+...
+stream:commit()
+---
+...
+-- Same checks for `call` end `eval` functions.
+stream:call('box.begin')
+---
+...
+stream:call('s:replace', {{1}})
+---
+- [1]
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+stream:call('s:select', {})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+stream:call('s:replace', {{1}})
+---
+- [1]
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+stream:call('s:select', {})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+test_run:switch('test')
+---
+- true
+...
s:drop()
- | ---
- | ...
+---
+...
+-- Check that there are no streams and messages, which
+-- was not deleted
errinj = box.error.injection
- | ---
- | ...
+---
+...
assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
- | ---
- | - true
- | ...
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+stream:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fails,
+-- as we can see before!
+-- Second argument (true is a value for memtx_use_mvcc_engine option)
+-- Same test case as previous but server start with active transaction
+-- manager. Also check vinyl, because it's behaviour is same.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- Spaces getting from connection, not from stream has no stream_id
+-- and not belongs to stream
+space_1_no_stream = conn.space.test_1
+---
+...
+space_2_no_stream = conn.space.test_2
+---
+...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and to vinyl:
+-- behaviour is same!
+stream_1:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+stream_2:begin()
+---
+...
+space_2:replace({1})
+---
+- [1]
+...
+test_run:switch('test')
+---
+- true
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+---
+- []
+...
+space_2_no_stream:select{}
+---
+- []
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+---
+- - [1]
+...
+space_2:select({})
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit was successful, transaction can yeild with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yeild also.
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
test_run:switch("default")
- | ---
- | - true
- | ...
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+---
+- - [1]
+...
+space_2:select{}
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+---
+- - [1]
+...
+s2:select()
+---
+- - [1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
test_run:cmd("stop server test")
- | ---
- | - true
- | ...
+---
+- true
+...
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1_1 = stream_1.space.test_1
+---
+...
+space_1_2 = stream_2.space.test_1
+---
+...
+space_2_1 = stream_1.space.test_2
+---
+...
+space_2_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Simple read/write conflict.
+space_1_1:select({1})
+---
+- []
+...
+space_1_2:select({1})
+---
+- []
+...
+space_1_1:replace({1, 1})
+---
+- [1, 1]
+...
+space_1_2:replace({1, 2})
+---
+- [1, 2]
+...
+stream_1:commit()
+---
+...
+-- This transaction fails, because of conflict
+stream_2:commit()
+---
+- error: Transaction has been aborted by conflict
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Here we must accept [1, 1]
+space_1_1:select({})
+---
+- - [1, 1]
+...
+space_1_2:select({})
+---
+- - [1, 1]
+...
+-- Same test for vinyl sapce
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_2_1:select({1})
+---
+- []
+...
+space_2_2:select({1})
+---
+- []
+...
+space_2_1:replace({1, 1})
+---
+- [1, 1]
+...
+space_2_2:replace({1, 2})
+---
+- [1, 2]
+...
+stream_1:commit()
+---
+...
+-- This transaction fails, because of conflict
+stream_2:commit()
+---
+- error: Transaction has been aborted by conflict
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Here we must accept [1, 1]
+space_2_1:select({})
+---
+- - [1, 1]
+...
+space_2_2:select({})
+---
+- - [1, 1]
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+---
+- - [1, 1]
+...
+s2:select()
+---
+- - [1, 1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Test rollback for memtx space
+space_1:replace({1})
+---
+- [1]
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+---
+- - [1]
+...
+stream_1:rollback()
+---
+...
+-- Select is empty, transaction rollback
+space_1:select({})
+---
+- []
+...
+-- Test rollback for vinyl space
+space_2:replace({1})
+---
+- [1]
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+---
+- - [1]
+...
+stream_2:rollback()
+---
+...
+-- Select is empty, transaction rollback
+space_2:select({})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- This is simple test is necessary because i have a bug
+-- with halting stream after rollback
+stream_1:begin()
+---
+...
+stream_1:commit()
+---
+...
+stream_2:begin()
+---
+...
+stream_2:commit()
+---
+...
+conn:close()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select are empty, because transaction rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_1:select({})
+---
+- - [1]
+ - [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_2:select({})
+---
+- - [1]
+ - [2]
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Empty selects, transaction was rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Reconnect
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Two empty selects
+space_1:select({})
+---
+- []
+...
+space_2:select({})
+---
+- []
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select are empty, because transaction rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_1:select({})
+---
+- - [1]
+ - [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_2:select({})
+---
+- - [1]
+ - [2]
+...
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i = 1, 1000 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+fiber.sleep(0)
+---
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+---
+- true
+...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+---
+- true
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Select was empty, transaction rollbacked
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Same test, but now we check that if `commit` was received
+-- by server before connection closed, we processed it successful.
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = stream_1:commit({is_async = true})
+---
+...
+_ = stream_2:commit({is_async = true})
+---
+...
+fiber.sleep(0)
+---
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+---
+- true
+...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+---
+- true
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Select return tuples from [1] to [100],
+-- transaction was commit
+rc1 = s1:select()
+---
+...
+rc2 = s2:select()
+---
+...
+assert(#rc1)
+---
+- 100
+...
+assert(#rc2)
+---
+- 100
+...
+s1:truncate()
+---
+...
+s2:truncate()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Reconnect
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Two empty selects
+space_1:select({})
+---
+- []
+...
+space_2:select({})
+---
+- []
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select are empty, because transaction rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. During my work on the
+-- patch, i see that all requests in stream comes with
+-- header->is_commit == true, so if we are in transaction
+-- in stream we should set this value to false, otherwise
+-- during recovering `wal_stream_apply_dml_row` fails, because
+-- of LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Here we get two tuples, commit was successful
+s1:select{}
+---
+- - [1]
+ - [2]
+...
+-- Here we get two tuples, commit was successful
+s2:select{}
+---
+- - [1]
+ - [2]
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("start server test with args='1, true'")
+---
+- true
+...
+test_run:switch('test')
+---
+- true
+...
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+---
+- - [1]
+ - [2]
+...
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+---
+- - [1]
+ - [2]
+...
+box.space.test_1:drop()
+---
+...
+box.space.test_2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Same transactions checks for async mode
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream_1 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+memtx_futures = {}
+---
+...
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+---
+...
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+---
+...
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+---
+...
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+---
+...
+vinyl_futures = {}
+---
+...
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+---
+...
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+---
+...
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+---
+...
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+---
+...
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+---
+...
+memtx_results = wait_and_return_results(memtx_futures)
+---
+...
+vinyl_results = wait_and_return_results(vinyl_futures)
+---
+...
+-- If begin was successful it return nil
+assert(not memtx_results["begin"])
+---
+- true
+...
+assert(not vinyl_results["begin"])
+---
+- true
+...
+-- [1]
+assert(memtx_results["replace"])
+---
+- [1]
+...
+assert(vinyl_results["replace"])
+---
+- [1]
+...
+-- [2]
+assert(memtx_results["insert"])
+---
+- [2]
+...
+assert(vinyl_results["insert"])
+---
+- [2]
+...
+-- [1] [2]
+assert(memtx_results["select"])
+---
+- - [1]
+ - [2]
+...
+assert(vinyl_results["select"])
+---
+- - [1]
+ - [2]
+...
+-- If commit was successful it return nil
+assert(not memtx_results["commit"])
+---
+- true
+...
+assert(not vinyl_results["commit"])
+---
+- true
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+---
+- - [1]
+ - [2]
+...
+s2:select()
+---
+- - [1]
+ - [2]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1_1 = stream_1.space.test_1
+---
+...
+space_1_2 = stream_2.space.test_1
+---
+...
+space_2_1 = stream_1.space.test_2
+---
+...
+space_2_2 = stream_2.space.test_2
+---
+...
+futures_1 = {}
+---
+...
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+---
+...
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+---
+...
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+---
+...
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+---
+...
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+---
+...
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+---
+...
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+---
+...
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+---
+...
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+---
+...
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+---
+...
+results_1 = wait_and_return_results(futures_1)
+---
+...
+-- Successful begin return nil
+assert(not results_1["begin_1"])
+---
+- true
+...
+assert(not results_1["begin_2"])
+---
+- true
+...
+-- []
+assert(not results_1["select_1_1"][1])
+---
+- true
+...
+assert(not results_1["select_1_2"][1])
+---
+- true
+...
+-- [1]
+assert(results_1["replace_1_1"][1])
+---
+- 1
+...
+-- [1]
+assert(results_1["replace_1_1"][2])
+---
+- 1
+...
+-- [1]
+assert(results_1["replace_1_2"][1])
+---
+- 1
+...
+-- [2]
+assert(results_1["replace_1_2"][2])
+---
+- 2
+...
+-- Successful commit return nil
+assert(not results_1["commit_1"])
+---
+- true
+...
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+---
+- Transaction has been aborted by conflict
+...
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+---
+- [1, 1]
+...
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_1_2_A"][1]
+futures_2 = {}
+---
+...
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+---
+...
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+---
+...
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+---
+...
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+---
+...
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+---
+...
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+---
+...
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+---
+...
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+---
+...
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+---
+...
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+---
+...
+results_2 = wait_and_return_results(futures_2)
+---
+...
+-- Successful begin return nil
+assert(not results_2["begin_1"])
+---
+- true
+...
+assert(not results_2["begin_2"])
+---
+- true
+...
+-- []
+assert(not results_2["select_2_1"][1])
+---
+- true
+...
+assert(not results_2["select_2_2"][1])
+---
+- true
+...
+-- [1]
+assert(results_2["replace_2_1"][1])
+---
+- 1
+...
+-- [1]
+assert(results_2["replace_2_1"][2])
+---
+- 1
+...
+-- [1]
+assert(results_2["replace_2_2"][1])
+---
+- 1
+...
+-- [2]
+assert(results_2["replace_2_2"][2])
+---
+- 2
+...
+-- Successful commit return nil
+assert(not results_2["commit_1"])
+---
+- true
+...
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+---
+- Transaction has been aborted by conflict
+...
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+---
+- [1, 1]
+...
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_2_2_A"][1]
+test_run:switch('test')
+---
+- true
+...
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+---
+- - [1, 1]
+...
+s2:select()
+---
+- - [1, 1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+function ping() return "pong" end
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+space_no_stream = conn.space.test
+---
+...
+-- successful begin using stream:call
+stream:call('box.begin')
+---
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+stream:call('ping')
+---
+- pong
+...
+stream:eval('ping()')
+---
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+stream:eval('box.begin()')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- successful commit using stream:call
+stream:call('box.commit')
+---
+...
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+---
+...
+space:replace({1})
+---
+- [1]
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+---
+- []
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+--Successful commit using stream:execute
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_no_stream:select{}
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select return tuple, because transaction was successful
+s:select()
+---
+- - [1]
+...
+s:delete{1}
+---
+- [1]
+...
+test_run:switch('default')
+---
+- true
+...
+-- Check rollback using stream:call
+stream:begin()
+---
+...
+space:replace({2})
+---
+- [2]
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+---
+- []
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+---
+- - [2]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+--Successful rollback using stream:call
+stream:call('box.rollback')
+---
+...
+-- Empty selects transaction rollbacked
+space:select({})
+---
+- []
+...
+space_no_stream:select{}
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Empty select transaction rollbacked
+s:select()
+---
+- []
+...
+s:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Simple test which demostrates that stream immediately
+-- destroyed, when no processing messages in stream and
+-- no active transaction.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+for i = 1, 10 do space:replace{i} end
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- All messages was processed, so stream object was immediately
+-- deleted, because no active transaction started.
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+s:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Transaction tests for sql iproto requests.
+-- All this functions are copy-paste from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+---
+- true
+...
+function execute_sql_string(stream, sql_string)
+ if stream then
+ stream:execute(sql_string)
+ else
+ box.execute(sql_string)
+ end
+end$
+---
+...
+function execute_sql_string_and_return_result(stream, sql_string)
+ if stream then
+ return pcall(stream.execute, stream, sql_string)
+ else
+ return box.execute(sql_string)
+ end
+end$
+---
+...
+function monster_ddl(stream)
+ local _, err1, err2, err3, err4, err5, err6
+ local stream_or_box = stream or box
+ execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER);]])
+ execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER UNIQUE,
+ CONSTRAINT ck1
+ CHECK(b < 100));]])
+
+ execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+ execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+ execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+ KEY, a INTEGER);]])
+
+ execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+ execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+ CHECK(b > 0);]])
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+ RENAME TO t1;]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+ ck2 CHECK(a > 0);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+ INTEGER PRIMARY KEY);]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+
+ execute_sql_string(stream, [[CREATE TABLE
+ trigger_catcher(id INTEGER PRIMARY
+ KEY AUTOINCREMENT);]])
+
+ execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+ execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+ t1 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+ t2 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+ ON t1(a, b);]])
+
+ execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+ _, err5 =
+ execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+ _, err6 =
+ execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+ t_does_not_exist;]])
+
+ execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+ return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+ err5, err6}
+end$
+---
+...
+function monster_ddl_cmp_res(res1, res2)
+ if json.encode(res1) == json.encode(res2) then
+ return true
+ end
+ return res1, res2
+end$
+---
+...
+function monster_ddl_is_clean(stream)
+ local stream_or_box = stream or box
+ assert(stream_or_box.space.T1 == nil)
+ assert(stream_or_box.space.T2 == nil)
+ assert(stream_or_box.space._trigger:count() == 0)
+ assert(stream_or_box.space._fk_constraint:count() == 0)
+ assert(stream_or_box.space._ck_constraint:count() == 0)
+ assert(stream_or_box.space.T_RENAMED == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+---
+...
+function monster_ddl_check(stream)
+ local _, err1, err2, err3, err4, res
+ local stream_or_box = stream or box
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES (1, 1, 101)]])
+ execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES(2, 2, 1)]])
+ _, err3 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, 20, 1)]])
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, -1, 1)]])
+ execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+ if not stream then
+ assert(stream_or_box.space.T_RENAMED ~= nil)
+ assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+ res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ else
+ _, res =
+ execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ end
+ return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+ err3, err4, res}
+end$
+---
+...
+function monster_ddl_clear(stream)
+ execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+---
+...
+test_run:cmd("setopt delimiter ''")$
+---
+- true
+...
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+test_run:switch('test')
+---
+- true
+...
+test_run:cmd("setopt delimiter '$'")
+---
+- true
+...
+function monster_ddl_is_clean()
+ if not (box.space.T1 == nil) or
+ not (box.space.T2 == nil) or
+ not (box.space._trigger:count() == 0) or
+ not (box.space._fk_constraint:count() == 0) or
+ not (box.space._ck_constraint:count() == 0) or
+ not (box.space.T_RENAMED == nil) or
+ not (box.space.T_TO_RENAME == nil) then
+ return false
+ end
+ return true
+end$
+---
+...
+test_run:cmd("setopt delimiter ''")$
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream = conn:new_stream()
+---
+...
+-- No txn.
+true_ddl_res = monster_ddl()
+---
+...
+true_ddl_res
+---
+- - 'Finished ok, errors in the middle: '
+ - Space 'T1' already exists
+ - Space 'T1' already exists
+ - Space 'T3' does not exist
+ - Index 'T1A' already exists in space 'T1'
+ - 'Failed to execute SQL statement: can not truncate space ''T2'' because other
+ objects depend on it'
+ - Space 'T_DOES_NOT_EXIST' does not exist
+...
+true_check_res = monster_ddl_check()
+---
+...
+true_check_res
+---
+- - 'Finished ok, errors and trigger catcher content: '
+ - 'Check constraint failed ''CK1'': b < 100'
+ - Duplicate key exists in unique index "unique_unnamed_T2_2" in space "T2" with
+ old tuple - [1, 1, 1] and new tuple - [2, 2, 1]
+ - 'Failed to execute SQL statement: FOREIGN KEY constraint failed'
+ - 'Check constraint failed ''CK2'': a > 0'
+ - metadata:
+ - name: ID
+ type: integer
+ rows:
+ - [1]
+...
+monster_ddl_clear()
+---
+...
+monster_ddl_is_clean()
+---
+...
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+---
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+---
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+test_run:switch("test")
+---
+- true
+...
+-- All messages was processed, so stream object was immediately
+-- deleted, because no active transaction started.
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+---
+- row_count: 1
+...
+-- reload schema
+stream:ping()
+---
+- true
+...
+space = stream.space.TEST
+---
+...
+assert(space ~= nil)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+space:replace{1, 2, '3'}
+---
+- [1, 2, '3']
+...
+space:select()
+---
+- - [1, 2, '3']
+...
+-- select is empty, because transaction was not commited
+conn.space.TEST:select()
+---
+- []
+...
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+---
+...
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+---
+...
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+---
+- true
+...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+---
+- metadata:
+ - name: ID
+ type: integer
+ - name: A
+ type: number
+ - name: B
+ type: string
+ rows:
+ - [1, 2, '3']
+...
+-- empty select, transaction was not commited
+conn:execute(conn_pr.stmt_id, {1, 2})
+---
+- metadata:
+ - name: ID
+ type: integer
+ - name: A
+ type: number
+ - name: B
+ type: string
+ rows: []
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+---
+- metadata:
+ - name: ID
+ type: integer
+ - name: A
+ type: number
+ - name: B
+ type: string
+ rows:
+ - [1, 2, '3']
+...
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+---
+- metadata:
+ - name: ID
+ type: integer
+ - name: A
+ type: number
+ - name: B
+ type: string
+ rows:
+ - [1, 2, '3']
+...
+stream:unprepare(stream_pr.stmt_id)
+---
+- null
+...
+conn:close()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+---
+- - [1, 2, '3']
+...
+box.space.TEST:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
test_run:cmd("cleanup server test")
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:cmd("delete server test")
- | ---
- | - true
- | ...
+---
+- true
+...
diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua
index 72129a228..f99c16c0d 100644
--- a/test/box/stream.test.lua
+++ b/test/box/stream.test.lua
@@ -1,6 +1,8 @@
-- This test checks streams iplementation in iproto (gh-5860).
net_box = require('net.box')
+json = require('json')
fiber = require('fiber')
+msgpack = require('msgpack')
test_run = require('test_run').new()
test_run:cmd("create server test with script='box/stream.lua'")
@@ -45,6 +47,63 @@ stream = conn:new_stream()
-- The new method `new_stream`, for the stream object, returns a new
-- stream object, just as in the case of connection.
_ = stream:new_stream()
+
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+conn_2 = net_box.connect(server_addr)
+stream_1_1 = conn_1:new_stream()
+stream_1_2 = conn_1:new_stream()
+stream_2 = conn_2:new_stream()
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+stream_1_1:rollback()
+
+stream_1_1:begin()
+-- Error unable to start second transaction in one stream
+stream_1_1:begin()
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+test_run:switch("test")
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+box.commit()
+test_run:switch("default")
+stream_1_1:commit()
+stream_1_2:commit()
+stream_2:commit()
+
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+conn:_request(net_box._method.commit, nil, nil, nil)
+conn:_request(net_box._method.rollback, nil, nil, nil)
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+IPROTO_REQUEST_TYPE = 0x00
+IPROTO_SYNC = 0x01
+IPROTO_AUTH = 7
+IPROTO_STREAM_ID = 0x0a
+next_request_id = 9
+test_run:cmd("setopt delimiter ';'")
+header = msgpack.encode({
+ [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+ [IPROTO_SYNC] = next_request_id,
+ [IPROTO_STREAM_ID] = 1,
+});
+body = msgpack.encode({nil});
+size = msgpack.encode(header:len() + body:len());
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+ nil, nil, nil, nil,
+ size .. header .. body);
+test_run:cmd("setopt delimiter ''");
conn:close()
-- Check that spaces in stream object updates, during reload_schema
@@ -178,5 +237,1148 @@ assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
test_run:switch("default")
test_run:cmd("stop server test")
+-- Second argument (false is a value for memtx_use_mvcc_engine option)
+-- Server start without active transaction manager, so all transaction
+-- fails because of yeild!
+test_run:cmd("start server test with args='10, false'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+test_run:switch('default')
+space:replace({1})
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:commit()
+-- Select is empty, transaction was aborted
+space:select{}
+-- Check that after failed transaction commit we able to start next
+-- transaction (it's strange check, but it's necessary because it was
+-- bug with it)
+stream:begin()
+stream:ping()
+stream:commit()
+-- Same checks for `call` end `eval` functions.
+stream:call('box.begin')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+-- Select is empty, transaction was aborted
+space:select{}
+
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+-- Select is empty, transaction was aborted
+space:select{}
+
+test_run:switch('test')
+s:drop()
+-- Check that there are no streams and messages, which
+-- was not deleted
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+stream:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fails,
+-- as we can see before!
+
+-- Second argument (true is a value for memtx_use_mvcc_engine option)
+-- Same test case as previous but server start with active transaction
+-- manager. Also check vinyl, because it's behaviour is same.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s1:create_index('primary')
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- Spaces getting from connection, not from stream has no stream_id
+-- and not belongs to stream
+space_1_no_stream = conn.space.test_1
+space_2_no_stream = conn.space.test_2
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and to vinyl:
+-- behaviour is same!
+stream_1:begin()
+space_1:replace({1})
+stream_2:begin()
+space_2:replace({1})
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+test_run:switch('default')
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+space_2_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+space_2:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s1:select()
+s2:select()
+test_run:switch('default')
+-- Commit was successful, transaction can yeild with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yeild also.
+stream_1:commit()
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+space_2:select{}
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Simple read/write conflict.
+space_1_1:select({1})
+space_1_2:select({1})
+space_1_1:replace({1, 1})
+space_1_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_1_1:select({})
+space_1_2:select({})
+
+-- Same test for vinyl sapce
+stream_1:begin()
+stream_2:begin()
+space_2_1:select({1})
+space_2_2:select({1})
+space_2_1:replace({1, 1})
+space_2_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_2_1:select({})
+space_2_2:select({})
+
+test_run:switch('test')
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Test rollback for memtx space
+space_1:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+stream_1:rollback()
+-- Select is empty, transaction rollback
+space_1:select({})
+
+-- Test rollback for vinyl space
+space_2:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+stream_2:rollback()
+-- Select is empty, transaction rollback
+space_2:select({})
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- This is simple test is necessary because i have a bug
+-- with halting stream after rollback
+stream_1:begin()
+stream_1:commit()
+stream_2:begin()
+stream_2:commit()
+conn:close()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+conn:close()
+
+test_run:switch("test")
+-- Empty selects, transaction was rollback
+s1:select()
+s2:select()
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+for i = 1, 1000 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select was empty, transaction rollbacked
+s1:select()
+s2:select()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Same test, but now we check that if `commit` was received
+-- by server before connection closed, we processed it successful.
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+test_run:cmd("setopt delimiter ';'")
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+_ = stream_1:commit({is_async = true})
+_ = stream_2:commit({is_async = true})
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select return tuples from [1] to [100],
+-- transaction was commit
+rc1 = s1:select()
+rc2 = s2:select()
+assert(#rc1)
+assert(#rc2)
+s1:truncate()
+s2:truncate()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. During my work on the
+-- patch, i see that all requests in stream comes with
+-- header->is_commit == true, so if we are in transaction
+-- in stream we should set this value to false, otherwise
+-- during recovering `wal_stream_apply_dml_row` fails, because
+-- of LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+
+stream_1:begin()
+stream_2:begin()
+space_1:replace({1})
+space_1:replace({2})
+space_2:replace({1})
+space_2:replace({2})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+s1:select{}
+-- Here we get two tuples, commit was successful
+s2:select{}
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+test_run:cmd("start server test with args='1, true'")
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+box.space.test_1:drop()
+box.space.test_2:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
+-- Same transactions checks for async mode
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+space_1 = stream_1.space.test_1
+stream_2 = conn:new_stream()
+space_2 = stream_2.space.test_2
+
+memtx_futures = {}
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+
+vinyl_futures = {}
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s1:select()
+s2:select()
+test_run:switch('default')
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+
+memtx_results = wait_and_return_results(memtx_futures)
+vinyl_results = wait_and_return_results(vinyl_futures)
+-- If begin was successful it return nil
+assert(not memtx_results["begin"])
+assert(not vinyl_results["begin"])
+-- [1]
+assert(memtx_results["replace"])
+assert(vinyl_results["replace"])
+-- [2]
+assert(memtx_results["insert"])
+assert(vinyl_results["insert"])
+-- [1] [2]
+assert(memtx_results["select"])
+assert(vinyl_results["select"])
+-- If commit was successful it return nil
+assert(not memtx_results["commit"])
+assert(not vinyl_results["commit"])
+
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+
+futures_1 = {}
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+
+results_1 = wait_and_return_results(futures_1)
+-- Successful begin return nil
+assert(not results_1["begin_1"])
+assert(not results_1["begin_2"])
+-- []
+assert(not results_1["select_1_1"][1])
+assert(not results_1["select_1_2"][1])
+-- [1]
+assert(results_1["replace_1_1"][1])
+-- [1]
+assert(results_1["replace_1_1"][2])
+-- [1]
+assert(results_1["replace_1_2"][1])
+-- [2]
+assert(results_1["replace_1_2"][2])
+-- Successful commit return nil
+assert(not results_1["commit_1"])
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_1_2_A"][1]
+
+futures_2 = {}
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+
+results_2 = wait_and_return_results(futures_2)
+-- Successful begin return nil
+assert(not results_2["begin_1"])
+assert(not results_2["begin_2"])
+-- []
+assert(not results_2["select_2_1"][1])
+assert(not results_2["select_2_2"][1])
+-- [1]
+assert(results_2["replace_2_1"][1])
+-- [1]
+assert(results_2["replace_2_1"][2])
+-- [1]
+assert(results_2["replace_2_2"][1])
+-- [2]
+assert(results_2["replace_2_2"][2])
+-- Successful commit return nil
+assert(not results_2["commit_1"])
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_2_2_A"][1]
+
+test_run:switch('test')
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+function ping() return "pong" end
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+space_no_stream = conn.space.test
+
+-- successful begin using stream:call
+stream:call('box.begin')
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+stream:call('ping')
+stream:eval('ping()')
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+stream:eval('box.begin()')
+-- successful commit using stream:call
+stream:call('box.commit')
+
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+space:replace({1})
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+--Successful commit using stream:execute
+stream:execute('COMMIT')
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_no_stream:select{}
+test_run:switch("test")
+-- Select return tuple, because transaction was successful
+s:select()
+s:delete{1}
+test_run:switch('default')
+-- Check rollback using stream:call
+stream:begin()
+space:replace({2})
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+--Successful rollback using stream:call
+stream:call('box.rollback')
+-- Empty selects transaction rollbacked
+space:select({})
+space_no_stream:select{}
+test_run:switch("test")
+-- Empty select transaction rollbacked
+s:select()
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Simple test which demostrates that stream immediately
+-- destroyed, when no processing messages in stream and
+-- no active transaction.
+
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+for i = 1, 10 do space:replace{i} end
+test_run:switch("test")
+-- All messages was processed, so stream object was immediately
+-- deleted, because no active transaction started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+s:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Transaction tests for sql iproto requests.
+-- All this functions are copy-paste from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+function execute_sql_string(stream, sql_string)
+ if stream then
+ stream:execute(sql_string)
+ else
+ box.execute(sql_string)
+ end
+end$
+function execute_sql_string_and_return_result(stream, sql_string)
+ if stream then
+ return pcall(stream.execute, stream, sql_string)
+ else
+ return box.execute(sql_string)
+ end
+end$
+function monster_ddl(stream)
+ local _, err1, err2, err3, err4, err5, err6
+ local stream_or_box = stream or box
+ execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER);]])
+ execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER UNIQUE,
+ CONSTRAINT ck1
+ CHECK(b < 100));]])
+
+ execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+ execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+ execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+ KEY, a INTEGER);]])
+
+ execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+ execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+ CHECK(b > 0);]])
+
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+ RENAME TO t1;]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+ ck2 CHECK(a > 0);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+ INTEGER PRIMARY KEY);]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+
+ execute_sql_string(stream, [[CREATE TABLE
+ trigger_catcher(id INTEGER PRIMARY
+ KEY AUTOINCREMENT);]])
+
+ execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+ execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+ t1 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+ t2 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+ ON t1(a, b);]])
+
+ execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+ _, err5 =
+ execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+ _, err6 =
+ execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+ t_does_not_exist;]])
+
+ execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+ return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+ err5, err6}
+end$
+function monster_ddl_cmp_res(res1, res2)
+ if json.encode(res1) == json.encode(res2) then
+ return true
+ end
+ return res1, res2
+end$
+function monster_ddl_is_clean(stream)
+ local stream_or_box = stream or box
+ assert(stream_or_box.space.T1 == nil)
+ assert(stream_or_box.space.T2 == nil)
+ assert(stream_or_box.space._trigger:count() == 0)
+ assert(stream_or_box.space._fk_constraint:count() == 0)
+ assert(stream_or_box.space._ck_constraint:count() == 0)
+ assert(stream_or_box.space.T_RENAMED == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+function monster_ddl_check(stream)
+ local _, err1, err2, err3, err4, res
+ local stream_or_box = stream or box
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES (1, 1, 101)]])
+ execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES(2, 2, 1)]])
+ _, err3 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, 20, 1)]])
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, -1, 1)]])
+ execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+ if not stream then
+ assert(stream_or_box.space.T_RENAMED ~= nil)
+ assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+ res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ else
+ _, res =
+ execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ end
+ return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+ err3, err4, res}
+end$
+function monster_ddl_clear(stream)
+ execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+test_run:cmd("setopt delimiter ''")$
+
+test_run:cmd("start server test with args='10, true'")
+test_run:switch('test')
+test_run:cmd("setopt delimiter '$'")
+function monster_ddl_is_clean()
+ if not (box.space.T1 == nil) or
+ not (box.space.T2 == nil) or
+ not (box.space._trigger:count() == 0) or
+ not (box.space._fk_constraint:count() == 0) or
+ not (box.space._ck_constraint:count() == 0) or
+ not (box.space.T_RENAMED == nil) or
+ not (box.space.T_TO_RENAME == nil) then
+ return false
+ end
+ return true
+end$
+test_run:cmd("setopt delimiter ''")$
+test_run:switch('default')
+
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+
+-- No txn.
+true_ddl_res = monster_ddl()
+true_ddl_res
+
+true_check_res = monster_ddl_check()
+true_check_res
+
+monster_ddl_clear()
+monster_ddl_is_clean()
+
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+test_run:switch("test")
+-- All messages was processed, so stream object was immediately
+-- deleted, because no active transaction started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+-- reload schema
+stream:ping()
+space = stream.space.TEST
+assert(space ~= nil)
+stream:execute('START TRANSACTION')
+space:replace{1, 2, '3'}
+space:select()
+-- select is empty, because transaction was not commited
+conn.space.TEST:select()
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- empty select, transaction was not commited
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:execute('COMMIT')
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:unprepare(stream_pr.stmt_id)
+conn:close()
+test_run:switch('test')
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+box.space.TEST:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
test_run:cmd("cleanup server test")
test_run:cmd("delete server test")
--
2.20.1
More information about the Tarantool-patches
mailing list