From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id E3C546EC40; Mon, 9 Aug 2021 17:40:42 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org E3C546EC40 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1628520043; bh=s4hPPsMrL927ryjj1o25AXyIUHNN1Vz2wRQ63MyzYPE=; h=To:Cc:Date:In-Reply-To:References:Subject:List-Id: List-Unsubscribe:List-Archive:List-Post:List-Help:List-Subscribe: From:Reply-To:From; b=I90OwHQWUIeK9ZRb3e+u6keIELASNICFbS1ZbvlZ3EKfvsiv7ZVme+U97vlrixTun As9K3ntmSD8TVhBO3aJdyK1PGZJIcmXPoczgQRNSMI/D52owRlfZ32QqDFKTtgjBYt 0jqCI7d90SKpKmThdC8qMG8dn+pS67Ji7/fGJKiQ= Received: from smtp30.i.mail.ru (smtp30.i.mail.ru [94.100.177.90]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 5F2456EC42 for ; Mon, 9 Aug 2021 17:38:10 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 5F2456EC42 Received: by smtp30.i.mail.ru with esmtpa (envelope-from ) id 1mD6Po-0007bZ-EF; Mon, 09 Aug 2021 17:38:09 +0300 To: tarantool-patches@dev.tarantool.org, vdavydov@tarantool.org, v.shpilevoy@tarantool.org Cc: mechanik20051988 Date: Mon, 9 Aug 2021 17:38:00 +0300 Message-Id: X-Mailer: git-send-email 2.20.1 In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD92087353F0EC44DD9736CF3E71F18CE0C3E1D5927724F4AAA182A05F538085040A047B52EBF059F81F7ABA3EA33E69741C071F99FFD21C14EDF4D77E5D58D9F3C X-7FA49CB5: FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE7DB7B102DCB413779EA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F79006373745FD4183B699148638F802B75D45FF36EB9D2243A4F8B5A6FCA7DBDB1FC311F39EFFDF887939037866D6147AF826D8337D7ADEDC76765B4C50A4C921A2C5BB117882F4460429724CE54428C33FAD305F5C1EE8F4F765FCAA867293B0326636D2E47CDBA5A96583BD4B6F7A4D31EC0BC014FD901B82EE079FA2833FD35BB23D27C277FBC8AE2E8B3A703B70628EAD7BA471835C12D1D977C4224003CC836476EB9C4185024447017B076A6E789B0E975F5C1EE8F4F765FC98E6593B218E692D3AA81AA40904B5D9CF19DD082D7633A078D18283394535A93AA81AA40904B5D98AA50765F79006379B06BA6FE78CAE96D81D268191BDAD3D698AB9A7B718F8C4D1B931868CE1C5781A620F70A64A45A98AA50765F79006372E808ACE2090B5E1725E5C173C3A84C3C5EA940A35A165FF2DBA43225CD8A89F72BE6798D6036352CE5475246E174218B5C8C57E37DE458BEDA766A37F9254B7 X-B7AD71C0: AC4F5C86D027EB782CDD5689AFBDA7A213B5FB47DCBC3458F0AFF96BAACF4158235E5A14AD4A4A4625E192CAD1D9E79D94463893BF8742D0EA5434651F0D12C2 X-C1DE0DAB: C20DE7B7AB408E4181F030C43753B8183A4AFAF3EA6BDC446469D8A8717206BB6B668463F4F89F868704450D2F563FD81A3074A726B6A7FE9C2B6934AE262D3EE7EAB7254005DCED7532B743992DF240BDC6A1CF3F042BAD6DF99611D93F60EF6EFF71F1B3C06F72699F904B3F4130E343918A1A30D5E7FCCB5012B2E24CD356 X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D3419891600CCEF46076E78ABFEC9463E5C975539945D1D3FF5E18F89CD58C95A37A025AD96AEF388C31D7E09C32AA3244C65E546A1E659422F763EC7E3DA6F71CB35DA7DC5AF9B58C0927AC6DF5659F194 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2biojGhQhWEp1aB+smUyNMXLa8g== X-Mailru-Sender: 583F1D7ACE8F49BD29FC049B2A5BF96391A1E5AAB912782F307D0C2BA3057A0C7F1D6B0BD5765AC9B79567116EAC6FCF4E830D9205DBEA545646F0D3C63A617F27ACC94E9A535D22112434F685709FCF0DA7A0AF5A3A8387 X-Mras: Ok Subject: [Tarantool-patches] [PATCH v2 8/8] net.box: add interactive transaction support in net.box X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: mechanik20051988 via Tarantool-patches Reply-To: mechanik20051988 Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" From: mechanik20051988 Implement `begin`, `commit` and `rollback` methods for stream object in `net.box`, which allows to begin, commit and rollback transaction accordingly. Closes #5860 @TarantoolBot document Title: add interactive transaction support in net.box Implement `begin`, `commit` and `rollback` methods for stream object in `net.box`, which allows to begin, commit and rollback transaction accordingly. Now there are multiple ways to begin, commit and rollback transaction from `net.box`: using appropriate stream methods, using 'call` or 'eval' methods or using `execute` method with sql transaction syntax. User can mix these methods, for example, start transaction using `stream:begin()`, and commit transaction using `stream:call('box.commit')` or stream:execute('COMMIT'). Simple example of using interactive transactions via iproto from net.box: ```lua stream = conn:new_stream() space = stream.space.test space_not_from_stream = conn.space.test stream:begin() space:replace({1}) -- return previously inserted tuple, because request -- belongs to transaction. space:select({}) -- empty select, because select doesn't belongs to -- transaction space_not_from_stream:select({}) stream:call('box.commit') -- now transaction was commited, so all requests -- returns tuple. ``` Different examples of using streams you can find in gh-5860-implement-streams-in-iproto.test.lua --- .../gh-5860-implement-streams-in-iproto.md | 26 + src/box/lua/net_box.c | 51 +- src/box/lua/net_box.lua | 35 +- test/box/stream.result | 3558 +++++++++++++++-- test/box/stream.test.lua | 1202 ++++++ 5 files changed, 4556 insertions(+), 316 deletions(-) create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md new file mode 100644 index 000000000..8a8eec3e7 --- /dev/null +++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md @@ -0,0 +1,26 @@ +## feature/core + +* Streams and interactive transactions over streams are implemented + in iproto. Stream is associated with it's ID, which is unique within + one connection. All requests with same not zero stream ID belongs to + the same stream. All requests in stream processed synchronously. The + execution of the next request will not start until the previous one is + completed. If request has zero stream ID it does not belong to stream + and is processed in the old way. + In `net.box`, stream is an object above connection that has the same + methods, but allows to execute requests sequentially. ID is generated + on the client side automatically. If user writes his own connector and + wants to use streams, he must transmit stream_id over iproto protocol. + The main purpose of streams is transactions via iproto. Each stream + can start its own transaction, so they allows multiplexing several + transactions over one connection. There are multiple ways to begin, + commit and rollback transaction: using appropriate stream methods, using + `call` or `eval` methods or using `execute` method with sql transaction + syntax. User can mix these methods, for example, start transaction using + `stream:begin()`, and commit transaction using `stream:call('box.commit')` + or stream:execute('COMMIT'). + If any request fails during the transaction, it will not affect the other + requests in the transaction. If disconnect occurs when there is some active + transaction in stream, this transaction will be rollbacked, if it does not + have time to commit before this moment. + diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index df36e3991..43dbb6448 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -70,7 +70,10 @@ enum netbox_method { NETBOX_MIN = 14, NETBOX_MAX = 15, NETBOX_COUNT = 16, - NETBOX_INJECT = 17, + NETBOX_BEGIN = 17, + NETBOX_COMMIT = 18, + NETBOX_ROLLBACK = 19, + NETBOX_INJECT = 20, netbox_method_MAX }; @@ -618,6 +621,46 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream, netbox_encode_prepare(L, idx, stream, sync, stream_id); } +static inline void +netbox_encode_txn(lua_State *L, enum iproto_type type, int idx, + struct mpstream *stream, uint64_t sync, + uint64_t stream_id) +{ + (void)L; + (void) idx; + assert(type == IPROTO_TRANSACTION_BEGIN || + type == IPROTO_TRANSACTION_COMMIT || + type == IPROTO_TRANSACTION_ROLLBACK); + size_t svp = netbox_prepare_request(stream, sync, + type, stream_id); + + netbox_encode_request(stream, svp); +} + +static void +netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream, + uint64_t sync, uint64_t stream_id) +{ + return netbox_encode_txn(L, IPROTO_TRANSACTION_BEGIN, idx, stream, + sync, stream_id); +} + +static void +netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream, + uint64_t sync, uint64_t stream_id) +{ + return netbox_encode_txn(L, IPROTO_TRANSACTION_COMMIT, idx, stream, + sync, stream_id); +} + +static void +netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream, + uint64_t sync, uint64_t stream_id) +{ + return netbox_encode_txn(L, IPROTO_TRANSACTION_ROLLBACK, idx, stream, + sync, stream_id); +} + static void netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream, uint64_t sync, uint64_t stream_id) @@ -665,6 +708,9 @@ netbox_encode_method(struct lua_State *L) [NETBOX_MIN] = netbox_encode_select, [NETBOX_MAX] = netbox_encode_select, [NETBOX_COUNT] = netbox_encode_call, + [NETBOX_BEGIN] = netbox_encode_begin, + [NETBOX_COMMIT] = netbox_encode_commit, + [NETBOX_ROLLBACK] = netbox_encode_rollback, [NETBOX_INJECT] = netbox_encode_inject, }; enum netbox_method method = lua_tointeger(L, 1); @@ -1045,6 +1091,9 @@ netbox_decode_method(struct lua_State *L) [NETBOX_MIN] = netbox_decode_tuple, [NETBOX_MAX] = netbox_decode_tuple, [NETBOX_COUNT] = netbox_decode_value, + [NETBOX_BEGIN] = netbox_decode_nil, + [NETBOX_COMMIT] = netbox_decode_nil, + [NETBOX_ROLLBACK] = netbox_decode_nil, [NETBOX_INJECT] = netbox_decode_table, }; enum netbox_method method = lua_tointeger(L, 1); diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 34b396235..9e653c312 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -70,8 +70,11 @@ local M_GET = 13 local M_MIN = 14 local M_MAX = 15 local M_COUNT = 16 +local M_BEGIN = 17 +local M_COMMIT = 18 +local M_ROLLBACK = 19 -- Injects raw data into connection. Used by console and tests. -local M_INJECT = 17 +local M_INJECT = 20 ffi.cdef[[ struct error * @@ -1196,11 +1199,38 @@ local function new_stream(stream) return stream._conn:new_stream() end +local function begin(stream, opts) + check_remote_arg(stream, 'begin') + local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id) + if opts and opts.is_async then + return res + end +end + +local function commit(stream, opts) + check_remote_arg(stream, 'commit') + local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id) + if opts and opts.is_async then + return res + end +end + +local function rollback(stream, opts) + check_remote_arg(stream, 'rollback') + local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id) + if opts and opts.is_async then + return res + end +end + function remote_methods:new_stream() check_remote_arg(self, 'new_stream') self._last_stream_id = self._last_stream_id + 1 local stream = setmetatable({ new_stream = new_stream, + begin = begin, + commit = commit, + rollback = rollback, _stream_id = self._last_stream_id, space = setmetatable({ _stream_space_cache = {}, @@ -1685,6 +1715,9 @@ local this_module = { min = M_MIN, max = M_MAX, count = M_COUNT, + begin = M_BEGIN, + commit = M_COMMIT, + rollback = M_ROLLBACK, inject = M_INJECT, } } diff --git a/test/box/stream.result b/test/box/stream.result index 03200ecf6..95fd1ca51 100644 --- a/test/box/stream.result +++ b/test/box/stream.result @@ -1,24 +1,27 @@ --- test-run result file version 2 -- This test checks streams iplementation in iproto (gh-5860). net_box = require('net.box') - | --- - | ... +--- +... +json = require('json') +--- +... fiber = require('fiber') - | --- - | ... +--- +... +msgpack = require('msgpack') +--- +... test_run = require('test_run').new() - | --- - | ... - +--- +... test_run:cmd("create server test with script='box/stream.lua'") - | --- - | - true - | ... - +--- +- true +... test_run:cmd("setopt delimiter ';'") - | --- - | - true - | ... +--- +- true +... function get_current_connection_count() local total_net_stat_table = test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] @@ -27,8 +30,8 @@ function get_current_connection_count() assert(connection_stat_table) return connection_stat_table.current end; - | --- - | ... +--- +... function wait_and_return_results(futures) local results = {} for name, future in pairs(futures) do @@ -40,446 +43,3373 @@ function wait_and_return_results(futures) end return results end; - | --- - | ... +--- +... test_run:cmd("setopt delimiter ''"); - | --- - | - true - | ... - +--- +- true +... -- Some simple checks for new object - stream test_run:cmd("start server test with args='1'") - | --- - | - true - | ... +--- +- true +... server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] - | --- - | ... +--- +... conn_1 = net_box.connect(server_addr) - | --- - | ... +--- +... stream_1 = conn_1:new_stream() - | --- - | ... +--- +... conn_2 = net_box.connect(server_addr) - | --- - | ... +--- +... stream_2 = conn_2:new_stream() - | --- - | ... +--- +... -- Stream is a wrapper around connection, so if you close connection -- you close stream, and vice versa. conn_1:close() - | --- - | ... +--- +... assert(not stream_1:ping()) - | --- - | - true - | ... +--- +- true +... stream_2:close() - | --- - | ... +--- +... assert(not conn_2:ping()) - | --- - | - true - | ... +--- +- true +... conn = net_box.connect(server_addr) - | --- - | ... +--- +... stream = conn:new_stream() - | --- - | ... +--- +... -- The new method `new_stream`, for the stream object, returns a new -- stream object, just as in the case of connection. _ = stream:new_stream() - | --- - | ... +--- +... +-- Simple checks for transactions +conn_1 = net_box.connect(server_addr) +--- +... +conn_2 = net_box.connect(server_addr) +--- +... +stream_1_1 = conn_1:new_stream() +--- +... +stream_1_2 = conn_1:new_stream() +--- +... +stream_2 = conn_2:new_stream() +--- +... +-- It's ok to commit or rollback without any active transaction +stream_1_1:commit() +--- +... +stream_1_1:rollback() +--- +... +stream_1_1:begin() +--- +... +-- Error unable to start second transaction in one stream +stream_1_1:begin() +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +-- It's ok to start transaction in separate stream in one connection +stream_1_2:begin() +--- +... +-- It's ok to start transaction in separate stream in other connection +stream_2:begin() +--- +... +test_run:switch("test") +--- +- true +... +-- It's ok to start local transaction separately with active stream +-- transactions +box.begin() +--- +... +box.commit() +--- +... +test_run:switch("default") +--- +- true +... +stream_1_1:commit() +--- +... +stream_1_2:commit() +--- +... +stream_2:commit() +--- +... +-- Check unsupported requests +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +-- Begin, commit and rollback supported only for streams +conn:_request(net_box._method.begin, nil, nil, nil) +--- +- error: Unable to process BEGIN request out of stream +... +conn:_request(net_box._method.commit, nil, nil, nil) +--- +- error: Unable to process COMMIT request out of stream +... +conn:_request(net_box._method.rollback, nil, nil, nil) +--- +- error: Unable to process ROLLBACK request out of stream +... +-- Not all requests supported by stream. +stream = conn:new_stream() +--- +... +-- Start transaction to allocate stream object on the +-- server side +stream:begin() +--- +... +IPROTO_REQUEST_TYPE = 0x00 +--- +... +IPROTO_SYNC = 0x01 +--- +... +IPROTO_AUTH = 7 +--- +... +IPROTO_STREAM_ID = 0x0a +--- +... +next_request_id = 9 +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +header = msgpack.encode({ + [IPROTO_REQUEST_TYPE] = IPROTO_AUTH, + [IPROTO_SYNC] = next_request_id, + [IPROTO_STREAM_ID] = 1, +}); +--- +... +body = msgpack.encode({nil}); +--- +... +size = msgpack.encode(header:len() + body:len()); +--- +... +conn._transport.perform_request(nil, nil, false, net_box._method.inject, + nil, nil, nil, nil, + size .. header .. body); +--- +- null +- Unable to process AUTH request in stream +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... conn:close() - | --- - | ... - +--- +... -- Check that spaces in stream object updates, during reload_schema conn = net_box.connect(server_addr) - | --- - | ... +--- +... stream = conn:new_stream() - | --- - | ... +--- +... test_run:switch("test") - | --- - | - true - | ... +--- +- true +... -- Create one space on server s = box.schema.space.create('test', { engine = 'memtx' }) - | --- - | ... +--- +... _ = s:create_index('primary') - | --- - | ... +--- +... test_run:switch("default") - | --- - | - true - | ... +--- +- true +... assert(not conn.space.test) - | --- - | - true - | ... +--- +- true +... assert(not stream.space.test) - | --- - | - true - | ... +--- +- true +... assert(conn.schema_version == stream._schema_version) - | --- - | - true - | ... +--- +- true +... conn:reload_schema() - | --- - | ... +--- +... assert(conn.space.test ~= nil) - | --- - | - true - | ... +--- +- true +... assert(conn.schema_version ~= stream._schema_version) - | --- - | - true - | ... +--- +- true +... assert(stream.space.test ~= nil) - | --- - | - true - | ... +--- +- true +... -- When we touch stream.space, we compare stream._schema_version -- and conn.schema_version if they are not equal, we clear stream -- space cache, update it's _schema_version and load space from -- connection to stream space cache. assert(conn.schema_version == stream._schema_version) - | --- - | - true - | ... +--- +- true +... collectgarbage() - | --- - | - 0 - | ... +--- +- 0 +... collectgarbage() - | --- - | - 0 - | ... +--- +- 0 +... assert(conn.space.test ~= nil) - | --- - | - true - | ... +--- +- true +... assert(stream.space.test ~= nil) - | --- - | - true - | ... +--- +- true +... test_run:switch("test") - | --- - | - true - | ... +--- +- true +... s:drop() - | --- - | ... +--- +... test_run:switch("default") - | --- - | - true - | ... +--- +- true +... conn:reload_schema() - | --- - | ... +--- +... assert(not conn.space.test) - | --- - | - true - | ... +--- +- true +... assert(not stream.space.test) - | --- - | - true - | ... +--- +- true +... test_run:cmd("stop server test") - | --- - | - true - | ... - +--- +- true +... -- All test works with iproto_thread count = 10 - test_run:cmd("start server test with args='10'") - | --- - | - true - | ... +--- +- true +... test_run:switch('test') - | --- - | - true - | ... +--- +- true +... fiber = require('fiber') - | --- - | ... +--- +... s = box.schema.space.create('test', { engine = 'memtx' }) - | --- - | ... +--- +... _ = s:create_index('primary') - | --- - | ... +--- +... test_run:cmd("setopt delimiter ';'") - | --- - | - true - | ... +--- +- true +... function replace_with_yeild(item) fiber.sleep(0.1) return s:replace({item}) end; - | --- - | ... +--- +... test_run:cmd("setopt delimiter ''"); - | --- - | - true - | ... +--- +- true +... test_run:switch('default') - | --- - | - true - | ... - +--- +- true +... conn = net_box.connect(server_addr) - | --- - | ... +--- +... assert(conn:ping()) - | --- - | - true - | ... +--- +- true +... conn_space = conn.space.test - | --- - | ... +--- +... stream = conn:new_stream() - | --- - | ... +--- +... stream_space = stream.space.test - | --- - | ... - +--- +... -- Check that all requests in stream processed consistently futures = {} - | --- - | ... +--- +... replace_count = 3 - | --- - | ... +--- +... test_run:cmd("setopt delimiter ';'") - | --- - | - true - | ... +--- +- true +... for i = 1, replace_count do futures[string.format("replace_%d", i)] = stream_space:replace({i}, {is_async = true}) futures[string.format("select_%d", i)] = stream_space:select({}, {is_async = true}) end; - | --- - | ... +--- +... futures["replace_with_yeild_for_stream"] = stream:call("replace_with_yeild", { replace_count + 1 }, {is_async = true}); - | --- - | ... +--- +... futures["select_with_yeild_for_stream"] = stream_space:select({}, {is_async = true}); - | --- - | ... +--- +... test_run:cmd("setopt delimiter ''"); - | --- - | - true - | ... +--- +- true +... results = wait_and_return_results(futures) - | --- - | ... +--- +... -- [1] assert(results["select_1"]) - | --- - | - - [1] - | ... +--- +- - [1] +... -- [1] [2] assert(results["select_2"]) - | --- - | - - [1] - | - [2] - | ... +--- +- - [1] + - [2] +... -- [1] [2] [3] assert(results["select_3"]) - | --- - | - - [1] - | - [2] - | - [3] - | ... +--- +- - [1] + - [2] + - [3] +... -- [1] [2] [3] [4] -- Even yeild in replace function does not affect -- the order of requests execution in stream assert(results["select_with_yeild_for_stream"]) - | --- - | - - [1] - | - [2] - | - [3] - | - [4] - | ... - +--- +- - [1] + - [2] + - [3] + - [4] +... -- There is no request execution order for the connection futures = {} - | --- - | ... +--- +... test_run:cmd("setopt delimiter ';'") - | --- - | - true - | ... +--- +- true +... futures["replace_with_yeild_for_connection"] = conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true}); - | --- - | ... +--- +... futures["select_with_yeild_for_connection"] = conn_space:select({}, {is_async = true}); - | --- - | ... +--- +... test_run:cmd("setopt delimiter ''"); - | --- - | - true - | ... +--- +- true +... results = wait_and_return_results(futures) - | --- - | ... +--- +... -- [1] [2] [3] [4] -- Select will be processed earlier because of -- yeild in `replace_with_yeild` function assert(results["select_with_yeild_for_connection"]) - | --- - | - - [1] - | - [2] - | - [3] - | - [4] - | ... -test_run:switch("test") - | --- - | - true - | ... +--- +- - [1] + - [2] + - [3] + - [4] +... +test_run:switch("test") +--- +- true +... -- [1] [2] [3] [4] [5] s:select() - | --- - | - - [1] - | - [2] - | - [3] - | - [4] - | - [5] - | ... +--- +- - [1] + - [2] + - [3] + - [4] + - [5] +... errinj = box.error.injection - | --- - | ... +--- +... assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) - | --- - | - true - | ... +--- +- true +... assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) - | --- - | - true - | ... +--- +- true +... test_run:switch('default') - | --- - | - true - | ... +--- +- true +... conn:close() - | --- - | ... +--- +... test_run:wait_cond(function () return get_current_connection_count() == 0 end) - | --- - | - true - | ... - +--- +- true +... -- Check that all request will be processed -- after connection close. conn = net_box.connect(server_addr) - | --- - | ... +--- +... stream = conn:new_stream() - | --- - | ... +--- +... space = stream.space.test - | --- - | ... +--- +... test_run:cmd("setopt delimiter ';'") - | --- - | - true - | ... +--- +- true +... replace_count = 20 for i = 1, replace_count do space:replace({i}, {is_async = true}) end; - | --- - | ... +--- +... test_run:cmd("setopt delimiter ''"); - | --- - | - true - | ... +--- +- true +... -- Give time to send fiber.sleep(0) - | --- - | ... +--- +... conn:close() - | --- - | ... +--- +... test_run:wait_cond(function () return get_current_connection_count() == 0 end) - | --- - | - true - | ... +--- +- true +... test_run:switch("test") - | --- - | - true - | ... +--- +- true +... -- select return tuples from [1] to [20] -- because all messages processed after -- connection closed s:select{} - | --- - | - - [1] - | - [2] - | - [3] - | - [4] - | - [5] - | - [6] - | - [7] - | - [8] - | - [9] - | - [10] - | - [11] - | - [12] - | - [13] - | - [14] - | - [15] - | - [16] - | - [17] - | - [18] - | - [19] - | - [20] - | ... +--- +- - [1] + - [2] + - [3] + - [4] + - [5] + - [6] + - [7] + - [8] + - [9] + - [10] + - [11] + - [12] + - [13] + - [14] + - [15] + - [16] + - [17] + - [18] + - [19] + - [20] +... +s:drop() +--- +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Second argument (false is a value for memtx_use_mvcc_engine option) +-- Server start without active transaction manager, so all transaction +-- fails because of yeild! +test_run:cmd("start server test with args='10, false'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s = box.schema.space.create('test', { engine = 'memtx' }) +--- +... +_ = s:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream = conn:new_stream() +--- +... +space = stream.space.test +--- +... +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = false +stream:begin() +--- +... +test_run:switch('test') +--- +- true +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1) +--- +- true +... +test_run:switch('default') +--- +- true +... +space:replace({1}) +--- +- [1] +... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space:select{} +--- +- [] +... +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:commit() +--- +- error: Transaction has been aborted by a fiber yield +... +-- Select is empty, transaction was aborted +space:select{} +--- +- [] +... +-- Check that after failed transaction commit we able to start next +-- transaction (it's strange check, but it's necessary because it was +-- bug with it) +stream:begin() +--- +... +stream:ping() +--- +- true +... +stream:commit() +--- +... +-- Same checks for `call` end `eval` functions. +stream:call('box.begin') +--- +... +stream:call('s:replace', {{1}}) +--- +- [1] +... +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +--- +- [] +... +stream:call('s:select', {}) +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:eval('box.commit()') +--- +- error: Transaction has been aborted by a fiber yield +... +-- Select is empty, transaction was aborted +space:select{} +--- +- [] +... +-- Same checks for `execute` function which can also +-- begin and commit transaction. +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +stream:call('s:replace', {{1}}) +--- +- [1] +... +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +--- +- [] +... +stream:call('s:select', {}) +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:execute('COMMIT') +--- +- error: Transaction has been aborted by a fiber yield +... +-- Select is empty, transaction was aborted +space:select{} +--- +- [] +... +test_run:switch('test') +--- +- true +... s:drop() - | --- - | ... +--- +... +-- Check that there are no streams and messages, which +-- was not deleted errinj = box.error.injection - | --- - | ... +--- +... assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) - | --- - | - true - | ... +--- +- true +... assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) - | --- - | - true - | ... +--- +- true +... +test_run:switch('default') +--- +- true +... +stream:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Next we check transactions only for memtx with +-- memtx_use_mvcc_engine = true and for vinyl, because +-- if memtx_use_mvcc_engine = false all transactions fails, +-- as we can see before! +-- Second argument (true is a value for memtx_use_mvcc_engine option) +-- Same test case as previous but server start with active transaction +-- manager. Also check vinyl, because it's behaviour is same. +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s1:create_index('primary') +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +-- Spaces getting from connection, not from stream has no stream_id +-- and not belongs to stream +space_1_no_stream = conn.space.test_1 +--- +... +space_2_no_stream = conn.space.test_2 +--- +... +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = true and to vinyl: +-- behaviour is same! +stream_1:begin() +--- +... +space_1:replace({1}) +--- +- [1] +... +stream_2:begin() +--- +... +space_2:replace({1}) +--- +- [1] +... +test_run:switch('test') +--- +- true +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2) +--- +- true +... +test_run:switch('default') +--- +- true +... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_1_no_stream:select{} +--- +- [] +... +space_2_no_stream:select{} +--- +- [] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +--- +- - [1] +... +space_2:select({}) +--- +- - [1] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +-- Commit was successful, transaction can yeild with +-- memtx_use_mvcc_engine = true. Vinyl transactions +-- can yeild also. +stream_1:commit() +--- +... +stream_2:commit() +--- +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... test_run:switch("default") - | --- - | - true - | ... +--- +- true +... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_1:select{} +--- +- - [1] +... +space_2:select{} +--- +- - [1] +... +test_run:switch("test") +--- +- true +... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() +--- +- - [1] +... +s2:select() +--- +- - [1] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... test_run:cmd("stop server test") - | --- - | - true - | ... +--- +- true +... +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1_1 = stream_1.space.test_1 +--- +... +space_1_2 = stream_2.space.test_1 +--- +... +space_2_1 = stream_1.space.test_2 +--- +... +space_2_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +-- Simple read/write conflict. +space_1_1:select({1}) +--- +- [] +... +space_1_2:select({1}) +--- +- [] +... +space_1_1:replace({1, 1}) +--- +- [1, 1] +... +space_1_2:replace({1, 2}) +--- +- [1, 2] +... +stream_1:commit() +--- +... +-- This transaction fails, because of conflict +stream_2:commit() +--- +- error: Transaction has been aborted by conflict +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +-- Here we must accept [1, 1] +space_1_1:select({}) +--- +- - [1, 1] +... +space_1_2:select({}) +--- +- - [1, 1] +... +-- Same test for vinyl sapce +stream_1:begin() +--- +... +stream_2:begin() +--- +... +space_2_1:select({1}) +--- +- [] +... +space_2_2:select({1}) +--- +- [] +... +space_2_1:replace({1, 1}) +--- +- [1, 1] +... +space_2_2:replace({1, 2}) +--- +- [1, 2] +... +stream_1:commit() +--- +... +-- This transaction fails, because of conflict +stream_2:commit() +--- +- error: Transaction has been aborted by conflict +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +-- Here we must accept [1, 1] +space_2_1:select({}) +--- +- - [1, 1] +... +space_2_2:select({}) +--- +- - [1, 1] +... +test_run:switch('test') +--- +- true +... +-- Both select return tuple [1, 1], transaction commited +s1:select() +--- +- - [1, 1] +... +s2:select() +--- +- - [1, 1] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check rollback as a command for memtx and vinyl spaces +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +-- Test rollback for memtx space +space_1:replace({1}) +--- +- [1] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +--- +- - [1] +... +stream_1:rollback() +--- +... +-- Select is empty, transaction rollback +space_1:select({}) +--- +- [] +... +-- Test rollback for vinyl space +space_2:replace({1}) +--- +- [1] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_2:select({}) +--- +- - [1] +... +stream_2:rollback() +--- +... +-- Select is empty, transaction rollback +space_2:select({}) +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after rollback +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +-- This is simple test is necessary because i have a bug +-- with halting stream after rollback +stream_1:begin() +--- +... +stream_1:commit() +--- +... +stream_2:begin() +--- +... +stream_2:commit() +--- +... +conn:close() +--- +... +test_run:switch('test') +--- +- true +... +-- Both select are empty, because transaction rollback +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check rollback on disconnect +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +space_1:replace({1}) +--- +- [1] +... +space_1:replace({2}) +--- +- [2] +... +-- Select return two previously inserted tuples +space_1:select({}) +--- +- - [1] + - [2] +... +space_2:replace({1}) +--- +- [1] +... +space_2:replace({2}) +--- +- [2] +... +-- Select return two previously inserted tuples +space_2:select({}) +--- +- - [1] + - [2] +... +conn:close() +--- +... +test_run:switch("test") +--- +- true +... +-- Empty selects, transaction was rollback +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +-- Reconnect +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +--- +... +stream_2:begin() +--- +... +-- Two empty selects +space_1:select({}) +--- +- [] +... +space_2:select({}) +--- +- [] +... +stream_1:commit() +--- +... +stream_2:commit() +--- +... +test_run:switch('test') +--- +- true +... +-- Both select are empty, because transaction rollback +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check rollback on disconnect with big count of async requests +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +space_1:replace({1}) +--- +- [1] +... +space_1:replace({2}) +--- +- [2] +... +-- Select return two previously inserted tuples +space_1:select({}) +--- +- - [1] + - [2] +... +space_2:replace({1}) +--- +- [1] +... +space_2:replace({2}) +--- +- [2] +... +-- Select return two previously inserted tuples +space_2:select({}) +--- +- - [1] + - [2] +... +-- We send a large number of asynchronous requests, +-- their result is not important to us, it is important +-- that they will be in the stream queue at the time of +-- the disconnect. +test_run:cmd("setopt delimiter ';'") +--- +- true +... +for i = 1, 1000 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +fiber.sleep(0) +--- +... +conn:close() +--- +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +--- +- true +... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +--- +- true +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Select was empty, transaction rollbacked +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +test_run:switch("default") +--- +- true +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +-- Same test, but now we check that if `commit` was received +-- by server before connection closed, we processed it successful. +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +-- Here, for a large number of messages, we cannot guarantee their processing, +-- since if the net_msg_max limit is reached, we will stop processing incoming +-- requests, and after close, we will discard all raw data. '100' is the number +-- of messages that we can process without reaching net_msg_max. We will not try +-- any more, so as not to make a test flaky. +for i = 1, 100 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +_ = stream_1:commit({is_async = true}) +--- +... +_ = stream_2:commit({is_async = true}) +--- +... +fiber.sleep(0) +--- +... +conn:close() +--- +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +--- +- true +... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +--- +- true +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Select return tuples from [1] to [100], +-- transaction was commit +rc1 = s1:select() +--- +... +rc2 = s2:select() +--- +... +assert(#rc1) +--- +- 100 +... +assert(#rc2) +--- +- 100 +... +s1:truncate() +--- +... +s2:truncate() +--- +... +test_run:switch("default") +--- +- true +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +-- Reconnect +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +--- +... +stream_2:begin() +--- +... +-- Two empty selects +space_1:select({}) +--- +- [] +... +space_2:select({}) +--- +- [] +... +stream_1:commit() +--- +... +stream_2:commit() +--- +... +test_run:switch('test') +--- +- true +... +-- Both select are empty, because transaction rollback +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check that all requests between `begin` and `commit` +-- have correct lsn and tsn values. During my work on the +-- patch, i see that all requests in stream comes with +-- header->is_commit == true, so if we are in transaction +-- in stream we should set this value to false, otherwise +-- during recovering `wal_stream_apply_dml_row` fails, because +-- of LSN/TSN mismatch. Here is a special test case for it. +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'memtx' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +space_1:replace({1}) +--- +- [1] +... +space_1:replace({2}) +--- +- [2] +... +space_2:replace({1}) +--- +- [1] +... +space_2:replace({2}) +--- +- [2] +... +stream_1:commit() +--- +... +stream_2:commit() +--- +... +test_run:switch('test') +--- +- true +... +-- Here we get two tuples, commit was successful +s1:select{} +--- +- - [1] + - [2] +... +-- Here we get two tuples, commit was successful +s2:select{} +--- +- - [1] + - [2] +... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +test_run:cmd("start server test with args='1, true'") +--- +- true +... +test_run:switch('test') +--- +- true +... +-- Here we get two tuples, commit was successful +box.space.test_1:select{} +--- +- - [1] + - [2] +... +-- Here we get two tuples, commit was successful +box.space.test_2:select{} +--- +- - [1] + - [2] +... +box.space.test_1:drop() +--- +... +box.space.test_2:drop() +--- +... +test_run:switch('default') +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Same transactions checks for async mode +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream_1 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +stream_2 = conn:new_stream() +--- +... +space_2 = stream_2.space.test_2 +--- +... +memtx_futures = {} +--- +... +memtx_futures["begin"] = stream_1:begin({is_async = true}) +--- +... +memtx_futures["replace"] = space_1:replace({1}, {is_async = true}) +--- +... +memtx_futures["insert"] = space_1:insert({2}, {is_async = true}) +--- +... +memtx_futures["select"] = space_1:select({}, {is_async = true}) +--- +... +vinyl_futures = {} +--- +... +vinyl_futures["begin"] = stream_2:begin({is_async = true}) +--- +... +vinyl_futures["replace"] = space_2:replace({1}, {is_async = true}) +--- +... +vinyl_futures["insert"] = space_2:insert({2}, {is_async = true}) +--- +... +vinyl_futures["select"] = space_2:select({}, {is_async = true}) +--- +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +memtx_futures["commit"] = stream_1:commit({is_async = true}) +--- +... +vinyl_futures["commit"] = stream_2:commit({is_async = true}) +--- +... +memtx_results = wait_and_return_results(memtx_futures) +--- +... +vinyl_results = wait_and_return_results(vinyl_futures) +--- +... +-- If begin was successful it return nil +assert(not memtx_results["begin"]) +--- +- true +... +assert(not vinyl_results["begin"]) +--- +- true +... +-- [1] +assert(memtx_results["replace"]) +--- +- [1] +... +assert(vinyl_results["replace"]) +--- +- [1] +... +-- [2] +assert(memtx_results["insert"]) +--- +- [2] +... +assert(vinyl_results["insert"]) +--- +- [2] +... +-- [1] [2] +assert(memtx_results["select"]) +--- +- - [1] + - [2] +... +assert(vinyl_results["select"]) +--- +- - [1] + - [2] +... +-- If commit was successful it return nil +assert(not memtx_results["commit"]) +--- +- true +... +assert(not vinyl_results["commit"]) +--- +- true +... +test_run:switch("test") +--- +- true +... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() +--- +- - [1] + - [2] +... +s2:select() +--- +- - [1] + - [2] +... +s1:drop() +--- +... +s2:drop() +--- +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1_1 = stream_1.space.test_1 +--- +... +space_1_2 = stream_2.space.test_1 +--- +... +space_2_1 = stream_1.space.test_2 +--- +... +space_2_2 = stream_2.space.test_2 +--- +... +futures_1 = {} +--- +... +-- Simple read/write conflict. +futures_1["begin_1"] = stream_1:begin({is_async = true}) +--- +... +futures_1["begin_2"] = stream_2:begin({is_async = true}) +--- +... +futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true}) +--- +... +futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true}) +--- +... +futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true}) +--- +... +futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true}) +--- +... +futures_1["commit_1"] = stream_1:commit({is_async = true}) +--- +... +futures_1["commit_2"] = stream_2:commit({is_async = true}) +--- +... +futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true}) +--- +... +futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true}) +--- +... +results_1 = wait_and_return_results(futures_1) +--- +... +-- Successful begin return nil +assert(not results_1["begin_1"]) +--- +- true +... +assert(not results_1["begin_2"]) +--- +- true +... +-- [] +assert(not results_1["select_1_1"][1]) +--- +- true +... +assert(not results_1["select_1_2"][1]) +--- +- true +... +-- [1] +assert(results_1["replace_1_1"][1]) +--- +- 1 +... +-- [1] +assert(results_1["replace_1_1"][2]) +--- +- 1 +... +-- [1] +assert(results_1["replace_1_2"][1]) +--- +- 1 +... +-- [2] +assert(results_1["replace_1_2"][2]) +--- +- 2 +... +-- Successful commit return nil +assert(not results_1["commit_1"]) +--- +- true +... +-- Error because of transaction conflict +assert(results_1["commit_2"]) +--- +- Transaction has been aborted by conflict +... +-- [1, 1] +assert(results_1["select_1_1_A"][1]) +--- +- [1, 1] +... +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_1_2_A"][1] +futures_2 = {} +--- +... +-- Simple read/write conflict. +futures_2["begin_1"] = stream_1:begin({is_async = true}) +--- +... +futures_2["begin_2"] = stream_2:begin({is_async = true}) +--- +... +futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true}) +--- +... +futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true}) +--- +... +futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true}) +--- +... +futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true}) +--- +... +futures_2["commit_1"] = stream_1:commit({is_async = true}) +--- +... +futures_2["commit_2"] = stream_2:commit({is_async = true}) +--- +... +futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true}) +--- +... +futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true}) +--- +... +results_2 = wait_and_return_results(futures_2) +--- +... +-- Successful begin return nil +assert(not results_2["begin_1"]) +--- +- true +... +assert(not results_2["begin_2"]) +--- +- true +... +-- [] +assert(not results_2["select_2_1"][1]) +--- +- true +... +assert(not results_2["select_2_2"][1]) +--- +- true +... +-- [1] +assert(results_2["replace_2_1"][1]) +--- +- 1 +... +-- [1] +assert(results_2["replace_2_1"][2]) +--- +- 1 +... +-- [1] +assert(results_2["replace_2_2"][1]) +--- +- 1 +... +-- [2] +assert(results_2["replace_2_2"][2]) +--- +- 2 +... +-- Successful commit return nil +assert(not results_2["commit_1"]) +--- +- true +... +-- Error because of transaction conflict +assert(results_2["commit_2"]) +--- +- Transaction has been aborted by conflict +... +-- [1, 1] +assert(results_2["select_2_1_A"][1]) +--- +- [1, 1] +... +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_2_2_A"][1] +test_run:switch('test') +--- +- true +... +-- Both select return tuple [1, 1], transaction commited +s1:select() +--- +- - [1, 1] +... +s2:select() +--- +- - [1, 1] +... +s1:drop() +--- +... +s2:drop() +--- +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Checks for iproto call/eval/execute in stream +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s = box.schema.space.create('test', { engine = 'memtx' }) +--- +... +_ = s:create_index('primary') +--- +... +function ping() return "pong" end +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream = conn:new_stream() +--- +... +space = stream.space.test +--- +... +space_no_stream = conn.space.test +--- +... +-- successful begin using stream:call +stream:call('box.begin') +--- +... +-- error: Operation is not permitted when there is an active transaction +stream:eval('box.begin()') +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +-- error: Operation is not permitted when there is an active transaction +stream:begin() +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +-- error: Operation is not permitted when there is an active transaction +stream:execute('START TRANSACTION') +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +stream:call('ping') +--- +- pong +... +stream:eval('ping()') +--- +... +-- error: Operation is not permitted when there is an active transaction +stream:call('box.begin') +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +stream:eval('box.begin()') +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +-- successful commit using stream:call +stream:call('box.commit') +--- +... +-- successful begin using stream:eval +stream:eval('box.begin()') +--- +... +space:replace({1}) +--- +- [1] +... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} +--- +- [] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +--- +- - [1] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +--Successful commit using stream:execute +stream:execute('COMMIT') +--- +- row_count: 0 +... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_no_stream:select{} +--- +- - [1] +... +test_run:switch("test") +--- +- true +... +-- Select return tuple, because transaction was successful +s:select() +--- +- - [1] +... +s:delete{1} +--- +- [1] +... +test_run:switch('default') +--- +- true +... +-- Check rollback using stream:call +stream:begin() +--- +... +space:replace({2}) +--- +- [2] +... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} +--- +- [] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +--- +- - [2] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +--Successful rollback using stream:call +stream:call('box.rollback') +--- +... +-- Empty selects transaction rollbacked +space:select({}) +--- +- [] +... +space_no_stream:select{} +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Empty select transaction rollbacked +s:select() +--- +- [] +... +s:drop() +--- +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Simple test which demostrates that stream immediately +-- destroyed, when no processing messages in stream and +-- no active transaction. +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s = box.schema.space.create('test', { engine = 'memtx' }) +--- +... +_ = s:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream = conn:new_stream() +--- +... +space = stream.space.test +--- +... +for i = 1, 10 do space:replace{i} end +--- +... +test_run:switch("test") +--- +- true +... +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +s:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Transaction tests for sql iproto requests. +-- All this functions are copy-paste from sql/ddl.test.lua, +-- except that they check sql transactions in streams +test_run:cmd("setopt delimiter '$'") +--- +- true +... +function execute_sql_string(stream, sql_string) + if stream then + stream:execute(sql_string) + else + box.execute(sql_string) + end +end$ +--- +... +function execute_sql_string_and_return_result(stream, sql_string) + if stream then + return pcall(stream.execute, stream, sql_string) + else + return box.execute(sql_string) + end +end$ +--- +... +function monster_ddl(stream) + local _, err1, err2, err3, err4, err5, err6 + local stream_or_box = stream or box + execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER);]]) + execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER UNIQUE, + CONSTRAINT ck1 + CHECK(b < 100));]]) + + execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);') + execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);') + + execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY + KEY, a INTEGER);]]) + + execute_sql_string(stream, 'DROP INDEX t2a ON t2;') + + execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1 + CHECK(b > 0);]]) + _, err1 = + execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename + RENAME TO t1;]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT + ck2 CHECK(a > 0);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + + _, err2 = + execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id + INTEGER PRIMARY KEY);]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + + execute_sql_string(stream, [[CREATE TABLE + trigger_catcher(id INTEGER PRIMARY + KEY AUTOINCREMENT);]]) + + execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;') + + execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;') + + execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON + t1 FOR EACH ROW + BEGIN + INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;') + + execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON + t2 FOR EACH ROW + BEGIN + INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err4 = + execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a + ON t1(a, b);]]) + + execute_sql_string(stream, 'TRUNCATE TABLE t1;') + _, err5 = + execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;') + _, err6 = + execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE + t_does_not_exist;]]) + + execute_sql_string(stream, 'DROP TRIGGER t2t;') + + return {'Finished ok, errors in the middle: ', err1, err2, err3, err4, + err5, err6} +end$ +--- +... +function monster_ddl_cmp_res(res1, res2) + if json.encode(res1) == json.encode(res2) then + return true + end + return res1, res2 +end$ +--- +... +function monster_ddl_is_clean(stream) + local stream_or_box = stream or box + assert(stream_or_box.space.T1 == nil) + assert(stream_or_box.space.T2 == nil) + assert(stream_or_box.space._trigger:count() == 0) + assert(stream_or_box.space._fk_constraint:count() == 0) + assert(stream_or_box.space._ck_constraint:count() == 0) + assert(stream_or_box.space.T_RENAMED == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) +end$ +--- +... +function monster_ddl_check(stream) + local _, err1, err2, err3, err4, res + local stream_or_box = stream or box + _, err1 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES (1, 1, 101)]]) + execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)') + _, err2 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES(2, 2, 1)]]) + _, err3 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, 20, 1)]]) + _, err4 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, -1, 1)]]) + execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)') + if not stream then + assert(stream_or_box.space.T_RENAMED ~= nil) + assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) + res = execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + else + _, res = + execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + end + return {'Finished ok, errors and trigger catcher content: ', err1, err2, + err3, err4, res} +end$ +--- +... +function monster_ddl_clear(stream) + execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;') + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t2') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t1') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed') +end$ +--- +... +test_run:cmd("setopt delimiter ''")$ +--- +- true +... +test_run:cmd("start server test with args='10, true'") +--- +- true +... +test_run:switch('test') +--- +- true +... +test_run:cmd("setopt delimiter '$'") +--- +- true +... +function monster_ddl_is_clean() + if not (box.space.T1 == nil) or + not (box.space.T2 == nil) or + not (box.space._trigger:count() == 0) or + not (box.space._fk_constraint:count() == 0) or + not (box.space._ck_constraint:count() == 0) or + not (box.space.T_RENAMED == nil) or + not (box.space.T_TO_RENAME == nil) then + return false + end + return true +end$ +--- +... +test_run:cmd("setopt delimiter ''")$ +--- +- true +... +test_run:switch('default') +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +conn = net_box.connect(server_addr) +--- +... +stream = conn:new_stream() +--- +... +-- No txn. +true_ddl_res = monster_ddl() +--- +... +true_ddl_res +--- +- - 'Finished ok, errors in the middle: ' + - Space 'T1' already exists + - Space 'T1' already exists + - Space 'T3' does not exist + - Index 'T1A' already exists in space 'T1' + - 'Failed to execute SQL statement: can not truncate space ''T2'' because other + objects depend on it' + - Space 'T_DOES_NOT_EXIST' does not exist +... +true_check_res = monster_ddl_check() +--- +... +true_check_res +--- +- - 'Finished ok, errors and trigger catcher content: ' + - 'Check constraint failed ''CK1'': b < 100' + - Duplicate key exists in unique index "unique_unnamed_T2_2" in space "T2" with + old tuple - [1, 1, 1] and new tuple - [2, 2, 1] + - 'Failed to execute SQL statement: FOREIGN KEY constraint failed' + - 'Check constraint failed ''CK2'': a > 0' + - metadata: + - name: ID + type: integer + rows: + - [1] +... +monster_ddl_clear() +--- +... +monster_ddl_is_clean() +--- +... +-- Both DDL and cleanup in one txn in stream. +ddl_res = nil +--- +... +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +ddl_res = monster_ddl(stream) +--- +... +monster_ddl_clear(stream) +--- +... +stream:call('monster_ddl_is_clean') +--- +- true +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +monster_ddl_cmp_res(ddl_res, true_ddl_res) +--- +- true +... +-- DDL in txn, cleanup is not. +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +ddl_res = monster_ddl(stream) +--- +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +monster_ddl_cmp_res(ddl_res, true_ddl_res) +--- +- true +... +check_res = monster_ddl_check(stream) +--- +... +monster_ddl_cmp_res(check_res, true_check_res) +--- +- true +... +monster_ddl_clear(stream) +--- +... +stream:call('monster_ddl_is_clean') +--- +- true +... +-- DDL is not in txn, cleanup is. +ddl_res = monster_ddl(stream) +--- +... +monster_ddl_cmp_res(ddl_res, true_ddl_res) +--- +- true +... +check_res = monster_ddl_check(stream) +--- +... +monster_ddl_cmp_res(check_res, true_check_res) +--- +- true +... +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +monster_ddl_clear(stream) +--- +... +stream:call('monster_ddl_is_clean') +--- +- true +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +-- DDL and cleanup in separate txns. +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +ddl_res = monster_ddl(stream) +--- +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +monster_ddl_cmp_res(ddl_res, true_ddl_res) +--- +- true +... +check_res = monster_ddl_check(stream) +--- +... +monster_ddl_cmp_res(check_res, true_check_res) +--- +- true +... +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +monster_ddl_clear(stream) +--- +... +stream:call('monster_ddl_is_clean') +--- +- true +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +test_run:switch("test") +--- +- true +... +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +-- Check for prepare and unprepare functions +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream = conn:new_stream() +--- +... +stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)') +--- +- row_count: 1 +... +-- reload schema +stream:ping() +--- +- true +... +space = stream.space.TEST +--- +... +assert(space ~= nil) +--- +- true +... +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +space:replace{1, 2, '3'} +--- +- [1, 2, '3'] +... +space:select() +--- +- - [1, 2, '3'] +... +-- select is empty, because transaction was not commited +conn.space.TEST:select() +--- +- [] +... +stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") +--- +... +conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") +--- +... +assert(stream_pr.stmt_id == conn_pr.stmt_id) +--- +- true +... +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) +--- +- metadata: + - name: ID + type: integer + - name: A + type: number + - name: B + type: string + rows: + - [1, 2, '3'] +... +-- empty select, transaction was not commited +conn:execute(conn_pr.stmt_id, {1, 2}) +--- +- metadata: + - name: ID + type: integer + - name: A + type: number + - name: B + type: string + rows: [] +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) +--- +- metadata: + - name: ID + type: integer + - name: A + type: number + - name: B + type: string + rows: + - [1, 2, '3'] +... +-- [ 1, 2, '3' ] +conn:execute(conn_pr.stmt_id, {1, 2}) +--- +- metadata: + - name: ID + type: integer + - name: A + type: number + - name: B + type: string + rows: + - [1, 2, '3'] +... +stream:unprepare(stream_pr.stmt_id) +--- +- null +... +conn:close() +--- +... +test_run:switch('test') +--- +- true +... +-- [ 1, 2, '3' ] +box.space.TEST:select() +--- +- - [1, 2, '3'] +... +box.space.TEST:drop() +--- +... +test_run:switch('default') +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... test_run:cmd("cleanup server test") - | --- - | - true - | ... +--- +- true +... test_run:cmd("delete server test") - | --- - | - true - | ... +--- +- true +... diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua index 72129a228..f99c16c0d 100644 --- a/test/box/stream.test.lua +++ b/test/box/stream.test.lua @@ -1,6 +1,8 @@ -- This test checks streams iplementation in iproto (gh-5860). net_box = require('net.box') +json = require('json') fiber = require('fiber') +msgpack = require('msgpack') test_run = require('test_run').new() test_run:cmd("create server test with script='box/stream.lua'") @@ -45,6 +47,63 @@ stream = conn:new_stream() -- The new method `new_stream`, for the stream object, returns a new -- stream object, just as in the case of connection. _ = stream:new_stream() + +-- Simple checks for transactions +conn_1 = net_box.connect(server_addr) +conn_2 = net_box.connect(server_addr) +stream_1_1 = conn_1:new_stream() +stream_1_2 = conn_1:new_stream() +stream_2 = conn_2:new_stream() +-- It's ok to commit or rollback without any active transaction +stream_1_1:commit() +stream_1_1:rollback() + +stream_1_1:begin() +-- Error unable to start second transaction in one stream +stream_1_1:begin() +-- It's ok to start transaction in separate stream in one connection +stream_1_2:begin() +-- It's ok to start transaction in separate stream in other connection +stream_2:begin() +test_run:switch("test") +-- It's ok to start local transaction separately with active stream +-- transactions +box.begin() +box.commit() +test_run:switch("default") +stream_1_1:commit() +stream_1_2:commit() +stream_2:commit() + +-- Check unsupported requests +conn = net_box.connect(server_addr) +assert(conn:ping()) +-- Begin, commit and rollback supported only for streams +conn:_request(net_box._method.begin, nil, nil, nil) +conn:_request(net_box._method.commit, nil, nil, nil) +conn:_request(net_box._method.rollback, nil, nil, nil) +-- Not all requests supported by stream. +stream = conn:new_stream() +-- Start transaction to allocate stream object on the +-- server side +stream:begin() +IPROTO_REQUEST_TYPE = 0x00 +IPROTO_SYNC = 0x01 +IPROTO_AUTH = 7 +IPROTO_STREAM_ID = 0x0a +next_request_id = 9 +test_run:cmd("setopt delimiter ';'") +header = msgpack.encode({ + [IPROTO_REQUEST_TYPE] = IPROTO_AUTH, + [IPROTO_SYNC] = next_request_id, + [IPROTO_STREAM_ID] = 1, +}); +body = msgpack.encode({nil}); +size = msgpack.encode(header:len() + body:len()); +conn._transport.perform_request(nil, nil, false, net_box._method.inject, + nil, nil, nil, nil, + size .. header .. body); +test_run:cmd("setopt delimiter ''"); conn:close() -- Check that spaces in stream object updates, during reload_schema @@ -178,5 +237,1148 @@ assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) test_run:switch("default") test_run:cmd("stop server test") +-- Second argument (false is a value for memtx_use_mvcc_engine option) +-- Server start without active transaction manager, so all transaction +-- fails because of yeild! +test_run:cmd("start server test with args='10, false'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() +space = stream.space.test + +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = false +stream:begin() +test_run:switch('test') +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1) +test_run:switch('default') +space:replace({1}) +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space:select{} +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:commit() +-- Select is empty, transaction was aborted +space:select{} +-- Check that after failed transaction commit we able to start next +-- transaction (it's strange check, but it's necessary because it was +-- bug with it) +stream:begin() +stream:ping() +stream:commit() +-- Same checks for `call` end `eval` functions. +stream:call('box.begin') +stream:call('s:replace', {{1}}) +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +stream:call('s:select', {}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:eval('box.commit()') +-- Select is empty, transaction was aborted +space:select{} + +-- Same checks for `execute` function which can also +-- begin and commit transaction. +stream:execute('START TRANSACTION') +stream:call('s:replace', {{1}}) +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +stream:call('s:select', {}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:execute('COMMIT') +-- Select is empty, transaction was aborted +space:select{} + +test_run:switch('test') +s:drop() +-- Check that there are no streams and messages, which +-- was not deleted +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +stream:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Next we check transactions only for memtx with +-- memtx_use_mvcc_engine = true and for vinyl, because +-- if memtx_use_mvcc_engine = false all transactions fails, +-- as we can see before! + +-- Second argument (true is a value for memtx_use_mvcc_engine option) +-- Same test case as previous but server start with active transaction +-- manager. Also check vinyl, because it's behaviour is same. +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s1:create_index('primary') +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +-- Spaces getting from connection, not from stream has no stream_id +-- and not belongs to stream +space_1_no_stream = conn.space.test_1 +space_2_no_stream = conn.space.test_2 +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = true and to vinyl: +-- behaviour is same! +stream_1:begin() +space_1:replace({1}) +stream_2:begin() +space_2:replace({1}) +test_run:switch('test') +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2) +test_run:switch('default') +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_1_no_stream:select{} +space_2_no_stream:select{} +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +space_2:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s1:select() +s2:select() +test_run:switch('default') +-- Commit was successful, transaction can yeild with +-- memtx_use_mvcc_engine = true. Vinyl transactions +-- can yeild also. +stream_1:commit() +stream_2:commit() +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") + +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_1:select{} +space_2:select{} +test_run:switch("test") +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1_1 = stream_1.space.test_1 +space_1_2 = stream_2.space.test_1 +space_2_1 = stream_1.space.test_2 +space_2_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +-- Simple read/write conflict. +space_1_1:select({1}) +space_1_2:select({1}) +space_1_1:replace({1, 1}) +space_1_2:replace({1, 2}) +stream_1:commit() +-- This transaction fails, because of conflict +stream_2:commit() +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +-- Here we must accept [1, 1] +space_1_1:select({}) +space_1_2:select({}) + +-- Same test for vinyl sapce +stream_1:begin() +stream_2:begin() +space_2_1:select({1}) +space_2_2:select({1}) +space_2_1:replace({1, 1}) +space_2_2:replace({1, 2}) +stream_1:commit() +-- This transaction fails, because of conflict +stream_2:commit() +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +-- Here we must accept [1, 1] +space_2_1:select({}) +space_2_2:select({}) + +test_run:switch('test') +-- Both select return tuple [1, 1], transaction commited +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check rollback as a command for memtx and vinyl spaces +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +-- Test rollback for memtx space +space_1:replace({1}) +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +stream_1:rollback() +-- Select is empty, transaction rollback +space_1:select({}) + +-- Test rollback for vinyl space +space_2:replace({1}) +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_2:select({}) +stream_2:rollback() +-- Select is empty, transaction rollback +space_2:select({}) + +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after rollback +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") + +-- This is simple test is necessary because i have a bug +-- with halting stream after rollback +stream_1:begin() +stream_1:commit() +stream_2:begin() +stream_2:commit() +conn:close() + +test_run:switch('test') +-- Both select are empty, because transaction rollback +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check rollback on disconnect +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +space_1:replace({1}) +space_1:replace({2}) +-- Select return two previously inserted tuples +space_1:select({}) + +space_2:replace({1}) +space_2:replace({2}) +-- Select return two previously inserted tuples +space_2:select({}) +conn:close() + +test_run:switch("test") +-- Empty selects, transaction was rollback +s1:select() +s2:select() +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Reconnect +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +stream_2:begin() +-- Two empty selects +space_1:select({}) +space_2:select({}) +stream_1:commit() +stream_2:commit() + +test_run:switch('test') +-- Both select are empty, because transaction rollback +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check rollback on disconnect with big count of async requests +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +space_1:replace({1}) +space_1:replace({2}) +-- Select return two previously inserted tuples +space_1:select({}) + +space_2:replace({1}) +space_2:replace({2}) +-- Select return two previously inserted tuples +space_2:select({}) +-- We send a large number of asynchronous requests, +-- their result is not important to us, it is important +-- that they will be in the stream queue at the time of +-- the disconnect. +test_run:cmd("setopt delimiter ';'") +for i = 1, 1000 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +test_run:cmd("setopt delimiter ''"); +fiber.sleep(0) +conn:close() + +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +test_run:cmd("setopt delimiter ';'") +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +test_run:cmd("setopt delimiter ''"); +-- Select was empty, transaction rollbacked +s1:select() +s2:select() +test_run:switch("default") +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Same test, but now we check that if `commit` was received +-- by server before connection closed, we processed it successful. +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() +test_run:cmd("setopt delimiter ';'") +-- Here, for a large number of messages, we cannot guarantee their processing, +-- since if the net_msg_max limit is reached, we will stop processing incoming +-- requests, and after close, we will discard all raw data. '100' is the number +-- of messages that we can process without reaching net_msg_max. We will not try +-- any more, so as not to make a test flaky. +for i = 1, 100 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +test_run:cmd("setopt delimiter ''"); +_ = stream_1:commit({is_async = true}) +_ = stream_2:commit({is_async = true}) +fiber.sleep(0) +conn:close() + +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +test_run:cmd("setopt delimiter ';'") +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +test_run:cmd("setopt delimiter ''"); +-- Select return tuples from [1] to [100], +-- transaction was commit +rc1 = s1:select() +rc2 = s2:select() +assert(#rc1) +assert(#rc2) +s1:truncate() +s2:truncate() +test_run:switch("default") +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Reconnect +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +stream_2:begin() +-- Two empty selects +space_1:select({}) +space_2:select({}) +stream_1:commit() +stream_2:commit() + +test_run:switch('test') +-- Both select are empty, because transaction rollback +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check that all requests between `begin` and `commit` +-- have correct lsn and tsn values. During my work on the +-- patch, i see that all requests in stream comes with +-- header->is_commit == true, so if we are in transaction +-- in stream we should set this value to false, otherwise +-- during recovering `wal_stream_apply_dml_row` fails, because +-- of LSN/TSN mismatch. Here is a special test case for it. +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'memtx' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 + +stream_1:begin() +stream_2:begin() +space_1:replace({1}) +space_1:replace({2}) +space_2:replace({1}) +space_2:replace({2}) +stream_1:commit() +stream_2:commit() + +test_run:switch('test') +-- Here we get two tuples, commit was successful +s1:select{} +-- Here we get two tuples, commit was successful +s2:select{} +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +test_run:cmd("start server test with args='1, true'") +test_run:switch('test') +-- Here we get two tuples, commit was successful +box.space.test_1:select{} +-- Here we get two tuples, commit was successful +box.space.test_2:select{} +box.space.test_1:drop() +box.space.test_2:drop() +test_run:switch('default') +test_run:cmd("stop server test") + +-- Same transactions checks for async mode +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream_1 = conn:new_stream() +space_1 = stream_1.space.test_1 +stream_2 = conn:new_stream() +space_2 = stream_2.space.test_2 + +memtx_futures = {} +memtx_futures["begin"] = stream_1:begin({is_async = true}) +memtx_futures["replace"] = space_1:replace({1}, {is_async = true}) +memtx_futures["insert"] = space_1:insert({2}, {is_async = true}) +memtx_futures["select"] = space_1:select({}, {is_async = true}) + +vinyl_futures = {} +vinyl_futures["begin"] = stream_2:begin({is_async = true}) +vinyl_futures["replace"] = space_2:replace({1}, {is_async = true}) +vinyl_futures["insert"] = space_2:insert({2}, {is_async = true}) +vinyl_futures["select"] = space_2:select({}, {is_async = true}) + +test_run:switch("test") +-- Select is empty, transaction was not commited +s1:select() +s2:select() +test_run:switch('default') +memtx_futures["commit"] = stream_1:commit({is_async = true}) +vinyl_futures["commit"] = stream_2:commit({is_async = true}) + +memtx_results = wait_and_return_results(memtx_futures) +vinyl_results = wait_and_return_results(vinyl_futures) +-- If begin was successful it return nil +assert(not memtx_results["begin"]) +assert(not vinyl_results["begin"]) +-- [1] +assert(memtx_results["replace"]) +assert(vinyl_results["replace"]) +-- [2] +assert(memtx_results["insert"]) +assert(vinyl_results["insert"]) +-- [1] [2] +assert(memtx_results["select"]) +assert(vinyl_results["select"]) +-- If commit was successful it return nil +assert(not memtx_results["commit"]) +assert(not vinyl_results["commit"]) + +test_run:switch("test") +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() +s2:select() +s1:drop() +s2:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1_1 = stream_1.space.test_1 +space_1_2 = stream_2.space.test_1 +space_2_1 = stream_1.space.test_2 +space_2_2 = stream_2.space.test_2 + +futures_1 = {} +-- Simple read/write conflict. +futures_1["begin_1"] = stream_1:begin({is_async = true}) +futures_1["begin_2"] = stream_2:begin({is_async = true}) +futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true}) +futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true}) +futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true}) +futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true}) +futures_1["commit_1"] = stream_1:commit({is_async = true}) +futures_1["commit_2"] = stream_2:commit({is_async = true}) +futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true}) +futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true}) + +results_1 = wait_and_return_results(futures_1) +-- Successful begin return nil +assert(not results_1["begin_1"]) +assert(not results_1["begin_2"]) +-- [] +assert(not results_1["select_1_1"][1]) +assert(not results_1["select_1_2"][1]) +-- [1] +assert(results_1["replace_1_1"][1]) +-- [1] +assert(results_1["replace_1_1"][2]) +-- [1] +assert(results_1["replace_1_2"][1]) +-- [2] +assert(results_1["replace_1_2"][2]) +-- Successful commit return nil +assert(not results_1["commit_1"]) +-- Error because of transaction conflict +assert(results_1["commit_2"]) +-- [1, 1] +assert(results_1["select_1_1_A"][1]) +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_1_2_A"][1] + +futures_2 = {} +-- Simple read/write conflict. +futures_2["begin_1"] = stream_1:begin({is_async = true}) +futures_2["begin_2"] = stream_2:begin({is_async = true}) +futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true}) +futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true}) +futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true}) +futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true}) +futures_2["commit_1"] = stream_1:commit({is_async = true}) +futures_2["commit_2"] = stream_2:commit({is_async = true}) +futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true}) +futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true}) + +results_2 = wait_and_return_results(futures_2) +-- Successful begin return nil +assert(not results_2["begin_1"]) +assert(not results_2["begin_2"]) +-- [] +assert(not results_2["select_2_1"][1]) +assert(not results_2["select_2_2"][1]) +-- [1] +assert(results_2["replace_2_1"][1]) +-- [1] +assert(results_2["replace_2_1"][2]) +-- [1] +assert(results_2["replace_2_2"][1]) +-- [2] +assert(results_2["replace_2_2"][2]) +-- Successful commit return nil +assert(not results_2["commit_1"]) +-- Error because of transaction conflict +assert(results_2["commit_2"]) +-- [1, 1] +assert(results_2["select_2_1_A"][1]) +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_2_2_A"][1] + +test_run:switch('test') +-- Both select return tuple [1, 1], transaction commited +s1:select() +s2:select() +s1:drop() +s2:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Checks for iproto call/eval/execute in stream +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +test_run:switch("test") +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +function ping() return "pong" end +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() +space = stream.space.test +space_no_stream = conn.space.test + +-- successful begin using stream:call +stream:call('box.begin') +-- error: Operation is not permitted when there is an active transaction +stream:eval('box.begin()') +-- error: Operation is not permitted when there is an active transaction +stream:begin() +-- error: Operation is not permitted when there is an active transaction +stream:execute('START TRANSACTION') +stream:call('ping') +stream:eval('ping()') +-- error: Operation is not permitted when there is an active transaction +stream:call('box.begin') +stream:eval('box.begin()') +-- successful commit using stream:call +stream:call('box.commit') + +-- successful begin using stream:eval +stream:eval('box.begin()') +space:replace({1}) +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +--Successful commit using stream:execute +stream:execute('COMMIT') +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_no_stream:select{} +test_run:switch("test") +-- Select return tuple, because transaction was successful +s:select() +s:delete{1} +test_run:switch('default') +-- Check rollback using stream:call +stream:begin() +space:replace({2}) +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +--Successful rollback using stream:call +stream:call('box.rollback') +-- Empty selects transaction rollbacked +space:select({}) +space_no_stream:select{} +test_run:switch("test") +-- Empty select transaction rollbacked +s:select() +s:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Simple test which demostrates that stream immediately +-- destroyed, when no processing messages in stream and +-- no active transaction. + +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +test_run:switch("test") +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() +space = stream.space.test +for i = 1, 10 do space:replace{i} end +test_run:switch("test") +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +s:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Transaction tests for sql iproto requests. +-- All this functions are copy-paste from sql/ddl.test.lua, +-- except that they check sql transactions in streams +test_run:cmd("setopt delimiter '$'") +function execute_sql_string(stream, sql_string) + if stream then + stream:execute(sql_string) + else + box.execute(sql_string) + end +end$ +function execute_sql_string_and_return_result(stream, sql_string) + if stream then + return pcall(stream.execute, stream, sql_string) + else + return box.execute(sql_string) + end +end$ +function monster_ddl(stream) + local _, err1, err2, err3, err4, err5, err6 + local stream_or_box = stream or box + execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER);]]) + execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER UNIQUE, + CONSTRAINT ck1 + CHECK(b < 100));]]) + + execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);') + execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);') + + execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY + KEY, a INTEGER);]]) + + execute_sql_string(stream, 'DROP INDEX t2a ON t2;') + + execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1 + CHECK(b > 0);]]) + + _, err1 = + execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename + RENAME TO t1;]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT + ck2 CHECK(a > 0);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + + _, err2 = + execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id + INTEGER PRIMARY KEY);]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + + execute_sql_string(stream, [[CREATE TABLE + trigger_catcher(id INTEGER PRIMARY + KEY AUTOINCREMENT);]]) + + execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;') + + execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;') + + execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON + t1 FOR EACH ROW + BEGIN + INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;') + + execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON + t2 FOR EACH ROW + BEGIN + INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err4 = + execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a + ON t1(a, b);]]) + + execute_sql_string(stream, 'TRUNCATE TABLE t1;') + _, err5 = + execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;') + _, err6 = + execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE + t_does_not_exist;]]) + + execute_sql_string(stream, 'DROP TRIGGER t2t;') + + return {'Finished ok, errors in the middle: ', err1, err2, err3, err4, + err5, err6} +end$ +function monster_ddl_cmp_res(res1, res2) + if json.encode(res1) == json.encode(res2) then + return true + end + return res1, res2 +end$ +function monster_ddl_is_clean(stream) + local stream_or_box = stream or box + assert(stream_or_box.space.T1 == nil) + assert(stream_or_box.space.T2 == nil) + assert(stream_or_box.space._trigger:count() == 0) + assert(stream_or_box.space._fk_constraint:count() == 0) + assert(stream_or_box.space._ck_constraint:count() == 0) + assert(stream_or_box.space.T_RENAMED == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) +end$ +function monster_ddl_check(stream) + local _, err1, err2, err3, err4, res + local stream_or_box = stream or box + _, err1 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES (1, 1, 101)]]) + execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)') + _, err2 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES(2, 2, 1)]]) + _, err3 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, 20, 1)]]) + _, err4 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, -1, 1)]]) + execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)') + if not stream then + assert(stream_or_box.space.T_RENAMED ~= nil) + assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) + res = execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + else + _, res = + execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + end + return {'Finished ok, errors and trigger catcher content: ', err1, err2, + err3, err4, res} +end$ +function monster_ddl_clear(stream) + execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;') + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t2') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t1') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed') +end$ +test_run:cmd("setopt delimiter ''")$ + +test_run:cmd("start server test with args='10, true'") +test_run:switch('test') +test_run:cmd("setopt delimiter '$'") +function monster_ddl_is_clean() + if not (box.space.T1 == nil) or + not (box.space.T2 == nil) or + not (box.space._trigger:count() == 0) or + not (box.space._fk_constraint:count() == 0) or + not (box.space._ck_constraint:count() == 0) or + not (box.space.T_RENAMED == nil) or + not (box.space.T_TO_RENAME == nil) then + return false + end + return true +end$ +test_run:cmd("setopt delimiter ''")$ +test_run:switch('default') + +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +conn = net_box.connect(server_addr) +stream = conn:new_stream() + +-- No txn. +true_ddl_res = monster_ddl() +true_ddl_res + +true_check_res = monster_ddl_check() +true_check_res + +monster_ddl_clear() +monster_ddl_is_clean() + +-- Both DDL and cleanup in one txn in stream. +ddl_res = nil +stream:execute('START TRANSACTION') +ddl_res = monster_ddl(stream) +monster_ddl_clear(stream) +stream:call('monster_ddl_is_clean') +stream:execute('COMMIT') +monster_ddl_cmp_res(ddl_res, true_ddl_res) + +-- DDL in txn, cleanup is not. +stream:execute('START TRANSACTION') +ddl_res = monster_ddl(stream) +stream:execute('COMMIT') +monster_ddl_cmp_res(ddl_res, true_ddl_res) + +check_res = monster_ddl_check(stream) +monster_ddl_cmp_res(check_res, true_check_res) + +monster_ddl_clear(stream) +stream:call('monster_ddl_is_clean') + +-- DDL is not in txn, cleanup is. +ddl_res = monster_ddl(stream) +monster_ddl_cmp_res(ddl_res, true_ddl_res) + +check_res = monster_ddl_check(stream) +monster_ddl_cmp_res(check_res, true_check_res) + +stream:execute('START TRANSACTION') +monster_ddl_clear(stream) +stream:call('monster_ddl_is_clean') +stream:execute('COMMIT') + +-- DDL and cleanup in separate txns. +stream:execute('START TRANSACTION') +ddl_res = monster_ddl(stream) +stream:execute('COMMIT') +monster_ddl_cmp_res(ddl_res, true_ddl_res) + +check_res = monster_ddl_check(stream) +monster_ddl_cmp_res(check_res, true_check_res) + +stream:execute('START TRANSACTION') +monster_ddl_clear(stream) +stream:call('monster_ddl_is_clean') +stream:execute('COMMIT') + +test_run:switch("test") +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + + +-- Check for prepare and unprepare functions +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() + +stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)') +-- reload schema +stream:ping() +space = stream.space.TEST +assert(space ~= nil) +stream:execute('START TRANSACTION') +space:replace{1, 2, '3'} +space:select() +-- select is empty, because transaction was not commited +conn.space.TEST:select() +stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") +conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") +assert(stream_pr.stmt_id == conn_pr.stmt_id) +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) +-- empty select, transaction was not commited +conn:execute(conn_pr.stmt_id, {1, 2}) +stream:execute('COMMIT') +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) +-- [ 1, 2, '3' ] +conn:execute(conn_pr.stmt_id, {1, 2}) +stream:unprepare(stream_pr.stmt_id) +conn:close() +test_run:switch('test') +-- [ 1, 2, '3' ] +box.space.TEST:select() +box.space.TEST:drop() +test_run:switch('default') +test_run:cmd("stop server test") + test_run:cmd("cleanup server test") test_run:cmd("delete server test") -- 2.20.1