[Tarantool-patches] [PATCH v2 6/8] net.box: add stream support to net.box

mechanik20051988 mechanik20051988 at tarantool.org
Mon Aug 9 17:37:58 MSK 2021


From: mechanik20051988 <mechanik20.05.1988 at gmail.com>

Add stream support to `net.box`. In "net.box", stream
is an object over connection that has the same methods,
but all requests from it sends with non-zero stream ID.
Since there can be a lot of streams, we do not copy the
spaces from the connection to the stream immediately when
creating a stream, but do it only when we first access space.
Also, when updating the schema, we update the spaces in lazy
mode: each stream has it's own schema_version, when there is
some access to stream space we compare stream schema_version
and connection schema_version and if they are different update
clear stream space cache and wrap space that is being accessed
to stream cache.

Part of #5860

@TarantoolBot document
Title: stream support was added to net.box
In "net.box", stream is an object over connection that
has the same methods, but all requests from it sends
with non-zero stream ID. Stream ID is generated on the
client automatically. Simple example of stream creation
using net.box:
```lua
stream = conn:new_stream()
-- all connection methods are valid, but send requests
-- with non zero stream_id.
```
---
 src/box/lua/net_box.c                         |  91 ++--
 src/box/lua/net_box.lua                       | 212 ++++++--
 test/box/access.result                        |   6 +-
 test/box/access.test.lua                      |   6 +-
 ...net.box_console_connections_gh-2677.result |   2 +-
 ...t.box_console_connections_gh-2677.test.lua |   2 +-
 .../net.box_incorrect_iterator_gh-841.result  |   4 +-
 ...net.box_incorrect_iterator_gh-841.test.lua |   4 +-
 test/box/net.box_iproto_hangs_gh-3464.result  |   2 +-
 .../box/net.box_iproto_hangs_gh-3464.test.lua |   2 +-
 .../net.box_long-poll_input_gh-3400.result    |   8 +-
 .../net.box_long-poll_input_gh-3400.test.lua  |   8 +-
 test/box/stream.lua                           |  13 +
 test/box/stream.result                        | 485 ++++++++++++++++++
 test/box/stream.test.lua                      | 182 +++++++
 test/box/suite.ini                            |   2 +-
 16 files changed, 935 insertions(+), 94 deletions(-)
 create mode 100644 test/box/stream.lua
 create mode 100644 test/box/stream.result
 create mode 100644 test/box/stream.test.lua

diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 7779fbfe0..df36e3991 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -76,7 +76,7 @@ enum netbox_method {
 
 static inline size_t
 netbox_prepare_request(struct mpstream *stream, uint64_t sync,
-		       enum iproto_type type)
+		       enum iproto_type type, uint64_t stream_id)
 {
 	/* Remember initial size of ibuf (see netbox_encode_request()) */
 	struct ibuf *ibuf = stream->ctx;
@@ -88,7 +88,7 @@ netbox_prepare_request(struct mpstream *stream, uint64_t sync,
 	mpstream_advance(stream, fixheader_size);
 
 	/* encode header */
-	mpstream_encode_map(stream, 2);
+	mpstream_encode_map(stream, stream_id != 0 ? 3 : 2);
 
 	mpstream_encode_uint(stream, IPROTO_SYNC);
 	mpstream_encode_uint(stream, sync);
@@ -96,6 +96,10 @@ netbox_prepare_request(struct mpstream *stream, uint64_t sync,
 	mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE);
 	mpstream_encode_uint(stream, type);
 
+	if (stream_id != 0) {
+		mpstream_encode_uint(stream, IPROTO_STREAM_ID);
+		mpstream_encode_uint(stream, stream_id);
+	}
 	/* Caller should remember how many bytes was used in ibuf */
 	return used;
 }
@@ -128,11 +132,12 @@ netbox_encode_request(struct mpstream *stream, size_t initial_size)
 
 static void
 netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
 	(void)L;
 	(void)idx;
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_PING);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_PING, stream_id);
 	netbox_encode_request(stream, svp);
 }
 
@@ -149,7 +154,7 @@ netbox_encode_auth(lua_State *L)
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      luamp_error, L);
-	size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH);
+	size_t svp = netbox_prepare_request(&stream, sync, IPROTO_AUTH, 0);
 
 	size_t user_len;
 	const char *user = lua_tolstring(L, 3, &user_len);
@@ -179,11 +184,10 @@ netbox_encode_auth(lua_State *L)
 
 static void
 netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
-			uint64_t sync, enum iproto_type type)
+			uint64_t sync, enum iproto_type type, uint64_t stream_id)
 {
 	/* Lua stack at idx: function_name, args */
-	size_t svp = netbox_prepare_request(stream, sync, type);
-
+	size_t svp = netbox_prepare_request(stream, sync, type, stream_id);
 	mpstream_encode_map(stream, 2);
 
 	/* encode proc name */
@@ -201,24 +205,26 @@ netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_call_16(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL_16);
+	netbox_encode_call_impl(L, idx, stream, sync,
+				IPROTO_CALL_16, stream_id);
 }
 
 static void
 netbox_encode_call(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL);
+	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL, stream_id);
 }
 
 static void
 netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: expr, args */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_EVAL);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_EVAL, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -237,10 +243,11 @@ netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, iterator, offset, limit, key */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_SELECT);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_SELECT, stream_id);
 
 	mpstream_encode_map(stream, 6);
 
@@ -279,10 +286,11 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
-				uint64_t sync, enum iproto_type type)
+				uint64_t sync, enum iproto_type type,
+				uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, tuple */
-	size_t svp = netbox_prepare_request(stream, sync, type);
+	size_t svp = netbox_prepare_request(stream, sync, type, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -300,24 +308,27 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_insert(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_INSERT);
+	netbox_encode_insert_or_replace(L, idx, stream, sync,
+					IPROTO_INSERT, stream_id);
 }
 
 static void
 netbox_encode_replace(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_REPLACE);
+	netbox_encode_insert_or_replace(L, idx, stream, sync,
+					IPROTO_REPLACE, stream_id);
 }
 
 static void
 netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, key */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_DELETE);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_DELETE, stream_id);
 
 	mpstream_encode_map(stream, 3);
 
@@ -340,10 +351,11 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, key, ops */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_UPDATE);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_UPDATE, stream_id);
 
 	mpstream_encode_map(stream, 5);
 
@@ -374,10 +386,11 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, tuple, ops */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_UPSERT);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_UPSERT, stream_id);
 
 	mpstream_encode_map(stream, 4);
 
@@ -545,10 +558,11 @@ handle_error:
 
 static void
 netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query, parameters, options */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_EXECUTE);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_EXECUTE, stream_id);
 
 	mpstream_encode_map(stream, 3);
 
@@ -574,10 +588,11 @@ netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query */
-	size_t svp = netbox_prepare_request(stream, sync, IPROTO_PREPARE);
+	size_t svp = netbox_prepare_request(stream, sync,
+					    IPROTO_PREPARE, stream_id);
 
 	mpstream_encode_map(stream, 1);
 
@@ -597,18 +612,19 @@ netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
-			uint64_t sync)
+			uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query, parameters, options */
-	netbox_encode_prepare(L, idx, stream, sync);
+	netbox_encode_prepare(L, idx, stream, sync, stream_id);
 }
 
 static void
 netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: bytes */
 	(void)sync;
+	(void)stream_id;
 	size_t len;
 	const char *data = lua_tolstring(L, idx, &len);
 	mpstream_memcpy(stream, data, len);
@@ -630,7 +646,7 @@ netbox_encode_method(struct lua_State *L)
 {
 	typedef void (*method_encoder_f)(struct lua_State *L, int idx,
 					 struct mpstream *stream,
-					 uint64_t sync);
+					 uint64_t sync, uint64_t stream_id);
 	static method_encoder_f method_encoder[] = {
 		[NETBOX_PING]		= netbox_encode_ping,
 		[NETBOX_CALL_16]	= netbox_encode_call_16,
@@ -655,10 +671,11 @@ netbox_encode_method(struct lua_State *L)
 	assert(method < netbox_method_MAX);
 	struct ibuf *ibuf = (struct ibuf *)lua_topointer(L, 2);
 	uint64_t sync = luaL_touint64(L, 3);
+	uint64_t stream_id = luaL_touint64(L, 4);
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      luamp_error, L);
-	method_encoder[method](L, 4, &stream, sync);
+	method_encoder[method](L, 5, &stream, sync, stream_id);
 	return 0;
 }
 
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 9af6028eb..34b396235 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -483,7 +483,7 @@ local function create_transport(host, port, user, password, callback,
     -- @retval not nil Future object.
     --
     local function perform_async_request(buffer, skip_header, method, on_push,
-                                         on_push_ctx, format, ...)
+                                         on_push_ctx, format, stream_id, ...)
         if state ~= 'active' and state ~= 'fetch_schema' then
             local code = last_errno or E_NO_CONNECTION
             local msg = last_error or
@@ -497,7 +497,7 @@ local function create_transport(host, port, user, password, callback,
             worker_fiber:wakeup()
         end
         local id = next_request_id
-        encode_method(method, send_buf, id, ...)
+        encode_method(method, send_buf, id, stream_id, ...)
         next_request_id = next_id(id)
         -- Request in most cases has maximum 10 members:
         -- method, buffer, skip_header, id, cond, errno, response,
@@ -521,10 +521,10 @@ local function create_transport(host, port, user, password, callback,
     -- @retval not nil Response object.
     --
     local function perform_request(timeout, buffer, skip_header, method,
-                                   on_push, on_push_ctx, format, ...)
+                                   on_push, on_push_ctx, format, stream_id, ...)
         local request, err =
             perform_async_request(buffer, skip_header, method, on_push,
-                                  on_push_ctx, format, ...)
+                                  on_push_ctx, format, stream_id, ...)
         if not request then
             return nil, err
         end
@@ -710,7 +710,7 @@ local function create_transport(host, port, user, password, callback,
             log.warn("Netbox text protocol support is deprecated since 1.10, "..
                      "please use require('console').connect() instead")
             local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
-            encode_method(M_INJECT, send_buf, nil, setup_delimiter)
+            encode_method(M_INJECT, send_buf, nil, nil, setup_delimiter)
             local err, response = send_and_recv_console()
             if err then
                 return error_sm(err, response)
@@ -770,15 +770,15 @@ local function create_transport(host, port, user, password, callback,
         local select3_id
         local response = {}
         -- fetch everything from space _vspace, 2 = ITER_ALL
-        encode_method(M_SELECT, send_buf, select1_id, VSPACE_ID, 0, 2, 0,
+        encode_method(M_SELECT, send_buf, select1_id, nil, VSPACE_ID, 0, 2, 0,
                       0xFFFFFFFF, nil)
         -- fetch everything from space _vindex, 2 = ITER_ALL
-        encode_method(M_SELECT, send_buf, select2_id, VINDEX_ID, 0, 2, 0,
+        encode_method(M_SELECT, send_buf, select2_id, nil, VINDEX_ID, 0, 2, 0,
                       0xFFFFFFFF, nil)
         -- fetch everything from space _vcollation, 2 = ITER_ALL
         if peer_has_vcollation then
             select3_id = new_request_id()
-            encode_method(M_SELECT, send_buf, select3_id, VCOLLATION_ID,
+            encode_method(M_SELECT, send_buf, select3_id, nil, VCOLLATION_ID,
                           0, 2, 0, 0xFFFFFFFF, nil)
         end
 
@@ -930,6 +930,37 @@ local function remote_serialize(self)
     }
 end
 
+local function stream_serialize(self)
+    return {
+        host = self._conn.host,
+        port = self._conn.port,
+        opts = next(self._conn.opts) and self._conn.opts,
+        state = self._conn.state,
+        error = self._conn.error,
+        protocol = self._conn.protocol,
+        schema_version = self._conn.schema_version,
+        peer_uuid = self._conn.peer_uuid,
+        peer_version_id = self._conn.peer_version_id,
+        stream_id = self._stream_id
+    }
+end
+
+local function stream_spaces_serialize(self)
+    return self._stream._conn.space
+end
+
+local function stream_space_serialize(self)
+    return self._src
+end
+
+local function stream_indexes_serialize(self)
+    return self._space._src.index
+end
+
+local function stream_index_serialize(self)
+    return self._src
+end
+
 local remote_methods = {}
 local remote_mt = {
     __index = remote_methods, __serialize = remote_serialize,
@@ -942,6 +973,86 @@ local console_mt = {
     __metatable = false
 }
 
+-- Create stream space index, which is same as connection space
+-- index, but have non zero stream ID.
+local function stream_wrap_index(stream_id, src)
+    return setmetatable({
+        _stream_id = stream_id,
+        _src = src,
+    }, {
+        __index = src,
+        __serialize = stream_index_serialize
+    })
+end
+
+-- Metatable for stream space indexes. When stream space being
+-- created there are no indexes in it. When accessing the space
+-- index, we look for corresponding space index in corresponding
+-- connection space. If it is found we create same index for the
+-- stream space but with corresponding stream ID. We do not need
+-- to compare stream _schema_version and connection schema_version,
+-- because all access to index  is carried out through it's space.
+-- So we update schema_version when we access space.
+local stream_indexes_mt = {
+    __index = function(self, key)
+        local _space = self._space
+        local src = _space._src.index[key]
+        if not src then
+            return nil
+        end
+        local res = stream_wrap_index(_space._stream_id, src)
+        self[key] = res
+        return res
+    end,
+    __serialize = stream_indexes_serialize
+}
+
+-- Create stream space, which is same as connection space,
+-- but have non zero stream ID.
+local function stream_wrap_space(stream, src)
+    local res = setmetatable({
+        _stream_id = stream._stream_id,
+        _src = src,
+        index = setmetatable({
+            _space = nil,
+        }, stream_indexes_mt)
+    }, {
+        __index = src,
+        __serialize = stream_space_serialize
+    })
+    res.index._space = res
+    return res
+end
+
+-- Metatable for stream spaces. When stream being created there
+-- are no spaces in it. When user try to access some space in
+-- stream, we first of all compare _schema_version of stream with
+-- schema_version from connection and if they are not equal, we
+-- clear stream space cache and update it's schema_version. Then
+-- we look for corresponding space in the connection. If it is
+-- found we create same space for the stream but with corresponding
+-- stream ID.
+local stream_spaces_mt = {
+    __index = function(self, key)
+        local stream = self._stream
+        if stream._schema_version ~= stream._conn.schema_version then
+            stream._schema_version = stream._conn.schema_version
+            self._stream_space_cache = {}
+        end
+        if self._stream_space_cache[key] then
+            return self._stream_space_cache[key]
+        end
+        local src = stream._conn.space[key]
+        if not src then
+            return nil
+        end
+        local res = stream_wrap_space(stream, src)
+        self._stream_space_cache[key] = res
+        return res
+    end,
+    __serialize = stream_spaces_serialize
+}
+
 local space_metatable, index_metatable
 
 local function new_sm(host, port, opts, connection, greeting)
@@ -1021,6 +1132,8 @@ local function new_sm(host, port, opts, connection, greeting)
     if opts.wait_connected ~= false then
         remote._transport.wait_state('active', tonumber(opts.wait_connected))
     end
+    -- Last stream ID used for this connection
+    remote._last_stream_id = 0
     return remote
 end
 
@@ -1078,6 +1191,28 @@ local function check_eval_args(args)
     end
 end
 
+local function new_stream(stream)
+    check_remote_arg(stream, 'new_stream')
+    return stream._conn:new_stream()
+end
+
+function remote_methods:new_stream()
+    check_remote_arg(self, 'new_stream')
+    self._last_stream_id = self._last_stream_id + 1
+    local stream = setmetatable({
+        new_stream = new_stream,
+        _stream_id = self._last_stream_id,
+        space = setmetatable({
+            _stream_space_cache = {},
+            _stream = nil,
+        }, stream_spaces_mt),
+        _conn = self,
+        _schema_version = self.schema_version,
+    }, { __index = self, __serialize = stream_serialize })
+    stream.space._stream = stream
+    return stream
+end
+
 function remote_methods:close()
     check_remote_arg(self, 'close')
     self._transport.stop()
@@ -1108,7 +1243,7 @@ function remote_methods:wait_connected(timeout)
     return self._transport.wait_state('active', timeout)
 end
 
-function remote_methods:_request(method, opts, format, ...)
+function remote_methods:_request(method, opts, format, stream_id, ...)
     local transport = self._transport
     local on_push, on_push_ctx, buffer, skip_header, deadline
     -- Extract options, set defaults, check if the request is
@@ -1123,7 +1258,7 @@ function remote_methods:_request(method, opts, format, ...)
             local res, err =
                 transport.perform_async_request(buffer, skip_header, method,
                                                 table.insert, {}, format,
-                                                ...)
+                                                stream_id, ...)
             if err then
                 box.error(err)
             end
@@ -1145,7 +1280,7 @@ function remote_methods:_request(method, opts, format, ...)
     end
     local res, err = transport.perform_request(timeout, buffer, skip_header,
                                                method, on_push, on_push_ctx,
-                                               format, ...)
+                                               format, stream_id, ...)
     if err then
         box.error(err)
     end
@@ -1161,7 +1296,7 @@ end
 
 function remote_methods:ping(opts)
     check_remote_arg(self, 'ping')
-    return (pcall(self._request, self, M_PING, opts))
+    return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
 end
 
 function remote_methods:reload_schema()
@@ -1172,14 +1307,16 @@ end
 -- @deprecated since 1.7.4
 function remote_methods:call_16(func_name, ...)
     check_remote_arg(self, 'call')
-    return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
+    return (self:_request(M_CALL_16, nil, nil, self._stream_id,
+                          tostring(func_name), {...}))
 end
 
 function remote_methods:call(func_name, args, opts)
     check_remote_arg(self, 'call')
     check_call_args(args)
     args = args or {}
-    local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
+    local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
+                              tostring(func_name), args)
     if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
@@ -1189,14 +1326,15 @@ end
 -- @deprecated since 1.7.4
 function remote_methods:eval_16(code, ...)
     check_remote_arg(self, 'eval')
-    return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
+    return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
+                                 code, {...})))
 end
 
 function remote_methods:eval(code, args, opts)
     check_remote_arg(self, 'eval')
     check_eval_args(args)
     args = args or {}
-    local res = self:_request(M_EVAL, opts, nil, code, args)
+    local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
     if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
@@ -1208,8 +1346,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "execute", "options")
     end
-    return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
-                         sql_opts or {})
+    return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
+                         query, parameters or {}, sql_opts or {})
 end
 
 function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
@@ -1220,7 +1358,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "prepare", "options")
     end
-    return self:_request(M_PREPARE, netbox_opts, nil, query)
+    return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
 end
 
 function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
@@ -1231,8 +1369,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "unprepare", "options")
     end
-    return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
-                         sql_opts or {})
+    return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
+                         query, parameters or {}, sql_opts or {})
 end
 
 function remote_methods:wait_state(state, timeout)
@@ -1370,11 +1508,11 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
+        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
                       {line})
     else
         assert(self.protocol == 'Lua console')
-        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
+        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
                       line..'$EOF$\n')
     end
     if err then
@@ -1394,14 +1532,14 @@ space_metatable = function(remote)
 
     function methods:insert(tuple, opts)
         check_space_arg(self, 'insert')
-        return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
-                               tuple)
+        return remote:_request(M_INSERT, opts, self._format_cdata,
+                               self._stream_id, self.id, tuple)
     end
 
     function methods:replace(tuple, opts)
         check_space_arg(self, 'replace')
-        return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
-                               tuple)
+        return remote:_request(M_REPLACE, opts, self._format_cdata,
+                               self._stream_id, self.id, tuple)
     end
 
     function methods:select(key, opts)
@@ -1421,7 +1559,8 @@ space_metatable = function(remote)
 
     function methods:upsert(key, oplist, opts)
         check_space_arg(self, 'upsert')
-        return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
+        return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
+                                               self._stream_id, self.id,
                                                key, oplist))
     end
 
@@ -1452,8 +1591,8 @@ index_metatable = function(remote)
         local offset = tonumber(opts and opts.offset) or 0
         local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
         return (remote:_request(M_SELECT, opts, self.space._format_cdata,
-                                self.space.id, self.id, iterator, offset,
-                                limit, key))
+                                self._stream_id, self.space.id, self.id,
+                                iterator, offset, limit, key))
     end
 
     function methods:get(key, opts)
@@ -1463,6 +1602,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_GET, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.EQ, 0, 2, key))
     end
@@ -1474,6 +1614,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_MIN, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.GE, 0, 1, key))
     end
@@ -1485,6 +1626,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_MAX, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.LE, 0, 1, key))
     end
@@ -1496,22 +1638,24 @@ index_metatable = function(remote)
         end
         local code = string.format('box.space.%s.index.%s:count',
                                    self.space.name, self.name)
-        return remote:_request(M_COUNT, opts, nil, code, { key, opts })
+        return remote:_request(M_COUNT, opts, nil, self._stream_id,
+                               code, { key, opts })
     end
 
     function methods:delete(key, opts)
         check_index_arg(self, 'delete')
         return nothing_or_data(remote:_request(M_DELETE, opts,
                                                self.space._format_cdata,
-                                               self.space.id, self.id, key))
+                                               self._stream_id, self.space.id,
+                                               self.id, key))
     end
 
     function methods:update(key, oplist, opts)
         check_index_arg(self, 'update')
         return nothing_or_data(remote:_request(M_UPDATE, opts,
                                                self.space._format_cdata,
-                                               self.space.id, self.id, key,
-                                               oplist))
+                                               self._stream_id, self.space.id,
+                                               self.id, key, oplist))
     end
 
     return { __index = methods, __metatable = false }
diff --git a/test/box/access.result b/test/box/access.result
index 712cd68f8..6434da907 100644
--- a/test/box/access.result
+++ b/test/box/access.result
@@ -908,15 +908,15 @@ LISTEN = require('uri').parse(box.cfg.listen)
 c = net.connect(LISTEN.host, LISTEN.service)
 ---
 ...
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '1' does not exist
 ...
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '65537' does not exist
 ...
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '4294967295' does not exist
 ...
diff --git a/test/box/access.test.lua b/test/box/access.test.lua
index 6060475d1..6abdb780d 100644
--- a/test/box/access.test.lua
+++ b/test/box/access.test.lua
@@ -351,9 +351,9 @@ box.schema.func.drop(name)
 -- very large space id, no crash occurs.
 LISTEN = require('uri').parse(box.cfg.listen)
 c = net.connect(LISTEN.host, LISTEN.service)
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 c:close()
 
 session = box.session
diff --git a/test/box/net.box_console_connections_gh-2677.result b/test/box/net.box_console_connections_gh-2677.result
index f45aa0b56..7cea0a1da 100644
--- a/test/box/net.box_console_connections_gh-2677.result
+++ b/test/box/net.box_console_connections_gh-2677.result
@@ -74,7 +74,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
 ---
 ...
 while not c:is_connected() do fiber.sleep(0.01) end
diff --git a/test/box/net.box_console_connections_gh-2677.test.lua b/test/box/net.box_console_connections_gh-2677.test.lua
index 40d099e70..6c4e6ea4f 100644
--- a/test/box/net.box_console_connections_gh-2677.test.lua
+++ b/test/box/net.box_console_connections_gh-2677.test.lua
@@ -30,7 +30,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
 while not c:is_connected() do fiber.sleep(0.01) end
 c:ping()
 
diff --git a/test/box/net.box_incorrect_iterator_gh-841.result b/test/box/net.box_incorrect_iterator_gh-841.result
index fbd2a7700..cd2a86787 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.result
+++ b/test/box/net.box_incorrect_iterator_gh-841.result
@@ -16,13 +16,13 @@ test_run:cmd("setopt delimiter ';'")
 - true
 ...
 function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
-    local ret = cn:_request(remote._method.select, opts, nil, space_id,
+    local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
                             index_id, iterator, offset, limit, key)
     return ret
 end
 function x_fatal(cn)
     cn._transport.perform_request(nil, nil, false, remote._method.inject,
-                                  nil, nil, nil, '\x80')
+                                  nil, nil, nil, nil, '\x80')
 end
 test_run:cmd("setopt delimiter ''");
 ---
diff --git a/test/box/net.box_incorrect_iterator_gh-841.test.lua b/test/box/net.box_incorrect_iterator_gh-841.test.lua
index 1d24f9f56..9c42175ef 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.test.lua
+++ b/test/box/net.box_incorrect_iterator_gh-841.test.lua
@@ -5,13 +5,13 @@ test_run:cmd("push filter ".."'\\.lua.*:[0-9]+: ' to '.lua...\"]:<line>: '")
 
 test_run:cmd("setopt delimiter ';'")
 function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
-    local ret = cn:_request(remote._method.select, opts, nil, space_id,
+    local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
                             index_id, iterator, offset, limit, key)
     return ret
 end
 function x_fatal(cn)
     cn._transport.perform_request(nil, nil, false, remote._method.inject,
-                                  nil, nil, nil, '\x80')
+                                  nil, nil, nil, nil, '\x80')
 end
 test_run:cmd("setopt delimiter ''");
 
diff --git a/test/box/net.box_iproto_hangs_gh-3464.result b/test/box/net.box_iproto_hangs_gh-3464.result
index 3b5458c9a..cbf8181b3 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.result
+++ b/test/box/net.box_iproto_hangs_gh-3464.result
@@ -17,7 +17,7 @@ c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
 ---
 ...
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
 ---
 - null
 - Peer closed
diff --git a/test/box/net.box_iproto_hangs_gh-3464.test.lua b/test/box/net.box_iproto_hangs_gh-3464.test.lua
index a7c41ae76..51a9ddece 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.test.lua
+++ b/test/box/net.box_iproto_hangs_gh-3464.test.lua
@@ -8,6 +8,6 @@ net = require('net.box')
 --
 c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
 c:close()
 test_run:grep_log('default', 'too big packet size in the header') ~= nil
diff --git a/test/box/net.box_long-poll_input_gh-3400.result b/test/box/net.box_long-poll_input_gh-3400.result
index a16110ee6..a98eea655 100644
--- a/test/box/net.box_long-poll_input_gh-3400.result
+++ b/test/box/net.box_long-poll_input_gh-3400.result
@@ -24,10 +24,10 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, false,    \
-                 net._method.call_17, nil, nil, nil, 'long', {})   \
-c._transport.perform_request(nil, nil, false, net._method.inject,  \
-                             nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false,         \
+                 net._method.call_17, nil, nil, nil, nil, 'long', {})   \
+c._transport.perform_request(nil, nil, false, net._method.inject,       \
+                             nil, nil, nil, nil, '\x80')
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/net.box_long-poll_input_gh-3400.test.lua b/test/box/net.box_long-poll_input_gh-3400.test.lua
index 891b59224..a6f302ee0 100644
--- a/test/box/net.box_long-poll_input_gh-3400.test.lua
+++ b/test/box/net.box_long-poll_input_gh-3400.test.lua
@@ -14,9 +14,9 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, false,    \
-                 net._method.call_17, nil, nil, nil, 'long', {})   \
-c._transport.perform_request(nil, nil, false, net._method.inject,  \
-                             nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false,         \
+                 net._method.call_17, nil, nil, nil, nil, 'long', {})   \
+c._transport.perform_request(nil, nil, false, net._method.inject,       \
+                             nil, nil, nil, nil, '\x80')
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 c:close()
diff --git a/test/box/stream.lua b/test/box/stream.lua
new file mode 100644
index 000000000..db6a29a8a
--- /dev/null
+++ b/test/box/stream.lua
@@ -0,0 +1,13 @@
+#!/usr/bin/env tarantool
+
+require('console').listen(os.getenv('ADMIN'))
+
+local memtx_use_mvcc_engine = (arg[2] and arg[2] == 'true' and true or false)
+
+box.cfg({
+    listen = os.getenv('LISTEN'),
+    iproto_threads = tonumber(arg[1]),
+    memtx_use_mvcc_engine = memtx_use_mvcc_engine
+})
+
+box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe',  nil, {if_not_exists = true})
diff --git a/test/box/stream.result b/test/box/stream.result
new file mode 100644
index 000000000..03200ecf6
--- /dev/null
+++ b/test/box/stream.result
@@ -0,0 +1,485 @@
+-- test-run result file version 2
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+test_run:cmd("create server test with script='box/stream.lua'")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+ | ---
+ | ...
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+conn_1 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn_1:new_stream()
+ | ---
+ | ...
+conn_2 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_2 = conn_2:new_stream()
+ | ---
+ | ...
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+ | ---
+ | ...
+assert(not stream_1:ping())
+ | ---
+ | - true
+ | ...
+stream_2:close()
+ | ---
+ | ...
+assert(not conn_2:ping())
+ | ---
+ | - true
+ | ...
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version ~= stream._schema_version)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s:drop()
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+ | ---
+ | - true
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function replace_with_yeild(item)
+    fiber.sleep(0.1)
+    return s:replace({item})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+conn_space = conn.space.test
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+stream_space = stream.space.test
+ | ---
+ | ...
+
+-- Check that all requests in stream processed consistently
+futures = {}
+ | ---
+ | ...
+replace_count = 3
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+for i = 1, replace_count do
+    futures[string.format("replace_%d", i)] =
+        stream_space:replace({i}, {is_async = true})
+    futures[string.format("select_%d", i)] =
+        stream_space:select({}, {is_async = true})
+end;
+ | ---
+ | ...
+futures["replace_with_yeild_for_stream"] =
+    stream:call("replace_with_yeild",
+                { replace_count + 1 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_stream"] =
+    stream_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1]
+assert(results["select_1"])
+ | ---
+ | - - [1]
+ | ...
+-- [1] [2]
+assert(results["select_2"])
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+-- [1] [2] [3]
+assert(results["select_3"])
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ | ...
+
+-- There is no request execution order for the connection
+futures = {}
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+futures["replace_with_yeild_for_connection"] =
+    conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_connection"] =
+    conn_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- [1] [2] [3] [4] [5]
+s:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+replace_count = 20
+for i = 1, replace_count do
+    space:replace({i}, {is_async = true})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+-- Give time to send
+fiber.sleep(0)
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ |   - [6]
+ |   - [7]
+ |   - [8]
+ |   - [9]
+ |   - [10]
+ |   - [11]
+ |   - [12]
+ |   - [13]
+ |   - [14]
+ |   - [15]
+ |   - [16]
+ |   - [17]
+ |   - [18]
+ |   - [19]
+ |   - [20]
+ | ...
+s:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("cleanup server test")
+ | ---
+ | - true
+ | ...
+test_run:cmd("delete server test")
+ | ---
+ | - true
+ | ...
diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua
new file mode 100644
index 000000000..72129a228
--- /dev/null
+++ b/test/box/stream.test.lua
@@ -0,0 +1,182 @@
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+fiber = require('fiber')
+test_run = require('test_run').new()
+
+test_run:cmd("create server test with script='box/stream.lua'")
+
+test_run:cmd("setopt delimiter ';'")
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn_1 = net_box.connect(server_addr)
+stream_1 = conn_1:new_stream()
+conn_2 = net_box.connect(server_addr)
+stream_2 = conn_2:new_stream()
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+assert(not stream_1:ping())
+stream_2:close()
+assert(not conn_2:ping())
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+conn:close()
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+test_run:switch("test")
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch("default")
+assert(not conn.space.test)
+assert(not stream.space.test)
+assert(conn.schema_version == stream._schema_version)
+conn:reload_schema()
+assert(conn.space.test ~= nil)
+assert(conn.schema_version ~= stream._schema_version)
+assert(stream.space.test ~= nil)
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+collectgarbage()
+collectgarbage()
+assert(conn.space.test ~= nil)
+assert(stream.space.test ~= nil)
+test_run:switch("test")
+s:drop()
+test_run:switch("default")
+conn:reload_schema()
+assert(not conn.space.test)
+assert(not stream.space.test)
+test_run:cmd("stop server test")
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+test_run:switch('test')
+fiber = require('fiber')
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:cmd("setopt delimiter ';'")
+function replace_with_yeild(item)
+    fiber.sleep(0.1)
+    return s:replace({item})
+end;
+test_run:cmd("setopt delimiter ''");
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+conn_space = conn.space.test
+stream = conn:new_stream()
+stream_space = stream.space.test
+
+-- Check that all requests in stream processed consistently
+futures = {}
+replace_count = 3
+test_run:cmd("setopt delimiter ';'")
+for i = 1, replace_count do
+    futures[string.format("replace_%d", i)] =
+        stream_space:replace({i}, {is_async = true})
+    futures[string.format("select_%d", i)] =
+        stream_space:select({}, {is_async = true})
+end;
+futures["replace_with_yeild_for_stream"] =
+    stream:call("replace_with_yeild",
+                { replace_count + 1 }, {is_async = true});
+futures["select_with_yeild_for_stream"] =
+    stream_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1]
+assert(results["select_1"])
+-- [1] [2]
+assert(results["select_2"])
+-- [1] [2] [3]
+assert(results["select_3"])
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+
+-- There is no request execution order for the connection
+futures = {}
+test_run:cmd("setopt delimiter ';'")
+futures["replace_with_yeild_for_connection"] =
+    conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+futures["select_with_yeild_for_connection"] =
+    conn_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+test_run:switch("test")
+-- [1] [2] [3] [4] [5]
+s:select()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+space = stream.space.test
+test_run:cmd("setopt delimiter ';'")
+replace_count = 20
+for i = 1, replace_count do
+    space:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+-- Give time to send
+fiber.sleep(0)
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:switch("test")
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:cmd("stop server test")
+
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/suite.ini b/test/box/suite.ini
index b5d869fb3..94cf7811f 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
 disabled = rtree_errinj.test.lua tuple_bench.test.lua
 long_run = huge_field_map_long.test.lua
 config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua stream.test.lua
 lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
 use_unix_sockets = True
 use_unix_sockets_iproto = True
-- 
2.20.1



More information about the Tarantool-patches mailing list