From: mechanik20051988 via Tarantool-patches <tarantool-patches@dev.tarantool.org> To: tarantool-patches@dev.tarantool.org, vdavydov@tarantool.org, v.shpilevoy@tarantool.org Cc: mechanik20051988 <mechanik20.05.1988@gmail.com> Subject: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box Date: Wed, 11 Aug 2021 11:56:56 +0300 [thread overview] Message-ID: <56db4eea5c868187e151658427e1a6d65736771a.1628671235.git.mechanik20051988@tarantool.org> (raw) In-Reply-To: <cover.1628671235.git.mechanik20051988@tarantool.org> From: mechanik20051988 <mechanik20.05.1988@gmail.com> Add stream support to `net.box`. In "net.box", a stream is an object over a connection that has the same methods, but all requests made through it are sent with a non-zero stream ID. Since there can be a lot of streams, we do not copy the spaces from the connection to the stream immediately when creating a stream, but do it only when a space is first accessed. Also, when the schema is updated, we update the spaces lazily: each stream has its own schema_version; when a stream space is accessed, we compare the stream's schema_version with the connection's schema_version and, if they differ, clear the stream space cache and wrap the space being accessed into the stream cache. Part of #5860 @TarantoolBot document Title: stream support was added to net.box In "net.box", a stream is an object over a connection that has the same methods, but all requests made through it are sent with a non-zero stream ID. The stream ID is generated on the client automatically. Simple example of stream creation using net.box: ```lua stream = conn:new_stream() -- all connection methods are valid, but send requests -- with non zero stream_id. 
``` --- src/box/lua/net_box.c | 95 ++-- src/box/lua/net_box.lua | 205 ++++++-- test/box/access.result | 6 +- test/box/access.test.lua | 6 +- ...net.box_console_connections_gh-2677.result | 2 +- ...t.box_console_connections_gh-2677.test.lua | 2 +- .../net.box_incorrect_iterator_gh-841.result | 4 +- ...net.box_incorrect_iterator_gh-841.test.lua | 4 +- test/box/net.box_iproto_hangs_gh-3464.result | 2 +- .../box/net.box_iproto_hangs_gh-3464.test.lua | 2 +- .../net.box_long-poll_input_gh-3400.result | 8 +- .../net.box_long-poll_input_gh-3400.test.lua | 8 +- test/box/stream.lua | 13 + test/box/stream.result | 485 ++++++++++++++++++ test/box/stream.test.lua | 182 +++++++ test/box/suite.ini | 2 +- 16 files changed, 934 insertions(+), 92 deletions(-) create mode 100644 test/box/stream.lua create mode 100644 test/box/stream.result create mode 100644 test/box/stream.test.lua diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index 06e574cdf..3bc49af23 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -328,7 +328,7 @@ netbox_registry_reset(struct netbox_registry *registry, struct error *error) static inline size_t netbox_begin_encode(struct mpstream *stream, uint64_t sync, - enum iproto_type type) + enum iproto_type type, uint64_t stream_id) { /* Remember initial size of ibuf (see netbox_end_encode()) */ struct ibuf *ibuf = stream->ctx; @@ -340,7 +340,7 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync, mpstream_advance(stream, fixheader_size); /* encode header */ - mpstream_encode_map(stream, 2); + mpstream_encode_map(stream, stream_id != 0 ? 
3 : 2); mpstream_encode_uint(stream, IPROTO_SYNC); mpstream_encode_uint(stream, sync); @@ -348,6 +348,10 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync, mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE); mpstream_encode_uint(stream, type); + if (stream_id != 0) { + mpstream_encode_uint(stream, IPROTO_STREAM_ID); + mpstream_encode_uint(stream, stream_id); + } /* Caller should remember how many bytes was used in ibuf */ return used; } @@ -380,11 +384,11 @@ netbox_end_encode(struct mpstream *stream, size_t initial_size) static void netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { (void)L; (void)idx; - size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING, stream_id); netbox_end_encode(stream, svp); } @@ -402,7 +406,7 @@ netbox_encode_auth(struct ibuf *ibuf, uint64_t sync, struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, mpstream_error_handler, &is_error); - size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH); + size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH, 0); /* Adapted from xrow_encode_auth() */ mpstream_encode_map(&stream, password != NULL ? 
2 : 1); @@ -432,7 +436,7 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id) struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, mpstream_error_handler, &is_error); - size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT); + size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT, 0); mpstream_encode_map(&stream, 3); mpstream_encode_uint(&stream, IPROTO_SPACE_ID); mpstream_encode_uint(&stream, space_id); @@ -446,10 +450,10 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id) static void netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync, enum iproto_type type) + uint64_t sync, enum iproto_type type, uint64_t stream_id) { /* Lua stack at idx: function_name, args */ - size_t svp = netbox_begin_encode(stream, sync, type); + size_t svp = netbox_begin_encode(stream, sync, type, stream_id); mpstream_encode_map(stream, 2); @@ -468,24 +472,25 @@ netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_call_16(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL_16); + netbox_encode_call_impl(L, idx, stream, sync, + IPROTO_CALL_16, stream_id); } static void netbox_encode_call(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL); + netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL, stream_id); } static void netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: expr, args */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL, stream_id); mpstream_encode_map(stream, 2); @@ -504,10 +509,11 @@ 
netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_select(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, index_id, iterator, offset, limit, key */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT, + stream_id); mpstream_encode_map(stream, 6); @@ -546,10 +552,11 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync, enum iproto_type type) + uint64_t sync, enum iproto_type type, + uint64_t stream_id) { /* Lua stack at idx: space_id, tuple */ - size_t svp = netbox_begin_encode(stream, sync, type); + size_t svp = netbox_begin_encode(stream, sync, type, stream_id); mpstream_encode_map(stream, 2); @@ -567,24 +574,27 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_insert(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_INSERT); + netbox_encode_insert_or_replace(L, idx, stream, sync, + IPROTO_INSERT, stream_id); } static void netbox_encode_replace(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { - netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_REPLACE); + netbox_encode_insert_or_replace(L, idx, stream, sync, + IPROTO_REPLACE, stream_id); } static void netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, index_id, key */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE, + stream_id); mpstream_encode_map(stream, 3); @@ 
-607,10 +617,11 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_update(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, index_id, key, ops */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE, + stream_id); mpstream_encode_map(stream, 5); @@ -641,10 +652,11 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: space_id, tuple, ops */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT, + stream_id); mpstream_encode_map(stream, 4); @@ -844,10 +856,11 @@ netbox_send_and_recv_console(int fd, struct ibuf *send_buf, static void netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: query, parameters, options */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE, + stream_id); mpstream_encode_map(stream, 3); @@ -873,10 +886,11 @@ netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: query */ - size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE); + size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE, + stream_id); mpstream_encode_map(stream, 1); @@ -896,18 +910,19 @@ netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream, static void netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, 
uint64_t stream_id) { /* Lua stack at idx: query, parameters, options */ - netbox_encode_prepare(L, idx, stream, sync); + netbox_encode_prepare(L, idx, stream, sync, stream_id); } static void netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream, - uint64_t sync) + uint64_t sync, uint64_t stream_id) { /* Lua stack at idx: bytes */ (void)sync; + (void)stream_id; size_t len; const char *data = lua_tolstring(L, idx, &len); mpstream_memcpy(stream, data, len); @@ -921,11 +936,11 @@ netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream, */ static int netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method, - struct ibuf *ibuf, uint64_t sync) + struct ibuf *ibuf, uint64_t sync, uint64_t stream_id) { typedef void (*method_encoder_f)(struct lua_State *L, int idx, struct mpstream *stream, - uint64_t sync); + uint64_t sync, uint64_t stream_id); static method_encoder_f method_encoder[] = { [NETBOX_PING] = netbox_encode_ping, [NETBOX_CALL_16] = netbox_encode_call_16, @@ -949,7 +964,7 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method, struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, luamp_error, L); - method_encoder[method](L, idx, &stream, sync); + method_encoder[method](L, idx, &stream, sync, stream_id); return 0; } @@ -1569,6 +1584,7 @@ netbox_new_registry(struct lua_State *L) * - on_push: on_push trigger function * - on_push_ctx: on_push trigger function argument * - format: tuple format to use for decoding the body or nil + * - stream_id: determines whether or not the request belongs to stream * - ...: method-specific arguments passed to the encoder */ static void @@ -1581,7 +1597,8 @@ netbox_make_request(struct lua_State *L, int idx, enum netbox_method method = lua_tointeger(L, idx + 4); assert(method < netbox_method_MAX); uint64_t sync = registry->next_sync++; - netbox_encode_method(L, idx + 8, method, send_buf, sync); + uint64_t stream_id = 
luaL_touint64(L, idx + 8); + netbox_encode_method(L, idx + 9, method, send_buf, sync, stream_id); /* Initialize and register the request object. */ request->method = method; diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 8f5671c15..3dffc245f 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -275,14 +275,14 @@ local function create_transport(host, port, user, password, callback, -- @retval not nil Future object. -- local function perform_async_request(buffer, skip_header, method, on_push, - on_push_ctx, format, ...) + on_push_ctx, format, stream_id, ...) local err = prepare_perform_request() if err then return nil, err end return perform_async_request_impl(requests, send_buf, buffer, skip_header, method, on_push, - on_push_ctx, format, ...) + on_push_ctx, format, stream_id, ...) end -- @@ -291,14 +291,15 @@ local function create_transport(host, port, user, password, callback, -- @retval not nil Response object. -- local function perform_request(timeout, buffer, skip_header, method, - on_push, on_push_ctx, format, ...) + on_push, on_push_ctx, format, + stream_id, ...) local err = prepare_perform_request() if err then return nil, err end return perform_request_impl(timeout, requests, send_buf, buffer, skip_header, method, on_push, on_push_ctx, - format, ...) + format, stream_id, ...) 
end -- PROTOCOL STATE MACHINE (WORKER FIBER) -- @@ -487,6 +488,37 @@ local function remote_serialize(self) } end +local function stream_serialize(self) + return { + host = self._conn.host, + port = self._conn.port, + opts = next(self._conn.opts) and self._conn.opts, + state = self._conn.state, + error = self._conn.error, + protocol = self._conn.protocol, + schema_version = self._conn.schema_version, + peer_uuid = self._conn.peer_uuid, + peer_version_id = self._conn.peer_version_id, + stream_id = self._stream_id + } +end + +local function stream_spaces_serialize(self) + return self._stream._conn.space +end + +local function stream_space_serialize(self) + return self._src +end + +local function stream_indexes_serialize(self) + return self._space._src.index +end + +local function stream_index_serialize(self) + return self._src +end + local remote_methods = {} local remote_mt = { __index = remote_methods, __serialize = remote_serialize, @@ -499,6 +531,86 @@ local console_mt = { __metatable = false } +-- Create stream space index, which is same as connection space +-- index, but have non zero stream ID. +local function stream_wrap_index(stream_id, src) + return setmetatable({ + _stream_id = stream_id, + _src = src, + }, { + __index = src, + __serialize = stream_index_serialize + }) +end + +-- Metatable for stream space indexes. When stream space being +-- created there are no indexes in it. When accessing the space +-- index, we look for corresponding space index in corresponding +-- connection space. If it is found we create same index for the +-- stream space but with corresponding stream ID. We do not need +-- to compare stream _schema_version and connection schema_version, +-- because all access to index is carried out through it's space. +-- So we update schema_version when we access space. 
+local stream_indexes_mt = { + __index = function(self, key) + local _space = self._space + local src = _space._src.index[key] + if not src then + return nil + end + local res = stream_wrap_index(_space._stream_id, src) + self[key] = res + return res + end, + __serialize = stream_indexes_serialize +} + +-- Create stream space, which is same as connection space, +-- but have non zero stream ID. +local function stream_wrap_space(stream, src) + local res = setmetatable({ + _stream_id = stream._stream_id, + _src = src, + index = setmetatable({ + _space = nil, + }, stream_indexes_mt) + }, { + __index = src, + __serialize = stream_space_serialize + }) + res.index._space = res + return res +end + +-- Metatable for stream spaces. When stream being created there +-- are no spaces in it. When user try to access some space in +-- stream, we first of all compare _schema_version of stream with +-- schema_version from connection and if they are not equal, we +-- clear stream space cache and update it's schema_version. Then +-- we look for corresponding space in the connection. If it is +-- found we create same space for the stream but with corresponding +-- stream ID. 
+local stream_spaces_mt = { + __index = function(self, key) + local stream = self._stream + if stream._schema_version ~= stream._conn.schema_version then + stream._schema_version = stream._conn.schema_version + self._stream_space_cache = {} + end + if self._stream_space_cache[key] then + return self._stream_space_cache[key] + end + local src = stream._conn.space[key] + if not src then + return nil + end + local res = stream_wrap_space(stream, src) + self._stream_space_cache[key] = res + return res + end, + __serialize = stream_spaces_serialize +} + local space_metatable, index_metatable local function new_sm(host, port, opts, connection, greeting) @@ -578,6 +690,8 @@ local function new_sm(host, port, opts, connection, greeting) if opts.wait_connected ~= false then remote._transport.wait_state('active', tonumber(opts.wait_connected)) end + -- Last stream ID used for this connection + remote._last_stream_id = 0 return remote end @@ -635,6 +749,28 @@ local function check_eval_args(args) end end +local function new_stream(stream) + check_remote_arg(stream, 'new_stream') + return stream._conn:new_stream() +end + +function remote_methods:new_stream() + check_remote_arg(self, 'new_stream') + self._last_stream_id = self._last_stream_id + 1 + local stream = setmetatable({ + new_stream = new_stream, + _stream_id = self._last_stream_id, + space = setmetatable({ + _stream_space_cache = {}, + _stream = nil, + }, stream_spaces_mt), + _conn = self, + _schema_version = self.schema_version, + }, { __index = self, __serialize = stream_serialize }) + stream.space._stream = stream + return stream +end + function remote_methods:close() check_remote_arg(self, 'close') self._transport.stop() @@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout) return self._transport.wait_state('active', timeout) end -function remote_methods:_request(method, opts, format, ...) +function remote_methods:_request(method, opts, format, stream_id, ...) 
local transport = self._transport local on_push, on_push_ctx, buffer, skip_header, deadline -- Extract options, set defaults, check if the request is @@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...) local res, err = transport.perform_async_request(buffer, skip_header, method, table.insert, {}, format, - ...) + stream_id, ...) if err then box.error(err) end @@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...) end local res, err = transport.perform_request(timeout, buffer, skip_header, method, on_push, on_push_ctx, - format, ...) + format, stream_id, ...) if err then box.error(err) end @@ -718,7 +854,7 @@ end function remote_methods:ping(opts) check_remote_arg(self, 'ping') - return (pcall(self._request, self, M_PING, opts)) + return (pcall(self._request, self, M_PING, opts, nil, self._stream_id)) end function remote_methods:reload_schema() @@ -729,14 +865,16 @@ end -- @deprecated since 1.7.4 function remote_methods:call_16(func_name, ...) check_remote_arg(self, 'call') - return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...})) + return (self:_request(M_CALL_16, nil, nil, self._stream_id, + tostring(func_name), {...})) end function remote_methods:call(func_name, args, opts) check_remote_arg(self, 'call') check_call_args(args) args = args or {} - local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args) + local res = self:_request(M_CALL_17, opts, nil, self._stream_id, + tostring(func_name), args) if type(res) ~= 'table' or opts and opts.is_async then return res end @@ -746,14 +884,15 @@ end -- @deprecated since 1.7.4 function remote_methods:eval_16(code, ...) 
check_remote_arg(self, 'eval') - return unpack((self:_request(M_EVAL, nil, nil, code, {...}))) + return unpack((self:_request(M_EVAL, nil, nil, self._stream_id, + code, {...}))) end function remote_methods:eval(code, args, opts) check_remote_arg(self, 'eval') check_eval_args(args) args = args or {} - local res = self:_request(M_EVAL, opts, nil, code, args) + local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args) if type(res) ~= 'table' or opts and opts.is_async then return res end @@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts) if sql_opts ~= nil then box.error(box.error.UNSUPPORTED, "execute", "options") end - return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {}, - sql_opts or {}) + return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id, + query, parameters or {}, sql_opts or {}) end function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args @@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua if sql_opts ~= nil then box.error(box.error.UNSUPPORTED, "prepare", "options") end - return self:_request(M_PREPARE, netbox_opts, nil, query) + return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query) end function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) @@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts) if sql_opts ~= nil then box.error(box.error.UNSUPPORTED, "unprepare", "options") end - return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {}, - sql_opts or {}) + return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id, + query, parameters or {}, sql_opts or {}) end function remote_methods:wait_state(state, timeout) @@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout) end if self.protocol == 'Binary' then local loader = 'return require("console").eval(...)' 
- res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader, + res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader, {line}) else assert(self.protocol == 'Lua console') - res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, + res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil, line..'$EOF$\n') end if err then @@ -951,14 +1090,14 @@ space_metatable = function(remote) function methods:insert(tuple, opts) check_space_arg(self, 'insert') - return remote:_request(M_INSERT, opts, self._format_cdata, self.id, - tuple) + return remote:_request(M_INSERT, opts, self._format_cdata, + self._stream_id, self.id, tuple) end function methods:replace(tuple, opts) check_space_arg(self, 'replace') - return remote:_request(M_REPLACE, opts, self._format_cdata, self.id, - tuple) + return remote:_request(M_REPLACE, opts, self._format_cdata, + self._stream_id, self.id, tuple) end function methods:select(key, opts) @@ -978,7 +1117,8 @@ space_metatable = function(remote) function methods:upsert(key, oplist, opts) check_space_arg(self, 'upsert') - return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id, + return nothing_or_data(remote:_request(M_UPSERT, opts, nil, + self._stream_id, self.id, key, oplist)) end @@ -1009,8 +1149,8 @@ index_metatable = function(remote) local offset = tonumber(opts and opts.offset) or 0 local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF return (remote:_request(M_SELECT, opts, self.space._format_cdata, - self.space.id, self.id, iterator, offset, - limit, key)) + self._stream_id, self.space.id, self.id, + iterator, offset, limit, key)) end function methods:get(key, opts) @@ -1020,6 +1160,7 @@ index_metatable = function(remote) end return nothing_or_data(remote:_request(M_GET, opts, self.space._format_cdata, + self._stream_id, self.space.id, self.id, box.index.EQ, 0, 2, key)) end @@ -1031,6 +1172,7 @@ index_metatable = function(remote) end return nothing_or_data(remote:_request(M_MIN, opts, 
self.space._format_cdata, + self._stream_id, self.space.id, self.id, box.index.GE, 0, 1, key)) end @@ -1042,6 +1184,7 @@ index_metatable = function(remote) end return nothing_or_data(remote:_request(M_MAX, opts, self.space._format_cdata, + self._stream_id, self.space.id, self.id, box.index.LE, 0, 1, key)) end @@ -1053,22 +1196,24 @@ index_metatable = function(remote) end local code = string.format('box.space.%s.index.%s:count', self.space.name, self.name) - return remote:_request(M_COUNT, opts, nil, code, { key, opts }) + return remote:_request(M_COUNT, opts, nil, self._stream_id, + code, { key, opts }) end function methods:delete(key, opts) check_index_arg(self, 'delete') return nothing_or_data(remote:_request(M_DELETE, opts, self.space._format_cdata, - self.space.id, self.id, key)) + self._stream_id, self.space.id, + self.id, key)) end function methods:update(key, oplist, opts) check_index_arg(self, 'update') return nothing_or_data(remote:_request(M_UPDATE, opts, self.space._format_cdata, - self.space.id, self.id, key, - oplist)) + self._stream_id, self.space.id, + self.id, key, oplist)) end return { __index = methods, __metatable = false } diff --git a/test/box/access.result b/test/box/access.result index 712cd68f8..6434da907 100644 --- a/test/box/access.result +++ b/test/box/access.result @@ -908,15 +908,15 @@ LISTEN = require('uri').parse(box.cfg.listen) c = net.connect(LISTEN.host, LISTEN.service) --- ... -c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) --- - error: Space '1' does not exist ... -c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) --- - error: Space '65537' does not exist ... 
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) --- - error: Space '4294967295' does not exist ... diff --git a/test/box/access.test.lua b/test/box/access.test.lua index 6060475d1..6abdb780d 100644 --- a/test/box/access.test.lua +++ b/test/box/access.test.lua @@ -351,9 +351,9 @@ box.schema.func.drop(name) -- very large space id, no crash occurs. LISTEN = require('uri').parse(box.cfg.listen) c = net.connect(LISTEN.host, LISTEN.service) -c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) -c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) -c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) +c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {}) c:close() session = box.session diff --git a/test/box/net.box_console_connections_gh-2677.result b/test/box/net.box_console_connections_gh-2677.result index f45aa0b56..7cea0a1da 100644 --- a/test/box/net.box_console_connections_gh-2677.result +++ b/test/box/net.box_console_connections_gh-2677.result @@ -74,7 +74,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80') --- ... 
while not c:is_connected() do fiber.sleep(0.01) end diff --git a/test/box/net.box_console_connections_gh-2677.test.lua b/test/box/net.box_console_connections_gh-2677.test.lua index 40d099e70..6c4e6ea4f 100644 --- a/test/box/net.box_console_connections_gh-2677.test.lua +++ b/test/box/net.box_console_connections_gh-2677.test.lua @@ -30,7 +30,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80') +_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80') while not c:is_connected() do fiber.sleep(0.01) end c:ping() diff --git a/test/box/net.box_incorrect_iterator_gh-841.result b/test/box/net.box_incorrect_iterator_gh-841.result index fbd2a7700..cd2a86787 100644 --- a/test/box/net.box_incorrect_iterator_gh-841.result +++ b/test/box/net.box_incorrect_iterator_gh-841.result @@ -16,13 +16,13 @@ test_run:cmd("setopt delimiter ';'") - true ... 
function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) - local ret = cn:_request(remote._method.select, opts, nil, space_id, + local ret = cn:_request(remote._method.select, opts, nil, nil, space_id, index_id, iterator, offset, limit, key) return ret end function x_fatal(cn) cn._transport.perform_request(nil, nil, false, remote._method.inject, - nil, nil, nil, '\x80') + nil, nil, nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); --- diff --git a/test/box/net.box_incorrect_iterator_gh-841.test.lua b/test/box/net.box_incorrect_iterator_gh-841.test.lua index 1d24f9f56..9c42175ef 100644 --- a/test/box/net.box_incorrect_iterator_gh-841.test.lua +++ b/test/box/net.box_incorrect_iterator_gh-841.test.lua @@ -5,13 +5,13 @@ test_run:cmd("push filter ".."'\\.lua.*:[0-9]+: ' to '.lua...\"]:<line>: '") test_run:cmd("setopt delimiter ';'") function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) - local ret = cn:_request(remote._method.select, opts, nil, space_id, + local ret = cn:_request(remote._method.select, opts, nil, nil, space_id, index_id, iterator, offset, limit, key) return ret end function x_fatal(cn) cn._transport.perform_request(nil, nil, false, remote._method.inject, - nil, nil, nil, '\x80') + nil, nil, nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); diff --git a/test/box/net.box_iproto_hangs_gh-3464.result b/test/box/net.box_iproto_hangs_gh-3464.result index 3b5458c9a..cbf8181b3 100644 --- a/test/box/net.box_iproto_hangs_gh-3464.result +++ b/test/box/net.box_iproto_hangs_gh-3464.result @@ -17,7 +17,7 @@ c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' --- ... 
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data) +c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data) --- - null - Peer closed diff --git a/test/box/net.box_iproto_hangs_gh-3464.test.lua b/test/box/net.box_iproto_hangs_gh-3464.test.lua index a7c41ae76..51a9ddece 100644 --- a/test/box/net.box_iproto_hangs_gh-3464.test.lua +++ b/test/box/net.box_iproto_hangs_gh-3464.test.lua @@ -8,6 +8,6 @@ net = require('net.box') -- c = net:connect(box.cfg.listen) data = msgpack.encode(18400000000000000000)..'aaaaaaa' -c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data) +c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data) c:close() test_run:grep_log('default', 'too big packet size in the header') ~= nil diff --git a/test/box/net.box_long-poll_input_gh-3400.result b/test/box/net.box_long-poll_input_gh-3400.result index a16110ee6..a98eea655 100644 --- a/test/box/net.box_long-poll_input_gh-3400.result +++ b/test/box/net.box_long-poll_input_gh-3400.result @@ -24,10 +24,10 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. -- -f = fiber.create(c._transport.perform_request, nil, nil, false, \ - net._method.call_17, nil, nil, nil, 'long', {}) \ -c._transport.perform_request(nil, nil, false, net._method.inject, \ - nil, nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, \ + net._method.call_17, nil, nil, nil, nil, 'long', {}) \ +c._transport.perform_request(nil, nil, false, net._method.inject, \ + nil, nil, nil, nil, '\x80') --- ... 
while f:status() ~= 'dead' do fiber.sleep(0.01) end diff --git a/test/box/net.box_long-poll_input_gh-3400.test.lua b/test/box/net.box_long-poll_input_gh-3400.test.lua index 891b59224..a6f302ee0 100644 --- a/test/box/net.box_long-poll_input_gh-3400.test.lua +++ b/test/box/net.box_long-poll_input_gh-3400.test.lua @@ -14,9 +14,9 @@ c:ping() -- new attempts to read any data - the connection is closed -- already. -- -f = fiber.create(c._transport.perform_request, nil, nil, false, \ - net._method.call_17, nil, nil, nil, 'long', {}) \ -c._transport.perform_request(nil, nil, false, net._method.inject, \ - nil, nil, nil, '\x80') +f = fiber.create(c._transport.perform_request, nil, nil, false, \ + net._method.call_17, nil, nil, nil, nil, 'long', {}) \ +c._transport.perform_request(nil, nil, false, net._method.inject, \ + nil, nil, nil, nil, '\x80') while f:status() ~= 'dead' do fiber.sleep(0.01) end c:close() diff --git a/test/box/stream.lua b/test/box/stream.lua new file mode 100644 index 000000000..db6a29a8a --- /dev/null +++ b/test/box/stream.lua @@ -0,0 +1,13 @@ +#!/usr/bin/env tarantool + +require('console').listen(os.getenv('ADMIN')) + +local memtx_use_mvcc_engine = (arg[2] and arg[2] == 'true' and true or false) + +box.cfg({ + listen = os.getenv('LISTEN'), + iproto_threads = tonumber(arg[1]), + memtx_use_mvcc_engine = memtx_use_mvcc_engine +}) + +box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe', nil, {if_not_exists = true}) diff --git a/test/box/stream.result b/test/box/stream.result new file mode 100644 index 000000000..03200ecf6 --- /dev/null +++ b/test/box/stream.result @@ -0,0 +1,485 @@ +-- test-run result file version 2 +-- This test checks streams iplementation in iproto (gh-5860). +net_box = require('net.box') + | --- + | ... +fiber = require('fiber') + | --- + | ... +test_run = require('test_run').new() + | --- + | ... + +test_run:cmd("create server test with script='box/stream.lua'") + | --- + | - true + | ... 
+ +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +function get_current_connection_count() + local total_net_stat_table = + test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table) + local connection_stat_table = total_net_stat_table.CONNECTIONS + assert(connection_stat_table) + return connection_stat_table.current +end; + | --- + | ... +function wait_and_return_results(futures) + local results = {} + for name, future in pairs(futures) do + local err + results[name], err = future:wait_result() + if err then + results[name] = err + end + end + return results +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... + +-- Some simple checks for new object - stream +test_run:cmd("start server test with args='1'") + | --- + | - true + | ... +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] + | --- + | ... +conn_1 = net_box.connect(server_addr) + | --- + | ... +stream_1 = conn_1:new_stream() + | --- + | ... +conn_2 = net_box.connect(server_addr) + | --- + | ... +stream_2 = conn_2:new_stream() + | --- + | ... +-- Stream is a wrapper around connection, so if you close connection +-- you close stream, and vice versa. +conn_1:close() + | --- + | ... +assert(not stream_1:ping()) + | --- + | - true + | ... +stream_2:close() + | --- + | ... +assert(not conn_2:ping()) + | --- + | - true + | ... +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... +-- The new method `new_stream`, for the stream object, returns a new +-- stream object, just as in the case of connection. +_ = stream:new_stream() + | --- + | ... +conn:close() + | --- + | ... + +-- Check that spaces in stream object updates, during reload_schema +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... +test_run:switch("test") + | --- + | - true + | ... 
+-- Create one space on server +s = box.schema.space.create('test', { engine = 'memtx' }) + | --- + | ... +_ = s:create_index('primary') + | --- + | ... +test_run:switch("default") + | --- + | - true + | ... +assert(not conn.space.test) + | --- + | - true + | ... +assert(not stream.space.test) + | --- + | - true + | ... +assert(conn.schema_version == stream._schema_version) + | --- + | - true + | ... +conn:reload_schema() + | --- + | ... +assert(conn.space.test ~= nil) + | --- + | - true + | ... +assert(conn.schema_version ~= stream._schema_version) + | --- + | - true + | ... +assert(stream.space.test ~= nil) + | --- + | - true + | ... +-- When we touch stream.space, we compare stream._schema_version +-- and conn.schema_version if they are not equal, we clear stream +-- space cache, update it's _schema_version and load space from +-- connection to stream space cache. +assert(conn.schema_version == stream._schema_version) + | --- + | - true + | ... +collectgarbage() + | --- + | - 0 + | ... +collectgarbage() + | --- + | - 0 + | ... +assert(conn.space.test ~= nil) + | --- + | - true + | ... +assert(stream.space.test ~= nil) + | --- + | - true + | ... +test_run:switch("test") + | --- + | - true + | ... +s:drop() + | --- + | ... +test_run:switch("default") + | --- + | - true + | ... +conn:reload_schema() + | --- + | ... +assert(not conn.space.test) + | --- + | - true + | ... +assert(not stream.space.test) + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +-- All test works with iproto_thread count = 10 + +test_run:cmd("start server test with args='10'") + | --- + | - true + | ... +test_run:switch('test') + | --- + | - true + | ... +fiber = require('fiber') + | --- + | ... +s = box.schema.space.create('test', { engine = 'memtx' }) + | --- + | ... +_ = s:create_index('primary') + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... 
+function replace_with_yeild(item) + fiber.sleep(0.1) + return s:replace({item}) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... + +conn = net_box.connect(server_addr) + | --- + | ... +assert(conn:ping()) + | --- + | - true + | ... +conn_space = conn.space.test + | --- + | ... +stream = conn:new_stream() + | --- + | ... +stream_space = stream.space.test + | --- + | ... + +-- Check that all requests in stream processed consistently +futures = {} + | --- + | ... +replace_count = 3 + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +for i = 1, replace_count do + futures[string.format("replace_%d", i)] = + stream_space:replace({i}, {is_async = true}) + futures[string.format("select_%d", i)] = + stream_space:select({}, {is_async = true}) +end; + | --- + | ... +futures["replace_with_yeild_for_stream"] = + stream:call("replace_with_yeild", + { replace_count + 1 }, {is_async = true}); + | --- + | ... +futures["select_with_yeild_for_stream"] = + stream_space:select({}, {is_async = true}); + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +results = wait_and_return_results(futures) + | --- + | ... +-- [1] +assert(results["select_1"]) + | --- + | - - [1] + | ... +-- [1] [2] +assert(results["select_2"]) + | --- + | - - [1] + | - [2] + | ... +-- [1] [2] [3] +assert(results["select_3"]) + | --- + | - - [1] + | - [2] + | - [3] + | ... +-- [1] [2] [3] [4] +-- Even yeild in replace function does not affect +-- the order of requests execution in stream +assert(results["select_with_yeild_for_stream"]) + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | ... + +-- There is no request execution order for the connection +futures = {} + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... 
+futures["replace_with_yeild_for_connection"] = + conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true}); + | --- + | ... +futures["select_with_yeild_for_connection"] = + conn_space:select({}, {is_async = true}); + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +results = wait_and_return_results(futures) + | --- + | ... +-- [1] [2] [3] [4] +-- Select will be processed earlier because of +-- yeild in `replace_with_yeild` function +assert(results["select_with_yeild_for_connection"]) + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | ... +test_run:switch("test") + | --- + | - true + | ... +-- [1] [2] [3] [4] [5] +s:select() + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | - [5] + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch('default') + | --- + | - true + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... + +-- Check that all request will be processed +-- after connection close. +conn = net_box.connect(server_addr) + | --- + | ... +stream = conn:new_stream() + | --- + | ... +space = stream.space.test + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +replace_count = 20 +for i = 1, replace_count do + space:replace({i}, {is_async = true}) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... +-- Give time to send +fiber.sleep(0) + | --- + | ... +conn:close() + | --- + | ... +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + | --- + | - true + | ... +test_run:switch("test") + | --- + | - true + | ... 
+-- select return tuples from [1] to [20] +-- because all messages processed after +-- connection closed +s:select{} + | --- + | - - [1] + | - [2] + | - [3] + | - [4] + | - [5] + | - [6] + | - [7] + | - [8] + | - [9] + | - [10] + | - [11] + | - [12] + | - [13] + | - [14] + | - [15] + | - [16] + | - [17] + | - [18] + | - [19] + | - [20] + | ... +s:drop() + | --- + | ... +errinj = box.error.injection + | --- + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) + | --- + | - true + | ... +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) + | --- + | - true + | ... +test_run:switch("default") + | --- + | - true + | ... +test_run:cmd("stop server test") + | --- + | - true + | ... + +test_run:cmd("cleanup server test") + | --- + | - true + | ... +test_run:cmd("delete server test") + | --- + | - true + | ... diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua new file mode 100644 index 000000000..72129a228 --- /dev/null +++ b/test/box/stream.test.lua @@ -0,0 +1,182 @@ +-- This test checks streams iplementation in iproto (gh-5860). 
+net_box = require('net.box') +fiber = require('fiber') +test_run = require('test_run').new() + +test_run:cmd("create server test with script='box/stream.lua'") + +test_run:cmd("setopt delimiter ';'") +function get_current_connection_count() + local total_net_stat_table = + test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1] + assert(total_net_stat_table) + local connection_stat_table = total_net_stat_table.CONNECTIONS + assert(connection_stat_table) + return connection_stat_table.current +end; +function wait_and_return_results(futures) + local results = {} + for name, future in pairs(futures) do + local err + results[name], err = future:wait_result() + if err then + results[name] = err + end + end + return results +end; +test_run:cmd("setopt delimiter ''"); + +-- Some simple checks for new object - stream +test_run:cmd("start server test with args='1'") +server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1] +conn_1 = net_box.connect(server_addr) +stream_1 = conn_1:new_stream() +conn_2 = net_box.connect(server_addr) +stream_2 = conn_2:new_stream() +-- Stream is a wrapper around connection, so if you close connection +-- you close stream, and vice versa. +conn_1:close() +assert(not stream_1:ping()) +stream_2:close() +assert(not conn_2:ping()) +conn = net_box.connect(server_addr) +stream = conn:new_stream() +-- The new method `new_stream`, for the stream object, returns a new +-- stream object, just as in the case of connection. 
+_ = stream:new_stream() +conn:close() + +-- Check that spaces in stream object updates, during reload_schema +conn = net_box.connect(server_addr) +stream = conn:new_stream() +test_run:switch("test") +-- Create one space on server +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:switch("default") +assert(not conn.space.test) +assert(not stream.space.test) +assert(conn.schema_version == stream._schema_version) +conn:reload_schema() +assert(conn.space.test ~= nil) +assert(conn.schema_version ~= stream._schema_version) +assert(stream.space.test ~= nil) +-- When we touch stream.space, we compare stream._schema_version +-- and conn.schema_version if they are not equal, we clear stream +-- space cache, update it's _schema_version and load space from +-- connection to stream space cache. +assert(conn.schema_version == stream._schema_version) +collectgarbage() +collectgarbage() +assert(conn.space.test ~= nil) +assert(stream.space.test ~= nil) +test_run:switch("test") +s:drop() +test_run:switch("default") +conn:reload_schema() +assert(not conn.space.test) +assert(not stream.space.test) +test_run:cmd("stop server test") + +-- All test works with iproto_thread count = 10 + +test_run:cmd("start server test with args='10'") +test_run:switch('test') +fiber = require('fiber') +s = box.schema.space.create('test', { engine = 'memtx' }) +_ = s:create_index('primary') +test_run:cmd("setopt delimiter ';'") +function replace_with_yeild(item) + fiber.sleep(0.1) + return s:replace({item}) +end; +test_run:cmd("setopt delimiter ''"); +test_run:switch('default') + +conn = net_box.connect(server_addr) +assert(conn:ping()) +conn_space = conn.space.test +stream = conn:new_stream() +stream_space = stream.space.test + +-- Check that all requests in stream processed consistently +futures = {} +replace_count = 3 +test_run:cmd("setopt delimiter ';'") +for i = 1, replace_count do + futures[string.format("replace_%d", i)] = + 
stream_space:replace({i}, {is_async = true}) + futures[string.format("select_%d", i)] = + stream_space:select({}, {is_async = true}) +end; +futures["replace_with_yeild_for_stream"] = + stream:call("replace_with_yeild", + { replace_count + 1 }, {is_async = true}); +futures["select_with_yeild_for_stream"] = + stream_space:select({}, {is_async = true}); +test_run:cmd("setopt delimiter ''"); +results = wait_and_return_results(futures) +-- [1] +assert(results["select_1"]) +-- [1] [2] +assert(results["select_2"]) +-- [1] [2] [3] +assert(results["select_3"]) +-- [1] [2] [3] [4] +-- Even yeild in replace function does not affect +-- the order of requests execution in stream +assert(results["select_with_yeild_for_stream"]) + +-- There is no request execution order for the connection +futures = {} +test_run:cmd("setopt delimiter ';'") +futures["replace_with_yeild_for_connection"] = + conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true}); +futures["select_with_yeild_for_connection"] = + conn_space:select({}, {is_async = true}); +test_run:cmd("setopt delimiter ''"); +results = wait_and_return_results(futures) +-- [1] [2] [3] [4] +-- Select will be processed earlier because of +-- yeild in `replace_with_yeild` function +assert(results["select_with_yeild_for_connection"]) +test_run:switch("test") +-- [1] [2] [3] [4] [5] +s:select() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch('default') +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) + +-- Check that all request will be processed +-- after connection close. 
+conn = net_box.connect(server_addr) +stream = conn:new_stream() +space = stream.space.test +test_run:cmd("setopt delimiter ';'") +replace_count = 20 +for i = 1, replace_count do + space:replace({i}, {is_async = true}) +end; +test_run:cmd("setopt delimiter ''"); +-- Give time to send +fiber.sleep(0) +conn:close() +test_run:wait_cond(function () return get_current_connection_count() == 0 end) +test_run:switch("test") +-- select return tuples from [1] to [20] +-- because all messages processed after +-- connection closed +s:select{} +s:drop() +errinj = box.error.injection +assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0) +assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0) +test_run:switch("default") +test_run:cmd("stop server test") + +test_run:cmd("cleanup server test") +test_run:cmd("delete server test") diff --git a/test/box/suite.ini b/test/box/suite.ini index b5d869fb3..94cf7811f 100644 --- a/test/box/suite.ini +++ b/test/box/suite.ini @@ -5,7 +5,7 @@ script = box.lua disabled = rtree_errinj.test.lua tuple_bench.test.lua long_run = huge_field_map_long.test.lua config = engine.cfg -release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua +release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua stream.test.lua lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua use_unix_sockets = True use_unix_sockets_iproto = True -- 2.20.1
next prev parent reply other threads:[~2021-08-11 9:00 UTC|newest] Thread overview: 14+ messages / expand[flat|nested] mbox.gz Atom feed top 2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 1/8] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 2/8] iproto: clear request::header for client requests mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 3/8] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 4/8] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 5/8] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches 2021-08-11 11:30 ` Vladimir Davydov via Tarantool-patches 2021-08-11 8:56 ` mechanik20051988 via Tarantool-patches [this message] 2021-08-11 11:52 ` [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box Vladimir Davydov via Tarantool-patches 2021-08-11 12:09 ` Vladimir Davydov via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches 2021-08-11 12:39 ` Vladimir Davydov via Tarantool-patches 2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 8/8] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches 2021-08-11 12:47 ` Vladimir Davydov via Tarantool-patches
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=56db4eea5c868187e151658427e1a6d65736771a.1628671235.git.mechanik20051988@tarantool.org \ --to=tarantool-patches@dev.tarantool.org \ --cc=mechanik20.05.1988@gmail.com \ --cc=mechanik20051988@tarantool.org \ --cc=v.shpilevoy@tarantool.org \ --cc=vdavydov@tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.