From mboxrd@z Thu Jan 1 00:00:00 1970
To: tarantool-patches@dev.tarantool.org, vdavydov@tarantool.org, v.shpilevoy@tarantool.org
Cc: mechanik20051988
Date: Thu, 12 Aug 2021 12:50:46 +0300
Message-Id: <01ad18203fb09e1e4f1cb26e9a7db83a4d1e9087.1628759886.git.mechanik20051988@tarantool.org>
X-Mailer: git-send-email 2.20.1
Subject: [Tarantool-patches] [PATCH v4 9/9] net.box: add interactive transaction support in net.box
List-Id: Tarantool development patches
From: mechanik20051988 via Tarantool-patches
Reply-To: mechanik20051988

From: mechanik20051988

Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which allow beginning, committing and rolling back a
transaction, respectively.

Closes #5860

@TarantoolBot document
Title: add interactive transaction support in net.box

Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which allow beginning, committing and rolling back a
transaction, respectively. Now there are multiple ways to begin, commit
and roll back a transaction from `net.box`: using the appropriate stream
methods, using the `call` or `eval` methods, or using the `execute`
method with SQL transaction syntax. Users can mix these methods, for
example, start a transaction using `stream:begin()` and commit it using
`stream:call('box.commit')` or `stream:execute('COMMIT')`.
Simple example of using interactive transactions via iproto from net.box:
```lua
stream = conn:new_stream()
space = stream.space.test
space_not_from_stream = conn.space.test
stream:begin()
space:replace({1})
-- returns the previously inserted tuple, because the request
-- belongs to the transaction.
space:select({})
-- empty select, because this select doesn't belong to
-- the transaction
space_not_from_stream:select({})
stream:call('box.commit')
-- now the transaction is committed, so all requests
-- return the tuple.
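-- For illustration (an assumed continuation of the example above,
-- reusing the same `space` and `space_not_from_stream` handles):
-- after the commit, both selects are expected to see the tuple {1}.
space:select({})
space_not_from_stream:select({})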
```
More examples of using streams can be found in
gh-5860-implement-streams-in-iproto.test.lua
---
 .../gh-5860-implement-streams-in-iproto.md    |   26 +
 src/box/lua/net_box.c                         |   49 +-
 src/box/lua/net_box.lua                       |   35 +-
 ...ox_iproto_transactions_over_streams.result | 3009 +++++++++++++++++
 ..._iproto_transactions_over_streams.test.lua | 1238 +++++++
 test/box/suite.ini                            |    2 +-
 6 files changed, 4356 insertions(+), 3 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
 create mode 100644 test/box/net.box_iproto_transactions_over_streams.result
 create mode 100644 test/box/net.box_iproto_transactions_over_streams.test.lua

diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
new file mode 100644
index 000000000..8a8eec3e7
--- /dev/null
+++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
@@ -0,0 +1,26 @@
+## feature/core
+
+* Streams and interactive transactions over streams are implemented
+  in iproto. A stream is associated with its ID, which is unique within
+  one connection. All requests with the same nonzero stream ID belong to
+  the same stream. All requests in a stream are processed synchronously:
+  the execution of the next request will not start until the previous one
+  is completed. If a request has a zero stream ID, it does not belong to
+  any stream and is processed in the old way.
+  In `net.box`, a stream is an object above a connection that has the same
+  methods, but allows executing requests sequentially. The ID is generated
+  on the client side automatically. If users write their own connector and
+  want to use streams, they must transmit stream_id over the iproto protocol.
+  The main purpose of streams is transactions via iproto. Each stream
+  can start its own transaction, so streams allow multiplexing several
+  transactions over one connection. There are multiple ways to begin,
+  commit and roll back a transaction: using the appropriate stream methods,
+  using `call` or `eval` methods, or using the `execute` method with SQL
+  transaction syntax. Users can mix these methods, for example, start a
+  transaction using `stream:begin()` and commit it using
+  `stream:call('box.commit')` or `stream:execute('COMMIT')`.
+  If any request fails during the transaction, it will not affect the other
+  requests in the transaction. If a disconnect occurs while a stream has an
+  active transaction, this transaction will be rolled back, unless it
+  manages to commit before that moment.
+
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 3bc49af23..229dec590 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -75,7 +75,10 @@ enum netbox_method {
 	NETBOX_MIN = 14,
 	NETBOX_MAX = 15,
 	NETBOX_COUNT = 16,
-	NETBOX_INJECT = 17,
+	NETBOX_BEGIN = 17,
+	NETBOX_COMMIT = 18,
+	NETBOX_ROLLBACK = 19,
+	NETBOX_INJECT = 20,
 	netbox_method_MAX
 };
 
@@ -916,6 +919,44 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
 	netbox_encode_prepare(L, idx, stream, sync, stream_id);
 }
 
+static inline void
+netbox_encode_txn(lua_State *L, enum iproto_type type, int idx,
+		  struct mpstream *stream, uint64_t sync,
+		  uint64_t stream_id)
+{
+	(void)L;
+	(void) idx;
+	assert(type == IPROTO_BEGIN ||
+	       type == IPROTO_COMMIT ||
+	       type == IPROTO_ROLLBACK);
+	size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
+	netbox_end_encode(stream, svp);
+}
+
+static void
+netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
+		    uint64_t sync, uint64_t stream_id)
+{
+	return netbox_encode_txn(L, IPROTO_BEGIN, idx, stream,
+				 sync, stream_id);
+}
+
+static void
+netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
+		     uint64_t sync, uint64_t stream_id)
+{
+	return netbox_encode_txn(L, IPROTO_COMMIT, idx, stream,
+				 sync, stream_id);
+}
+
+static void
+netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
+		       uint64_t sync, uint64_t stream_id)
+{
+	return netbox_encode_txn(L, IPROTO_ROLLBACK, idx, stream,
+				 sync, stream_id);
+}
+
 static void
 netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
 		     uint64_t sync, uint64_t stream_id)
@@ -959,6 +1000,9 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
 		[NETBOX_MIN] = netbox_encode_select,
 		[NETBOX_MAX] = netbox_encode_select,
 		[NETBOX_COUNT] = netbox_encode_call,
+		[NETBOX_BEGIN] = netbox_encode_begin,
+		[NETBOX_COMMIT] = netbox_encode_commit,
+		[NETBOX_ROLLBACK] = netbox_encode_rollback,
 		[NETBOX_INJECT] = netbox_encode_inject,
 	};
 	struct mpstream stream;
@@ -1330,6 +1374,9 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method,
 		[NETBOX_MIN] = netbox_decode_tuple,
 		[NETBOX_MAX] = netbox_decode_tuple,
 		[NETBOX_COUNT] = netbox_decode_value,
+		[NETBOX_BEGIN] = netbox_decode_nil,
+		[NETBOX_COMMIT] = netbox_decode_nil,
+		[NETBOX_ROLLBACK] = netbox_decode_nil,
 		[NETBOX_INJECT] = netbox_decode_table,
 	};
 	method_decoder[method](L, data, data_end, format);
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 8d707fb26..f203b203e 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -51,8 +51,11 @@ local M_GET = 13
 local M_MIN = 14
 local M_MAX = 15
 local M_COUNT = 16
+local M_BEGIN = 17
+local M_COMMIT = 18
+local M_ROLLBACK = 19
 -- Injects raw data into connection. Used by console and tests.
-local M_INJECT = 17
+local M_INJECT = 20
 
 -- utility tables
 local is_final_state = {closed = 1, error = 1}
@@ -754,11 +757,38 @@ local function stream_new_stream(stream)
     return stream._conn:new_stream()
 end
 
+local function stream_begin(stream, opts)
+    check_remote_arg(stream, 'begin')
+    local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
+    if opts and opts.is_async then
+        return res
+    end
+end
+
+local function stream_commit(stream, opts)
+    check_remote_arg(stream, 'commit')
+    local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id)
+    if opts and opts.is_async then
+        return res
+    end
+end
+
+local function stream_rollback(stream, opts)
+    check_remote_arg(stream, 'rollback')
+    local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id)
+    if opts and opts.is_async then
+        return res
+    end
+end
+
 function remote_methods:new_stream()
     check_remote_arg(self, 'new_stream')
     self._last_stream_id = self._last_stream_id + 1
     local stream = setmetatable({
         new_stream = stream_new_stream,
+        begin = stream_begin,
+        commit = stream_commit,
+        rollback = stream_rollback,
         _stream_id = self._last_stream_id,
         space = setmetatable({
             _stream_space_cache = {},
@@ -1243,6 +1273,9 @@ local this_module = {
         min = M_MIN,
         max = M_MAX,
         count = M_COUNT,
+        begin = M_BEGIN,
+        commit = M_COMMIT,
+        rollback = M_ROLLBACK,
         inject = M_INJECT,
     }
 }
diff --git a/test/box/net.box_iproto_transactions_over_streams.result b/test/box/net.box_iproto_transactions_over_streams.result
new file mode 100644
index 000000000..c2167e760
--- /dev/null
+++ b/test/box/net.box_iproto_transactions_over_streams.result
@@ -0,0 +1,3009 @@
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+---
+...
+json = require('json')
+---
+...
+fiber = require('fiber')
+---
+...
+msgpack = require('msgpack')
+---
+...
+test_run = require('test_run').new()
+---
+...
+test_run:cmd("create server test with script='box/iproto_streams.lua'")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'") +--- +- true +... +function get_current_connection_count() + local total_net_stat_table = + test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table) + local connection_stat_table = total_net_stat_table.CONNECTIONS + assert(connection_stat_table) + return connection_stat_table.current +end; +--- +... +function wait_and_return_results(futures) + local results = {} + for name, future in pairs(futures) do + local err + results[name], err = future:wait_result() + if err then + results[name] = err + end + end + return results +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +test_run:cmd("start server test with args='1'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +-- Simple checks for transactions +conn_1 = net_box.connect(server_addr) +--- +... +conn_2 = net_box.connect(server_addr) +--- +... +stream_1_1 = conn_1:new_stream() +--- +... +stream_1_2 = conn_1:new_stream() +--- +... +stream_2 = conn_2:new_stream() +--- +... +-- It's ok to commit or rollback without any active transaction +stream_1_1:commit() +--- +... +stream_1_1:rollback() +--- +... +stream_1_1:begin() +--- +... +-- Error unable to start second transaction in one stream +stream_1_1:begin() +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +-- It's ok to start transaction in separate stream in one connection +stream_1_2:begin() +--- +... +-- It's ok to start transaction in separate stream in other connection +stream_2:begin() +--- +... +test_run:switch("test") +--- +- true +... +-- It's ok to start local transaction separately with active stream +-- transactions +box.begin() +--- +... +box.commit() +--- +... +test_run:switch("default") +--- +- true +... +stream_1_1:commit() +--- +... +stream_1_2:commit() +--- +... +stream_2:commit() +--- +... 
+-- Check unsupported requests +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +-- Begin, commit and rollback supported only for streams +conn:_request(net_box._method.begin, nil, nil, nil) +--- +- error: Unable to process BEGIN request out of stream +... +conn:_request(net_box._method.commit, nil, nil, nil) +--- +- error: Unable to process COMMIT request out of stream +... +conn:_request(net_box._method.rollback, nil, nil, nil) +--- +- error: Unable to process ROLLBACK request out of stream +... +-- Not all requests supported by stream. +stream = conn:new_stream() +--- +... +-- Start transaction to allocate stream object on the +-- server side +stream:begin() +--- +... +IPROTO_REQUEST_TYPE = 0x00 +--- +... +IPROTO_SYNC = 0x01 +--- +... +IPROTO_AUTH = 7 +--- +... +IPROTO_STREAM_ID = 0x0a +--- +... +next_request_id = 9 +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +header = msgpack.encode({ + [IPROTO_REQUEST_TYPE] = IPROTO_AUTH, + [IPROTO_SYNC] = next_request_id, + [IPROTO_STREAM_ID] = 1, +}); +--- +... +body = msgpack.encode({nil}); +--- +... +size = msgpack.encode(header:len() + body:len()); +--- +... +conn._transport.perform_request(nil, nil, false, net_box._method.inject, + nil, nil, nil, nil, + size .. header .. body); +--- +- null +- Unable to process AUTH request in stream +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +conn:close() +--- +... +test_run:cmd("stop server test") +--- +- true +... +-- Second argument (false is a value for memtx_use_mvcc_engine option) +-- Server start without active transaction manager, so all transaction +-- fails because of yeild! +test_run:cmd("start server test with args='10, false'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s = box.schema.space.create('test', { engine = 'memtx' }) +--- +... +_ = s:create_index('primary') +--- +... 
+test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream = conn:new_stream() +--- +... +space = stream.space.test +--- +... +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = false +stream:begin() +--- +... +test_run:switch('test') +--- +- true +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1) +--- +- true +... +test_run:switch('default') +--- +- true +... +space:replace({1}) +--- +- [1] +... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space:select{} +--- +- [] +... +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:commit() +--- +- error: Transaction has been aborted by a fiber yield +... +-- Select is empty, transaction was aborted +space:select{} +--- +- [] +... +-- Check that after failed transaction commit we able to start next +-- transaction (it's strange check, but it's necessary because it was +-- bug with it) +stream:begin() +--- +... +stream:ping() +--- +- true +... +stream:commit() +--- +... +-- Same checks for `call` end `eval` functions. +stream:call('box.begin') +--- +... +stream:call('s:replace', {{1}}) +--- +- [1] +... +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +--- +- [] +... +stream:call('s:select', {}) +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... 
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:eval('box.commit()') +--- +- error: Transaction has been aborted by a fiber yield +... +-- Select is empty, transaction was aborted +space:select{} +--- +- [] +... +-- Same checks for `execute` function which can also +-- begin and commit transaction. +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +stream:call('s:replace', {{1}}) +--- +- [1] +... +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +--- +- [] +... +stream:call('s:select', {}) +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:execute('COMMIT') +--- +- error: Transaction has been aborted by a fiber yield +... +-- Select is empty, transaction was aborted +space:select{} +--- +- [] +... +test_run:switch('test') +--- +- true +... +s:drop() +--- +... +-- Check that there are no streams and messages, which +-- was not deleted +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +stream:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Next we check transactions only for memtx with +-- memtx_use_mvcc_engine = true and for vinyl, because +-- if memtx_use_mvcc_engine = false all transactions fails, +-- as we can see before! +-- Second argument (true is a value for memtx_use_mvcc_engine option) +-- Same test case as previous but server start with active transaction +-- manager. Also check vinyl, because it's behaviour is same. 
+test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s1:create_index('primary') +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +-- Spaces getting from connection, not from stream has no stream_id +-- and not belongs to stream +space_1_no_stream = conn.space.test_1 +--- +... +space_2_no_stream = conn.space.test_2 +--- +... +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = true and to vinyl: +-- behaviour is same! +stream_1:begin() +--- +... +space_1:replace({1}) +--- +- [1] +... +stream_2:begin() +--- +... +space_2:replace({1}) +--- +- [1] +... +test_run:switch('test') +--- +- true +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2) +--- +- true +... +test_run:switch('default') +--- +- true +... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_1_no_stream:select{} +--- +- [] +... +space_2_no_stream:select{} +--- +- [] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +--- +- - [1] +... +space_2:select({}) +--- +- - [1] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... 
+-- Commit was successful, transaction can yeild with +-- memtx_use_mvcc_engine = true. Vinyl transactions +-- can yeild also. +stream_1:commit() +--- +... +stream_2:commit() +--- +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_1:select{} +--- +- - [1] +... +space_2:select{} +--- +- - [1] +... +test_run:switch("test") +--- +- true +... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() +--- +- - [1] +... +s2:select() +--- +- - [1] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1_1 = stream_1.space.test_1 +--- +... +space_1_2 = stream_2.space.test_1 +--- +... +space_2_1 = stream_1.space.test_2 +--- +... 
+space_2_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +-- Simple read/write conflict. +space_1_1:select({1}) +--- +- [] +... +space_1_2:select({1}) +--- +- [] +... +space_1_1:replace({1, 1}) +--- +- [1, 1] +... +space_1_2:replace({1, 2}) +--- +- [1, 2] +... +stream_1:commit() +--- +... +-- This transaction fails, because of conflict +stream_2:commit() +--- +- error: Transaction has been aborted by conflict +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +-- Here we must accept [1, 1] +space_1_1:select({}) +--- +- - [1, 1] +... +space_1_2:select({}) +--- +- - [1, 1] +... +-- Same test for vinyl sapce +stream_1:begin() +--- +... +stream_2:begin() +--- +... +space_2_1:select({1}) +--- +- [] +... +space_2_2:select({1}) +--- +- [] +... +space_2_1:replace({1, 1}) +--- +- [1, 1] +... +space_2_2:replace({1, 2}) +--- +- [1, 2] +... +stream_1:commit() +--- +... +-- This transaction fails, because of conflict +stream_2:commit() +--- +- error: Transaction has been aborted by conflict +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +-- Here we must accept [1, 1] +space_2_1:select({}) +--- +- - [1, 1] +... +space_2_2:select({}) +--- +- - [1, 1] +... +test_run:switch('test') +--- +- true +... +-- Both select return tuple [1, 1], transaction commited +s1:select() +--- +- - [1, 1] +... 
+s2:select() +--- +- - [1, 1] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check rollback as a command for memtx and vinyl spaces +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +-- Test rollback for memtx space +space_1:replace({1}) +--- +- [1] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +--- +- - [1] +... +stream_1:rollback() +--- +... +-- Select is empty, transaction rollback +space_1:select({}) +--- +- [] +... +-- Test rollback for vinyl space +space_2:replace({1}) +--- +- [1] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_2:select({}) +--- +- - [1] +... +stream_2:rollback() +--- +... +-- Select is empty, transaction rollback +space_2:select({}) +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after rollback +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... 
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +-- This is simple test is necessary because i have a bug +-- with halting stream after rollback +stream_1:begin() +--- +... +stream_1:commit() +--- +... +stream_2:begin() +--- +... +stream_2:commit() +--- +... +conn:close() +--- +... +test_run:switch('test') +--- +- true +... +-- Both select are empty, because transaction rollback +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check rollback on disconnect +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +space_1:replace({1}) +--- +- [1] +... +space_1:replace({2}) +--- +- [2] +... +-- Select return two previously inserted tuples +space_1:select({}) +--- +- - [1] + - [2] +... +space_2:replace({1}) +--- +- [1] +... +space_2:replace({2}) +--- +- [2] +... +-- Select return two previously inserted tuples +space_2:select({}) +--- +- - [1] + - [2] +... +conn:close() +--- +... +test_run:switch("test") +--- +- true +... 
+-- Empty selects, transaction was rollback +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch("default") +--- +- true +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +-- Reconnect +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +--- +... +stream_2:begin() +--- +... +-- Two empty selects +space_1:select({}) +--- +- [] +... +space_2:select({}) +--- +- [] +... +stream_1:commit() +--- +... +stream_2:commit() +--- +... +test_run:switch('test') +--- +- true +... +-- Both select are empty, because transaction rollback +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check rollback on disconnect with big count of async requests +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... 
+test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +space_1:replace({1}) +--- +- [1] +... +space_1:replace({2}) +--- +- [2] +... +-- Select return two previously inserted tuples +space_1:select({}) +--- +- - [1] + - [2] +... +space_2:replace({1}) +--- +- [1] +... +space_2:replace({2}) +--- +- [2] +... +-- Select return two previously inserted tuples +space_2:select({}) +--- +- - [1] + - [2] +... +-- We send a large number of asynchronous requests, +-- their result is not important to us, it is important +-- that they will be in the stream queue at the time of +-- the disconnect. +test_run:cmd("setopt delimiter ';'") +--- +- true +... +for i = 1, 1000 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +fiber.sleep(0) +--- +... +conn:close() +--- +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +--- +- true +... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +--- +- true +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Select was empty, transaction rollbacked +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +test_run:switch("default") +--- +- true +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... 
+-- Same test, but now we check that if `commit` was received +-- by server before connection closed, we processed it successful. +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +-- Here, for a large number of messages, we cannot guarantee their processing, +-- since if the net_msg_max limit is reached, we will stop processing incoming +-- requests, and after close, we will discard all raw data. '100' is the number +-- of messages that we can process without reaching net_msg_max. We will not try +-- any more, so as not to make a test flaky. +for i = 1, 100 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +_ = stream_1:commit({is_async = true}) +--- +... +_ = stream_2:commit({is_async = true}) +--- +... +fiber.sleep(0) +--- +... +conn:close() +--- +... +test_run:switch("test") +--- +- true +... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +--- +- true +... +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +--- +- true +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Select return tuples from [1] to [100], +-- transaction was commit +rc1 = s1:select() +--- +... +rc2 = s2:select() +--- +... +assert(#rc1) +--- +- 100 +... +assert(#rc2) +--- +- 100 +... +s1:truncate() +--- +... +s2:truncate() +--- +... +test_run:switch("default") +--- +- true +... 
+test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +-- Reconnect +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +--- +... +stream_2:begin() +--- +... +-- Two empty selects +space_1:select({}) +--- +- [] +... +space_2:select({}) +--- +- [] +... +stream_1:commit() +--- +... +stream_2:commit() +--- +... +test_run:switch('test') +--- +- true +... +-- Both select are empty, because transaction rollback +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +s1:drop() +--- +... +s2:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check that all requests between `begin` and `commit` +-- have correct lsn and tsn values. During my work on the +-- patch, i see that all requests in stream comes with +-- header->is_commit == true, so if we are in transaction +-- in stream we should set this value to false, otherwise +-- during recovering `wal_stream_apply_dml_row` fails, because +-- of LSN/TSN mismatch. Here is a special test case for it. +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'memtx' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... 
+stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +space_2 = stream_2.space.test_2 +--- +... +stream_1:begin() +--- +... +stream_2:begin() +--- +... +space_1:replace({1}) +--- +- [1] +... +space_1:replace({2}) +--- +- [2] +... +space_2:replace({1}) +--- +- [1] +... +space_2:replace({2}) +--- +- [2] +... +stream_1:commit() +--- +... +stream_2:commit() +--- +... +test_run:switch('test') +--- +- true +... +-- Here we get two tuples, commit was successful +s1:select{} +--- +- - [1] + - [2] +... +-- Here we get two tuples, commit was successful +s2:select{} +--- +- - [1] + - [2] +... +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +test_run:cmd("start server test with args='1, true'") +--- +- true +... +test_run:switch('test') +--- +- true +... +-- Here we get two tuples, commit was successful +box.space.test_1:select{} +--- +- - [1] + - [2] +... +-- Here we get two tuples, commit was successful +box.space.test_2:select{} +--- +- - [1] + - [2] +... +box.space.test_1:drop() +--- +... +box.space.test_2:drop() +--- +... +test_run:switch('default') +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Same transactions checks for async mode +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... 
+_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream_1 = conn:new_stream() +--- +... +space_1 = stream_1.space.test_1 +--- +... +stream_2 = conn:new_stream() +--- +... +space_2 = stream_2.space.test_2 +--- +... +memtx_futures = {} +--- +... +memtx_futures["begin"] = stream_1:begin({is_async = true}) +--- +... +memtx_futures["replace"] = space_1:replace({1}, {is_async = true}) +--- +... +memtx_futures["insert"] = space_1:insert({2}, {is_async = true}) +--- +... +memtx_futures["select"] = space_1:select({}, {is_async = true}) +--- +... +vinyl_futures = {} +--- +... +vinyl_futures["begin"] = stream_2:begin({is_async = true}) +--- +... +vinyl_futures["replace"] = space_2:replace({1}, {is_async = true}) +--- +... +vinyl_futures["insert"] = space_2:insert({2}, {is_async = true}) +--- +... +vinyl_futures["select"] = space_2:select({}, {is_async = true}) +--- +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s1:select() +--- +- [] +... +s2:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +memtx_futures["commit"] = stream_1:commit({is_async = true}) +--- +... +vinyl_futures["commit"] = stream_2:commit({is_async = true}) +--- +... +memtx_results = wait_and_return_results(memtx_futures) +--- +... +vinyl_results = wait_and_return_results(vinyl_futures) +--- +... +-- If begin was successful it return nil +assert(not memtx_results["begin"]) +--- +- true +... +assert(not vinyl_results["begin"]) +--- +- true +... +-- [1] +assert(memtx_results["replace"]) +--- +- [1] +... +assert(vinyl_results["replace"]) +--- +- [1] +... +-- [2] +assert(memtx_results["insert"]) +--- +- [2] +... +assert(vinyl_results["insert"]) +--- +- [2] +... 
+-- [1] [2] +assert(memtx_results["select"]) +--- +- - [1] + - [2] +... +assert(vinyl_results["select"]) +--- +- - [1] + - [2] +... +-- If commit was successful it return nil +assert(not memtx_results["commit"]) +--- +- true +... +assert(not vinyl_results["commit"]) +--- +- true +... +test_run:switch("test") +--- +- true +... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() +--- +- - [1] + - [2] +... +s2:select() +--- +- - [1] + - [2] +... +s1:drop() +--- +... +s2:drop() +--- +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +--- +... +_ = s1:create_index('primary') +--- +... +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +--- +... +_ = s2:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +stream_1 = conn:new_stream() +--- +... +stream_2 = conn:new_stream() +--- +... +space_1_1 = stream_1.space.test_1 +--- +... +space_1_2 = stream_2.space.test_1 +--- +... +space_2_1 = stream_1.space.test_2 +--- +... +space_2_2 = stream_2.space.test_2 +--- +... +futures_1 = {} +--- +... +-- Simple read/write conflict. +futures_1["begin_1"] = stream_1:begin({is_async = true}) +--- +... +futures_1["begin_2"] = stream_2:begin({is_async = true}) +--- +... 
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true}) +--- +... +futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true}) +--- +... +futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true}) +--- +... +futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true}) +--- +... +futures_1["commit_1"] = stream_1:commit({is_async = true}) +--- +... +futures_1["commit_2"] = stream_2:commit({is_async = true}) +--- +... +futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true}) +--- +... +futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true}) +--- +... +results_1 = wait_and_return_results(futures_1) +--- +... +-- Successful begin return nil +assert(not results_1["begin_1"]) +--- +- true +... +assert(not results_1["begin_2"]) +--- +- true +... +-- [] +assert(not results_1["select_1_1"][1]) +--- +- true +... +assert(not results_1["select_1_2"][1]) +--- +- true +... +-- [1] +assert(results_1["replace_1_1"][1]) +--- +- 1 +... +-- [1] +assert(results_1["replace_1_1"][2]) +--- +- 1 +... +-- [1] +assert(results_1["replace_1_2"][1]) +--- +- 1 +... +-- [2] +assert(results_1["replace_1_2"][2]) +--- +- 2 +... +-- Successful commit return nil +assert(not results_1["commit_1"]) +--- +- true +... +-- Error because of transaction conflict +assert(results_1["commit_2"]) +--- +- Transaction has been aborted by conflict +... +-- [1, 1] +assert(results_1["select_1_1_A"][1]) +--- +- [1, 1] +... +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_1_2_A"][1] +futures_2 = {} +--- +... +-- Simple read/write conflict. +futures_2["begin_1"] = stream_1:begin({is_async = true}) +--- +... +futures_2["begin_2"] = stream_2:begin({is_async = true}) +--- +... +futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true}) +--- +... +futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true}) +--- +... 
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true}) +--- +... +futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true}) +--- +... +futures_2["commit_1"] = stream_1:commit({is_async = true}) +--- +... +futures_2["commit_2"] = stream_2:commit({is_async = true}) +--- +... +futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true}) +--- +... +futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true}) +--- +... +results_2 = wait_and_return_results(futures_2) +--- +... +-- Successful begin return nil +assert(not results_2["begin_1"]) +--- +- true +... +assert(not results_2["begin_2"]) +--- +- true +... +-- [] +assert(not results_2["select_2_1"][1]) +--- +- true +... +assert(not results_2["select_2_2"][1]) +--- +- true +... +-- [1] +assert(results_2["replace_2_1"][1]) +--- +- 1 +... +-- [1] +assert(results_2["replace_2_1"][2]) +--- +- 1 +... +-- [1] +assert(results_2["replace_2_2"][1]) +--- +- 1 +... +-- [2] +assert(results_2["replace_2_2"][2]) +--- +- 2 +... +-- Successful commit return nil +assert(not results_2["commit_1"]) +--- +- true +... +-- Error because of transaction conflict +assert(results_2["commit_2"]) +--- +- Transaction has been aborted by conflict +... +-- [1, 1] +assert(results_2["select_2_1_A"][1]) +--- +- [1, 1] +... +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_2_2_A"][1] +test_run:switch('test') +--- +- true +... +-- Both select return tuple [1, 1], transaction commited +s1:select() +--- +- - [1, 1] +... +s2:select() +--- +- - [1, 1] +... +s1:drop() +--- +... +s2:drop() +--- +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... 
+test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Checks for iproto call/eval/execute in stream +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s = box.schema.space.create('test', { engine = 'memtx' }) +--- +... +_ = s:create_index('primary') +--- +... +function ping() return "pong" end +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream = conn:new_stream() +--- +... +space = stream.space.test +--- +... +space_no_stream = conn.space.test +--- +... +-- successful begin using stream:call +stream:call('box.begin') +--- +... +-- error: Operation is not permitted when there is an active transaction +stream:eval('box.begin()') +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +-- error: Operation is not permitted when there is an active transaction +stream:begin() +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +-- error: Operation is not permitted when there is an active transaction +stream:execute('START TRANSACTION') +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +stream:call('ping') +--- +- pong +... +stream:eval('ping()') +--- +... +-- error: Operation is not permitted when there is an active transaction +stream:call('box.begin') +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +stream:eval('box.begin()') +--- +- error: 'Operation is not permitted when there is an active transaction ' +... +-- successful commit using stream:call +stream:call('box.commit') +--- +... +-- successful begin using stream:eval +stream:eval('box.begin()') +--- +... 
+space:replace({1}) +--- +- [1] +... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} +--- +- [] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +--- +- - [1] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +--Successful commit using stream:execute +stream:execute('COMMIT') +--- +- row_count: 0 +... +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_no_stream:select{} +--- +- - [1] +... +test_run:switch("test") +--- +- true +... +-- Select return tuple, because transaction was successful +s:select() +--- +- - [1] +... +s:delete{1} +--- +- [1] +... +test_run:switch('default') +--- +- true +... +-- Check rollback using stream:call +stream:begin() +--- +... +space:replace({2}) +--- +- [2] +... +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} +--- +- [] +... +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +--- +- - [2] +... +test_run:switch("test") +--- +- true +... +-- Select is empty, transaction was not commited +s:select() +--- +- [] +... +test_run:switch('default') +--- +- true +... +--Successful rollback using stream:call +stream:call('box.rollback') +--- +... +-- Empty selects transaction rollbacked +space:select({}) +--- +- [] +... +space_no_stream:select{} +--- +- [] +... +test_run:switch("test") +--- +- true +... +-- Empty select transaction rollbacked +s:select() +--- +- [] +... +s:drop() +--- +... +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... 
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Simple test which demostrates that stream immediately +-- destroyed, when no processing messages in stream and +-- no active transaction. +test_run:cmd("start server test with args='10, true'") +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +test_run:switch("test") +--- +- true +... +s = box.schema.space.create('test', { engine = 'memtx' }) +--- +... +_ = s:create_index('primary') +--- +... +test_run:switch('default') +--- +- true +... +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream = conn:new_stream() +--- +... +space = stream.space.test +--- +... +for i = 1, 10 do space:replace{i} end +--- +... +test_run:switch("test") +--- +- true +... +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +s:drop() +--- +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +-- Transaction tests for sql iproto requests. +-- All this functions are copy-paste from sql/ddl.test.lua, +-- except that they check sql transactions in streams +test_run:cmd("setopt delimiter '$'") +--- +- true +... +function execute_sql_string(stream, sql_string) + if stream then + stream:execute(sql_string) + else + box.execute(sql_string) + end +end$ +--- +... 
+function execute_sql_string_and_return_result(stream, sql_string) + if stream then + return pcall(stream.execute, stream, sql_string) + else + return box.execute(sql_string) + end +end$ +--- +... +function monster_ddl(stream) + local _, err1, err2, err3, err4, err5, err6 + local stream_or_box = stream or box + execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER);]]) + execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY, + a INTEGER, + b INTEGER UNIQUE, + CONSTRAINT ck1 + CHECK(b < 100));]]) + + execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);') + execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);') + + execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY + KEY, a INTEGER);]]) + + execute_sql_string(stream, 'DROP INDEX t2a ON t2;') + + execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1 + CHECK(b > 0);]]) + + _, err1 = + execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename + RENAME TO t1;]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT + ck2 CHECK(a > 0);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;') + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + + _, err2 = + execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id + INTEGER PRIMARY KEY);]]) + + execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY + (a) REFERENCES t2(b);]]) + + execute_sql_string(stream, [[CREATE TABLE + trigger_catcher(id INTEGER PRIMARY + KEY AUTOINCREMENT);]]) + + execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;') + + execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;') + + execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON + t1 FOR EACH ROW + BEGIN 
+ INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;') + + execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON + t2 FOR EACH ROW + BEGIN + INSERT INTO trigger_catcher VALUES(1); + END; ]]) + + _, err4 = + execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a + ON t1(a, b);]]) + + execute_sql_string(stream, 'TRUNCATE TABLE t1;') + _, err5 = + execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;') + _, err6 = + execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE + t_does_not_exist;]]) + + execute_sql_string(stream, 'DROP TRIGGER t2t;') + + return {'Finished ok, errors in the middle: ', err1, err2, err3, err4, + err5, err6} +end$ +--- +... +function monster_ddl_cmp_res(res1, res2) + if json.encode(res1) == json.encode(res2) then + return true + end + return res1, res2 +end$ +--- +... +function monster_ddl_is_clean(stream) + local stream_or_box = stream or box + assert(stream_or_box.space.T1 == nil) + assert(stream_or_box.space.T2 == nil) + assert(stream_or_box.space._trigger:count() == 0) + assert(stream_or_box.space._fk_constraint:count() == 0) + assert(stream_or_box.space._ck_constraint:count() == 0) + assert(stream_or_box.space.T_RENAMED == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) +end$ +--- +... 
+function monster_ddl_check(stream) + local _, err1, err2, err3, err4, res + local stream_or_box = stream or box + _, err1 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES (1, 1, 101)]]) + execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)') + _, err2 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t2 + VALUES(2, 2, 1)]]) + _, err3 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, 20, 1)]]) + _, err4 = + execute_sql_string_and_return_result(stream, [[INSERT INTO t1 + VALUES(1, -1, 1)]]) + execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)') + if not stream then + assert(stream_or_box.space.T_RENAMED ~= nil) + assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil) + assert(stream_or_box.space.T_TO_RENAME == nil) + res = execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + else + _, res = + execute_sql_string_and_return_result(stream, [[SELECT * FROM + trigger_catcher]]) + end + return {'Finished ok, errors and trigger catcher content: ', err1, err2, + err3, err4, res} +end$ +--- +... +function monster_ddl_clear(stream) + execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;') + execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t2') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t1') + execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed') +end$ +--- +... +test_run:cmd("setopt delimiter ''")$ +--- +- true +... +test_run:cmd("start server test with args='10, true'") +--- +- true +... +test_run:switch('test') +--- +- true +... +test_run:cmd("setopt delimiter '$'") +--- +- true +... 
+function monster_ddl_is_clean() + if not (box.space.T1 == nil) or + not (box.space.T2 == nil) or + not (box.space._trigger:count() == 0) or + not (box.space._fk_constraint:count() == 0) or + not (box.space._ck_constraint:count() == 0) or + not (box.space.T_RENAMED == nil) or + not (box.space.T_TO_RENAME == nil) then + return false + end + return true +end$ +--- +... +test_run:cmd("setopt delimiter ''")$ +--- +- true +... +test_run:switch('default') +--- +- true +... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +--- +... +conn = net_box.connect(server_addr) +--- +... +stream = conn:new_stream() +--- +... +-- No txn. +true_ddl_res = monster_ddl() +--- +... +true_ddl_res +--- +- - 'Finished ok, errors in the middle: ' + - Space 'T1' already exists + - Space 'T1' already exists + - Space 'T3' does not exist + - Index 'T1A' already exists in space 'T1' + - 'Failed to execute SQL statement: can not truncate space ''T2'' because other + objects depend on it' + - Space 'T_DOES_NOT_EXIST' does not exist +... +true_check_res = monster_ddl_check() +--- +... +true_check_res +--- +- - 'Finished ok, errors and trigger catcher content: ' + - 'Check constraint failed ''CK1'': b < 100' + - Duplicate key exists in unique index "unique_unnamed_T2_2" in space "T2" with + old tuple - [1, 1, 1] and new tuple - [2, 2, 1] + - 'Failed to execute SQL statement: FOREIGN KEY constraint failed' + - 'Check constraint failed ''CK2'': a > 0' + - metadata: + - name: ID + type: integer + rows: + - [1] +... +monster_ddl_clear() +--- +... +monster_ddl_is_clean() +--- +... +-- Both DDL and cleanup in one txn in stream. +ddl_res = nil +--- +... +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +ddl_res = monster_ddl(stream) +--- +... +monster_ddl_clear(stream) +--- +... +stream:call('monster_ddl_is_clean') +--- +- true +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +monster_ddl_cmp_res(ddl_res, true_ddl_res) +--- +- true +... 
+-- DDL in txn, cleanup is not. +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +ddl_res = monster_ddl(stream) +--- +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +monster_ddl_cmp_res(ddl_res, true_ddl_res) +--- +- true +... +check_res = monster_ddl_check(stream) +--- +... +monster_ddl_cmp_res(check_res, true_check_res) +--- +- true +... +monster_ddl_clear(stream) +--- +... +stream:call('monster_ddl_is_clean') +--- +- true +... +-- DDL is not in txn, cleanup is. +ddl_res = monster_ddl(stream) +--- +... +monster_ddl_cmp_res(ddl_res, true_ddl_res) +--- +- true +... +check_res = monster_ddl_check(stream) +--- +... +monster_ddl_cmp_res(check_res, true_check_res) +--- +- true +... +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +monster_ddl_clear(stream) +--- +... +stream:call('monster_ddl_is_clean') +--- +- true +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +-- DDL and cleanup in separate txns. +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +ddl_res = monster_ddl(stream) +--- +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +monster_ddl_cmp_res(ddl_res, true_ddl_res) +--- +- true +... +check_res = monster_ddl_check(stream) +--- +... +monster_ddl_cmp_res(check_res, true_check_res) +--- +- true +... +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +monster_ddl_clear(stream) +--- +... +stream:call('monster_ddl_is_clean') +--- +- true +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +test_run:switch("test") +--- +- true +... +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection +--- +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +--- +- true +... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +--- +- true +... +test_run:switch('default') +--- +- true +... +conn:close() +--- +... 
+test_run:wait_cond(function () return get_current_connection_count() == 0 end) +--- +- true +... +-- Check for prepare and unprepare functions +conn = net_box.connect(server_addr) +--- +... +assert(conn:ping()) +--- +- true +... +stream = conn:new_stream() +--- +... +stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)') +--- +- row_count: 1 +... +-- reload schema +stream:ping() +--- +- true +... +space = stream.space.TEST +--- +... +assert(space ~= nil) +--- +- true +... +stream:execute('START TRANSACTION') +--- +- row_count: 0 +... +space:replace{1, 2, '3'} +--- +- [1, 2, '3'] +... +space:select() +--- +- - [1, 2, '3'] +... +-- select is empty, because transaction was not commited +conn.space.TEST:select() +--- +- [] +... +stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") +--- +... +conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;") +--- +... +assert(stream_pr.stmt_id == conn_pr.stmt_id) +--- +- true +... +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) +--- +- metadata: + - name: ID + type: integer + - name: A + type: number + - name: B + type: string + rows: + - [1, 2, '3'] +... +-- empty select, transaction was not commited +conn:execute(conn_pr.stmt_id, {1, 2}) +--- +- metadata: + - name: ID + type: integer + - name: A + type: number + - name: B + type: string + rows: [] +... +stream:execute('COMMIT') +--- +- row_count: 0 +... +-- [ 1, 2, '3' ] +stream:execute(stream_pr.stmt_id, {1, 2}) +--- +- metadata: + - name: ID + type: integer + - name: A + type: number + - name: B + type: string + rows: + - [1, 2, '3'] +... +-- [ 1, 2, '3' ] +conn:execute(conn_pr.stmt_id, {1, 2}) +--- +- metadata: + - name: ID + type: integer + - name: A + type: number + - name: B + type: string + rows: + - [1, 2, '3'] +... +stream:unprepare(stream_pr.stmt_id) +--- +- null +... +conn:close() +--- +... +test_run:switch('test') +--- +- true +... +-- [ 1, 2, '3' ] +box.space.TEST:select() +--- +- - [1, 2, '3'] +... 
+box.space.TEST:drop() +--- +... +test_run:switch('default') +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +test_run:cmd("cleanup server test") +--- +- true +... +test_run:cmd("delete server test") +--- +- true +... diff --git a/test/box/net.box_iproto_transactions_over_streams.test.lua b/test/box/net.box_iproto_transactions_over_streams.test.lua new file mode 100644 index 000000000..094c451a9 --- /dev/null +++ b/test/box/net.box_iproto_transactions_over_streams.test.lua @@ -0,0 +1,1238 @@ +-- This test checks streams iplementation in iproto (gh-5860). +net_box = require('net.box') +json = require('json') +fiber = require('fiber') +msgpack = require('msgpack') +test_run = require('test_run').new() + +test_run:cmd("create server test with script='box/iproto_streams.lua'") + +test_run:cmd("setopt delimiter ';'") +function get_current_connection_count() + local total_net_stat_table = + test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table) + local connection_stat_table = total_net_stat_table.CONNECTIONS + assert(connection_stat_table) + return connection_stat_table.current +end; +function wait_and_return_results(futures) + local results = {} + for name, future in pairs(futures) do + local err + results[name], err = future:wait_result() + if err then + results[name] = err + end + end + return results +end; +test_run:cmd("setopt delimiter ''"); + +test_run:cmd("start server test with args='1'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +-- Simple checks for transactions +conn_1 = net_box.connect(server_addr) +conn_2 = net_box.connect(server_addr) +stream_1_1 = conn_1:new_stream() +stream_1_2 = conn_1:new_stream() +stream_2 = conn_2:new_stream() +-- It's ok to commit or rollback without any active transaction +stream_1_1:commit() +stream_1_1:rollback() + +stream_1_1:begin() +-- Error unable to start second transaction in one stream +stream_1_1:begin() +-- It's ok to start 
transaction in separate stream in one connection +stream_1_2:begin() +-- It's ok to start transaction in separate stream in other connection +stream_2:begin() +test_run:switch("test") +-- It's ok to start local transaction separately with active stream +-- transactions +box.begin() +box.commit() +test_run:switch("default") +stream_1_1:commit() +stream_1_2:commit() +stream_2:commit() + +-- Check unsupported requests +conn = net_box.connect(server_addr) +assert(conn:ping()) +-- Begin, commit and rollback supported only for streams +conn:_request(net_box._method.begin, nil, nil, nil) +conn:_request(net_box._method.commit, nil, nil, nil) +conn:_request(net_box._method.rollback, nil, nil, nil) +-- Not all requests supported by stream. +stream = conn:new_stream() +-- Start transaction to allocate stream object on the +-- server side +stream:begin() +IPROTO_REQUEST_TYPE = 0x00 +IPROTO_SYNC = 0x01 +IPROTO_AUTH = 7 +IPROTO_STREAM_ID = 0x0a +next_request_id = 9 +test_run:cmd("setopt delimiter ';'") +header = msgpack.encode({ + [IPROTO_REQUEST_TYPE] = IPROTO_AUTH, + [IPROTO_SYNC] = next_request_id, + [IPROTO_STREAM_ID] = 1, +}); +body = msgpack.encode({nil}); +size = msgpack.encode(header:len() + body:len()); +conn._transport.perform_request(nil, nil, false, net_box._method.inject, + nil, nil, nil, nil, + size .. header .. body); +test_run:cmd("setopt delimiter ''"); +conn:close() +test_run:cmd("stop server test") + +-- Second argument (false is a value for memtx_use_mvcc_engine option) +-- Server start without active transaction manager, so all transaction +-- fails because of yeild! 
+test_run:cmd("start server test with args='10, false'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() +space = stream.space.test + +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = false +stream:begin() +test_run:switch('test') +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1) +test_run:switch('default') +space:replace({1}) +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space:select{} +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:commit() +-- Select is empty, transaction was aborted +space:select{} +-- Check that after failed transaction commit we able to start next +-- transaction (it's strange check, but it's necessary because it was +-- bug with it) +stream:begin() +stream:ping() +stream:commit() +-- Same checks for `call` end `eval` functions. +stream:call('box.begin') +stream:call('s:replace', {{1}}) +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +stream:call('s:select', {}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:eval('box.commit()') +-- Select is empty, transaction was aborted +space:select{} + +-- Same checks for `execute` function which can also +-- begin and commit transaction. 
+stream:execute('START TRANSACTION') +stream:call('s:replace', {{1}}) +-- Select is empty, because memtx_use_mvcc_engine is false +space:select({}) +stream:call('s:select', {}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false +stream:execute('COMMIT') +-- Select is empty, transaction was aborted +space:select{} + +test_run:switch('test') +s:drop() +-- Check that there are no streams and messages, which +-- was not deleted +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +stream:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Next we check transactions only for memtx with +-- memtx_use_mvcc_engine = true and for vinyl, because +-- if memtx_use_mvcc_engine = false all transactions fails, +-- as we can see before! + +-- Second argument (true is a value for memtx_use_mvcc_engine option) +-- Same test case as previous but server start with active transaction +-- manager. Also check vinyl, because it's behaviour is same. 
+test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s1:create_index('primary') +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +-- Spaces getting from connection, not from stream has no stream_id +-- and not belongs to stream +space_1_no_stream = conn.space.test_1 +space_2_no_stream = conn.space.test_2 +-- Check syncronious stream txn requests for memtx +-- with memtx_use_mvcc_engine = true and to vinyl: +-- behaviour is same! +stream_1:begin() +space_1:replace({1}) +stream_2:begin() +space_2:replace({1}) +test_run:switch('test') +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2) +test_run:switch('default') +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_1_no_stream:select{} +space_2_no_stream:select{} +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +space_2:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s1:select() +s2:select() +test_run:switch('default') +-- Commit was successful, transaction can yeild with +-- memtx_use_mvcc_engine = true. Vinyl transactions +-- can yeild also. 
+stream_1:commit() +stream_2:commit() +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") + +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_1:select{} +space_2:select{} +test_run:switch("test") +-- Select return tuple, which was previously inserted, +-- because transaction was successful +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1_1 = stream_1.space.test_1 +space_1_2 = stream_2.space.test_1 +space_2_1 = stream_1.space.test_2 +space_2_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +-- Simple read/write conflict. 
+space_1_1:select({1}) +space_1_2:select({1}) +space_1_1:replace({1, 1}) +space_1_2:replace({1, 2}) +stream_1:commit() +-- This transaction fails, because of conflict +stream_2:commit() +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +-- Here we must accept [1, 1] +space_1_1:select({}) +space_1_2:select({}) + +-- Same test for vinyl sapce +stream_1:begin() +stream_2:begin() +space_2_1:select({1}) +space_2_2:select({1}) +space_2_1:replace({1, 1}) +space_2_2:replace({1, 2}) +stream_1:commit() +-- This transaction fails, because of conflict +stream_2:commit() +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after commit +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +-- Here we must accept [1, 1] +space_2_1:select({}) +space_2_2:select({}) + +test_run:switch('test') +-- Both select return tuple [1, 1], transaction commited +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check rollback as a command for memtx and vinyl spaces +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = 
stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +-- Test rollback for memtx space +space_1:replace({1}) +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_1:select({}) +stream_1:rollback() +-- Select is empty, transaction rollback +space_1:select({}) + +-- Test rollback for vinyl space +space_2:replace({1}) +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space_2:select({}) +stream_2:rollback() +-- Select is empty, transaction rollback +space_2:select({}) + +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after rollback +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") + +-- This is simple test is necessary because i have a bug +-- with halting stream after rollback +stream_1:begin() +stream_1:commit() +stream_2:begin() +stream_2:commit() +conn:close() + +test_run:switch('test') +-- Both select are empty, because transaction rollback +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check rollback on disconnect +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + 
+space_1:replace({1}) +space_1:replace({2}) +-- Select return two previously inserted tuples +space_1:select({}) + +space_2:replace({1}) +space_2:replace({2}) +-- Select return two previously inserted tuples +space_2:select({}) +conn:close() + +test_run:switch("test") +-- Empty selects, transaction was rollback +s1:select() +s2:select() +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Reconnect +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +stream_2:begin() +-- Two empty selects +space_1:select({}) +space_2:select({}) +stream_1:commit() +stream_2:commit() + +test_run:switch('test') +-- Both select are empty, because transaction rollback +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check rollback on disconnect with big count of async requests +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = 
stream_2.space.test_2 +stream_1:begin() +stream_2:begin() + +space_1:replace({1}) +space_1:replace({2}) +-- Select return two previously inserted tuples +space_1:select({}) + +space_2:replace({1}) +space_2:replace({2}) +-- Select return two previously inserted tuples +space_2:select({}) +-- We send a large number of asynchronous requests, +-- their result is not important to us, it is important +-- that they will be in the stream queue at the time of +-- the disconnect. +test_run:cmd("setopt delimiter ';'") +for i = 1, 1000 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +test_run:cmd("setopt delimiter ''"); +fiber.sleep(0) +conn:close() + +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +test_run:cmd("setopt delimiter ';'") +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +test_run:cmd("setopt delimiter ''"); +-- Select was empty, transaction rollbacked +s1:select() +s2:select() +test_run:switch("default") +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Same test, but now we check that if `commit` was received +-- by server before connection closed, we processed it successful. +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +stream_1:begin() +stream_2:begin() +test_run:cmd("setopt delimiter ';'") +-- Here, for a large number of messages, we cannot guarantee their processing, +-- since if the net_msg_max limit is reached, we will stop processing incoming +-- requests, and after close, we will discard all raw data. '100' is the number +-- of messages that we can process without reaching net_msg_max. 
We will not try +-- any more, so as not to make a test flaky. +for i = 1, 100 do + space_1:replace({i}, {is_async = true}) + space_2:replace({i}, {is_async = true}) +end; +test_run:cmd("setopt delimiter ''"); +_ = stream_1:commit({is_async = true}) +_ = stream_2:commit({is_async = true}) +fiber.sleep(0) +conn:close() + +test_run:switch("test") +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +test_run:cmd("setopt delimiter ';'") +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0 +end); +test_run:wait_cond(function () + return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0 +end); +test_run:cmd("setopt delimiter ''"); +-- Select return tuples from [1] to [100], +-- transaction was commit +rc1 = s1:select() +rc2 = s2:select() +assert(#rc1) +assert(#rc2) +s1:truncate() +s2:truncate() +test_run:switch("default") +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Reconnect +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 +-- We can begin new transactions with same stream_id, because +-- previous one was rollbacked and destroyed. +stream_1:begin() +stream_2:begin() +-- Two empty selects +space_1:select({}) +space_2:select({}) +stream_1:commit() +stream_2:commit() + +test_run:switch('test') +-- Both select are empty, because transaction rollback +s1:select() +s2:select() +s1:drop() +s2:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check that all requests between `begin` and `commit` +-- have correct lsn and tsn values. 
During my work on the +-- patch, i see that all requests in stream comes with +-- header->is_commit == true, so if we are in transaction +-- in stream we should set this value to false, otherwise +-- during recovering `wal_stream_apply_dml_row` fails, because +-- of LSN/TSN mismatch. Here is a special test case for it. +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'memtx' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1 = stream_1.space.test_1 +space_2 = stream_2.space.test_2 + +stream_1:begin() +stream_2:begin() +space_1:replace({1}) +space_1:replace({2}) +space_2:replace({1}) +space_2:replace({2}) +stream_1:commit() +stream_2:commit() + +test_run:switch('test') +-- Here we get two tuples, commit was successful +s1:select{} +-- Here we get two tuples, commit was successful +s2:select{} +-- Check that there are no streams and messages, which +-- was not deleted after connection close +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +test_run:cmd("start server test with args='1, true'") +test_run:switch('test') +-- Here we get two tuples, commit was successful +box.space.test_1:select{} +-- Here we get two tuples, commit was successful +box.space.test_2:select{} +box.space.test_1:drop() +box.space.test_2:drop() +test_run:switch('default') +test_run:cmd("stop server test") + +-- Same transactions checks for async mode +test_run:cmd("start server test 
with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream_1 = conn:new_stream() +space_1 = stream_1.space.test_1 +stream_2 = conn:new_stream() +space_2 = stream_2.space.test_2 + +memtx_futures = {} +memtx_futures["begin"] = stream_1:begin({is_async = true}) +memtx_futures["replace"] = space_1:replace({1}, {is_async = true}) +memtx_futures["insert"] = space_1:insert({2}, {is_async = true}) +memtx_futures["select"] = space_1:select({}, {is_async = true}) + +vinyl_futures = {} +vinyl_futures["begin"] = stream_2:begin({is_async = true}) +vinyl_futures["replace"] = space_2:replace({1}, {is_async = true}) +vinyl_futures["insert"] = space_2:insert({2}, {is_async = true}) +vinyl_futures["select"] = space_2:select({}, {is_async = true}) + +test_run:switch("test") +-- Select is empty, transaction was not commited +s1:select() +s2:select() +test_run:switch('default') +memtx_futures["commit"] = stream_1:commit({is_async = true}) +vinyl_futures["commit"] = stream_2:commit({is_async = true}) + +memtx_results = wait_and_return_results(memtx_futures) +vinyl_results = wait_and_return_results(vinyl_futures) +-- If begin was successful it return nil +assert(not memtx_results["begin"]) +assert(not vinyl_results["begin"]) +-- [1] +assert(memtx_results["replace"]) +assert(vinyl_results["replace"]) +-- [2] +assert(memtx_results["insert"]) +assert(vinyl_results["insert"]) +-- [1] [2] +assert(memtx_results["select"]) +assert(vinyl_results["select"]) +-- If commit was successful it return nil +assert(not memtx_results["commit"]) +assert(not vinyl_results["commit"]) + +test_run:switch("test") +-- Select return tuple, which was previously inserted, +-- 
because transaction was successful +s1:select() +s2:select() +s1:drop() +s2:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Check conflict resolution in stream transactions, +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + +test_run:switch("test") +s1 = box.schema.space.create('test_1', { engine = 'memtx' }) +_ = s1:create_index('primary') +s2 = box.schema.space.create('test_2', { engine = 'vinyl' }) +_ = s2:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +stream_1 = conn:new_stream() +stream_2 = conn:new_stream() +space_1_1 = stream_1.space.test_1 +space_1_2 = stream_2.space.test_1 +space_2_1 = stream_1.space.test_2 +space_2_2 = stream_2.space.test_2 + +futures_1 = {} +-- Simple read/write conflict. 
+futures_1["begin_1"] = stream_1:begin({is_async = true}) +futures_1["begin_2"] = stream_2:begin({is_async = true}) +futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true}) +futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true}) +futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true}) +futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true}) +futures_1["commit_1"] = stream_1:commit({is_async = true}) +futures_1["commit_2"] = stream_2:commit({is_async = true}) +futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true}) +futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true}) + +results_1 = wait_and_return_results(futures_1) +-- Successful begin return nil +assert(not results_1["begin_1"]) +assert(not results_1["begin_2"]) +-- [] +assert(not results_1["select_1_1"][1]) +assert(not results_1["select_1_2"][1]) +-- [1] +assert(results_1["replace_1_1"][1]) +-- [1] +assert(results_1["replace_1_1"][2]) +-- [1] +assert(results_1["replace_1_2"][1]) +-- [2] +assert(results_1["replace_1_2"][2]) +-- Successful commit return nil +assert(not results_1["commit_1"]) +-- Error because of transaction conflict +assert(results_1["commit_2"]) +-- [1, 1] +assert(results_1["select_1_1_A"][1]) +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_1_2_A"][1] + +futures_2 = {} +-- Simple read/write conflict. 
+futures_2["begin_1"] = stream_1:begin({is_async = true}) +futures_2["begin_2"] = stream_2:begin({is_async = true}) +futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true}) +futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true}) +futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true}) +futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true}) +futures_2["commit_1"] = stream_1:commit({is_async = true}) +futures_2["commit_2"] = stream_2:commit({is_async = true}) +futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true}) +futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true}) + +results_2 = wait_and_return_results(futures_2) +-- Successful begin return nil +assert(not results_2["begin_1"]) +assert(not results_2["begin_2"]) +-- [] +assert(not results_2["select_2_1"][1]) +assert(not results_2["select_2_2"][1]) +-- [1] +assert(results_2["replace_2_1"][1]) +-- [1] +assert(results_2["replace_2_1"][2]) +-- [1] +assert(results_2["replace_2_2"][1]) +-- [2] +assert(results_2["replace_2_2"][2]) +-- Successful commit return nil +assert(not results_2["commit_1"]) +-- Error because of transaction conflict +assert(results_2["commit_2"]) +-- [1, 1] +assert(results_2["select_2_1_A"][1]) +-- commit_1 could have ended before commit_2, so +-- here we can get both empty select and [1, 1] +-- for results_1["select_2_2_A"][1] + +test_run:switch('test') +-- Both select return tuple [1, 1], transaction commited +s1:select() +s2:select() +s1:drop() +s2:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Checks for iproto call/eval/execute in stream +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return 
box.cfg.listen'")[1] +test_run:switch("test") +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +function ping() return "pong" end +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() +space = stream.space.test +space_no_stream = conn.space.test + +-- successful begin using stream:call +stream:call('box.begin') +-- error: Operation is not permitted when there is an active transaction +stream:eval('box.begin()') +-- error: Operation is not permitted when there is an active transaction +stream:begin() +-- error: Operation is not permitted when there is an active transaction +stream:execute('START TRANSACTION') +stream:call('ping') +stream:eval('ping()') +-- error: Operation is not permitted when there is an active transaction +stream:call('box.begin') +stream:eval('box.begin()') +-- successful commit using stream:call +stream:call('box.commit') + +-- successful begin using stream:eval +stream:eval('box.begin()') +space:replace({1}) +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. +space_no_stream:select{} +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +--Successful commit using stream:execute +stream:execute('COMMIT') +-- Select return tuple, which was previously inserted, +-- because transaction was successful +space_no_stream:select{} +test_run:switch("test") +-- Select return tuple, because transaction was successful +s:select() +s:delete{1} +test_run:switch('default') +-- Check rollback using stream:call +stream:begin() +space:replace({2}) +-- Empty select, transaction was not commited and +-- is not visible from requests not belonging to the +-- transaction. 
+space_no_stream:select{} +-- Select return tuple, which was previously inserted, +-- because this select belongs to transaction. +space:select({}) +test_run:switch("test") +-- Select is empty, transaction was not commited +s:select() +test_run:switch('default') +--Successful rollback using stream:call +stream:call('box.rollback') +-- Empty selects transaction rollbacked +space:select({}) +space_no_stream:select{} +test_run:switch("test") +-- Empty select transaction rollbacked +s:select() +s:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Simple test which demostrates that stream immediately +-- destroyed, when no processing messages in stream and +-- no active transaction. + +test_run:cmd("start server test with args='10, true'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +test_run:switch("test") +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +stream = conn:new_stream() +space = stream.space.test +for i = 1, 10 do space:replace{i} end +test_run:switch("test") +-- All messages was processed, so stream object was immediately +-- deleted, because no active transaction started. +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +s:drop() +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:cmd("stop server test") + +-- Transaction tests for sql iproto requests. 
+-- All this functions are copy-paste from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+function execute_sql_string(stream, sql_string)
+    if stream then
+        stream:execute(sql_string)
+    else
+        box.execute(sql_string)
+    end
+end$
+function execute_sql_string_and_return_result(stream, sql_string)
+    if stream then
+        return pcall(stream.execute, stream, sql_string)
+    else
+        return box.execute(sql_string)
+    end
+end$
+function monster_ddl(stream)
+    local _, err1, err2, err3, err4, err5, err6
+    local stream_or_box = stream or box
+    execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER);]])
+    execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER UNIQUE,
+                                                 CONSTRAINT ck1
+                                                 CHECK(b < 100));]])
+
+    execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+    execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+    execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+                                 KEY, a INTEGER);]])
+
+    execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+    execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+                                 CHECK(b > 0);]])
+
+    _, err1 =
+        execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+                                                       RENAME TO t1;]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+                                 ck2 CHECK(a > 0);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+                                                       INTEGER PRIMARY KEY);]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+
+    execute_sql_string(stream, [[CREATE TABLE
+                                 trigger_catcher(id INTEGER PRIMARY
+                                 KEY AUTOINCREMENT);]])
+
+    execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+    execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+                                 t1 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+                                 t2 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+                                                       ON t1(a, b);]])
+
+    execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+    _, err5 =
+        execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+    _, err6 =
+        execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+                                                       t_does_not_exist;]])
+
+    execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+    return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+            err5, err6}
+end$
+function monster_ddl_cmp_res(res1, res2)
+    if json.encode(res1) == json.encode(res2) then
+        return true
+    end
+    return res1, res2
+end$
+function monster_ddl_is_clean(stream)
+    local stream_or_box = stream or box
+    assert(stream_or_box.space.T1 == nil)
+    assert(stream_or_box.space.T2 == nil)
+    assert(stream_or_box.space._trigger:count() == 0)
+    assert(stream_or_box.space._fk_constraint:count() == 0)
+    assert(stream_or_box.space._ck_constraint:count() == 0)
+    assert(stream_or_box.space.T_RENAMED == nil)
+    assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+function monster_ddl_check(stream)
+    local _, err1, err2, err3, err4, res
+    local stream_or_box = stream or box
+    _, err1 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                       VALUES (1, 1, 101)]])
+    execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                       VALUES(2, 2, 1)]])
+    _, err3 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, 20, 1)]])
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, -1, 1)]])
+    execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+    if not stream then
+        assert(stream_or_box.space.T_RENAMED ~= nil)
+        assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+        assert(stream_or_box.space.T_TO_RENAME == nil)
+        res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                   trigger_catcher]])
+    else
+        _, res =
+            execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                 trigger_catcher]])
+    end
+    return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+            err3, err4, res}
+end$
+function monster_ddl_clear(stream)
+    execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+test_run:cmd("setopt delimiter ''")$
+
+test_run:cmd("start server test with args='10, true'")
+test_run:switch('test')
+test_run:cmd("setopt delimiter '$'")
+function monster_ddl_is_clean()
+    if not (box.space.T1 == nil) or
+       not (box.space.T2 == nil) or
+       not (box.space._trigger:count() == 0) or
+       not (box.space._fk_constraint:count() == 0) or
+       not (box.space._ck_constraint:count() == 0) or
+       not (box.space.T_RENAMED == nil) or
+       not (box.space.T_TO_RENAME == nil) then
+        return false
+    end
+    return true
+end$
+test_run:cmd("setopt delimiter ''")$
+test_run:switch('default')
+
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+
+-- No txn.
+true_ddl_res = monster_ddl()
+true_ddl_res
+
+true_check_res = monster_ddl_check()
+true_check_res
+
+monster_ddl_clear()
+monster_ddl_is_clean()
+
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+test_run:switch("test")
+-- All messages was processed, so stream object was immediately
+-- deleted, because no active transaction started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+-- reload schema
+stream:ping()
+space = stream.space.TEST
+assert(space ~= nil)
+stream:execute('START TRANSACTION')
+space:replace{1, 2, '3'}
+space:select()
+-- select is empty, because transaction was not commited
+conn.space.TEST:select()
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- empty select, transaction was not commited
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:execute('COMMIT')
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:unprepare(stream_pr.stmt_id)
+conn:close()
+test_run:switch('test')
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+box.space.TEST:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/suite.ini b/test/box/suite.ini
index 637766cdd..369354eda 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
 disabled = rtree_errinj.test.lua tuple_bench.test.lua
 long_run = huge_field_map_long.test.lua
 config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua net.box_iproto_streams.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua net.box_iproto_streams.test.lua net.box_iproto_transactions_over_streams.test.lua
 lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
 use_unix_sockets = True
 use_unix_sockets_iproto = True
-- 
2.20.1