[Tarantool-patches] [PATCH 5/7] net.box: add stream support to net.box

mechanik20051988 mechanik20051988 at tarantool.org
Thu Aug 5 21:17:43 MSK 2021


From: mechanik20051988 <mechanik20.05.1988 at gmail.com>

Add stream support to `net.box`. In "net.box", stream
is an object over connection that has the same methods,
but all requests from it sends with non-zero stream ID.
Since there can be a lot of streams, we do not copy the
spaces from the connection to the stream immediately when
creating a stream, but do it only when we first access space.
Also, when updating the schema, we update the spaces only for
those streams that have had at least one access to the space.

Part of #5860

@TarantoolBot document
Title: stream support was added to net.box
In "net.box", stream is an object over connection that
has the same methods, but all requests from it sends
with non-zero stream ID. Stream ID is generated on the
client side in two ways: automatically or manually. User
can choose any of two methods, but can not mix them.
Simple example of stream creation using net.box:
```lua
-- automatically generated stream_id
stream = conn:stream()
-- manually chosen stream_id
stream = conn:stream(1)
```
---
 src/box/lua/net_box.c                         | 101 ++--
 src/box/lua/net_box.lua                       | 191 ++++--
 test/box/access.result                        |   6 +-
 test/box/access.test.lua                      |   6 +-
 ...net.box_console_connections_gh-2677.result |   2 +-
 ...t.box_console_connections_gh-2677.test.lua |   2 +-
 .../net.box_incorrect_iterator_gh-841.result  |   4 +-
 ...net.box_incorrect_iterator_gh-841.test.lua |   4 +-
 test/box/net.box_iproto_hangs_gh-3464.result  |   2 +-
 .../box/net.box_iproto_hangs_gh-3464.test.lua |   2 +-
 .../net.box_long-poll_input_gh-3400.result    |   8 +-
 .../net.box_long-poll_input_gh-3400.test.lua  |   8 +-
 test/box/stream.lua                           |  13 +
 test/box/stream.result                        | 553 ++++++++++++++++++
 test/box/stream.test.lua                      | 207 +++++++
 test/box/suite.ini                            |   2 +-
 16 files changed, 1011 insertions(+), 100 deletions(-)
 create mode 100644 test/box/stream.lua
 create mode 100644 test/box/stream.result
 create mode 100644 test/box/stream.test.lua

diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 82efc483d..ec850cd9f 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -76,7 +76,7 @@ enum netbox_method {
 
 static inline size_t
 netbox_prepare_request(struct mpstream *stream, uint64_t sync,
-		       enum iproto_type type)
+		       enum iproto_type type, uint64_t stream_id)
 {
 	/* Remember initial size of ibuf (see netbox_encode_request()) */
 	struct ibuf *ibuf = stream->ctx;
@@ -88,7 +88,7 @@ netbox_prepare_request(struct mpstream *stream, uint64_t sync,
 	mpstream_advance(stream, fixheader_size);
 
 	/* encode header */
-	mpstream_encode_map(stream, 2);
+	mpstream_encode_map(stream, stream_id != 0 ? 3 : 2);
 
 	mpstream_encode_uint(stream, IPROTO_SYNC);
 	mpstream_encode_uint(stream, sync);
@@ -96,6 +96,10 @@ netbox_prepare_request(struct mpstream *stream, uint64_t sync,
 	mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE);
 	mpstream_encode_uint(stream, type);
 
+	if (stream_id != 0) {
+		mpstream_encode_uint(stream, IPROTO_STREAM_ID);
+		mpstream_encode_uint(stream, stream_id);
+	}
 	/* Caller should remember how many bytes was used in ibuf */
 	return used;
 }
@@ -128,20 +132,21 @@ netbox_encode_request(struct mpstream *stream, size_t initial_size)
 
 static void
 netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
 	(void)L;
 	(void)idx;
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_PING);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_PING, stream_id);
 	netbox_encode_request(stream, svp);
 }
 
 static int
 netbox_encode_auth(lua_State *L)
 {
-	if (lua_gettop(L) < 5) {
+	if (lua_gettop(L) < 6) {
 		return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, "
-				     "user, password, greeting)");
+				     "stream_id, user, password, greeting)");
 	}
 	struct ibuf *ibuf = (struct ibuf *)lua_topointer(L, 1);
 	uint64_t sync = luaL_touint64(L, 2);
@@ -149,14 +154,14 @@ netbox_encode_auth(lua_State *L)
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      luamp_error, L);
-	size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH);
+	size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH, 0);
 
 	size_t user_len;
-	const char *user = lua_tolstring(L, 3, &user_len);
+	const char *user = lua_tolstring(L, 4, &user_len);
 	size_t password_len;
-	const char *password = lua_tolstring(L, 4, &password_len);
+	const char *password = lua_tolstring(L, 5, &password_len);
 	size_t salt_len;
-	const char *salt = lua_tolstring(L, 5, &salt_len);
+	const char *salt = lua_tolstring(L, 6, &salt_len);
 	if (salt_len < SCRAMBLE_SIZE)
 		return luaL_error(L, "Invalid salt");
 
@@ -179,11 +184,10 @@ netbox_encode_auth(lua_State *L)
 
 static void
 netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
-			uint64_t sync, enum iproto_type type)
+			uint64_t sync, enum iproto_type type, uint64_t stream_id)
 {
 	/* Lua stack at idx: function_name, args */
-	size_t svp = netbox_prepare_request(stream, sync, type);
-
+	size_t svp = netbox_prepare_request(stream, sync, type, stream_id);
 	mpstream_encode_map(stream, 2);
 
 	/* encode proc name */
@@ -201,24 +205,26 @@ netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_call_16(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL_16);
+	netbox_encode_call_impl(L, idx, stream, sync,
+				IPROTO_CALL_16, stream_id);
 }
 
 static void
 netbox_encode_call(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL);
+	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL, stream_id);
 }
 
 static void
 netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: expr, args */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_EVAL);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_EVAL, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -237,10 +243,11 @@ netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, iterator, offset, limit, key */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_SELECT);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_SELECT, stream_id);
 
 	mpstream_encode_map(stream, 6);
 
@@ -279,10 +286,11 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
-				uint64_t sync, enum iproto_type type)
+				uint64_t sync, enum iproto_type type,
+				uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, tuple */
-	size_t svp = netbox_prepare_request(stream, sync, type);
+	size_t svp = netbox_prepare_request(stream, sync, type, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -300,24 +308,27 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_insert(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_INSERT);
+	netbox_encode_insert_or_replace(L, idx, stream, sync,
+					IPROTO_INSERT, stream_id);
 }
 
 static void
 netbox_encode_replace(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_REPLACE);
+	netbox_encode_insert_or_replace(L, idx, stream, sync,
+					IPROTO_REPLACE, stream_id);
 }
 
 static void
 netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, key */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_DELETE);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_DELETE, stream_id);
 
 	mpstream_encode_map(stream, 3);
 
@@ -340,10 +351,11 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, key, ops */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_UPDATE);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_UPDATE, stream_id);
 
 	mpstream_encode_map(stream, 5);
 
@@ -374,10 +386,11 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, tuple, ops */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_UPSERT);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_UPSERT, stream_id);
 
 	mpstream_encode_map(stream, 4);
 
@@ -547,10 +560,11 @@ handle_error:
 
 static void
 netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query, parameters, options */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_EXECUTE);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_EXECUTE, stream_id);
 
 	mpstream_encode_map(stream, 3);
 
@@ -576,10 +590,11 @@ netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_PREPARE);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_PREPARE, stream_id);
 
 	mpstream_encode_map(stream, 1);
 
@@ -599,18 +614,19 @@ netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
-			uint64_t sync)
+			uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query, parameters, options */
-	netbox_encode_prepare(L, idx, stream, sync);
+	netbox_encode_prepare(L, idx, stream, sync, stream_id);
 }
 
 static void
 netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: bytes */
 	(void)sync;
+	(void)stream_id;
 	size_t len;
 	const char *data = lua_tolstring(L, idx, &len);
 	mpstream_memcpy(stream, data, len);
@@ -632,7 +648,7 @@ netbox_encode_method(struct lua_State *L)
 {
 	typedef void (*method_encoder_f)(struct lua_State *L, int idx,
 					 struct mpstream *stream,
-					 uint64_t sync);
+					 uint64_t sync, uint64_t stream_id);
 	static method_encoder_f method_encoder[] = {
 		[NETBOX_PING]		= netbox_encode_ping,
 		[NETBOX_CALL_16]	= netbox_encode_call_16,
@@ -657,10 +673,11 @@ netbox_encode_method(struct lua_State *L)
 	assert(method < netbox_method_MAX);
 	struct ibuf *ibuf = (struct ibuf *)lua_topointer(L, 2);
 	uint64_t sync = luaL_touint64(L, 3);
+	uint64_t stream_id = luaL_touint64(L, 4);
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      luamp_error, L);
-	method_encoder[method](L, 4, &stream, sync);
+	method_encoder[method](L, 5, &stream, sync, stream_id);
 	return 0;
 }
 
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 9af6028eb..bf6a89e15 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -483,7 +483,7 @@ local function create_transport(host, port, user, password, callback,
     -- @retval not nil Future object.
     --
     local function perform_async_request(buffer, skip_header, method, on_push,
-                                         on_push_ctx, format, ...)
+                                         on_push_ctx, format, stream_id, ...)
         if state ~= 'active' and state ~= 'fetch_schema' then
             local code = last_errno or E_NO_CONNECTION
             local msg = last_error or
@@ -497,7 +497,7 @@ local function create_transport(host, port, user, password, callback,
             worker_fiber:wakeup()
         end
         local id = next_request_id
-        encode_method(method, send_buf, id, ...)
+        encode_method(method, send_buf, id, stream_id, ...)
         next_request_id = next_id(id)
         -- Request in most cases has maximum 10 members:
         -- method, buffer, skip_header, id, cond, errno, response,
@@ -521,10 +521,10 @@ local function create_transport(host, port, user, password, callback,
     -- @retval not nil Response object.
     --
     local function perform_request(timeout, buffer, skip_header, method,
-                                   on_push, on_push_ctx, format, ...)
+                                   on_push, on_push_ctx, format, stream_id, ...)
         local request, err =
             perform_async_request(buffer, skip_header, method, on_push,
-                                  on_push_ctx, format, ...)
+                                  on_push_ctx, format, stream_id, ...)
         if not request then
             return nil, err
         end
@@ -710,7 +710,7 @@ local function create_transport(host, port, user, password, callback,
             log.warn("Netbox text protocol support is deprecated since 1.10, "..
                      "please use require('console').connect() instead")
             local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
-            encode_method(M_INJECT, send_buf, nil, setup_delimiter)
+            encode_method(M_INJECT, send_buf, nil, nil, setup_delimiter)
             local err, response = send_and_recv_console()
             if err then
                 return error_sm(err, response)
@@ -744,7 +744,7 @@ local function create_transport(host, port, user, password, callback,
             set_state('fetch_schema')
             return iproto_schema_sm()
         end
-        encode_auth(send_buf, new_request_id(), user, password, salt)
+        encode_auth(send_buf, new_request_id(), nil, user, password, salt)
         local err, hdr, body_rpos = send_and_recv_iproto()
         if err then
             return error_sm(err, hdr)
@@ -770,15 +770,15 @@ local function create_transport(host, port, user, password, callback,
         local select3_id
         local response = {}
         -- fetch everything from space _vspace, 2 = ITER_ALL
-        encode_method(M_SELECT, send_buf, select1_id, VSPACE_ID, 0, 2, 0,
+        encode_method(M_SELECT, send_buf, select1_id, nil, VSPACE_ID, 0, 2, 0,
                       0xFFFFFFFF, nil)
         -- fetch everything from space _vindex, 2 = ITER_ALL
-        encode_method(M_SELECT, send_buf, select2_id, VINDEX_ID, 0, 2, 0,
+        encode_method(M_SELECT, send_buf, select2_id, nil, VINDEX_ID, 0, 2, 0,
                       0xFFFFFFFF, nil)
         -- fetch everything from space _vcollation, 2 = ITER_ALL
         if peer_has_vcollation then
             select3_id = new_request_id()
-            encode_method(M_SELECT, send_buf, select3_id, VCOLLATION_ID,
+            encode_method(M_SELECT, send_buf, select3_id, nil, VCOLLATION_ID,
                           0, 2, 0, 0xFFFFFFFF, nil)
         end
 
@@ -930,6 +930,8 @@ local function remote_serialize(self)
     }
 end
 
+local stream_methods = {}
+
 local remote_methods = {}
 local remote_mt = {
     __index = remote_methods, __serialize = remote_serialize,
@@ -942,6 +944,90 @@ local console_mt = {
     __metatable = false
 }
 
+local stream_index_mt = {
+    __index = function(self, key)
+        return self._src[key]
+    end
+}
+
+-- Create stream space index, which is same as connection space
+-- index, but have non zero stream ID.
+local function stream_wrap_index(stream_id, src)
+    return setmetatable({
+        _stream_id = stream_id,
+        _src = src,
+    }, stream_index_mt)
+end
+
+-- Metatable for stream space indexes. When stream space being
+-- created there are no indexes in it. When accessing the space
+-- index, we look for corresponding space index in corresponding
+-- connection space. If it is found we create same index for the
+-- stream space but with corresponding stream ID. We do not need
+-- to compare stream _schema_version and connection schema_version,
+-- because all access to index  is carried out through it's space.
+-- So we update schema_version when we access space.
+local stream_indexes_mt = {
+    __index = function(self, key)
+        local _space = self._space
+        local src = _space._src.index[key]
+        if not src then
+            return nil
+        end
+        local res = stream_wrap_index(_space._stream_id, src)
+        self[key] = res
+        return res
+    end
+}
+
+local stream_space_mt = {
+    __index = function(self, key)
+        return self._src[key]
+    end
+}
+
+-- Create stream space, which is same as connection space,
+-- but have non zero stream ID.
+local function stream_wrap_space(stream, src)
+    local res = setmetatable({
+        _stream_id = stream._stream_id,
+        _src = src,
+        index = setmetatable({
+            _space = nil,
+        }, stream_indexes_mt)
+    }, stream_space_mt)
+    res.index._space = res
+    return res
+end
+
+-- Metatable for stream spaces. When stream being created there
+-- are no spaces in it. When user try to access some space in
+-- stream, we first of all compare _schema_version of stream with
+-- schema_version from connection and if they are not equal, we
+-- clear stream space cache and update it's schema_version. Then
+-- we look for corresponding space in the connection. If it is
+-- found we create same space for the stream but with corresponding
+-- stream ID.
+local stream_spaces_mt = {
+    __index = function(self, key)
+        local stream = self._stream
+        if stream._schema_version ~= stream._conn.schema_version then
+            stream._schema_version = stream._conn.schema_version
+            self._space = {}
+        end
+        if self._space[key] then
+            return self._space[key]
+        end
+        local src = stream._conn.space[key]
+        if not src then
+            return nil
+        end
+        local res = stream_wrap_space(stream, src)
+        self._space[key] = res
+        return res
+    end
+}
+
 local space_metatable, index_metatable
 
 local function new_sm(host, port, opts, connection, greeting)
@@ -1021,6 +1107,9 @@ local function new_sm(host, port, opts, connection, greeting)
     if opts.wait_connected ~= false then
         remote._transport.wait_state('active', tonumber(opts.wait_connected))
     end
+    -- Last stream ID used for this connection
+    remote._last_stream_id = 0
+    remote._streams = setmetatable({}, {__mode = 'v'})
     return remote
 end
 
@@ -1078,6 +1167,29 @@ local function check_eval_args(args)
     end
 end
 
+function stream_methods:new_stream()
+    check_remote_arg(self, 'stream')
+    box.error(E_PROC_LUA, "Unsupported for stream");
+end
+
+function remote_methods:new_stream()
+    check_remote_arg(self, 'stream')
+    self._last_stream_id = self._last_stream_id + 1
+    local stream = setmetatable({
+        new_stream = stream_methods.new_stream,
+        _stream_id = self._last_stream_id,
+        space = setmetatable({
+            _space = {},
+            _stream = nil,
+        }, stream_spaces_mt),
+        _conn = self,
+        _schema_version = self.schema_version,
+    }, { __index = self })
+    stream.space._stream = stream
+    self._streams[self._last_stream_id] = stream
+    return stream
+end
+
 function remote_methods:close()
     check_remote_arg(self, 'close')
     self._transport.stop()
@@ -1108,7 +1220,7 @@ function remote_methods:wait_connected(timeout)
     return self._transport.wait_state('active', timeout)
 end
 
-function remote_methods:_request(method, opts, format, ...)
+function remote_methods:_request(method, opts, format, stream_id, ...)
     local transport = self._transport
     local on_push, on_push_ctx, buffer, skip_header, deadline
     -- Extract options, set defaults, check if the request is
@@ -1123,7 +1235,7 @@ function remote_methods:_request(method, opts, format, ...)
             local res, err =
                 transport.perform_async_request(buffer, skip_header, method,
                                                 table.insert, {}, format,
-                                                ...)
+                                                stream_id, ...)
             if err then
                 box.error(err)
             end
@@ -1145,7 +1257,7 @@ function remote_methods:_request(method, opts, format, ...)
     end
     local res, err = transport.perform_request(timeout, buffer, skip_header,
                                                method, on_push, on_push_ctx,
-                                               format, ...)
+                                               format, stream_id, ...)
     if err then
         box.error(err)
     end
@@ -1161,7 +1273,7 @@ end
 
 function remote_methods:ping(opts)
     check_remote_arg(self, 'ping')
-    return (pcall(self._request, self, M_PING, opts))
+    return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
 end
 
 function remote_methods:reload_schema()
@@ -1172,14 +1284,16 @@ end
 -- @deprecated since 1.7.4
 function remote_methods:call_16(func_name, ...)
     check_remote_arg(self, 'call')
-    return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
+    return (self:_request(M_CALL_16, nil, nil, self._stream_id,
+                          tostring(func_name), {...}))
 end
 
 function remote_methods:call(func_name, args, opts)
     check_remote_arg(self, 'call')
     check_call_args(args)
     args = args or {}
-    local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
+    local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
+                              tostring(func_name), args)
     if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
@@ -1189,14 +1303,15 @@ end
 -- @deprecated since 1.7.4
 function remote_methods:eval_16(code, ...)
     check_remote_arg(self, 'eval')
-    return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
+    return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
+                                 code, {...})))
 end
 
 function remote_methods:eval(code, args, opts)
     check_remote_arg(self, 'eval')
     check_eval_args(args)
     args = args or {}
-    local res = self:_request(M_EVAL, opts, nil, code, args)
+    local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
     if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
@@ -1208,8 +1323,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "execute", "options")
     end
-    return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
-                         sql_opts or {})
+    return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
+                         query, parameters or {}, sql_opts or {})
 end
 
 function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
@@ -1220,7 +1335,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "prepare", "options")
     end
-    return self:_request(M_PREPARE, netbox_opts, nil, query)
+    return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
 end
 
 function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
@@ -1231,8 +1346,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "unprepare", "options")
     end
-    return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
-                         sql_opts or {})
+    return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
+                         query, parameters or {}, sql_opts or {})
 end
 
 function remote_methods:wait_state(state, timeout)
@@ -1370,11 +1485,11 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
+        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
                       {line})
     else
         assert(self.protocol == 'Lua console')
-        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
+        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
                       line..'$EOF$\n')
     end
     if err then
@@ -1394,14 +1509,14 @@ space_metatable = function(remote)
 
     function methods:insert(tuple, opts)
         check_space_arg(self, 'insert')
-        return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
-                               tuple)
+        return remote:_request(M_INSERT, opts, self._format_cdata,
+                               self._stream_id, self.id, tuple)
     end
 
     function methods:replace(tuple, opts)
         check_space_arg(self, 'replace')
-        return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
-                               tuple)
+        return remote:_request(M_REPLACE, opts, self._format_cdata,
+                               self._stream_id, self.id, tuple)
     end
 
     function methods:select(key, opts)
@@ -1421,7 +1536,8 @@ space_metatable = function(remote)
 
     function methods:upsert(key, oplist, opts)
         check_space_arg(self, 'upsert')
-        return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
+        return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
+                                               self._stream_id, self.id,
                                                key, oplist))
     end
 
@@ -1452,8 +1568,8 @@ index_metatable = function(remote)
         local offset = tonumber(opts and opts.offset) or 0
         local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
         return (remote:_request(M_SELECT, opts, self.space._format_cdata,
-                                self.space.id, self.id, iterator, offset,
-                                limit, key))
+                                self._stream_id, self.space.id, self.id,
+                                iterator, offset, limit, key))
     end
 
     function methods:get(key, opts)
@@ -1463,6 +1579,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_GET, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.EQ, 0, 2, key))
     end
@@ -1474,6 +1591,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_MIN, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.GE, 0, 1, key))
     end
@@ -1485,6 +1603,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_MAX, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.LE, 0, 1, key))
     end
@@ -1496,22 +1615,24 @@ index_metatable = function(remote)
         end
         local code = string.format('box.space.%s.index.%s:count',
                                    self.space.name, self.name)
-        return remote:_request(M_COUNT, opts, nil, code, { key, opts })
+        return remote:_request(M_COUNT, opts, nil, self._stream_id,
+                               code, { key, opts })
     end
 
     function methods:delete(key, opts)
         check_index_arg(self, 'delete')
         return nothing_or_data(remote:_request(M_DELETE, opts,
                                                self.space._format_cdata,
-                                               self.space.id, self.id, key))
+                                               self._stream_id, self.space.id,
+                                               self.id, key))
     end
 
     function methods:update(key, oplist, opts)
         check_index_arg(self, 'update')
         return nothing_or_data(remote:_request(M_UPDATE, opts,
                                                self.space._format_cdata,
-                                               self.space.id, self.id, key,
-                                               oplist))
+                                               self._stream_id, self.space.id,
+                                               self.id, key, oplist))
     end
 
     return { __index = methods, __metatable = false }
diff --git a/test/box/access.result b/test/box/access.result
index 712cd68f8..6434da907 100644
--- a/test/box/access.result
+++ b/test/box/access.result
@@ -908,15 +908,15 @@ LISTEN = require('uri').parse(box.cfg.listen)
 c = net.connect(LISTEN.host, LISTEN.service)
 ---
 ...
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '1' does not exist
 ...
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '65537' does not exist
 ...
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '4294967295' does not exist
 ...
diff --git a/test/box/access.test.lua b/test/box/access.test.lua
index 6060475d1..6abdb780d 100644
--- a/test/box/access.test.lua
+++ b/test/box/access.test.lua
@@ -351,9 +351,9 @@ box.schema.func.drop(name)
 -- very large space id, no crash occurs.
 LISTEN = require('uri').parse(box.cfg.listen)
 c = net.connect(LISTEN.host, LISTEN.service)
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 c:close()
 
 session = box.session
diff --git a/test/box/net.box_console_connections_gh-2677.result b/test/box/net.box_console_connections_gh-2677.result
index f45aa0b56..7cea0a1da 100644
--- a/test/box/net.box_console_connections_gh-2677.result
+++ b/test/box/net.box_console_connections_gh-2677.result
@@ -74,7 +74,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
 ---
 ...
 while not c:is_connected() do fiber.sleep(0.01) end
diff --git a/test/box/net.box_console_connections_gh-2677.test.lua b/test/box/net.box_console_connections_gh-2677.test.lua
index 40d099e70..6c4e6ea4f 100644
--- a/test/box/net.box_console_connections_gh-2677.test.lua
+++ b/test/box/net.box_console_connections_gh-2677.test.lua
@@ -30,7 +30,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
 while not c:is_connected() do fiber.sleep(0.01) end
 c:ping()
 
diff --git a/test/box/net.box_incorrect_iterator_gh-841.result b/test/box/net.box_incorrect_iterator_gh-841.result
index fbd2a7700..cd2a86787 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.result
+++ b/test/box/net.box_incorrect_iterator_gh-841.result
@@ -16,13 +16,13 @@ test_run:cmd("setopt delimiter ';'")
 - true
 ...
 function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
-    local ret = cn:_request(remote._method.select, opts, nil, space_id,
+    local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
                             index_id, iterator, offset, limit, key)
     return ret
 end
 function x_fatal(cn)
     cn._transport.perform_request(nil, nil, false, remote._method.inject,
-                                  nil, nil, nil, '\x80')
+                                  nil, nil, nil, nil, '\x80')
 end
 test_run:cmd("setopt delimiter ''");
 ---
diff --git a/test/box/net.box_incorrect_iterator_gh-841.test.lua b/test/box/net.box_incorrect_iterator_gh-841.test.lua
index 1d24f9f56..9c42175ef 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.test.lua
+++ b/test/box/net.box_incorrect_iterator_gh-841.test.lua
@@ -5,13 +5,13 @@ test_run:cmd("push filter ".."'\\.lua.*:[0-9]+: ' to '.lua...\"]:<line>: '")
 
 test_run:cmd("setopt delimiter ';'")
 function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
-    local ret = cn:_request(remote._method.select, opts, nil, space_id,
+    local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
                             index_id, iterator, offset, limit, key)
     return ret
 end
 function x_fatal(cn)
     cn._transport.perform_request(nil, nil, false, remote._method.inject,
-                                  nil, nil, nil, '\x80')
+                                  nil, nil, nil, nil, '\x80')
 end
 test_run:cmd("setopt delimiter ''");
 
diff --git a/test/box/net.box_iproto_hangs_gh-3464.result b/test/box/net.box_iproto_hangs_gh-3464.result
index 3b5458c9a..cbf8181b3 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.result
+++ b/test/box/net.box_iproto_hangs_gh-3464.result
@@ -17,7 +17,7 @@ c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
 ---
 ...
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
 ---
 - null
 - Peer closed
diff --git a/test/box/net.box_iproto_hangs_gh-3464.test.lua b/test/box/net.box_iproto_hangs_gh-3464.test.lua
index a7c41ae76..51a9ddece 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.test.lua
+++ b/test/box/net.box_iproto_hangs_gh-3464.test.lua
@@ -8,6 +8,6 @@ net = require('net.box')
 --
 c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
 c:close()
 test_run:grep_log('default', 'too big packet size in the header') ~= nil
diff --git a/test/box/net.box_long-poll_input_gh-3400.result b/test/box/net.box_long-poll_input_gh-3400.result
index a16110ee6..a98eea655 100644
--- a/test/box/net.box_long-poll_input_gh-3400.result
+++ b/test/box/net.box_long-poll_input_gh-3400.result
@@ -24,10 +24,10 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, false,    \
-                 net._method.call_17, nil, nil, nil, 'long', {})   \
-c._transport.perform_request(nil, nil, false, net._method.inject,  \
-                             nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false,         \
+                 net._method.call_17, nil, nil, nil, nil, 'long', {})   \
+c._transport.perform_request(nil, nil, false, net._method.inject,       \
+                             nil, nil, nil, nil, '\x80')
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/net.box_long-poll_input_gh-3400.test.lua b/test/box/net.box_long-poll_input_gh-3400.test.lua
index 891b59224..a6f302ee0 100644
--- a/test/box/net.box_long-poll_input_gh-3400.test.lua
+++ b/test/box/net.box_long-poll_input_gh-3400.test.lua
@@ -14,9 +14,9 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, false,    \
-                 net._method.call_17, nil, nil, nil, 'long', {})   \
-c._transport.perform_request(nil, nil, false, net._method.inject,  \
-                             nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false,         \
+                 net._method.call_17, nil, nil, nil, nil, 'long', {})   \
+c._transport.perform_request(nil, nil, false, net._method.inject,       \
+                             nil, nil, nil, nil, '\x80')
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 c:close()
diff --git a/test/box/stream.lua b/test/box/stream.lua
new file mode 100644
index 000000000..db6a29a8a
--- /dev/null
+++ b/test/box/stream.lua
@@ -0,0 +1,13 @@
+#!/usr/bin/env tarantool
+
+require('console').listen(os.getenv('ADMIN'))
+
+local memtx_use_mvcc_engine = (arg[2] and arg[2] == 'true' and true or false)
+
+box.cfg({
+    listen = os.getenv('LISTEN'),
+    iproto_threads = tonumber(arg[1]),
+    memtx_use_mvcc_engine = memtx_use_mvcc_engine
+})
+
+box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe',  nil, {if_not_exists = true})
diff --git a/test/box/stream.result b/test/box/stream.result
new file mode 100644
index 000000000..bfcf6c6be
--- /dev/null
+++ b/test/box/stream.result
@@ -0,0 +1,553 @@
+-- test-run result file version 2
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+test_run:cmd("create server test with script='box/stream.lua'")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+ | ---
+ | ...
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+conn_1 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn_1:new_stream()
+ | ---
+ | ...
+conn_2 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_2 = conn_2:new_stream()
+ | ---
+ | ...
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+ | ---
+ | ...
+assert(not stream_1:ping())
+ | ---
+ | - true
+ | ...
+stream_2:close()
+ | ---
+ | ...
+assert(not conn_2:ping())
+ | ---
+ | - true
+ | ...
+-- new_stream method unsupported for stream
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+-- Unsupported for stream
+stream:new_stream()
+ | ---
+ | - error: Unsupported for stream
+ | ...
+conn:close()
+ | ---
+ | ...
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version ~= stream._schema_version)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s:drop()
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+ | ---
+ | - true
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function replace_with_yeild(item)
+    fiber.sleep(0.1)
+    return s:replace({item})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+conn_space = conn.space.test
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+stream_space = stream.space.test
+ | ---
+ | ...
+
+-- Check that all requests in stream processed consistently
+futures = {}
+ | ---
+ | ...
+replace_count = 3
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+for i = 1, replace_count do
+    futures[string.format("replace_%d", i)] =
+        stream_space:replace({i}, {is_async = true})
+    futures[string.format("select_%d", i)] =
+        stream_space:select({}, {is_async = true})
+end;
+ | ---
+ | ...
+futures["replace_with_yeild_for_stream"] =
+    stream:call("replace_with_yeild",
+                { replace_count + 1 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_stream"] =
+    stream_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1]
+assert(results["select_1"])
+ | ---
+ | - - [1]
+ | ...
+-- [1] [2]
+assert(results["select_2"])
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+-- [1] [2] [3]
+assert(results["select_3"])
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ | ...
+
+-- There is no request execution order for the connection
+futures = {}
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+futures["replace_with_yeild_for_connection"] =
+    conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_connection"] =
+    conn_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- [1] [2] [3] [4] [5]
+s:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+-- Сheck that stream object is not leak
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+streams = {}
+spaces = {}
+conns = setmetatable({ conn }, {__mode = 'v'})
+count = 10
+for i = 1, count do
+    streams[i] = conn:new_stream()
+    spaces[i] = streams[i].space.test
+    assert(spaces[i])
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+assert(#conn._streams == count)
+ | ---
+ | - true
+ | ...
+spaces = nil
+ | ---
+ | ...
+streams = nil
+ | ---
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+assert(#conn._streams == 0)
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+assert(#conns == 1)
+ | ---
+ | - true
+ | ...
+conn = nil
+ | ---
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+assert(#conns == 0)
+ | ---
+ | - true
+ | ...
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+replace_count = 20
+for i = 1, replace_count do
+    space:replace({i}, {is_async = true})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+-- Give time to send
+fiber.sleep(0)
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ |   - [6]
+ |   - [7]
+ |   - [8]
+ |   - [9]
+ |   - [10]
+ |   - [11]
+ |   - [12]
+ |   - [13]
+ |   - [14]
+ |   - [15]
+ |   - [16]
+ |   - [17]
+ |   - [18]
+ |   - [19]
+ |   - [20]
+ | ...
+s:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("cleanup server test")
+ | ---
+ | - true
+ | ...
+test_run:cmd("delete server test")
+ | ---
+ | - true
+ | ...
diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua
new file mode 100644
index 000000000..190f17d8e
--- /dev/null
+++ b/test/box/stream.test.lua
@@ -0,0 +1,207 @@
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+fiber = require('fiber')
+test_run = require('test_run').new()
+
+test_run:cmd("create server test with script='box/stream.lua'")
+
+test_run:cmd("setopt delimiter ';'")
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn_1 = net_box.connect(server_addr)
+stream_1 = conn_1:new_stream()
+conn_2 = net_box.connect(server_addr)
+stream_2 = conn_2:new_stream()
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+assert(not stream_1:ping())
+stream_2:close()
+assert(not conn_2:ping())
+-- new_stream method unsupported for stream
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+-- Unsupported for stream
+stream:new_stream()
+conn:close()
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+test_run:switch("test")
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch("default")
+assert(not conn.space.test)
+assert(not stream.space.test)
+assert(conn.schema_version == stream._schema_version)
+conn:reload_schema()
+assert(conn.space.test ~= nil)
+assert(conn.schema_version ~= stream._schema_version)
+assert(stream.space.test ~= nil)
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+collectgarbage()
+collectgarbage()
+assert(conn.space.test ~= nil)
+assert(stream.space.test ~= nil)
+test_run:switch("test")
+s:drop()
+test_run:switch("default")
+conn:reload_schema()
+assert(not conn.space.test)
+assert(not stream.space.test)
+test_run:cmd("stop server test")
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+test_run:switch('test')
+fiber = require('fiber')
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:cmd("setopt delimiter ';'")
+function replace_with_yeild(item)
+    fiber.sleep(0.1)
+    return s:replace({item})
+end;
+test_run:cmd("setopt delimiter ''");
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+conn_space = conn.space.test
+stream = conn:new_stream()
+stream_space = stream.space.test
+
+-- Check that all requests in stream processed consistently
+futures = {}
+replace_count = 3
+test_run:cmd("setopt delimiter ';'")
+for i = 1, replace_count do
+    futures[string.format("replace_%d", i)] =
+        stream_space:replace({i}, {is_async = true})
+    futures[string.format("select_%d", i)] =
+        stream_space:select({}, {is_async = true})
+end;
+futures["replace_with_yeild_for_stream"] =
+    stream:call("replace_with_yeild",
+                { replace_count + 1 }, {is_async = true});
+futures["select_with_yeild_for_stream"] =
+    stream_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1]
+assert(results["select_1"])
+-- [1] [2]
+assert(results["select_2"])
+-- [1] [2] [3]
+assert(results["select_3"])
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+
+-- There is no request execution order for the connection
+futures = {}
+test_run:cmd("setopt delimiter ';'")
+futures["replace_with_yeild_for_connection"] =
+    conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+futures["select_with_yeild_for_connection"] =
+    conn_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+test_run:switch("test")
+-- [1] [2] [3] [4] [5]
+s:select()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+-- Сheck that stream object is not leak
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+test_run:cmd("setopt delimiter ';'")
+streams = {}
+spaces = {}
+conns = setmetatable({ conn }, {__mode = 'v'})
+count = 10
+for i = 1, count do
+    streams[i] = conn:new_stream()
+    spaces[i] = streams[i].space.test
+    assert(spaces[i])
+end;
+test_run:cmd("setopt delimiter ''");
+assert(#conn._streams == count)
+spaces = nil
+streams = nil
+collectgarbage()
+assert(#conn._streams == 0)
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+assert(#conns == 1)
+conn = nil
+collectgarbage()
+assert(#conns == 0)
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+space = stream.space.test
+test_run:cmd("setopt delimiter ';'")
+replace_count = 20
+for i = 1, replace_count do
+    space:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+-- Give time to send
+fiber.sleep(0)
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:switch("test")
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:cmd("stop server test")
+
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/suite.ini b/test/box/suite.ini
index b5d869fb3..94cf7811f 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
 disabled = rtree_errinj.test.lua tuple_bench.test.lua
 long_run = huge_field_map_long.test.lua
 config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua stream.test.lua
 lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
 use_unix_sockets = True
 use_unix_sockets_iproto = True
-- 
2.20.1



More information about the Tarantool-patches mailing list