Tarantool development patches archive
* [Tarantool-patches] [PATCH v4 0/9] implement iproto streams
@ 2021-08-12  9:50 mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 1/9] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches
                   ` (9 more replies)
  0 siblings, 10 replies; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy

branch: https://github.com/tarantool/tarantool/tree/mechanik20051988/gh-5860-iproto-streams-v1
pr: https://github.com/tarantool/tarantool/pull/6139

Changes in v4:
1 - 4: commits were approved.
5: move 'if (stream == NULL)' check into 'iproto_msg_finish_processing_in_stream' function.
6: - rename test
   - rename 'local new_stream' function to 'stream_new_stream'.
   - we are unable to use 'self._stream_id' in the _request method, because when
     we call it for spaces and indexes, 'self' == 'remote', which has no stream_id.
7: - add RAFT prefix to all raft-related request types in iproto.
8: - rename 'rollback_on_disconnect_stream_route' to 'rollback_on_disconnect_route'.
   - rename 'iproto_stream_push_on_disconnect_msg' to 'iproto_stream_rollback_on_disconnect'
   - add comment why we need 'mh_size(streams) == 0' check in iproto_connection_is_idle.
   - add some asserts as indicated in the review.
9: - rename begin, commit, rollback functions to stream_begin, stream_commit and stream_rollback.
   - add separate test for transactions.

Vladimir Davydov (2):
  xrow: remove unused call_request::header
  iproto: clear request::header for client requests

mechanik20051988 (7):
  iproto: implement stream id in binary iproto protocol
  salad: fix segfault in case when mhash table allocation failure
  iproto: implement streams in iproto
  net.box: add stream support to net.box
  iproto: add RAFT prefix for all requests related to 'raft'.
  iproto: implement interactive transactions over iproto streams
  net.box: add interactive transaction support in net.box

 .../gh-5860-implement-streams-in-iproto.md    |   26 +
 src/box/box.cc                                |    4 +-
 src/box/call.c                                |   12 -
 src/box/errcode.h                             |    2 +
 src/box/iproto.cc                             |  479 ++-
 src/box/iproto_constants.c                    |   10 +-
 src/box/iproto_constants.h                    |   29 +-
 src/box/lua/net_box.c                         |  144 +-
 src/box/lua/net_box.lua                       |  240 +-
 src/box/memtx_engine.c                        |    4 +-
 src/box/txn.c                                 |   25 +-
 src/box/txn.h                                 |   19 +
 src/box/txn_limbo.c                           |   18 +-
 src/box/xrow.c                                |    9 +-
 src/box/xrow.h                                |   13 +-
 src/lib/core/errinj.h                         |    2 +
 src/lib/salad/mhash.h                         |   99 +-
 test/box-tap/feedback_daemon.test.lua         |    2 +-
 test/box/access.result                        |    6 +-
 test/box/access.test.lua                      |    6 +-
 test/box/errinj.result                        |    2 +
 test/box/error.result                         |    2 +
 test/box/iproto_streams.lua                   |   13 +
 test/box/misc.result                          |    5 +-
 ...net.box_console_connections_gh-2677.result |    2 +-
 ...t.box_console_connections_gh-2677.test.lua |    2 +-
 .../net.box_incorrect_iterator_gh-841.result  |    4 +-
 ...net.box_incorrect_iterator_gh-841.test.lua |    4 +-
 test/box/net.box_iproto_hangs_gh-3464.result  |    2 +-
 .../box/net.box_iproto_hangs_gh-3464.test.lua |    2 +-
 test/box/net.box_iproto_streams.result        |  473 +++
 test/box/net.box_iproto_streams.test.lua      |  182 +
 ...ox_iproto_transactions_over_streams.result | 3009 +++++++++++++++++
 ..._iproto_transactions_over_streams.test.lua | 1238 +++++++
 .../net.box_long-poll_input_gh-3400.result    |    8 +-
 .../net.box_long-poll_input_gh-3400.test.lua  |    8 +-
 test/box/suite.ini                            |    2 +-
 test/unit/mhash_body.c                        |    4 +-
 test/unit/xrow.cc                             |    7 +-
 test/unit/xrow.result                         |  168 +-
 40 files changed, 6021 insertions(+), 265 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
 create mode 100644 test/box/iproto_streams.lua
 create mode 100644 test/box/net.box_iproto_streams.result
 create mode 100644 test/box/net.box_iproto_streams.test.lua
 create mode 100644 test/box/net.box_iproto_transactions_over_streams.result
 create mode 100644 test/box/net.box_iproto_transactions_over_streams.test.lua

--
2.20.1



* [Tarantool-patches] [PATCH v4 1/9] xrow: remove unused call_request::header
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
@ 2021-08-12  9:50 ` mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 2/9] iproto: clear request::header for client requests mechanik20051988 via Tarantool-patches
                   ` (8 subsequent siblings)
  9 siblings, 0 replies; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy

From: Vladimir Davydov <vdavydov@tarantool.org>

---
 src/box/xrow.c | 1 -
 src/box/xrow.h | 2 --
 2 files changed, 3 deletions(-)

diff --git a/src/box/xrow.c b/src/box/xrow.c
index 8ab8b2768..a61c6e345 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1139,7 +1139,6 @@ error:
 	}
 
 	memset(request, 0, sizeof(*request));
-	request->header = row;
 
 	uint32_t map_size = mp_decode_map(&data);
 	for (uint32_t i = 0; i < map_size; ++i) {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index c6e8ed0fd..0f2fcf94a 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -304,8 +304,6 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
  * CALL/EVAL request.
  */
 struct call_request {
-	/** Request header */
-	const struct xrow_header *header;
 	/** Function name for CALL request. MessagePack String. */
 	const char *name;
 	/** Expression for EVAL request. MessagePack String. */
-- 
2.20.1



* [Tarantool-patches] [PATCH v4 2/9] iproto: clear request::header for client requests
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 1/9] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches
@ 2021-08-12  9:50 ` mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 3/9] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches
                   ` (7 subsequent siblings)
  9 siblings, 0 replies; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy

From: Vladimir Davydov <vdavydov@tarantool.org>

To apply a client request, we only need to know its type and body. All
the meta information, such as LSN, TSN, or replica id, must be set by
WAL. Currently, however, this isn't necessarily true: iproto leaves a
request header received over the network as is, and tx reuses that header
instead of allocating a new one. Header reuse is needed to process
replication requests, see txn_add_redo().

Unless a client actually sets one of those meta fields, this causes no
problems. However, if we added transaction support to the replication
protocol, reusing the header would result in broken xlog, because
currently, all requests received over iproto have the is_commit field
set in xrow_header for the lack of TSN, while is_commit must only be set
for the final statement in a transaction. One way to fix it would be
clearing is_commit explicitly in iproto, but ignoring the whole header
received over iproto looks more logical and error-proof.

Needed for #5860
---
 src/box/iproto.cc | 6 ++++++
 src/box/xrow.h    | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 5cc69b77f..dcf60e1be 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1264,6 +1264,12 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		if (xrow_decode_dml(&msg->header, &msg->dml,
 				    dml_request_key_map(type)))
 			goto error;
+		/*
+		 * In contrast to replication requests, for a client request
+		 * the xrow header is set by WAL, which generates LSNs and sets
+		 * replica id. Ignore the header received over network.
+		 */
+		msg->dml.header = NULL;
 		assert(type < sizeof(iproto_thread->dml_route) /
 		              sizeof(*(iproto_thread->dml_route)));
 		cmsg_init(&msg->base, iproto_thread->dml_route[type]);
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 0f2fcf94a..48b8b55f5 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -163,7 +163,7 @@ struct request {
 	/*
 	 * Either log row, or network header, or NULL, depending
 	 * on where this packet originated from: the write ahead
-	 * log/snapshot, client request, or a Lua request.
+	 * log/snapshot, replication, or a client request.
 	 */
 	struct xrow_header *header;
 	/**
-- 
2.20.1



* [Tarantool-patches] [PATCH v4 3/9] iproto: implement stream id in binary iproto protocol
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 1/9] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 2/9] iproto: clear request::header for client requests mechanik20051988 via Tarantool-patches
@ 2021-08-12  9:50 ` mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 4/9] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches
                   ` (6 subsequent siblings)
  9 siblings, 0 replies; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988

From: mechanik20051988 <mechanik20.05.1988@gmail.com>

For further implementation of streams, we need to distinguish
requests that belong to a stream from those that do not. For this
purpose, the stream ID field was added to the iproto binary
protocol. For requests that do not belong to a stream, this field
is omitted or equal to zero. For requests belonging to a stream,
we use this field to determine which stream the request belongs to.
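
As an illustration of the wire-level effect, below is a minimal,
self-contained sketch of a header encoder that emits the stream ID key
only when it is non-zero. It is not the patch's xrow_header_encode():
all sketch_* names are hypothetical, the MessagePack encoder is
hand-rolled to avoid dependencies, and the key values other than 0x0a
(0x00 for the request type, 0x01 for sync) are assumptions about the
existing iproto constants.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Header keys. Only 0x0a comes from this patch; the rest are assumed. */
enum {
	SKETCH_KEY_REQUEST_TYPE = 0x00,
	SKETCH_KEY_SYNC = 0x01,
	SKETCH_KEY_STREAM_ID = 0x0a,
};

/* Encode v as a MessagePack unsigned integer, return bytes written. */
size_t
sketch_encode_uint(uint8_t *out, uint64_t v)
{
	if (v <= 0x7f) {
		/* Positive fixint: the value is the tag itself. */
		out[0] = (uint8_t)v;
		return 1;
	}
	uint8_t tag;
	int nbytes;
	if (v <= 0xff) {
		tag = 0xcc; nbytes = 1;
	} else if (v <= 0xffff) {
		tag = 0xcd; nbytes = 2;
	} else if (v <= 0xffffffff) {
		tag = 0xce; nbytes = 4;
	} else {
		tag = 0xcf; nbytes = 8;
	}
	out[0] = tag;
	/* MessagePack integers are stored big-endian. */
	for (int i = 0; i < nbytes; i++)
		out[1 + i] = (uint8_t)(v >> (8 * (nbytes - 1 - i)));
	return 1 + (size_t)nbytes;
}

/*
 * Encode a header map with type and sync, plus the stream ID when it
 * is non-zero. Returns the number of bytes written to out.
 */
size_t
sketch_encode_header(uint8_t *out, uint64_t type, uint64_t sync,
		     uint64_t stream_id)
{
	uint8_t map_size = 2;
	size_t n = 1; /* out[0] is reserved for the fixmap tag. */
	n += sketch_encode_uint(out + n, SKETCH_KEY_REQUEST_TYPE);
	n += sketch_encode_uint(out + n, type);
	n += sketch_encode_uint(out + n, SKETCH_KEY_SYNC);
	n += sketch_encode_uint(out + n, sync);
	if (stream_id != 0) {
		/* A zero stream ID is simply not put on the wire. */
		n += sketch_encode_uint(out + n, SKETCH_KEY_STREAM_ID);
		n += sketch_encode_uint(out + n, stream_id);
		map_size++;
	}
	assert(map_size <= 15);
	out[0] = (uint8_t)(0x80 | map_size); /* fixmap tag */
	return n;
}
```

With a 32-byte buffer, sketch_encode_header(buf, 1, 7, 0) produces a
two-entry map, while passing a stream ID of 5 adds the pair 0x0a, 0x05
and bumps the map size to three. A decoder that does not know the key
can skip it, which is what keeps the field optional.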

Part of #5860

@TarantoolBot document
Title: new field in binary iproto protocol

Add a new field to the binary iproto protocol.
`IPROTO_STREAM_ID 0x0a` determines whether a request
belongs to a stream or not. If this field is omitted
or equal to zero, the request doesn't belong to a stream.
---
 src/box/iproto_constants.c |   4 +-
 src/box/iproto_constants.h |   1 +
 src/box/xrow.c             |   8 ++
 src/box/xrow.h             |   5 ++
 test/unit/xrow.cc          |   7 +-
 test/unit/xrow.result      | 168 +++++++++++++++++++------------------
 6 files changed, 109 insertions(+), 84 deletions(-)

diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index addda39dc..f2902946a 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -43,10 +43,10 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
 		/* 0x07 */	MP_UINT,   /* IPROTO_GROUP_ID */
 		/* 0x08 */	MP_UINT,   /* IPROTO_TSN */
 		/* 0x09 */	MP_UINT,   /* IPROTO_FLAGS */
+		/* 0x0a */	MP_UINT,   /* IPROTO_STREAM_ID */
 	/* }}} */
 
 	/* {{{ unused */
-		/* 0x0a */	MP_UINT,
 		/* 0x0b */	MP_UINT,
 		/* 0x0c */	MP_UINT,
 		/* 0x0d */	MP_UINT,
@@ -198,7 +198,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
 	"group id",         /* 0x07 */
 	"tsn",              /* 0x08 */
 	"flags",            /* 0x09 */
-	NULL,               /* 0x0a */
+	"stream_id",        /* 0x0a */
 	NULL,               /* 0x0b */
 	NULL,               /* 0x0c */
 	NULL,               /* 0x0d */
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 3d78ce2bb..b9498868c 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -72,6 +72,7 @@ enum iproto_key {
 	IPROTO_GROUP_ID = 0x07,
 	IPROTO_TSN = 0x08,
 	IPROTO_FLAGS = 0x09,
+	IPROTO_STREAM_ID = 0x0a,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index a61c6e345..7df1af4ab 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -186,6 +186,9 @@ error:
 			flags = mp_decode_uint(pos);
 			header->flags = flags;
 			break;
+		case IPROTO_STREAM_ID:
+			header->stream_id = mp_decode_uint(pos);
+			break;
 		default:
 			/* unknown header */
 			mp_next(pos);
@@ -319,6 +322,11 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
 			flags_to_encode |= IPROTO_FLAG_COMMIT;
 		}
 	}
+	if (header->stream_id != 0) {
+		d = mp_encode_uint(d, IPROTO_STREAM_ID);
+		d = mp_encode_uint(d, header->stream_id);
+		map_size++;
+	}
 	if (flags_to_encode != 0) {
 		d = mp_encode_uint(d, IPROTO_FLAGS);
 		d = mp_encode_uint(d, flags_to_encode);
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 48b8b55f5..cb83fddff 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -81,6 +81,11 @@ struct xrow_header {
 	 * transaction.
 	 */
 	int64_t tsn;
+	/**
+	 * Stream id. Used in iproto binary protocol to identify stream.
+	 * Zero if stream is not used.
+	 */
+	uint64_t stream_id;
 	/** Transaction meta flags set only in the last transaction row. */
 	union {
 		uint8_t flags;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index b6018eed9..2c0dd88b6 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -220,8 +220,10 @@ test_xrow_header_encode_decode()
 	header.bodycnt = 0;
 	header.tsn = header.lsn;
 	uint64_t sync = 100500;
+	uint64_t stream_id = 1;
 	for (int opt_idx = 0; opt_idx < bit_comb_count; opt_idx++) {
-		plan(12);
+		plan(13);
+		header.stream_id = stream_id++;
 		header.is_commit = opt_idx & 0x01;
 		header.wait_sync = opt_idx >> 1 & 0x01;
 		header.wait_ack = opt_idx >> 2 & 0x01;
@@ -229,7 +231,7 @@ test_xrow_header_encode_decode()
 		is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
 		int fixheader_len = 200;
 		pos = (char *)vec[0].iov_base + fixheader_len;
-		uint32_t exp_map_size = 5;
+		uint32_t exp_map_size = 6;
 		/*
 		 * header.is_commit flag isn't encoded, since this row looks
 		 * like a single-statement transaction.
@@ -249,6 +251,7 @@ test_xrow_header_encode_decode()
 		end += vec[0].iov_len;
 		is(xrow_header_decode(&decoded_header, &begin, end, true), 0,
 		   "header decode");
+		is(header.stream_id, decoded_header.stream_id, "decoded stream_id");
 		is(header.is_commit, decoded_header.is_commit, "decoded is_commit");
 		is(header.wait_sync, decoded_header.wait_sync, "decoded wait_sync");
 		is(header.wait_ack, decoded_header.wait_ack, "decoded wait_ack");
diff --git a/test/unit/xrow.result b/test/unit/xrow.result
index 3b705d5ba..1ca222d37 100644
--- a/test/unit/xrow.result
+++ b/test/unit/xrow.result
@@ -43,117 +43,125 @@
 ok 1 - subtests
     1..9
     ok 1 - bad msgpack end
-        1..12
+        1..13
         ok 1 - encode
         ok 2 - header map size
         ok 3 - header decode
-        ok 4 - decoded is_commit
-        ok 5 - decoded wait_sync
-        ok 6 - decoded wait_ack
-        ok 7 - decoded type
-        ok 8 - decoded replica_id
-        ok 9 - decoded lsn
-        ok 10 - decoded tm
-        ok 11 - decoded sync
-        ok 12 - decoded bodycnt
+        ok 4 - decoded stream_id
+        ok 5 - decoded is_commit
+        ok 6 - decoded wait_sync
+        ok 7 - decoded wait_ack
+        ok 8 - decoded type
+        ok 9 - decoded replica_id
+        ok 10 - decoded lsn
+        ok 11 - decoded tm
+        ok 12 - decoded sync
+        ok 13 - decoded bodycnt
     ok 2 - subtests
-        1..12
+        1..13
         ok 1 - encode
         ok 2 - header map size
         ok 3 - header decode
-        ok 4 - decoded is_commit
-        ok 5 - decoded wait_sync
-        ok 6 - decoded wait_ack
-        ok 7 - decoded type
-        ok 8 - decoded replica_id
-        ok 9 - decoded lsn
-        ok 10 - decoded tm
-        ok 11 - decoded sync
-        ok 12 - decoded bodycnt
+        ok 4 - decoded stream_id
+        ok 5 - decoded is_commit
+        ok 6 - decoded wait_sync
+        ok 7 - decoded wait_ack
+        ok 8 - decoded type
+        ok 9 - decoded replica_id
+        ok 10 - decoded lsn
+        ok 11 - decoded tm
+        ok 12 - decoded sync
+        ok 13 - decoded bodycnt
     ok 3 - subtests
-        1..12
+        1..13
         ok 1 - encode
         ok 2 - header map size
         ok 3 - header decode
-        ok 4 - decoded is_commit
-        ok 5 - decoded wait_sync
-        ok 6 - decoded wait_ack
-        ok 7 - decoded type
-        ok 8 - decoded replica_id
-        ok 9 - decoded lsn
-        ok 10 - decoded tm
-        ok 11 - decoded sync
-        ok 12 - decoded bodycnt
+        ok 4 - decoded stream_id
+        ok 5 - decoded is_commit
+        ok 6 - decoded wait_sync
+        ok 7 - decoded wait_ack
+        ok 8 - decoded type
+        ok 9 - decoded replica_id
+        ok 10 - decoded lsn
+        ok 11 - decoded tm
+        ok 12 - decoded sync
+        ok 13 - decoded bodycnt
     ok 4 - subtests
-        1..12
+        1..13
         ok 1 - encode
         ok 2 - header map size
         ok 3 - header decode
-        ok 4 - decoded is_commit
-        ok 5 - decoded wait_sync
-        ok 6 - decoded wait_ack
-        ok 7 - decoded type
-        ok 8 - decoded replica_id
-        ok 9 - decoded lsn
-        ok 10 - decoded tm
-        ok 11 - decoded sync
-        ok 12 - decoded bodycnt
+        ok 4 - decoded stream_id
+        ok 5 - decoded is_commit
+        ok 6 - decoded wait_sync
+        ok 7 - decoded wait_ack
+        ok 8 - decoded type
+        ok 9 - decoded replica_id
+        ok 10 - decoded lsn
+        ok 11 - decoded tm
+        ok 12 - decoded sync
+        ok 13 - decoded bodycnt
     ok 5 - subtests
-        1..12
+        1..13
         ok 1 - encode
         ok 2 - header map size
         ok 3 - header decode
-        ok 4 - decoded is_commit
-        ok 5 - decoded wait_sync
-        ok 6 - decoded wait_ack
-        ok 7 - decoded type
-        ok 8 - decoded replica_id
-        ok 9 - decoded lsn
-        ok 10 - decoded tm
-        ok 11 - decoded sync
-        ok 12 - decoded bodycnt
+        ok 4 - decoded stream_id
+        ok 5 - decoded is_commit
+        ok 6 - decoded wait_sync
+        ok 7 - decoded wait_ack
+        ok 8 - decoded type
+        ok 9 - decoded replica_id
+        ok 10 - decoded lsn
+        ok 11 - decoded tm
+        ok 12 - decoded sync
+        ok 13 - decoded bodycnt
     ok 6 - subtests
-        1..12
+        1..13
         ok 1 - encode
         ok 2 - header map size
         ok 3 - header decode
-        ok 4 - decoded is_commit
-        ok 5 - decoded wait_sync
-        ok 6 - decoded wait_ack
-        ok 7 - decoded type
-        ok 8 - decoded replica_id
-        ok 9 - decoded lsn
-        ok 10 - decoded tm
-        ok 11 - decoded sync
-        ok 12 - decoded bodycnt
+        ok 4 - decoded stream_id
+        ok 5 - decoded is_commit
+        ok 6 - decoded wait_sync
+        ok 7 - decoded wait_ack
+        ok 8 - decoded type
+        ok 9 - decoded replica_id
+        ok 10 - decoded lsn
+        ok 11 - decoded tm
+        ok 12 - decoded sync
+        ok 13 - decoded bodycnt
     ok 7 - subtests
-        1..12
+        1..13
         ok 1 - encode
         ok 2 - header map size
         ok 3 - header decode
-        ok 4 - decoded is_commit
-        ok 5 - decoded wait_sync
-        ok 6 - decoded wait_ack
-        ok 7 - decoded type
-        ok 8 - decoded replica_id
-        ok 9 - decoded lsn
-        ok 10 - decoded tm
-        ok 11 - decoded sync
-        ok 12 - decoded bodycnt
+        ok 4 - decoded stream_id
+        ok 5 - decoded is_commit
+        ok 6 - decoded wait_sync
+        ok 7 - decoded wait_ack
+        ok 8 - decoded type
+        ok 9 - decoded replica_id
+        ok 10 - decoded lsn
+        ok 11 - decoded tm
+        ok 12 - decoded sync
+        ok 13 - decoded bodycnt
     ok 8 - subtests
-        1..12
+        1..13
         ok 1 - encode
         ok 2 - header map size
         ok 3 - header decode
-        ok 4 - decoded is_commit
-        ok 5 - decoded wait_sync
-        ok 6 - decoded wait_ack
-        ok 7 - decoded type
-        ok 8 - decoded replica_id
-        ok 9 - decoded lsn
-        ok 10 - decoded tm
-        ok 11 - decoded sync
-        ok 12 - decoded bodycnt
+        ok 4 - decoded stream_id
+        ok 5 - decoded is_commit
+        ok 6 - decoded wait_sync
+        ok 7 - decoded wait_ack
+        ok 8 - decoded type
+        ok 9 - decoded replica_id
+        ok 10 - decoded lsn
+        ok 11 - decoded tm
+        ok 12 - decoded sync
+        ok 13 - decoded bodycnt
     ok 9 - subtests
 ok 2 - subtests
     1..1
-- 
2.20.1



* [Tarantool-patches] [PATCH v4 4/9] salad: fix segfault in case when mhash table allocation failure
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
                   ` (2 preceding siblings ...)
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 3/9] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches
@ 2021-08-12  9:50 ` mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 5/9] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches
                   ` (5 subsequent siblings)
  9 siblings, 0 replies; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988

From: mechanik20051988 <mechanik20.05.1988@gmail.com>

There was no check for successful memory allocation in the `new` and
`clear` functions of the mhash table. If the memory could not be
allocated, a null pointer dereference occurred.
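
The pattern used by the fix can be sketched in isolation: allocate
everything first, and only touch the live table once every allocation
has succeeded. The code below is a simplified stand-in, not the mhash
template itself. The sketch_* names, the two-array table layout, and
the fault-injecting sketch_calloc() wrapper are all hypothetical and
exist only to make the failure paths easy to exercise.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical fault injection: fail after this many successful calls. */
int sketch_fail_after = -1;

void *
sketch_calloc(size_t n, size_t size)
{
	if (sketch_fail_after == 0) {
		sketch_fail_after = -1;
		return NULL;
	}
	if (sketch_fail_after > 0)
		sketch_fail_after--;
	return calloc(n, size);
}

struct sketch_table {
	size_t n_buckets;
	size_t size;
	int *nodes;
	uint8_t *bitmap;
};

struct sketch_table *
sketch_table_new(size_t n_buckets)
{
	struct sketch_table *t =
		(struct sketch_table *)sketch_calloc(1, sizeof(*t));
	if (t == NULL)
		return NULL;
	t->n_buckets = n_buckets;
	t->nodes = (int *)sketch_calloc(n_buckets, sizeof(*t->nodes));
	if (t->nodes == NULL)
		goto fail;
	t->bitmap = (uint8_t *)sketch_calloc(n_buckets, sizeof(*t->bitmap));
	if (t->bitmap == NULL)
		goto fail;
	return t;
fail:
	/* free(NULL) is a no-op, so one cleanup path covers all cases. */
	free(t->nodes);
	free(t);
	return NULL;
}

/* Returns 0 on success, -1 on failure with the table left untouched. */
int
sketch_table_clear(struct sketch_table *t)
{
	int *nodes = (int *)sketch_calloc(t->n_buckets, sizeof(*nodes));
	if (nodes == NULL)
		return -1;
	uint8_t *bitmap =
		(uint8_t *)sketch_calloc(t->n_buckets, sizeof(*bitmap));
	if (bitmap == NULL) {
		free(nodes);
		return -1;
	}
	/* All allocations succeeded: now it is safe to drop old data. */
	free(t->nodes);
	free(t->bitmap);
	t->nodes = nodes;
	t->bitmap = bitmap;
	t->size = 0;
	return 0;
}

void
sketch_table_delete(struct sketch_table *t)
{
	free(t->nodes);
	free(t->bitmap);
	free(t);
}
```

The point of allocating into locals first is that a failed clear leaves
the caller with a table that is still valid and still holds its old
contents, so the caller can report the error instead of crashing.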
---
 src/lib/salad/mhash.h  | 99 +++++++++++++++++++++++++++---------------
 test/unit/mhash_body.c |  4 +-
 2 files changed, 66 insertions(+), 37 deletions(-)

diff --git a/src/lib/salad/mhash.h b/src/lib/salad/mhash.h
index b555cad4c..74235eeaa 100644
--- a/src/lib/salad/mhash.h
+++ b/src/lib/salad/mhash.h
@@ -157,7 +157,7 @@ struct _mh(t) {
 #define MH_DENSITY 0.7
 
 struct _mh(t) * _mh(new)();
-void _mh(clear)(struct _mh(t) *h);
+int _mh(clear)(struct _mh(t) *h);
 void _mh(delete)(struct _mh(t) *h);
 void _mh(resize)(struct _mh(t) *h, mh_arg_t arg);
 int _mh(start_resize)(struct _mh(t) *h, mh_int_t buckets, mh_int_t batch,
@@ -399,23 +399,50 @@ _mh(del_resize)(struct _mh(t) *h, mh_int_t x,
 struct _mh(t) *
 _mh(new)()
 {
-	struct _mh(t) *h = (struct _mh(t) *) calloc(1, sizeof(*h));
-	h->shadow = (struct _mh(t) *) calloc(1, sizeof(*h));
+	struct _mh(t) *h = (struct _mh(t) *)calloc(1, sizeof(*h));
+	if (h == NULL)
+		return NULL;
+	h->shadow = (struct _mh(t) *)calloc(1, sizeof(*h));
+	if (h->shadow == NULL)
+		goto fail;
 	h->prime = 0;
 	h->n_buckets = __ac_prime_list[h->prime];
-	h->p = (mh_node_t *) calloc(h->n_buckets, sizeof(mh_node_t));
+	h->p = (mh_node_t *)calloc(h->n_buckets, sizeof(mh_node_t));
+	if (h->p == NULL)
+		goto fail;
 #if !mh_bytemap
-	h->b = (uint32_t *) calloc(h->n_buckets / 16 + 1, sizeof(uint32_t));
+	h->b = (uint32_t *)calloc(h->n_buckets / 16 + 1, sizeof(uint32_t));
 #else
-	h->b = (uint8_t *) calloc(h->n_buckets, sizeof(uint8_t));
+	h->b = (uint8_t *)calloc(h->n_buckets, sizeof(uint8_t));
 #endif
+	if (h->b == NULL)
+		goto fail;
 	h->upper_bound = h->n_buckets * MH_DENSITY;
 	return h;
+
+fail:
+	free(h->p);
+	free(h->shadow);
+	free(h);
+	return NULL;
 }
 
-void
+int
 _mh(clear)(struct _mh(t) *h)
 {
+	mh_int_t n_buckets = __ac_prime_list[h->prime];
+	mh_node_t *p = (mh_node_t *)calloc(n_buckets, sizeof(mh_node_t));
+	if (p == NULL)
+		return -1;
+#if !mh_bytemap
+	uint32_t *b = (uint32_t *)calloc(n_buckets / 16 + 1, sizeof(uint32_t));
+#else
+	uint8_t *b = (uint8_t *)calloc(n_buckets, sizeof(uint8_t));
+#endif
+	if (b == NULL) {
+		free(p);
+		return -1;
+	}
 	if (h->shadow->p) {
 		free(h->shadow->p);
 		free(h->shadow->b);
@@ -424,15 +451,12 @@ _mh(clear)(struct _mh(t) *h)
 	free(h->p);
 	free(h->b);
 	h->prime = 0;
-	h->n_buckets = __ac_prime_list[h->prime];
-	h->p = (mh_node_t *) calloc(h->n_buckets, sizeof(mh_node_t));
-#if !mh_bytemap
-	h->b = (uint32_t *) calloc(h->n_buckets / 16 + 1, sizeof(uint32_t));
-#else
-	h->b = (uint8_t *) calloc(h->n_buckets, sizeof(uint8_t));
-#endif
+	h->n_buckets = n_buckets;
+	h->p = p;
+	h->b = b;
 	h->size = 0;
 	h->upper_bound = h->n_buckets * MH_DENSITY;
+	return 0;
 }
 
 void
@@ -515,42 +539,47 @@ _mh(start_resize)(struct _mh(t) *h, mh_int_t buckets, mh_int_t batch,
 		/* hash size is already greater than requested */
 		return 0;
 	}
-	while (h->prime < __ac_HASH_PRIME_SIZE - 1) {
-		if (__ac_prime_list[h->prime] >= buckets)
+	mh_int_t new_prime = h->prime;
+	while (new_prime < __ac_HASH_PRIME_SIZE - 1) {
+		if (__ac_prime_list[new_prime] >= buckets)
 			break;
-		h->prime += 1;
+		new_prime += 1;
 	}
-
-	h->batch = batch > 0 ? batch : h->n_buckets / (256 * 1024);
-	if (h->batch < 256) {
+	mh_int_t new_batch = batch > 0 ? batch : h->n_buckets / (256 * 1024);
+	if (new_batch < 256) {
 		/*
 		 * Minimal batch must be greater or equal to
 		 * 1 / (1 - f), where f is upper bound percent
 		 * = MH_DENSITY
 		 */
-		h->batch = 256;
+		new_batch = 256;
 	}
 
-	struct _mh(t) *s = h->shadow;
-	memcpy(s, h, sizeof(*h));
-	s->resize_position = 0;
-	s->n_buckets = __ac_prime_list[h->prime];
-	s->upper_bound = s->n_buckets * MH_DENSITY;
-	s->n_dirty = 0;
-	s->size = 0;
-	s->p = (mh_node_t *) malloc(s->n_buckets * sizeof(mh_node_t));
-	if (s->p == NULL)
+	mh_int_t n_buckets = __ac_prime_list[new_prime];
+	mh_node_t *p = (mh_node_t *)malloc(n_buckets * sizeof(mh_node_t));
+	if (p == NULL)
 		return -1;
 #if !mh_bytemap
-	s->b = (uint32_t *) calloc(s->n_buckets / 16 + 1, sizeof(uint32_t));
+	uint32_t *b = (uint32_t *)calloc(n_buckets / 16 + 1, sizeof(uint32_t));
 #else
-	s->b = (uint8_t *) calloc(s->n_buckets, sizeof(uint8_t));
+	uint8_t *b = (uint8_t *)calloc(n_buckets, sizeof(uint8_t));
 #endif
-	if (s->b == NULL) {
-		free(s->p);
-		s->p = NULL;
+	if (b == NULL) {
+		free(p);
 		return -1;
 	}
+
+	h->prime = new_prime;
+	h->batch = new_batch;
+	struct _mh(t) *s = h->shadow;
+	memcpy(s, h, sizeof(*h));
+	s->resize_position = 0;
+	s->n_buckets = n_buckets;
+	s->upper_bound = s->n_buckets * MH_DENSITY;
+	s->n_dirty = 0;
+	s->size = 0;
+	s->p = p;
+	s->b = b;
 	_mh(resize)(h, arg);
 
 	return 0;
diff --git a/test/unit/mhash_body.c b/test/unit/mhash_body.c
index 458817fb1..324c72a43 100644
--- a/test/unit/mhash_body.c
+++ b/test/unit/mhash_body.c
@@ -23,7 +23,7 @@ h = init();
 destroy(h);
 
 h = init();
-clear(h);
+fail_unless(clear(h) == 0);
 
 /* access not yet initialized hash */
 clr(9);
@@ -59,7 +59,7 @@ tst(7);
 tst(8);
 tst(9);
 
-clear(h);
+fail_unless(clear(h) == 0);
 
 /* after clear no items should exist */
 clr(1);
-- 
2.20.1



* [Tarantool-patches] [PATCH v4 5/9] iproto: implement streams in iproto
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
                   ` (3 preceding siblings ...)
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 4/9] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches
@ 2021-08-12  9:50 ` mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 6/9] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
                   ` (4 subsequent siblings)
  9 siblings, 0 replies; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988

From: mechanik20051988 <mechanik20.05.1988@gmail.com>

Implement streams in iproto. There is a hash table of streams for
each connection. When a new request comes with a non-zero stream ID,
we look for the stream with that ID in this table and, if it does not
exist, we create it. The request is placed in the queue of pending
requests, and if this queue was empty at the time of its receipt, it
is pushed to the tx thread for processing. When a request belonging to
a stream returns to the network thread after processing is completed,
we take the next request out of the queue of pending requests and send
it to the tx thread for processing. If there are no pending requests,
we remove the stream object from the hash table and destroy it.
Requests with a zero stream ID are processed in the old way.
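
The queueing rule described above can be modelled in a few dozen lines.
The sketch below is a single-threaded simulation, not the iproto code:
a linked list stands in for the per-connection hash table, "pushing to
tx" is replaced by recording the request number in a log, and every
sketch_* name is hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

enum { SKETCH_LOG_MAX = 16 };

struct sketch_msg {
	uint64_t stream_id;
	int request_no;
	struct sketch_msg *next;
};

struct sketch_stream {
	uint64_t id;
	struct sketch_msg *head, *tail;	/* pending requests, FIFO */
	struct sketch_stream *next;
};

struct sketch_connection {
	struct sketch_stream *streams;	/* stands in for the hash table */
	int stream_count;
	int dispatched[SKETCH_LOG_MAX];	/* order of pushes to "tx" */
	int dispatched_count;
};

void
sketch_dispatch(struct sketch_connection *conn, struct sketch_msg *msg)
{
	assert(conn->dispatched_count < SKETCH_LOG_MAX);
	conn->dispatched[conn->dispatched_count++] = msg->request_no;
}

/* Return the link that points at the stream, or at NULL if absent. */
struct sketch_stream **
sketch_stream_find(struct sketch_connection *conn, uint64_t id)
{
	struct sketch_stream **link = &conn->streams;
	while (*link != NULL && (*link)->id != id)
		link = &(*link)->next;
	return link;
}

/* A request arrived from the network. Returns -1 on out of memory. */
int
sketch_on_request(struct sketch_connection *conn, struct sketch_msg *msg)
{
	msg->next = NULL;
	if (msg->stream_id == 0) {
		/* Not in a stream: processed the old way, no queueing. */
		sketch_dispatch(conn, msg);
		return 0;
	}
	struct sketch_stream **link = sketch_stream_find(conn, msg->stream_id);
	struct sketch_stream *stream = *link;
	if (stream == NULL) {
		stream = (struct sketch_stream *)calloc(1, sizeof(*stream));
		if (stream == NULL)
			return -1;
		stream->id = msg->stream_id;
		*link = stream;
		conn->stream_count++;
	}
	int was_empty = stream->head == NULL;
	if (was_empty)
		stream->head = msg;
	else
		stream->tail->next = msg;
	stream->tail = msg;
	/* Only the head of the queue is ever in flight. */
	if (was_empty)
		sketch_dispatch(conn, msg);
	return 0;
}

/* A request came back from "tx" after processing. */
void
sketch_on_done(struct sketch_connection *conn, struct sketch_msg *msg)
{
	if (msg->stream_id == 0)
		return;
	struct sketch_stream **link = sketch_stream_find(conn, msg->stream_id);
	struct sketch_stream *stream = *link;
	assert(stream != NULL && stream->head == msg);
	stream->head = msg->next;
	if (stream->head != NULL) {
		sketch_dispatch(conn, stream->head);
		return;
	}
	/* Queue drained: unlink the stream and destroy it. */
	*link = stream->next;
	free(stream);
	conn->stream_count--;
}
```

Feeding two requests for stream 1, one for stream 2 and one with a zero
stream ID dispatches three of them immediately. The second request of
stream 1 is dispatched only after sketch_on_done() is called for the
first one, which is the sequential guarantee the commit describes.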

Part of #5860

@TarantoolBot document
Title: streams are implemented in iproto
A distinctive feature of streams is that all requests in them
are processed sequentially. The execution of the next request
in a stream will not start until the previous one is completed.
To distinguish requests that belong to a stream from those that
do not, we use the stream ID field in the binary iproto protocol:
requests with a non-zero stream ID belong to some stream. A stream
ID is unique within the connection and indicates which stream the
request belongs to. For streams from different connections, the IDs
may be the same.
---
 src/box/errcode.h      |   1 +
 src/box/iproto.cc      | 225 ++++++++++++++++++++++++++++++++++++++++-
 src/lib/core/errinj.h  |   2 +
 test/box/errinj.result |   2 +
 test/box/error.result  |   1 +
 5 files changed, 226 insertions(+), 5 deletions(-)

diff --git a/src/box/errcode.h b/src/box/errcode.h
index ef2b2e9b1..f8fda23c1 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -281,6 +281,7 @@ struct errcode_record {
 	/*226 */_(ER_NOT_LEADER,		"The instance is not a leader. New leader is %u")\
 	/*227 */_(ER_SYNC_QUEUE_UNCLAIMED,	"The synchronous transaction queue doesn't belong to any instance")\
 	/*228 */_(ER_SYNC_QUEUE_FOREIGN,	"The synchronous transaction queue belongs to other instance with id %u")\
+	/*229 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index dcf60e1be..84dbdab40 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -64,6 +64,8 @@
 #include "execute.h"
 #include "errinj.h"
 #include "tt_static.h"
+#include "salad/stailq.h"
+#include "assoc.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -74,6 +76,21 @@ enum {
 	 ENDPOINT_NAME_MAX = 10
 };
 
+struct iproto_connection;
+
+struct iproto_stream {
+	/**
+	 * Queue of pending requests (iproto messages) for this stream,
+	 * processed sequentially. This field is accessible only from
+	 * iproto thread. Queue items have iproto_msg type.
+	 */
+	struct stailq pending_requests;
+	/** Id of this stream, used as a key in streams hash table */
+	uint64_t id;
+	/** This stream connection */
+	struct iproto_connection *connection;
+};
+
 /**
  * A position in connection output buffer.
  * Since we use rotating buffers to recycle memory,
@@ -136,6 +153,7 @@ struct iproto_thread {
 	 */
 	struct mempool iproto_msg_pool;
 	struct mempool iproto_connection_pool;
+	struct mempool iproto_stream_pool;
 	/*
 	 * List of stopped connections
 	 */
@@ -304,6 +322,16 @@ struct iproto_msg
 	 * and the connection must be closed.
 	 */
 	bool close_connection;
+	/**
+	 * A stailq_entry to hold message in stream.
+	 * Messages in a stream are processed sequentially. Each message
+	 * is first added to the queue of pending requests. If the queue
+	 * was empty, processing starts at once; otherwise the message
+	 * waits until all previous messages are processed.
+	 */
+	struct stailq_entry in_stream;
+	/** Stream that owns this message, or NULL. */
+	struct iproto_stream *stream;
 };
 
 static struct iproto_msg *
@@ -505,6 +533,11 @@ struct iproto_connection
 	 */
 	enum iproto_connection_state state;
 	struct rlist in_stop_list;
+	/**
+	 * Hash table that holds all streams for this connection.
+	 * This field is accessible only from the iproto thread.
+	 */
+	struct mh_i64ptr_t *streams;
 	/**
 	 * Kharon is used to implement box.session.push().
 	 * When a new push is ready, tx uses kharon to notify
@@ -572,6 +605,48 @@ struct iproto_connection
 } while (0);
 #endif
 
+/*
+ * TODO(gh-6293): Implement necessary statistic for iproto streams
+ * and remove it from errinj.
+ */
+static inline void
+errinj_stream_count_add(MAYBE_UNUSED int val)
+{
+#ifndef NDEBUG
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT);
+	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
+#endif
+}
+
+static inline void
+errinj_stream_msg_count_add(MAYBE_UNUSED int val)
+{
+#ifndef NDEBUG
+	struct errinj *inj =
+		errinj(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT);
+	__atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
+#endif
+}
+
+static struct iproto_stream *
+iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
+{
+	struct iproto_thread *iproto_thread = connection->iproto_thread;
+	struct iproto_stream *stream = (struct iproto_stream *)
+		mempool_alloc(&iproto_thread->iproto_stream_pool);
+	if (stream == NULL) {
+		diag_set(OutOfMemory, sizeof(*stream),
+			 "mempool_alloc", "stream");
+		return NULL;
+	}
+	errinj_stream_count_add(1);
+	stailq_create(&stream->pending_requests);
+	stream->id = stream_id;
+	stream->connection = connection;
+	return stream;
+}
+
 /**
  * Return true if we have not enough spare messages
  * in the message pool.
@@ -591,6 +666,14 @@ iproto_msg_delete(struct iproto_msg *msg)
 	iproto_resume(iproto_thread);
 }
 
+static void
+iproto_stream_delete(struct iproto_stream *stream)
+{
+	assert(stailq_empty(&stream->pending_requests));
+	errinj_stream_count_add(-1);
+	mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
+}
+
 static struct iproto_msg *
 iproto_msg_new(struct iproto_connection *con)
 {
@@ -609,6 +692,7 @@ iproto_msg_new(struct iproto_connection *con)
 	}
 	msg->close_connection = false;
 	msg->connection = con;
+	msg->stream = NULL;
 	rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
 	return msg;
 }
@@ -836,6 +920,63 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	return new_ibuf;
 }
 
+/**
+ * Check whether the message belongs to a stream (stream_id != 0).
+ * If so, find the stream in the connection's streams hash table or
+ * create a new one, then put the message on its pending list.
+ * @retval 0 - the message is ready to push to TX thread (either if
+ *             stream_id is not set (is zero) or the stream is not
+ *             processing other messages).
+ *         1 - the message is postponed because its stream is busy
+ *             processing previous message(s).
+ *        -1 - memory error.
+ */
+static int
+iproto_msg_start_processing_in_stream(struct iproto_msg *msg)
+{
+	uint64_t stream_id = msg->header.stream_id;
+	if (stream_id == 0)
+		return 0;
+
+	struct iproto_connection *con = msg->connection;
+	struct iproto_stream *stream = NULL;
+	mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0);
+	if (pos == mh_end(con->streams)) {
+		stream = iproto_stream_new(msg->connection, msg->header.stream_id);
+		if (stream == NULL)
+			return -1;
+		struct mh_i64ptr_node_t node;
+		node.key = stream_id;
+		node.val = stream;
+		pos = mh_i64ptr_put(con->streams, &node, NULL, NULL);
+		if (pos == mh_end(con->streams)) {
+			iproto_stream_delete(stream);
+			diag_set(OutOfMemory, pos + 1, "mh_streams_put",
+				 "mh_streams_node");
+			return -1;
+		}
+	}
+	/*
+	 * Not all messages belong to a stream. We can't tell which ones
+	 * do in `iproto_msg_new`, so we increment
+	 * ERRINJ_IPROTO_STREAM_MSG_COUNT here, once it is known. It is
+	 * decremented in `iproto_msg_finish_processing_in_stream`, only
+	 * if msg->stream != NULL.
+	 */
+	errinj_stream_msg_count_add(1);
+	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
+	msg->stream = stream;
+	/*
+	 * If the request queue in the stream is not empty, it means
+	 * that some previous message wasn't processed yet. Regardless
+	 * of this, we put the message in the queue, but we start processing
+	 * the message only if the message queue in the stream was empty.
+	 */
+	bool was_not_empty = !stailq_empty(&stream->pending_requests);
+	stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
+	return was_not_empty ? 1 : 0;
+}
+
 /**
  * Enqueue all requests which were read up. If a request limit is
  * reached - stop the connection input even if not the whole batch
@@ -845,7 +986,7 @@ iproto_connection_input_buffer(struct iproto_connection *con)
  * @param in Buffer to parse.
  *
  * @retval  0 Success.
- * @retval -1 Invalid MessagePack error.
+ * @retval -1 Invalid MessagePack or memory error.
  */
 static inline int
 iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
@@ -898,12 +1039,25 @@ err_msgpack:
 		msg->len = reqend - reqstart; /* total request length */
 
 		iproto_msg_decode(msg, &pos, reqend, &stop_input);
+
+		int rc = iproto_msg_start_processing_in_stream(msg);
+		if (rc < 0) {
+			iproto_msg_delete(msg);
+			return -1;
+		}
 		/*
-		 * This can't throw, but should not be
-		 * done in case of exception.
+		 * rc > 0 means that the stream's pending request queue
+		 * is not empty, so skip the push.
 		 */
-		cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
-		n_requests++;
+		if (rc == 0) {
+			/*
+			 * This can't throw, but should not be
+			 * done in case of exception.
+			 */
+			cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
+			n_requests++;
+		}
+
 		/* Request is parsed */
 		assert(reqend > reqstart);
 		assert(con->parse_size >= (size_t) (reqend - reqstart));
@@ -1145,6 +1299,13 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
 		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
 		return NULL;
 	}
+	con->streams = mh_i64ptr_new();
+	if (con->streams == NULL) {
+		diag_set(OutOfMemory, sizeof(*(con->streams)),
+			 "mh_streams_new", "streams");
+		mempool_free(&iproto_thread->iproto_connection_pool, con);
+		return NULL;
+	}
 	con->iproto_thread = iproto_thread;
 	con->input.data = con->output.data = con;
 	con->loop = loop();
@@ -1193,6 +1354,9 @@ iproto_connection_delete(struct iproto_connection *con)
 	       con->obuf[0].iov[0].iov_base == NULL);
 	assert(con->obuf[1].pos == 0 &&
 	       con->obuf[1].iov[0].iov_base == NULL);
+
+	assert(mh_size(con->streams) == 0);
+	mh_i64ptr_delete(con->streams);
 	mempool_free(&con->iproto_thread->iproto_connection_pool, con);
 }
 
@@ -1240,7 +1404,9 @@ static void
 iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		  bool *stop_input)
 {
+	uint64_t stream_id;
 	uint8_t type;
+	bool request_is_not_for_stream;
 	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
 
 	if (xrow_header_decode(&msg->header, pos, reqend, true))
@@ -1248,6 +1414,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 	assert(*pos == reqend);
 
 	type = msg->header.type;
+	stream_id = msg->header.stream_id;
+	request_is_not_for_stream =
+		((type > IPROTO_TYPE_STAT_MAX &&
+		 type != IPROTO_PING) || type == IPROTO_AUTH);
+
+	if (stream_id != 0 && request_is_not_for_stream) {
+		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
+			 iproto_type_name(type));
+		goto error;
+	}
 
 	/*
 	 * Parse request before putting it into the queue
@@ -1873,12 +2049,49 @@ tx_process_replication(struct cmsg *m)
 	}
 }
 
+static void
+iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
+{
+	struct iproto_connection *con = msg->connection;
+	struct iproto_stream *stream = msg->stream;
+
+	if (stream == NULL)
+		return;
+
+	struct iproto_msg *tmp =
+		stailq_shift_entry(&stream->pending_requests,
+				   struct iproto_msg, in_stream);
+	assert(tmp == msg);
+	(void)tmp;
+	errinj_stream_msg_count_add(-1);
+
+	if (stailq_empty(&stream->pending_requests)) {
+		struct mh_i64ptr_node_t node = { stream->id, NULL };
+		mh_i64ptr_remove(con->streams, &node, 0);
+		iproto_stream_delete(stream);
+	} else {
+		/*
+		 * If there are new messages for this stream
+		 * then schedule their processing.
+		 */
+		struct iproto_msg *next =
+			stailq_first_entry(&stream->pending_requests,
+					   struct iproto_msg,
+					   in_stream);
+		assert(next != NULL);
+		next->wpos = con->wpos;
+		cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
+		cpipe_flush_input(&con->iproto_thread->tx_pipe);
+	}
+}
+
 static void
 net_send_msg(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
 
+	iproto_msg_finish_processing_in_stream(msg);
 	if (msg->len != 0) {
 		/* Discard request (see iproto_enqueue_batch()). */
 		msg->p_ibuf->rpos += msg->len;
@@ -2066,6 +2279,8 @@ net_cord_f(va_list  ap)
 		       sizeof(struct iproto_msg));
 	mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
 		       sizeof(struct iproto_connection));
+	mempool_create(&iproto_thread->iproto_stream_pool, &cord()->slabc,
+		       sizeof(struct iproto_stream));
 
 	evio_service_init(loop(), &iproto_thread->binary, "binary",
 			  iproto_on_accept, iproto_thread);
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 51611f654..75caaed06 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -156,6 +156,8 @@ struct errinj {
 	_(ERRINJ_APPLIER_READ_TX_ROW_DELAY, ERRINJ_BOOL, {.bparam = false})\
 	_(ERRINJ_NETBOX_IO_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_REPLICASET_VCLOCK, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT, {.iparam = 0}) \
+	_(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT, {.iparam = 0}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index b7e5ec667..129b6e879 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -59,6 +59,8 @@ evals
   - ERRINJ_INDEX_ALLOC: false
   - ERRINJ_INDEX_RESERVE: false
   - ERRINJ_IPROTO_SINGLE_THREAD_STAT: -1
+  - ERRINJ_IPROTO_STREAM_COUNT: 0
+  - ERRINJ_IPROTO_STREAM_MSG_COUNT: 0
   - ERRINJ_IPROTO_TX_DELAY: false
   - ERRINJ_IPROTO_WRITE_ERROR_DELAY: false
   - ERRINJ_LOG_ROTATE: false
diff --git a/test/box/error.result b/test/box/error.result
index b7ac7a138..f80fdfed5 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -447,6 +447,7 @@ t;
  |   226: box.error.NOT_LEADER
  |   227: box.error.SYNC_QUEUE_UNCLAIMED
  |   228: box.error.SYNC_QUEUE_FOREIGN
+ |   229: box.error.UNABLE_TO_PROCESS_IN_STREAM
  | ...
 
 test_run:cmd("setopt delimiter ''");
-- 
2.20.1



* [Tarantool-patches] [PATCH v4 6/9] net.box: add stream support to net.box
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
                   ` (4 preceding siblings ...)
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 5/9] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches
@ 2021-08-12  9:50 ` mechanik20051988 via Tarantool-patches
  2021-08-12 17:49   ` Vladislav Shpilevoy via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 7/9] iproto: add RAFT prefix for all requests related to 'raft' mechanik20051988 via Tarantool-patches
                   ` (3 subsequent siblings)
  9 siblings, 1 reply; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988

From: mechanik20051988 <mechanik20.05.1988@gmail.com>

Add stream support to `net.box`. In "net.box", a stream is an
object over a connection that has the same methods, but all
requests from it are sent with a non-zero stream ID. Since there
can be a lot of streams, we do not copy the spaces from the
connection to the stream when the stream is created; we do it
only when a space is first accessed. Schema updates are also
handled lazily: each stream has its own schema_version. On every
access to a stream space, we compare the stream schema_version
with the connection schema_version; if they differ, we clear the
stream space cache and wrap the accessed space into the stream
cache.

Part of #5860

@TarantoolBot document
Title: stream support was added to net.box
In "net.box", a stream is an object over a connection that
has the same methods, but all requests from it are sent
with a non-zero stream ID. The stream ID is generated on the
client automatically. A simple example of stream creation
using net.box:
```lua
stream = conn:new_stream()
-- all connection methods are valid, but requests are sent
-- with a non-zero stream_id.
```
---
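On the wire, the only change to a request is one optional header key. The following is a minimal sketch of that header layout, not the net.box encoder: `sketch_encode_header` and the `SKETCH_*` names are hypothetical, the key values are assumptions (the real ones are defined in iproto_constants.h), and all values are restricted to MessagePack positive fixints to keep the example short. The fixed-size length prefix that precedes the header in a real packet is omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed key values, for illustration only. */
enum {
	SKETCH_REQUEST_TYPE = 0x00,
	SKETCH_SYNC = 0x01,
	SKETCH_STREAM_ID = 0x0a,
};

/*
 * Encode a request header as a MessagePack fixmap into buf.
 * Simplification: every value must fit a positive fixint (< 128).
 * The map has 3 entries when stream_id != 0 and 2 otherwise.
 * Returns the number of bytes written (at most 7).
 */
static size_t
sketch_encode_header(uint8_t *buf, uint8_t sync, uint8_t type,
		     uint8_t stream_id)
{
	assert(sync < 128 && type < 128 && stream_id < 128);
	size_t n = 0;
	uint8_t map_size = stream_id != 0 ? 3 : 2;
	buf[n++] = 0x80 | map_size; /* fixmap marker with entry count */
	buf[n++] = SKETCH_SYNC;
	buf[n++] = sync;
	buf[n++] = SKETCH_REQUEST_TYPE;
	buf[n++] = type;
	if (stream_id != 0) {
		/* The stream ID is sent only for stream requests. */
		buf[n++] = SKETCH_STREAM_ID;
		buf[n++] = stream_id;
	}
	return n;
}
```

A request outside any stream therefore keeps the two-entry header it had before this patch, which is why plain connections stay compatible with servers that know nothing about streams.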
 src/box/lua/net_box.c                         |  95 ++--
 src/box/lua/net_box.lua                       | 205 ++++++--
 test/box/access.result                        |   6 +-
 test/box/access.test.lua                      |   6 +-
 test/box/iproto_streams.lua                   |  13 +
 ...net.box_console_connections_gh-2677.result |   2 +-
 ...t.box_console_connections_gh-2677.test.lua |   2 +-
 .../net.box_incorrect_iterator_gh-841.result  |   4 +-
 ...net.box_incorrect_iterator_gh-841.test.lua |   4 +-
 test/box/net.box_iproto_hangs_gh-3464.result  |   2 +-
 .../box/net.box_iproto_hangs_gh-3464.test.lua |   2 +-
 test/box/net.box_iproto_streams.result        | 473 ++++++++++++++++++
 test/box/net.box_iproto_streams.test.lua      | 182 +++++++
 .../net.box_long-poll_input_gh-3400.result    |   8 +-
 .../net.box_long-poll_input_gh-3400.test.lua  |   8 +-
 test/box/suite.ini                            |   2 +-
 16 files changed, 922 insertions(+), 92 deletions(-)
 create mode 100644 test/box/iproto_streams.lua
 create mode 100644 test/box/net.box_iproto_streams.result
 create mode 100644 test/box/net.box_iproto_streams.test.lua

diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 06e574cdf..3bc49af23 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -328,7 +328,7 @@ netbox_registry_reset(struct netbox_registry *registry, struct error *error)
 
 static inline size_t
 netbox_begin_encode(struct mpstream *stream, uint64_t sync,
-		    enum iproto_type type)
+		    enum iproto_type type, uint64_t stream_id)
 {
 	/* Remember initial size of ibuf (see netbox_end_encode()) */
 	struct ibuf *ibuf = stream->ctx;
@@ -340,7 +340,7 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync,
 	mpstream_advance(stream, fixheader_size);
 
 	/* encode header */
-	mpstream_encode_map(stream, 2);
+	mpstream_encode_map(stream, stream_id != 0 ? 3 : 2);
 
 	mpstream_encode_uint(stream, IPROTO_SYNC);
 	mpstream_encode_uint(stream, sync);
@@ -348,6 +348,10 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync,
 	mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE);
 	mpstream_encode_uint(stream, type);
 
+	if (stream_id != 0) {
+		mpstream_encode_uint(stream, IPROTO_STREAM_ID);
+		mpstream_encode_uint(stream, stream_id);
+	}
 	/* Caller should remember how many bytes was used in ibuf */
 	return used;
 }
@@ -380,11 +384,11 @@ netbox_end_encode(struct mpstream *stream, size_t initial_size)
 
 static void
 netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
 	(void)L;
 	(void)idx;
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING, stream_id);
 	netbox_end_encode(stream, svp);
 }
 
@@ -402,7 +406,7 @@ netbox_encode_auth(struct ibuf *ibuf, uint64_t sync,
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      mpstream_error_handler, &is_error);
-	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH);
+	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH, 0);
 
 	/* Adapted from xrow_encode_auth() */
 	mpstream_encode_map(&stream, password != NULL ? 2 : 1);
@@ -432,7 +436,7 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      mpstream_error_handler, &is_error);
-	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT);
+	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT, 0);
 	mpstream_encode_map(&stream, 3);
 	mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
 	mpstream_encode_uint(&stream, space_id);
@@ -446,10 +450,10 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
 
 static void
 netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
-			uint64_t sync, enum iproto_type type)
+			uint64_t sync, enum iproto_type type, uint64_t stream_id)
 {
 	/* Lua stack at idx: function_name, args */
-	size_t svp = netbox_begin_encode(stream, sync, type);
+	size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -468,24 +472,25 @@ netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_call_16(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL_16);
+	netbox_encode_call_impl(L, idx, stream, sync,
+				IPROTO_CALL_16, stream_id);
 }
 
 static void
 netbox_encode_call(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL);
+	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL, stream_id);
 }
 
 static void
 netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: expr, args */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -504,10 +509,11 @@ netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, iterator, offset, limit, key */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT,
+					 stream_id);
 
 	mpstream_encode_map(stream, 6);
 
@@ -546,10 +552,11 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
-				uint64_t sync, enum iproto_type type)
+				uint64_t sync, enum iproto_type type,
+				uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, tuple */
-	size_t svp = netbox_begin_encode(stream, sync, type);
+	size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -567,24 +574,27 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_insert(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_INSERT);
+	netbox_encode_insert_or_replace(L, idx, stream, sync,
+					IPROTO_INSERT, stream_id);
 }
 
 static void
 netbox_encode_replace(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_REPLACE);
+	netbox_encode_insert_or_replace(L, idx, stream, sync,
+					IPROTO_REPLACE, stream_id);
 }
 
 static void
 netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, key */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE,
+					 stream_id);
 
 	mpstream_encode_map(stream, 3);
 
@@ -607,10 +617,11 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, key, ops */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE,
+					 stream_id);
 
 	mpstream_encode_map(stream, 5);
 
@@ -641,10 +652,11 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, tuple, ops */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT,
+					 stream_id);
 
 	mpstream_encode_map(stream, 4);
 
@@ -844,10 +856,11 @@ netbox_send_and_recv_console(int fd, struct ibuf *send_buf,
 
 static void
 netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query, parameters, options */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE,
+					 stream_id);
 
 	mpstream_encode_map(stream, 3);
 
@@ -873,10 +886,11 @@ netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE,
+					 stream_id);
 
 	mpstream_encode_map(stream, 1);
 
@@ -896,18 +910,19 @@ netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
-			uint64_t sync)
+			uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query, parameters, options */
-	netbox_encode_prepare(L, idx, stream, sync);
+	netbox_encode_prepare(L, idx, stream, sync, stream_id);
 }
 
 static void
 netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: bytes */
 	(void)sync;
+	(void)stream_id;
 	size_t len;
 	const char *data = lua_tolstring(L, idx, &len);
 	mpstream_memcpy(stream, data, len);
@@ -921,11 +936,11 @@ netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
  */
 static int
 netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
-		     struct ibuf *ibuf, uint64_t sync)
+		     struct ibuf *ibuf, uint64_t sync, uint64_t stream_id)
 {
 	typedef void (*method_encoder_f)(struct lua_State *L, int idx,
 					 struct mpstream *stream,
-					 uint64_t sync);
+					 uint64_t sync, uint64_t stream_id);
 	static method_encoder_f method_encoder[] = {
 		[NETBOX_PING]		= netbox_encode_ping,
 		[NETBOX_CALL_16]	= netbox_encode_call_16,
@@ -949,7 +964,7 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      luamp_error, L);
-	method_encoder[method](L, idx, &stream, sync);
+	method_encoder[method](L, idx, &stream, sync, stream_id);
 	return 0;
 }
 
@@ -1569,6 +1584,7 @@ netbox_new_registry(struct lua_State *L)
  *  - on_push: on_push trigger function
  *  - on_push_ctx: on_push trigger function argument
  *  - format: tuple format to use for decoding the body or nil
+ *  - stream_id: determines whether or not the request belongs to stream
  *  - ...: method-specific arguments passed to the encoder
  */
 static void
@@ -1581,7 +1597,8 @@ netbox_make_request(struct lua_State *L, int idx,
 	enum netbox_method method = lua_tointeger(L, idx + 4);
 	assert(method < netbox_method_MAX);
 	uint64_t sync = registry->next_sync++;
-	netbox_encode_method(L, idx + 8, method, send_buf, sync);
+	uint64_t stream_id = luaL_touint64(L, idx + 8);
+	netbox_encode_method(L, idx + 9, method, send_buf, sync, stream_id);
 
 	/* Initialize and register the request object. */
 	request->method = method;
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 8f5671c15..8d707fb26 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -275,14 +275,14 @@ local function create_transport(host, port, user, password, callback,
     -- @retval not nil Future object.
     --
     local function perform_async_request(buffer, skip_header, method, on_push,
-                                         on_push_ctx, format, ...)
+                                         on_push_ctx, format, stream_id, ...)
         local err = prepare_perform_request()
         if err then
             return nil, err
         end
         return perform_async_request_impl(requests, send_buf, buffer,
                                           skip_header, method, on_push,
-                                          on_push_ctx, format, ...)
+                                          on_push_ctx, format, stream_id, ...)
     end
 
     --
@@ -291,14 +291,15 @@ local function create_transport(host, port, user, password, callback,
     -- @retval not nil Response object.
     --
     local function perform_request(timeout, buffer, skip_header, method,
-                                   on_push, on_push_ctx, format, ...)
+                                   on_push, on_push_ctx, format,
+                                   stream_id, ...)
         local err = prepare_perform_request()
         if err then
             return nil, err
         end
         return perform_request_impl(timeout, requests, send_buf, buffer,
                                     skip_header, method, on_push, on_push_ctx,
-                                    format, ...)
+                                    format, stream_id, ...)
     end
 
     -- PROTOCOL STATE MACHINE (WORKER FIBER) --
@@ -487,6 +488,37 @@ local function remote_serialize(self)
     }
 end
 
+local function stream_serialize(self)
+    return {
+        host = self._conn.host,
+        port = self._conn.port,
+        opts = next(self._conn.opts) and self._conn.opts,
+        state = self._conn.state,
+        error = self._conn.error,
+        protocol = self._conn.protocol,
+        schema_version = self._conn.schema_version,
+        peer_uuid = self._conn.peer_uuid,
+        peer_version_id = self._conn.peer_version_id,
+        stream_id = self._stream_id
+    }
+end
+
+local function stream_spaces_serialize(self)
+    return self._stream._conn.space
+end
+
+local function stream_space_serialize(self)
+    return self._src
+end
+
+local function stream_indexes_serialize(self)
+    return self._space._src.index
+end
+
+local function stream_index_serialize(self)
+    return self._src
+end
+
 local remote_methods = {}
 local remote_mt = {
     __index = remote_methods, __serialize = remote_serialize,
@@ -499,6 +531,86 @@ local console_mt = {
     __metatable = false
 }
 
+-- Create a stream space index, which is the same as the connection
+-- space index, but has a non-zero stream ID.
+local function stream_wrap_index(stream_id, src)
+    return setmetatable({
+        _stream_id = stream_id,
+        _src = src,
+    }, {
+        __index = src,
+        __serialize = stream_index_serialize
+    })
+end
+
+-- Metatable for stream space indexes. When a stream space is
+-- created, there are no indexes in it. When accessing a space
+-- index, we look for the corresponding index in the corresponding
+-- connection space. If it is found, we create the same index for
+-- the stream space but with the corresponding stream ID. We do not
+-- need to compare stream _schema_version and connection
+-- schema_version, because all access to an index goes through its
+-- space. So we update schema_version when we access the space.
+local stream_indexes_mt = {
+    __index = function(self, key)
+        local _space = self._space
+        local src = _space._src.index[key]
+        if not src then
+            return nil
+        end
+        local res = stream_wrap_index(_space._stream_id, src)
+        self[key] = res
+        return res
+    end,
+    __serialize = stream_indexes_serialize
+}
+
+-- Create a stream space, which is the same as the connection space,
+-- but has a non-zero stream ID.
+local function stream_wrap_space(stream, src)
+    local res = setmetatable({
+        _stream_id = stream._stream_id,
+        _src = src,
+        index = setmetatable({
+            _space = nil,
+        }, stream_indexes_mt)
+    }, {
+        __index = src,
+        __serialize = stream_space_serialize
+    })
+    res.index._space = res
+    return res
+end
+
+-- Metatable for stream spaces. When a stream is created, there
+-- are no spaces in it. When the user accesses some space in the
+-- stream, we first compare _schema_version of the stream with
+-- schema_version of the connection; if they are not equal, we
+-- clear the stream space cache and update its schema_version. Then
+-- we look for the corresponding space in the connection. If it is
+-- found, we create the same space for the stream but with the
+-- corresponding stream ID.
+local stream_spaces_mt = {
+    __index = function(self, key)
+        local stream = self._stream
+        if stream._schema_version ~= stream._conn.schema_version then
+            stream._schema_version = stream._conn.schema_version
+            self._stream_space_cache = {}
+        end
+        if self._stream_space_cache[key] then
+            return self._stream_space_cache[key]
+        end
+        local src = stream._conn.space[key]
+        if not src then
+            return nil
+        end
+        local res = stream_wrap_space(stream, src)
+        self._stream_space_cache[key] = res
+        return res
+    end,
+    __serialize = stream_spaces_serialize
+}
+
 local space_metatable, index_metatable
 
 local function new_sm(host, port, opts, connection, greeting)
@@ -578,6 +690,8 @@ local function new_sm(host, port, opts, connection, greeting)
     if opts.wait_connected ~= false then
         remote._transport.wait_state('active', tonumber(opts.wait_connected))
     end
+    -- Last stream ID used for this connection
+    remote._last_stream_id = 0
     return remote
 end
 
@@ -635,6 +749,28 @@ local function check_eval_args(args)
     end
 end
 
+local function stream_new_stream(stream)
+    check_remote_arg(stream, 'new_stream')
+    return stream._conn:new_stream()
+end
+
+function remote_methods:new_stream()
+    check_remote_arg(self, 'new_stream')
+    self._last_stream_id = self._last_stream_id + 1
+    local stream = setmetatable({
+        new_stream = stream_new_stream,
+        _stream_id = self._last_stream_id,
+        space = setmetatable({
+            _stream_space_cache = {},
+            _stream = nil,
+        }, stream_spaces_mt),
+        _conn = self,
+        _schema_version = self.schema_version,
+    }, { __index = self, __serialize = stream_serialize })
+    stream.space._stream = stream
+    return stream
+end
+
 function remote_methods:close()
     check_remote_arg(self, 'close')
     self._transport.stop()
@@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout)
     return self._transport.wait_state('active', timeout)
 end
 
-function remote_methods:_request(method, opts, format, ...)
+function remote_methods:_request(method, opts, format, stream_id, ...)
     local transport = self._transport
     local on_push, on_push_ctx, buffer, skip_header, deadline
     -- Extract options, set defaults, check if the request is
@@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...)
             local res, err =
                 transport.perform_async_request(buffer, skip_header, method,
                                                 table.insert, {}, format,
-                                                ...)
+                                                stream_id, ...)
             if err then
                 box.error(err)
             end
@@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...)
     end
     local res, err = transport.perform_request(timeout, buffer, skip_header,
                                                method, on_push, on_push_ctx,
-                                               format, ...)
+                                               format, stream_id, ...)
     if err then
         box.error(err)
     end
@@ -718,7 +854,7 @@ end
 
 function remote_methods:ping(opts)
     check_remote_arg(self, 'ping')
-    return (pcall(self._request, self, M_PING, opts))
+    return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
 end
 
 function remote_methods:reload_schema()
@@ -729,14 +865,16 @@ end
 -- @deprecated since 1.7.4
 function remote_methods:call_16(func_name, ...)
     check_remote_arg(self, 'call')
-    return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
+    return (self:_request(M_CALL_16, nil, nil, self._stream_id,
+                          tostring(func_name), {...}))
 end
 
 function remote_methods:call(func_name, args, opts)
     check_remote_arg(self, 'call')
     check_call_args(args)
     args = args or {}
-    local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
+    local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
+                              tostring(func_name), args)
     if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
@@ -746,14 +884,15 @@ end
 -- @deprecated since 1.7.4
 function remote_methods:eval_16(code, ...)
     check_remote_arg(self, 'eval')
-    return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
+    return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
+                                 code, {...})))
 end
 
 function remote_methods:eval(code, args, opts)
     check_remote_arg(self, 'eval')
     check_eval_args(args)
     args = args or {}
-    local res = self:_request(M_EVAL, opts, nil, code, args)
+    local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
     if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
@@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "execute", "options")
     end
-    return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
-                         sql_opts or {})
+    return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
+                         query, parameters or {}, sql_opts or {})
 end
 
 function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
@@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "prepare", "options")
     end
-    return self:_request(M_PREPARE, netbox_opts, nil, query)
+    return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
 end
 
 function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
@@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "unprepare", "options")
     end
-    return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
-                         sql_opts or {})
+    return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
+                         query, parameters or {}, sql_opts or {})
 end
 
 function remote_methods:wait_state(state, timeout)
@@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
+        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
                       {line})
     else
         assert(self.protocol == 'Lua console')
-        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
+        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
                       line..'$EOF$\n')
     end
     if err then
@@ -951,14 +1090,14 @@ space_metatable = function(remote)
 
     function methods:insert(tuple, opts)
         check_space_arg(self, 'insert')
-        return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
-                               tuple)
+        return remote:_request(M_INSERT, opts, self._format_cdata,
+                               self._stream_id, self.id, tuple)
     end
 
     function methods:replace(tuple, opts)
         check_space_arg(self, 'replace')
-        return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
-                               tuple)
+        return remote:_request(M_REPLACE, opts, self._format_cdata,
+                               self._stream_id, self.id, tuple)
     end
 
     function methods:select(key, opts)
@@ -978,7 +1117,8 @@ space_metatable = function(remote)
 
     function methods:upsert(key, oplist, opts)
         check_space_arg(self, 'upsert')
-        return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
+        return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
+                                               self._stream_id, self.id,
                                                key, oplist))
     end
 
@@ -1009,8 +1149,8 @@ index_metatable = function(remote)
         local offset = tonumber(opts and opts.offset) or 0
         local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
         return (remote:_request(M_SELECT, opts, self.space._format_cdata,
-                                self.space.id, self.id, iterator, offset,
-                                limit, key))
+                                self._stream_id, self.space.id, self.id,
+                                iterator, offset, limit, key))
     end
 
     function methods:get(key, opts)
@@ -1020,6 +1160,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_GET, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.EQ, 0, 2, key))
     end
@@ -1031,6 +1172,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_MIN, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.GE, 0, 1, key))
     end
@@ -1042,6 +1184,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_MAX, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.LE, 0, 1, key))
     end
@@ -1053,22 +1196,24 @@ index_metatable = function(remote)
         end
         local code = string.format('box.space.%s.index.%s:count',
                                    self.space.name, self.name)
-        return remote:_request(M_COUNT, opts, nil, code, { key, opts })
+        return remote:_request(M_COUNT, opts, nil, self._stream_id,
+                               code, { key, opts })
     end
 
     function methods:delete(key, opts)
         check_index_arg(self, 'delete')
         return nothing_or_data(remote:_request(M_DELETE, opts,
                                                self.space._format_cdata,
-                                               self.space.id, self.id, key))
+                                               self._stream_id, self.space.id,
+                                               self.id, key))
     end
 
     function methods:update(key, oplist, opts)
         check_index_arg(self, 'update')
         return nothing_or_data(remote:_request(M_UPDATE, opts,
                                                self.space._format_cdata,
-                                               self.space.id, self.id, key,
-                                               oplist))
+                                               self._stream_id, self.space.id,
+                                               self.id, key, oplist))
     end
 
     return { __index = methods, __metatable = false }
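The lazy space cache that stream_spaces_mt adds in the net_box.lua hunks above can be hard to follow from the diff alone. Below is a minimal plain-Lua sketch of the same idea, assuming nothing from Tarantool: `conn`, `wrap_space`, `new_stream` and the `_cache` field are hypothetical stand-ins for illustration, not the names or the code used by the patch.

```lua
-- Hypothetical stand-in for a net.box connection: only a schema
-- version and a table of spaces, no network involved.
local conn = { schema_version = 1, space = {} }

-- Wrap a connection space so that it carries the stream ID and
-- falls back to the original space for everything else.
local function wrap_space(stream, src)
    return setmetatable({ _stream_id = stream._stream_id },
                        { __index = src })
end

local spaces_mt = {
    __index = function(self, key)
        local stream = self._stream
        -- Drop the cache when the connection schema has changed.
        if stream._schema_version ~= stream._conn.schema_version then
            stream._schema_version = stream._conn.schema_version
            self._cache = {}
        end
        if self._cache[key] then
            return self._cache[key]
        end
        local src = stream._conn.space[key]
        if not src then
            return nil
        end
        local res = wrap_space(stream, src)
        self._cache[key] = res
        return res
    end,
}

local function new_stream(connection, id)
    local stream = {
        _stream_id = id,
        _conn = connection,
        _schema_version = connection.schema_version,
    }
    stream.space = setmetatable({ _stream = stream, _cache = {} },
                                spaces_mt)
    return stream
end

local stream = new_stream(conn, 1)
-- The space does not exist yet, neither in the connection nor in the stream.
assert(stream.space.test == nil)

-- Simulate what reload_schema() would do on the connection.
conn.space.test = { name = 'test' }
conn.schema_version = 2

-- The first access notices the version change and wraps the space.
local s = stream.space.test
assert(s.name == 'test' and s._stream_id == 1)
assert(stream._schema_version == 2)
-- The second access is served from the cache.
assert(stream.space.test == s)
```

The point of the sketch is that the stream never copies the schema eagerly: a space is wrapped only on first access, and a schema version mismatch is the only event that invalidates previously wrapped spaces.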
diff --git a/test/box/access.result b/test/box/access.result
index 712cd68f8..6434da907 100644
--- a/test/box/access.result
+++ b/test/box/access.result
@@ -908,15 +908,15 @@ LISTEN = require('uri').parse(box.cfg.listen)
 c = net.connect(LISTEN.host, LISTEN.service)
 ---
 ...
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '1' does not exist
 ...
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '65537' does not exist
 ...
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '4294967295' does not exist
 ...
diff --git a/test/box/access.test.lua b/test/box/access.test.lua
index 6060475d1..6abdb780d 100644
--- a/test/box/access.test.lua
+++ b/test/box/access.test.lua
@@ -351,9 +351,9 @@ box.schema.func.drop(name)
 -- very large space id, no crash occurs.
 LISTEN = require('uri').parse(box.cfg.listen)
 c = net.connect(LISTEN.host, LISTEN.service)
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 c:close()
 
 session = box.session
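Every direct caller of `_request` and `perform_request` in the tests gains one extra `nil` because `stream_id` is now a positional parameter placed right before the varargs. A small plain-Lua sketch of the effect follows; `request` here is a hypothetical stand-in that only reports how its arguments are bound, not the real net.box method.

```lua
-- Hypothetical stand-in with the same parameter order as the patched
-- _request(method, opts, format, stream_id, ...). It sends nothing.
local function request(method, opts, format, stream_id, ...)
    return {
        method = method,
        stream_id = stream_id,
        nargs = select('#', ...),
        first_arg = (select(1, ...)),
    }
end

-- Old-style call: the space id 512 is silently bound to stream_id.
local old = request('select', nil, nil, 512, 'EQ')
assert(old.stream_id == 512 and old.first_arg == 'EQ')

-- Updated call: an explicit nil keeps the request outside any stream
-- and leaves the space id as the first request argument.
local new = request('select', nil, nil, nil, 512, 'EQ')
assert(new.stream_id == nil and new.first_arg == 512)
```

Callers that are not updated would not fail loudly in this model; their first request argument would simply be taken as the stream ID, which is why the test changes above are mechanical but necessary.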
diff --git a/test/box/iproto_streams.lua b/test/box/iproto_streams.lua
new file mode 100644
index 000000000..db6a29a8a
--- /dev/null
+++ b/test/box/iproto_streams.lua
@@ -0,0 +1,13 @@
+#!/usr/bin/env tarantool
+
+require('console').listen(os.getenv('ADMIN'))
+
+local memtx_use_mvcc_engine = arg[2] == 'true'
+
+box.cfg({
+    listen = os.getenv('LISTEN'),
+    iproto_threads = tonumber(arg[1]),
+    memtx_use_mvcc_engine = memtx_use_mvcc_engine
+})
+
+box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe', nil, {if_not_exists = true})
diff --git a/test/box/net.box_console_connections_gh-2677.result b/test/box/net.box_console_connections_gh-2677.result
index f45aa0b56..7cea0a1da 100644
--- a/test/box/net.box_console_connections_gh-2677.result
+++ b/test/box/net.box_console_connections_gh-2677.result
@@ -74,7 +74,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
 ---
 ...
 while not c:is_connected() do fiber.sleep(0.01) end
diff --git a/test/box/net.box_console_connections_gh-2677.test.lua b/test/box/net.box_console_connections_gh-2677.test.lua
index 40d099e70..6c4e6ea4f 100644
--- a/test/box/net.box_console_connections_gh-2677.test.lua
+++ b/test/box/net.box_console_connections_gh-2677.test.lua
@@ -30,7 +30,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
 while not c:is_connected() do fiber.sleep(0.01) end
 c:ping()
 
diff --git a/test/box/net.box_incorrect_iterator_gh-841.result b/test/box/net.box_incorrect_iterator_gh-841.result
index fbd2a7700..cd2a86787 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.result
+++ b/test/box/net.box_incorrect_iterator_gh-841.result
@@ -16,13 +16,13 @@ test_run:cmd("setopt delimiter ';'")
 - true
 ...
 function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
-    local ret = cn:_request(remote._method.select, opts, nil, space_id,
+    local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
                             index_id, iterator, offset, limit, key)
     return ret
 end
 function x_fatal(cn)
     cn._transport.perform_request(nil, nil, false, remote._method.inject,
-                                  nil, nil, nil, '\x80')
+                                  nil, nil, nil, nil, '\x80')
 end
 test_run:cmd("setopt delimiter ''");
 ---
diff --git a/test/box/net.box_incorrect_iterator_gh-841.test.lua b/test/box/net.box_incorrect_iterator_gh-841.test.lua
index 1d24f9f56..9c42175ef 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.test.lua
+++ b/test/box/net.box_incorrect_iterator_gh-841.test.lua
@@ -5,13 +5,13 @@ test_run:cmd("push filter ".."'\\.lua.*:[0-9]+: ' to '.lua...\"]:<line>: '")
 
 test_run:cmd("setopt delimiter ';'")
 function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
-    local ret = cn:_request(remote._method.select, opts, nil, space_id,
+    local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
                             index_id, iterator, offset, limit, key)
     return ret
 end
 function x_fatal(cn)
     cn._transport.perform_request(nil, nil, false, remote._method.inject,
-                                  nil, nil, nil, '\x80')
+                                  nil, nil, nil, nil, '\x80')
 end
 test_run:cmd("setopt delimiter ''");
 
diff --git a/test/box/net.box_iproto_hangs_gh-3464.result b/test/box/net.box_iproto_hangs_gh-3464.result
index 3b5458c9a..cbf8181b3 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.result
+++ b/test/box/net.box_iproto_hangs_gh-3464.result
@@ -17,7 +17,7 @@ c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
 ---
 ...
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
 ---
 - null
 - Peer closed
diff --git a/test/box/net.box_iproto_hangs_gh-3464.test.lua b/test/box/net.box_iproto_hangs_gh-3464.test.lua
index a7c41ae76..51a9ddece 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.test.lua
+++ b/test/box/net.box_iproto_hangs_gh-3464.test.lua
@@ -8,6 +8,6 @@ net = require('net.box')
 --
 c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
 c:close()
 test_run:grep_log('default', 'too big packet size in the header') ~= nil
diff --git a/test/box/net.box_iproto_streams.result b/test/box/net.box_iproto_streams.result
new file mode 100644
index 000000000..051dcc4fd
--- /dev/null
+++ b/test/box/net.box_iproto_streams.result
@@ -0,0 +1,473 @@
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+---
+...
+fiber = require('fiber')
+---
+...
+test_run = require('test_run').new()
+---
+...
+test_run:cmd("create server test with script='box/iproto_streams.lua'")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+---
+...
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+conn_1 = net_box.connect(server_addr)
+---
+...
+stream_1 = conn_1:new_stream()
+---
+...
+conn_2 = net_box.connect(server_addr)
+---
+...
+stream_2 = conn_2:new_stream()
+---
+...
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+---
+...
+assert(not stream_1:ping())
+---
+- true
+...
+stream_2:close()
+---
+...
+assert(not conn_2:ping())
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream = conn:new_stream()
+---
+...
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+---
+...
+conn:close()
+---
+...
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+---
+...
+stream = conn:new_stream()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:switch("default")
+---
+- true
+...
+assert(not conn.space.test)
+---
+- true
+...
+assert(not stream.space.test)
+---
+- true
+...
+assert(conn.schema_version == stream._schema_version)
+---
+- true
+...
+conn:reload_schema()
+---
+...
+assert(conn.space.test ~= nil)
+---
+- true
+...
+assert(conn.schema_version ~= stream._schema_version)
+---
+- true
+...
+assert(stream.space.test ~= nil)
+---
+- true
+...
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+---
+- true
+...
+collectgarbage()
+---
+- 0
+...
+collectgarbage()
+---
+- 0
+...
+assert(conn.space.test ~= nil)
+---
+- true
+...
+assert(stream.space.test ~= nil)
+---
+- true
+...
+test_run:switch("test")
+---
+- true
+...
+s:drop()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+conn:reload_schema()
+---
+...
+assert(not conn.space.test)
+---
+- true
+...
+assert(not stream.space.test)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- All test works with iproto_thread count = 10
+test_run:cmd("start server test with args='10'")
+---
+- true
+...
+test_run:switch('test')
+---
+- true
+...
+fiber = require('fiber')
+---
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function replace_with_yeild(item)
+    fiber.sleep(0.1)
+    return s:replace({item})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+conn_space = conn.space.test
+---
+...
+stream = conn:new_stream()
+---
+...
+stream_space = stream.space.test
+---
+...
+-- Check that all requests in stream processed consistently
+futures = {}
+---
+...
+replace_count = 3
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i = 1, replace_count do
+    futures[string.format("replace_%d", i)] =
+        stream_space:replace({i}, {is_async = true})
+    futures[string.format("select_%d", i)] =
+        stream_space:select({}, {is_async = true})
+end;
+---
+...
+futures["replace_with_yeild_for_stream"] =
+    stream:call("replace_with_yeild",
+                { replace_count + 1 }, {is_async = true});
+---
+...
+futures["select_with_yeild_for_stream"] =
+    stream_space:select({}, {is_async = true});
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+results = wait_and_return_results(futures)
+---
+...
+-- [1]
+assert(results["select_1"])
+---
+- - [1]
+...
+-- [1] [2]
+assert(results["select_2"])
+---
+- - [1]
+  - [2]
+...
+-- [1] [2] [3]
+assert(results["select_3"])
+---
+- - [1]
+  - [2]
+  - [3]
+...
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+...
+-- There is no request execution order for the connection
+futures = {}
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+futures["replace_with_yeild_for_connection"] =
+    conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+---
+...
+futures["select_with_yeild_for_connection"] =
+    conn_space:select({}, {is_async = true});
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+results = wait_and_return_results(futures)
+---
+...
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+...
+test_run:switch("test")
+---
+- true
+...
+-- [1] [2] [3] [4] [5]
+s:select()
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5]
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+---
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+replace_count = 20
+for i = 1, replace_count do
+    space:replace({i}, {is_async = true})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Give time to send
+fiber.sleep(0)
+---
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:switch("test")
+---
+- true
+...
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5]
+  - [6]
+  - [7]
+  - [8]
+  - [9]
+  - [10]
+  - [11]
+  - [12]
+  - [13]
+  - [14]
+  - [15]
+  - [16]
+  - [17]
+  - [18]
+  - [19]
+  - [20]
+...
+s:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
+...
+test_run:cmd("delete server test")
+---
+- true
+...
diff --git a/test/box/net.box_iproto_streams.test.lua b/test/box/net.box_iproto_streams.test.lua
new file mode 100644
index 000000000..ae878e42e
--- /dev/null
+++ b/test/box/net.box_iproto_streams.test.lua
@@ -0,0 +1,182 @@
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+fiber = require('fiber')
+test_run = require('test_run').new()
+
+test_run:cmd("create server test with script='box/iproto_streams.lua'")
+
+test_run:cmd("setopt delimiter ';'")
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn_1 = net_box.connect(server_addr)
+stream_1 = conn_1:new_stream()
+conn_2 = net_box.connect(server_addr)
+stream_2 = conn_2:new_stream()
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+assert(not stream_1:ping())
+stream_2:close()
+assert(not conn_2:ping())
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+conn:close()
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+test_run:switch("test")
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch("default")
+assert(not conn.space.test)
+assert(not stream.space.test)
+assert(conn.schema_version == stream._schema_version)
+conn:reload_schema()
+assert(conn.space.test ~= nil)
+assert(conn.schema_version ~= stream._schema_version)
+assert(stream.space.test ~= nil)
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+collectgarbage()
+collectgarbage()
+assert(conn.space.test ~= nil)
+assert(stream.space.test ~= nil)
+test_run:switch("test")
+s:drop()
+test_run:switch("default")
+conn:reload_schema()
+assert(not conn.space.test)
+assert(not stream.space.test)
+test_run:cmd("stop server test")
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+test_run:switch('test')
+fiber = require('fiber')
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:cmd("setopt delimiter ';'")
+function replace_with_yeild(item)
+    fiber.sleep(0.1)
+    return s:replace({item})
+end;
+test_run:cmd("setopt delimiter ''");
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+conn_space = conn.space.test
+stream = conn:new_stream()
+stream_space = stream.space.test
+
+-- Check that all requests in stream processed consistently
+futures = {}
+replace_count = 3
+test_run:cmd("setopt delimiter ';'")
+for i = 1, replace_count do
+    futures[string.format("replace_%d", i)] =
+        stream_space:replace({i}, {is_async = true})
+    futures[string.format("select_%d", i)] =
+        stream_space:select({}, {is_async = true})
+end;
+futures["replace_with_yeild_for_stream"] =
+    stream:call("replace_with_yeild",
+                { replace_count + 1 }, {is_async = true});
+futures["select_with_yeild_for_stream"] =
+    stream_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1]
+assert(results["select_1"])
+-- [1] [2]
+assert(results["select_2"])
+-- [1] [2] [3]
+assert(results["select_3"])
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+
+-- There is no request execution order for the connection
+futures = {}
+test_run:cmd("setopt delimiter ';'")
+futures["replace_with_yeild_for_connection"] =
+    conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+futures["select_with_yeild_for_connection"] =
+    conn_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+test_run:switch("test")
+-- [1] [2] [3] [4] [5]
+s:select()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+space = stream.space.test
+test_run:cmd("setopt delimiter ';'")
+replace_count = 20
+for i = 1, replace_count do
+    space:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+-- Give time to send
+fiber.sleep(0)
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:switch("test")
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:cmd("stop server test")
+
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/net.box_long-poll_input_gh-3400.result b/test/box/net.box_long-poll_input_gh-3400.result
index a16110ee6..a98eea655 100644
--- a/test/box/net.box_long-poll_input_gh-3400.result
+++ b/test/box/net.box_long-poll_input_gh-3400.result
@@ -24,10 +24,10 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, false,    \
-                 net._method.call_17, nil, nil, nil, 'long', {})   \
-c._transport.perform_request(nil, nil, false, net._method.inject,  \
-                             nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false,         \
+                 net._method.call_17, nil, nil, nil, nil, 'long', {})   \
+c._transport.perform_request(nil, nil, false, net._method.inject,       \
+                             nil, nil, nil, nil, '\x80')
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/net.box_long-poll_input_gh-3400.test.lua b/test/box/net.box_long-poll_input_gh-3400.test.lua
index 891b59224..a6f302ee0 100644
--- a/test/box/net.box_long-poll_input_gh-3400.test.lua
+++ b/test/box/net.box_long-poll_input_gh-3400.test.lua
@@ -14,9 +14,9 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, false,    \
-                 net._method.call_17, nil, nil, nil, 'long', {})   \
-c._transport.perform_request(nil, nil, false, net._method.inject,  \
-                             nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false,         \
+                 net._method.call_17, nil, nil, nil, nil, 'long', {})   \
+c._transport.perform_request(nil, nil, false, net._method.inject,       \
+                             nil, nil, nil, nil, '\x80')
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 c:close()
diff --git a/test/box/suite.ini b/test/box/suite.ini
index b5d869fb3..637766cdd 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
 disabled = rtree_errinj.test.lua tuple_bench.test.lua
 long_run = huge_field_map_long.test.lua
 config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua net.box_iproto_streams.test.lua
 lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
 use_unix_sockets = True
 use_unix_sockets_iproto = True
-- 
2.20.1


^ permalink raw reply	[flat|nested] 15+ messages in thread

* [Tarantool-patches] [PATCH v4 7/9] iproto: add RAFT prefix for all requests related to 'raft'.
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
                   ` (5 preceding siblings ...)
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 6/9] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
@ 2021-08-12  9:50 ` mechanik20051988 via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 8/9] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
                   ` (2 subsequent siblings)
  9 siblings, 0 replies; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy

Adding interactive transactions over iproto streams requires
new request types to begin, commit and roll back a transaction.
The type names of these new requests conflict with the existing
names of the 'raft' requests. Adding the RAFT prefix to all requests
related to 'raft' resolves this problem.

Part of #5860

@TarantoolBot document
Title: add RAFT prefix for all requests related to 'raft'.
Rename IPROTO_PROMOTE, IPROTO_DEMOTE, IPROTO_CONFIRM and
IPROTO_ROLLBACK to IPROTO_RAFT_PROMOTE, IPROTO_RAFT_DEMOTE,
IPROTO_RAFT_CONFIRM and IPROTO_RAFT_ROLLBACK respectively.
---
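Note for readers (below the cut line, so not part of the commit message):
the naming conflict described above can be shown with a minimal standalone
sketch. It reuses the numeric codes from this series (31, 32, 40 and 41 for
the synchro requests; 14, 15 and 16 for the stream requests added in patch
8/9). The enum name `sketch_iproto_type` is an assumption made for
illustration and does not exist in the tree; the predicate is copied from
the patched iproto_constants.h.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Illustrative subset of request codes; values are taken from this series. */
enum sketch_iproto_type {
	/* Stream transaction requests (added in patch 8/9). */
	IPROTO_BEGIN = 14,
	IPROTO_COMMIT = 15,
	IPROTO_ROLLBACK = 16,
	/* Synchronous replication requests, now carrying the RAFT prefix. */
	IPROTO_RAFT_PROMOTE = 31,
	IPROTO_RAFT_DEMOTE = 32,
	IPROTO_RAFT_CONFIRM = 40,
	IPROTO_RAFT_ROLLBACK = 41,
};

/*
 * Without the prefix both rollback requests would need the same
 * enumerator name while having different codes.
 */
static_assert(IPROTO_ROLLBACK != IPROTO_RAFT_ROLLBACK,
	      "stream and synchro rollback are distinct requests");

/* Same predicate as in the patched header. */
static inline bool
iproto_type_is_synchro_request(uint16_t type)
{
	return type == IPROTO_RAFT_CONFIRM || type == IPROTO_RAFT_ROLLBACK ||
	       type == IPROTO_RAFT_PROMOTE || type == IPROTO_RAFT_DEMOTE;
}
```

With the prefix in place, the stream-level IPROTO_ROLLBACK (16) is not
classified as a synchro request, while IPROTO_RAFT_ROLLBACK (41) still is.
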
 src/box/box.cc             |  4 ++--
 src/box/iproto_constants.h | 22 +++++++++++-----------
 src/box/memtx_engine.c     |  4 ++--
 src/box/txn.c              |  2 +-
 src/box/txn_limbo.c        | 18 +++++++++---------
 src/box/xrow.h             |  4 ++--
 6 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index eb9baa4b8..ead02c58c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1669,7 +1669,7 @@ box_issue_promote(uint32_t prev_leader_id, int64_t promote_lsn)
 	txn_limbo_write_promote(&txn_limbo, promote_lsn,
 				raft->term);
 	struct synchro_request req = {
-		.type = IPROTO_PROMOTE,
+		.type = IPROTO_RAFT_PROMOTE,
 		.replica_id = prev_leader_id,
 		.origin_id = instance_id,
 		.lsn = promote_lsn,
@@ -1691,7 +1691,7 @@ box_issue_demote(uint32_t prev_leader_id, int64_t promote_lsn)
 	txn_limbo_write_demote(&txn_limbo, promote_lsn,
 				box_raft()->term);
 	struct synchro_request req = {
-		.type = IPROTO_DEMOTE,
+		.type = IPROTO_RAFT_DEMOTE,
 		.replica_id = prev_leader_id,
 		.origin_id = instance_id,
 		.lsn = promote_lsn,
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index b9498868c..8792737b2 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -242,14 +242,14 @@ enum iproto_type {
 
 	IPROTO_RAFT = 30,
 	/** PROMOTE request. */
-	IPROTO_PROMOTE = 31,
+	IPROTO_RAFT_PROMOTE = 31,
 	/** DEMOTE request. */
-	IPROTO_DEMOTE = 32,
+	IPROTO_RAFT_DEMOTE = 32,
 
 	/** A confirmation message for synchronous transactions. */
-	IPROTO_CONFIRM = 40,
+	IPROTO_RAFT_CONFIRM = 40,
 	/** A rollback message for synchronous transactions. */
-	IPROTO_ROLLBACK = 41,
+	IPROTO_RAFT_ROLLBACK = 41,
 
 	/** PING request */
 	IPROTO_PING = 64,
@@ -314,13 +314,13 @@ iproto_type_name(uint16_t type)
 	switch (type) {
 	case IPROTO_RAFT:
 		return "RAFT";
-	case IPROTO_PROMOTE:
+	case IPROTO_RAFT_PROMOTE:
 		return "PROMOTE";
-	case IPROTO_DEMOTE:
+	case IPROTO_RAFT_DEMOTE:
 		return "DEMOTE";
-	case IPROTO_CONFIRM:
+	case IPROTO_RAFT_CONFIRM:
 		return "CONFIRM";
-	case IPROTO_ROLLBACK:
+	case IPROTO_RAFT_ROLLBACK:
 		return "ROLLBACK";
 	case VY_INDEX_RUN_INFO:
 		return "RUNINFO";
@@ -371,15 +371,15 @@ dml_request_key_map(uint16_t type)
 static inline bool
 iproto_type_is_synchro_request(uint16_t type)
 {
-	return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK ||
-	       type == IPROTO_PROMOTE || type == IPROTO_DEMOTE;
+	return type == IPROTO_RAFT_CONFIRM || type == IPROTO_RAFT_ROLLBACK ||
+	       type == IPROTO_RAFT_PROMOTE || type == IPROTO_RAFT_DEMOTE;
 }
 
 /** PROMOTE/DEMOTE entry (synchronous replication and leader elections). */
 static inline bool
 iproto_type_is_promote_request(uint32_t type)
 {
-       return type == IPROTO_PROMOTE || type == IPROTO_DEMOTE;
+       return type == IPROTO_RAFT_PROMOTE || type == IPROTO_RAFT_DEMOTE;
 }
 
 static inline bool
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 0b06e5e63..fc369149d 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -229,7 +229,7 @@ memtx_engine_recover_raft(const struct xrow_header *row)
 static int
 memtx_engine_recover_synchro(const struct xrow_header *row)
 {
-	assert(row->type == IPROTO_PROMOTE);
+	assert(row->type == IPROTO_RAFT_PROMOTE);
 	struct synchro_request req;
 	if (xrow_decode_synchro(row, &req) != 0)
 		return -1;
@@ -250,7 +250,7 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 	if (row->type != IPROTO_INSERT) {
 		if (row->type == IPROTO_RAFT)
 			return memtx_engine_recover_raft(row);
-		if (row->type == IPROTO_PROMOTE)
+		if (row->type == IPROTO_RAFT_PROMOTE)
 			return memtx_engine_recover_synchro(row);
 		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
 			 (uint32_t) row->type);
diff --git a/src/box/txn.c b/src/box/txn.c
index b80e722a4..e057d2762 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -407,7 +407,7 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	/*
 	 * Create WAL record for the write requests in
 	 * non-temporary spaces. stmt->space can be NULL for
-	 * IRPOTO_NOP or IPROTO_CONFIRM.
+	 * IPROTO_NOP or IPROTO_RAFT_CONFIRM.
 	 */
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
 		if (txn_add_redo(txn, stmt, request) != 0)
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 570f77c46..70447caaf 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -305,7 +305,7 @@ void
 txn_limbo_checkpoint(const struct txn_limbo *limbo,
 		     struct synchro_request *req)
 {
-	req->type = IPROTO_PROMOTE;
+	req->type = IPROTO_RAFT_PROMOTE;
 	req->replica_id = limbo->owner_id;
 	req->lsn = limbo->confirmed_lsn;
 	req->term = limbo->promote_greatest_term;
@@ -372,7 +372,7 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
 	assert(lsn > limbo->confirmed_lsn);
 	assert(!limbo->is_in_rollback);
 	limbo->confirmed_lsn = lsn;
-	txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn, 0);
+	txn_limbo_write_synchro(limbo, IPROTO_RAFT_CONFIRM, lsn, 0);
 }
 
 /** Confirm all the entries <= @a lsn. */
@@ -450,7 +450,7 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 	assert(lsn > limbo->confirmed_lsn);
 	assert(!limbo->is_in_rollback);
 	limbo->is_in_rollback = true;
-	txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn, 0);
+	txn_limbo_write_synchro(limbo, IPROTO_RAFT_ROLLBACK, lsn, 0);
 	limbo->is_in_rollback = false;
 }
 
@@ -498,7 +498,7 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
 	struct txn_limbo_entry *e = txn_limbo_last_synchro_entry(limbo);
 	assert(e == NULL || e->lsn <= lsn);
 	(void) e;
-	txn_limbo_write_synchro(limbo, IPROTO_PROMOTE, lsn, term);
+	txn_limbo_write_synchro(limbo, IPROTO_RAFT_PROMOTE, lsn, term);
 	limbo->is_in_rollback = false;
 }
 
@@ -526,7 +526,7 @@ txn_limbo_write_demote(struct txn_limbo *limbo, int64_t lsn, uint64_t term)
 	struct txn_limbo_entry *e = txn_limbo_last_synchro_entry(limbo);
 	assert(e == NULL || e->lsn <= lsn);
 	(void)e;
-	txn_limbo_write_synchro(limbo, IPROTO_DEMOTE, lsn, term);
+	txn_limbo_write_synchro(limbo, IPROTO_RAFT_DEMOTE, lsn, term);
 	limbo->is_in_rollback = false;
 }
 
@@ -768,16 +768,16 @@ txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
 		lsn = 0;
 	}
 	switch (req->type) {
-	case IPROTO_CONFIRM:
+	case IPROTO_RAFT_CONFIRM:
 		txn_limbo_read_confirm(limbo, lsn);
 		break;
-	case IPROTO_ROLLBACK:
+	case IPROTO_RAFT_ROLLBACK:
 		txn_limbo_read_rollback(limbo, lsn);
 		break;
-	case IPROTO_PROMOTE:
+	case IPROTO_RAFT_PROMOTE:
 		txn_limbo_read_promote(limbo, req->origin_id, lsn);
 		break;
-	case IPROTO_DEMOTE:
+	case IPROTO_RAFT_DEMOTE:
 		txn_limbo_read_demote(limbo, lsn);
 		break;
 	default:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index cb83fddff..d32dcbc0d 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -233,8 +233,8 @@ xrow_encode_dml(const struct request *request, struct region *region,
  */
 struct synchro_request {
 	/**
-	 * Operation type - either IPROTO_ROLLBACK or IPROTO_CONFIRM or
-	 * IPROTO_PROMOTE
+	 * Operation type - either IPROTO_RAFT_ROLLBACK or IPROTO_RAFT_CONFIRM
+	 * or IPROTO_RAFT_PROMOTE
 	 */
 	uint16_t type;
 	/**
-- 
2.20.1


^ permalink raw reply	[flat|nested] 15+ messages in thread

* [Tarantool-patches] [PATCH v4 8/9] iproto: implement interactive transactions over iproto streams
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
                   ` (6 preceding siblings ...)
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 7/9] iproto: add RAFT prefix for all requests related to 'raft' mechanik20051988 via Tarantool-patches
@ 2021-08-12  9:50 ` mechanik20051988 via Tarantool-patches
  2021-08-12 10:48   ` Vladimir Davydov via Tarantool-patches
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 9/9] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
  2021-08-12 10:51 ` [Tarantool-patches] [PATCH v4 0/9] implement iproto streams Vladimir Davydov via Tarantool-patches
  9 siblings, 1 reply; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988

From: mechanik20051988 <mechanik20.05.1988@gmail.com>

Implement interactive transactions over iproto streams. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. If any request fails during the
transaction, it will not affect the other requests in the transaction.
If a disconnect occurs while a stream has an active transaction,
that transaction is rolled back unless it has been committed
before that moment.

Part of #5860

@TarantoolBot document
Title: implement interactive transactions over iproto streams.
The main purpose of streams is transactions via iproto. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. There are multiple ways to begin,
commit and roll back a transaction: using IPROTO_CALL and IPROTO_EVAL
with the corresponding function (box.begin, box.commit and box.rollback),
IPROTO_EXECUTE with the corresponding SQL request ('START TRANSACTION',
'COMMIT', 'ROLLBACK'), or IPROTO_BEGIN, IPROTO_COMMIT and IPROTO_ROLLBACK
respectively. If a disconnect occurs while a stream has an active
transaction, that transaction is rolled back unless it has been
committed before that moment. New command codes are added to begin,
commit and roll back transactions: `IPROTO_BEGIN 14`, `IPROTO_COMMIT 15`
and `IPROTO_ROLLBACK 16` respectively.
---
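Note for readers (below the cut line, so not part of the commit message):
the hand-off of a transaction between the stream and whichever fiber
processes the next request can be modelled with a small standalone sketch.
All `sketch_*` names are hypothetical stand-ins introduced here; the real
code uses struct txn, fiber-local storage and txn_attach()/txn_detach(),
and also deals with yield triggers, which this toy model leaves out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for struct txn, a fiber and an iproto stream. */
struct sketch_txn { int id; };
struct sketch_fiber { struct sketch_txn *txn; };
struct sketch_stream { struct sketch_txn *txn; };

/* Take the transaction (if any) out of the fiber and return it. */
static struct sketch_txn *
sketch_txn_detach(struct sketch_fiber *f)
{
	struct sketch_txn *txn = f->txn;
	f->txn = NULL;
	return txn;
}

/* Bind a previously detached transaction to a fiber that has none. */
static void
sketch_txn_attach(struct sketch_fiber *f, struct sketch_txn *txn)
{
	assert(txn != NULL);
	assert(f->txn == NULL);
	f->txn = txn;
}

/* Models tx_prepare_transaction_for_request(): stream -> fiber. */
static void
sketch_request_begin(struct sketch_fiber *f, struct sketch_stream *s)
{
	if (s != NULL && s->txn != NULL) {
		sketch_txn_attach(f, s->txn);
		s->txn = NULL;
	}
}

/* Models tx_end_msg(): fiber -> stream, so the next fiber can resume. */
static void
sketch_request_end(struct sketch_fiber *f, struct sketch_stream *s)
{
	if (s != NULL) {
		assert(s->txn == NULL);
		s->txn = sketch_txn_detach(f);
	}
}
```

At any moment the transaction is owned either by the fiber that is
processing a request or by the stream, never by both. That is why an idle
stream with a non-NULL txn must be rolled back explicitly on disconnect.
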
 src/box/call.c                        |  12 --
 src/box/errcode.h                     |   1 +
 src/box/iproto.cc                     | 254 +++++++++++++++++++++++++-
 src/box/iproto_constants.c            |   6 +
 src/box/iproto_constants.h            |   6 +
 src/box/txn.c                         |  23 +++
 src/box/txn.h                         |  19 ++
 test/box-tap/feedback_daemon.test.lua |   2 +-
 test/box/error.result                 |   1 +
 test/box/misc.result                  |   5 +-
 10 files changed, 312 insertions(+), 17 deletions(-)

diff --git a/src/box/call.c b/src/box/call.c
index a6384efe2..0ce84b1ed 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -141,8 +141,6 @@ box_process_call(struct call_request *request, struct port *port)
 	const char *name = request->name;
 	assert(name != NULL);
 	uint32_t name_len = mp_decode_strl(&name);
-	/* Transaction is not started. */
-	assert(!in_txn());
 
 	int rc;
 	struct port args;
@@ -157,11 +155,6 @@ box_process_call(struct call_request *request, struct port *port)
 	}
 	if (rc != 0)
 		return -1;
-	if (in_txn() != NULL) {
-		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
-		port_destroy(port);
-		return -1;
-	}
 	return 0;
 }
 
@@ -179,10 +172,5 @@ box_process_eval(struct call_request *request, struct port *port)
 	uint32_t expr_len = mp_decode_strl(&expr);
 	if (box_lua_eval(expr, expr_len, &args, port) != 0)
 		return -1;
-	if (in_txn() != 0) {
-		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
-		port_destroy(port);
-		return -1;
-	}
 	return 0;
 }
diff --git a/src/box/errcode.h b/src/box/errcode.h
index f8fda23c1..a6f096698 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -282,6 +282,7 @@ struct errcode_record {
 	/*227 */_(ER_SYNC_QUEUE_UNCLAIMED,	"The synchronous transaction queue doesn't belong to any instance")\
 	/*228 */_(ER_SYNC_QUEUE_FOREIGN,	"The synchronous transaction queue belongs to other instance with id %u")\
 	/*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \
+	/*230 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process %s request out of stream") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 84dbdab40..6ae122235 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -66,6 +66,7 @@
 #include "tt_static.h"
 #include "salad/stailq.h"
 #include "assoc.h"
+#include "txn.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -79,6 +80,8 @@ enum {
 struct iproto_connection;
 
 struct iproto_stream {
+	/** Currently active stream transaction or NULL */
+	struct txn *txn;
 	/**
 	 * Queue of pending requests (iproto messages) for this stream,
 	 * processed sequentially. This field is accesable only from
@@ -89,6 +92,11 @@ struct iproto_stream {
 	uint64_t id;
 	/** This stream connection */
 	struct iproto_connection *connection;
+	/**
+	 * Pre-allocated disconnect msg to gracefully rollback stream
+	 * transaction and destroy stream object.
+	 */
+	struct cmsg on_disconnect;
 };
 
 /**
@@ -135,6 +143,10 @@ struct iproto_thread {
 	/**
 	 * Static routes for this iproto thread
 	 */
+	struct cmsg_hop begin_route[2];
+	struct cmsg_hop commit_route[2];
+	struct cmsg_hop rollback_route[2];
+	struct cmsg_hop rollback_on_disconnect_route[2];
 	struct cmsg_hop destroy_route[2];
 	struct cmsg_hop disconnect_route[2];
 	struct cmsg_hop misc_route[2];
@@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
 		return NULL;
 	}
 	errinj_stream_count_add(1);
+	stream->txn = NULL;
 	stailq_create(&stream->pending_requests);
 	stream->id = stream_id;
 	stream->connection = connection;
 	return stream;
 }
 
+static inline void
+iproto_stream_rollback_on_disconnect(struct iproto_stream *stream)
+{
+	struct iproto_connection *conn = stream->connection;
+	struct iproto_thread *iproto_thread = conn->iproto_thread;
+	struct cmsg_hop *route =
+		iproto_thread->rollback_on_disconnect_route;
+	cmsg_init(&stream->on_disconnect, route);
+	cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
+}
+
 /**
  * Return true if we have not enough spare messages
  * in the message pool.
@@ -670,6 +694,7 @@ static void
 iproto_stream_delete(struct iproto_stream *stream)
 {
 	assert(stailq_empty(&stream->pending_requests));
+	assert(stream->txn == NULL);
 	errinj_stream_count_add(-1);
 	mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
 }
@@ -715,7 +740,17 @@ iproto_msg_new(struct iproto_connection *con)
 static inline bool
 iproto_connection_is_idle(struct iproto_connection *con)
 {
+	/*
+	 * The check for 'mh_size(streams) == 0' is needed because a stream
+	 * may still hold an active transaction when a disconnect occurs and
+	 * all its messages are processed. In that case we send a special
+	 * message to roll it back; without this check we would immediately
+	 * send the message that destroys the connection. That causes no
+	 * error now, since messages are processed strictly sequentially and
+	 * rollback does not yield, but it is not safe.
+	 */
 	return con->long_poll_count == 0 &&
+	       mh_size(con->streams) == 0 &&
 	       ibuf_used(&con->ibuf[0]) == 0 &&
 	       ibuf_used(&con->ibuf[1]) == 0;
 }
@@ -805,6 +840,23 @@ iproto_connection_close(struct iproto_connection *con)
 		 * is done only once.
 		 */
 		con->p_ibuf->wpos -= con->parse_size;
+		mh_int_t node;
+		mh_foreach(con->streams, node) {
+			struct iproto_stream *stream = (struct iproto_stream *)
+				mh_i64ptr_node(con->streams, node)->val;
+			/**
+			 * If the stream request queue is empty, it means
+			 * that there is an active transaction which was
+			 * not committed yet. We need to roll it back, so
+			 * we push the on_disconnect message to tx thread.
+			 * If the stream request queue is not empty, the
+			 * stream is processing some request in tx thread
+			 * now. We destroy the stream in `net_send_msg`
+			 * after processing all requests.
+			 */
+			if (stailq_empty(&stream->pending_requests))
+				iproto_stream_rollback_on_disconnect(stream);
+		}
 		cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
 		assert(con->state == IPROTO_CONNECTION_ALIVE);
 		con->state = IPROTO_CONNECTION_CLOSED;
@@ -965,6 +1017,7 @@ iproto_msg_start_processing_in_stream(struct iproto_msg *msg)
 	 */
 	errinj_stream_msg_count_add(1);
 	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
+	assert(stream != NULL);
 	msg->stream = stream;
 	/*
 	 * If the request queue in the stream is not empty, it means
@@ -1407,6 +1460,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 	uint64_t stream_id;
 	uint8_t type;
 	bool request_is_not_for_stream;
+	bool request_is_only_for_stream;
 	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
 
 	if (xrow_header_decode(&msg->header, pos, reqend, true))
@@ -1418,11 +1472,19 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 	request_is_not_for_stream =
 		((type > IPROTO_TYPE_STAT_MAX &&
 		 type != IPROTO_PING) || type == IPROTO_AUTH);
+	request_is_only_for_stream =
+		(type == IPROTO_BEGIN ||
+		 type == IPROTO_COMMIT ||
+		 type == IPROTO_ROLLBACK);
 
 	if (stream_id != 0 && request_is_not_for_stream) {
 		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
 			 iproto_type_name(type));
 		goto error;
+	} else if (stream_id == 0 && request_is_only_for_stream) {
+		diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
+			 iproto_type_name(type));
+		goto error;
 	}
 
 	/*
@@ -1450,6 +1512,15 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		              sizeof(*(iproto_thread->dml_route)));
 		cmsg_init(&msg->base, iproto_thread->dml_route[type]);
 		break;
+	case IPROTO_BEGIN:
+		cmsg_init(&msg->base, iproto_thread->begin_route);
+		break;
+	case IPROTO_COMMIT:
+		cmsg_init(&msg->base, iproto_thread->commit_route);
+		break;
+	case IPROTO_ROLLBACK:
+		cmsg_init(&msg->base, iproto_thread->rollback_route);
+		break;
 	case IPROTO_CALL_16:
 	case IPROTO_CALL:
 	case IPROTO_EVAL:
@@ -1523,6 +1594,38 @@ tx_fiber_init(struct session *session, uint64_t sync)
 	fiber_set_user(f, &session->credentials);
 }
 
+static void
+tx_process_rollback_on_disconnect(struct cmsg *m)
+{
+	struct iproto_stream *stream =
+		container_of(m, struct iproto_stream,
+			     on_disconnect);
+
+	if (stream->txn != NULL) {
+		tx_fiber_init(stream->connection->session, 0);
+		txn_attach(stream->txn);
+		if (box_txn_rollback() != 0)
+			panic("failed to rollback transaction on disconnect");
+		stream->txn = NULL;
+	}
+}
+
+static void
+net_finish_rollback_on_disconnect(struct cmsg *m)
+{
+	struct iproto_stream *stream =
+		container_of(m, struct iproto_stream,
+			     on_disconnect);
+	struct iproto_connection *con = stream->connection;
+
+	struct mh_i64ptr_node_t node = { stream->id, NULL };
+	mh_i64ptr_remove(con->streams, &node, 0);
+	iproto_stream_delete(stream);
+	assert(!evio_has_fd(&con->input));
+	if (con->state == IPROTO_CONNECTION_PENDING_DESTROY)
+		iproto_connection_try_to_start_destroy(con);
+}
+
 static void
 tx_process_disconnect(struct cmsg *m)
 {
@@ -1656,15 +1759,43 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
 	}
 }
 
+/**
+ * Since the processing of requests within a transaction
+ * for a stream can occur in different fibers, we store
+ * a pointer to transaction in the stream structure.
+ * Check if message belongs to stream and there is active
+ * transaction for this stream. In case it is so, sets this
+ * transaction for current fiber.
+ */
+static inline void
+tx_prepare_transaction_for_request(struct iproto_msg *msg)
+{
+	if (msg->stream != NULL && msg->stream->txn != NULL) {
+		txn_attach(msg->stream->txn);
+		msg->stream->txn = NULL;
+	}
+	assert(!in_txn() || msg->stream != NULL);
+}
+
 static inline struct iproto_msg *
 tx_accept_msg(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	tx_accept_wpos(msg->connection, &msg->wpos);
 	tx_fiber_init(msg->connection->session, msg->header.sync);
+	tx_prepare_transaction_for_request(msg);
 	return msg;
 }
 
+static inline void
+tx_end_msg(struct iproto_msg *msg)
+{
+	if (msg->stream != NULL) {
+		assert(msg->stream->txn == NULL);
+		msg->stream->txn = txn_detach();
+	}
+}
+
 /**
  * Write error message to the output buffer and advance
  * write position. Doesn't throw.
@@ -1690,6 +1821,7 @@ tx_reply_iproto_error(struct cmsg *m)
 	iproto_reply_error(out, diag_last_error(&msg->diag),
 			   msg->header.sync, ::schema_version);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 }
 
 /** Inject a short delay on tx request processing for testing. */
@@ -1702,6 +1834,72 @@ tx_inject_delay(void)
 	});
 }
 
+static void
+tx_process_begin(struct cmsg *m)
+{
+	struct iproto_msg *msg = tx_accept_msg(m);
+	struct obuf *out;
+
+	if (tx_check_schema(msg->header.schema_version))
+		goto error;
+
+	if (box_txn_begin() != 0)
+		goto error;
+
+	out = msg->connection->tx.p_obuf;
+	iproto_reply_ok(out, msg->header.sync, ::schema_version);
+	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
+	return;
+error:
+	tx_reply_error(msg);
+	tx_end_msg(msg);
+}
+
+static void
+tx_process_commit(struct cmsg *m)
+{
+	struct iproto_msg *msg = tx_accept_msg(m);
+	struct obuf *out;
+
+	if (tx_check_schema(msg->header.schema_version))
+		goto error;
+
+	if (box_txn_commit() != 0)
+		goto error;
+
+	out = msg->connection->tx.p_obuf;
+	iproto_reply_ok(out, msg->header.sync, ::schema_version);
+	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
+	return;
+error:
+	tx_reply_error(msg);
+	tx_end_msg(msg);
+}
+
+static void
+tx_process_rollback(struct cmsg *m)
+{
+	struct iproto_msg *msg = tx_accept_msg(m);
+	struct obuf *out;
+
+	if (tx_check_schema(msg->header.schema_version))
+		goto error;
+
+	if (box_txn_rollback() != 0)
+		goto error;
+
+	out = msg->connection->tx.p_obuf;
+	iproto_reply_ok(out, msg->header.sync, ::schema_version);
+	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
+	return;
+error:
+	tx_reply_error(msg);
+	tx_end_msg(msg);
+}
+
 static void
 tx_process1(struct cmsg *m)
 {
@@ -1723,9 +1921,11 @@ tx_process1(struct cmsg *m)
 	iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
 			    tuple != 0);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static void
@@ -1766,9 +1966,11 @@ tx_process_select(struct cmsg *m)
 	iproto_reply_select(out, &svp, msg->header.sync,
 			    ::schema_version, count);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static int
@@ -1818,6 +2020,12 @@ tx_process_call(struct cmsg *m)
 	if (rc != 0)
 		goto error;
 
+	if (in_txn() != NULL && msg->header.stream_id == 0) {
+		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
+		port_destroy(&port);
+		goto error;
+	}
+
 	/*
 	 * Add all elements returned by the function to iproto.
 	 *
@@ -1856,9 +2064,11 @@ tx_process_call(struct cmsg *m)
 	iproto_reply_select(out, &svp, msg->header.sync,
 			    ::schema_version, count);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static void
@@ -1867,6 +2077,7 @@ tx_process_misc(struct cmsg *m)
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = con->tx.p_obuf;
+	assert(!(msg->header.type != IPROTO_PING && in_txn()));
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1899,9 +2110,11 @@ tx_process_misc(struct cmsg *m)
 	} catch (Exception *e) {
 		tx_reply_error(msg);
 	}
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static void
@@ -1995,9 +2208,11 @@ tx_process_sql(struct cmsg *m)
 	port_destroy(&port);
 	iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static void
@@ -2007,6 +2222,7 @@ tx_process_replication(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 	struct ev_io io;
 	coio_create(&io, con->input.fd);
+	assert(!in_txn());
 	try {
 		switch (msg->header.type) {
 		case IPROTO_JOIN:
@@ -2066,9 +2282,24 @@ iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
 	errinj_stream_msg_count_add(-1);
 
 	if (stailq_empty(&stream->pending_requests)) {
-		struct mh_i64ptr_node_t node = { stream->id, NULL };
-		mh_i64ptr_remove(con->streams, &node, 0);
-		iproto_stream_delete(stream);
+		/*
+		 * If no more messages for the current stream
+		 * and no transaction started, then delete it.
+		 */
+		if (stream->txn == NULL) {
+			struct mh_i64ptr_node_t node = { stream->id, NULL };
+			mh_i64ptr_remove(con->streams, &node, 0);
+			iproto_stream_delete(stream);
+		} else if (!evio_has_fd(&con->input)) {
+			/*
+			 * Here the connection was closed, there are no
+			 * messages in the stream queue, but there is
+			 * an active transaction in the stream.
+			 * Send the disconnect message to roll back this
+			 * transaction.
+			 */
+			iproto_stream_rollback_on_disconnect(stream);
+		}
 	} else {
 		/*
 		 * If there are new messages for this stream
@@ -2404,6 +2635,23 @@ iproto_session_push(struct session *session, struct port *port)
 static inline void
 iproto_thread_init_routes(struct iproto_thread *iproto_thread)
 {
+	iproto_thread->begin_route[0] =
+		{ tx_process_begin, &iproto_thread->net_pipe };
+	iproto_thread->begin_route[1] =
+		{ net_send_msg, NULL };
+	iproto_thread->commit_route[0] =
+		{ tx_process_commit, &iproto_thread->net_pipe };
+	iproto_thread->commit_route[1] =
+		{ net_send_msg, NULL };
+	iproto_thread->rollback_route[0] =
+		{ tx_process_rollback, &iproto_thread->net_pipe };
+	iproto_thread->rollback_route[1] =
+		{ net_send_msg, NULL };
+	iproto_thread->rollback_on_disconnect_route[0] =
+		{ tx_process_rollback_on_disconnect,
+		  &iproto_thread->net_pipe };
+	iproto_thread->rollback_on_disconnect_route[1] =
+		{ net_finish_rollback_on_disconnect, NULL };
 	iproto_thread->destroy_route[0] =
 		{ tx_process_destroy, &iproto_thread->net_pipe };
 	iproto_thread->destroy_route[1] =
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index f2902946a..913a64de5 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -166,6 +166,9 @@ const char *iproto_type_strs[] =
 	"EXECUTE",
 	NULL, /* NOP */
 	"PREPARE",
+	"BEGIN",
+	"COMMIT",
+	"ROLLBACK",
 };
 
 #define bit(c) (1ULL<<IPROTO_##c)
@@ -184,6 +187,9 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = {
 	0,                                                     /* EXECUTE */
 	0,                                                     /* NOP */
 	0,                                                     /* PREPARE */
+	0,                                                     /* BEGIN */
+	0,                                                     /* COMMIT */
+	0,                                                     /* ROLLBACK */
 };
 #undef bit
 
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 8792737b2..921007580 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -237,6 +237,12 @@ enum iproto_type {
 	IPROTO_NOP = 12,
 	/** Prepare SQL statement. */
 	IPROTO_PREPARE = 13,
+	/* Begin transaction */
+	IPROTO_BEGIN = 14,
+	/* Commit transaction */
+	IPROTO_COMMIT = 15,
+	/* Rollback transaction */
+	IPROTO_ROLLBACK = 16,
 	/** The maximum typecode used for box.stat() */
 	IPROTO_TYPE_STAT_MAX,
 
diff --git a/src/box/txn.c b/src/box/txn.c
index e057d2762..06d048870 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -1255,3 +1255,26 @@ txn_on_yield(struct trigger *trigger, void *event)
 	txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD);
 	return 0;
 }
+
+struct txn *
+txn_detach(void)
+{
+	struct txn *txn = in_txn();
+	if (txn == NULL)
+		return NULL;
+	if (!txn_has_flag(txn, TXN_CAN_YIELD)) {
+		txn_on_yield(NULL, NULL);
+		trigger_clear(&txn->fiber_on_yield);
+	}
+	trigger_clear(&txn->fiber_on_stop);
+	fiber_set_txn(fiber(), NULL);
+	return txn;
+}
+
+void
+txn_attach(struct txn *txn)
+{
+	assert(txn != NULL);
+	assert(!in_txn());
+	fiber_set_txn(fiber(), txn);
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index 8741dc6a1..f11144567 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -457,6 +457,25 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
 	fiber->storage.txn = txn;
 }
 
+/**
+ * Detach transaction from fiber.
+ * By default, if a fiber is stopped, the transaction started
+ * in this fiber is rolled back. This function detaches the
+ * transaction from the fiber: a detached transaction is not rolled
+ * back when the fiber stops, but it can be aborted if it does not
+ * support yield.
+ */
+struct txn *
+txn_detach(void);
+
+/**
+ * Attach transaction to fiber.
+ * Attach @a txn that has been detached previously and saved
+ * somewhere to a new fiber.
+ */
+void
+txn_attach(struct txn *txn);
+
 /**
  * Start a transaction explicitly.
  * @pre no transaction is active
diff --git a/test/box-tap/feedback_daemon.test.lua b/test/box-tap/feedback_daemon.test.lua
index a2e041649..f700f3f72 100755
--- a/test/box-tap/feedback_daemon.test.lua
+++ b/test/box-tap/feedback_daemon.test.lua
@@ -251,7 +251,7 @@ box.space.features_sync:drop()
 
 local function check_stats(stat)
     local sub = test:test('feedback operation stats')
-    sub:plan(18)
+    sub:plan(21)
     local box_stat = box.stat()
     local net_stat = box.stat.net()
     for op, val in pairs(box_stat) do
diff --git a/test/box/error.result b/test/box/error.result
index f80fdfed5..bc804197a 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -448,6 +448,7 @@ t;
  |   227: box.error.SYNC_QUEUE_UNCLAIMED
  |   228: box.error.SYNC_QUEUE_FOREIGN
  |   229: box.error.UNABLE_TO_PROCESS_IN_STREAM
+ |   230: box.error.UNABLE_TO_PROCESS_OUT_OF_STREAM
  | ...
 
 test_run:cmd("setopt delimiter ''");
diff --git a/test/box/misc.result b/test/box/misc.result
index b62a64355..c86245914 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -136,11 +136,14 @@ end;
 t;
 ---
 - - DELETE
+  - COMMIT
   - SELECT
+  - ROLLBACK
   - INSERT
   - EVAL
-  - CALL
   - ERROR
+  - CALL
+  - BEGIN
   - PREPARE
   - REPLACE
   - UPSERT
-- 
2.20.1


* [Tarantool-patches] [PATCH v4 9/9] net.box: add interactive transaction support in net.box
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
                   ` (7 preceding siblings ...)
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 8/9] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
@ 2021-08-12  9:50 ` mechanik20051988 via Tarantool-patches
  2021-08-12 17:52   ` Vladislav Shpilevoy via Tarantool-patches
  2021-08-12 10:51 ` [Tarantool-patches] [PATCH v4 0/9] implement iproto streams Vladimir Davydov via Tarantool-patches
  9 siblings, 1 reply; 15+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-12  9:50 UTC (permalink / raw)
  To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988

From: mechanik20051988 <mechanik20.05.1988@gmail.com>

Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which begin, commit and roll back a transaction,
respectively.

Closes #5860

@TarantoolBot document
Title: add interactive transaction support in net.box
Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which begin, commit and roll back a transaction,
respectively. There are now multiple ways to begin, commit and roll back
a transaction from `net.box`: using the stream methods, using the `call`
or `eval` methods, or using the `execute` method with SQL transaction
syntax. These methods can be mixed: for example, start a transaction with
`stream:begin()` and commit it with `stream:call('box.commit')` or
`stream:execute('COMMIT')`.
A simple example of using interactive transactions via iproto from net.box:
```lua
stream = conn:new_stream()
space = stream.space.test
space_not_from_stream = conn.space.test

stream:begin()
space:replace({1})
-- returns the previously inserted tuple, because the request
-- belongs to the transaction.
space:select({})
-- empty select, because this select doesn't belong to the
-- transaction
space_not_from_stream:select({})
stream:call('box.commit')
-- now the transaction is committed, so all requests
-- return the tuple.
```
More examples of using streams can be found in
gh-5860-implement-streams-in-iproto.test.lua
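For authors of other connectors, the packet behind `stream:begin()`,
`stream:commit()` and `stream:rollback()` can be sketched in C as below.
This is a minimal illustration, not the net.box implementation: the helper
`encode_txn_request` and the `KEY_*`/`TYPE_*` names are hypothetical, while
the numeric values are taken from this patch series (request types 14, 15
and 16; header keys 0x00, 0x01 and 0x0a). The sketch assumes that sync and
stream_id fit into a single MsgPack positive fixint and that, as in net.box,
these requests carry no body.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Header keys and request types, as they appear in this patch series. */
enum {
	KEY_REQUEST_TYPE = 0x00,
	KEY_SYNC = 0x01,
	KEY_STREAM_ID = 0x0a,
	TYPE_BEGIN = 14,
	TYPE_COMMIT = 15,
	TYPE_ROLLBACK = 16,
};

/*
 * Hypothetical helper (not part of Tarantool): write a body-less
 * transaction request into buf and return the number of bytes written.
 * To keep the sketch short, sync and stream_id must fit into a MsgPack
 * positive fixint (0..127), so every value takes exactly one byte.
 */
static size_t
encode_txn_request(uint8_t *buf, uint8_t type, uint8_t sync,
		   uint8_t stream_id)
{
	assert(type == TYPE_BEGIN || type == TYPE_COMMIT ||
	       type == TYPE_ROLLBACK);
	assert(sync < 128);
	/* Zero means "not in a stream", which is rejected for these types. */
	assert(stream_id > 0 && stream_id < 128);
	size_t n = 0;
	buf[n++] = 7;		/* packet size: 7 header bytes follow */
	buf[n++] = 0x83;	/* MsgPack fixmap with 3 entries */
	buf[n++] = KEY_REQUEST_TYPE;
	buf[n++] = type;
	buf[n++] = KEY_SYNC;
	buf[n++] = sync;
	buf[n++] = KEY_STREAM_ID;
	buf[n++] = stream_id;
	return n;
}
```

A real connector would need the wider MsgPack integer encodings for larger
sync and stream_id values; the one-byte form is used here only to keep the
byte layout easy to read.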
---
 .../gh-5860-implement-streams-in-iproto.md    |   26 +
 src/box/lua/net_box.c                         |   49 +-
 src/box/lua/net_box.lua                       |   35 +-
 ...ox_iproto_transactions_over_streams.result | 3009 +++++++++++++++++
 ..._iproto_transactions_over_streams.test.lua | 1238 +++++++
 test/box/suite.ini                            |    2 +-
 6 files changed, 4356 insertions(+), 3 deletions(-)
 create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
 create mode 100644 test/box/net.box_iproto_transactions_over_streams.result
 create mode 100644 test/box/net.box_iproto_transactions_over_streams.test.lua

diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
new file mode 100644
index 000000000..8a8eec3e7
--- /dev/null
+++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
@@ -0,0 +1,26 @@
+## feature/core
+
+* Streams and interactive transactions over streams are implemented
+  in iproto. A stream is identified by its ID, which is unique within
+  one connection. All requests with the same non-zero stream ID belong
+  to the same stream. Requests in a stream are processed sequentially:
+  execution of the next request does not start until the previous one
+  is completed. A request with a zero stream ID does not belong to any
+  stream and is processed in the old way.
+  In `net.box`, a stream is an object on top of a connection that has the
+  same methods but executes requests sequentially. The ID is generated
+  on the client side automatically. Users who write their own connector
+  and want to use streams must transmit stream_id over the iproto protocol.
+  The main purpose of streams is transactions via iproto. Each stream
+  can start its own transaction, so streams allow multiplexing several
+  transactions over one connection. There are multiple ways to begin,
+  commit and roll back a transaction: using the stream methods, using
+  the `call` or `eval` methods, or using the `execute` method with SQL
+  transaction syntax. These methods can be mixed: for example, start a
+  transaction with `stream:begin()` and commit it with
+  `stream:call('box.commit')` or `stream:execute('COMMIT')`.
+  If a request fails during a transaction, it does not affect the other
+  requests in the transaction. If a disconnect occurs while a stream has
+  an active transaction, that transaction is rolled back unless it has
+  been committed by that moment.
+
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 3bc49af23..229dec590 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -75,7 +75,10 @@ enum netbox_method {
 	NETBOX_MIN         = 14,
 	NETBOX_MAX         = 15,
 	NETBOX_COUNT       = 16,
-	NETBOX_INJECT      = 17,
+	NETBOX_BEGIN       = 17,
+	NETBOX_COMMIT      = 18,
+	NETBOX_ROLLBACK    = 19,
+	NETBOX_INJECT      = 20,
 	netbox_method_MAX
 };
 
@@ -916,6 +919,44 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
 	netbox_encode_prepare(L, idx, stream, sync, stream_id);
 }
 
+static inline void
+netbox_encode_txn(lua_State *L, enum iproto_type type, int idx,
+		  struct mpstream *stream, uint64_t sync,
+		  uint64_t stream_id)
+{
+	(void)L;
+	(void)idx;
+	assert(type == IPROTO_BEGIN ||
+	       type == IPROTO_COMMIT ||
+	       type == IPROTO_ROLLBACK);
+	size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
+	netbox_end_encode(stream, svp);
+}
+
+static void
+netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
+		    uint64_t sync, uint64_t stream_id)
+{
+	netbox_encode_txn(L, IPROTO_BEGIN, idx, stream,
+			  sync, stream_id);
+}
+
+static void
+netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
+		     uint64_t sync, uint64_t stream_id)
+{
+	netbox_encode_txn(L, IPROTO_COMMIT, idx, stream,
+			  sync, stream_id);
+}
+
+static void
+netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
+		       uint64_t sync, uint64_t stream_id)
+{
+	netbox_encode_txn(L, IPROTO_ROLLBACK, idx, stream,
+			  sync, stream_id);
+}
+
 static void
 netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
 		     uint64_t sync, uint64_t stream_id)
@@ -959,6 +1000,9 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
 		[NETBOX_MIN]		= netbox_encode_select,
 		[NETBOX_MAX]		= netbox_encode_select,
 		[NETBOX_COUNT]		= netbox_encode_call,
+		[NETBOX_BEGIN]          = netbox_encode_begin,
+		[NETBOX_COMMIT]         = netbox_encode_commit,
+		[NETBOX_ROLLBACK]       = netbox_encode_rollback,
 		[NETBOX_INJECT]		= netbox_encode_inject,
 	};
 	struct mpstream stream;
@@ -1330,6 +1374,9 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method,
 		[NETBOX_MIN]		= netbox_decode_tuple,
 		[NETBOX_MAX]		= netbox_decode_tuple,
 		[NETBOX_COUNT]		= netbox_decode_value,
+		[NETBOX_BEGIN]          = netbox_decode_nil,
+		[NETBOX_COMMIT]         = netbox_decode_nil,
+		[NETBOX_ROLLBACK]       = netbox_decode_nil,
 		[NETBOX_INJECT]		= netbox_decode_table,
 	};
 	method_decoder[method](L, data, data_end, format);
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 8d707fb26..f203b203e 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -51,8 +51,11 @@ local M_GET         = 13
 local M_MIN         = 14
 local M_MAX         = 15
 local M_COUNT       = 16
+local M_BEGIN       = 17
+local M_COMMIT      = 18
+local M_ROLLBACK    = 19
 -- Injects raw data into connection. Used by console and tests.
-local M_INJECT      = 17
+local M_INJECT      = 20
 
 -- utility tables
 local is_final_state         = {closed = 1, error = 1}
@@ -754,11 +757,38 @@ local function stream_new_stream(stream)
     return stream._conn:new_stream()
 end
 
+local function stream_begin(stream, opts)
+    check_remote_arg(stream, 'begin')
+    local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
+    if opts and opts.is_async then
+        return res
+    end
+end
+
+local function stream_commit(stream, opts)
+    check_remote_arg(stream, 'commit')
+    local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id)
+    if opts and opts.is_async then
+        return res
+    end
+end
+
+local function stream_rollback(stream, opts)
+    check_remote_arg(stream, 'rollback')
+    local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id)
+    if opts and opts.is_async then
+        return res
+    end
+end
+
 function remote_methods:new_stream()
     check_remote_arg(self, 'new_stream')
     self._last_stream_id = self._last_stream_id + 1
     local stream = setmetatable({
         new_stream = stream_new_stream,
+        begin = stream_begin,
+        commit = stream_commit,
+        rollback = stream_rollback,
         _stream_id = self._last_stream_id,
         space = setmetatable({
             _stream_space_cache = {},
@@ -1243,6 +1273,9 @@ local this_module = {
         min         = M_MIN,
         max         = M_MAX,
         count       = M_COUNT,
+        begin       = M_BEGIN,
+        commit      = M_COMMIT,
+        rollback    = M_ROLLBACK,
         inject      = M_INJECT,
     }
 }
diff --git a/test/box/net.box_iproto_transactions_over_streams.result b/test/box/net.box_iproto_transactions_over_streams.result
new file mode 100644
index 000000000..c2167e760
--- /dev/null
+++ b/test/box/net.box_iproto_transactions_over_streams.result
@@ -0,0 +1,3009 @@
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+---
+...
+json = require('json')
+---
+...
+fiber = require('fiber')
+---
+...
+msgpack = require('msgpack')
+---
+...
+test_run = require('test_run').new()
+---
+...
+test_run:cmd("create server test with script='box/iproto_streams.lua'")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+---
+...
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+test_run:cmd("start server test with args='1'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+---
+...
+conn_2 = net_box.connect(server_addr)
+---
+...
+stream_1_1 = conn_1:new_stream()
+---
+...
+stream_1_2 = conn_1:new_stream()
+---
+...
+stream_2 = conn_2:new_stream()
+---
+...
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+---
+...
+stream_1_1:rollback()
+---
+...
+stream_1_1:begin()
+---
+...
+-- Error unable to start second transaction in one stream
+stream_1_1:begin()
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+---
+...
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+---
+...
+box.commit()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+stream_1_1:commit()
+---
+...
+stream_1_2:commit()
+---
+...
+stream_2:commit()
+---
+...
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+---
+- error: Unable to process BEGIN request out of stream
+...
+conn:_request(net_box._method.commit, nil, nil, nil)
+---
+- error: Unable to process COMMIT request out of stream
+...
+conn:_request(net_box._method.rollback, nil, nil, nil)
+---
+- error: Unable to process ROLLBACK request out of stream
+...
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+---
+...
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+---
+...
+IPROTO_REQUEST_TYPE       = 0x00
+---
+...
+IPROTO_SYNC               = 0x01
+---
+...
+IPROTO_AUTH               = 7
+---
+...
+IPROTO_STREAM_ID          = 0x0a
+---
+...
+next_request_id           = 9
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+header = msgpack.encode({
+    [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+    [IPROTO_SYNC] = next_request_id,
+    [IPROTO_STREAM_ID] = 1,
+});
+---
+...
+body = msgpack.encode({nil});
+---
+...
+size = msgpack.encode(header:len() + body:len());
+---
+...
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+                                nil, nil, nil, nil,
+                                size .. header .. body);
+---
+- null
+- Unable to process AUTH request in stream
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+conn:close()
+---
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Second argument (false is a value for memtx_use_mvcc_engine option)
+-- Server start without active transaction manager, so all transaction
+-- fails because of yeild!
+test_run:cmd("start server test with args='10, false'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+space:replace({1})
+---
+- [1]
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+---
+- []
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:commit()
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+-- Check that after failed transaction commit we able to start next
+-- transaction (it's strange check, but it's necessary because it was
+-- bug with it)
+stream:begin()
+---
+...
+stream:ping()
+---
+- true
+...
+stream:commit()
+---
+...
+-- Same checks for `call` end `eval` functions.
+stream:call('box.begin')
+---
+...
+stream:call('s:replace', {{1}})
+---
+- [1]
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+stream:call('s:select', {})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+stream:call('s:replace', {{1}})
+---
+- [1]
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+stream:call('s:select', {})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+test_run:switch('test')
+---
+- true
+...
+s:drop()
+---
+...
+-- Check that there are no streams and messages, which
+-- was not deleted
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+stream:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fails,
+-- as we can see before!
+-- Second argument (true is a value for memtx_use_mvcc_engine option)
+-- Same test case as previous but server start with active transaction
+-- manager. Also check vinyl, because it's behaviour is same.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- Spaces getting from connection, not from stream has no stream_id
+-- and not belongs to stream
+space_1_no_stream = conn.space.test_1
+---
+...
+space_2_no_stream = conn.space.test_2
+---
+...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and to vinyl:
+-- behaviour is same!
+stream_1:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+stream_2:begin()
+---
+...
+space_2:replace({1})
+---
+- [1]
+...
+test_run:switch('test')
+---
+- true
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+---
+- []
+...
+space_2_no_stream:select{}
+---
+- []
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+---
+- - [1]
+...
+space_2:select({})
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit was successful, transaction can yeild with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yeild also.
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+---
+- - [1]
+...
+space_2:select{}
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+---
+- - [1]
+...
+s2:select()
+---
+- - [1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1_1 = stream_1.space.test_1
+---
+...
+space_1_2 = stream_2.space.test_1
+---
+...
+space_2_1 = stream_1.space.test_2
+---
+...
+space_2_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Simple read/write conflict.
+space_1_1:select({1})
+---
+- []
+...
+space_1_2:select({1})
+---
+- []
+...
+space_1_1:replace({1, 1})
+---
+- [1, 1]
+...
+space_1_2:replace({1, 2})
+---
+- [1, 2]
+...
+stream_1:commit()
+---
+...
+-- This transaction fails, because of conflict
+stream_2:commit()
+---
+- error: Transaction has been aborted by conflict
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Here we must accept [1, 1]
+space_1_1:select({})
+---
+- - [1, 1]
+...
+space_1_2:select({})
+---
+- - [1, 1]
+...
+-- Same test for vinyl sapce
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_2_1:select({1})
+---
+- []
+...
+space_2_2:select({1})
+---
+- []
+...
+space_2_1:replace({1, 1})
+---
+- [1, 1]
+...
+space_2_2:replace({1, 2})
+---
+- [1, 2]
+...
+stream_1:commit()
+---
+...
+-- This transaction fails, because of conflict
+stream_2:commit()
+---
+- error: Transaction has been aborted by conflict
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Here we must accept [1, 1]
+space_2_1:select({})
+---
+- - [1, 1]
+...
+space_2_2:select({})
+---
+- - [1, 1]
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+---
+- - [1, 1]
+...
+s2:select()
+---
+- - [1, 1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Test rollback for memtx space
+space_1:replace({1})
+---
+- [1]
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+---
+- - [1]
+...
+stream_1:rollback()
+---
+...
+-- Select is empty, transaction rollback
+space_1:select({})
+---
+- []
+...
+-- Test rollback for vinyl space
+space_2:replace({1})
+---
+- [1]
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+---
+- - [1]
+...
+stream_2:rollback()
+---
+...
+-- Select is empty, transaction rollback
+space_2:select({})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- This is simple test is necessary because i have a bug
+-- with halting stream after rollback
+stream_1:begin()
+---
+...
+stream_1:commit()
+---
+...
+stream_2:begin()
+---
+...
+stream_2:commit()
+---
+...
+conn:close()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select are empty, because transaction rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_1:select({})
+---
+- - [1]
+  - [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+-- Select return two previously inserted tuples
+space_2:select({})
+---
+- - [1]
+  - [2]
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Empty selects, transaction was rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Reconnect
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Two empty selects
+space_1:select({})
+---
+- []
+...
+space_2:select({})
+---
+- []
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select are empty, because transaction rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+-- Select returns the two previously inserted tuples
+space_1:select({})
+---
+- - [1]
+  - [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+-- Select returns the two previously inserted tuples
+space_2:select({})
+---
+- - [1]
+  - [2]
+...
+-- We send a large number of asynchronous requests.
+-- Their results are not important to us; what matters is
+-- that they are in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i = 1, 1000 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+fiber.sleep(0)
+---
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams or messages that
+-- were not deleted after connection close
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+---
+- true
+...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+---
+- true
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Selects are empty: the transaction was rolled back
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Same test, but now we check that if `commit` was received
+-- by the server before the connection closed, it is processed successfully.
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make the test flaky.
+for i = 1, 100 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = stream_1:commit({is_async = true})
+---
+...
+_ = stream_2:commit({is_async = true})
+---
+...
+fiber.sleep(0)
+---
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams or messages that
+-- were not deleted after connection close
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+---
+- true
+...
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+---
+- true
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Select returns tuples from [1] to [100]:
+-- the transaction was committed
+rc1 = s1:select()
+---
+...
+rc2 = s2:select()
+---
+...
+assert(#rc1)
+---
+- 100
+...
+assert(#rc2)
+---
+- 100
+...
+s1:truncate()
+---
+...
+s2:truncate()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Reconnect
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- We can begin new transactions with the same stream_id, because
+-- the previous ones were completed and destroyed.
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Two empty selects
+space_1:select({})
+---
+- []
+...
+space_2:select({})
+---
+- []
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both selects are empty, because the spaces were truncated above
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. While working on the
+-- patch, I saw that all requests in a stream come with
+-- header->is_commit == true, so if we are in a transaction
+-- in a stream we should set this value to false, otherwise
+-- during recovery `wal_stream_apply_dml_row` fails because
+-- of an LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Here we get two tuples, commit was successful
+s1:select{}
+---
+- - [1]
+  - [2]
+...
+-- Here we get two tuples, commit was successful
+s2:select{}
+---
+- - [1]
+  - [2]
+...
+-- Check that there are no streams or messages that
+-- were not deleted after connection close
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("start server test with args='1, true'")
+---
+- true
+...
+test_run:switch('test')
+---
+- true
+...
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+---
+- - [1]
+  - [2]
+...
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+---
+- - [1]
+  - [2]
+...
+box.space.test_1:drop()
+---
+...
+box.space.test_2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Same transaction checks for async mode
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream_1 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+memtx_futures = {}
+---
+...
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+---
+...
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+---
+...
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+---
+...
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+---
+...
+vinyl_futures = {}
+---
+...
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+---
+...
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+---
+...
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+---
+...
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not committed
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+---
+...
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+---
+...
+memtx_results = wait_and_return_results(memtx_futures)
+---
+...
+vinyl_results = wait_and_return_results(vinyl_futures)
+---
+...
+-- If begin was successful, it returns nil
+assert(not memtx_results["begin"])
+---
+- true
+...
+assert(not vinyl_results["begin"])
+---
+- true
+...
+-- [1]
+assert(memtx_results["replace"])
+---
+- [1]
+...
+assert(vinyl_results["replace"])
+---
+- [1]
+...
+-- [2]
+assert(memtx_results["insert"])
+---
+- [2]
+...
+assert(vinyl_results["insert"])
+---
+- [2]
+...
+-- [1] [2]
+assert(memtx_results["select"])
+---
+- - [1]
+  - [2]
+...
+assert(vinyl_results["select"])
+---
+- - [1]
+  - [2]
+...
+-- If commit was successful, it returns nil
+assert(not memtx_results["commit"])
+---
+- true
+...
+assert(not vinyl_results["commit"])
+---
+- true
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select returns the tuples that were previously inserted,
+-- because the transaction was successful
+s1:select()
+---
+- - [1]
+  - [2]
+...
+s2:select()
+---
+- - [1]
+  - [2]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check conflict resolution in stream transactions.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1_1 = stream_1.space.test_1
+---
+...
+space_1_2 = stream_2.space.test_1
+---
+...
+space_2_1 = stream_1.space.test_2
+---
+...
+space_2_2 = stream_2.space.test_2
+---
+...
+futures_1 = {}
+---
+...
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+---
+...
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+---
+...
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+---
+...
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+---
+...
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+---
+...
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+---
+...
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+---
+...
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+---
+...
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+---
+...
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+---
+...
+results_1 = wait_and_return_results(futures_1)
+---
+...
+-- Successful begin returns nil
+assert(not results_1["begin_1"])
+---
+- true
+...
+assert(not results_1["begin_2"])
+---
+- true
+...
+-- []
+assert(not results_1["select_1_1"][1])
+---
+- true
+...
+assert(not results_1["select_1_2"][1])
+---
+- true
+...
+-- [1]
+assert(results_1["replace_1_1"][1])
+---
+- 1
+...
+-- [1]
+assert(results_1["replace_1_1"][2])
+---
+- 1
+...
+-- [1]
+assert(results_1["replace_1_2"][1])
+---
+- 1
+...
+-- [2]
+assert(results_1["replace_1_2"][2])
+---
+- 2
+...
+-- Successful commit returns nil
+assert(not results_1["commit_1"])
+---
+- true
+...
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+---
+- Transaction has been aborted by conflict
+...
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+---
+- [1, 1]
+...
+-- commit_1 could have ended before commit_2, so
+-- here we can get either an empty select or [1, 1]
+-- for results_1["select_1_2_A"][1]
+futures_2 = {}
+---
+...
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+---
+...
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+---
+...
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+---
+...
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+---
+...
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+---
+...
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+---
+...
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+---
+...
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+---
+...
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+---
+...
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+---
+...
+results_2 = wait_and_return_results(futures_2)
+---
+...
+-- Successful begin returns nil
+assert(not results_2["begin_1"])
+---
+- true
+...
+assert(not results_2["begin_2"])
+---
+- true
+...
+-- []
+assert(not results_2["select_2_1"][1])
+---
+- true
+...
+assert(not results_2["select_2_2"][1])
+---
+- true
+...
+-- [1]
+assert(results_2["replace_2_1"][1])
+---
+- 1
+...
+-- [1]
+assert(results_2["replace_2_1"][2])
+---
+- 1
+...
+-- [1]
+assert(results_2["replace_2_2"][1])
+---
+- 1
+...
+-- [2]
+assert(results_2["replace_2_2"][2])
+---
+- 2
+...
+-- Successful commit returns nil
+assert(not results_2["commit_1"])
+---
+- true
+...
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+---
+- Transaction has been aborted by conflict
+...
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+---
+- [1, 1]
+...
+-- commit_1 could have ended before commit_2, so
+-- here we can get either an empty select or [1, 1]
+-- for results_2["select_2_2_A"][1]
+test_run:switch('test')
+---
+- true
+...
+-- Both selects return tuple [1, 1], transaction committed
+s1:select()
+---
+- - [1, 1]
+...
+s2:select()
+---
+- - [1, 1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+function ping() return "pong" end
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+space_no_stream = conn.space.test
+---
+...
+-- successful begin using stream:call
+stream:call('box.begin')
+---
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+stream:call('ping')
+---
+- pong
+...
+stream:eval('ping()')
+---
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+stream:eval('box.begin()')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- successful commit using stream:call
+stream:call('box.commit')
+---
+...
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+---
+...
+space:replace({1})
+---
+- [1]
+...
+-- Empty select: the transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+---
+- []
+...
+-- Select returns the tuple that was previously inserted,
+-- because this select belongs to the transaction.
+space:select({})
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not committed
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Successful commit using stream:execute
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- Select returns the tuple that was previously inserted,
+-- because the transaction was successful
+space_no_stream:select{}
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select returns the tuple, because the transaction was successful
+s:select()
+---
+- - [1]
+...
+s:delete{1}
+---
+- [1]
+...
+test_run:switch('default')
+---
+- true
+...
+-- Check rollback using stream:call
+stream:begin()
+---
+...
+space:replace({2})
+---
+- [2]
+...
+-- Empty select: the transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+---
+- []
+...
+-- Select returns the tuple that was previously inserted,
+-- because this select belongs to the transaction.
+space:select({})
+---
+- - [2]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not committed
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Successful rollback using stream:call
+stream:call('box.rollback')
+---
+...
+-- Empty selects: the transaction was rolled back
+space:select({})
+---
+- []
+...
+space_no_stream:select{}
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Empty select: the transaction was rolled back
+s:select()
+---
+- []
+...
+s:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Simple test which demonstrates that a stream is immediately
+-- destroyed when it has no messages being processed and
+-- no active transaction.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+for i = 1, 10 do space:replace{i} end
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+s:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Transaction tests for sql iproto requests.
+-- All these functions are copied from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+---
+- true
+...
+function execute_sql_string(stream, sql_string)
+    if stream then
+        stream:execute(sql_string)
+    else
+        box.execute(sql_string)
+    end
+end$
+---
+...
+function execute_sql_string_and_return_result(stream, sql_string)
+    if stream then
+        return pcall(stream.execute, stream, sql_string)
+    else
+        return box.execute(sql_string)
+    end
+end$
+---
+...
+function monster_ddl(stream)
+    local _, err1, err2, err3, err4, err5, err6
+    local stream_or_box = stream or box
+    execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER);]])
+    execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER UNIQUE,
+                                                 CONSTRAINT ck1
+                                                 CHECK(b < 100));]])
+
+    execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+    execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+    execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+                                                          KEY, a INTEGER);]])
+
+    execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+    execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+                                 CHECK(b > 0);]])
+
+    _, err1 =
+        execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+                                                       RENAME TO t1;]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+                                 ck2 CHECK(a > 0);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+                                                       INTEGER PRIMARY KEY);]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+
+    execute_sql_string(stream, [[CREATE TABLE
+                                 trigger_catcher(id INTEGER PRIMARY
+                                                 KEY AUTOINCREMENT);]])
+
+    execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+    execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+                                 t1 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+                                 t2 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+                                                       ON t1(a, b);]])
+
+    execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+    _, err5 =
+        execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+    _, err6 =
+        execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+                                                       t_does_not_exist;]])
+
+    execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+    return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+            err5, err6}
+end$
+---
+...
+function monster_ddl_cmp_res(res1, res2)
+    if json.encode(res1) == json.encode(res2) then
+        return true
+    end
+    return res1, res2
+end$
+---
+...
+function monster_ddl_is_clean(stream)
+    local stream_or_box = stream or box
+    assert(stream_or_box.space.T1 == nil)
+    assert(stream_or_box.space.T2 == nil)
+    assert(stream_or_box.space._trigger:count() == 0)
+    assert(stream_or_box.space._fk_constraint:count() == 0)
+    assert(stream_or_box.space._ck_constraint:count() == 0)
+    assert(stream_or_box.space.T_RENAMED == nil)
+    assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+---
+...
+function monster_ddl_check(stream)
+    local _, err1, err2, err3, err4, res
+    local stream_or_box = stream or box
+    _, err1 =
+       execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                      VALUES (1, 1, 101)]])
+    execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                       VALUES(2, 2, 1)]])
+    _, err3 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, 20, 1)]])
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, -1, 1)]])
+    execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+    if not stream then
+        assert(stream_or_box.space.T_RENAMED ~= nil)
+        assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+        assert(stream_or_box.space.T_TO_RENAME == nil)
+        res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                             trigger_catcher]])
+    else
+        _, res =
+            execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                           trigger_catcher]])
+    end
+    return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+            err3, err4, res}
+end$
+---
+...
+function monster_ddl_clear(stream)
+    execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+---
+...
+test_run:cmd("setopt delimiter ''")$
+---
+- true
+...
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+test_run:switch('test')
+---
+- true
+...
+test_run:cmd("setopt delimiter '$'")
+---
+- true
+...
+function monster_ddl_is_clean()
+    if not (box.space.T1 == nil) or
+       not (box.space.T2 == nil) or
+       not (box.space._trigger:count() == 0) or
+       not (box.space._fk_constraint:count() == 0) or
+       not (box.space._ck_constraint:count() == 0) or
+       not (box.space.T_RENAMED == nil) or
+       not (box.space.T_TO_RENAME == nil) then
+           return false
+    end
+    return true
+end$
+---
+...
+test_run:cmd("setopt delimiter ''")$
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream = conn:new_stream()
+---
+...
+-- No txn.
+true_ddl_res = monster_ddl()
+---
+...
+true_ddl_res
+---
+- - 'Finished ok, errors in the middle: '
+  - Space 'T1' already exists
+  - Space 'T1' already exists
+  - Space 'T3' does not exist
+  - Index 'T1A' already exists in space 'T1'
+  - 'Failed to execute SQL statement: can not truncate space ''T2'' because other
+    objects depend on it'
+  - Space 'T_DOES_NOT_EXIST' does not exist
+...
+true_check_res = monster_ddl_check()
+---
+...
+true_check_res
+---
+- - 'Finished ok, errors and trigger catcher content: '
+  - 'Check constraint failed ''CK1'': b < 100'
+  - Duplicate key exists in unique index "unique_unnamed_T2_2" in space "T2" with
+    old tuple - [1, 1, 1] and new tuple - [2, 2, 1]
+  - 'Failed to execute SQL statement: FOREIGN KEY constraint failed'
+  - 'Check constraint failed ''CK2'': a > 0'
+  - metadata:
+    - name: ID
+      type: integer
+    rows:
+    - [1]
+...
+monster_ddl_clear()
+---
+...
+monster_ddl_is_clean()
+---
+...
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+---
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+---
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+test_run:switch("test")
+---
+- true
+...
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+---
+- row_count: 1
+...
+-- reload schema
+stream:ping()
+---
+- true
+...
+space = stream.space.TEST
+---
+...
+assert(space ~= nil)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+space:replace{1, 2, '3'}
+---
+- [1, 2, '3']
+...
+space:select()
+---
+- - [1, 2, '3']
+...
+-- select is empty, because the transaction was not committed
+conn.space.TEST:select()
+---
+- []
+...
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+---
+...
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+---
+...
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+---
+- true
+...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+---
+- metadata:
+  - name: ID
+    type: integer
+  - name: A
+    type: number
+  - name: B
+    type: string
+  rows:
+  - [1, 2, '3']
+...
+-- empty select, the transaction was not committed
+conn:execute(conn_pr.stmt_id, {1, 2})
+---
+- metadata:
+  - name: ID
+    type: integer
+  - name: A
+    type: number
+  - name: B
+    type: string
+  rows: []
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+---
+- metadata:
+  - name: ID
+    type: integer
+  - name: A
+    type: number
+  - name: B
+    type: string
+  rows:
+  - [1, 2, '3']
+...
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+---
+- metadata:
+  - name: ID
+    type: integer
+  - name: A
+    type: number
+  - name: B
+    type: string
+  rows:
+  - [1, 2, '3']
+...
+stream:unprepare(stream_pr.stmt_id)
+---
+- null
+...
+conn:close()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+---
+- - [1, 2, '3']
+...
+box.space.TEST:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
+...
+test_run:cmd("delete server test")
+---
+- true
+...
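Note on the helper used by the async checks in the test file below:
wait_and_return_results() stores either the result or, on failure, the
error under the same key. That is why the test can assert on
results_1["commit_2"] to detect a conflict, while a successful begin or
commit leaves its key empty. A minimal standalone sketch of that behaviour,
assuming a stand-in future object (fake_future is hypothetical and is not
part of net.box; the error string is only a placeholder):

```lua
-- Hypothetical stand-in for a net.box future: wait_result() returns
-- the result on success, or nil plus an error on failure.
local function fake_future(result, err)
    return {
        wait_result = function(self)
            return result, err
        end,
    }
end

-- Same logic as the helper defined in the test file.
local function wait_and_return_results(futures)
    local results = {}
    for name, future in pairs(futures) do
        local err
        results[name], err = future:wait_result()
        if err then
            -- On failure the error takes the place of the result.
            results[name] = err
        end
    end
    return results
end

local results = wait_and_return_results({
    begin = fake_future(nil, nil),        -- success without a result
    replace = fake_future({1, 2}, nil),   -- success with a tuple-like result
    commit = fake_future(nil, 'placeholder conflict error'),
})

assert(not results.begin)
assert(results.replace[1] == 1 and results.replace[2] == 2)
assert(results.commit == 'placeholder conflict error')
```

With this folding, a truthy value under a `commit` key means the commit
failed, which matches the "Error because of transaction conflict" checks.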
diff --git a/test/box/net.box_iproto_transactions_over_streams.test.lua b/test/box/net.box_iproto_transactions_over_streams.test.lua
new file mode 100644
index 000000000..094c451a9
--- /dev/null
+++ b/test/box/net.box_iproto_transactions_over_streams.test.lua
@@ -0,0 +1,1238 @@
+-- This test checks streams implementation in iproto (gh-5860).
+net_box = require('net.box')
+json = require('json')
+fiber = require('fiber')
+msgpack = require('msgpack')
+test_run = require('test_run').new()
+
+test_run:cmd("create server test with script='box/iproto_streams.lua'")
+
+test_run:cmd("setopt delimiter ';'")
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+test_run:cmd("setopt delimiter ''");
+
+test_run:cmd("start server test with args='1'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+conn_2 = net_box.connect(server_addr)
+stream_1_1 = conn_1:new_stream()
+stream_1_2 = conn_1:new_stream()
+stream_2 = conn_2:new_stream()
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+stream_1_1:rollback()
+
+stream_1_1:begin()
+-- Error: unable to start a second transaction in one stream
+stream_1_1:begin()
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+test_run:switch("test")
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+box.commit()
+test_run:switch("default")
+stream_1_1:commit()
+stream_1_2:commit()
+stream_2:commit()
+
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+-- Begin, commit and rollback are supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+conn:_request(net_box._method.commit, nil, nil, nil)
+conn:_request(net_box._method.rollback, nil, nil, nil)
+-- Not all requests are supported by a stream.
+stream = conn:new_stream()
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+IPROTO_REQUEST_TYPE       = 0x00
+IPROTO_SYNC               = 0x01
+IPROTO_AUTH               = 7
+IPROTO_STREAM_ID          = 0x0a
+next_request_id           = 9
+test_run:cmd("setopt delimiter ';'")
+header = msgpack.encode({
+    [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+    [IPROTO_SYNC] = next_request_id,
+    [IPROTO_STREAM_ID] = 1,
+});
+body = msgpack.encode({nil});
+size = msgpack.encode(header:len() + body:len());
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+                                nil, nil, nil, nil,
+                                size .. header .. body);
+test_run:cmd("setopt delimiter ''");
+conn:close()
+test_run:cmd("stop server test")
+
+-- The second argument (false) is the value of the memtx_use_mvcc_engine
+-- option. The server starts without an active transaction manager, so all
+-- transactions fail because of a yield!
+test_run:cmd("start server test with args='10, false'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+
+-- Check synchronous stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+test_run:switch('default')
+space:replace({1})
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Commit fails, the transaction yielded with memtx_use_mvcc_engine = false
+stream:commit()
+-- Select is empty, transaction was aborted
+space:select{}
+-- Check that after a failed transaction commit we are able to start the
+-- next transaction (it's a strange check, but it's necessary because
+-- there was a bug with it)
+stream:begin()
+stream:ping()
+stream:commit()
+-- Same checks for `call` and `eval` functions.
+stream:call('box.begin')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Commit fails, the transaction yielded with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+-- Select is empty, transaction was aborted
+space:select{}
+
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s:select()
+test_run:switch('default')
+-- Commit fails, the transaction yielded with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+-- Select is empty, transaction was aborted
+space:select{}
+
+test_run:switch('test')
+s:drop()
+-- Check that there are no streams and messages, which
+-- were not deleted
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+stream:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fail,
+-- as we saw above!
+
+-- The second argument (true) is the value of the memtx_use_mvcc_engine option.
+-- Same test case as the previous one, but the server starts with an active
+-- transaction manager. Also check vinyl, because its behaviour is the same.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s1:create_index('primary')
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- Spaces obtained from the connection, not from a stream, have no stream_id
+-- and do not belong to any stream
+space_1_no_stream = conn.space.test_1
+space_2_no_stream = conn.space.test_2
+-- Check synchronous stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and for vinyl:
+-- the behaviour is the same!
+stream_1:begin()
+space_1:replace({1})
+stream_2:begin()
+space_2:replace({1})
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+test_run:switch('default')
+-- Empty select, transaction was not committed and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+space_2_no_stream:select{}
+-- Select returns the tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+space_2:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s1:select()
+s2:select()
+test_run:switch('default')
+-- Commit was successful, a transaction can yield with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yield too.
+stream_1:commit()
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- were not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- Select returns the tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+space_2:select{}
+test_run:switch("test")
+-- Select returns the tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Simple read/write conflict.
+space_1_1:select({1})
+space_1_2:select({1})
+space_1_1:replace({1, 1})
+space_1_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- were not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must get [1, 1]
+space_1_1:select({})
+space_1_2:select({})
+
+-- Same test for vinyl space
+stream_1:begin()
+stream_2:begin()
+space_2_1:select({1})
+space_2_2:select({1})
+space_2_1:replace({1, 1})
+space_2_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- were not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must get [1, 1]
+space_2_1:select({})
+space_2_2:select({})
+
+test_run:switch('test')
+-- Both selects return tuple [1, 1], transaction committed
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Test rollback for memtx space
+space_1:replace({1})
+-- Select returns the tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+stream_1:rollback()
+-- Select is empty, the transaction was rolled back
+space_1:select({})
+
+-- Test rollback for vinyl space
+space_2:replace({1})
+-- Select returns the tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+stream_2:rollback()
+-- Select is empty, the transaction was rolled back
+space_2:select({})
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- were not deleted after rollback
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- This simple test is necessary because there was a bug
+-- with the stream halting after rollback
+stream_1:begin()
+stream_1:commit()
+stream_2:begin()
+stream_2:commit()
+conn:close()
+
+test_run:switch('test')
+-- Both selects are empty, because the transactions were rolled back
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select returns two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select returns two previously inserted tuples
+space_2:select({})
+conn:close()
+
+test_run:switch("test")
+-- Empty selects, the transactions were rolled back
+s1:select()
+s2:select()
+-- Check that there are no streams and messages, which
+-- were not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with the same stream_id, because
+-- the previous ones were rolled back and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both selects are empty, because the transactions were rolled back
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select returns two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select returns two previously inserted tuples
+space_2:select({})
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+for i = 1, 1000 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- were not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Selects are empty, the transactions were rolled back
+s1:select()
+s2:select()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Same test, but now we check that if `commit` was received
+-- by the server before the connection closed, it is processed successfully.
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+test_run:cmd("setopt delimiter ';'")
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+    space_1:replace({i}, {is_async = true})
+    space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+_ = stream_1:commit({is_async = true})
+_ = stream_2:commit({is_async = true})
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- were not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+    return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select returns tuples from [1] to [100],
+-- the transaction was committed
+rc1 = s1:select()
+rc2 = s2:select()
+assert(#rc1)
+assert(#rc2)
+s1:truncate()
+s2:truncate()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with the same stream_id, because
+-- the previous ones were finished and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both selects are empty, because the spaces were truncated above
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check that all requests between `begin` and `commit`
+-- have correct LSN and TSN values. While working on the
+-- patch, I saw that all requests in a stream come with
+-- header->is_commit == true, so if we are in a transaction
+-- in a stream we should set this value to false, otherwise
+-- during recovery `wal_stream_apply_dml_row` fails because
+-- of an LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+
+stream_1:begin()
+stream_2:begin()
+space_1:replace({1})
+space_1:replace({2})
+space_2:replace({1})
+space_2:replace({2})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+s1:select{}
+-- Here we get two tuples, commit was successful
+s2:select{}
+-- Check that there are no streams and messages, which
+-- were not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+test_run:cmd("start server test with args='1, true'")
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+box.space.test_1:drop()
+box.space.test_2:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
+-- Same transaction checks for async mode
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+space_1 = stream_1.space.test_1
+stream_2 = conn:new_stream()
+space_2 = stream_2.space.test_2
+
+memtx_futures = {}
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+
+vinyl_futures = {}
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+
+test_run:switch("test")
+-- Select is empty, transaction was not committed
+s1:select()
+s2:select()
+test_run:switch('default')
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+
+memtx_results = wait_and_return_results(memtx_futures)
+vinyl_results = wait_and_return_results(vinyl_futures)
+-- If begin was successful, it returns nil
+assert(not memtx_results["begin"])
+assert(not vinyl_results["begin"])
+-- [1]
+assert(memtx_results["replace"])
+assert(vinyl_results["replace"])
+-- [2]
+assert(memtx_results["insert"])
+assert(vinyl_results["insert"])
+-- [1] [2]
+assert(memtx_results["select"])
+assert(vinyl_results["select"])
+-- If commit was successful, it returns nil
+assert(not memtx_results["commit"])
+assert(not vinyl_results["commit"])
+
+test_run:switch("test")
+-- Select returns the tuples, which were previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+
+futures_1 = {}
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+
+results_1 = wait_and_return_results(futures_1)
+-- Successful begin returns nil
+assert(not results_1["begin_1"])
+assert(not results_1["begin_2"])
+-- []
+assert(not results_1["select_1_1"][1])
+assert(not results_1["select_1_2"][1])
+-- [1]
+assert(results_1["replace_1_1"][1])
+-- [1]
+assert(results_1["replace_1_1"][2])
+-- [1]
+assert(results_1["replace_1_2"][1])
+-- [2]
+assert(results_1["replace_1_2"][2])
+-- Successful commit returns nil
+assert(not results_1["commit_1"])
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_1_2_A"][1]
+
+futures_2 = {}
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+
+results_2 = wait_and_return_results(futures_2)
+-- Successful begin returns nil
+assert(not results_2["begin_1"])
+assert(not results_2["begin_2"])
+-- []
+assert(not results_2["select_2_1"][1])
+assert(not results_2["select_2_2"][1])
+-- [1]
+assert(results_2["replace_2_1"][1])
+-- [1]
+assert(results_2["replace_2_1"][2])
+-- [1]
+assert(results_2["replace_2_2"][1])
+-- [2]
+assert(results_2["replace_2_2"][2])
+-- Successful commit returns nil
+assert(not results_2["commit_1"])
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_2["select_2_2_A"][1]
+
+test_run:switch('test')
+-- Both selects return tuple [1, 1], transaction committed
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+function ping() return "pong" end
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+space_no_stream = conn.space.test
+
+-- successful begin using stream:call
+stream:call('box.begin')
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+stream:call('ping')
+stream:eval('ping()')
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+stream:eval('box.begin()')
+-- successful commit using stream:call
+stream:call('box.commit')
+
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+space:replace({1})
+-- Empty select: the transaction was not committed and
+-- is not visible to requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select returns the tuple that was previously inserted,
+-- because this select belongs to the transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, the transaction was not committed
+s:select()
+test_run:switch('default')
+-- Successful commit using stream:execute
+stream:execute('COMMIT')
+-- Select returns the tuple that was previously inserted,
+-- because the transaction was successful
+space_no_stream:select{}
+test_run:switch("test")
+-- Select returns the tuple, because the transaction was successful
+s:select()
+s:delete{1}
+test_run:switch('default')
+-- Check rollback using stream:call
+stream:begin()
+space:replace({2})
+-- Empty select: the transaction was not committed and
+-- is not visible to requests not belonging to the
+-- transaction.
+space_no_stream:select{}
+-- Select returns the tuple that was previously inserted,
+-- because this select belongs to the transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, the transaction was not committed
+s:select()
+test_run:switch('default')
+-- Successful rollback using stream:call
+stream:call('box.rollback')
+-- Empty selects: the transaction was rolled back
+space:select({})
+space_no_stream:select{}
+test_run:switch("test")
+-- Empty select: the transaction was rolled back
+s:select()
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Simple test which demonstrates that a stream is immediately
+-- destroyed when there are no messages being processed in the
+-- stream and no active transaction.
+
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+for i = 1, 10 do space:replace{i} end
+test_run:switch("test")
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+s:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Transaction tests for sql iproto requests.
+-- All these functions are copied from sql/ddl.test.lua,
+-- except that they check SQL transactions in streams
+test_run:cmd("setopt delimiter '$'")
+function execute_sql_string(stream, sql_string)
+    if stream then
+        stream:execute(sql_string)
+    else
+        box.execute(sql_string)
+    end
+end$
+function execute_sql_string_and_return_result(stream, sql_string)
+    if stream then
+        return pcall(stream.execute, stream, sql_string)
+    else
+        return box.execute(sql_string)
+    end
+end$
+function monster_ddl(stream)
+    local _, err1, err2, err3, err4, err5, err6
+    local stream_or_box = stream or box
+    execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER);]])
+    execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+                                                 a INTEGER,
+                                                 b INTEGER UNIQUE,
+                                                 CONSTRAINT ck1
+                                                 CHECK(b < 100));]])
+
+    execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+    execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+    execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+                                                          KEY, a INTEGER);]])
+
+    execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+    execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+                                 CHECK(b > 0);]])
+
+    _, err1 =
+        execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+                                                       RENAME TO t1;]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+                                 ck2 CHECK(a > 0);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+                                                       INTEGER PRIMARY KEY);]])
+
+    execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+                                 (a) REFERENCES t2(b);]])
+
+    execute_sql_string(stream, [[CREATE TABLE
+                                 trigger_catcher(id INTEGER PRIMARY
+                                                 KEY AUTOINCREMENT);]])
+
+    execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+    execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+                                 t1 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+    execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+                                 t2 FOR EACH ROW
+                                 BEGIN
+                                     INSERT INTO trigger_catcher VALUES(1);
+                                 END; ]])
+
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+                                                       ON t1(a, b);]])
+
+    execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+    _, err5 =
+        execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+    _, err6 =
+        execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+                                                       t_does_not_exist;]])
+
+    execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+    return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+            err5, err6}
+end$
+function monster_ddl_cmp_res(res1, res2)
+    if json.encode(res1) == json.encode(res2) then
+        return true
+    end
+    return res1, res2
+end$
+function monster_ddl_is_clean(stream)
+    local stream_or_box = stream or box
+    assert(stream_or_box.space.T1 == nil)
+    assert(stream_or_box.space.T2 == nil)
+    assert(stream_or_box.space._trigger:count() == 0)
+    assert(stream_or_box.space._fk_constraint:count() == 0)
+    assert(stream_or_box.space._ck_constraint:count() == 0)
+    assert(stream_or_box.space.T_RENAMED == nil)
+    assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+function monster_ddl_check(stream)
+    local _, err1, err2, err3, err4, res
+    local stream_or_box = stream or box
+    _, err1 =
+       execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                      VALUES (1, 1, 101)]])
+    execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+    _, err2 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+                                                       VALUES(2, 2, 1)]])
+    _, err3 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, 20, 1)]])
+    _, err4 =
+        execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+                                                       VALUES(1, -1, 1)]])
+    execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+    if not stream then
+        assert(stream_or_box.space.T_RENAMED ~= nil)
+        assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+        assert(stream_or_box.space.T_TO_RENAME == nil)
+        res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                             trigger_catcher]])
+    else
+        _, res =
+            execute_sql_string_and_return_result(stream, [[SELECT * FROM
+                                                           trigger_catcher]])
+    end
+    return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+            err3, err4, res}
+end$
+function monster_ddl_clear(stream)
+    execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+    execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+    execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+test_run:cmd("setopt delimiter ''")$
+
+test_run:cmd("start server test with args='10, true'")
+test_run:switch('test')
+test_run:cmd("setopt delimiter '$'")
+function monster_ddl_is_clean()
+    if not (box.space.T1 == nil) or
+       not (box.space.T2 == nil) or
+       not (box.space._trigger:count() == 0) or
+       not (box.space._fk_constraint:count() == 0) or
+       not (box.space._ck_constraint:count() == 0) or
+       not (box.space.T_RENAMED == nil) or
+       not (box.space.T_TO_RENAME == nil) then
+           return false
+    end
+    return true
+end$
+test_run:cmd("setopt delimiter ''")$
+test_run:switch('default')
+
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+
+-- No txn.
+true_ddl_res = monster_ddl()
+true_ddl_res
+
+true_check_res = monster_ddl_check()
+true_check_res
+
+monster_ddl_clear()
+monster_ddl_is_clean()
+
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+test_run:switch("test")
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no active transaction was started.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+-- reload schema
+stream:ping()
+space = stream.space.TEST
+assert(space ~= nil)
+stream:execute('START TRANSACTION')
+space:replace{1, 2, '3'}
+space:select()
+-- select is empty, because the transaction was not committed
+conn.space.TEST:select()
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- empty select, the transaction was not committed
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:execute('COMMIT')
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:unprepare(stream_pr.stmt_id)
+conn:close()
+test_run:switch('test')
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+box.space.TEST:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/suite.ini b/test/box/suite.ini
index 637766cdd..369354eda 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
 disabled = rtree_errinj.test.lua tuple_bench.test.lua
 long_run = huge_field_map_long.test.lua
 config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua net.box_iproto_streams.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua net.box_iproto_streams.test.lua net.box_iproto_transactions_over_streams.test.lua
 lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
 use_unix_sockets = True
 use_unix_sockets_iproto = True
-- 
2.20.1



* Re: [Tarantool-patches] [PATCH v4 8/9] iproto: implement interactive transactions over iproto streams
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 8/9] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
@ 2021-08-12 10:48   ` Vladimir Davydov via Tarantool-patches
  0 siblings, 0 replies; 15+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-08-12 10:48 UTC (permalink / raw)
  To: mechanik20051988; +Cc: tarantool-patches, v.shpilevoy, mechanik20051988

On Thu, Aug 12, 2021 at 12:50:45PM +0300, mechanik20051988 wrote:
> diff --git a/src/box/call.c b/src/box/call.c
> index a6384efe2..0ce84b1ed 100644
> --- a/src/box/call.c
> +++ b/src/box/call.c
> @@ -715,7 +740,17 @@ iproto_msg_new(struct iproto_connection *con)
>  static inline bool
>  iproto_connection_is_idle(struct iproto_connection *con)
>  {
> +	/*
> +         * The check for 'mh_size (streams) == 0' was added, because it is

Please fix formatting (you use spaces instead of tabs here).

> +         * possible that when disconnect occurs, there is active transaction
> +         * in stream after processing all messages. In this case we send
> +         * special message to rollback it, and without this check we would
> +         * immidiatly send special message to destroy connection. This would

immediately

> +         * not lead to error now, since the messages are processed strictly
> +         * sequentially, and rollback does not yield, but it is not safely and

and?

> +	 */
>  	return con->long_poll_count == 0 &&
> +	       mh_size(con->streams) == 0 &&
>  	       ibuf_used(&con->ibuf[0]) == 0 &&
>  	       ibuf_used(&con->ibuf[1]) == 0;
>  }


* Re: [Tarantool-patches] [PATCH v4 0/9] implement iproto streams
  2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
                   ` (8 preceding siblings ...)
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 9/9] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
@ 2021-08-12 10:51 ` Vladimir Davydov via Tarantool-patches
  9 siblings, 0 replies; 15+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-08-12 10:51 UTC (permalink / raw)
  To: mechanik20051988; +Cc: tarantool-patches, v.shpilevoy

On Thu, Aug 12, 2021 at 12:50:37PM +0300, mechanik20051988 wrote:
> Vladimir Davydov (2):
>   xrow: remove unused call_request::header
>   iproto: clear request::header for client requests
> 
> mechanik20051988 (7):
>   iproto: implement stream id in binary iproto protocol
>   salad: fix segfault in case when mhash table allocation failure
>   iproto: implement streams in iproto
>   net.box: add stream support to net.box
>   iproto: add RAFT prefix for all requests related to 'raft'.
>   iproto: implement interactive transactions over iproto streams
>   net.box: add interactive transaction support in net.box

LGTM after fixing the comment in patch 8.


* Re: [Tarantool-patches] [PATCH v4 6/9] net.box: add stream support to net.box
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 6/9] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
@ 2021-08-12 17:49   ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 0 replies; 15+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-08-12 17:49 UTC (permalink / raw)
  To: mechanik20051988, tarantool-patches, vdavydov; +Cc: mechanik20051988

Hi! Thanks for the patch!

See 2 comments below.

> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 8f5671c15..8d707fb26 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua

<...>

> +-- Metatable for stream space indexes. When a stream space is
> +-- created, there are no indexes in it. When accessing a space
> +-- index, we look for the corresponding index in the corresponding
> +-- connection space. If it is found, we create the same index for the
> +-- stream space but with the corresponding stream ID. We do not need
> +-- to compare stream _schema_version and connection schema_version,
> +-- because all access to an index is carried out through its space.
> +-- So we update schema_version when we access the space.
> +local stream_indexes_mt = {
> +    __index = function(self, key)
> +        local _space = self._space
> +        local src = _space._src.index[key]
> +        if not src then
> +            return nil
> +        end
> +        local res = stream_wrap_index(_space._stream_id, src)
> +        self[key] = res
> +        return res
> +    end,
> +    __serialize = stream_indexes_serialize
> +}
> +
> +-- Create a stream space, which is the same as the connection space
> +-- but has a nonzero stream ID.
> +local function stream_wrap_space(stream, src)
> +    local res = setmetatable({
> +        _stream_id = stream._stream_id,
> +        _src = src,
> +        index = setmetatable({
> +            _space = nil,
> +        }, stream_indexes_mt)
> +    }, {
> +        __index = src,
> +        __serialize = stream_space_serialize
> +    })

1. That is the thing I explicitly tried to avoid when I proposed
making a light proxy of conn spaces - it is not good to create
a new metatable for each space. Can you create it once and
use it for all space objects? Like stream_indexes_mt - it is
shared by all 'index hashes'.

The only reason you create the metatable here is to use 'src'
as __index, but that can be done with a function.

	__index = function(self, key)
		return self._src[key]
	end

And it does not need to be a closure. This should make it possible
to share the metatable among all spaces.

The same for stream metatable in :new_stream().
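
A minimal sketch of that idea follows. It uses hypothetical stand-ins (`shared_space_mt`, `wrap_space`, and plain tables in place of real connection spaces), not the actual net.box code, and only illustrates that one metatable with a non-closure `__index` can serve every wrapped space:

```lua
-- One metatable for all wrappers. __index is a plain function with no
-- upvalues: it forwards missing keys to the wrapped object in self._src.
local shared_space_mt = {
    __index = function(self, key)
        return self._src[key]
    end,
}

-- Hypothetical helper: wraps a connection space for a given stream ID.
-- _src is always set, so the lookup above never recurses into __index.
local function wrap_space(stream_id, src)
    return setmetatable({_stream_id = stream_id, _src = src},
                        shared_space_mt)
end

-- Plain tables standing in for connection spaces.
local conn_space_a = {name = 'a', id = 512}
local conn_space_b = {name = 'b', id = 513}

local a = wrap_space(1, conn_space_a)
local b = wrap_space(1, conn_space_b)

assert(a.name == 'a')                       -- forwarded to conn_space_a
assert(b.id == 513)                         -- forwarded to conn_space_b
assert(a._stream_id == 1)                   -- own field, no forwarding
assert(getmetatable(a) == getmetatable(b))  -- the metatable is shared
```

The only per-space state is the wrapper table itself, so creating a stream space allocates one table instead of two.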

> +    res.index._space = res
> +    return res
> +end
> +
> +-- Metatable for stream spaces. When a stream is created, there
> +-- are no spaces in it. When the user tries to access some space in
> +-- the stream, we first compare the stream's _schema_version with
> +-- schema_version from the connection and, if they are not equal, we
> +-- clear the stream space cache and update its schema_version. Then
> +-- we look for the corresponding space in the connection. If it is
> +-- found, we create the same space for the stream but with the
> +-- corresponding stream ID.
> +local stream_spaces_mt = {
> +    __index = function(self, key)
> +        local stream = self._stream
> +        if stream._schema_version ~= stream._conn.schema_version then
> +            stream._schema_version = stream._conn.schema_version
> +            self._stream_space_cache = {}
> +        end
> +        if self._stream_space_cache[key] then
> +            return self._stream_space_cache[key]
> +        end
> +        local src = stream._conn.space[key]

2. Please, try to cache 'stream._conn' into a local variable so as
not to look it up more than once ('.' operator is not free).

The same for stream._stream_space_cache.

The same for stream._conn.schema_version.

The same for _last_stream_id in :new_stream().
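
Roughly what I mean, as a sketch only. The field names mirror the patch, but the tables below are plain stand-ins and the wrapper built at the end is a placeholder for stream_wrap_space():

```lua
local spaces_mt = {
    __index = function(self, key)
        -- Each repeated lookup is done once and kept in a local.
        local stream = self._stream
        local conn = stream._conn
        local version = conn.schema_version
        if stream._schema_version ~= version then
            stream._schema_version = version
            self._stream_space_cache = {}
        end
        local cache = self._stream_space_cache
        local res = cache[key]
        if res ~= nil then
            return res
        end
        local src = conn.space[key]
        if not src then
            return nil
        end
        -- Placeholder for stream_wrap_space(stream, src).
        res = {_stream_id = stream._stream_id, _src = src}
        cache[key] = res
        return res
    end,
}

-- Stand-ins for a connection and a stream.
local conn = {schema_version = 1, space = {test = {name = 'test'}}}
local stream = {_conn = conn, _stream_id = 7, _schema_version = 1}
local spaces = setmetatable({_stream = stream, _stream_space_cache = {}},
                            spaces_mt)

local first = spaces.test
assert(first == spaces.test)   -- second access is served from the cache
conn.schema_version = 2
assert(first ~= spaces.test)   -- cache is dropped after a schema change
assert(spaces.missing == nil)  -- unknown spaces stay nil
```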

> +        if not src then
> +            return nil
> +        end
> +        local res = stream_wrap_space(stream, src)
> +        self._stream_space_cache[key] = res
> +        return res
> +    end,
> +    __serialize = stream_spaces_serialize
> +}


* Re: [Tarantool-patches] [PATCH v4 9/9] net.box: add interactive transaction support in net.box
  2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 9/9] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
@ 2021-08-12 17:52   ` Vladislav Shpilevoy via Tarantool-patches
  2021-08-13  7:44     ` Vladimir Davydov via Tarantool-patches
  0 siblings, 1 reply; 15+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-08-12 17:52 UTC (permalink / raw)
  To: mechanik20051988, tarantool-patches, vdavydov; +Cc: mechanik20051988

Thanks for the patch!

See 3 comments below.

> diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
> new file mode 100644
> index 000000000..8a8eec3e7
> --- /dev/null
> +++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
> @@ -0,0 +1,26 @@
> +## feature/core
> +
> +* Streams and interactive transactions over streams are implemented
> +  in iproto. A stream is associated with its ID, which is unique within
> +  one connection. All requests with the same nonzero stream ID belong to
> +  the same stream. All requests in a stream are processed synchronously: the
> +  execution of the next request will not start until the previous one is
> +  completed. If a request has a zero stream ID, it does not belong to a stream
> +  and is processed in the old way.
> +  In `net.box`, a stream is an object above a connection that has the same
> +  methods but executes requests sequentially. The ID is generated
> +  on the client side automatically. Users who write their own connector and
> +  want to use streams must transmit stream_id over the iproto protocol.
> +  The main purpose of streams is transactions via iproto. Each stream
> +  can start its own transaction, so streams allow multiplexing several
> +  transactions over one connection. There are multiple ways to begin,
> +  commit, and roll back a transaction: using the appropriate stream methods,
> +  using the `call` or `eval` methods, or using the `execute` method with SQL
> +  transaction syntax. Users can mix these methods, for example, start a
> +  transaction using `stream:begin()` and commit it using
> +  `stream:call('box.commit')` or `stream:execute('COMMIT')`.
> +  If any request fails during the transaction, it will not affect the other
> +  requests in the transaction. If a disconnect occurs while there is an active
> +  transaction in a stream, the transaction is rolled back if it has not
> +  been committed by that moment.

1. Please add a reference to the ticket at the end, in the form `(gh-####)`.

> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 8d707fb26..f203b203e 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -754,11 +757,38 @@ local function stream_new_stream(stream)
>      return stream._conn:new_stream()
>  end
>  
> +local function stream_begin(stream, opts)
> +    check_remote_arg(stream, 'begin')
> +    local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
> +    if opts and opts.is_async then
> +        return res
> +    end
> +end
> +
> +local function stream_commit(stream, opts)
> +    check_remote_arg(stream, 'commit')
> +    local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id)
> +    if opts and opts.is_async then
> +        return res
> +    end

2. Why can't you just return the result of :_request()? Isn't it
supposed to return the correct thing right away? For example,
remote_methods:execute() does it, space methods too.

> +end
> +
> +local function stream_rollback(stream, opts)
> +    check_remote_arg(stream, 'rollback')
> +    local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id)
> +    if opts and opts.is_async then
> +        return res
> +    end
> +end
> diff --git a/test/box/net.box_iproto_transactions_over_streams.result b/test/box/net.box_iproto_transactions_over_streams.result
> new file mode 100644
> index 000000000..c2167e760
> --- /dev/null
> +++ b/test/box/net.box_iproto_transactions_over_streams.result

<...>

> +---
> +...
> +-- successful begin using stream:call
> +stream:call('box.begin')
> +---
> +...
> +-- error: Operation is not permitted when there is an active transaction

3. Well, it does not look very successful like you said in
the comment. The test seems broken.

I didn't validate the others since I have very little time for that.
I hope Vova will take a closer look at the tests now, even though he
has already given LGTM.

(Or am I missing something and the test actually succeeds with
this weird output on purpose?)

> +stream:eval('box.begin()')
> +---
> +- error: 'Operation is not permitted when there is an active transaction '
> +...
> +-- error: Operation is not permitted when there is an active transaction
> +stream:begin()
> +---
> +- error: 'Operation is not permitted when there is an active transaction '
> +...
> +-- error: Operation is not permitted when there is an active transaction
> +stream:execute('START TRANSACTION')
> +---
> +- error: 'Operation is not permitted when there is an active transaction '
> +...
> +stream:call('ping')


* Re: [Tarantool-patches] [PATCH v4 9/9] net.box: add interactive transaction support in net.box
  2021-08-12 17:52   ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-08-13  7:44     ` Vladimir Davydov via Tarantool-patches
  0 siblings, 0 replies; 15+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-08-13  7:44 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: mechanik20051988, tarantool-patches

On Thu, Aug 12, 2021 at 08:52:30PM +0300, Vladislav Shpilevoy wrote:
> > diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> > index 8d707fb26..f203b203e 100644
> > --- a/src/box/lua/net_box.lua
> > +++ b/src/box/lua/net_box.lua
> > @@ -754,11 +757,38 @@ local function stream_new_stream(stream)
> >      return stream._conn:new_stream()
> >  end
> >  
> > +local function stream_begin(stream, opts)
> > +    check_remote_arg(stream, 'begin')
> > +    local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
> > +    if opts and opts.is_async then
> > +        return res
> > +    end
> > +end
> > +
> > +local function stream_commit(stream, opts)
> > +    check_remote_arg(stream, 'commit')
> > +    local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id)
> > +    if opts and opts.is_async then
> > +        return res
> > +    end
> 
> 2. Why can't you just return the result of :_request()? Isn't it
> supposed to return the correct thing right away? For example,
> remote_methods:execute() does it, space methods too.

_request returns nil on success. I assume we want to return nothing,
similarly to box.commit().
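
To illustrate the difference, here is a small sketch in plain Lua. `fake_request` is a hypothetical stand-in for stream:_request() (nil on a successful synchronous call, some future-like object when is_async is set), not the real net.box function:

```lua
-- Hypothetical stand-in for stream:_request().
local function fake_request(opts)
    if opts and opts.is_async then
        return {is_future = true}
    end
    return nil
end

-- Variant 1: pass the result of the request straight through.
local function commit_passthrough(opts)
    return fake_request(opts)
end

-- Variant 2: same shape as in the patch; return a value only when async.
local function commit_as_in_patch(opts)
    local res = fake_request(opts)
    if opts and opts.is_async then
        return res
    end
end

-- select('#', ...) counts returned values, so it tells "nil" apart
-- from "nothing at all".
assert(select('#', commit_passthrough()) == 1)   -- one value: nil
assert(select('#', commit_as_in_patch()) == 0)   -- no values
assert(commit_as_in_patch({is_async = true}).is_future)
```

With the second variant a synchronous call yields no values at all, which is what I assume we want here.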

> > +end
> > +
> > +local function stream_rollback(stream, opts)
> > +    check_remote_arg(stream, 'rollback')
> > +    local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id)
> > +    if opts and opts.is_async then
> > +        return res
> > +    end
> > +end
> > diff --git a/test/box/net.box_iproto_transactions_over_streams.result b/test/box/net.box_iproto_transactions_over_streams.result
> > new file mode 100644
> > index 000000000..c2167e760
> > --- /dev/null
> > +++ b/test/box/net.box_iproto_transactions_over_streams.result
> 
> <...>
> 
> > +---
> > +...
> > +-- successful begin using stream:call
> > +stream:call('box.begin')
> > +---
> > +...
> > +-- error: Operation is not permitted when there is an active transaction
> 
> 3. Well, it does not look very successful like you said in
> the comment. The test seems broken.

It's not the command output, it's a comment :-)

> 
> I didn't validate the others since I have very little time for that.
> I hope Vova will take a closer look at the tests now, even though he
> has already given LGTM.
> 
> (Or am I missing something and the test actually succeeds with
> this weird output on purpose?)
> 
> > +stream:eval('box.begin()')
> > +---
> > +- error: 'Operation is not permitted when there is an active transaction '
> > +...
> > +-- error: Operation is not permitted when there is an active transaction
> > +stream:begin()
> > +---
> > +- error: 'Operation is not permitted when there is an active transaction '
> > +...
> > +-- error: Operation is not permitted when there is an active transaction
> > +stream:execute('START TRANSACTION')
> > +---
> > +- error: 'Operation is not permitted when there is an active transaction '
> > +...
> > +stream:call('ping')


Thread overview: 15+ messages
2021-08-12  9:50 [Tarantool-patches] [PATCH v4 0/9] implement iproto streams mechanik20051988 via Tarantool-patches
2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 1/9] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches
2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 2/9] iproto: clear request::header for client requests mechanik20051988 via Tarantool-patches
2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 3/9] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches
2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 4/9] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches
2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 5/9] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches
2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 6/9] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
2021-08-12 17:49   ` Vladislav Shpilevoy via Tarantool-patches
2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 7/9] iproto: add RAFT prefix for all requests related to 'raft' mechanik20051988 via Tarantool-patches
2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 8/9] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
2021-08-12 10:48   ` Vladimir Davydov via Tarantool-patches
2021-08-12  9:50 ` [Tarantool-patches] [PATCH v4 9/9] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
2021-08-12 17:52   ` Vladislav Shpilevoy via Tarantool-patches
2021-08-13  7:44     ` Vladimir Davydov via Tarantool-patches
2021-08-12 10:51 ` [Tarantool-patches] [PATCH v4 0/9] implement iproto streams Vladimir Davydov via Tarantool-patches
