From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id 58973741D7; Thu, 5 Aug 2021 21:21:19 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 58973741D7 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1628187679; bh=OgrjZwycXbx4x69FFjrgVA/RqG2/gs1imaUgG//SS7s=; h=To:Cc:Date:In-Reply-To:References:Subject:List-Id: List-Unsubscribe:List-Archive:List-Post:List-Help:List-Subscribe: From:Reply-To:From; b=dCtg5syLOHZbqKEqbStqnVx1+PhaTjTfzQuN+ge9EcRUMkhG9k+x6l6qnafMTjgAt ClHihGm1B8/cUU+qEOSOhDw9qHjb2ptYHlK+ZhX0/G3o/5sUVxpRilw8SjTEgOFkVb 7P9STTS77rWc3RQYV/YpfmVErW6LSJEiLJRNyJO0= Received: from smtp53.i.mail.ru (smtp53.i.mail.ru [94.100.177.113]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C4F9A741D9 for ; Thu, 5 Aug 2021 21:17:58 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org C4F9A741D9 Received: by smtp53.i.mail.ru with esmtpa (envelope-from ) id 1mBhwL-0005CS-5U; Thu, 05 Aug 2021 21:17:57 +0300 To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org Cc: tarantool-patches@dev.tarantool.org, mechanik20051988 Date: Thu, 5 Aug 2021 21:17:45 +0300 Message-Id: X-Mailer: git-send-email 2.20.1 In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD92087353F0EC44DD906AB4890CDABF0C5CB76CEE71D3E4007182A05F5380850403D5D695A5CF45DC5DF9CF6549492A0C9EBDF13C1C4692EC8BCF72E3DBC04AF3F X-7FA49CB5: FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE711269A7C2F827F16EA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F790063706922F90966A37BA8638F802B75D45FF36EB9D2243A4F8B5A6FCA7DBDB1FC311F39EFFDF887939037866D6147AF826D86186ED886A64ACA2B8EFC19A1554D4DF117882F4460429724CE54428C33FAD305F5C1EE8F4F765FCECD08F8D939B2CE4A471835C12D1D9774AD6D5ED66289B52BA9C0B312567BB23117882F44604297287769387670735207B96B19DC4093321BDFBBEFFF4125B51D2E47CDBA5A96583BA9C0B312567BB2376E601842F6C81A19E625A9149C048EEC8105B04EFE07628E0F2381F647739FAD8FC6C240DEA7642DBF02ECDB25306B2B78CF848AE20165D0A6AB1C7CE11FEE3672DC5A730DF09D22D242C3BD2E3F4C6C4224003CC836476EA7A3FFF5B025636E2021AF6380DFAD1A18204E546F3947CB11811A4A51E3B096D1867E19FE1407959CC434672EE6371089D37D7C0E48F6C8AA50765F790063780E7E366B0FF8F58EFF80C71ABB335746BA297DBC24807EABDAD6C7F3747799A X-C1DE0DAB: C20DE7B7AB408E4181F030C43753B8183A4AFAF3EA6BDC44F3F687384632F7D24A556CA68DC7B18041EFDCD01A0768AE731D7B0FDADE953BB1881A6453793CE9C32612AADDFBE061C61BE10805914D3804EBA3D8E7E5B87ABF8C51168CD8EBDB00E8CE3DD197987DDC48ACC2A39D04F89CDFB48F4795C241BDAD6C7F3747799A X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D343FB425EC7F4D4A4BA4A58A2C47D34502C104A649997DEFC66E6080D8D8046DD6120BB87A1E4A78EB1D7E09C32AA3244C17F9E91116AFA7B905CB8B49A07A0FBB408A6A02710B7304927AC6DF5659F194 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2biojh4v93/7HD3WSCvT6t3tNQg== X-Mailru-Sender: 583F1D7ACE8F49BD29FC049B2A5BF963272C3B768E89E9F113B0564DF997F6892F70427E7AAE2106B79567116EAC6FCF4E830D9205DBEA545646F0D3C63A617F27ACC94E9A535D22112434F685709FCF0DA7A0AF5A3A8387 X-Mras: Ok Subject: [Tarantool-patches] [PATCH 7/7] net.box: add interactive transaction support in net.box X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: mechanik20051988 via Tarantool-patches Reply-To: mechanik20051988 Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" From: mechanik20051988 Implement `begin`, `commit` and `rollback` methods for stream object in `net.box`, which allows to begin, commit and rollback transaction accordingly. Closes #5860 @TarantoolBot document Title: add interactive transaction support in net.box Implement `begin`, `commit` and `rollback` methods for stream object in `net.box`, which allows to begin, commit and rollback transaction accordingly. Now there are multiple ways to begin, commit and rollback transaction from `net.box`: using appropriate stream methods, using 'call` or 'eval' methods or using `execute` method with sql transaction syntax. User can mix these methods, for example, start transaction using `stream:begin()`, and commit transaction using `stream:call('box.commit')` or stream:execute('COMMIT'). Simple example of using interactive transactions via iproto from net.box: ```lua stream = conn:stream() space = stream.space.test space_not_from_stream = conn.space.test stream:begin() space:replace({1}) -- return previously inserted tuple, because request -- belongs to transaction. space:select({}) -- empty select, because select doesn't belongs to -- transaction space_not_from_stream:select({}) stream:call('box.commit') -- now transaction was commited, so all requests -- returns tuple. ``` Different examples of using streams you can find in gh-5860-implement-streams-in-iproto.test.lua --- .../gh-5860-implement-streams-in-iproto.md | 28 + src/box/lua/net_box.c | 51 +- src/box/lua/net_box.lua | 50 +- test/box/stream.result | 3036 +++++++++++++++++ test/box/stream.test.lua | 1201 +++++++ 5 files changed, 4358 insertions(+), 8 deletions(-) create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md new file mode 100644 index 000000000..d0f1359dd --- /dev/null +++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md @@ -0,0 +1,28 @@ +## feature/core + +* Streams and interactive transactions over streams are implemented + in iproto. Stream is associated with it's ID, which is unique within + one connection. All requests with same not zero stream ID belongs to + the same stream. All requests in stream processed synchronously. The + execution of the next request will not start until the previous one + is completed. If request has zero stream ID it does not belong to stream + and is processed in the old way. + In `net.box`, stream is an object above connection that has the same + methods, but allows to execute requests sequentially. ID is generated + on the client side in two ways: automatically or manually. User can + choose any of two methods, but can not mix them. If user writes his + own connector and wants to use streams, he must transmit stream_id over + the iproto protocol. + The main purpose of streams is transactions via iproto. Each stream + can start its own transaction, so they allows multiplexing several + transactions over one connection. There are multiple ways to begin, + commit and rollback transaction: using appropriate stream methods, using + `call` or `eval` methods or using `execute` method with sql transaction + syntax. User can mix these methods, for example, start transaction using + `stream:begin()`, and commit transaction using `stream:call('box.commit')` + or stream:execute('COMMIT'). + If any request fails during the transaction, it will not affect the other + requests in the transaction. If disconnect occurs when there is some active + transaction in stream, this transaction will be rollbacked, if it does not + have time to commit before this moment. + diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index ec850cd9f..e7b95ba84 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -70,7 +70,10 @@ enum netbox_method { NETBOX_MIN = 14, NETBOX_MAX = 15, NETBOX_COUNT = 16, - NETBOX_INJECT = 17, + NETBOX_BEGIN = 17, + NETBOX_COMMIT = 18, + NETBOX_ROLLBACK = 19, + NETBOX_INJECT = 20, netbox_method_MAX }; @@ -620,6 +623,46 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream, netbox_encode_prepare(L, idx, stream, sync, stream_id); } +static inline void +netbox_encode_txn(lua_State *L, enum iproto_type type, int idx, + struct mpstream *stream, uint64_t sync, + uint64_t stream_id) +{ + (void)L; + (void) idx; + assert(type == IPROTO_TRANSACTION_BEGIN || + type == IPROTO_TRANSACTION_COMMIT || + type == IPROTO_TRANSACTION_ROLLBACK); + size_t svp = netbox_prepare_request(stream, sync, + type, stream_id); + + netbox_encode_request(stream, svp); +} + +static void +netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream, + uint64_t sync, uint64_t stream_id) +{ + return netbox_encode_txn(L, IPROTO_TRANSACTION_BEGIN, idx, stream, + sync, stream_id); +} + +static void +netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream, + uint64_t sync, uint64_t stream_id) +{ + return netbox_encode_txn(L, IPROTO_TRANSACTION_COMMIT, idx, stream, + sync, stream_id); +} + +static void +netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream, + uint64_t sync, uint64_t stream_id) +{ + return netbox_encode_txn(L, IPROTO_TRANSACTION_ROLLBACK, idx, stream, + sync, stream_id); +} + static void netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream, uint64_t sync, uint64_t stream_id) @@ -667,6 +710,9 @@ netbox_encode_method(struct lua_State *L) [NETBOX_MIN] = netbox_encode_select, [NETBOX_MAX] = netbox_encode_select, [NETBOX_COUNT] = netbox_encode_call, + [NETBOX_BEGIN] = netbox_encode_begin, + [NETBOX_COMMIT] = netbox_encode_commit, + [NETBOX_ROLLBACK] = netbox_encode_rollback, [NETBOX_INJECT] = netbox_encode_inject, }; enum netbox_method method = lua_tointeger(L, 1); @@ -1047,6 +1093,9 @@ netbox_decode_method(struct lua_State *L) [NETBOX_MIN] = netbox_decode_tuple, [NETBOX_MAX] = netbox_decode_tuple, [NETBOX_COUNT] = netbox_decode_value, + [NETBOX_BEGIN] = netbox_decode_nil, + [NETBOX_COMMIT] = netbox_decode_nil, + [NETBOX_ROLLBACK] = netbox_decode_nil, [NETBOX_INJECT] = netbox_decode_table, }; enum netbox_method method = lua_tointeger(L, 1); diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index bf6a89e15..199d78127 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -70,8 +70,11 @@ local M_GET = 13 local M_MIN = 14 local M_MAX = 15 local M_COUNT = 16 +local M_BEGIN = 17 +local M_COMMIT = 18 +local M_ROLLBACK = 19 -- Injects raw data into connection. Used by console and tests. -local M_INJECT = 17 +local M_INJECT = 20 ffi.cdef[[ struct error * @@ -1167,16 +1170,52 @@ local function check_eval_args(args) end end +local function nothing_or_data(value) + if value ~= nil then + return value + end +end + function stream_methods:new_stream() check_remote_arg(self, 'stream') box.error(E_PROC_LUA, "Unsupported for stream"); end +function stream_methods:begin(opts) + check_remote_arg(self, 'begin') + local res = self:_request(M_BEGIN, opts, nil, self._stream_id) + if type(res) ~= 'table' or opts and opts.is_async then + return nothing_or_data(res) + end + return unpack(res) +end + +function stream_methods:commit(opts) + check_remote_arg(self, 'commit') + local res = self:_request(M_COMMIT, opts, nil, self._stream_id) + if type(res) ~= 'table' or opts and opts.is_async then + return nothing_or_data(res) + end + return unpack(res) +end + +function stream_methods:rollback(opts) + check_remote_arg(self, 'rollback') + local res = self:_request(M_ROLLBACK, opts, nil, self._stream_id) + if type(res) ~= 'table' or opts and opts.is_async then + return nothing_or_data(res) + end + return unpack(res) +end + function remote_methods:new_stream() check_remote_arg(self, 'stream') self._last_stream_id = self._last_stream_id + 1 local stream = setmetatable({ new_stream = stream_methods.new_stream, + begin = stream_methods.begin, + commit = stream_methods.commit, + rollback = stream_methods.rollback, _stream_id = self._last_stream_id, space = setmetatable({ _space = {}, @@ -1498,12 +1537,6 @@ function console_methods:eval(line, timeout) return res[1] or res end -local function nothing_or_data(value) - if value ~= nil then - return value - end -end - space_metatable = function(remote) local methods = {} @@ -1662,6 +1695,9 @@ local this_module = { min = M_MIN, max = M_MAX, count = M_COUNT, + begin = M_BEGIN, + commit = M_COMMIT, + rollback = M_ROLLBACK, inject = M_INJECT, } } diff --git a/test/box/stream.result b/test/box/stream.result index bfcf6c6be..609ce8100 100644 --- a/test/box/stream.result +++ b/test/box/stream.result @@ -3,9 +3,15 @@ net_box = require('net.box') | --- | ... +json = require('json') + | --- + | ... fiber = require('fiber') | --- | ... +msgpack = require('msgpack') + | --- + | ... test_run = require('test_run').new() | --- | ... @@ -95,6 +101,145 @@ stream:new_stream() | --- | - error: Unsupported for stream | ... +-- Simple checks for transactions +conn_1 = net_box.connect(server_addr) + | --- + | ... +conn_2 = net_box.connect(server_addr) + | --- + | ... +stream_1_1 = conn_1:new_stream() + | --- + | ... +stream_1_2 = conn_1:new_stream() + | --- + | ... +stream_2 = conn_2:new_stream() + | --- + | ... +-- It's ok to commit or rollback without any active transaction +stream_1_1:commit() + | --- + | ... +stream_1_1:rollback() + | --- + | ... + +stream_1_1:begin() + | --- + | ... +-- Error unable to start second transaction in one stream +stream_1_1:begin() + | --- + | - error: 'Operation is not permitted when there is an active transaction ' + | ... +-- It's ok to start transaction in separate stream in one connection +stream_1_2:begin() + | --- + | ... +-- It's ok to start transaction in separate stream in other connection +stream_2:begin() + | --- + | ... +test_run:switch("test") + | --- + | - true + | ... +-- It's ok to start local transaction separately with active stream +-- transactions +box.begin() + | --- + | ... +box.commit() + | --- + | ... +test_run:switch("default") + | --- + | - true + | ... +stream_1_1:commit() + | --- + | ... +stream_1_2:commit() + | --- + | ... +stream_2:commit() + | --- + | ... + +-- Check unsupported requests +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +-- Begin, commit and rollback supported only for streams +conn:_request(net_box._method.begin, nil, nil, nil) + | --- + | - error: Unable to process this type (14) of requests out of stream + | ... +conn:_request(net_box._method.commit, nil, nil, nil) + | --- + | - error: Unable to process this type (15) of requests out of stream + | ... +conn:_request(net_box._method.rollback, nil, nil, nil) + | --- + | - error: Unable to process this type (16) of requests out of stream + | ... +-- Not all requests supported by stream. +stream = conn:new_stream() + | --- + | ... +-- Start transaction to allocate stream object on the +-- server side +stream:begin() + | --- + | ... +IPROTO_REQUEST_TYPE = 0x00 + | --- + | ... +IPROTO_SYNC = 0x01 + | --- + | ... +IPROTO_AUTH = 7 + | --- + | ... +IPROTO_STREAM_ID = 0x0a + | --- + | ... +next_request_id = 9 + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +header = msgpack.encode({ + [IPROTO_REQUEST_TYPE] = IPROTO_AUTH, + [IPROTO_SYNC] = next_request_id, + [IPROTO_STREAM_ID] = 1, +}); + | --- + | ... +body = msgpack.encode({nil}); + | --- + | ... +size = msgpack.encode(header:len() + body:len()); + | --- + | ... +conn._transport.perform_request(nil, nil, false, net_box._method.inject, + nil, nil, nil, nil, + size .. header .. body); + | --- + | - null + | - Unable to process this type (7) of requests in stream + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... conn:close() | --- | ... @@ -543,6 +688,2897 @@ test_run:cmd("stop server test") | - true | ... +-- Second argument (false is a value for memtx_use_mvcc_engine option) +-- Server start without active transaction manager, so all transaction +-- fails because of yeild! +test_run:cmd("start server test with args='10, false'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +s = box.schema.space.create('test', { engine = 'memtx' }) + | --- + | ... +_ = s:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +stream = conn:new_stream() + | --- + | ... +space = stream.space.test + | --- + | ... + +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = false +stream:begin() + | --- + | ... +test_run:switch('test') + | --- + | - true + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +space:replace({1}) + | --- + | - [1] + | ... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space:select{} + | --- + | - [] + | ... +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) + | --- + | - [] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Select is empty, transaction was not commited +s:select() + | --- + | - [] + | ... +test_run:switch('default') + | --- + | - true + | ... +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:commit() + | --- + | - error: Transaction has been aborted by a fiber yield + | ... +-- Select is empty, transaction was aborted +space:select{} + | --- + | - [] + | ... +-- Check that after failed transaction commit we able to start next +-- transaction (it's strange check, but it's necessary because it was +-- bug with it) +stream:begin() + | --- + | ... +stream:ping() + | --- + | - true + | ... +stream:commit() + | --- + | ... +-- Same checks for `call` end `eval` functions. +stream:call('box.begin') + | --- + | ... +stream:call('s:replace', {{1}}) + | --- + | - [1] + | ... +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) + | --- + | - [] + | ... +stream:call('s:select', {}) + | --- + | - [] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Select is empty, transaction was not commited +s:select() + | --- + | - [] + | ... +test_run:switch('default') + | --- + | - true + | ... +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:eval('box.commit()') + | --- + | - error: Transaction has been aborted by a fiber yield + | ... +-- Select is empty, transaction was aborted +space:select{} + | --- + | - [] + | ... + +-- Same checks for `execute` function which can also +-- begin and commit transaction. +stream:execute('START TRANSACTION') + | --- + | - row_count: 0 + | ... +stream:call('s:replace', {{1}}) + | --- + | - [1] + | ... +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) + | --- + | - [] + | ... +stream:call('s:select', {}) + | --- + | - [] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Select is empty, transaction was not commited +s:select() + | --- + | - [] + | ... +test_run:switch('default') + | --- + | - true + | ... +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:execute('COMMIT') + | --- + | - error: Transaction has been aborted by a fiber yield + | ... +-- Select is empty, transaction was aborted +space:select{} + | --- + | - [] + | ... + +test_run:switch('test') + | --- + | - true + | ... +s:drop() + | --- + | ... +-- Check that there are no streams and messages, which +-- was not deleted +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +stream:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Next we check transactions only for memtx with +-- memtx_use_mvcc_engine = true and for vinyl, because +-- if memtx_use_mvcc_engine = false all transactions fails, +-- as we can see before! + +-- Second argument (true is a value for memtx_use_mvcc_engine option) +-- Same test case as previous but server start with active transaction +-- manager. Also check vinyl, because it's behaviour is same. +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) + | --- + | ... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) + | --- + | ... +_ = s1:create_index('primary') + | --- + | ... +_ = s2:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1 = stream_1.space.test_1 + | --- + | ... +space_2 = stream_2.space.test_2 + | --- + | ... +-- Spaces getting from connection, not from stream has no stream_id +-- and not belongs to stream +space_1_no_stream = conn.space.test_1 + | --- + | ... +space_2_no_stream = conn.space.test_2 + | --- + | ... +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = true and to vinyl: +-- behaviour is same! +stream_1:begin() + | --- + | ... +space_1:replace({1}) + | --- + | - [1] + | ... +stream_2:begin() + | --- + | ... +space_2:replace({1}) + | --- + | - [1] + | ... +test_run:switch('test') + | --- + | - true + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_1_no_stream:select{} + | --- + | - [] + | ... +space_2_no_stream:select{} + | --- + | - [] + | ... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) + | --- + | - - [1] + | ... +space_2:select({}) + | --- + | - - [1] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Select is empty, transaction was not commited +s1:select() + | --- + | - [] + | ... +s2:select() + | --- + | - [] + | ... +test_run:switch('default') + | --- + | - true + | ... +-- Commit was successful, transaction can yeild with +-- memtx_use_mvcc_engine = true. Vinyl transactions +-- can yeild also. +stream_1:commit() + | --- + | ... +stream_2:commit() + | --- + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch("default") + | --- + | - true + | ... + +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_1:select{} + | --- + | - - [1] + | ... +space_2:select{} + | --- + | - - [1] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() + | --- + | - - [1] + | ... +s2:select() + | --- + | - - [1] + | ... +s1:drop() + | --- + | ... +s2:drop() + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) + | --- + | ... +_ = s1:create_index('primary') + | --- + | ... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) + | --- + | ... +_ = s2:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1_1 = stream_1.space.test_1 + | --- + | ... +space_1_2 = stream_2.space.test_1 + | --- + | ... +space_2_1 = stream_1.space.test_2 + | --- + | ... +space_2_2 = stream_2.space.test_2 + | --- + | ... +stream_1:begin() + | --- + | ... +stream_2:begin() + | --- + | ... + +-- Simple read/write conflict. +space_1_1:select({1}) + | --- + | - [] + | ... +space_1_2:select({1}) + | --- + | - [] + | ... +space_1_1:replace({1, 1}) + | --- + | - [1, 1] + | ... +space_1_2:replace({1, 2}) + | --- + | - [1, 2] + | ... +stream_1:commit() + | --- + | ... +-- This transaction fails, because of conflict +stream_2:commit() + | --- + | - error: Transaction has been aborted by conflict + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch("default") + | --- + | - true + | ... +-- Here we must accept [1, 1] +space_1_1:select({}) + | --- + | - - [1, 1] + | ... +space_1_2:select({}) + | --- + | - - [1, 1] + | ... + +-- Same test for vinyl sapce +stream_1:begin() + | --- + | ... +stream_2:begin() + | --- + | ... +space_2_1:select({1}) + | --- + | - [] + | ... +space_2_2:select({1}) + | --- + | - [] + | ... +space_2_1:replace({1, 1}) + | --- + | - [1, 1] + | ... +space_2_2:replace({1, 2}) + | --- + | - [1, 2] + | ... +stream_1:commit() + | --- + | ... +-- This transaction fails, because of conflict +stream_2:commit() + | --- + | - error: Transaction has been aborted by conflict + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch("default") + | --- + | - true + | ... +-- Here we must accept [1, 1] +space_2_1:select({}) + | --- + | - - [1, 1] + | ... +space_2_2:select({}) + | --- + | - - [1, 1] + | ... + +test_run:switch('test') + | --- + | - true + | ... +-- Both select return tuple [1, 1], transaction commited +s1:select() + | --- + | - - [1, 1] + | ... +s2:select() + | --- + | - - [1, 1] + | ... +s1:drop() + | --- + | ... +s2:drop() + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Check rollback as a command for memtx and vinyl spaces +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) + | --- + | ... +_ = s1:create_index('primary') + | --- + | ... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) + | --- + | ... +_ = s2:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1 = stream_1.space.test_1 + | --- + | ... +space_2 = stream_2.space.test_2 + | --- + | ... +stream_1:begin() + | --- + | ... +stream_2:begin() + | --- + | ... + +-- Test rollback for memtx space +space_1:replace({1}) + | --- + | - [1] + | ... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) + | --- + | - - [1] + | ... +stream_1:rollback() + | --- + | ... +-- Select is empty, transaction rollback +space_1:select({}) + | --- + | - [] + | ... + +-- Test rollback for vinyl space +space_2:replace({1}) + | --- + | - [1] + | ... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_2:select({}) + | --- + | - - [1] + | ... +stream_2:rollback() + | --- + | ... +-- Select is empty, transaction rollback +space_2:select({}) + | --- + | - [] + | ... + +test_run:switch("test") + | --- + | - true + | ... +-- Check that there are no streams and messages, which +-- was not deleted after rollback +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch("default") + | --- + | - true + | ... + +-- This is simple test is necessary because i have a bug +-- with halting stream after rollback +stream_1:begin() + | --- + | ... +stream_1:commit() + | --- + | ... +stream_2:begin() + | --- + | ... +stream_2:commit() + | --- + | ... +conn:close() + | --- + | ... + +test_run:switch('test') + | --- + | - true + | ... +-- Both select are empty, because transaction rollback +s1:select() + | --- + | - [] + | ... +s2:select() + | --- + | - [] + | ... +s1:drop() + | --- + | ... +s2:drop() + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Check rollback on disconnect +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) + | --- + | ... +_ = s1:create_index('primary') + | --- + | ... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) + | --- + | ... +_ = s2:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1 = stream_1.space.test_1 + | --- + | ... +space_2 = stream_2.space.test_2 + | --- + | ... +stream_1:begin() + | --- + | ... +stream_2:begin() + | --- + | ... + +space_1:replace({1}) + | --- + | - [1] + | ... +space_1:replace({2}) + | --- + | - [2] + | ... +-- Select return two previously inserted tuples +space_1:select({}) + | --- + | - - [1] + | - [2] + | ... + +space_2:replace({1}) + | --- + | - [1] + | ... +space_2:replace({2}) + | --- + | - [2] + | ... +-- Select return two previously inserted tuples +space_2:select({}) + | --- + | - - [1] + | - [2] + | ... +conn:close() + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +-- Empty selects, transaction was rollback +s1:select() + | --- + | - [] + | ... +s2:select() + | --- + | - [] + | ... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch("default") + | --- + | - true + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... + +-- Reconnect +conn = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1 = stream_1.space.test_1 + | --- + | ... +space_2 = stream_2.space.test_2 + | --- + | ... +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() + | --- + | ... +stream_2:begin() + | --- + | ... +-- Two empty selects +space_1:select({}) + | --- + | - [] + | ... +space_2:select({}) + | --- + | - [] + | ... +stream_1:commit() + | --- + | ... +stream_2:commit() + | --- + | ... + +test_run:switch('test') + | --- + | - true + | ... +-- Both select are empty, because transaction rollback +s1:select() + | --- + | - [] + | ... +s2:select() + | --- + | - [] + | ... +s1:drop() + | --- + | ... +s2:drop() + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Check rollback on disconnect with big count of async requests +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) + | --- + | ... +_ = s1:create_index('primary') + | --- + | ... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) + | --- + | ... +_ = s2:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1 = stream_1.space.test_1 + | --- + | ... +space_2 = stream_2.space.test_2 + | --- + | ... +stream_1:begin() + | --- + | ... +stream_2:begin() + | --- + | ... + +space_1:replace({1}) + | --- + | - [1] + | ... +space_1:replace({2}) + | --- + | - [2] + | ... +-- Select return two previously inserted tuples +space_1:select({}) + | --- + | - - [1] + | - [2] + | ... + +space_2:replace({1}) + | --- + | - [1] + | ... +space_2:replace({2}) + | --- + | - [2] + | ... +-- Select return two previously inserted tuples +space_2:select({}) + | --- + | - - [1] + | - [2] + | ... +-- We send a large number of asynchronous requests, +-- their result is not important to us, it is important +-- that they will be in the stream queue at the time of +-- the disconnect. +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +for i = 1, 1000 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +fiber.sleep(0) + | --- + | ... +conn:close() + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); + | --- + | - true + | ... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); + | --- + | - true + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +-- Select was empty, transaction rollbacked +s1:select() + | --- + | - [] + | ... +s2:select() + | --- + | - [] + | ... +test_run:switch("default") + | --- + | - true + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... + +-- Same test, but now we check that if `commit` was received +-- by server before connection closed, we processed it successful. +conn = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1 = stream_1.space.test_1 + | --- + | ... +space_2 = stream_2.space.test_2 + | --- + | ... +stream_1:begin() + | --- + | ... +stream_2:begin() + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +-- Here, for a large number of messages, we cannot guarantee their processing, +-- since if the net_msg_max limit is reached, we will stop processing incoming +-- requests, and after close, we will discard all raw data. '100' is the number +-- of messages that we can process without reaching net_msg_max. We will not try +-- any more, so as not to make a test flaky. +for i = 1, 100 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +_ = stream_1:commit({is_async = true}) + | --- + | ... +_ = stream_2:commit({is_async = true}) + | --- + | ... +fiber.sleep(0) + | --- + | ... +conn:close() + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); + | --- + | - true + | ... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); + | --- + | - true + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +-- Select return tuples from [1] to [100], +-- transaction was commit +rc1 = s1:select() + | --- + | ... +rc2 = s2:select() + | --- + | ... +assert(#rc1) + | --- + | - 100 + | ... +assert(#rc2) + | --- + | - 100 + | ... +s1:truncate() + | --- + | ... +s2:truncate() + | --- + | ... +test_run:switch("default") + | --- + | - true + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... + +-- Reconnect +conn = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1 = stream_1.space.test_1 + | --- + | ... +space_2 = stream_2.space.test_2 + | --- + | ... +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() + | --- + | ... +stream_2:begin() + | --- + | ... +-- Two empty selects +space_1:select({}) + | --- + | - [] + | ... +space_2:select({}) + | --- + | - [] + | ... +stream_1:commit() + | --- + | ... +stream_2:commit() + | --- + | ... + +test_run:switch('test') + | --- + | - true + | ... +-- Both select are empty, because transaction rollback +s1:select() + | --- + | - [] + | ... +s2:select() + | --- + | - [] + | ... +s1:drop() + | --- + | ... +s2:drop() + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Check that all requests between `begin` and `commit` +-- have correct lsn and tsn values. During my work on the +-- patch, i see that all requests in stream comes with +-- header->is_commit == true, so if we are in transaction +-- in stream we should set this value to false, otherwise +-- during recovering `wal_stream_apply_dml_row` fails, because +-- of LSN/TSN mismatch. Here is a special test case for it. +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) + | --- + | ... +_ = s1:create_index('primary') + | --- + | ... +s2 = box.schema.space.create('test_2', { engine = 'memtx' }) + | --- + | ... +_ = s2:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1 = stream_1.space.test_1 + | --- + | ... +space_2 = stream_2.space.test_2 + | --- + | ... + +stream_1:begin() + | --- + | ... +stream_2:begin() + | --- + | ... +space_1:replace({1}) + | --- + | - [1] + | ... +space_1:replace({2}) + | --- + | - [2] + | ... +space_2:replace({1}) + | --- + | - [1] + | ... +space_2:replace({2}) + | --- + | - [2] + | ... +stream_1:commit() + | --- + | ... +stream_2:commit() + | --- + | ... + +test_run:switch('test') + | --- + | - true + | ... +-- Here we get two tuples, commit was successful +s1:select{} + | --- + | - - [1] + | - [2] + | ... +-- Here we get two tuples, commit was successful +s2:select{} + | --- + | - - [1] + | - [2] + | ... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +test_run:cmd("start server test with args='1, true'") + | --- + | - true + | ... +test_run:switch('test') + | --- + | - true + | ... +-- Here we get two tuples, commit was successful +box.space.test_1:select{} + | --- + | - - [1] + | - [2] + | ... +-- Here we get two tuples, commit was successful +box.space.test_2:select{} + | --- + | - - [1] + | - [2] + | ... +box.space.test_1:drop() + | --- + | ... +box.space.test_2:drop() + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Same transactions checks for async mode +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) + | --- + | ... +_ = s1:create_index('primary') + | --- + | ... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) + | --- + | ... +_ = s2:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +stream_1 = conn:new_stream() + | --- + | ... +space_1 = stream_1.space.test_1 + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_2 = stream_2.space.test_2 + | --- + | ... + +memtx_futures = {} + | --- + | ... +memtx_futures["begin"] = stream_1:begin({is_async = true}) + | --- + | ... +memtx_futures["replace"] = space_1:replace({1}, {is_async = true}) + | --- + | ... +memtx_futures["insert"] = space_1:insert({2}, {is_async = true}) + | --- + | ... +memtx_futures["select"] = space_1:select({}, {is_async = true}) + | --- + | ... + +vinyl_futures = {} + | --- + | ... +vinyl_futures["begin"] = stream_2:begin({is_async = true}) + | --- + | ... +vinyl_futures["replace"] = space_2:replace({1}, {is_async = true}) + | --- + | ... +vinyl_futures["insert"] = space_2:insert({2}, {is_async = true}) + | --- + | ... +vinyl_futures["select"] = space_2:select({}, {is_async = true}) + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +-- Select is empty, transaction was not commited +s1:select() + | --- + | - [] + | ... +s2:select() + | --- + | - [] + | ... +test_run:switch('default') + | --- + | - true + | ... +memtx_futures["commit"] = stream_1:commit({is_async = true}) + | --- + | ... +vinyl_futures["commit"] = stream_2:commit({is_async = true}) + | --- + | ... + +memtx_results = wait_and_return_results(memtx_futures) + | --- + | ... +vinyl_results = wait_and_return_results(vinyl_futures) + | --- + | ... +-- If begin was successful it return nil +assert(not memtx_results["begin"]) + | --- + | - true + | ... +assert(not vinyl_results["begin"]) + | --- + | - true + | ... +-- [1] +assert(memtx_results["replace"]) + | --- + | - [1] + | ... +assert(vinyl_results["replace"]) + | --- + | - [1] + | ... +-- [2] +assert(memtx_results["insert"]) + | --- + | - [2] + | ... +assert(vinyl_results["insert"]) + | --- + | - [2] + | ... +-- [1] [2] +assert(memtx_results["select"]) + | --- + | - - [1] + | - [2] + | ... +assert(vinyl_results["select"]) + | --- + | - - [1] + | - [2] + | ... +-- If commit was successful it return nil +assert(not memtx_results["commit"]) + | --- + | - true + | ... +assert(not vinyl_results["commit"]) + | --- + | - true + | ... + +test_run:switch("test") + | --- + | - true + | ... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() + | --- + | - - [1] + | - [2] + | ... +s2:select() + | --- + | - - [1] + | - [2] + | ... +s1:drop() + | --- + | ... +s2:drop() + | --- + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... + +test_run:switch("test") + | --- + | - true + | ... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) + | --- + | ... +_ = s1:create_index('primary') + | --- + | ... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) + | --- + | ... +_ = s2:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn:new_stream() + | --- + | ... +stream_2 = conn:new_stream() + | --- + | ... +space_1_1 = stream_1.space.test_1 + | --- + | ... +space_1_2 = stream_2.space.test_1 + | --- + | ... +space_2_1 = stream_1.space.test_2 + | --- + | ... +space_2_2 = stream_2.space.test_2 + | --- + | ... + +futures_1 = {} + | --- + | ... +-- Simple read/write conflict. +futures_1["begin_1"] = stream_1:begin({is_async = true}) + | --- + | ... +futures_1["begin_2"] = stream_2:begin({is_async = true}) + | --- + | ... +futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true}) + | --- + | ... +futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true}) + | --- + | ... +futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true}) + | --- + | ... +futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true}) + | --- + | ... +futures_1["commit_1"] = stream_1:commit({is_async = true}) + | --- + | ... +futures_1["commit_2"] = stream_2:commit({is_async = true}) + | --- + | ... +futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true}) + | --- + | ... +futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true}) + | --- + | ... + +results_1 = wait_and_return_results(futures_1) + | --- + | ... +-- Successful begin return nil +assert(not results_1["begin_1"]) + | --- + | - true + | ... +assert(not results_1["begin_2"]) + | --- + | - true + | ... +-- [] +assert(not results_1["select_1_1"][1]) + | --- + | - true + | ... +assert(not results_1["select_1_2"][1]) + | --- + | - true + | ... +-- [1] +assert(results_1["replace_1_1"][1]) + | --- + | - 1 + | ... +-- [1] +assert(results_1["replace_1_1"][2]) + | --- + | - 1 + | ... +-- [1] +assert(results_1["replace_1_2"][1]) + | --- + | - 1 + | ... +-- [2] +assert(results_1["replace_1_2"][2]) + | --- + | - 2 + | ... +-- Successful commit return nil +assert(not results_1["commit_1"]) + | --- + | - true + | ... +-- Error because of transaction conflict +assert(results_1["commit_2"]) + | --- + | - Transaction has been aborted by conflict + | ... +-- [1, 1] +assert(results_1["select_1_1_A"][1]) + | --- + | - [1, 1] + | ... +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_1_2_A"][1] + +futures_2 = {} + | --- + | ... +-- Simple read/write conflict. +futures_2["begin_1"] = stream_1:begin({is_async = true}) + | --- + | ... +futures_2["begin_2"] = stream_2:begin({is_async = true}) + | --- + | ... +futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true}) + | --- + | ... +futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true}) + | --- + | ... +futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true}) + | --- + | ... +futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true}) + | --- + | ... +futures_2["commit_1"] = stream_1:commit({is_async = true}) + | --- + | ... +futures_2["commit_2"] = stream_2:commit({is_async = true}) + | --- + | ... +futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true}) + | --- + | ... +futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true}) + | --- + | ... + +results_2 = wait_and_return_results(futures_2) + | --- + | ... +-- Successful begin return nil +assert(not results_2["begin_1"]) + | --- + | - true + | ... +assert(not results_2["begin_2"]) + | --- + | - true + | ... +-- [] +assert(not results_2["select_2_1"][1]) + | --- + | - true + | ... +assert(not results_2["select_2_2"][1]) + | --- + | - true + | ... +-- [1] +assert(results_2["replace_2_1"][1]) + | --- + | - 1 + | ... +-- [1] +assert(results_2["replace_2_1"][2]) + | --- + | - 1 + | ... +-- [1] +assert(results_2["replace_2_2"][1]) + | --- + | - 1 + | ... +-- [2] +assert(results_2["replace_2_2"][2]) + | --- + | - 2 + | ... +-- Successful commit return nil +assert(not results_2["commit_1"]) + | --- + | - true + | ... +-- Error because of transaction conflict +assert(results_2["commit_2"]) + | --- + | - Transaction has been aborted by conflict + | ... +-- [1, 1] +assert(results_2["select_2_1_A"][1]) + | --- + | - [1, 1] + | ... +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_2_2_A"][1] + +test_run:switch('test') + | --- + | - true + | ... +-- Both select return tuple [1, 1], transaction commited +s1:select() + | --- + | - - [1, 1] + | ... +s2:select() + | --- + | - - [1, 1] + | ... +s1:drop() + | --- + | ... +s2:drop() + | --- + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Checks for iproto call/eval/execute in stream +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... +test_run:switch("test") + | --- + | - true + | ... +s = box.schema.space.create('test', { engine = 'memtx' }) + | --- + | ... +_ = s:create_index('primary') + | --- + | ... +function ping() return "pong" end + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +stream = conn:new_stream() + | --- + | ... +space = stream.space.test + | --- + | ... +space_no_stream = conn.space.test + | --- + | ... + +-- successful begin using stream:call +stream:call('box.begin') + | --- + | ... +-- error: Operation is not permitted when there is an active transaction +stream:eval('box.begin()') + | --- + | - error: 'Operation is not permitted when there is an active transaction ' + | ... +-- error: Operation is not permitted when there is an active transaction +stream:begin() + | --- + | - error: 'Operation is not permitted when there is an active transaction ' + | ... +-- error: Operation is not permitted when there is an active transaction +stream:execute('START TRANSACTION') + | --- + | - error: 'Operation is not permitted when there is an active transaction ' + | ... +stream:call('ping') + | --- + | - pong + | ... +stream:eval('ping()') + | --- + | ... +-- error: Operation is not permitted when there is an active transaction +stream:call('box.begin') + | --- + | - error: 'Operation is not permitted when there is an active transaction ' + | ... +stream:eval('box.begin()') + | --- + | - error: 'Operation is not permitted when there is an active transaction ' + | ... +-- successful commit using stream:call +stream:call('box.commit') + | --- + | ... + +-- successful begin using stream:eval +stream:eval('box.begin()') + | --- + | ... +space:replace({1}) + | --- + | - [1] + | ... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} + | --- + | - [] + | ... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) + | --- + | - - [1] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Select is empty, transaction was not commited +s:select() + | --- + | - [] + | ... +test_run:switch('default') + | --- + | - true + | ... +--Successful commit using stream:execute +stream:execute('COMMIT') + | --- + | - row_count: 0 + | ... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_no_stream:select{} + | --- + | - - [1] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Select return tuple, because transaction was successful +s:select() + | --- + | - - [1] + | ... +s:delete{1} + | --- + | - [1] + | ... +test_run:switch('default') + | --- + | - true + | ... +-- Check rollback using stream:call +stream:begin() + | --- + | ... +space:replace({2}) + | --- + | - [2] + | ... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} + | --- + | - [] + | ... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) + | --- + | - - [2] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Select is empty, transaction was not commited +s:select() + | --- + | - [] + | ... +test_run:switch('default') + | --- + | - true + | ... +--Successful rollback using stream:call +stream:call('box.rollback') + | --- + | ... +-- Empty selects transaction rollbacked +space:select({}) + | --- + | - [] + | ... +space_no_stream:select{} + | --- + | - [] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- Empty select transaction rollbacked +s:select() + | --- + | - [] + | ... +s:drop() + | --- + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Simple test which demostrates that stream immediately +-- destroyed, when no processing messages in stream and +-- no active transaction. + +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... +test_run:switch("test") + | --- + | - true + | ... +s = box.schema.space.create('test', { engine = 'memtx' }) + | --- + | ... +_ = s:create_index('primary') + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +stream = conn:new_stream() + | --- + | ... +space = stream.space.test + | --- + | ... +for i = 1, 10 do space:replace{i} end + | --- + | ... +test_run:switch("test") + | --- + | - true + | ... +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +s:drop() + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- Transaction tests for sql iproto requests. +-- All this functions are copy-paste from sql/ddl.test.lua, +-- except that they check sql transactions in streams +test_run:cmd("setopt delimiter '$'") + | --- + | - true + | ... +function execute_sql_string(stream, sql_string) + if stream then + stream:execute(sql_string) + else + box.execute(sql_string) + end +end$ + | --- + | ... +function execute_sql_string_and_return_result(stream, sql_string) + if stream then + return pcall(stream.execute, stream, sql_string) + else + return box.execute(sql_string) + end +end$ + | --- + | ... +function monster_ddl(stream) + local _, err1, err2, err3, err4, err5, err6 + local stream_or_box = stream or box + execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER);]]) + execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER UNIQUE, + CONSTRAINT ck1 + CHECK(b < 100));]]) + + execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);') + execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);') + + execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY + KEY, a INTEGER);]]) + + execute_sql_string(stream, 'DROP INDEX t2a ON t2;') + + execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1 + CHECK(b > 0);]]) + + _, err1 = + execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename + RENAME TO t1;]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT + ck2 CHECK(a > 0);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + + _, err2 = + execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id + INTEGER PRIMARY KEY);]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + + execute_sql_string(stream, [[CREATE TABLE + trigger_catcher(id INTEGER PRIMARY + KEY AUTOINCREMENT);]]) + + execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;') + + execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;') + + execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON + t1 FOR EACH ROW + BEGIN + INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;') + + execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON + t2 FOR EACH ROW + BEGIN + INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err4 = + execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a + ON t1(a, b);]]) + + execute_sql_string(stream, 'TRUNCATE TABLE t1;') + _, err5 = + execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;') + _, err6 = + execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE + t_does_not_exist;]]) + + execute_sql_string(stream, 'DROP TRIGGER t2t;') + + return {'Finished ok, errors in the middle: ', err1, err2, err3, err4, + err5, err6} +end$ + | --- + | ... +function monster_ddl_cmp_res(res1, res2) + if json.encode(res1) == json.encode(res2) then + return true + end + return res1, res2 +end$ + | --- + | ... +function monster_ddl_is_clean(stream) + local stream_or_box = stream or box + assert(stream_or_box.space.T1 == nil) + assert(stream_or_box.space.T2 == nil) + assert(stream_or_box.space._trigger:count() == 0) + assert(stream_or_box.space._fk_constraint:count() == 0) + assert(stream_or_box.space._ck_constraint:count() == 0) + assert(stream_or_box.space.T_RENAMED == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) +end$ + | --- + | ... +function monster_ddl_check(stream) + local _, err1, err2, err3, err4, res + local stream_or_box = stream or box + _, err1 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES (1, 1, 101)]]) + execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)') + _, err2 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES(2, 2, 1)]]) + _, err3 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, 20, 1)]]) + _, err4 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, -1, 1)]]) + execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)') + if not stream then + assert(stream_or_box.space.T_RENAMED ~= nil) + assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) + res = execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + else + _, res = + execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + end + return {'Finished ok, errors and trigger catcher content: ', err1, err2, + err3, err4, res} +end$ + | --- + | ... +function monster_ddl_clear(stream) + execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;') + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t2') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t1') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed') +end$ + | --- + | ... +test_run:cmd("setopt delimiter ''")$ + | --- + | - true + | ... + +test_run:cmd("start server test with args='10, true'") + | --- + | - true + | ... +test_run:switch('test') + | --- + | - true + | ... +test_run:cmd("setopt delimiter '$'") + | --- + | - true + | ... +function monster_ddl_is_clean() + if not (box.space.T1 == nil) or + not (box.space.T2 == nil) or + not (box.space._trigger:count() == 0) or + not (box.space._fk_constraint:count() == 0) or + not (box.space._ck_constraint:count() == 0) or + not (box.space.T_RENAMED == nil) or + not (box.space.T_TO_RENAME == nil) then + return false + end + return true +end$ + | --- + | ... +test_run:cmd("setopt delimiter ''")$ + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... + +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... + +-- No txn. +true_ddl_res = monster_ddl() + | --- + | ... +true_ddl_res + | --- + | - - 'Finished ok, errors in the middle: ' + | - Space 'T1' already exists + | - Space 'T1' already exists + | - Space 'T3' does not exist + | - Index 'T1A' already exists in space 'T1' + | - 'Failed to execute SQL statement: can not truncate space ''T2'' because other + | objects depend on it' + | - Space 'T_DOES_NOT_EXIST' does not exist + | ... + +true_check_res = monster_ddl_check() + | --- + | ... +true_check_res + | --- + | - - 'Finished ok, errors and trigger catcher content: ' + | - 'Check constraint failed ''CK1'': b < 100' + | - Duplicate key exists in unique index "unique_unnamed_T2_2" in space "T2" with + | old tuple - [1, 1, 1] and new tuple - [2, 2, 1] + | - 'Failed to execute SQL statement: FOREIGN KEY constraint failed' + | - 'Check constraint failed ''CK2'': a > 0' + | - metadata: + | - name: ID + | type: integer + | rows: + | - [1] + | ... + +monster_ddl_clear() + | --- + | ... +monster_ddl_is_clean() + | --- + | ... + +-- Both DDL and cleanup in one txn in stream. +ddl_res = nil + | --- + | ... +stream:execute('START TRANSACTION') + | --- + | - row_count: 0 + | ... +ddl_res = monster_ddl(stream) + | --- + | ... +monster_ddl_clear(stream) + | --- + | ... +stream:call('monster_ddl_is_clean') + | --- + | - true + | ... +stream:execute('COMMIT') + | --- + | - row_count: 0 + | ... +monster_ddl_cmp_res(ddl_res, true_ddl_res) + | --- + | - true + | ... + +-- DDL in txn, cleanup is not. +stream:execute('START TRANSACTION') + | --- + | - row_count: 0 + | ... +ddl_res = monster_ddl(stream) + | --- + | ... +stream:execute('COMMIT') + | --- + | - row_count: 0 + | ... +monster_ddl_cmp_res(ddl_res, true_ddl_res) + | --- + | - true + | ... + +check_res = monster_ddl_check(stream) + | --- + | ... +monster_ddl_cmp_res(check_res, true_check_res) + | --- + | - true + | ... + +monster_ddl_clear(stream) + | --- + | ... +stream:call('monster_ddl_is_clean') + | --- + | - true + | ... + +-- DDL is not in txn, cleanup is. +ddl_res = monster_ddl(stream) + | --- + | ... +monster_ddl_cmp_res(ddl_res, true_ddl_res) + | --- + | - true + | ... + +check_res = monster_ddl_check(stream) + | --- + | ... +monster_ddl_cmp_res(check_res, true_check_res) + | --- + | - true + | ... + +stream:execute('START TRANSACTION') + | --- + | - row_count: 0 + | ... +monster_ddl_clear(stream) + | --- + | ... +stream:call('monster_ddl_is_clean') + | --- + | - true + | ... +stream:execute('COMMIT') + | --- + | - row_count: 0 + | ... + +-- DDL and cleanup in separate txns. +stream:execute('START TRANSACTION') + | --- + | - row_count: 0 + | ... +ddl_res = monster_ddl(stream) + | --- + | ... +stream:execute('COMMIT') + | --- + | - row_count: 0 + | ... +monster_ddl_cmp_res(ddl_res, true_ddl_res) + | --- + | - true + | ... + +check_res = monster_ddl_check(stream) + | --- + | ... +monster_ddl_cmp_res(check_res, true_check_res) + | --- + | - true + | ... + +stream:execute('START TRANSACTION') + | --- + | - row_count: 0 + | ... +monster_ddl_clear(stream) + | --- + | ... +stream:call('monster_ddl_is_clean') + | --- + | - true + | ... +stream:execute('COMMIT') + | --- + | - row_count: 0 + | ... + +test_run:switch("test") + | --- + | - true + | ... +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... + + +-- Check for prepare and unprepare functions +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +stream = conn:new_stream() + | --- + | ... + +stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)') + | --- + | - row_count: 1 + | ... +-- reload schema +stream:ping() + | --- + | - true + | ... +space = stream.space.TEST + | --- + | ... +assert(space ~= nil) + | --- + | - true + | ... +stream:execute('START TRANSACTION') + | --- + | - row_count: 0 + | ... +space:replace{1, 2, '3'} + | --- + | - [1, 2, '3'] + | ... +space:select() + | --- + | - - [1, 2, '3'] + | ... +-- select is empty, because transaction was not commited +conn.space.TEST:select() + | --- + | - [] + | ... +stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") + | --- + | ... +conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") + | --- + | ... +assert(stream_pr.stmt_id == conn_pr.stmt_id) + | --- + | - true + | ... +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) + | --- + | - metadata: + | - name: ID + | type: integer + | - name: A + | type: number + | - name: B + | type: string + | rows: + | - [1, 2, '3'] + | ... +-- empty select, transaction was not commited +conn:execute(conn_pr.stmt_id, {1, 2}) + | --- + | - metadata: + | - name: ID + | type: integer + | - name: A + | type: number + | - name: B + | type: string + | rows: [] + | ... +stream:execute('COMMIT') + | --- + | - row_count: 0 + | ... +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) + | --- + | - metadata: + | - name: ID + | type: integer + | - name: A + | type: number + | - name: B + | type: string + | rows: + | - [1, 2, '3'] + | ... +-- [ 1, 2, '3' ] +conn:execute(conn_pr.stmt_id, {1, 2}) + | --- + | - metadata: + | - name: ID + | type: integer + | - name: A + | type: number + | - name: B + | type: string + | rows: + | - [1, 2, '3'] + | ... +stream:unprepare(stream_pr.stmt_id) + | --- + | - null + | ... +conn:close() + | --- + | ... +test_run:switch('test') + | --- + | - true + | ... +-- [ 1, 2, '3' ] +box.space.TEST:select() + | --- + | - - [1, 2, '3'] + | ... +box.space.TEST:drop() + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + test_run:cmd("cleanup server test") | --- | - true diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua index 190f17d8e..b8bd2d327 100644 --- a/test/box/stream.test.lua +++ b/test/box/stream.test.lua @@ -1,6 +1,8 @@ -- This test checks streams iplementation in iproto (gh-5860). net_box = require('net.box') +json = require('json') fiber = require('fiber') +msgpack = require('msgpack') test_run = require('test_run').new() test_run:cmd("create server test with script='box/stream.lua'") @@ -45,6 +47,62 @@ conn = net_box.connect(server_addr) stream = conn:new_stream() -- Unsupported for stream stream:new_stream() +-- Simple checks for transactions +conn_1 = net_box.connect(server_addr) +conn_2 = net_box.connect(server_addr) +stream_1_1 = conn_1:new_stream() +stream_1_2 = conn_1:new_stream() +stream_2 = conn_2:new_stream() +-- It's ok to commit or rollback without any active transaction +stream_1_1:commit() +stream_1_1:rollback() + +stream_1_1:begin() +-- Error unable to start second transaction in one stream +stream_1_1:begin() +-- It's ok to start transaction in separate stream in one connection +stream_1_2:begin() +-- It's ok to start transaction in separate stream in other connection +stream_2:begin() +test_run:switch("test") +-- It's ok to start local transaction separately with active stream +-- transactions +box.begin() +box.commit() +test_run:switch("default") +stream_1_1:commit() +stream_1_2:commit() +stream_2:commit() + +-- Check unsupported requests +conn = net_box.connect(server_addr) +assert(conn:ping()) +-- Begin, commit and rollback supported only for streams +conn:_request(net_box._method.begin, nil, nil, nil) +conn:_request(net_box._method.commit, nil, nil, nil) +conn:_request(net_box._method.rollback, nil, nil, nil) +-- Not all requests supported by stream. +stream = conn:new_stream() +-- Start transaction to allocate stream object on the +-- server side +stream:begin() +IPROTO_REQUEST_TYPE = 0x00 +IPROTO_SYNC = 0x01 +IPROTO_AUTH = 7 +IPROTO_STREAM_ID = 0x0a +next_request_id = 9 +test_run:cmd("setopt delimiter ';'") +header = msgpack.encode({ + [IPROTO_REQUEST_TYPE] = IPROTO_AUTH, + [IPROTO_SYNC] = next_request_id, + [IPROTO_STREAM_ID] = 1, +}); +body = msgpack.encode({nil}); +size = msgpack.encode(header:len() + body:len()); +conn._transport.perform_request(nil, nil, false, net_box._method.inject, + nil, nil, nil, nil, + size .. header .. body); +test_run:cmd("setopt delimiter ''"); conn:close() -- Check that spaces in stream object updates, during reload_schema @@ -203,5 +261,1148 @@ assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) test_run:switch("default") test_run:cmd("stop server test") +-- Second argument (false is a value for memtx_use_mvcc_engine option) +-- Server start without active transaction manager, so all transaction +-- fails because of yeild! +test_run:cmd("start server test with args='10, false'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() +space = stream.space.test + +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = false +stream:begin() +test_run:switch('test') +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1) +test_run:switch('default') +space:replace({1}) +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space:select{} +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:commit() +-- Select is empty, transaction was aborted +space:select{} +-- Check that after failed transaction commit we able to start next +-- transaction (it's strange check, but it's necessary because it was +-- bug with it) +stream:begin() +stream:ping() +stream:commit() +-- Same checks for `call` end `eval` functions. +stream:call('box.begin') +stream:call('s:replace', {{1}}) +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +stream:call('s:select', {}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:eval('box.commit()') +-- Select is empty, transaction was aborted +space:select{} + +-- Same checks for `execute` function which can also +-- begin and commit transaction. +stream:execute('START TRANSACTION') +stream:call('s:replace', {{1}}) +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +stream:call('s:select', {}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:execute('COMMIT') +-- Select is empty, transaction was aborted +space:select{} + +test_run:switch('test') +s:drop() +-- Check that there are no streams and messages, which +-- was not deleted +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +stream:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Next we check transactions only for memtx with +-- memtx_use_mvcc_engine = true and for vinyl, because +-- if memtx_use_mvcc_engine = false all transactions fails, +-- as we can see before! + +-- Second argument (true is a value for memtx_use_mvcc_engine option) +-- Same test case as previous but server start with active transaction +-- manager. Also check vinyl, because it's behaviour is same. +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s1:create_index('primary') +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +-- Spaces getting from connection, not from stream has no stream_id +-- and not belongs to stream +space_1_no_stream = conn.space.test_1 +space_2_no_stream = conn.space.test_2 +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = true and to vinyl: +-- behaviour is same! +stream_1:begin() +space_1:replace({1}) +stream_2:begin() +space_2:replace({1}) +test_run:switch('test') +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2) +test_run:switch('default') +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_1_no_stream:select{} +space_2_no_stream:select{} +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +space_2:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s1:select() +s2:select() +test_run:switch('default') +-- Commit was successful, transaction can yeild with +-- memtx_use_mvcc_engine = true. Vinyl transactions +-- can yeild also. +stream_1:commit() +stream_2:commit() +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") + +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_1:select{} +space_2:select{} +test_run:switch("test") +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1_1 = stream_1.space.test_1 +space_1_2 = stream_2.space.test_1 +space_2_1 = stream_1.space.test_2 +space_2_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +-- Simple read/write conflict. +space_1_1:select({1}) +space_1_2:select({1}) +space_1_1:replace({1, 1}) +space_1_2:replace({1, 2}) +stream_1:commit() +-- This transaction fails, because of conflict +stream_2:commit() +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +-- Here we must accept [1, 1] +space_1_1:select({}) +space_1_2:select({}) + +-- Same test for vinyl sapce +stream_1:begin() +stream_2:begin() +space_2_1:select({1}) +space_2_2:select({1}) +space_2_1:replace({1, 1}) +space_2_2:replace({1, 2}) +stream_1:commit() +-- This transaction fails, because of conflict +stream_2:commit() +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +-- Here we must accept [1, 1] +space_2_1:select({}) +space_2_2:select({}) + +test_run:switch('test') +-- Both select return tuple [1, 1], transaction commited +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check rollback as a command for memtx and vinyl spaces +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +-- Test rollback for memtx space +space_1:replace({1}) +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +stream_1:rollback() +-- Select is empty, transaction rollback +space_1:select({}) + +-- Test rollback for vinyl space +space_2:replace({1}) +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_2:select({}) +stream_2:rollback() +-- Select is empty, transaction rollback +space_2:select({}) + +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after rollback +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") + +-- This is simple test is necessary because i have a bug +-- with halting stream after rollback +stream_1:begin() +stream_1:commit() +stream_2:begin() +stream_2:commit() +conn:close() + +test_run:switch('test') +-- Both select are empty, because transaction rollback +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check rollback on disconnect +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +space_1:replace({1}) +space_1:replace({2}) +-- Select return two previously inserted tuples +space_1:select({}) + +space_2:replace({1}) +space_2:replace({2}) +-- Select return two previously inserted tuples +space_2:select({}) +conn:close() + +test_run:switch("test") +-- Empty selects, transaction was rollback +s1:select() +s2:select() +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Reconnect +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +stream_2:begin() +-- Two empty selects +space_1:select({}) +space_2:select({}) +stream_1:commit() +stream_2:commit() + +test_run:switch('test') +-- Both select are empty, because transaction rollback +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check rollback on disconnect with big count of async requests +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +space_1:replace({1}) +space_1:replace({2}) +-- Select return two previously inserted tuples +space_1:select({}) + +space_2:replace({1}) +space_2:replace({2}) +-- Select return two previously inserted tuples +space_2:select({}) +-- We send a large number of asynchronous requests, +-- their result is not important to us, it is important +-- that they will be in the stream queue at the time of +-- the disconnect. +test_run:cmd("setopt delimiter ';'") +for i = 1, 1000 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +test_run:cmd("setopt delimiter ''"); +fiber.sleep(0) +conn:close() + +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +test_run:cmd("setopt delimiter ';'") +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +test_run:cmd("setopt delimiter ''"); +-- Select was empty, transaction rollbacked +s1:select() +s2:select() +test_run:switch("default") +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Same test, but now we check that if `commit` was received +-- by server before connection closed, we processed it successful. +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() +test_run:cmd("setopt delimiter ';'") +-- Here, for a large number of messages, we cannot guarantee their processing, +-- since if the net_msg_max limit is reached, we will stop processing incoming +-- requests, and after close, we will discard all raw data. '100' is the number +-- of messages that we can process without reaching net_msg_max. We will not try +-- any more, so as not to make a test flaky. +for i = 1, 100 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +test_run:cmd("setopt delimiter ''"); +_ = stream_1:commit({is_async = true}) +_ = stream_2:commit({is_async = true}) +fiber.sleep(0) +conn:close() + +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +test_run:cmd("setopt delimiter ';'") +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +test_run:cmd("setopt delimiter ''"); +-- Select return tuples from [1] to [100], +-- transaction was commit +rc1 = s1:select() +rc2 = s2:select() +assert(#rc1) +assert(#rc2) +s1:truncate() +s2:truncate() +test_run:switch("default") +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Reconnect +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +stream_2:begin() +-- Two empty selects +space_1:select({}) +space_2:select({}) +stream_1:commit() +stream_2:commit() + +test_run:switch('test') +-- Both select are empty, because transaction rollback +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check that all requests between `begin` and `commit` +-- have correct lsn and tsn values. During my work on the +-- patch, i see that all requests in stream comes with +-- header->is_commit == true, so if we are in transaction +-- in stream we should set this value to false, otherwise +-- during recovering `wal_stream_apply_dml_row` fails, because +-- of LSN/TSN mismatch. Here is a special test case for it. +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'memtx' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 + +stream_1:begin() +stream_2:begin() +space_1:replace({1}) +space_1:replace({2}) +space_2:replace({1}) +space_2:replace({2}) +stream_1:commit() +stream_2:commit() + +test_run:switch('test') +-- Here we get two tuples, commit was successful +s1:select{} +-- Here we get two tuples, commit was successful +s2:select{} +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +test_run:cmd("start server test with args='1, true'") +test_run:switch('test') +-- Here we get two tuples, commit was successful +box.space.test_1:select{} +-- Here we get two tuples, commit was successful +box.space.test_2:select{} +box.space.test_1:drop() +box.space.test_2:drop() +test_run:switch('default') +test_run:cmd("stop server test") + +-- Same transactions checks for async mode +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream_1 = conn:new_stream() +space_1 = stream_1.space.test_1 +stream_2 = conn:new_stream() +space_2 = stream_2.space.test_2 + +memtx_futures = {} +memtx_futures["begin"] = stream_1:begin({is_async = true}) +memtx_futures["replace"] = space_1:replace({1}, {is_async = true}) +memtx_futures["insert"] = space_1:insert({2}, {is_async = true}) +memtx_futures["select"] = space_1:select({}, {is_async = true}) + +vinyl_futures = {} +vinyl_futures["begin"] = stream_2:begin({is_async = true}) +vinyl_futures["replace"] = space_2:replace({1}, {is_async = true}) +vinyl_futures["insert"] = space_2:insert({2}, {is_async = true}) +vinyl_futures["select"] = space_2:select({}, {is_async = true}) + +test_run:switch("test") +-- Select is empty, transaction was not commited +s1:select() +s2:select() +test_run:switch('default') +memtx_futures["commit"] = stream_1:commit({is_async = true}) +vinyl_futures["commit"] = stream_2:commit({is_async = true}) + +memtx_results = wait_and_return_results(memtx_futures) +vinyl_results = wait_and_return_results(vinyl_futures) +-- If begin was successful it return nil +assert(not memtx_results["begin"]) +assert(not vinyl_results["begin"]) +-- [1] +assert(memtx_results["replace"]) +assert(vinyl_results["replace"]) +-- [2] +assert(memtx_results["insert"]) +assert(vinyl_results["insert"]) +-- [1] [2] +assert(memtx_results["select"]) +assert(vinyl_results["select"]) +-- If commit was successful it return nil +assert(not memtx_results["commit"]) +assert(not vinyl_results["commit"]) + +test_run:switch("test") +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() +s2:select() +s1:drop() +s2:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1_1 = stream_1.space.test_1 +space_1_2 = stream_2.space.test_1 +space_2_1 = stream_1.space.test_2 +space_2_2 = stream_2.space.test_2 + +futures_1 = {} +-- Simple read/write conflict. +futures_1["begin_1"] = stream_1:begin({is_async = true}) +futures_1["begin_2"] = stream_2:begin({is_async = true}) +futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true}) +futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true}) +futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true}) +futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true}) +futures_1["commit_1"] = stream_1:commit({is_async = true}) +futures_1["commit_2"] = stream_2:commit({is_async = true}) +futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true}) +futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true}) + +results_1 = wait_and_return_results(futures_1) +-- Successful begin return nil +assert(not results_1["begin_1"]) +assert(not results_1["begin_2"]) +-- [] +assert(not results_1["select_1_1"][1]) +assert(not results_1["select_1_2"][1]) +-- [1] +assert(results_1["replace_1_1"][1]) +-- [1] +assert(results_1["replace_1_1"][2]) +-- [1] +assert(results_1["replace_1_2"][1]) +-- [2] +assert(results_1["replace_1_2"][2]) +-- Successful commit return nil +assert(not results_1["commit_1"]) +-- Error because of transaction conflict +assert(results_1["commit_2"]) +-- [1, 1] +assert(results_1["select_1_1_A"][1]) +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_1_2_A"][1] + +futures_2 = {} +-- Simple read/write conflict. +futures_2["begin_1"] = stream_1:begin({is_async = true}) +futures_2["begin_2"] = stream_2:begin({is_async = true}) +futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true}) +futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true}) +futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true}) +futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true}) +futures_2["commit_1"] = stream_1:commit({is_async = true}) +futures_2["commit_2"] = stream_2:commit({is_async = true}) +futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true}) +futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true}) + +results_2 = wait_and_return_results(futures_2) +-- Successful begin return nil +assert(not results_2["begin_1"]) +assert(not results_2["begin_2"]) +-- [] +assert(not results_2["select_2_1"][1]) +assert(not results_2["select_2_2"][1]) +-- [1] +assert(results_2["replace_2_1"][1]) +-- [1] +assert(results_2["replace_2_1"][2]) +-- [1] +assert(results_2["replace_2_2"][1]) +-- [2] +assert(results_2["replace_2_2"][2]) +-- Successful commit return nil +assert(not results_2["commit_1"]) +-- Error because of transaction conflict +assert(results_2["commit_2"]) +-- [1, 1] +assert(results_2["select_2_1_A"][1]) +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_2_2_A"][1] + +test_run:switch('test') +-- Both select return tuple [1, 1], transaction commited +s1:select() +s2:select() +s1:drop() +s2:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Checks for iproto call/eval/execute in stream +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +test_run:switch("test") +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +function ping() return "pong" end +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() +space = stream.space.test +space_no_stream = conn.space.test + +-- successful begin using stream:call +stream:call('box.begin') +-- error: Operation is not permitted when there is an active transaction +stream:eval('box.begin()') +-- error: Operation is not permitted when there is an active transaction +stream:begin() +-- error: Operation is not permitted when there is an active transaction +stream:execute('START TRANSACTION') +stream:call('ping') +stream:eval('ping()') +-- error: Operation is not permitted when there is an active transaction +stream:call('box.begin') +stream:eval('box.begin()') +-- successful commit using stream:call +stream:call('box.commit') + +-- successful begin using stream:eval +stream:eval('box.begin()') +space:replace({1}) +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +--Successful commit using stream:execute +stream:execute('COMMIT') +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_no_stream:select{} +test_run:switch("test") +-- Select return tuple, because transaction was successful +s:select() +s:delete{1} +test_run:switch('default') +-- Check rollback using stream:call +stream:begin() +space:replace({2}) +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +--Successful rollback using stream:call +stream:call('box.rollback') +-- Empty selects transaction rollbacked +space:select({}) +space_no_stream:select{} +test_run:switch("test") +-- Empty select transaction rollbacked +s:select() +s:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Simple test which demostrates that stream immediately +-- destroyed, when no processing messages in stream and +-- no active transaction. + +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +test_run:switch("test") +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() +space = stream.space.test +for i = 1, 10 do space:replace{i} end +test_run:switch("test") +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +s:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Transaction tests for sql iproto requests. +-- All this functions are copy-paste from sql/ddl.test.lua, +-- except that they check sql transactions in streams +test_run:cmd("setopt delimiter '$'") +function execute_sql_string(stream, sql_string) + if stream then + stream:execute(sql_string) + else + box.execute(sql_string) + end +end$ +function execute_sql_string_and_return_result(stream, sql_string) + if stream then + return pcall(stream.execute, stream, sql_string) + else + return box.execute(sql_string) + end +end$ +function monster_ddl(stream) + local _, err1, err2, err3, err4, err5, err6 + local stream_or_box = stream or box + execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER);]]) + execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER UNIQUE, + CONSTRAINT ck1 + CHECK(b < 100));]]) + + execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);') + execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);') + + execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY + KEY, a INTEGER);]]) + + execute_sql_string(stream, 'DROP INDEX t2a ON t2;') + + execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1 + CHECK(b > 0);]]) + + _, err1 = + execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename + RENAME TO t1;]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT + ck2 CHECK(a > 0);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + + _, err2 = + execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id + INTEGER PRIMARY KEY);]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + + execute_sql_string(stream, [[CREATE TABLE + trigger_catcher(id INTEGER PRIMARY + KEY AUTOINCREMENT);]]) + + execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;') + + execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;') + + execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON + t1 FOR EACH ROW + BEGIN + INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;') + + execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON + t2 FOR EACH ROW + BEGIN + INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err4 = + execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a + ON t1(a, b);]]) + + execute_sql_string(stream, 'TRUNCATE TABLE t1;') + _, err5 = + execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;') + _, err6 = + execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE + t_does_not_exist;]]) + + execute_sql_string(stream, 'DROP TRIGGER t2t;') + + return {'Finished ok, errors in the middle: ', err1, err2, err3, err4, + err5, err6} +end$ +function monster_ddl_cmp_res(res1, res2) + if json.encode(res1) == json.encode(res2) then + return true + end + return res1, res2 +end$ +function monster_ddl_is_clean(stream) + local stream_or_box = stream or box + assert(stream_or_box.space.T1 == nil) + assert(stream_or_box.space.T2 == nil) + assert(stream_or_box.space._trigger:count() == 0) + assert(stream_or_box.space._fk_constraint:count() == 0) + assert(stream_or_box.space._ck_constraint:count() == 0) + assert(stream_or_box.space.T_RENAMED == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) +end$ +function monster_ddl_check(stream) + local _, err1, err2, err3, err4, res + local stream_or_box = stream or box + _, err1 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES (1, 1, 101)]]) + execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)') + _, err2 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES(2, 2, 1)]]) + _, err3 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, 20, 1)]]) + _, err4 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, -1, 1)]]) + execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)') + if not stream then + assert(stream_or_box.space.T_RENAMED ~= nil) + assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) + res = execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + else + _, res = + execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + end + return {'Finished ok, errors and trigger catcher content: ', err1, err2, + err3, err4, res} +end$ +function monster_ddl_clear(stream) + execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;') + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t2') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t1') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed') +end$ +test_run:cmd("setopt delimiter ''")$ + +test_run:cmd("start server test with args='10, true'") +test_run:switch('test') +test_run:cmd("setopt delimiter '$'") +function monster_ddl_is_clean() + if not (box.space.T1 == nil) or + not (box.space.T2 == nil) or + not (box.space._trigger:count() == 0) or + not (box.space._fk_constraint:count() == 0) or + not (box.space._ck_constraint:count() == 0) or + not (box.space.T_RENAMED == nil) or + not (box.space.T_TO_RENAME == nil) then + return false + end + return true +end$ +test_run:cmd("setopt delimiter ''")$ +test_run:switch('default') + +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +conn = net_box.connect(server_addr) +stream = conn:new_stream() + +-- No txn. +true_ddl_res = monster_ddl() +true_ddl_res + +true_check_res = monster_ddl_check() +true_check_res + +monster_ddl_clear() +monster_ddl_is_clean() + +-- Both DDL and cleanup in one txn in stream. +ddl_res = nil +stream:execute('START TRANSACTION') +ddl_res = monster_ddl(stream) +monster_ddl_clear(stream) +stream:call('monster_ddl_is_clean') +stream:execute('COMMIT') +monster_ddl_cmp_res(ddl_res, true_ddl_res) + +-- DDL in txn, cleanup is not. +stream:execute('START TRANSACTION') +ddl_res = monster_ddl(stream) +stream:execute('COMMIT') +monster_ddl_cmp_res(ddl_res, true_ddl_res) + +check_res = monster_ddl_check(stream) +monster_ddl_cmp_res(check_res, true_check_res) + +monster_ddl_clear(stream) +stream:call('monster_ddl_is_clean') + +-- DDL is not in txn, cleanup is. +ddl_res = monster_ddl(stream) +monster_ddl_cmp_res(ddl_res, true_ddl_res) + +check_res = monster_ddl_check(stream) +monster_ddl_cmp_res(check_res, true_check_res) + +stream:execute('START TRANSACTION') +monster_ddl_clear(stream) +stream:call('monster_ddl_is_clean') +stream:execute('COMMIT') + +-- DDL and cleanup in separate txns. +stream:execute('START TRANSACTION') +ddl_res = monster_ddl(stream) +stream:execute('COMMIT') +monster_ddl_cmp_res(ddl_res, true_ddl_res) + +check_res = monster_ddl_check(stream) +monster_ddl_cmp_res(check_res, true_check_res) + +stream:execute('START TRANSACTION') +monster_ddl_clear(stream) +stream:call('monster_ddl_is_clean') +stream:execute('COMMIT') + +test_run:switch("test") +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + + +-- Check for prepare and unprepare functions +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() + +stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)') +-- reload schema +stream:ping() +space = stream.space.TEST +assert(space ~= nil) +stream:execute('START TRANSACTION') +space:replace{1, 2, '3'} +space:select() +-- select is empty, because transaction was not commited +conn.space.TEST:select() +stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") +conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") +assert(stream_pr.stmt_id == conn_pr.stmt_id) +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) +-- empty select, transaction was not commited +conn:execute(conn_pr.stmt_id, {1, 2}) +stream:execute('COMMIT') +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) +-- [ 1, 2, '3' ] +conn:execute(conn_pr.stmt_id, {1, 2}) +stream:unprepare(stream_pr.stmt_id) +conn:close() +test_run:switch('test') +-- [ 1, 2, '3' ] +box.space.TEST:select() +box.space.TEST:drop() +test_run:switch('default') +test_run:cmd("stop server test") + test_run:cmd("cleanup server test") test_run:cmd("delete server test") -- 2.20.1