[Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box
mechanik20051988
mechanik20051988 at tarantool.org
Wed Aug 11 11:56:56 MSK 2021
From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
Add stream support to `net.box`. In `net.box`, a stream
is an object over a connection that has the same methods,
but all requests from it are sent with a non-zero stream ID.
Since there can be a lot of streams, we do not copy the
spaces from the connection to the stream immediately when
creating a stream, but do it only when a space is first accessed.
Also, when the schema is updated, we update the spaces lazily:
each stream has its own schema_version; when a stream space is
accessed, we compare the stream schema_version with the
connection schema_version and, if they differ, clear the
stream space cache and wrap the space being accessed into the
stream cache.
Part of #5860
@TarantoolBot document
Title: stream support was added to net.box
In "net.box", a stream is an object over a connection that
has the same methods, but all requests from it are sent
with a non-zero stream ID. The stream ID is generated on the
client automatically. A simple example of stream creation
using net.box:
```lua
stream = conn:new_stream()
-- all connection methods are valid, but requests are sent
-- with a non-zero stream_id.
```
---
src/box/lua/net_box.c | 95 ++--
src/box/lua/net_box.lua | 205 ++++++--
test/box/access.result | 6 +-
test/box/access.test.lua | 6 +-
...net.box_console_connections_gh-2677.result | 2 +-
...t.box_console_connections_gh-2677.test.lua | 2 +-
.../net.box_incorrect_iterator_gh-841.result | 4 +-
...net.box_incorrect_iterator_gh-841.test.lua | 4 +-
test/box/net.box_iproto_hangs_gh-3464.result | 2 +-
.../box/net.box_iproto_hangs_gh-3464.test.lua | 2 +-
.../net.box_long-poll_input_gh-3400.result | 8 +-
.../net.box_long-poll_input_gh-3400.test.lua | 8 +-
test/box/stream.lua | 13 +
test/box/stream.result | 485 ++++++++++++++++++
test/box/stream.test.lua | 182 +++++++
test/box/suite.ini | 2 +-
16 files changed, 934 insertions(+), 92 deletions(-)
create mode 100644 test/box/stream.lua
create mode 100644 test/box/stream.result
create mode 100644 test/box/stream.test.lua
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 06e574cdf..3bc49af23 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -328,7 +328,7 @@ netbox_registry_reset(struct netbox_registry *registry, struct error *error)
static inline size_t
netbox_begin_encode(struct mpstream *stream, uint64_t sync,
- enum iproto_type type)
+ enum iproto_type type, uint64_t stream_id)
{
/* Remember initial size of ibuf (see netbox_end_encode()) */
struct ibuf *ibuf = stream->ctx;
@@ -340,7 +340,7 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync,
mpstream_advance(stream, fixheader_size);
/* encode header */
- mpstream_encode_map(stream, 2);
+ mpstream_encode_map(stream, stream_id != 0 ? 3 : 2);
mpstream_encode_uint(stream, IPROTO_SYNC);
mpstream_encode_uint(stream, sync);
@@ -348,6 +348,10 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync,
mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE);
mpstream_encode_uint(stream, type);
+ if (stream_id != 0) {
+ mpstream_encode_uint(stream, IPROTO_STREAM_ID);
+ mpstream_encode_uint(stream, stream_id);
+ }
/* Caller should remember how many bytes was used in ibuf */
return used;
}
@@ -380,11 +384,11 @@ netbox_end_encode(struct mpstream *stream, size_t initial_size)
static void
netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
(void)L;
(void)idx;
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING, stream_id);
netbox_end_encode(stream, svp);
}
@@ -402,7 +406,7 @@ netbox_encode_auth(struct ibuf *ibuf, uint64_t sync,
struct mpstream stream;
mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
mpstream_error_handler, &is_error);
- size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH);
+ size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH, 0);
/* Adapted from xrow_encode_auth() */
mpstream_encode_map(&stream, password != NULL ? 2 : 1);
@@ -432,7 +436,7 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
struct mpstream stream;
mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
mpstream_error_handler, &is_error);
- size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT);
+ size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT, 0);
mpstream_encode_map(&stream, 3);
mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
mpstream_encode_uint(&stream, space_id);
@@ -446,10 +450,10 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
static void
netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync, enum iproto_type type)
+ uint64_t sync, enum iproto_type type, uint64_t stream_id)
{
/* Lua stack at idx: function_name, args */
- size_t svp = netbox_begin_encode(stream, sync, type);
+ size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
mpstream_encode_map(stream, 2);
@@ -468,24 +472,25 @@ netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_call_16(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
- netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL_16);
+ netbox_encode_call_impl(L, idx, stream, sync,
+ IPROTO_CALL_16, stream_id);
}
static void
netbox_encode_call(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
- netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL);
+ netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL, stream_id);
}
static void
netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: expr, args */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL, stream_id);
mpstream_encode_map(stream, 2);
@@ -504,10 +509,11 @@ netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: space_id, index_id, iterator, offset, limit, key */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT,
+ stream_id);
mpstream_encode_map(stream, 6);
@@ -546,10 +552,11 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync, enum iproto_type type)
+ uint64_t sync, enum iproto_type type,
+ uint64_t stream_id)
{
/* Lua stack at idx: space_id, tuple */
- size_t svp = netbox_begin_encode(stream, sync, type);
+ size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
mpstream_encode_map(stream, 2);
@@ -567,24 +574,27 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_insert(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
- netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_INSERT);
+ netbox_encode_insert_or_replace(L, idx, stream, sync,
+ IPROTO_INSERT, stream_id);
}
static void
netbox_encode_replace(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
- netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_REPLACE);
+ netbox_encode_insert_or_replace(L, idx, stream, sync,
+ IPROTO_REPLACE, stream_id);
}
static void
netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: space_id, index_id, key */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE,
+ stream_id);
mpstream_encode_map(stream, 3);
@@ -607,10 +617,11 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: space_id, index_id, key, ops */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE,
+ stream_id);
mpstream_encode_map(stream, 5);
@@ -641,10 +652,11 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: space_id, tuple, ops */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT,
+ stream_id);
mpstream_encode_map(stream, 4);
@@ -844,10 +856,11 @@ netbox_send_and_recv_console(int fd, struct ibuf *send_buf,
static void
netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: query, parameters, options */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE,
+ stream_id);
mpstream_encode_map(stream, 3);
@@ -873,10 +886,11 @@ netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: query */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE,
+ stream_id);
mpstream_encode_map(stream, 1);
@@ -896,18 +910,19 @@ netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: query, parameters, options */
- netbox_encode_prepare(L, idx, stream, sync);
+ netbox_encode_prepare(L, idx, stream, sync, stream_id);
}
static void
netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: bytes */
(void)sync;
+ (void)stream_id;
size_t len;
const char *data = lua_tolstring(L, idx, &len);
mpstream_memcpy(stream, data, len);
@@ -921,11 +936,11 @@ netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
*/
static int
netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
- struct ibuf *ibuf, uint64_t sync)
+ struct ibuf *ibuf, uint64_t sync, uint64_t stream_id)
{
typedef void (*method_encoder_f)(struct lua_State *L, int idx,
struct mpstream *stream,
- uint64_t sync);
+ uint64_t sync, uint64_t stream_id);
static method_encoder_f method_encoder[] = {
[NETBOX_PING] = netbox_encode_ping,
[NETBOX_CALL_16] = netbox_encode_call_16,
@@ -949,7 +964,7 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
struct mpstream stream;
mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
luamp_error, L);
- method_encoder[method](L, idx, &stream, sync);
+ method_encoder[method](L, idx, &stream, sync, stream_id);
return 0;
}
@@ -1569,6 +1584,7 @@ netbox_new_registry(struct lua_State *L)
* - on_push: on_push trigger function
* - on_push_ctx: on_push trigger function argument
* - format: tuple format to use for decoding the body or nil
+ * - stream_id: determines whether or not the request belongs to stream
* - ...: method-specific arguments passed to the encoder
*/
static void
@@ -1581,7 +1597,8 @@ netbox_make_request(struct lua_State *L, int idx,
enum netbox_method method = lua_tointeger(L, idx + 4);
assert(method < netbox_method_MAX);
uint64_t sync = registry->next_sync++;
- netbox_encode_method(L, idx + 8, method, send_buf, sync);
+ uint64_t stream_id = luaL_touint64(L, idx + 8);
+ netbox_encode_method(L, idx + 9, method, send_buf, sync, stream_id);
/* Initialize and register the request object. */
request->method = method;
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 8f5671c15..3dffc245f 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -275,14 +275,14 @@ local function create_transport(host, port, user, password, callback,
-- @retval not nil Future object.
--
local function perform_async_request(buffer, skip_header, method, on_push,
- on_push_ctx, format, ...)
+ on_push_ctx, format, stream_id, ...)
local err = prepare_perform_request()
if err then
return nil, err
end
return perform_async_request_impl(requests, send_buf, buffer,
skip_header, method, on_push,
- on_push_ctx, format, ...)
+ on_push_ctx, format, stream_id, ...)
end
--
@@ -291,14 +291,15 @@ local function create_transport(host, port, user, password, callback,
-- @retval not nil Response object.
--
local function perform_request(timeout, buffer, skip_header, method,
- on_push, on_push_ctx, format, ...)
+ on_push, on_push_ctx, format,
+ stream_id, ...)
local err = prepare_perform_request()
if err then
return nil, err
end
return perform_request_impl(timeout, requests, send_buf, buffer,
skip_header, method, on_push, on_push_ctx,
- format, ...)
+ format, stream_id, ...)
end
-- PROTOCOL STATE MACHINE (WORKER FIBER) --
@@ -487,6 +488,37 @@ local function remote_serialize(self)
}
end
+local function stream_serialize(self)
+ return {
+ host = self._conn.host,
+ port = self._conn.port,
+ opts = next(self._conn.opts) and self._conn.opts,
+ state = self._conn.state,
+ error = self._conn.error,
+ protocol = self._conn.protocol,
+ schema_version = self._conn.schema_version,
+ peer_uuid = self._conn.peer_uuid,
+ peer_version_id = self._conn.peer_version_id,
+ stream_id = self._stream_id
+ }
+end
+
+local function stream_spaces_serialize(self)
+ return self._stream._conn.space
+end
+
+local function stream_space_serialize(self)
+ return self._src
+end
+
+local function stream_indexes_serialize(self)
+ return self._space._src.index
+end
+
+local function stream_index_serialize(self)
+ return self._src
+end
+
local remote_methods = {}
local remote_mt = {
__index = remote_methods, __serialize = remote_serialize,
@@ -499,6 +531,86 @@ local console_mt = {
__metatable = false
}
+-- Create a stream space index, which is the same as the connection
+-- space index, but has a non-zero stream ID.
+local function stream_wrap_index(stream_id, src)
+ return setmetatable({
+ _stream_id = stream_id,
+ _src = src,
+ }, {
+ __index = src,
+ __serialize = stream_index_serialize
+ })
+end
+
+-- Metatable for stream space indexes. When a stream space is
+-- created, there are no indexes in it. When a space index is
+-- accessed, we look for the corresponding index in the connection
+-- space. If it is found, we create the same index for the stream
+-- space, but with the corresponding stream ID. We do not need to
+-- compare stream _schema_version and connection schema_version,
+-- because all access to an index is carried out through its space,
+-- so schema_version is updated when the space is accessed.
+local stream_indexes_mt = {
+ __index = function(self, key)
+ local _space = self._space
+ local src = _space._src.index[key]
+ if not src then
+ return nil
+ end
+ local res = stream_wrap_index(_space._stream_id, src)
+ self[key] = res
+ return res
+ end,
+ __serialize = stream_indexes_serialize
+}
+
+-- Create a stream space, which is the same as the connection space,
+-- but has a non-zero stream ID.
+local function stream_wrap_space(stream, src)
+ local res = setmetatable({
+ _stream_id = stream._stream_id,
+ _src = src,
+ index = setmetatable({
+ _space = nil,
+ }, stream_indexes_mt)
+ }, {
+ __index = src,
+ __serialize = stream_space_serialize
+ })
+ res.index._space = res
+ return res
+end
+
+-- Metatable for stream spaces. When a stream is created, there
+-- are no spaces in it. When the user tries to access a space in
+-- the stream, we first compare the stream's _schema_version with
+-- the connection's schema_version and, if they are not equal, we
+-- clear the stream space cache and update its schema_version. Then
+-- we look for the corresponding space in the connection. If it is
+-- found, we create the same space for the stream, but with the
+-- corresponding stream ID.
+local stream_spaces_mt = {
+ __index = function(self, key)
+ local stream = self._stream
+ if stream._schema_version ~= stream._conn.schema_version then
+ stream._schema_version = stream._conn.schema_version
+ self._stream_space_cache = {}
+ end
+ if self._stream_space_cache[key] then
+ return self._stream_space_cache[key]
+ end
+ local src = stream._conn.space[key]
+ if not src then
+ return nil
+ end
+ local res = stream_wrap_space(stream, src)
+ self._stream_space_cache[key] = res
+ return res
+ end,
+ __serialize = stream_spaces_serialize
+}
+
local space_metatable, index_metatable
local function new_sm(host, port, opts, connection, greeting)
@@ -578,6 +690,8 @@ local function new_sm(host, port, opts, connection, greeting)
if opts.wait_connected ~= false then
remote._transport.wait_state('active', tonumber(opts.wait_connected))
end
+ -- Last stream ID used for this connection
+ remote._last_stream_id = 0
return remote
end
@@ -635,6 +749,28 @@ local function check_eval_args(args)
end
end
+local function new_stream(stream)
+ check_remote_arg(stream, 'new_stream')
+ return stream._conn:new_stream()
+end
+
+function remote_methods:new_stream()
+ check_remote_arg(self, 'new_stream')
+ self._last_stream_id = self._last_stream_id + 1
+ local stream = setmetatable({
+ new_stream = new_stream,
+ _stream_id = self._last_stream_id,
+ space = setmetatable({
+ _stream_space_cache = {},
+ _stream = nil,
+ }, stream_spaces_mt),
+ _conn = self,
+ _schema_version = self.schema_version,
+ }, { __index = self, __serialize = stream_serialize })
+ stream.space._stream = stream
+ return stream
+end
+
function remote_methods:close()
check_remote_arg(self, 'close')
self._transport.stop()
@@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout)
return self._transport.wait_state('active', timeout)
end
-function remote_methods:_request(method, opts, format, ...)
+function remote_methods:_request(method, opts, format, stream_id, ...)
local transport = self._transport
local on_push, on_push_ctx, buffer, skip_header, deadline
-- Extract options, set defaults, check if the request is
@@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...)
local res, err =
transport.perform_async_request(buffer, skip_header, method,
table.insert, {}, format,
- ...)
+ stream_id, ...)
if err then
box.error(err)
end
@@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...)
end
local res, err = transport.perform_request(timeout, buffer, skip_header,
method, on_push, on_push_ctx,
- format, ...)
+ format, stream_id, ...)
if err then
box.error(err)
end
@@ -718,7 +854,7 @@ end
function remote_methods:ping(opts)
check_remote_arg(self, 'ping')
- return (pcall(self._request, self, M_PING, opts))
+ return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
end
function remote_methods:reload_schema()
@@ -729,14 +865,16 @@ end
-- @deprecated since 1.7.4
function remote_methods:call_16(func_name, ...)
check_remote_arg(self, 'call')
- return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
+ return (self:_request(M_CALL_16, nil, nil, self._stream_id,
+ tostring(func_name), {...}))
end
function remote_methods:call(func_name, args, opts)
check_remote_arg(self, 'call')
check_call_args(args)
args = args or {}
- local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
+ local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
+ tostring(func_name), args)
if type(res) ~= 'table' or opts and opts.is_async then
return res
end
@@ -746,14 +884,15 @@ end
-- @deprecated since 1.7.4
function remote_methods:eval_16(code, ...)
check_remote_arg(self, 'eval')
- return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
+ return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
+ code, {...})))
end
function remote_methods:eval(code, args, opts)
check_remote_arg(self, 'eval')
check_eval_args(args)
args = args or {}
- local res = self:_request(M_EVAL, opts, nil, code, args)
+ local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
if type(res) ~= 'table' or opts and opts.is_async then
return res
end
@@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
if sql_opts ~= nil then
box.error(box.error.UNSUPPORTED, "execute", "options")
end
- return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
- sql_opts or {})
+ return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
+ query, parameters or {}, sql_opts or {})
end
function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
@@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
if sql_opts ~= nil then
box.error(box.error.UNSUPPORTED, "prepare", "options")
end
- return self:_request(M_PREPARE, netbox_opts, nil, query)
+ return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
end
function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
@@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
if sql_opts ~= nil then
box.error(box.error.UNSUPPORTED, "unprepare", "options")
end
- return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
- sql_opts or {})
+ return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
+ query, parameters or {}, sql_opts or {})
end
function remote_methods:wait_state(state, timeout)
@@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
+ res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
{line})
else
assert(self.protocol == 'Lua console')
- res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
+ res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
line..'$EOF$\n')
end
if err then
@@ -951,14 +1090,14 @@ space_metatable = function(remote)
function methods:insert(tuple, opts)
check_space_arg(self, 'insert')
- return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
- tuple)
+ return remote:_request(M_INSERT, opts, self._format_cdata,
+ self._stream_id, self.id, tuple)
end
function methods:replace(tuple, opts)
check_space_arg(self, 'replace')
- return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
- tuple)
+ return remote:_request(M_REPLACE, opts, self._format_cdata,
+ self._stream_id, self.id, tuple)
end
function methods:select(key, opts)
@@ -978,7 +1117,8 @@ space_metatable = function(remote)
function methods:upsert(key, oplist, opts)
check_space_arg(self, 'upsert')
- return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
+ return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
+ self._stream_id, self.id,
key, oplist))
end
@@ -1009,8 +1149,8 @@ index_metatable = function(remote)
local offset = tonumber(opts and opts.offset) or 0
local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
return (remote:_request(M_SELECT, opts, self.space._format_cdata,
- self.space.id, self.id, iterator, offset,
- limit, key))
+ self._stream_id, self.space.id, self.id,
+ iterator, offset, limit, key))
end
function methods:get(key, opts)
@@ -1020,6 +1160,7 @@ index_metatable = function(remote)
end
return nothing_or_data(remote:_request(M_GET, opts,
self.space._format_cdata,
+ self._stream_id,
self.space.id, self.id,
box.index.EQ, 0, 2, key))
end
@@ -1031,6 +1172,7 @@ index_metatable = function(remote)
end
return nothing_or_data(remote:_request(M_MIN, opts,
self.space._format_cdata,
+ self._stream_id,
self.space.id, self.id,
box.index.GE, 0, 1, key))
end
@@ -1042,6 +1184,7 @@ index_metatable = function(remote)
end
return nothing_or_data(remote:_request(M_MAX, opts,
self.space._format_cdata,
+ self._stream_id,
self.space.id, self.id,
box.index.LE, 0, 1, key))
end
@@ -1053,22 +1196,24 @@ index_metatable = function(remote)
end
local code = string.format('box.space.%s.index.%s:count',
self.space.name, self.name)
- return remote:_request(M_COUNT, opts, nil, code, { key, opts })
+ return remote:_request(M_COUNT, opts, nil, self._stream_id,
+ code, { key, opts })
end
function methods:delete(key, opts)
check_index_arg(self, 'delete')
return nothing_or_data(remote:_request(M_DELETE, opts,
self.space._format_cdata,
- self.space.id, self.id, key))
+ self._stream_id, self.space.id,
+ self.id, key))
end
function methods:update(key, oplist, opts)
check_index_arg(self, 'update')
return nothing_or_data(remote:_request(M_UPDATE, opts,
self.space._format_cdata,
- self.space.id, self.id, key,
- oplist))
+ self._stream_id, self.space.id,
+ self.id, key, oplist))
end
return { __index = methods, __metatable = false }
diff --git a/test/box/access.result b/test/box/access.result
index 712cd68f8..6434da907 100644
--- a/test/box/access.result
+++ b/test/box/access.result
@@ -908,15 +908,15 @@ LISTEN = require('uri').parse(box.cfg.listen)
c = net.connect(LISTEN.host, LISTEN.service)
---
...
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
---
- error: Space '1' does not exist
...
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
---
- error: Space '65537' does not exist
...
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
---
- error: Space '4294967295' does not exist
...
diff --git a/test/box/access.test.lua b/test/box/access.test.lua
index 6060475d1..6abdb780d 100644
--- a/test/box/access.test.lua
+++ b/test/box/access.test.lua
@@ -351,9 +351,9 @@ box.schema.func.drop(name)
-- very large space id, no crash occurs.
LISTEN = require('uri').parse(box.cfg.listen)
c = net.connect(LISTEN.host, LISTEN.service)
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
c:close()
session = box.session
diff --git a/test/box/net.box_console_connections_gh-2677.result b/test/box/net.box_console_connections_gh-2677.result
index f45aa0b56..7cea0a1da 100644
--- a/test/box/net.box_console_connections_gh-2677.result
+++ b/test/box/net.box_console_connections_gh-2677.result
@@ -74,7 +74,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
diff --git a/test/box/net.box_console_connections_gh-2677.test.lua b/test/box/net.box_console_connections_gh-2677.test.lua
index 40d099e70..6c4e6ea4f 100644
--- a/test/box/net.box_console_connections_gh-2677.test.lua
+++ b/test/box/net.box_console_connections_gh-2677.test.lua
@@ -30,7 +30,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
diff --git a/test/box/net.box_incorrect_iterator_gh-841.result b/test/box/net.box_incorrect_iterator_gh-841.result
index fbd2a7700..cd2a86787 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.result
+++ b/test/box/net.box_incorrect_iterator_gh-841.result
@@ -16,13 +16,13 @@ test_run:cmd("setopt delimiter ';'")
- true
...
function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
- local ret = cn:_request(remote._method.select, opts, nil, space_id,
+ local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
index_id, iterator, offset, limit, key)
return ret
end
function x_fatal(cn)
cn._transport.perform_request(nil, nil, false, remote._method.inject,
- nil, nil, nil, '\x80')
+ nil, nil, nil, nil, '\x80')
end
test_run:cmd("setopt delimiter ''");
---
diff --git a/test/box/net.box_incorrect_iterator_gh-841.test.lua b/test/box/net.box_incorrect_iterator_gh-841.test.lua
index 1d24f9f56..9c42175ef 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.test.lua
+++ b/test/box/net.box_incorrect_iterator_gh-841.test.lua
@@ -5,13 +5,13 @@ test_run:cmd("push filter ".."'\\.lua.*:[0-9]+: ' to '.lua...\"]:<line>: '")
test_run:cmd("setopt delimiter ';'")
function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
- local ret = cn:_request(remote._method.select, opts, nil, space_id,
+ local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
index_id, iterator, offset, limit, key)
return ret
end
function x_fatal(cn)
cn._transport.perform_request(nil, nil, false, remote._method.inject,
- nil, nil, nil, '\x80')
+ nil, nil, nil, nil, '\x80')
end
test_run:cmd("setopt delimiter ''");
diff --git a/test/box/net.box_iproto_hangs_gh-3464.result b/test/box/net.box_iproto_hangs_gh-3464.result
index 3b5458c9a..cbf8181b3 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.result
+++ b/test/box/net.box_iproto_hangs_gh-3464.result
@@ -17,7 +17,7 @@ c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
---
...
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
---
- null
- Peer closed
diff --git a/test/box/net.box_iproto_hangs_gh-3464.test.lua b/test/box/net.box_iproto_hangs_gh-3464.test.lua
index a7c41ae76..51a9ddece 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.test.lua
+++ b/test/box/net.box_iproto_hangs_gh-3464.test.lua
@@ -8,6 +8,6 @@ net = require('net.box')
--
c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
c:close()
test_run:grep_log('default', 'too big packet size in the header') ~= nil
diff --git a/test/box/net.box_long-poll_input_gh-3400.result b/test/box/net.box_long-poll_input_gh-3400.result
index a16110ee6..a98eea655 100644
--- a/test/box/net.box_long-poll_input_gh-3400.result
+++ b/test/box/net.box_long-poll_input_gh-3400.result
@@ -24,10 +24,10 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, false, \
- net._method.call_17, nil, nil, nil, 'long', {}) \
-c._transport.perform_request(nil, nil, false, net._method.inject, \
- nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, \
+ net._method.call_17, nil, nil, nil, nil, 'long', {}) \
+c._transport.perform_request(nil, nil, false, net._method.inject, \
+ nil, nil, nil, nil, '\x80')
---
...
while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/net.box_long-poll_input_gh-3400.test.lua b/test/box/net.box_long-poll_input_gh-3400.test.lua
index 891b59224..a6f302ee0 100644
--- a/test/box/net.box_long-poll_input_gh-3400.test.lua
+++ b/test/box/net.box_long-poll_input_gh-3400.test.lua
@@ -14,9 +14,9 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, false, \
- net._method.call_17, nil, nil, nil, 'long', {}) \
-c._transport.perform_request(nil, nil, false, net._method.inject, \
- nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, \
+ net._method.call_17, nil, nil, nil, nil, 'long', {}) \
+c._transport.perform_request(nil, nil, false, net._method.inject, \
+ nil, nil, nil, nil, '\x80')
while f:status() ~= 'dead' do fiber.sleep(0.01) end
c:close()
diff --git a/test/box/stream.lua b/test/box/stream.lua
new file mode 100644
index 000000000..db6a29a8a
--- /dev/null
+++ b/test/box/stream.lua
@@ -0,0 +1,13 @@
+#!/usr/bin/env tarantool
+-- arg[1]: iproto thread count; arg[2]: 'true' enables the memtx MVCC engine.
+require('console').listen(os.getenv('ADMIN'))
+
+local use_mvcc = arg[2] == 'true'
+
+box.cfg({
+ listen = os.getenv('LISTEN'),
+ iproto_threads = tonumber(arg[1]),
+ memtx_use_mvcc_engine = use_mvcc
+})
+-- Give guest read/write/execute/create/drop on everything (idempotent grant).
+box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe', nil, {if_not_exists = true})
diff --git a/test/box/stream.result b/test/box/stream.result
new file mode 100644
index 000000000..03200ecf6
--- /dev/null
+++ b/test/box/stream.result
@@ -0,0 +1,485 @@
+-- test-run result file version 2
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+test_run:cmd("create server test with script='box/stream.lua'")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function get_current_connection_count()
+ local total_net_stat_table =
+ test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+ assert(total_net_stat_table)
+ local connection_stat_table = total_net_stat_table.CONNECTIONS
+ assert(connection_stat_table)
+ return connection_stat_table.current
+end;
+ | ---
+ | ...
+function wait_and_return_results(futures)
+ local results = {}
+ for name, future in pairs(futures) do
+ local err
+ results[name], err = future:wait_result()
+ if err then
+ results[name] = err
+ end
+ end
+ return results
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+conn_1 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn_1:new_stream()
+ | ---
+ | ...
+conn_2 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_2 = conn_2:new_stream()
+ | ---
+ | ...
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+ | ---
+ | ...
+assert(not stream_1:ping())
+ | ---
+ | - true
+ | ...
+stream_2:close()
+ | ---
+ | ...
+assert(not conn_2:ping())
+ | ---
+ | - true
+ | ...
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version ~= stream._schema_version)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s:drop()
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+ | ---
+ | - true
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function replace_with_yeild(item)
+ fiber.sleep(0.1)
+ return s:replace({item})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+conn_space = conn.space.test
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+stream_space = stream.space.test
+ | ---
+ | ...
+
+-- Check that all requests in stream processed consistently
+futures = {}
+ | ---
+ | ...
+replace_count = 3
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+for i = 1, replace_count do
+ futures[string.format("replace_%d", i)] =
+ stream_space:replace({i}, {is_async = true})
+ futures[string.format("select_%d", i)] =
+ stream_space:select({}, {is_async = true})
+end;
+ | ---
+ | ...
+futures["replace_with_yeild_for_stream"] =
+ stream:call("replace_with_yeild",
+ { replace_count + 1 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_stream"] =
+ stream_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1]
+assert(results["select_1"])
+ | ---
+ | - - [1]
+ | ...
+-- [1] [2]
+assert(results["select_2"])
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+-- [1] [2] [3]
+assert(results["select_3"])
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | ...
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | ...
+
+-- There is no request execution order for the connection
+futures = {}
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+futures["replace_with_yeild_for_connection"] =
+ conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_connection"] =
+ conn_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- [1] [2] [3] [4] [5]
+s:select()
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+replace_count = 20
+for i = 1, replace_count do
+ space:replace({i}, {is_async = true})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+-- Give time to send
+fiber.sleep(0)
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | - [6]
+ | - [7]
+ | - [8]
+ | - [9]
+ | - [10]
+ | - [11]
+ | - [12]
+ | - [13]
+ | - [14]
+ | - [15]
+ | - [16]
+ | - [17]
+ | - [18]
+ | - [19]
+ | - [20]
+ | ...
+s:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("cleanup server test")
+ | ---
+ | - true
+ | ...
+test_run:cmd("delete server test")
+ | ---
+ | - true
+ | ...
diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua
new file mode 100644
index 000000000..72129a228
--- /dev/null
+++ b/test/box/stream.test.lua
@@ -0,0 +1,182 @@
+-- This test checks streams iplementation in iproto (gh-5860).
+net_box = require('net.box')
+fiber = require('fiber')
+test_run = require('test_run').new()
+
+test_run:cmd("create server test with script='box/stream.lua'")
+
+test_run:cmd("setopt delimiter ';'")
+function get_current_connection_count()
+ local total_net_stat_table =
+ test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+ assert(total_net_stat_table)
+ local connection_stat_table = total_net_stat_table.CONNECTIONS
+ assert(connection_stat_table)
+ return connection_stat_table.current
+end;
+function wait_and_return_results(futures)
+ local results = {}
+ for name, future in pairs(futures) do
+ local err
+ results[name], err = future:wait_result()
+ if err then
+ results[name] = err
+ end
+ end
+ return results
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn_1 = net_box.connect(server_addr)
+stream_1 = conn_1:new_stream()
+conn_2 = net_box.connect(server_addr)
+stream_2 = conn_2:new_stream()
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+assert(not stream_1:ping())
+stream_2:close()
+assert(not conn_2:ping())
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+conn:close()
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+test_run:switch("test")
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch("default")
+assert(not conn.space.test)
+assert(not stream.space.test)
+assert(conn.schema_version == stream._schema_version)
+conn:reload_schema()
+assert(conn.space.test ~= nil)
+assert(conn.schema_version ~= stream._schema_version)
+assert(stream.space.test ~= nil)
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version if they are not equal, we clear stream
+-- space cache, update it's _schema_version and load space from
+-- connection to stream space cache.
+assert(conn.schema_version == stream._schema_version)
+collectgarbage()
+collectgarbage()
+assert(conn.space.test ~= nil)
+assert(stream.space.test ~= nil)
+test_run:switch("test")
+s:drop()
+test_run:switch("default")
+conn:reload_schema()
+assert(not conn.space.test)
+assert(not stream.space.test)
+test_run:cmd("stop server test")
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+test_run:switch('test')
+fiber = require('fiber')
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:cmd("setopt delimiter ';'")
+function replace_with_yeild(item)
+ fiber.sleep(0.1)
+ return s:replace({item})
+end;
+test_run:cmd("setopt delimiter ''");
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+conn_space = conn.space.test
+stream = conn:new_stream()
+stream_space = stream.space.test
+
+-- Check that all requests in stream processed consistently
+futures = {}
+replace_count = 3
+test_run:cmd("setopt delimiter ';'")
+for i = 1, replace_count do
+ futures[string.format("replace_%d", i)] =
+ stream_space:replace({i}, {is_async = true})
+ futures[string.format("select_%d", i)] =
+ stream_space:select({}, {is_async = true})
+end;
+futures["replace_with_yeild_for_stream"] =
+ stream:call("replace_with_yeild",
+ { replace_count + 1 }, {is_async = true});
+futures["select_with_yeild_for_stream"] =
+ stream_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1]
+assert(results["select_1"])
+-- [1] [2]
+assert(results["select_2"])
+-- [1] [2] [3]
+assert(results["select_3"])
+-- [1] [2] [3] [4]
+-- Even yeild in replace function does not affect
+-- the order of requests execution in stream
+assert(results["select_with_yeild_for_stream"])
+
+-- There is no request execution order for the connection
+futures = {}
+test_run:cmd("setopt delimiter ';'")
+futures["replace_with_yeild_for_connection"] =
+ conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+futures["select_with_yeild_for_connection"] =
+ conn_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- yeild in `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+test_run:switch("test")
+-- [1] [2] [3] [4] [5]
+s:select()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+space = stream.space.test
+test_run:cmd("setopt delimiter ';'")
+replace_count = 20
+for i = 1, replace_count do
+ space:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+-- Give time to send
+fiber.sleep(0)
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:switch("test")
+-- select return tuples from [1] to [20]
+-- because all messages processed after
+-- connection closed
+s:select{}
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:cmd("stop server test")
+
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/suite.ini b/test/box/suite.ini
index b5d869fb3..94cf7811f 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
disabled = rtree_errinj.test.lua tuple_bench.test.lua
long_run = huge_field_map_long.test.lua
config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua stream.test.lua
lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
use_unix_sockets = True
use_unix_sockets_iproto = True
--
2.20.1
More information about the Tarantool-patches
mailing list