[Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box

mechanik20051988 mechanik20051988 at tarantool.org
Wed Aug 11 11:56:56 MSK 2021


From: mechanik20051988 <mechanik20.05.1988 at gmail.com>

Add stream support to `net.box`. In "net.box", stream
is an object over connection that has the same methods,
but all requests from it sends with non-zero stream ID.
Since there can be a lot of streams, we do not copy the
spaces from the connection to the stream immediately when
creating a stream, but do it only when we first access space.
Also, when updating the schema, we update the spaces in lazy
mode: each stream has it's own schema_version, when there is
some access to stream space we compare stream schema_version
and connection schema_version and if they are different update
clear stream space cache and wrap space that is being accessed
to stream cache.

Part of #5860

@TarantoolBot document
Title: stream support was added to net.box
In "net.box", stream is an object over connection that
has the same methods, but all requests from it sends
with non-zero stream ID. Stream ID is generated on the
client automatically. Simple example of stream creation
using net.box:
```lua
stream = conn:new_stream()
-- all connection methods are valid, but send requests
-- with non zero stream_id.
```
---
 src/box/lua/net_box.c                         |  95 ++--
 src/box/lua/net_box.lua                       | 205 ++++++--
 test/box/access.result                        |   6 +-
 test/box/access.test.lua                      |   6 +-
 ...net.box_console_connections_gh-2677.result |   2 +-
 ...t.box_console_connections_gh-2677.test.lua |   2 +-
 .../net.box_incorrect_iterator_gh-841.result  |   4 +-
 ...net.box_incorrect_iterator_gh-841.test.lua |   4 +-
 test/box/net.box_iproto_hangs_gh-3464.result  |   2 +-
 .../box/net.box_iproto_hangs_gh-3464.test.lua |   2 +-
 .../net.box_long-poll_input_gh-3400.result    |   8 +-
 .../net.box_long-poll_input_gh-3400.test.lua  |   8 +-
 test/box/stream.lua                           |  13 +
 test/box/stream.result                        | 485 ++++++++++++++++++
 test/box/stream.test.lua                      | 182 +++++++
 test/box/suite.ini                            |   2 +-
 16 files changed, 934 insertions(+), 92 deletions(-)
 create mode 100644 test/box/stream.lua
 create mode 100644 test/box/stream.result
 create mode 100644 test/box/stream.test.lua

diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 06e574cdf..3bc49af23 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -328,7 +328,7 @@ netbox_registry_reset(struct netbox_registry *registry, struct error *error)
 
 static inline size_t
 netbox_begin_encode(struct mpstream *stream, uint64_t sync,
-		    enum iproto_type type)
+		    enum iproto_type type, uint64_t stream_id)
 {
 	/* Remember initial size of ibuf (see netbox_end_encode()) */
 	struct ibuf *ibuf = stream->ctx;
@@ -340,7 +340,7 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync,
 	mpstream_advance(stream, fixheader_size);
 
 	/* encode header */
-	mpstream_encode_map(stream, 2);
+	mpstream_encode_map(stream, stream_id != 0 ? 3 : 2);
 
 	mpstream_encode_uint(stream, IPROTO_SYNC);
 	mpstream_encode_uint(stream, sync);
@@ -348,6 +348,10 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync,
 	mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE);
 	mpstream_encode_uint(stream, type);
 
+	if (stream_id != 0) {
+		mpstream_encode_uint(stream, IPROTO_STREAM_ID);
+		mpstream_encode_uint(stream, stream_id);
+	}
 	/* Caller should remember how many bytes was used in ibuf */
 	return used;
 }
@@ -380,11 +384,11 @@ netbox_end_encode(struct mpstream *stream, size_t initial_size)
 
 static void
 netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
 	(void)L;
 	(void)idx;
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING, stream_id);
 	netbox_end_encode(stream, svp);
 }
 
@@ -402,7 +406,7 @@ netbox_encode_auth(struct ibuf *ibuf, uint64_t sync,
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      mpstream_error_handler, &is_error);
-	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH);
+	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH, 0);
 
 	/* Adapted from xrow_encode_auth() */
 	mpstream_encode_map(&stream, password != NULL ? 2 : 1);
@@ -432,7 +436,7 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      mpstream_error_handler, &is_error);
-	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT);
+	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT, 0);
 	mpstream_encode_map(&stream, 3);
 	mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
 	mpstream_encode_uint(&stream, space_id);
@@ -446,10 +450,10 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
 
 static void
 netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
-			uint64_t sync, enum iproto_type type)
+			uint64_t sync, enum iproto_type type, uint64_t stream_id)
 {
 	/* Lua stack at idx: function_name, args */
-	size_t svp = netbox_begin_encode(stream, sync, type);
+	size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -468,24 +472,25 @@ netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_call_16(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL_16);
+	netbox_encode_call_impl(L, idx, stream, sync,
+				IPROTO_CALL_16, stream_id);
 }
 
 static void
 netbox_encode_call(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL);
+	netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL, stream_id);
 }
 
 static void
 netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
-		   uint64_t sync)
+		   uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: expr, args */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -504,10 +509,11 @@ netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, iterator, offset, limit, key */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT,
+					 stream_id);
 
 	mpstream_encode_map(stream, 6);
 
@@ -546,10 +552,11 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
-				uint64_t sync, enum iproto_type type)
+				uint64_t sync, enum iproto_type type,
+				uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, tuple */
-	size_t svp = netbox_begin_encode(stream, sync, type);
+	size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
 
 	mpstream_encode_map(stream, 2);
 
@@ -567,24 +574,27 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_insert(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_INSERT);
+	netbox_encode_insert_or_replace(L, idx, stream, sync,
+					IPROTO_INSERT, stream_id);
 }
 
 static void
 netbox_encode_replace(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
-	netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_REPLACE);
+	netbox_encode_insert_or_replace(L, idx, stream, sync,
+					IPROTO_REPLACE, stream_id);
 }
 
 static void
 netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, key */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE,
+					 stream_id);
 
 	mpstream_encode_map(stream, 3);
 
@@ -607,10 +617,11 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, index_id, key, ops */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE,
+					 stream_id);
 
 	mpstream_encode_map(stream, 5);
 
@@ -641,10 +652,11 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: space_id, tuple, ops */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT,
+					 stream_id);
 
 	mpstream_encode_map(stream, 4);
 
@@ -844,10 +856,11 @@ netbox_send_and_recv_console(int fd, struct ibuf *send_buf,
 
 static void
 netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query, parameters, options */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE,
+					 stream_id);
 
 	mpstream_encode_map(stream, 3);
 
@@ -873,10 +886,11 @@ netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
-		      uint64_t sync)
+		      uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query */
-	size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE,
+					 stream_id);
 
 	mpstream_encode_map(stream, 1);
 
@@ -896,18 +910,19 @@ netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
 
 static void
 netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
-			uint64_t sync)
+			uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: query, parameters, options */
-	netbox_encode_prepare(L, idx, stream, sync);
+	netbox_encode_prepare(L, idx, stream, sync, stream_id);
 }
 
 static void
 netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
-		     uint64_t sync)
+		     uint64_t sync, uint64_t stream_id)
 {
 	/* Lua stack at idx: bytes */
 	(void)sync;
+	(void)stream_id;
 	size_t len;
 	const char *data = lua_tolstring(L, idx, &len);
 	mpstream_memcpy(stream, data, len);
@@ -921,11 +936,11 @@ netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
  */
 static int
 netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
-		     struct ibuf *ibuf, uint64_t sync)
+		     struct ibuf *ibuf, uint64_t sync, uint64_t stream_id)
 {
 	typedef void (*method_encoder_f)(struct lua_State *L, int idx,
 					 struct mpstream *stream,
-					 uint64_t sync);
+					 uint64_t sync, uint64_t stream_id);
 	static method_encoder_f method_encoder[] = {
 		[NETBOX_PING]		= netbox_encode_ping,
 		[NETBOX_CALL_16]	= netbox_encode_call_16,
@@ -949,7 +964,7 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
 		      luamp_error, L);
-	method_encoder[method](L, idx, &stream, sync);
+	method_encoder[method](L, idx, &stream, sync, stream_id);
 	return 0;
 }
 
@@ -1569,6 +1584,7 @@ netbox_new_registry(struct lua_State *L)
  *  - on_push: on_push trigger function
  *  - on_push_ctx: on_push trigger function argument
  *  - format: tuple format to use for decoding the body or nil
+ *  - stream_id: determines whether or not the request belongs to stream
  *  - ...: method-specific arguments passed to the encoder
  */
 static void
@@ -1581,7 +1597,8 @@ netbox_make_request(struct lua_State *L, int idx,
 	enum netbox_method method = lua_tointeger(L, idx + 4);
 	assert(method < netbox_method_MAX);
 	uint64_t sync = registry->next_sync++;
-	netbox_encode_method(L, idx + 8, method, send_buf, sync);
+	uint64_t stream_id = luaL_touint64(L, idx + 8);
+	netbox_encode_method(L, idx + 9, method, send_buf, sync, stream_id);
 
 	/* Initialize and register the request object. */
 	request->method = method;
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 8f5671c15..3dffc245f 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -275,14 +275,14 @@ local function create_transport(host, port, user, password, callback,
     -- @retval not nil Future object.
     --
     local function perform_async_request(buffer, skip_header, method, on_push,
-                                         on_push_ctx, format, ...)
+                                         on_push_ctx, format, stream_id, ...)
         local err = prepare_perform_request()
         if err then
             return nil, err
         end
         return perform_async_request_impl(requests, send_buf, buffer,
                                           skip_header, method, on_push,
-                                          on_push_ctx, format, ...)
+                                          on_push_ctx, format, stream_id, ...)
     end
 
     --
@@ -291,14 +291,15 @@ local function create_transport(host, port, user, password, callback,
     -- @retval not nil Response object.
     --
     local function perform_request(timeout, buffer, skip_header, method,
-                                   on_push, on_push_ctx, format, ...)
+                                   on_push, on_push_ctx, format,
+                                   stream_id, ...)
         local err = prepare_perform_request()
         if err then
             return nil, err
         end
         return perform_request_impl(timeout, requests, send_buf, buffer,
                                     skip_header, method, on_push, on_push_ctx,
-                                    format, ...)
+                                    format, stream_id, ...)
     end
 
     -- PROTOCOL STATE MACHINE (WORKER FIBER) --
@@ -487,6 +488,37 @@ local function remote_serialize(self)
     }
 end
 
+local function stream_serialize(self)
+    return {
+        host = self._conn.host,
+        port = self._conn.port,
+        opts = next(self._conn.opts) and self._conn.opts,
+        state = self._conn.state,
+        error = self._conn.error,
+        protocol = self._conn.protocol,
+        schema_version = self._conn.schema_version,
+        peer_uuid = self._conn.peer_uuid,
+        peer_version_id = self._conn.peer_version_id,
+        stream_id = self._stream_id
+    }
+end
+
+local function stream_spaces_serialize(self)
+    return self._stream._conn.space
+end
+
+local function stream_space_serialize(self)
+    return self._src
+end
+
+local function stream_indexes_serialize(self)
+    return self._space._src.index
+end
+
+local function stream_index_serialize(self)
+    return self._src
+end
+
 local remote_methods = {}
 local remote_mt = {
     __index = remote_methods, __serialize = remote_serialize,
@@ -499,6 +531,86 @@ local console_mt = {
     __metatable = false
 }
 
+-- Create stream space index, which is same as connection space
+-- index, but have non zero stream ID.
+local function stream_wrap_index(stream_id, src)
+    return setmetatable({
+        _stream_id = stream_id,
+        _src = src,
+    }, {
+        __index = src,
+        __serialize = stream_index_serialize
+    })
+end
+
+-- Metatable for stream space indexes. When stream space being
+-- created there are no indexes in it. When accessing the space
+-- index, we look for corresponding space index in corresponding
+-- connection space. If it is found we create same index for the
+-- stream space but with corresponding stream ID. We do not need
+-- to compare stream _schema_version and connection schema_version,
+-- because all access to index  is carried out through it's space.
+-- So we update schema_version when we access space.
+local stream_indexes_mt = {
+    __index = function(self, key)
+        local _space = self._space
+        local src = _space._src.index[key]
+        if not src then
+            return nil
+        end
+        local res = stream_wrap_index(_space._stream_id, src)
+        self[key] = res
+        return res
+    end,
+    __serialize = stream_indexes_serialize
+}
+
+-- Create stream space, which is same as connection space,
+-- but have non zero stream ID.
+local function stream_wrap_space(stream, src)
+    local res = setmetatable({
+        _stream_id = stream._stream_id,
+        _src = src,
+        index = setmetatable({
+            _space = nil,
+        }, stream_indexes_mt)
+    }, {
+        __index = src,
+        __serialize = stream_space_serialize
+    })
+    res.index._space = res
+    return res
+end
+
+-- Metatable for stream spaces. When stream being created there
+-- are no spaces in it. When user try to access some space in
+-- stream, we first of all compare _schema_version of stream with
+-- schema_version from connection and if they are not equal, we
+-- clear stream space cache and update it's schema_version. Then
+-- we look for corresponding space in the connection. If it is
+-- found we create same space for the stream but with corresponding
+-- stream ID.
+local stream_spaces_mt = {
+    __index = function(self, key)
+        local stream = self._stream
+        if stream._schema_version ~= stream._conn.schema_version then
+            stream._schema_version = stream._conn.schema_version
+            self._stream_space_cache = {}
+        end
+        if self._stream_space_cache[key] then
+            return self._stream_space_cache[key]
+        end
+        local src = stream._conn.space[key]
+        if not src then
+            return nil
+        end
+        local res = stream_wrap_space(stream, src)
+        self._stream_space_cache[key] = res
+        return res
+    end,
+    __serialize = stream_spaces_serialize
+}
+
 local space_metatable, index_metatable
 
 local function new_sm(host, port, opts, connection, greeting)
@@ -578,6 +690,8 @@ local function new_sm(host, port, opts, connection, greeting)
     if opts.wait_connected ~= false then
         remote._transport.wait_state('active', tonumber(opts.wait_connected))
     end
+    -- Last stream ID used for this connection
+    remote._last_stream_id = 0
     return remote
 end
 
@@ -635,6 +749,28 @@ local function check_eval_args(args)
     end
 end
 
+local function new_stream(stream)
+    check_remote_arg(stream, 'new_stream')
+    return stream._conn:new_stream()
+end
+
+function remote_methods:new_stream()
+    check_remote_arg(self, 'new_stream')
+    self._last_stream_id = self._last_stream_id + 1
+    local stream = setmetatable({
+        new_stream = new_stream,
+        _stream_id = self._last_stream_id,
+        space = setmetatable({
+            _stream_space_cache = {},
+            _stream = nil,
+        }, stream_spaces_mt),
+        _conn = self,
+        _schema_version = self.schema_version,
+    }, { __index = self, __serialize = stream_serialize })
+    stream.space._stream = stream
+    return stream
+end
+
 function remote_methods:close()
     check_remote_arg(self, 'close')
     self._transport.stop()
@@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout)
     return self._transport.wait_state('active', timeout)
 end
 
-function remote_methods:_request(method, opts, format, ...)
+function remote_methods:_request(method, opts, format, stream_id, ...)
     local transport = self._transport
     local on_push, on_push_ctx, buffer, skip_header, deadline
     -- Extract options, set defaults, check if the request is
@@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...)
             local res, err =
                 transport.perform_async_request(buffer, skip_header, method,
                                                 table.insert, {}, format,
-                                                ...)
+                                                stream_id, ...)
             if err then
                 box.error(err)
             end
@@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...)
     end
     local res, err = transport.perform_request(timeout, buffer, skip_header,
                                                method, on_push, on_push_ctx,
-                                               format, ...)
+                                               format, stream_id, ...)
     if err then
         box.error(err)
     end
@@ -718,7 +854,7 @@ end
 
 function remote_methods:ping(opts)
     check_remote_arg(self, 'ping')
-    return (pcall(self._request, self, M_PING, opts))
+    return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
 end
 
 function remote_methods:reload_schema()
@@ -729,14 +865,16 @@ end
 -- @deprecated since 1.7.4
 function remote_methods:call_16(func_name, ...)
     check_remote_arg(self, 'call')
-    return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
+    return (self:_request(M_CALL_16, nil, nil, self._stream_id,
+                          tostring(func_name), {...}))
 end
 
 function remote_methods:call(func_name, args, opts)
     check_remote_arg(self, 'call')
     check_call_args(args)
     args = args or {}
-    local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
+    local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
+                              tostring(func_name), args)
     if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
@@ -746,14 +884,15 @@ end
 -- @deprecated since 1.7.4
 function remote_methods:eval_16(code, ...)
     check_remote_arg(self, 'eval')
-    return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
+    return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
+                                 code, {...})))
 end
 
 function remote_methods:eval(code, args, opts)
     check_remote_arg(self, 'eval')
     check_eval_args(args)
     args = args or {}
-    local res = self:_request(M_EVAL, opts, nil, code, args)
+    local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
     if type(res) ~= 'table' or opts and opts.is_async then
         return res
     end
@@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "execute", "options")
     end
-    return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
-                         sql_opts or {})
+    return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
+                         query, parameters or {}, sql_opts or {})
 end
 
 function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
@@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "prepare", "options")
     end
-    return self:_request(M_PREPARE, netbox_opts, nil, query)
+    return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
 end
 
 function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
@@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
     if sql_opts ~= nil then
         box.error(box.error.UNSUPPORTED, "unprepare", "options")
     end
-    return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
-                         sql_opts or {})
+    return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
+                         query, parameters or {}, sql_opts or {})
 end
 
 function remote_methods:wait_state(state, timeout)
@@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
+        res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
                       {line})
     else
         assert(self.protocol == 'Lua console')
-        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
+        res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
                       line..'$EOF$\n')
     end
     if err then
@@ -951,14 +1090,14 @@ space_metatable = function(remote)
 
     function methods:insert(tuple, opts)
         check_space_arg(self, 'insert')
-        return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
-                               tuple)
+        return remote:_request(M_INSERT, opts, self._format_cdata,
+                               self._stream_id, self.id, tuple)
     end
 
     function methods:replace(tuple, opts)
         check_space_arg(self, 'replace')
-        return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
-                               tuple)
+        return remote:_request(M_REPLACE, opts, self._format_cdata,
+                               self._stream_id, self.id, tuple)
     end
 
     function methods:select(key, opts)
@@ -978,7 +1117,8 @@ space_metatable = function(remote)
 
     function methods:upsert(key, oplist, opts)
         check_space_arg(self, 'upsert')
-        return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
+        return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
+                                               self._stream_id, self.id,
                                                key, oplist))
     end
 
@@ -1009,8 +1149,8 @@ index_metatable = function(remote)
         local offset = tonumber(opts and opts.offset) or 0
         local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
         return (remote:_request(M_SELECT, opts, self.space._format_cdata,
-                                self.space.id, self.id, iterator, offset,
-                                limit, key))
+                                self._stream_id, self.space.id, self.id,
+                                iterator, offset, limit, key))
     end
 
     function methods:get(key, opts)
@@ -1020,6 +1160,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_GET, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.EQ, 0, 2, key))
     end
@@ -1031,6 +1172,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_MIN, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.GE, 0, 1, key))
     end
@@ -1042,6 +1184,7 @@ index_metatable = function(remote)
         end
         return nothing_or_data(remote:_request(M_MAX, opts,
                                                self.space._format_cdata,
+                                               self._stream_id,
                                                self.space.id, self.id,
                                                box.index.LE, 0, 1, key))
     end
@@ -1053,22 +1196,24 @@ index_metatable = function(remote)
         end
         local code = string.format('box.space.%s.index.%s:count',
                                    self.space.name, self.name)
-        return remote:_request(M_COUNT, opts, nil, code, { key, opts })
+        return remote:_request(M_COUNT, opts, nil, self._stream_id,
+                               code, { key, opts })
     end
 
     function methods:delete(key, opts)
         check_index_arg(self, 'delete')
         return nothing_or_data(remote:_request(M_DELETE, opts,
                                                self.space._format_cdata,
-                                               self.space.id, self.id, key))
+                                               self._stream_id, self.space.id,
+                                               self.id, key))
     end
 
     function methods:update(key, oplist, opts)
         check_index_arg(self, 'update')
         return nothing_or_data(remote:_request(M_UPDATE, opts,
                                                self.space._format_cdata,
-                                               self.space.id, self.id, key,
-                                               oplist))
+                                               self._stream_id, self.space.id,
+                                               self.id, key, oplist))
     end
 
     return { __index = methods, __metatable = false }
diff --git a/test/box/access.result b/test/box/access.result
index 712cd68f8..6434da907 100644
--- a/test/box/access.result
+++ b/test/box/access.result
@@ -908,15 +908,15 @@ LISTEN = require('uri').parse(box.cfg.listen)
 c = net.connect(LISTEN.host, LISTEN.service)
 ---
 ...
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '1' does not exist
 ...
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '65537' does not exist
 ...
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 ---
 - error: Space '4294967295' does not exist
 ...
diff --git a/test/box/access.test.lua b/test/box/access.test.lua
index 6060475d1..6abdb780d 100644
--- a/test/box/access.test.lua
+++ b/test/box/access.test.lua
@@ -351,9 +351,9 @@ box.schema.func.drop(name)
 -- very large space id, no crash occurs.
 LISTEN = require('uri').parse(box.cfg.listen)
 c = net.connect(LISTEN.host, LISTEN.service)
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
 c:close()
 
 session = box.session
diff --git a/test/box/net.box_console_connections_gh-2677.result b/test/box/net.box_console_connections_gh-2677.result
index f45aa0b56..7cea0a1da 100644
--- a/test/box/net.box_console_connections_gh-2677.result
+++ b/test/box/net.box_console_connections_gh-2677.result
@@ -74,7 +74,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
 ---
 ...
 while not c:is_connected() do fiber.sleep(0.01) end
diff --git a/test/box/net.box_console_connections_gh-2677.test.lua b/test/box/net.box_console_connections_gh-2677.test.lua
index 40d099e70..6c4e6ea4f 100644
--- a/test/box/net.box_console_connections_gh-2677.test.lua
+++ b/test/box/net.box_console_connections_gh-2677.test.lua
@@ -30,7 +30,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
 while not c:is_connected() do fiber.sleep(0.01) end
 c:ping()
 
diff --git a/test/box/net.box_incorrect_iterator_gh-841.result b/test/box/net.box_incorrect_iterator_gh-841.result
index fbd2a7700..cd2a86787 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.result
+++ b/test/box/net.box_incorrect_iterator_gh-841.result
@@ -16,13 +16,13 @@ test_run:cmd("setopt delimiter ';'")
 - true
 ...
 function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
-    local ret = cn:_request(remote._method.select, opts, nil, space_id,
+    local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
                             index_id, iterator, offset, limit, key)
     return ret
 end
 function x_fatal(cn)
     cn._transport.perform_request(nil, nil, false, remote._method.inject,
-                                  nil, nil, nil, '\x80')
+                                  nil, nil, nil, nil, '\x80')
 end
 test_run:cmd("setopt delimiter ''");
 ---
diff --git a/test/box/net.box_incorrect_iterator_gh-841.test.lua b/test/box/net.box_incorrect_iterator_gh-841.test.lua
index 1d24f9f56..9c42175ef 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.test.lua
+++ b/test/box/net.box_incorrect_iterator_gh-841.test.lua
@@ -5,13 +5,13 @@ test_run:cmd("push filter ".."'\\.lua.*:[0-9]+: ' to '.lua...\"]:<line>: '")
 
 test_run:cmd("setopt delimiter ';'")
 function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
-    local ret = cn:_request(remote._method.select, opts, nil, space_id,
+    local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
                             index_id, iterator, offset, limit, key)
     return ret
 end
 function x_fatal(cn)
     cn._transport.perform_request(nil, nil, false, remote._method.inject,
-                                  nil, nil, nil, '\x80')
+                                  nil, nil, nil, nil, '\x80')
 end
 test_run:cmd("setopt delimiter ''");
 
diff --git a/test/box/net.box_iproto_hangs_gh-3464.result b/test/box/net.box_iproto_hangs_gh-3464.result
index 3b5458c9a..cbf8181b3 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.result
+++ b/test/box/net.box_iproto_hangs_gh-3464.result
@@ -17,7 +17,7 @@ c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
 ---
 ...
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
 ---
 - null
 - Peer closed
diff --git a/test/box/net.box_iproto_hangs_gh-3464.test.lua b/test/box/net.box_iproto_hangs_gh-3464.test.lua
index a7c41ae76..51a9ddece 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.test.lua
+++ b/test/box/net.box_iproto_hangs_gh-3464.test.lua
@@ -8,6 +8,6 @@ net = require('net.box')
 --
 c = net:connect(box.cfg.listen)
 data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
 c:close()
 test_run:grep_log('default', 'too big packet size in the header') ~= nil
diff --git a/test/box/net.box_long-poll_input_gh-3400.result b/test/box/net.box_long-poll_input_gh-3400.result
index a16110ee6..a98eea655 100644
--- a/test/box/net.box_long-poll_input_gh-3400.result
+++ b/test/box/net.box_long-poll_input_gh-3400.result
@@ -24,10 +24,10 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, false,    \
-                 net._method.call_17, nil, nil, nil, 'long', {})   \
-c._transport.perform_request(nil, nil, false, net._method.inject,  \
-                             nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false,         \
+                 net._method.call_17, nil, nil, nil, nil, 'long', {})   \
+c._transport.perform_request(nil, nil, false, net._method.inject,       \
+                             nil, nil, nil, nil, '\x80')
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/net.box_long-poll_input_gh-3400.test.lua b/test/box/net.box_long-poll_input_gh-3400.test.lua
index 891b59224..a6f302ee0 100644
--- a/test/box/net.box_long-poll_input_gh-3400.test.lua
+++ b/test/box/net.box_long-poll_input_gh-3400.test.lua
@@ -14,9 +14,9 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, false,    \
-                 net._method.call_17, nil, nil, nil, 'long', {})   \
-c._transport.perform_request(nil, nil, false, net._method.inject,  \
-                             nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false,         \
+                 net._method.call_17, nil, nil, nil, nil, 'long', {})   \
+c._transport.perform_request(nil, nil, false, net._method.inject,       \
+                             nil, nil, nil, nil, '\x80')
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 c:close()
diff --git a/test/box/stream.lua b/test/box/stream.lua
new file mode 100644
index 000000000..db6a29a8a
--- /dev/null
+++ b/test/box/stream.lua
@@ -0,0 +1,13 @@
+#!/usr/bin/env tarantool
+
+require('console').listen(os.getenv('ADMIN'))
+
+local memtx_use_mvcc_engine = (arg[2] and arg[2] == 'true' and true or false)
+
+box.cfg({
+    listen = os.getenv('LISTEN'),
+    iproto_threads = tonumber(arg[1]),
+    memtx_use_mvcc_engine = memtx_use_mvcc_engine
+})
+
+box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe',  nil, {if_not_exists = true})
diff --git a/test/box/stream.result b/test/box/stream.result
new file mode 100644
index 000000000..03200ecf6
--- /dev/null
+++ b/test/box/stream.result
@@ -0,0 +1,485 @@
+-- test-run result file version 2
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+test_run:cmd("create server test with script='box/stream.lua'")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+ | ---
+ | ...
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+conn_1 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn_1:new_stream()
+ | ---
+ | ...
+conn_2 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_2 = conn_2:new_stream()
+ | ---
+ | ...
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+ | ---
+ | ...
+assert(not stream_1:ping())
+ | ---
+ | - true
+ | ...
+stream_2:close()
+ | ---
+ | ...
+assert(not conn_2:ping())
+ | ---
+ | - true
+ | ...
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version ~= stream._schema_version)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s:drop()
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+ | ---
+ | - true
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function replace_with_yeild(item)
+    fiber.sleep(0.1)
+    return s:replace({item})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+conn_space = conn.space.test
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+stream_space = stream.space.test
+ | ---
+ | ...
+
+-- Check that all requests in stream processed consistently
+futures = {}
+ | ---
+ | ...
+replace_count = 3
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+for i = 1, replace_count do
+    futures[string.format("replace_%d", i)] =
+        stream_space:replace({i}, {is_async = true})
+    futures[string.format("select_%d", i)] =
+        stream_space:select({}, {is_async = true})
+end;
+ | ---
+ | ...
+futures["replace_with_yeild_for_stream"] =
+    stream:call("replace_with_yeild",
+                { replace_count + 1 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_stream"] =
+    stream_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1]
+assert(results["select_1"])
+ | ---
+ | - - [1]
+ | ...
+-- [1] [2]
+assert(results["select_2"])
+ | ---
+ | - - [1]
+ |   - [2]
+ | ...
+-- [1] [2] [3]
+assert(results["select_3"])
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ | ...
+
+-- There is no request execution order for the connection
+futures = {}
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+futures["replace_with_yeild_for_connection"] =
+    conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_connection"] =
+    conn_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- [1] [2] [3] [4] [5]
+s:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+replace_count = 20
+for i = 1, replace_count do
+    space:replace({i}, {is_async = true})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+-- Give time to send
+fiber.sleep(0)
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ |   - [4]
+ |   - [5]
+ |   - [6]
+ |   - [7]
+ |   - [8]
+ |   - [9]
+ |   - [10]
+ |   - [11]
+ |   - [12]
+ |   - [13]
+ |   - [14]
+ |   - [15]
+ |   - [16]
+ |   - [17]
+ |   - [18]
+ |   - [19]
+ |   - [20]
+ | ...
+s:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("cleanup server test")
+ | ---
+ | - true
+ | ...
+test_run:cmd("delete server test")
+ | ---
+ | - true
+ | ...
diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua
new file mode 100644
index 000000000..72129a228
--- /dev/null
+++ b/test/box/stream.test.lua
@@ -0,0 +1,182 @@
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+fiber = require('fiber')
+test_run = require('test_run').new()
+
+test_run:cmd("create server test with script='box/stream.lua'")
+
+test_run:cmd("setopt delimiter ';'")
+function get_current_connection_count()
+    local total_net_stat_table =
+        test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+    assert(total_net_stat_table)
+    local connection_stat_table = total_net_stat_table.CONNECTIONS
+    assert(connection_stat_table)
+    return connection_stat_table.current
+end;
+function wait_and_return_results(futures)
+    local results = {}
+    for name, future in pairs(futures) do
+        local err
+        results[name], err = future:wait_result()
+        if err then
+            results[name] = err
+        end
+    end
+    return results
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn_1 = net_box.connect(server_addr)
+stream_1 = conn_1:new_stream()
+conn_2 = net_box.connect(server_addr)
+stream_2 = conn_2:new_stream()
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+assert(not stream_1:ping())
+stream_2:close()
+assert(not conn_2:ping())
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+conn:close()
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+test_run:switch("test")
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch("default")
+assert(not conn.space.test)
+assert(not stream.space.test)
+assert(conn.schema_version == stream._schema_version)
+conn:reload_schema()
+assert(conn.space.test ~= nil)
+assert(conn.schema_version ~= stream._schema_version)
+assert(stream.space.test ~= nil)
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+collectgarbage()
+collectgarbage()
+assert(conn.space.test ~= nil)
+assert(stream.space.test ~= nil)
+test_run:switch("test")
+s:drop()
+test_run:switch("default")
+conn:reload_schema()
+assert(not conn.space.test)
+assert(not stream.space.test)
+test_run:cmd("stop server test")
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+test_run:switch('test')
+fiber = require('fiber')
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:cmd("setopt delimiter ';'")
+function replace_with_yeild(item)
+    fiber.sleep(0.1)
+    return s:replace({item})
+end;
+test_run:cmd("setopt delimiter ''");
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+conn_space = conn.space.test
+stream = conn:new_stream()
+stream_space = stream.space.test
+
+-- Check that all requests in stream processed consistently
+futures = {}
+replace_count = 3
+test_run:cmd("setopt delimiter ';'")
+for i = 1, replace_count do
+    futures[string.format("replace_%d", i)] =
+        stream_space:replace({i}, {is_async = true})
+    futures[string.format("select_%d", i)] =
+        stream_space:select({}, {is_async = true})
+end;
+futures["replace_with_yeild_for_stream"] =
+    stream:call("replace_with_yeild",
+                { replace_count + 1 }, {is_async = true});
+futures["select_with_yeild_for_stream"] =
+    stream_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1]
+assert(results["select_1"])
+-- [1] [2]
+assert(results["select_2"])
+-- [1] [2] [3]
+assert(results["select_3"])
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+
+-- There is no request execution order for the connection
+futures = {}
+test_run:cmd("setopt delimiter ';'")
+futures["replace_with_yeild_for_connection"] =
+    conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+futures["select_with_yeild_for_connection"] =
+    conn_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+test_run:switch("test")
+-- [1] [2] [3] [4] [5]
+s:select()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+space = stream.space.test
+test_run:cmd("setopt delimiter ';'")
+replace_count = 20
+for i = 1, replace_count do
+    space:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+-- Give time to send
+fiber.sleep(0)
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:switch("test")
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:cmd("stop server test")
+
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/suite.ini b/test/box/suite.ini
index b5d869fb3..94cf7811f 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
 disabled = rtree_errinj.test.lua tuple_bench.test.lua
 long_run = huge_field_map_long.test.lua
 config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua stream.test.lua
 lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
 use_unix_sockets = True
 use_unix_sockets_iproto = True
-- 
2.20.1



More information about the Tarantool-patches mailing list